From 8d02c57348ae40d3d1133058c84c119bd9114cb7 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 08:01:30 +0200 Subject: [PATCH] ingest: add stdin PCM source adapter --- internal/ingest/adapters/stdinpcm/source.go | 180 ++++++++++++++++++ .../ingest/adapters/stdinpcm/source_test.go | 33 ++++ 2 files changed, 213 insertions(+) create mode 100644 internal/ingest/adapters/stdinpcm/source.go create mode 100644 internal/ingest/adapters/stdinpcm/source_test.go diff --git a/internal/ingest/adapters/stdinpcm/source.go b/internal/ingest/adapters/stdinpcm/source.go new file mode 100644 index 0000000..5785928 --- /dev/null +++ b/internal/ingest/adapters/stdinpcm/source.go @@ -0,0 +1,180 @@ +package stdinpcm + +import ( + "context" + "encoding/binary" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/jan/fm-rds-tx/internal/ingest" +) + +type Source struct { + id string + reader io.Reader + sampleRate int + channels int + chunkFrames int + + chunks chan ingest.PCMChunk + errs chan error + + cancel context.CancelFunc + wg sync.WaitGroup + + state atomic.Value // string + chunksIn atomic.Uint64 + samplesIn atomic.Uint64 + discontinuities atomic.Uint64 + lastChunkAtUnix atomic.Int64 + lastError atomic.Value // string +} + +func New(id string, reader io.Reader, sampleRate, channels, chunkFrames int) *Source { + if id == "" { + id = "stdin" + } + if sampleRate <= 0 { + sampleRate = 44100 + } + if channels <= 0 { + channels = 2 + } + if chunkFrames <= 0 { + chunkFrames = 1024 + } + + s := &Source{ + id: id, + reader: reader, + sampleRate: sampleRate, + channels: channels, + chunkFrames: chunkFrames, + chunks: make(chan ingest.PCMChunk, 8), + errs: make(chan error, 4), + } + s.state.Store("idle") + return s +} + +func (s *Source) Descriptor() ingest.SourceDescriptor { + return ingest.SourceDescriptor{ + ID: s.id, + Kind: "stdin-pcm", + Family: "raw", + Transport: "stdin", + Codec: "pcm_s16le", + Channels: s.channels, + SampleRateHz: s.sampleRate, + Detail: "S16LE interleaved PCM via stdin", + } +} + +func (s *Source) Start(ctx context.Context) error { + if s.reader == nil { + return fmt.Errorf("stdin source reader is nil") + } + runCtx, cancel := context.WithCancel(ctx) + s.cancel = cancel + s.state.Store("running") + + s.wg.Add(1) + go s.readLoop(runCtx) + return nil +} + +func (s *Source) Stop() error { + if s.cancel != nil { + s.cancel() + } + s.wg.Wait() + s.state.Store("stopped") + return nil +} + +func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks } +func (s *Source) Errors() <-chan error { return s.errs } + +func (s *Source) Stats() ingest.SourceStats { + state, _ := s.state.Load().(string) + last := s.lastChunkAtUnix.Load() + errStr, _ := s.lastError.Load().(string) + var lastChunkAt time.Time + if last > 0 { + lastChunkAt = time.Unix(0, last) + } + return ingest.SourceStats{ + State: state, + Connected: state == "running", + LastChunkAt: lastChunkAt, + ChunksIn: s.chunksIn.Load(), + SamplesIn: s.samplesIn.Load(), + Discontinuities: s.discontinuities.Load(), + LastError: errStr, + } +} + +func (s *Source) readLoop(ctx context.Context) { + defer s.wg.Done() + defer close(s.chunks) + + frameBytes := s.channels * 2 + buf := make([]byte, s.chunkFrames*frameBytes) + seq := uint64(0) + + for { + select { + case <-ctx.Done(): + return + default: + } + + n, err := io.ReadAtLeast(s.reader, buf, frameBytes) + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + if n > 0 { + s.emitChunk(buf[:n], seq) + } + s.state.Store("stopped") + return + } + wrapped := fmt.Errorf("stdin read: %w", err) + s.lastError.Store(wrapped.Error()) + s.state.Store("failed") + select { + case s.errs <- wrapped: + default: + } + return + } + s.emitChunk(buf[:n], seq) + seq++ + } +} + +func (s *Source) emitChunk(data []byte, seq uint64) { + samples := make([]int32, 0, len(data)/2) + for i := 0; i+1 < len(data); i += 2 { + v := int16(binary.LittleEndian.Uint16(data[i : i+2])) + samples = append(samples, int32(v)<<16) + } + chunk := ingest.PCMChunk{ + Samples: samples, + Channels: s.channels, + SampleRateHz: s.sampleRate, + Sequence: seq, + Timestamp: time.Now(), + SourceID: s.id, + } + s.chunksIn.Add(1) + s.samplesIn.Add(uint64(len(samples))) + s.lastChunkAtUnix.Store(time.Now().UnixNano()) + select { + case s.chunks <- chunk: + default: + s.discontinuities.Add(1) + } +} diff --git a/internal/ingest/adapters/stdinpcm/source_test.go b/internal/ingest/adapters/stdinpcm/source_test.go new file mode 100644 index 0000000..7331d99 --- /dev/null +++ b/internal/ingest/adapters/stdinpcm/source_test.go @@ -0,0 +1,33 @@ +package stdinpcm + +import ( + "bytes" + "context" + "testing" + "time" +) + +func TestSourceReadsPCMChunks(t *testing.T) { + // Two stereo frames (S16LE): [0,0] and [32767,-32768] + raw := []byte{ + 0x00, 0x00, 0x00, 0x00, + 0xff, 0x7f, 0x00, 0x80, + } + src := New("stdin-test", bytes.NewReader(raw), 44100, 2, 2) + if err := src.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer src.Stop() + + select { + case chunk := <-src.Chunks(): + if chunk.Channels != 2 { + t.Fatalf("channels=%d", chunk.Channels) + } + if len(chunk.Samples) != 4 { + t.Fatalf("samples=%d want 4", len(chunk.Samples)) + } + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for chunk") + } +}