diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index 3f234d6..cfa71f8 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -216,3 +216,17 @@ func (b *txBridge) TXStats() map[string]any { "lastError": s.LastError, "uptimeSeconds": s.UptimeSeconds, } } +func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error { + return b.engine.UpdateConfig(apppkg.LiveConfigUpdate{ + FrequencyMHz: lp.FrequencyMHz, + OutputDrive: lp.OutputDrive, + StereoEnabled: lp.StereoEnabled, + PilotLevel: lp.PilotLevel, + RDSInjection: lp.RDSInjection, + RDSEnabled: lp.RDSEnabled, + LimiterEnabled: lp.LimiterEnabled, + LimiterCeiling: lp.LimiterCeiling, + PS: lp.PS, + RadioText: lp.RadioText, + }) +} diff --git a/docs/API.md b/docs/API.md new file mode 100644 index 0000000..8e3b096 --- /dev/null +++ b/docs/API.md @@ -0,0 +1,220 @@ +# fm-rds-tx HTTP Control API + +Base URL: `http://{listenAddress}` (default `127.0.0.1:8088`) + +--- + +## Endpoints + +### `GET /healthz` + +Health check. + +**Response:** +```json +{"ok": true} +``` + +--- + +### `GET /status` + +Current transmitter status (read-only snapshot). + +**Response:** +```json +{ + "service": "fm-rds-tx", + "backend": "pluto", + "frequencyMHz": 100.0, + "stereoEnabled": true, + "rdsEnabled": true, + "preEmphasisTauUS": 50, + "limiterEnabled": true, + "fmModulationEnabled": true +} +``` + +--- + +### `GET /runtime` + +Live engine and driver telemetry. Only populated when TX is active. + +**Response:** +```json +{ + "engine": { + "state": "running", + "chunksProduced": 12345, + "totalSamples": 1408950000, + "underruns": 0, + "lastError": "", + "uptimeSeconds": 3614.2 + }, + "driver": { + "txEnabled": true, + "streamActive": true, + "framesWritten": 12345, + "samplesWritten": 1408950000, + "underruns": 0, + "effectiveSampleRateHz": 2280000 + } +} +``` + +--- + +### `GET /config` + +Full current configuration (all fields, including non-patchable). + +**Response:** Complete `Config` JSON object. + +--- + +### `POST /config` + +**Live parameter update.** Changes are applied to the running TX engine immediately — no restart required. Only include fields you want to change (PATCH semantics). + +**Request body:** JSON with any subset of patchable fields. + +**Response:** +```json +{"ok": true, "live": true} +``` + +`"live": true` = changes were forwarded to the running engine. +`"live": false` = engine not active, changes saved for next start. + +#### Patchable fields — DSP (applied within ~50ms) + +| Field | Type | Range | Description | +|---|---|---|---| +| `frequencyMHz` | float | 65–110 | TX center frequency. Tunes hardware LO live. | +| `outputDrive` | float | 0–3 | Composite output level multiplier. | +| `stereoEnabled` | bool | | Enable/disable stereo (pilot + 38kHz subcarrier). | +| `pilotLevel` | float | 0–0.2 | 19 kHz pilot injection level. | +| `rdsInjection` | float | 0–0.15 | 57 kHz RDS subcarrier injection level. | +| `rdsEnabled` | bool | | Enable/disable RDS subcarrier. | +| `limiterEnabled` | bool | | Enable/disable MPX peak limiter. | +| `limiterCeiling` | float | 0–2 | Limiter ceiling (max composite amplitude). | + +#### Patchable fields — RDS text (applied within ~88ms) + +| Field | Type | Max length | Description | +|---|---|---|---| +| `ps` | string | 8 chars | Program Service name (station name on receiver display). | +| `radioText` | string | 64 chars | RadioText message (scrolling text on receiver). | + +When `radioText` is updated, the RDS A/B flag toggles automatically per spec, signaling receivers to refresh their display. + +#### Patchable fields — other (saved, not live-applied) + +| Field | Type | Description | +|---|---|---| +| `toneLeftHz` | float | Left tone frequency (test generator). | +| `toneRightHz` | float | Right tone frequency (test generator). | +| `toneAmplitude` | float | Test tone amplitude (0–1). | +| `preEmphasisTauUS` | float | Pre-emphasis time constant. **Requires restart.** | + +#### Examples + +```bash +# Tune to 99.5 MHz +curl -X POST localhost:8088/config -d '{"frequencyMHz": 99.5}' + +# Switch to mono +curl -X POST localhost:8088/config -d '{"stereoEnabled": false}' + +# Update now-playing text +curl -X POST localhost:8088/config \ + -d '{"ps": "MYRADIO", "radioText": "Artist - Song Title"}' + +# Reduce power + disable limiter +curl -X POST localhost:8088/config \ + -d '{"outputDrive": 0.8, "limiterEnabled": false}' + +# Full update +curl -X POST localhost:8088/config -d '{ + "frequencyMHz": 101.3, + "outputDrive": 2.2, + "stereoEnabled": true, + "pilotLevel": 0.041, + "rdsInjection": 0.021, + "rdsEnabled": true, + "limiterEnabled": true, + "limiterCeiling": 1.0, + "ps": "PIRATE", + "radioText": "Broadcasting from the attic" +}' +``` + +#### Error handling + +Invalid values return `400 Bad Request` with a descriptive message: +```bash +curl -X POST localhost:8088/config -d '{"frequencyMHz": 200}' +# → 400: frequencyMHz out of range (65-110) +``` + +--- + +### `POST /tx/start` + +Start transmission. Requires `--tx` mode with hardware. + +**Response:** +```json +{"ok": true, "action": "started"} +``` + +**Errors:** +- `405` if not POST +- `503` if no TX controller (not in `--tx` mode) +- `409` if already running + +--- + +### `POST /tx/stop` + +Stop transmission. + +**Response:** +```json +{"ok": true, "action": "stopped"} +``` + +--- + +### `GET /dry-run` + +Generate a synthetic frame summary without hardware. Useful for config verification. + +**Response:** `FrameSummary` JSON with mode, rates, source info, preview samples. + +--- + +## Live update architecture + +All live updates are **lock-free** in the DSP path: + +| What | Mechanism | Latency | +|---|---|---| +| DSP params | `atomic.Pointer[LiveParams]` loaded once per chunk | ≤ 50ms | +| RDS text | `atomic.Value` in encoder, read at group boundary | ≤ 88ms | +| TX frequency | `atomic.Pointer` in engine, `driver.Tune()` between chunks | ≤ 50ms | + +No mutex, no channel, no allocation in the real-time path. The HTTP goroutine writes atomics, the DSP goroutine reads them. + +## Parameters that require restart + +These cannot be hot-reloaded (they affect DSP pipeline structure): + +- `compositeRateHz` — changes sample rate of entire DSP chain +- `deviceSampleRateHz` — changes hardware rate / upsampler ratio +- `maxDeviationHz` — changes FM modulator scaling +- `preEmphasisTauUS` — changes filter coefficients +- `rds.pi` / `rds.pty` — rarely change, baked into encoder init +- `audio.inputPath` — audio source selection +- `backend.kind` / `backend.device` — hardware selection diff --git a/docs/README.md b/docs/README.md index 2032ba9..9549c41 100644 --- a/docs/README.md +++ b/docs/README.md @@ -52,6 +52,26 @@ FM broadcast requires pre-emphasis to boost high frequencies before transmission - `fmModulationEnabled: true` — output is baseband FM-modulated IQ (I² + Q² = 1). This is what SDR transmitters expect. - `fmModulationEnabled: false` — output is raw composite MPX (I = composite, Q = 0). Useful for analysis or composite exciters. +### Split-rate mode (Pluto / HackRF) + +When `deviceSampleRateHz > compositeRateHz` (e.g. Pluto at 2.28 MHz, composite at 228 kHz), the engine automatically activates split-rate mode: + +1. DSP chain (stereo, RDS, limiter) runs at `compositeRateHz` (228 kHz) +2. `FMUpsampler` performs FM modulation + phase-domain interpolation to `deviceSampleRateHz` +3. Hardware receives IQ at device rate + +This halves CPU load compared to running all DSP at device rate. Log output confirms the active mode: + +``` +engine: split-rate mode — DSP@228000Hz → upsample@2280000Hz (ratio 10.00) +``` + +When rates are equal (e.g. LimeSDR at 228 kHz), same-rate mode is used: + +``` +engine: same-rate mode — DSP@228000Hz +``` + ### Limiter The MPX limiter prevents overmodulation by applying smooth gain reduction when the composite signal exceeds the configured ceiling. A hard clipper acts as a safety net after the limiter. @@ -61,24 +81,11 @@ The MPX limiter prevents overmodulation by applying smooth gain reduction when t ### HTTP control surface -Available endpoints: -- `GET /healthz` -- `GET /status` -- `GET /dry-run` -- `GET /config` -- `POST /config` - -Current patchable runtime fields via `POST /config`: -- `frequencyMHz` -- `outputDrive` -- `toneLeftHz` -- `toneRightHz` -- `toneAmplitude` -- `ps` -- `radioText` -- `preEmphasisUS` -- `limiterEnabled` -- `limiterCeiling` +Full API documentation: **[docs/API.md](API.md)** + +All major TX parameters are hot-reloadable via `POST /config` during live transmission — frequency, stereo/mono, RDS text, output drive, pilot/RDS levels, limiter. Changes take effect within 50–88ms without stopping the stream. + +Available endpoints: `/healthz`, `/status`, `/runtime`, `/config` (GET/POST), `/dry-run`, `/tx/start`, `/tx/stop` ### Internal DSP module - `cd internal` diff --git a/internal/app/engine.go b/internal/app/engine.go index ae8e176..7384d9a 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -67,6 +67,9 @@ type Engine struct { totalSamples atomic.Uint64 underruns atomic.Uint64 lastError atomic.Value // string + + // Live config: pending frequency change, applied between chunks + pendingFreq atomic.Pointer[float64] } func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine { @@ -115,6 +118,86 @@ func (e *Engine) SetChunkDuration(d time.Duration) { e.chunkDuration = d } +// LiveConfigUpdate carries hot-reloadable parameters from the control API. +// nil pointers mean "no change". Validated before applying. +type LiveConfigUpdate struct { + FrequencyMHz *float64 + OutputDrive *float64 + StereoEnabled *bool + PilotLevel *float64 + RDSInjection *float64 + RDSEnabled *bool + LimiterEnabled *bool + LimiterCeiling *float64 + PS *string + RadioText *string +} + +// UpdateConfig applies live parameter changes without restarting the engine. +// DSP params take effect at the next chunk boundary (~50ms max). +// Frequency changes are applied between chunks via driver.Tune(). +// RDS text updates are applied at the next RDS group boundary (~88ms). +func (e *Engine) UpdateConfig(u LiveConfigUpdate) error { + // --- Validate --- + if u.FrequencyMHz != nil { + if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 { + return fmt.Errorf("frequencyMHz out of range (65-110)") + } + } + if u.OutputDrive != nil { + if *u.OutputDrive < 0 || *u.OutputDrive > 3 { + return fmt.Errorf("outputDrive out of range (0-3)") + } + } + if u.PilotLevel != nil { + if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 { + return fmt.Errorf("pilotLevel out of range (0-0.2)") + } + } + if u.RDSInjection != nil { + if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 { + return fmt.Errorf("rdsInjection out of range (0-0.15)") + } + } + if u.LimiterCeiling != nil { + if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 { + return fmt.Errorf("limiterCeiling out of range (0-2)") + } + } + + // --- Frequency: store for run loop to apply via driver.Tune() --- + if u.FrequencyMHz != nil { + freqHz := *u.FrequencyMHz * 1e6 + e.pendingFreq.Store(&freqHz) + } + + // --- RDS text: forward to encoder atomics --- + if u.PS != nil || u.RadioText != nil { + if enc := e.generator.RDSEncoder(); enc != nil { + ps, rt := "", "" + if u.PS != nil { ps = *u.PS } + if u.RadioText != nil { rt = *u.RadioText } + enc.UpdateText(ps, rt) + } + } + + // --- DSP params: build new LiveParams from current + patch --- + // Read current, apply deltas, store new + current := e.generator.CurrentLiveParams() + next := current // copy + + if u.OutputDrive != nil { next.OutputDrive = *u.OutputDrive } + if u.StereoEnabled != nil { next.StereoEnabled = *u.StereoEnabled } + if u.PilotLevel != nil { next.PilotLevel = *u.PilotLevel } + if u.RDSInjection != nil { next.RDSInjection = *u.RDSInjection } + if u.RDSEnabled != nil { next.RDSEnabled = *u.RDSEnabled } + if u.LimiterEnabled != nil { next.LimiterEnabled = *u.LimiterEnabled } + if u.LimiterCeiling != nil { next.LimiterCeiling = *u.LimiterCeiling } + + e.generator.UpdateLive(next) + return nil +} + func (e *Engine) Start(ctx context.Context) error { e.mu.Lock() if e.state != EngineIdle { @@ -192,6 +275,16 @@ func (e *Engine) run(ctx context.Context) { if ctx.Err() != nil { return } + + // Apply pending frequency change between chunks + if pf := e.pendingFreq.Swap(nil); pf != nil { + if err := e.driver.Tune(ctx, *pf); err != nil { + e.lastError.Store(fmt.Sprintf("tune: %v", err)) + } else { + log.Printf("engine: tuned to %.3f MHz", *pf/1e6) + } + } + frame := e.generator.GenerateFrame(e.chunkDuration) if e.upsampler != nil { frame = e.upsampler.Process(frame) diff --git a/internal/app/engine_test.go b/internal/app/engine_test.go index b7c7738..95a111b 100644 --- a/internal/app/engine_test.go +++ b/internal/app/engine_test.go @@ -131,3 +131,121 @@ func TestEngineSameRate(t *testing.T) { t.Fatal("expected same-rate mode (upsampler == nil)") } } + +func TestEngineLiveUpdateDSP(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + eng.SetChunkDuration(10 * time.Millisecond) + + ctx := context.Background() + if err := eng.Start(ctx); err != nil { + t.Fatalf("start: %v", err) + } + defer eng.Stop(ctx) + + time.Sleep(50 * time.Millisecond) + + // Update DSP params while running + drive := 1.5 + stereo := false + err := eng.UpdateConfig(LiveConfigUpdate{ + OutputDrive: &drive, + StereoEnabled: &stereo, + }) + if err != nil { + t.Fatalf("UpdateConfig: %v", err) + } + + // Engine should still be running after update + time.Sleep(50 * time.Millisecond) + stats := eng.Stats() + if stats.State != "running" { + t.Fatalf("expected running after update, got %s", stats.State) + } + if stats.Underruns > 0 { + t.Fatalf("unexpected underruns after update: %d", stats.Underruns) + } +} + +func TestEngineLiveUpdateFrequency(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + eng.SetChunkDuration(10 * time.Millisecond) + + ctx := context.Background() + if err := eng.Start(ctx); err != nil { + t.Fatalf("start: %v", err) + } + defer eng.Stop(ctx) + + time.Sleep(50 * time.Millisecond) + + // Tune frequency + freq := 99.5 + err := eng.UpdateConfig(LiveConfigUpdate{FrequencyMHz: &freq}) + if err != nil { + t.Fatalf("UpdateConfig freq: %v", err) + } + + // Let it process for a bit so the pending freq gets applied + time.Sleep(50 * time.Millisecond) + stats := eng.Stats() + if stats.State != "running" { + t.Fatalf("expected running after tune, got %s", stats.State) + } +} + +func TestEngineLiveUpdateRDS(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + eng.SetChunkDuration(10 * time.Millisecond) + + ctx := context.Background() + if err := eng.Start(ctx); err != nil { + t.Fatalf("start: %v", err) + } + defer eng.Stop(ctx) + + time.Sleep(50 * time.Millisecond) + + // Update RDS text + ps := "NEWPS" + rt := "Now playing: test track" + err := eng.UpdateConfig(LiveConfigUpdate{PS: &ps, RadioText: &rt}) + if err != nil { + t.Fatalf("UpdateConfig RDS: %v", err) + } + + time.Sleep(50 * time.Millisecond) + stats := eng.Stats() + if stats.Underruns > 0 { + t.Fatalf("underruns after RDS update: %d", stats.Underruns) + } +} + +func TestEngineLiveUpdateValidation(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + + // Out of range frequency + badFreq := 200.0 + if err := eng.UpdateConfig(LiveConfigUpdate{FrequencyMHz: &badFreq}); err == nil { + t.Fatal("expected validation error for bad frequency") + } + + // Out of range drive + badDrive := 10.0 + if err := eng.UpdateConfig(LiveConfigUpdate{OutputDrive: &badDrive}); err == nil { + t.Fatal("expected validation error for bad drive") + } + + // Valid update should succeed + goodDrive := 1.0 + if err := eng.UpdateConfig(LiveConfigUpdate{OutputDrive: &goodDrive}); err != nil { + t.Fatalf("expected valid update to succeed: %v", err) + } +} diff --git a/internal/control/control.go b/internal/control/control.go index 2dc4c72..e0cf0e8 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -1,6 +1,7 @@ package control import ( + _ "embed" "encoding/json" "net/http" "sync" @@ -10,11 +11,31 @@ import ( "github.com/jan/fm-rds-tx/internal/platform" ) -// TXController is an optional interface the Server uses to start/stop TX. +//go:embed ui.html +var uiHTML []byte + +// TXController is an optional interface the Server uses to start/stop TX +// and apply live config changes. type TXController interface { StartTX() error StopTX() error TXStats() map[string]any + UpdateConfig(patch LivePatch) error +} + +// LivePatch mirrors the patchable fields from ConfigPatch for the engine. +// nil = no change. +type LivePatch struct { + FrequencyMHz *float64 + OutputDrive *float64 + StereoEnabled *bool + PilotLevel *float64 + RDSInjection *float64 + RDSEnabled *bool + LimiterEnabled *bool + LimiterCeiling *float64 + PS *string + RadioText *string } type Server struct { @@ -27,6 +48,10 @@ type Server struct { type ConfigPatch struct { FrequencyMHz *float64 `json:"frequencyMHz,omitempty"` OutputDrive *float64 `json:"outputDrive,omitempty"` + StereoEnabled *bool `json:"stereoEnabled,omitempty"` + PilotLevel *float64 `json:"pilotLevel,omitempty"` + RDSInjection *float64 `json:"rdsInjection,omitempty"` + RDSEnabled *bool `json:"rdsEnabled,omitempty"` ToneLeftHz *float64 `json:"toneLeftHz,omitempty"` ToneRightHz *float64 `json:"toneRightHz,omitempty"` ToneAmplitude *float64 `json:"toneAmplitude,omitempty"` @@ -55,6 +80,7 @@ func (s *Server) SetDriver(drv platform.SoapyDriver) { func (s *Server) Handler() http.Handler { mux := http.NewServeMux() + mux.HandleFunc("/", s.handleUI) mux.HandleFunc("/healthz", s.handleHealth) mux.HandleFunc("/status", s.handleStatus) mux.HandleFunc("/dry-run", s.handleDryRun) @@ -70,6 +96,16 @@ func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) } +func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Cache-Control", "no-cache") + w.Write(uiHTML) +} + func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) { s.mu.RLock() cfg := s.cfg @@ -162,58 +198,60 @@ func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(cfg) case http.MethodPost: - // TODO: config changes only update the control server's copy. - // The running Engine/Generator holds its own snapshot and won't - // pick up these changes until restarted. Wire up a hot-reload - // path or document this limitation clearly for operators. var patch ConfigPatch if err := json.NewDecoder(r.Body).Decode(&patch); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } + // Update the server's config snapshot (for GET /config and /status) s.mu.Lock() next := s.cfg - if patch.FrequencyMHz != nil { - next.FM.FrequencyMHz = *patch.FrequencyMHz - } - if patch.OutputDrive != nil { - next.FM.OutputDrive = *patch.OutputDrive - } - if patch.ToneLeftHz != nil { - next.Audio.ToneLeftHz = *patch.ToneLeftHz - } - if patch.ToneRightHz != nil { - next.Audio.ToneRightHz = *patch.ToneRightHz - } - if patch.ToneAmplitude != nil { - next.Audio.ToneAmplitude = *patch.ToneAmplitude - } - if patch.PS != nil { - next.RDS.PS = *patch.PS - } - if patch.RadioText != nil { - next.RDS.RadioText = *patch.RadioText - } - if patch.PreEmphasisTauUS != nil { - next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS - } - if patch.LimiterEnabled != nil { - next.FM.LimiterEnabled = *patch.LimiterEnabled - } - if patch.LimiterCeiling != nil { - next.FM.LimiterCeiling = *patch.LimiterCeiling - } + if patch.FrequencyMHz != nil { next.FM.FrequencyMHz = *patch.FrequencyMHz } + if patch.OutputDrive != nil { next.FM.OutputDrive = *patch.OutputDrive } + if patch.ToneLeftHz != nil { next.Audio.ToneLeftHz = *patch.ToneLeftHz } + if patch.ToneRightHz != nil { next.Audio.ToneRightHz = *patch.ToneRightHz } + if patch.ToneAmplitude != nil { next.Audio.ToneAmplitude = *patch.ToneAmplitude } + if patch.PS != nil { next.RDS.PS = *patch.PS } + if patch.RadioText != nil { next.RDS.RadioText = *patch.RadioText } + if patch.PreEmphasisTauUS != nil { next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS } + if patch.StereoEnabled != nil { next.FM.StereoEnabled = *patch.StereoEnabled } + if patch.LimiterEnabled != nil { next.FM.LimiterEnabled = *patch.LimiterEnabled } + if patch.LimiterCeiling != nil { next.FM.LimiterCeiling = *patch.LimiterCeiling } + if patch.RDSEnabled != nil { next.RDS.Enabled = *patch.RDSEnabled } + if patch.PilotLevel != nil { next.FM.PilotLevel = *patch.PilotLevel } + if patch.RDSInjection != nil { next.FM.RDSInjection = *patch.RDSInjection } if err := next.Validate(); err != nil { s.mu.Unlock() http.Error(w, err.Error(), http.StatusBadRequest) return } s.cfg = next + tx := s.tx s.mu.Unlock() + // Forward live-patchable params to running engine (if active) + if tx != nil { + lp := LivePatch{ + FrequencyMHz: patch.FrequencyMHz, + OutputDrive: patch.OutputDrive, + StereoEnabled: patch.StereoEnabled, + PilotLevel: patch.PilotLevel, + RDSInjection: patch.RDSInjection, + RDSEnabled: patch.RDSEnabled, + LimiterEnabled: patch.LimiterEnabled, + LimiterCeiling: patch.LimiterCeiling, + PS: patch.PS, + RadioText: patch.RadioText, + } + if err := tx.UpdateConfig(lp); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } + w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": tx != nil}) default: http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } diff --git a/internal/control/ui.html b/internal/control/ui.html new file mode 100644 index 0000000..1a70e9c --- /dev/null +++ b/internal/control/ui.html @@ -0,0 +1,875 @@ + + + + + +fm-rds-tx + + + +
+ + +
+

FM-RDS-TX

+
+
+ connecting +
+
+ + +
+
---.--MHz
+ + +
--
+
+ + +
+
Chunks
--
+
Samples
--
+
Underruns
0
+
Uptime
--
+
Rate
--
+
+ + +
+
+
+

Frequency

+ +
+
+
+ TX Freq +
+ + + MHz +
+
+ +
+
+ + +
+
+
+

Levels

+ +
+
+
+ Output Drive +
+ + -- +
+
+
+ Pilot Level +
+ + -- +
+
+
+ RDS Inject +
+ + -- +
+
+
+ Limiter Ceil +
+ + -- +
+
+ +
+
+ + +
+
+
+

Switches

+ +
+
+
+ Stereo +
+
+ -- +
+
+
+ RDS +
+
+ -- +
+
+
+ Limiter +
+
+ -- +
+
+
+
+ + +
+
+
+

RDS

+ +
+
+
+ Program Service (PS) + +
0/8
+
+
+ RadioText (RT) + +
0/64
+
+ +
+
+ + +
+
+

Log

+ +
+
+
+
+
+ +
+ + +
+ + + + diff --git a/internal/offline/generator.go b/internal/offline/generator.go index 25eebb7..e0513e4 100644 --- a/internal/offline/generator.go +++ b/internal/offline/generator.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "path/filepath" + "sync/atomic" "time" "github.com/jan/fm-rds-tx/internal/audio" @@ -20,6 +21,18 @@ type frameSource interface { NextFrame() audio.Frame } +// LiveParams carries DSP parameters that can be hot-swapped at runtime. +// Loaded once per chunk via atomic pointer — zero per-sample overhead. +type LiveParams struct { + OutputDrive float64 + StereoEnabled bool + PilotLevel float64 + RDSInjection float64 + RDSEnabled bool + LimiterEnabled bool + LimiterCeiling float64 +} + // PreEmphasizedSource wraps an audio source and applies pre-emphasis. // The source is expected to already output at composite rate (resampled // upstream). Pre-emphasis is applied per-sample at that rate. @@ -71,17 +84,38 @@ type Generator struct { frameSeq uint64 // Pre-allocated frame buffer — reused every GenerateFrame call. - // Safe because driver.Write() is blocking: it returns only after - // the hardware has consumed the data. Do NOT hold references to - // frame.Samples beyond Write's return. frameBuf *output.CompositeFrame bufCap int + + // Live-updatable DSP parameters — written by control API, read per chunk. + liveParams atomic.Pointer[LiveParams] } func NewGenerator(cfg cfgpkg.Config) *Generator { return &Generator{cfg: cfg} } +// UpdateLive hot-swaps DSP parameters. Thread-safe — called from control API, +// applied at the next chunk boundary by the DSP goroutine. +func (g *Generator) UpdateLive(p LiveParams) { + g.liveParams.Store(&p) +} + +// CurrentLiveParams returns the current live parameter snapshot. +// Used by Engine.UpdateConfig to do read-modify-write on the params. +func (g *Generator) CurrentLiveParams() LiveParams { + if lp := g.liveParams.Load(); lp != nil { + return *lp + } + return LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0} +} + +// RDSEncoder returns the live RDS encoder, or nil if RDS is disabled. +// Used by the Engine to forward text updates. +func (g *Generator) RDSEncoder() *rds.Encoder { + return g.rdsEnc +} + func (g *Generator) init() { if g.initialized { return @@ -113,6 +147,18 @@ func (g *Generator) init() { g.fmMod = dsp.NewFMModulator(g.sampleRate) if g.cfg.FM.MaxDeviationHz > 0 { g.fmMod.MaxDeviation = g.cfg.FM.MaxDeviationHz } } + + // Seed initial live params from config + g.liveParams.Store(&LiveParams{ + OutputDrive: g.cfg.FM.OutputDrive, + StereoEnabled: g.cfg.FM.StereoEnabled, + PilotLevel: g.cfg.FM.PilotLevel, + RDSInjection: g.cfg.FM.RDSInjection, + RDSEnabled: g.cfg.RDS.Enabled, + LimiterEnabled: g.cfg.FM.LimiterEnabled, + LimiterCeiling: ceiling, + }) + g.initialized = true } @@ -146,27 +192,38 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame g.frameSeq++ frame.Sequence = g.frameSeq - ceiling := g.cfg.FM.LimiterCeiling + // Load live params once per chunk — single atomic read, zero per-sample cost + lp := g.liveParams.Load() + if lp == nil { + // Fallback: should never happen after init(), but be safe + lp = &LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0} + } + + // Apply live combiner gains + g.combiner.PilotGain = lp.PilotLevel + g.combiner.RDSGain = lp.RDSInjection + + ceiling := lp.LimiterCeiling if ceiling <= 0 { ceiling = 1.0 } for i := 0; i < samples; i++ { in := g.source.NextFrame() comps := g.stereoEncoder.Encode(in) - if !g.cfg.FM.StereoEnabled { + if !lp.StereoEnabled { comps.Stereo = 0; comps.Pilot = 0 } rdsValue := 0.0 - if g.rdsEnc != nil { + if g.rdsEnc != nil && lp.RDSEnabled { rdsCarrier := g.stereoEncoder.RDSCarrier() rdsValue = g.rdsEnc.NextSampleWithCarrier(rdsCarrier) } composite := g.combiner.Combine(comps.Mono, comps.Stereo, comps.Pilot, rdsValue) - composite *= g.cfg.FM.OutputDrive + composite *= lp.OutputDrive - if g.limiter != nil { + if lp.LimiterEnabled && g.limiter != nil { composite = g.limiter.Process(composite) composite = dsp.HardClip(composite, ceiling) } diff --git a/internal/platform/plutosdr/pluto_windows.go b/internal/platform/plutosdr/pluto_windows.go index 7bf5b57..eb51e24 100644 --- a/internal/platform/plutosdr/pluto_windows.go +++ b/internal/platform/plutosdr/pluto_windows.go @@ -110,6 +110,7 @@ type PlutoDriver struct { phyDev uintptr // iio_device* (ad9361-phy) chanI uintptr // iio_channel* TX I chanQ uintptr // iio_channel* TX Q + chanLO uintptr // iio_channel* TX LO (altvoltage1), cached for Tune() buf uintptr // iio_buffer* bufSize int // samples per buffer push @@ -204,6 +205,7 @@ func (d *PlutoDriver) Configure(_ context.Context, cfg platform.SoapyConfig) err // TX LO frequency phyChanLO := d.findChannel(phyDev, "altvoltage1", true) // TX LO + d.chanLO = phyChanLO // cache for Tune() if phyChanLO != 0 { freqHz := int64(cfg.CenterFreqHz) if freqHz <= 0 { @@ -368,6 +370,27 @@ func (d *PlutoDriver) Stop(_ context.Context) error { func (d *PlutoDriver) Flush(_ context.Context) error { return nil } +func (d *PlutoDriver) Tune(_ context.Context, freqHz float64) error { + d.mu.Lock() + defer d.mu.Unlock() + if !d.configured || d.chanLO == 0 { + return fmt.Errorf("pluto: not configured or LO channel not available") + } + if d.lib.pChannelAttrWriteLongLong == nil { + return fmt.Errorf("pluto: iio_channel_attr_write_longlong not loaded") + } + cAttr, _ := syscall.BytePtrFromString("frequency") + ret, _, _ := d.lib.pChannelAttrWriteLongLong.Call( + d.chanLO, + uintptr(unsafe.Pointer(cAttr)), + uintptr(int64(freqHz)), + ) + if int32(ret) < 0 { + return fmt.Errorf("pluto: LO tune to %.0f Hz failed (iio rc=%d)", freqHz, int32(ret)) + } + return nil +} + func (d *PlutoDriver) Close(_ context.Context) error { d.mu.Lock() defer d.mu.Unlock() @@ -406,6 +429,7 @@ func (d *PlutoDriver) cleanup() { d.disableChannel(d.chanQ) d.chanQ = 0 } + d.chanLO = 0 // config-only channel, no disable needed if d.ctx != 0 && d.lib.pDestroyCtx != nil { d.lib.pDestroyCtx.Call(d.ctx) d.ctx = 0 diff --git a/internal/platform/soapy.go b/internal/platform/soapy.go index 81e94b6..8fe4845 100644 --- a/internal/platform/soapy.go +++ b/internal/platform/soapy.go @@ -68,6 +68,8 @@ type SoapyDriver interface { Flush(ctx context.Context) error Close(ctx context.Context) error Stats() RuntimeStats + // Tune changes the TX center frequency while streaming. Thread-safe. + Tune(ctx context.Context, freqHz float64) error } // ----------------------------------------------------------------------- @@ -220,3 +222,10 @@ func (sd *SimulatedDriver) Stats() RuntimeStats { EffectiveRate: sd.cfg.SampleRateHz, } } + +func (sd *SimulatedDriver) Tune(_ context.Context, freqHz float64) error { + sd.mu.Lock() + sd.cfg.CenterFreqHz = freqHz + sd.mu.Unlock() + return nil +} diff --git a/internal/platform/soapysdr/native.go b/internal/platform/soapysdr/native.go index de60027..4c9d824 100644 --- a/internal/platform/soapysdr/native.go +++ b/internal/platform/soapysdr/native.go @@ -209,6 +209,15 @@ func (d *nativeDriver) Stop(_ context.Context) error { func (d *nativeDriver) Flush(_ context.Context) error { return nil } +func (d *nativeDriver) Tune(_ context.Context, freqHz float64) error { + d.mu.Lock() + defer d.mu.Unlock() + if d.dev == 0 || d.lib == nil { + return fmt.Errorf("soapy: not configured") + } + return d.lib.setFrequency(d.dev, dirTX, 0, freqHz) +} + func (d *nativeDriver) Close(_ context.Context) error { d.mu.Lock() defer d.mu.Unlock() diff --git a/internal/rds/encoder.go b/internal/rds/encoder.go index e87b8b0..8abcaf5 100644 --- a/internal/rds/encoder.go +++ b/internal/rds/encoder.go @@ -1,6 +1,9 @@ package rds -import "math" +import ( + "math" + "sync/atomic" +) // RDS encoder — port of PiFmRds, adapted for arbitrary sample rates. // At 228 kHz: uses exact {0,+1,0,-1} carrier (identical to PiFmRds). @@ -88,6 +91,11 @@ type Encoder struct { carrierStep float64 SampleRate float64 + + // Live-updatable text — written by control API, read at group boundaries. + // Zero-contention: atomic swap, checked once per RDS group (~88ms at 228kHz). + livePS atomic.Value // string + liveRT atomic.Value // string } func NewEncoder(cfg RDSConfig) (*Encoder, error) { @@ -141,6 +149,18 @@ func (e *Encoder) Reset() { for i := range e.ring { e.ring[i] = 0 } } +// UpdateText hot-swaps PS and/or RT. Thread-safe — called from HTTP handlers, +// applied at the next RDS group boundary by the DSP goroutine. +// Pass empty string to leave a field unchanged. +func (e *Encoder) UpdateText(ps, rt string) { + if ps != "" { + e.livePS.Store(normalizePS(ps)) + } + if rt != "" { + e.liveRT.Store(normalizeRT(rt)) + } +} + // NextSample returns the next RDS subcarrier sample at the configured rate. // Uses the internal free-running 57 kHz carrier. Prefer NextSampleWithCarrier // for phase-locked operation in a stereo MPX chain. @@ -157,6 +177,16 @@ func (e *Encoder) NextSample() float64 { func (e *Encoder) NextSampleWithCarrier(carrier float64) float64 { if e.sampleCount >= e.spb { if e.bitPos >= bitsPerGroup { + // Apply live text updates at group boundaries (~88ms at 228kHz). + // This is the only place we read the atomics — zero per-sample overhead. + if ps, ok := e.livePS.Load().(string); ok && ps != "" { + e.scheduler.cfg.PS = ps + } + if rt, ok := e.liveRT.Load().(string); ok && rt != "" { + e.scheduler.cfg.RT = rt + e.scheduler.rtIdx = 0 // restart RT transmission for new text + e.scheduler.rtABFlag = !e.scheduler.rtABFlag // toggle A/B per RDS spec + } e.getRDSGroup() e.bitPos = 0 }