From a2e36cab153433cd9a4b3f03b826554b07efd168 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 16:14:53 +0200 Subject: [PATCH] ingest: share decoder pcm helpers --- internal/ingest/decoder/helpers.go | 58 ++++++++++++++++++++ internal/ingest/decoder/mp3/decoder.go | 31 +++-------- internal/ingest/decoder/oggvorbis/decoder.go | 43 +++------------ 3 files changed, 74 insertions(+), 58 deletions(-) create mode 100644 internal/ingest/decoder/helpers.go diff --git a/internal/ingest/decoder/helpers.go b/internal/ingest/decoder/helpers.go new file mode 100644 index 0000000..4443038 --- /dev/null +++ b/internal/ingest/decoder/helpers.go @@ -0,0 +1,58 @@ +package decoder + +import ( + "encoding/binary" + "math" + "time" + + "github.com/jan/fm-rds-tx/internal/ingest" +) + +const defaultSampleRateHz = 44100 + +func ResolveSampleRate(decodedSampleRateHz int, meta StreamMeta) int { + if decodedSampleRateHz > 0 { + return decodedSampleRateHz + } + if meta.SampleRateHz > 0 { + return meta.SampleRateHz + } + return defaultSampleRateHz +} + +func BuildChunk(samples []int32, channels, sampleRateHz int, seq uint64, sourceID string) ingest.PCMChunk { + return ingest.PCMChunk{ + Samples: samples, + Channels: channels, + SampleRateHz: sampleRateHz, + Sequence: seq, + Timestamp: time.Now(), + SourceID: sourceID, + } +} + +func PCM16LEToPCM32(in []byte) []int32 { + out := make([]int32, 0, len(in)/2) + for i := 0; i+1 < len(in); i += 2 { + v := int16(binary.LittleEndian.Uint16(in[i : i+2])) + out = append(out, int32(v)<<16) + } + return out +} + +func Float32ToPCM32(in []float32) []int32 { + out := make([]int32, len(in)) + for i, sample := range in { + if sample > 1 { + sample = 1 + } else if sample < -1 { + sample = -1 + } + if sample == -1 { + out[i] = math.MinInt32 + continue + } + out[i] = int32(sample * math.MaxInt32) + } + return out +} diff --git a/internal/ingest/decoder/mp3/decoder.go b/internal/ingest/decoder/mp3/decoder.go index 2c7d46e..4d2a71e 100644 --- a/internal/ingest/decoder/mp3/decoder.go +++ b/internal/ingest/decoder/mp3/decoder.go @@ -2,10 +2,8 @@ package mp3 import ( "context" - "encoding/binary" "fmt" "io" - "time" gomp3 "github.com/hajimehoshi/go-mp3" "github.com/jan/fm-rds-tx/internal/ingest" @@ -32,14 +30,7 @@ func (d *Decoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.St } const channels = 2 // go-mp3 always decodes to stereo s16le - sampleRate := dec.SampleRate() - if sampleRate <= 0 { - if meta.SampleRateHz > 0 { - sampleRate = meta.SampleRateHz - } else { - sampleRate = 44100 - } - } + sampleRate := decoder.ResolveSampleRate(dec.SampleRate(), meta) const chunkFrames = 1024 const frameBytes = channels * 2 @@ -74,17 +65,11 @@ func (d *Decoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.St } func emitChunk(data []byte, seq uint64, sampleRate int, sourceID string, emit func(ingest.PCMChunk) error) error { - samples := make([]int32, 0, len(data)/2) - for i := 0; i+1 < len(data); i += 2 { - v := int16(binary.LittleEndian.Uint16(data[i : i+2])) - samples = append(samples, int32(v)<<16) - } - return emit(ingest.PCMChunk{ - Samples: samples, - Channels: 2, - SampleRateHz: sampleRate, - Sequence: seq, - Timestamp: time.Now(), - SourceID: sourceID, - }) + return emit(decoder.BuildChunk( + decoder.PCM16LEToPCM32(data), + 2, + sampleRate, + seq, + sourceID, + )) } diff --git a/internal/ingest/decoder/oggvorbis/decoder.go b/internal/ingest/decoder/oggvorbis/decoder.go index c3de4da..ef1dca0 100644 --- a/internal/ingest/decoder/oggvorbis/decoder.go +++ b/internal/ingest/decoder/oggvorbis/decoder.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "io" - "math" - "time" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/decoder" @@ -40,14 +38,7 @@ func (d *Decoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.St } } - sampleRate := dec.SampleRate() - if sampleRate <= 0 { - if meta.SampleRateHz > 0 { - sampleRate = meta.SampleRateHz - } else { - sampleRate = 44100 - } - } + sampleRate := decoder.ResolveSampleRate(dec.SampleRate(), meta) const chunkFrames = 1024 buf := make([]float32, chunkFrames*channels) @@ -62,14 +53,13 @@ func (d *Decoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.St n, readErr := dec.Read(buf) if n > 0 { - chunk := ingest.PCMChunk{ - Samples: float32ToPCM32(buf[:n]), - Channels: channels, - SampleRateHz: sampleRate, - Sequence: seq, - Timestamp: time.Now(), - SourceID: meta.SourceID, - } + chunk := decoder.BuildChunk( + decoder.Float32ToPCM32(buf[:n]), + channels, + sampleRate, + seq, + meta.SourceID, + ) if err := emit(chunk); err != nil { return err } @@ -84,20 +74,3 @@ func (d *Decoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.St } } } - -func float32ToPCM32(in []float32) []int32 { - out := make([]int32, len(in)) - for i, sample := range in { - if sample > 1 { - sample = 1 - } else if sample < -1 { - sample = -1 - } - if sample == -1 { - out[i] = math.MinInt32 - continue - } - out[i] = int32(sample * math.MaxInt32) - } - return out -}