diff --git a/internal/app/engine.go b/internal/app/engine.go index 2ef1963..a62c11c 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -83,6 +83,7 @@ type EngineStats struct { Queue output.QueueStats `json:"queue"` RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"` RuntimeAlert string `json:"runtimeAlert,omitempty"` + LastFault *FaultEvent `json:"lastFault,omitempty"` } type RuntimeIndicator string @@ -93,8 +94,12 @@ const ( RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical" ) -const lateBufferIndicatorWindow = 5 * time.Second -const queueCriticalStreakThreshold = 3 +const ( + lateBufferIndicatorWindow = 5 * time.Second + queueCriticalStreakThreshold = 3 + faultRepeatWindow = 1 * time.Second + faultHistoryCapacity = 8 +) // Engine is the continuous TX loop. It generates composite IQ in chunks, // resamples to device rate, and pushes to hardware in a tight loop. @@ -128,6 +133,9 @@ type Engine struct { maxUpsampleNs atomic.Uint64 maxWriteNs atomic.Uint64 lastError atomic.Value // string + lastFault atomic.Value // *FaultEvent + faultHistoryMu sync.Mutex + faultHistory []FaultEvent // Live config: pending frequency change, applied between chunks pendingFreq atomic.Pointer[float64] @@ -202,6 +210,7 @@ func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine { deviceRate: deviceRate, state: EngineIdle, frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity), + faultHistory: make([]FaultEvent, 0, faultHistoryCapacity), } engine.setRuntimeState(RuntimeStateIdle) return engine @@ -377,6 +386,7 @@ func (e *Engine) Stats() EngineStats { lateBuffers := e.lateBuffers.Load() hasRecentLateBuffers := e.hasRecentLateBuffers() ri := runtimeIndicator(queue.Health, hasRecentLateBuffers) + lastFault := e.lastFaultEvent() return EngineStats{ State: string(e.currentRuntimeState()), ChunksProduced: e.chunksProduced.Load(), @@ -392,6 +402,7 @@ func (e *Engine) Stats() EngineStats { Queue: queue, RuntimeIndicator: ri, RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers), + LastFault: lastFault, } } @@ -576,6 +587,71 @@ func (e *Engine) hasRecentLateBuffers() bool { return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow } +func (e *Engine) lastFaultEvent() *FaultEvent { + return copyFaultEvent(e.loadLastFault()) +} + +// LastFault exposes the most recent captured fault, if any. +func (e *Engine) LastFault() *FaultEvent { + return e.lastFaultEvent() +} + +func (e *Engine) FaultHistory() []FaultEvent { + e.faultHistoryMu.Lock() + defer e.faultHistoryMu.Unlock() + history := make([]FaultEvent, len(e.faultHistory)) + copy(history, e.faultHistory) + return history +} + +func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) { + if reason == "" { + reason = FaultReasonUnknown + } + now := time.Now() + if last := e.loadLastFault(); last != nil { + if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow { + return + } + } + ev := &FaultEvent{ + Time: now, + Reason: reason, + Severity: severity, + Message: message, + } + e.lastFault.Store(ev) + e.appendFaultHistory(ev) +} + +func (e *Engine) loadLastFault() *FaultEvent { + if v := e.lastFault.Load(); v != nil { + if ev, ok := v.(*FaultEvent); ok { + return ev + } + } + return nil +} + +func copyFaultEvent(source *FaultEvent) *FaultEvent { + if source == nil { + return nil + } + copy := *source + return © +} + +func (e *Engine) appendFaultHistory(ev *FaultEvent) { + e.faultHistoryMu.Lock() + defer e.faultHistoryMu.Unlock() + if len(e.faultHistory) >= faultHistoryCapacity { + copy(e.faultHistory, e.faultHistory[1:]) + e.faultHistory[len(e.faultHistory)-1] = *ev + return + } + e.faultHistory = append(e.faultHistory, *ev) +} + func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) { state := e.currentRuntimeState() switch state { @@ -591,6 +667,8 @@ func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bo critical := queue.Health == output.QueueHealthCritical if critical { if e.criticalStreak.Add(1) >= queueCriticalStreakThreshold { + e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded, + fmt.Sprintf("queue health critical (depth=%d)", queue.Depth)) e.setRuntimeState(RuntimeStateDegraded) return } @@ -598,6 +676,8 @@ func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bo e.criticalStreak.Store(0) } if hasLateBuffers { + e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn, + fmt.Sprintf("late buffers detected (health=%s)", queue.Health)) e.setRuntimeState(RuntimeStateDegraded) return } diff --git a/internal/app/fault.go b/internal/app/fault.go new file mode 100644 index 0000000..efdecb3 --- /dev/null +++ b/internal/app/fault.go @@ -0,0 +1,47 @@ +package app + +import "time" + +type FaultSeverity int + +const ( + FaultSeverityWarn FaultSeverity = iota + FaultSeverityDegraded + FaultSeverityMuted + FaultSeverityFaulted +) + +var faultSeverityNames = []string{"warn", "degraded", "muted", "faulted"} + +func (s FaultSeverity) String() string { + if int(s) < 0 || int(s) >= len(faultSeverityNames) { + return "unknown" + } + return faultSeverityNames[s] +} + +// MarshalText implements encoding.TextMarshaler so that FaultSeverity +// renders as a human-friendly string in JSON and other text contexts. +func (s FaultSeverity) MarshalText() ([]byte, error) { + return []byte(s.String()), nil +} + +type FaultReason string + +const ( + FaultReasonUnknown FaultReason = "unknown" + FaultReasonQueueCritical FaultReason = "queueCritical" + FaultReasonLateBuffers FaultReason = "lateBuffers" + FaultReasonWriteTimeout FaultReason = "writeTimeout" + FaultReasonQueueEmpty FaultReason = "queueEmpty" +) + +// FaultEvent captures a single fault observation along with its severity and +// optional human-readable hint. Fault history and last-fault exposure rely on +// this struct so operators can reason about runtime behavior. +type FaultEvent struct { + Time time.Time `json:"time"` + Reason FaultReason `json:"reason"` + Severity FaultSeverity `json:"severity"` + Message string `json:"message,omitempty"` +} diff --git a/internal/app/fault_test.go b/internal/app/fault_test.go new file mode 100644 index 0000000..4637e25 --- /dev/null +++ b/internal/app/fault_test.go @@ -0,0 +1,72 @@ +package app + +import ( + "testing" + + cfgpkg "github.com/jan/fm-rds-tx/internal/config" + "github.com/jan/fm-rds-tx/internal/output" + "github.com/jan/fm-rds-tx/internal/platform" +) + +func TestFaultSeverityString(t *testing.T) { + cases := []struct { + severity FaultSeverity + want string + }{ + {FaultSeverityWarn, "warn"}, + {FaultSeverityDegraded, "degraded"}, + {FaultSeverityMuted, "muted"}, + {FaultSeverityFaulted, "faulted"}, + {FaultSeverity(99), "unknown"}, + } + for _, tc := range cases { + t.Run(tc.want, func(t *testing.T) { + if got := tc.severity.String(); got != tc.want { + t.Fatalf("expected %s, got %s", tc.want, got) + } + if txt, _ := tc.severity.MarshalText(); string(txt) != tc.want { + t.Fatalf("MarshalText mismatch: want %s, got %s", tc.want, txt) + } + }) + } +} + +func TestEngineRecordsQueueCriticalFault(t *testing.T) { + e := NewEngine(cfgpkg.Default(), platform.NewSimulatedDriver(nil)) + e.setRuntimeState(RuntimeStateRunning) + + queue := output.QueueStats{Depth: 3, Health: output.QueueHealthCritical} + for i := 0; i < queueCriticalStreakThreshold; i++ { + e.evaluateRuntimeState(queue, false) + } + + last := e.LastFault() + if last == nil { + t.Fatal("expected fault recorded, got nil") + } + if last.Reason != FaultReasonQueueCritical { + t.Fatalf("expected queue critical reason, got %s", last.Reason) + } + if last.Severity != FaultSeverityDegraded { + t.Fatalf("expected degraded severity, got %s", last.Severity) + } +} + +func TestEngineRecordsLateBufferFault(t *testing.T) { + e := NewEngine(cfgpkg.Default(), platform.NewSimulatedDriver(nil)) + e.setRuntimeState(RuntimeStateRunning) + + queue := output.QueueStats{Depth: 5, Health: output.QueueHealthNormal} + e.evaluateRuntimeState(queue, true) + + last := e.LastFault() + if last == nil { + t.Fatal("expected fault recorded for late buffers") + } + if last.Reason != FaultReasonLateBuffers { + t.Fatalf("expected late buffer reason, got %s", last.Reason) + } + if last.Severity != FaultSeverityWarn { + t.Fatalf("expected warn severity, got %s", last.Severity) + } +}