diff --git a/go.mod b/go.mod index 57c4f25..68ef787 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,6 @@ go 1.22 require github.com/jan/fm-rds-tx/internal v0.0.0 +require github.com/hajimehoshi/go-mp3 v0.3.4 // indirect + replace github.com/jan/fm-rds-tx/internal => ./internal diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..fa80656 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/hajimehoshi/go-mp3 v0.3.4 h1:NUP7pBYH8OguP4diaTZ9wJbUbk3tC0KlfzsEpWmYj68= +github.com/hajimehoshi/go-mp3 v0.3.4/go.mod h1:fRtZraRFcWb0pu7ok0LqyFhCUrPeMsGRSVop0eemFmo= +github.com/hajimehoshi/oto/v2 v2.3.1/go.mod h1:seWLbgHH7AyUMYKfKYT9pg7PhUu9/SisyJvNTT+ASQo= +golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/go.mod b/internal/go.mod index 003cad7..8357e61 100644 --- a/internal/go.mod +++ b/internal/go.mod @@ -1,3 +1,5 @@ module github.com/jan/fm-rds-tx/internal go 1.21 + +require github.com/hajimehoshi/go-mp3 v0.3.4 diff --git a/internal/go.sum b/internal/go.sum new file mode 100644 index 0000000..fa80656 --- /dev/null +++ b/internal/go.sum @@ -0,0 +1,4 @@ +github.com/hajimehoshi/go-mp3 v0.3.4 h1:NUP7pBYH8OguP4diaTZ9wJbUbk3tC0KlfzsEpWmYj68= +github.com/hajimehoshi/go-mp3 v0.3.4/go.mod h1:fRtZraRFcWb0pu7ok0LqyFhCUrPeMsGRSVop0eemFmo= +github.com/hajimehoshi/oto/v2 v2.3.1/go.mod h1:seWLbgHH7AyUMYKfKYT9pg7PhUu9/SisyJvNTT+ASQo= +golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/ingest/decoder/mp3/decoder.go b/internal/ingest/decoder/mp3/decoder.go index 93e5c79..2c7d46e 100644 --- a/internal/ingest/decoder/mp3/decoder.go +++ b/internal/ingest/decoder/mp3/decoder.go @@ -2,9 +2,12 @@ package mp3 import ( "context" + "encoding/binary" "fmt" "io" + "time" + gomp3 "github.com/hajimehoshi/go-mp3" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/decoder" ) @@ -15,6 +18,73 @@ func New() *Decoder { return &Decoder{} } func (d *Decoder) Name() string { return "mp3-native" } -func (d *Decoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { - return fmt.Errorf("%w: mp3 native decoder not wired yet", decoder.ErrUnsupported) +func (d *Decoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.StreamMeta, emit func(ingest.PCMChunk) error) error { + if r == nil { + return fmt.Errorf("%w: mp3 decoder stream reader is nil", decoder.ErrUnsupported) + } + if emit == nil { + return fmt.Errorf("%w: mp3 decoder emit callback is nil", decoder.ErrUnsupported) + } + + dec, err := gomp3.NewDecoder(r) + if err != nil { + return fmt.Errorf("%w: mp3 decoder init: %v", decoder.ErrUnsupported, err) + } + + const channels = 2 // go-mp3 always decodes to stereo s16le + sampleRate := dec.SampleRate() + if sampleRate <= 0 { + if meta.SampleRateHz > 0 { + sampleRate = meta.SampleRateHz + } else { + sampleRate = 44100 + } + } + + const chunkFrames = 1024 + const frameBytes = channels * 2 + buf := make([]byte, chunkFrames*frameBytes) + seq := uint64(0) + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + n, readErr := io.ReadAtLeast(dec, buf, frameBytes) + if readErr != nil { + if readErr == io.EOF || readErr == io.ErrUnexpectedEOF { + if n > 0 { + if err := emitChunk(buf[:n], seq, sampleRate, meta.SourceID, emit); err != nil { + return err + } + } + return nil + } + return fmt.Errorf("mp3 decoder read pcm: %w", readErr) + } + + if err := emitChunk(buf[:n], seq, sampleRate, meta.SourceID, emit); err != nil { + return err + } + seq++ + } +} + +func emitChunk(data []byte, seq uint64, sampleRate int, sourceID string, emit func(ingest.PCMChunk) error) error { + samples := make([]int32, 0, len(data)/2) + for i := 0; i+1 < len(data); i += 2 { + v := int16(binary.LittleEndian.Uint16(data[i : i+2])) + samples = append(samples, int32(v)<<16) + } + return emit(ingest.PCMChunk{ + Samples: samples, + Channels: 2, + SampleRateHz: sampleRate, + Sequence: seq, + Timestamp: time.Now(), + SourceID: sourceID, + }) } diff --git a/internal/ingest/decoder/mp3/decoder_test.go b/internal/ingest/decoder/mp3/decoder_test.go new file mode 100644 index 0000000..bdc752b --- /dev/null +++ b/internal/ingest/decoder/mp3/decoder_test.go @@ -0,0 +1,60 @@ +package mp3 + +import ( + "bytes" + "context" + "errors" + "os" + "path/filepath" + "testing" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" +) + +func TestDecodeStream(t *testing.T) { + tonePath := filepath.Join("testdata", "tone_44k_stereo.mp3") + data, err := os.ReadFile(tonePath) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var chunks []ingest.PCMChunk + d := New() + err = d.DecodeStream(context.Background(), bytes.NewReader(data), decoder.StreamMeta{ + ContentType: "audio/mpeg", + SourceID: "mp3-test", + }, func(c ingest.PCMChunk) error { + chunks = append(chunks, c) + return nil + }) + if err != nil { + t.Fatalf("decode: %v", err) + } + if len(chunks) == 0 { + t.Fatal("expected chunks") + } + if chunks[0].Channels != 2 { + t.Fatalf("channels=%d want 2", chunks[0].Channels) + } + if chunks[0].SampleRateHz != 44100 { + t.Fatalf("sampleRate=%d want 44100", chunks[0].SampleRateHz) + } + if len(chunks[0].Samples) == 0 { + t.Fatal("expected samples in first chunk") + } +} + +func TestDecodeStreamNilReader(t *testing.T) { + err := New().DecodeStream(context.Background(), nil, decoder.StreamMeta{}, func(ingest.PCMChunk) error { return nil }) + if !errors.Is(err, decoder.ErrUnsupported) { + t.Fatalf("expected unsupported, got %v", err) + } +} + +func TestDecodeStreamNilEmit(t *testing.T) { + err := New().DecodeStream(context.Background(), bytes.NewReader([]byte("not-mp3")), decoder.StreamMeta{}, nil) + if !errors.Is(err, decoder.ErrUnsupported) { + t.Fatalf("expected unsupported, got %v", err) + } +} diff --git a/internal/ingest/decoder/mp3/testdata/tone_44k_stereo.mp3 b/internal/ingest/decoder/mp3/testdata/tone_44k_stereo.mp3 new file mode 100644 index 0000000..8ad9de3 Binary files /dev/null and b/internal/ingest/decoder/mp3/testdata/tone_44k_stereo.mp3 differ