Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

848 lines
25KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "mime"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jan/fm-rds-tx/internal/audio"
  14. "github.com/jan/fm-rds-tx/internal/config"
  15. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  16. "github.com/jan/fm-rds-tx/internal/ingest"
  17. "github.com/jan/fm-rds-tx/internal/platform"
  18. )
  19. //go:embed ui.html
  20. var uiHTML []byte
  21. //go:embed logo.png
  22. var logoPNG []byte
  23. // TXController is an optional interface the Server uses to start/stop TX
  24. // and apply live config changes.
  25. type TXController interface {
  26. StartTX() error
  27. StopTX() error
  28. TXStats() map[string]any
  29. UpdateConfig(patch LivePatch) error
  30. ResetFault() error
  31. }
  // LivePatch carries the subset of ConfigPatch fields that handleConfig hands
  // to TXController.UpdateConfig. Every field is a pointer: nil means "leave
  // unchanged", non-nil means "apply this value".
  type LivePatch struct {
  	FrequencyMHz   *float64
  	OutputDrive    *float64
  	StereoEnabled  *bool
  	StereoMode     *string
  	PilotLevel     *float64
  	RDSInjection   *float64
  	RDSEnabled     *bool
  	LimiterEnabled *bool
  	LimiterCeiling *float64

  	PS        *string
  	RadioText *string
  	TA        *bool
  	TP        *bool

  	ToneLeftHz    *float64
  	ToneRightHz   *float64
  	ToneAmplitude *float64
  	AudioGain     *float64

  	CompositeClipperEnabled *bool
  }
  54. type Server struct {
  55. mu sync.RWMutex
  56. cfg config.Config
  57. tx TXController
  58. drv platform.SoapyDriver // optional, for runtime stats
  59. streamSrc *audio.StreamSource // optional, for live audio ring stats
  60. audioIngress AudioIngress // optional, for /audio/stream
  61. ingestRt IngestRuntime // optional, for /runtime ingest stats
  62. saveConfig func(config.Config) error
  63. hardReload func()
  64. // BUG-F fix: reloadPending prevents multiple concurrent goroutines from
  65. // calling hardReload when handleIngestSave is hit multiple times quickly.
  66. reloadPending atomic.Bool
  67. audit auditCounters
  68. }
  // AudioIngress is the sink that handleAudioStream pushes request-body chunks
  // into. The returned int is summed by the handler and reported as "frames";
  // a non-nil error aborts the upload with 503.
  type AudioIngress interface {
  	WritePCM16(data []byte) (int, error)
  }
  72. type IngestRuntime interface {
  73. Stats() ingest.Stats
  74. }
  // auditEvent names a category of rejected request counted by recordAudit.
  type auditEvent string

  // Audit categories. The string values match the keys returned by
  // auditSnapshot.
  const (
  	auditMethodNotAllowed     auditEvent = "methodNotAllowed"
  	auditUnsupportedMediaType auditEvent = "unsupportedMediaType"
  	auditBodyTooLarge         auditEvent = "bodyTooLarge"
  	auditUnexpectedBody       auditEvent = "unexpectedBody"
  )
  // auditCounters holds one counter per auditEvent. The fields are incremented
  // by recordAudit and read by auditSnapshot through the sync/atomic functions.
  //
  // NOTE(review): the 64-bit sync/atomic functions require 64-bit alignment on
  // 32-bit platforms; this struct is embedded in Server after other fields, so
  // confirm its offset there or consider atomic.Uint64 fields.
  type auditCounters struct {
  	methodNotAllowed     uint64
  	unsupportedMediaType uint64
  	bodyTooLarge         uint64
  	unexpectedBody       uint64
  }
  // Request limits and fixed response messages used by the handlers.
  const (
  	// maxConfigBodyBytes caps JSON bodies accepted by the config endpoints.
  	maxConfigBodyBytes = 64 * 1024 // 64 KiB
  	// configContentTypeHeader is the media type required for JSON bodies.
  	configContentTypeHeader = "application/json"
  	// noBodyErrMsg is sent by rejectBody when a body is present.
  	noBodyErrMsg = "request must not include a body"
  	// audioStreamContentTypeError is sent by handleAudioStream with a 415.
  	audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16"
  	// audioStreamBodyLimitDefault is the initial value of audioStreamBodyLimit.
  	audioStreamBodyLimitDefault = 512 * 1024 * 1024 // 512 MiB
  )
  // audioStreamAllowedMediaTypes lists the media types accepted by
  // /audio/stream; isAudioStreamContentType compares against them
  // case-insensitively.
  var audioStreamAllowedMediaTypes = []string{"application/octet-stream", "audio/l16"}
  99. var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault) // bytes allowed per /audio/stream request; tests may override.
  100. func isJSONContentType(r *http.Request) bool {
  101. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  102. if ct == "" {
  103. return false
  104. }
  105. mediaType, _, err := mime.ParseMediaType(ct)
  106. if err != nil {
  107. return false
  108. }
  109. return strings.EqualFold(mediaType, configContentTypeHeader)
  110. }
  // ConfigPatch is the JSON body accepted by POST /config. Every field is a
  // pointer so that an absent key (nil) can be told apart from an explicit
  // value; only non-nil fields are applied to the config.
  type ConfigPatch struct {
  	FrequencyMHz     *float64 `json:"frequencyMHz,omitempty"`
  	OutputDrive      *float64 `json:"outputDrive,omitempty"`
  	StereoEnabled    *bool    `json:"stereoEnabled,omitempty"`
  	StereoMode       *string  `json:"stereoMode,omitempty"`
  	PilotLevel       *float64 `json:"pilotLevel,omitempty"`
  	RDSInjection     *float64 `json:"rdsInjection,omitempty"`
  	RDSEnabled       *bool    `json:"rdsEnabled,omitempty"`
  	ToneLeftHz       *float64 `json:"toneLeftHz,omitempty"`
  	ToneRightHz      *float64 `json:"toneRightHz,omitempty"`
  	ToneAmplitude    *float64 `json:"toneAmplitude,omitempty"`
  	PS               *string  `json:"ps,omitempty"`
  	RadioText        *string  `json:"radioText,omitempty"`
  	PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
  	LimiterEnabled   *bool    `json:"limiterEnabled,omitempty"`
  	LimiterCeiling   *float64 `json:"limiterCeiling,omitempty"`
  	AudioGain        *float64 `json:"audioGain,omitempty"`

  	PI              *string    `json:"pi,omitempty"`
  	PTY             *int       `json:"pty,omitempty"`
  	TP              *bool      `json:"tp,omitempty"`
  	TA              *bool      `json:"ta,omitempty"`
  	MS              *bool      `json:"ms,omitempty"`
  	CTEnabled       *bool      `json:"ctEnabled,omitempty"`
  	RTPlusEnabled   *bool      `json:"rtPlusEnabled,omitempty"`
  	RTPlusSeparator *string    `json:"rtPlusSeparator,omitempty"`
  	PTYN            *string    `json:"ptyn,omitempty"`
  	LPS             *string    `json:"lps,omitempty"`
  	ERTEnabled      *bool      `json:"ertEnabled,omitempty"`
  	ERT             *string    `json:"ert,omitempty"`
  	RDS2Enabled     *bool      `json:"rds2Enabled,omitempty"`
  	StationLogoPath *string    `json:"stationLogoPath,omitempty"`
  	AF              *[]float64 `json:"af,omitempty"`

  	BS412Enabled                *bool    `json:"bs412Enabled,omitempty"`
  	BS412ThresholdDBr           *float64 `json:"bs412ThresholdDBr,omitempty"`
  	MpxGain                     *float64 `json:"mpxGain,omitempty"`
  	CompositeClipperEnabled     *bool    `json:"compositeClipperEnabled,omitempty"`
  	CompositeClipperIterations  *int     `json:"compositeClipperIterations,omitempty"`
  	CompositeClipperSoftKnee    *float64 `json:"compositeClipperSoftKnee,omitempty"`
  	CompositeClipperLookaheadMs *float64 `json:"compositeClipperLookaheadMs,omitempty"`
  }
  151. type IngestSaveRequest struct {
  152. Ingest config.IngestConfig `json:"ingest"`
  153. }
  154. func NewServer(cfg config.Config) *Server {
  155. return &Server{cfg: cfg}
  156. }
  // hasRequestBody reports whether the request announces a body: either a
  // positive Content-Length or a "chunked" entry (any case) in
  // TransferEncoding.
  func hasRequestBody(r *http.Request) bool {
  	chunked := false
  	for i := range r.TransferEncoding {
  		if strings.EqualFold(r.TransferEncoding[i], "chunked") {
  			chunked = true
  			break
  		}
  	}
  	return r.ContentLength > 0 || chunked
  }
  168. func (s *Server) rejectBody(w http.ResponseWriter, r *http.Request) bool {
  169. // Returns true when the request has an unexpected body and the error response
  170. // has already been written — callers should return immediately in that case.
  171. // Returns false when there is no body (happy path — request should proceed).
  172. if !hasRequestBody(r) {
  173. return false
  174. }
  175. s.recordAudit(auditUnexpectedBody)
  176. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  177. return true
  178. }
  179. func (s *Server) recordAudit(evt auditEvent) {
  180. switch evt {
  181. case auditMethodNotAllowed:
  182. atomic.AddUint64(&s.audit.methodNotAllowed, 1)
  183. case auditUnsupportedMediaType:
  184. atomic.AddUint64(&s.audit.unsupportedMediaType, 1)
  185. case auditBodyTooLarge:
  186. atomic.AddUint64(&s.audit.bodyTooLarge, 1)
  187. case auditUnexpectedBody:
  188. atomic.AddUint64(&s.audit.unexpectedBody, 1)
  189. }
  190. }
  191. func (s *Server) auditSnapshot() map[string]uint64 {
  192. return map[string]uint64{
  193. "methodNotAllowed": atomic.LoadUint64(&s.audit.methodNotAllowed),
  194. "unsupportedMediaType": atomic.LoadUint64(&s.audit.unsupportedMediaType),
  195. "bodyTooLarge": atomic.LoadUint64(&s.audit.bodyTooLarge),
  196. "unexpectedBody": atomic.LoadUint64(&s.audit.unexpectedBody),
  197. }
  198. }
  199. func isAudioStreamContentType(r *http.Request) bool {
  200. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  201. if ct == "" {
  202. return false
  203. }
  204. mediaType, _, err := mime.ParseMediaType(ct)
  205. if err != nil {
  206. return false
  207. }
  208. for _, allowed := range audioStreamAllowedMediaTypes {
  209. if strings.EqualFold(mediaType, allowed) {
  210. return true
  211. }
  212. }
  213. return false
  214. }
  215. func (s *Server) SetTXController(tx TXController) {
  216. s.mu.Lock()
  217. s.tx = tx
  218. s.mu.Unlock()
  219. }
  220. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  221. s.mu.Lock()
  222. s.drv = drv
  223. s.mu.Unlock()
  224. }
  225. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  226. s.mu.Lock()
  227. s.streamSrc = src
  228. s.mu.Unlock()
  229. }
  230. func (s *Server) SetAudioIngress(ingress AudioIngress) {
  231. s.mu.Lock()
  232. s.audioIngress = ingress
  233. s.mu.Unlock()
  234. }
  235. func (s *Server) SetIngestRuntime(rt IngestRuntime) {
  236. s.mu.Lock()
  237. s.ingestRt = rt
  238. s.mu.Unlock()
  239. }
  240. func (s *Server) SetConfigSaver(save func(config.Config) error) {
  241. s.mu.Lock()
  242. s.saveConfig = save
  243. s.mu.Unlock()
  244. }
  245. func (s *Server) SetHardReload(fn func()) {
  246. s.mu.Lock()
  247. s.hardReload = fn
  248. s.mu.Unlock()
  249. }
  250. func (s *Server) Handler() http.Handler {
  251. mux := http.NewServeMux()
  252. mux.HandleFunc("/", s.handleUI)
  253. mux.HandleFunc("/logo", s.handleLogo)
  254. mux.HandleFunc("/healthz", s.handleHealth)
  255. mux.HandleFunc("/status", s.handleStatus)
  256. mux.HandleFunc("/dry-run", s.handleDryRun)
  257. mux.HandleFunc("/config", s.handleConfig)
  258. mux.HandleFunc("/config/ingest/save", s.handleIngestSave)
  259. mux.HandleFunc("/runtime", s.handleRuntime)
  260. mux.HandleFunc("/measurements", s.handleMeasurements)
  261. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  262. mux.HandleFunc("/tx/start", s.handleTXStart)
  263. mux.HandleFunc("/tx/stop", s.handleTXStop)
  264. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  265. return mux
  266. }
  267. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  268. w.Header().Set("Content-Type", "application/json")
  269. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  270. }
  271. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  272. if r.URL.Path != "/" {
  273. http.NotFound(w, r)
  274. return
  275. }
  276. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  277. w.Header().Set("Cache-Control", "no-cache")
  278. w.Write(uiHTML)
  279. }
  280. func (s *Server) handleLogo(w http.ResponseWriter, r *http.Request) {
  281. w.Header().Set("Content-Type", "image/png")
  282. w.Header().Set("Cache-Control", "public, max-age=3600")
  283. w.Write(logoPNG)
  284. }
  285. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  286. s.mu.RLock()
  287. cfg := s.cfg
  288. tx := s.tx
  289. s.mu.RUnlock()
  290. status := map[string]any{
  291. "service": "fm-rds-tx",
  292. "backend": cfg.Backend.Kind,
  293. "frequencyMHz": cfg.FM.FrequencyMHz,
  294. "stereoEnabled": cfg.FM.StereoEnabled,
  295. "stereoMode": cfg.FM.StereoMode,
  296. "rdsEnabled": cfg.RDS.Enabled,
  297. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  298. "limiterEnabled": cfg.FM.LimiterEnabled,
  299. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  300. }
  301. if tx != nil {
  302. if stats := tx.TXStats(); stats != nil {
  303. if ri, ok := stats["runtimeIndicator"]; ok {
  304. status["runtimeIndicator"] = ri
  305. }
  306. if alert, ok := stats["runtimeAlert"]; ok {
  307. status["runtimeAlert"] = alert
  308. }
  309. if queue, ok := stats["queue"]; ok {
  310. status["queue"] = queue
  311. }
  312. if runtimeState, ok := stats["state"]; ok {
  313. status["runtimeState"] = runtimeState
  314. }
  315. }
  316. }
  317. w.Header().Set("Content-Type", "application/json")
  318. _ = json.NewEncoder(w).Encode(status)
  319. }
  320. func (s *Server) handleMeasurements(w http.ResponseWriter, _ *http.Request) {
  321. s.mu.RLock()
  322. tx := s.tx
  323. s.mu.RUnlock()
  324. result := map[string]any{"noData": true, "stale": true}
  325. if tx != nil {
  326. if stats := tx.TXStats(); stats != nil {
  327. if measurement, ok := stats["measurement"]; ok && measurement != nil {
  328. result = map[string]any{"noData": false, "stale": false, "measurement": measurement}
  329. if state, ok := stats["state"]; ok {
  330. result["state"] = state
  331. }
  332. if applied, ok := stats["appliedFrequencyMHz"]; ok {
  333. result["appliedFrequencyMHz"] = applied
  334. }
  335. if queue, ok := stats["queue"]; ok {
  336. result["queue"] = queue
  337. }
  338. if runtimeIndicator, ok := stats["runtimeIndicator"]; ok {
  339. result["runtimeIndicator"] = runtimeIndicator
  340. }
  341. if runtimeAlert, ok := stats["runtimeAlert"]; ok {
  342. result["runtimeAlert"] = runtimeAlert
  343. }
  344. }
  345. }
  346. }
  347. w.Header().Set("Content-Type", "application/json")
  348. _ = json.NewEncoder(w).Encode(result)
  349. }
  350. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  351. s.mu.RLock()
  352. drv := s.drv
  353. tx := s.tx
  354. stream := s.streamSrc
  355. ingestRt := s.ingestRt
  356. s.mu.RUnlock()
  357. result := map[string]any{}
  358. if drv != nil {
  359. result["driver"] = drv.Stats()
  360. }
  361. if tx != nil {
  362. if stats := tx.TXStats(); stats != nil {
  363. result["engine"] = stats
  364. }
  365. }
  366. if stream != nil {
  367. result["audioStream"] = stream.Stats()
  368. }
  369. if ingestRt != nil {
  370. result["ingest"] = ingestRt.Stats()
  371. }
  372. result["controlAudit"] = s.auditSnapshot()
  373. w.Header().Set("Content-Type", "application/json")
  374. _ = json.NewEncoder(w).Encode(result)
  375. }
  376. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  377. if r.Method != http.MethodPost {
  378. s.recordAudit(auditMethodNotAllowed)
  379. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  380. return
  381. }
  382. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  383. return
  384. }
  385. s.mu.RLock()
  386. tx := s.tx
  387. s.mu.RUnlock()
  388. if tx == nil {
  389. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  390. return
  391. }
  392. if err := tx.ResetFault(); err != nil {
  393. http.Error(w, err.Error(), http.StatusConflict)
  394. return
  395. }
  396. w.Header().Set("Content-Type", "application/json")
  397. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  398. }
  399. // handleAudioStream accepts raw S16LE PCM via HTTP POST and pushes
  400. // it into the configured ingest http-raw source. Use with:
  401. //
  402. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  403. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  404. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  405. if r.Method != http.MethodPost {
  406. s.recordAudit(auditMethodNotAllowed)
  407. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  408. return
  409. }
  410. if !isAudioStreamContentType(r) {
  411. s.recordAudit(auditUnsupportedMediaType)
  412. http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType)
  413. return
  414. }
  415. s.mu.RLock()
  416. ingress := s.audioIngress
  417. s.mu.RUnlock()
  418. if ingress == nil {
  419. http.Error(w, "audio ingest not configured (use --audio-http with ingest runtime)", http.StatusServiceUnavailable)
  420. return
  421. }
  422. // BUG-10 fix: /audio/stream is a long-lived streaming endpoint.
  423. // The global HTTP server ReadTimeout (5s) and WriteTimeout (10s) would
  424. // kill connections mid-stream. Disable them per-request via ResponseController
  425. // (requires Go 1.20+, confirmed Go 1.22).
  426. rc := http.NewResponseController(w)
  427. _ = rc.SetReadDeadline(time.Time{})
  428. _ = rc.SetWriteDeadline(time.Time{})
  429. r.Body = http.MaxBytesReader(w, r.Body, audioStreamBodyLimit)
  430. // Read body in chunks and push to ring buffer
  431. buf := make([]byte, 32768)
  432. totalFrames := 0
  433. for {
  434. n, err := r.Body.Read(buf)
  435. if n > 0 {
  436. written, writeErr := ingress.WritePCM16(buf[:n])
  437. totalFrames += written
  438. if writeErr != nil {
  439. http.Error(w, writeErr.Error(), http.StatusServiceUnavailable)
  440. return
  441. }
  442. }
  443. if err != nil {
  444. if err == io.EOF {
  445. break
  446. }
  447. var maxErr *http.MaxBytesError
  448. if errors.As(err, &maxErr) {
  449. s.recordAudit(auditBodyTooLarge)
  450. http.Error(w, maxErr.Error(), http.StatusRequestEntityTooLarge)
  451. return
  452. }
  453. http.Error(w, err.Error(), http.StatusInternalServerError)
  454. return
  455. }
  456. }
  457. w.Header().Set("Content-Type", "application/json")
  458. _ = json.NewEncoder(w).Encode(map[string]any{
  459. "ok": true,
  460. "frames": totalFrames,
  461. })
  462. }
  463. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  464. if r.Method != http.MethodPost {
  465. s.recordAudit(auditMethodNotAllowed)
  466. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  467. return
  468. }
  469. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  470. return
  471. }
  472. s.mu.RLock()
  473. tx := s.tx
  474. s.mu.RUnlock()
  475. if tx == nil {
  476. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  477. return
  478. }
  479. if err := tx.StartTX(); err != nil {
  480. http.Error(w, err.Error(), http.StatusConflict)
  481. return
  482. }
  483. w.Header().Set("Content-Type", "application/json")
  484. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  485. }
  486. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  487. if r.Method != http.MethodPost {
  488. s.recordAudit(auditMethodNotAllowed)
  489. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  490. return
  491. }
  492. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  493. return
  494. }
  495. s.mu.RLock()
  496. tx := s.tx
  497. s.mu.RUnlock()
  498. if tx == nil {
  499. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  500. return
  501. }
  502. go func() {
  503. _ = tx.StopTX()
  504. }()
  505. w.Header().Set("Content-Type", "application/json")
  506. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stop-requested"})
  507. }
  508. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  509. s.mu.RLock()
  510. cfg := s.cfg
  511. s.mu.RUnlock()
  512. w.Header().Set("Content-Type", "application/json")
  513. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  514. }
  515. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  516. switch r.Method {
  517. case http.MethodGet:
  518. s.mu.RLock()
  519. cfg := s.cfg
  520. s.mu.RUnlock()
  521. w.Header().Set("Content-Type", "application/json")
  522. _ = json.NewEncoder(w).Encode(cfg)
  523. case http.MethodPost:
  524. if !isJSONContentType(r) {
  525. s.recordAudit(auditUnsupportedMediaType)
  526. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  527. return
  528. }
  529. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  530. var patch ConfigPatch
  531. // BUG-4 fix: reject unknown JSON fields (typos) with 400 rather than
  532. // silently ignoring them (e.g. "outputDrvie" would succeed and do nothing).
  533. dec := json.NewDecoder(r.Body)
  534. dec.DisallowUnknownFields()
  535. if err := dec.Decode(&patch); err != nil {
  536. statusCode := http.StatusBadRequest
  537. if strings.Contains(err.Error(), "http: request body too large") {
  538. statusCode = http.StatusRequestEntityTooLarge
  539. s.recordAudit(auditBodyTooLarge)
  540. }
  541. http.Error(w, err.Error(), statusCode)
  542. return
  543. }
  544. // Update the server's config snapshot (for GET /config and /status)
  545. s.mu.Lock()
  546. next := s.cfg
  547. if patch.FrequencyMHz != nil {
  548. next.FM.FrequencyMHz = *patch.FrequencyMHz
  549. }
  550. if patch.OutputDrive != nil {
  551. next.FM.OutputDrive = *patch.OutputDrive
  552. }
  553. if patch.ToneLeftHz != nil {
  554. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  555. }
  556. if patch.ToneRightHz != nil {
  557. next.Audio.ToneRightHz = *patch.ToneRightHz
  558. }
  559. if patch.ToneAmplitude != nil {
  560. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  561. }
  562. if patch.AudioGain != nil {
  563. next.Audio.Gain = *patch.AudioGain
  564. }
  565. if patch.PS != nil {
  566. next.RDS.PS = *patch.PS
  567. }
  568. if patch.RadioText != nil {
  569. next.RDS.RadioText = *patch.RadioText
  570. }
  571. if patch.PI != nil {
  572. next.RDS.PI = *patch.PI
  573. }
  574. if patch.PTY != nil {
  575. next.RDS.PTY = *patch.PTY
  576. }
  577. if patch.TP != nil {
  578. next.RDS.TP = *patch.TP
  579. }
  580. if patch.TA != nil {
  581. next.RDS.TA = *patch.TA
  582. }
  583. if patch.MS != nil {
  584. next.RDS.MS = *patch.MS
  585. }
  586. if patch.CTEnabled != nil {
  587. next.RDS.CTEnabled = *patch.CTEnabled
  588. }
  589. if patch.RTPlusEnabled != nil {
  590. next.RDS.RTPlusEnabled = *patch.RTPlusEnabled
  591. }
  592. if patch.RTPlusSeparator != nil {
  593. next.RDS.RTPlusSeparator = *patch.RTPlusSeparator
  594. }
  595. if patch.PTYN != nil {
  596. next.RDS.PTYN = *patch.PTYN
  597. }
  598. if patch.LPS != nil {
  599. next.RDS.LPS = *patch.LPS
  600. }
  601. if patch.ERTEnabled != nil {
  602. next.RDS.ERTEnabled = *patch.ERTEnabled
  603. }
  604. if patch.ERT != nil {
  605. next.RDS.ERT = *patch.ERT
  606. }
  607. if patch.RDS2Enabled != nil {
  608. next.RDS.RDS2Enabled = *patch.RDS2Enabled
  609. }
  610. if patch.StationLogoPath != nil {
  611. next.RDS.StationLogoPath = *patch.StationLogoPath
  612. }
  613. if patch.AF != nil {
  614. next.RDS.AF = *patch.AF
  615. }
  616. if patch.PreEmphasisTauUS != nil {
  617. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  618. }
  619. if patch.StereoEnabled != nil {
  620. next.FM.StereoEnabled = *patch.StereoEnabled
  621. }
  622. if patch.StereoMode != nil {
  623. next.FM.StereoMode = *patch.StereoMode
  624. }
  625. if patch.LimiterEnabled != nil {
  626. next.FM.LimiterEnabled = *patch.LimiterEnabled
  627. }
  628. if patch.LimiterCeiling != nil {
  629. next.FM.LimiterCeiling = *patch.LimiterCeiling
  630. }
  631. if patch.RDSEnabled != nil {
  632. next.RDS.Enabled = *patch.RDSEnabled
  633. }
  634. if patch.PilotLevel != nil {
  635. next.FM.PilotLevel = *patch.PilotLevel
  636. }
  637. if patch.RDSInjection != nil {
  638. next.FM.RDSInjection = *patch.RDSInjection
  639. }
  640. if patch.BS412Enabled != nil {
  641. next.FM.BS412Enabled = *patch.BS412Enabled
  642. }
  643. if patch.BS412ThresholdDBr != nil {
  644. next.FM.BS412ThresholdDBr = *patch.BS412ThresholdDBr
  645. }
  646. if patch.MpxGain != nil {
  647. next.FM.MpxGain = *patch.MpxGain
  648. }
  649. if patch.CompositeClipperEnabled != nil {
  650. next.FM.CompositeClipper.Enabled = *patch.CompositeClipperEnabled
  651. }
  652. if patch.CompositeClipperIterations != nil {
  653. next.FM.CompositeClipper.Iterations = *patch.CompositeClipperIterations
  654. }
  655. if patch.CompositeClipperSoftKnee != nil {
  656. next.FM.CompositeClipper.SoftKnee = *patch.CompositeClipperSoftKnee
  657. }
  658. if patch.CompositeClipperLookaheadMs != nil {
  659. next.FM.CompositeClipper.LookaheadMs = *patch.CompositeClipperLookaheadMs
  660. }
  661. if err := next.Validate(); err != nil {
  662. s.mu.Unlock()
  663. http.Error(w, err.Error(), http.StatusBadRequest)
  664. return
  665. }
  666. lp := LivePatch{
  667. FrequencyMHz: patch.FrequencyMHz,
  668. OutputDrive: patch.OutputDrive,
  669. StereoEnabled: patch.StereoEnabled,
  670. StereoMode: patch.StereoMode,
  671. PilotLevel: patch.PilotLevel,
  672. RDSInjection: patch.RDSInjection,
  673. RDSEnabled: patch.RDSEnabled,
  674. LimiterEnabled: patch.LimiterEnabled,
  675. LimiterCeiling: patch.LimiterCeiling,
  676. PS: patch.PS,
  677. RadioText: patch.RadioText,
  678. TA: patch.TA,
  679. TP: patch.TP,
  680. ToneLeftHz: patch.ToneLeftHz,
  681. ToneRightHz: patch.ToneRightHz,
  682. ToneAmplitude: patch.ToneAmplitude,
  683. AudioGain: patch.AudioGain,
  684. CompositeClipperEnabled: patch.CompositeClipperEnabled,
  685. }
  686. // NEU-02 fix: determine whether any live-patchable fields are present,
  687. // then release the lock before calling UpdateConfig to avoid holding
  688. // s.mu across a potentially blocking engine call.
  689. tx := s.tx
  690. hasLiveFields := patch.FrequencyMHz != nil || patch.OutputDrive != nil ||
  691. patch.StereoEnabled != nil || patch.StereoMode != nil || patch.PilotLevel != nil ||
  692. patch.RDSInjection != nil || patch.RDSEnabled != nil ||
  693. patch.LimiterEnabled != nil || patch.LimiterCeiling != nil ||
  694. patch.PS != nil || patch.RadioText != nil || patch.TA != nil || patch.TP != nil ||
  695. patch.ToneLeftHz != nil || patch.ToneRightHz != nil ||
  696. patch.ToneAmplitude != nil || patch.AudioGain != nil ||
  697. patch.CompositeClipperEnabled != nil
  698. s.mu.Unlock()
  699. // Apply live fields to running engine outside the lock.
  700. if tx != nil && hasLiveFields {
  701. if err := tx.UpdateConfig(lp); err != nil {
  702. http.Error(w, err.Error(), http.StatusBadRequest)
  703. return
  704. }
  705. }
  706. // Persist the validated config snapshot when a config saver is available.
  707. // This ensures restart-required UI changes survive process restarts instead
  708. // of only updating the in-memory snapshot.
  709. s.mu.RLock()
  710. save := s.saveConfig
  711. s.mu.RUnlock()
  712. if save != nil {
  713. if err := save(next); err != nil {
  714. http.Error(w, err.Error(), http.StatusInternalServerError)
  715. return
  716. }
  717. }
  718. // Commit the server snapshot only after validation, optional persistence,
  719. // and any required live update succeeded.
  720. s.mu.Lock()
  721. s.cfg = next
  722. s.mu.Unlock()
  723. // NEU-03 fix: report live=true only when live-patchable fields were applied.
  724. live := tx != nil && hasLiveFields
  725. w.Header().Set("Content-Type", "application/json")
  726. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  727. default:
  728. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  729. }
  730. }
  731. func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) {
  732. if r.Method != http.MethodPost {
  733. s.recordAudit(auditMethodNotAllowed)
  734. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  735. return
  736. }
  737. if !isJSONContentType(r) {
  738. s.recordAudit(auditUnsupportedMediaType)
  739. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  740. return
  741. }
  742. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  743. var req IngestSaveRequest
  744. if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
  745. statusCode := http.StatusBadRequest
  746. if strings.Contains(err.Error(), "http: request body too large") {
  747. statusCode = http.StatusRequestEntityTooLarge
  748. s.recordAudit(auditBodyTooLarge)
  749. }
  750. http.Error(w, err.Error(), statusCode)
  751. return
  752. }
  753. s.mu.Lock()
  754. next := s.cfg
  755. next.Ingest = req.Ingest
  756. if err := next.Validate(); err != nil {
  757. s.mu.Unlock()
  758. http.Error(w, err.Error(), http.StatusBadRequest)
  759. return
  760. }
  761. save := s.saveConfig
  762. reload := s.hardReload
  763. if save == nil {
  764. s.mu.Unlock()
  765. http.Error(w, "config save is not configured (start with --config <path>)", http.StatusServiceUnavailable)
  766. return
  767. }
  768. if err := save(next); err != nil {
  769. s.mu.Unlock()
  770. http.Error(w, err.Error(), http.StatusInternalServerError)
  771. return
  772. }
  773. s.cfg = next
  774. s.mu.Unlock()
  775. w.Header().Set("Content-Type", "application/json")
  776. reloadScheduled := reload != nil
  777. _ = json.NewEncoder(w).Encode(map[string]any{
  778. "ok": true,
  779. "saved": true,
  780. "reloadScheduled": reloadScheduled,
  781. })
  782. if reloadScheduled && s.reloadPending.CompareAndSwap(false, true) {
  783. go func(fn func()) {
  784. time.Sleep(250 * time.Millisecond)
  785. s.reloadPending.Store(false)
  786. fn()
  787. }(reload)
  788. }
  789. }