diff --git a/docs/pro-runtime-hardening-workboard.md b/docs/pro-runtime-hardening-workboard.md
index 0a46e29..9b393ae 100644
--- a/docs/pro-runtime-hardening-workboard.md
+++ b/docs/pro-runtime-hardening-workboard.md
@@ -274,6 +274,7 @@ Introducing a clear operating model with fault, recovery, and muted states
 ## Progress
 - EngineStats exposes the runtime-state field (`idle`, `arming`, `prebuffering`, `running`) and now reacts to queue health and late buffers, switching to `degraded` on `low`/`critical` health or late buffers and returning to `running` otherwise.
 - `evaluateRuntimeState` escalates persistently `critical` queues from `degraded` to `muted`, while `FaultReasonQueueCritical` surfaces `muted` severity so the mute transition stays observable.
+- `evaluateRuntimeState` now requires a short streak of healthy queue checks before leaving `muted`: once the queue settles, it steps down to `degraded` and records a degraded-severity recovery event.
 
 ## Target states according to the concept
 - `idle`
diff --git a/internal/app/engine.go b/internal/app/engine.go
index a33edc7..d294ca4 100644
--- a/internal/app/engine.go
+++ b/internal/app/engine.go
@@ -98,6 +98,7 @@ const (
 	lateBufferIndicatorWindow    = 5 * time.Second
 	queueCriticalStreakThreshold = 3
 	queueMutedStreakThreshold    = queueCriticalStreakThreshold * 2
+	queueMutedRecoveryThreshold  = queueCriticalStreakThreshold // healthy checks required to leave muted
 	faultRepeatWindow            = 1 * time.Second
 	faultHistoryCapacity         = 8
 )
@@ -123,20 +124,21 @@ type Engine struct {
 	wg           sync.WaitGroup
 	runtimeState atomic.Value
 
-	chunksProduced    atomic.Uint64
-	totalSamples      atomic.Uint64
-	underruns         atomic.Uint64
-	lateBuffers       atomic.Uint64
-	lateBufferAlertAt atomic.Uint64
-	criticalStreak    atomic.Uint64
-	maxCycleNs        atomic.Uint64
-	maxGenerateNs     atomic.Uint64
-	maxUpsampleNs     atomic.Uint64
-	maxWriteNs        atomic.Uint64
-	lastError         atomic.Value // string
-	lastFault         atomic.Value // *FaultEvent
-	faultHistoryMu    sync.Mutex
-	faultHistory      []FaultEvent
+	chunksProduced      atomic.Uint64
+	totalSamples        atomic.Uint64
+	underruns           atomic.Uint64
+	lateBuffers         atomic.Uint64
+	lateBufferAlertAt   atomic.Uint64
+	criticalStreak      atomic.Uint64
+	mutedRecoveryStreak atomic.Uint64
+	maxCycleNs          atomic.Uint64
+	maxGenerateNs       atomic.Uint64
+	maxUpsampleNs       atomic.Uint64
+	maxWriteNs          atomic.Uint64
+	lastError           atomic.Value // string
+	lastFault           atomic.Value // *FaultEvent
+	faultHistoryMu      sync.Mutex
+	faultHistory        []FaultEvent
 
 	// Live config: pending frequency change, applied between chunks
 	pendingFreq atomic.Pointer[float64]
@@ -658,6 +660,19 @@ func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bo
 	switch state {
 	case RuntimeStateStopping, RuntimeStateFaulted:
 		return
+	case RuntimeStateMuted:
+		// Leave muted only after a sustained healthy streak; one good poll must not flap the state.
+		if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
+			if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
+				e.mutedRecoveryStreak.Store(0)
+				e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
+					fmt.Sprintf("queue healthy for %d checks after mute", count))
+				e.setRuntimeState(RuntimeStateDegraded)
+			}
+		} else {
+			e.mutedRecoveryStreak.Store(0)
+		}
+		return
 	}
 
 	if state == RuntimeStatePrebuffering {
 		if queue.Depth >= 1 {
diff --git a/internal/app/runtime_state_test.go b/internal/app/runtime_state_test.go
index 744a36b..36d8e6f 100644
--- a/internal/app/runtime_state_test.go
+++ b/internal/app/runtime_state_test.go
@@ -69,14 +69,43 @@ func TestEngineRuntimeStateMuteOnPersistentQueueCritical(t *testing.T) {
 		t.Fatalf("expected muted after prolonged queue critical, got %s", got)
 	}
 
-	last := e.LastFault()
-	if last == nil {
+	muteFault := e.LastFault()
+	if muteFault == nil {
 		t.Fatal("expected fault recorded for the mute transition")
 	}
-	if last.Reason != FaultReasonQueueCritical {
-		t.Fatalf("expected queue critical reason, got %s", last.Reason)
+	if muteFault.Reason != FaultReasonQueueCritical {
+		t.Fatalf("expected queue critical reason, got %s", muteFault.Reason)
 	}
-	if last.Severity != FaultSeverityMuted {
-		t.Fatalf("expected muted severity, got %s", last.Severity)
+	if muteFault.Severity != FaultSeverityMuted {
+		t.Fatalf("expected muted severity, got %s", muteFault.Severity)
+	}
+
+	queue.Health = output.QueueHealthNormal
+	for i := 0; i < queueMutedRecoveryThreshold-1; i++ {
+		e.evaluateRuntimeState(queue, false)
+		if got := e.currentRuntimeState(); got != RuntimeStateMuted {
+			t.Fatalf("expected still muted while recovery window builds, got %s", got)
+		}
+	}
+
+	e.evaluateRuntimeState(queue, false)
+	if got := e.currentRuntimeState(); got != RuntimeStateDegraded {
+		t.Fatalf("expected degrade once mute recovery threshold reached, got %s", got)
+	}
+
+	recoveryFault := e.LastFault()
+	if recoveryFault == nil {
+		t.Fatal("expected recovery fault entry after leaving mute")
+	}
+	if recoveryFault.Severity != FaultSeverityDegraded {
+		t.Fatalf("expected degraded severity for recovery event, got %s", recoveryFault.Severity)
+	}
+	if recoveryFault.Reason != FaultReasonQueueCritical {
+		t.Fatalf("expected queue critical reason for recovery event, got %s", recoveryFault.Reason)
+	}
+
+	e.evaluateRuntimeState(queue, false)
+	if got := e.currentRuntimeState(); got != RuntimeStateRunning {
+		t.Fatalf("expected running after recovery, got %s", got)
 	}
 }
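For reviewers who want to poke at the mute/recovery behavior in isolation, here is a minimal sketch of the streak-based hysteresis the diff implements. Everything in it is a stand-in: `health`, `hysteresis`, `observe`, and both threshold constants are invented for this example, it collapses the `degraded` step the engine keeps between `muted` and `running`, and it uses plain ints where the engine needs `atomic.Uint64` counters because `evaluateRuntimeState` runs concurrently with stats readers.

```go
package main

import "fmt"

// health is a stand-in for the engine's queue-health values.
type health int

const (
	healthNormal health = iota
	healthCritical
)

// Mirrors the diff's ratios: muting takes queueMutedStreakThreshold (6)
// critical checks, recovery takes queueMutedRecoveryThreshold (3) healthy ones.
const (
	muteThreshold    = 6
	recoverThreshold = 3
)

// hysteresis counts consecutive samples in each direction.
type hysteresis struct {
	muted          bool
	criticalStreak int
	recoveryStreak int
}

// observe feeds in one health sample and reports whether the muted flag
// flipped on this call.
func (h *hysteresis) observe(q health) bool {
	if h.muted {
		if q != healthNormal {
			h.recoveryStreak = 0 // any relapse restarts the recovery window
			return false
		}
		h.recoveryStreak++
		if h.recoveryStreak >= recoverThreshold {
			h.muted = false
			h.recoveryStreak = 0
			return true
		}
		return false
	}
	if q == healthCritical {
		h.criticalStreak++
		if h.criticalStreak >= muteThreshold {
			h.muted = true
			h.criticalStreak = 0
			return true
		}
	} else {
		h.criticalStreak = 0
	}
	return false
}

func main() {
	var h hysteresis
	samples := []health{
		healthCritical, healthCritical, healthCritical,
		healthCritical, healthCritical, healthCritical, // sixth critical check mutes
		healthNormal, healthNormal, healthCritical, // relapse resets the recovery streak
		healthNormal, healthNormal, healthNormal, // three consecutive healthy checks un-mute
	}
	for i, s := range samples {
		if h.observe(s) {
			fmt.Printf("sample %2d: muted=%t\n", i, h.muted)
		}
	}
}
```

The point of resetting the recovery streak on any relapse is that a queue hovering around the critical boundary stays muted instead of toggling audio on and off; the diff's test exercises exactly this by holding the state at `muted` for `queueMutedRecoveryThreshold-1` healthy checks before allowing the step down to `degraded`.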