Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

307 строки
6.2KB

  1. package aoip
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "aoiprxkit"
  10. "github.com/jan/fm-rds-tx/internal/ingest"
  11. )
  12. type ReceiverClient interface {
  13. Start(ctx context.Context) error
  14. Stop() error
  15. Stats() aoiprxkit.Stats
  16. }
  17. type ReceiverFactory func(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error)
  18. type Option func(*Source)
  19. func WithReceiverFactory(factory ReceiverFactory) Option {
  20. return func(s *Source) {
  21. if factory != nil {
  22. s.factory = factory
  23. }
  24. }
  25. }
  26. type Source struct {
  27. id string
  28. cfg aoiprxkit.Config
  29. factory ReceiverFactory
  30. chunks chan ingest.PCMChunk
  31. errs chan error
  32. cancel context.CancelFunc
  33. wg sync.WaitGroup
  34. mu sync.Mutex
  35. rx ReceiverClient
  36. started atomic.Bool
  37. closeOnce sync.Once
  38. state atomic.Value // string
  39. connected atomic.Bool
  40. chunksIn atomic.Uint64
  41. samplesIn atomic.Uint64
  42. overflows atomic.Uint64
  43. discontinuities atomic.Uint64
  44. transportLoss atomic.Uint64
  45. reorders atomic.Uint64
  46. lastChunkAtUnix atomic.Int64
  47. lastError atomic.Value // string
  48. nextSeq atomic.Uint64
  49. seqMu sync.Mutex
  50. lastFrame uint16
  51. lastHasVal bool
  52. }
  53. func New(id string, cfg aoiprxkit.Config, opts ...Option) *Source {
  54. if id == "" {
  55. id = "aes67-main"
  56. }
  57. if cfg.MulticastGroup == "" {
  58. cfg = aoiprxkit.DefaultConfig()
  59. }
  60. s := &Source{
  61. id: id,
  62. cfg: cfg,
  63. factory: newReceiverAdapter,
  64. chunks: make(chan ingest.PCMChunk, 64),
  65. errs: make(chan error, 8),
  66. }
  67. for _, opt := range opts {
  68. if opt != nil {
  69. opt(s)
  70. }
  71. }
  72. s.state.Store("idle")
  73. s.lastError.Store("")
  74. return s
  75. }
  76. func (s *Source) Descriptor() ingest.SourceDescriptor {
  77. return ingest.SourceDescriptor{
  78. ID: s.id,
  79. Kind: "aes67",
  80. Family: "aoip",
  81. Transport: "rtp",
  82. Codec: "l24",
  83. Channels: s.cfg.Channels,
  84. SampleRateHz: s.cfg.SampleRateHz,
  85. Detail: fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port),
  86. }
  87. }
  88. func (s *Source) Start(ctx context.Context) error {
  89. if !s.started.CompareAndSwap(false, true) {
  90. return nil
  91. }
  92. rx, err := s.factory(s.cfg, s.handleFrame)
  93. if err != nil {
  94. s.started.Store(false)
  95. s.connected.Store(false)
  96. s.state.Store("failed")
  97. s.setError(err)
  98. return err
  99. }
  100. runCtx, cancel := context.WithCancel(ctx)
  101. s.cancel = cancel
  102. s.mu.Lock()
  103. s.rx = rx
  104. s.mu.Unlock()
  105. s.lastError.Store("")
  106. s.connected.Store(false)
  107. s.state.Store("connecting")
  108. if err := rx.Start(runCtx); err != nil {
  109. s.started.Store(false)
  110. s.connected.Store(false)
  111. s.state.Store("failed")
  112. s.setError(err)
  113. return err
  114. }
  115. s.connected.Store(true)
  116. s.state.Store("running")
  117. s.wg.Add(1)
  118. go func() {
  119. defer s.wg.Done()
  120. <-runCtx.Done()
  121. _ = s.stopReceiver()
  122. s.connected.Store(false)
  123. s.closeChannels()
  124. }()
  125. return nil
  126. }
  127. func (s *Source) Stop() error {
  128. if !s.started.CompareAndSwap(true, false) {
  129. return nil
  130. }
  131. if s.cancel != nil {
  132. s.cancel()
  133. }
  134. if err := s.stopReceiver(); err != nil {
  135. s.setError(err)
  136. s.state.Store("failed")
  137. }
  138. s.wg.Wait()
  139. s.connected.Store(false)
  140. state, _ := s.state.Load().(string)
  141. if state != "failed" {
  142. s.state.Store("stopped")
  143. }
  144. return nil
  145. }
  146. func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
  147. func (s *Source) Errors() <-chan error { return s.errs }
  148. func (s *Source) Stats() ingest.SourceStats {
  149. state, _ := s.state.Load().(string)
  150. last := s.lastChunkAtUnix.Load()
  151. errStr, _ := s.lastError.Load().(string)
  152. var lastChunkAt time.Time
  153. if last > 0 {
  154. lastChunkAt = time.Unix(0, last)
  155. }
  156. var rxStats aoiprxkit.Stats
  157. s.mu.Lock()
  158. rx := s.rx
  159. s.mu.Unlock()
  160. if rx != nil {
  161. rxStats = rx.Stats()
  162. }
  163. transportLoss := s.transportLoss.Load()
  164. if rxStats.PacketsGapLoss > transportLoss {
  165. transportLoss = rxStats.PacketsGapLoss
  166. }
  167. reorders := s.reorders.Load()
  168. if rxStats.JitterReorders > reorders {
  169. reorders = rxStats.JitterReorders
  170. }
  171. return ingest.SourceStats{
  172. State: state,
  173. Connected: s.connected.Load(),
  174. LastChunkAt: lastChunkAt,
  175. ChunksIn: s.chunksIn.Load(),
  176. SamplesIn: s.samplesIn.Load(),
  177. Overflows: s.overflows.Load(),
  178. Underruns: rxStats.PacketsLateDrop,
  179. Discontinuities: s.discontinuities.Load() + rxStats.PacketsLateDrop,
  180. TransportLoss: transportLoss,
  181. Reorders: reorders,
  182. JitterDepth: s.cfg.JitterDepthPackets,
  183. LastError: errStr,
  184. }
  185. }
  186. func (s *Source) handleFrame(frame aoiprxkit.PCMFrame) {
  187. if !s.started.Load() {
  188. return
  189. }
  190. discontinuity := false
  191. s.seqMu.Lock()
  192. if s.lastHasVal {
  193. expected := s.lastFrame + 1
  194. if frame.SequenceNumber != expected {
  195. discontinuity = true
  196. delta := int16(frame.SequenceNumber - expected)
  197. if delta > 0 {
  198. s.transportLoss.Add(uint64(delta))
  199. } else {
  200. s.reorders.Add(1)
  201. }
  202. }
  203. }
  204. s.lastFrame = frame.SequenceNumber
  205. s.lastHasVal = true
  206. s.seqMu.Unlock()
  207. chunk := ingest.PCMChunk{
  208. Samples: append([]int32(nil), frame.Samples...),
  209. Channels: frame.Channels,
  210. SampleRateHz: frame.SampleRateHz,
  211. Sequence: s.nextSeq.Add(1) - 1,
  212. Timestamp: frame.ReceivedAt,
  213. SourceID: s.id,
  214. Discontinuity: discontinuity,
  215. }
  216. s.chunksIn.Add(1)
  217. s.samplesIn.Add(uint64(len(chunk.Samples)))
  218. s.lastChunkAtUnix.Store(time.Now().UnixNano())
  219. if discontinuity {
  220. s.discontinuities.Add(1)
  221. }
  222. select {
  223. case s.chunks <- chunk:
  224. default:
  225. s.overflows.Add(1)
  226. s.discontinuities.Add(1)
  227. s.setError(io.ErrShortBuffer)
  228. s.emitError(fmt.Errorf("aes67 chunk buffer overflow"))
  229. }
  230. }
  231. func (s *Source) stopReceiver() error {
  232. s.mu.Lock()
  233. rx := s.rx
  234. s.rx = nil
  235. s.mu.Unlock()
  236. if rx == nil {
  237. return nil
  238. }
  239. return rx.Stop()
  240. }
  241. func (s *Source) closeChannels() {
  242. s.closeOnce.Do(func() {
  243. close(s.chunks)
  244. close(s.errs)
  245. })
  246. }
  247. func (s *Source) setError(err error) {
  248. if err == nil {
  249. return
  250. }
  251. s.lastError.Store(err.Error())
  252. s.emitError(err)
  253. }
  254. func (s *Source) emitError(err error) {
  255. if err == nil {
  256. return
  257. }
  258. select {
  259. case s.errs <- err:
  260. default:
  261. }
  262. }
  263. type receiverAdapter struct {
  264. *aoiprxkit.Receiver
  265. }
  266. func newReceiverAdapter(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error) {
  267. rx, err := aoiprxkit.NewReceiver(cfg, onFrame)
  268. if err != nil {
  269. return nil, err
  270. }
  271. return &receiverAdapter{Receiver: rx}, nil
  272. }