package audio import ( "encoding/binary" "fmt" "io" "sync/atomic" ) // StreamSource is a lock-free SPSC (single-producer, single-consumer) ring buffer // for real-time audio streaming. One goroutine writes PCM frames, the DSP // goroutine reads them via NextFrame(). Returns silence on underrun. // // Zero allocations in steady state. No mutex in the read or write path. // // SampleRate is the nominal input sample rate. It may be updated at runtime // via SetSampleRate once the actual decoded rate is known (e.g. when the first // PCM chunk arrives from a compressed stream). Reads and writes to the sample // rate are atomic so they are safe across goroutines. type StreamSource struct { ring []Frame size int mask int // size-1, for fast modulo (size must be power of 2) // SampleRate is kept as a plain int for backward compatibility with code // that reads it before any goroutine races are possible (construction, // logging). All hot-path code uses the atomic below. SampleRate int sampleRateAtomic atomic.Int32 writePos atomic.Int64 readPos atomic.Int64 Underruns atomic.Uint64 Overflows atomic.Uint64 Written atomic.Uint64 highWatermark atomic.Int64 underrunStreak atomic.Uint64 maxUnderrunStreak atomic.Uint64 } // NewStreamSource creates a ring buffer with the given capacity (rounded up // to next power of 2) and input sample rate. func NewStreamSource(capacity, sampleRate int) *StreamSource { // Round up to power of 2 size := 1 for size < capacity { size <<= 1 } s := &StreamSource{ ring: make([]Frame, size), size: size, mask: size - 1, SampleRate: sampleRate, } s.sampleRateAtomic.Store(int32(sampleRate)) return s } // SetSampleRate updates the sample rate atomically. Safe to call from any // goroutine, including while the DSP goroutine is consuming frames via // StreamResampler. The change takes effect on the very next NextFrame() call. // Also updates the public SampleRate field for non-concurrent readers. func (s *StreamSource) SetSampleRate(hz int) { s.SampleRate = hz s.sampleRateAtomic.Store(int32(hz)) } // GetSampleRate returns the current sample rate via atomic load. Use this // in hot paths / cross-goroutine reads instead of .SampleRate directly. func (s *StreamSource) GetSampleRate() int { return int(s.sampleRateAtomic.Load()) } // WriteFrame pushes a single frame into the ring buffer. // Returns false if the buffer is full (overflow). func (s *StreamSource) WriteFrame(f Frame) bool { wp := s.writePos.Load() rp := s.readPos.Load() if wp-rp >= int64(s.size) { s.Overflows.Add(1) return false } s.ring[int(wp)&s.mask] = f s.writePos.Add(1) s.Written.Add(1) s.updateHighWatermark() return true } // WritePCM decodes interleaved S16LE stereo PCM bytes and writes frames // to the ring buffer. Returns the number of frames written. func (s *StreamSource) WritePCM(data []byte) int { frames := len(data) / 4 // 2 channels × 2 bytes per sample written := 0 for i := 0; i < frames; i++ { off := i * 4 l := int16(binary.LittleEndian.Uint16(data[off:])) r := int16(binary.LittleEndian.Uint16(data[off+2:])) f := NewFrame( Sample(float64(l)/32768.0), Sample(float64(r)/32768.0), ) if !s.WriteFrame(f) { break } written++ } return written } // ReadFrame consumes one frame from the ring buffer. // Returns silence (0,0) on underrun. func (s *StreamSource) ReadFrame() Frame { rp := s.readPos.Load() wp := s.writePos.Load() if rp >= wp { s.Underruns.Add(1) s.recordUnderrunStreak() return NewFrame(0, 0) } f := s.ring[int(rp)&s.mask] s.readPos.Add(1) s.resetUnderrunStreak() return f } // NextFrame implements the frameSource interface. func (s *StreamSource) NextFrame() Frame { return s.ReadFrame() } // Available returns the number of frames currently buffered. func (s *StreamSource) Available() int { return int(s.writePos.Load() - s.readPos.Load()) } // Buffered returns the fill ratio (0.0 = empty, 1.0 = full). func (s *StreamSource) Buffered() float64 { return float64(s.Available()) / float64(s.size) } // Stats returns diagnostic counters. func (s *StreamSource) Stats() StreamStats { available := s.Available() buffered := 0.0 if s.size > 0 { buffered = float64(available) / float64(s.size) } highWatermark := int(s.highWatermark.Load()) currentStreak := int(s.underrunStreak.Load()) maxStreak := int(s.maxUnderrunStreak.Load()) return StreamStats{ Available: available, Capacity: s.size, Buffered: buffered, BufferedDurationSeconds: s.bufferedDurationSeconds(available), HighWatermark: highWatermark, HighWatermarkDurationSeconds: s.bufferedDurationSeconds(highWatermark), Written: s.Written.Load(), Underruns: s.Underruns.Load(), Overflows: s.Overflows.Load(), UnderrunStreak: currentStreak, MaxUnderrunStreak: maxStreak, } } // StreamStats exposes runtime telemetry for the stream buffer. type StreamStats struct { Available int `json:"available"` Capacity int `json:"capacity"` Buffered float64 `json:"buffered"` BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"` HighWatermark int `json:"highWatermark"` HighWatermarkDurationSeconds float64 `json:"highWatermarkDurationSeconds"` Written uint64 `json:"written"` Underruns uint64 `json:"underruns"` Overflows uint64 `json:"overflows"` UnderrunStreak int `json:"underrunStreak"` MaxUnderrunStreak int `json:"maxUnderrunStreak"` } func (s *StreamSource) bufferedDurationSeconds(available int) float64 { rate := s.GetSampleRate() if rate <= 0 { return 0 } return float64(available) / float64(rate) } func (s *StreamSource) updateHighWatermark() { available := s.Available() for { prev := s.highWatermark.Load() if int64(available) <= prev { return } if s.highWatermark.CompareAndSwap(prev, int64(available)) { return } } } func (s *StreamSource) recordUnderrunStreak() { current := s.underrunStreak.Add(1) for { prevMax := s.maxUnderrunStreak.Load() if current <= prevMax { return } if s.maxUnderrunStreak.CompareAndSwap(prevMax, current) { return } } } func (s *StreamSource) resetUnderrunStreak() { s.underrunStreak.Store(0) } // --- StreamResampler --- // StreamResampler wraps a StreamSource and rate-converts from the stream's // native sample rate to the target output rate using linear interpolation. // Consumes input frames on demand — no buffering beyond the ring buffer. // // The input rate is read atomically from src on every NextFrame() call so // that a SetSampleRate() from the ingest goroutine takes effect immediately, // without any additional synchronisation. The pos accumulator is not reset // on a rate change: this may produce a single glitch-free transient at the // moment the rate is corrected, which is far preferable to playing the whole // stream at the wrong pitch. type StreamResampler struct { src *StreamSource outputRate float64 // target composite rate, fixed for the lifetime of the resampler pos float64 prev Frame curr Frame } // NewStreamResampler creates a streaming resampler. // outputRate is the fixed DSP composite rate. The input rate is taken from // src.GetSampleRate() dynamically, so it will automatically track any // subsequent SetSampleRate() call. func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler { if src == nil || outputRate <= 0 { return &StreamResampler{src: src, outputRate: outputRate} } return &StreamResampler{ src: src, outputRate: outputRate, } } // NextFrame returns the next interpolated frame at the output rate. // Implements the frameSource interface. // The input/output ratio is recomputed on every call from the atomic sample // rate so that runtime rate corrections via SetSampleRate are race-free. func (r *StreamResampler) NextFrame() Frame { if r.src == nil { return NewFrame(0, 0) } // Compute ratio atomically so we see any SetSampleRate update immediately. ratio := 1.0 if r.outputRate > 0 { if inputRate := r.src.GetSampleRate(); inputRate > 0 { ratio = float64(inputRate) / r.outputRate } } // Consume input samples as the fractional position advances. for r.pos >= 1.0 { r.prev = r.curr r.curr = r.src.ReadFrame() r.pos -= 1.0 } frac := r.pos l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac r.pos += ratio return NewFrame(Sample(l), Sample(ri)) } // --- Ingest helpers --- // IngestReader continuously reads S16LE stereo PCM from an io.Reader into // a StreamSource. Blocks until the reader returns an error or io.EOF. // Designed to run as a goroutine. func IngestReader(r io.Reader, dst *StreamSource) error { buf := make([]byte, 16384) // 4096 frames per read (16KB) for { n, err := r.Read(buf) if n > 0 { dst.WritePCM(buf[:n]) } if err != nil { if err == io.EOF { return nil } return fmt.Errorf("audio ingest: %w", err) } } }