Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

737 строки
21KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "mime"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jan/fm-rds-tx/internal/audio"
  14. "github.com/jan/fm-rds-tx/internal/config"
  15. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  16. "github.com/jan/fm-rds-tx/internal/ingest"
  17. "github.com/jan/fm-rds-tx/internal/platform"
  18. )
  19. //go:embed ui.html
  20. var uiHTML []byte
  21. // TXController is an optional interface the Server uses to start/stop TX
  22. // and apply live config changes.
  23. type TXController interface {
  24. StartTX() error
  25. StopTX() error
  26. TXStats() map[string]any
  27. UpdateConfig(patch LivePatch) error
  28. ResetFault() error
  29. }
  30. // LivePatch mirrors the patchable fields from ConfigPatch for the engine.
  31. // nil = no change.
  32. type LivePatch struct {
  33. FrequencyMHz *float64
  34. OutputDrive *float64
  35. StereoEnabled *bool
  36. StereoMode *string
  37. PilotLevel *float64
  38. RDSInjection *float64
  39. RDSEnabled *bool
  40. LimiterEnabled *bool
  41. LimiterCeiling *float64
  42. PS *string
  43. RadioText *string
  44. ToneLeftHz *float64
  45. ToneRightHz *float64
  46. ToneAmplitude *float64
  47. AudioGain *float64
  48. CompositeClipperEnabled *bool
  49. }
  50. type Server struct {
  51. mu sync.RWMutex
  52. cfg config.Config
  53. tx TXController
  54. drv platform.SoapyDriver // optional, for runtime stats
  55. streamSrc *audio.StreamSource // optional, for live audio ring stats
  56. audioIngress AudioIngress // optional, for /audio/stream
  57. ingestRt IngestRuntime // optional, for /runtime ingest stats
  58. saveConfig func(config.Config) error
  59. hardReload func()
  60. // BUG-F fix: reloadPending prevents multiple concurrent goroutines from
  61. // calling hardReload when handleIngestSave is hit multiple times quickly.
  62. reloadPending atomic.Bool
  63. audit auditCounters
  64. }
  65. type AudioIngress interface {
  66. WritePCM16(data []byte) (int, error)
  67. }
  68. type IngestRuntime interface {
  69. Stats() ingest.Stats
  70. }
  71. type auditEvent string
  72. const (
  73. auditMethodNotAllowed auditEvent = "methodNotAllowed"
  74. auditUnsupportedMediaType auditEvent = "unsupportedMediaType"
  75. auditBodyTooLarge auditEvent = "bodyTooLarge"
  76. auditUnexpectedBody auditEvent = "unexpectedBody"
  77. )
  78. type auditCounters struct {
  79. methodNotAllowed uint64
  80. unsupportedMediaType uint64
  81. bodyTooLarge uint64
  82. unexpectedBody uint64
  83. }
  84. const (
  85. maxConfigBodyBytes = 64 << 10 // 64 KiB
  86. configContentTypeHeader = "application/json"
  87. noBodyErrMsg = "request must not include a body"
  88. audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16"
  89. audioStreamBodyLimitDefault = 512 << 20 // 512 MiB
  90. )
  91. var audioStreamAllowedMediaTypes = []string{
  92. "application/octet-stream",
  93. "audio/l16",
  94. }
  95. var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault) // bytes allowed per /audio/stream request; tests may override.
  96. func isJSONContentType(r *http.Request) bool {
  97. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  98. if ct == "" {
  99. return false
  100. }
  101. mediaType, _, err := mime.ParseMediaType(ct)
  102. if err != nil {
  103. return false
  104. }
  105. return strings.EqualFold(mediaType, configContentTypeHeader)
  106. }
  107. type ConfigPatch struct {
  108. FrequencyMHz *float64 `json:"frequencyMHz,omitempty"`
  109. OutputDrive *float64 `json:"outputDrive,omitempty"`
  110. StereoEnabled *bool `json:"stereoEnabled,omitempty"`
  111. StereoMode *string `json:"stereoMode,omitempty"`
  112. PilotLevel *float64 `json:"pilotLevel,omitempty"`
  113. RDSInjection *float64 `json:"rdsInjection,omitempty"`
  114. RDSEnabled *bool `json:"rdsEnabled,omitempty"`
  115. ToneLeftHz *float64 `json:"toneLeftHz,omitempty"`
  116. ToneRightHz *float64 `json:"toneRightHz,omitempty"`
  117. ToneAmplitude *float64 `json:"toneAmplitude,omitempty"`
  118. PS *string `json:"ps,omitempty"`
  119. RadioText *string `json:"radioText,omitempty"`
  120. PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
  121. LimiterEnabled *bool `json:"limiterEnabled,omitempty"`
  122. LimiterCeiling *float64 `json:"limiterCeiling,omitempty"`
  123. AudioGain *float64 `json:"audioGain,omitempty"`
  124. PI *string `json:"pi,omitempty"`
  125. PTY *int `json:"pty,omitempty"`
  126. BS412Enabled *bool `json:"bs412Enabled,omitempty"`
  127. BS412ThresholdDBr *float64 `json:"bs412ThresholdDBr,omitempty"`
  128. MpxGain *float64 `json:"mpxGain,omitempty"`
  129. CompositeClipperEnabled *bool `json:"compositeClipperEnabled,omitempty"`
  130. CompositeClipperIterations *int `json:"compositeClipperIterations,omitempty"`
  131. CompositeClipperSoftKnee *float64 `json:"compositeClipperSoftKnee,omitempty"`
  132. CompositeClipperLookaheadMs *float64 `json:"compositeClipperLookaheadMs,omitempty"`
  133. }
  134. type IngestSaveRequest struct {
  135. Ingest config.IngestConfig `json:"ingest"`
  136. }
  137. func NewServer(cfg config.Config) *Server {
  138. return &Server{cfg: cfg}
  139. }
  140. func hasRequestBody(r *http.Request) bool {
  141. if r.ContentLength > 0 {
  142. return true
  143. }
  144. for _, te := range r.TransferEncoding {
  145. if strings.EqualFold(te, "chunked") {
  146. return true
  147. }
  148. }
  149. return false
  150. }
  151. func (s *Server) rejectBody(w http.ResponseWriter, r *http.Request) bool {
  152. // Returns true when the request has an unexpected body and the error response
  153. // has already been written — callers should return immediately in that case.
  154. // Returns false when there is no body (happy path — request should proceed).
  155. if !hasRequestBody(r) {
  156. return false
  157. }
  158. s.recordAudit(auditUnexpectedBody)
  159. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  160. return true
  161. }
  162. func (s *Server) recordAudit(evt auditEvent) {
  163. switch evt {
  164. case auditMethodNotAllowed:
  165. atomic.AddUint64(&s.audit.methodNotAllowed, 1)
  166. case auditUnsupportedMediaType:
  167. atomic.AddUint64(&s.audit.unsupportedMediaType, 1)
  168. case auditBodyTooLarge:
  169. atomic.AddUint64(&s.audit.bodyTooLarge, 1)
  170. case auditUnexpectedBody:
  171. atomic.AddUint64(&s.audit.unexpectedBody, 1)
  172. }
  173. }
  174. func (s *Server) auditSnapshot() map[string]uint64 {
  175. return map[string]uint64{
  176. "methodNotAllowed": atomic.LoadUint64(&s.audit.methodNotAllowed),
  177. "unsupportedMediaType": atomic.LoadUint64(&s.audit.unsupportedMediaType),
  178. "bodyTooLarge": atomic.LoadUint64(&s.audit.bodyTooLarge),
  179. "unexpectedBody": atomic.LoadUint64(&s.audit.unexpectedBody),
  180. }
  181. }
  182. func isAudioStreamContentType(r *http.Request) bool {
  183. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  184. if ct == "" {
  185. return false
  186. }
  187. mediaType, _, err := mime.ParseMediaType(ct)
  188. if err != nil {
  189. return false
  190. }
  191. for _, allowed := range audioStreamAllowedMediaTypes {
  192. if strings.EqualFold(mediaType, allowed) {
  193. return true
  194. }
  195. }
  196. return false
  197. }
  198. func (s *Server) SetTXController(tx TXController) {
  199. s.mu.Lock()
  200. s.tx = tx
  201. s.mu.Unlock()
  202. }
  203. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  204. s.mu.Lock()
  205. s.drv = drv
  206. s.mu.Unlock()
  207. }
  208. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  209. s.mu.Lock()
  210. s.streamSrc = src
  211. s.mu.Unlock()
  212. }
  213. func (s *Server) SetAudioIngress(ingress AudioIngress) {
  214. s.mu.Lock()
  215. s.audioIngress = ingress
  216. s.mu.Unlock()
  217. }
  218. func (s *Server) SetIngestRuntime(rt IngestRuntime) {
  219. s.mu.Lock()
  220. s.ingestRt = rt
  221. s.mu.Unlock()
  222. }
  223. func (s *Server) SetConfigSaver(save func(config.Config) error) {
  224. s.mu.Lock()
  225. s.saveConfig = save
  226. s.mu.Unlock()
  227. }
  228. func (s *Server) SetHardReload(fn func()) {
  229. s.mu.Lock()
  230. s.hardReload = fn
  231. s.mu.Unlock()
  232. }
  233. func (s *Server) Handler() http.Handler {
  234. mux := http.NewServeMux()
  235. mux.HandleFunc("/", s.handleUI)
  236. mux.HandleFunc("/healthz", s.handleHealth)
  237. mux.HandleFunc("/status", s.handleStatus)
  238. mux.HandleFunc("/dry-run", s.handleDryRun)
  239. mux.HandleFunc("/config", s.handleConfig)
  240. mux.HandleFunc("/config/ingest/save", s.handleIngestSave)
  241. mux.HandleFunc("/runtime", s.handleRuntime)
  242. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  243. mux.HandleFunc("/tx/start", s.handleTXStart)
  244. mux.HandleFunc("/tx/stop", s.handleTXStop)
  245. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  246. return mux
  247. }
  248. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  249. w.Header().Set("Content-Type", "application/json")
  250. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  251. }
  252. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  253. if r.URL.Path != "/" {
  254. http.NotFound(w, r)
  255. return
  256. }
  257. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  258. w.Header().Set("Cache-Control", "no-cache")
  259. w.Write(uiHTML)
  260. }
  261. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  262. s.mu.RLock()
  263. cfg := s.cfg
  264. tx := s.tx
  265. s.mu.RUnlock()
  266. status := map[string]any{
  267. "service": "fm-rds-tx",
  268. "backend": cfg.Backend.Kind,
  269. "frequencyMHz": cfg.FM.FrequencyMHz,
  270. "stereoEnabled": cfg.FM.StereoEnabled,
  271. "stereoMode": cfg.FM.StereoMode,
  272. "rdsEnabled": cfg.RDS.Enabled,
  273. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  274. "limiterEnabled": cfg.FM.LimiterEnabled,
  275. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  276. }
  277. if tx != nil {
  278. if stats := tx.TXStats(); stats != nil {
  279. if ri, ok := stats["runtimeIndicator"]; ok {
  280. status["runtimeIndicator"] = ri
  281. }
  282. if alert, ok := stats["runtimeAlert"]; ok {
  283. status["runtimeAlert"] = alert
  284. }
  285. if queue, ok := stats["queue"]; ok {
  286. status["queue"] = queue
  287. }
  288. if runtimeState, ok := stats["state"]; ok {
  289. status["runtimeState"] = runtimeState
  290. }
  291. }
  292. }
  293. w.Header().Set("Content-Type", "application/json")
  294. _ = json.NewEncoder(w).Encode(status)
  295. }
  296. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  297. s.mu.RLock()
  298. drv := s.drv
  299. tx := s.tx
  300. stream := s.streamSrc
  301. ingestRt := s.ingestRt
  302. s.mu.RUnlock()
  303. result := map[string]any{}
  304. if drv != nil {
  305. result["driver"] = drv.Stats()
  306. }
  307. if tx != nil {
  308. if stats := tx.TXStats(); stats != nil {
  309. result["engine"] = stats
  310. }
  311. }
  312. if stream != nil {
  313. result["audioStream"] = stream.Stats()
  314. }
  315. if ingestRt != nil {
  316. result["ingest"] = ingestRt.Stats()
  317. }
  318. result["controlAudit"] = s.auditSnapshot()
  319. w.Header().Set("Content-Type", "application/json")
  320. _ = json.NewEncoder(w).Encode(result)
  321. }
  322. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  323. if r.Method != http.MethodPost {
  324. s.recordAudit(auditMethodNotAllowed)
  325. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  326. return
  327. }
  328. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  329. return
  330. }
  331. s.mu.RLock()
  332. tx := s.tx
  333. s.mu.RUnlock()
  334. if tx == nil {
  335. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  336. return
  337. }
  338. if err := tx.ResetFault(); err != nil {
  339. http.Error(w, err.Error(), http.StatusConflict)
  340. return
  341. }
  342. w.Header().Set("Content-Type", "application/json")
  343. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  344. }
  345. // handleAudioStream accepts raw S16LE PCM via HTTP POST and pushes
  346. // it into the configured ingest http-raw source. Use with:
  347. //
  348. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  349. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  350. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  351. if r.Method != http.MethodPost {
  352. s.recordAudit(auditMethodNotAllowed)
  353. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  354. return
  355. }
  356. if !isAudioStreamContentType(r) {
  357. s.recordAudit(auditUnsupportedMediaType)
  358. http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType)
  359. return
  360. }
  361. s.mu.RLock()
  362. ingress := s.audioIngress
  363. s.mu.RUnlock()
  364. if ingress == nil {
  365. http.Error(w, "audio ingest not configured (use --audio-http with ingest runtime)", http.StatusServiceUnavailable)
  366. return
  367. }
  368. // BUG-10 fix: /audio/stream is a long-lived streaming endpoint.
  369. // The global HTTP server ReadTimeout (5s) and WriteTimeout (10s) would
  370. // kill connections mid-stream. Disable them per-request via ResponseController
  371. // (requires Go 1.20+, confirmed Go 1.22).
  372. rc := http.NewResponseController(w)
  373. _ = rc.SetReadDeadline(time.Time{})
  374. _ = rc.SetWriteDeadline(time.Time{})
  375. r.Body = http.MaxBytesReader(w, r.Body, audioStreamBodyLimit)
  376. // Read body in chunks and push to ring buffer
  377. buf := make([]byte, 32768)
  378. totalFrames := 0
  379. for {
  380. n, err := r.Body.Read(buf)
  381. if n > 0 {
  382. written, writeErr := ingress.WritePCM16(buf[:n])
  383. totalFrames += written
  384. if writeErr != nil {
  385. http.Error(w, writeErr.Error(), http.StatusServiceUnavailable)
  386. return
  387. }
  388. }
  389. if err != nil {
  390. if err == io.EOF {
  391. break
  392. }
  393. var maxErr *http.MaxBytesError
  394. if errors.As(err, &maxErr) {
  395. s.recordAudit(auditBodyTooLarge)
  396. http.Error(w, maxErr.Error(), http.StatusRequestEntityTooLarge)
  397. return
  398. }
  399. http.Error(w, err.Error(), http.StatusInternalServerError)
  400. return
  401. }
  402. }
  403. w.Header().Set("Content-Type", "application/json")
  404. _ = json.NewEncoder(w).Encode(map[string]any{
  405. "ok": true,
  406. "frames": totalFrames,
  407. })
  408. }
  409. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  410. if r.Method != http.MethodPost {
  411. s.recordAudit(auditMethodNotAllowed)
  412. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  413. return
  414. }
  415. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  416. return
  417. }
  418. s.mu.RLock()
  419. tx := s.tx
  420. s.mu.RUnlock()
  421. if tx == nil {
  422. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  423. return
  424. }
  425. if err := tx.StartTX(); err != nil {
  426. http.Error(w, err.Error(), http.StatusConflict)
  427. return
  428. }
  429. w.Header().Set("Content-Type", "application/json")
  430. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  431. }
  432. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  433. if r.Method != http.MethodPost {
  434. s.recordAudit(auditMethodNotAllowed)
  435. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  436. return
  437. }
  438. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  439. return
  440. }
  441. s.mu.RLock()
  442. tx := s.tx
  443. s.mu.RUnlock()
  444. if tx == nil {
  445. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  446. return
  447. }
  448. go func() {
  449. _ = tx.StopTX()
  450. }()
  451. w.Header().Set("Content-Type", "application/json")
  452. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stop-requested"})
  453. }
  454. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  455. s.mu.RLock()
  456. cfg := s.cfg
  457. s.mu.RUnlock()
  458. w.Header().Set("Content-Type", "application/json")
  459. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  460. }
  461. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  462. switch r.Method {
  463. case http.MethodGet:
  464. s.mu.RLock()
  465. cfg := s.cfg
  466. s.mu.RUnlock()
  467. w.Header().Set("Content-Type", "application/json")
  468. _ = json.NewEncoder(w).Encode(cfg)
  469. case http.MethodPost:
  470. if !isJSONContentType(r) {
  471. s.recordAudit(auditUnsupportedMediaType)
  472. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  473. return
  474. }
  475. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  476. var patch ConfigPatch
  477. // BUG-4 fix: reject unknown JSON fields (typos) with 400 rather than
  478. // silently ignoring them (e.g. "outputDrvie" would succeed and do nothing).
  479. dec := json.NewDecoder(r.Body)
  480. dec.DisallowUnknownFields()
  481. if err := dec.Decode(&patch); err != nil {
  482. statusCode := http.StatusBadRequest
  483. if strings.Contains(err.Error(), "http: request body too large") {
  484. statusCode = http.StatusRequestEntityTooLarge
  485. s.recordAudit(auditBodyTooLarge)
  486. }
  487. http.Error(w, err.Error(), statusCode)
  488. return
  489. }
  490. // Update the server's config snapshot (for GET /config and /status)
  491. s.mu.Lock()
  492. next := s.cfg
  493. if patch.FrequencyMHz != nil {
  494. next.FM.FrequencyMHz = *patch.FrequencyMHz
  495. }
  496. if patch.OutputDrive != nil {
  497. next.FM.OutputDrive = *patch.OutputDrive
  498. }
  499. if patch.ToneLeftHz != nil {
  500. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  501. }
  502. if patch.ToneRightHz != nil {
  503. next.Audio.ToneRightHz = *patch.ToneRightHz
  504. }
  505. if patch.ToneAmplitude != nil {
  506. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  507. }
  508. if patch.AudioGain != nil {
  509. next.Audio.Gain = *patch.AudioGain
  510. }
  511. if patch.PS != nil {
  512. next.RDS.PS = *patch.PS
  513. }
  514. if patch.RadioText != nil {
  515. next.RDS.RadioText = *patch.RadioText
  516. }
  517. if patch.PI != nil {
  518. next.RDS.PI = *patch.PI
  519. }
  520. if patch.PTY != nil {
  521. next.RDS.PTY = *patch.PTY
  522. }
  523. if patch.PreEmphasisTauUS != nil {
  524. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  525. }
  526. if patch.StereoEnabled != nil {
  527. next.FM.StereoEnabled = *patch.StereoEnabled
  528. }
  529. if patch.StereoMode != nil {
  530. next.FM.StereoMode = *patch.StereoMode
  531. }
  532. if patch.LimiterEnabled != nil {
  533. next.FM.LimiterEnabled = *patch.LimiterEnabled
  534. }
  535. if patch.LimiterCeiling != nil {
  536. next.FM.LimiterCeiling = *patch.LimiterCeiling
  537. }
  538. if patch.RDSEnabled != nil {
  539. next.RDS.Enabled = *patch.RDSEnabled
  540. }
  541. if patch.PilotLevel != nil {
  542. next.FM.PilotLevel = *patch.PilotLevel
  543. }
  544. if patch.RDSInjection != nil {
  545. next.FM.RDSInjection = *patch.RDSInjection
  546. }
  547. if patch.BS412Enabled != nil {
  548. next.FM.BS412Enabled = *patch.BS412Enabled
  549. }
  550. if patch.BS412ThresholdDBr != nil {
  551. next.FM.BS412ThresholdDBr = *patch.BS412ThresholdDBr
  552. }
  553. if patch.MpxGain != nil {
  554. next.FM.MpxGain = *patch.MpxGain
  555. }
  556. if patch.CompositeClipperEnabled != nil {
  557. next.FM.CompositeClipper.Enabled = *patch.CompositeClipperEnabled
  558. }
  559. if patch.CompositeClipperIterations != nil {
  560. next.FM.CompositeClipper.Iterations = *patch.CompositeClipperIterations
  561. }
  562. if patch.CompositeClipperSoftKnee != nil {
  563. next.FM.CompositeClipper.SoftKnee = *patch.CompositeClipperSoftKnee
  564. }
  565. if patch.CompositeClipperLookaheadMs != nil {
  566. next.FM.CompositeClipper.LookaheadMs = *patch.CompositeClipperLookaheadMs
  567. }
  568. if err := next.Validate(); err != nil {
  569. s.mu.Unlock()
  570. http.Error(w, err.Error(), http.StatusBadRequest)
  571. return
  572. }
  573. lp := LivePatch{
  574. FrequencyMHz: patch.FrequencyMHz,
  575. OutputDrive: patch.OutputDrive,
  576. StereoEnabled: patch.StereoEnabled,
  577. StereoMode: patch.StereoMode,
  578. PilotLevel: patch.PilotLevel,
  579. RDSInjection: patch.RDSInjection,
  580. RDSEnabled: patch.RDSEnabled,
  581. LimiterEnabled: patch.LimiterEnabled,
  582. LimiterCeiling: patch.LimiterCeiling,
  583. PS: patch.PS,
  584. RadioText: patch.RadioText,
  585. ToneLeftHz: patch.ToneLeftHz,
  586. ToneRightHz: patch.ToneRightHz,
  587. ToneAmplitude: patch.ToneAmplitude,
  588. AudioGain: patch.AudioGain,
  589. CompositeClipperEnabled: patch.CompositeClipperEnabled,
  590. }
  591. // NEU-02 fix: determine whether any live-patchable fields are present,
  592. // then release the lock before calling UpdateConfig to avoid holding
  593. // s.mu across a potentially blocking engine call.
  594. tx := s.tx
  595. hasLiveFields := patch.FrequencyMHz != nil || patch.OutputDrive != nil ||
  596. patch.StereoEnabled != nil || patch.StereoMode != nil || patch.PilotLevel != nil ||
  597. patch.RDSInjection != nil || patch.RDSEnabled != nil ||
  598. patch.LimiterEnabled != nil || patch.LimiterCeiling != nil ||
  599. patch.PS != nil || patch.RadioText != nil ||
  600. patch.ToneLeftHz != nil || patch.ToneRightHz != nil ||
  601. patch.ToneAmplitude != nil || patch.AudioGain != nil ||
  602. patch.CompositeClipperEnabled != nil
  603. s.cfg = next
  604. s.mu.Unlock()
  605. // Apply live fields to running engine outside the lock.
  606. var updateErr error
  607. if tx != nil && hasLiveFields {
  608. if err := tx.UpdateConfig(lp); err != nil {
  609. updateErr = err
  610. }
  611. }
  612. if updateErr != nil {
  613. http.Error(w, updateErr.Error(), http.StatusBadRequest)
  614. return
  615. }
  616. // NEU-03 fix: report live=true only when live-patchable fields were applied.
  617. live := tx != nil && hasLiveFields
  618. w.Header().Set("Content-Type", "application/json")
  619. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  620. default:
  621. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  622. }
  623. }
  624. func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) {
  625. if r.Method != http.MethodPost {
  626. s.recordAudit(auditMethodNotAllowed)
  627. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  628. return
  629. }
  630. if !isJSONContentType(r) {
  631. s.recordAudit(auditUnsupportedMediaType)
  632. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  633. return
  634. }
  635. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  636. var req IngestSaveRequest
  637. if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
  638. statusCode := http.StatusBadRequest
  639. if strings.Contains(err.Error(), "http: request body too large") {
  640. statusCode = http.StatusRequestEntityTooLarge
  641. s.recordAudit(auditBodyTooLarge)
  642. }
  643. http.Error(w, err.Error(), statusCode)
  644. return
  645. }
  646. s.mu.Lock()
  647. next := s.cfg
  648. next.Ingest = req.Ingest
  649. if err := next.Validate(); err != nil {
  650. s.mu.Unlock()
  651. http.Error(w, err.Error(), http.StatusBadRequest)
  652. return
  653. }
  654. save := s.saveConfig
  655. reload := s.hardReload
  656. if save == nil {
  657. s.mu.Unlock()
  658. http.Error(w, "config save is not configured (start with --config <path>)", http.StatusServiceUnavailable)
  659. return
  660. }
  661. if err := save(next); err != nil {
  662. s.mu.Unlock()
  663. http.Error(w, err.Error(), http.StatusInternalServerError)
  664. return
  665. }
  666. s.cfg = next
  667. s.mu.Unlock()
  668. w.Header().Set("Content-Type", "application/json")
  669. reloadScheduled := reload != nil
  670. _ = json.NewEncoder(w).Encode(map[string]any{
  671. "ok": true,
  672. "saved": true,
  673. "reloadScheduled": reloadScheduled,
  674. })
  675. if reloadScheduled && s.reloadPending.CompareAndSwap(false, true) {
  676. go func(fn func()) {
  677. time.Sleep(250 * time.Millisecond)
  678. s.reloadPending.Store(false)
  679. fn()
  680. }(reload)
  681. }
  682. }