Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

385 строки
9.8KB

  1. package icecast
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/jan/fm-rds-tx/internal/ingest"
  15. "github.com/jan/fm-rds-tx/internal/ingest/decoder"
  16. "github.com/jan/fm-rds-tx/internal/ingest/decoder/aac"
  17. "github.com/jan/fm-rds-tx/internal/ingest/decoder/fallback"
  18. "github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3"
  19. "github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis"
  20. )
  21. type Source struct {
  22. id string
  23. url string
  24. client *http.Client
  25. decReg *decoder.Registry
  26. reconn ReconnectConfig
  27. decoderPreference string
  28. chunks chan ingest.PCMChunk
  29. errs chan error
  30. title chan string
  31. cancel context.CancelFunc
  32. wg sync.WaitGroup
  33. state atomic.Value // string
  34. connected atomic.Bool
  35. chunksIn atomic.Uint64
  36. samplesIn atomic.Uint64
  37. reconnects atomic.Uint64
  38. discontinuities atomic.Uint64
  39. lastChunkAtUnix atomic.Int64
  40. lastMetaAtUnix atomic.Int64
  41. metadataUpdates atomic.Uint64
  42. icyMetaInt atomic.Int64
  43. lastError atomic.Value // string
  44. streamTitle atomic.Value // string
  45. }
  46. var errStreamEnded = errors.New("icecast stream ended")
  47. type Option func(*Source)
  48. func WithDecoderPreference(pref string) Option {
  49. return func(s *Source) {
  50. s.decoderPreference = normalizeDecoderPreference(pref)
  51. }
  52. }
  53. func WithDecoderRegistry(reg *decoder.Registry) Option {
  54. return func(s *Source) {
  55. if reg != nil {
  56. s.decReg = reg
  57. }
  58. }
  59. }
  60. func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Option) *Source {
  61. if id == "" {
  62. id = "icecast-main"
  63. }
  64. if client == nil {
  65. // Streaming responses are long-lived; a global client timeout would
  66. // terminate the body read after a fixed duration.
  67. client = &http.Client{}
  68. }
  69. s := &Source{
  70. id: id,
  71. url: strings.TrimSpace(url),
  72. client: client,
  73. reconn: reconn,
  74. chunks: make(chan ingest.PCMChunk, 64),
  75. errs: make(chan error, 8),
  76. title: make(chan string, 16),
  77. decReg: defaultRegistry(),
  78. decoderPreference: "auto",
  79. }
  80. for _, opt := range opts {
  81. if opt != nil {
  82. opt(s)
  83. }
  84. }
  85. s.decoderPreference = normalizeDecoderPreference(s.decoderPreference)
  86. s.state.Store("idle")
  87. s.streamTitle.Store("")
  88. return s
  89. }
  90. func defaultRegistry() *decoder.Registry {
  91. r := decoder.NewRegistry()
  92. r.Register("mp3", func() decoder.Decoder { return mp3.New() })
  93. r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() })
  94. r.Register("aac", func() decoder.Decoder { return aac.New() })
  95. r.Register("ffmpeg", func() decoder.Decoder { return fallback.NewFFmpeg() })
  96. return r
  97. }
  98. func (s *Source) Descriptor() ingest.SourceDescriptor {
  99. return ingest.SourceDescriptor{
  100. ID: s.id,
  101. Kind: "icecast",
  102. Family: "streaming",
  103. Transport: "http",
  104. Codec: s.decoderPreference,
  105. Detail: s.url,
  106. Origin: &ingest.SourceOrigin{
  107. Kind: "url",
  108. Endpoint: redactURL(s.url),
  109. },
  110. }
  111. }
  112. func (s *Source) Start(ctx context.Context) error {
  113. if s.url == "" {
  114. return fmt.Errorf("icecast url is required")
  115. }
  116. // BUG-2 fix: recreate channels on every Start() so that Stop+Start works.
  117. // loop() closes chunks/errs/title when it exits; reusing closed channels panics.
  118. s.chunks = make(chan ingest.PCMChunk, 64)
  119. s.errs = make(chan error, 8)
  120. s.title = make(chan string, 16)
  121. runCtx, cancel := context.WithCancel(ctx)
  122. s.cancel = cancel
  123. s.lastError.Store("")
  124. s.state.Store("connecting")
  125. s.wg.Add(1)
  126. go s.loop(runCtx)
  127. return nil
  128. }
  129. func (s *Source) Stop() error {
  130. if s.cancel != nil {
  131. s.cancel()
  132. }
  133. s.wg.Wait()
  134. s.state.Store("stopped")
  135. return nil
  136. }
  137. func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
  138. func (s *Source) Errors() <-chan error { return s.errs }
  139. func (s *Source) StreamTitleUpdates() <-chan string {
  140. return s.title
  141. }
  142. func (s *Source) Stats() ingest.SourceStats {
  143. state, _ := s.state.Load().(string)
  144. last := s.lastChunkAtUnix.Load()
  145. lastMeta := s.lastMetaAtUnix.Load()
  146. errStr, _ := s.lastError.Load().(string)
  147. streamTitle, _ := s.streamTitle.Load().(string)
  148. var lastChunkAt time.Time
  149. var lastMetaAt time.Time
  150. if last > 0 {
  151. lastChunkAt = time.Unix(0, last)
  152. }
  153. if lastMeta > 0 {
  154. lastMetaAt = time.Unix(0, lastMeta)
  155. }
  156. return ingest.SourceStats{
  157. State: state,
  158. Connected: s.connected.Load(),
  159. LastChunkAt: lastChunkAt,
  160. LastMetaAt: lastMetaAt,
  161. StreamTitle: streamTitle,
  162. MetadataUpdates: s.metadataUpdates.Load(),
  163. IcyMetaInt: int(s.icyMetaInt.Load()),
  164. ChunksIn: s.chunksIn.Load(),
  165. SamplesIn: s.samplesIn.Load(),
  166. Reconnects: s.reconnects.Load(),
  167. Discontinuities: s.discontinuities.Load(),
  168. LastError: errStr,
  169. }
  170. }
  171. func (s *Source) loop(ctx context.Context) {
  172. defer s.wg.Done()
  173. defer close(s.chunks)
  174. defer close(s.errs)
  175. defer close(s.title)
  176. attempt := 0
  177. for {
  178. select {
  179. case <-ctx.Done():
  180. return
  181. default:
  182. }
  183. s.state.Store("connecting")
  184. err := s.connectAndRun(ctx)
  185. if ctx.Err() != nil {
  186. return
  187. }
  188. if err == nil {
  189. err = errStreamEnded
  190. }
  191. s.connected.Store(false)
  192. s.lastError.Store(err.Error())
  193. select {
  194. case s.errs <- err:
  195. default:
  196. }
  197. s.state.Store("reconnecting")
  198. attempt++
  199. s.reconnects.Add(1)
  200. backoff := s.reconn.nextBackoff(attempt)
  201. if backoff <= 0 {
  202. s.state.Store("failed")
  203. return
  204. }
  205. select {
  206. case <-time.After(backoff):
  207. case <-ctx.Done():
  208. return
  209. }
  210. }
  211. }
  212. func (s *Source) connectAndRun(ctx context.Context) error {
  213. req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
  214. if err != nil {
  215. return err
  216. }
  217. req.Header.Set("Icy-MetaData", "1")
  218. resp, err := s.client.Do(req)
  219. if err != nil {
  220. return fmt.Errorf("icecast connect: %w", err)
  221. }
  222. defer resp.Body.Close()
  223. if resp.StatusCode != http.StatusOK {
  224. return fmt.Errorf("icecast status: %s", resp.Status)
  225. }
  226. s.connected.Store(true)
  227. s.state.Store("buffering")
  228. s.lastError.Store("")
  229. icyMetaInt, _ := parseICYMetaInt(resp.Header.Get("icy-metaint"))
  230. s.icyMetaInt.Store(int64(icyMetaInt))
  231. stream := newICYReader(resp.Body, icyMetaInt, s.onMetadata)
  232. s.state.Store("running")
  233. return s.decodeWithPreference(ctx, stream, decoder.StreamMeta{
  234. ContentType: resp.Header.Get("Content-Type"),
  235. SourceID: s.id,
  236. SampleRateHz: 44100,
  237. Channels: 2,
  238. })
  239. }
  240. func (s *Source) onMetadata(meta icyMetadata) {
  241. s.streamTitle.Store(meta.StreamTitle)
  242. s.metadataUpdates.Add(1)
  243. s.lastMetaAtUnix.Store(time.Now().UnixNano())
  244. select {
  245. case s.title <- meta.StreamTitle:
  246. default:
  247. }
  248. }
  249. func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
  250. select {
  251. case s.chunks <- chunk:
  252. default:
  253. s.discontinuities.Add(1)
  254. return io.ErrShortBuffer
  255. }
  256. s.chunksIn.Add(1)
  257. s.samplesIn.Add(uint64(len(chunk.Samples)))
  258. s.lastChunkAtUnix.Store(time.Now().UnixNano())
  259. return nil
  260. }
  261. func (s *Source) decodeWithPreference(ctx context.Context, stream io.Reader, meta decoder.StreamMeta) error {
  262. mode := normalizeDecoderPreference(s.decoderPreference)
  263. switch mode {
  264. case "ffmpeg":
  265. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  266. case "native":
  267. native, err := s.decReg.SelectByContentType(meta.ContentType)
  268. if err != nil {
  269. return fmt.Errorf("icecast native decoder select: %w", err)
  270. }
  271. return native.DecodeStream(ctx, stream, meta, s.emitChunk)
  272. case "auto":
  273. // Phase-1 policy: try native decoder first, then fall back to ffmpeg
  274. // only when native selection/decode reports "unsupported".
  275. native, err := s.decReg.SelectByContentType(meta.ContentType)
  276. if err == nil {
  277. captured := &capturingReader{r: stream}
  278. if err := native.DecodeStream(ctx, captured, meta, s.emitChunk); err == nil {
  279. return nil
  280. } else if !errors.Is(err, decoder.ErrUnsupported) {
  281. return err
  282. }
  283. // Native decode can consume stream bytes before returning "unsupported".
  284. // Reconstruct a full reader for fallback: consumed prefix + remaining stream.
  285. stream = io.MultiReader(bytes.NewReader(captured.Bytes()), stream)
  286. } else if !errors.Is(err, decoder.ErrUnsupported) {
  287. return fmt.Errorf("icecast decoder select: %w", err)
  288. }
  289. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  290. default:
  291. return fmt.Errorf("unsupported icecast decoder mode: %s", mode)
  292. }
  293. }
  294. // maxCaptureBytes caps the amount of stream data buffered while the native
  295. // decoder is deciding whether it can handle the format. Without a cap, a
  296. // decoder that reads extensively before returning ErrUnsupported could grow
  297. // this buffer unboundedly on a corrupt or adversarial stream.
  298. const maxCaptureBytes = 1 << 20 // 1 MiB
  299. // errCaptureLimitExceeded is returned by capturingReader when the buffer cap
  300. // is hit. The caller should treat it like ErrUnsupported and fall back.
  301. var errCaptureLimitExceeded = errors.New("capture buffer limit exceeded")
  302. type capturingReader struct {
  303. r io.Reader
  304. buf bytes.Buffer
  305. }
  306. func (r *capturingReader) Read(p []byte) (int, error) {
  307. if r.buf.Len() >= maxCaptureBytes {
  308. return 0, errCaptureLimitExceeded
  309. }
  310. n, err := r.r.Read(p)
  311. if n > 0 {
  312. _, _ = r.buf.Write(p[:n])
  313. }
  314. return n, err
  315. }
  316. func (r *capturingReader) Bytes() []byte {
  317. return r.buf.Bytes()
  318. }
  319. func (s *Source) decodeNamed(ctx context.Context, name string, stream io.Reader, meta decoder.StreamMeta) error {
  320. dec, err := s.decReg.Create(name)
  321. if err != nil {
  322. return fmt.Errorf("icecast decoder=%s unavailable: %w", name, err)
  323. }
  324. return dec.DecodeStream(ctx, stream, meta, s.emitChunk)
  325. }
  326. func normalizeDecoderPreference(pref string) string {
  327. switch strings.ToLower(strings.TrimSpace(pref)) {
  328. case "", "auto":
  329. return "auto"
  330. case "native":
  331. return "native"
  332. case "ffmpeg", "fallback":
  333. return "ffmpeg"
  334. default:
  335. return strings.ToLower(strings.TrimSpace(pref))
  336. }
  337. }
  338. func redactURL(raw string) string {
  339. trimmed := strings.TrimSpace(raw)
  340. if trimmed == "" {
  341. return ""
  342. }
  343. u, err := url.Parse(trimmed)
  344. if err != nil || u.Host == "" {
  345. return trimmed
  346. }
  347. u.User = nil
  348. u.RawQuery = ""
  349. u.Fragment = ""
  350. return u.String()
  351. }