Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

304 Zeilen
7.3KB

  1. package icecast
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jan/fm-rds-tx/internal/ingest"
  14. "github.com/jan/fm-rds-tx/internal/ingest/decoder"
  15. "github.com/jan/fm-rds-tx/internal/ingest/decoder/aac"
  16. "github.com/jan/fm-rds-tx/internal/ingest/decoder/fallback"
  17. "github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3"
  18. "github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis"
  19. )
  20. type Source struct {
  21. id string
  22. url string
  23. client *http.Client
  24. decReg *decoder.Registry
  25. reconn ReconnectConfig
  26. decoderPreference string
  27. chunks chan ingest.PCMChunk
  28. errs chan error
  29. cancel context.CancelFunc
  30. wg sync.WaitGroup
  31. state atomic.Value // string
  32. connected atomic.Bool
  33. chunksIn atomic.Uint64
  34. samplesIn atomic.Uint64
  35. reconnects atomic.Uint64
  36. discontinuities atomic.Uint64
  37. lastChunkAtUnix atomic.Int64
  38. lastError atomic.Value // string
  39. }
// Option customizes a Source while New is constructing it.
type Option func(*Source)
  41. func WithDecoderPreference(pref string) Option {
  42. return func(s *Source) {
  43. s.decoderPreference = normalizeDecoderPreference(pref)
  44. }
  45. }
  46. func WithDecoderRegistry(reg *decoder.Registry) Option {
  47. return func(s *Source) {
  48. if reg != nil {
  49. s.decReg = reg
  50. }
  51. }
  52. }
  53. func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Option) *Source {
  54. if id == "" {
  55. id = "icecast-main"
  56. }
  57. if client == nil {
  58. client = &http.Client{Timeout: 20 * time.Second}
  59. }
  60. s := &Source{
  61. id: id,
  62. url: strings.TrimSpace(url),
  63. client: client,
  64. reconn: reconn,
  65. chunks: make(chan ingest.PCMChunk, 64),
  66. errs: make(chan error, 8),
  67. decReg: defaultRegistry(),
  68. decoderPreference: "auto",
  69. }
  70. for _, opt := range opts {
  71. if opt != nil {
  72. opt(s)
  73. }
  74. }
  75. s.decoderPreference = normalizeDecoderPreference(s.decoderPreference)
  76. s.state.Store("idle")
  77. return s
  78. }
  79. func defaultRegistry() *decoder.Registry {
  80. r := decoder.NewRegistry()
  81. r.Register("mp3", func() decoder.Decoder { return mp3.New() })
  82. r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() })
  83. r.Register("aac", func() decoder.Decoder { return aac.New() })
  84. r.Register("ffmpeg", func() decoder.Decoder { return fallback.NewFFmpeg() })
  85. return r
  86. }
  87. func (s *Source) Descriptor() ingest.SourceDescriptor {
  88. return ingest.SourceDescriptor{
  89. ID: s.id,
  90. Kind: "icecast",
  91. Family: "streaming",
  92. Transport: "http",
  93. Codec: s.decoderPreference,
  94. Detail: s.url,
  95. }
  96. }
  97. func (s *Source) Start(ctx context.Context) error {
  98. if s.url == "" {
  99. return fmt.Errorf("icecast url is required")
  100. }
  101. runCtx, cancel := context.WithCancel(ctx)
  102. s.cancel = cancel
  103. s.state.Store("connecting")
  104. s.wg.Add(1)
  105. go s.loop(runCtx)
  106. return nil
  107. }
  108. func (s *Source) Stop() error {
  109. if s.cancel != nil {
  110. s.cancel()
  111. }
  112. s.wg.Wait()
  113. s.state.Store("stopped")
  114. return nil
  115. }
  116. func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
  117. func (s *Source) Errors() <-chan error { return s.errs }
  118. func (s *Source) Stats() ingest.SourceStats {
  119. state, _ := s.state.Load().(string)
  120. last := s.lastChunkAtUnix.Load()
  121. errStr, _ := s.lastError.Load().(string)
  122. var lastChunkAt time.Time
  123. if last > 0 {
  124. lastChunkAt = time.Unix(0, last)
  125. }
  126. return ingest.SourceStats{
  127. State: state,
  128. Connected: s.connected.Load(),
  129. LastChunkAt: lastChunkAt,
  130. ChunksIn: s.chunksIn.Load(),
  131. SamplesIn: s.samplesIn.Load(),
  132. Reconnects: s.reconnects.Load(),
  133. Discontinuities: s.discontinuities.Load(),
  134. LastError: errStr,
  135. }
  136. }
  137. func (s *Source) loop(ctx context.Context) {
  138. defer s.wg.Done()
  139. defer close(s.chunks)
  140. attempt := 0
  141. for {
  142. select {
  143. case <-ctx.Done():
  144. return
  145. default:
  146. }
  147. s.state.Store("connecting")
  148. err := s.connectAndRun(ctx)
  149. if err == nil || ctx.Err() != nil {
  150. return
  151. }
  152. s.connected.Store(false)
  153. s.lastError.Store(err.Error())
  154. select {
  155. case s.errs <- err:
  156. default:
  157. }
  158. s.state.Store("reconnecting")
  159. attempt++
  160. s.reconnects.Add(1)
  161. backoff := s.reconn.nextBackoff(attempt)
  162. if backoff <= 0 {
  163. s.state.Store("failed")
  164. return
  165. }
  166. select {
  167. case <-time.After(backoff):
  168. case <-ctx.Done():
  169. return
  170. }
  171. }
  172. }
  173. func (s *Source) connectAndRun(ctx context.Context) error {
  174. req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
  175. if err != nil {
  176. return err
  177. }
  178. req.Header.Set("Icy-MetaData", "0")
  179. resp, err := s.client.Do(req)
  180. if err != nil {
  181. return fmt.Errorf("icecast connect: %w", err)
  182. }
  183. defer resp.Body.Close()
  184. if resp.StatusCode != http.StatusOK {
  185. return fmt.Errorf("icecast status: %s", resp.Status)
  186. }
  187. s.connected.Store(true)
  188. s.state.Store("buffering")
  189. s.state.Store("running")
  190. return s.decodeWithPreference(ctx, resp.Body, decoder.StreamMeta{
  191. ContentType: resp.Header.Get("Content-Type"),
  192. SourceID: s.id,
  193. SampleRateHz: 44100,
  194. Channels: 2,
  195. })
  196. }
  197. func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
  198. select {
  199. case s.chunks <- chunk:
  200. default:
  201. s.discontinuities.Add(1)
  202. return io.ErrShortBuffer
  203. }
  204. s.chunksIn.Add(1)
  205. s.samplesIn.Add(uint64(len(chunk.Samples)))
  206. s.lastChunkAtUnix.Store(time.Now().UnixNano())
  207. return nil
  208. }
  209. func (s *Source) decodeWithPreference(ctx context.Context, stream io.Reader, meta decoder.StreamMeta) error {
  210. mode := normalizeDecoderPreference(s.decoderPreference)
  211. switch mode {
  212. case "ffmpeg":
  213. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  214. case "native":
  215. native, err := s.decReg.SelectByContentType(meta.ContentType)
  216. if err != nil {
  217. return fmt.Errorf("icecast native decoder select: %w", err)
  218. }
  219. return native.DecodeStream(ctx, stream, meta, s.emitChunk)
  220. case "auto":
  221. // Phase-1 policy: try native decoder first, then fall back to ffmpeg
  222. // only when native selection/decode reports "unsupported".
  223. native, err := s.decReg.SelectByContentType(meta.ContentType)
  224. if err == nil {
  225. captured := &capturingReader{r: stream}
  226. if err := native.DecodeStream(ctx, captured, meta, s.emitChunk); err == nil {
  227. return nil
  228. } else if !errors.Is(err, decoder.ErrUnsupported) {
  229. return err
  230. }
  231. // Native decode can consume stream bytes before returning "unsupported".
  232. // Reconstruct a full reader for fallback: consumed prefix + remaining stream.
  233. stream = io.MultiReader(bytes.NewReader(captured.Bytes()), stream)
  234. } else if !errors.Is(err, decoder.ErrUnsupported) {
  235. return fmt.Errorf("icecast decoder select: %w", err)
  236. }
  237. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  238. default:
  239. return fmt.Errorf("unsupported icecast decoder mode: %s", mode)
  240. }
  241. }
  242. type capturingReader struct {
  243. r io.Reader
  244. buf bytes.Buffer
  245. }
  246. func (r *capturingReader) Read(p []byte) (int, error) {
  247. n, err := r.r.Read(p)
  248. if n > 0 {
  249. _, _ = r.buf.Write(p[:n])
  250. }
  251. return n, err
  252. }
  253. func (r *capturingReader) Bytes() []byte {
  254. return r.buf.Bytes()
  255. }
  256. func (s *Source) decodeNamed(ctx context.Context, name string, stream io.Reader, meta decoder.StreamMeta) error {
  257. dec, err := s.decReg.Create(name)
  258. if err != nil {
  259. return fmt.Errorf("icecast decoder=%s unavailable: %w", name, err)
  260. }
  261. return dec.DecodeStream(ctx, stream, meta, s.emitChunk)
  262. }
  263. func normalizeDecoderPreference(pref string) string {
  264. switch strings.ToLower(strings.TrimSpace(pref)) {
  265. case "", "auto":
  266. return "auto"
  267. case "native":
  268. return "native"
  269. case "ffmpeg", "fallback":
  270. return "ffmpeg"
  271. default:
  272. return strings.ToLower(strings.TrimSpace(pref))
  273. }
  274. }