Browse Source

ingest: harden runtime and source recovery paths

main
Jan 1 month ago
parent
commit
5354ca54a1
4 changed files with 230 additions and 3 deletions
  1. +12
    -2
      internal/ingest/adapters/icecast/source.go
  2. +152
    -0
      internal/ingest/adapters/icecast/source_test.go
  3. +1
    -0
      internal/ingest/runtime.go
  4. +65
    -1
      internal/ingest/runtime_test.go

+ 12
- 2
internal/ingest/adapters/icecast/source.go View File

@@ -46,6 +46,8 @@ type Source struct {
lastError atomic.Value // string
}

var errStreamEnded = errors.New("icecast stream ended")

type Option func(*Source)

func WithDecoderPreference(pref string) Option {
@@ -115,6 +117,7 @@ func (s *Source) Start(ctx context.Context) error {
}
runCtx, cancel := context.WithCancel(ctx)
s.cancel = cancel
s.lastError.Store("")
s.state.Store("connecting")
s.wg.Add(1)
go s.loop(runCtx)
@@ -156,6 +159,7 @@ func (s *Source) Stats() ingest.SourceStats {
func (s *Source) loop(ctx context.Context) {
defer s.wg.Done()
defer close(s.chunks)
defer close(s.errs)
attempt := 0
for {
select {
@@ -166,7 +170,13 @@ func (s *Source) loop(ctx context.Context) {

s.state.Store("connecting")
err := s.connectAndRun(ctx)
if err == nil || ctx.Err() != nil {
if ctx.Err() != nil {
return
}
if err == nil {
err = errStreamEnded
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
s.connected.Store(false)
@@ -207,7 +217,7 @@ func (s *Source) connectAndRun(ctx context.Context) error {
}
s.connected.Store(true)
s.state.Store("buffering")
s.lastError.Store("")
s.state.Store("running")
return s.decodeWithPreference(ctx, resp.Body, decoder.StreamMeta{
ContentType: resp.Header.Get("Content-Type"),


+ 152
- 0
internal/ingest/adapters/icecast/source_test.go View File

@@ -5,7 +5,13 @@ import (
"context"
"errors"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/jan/fm-rds-tx/internal/ingest"
"github.com/jan/fm-rds-tx/internal/ingest/decoder"
@@ -304,3 +310,149 @@ func TestWithDecoderPreferenceFallbackAliasNormalizesToFFmpeg(t *testing.T) {
t.Fatalf("codec=%s want ffmpeg", got)
}
}

type scriptedLoopDecoder struct {
mu sync.Mutex
actions []decodeAction
calls int
totalBytesRead int
}

type decodeAction struct {
err error
blockUntilStop bool
}

func (d *scriptedLoopDecoder) Name() string { return "scripted-loop" }

func (d *scriptedLoopDecoder) DecodeStream(ctx context.Context, r io.Reader, _ decoder.StreamMeta, _ func(ingest.PCMChunk) error) error {
data, err := io.ReadAll(r)
if err != nil {
return err
}

d.mu.Lock()
d.calls++
d.totalBytesRead += len(data)
callIdx := d.calls - 1
action := decodeAction{}
if callIdx < len(d.actions) {
action = d.actions[callIdx]
}
d.mu.Unlock()

if action.blockUntilStop {
<-ctx.Done()
return nil
}
return action.err
}

func (d *scriptedLoopDecoder) callCount() int {
d.mu.Lock()
defer d.mu.Unlock()
return d.calls
}

func TestSourceReconnectsWhenStreamEndsCleanly(t *testing.T) {
var requests atomic.Int64
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
requests.Add(1)
w.Header().Set("Content-Type", "audio/mpeg")
_, _ = w.Write([]byte("test-stream"))
}))
defer srv.Close()

dec := &scriptedLoopDecoder{
actions: []decodeAction{
{}, // first connection ends cleanly (EOS-like)
{blockUntilStop: true},
},
}
reg := decoder.NewRegistry()
reg.Register("mp3", func() decoder.Decoder { return dec })
reg.Register("ffmpeg", func() decoder.Decoder { return &testDecoder{name: "ffmpeg"} })

src := New("ice-test", srv.URL, srv.Client(), ReconnectConfig{
Enabled: true,
InitialBackoffMs: 1,
MaxBackoffMs: 1,
}, WithDecoderRegistry(reg), WithDecoderPreference("auto"))

if err := src.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer src.Stop()

waitForCondition(t, func() bool { return dec.callCount() >= 2 }, "second decode call after clean EOS")

stats := src.Stats()
if stats.Reconnects < 1 {
t.Fatalf("reconnects=%d want >=1", stats.Reconnects)
}
if got := requests.Load(); got < 2 {
t.Fatalf("requests=%d want >=2", got)
}
}

func TestSourceClearsLastErrorAfterSuccessfulReconnect(t *testing.T) {
const boom = "decoder boom"
var requests atomic.Int64
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
requests.Add(1)
w.Header().Set("Content-Type", "audio/mpeg")
_, _ = w.Write([]byte("test-stream"))
}))
defer srv.Close()

dec := &scriptedLoopDecoder{
actions: []decodeAction{
{err: errors.New(boom)}, // first attempt fails
{blockUntilStop: true}, // second attempt recovers and stays running
},
}
reg := decoder.NewRegistry()
reg.Register("mp3", func() decoder.Decoder { return dec })
reg.Register("ffmpeg", func() decoder.Decoder { return &testDecoder{name: "ffmpeg"} })

src := New("ice-test", srv.URL, srv.Client(), ReconnectConfig{
Enabled: true,
InitialBackoffMs: 1,
MaxBackoffMs: 1,
}, WithDecoderRegistry(reg), WithDecoderPreference("auto"))

if err := src.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer src.Stop()

select {
case err := <-src.Errors():
if err == nil || !strings.Contains(err.Error(), boom) {
t.Fatalf("error=%v want contains %q", err, boom)
}
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for source error reporting")
}

waitForCondition(t, func() bool {
st := src.Stats()
return dec.callCount() >= 2 && st.LastError == ""
}, "lastError cleared after successful reconnect")

if got := requests.Load(); got < 2 {
t.Fatalf("requests=%d want >=2", got)
}
}

func waitForCondition(t *testing.T, cond func() bool, label string) {
t.Helper()
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if cond() {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("timeout waiting for condition: %s", label)
}

+ 1
- 0
internal/ingest/runtime.go View File

@@ -136,6 +136,7 @@ func (r *Runtime) handleChunk(chunk PCMChunk) {
}
}
r.mu.Lock()
r.stats.State = "running"
r.stats.LastChunkAt = time.Now()
r.stats.DroppedFrames += dropped
r.stats.WriteBlocked = dropped > 0


+ 65
- 1
internal/ingest/runtime_test.go View File

@@ -2,6 +2,8 @@ package ingest

import (
"context"
"errors"
"sync"
"testing"
"time"

@@ -13,6 +15,7 @@ type fakeSource struct {
chunks chan PCMChunk
errs chan error
stats SourceStats
once sync.Once
}

func newFakeSource() *fakeSource {
@@ -26,7 +29,7 @@ func newFakeSource() *fakeSource {

func (s *fakeSource) Descriptor() SourceDescriptor { return s.desc }
func (s *fakeSource) Start(context.Context) error { return nil }
func (s *fakeSource) Stop() error { close(s.chunks); return nil }
func (s *fakeSource) Stop() error { s.once.Do(func() { close(s.chunks) }); return nil }
func (s *fakeSource) Chunks() <-chan PCMChunk { return s.chunks }
func (s *fakeSource) Errors() <-chan error { return s.errs }
func (s *fakeSource) Stats() SourceStats { return s.stats }
@@ -54,3 +57,64 @@ func TestRuntimeWritesFramesToStreamSink(t *testing.T) {
t.Fatal("expected at least one frame in sink")
}
}

func TestRuntimeRecoversToRunningAfterSourceError(t *testing.T) {
sink := audio.NewStreamSource(128, 44100)
src := newFakeSource()
rt := NewRuntime(sink, src)
if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()

src.errs <- errors.New("decode transient failure")
waitForRuntimeState(t, rt, "degraded")

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 44100,
Samples: []int32{500 << 16, -500 << 16},
}
waitForRuntimeState(t, rt, "running")
}

func TestRuntimeRecoversToRunningAfterConvertError(t *testing.T) {
sink := audio.NewStreamSource(128, 44100)
src := newFakeSource()
rt := NewRuntime(sink, src)
if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()

// Invalid stereo chunk: odd sample count causes conversion error.
src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 44100,
Samples: []int32{100 << 16},
}
waitForRuntimeState(t, rt, "degraded")

if got := rt.Stats().Runtime.ConvertErrors; got != 1 {
t.Fatalf("convertErrors=%d want 1", got)
}

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 44100,
Samples: []int32{300 << 16, -300 << 16},
}
waitForRuntimeState(t, rt, "running")
}

func waitForRuntimeState(t *testing.T, rt *Runtime, want string) {
t.Helper()
deadline := time.Now().Add(1 * time.Second)
for time.Now().Before(deadline) {
if got := rt.Stats().Runtime.State; got == want {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("timeout waiting for runtime state %q; last=%q", want, rt.Stats().Runtime.State)
}

Loading…
Cancel
Save