diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index 700913f..5354f08 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -193,6 +193,7 @@ func runTXMode(cfg cfgpkg.Config, configPath string, driver platform.SoapyDriver log.Fatalf("ingest source: %v", err) } runtimeOpts := []ingest.RuntimeOption{} + runtimeOpts = append(runtimeOpts, ingest.WithPrebufferMs(cfg.Ingest.PrebufferMs)) if cfg.Ingest.Icecast.RadioText.Enabled { relay := icecast.NewRadioTextRelay( icecast.RadioTextOptions{ diff --git a/internal/ingest/runtime.go b/internal/ingest/runtime.go index 46f45d0..5c6167d 100644 --- a/internal/ingest/runtime.go +++ b/internal/ingest/runtime.go @@ -25,6 +25,8 @@ type Runtime struct { prebufferFrames int gateOpen bool seenChunk bool + lastDrainAt time.Time + drainAllowance float64 mu sync.RWMutex active SourceDescriptor @@ -121,6 +123,8 @@ func (r *Runtime) Start(ctx context.Context) error { r.stats.WriteBlocked = false r.gateOpen = false r.seenChunk = false + r.lastDrainAt = time.Now() + r.drainAllowance = 0 r.work.reset() r.updateBufferedStatsLocked() r.mu.Unlock() @@ -241,7 +245,9 @@ func (r *Runtime) handleChunk(chunk PCMChunk) { func (r *Runtime) drainWorkingBuffer() { r.mu.Lock() defer r.mu.Unlock() + now := time.Now() if r.sink == nil { + r.resetDrainPacerLocked(now) r.updateBufferedStatsLocked() return } @@ -258,20 +264,25 @@ func (r *Runtime) drainWorkingBuffer() { } r.stats.Prebuffering = false r.stats.WriteBlocked = false + r.resetDrainPacerLocked(now) r.updateBufferedStatsLocked() return case r.prebufferFrames > 0 && bufferedFrames < r.prebufferFrames: r.stats.State = "prebuffering" r.stats.Prebuffering = true r.stats.WriteBlocked = false + r.resetDrainPacerLocked(now) r.updateBufferedStatsLocked() return default: r.gateOpen = true + r.resetDrainPacerLocked(now) } } writeBlocked := false - for r.work.available() > 0 { + limit := r.pacedDrainLimitLocked(now, bufferedFrames) + written := 0 + for written < limit && r.work.available() > 0 { frame, ok := r.work.peek() if !ok { break @@ -281,10 +292,18 @@ func (r *Runtime) drainWorkingBuffer() { break } r.work.pop() + written++ + } + if written > 0 { + r.drainAllowance -= float64(written) + if r.drainAllowance < 0 { + r.drainAllowance = 0 + } } if r.work.available() == 0 && r.prebufferFrames > 0 { // Re-arm the gate after dry-out to rebuild margin before resuming. r.gateOpen = false + r.resetDrainPacerLocked(now) } r.stats.Prebuffering = false r.stats.WriteBlocked = writeBlocked @@ -296,6 +315,62 @@ func (r *Runtime) drainWorkingBuffer() { r.updateBufferedStatsLocked() } +func (r *Runtime) pacedDrainLimitLocked(now time.Time, bufferedFrames int) int { + if bufferedFrames <= 0 { + return 0 + } + rate := r.workSampleRate + if r.sink != nil && r.sink.SampleRate > 0 { + rate = r.sink.SampleRate + } + if rate <= 0 { + return bufferedFrames + } + if !r.lastDrainAt.IsZero() { + elapsed := now.Sub(r.lastDrainAt) + if elapsed > 0 { + r.drainAllowance += elapsed.Seconds() * float64(rate) + } + } + r.lastDrainAt = now + maxAllowance := maxInt(1, rate/5) // cap accumulated credit at 200 ms + if r.drainAllowance > float64(maxAllowance) { + r.drainAllowance = float64(maxAllowance) + } + limit := int(r.drainAllowance) + if limit <= 0 { + return 0 + } + maxBurst := maxInt(1, rate/50) // max 20 ms worth of frames per drain call + if limit > maxBurst { + limit = maxBurst + } + sinkStats := r.sink.Stats() + headroom := sinkStats.Capacity - sinkStats.Available + if headroom < 0 { + headroom = 0 + } + if limit > headroom { + limit = headroom + } + if limit > bufferedFrames { + limit = bufferedFrames + } + return limit +} + +func (r *Runtime) resetDrainPacerLocked(now time.Time) { + r.lastDrainAt = now + r.drainAllowance = 0 +} + +func maxInt(a, b int) int { + if a > b { + return a + } + return b +} + func (r *Runtime) updateBufferedStatsLocked() { available := r.work.available() capacity := r.work.capacity() diff --git a/internal/ingest/runtime_test.go b/internal/ingest/runtime_test.go index b8f65a9..0b351e4 100644 --- a/internal/ingest/runtime_test.go +++ b/internal/ingest/runtime_test.go @@ -216,10 +216,11 @@ func TestRuntimeWriteBlockedRetainsWorkingBuffer(t *testing.T) { SampleRateHz: 1000, Samples: stereoSamples(4, 200), } - waitForRuntimeState(t, rt, "degraded") + waitForSinkFrames(t, sink, 1) + waitForRuntimeState(t, rt, "running") stats := rt.Stats() - if !stats.Runtime.WriteBlocked { - t.Fatalf("runtime writeBlocked=%t want true", stats.Runtime.WriteBlocked) + if stats.Runtime.WriteBlocked { + t.Fatalf("runtime writeBlocked=%t want false", stats.Runtime.WriteBlocked) } if stats.Runtime.BufferedSeconds <= 0 { t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds) @@ -227,6 +228,67 @@ func TestRuntimeWriteBlockedRetainsWorkingBuffer(t *testing.T) { if stats.Runtime.DroppedFrames != 0 { t.Fatalf("runtime droppedFrames=%d want 0", stats.Runtime.DroppedFrames) } + if got := sink.Stats().Overflows; got != 0 { + t.Fatalf("sink overflows=%d want 0", got) + } +} + +func TestRuntimeDrainWorkingBufferIsBurstBounded(t *testing.T) { + sink := audio.NewStreamSource(64, 1000) + rt := NewRuntime(sink, nil) + + rt.gateOpen = true + for i := 0; i < 40; i++ { + if !rt.work.push(audio.NewFrame(0.1, -0.1)) { + t.Fatalf("failed to seed work frame %d", i) + } + } + rt.lastDrainAt = time.Now().Add(-time.Second) + + rt.drainWorkingBuffer() + + if got := sink.Available(); got != 20 { + t.Fatalf("sink available=%d want 20 (20ms burst at 1kHz)", got) + } + if got := rt.work.available(); got != 20 { + t.Fatalf("work available=%d want 20", got) + } + if got := rt.Stats().Runtime.WriteBlocked; got { + t.Fatalf("runtime writeBlocked=%t want false", got) + } +} + +func TestRuntimeDrainWorkingBufferHonorsSinkHeadroom(t *testing.T) { + sink := audio.NewStreamSource(64, 1000) + rt := NewRuntime(sink, nil) + + for i := 0; i < 63; i++ { + if !sink.WriteFrame(audio.NewFrame(0.2, -0.2)) { + t.Fatalf("failed to seed sink frame %d", i) + } + } + rt.gateOpen = true + for i := 0; i < 8; i++ { + if !rt.work.push(audio.NewFrame(0.3, -0.3)) { + t.Fatalf("failed to seed work frame %d", i) + } + } + rt.lastDrainAt = time.Now().Add(-time.Second) + + rt.drainWorkingBuffer() + + if got := sink.Available(); got != 64 { + t.Fatalf("sink available=%d want 64", got) + } + if got := rt.work.available(); got != 7 { + t.Fatalf("work available=%d want 7", got) + } + if got := sink.Stats().Overflows; got != 0 { + t.Fatalf("sink overflows=%d want 0", got) + } + if got := rt.Stats().Runtime.WriteBlocked; got { + t.Fatalf("runtime writeBlocked=%t want false", got) + } } func TestRuntimeStatsSourceBufferedSecondsIncludesWorkingBuffer(t *testing.T) {