Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

161 line
2.6KB

  1. package ingest
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/jan/fm-rds-tx/internal/audio"
  8. )
  9. type Runtime struct {
  10. sink *audio.StreamSource
  11. source Source
  12. started atomic.Bool
  13. ctx context.Context
  14. cancel context.CancelFunc
  15. wg sync.WaitGroup
  16. mu sync.RWMutex
  17. active SourceDescriptor
  18. stats RuntimeStats
  19. }
  20. func NewRuntime(sink *audio.StreamSource, src Source) *Runtime {
  21. return &Runtime{
  22. sink: sink,
  23. source: src,
  24. stats: RuntimeStats{
  25. State: "idle",
  26. },
  27. }
  28. }
  29. func (r *Runtime) Start(ctx context.Context) error {
  30. if r.sink == nil {
  31. r.mu.Lock()
  32. r.stats.State = "failed"
  33. r.mu.Unlock()
  34. return nil
  35. }
  36. if r.source == nil {
  37. r.mu.Lock()
  38. r.stats.State = "idle"
  39. r.mu.Unlock()
  40. return nil
  41. }
  42. if !r.started.CompareAndSwap(false, true) {
  43. return nil
  44. }
  45. r.ctx, r.cancel = context.WithCancel(ctx)
  46. r.mu.Lock()
  47. r.active = r.source.Descriptor()
  48. r.stats.State = "starting"
  49. r.mu.Unlock()
  50. if err := r.source.Start(r.ctx); err != nil {
  51. r.started.Store(false)
  52. r.mu.Lock()
  53. r.stats.State = "failed"
  54. r.mu.Unlock()
  55. return err
  56. }
  57. r.wg.Add(1)
  58. go r.run()
  59. return nil
  60. }
  61. func (r *Runtime) Stop() error {
  62. if !r.started.CompareAndSwap(true, false) {
  63. return nil
  64. }
  65. if r.cancel != nil {
  66. r.cancel()
  67. }
  68. if r.source != nil {
  69. _ = r.source.Stop()
  70. }
  71. r.wg.Wait()
  72. r.mu.Lock()
  73. r.stats.State = "stopped"
  74. r.mu.Unlock()
  75. return nil
  76. }
  77. func (r *Runtime) run() {
  78. defer r.wg.Done()
  79. r.mu.Lock()
  80. r.stats.State = "running"
  81. r.mu.Unlock()
  82. ch := r.source.Chunks()
  83. errCh := r.source.Errors()
  84. for {
  85. select {
  86. case <-r.ctx.Done():
  87. return
  88. case err, ok := <-errCh:
  89. if !ok {
  90. errCh = nil
  91. continue
  92. }
  93. if err == nil {
  94. continue
  95. }
  96. r.mu.Lock()
  97. r.stats.State = "degraded"
  98. r.mu.Unlock()
  99. case chunk, ok := <-ch:
  100. if !ok {
  101. r.mu.Lock()
  102. r.stats.State = "stopped"
  103. r.mu.Unlock()
  104. return
  105. }
  106. r.handleChunk(chunk)
  107. }
  108. }
  109. }
  110. func (r *Runtime) handleChunk(chunk PCMChunk) {
  111. frames, err := ChunkToFrames(chunk)
  112. if err != nil {
  113. r.mu.Lock()
  114. r.stats.ConvertErrors++
  115. r.stats.State = "degraded"
  116. r.mu.Unlock()
  117. return
  118. }
  119. dropped := uint64(0)
  120. for _, frame := range frames {
  121. if !r.sink.WriteFrame(frame) {
  122. dropped++
  123. }
  124. }
  125. r.mu.Lock()
  126. r.stats.LastChunkAt = time.Now()
  127. r.stats.DroppedFrames += dropped
  128. r.stats.WriteBlocked = dropped > 0
  129. r.mu.Unlock()
  130. }
  131. func (r *Runtime) Stats() Stats {
  132. r.mu.RLock()
  133. runtimeStats := r.stats
  134. active := r.active
  135. r.mu.RUnlock()
  136. sourceStats := SourceStats{}
  137. if r.source != nil {
  138. sourceStats = r.source.Stats()
  139. }
  140. return Stats{
  141. Active: active,
  142. Source: sourceStats,
  143. Runtime: runtimeStats,
  144. }
  145. }