Преглед на файлове

ingest: centralize source factory and wire icecast decoder fallback

main
Jan преди 1 месец
родител
ревизия
58676ba6e2
променени са 6 файла, в които са добавени 516 реда и са изтрити 42 реда
  1. +92
    -17
      internal/ingest/adapters/icecast/source.go
  2. +107
    -0
      internal/ingest/adapters/icecast/source_test.go
  3. +139
    -2
      internal/ingest/decoder/fallback/ffmpeg.go
  4. +0
    -23
      internal/ingest/factory.go
  5. +76
    -0
      internal/ingest/factory/factory.go
  6. +102
    -0
      internal/ingest/factory/factory_test.go

+ 92
- 17
internal/ingest/adapters/icecast/source.go Целия файл

@@ -2,6 +2,7 @@ package icecast

import (
"context"
"errors"
"fmt"
"io"
"net/http"
@@ -13,6 +14,7 @@ import (
"github.com/jan/fm-rds-tx/internal/ingest"
"github.com/jan/fm-rds-tx/internal/ingest/decoder"
"github.com/jan/fm-rds-tx/internal/ingest/decoder/aac"
"github.com/jan/fm-rds-tx/internal/ingest/decoder/fallback"
"github.com/jan/fm-rds-tx/internal/ingest/decoder/mp3"
"github.com/jan/fm-rds-tx/internal/ingest/decoder/oggvorbis"
)
@@ -25,6 +27,8 @@ type Source struct {
decReg *decoder.Registry
reconn ReconnectConfig

decoderPreference string

chunks chan ingest.PCMChunk
errs chan error

@@ -41,7 +45,23 @@ type Source struct {
lastError atomic.Value // string
}

func New(id, url string, client *http.Client, reconn ReconnectConfig) *Source {
type Option func(*Source)

func WithDecoderPreference(pref string) Option {
return func(s *Source) {
s.decoderPreference = normalizeDecoderPreference(pref)
}
}

func WithDecoderRegistry(reg *decoder.Registry) Option {
return func(s *Source) {
if reg != nil {
s.decReg = reg
}
}
}

func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Option) *Source {
if id == "" {
id = "icecast-main"
}
@@ -49,14 +69,21 @@ func New(id, url string, client *http.Client, reconn ReconnectConfig) *Source {
client = &http.Client{Timeout: 20 * time.Second}
}
s := &Source{
id: id,
url: strings.TrimSpace(url),
client: client,
reconn: reconn,
chunks: make(chan ingest.PCMChunk, 64),
errs: make(chan error, 8),
decReg: defaultRegistry(),
id: id,
url: strings.TrimSpace(url),
client: client,
reconn: reconn,
chunks: make(chan ingest.PCMChunk, 64),
errs: make(chan error, 8),
decReg: defaultRegistry(),
decoderPreference: "auto",
}
for _, opt := range opts {
if opt != nil {
opt(s)
}
}
s.decoderPreference = normalizeDecoderPreference(s.decoderPreference)
s.state.Store("idle")
return s
}
@@ -66,6 +93,7 @@ func defaultRegistry() *decoder.Registry {
r.Register("mp3", func() decoder.Decoder { return mp3.New() })
r.Register("oggvorbis", func() decoder.Decoder { return oggvorbis.New() })
r.Register("aac", func() decoder.Decoder { return aac.New() })
r.Register("ffmpeg", func() decoder.Decoder { return fallback.NewFFmpeg() })
return r
}

@@ -75,7 +103,7 @@ func (s *Source) Descriptor() ingest.SourceDescriptor {
Kind: "icecast",
Family: "streaming",
Transport: "http",
Codec: "auto",
Codec: s.decoderPreference,
Detail: s.url,
}
}
@@ -179,15 +207,13 @@ func (s *Source) connectAndRun(ctx context.Context) error {
s.connected.Store(true)
s.state.Store("buffering")

dec, err := s.decReg.SelectByContentType(resp.Header.Get("Content-Type"))
if err != nil {
return fmt.Errorf("icecast decoder select: %w", err)
}
s.state.Store("running")
return dec.DecodeStream(ctx, resp.Body, decoder.StreamMeta{
ContentType: resp.Header.Get("Content-Type"),
SourceID: s.id,
}, s.emitChunk)
return s.decodeWithPreference(ctx, resp.Body, decoder.StreamMeta{
ContentType: resp.Header.Get("Content-Type"),
SourceID: s.id,
SampleRateHz: 44100,
Channels: 2,
})
}

func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
@@ -202,3 +228,52 @@ func (s *Source) emitChunk(chunk ingest.PCMChunk) error {
s.lastChunkAtUnix.Store(time.Now().UnixNano())
return nil
}

func (s *Source) decodeWithPreference(ctx context.Context, stream io.Reader, meta decoder.StreamMeta) error {
mode := normalizeDecoderPreference(s.decoderPreference)
switch mode {
case "ffmpeg":
return s.decodeNamed(ctx, "ffmpeg", stream, meta)
case "native":
native, err := s.decReg.SelectByContentType(meta.ContentType)
if err != nil {
return fmt.Errorf("icecast native decoder select: %w", err)
}
return native.DecodeStream(ctx, stream, meta, s.emitChunk)
case "auto":
native, err := s.decReg.SelectByContentType(meta.ContentType)
if err == nil {
if err := native.DecodeStream(ctx, stream, meta, s.emitChunk); err == nil {
return nil
} else if !errors.Is(err, decoder.ErrUnsupported) {
return err
}
} else if !errors.Is(err, decoder.ErrUnsupported) {
return fmt.Errorf("icecast decoder select: %w", err)
}
return s.decodeNamed(ctx, "ffmpeg", stream, meta)
default:
return fmt.Errorf("unsupported icecast decoder mode: %s", mode)
}
}

func (s *Source) decodeNamed(ctx context.Context, name string, stream io.Reader, meta decoder.StreamMeta) error {
dec, err := s.decReg.Create(name)
if err != nil {
return fmt.Errorf("icecast decoder=%s unavailable: %w", name, err)
}
return dec.DecodeStream(ctx, stream, meta, s.emitChunk)
}

func normalizeDecoderPreference(pref string) string {
switch strings.ToLower(strings.TrimSpace(pref)) {
case "", "auto":
return "auto"
case "native":
return "native"
case "ffmpeg", "fallback":
return "ffmpeg"
default:
return strings.ToLower(strings.TrimSpace(pref))
}
}

+ 107
- 0
internal/ingest/adapters/icecast/source_test.go Целия файл

@@ -0,0 +1,107 @@
package icecast

import (
"bytes"
"context"
"errors"
"io"
"testing"

"github.com/jan/fm-rds-tx/internal/ingest"
"github.com/jan/fm-rds-tx/internal/ingest/decoder"
)

type testDecoder struct {
name string
err error
called int
}

func (d *testDecoder) Name() string { return d.name }

func (d *testDecoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error {
d.called++
return d.err
}

func TestDecodeWithPreferenceAutoFallsBackFromNativeUnsupported(t *testing.T) {
native := &testDecoder{name: "native", err: decoder.ErrUnsupported}
fallback := &testDecoder{name: "ffmpeg"}

reg := decoder.NewRegistry()
reg.Register("mp3", func() decoder.Decoder { return native })
reg.Register("ffmpeg", func() decoder.Decoder { return fallback })

src := New("ice-test", "http://example", nil, ReconnectConfig{},
WithDecoderRegistry(reg),
WithDecoderPreference("auto"),
)

err := src.decodeWithPreference(context.Background(), bytes.NewReader(nil), decoder.StreamMeta{
ContentType: "audio/mpeg",
SourceID: "ice-test",
})
if err != nil {
t.Fatalf("decode: %v", err)
}
if native.called != 1 {
t.Fatalf("native called %d times", native.called)
}
if fallback.called != 1 {
t.Fatalf("fallback called %d times", fallback.called)
}
}

func TestDecodeWithPreferenceNativeDoesNotFallback(t *testing.T) {
nativeErr := errors.New("decode failed")
native := &testDecoder{name: "native", err: nativeErr}
fallback := &testDecoder{name: "ffmpeg"}

reg := decoder.NewRegistry()
reg.Register("mp3", func() decoder.Decoder { return native })
reg.Register("ffmpeg", func() decoder.Decoder { return fallback })

src := New("ice-test", "http://example", nil, ReconnectConfig{},
WithDecoderRegistry(reg),
WithDecoderPreference("native"),
)

err := src.decodeWithPreference(context.Background(), bytes.NewReader(nil), decoder.StreamMeta{
ContentType: "audio/mpeg",
SourceID: "ice-test",
})
if !errors.Is(err, nativeErr) {
t.Fatalf("expected native error, got %v", err)
}
if fallback.called != 0 {
t.Fatalf("fallback should not be called, got %d", fallback.called)
}
}

func TestDecodeWithPreferenceFFmpegOnly(t *testing.T) {
native := &testDecoder{name: "native"}
fallback := &testDecoder{name: "ffmpeg"}

reg := decoder.NewRegistry()
reg.Register("mp3", func() decoder.Decoder { return native })
reg.Register("ffmpeg", func() decoder.Decoder { return fallback })

src := New("ice-test", "http://example", nil, ReconnectConfig{},
WithDecoderRegistry(reg),
WithDecoderPreference("ffmpeg"),
)

err := src.decodeWithPreference(context.Background(), bytes.NewReader(nil), decoder.StreamMeta{
ContentType: "audio/mpeg",
SourceID: "ice-test",
})
if err != nil {
t.Fatalf("decode: %v", err)
}
if native.called != 0 {
t.Fatalf("native should not be called in ffmpeg mode, got %d", native.called)
}
if fallback.called != 1 {
t.Fatalf("fallback called %d times", fallback.called)
}
}

+ 139
- 2
internal/ingest/decoder/fallback/ffmpeg.go Целия файл

@@ -2,8 +2,14 @@ package fallback

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"os/exec"
"strings"
"sync"
"time"

"github.com/jan/fm-rds-tx/internal/ingest"
"github.com/jan/fm-rds-tx/internal/ingest/decoder"
@@ -15,6 +21,137 @@ func NewFFmpeg() *FFmpegDecoder { return &FFmpegDecoder{} }

func (d *FFmpegDecoder) Name() string { return "ffmpeg-fallback" }

func (d *FFmpegDecoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error {
return fmt.Errorf("%w: ffmpeg fallback decoder not wired yet", decoder.ErrUnsupported)
func (d *FFmpegDecoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.StreamMeta, emit func(ingest.PCMChunk) error) error {
if r == nil {
return fmt.Errorf("%w: ffmpeg decoder stream reader is nil", decoder.ErrUnsupported)
}
if emit == nil {
return fmt.Errorf("%w: ffmpeg decoder emit callback is nil", decoder.ErrUnsupported)
}

sampleRate := meta.SampleRateHz
if sampleRate <= 0 {
sampleRate = 44100
}
channels := meta.Channels
if channels <= 0 {
channels = 2
}

cmd := exec.CommandContext(ctx,
"ffmpeg",
"-hide_banner", "-loglevel", "error",
"-i", "pipe:0",
"-f", "s16le",
"-acodec", "pcm_s16le",
"-ac", fmt.Sprintf("%d", channels),
"-ar", fmt.Sprintf("%d", sampleRate),
"pipe:1",
)

stdin, err := cmd.StdinPipe()
if err != nil {
return fmt.Errorf("ffmpeg stdin pipe: %w", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("ffmpeg stdout pipe: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("ffmpeg stderr pipe: %w", err)
}

if err := cmd.Start(); err != nil {
if errorsIsNotFound(err) {
return fmt.Errorf("%w: ffmpeg executable not found in PATH", decoder.ErrUnsupported)
}
return fmt.Errorf("ffmpeg start: %w", err)
}

errCh := make(chan error, 2)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, copyErr := io.Copy(stdin, r)
_ = stdin.Close()
if copyErr != nil && ctx.Err() == nil {
errCh <- fmt.Errorf("ffmpeg stdin copy: %w", copyErr)
}
}()

stderrData, _ := io.ReadAll(stderr)
readErr := d.readPCM(ctx, stdout, sampleRate, channels, meta.SourceID, emit)
waitErr := cmd.Wait()
wg.Wait()
close(errCh)

for e := range errCh {
if e != nil {
return e
}
}
if readErr != nil {
return readErr
}
if waitErr != nil && ctx.Err() == nil {
msg := strings.TrimSpace(string(stderrData))
if msg != "" {
return fmt.Errorf("ffmpeg decode: %w (%s)", waitErr, msg)
}
return fmt.Errorf("ffmpeg decode: %w", waitErr)
}
return nil
}

func (d *FFmpegDecoder) readPCM(ctx context.Context, r io.Reader, sampleRate, channels int, sourceID string, emit func(ingest.PCMChunk) error) error {
const chunkFrames = 1024
frameBytes := channels * 2
buf := make([]byte, chunkFrames*frameBytes)
seq := uint64(0)
for {
select {
case <-ctx.Done():
return nil
default:
}
n, err := io.ReadAtLeast(r, buf, frameBytes)
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
if n > 0 {
if emitErr := emitPCM(buf[:n], seq, sampleRate, channels, sourceID, emit); emitErr != nil {
return emitErr
}
}
return nil
}
return fmt.Errorf("ffmpeg read pcm: %w", err)
}
if emitErr := emitPCM(buf[:n], seq, sampleRate, channels, sourceID, emit); emitErr != nil {
return emitErr
}
seq++
}
}

func emitPCM(data []byte, seq uint64, sampleRate, channels int, sourceID string, emit func(ingest.PCMChunk) error) error {
samples := make([]int32, 0, len(data)/2)
for i := 0; i+1 < len(data); i += 2 {
v := int16(binary.LittleEndian.Uint16(data[i : i+2]))
samples = append(samples, int32(v)<<16)
}
return emit(ingest.PCMChunk{
Samples: samples,
Channels: channels,
SampleRateHz: sampleRate,
Sequence: seq,
Timestamp: time.Now(),
SourceID: sourceID,
})
}

func errorsIsNotFound(err error) bool {
var execErr *exec.Error
return err != nil && (errors.As(err, &execErr) || strings.Contains(strings.ToLower(err.Error()), "executable file not found"))
}

+ 0
- 23
internal/ingest/factory.go Целия файл

@@ -1,23 +0,0 @@
package ingest

import (
"fmt"
"io"
"net/http"

"github.com/jan/fm-rds-tx/internal/config"
)

type FactoryDeps struct {
Stdin io.Reader
HTTP *http.Client
}

func BuildSource(cfg config.Config, deps FactoryDeps) (Source, error) {
switch cfg.Ingest.Kind {
case "", "none":
return nil, nil
default:
return nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind)
}
}

+ 76
- 0
internal/ingest/factory/factory.go Целия файл

@@ -0,0 +1,76 @@
package factory

import (
"fmt"
"io"
"net/http"
"os"
"strings"

"github.com/jan/fm-rds-tx/internal/config"
"github.com/jan/fm-rds-tx/internal/ingest"
"github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw"
"github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast"
"github.com/jan/fm-rds-tx/internal/ingest/adapters/stdinpcm"
)

type Deps struct {
Stdin io.Reader
HTTP *http.Client
}

type AudioIngress interface {
WritePCM16(data []byte) (int, error)
}

func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, error) {
switch normalizeIngestKind(cfg.Ingest.Kind) {
case "", "none":
return nil, nil, nil
case "stdin", "stdin-pcm":
reader := deps.Stdin
if reader == nil {
reader = os.Stdin
}
src := stdinpcm.New("stdin-main", reader, cfg.Ingest.Stdin.SampleRateHz, cfg.Ingest.Stdin.Channels, 1024)
return src, nil, nil
case "http-raw":
src := httpraw.New("http-raw-main", cfg.Ingest.HTTPRaw.SampleRateHz, cfg.Ingest.HTTPRaw.Channels)
return src, src, nil
case "icecast":
src := icecast.New(
"icecast-main",
cfg.Ingest.Icecast.URL,
deps.HTTP,
icecast.ReconnectConfig{
Enabled: cfg.Ingest.Reconnect.Enabled,
InitialBackoffMs: cfg.Ingest.Reconnect.InitialBackoffMs,
MaxBackoffMs: cfg.Ingest.Reconnect.MaxBackoffMs,
},
icecast.WithDecoderPreference(cfg.Ingest.Icecast.Decoder),
)
return src, nil, nil
default:
return nil, nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind)
}
}

func SampleRateForKind(cfg config.Config) int {
switch normalizeIngestKind(cfg.Ingest.Kind) {
case "stdin", "stdin-pcm":
if cfg.Ingest.Stdin.SampleRateHz > 0 {
return cfg.Ingest.Stdin.SampleRateHz
}
case "http-raw":
if cfg.Ingest.HTTPRaw.SampleRateHz > 0 {
return cfg.Ingest.HTTPRaw.SampleRateHz
}
case "icecast":
return 44100
}
return 44100
}

func normalizeIngestKind(kind string) string {
return strings.ToLower(strings.TrimSpace(kind))
}

+ 102
- 0
internal/ingest/factory/factory_test.go Целия файл

@@ -0,0 +1,102 @@
package factory

import (
"bytes"
"testing"

"github.com/jan/fm-rds-tx/internal/config"
)

func TestBuildSourceNone(t *testing.T) {
cfg := config.Default()
cfg.Ingest.Kind = "none"
src, ingress, err := BuildSource(cfg, Deps{})
if err != nil {
t.Fatalf("build source: %v", err)
}
if src != nil || ingress != nil {
t.Fatalf("expected nil source and ingress for kind=none")
}
}

func TestBuildSourceHTTPRawProvidesIngress(t *testing.T) {
cfg := config.Default()
cfg.Ingest.Kind = "http-raw"
src, ingress, err := BuildSource(cfg, Deps{})
if err != nil {
t.Fatalf("build source: %v", err)
}
if src == nil {
t.Fatalf("expected source")
}
if ingress == nil {
t.Fatalf("expected ingress for http-raw")
}
}

func TestBuildSourceStdin(t *testing.T) {
cfg := config.Default()
cfg.Ingest.Kind = "stdin"
src, ingress, err := BuildSource(cfg, Deps{Stdin: bytes.NewReader(nil)})
if err != nil {
t.Fatalf("build source: %v", err)
}
if src == nil {
t.Fatalf("expected source")
}
if ingress != nil {
t.Fatalf("expected no ingress for stdin")
}
if got := src.Descriptor().Kind; got != "stdin-pcm" {
t.Fatalf("source kind=%s", got)
}
}

func TestBuildSourceIcecastUsesDecoderPreference(t *testing.T) {
cfg := config.Default()
cfg.Ingest.Kind = "icecast"
cfg.Ingest.Icecast.URL = "http://localhost:8000/stream"
cfg.Ingest.Icecast.Decoder = "ffmpeg"
src, ingress, err := BuildSource(cfg, Deps{})
if err != nil {
t.Fatalf("build source: %v", err)
}
if src == nil {
t.Fatalf("expected source")
}
if ingress != nil {
t.Fatalf("expected no ingress for icecast")
}
if got := src.Descriptor().Codec; got != "ffmpeg" {
t.Fatalf("codec=%s want ffmpeg", got)
}
}

func TestBuildSourceUnsupportedKind(t *testing.T) {
cfg := config.Default()
cfg.Ingest.Kind = "nope"
_, _, err := BuildSource(cfg, Deps{})
if err == nil {
t.Fatalf("expected error")
}
}

func TestSampleRateForKind(t *testing.T) {
cfg := config.Default()
cfg.Ingest.Kind = "stdin"
cfg.Ingest.Stdin.SampleRateHz = 48000
if got := SampleRateForKind(cfg); got != 48000 {
t.Fatalf("stdin sample rate=%d", got)
}

cfg.Ingest.Kind = "http-raw"
cfg.Ingest.HTTPRaw.SampleRateHz = 32000
if got := SampleRateForKind(cfg); got != 32000 {
t.Fatalf("http-raw sample rate=%d", got)
}

cfg.Ingest.Kind = "icecast"
if got := SampleRateForKind(cfg); got != 44100 {
t.Fatalf("icecast sample rate=%d", got)
}
}

Loading…
Отказ
Запис