From 0dd415609703da2e24f163ced1fa15ca1d1f4b5b Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Fri, 3 Apr 2026 10:06:23 +0200 Subject: [PATCH] feat: add TX engine, runtime telemetry and explicit TX control --- cmd/fmrtx/main.go | 28 ++--- internal/app/engine.go | 179 +++++++++++++++++++++++++++++++ internal/app/engine_test.go | 81 ++++++++++++++ internal/app/sim.go | 88 +++++++-------- internal/control/control.go | 131 +++++++++++++++++----- internal/control/control_test.go | 69 +++++------- internal/platform/soapy.go | 133 +++++++++++++++++++---- 7 files changed, 553 insertions(+), 156 deletions(-) create mode 100644 internal/app/engine.go create mode 100644 internal/app/engine_test.go diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index 08d699b..5127fd8 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -25,43 +25,31 @@ func main() { flag.Parse() cfg, err := cfgpkg.Load(*configPath) - if err != nil { - log.Fatalf("load config: %v", err) - } + if err != nil { log.Fatalf("load config: %v", err) } if *printConfig { preemph := "off" - if cfg.FM.PreEmphasisUS > 0 { - preemph = fmt.Sprintf("%.0fµs", cfg.FM.PreEmphasisUS) - } - fmt.Printf("backend=%s freq=%.1fMHz stereo=%t rds=%t preemph=%s limiter=%t fmmod=%t deviation=±%.0fHz listen=%s\n", + if cfg.FM.PreEmphasisTauUS > 0 { preemph = fmt.Sprintf("%.0fµs", cfg.FM.PreEmphasisTauUS) } + fmt.Printf("backend=%s freq=%.1fMHz stereo=%t rds=%t preemph=%s limiter=%t fmmod=%t deviation=±%.0fHz deviceRate=%.0fHz listen=%s\n", cfg.Backend.Kind, cfg.FM.FrequencyMHz, cfg.FM.StereoEnabled, cfg.RDS.Enabled, preemph, cfg.FM.LimiterEnabled, cfg.FM.FMModulationEnabled, cfg.FM.MaxDeviationHz, - cfg.Control.ListenAddress) + cfg.EffectiveDeviceRate(), cfg.Control.ListenAddress) return } - if *dryRun { frame := drypkg.Generate(cfg) - if err := drypkg.WriteJSON(*dryOutput, frame); err != nil { - log.Fatalf("dry-run failed: %v", err) - } - if *dryOutput != "" && *dryOutput != "-" { - fmt.Fprintf(os.Stderr, "dry run frame written to %s\n", *dryOutput) - } + if err := drypkg.WriteJSON(*dryOutput, frame); err != nil { log.Fatalf("dry-run: %v", err) } + if *dryOutput != "" && *dryOutput != "-" { fmt.Fprintf(os.Stderr, "dry run frame written to %s\n", *dryOutput) } return } - if *simulate { summary, err := apppkg.RunSimulatedTransmit(cfg, *simulateOutput, *simulateDuration) - if err != nil { - log.Fatalf("simulate-tx failed: %v", err) - } + if err != nil { log.Fatalf("simulate-tx: %v", err) } fmt.Println(summary) return } srv := ctrlpkg.NewServer(cfg) - log.Printf("fm-rds-tx listening on %s", cfg.Control.ListenAddress) + log.Printf("fm-rds-tx listening on %s (TX default: off, use POST /tx/start)", cfg.Control.ListenAddress) log.Fatal(http.ListenAndServe(cfg.Control.ListenAddress, srv.Handler())) } diff --git a/internal/app/engine.go b/internal/app/engine.go new file mode 100644 index 0000000..2b8e220 --- /dev/null +++ b/internal/app/engine.go @@ -0,0 +1,179 @@ +package app + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + cfgpkg "github.com/jan/fm-rds-tx/internal/config" + offpkg "github.com/jan/fm-rds-tx/internal/offline" + "github.com/jan/fm-rds-tx/internal/platform" +) + +// EngineState represents the current state of the TX engine. +type EngineState int + +const ( + EngineIdle EngineState = iota + EngineRunning + EngineStopping +) + +func (s EngineState) String() string { + switch s { + case EngineIdle: + return "idle" + case EngineRunning: + return "running" + case EngineStopping: + return "stopping" + default: + return "unknown" + } +} + +// EngineStats exposes runtime telemetry from the engine. +type EngineStats struct { + State string `json:"state"` + ChunksProduced uint64 `json:"chunksProduced"` + TotalSamples uint64 `json:"totalSamples"` + Underruns uint64 `json:"underruns"` + LastError string `json:"lastError,omitempty"` + UptimeSeconds float64 `json:"uptimeSeconds"` +} + +// Engine is the continuous TX loop that produces chunks of composite/IQ +// samples and feeds them to a backend driver. +type Engine struct { + cfg cfgpkg.Config + driver platform.SoapyDriver + generator *offpkg.Generator + chunkDuration time.Duration + + mu sync.Mutex + state EngineState + cancel context.CancelFunc + startedAt time.Time + + chunksProduced atomic.Uint64 + totalSamples atomic.Uint64 + underruns atomic.Uint64 + lastError atomic.Value // string +} + +// NewEngine creates a TX engine. Default chunk duration is 50ms. +func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine { + return &Engine{ + cfg: cfg, + driver: driver, + generator: offpkg.NewGenerator(cfg), + chunkDuration: 50 * time.Millisecond, + state: EngineIdle, + } +} + +// SetChunkDuration changes the generation chunk size. Must be called before Start. +func (e *Engine) SetChunkDuration(d time.Duration) { + e.chunkDuration = d +} + +// Start begins continuous transmission. TX is NOT started automatically. +func (e *Engine) Start(ctx context.Context) error { + e.mu.Lock() + if e.state != EngineIdle { + e.mu.Unlock() + return fmt.Errorf("engine already in state %s", e.state) + } + + if err := e.driver.Start(ctx); err != nil { + e.mu.Unlock() + return fmt.Errorf("driver start: %w", err) + } + + runCtx, cancel := context.WithCancel(ctx) + e.cancel = cancel + e.state = EngineRunning + e.startedAt = time.Now() + e.mu.Unlock() + + go e.run(runCtx) + return nil +} + +// Stop gracefully stops the TX engine. +func (e *Engine) Stop(ctx context.Context) error { + e.mu.Lock() + if e.state != EngineRunning { + e.mu.Unlock() + return nil + } + e.state = EngineStopping + e.cancel() + e.mu.Unlock() + + // Give the run loop time to drain + time.Sleep(e.chunkDuration * 2) + + if err := e.driver.Flush(ctx); err != nil { + return err + } + if err := e.driver.Stop(ctx); err != nil { + return err + } + + e.mu.Lock() + e.state = EngineIdle + e.mu.Unlock() + return nil +} + +// Stats returns current engine telemetry. +func (e *Engine) Stats() EngineStats { + e.mu.Lock() + state := e.state + startedAt := e.startedAt + e.mu.Unlock() + + var uptime float64 + if state == EngineRunning { + uptime = time.Since(startedAt).Seconds() + } + + errVal, _ := e.lastError.Load().(string) + + return EngineStats{ + State: state.String(), + ChunksProduced: e.chunksProduced.Load(), + TotalSamples: e.totalSamples.Load(), + Underruns: e.underruns.Load(), + LastError: errVal, + UptimeSeconds: uptime, + } +} + +func (e *Engine) run(ctx context.Context) { + ticker := time.NewTicker(e.chunkDuration) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + frame := e.generator.GenerateFrame(e.chunkDuration) + n, err := e.driver.Write(ctx, frame) + if err != nil { + if ctx.Err() != nil { + return // clean shutdown + } + e.lastError.Store(err.Error()) + e.underruns.Add(1) + continue + } + e.chunksProduced.Add(1) + e.totalSamples.Add(uint64(n)) + } + } +} diff --git a/internal/app/engine_test.go b/internal/app/engine_test.go new file mode 100644 index 0000000..b3e0259 --- /dev/null +++ b/internal/app/engine_test.go @@ -0,0 +1,81 @@ +package app + +import ( + "context" + "testing" + "time" + + cfgpkg "github.com/jan/fm-rds-tx/internal/config" + "github.com/jan/fm-rds-tx/internal/platform" +) + +func TestEngineContinuousRun(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + eng.SetChunkDuration(10 * time.Millisecond) + + ctx := context.Background() + if err := eng.Start(ctx); err != nil { + t.Fatalf("start: %v", err) + } + + // Let it run for 200ms + time.Sleep(200 * time.Millisecond) + + stats := eng.Stats() + if stats.State != "running" { + t.Fatalf("expected running, got %s", stats.State) + } + if stats.ChunksProduced < 5 { + t.Fatalf("expected at least 5 chunks, got %d", stats.ChunksProduced) + } + if stats.TotalSamples == 0 { + t.Fatal("expected non-zero samples") + } + + if err := eng.Stop(ctx); err != nil { + t.Fatalf("stop: %v", err) + } + + stats = eng.Stats() + if stats.State != "idle" { + t.Fatalf("expected idle after stop, got %s", stats.State) + } +} + +func TestEngineDoubleStartFails(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + + ctx := context.Background() + if err := eng.Start(ctx); err != nil { + t.Fatalf("first start: %v", err) + } + defer eng.Stop(ctx) + + if err := eng.Start(ctx); err == nil { + t.Fatal("expected error on double start") + } +} + +func TestEngineDriverStats(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + eng.SetChunkDuration(10 * time.Millisecond) + + ctx := context.Background() + _ = eng.Start(ctx) + time.Sleep(100 * time.Millisecond) + _ = eng.Stop(ctx) + + driverStats := driver.Stats() + if driverStats.SamplesWritten == 0 { + t.Fatal("expected driver to have written samples") + } + if driverStats.FramesWritten == 0 { + t.Fatal("expected driver to have written frames") + } +} diff --git a/internal/app/sim.go b/internal/app/sim.go index a4952da..3e70d11 100644 --- a/internal/app/sim.go +++ b/internal/app/sim.go @@ -1,63 +1,53 @@ package app import ( - "context" - "encoding/binary" - "fmt" - "path/filepath" - "time" + "context" + "encoding/binary" + "fmt" + "path/filepath" + "time" - cfgpkg "github.com/jan/fm-rds-tx/internal/config" - offpkg "github.com/jan/fm-rds-tx/internal/offline" - "github.com/jan/fm-rds-tx/internal/output" - "github.com/jan/fm-rds-tx/internal/platform" + cfgpkg "github.com/jan/fm-rds-tx/internal/config" + offpkg "github.com/jan/fm-rds-tx/internal/offline" + "github.com/jan/fm-rds-tx/internal/output" + "github.com/jan/fm-rds-tx/internal/platform" ) func RunSimulatedTransmit(cfg cfgpkg.Config, outPath string, duration time.Duration) (string, error) { - if outPath == "" { - outPath = filepath.Join("build", "sim", "simulated-soapy.iqf32") - } + if outPath == "" { + outPath = filepath.Join("build", "sim", "simulated-soapy.iqf32") + } + fileBackend, err := output.NewFileBackend(outPath, binary.LittleEndian, output.BackendInfo{ + Name: "simulated-soapy-file", Description: "simulated soapy sink to file", + }) + if err != nil { return "", err } + defer fileBackend.Close(context.Background()) - fileBackend, err := output.NewFileBackend(outPath, binary.LittleEndian, output.BackendInfo{ - Name: "simulated-soapy-file", - Description: "simulated soapy sink to file", - }) - if err != nil { - return "", err - } - defer fileBackend.Close(context.Background()) + soapyCfg := platform.SoapyConfig{ + BackendConfig: output.BackendConfig{ + SampleRateHz: float64(cfg.FM.CompositeRateHz), Channels: 2, + IQLevel: float32(cfg.FM.OutputDrive), + }, + Driver: "simulated", Device: cfg.Backend.Device, + CenterFreqHz: cfg.FM.FrequencyMHz * 1_000_000, + Simulated: true, SimulationPath: outPath, + } + driver := platform.NewSimulatedDriver(fileBackend) + backend := platform.NewSoapyBackend(soapyCfg, driver) + if err := backend.Configure(context.Background(), soapyCfg.BackendConfig); err != nil { return "", err } + if err := driver.Start(context.Background()); err != nil { return "", err } - soapyCfg := platform.SoapyConfig{ - BackendConfig: output.BackendConfig{ - SampleRateHz: float64(cfg.FM.CompositeRateHz), - Channels: 2, - IQLevel: float32(cfg.FM.OutputDrive), - }, - Driver: "simulated", - Device: cfg.Backend.Device, - CenterFreqHz: cfg.FM.FrequencyMHz * 1_000_000, - Simulated: true, - SimulationPath: outPath, - } - backend := platform.NewSoapyBackend(soapyCfg, platform.NewSimulatedDriver(fileBackend)) - if err := backend.Configure(context.Background(), soapyCfg.BackendConfig); err != nil { - return "", err - } + gen := offpkg.NewGenerator(cfg) + frame := gen.GenerateFrame(duration) + if _, err := backend.Write(context.Background(), frame); err != nil { return "", err } + if err := backend.Flush(context.Background()); err != nil { return "", err } + _ = driver.Stop(context.Background()) - gen := offpkg.NewGenerator(cfg) - frame := gen.GenerateFrame(duration) - if _, err := backend.Write(context.Background(), frame); err != nil { - return "", err - } - if err := backend.Flush(context.Background()); err != nil { - return "", err - } - return fmt.Sprintf("simulated transmit: backend=%s output=%s duration=%s input=%s freq=%.1fMHz rate=%d", backend.Info().Name, outPath, duration, inputLabel(cfg), cfg.FM.FrequencyMHz, cfg.FM.CompositeRateHz), nil + return fmt.Sprintf("simulated transmit: backend=%s output=%s duration=%s input=%s freq=%.1fMHz rate=%d", + backend.Info().Name, outPath, duration, inputLabel(cfg), cfg.FM.FrequencyMHz, cfg.FM.CompositeRateHz), nil } func inputLabel(cfg cfgpkg.Config) string { - if cfg.Audio.InputPath != "" { - return cfg.Audio.InputPath - } - return "tones" + if cfg.Audio.InputPath != "" { return cfg.Audio.InputPath } + return "tones" } diff --git a/internal/control/control.go b/internal/control/control.go index 5d410ee..0005cd5 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -7,27 +7,51 @@ import ( "github.com/jan/fm-rds-tx/internal/config" drypkg "github.com/jan/fm-rds-tx/internal/dryrun" + "github.com/jan/fm-rds-tx/internal/platform" ) +// TXController is an optional interface the Server uses to start/stop TX. +type TXController interface { + StartTX() error + StopTX() error + TXStats() map[string]any +} + type Server struct { - mu sync.RWMutex - cfg config.Config + mu sync.RWMutex + cfg config.Config + tx TXController + drv platform.SoapyDriver // optional, for runtime stats } type ConfigPatch struct { - FrequencyMHz *float64 `json:"frequencyMHz,omitempty"` - OutputDrive *float64 `json:"outputDrive,omitempty"` - ToneLeftHz *float64 `json:"toneLeftHz,omitempty"` - ToneRightHz *float64 `json:"toneRightHz,omitempty"` - ToneAmplitude *float64 `json:"toneAmplitude,omitempty"` - PS *string `json:"ps,omitempty"` - RadioText *string `json:"radioText,omitempty"` - PreEmphasisUS *float64 `json:"preEmphasisUS,omitempty"` - LimiterEnabled *bool `json:"limiterEnabled,omitempty"` - LimiterCeiling *float64 `json:"limiterCeiling,omitempty"` + FrequencyMHz *float64 `json:"frequencyMHz,omitempty"` + OutputDrive *float64 `json:"outputDrive,omitempty"` + ToneLeftHz *float64 `json:"toneLeftHz,omitempty"` + ToneRightHz *float64 `json:"toneRightHz,omitempty"` + ToneAmplitude *float64 `json:"toneAmplitude,omitempty"` + PS *string `json:"ps,omitempty"` + RadioText *string `json:"radioText,omitempty"` + PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"` + LimiterEnabled *bool `json:"limiterEnabled,omitempty"` + LimiterCeiling *float64 `json:"limiterCeiling,omitempty"` } -func NewServer(cfg config.Config) *Server { return &Server{cfg: cfg} } +func NewServer(cfg config.Config) *Server { + return &Server{cfg: cfg} +} + +func (s *Server) SetTXController(tx TXController) { + s.mu.Lock() + s.tx = tx + s.mu.Unlock() +} + +func (s *Server) SetDriver(drv platform.SoapyDriver) { + s.mu.Lock() + s.drv = drv + s.mu.Unlock() +} func (s *Server) Handler() http.Handler { mux := http.NewServeMux() @@ -35,6 +59,9 @@ func (s *Server) Handler() http.Handler { mux.HandleFunc("/status", s.handleStatus) mux.HandleFunc("/dry-run", s.handleDryRun) mux.HandleFunc("/config", s.handleConfig) + mux.HandleFunc("/runtime", s.handleRuntime) + mux.HandleFunc("/tx/start", s.handleTXStart) + mux.HandleFunc("/tx/stop", s.handleTXStop) return mux } @@ -50,24 +77,78 @@ func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{ - "service": "fm-rds-tx", - "backend": cfg.Backend.Kind, - "frequencyMHz": cfg.FM.FrequencyMHz, - "stereoEnabled": cfg.FM.StereoEnabled, - "rdsEnabled": cfg.RDS.Enabled, - "toneLeftHz": cfg.Audio.ToneLeftHz, - "toneRightHz": cfg.Audio.ToneRightHz, - "preEmphasisUS": cfg.FM.PreEmphasisUS, - "limiterEnabled": cfg.FM.LimiterEnabled, + "service": "fm-rds-tx", + "backend": cfg.Backend.Kind, + "frequencyMHz": cfg.FM.FrequencyMHz, + "stereoEnabled": cfg.FM.StereoEnabled, + "rdsEnabled": cfg.RDS.Enabled, + "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS, + "limiterEnabled": cfg.FM.LimiterEnabled, "fmModulationEnabled": cfg.FM.FMModulationEnabled, }) } +func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) { + s.mu.RLock() + drv := s.drv + tx := s.tx + s.mu.RUnlock() + + result := map[string]any{} + if drv != nil { + result["driver"] = drv.Stats() + } + if tx != nil { + result["engine"] = tx.TXStats() + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(result) +} + +func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + s.mu.RLock() + tx := s.tx + s.mu.RUnlock() + if tx == nil { + http.Error(w, "tx controller not available", http.StatusServiceUnavailable) + return + } + if err := tx.StartTX(); err != nil { + http.Error(w, err.Error(), http.StatusConflict) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"}) +} + +func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + s.mu.RLock() + tx := s.tx + s.mu.RUnlock() + if tx == nil { + http.Error(w, "tx controller not available", http.StatusServiceUnavailable) + return + } + if err := tx.StopTX(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"}) +} + func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) { s.mu.RLock() cfg := s.cfg s.mu.RUnlock() - w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg)) } @@ -110,8 +191,8 @@ func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { if patch.RadioText != nil { next.RDS.RadioText = *patch.RadioText } - if patch.PreEmphasisUS != nil { - next.FM.PreEmphasisUS = *patch.PreEmphasisUS + if patch.PreEmphasisTauUS != nil { + next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS } if patch.LimiterEnabled != nil { next.FM.LimiterEnabled = *patch.LimiterEnabled diff --git a/internal/control/control_test.go b/internal/control/control_test.go index 5f7049e..6172102 100644 --- a/internal/control/control_test.go +++ b/internal/control/control_test.go @@ -12,65 +12,50 @@ import ( func TestHealthz(t *testing.T) { srv := NewServer(cfgpkg.Default()) - req := httptest.NewRequest(http.MethodGet, "/healthz", nil) rec := httptest.NewRecorder() - srv.Handler().ServeHTTP(rec, req) - if rec.Code != http.StatusOK { - t.Fatalf("unexpected status: %d", rec.Code) - } + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil)) + if rec.Code != 200 { t.Fatalf("status: %d", rec.Code) } } func TestStatus(t *testing.T) { srv := NewServer(cfgpkg.Default()) - req := httptest.NewRequest(http.MethodGet, "/status", nil) rec := httptest.NewRecorder() - srv.Handler().ServeHTTP(rec, req) - if rec.Code != http.StatusOK { - t.Fatalf("unexpected status: %d", rec.Code) - } + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/status", nil)) + if rec.Code != 200 { t.Fatalf("status: %d", rec.Code) } var body map[string]any - if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { - t.Fatalf("decode body: %v", err) - } - if body["service"] != "fm-rds-tx" { - t.Fatalf("unexpected service: %v", body["service"]) - } - if _, ok := body["preEmphasisUS"]; !ok { - t.Fatal("expected preEmphasisUS in status") - } + json.Unmarshal(rec.Body.Bytes(), &body) + if body["service"] != "fm-rds-tx" { t.Fatal("missing service") } + if _, ok := body["preEmphasisTauUS"]; !ok { t.Fatal("missing preEmphasisTauUS") } } func TestDryRunEndpoint(t *testing.T) { srv := NewServer(cfgpkg.Default()) - req := httptest.NewRequest(http.MethodGet, "/dry-run", nil) rec := httptest.NewRecorder() - srv.Handler().ServeHTTP(rec, req) - if rec.Code != http.StatusOK { - t.Fatalf("unexpected status: %d", rec.Code) - } + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/dry-run", nil)) + if rec.Code != 200 { t.Fatalf("status: %d", rec.Code) } var body map[string]any - if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { - t.Fatalf("decode body: %v", err) - } - if body["mode"] != "dry-run" { - t.Fatalf("unexpected mode: %v", body["mode"]) - } + json.Unmarshal(rec.Body.Bytes(), &body) + if body["mode"] != "dry-run" { t.Fatal("wrong mode") } } func TestConfigPatch(t *testing.T) { srv := NewServer(cfgpkg.Default()) - body := []byte(`{"toneLeftHz":900,"radioText":"hello world","preEmphasisUS":75}`) - req := httptest.NewRequest(http.MethodPost, "/config", bytes.NewReader(body)) + body := []byte(`{"toneLeftHz":900,"radioText":"hello world","preEmphasisTauUS":75}`) rec := httptest.NewRecorder() - srv.Handler().ServeHTTP(rec, req) - if rec.Code != http.StatusOK { - t.Fatalf("unexpected status: %d body=%s", rec.Code, rec.Body.String()) - } + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/config", bytes.NewReader(body))) + if rec.Code != 200 { t.Fatalf("status: %d body=%s", rec.Code, rec.Body.String()) } +} - getReq := httptest.NewRequest(http.MethodGet, "/config", nil) - getRec := httptest.NewRecorder() - srv.Handler().ServeHTTP(getRec, getReq) - if getRec.Code != http.StatusOK { - t.Fatalf("unexpected status: %d", getRec.Code) - } +func TestRuntimeWithoutDriver(t *testing.T) { + srv := NewServer(cfgpkg.Default()) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/runtime", nil)) + if rec.Code != 200 { t.Fatalf("status: %d", rec.Code) } +} + +func TestTXStartWithoutController(t *testing.T) { + srv := NewServer(cfgpkg.Default()) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/tx/start", nil)) + if rec.Code != http.StatusServiceUnavailable { t.Fatalf("expected 503, got %d", rec.Code) } } diff --git a/internal/platform/soapy.go b/internal/platform/soapy.go index 5255c6e..81e94b6 100644 --- a/internal/platform/soapy.go +++ b/internal/platform/soapy.go @@ -4,11 +4,44 @@ import ( "context" "fmt" "sync" + "sync/atomic" + "time" "github.com/jan/fm-rds-tx/internal/output" ) -// SoapyConfig exposes SoapySDR-specific knobs that drive hardware or simulated drivers. +// ----------------------------------------------------------------------- +// Device capabilities and runtime stats +// ----------------------------------------------------------------------- + +// DeviceCaps describes what a hardware device supports. +type DeviceCaps struct { + MinSampleRate float64 + MaxSampleRate float64 + SupportedSampleRates []float64 + HasGain bool + GainMinDB float64 + GainMaxDB float64 + Channels []int +} + +// RuntimeStats exposes live telemetry from the backend. +type RuntimeStats struct { + TXEnabled bool `json:"txEnabled"` + StreamActive bool `json:"streamActive"` + FramesWritten uint64 `json:"framesWritten"` + SamplesWritten uint64 `json:"samplesWritten"` + Underruns uint64 `json:"underruns"` + Overruns uint64 `json:"overruns"` + LastError string `json:"lastError,omitempty"` + LastErrorAt string `json:"lastErrorAt,omitempty"` + EffectiveRate float64 `json:"effectiveSampleRateHz"` +} + +// ----------------------------------------------------------------------- +// SoapyConfig +// ----------------------------------------------------------------------- + type SoapyConfig struct { output.BackendConfig Driver string @@ -21,16 +54,26 @@ type SoapyConfig struct { SimulationPath string } -// SoapyDriver is the low-level contract for talking to Soapy-style devices. +// ----------------------------------------------------------------------- +// SoapyDriver interface — extended for real HW +// ----------------------------------------------------------------------- + type SoapyDriver interface { Name() string Configure(ctx context.Context, cfg SoapyConfig) error + Capabilities(ctx context.Context) (DeviceCaps, error) + Start(ctx context.Context) error Write(ctx context.Context, frame *output.CompositeFrame) (int, error) + Stop(ctx context.Context) error Flush(ctx context.Context) error Close(ctx context.Context) error + Stats() RuntimeStats } -// SoapyBackend wraps a driver and exposes the output.Backend interface. +// ----------------------------------------------------------------------- +// SoapyBackend wraps driver and exposes output.Backend +// ----------------------------------------------------------------------- + type SoapyBackend struct { mu sync.Mutex driver SoapyDriver @@ -38,7 +81,6 @@ type SoapyBackend struct { info output.BackendInfo } -// NewSoapyBackend returns an output-aware backend that drives the provided driver. func NewSoapyBackend(cfg SoapyConfig, driver SoapyDriver) *SoapyBackend { if driver == nil { driver = NewSimulatedDriver(nil) @@ -55,7 +97,6 @@ func NewSoapyBackend(cfg SoapyConfig, driver SoapyDriver) *SoapyBackend { return &SoapyBackend{driver: driver, cfg: cfg, info: info} } -// Configure propagates the latest backend config to the driver. func (sb *SoapyBackend) Configure(ctx context.Context, cfg output.BackendConfig) error { sb.mu.Lock() sb.cfg.BackendConfig = cfg @@ -63,36 +104,43 @@ func (sb *SoapyBackend) Configure(ctx context.Context, cfg output.BackendConfig) return sb.driver.Configure(ctx, sb.cfg) } -// Write delegates to the driver. func (sb *SoapyBackend) Write(ctx context.Context, frame *output.CompositeFrame) (int, error) { return sb.driver.Write(ctx, frame) } -// Flush asks the driver to drain any buffers. func (sb *SoapyBackend) Flush(ctx context.Context) error { return sb.driver.Flush(ctx) } -// Close shuts down the driver cleanly. func (sb *SoapyBackend) Close(ctx context.Context) error { return sb.driver.Close(ctx) } -// Info reports the configured backend metadata. func (sb *SoapyBackend) Info() output.BackendInfo { sb.mu.Lock() defer sb.mu.Unlock() return sb.info } -// SimulatedDriver keeps samples in a downstream backend for testing without hardware. +func (sb *SoapyBackend) Driver() SoapyDriver { + return sb.driver +} + +// ----------------------------------------------------------------------- +// SimulatedDriver — implements full SoapyDriver interface +// ----------------------------------------------------------------------- + type SimulatedDriver struct { - mu sync.Mutex - fallback output.Backend - cfg SoapyConfig + mu sync.Mutex + fallback output.Backend + cfg SoapyConfig + started bool + framesWritten atomic.Uint64 + samplesWritten atomic.Uint64 + lastError string + lastErrorAt string } -// NewSimulatedDriver uses the provided backend or falls back to an in-memory dummy. func NewSimulatedDriver(writer output.Backend) *SimulatedDriver { if writer == nil { writer = output.NewDummyBackend("simulated-soapy") @@ -100,12 +148,10 @@ func NewSimulatedDriver(writer output.Backend) *SimulatedDriver { return &SimulatedDriver{fallback: writer} } -// Name returns the runtime label of the simulated driver. func (sd *SimulatedDriver) Name() string { return sd.fallback.Info().Name } -// Configure pushes the SoapyConfig into the fallback backend. func (sd *SimulatedDriver) Configure(ctx context.Context, cfg SoapyConfig) error { sd.mu.Lock() sd.cfg = cfg @@ -113,17 +159,64 @@ func (sd *SimulatedDriver) Configure(ctx context.Context, cfg SoapyConfig) error return sd.fallback.Configure(ctx, cfg.BackendConfig) } -// Write simply plants the frame into the fallback pipeline. +func (sd *SimulatedDriver) Capabilities(_ context.Context) (DeviceCaps, error) { + return DeviceCaps{ + MinSampleRate: 48000, + MaxSampleRate: 2400000, + HasGain: true, + GainMinDB: 0, + GainMaxDB: 47, + Channels: []int{0}, + }, nil +} + +func (sd *SimulatedDriver) Start(_ context.Context) error { + sd.mu.Lock() + defer sd.mu.Unlock() + sd.started = true + return nil +} + func (sd *SimulatedDriver) Write(ctx context.Context, frame *output.CompositeFrame) (int, error) { - return sd.fallback.Write(ctx, frame) + n, err := sd.fallback.Write(ctx, frame) + if err != nil { + sd.mu.Lock() + sd.lastError = err.Error() + sd.lastErrorAt = time.Now().UTC().Format(time.RFC3339) + sd.mu.Unlock() + } + if n > 0 { + sd.framesWritten.Add(1) + sd.samplesWritten.Add(uint64(n)) + } + return n, err +} + +func (sd *SimulatedDriver) Stop(_ context.Context) error { + sd.mu.Lock() + defer sd.mu.Unlock() + sd.started = false + return nil } -// Flush is delegated. func (sd *SimulatedDriver) Flush(ctx context.Context) error { return sd.fallback.Flush(ctx) } -// Close finalizes the fallback backend. func (sd *SimulatedDriver) Close(ctx context.Context) error { return sd.fallback.Close(ctx) } + +func (sd *SimulatedDriver) Stats() RuntimeStats { + sd.mu.Lock() + defer sd.mu.Unlock() + return RuntimeStats{ + TXEnabled: sd.started, + StreamActive: sd.started, + FramesWritten: sd.framesWritten.Load(), + SamplesWritten: sd.samplesWritten.Load(), + LastError: sd.lastError, + LastErrorAt: sd.lastErrorAt, + EffectiveRate: sd.cfg.SampleRateHz, + } +}