diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index 700913f..5354f08 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -193,6 +193,7 @@ func runTXMode(cfg cfgpkg.Config, configPath string, driver platform.SoapyDriver log.Fatalf("ingest source: %v", err) } runtimeOpts := []ingest.RuntimeOption{} + runtimeOpts = append(runtimeOpts, ingest.WithPrebufferMs(cfg.Ingest.PrebufferMs)) if cfg.Ingest.Icecast.RadioText.Enabled { relay := icecast.NewRadioTextRelay( icecast.RadioTextOptions{ diff --git a/internal/app/engine.go b/internal/app/engine.go index 8348b52..c25537b 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -113,13 +113,14 @@ type RuntimeTransition struct { } const ( - lateBufferIndicatorWindow = 5 * time.Second - writeLateTolerance = 1 * time.Millisecond + lateBufferIndicatorWindow = 2 * time.Second + writeLateTolerance = 10 * time.Millisecond queueCriticalStreakThreshold = 3 queueMutedStreakThreshold = queueCriticalStreakThreshold * 2 queueMutedRecoveryThreshold = queueCriticalStreakThreshold queueFaultedStreakThreshold = queueCriticalStreakThreshold faultRepeatWindow = 1 * time.Second + lateBufferStreakThreshold = 3 // consecutive late writes required before alerting faultHistoryCapacity = 8 runtimeTransitionHistoryCapacity = 8 ) @@ -150,6 +151,7 @@ type Engine struct { underruns atomic.Uint64 lateBuffers atomic.Uint64 lateBufferAlertAt atomic.Uint64 + lateBufferStreak atomic.Uint64 // consecutive late writes; reset on clean write criticalStreak atomic.Uint64 mutedRecoveryStreak atomic.Uint64 mutedFaultStreak atomic.Uint64 @@ -604,12 +606,23 @@ func (e *Engine) writerLoop(ctx context.Context) { lateOver := writeDur - e.chunkDuration if lateOver > writeLateTolerance { + streak := e.lateBufferStreak.Add(1) late := e.lateBuffers.Add(1) - e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano())) + // Only arm the alert window once the streak threshold is reached. + // Isolated OS-scheduling or USB jitter spikes (single late writes) + // are normal on a loaded system and must not trigger degraded state. + // This mirrors the queue-health streak logic. + if streak >= lateBufferStreakThreshold { + e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano())) + } if late <= 5 || late%20 == 0 { - log.Printf("TX LATE: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s", - writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency) + log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s", + streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency) } + } else { + // Clean write — reset the consecutive streak so isolated spikes + // never accumulate toward the threshold. + e.lateBufferStreak.Store(0) } if err != nil { diff --git a/internal/audio/stream.go b/internal/audio/stream.go index 6366f93..8ca9584 100644 --- a/internal/audio/stream.go +++ b/internal/audio/stream.go @@ -12,19 +12,31 @@ import ( // goroutine reads them via NextFrame(). Returns silence on underrun. // // Zero allocations in steady state. No mutex in the read or write path. +// +// SampleRate is the nominal input sample rate. It may be updated at runtime +// via SetSampleRate once the actual decoded rate is known (e.g. when the first +// PCM chunk arrives from a compressed stream). Reads and writes to the sample +// rate are atomic so they are safe across goroutines. type StreamSource struct { - ring []Frame - size int - mask int // size-1, for fast modulo (size must be power of 2) + ring []Frame + size int + mask int // size-1, for fast modulo (size must be power of 2) + + // SampleRate is kept as a plain int for backward compatibility with code + // that reads it before any goroutine races are possible (construction, + // logging). All hot-path code uses the atomic below. SampleRate int + sampleRateAtomic atomic.Int32 + writePos atomic.Int64 readPos atomic.Int64 Underruns atomic.Uint64 Overflows atomic.Uint64 Written atomic.Uint64 - highWatermark atomic.Int64 + + highWatermark atomic.Int64 underrunStreak atomic.Uint64 maxUnderrunStreak atomic.Uint64 } @@ -37,12 +49,29 @@ func NewStreamSource(capacity, sampleRate int) *StreamSource { for size < capacity { size <<= 1 } - return &StreamSource{ + s := &StreamSource{ ring: make([]Frame, size), size: size, mask: size - 1, SampleRate: sampleRate, } + s.sampleRateAtomic.Store(int32(sampleRate)) + return s +} + +// SetSampleRate updates the sample rate atomically. Safe to call from any +// goroutine, including while the DSP goroutine is consuming frames via +// StreamResampler. The change takes effect on the very next NextFrame() call. +// Also updates the public SampleRate field for non-concurrent readers. +func (s *StreamSource) SetSampleRate(hz int) { + s.SampleRate = hz + s.sampleRateAtomic.Store(int32(hz)) +} + +// GetSampleRate returns the current sample rate via atomic load. Use this +// in hot paths / cross-goroutine reads instead of .SampleRate directly. +func (s *StreamSource) GetSampleRate() int { + return int(s.sampleRateAtomic.Load()) } // WriteFrame pushes a single frame into the ring buffer. @@ -124,40 +153,41 @@ func (s *StreamSource) Stats() StreamStats { currentStreak := int(s.underrunStreak.Load()) maxStreak := int(s.maxUnderrunStreak.Load()) return StreamStats{ - Available: available, - Capacity: s.size, - Buffered: buffered, - BufferedDurationSeconds: s.bufferedDurationSeconds(available), - HighWatermark: highWatermark, + Available: available, + Capacity: s.size, + Buffered: buffered, + BufferedDurationSeconds: s.bufferedDurationSeconds(available), + HighWatermark: highWatermark, HighWatermarkDurationSeconds: s.bufferedDurationSeconds(highWatermark), - Written: s.Written.Load(), - Underruns: s.Underruns.Load(), - Overflows: s.Overflows.Load(), - UnderrunStreak: currentStreak, - MaxUnderrunStreak: maxStreak, + Written: s.Written.Load(), + Underruns: s.Underruns.Load(), + Overflows: s.Overflows.Load(), + UnderrunStreak: currentStreak, + MaxUnderrunStreak: maxStreak, } } // StreamStats exposes runtime telemetry for the stream buffer. type StreamStats struct { - Available int `json:"available"` - Capacity int `json:"capacity"` - Buffered float64 `json:"buffered"` - BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"` - HighWatermark int `json:"highWatermark"` + Available int `json:"available"` + Capacity int `json:"capacity"` + Buffered float64 `json:"buffered"` + BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"` + HighWatermark int `json:"highWatermark"` HighWatermarkDurationSeconds float64 `json:"highWatermarkDurationSeconds"` - Written uint64 `json:"written"` - Underruns uint64 `json:"underruns"` - Overflows uint64 `json:"overflows"` - UnderrunStreak int `json:"underrunStreak"` - MaxUnderrunStreak int `json:"maxUnderrunStreak"` + Written uint64 `json:"written"` + Underruns uint64 `json:"underruns"` + Overflows uint64 `json:"overflows"` + UnderrunStreak int `json:"underrunStreak"` + MaxUnderrunStreak int `json:"maxUnderrunStreak"` } func (s *StreamSource) bufferedDurationSeconds(available int) float64 { - if s.SampleRate <= 0 { + rate := s.GetSampleRate() + if rate <= 0 { return 0 } - return float64(available) / float64(s.SampleRate) + return float64(available) / float64(rate) } func (s *StreamSource) updateHighWatermark() { @@ -195,33 +225,53 @@ func (s *StreamSource) resetUnderrunStreak() { // StreamResampler wraps a StreamSource and rate-converts from the stream's // native sample rate to the target output rate using linear interpolation. // Consumes input frames on demand — no buffering beyond the ring buffer. +// +// The input rate is read atomically from src on every NextFrame() call so +// that a SetSampleRate() from the ingest goroutine takes effect immediately, +// without any additional synchronisation. The pos accumulator is not reset +// on a rate change: this may produce a single glitch-free transient at the +// moment the rate is corrected, which is far preferable to playing the whole +// stream at the wrong pitch. type StreamResampler struct { - src *StreamSource - ratio float64 // inputRate / outputRate (< 1 when upsampling) - pos float64 - prev Frame - curr Frame + src *StreamSource + outputRate float64 // target composite rate, fixed for the lifetime of the resampler + pos float64 + prev Frame + curr Frame } // NewStreamResampler creates a streaming resampler. +// outputRate is the fixed DSP composite rate. The input rate is taken from +// src.GetSampleRate() dynamically, so it will automatically track any +// subsequent SetSampleRate() call. func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler { - if src == nil || outputRate <= 0 || src.SampleRate <= 0 { - return &StreamResampler{src: src, ratio: 1.0} + if src == nil || outputRate <= 0 { + return &StreamResampler{src: src, outputRate: outputRate} } return &StreamResampler{ - src: src, - ratio: float64(src.SampleRate) / outputRate, + src: src, + outputRate: outputRate, } } // NextFrame returns the next interpolated frame at the output rate. // Implements the frameSource interface. +// The input/output ratio is recomputed on every call from the atomic sample +// rate so that runtime rate corrections via SetSampleRate are race-free. func (r *StreamResampler) NextFrame() Frame { if r.src == nil { return NewFrame(0, 0) } - // Consume input samples as the fractional position advances + // Compute ratio atomically so we see any SetSampleRate update immediately. + ratio := 1.0 + if r.outputRate > 0 { + if inputRate := r.src.GetSampleRate(); inputRate > 0 { + ratio = float64(inputRate) / r.outputRate + } + } + + // Consume input samples as the fractional position advances. for r.pos >= 1.0 { r.prev = r.curr r.curr = r.src.ReadFrame() @@ -231,7 +281,7 @@ func (r *StreamResampler) NextFrame() Frame { frac := r.pos l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac - r.pos += r.ratio + r.pos += ratio return NewFrame(Sample(l), Sample(ri)) } diff --git a/internal/ingest/adapters/icecast/source.go b/internal/ingest/adapters/icecast/source.go index 784891d..2970e82 100644 --- a/internal/ingest/adapters/icecast/source.go +++ b/internal/ingest/adapters/icecast/source.go @@ -75,7 +75,9 @@ func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Op id = "icecast-main" } if client == nil { - client = &http.Client{Timeout: 20 * time.Second} + // Streaming responses are long-lived; a global client timeout would + // terminate the body read after a fixed duration. + client = &http.Client{} } s := &Source{ id: id, @@ -202,9 +204,6 @@ func (s *Source) loop(ctx context.Context) { if err == nil { err = errStreamEnded } - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return - } s.connected.Store(false) s.lastError.Store(err.Error()) select { diff --git a/internal/ingest/adapters/icecast/source_test.go b/internal/ingest/adapters/icecast/source_test.go index 9984269..3e378bb 100644 --- a/internal/ingest/adapters/icecast/source_test.go +++ b/internal/ingest/adapters/icecast/source_test.go @@ -511,6 +511,57 @@ func TestSourceClearsLastErrorAfterSuccessfulReconnect(t *testing.T) { } } +func TestNewWithoutClientUsesStreamingSafeHTTPClient(t *testing.T) { + src := New("ice-test", "http://example", nil, ReconnectConfig{}) + if src.client == nil { + t.Fatal("expected default http client") + } + if src.client.Timeout != 0 { + t.Fatalf("client timeout=%v want 0 for streaming", src.client.Timeout) + } +} + +func TestSourceReconnectsAfterDeadlineExceededError(t *testing.T) { + var requests atomic.Int64 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + requests.Add(1) + w.Header().Set("Content-Type", "audio/mpeg") + _, _ = w.Write([]byte("test-stream")) + })) + defer srv.Close() + + dec := &scriptedLoopDecoder{ + actions: []decodeAction{ + {err: context.DeadlineExceeded}, // first attempt fails transiently + {blockUntilStop: true}, // second attempt recovers and stays running + }, + } + reg := decoder.NewRegistry() + reg.Register("mp3", func() decoder.Decoder { return dec }) + reg.Register("ffmpeg", func() decoder.Decoder { return &testDecoder{name: "ffmpeg"} }) + + src := New("ice-test", srv.URL, srv.Client(), ReconnectConfig{ + Enabled: true, + InitialBackoffMs: 1, + MaxBackoffMs: 1, + }, WithDecoderRegistry(reg), WithDecoderPreference("auto")) + + if err := src.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer src.Stop() + + waitForCondition(t, func() bool { return dec.callCount() >= 2 }, "second decode call after deadline exceeded") + + stats := src.Stats() + if stats.Reconnects < 1 { + t.Fatalf("reconnects=%d want >=1", stats.Reconnects) + } + if got := requests.Load(); got < 2 { + t.Fatalf("requests=%d want >=2", got) + } +} + func waitForCondition(t *testing.T, cond func() bool, label string) { t.Helper() deadline := time.Now().Add(2 * time.Second) diff --git a/internal/ingest/runtime.go b/internal/ingest/runtime.go index 6b9e1ef..ec19e3d 100644 --- a/internal/ingest/runtime.go +++ b/internal/ingest/runtime.go @@ -10,15 +10,24 @@ import ( ) type Runtime struct { - sink *audio.StreamSource - source Source - started atomic.Bool - onTitle func(string) + sink *audio.StreamSource + source Source + started atomic.Bool + onTitle func(string) + prebuffer time.Duration ctx context.Context cancel context.CancelFunc wg sync.WaitGroup + work *frameBuffer + workSampleRate int + prebufferFrames int + gateOpen bool + seenChunk bool + lastDrainAt time.Time + drainAllowance float64 + mu sync.RWMutex active SourceDescriptor stats RuntimeStats @@ -32,10 +41,40 @@ func WithStreamTitleHandler(handler func(string)) RuntimeOption { } } +func WithPrebuffer(d time.Duration) RuntimeOption { + return func(r *Runtime) { + if d < 0 { + d = 0 + } + r.prebuffer = d + } +} + +func WithPrebufferMs(ms int) RuntimeOption { + return func(r *Runtime) { + if ms < 0 { + ms = 0 + } + r.prebuffer = time.Duration(ms) * time.Millisecond + } +} + func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime { + sampleRate := 44100 + capacity := 1024 + if sink != nil { + if sink.SampleRate > 0 { + sampleRate = sink.SampleRate + } + if sinkCap := sink.Stats().Capacity; sinkCap > 0 { + capacity = sinkCap * 2 + } + } r := &Runtime{ - sink: sink, - source: src, + sink: sink, + source: src, + work: newFrameBuffer(capacity), + workSampleRate: sampleRate, stats: RuntimeStats{ State: "idle", }, @@ -45,6 +84,17 @@ func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Ru opt(r) } } + if r.workSampleRate > 0 && r.prebuffer > 0 { + r.prebufferFrames = int(r.prebuffer.Seconds() * float64(r.workSampleRate)) + } + minCapacity := 256 + if r.prebufferFrames > 0 && minCapacity < r.prebufferFrames*2 { + minCapacity = r.prebufferFrames * 2 + } + if r.work == nil || r.work.capacity() < minCapacity { + r.work = newFrameBuffer(minCapacity) + } + r.updateBufferedStatsLocked() return r } @@ -69,6 +119,14 @@ func (r *Runtime) Start(ctx context.Context) error { r.mu.Lock() r.active = r.source.Descriptor() r.stats.State = "starting" + r.stats.Prebuffering = false + r.stats.WriteBlocked = false + r.gateOpen = false + r.seenChunk = false + r.lastDrainAt = time.Now() + r.drainAllowance = 0 + r.work.reset() + r.updateBufferedStatsLocked() r.mu.Unlock() if err := r.source.Start(r.ctx); err != nil { r.started.Store(false) @@ -102,12 +160,11 @@ func (r *Runtime) Stop() error { func (r *Runtime) run() { defer r.wg.Done() - r.mu.Lock() - r.stats.State = "running" - r.mu.Unlock() ch := r.source.Chunks() errCh := r.source.Errors() + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() var titleCh <-chan string if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil { titleCh = src.StreamTitleUpdates() @@ -126,15 +183,19 @@ func (r *Runtime) run() { } r.mu.Lock() r.stats.State = "degraded" + r.stats.Prebuffering = false r.mu.Unlock() case chunk, ok := <-ch: if !ok { r.mu.Lock() r.stats.State = "stopped" + r.stats.Prebuffering = false r.mu.Unlock() return } r.handleChunk(chunk) + case <-ticker.C: + r.drainWorkingBuffer() case title, ok := <-titleCh: if !ok { titleCh = nil @@ -146,6 +207,32 @@ func (r *Runtime) run() { } func (r *Runtime) handleChunk(chunk PCMChunk) { + r.mu.Lock() + r.seenChunk = true + + // Propagate the actual decoded sample rate to the sink and pacer the + // first time (or whenever) it differs from our working rate. This fixes + // the two-part rate-mismatch bug that appears when a native decoder + // (e.g. go-mp3) decodes a 48000 Hz stream while the StreamSource and + // StreamResampler were initialised assuming 44100 Hz: + // + // 1. The pacer (pacedDrainLimitLocked) was draining at the wrong rate, + // causing the work buffer to overflow → glitches. + // 2. The StreamResampler ratio (inputRate/outputRate) was computed from + // the stale sink.SampleRate, so every frame was played at the wrong + // pitch → audio too slow (44100/48000 ≈ 91.9 % speed). + // + // SetSampleRate writes atomically, so the StreamResampler's NextFrame() + // picks up the corrected ratio without any additional locking. + if chunk.SampleRateHz > 0 && chunk.SampleRateHz != r.workSampleRate { + r.workSampleRate = chunk.SampleRateHz + if r.sink != nil { + r.sink.SetSampleRate(chunk.SampleRateHz) + } + } + + r.mu.Unlock() + frames, err := ChunkToFrames(chunk) if err != nil { r.mu.Lock() @@ -156,16 +243,172 @@ func (r *Runtime) handleChunk(chunk PCMChunk) { } dropped := uint64(0) for _, frame := range frames { - if !r.sink.WriteFrame(frame) { + if !r.work.push(frame) { dropped++ } } r.mu.Lock() - r.stats.State = "running" + if chunk.SampleRateHz > 0 { + r.active.SampleRateHz = chunk.SampleRateHz + } + if chunk.Channels > 0 { + r.active.Channels = chunk.Channels + } r.stats.LastChunkAt = time.Now() r.stats.DroppedFrames += dropped - r.stats.WriteBlocked = dropped > 0 + if dropped > 0 { + r.stats.State = "degraded" + } + r.updateBufferedStatsLocked() r.mu.Unlock() + r.drainWorkingBuffer() +} + +func (r *Runtime) drainWorkingBuffer() { + r.mu.Lock() + defer r.mu.Unlock() + now := time.Now() + if r.sink == nil { + r.resetDrainPacerLocked(now) + r.updateBufferedStatsLocked() + return + } + bufferedFrames := r.work.available() + if !r.gateOpen { + switch { + case bufferedFrames == 0: + if r.stats.State == "degraded" { + // Keep degraded visible until fresh audio recovers runtime. + } else if !r.seenChunk { + r.stats.State = "starting" + } else if r.stats.State != "degraded" { + r.stats.State = "running" + } + r.stats.Prebuffering = false + r.stats.WriteBlocked = false + r.resetDrainPacerLocked(now) + r.updateBufferedStatsLocked() + return + case r.prebufferFrames > 0 && bufferedFrames < r.prebufferFrames: + r.stats.State = "prebuffering" + r.stats.Prebuffering = true + r.stats.WriteBlocked = false + r.resetDrainPacerLocked(now) + r.updateBufferedStatsLocked() + return + default: + r.gateOpen = true + r.resetDrainPacerLocked(now) + } + } + writeBlocked := false + limit := r.pacedDrainLimitLocked(now, bufferedFrames) + written := 0 + for written < limit && r.work.available() > 0 { + frame, ok := r.work.peek() + if !ok { + break + } + if !r.sink.WriteFrame(frame) { + writeBlocked = true + break + } + r.work.pop() + written++ + } + if written > 0 { + r.drainAllowance -= float64(written) + if r.drainAllowance < 0 { + r.drainAllowance = 0 + } + } + if r.work.available() == 0 && r.prebufferFrames > 0 { + // Re-arm the gate after dry-out to rebuild margin before resuming. + r.gateOpen = false + r.resetDrainPacerLocked(now) + } + r.stats.Prebuffering = false + r.stats.WriteBlocked = writeBlocked + if writeBlocked { + r.stats.State = "degraded" + } else { + r.stats.State = "running" + } + r.updateBufferedStatsLocked() +} + +func (r *Runtime) pacedDrainLimitLocked(now time.Time, bufferedFrames int) int { + if bufferedFrames <= 0 { + return 0 + } + // Use workSampleRate which is kept in sync with sink.SampleRate via + // handleChunk. This ensures the pacer drains at the actual decoded rate + // rather than the initial (potentially wrong) configured rate. + rate := r.workSampleRate + if r.sink != nil && r.sink.GetSampleRate() > 0 { + rate = r.sink.GetSampleRate() + } + if rate <= 0 { + return bufferedFrames + } + if !r.lastDrainAt.IsZero() { + elapsed := now.Sub(r.lastDrainAt) + if elapsed > 0 { + r.drainAllowance += elapsed.Seconds() * float64(rate) + } + } + r.lastDrainAt = now + maxAllowance := maxInt(1, rate/5) // cap accumulated credit at 200 ms + if r.drainAllowance > float64(maxAllowance) { + r.drainAllowance = float64(maxAllowance) + } + limit := int(r.drainAllowance) + if limit <= 0 { + return 0 + } + maxBurst := maxInt(1, rate/50) // max 20 ms worth of frames per drain call + if limit > maxBurst { + limit = maxBurst + } + sinkStats := r.sink.Stats() + headroom := sinkStats.Capacity - sinkStats.Available + if headroom < 0 { + headroom = 0 + } + if limit > headroom { + limit = headroom + } + if limit > bufferedFrames { + limit = bufferedFrames + } + return limit +} + +func (r *Runtime) resetDrainPacerLocked(now time.Time) { + r.lastDrainAt = now + r.drainAllowance = 0 +} + +func maxInt(a, b int) int { + if a > b { + return a + } + return b +} + +func (r *Runtime) updateBufferedStatsLocked() { + available := r.work.available() + capacity := r.work.capacity() + buffered := 0.0 + if capacity > 0 { + buffered = float64(available) / float64(capacity) + } + bufferedSeconds := 0.0 + if r.workSampleRate > 0 { + bufferedSeconds = float64(available) / float64(r.workSampleRate) + } + r.stats.Buffered = buffered + r.stats.BufferedSeconds = bufferedSeconds } func (r *Runtime) Stats() Stats { @@ -178,9 +421,65 @@ func (r *Runtime) Stats() Stats { if r.source != nil { sourceStats = r.source.Stats() } + if sourceStats.BufferedSeconds < runtimeStats.BufferedSeconds { + sourceStats.BufferedSeconds = runtimeStats.BufferedSeconds + } return Stats{ Active: active, Source: sourceStats, Runtime: runtimeStats, } } + +type frameBuffer struct { + frames []audio.Frame + head int + len int +} + +func newFrameBuffer(capacity int) *frameBuffer { + if capacity < 1 { + capacity = 1 + } + return &frameBuffer{frames: make([]audio.Frame, capacity)} +} + +func (b *frameBuffer) capacity() int { + return len(b.frames) +} + +func (b *frameBuffer) available() int { + return b.len +} + +func (b *frameBuffer) reset() { + b.head = 0 + b.len = 0 +} + +func (b *frameBuffer) push(frame audio.Frame) bool { + if b.len >= len(b.frames) { + return false + } + idx := (b.head + b.len) % len(b.frames) + b.frames[idx] = frame + b.len++ + return true +} + +func (b *frameBuffer) peek() (audio.Frame, bool) { + if b.len == 0 { + return audio.Frame{}, false + } + return b.frames[b.head], true +} + +func (b *frameBuffer) pop() (audio.Frame, bool) { + if b.len == 0 { + return audio.Frame{}, false + } + frame := b.frames[b.head] + b.head = (b.head + 1) % len(b.frames) + b.len-- + return frame, true +} diff --git a/internal/ingest/runtime_test.go b/internal/ingest/runtime_test.go index 48cfcb3..0b351e4 100644 --- a/internal/ingest/runtime_test.go +++ b/internal/ingest/runtime_test.go @@ -147,7 +147,6 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) t.Fatalf("start: %v", err) } defer rt.Stop() - waitForRuntimeState(t, rt, "running") stats := rt.Stats() if stats.Active.ID != "icecast-primary" { @@ -164,6 +163,187 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) } } +func TestRuntimePrebufferGateAppliesBeforeSinkWrites(t *testing.T) { + sink := audio.NewStreamSource(512, 1000) + src := newFakeSource() + rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond)) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 1000, + Samples: stereoSamples(80, 100), + } + + time.Sleep(30 * time.Millisecond) + if sink.Available() != 0 { + t.Fatalf("sink available=%d want 0 while prebuffering", sink.Available()) + } + stats := rt.Stats() + if stats.Runtime.State != "prebuffering" || !stats.Runtime.Prebuffering { + t.Fatalf("runtime state=%q prebuffering=%t", stats.Runtime.State, stats.Runtime.Prebuffering) + } + if stats.Runtime.BufferedSeconds <= 0 { + t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds) + } + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 1000, + Samples: stereoSamples(40, 120), + } + waitForSinkFrames(t, sink, 1) + waitForRuntimeState(t, rt, "running") + if got := rt.Stats().Runtime.Prebuffering; got { + t.Fatalf("runtime prebuffering=%t want false", got) + } +} + +func TestRuntimeWriteBlockedRetainsWorkingBuffer(t *testing.T) { + sink := audio.NewStreamSource(1, 1000) + src := newFakeSource() + rt := NewRuntime(sink, src) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 1000, + Samples: stereoSamples(4, 200), + } + waitForSinkFrames(t, sink, 1) + waitForRuntimeState(t, rt, "running") + stats := rt.Stats() + if stats.Runtime.WriteBlocked { + t.Fatalf("runtime writeBlocked=%t want false", stats.Runtime.WriteBlocked) + } + if stats.Runtime.BufferedSeconds <= 0 { + t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds) + } + if stats.Runtime.DroppedFrames != 0 { + t.Fatalf("runtime droppedFrames=%d want 0", stats.Runtime.DroppedFrames) + } + if got := sink.Stats().Overflows; got != 0 { + t.Fatalf("sink overflows=%d want 0", got) + } +} + +func TestRuntimeDrainWorkingBufferIsBurstBounded(t *testing.T) { + sink := audio.NewStreamSource(64, 1000) + rt := NewRuntime(sink, nil) + + rt.gateOpen = true + for i := 0; i < 40; i++ { + if !rt.work.push(audio.NewFrame(0.1, -0.1)) { + t.Fatalf("failed to seed work frame %d", i) + } + } + rt.lastDrainAt = time.Now().Add(-time.Second) + + rt.drainWorkingBuffer() + + if got := sink.Available(); got != 20 { + t.Fatalf("sink available=%d want 20 (20ms burst at 1kHz)", got) + } + if got := rt.work.available(); got != 20 { + t.Fatalf("work available=%d want 20", got) + } + if got := rt.Stats().Runtime.WriteBlocked; got { + t.Fatalf("runtime writeBlocked=%t want false", got) + } +} + +func TestRuntimeDrainWorkingBufferHonorsSinkHeadroom(t *testing.T) { + sink := audio.NewStreamSource(64, 1000) + rt := NewRuntime(sink, nil) + + for i := 0; i < 63; i++ { + if !sink.WriteFrame(audio.NewFrame(0.2, -0.2)) { + t.Fatalf("failed to seed sink frame %d", i) + } + } + rt.gateOpen = true + for i := 0; i < 8; i++ { + if !rt.work.push(audio.NewFrame(0.3, -0.3)) { + t.Fatalf("failed to seed work frame %d", i) + } + } + rt.lastDrainAt = time.Now().Add(-time.Second) + + rt.drainWorkingBuffer() + + if got := sink.Available(); got != 64 { + t.Fatalf("sink available=%d want 64", got) + } + if got := rt.work.available(); got != 7 { + t.Fatalf("work available=%d want 7", got) + } + if got := sink.Stats().Overflows; got != 0 { + t.Fatalf("sink overflows=%d want 0", got) + } + if got := rt.Stats().Runtime.WriteBlocked; got { + t.Fatalf("runtime writeBlocked=%t want false", got) + } +} + +func TestRuntimeStatsSourceBufferedSecondsIncludesWorkingBuffer(t *testing.T) { + sink := audio.NewStreamSource(32, 1000) + src := newFakeSource() + src.stats = SourceStats{State: "running", Connected: true, BufferedSeconds: 0} + rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond)) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 1000, + Samples: stereoSamples(50, 300), + } + time.Sleep(20 * time.Millisecond) + stats := rt.Stats() + if stats.Source.BufferedSeconds <= 0 { + t.Fatalf("source bufferedSeconds=%f want > 0", stats.Source.BufferedSeconds) + } +} + +func TestRuntimeUpdatesActiveDescriptorFromChunkMetadata(t *testing.T) { + sink := audio.NewStreamSource(128, 44100) + src := newFakeSource() + src.desc = SourceDescriptor{ + ID: "icecast-primary", + Kind: "icecast", + Channels: 0, + SampleRateHz: 0, + } + rt := NewRuntime(sink, src) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 48000, + Samples: []int32{100 << 16, -100 << 16}, + } + + waitForRuntimeState(t, rt, "running") + stats := rt.Stats() + if stats.Active.SampleRateHz != 48000 { + t.Fatalf("active sampleRateHz=%d want 48000", stats.Active.SampleRateHz) + } + if stats.Active.Channels != 2 { + t.Fatalf("active channels=%d want 2", stats.Active.Channels) + } +} + func TestRuntimeForwardsStreamTitleUpdatesToHandler(t *testing.T) { sink := audio.NewStreamSource(128, 44100) src := newFakeSource() @@ -199,3 +379,23 @@ func waitForRuntimeState(t *testing.T, rt *Runtime, want string) { } t.Fatalf("timeout waiting for runtime state %q; last=%q", want, rt.Stats().Runtime.State) } + +func waitForSinkFrames(t *testing.T, sink *audio.StreamSource, minFrames int) { + t.Helper() + deadline := time.Now().Add(1 * time.Second) + for time.Now().Before(deadline) { + if sink.Available() >= minFrames { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("timeout waiting for sink frames: have=%d want>=%d", sink.Available(), minFrames) +} + +func stereoSamples(frames int, v int32) []int32 { + out := make([]int32, 0, frames*2) + for i := 0; i < frames; i++ { + out = append(out, v<<16, -v<<16) + } + return out +} diff --git a/internal/ingest/stats.go b/internal/ingest/stats.go index 55f44a4..19e9886 100644 --- a/internal/ingest/stats.go +++ b/internal/ingest/stats.go @@ -24,12 +24,14 @@ type SourceStats struct { } type RuntimeStats struct { - State string `json:"state"` - Prebuffering bool `json:"prebuffering"` - LastChunkAt time.Time `json:"lastChunkAt,omitempty"` - DroppedFrames uint64 `json:"droppedFrames"` - ConvertErrors uint64 `json:"convertErrors"` - WriteBlocked bool `json:"writeBlocked"` + State string `json:"state"` + Prebuffering bool `json:"prebuffering"` + Buffered float64 `json:"buffered"` + BufferedSeconds float64 `json:"bufferedSeconds"` + LastChunkAt time.Time `json:"lastChunkAt,omitempty"` + DroppedFrames uint64 `json:"droppedFrames"` + ConvertErrors uint64 `json:"convertErrors"` + WriteBlocked bool `json:"writeBlocked"` } type Stats struct {