package app

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/jan/fm-rds-tx/internal/audio"
	cfgpkg "github.com/jan/fm-rds-tx/internal/config"
	"github.com/jan/fm-rds-tx/internal/dsp"
	"github.com/jan/fm-rds-tx/internal/license"
	offpkg "github.com/jan/fm-rds-tx/internal/offline"
	"github.com/jan/fm-rds-tx/internal/output"
	"github.com/jan/fm-rds-tx/internal/platform"
)

// EngineState is the coarse start/stop lifecycle state of the Engine,
// guarded by Engine.mu. It is distinct from the finer-grained health-oriented
// RuntimeState machine below.
type EngineState int

const (
	EngineIdle EngineState = iota
	EngineRunning
	EngineStopping
)

// String returns a human-readable label for the lifecycle state.
func (s EngineState) String() string {
	switch s {
	case EngineIdle:
		return "idle"
	case EngineRunning:
		return "running"
	case EngineStopping:
		return "stopping"
	default:
		return "unknown"
	}
}

// RuntimeState is the health-oriented state machine driven by frame-queue
// health and write latency (see evaluateRuntimeState / setRuntimeState).
type RuntimeState string

const (
	RuntimeStateIdle         RuntimeState = "idle"
	RuntimeStateArming       RuntimeState = "arming"
	RuntimeStatePrebuffering RuntimeState = "prebuffering"
	RuntimeStateRunning      RuntimeState = "running"
	RuntimeStateDegraded     RuntimeState = "degraded"
	RuntimeStateMuted        RuntimeState = "muted"
	RuntimeStateFaulted      RuntimeState = "faulted"
	RuntimeStateStopping     RuntimeState = "stopping"
)

// updateMaxDuration atomically raises *dst to d (stored as nanoseconds) if d
// is larger than the current value. The CAS loop keeps the running maximum
// correct under concurrent writers (run loop and writer loop both update).
func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
	v := uint64(d)
	for {
		cur := dst.Load()
		if v <= cur {
			return
		}
		if dst.CompareAndSwap(cur, v) {
			return
		}
	}
}

// durationMs converts a nanosecond count (as stored by updateMaxDuration)
// to fractional milliseconds for the stats snapshot.
func durationMs(ns uint64) float64 {
	return float64(ns) / float64(time.Millisecond)
}

// EngineStats is a point-in-time JSON snapshot of engine counters, latency
// maxima, queue health, and fault/transition history, assembled by Stats().
type EngineStats struct {
	State                       string            `json:"state"`
	RuntimeStateDurationSeconds float64           `json:"runtimeStateDurationSeconds"`
	ChunksProduced              uint64            `json:"chunksProduced"`
	TotalSamples                uint64            `json:"totalSamples"`
	Underruns                   uint64            `json:"underruns"`
	LateBuffers                 uint64            `json:"lateBuffers,omitempty"`
	LastError                   string            `json:"lastError,omitempty"`
	UptimeSeconds               float64           `json:"uptimeSeconds"`
	MaxCycleMs                  float64           `json:"maxCycleMs,omitempty"`
	MaxGenerateMs               float64           `json:"maxGenerateMs,omitempty"`
	MaxUpsampleMs               float64           `json:"maxUpsampleMs,omitempty"`
	MaxWriteMs                  float64           `json:"maxWriteMs,omitempty"`
	MaxQueueResidenceMs         float64           `json:"maxQueueResidenceMs,omitempty"`
	MaxPipelineLatencyMs        float64           `json:"maxPipelineLatencyMs,omitempty"`
	Queue                       output.QueueStats `json:"queue"`
	RuntimeIndicator            RuntimeIndicator  `json:"runtimeIndicator"`
	RuntimeAlert                string            `json:"runtimeAlert,omitempty"`
	AppliedFrequencyMHz         float64           `json:"appliedFrequencyMHz"`
	ActivePS                    string            `json:"activePS,omitempty"`
	ActiveRadioText             string            `json:"activeRadioText,omitempty"`
	LastFault                   *FaultEvent       `json:"lastFault,omitempty"`
	DegradedTransitions         uint64            `json:"degradedTransitions"`
	MutedTransitions            uint64            `json:"mutedTransitions"`
	FaultedTransitions          uint64            `json:"faultedTransitions"`
	FaultCount                  uint64            `json:"faultCount"`
	FaultHistory                []FaultEvent      `json:"faultHistory,omitempty"`
	TransitionHistory           []RuntimeTransition `json:"transitionHistory,omitempty"`
}

// RuntimeIndicator is a coarse traffic-light summary derived from queue
// health and recent late buffers (see runtimeIndicator).
type RuntimeIndicator string

const (
	RuntimeIndicatorNormal        RuntimeIndicator = "normal"
	RuntimeIndicatorDegraded      RuntimeIndicator = "degraded"
	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
)

// RuntimeTransition records one RuntimeState change for the bounded
// transition history exposed via Stats().
type RuntimeTransition struct {
	Time     time.Time    `json:"time"`
	From     RuntimeState `json:"from"`
	To       RuntimeState `json:"to"`
	Severity string       `json:"severity"`
}

// Tuning constants for the runtime health state machine.
const (
	// How long after the last armed late-buffer alert the "recent late
	// buffers" condition remains true.
	lateBufferIndicatorWindow = 2 * time.Second
	// Slack added on top of chunkDuration before a write counts as late.
	writeLateTolerance = 10 * time.Millisecond
	// Consecutive critical queue-health checks before degrading.
	queueCriticalStreakThreshold = 3
	// Consecutive critical checks before muting (twice the degrade streak).
	queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
	// Consecutive healthy checks (while muted) before recovering to degraded.
	queueMutedRecoveryThreshold = queueCriticalStreakThreshold
	// Consecutive critical checks while muted before faulting.
	queueFaultedStreakThreshold = queueCriticalStreakThreshold
	// Identical faults within this window are de-duplicated by recordFault.
	faultRepeatWindow = 1 * time.Second
	lateBufferStreakThreshold = 3 // consecutive late writes required before alerting
	faultHistoryCapacity             = 8
	runtimeTransitionHistoryCapacity = 8
)

// Engine is the continuous TX loop. It generates composite IQ in chunks,
// resamples to device rate, and pushes to hardware in a tight loop.
// The hardware buffer_push call is blocking — it returns when the hardware
// has consumed the previous buffer and is ready for the next one.
// This naturally paces the loop to real-time without a ticker.
type Engine struct {
	cfg       cfgpkg.Config
	driver    platform.SoapyDriver
	generator *offpkg.Generator
	upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate

	chunkDuration time.Duration // per-chunk generation budget (default 50ms)
	deviceRate    float64       // effective hardware sample rate (Hz)
	frameQueue    *output.FrameQueue

	// Lifecycle, guarded by mu.
	mu        sync.Mutex
	state     EngineState
	cancel    context.CancelFunc
	startedAt time.Time
	wg        sync.WaitGroup

	// Health state machine.
	runtimeState atomic.Value // RuntimeState
	stateMu      sync.Mutex   // guards setRuntimeState check-then-store (NEW-2 fix)

	// Counters updated by run()/writerLoop(), read lock-free by Stats().
	chunksProduced      atomic.Uint64
	totalSamples        atomic.Uint64
	underruns           atomic.Uint64
	lateBuffers         atomic.Uint64
	lateBufferAlertAt   atomic.Uint64 // UnixNano of last armed late-buffer alert
	lateBufferStreak    atomic.Uint64 // consecutive late writes; reset on clean write
	criticalStreak      atomic.Uint64
	mutedRecoveryStreak atomic.Uint64
	mutedFaultStreak    atomic.Uint64

	// Running maxima (nanoseconds), raised via updateMaxDuration.
	maxCycleNs          atomic.Uint64
	maxGenerateNs       atomic.Uint64
	maxUpsampleNs       atomic.Uint64
	maxWriteNs          atomic.Uint64
	maxQueueResidenceNs atomic.Uint64
	maxPipelineNs       atomic.Uint64

	lastError atomic.Value // string
	lastFault atomic.Value // *FaultEvent

	// Bounded histories, each guarded by its own mutex.
	faultHistoryMu      sync.Mutex
	faultHistory        []FaultEvent
	transitionHistoryMu sync.Mutex
	transitionHistory   []RuntimeTransition

	degradedTransitions atomic.Uint64
	mutedTransitions    atomic.Uint64
	faultedTransitions  atomic.Uint64
	faultEvents         atomic.Uint64

	runtimeStateEnteredAt atomic.Uint64 // UnixNano when current runtime state was entered

	// Live config: pending frequency change, applied between chunks
	pendingFreq atomic.Pointer[float64]
	// Most recently tuned frequency (Hz)
	appliedFreqHz atomic.Uint64

	// Live audio stream (optional)
	streamSrc *audio.StreamSource
}

// SetStreamSource configures a live audio stream as the audio source.
// Must be called before Start(). The StreamResampler is created internally
// to convert from the stream's sample rate to the DSP composite rate.
func (e *Engine) SetStreamSource(src *audio.StreamSource) { e.streamSrc = src compositeRate := float64(e.cfg.FM.CompositeRateHz) if compositeRate <= 0 { compositeRate = 228000 } resampler := audio.NewStreamResampler(src, compositeRate) if err := e.generator.SetExternalSource(resampler); err != nil { // Should never happen: SetStreamSource must be called before Start(). log.Printf("engine: SetExternalSource failed (called too late): %v", err) return } log.Printf("engine: live audio stream wired — initial %d Hz → %.0f Hz composite (buffer %d frames); actual decoded rate auto-corrects on first chunk", src.SampleRate, compositeRate, src.Stats().Capacity) } // SetLicenseState passes license/jingle state to the generator. // Must be called before Start(). It does not implicitly enable watermarking. func (e *Engine) SetLicenseState(s *license.State, _ string) { e.generator.SetLicense(s) } // ConfigureWatermark explicitly enables or disables the optional program-audio watermark. // Must be called before Start(). func (e *Engine) ConfigureWatermark(enabled bool, key string) { e.generator.ConfigureWatermark(enabled, key) } // StreamSource returns the live audio stream source, or nil. // Used by the control server for stats and HTTP audio ingest. func (e *Engine) StreamSource() *audio.StreamSource { return e.streamSrc } func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine { deviceRate := cfg.EffectiveDeviceRate() compositeRate := float64(cfg.FM.CompositeRateHz) if compositeRate <= 0 { compositeRate = 228000 } var upsampler *dsp.FMUpsampler if deviceRate > compositeRate*1.001 { // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz), // FMUpsampler handles FM modulation + interpolation to deviceRate. // This halves CPU load compared to running all DSP at deviceRate. cfg.FM.FMModulationEnabled = false maxDev := cfg.FM.MaxDeviationHz if maxDev <= 0 { maxDev = 75000 } // mpxGain scales the FM deviation to compensate for hardware // DAC/SDR scaling factors. 
DSP chain stays at logical 0-1.0 levels. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 { maxDev *= cfg.FM.MpxGain } upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev) log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)", compositeRate, deviceRate, deviceRate/compositeRate) } else { // Same-rate: entire DSP chain runs at deviceRate. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz). if deviceRate > 0 { cfg.FM.CompositeRateHz = int(deviceRate) } cfg.FM.FMModulationEnabled = true log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz) } engine := &Engine{ cfg: cfg, driver: driver, generator: offpkg.NewGenerator(cfg), upsampler: upsampler, chunkDuration: 50 * time.Millisecond, deviceRate: deviceRate, state: EngineIdle, frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity), faultHistory: make([]FaultEvent, 0, faultHistoryCapacity), transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity), } initFreqHz := cfg.FM.FrequencyMHz * 1e6 engine.appliedFreqHz.Store(math.Float64bits(initFreqHz)) engine.setRuntimeState(RuntimeStateIdle) return engine } func (e *Engine) SetChunkDuration(d time.Duration) { e.chunkDuration = d } // LiveConfigUpdate carries hot-reloadable parameters from the control API. // nil pointers mean "no change". Validated before applying. type LiveConfigUpdate struct { FrequencyMHz *float64 OutputDrive *float64 StereoEnabled *bool StereoMode *string PilotLevel *float64 RDSInjection *float64 RDSEnabled *bool LimiterEnabled *bool LimiterCeiling *float64 PS *string RadioText *string TA *bool TP *bool // Tone and gain: live-patchable without engine restart. ToneLeftHz *float64 ToneRightHz *float64 ToneAmplitude *float64 AudioGain *float64 CompositeClipperEnabled *bool } // UpdateConfig applies live parameter changes without restarting the engine. // DSP params take effect at the next chunk boundary (~50ms max). 
// Frequency changes are applied between chunks via driver.Tune().
// RDS text updates are applied at the next RDS group boundary (~88ms).
func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
	// --- Validate --- all ranges checked before any change is applied, so
	// a rejected update leaves the engine untouched.
	if u.FrequencyMHz != nil {
		if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
			return fmt.Errorf("frequencyMHz out of range (65-110)")
		}
	}
	if u.OutputDrive != nil {
		if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
			return fmt.Errorf("outputDrive out of range (0-10)")
		}
	}
	if u.PilotLevel != nil {
		if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
			return fmt.Errorf("pilotLevel out of range (0-0.2)")
		}
	}
	if u.RDSInjection != nil {
		if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
			return fmt.Errorf("rdsInjection out of range (0-0.15)")
		}
	}
	if u.LimiterCeiling != nil {
		if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
			return fmt.Errorf("limiterCeiling out of range (0-2)")
		}
	}
	if u.ToneAmplitude != nil {
		if *u.ToneAmplitude < 0 || *u.ToneAmplitude > 1 {
			return fmt.Errorf("toneAmplitude out of range (0-1)")
		}
	}
	if u.AudioGain != nil {
		if *u.AudioGain < 0 || *u.AudioGain > 4 {
			return fmt.Errorf("audioGain out of range (0-4)")
		}
	}
	// --- Frequency: store for run loop to apply via driver.Tune() ---
	if u.FrequencyMHz != nil {
		freqHz := *u.FrequencyMHz * 1e6
		e.pendingFreq.Store(&freqHz)
	}
	// --- RDS text: forward to encoder atomics ---
	if u.PS != nil || u.RadioText != nil {
		if enc := e.generator.RDSEncoder(); enc != nil {
			ps, rt := "", ""
			if u.PS != nil {
				ps = *u.PS
			}
			if u.RadioText != nil {
				rt = *u.RadioText
			}
			enc.UpdateText(ps, rt)
		}
	}
	// --- RDS traffic flags: live-update ---
	if u.TA != nil || u.TP != nil {
		if enc := e.generator.RDSEncoder(); enc != nil {
			if u.TA != nil {
				enc.UpdateTA(*u.TA)
			}
			if u.TP != nil {
				enc.UpdateTP(*u.TP)
			}
		}
	}
	// --- DSP params: build new LiveParams from current + patch ---
	// Read current, apply deltas, store new
	current := e.generator.CurrentLiveParams()
	next := current // copy
	if u.OutputDrive != nil {
		next.OutputDrive = *u.OutputDrive
	}
	if u.StereoEnabled != nil {
		next.StereoEnabled = *u.StereoEnabled
	}
	if u.StereoMode != nil {
		next.StereoMode = *u.StereoMode
	}
	if u.PilotLevel != nil {
		next.PilotLevel = *u.PilotLevel
	}
	if u.RDSInjection != nil {
		next.RDSInjection = *u.RDSInjection
	}
	if u.RDSEnabled != nil {
		next.RDSEnabled = *u.RDSEnabled
	}
	if u.LimiterEnabled != nil {
		next.LimiterEnabled = *u.LimiterEnabled
	}
	if u.LimiterCeiling != nil {
		next.LimiterCeiling = *u.LimiterCeiling
	}
	if u.ToneLeftHz != nil {
		next.ToneLeftHz = *u.ToneLeftHz
	}
	if u.ToneRightHz != nil {
		next.ToneRightHz = *u.ToneRightHz
	}
	if u.ToneAmplitude != nil {
		next.ToneAmplitude = *u.ToneAmplitude
	}
	if u.AudioGain != nil {
		next.AudioGain = *u.AudioGain
	}
	if u.CompositeClipperEnabled != nil {
		next.CompositeClipperEnabled = *u.CompositeClipperEnabled
	}
	e.generator.UpdateLive(next)
	return nil
}

// Start brings the driver up, resets the DSP chain, and launches the run
// loop in a new goroutine. Returns an error if the engine is not idle or
// the driver fails to start.
func (e *Engine) Start(ctx context.Context) error {
	e.mu.Lock()
	if e.state != EngineIdle {
		e.mu.Unlock()
		return fmt.Errorf("engine already in state %s", e.state)
	}
	if err := e.driver.Start(ctx); err != nil {
		e.mu.Unlock()
		return fmt.Errorf("driver start: %w", err)
	}
	e.generator.Reset()
	if e.upsampler != nil {
		e.upsampler.Reset()
	}
	runCtx, cancel := context.WithCancel(ctx)
	e.cancel = cancel
	e.state = EngineRunning
	e.setRuntimeState(RuntimeStateArming)
	e.startedAt = time.Now()
	// BUG-A fix: discard any frames left from a previous run so writerLoop
	// does not send stale data with expired timestamps on restart.
	e.frameQueue.Drain()
	e.wg.Add(1)
	e.mu.Unlock()
	go e.run(runCtx)
	return nil
}

// Stop cancels the run loop, waits for it (and writerLoop) to exit, then
// flushes and stops the driver. Stopping an engine that is not running is
// a no-op.
func (e *Engine) Stop(ctx context.Context) error {
	e.mu.Lock()
	if e.state != EngineRunning {
		e.mu.Unlock()
		return nil
	}
	e.state = EngineStopping
	e.setRuntimeState(RuntimeStateStopping)
	e.cancel()
	e.mu.Unlock()
	// Wait for run() goroutine to exit — deterministic, no guessing
	e.wg.Wait()
	// NOTE(review): if Flush or Stop fails we return early and the engine
	// stays in EngineStopping, so a subsequent Start() will be rejected —
	// confirm this is the intended behavior for driver teardown failures.
	if err := e.driver.Flush(ctx); err != nil {
		return err
	}
	if err := e.driver.Stop(ctx); err != nil {
		return err
	}
	e.mu.Lock()
	e.state = EngineIdle
	e.setRuntimeState(RuntimeStateIdle)
	e.mu.Unlock()
	return nil
}

// Stats assembles a point-in-time EngineStats snapshot from the lock-free
// counters, queue stats, and bounded histories. Note State reports the
// RuntimeState machine, not the coarse EngineState.
func (e *Engine) Stats() EngineStats {
	e.mu.Lock()
	state := e.state
	startedAt := e.startedAt
	e.mu.Unlock()
	var uptime float64
	if state == EngineRunning {
		uptime = time.Since(startedAt).Seconds()
	}
	errVal, _ := e.lastError.Load().(string)
	queue := e.frameQueue.Stats()
	lateBuffers := e.lateBuffers.Load()
	hasRecentLateBuffers := e.hasRecentLateBuffers()
	ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
	lastFault := e.lastFaultEvent()
	activePS, activeRT := "", ""
	if enc := e.generator.RDSEncoder(); enc != nil {
		activePS, activeRT = enc.CurrentText()
	}
	return EngineStats{
		State:                       string(e.currentRuntimeState()),
		RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
		ChunksProduced:              e.chunksProduced.Load(),
		TotalSamples:                e.totalSamples.Load(),
		Underruns:                   e.underruns.Load(),
		LateBuffers:                 lateBuffers,
		LastError:                   errVal,
		UptimeSeconds:               uptime,
		MaxCycleMs:                  durationMs(e.maxCycleNs.Load()),
		MaxGenerateMs:               durationMs(e.maxGenerateNs.Load()),
		MaxUpsampleMs:               durationMs(e.maxUpsampleNs.Load()),
		MaxWriteMs:                  durationMs(e.maxWriteNs.Load()),
		MaxQueueResidenceMs:         durationMs(e.maxQueueResidenceNs.Load()),
		MaxPipelineLatencyMs:        durationMs(e.maxPipelineNs.Load()),
		Queue:                       queue,
		RuntimeIndicator:            ri,
		RuntimeAlert:                runtimeAlert(queue.Health, hasRecentLateBuffers),
		AppliedFrequencyMHz:         e.appliedFrequencyMHz(),
		ActivePS:                    activePS,
		ActiveRadioText:             activeRT,
		LastFault:                   lastFault,
		DegradedTransitions:         e.degradedTransitions.Load(),
		MutedTransitions:            e.mutedTransitions.Load(),
		FaultedTransitions:          e.faultedTransitions.Load(),
		FaultCount:                  e.faultEvents.Load(),
		FaultHistory:                e.FaultHistory(),
		TransitionHistory:           e.TransitionHistory(),
	}
}

// appliedFrequencyMHz decodes the float64 bits stored in appliedFreqHz
// and converts Hz → MHz.
func (e *Engine) appliedFrequencyMHz() float64 {
	bits := e.appliedFreqHz.Load()
	return math.Float64frombits(bits) / 1e6
}

// runtimeIndicator maps queue health plus the recent-late-buffers flag
// to the coarse traffic-light indicator.
func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
	switch {
	case queueHealth == output.QueueHealthCritical:
		return RuntimeIndicatorQueueCritical
	case queueHealth == output.QueueHealthLow || recentLateBuffers:
		return RuntimeIndicatorDegraded
	default:
		return RuntimeIndicatorNormal
	}
}

// runtimeAlert returns a short human-readable alert string, or "" when
// there is nothing to report. Ordering: critical > late buffers > low.
func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
	switch {
	case queueHealth == output.QueueHealthCritical:
		return "queue health critical"
	case recentLateBuffers:
		return "late buffers"
	case queueHealth == output.QueueHealthLow:
		return "queue health low"
	default:
		return ""
	}
}

// runtimeStateSeverity classifies a runtime state for transition logging.
func runtimeStateSeverity(state RuntimeState) string {
	switch state {
	case RuntimeStateRunning:
		return "ok"
	case RuntimeStateDegraded, RuntimeStateMuted:
		return "warn"
	case RuntimeStateFaulted:
		return "err"
	default:
		return "info"
	}
}

// run is the producer loop: it applies any pending frequency change,
// generates one chunk of composite IQ, optionally upsamples it, and pushes
// a clone onto the frame queue. It also launches writerLoop and exits when
// ctx is cancelled.
func (e *Engine) run(ctx context.Context) {
	e.setRuntimeState(RuntimeStatePrebuffering)
	e.wg.Add(1)
	go e.writerLoop(ctx)
	defer e.wg.Done()
	for {
		if ctx.Err() != nil {
			return
		}
		// Apply pending frequency change between chunks
		if pf := e.pendingFreq.Swap(nil); pf != nil {
			if err := e.driver.Tune(ctx, *pf); err != nil {
				e.lastError.Store(fmt.Sprintf("tune: %v", err))
			} else {
				e.appliedFreqHz.Store(math.Float64bits(*pf))
				log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
			}
		}
		t0 := time.Now()
		frame := e.generator.GenerateFrame(e.chunkDuration)
		frame.GeneratedAt = t0
		t1 := time.Now()
		if e.upsampler != nil {
			frame = e.upsampler.Process(frame)
			// Process returns a new frame; re-stamp with generation start.
			frame.GeneratedAt = t0
		}
		t2 := time.Now()
		genDur := t1.Sub(t0)
		upDur := t2.Sub(t1)
		updateMaxDuration(&e.maxGenerateNs, genDur)
		updateMaxDuration(&e.maxUpsampleNs, upDur)
		// cloneFrame never returns nil when src is non-nil (NEW-3: dead nil check removed)
		enqueued := cloneFrame(frame)
		enqueued.EnqueuedAt = time.Now()
		if err := e.frameQueue.Push(ctx, enqueued); err != nil {
			if ctx.Err() != nil {
				return
			}
			if errors.Is(err, output.ErrFrameQueueClosed) {
				return
			}
			// Push failed for another reason: record, back off one chunk
			// duration, then retry.
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			select {
			case <-time.After(e.chunkDuration):
			case <-ctx.Done():
				return
			}
			continue
		}
		queueStats := e.frameQueue.Stats()
		e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
	}
}

// writerLoop is the consumer loop: it pops frames from the queue, writes
// them to the driver, tracks latency maxima and the late-write streak, and
// feeds the runtime health state machine.
func (e *Engine) writerLoop(ctx context.Context) {
	defer e.wg.Done()
	for {
		frame, err := e.frameQueue.Pop(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			if errors.Is(err, output.ErrFrameQueueClosed) {
				return
			}
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			continue
		}
		frame.DequeuedAt = time.Now()
		queueResidence := time.Duration(0)
		if !frame.EnqueuedAt.IsZero() {
			queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt)
		}
		writeStart := time.Now()
		frame.WriteStartedAt = writeStart
		n, err := e.driver.Write(ctx, frame)
		writeDur := time.Since(writeStart)
		// Pipeline latency = generation start → write completion; falls
		// back to writeDur if the frame has no generation stamp.
		pipelineLatency := writeDur
		if !frame.GeneratedAt.IsZero() {
			pipelineLatency = time.Since(frame.GeneratedAt)
		}
		updateMaxDuration(&e.maxWriteNs, writeDur)
		updateMaxDuration(&e.maxQueueResidenceNs, queueResidence)
		updateMaxDuration(&e.maxPipelineNs, pipelineLatency)
		updateMaxDuration(&e.maxCycleNs, writeDur)
		queueStats := e.frameQueue.Stats()
		e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
		lateOver := writeDur - e.chunkDuration
		if lateOver > writeLateTolerance {
			streak := e.lateBufferStreak.Add(1)
			late := e.lateBuffers.Add(1)
			// Only arm the alert window once the streak threshold is reached.
			// Isolated OS-scheduling or USB jitter spikes (single late writes)
			// are normal on a loaded system and must not trigger degraded state.
			// This mirrors the queue-health streak logic.
			if streak >= lateBufferStreakThreshold {
				e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
			}
			// Log the first few late writes, then every 20th, to avoid spam.
			if late <= 5 || late%20 == 0 {
				log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s", streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
			}
		} else {
			// Clean write — reset the consecutive streak so isolated spikes
			// never accumulate toward the threshold.
			e.lateBufferStreak.Store(0)
		}
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			e.recordFault(FaultReasonWriteTimeout, FaultSeverityWarn, fmt.Sprintf("driver write error: %v", err))
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			select {
			case <-time.After(e.chunkDuration):
			case <-ctx.Done():
				return
			}
			continue
		}
		e.chunksProduced.Add(1)
		e.totalSamples.Add(uint64(n))
	}
}

// cloneFrame deep-copies a frame (including its sample slice) so the queued
// copy is isolated from any reuse of the generator's buffers.
func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
	if src == nil {
		return nil
	}
	samples := make([]output.IQSample, len(src.Samples))
	copy(samples, src.Samples)
	return &output.CompositeFrame{
		Samples:        samples,
		SampleRateHz:   src.SampleRateHz,
		Timestamp:      src.Timestamp,
		GeneratedAt:    src.GeneratedAt,
		EnqueuedAt:     src.EnqueuedAt,
		DequeuedAt:     src.DequeuedAt,
		WriteStartedAt: src.WriteStartedAt,
		Sequence:       src.Sequence,
	}
}

// setRuntimeState transitions the runtime state machine, recording the
// transition and bumping the per-state counters when the state changes.
func (e *Engine) setRuntimeState(state RuntimeState) {
	// NEW-2 fix: hold stateMu so that concurrent calls from run() and
	// writerLoop() cannot both see prev != state and both record a
	// spurious duplicate transition.
	e.stateMu.Lock()
	defer e.stateMu.Unlock()
	now := time.Now()
	prev := e.currentRuntimeState()
	if prev != state {
		e.recordRuntimeTransition(prev, state, now)
		switch state {
		case RuntimeStateDegraded:
			e.degradedTransitions.Add(1)
		case RuntimeStateMuted:
			e.mutedTransitions.Add(1)
		case RuntimeStateFaulted:
			e.faultedTransitions.Add(1)
		}
		e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
	} else if e.runtimeStateEnteredAt.Load() == 0 {
		// First-ever store: stamp the entry time even without a transition.
		e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
	}
	e.runtimeState.Store(state)
}

// currentRuntimeState reads the runtime state, defaulting to idle before
// the first store.
func (e *Engine) currentRuntimeState() RuntimeState {
	if v := e.runtimeState.Load(); v != nil {
		if rs, ok := v.(RuntimeState); ok {
			return rs
		}
	}
	return RuntimeStateIdle
}

// runtimeStateDurationSeconds reports how long the current runtime state
// has been active; 0 before the first state entry.
func (e *Engine) runtimeStateDurationSeconds() float64 {
	if ts := e.runtimeStateEnteredAt.Load(); ts != 0 {
		return time.Since(time.Unix(0, int64(ts))).Seconds()
	}
	return 0
}

// hasRecentLateBuffers reports whether a late-buffer alert was armed within
// the last lateBufferIndicatorWindow.
func (e *Engine) hasRecentLateBuffers() bool {
	lateAlertAt := e.lateBufferAlertAt.Load()
	if lateAlertAt == 0 {
		return false
	}
	return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
}

// lastFaultEvent returns a defensive copy of the most recent fault, or nil.
func (e *Engine) lastFaultEvent() *FaultEvent {
	return copyFaultEvent(e.loadLastFault())
}

// LastFault exposes the most recent captured fault, if any.
func (e *Engine) LastFault() *FaultEvent { return e.lastFaultEvent() } func (e *Engine) FaultHistory() []FaultEvent { e.faultHistoryMu.Lock() defer e.faultHistoryMu.Unlock() history := make([]FaultEvent, len(e.faultHistory)) copy(history, e.faultHistory) return history } func (e *Engine) TransitionHistory() []RuntimeTransition { e.transitionHistoryMu.Lock() defer e.transitionHistoryMu.Unlock() history := make([]RuntimeTransition, len(e.transitionHistory)) copy(history, e.transitionHistory) return history } func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) { if when.IsZero() { when = time.Now() } ev := RuntimeTransition{ Time: when, From: from, To: to, Severity: runtimeStateSeverity(to), } e.transitionHistoryMu.Lock() defer e.transitionHistoryMu.Unlock() if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity { copy(e.transitionHistory, e.transitionHistory[1:]) e.transitionHistory[len(e.transitionHistory)-1] = ev return } e.transitionHistory = append(e.transitionHistory, ev) } func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) { if reason == "" { reason = FaultReasonUnknown } now := time.Now() if last := e.loadLastFault(); last != nil { if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow { return } } ev := &FaultEvent{ Time: now, Reason: reason, Severity: severity, Message: message, } e.lastFault.Store(ev) e.appendFaultHistory(ev) e.faultEvents.Add(1) } func (e *Engine) loadLastFault() *FaultEvent { if v := e.lastFault.Load(); v != nil { if ev, ok := v.(*FaultEvent); ok { return ev } } return nil } func copyFaultEvent(source *FaultEvent) *FaultEvent { if source == nil { return nil } copy := *source return © } func (e *Engine) appendFaultHistory(ev *FaultEvent) { e.faultHistoryMu.Lock() defer e.faultHistoryMu.Unlock() if len(e.faultHistory) >= faultHistoryCapacity { copy(e.faultHistory, e.faultHistory[1:]) e.faultHistory[len(e.faultHistory)-1] = *ev 
return } e.faultHistory = append(e.faultHistory, *ev) } func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) { state := e.currentRuntimeState() switch state { case RuntimeStateStopping, RuntimeStateFaulted: return case RuntimeStateMuted: if queue.Health == output.QueueHealthCritical { if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold { e.mutedFaultStreak.Store(0) e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted, fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth)) e.setRuntimeState(RuntimeStateFaulted) return } } else { e.mutedFaultStreak.Store(0) } if queue.Health == output.QueueHealthNormal && !hasLateBuffers { if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold { e.mutedRecoveryStreak.Store(0) e.mutedFaultStreak.Store(0) e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded, fmt.Sprintf("queue healthy for %d checks after mute", count)) e.setRuntimeState(RuntimeStateDegraded) } } else { e.mutedRecoveryStreak.Store(0) } return } if state == RuntimeStatePrebuffering { if queue.Depth >= 1 { e.setRuntimeState(RuntimeStateRunning) } return } critical := queue.Health == output.QueueHealthCritical if critical { count := e.criticalStreak.Add(1) if count >= queueMutedStreakThreshold { e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted, fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth)) e.setRuntimeState(RuntimeStateMuted) return } if count >= queueCriticalStreakThreshold { e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded, fmt.Sprintf("queue health critical (depth=%d)", queue.Depth)) e.setRuntimeState(RuntimeStateDegraded) return } } else { e.criticalStreak.Store(0) } if hasLateBuffers { e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn, fmt.Sprintf("late buffers detected (health=%s)", queue.Health)) e.setRuntimeState(RuntimeStateDegraded) return } 
e.setRuntimeState(RuntimeStateRunning) } // ResetFault attempts to move the engine out of the faulted state. func (e *Engine) ResetFault() error { state := e.currentRuntimeState() if state != RuntimeStateFaulted { return fmt.Errorf("engine not in faulted state (current=%s)", state) } e.criticalStreak.Store(0) e.mutedRecoveryStreak.Store(0) e.mutedFaultStreak.Store(0) e.setRuntimeState(RuntimeStateDegraded) return nil }