Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

348 lines
8.6KB

  1. package icecast
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jan/fm-rds-tx/internal/ingest"
  14. "github.com/jan/fm-rds-tx/internal/ingest/decoder"
  15. "github.com/jan/fm-rds-tx/internal/ingest/decoder/aac"
  16. "github.com/jan/fm-rds-tx/internal/ingest/decoder/fallback"
  17. "github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3"
  18. "github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis"
  19. )
  20. type Source struct {
  21. id string
  22. url string
  23. client *http.Client
  24. decReg *decoder.Registry
  25. reconn ReconnectConfig
  26. decoderPreference string
  27. chunks chan ingest.PCMChunk
  28. errs chan error
  29. title chan string
  30. cancel context.CancelFunc
  31. wg sync.WaitGroup
  32. state atomic.Value // string
  33. connected atomic.Bool
  34. chunksIn atomic.Uint64
  35. samplesIn atomic.Uint64
  36. reconnects atomic.Uint64
  37. discontinuities atomic.Uint64
  38. lastChunkAtUnix atomic.Int64
  39. lastMetaAtUnix atomic.Int64
  40. metadataUpdates atomic.Uint64
  41. icyMetaInt atomic.Int64
  42. lastError atomic.Value // string
  43. streamTitle atomic.Value // string
  44. }
  45. var errStreamEnded = errors.New("icecast stream ended")
  46. type Option func(*Source)
  47. func WithDecoderPreference(pref string) Option {
  48. return func(s *Source) {
  49. s.decoderPreference = normalizeDecoderPreference(pref)
  50. }
  51. }
  52. func WithDecoderRegistry(reg *decoder.Registry) Option {
  53. return func(s *Source) {
  54. if reg != nil {
  55. s.decReg = reg
  56. }
  57. }
  58. }
  59. func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Option) *Source {
  60. if id == "" {
  61. id = "icecast-main"
  62. }
  63. if client == nil {
  64. client = &http.Client{Timeout: 20 * time.Second}
  65. }
  66. s := &Source{
  67. id: id,
  68. url: strings.TrimSpace(url),
  69. client: client,
  70. reconn: reconn,
  71. chunks: make(chan ingest.PCMChunk, 64),
  72. errs: make(chan error, 8),
  73. title: make(chan string, 16),
  74. decReg: defaultRegistry(),
  75. decoderPreference: "auto",
  76. }
  77. for _, opt := range opts {
  78. if opt != nil {
  79. opt(s)
  80. }
  81. }
  82. s.decoderPreference = normalizeDecoderPreference(s.decoderPreference)
  83. s.state.Store("idle")
  84. s.streamTitle.Store("")
  85. return s
  86. }
  87. func defaultRegistry() *decoder.Registry {
  88. r := decoder.NewRegistry()
  89. r.Register("mp3", func() decoder.Decoder { return mp3.New() })
  90. r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() })
  91. r.Register("aac", func() decoder.Decoder { return aac.New() })
  92. r.Register("ffmpeg", func() decoder.Decoder { return fallback.NewFFmpeg() })
  93. return r
  94. }
  95. func (s *Source) Descriptor() ingest.SourceDescriptor {
  96. return ingest.SourceDescriptor{
  97. ID: s.id,
  98. Kind: "icecast",
  99. Family: "streaming",
  100. Transport: "http",
  101. Codec: s.decoderPreference,
  102. Detail: s.url,
  103. }
  104. }
  105. func (s *Source) Start(ctx context.Context) error {
  106. if s.url == "" {
  107. return fmt.Errorf("icecast url is required")
  108. }
  109. runCtx, cancel := context.WithCancel(ctx)
  110. s.cancel = cancel
  111. s.lastError.Store("")
  112. s.state.Store("connecting")
  113. s.wg.Add(1)
  114. go s.loop(runCtx)
  115. return nil
  116. }
  117. func (s *Source) Stop() error {
  118. if s.cancel != nil {
  119. s.cancel()
  120. }
  121. s.wg.Wait()
  122. s.state.Store("stopped")
  123. return nil
  124. }
  125. func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
  126. func (s *Source) Errors() <-chan error { return s.errs }
  127. func (s *Source) StreamTitleUpdates() <-chan string {
  128. return s.title
  129. }
  130. func (s *Source) Stats() ingest.SourceStats {
  131. state, _ := s.state.Load().(string)
  132. last := s.lastChunkAtUnix.Load()
  133. lastMeta := s.lastMetaAtUnix.Load()
  134. errStr, _ := s.lastError.Load().(string)
  135. streamTitle, _ := s.streamTitle.Load().(string)
  136. var lastChunkAt time.Time
  137. var lastMetaAt time.Time
  138. if last > 0 {
  139. lastChunkAt = time.Unix(0, last)
  140. }
  141. if lastMeta > 0 {
  142. lastMetaAt = time.Unix(0, lastMeta)
  143. }
  144. return ingest.SourceStats{
  145. State: state,
  146. Connected: s.connected.Load(),
  147. LastChunkAt: lastChunkAt,
  148. LastMetaAt: lastMetaAt,
  149. StreamTitle: streamTitle,
  150. MetadataUpdates: s.metadataUpdates.Load(),
  151. IcyMetaInt: int(s.icyMetaInt.Load()),
  152. ChunksIn: s.chunksIn.Load(),
  153. SamplesIn: s.samplesIn.Load(),
  154. Reconnects: s.reconnects.Load(),
  155. Discontinuities: s.discontinuities.Load(),
  156. LastError: errStr,
  157. }
  158. }
  159. func (s *Source) loop(ctx context.Context) {
  160. defer s.wg.Done()
  161. defer close(s.chunks)
  162. defer close(s.errs)
  163. defer close(s.title)
  164. attempt := 0
  165. for {
  166. select {
  167. case <-ctx.Done():
  168. return
  169. default:
  170. }
  171. s.state.Store("connecting")
  172. err := s.connectAndRun(ctx)
  173. if ctx.Err() != nil {
  174. return
  175. }
  176. if err == nil {
  177. err = errStreamEnded
  178. }
  179. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
  180. return
  181. }
  182. s.connected.Store(false)
  183. s.lastError.Store(err.Error())
  184. select {
  185. case s.errs <- err:
  186. default:
  187. }
  188. s.state.Store("reconnecting")
  189. attempt++
  190. s.reconnects.Add(1)
  191. backoff := s.reconn.nextBackoff(attempt)
  192. if backoff <= 0 {
  193. s.state.Store("failed")
  194. return
  195. }
  196. select {
  197. case <-time.After(backoff):
  198. case <-ctx.Done():
  199. return
  200. }
  201. }
  202. }
  203. func (s *Source) connectAndRun(ctx context.Context) error {
  204. req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
  205. if err != nil {
  206. return err
  207. }
  208. req.Header.Set("Icy-MetaData", "1")
  209. resp, err := s.client.Do(req)
  210. if err != nil {
  211. return fmt.Errorf("icecast connect: %w", err)
  212. }
  213. defer resp.Body.Close()
  214. if resp.StatusCode != http.StatusOK {
  215. return fmt.Errorf("icecast status: %s", resp.Status)
  216. }
  217. s.connected.Store(true)
  218. s.state.Store("buffering")
  219. s.lastError.Store("")
  220. icyMetaInt, _ := parseICYMetaInt(resp.Header.Get("icy-metaint"))
  221. s.icyMetaInt.Store(int64(icyMetaInt))
  222. stream := newICYReader(resp.Body, icyMetaInt, s.onMetadata)
  223. s.state.Store("running")
  224. return s.decodeWithPreference(ctx, stream, decoder.StreamMeta{
  225. ContentType: resp.Header.Get("Content-Type"),
  226. SourceID: s.id,
  227. SampleRateHz: 44100,
  228. Channels: 2,
  229. })
  230. }
  231. func (s *Source) onMetadata(meta icyMetadata) {
  232. s.streamTitle.Store(meta.StreamTitle)
  233. s.metadataUpdates.Add(1)
  234. s.lastMetaAtUnix.Store(time.Now().UnixNano())
  235. select {
  236. case s.title <- meta.StreamTitle:
  237. default:
  238. }
  239. }
  240. func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
  241. select {
  242. case s.chunks <- chunk:
  243. default:
  244. s.discontinuities.Add(1)
  245. return io.ErrShortBuffer
  246. }
  247. s.chunksIn.Add(1)
  248. s.samplesIn.Add(uint64(len(chunk.Samples)))
  249. s.lastChunkAtUnix.Store(time.Now().UnixNano())
  250. return nil
  251. }
  252. func (s *Source) decodeWithPreference(ctx context.Context, stream io.Reader, meta decoder.StreamMeta) error {
  253. mode := normalizeDecoderPreference(s.decoderPreference)
  254. switch mode {
  255. case "ffmpeg":
  256. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  257. case "native":
  258. native, err := s.decReg.SelectByContentType(meta.ContentType)
  259. if err != nil {
  260. return fmt.Errorf("icecast native decoder select: %w", err)
  261. }
  262. return native.DecodeStream(ctx, stream, meta, s.emitChunk)
  263. case "auto":
  264. // Phase-1 policy: try native decoder first, then fall back to ffmpeg
  265. // only when native selection/decode reports "unsupported".
  266. native, err := s.decReg.SelectByContentType(meta.ContentType)
  267. if err == nil {
  268. captured := &capturingReader{r: stream}
  269. if err := native.DecodeStream(ctx, captured, meta, s.emitChunk); err == nil {
  270. return nil
  271. } else if !errors.Is(err, decoder.ErrUnsupported) {
  272. return err
  273. }
  274. // Native decode can consume stream bytes before returning "unsupported".
  275. // Reconstruct a full reader for fallback: consumed prefix + remaining stream.
  276. stream = io.MultiReader(bytes.NewReader(captured.Bytes()), stream)
  277. } else if !errors.Is(err, decoder.ErrUnsupported) {
  278. return fmt.Errorf("icecast decoder select: %w", err)
  279. }
  280. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  281. default:
  282. return fmt.Errorf("unsupported icecast decoder mode: %s", mode)
  283. }
  284. }
  285. type capturingReader struct {
  286. r io.Reader
  287. buf bytes.Buffer
  288. }
  289. func (r *capturingReader) Read(p []byte) (int, error) {
  290. n, err := r.r.Read(p)
  291. if n > 0 {
  292. _, _ = r.buf.Write(p[:n])
  293. }
  294. return n, err
  295. }
  296. func (r *capturingReader) Bytes() []byte {
  297. return r.buf.Bytes()
  298. }
  299. func (s *Source) decodeNamed(ctx context.Context, name string, stream io.Reader, meta decoder.StreamMeta) error {
  300. dec, err := s.decReg.Create(name)
  301. if err != nil {
  302. return fmt.Errorf("icecast decoder=%s unavailable: %w", name, err)
  303. }
  304. return dec.DecodeStream(ctx, stream, meta, s.emitChunk)
  305. }
  306. func normalizeDecoderPreference(pref string) string {
  307. switch strings.ToLower(strings.TrimSpace(pref)) {
  308. case "", "auto":
  309. return "auto"
  310. case "native":
  311. return "native"
  312. case "ffmpeg", "fallback":
  313. return "ffmpeg"
  314. default:
  315. return strings.ToLower(strings.TrimSpace(pref))
  316. }
  317. }