From 70dd4ab8b8a5010383daeb0b666b42f57f7b7b2b Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 14:17:54 +0200 Subject: [PATCH] ingest: add native mp3 decoder --- go.mod | 2 + go.sum | 4 + internal/go.mod | 2 + internal/go.sum | 4 + internal/ingest/decoder/mp3/decoder.go | 74 +++++++++++++++++- internal/ingest/decoder/mp3/decoder_test.go | 60 ++++++++++++++ .../decoder/mp3/testdata/tone_44k_stereo.mp3 | Bin 0 -> 3386 bytes 7 files changed, 144 insertions(+), 2 deletions(-) create mode 100644 go.sum create mode 100644 internal/go.sum create mode 100644 internal/ingest/decoder/mp3/decoder_test.go create mode 100644 internal/ingest/decoder/mp3/testdata/tone_44k_stereo.mp3 diff --git a/go.mod b/go.mod index 57c4f25..68ef787 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,6 @@ go 1.22 require github.com/jan/fm-rds-tx/internal v0.0.0 +require github.com/hajimehoshi/go-mp3 v0.3.4 // indirect + replace github.com/jan/fm-rds-tx/internal => ./internal diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..fa80656 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/hajimehoshi/go-mp3 v0.3.4 h1:NUP7pBYH8OguP4diaTZ9wJbUbk3tC0KlfzsEpWmYj68= +github.com/hajimehoshi/go-mp3 v0.3.4/go.mod h1:fRtZraRFcWb0pu7ok0LqyFhCUrPeMsGRSVop0eemFmo= +github.com/hajimehoshi/oto/v2 v2.3.1/go.mod h1:seWLbgHH7AyUMYKfKYT9pg7PhUu9/SisyJvNTT+ASQo= +golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/go.mod b/internal/go.mod index 003cad7..8357e61 100644 --- a/internal/go.mod +++ b/internal/go.mod @@ -1,3 +1,5 @@ module github.com/jan/fm-rds-tx/internal go 1.21 + +require github.com/hajimehoshi/go-mp3 v0.3.4 diff --git a/internal/go.sum b/internal/go.sum new file mode 100644 index 0000000..fa80656 --- /dev/null +++ b/internal/go.sum @@ -0,0 +1,4 @@ +github.com/hajimehoshi/go-mp3 v0.3.4 h1:NUP7pBYH8OguP4diaTZ9wJbUbk3tC0KlfzsEpWmYj68= +github.com/hajimehoshi/go-mp3 v0.3.4/go.mod h1:fRtZraRFcWb0pu7ok0LqyFhCUrPeMsGRSVop0eemFmo= +github.com/hajimehoshi/oto/v2 v2.3.1/go.mod h1:seWLbgHH7AyUMYKfKYT9pg7PhUu9/SisyJvNTT+ASQo= +golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/ingest/decoder/mp3/decoder.go b/internal/ingest/decoder/mp3/decoder.go index 93e5c79..2c7d46e 100644 --- a/internal/ingest/decoder/mp3/decoder.go +++ b/internal/ingest/decoder/mp3/decoder.go @@ -2,9 +2,12 @@ package mp3 import ( "context" + "encoding/binary" "fmt" "io" + "time" + gomp3 "github.com/hajimehoshi/go-mp3" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/decoder" ) @@ -15,6 +18,73 @@ func New() *Decoder { return &Decoder{} } func (d *Decoder) Name() string { return "mp3-native" } -func (d *Decoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { - return fmt.Errorf("%w: mp3 native decoder not wired yet", decoder.ErrUnsupported) +func (d *Decoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.StreamMeta, emit func(ingest.PCMChunk) error) error { + if r == nil { + return fmt.Errorf("%w: mp3 decoder stream reader is nil", decoder.ErrUnsupported) + } + if emit == nil { + return fmt.Errorf("%w: mp3 decoder emit callback is nil", decoder.ErrUnsupported) + } + + dec, err := gomp3.NewDecoder(r) + if err != nil { + return fmt.Errorf("%w: mp3 decoder init: %v", decoder.ErrUnsupported, err) + } + + const channels = 2 // go-mp3 always decodes to stereo s16le + sampleRate := dec.SampleRate() + if sampleRate <= 0 { + if meta.SampleRateHz > 0 { + sampleRate = meta.SampleRateHz + } else { + sampleRate = 44100 + } + } + + const chunkFrames = 1024 + const frameBytes = channels * 2 + buf := make([]byte, chunkFrames*frameBytes) + seq := uint64(0) + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + n, readErr := io.ReadAtLeast(dec, buf, frameBytes) + if readErr != nil { + if readErr == io.EOF || readErr == io.ErrUnexpectedEOF { + if n > 0 { + if err := emitChunk(buf[:n], seq, sampleRate, meta.SourceID, emit); err != nil { + return err + } + } + return nil + } + return fmt.Errorf("mp3 decoder read pcm: %w", readErr) + } + + if err := emitChunk(buf[:n], seq, sampleRate, meta.SourceID, emit); err != nil { + return err + } + seq++ + } +} + +func emitChunk(data []byte, seq uint64, sampleRate int, sourceID string, emit func(ingest.PCMChunk) error) error { + samples := make([]int32, 0, len(data)/2) + for i := 0; i+1 < len(data); i += 2 { + v := int16(binary.LittleEndian.Uint16(data[i : i+2])) + samples = append(samples, int32(v)<<16) + } + return emit(ingest.PCMChunk{ + Samples: samples, + Channels: 2, + SampleRateHz: sampleRate, + Sequence: seq, + Timestamp: time.Now(), + SourceID: sourceID, + }) } diff --git a/internal/ingest/decoder/mp3/decoder_test.go b/internal/ingest/decoder/mp3/decoder_test.go new file mode 100644 index 0000000..bdc752b --- /dev/null +++ b/internal/ingest/decoder/mp3/decoder_test.go @@ -0,0 +1,60 @@ +package mp3 + +import ( + "bytes" + "context" + "errors" + "os" + "path/filepath" + "testing" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" +) + +func TestDecodeStream(t *testing.T) { + tonePath := filepath.Join("testdata", "tone_44k_stereo.mp3") + data, err := os.ReadFile(tonePath) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var chunks []ingest.PCMChunk + d := New() + err = d.DecodeStream(context.Background(), bytes.NewReader(data), decoder.StreamMeta{ + ContentType: "audio/mpeg", + SourceID: "mp3-test", + }, func(c ingest.PCMChunk) error { + chunks = append(chunks, c) + return nil + }) + if err != nil { + t.Fatalf("decode: %v", err) + } + if len(chunks) == 0 { + t.Fatal("expected chunks") + } + if chunks[0].Channels != 2 { + t.Fatalf("channels=%d want 2", chunks[0].Channels) + } + if chunks[0].SampleRateHz != 44100 { + t.Fatalf("sampleRate=%d want 44100", chunks[0].SampleRateHz) + } + if len(chunks[0].Samples) == 0 { + t.Fatal("expected samples in first chunk") + } +} + +func TestDecodeStreamNilReader(t *testing.T) { + err := New().DecodeStream(context.Background(), nil, decoder.StreamMeta{}, func(ingest.PCMChunk) error { return nil }) + if !errors.Is(err, decoder.ErrUnsupported) { + t.Fatalf("expected unsupported, got %v", err) + } +} + +func TestDecodeStreamNilEmit(t *testing.T) { + err := New().DecodeStream(context.Background(), bytes.NewReader([]byte("not-mp3")), decoder.StreamMeta{}, nil) + if !errors.Is(err, decoder.ErrUnsupported) { + t.Fatalf("expected unsupported, got %v", err) + } +} diff --git a/internal/ingest/decoder/mp3/testdata/tone_44k_stereo.mp3 b/internal/ingest/decoder/mp3/testdata/tone_44k_stereo.mp3 new file mode 100644 index 0000000000000000000000000000000000000000..8ad9de3483ce8f368af5ceb3110ea872db92a7d3 GIT binary patch literal 3386 zcmeH}dr%YC9)Qp8hIaz;3PA;fgfyi{mPjd3z$k)(0TBcOqHX{~K%NRp3nd8zM4<&l zG(uZQK`NktBBIjpP(cu|0`kya46j(2dV&f~Xd&d%BM z?Qi$EZCr~3i>%)6@Xty=HDa;ZWywF^S5;wrKt5QEPJEl@1lvK?<>tX0UM1%-`RkB$oC&AXfOdIv?B(>H1=-W(mfQB_j_ucs zW7w7Hl`)%NosiQ>Cp^6A6tma_zI?d;B+%+w)06=%15}8K=OVRuE?k16LQiz5s$Nc1 zq9BM%>`X#YTgaMI!|AD1bD+i!ty> zbXUaIih}FtGpl*P0D2fre zK=2C4y_xw%!L>)G8W+6F#2X5WhU`+Zj2e0)diO2_r(f9Tf4c8drE+jEEiVn+ikNZGD!SeRTQ+V z_n7*ueB>oY5>YBwqBhuSS3gQ5GvrnK%36yXmbJ*}=qwb${Sp_-DTj%x0VnSJ!RI6L zU+1OwZ&%%m+TW?xmg|&C6$cazJ`v3m4?8EG+Dhc)_T}1H7Wpv>mHNusS}LtX-toid zK5%&&5}4bRSHN2D;j(t|qviW=Ap;03giNzw18qpP^)~H{#Vw^TQt($74dvy`WjS6= z1={&3g#gMl!Wm?qAumu`M- z^znk3s!R6&$sR9}F#mGaNWeTOMwkE*Y=yu#m?|j8)S(h=lPHzr>d%Xk5(2R^@avd_ z^33nJVO)E27}e3>9=H7Yvu5e92WD?wOuAww1_ zTFYBQ^YX{5?$$Uk8jRye8CVLVIeHJo+SwS?L1UO*2yRxpP}i59p`Aevw8eaL%Djy% z(%m#_4=C&1@>#bhI$uT>H=NW((m&RVani20xL+LY{Eyfn6BkgE1I}Y60vz@}*oB!x z=9pn>ET*RHhp8u)a1wMV*-F2?6Py? zo9hO7$u7KsAz544je|xbpI3>)s^`~OE}ZK}&h|Z@id$B&(%!>44G36ANw6z3tmJ0&@t71Xgf%QVpS2uLOmZ$8iZ)s zveSr%Yn3WoW;=_{_th|XX z`!4!<<+7a=(v_l*sYfH+cokvRgF}J$=ej@f-&1$~z3n%Bij#PG3zNrQ?22$A%)NP3 zB@2F-O6N~y(fN|-N9^L$x(f+wX86)0e`|;g08mHF7~`l&@t5w&29mTVB(X0_gvVw`Ba0S{=)QBEG1euAvPUB8WN5S}H% z2=sLj1cCQIv9=gb-YRkk69I%tf{zLo4avZ>L0)U!?hT3~-XA%nXWOqn@Cv9Z45bPH z&YMi3?AF|t{Fw%a%~cWmt4M`<0* zH-jU)Pj4;K_sD8rw@WJHM-|3)bYBu)mQSf0~vnSmgS)r}8uM9hmCPDdU9jp8r zvxHuQwcBgs-Idf3=WAGHTGjPR2ZDulz^0~|YwiyI+Me=`TgSIO{98lsm`?9BZPCzk opsIbqJ^E9D;>{qCTL8d!N8WUJp@