|
- package audio
-
- import (
- "encoding/binary"
- "fmt"
- "io"
- "sync/atomic"
- )
-
- // StreamSource is a lock-free SPSC (single-producer, single-consumer) ring buffer
- // for real-time audio streaming. One goroutine writes PCM frames, the DSP
- // goroutine reads them via NextFrame(). Returns silence on underrun.
- //
- // Zero allocations in steady state. No mutex in the read or write path.
- //
- // SampleRate is the nominal input sample rate. It may be updated at runtime
- // via SetSampleRate once the actual decoded rate is known (e.g. when the first
- // PCM chunk arrives from a compressed stream). Reads and writes to the sample
- // rate are atomic so they are safe across goroutines.
- type StreamSource struct {
- ring []Frame
- size int
- mask int // size-1, for fast modulo (size must be power of 2)
-
- // SampleRate is kept as a plain int for backward compatibility with code
- // that reads it before any goroutine races are possible (construction,
- // logging). All hot-path code uses the atomic below.
- SampleRate int
-
- sampleRateAtomic atomic.Int32
-
- writePos atomic.Int64
- readPos atomic.Int64
-
- Underruns atomic.Uint64
- Overflows atomic.Uint64
- Written atomic.Uint64
-
- highWatermark atomic.Int64
- underrunStreak atomic.Uint64
- maxUnderrunStreak atomic.Uint64
- }
-
- // NewStreamSource creates a ring buffer with the given capacity (rounded up
- // to next power of 2) and input sample rate.
- func NewStreamSource(capacity, sampleRate int) *StreamSource {
- // Round up to power of 2
- size := 1
- for size < capacity {
- size <<= 1
- }
- s := &StreamSource{
- ring: make([]Frame, size),
- size: size,
- mask: size - 1,
- SampleRate: sampleRate,
- }
- s.sampleRateAtomic.Store(int32(sampleRate))
- return s
- }
-
- // SetSampleRate updates the sample rate atomically. Safe to call from any
- // goroutine, including while the DSP goroutine is consuming frames via
- // StreamResampler. The change takes effect on the very next NextFrame() call.
- // Also updates the public SampleRate field for non-concurrent readers.
- func (s *StreamSource) SetSampleRate(hz int) {
- s.SampleRate = hz
- s.sampleRateAtomic.Store(int32(hz))
- }
-
- // GetSampleRate returns the current sample rate via atomic load. Use this
- // in hot paths / cross-goroutine reads instead of .SampleRate directly.
- func (s *StreamSource) GetSampleRate() int {
- return int(s.sampleRateAtomic.Load())
- }
-
- // WriteFrame pushes a single frame into the ring buffer.
- // Returns false if the buffer is full (overflow).
- func (s *StreamSource) WriteFrame(f Frame) bool {
- wp := s.writePos.Load()
- rp := s.readPos.Load()
- if wp-rp >= int64(s.size) {
- s.Overflows.Add(1)
- return false
- }
- s.ring[int(wp)&s.mask] = f
- s.writePos.Add(1)
- s.Written.Add(1)
- s.updateHighWatermark()
- return true
- }
-
- // WritePCM decodes interleaved S16LE stereo PCM bytes and writes frames
- // to the ring buffer. Returns the number of frames written.
- func (s *StreamSource) WritePCM(data []byte) int {
- frames := len(data) / 4 // 2 channels × 2 bytes per sample
- written := 0
- for i := 0; i < frames; i++ {
- off := i * 4
- l := int16(binary.LittleEndian.Uint16(data[off:]))
- r := int16(binary.LittleEndian.Uint16(data[off+2:]))
- f := NewFrame(
- Sample(float64(l)/32768.0),
- Sample(float64(r)/32768.0),
- )
- if !s.WriteFrame(f) {
- break
- }
- written++
- }
- return written
- }
-
- // ReadFrame consumes one frame from the ring buffer.
- // Returns silence (0,0) on underrun.
- func (s *StreamSource) ReadFrame() Frame {
- rp := s.readPos.Load()
- wp := s.writePos.Load()
- if rp >= wp {
- s.Underruns.Add(1)
- s.recordUnderrunStreak()
- return NewFrame(0, 0)
- }
- f := s.ring[int(rp)&s.mask]
- s.readPos.Add(1)
- s.resetUnderrunStreak()
- return f
- }
-
- // NextFrame implements the frameSource interface.
- func (s *StreamSource) NextFrame() Frame {
- return s.ReadFrame()
- }
-
- // Available returns the number of frames currently buffered.
- func (s *StreamSource) Available() int {
- return int(s.writePos.Load() - s.readPos.Load())
- }
-
- // Buffered returns the fill ratio (0.0 = empty, 1.0 = full).
- func (s *StreamSource) Buffered() float64 {
- return float64(s.Available()) / float64(s.size)
- }
-
- // Stats returns diagnostic counters.
- func (s *StreamSource) Stats() StreamStats {
- available := s.Available()
- buffered := 0.0
- if s.size > 0 {
- buffered = float64(available) / float64(s.size)
- }
- highWatermark := int(s.highWatermark.Load())
- currentStreak := int(s.underrunStreak.Load())
- maxStreak := int(s.maxUnderrunStreak.Load())
- return StreamStats{
- Available: available,
- Capacity: s.size,
- Buffered: buffered,
- BufferedDurationSeconds: s.bufferedDurationSeconds(available),
- HighWatermark: highWatermark,
- HighWatermarkDurationSeconds: s.bufferedDurationSeconds(highWatermark),
- Written: s.Written.Load(),
- Underruns: s.Underruns.Load(),
- Overflows: s.Overflows.Load(),
- UnderrunStreak: currentStreak,
- MaxUnderrunStreak: maxStreak,
- }
- }
-
// StreamStats is the runtime telemetry of a StreamSource as produced by
// Stats. All fields are serialised to JSON under the names given in their
// tags.
type StreamStats struct {
	// Available is the number of frames buffered when the snapshot was taken.
	Available int `json:"available"`

	// Capacity is the ring size in frames.
	Capacity int `json:"capacity"`

	// Buffered is Available divided by Capacity (0 when Capacity is 0).
	Buffered float64 `json:"buffered"`

	// BufferedDurationSeconds is Available expressed in seconds at the
	// current sample rate.
	BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"`

	// HighWatermark is the largest fill level recorded after a write.
	HighWatermark int `json:"highWatermark"`

	// HighWatermarkDurationSeconds is HighWatermark expressed in seconds
	// at the current sample rate.
	HighWatermarkDurationSeconds float64 `json:"highWatermarkDurationSeconds"`

	// Written is the total number of frames accepted into the buffer.
	Written uint64 `json:"written"`

	// Underruns is the total number of reads that found the buffer empty.
	Underruns uint64 `json:"underruns"`

	// Overflows is the total number of writes rejected because the buffer
	// was full.
	Overflows uint64 `json:"overflows"`

	// UnderrunStreak is the number of consecutive underruns since the last
	// successful read.
	UnderrunStreak int `json:"underrunStreak"`

	// MaxUnderrunStreak is the longest underrun streak recorded.
	MaxUnderrunStreak int `json:"maxUnderrunStreak"`
}
-
- func (s *StreamSource) bufferedDurationSeconds(available int) float64 {
- rate := s.GetSampleRate()
- if rate <= 0 {
- return 0
- }
- return float64(available) / float64(rate)
- }
-
- func (s *StreamSource) updateHighWatermark() {
- available := s.Available()
- for {
- prev := s.highWatermark.Load()
- if int64(available) <= prev {
- return
- }
- if s.highWatermark.CompareAndSwap(prev, int64(available)) {
- return
- }
- }
- }
-
- func (s *StreamSource) recordUnderrunStreak() {
- current := s.underrunStreak.Add(1)
- for {
- prevMax := s.maxUnderrunStreak.Load()
- if current <= prevMax {
- return
- }
- if s.maxUnderrunStreak.CompareAndSwap(prevMax, current) {
- return
- }
- }
- }
-
- func (s *StreamSource) resetUnderrunStreak() {
- s.underrunStreak.Store(0)
- }
-
- // --- StreamResampler ---
-
- // StreamResampler wraps a StreamSource and rate-converts from the stream's
- // native sample rate to the target output rate using linear interpolation.
- // Consumes input frames on demand — no buffering beyond the ring buffer.
- //
- // The input rate is read atomically from src on every NextFrame() call so
- // that a SetSampleRate() from the ingest goroutine takes effect immediately,
- // without any additional synchronisation. The pos accumulator is not reset
- // on a rate change: this may produce a single glitch-free transient at the
- // moment the rate is corrected, which is far preferable to playing the whole
- // stream at the wrong pitch.
- type StreamResampler struct {
- src *StreamSource
- outputRate float64 // target composite rate, fixed for the lifetime of the resampler
- pos float64
- prev Frame
- curr Frame
- }
-
- // NewStreamResampler creates a streaming resampler.
- // outputRate is the fixed DSP composite rate. The input rate is taken from
- // src.GetSampleRate() dynamically, so it will automatically track any
- // subsequent SetSampleRate() call.
- func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler {
- if src == nil || outputRate <= 0 {
- return &StreamResampler{src: src, outputRate: outputRate}
- }
- return &StreamResampler{
- src: src,
- outputRate: outputRate,
- }
- }
-
- // NextFrame returns the next interpolated frame at the output rate.
- // Implements the frameSource interface.
- // The input/output ratio is recomputed on every call from the atomic sample
- // rate so that runtime rate corrections via SetSampleRate are race-free.
- func (r *StreamResampler) NextFrame() Frame {
- if r.src == nil {
- return NewFrame(0, 0)
- }
-
- // Compute ratio atomically so we see any SetSampleRate update immediately.
- ratio := 1.0
- if r.outputRate > 0 {
- if inputRate := r.src.GetSampleRate(); inputRate > 0 {
- ratio = float64(inputRate) / r.outputRate
- }
- }
-
- // Consume input samples as the fractional position advances.
- for r.pos >= 1.0 {
- r.prev = r.curr
- r.curr = r.src.ReadFrame()
- r.pos -= 1.0
- }
-
- frac := r.pos
- l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac
- ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac
- r.pos += ratio
- return NewFrame(Sample(l), Sample(ri))
- }
-
- // --- Ingest helpers ---
-
- // IngestReader continuously reads S16LE stereo PCM from an io.Reader into
- // a StreamSource. Blocks until the reader returns an error or io.EOF.
- // Designed to run as a goroutine.
- func IngestReader(r io.Reader, dst *StreamSource) error {
- buf := make([]byte, 16384) // 4096 frames per read (16KB)
- for {
- n, err := r.Read(buf)
- if n > 0 {
- dst.WritePCM(buf[:n])
- }
- if err != nil {
- if err == io.EOF {
- return nil
- }
- return fmt.Errorf("audio ingest: %w", err)
- }
- }
- }
|