diff --git a/internal/ingest/adapters/icecast/source.go b/internal/ingest/adapters/icecast/source.go index 3601d80..1da52c9 100644 --- a/internal/ingest/adapters/icecast/source.go +++ b/internal/ingest/adapters/icecast/source.go @@ -46,6 +46,8 @@ type Source struct { lastError atomic.Value // string } +var errStreamEnded = errors.New("icecast stream ended") + type Option func(*Source) func WithDecoderPreference(pref string) Option { @@ -115,6 +117,7 @@ func (s *Source) Start(ctx context.Context) error { } runCtx, cancel := context.WithCancel(ctx) s.cancel = cancel + s.lastError.Store("") s.state.Store("connecting") s.wg.Add(1) go s.loop(runCtx) @@ -156,6 +159,7 @@ func (s *Source) Stats() ingest.SourceStats { func (s *Source) loop(ctx context.Context) { defer s.wg.Done() defer close(s.chunks) + defer close(s.errs) attempt := 0 for { select { @@ -166,7 +170,13 @@ func (s *Source) loop(ctx context.Context) { s.state.Store("connecting") err := s.connectAndRun(ctx) - if err == nil || ctx.Err() != nil { + if ctx.Err() != nil { + return + } + if err == nil { + err = errStreamEnded + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } s.connected.Store(false) @@ -207,7 +217,7 @@ func (s *Source) connectAndRun(ctx context.Context) error { } s.connected.Store(true) s.state.Store("buffering") - + s.lastError.Store("") s.state.Store("running") return s.decodeWithPreference(ctx, resp.Body, decoder.StreamMeta{ ContentType: resp.Header.Get("Content-Type"), diff --git a/internal/ingest/adapters/icecast/source_test.go b/internal/ingest/adapters/icecast/source_test.go index 8171ebe..568110c 100644 --- a/internal/ingest/adapters/icecast/source_test.go +++ b/internal/ingest/adapters/icecast/source_test.go @@ -5,7 +5,13 @@ import ( "context" "errors" "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "sync/atomic" "testing" + "time" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/decoder" @@ -304,3 +310,149 @@ func TestWithDecoderPreferenceFallbackAliasNormalizesToFFmpeg(t *testing.T) { t.Fatalf("codec=%s want ffmpeg", got) } } + +type scriptedLoopDecoder struct { + mu sync.Mutex + actions []decodeAction + calls int + totalBytesRead int +} + +type decodeAction struct { + err error + blockUntilStop bool +} + +func (d *scriptedLoopDecoder) Name() string { return "scripted-loop" } + +func (d *scriptedLoopDecoder) DecodeStream(ctx context.Context, r io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error { + data, err := io.ReadAll(r) + if err != nil { + return err + } + + d.mu.Lock() + d.calls++ + d.totalBytesRead += len(data) + callIdx := d.calls - 1 + action := decodeAction{} + if callIdx < len(d.actions) { + action = d.actions[callIdx] + } + d.mu.Unlock() + + if action.blockUntilStop { + <-ctx.Done() + return nil + } + return action.err +} + +func (d *scriptedLoopDecoder) callCount() int { + d.mu.Lock() + defer d.mu.Unlock() + return d.calls +} + +func TestSourceReconnectsWhenStreamEndsCleanly(t *testing.T) { + var requests atomic.Int64 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + requests.Add(1) + w.Header().Set("Content-Type", "audio/mpeg") + _, _ = w.Write([]byte("test-stream")) + })) + defer srv.Close() + + dec := &scriptedLoopDecoder{ + actions: []decodeAction{ + {}, // first connection ends cleanly (EOS-like) + {blockUntilStop: true}, + }, + } + reg := decoder.NewRegistry() + reg.Register("mp3", func() decoder.Decoder { return dec }) + reg.Register("ffmpeg", func() decoder.Decoder { return &testDecoder{name: "ffmpeg"} }) + + src := New("ice-test", srv.URL, srv.Client(), ReconnectConfig{ + Enabled: true, + InitialBackoffMs: 1, + MaxBackoffMs: 1, + }, WithDecoderRegistry(reg), WithDecoderPreference("auto")) + + if err := src.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer src.Stop() + + waitForCondition(t, func() bool { return dec.callCount() >= 2 }, "second decode call after clean EOS") + + stats := src.Stats() + if stats.Reconnects < 1 { + t.Fatalf("reconnects=%d want >=1", stats.Reconnects) + } + if got := requests.Load(); got < 2 { + t.Fatalf("requests=%d want >=2", got) + } +} + +func TestSourceClearsLastErrorAfterSuccessfulReconnect(t *testing.T) { + const boom = "decoder boom" + var requests atomic.Int64 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + requests.Add(1) + w.Header().Set("Content-Type", "audio/mpeg") + _, _ = w.Write([]byte("test-stream")) + })) + defer srv.Close() + + dec := &scriptedLoopDecoder{ + actions: []decodeAction{ + {err: errors.New(boom)}, // first attempt fails + {blockUntilStop: true}, // second attempt recovers and stays running + }, + } + reg := decoder.NewRegistry() + reg.Register("mp3", func() decoder.Decoder { return dec }) + reg.Register("ffmpeg", func() decoder.Decoder { return &testDecoder{name: "ffmpeg"} }) + + src := New("ice-test", srv.URL, srv.Client(), ReconnectConfig{ + Enabled: true, + InitialBackoffMs: 1, + MaxBackoffMs: 1, + }, WithDecoderRegistry(reg), WithDecoderPreference("auto")) + + if err := src.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer src.Stop() + + select { + case err := <-src.Errors(): + if err == nil || !strings.Contains(err.Error(), boom) { + t.Fatalf("error=%v want contains %q", err, boom) + } + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for source error reporting") + } + + waitForCondition(t, func() bool { + st := src.Stats() + return dec.callCount() >= 2 && st.LastError == "" + }, "lastError cleared after successful reconnect") + + if got := requests.Load(); got < 2 { + t.Fatalf("requests=%d want >=2", got) + } +} + +func waitForCondition(t *testing.T, cond func() bool, label string) { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if cond() { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("timeout waiting for condition: %s", label) +} diff --git a/internal/ingest/runtime.go b/internal/ingest/runtime.go index df0048c..fa3ad64 100644 --- a/internal/ingest/runtime.go +++ b/internal/ingest/runtime.go @@ -136,6 +136,7 @@ func (r *Runtime) handleChunk(chunk PCMChunk) { } } r.mu.Lock() + r.stats.State = "running" r.stats.LastChunkAt = time.Now() r.stats.DroppedFrames += dropped r.stats.WriteBlocked = dropped > 0 diff --git a/internal/ingest/runtime_test.go b/internal/ingest/runtime_test.go index a3df6e7..6167c20 100644 --- a/internal/ingest/runtime_test.go +++ b/internal/ingest/runtime_test.go @@ -2,6 +2,8 @@ package ingest import ( "context" + "errors" + "sync" "testing" "time" @@ -13,6 +15,7 @@ type fakeSource struct { chunks chan PCMChunk errs chan error stats SourceStats + once sync.Once } func newFakeSource() *fakeSource { @@ -26,7 +29,7 @@ func newFakeSource() *fakeSource { func (s *fakeSource) Descriptor() SourceDescriptor { return s.desc } func (s *fakeSource) Start(context.Context) error { return nil } -func (s *fakeSource) Stop() error { close(s.chunks); return nil } +func (s *fakeSource) Stop() error { s.once.Do(func() { close(s.chunks) }); return nil } func (s *fakeSource) Chunks() <-chan PCMChunk { return s.chunks } func (s *fakeSource) Errors() <-chan error { return s.errs } func (s *fakeSource) Stats() SourceStats { return s.stats } @@ -54,3 +57,64 @@ func TestRuntimeWritesFramesToStreamSink(t *testing.T) { t.Fatal("expected at least one frame in sink") } } + +func TestRuntimeRecoversToRunningAfterSourceError(t *testing.T) { + sink := audio.NewStreamSource(128, 44100) + src := newFakeSource() + rt := NewRuntime(sink, src) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.errs <- errors.New("decode transient failure") + waitForRuntimeState(t, rt, "degraded") + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 44100, + Samples: []int32{500 << 16, -500 << 16}, + } + waitForRuntimeState(t, rt, "running") +} + +func TestRuntimeRecoversToRunningAfterConvertError(t *testing.T) { + sink := audio.NewStreamSource(128, 44100) + src := newFakeSource() + rt := NewRuntime(sink, src) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + // Invalid stereo chunk: odd sample count causes conversion error. + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 44100, + Samples: []int32{100 << 16}, + } + waitForRuntimeState(t, rt, "degraded") + + if got := rt.Stats().Runtime.ConvertErrors; got != 1 { + t.Fatalf("convertErrors=%d want 1", got) + } + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 44100, + Samples: []int32{300 << 16, -300 << 16}, + } + waitForRuntimeState(t, rt, "running") +} + +func waitForRuntimeState(t *testing.T, rt *Runtime, want string) { + t.Helper() + deadline := time.Now().Add(1 * time.Second) + for time.Now().Before(deadline) { + if got := rt.Stats().Runtime.State; got == want { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("timeout waiting for runtime state %q; last=%q", want, rt.Stats().Runtime.State) +}