package control import ( _ "embed" "encoding/json" "errors" "io" "mime" "net/http" "strings" "sync" "sync/atomic" "time" "golang.org/x/net/websocket" "github.com/jan/fm-rds-tx/internal/audio" "github.com/jan/fm-rds-tx/internal/config" drypkg "github.com/jan/fm-rds-tx/internal/dryrun" "github.com/jan/fm-rds-tx/internal/ingest" offpkg "github.com/jan/fm-rds-tx/internal/offline" "github.com/jan/fm-rds-tx/internal/platform" ) //go:embed ui.html var uiHTML []byte //go:embed logo.png var logoPNG []byte // TXController is an optional interface the Server uses to start/stop TX // and apply live config changes. type TXController interface { StartTX() error StopTX() error TXStats() map[string]any UpdateConfig(patch LivePatch) error ResetFault() error } // LivePatch mirrors the patchable fields from ConfigPatch for the engine. // nil = no change. type LivePatch struct { FrequencyMHz *float64 OutputDrive *float64 StereoEnabled *bool StereoMode *string PilotLevel *float64 RDSInjection *float64 RDSEnabled *bool LimiterEnabled *bool LimiterCeiling *float64 PS *string RadioText *string TA *bool TP *bool ToneLeftHz *float64 ToneRightHz *float64 ToneAmplitude *float64 AudioGain *float64 CompositeClipperEnabled *bool } type Server struct { mu sync.RWMutex cfg config.Config tx TXController drv platform.SoapyDriver // optional, for runtime stats streamSrc *audio.StreamSource // optional, for live audio ring stats audioIngress AudioIngress // optional, for /audio/stream ingestRt IngestRuntime // optional, for /runtime ingest stats saveConfig func(config.Config) error hardReload func() // BUG-F fix: reloadPending prevents multiple concurrent goroutines from // calling hardReload when handleIngestSave is hit multiple times quickly. reloadPending atomic.Bool audit auditCounters telemetryHub *TelemetryHub } type AudioIngress interface { WritePCM16(data []byte) (int, error) } type IngestRuntime interface { Stats() ingest.Stats } type auditEvent string const ( auditMethodNotAllowed auditEvent = "methodNotAllowed" auditUnsupportedMediaType auditEvent = "unsupportedMediaType" auditBodyTooLarge auditEvent = "bodyTooLarge" auditUnexpectedBody auditEvent = "unexpectedBody" ) type auditCounters struct { methodNotAllowed uint64 unsupportedMediaType uint64 bodyTooLarge uint64 unexpectedBody uint64 } const ( maxConfigBodyBytes = 64 << 10 // 64 KiB configContentTypeHeader = "application/json" noBodyErrMsg = "request must not include a body" audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16" audioStreamBodyLimitDefault = 512 << 20 // 512 MiB ) var audioStreamAllowedMediaTypes = []string{ "application/octet-stream", "audio/l16", } var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault) // bytes allowed per /audio/stream request; tests may override. func anyToString(v any) string { if v == nil { return "" } if s, ok := v.(string); ok { return s } return "" } func isJSONContentType(r *http.Request) bool { ct := strings.TrimSpace(r.Header.Get("Content-Type")) if ct == "" { return false } mediaType, _, err := mime.ParseMediaType(ct) if err != nil { return false } return strings.EqualFold(mediaType, configContentTypeHeader) } type ConfigPatch struct { FrequencyMHz *float64 `json:"frequencyMHz,omitempty"` OutputDrive *float64 `json:"outputDrive,omitempty"` StereoEnabled *bool `json:"stereoEnabled,omitempty"` StereoMode *string `json:"stereoMode,omitempty"` PilotLevel *float64 `json:"pilotLevel,omitempty"` RDSInjection *float64 `json:"rdsInjection,omitempty"` RDSEnabled *bool `json:"rdsEnabled,omitempty"` ToneLeftHz *float64 `json:"toneLeftHz,omitempty"` ToneRightHz *float64 `json:"toneRightHz,omitempty"` ToneAmplitude *float64 `json:"toneAmplitude,omitempty"` PS *string `json:"ps,omitempty"` RadioText *string `json:"radioText,omitempty"` PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"` LimiterEnabled *bool `json:"limiterEnabled,omitempty"` LimiterCeiling *float64 `json:"limiterCeiling,omitempty"` AudioGain *float64 `json:"audioGain,omitempty"` PI *string `json:"pi,omitempty"` PTY *int `json:"pty,omitempty"` TP *bool `json:"tp,omitempty"` TA *bool `json:"ta,omitempty"` MS *bool `json:"ms,omitempty"` CTEnabled *bool `json:"ctEnabled,omitempty"` RTPlusEnabled *bool `json:"rtPlusEnabled,omitempty"` RTPlusSeparator *string `json:"rtPlusSeparator,omitempty"` PTYN *string `json:"ptyn,omitempty"` LPS *string `json:"lps,omitempty"` ERTEnabled *bool `json:"ertEnabled,omitempty"` ERT *string `json:"ert,omitempty"` RDS2Enabled *bool `json:"rds2Enabled,omitempty"` StationLogoPath *string `json:"stationLogoPath,omitempty"` AF *[]float64 `json:"af,omitempty"` BS412Enabled *bool `json:"bs412Enabled,omitempty"` BS412ThresholdDBr *float64 `json:"bs412ThresholdDBr,omitempty"` MpxGain *float64 `json:"mpxGain,omitempty"` CompositeClipperEnabled *bool `json:"compositeClipperEnabled,omitempty"` CompositeClipperIterations *int `json:"compositeClipperIterations,omitempty"` CompositeClipperSoftKnee *float64 `json:"compositeClipperSoftKnee,omitempty"` CompositeClipperLookaheadMs *float64 `json:"compositeClipperLookaheadMs,omitempty"` } type IngestSaveRequest struct { Ingest config.IngestConfig `json:"ingest"` } func NewServer(cfg config.Config) *Server { return &Server{cfg: cfg, telemetryHub: NewTelemetryHub()} } func hasRequestBody(r *http.Request) bool { if r.ContentLength > 0 { return true } for _, te := range r.TransferEncoding { if strings.EqualFold(te, "chunked") { return true } } return false } func (s *Server) rejectBody(w http.ResponseWriter, r *http.Request) bool { // Returns true when the request has an unexpected body and the error response // has already been written — callers should return immediately in that case. // Returns false when there is no body (happy path — request should proceed). if !hasRequestBody(r) { return false } s.recordAudit(auditUnexpectedBody) http.Error(w, noBodyErrMsg, http.StatusBadRequest) return true } func (s *Server) recordAudit(evt auditEvent) { switch evt { case auditMethodNotAllowed: atomic.AddUint64(&s.audit.methodNotAllowed, 1) case auditUnsupportedMediaType: atomic.AddUint64(&s.audit.unsupportedMediaType, 1) case auditBodyTooLarge: atomic.AddUint64(&s.audit.bodyTooLarge, 1) case auditUnexpectedBody: atomic.AddUint64(&s.audit.unexpectedBody, 1) } } func (s *Server) auditSnapshot() map[string]uint64 { return map[string]uint64{ "methodNotAllowed": atomic.LoadUint64(&s.audit.methodNotAllowed), "unsupportedMediaType": atomic.LoadUint64(&s.audit.unsupportedMediaType), "bodyTooLarge": atomic.LoadUint64(&s.audit.bodyTooLarge), "unexpectedBody": atomic.LoadUint64(&s.audit.unexpectedBody), } } func isAudioStreamContentType(r *http.Request) bool { ct := strings.TrimSpace(r.Header.Get("Content-Type")) if ct == "" { return false } mediaType, _, err := mime.ParseMediaType(ct) if err != nil { return false } for _, allowed := range audioStreamAllowedMediaTypes { if strings.EqualFold(mediaType, allowed) { return true } } return false } func (s *Server) TelemetryHub() *TelemetryHub { return s.telemetryHub } func (s *Server) SetTXController(tx TXController) { s.mu.Lock() s.tx = tx s.mu.Unlock() } func (s *Server) SetDriver(drv platform.SoapyDriver) { s.mu.Lock() s.drv = drv s.mu.Unlock() } func (s *Server) SetStreamSource(src *audio.StreamSource) { s.mu.Lock() s.streamSrc = src s.mu.Unlock() } func (s *Server) SetAudioIngress(ingress AudioIngress) { s.mu.Lock() s.audioIngress = ingress s.mu.Unlock() } func (s *Server) SetIngestRuntime(rt IngestRuntime) { s.mu.Lock() s.ingestRt = rt s.mu.Unlock() } func (s *Server) SetConfigSaver(save func(config.Config) error) { s.mu.Lock() s.saveConfig = save s.mu.Unlock() } func (s *Server) SetHardReload(fn func()) { s.mu.Lock() s.hardReload = fn s.mu.Unlock() } func (s *Server) Handler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/", s.handleUI) mux.HandleFunc("/logo", s.handleLogo) mux.HandleFunc("/healthz", s.handleHealth) mux.HandleFunc("/status", s.handleStatus) mux.HandleFunc("/dry-run", s.handleDryRun) mux.HandleFunc("/config", s.handleConfig) mux.HandleFunc("/config/ingest/save", s.handleIngestSave) mux.HandleFunc("/runtime", s.handleRuntime) mux.HandleFunc("/measurements", s.handleMeasurements) mux.Handle("/ws/telemetry", websocket.Handler(s.handleTelemetryWS)) mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset) mux.HandleFunc("/tx/start", s.handleTXStart) mux.HandleFunc("/tx/stop", s.handleTXStop) mux.HandleFunc("/audio/stream", s.handleAudioStream) return mux } func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) } func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/" { http.NotFound(w, r) return } w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Cache-Control", "no-cache") w.Write(uiHTML) } func (s *Server) handleLogo(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "image/png") w.Header().Set("Cache-Control", "public, max-age=3600") w.Write(logoPNG) } func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) { s.mu.RLock() cfg := s.cfg tx := s.tx s.mu.RUnlock() status := map[string]any{ "service": "fm-rds-tx", "backend": cfg.Backend.Kind, "frequencyMHz": cfg.FM.FrequencyMHz, "stereoEnabled": cfg.FM.StereoEnabled, "stereoMode": cfg.FM.StereoMode, "rdsEnabled": cfg.RDS.Enabled, "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS, "limiterEnabled": cfg.FM.LimiterEnabled, "fmModulationEnabled": cfg.FM.FMModulationEnabled, } if tx != nil { if stats := tx.TXStats(); stats != nil { if ri, ok := stats["runtimeIndicator"]; ok { status["runtimeIndicator"] = ri } if alert, ok := stats["runtimeAlert"]; ok { status["runtimeAlert"] = alert } if queue, ok := stats["queue"]; ok { status["queue"] = queue } if runtimeState, ok := stats["state"]; ok { status["runtimeState"] = runtimeState } } } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(status) } func (s *Server) handleMeasurements(w http.ResponseWriter, _ *http.Request) { s.mu.RLock() tx := s.tx s.mu.RUnlock() result := map[string]any{"noData": true, "stale": true} if tx != nil { if stats := tx.TXStats(); stats != nil { if state, ok := stats["state"]; ok { result["state"] = state } if applied, ok := stats["appliedFrequencyMHz"]; ok { result["appliedFrequencyMHz"] = applied } if queue, ok := stats["queue"]; ok { result["queue"] = queue } if runtimeIndicator, ok := stats["runtimeIndicator"]; ok { result["runtimeIndicator"] = runtimeIndicator } if runtimeAlert, ok := stats["runtimeAlert"]; ok { result["runtimeAlert"] = runtimeAlert } if measurement, ok := stats["measurement"].(*offpkg.MeasurementSnapshot); ok && measurement != nil { state := strings.ToLower(strings.TrimSpace(anyToString(stats["state"]))) fresh := !measurement.Timestamp.IsZero() && time.Since(measurement.Timestamp) <= 2*time.Second runningish := state == "running" || state == "degraded" || state == "muted" || state == "faulted" || state == "arming" || state == "prebuffering" result["measurement"] = measurement result["noData"] = false result["stale"] = !(fresh && runningish) } } } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(result) } func (s *Server) handleTelemetryWS(ws *websocket.Conn) { if s.telemetryHub == nil { _ = ws.Close() return } _ = ws.SetDeadline(time.Now().Add(30 * time.Second)) sub, unsubscribe := s.telemetryHub.Subscribe() defer unsubscribe() defer ws.Close() s.mu.RLock() tx := s.tx s.mu.RUnlock() if tx != nil { if stats := tx.TXStats(); stats != nil { if measurement, ok := stats["measurement"].(interface{ }); ok { if m, ok := measurement.(*offpkg.MeasurementSnapshot); ok && m != nil { _ = ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) if err := websocket.JSON.Send(ws, TelemetryMessage{Type: "measurement", TS: m.Timestamp, Seq: m.Sequence, Data: m}); err != nil { return } } } } } for { select { case <-sub.done: return case msg := <-sub.ch: _ = ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) if err := websocket.JSON.Send(ws, msg); err != nil { return } } } } func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) { s.mu.RLock() drv := s.drv tx := s.tx stream := s.streamSrc ingestRt := s.ingestRt s.mu.RUnlock() result := map[string]any{} if drv != nil { result["driver"] = drv.Stats() } if tx != nil { if stats := tx.TXStats(); stats != nil { result["engine"] = stats } } if stream != nil { result["audioStream"] = stream.Stats() } if ingestRt != nil { result["ingest"] = ingestRt.Stats() } result["controlAudit"] = s.auditSnapshot() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(result) } func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { s.recordAudit(auditMethodNotAllowed) http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected return } s.mu.RLock() tx := s.tx s.mu.RUnlock() if tx == nil { http.Error(w, "tx controller not available", http.StatusServiceUnavailable) return } if err := tx.ResetFault(); err != nil { http.Error(w, err.Error(), http.StatusConflict) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) } // handleAudioStream accepts raw S16LE PCM via HTTP POST and pushes // it into the configured ingest http-raw source. Use with: // // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { s.recordAudit(auditMethodNotAllowed) http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } if !isAudioStreamContentType(r) { s.recordAudit(auditUnsupportedMediaType) http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType) return } s.mu.RLock() ingress := s.audioIngress s.mu.RUnlock() if ingress == nil { http.Error(w, "audio ingest not configured (use --audio-http with ingest runtime)", http.StatusServiceUnavailable) return } // BUG-10 fix: /audio/stream is a long-lived streaming endpoint. // The global HTTP server ReadTimeout (5s) and WriteTimeout (10s) would // kill connections mid-stream. Disable them per-request via ResponseController // (requires Go 1.20+, confirmed Go 1.22). rc := http.NewResponseController(w) _ = rc.SetReadDeadline(time.Time{}) _ = rc.SetWriteDeadline(time.Time{}) r.Body = http.MaxBytesReader(w, r.Body, audioStreamBodyLimit) // Read body in chunks and push to ring buffer buf := make([]byte, 32768) totalFrames := 0 for { n, err := r.Body.Read(buf) if n > 0 { written, writeErr := ingress.WritePCM16(buf[:n]) totalFrames += written if writeErr != nil { http.Error(w, writeErr.Error(), http.StatusServiceUnavailable) return } } if err != nil { if err == io.EOF { break } var maxErr *http.MaxBytesError if errors.As(err, &maxErr) { s.recordAudit(auditBodyTooLarge) http.Error(w, maxErr.Error(), http.StatusRequestEntityTooLarge) return } http.Error(w, err.Error(), http.StatusInternalServerError) return } } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{ "ok": true, "frames": totalFrames, }) } func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { s.recordAudit(auditMethodNotAllowed) http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected return } s.mu.RLock() tx := s.tx s.mu.RUnlock() if tx == nil { http.Error(w, "tx controller not available", http.StatusServiceUnavailable) return } if err := tx.StartTX(); err != nil { http.Error(w, err.Error(), http.StatusConflict) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"}) } func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { s.recordAudit(auditMethodNotAllowed) http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected return } s.mu.RLock() tx := s.tx s.mu.RUnlock() if tx == nil { http.Error(w, "tx controller not available", http.StatusServiceUnavailable) return } go func() { _ = tx.StopTX() }() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stop-requested"}) } func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) { s.mu.RLock() cfg := s.cfg s.mu.RUnlock() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg)) } func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: s.mu.RLock() cfg := s.cfg s.mu.RUnlock() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(cfg) case http.MethodPost: if !isJSONContentType(r) { s.recordAudit(auditUnsupportedMediaType) http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType) return } r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes) var patch ConfigPatch // BUG-4 fix: reject unknown JSON fields (typos) with 400 rather than // silently ignoring them (e.g. "outputDrvie" would succeed and do nothing). dec := json.NewDecoder(r.Body) dec.DisallowUnknownFields() if err := dec.Decode(&patch); err != nil { statusCode := http.StatusBadRequest if strings.Contains(err.Error(), "http: request body too large") { statusCode = http.StatusRequestEntityTooLarge s.recordAudit(auditBodyTooLarge) } http.Error(w, err.Error(), statusCode) return } // Update the server's config snapshot (for GET /config and /status) s.mu.Lock() next := s.cfg if patch.FrequencyMHz != nil { next.FM.FrequencyMHz = *patch.FrequencyMHz } if patch.OutputDrive != nil { next.FM.OutputDrive = *patch.OutputDrive } if patch.ToneLeftHz != nil { next.Audio.ToneLeftHz = *patch.ToneLeftHz } if patch.ToneRightHz != nil { next.Audio.ToneRightHz = *patch.ToneRightHz } if patch.ToneAmplitude != nil { next.Audio.ToneAmplitude = *patch.ToneAmplitude } if patch.AudioGain != nil { next.Audio.Gain = *patch.AudioGain } if patch.PS != nil { next.RDS.PS = *patch.PS } if patch.RadioText != nil { next.RDS.RadioText = *patch.RadioText } if patch.PI != nil { next.RDS.PI = *patch.PI } if patch.PTY != nil { next.RDS.PTY = *patch.PTY } if patch.TP != nil { next.RDS.TP = *patch.TP } if patch.TA != nil { next.RDS.TA = *patch.TA } if patch.MS != nil { next.RDS.MS = *patch.MS } if patch.CTEnabled != nil { next.RDS.CTEnabled = *patch.CTEnabled } if patch.RTPlusEnabled != nil { next.RDS.RTPlusEnabled = *patch.RTPlusEnabled } if patch.RTPlusSeparator != nil { next.RDS.RTPlusSeparator = *patch.RTPlusSeparator } if patch.PTYN != nil { next.RDS.PTYN = *patch.PTYN } if patch.LPS != nil { next.RDS.LPS = *patch.LPS } if patch.ERTEnabled != nil { next.RDS.ERTEnabled = *patch.ERTEnabled } if patch.ERT != nil { next.RDS.ERT = *patch.ERT } if patch.RDS2Enabled != nil { next.RDS.RDS2Enabled = *patch.RDS2Enabled } if patch.StationLogoPath != nil { next.RDS.StationLogoPath = *patch.StationLogoPath } if patch.AF != nil { next.RDS.AF = *patch.AF } if patch.PreEmphasisTauUS != nil { next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS } if patch.StereoEnabled != nil { next.FM.StereoEnabled = *patch.StereoEnabled } if patch.StereoMode != nil { next.FM.StereoMode = *patch.StereoMode } if patch.LimiterEnabled != nil { next.FM.LimiterEnabled = *patch.LimiterEnabled } if patch.LimiterCeiling != nil { next.FM.LimiterCeiling = *patch.LimiterCeiling } if patch.RDSEnabled != nil { next.RDS.Enabled = *patch.RDSEnabled } if patch.PilotLevel != nil { next.FM.PilotLevel = *patch.PilotLevel } if patch.RDSInjection != nil { next.FM.RDSInjection = *patch.RDSInjection } if patch.BS412Enabled != nil { next.FM.BS412Enabled = *patch.BS412Enabled } if patch.BS412ThresholdDBr != nil { next.FM.BS412ThresholdDBr = *patch.BS412ThresholdDBr } if patch.MpxGain != nil { next.FM.MpxGain = *patch.MpxGain } if patch.CompositeClipperEnabled != nil { next.FM.CompositeClipper.Enabled = *patch.CompositeClipperEnabled } if patch.CompositeClipperIterations != nil { next.FM.CompositeClipper.Iterations = *patch.CompositeClipperIterations } if patch.CompositeClipperSoftKnee != nil { next.FM.CompositeClipper.SoftKnee = *patch.CompositeClipperSoftKnee } if patch.CompositeClipperLookaheadMs != nil { next.FM.CompositeClipper.LookaheadMs = *patch.CompositeClipperLookaheadMs } if err := next.Validate(); err != nil { s.mu.Unlock() http.Error(w, err.Error(), http.StatusBadRequest) return } lp := LivePatch{ FrequencyMHz: patch.FrequencyMHz, OutputDrive: patch.OutputDrive, StereoEnabled: patch.StereoEnabled, StereoMode: patch.StereoMode, PilotLevel: patch.PilotLevel, RDSInjection: patch.RDSInjection, RDSEnabled: patch.RDSEnabled, LimiterEnabled: patch.LimiterEnabled, LimiterCeiling: patch.LimiterCeiling, PS: patch.PS, RadioText: patch.RadioText, TA: patch.TA, TP: patch.TP, ToneLeftHz: patch.ToneLeftHz, ToneRightHz: patch.ToneRightHz, ToneAmplitude: patch.ToneAmplitude, AudioGain: patch.AudioGain, CompositeClipperEnabled: patch.CompositeClipperEnabled, } // NEU-02 fix: determine whether any live-patchable fields are present, // then release the lock before calling UpdateConfig to avoid holding // s.mu across a potentially blocking engine call. tx := s.tx hasLiveFields := patch.FrequencyMHz != nil || patch.OutputDrive != nil || patch.StereoEnabled != nil || patch.StereoMode != nil || patch.PilotLevel != nil || patch.RDSInjection != nil || patch.RDSEnabled != nil || patch.LimiterEnabled != nil || patch.LimiterCeiling != nil || patch.PS != nil || patch.RadioText != nil || patch.TA != nil || patch.TP != nil || patch.ToneLeftHz != nil || patch.ToneRightHz != nil || patch.ToneAmplitude != nil || patch.AudioGain != nil || patch.CompositeClipperEnabled != nil s.mu.Unlock() // Apply live fields to running engine outside the lock. if tx != nil && hasLiveFields { if err := tx.UpdateConfig(lp); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } } // Persist the validated config snapshot when a config saver is available. // This ensures restart-required UI changes survive process restarts instead // of only updating the in-memory snapshot. s.mu.RLock() save := s.saveConfig s.mu.RUnlock() if save != nil { if err := save(next); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } // Commit the server snapshot only after validation, optional persistence, // and any required live update succeeded. s.mu.Lock() s.cfg = next s.mu.Unlock() // NEU-03 fix: report live=true only when live-patchable fields were applied. live := tx != nil && hasLiveFields w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live}) default: http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } } func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { s.recordAudit(auditMethodNotAllowed) http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } if !isJSONContentType(r) { s.recordAudit(auditUnsupportedMediaType) http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType) return } r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes) var req IngestSaveRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { statusCode := http.StatusBadRequest if strings.Contains(err.Error(), "http: request body too large") { statusCode = http.StatusRequestEntityTooLarge s.recordAudit(auditBodyTooLarge) } http.Error(w, err.Error(), statusCode) return } s.mu.Lock() next := s.cfg next.Ingest = req.Ingest if err := next.Validate(); err != nil { s.mu.Unlock() http.Error(w, err.Error(), http.StatusBadRequest) return } save := s.saveConfig reload := s.hardReload if save == nil { s.mu.Unlock() http.Error(w, "config save is not configured (start with --config )", http.StatusServiceUnavailable) return } if err := save(next); err != nil { s.mu.Unlock() http.Error(w, err.Error(), http.StatusInternalServerError) return } s.cfg = next s.mu.Unlock() w.Header().Set("Content-Type", "application/json") reloadScheduled := reload != nil _ = json.NewEncoder(w).Encode(map[string]any{ "ok": true, "saved": true, "reloadScheduled": reloadScheduled, }) if reloadScheduled && s.reloadPending.CompareAndSwap(false, true) { go func(fn func()) { time.Sleep(250 * time.Millisecond) s.reloadPending.Store(false) fn() }(reload) } }