Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

205 строки
4.5KB

  1. package icecast
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/jan/fm-rds-tx/internal/ingest"
  12. "github.com/jan/fm-rds-tx/internal/ingest/decoder"
  13. "github.com/jan/fm-rds-tx/internal/ingest/decoder/aac"
  14. "github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3"
  15. "github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis"
  16. )
  17. type Source struct {
  18. id string
  19. url string
  20. client *http.Client
  21. decReg *decoder.Registry
  22. reconn ReconnectConfig
  23. chunks chan ingest.PCMChunk
  24. errs chan error
  25. cancel context.CancelFunc
  26. wg sync.WaitGroup
  27. state atomic.Value // string
  28. connected atomic.Bool
  29. chunksIn atomic.Uint64
  30. samplesIn atomic.Uint64
  31. reconnects atomic.Uint64
  32. discontinuities atomic.Uint64
  33. lastChunkAtUnix atomic.Int64
  34. lastError atomic.Value // string
  35. }
  36. func New(id, url string, client *http.Client, reconn ReconnectConfig) *Source {
  37. if id == "" {
  38. id = "icecast-main"
  39. }
  40. if client == nil {
  41. client = &http.Client{Timeout: 20 * time.Second}
  42. }
  43. s := &Source{
  44. id: id,
  45. url: strings.TrimSpace(url),
  46. client: client,
  47. reconn: reconn,
  48. chunks: make(chan ingest.PCMChunk, 64),
  49. errs: make(chan error, 8),
  50. decReg: defaultRegistry(),
  51. }
  52. s.state.Store("idle")
  53. return s
  54. }
  55. func defaultRegistry() *decoder.Registry {
  56. r := decoder.NewRegistry()
  57. r.Register("mp3", func() decoder.Decoder { return mp3.New() })
  58. r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() })
  59. r.Register("aac", func() decoder.Decoder { return aac.New() })
  60. return r
  61. }
  62. func (s *Source) Descriptor() ingest.SourceDescriptor {
  63. return ingest.SourceDescriptor{
  64. ID: s.id,
  65. Kind: "icecast",
  66. Family: "streaming",
  67. Transport: "http",
  68. Codec: "auto",
  69. Detail: s.url,
  70. }
  71. }
  72. func (s *Source) Start(ctx context.Context) error {
  73. if s.url == "" {
  74. return fmt.Errorf("icecast url is required")
  75. }
  76. runCtx, cancel := context.WithCancel(ctx)
  77. s.cancel = cancel
  78. s.state.Store("connecting")
  79. s.wg.Add(1)
  80. go s.loop(runCtx)
  81. return nil
  82. }
  83. func (s *Source) Stop() error {
  84. if s.cancel != nil {
  85. s.cancel()
  86. }
  87. s.wg.Wait()
  88. s.state.Store("stopped")
  89. return nil
  90. }
  91. func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
  92. func (s *Source) Errors() <-chan error { return s.errs }
  93. func (s *Source) Stats() ingest.SourceStats {
  94. state, _ := s.state.Load().(string)
  95. last := s.lastChunkAtUnix.Load()
  96. errStr, _ := s.lastError.Load().(string)
  97. var lastChunkAt time.Time
  98. if last > 0 {
  99. lastChunkAt = time.Unix(0, last)
  100. }
  101. return ingest.SourceStats{
  102. State: state,
  103. Connected: s.connected.Load(),
  104. LastChunkAt: lastChunkAt,
  105. ChunksIn: s.chunksIn.Load(),
  106. SamplesIn: s.samplesIn.Load(),
  107. Reconnects: s.reconnects.Load(),
  108. Discontinuities: s.discontinuities.Load(),
  109. LastError: errStr,
  110. }
  111. }
  112. func (s *Source) loop(ctx context.Context) {
  113. defer s.wg.Done()
  114. defer close(s.chunks)
  115. attempt := 0
  116. for {
  117. select {
  118. case <-ctx.Done():
  119. return
  120. default:
  121. }
  122. s.state.Store("connecting")
  123. err := s.connectAndRun(ctx)
  124. if err == nil || ctx.Err() != nil {
  125. return
  126. }
  127. s.connected.Store(false)
  128. s.lastError.Store(err.Error())
  129. select {
  130. case s.errs <- err:
  131. default:
  132. }
  133. s.state.Store("reconnecting")
  134. attempt++
  135. s.reconnects.Add(1)
  136. backoff := s.reconn.nextBackoff(attempt)
  137. if backoff <= 0 {
  138. s.state.Store("failed")
  139. return
  140. }
  141. select {
  142. case <-time.After(backoff):
  143. case <-ctx.Done():
  144. return
  145. }
  146. }
  147. }
  148. func (s *Source) connectAndRun(ctx context.Context) error {
  149. req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
  150. if err != nil {
  151. return err
  152. }
  153. req.Header.Set("Icy-MetaData", "0")
  154. resp, err := s.client.Do(req)
  155. if err != nil {
  156. return fmt.Errorf("icecast connect: %w", err)
  157. }
  158. defer resp.Body.Close()
  159. if resp.StatusCode != http.StatusOK {
  160. return fmt.Errorf("icecast status: %s", resp.Status)
  161. }
  162. s.connected.Store(true)
  163. s.state.Store("buffering")
  164. dec, err := s.decReg.SelectByContentType(resp.Header.Get("Content-Type"))
  165. if err != nil {
  166. return fmt.Errorf("icecast decoder select: %w", err)
  167. }
  168. s.state.Store("running")
  169. return dec.DecodeStream(ctx, resp.Body, decoder.StreamMeta{
  170. ContentType: resp.Header.Get("Content-Type"),
  171. SourceID: s.id,
  172. }, s.emitChunk)
  173. }
  174. func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
  175. select {
  176. case s.chunks <- chunk:
  177. default:
  178. s.discontinuities.Add(1)
  179. return io.ErrShortBuffer
  180. }
  181. s.chunksIn.Add(1)
  182. s.samplesIn.Add(uint64(len(chunk.Samples)))
  183. s.lastChunkAtUnix.Store(time.Now().UnixNano())
  184. return nil
  185. }