|
- package icecast
-
- import (
- "context"
- "fmt"
- "io"
- "net/http"
- "strings"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/jan/fm-rds-tx/internal/ingest"
- "github.com/jan/fm-rds-tx/internal/ingest/decoder"
- "github.com/jan/fm-rds-tx/internal/ingest/decoder/aac"
- "github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3"
- "github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis"
- )
-
- type Source struct {
- id string
- url string
-
- client *http.Client
- decReg *decoder.Registry
- reconn ReconnectConfig
-
- chunks chan ingest.PCMChunk
- errs chan error
-
- cancel context.CancelFunc
- wg sync.WaitGroup
-
- state atomic.Value // string
- connected atomic.Bool
- chunksIn atomic.Uint64
- samplesIn atomic.Uint64
- reconnects atomic.Uint64
- discontinuities atomic.Uint64
- lastChunkAtUnix atomic.Int64
- lastError atomic.Value // string
- }
-
- func New(id, url string, client *http.Client, reconn ReconnectConfig) *Source {
- if id == "" {
- id = "icecast-main"
- }
- if client == nil {
- client = &http.Client{Timeout: 20 * time.Second}
- }
- s := &Source{
- id: id,
- url: strings.TrimSpace(url),
- client: client,
- reconn: reconn,
- chunks: make(chan ingest.PCMChunk, 64),
- errs: make(chan error, 8),
- decReg: defaultRegistry(),
- }
- s.state.Store("idle")
- return s
- }
-
- func defaultRegistry() *decoder.Registry {
- r := decoder.NewRegistry()
- r.Register("mp3", func() decoder.Decoder { return mp3.New() })
- r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() })
- r.Register("aac", func() decoder.Decoder { return aac.New() })
- return r
- }
-
- func (s *Source) Descriptor() ingest.SourceDescriptor {
- return ingest.SourceDescriptor{
- ID: s.id,
- Kind: "icecast",
- Family: "streaming",
- Transport: "http",
- Codec: "auto",
- Detail: s.url,
- }
- }
-
- func (s *Source) Start(ctx context.Context) error {
- if s.url == "" {
- return fmt.Errorf("icecast url is required")
- }
- runCtx, cancel := context.WithCancel(ctx)
- s.cancel = cancel
- s.state.Store("connecting")
- s.wg.Add(1)
- go s.loop(runCtx)
- return nil
- }
-
- func (s *Source) Stop() error {
- if s.cancel != nil {
- s.cancel()
- }
- s.wg.Wait()
- s.state.Store("stopped")
- return nil
- }
-
- func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
- func (s *Source) Errors() <-chan error { return s.errs }
-
- func (s *Source) Stats() ingest.SourceStats {
- state, _ := s.state.Load().(string)
- last := s.lastChunkAtUnix.Load()
- errStr, _ := s.lastError.Load().(string)
- var lastChunkAt time.Time
- if last > 0 {
- lastChunkAt = time.Unix(0, last)
- }
- return ingest.SourceStats{
- State: state,
- Connected: s.connected.Load(),
- LastChunkAt: lastChunkAt,
- ChunksIn: s.chunksIn.Load(),
- SamplesIn: s.samplesIn.Load(),
- Reconnects: s.reconnects.Load(),
- Discontinuities: s.discontinuities.Load(),
- LastError: errStr,
- }
- }
-
- func (s *Source) loop(ctx context.Context) {
- defer s.wg.Done()
- defer close(s.chunks)
- attempt := 0
- for {
- select {
- case <-ctx.Done():
- return
- default:
- }
-
- s.state.Store("connecting")
- err := s.connectAndRun(ctx)
- if err == nil || ctx.Err() != nil {
- return
- }
- s.connected.Store(false)
- s.lastError.Store(err.Error())
- select {
- case s.errs <- err:
- default:
- }
- s.state.Store("reconnecting")
- attempt++
- s.reconnects.Add(1)
- backoff := s.reconn.nextBackoff(attempt)
- if backoff <= 0 {
- s.state.Store("failed")
- return
- }
- select {
- case <-time.After(backoff):
- case <-ctx.Done():
- return
- }
- }
- }
-
- func (s *Source) connectAndRun(ctx context.Context) error {
- req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
- if err != nil {
- return err
- }
- req.Header.Set("Icy-MetaData", "0")
- resp, err := s.client.Do(req)
- if err != nil {
- return fmt.Errorf("icecast connect: %w", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("icecast status: %s", resp.Status)
- }
- s.connected.Store(true)
- s.state.Store("buffering")
-
- dec, err := s.decReg.SelectByContentType(resp.Header.Get("Content-Type"))
- if err != nil {
- return fmt.Errorf("icecast decoder select: %w", err)
- }
- s.state.Store("running")
- return dec.DecodeStream(ctx, resp.Body, decoder.StreamMeta{
- ContentType: resp.Header.Get("Content-Type"),
- SourceID: s.id,
- }, s.emitChunk)
- }
-
- func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
- select {
- case s.chunks <- chunk:
- default:
- s.discontinuities.Add(1)
- return io.ErrShortBuffer
- }
- s.chunksIn.Add(1)
- s.samplesIn.Add(uint64(len(chunk.Samples)))
- s.lastChunkAtUnix.Store(time.Now().UnixNano())
- return nil
- }
|