From faf1aed47290c07869c04f2b659ab8a80394231e Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 20:39:52 +0200 Subject: [PATCH] ingest: reconnect icecast streams and propagate chunk metadata --- internal/ingest/adapters/icecast/source.go | 7 ++- .../ingest/adapters/icecast/source_test.go | 51 +++++++++++++++++++ internal/ingest/runtime.go | 6 +++ internal/ingest/runtime_test.go | 31 +++++++++++ 4 files changed, 91 insertions(+), 4 deletions(-) diff --git a/internal/ingest/adapters/icecast/source.go b/internal/ingest/adapters/icecast/source.go index 784891d..2970e82 100644 --- a/internal/ingest/adapters/icecast/source.go +++ b/internal/ingest/adapters/icecast/source.go @@ -75,7 +75,9 @@ func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Op id = "icecast-main" } if client == nil { - client = &http.Client{Timeout: 20 * time.Second} + // Streaming responses are long-lived; a global client timeout would + // terminate the body read after a fixed duration. + client = &http.Client{} } s := &Source{ id: id, @@ -202,9 +204,6 @@ func (s *Source) loop(ctx context.Context) { if err == nil { err = errStreamEnded } - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return - } s.connected.Store(false) s.lastError.Store(err.Error()) select { diff --git a/internal/ingest/adapters/icecast/source_test.go b/internal/ingest/adapters/icecast/source_test.go index 9984269..3e378bb 100644 --- a/internal/ingest/adapters/icecast/source_test.go +++ b/internal/ingest/adapters/icecast/source_test.go @@ -511,6 +511,57 @@ func TestSourceClearsLastErrorAfterSuccessfulReconnect(t *testing.T) { } } +func TestNewWithoutClientUsesStreamingSafeHTTPClient(t *testing.T) { + src := New("ice-test", "http://example", nil, ReconnectConfig{}) + if src.client == nil { + t.Fatal("expected default http client") + } + if src.client.Timeout != 0 { + t.Fatalf("client timeout=%v want 0 for streaming", src.client.Timeout) + } +} + +func TestSourceReconnectsAfterDeadlineExceededError(t *testing.T) { + var requests atomic.Int64 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + requests.Add(1) + w.Header().Set("Content-Type", "audio/mpeg") + _, _ = w.Write([]byte("test-stream")) + })) + defer srv.Close() + + dec := &scriptedLoopDecoder{ + actions: []decodeAction{ + {err: context.DeadlineExceeded}, // first attempt fails transiently + {blockUntilStop: true}, // second attempt recovers and stays running + }, + } + reg := decoder.NewRegistry() + reg.Register("mp3", func() decoder.Decoder { return dec }) + reg.Register("ffmpeg", func() decoder.Decoder { return &testDecoder{name: "ffmpeg"} }) + + src := New("ice-test", srv.URL, srv.Client(), ReconnectConfig{ + Enabled: true, + InitialBackoffMs: 1, + MaxBackoffMs: 1, + }, WithDecoderRegistry(reg), WithDecoderPreference("auto")) + + if err := src.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer src.Stop() + + waitForCondition(t, func() bool { return dec.callCount() >= 2 }, "second decode call after deadline exceeded") + + stats := src.Stats() + if stats.Reconnects < 1 { + t.Fatalf("reconnects=%d want >=1", stats.Reconnects) + } + if got := requests.Load(); got < 2 { + t.Fatalf("requests=%d want >=2", got) + } +} + func waitForCondition(t *testing.T, cond func() bool, label string) { t.Helper() deadline := time.Now().Add(2 * time.Second) diff --git a/internal/ingest/runtime.go b/internal/ingest/runtime.go index 6b9e1ef..a6741b8 100644 --- a/internal/ingest/runtime.go +++ b/internal/ingest/runtime.go @@ -161,6 +161,12 @@ func (r *Runtime) handleChunk(chunk PCMChunk) { } } r.mu.Lock() + if chunk.SampleRateHz > 0 { + r.active.SampleRateHz = chunk.SampleRateHz + } + if chunk.Channels > 0 { + r.active.Channels = chunk.Channels + } r.stats.State = "running" r.stats.LastChunkAt = time.Now() r.stats.DroppedFrames += dropped diff --git a/internal/ingest/runtime_test.go b/internal/ingest/runtime_test.go index 48cfcb3..1fc7c36 100644 --- a/internal/ingest/runtime_test.go +++ b/internal/ingest/runtime_test.go @@ -164,6 +164,37 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) } } +func TestRuntimeUpdatesActiveDescriptorFromChunkMetadata(t *testing.T) { + sink := audio.NewStreamSource(128, 44100) + src := newFakeSource() + src.desc = SourceDescriptor{ + ID: "icecast-primary", + Kind: "icecast", + Channels: 0, + SampleRateHz: 0, + } + rt := NewRuntime(sink, src) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.chunks <- PCMChunk{ + Channels: 2, + SampleRateHz: 48000, + Samples: []int32{100 << 16, -100 << 16}, + } + + waitForRuntimeState(t, rt, "running") + stats := rt.Stats() + if stats.Active.SampleRateHz != 48000 { + t.Fatalf("active sampleRateHz=%d want 48000", stats.Active.SampleRateHz) + } + if stats.Active.Channels != 2 { + t.Fatalf("active channels=%d want 2", stats.Active.Channels) + } +} + func TestRuntimeForwardsStreamTitleUpdatesToHandler(t *testing.T) { sink := audio.NewStreamSource(128, 44100) src := newFakeSource()