diff --git a/internal/audio/stream.go b/internal/audio/stream.go index 6366f93..8ca9584 100644 --- a/internal/audio/stream.go +++ b/internal/audio/stream.go @@ -12,19 +12,31 @@ import ( // goroutine reads them via NextFrame(). Returns silence on underrun. // // Zero allocations in steady state. No mutex in the read or write path. +// +// SampleRate is the nominal input sample rate. It may be updated at runtime +// via SetSampleRate once the actual decoded rate is known (e.g. when the first +// PCM chunk arrives from a compressed stream). Reads and writes to the sample +// rate are atomic so they are safe across goroutines. type StreamSource struct { - ring []Frame - size int - mask int // size-1, for fast modulo (size must be power of 2) + ring []Frame + size int + mask int // size-1, for fast modulo (size must be power of 2) + + // SampleRate is kept as a plain int for backward compatibility with code + // that reads it before any goroutine races are possible (construction, + // logging). All hot-path code uses the atomic below. SampleRate int + sampleRateAtomic atomic.Int32 + writePos atomic.Int64 readPos atomic.Int64 Underruns atomic.Uint64 Overflows atomic.Uint64 Written atomic.Uint64 - highWatermark atomic.Int64 + + highWatermark atomic.Int64 underrunStreak atomic.Uint64 maxUnderrunStreak atomic.Uint64 } @@ -37,12 +49,29 @@ func NewStreamSource(capacity, sampleRate int) *StreamSource { for size < capacity { size <<= 1 } - return &StreamSource{ + s := &StreamSource{ ring: make([]Frame, size), size: size, mask: size - 1, SampleRate: sampleRate, } + s.sampleRateAtomic.Store(int32(sampleRate)) + return s +} + +// SetSampleRate updates the sample rate atomically. Safe to call from any +// goroutine, including while the DSP goroutine is consuming frames via +// StreamResampler. The change takes effect on the very next NextFrame() call. +// Also updates the public SampleRate field for non-concurrent readers. +func (s *StreamSource) SetSampleRate(hz int) { + s.SampleRate = hz + s.sampleRateAtomic.Store(int32(hz)) +} + +// GetSampleRate returns the current sample rate via atomic load. Use this +// in hot paths / cross-goroutine reads instead of .SampleRate directly. +func (s *StreamSource) GetSampleRate() int { + return int(s.sampleRateAtomic.Load()) } // WriteFrame pushes a single frame into the ring buffer. @@ -124,40 +153,41 @@ func (s *StreamSource) Stats() StreamStats { currentStreak := int(s.underrunStreak.Load()) maxStreak := int(s.maxUnderrunStreak.Load()) return StreamStats{ - Available: available, - Capacity: s.size, - Buffered: buffered, - BufferedDurationSeconds: s.bufferedDurationSeconds(available), - HighWatermark: highWatermark, + Available: available, + Capacity: s.size, + Buffered: buffered, + BufferedDurationSeconds: s.bufferedDurationSeconds(available), + HighWatermark: highWatermark, HighWatermarkDurationSeconds: s.bufferedDurationSeconds(highWatermark), - Written: s.Written.Load(), - Underruns: s.Underruns.Load(), - Overflows: s.Overflows.Load(), - UnderrunStreak: currentStreak, - MaxUnderrunStreak: maxStreak, + Written: s.Written.Load(), + Underruns: s.Underruns.Load(), + Overflows: s.Overflows.Load(), + UnderrunStreak: currentStreak, + MaxUnderrunStreak: maxStreak, } } // StreamStats exposes runtime telemetry for the stream buffer. type StreamStats struct { - Available int `json:"available"` - Capacity int `json:"capacity"` - Buffered float64 `json:"buffered"` - BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"` - HighWatermark int `json:"highWatermark"` + Available int `json:"available"` + Capacity int `json:"capacity"` + Buffered float64 `json:"buffered"` + BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"` + HighWatermark int `json:"highWatermark"` HighWatermarkDurationSeconds float64 `json:"highWatermarkDurationSeconds"` - Written uint64 `json:"written"` - Underruns uint64 `json:"underruns"` - Overflows uint64 `json:"overflows"` - UnderrunStreak int `json:"underrunStreak"` - MaxUnderrunStreak int `json:"maxUnderrunStreak"` + Written uint64 `json:"written"` + Underruns uint64 `json:"underruns"` + Overflows uint64 `json:"overflows"` + UnderrunStreak int `json:"underrunStreak"` + MaxUnderrunStreak int `json:"maxUnderrunStreak"` } func (s *StreamSource) bufferedDurationSeconds(available int) float64 { - if s.SampleRate <= 0 { + rate := s.GetSampleRate() + if rate <= 0 { return 0 } - return float64(available) / float64(s.SampleRate) + return float64(available) / float64(rate) } func (s *StreamSource) updateHighWatermark() { @@ -195,33 +225,53 @@ func (s *StreamSource) resetUnderrunStreak() { // StreamResampler wraps a StreamSource and rate-converts from the stream's // native sample rate to the target output rate using linear interpolation. // Consumes input frames on demand — no buffering beyond the ring buffer. +// +// The input rate is read atomically from src on every NextFrame() call so +// that a SetSampleRate() from the ingest goroutine takes effect immediately, +// without any additional synchronisation. The pos accumulator is not reset +// on a rate change: this may produce a single glitch-free transient at the +// moment the rate is corrected, which is far preferable to playing the whole +// stream at the wrong pitch. type StreamResampler struct { - src *StreamSource - ratio float64 // inputRate / outputRate (< 1 when upsampling) - pos float64 - prev Frame - curr Frame + src *StreamSource + outputRate float64 // target composite rate, fixed for the lifetime of the resampler + pos float64 + prev Frame + curr Frame } // NewStreamResampler creates a streaming resampler. +// outputRate is the fixed DSP composite rate. The input rate is taken from +// src.GetSampleRate() dynamically, so it will automatically track any +// subsequent SetSampleRate() call. func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler { - if src == nil || outputRate <= 0 || src.SampleRate <= 0 { - return &StreamResampler{src: src, ratio: 1.0} + if src == nil || outputRate <= 0 { + return &StreamResampler{src: src, outputRate: outputRate} } return &StreamResampler{ - src: src, - ratio: float64(src.SampleRate) / outputRate, + src: src, + outputRate: outputRate, } } // NextFrame returns the next interpolated frame at the output rate. // Implements the frameSource interface. +// The input/output ratio is recomputed on every call from the atomic sample +// rate so that runtime rate corrections via SetSampleRate are race-free. func (r *StreamResampler) NextFrame() Frame { if r.src == nil { return NewFrame(0, 0) } - // Consume input samples as the fractional position advances + // Compute ratio atomically so we see any SetSampleRate update immediately. + ratio := 1.0 + if r.outputRate > 0 { + if inputRate := r.src.GetSampleRate(); inputRate > 0 { + ratio = float64(inputRate) / r.outputRate + } + } + + // Consume input samples as the fractional position advances. for r.pos >= 1.0 { r.prev = r.curr r.curr = r.src.ReadFrame() @@ -231,7 +281,7 @@ func (r *StreamResampler) NextFrame() Frame { frac := r.pos l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac - r.pos += r.ratio + r.pos += ratio return NewFrame(Sample(l), Sample(ri)) } diff --git a/internal/ingest/runtime.go b/internal/ingest/runtime.go index 5c6167d..ec19e3d 100644 --- a/internal/ingest/runtime.go +++ b/internal/ingest/runtime.go @@ -209,6 +209,28 @@ func (r *Runtime) run() { func (r *Runtime) handleChunk(chunk PCMChunk) { r.mu.Lock() r.seenChunk = true + + // Propagate the actual decoded sample rate to the sink and pacer the + // first time (or whenever) it differs from our working rate. This fixes + // the two-part rate-mismatch bug that appears when a native decoder + // (e.g. go-mp3) decodes a 48000 Hz stream while the StreamSource and + // StreamResampler were initialised assuming 44100 Hz: + // + // 1. The pacer (pacedDrainLimitLocked) was draining at the wrong rate, + // causing the work buffer to overflow → glitches. + // 2. The StreamResampler ratio (inputRate/outputRate) was computed from + // the stale sink.SampleRate, so every frame was played at the wrong + // pitch → audio too slow (44100/48000 ≈ 91.9 % speed). + // + // SetSampleRate writes atomically, so the StreamResampler's NextFrame() + // picks up the corrected ratio without any additional locking. + if chunk.SampleRateHz > 0 && chunk.SampleRateHz != r.workSampleRate { + r.workSampleRate = chunk.SampleRateHz + if r.sink != nil { + r.sink.SetSampleRate(chunk.SampleRateHz) + } + } + r.mu.Unlock() frames, err := ChunkToFrames(chunk) @@ -319,9 +341,12 @@ func (r *Runtime) pacedDrainLimitLocked(now time.Time, bufferedFrames int) int { if bufferedFrames <= 0 { return 0 } + // Use workSampleRate which is kept in sync with sink.SampleRate via + // handleChunk. This ensures the pacer drains at the actual decoded rate + // rather than the initial (potentially wrong) configured rate. rate := r.workSampleRate - if r.sink != nil && r.sink.SampleRate > 0 { - rate = r.sink.SampleRate + if r.sink != nil && r.sink.GetSampleRate() > 0 { + rate = r.sink.GetSampleRate() } if rate <= 0 { return bufferedFrames