diff --git a/internal/app/engine.go b/internal/app/engine.go index a1f2e71..12951f1 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -147,6 +147,7 @@ type Engine struct { startedAt time.Time wg sync.WaitGroup runtimeState atomic.Value + stateMu sync.Mutex // guards setRuntimeState check-then-store (NEW-2 fix) chunksProduced atomic.Uint64 totalSamples atomic.Uint64 @@ -571,13 +572,9 @@ func (e *Engine) run(ctx context.Context) { updateMaxDuration(&e.maxGenerateNs, genDur) updateMaxDuration(&e.maxUpsampleNs, upDur) + // cloneFrame never returns nil when src is non-nil (NEW-3: dead nil check removed) enqueued := cloneFrame(frame) enqueued.EnqueuedAt = time.Now() - if enqueued == nil { - e.lastError.Store("engine: frame clone failed") - e.underruns.Add(1) - continue - } if err := e.frameQueue.Push(ctx, enqueued); err != nil { if ctx.Err() != nil { @@ -699,6 +696,11 @@ func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame { } func (e *Engine) setRuntimeState(state RuntimeState) { + // NEW-2 fix: hold stateMu so that concurrent calls from run() and + // writerLoop() cannot both see prev != state and both record a + // spurious duplicate transition. + e.stateMu.Lock() + defer e.stateMu.Unlock() now := time.Now() prev := e.currentRuntimeState() if prev != state { diff --git a/internal/config/config.go b/internal/config/config.go index 45f64bd..d0b29ca 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "strconv" "strings" ) @@ -140,7 +141,8 @@ type IngestAES67DiscoveryConfig struct { func Default() Config { return Config{ - Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0.4}, + // BUG-C fix: tones off by default (was 0.4 — caused unintended audio output). + Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0}, RDS: RDSConfig{Enabled: true, PI: "1234", PS: "FMRTX", RadioText: "fm-rds-tx", PTY: 0}, FM: FMConfig{ FrequencyMHz: 100.0, @@ -249,15 +251,43 @@ func Save(path string, cfg Config) error { return err } data = append(data, '\n') - return os.WriteFile(path, data, 0o644) + // NEW-1 fix: write to a temp file in the same directory, then rename atomically. + // A direct os.WriteFile on the target leaves a corrupt file if the process + // crashes mid-write. os.Rename is atomic on POSIX filesystems. + dir := filepath.Dir(path) + tmp, err := os.CreateTemp(dir, ".fmrtx-config-*.json.tmp") + if err != nil { + return fmt.Errorf("config save: create temp: %w", err) + } + tmpPath := tmp.Name() + if _, err := tmp.Write(data); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpPath) + return fmt.Errorf("config save: write temp: %w", err) + } + if err := tmp.Sync(); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpPath) + return fmt.Errorf("config save: sync temp: %w", err) + } + if err := tmp.Close(); err != nil { + _ = os.Remove(tmpPath) + return fmt.Errorf("config save: close temp: %w", err) + } + if err := os.Rename(tmpPath, path); err != nil { + _ = os.Remove(tmpPath) + return fmt.Errorf("config save: rename: %w", err) + } + return nil } func (c Config) Validate() error { if c.Audio.Gain < 0 || c.Audio.Gain > 4 { return fmt.Errorf("audio.gain out of range") } - if c.Audio.ToneLeftHz <= 0 || c.Audio.ToneRightHz <= 0 { - return fmt.Errorf("audio tone frequencies must be positive") + // BUG-B fix: only enforce positive freq when amplitude is non-zero. + if c.Audio.ToneAmplitude > 0 && (c.Audio.ToneLeftHz <= 0 || c.Audio.ToneRightHz <= 0) { + return fmt.Errorf("audio tone frequencies must be positive when toneAmplitude > 0") } if c.Audio.ToneAmplitude < 0 || c.Audio.ToneAmplitude > 1 { return fmt.Errorf("audio.toneAmplitude out of range") @@ -411,11 +441,15 @@ func (c Config) Validate() error { if c.Ingest.Icecast.RadioText.MaxLen < 0 || c.Ingest.Icecast.RadioText.MaxLen > 64 { return fmt.Errorf("ingest.icecast.radioText.maxLen out of range (0-64)") } - // Fail-loud PI validation - if c.RDS.Enabled { + // BUG-D fix: validate PI whenever non-empty, not only when RDS is enabled. + // An invalid PI stored in config causes a silent failure when RDS is later + // enabled via live-patch. + if strings.TrimSpace(c.RDS.PI) != "" { if _, err := ParsePI(c.RDS.PI); err != nil { return fmt.Errorf("rds config: %w", err) } + } else if c.RDS.Enabled { + return fmt.Errorf("rds.pi is required when rds.enabled is true") } if c.RDS.PTY < 0 || c.RDS.PTY > 31 { return fmt.Errorf("rds.pty out of range (0-31)") diff --git a/internal/control/control.go b/internal/control/control.go index ef67f30..4f021c8 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -61,6 +61,9 @@ type Server struct { ingestRt IngestRuntime // optional, for /runtime ingest stats saveConfig func(config.Config) error hardReload func() + // BUG-F fix: reloadPending prevents multiple concurrent goroutines from + // calling hardReload when handleIngestSave is hit multiple times quickly. + reloadPending atomic.Bool audit auditCounters } @@ -693,9 +696,10 @@ func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) { "saved": true, "reloadScheduled": reloadScheduled, }) - if reloadScheduled { + if reloadScheduled && s.reloadPending.CompareAndSwap(false, true) { go func(fn func()) { time.Sleep(250 * time.Millisecond) + s.reloadPending.Store(false) fn() }(reload) } diff --git a/internal/dsp/bs412.go b/internal/dsp/bs412.go index 98bb2ea..7a6303c 100644 --- a/internal/dsp/bs412.go +++ b/internal/dsp/bs412.go @@ -115,6 +115,13 @@ func (l *BS412Limiter) ProcessChunk(audioPower float64) float64 { old := l.powerBuf[l.bufIdx] l.powerBuf[l.bufIdx] = audioPower l.powerSum += audioPower - old + // BUG-G fix: float64 accumulation over 1200+ chunks can drift slightly + // negative due to rounding. A negative powerSum → negative avgPower → + // math.Sqrt of negative → NaN → gain becomes NaN, silently disabling + // the limiter. Clamp to zero to keep the invariant powerSum >= 0. + if l.powerSum < 0 { + l.powerSum = 0 + } l.bufIdx++ if l.bufIdx >= len(l.powerBuf) { l.bufIdx = 0 diff --git a/internal/ingest/adapters/icecast/reconnect.go b/internal/ingest/adapters/icecast/reconnect.go index 44fe2c2..c9e9fac 100644 --- a/internal/ingest/adapters/icecast/reconnect.go +++ b/internal/ingest/adapters/icecast/reconnect.go @@ -20,11 +20,15 @@ func (c ReconnectConfig) nextBackoff(attempt int) time.Duration { if max <= 0 { max = 15000 } + maxD := time.Duration(max) * time.Millisecond d := time.Duration(initial) * time.Millisecond + // BUG-E fix: check d <= 0 (overflow) as well as d >= max. + // int64 overflow after ~63 doublings caused d to go negative, + // producing spurious short backoffs before recovering. for i := 1; i < attempt; i++ { d *= 2 - if d >= time.Duration(max)*time.Millisecond { - return time.Duration(max) * time.Millisecond + if d <= 0 || d >= maxD { + return maxD } } return d diff --git a/internal/ingest/decoder/fallback/ffmpeg.go b/internal/ingest/decoder/fallback/ffmpeg.go index 6dc4198..27f10ee 100644 --- a/internal/ingest/decoder/fallback/ffmpeg.go +++ b/internal/ingest/decoder/fallback/ffmpeg.go @@ -81,7 +81,16 @@ func (d *FFmpegDecoder) DecodeStream(ctx context.Context, r io.Reader, meta deco } }() - stderrData, _ := io.ReadAll(stderr) + // DEADLOCK FIX: stderr and stdout must be drained concurrently. + // Reading stderr synchronously before readPCM means ffmpeg blocks when + // stdout's pipe buffer fills (typically 64KB), which prevents it from + // closing stderr, which prevents ReadAll from returning — deadlock. + var stderrData []byte + wg.Add(1) + go func() { + defer wg.Done() + stderrData, _ = io.ReadAll(stderr) + }() readErr := d.readPCM(ctx, stdout, sampleRate, channels, meta.SourceID, emit) waitErr := cmd.Wait() wg.Wait() diff --git a/internal/output/frame_queue.go b/internal/output/frame_queue.go index 0443eec..223714c 100644 --- a/internal/output/frame_queue.go +++ b/internal/output/frame_queue.go @@ -45,6 +45,9 @@ const ( type FrameQueue struct { capacity int ch chan *CompositeFrame + // closeCh is closed by Close() and used in Push/Pop selects so that + // a concurrent Close() can never race with a channel send. + closeCh chan struct{} mu sync.Mutex depth int @@ -68,6 +71,7 @@ func NewFrameQueue(capacity int) *FrameQueue { fq := &FrameQueue{ capacity: capacity, ch: make(chan *CompositeFrame, capacity), + closeCh: make(chan struct{}), lowWaterMark: capacity, } fq.trackDepth(0) @@ -121,17 +125,18 @@ func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error { if frame == nil { return errors.New("frame required") } - if q.isClosed() { - return ErrFrameQueueClosed - } - // BUG-05 fix: increment depth BEFORE the channel send so that Stats() - // never reports fill=0 while a frame is in the channel awaiting receive. - // On context cancellation, undo the increment. + // BUG-A fix: use closeCh in the select so that a concurrent Close() can + // never race with the send. The old isClosed() pre-check + separate + // ch<- send had a TOCTOU gap that could panic with "send on closed channel". + // BUG-05 fix: increment depth before the send; undo on cancel/close. q.updateDepth(+1) select { case q.ch <- frame: return nil + case <-q.closeCh: + q.updateDepth(-1) + return ErrFrameQueueClosed case <-ctx.Done(): q.updateDepth(-1) q.recordPushTimeout() @@ -160,6 +165,9 @@ func (q *FrameQueue) Close() { q.mu.Lock() q.closed = true q.mu.Unlock() + // Close closeCh first so blocked Push() calls unblock safely + // before close(ch) removes the destination. + close(q.closeCh) close(q.ch) }) }