// Package icecast implements an ingest source that pulls an audio stream
// over HTTP (Icecast style), decodes it through a decoder registry and
// publishes the resulting PCM chunks on a channel.
package icecast

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/jan/fm-rds-tx/internal/ingest"
	"github.com/jan/fm-rds-tx/internal/ingest/decoder"
	"github.com/jan/fm-rds-tx/internal/ingest/decoder/aac"
	"github.com/jan/fm-rds-tx/internal/ingest/decoder/fallback"
	"github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3"
	"github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis"
)

// Source is an HTTP streaming ingest source. It connects to a URL, decodes
// the response body and emits PCM chunks, reconnecting according to its
// ReconnectConfig when a session fails.
type Source struct {
	id     string
	url    string
	client *http.Client
	decReg *decoder.Registry
	reconn ReconnectConfig

	// Normalized decoder mode: "auto", "native", "ffmpeg", or an
	// unrecognized value that is rejected when decoding starts.
	decoderPreference string

	// chunks is closed when the run loop exits; errs is never closed.
	chunks chan ingest.PCMChunk
	errs   chan error

	cancel context.CancelFunc
	wg     sync.WaitGroup

	state     atomic.Value // string
	connected atomic.Bool

	// Counters exposed through Stats. lastChunkAtUnix holds UnixNano.
	chunksIn        atomic.Uint64
	samplesIn       atomic.Uint64
	reconnects      atomic.Uint64
	discontinuities atomic.Uint64
	lastChunkAtUnix atomic.Int64
	lastError       atomic.Value // string
}

// Option customizes a Source during construction in New.
type Option func(*Source)

// WithDecoderPreference selects the decoder mode. The value is normalized
// with normalizeDecoderPreference, so blank input means "auto" and
// "fallback" is an alias for "ffmpeg".
func WithDecoderPreference(pref string) Option {
	return func(s *Source) {
		s.decoderPreference = normalizeDecoderPreference(pref)
	}
}

// WithDecoderRegistry replaces the default decoder registry. A nil registry
// is ignored and the default registry stays in place.
func WithDecoderRegistry(reg *decoder.Registry) Option {
	return func(s *Source) {
		if reg != nil {
			s.decReg = reg
		}
	}
}

// New builds a Source for the given stream URL.
//
// An empty id defaults to "icecast-main". A nil client is replaced by a
// client suited to endless streams (see newStreamingHTTPClient). The URL is
// trimmed of surrounding whitespace; it is validated only in Start. Nil
// options are skipped.
func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Option) *Source {
	if id == "" {
		id = "icecast-main"
	}
	if client == nil {
		client = newStreamingHTTPClient()
	}

	s := &Source{
		id:                id,
		url:               strings.TrimSpace(url),
		client:            client,
		reconn:            reconn,
		chunks:            make(chan ingest.PCMChunk, 64),
		errs:              make(chan error, 8),
		decReg:            defaultRegistry(),
		decoderPreference: "auto",
	}
	for _, opt := range opts {
		if opt != nil {
			opt(s)
		}
	}

	// Options that write the field directly may leave it un-normalized.
	s.decoderPreference = normalizeDecoderPreference(s.decoderPreference)
	s.state.Store("idle")
	return s
}

// newStreamingHTTPClient returns the client used when the caller passes nil.
//
// http.Client.Timeout also bounds reading the response body, so setting it
// would cut a continuous stream off once the timeout elapsed. The client
// therefore has no overall timeout; only the wait for response headers is
// limited, on a private clone of the default transport.
func newStreamingHTTPClient() *http.Client {
	base, ok := http.DefaultTransport.(*http.Transport)
	if !ok {
		// The process replaced http.DefaultTransport with a custom
		// RoundTripper; use it as-is rather than guess at its settings.
		return &http.Client{}
	}
	transport := base.Clone()
	transport.ResponseHeaderTimeout = 20 * time.Second
	return &http.Client{Transport: transport}
}

// defaultRegistry returns a registry with the mp3, oggvorbis, aac and ffmpeg
// decoder factories registered under those names.
func defaultRegistry() *decoder.Registry {
	r := decoder.NewRegistry()
	r.Register("mp3", func() decoder.Decoder { return mp3.New() })
	r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() })
	r.Register("aac", func() decoder.Decoder { return aac.New() })
	r.Register("ffmpeg", func() decoder.Decoder { return fallback.NewFFmpeg() })
	return r
}

// Descriptor describes this source. Codec reports the configured decoder
// preference, not the codec detected on the wire.
func (s *Source) Descriptor() ingest.SourceDescriptor {
	return ingest.SourceDescriptor{
		ID:        s.id,
		Kind:      "icecast",
		Family:    "streaming",
		Transport: "http",
		Codec:     s.decoderPreference,
		Detail:    s.url,
	}
}

// Start launches the background run loop, which stops when ctx is cancelled
// or Stop is called. It returns an error if no URL was configured.
//
// NOTE(review): Start is not guarded against repeated calls; a second call
// would start a second loop that closes the same chunks channel.
func (s *Source) Start(ctx context.Context) error {
	if s.url == "" {
		return errors.New("icecast url is required")
	}
	runCtx, cancel := context.WithCancel(ctx)
	s.cancel = cancel
	s.state.Store("connecting")
	s.wg.Add(1)
	go s.loop(runCtx)
	return nil
}

// Stop cancels the run loop, waits for it to exit and marks the source
// "stopped". It always returns nil.
func (s *Source) Stop() error {
	if s.cancel != nil {
		s.cancel()
	}
	s.wg.Wait()
	s.state.Store("stopped")
	return nil
}

// Chunks returns the channel of decoded PCM chunks. It is closed when the
// run loop exits.
func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }

// Errors returns the channel of session errors. Errors are dropped when the
// channel buffer is full, and the channel is never closed.
func (s *Source) Errors() <-chan error { return s.errs }

// Stats returns a snapshot of the source counters and state. LastChunkAt is
// the zero time until the first chunk has been delivered.
func (s *Source) Stats() ingest.SourceStats {
	state, _ := s.state.Load().(string)
	errStr, _ := s.lastError.Load().(string)

	var lastChunkAt time.Time
	if ns := s.lastChunkAtUnix.Load(); ns > 0 {
		lastChunkAt = time.Unix(0, ns)
	}

	return ingest.SourceStats{
		State:           state,
		Connected:       s.connected.Load(),
		LastChunkAt:     lastChunkAt,
		ChunksIn:        s.chunksIn.Load(),
		SamplesIn:       s.samplesIn.Load(),
		Reconnects:      s.reconnects.Load(),
		Discontinuities: s.discontinuities.Load(),
		LastError:       errStr,
	}
}

// loop runs sessions until the context is cancelled, a session ends without
// error, or the reconnect policy returns a non-positive backoff (state
// "failed"). The chunks channel is closed on exit.
func (s *Source) loop(ctx context.Context) {
	defer s.wg.Done()
	defer close(s.chunks)

	attempt := 0
	for {
		if ctx.Err() != nil {
			return
		}

		s.state.Store("connecting")
		deliveredBefore := s.chunksIn.Load()
		err := s.connectAndRun(ctx)

		// The session is over whatever the outcome, so never leave a
		// stale "connected" flag behind for Stats.
		s.connected.Store(false)
		if err == nil || ctx.Err() != nil {
			return
		}

		// A session that delivered audio counts as a success: restart
		// the backoff sequence so sporadic drops on a long-lived stream
		// do not accumulate towards the retry limit.
		if s.chunksIn.Load() != deliveredBefore {
			attempt = 0
		}

		s.lastError.Store(err.Error())
		select {
		case s.errs <- err:
		default:
			// Nobody is draining errors; drop rather than block.
		}

		s.state.Store("reconnecting")
		attempt++
		s.reconnects.Add(1)

		backoff := s.reconn.nextBackoff(attempt)
		if backoff <= 0 {
			s.state.Store("failed")
			return
		}

		timer := time.NewTimer(backoff)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return
		}
	}
}

// connectAndRun performs one session: it issues the GET request, requires a
// 200 response and then decodes the body until the decoder returns.
func (s *Source) connectAndRun(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
	if err != nil {
		return err
	}
	// Presumably asks the server not to interleave ICY metadata with the
	// audio payload; confirm against the server's behaviour.
	req.Header.Set("Icy-MetaData", "0")

	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("icecast connect: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("icecast status: %s", resp.Status)
	}

	s.connected.Store(true)
	// "buffering" is overwritten immediately; kept so the state sequence
	// matches the original.
	s.state.Store("buffering")
	s.state.Store("running")

	// Sample rate and channel count are fixed hints; how each decoder uses
	// them is not visible from this file.
	meta := decoder.StreamMeta{
		ContentType:  resp.Header.Get("Content-Type"),
		SourceID:     s.id,
		SampleRateHz: 44100,
		Channels:     2,
	}
	return s.decodeWithPreference(ctx, resp.Body, meta)
}

// emitChunk is the decoder callback. It never blocks: when the chunks
// channel is full the chunk is dropped, a discontinuity is counted and
// io.ErrShortBuffer is returned. The delivery counters are updated only for
// chunks that were actually queued.
func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
	select {
	case s.chunks <- chunk:
	default:
		s.discontinuities.Add(1)
		return io.ErrShortBuffer
	}

	s.chunksIn.Add(1)
	s.samplesIn.Add(uint64(len(chunk.Samples)))
	s.lastChunkAtUnix.Store(time.Now().UnixNano())
	return nil
}

// decodeWithPreference decodes stream according to the configured mode:
//
//   - "ffmpeg": always use the decoder registered as "ffmpeg".
//   - "native": use the decoder selected by content type; fail if none.
//   - "auto":   try the content-type decoder first and fall back to
//     "ffmpeg" only when selection or decoding reports
//     decoder.ErrUnsupported.
//
// Any other mode is rejected with an error.
func (s *Source) decodeWithPreference(ctx context.Context, stream io.Reader, meta decoder.StreamMeta) error {
	mode := normalizeDecoderPreference(s.decoderPreference)
	switch mode {
	case "ffmpeg":
		return s.decodeNamed(ctx, "ffmpeg", stream, meta)

	case "native":
		native, err := s.decReg.SelectByContentType(meta.ContentType)
		if err != nil {
			return fmt.Errorf("icecast native decoder select: %w", err)
		}
		return native.DecodeStream(ctx, stream, meta, s.emitChunk)

	case "auto":
		// Phase-1 policy: try native decoder first, then fall back to
		// ffmpeg only when native selection/decode reports "unsupported".
		native, selErr := s.decReg.SelectByContentType(meta.ContentType)
		if selErr != nil {
			if !errors.Is(selErr, decoder.ErrUnsupported) {
				return fmt.Errorf("icecast decoder select: %w", selErr)
			}
		} else {
			decErr := native.DecodeStream(ctx, stream, meta, s.emitChunk)
			if decErr == nil || !errors.Is(decErr, decoder.ErrUnsupported) {
				return decErr
			}
			// NOTE(review): the native decoder may already have consumed
			// bytes from stream, so the fallback starts mid-stream.
		}
		return s.decodeNamed(ctx, "ffmpeg", stream, meta)

	default:
		return fmt.Errorf("unsupported icecast decoder mode: %s", mode)
	}
}

// decodeNamed creates the decoder registered under name and runs it over
// stream, emitting chunks through emitChunk.
func (s *Source) decodeNamed(ctx context.Context, name string, stream io.Reader, meta decoder.StreamMeta) error {
	dec, err := s.decReg.Create(name)
	if err != nil {
		return fmt.Errorf("icecast decoder=%s unavailable: %w", name, err)
	}
	return dec.DecodeStream(ctx, stream, meta, s.emitChunk)
}

// normalizeDecoderPreference canonicalizes a decoder mode string: input is
// trimmed and lower-cased, blank means "auto" and "fallback" is an alias for
// "ffmpeg". Unrecognized values are returned in trimmed, lower-cased form so
// the caller can report them.
func normalizeDecoderPreference(pref string) string {
	mode := strings.ToLower(strings.TrimSpace(pref))
	switch mode {
	case "", "auto":
		return "auto"
	case "native":
		return "native"
	case "ffmpeg", "fallback":
		return "ffmpeg"
	default:
		return mode
	}
}