Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

211 Zeilen
5.6KB

  1. package audio
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. "sync/atomic"
  7. )
// StreamSource is a lock-free SPSC (single-producer, single-consumer) ring buffer
// for real-time audio streaming. One goroutine writes PCM frames, the DSP
// goroutine reads them via NextFrame(). Returns silence on underrun.
//
// Zero allocations in steady state. No mutex in the read or write path: the
// two sides coordinate only through the atomic writePos and readPos counters.
type StreamSource struct {
	ring       []Frame       // frame storage; NewStreamSource allocates exactly size slots
	size       int           // capacity in frames
	mask       int           // size-1, for fast modulo (size must be power of 2)
	SampleRate int           // sample rate of the incoming stream, as given to NewStreamSource
	writePos   atomic.Int64  // running count of frames stored; advanced by WriteFrame
	readPos    atomic.Int64  // running count of frames consumed; advanced by ReadFrame
	Underruns  atomic.Uint64 // number of ReadFrame calls that found the buffer empty
	Overflows  atomic.Uint64 // number of WriteFrame calls rejected because the buffer was full
	Written    atomic.Uint64 // number of frames successfully stored by WriteFrame
}
  24. // NewStreamSource creates a ring buffer with the given capacity (rounded up
  25. // to next power of 2) and input sample rate.
  26. func NewStreamSource(capacity, sampleRate int) *StreamSource {
  27. // Round up to power of 2
  28. size := 1
  29. for size < capacity {
  30. size <<= 1
  31. }
  32. return &StreamSource{
  33. ring: make([]Frame, size),
  34. size: size,
  35. mask: size - 1,
  36. SampleRate: sampleRate,
  37. }
  38. }
  39. // WriteFrame pushes a single frame into the ring buffer.
  40. // Returns false if the buffer is full (overflow).
  41. func (s *StreamSource) WriteFrame(f Frame) bool {
  42. wp := s.writePos.Load()
  43. rp := s.readPos.Load()
  44. if wp-rp >= int64(s.size) {
  45. s.Overflows.Add(1)
  46. return false
  47. }
  48. s.ring[int(wp)&s.mask] = f
  49. s.writePos.Add(1)
  50. s.Written.Add(1)
  51. return true
  52. }
  53. // WritePCM decodes interleaved S16LE stereo PCM bytes and writes frames
  54. // to the ring buffer. Returns the number of frames written.
  55. func (s *StreamSource) WritePCM(data []byte) int {
  56. frames := len(data) / 4 // 2 channels × 2 bytes per sample
  57. written := 0
  58. for i := 0; i < frames; i++ {
  59. off := i * 4
  60. l := int16(binary.LittleEndian.Uint16(data[off:]))
  61. r := int16(binary.LittleEndian.Uint16(data[off+2:]))
  62. f := NewFrame(
  63. Sample(float64(l)/32768.0),
  64. Sample(float64(r)/32768.0),
  65. )
  66. if !s.WriteFrame(f) {
  67. break
  68. }
  69. written++
  70. }
  71. return written
  72. }
  73. // ReadFrame consumes one frame from the ring buffer.
  74. // Returns silence (0,0) on underrun.
  75. func (s *StreamSource) ReadFrame() Frame {
  76. rp := s.readPos.Load()
  77. wp := s.writePos.Load()
  78. if rp >= wp {
  79. s.Underruns.Add(1)
  80. return NewFrame(0, 0)
  81. }
  82. f := s.ring[int(rp)&s.mask]
  83. s.readPos.Add(1)
  84. return f
  85. }
// NextFrame implements the frameSource interface.
// It simply delegates to ReadFrame, so it consumes one buffered frame per
// call and yields silence (counting an underrun) when the buffer is empty.
func (s *StreamSource) NextFrame() Frame {
	return s.ReadFrame()
}
  90. // Available returns the number of frames currently buffered.
  91. func (s *StreamSource) Available() int {
  92. return int(s.writePos.Load() - s.readPos.Load())
  93. }
  94. // Buffered returns the fill ratio (0.0 = empty, 1.0 = full).
  95. func (s *StreamSource) Buffered() float64 {
  96. return float64(s.Available()) / float64(s.size)
  97. }
  98. // Stats returns diagnostic counters.
  99. func (s *StreamSource) Stats() StreamStats {
  100. available := s.Available()
  101. buffered := 0.0
  102. if s.size > 0 {
  103. buffered = float64(available) / float64(s.size)
  104. }
  105. return StreamStats{
  106. Available: available,
  107. Capacity: s.size,
  108. Buffered: buffered,
  109. BufferedDurationSeconds: s.bufferedDurationSeconds(available),
  110. Written: s.Written.Load(),
  111. Underruns: s.Underruns.Load(),
  112. Overflows: s.Overflows.Load(),
  113. }
  114. }
// StreamStats exposes runtime telemetry for the stream buffer.
// It is the value returned by StreamSource.Stats and carries JSON tags for
// every field.
type StreamStats struct {
	Available               int     `json:"available"`               // frames buffered when the snapshot was taken
	Capacity                int     `json:"capacity"`                // ring buffer size in frames
	Buffered                float64 `json:"buffered"`                // Available divided by Capacity (0 when Capacity is 0)
	BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"` // Available divided by the source sample rate (0 when the rate is not positive)
	Written                 uint64  `json:"written"`                 // value of the source's Written counter
	Underruns               uint64  `json:"underruns"`               // value of the source's Underruns counter
	Overflows               uint64  `json:"overflows"`               // value of the source's Overflows counter
}
  125. func (s *StreamSource) bufferedDurationSeconds(available int) float64 {
  126. if s.SampleRate <= 0 {
  127. return 0
  128. }
  129. return float64(available) / float64(s.SampleRate)
  130. }
  131. // --- StreamResampler ---
// StreamResampler wraps a StreamSource and rate-converts from the stream's
// native sample rate to the target output rate using linear interpolation.
// Consumes input frames on demand — no buffering beyond the ring buffer.
type StreamResampler struct {
	src   *StreamSource // upstream ring buffer; a nil src makes NextFrame return silence
	ratio float64       // inputRate / outputRate (< 1 when upsampling)
	pos   float64       // fractional position between prev and curr; grows by ratio per output frame
	prev  Frame         // earlier of the two input frames being interpolated
	curr  Frame         // most recently consumed input frame
}
  142. // NewStreamResampler creates a streaming resampler.
  143. func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler {
  144. if src == nil || outputRate <= 0 || src.SampleRate <= 0 {
  145. return &StreamResampler{src: src, ratio: 1.0}
  146. }
  147. return &StreamResampler{
  148. src: src,
  149. ratio: float64(src.SampleRate) / outputRate,
  150. }
  151. }
  152. // NextFrame returns the next interpolated frame at the output rate.
  153. // Implements the frameSource interface.
  154. func (r *StreamResampler) NextFrame() Frame {
  155. if r.src == nil {
  156. return NewFrame(0, 0)
  157. }
  158. // Consume input samples as the fractional position advances
  159. for r.pos >= 1.0 {
  160. r.prev = r.curr
  161. r.curr = r.src.ReadFrame()
  162. r.pos -= 1.0
  163. }
  164. frac := r.pos
  165. l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac
  166. ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac
  167. r.pos += r.ratio
  168. return NewFrame(Sample(l), Sample(ri))
  169. }
  170. // --- Ingest helpers ---
  171. // IngestReader continuously reads S16LE stereo PCM from an io.Reader into
  172. // a StreamSource. Blocks until the reader returns an error or io.EOF.
  173. // Designed to run as a goroutine.
  174. func IngestReader(r io.Reader, dst *StreamSource) error {
  175. buf := make([]byte, 16384) // 4096 frames per read (16KB)
  176. for {
  177. n, err := r.Read(buf)
  178. if n > 0 {
  179. dst.WritePCM(buf[:n])
  180. }
  181. if err != nil {
  182. if err == io.EOF {
  183. return nil
  184. }
  185. return fmt.Errorf("audio ingest: %w", err)
  186. }
  187. }
  188. }