From 33b9640ef02134fc3ac823d564e93f42d45d9fd3 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 21:06:42 +0200 Subject: [PATCH] ingest: add runtime working buffer and prebuffer gate --- internal/ingest/runtime.go | 217 ++++++++++++++++++++++++++++++-- internal/ingest/runtime_test.go | 109 +++++++++++++++- internal/ingest/stats.go | 14 ++- 3 files changed, 321 insertions(+), 19 deletions(-) diff --git a/internal/ingest/runtime.go b/internal/ingest/runtime.go index a6741b8..46f45d0 100644 --- a/internal/ingest/runtime.go +++ b/internal/ingest/runtime.go @@ -10,15 +10,22 @@ import ( ) type Runtime struct { - sink *audio.StreamSource - source Source - started atomic.Bool - onTitle func(string) + sink *audio.StreamSource + source Source + started atomic.Bool + onTitle func(string) + prebuffer time.Duration ctx context.Context cancel context.CancelFunc wg sync.WaitGroup + work *frameBuffer + workSampleRate int + prebufferFrames int + gateOpen bool + seenChunk bool + mu sync.RWMutex active SourceDescriptor stats RuntimeStats @@ -32,10 +39,40 @@ func WithStreamTitleHandler(handler func(string)) RuntimeOption { } } +func WithPrebuffer(d time.Duration) RuntimeOption { + return func(r *Runtime) { + if d < 0 { + d = 0 + } + r.prebuffer = d + } +} + +func WithPrebufferMs(ms int) RuntimeOption { + return func(r *Runtime) { + if ms < 0 { + ms = 0 + } + r.prebuffer = time.Duration(ms) * time.Millisecond + } +} + func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime { + sampleRate := 44100 + capacity := 1024 + if sink != nil { + if sink.SampleRate > 0 { + sampleRate = sink.SampleRate + } + if sinkCap := sink.Stats().Capacity; sinkCap > 0 { + capacity = sinkCap * 2 + } + } r := &Runtime{ - sink: sink, - source: src, + sink: sink, + source: src, + work: newFrameBuffer(capacity), + workSampleRate: sampleRate, stats: RuntimeStats{ State: "idle", }, @@ -45,6 +82,17 @@ func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Ru opt(r) } } + if r.workSampleRate > 0 && r.prebuffer > 0 { + r.prebufferFrames = int(r.prebuffer.Seconds() * float64(r.workSampleRate)) + } + minCapacity := 256 + if r.prebufferFrames > 0 && minCapacity < r.prebufferFrames*2 { + minCapacity = r.prebufferFrames * 2 + } + if r.work == nil || r.work.capacity() < minCapacity { + r.work = newFrameBuffer(minCapacity) + } + r.updateBufferedStatsLocked() return r } @@ -69,6 +117,12 @@ func (r *Runtime) Start(ctx context.Context) error { r.mu.Lock() r.active = r.source.Descriptor() r.stats.State = "starting" + r.stats.Prebuffering = false + r.stats.WriteBlocked = false + r.gateOpen = false + r.seenChunk = false + r.work.reset() + r.updateBufferedStatsLocked() r.mu.Unlock() if err := r.source.Start(r.ctx); err != nil { r.started.Store(false) @@ -102,12 +156,11 @@ func (r *Runtime) Stop() error { func (r *Runtime) run() { defer r.wg.Done() - r.mu.Lock() - r.stats.State = "running" - r.mu.Unlock() ch := r.source.Chunks() errCh := r.source.Errors() + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() var titleCh <-chan string if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil { titleCh = src.StreamTitleUpdates() @@ -126,15 +179,19 @@ func (r *Runtime) run() { } r.mu.Lock() r.stats.State = "degraded" + r.stats.Prebuffering = false r.mu.Unlock() case chunk, ok := <-ch: if !ok { r.mu.Lock() r.stats.State = "stopped" + r.stats.Prebuffering = false r.mu.Unlock() return } r.handleChunk(chunk) + case <-ticker.C: + r.drainWorkingBuffer() case title, ok := <-titleCh: if !ok { titleCh = nil @@ -146,6 +203,10 @@ func (r *Runtime) run() { } func (r *Runtime) handleChunk(chunk PCMChunk) { + r.mu.Lock() + r.seenChunk = true + r.mu.Unlock() + frames, err := ChunkToFrames(chunk) if err != nil { r.mu.Lock() @@ -156,7 +217,7 @@ func (r *Runtime) handleChunk(chunk PCMChunk) { } dropped := uint64(0) for _, frame := range frames { - if !r.sink.WriteFrame(frame) { + if !r.work.push(frame) { dropped++ } } @@ -167,11 +228,87 @@ func (r *Runtime) handleChunk(chunk PCMChunk) { if chunk.Channels > 0 { r.active.Channels = chunk.Channels } - r.stats.State = "running" r.stats.LastChunkAt = time.Now() r.stats.DroppedFrames += dropped - r.stats.WriteBlocked = dropped > 0 + if dropped > 0 { + r.stats.State = "degraded" + } + r.updateBufferedStatsLocked() r.mu.Unlock() + r.drainWorkingBuffer() +} + +func (r *Runtime) drainWorkingBuffer() { + r.mu.Lock() + defer r.mu.Unlock() + if r.sink == nil { + r.updateBufferedStatsLocked() + return + } + bufferedFrames := r.work.available() + if !r.gateOpen { + switch { + case bufferedFrames == 0: + if r.stats.State == "degraded" { + // Keep degraded visible until fresh audio recovers runtime. + } else if !r.seenChunk { + r.stats.State = "starting" + } else if r.stats.State != "degraded" { + r.stats.State = "running" + } + r.stats.Prebuffering = false + r.stats.WriteBlocked = false + r.updateBufferedStatsLocked() + return + case r.prebufferFrames > 0 && bufferedFrames < r.prebufferFrames: + r.stats.State = "prebuffering" + r.stats.Prebuffering = true + r.stats.WriteBlocked = false + r.updateBufferedStatsLocked() + return + default: + r.gateOpen = true + } + } + writeBlocked := false + for r.work.available() > 0 { + frame, ok := r.work.peek() + if !ok { + break + } + if !r.sink.WriteFrame(frame) { + writeBlocked = true + break + } + r.work.pop() + } + if r.work.available() == 0 && r.prebufferFrames > 0 { + // Re-arm the gate after dry-out to rebuild margin before resuming. + r.gateOpen = false + } + r.stats.Prebuffering = false + r.stats.WriteBlocked = writeBlocked + if writeBlocked { + r.stats.State = "degraded" + } else { + r.stats.State = "running" + } + r.updateBufferedStatsLocked() +} + +func (r *Runtime) updateBufferedStatsLocked() { + available := r.work.available() + capacity := r.work.capacity() + buffered := 0.0 + if capacity > 0 { + buffered = float64(available) / float64(capacity) + } + bufferedSeconds := 0.0 + if r.workSampleRate > 0 { + bufferedSeconds = float64(available) / float64(r.workSampleRate) + } + r.stats.Buffered = buffered + r.stats.BufferedSeconds = bufferedSeconds } func (r *Runtime) Stats() Stats { @@ -184,9 +321,65 @@ func (r *Runtime) Stats() Stats { if r.source != nil { sourceStats = r.source.Stats() } + if sourceStats.BufferedSeconds < runtimeStats.BufferedSeconds { + sourceStats.BufferedSeconds = runtimeStats.BufferedSeconds + } return Stats{ Active: active, Source: sourceStats, Runtime: runtimeStats, } } + +type frameBuffer struct { + frames []audio.Frame + head int + len int +} + +func newFrameBuffer(capacity int) *frameBuffer { + if capacity < 1 { + capacity = 1 + } + return &frameBuffer{frames: make([]audio.Frame, capacity)} +} + +func (b *frameBuffer) capacity() int { + return len(b.frames) +} + +func (b *frameBuffer) available() int { + return b.len +} + +func (b *frameBuffer) reset() { + b.head = 0 + b.len = 0 +} + +func (b *frameBuffer) push(frame audio.Frame) bool { + if b.len >= len(b.frames) { + return false + } + idx := (b.head + b.len) % len(b.frames) + b.frames[idx] = frame + b.len++ + return true +} + +func (b *frameBuffer) peek() (audio.Frame, bool) { + if b.len == 0 { + return audio.Frame{}, false + } + return b.frames[b.head], true +} + +func (b *frameBuffer) pop() (audio.Frame, bool) { + if b.len == 0 { + return audio.Frame{}, false + } + frame := b.frames[b.head] + b.head = (b.head + 1) % len(b.frames) + b.len-- + return frame, true +} diff --git a/internal/ingest/runtime_test.go b/internal/ingest/runtime_test.go index 1fc7c36..b8f65a9 100644 --- a/internal/ingest/runtime_test.go +++ b/internal/ingest/runtime_test.go @@ -147,7 +147,6 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) t.Fatalf("start: %v", err) } defer rt.Stop() - waitForRuntimeState(t, rt, "running") stats := rt.Stats() if stats.Active.ID != "icecast-primary" { @@ -164,6 +163,94 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) } } +func TestRuntimePrebufferGateAppliesBeforeSinkWrites(t *testing.T) { + sink := audio.NewStreamSource(512, 1000) + src := newFakeSource() + rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond)) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 1000, + Samples: stereoSamples(80, 100), + } + + time.Sleep(30 * time.Millisecond) + if sink.Available() != 0 { + t.Fatalf("sink available=%d want 0 while prebuffering", sink.Available()) + } + stats := rt.Stats() + if stats.Runtime.State != "prebuffering" || !stats.Runtime.Prebuffering { + t.Fatalf("runtime state=%q prebuffering=%t", stats.Runtime.State, stats.Runtime.Prebuffering) + } + if stats.Runtime.BufferedSeconds <= 0 { + t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds) + } + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 1000, + Samples: stereoSamples(40, 120), + } + waitForSinkFrames(t, sink, 1) + waitForRuntimeState(t, rt, "running") + if got := rt.Stats().Runtime.Prebuffering; got { + t.Fatalf("runtime prebuffering=%t want false", got) + } +} + +func TestRuntimeWriteBlockedRetainsWorkingBuffer(t *testing.T) { + sink := audio.NewStreamSource(1, 1000) + src := newFakeSource() + rt := NewRuntime(sink, src) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 1000, + Samples: stereoSamples(4, 200), + } + waitForRuntimeState(t, rt, "degraded") + stats := rt.Stats() + if !stats.Runtime.WriteBlocked { + t.Fatalf("runtime writeBlocked=%t want true", stats.Runtime.WriteBlocked) + } + if stats.Runtime.BufferedSeconds <= 0 { + t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds) + } + if stats.Runtime.DroppedFrames != 0 { + t.Fatalf("runtime droppedFrames=%d want 0", stats.Runtime.DroppedFrames) + } +} + +func TestRuntimeStatsSourceBufferedSecondsIncludesWorkingBuffer(t *testing.T) { + sink := audio.NewStreamSource(32, 1000) + src := newFakeSource() + src.stats = SourceStats{State: "running", Connected: true, BufferedSeconds: 0} + rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond)) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 1000, + Samples: stereoSamples(50, 300), + } + time.Sleep(20 * time.Millisecond) + stats := rt.Stats() + if stats.Source.BufferedSeconds <= 0 { + t.Fatalf("source bufferedSeconds=%f want > 0", stats.Source.BufferedSeconds) + } +} + func TestRuntimeUpdatesActiveDescriptorFromChunkMetadata(t *testing.T) { sink := audio.NewStreamSource(128, 44100) src := newFakeSource() @@ -230,3 +317,23 @@ func waitForRuntimeState(t *testing.T, rt *Runtime, want string) { } t.Fatalf("timeout waiting for runtime state %q; last=%q", want, rt.Stats().Runtime.State) } + +func waitForSinkFrames(t *testing.T, sink *audio.StreamSource, minFrames int) { + t.Helper() + deadline := time.Now().Add(1 * time.Second) + for time.Now().Before(deadline) { + if sink.Available() >= minFrames { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("timeout waiting for sink frames: have=%d want>=%d", sink.Available(), minFrames) +} + +func stereoSamples(frames int, v int32) []int32 { + out := make([]int32, 0, frames*2) + for i := 0; i < frames; i++ { + out = append(out, v<<16, -v<<16) + } + return out +} diff --git a/internal/ingest/stats.go b/internal/ingest/stats.go index 55f44a4..19e9886 100644 --- a/internal/ingest/stats.go +++ b/internal/ingest/stats.go @@ -24,12 +24,14 @@ type SourceStats struct { } type RuntimeStats struct { - State string `json:"state"` - Prebuffering bool `json:"prebuffering"` - LastChunkAt time.Time `json:"lastChunkAt,omitempty"` - DroppedFrames uint64 `json:"droppedFrames"` - ConvertErrors uint64 `json:"convertErrors"` - WriteBlocked bool `json:"writeBlocked"` + State string `json:"state"` + Prebuffering bool `json:"prebuffering"` + Buffered float64 `json:"buffered"` + BufferedSeconds float64 `json:"bufferedSeconds"` + LastChunkAt time.Time `json:"lastChunkAt,omitempty"` + DroppedFrames uint64 `json:"droppedFrames"` + ConvertErrors uint64 `json:"convertErrors"` + WriteBlocked bool `json:"writeBlocked"` } type Stats struct {