| @@ -256,6 +256,7 @@ func (b *txBridge) TXStats() map[string]any { | |||||
| "maxWriteMs": s.MaxWriteMs, | "maxWriteMs": s.MaxWriteMs, | ||||
| "queue": s.Queue, | "queue": s.Queue, | ||||
| "runtimeIndicator": s.RuntimeIndicator, | "runtimeIndicator": s.RuntimeIndicator, | ||||
| "runtimeAlert": s.RuntimeAlert, | |||||
| } | } | ||||
| } | } | ||||
| func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error { | func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error { | ||||
| @@ -69,6 +69,7 @@ type EngineStats struct { | |||||
| MaxWriteMs float64 `json:"maxWriteMs,omitempty"` | MaxWriteMs float64 `json:"maxWriteMs,omitempty"` | ||||
| Queue output.QueueStats `json:"queue"` | Queue output.QueueStats `json:"queue"` | ||||
| RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"` | RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"` | ||||
| RuntimeAlert string `json:"runtimeAlert,omitempty"` | |||||
| } | } | ||||
| type RuntimeIndicator string | type RuntimeIndicator string | ||||
| @@ -350,6 +351,7 @@ func (e *Engine) Stats() EngineStats { | |||||
| queue := e.frameQueue.Stats() | queue := e.frameQueue.Stats() | ||||
| lateBuffers := e.lateBuffers.Load() | lateBuffers := e.lateBuffers.Load() | ||||
| ri := runtimeIndicator(queue.Health, lateBuffers) | |||||
| return EngineStats{ | return EngineStats{ | ||||
| State: state.String(), | State: state.String(), | ||||
| ChunksProduced: e.chunksProduced.Load(), | ChunksProduced: e.chunksProduced.Load(), | ||||
| @@ -363,7 +365,8 @@ func (e *Engine) Stats() EngineStats { | |||||
| MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()), | MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()), | ||||
| MaxWriteMs: durationMs(e.maxWriteNs.Load()), | MaxWriteMs: durationMs(e.maxWriteNs.Load()), | ||||
| Queue: queue, | Queue: queue, | ||||
| RuntimeIndicator: runtimeIndicator(queue.Health, lateBuffers), | |||||
| RuntimeIndicator: ri, | |||||
| RuntimeAlert: runtimeAlert(queue.Health, lateBuffers), | |||||
| } | } | ||||
| } | } | ||||
| @@ -378,6 +381,19 @@ func runtimeIndicator(queueHealth output.QueueHealth, lateBuffers uint64) Runtim | |||||
| } | } | ||||
| } | } | ||||
| func runtimeAlert(queueHealth output.QueueHealth, lateBuffers uint64) string { | |||||
| switch { | |||||
| case queueHealth == output.QueueHealthCritical: | |||||
| return "queue health critical" | |||||
| case lateBuffers > 0: | |||||
| return "late buffers" | |||||
| case queueHealth == output.QueueHealthLow: | |||||
| return "queue health low" | |||||
| default: | |||||
| return "" | |||||
| } | |||||
| } | |||||
| func (e *Engine) run(ctx context.Context) { | func (e *Engine) run(ctx context.Context) { | ||||
| e.wg.Add(1) | e.wg.Add(1) | ||||
| go e.writerLoop(ctx) | go e.writerLoop(ctx) | ||||
| @@ -55,3 +55,53 @@ func TestRuntimeIndicator(t *testing.T) { | |||||
| }) | }) | ||||
| } | } | ||||
| } | } | ||||
| func TestRuntimeAlert(t *testing.T) { | |||||
| cases := []struct { | |||||
| name string | |||||
| queueHealth output.QueueHealth | |||||
| lateBuffers uint64 | |||||
| want string | |||||
| }{ | |||||
| { | |||||
| name: "normal", | |||||
| queueHealth: output.QueueHealthNormal, | |||||
| lateBuffers: 0, | |||||
| want: "", | |||||
| }, | |||||
| { | |||||
| name: "lateBuffers", | |||||
| queueHealth: output.QueueHealthNormal, | |||||
| lateBuffers: 1, | |||||
| want: "late buffers", | |||||
| }, | |||||
| { | |||||
| name: "queueLow", | |||||
| queueHealth: output.QueueHealthLow, | |||||
| lateBuffers: 0, | |||||
| want: "queue health low", | |||||
| }, | |||||
| { | |||||
| name: "queueCritical", | |||||
| queueHealth: output.QueueHealthCritical, | |||||
| lateBuffers: 0, | |||||
| want: "queue health critical", | |||||
| }, | |||||
| { | |||||
| name: "criticalLateBuffers", | |||||
| queueHealth: output.QueueHealthCritical, | |||||
| lateBuffers: 5, | |||||
| want: "queue health critical", | |||||
| }, | |||||
| } | |||||
| for _, tc := range cases { | |||||
| tc := tc | |||||
| t.Run(tc.name, func(t *testing.T) { | |||||
| if got := runtimeAlert(tc.queueHealth, tc.lateBuffers); got != tc.want { | |||||
| t.Fatalf("runtime alert mismatch: queue=%s late=%d want=%q got=%q", | |||||
| tc.queueHealth, tc.lateBuffers, tc.want, got) | |||||
| } | |||||
| }) | |||||
| } | |||||
| } | |||||
| @@ -137,6 +137,9 @@ func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) { | |||||
| if ri, ok := stats["runtimeIndicator"]; ok { | if ri, ok := stats["runtimeIndicator"]; ok { | ||||
| status["runtimeIndicator"] = ri | status["runtimeIndicator"] = ri | ||||
| } | } | ||||
| if alert, ok := stats["runtimeAlert"]; ok { | |||||
| status["runtimeAlert"] = alert | |||||
| } | |||||
| } | } | ||||
| } | } | ||||
| @@ -39,7 +39,7 @@ func TestStatus(t *testing.T) { | |||||
| func TestStatusReportsRuntimeIndicator(t *testing.T) { | func TestStatusReportsRuntimeIndicator(t *testing.T) { | ||||
| srv := NewServer(cfgpkg.Default()) | srv := NewServer(cfgpkg.Default()) | ||||
| srv.SetTXController(&fakeTXController{stats: map[string]any{"runtimeIndicator": "degraded"}}) | |||||
| srv.SetTXController(&fakeTXController{stats: map[string]any{"runtimeIndicator": "degraded", "runtimeAlert": "late buffers"}}) | |||||
| rec := httptest.NewRecorder() | rec := httptest.NewRecorder() | ||||
| srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/status", nil)) | srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/status", nil)) | ||||
| if rec.Code != 200 { | if rec.Code != 200 { | ||||
| @@ -50,6 +50,9 @@ func TestStatusReportsRuntimeIndicator(t *testing.T) { | |||||
| if body["runtimeIndicator"] != "degraded" { | if body["runtimeIndicator"] != "degraded" { | ||||
| t.Fatalf("expected runtimeIndicator degraded, got %v", body["runtimeIndicator"]) | t.Fatalf("expected runtimeIndicator degraded, got %v", body["runtimeIndicator"]) | ||||
| } | } | ||||
| if body["runtimeAlert"] != "late buffers" { | |||||
| t.Fatalf("expected runtimeAlert late buffers, got %v", body["runtimeAlert"]) | |||||
| } | |||||
| } | } | ||||
| func TestDryRunEndpoint(t *testing.T) { | func TestDryRunEndpoint(t *testing.T) { | ||||