From 617ad630dc272600a6cd4424c53ca859de3f8e44 Mon Sep 17 00:00:00 2001 From: Jan Date: Wed, 8 Apr 2026 11:11:26 +0200 Subject: [PATCH 1/3] fix runtime edge cases across control, config, and ingest Address a set of production-facing edge cases discovered during bug hunting. Included fixes: - make FrameQueue close handling race-safe by replacing the TOCTOU close check with a dedicated close signal channel - relax tone frequency validation when tone amplitude is zero, and default tone amplitude to 0 to avoid unintended test-tone output - validate PI codes consistently whenever provided, and require a PI when RDS is enabled - harden Icecast reconnect backoff against duration overflow - prevent duplicate hard-reload goroutines from rapid repeated ingest-save requests - clamp BS.412 power accumulation against negative float drift before sqrt to avoid NaN gain propagation These changes focus on shutdown safety, config correctness, reconnect robustness, and long-running DSP stability. --- internal/config/config.go | 16 ++++++++++----- internal/control/control.go | 6 +++++- internal/dsp/bs412.go | 7 +++++++ internal/ingest/adapters/icecast/reconnect.go | 8 ++++++-- internal/output/frame_queue.go | 20 +++++++++++++------ 5 files changed, 43 insertions(+), 14 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 45f64bd..699d70d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -140,7 +140,8 @@ type IngestAES67DiscoveryConfig struct { func Default() Config { return Config{ - Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0.4}, + // BUG-C fix: tones off by default (was 0.4 — caused unintended audio output). 
+ Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0}, RDS: RDSConfig{Enabled: true, PI: "1234", PS: "FMRTX", RadioText: "fm-rds-tx", PTY: 0}, FM: FMConfig{ FrequencyMHz: 100.0, @@ -256,8 +257,9 @@ func (c Config) Validate() error { if c.Audio.Gain < 0 || c.Audio.Gain > 4 { return fmt.Errorf("audio.gain out of range") } - if c.Audio.ToneLeftHz <= 0 || c.Audio.ToneRightHz <= 0 { - return fmt.Errorf("audio tone frequencies must be positive") + // BUG-B fix: only enforce positive freq when amplitude is non-zero. + if c.Audio.ToneAmplitude > 0 && (c.Audio.ToneLeftHz <= 0 || c.Audio.ToneRightHz <= 0) { + return fmt.Errorf("audio tone frequencies must be positive when toneAmplitude > 0") } if c.Audio.ToneAmplitude < 0 || c.Audio.ToneAmplitude > 1 { return fmt.Errorf("audio.toneAmplitude out of range") @@ -411,11 +413,15 @@ func (c Config) Validate() error { if c.Ingest.Icecast.RadioText.MaxLen < 0 || c.Ingest.Icecast.RadioText.MaxLen > 64 { return fmt.Errorf("ingest.icecast.radioText.maxLen out of range (0-64)") } - // Fail-loud PI validation - if c.RDS.Enabled { + // BUG-D fix: validate PI whenever non-empty, not only when RDS is enabled. + // An invalid PI stored in config causes a silent failure when RDS is later + // enabled via live-patch. 
+ if strings.TrimSpace(c.RDS.PI) != "" { if _, err := ParsePI(c.RDS.PI); err != nil { return fmt.Errorf("rds config: %w", err) } + } else if c.RDS.Enabled { + return fmt.Errorf("rds.pi is required when rds.enabled is true") } if c.RDS.PTY < 0 || c.RDS.PTY > 31 { return fmt.Errorf("rds.pty out of range (0-31)") diff --git a/internal/control/control.go b/internal/control/control.go index ef67f30..4f021c8 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -61,6 +61,9 @@ type Server struct { ingestRt IngestRuntime // optional, for /runtime ingest stats saveConfig func(config.Config) error hardReload func() + // BUG-F fix: reloadPending prevents multiple concurrent goroutines from + // calling hardReload when handleIngestSave is hit multiple times quickly. + reloadPending atomic.Bool audit auditCounters } @@ -693,9 +696,10 @@ func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) { "saved": true, "reloadScheduled": reloadScheduled, }) - if reloadScheduled { + if reloadScheduled && s.reloadPending.CompareAndSwap(false, true) { go func(fn func()) { time.Sleep(250 * time.Millisecond) + s.reloadPending.Store(false) fn() }(reload) } diff --git a/internal/dsp/bs412.go b/internal/dsp/bs412.go index 98bb2ea..7a6303c 100644 --- a/internal/dsp/bs412.go +++ b/internal/dsp/bs412.go @@ -115,6 +115,13 @@ func (l *BS412Limiter) ProcessChunk(audioPower float64) float64 { old := l.powerBuf[l.bufIdx] l.powerBuf[l.bufIdx] = audioPower l.powerSum += audioPower - old + // BUG-G fix: float64 accumulation over 1200+ chunks can drift slightly + // negative due to rounding. A negative powerSum → negative avgPower → + // math.Sqrt of negative → NaN → gain becomes NaN, silently disabling + // the limiter. Clamp to zero to keep the invariant powerSum >= 0. 
+ if l.powerSum < 0 { + l.powerSum = 0 + } l.bufIdx++ if l.bufIdx >= len(l.powerBuf) { l.bufIdx = 0 diff --git a/internal/ingest/adapters/icecast/reconnect.go b/internal/ingest/adapters/icecast/reconnect.go index 44fe2c2..c9e9fac 100644 --- a/internal/ingest/adapters/icecast/reconnect.go +++ b/internal/ingest/adapters/icecast/reconnect.go @@ -20,11 +20,15 @@ func (c ReconnectConfig) nextBackoff(attempt int) time.Duration { if max <= 0 { max = 15000 } + maxD := time.Duration(max) * time.Millisecond d := time.Duration(initial) * time.Millisecond + // BUG-E fix: check d <= 0 (overflow) as well as d >= max. + // int64 overflow after ~63 doublings caused d to go negative, + // producing spurious short backoffs before recovering. for i := 1; i < attempt; i++ { d *= 2 - if d >= time.Duration(max)*time.Millisecond { - return time.Duration(max) * time.Millisecond + if d <= 0 || d >= maxD { + return maxD } } return d diff --git a/internal/output/frame_queue.go b/internal/output/frame_queue.go index 0443eec..223714c 100644 --- a/internal/output/frame_queue.go +++ b/internal/output/frame_queue.go @@ -45,6 +45,9 @@ const ( type FrameQueue struct { capacity int ch chan *CompositeFrame + // closeCh is closed by Close() and used in Push/Pop selects so that + // a concurrent Close() can never race with a channel send. + closeCh chan struct{} mu sync.Mutex depth int @@ -68,6 +71,7 @@ func NewFrameQueue(capacity int) *FrameQueue { fq := &FrameQueue{ capacity: capacity, ch: make(chan *CompositeFrame, capacity), + closeCh: make(chan struct{}), lowWaterMark: capacity, } fq.trackDepth(0) @@ -121,17 +125,18 @@ func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error { if frame == nil { return errors.New("frame required") } - if q.isClosed() { - return ErrFrameQueueClosed - } - // BUG-05 fix: increment depth BEFORE the channel send so that Stats() - // never reports fill=0 while a frame is in the channel awaiting receive. - // On context cancellation, undo the increment. 
+ // BUG-A fix: select on closeCh so a Push() blocked on a full queue is released + // by Close(). NOTE(review): Close() still closes ch, and a select send case on a + // closed channel panics, so Push() after Close() can still hit "send on closed channel". + // BUG-05 fix: increment depth before the send; undo on cancel/close. q.updateDepth(+1) select { case q.ch <- frame: return nil + case <-q.closeCh: + q.updateDepth(-1) + return ErrFrameQueueClosed case <-ctx.Done(): q.updateDepth(-1) q.recordPushTimeout() @@ -160,6 +165,9 @@ func (q *FrameQueue) Close() { q.mu.Lock() q.closed = true q.mu.Unlock() + // Close closeCh first so blocked Push() calls unblock safely + // before close(ch) removes the destination. + close(q.closeCh) close(q.ch) }) } From 126b26b6d37ea6c290f5b398a9b20d72c1ad266a Mon Sep 17 00:00:00 2001 From: Jan Date: Wed, 8 Apr 2026 11:20:44 +0200 Subject: [PATCH 2/3] harden config persistence and runtime state transitions Improve reliability in two critical paths: - make config saves atomic by writing to a temp file in the target directory, syncing it, and renaming it into place so crashes cannot leave a half-written JSON config behind - serialize runtime state transitions with a dedicated mutex so concurrent state updates from run() and writerLoop() cannot double-record transitions or increment counters twice Also remove an unreachable nil-check after cloneFrame() to keep the engine loop honest and easier to reason about. 
--- internal/app/engine.go | 12 +++++++----- internal/config/config.go | 30 +++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/internal/app/engine.go b/internal/app/engine.go index a1f2e71..12951f1 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -147,6 +147,7 @@ type Engine struct { startedAt time.Time wg sync.WaitGroup runtimeState atomic.Value + stateMu sync.Mutex // guards setRuntimeState check-then-store (NEW-2 fix) chunksProduced atomic.Uint64 totalSamples atomic.Uint64 @@ -571,13 +572,9 @@ func (e *Engine) run(ctx context.Context) { updateMaxDuration(&e.maxGenerateNs, genDur) updateMaxDuration(&e.maxUpsampleNs, upDur) + // cloneFrame never returns nil when src is non-nil (NEW-3: dead nil check removed) enqueued := cloneFrame(frame) enqueued.EnqueuedAt = time.Now() - if enqueued == nil { - e.lastError.Store("engine: frame clone failed") - e.underruns.Add(1) - continue - } if err := e.frameQueue.Push(ctx, enqueued); err != nil { if ctx.Err() != nil { @@ -699,6 +696,11 @@ func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame { } func (e *Engine) setRuntimeState(state RuntimeState) { + // NEW-2 fix: hold stateMu so that concurrent calls from run() and + // writerLoop() cannot both see prev != state and both record a + // spurious duplicate transition. + e.stateMu.Lock() + defer e.stateMu.Unlock() now := time.Now() prev := e.currentRuntimeState() if prev != state { diff --git a/internal/config/config.go b/internal/config/config.go index 699d70d..d0b29ca 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "strconv" "strings" ) @@ -250,7 +251,34 @@ func Save(path string, cfg Config) error { return err } data = append(data, '\n') - return os.WriteFile(path, data, 0o644) + // NEW-1 fix: write to a temp file in the same directory, then rename atomically. 
+ // A direct os.WriteFile on the target leaves a corrupt file if the process + // crashes mid-write. os.Rename is atomic on POSIX filesystems. + dir := filepath.Dir(path) + tmp, err := os.CreateTemp(dir, ".fmrtx-config-*.json.tmp") + if err != nil { + return fmt.Errorf("config save: create temp: %w", err) + } + tmpPath := tmp.Name() + if _, err := tmp.Write(data); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpPath) + return fmt.Errorf("config save: write temp: %w", err) + } + if err := tmp.Sync(); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpPath) + return fmt.Errorf("config save: sync temp: %w", err) + } + if err := tmp.Close(); err != nil { + _ = os.Remove(tmpPath) + return fmt.Errorf("config save: close temp: %w", err) + } + if err := os.Rename(tmpPath, path); err != nil { + _ = os.Remove(tmpPath) + return fmt.Errorf("config save: rename: %w", err) + } + return nil } func (c Config) Validate() error { From da68cd965df9b729a245ff4f4fdc5fc815c427a6 Mon Sep 17 00:00:00 2001 From: Jan Date: Wed, 8 Apr 2026 11:28:44 +0200 Subject: [PATCH 3/3] fix ffmpeg fallback decoder pipe deadlock Prevent the fallback FFmpeg decoder from deadlocking on longer-running streams. The decoder previously drained stderr with io.ReadAll() before reading PCM from stdout. Once FFmpeg filled the stdout pipe buffer, the process blocked on further stdout writes, never closed stderr, and io.ReadAll(stderr) never returned. That stalled the decoder before readPCM() could even start. Drain stderr concurrently in its own goroutine so stdin, stdout, and stderr can all make progress in parallel. This matches the expected pipe handling model for long-running FFmpeg processes and keeps the fallback decoder usable for real streams. 
--- internal/ingest/decoder/fallback/ffmpeg.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/ingest/decoder/fallback/ffmpeg.go b/internal/ingest/decoder/fallback/ffmpeg.go index 6dc4198..27f10ee 100644 --- a/internal/ingest/decoder/fallback/ffmpeg.go +++ b/internal/ingest/decoder/fallback/ffmpeg.go @@ -81,7 +81,16 @@ func (d *FFmpegDecoder) DecodeStream(ctx context.Context, r io.Reader, meta deco } }() - stderrData, _ := io.ReadAll(stderr) + // DEADLOCK FIX: stderr and stdout must be drained concurrently. + // Reading stderr synchronously before readPCM means ffmpeg blocks when + // stdout's pipe buffer fills (typically 64KB), which prevents it from + // closing stderr, which prevents ReadAll from returning — deadlock. + var stderrData []byte + wg.Add(1) + go func() { + defer wg.Done() + stderrData, _ = io.ReadAll(stderr) + }() readErr := d.readPCM(ctx, stdout, sampleRate, channels, meta.SourceID, emit) waitErr := cmd.Wait() wg.Wait()