Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

805 satır
24KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "mime"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jan/fm-rds-tx/internal/audio"
  14. "github.com/jan/fm-rds-tx/internal/config"
  15. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  16. "github.com/jan/fm-rds-tx/internal/ingest"
  17. "github.com/jan/fm-rds-tx/internal/platform"
  18. )
  19. //go:embed ui.html
  20. var uiHTML []byte
  21. // TXController is an optional interface the Server uses to start/stop TX
  22. // and apply live config changes.
  23. type TXController interface {
  24. StartTX() error
  25. StopTX() error
  26. TXStats() map[string]any
  27. UpdateConfig(patch LivePatch) error
  28. ResetFault() error
  29. }
  30. // LivePatch mirrors the patchable fields from ConfigPatch for the engine.
  31. // nil = no change.
  32. type LivePatch struct {
  33. FrequencyMHz *float64
  34. OutputDrive *float64
  35. StereoEnabled *bool
  36. StereoMode *string
  37. PilotLevel *float64
  38. RDSInjection *float64
  39. RDSEnabled *bool
  40. LimiterEnabled *bool
  41. LimiterCeiling *float64
  42. PS *string
  43. RadioText *string
  44. TA *bool
  45. TP *bool
  46. ToneLeftHz *float64
  47. ToneRightHz *float64
  48. ToneAmplitude *float64
  49. AudioGain *float64
  50. CompositeClipperEnabled *bool
  51. }
  52. type Server struct {
  53. mu sync.RWMutex
  54. cfg config.Config
  55. tx TXController
  56. drv platform.SoapyDriver // optional, for runtime stats
  57. streamSrc *audio.StreamSource // optional, for live audio ring stats
  58. audioIngress AudioIngress // optional, for /audio/stream
  59. ingestRt IngestRuntime // optional, for /runtime ingest stats
  60. saveConfig func(config.Config) error
  61. hardReload func()
  62. // BUG-F fix: reloadPending prevents multiple concurrent goroutines from
  63. // calling hardReload when handleIngestSave is hit multiple times quickly.
  64. reloadPending atomic.Bool
  65. audit auditCounters
  66. }
  67. type AudioIngress interface {
  68. WritePCM16(data []byte) (int, error)
  69. }
  70. type IngestRuntime interface {
  71. Stats() ingest.Stats
  72. }
  73. type auditEvent string
  74. const (
  75. auditMethodNotAllowed auditEvent = "methodNotAllowed"
  76. auditUnsupportedMediaType auditEvent = "unsupportedMediaType"
  77. auditBodyTooLarge auditEvent = "bodyTooLarge"
  78. auditUnexpectedBody auditEvent = "unexpectedBody"
  79. )
  80. type auditCounters struct {
  81. methodNotAllowed uint64
  82. unsupportedMediaType uint64
  83. bodyTooLarge uint64
  84. unexpectedBody uint64
  85. }
  86. const (
  87. maxConfigBodyBytes = 64 << 10 // 64 KiB
  88. configContentTypeHeader = "application/json"
  89. noBodyErrMsg = "request must not include a body"
  90. audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16"
  91. audioStreamBodyLimitDefault = 512 << 20 // 512 MiB
  92. )
  93. var audioStreamAllowedMediaTypes = []string{
  94. "application/octet-stream",
  95. "audio/l16",
  96. }
  97. var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault) // bytes allowed per /audio/stream request; tests may override.
  98. func isJSONContentType(r *http.Request) bool {
  99. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  100. if ct == "" {
  101. return false
  102. }
  103. mediaType, _, err := mime.ParseMediaType(ct)
  104. if err != nil {
  105. return false
  106. }
  107. return strings.EqualFold(mediaType, configContentTypeHeader)
  108. }
  109. type ConfigPatch struct {
  110. FrequencyMHz *float64 `json:"frequencyMHz,omitempty"`
  111. OutputDrive *float64 `json:"outputDrive,omitempty"`
  112. StereoEnabled *bool `json:"stereoEnabled,omitempty"`
  113. StereoMode *string `json:"stereoMode,omitempty"`
  114. PilotLevel *float64 `json:"pilotLevel,omitempty"`
  115. RDSInjection *float64 `json:"rdsInjection,omitempty"`
  116. RDSEnabled *bool `json:"rdsEnabled,omitempty"`
  117. ToneLeftHz *float64 `json:"toneLeftHz,omitempty"`
  118. ToneRightHz *float64 `json:"toneRightHz,omitempty"`
  119. ToneAmplitude *float64 `json:"toneAmplitude,omitempty"`
  120. PS *string `json:"ps,omitempty"`
  121. RadioText *string `json:"radioText,omitempty"`
  122. PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
  123. LimiterEnabled *bool `json:"limiterEnabled,omitempty"`
  124. LimiterCeiling *float64 `json:"limiterCeiling,omitempty"`
  125. AudioGain *float64 `json:"audioGain,omitempty"`
  126. PI *string `json:"pi,omitempty"`
  127. PTY *int `json:"pty,omitempty"`
  128. TP *bool `json:"tp,omitempty"`
  129. TA *bool `json:"ta,omitempty"`
  130. MS *bool `json:"ms,omitempty"`
  131. CTEnabled *bool `json:"ctEnabled,omitempty"`
  132. RTPlusEnabled *bool `json:"rtPlusEnabled,omitempty"`
  133. RTPlusSeparator *string `json:"rtPlusSeparator,omitempty"`
  134. PTYN *string `json:"ptyn,omitempty"`
  135. LPS *string `json:"lps,omitempty"`
  136. ERTEnabled *bool `json:"ertEnabled,omitempty"`
  137. ERT *string `json:"ert,omitempty"`
  138. RDS2Enabled *bool `json:"rds2Enabled,omitempty"`
  139. StationLogoPath *string `json:"stationLogoPath,omitempty"`
  140. AF *[]float64 `json:"af,omitempty"`
  141. BS412Enabled *bool `json:"bs412Enabled,omitempty"`
  142. BS412ThresholdDBr *float64 `json:"bs412ThresholdDBr,omitempty"`
  143. MpxGain *float64 `json:"mpxGain,omitempty"`
  144. CompositeClipperEnabled *bool `json:"compositeClipperEnabled,omitempty"`
  145. CompositeClipperIterations *int `json:"compositeClipperIterations,omitempty"`
  146. CompositeClipperSoftKnee *float64 `json:"compositeClipperSoftKnee,omitempty"`
  147. CompositeClipperLookaheadMs *float64 `json:"compositeClipperLookaheadMs,omitempty"`
  148. }
  149. type IngestSaveRequest struct {
  150. Ingest config.IngestConfig `json:"ingest"`
  151. }
  152. func NewServer(cfg config.Config) *Server {
  153. return &Server{cfg: cfg}
  154. }
  155. func hasRequestBody(r *http.Request) bool {
  156. if r.ContentLength > 0 {
  157. return true
  158. }
  159. for _, te := range r.TransferEncoding {
  160. if strings.EqualFold(te, "chunked") {
  161. return true
  162. }
  163. }
  164. return false
  165. }
  166. func (s *Server) rejectBody(w http.ResponseWriter, r *http.Request) bool {
  167. // Returns true when the request has an unexpected body and the error response
  168. // has already been written — callers should return immediately in that case.
  169. // Returns false when there is no body (happy path — request should proceed).
  170. if !hasRequestBody(r) {
  171. return false
  172. }
  173. s.recordAudit(auditUnexpectedBody)
  174. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  175. return true
  176. }
  177. func (s *Server) recordAudit(evt auditEvent) {
  178. switch evt {
  179. case auditMethodNotAllowed:
  180. atomic.AddUint64(&s.audit.methodNotAllowed, 1)
  181. case auditUnsupportedMediaType:
  182. atomic.AddUint64(&s.audit.unsupportedMediaType, 1)
  183. case auditBodyTooLarge:
  184. atomic.AddUint64(&s.audit.bodyTooLarge, 1)
  185. case auditUnexpectedBody:
  186. atomic.AddUint64(&s.audit.unexpectedBody, 1)
  187. }
  188. }
  189. func (s *Server) auditSnapshot() map[string]uint64 {
  190. return map[string]uint64{
  191. "methodNotAllowed": atomic.LoadUint64(&s.audit.methodNotAllowed),
  192. "unsupportedMediaType": atomic.LoadUint64(&s.audit.unsupportedMediaType),
  193. "bodyTooLarge": atomic.LoadUint64(&s.audit.bodyTooLarge),
  194. "unexpectedBody": atomic.LoadUint64(&s.audit.unexpectedBody),
  195. }
  196. }
  197. func isAudioStreamContentType(r *http.Request) bool {
  198. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  199. if ct == "" {
  200. return false
  201. }
  202. mediaType, _, err := mime.ParseMediaType(ct)
  203. if err != nil {
  204. return false
  205. }
  206. for _, allowed := range audioStreamAllowedMediaTypes {
  207. if strings.EqualFold(mediaType, allowed) {
  208. return true
  209. }
  210. }
  211. return false
  212. }
  213. func (s *Server) SetTXController(tx TXController) {
  214. s.mu.Lock()
  215. s.tx = tx
  216. s.mu.Unlock()
  217. }
  218. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  219. s.mu.Lock()
  220. s.drv = drv
  221. s.mu.Unlock()
  222. }
  223. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  224. s.mu.Lock()
  225. s.streamSrc = src
  226. s.mu.Unlock()
  227. }
  228. func (s *Server) SetAudioIngress(ingress AudioIngress) {
  229. s.mu.Lock()
  230. s.audioIngress = ingress
  231. s.mu.Unlock()
  232. }
  233. func (s *Server) SetIngestRuntime(rt IngestRuntime) {
  234. s.mu.Lock()
  235. s.ingestRt = rt
  236. s.mu.Unlock()
  237. }
  238. func (s *Server) SetConfigSaver(save func(config.Config) error) {
  239. s.mu.Lock()
  240. s.saveConfig = save
  241. s.mu.Unlock()
  242. }
  243. func (s *Server) SetHardReload(fn func()) {
  244. s.mu.Lock()
  245. s.hardReload = fn
  246. s.mu.Unlock()
  247. }
  248. func (s *Server) Handler() http.Handler {
  249. mux := http.NewServeMux()
  250. mux.HandleFunc("/", s.handleUI)
  251. mux.HandleFunc("/healthz", s.handleHealth)
  252. mux.HandleFunc("/status", s.handleStatus)
  253. mux.HandleFunc("/dry-run", s.handleDryRun)
  254. mux.HandleFunc("/config", s.handleConfig)
  255. mux.HandleFunc("/config/ingest/save", s.handleIngestSave)
  256. mux.HandleFunc("/runtime", s.handleRuntime)
  257. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  258. mux.HandleFunc("/tx/start", s.handleTXStart)
  259. mux.HandleFunc("/tx/stop", s.handleTXStop)
  260. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  261. return mux
  262. }
  263. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  264. w.Header().Set("Content-Type", "application/json")
  265. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  266. }
  267. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  268. if r.URL.Path != "/" {
  269. http.NotFound(w, r)
  270. return
  271. }
  272. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  273. w.Header().Set("Cache-Control", "no-cache")
  274. w.Write(uiHTML)
  275. }
  276. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  277. s.mu.RLock()
  278. cfg := s.cfg
  279. tx := s.tx
  280. s.mu.RUnlock()
  281. status := map[string]any{
  282. "service": "fm-rds-tx",
  283. "backend": cfg.Backend.Kind,
  284. "frequencyMHz": cfg.FM.FrequencyMHz,
  285. "stereoEnabled": cfg.FM.StereoEnabled,
  286. "stereoMode": cfg.FM.StereoMode,
  287. "rdsEnabled": cfg.RDS.Enabled,
  288. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  289. "limiterEnabled": cfg.FM.LimiterEnabled,
  290. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  291. }
  292. if tx != nil {
  293. if stats := tx.TXStats(); stats != nil {
  294. if ri, ok := stats["runtimeIndicator"]; ok {
  295. status["runtimeIndicator"] = ri
  296. }
  297. if alert, ok := stats["runtimeAlert"]; ok {
  298. status["runtimeAlert"] = alert
  299. }
  300. if queue, ok := stats["queue"]; ok {
  301. status["queue"] = queue
  302. }
  303. if runtimeState, ok := stats["state"]; ok {
  304. status["runtimeState"] = runtimeState
  305. }
  306. }
  307. }
  308. w.Header().Set("Content-Type", "application/json")
  309. _ = json.NewEncoder(w).Encode(status)
  310. }
  311. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  312. s.mu.RLock()
  313. drv := s.drv
  314. tx := s.tx
  315. stream := s.streamSrc
  316. ingestRt := s.ingestRt
  317. s.mu.RUnlock()
  318. result := map[string]any{}
  319. if drv != nil {
  320. result["driver"] = drv.Stats()
  321. }
  322. if tx != nil {
  323. if stats := tx.TXStats(); stats != nil {
  324. result["engine"] = stats
  325. }
  326. }
  327. if stream != nil {
  328. result["audioStream"] = stream.Stats()
  329. }
  330. if ingestRt != nil {
  331. result["ingest"] = ingestRt.Stats()
  332. }
  333. result["controlAudit"] = s.auditSnapshot()
  334. w.Header().Set("Content-Type", "application/json")
  335. _ = json.NewEncoder(w).Encode(result)
  336. }
  337. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  338. if r.Method != http.MethodPost {
  339. s.recordAudit(auditMethodNotAllowed)
  340. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  341. return
  342. }
  343. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  344. return
  345. }
  346. s.mu.RLock()
  347. tx := s.tx
  348. s.mu.RUnlock()
  349. if tx == nil {
  350. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  351. return
  352. }
  353. if err := tx.ResetFault(); err != nil {
  354. http.Error(w, err.Error(), http.StatusConflict)
  355. return
  356. }
  357. w.Header().Set("Content-Type", "application/json")
  358. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  359. }
  360. // handleAudioStream accepts raw S16LE PCM via HTTP POST and pushes
  361. // it into the configured ingest http-raw source. Use with:
  362. //
  363. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  364. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  365. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  366. if r.Method != http.MethodPost {
  367. s.recordAudit(auditMethodNotAllowed)
  368. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  369. return
  370. }
  371. if !isAudioStreamContentType(r) {
  372. s.recordAudit(auditUnsupportedMediaType)
  373. http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType)
  374. return
  375. }
  376. s.mu.RLock()
  377. ingress := s.audioIngress
  378. s.mu.RUnlock()
  379. if ingress == nil {
  380. http.Error(w, "audio ingest not configured (use --audio-http with ingest runtime)", http.StatusServiceUnavailable)
  381. return
  382. }
  383. // BUG-10 fix: /audio/stream is a long-lived streaming endpoint.
  384. // The global HTTP server ReadTimeout (5s) and WriteTimeout (10s) would
  385. // kill connections mid-stream. Disable them per-request via ResponseController
  386. // (requires Go 1.20+, confirmed Go 1.22).
  387. rc := http.NewResponseController(w)
  388. _ = rc.SetReadDeadline(time.Time{})
  389. _ = rc.SetWriteDeadline(time.Time{})
  390. r.Body = http.MaxBytesReader(w, r.Body, audioStreamBodyLimit)
  391. // Read body in chunks and push to ring buffer
  392. buf := make([]byte, 32768)
  393. totalFrames := 0
  394. for {
  395. n, err := r.Body.Read(buf)
  396. if n > 0 {
  397. written, writeErr := ingress.WritePCM16(buf[:n])
  398. totalFrames += written
  399. if writeErr != nil {
  400. http.Error(w, writeErr.Error(), http.StatusServiceUnavailable)
  401. return
  402. }
  403. }
  404. if err != nil {
  405. if err == io.EOF {
  406. break
  407. }
  408. var maxErr *http.MaxBytesError
  409. if errors.As(err, &maxErr) {
  410. s.recordAudit(auditBodyTooLarge)
  411. http.Error(w, maxErr.Error(), http.StatusRequestEntityTooLarge)
  412. return
  413. }
  414. http.Error(w, err.Error(), http.StatusInternalServerError)
  415. return
  416. }
  417. }
  418. w.Header().Set("Content-Type", "application/json")
  419. _ = json.NewEncoder(w).Encode(map[string]any{
  420. "ok": true,
  421. "frames": totalFrames,
  422. })
  423. }
  424. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  425. if r.Method != http.MethodPost {
  426. s.recordAudit(auditMethodNotAllowed)
  427. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  428. return
  429. }
  430. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  431. return
  432. }
  433. s.mu.RLock()
  434. tx := s.tx
  435. s.mu.RUnlock()
  436. if tx == nil {
  437. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  438. return
  439. }
  440. if err := tx.StartTX(); err != nil {
  441. http.Error(w, err.Error(), http.StatusConflict)
  442. return
  443. }
  444. w.Header().Set("Content-Type", "application/json")
  445. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  446. }
  447. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  448. if r.Method != http.MethodPost {
  449. s.recordAudit(auditMethodNotAllowed)
  450. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  451. return
  452. }
  453. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  454. return
  455. }
  456. s.mu.RLock()
  457. tx := s.tx
  458. s.mu.RUnlock()
  459. if tx == nil {
  460. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  461. return
  462. }
  463. go func() {
  464. _ = tx.StopTX()
  465. }()
  466. w.Header().Set("Content-Type", "application/json")
  467. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stop-requested"})
  468. }
  469. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  470. s.mu.RLock()
  471. cfg := s.cfg
  472. s.mu.RUnlock()
  473. w.Header().Set("Content-Type", "application/json")
  474. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  475. }
  476. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  477. switch r.Method {
  478. case http.MethodGet:
  479. s.mu.RLock()
  480. cfg := s.cfg
  481. s.mu.RUnlock()
  482. w.Header().Set("Content-Type", "application/json")
  483. _ = json.NewEncoder(w).Encode(cfg)
  484. case http.MethodPost:
  485. if !isJSONContentType(r) {
  486. s.recordAudit(auditUnsupportedMediaType)
  487. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  488. return
  489. }
  490. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  491. var patch ConfigPatch
  492. // BUG-4 fix: reject unknown JSON fields (typos) with 400 rather than
  493. // silently ignoring them (e.g. "outputDrvie" would succeed and do nothing).
  494. dec := json.NewDecoder(r.Body)
  495. dec.DisallowUnknownFields()
  496. if err := dec.Decode(&patch); err != nil {
  497. statusCode := http.StatusBadRequest
  498. if strings.Contains(err.Error(), "http: request body too large") {
  499. statusCode = http.StatusRequestEntityTooLarge
  500. s.recordAudit(auditBodyTooLarge)
  501. }
  502. http.Error(w, err.Error(), statusCode)
  503. return
  504. }
  505. // Update the server's config snapshot (for GET /config and /status)
  506. s.mu.Lock()
  507. next := s.cfg
  508. if patch.FrequencyMHz != nil {
  509. next.FM.FrequencyMHz = *patch.FrequencyMHz
  510. }
  511. if patch.OutputDrive != nil {
  512. next.FM.OutputDrive = *patch.OutputDrive
  513. }
  514. if patch.ToneLeftHz != nil {
  515. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  516. }
  517. if patch.ToneRightHz != nil {
  518. next.Audio.ToneRightHz = *patch.ToneRightHz
  519. }
  520. if patch.ToneAmplitude != nil {
  521. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  522. }
  523. if patch.AudioGain != nil {
  524. next.Audio.Gain = *patch.AudioGain
  525. }
  526. if patch.PS != nil {
  527. next.RDS.PS = *patch.PS
  528. }
  529. if patch.RadioText != nil {
  530. next.RDS.RadioText = *patch.RadioText
  531. }
  532. if patch.PI != nil {
  533. next.RDS.PI = *patch.PI
  534. }
  535. if patch.PTY != nil {
  536. next.RDS.PTY = *patch.PTY
  537. }
  538. if patch.TP != nil {
  539. next.RDS.TP = *patch.TP
  540. }
  541. if patch.TA != nil {
  542. next.RDS.TA = *patch.TA
  543. }
  544. if patch.MS != nil {
  545. next.RDS.MS = *patch.MS
  546. }
  547. if patch.CTEnabled != nil {
  548. next.RDS.CTEnabled = *patch.CTEnabled
  549. }
  550. if patch.RTPlusEnabled != nil {
  551. next.RDS.RTPlusEnabled = *patch.RTPlusEnabled
  552. }
  553. if patch.RTPlusSeparator != nil {
  554. next.RDS.RTPlusSeparator = *patch.RTPlusSeparator
  555. }
  556. if patch.PTYN != nil {
  557. next.RDS.PTYN = *patch.PTYN
  558. }
  559. if patch.LPS != nil {
  560. next.RDS.LPS = *patch.LPS
  561. }
  562. if patch.ERTEnabled != nil {
  563. next.RDS.ERTEnabled = *patch.ERTEnabled
  564. }
  565. if patch.ERT != nil {
  566. next.RDS.ERT = *patch.ERT
  567. }
  568. if patch.RDS2Enabled != nil {
  569. next.RDS.RDS2Enabled = *patch.RDS2Enabled
  570. }
  571. if patch.StationLogoPath != nil {
  572. next.RDS.StationLogoPath = *patch.StationLogoPath
  573. }
  574. if patch.AF != nil {
  575. next.RDS.AF = *patch.AF
  576. }
  577. if patch.PreEmphasisTauUS != nil {
  578. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  579. }
  580. if patch.StereoEnabled != nil {
  581. next.FM.StereoEnabled = *patch.StereoEnabled
  582. }
  583. if patch.StereoMode != nil {
  584. next.FM.StereoMode = *patch.StereoMode
  585. }
  586. if patch.LimiterEnabled != nil {
  587. next.FM.LimiterEnabled = *patch.LimiterEnabled
  588. }
  589. if patch.LimiterCeiling != nil {
  590. next.FM.LimiterCeiling = *patch.LimiterCeiling
  591. }
  592. if patch.RDSEnabled != nil {
  593. next.RDS.Enabled = *patch.RDSEnabled
  594. }
  595. if patch.PilotLevel != nil {
  596. next.FM.PilotLevel = *patch.PilotLevel
  597. }
  598. if patch.RDSInjection != nil {
  599. next.FM.RDSInjection = *patch.RDSInjection
  600. }
  601. if patch.BS412Enabled != nil {
  602. next.FM.BS412Enabled = *patch.BS412Enabled
  603. }
  604. if patch.BS412ThresholdDBr != nil {
  605. next.FM.BS412ThresholdDBr = *patch.BS412ThresholdDBr
  606. }
  607. if patch.MpxGain != nil {
  608. next.FM.MpxGain = *patch.MpxGain
  609. }
  610. if patch.CompositeClipperEnabled != nil {
  611. next.FM.CompositeClipper.Enabled = *patch.CompositeClipperEnabled
  612. }
  613. if patch.CompositeClipperIterations != nil {
  614. next.FM.CompositeClipper.Iterations = *patch.CompositeClipperIterations
  615. }
  616. if patch.CompositeClipperSoftKnee != nil {
  617. next.FM.CompositeClipper.SoftKnee = *patch.CompositeClipperSoftKnee
  618. }
  619. if patch.CompositeClipperLookaheadMs != nil {
  620. next.FM.CompositeClipper.LookaheadMs = *patch.CompositeClipperLookaheadMs
  621. }
  622. if err := next.Validate(); err != nil {
  623. s.mu.Unlock()
  624. http.Error(w, err.Error(), http.StatusBadRequest)
  625. return
  626. }
  627. lp := LivePatch{
  628. FrequencyMHz: patch.FrequencyMHz,
  629. OutputDrive: patch.OutputDrive,
  630. StereoEnabled: patch.StereoEnabled,
  631. StereoMode: patch.StereoMode,
  632. PilotLevel: patch.PilotLevel,
  633. RDSInjection: patch.RDSInjection,
  634. RDSEnabled: patch.RDSEnabled,
  635. LimiterEnabled: patch.LimiterEnabled,
  636. LimiterCeiling: patch.LimiterCeiling,
  637. PS: patch.PS,
  638. RadioText: patch.RadioText,
  639. TA: patch.TA,
  640. TP: patch.TP,
  641. ToneLeftHz: patch.ToneLeftHz,
  642. ToneRightHz: patch.ToneRightHz,
  643. ToneAmplitude: patch.ToneAmplitude,
  644. AudioGain: patch.AudioGain,
  645. CompositeClipperEnabled: patch.CompositeClipperEnabled,
  646. }
  647. // NEU-02 fix: determine whether any live-patchable fields are present,
  648. // then release the lock before calling UpdateConfig to avoid holding
  649. // s.mu across a potentially blocking engine call.
  650. tx := s.tx
  651. hasLiveFields := patch.FrequencyMHz != nil || patch.OutputDrive != nil ||
  652. patch.StereoEnabled != nil || patch.StereoMode != nil || patch.PilotLevel != nil ||
  653. patch.RDSInjection != nil || patch.RDSEnabled != nil ||
  654. patch.LimiterEnabled != nil || patch.LimiterCeiling != nil ||
  655. patch.PS != nil || patch.RadioText != nil || patch.TA != nil || patch.TP != nil ||
  656. patch.ToneLeftHz != nil || patch.ToneRightHz != nil ||
  657. patch.ToneAmplitude != nil || patch.AudioGain != nil ||
  658. patch.CompositeClipperEnabled != nil
  659. s.mu.Unlock()
  660. // Apply live fields to running engine outside the lock.
  661. if tx != nil && hasLiveFields {
  662. if err := tx.UpdateConfig(lp); err != nil {
  663. http.Error(w, err.Error(), http.StatusBadRequest)
  664. return
  665. }
  666. }
  667. // Persist the validated config snapshot when a config saver is available.
  668. // This ensures restart-required UI changes survive process restarts instead
  669. // of only updating the in-memory snapshot.
  670. s.mu.RLock()
  671. save := s.saveConfig
  672. s.mu.RUnlock()
  673. if save != nil {
  674. if err := save(next); err != nil {
  675. http.Error(w, err.Error(), http.StatusInternalServerError)
  676. return
  677. }
  678. }
  679. // Commit the server snapshot only after validation, optional persistence,
  680. // and any required live update succeeded.
  681. s.mu.Lock()
  682. s.cfg = next
  683. s.mu.Unlock()
  684. // NEU-03 fix: report live=true only when live-patchable fields were applied.
  685. live := tx != nil && hasLiveFields
  686. w.Header().Set("Content-Type", "application/json")
  687. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  688. default:
  689. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  690. }
  691. }
  692. func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) {
  693. if r.Method != http.MethodPost {
  694. s.recordAudit(auditMethodNotAllowed)
  695. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  696. return
  697. }
  698. if !isJSONContentType(r) {
  699. s.recordAudit(auditUnsupportedMediaType)
  700. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  701. return
  702. }
  703. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  704. var req IngestSaveRequest
  705. if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
  706. statusCode := http.StatusBadRequest
  707. if strings.Contains(err.Error(), "http: request body too large") {
  708. statusCode = http.StatusRequestEntityTooLarge
  709. s.recordAudit(auditBodyTooLarge)
  710. }
  711. http.Error(w, err.Error(), statusCode)
  712. return
  713. }
  714. s.mu.Lock()
  715. next := s.cfg
  716. next.Ingest = req.Ingest
  717. if err := next.Validate(); err != nil {
  718. s.mu.Unlock()
  719. http.Error(w, err.Error(), http.StatusBadRequest)
  720. return
  721. }
  722. save := s.saveConfig
  723. reload := s.hardReload
  724. if save == nil {
  725. s.mu.Unlock()
  726. http.Error(w, "config save is not configured (start with --config <path>)", http.StatusServiceUnavailable)
  727. return
  728. }
  729. if err := save(next); err != nil {
  730. s.mu.Unlock()
  731. http.Error(w, err.Error(), http.StatusInternalServerError)
  732. return
  733. }
  734. s.cfg = next
  735. s.mu.Unlock()
  736. w.Header().Set("Content-Type", "application/json")
  737. reloadScheduled := reload != nil
  738. _ = json.NewEncoder(w).Encode(map[string]any{
  739. "ok": true,
  740. "saved": true,
  741. "reloadScheduled": reloadScheduled,
  742. })
  743. if reloadScheduled && s.reloadPending.CompareAndSwap(false, true) {
  744. go func(fn func()) {
  745. time.Sleep(250 * time.Millisecond)
  746. s.reloadPending.Store(false)
  747. fn()
  748. }(reload)
  749. }
  750. }