From 1f49bdd14442252e03bd95def03a1d3fa6c99823 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 22:07:04 +0200 Subject: [PATCH] runtime: tighten queue, generator, and late-write semantics --- internal/app/engine.go | 2 +- internal/offline/generator.go | 23 +++++++++++++++++++---- internal/output/frame_queue.go | 25 ++++++++++++++----------- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/internal/app/engine.go b/internal/app/engine.go index c25537b..b4a7707 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -194,7 +194,7 @@ func (e *Engine) SetStreamSource(src *audio.StreamSource) { } resampler := audio.NewStreamResampler(src, compositeRate) e.generator.SetExternalSource(resampler) - log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)", + log.Printf("engine: live audio stream wired — initial %d Hz → %.0f Hz composite (buffer %d frames); actual decoded rate auto-corrects on first chunk", src.SampleRate, compositeRate, src.Stats().Capacity) } diff --git a/internal/offline/generator.go b/internal/offline/generator.go index dd6afde..6598087 100644 --- a/internal/offline/generator.go +++ b/internal/offline/generator.go @@ -120,8 +120,15 @@ func NewGenerator(cfg cfgpkg.Config) *Generator { // SetExternalSource sets a live audio source (e.g. StreamResampler) that // takes priority over WAV/tone sources. Must be called before the first -// GenerateFrame() call (i.e. before init). +// GenerateFrame() call; calling it after init() has no effect because +// g.source is already wired to the old source. func (g *Generator) SetExternalSource(src frameSource) { + if g.initialized { + // init() already called sourceFor() and wired g.source. Updating + // g.externalSource here would have no effect on the live DSP chain. + // This is a programming error — log loudly rather than silently break. + panic("generator: SetExternalSource called after GenerateFrame; call it before the engine starts") + } g.externalSource = src } @@ -189,12 +196,14 @@ func (g *Generator) init() { g.mpxNotch19, g.mpxNotch57 = dsp.NewCompositeProtection(g.sampleRate) // BS.412 MPX power limiter (EU/CH requirement for licensed FM) if g.cfg.FM.BS412Enabled { - chunkSec := 0.05 // 50ms chunks (matches engine default) + // chunkSec is not known at init time (Engine.chunkDuration may differ). + // Pass 0 here; GenerateFrame computes the actual chunk duration from + // the real sample count and updates BS.412 accordingly. g.bs412 = dsp.NewBS412Limiter( g.cfg.FM.BS412ThresholdDBr, g.cfg.FM.PilotLevel, g.cfg.FM.RDSInjection, - chunkSec, + 0, ) } if g.cfg.FM.FMModulationEnabled { @@ -360,8 +369,14 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame } } - // BS.412: feed this chunk's average audio power for next chunk's gain calculation + // BS.412: feed this chunk's actual duration and average audio power for + // the next chunk's gain calculation. Using the real sample count avoids + // the error that occurred when chunkSec was hardcoded to 0.05 — any + // SetChunkDuration() call from the engine would silently miscalibrate + // the ITU-R BS.412 power measurement window. if g.bs412 != nil && samples > 0 { + chunkSec := float64(samples) / g.sampleRate + g.bs412.UpdateChunkDuration(chunkSec) g.bs412.ProcessChunk(bs412PowerAccum / float64(samples)) } diff --git a/internal/output/frame_queue.go b/internal/output/frame_queue.go index e3db114..0443eec 100644 --- a/internal/output/frame_queue.go +++ b/internal/output/frame_queue.go @@ -80,22 +80,19 @@ func (q *FrameQueue) Capacity() int { } // FillLevel reports the current occupancy as a fraction of capacity. +// Uses len(ch) directly for accuracy: updateDepth() is called after the +// channel operation, so q.depth can lag by one frame transiently. func (q *FrameQueue) FillLevel() float64 { - q.mu.Lock() - depth := q.depth - q.mu.Unlock() if q.capacity == 0 { return 0 } - return float64(depth) / float64(q.capacity) + return float64(len(q.ch)) / float64(q.capacity) } // Depth returns the current number of frames in the queue. +// Uses len(ch) directly for accuracy (see FillLevel). func (q *FrameQueue) Depth() int { - q.mu.Lock() - depth := q.depth - q.mu.Unlock() - return depth + return len(q.ch) } // Stats returns a snapshot of the queue metrics. @@ -104,7 +101,7 @@ func (q *FrameQueue) Stats() QueueStats { fill := q.fillLevelLocked() stats := QueueStats{ Capacity: q.capacity, - Depth: q.depth, + Depth: len(q.ch), FillLevel: fill, Health: queueHealthFromFill(fill), HighWaterMark: q.highWaterMark, @@ -128,11 +125,15 @@ func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error { return ErrFrameQueueClosed } + // BUG-05 fix: increment depth BEFORE the channel send so that Stats() + // never reports fill=0 while a frame is in the channel awaiting receive. + // On context cancellation, undo the increment. + q.updateDepth(+1) select { case q.ch <- frame: - q.updateDepth(+1) return nil case <-ctx.Done(): + q.updateDepth(-1) q.recordPushTimeout() return ctx.Err() } @@ -211,7 +212,9 @@ func (q *FrameQueue) fillLevelLocked() float64 { if q.capacity == 0 { return 0 } - return float64(q.depth) / float64(q.capacity) + // Use len(ch) rather than q.depth: depth is updated after the channel + // operation, so it can be off by one during the Push/Pop window. + return float64(len(q.ch)) / float64(q.capacity) } func (q *FrameQueue) recordPushTimeout() {