From d29e9d45a300e1c4fd725af8ef44ae9e9ea02f63 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 21:50:37 +0200 Subject: [PATCH] engine: require sustained late writes before degrading runtime --- internal/app/engine.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/internal/app/engine.go b/internal/app/engine.go index 8348b52..c25537b 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -113,13 +113,14 @@ type RuntimeTransition struct { } const ( - lateBufferIndicatorWindow = 5 * time.Second - writeLateTolerance = 1 * time.Millisecond + lateBufferIndicatorWindow = 2 * time.Second + writeLateTolerance = 10 * time.Millisecond queueCriticalStreakThreshold = 3 queueMutedStreakThreshold = queueCriticalStreakThreshold * 2 queueMutedRecoveryThreshold = queueCriticalStreakThreshold queueFaultedStreakThreshold = queueCriticalStreakThreshold faultRepeatWindow = 1 * time.Second + lateBufferStreakThreshold = 3 // consecutive late writes required before alerting faultHistoryCapacity = 8 runtimeTransitionHistoryCapacity = 8 ) @@ -150,6 +151,7 @@ type Engine struct { underruns atomic.Uint64 lateBuffers atomic.Uint64 lateBufferAlertAt atomic.Uint64 + lateBufferStreak atomic.Uint64 // consecutive late writes; reset on clean write criticalStreak atomic.Uint64 mutedRecoveryStreak atomic.Uint64 mutedFaultStreak atomic.Uint64 @@ -604,12 +606,23 @@ func (e *Engine) writerLoop(ctx context.Context) { lateOver := writeDur - e.chunkDuration if lateOver > writeLateTolerance { + streak := e.lateBufferStreak.Add(1) late := e.lateBuffers.Add(1) - e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano())) + // Only arm the alert window once the streak threshold is reached. + // Isolated OS-scheduling or USB jitter spikes (single late writes) + // are normal on a loaded system and must not trigger degraded state. + // This mirrors the queue-health streak logic. + if streak >= lateBufferStreakThreshold { + e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano())) + } if late <= 5 || late%20 == 0 { - log.Printf("TX LATE: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s", - writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency) + log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s", + streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency) } + } else { + // Clean write — reset the consecutive streak so isolated spikes + // never accumulate toward the threshold. + e.lateBufferStreak.Store(0) } if err != nil {