diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index fbd8578..5631bca 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -17,6 +17,7 @@ import ( drypkg "github.com/jan/fm-rds-tx/internal/dryrun" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw" + "github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast" "github.com/jan/fm-rds-tx/internal/ingest/adapters/stdinpcm" "github.com/jan/fm-rds-tx/internal/platform" "github.com/jan/fm-rds-tx/internal/platform/plutosdr" @@ -265,6 +266,8 @@ func ingestSampleRate(cfg cfgpkg.Config) int { return cfg.Ingest.Stdin.SampleRateHz case "http-raw": return cfg.Ingest.HTTPRaw.SampleRateHz + case "icecast": + return 44100 default: return 44100 } @@ -278,6 +281,13 @@ func buildPhase1Source(cfg cfgpkg.Config) (ingest.Source, ctrlpkg.AudioIngress, case "http-raw": src := httpraw.New("http-raw-main", cfg.Ingest.HTTPRaw.SampleRateHz, cfg.Ingest.HTTPRaw.Channels) return src, src, nil + case "icecast": + src := icecast.New("icecast-main", cfg.Ingest.Icecast.URL, nil, icecast.ReconnectConfig{ + Enabled: cfg.Ingest.Reconnect.Enabled, + InitialBackoffMs: cfg.Ingest.Reconnect.InitialBackoffMs, + MaxBackoffMs: cfg.Ingest.Reconnect.MaxBackoffMs, + }) + return src, nil, nil case "", "none": return nil, nil, nil default: diff --git a/internal/config/config.go b/internal/config/config.go index 37ca0ae..5ea3506 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -246,6 +246,9 @@ func (c Config) Validate() error { if c.Ingest.Stdin.Channels < 0 || c.Ingest.HTTPRaw.Channels < 0 { return fmt.Errorf("ingest pcm channels must be >= 0") } + if c.Ingest.Kind == "icecast" && strings.TrimSpace(c.Ingest.Icecast.URL) == "" { + return fmt.Errorf("ingest.icecast.url is required when ingest.kind=icecast") + } // Fail-loud PI validation if c.RDS.Enabled { if _, err := ParsePI(c.RDS.PI); err != nil { diff --git a/internal/ingest/adapters/icecast/reconnect.go b/internal/ingest/adapters/icecast/reconnect.go new file mode 100644 index 0000000..44fe2c2 --- /dev/null +++ b/internal/ingest/adapters/icecast/reconnect.go @@ -0,0 +1,31 @@ +package icecast + +import "time" + +type ReconnectConfig struct { + Enabled bool + InitialBackoffMs int + MaxBackoffMs int +} + +func (c ReconnectConfig) nextBackoff(attempt int) time.Duration { + if !c.Enabled { + return 0 + } + initial := c.InitialBackoffMs + if initial <= 0 { + initial = 1000 + } + max := c.MaxBackoffMs + if max <= 0 { + max = 15000 + } + d := time.Duration(initial) * time.Millisecond + for i := 1; i < attempt; i++ { + d *= 2 + if d >= time.Duration(max)*time.Millisecond { + return time.Duration(max) * time.Millisecond + } + } + return d +} diff --git a/internal/ingest/adapters/icecast/reconnect_test.go b/internal/ingest/adapters/icecast/reconnect_test.go new file mode 100644 index 0000000..16f961d --- /dev/null +++ b/internal/ingest/adapters/icecast/reconnect_test.go @@ -0,0 +1,26 @@ +package icecast + +import ( + "testing" + "time" +) + +func TestNextBackoff(t *testing.T) { + cfg := ReconnectConfig{ + Enabled: true, + InitialBackoffMs: 1000, + MaxBackoffMs: 5000, + } + if got := cfg.nextBackoff(1); got != 1*time.Second { + t.Fatalf("attempt1 got %s", got) + } + if got := cfg.nextBackoff(2); got != 2*time.Second { + t.Fatalf("attempt2 got %s", got) + } + if got := cfg.nextBackoff(3); got != 4*time.Second { + t.Fatalf("attempt3 got %s", got) + } + if got := cfg.nextBackoff(4); got != 5*time.Second { + t.Fatalf("attempt4 got %s", got) + } +} diff --git a/internal/ingest/adapters/icecast/source.go b/internal/ingest/adapters/icecast/source.go new file mode 100644 index 0000000..319ed9a --- /dev/null +++ b/internal/ingest/adapters/icecast/source.go @@ -0,0 +1,204 @@ +package icecast + +import ( + "context" + "fmt" + "io" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" + "github.com/jan/fm-rds-tx/internal/ingest/decoder/aac" + "github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3" + "github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis" +) + +type Source struct { + id string + url string + + client *http.Client + decReg *decoder.Registry + reconn ReconnectConfig + + chunks chan ingest.PCMChunk + errs chan error + + cancel context.CancelFunc + wg sync.WaitGroup + + state atomic.Value // string + connected atomic.Bool + chunksIn atomic.Uint64 + samplesIn atomic.Uint64 + reconnects atomic.Uint64 + discontinuities atomic.Uint64 + lastChunkAtUnix atomic.Int64 + lastError atomic.Value // string +} + +func New(id, url string, client *http.Client, reconn ReconnectConfig) *Source { + if id == "" { + id = "icecast-main" + } + if client == nil { + client = &http.Client{Timeout: 20 * time.Second} + } + s := &Source{ + id: id, + url: strings.TrimSpace(url), + client: client, + reconn: reconn, + chunks: make(chan ingest.PCMChunk, 64), + errs: make(chan error, 8), + decReg: defaultRegistry(), + } + s.state.Store("idle") + return s +} + +func defaultRegistry() *decoder.Registry { + r := decoder.NewRegistry() + r.Register("mp3", func() decoder.Decoder { return mp3.New() }) + r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() }) + r.Register("aac", func() decoder.Decoder { return aac.New() }) + return r +} + +func (s *Source) Descriptor() ingest.SourceDescriptor { + return ingest.SourceDescriptor{ + ID: s.id, + Kind: "icecast", + Family: "streaming", + Transport: "http", + Codec: "auto", + Detail: s.url, + } +} + +func (s *Source) Start(ctx context.Context) error { + if s.url == "" { + return fmt.Errorf("icecast url is required") + } + runCtx, cancel := context.WithCancel(ctx) + s.cancel = cancel + s.state.Store("connecting") + s.wg.Add(1) + go s.loop(runCtx) + return nil +} + +func (s *Source) Stop() error { + if s.cancel != nil { + s.cancel() + } + s.wg.Wait() + s.state.Store("stopped") + return nil +} + +func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks } +func (s *Source) Errors() <-chan error { return s.errs } + +func (s *Source) Stats() ingest.SourceStats { + state, _ := s.state.Load().(string) + last := s.lastChunkAtUnix.Load() + errStr, _ := s.lastError.Load().(string) + var lastChunkAt time.Time + if last > 0 { + lastChunkAt = time.Unix(0, last) + } + return ingest.SourceStats{ + State: state, + Connected: s.connected.Load(), + LastChunkAt: lastChunkAt, + ChunksIn: s.chunksIn.Load(), + SamplesIn: s.samplesIn.Load(), + Reconnects: s.reconnects.Load(), + Discontinuities: s.discontinuities.Load(), + LastError: errStr, + } +} + +func (s *Source) loop(ctx context.Context) { + defer s.wg.Done() + defer close(s.chunks) + attempt := 0 + for { + select { + case <-ctx.Done(): + return + default: + } + + s.state.Store("connecting") + err := s.connectAndRun(ctx) + if err == nil || ctx.Err() != nil { + return + } + s.connected.Store(false) + s.lastError.Store(err.Error()) + select { + case s.errs <- err: + default: + } + s.state.Store("reconnecting") + attempt++ + s.reconnects.Add(1) + backoff := s.reconn.nextBackoff(attempt) + if backoff <= 0 { + s.state.Store("failed") + return + } + select { + case <-time.After(backoff): + case <-ctx.Done(): + return + } + } +} + +func (s *Source) connectAndRun(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil) + if err != nil { + return err + } + req.Header.Set("Icy-MetaData", "0") + resp, err := s.client.Do(req) + if err != nil { + return fmt.Errorf("icecast connect: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("icecast status: %s", resp.Status) + } + s.connected.Store(true) + s.state.Store("buffering") + + dec, err := s.decReg.SelectByContentType(resp.Header.Get("Content-Type")) + if err != nil { + return fmt.Errorf("icecast decoder select: %w", err) + } + s.state.Store("running") + return dec.DecodeStream(ctx, resp.Body, decoder.StreamMeta{ + ContentType: resp.Header.Get("Content-Type"), + SourceID: s.id, + }, s.emitChunk) +} + +func (s *Source) emitChunk(chunk ingest.PCMChunk) error { + select { + case s.chunks <- chunk: + default: + s.discontinuities.Add(1) + return io.ErrShortBuffer + } + s.chunksIn.Add(1) + s.samplesIn.Add(uint64(len(chunk.Samples))) + s.lastChunkAtUnix.Store(time.Now().UnixNano()) + return nil +} diff --git a/internal/ingest/decoder/aac/decoder.go b/internal/ingest/decoder/aac/decoder.go new file mode 100644 index 0000000..ec3816c --- /dev/null +++ b/internal/ingest/decoder/aac/decoder.go @@ -0,0 +1,20 @@ +package aac + +import ( + "context" + "fmt" + "io" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" +) + +type Decoder struct{} + +func New() *Decoder { return &Decoder{} } + +func (d *Decoder) Name() string { return "aac-native" } + +func (d *Decoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { + return fmt.Errorf("%w: aac native decoder not wired yet", decoder.ErrUnsupported) +} diff --git a/internal/ingest/decoder/decoder.go b/internal/ingest/decoder/decoder.go new file mode 100644 index 0000000..a8f2689 --- /dev/null +++ b/internal/ingest/decoder/decoder.go @@ -0,0 +1,66 @@ +package decoder + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/jan/fm-rds-tx/internal/ingest" +) + +var ErrUnsupported = fmt.Errorf("decoder unsupported") + +type StreamMeta struct { + ContentType string + SampleRateHz int + Channels int + SourceID string +} + +type Decoder interface { + Name() string + DecodeStream(ctx context.Context, r io.Reader, meta StreamMeta, emit func(ingest.PCMChunk) error) error +} + +type Builder func() Decoder + +type Registry struct { + byName map[string]Builder +} + +func NewRegistry() *Registry { + return &Registry{byName: map[string]Builder{}} +} + +func (r *Registry) Register(name string, builder Builder) { + if r == nil || builder == nil { + return + } + r.byName[strings.ToLower(strings.TrimSpace(name))] = builder +} + +func (r *Registry) Create(name string) (Decoder, error) { + if r == nil { + return nil, fmt.Errorf("%w: registry nil", ErrUnsupported) + } + builder, ok := r.byName[strings.ToLower(strings.TrimSpace(name))] + if !ok { + return nil, fmt.Errorf("%w: %s", ErrUnsupported, name) + } + return builder(), nil +} + +func (r *Registry) SelectByContentType(contentType string) (Decoder, error) { + ct := strings.ToLower(strings.TrimSpace(contentType)) + switch { + case strings.Contains(ct, "mpeg"), strings.Contains(ct, "mp3"): + return r.Create("mp3") + case strings.Contains(ct, "ogg"), strings.Contains(ct, "vorbis"): + return r.Create("oggvorbis") + case strings.Contains(ct, "aac"), strings.Contains(ct, "adts"): + return r.Create("aac") + default: + return nil, fmt.Errorf("%w: content-type=%s", ErrUnsupported, contentType) + } +} diff --git a/internal/ingest/decoder/decoder_test.go b/internal/ingest/decoder/decoder_test.go new file mode 100644 index 0000000..b304d79 --- /dev/null +++ b/internal/ingest/decoder/decoder_test.go @@ -0,0 +1,42 @@ +package decoder + +import ( + "context" + "io" + "testing" + + "github.com/jan/fm-rds-tx/internal/ingest" +) + +type fakeDecoder struct{ name string } + +func (d *fakeDecoder) Name() string { return d.name } + +func (d *fakeDecoder) DecodeStream(_ context.Context, _ io.Reader, _ StreamMeta, _ func(ingest.PCMChunk) error) error { + return nil +} + +func TestRegistrySelectByContentType(t *testing.T) { + r := NewRegistry() + r.Register("mp3", func() Decoder { return &fakeDecoder{name: "mp3"} }) + r.Register("oggvorbis", func() Decoder { return &fakeDecoder{name: "ogg"} }) + r.Register("aac", func() Decoder { return &fakeDecoder{name: "aac"} }) + + tests := []struct { + ct string + want string + }{ + {"audio/mpeg", "mp3"}, + {"application/ogg", "ogg"}, + {"audio/aac", "aac"}, + } + for _, tt := range tests { + dec, err := r.SelectByContentType(tt.ct) + if err != nil { + t.Fatalf("content-type %s: %v", tt.ct, err) + } + if dec.Name() != tt.want { + t.Fatalf("content-type %s: got %s want %s", tt.ct, dec.Name(), tt.want) + } + } +} diff --git a/internal/ingest/decoder/fallback/ffmpeg.go b/internal/ingest/decoder/fallback/ffmpeg.go new file mode 100644 index 0000000..a2fb3ee --- /dev/null +++ b/internal/ingest/decoder/fallback/ffmpeg.go @@ -0,0 +1,20 @@ +package fallback + +import ( + "context" + "fmt" + "io" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" +) + +type FFmpegDecoder struct{} + +func NewFFmpeg() *FFmpegDecoder { return &FFmpegDecoder{} } + +func (d *FFmpegDecoder) Name() string { return "ffmpeg-fallback" } + +func (d *FFmpegDecoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { + return fmt.Errorf("%w: ffmpeg fallback decoder not wired yet", decoder.ErrUnsupported) +} diff --git a/internal/ingest/decoder/mp3/decoder.go b/internal/ingest/decoder/mp3/decoder.go new file mode 100644 index 0000000..93e5c79 --- /dev/null +++ b/internal/ingest/decoder/mp3/decoder.go @@ -0,0 +1,20 @@ +package mp3 + +import ( + "context" + "fmt" + "io" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" +) + +type Decoder struct{} + +func New() *Decoder { return &Decoder{} } + +func (d *Decoder) Name() string { return "mp3-native" } + +func (d *Decoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { + return fmt.Errorf("%w: mp3 native decoder not wired yet", decoder.ErrUnsupported) +} diff --git a/internal/ingest/decoder/oggvorbis/decoder.go b/internal/ingest/decoder/oggvorbis/decoder.go new file mode 100644 index 0000000..0f7affa --- /dev/null +++ b/internal/ingest/decoder/oggvorbis/decoder.go @@ -0,0 +1,20 @@ +package oggvorbis + +import ( + "context" + "fmt" + "io" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" +) + +type Decoder struct{} + +func New() *Decoder { return &Decoder{} } + +func (d *Decoder) Name() string { return "oggvorbis-native" } + +func (d *Decoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { + return fmt.Errorf("%w: ogg/vorbis native decoder not wired yet", decoder.ErrUnsupported) +}