Kaynağa Gözat

ingest: improve runtime observability coverage

main
Jan 1 ay önce
ebeveyn
işleme
42d74c8665
3 değiştirilmiş dosya ile 148 ekleme ve 4 silme
  1. +3
    -1
      internal/control/control.go
  2. +93
    -3
      internal/control/control_test.go
  3. +52
    -0
      internal/ingest/runtime_test.go

+ 3
- 1
internal/control/control.go Dosyayı Görüntüle

@@ -299,7 +299,9 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
result["driver"] = drv.Stats() result["driver"] = drv.Stats()
} }
if tx != nil { if tx != nil {
result["engine"] = tx.TXStats()
if stats := tx.TXStats(); stats != nil {
result["engine"] = stats
}
} }
if stream != nil { if stream != nil {
result["audioStream"] = stream.Stats() result["audioStream"] = stream.Stats()


+ 93
- 3
internal/control/control_test.go Dosyayı Görüntüle

@@ -175,6 +175,16 @@ func TestRuntimeWithoutDriver(t *testing.T) {
if rec.Code != 200 { if rec.Code != 200 {
t.Fatalf("status: %d", rec.Code) t.Fatalf("status: %d", rec.Code)
} }
var body map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
t.Fatalf("unmarshal runtime: %v", err)
}
if _, ok := body["ingest"]; ok {
t.Fatalf("expected ingest payload to be absent when ingest runtime is not configured")
}
if _, ok := body["engine"]; ok {
t.Fatalf("expected engine payload to be absent when tx controller is not configured")
}
} }


func TestRuntimeIncludesIngestStats(t *testing.T) { func TestRuntimeIncludesIngestStats(t *testing.T) {
@@ -207,6 +217,82 @@ func TestRuntimeIncludesIngestStats(t *testing.T) {
} }
} }


func TestRuntimeIncludesDetailedIngestSourceAndRuntimeStats(t *testing.T) {
srv := NewServer(cfgpkg.Default())
srv.SetIngestRuntime(&fakeIngestRuntime{
stats: ingest.Stats{
Active: ingest.SourceDescriptor{ID: "icecast-main", Kind: "icecast"},
Source: ingest.SourceStats{
State: "reconnecting",
Connected: false,
Reconnects: 3,
LastError: "dial tcp timeout",
},
Runtime: ingest.RuntimeStats{
State: "degraded",
ConvertErrors: 2,
WriteBlocked: true,
},
},
})
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/runtime", nil))
if rec.Code != http.StatusOK {
t.Fatalf("status: %d", rec.Code)
}
var body map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
t.Fatalf("unmarshal runtime: %v", err)
}
ingestPayload, ok := body["ingest"].(map[string]any)
if !ok {
t.Fatalf("expected ingest payload map, got %T", body["ingest"])
}
source, ok := ingestPayload["source"].(map[string]any)
if !ok {
t.Fatalf("expected ingest.source map, got %T", ingestPayload["source"])
}
if source["state"] != "reconnecting" {
t.Fatalf("source state mismatch: got %v", source["state"])
}
if source["reconnects"] != float64(3) {
t.Fatalf("source reconnects mismatch: got %v", source["reconnects"])
}
if source["lastError"] != "dial tcp timeout" {
t.Fatalf("source lastError mismatch: got %v", source["lastError"])
}
runtimePayload, ok := ingestPayload["runtime"].(map[string]any)
if !ok {
t.Fatalf("expected ingest.runtime map, got %T", ingestPayload["runtime"])
}
if runtimePayload["state"] != "degraded" {
t.Fatalf("runtime state mismatch: got %v", runtimePayload["state"])
}
if runtimePayload["convertErrors"] != float64(2) {
t.Fatalf("runtime convertErrors mismatch: got %v", runtimePayload["convertErrors"])
}
if runtimePayload["writeBlocked"] != true {
t.Fatalf("runtime writeBlocked mismatch: got %v", runtimePayload["writeBlocked"])
}
}

func TestRuntimeOmitsEngineWhenControllerReturnsNilStats(t *testing.T) {
srv := NewServer(cfgpkg.Default())
srv.SetTXController(&fakeTXController{returnNilStats: true})
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/runtime", nil))
if rec.Code != http.StatusOK {
t.Fatalf("status: %d", rec.Code)
}
var body map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
t.Fatalf("unmarshal runtime: %v", err)
}
if _, ok := body["engine"]; ok {
t.Fatalf("expected engine field to be omitted when TXStats returns nil")
}
}

func TestRuntimeReportsFaultHistory(t *testing.T) { func TestRuntimeReportsFaultHistory(t *testing.T) {
srv := NewServer(cfgpkg.Default()) srv := NewServer(cfgpkg.Default())
history := []map[string]any{ history := []map[string]any{
@@ -626,9 +712,10 @@ func newConfigPostRequest(body []byte) *http.Request {
} }


type fakeTXController struct { type fakeTXController struct {
updateErr error
resetErr error
stats map[string]any
updateErr error
resetErr error
stats map[string]any
returnNilStats bool
} }


type fakeAudioIngress struct { type fakeAudioIngress struct {
@@ -652,6 +739,9 @@ func (f *fakeIngestRuntime) Stats() ingest.Stats {
func (f *fakeTXController) StartTX() error { return nil } func (f *fakeTXController) StartTX() error { return nil }
func (f *fakeTXController) StopTX() error { return nil } func (f *fakeTXController) StopTX() error { return nil }
func (f *fakeTXController) TXStats() map[string]any { func (f *fakeTXController) TXStats() map[string]any {
if f.returnNilStats {
return nil
}
if f.stats != nil { if f.stats != nil {
return f.stats return f.stats
} }


+ 52
- 0
internal/ingest/runtime_test.go Dosyayı Görüntüle

@@ -107,6 +107,58 @@ func TestRuntimeRecoversToRunningAfterConvertError(t *testing.T) {
waitForRuntimeState(t, rt, "running") waitForRuntimeState(t, rt, "running")
} }


func TestRuntimeWithMissingSourceStaysIdleAndReturnsZeroSourceStats(t *testing.T) {
sink := audio.NewStreamSource(128, 44100)
rt := NewRuntime(sink, nil)

if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
stats := rt.Stats()
if stats.Runtime.State != "idle" {
t.Fatalf("runtime state=%q want idle", stats.Runtime.State)
}
if stats.Active.ID != "" || stats.Active.Kind != "" {
t.Fatalf("expected empty active descriptor, got %+v", stats.Active)
}
if stats.Source.State != "" {
t.Fatalf("expected zero-value source stats, got state=%q", stats.Source.State)
}
}

func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) {
sink := audio.NewStreamSource(128, 44100)
src := newFakeSource()
src.desc = SourceDescriptor{ID: "icecast-primary", Kind: "icecast"}
src.stats = SourceStats{
State: "reconnecting",
Connected: false,
Reconnects: 4,
LastError: "stream ended",
}
rt := NewRuntime(sink, src)

if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()
waitForRuntimeState(t, rt, "running")

stats := rt.Stats()
if stats.Active.ID != "icecast-primary" {
t.Fatalf("active id=%q want icecast-primary", stats.Active.ID)
}
if stats.Active.Kind != "icecast" {
t.Fatalf("active kind=%q want icecast", stats.Active.Kind)
}
if stats.Source.Reconnects != 4 {
t.Fatalf("source reconnects=%d want 4", stats.Source.Reconnects)
}
if stats.Source.LastError != "stream ended" {
t.Fatalf("source lastError=%q want stream ended", stats.Source.LastError)
}
}

func waitForRuntimeState(t *testing.T, rt *Runtime, want string) { func waitForRuntimeState(t *testing.T, rt *Runtime, want string) {
t.Helper() t.Helper()
deadline := time.Now().Add(1 * time.Second) deadline := time.Now().Add(1 * time.Second)


Yükleniyor…
İptal
Kaydet