diff --git a/internal/app/engine.go b/internal/app/engine.go index 8348b52..c25537b 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -113,13 +113,14 @@ type RuntimeTransition struct { } const ( - lateBufferIndicatorWindow = 5 * time.Second - writeLateTolerance = 1 * time.Millisecond + lateBufferIndicatorWindow = 2 * time.Second + writeLateTolerance = 10 * time.Millisecond queueCriticalStreakThreshold = 3 queueMutedStreakThreshold = queueCriticalStreakThreshold * 2 queueMutedRecoveryThreshold = queueCriticalStreakThreshold queueFaultedStreakThreshold = queueCriticalStreakThreshold faultRepeatWindow = 1 * time.Second + lateBufferStreakThreshold = 3 // consecutive late writes required before alerting faultHistoryCapacity = 8 runtimeTransitionHistoryCapacity = 8 ) @@ -150,6 +151,7 @@ type Engine struct { underruns atomic.Uint64 lateBuffers atomic.Uint64 lateBufferAlertAt atomic.Uint64 + lateBufferStreak atomic.Uint64 // consecutive late writes; reset on clean write criticalStreak atomic.Uint64 mutedRecoveryStreak atomic.Uint64 mutedFaultStreak atomic.Uint64 @@ -604,12 +606,23 @@ func (e *Engine) writerLoop(ctx context.Context) { lateOver := writeDur - e.chunkDuration if lateOver > writeLateTolerance { + streak := e.lateBufferStreak.Add(1) late := e.lateBuffers.Add(1) - e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano())) + // Only arm the alert window once the streak threshold is reached. + // Isolated OS-scheduling or USB jitter spikes (single late writes) + // are normal on a loaded system and must not trigger degraded state. + // This mirrors the queue-health streak logic. + if streak >= lateBufferStreakThreshold { + e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano())) + } if late <= 5 || late%20 == 0 { - log.Printf("TX LATE: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s", - writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency) + log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s", + streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency) } + } else { + // Clean write — reset the consecutive streak so isolated spikes + // never accumulate toward the threshold. + e.lateBufferStreak.Store(0) } if err != nil {