Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

258 lines
6.9KB

  1. package audio
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. "sync/atomic"
  7. )
  8. // StreamSource is a lock-free SPSC (single-producer, single-consumer) ring buffer
  9. // for real-time audio streaming. One goroutine writes PCM frames, the DSP
  10. // goroutine reads them via NextFrame(). Returns silence on underrun.
  11. //
  12. // Zero allocations in steady state. No mutex in the read or write path.
  13. type StreamSource struct {
  14. ring []Frame
  15. size int
  16. mask int // size-1, for fast modulo (size must be power of 2)
  17. SampleRate int
  18. writePos atomic.Int64
  19. readPos atomic.Int64
  20. Underruns atomic.Uint64
  21. Overflows atomic.Uint64
  22. Written atomic.Uint64
  23. highWatermark atomic.Int64
  24. underrunStreak atomic.Uint64
  25. maxUnderrunStreak atomic.Uint64
  26. }
  27. // NewStreamSource creates a ring buffer with the given capacity (rounded up
  28. // to next power of 2) and input sample rate.
  29. func NewStreamSource(capacity, sampleRate int) *StreamSource {
  30. // Round up to power of 2
  31. size := 1
  32. for size < capacity {
  33. size <<= 1
  34. }
  35. return &StreamSource{
  36. ring: make([]Frame, size),
  37. size: size,
  38. mask: size - 1,
  39. SampleRate: sampleRate,
  40. }
  41. }
  42. // WriteFrame pushes a single frame into the ring buffer.
  43. // Returns false if the buffer is full (overflow).
  44. func (s *StreamSource) WriteFrame(f Frame) bool {
  45. wp := s.writePos.Load()
  46. rp := s.readPos.Load()
  47. if wp-rp >= int64(s.size) {
  48. s.Overflows.Add(1)
  49. return false
  50. }
  51. s.ring[int(wp)&s.mask] = f
  52. s.writePos.Add(1)
  53. s.Written.Add(1)
  54. s.updateHighWatermark()
  55. return true
  56. }
  57. // WritePCM decodes interleaved S16LE stereo PCM bytes and writes frames
  58. // to the ring buffer. Returns the number of frames written.
  59. func (s *StreamSource) WritePCM(data []byte) int {
  60. frames := len(data) / 4 // 2 channels × 2 bytes per sample
  61. written := 0
  62. for i := 0; i < frames; i++ {
  63. off := i * 4
  64. l := int16(binary.LittleEndian.Uint16(data[off:]))
  65. r := int16(binary.LittleEndian.Uint16(data[off+2:]))
  66. f := NewFrame(
  67. Sample(float64(l)/32768.0),
  68. Sample(float64(r)/32768.0),
  69. )
  70. if !s.WriteFrame(f) {
  71. break
  72. }
  73. written++
  74. }
  75. return written
  76. }
  77. // ReadFrame consumes one frame from the ring buffer.
  78. // Returns silence (0,0) on underrun.
  79. func (s *StreamSource) ReadFrame() Frame {
  80. rp := s.readPos.Load()
  81. wp := s.writePos.Load()
  82. if rp >= wp {
  83. s.Underruns.Add(1)
  84. s.recordUnderrunStreak()
  85. return NewFrame(0, 0)
  86. }
  87. f := s.ring[int(rp)&s.mask]
  88. s.readPos.Add(1)
  89. s.resetUnderrunStreak()
  90. return f
  91. }
  92. // NextFrame implements the frameSource interface.
  93. func (s *StreamSource) NextFrame() Frame {
  94. return s.ReadFrame()
  95. }
  96. // Available returns the number of frames currently buffered.
  97. func (s *StreamSource) Available() int {
  98. return int(s.writePos.Load() - s.readPos.Load())
  99. }
  100. // Buffered returns the fill ratio (0.0 = empty, 1.0 = full).
  101. func (s *StreamSource) Buffered() float64 {
  102. return float64(s.Available()) / float64(s.size)
  103. }
  104. // Stats returns diagnostic counters.
  105. func (s *StreamSource) Stats() StreamStats {
  106. available := s.Available()
  107. buffered := 0.0
  108. if s.size > 0 {
  109. buffered = float64(available) / float64(s.size)
  110. }
  111. highWatermark := int(s.highWatermark.Load())
  112. currentStreak := int(s.underrunStreak.Load())
  113. maxStreak := int(s.maxUnderrunStreak.Load())
  114. return StreamStats{
  115. Available: available,
  116. Capacity: s.size,
  117. Buffered: buffered,
  118. BufferedDurationSeconds: s.bufferedDurationSeconds(available),
  119. HighWatermark: highWatermark,
  120. HighWatermarkDurationSeconds: s.bufferedDurationSeconds(highWatermark),
  121. Written: s.Written.Load(),
  122. Underruns: s.Underruns.Load(),
  123. Overflows: s.Overflows.Load(),
  124. UnderrunStreak: currentStreak,
  125. MaxUnderrunStreak: maxStreak,
  126. }
  127. }
  128. // StreamStats exposes runtime telemetry for the stream buffer.
  129. type StreamStats struct {
  130. Available int `json:"available"`
  131. Capacity int `json:"capacity"`
  132. Buffered float64 `json:"buffered"`
  133. BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"`
  134. HighWatermark int `json:"highWatermark"`
  135. HighWatermarkDurationSeconds float64 `json:"highWatermarkDurationSeconds"`
  136. Written uint64 `json:"written"`
  137. Underruns uint64 `json:"underruns"`
  138. Overflows uint64 `json:"overflows"`
  139. UnderrunStreak int `json:"underrunStreak"`
  140. MaxUnderrunStreak int `json:"maxUnderrunStreak"`
  141. }
  142. func (s *StreamSource) bufferedDurationSeconds(available int) float64 {
  143. if s.SampleRate <= 0 {
  144. return 0
  145. }
  146. return float64(available) / float64(s.SampleRate)
  147. }
  148. func (s *StreamSource) updateHighWatermark() {
  149. available := s.Available()
  150. for {
  151. prev := s.highWatermark.Load()
  152. if int64(available) <= prev {
  153. return
  154. }
  155. if s.highWatermark.CompareAndSwap(prev, int64(available)) {
  156. return
  157. }
  158. }
  159. }
  160. func (s *StreamSource) recordUnderrunStreak() {
  161. current := s.underrunStreak.Add(1)
  162. for {
  163. prevMax := s.maxUnderrunStreak.Load()
  164. if current <= prevMax {
  165. return
  166. }
  167. if s.maxUnderrunStreak.CompareAndSwap(prevMax, current) {
  168. return
  169. }
  170. }
  171. }
  172. func (s *StreamSource) resetUnderrunStreak() {
  173. s.underrunStreak.Store(0)
  174. }
  175. // --- StreamResampler ---
  176. // StreamResampler wraps a StreamSource and rate-converts from the stream's
  177. // native sample rate to the target output rate using linear interpolation.
  178. // Consumes input frames on demand — no buffering beyond the ring buffer.
  179. type StreamResampler struct {
  180. src *StreamSource
  181. ratio float64 // inputRate / outputRate (< 1 when upsampling)
  182. pos float64
  183. prev Frame
  184. curr Frame
  185. }
  186. // NewStreamResampler creates a streaming resampler.
  187. func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler {
  188. if src == nil || outputRate <= 0 || src.SampleRate <= 0 {
  189. return &StreamResampler{src: src, ratio: 1.0}
  190. }
  191. return &StreamResampler{
  192. src: src,
  193. ratio: float64(src.SampleRate) / outputRate,
  194. }
  195. }
  196. // NextFrame returns the next interpolated frame at the output rate.
  197. // Implements the frameSource interface.
  198. func (r *StreamResampler) NextFrame() Frame {
  199. if r.src == nil {
  200. return NewFrame(0, 0)
  201. }
  202. // Consume input samples as the fractional position advances
  203. for r.pos >= 1.0 {
  204. r.prev = r.curr
  205. r.curr = r.src.ReadFrame()
  206. r.pos -= 1.0
  207. }
  208. frac := r.pos
  209. l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac
  210. ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac
  211. r.pos += r.ratio
  212. return NewFrame(Sample(l), Sample(ri))
  213. }
  214. // --- Ingest helpers ---
  215. // IngestReader continuously reads S16LE stereo PCM from an io.Reader into
  216. // a StreamSource. Blocks until the reader returns an error or io.EOF.
  217. // Designed to run as a goroutine.
  218. func IngestReader(r io.Reader, dst *StreamSource) error {
  219. buf := make([]byte, 16384) // 4096 frames per read (16KB)
  220. for {
  221. n, err := r.Read(buf)
  222. if n > 0 {
  223. dst.WritePCM(buf[:n])
  224. }
  225. if err != nil {
  226. if err == io.EOF {
  227. return nil
  228. }
  229. return fmt.Errorf("audio ingest: %w", err)
  230. }
  231. }
  232. }