Parcourir la source

runtime: tighten queue, generator, and late-write semantics

main
Jan il y a 1 mois
Parent
révision
1f49bdd144
3 fichiers modifiés avec 34 ajouts et 16 suppressions
  1. +1
    -1
      internal/app/engine.go
  2. +19
    -4
      internal/offline/generator.go
  3. +14
    -11
      internal/output/frame_queue.go

+ 1
- 1
internal/app/engine.go Voir le fichier

@@ -194,7 +194,7 @@ func (e *Engine) SetStreamSource(src *audio.StreamSource) {
} }
resampler := audio.NewStreamResampler(src, compositeRate) resampler := audio.NewStreamResampler(src, compositeRate)
e.generator.SetExternalSource(resampler) e.generator.SetExternalSource(resampler)
log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
log.Printf("engine: live audio stream wired initial %d Hz → %.0f Hz composite (buffer %d frames); actual decoded rate auto-corrects on first chunk",
src.SampleRate, compositeRate, src.Stats().Capacity) src.SampleRate, compositeRate, src.Stats().Capacity)
} }




+ 19
- 4
internal/offline/generator.go Voir le fichier

@@ -120,8 +120,15 @@ func NewGenerator(cfg cfgpkg.Config) *Generator {


// SetExternalSource sets a live audio source (e.g. StreamResampler) that // SetExternalSource sets a live audio source (e.g. StreamResampler) that
// takes priority over WAV/tone sources. Must be called before the first // takes priority over WAV/tone sources. Must be called before the first
// GenerateFrame() call (i.e. before init).
// GenerateFrame() call; calling it after init() has no effect because
// g.source is already wired to the old source.
func (g *Generator) SetExternalSource(src frameSource) { func (g *Generator) SetExternalSource(src frameSource) {
if g.initialized {
// init() already called sourceFor() and wired g.source. Updating
// g.externalSource here would have no effect on the live DSP chain.
// This is a programming error — log loudly rather than silently break.
panic("generator: SetExternalSource called after GenerateFrame; call it before the engine starts")
}
g.externalSource = src g.externalSource = src
} }


@@ -189,12 +196,14 @@ func (g *Generator) init() {
g.mpxNotch19, g.mpxNotch57 = dsp.NewCompositeProtection(g.sampleRate) g.mpxNotch19, g.mpxNotch57 = dsp.NewCompositeProtection(g.sampleRate)
// BS.412 MPX power limiter (EU/CH requirement for licensed FM) // BS.412 MPX power limiter (EU/CH requirement for licensed FM)
if g.cfg.FM.BS412Enabled { if g.cfg.FM.BS412Enabled {
chunkSec := 0.05 // 50ms chunks (matches engine default)
// chunkSec is not known at init time (Engine.chunkDuration may differ).
// Pass 0 here; GenerateFrame computes the actual chunk duration from
// the real sample count and updates BS.412 accordingly.
g.bs412 = dsp.NewBS412Limiter( g.bs412 = dsp.NewBS412Limiter(
g.cfg.FM.BS412ThresholdDBr, g.cfg.FM.BS412ThresholdDBr,
g.cfg.FM.PilotLevel, g.cfg.FM.PilotLevel,
g.cfg.FM.RDSInjection, g.cfg.FM.RDSInjection,
chunkSec,
0,
) )
} }
if g.cfg.FM.FMModulationEnabled { if g.cfg.FM.FMModulationEnabled {
@@ -360,8 +369,14 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame
} }
} }


// BS.412: feed this chunk's average audio power for next chunk's gain calculation
// BS.412: feed this chunk's actual duration and average audio power for
// the next chunk's gain calculation. Using the real sample count avoids
// the error that occurred when chunkSec was hardcoded to 0.05 — any
// SetChunkDuration() call from the engine would silently miscalibrate
// the ITU-R BS.412 power measurement window.
if g.bs412 != nil && samples > 0 { if g.bs412 != nil && samples > 0 {
chunkSec := float64(samples) / g.sampleRate
g.bs412.UpdateChunkDuration(chunkSec)
g.bs412.ProcessChunk(bs412PowerAccum / float64(samples)) g.bs412.ProcessChunk(bs412PowerAccum / float64(samples))
} }




+ 14
- 11
internal/output/frame_queue.go Voir le fichier

@@ -80,22 +80,19 @@ func (q *FrameQueue) Capacity() int {
} }


// FillLevel reports the current occupancy as a fraction of capacity. // FillLevel reports the current occupancy as a fraction of capacity.
// Uses len(ch) directly for accuracy: updateDepth() is called after the
// channel operation, so q.depth can lag by one frame transiently.
func (q *FrameQueue) FillLevel() float64 { func (q *FrameQueue) FillLevel() float64 {
q.mu.Lock()
depth := q.depth
q.mu.Unlock()
if q.capacity == 0 { if q.capacity == 0 {
return 0 return 0
} }
return float64(depth) / float64(q.capacity)
return float64(len(q.ch)) / float64(q.capacity)
} }


// Depth returns the current number of frames in the queue. // Depth returns the current number of frames in the queue.
// Uses len(ch) directly for accuracy (see FillLevel).
func (q *FrameQueue) Depth() int { func (q *FrameQueue) Depth() int {
q.mu.Lock()
depth := q.depth
q.mu.Unlock()
return depth
return len(q.ch)
} }


// Stats returns a snapshot of the queue metrics. // Stats returns a snapshot of the queue metrics.
@@ -104,7 +101,7 @@ func (q *FrameQueue) Stats() QueueStats {
fill := q.fillLevelLocked() fill := q.fillLevelLocked()
stats := QueueStats{ stats := QueueStats{
Capacity: q.capacity, Capacity: q.capacity,
Depth: q.depth,
Depth: len(q.ch),
FillLevel: fill, FillLevel: fill,
Health: queueHealthFromFill(fill), Health: queueHealthFromFill(fill),
HighWaterMark: q.highWaterMark, HighWaterMark: q.highWaterMark,
@@ -128,11 +125,15 @@ func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error {
return ErrFrameQueueClosed return ErrFrameQueueClosed
} }


// BUG-05 fix: increment depth BEFORE the channel send so that Stats()
// never reports fill=0 while a frame is in the channel awaiting receive.
// On context cancellation, undo the increment.
q.updateDepth(+1)
select { select {
case q.ch <- frame: case q.ch <- frame:
q.updateDepth(+1)
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
q.updateDepth(-1)
q.recordPushTimeout() q.recordPushTimeout()
return ctx.Err() return ctx.Err()
} }
@@ -211,7 +212,9 @@ func (q *FrameQueue) fillLevelLocked() float64 {
if q.capacity == 0 { if q.capacity == 0 {
return 0 return 0
} }
return float64(q.depth) / float64(q.capacity)
// Use len(ch) rather than q.depth: depth is updated after the channel
// operation, so it can be off by one during the Push/Pop window.
return float64(len(q.ch)) / float64(q.capacity)
} }


func (q *FrameQueue) recordPushTimeout() { func (q *FrameQueue) recordPushTimeout() {


Chargement…
Annuler
Enregistrer