Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

489 řádky
10KB

  1. package ingest
  2. import (
  3. "context"
  4. "log"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/jan/fm-rds-tx/internal/audio"
  9. )
  10. type Runtime struct {
  11. sink *audio.StreamSource
  12. source Source
  13. started atomic.Bool
  14. onTitle func(string)
  15. prebuffer time.Duration
  16. ctx context.Context
  17. cancel context.CancelFunc
  18. wg sync.WaitGroup
  19. work *frameBuffer
  20. workSampleRate int
  21. prebufferFrames int
  22. gateOpen bool
  23. seenChunk bool
  24. lastDrainAt time.Time
  25. drainAllowance float64
  26. mu sync.RWMutex
  27. active SourceDescriptor
  28. stats RuntimeStats
  29. }
  30. type RuntimeOption func(*Runtime)
  31. func WithStreamTitleHandler(handler func(string)) RuntimeOption {
  32. return func(r *Runtime) {
  33. r.onTitle = handler
  34. }
  35. }
  36. func WithPrebuffer(d time.Duration) RuntimeOption {
  37. return func(r *Runtime) {
  38. if d < 0 {
  39. d = 0
  40. }
  41. r.prebuffer = d
  42. }
  43. }
  44. func WithPrebufferMs(ms int) RuntimeOption {
  45. return func(r *Runtime) {
  46. if ms < 0 {
  47. ms = 0
  48. }
  49. r.prebuffer = time.Duration(ms) * time.Millisecond
  50. }
  51. }
  52. func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime {
  53. sampleRate := 44100
  54. capacity := 1024
  55. if sink != nil {
  56. if sink.SampleRate > 0 {
  57. sampleRate = sink.SampleRate
  58. }
  59. if sinkCap := sink.Stats().Capacity; sinkCap > 0 {
  60. capacity = sinkCap * 2
  61. }
  62. }
  63. r := &Runtime{
  64. sink: sink,
  65. source: src,
  66. work: newFrameBuffer(capacity),
  67. workSampleRate: sampleRate,
  68. stats: RuntimeStats{
  69. State: "idle",
  70. },
  71. }
  72. for _, opt := range opts {
  73. if opt != nil {
  74. opt(r)
  75. }
  76. }
  77. if r.workSampleRate > 0 && r.prebuffer > 0 {
  78. r.prebufferFrames = int(r.prebuffer.Seconds() * float64(r.workSampleRate))
  79. }
  80. minCapacity := 256
  81. if r.prebufferFrames > 0 && minCapacity < r.prebufferFrames*2 {
  82. minCapacity = r.prebufferFrames * 2
  83. }
  84. if r.work == nil || r.work.capacity() < minCapacity {
  85. r.work = newFrameBuffer(minCapacity)
  86. }
  87. r.updateBufferedStatsLocked()
  88. return r
  89. }
  90. func (r *Runtime) Start(ctx context.Context) error {
  91. if r.sink == nil {
  92. r.mu.Lock()
  93. r.stats.State = "failed"
  94. r.mu.Unlock()
  95. return nil
  96. }
  97. if r.source == nil {
  98. r.mu.Lock()
  99. r.stats.State = "idle"
  100. r.mu.Unlock()
  101. return nil
  102. }
  103. if !r.started.CompareAndSwap(false, true) {
  104. return nil
  105. }
  106. r.ctx, r.cancel = context.WithCancel(ctx)
  107. r.mu.Lock()
  108. r.active = r.source.Descriptor()
  109. r.stats.State = "starting"
  110. r.stats.Prebuffering = false
  111. r.stats.WriteBlocked = false
  112. r.gateOpen = false
  113. r.seenChunk = false
  114. r.lastDrainAt = time.Now()
  115. r.drainAllowance = 0
  116. r.work.reset()
  117. r.updateBufferedStatsLocked()
  118. r.mu.Unlock()
  119. if err := r.source.Start(r.ctx); err != nil {
  120. r.started.Store(false)
  121. r.mu.Lock()
  122. r.stats.State = "failed"
  123. r.mu.Unlock()
  124. return err
  125. }
  126. r.wg.Add(1)
  127. go r.run()
  128. return nil
  129. }
  130. func (r *Runtime) Stop() error {
  131. if !r.started.CompareAndSwap(true, false) {
  132. return nil
  133. }
  134. if r.cancel != nil {
  135. r.cancel()
  136. }
  137. if r.source != nil {
  138. _ = r.source.Stop()
  139. }
  140. r.wg.Wait()
  141. r.mu.Lock()
  142. r.stats.State = "stopped"
  143. r.mu.Unlock()
  144. return nil
  145. }
  146. func (r *Runtime) run() {
  147. defer r.wg.Done()
  148. ch := r.source.Chunks()
  149. errCh := r.source.Errors()
  150. ticker := time.NewTicker(10 * time.Millisecond)
  151. defer ticker.Stop()
  152. var titleCh <-chan string
  153. if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil {
  154. titleCh = src.StreamTitleUpdates()
  155. }
  156. for {
  157. select {
  158. case <-r.ctx.Done():
  159. return
  160. case err, ok := <-errCh:
  161. if !ok {
  162. errCh = nil
  163. continue
  164. }
  165. if err == nil {
  166. continue
  167. }
  168. r.mu.Lock()
  169. r.stats.State = "degraded"
  170. r.stats.Prebuffering = false
  171. r.mu.Unlock()
  172. case chunk, ok := <-ch:
  173. if !ok {
  174. r.mu.Lock()
  175. r.stats.State = "stopped"
  176. r.stats.Prebuffering = false
  177. r.mu.Unlock()
  178. return
  179. }
  180. r.handleChunk(chunk)
  181. case <-ticker.C:
  182. r.drainWorkingBuffer()
  183. case title, ok := <-titleCh:
  184. if !ok {
  185. titleCh = nil
  186. continue
  187. }
  188. r.onTitle(title)
  189. }
  190. }
  191. }
  192. func (r *Runtime) handleChunk(chunk PCMChunk) {
  193. r.mu.Lock()
  194. r.seenChunk = true
  195. // Propagate the actual decoded sample rate to the sink and pacer the
  196. // first time (or whenever) it differs from our working rate. This fixes
  197. // the two-part rate-mismatch bug that appears when a native decoder
  198. // (e.g. go-mp3) decodes a 48000 Hz stream while the StreamSource and
  199. // StreamResampler were initialised assuming 44100 Hz:
  200. //
  201. // 1. The pacer (pacedDrainLimitLocked) was draining at the wrong rate,
  202. // causing the work buffer to overflow → glitches.
  203. // 2. The StreamResampler ratio (inputRate/outputRate) was computed from
  204. // the stale sink.SampleRate, so every frame was played at the wrong
  205. // pitch → audio too slow (44100/48000 ≈ 91.9 % speed).
  206. //
  207. // SetSampleRate writes atomically, so the StreamResampler's NextFrame()
  208. // picks up the corrected ratio without any additional locking.
  209. if chunk.SampleRateHz > 0 && chunk.SampleRateHz != r.workSampleRate {
  210. prev := r.workSampleRate
  211. r.workSampleRate = chunk.SampleRateHz
  212. if r.sink != nil {
  213. r.sink.SetSampleRate(chunk.SampleRateHz)
  214. }
  215. log.Printf("ingest: actual decoded sample rate %d Hz (was %d Hz) — resampler and pacer updated", chunk.SampleRateHz, prev)
  216. }
  217. r.mu.Unlock()
  218. frames, err := ChunkToFrames(chunk)
  219. if err != nil {
  220. r.mu.Lock()
  221. r.stats.ConvertErrors++
  222. r.stats.State = "degraded"
  223. r.mu.Unlock()
  224. return
  225. }
  226. dropped := uint64(0)
  227. for _, frame := range frames {
  228. if !r.work.push(frame) {
  229. dropped++
  230. }
  231. }
  232. r.mu.Lock()
  233. if chunk.SampleRateHz > 0 {
  234. r.active.SampleRateHz = chunk.SampleRateHz
  235. }
  236. if chunk.Channels > 0 {
  237. r.active.Channels = chunk.Channels
  238. }
  239. r.stats.LastChunkAt = time.Now()
  240. r.stats.DroppedFrames += dropped
  241. if dropped > 0 {
  242. r.stats.State = "degraded"
  243. }
  244. r.updateBufferedStatsLocked()
  245. r.mu.Unlock()
  246. r.drainWorkingBuffer()
  247. }
  248. func (r *Runtime) drainWorkingBuffer() {
  249. r.mu.Lock()
  250. defer r.mu.Unlock()
  251. now := time.Now()
  252. if r.sink == nil {
  253. r.resetDrainPacerLocked(now)
  254. r.updateBufferedStatsLocked()
  255. return
  256. }
  257. bufferedFrames := r.work.available()
  258. if !r.gateOpen {
  259. switch {
  260. case bufferedFrames == 0:
  261. if r.stats.State == "degraded" {
  262. // Keep degraded visible until fresh audio recovers runtime.
  263. } else if !r.seenChunk {
  264. r.stats.State = "starting"
  265. } else if r.stats.State != "degraded" {
  266. r.stats.State = "running"
  267. }
  268. r.stats.Prebuffering = false
  269. r.stats.WriteBlocked = false
  270. r.resetDrainPacerLocked(now)
  271. r.updateBufferedStatsLocked()
  272. return
  273. case r.prebufferFrames > 0 && bufferedFrames < r.prebufferFrames:
  274. r.stats.State = "prebuffering"
  275. r.stats.Prebuffering = true
  276. r.stats.WriteBlocked = false
  277. r.resetDrainPacerLocked(now)
  278. r.updateBufferedStatsLocked()
  279. return
  280. default:
  281. r.gateOpen = true
  282. r.resetDrainPacerLocked(now)
  283. }
  284. }
  285. writeBlocked := false
  286. limit := r.pacedDrainLimitLocked(now, bufferedFrames)
  287. written := 0
  288. for written < limit && r.work.available() > 0 {
  289. frame, ok := r.work.peek()
  290. if !ok {
  291. break
  292. }
  293. if !r.sink.WriteFrame(frame) {
  294. writeBlocked = true
  295. break
  296. }
  297. r.work.pop()
  298. written++
  299. }
  300. if written > 0 {
  301. r.drainAllowance -= float64(written)
  302. if r.drainAllowance < 0 {
  303. r.drainAllowance = 0
  304. }
  305. }
  306. if r.work.available() == 0 && r.prebufferFrames > 0 {
  307. // Re-arm the gate after dry-out to rebuild margin before resuming.
  308. r.gateOpen = false
  309. r.resetDrainPacerLocked(now)
  310. }
  311. r.stats.Prebuffering = false
  312. r.stats.WriteBlocked = writeBlocked
  313. if writeBlocked {
  314. r.stats.State = "degraded"
  315. } else {
  316. r.stats.State = "running"
  317. }
  318. r.updateBufferedStatsLocked()
  319. }
  320. func (r *Runtime) pacedDrainLimitLocked(now time.Time, bufferedFrames int) int {
  321. if bufferedFrames <= 0 {
  322. return 0
  323. }
  324. // Use workSampleRate which is kept in sync with sink.SampleRate via
  325. // handleChunk. This ensures the pacer drains at the actual decoded rate
  326. // rather than the initial (potentially wrong) configured rate.
  327. rate := r.workSampleRate
  328. if r.sink != nil && r.sink.GetSampleRate() > 0 {
  329. rate = r.sink.GetSampleRate()
  330. }
  331. if rate <= 0 {
  332. return bufferedFrames
  333. }
  334. if !r.lastDrainAt.IsZero() {
  335. elapsed := now.Sub(r.lastDrainAt)
  336. if elapsed > 0 {
  337. r.drainAllowance += elapsed.Seconds() * float64(rate)
  338. }
  339. }
  340. r.lastDrainAt = now
  341. maxAllowance := maxInt(1, rate/5) // cap accumulated credit at 200 ms
  342. if r.drainAllowance > float64(maxAllowance) {
  343. r.drainAllowance = float64(maxAllowance)
  344. }
  345. limit := int(r.drainAllowance)
  346. if limit <= 0 {
  347. return 0
  348. }
  349. maxBurst := maxInt(1, rate/50) // max 20 ms worth of frames per drain call
  350. if limit > maxBurst {
  351. limit = maxBurst
  352. }
  353. sinkStats := r.sink.Stats()
  354. headroom := sinkStats.Capacity - sinkStats.Available
  355. if headroom < 0 {
  356. headroom = 0
  357. }
  358. if limit > headroom {
  359. limit = headroom
  360. }
  361. if limit > bufferedFrames {
  362. limit = bufferedFrames
  363. }
  364. return limit
  365. }
  366. func (r *Runtime) resetDrainPacerLocked(now time.Time) {
  367. r.lastDrainAt = now
  368. r.drainAllowance = 0
  369. }
  370. func maxInt(a, b int) int {
  371. if a > b {
  372. return a
  373. }
  374. return b
  375. }
  376. func (r *Runtime) updateBufferedStatsLocked() {
  377. available := r.work.available()
  378. capacity := r.work.capacity()
  379. buffered := 0.0
  380. if capacity > 0 {
  381. buffered = float64(available) / float64(capacity)
  382. }
  383. bufferedSeconds := 0.0
  384. if r.workSampleRate > 0 {
  385. bufferedSeconds = float64(available) / float64(r.workSampleRate)
  386. }
  387. r.stats.Buffered = buffered
  388. r.stats.BufferedSeconds = bufferedSeconds
  389. }
  390. func (r *Runtime) Stats() Stats {
  391. r.mu.RLock()
  392. runtimeStats := r.stats
  393. active := r.active
  394. r.mu.RUnlock()
  395. sourceStats := SourceStats{}
  396. if r.source != nil {
  397. sourceStats = r.source.Stats()
  398. }
  399. if sourceStats.BufferedSeconds < runtimeStats.BufferedSeconds {
  400. sourceStats.BufferedSeconds = runtimeStats.BufferedSeconds
  401. }
  402. return Stats{
  403. Active: active,
  404. Source: sourceStats,
  405. Runtime: runtimeStats,
  406. }
  407. }
  408. type frameBuffer struct {
  409. frames []audio.Frame
  410. head int
  411. len int
  412. }
  413. func newFrameBuffer(capacity int) *frameBuffer {
  414. if capacity < 1 {
  415. capacity = 1
  416. }
  417. return &frameBuffer{frames: make([]audio.Frame, capacity)}
  418. }
  419. func (b *frameBuffer) capacity() int {
  420. return len(b.frames)
  421. }
  422. func (b *frameBuffer) available() int {
  423. return b.len
  424. }
  425. func (b *frameBuffer) reset() {
  426. b.head = 0
  427. b.len = 0
  428. }
  429. func (b *frameBuffer) push(frame audio.Frame) bool {
  430. if b.len >= len(b.frames) {
  431. return false
  432. }
  433. idx := (b.head + b.len) % len(b.frames)
  434. b.frames[idx] = frame
  435. b.len++
  436. return true
  437. }
  438. func (b *frameBuffer) peek() (audio.Frame, bool) {
  439. if b.len == 0 {
  440. return audio.Frame{}, false
  441. }
  442. return b.frames[b.head], true
  443. }
  444. func (b *frameBuffer) pop() (audio.Frame, bool) {
  445. if b.len == 0 {
  446. return audio.Frame{}, false
  447. }
  448. frame := b.frames[b.head]
  449. b.head = (b.head + 1) % len(b.frames)
  450. b.len--
  451. return frame, true
  452. }