package audio

import (
	"encoding/binary"
	"fmt"
	"io"
	"sync/atomic"
)

// StreamSource is a lock-free ring buffer for real-time audio streaming,
// intended for exactly one producer and one consumer (SPSC). The producer
// pushes PCM frames with WriteFrame/WritePCM; the DSP side pulls them with
// NextFrame, which yields silence when nothing is buffered.
//
// Positions and counters are atomics, so neither the read path nor the
// write path takes a mutex, and the ring itself is allocated once up front.
type StreamSource struct {
	ring       []Frame
	size       int
	mask       int // size-1; valid as a modulo mask because size is a power of two
	SampleRate int

	// writePos and readPos count frames ever written/read; the slot index
	// is the position masked with mask.
	writePos atomic.Int64
	readPos  atomic.Int64

	Underruns atomic.Uint64
	Overflows atomic.Uint64
	Written   atomic.Uint64

	highWatermark atomic.Int64
}

// NewStreamSource builds a StreamSource that holds at least capacity frames.
// The ring size is the smallest power of two that is >= capacity (minimum 1).
// sampleRate is recorded as the input sample rate of the stream.
func NewStreamSource(capacity, sampleRate int) *StreamSource {
	// Grow to the next power of two so indexing can use a bit mask.
	slots := 1
	for slots < capacity {
		slots *= 2
	}

	src := new(StreamSource)
	src.ring = make([]Frame, slots)
	src.size = slots
	src.mask = slots - 1
	src.SampleRate = sampleRate
	return src
}

// WriteFrame appends one frame to the ring buffer.
// It reports false, and counts an overflow, when the buffer is already full.
func (s *StreamSource) WriteFrame(f Frame) bool {
	head := s.writePos.Load()
	tail := s.readPos.Load()

	if used := head - tail; used >= int64(s.size) {
		s.Overflows.Add(1)
		return false
	}

	// Fill the slot first, then publish it by advancing the write position.
	s.ring[int(head)&s.mask] = f
	s.writePos.Add(1)
	s.Written.Add(1)
	s.updateHighWatermark()
	return true
}

// WritePCM decodes interleaved signed 16-bit little-endian stereo PCM and
// pushes the resulting frames into the ring buffer. Decoding stops at the
// first frame that does not fit; trailing bytes that do not make up a whole
// frame are ignored. It returns the number of frames written.
func (s *StreamSource) WritePCM(data []byte) int {
	const bytesPerFrame = 4 // 2 channels × 2 bytes per sample

	count := 0
	for ; len(data) >= bytesPerFrame; data = data[bytesPerFrame:] {
		left := int16(binary.LittleEndian.Uint16(data[0:2]))
		right := int16(binary.LittleEndian.Uint16(data[2:4]))

		// Scale the 16-bit samples by 1/32768 before building the frame.
		frame := NewFrame(
			Sample(float64(left)/32768.0),
			Sample(float64(right)/32768.0),
		)
		if !s.WriteFrame(frame) {
			break
		}
		count++
	}
	return count
}

// ReadFrame consumes one frame from the ring buffer.
// Returns silence (0,0) on underrun.
func (s *StreamSource) ReadFrame() Frame { rp := s.readPos.Load() wp := s.writePos.Load() if rp >= wp { s.Underruns.Add(1) return NewFrame(0, 0) } f := s.ring[int(rp)&s.mask] s.readPos.Add(1) return f } // NextFrame implements the frameSource interface. func (s *StreamSource) NextFrame() Frame { return s.ReadFrame() } // Available returns the number of frames currently buffered. func (s *StreamSource) Available() int { return int(s.writePos.Load() - s.readPos.Load()) } // Buffered returns the fill ratio (0.0 = empty, 1.0 = full). func (s *StreamSource) Buffered() float64 { return float64(s.Available()) / float64(s.size) } // Stats returns diagnostic counters. func (s *StreamSource) Stats() StreamStats { available := s.Available() buffered := 0.0 if s.size > 0 { buffered = float64(available) / float64(s.size) } highWatermark := int(s.highWatermark.Load()) return StreamStats{ Available: available, Capacity: s.size, Buffered: buffered, BufferedDurationSeconds: s.bufferedDurationSeconds(available), HighWatermark: highWatermark, HighWatermarkDurationSeconds: s.bufferedDurationSeconds(highWatermark), Written: s.Written.Load(), Underruns: s.Underruns.Load(), Overflows: s.Overflows.Load(), } } // StreamStats exposes runtime telemetry for the stream buffer. 
type StreamStats struct { Available int `json:"available"` Capacity int `json:"capacity"` Buffered float64 `json:"buffered"` BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"` HighWatermark int `json:"highWatermark"` HighWatermarkDurationSeconds float64 `json:"highWatermarkDurationSeconds"` Written uint64 `json:"written"` Underruns uint64 `json:"underruns"` Overflows uint64 `json:"overflows"` } func (s *StreamSource) bufferedDurationSeconds(available int) float64 { if s.SampleRate <= 0 { return 0 } return float64(available) / float64(s.SampleRate) } func (s *StreamSource) updateHighWatermark() { available := s.Available() for { prev := s.highWatermark.Load() if int64(available) <= prev { return } if s.highWatermark.CompareAndSwap(prev, int64(available)) { return } } } // --- StreamResampler --- // StreamResampler wraps a StreamSource and rate-converts from the stream's // native sample rate to the target output rate using linear interpolation. // Consumes input frames on demand — no buffering beyond the ring buffer. type StreamResampler struct { src *StreamSource ratio float64 // inputRate / outputRate (< 1 when upsampling) pos float64 prev Frame curr Frame } // NewStreamResampler creates a streaming resampler. func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler { if src == nil || outputRate <= 0 || src.SampleRate <= 0 { return &StreamResampler{src: src, ratio: 1.0} } return &StreamResampler{ src: src, ratio: float64(src.SampleRate) / outputRate, } } // NextFrame returns the next interpolated frame at the output rate. // Implements the frameSource interface. 
func (r *StreamResampler) NextFrame() Frame { if r.src == nil { return NewFrame(0, 0) } // Consume input samples as the fractional position advances for r.pos >= 1.0 { r.prev = r.curr r.curr = r.src.ReadFrame() r.pos -= 1.0 } frac := r.pos l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac r.pos += r.ratio return NewFrame(Sample(l), Sample(ri)) } // --- Ingest helpers --- // IngestReader continuously reads S16LE stereo PCM from an io.Reader into // a StreamSource. Blocks until the reader returns an error or io.EOF. // Designed to run as a goroutine. func IngestReader(r io.Reader, dst *StreamSource) error { buf := make([]byte, 16384) // 4096 frames per read (16KB) for { n, err := r.Read(buf) if n > 0 { dst.WritePCM(buf[:n]) } if err != nil { if err == io.EOF { return nil } return fmt.Errorf("audio ingest: %w", err) } } }