Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

368 строки
8.9KB

  1. package icecast
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/jan/fm-rds-tx/internal/ingest"
  15. "github.com/jan/fm-rds-tx/internal/ingest/decoder"
  16. "github.com/jan/fm-rds-tx/internal/ingest/decoder/aac"
  17. "github.com/jan/fm-rds-tx/internal/ingest/decoder/fallback"
  18. "github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3"
  19. "github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis"
  20. )
  21. type Source struct {
  22. id string
  23. url string
  24. client *http.Client
  25. decReg *decoder.Registry
  26. reconn ReconnectConfig
  27. decoderPreference string
  28. chunks chan ingest.PCMChunk
  29. errs chan error
  30. title chan string
  31. cancel context.CancelFunc
  32. wg sync.WaitGroup
  33. state atomic.Value // string
  34. connected atomic.Bool
  35. chunksIn atomic.Uint64
  36. samplesIn atomic.Uint64
  37. reconnects atomic.Uint64
  38. discontinuities atomic.Uint64
  39. lastChunkAtUnix atomic.Int64
  40. lastMetaAtUnix atomic.Int64
  41. metadataUpdates atomic.Uint64
  42. icyMetaInt atomic.Int64
  43. lastError atomic.Value // string
  44. streamTitle atomic.Value // string
  45. }
  46. var errStreamEnded = errors.New("icecast stream ended")
  47. type Option func(*Source)
  48. func WithDecoderPreference(pref string) Option {
  49. return func(s *Source) {
  50. s.decoderPreference = normalizeDecoderPreference(pref)
  51. }
  52. }
  53. func WithDecoderRegistry(reg *decoder.Registry) Option {
  54. return func(s *Source) {
  55. if reg != nil {
  56. s.decReg = reg
  57. }
  58. }
  59. }
  60. func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Option) *Source {
  61. if id == "" {
  62. id = "icecast-main"
  63. }
  64. if client == nil {
  65. client = &http.Client{Timeout: 20 * time.Second}
  66. }
  67. s := &Source{
  68. id: id,
  69. url: strings.TrimSpace(url),
  70. client: client,
  71. reconn: reconn,
  72. chunks: make(chan ingest.PCMChunk, 64),
  73. errs: make(chan error, 8),
  74. title: make(chan string, 16),
  75. decReg: defaultRegistry(),
  76. decoderPreference: "auto",
  77. }
  78. for _, opt := range opts {
  79. if opt != nil {
  80. opt(s)
  81. }
  82. }
  83. s.decoderPreference = normalizeDecoderPreference(s.decoderPreference)
  84. s.state.Store("idle")
  85. s.streamTitle.Store("")
  86. return s
  87. }
  88. func defaultRegistry() *decoder.Registry {
  89. r := decoder.NewRegistry()
  90. r.Register("mp3", func() decoder.Decoder { return mp3.New() })
  91. r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() })
  92. r.Register("aac", func() decoder.Decoder { return aac.New() })
  93. r.Register("ffmpeg", func() decoder.Decoder { return fallback.NewFFmpeg() })
  94. return r
  95. }
  96. func (s *Source) Descriptor() ingest.SourceDescriptor {
  97. return ingest.SourceDescriptor{
  98. ID: s.id,
  99. Kind: "icecast",
  100. Family: "streaming",
  101. Transport: "http",
  102. Codec: s.decoderPreference,
  103. Detail: s.url,
  104. Origin: &ingest.SourceOrigin{
  105. Kind: "url",
  106. Endpoint: redactURL(s.url),
  107. },
  108. }
  109. }
  110. func (s *Source) Start(ctx context.Context) error {
  111. if s.url == "" {
  112. return fmt.Errorf("icecast url is required")
  113. }
  114. runCtx, cancel := context.WithCancel(ctx)
  115. s.cancel = cancel
  116. s.lastError.Store("")
  117. s.state.Store("connecting")
  118. s.wg.Add(1)
  119. go s.loop(runCtx)
  120. return nil
  121. }
  122. func (s *Source) Stop() error {
  123. if s.cancel != nil {
  124. s.cancel()
  125. }
  126. s.wg.Wait()
  127. s.state.Store("stopped")
  128. return nil
  129. }
  130. func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
  131. func (s *Source) Errors() <-chan error { return s.errs }
  132. func (s *Source) StreamTitleUpdates() <-chan string {
  133. return s.title
  134. }
  135. func (s *Source) Stats() ingest.SourceStats {
  136. state, _ := s.state.Load().(string)
  137. last := s.lastChunkAtUnix.Load()
  138. lastMeta := s.lastMetaAtUnix.Load()
  139. errStr, _ := s.lastError.Load().(string)
  140. streamTitle, _ := s.streamTitle.Load().(string)
  141. var lastChunkAt time.Time
  142. var lastMetaAt time.Time
  143. if last > 0 {
  144. lastChunkAt = time.Unix(0, last)
  145. }
  146. if lastMeta > 0 {
  147. lastMetaAt = time.Unix(0, lastMeta)
  148. }
  149. return ingest.SourceStats{
  150. State: state,
  151. Connected: s.connected.Load(),
  152. LastChunkAt: lastChunkAt,
  153. LastMetaAt: lastMetaAt,
  154. StreamTitle: streamTitle,
  155. MetadataUpdates: s.metadataUpdates.Load(),
  156. IcyMetaInt: int(s.icyMetaInt.Load()),
  157. ChunksIn: s.chunksIn.Load(),
  158. SamplesIn: s.samplesIn.Load(),
  159. Reconnects: s.reconnects.Load(),
  160. Discontinuities: s.discontinuities.Load(),
  161. LastError: errStr,
  162. }
  163. }
  164. func (s *Source) loop(ctx context.Context) {
  165. defer s.wg.Done()
  166. defer close(s.chunks)
  167. defer close(s.errs)
  168. defer close(s.title)
  169. attempt := 0
  170. for {
  171. select {
  172. case <-ctx.Done():
  173. return
  174. default:
  175. }
  176. s.state.Store("connecting")
  177. err := s.connectAndRun(ctx)
  178. if ctx.Err() != nil {
  179. return
  180. }
  181. if err == nil {
  182. err = errStreamEnded
  183. }
  184. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
  185. return
  186. }
  187. s.connected.Store(false)
  188. s.lastError.Store(err.Error())
  189. select {
  190. case s.errs <- err:
  191. default:
  192. }
  193. s.state.Store("reconnecting")
  194. attempt++
  195. s.reconnects.Add(1)
  196. backoff := s.reconn.nextBackoff(attempt)
  197. if backoff <= 0 {
  198. s.state.Store("failed")
  199. return
  200. }
  201. select {
  202. case <-time.After(backoff):
  203. case <-ctx.Done():
  204. return
  205. }
  206. }
  207. }
  208. func (s *Source) connectAndRun(ctx context.Context) error {
  209. req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
  210. if err != nil {
  211. return err
  212. }
  213. req.Header.Set("Icy-MetaData", "1")
  214. resp, err := s.client.Do(req)
  215. if err != nil {
  216. return fmt.Errorf("icecast connect: %w", err)
  217. }
  218. defer resp.Body.Close()
  219. if resp.StatusCode != http.StatusOK {
  220. return fmt.Errorf("icecast status: %s", resp.Status)
  221. }
  222. s.connected.Store(true)
  223. s.state.Store("buffering")
  224. s.lastError.Store("")
  225. icyMetaInt, _ := parseICYMetaInt(resp.Header.Get("icy-metaint"))
  226. s.icyMetaInt.Store(int64(icyMetaInt))
  227. stream := newICYReader(resp.Body, icyMetaInt, s.onMetadata)
  228. s.state.Store("running")
  229. return s.decodeWithPreference(ctx, stream, decoder.StreamMeta{
  230. ContentType: resp.Header.Get("Content-Type"),
  231. SourceID: s.id,
  232. SampleRateHz: 44100,
  233. Channels: 2,
  234. })
  235. }
  236. func (s *Source) onMetadata(meta icyMetadata) {
  237. s.streamTitle.Store(meta.StreamTitle)
  238. s.metadataUpdates.Add(1)
  239. s.lastMetaAtUnix.Store(time.Now().UnixNano())
  240. select {
  241. case s.title <- meta.StreamTitle:
  242. default:
  243. }
  244. }
  245. func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
  246. select {
  247. case s.chunks <- chunk:
  248. default:
  249. s.discontinuities.Add(1)
  250. return io.ErrShortBuffer
  251. }
  252. s.chunksIn.Add(1)
  253. s.samplesIn.Add(uint64(len(chunk.Samples)))
  254. s.lastChunkAtUnix.Store(time.Now().UnixNano())
  255. return nil
  256. }
  257. func (s *Source) decodeWithPreference(ctx context.Context, stream io.Reader, meta decoder.StreamMeta) error {
  258. mode := normalizeDecoderPreference(s.decoderPreference)
  259. switch mode {
  260. case "ffmpeg":
  261. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  262. case "native":
  263. native, err := s.decReg.SelectByContentType(meta.ContentType)
  264. if err != nil {
  265. return fmt.Errorf("icecast native decoder select: %w", err)
  266. }
  267. return native.DecodeStream(ctx, stream, meta, s.emitChunk)
  268. case "auto":
  269. // Phase-1 policy: try native decoder first, then fall back to ffmpeg
  270. // only when native selection/decode reports "unsupported".
  271. native, err := s.decReg.SelectByContentType(meta.ContentType)
  272. if err == nil {
  273. captured := &capturingReader{r: stream}
  274. if err := native.DecodeStream(ctx, captured, meta, s.emitChunk); err == nil {
  275. return nil
  276. } else if !errors.Is(err, decoder.ErrUnsupported) {
  277. return err
  278. }
  279. // Native decode can consume stream bytes before returning "unsupported".
  280. // Reconstruct a full reader for fallback: consumed prefix + remaining stream.
  281. stream = io.MultiReader(bytes.NewReader(captured.Bytes()), stream)
  282. } else if !errors.Is(err, decoder.ErrUnsupported) {
  283. return fmt.Errorf("icecast decoder select: %w", err)
  284. }
  285. return s.decodeNamed(ctx, "ffmpeg", stream, meta)
  286. default:
  287. return fmt.Errorf("unsupported icecast decoder mode: %s", mode)
  288. }
  289. }
  290. type capturingReader struct {
  291. r io.Reader
  292. buf bytes.Buffer
  293. }
  294. func (r *capturingReader) Read(p []byte) (int, error) {
  295. n, err := r.r.Read(p)
  296. if n > 0 {
  297. _, _ = r.buf.Write(p[:n])
  298. }
  299. return n, err
  300. }
  301. func (r *capturingReader) Bytes() []byte {
  302. return r.buf.Bytes()
  303. }
  304. func (s *Source) decodeNamed(ctx context.Context, name string, stream io.Reader, meta decoder.StreamMeta) error {
  305. dec, err := s.decReg.Create(name)
  306. if err != nil {
  307. return fmt.Errorf("icecast decoder=%s unavailable: %w", name, err)
  308. }
  309. return dec.DecodeStream(ctx, stream, meta, s.emitChunk)
  310. }
  311. func normalizeDecoderPreference(pref string) string {
  312. switch strings.ToLower(strings.TrimSpace(pref)) {
  313. case "", "auto":
  314. return "auto"
  315. case "native":
  316. return "native"
  317. case "ffmpeg", "fallback":
  318. return "ffmpeg"
  319. default:
  320. return strings.ToLower(strings.TrimSpace(pref))
  321. }
  322. }
  323. func redactURL(raw string) string {
  324. trimmed := strings.TrimSpace(raw)
  325. if trimmed == "" {
  326. return ""
  327. }
  328. u, err := url.Parse(trimmed)
  329. if err != nil || u.Host == "" {
  330. return trimmed
  331. }
  332. u.User = nil
  333. u.RawQuery = ""
  334. u.Fragment = ""
  335. return u.String()
  336. }