From 9885e449623412743249002893b9c258d8d88a34 Mon Sep 17 00:00:00 2001
From: Jan Svabenik
Date: Sun, 5 Apr 2026 18:52:22 +0200
Subject: [PATCH] feat: add runtime supervisor transitions

---
 docs/pro-runtime-hardening-workboard.md |  2 +-
 internal/app/engine.go                  | 48 ++++++++++++++++++++++++++++++++++++++++++++----
 internal/app/runtime_state_test.go      | 31 +++++++++++++++++++++++++++
 3 files changed, 76 insertions(+), 5 deletions(-)

diff --git a/docs/pro-runtime-hardening-workboard.md b/docs/pro-runtime-hardening-workboard.md
index e993968..053239f 100644
--- a/docs/pro-runtime-hardening-workboard.md
+++ b/docs/pro-runtime-hardening-workboard.md
@@ -272,7 +272,7 @@ Generator/upsampler and hardware writer run as separate stages with a small,
 Introduce a clear operating model with fault, recovery, and muted states.
 
 ## Progress
-- EngineStats reports the runtime-state field (`idle`, `arming`, `prebuffering`, `running`) and establishes an observable baseline for the upcoming fault machine.
+- EngineStats reports the runtime-state field (`idle`, `arming`, `prebuffering`, `running`) and now reacts to queue health and late buffers: a sustained `critical` queue or recently observed late buffers switch the state to `degraded`, otherwise it returns to `running`.
 
 ## Target states per the concept
 - `idle`
diff --git a/internal/app/engine.go b/internal/app/engine.go
index 2736766..2ef1963 100644
--- a/internal/app/engine.go
+++ b/internal/app/engine.go
@@ -94,6 +94,7 @@ const (
 )
 
 const lateBufferIndicatorWindow = 5 * time.Second
+const queueCriticalStreakThreshold = 3
 
 // Engine is the continuous TX loop. It generates composite IQ in chunks,
 // resamples to device rate, and pushes to hardware in a tight loop.
@@ -121,6 +122,7 @@ type Engine struct {
 	underruns         atomic.Uint64
 	lateBuffers       atomic.Uint64
 	lateBufferAlertAt atomic.Uint64
+	criticalStreak    atomic.Uint64
 	maxCycleNs        atomic.Uint64
 	maxGenerateNs     atomic.Uint64
 	maxUpsampleNs     atomic.Uint64
@@ -373,9 +375,7 @@ func (e *Engine) Stats() EngineStats {
 	queue := e.frameQueue.Stats()
 	lateBuffers := e.lateBuffers.Load()
 
-	now := time.Now()
-	lateAlertAt := e.lateBufferAlertAt.Load()
-	hasRecentLateBuffers := lateAlertAt > 0 && now.Sub(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
+	hasRecentLateBuffers := e.hasRecentLateBuffers()
 	ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
 	return EngineStats{
 		State: string(e.currentRuntimeState()),
@@ -420,7 +420,7 @@ func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string
 }
 
 func (e *Engine) run(ctx context.Context) {
-	e.setRuntimeState(RuntimeStateRunning)
+	e.setRuntimeState(RuntimeStatePrebuffering)
 	e.wg.Add(1)
 	go e.writerLoop(ctx)
 	defer e.wg.Done()
@@ -477,6 +477,8 @@ func (e *Engine) run(ctx context.Context) {
 			}
 			continue
 		}
+		queueStats := e.frameQueue.Stats()
+		e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
 	}
 }
 
@@ -507,6 +509,8 @@ func (e *Engine) writerLoop(ctx context.Context) {
 
 		updateMaxDuration(&e.maxWriteNs, writeDur)
 		updateMaxDuration(&e.maxCycleNs, cycleDur)
+		queueStats := e.frameQueue.Stats()
+		e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
 
 		if cycleDur > e.chunkDuration {
 			late := e.lateBuffers.Add(1)
@@ -563,3 +567,39 @@ func (e *Engine) currentRuntimeState() RuntimeState {
 	}
 	return RuntimeStateIdle
 }
+
+func (e *Engine) hasRecentLateBuffers() bool {
+	lateAlertAt := e.lateBufferAlertAt.Load()
+	if lateAlertAt == 0 {
+		return false
+	}
+	return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
+}
+
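+// evaluateRuntimeState recomputes the supervisor state from a queue snapshot
+// and the late-buffer indicator. Prebuffering is promoted to running once a
+// frame is queued; a critical queue that persists for
+// queueCriticalStreakThreshold consecutive evaluations, or a recent late
+// buffer, demotes the state to degraded; an otherwise healthy evaluation
+// settles back to running. Stopping and faulted are terminal here and are
+// never overwritten.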
+func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
+	state := e.currentRuntimeState()
+	switch state {
+	case RuntimeStateStopping, RuntimeStateFaulted:
+		return
+	}
+	if state == RuntimeStatePrebuffering {
+		if queue.Depth >= 1 {
+			e.setRuntimeState(RuntimeStateRunning)
+		}
+		return
+	}
+	critical := queue.Health == output.QueueHealthCritical
+	if critical {
+		if e.criticalStreak.Add(1) >= queueCriticalStreakThreshold {
+			e.setRuntimeState(RuntimeStateDegraded)
+			return
+		}
+	} else {
+		e.criticalStreak.Store(0)
+	}
+	if hasLateBuffers {
+		e.setRuntimeState(RuntimeStateDegraded)
+		return
+	}
+	e.setRuntimeState(RuntimeStateRunning)
+}
diff --git a/internal/app/runtime_state_test.go b/internal/app/runtime_state_test.go
index 9ef04be..b253183 100644
--- a/internal/app/runtime_state_test.go
+++ b/internal/app/runtime_state_test.go
@@ -4,6 +4,7 @@ import (
 	"testing"
 
 	cfgpkg "github.com/jan/fm-rds-tx/internal/config"
+	"github.com/jan/fm-rds-tx/internal/output"
 	"github.com/jan/fm-rds-tx/internal/platform"
 )
 
@@ -24,3 +25,33 @@ func TestEngineRuntimeStateReporting(t *testing.T) {
 		t.Fatalf("currentRuntimeState mismatch: %s", got)
 	}
 }
+
+func TestEngineRuntimeStateTransitions(t *testing.T) {
+	e := NewEngine(cfgpkg.Default(), platform.NewSimulatedDriver(nil))
+	e.setRuntimeState(RuntimeStatePrebuffering)
+
+	queue := output.QueueStats{Depth: 1, FillLevel: 0.75, Health: output.QueueHealthNormal}
+	e.evaluateRuntimeState(queue, false)
+	if got := e.currentRuntimeState(); got != RuntimeStateRunning {
+		t.Fatalf("expected running after first queued frame, got %s", got)
+	}
+
+	queue.Health = output.QueueHealthCritical
+	for i := 0; i < queueCriticalStreakThreshold; i++ {
+		e.evaluateRuntimeState(queue, false)
+	}
+	if got := e.currentRuntimeState(); got != RuntimeStateDegraded {
+		t.Fatalf("expected degraded on queue critical streak, got %s", got)
+	}
+
+	queue.Health = output.QueueHealthNormal
+	e.evaluateRuntimeState(queue, false)
+	if got := e.currentRuntimeState(); got != RuntimeStateRunning {
+		t.Fatalf("expected running once queue healthy, got %s", got)
+	}
+
+	e.evaluateRuntimeState(queue, true)
+	if got := e.currentRuntimeState(); got != RuntimeStateDegraded {
+		t.Fatalf("expected degraded when late buffers seen, got %s", got)
+	}
+}
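
For review, a self-contained sketch of the transition rules this patch introduces, runnable in isolation. The supervisor struct, the simplified queueHealth constants, and the scripted calls in main are illustrative stand-ins for Engine, output.QueueStats, and the real audio loop; only the branch structure mirrors evaluateRuntimeState above.

package main

import "fmt"

// Stand-in state and health types; the real definitions live in internal/app
// and internal/output and are only partially visible in the hunks above.
type runtimeState string

const (
	statePrebuffering runtimeState = "prebuffering"
	stateRunning      runtimeState = "running"
	stateDegraded     runtimeState = "degraded"
)

type queueHealth int

const (
	healthNormal queueHealth = iota
	healthCritical
)

const criticalStreakThreshold = 3

// supervisor holds just enough state to replay the transitions.
type supervisor struct {
	state          runtimeState
	criticalStreak int
}

func (s *supervisor) evaluate(queueDepth int, health queueHealth, lateBuffers bool) {
	// Prebuffering is promoted to running once the queue holds a frame.
	if s.state == statePrebuffering {
		if queueDepth >= 1 {
			s.state = stateRunning
		}
		return
	}
	// A critical queue must persist for criticalStreakThreshold evaluations
	// before the state degrades; any healthy reading resets the streak.
	if health == healthCritical {
		s.criticalStreak++
		if s.criticalStreak >= criticalStreakThreshold {
			s.state = stateDegraded
			return
		}
	} else {
		s.criticalStreak = 0
	}
	// Recent late buffers degrade immediately; otherwise settle on running.
	if lateBuffers {
		s.state = stateDegraded
		return
	}
	s.state = stateRunning
}

func main() {
	s := &supervisor{state: statePrebuffering}

	s.evaluate(1, healthNormal, false)
	fmt.Println(s.state) // running: first frame queued

	for i := 0; i < criticalStreakThreshold; i++ {
		s.evaluate(1, healthCritical, false)
	}
	fmt.Println(s.state) // degraded: critical streak reached the threshold

	s.evaluate(1, healthNormal, false)
	fmt.Println(s.state) // running: queue recovered, streak reset
}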