Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

342 行
6.9KB

  1. package aoip
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "aoiprxkit"
  10. "github.com/jan/fm-rds-tx/internal/ingest"
  11. )
  12. type ReceiverClient interface {
  13. Start(ctx context.Context) error
  14. Stop() error
  15. Stats() aoiprxkit.Stats
  16. }
  17. type ReceiverFactory func(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error)
  18. type Option func(*Source)
  19. func WithReceiverFactory(factory ReceiverFactory) Option {
  20. return func(s *Source) {
  21. if factory != nil {
  22. s.factory = factory
  23. }
  24. }
  25. }
  26. func WithDetail(detail string) Option {
  27. return func(s *Source) {
  28. s.detail = detail
  29. }
  30. }
  31. func WithOrigin(origin ingest.SourceOrigin) Option {
  32. return func(s *Source) {
  33. clone := origin
  34. s.origin = &clone
  35. }
  36. }
  37. type Source struct {
  38. id string
  39. cfg aoiprxkit.Config
  40. factory ReceiverFactory
  41. detail string
  42. origin *ingest.SourceOrigin
  43. chunks chan ingest.PCMChunk
  44. errs chan error
  45. cancel context.CancelFunc
  46. wg sync.WaitGroup
  47. mu sync.Mutex
  48. rx ReceiverClient
  49. started atomic.Bool
  50. closeOnce sync.Once
  51. state atomic.Value // string
  52. connected atomic.Bool
  53. chunksIn atomic.Uint64
  54. samplesIn atomic.Uint64
  55. overflows atomic.Uint64
  56. discontinuities atomic.Uint64
  57. transportLoss atomic.Uint64
  58. reorders atomic.Uint64
  59. lastChunkAtUnix atomic.Int64
  60. lastError atomic.Value // string
  61. nextSeq atomic.Uint64
  62. seqMu sync.Mutex
  63. lastFrame uint16
  64. lastHasVal bool
  65. }
  66. func New(id string, cfg aoiprxkit.Config, opts ...Option) *Source {
  67. if id == "" {
  68. id = "aes67-main"
  69. }
  70. if cfg.MulticastGroup == "" {
  71. cfg = aoiprxkit.DefaultConfig()
  72. }
  73. s := &Source{
  74. id: id,
  75. cfg: cfg,
  76. factory: newReceiverAdapter,
  77. chunks: make(chan ingest.PCMChunk, 64),
  78. errs: make(chan error, 8),
  79. }
  80. for _, opt := range opts {
  81. if opt != nil {
  82. opt(s)
  83. }
  84. }
  85. s.state.Store("idle")
  86. s.lastError.Store("")
  87. return s
  88. }
  89. func (s *Source) Descriptor() ingest.SourceDescriptor {
  90. detail := s.detail
  91. if detail == "" {
  92. detail = fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port)
  93. }
  94. origin := s.origin
  95. if origin == nil {
  96. origin = &ingest.SourceOrigin{
  97. Kind: "manual",
  98. }
  99. }
  100. if origin.Endpoint == "" {
  101. copyOrigin := *origin
  102. copyOrigin.Endpoint = fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port)
  103. origin = &copyOrigin
  104. }
  105. return ingest.SourceDescriptor{
  106. ID: s.id,
  107. Kind: "aes67",
  108. Family: "aoip",
  109. Transport: "rtp",
  110. Codec: "l24",
  111. Channels: s.cfg.Channels,
  112. SampleRateHz: s.cfg.SampleRateHz,
  113. Detail: detail,
  114. Origin: origin,
  115. }
  116. }
  117. func (s *Source) Start(ctx context.Context) error {
  118. if !s.started.CompareAndSwap(false, true) {
  119. return nil
  120. }
  121. // BUG-2 fix: recreate channels and reset closeOnce so Stop+Start works.
  122. s.chunks = make(chan ingest.PCMChunk, 64)
  123. s.errs = make(chan error, 8)
  124. s.closeOnce = sync.Once{}
  125. rx, err := s.factory(s.cfg, s.handleFrame)
  126. if err != nil {
  127. s.started.Store(false)
  128. s.connected.Store(false)
  129. s.state.Store("failed")
  130. s.setError(err)
  131. return err
  132. }
  133. runCtx, cancel := context.WithCancel(ctx)
  134. s.cancel = cancel
  135. s.mu.Lock()
  136. s.rx = rx
  137. s.mu.Unlock()
  138. s.lastError.Store("")
  139. s.connected.Store(false)
  140. s.state.Store("connecting")
  141. if err := rx.Start(runCtx); err != nil {
  142. s.started.Store(false)
  143. s.connected.Store(false)
  144. s.state.Store("failed")
  145. s.setError(err)
  146. return err
  147. }
  148. s.connected.Store(true)
  149. s.state.Store("running")
  150. s.wg.Add(1)
  151. go func() {
  152. defer s.wg.Done()
  153. <-runCtx.Done()
  154. _ = s.stopReceiver()
  155. s.connected.Store(false)
  156. s.closeChannels()
  157. }()
  158. return nil
  159. }
  160. func (s *Source) Stop() error {
  161. if !s.started.CompareAndSwap(true, false) {
  162. return nil
  163. }
  164. if s.cancel != nil {
  165. s.cancel()
  166. }
  167. if err := s.stopReceiver(); err != nil {
  168. s.setError(err)
  169. s.state.Store("failed")
  170. }
  171. s.wg.Wait()
  172. s.connected.Store(false)
  173. state, _ := s.state.Load().(string)
  174. if state != "failed" {
  175. s.state.Store("stopped")
  176. }
  177. return nil
  178. }
  179. func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
  180. func (s *Source) Errors() <-chan error { return s.errs }
  181. func (s *Source) Stats() ingest.SourceStats {
  182. state, _ := s.state.Load().(string)
  183. last := s.lastChunkAtUnix.Load()
  184. errStr, _ := s.lastError.Load().(string)
  185. var lastChunkAt time.Time
  186. if last > 0 {
  187. lastChunkAt = time.Unix(0, last)
  188. }
  189. var rxStats aoiprxkit.Stats
  190. s.mu.Lock()
  191. rx := s.rx
  192. s.mu.Unlock()
  193. if rx != nil {
  194. rxStats = rx.Stats()
  195. }
  196. transportLoss := s.transportLoss.Load()
  197. if rxStats.PacketsGapLoss > transportLoss {
  198. transportLoss = rxStats.PacketsGapLoss
  199. }
  200. reorders := s.reorders.Load()
  201. if rxStats.JitterReorders > reorders {
  202. reorders = rxStats.JitterReorders
  203. }
  204. return ingest.SourceStats{
  205. State: state,
  206. Connected: s.connected.Load(),
  207. LastChunkAt: lastChunkAt,
  208. ChunksIn: s.chunksIn.Load(),
  209. SamplesIn: s.samplesIn.Load(),
  210. Overflows: s.overflows.Load(),
  211. Underruns: rxStats.PacketsLateDrop,
  212. Discontinuities: s.discontinuities.Load() + rxStats.PacketsLateDrop,
  213. TransportLoss: transportLoss,
  214. Reorders: reorders,
  215. JitterDepth: s.cfg.JitterDepthPackets,
  216. LastError: errStr,
  217. }
  218. }
  219. func (s *Source) handleFrame(frame aoiprxkit.PCMFrame) {
  220. if !s.started.Load() {
  221. return
  222. }
  223. discontinuity := false
  224. s.seqMu.Lock()
  225. if s.lastHasVal {
  226. expected := s.lastFrame + 1
  227. if frame.SequenceNumber != expected {
  228. discontinuity = true
  229. delta := int16(frame.SequenceNumber - expected)
  230. if delta > 0 {
  231. s.transportLoss.Add(uint64(delta))
  232. } else {
  233. s.reorders.Add(1)
  234. }
  235. }
  236. }
  237. s.lastFrame = frame.SequenceNumber
  238. s.lastHasVal = true
  239. s.seqMu.Unlock()
  240. chunk := ingest.PCMChunk{
  241. Samples: append([]int32(nil), frame.Samples...),
  242. Channels: frame.Channels,
  243. SampleRateHz: frame.SampleRateHz,
  244. Sequence: s.nextSeq.Add(1) - 1,
  245. Timestamp: frame.ReceivedAt,
  246. SourceID: s.id,
  247. Discontinuity: discontinuity,
  248. }
  249. s.chunksIn.Add(1)
  250. s.samplesIn.Add(uint64(len(chunk.Samples)))
  251. s.lastChunkAtUnix.Store(time.Now().UnixNano())
  252. if discontinuity {
  253. s.discontinuities.Add(1)
  254. }
  255. select {
  256. case s.chunks <- chunk:
  257. default:
  258. s.overflows.Add(1)
  259. s.discontinuities.Add(1)
  260. s.setError(io.ErrShortBuffer)
  261. s.emitError(fmt.Errorf("aes67 chunk buffer overflow"))
  262. }
  263. }
  264. func (s *Source) stopReceiver() error {
  265. s.mu.Lock()
  266. rx := s.rx
  267. s.rx = nil
  268. s.mu.Unlock()
  269. if rx == nil {
  270. return nil
  271. }
  272. return rx.Stop()
  273. }
  274. func (s *Source) closeChannels() {
  275. s.closeOnce.Do(func() {
  276. close(s.chunks)
  277. close(s.errs)
  278. })
  279. }
  280. func (s *Source) setError(err error) {
  281. if err == nil {
  282. return
  283. }
  284. s.lastError.Store(err.Error())
  285. s.emitError(err)
  286. }
  287. func (s *Source) emitError(err error) {
  288. if err == nil {
  289. return
  290. }
  291. select {
  292. case s.errs <- err:
  293. default:
  294. }
  295. }
  296. type receiverAdapter struct {
  297. *aoiprxkit.Receiver
  298. }
  299. func newReceiverAdapter(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error) {
  300. rx, err := aoiprxkit.NewReceiver(cfg, onFrame)
  301. if err != nil {
  302. return nil, err
  303. }
  304. return &receiverAdapter{Receiver: rx}, nil
  305. }