diff --git a/go.mod b/go.mod index 68ef787..d553bb4 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,10 @@ go 1.22 require github.com/jan/fm-rds-tx/internal v0.0.0 -require github.com/hajimehoshi/go-mp3 v0.3.4 // indirect +require ( + github.com/hajimehoshi/go-mp3 v0.3.4 // indirect + github.com/jfreymuth/oggvorbis v1.0.5 // indirect + github.com/jfreymuth/vorbis v1.0.2 // indirect +) replace github.com/jan/fm-rds-tx/internal => ./internal diff --git a/go.sum b/go.sum index fa80656..a67c282 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,8 @@ github.com/hajimehoshi/go-mp3 v0.3.4 h1:NUP7pBYH8OguP4diaTZ9wJbUbk3tC0KlfzsEpWmYj68= github.com/hajimehoshi/go-mp3 v0.3.4/go.mod h1:fRtZraRFcWb0pu7ok0LqyFhCUrPeMsGRSVop0eemFmo= github.com/hajimehoshi/oto/v2 v2.3.1/go.mod h1:seWLbgHH7AyUMYKfKYT9pg7PhUu9/SisyJvNTT+ASQo= +github.com/jfreymuth/oggvorbis v1.0.5 h1:u+Ck+R0eLSRhgq8WTmffYnrVtSztJcYrl588DM4e3kQ= +github.com/jfreymuth/oggvorbis v1.0.5/go.mod h1:1U4pqWmghcoVsCJJ4fRBKv9peUJMBHixthRlBeD6uII= +github.com/jfreymuth/vorbis v1.0.2 h1:m1xH6+ZI4thH927pgKD8JOH4eaGRm18rEE9/0WKjvNE= +github.com/jfreymuth/vorbis v1.0.2/go.mod h1:DoftRo4AznKnShRl1GxiTFCseHr4zR9BN3TWXyuzrqQ= golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/go.mod b/internal/go.mod index 8357e61..89df427 100644 --- a/internal/go.mod +++ b/internal/go.mod @@ -2,4 +2,9 @@ module github.com/jan/fm-rds-tx/internal go 1.21 -require github.com/hajimehoshi/go-mp3 v0.3.4 +require ( + github.com/hajimehoshi/go-mp3 v0.3.4 + github.com/jfreymuth/oggvorbis v1.0.5 +) + +require github.com/jfreymuth/vorbis v1.0.2 // indirect diff --git a/internal/go.sum b/internal/go.sum index fa80656..a67c282 100644 --- a/internal/go.sum +++ b/internal/go.sum @@ -1,4 +1,8 @@ github.com/hajimehoshi/go-mp3 v0.3.4 h1:NUP7pBYH8OguP4diaTZ9wJbUbk3tC0KlfzsEpWmYj68= github.com/hajimehoshi/go-mp3 v0.3.4/go.mod h1:fRtZraRFcWb0pu7ok0LqyFhCUrPeMsGRSVop0eemFmo= github.com/hajimehoshi/oto/v2 v2.3.1/go.mod h1:seWLbgHH7AyUMYKfKYT9pg7PhUu9/SisyJvNTT+ASQo= +github.com/jfreymuth/oggvorbis v1.0.5 h1:u+Ck+R0eLSRhgq8WTmffYnrVtSztJcYrl588DM4e3kQ= +github.com/jfreymuth/oggvorbis v1.0.5/go.mod h1:1U4pqWmghcoVsCJJ4fRBKv9peUJMBHixthRlBeD6uII= +github.com/jfreymuth/vorbis v1.0.2 h1:m1xH6+ZI4thH927pgKD8JOH4eaGRm18rEE9/0WKjvNE= +github.com/jfreymuth/vorbis v1.0.2/go.mod h1:DoftRo4AznKnShRl1GxiTFCseHr4zR9BN3TWXyuzrqQ= golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/ingest/adapters/icecast/source_test.go b/internal/ingest/adapters/icecast/source_test.go index ce7798a..84b4572 100644 --- a/internal/ingest/adapters/icecast/source_test.go +++ b/internal/ingest/adapters/icecast/source_test.go @@ -128,6 +128,34 @@ func TestDecodeWithPreferenceAutoUnsupportedContentTypeFallsBack(t *testing.T) { } } +func TestDecodeWithPreferenceAutoUsesOggNativeForOggContentType(t *testing.T) { + ogg := &testDecoder{name: "oggvorbis"} + fallback := &testDecoder{name: "ffmpeg"} + + reg := decoder.NewRegistry() + reg.Register("oggvorbis", func() decoder.Decoder { return ogg }) + reg.Register("ffmpeg", func() decoder.Decoder { return fallback }) + + src := New("ice-test", "http://example", nil, ReconnectConfig{}, + WithDecoderRegistry(reg), + WithDecoderPreference("auto"), + ) + + err := src.decodeWithPreference(context.Background(), bytes.NewReader(nil), decoder.StreamMeta{ + ContentType: "audio/ogg", + SourceID: "ice-test", + }) + if err != nil { + t.Fatalf("decode: %v", err) + } + if ogg.called != 1 { + t.Fatalf("ogg decoder called %d times", ogg.called) + } + if fallback.called != 0 { + t.Fatalf("fallback should not be called, got %d", fallback.called) + } +} + func TestWithDecoderPreferenceFallbackAliasNormalizesToFFmpeg(t *testing.T) { src := New("ice-test", "http://example", nil, ReconnectConfig{}, WithDecoderPreference("fallback")) if got := src.Descriptor().Codec; got != "ffmpeg" { diff --git a/internal/ingest/decoder/oggvorbis/decoder.go b/internal/ingest/decoder/oggvorbis/decoder.go index 0f7affa..c3de4da 100644 --- a/internal/ingest/decoder/oggvorbis/decoder.go +++ b/internal/ingest/decoder/oggvorbis/decoder.go @@ -4,9 +4,12 @@ import ( "context" "fmt" "io" + "math" + "time" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/decoder" + libvorbis "github.com/jfreymuth/oggvorbis" ) type Decoder struct{} @@ -15,6 +18,86 @@ func New() *Decoder { return &Decoder{} } func (d *Decoder) Name() string { return "oggvorbis-native" } -func (d *Decoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { - return fmt.Errorf("%w: ogg/vorbis native decoder not wired yet", decoder.ErrUnsupported) +func (d *Decoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.StreamMeta, emit func(ingest.PCMChunk) error) error { + if r == nil { + return fmt.Errorf("%w: ogg/vorbis decoder stream reader is nil", decoder.ErrUnsupported) + } + if emit == nil { + return fmt.Errorf("%w: ogg/vorbis decoder emit callback is nil", decoder.ErrUnsupported) + } + + dec, err := libvorbis.NewReader(r) + if err != nil { + return fmt.Errorf("%w: ogg/vorbis decoder init: %v", decoder.ErrUnsupported, err) + } + + channels := dec.Channels() + if channels <= 0 { + if meta.Channels > 0 { + channels = meta.Channels + } else { + return fmt.Errorf("%w: ogg/vorbis decoder invalid channel count", decoder.ErrUnsupported) + } + } + + sampleRate := dec.SampleRate() + if sampleRate <= 0 { + if meta.SampleRateHz > 0 { + sampleRate = meta.SampleRateHz + } else { + sampleRate = 44100 + } + } + + const chunkFrames = 1024 + buf := make([]float32, chunkFrames*channels) + seq := uint64(0) + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + n, readErr := dec.Read(buf) + if n > 0 { + chunk := ingest.PCMChunk{ + Samples: float32ToPCM32(buf[:n]), + Channels: channels, + SampleRateHz: sampleRate, + Sequence: seq, + Timestamp: time.Now(), + SourceID: meta.SourceID, + } + if err := emit(chunk); err != nil { + return err + } + seq++ + } + + if readErr != nil { + if readErr == io.EOF { + return nil + } + return fmt.Errorf("ogg/vorbis decoder read pcm: %w", readErr) + } + } +} + +func float32ToPCM32(in []float32) []int32 { + out := make([]int32, len(in)) + for i, sample := range in { + if sample > 1 { + sample = 1 + } else if sample < -1 { + sample = -1 + } + if sample == -1 { + out[i] = math.MinInt32 + continue + } + out[i] = int32(sample * math.MaxInt32) + } + return out } diff --git a/internal/ingest/decoder/oggvorbis/decoder_test.go b/internal/ingest/decoder/oggvorbis/decoder_test.go new file mode 100644 index 0000000..f1f5d5a --- /dev/null +++ b/internal/ingest/decoder/oggvorbis/decoder_test.go @@ -0,0 +1,60 @@ +package oggvorbis + +import ( + "bytes" + "context" + "errors" + "os" + "path/filepath" + "testing" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" +) + +func TestDecodeStream(t *testing.T) { + tonePath := filepath.Join("testdata", "tone_44k_stereo.ogg") + data, err := os.ReadFile(tonePath) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var chunks []ingest.PCMChunk + d := New() + err = d.DecodeStream(context.Background(), bytes.NewReader(data), decoder.StreamMeta{ + ContentType: "audio/ogg", + SourceID: "ogg-test", + }, func(c ingest.PCMChunk) error { + chunks = append(chunks, c) + return nil + }) + if err != nil { + t.Fatalf("decode: %v", err) + } + if len(chunks) == 0 { + t.Fatal("expected chunks") + } + if chunks[0].Channels != 2 { + t.Fatalf("channels=%d want 2", chunks[0].Channels) + } + if chunks[0].SampleRateHz != 44100 { + t.Fatalf("sampleRate=%d want 44100", chunks[0].SampleRateHz) + } + if len(chunks[0].Samples) == 0 { + t.Fatal("expected samples in first chunk") + } +} + +func TestDecodeStreamNilReader(t *testing.T) { + err := New().DecodeStream(context.Background(), nil, decoder.StreamMeta{}, func(ingest.PCMChunk) error { return nil }) + if !errors.Is(err, decoder.ErrUnsupported) { + t.Fatalf("expected unsupported, got %v", err) + } +} + +func TestDecodeStreamNilEmit(t *testing.T) { + err := New().DecodeStream(context.Background(), bytes.NewReader([]byte("not-ogg")), decoder.StreamMeta{}, nil) + if !errors.Is(err, decoder.ErrUnsupported) { + t.Fatalf("expected unsupported, got %v", err) + } +} diff --git a/internal/ingest/decoder/oggvorbis/testdata/tone_44k_stereo.ogg b/internal/ingest/decoder/oggvorbis/testdata/tone_44k_stereo.ogg new file mode 100644 index 0000000..f4176ae Binary files /dev/null and b/internal/ingest/decoder/oggvorbis/testdata/tone_44k_stereo.ogg differ