| @@ -266,6 +266,11 @@ func (b *txBridge) TXStats() map[string]any { | |||||
| "queue": s.Queue, | "queue": s.Queue, | ||||
| "runtimeIndicator": s.RuntimeIndicator, | "runtimeIndicator": s.RuntimeIndicator, | ||||
| "runtimeAlert": s.RuntimeAlert, | "runtimeAlert": s.RuntimeAlert, | ||||
| "degradedTransitions": s.DegradedTransitions, | |||||
| "mutedTransitions": s.MutedTransitions, | |||||
| "faultedTransitions": s.FaultedTransitions, | |||||
| "faultCount": s.FaultCount, | |||||
| "lastFault": s.LastFault, | |||||
| } | } | ||||
| } | } | ||||
| func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error { | func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error { | ||||
| @@ -275,6 +275,8 @@ Einführen eines klaren Betriebsmodells mit Fault-, Recovery- und Muted-Zuständ | |||||
| - EngineStats liefert das Runtime-State-Feld (`idle`, `arming`, `prebuffering`, `running`) und reagiert nun auf Queue-Gesundheit bzw. späte Buffers, indem es bei `low`/`critical` oder späten Buffern in `degraded` wechselt und sonst auf `running` zurückkehrt. | - EngineStats liefert das Runtime-State-Feld (`idle`, `arming`, `prebuffering`, `running`) und reagiert nun auf Queue-Gesundheit bzw. späte Buffers, indem es bei `low`/`critical` oder späten Buffern in `degraded` wechselt und sonst auf `running` zurückkehrt. | ||||
| - `evaluateRuntimeState` escalates persistent `critical` queues from `degraded` to `muted`, while `FaultReasonQueueCritical` surfaces `muted` severity so the mute transition stays observable. | - `evaluateRuntimeState` escalates persistent `critical` queues from `degraded` to `muted`, while `FaultReasonQueueCritical` surfaces `muted` severity so the mute transition stays observable. | ||||
| - `evaluateRuntimeState` now waits for a short healthy streak before leaving `muted`, logging a degraded-severity recovery event once the queue settles. | - `evaluateRuntimeState` now waits for a short healthy streak before leaving `muted`, logging a degraded-severity recovery event once the queue settles. | ||||
| - Persistent queue-critical streaks while `muted` now escalate to `faulted` with `FaultSeverityFaulted`, keeping `RuntimeStateFaulted` observable. | |||||
| - `EngineStats` and `txBridge` now expose transition/fault counters plus `lastFault`, surfacing the new telemetry through `/runtime`. | |||||
| ## Zielzustände laut Konzept | ## Zielzustände laut Konzept | ||||
| - `idle` | - `idle` | ||||
| @@ -320,10 +322,14 @@ Einführen eines klaren Betriebsmodells mit Fault-, Recovery- und Muted-Zuständ | |||||
| - Welche Transitionen sind wirklich produktiv relevant und welche nur „theoretisch schön“? | - Welche Transitionen sind wirklich produktiv relevant und welche nur „theoretisch schön“? | ||||
| ## WS-02 Entscheidungslog | ## WS-02 Entscheidungslog | ||||
| - Noch leer | |||||
| | Datum | Entscheidung | Notiz | | |||||
| |---|---|---| | |||||
| | 2026-04-05 | Faulted escalation on persistent critical queue | `muted` now surfaces `RuntimeStateFaulted` when queue health stays critical and metrics capture every transition. | | |||||
| ## WS-02 Verifikation | ## WS-02 Verifikation | ||||
| - Noch leer | |||||
| | Datum | Fokus | Ergebnis | | |||||
| |---|---|---| | |||||
| | 2026-04-05 | Faulted path + transition counters | `go test ./...` exercises `TestEngineFaultsAfterMutedCriticalStreak` and `TestRuntimeTransitionCounters`, while `/runtime` now surfaces `engine.degradedTransitions`, `engine.mutedTransitions`, `engine.faultedTransitions`, `engine.faultCount`, and the last fault via `txBridge`. | | |||||
| --- | --- | ||||
| @@ -84,6 +84,10 @@ type EngineStats struct { | |||||
| RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"` | RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"` | ||||
| RuntimeAlert string `json:"runtimeAlert,omitempty"` | RuntimeAlert string `json:"runtimeAlert,omitempty"` | ||||
| LastFault *FaultEvent `json:"lastFault,omitempty"` | LastFault *FaultEvent `json:"lastFault,omitempty"` | ||||
| DegradedTransitions uint64 `json:"degradedTransitions"` | |||||
| MutedTransitions uint64 `json:"mutedTransitions"` | |||||
| FaultedTransitions uint64 `json:"faultedTransitions"` | |||||
| FaultCount uint64 `json:"faultCount"` | |||||
| } | } | ||||
| type RuntimeIndicator string | type RuntimeIndicator string | ||||
| @@ -99,6 +103,7 @@ const ( | |||||
| queueCriticalStreakThreshold = 3 | queueCriticalStreakThreshold = 3 | ||||
| queueMutedStreakThreshold = queueCriticalStreakThreshold * 2 | queueMutedStreakThreshold = queueCriticalStreakThreshold * 2 | ||||
| queueMutedRecoveryThreshold = queueCriticalStreakThreshold | queueMutedRecoveryThreshold = queueCriticalStreakThreshold | ||||
| queueFaultedStreakThreshold = queueCriticalStreakThreshold | |||||
| faultRepeatWindow = 1 * time.Second | faultRepeatWindow = 1 * time.Second | ||||
| faultHistoryCapacity = 8 | faultHistoryCapacity = 8 | ||||
| ) | ) | ||||
| @@ -131,6 +136,7 @@ type Engine struct { | |||||
| lateBufferAlertAt atomic.Uint64 | lateBufferAlertAt atomic.Uint64 | ||||
| criticalStreak atomic.Uint64 | criticalStreak atomic.Uint64 | ||||
| mutedRecoveryStreak atomic.Uint64 | mutedRecoveryStreak atomic.Uint64 | ||||
| mutedFaultStreak atomic.Uint64 | |||||
| maxCycleNs atomic.Uint64 | maxCycleNs atomic.Uint64 | ||||
| maxGenerateNs atomic.Uint64 | maxGenerateNs atomic.Uint64 | ||||
| maxUpsampleNs atomic.Uint64 | maxUpsampleNs atomic.Uint64 | ||||
| @@ -140,6 +146,11 @@ type Engine struct { | |||||
| faultHistoryMu sync.Mutex | faultHistoryMu sync.Mutex | ||||
| faultHistory []FaultEvent | faultHistory []FaultEvent | ||||
| degradedTransitions atomic.Uint64 | |||||
| mutedTransitions atomic.Uint64 | |||||
| faultedTransitions atomic.Uint64 | |||||
| faultEvents atomic.Uint64 | |||||
| // Live config: pending frequency change, applied between chunks | // Live config: pending frequency change, applied between chunks | ||||
| pendingFreq atomic.Pointer[float64] | pendingFreq atomic.Pointer[float64] | ||||
| @@ -406,6 +417,10 @@ func (e *Engine) Stats() EngineStats { | |||||
| RuntimeIndicator: ri, | RuntimeIndicator: ri, | ||||
| RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers), | RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers), | ||||
| LastFault: lastFault, | LastFault: lastFault, | ||||
| DegradedTransitions: e.degradedTransitions.Load(), | |||||
| MutedTransitions: e.mutedTransitions.Load(), | |||||
| FaultedTransitions: e.faultedTransitions.Load(), | |||||
| FaultCount: e.faultEvents.Load(), | |||||
| } | } | ||||
| } | } | ||||
| @@ -570,6 +585,17 @@ func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame { | |||||
| } | } | ||||
| func (e *Engine) setRuntimeState(state RuntimeState) { | func (e *Engine) setRuntimeState(state RuntimeState) { | ||||
| prev := e.currentRuntimeState() | |||||
| if prev != state { | |||||
| switch state { | |||||
| case RuntimeStateDegraded: | |||||
| e.degradedTransitions.Add(1) | |||||
| case RuntimeStateMuted: | |||||
| e.mutedTransitions.Add(1) | |||||
| case RuntimeStateFaulted: | |||||
| e.faultedTransitions.Add(1) | |||||
| } | |||||
| } | |||||
| e.runtimeState.Store(state) | e.runtimeState.Store(state) | ||||
| } | } | ||||
| @@ -625,6 +651,7 @@ func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message | |||||
| } | } | ||||
| e.lastFault.Store(ev) | e.lastFault.Store(ev) | ||||
| e.appendFaultHistory(ev) | e.appendFaultHistory(ev) | ||||
| e.faultEvents.Add(1) | |||||
| } | } | ||||
| func (e *Engine) loadLastFault() *FaultEvent { | func (e *Engine) loadLastFault() *FaultEvent { | ||||
| @@ -661,9 +688,21 @@ func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bo | |||||
| case RuntimeStateStopping, RuntimeStateFaulted: | case RuntimeStateStopping, RuntimeStateFaulted: | ||||
| return | return | ||||
| case RuntimeStateMuted: | case RuntimeStateMuted: | ||||
| if queue.Health == output.QueueHealthCritical { | |||||
| if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold { | |||||
| e.mutedFaultStreak.Store(0) | |||||
| e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted, | |||||
| fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth)) | |||||
| e.setRuntimeState(RuntimeStateFaulted) | |||||
| return | |||||
| } | |||||
| } else { | |||||
| e.mutedFaultStreak.Store(0) | |||||
| } | |||||
| if queue.Health == output.QueueHealthNormal && !hasLateBuffers { | if queue.Health == output.QueueHealthNormal && !hasLateBuffers { | ||||
| if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold { | if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold { | ||||
| e.mutedRecoveryStreak.Store(0) | e.mutedRecoveryStreak.Store(0) | ||||
| e.mutedFaultStreak.Store(0) | |||||
| e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded, | e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded, | ||||
| fmt.Sprintf("queue healthy for %d checks after mute", count)) | fmt.Sprintf("queue healthy for %d checks after mute", count)) | ||||
| e.setRuntimeState(RuntimeStateDegraded) | e.setRuntimeState(RuntimeStateDegraded) | ||||
| @@ -109,3 +109,73 @@ func TestEngineRuntimeStateMuteOnPersistentQueueCritical(t *testing.T) { | |||||
| t.Fatalf("expected running after recovery, got %s", got) | t.Fatalf("expected running after recovery, got %s", got) | ||||
| } | } | ||||
| } | } | ||||
| func TestEngineFaultsAfterMutedCriticalStreak(t *testing.T) { | |||||
| e := NewEngine(cfgpkg.Default(), platform.NewSimulatedDriver(nil)) | |||||
| e.setRuntimeState(RuntimeStateRunning) | |||||
| queue := output.QueueStats{Depth: 1, Health: output.QueueHealthCritical} | |||||
| for i := 0; i < queueMutedStreakThreshold; i++ { | |||||
| e.evaluateRuntimeState(queue, false) | |||||
| } | |||||
| if got := e.currentRuntimeState(); got != RuntimeStateMuted { | |||||
| t.Fatalf("expected muted after draining critical streak, got %s", got) | |||||
| } | |||||
| triggered := false | |||||
| for i := 0; i < queueFaultedStreakThreshold; i++ { | |||||
| e.evaluateRuntimeState(queue, false) | |||||
| if e.currentRuntimeState() == RuntimeStateFaulted { | |||||
| triggered = true | |||||
| break | |||||
| } | |||||
| } | |||||
| if !triggered { | |||||
| t.Fatalf("expected faulted after %d extra critical checks", queueFaultedStreakThreshold) | |||||
| } | |||||
| if got := e.currentRuntimeState(); got != RuntimeStateFaulted { | |||||
| t.Fatalf("expected faulted state, got %s", got) | |||||
| } | |||||
| fault := e.LastFault() | |||||
| if fault == nil { | |||||
| t.Fatal("expected recorded fault") | |||||
| } | |||||
| if fault.Severity != FaultSeverityFaulted { | |||||
| t.Fatalf("expected faulted severity, got %s", fault.Severity) | |||||
| } | |||||
| if fault.Reason != FaultReasonQueueCritical { | |||||
| t.Fatalf("expected queue critical reason, got %s", fault.Reason) | |||||
| } | |||||
| } | |||||
| func TestRuntimeTransitionCounters(t *testing.T) { | |||||
| e := NewEngine(cfgpkg.Default(), platform.NewSimulatedDriver(nil)) | |||||
| if got := e.Stats().DegradedTransitions; got != 0 { | |||||
| t.Fatalf("expected zero transitions initially, got %d", got) | |||||
| } | |||||
| if got := e.Stats().FaultCount; got != 0 { | |||||
| t.Fatalf("expected zero faults initially, got %d", got) | |||||
| } | |||||
| e.setRuntimeState(RuntimeStateDegraded) | |||||
| if got := e.Stats().DegradedTransitions; got != 1 { | |||||
| t.Fatalf("expected one degraded transition, got %d", got) | |||||
| } | |||||
| e.setRuntimeState(RuntimeStateMuted) | |||||
| if got := e.Stats().MutedTransitions; got != 1 { | |||||
| t.Fatalf("expected one mute transition, got %d", got) | |||||
| } | |||||
| e.setRuntimeState(RuntimeStateFaulted) | |||||
| if got := e.Stats().FaultedTransitions; got != 1 { | |||||
| t.Fatalf("expected one faulted transition, got %d", got) | |||||
| } | |||||
| e.recordFault(FaultReasonQueueCritical, FaultSeverityWarn, "audit") | |||||
| if got := e.Stats().FaultCount; got != 1 { | |||||
| t.Fatalf("expected one recorded fault, got %d", got) | |||||
| } | |||||
| } | |||||