Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

284 строки
5.4KB

  1. package srt
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "aoiprxkit"
  10. "github.com/jan/fm-rds-tx/internal/ingest"
  11. )
  12. type Option func(*Source)
  13. func WithConnOpener(opener aoiprxkit.SRTConnOpener) Option {
  14. return func(s *Source) {
  15. if opener != nil {
  16. s.opener = opener
  17. }
  18. }
  19. }
  20. type Source struct {
  21. id string
  22. cfg aoiprxkit.SRTConfig
  23. opener aoiprxkit.SRTConnOpener
  24. chunks chan ingest.PCMChunk
  25. errs chan error
  26. cancel context.CancelFunc
  27. wg sync.WaitGroup
  28. mu sync.Mutex
  29. rx *aoiprxkit.SRTReceiver
  30. started atomic.Bool
  31. closeOnce sync.Once
  32. state atomic.Value // string
  33. connected atomic.Bool
  34. chunksIn atomic.Uint64
  35. samplesIn atomic.Uint64
  36. overflows atomic.Uint64
  37. discontinuities atomic.Uint64
  38. transportLoss atomic.Uint64
  39. reorders atomic.Uint64
  40. lastChunkAtUnix atomic.Int64
  41. lastError atomic.Value // string
  42. nextSeq atomic.Uint64
  43. seqMu sync.Mutex
  44. lastFrame uint16
  45. lastHasVal bool
  46. }
  47. func New(id string, cfg aoiprxkit.SRTConfig, opts ...Option) *Source {
  48. if id == "" {
  49. id = "srt-main"
  50. }
  51. if cfg.Mode == "" {
  52. cfg.Mode = "listener"
  53. }
  54. if cfg.SampleRateHz <= 0 {
  55. cfg.SampleRateHz = 48000
  56. }
  57. if cfg.Channels <= 0 {
  58. cfg.Channels = 2
  59. }
  60. s := &Source{
  61. id: id,
  62. cfg: cfg,
  63. chunks: make(chan ingest.PCMChunk, 64),
  64. errs: make(chan error, 8),
  65. }
  66. for _, opt := range opts {
  67. if opt != nil {
  68. opt(s)
  69. }
  70. }
  71. s.state.Store("idle")
  72. s.lastError.Store("")
  73. return s
  74. }
  75. func (s *Source) Descriptor() ingest.SourceDescriptor {
  76. return ingest.SourceDescriptor{
  77. ID: s.id,
  78. Kind: "srt",
  79. Family: "aoip",
  80. Transport: "srt",
  81. Codec: "pcm_s32le",
  82. Channels: s.cfg.Channels,
  83. SampleRateHz: s.cfg.SampleRateHz,
  84. Detail: s.cfg.URL,
  85. }
  86. }
  87. func (s *Source) Start(ctx context.Context) error {
  88. if !s.started.CompareAndSwap(false, true) {
  89. return nil
  90. }
  91. var (
  92. rx *aoiprxkit.SRTReceiver
  93. err error
  94. )
  95. if s.opener != nil {
  96. rx, err = aoiprxkit.NewSRTReceiverWithOpener(s.cfg, s.opener, s.handleFrame)
  97. } else {
  98. rx, err = aoiprxkit.NewSRTReceiver(s.cfg, s.handleFrame)
  99. }
  100. if err != nil {
  101. s.started.Store(false)
  102. s.connected.Store(false)
  103. s.state.Store("failed")
  104. s.setError(err)
  105. return err
  106. }
  107. runCtx, cancel := context.WithCancel(ctx)
  108. s.cancel = cancel
  109. s.mu.Lock()
  110. s.rx = rx
  111. s.mu.Unlock()
  112. s.lastError.Store("")
  113. s.connected.Store(false)
  114. s.state.Store("connecting")
  115. if err := rx.Start(runCtx); err != nil {
  116. s.started.Store(false)
  117. s.connected.Store(false)
  118. s.state.Store("failed")
  119. s.setError(err)
  120. return err
  121. }
  122. s.connected.Store(true)
  123. s.state.Store("running")
  124. s.wg.Add(1)
  125. go func() {
  126. defer s.wg.Done()
  127. <-runCtx.Done()
  128. _ = s.stopReceiver()
  129. s.connected.Store(false)
  130. s.closeChannels()
  131. }()
  132. return nil
  133. }
  134. func (s *Source) Stop() error {
  135. if !s.started.CompareAndSwap(true, false) {
  136. return nil
  137. }
  138. if s.cancel != nil {
  139. s.cancel()
  140. }
  141. if err := s.stopReceiver(); err != nil {
  142. s.setError(err)
  143. s.state.Store("failed")
  144. }
  145. s.wg.Wait()
  146. s.connected.Store(false)
  147. state, _ := s.state.Load().(string)
  148. if state != "failed" {
  149. s.state.Store("stopped")
  150. }
  151. return nil
  152. }
  153. func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
  154. func (s *Source) Errors() <-chan error { return s.errs }
  155. func (s *Source) Stats() ingest.SourceStats {
  156. state, _ := s.state.Load().(string)
  157. last := s.lastChunkAtUnix.Load()
  158. errStr, _ := s.lastError.Load().(string)
  159. var lastChunkAt time.Time
  160. if last > 0 {
  161. lastChunkAt = time.Unix(0, last)
  162. }
  163. return ingest.SourceStats{
  164. State: state,
  165. Connected: s.connected.Load(),
  166. LastChunkAt: lastChunkAt,
  167. ChunksIn: s.chunksIn.Load(),
  168. SamplesIn: s.samplesIn.Load(),
  169. Overflows: s.overflows.Load(),
  170. Discontinuities: s.discontinuities.Load(),
  171. TransportLoss: s.transportLoss.Load(),
  172. Reorders: s.reorders.Load(),
  173. LastError: errStr,
  174. }
  175. }
  176. func (s *Source) handleFrame(frame aoiprxkit.PCMFrame) {
  177. if !s.started.Load() {
  178. return
  179. }
  180. discontinuity := false
  181. s.seqMu.Lock()
  182. if s.lastHasVal {
  183. expected := s.lastFrame + 1
  184. if frame.SequenceNumber != expected {
  185. discontinuity = true
  186. delta := int16(frame.SequenceNumber - expected)
  187. if delta > 0 {
  188. s.transportLoss.Add(uint64(delta))
  189. } else {
  190. s.reorders.Add(1)
  191. }
  192. }
  193. }
  194. s.lastFrame = frame.SequenceNumber
  195. s.lastHasVal = true
  196. s.seqMu.Unlock()
  197. chunk := ingest.PCMChunk{
  198. Samples: append([]int32(nil), frame.Samples...),
  199. Channels: frame.Channels,
  200. SampleRateHz: frame.SampleRateHz,
  201. Sequence: s.nextSeq.Add(1) - 1,
  202. Timestamp: frame.ReceivedAt,
  203. SourceID: s.id,
  204. Discontinuity: discontinuity,
  205. }
  206. s.chunksIn.Add(1)
  207. s.samplesIn.Add(uint64(len(chunk.Samples)))
  208. s.lastChunkAtUnix.Store(time.Now().UnixNano())
  209. if discontinuity {
  210. s.discontinuities.Add(1)
  211. }
  212. select {
  213. case s.chunks <- chunk:
  214. default:
  215. s.overflows.Add(1)
  216. s.discontinuities.Add(1)
  217. s.setError(io.ErrShortBuffer)
  218. s.emitError(fmt.Errorf("srt chunk buffer overflow"))
  219. }
  220. }
  221. func (s *Source) stopReceiver() error {
  222. s.mu.Lock()
  223. rx := s.rx
  224. s.rx = nil
  225. s.mu.Unlock()
  226. if rx == nil {
  227. return nil
  228. }
  229. return rx.Stop()
  230. }
  231. func (s *Source) closeChannels() {
  232. s.closeOnce.Do(func() {
  233. close(s.chunks)
  234. close(s.errs)
  235. })
  236. }
  237. func (s *Source) setError(err error) {
  238. if err == nil {
  239. return
  240. }
  241. s.lastError.Store(err.Error())
  242. s.emitError(err)
  243. }
  244. func (s *Source) emitError(err error) {
  245. if err == nil {
  246. return
  247. }
  248. select {
  249. case s.errs <- err:
  250. default:
  251. }
  252. }