|
- package control
-
- import (
- _ "embed"
- "encoding/json"
- "io"
- "net/http"
- "sync"
-
- "github.com/jan/fm-rds-tx/internal/audio"
- "github.com/jan/fm-rds-tx/internal/config"
- drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
- "github.com/jan/fm-rds-tx/internal/platform"
- )
-
- //go:embed ui.html
- var uiHTML []byte
-
- // TXController is an optional interface the Server uses to start/stop TX
- // and apply live config changes.
- type TXController interface {
- StartTX() error
- StopTX() error
- TXStats() map[string]any
- UpdateConfig(patch LivePatch) error
- }
-
// LivePatch is the engine-facing counterpart of ConfigPatch: it carries only
// the fields that are handed to TXController.UpdateConfig. Every field is a
// pointer; a nil pointer means "leave this setting unchanged".
type LivePatch struct {
	FrequencyMHz   *float64
	OutputDrive    *float64
	StereoEnabled  *bool
	PilotLevel     *float64
	RDSInjection   *float64
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64
	PS             *string
	RadioText      *string
}
-
- type Server struct {
- mu sync.RWMutex
- cfg config.Config
- tx TXController
- drv platform.SoapyDriver // optional, for runtime stats
- streamSrc *audio.StreamSource // optional, for live audio ingest
- }
-
// ConfigPatch is the JSON body accepted by POST /config. Each field is a
// pointer so that an absent key (nil) can be told apart from an explicit zero
// value; handleConfig copies only the non-nil fields into the configuration.
type ConfigPatch struct {
	FrequencyMHz     *float64 `json:"frequencyMHz,omitempty"`
	OutputDrive      *float64 `json:"outputDrive,omitempty"`
	StereoEnabled    *bool    `json:"stereoEnabled,omitempty"`
	PilotLevel       *float64 `json:"pilotLevel,omitempty"`
	RDSInjection     *float64 `json:"rdsInjection,omitempty"`
	RDSEnabled       *bool    `json:"rdsEnabled,omitempty"`
	ToneLeftHz       *float64 `json:"toneLeftHz,omitempty"`
	ToneRightHz      *float64 `json:"toneRightHz,omitempty"`
	ToneAmplitude    *float64 `json:"toneAmplitude,omitempty"`
	PS               *string  `json:"ps,omitempty"`
	RadioText        *string  `json:"radioText,omitempty"`
	PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
	LimiterEnabled   *bool    `json:"limiterEnabled,omitempty"`
	LimiterCeiling   *float64 `json:"limiterCeiling,omitempty"`
}
-
- func NewServer(cfg config.Config) *Server {
- return &Server{cfg: cfg}
- }
-
- func (s *Server) SetTXController(tx TXController) {
- s.mu.Lock()
- s.tx = tx
- s.mu.Unlock()
- }
-
- func (s *Server) SetDriver(drv platform.SoapyDriver) {
- s.mu.Lock()
- s.drv = drv
- s.mu.Unlock()
- }
-
- func (s *Server) SetStreamSource(src *audio.StreamSource) {
- s.mu.Lock()
- s.streamSrc = src
- s.mu.Unlock()
- }
-
- func (s *Server) Handler() http.Handler {
- mux := http.NewServeMux()
- mux.HandleFunc("/", s.handleUI)
- mux.HandleFunc("/healthz", s.handleHealth)
- mux.HandleFunc("/status", s.handleStatus)
- mux.HandleFunc("/dry-run", s.handleDryRun)
- mux.HandleFunc("/config", s.handleConfig)
- mux.HandleFunc("/runtime", s.handleRuntime)
- mux.HandleFunc("/tx/start", s.handleTXStart)
- mux.HandleFunc("/tx/stop", s.handleTXStop)
- mux.HandleFunc("/audio/stream", s.handleAudioStream)
- return mux
- }
-
- func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
- }
-
- func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
- if r.URL.Path != "/" {
- http.NotFound(w, r)
- return
- }
- w.Header().Set("Content-Type", "text/html; charset=utf-8")
- w.Header().Set("Cache-Control", "no-cache")
- w.Write(uiHTML)
- }
-
- func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
- s.mu.RLock()
- cfg := s.cfg
- s.mu.RUnlock()
-
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(map[string]any{
- "service": "fm-rds-tx",
- "backend": cfg.Backend.Kind,
- "frequencyMHz": cfg.FM.FrequencyMHz,
- "stereoEnabled": cfg.FM.StereoEnabled,
- "rdsEnabled": cfg.RDS.Enabled,
- "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
- "limiterEnabled": cfg.FM.LimiterEnabled,
- "fmModulationEnabled": cfg.FM.FMModulationEnabled,
- })
- }
-
- func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
- s.mu.RLock()
- drv := s.drv
- tx := s.tx
- stream := s.streamSrc
- s.mu.RUnlock()
-
- result := map[string]any{}
- if drv != nil {
- result["driver"] = drv.Stats()
- }
- if tx != nil {
- result["engine"] = tx.TXStats()
- }
- if stream != nil {
- result["audioStream"] = stream.Stats()
- }
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(result)
- }
-
- // handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
- // it into the live audio ring buffer. Use with:
- // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
- // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
- func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
- s.mu.RLock()
- stream := s.streamSrc
- s.mu.RUnlock()
-
- if stream == nil {
- http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
- return
- }
-
- // Read body in chunks and push to ring buffer
- buf := make([]byte, 32768)
- totalFrames := 0
- for {
- n, err := r.Body.Read(buf)
- if n > 0 {
- totalFrames += stream.WritePCM(buf[:n])
- }
- if err != nil {
- if err == io.EOF {
- break
- }
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- }
-
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(map[string]any{
- "ok": true,
- "frames": totalFrames,
- "stats": stream.Stats(),
- })
- }
-
- func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
- s.mu.RLock()
- tx := s.tx
- s.mu.RUnlock()
- if tx == nil {
- http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
- return
- }
- if err := tx.StartTX(); err != nil {
- http.Error(w, err.Error(), http.StatusConflict)
- return
- }
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
- }
-
- func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
- s.mu.RLock()
- tx := s.tx
- s.mu.RUnlock()
- if tx == nil {
- http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
- return
- }
- if err := tx.StopTX(); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
- }
-
- func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
- s.mu.RLock()
- cfg := s.cfg
- s.mu.RUnlock()
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
- }
-
- func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
- switch r.Method {
- case http.MethodGet:
- s.mu.RLock()
- cfg := s.cfg
- s.mu.RUnlock()
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(cfg)
- case http.MethodPost:
- var patch ConfigPatch
- if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
-
- // Update the server's config snapshot (for GET /config and /status)
- s.mu.Lock()
- next := s.cfg
- if patch.FrequencyMHz != nil { next.FM.FrequencyMHz = *patch.FrequencyMHz }
- if patch.OutputDrive != nil { next.FM.OutputDrive = *patch.OutputDrive }
- if patch.ToneLeftHz != nil { next.Audio.ToneLeftHz = *patch.ToneLeftHz }
- if patch.ToneRightHz != nil { next.Audio.ToneRightHz = *patch.ToneRightHz }
- if patch.ToneAmplitude != nil { next.Audio.ToneAmplitude = *patch.ToneAmplitude }
- if patch.PS != nil { next.RDS.PS = *patch.PS }
- if patch.RadioText != nil { next.RDS.RadioText = *patch.RadioText }
- if patch.PreEmphasisTauUS != nil { next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS }
- if patch.StereoEnabled != nil { next.FM.StereoEnabled = *patch.StereoEnabled }
- if patch.LimiterEnabled != nil { next.FM.LimiterEnabled = *patch.LimiterEnabled }
- if patch.LimiterCeiling != nil { next.FM.LimiterCeiling = *patch.LimiterCeiling }
- if patch.RDSEnabled != nil { next.RDS.Enabled = *patch.RDSEnabled }
- if patch.PilotLevel != nil { next.FM.PilotLevel = *patch.PilotLevel }
- if patch.RDSInjection != nil { next.FM.RDSInjection = *patch.RDSInjection }
- if err := next.Validate(); err != nil {
- s.mu.Unlock()
- http.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- lp := LivePatch{
- FrequencyMHz: patch.FrequencyMHz,
- OutputDrive: patch.OutputDrive,
- StereoEnabled: patch.StereoEnabled,
- PilotLevel: patch.PilotLevel,
- RDSInjection: patch.RDSInjection,
- RDSEnabled: patch.RDSEnabled,
- LimiterEnabled: patch.LimiterEnabled,
- LimiterCeiling: patch.LimiterCeiling,
- PS: patch.PS,
- RadioText: patch.RadioText,
- }
- tx := s.tx
- if tx != nil {
- if err := tx.UpdateConfig(lp); err != nil {
- s.mu.Unlock()
- http.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- }
- s.cfg = next
- live := tx != nil
- s.mu.Unlock()
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
- default:
- http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- }
- }
|