package ingest import ( "context" "errors" "sync" "testing" "time" "github.com/jan/fm-rds-tx/internal/audio" ) type fakeSource struct { desc SourceDescriptor chunks chan PCMChunk errs chan error title chan string stats SourceStats once sync.Once } func newFakeSource() *fakeSource { return &fakeSource{ desc: SourceDescriptor{ID: "fake", Kind: "stdin-pcm"}, chunks: make(chan PCMChunk, 4), errs: make(chan error, 1), title: make(chan string, 4), stats: SourceStats{State: "running", Connected: true}, } } func (s *fakeSource) Descriptor() SourceDescriptor { return s.desc } func (s *fakeSource) Start(context.Context) error { return nil } func (s *fakeSource) Stop() error { s.once.Do(func() { close(s.chunks) }); return nil } func (s *fakeSource) Chunks() <-chan PCMChunk { return s.chunks } func (s *fakeSource) Errors() <-chan error { return s.errs } func (s *fakeSource) StreamTitleUpdates() <-chan string { return s.title } func (s *fakeSource) Stats() SourceStats { return s.stats } func TestRuntimeWritesFramesToStreamSink(t *testing.T) { sink := audio.NewStreamSource(128, 44100) src := newFakeSource() rt := NewRuntime(sink, src) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } defer rt.Stop() src.chunks <- PCMChunk{ Channels: 2, SampleRateHz: 44100, Samples: []int32{1000 << 16, -1000 << 16}, } deadline := time.Now().Add(1 * time.Second) for sink.Available() < 1 && time.Now().Before(deadline) { time.Sleep(10 * time.Millisecond) } if sink.Available() < 1 { t.Fatal("expected at least one frame in sink") } } func TestRuntimeRecoversToRunningAfterSourceError(t *testing.T) { sink := audio.NewStreamSource(128, 44100) src := newFakeSource() rt := NewRuntime(sink, src) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } defer rt.Stop() src.errs <- errors.New("decode transient failure") waitForRuntimeState(t, rt, "degraded") src.chunks <- PCMChunk{ Channels: 2, SampleRateHz: 44100, Samples: []int32{500 << 16, -500 << 16}, } waitForRuntimeState(t, rt, "running") } func TestRuntimeRecoversToRunningAfterConvertError(t *testing.T) { sink := audio.NewStreamSource(128, 44100) src := newFakeSource() rt := NewRuntime(sink, src) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } defer rt.Stop() // Invalid stereo chunk: odd sample count causes conversion error. src.chunks <- PCMChunk{ Channels: 2, SampleRateHz: 44100, Samples: []int32{100 << 16}, } waitForRuntimeState(t, rt, "degraded") if got := rt.Stats().Runtime.ConvertErrors; got != 1 { t.Fatalf("convertErrors=%d want 1", got) } src.chunks <- PCMChunk{ Channels: 2, SampleRateHz: 44100, Samples: []int32{300 << 16, -300 << 16}, } waitForRuntimeState(t, rt, "running") } func TestRuntimeWithMissingSourceStaysIdleAndReturnsZeroSourceStats(t *testing.T) { sink := audio.NewStreamSource(128, 44100) rt := NewRuntime(sink, nil) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } stats := rt.Stats() if stats.Runtime.State != "idle" { t.Fatalf("runtime state=%q want idle", stats.Runtime.State) } if stats.Active.ID != "" || stats.Active.Kind != "" { t.Fatalf("expected empty active descriptor, got %+v", stats.Active) } if stats.Source.State != "" { t.Fatalf("expected zero-value source stats, got state=%q", stats.Source.State) } } func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) { sink := audio.NewStreamSource(128, 44100) src := newFakeSource() src.desc = SourceDescriptor{ID: "icecast-primary", Kind: "icecast"} src.stats = SourceStats{ State: "reconnecting", Connected: false, Reconnects: 4, LastError: "stream ended", } rt := NewRuntime(sink, src) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } defer rt.Stop() stats := rt.Stats() if stats.Active.ID != "icecast-primary" { t.Fatalf("active id=%q want icecast-primary", stats.Active.ID) } if stats.Active.Kind != "icecast" { t.Fatalf("active kind=%q want icecast", stats.Active.Kind) } if stats.Source.Reconnects != 4 { t.Fatalf("source reconnects=%d want 4", stats.Source.Reconnects) } if stats.Source.LastError != "stream ended" { t.Fatalf("source lastError=%q want stream ended", stats.Source.LastError) } } func TestRuntimePrebufferGateAppliesBeforeSinkWrites(t *testing.T) { sink := audio.NewStreamSource(512, 1000) src := newFakeSource() rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond)) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } defer rt.Stop() src.chunks <- PCMChunk{ Channels: 2, SampleRateHz: 1000, Samples: stereoSamples(80, 100), } time.Sleep(30 * time.Millisecond) if sink.Available() != 0 { t.Fatalf("sink available=%d want 0 while prebuffering", sink.Available()) } stats := rt.Stats() if stats.Runtime.State != "prebuffering" || !stats.Runtime.Prebuffering { t.Fatalf("runtime state=%q prebuffering=%t", stats.Runtime.State, stats.Runtime.Prebuffering) } if stats.Runtime.BufferedSeconds <= 0 { t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds) } src.chunks <- PCMChunk{ Channels: 2, SampleRateHz: 1000, Samples: stereoSamples(40, 120), } waitForSinkFrames(t, sink, 1) waitForRuntimeState(t, rt, "running") if got := rt.Stats().Runtime.Prebuffering; got { t.Fatalf("runtime prebuffering=%t want false", got) } } func TestRuntimeWriteBlockedRetainsWorkingBuffer(t *testing.T) { sink := audio.NewStreamSource(1, 1000) src := newFakeSource() rt := NewRuntime(sink, src) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } defer rt.Stop() src.chunks <- PCMChunk{ Channels: 2, SampleRateHz: 1000, Samples: stereoSamples(4, 200), } waitForSinkFrames(t, sink, 1) waitForRuntimeState(t, rt, "running") stats := rt.Stats() if stats.Runtime.WriteBlocked { t.Fatalf("runtime writeBlocked=%t want false", stats.Runtime.WriteBlocked) } if stats.Runtime.BufferedSeconds <= 0 { t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds) } if stats.Runtime.DroppedFrames != 0 { t.Fatalf("runtime droppedFrames=%d want 0", stats.Runtime.DroppedFrames) } if got := sink.Stats().Overflows; got != 0 { t.Fatalf("sink overflows=%d want 0", got) } } func TestRuntimeDrainWorkingBufferIsBurstBounded(t *testing.T) { sink := audio.NewStreamSource(64, 1000) rt := NewRuntime(sink, nil) rt.gateOpen = true for i := 0; i < 40; i++ { if !rt.work.push(audio.NewFrame(0.1, -0.1)) { t.Fatalf("failed to seed work frame %d", i) } } rt.lastDrainAt = time.Now().Add(-time.Second) rt.drainWorkingBuffer() if got := sink.Available(); got != 20 { t.Fatalf("sink available=%d want 20 (20ms burst at 1kHz)", got) } if got := rt.work.available(); got != 20 { t.Fatalf("work available=%d want 20", got) } if got := rt.Stats().Runtime.WriteBlocked; got { t.Fatalf("runtime writeBlocked=%t want false", got) } } func TestRuntimeDrainWorkingBufferHonorsSinkHeadroom(t *testing.T) { sink := audio.NewStreamSource(64, 1000) rt := NewRuntime(sink, nil) for i := 0; i < 63; i++ { if !sink.WriteFrame(audio.NewFrame(0.2, -0.2)) { t.Fatalf("failed to seed sink frame %d", i) } } rt.gateOpen = true for i := 0; i < 8; i++ { if !rt.work.push(audio.NewFrame(0.3, -0.3)) { t.Fatalf("failed to seed work frame %d", i) } } rt.lastDrainAt = time.Now().Add(-time.Second) rt.drainWorkingBuffer() if got := sink.Available(); got != 64 { t.Fatalf("sink available=%d want 64", got) } if got := rt.work.available(); got != 7 { t.Fatalf("work available=%d want 7", got) } if got := sink.Stats().Overflows; got != 0 { t.Fatalf("sink overflows=%d want 0", got) } if got := rt.Stats().Runtime.WriteBlocked; got { t.Fatalf("runtime writeBlocked=%t want false", got) } } func TestRuntimeStatsSourceBufferedSecondsIncludesWorkingBuffer(t *testing.T) { sink := audio.NewStreamSource(32, 1000) src := newFakeSource() src.stats = SourceStats{State: "running", Connected: true, BufferedSeconds: 0} rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond)) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } defer rt.Stop() src.chunks <- PCMChunk{ Channels: 2, SampleRateHz: 1000, Samples: stereoSamples(50, 300), } time.Sleep(20 * time.Millisecond) stats := rt.Stats() if stats.Source.BufferedSeconds <= 0 { t.Fatalf("source bufferedSeconds=%f want > 0", stats.Source.BufferedSeconds) } } func TestRuntimeUpdatesActiveDescriptorFromChunkMetadata(t *testing.T) { sink := audio.NewStreamSource(128, 44100) src := newFakeSource() src.desc = SourceDescriptor{ ID: "icecast-primary", Kind: "icecast", Channels: 0, SampleRateHz: 0, } rt := NewRuntime(sink, src) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } defer rt.Stop() src.chunks <- PCMChunk{ Channels: 2, SampleRateHz: 48000, Samples: []int32{100 << 16, -100 << 16}, } waitForRuntimeState(t, rt, "running") stats := rt.Stats() if stats.Active.SampleRateHz != 48000 { t.Fatalf("active sampleRateHz=%d want 48000", stats.Active.SampleRateHz) } if stats.Active.Channels != 2 { t.Fatalf("active channels=%d want 2", stats.Active.Channels) } } func TestRuntimeForwardsStreamTitleUpdatesToHandler(t *testing.T) { sink := audio.NewStreamSource(128, 44100) src := newFakeSource() got := make(chan string, 1) rt := NewRuntime(sink, src, WithStreamTitleHandler(func(title string) { got <- title })) if err := rt.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } defer rt.Stop() src.title <- "Artist - Song" select { case title := <-got: if title != "Artist - Song" { t.Fatalf("title=%q want %q", title, "Artist - Song") } case <-time.After(1 * time.Second): t.Fatal("timed out waiting for forwarded title") } } func waitForRuntimeState(t *testing.T, rt *Runtime, want string) { t.Helper() deadline := time.Now().Add(1 * time.Second) for time.Now().Before(deadline) { if got := rt.Stats().Runtime.State; got == want { return } time.Sleep(10 * time.Millisecond) } t.Fatalf("timeout waiting for runtime state %q; last=%q", want, rt.Stats().Runtime.State) } func waitForSinkFrames(t *testing.T, sink *audio.StreamSource, minFrames int) { t.Helper() deadline := time.Now().Add(1 * time.Second) for time.Now().Before(deadline) { if sink.Available() >= minFrames { return } time.Sleep(10 * time.Millisecond) } t.Fatalf("timeout waiting for sink frames: have=%d want>=%d", sink.Available(), minFrames) } func stereoSamples(frames int, v int32) []int32 { out := make([]int32, 0, frames*2) for i := 0; i < frames; i++ { out = append(out, v<<16, -v<<16) } return out }