Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

308 line
9.1KB

  1. package audio
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. "sync/atomic"
  7. )
// StreamSource is a lock-free SPSC (single-producer, single-consumer) ring
// buffer for real-time audio streaming. One goroutine writes PCM frames
// (WriteFrame / WritePCM); the DSP goroutine reads them via NextFrame(), which
// returns silence on underrun.
//
// Zero allocations in steady state. No mutex in the read or write path.
//
// SampleRate is the nominal input sample rate. It may be updated at runtime
// via SetSampleRate once the actual decoded rate is known (e.g. when the first
// PCM chunk arrives from a compressed stream). Cross-goroutine readers should
// use GetSampleRate, which reads the atomic copy of the rate.
type StreamSource struct {
	// ring is the frame storage; slots are addressed as position & mask.
	ring []Frame
	// size is len(ring). NewStreamSource always makes it a power of two.
	size int
	// mask is size-1, for fast modulo (size must be a power of 2).
	mask int

	// SampleRate is kept as a plain int for backward compatibility with code
	// that reads it before any goroutine races are possible (construction,
	// logging). All hot-path code uses the atomic below.
	SampleRate int
	// sampleRateAtomic mirrors SampleRate; GetSampleRate loads it and
	// NewStreamSource / SetSampleRate store it.
	sampleRateAtomic atomic.Int32

	// writePos counts frames accepted by WriteFrame since construction. It is
	// only ever incremented, never wrapped; the ring slot is writePos & mask.
	writePos atomic.Int64
	// readPos counts frames consumed by ReadFrame since construction. It is
	// only ever incremented; the ring slot is readPos & mask.
	readPos atomic.Int64

	// Underruns counts ReadFrame calls that found the buffer empty.
	Underruns atomic.Uint64
	// Overflows counts WriteFrame calls rejected because the buffer was full.
	Overflows atomic.Uint64
	// Written counts frames successfully stored by WriteFrame.
	Written atomic.Uint64

	// highWatermark is the largest fill level (in frames) observed right after
	// a successful write.
	highWatermark atomic.Int64
	// underrunStreak is the number of consecutive underruns; a successful
	// ReadFrame resets it to zero.
	underrunStreak atomic.Uint64
	// maxUnderrunStreak is the longest underrunStreak seen so far.
	maxUnderrunStreak atomic.Uint64
}
  36. // NewStreamSource creates a ring buffer with the given capacity (rounded up
  37. // to next power of 2) and input sample rate.
  38. func NewStreamSource(capacity, sampleRate int) *StreamSource {
  39. // Round up to power of 2
  40. size := 1
  41. for size < capacity {
  42. size <<= 1
  43. }
  44. s := &StreamSource{
  45. ring: make([]Frame, size),
  46. size: size,
  47. mask: size - 1,
  48. SampleRate: sampleRate,
  49. }
  50. s.sampleRateAtomic.Store(int32(sampleRate))
  51. return s
  52. }
  53. // SetSampleRate updates the sample rate atomically. Safe to call from any
  54. // goroutine, including while the DSP goroutine is consuming frames via
  55. // StreamResampler. The change takes effect on the very next NextFrame() call.
  56. // Also updates the public SampleRate field for non-concurrent readers.
  57. func (s *StreamSource) SetSampleRate(hz int) {
  58. s.SampleRate = hz
  59. s.sampleRateAtomic.Store(int32(hz))
  60. }
  61. // GetSampleRate returns the current sample rate via atomic load. Use this
  62. // in hot paths / cross-goroutine reads instead of .SampleRate directly.
  63. func (s *StreamSource) GetSampleRate() int {
  64. return int(s.sampleRateAtomic.Load())
  65. }
  66. // WriteFrame pushes a single frame into the ring buffer.
  67. // Returns false if the buffer is full (overflow).
  68. func (s *StreamSource) WriteFrame(f Frame) bool {
  69. wp := s.writePos.Load()
  70. rp := s.readPos.Load()
  71. if wp-rp >= int64(s.size) {
  72. s.Overflows.Add(1)
  73. return false
  74. }
  75. s.ring[int(wp)&s.mask] = f
  76. s.writePos.Add(1)
  77. s.Written.Add(1)
  78. s.updateHighWatermark()
  79. return true
  80. }
  81. // WritePCM decodes interleaved S16LE stereo PCM bytes and writes frames
  82. // to the ring buffer. Returns the number of frames written.
  83. func (s *StreamSource) WritePCM(data []byte) int {
  84. frames := len(data) / 4 // 2 channels × 2 bytes per sample
  85. written := 0
  86. for i := 0; i < frames; i++ {
  87. off := i * 4
  88. l := int16(binary.LittleEndian.Uint16(data[off:]))
  89. r := int16(binary.LittleEndian.Uint16(data[off+2:]))
  90. f := NewFrame(
  91. Sample(float64(l)/32768.0),
  92. Sample(float64(r)/32768.0),
  93. )
  94. if !s.WriteFrame(f) {
  95. break
  96. }
  97. written++
  98. }
  99. return written
  100. }
  101. // ReadFrame consumes one frame from the ring buffer.
  102. // Returns silence (0,0) on underrun.
  103. func (s *StreamSource) ReadFrame() Frame {
  104. rp := s.readPos.Load()
  105. wp := s.writePos.Load()
  106. if rp >= wp {
  107. s.Underruns.Add(1)
  108. s.recordUnderrunStreak()
  109. return NewFrame(0, 0)
  110. }
  111. f := s.ring[int(rp)&s.mask]
  112. s.readPos.Add(1)
  113. s.resetUnderrunStreak()
  114. return f
  115. }
  116. // NextFrame implements the frameSource interface.
  117. func (s *StreamSource) NextFrame() Frame {
  118. return s.ReadFrame()
  119. }
  120. // Available returns the number of frames currently buffered.
  121. func (s *StreamSource) Available() int {
  122. return int(s.writePos.Load() - s.readPos.Load())
  123. }
  124. // Buffered returns the fill ratio (0.0 = empty, 1.0 = full).
  125. func (s *StreamSource) Buffered() float64 {
  126. return float64(s.Available()) / float64(s.size)
  127. }
  128. // Stats returns diagnostic counters.
  129. func (s *StreamSource) Stats() StreamStats {
  130. available := s.Available()
  131. buffered := 0.0
  132. if s.size > 0 {
  133. buffered = float64(available) / float64(s.size)
  134. }
  135. highWatermark := int(s.highWatermark.Load())
  136. currentStreak := int(s.underrunStreak.Load())
  137. maxStreak := int(s.maxUnderrunStreak.Load())
  138. return StreamStats{
  139. Available: available,
  140. Capacity: s.size,
  141. Buffered: buffered,
  142. BufferedDurationSeconds: s.bufferedDurationSeconds(available),
  143. HighWatermark: highWatermark,
  144. HighWatermarkDurationSeconds: s.bufferedDurationSeconds(highWatermark),
  145. Written: s.Written.Load(),
  146. Underruns: s.Underruns.Load(),
  147. Overflows: s.Overflows.Load(),
  148. UnderrunStreak: currentStreak,
  149. MaxUnderrunStreak: maxStreak,
  150. }
  151. }
// StreamStats exposes runtime telemetry for the stream buffer, as produced by
// StreamSource.Stats. All fields are serialised to JSON under the tag names
// shown.
type StreamStats struct {
	// Available is the number of frames buffered when the snapshot was taken.
	Available int `json:"available"`
	// Capacity is the ring size in frames.
	Capacity int `json:"capacity"`
	// Buffered is Available / Capacity (0 when Capacity is not positive).
	Buffered float64 `json:"buffered"`
	// BufferedDurationSeconds is Available divided by the current sample rate
	// (0 when the rate is not positive).
	BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"`
	// HighWatermark is the largest fill level, in frames, recorded after a write.
	HighWatermark int `json:"highWatermark"`
	// HighWatermarkDurationSeconds is HighWatermark divided by the current
	// sample rate (0 when the rate is not positive).
	HighWatermarkDurationSeconds float64 `json:"highWatermarkDurationSeconds"`
	// Written is the total number of frames accepted into the buffer.
	Written uint64 `json:"written"`
	// Underruns is the total number of reads that found the buffer empty.
	Underruns uint64 `json:"underruns"`
	// Overflows is the total number of writes rejected because the buffer was full.
	Overflows uint64 `json:"overflows"`
	// UnderrunStreak is the current run of consecutive underruns.
	UnderrunStreak int `json:"underrunStreak"`
	// MaxUnderrunStreak is the longest run of consecutive underruns seen.
	MaxUnderrunStreak int `json:"maxUnderrunStreak"`
}
  166. func (s *StreamSource) bufferedDurationSeconds(available int) float64 {
  167. rate := s.GetSampleRate()
  168. if rate <= 0 {
  169. return 0
  170. }
  171. return float64(available) / float64(rate)
  172. }
  173. func (s *StreamSource) updateHighWatermark() {
  174. available := s.Available()
  175. for {
  176. prev := s.highWatermark.Load()
  177. if int64(available) <= prev {
  178. return
  179. }
  180. if s.highWatermark.CompareAndSwap(prev, int64(available)) {
  181. return
  182. }
  183. }
  184. }
  185. func (s *StreamSource) recordUnderrunStreak() {
  186. current := s.underrunStreak.Add(1)
  187. for {
  188. prevMax := s.maxUnderrunStreak.Load()
  189. if current <= prevMax {
  190. return
  191. }
  192. if s.maxUnderrunStreak.CompareAndSwap(prevMax, current) {
  193. return
  194. }
  195. }
  196. }
  197. func (s *StreamSource) resetUnderrunStreak() {
  198. s.underrunStreak.Store(0)
  199. }
// --- StreamResampler ---

// StreamResampler wraps a StreamSource and rate-converts from the stream's
// native sample rate to the target output rate using linear interpolation.
// It consumes input frames on demand and keeps only the two most recent input
// frames, so there is no buffering beyond the ring buffer.
//
// The input rate is read atomically from src on every NextFrame() call so
// that a SetSampleRate() from the ingest goroutine takes effect immediately,
// without any additional synchronisation. The pos accumulator is not reset
// on a rate change: this may produce a brief transient at the moment the rate
// is corrected, which is far preferable to playing the whole stream at the
// wrong pitch.
type StreamResampler struct {
	// src supplies the input frames; NextFrame returns silence when it is nil.
	src *StreamSource
	// outputRate is the target composite rate, fixed for the lifetime of the
	// resampler.
	outputRate float64
	// pos is the fractional position between prev and curr. NextFrame advances
	// it by inputRate/outputRate per output frame and pulls a new input frame
	// each time it reaches 1.0.
	pos float64
	// prev is the older of the two input frames being interpolated.
	prev Frame
	// curr is the newer of the two input frames being interpolated.
	curr Frame
}
  218. // NewStreamResampler creates a streaming resampler.
  219. // outputRate is the fixed DSP composite rate. The input rate is taken from
  220. // src.GetSampleRate() dynamically, so it will automatically track any
  221. // subsequent SetSampleRate() call.
  222. func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler {
  223. if src == nil || outputRate <= 0 {
  224. return &StreamResampler{src: src, outputRate: outputRate}
  225. }
  226. return &StreamResampler{
  227. src: src,
  228. outputRate: outputRate,
  229. }
  230. }
  231. // NextFrame returns the next interpolated frame at the output rate.
  232. // Implements the frameSource interface.
  233. // The input/output ratio is recomputed on every call from the atomic sample
  234. // rate so that runtime rate corrections via SetSampleRate are race-free.
  235. func (r *StreamResampler) NextFrame() Frame {
  236. if r.src == nil {
  237. return NewFrame(0, 0)
  238. }
  239. // Compute ratio atomically so we see any SetSampleRate update immediately.
  240. ratio := 1.0
  241. if r.outputRate > 0 {
  242. if inputRate := r.src.GetSampleRate(); inputRate > 0 {
  243. ratio = float64(inputRate) / r.outputRate
  244. }
  245. }
  246. // Consume input samples as the fractional position advances.
  247. for r.pos >= 1.0 {
  248. r.prev = r.curr
  249. r.curr = r.src.ReadFrame()
  250. r.pos -= 1.0
  251. }
  252. frac := r.pos
  253. l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac
  254. ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac
  255. r.pos += ratio
  256. return NewFrame(Sample(l), Sample(ri))
  257. }
  258. // --- Ingest helpers ---
  259. // IngestReader continuously reads S16LE stereo PCM from an io.Reader into
  260. // a StreamSource. Blocks until the reader returns an error or io.EOF.
  261. // Designed to run as a goroutine.
  262. func IngestReader(r io.Reader, dst *StreamSource) error {
  263. buf := make([]byte, 16384) // 4096 frames per read (16KB)
  264. for {
  265. n, err := r.Read(buf)
  266. if n > 0 {
  267. dst.WritePCM(buf[:n])
  268. }
  269. if err != nil {
  270. if err == io.EOF {
  271. return nil
  272. }
  273. return fmt.Errorf("audio ingest: %w", err)
  274. }
  275. }
  276. }