// Package control implements the HTTP control surface of the transmitter:
// an embedded web UI, status/runtime introspection, live config patching,
// TX start/stop and raw PCM audio ingest.
package control

import (
	_ "embed"
	"encoding/json"
	"io"
	"mime"
	"net/http"
	"strings"
	"sync"

	"github.com/jan/fm-rds-tx/internal/audio"
	"github.com/jan/fm-rds-tx/internal/config"
	drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
	"github.com/jan/fm-rds-tx/internal/platform"
)

// uiHTML is the embedded single-page UI served at "/".
//
//go:embed ui.html
var uiHTML []byte

// TXController is an optional interface the Server uses to start/stop TX
// and apply live config changes.
type TXController interface {
	StartTX() error
	StopTX() error
	TXStats() map[string]any
	UpdateConfig(patch LivePatch) error
	ResetFault() error
}

// LivePatch mirrors the patchable fields from ConfigPatch for the engine.
// nil = no change.
type LivePatch struct {
	FrequencyMHz   *float64
	OutputDrive    *float64
	StereoEnabled  *bool
	PilotLevel     *float64
	RDSInjection   *float64
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64
	PS             *string
	RadioText      *string
}

// Server holds the current config snapshot plus the optional runtime
// collaborators and exposes them over HTTP via Handler. All fields are
// guarded by mu.
type Server struct {
	mu        sync.RWMutex
	cfg       config.Config
	tx        TXController
	drv       platform.SoapyDriver // optional, for runtime stats
	streamSrc *audio.StreamSource  // optional, for live audio ingest
}

const (
	maxConfigBodyBytes      = 64 << 10 // 64 KiB
	configContentTypeHeader = "application/json"
)

// isJSONContentType reports whether the request's Content-Type header parses
// to the media type application/json (case-insensitive; parameters such as
// charset are ignored). A missing or unparsable header yields false.
func isJSONContentType(r *http.Request) bool {
	ct := strings.TrimSpace(r.Header.Get("Content-Type"))
	if ct == "" {
		return false
	}
	mediaType, _, err := mime.ParseMediaType(ct)
	if err != nil {
		return false
	}
	return strings.EqualFold(mediaType, configContentTypeHeader)
}

// ConfigPatch is the JSON body accepted by POST /config. A nil field means
// "leave the current value unchanged".
type ConfigPatch struct {
	FrequencyMHz     *float64 `json:"frequencyMHz,omitempty"`
	OutputDrive      *float64 `json:"outputDrive,omitempty"`
	StereoEnabled    *bool    `json:"stereoEnabled,omitempty"`
	PilotLevel       *float64 `json:"pilotLevel,omitempty"`
	RDSInjection     *float64 `json:"rdsInjection,omitempty"`
	RDSEnabled       *bool    `json:"rdsEnabled,omitempty"`
	ToneLeftHz       *float64 `json:"toneLeftHz,omitempty"`
	ToneRightHz      *float64 `json:"toneRightHz,omitempty"`
	ToneAmplitude    *float64 `json:"toneAmplitude,omitempty"`
	PS               *string  `json:"ps,omitempty"`
	RadioText        *string  `json:"radioText,omitempty"`
	PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
	LimiterEnabled   *bool    `json:"limiterEnabled,omitempty"`
	LimiterCeiling   *float64 `json:"limiterCeiling,omitempty"`
}

// applyTo returns a copy of cfg with every non-nil field of p written over
// the corresponding config value. cfg itself is not modified.
func (p ConfigPatch) applyTo(cfg config.Config) config.Config {
	if p.FrequencyMHz != nil {
		cfg.FM.FrequencyMHz = *p.FrequencyMHz
	}
	if p.OutputDrive != nil {
		cfg.FM.OutputDrive = *p.OutputDrive
	}
	if p.ToneLeftHz != nil {
		cfg.Audio.ToneLeftHz = *p.ToneLeftHz
	}
	if p.ToneRightHz != nil {
		cfg.Audio.ToneRightHz = *p.ToneRightHz
	}
	if p.ToneAmplitude != nil {
		cfg.Audio.ToneAmplitude = *p.ToneAmplitude
	}
	if p.PS != nil {
		cfg.RDS.PS = *p.PS
	}
	if p.RadioText != nil {
		cfg.RDS.RadioText = *p.RadioText
	}
	if p.PreEmphasisTauUS != nil {
		cfg.FM.PreEmphasisTauUS = *p.PreEmphasisTauUS
	}
	if p.StereoEnabled != nil {
		cfg.FM.StereoEnabled = *p.StereoEnabled
	}
	if p.LimiterEnabled != nil {
		cfg.FM.LimiterEnabled = *p.LimiterEnabled
	}
	if p.LimiterCeiling != nil {
		cfg.FM.LimiterCeiling = *p.LimiterCeiling
	}
	if p.RDSEnabled != nil {
		cfg.RDS.Enabled = *p.RDSEnabled
	}
	if p.PilotLevel != nil {
		cfg.FM.PilotLevel = *p.PilotLevel
	}
	if p.RDSInjection != nil {
		cfg.FM.RDSInjection = *p.RDSInjection
	}
	return cfg
}

// livePatch projects p onto the subset of fields that LivePatch carries.
// The tone and pre-emphasis fields have no LivePatch counterpart and are
// therefore only reflected in the server's config snapshot.
func (p ConfigPatch) livePatch() LivePatch {
	return LivePatch{
		FrequencyMHz:   p.FrequencyMHz,
		OutputDrive:    p.OutputDrive,
		StereoEnabled:  p.StereoEnabled,
		PilotLevel:     p.PilotLevel,
		RDSInjection:   p.RDSInjection,
		RDSEnabled:     p.RDSEnabled,
		LimiterEnabled: p.LimiterEnabled,
		LimiterCeiling: p.LimiterCeiling,
		PS:             p.PS,
		RadioText:      p.RadioText,
	}
}

// NewServer returns a Server whose config snapshot is cfg. No TX controller,
// driver or stream source is attached; use the Set* methods for that.
func NewServer(cfg config.Config) *Server {
	return &Server{cfg: cfg}
}

// SetTXController installs (or, with nil, removes) the TX controller.
func (s *Server) SetTXController(tx TXController) {
	s.mu.Lock()
	s.tx = tx
	s.mu.Unlock()
}

// SetDriver installs the driver whose Stats are reported by /runtime.
func (s *Server) SetDriver(drv platform.SoapyDriver) {
	s.mu.Lock()
	s.drv = drv
	s.mu.Unlock()
}

// SetStreamSource installs the stream source fed by POST /audio/stream.
func (s *Server) SetStreamSource(src *audio.StreamSource) {
	s.mu.Lock()
	s.streamSrc = src
	s.mu.Unlock()
}

// Handler returns a new mux with all control endpoints registered.
func (s *Server) Handler() http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/", s.handleUI)
	mux.HandleFunc("/healthz", s.handleHealth)
	mux.HandleFunc("/status", s.handleStatus)
	mux.HandleFunc("/dry-run", s.handleDryRun)
	mux.HandleFunc("/config", s.handleConfig)
	mux.HandleFunc("/runtime", s.handleRuntime)
	mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
	mux.HandleFunc("/tx/start", s.handleTXStart)
	mux.HandleFunc("/tx/stop", s.handleTXStop)
	mux.HandleFunc("/audio/stream", s.handleAudioStream)
	return mux
}

// handleHealth always answers {"ok": true}.
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
}

// handleUI serves the embedded UI for exactly "/"; because "/" is the mux
// catch-all pattern, every other unmatched path is answered with 404 here.
func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	w.Header().Set("Cache-Control", "no-cache")
	// A failed write means the client is gone; there is nothing left to
	// report to, so the error is deliberately discarded.
	_, _ = w.Write(uiHTML)
}

// handleStatus reports a summary of the config snapshot and, when a TX
// controller is attached, selected entries of its TXStats map.
func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
	s.mu.RLock()
	cfg := s.cfg
	tx := s.tx
	s.mu.RUnlock()

	status := map[string]any{
		"service":             "fm-rds-tx",
		"backend":             cfg.Backend.Kind,
		"frequencyMHz":        cfg.FM.FrequencyMHz,
		"stereoEnabled":       cfg.FM.StereoEnabled,
		"rdsEnabled":          cfg.RDS.Enabled,
		"preEmphasisTauUS":    cfg.FM.PreEmphasisTauUS,
		"limiterEnabled":      cfg.FM.LimiterEnabled,
		"fmModulationEnabled": cfg.FM.FMModulationEnabled,
	}

	if tx != nil {
		if stats := tx.TXStats(); stats != nil {
			// Stats key -> status key; only entries that are present are copied.
			forwarded := []struct{ from, to string }{
				{"runtimeIndicator", "runtimeIndicator"},
				{"runtimeAlert", "runtimeAlert"},
				{"queue", "queue"},
				{"state", "runtimeState"},
			}
			for _, f := range forwarded {
				if v, ok := stats[f.from]; ok {
					status[f.to] = v
				}
			}
		}
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(status)
}

// handleRuntime reports the Stats of whichever of driver, engine and audio
// stream are attached, under the keys "driver", "engine" and "audioStream".
func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
	s.mu.RLock()
	drv := s.drv
	tx := s.tx
	stream := s.streamSrc
	s.mu.RUnlock()

	result := map[string]any{}
	if drv != nil {
		result["driver"] = drv.Stats()
	}
	if tx != nil {
		result["engine"] = tx.TXStats()
	}
	if stream != nil {
		result["audioStream"] = stream.Stats()
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(result)
}

// handleRuntimeFaultReset (POST only) calls ResetFault on the TX controller.
// It answers 503 when no controller is attached and 409 when the reset fails.
func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	s.mu.RLock()
	tx := s.tx
	s.mu.RUnlock()

	if tx == nil {
		http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
		return
	}
	if err := tx.ResetFault(); err != nil {
		http.Error(w, err.Error(), http.StatusConflict)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
}

// handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
// it into the live audio ring buffer. Use with:
//
//	curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
//	ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	s.mu.RLock()
	stream := s.streamSrc
	s.mu.RUnlock()

	if stream == nil {
		http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
		return
	}

	// Read body in chunks and push to ring buffer. Data returned together
	// with an error (including io.EOF) is still forwarded before the error
	// is acted upon.
	buf := make([]byte, 32768)
	totalFrames := 0
	for {
		n, err := r.Body.Read(buf)
		if n > 0 {
			totalFrames += stream.WritePCM(buf[:n])
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]any{
		"ok":     true,
		"frames": totalFrames,
		"stats":  stream.Stats(),
	})
}

// handleTXStart (POST only) calls StartTX on the TX controller. It answers
// 503 when no controller is attached and 409 when StartTX fails.
func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	s.mu.RLock()
	tx := s.tx
	s.mu.RUnlock()

	if tx == nil {
		http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
		return
	}
	if err := tx.StartTX(); err != nil {
		http.Error(w, err.Error(), http.StatusConflict)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
}

// handleTXStop (POST only) calls StopTX on the TX controller. It answers
// 503 when no controller is attached and 500 when StopTX fails.
func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	s.mu.RLock()
	tx := s.tx
	s.mu.RUnlock()

	if tx == nil {
		http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
		return
	}
	if err := tx.StopTX(); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
}

// handleDryRun encodes the result of drypkg.Generate for the current config
// snapshot.
func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
	s.mu.RLock()
	cfg := s.cfg
	s.mu.RUnlock()

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
}

// handleConfig serves the config snapshot on GET and applies a ConfigPatch
// on POST.
//
// POST requires Content-Type application/json (415 otherwise) and a body of
// at most maxConfigBodyBytes (413 otherwise). Malformed JSON, a patch that
// fails validation and a patch rejected by the TX controller all answer 400.
// On success the response carries "live": true when a TX controller was
// attached and accepted the patch.
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		s.mu.RLock()
		cfg := s.cfg
		s.mu.RUnlock()

		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(cfg)

	case http.MethodPost:
		if !isJSONContentType(r) {
			http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
			return
		}

		r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
		var patch ConfigPatch
		if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
			statusCode := http.StatusBadRequest
			// NOTE(review): detected by message text; once the minimum
			// toolchain is known to be Go 1.19+, prefer errors.As with
			// *http.MaxBytesError.
			if strings.Contains(err.Error(), "http: request body too large") {
				statusCode = http.StatusRequestEntityTooLarge
			}
			http.Error(w, err.Error(), statusCode)
			return
		}

		live, err := s.applyConfigPatch(patch)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})

	default:
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}

// applyConfigPatch validates patch against the current snapshot, forwards
// the live-patchable subset to the TX controller (if any) and only then
// commits the new snapshot used by GET /config and /status.
//
// The write lock is held for the whole sequence so concurrent patches cannot
// interleave, and it is released via defer so that a panic in Validate or
// UpdateConfig cannot leave the mutex locked and stall every later request.
//
// It returns live == true when a TX controller was attached. On error the
// snapshot is left untouched.
func (s *Server) applyConfigPatch(patch ConfigPatch) (live bool, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	next := patch.applyTo(s.cfg)
	if err := next.Validate(); err != nil {
		return false, err
	}

	tx := s.tx
	if tx != nil {
		if err := tx.UpdateConfig(patch.livePatch()); err != nil {
			return false, err
		}
	}

	s.cfg = next
	return tx != nil, nil
}