From 845bfce15320f8eacc70a477a319f2e60628c2b4 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 14:48:00 +0200 Subject: [PATCH] ingest: add native ogg vorbis decoder --- go.mod | 6 +- go.sum | 4 + internal/go.mod | 7 +- internal/go.sum | 4 + .../ingest/adapters/icecast/source_test.go | 28 ++++++ internal/ingest/decoder/oggvorbis/decoder.go | 87 +++++++++++++++++- .../ingest/decoder/oggvorbis/decoder_test.go | 60 ++++++++++++ .../oggvorbis/testdata/tone_44k_stereo.ogg | Bin 0 -> 4653 bytes 8 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 internal/ingest/decoder/oggvorbis/decoder_test.go create mode 100644 internal/ingest/decoder/oggvorbis/testdata/tone_44k_stereo.ogg diff --git a/go.mod b/go.mod index 68ef787..d553bb4 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,10 @@ go 1.22 require github.com/jan/fm-rds-tx/internal v0.0.0 -require github.com/hajimehoshi/go-mp3 v0.3.4 // indirect +require ( + github.com/hajimehoshi/go-mp3 v0.3.4 // indirect + github.com/jfreymuth/oggvorbis v1.0.5 // indirect + github.com/jfreymuth/vorbis v1.0.2 // indirect +) replace github.com/jan/fm-rds-tx/internal => ./internal diff --git a/go.sum b/go.sum index fa80656..a67c282 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,8 @@ github.com/hajimehoshi/go-mp3 v0.3.4 h1:NUP7pBYH8OguP4diaTZ9wJbUbk3tC0KlfzsEpWmYj68= github.com/hajimehoshi/go-mp3 v0.3.4/go.mod h1:fRtZraRFcWb0pu7ok0LqyFhCUrPeMsGRSVop0eemFmo= github.com/hajimehoshi/oto/v2 v2.3.1/go.mod h1:seWLbgHH7AyUMYKfKYT9pg7PhUu9/SisyJvNTT+ASQo= +github.com/jfreymuth/oggvorbis v1.0.5 h1:u+Ck+R0eLSRhgq8WTmffYnrVtSztJcYrl588DM4e3kQ= +github.com/jfreymuth/oggvorbis v1.0.5/go.mod h1:1U4pqWmghcoVsCJJ4fRBKv9peUJMBHixthRlBeD6uII= +github.com/jfreymuth/vorbis v1.0.2 h1:m1xH6+ZI4thH927pgKD8JOH4eaGRm18rEE9/0WKjvNE= +github.com/jfreymuth/vorbis v1.0.2/go.mod h1:DoftRo4AznKnShRl1GxiTFCseHr4zR9BN3TWXyuzrqQ= golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/go.mod b/internal/go.mod index 8357e61..89df427 100644 --- a/internal/go.mod +++ b/internal/go.mod @@ -2,4 +2,9 @@ module github.com/jan/fm-rds-tx/internal go 1.21 -require github.com/hajimehoshi/go-mp3 v0.3.4 +require ( + github.com/hajimehoshi/go-mp3 v0.3.4 + github.com/jfreymuth/oggvorbis v1.0.5 +) + +require github.com/jfreymuth/vorbis v1.0.2 // indirect diff --git a/internal/go.sum b/internal/go.sum index fa80656..a67c282 100644 --- a/internal/go.sum +++ b/internal/go.sum @@ -1,4 +1,8 @@ github.com/hajimehoshi/go-mp3 v0.3.4 h1:NUP7pBYH8OguP4diaTZ9wJbUbk3tC0KlfzsEpWmYj68= github.com/hajimehoshi/go-mp3 v0.3.4/go.mod h1:fRtZraRFcWb0pu7ok0LqyFhCUrPeMsGRSVop0eemFmo= github.com/hajimehoshi/oto/v2 v2.3.1/go.mod h1:seWLbgHH7AyUMYKfKYT9pg7PhUu9/SisyJvNTT+ASQo= +github.com/jfreymuth/oggvorbis v1.0.5 h1:u+Ck+R0eLSRhgq8WTmffYnrVtSztJcYrl588DM4e3kQ= +github.com/jfreymuth/oggvorbis v1.0.5/go.mod h1:1U4pqWmghcoVsCJJ4fRBKv9peUJMBHixthRlBeD6uII= +github.com/jfreymuth/vorbis v1.0.2 h1:m1xH6+ZI4thH927pgKD8JOH4eaGRm18rEE9/0WKjvNE= +github.com/jfreymuth/vorbis v1.0.2/go.mod h1:DoftRo4AznKnShRl1GxiTFCseHr4zR9BN3TWXyuzrqQ= golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/ingest/adapters/icecast/source_test.go b/internal/ingest/adapters/icecast/source_test.go index ce7798a..84b4572 100644 --- a/internal/ingest/adapters/icecast/source_test.go +++ b/internal/ingest/adapters/icecast/source_test.go @@ -128,6 +128,34 @@ func TestDecodeWithPreferenceAutoUnsupportedContentTypeFallsBack(t *testing.T) { } } +func TestDecodeWithPreferenceAutoUsesOggNativeForOggContentType(t *testing.T) { + ogg := &testDecoder{name: "oggvorbis"} + fallback := &testDecoder{name: "ffmpeg"} + + reg := decoder.NewRegistry() + reg.Register("oggvorbis", func() decoder.Decoder { return ogg }) + reg.Register("ffmpeg", func() decoder.Decoder { return fallback }) + + src := New("ice-test", "http://example", nil, ReconnectConfig{}, + WithDecoderRegistry(reg), + WithDecoderPreference("auto"), + ) + + err := src.decodeWithPreference(context.Background(), bytes.NewReader(nil), decoder.StreamMeta{ + ContentType: "audio/ogg", + SourceID: "ice-test", + }) + if err != nil { + t.Fatalf("decode: %v", err) + } + if ogg.called != 1 { + t.Fatalf("ogg decoder called %d times", ogg.called) + } + if fallback.called != 0 { + t.Fatalf("fallback should not be called, got %d", fallback.called) + } +} + func TestWithDecoderPreferenceFallbackAliasNormalizesToFFmpeg(t *testing.T) { src := New("ice-test", "http://example", nil, ReconnectConfig{}, WithDecoderPreference("fallback")) if got := src.Descriptor().Codec; got != "ffmpeg" { diff --git a/internal/ingest/decoder/oggvorbis/decoder.go b/internal/ingest/decoder/oggvorbis/decoder.go index 0f7affa..c3de4da 100644 --- a/internal/ingest/decoder/oggvorbis/decoder.go +++ b/internal/ingest/decoder/oggvorbis/decoder.go @@ -4,9 +4,12 @@ import ( "context" "fmt" "io" + "math" + "time" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/decoder" + libvorbis "github.com/jfreymuth/oggvorbis" ) type Decoder struct{} @@ -15,6 +18,86 @@ func New() *Decoder { return &Decoder{} } func (d *Decoder) Name() string { return "oggvorbis-native" } -func (d *Decoder) DecodeStream(_ context.Context, _ io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { - return fmt.Errorf("%w: ogg/vorbis native decoder not wired yet", decoder.ErrUnsupported) +func (d *Decoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.StreamMeta, emit func(ingest.PCMChunk) error) error { + if r == nil { + return fmt.Errorf("%w: ogg/vorbis decoder stream reader is nil", decoder.ErrUnsupported) + } + if emit == nil { + return fmt.Errorf("%w: ogg/vorbis decoder emit callback is nil", decoder.ErrUnsupported) + } + + dec, err := libvorbis.NewReader(r) + if err != nil { + return fmt.Errorf("%w: ogg/vorbis decoder init: %v", decoder.ErrUnsupported, err) + } + + channels := dec.Channels() + if channels <= 0 { + if meta.Channels > 0 { + channels = meta.Channels + } else { + return fmt.Errorf("%w: ogg/vorbis decoder invalid channel count", decoder.ErrUnsupported) + } + } + + sampleRate := dec.SampleRate() + if sampleRate <= 0 { + if meta.SampleRateHz > 0 { + sampleRate = meta.SampleRateHz + } else { + sampleRate = 44100 + } + } + + const chunkFrames = 1024 + buf := make([]float32, chunkFrames*channels) + seq := uint64(0) + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + n, readErr := dec.Read(buf) + if n > 0 { + chunk := ingest.PCMChunk{ + Samples: float32ToPCM32(buf[:n]), + Channels: channels, + SampleRateHz: sampleRate, + Sequence: seq, + Timestamp: time.Now(), + SourceID: meta.SourceID, + } + if err := emit(chunk); err != nil { + return err + } + seq++ + } + + if readErr != nil { + if readErr == io.EOF { + return nil + } + return fmt.Errorf("ogg/vorbis decoder read pcm: %w", readErr) + } + } +} + +func float32ToPCM32(in []float32) []int32 { + out := make([]int32, len(in)) + for i, sample := range in { + if sample > 1 { + sample = 1 + } else if sample < -1 { + sample = -1 + } + if sample == -1 { + out[i] = math.MinInt32 + continue + } + out[i] = int32(sample * math.MaxInt32) + } + return out } diff --git a/internal/ingest/decoder/oggvorbis/decoder_test.go b/internal/ingest/decoder/oggvorbis/decoder_test.go new file mode 100644 index 0000000..f1f5d5a --- /dev/null +++ b/internal/ingest/decoder/oggvorbis/decoder_test.go @@ -0,0 +1,60 @@ +package oggvorbis + +import ( + "bytes" + "context" + "errors" + "os" + "path/filepath" + "testing" + + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/decoder" +) + +func TestDecodeStream(t *testing.T) { + tonePath := filepath.Join("testdata", "tone_44k_stereo.ogg") + data, err := os.ReadFile(tonePath) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var chunks []ingest.PCMChunk + d := New() + err = d.DecodeStream(context.Background(), bytes.NewReader(data), decoder.StreamMeta{ + ContentType: "audio/ogg", + SourceID: "ogg-test", + }, func(c ingest.PCMChunk) error { + chunks = append(chunks, c) + return nil + }) + if err != nil { + t.Fatalf("decode: %v", err) + } + if len(chunks) == 0 { + t.Fatal("expected chunks") + } + if chunks[0].Channels != 2 { + t.Fatalf("channels=%d want 2", chunks[0].Channels) + } + if chunks[0].SampleRateHz != 44100 { + t.Fatalf("sampleRate=%d want 44100", chunks[0].SampleRateHz) + } + if len(chunks[0].Samples) == 0 { + t.Fatal("expected samples in first chunk") + } +} + +func TestDecodeStreamNilReader(t *testing.T) { + err := New().DecodeStream(context.Background(), nil, decoder.StreamMeta{}, func(ingest.PCMChunk) error { return nil }) + if !errors.Is(err, decoder.ErrUnsupported) { + t.Fatalf("expected unsupported, got %v", err) + } +} + +func TestDecodeStreamNilEmit(t *testing.T) { + err := New().DecodeStream(context.Background(), bytes.NewReader([]byte("not-ogg")), decoder.StreamMeta{}, nil) + if !errors.Is(err, decoder.ErrUnsupported) { + t.Fatalf("expected unsupported, got %v", err) + } +} diff --git a/internal/ingest/decoder/oggvorbis/testdata/tone_44k_stereo.ogg b/internal/ingest/decoder/oggvorbis/testdata/tone_44k_stereo.ogg new file mode 100644 index 0000000000000000000000000000000000000000..f4176aea98d0f881c012fbc748a56447c661be44 GIT binary patch literal 4653 zcmeHKeND`}rD#jFDpruX+Sc}HwO#hU1hnoyJ$u~0_UxQ_@7D8L-;}b17$10_Gu@?;F?bh zu8DK5hR`2luZ=%fZaUfxxdDG!>Kp4`4xT5N7tG_~^0VN{d_8?rMrPhZ;H3jEBm`&D zbF$OyY6K_ymnmMnf`EhwF;rB}5{t@5i}{ITeTQk{3mfnHikEp5(L$E3>Sn6S{OfDP zC97CFdduVpS9n`gK+)<%WPdGlVWL43un?#c0rX|#eVBOiB@tu+S45tn9p;6)X}%48 zMAJ$Hi5T>a(zVmX?PkqE0SGhZN5*PMf>D~bgUiwtb@JMMw8Z%=ebFsm*F}x1xWuf9 zn*Yd8yIxe_V;_SdiY(KRL|s(v5)sURBxt8-yG4zDT8@A`OH1cxc%w3eYU)_JI+hFO z*Xn}05W<9Ul+&AOXZ}Vz(?WA4PgOV}L_lPai`3OemfngqFcb!+t=@je%I$8o)exI{ zBto%qZj#NEDK@@{=nzRefzq*QsF0%~q^VVQZ? zQJ|cfvrElOTY!rEq)=A56z@~vrfjdVlXqxUUU;+d^G{5by(fW+Id$h+74}rbCT9Z~ z)+@wz?n?cdP3|AbqA&4C^9#8ndN|5+_R~~}$NEOI>WvG?%<|1LX@DJY2P7o>7gJ=B zldrTA|DDvlv+M&kMN&>vW1T*vn|tJ(?X}?U!6WXkx&?P)ivycGW%0T52NO5m6O5QO zAFW8**nCtbWm}9g7aM+bz+O!9%_xh*EvJ>R4^-v3a2oA!(s9=y6!Rmk&tt~3`U9=U zgBj-uHE1AY{y@A$o}YQyl&|=B`>HF&$q%<*h59HzHPMu*c&^%)4u3fvHoJWx>{C?L zIl(Byh)pgN9Ey~b2-|*(YGo|#XPocn#ysH1crA>hNCx=wKJVnXS90TC&K#JZ9RG5% zd`0d`Q|_v^{GsS?B@#~gh8D!0t0;??g4e8&Fygv z>vIf?gXrvBDcvAc+eyy`ni5KF9 zy7Q-+Nj;9CCZ`_Fh=@Wc&HUoQ^+HVY3=4jb#wwQIu%h!1QmnD^#(B?JrV)|6)~ZZa ze_P0tS8x)%pRr;nwZ~$U+d0QylC*PHds|(fv3^fMWTq>uG^)O5c zq0-?*$l4N3OdUcg3Ns4={fm|72lPZG4v5Es0U^UkQJ9?g=Ub79)ElNAAwWa`VHQ5S z!=m`K&1wkz4k!*GlB9(j?IkfcWSi&e_;@^|6jnDBR%eR`!g~1ry6W|2Ow#jGbVUQp z%E2UZKqdA%4!fqI^Cl|Zd60<24#=%EO;H^HIgzfR$qH7&VknY6zWkCr+YGiKS<1Fr z$9@chfd=4+g8Zep;hnl{Gv57O$Aj=bel*1DOZhhn>$1#X0geq!5(jobGC!HFgUMOP zj~a?|$5Nhv(3KsI@D#^pbx9F@8dB4jz@eS)7ScdT zzaW@WF4F~%RPELWpQ=z@h+_MupA(GK8})crB$M98QzN#;miHKCQs`SQWiz!G1OXOf zKZtlClwd=9f7fl|9FYxf83Euw=zHQ6 zRO2vxMMNmu&y~y1VB+VckgCx`N?weKUHr918QJ-{?eB>qVau1b4B`pZpfCc@sH9Cw zy*;{}bNwwo zfm$kHEDiSQ5lpEy4kmG{RqkST^U1`7u_~i6N!Q#+31PD>gK1a?G&fp~(zTqXPE*oG z5ji=21KV_^2y1II$n?Ra8=Ci8$`d!ZEqX*>6&QRe388fwM-*N(^@BN8lwD3feyyGe zFrn+?bit6sL6XZz*9U{g;T}_)p@&TYBa{7&w(Qq-w~k>*&b45kZ2>i@zcE(K2G7w#A@QY(0(|8@sr#M(G}9h2G7vJh%A~Aj zxp20HTK|deev#tsNNSpi$SzCyJN z761`Xgw*OXgdB-ZFQMbt(G3;0#eoAWSO~&qE1`tLu|j z3*RTq8@+kR(Vyr~7IEEc)I?g9@`Tr>f{v-nd)b_PI@pCQ5Pc7OZZ-1o=sTC1k(Hy| zQt+l)TcR(6;0J|@-h^8zQy3o3S5bZarqgE3oJIGaJ%<4y+J3pFs}YfC&zA7$OeETF z&&12VHhqNOp7H)}vG)-&H(Rtwv^e6(xkT2;b%)Q^bB!t`;acE{dfnq+*PKpYbpQ4T zkuZl3M@Em7oO<~A@Voc84t|Y$54^o^JoDm=Z|Z|Mqn^(<#d903zW(u0ZcJtdD%sXy z8?%FlI(RfK+`(6`<&6esbp2egU4(6oi;LT=9Q|c&bmxg*e|^*UwreaRED&l)M(>nC zRHJtIYva}XTVESr_|wgp4$4oV7w>QZ9fF0x@=QdlThoLAp4SpRFq<;;H?Zk z^mQ%VH`O~x69$%!6~B_|GHHPMb4Ml5X&V9qb!!4A4B(gD$)cQJ^Mjw3KI3+BH!;m$bdguJ)- zAOCEB^2_~t>G-A&A7de^c0f}zCyr<$!borzv6tbSb*Scgmk{A5a0ur7+!0+7=6>|a zZ}&#ts$bLYynBlF@3<53QiPBOcA|}r6)$+~dr9bxj$ggE;fy<~A=H!+P7|uv&-iXB zas2&+Q`35aXtxxnAA5c=&Mg1ogvxb_^xd6=NBb6$JLsLE8RpcleACzYe0e0}mOe)@ zkaOP5JTucb+iO>3<(vaKU(U&1ERjrKn`AvPy#GlhbNKVFb=FrE1R=RnlDp-th1d3e z_Bbj(;KRY;tH;*rx8AP|yEe4;y@B7xWIFiNc!s(eUX}dp7 bRh2HiTlXrlBH9p5&neY7gvDZ^N