From 42d74c866586a4682a40176fecb57fb97cee5953 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 17:22:51 +0200 Subject: [PATCH] ingest: improve runtime observability coverage --- internal/control/control.go | 4 +- internal/control/control_test.go | 96 +++++++++++++++++++++++++++++++- internal/ingest/runtime_test.go | 52 +++++++++++++++++ 3 files changed, 148 insertions(+), 4 deletions(-) diff --git a/internal/control/control.go b/internal/control/control.go index 006c726..131c473 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -299,7 +299,9 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) { result["driver"] = drv.Stats() } if tx != nil { - result["engine"] = tx.TXStats() + if stats := tx.TXStats(); stats != nil { + result["engine"] = stats + } } if stream != nil { result["audioStream"] = stream.Stats() diff --git a/internal/control/control_test.go b/internal/control/control_test.go index b2a2752..a59dcbf 100644 --- a/internal/control/control_test.go +++ b/internal/control/control_test.go @@ -175,6 +175,16 @@ func TestRuntimeWithoutDriver(t *testing.T) { if rec.Code != 200 { t.Fatalf("status: %d", rec.Code) } + var body map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("unmarshal runtime: %v", err) + } + if _, ok := body["ingest"]; ok { + t.Fatalf("expected ingest payload to be absent when ingest runtime is not configured") + } + if _, ok := body["engine"]; ok { + t.Fatalf("expected engine payload to be absent when tx controller is not configured") + } } func TestRuntimeIncludesIngestStats(t *testing.T) { @@ -207,6 +217,82 @@ func TestRuntimeIncludesIngestStats(t *testing.T) { } } +func TestRuntimeIncludesDetailedIngestSourceAndRuntimeStats(t *testing.T) { + srv := NewServer(cfgpkg.Default()) + srv.SetIngestRuntime(&fakeIngestRuntime{ + stats: ingest.Stats{ + Active: ingest.SourceDescriptor{ID: "icecast-main", Kind: "icecast"}, + Source: ingest.SourceStats{ + State: "reconnecting", + Connected: false, + Reconnects: 3, + LastError: "dial tcp timeout", + }, + Runtime: ingest.RuntimeStats{ + State: "degraded", + ConvertErrors: 2, + WriteBlocked: true, + }, + }, + }) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/runtime", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("status: %d", rec.Code) + } + var body map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("unmarshal runtime: %v", err) + } + ingestPayload, ok := body["ingest"].(map[string]any) + if !ok { + t.Fatalf("expected ingest payload map, got %T", body["ingest"]) + } + source, ok := ingestPayload["source"].(map[string]any) + if !ok { + t.Fatalf("expected ingest.source map, got %T", ingestPayload["source"]) + } + if source["state"] != "reconnecting" { + t.Fatalf("source state mismatch: got %v", source["state"]) + } + if source["reconnects"] != float64(3) { + t.Fatalf("source reconnects mismatch: got %v", source["reconnects"]) + } + if source["lastError"] != "dial tcp timeout" { + t.Fatalf("source lastError mismatch: got %v", source["lastError"]) + } + runtimePayload, ok := ingestPayload["runtime"].(map[string]any) + if !ok { + t.Fatalf("expected ingest.runtime map, got %T", ingestPayload["runtime"]) + } + if runtimePayload["state"] != "degraded" { + t.Fatalf("runtime state mismatch: got %v", runtimePayload["state"]) + } + if runtimePayload["convertErrors"] != float64(2) { + t.Fatalf("runtime convertErrors mismatch: got %v", runtimePayload["convertErrors"]) + } + if runtimePayload["writeBlocked"] != true { + t.Fatalf("runtime writeBlocked mismatch: got %v", runtimePayload["writeBlocked"]) + } +} + +func TestRuntimeOmitsEngineWhenControllerReturnsNilStats(t *testing.T) { + srv := NewServer(cfgpkg.Default()) + srv.SetTXController(&fakeTXController{returnNilStats: true}) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/runtime", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("status: %d", rec.Code) + } + var body map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("unmarshal runtime: %v", err) + } + if _, ok := body["engine"]; ok { + t.Fatalf("expected engine field to be omitted when TXStats returns nil") + } +} + func TestRuntimeReportsFaultHistory(t *testing.T) { srv := NewServer(cfgpkg.Default()) history := []map[string]any{ @@ -626,9 +712,10 @@ func newConfigPostRequest(body []byte) *http.Request { } type fakeTXController struct { - updateErr error - resetErr error - stats map[string]any + updateErr error + resetErr error + stats map[string]any + returnNilStats bool } type fakeAudioIngress struct { @@ -652,6 +739,9 @@ func (f *fakeIngestRuntime) Stats() ingest.Stats { func (f *fakeTXController) StartTX() error { return nil } func (f *fakeTXController) StopTX() error { return nil } func (f *fakeTXController) TXStats() map[string]any { + if f.returnNilStats { + return nil + } if f.stats != nil { return f.stats } diff --git a/internal/ingest/runtime_test.go b/internal/ingest/runtime_test.go index 6167c20..ee82678 100644 --- a/internal/ingest/runtime_test.go +++ b/internal/ingest/runtime_test.go @@ -107,6 +107,58 @@ func TestRuntimeRecoversToRunningAfterConvertError(t *testing.T) { waitForRuntimeState(t, rt, "running") } +func TestRuntimeWithMissingSourceStaysIdleAndReturnsZeroSourceStats(t *testing.T) { + sink := audio.NewStreamSource(128, 44100) + rt := NewRuntime(sink, nil) + + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + stats := rt.Stats() + if stats.Runtime.State != "idle" { + t.Fatalf("runtime state=%q want idle", stats.Runtime.State) + } + if stats.Active.ID != "" || stats.Active.Kind != "" { + t.Fatalf("expected empty active descriptor, got %+v", stats.Active) + } + if stats.Source.State != "" { + t.Fatalf("expected zero-value source stats, got state=%q", stats.Source.State) + } +} + +func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) { + sink := audio.NewStreamSource(128, 44100) + src := newFakeSource() + src.desc = SourceDescriptor{ID: "icecast-primary", Kind: "icecast"} + src.stats = SourceStats{ + State: "reconnecting", + Connected: false, + Reconnects: 4, + LastError: "stream ended", + } + rt := NewRuntime(sink, src) + + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + waitForRuntimeState(t, rt, "running") + + stats := rt.Stats() + if stats.Active.ID != "icecast-primary" { + t.Fatalf("active id=%q want icecast-primary", stats.Active.ID) + } + if stats.Active.Kind != "icecast" { + t.Fatalf("active kind=%q want icecast", stats.Active.Kind) + } + if stats.Source.Reconnects != 4 { + t.Fatalf("source reconnects=%d want 4", stats.Source.Reconnects) + } + if stats.Source.LastError != "stream ended" { + t.Fatalf("source lastError=%q want stream ended", stats.Source.LastError) + } +} + func waitForRuntimeState(t *testing.T, rt *Runtime, want string) { t.Helper() deadline := time.Now().Add(1 * time.Second)