Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

380 satır
9.5KB

  1. package icecast
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/jan/fm-rds-tx/internal/ingest"
  15. "github.com/jan/fm-rds-tx/internal/ingest/decoder"
  16. "github.com/jan/fm-rds-tx/internal/ingest/decoder/aac"
  17. "github.com/jan/fm-rds-tx/internal/ingest/decoder/fallback"
  18. "github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3"
  19. "github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis"
  20. )
  21. type Source struct {
  22. id string
  23. url string
  24. client *http.Client
  25. decReg *decoder.Registry
  26. reconn ReconnectConfig
  27. decoderPreference string
  28. chunks chan ingest.PCMChunk
  29. errs chan error
  30. title chan string
  31. cancel context.CancelFunc
  32. wg sync.WaitGroup
  33. state atomic.Value // string
  34. connected atomic.Bool
  35. chunksIn atomic.Uint64
  36. samplesIn atomic.Uint64
  37. reconnects atomic.Uint64
  38. discontinuities atomic.Uint64
  39. lastChunkAtUnix atomic.Int64
  40. lastMetaAtUnix atomic.Int64
  41. metadataUpdates atomic.Uint64
  42. icyMetaInt atomic.Int64
  43. lastError atomic.Value // string
  44. streamTitle atomic.Value // string
  45. }
  46. var errStreamEnded = errors.New("icecast stream ended")
  47. type Option func(*Source)
  48. func WithDecoderPreference(pref string) Option {
  49. return func(s *Source) {
  50. s.decoderPreference = normalizeDecoderPreference(pref)
  51. }
  52. }
  53. func WithDecoderRegistry(reg *decoder.Registry) Option {
  54. return func(s *Source) {
  55. if reg != nil {
  56. s.decReg = reg
  57. }
  58. }
  59. }
  60. func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Option) *Source {
  61. if id == "" {
  62. id = "icecast-main"
  63. }
  64. if client == nil {
  65. // Streaming responses are long-lived; a global client timeout would
  66. // terminate the body read after a fixed duration.
  67. client = &http.Client{}
  68. }
  69. s := &Source{
  70. id: id,
  71. url: strings.TrimSpace(url),
  72. client: client,
  73. reconn: reconn,
  74. chunks: make(chan ingest.PCMChunk, 64),
  75. errs: make(chan error, 8),
  76. title: make(chan string, 16),
  77. decReg: defaultRegistry(),
  78. decoderPreference: "auto",
  79. }
  80. for _, opt := range opts {
  81. if opt != nil {
  82. opt(s)
  83. }
  84. }
  85. s.decoderPreference = normalizeDecoderPreference(s.decoderPreference)
  86. s.state.Store("idle")
  87. s.streamTitle.Store("")
  88. return s
  89. }
  90. func defaultRegistry() *decoder.Registry {
  91. r := decoder.NewRegistry()
  92. r.Register("mp3", func() decoder.Decoder { return mp3.New() })
  93. r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() })
  94. r.Register("aac", func() decoder.Decoder { return aac.New() })
  95. r.Register("ffmpeg", func() decoder.Decoder { return fallback.NewFFmpeg() })
  96. return r
  97. }
  98. func (s *Source) Descriptor() ingest.SourceDescriptor {
  99. return ingest.SourceDescriptor{
  100. ID: s.id,
  101. Kind: "icecast",
  102. Family: "streaming",
  103. Transport: "http",
  104. Codec: s.decoderPreference,
  105. Detail: s.url,
  106. Origin: &ingest.SourceOrigin{
  107. Kind: "url",
  108. Endpoint: redactURL(s.url),
  109. },
  110. }
  111. }
  112. func (s *Source) Start(ctx context.Context) error {
  113. if s.url == "" {
  114. return fmt.Errorf("icecast url is required")
  115. }
  116. runCtx, cancel := context.WithCancel(ctx)
  117. s.cancel = cancel
  118. s.lastError.Store("")
  119. s.state.Store("connecting")
  120. s.wg.Add(1)
  121. go s.loop(runCtx)
  122. return nil
  123. }
  124. func (s *Source) Stop() error {
  125. if s.cancel != nil {
  126. s.cancel()
  127. }
  128. s.wg.Wait()
  129. s.state.Store("stopped")
  130. return nil
  131. }
  132. func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
  133. func (s *Source) Errors() <-chan error { return s.errs }
  134. func (s *Source) StreamTitleUpdates() <-chan string {
  135. return s.title
  136. }
  137. func (s *Source) Stats() ingest.SourceStats {
  138. state, _ := s.state.Load().(string)
  139. last := s.lastChunkAtUnix.Load()
  140. lastMeta := s.lastMetaAtUnix.Load()
  141. errStr, _ := s.lastError.Load().(string)
  142. streamTitle, _ := s.streamTitle.Load().(string)
  143. var lastChunkAt time.Time
  144. var lastMetaAt time.Time
  145. if last > 0 {
  146. lastChunkAt = time.Unix(0, last)
  147. }
  148. if lastMeta > 0 {
  149. lastMetaAt = time.Unix(0, lastMeta)
  150. }
  151. return ingest.SourceStats{
  152. State: state,
  153. Connected: s.connected.Load(),
  154. LastChunkAt: lastChunkAt,
  155. LastMetaAt: lastMetaAt,
  156. StreamTitle: streamTitle,
  157. MetadataUpdates: s.metadataUpdates.Load(),
  158. IcyMetaInt: int(s.icyMetaInt.Load()),
  159. ChunksIn: s.chunksIn.Load(),
  160. SamplesIn: s.samplesIn.Load(),
  161. Reconnects: s.reconnects.Load(),
  162. Discontinuities: s.discontinuities.Load(),
  163. LastError: errStr,
  164. }
  165. }
  166. func (s *Source) loop(ctx context.Context) {
  167. defer s.wg.Done()
  168. defer close(s.chunks)
  169. defer close(s.errs)
  170. defer close(s.title)
  171. attempt := 0
  172. for {
  173. select {
  174. case <-ctx.Done():
  175. return
  176. default:
  177. }
  178. s.state.Store("connecting")
  179. err := s.connectAndRun(ctx)
  180. if ctx.Err() != nil {
  181. return
  182. }
  183. if err == nil {
  184. err = errStreamEnded
  185. }
  186. s.connected.Store(false)
  187. s.lastError.Store(err.Error())
  188. select {
  189. case s.errs <- err:
  190. default:
  191. }
  192. s.state.Store("reconnecting")
  193. attempt++
  194. s.reconnects.Add(1)
  195. backoff := s.reconn.nextBackoff(attempt)
  196. if backoff <= 0 {
  197. s.state.Store("failed")
  198. return
  199. }
  200. select {
  201. case <-time.After(backoff):
  202. case <-ctx.Done():
  203. return
  204. }
  205. }
  206. }
  207. func (s *Source) connectAndRun(ctx context.Context) error {
  208. req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
  209. if err != nil {
  210. return err
  211. }
  212. req.Header.Set("Icy-MetaData", "1")
  213. resp, err := s.client.Do(req)
  214. if err != nil {
  215. return fmt.Errorf("icecast connect: %w", err)
  216. }
  217. defer resp.Body.Close()
  218. if resp.StatusCode != http.StatusOK {
  219. return fmt.Errorf("icecast status: %s", resp.Status)
  220. }
  221. s.connected.Store(true)
  222. s.state.Store("buffering")
  223. s.lastError.Store("")
  224. icyMetaInt, _ := parseICYMetaInt(resp.Header.Get("icy-metaint"))
  225. s.icyMetaInt.Store(int64(icyMetaInt))
  226. stream := newICYReader(resp.Body, icyMetaInt, s.onMetadata)
  227. s.state.Store("running")
  228. return s.decodeWithPreference(ctx, stream, decoder.StreamMeta{
  229. ContentType: resp.Header.Get("Content-Type"),
  230. SourceID: s.id,
  231. SampleRateHz: 44100,
  232. Channels: 2,
  233. })
  234. }
  235. func (s *Source) onMetadata(meta icyMetadata) {
  236. s.streamTitle.Store(meta.StreamTitle)
  237. s.metadataUpdates.Add(1)
  238. s.lastMetaAtUnix.Store(time.Now().UnixNano())
  239. select {
  240. case s.title <- meta.StreamTitle:
  241. default:
  242. }
  243. }
  244. func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
  245. select {
  246. case s.chunks <- chunk:
  247. default:
  248. s.discontinuities.Add(1)
  249. return io.ErrShortBuffer
  250. }
  251. s.chunksIn.Add(1)
  252. s.samplesIn.Add(uint64(len(chunk.Samples)))
  253. s.lastChunkAtUnix.Store(time.Now().UnixNano())
  254. return nil
  255. }
  256. func (s *Source) decodeWithPreference(ctx context.Context, stream io.Reader, meta decoder.StreamMeta) error {
  257. mode := normalizeDecoderPreference(s.decoderPreference)
  258. switch mode {
  259. case "ffmpeg":
  260. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  261. case "native":
  262. native, err := s.decReg.SelectByContentType(meta.ContentType)
  263. if err != nil {
  264. return fmt.Errorf("icecast native decoder select: %w", err)
  265. }
  266. return native.DecodeStream(ctx, stream, meta, s.emitChunk)
  267. case "auto":
  268. // Phase-1 policy: try native decoder first, then fall back to ffmpeg
  269. // only when native selection/decode reports "unsupported".
  270. native, err := s.decReg.SelectByContentType(meta.ContentType)
  271. if err == nil {
  272. captured := &capturingReader{r: stream}
  273. if err := native.DecodeStream(ctx, captured, meta, s.emitChunk); err == nil {
  274. return nil
  275. } else if !errors.Is(err, decoder.ErrUnsupported) {
  276. return err
  277. }
  278. // Native decode can consume stream bytes before returning "unsupported".
  279. // Reconstruct a full reader for fallback: consumed prefix + remaining stream.
  280. stream = io.MultiReader(bytes.NewReader(captured.Bytes()), stream)
  281. } else if !errors.Is(err, decoder.ErrUnsupported) {
  282. return fmt.Errorf("icecast decoder select: %w", err)
  283. }
  284. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  285. default:
  286. return fmt.Errorf("unsupported icecast decoder mode: %s", mode)
  287. }
  288. }
  289. // maxCaptureBytes caps the amount of stream data buffered while the native
  290. // decoder is deciding whether it can handle the format. Without a cap, a
  291. // decoder that reads extensively before returning ErrUnsupported could grow
  292. // this buffer unboundedly on a corrupt or adversarial stream.
  293. const maxCaptureBytes = 1 << 20 // 1 MiB
  294. // errCaptureLimitExceeded is returned by capturingReader when the buffer cap
  295. // is hit. The caller should treat it like ErrUnsupported and fall back.
  296. var errCaptureLimitExceeded = errors.New("capture buffer limit exceeded")
  297. type capturingReader struct {
  298. r io.Reader
  299. buf bytes.Buffer
  300. }
  301. func (r *capturingReader) Read(p []byte) (int, error) {
  302. if r.buf.Len() >= maxCaptureBytes {
  303. return 0, errCaptureLimitExceeded
  304. }
  305. n, err := r.r.Read(p)
  306. if n > 0 {
  307. _, _ = r.buf.Write(p[:n])
  308. }
  309. return n, err
  310. }
  311. func (r *capturingReader) Bytes() []byte {
  312. return r.buf.Bytes()
  313. }
  314. func (s *Source) decodeNamed(ctx context.Context, name string, stream io.Reader, meta decoder.StreamMeta) error {
  315. dec, err := s.decReg.Create(name)
  316. if err != nil {
  317. return fmt.Errorf("icecast decoder=%s unavailable: %w", name, err)
  318. }
  319. return dec.DecodeStream(ctx, stream, meta, s.emitChunk)
  320. }
  321. func normalizeDecoderPreference(pref string) string {
  322. switch strings.ToLower(strings.TrimSpace(pref)) {
  323. case "", "auto":
  324. return "auto"
  325. case "native":
  326. return "native"
  327. case "ffmpeg", "fallback":
  328. return "ffmpeg"
  329. default:
  330. return strings.ToLower(strings.TrimSpace(pref))
  331. }
  332. }
  333. func redactURL(raw string) string {
  334. trimmed := strings.TrimSpace(raw)
  335. if trimmed == "" {
  336. return ""
  337. }
  338. u, err := url.Parse(trimmed)
  339. if err != nil || u.Host == "" {
  340. return trimmed
  341. }
  342. u.User = nil
  343. u.RawQuery = ""
  344. u.Fragment = ""
  345. return u.String()
  346. }