From 2d23a50abc43271075f3e436e590cbc4450f0be3 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Fri, 3 Apr 2026 17:24:42 +0200 Subject: [PATCH 1/4] feat: live config hot-reload via POST /config (fix27) All major TX parameters are now hot-swappable during transmission: - DSP params (drive, stereo, pilot, RDS levels, limiter) via atomic.Pointer[LiveParams], loaded once per chunk (~50ms) - RDS text (PS, RT) via atomic.Value in encoder, applied at RDS group boundaries (~88ms) - TX frequency via driver.Tune(), applied between chunks Zero locks in DSP path. HTTP handler writes atomics, run loop reads them. Validation happens before store. New: SoapyDriver.Tune() for live frequency changes. New: LiveConfigUpdate/LivePatch types for type-safe patching. New: 4 engine tests for live DSP/freq/RDS updates + validation. --- cmd/fmrtx/main.go | 14 +++ internal/app/engine.go | 93 +++++++++++++++ internal/app/engine_test.go | 118 ++++++++++++++++++++ internal/control/control.go | 95 ++++++++++------ internal/offline/generator.go | 73 ++++++++++-- internal/platform/plutosdr/pluto_windows.go | 14 +++ internal/platform/soapy.go | 9 ++ internal/platform/soapysdr/native.go | 9 ++ internal/rds/encoder.go | 32 +++++- 9 files changed, 412 insertions(+), 45 deletions(-) diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index 3f234d6..cfa71f8 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -216,3 +216,17 @@ func (b *txBridge) TXStats() map[string]any { "lastError": s.LastError, "uptimeSeconds": s.UptimeSeconds, } } +func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error { + return b.engine.UpdateConfig(apppkg.LiveConfigUpdate{ + FrequencyMHz: lp.FrequencyMHz, + OutputDrive: lp.OutputDrive, + StereoEnabled: lp.StereoEnabled, + PilotLevel: lp.PilotLevel, + RDSInjection: lp.RDSInjection, + RDSEnabled: lp.RDSEnabled, + LimiterEnabled: lp.LimiterEnabled, + LimiterCeiling: lp.LimiterCeiling, + PS: lp.PS, + RadioText: lp.RadioText, + }) +} diff --git a/internal/app/engine.go b/internal/app/engine.go index ae8e176..7384d9a 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -67,6 +67,9 @@ type Engine struct { totalSamples atomic.Uint64 underruns atomic.Uint64 lastError atomic.Value // string + + // Live config: pending frequency change, applied between chunks + pendingFreq atomic.Pointer[float64] } func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine { @@ -115,6 +118,86 @@ func (e *Engine) SetChunkDuration(d time.Duration) { e.chunkDuration = d } +// LiveConfigUpdate carries hot-reloadable parameters from the control API. +// nil pointers mean "no change". Validated before applying. +type LiveConfigUpdate struct { + FrequencyMHz *float64 + OutputDrive *float64 + StereoEnabled *bool + PilotLevel *float64 + RDSInjection *float64 + RDSEnabled *bool + LimiterEnabled *bool + LimiterCeiling *float64 + PS *string + RadioText *string +} + +// UpdateConfig applies live parameter changes without restarting the engine. +// DSP params take effect at the next chunk boundary (~50ms max). +// Frequency changes are applied between chunks via driver.Tune(). +// RDS text updates are applied at the next RDS group boundary (~88ms). +func (e *Engine) UpdateConfig(u LiveConfigUpdate) error { + // --- Validate --- + if u.FrequencyMHz != nil { + if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 { + return fmt.Errorf("frequencyMHz out of range (65-110)") + } + } + if u.OutputDrive != nil { + if *u.OutputDrive < 0 || *u.OutputDrive > 3 { + return fmt.Errorf("outputDrive out of range (0-3)") + } + } + if u.PilotLevel != nil { + if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 { + return fmt.Errorf("pilotLevel out of range (0-0.2)") + } + } + if u.RDSInjection != nil { + if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 { + return fmt.Errorf("rdsInjection out of range (0-0.15)") + } + } + if u.LimiterCeiling != nil { + if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 { + return fmt.Errorf("limiterCeiling out of range (0-2)") + } + } + + // --- Frequency: store for run loop to apply via driver.Tune() --- + if u.FrequencyMHz != nil { + freqHz := *u.FrequencyMHz * 1e6 + e.pendingFreq.Store(&freqHz) + } + + // --- RDS text: forward to encoder atomics --- + if u.PS != nil || u.RadioText != nil { + if enc := e.generator.RDSEncoder(); enc != nil { + ps, rt := "", "" + if u.PS != nil { ps = *u.PS } + if u.RadioText != nil { rt = *u.RadioText } + enc.UpdateText(ps, rt) + } + } + + // --- DSP params: build new LiveParams from current + patch --- + // Read current, apply deltas, store new + current := e.generator.CurrentLiveParams() + next := current // copy + + if u.OutputDrive != nil { next.OutputDrive = *u.OutputDrive } + if u.StereoEnabled != nil { next.StereoEnabled = *u.StereoEnabled } + if u.PilotLevel != nil { next.PilotLevel = *u.PilotLevel } + if u.RDSInjection != nil { next.RDSInjection = *u.RDSInjection } + if u.RDSEnabled != nil { next.RDSEnabled = *u.RDSEnabled } + if u.LimiterEnabled != nil { next.LimiterEnabled = *u.LimiterEnabled } + if u.LimiterCeiling != nil { next.LimiterCeiling = *u.LimiterCeiling } + + e.generator.UpdateLive(next) + return nil +} + func (e *Engine) Start(ctx context.Context) error { e.mu.Lock() if e.state != EngineIdle { @@ -192,6 +275,16 @@ func (e *Engine) run(ctx context.Context) { if ctx.Err() != nil { return } + + // Apply pending frequency change between chunks + if pf := e.pendingFreq.Swap(nil); pf != nil { + if err := e.driver.Tune(ctx, *pf); err != nil { + e.lastError.Store(fmt.Sprintf("tune: %v", err)) + } else { + log.Printf("engine: tuned to %.3f MHz", *pf/1e6) + } + } + frame := e.generator.GenerateFrame(e.chunkDuration) if e.upsampler != nil { frame = e.upsampler.Process(frame) diff --git a/internal/app/engine_test.go b/internal/app/engine_test.go index b7c7738..95a111b 100644 --- a/internal/app/engine_test.go +++ b/internal/app/engine_test.go @@ -131,3 +131,121 @@ func TestEngineSameRate(t *testing.T) { t.Fatal("expected same-rate mode (upsampler == nil)") } } + +func TestEngineLiveUpdateDSP(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + eng.SetChunkDuration(10 * time.Millisecond) + + ctx := context.Background() + if err := eng.Start(ctx); err != nil { + t.Fatalf("start: %v", err) + } + defer eng.Stop(ctx) + + time.Sleep(50 * time.Millisecond) + + // Update DSP params while running + drive := 1.5 + stereo := false + err := eng.UpdateConfig(LiveConfigUpdate{ + OutputDrive: &drive, + StereoEnabled: &stereo, + }) + if err != nil { + t.Fatalf("UpdateConfig: %v", err) + } + + // Engine should still be running after update + time.Sleep(50 * time.Millisecond) + stats := eng.Stats() + if stats.State != "running" { + t.Fatalf("expected running after update, got %s", stats.State) + } + if stats.Underruns > 0 { + t.Fatalf("unexpected underruns after update: %d", stats.Underruns) + } +} + +func TestEngineLiveUpdateFrequency(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + eng.SetChunkDuration(10 * time.Millisecond) + + ctx := context.Background() + if err := eng.Start(ctx); err != nil { + t.Fatalf("start: %v", err) + } + defer eng.Stop(ctx) + + time.Sleep(50 * time.Millisecond) + + // Tune frequency + freq := 99.5 + err := eng.UpdateConfig(LiveConfigUpdate{FrequencyMHz: &freq}) + if err != nil { + t.Fatalf("UpdateConfig freq: %v", err) + } + + // Let it process for a bit so the pending freq gets applied + time.Sleep(50 * time.Millisecond) + stats := eng.Stats() + if stats.State != "running" { + t.Fatalf("expected running after tune, got %s", stats.State) + } +} + +func TestEngineLiveUpdateRDS(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + eng.SetChunkDuration(10 * time.Millisecond) + + ctx := context.Background() + if err := eng.Start(ctx); err != nil { + t.Fatalf("start: %v", err) + } + defer eng.Stop(ctx) + + time.Sleep(50 * time.Millisecond) + + // Update RDS text + ps := "NEWPS" + rt := "Now playing: test track" + err := eng.UpdateConfig(LiveConfigUpdate{PS: &ps, RadioText: &rt}) + if err != nil { + t.Fatalf("UpdateConfig RDS: %v", err) + } + + time.Sleep(50 * time.Millisecond) + stats := eng.Stats() + if stats.Underruns > 0 { + t.Fatalf("underruns after RDS update: %d", stats.Underruns) + } +} + +func TestEngineLiveUpdateValidation(t *testing.T) { + cfg := cfgpkg.Default() + driver := platform.NewSimulatedDriver(nil) + eng := NewEngine(cfg, driver) + + // Out of range frequency + badFreq := 200.0 + if err := eng.UpdateConfig(LiveConfigUpdate{FrequencyMHz: &badFreq}); err == nil { + t.Fatal("expected validation error for bad frequency") + } + + // Out of range drive + badDrive := 10.0 + if err := eng.UpdateConfig(LiveConfigUpdate{OutputDrive: &badDrive}); err == nil { + t.Fatal("expected validation error for bad drive") + } + + // Valid update should succeed + goodDrive := 1.0 + if err := eng.UpdateConfig(LiveConfigUpdate{OutputDrive: &goodDrive}); err != nil { + t.Fatalf("expected valid update to succeed: %v", err) + } +} diff --git a/internal/control/control.go b/internal/control/control.go index 2dc4c72..25a5a5e 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -10,11 +10,28 @@ import ( "github.com/jan/fm-rds-tx/internal/platform" ) -// TXController is an optional interface the Server uses to start/stop TX. +// TXController is an optional interface the Server uses to start/stop TX +// and apply live config changes. type TXController interface { StartTX() error StopTX() error TXStats() map[string]any + UpdateConfig(patch LivePatch) error +} + +// LivePatch mirrors the patchable fields from ConfigPatch for the engine. +// nil = no change. +type LivePatch struct { + FrequencyMHz *float64 + OutputDrive *float64 + StereoEnabled *bool + PilotLevel *float64 + RDSInjection *float64 + RDSEnabled *bool + LimiterEnabled *bool + LimiterCeiling *float64 + PS *string + RadioText *string } type Server struct { @@ -27,6 +44,10 @@ type Server struct { type ConfigPatch struct { FrequencyMHz *float64 `json:"frequencyMHz,omitempty"` OutputDrive *float64 `json:"outputDrive,omitempty"` + StereoEnabled *bool `json:"stereoEnabled,omitempty"` + PilotLevel *float64 `json:"pilotLevel,omitempty"` + RDSInjection *float64 `json:"rdsInjection,omitempty"` + RDSEnabled *bool `json:"rdsEnabled,omitempty"` ToneLeftHz *float64 `json:"toneLeftHz,omitempty"` ToneRightHz *float64 `json:"toneRightHz,omitempty"` ToneAmplitude *float64 `json:"toneAmplitude,omitempty"` @@ -162,58 +183,60 @@ func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(cfg) case http.MethodPost: - // TODO: config changes only update the control server's copy. - // The running Engine/Generator holds its own snapshot and won't - // pick up these changes until restarted. Wire up a hot-reload - // path or document this limitation clearly for operators. var patch ConfigPatch if err := json.NewDecoder(r.Body).Decode(&patch); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } + // Update the server's config snapshot (for GET /config and /status) s.mu.Lock() next := s.cfg - if patch.FrequencyMHz != nil { - next.FM.FrequencyMHz = *patch.FrequencyMHz - } - if patch.OutputDrive != nil { - next.FM.OutputDrive = *patch.OutputDrive - } - if patch.ToneLeftHz != nil { - next.Audio.ToneLeftHz = *patch.ToneLeftHz - } - if patch.ToneRightHz != nil { - next.Audio.ToneRightHz = *patch.ToneRightHz - } - if patch.ToneAmplitude != nil { - next.Audio.ToneAmplitude = *patch.ToneAmplitude - } - if patch.PS != nil { - next.RDS.PS = *patch.PS - } - if patch.RadioText != nil { - next.RDS.RadioText = *patch.RadioText - } - if patch.PreEmphasisTauUS != nil { - next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS - } - if patch.LimiterEnabled != nil { - next.FM.LimiterEnabled = *patch.LimiterEnabled - } - if patch.LimiterCeiling != nil { - next.FM.LimiterCeiling = *patch.LimiterCeiling - } + if patch.FrequencyMHz != nil { next.FM.FrequencyMHz = *patch.FrequencyMHz } + if patch.OutputDrive != nil { next.FM.OutputDrive = *patch.OutputDrive } + if patch.ToneLeftHz != nil { next.Audio.ToneLeftHz = *patch.ToneLeftHz } + if patch.ToneRightHz != nil { next.Audio.ToneRightHz = *patch.ToneRightHz } + if patch.ToneAmplitude != nil { next.Audio.ToneAmplitude = *patch.ToneAmplitude } + if patch.PS != nil { next.RDS.PS = *patch.PS } + if patch.RadioText != nil { next.RDS.RadioText = *patch.RadioText } + if patch.PreEmphasisTauUS != nil { next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS } + if patch.StereoEnabled != nil { next.FM.StereoEnabled = *patch.StereoEnabled } + if patch.LimiterEnabled != nil { next.FM.LimiterEnabled = *patch.LimiterEnabled } + if patch.LimiterCeiling != nil { next.FM.LimiterCeiling = *patch.LimiterCeiling } + if patch.RDSEnabled != nil { next.RDS.Enabled = *patch.RDSEnabled } + if patch.PilotLevel != nil { next.FM.PilotLevel = *patch.PilotLevel } + if patch.RDSInjection != nil { next.FM.RDSInjection = *patch.RDSInjection } if err := next.Validate(); err != nil { s.mu.Unlock() http.Error(w, err.Error(), http.StatusBadRequest) return } s.cfg = next + tx := s.tx s.mu.Unlock() + // Forward live-patchable params to running engine (if active) + if tx != nil { + lp := LivePatch{ + FrequencyMHz: patch.FrequencyMHz, + OutputDrive: patch.OutputDrive, + StereoEnabled: patch.StereoEnabled, + PilotLevel: patch.PilotLevel, + RDSInjection: patch.RDSInjection, + RDSEnabled: patch.RDSEnabled, + LimiterEnabled: patch.LimiterEnabled, + LimiterCeiling: patch.LimiterCeiling, + PS: patch.PS, + RadioText: patch.RadioText, + } + if err := tx.UpdateConfig(lp); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } + w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": tx != nil}) default: http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } diff --git a/internal/offline/generator.go b/internal/offline/generator.go index 25eebb7..e0513e4 100644 --- a/internal/offline/generator.go +++ b/internal/offline/generator.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "path/filepath" + "sync/atomic" "time" "github.com/jan/fm-rds-tx/internal/audio" @@ -20,6 +21,18 @@ type frameSource interface { NextFrame() audio.Frame } +// LiveParams carries DSP parameters that can be hot-swapped at runtime. +// Loaded once per chunk via atomic pointer — zero per-sample overhead. +type LiveParams struct { + OutputDrive float64 + StereoEnabled bool + PilotLevel float64 + RDSInjection float64 + RDSEnabled bool + LimiterEnabled bool + LimiterCeiling float64 +} + // PreEmphasizedSource wraps an audio source and applies pre-emphasis. // The source is expected to already output at composite rate (resampled // upstream). Pre-emphasis is applied per-sample at that rate. @@ -71,17 +84,38 @@ type Generator struct { frameSeq uint64 // Pre-allocated frame buffer — reused every GenerateFrame call. - // Safe because driver.Write() is blocking: it returns only after - // the hardware has consumed the data. Do NOT hold references to - // frame.Samples beyond Write's return. frameBuf *output.CompositeFrame bufCap int + + // Live-updatable DSP parameters — written by control API, read per chunk. + liveParams atomic.Pointer[LiveParams] } func NewGenerator(cfg cfgpkg.Config) *Generator { return &Generator{cfg: cfg} } +// UpdateLive hot-swaps DSP parameters. Thread-safe — called from control API, +// applied at the next chunk boundary by the DSP goroutine. +func (g *Generator) UpdateLive(p LiveParams) { + g.liveParams.Store(&p) +} + +// CurrentLiveParams returns the current live parameter snapshot. +// Used by Engine.UpdateConfig to do read-modify-write on the params. +func (g *Generator) CurrentLiveParams() LiveParams { + if lp := g.liveParams.Load(); lp != nil { + return *lp + } + return LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0} +} + +// RDSEncoder returns the live RDS encoder, or nil if RDS is disabled. +// Used by the Engine to forward text updates. +func (g *Generator) RDSEncoder() *rds.Encoder { + return g.rdsEnc +} + func (g *Generator) init() { if g.initialized { return @@ -113,6 +147,18 @@ func (g *Generator) init() { g.fmMod = dsp.NewFMModulator(g.sampleRate) if g.cfg.FM.MaxDeviationHz > 0 { g.fmMod.MaxDeviation = g.cfg.FM.MaxDeviationHz } } + + // Seed initial live params from config + g.liveParams.Store(&LiveParams{ + OutputDrive: g.cfg.FM.OutputDrive, + StereoEnabled: g.cfg.FM.StereoEnabled, + PilotLevel: g.cfg.FM.PilotLevel, + RDSInjection: g.cfg.FM.RDSInjection, + RDSEnabled: g.cfg.RDS.Enabled, + LimiterEnabled: g.cfg.FM.LimiterEnabled, + LimiterCeiling: ceiling, + }) + g.initialized = true } @@ -146,27 +192,38 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame g.frameSeq++ frame.Sequence = g.frameSeq - ceiling := g.cfg.FM.LimiterCeiling + // Load live params once per chunk — single atomic read, zero per-sample cost + lp := g.liveParams.Load() + if lp == nil { + // Fallback: should never happen after init(), but be safe + lp = &LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0} + } + + // Apply live combiner gains + g.combiner.PilotGain = lp.PilotLevel + g.combiner.RDSGain = lp.RDSInjection + + ceiling := lp.LimiterCeiling if ceiling <= 0 { ceiling = 1.0 } for i := 0; i < samples; i++ { in := g.source.NextFrame() comps := g.stereoEncoder.Encode(in) - if !g.cfg.FM.StereoEnabled { + if !lp.StereoEnabled { comps.Stereo = 0; comps.Pilot = 0 } rdsValue := 0.0 - if g.rdsEnc != nil { + if g.rdsEnc != nil && lp.RDSEnabled { rdsCarrier := g.stereoEncoder.RDSCarrier() rdsValue = g.rdsEnc.NextSampleWithCarrier(rdsCarrier) } composite := g.combiner.Combine(comps.Mono, comps.Stereo, comps.Pilot, rdsValue) - composite *= g.cfg.FM.OutputDrive + composite *= lp.OutputDrive - if g.limiter != nil { + if lp.LimiterEnabled && g.limiter != nil { composite = g.limiter.Process(composite) composite = dsp.HardClip(composite, ceiling) } diff --git a/internal/platform/plutosdr/pluto_windows.go b/internal/platform/plutosdr/pluto_windows.go index 7bf5b57..fa57e3f 100644 --- a/internal/platform/plutosdr/pluto_windows.go +++ b/internal/platform/plutosdr/pluto_windows.go @@ -368,6 +368,20 @@ func (d *PlutoDriver) Stop(_ context.Context) error { func (d *PlutoDriver) Flush(_ context.Context) error { return nil } +func (d *PlutoDriver) Tune(_ context.Context, freqHz float64) error { + d.mu.Lock() + defer d.mu.Unlock() + if d.phyDev == 0 { + return fmt.Errorf("pluto: not configured") + } + phyChanLO := d.findChannel(d.phyDev, "altvoltage1", true) + if phyChanLO == 0 { + return fmt.Errorf("pluto: TX LO channel not found") + } + d.writeChanAttrLL(phyChanLO, "frequency", int64(freqHz)) + return nil +} + func (d *PlutoDriver) Close(_ context.Context) error { d.mu.Lock() defer d.mu.Unlock() diff --git a/internal/platform/soapy.go b/internal/platform/soapy.go index 81e94b6..8fe4845 100644 --- a/internal/platform/soapy.go +++ b/internal/platform/soapy.go @@ -68,6 +68,8 @@ type SoapyDriver interface { Flush(ctx context.Context) error Close(ctx context.Context) error Stats() RuntimeStats + // Tune changes the TX center frequency while streaming. Thread-safe. + Tune(ctx context.Context, freqHz float64) error } // ----------------------------------------------------------------------- @@ -220,3 +222,10 @@ func (sd *SimulatedDriver) Stats() RuntimeStats { EffectiveRate: sd.cfg.SampleRateHz, } } + +func (sd *SimulatedDriver) Tune(_ context.Context, freqHz float64) error { + sd.mu.Lock() + sd.cfg.CenterFreqHz = freqHz + sd.mu.Unlock() + return nil +} diff --git a/internal/platform/soapysdr/native.go b/internal/platform/soapysdr/native.go index de60027..4c9d824 100644 --- a/internal/platform/soapysdr/native.go +++ b/internal/platform/soapysdr/native.go @@ -209,6 +209,15 @@ func (d *nativeDriver) Stop(_ context.Context) error { func (d *nativeDriver) Flush(_ context.Context) error { return nil } +func (d *nativeDriver) Tune(_ context.Context, freqHz float64) error { + d.mu.Lock() + defer d.mu.Unlock() + if d.dev == 0 || d.lib == nil { + return fmt.Errorf("soapy: not configured") + } + return d.lib.setFrequency(d.dev, dirTX, 0, freqHz) +} + func (d *nativeDriver) Close(_ context.Context) error { d.mu.Lock() defer d.mu.Unlock() diff --git a/internal/rds/encoder.go b/internal/rds/encoder.go index e87b8b0..8abcaf5 100644 --- a/internal/rds/encoder.go +++ b/internal/rds/encoder.go @@ -1,6 +1,9 @@ package rds -import "math" +import ( + "math" + "sync/atomic" +) // RDS encoder — port of PiFmRds, adapted for arbitrary sample rates. // At 228 kHz: uses exact {0,+1,0,-1} carrier (identical to PiFmRds). @@ -88,6 +91,11 @@ type Encoder struct { carrierStep float64 SampleRate float64 + + // Live-updatable text — written by control API, read at group boundaries. + // Zero-contention: atomic swap, checked once per RDS group (~88ms at 228kHz). + livePS atomic.Value // string + liveRT atomic.Value // string } func NewEncoder(cfg RDSConfig) (*Encoder, error) { @@ -141,6 +149,18 @@ func (e *Encoder) Reset() { for i := range e.ring { e.ring[i] = 0 } } +// UpdateText hot-swaps PS and/or RT. Thread-safe — called from HTTP handlers, +// applied at the next RDS group boundary by the DSP goroutine. +// Pass empty string to leave a field unchanged. +func (e *Encoder) UpdateText(ps, rt string) { + if ps != "" { + e.livePS.Store(normalizePS(ps)) + } + if rt != "" { + e.liveRT.Store(normalizeRT(rt)) + } +} + // NextSample returns the next RDS subcarrier sample at the configured rate. // Uses the internal free-running 57 kHz carrier. Prefer NextSampleWithCarrier // for phase-locked operation in a stereo MPX chain. @@ -157,6 +177,16 @@ func (e *Encoder) NextSample() float64 { func (e *Encoder) NextSampleWithCarrier(carrier float64) float64 { if e.sampleCount >= e.spb { if e.bitPos >= bitsPerGroup { + // Apply live text updates at group boundaries (~88ms at 228kHz). + // This is the only place we read the atomics — zero per-sample overhead. + if ps, ok := e.livePS.Load().(string); ok && ps != "" { + e.scheduler.cfg.PS = ps + } + if rt, ok := e.liveRT.Load().(string); ok && rt != "" { + e.scheduler.cfg.RT = rt + e.scheduler.rtIdx = 0 // restart RT transmission for new text + e.scheduler.rtABFlag = !e.scheduler.rtABFlag // toggle A/B per RDS spec + } e.getRDSGroup() e.bitPos = 0 } From c93607625df8a0ce684cd4bc36544159f8083d6a Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Fri, 3 Apr 2026 17:28:51 +0200 Subject: [PATCH 2/4] fix: make Pluto live LO retune fail loudly and avoid redundant channel lookups Cache altvoltage1 during Configure(), clear it in cleanup(), and use the cached channel for Tune() instead of calling findChannel on every retune. Tune() now checks the direct iio_channel_attr_write_longlong return code and reports LO write failures back to the caller with the failing frequency and libiio error code. --- internal/platform/plutosdr/pluto_windows.go | 22 +++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/internal/platform/plutosdr/pluto_windows.go b/internal/platform/plutosdr/pluto_windows.go index fa57e3f..eb51e24 100644 --- a/internal/platform/plutosdr/pluto_windows.go +++ b/internal/platform/plutosdr/pluto_windows.go @@ -110,6 +110,7 @@ type PlutoDriver struct { phyDev uintptr // iio_device* (ad9361-phy) chanI uintptr // iio_channel* TX I chanQ uintptr // iio_channel* TX Q + chanLO uintptr // iio_channel* TX LO (altvoltage1), cached for Tune() buf uintptr // iio_buffer* bufSize int // samples per buffer push @@ -204,6 +205,7 @@ func (d *PlutoDriver) Configure(_ context.Context, cfg platform.SoapyConfig) err // TX LO frequency phyChanLO := d.findChannel(phyDev, "altvoltage1", true) // TX LO + d.chanLO = phyChanLO // cache for Tune() if phyChanLO != 0 { freqHz := int64(cfg.CenterFreqHz) if freqHz <= 0 { @@ -371,14 +373,21 @@ func (d *PlutoDriver) Flush(_ context.Context) error { return nil } func (d *PlutoDriver) Tune(_ context.Context, freqHz float64) error { d.mu.Lock() defer d.mu.Unlock() - if d.phyDev == 0 { - return fmt.Errorf("pluto: not configured") + if !d.configured || d.chanLO == 0 { + return fmt.Errorf("pluto: not configured or LO channel not available") + } + if d.lib.pChannelAttrWriteLongLong == nil { + return fmt.Errorf("pluto: iio_channel_attr_write_longlong not loaded") } - phyChanLO := d.findChannel(d.phyDev, "altvoltage1", true) - if phyChanLO == 0 { - return fmt.Errorf("pluto: TX LO channel not found") + cAttr, _ := syscall.BytePtrFromString("frequency") + ret, _, _ := d.lib.pChannelAttrWriteLongLong.Call( + d.chanLO, + uintptr(unsafe.Pointer(cAttr)), + uintptr(int64(freqHz)), + ) + if int32(ret) < 0 { + return fmt.Errorf("pluto: LO tune to %.0f Hz failed (iio rc=%d)", freqHz, int32(ret)) } - d.writeChanAttrLL(phyChanLO, "frequency", int64(freqHz)) return nil } @@ -420,6 +429,7 @@ func (d *PlutoDriver) cleanup() { d.disableChannel(d.chanQ) d.chanQ = 0 } + d.chanLO = 0 // config-only channel, no disable needed if d.ctx != 0 && d.lib.pDestroyCtx != nil { d.lib.pDestroyCtx.Call(d.ctx) d.ctx = 0 From 66ecde05de23e9be71b9e51ff824bd492aa664e3 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Fri, 3 Apr 2026 17:33:28 +0200 Subject: [PATCH 3/4] docs: add API reference for live TX control and hot reload Document the HTTP API surface in detail, including runtime control, live config updates, split-rate behaviour, and the current hot-reload semantics for on-air parameter changes. --- docs/API.md | 220 +++++++++++++++++++++++++++++++++++++++++++++++++ docs/README.md | 43 ++++++---- 2 files changed, 245 insertions(+), 18 deletions(-) create mode 100644 docs/API.md diff --git a/docs/API.md b/docs/API.md new file mode 100644 index 0000000..8e3b096 --- /dev/null +++ b/docs/API.md @@ -0,0 +1,220 @@ +# fm-rds-tx HTTP Control API + +Base URL: `http://{listenAddress}` (default `127.0.0.1:8088`) + +--- + +## Endpoints + +### `GET /healthz` + +Health check. + +**Response:** +```json +{"ok": true} +``` + +--- + +### `GET /status` + +Current transmitter status (read-only snapshot). + +**Response:** +```json +{ + "service": "fm-rds-tx", + "backend": "pluto", + "frequencyMHz": 100.0, + "stereoEnabled": true, + "rdsEnabled": true, + "preEmphasisTauUS": 50, + "limiterEnabled": true, + "fmModulationEnabled": true +} +``` + +--- + +### `GET /runtime` + +Live engine and driver telemetry. Only populated when TX is active. + +**Response:** +```json +{ + "engine": { + "state": "running", + "chunksProduced": 12345, + "totalSamples": 1408950000, + "underruns": 0, + "lastError": "", + "uptimeSeconds": 3614.2 + }, + "driver": { + "txEnabled": true, + "streamActive": true, + "framesWritten": 12345, + "samplesWritten": 1408950000, + "underruns": 0, + "effectiveSampleRateHz": 2280000 + } +} +``` + +--- + +### `GET /config` + +Full current configuration (all fields, including non-patchable). + +**Response:** Complete `Config` JSON object. + +--- + +### `POST /config` + +**Live parameter update.** Changes are applied to the running TX engine immediately — no restart required. Only include fields you want to change (PATCH semantics). + +**Request body:** JSON with any subset of patchable fields. + +**Response:** +```json +{"ok": true, "live": true} +``` + +`"live": true` = changes were forwarded to the running engine. +`"live": false` = engine not active, changes saved for next start. + +#### Patchable fields — DSP (applied within ~50ms) + +| Field | Type | Range | Description | +|---|---|---|---| +| `frequencyMHz` | float | 65–110 | TX center frequency. Tunes hardware LO live. | +| `outputDrive` | float | 0–3 | Composite output level multiplier. | +| `stereoEnabled` | bool | | Enable/disable stereo (pilot + 38kHz subcarrier). | +| `pilotLevel` | float | 0–0.2 | 19 kHz pilot injection level. | +| `rdsInjection` | float | 0–0.15 | 57 kHz RDS subcarrier injection level. | +| `rdsEnabled` | bool | | Enable/disable RDS subcarrier. | +| `limiterEnabled` | bool | | Enable/disable MPX peak limiter. | +| `limiterCeiling` | float | 0–2 | Limiter ceiling (max composite amplitude). | + +#### Patchable fields — RDS text (applied within ~88ms) + +| Field | Type | Max length | Description | +|---|---|---|---| +| `ps` | string | 8 chars | Program Service name (station name on receiver display). | +| `radioText` | string | 64 chars | RadioText message (scrolling text on receiver). | + +When `radioText` is updated, the RDS A/B flag toggles automatically per spec, signaling receivers to refresh their display. + +#### Patchable fields — other (saved, not live-applied) + +| Field | Type | Description | +|---|---|---| +| `toneLeftHz` | float | Left tone frequency (test generator). | +| `toneRightHz` | float | Right tone frequency (test generator). | +| `toneAmplitude` | float | Test tone amplitude (0–1). | +| `preEmphasisTauUS` | float | Pre-emphasis time constant. **Requires restart.** | + +#### Examples + +```bash +# Tune to 99.5 MHz +curl -X POST localhost:8088/config -d '{"frequencyMHz": 99.5}' + +# Switch to mono +curl -X POST localhost:8088/config -d '{"stereoEnabled": false}' + +# Update now-playing text +curl -X POST localhost:8088/config \ + -d '{"ps": "MYRADIO", "radioText": "Artist - Song Title"}' + +# Reduce power + disable limiter +curl -X POST localhost:8088/config \ + -d '{"outputDrive": 0.8, "limiterEnabled": false}' + +# Full update +curl -X POST localhost:8088/config -d '{ + "frequencyMHz": 101.3, + "outputDrive": 2.2, + "stereoEnabled": true, + "pilotLevel": 0.041, + "rdsInjection": 0.021, + "rdsEnabled": true, + "limiterEnabled": true, + "limiterCeiling": 1.0, + "ps": "PIRATE", + "radioText": "Broadcasting from the attic" +}' +``` + +#### Error handling + +Invalid values return `400 Bad Request` with a descriptive message: +```bash +curl -X POST localhost:8088/config -d '{"frequencyMHz": 200}' +# → 400: frequencyMHz out of range (65-110) +``` + +--- + +### `POST /tx/start` + +Start transmission. Requires `--tx` mode with hardware. + +**Response:** +```json +{"ok": true, "action": "started"} +``` + +**Errors:** +- `405` if not POST +- `503` if no TX controller (not in `--tx` mode) +- `409` if already running + +--- + +### `POST /tx/stop` + +Stop transmission. + +**Response:** +```json +{"ok": true, "action": "stopped"} +``` + +--- + +### `GET /dry-run` + +Generate a synthetic frame summary without hardware. Useful for config verification. + +**Response:** `FrameSummary` JSON with mode, rates, source info, preview samples. + +--- + +## Live update architecture + +All live updates are **lock-free** in the DSP path: + +| What | Mechanism | Latency | +|---|---|---| +| DSP params | `atomic.Pointer[LiveParams]` loaded once per chunk | ≤ 50ms | +| RDS text | `atomic.Value` in encoder, read at group boundary | ≤ 88ms | +| TX frequency | `atomic.Pointer` in engine, `driver.Tune()` between chunks | ≤ 50ms | + +No mutex, no channel, no allocation in the real-time path. The HTTP goroutine writes atomics, the DSP goroutine reads them. + +## Parameters that require restart + +These cannot be hot-reloaded (they affect DSP pipeline structure): + +- `compositeRateHz` — changes sample rate of entire DSP chain +- `deviceSampleRateHz` — changes hardware rate / upsampler ratio +- `maxDeviationHz` — changes FM modulator scaling +- `preEmphasisTauUS` — changes filter coefficients +- `rds.pi` / `rds.pty` — rarely change, baked into encoder init +- `audio.inputPath` — audio source selection +- `backend.kind` / `backend.device` — hardware selection diff --git a/docs/README.md b/docs/README.md index 2032ba9..9549c41 100644 --- a/docs/README.md +++ b/docs/README.md @@ -52,6 +52,26 @@ FM broadcast requires pre-emphasis to boost high frequencies before transmission - `fmModulationEnabled: true` — output is baseband FM-modulated IQ (I² + Q² = 1). This is what SDR transmitters expect. - `fmModulationEnabled: false` — output is raw composite MPX (I = composite, Q = 0). Useful for analysis or composite exciters. +### Split-rate mode (Pluto / HackRF) + +When `deviceSampleRateHz > compositeRateHz` (e.g. Pluto at 2.28 MHz, composite at 228 kHz), the engine automatically activates split-rate mode: + +1. DSP chain (stereo, RDS, limiter) runs at `compositeRateHz` (228 kHz) +2. `FMUpsampler` performs FM modulation + phase-domain interpolation to `deviceSampleRateHz` +3. Hardware receives IQ at device rate + +This halves CPU load compared to running all DSP at device rate. Log output confirms the active mode: + +``` +engine: split-rate mode — DSP@228000Hz → upsample@2280000Hz (ratio 10.00) +``` + +When rates are equal (e.g. LimeSDR at 228 kHz), same-rate mode is used: + +``` +engine: same-rate mode — DSP@228000Hz +``` + ### Limiter The MPX limiter prevents overmodulation by applying smooth gain reduction when the composite signal exceeds the configured ceiling. A hard clipper acts as a safety net after the limiter. @@ -61,24 +81,11 @@ The MPX limiter prevents overmodulation by applying smooth gain reduction when t ### HTTP control surface -Available endpoints: -- `GET /healthz` -- `GET /status` -- `GET /dry-run` -- `GET /config` -- `POST /config` - -Current patchable runtime fields via `POST /config`: -- `frequencyMHz` -- `outputDrive` -- `toneLeftHz` -- `toneRightHz` -- `toneAmplitude` -- `ps` -- `radioText` -- `preEmphasisUS` -- `limiterEnabled` -- `limiterCeiling` +Full API documentation: **[docs/API.md](API.md)** + +All major TX parameters are hot-reloadable via `POST /config` during live transmission — frequency, stereo/mono, RDS text, output drive, pilot/RDS levels, limiter. Changes take effect within 50–88ms without stopping the stream. + +Available endpoints: `/healthz`, `/status`, `/runtime`, `/config` (GET/POST), `/dry-run`, `/tx/start`, `/tx/stop` ### Internal DSP module - `cd internal` From 9daa48195420ac54bf80a1d60882ac386220b359 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Fri, 3 Apr 2026 20:12:26 +0200 Subject: [PATCH 4/4] feat: add embedded web UI for live TX control Embed a browser-based control surface into the HTTP server and document the live-control API. The new UI exposes TX start/stop, runtime telemetry, frequency, levels, toggles, and RDS text updates against the existing live-config endpoints. --- internal/control/control.go | 15 + internal/control/ui.html | 875 ++++++++++++++++++++++++++++++++++++ 2 files changed, 890 insertions(+) create mode 100644 internal/control/ui.html diff --git a/internal/control/control.go b/internal/control/control.go index 25a5a5e..e0cf0e8 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -1,6 +1,7 @@ package control import ( + _ "embed" "encoding/json" "net/http" "sync" @@ -10,6 +11,9 @@ import ( "github.com/jan/fm-rds-tx/internal/platform" ) +//go:embed ui.html +var uiHTML []byte + // TXController is an optional interface the Server uses to start/stop TX // and apply live config changes. type TXController interface { @@ -76,6 +80,7 @@ func (s *Server) SetDriver(drv platform.SoapyDriver) { func (s *Server) Handler() http.Handler { mux := http.NewServeMux() + mux.HandleFunc("/", s.handleUI) mux.HandleFunc("/healthz", s.handleHealth) mux.HandleFunc("/status", s.handleStatus) mux.HandleFunc("/dry-run", s.handleDryRun) @@ -91,6 +96,16 @@ func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) } +func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Cache-Control", "no-cache") + w.Write(uiHTML) +} + func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) { s.mu.RLock() cfg := s.cfg diff --git a/internal/control/ui.html b/internal/control/ui.html new file mode 100644 index 0000000..1a70e9c --- /dev/null +++ b/internal/control/ui.html @@ -0,0 +1,875 @@ + + + + + +fm-rds-tx + + + +
+ + +
+

FM-RDS-TX

+
+
+ connecting +
+
+ + +
+
---.--MHz
+ + +
--
+
+ + +
+
Chunks
--
+
Samples
--
+
Underruns
0
+
Uptime
--
+
Rate
--
+
+ + +
+
+
+

Frequency

+ +
+
+
+ TX Freq +
+ + + MHz +
+
+ +
+
+ + +
+
+
+

Levels

+ +
+
+
+ Output Drive +
+ + -- +
+
+
+ Pilot Level +
+ + -- +
+
+
+ RDS Inject +
+ + -- +
+
+
+ Limiter Ceil +
+ + -- +
+
+ +
+
+ + +
+
+
+

Switches

+ +
+
+
+ Stereo +
+
+ -- +
+
+
+ RDS +
+
+ -- +
+
+
+ Limiter +
+
+ -- +
+
+
+
+ + +
+
+
+

RDS

+ +
+
+
+ Program Service (PS) + +
0/8
+
+
+ RadioText (RT) + +
0/64
+
+ +
+
+ + +
+
+

Log

+ +
+
+
+
+
+ +
+ + +
+ + + +