diff --git a/internal/ingest/adapters/icecast/source.go b/internal/ingest/adapters/icecast/source.go index 319ed9a..93c01f0 100644 --- a/internal/ingest/adapters/icecast/source.go +++ b/internal/ingest/adapters/icecast/source.go @@ -2,6 +2,7 @@ package icecast import ( "context" + "errors" "fmt" "io" "net/http" @@ -13,6 +14,7 @@ import ( "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/decoder" "github.com/jan/fm-rds-tx/internal/ingest/decoder/aac" + "github.com/jan/fm-rds-tx/internal/ingest/decoder/fallback" "github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3" "github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis" ) @@ -25,6 +27,8 @@ type Source struct { decReg *decoder.Registry reconn ReconnectConfig + decoderPreference string + chunks chan ingest.PCMChunk errs chan error @@ -41,7 +45,23 @@ type Source struct { lastError atomic.Value // string } -func New(id, url string, client *http.Client, reconn ReconnectConfig) *Source { +type Option func(*Source) + +func WithDecoderPreference(pref string) Option { + return func(s *Source) { + s.decoderPreference = normalizeDecoderPreference(pref) + } +} + +func WithDecoderRegistry(reg *decoder.Registry) Option { + return func(s *Source) { + if reg != nil { + s.decReg = reg + } + } +} + +func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Option) *Source { if id == "" { id = "icecast-main" } @@ -49,14 +69,21 @@ func New(id, url string, client *http.Client, reconn ReconnectConfig) *Source { client = &http.Client{Timeout: 20 * time.Second} } s := &Source{ - id: id, - url: strings.TrimSpace(url), - client: client, - reconn: reconn, - chunks: make(chan ingest.PCMChunk, 64), - errs: make(chan error, 8), - decReg: defaultRegistry(), + id: id, + url: strings.TrimSpace(url), + client: client, + reconn: reconn, + chunks: make(chan ingest.PCMChunk, 64), + errs: make(chan error, 8), + decReg: defaultRegistry(), + decoderPreference: "auto", + } + for _, opt := range opts { + if opt != nil { + opt(s) + } } + s.decoderPreference = normalizeDecoderPreference(s.decoderPreference) s.state.Store("idle") return s } @@ -66,6 +93,7 @@ func defaultRegistry() *decoder.Registry { r.Register("mp3", func() decoder.Decoder { return mp3.New() }) r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() }) r.Register("aac", func() decoder.Decoder { return aac.New() }) + r.Register("ffmpeg", func() decoder.Decoder { return fallback.NewFFmpeg() }) return r } @@ -75,7 +103,7 @@ func (s *Source) Descriptor() ingest.SourceDescriptor { Kind: "icecast", Family: "streaming", Transport: "http", - Codec: "auto", + Codec: s.decoderPreference, Detail: s.url, } } @@ -179,15 +207,13 @@ func (s *Source) connectAndRun(ctx context.Context) error { s.connected.Store(true) s.state.Store("buffering") - dec, err := s.decReg.SelectByContentType(resp.Header.Get("Content-Type")) - if err != nil { - return fmt.Errorf("icecast decoder select: %w", err) - } s.state.Store("running") - return dec.DecodeStream(ctx, resp.Body, decoder.StreamMeta{ - ContentType: resp.Header.Get("Content-Type"), - SourceID: s.id, - }, s.emitChunk) + return s.decodeWithPreference(ctx, resp.Body, decoder.StreamMeta{ + ContentType: resp.Header.Get("Content-Type"), + SourceID: s.id, + SampleRateHz: 44100, + Channels: 2, + }) } func (s *Source) emitChunk(chunk ingest.PCMChunk) error { @@ -202,3 +228,52 @@ func (s *Source) emitChunk(chunk ingest.PCMChunk) error { s.lastChunkAtUnix.Store(time.Now().UnixNano()) return nil } + +func (s *Source) decodeWithPreference(ctx context.Context, stream io.Reader, meta decoder.StreamMeta) error { + mode := normalizeDecoderPreference(s.decoderPreference) + switch mode { + case "ffmpeg": + return s.decodeNamed(ctx, "ffmpeg", stream, meta) + case "native": + native, err := s.decReg.SelectByContentType(meta.ContentType) + if err != nil { + return fmt.Errorf("icecast native decoder select: %w", err) + } + return native.DecodeStream(ctx, stream, meta, s.emitChunk) + case "auto": + native, err := s.decReg.SelectByContentType(meta.ContentType) + if err == nil { + if err := native.DecodeStream(ctx, stream, meta, s.emitChunk); err == nil { + return nil + } else if !errors.Is(err, decoder.ErrUnsupported) { + return err + } + } else if !errors.Is(err, decoder.ErrUnsupported) { + return fmt.Errorf("icecast decoder select: %w", err) + } + return s.decodeNamed(ctx, "ffmpeg", stream, meta) + default: + return fmt.Errorf("unsupported icecast decoder mode: %s", mode) + } +} + +func (s *Source) decodeNamed(ctx context.Context, name string, stream io.Reader, meta decoder.StreamMeta) error { + dec, err := s.decReg.Create(name) + if err != nil { + return fmt.Errorf("icecast decoder=%s unavailable: %w", name, err) + } + return dec.DecodeStream(ctx, stream, meta, s.emitChunk) +} + +func normalizeDecoderPreference(pref string) string { + switch strings.ToLower(strings.TrimSpace(pref)) { + case "", "auto": + return "auto" + case "native": + return "native" + case "ffmpeg", "fallback": + return "ffmpeg" + default: + return strings.ToLower(strings.TrimSpace(pref)) + } +} diff --git a/internal/ingest/adapters/icecast/source_test.go b/internal/ingest/adapters/icecast/source_test.go new file mode 100644 index 0000000..3786d90 --- /dev/null +++ b/internal/ingest/adapters/icecast/source_test.go @@ -0,0 +1,107 @@ +package icecast + +import ( + "bytes" + "context" + "errors" + "io" + "testing" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" +) + +type testDecoder struct { + name string + err error + called int +} + +func (d *testDecoder) Name() string { return d.name } + +func (d *testDecoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { + d.called++ + return d.err +} + +func TestDecodeWithPreferenceAutoFallsBackFromNativeUnsupported(t *testing.T) { + native := &testDecoder{name: "native", err: decoder.ErrUnsupported} + fallback := &testDecoder{name: "ffmpeg"} + + reg := decoder.NewRegistry() + reg.Register("mp3", func() decoder.Decoder { return native }) + reg.Register("ffmpeg", func() decoder.Decoder { return fallback }) + + src := New("ice-test", "http://example", nil, ReconnectConfig{}, + WithDecoderRegistry(reg), + WithDecoderPreference("auto"), + ) + + err := src.decodeWithPreference(context.Background(), bytes.NewReader(nil), decoder.StreamMeta{ + ContentType: "audio/mpeg", + SourceID: "ice-test", + }) + if err != nil { + t.Fatalf("decode: %v", err) + } + if native.called != 1 { + t.Fatalf("native called %d times", native.called) + } + if fallback.called != 1 { + t.Fatalf("fallback called %d times", fallback.called) + } +} + +func TestDecodeWithPreferenceNativeDoesNotFallback(t *testing.T) { + nativeErr := errors.New("decode failed") + native := &testDecoder{name: "native", err: nativeErr} + fallback := &testDecoder{name: "ffmpeg"} + + reg := decoder.NewRegistry() + reg.Register("mp3", func() decoder.Decoder { return native }) + reg.Register("ffmpeg", func() decoder.Decoder { return fallback }) + + src := New("ice-test", "http://example", nil, ReconnectConfig{}, + WithDecoderRegistry(reg), + WithDecoderPreference("native"), + ) + + err := src.decodeWithPreference(context.Background(), bytes.NewReader(nil), decoder.StreamMeta{ + ContentType: "audio/mpeg", + SourceID: "ice-test", + }) + if !errors.Is(err, nativeErr) { + t.Fatalf("expected native error, got %v", err) + } + if fallback.called != 0 { + t.Fatalf("fallback should not be called, got %d", fallback.called) + } +} + +func TestDecodeWithPreferenceFFmpegOnly(t *testing.T) { + native := &testDecoder{name: "native"} + fallback := &testDecoder{name: "ffmpeg"} + + reg := decoder.NewRegistry() + reg.Register("mp3", func() decoder.Decoder { return native }) + reg.Register("ffmpeg", func() decoder.Decoder { return fallback }) + + src := New("ice-test", "http://example", nil, ReconnectConfig{}, + WithDecoderRegistry(reg), + WithDecoderPreference("ffmpeg"), + ) + + err := src.decodeWithPreference(context.Background(), bytes.NewReader(nil), decoder.StreamMeta{ + ContentType: "audio/mpeg", + SourceID: "ice-test", + }) + if err != nil { + t.Fatalf("decode: %v", err) + } + if native.called != 0 { + t.Fatalf("native should not be called in ffmpeg mode, got %d", native.called) + } + if fallback.called != 1 { + t.Fatalf("fallback called %d times", fallback.called) + } +} diff --git a/internal/ingest/decoder/fallback/ffmpeg.go b/internal/ingest/decoder/fallback/ffmpeg.go index a2fb3ee..6dc4198 100644 --- a/internal/ingest/decoder/fallback/ffmpeg.go +++ b/internal/ingest/decoder/fallback/ffmpeg.go @@ -2,8 +2,14 @@ package fallback import ( "context" + "encoding/binary" + "errors" "fmt" "io" + "os/exec" + "strings" + "sync" + "time" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/decoder" @@ -15,6 +21,137 @@ func NewFFmpeg() *FFmpegDecoder { return &FFmpegDecoder{} } func (d *FFmpegDecoder) Name() string { return "ffmpeg-fallback" } -func (d *FFmpegDecoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { - return fmt.Errorf("%w: ffmpeg fallback decoder not wired yet", decoder.ErrUnsupported) +func (d *FFmpegDecoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.StreamMeta, emit func(ingest.PCMChunk) error) error { + if r == nil { + return fmt.Errorf("%w: ffmpeg decoder stream reader is nil", decoder.ErrUnsupported) + } + if emit == nil { + return fmt.Errorf("%w: ffmpeg decoder emit callback is nil", decoder.ErrUnsupported) + } + + sampleRate := meta.SampleRateHz + if sampleRate <= 0 { + sampleRate = 44100 + } + channels := meta.Channels + if channels <= 0 { + channels = 2 + } + + cmd := exec.CommandContext(ctx, + "ffmpeg", + "-hide_banner", "-loglevel", "error", + "-i", "pipe:0", + "-f", "s16le", + "-acodec", "pcm_s16le", + "-ac", fmt.Sprintf("%d", channels), + "-ar", fmt.Sprintf("%d", sampleRate), + "pipe:1", + ) + + stdin, err := cmd.StdinPipe() + if err != nil { + return fmt.Errorf("ffmpeg stdin pipe: %w", err) + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("ffmpeg stdout pipe: %w", err) + } + stderr, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("ffmpeg stderr pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + if errorsIsNotFound(err) { + return fmt.Errorf("%w: ffmpeg executable not found in PATH", decoder.ErrUnsupported) + } + return fmt.Errorf("ffmpeg start: %w", err) + } + + errCh := make(chan error, 2) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, copyErr := io.Copy(stdin, r) + _ = stdin.Close() + if copyErr != nil && ctx.Err() == nil { + errCh <- fmt.Errorf("ffmpeg stdin copy: %w", copyErr) + } + }() + + stderrData, _ := io.ReadAll(stderr) + readErr := d.readPCM(ctx, stdout, sampleRate, channels, meta.SourceID, emit) + waitErr := cmd.Wait() + wg.Wait() + close(errCh) + + for e := range errCh { + if e != nil { + return e + } + } + if readErr != nil { + return readErr + } + if waitErr != nil && ctx.Err() == nil { + msg := strings.TrimSpace(string(stderrData)) + if msg != "" { + return fmt.Errorf("ffmpeg decode: %w (%s)", waitErr, msg) + } + return fmt.Errorf("ffmpeg decode: %w", waitErr) + } + return nil +} + +func (d *FFmpegDecoder) readPCM(ctx context.Context, r io.Reader, sampleRate, channels int, sourceID string, emit func(ingest.PCMChunk) error) error { + const chunkFrames = 1024 + frameBytes := channels * 2 + buf := make([]byte, chunkFrames*frameBytes) + seq := uint64(0) + for { + select { + case <-ctx.Done(): + return nil + default: + } + n, err := io.ReadAtLeast(r, buf, frameBytes) + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + if n > 0 { + if emitErr := emitPCM(buf[:n], seq, sampleRate, channels, sourceID, emit); emitErr != nil { + return emitErr + } + } + return nil + } + return fmt.Errorf("ffmpeg read pcm: %w", err) + } + if emitErr := emitPCM(buf[:n], seq, sampleRate, channels, sourceID, emit); emitErr != nil { + return emitErr + } + seq++ + } +} + +func emitPCM(data []byte, seq uint64, sampleRate, channels int, sourceID string, emit func(ingest.PCMChunk) error) error { + samples := make([]int32, 0, len(data)/2) + for i := 0; i+1 < len(data); i += 2 { + v := int16(binary.LittleEndian.Uint16(data[i : i+2])) + samples = append(samples, int32(v)<<16) + } + return emit(ingest.PCMChunk{ + Samples: samples, + Channels: channels, + SampleRateHz: sampleRate, + Sequence: seq, + Timestamp: time.Now(), + SourceID: sourceID, + }) +} + +func errorsIsNotFound(err error) bool { + var execErr *exec.Error + return err != nil && (errors.As(err, &execErr) || strings.Contains(strings.ToLower(err.Error()), "executable file not found")) } diff --git a/internal/ingest/factory.go b/internal/ingest/factory.go deleted file mode 100644 index d3a65e7..0000000 --- a/internal/ingest/factory.go +++ /dev/null @@ -1,23 +0,0 @@ -package ingest - -import ( - "fmt" - "io" - "net/http" - - "github.com/jan/fm-rds-tx/internal/config" -) - -type FactoryDeps struct { - Stdin io.Reader - HTTP *http.Client -} - -func BuildSource(cfg config.Config, deps FactoryDeps) (Source, error) { - switch cfg.Ingest.Kind { - case "", "none": - return nil, nil - default: - return nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind) - } -} diff --git a/internal/ingest/factory/factory.go b/internal/ingest/factory/factory.go new file mode 100644 index 0000000..5a46905 --- /dev/null +++ b/internal/ingest/factory/factory.go @@ -0,0 +1,76 @@ +package factory + +import ( + "fmt" + "io" + "net/http" + "os" + "strings" + + "github.com/jan/fm-rds-tx/internal/config" + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw" + "github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast" + "github.com/jan/fm-rds-tx/internal/ingest/adapters/stdinpcm" +) + +type Deps struct { + Stdin io.Reader + HTTP *http.Client +} + +type AudioIngress interface { + WritePCM16(data []byte) (int, error) +} + +func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, error) { + switch normalizeIngestKind(cfg.Ingest.Kind) { + case "", "none": + return nil, nil, nil + case "stdin", "stdin-pcm": + reader := deps.Stdin + if reader == nil { + reader = os.Stdin + } + src := stdinpcm.New("stdin-main", reader, cfg.Ingest.Stdin.SampleRateHz, cfg.Ingest.Stdin.Channels, 1024) + return src, nil, nil + case "http-raw": + src := httpraw.New("http-raw-main", cfg.Ingest.HTTPRaw.SampleRateHz, cfg.Ingest.HTTPRaw.Channels) + return src, src, nil + case "icecast": + src := icecast.New( + "icecast-main", + cfg.Ingest.Icecast.URL, + deps.HTTP, + icecast.ReconnectConfig{ + Enabled: cfg.Ingest.Reconnect.Enabled, + InitialBackoffMs: cfg.Ingest.Reconnect.InitialBackoffMs, + MaxBackoffMs: cfg.Ingest.Reconnect.MaxBackoffMs, + }, + icecast.WithDecoderPreference(cfg.Ingest.Icecast.Decoder), + ) + return src, nil, nil + default: + return nil, nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind) + } +} + +func SampleRateForKind(cfg config.Config) int { + switch normalizeIngestKind(cfg.Ingest.Kind) { + case "stdin", "stdin-pcm": + if cfg.Ingest.Stdin.SampleRateHz > 0 { + return cfg.Ingest.Stdin.SampleRateHz + } + case "http-raw": + if cfg.Ingest.HTTPRaw.SampleRateHz > 0 { + return cfg.Ingest.HTTPRaw.SampleRateHz + } + case "icecast": + return 44100 + } + return 44100 +} + +func normalizeIngestKind(kind string) string { + return strings.ToLower(strings.TrimSpace(kind)) +} diff --git a/internal/ingest/factory/factory_test.go b/internal/ingest/factory/factory_test.go new file mode 100644 index 0000000..d443b2d --- /dev/null +++ b/internal/ingest/factory/factory_test.go @@ -0,0 +1,102 @@ +package factory + +import ( + "bytes" + "testing" + + "github.com/jan/fm-rds-tx/internal/config" +) + +func TestBuildSourceNone(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "none" + src, ingress, err := BuildSource(cfg, Deps{}) + if err != nil { + t.Fatalf("build source: %v", err) + } + if src != nil || ingress != nil { + t.Fatalf("expected nil source and ingress for kind=none") + } +} + +func TestBuildSourceHTTPRawProvidesIngress(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "http-raw" + src, ingress, err := BuildSource(cfg, Deps{}) + if err != nil { + t.Fatalf("build source: %v", err) + } + if src == nil { + t.Fatalf("expected source") + } + if ingress == nil { + t.Fatalf("expected ingress for http-raw") + } +} + +func TestBuildSourceStdin(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "stdin" + src, ingress, err := BuildSource(cfg, Deps{Stdin: bytes.NewReader(nil)}) + if err != nil { + t.Fatalf("build source: %v", err) + } + if src == nil { + t.Fatalf("expected source") + } + if ingress != nil { + t.Fatalf("expected no ingress for stdin") + } + if got := src.Descriptor().Kind; got != "stdin-pcm" { + t.Fatalf("source kind=%s", got) + } +} + +func TestBuildSourceIcecastUsesDecoderPreference(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "icecast" + cfg.Ingest.Icecast.URL = "http://localhost:8000/stream" + cfg.Ingest.Icecast.Decoder = "ffmpeg" + src, ingress, err := BuildSource(cfg, Deps{}) + if err != nil { + t.Fatalf("build source: %v", err) + } + if src == nil { + t.Fatalf("expected source") + } + if ingress != nil { + t.Fatalf("expected no ingress for icecast") + } + if got := src.Descriptor().Codec; got != "ffmpeg" { + t.Fatalf("codec=%s want ffmpeg", got) + } +} + +func TestBuildSourceUnsupportedKind(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "nope" + _, _, err := BuildSource(cfg, Deps{}) + if err == nil { + t.Fatalf("expected error") + } +} + +func TestSampleRateForKind(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "stdin" + cfg.Ingest.Stdin.SampleRateHz = 48000 + if got := SampleRateForKind(cfg); got != 48000 { + t.Fatalf("stdin sample rate=%d", got) + } + + cfg.Ingest.Kind = "http-raw" + cfg.Ingest.HTTPRaw.SampleRateHz = 32000 + if got := SampleRateForKind(cfg); got != 32000 { + t.Fatalf("http-raw sample rate=%d", got) + } + + cfg.Ingest.Kind = "icecast" + if got := SampleRateForKind(cfg); got != 44100 { + t.Fatalf("icecast sample rate=%d", got) + } +}