| @@ -147,6 +147,7 @@ type Engine struct { | |||||
| startedAt time.Time | startedAt time.Time | ||||
| wg sync.WaitGroup | wg sync.WaitGroup | ||||
| runtimeState atomic.Value | runtimeState atomic.Value | ||||
| stateMu sync.Mutex // guards setRuntimeState check-then-store (NEW-2 fix) | |||||
| chunksProduced atomic.Uint64 | chunksProduced atomic.Uint64 | ||||
| totalSamples atomic.Uint64 | totalSamples atomic.Uint64 | ||||
| @@ -571,13 +572,9 @@ func (e *Engine) run(ctx context.Context) { | |||||
| updateMaxDuration(&e.maxGenerateNs, genDur) | updateMaxDuration(&e.maxGenerateNs, genDur) | ||||
| updateMaxDuration(&e.maxUpsampleNs, upDur) | updateMaxDuration(&e.maxUpsampleNs, upDur) | ||||
| // cloneFrame never returns nil when src is non-nil (NEW-3: dead nil check removed) | |||||
| enqueued := cloneFrame(frame) | enqueued := cloneFrame(frame) | ||||
| enqueued.EnqueuedAt = time.Now() | enqueued.EnqueuedAt = time.Now() | ||||
| if enqueued == nil { | |||||
| e.lastError.Store("engine: frame clone failed") | |||||
| e.underruns.Add(1) | |||||
| continue | |||||
| } | |||||
| if err := e.frameQueue.Push(ctx, enqueued); err != nil { | if err := e.frameQueue.Push(ctx, enqueued); err != nil { | ||||
| if ctx.Err() != nil { | if ctx.Err() != nil { | ||||
| @@ -699,6 +696,11 @@ func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame { | |||||
| } | } | ||||
| func (e *Engine) setRuntimeState(state RuntimeState) { | func (e *Engine) setRuntimeState(state RuntimeState) { | ||||
| // NEW-2 fix: hold stateMu so that concurrent calls from run() and | |||||
| // writerLoop() cannot both see prev != state and both record a | |||||
| // spurious duplicate transition. | |||||
| e.stateMu.Lock() | |||||
| defer e.stateMu.Unlock() | |||||
| now := time.Now() | now := time.Now() | ||||
| prev := e.currentRuntimeState() | prev := e.currentRuntimeState() | ||||
| if prev != state { | if prev != state { | ||||
| @@ -4,6 +4,7 @@ import ( | |||||
| "encoding/json" | "encoding/json" | ||||
| "fmt" | "fmt" | ||||
| "os" | "os" | ||||
| "path/filepath" | |||||
| "strconv" | "strconv" | ||||
| "strings" | "strings" | ||||
| ) | ) | ||||
| @@ -140,7 +141,8 @@ type IngestAES67DiscoveryConfig struct { | |||||
| func Default() Config { | func Default() Config { | ||||
| return Config{ | return Config{ | ||||
| Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0.4}, | |||||
| // BUG-C fix: tones off by default (was 0.4 — caused unintended audio output). | |||||
| Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0}, | |||||
| RDS: RDSConfig{Enabled: true, PI: "1234", PS: "FMRTX", RadioText: "fm-rds-tx", PTY: 0}, | RDS: RDSConfig{Enabled: true, PI: "1234", PS: "FMRTX", RadioText: "fm-rds-tx", PTY: 0}, | ||||
| FM: FMConfig{ | FM: FMConfig{ | ||||
| FrequencyMHz: 100.0, | FrequencyMHz: 100.0, | ||||
| @@ -249,15 +251,43 @@ func Save(path string, cfg Config) error { | |||||
| return err | return err | ||||
| } | } | ||||
| data = append(data, '\n') | data = append(data, '\n') | ||||
| return os.WriteFile(path, data, 0o644) | |||||
| // NEW-1 fix: write to a temp file in the same directory, then rename atomically. | |||||
| // A direct os.WriteFile on the target leaves a corrupt file if the process | |||||
| // crashes mid-write. os.Rename is atomic on POSIX filesystems. | |||||
| // NOTE(review): os.CreateTemp creates the file 0600, so the saved config now | |||||
| // ends up 0600 instead of the previous 0o644 — chmod the temp file before | |||||
| // rename if other readers relied on the old mode. For crash durability the | |||||
| // containing directory should also be fsynced after the rename. | |||||
| dir := filepath.Dir(path) | |||||
| tmp, err := os.CreateTemp(dir, ".fmrtx-config-*.json.tmp") | |||||
| if err != nil { | |||||
| return fmt.Errorf("config save: create temp: %w", err) | |||||
| } | |||||
| tmpPath := tmp.Name() | |||||
| if _, err := tmp.Write(data); err != nil { | |||||
| _ = tmp.Close() | |||||
| _ = os.Remove(tmpPath) | |||||
| return fmt.Errorf("config save: write temp: %w", err) | |||||
| } | |||||
| if err := tmp.Sync(); err != nil { | |||||
| _ = tmp.Close() | |||||
| _ = os.Remove(tmpPath) | |||||
| return fmt.Errorf("config save: sync temp: %w", err) | |||||
| } | |||||
| if err := tmp.Close(); err != nil { | |||||
| _ = os.Remove(tmpPath) | |||||
| return fmt.Errorf("config save: close temp: %w", err) | |||||
| } | |||||
| if err := os.Rename(tmpPath, path); err != nil { | |||||
| _ = os.Remove(tmpPath) | |||||
| return fmt.Errorf("config save: rename: %w", err) | |||||
| } | |||||
| return nil | |||||
| } | } | ||||
| func (c Config) Validate() error { | func (c Config) Validate() error { | ||||
| if c.Audio.Gain < 0 || c.Audio.Gain > 4 { | if c.Audio.Gain < 0 || c.Audio.Gain > 4 { | ||||
| return fmt.Errorf("audio.gain out of range") | return fmt.Errorf("audio.gain out of range") | ||||
| } | } | ||||
| if c.Audio.ToneLeftHz <= 0 || c.Audio.ToneRightHz <= 0 { | |||||
| return fmt.Errorf("audio tone frequencies must be positive") | |||||
| // BUG-B fix: only enforce positive freq when amplitude is non-zero. | |||||
| if c.Audio.ToneAmplitude > 0 && (c.Audio.ToneLeftHz <= 0 || c.Audio.ToneRightHz <= 0) { | |||||
| return fmt.Errorf("audio tone frequencies must be positive when toneAmplitude > 0") | |||||
| } | } | ||||
| if c.Audio.ToneAmplitude < 0 || c.Audio.ToneAmplitude > 1 { | if c.Audio.ToneAmplitude < 0 || c.Audio.ToneAmplitude > 1 { | ||||
| return fmt.Errorf("audio.toneAmplitude out of range") | return fmt.Errorf("audio.toneAmplitude out of range") | ||||
| @@ -411,11 +441,15 @@ func (c Config) Validate() error { | |||||
| if c.Ingest.Icecast.RadioText.MaxLen < 0 || c.Ingest.Icecast.RadioText.MaxLen > 64 { | if c.Ingest.Icecast.RadioText.MaxLen < 0 || c.Ingest.Icecast.RadioText.MaxLen > 64 { | ||||
| return fmt.Errorf("ingest.icecast.radioText.maxLen out of range (0-64)") | return fmt.Errorf("ingest.icecast.radioText.maxLen out of range (0-64)") | ||||
| } | } | ||||
| // Fail-loud PI validation | |||||
| if c.RDS.Enabled { | |||||
| // BUG-D fix: validate PI whenever non-empty, not only when RDS is enabled. | |||||
| // An invalid PI stored in config causes a silent failure when RDS is later | |||||
| // enabled via live-patch. | |||||
| if strings.TrimSpace(c.RDS.PI) != "" { | |||||
| if _, err := ParsePI(c.RDS.PI); err != nil { | if _, err := ParsePI(c.RDS.PI); err != nil { | ||||
| return fmt.Errorf("rds config: %w", err) | return fmt.Errorf("rds config: %w", err) | ||||
| } | } | ||||
| } else if c.RDS.Enabled { | |||||
| return fmt.Errorf("rds.pi is required when rds.enabled is true") | |||||
| } | } | ||||
| if c.RDS.PTY < 0 || c.RDS.PTY > 31 { | if c.RDS.PTY < 0 || c.RDS.PTY > 31 { | ||||
| return fmt.Errorf("rds.pty out of range (0-31)") | return fmt.Errorf("rds.pty out of range (0-31)") | ||||
| @@ -61,6 +61,9 @@ type Server struct { | |||||
| ingestRt IngestRuntime // optional, for /runtime ingest stats | ingestRt IngestRuntime // optional, for /runtime ingest stats | ||||
| saveConfig func(config.Config) error | saveConfig func(config.Config) error | ||||
| hardReload func() | hardReload func() | ||||
| // BUG-F fix: reloadPending debounces reload *scheduling* so rapid successive | |||||
| // saves coalesce into one pending reload. NOTE(review): the flag is cleared | |||||
| // before hardReload actually runs (see handleIngestSave), so reload | |||||
| // executions themselves can still overlap — either clear the flag after fn() | |||||
| // returns, or confirm hardReload is safe to invoke concurrently. | |||||
| reloadPending atomic.Bool | |||||
| audit auditCounters | audit auditCounters | ||||
| } | } | ||||
| @@ -693,9 +696,10 @@ func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) { | |||||
| "saved": true, | "saved": true, | ||||
| "reloadScheduled": reloadScheduled, | "reloadScheduled": reloadScheduled, | ||||
| }) | }) | ||||
| if reloadScheduled { | |||||
| if reloadScheduled && s.reloadPending.CompareAndSwap(false, true) { | |||||
| go func(fn func()) { | go func(fn func()) { | ||||
| time.Sleep(250 * time.Millisecond) | time.Sleep(250 * time.Millisecond) | ||||
| s.reloadPending.Store(false) | |||||
| fn() | fn() | ||||
| }(reload) | }(reload) | ||||
| } | } | ||||
| @@ -115,6 +115,13 @@ func (l *BS412Limiter) ProcessChunk(audioPower float64) float64 { | |||||
| old := l.powerBuf[l.bufIdx] | old := l.powerBuf[l.bufIdx] | ||||
| l.powerBuf[l.bufIdx] = audioPower | l.powerBuf[l.bufIdx] = audioPower | ||||
| l.powerSum += audioPower - old | l.powerSum += audioPower - old | ||||
| // BUG-G fix: float64 accumulation over 1200+ chunks can drift slightly | |||||
| // negative due to rounding. A negative powerSum → negative avgPower → | |||||
| // math.Sqrt of negative → NaN → gain becomes NaN, silently disabling | |||||
| // the limiter. Clamp to zero to keep the invariant powerSum >= 0. | |||||
| if l.powerSum < 0 { | |||||
| l.powerSum = 0 | |||||
| } | |||||
| l.bufIdx++ | l.bufIdx++ | ||||
| if l.bufIdx >= len(l.powerBuf) { | if l.bufIdx >= len(l.powerBuf) { | ||||
| l.bufIdx = 0 | l.bufIdx = 0 | ||||
| @@ -20,11 +20,15 @@ func (c ReconnectConfig) nextBackoff(attempt int) time.Duration { | |||||
| if max <= 0 { | if max <= 0 { | ||||
| max = 15000 | max = 15000 | ||||
| } | } | ||||
| maxD := time.Duration(max) * time.Millisecond | |||||
| d := time.Duration(initial) * time.Millisecond | d := time.Duration(initial) * time.Millisecond | ||||
| // BUG-E fix: also bail out when d <= 0. NOTE(review): the original rationale | |||||
| // ("overflow after ~63 doublings") does not hold here — with the 15s cap the | |||||
| // loop returns maxD after a few doublings, far below int64-nanosecond | |||||
| // overflow. In practice this guard protects a non-positive initial backoff | |||||
| // (d starts <= 0 and could never reach maxD); consider validating | |||||
| // initial > 0 up front, mirroring the max <= 0 check above. | |||||
| for i := 1; i < attempt; i++ { | for i := 1; i < attempt; i++ { | ||||
| d *= 2 | d *= 2 | ||||
| if d >= time.Duration(max)*time.Millisecond { | |||||
| return time.Duration(max) * time.Millisecond | |||||
| if d <= 0 || d >= maxD { | |||||
| return maxD | |||||
| } | } | ||||
| } | } | ||||
| return d | return d | ||||
| @@ -81,7 +81,16 @@ func (d *FFmpegDecoder) DecodeStream(ctx context.Context, r io.Reader, meta deco | |||||
| } | } | ||||
| }() | }() | ||||
| stderrData, _ := io.ReadAll(stderr) | |||||
| // DEADLOCK FIX: stderr and stdout must be drained concurrently. | |||||
| // Reading stderr synchronously before readPCM means ffmpeg blocks when | |||||
| // stdout's pipe buffer fills (typically 64KB), which prevents it from | |||||
| // closing stderr, which prevents ReadAll from returning — deadlock. | |||||
| var stderrData []byte | |||||
| wg.Add(1) | |||||
| go func() { | |||||
| defer wg.Done() | |||||
| stderrData, _ = io.ReadAll(stderr) | |||||
| }() | |||||
| readErr := d.readPCM(ctx, stdout, sampleRate, channels, meta.SourceID, emit) | readErr := d.readPCM(ctx, stdout, sampleRate, channels, meta.SourceID, emit) | ||||
| waitErr := cmd.Wait() | waitErr := cmd.Wait() | ||||
| wg.Wait() | wg.Wait() | ||||
| @@ -45,6 +45,9 @@ const ( | |||||
| type FrameQueue struct { | type FrameQueue struct { | ||||
| capacity int | capacity int | ||||
| ch chan *CompositeFrame | ch chan *CompositeFrame | ||||
| // closeCh is closed by Close() and selected on in Push so blocked senders | |||||
| // unblock promptly. NOTE(review): this does NOT fully eliminate the | |||||
| // send-on-closed panic — a send case on a closed channel is selectable and | |||||
| // panics if chosen, so a Push entering its select after close(ch) can still | |||||
| // panic. Prefer guarding sends with q.mu, or never close ch at all. | |||||
| closeCh chan struct{} | |||||
| mu sync.Mutex | mu sync.Mutex | ||||
| depth int | depth int | ||||
| @@ -68,6 +71,7 @@ func NewFrameQueue(capacity int) *FrameQueue { | |||||
| fq := &FrameQueue{ | fq := &FrameQueue{ | ||||
| capacity: capacity, | capacity: capacity, | ||||
| ch: make(chan *CompositeFrame, capacity), | ch: make(chan *CompositeFrame, capacity), | ||||
| closeCh: make(chan struct{}), | |||||
| lowWaterMark: capacity, | lowWaterMark: capacity, | ||||
| } | } | ||||
| fq.trackDepth(0) | fq.trackDepth(0) | ||||
| @@ -121,17 +125,18 @@ func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error { | |||||
| if frame == nil { | if frame == nil { | ||||
| return errors.New("frame required") | return errors.New("frame required") | ||||
| } | } | ||||
| if q.isClosed() { | |||||
| return ErrFrameQueueClosed | |||||
| } | |||||
| // BUG-05 fix: increment depth BEFORE the channel send so that Stats() | |||||
| // never reports fill=0 while a frame is in the channel awaiting receive. | |||||
| // On context cancellation, undo the increment. | |||||
| // BUG-A fix: select on closeCh so a Push blocked in the send unblocks when | |||||
| // Close() runs. The old isClosed() pre-check + separate ch<- send had a | |||||
| // TOCTOU gap that could panic with "send on closed channel". | |||||
| // NOTE(review): the gap is narrowed, not closed — per the Go spec a send on | |||||
| // a closed channel proceeds (by panicking) and select may pick that case, | |||||
| // so a Push entering the select after close(q.ch) can still panic. A full | |||||
| // fix serializes sends against Close via q.mu, or leaves q.ch unclosed and | |||||
| // has Pop drain via closeCh. | |||||
| // BUG-05 fix: increment depth before the send; undo on cancel/close. | |||||
| q.updateDepth(+1) | q.updateDepth(+1) | ||||
| select { | select { | ||||
| case q.ch <- frame: | case q.ch <- frame: | ||||
| return nil | return nil | ||||
| case <-q.closeCh: | |||||
| q.updateDepth(-1) | |||||
| return ErrFrameQueueClosed | |||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| q.updateDepth(-1) | q.updateDepth(-1) | ||||
| q.recordPushTimeout() | q.recordPushTimeout() | ||||
| @@ -160,6 +165,9 @@ func (q *FrameQueue) Close() { | |||||
| q.mu.Lock() | q.mu.Lock() | ||||
| q.closed = true | q.closed = true | ||||
| q.mu.Unlock() | q.mu.Unlock() | ||||
| // Close closeCh first so Push() calls already blocked in their select | |||||
| // unblock before close(ch) removes the destination. NOTE(review): ordering | |||||
| // alone is insufficient — a Push that reaches its select after close(q.ch) | |||||
| // may still choose the send case and panic; see the note on Push. Consider | |||||
| // not closing q.ch and signalling shutdown exclusively through closeCh. | |||||
| close(q.closeCh) | |||||
| close(q.ch) | close(q.ch) | ||||
| }) | }) | ||||
| } | } | ||||