|
- package app
-
- import (
- "context"
- "errors"
- "fmt"
- "log"
- "math"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/jan/fm-rds-tx/internal/audio"
- cfgpkg "github.com/jan/fm-rds-tx/internal/config"
- "github.com/jan/fm-rds-tx/internal/dsp"
- offpkg "github.com/jan/fm-rds-tx/internal/offline"
- "github.com/jan/fm-rds-tx/internal/output"
- "github.com/jan/fm-rds-tx/internal/platform"
- )
-
// EngineState enumerates the coarse lifecycle phases tracked by Engine.state.
type EngineState int

// Lifecycle phases, numbered from zero in declaration order.
const (
	EngineIdle EngineState = iota
	EngineRunning
	EngineStopping
)

// String returns the lowercase label of s, or "unknown" for any value that
// is not one of the declared constants.
func (s EngineState) String() string {
	labels := [...]string{
		EngineIdle:     "idle",
		EngineRunning:  "running",
		EngineStopping: "stopping",
	}
	if s < 0 || int(s) >= len(labels) {
		return "unknown"
	}
	return labels[s]
}
-
// RuntimeState is the fine-grained operational state of the engine. It is
// stored in Engine.runtimeState and reported as EngineStats.State.
type RuntimeState string

const (
	// RuntimeStateIdle is set by NewEngine and again once Stop completes.
	RuntimeStateIdle RuntimeState = "idle"
	// RuntimeStateArming is set by Start before the run goroutine is launched.
	RuntimeStateArming RuntimeState = "arming"
	// RuntimeStatePrebuffering is set when run begins; evaluateRuntimeState
	// leaves it once the frame queue reports a depth of at least one.
	RuntimeStatePrebuffering RuntimeState = "prebuffering"
	// RuntimeStateRunning is the healthy state.
	RuntimeStateRunning RuntimeState = "running"
	// RuntimeStateDegraded is entered on a critical-queue streak or on recent
	// late buffers, and is also the state ResetFault moves to.
	RuntimeStateDegraded RuntimeState = "degraded"
	// RuntimeStateMuted is entered after a longer critical-queue streak.
	RuntimeStateMuted RuntimeState = "muted"
	// RuntimeStateFaulted is entered when the queue stays critical while
	// muted; evaluateRuntimeState does not leave it on its own.
	RuntimeStateFaulted RuntimeState = "faulted"
	// RuntimeStateStopping is set by Stop while the goroutines wind down.
	RuntimeStateStopping RuntimeState = "stopping"
)
-
// updateMaxDuration raises the value held in dst to d (in nanoseconds) when d
// exceeds it, using a compare-and-swap retry loop so concurrent callers never
// lower the stored maximum.
//
// Non-positive durations are ignored. A negative time.Duration converted to
// uint64 wraps to an enormous value, which would otherwise be stored and then
// stick as the "maximum" for the rest of the process lifetime.
func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
	if d <= 0 {
		return
	}
	v := uint64(d)
	for {
		cur := dst.Load()
		if v <= cur {
			return
		}
		if dst.CompareAndSwap(cur, v) {
			return
		}
	}
}
-
// durationMs converts a nanosecond count into fractional milliseconds.
func durationMs(ns uint64) float64 {
	const nsPerMs = float64(time.Millisecond)
	return float64(ns) / nsPerMs
}
-
// EngineStats is the JSON-serialisable snapshot assembled by Engine.Stats.
type EngineStats struct {
	// State is the current RuntimeState rendered as a string, and
	// RuntimeStateDurationSeconds is the time spent in it so far.
	State                       string  `json:"state"`
	RuntimeStateDurationSeconds float64 `json:"runtimeStateDurationSeconds"`

	// Counters maintained by the run and writer loops.
	ChunksProduced uint64 `json:"chunksProduced"`
	TotalSamples   uint64 `json:"totalSamples"`
	Underruns      uint64 `json:"underruns"`
	LateBuffers    uint64 `json:"lateBuffers,omitempty"`

	// LastError is the most recently stored error string, if any.
	LastError string `json:"lastError,omitempty"`
	// UptimeSeconds is measured from Start and is zero unless the engine
	// state is EngineRunning.
	UptimeSeconds float64 `json:"uptimeSeconds"`

	// Observed maxima, converted from nanoseconds to milliseconds.
	MaxCycleMs           float64 `json:"maxCycleMs,omitempty"`
	MaxGenerateMs        float64 `json:"maxGenerateMs,omitempty"`
	MaxUpsampleMs        float64 `json:"maxUpsampleMs,omitempty"`
	MaxWriteMs           float64 `json:"maxWriteMs,omitempty"`
	MaxQueueResidenceMs  float64 `json:"maxQueueResidenceMs,omitempty"`
	MaxPipelineLatencyMs float64 `json:"maxPipelineLatencyMs,omitempty"`

	// Queue is the frame queue's own statistics; RuntimeIndicator and
	// RuntimeAlert are derived from its health and from recent late buffers.
	Queue            output.QueueStats `json:"queue"`
	RuntimeIndicator RuntimeIndicator  `json:"runtimeIndicator"`
	RuntimeAlert     string            `json:"runtimeAlert,omitempty"`

	// AppliedFrequencyMHz is the most recently applied tuning frequency.
	AppliedFrequencyMHz float64 `json:"appliedFrequencyMHz"`
	// ActivePS and ActiveRadioText come from the RDS encoder's CurrentText
	// and stay empty when the generator has no encoder.
	ActivePS        string `json:"activePS,omitempty"`
	ActiveRadioText string `json:"activeRadioText,omitempty"`

	// Fault and transition bookkeeping.
	LastFault           *FaultEvent         `json:"lastFault,omitempty"`
	DegradedTransitions uint64              `json:"degradedTransitions"`
	MutedTransitions    uint64              `json:"mutedTransitions"`
	FaultedTransitions  uint64              `json:"faultedTransitions"`
	FaultCount          uint64              `json:"faultCount"`
	FaultHistory        []FaultEvent        `json:"faultHistory,omitempty"`
	TransitionHistory   []RuntimeTransition `json:"transitionHistory,omitempty"`
}
-
// RuntimeIndicator is the coarse health summary reported in EngineStats. It
// is computed by runtimeIndicator from queue health and recent late buffers.
type RuntimeIndicator string

const (
	// RuntimeIndicatorNormal: queue health is neither low nor critical and
	// no recent late buffers are flagged.
	RuntimeIndicatorNormal RuntimeIndicator = "normal"
	// RuntimeIndicatorDegraded: queue health is low, or late buffers were
	// flagged recently.
	RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
	// RuntimeIndicatorQueueCritical: queue health is critical.
	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
)
-
// RuntimeTransition records one change of RuntimeState, as kept in the
// engine's bounded transition history.
type RuntimeTransition struct {
	Time time.Time    `json:"time"` // when the transition was recorded
	From RuntimeState `json:"from"` // state before the change
	To   RuntimeState `json:"to"`   // state after the change
	// Severity is runtimeStateSeverity(To): "ok", "warn", "err" or "info".
	Severity string `json:"severity"`
}
-
// Thresholds and capacities used by the writer loop and the runtime state
// machine.
const (
	// How long after the late-buffer alert timestamp hasRecentLateBuffers
	// keeps reporting true.
	lateBufferIndicatorWindow = 2 * time.Second
	// A write counts as late when it exceeds chunkDuration by more than this.
	writeLateTolerance = 10 * time.Millisecond
	// Consecutive critical queue checks before moving to degraded.
	queueCriticalStreakThreshold = 3
	// Consecutive critical queue checks before moving to muted.
	queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
	// Healthy checks (normal queue, no late buffers) needed while muted
	// before moving back to degraded.
	queueMutedRecoveryThreshold = queueCriticalStreakThreshold
	// Critical queue checks while muted before moving to faulted.
	queueFaultedStreakThreshold = queueCriticalStreakThreshold
	// recordFault drops a fault whose reason and severity match the last
	// recorded one when it arrives within this window.
	faultRepeatWindow         = 1 * time.Second
	lateBufferStreakThreshold = 3 // consecutive late writes required before alerting
	// Maximum entries kept in the fault and transition histories; the oldest
	// entry is dropped when full.
	faultHistoryCapacity             = 8
	runtimeTransitionHistoryCapacity = 8
)
-
// Engine is the continuous TX loop. It generates composite IQ in chunks,
// resamples to device rate, and pushes to hardware in a tight loop.
// The hardware buffer_push call is blocking — it returns when the hardware
// has consumed the previous buffer and is ready for the next one.
// This naturally paces the loop to real-time without a ticker.
//
// Two goroutines cooperate: run produces frames into frameQueue and
// writerLoop pops them and writes them to the driver.
type Engine struct {
	cfg           cfgpkg.Config       // configuration as adjusted by NewEngine for the chosen rate mode
	driver        platform.SoapyDriver // device used for Start/Tune/Write/Flush/Stop
	generator     *offpkg.Generator   // produces composite frames of chunkDuration length
	upsampler     *dsp.FMUpsampler    // nil = same-rate, non-nil = split-rate
	chunkDuration time.Duration       // frame length requested per cycle; also the write-time budget
	deviceRate    float64             // cfg.EffectiveDeviceRate() captured at construction
	frameQueue    *output.FrameQueue  // hand-off between run (producer) and writerLoop (consumer)

	mu           sync.Mutex         // held around state, cancel and startedAt in Start/Stop/Stats
	state        EngineState        // coarse lifecycle state
	cancel       context.CancelFunc // cancels the context handed to run/writerLoop
	startedAt    time.Time          // set by Start; basis for uptime
	wg           sync.WaitGroup     // tracks the run and writerLoop goroutines
	runtimeState atomic.Value       // holds a RuntimeState
	stateMu      sync.Mutex         // guards setRuntimeState check-then-store (NEW-2 fix)

	chunksProduced      atomic.Uint64 // successful driver writes
	totalSamples        atomic.Uint64 // sum of the counts returned by driver writes
	underruns           atomic.Uint64 // queue push/pop errors and driver write errors
	lateBuffers         atomic.Uint64 // total late writes
	lateBufferAlertAt   atomic.Uint64 // UnixNano of the last late write at or above the streak threshold
	lateBufferStreak    atomic.Uint64 // consecutive late writes; reset on clean write
	criticalStreak      atomic.Uint64 // consecutive critical queue checks outside muted
	mutedRecoveryStreak atomic.Uint64 // consecutive healthy checks while muted
	mutedFaultStreak    atomic.Uint64 // consecutive critical checks while muted
	maxCycleNs          atomic.Uint64 // observed maxima, in nanoseconds
	maxGenerateNs       atomic.Uint64
	maxUpsampleNs       atomic.Uint64
	maxWriteNs          atomic.Uint64
	maxQueueResidenceNs atomic.Uint64
	maxPipelineNs       atomic.Uint64
	lastError           atomic.Value // string
	lastFault           atomic.Value // *FaultEvent
	faultHistoryMu      sync.Mutex   // guards faultHistory
	faultHistory        []FaultEvent // bounded at faultHistoryCapacity, oldest first
	transitionHistoryMu sync.Mutex   // guards transitionHistory
	transitionHistory   []RuntimeTransition // bounded at runtimeTransitionHistoryCapacity, oldest first

	degradedTransitions   atomic.Uint64 // number of transitions into each state
	mutedTransitions      atomic.Uint64
	faultedTransitions    atomic.Uint64
	faultEvents           atomic.Uint64 // faults recorded (after repeat suppression)
	runtimeStateEnteredAt atomic.Uint64 // UnixNano at which the current runtime state was entered

	// Live config: pending frequency change, applied between chunks
	pendingFreq atomic.Pointer[float64]
	// Most recently tuned frequency (Hz), stored as math.Float64bits
	appliedFreqHz atomic.Uint64

	// Live audio stream (optional)
	streamSrc *audio.StreamSource
}
-
- // SetStreamSource configures a live audio stream as the audio source.
- // Must be called before Start(). The StreamResampler is created internally
- // to convert from the stream's sample rate to the DSP composite rate.
- func (e *Engine) SetStreamSource(src *audio.StreamSource) {
- e.streamSrc = src
- compositeRate := float64(e.cfg.FM.CompositeRateHz)
- if compositeRate <= 0 {
- compositeRate = 228000
- }
- resampler := audio.NewStreamResampler(src, compositeRate)
- e.generator.SetExternalSource(resampler)
- log.Printf("engine: live audio stream wired — initial %d Hz → %.0f Hz composite (buffer %d frames); actual decoded rate auto-corrects on first chunk",
- src.SampleRate, compositeRate, src.Stats().Capacity)
- }
-
- // StreamSource returns the live audio stream source, or nil.
- // Used by the control server for stats and HTTP audio ingest.
- func (e *Engine) StreamSource() *audio.StreamSource {
- return e.streamSrc
- }
-
- func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
- deviceRate := cfg.EffectiveDeviceRate()
- compositeRate := float64(cfg.FM.CompositeRateHz)
- if compositeRate <= 0 {
- compositeRate = 228000
- }
-
- var upsampler *dsp.FMUpsampler
-
- if deviceRate > compositeRate*1.001 {
- // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
- // FMUpsampler handles FM modulation + interpolation to deviceRate.
- // This halves CPU load compared to running all DSP at deviceRate.
- cfg.FM.FMModulationEnabled = false
- maxDev := cfg.FM.MaxDeviationHz
- if maxDev <= 0 {
- maxDev = 75000
- }
- // mpxGain scales the FM deviation to compensate for hardware
- // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
- if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
- maxDev *= cfg.FM.MpxGain
- }
- upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
- log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
- compositeRate, deviceRate, deviceRate/compositeRate)
- } else {
- // Same-rate: entire DSP chain runs at deviceRate.
- // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
- if deviceRate > 0 {
- cfg.FM.CompositeRateHz = int(deviceRate)
- }
- cfg.FM.FMModulationEnabled = true
- log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
- }
-
- engine := &Engine{
- cfg: cfg,
- driver: driver,
- generator: offpkg.NewGenerator(cfg),
- upsampler: upsampler,
- chunkDuration: 50 * time.Millisecond,
- deviceRate: deviceRate,
- state: EngineIdle,
- frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
- faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
- transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
- }
- initFreqHz := cfg.FM.FrequencyMHz * 1e6
- engine.appliedFreqHz.Store(math.Float64bits(initFreqHz))
- engine.setRuntimeState(RuntimeStateIdle)
- return engine
- }
-
- func (e *Engine) SetChunkDuration(d time.Duration) {
- e.chunkDuration = d
- }
-
// LiveConfigUpdate carries hot-reloadable parameters from the control API.
// nil pointers mean "no change". Validated before applying.
// The ranges noted below are the ones enforced by Engine.UpdateConfig.
type LiveConfigUpdate struct {
	FrequencyMHz   *float64 // 65-110; applied by the run loop via driver.Tune
	OutputDrive    *float64 // 0-10
	StereoEnabled  *bool
	PilotLevel     *float64 // 0-0.2
	RDSInjection   *float64 // 0-0.15
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64 // 0-2
	PS             *string  // forwarded to the RDS encoder's UpdateText
	RadioText      *string  // forwarded to the RDS encoder's UpdateText
	// Tone and gain: live-patchable without engine restart.
	ToneLeftHz    *float64
	ToneRightHz   *float64
	ToneAmplitude *float64 // 0-1
	AudioGain     *float64 // 0-4
}
-
- // UpdateConfig applies live parameter changes without restarting the engine.
- // DSP params take effect at the next chunk boundary (~50ms max).
- // Frequency changes are applied between chunks via driver.Tune().
- // RDS text updates are applied at the next RDS group boundary (~88ms).
- func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
- // --- Validate ---
- if u.FrequencyMHz != nil {
- if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
- return fmt.Errorf("frequencyMHz out of range (65-110)")
- }
- }
- if u.OutputDrive != nil {
- if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
- return fmt.Errorf("outputDrive out of range (0-10)")
- }
- }
- if u.PilotLevel != nil {
- if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
- return fmt.Errorf("pilotLevel out of range (0-0.2)")
- }
- }
- if u.RDSInjection != nil {
- if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
- return fmt.Errorf("rdsInjection out of range (0-0.15)")
- }
- }
- if u.LimiterCeiling != nil {
- if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
- return fmt.Errorf("limiterCeiling out of range (0-2)")
- }
- }
- if u.ToneAmplitude != nil {
- if *u.ToneAmplitude < 0 || *u.ToneAmplitude > 1 {
- return fmt.Errorf("toneAmplitude out of range (0-1)")
- }
- }
- if u.AudioGain != nil {
- if *u.AudioGain < 0 || *u.AudioGain > 4 {
- return fmt.Errorf("audioGain out of range (0-4)")
- }
- }
-
- // --- Frequency: store for run loop to apply via driver.Tune() ---
- if u.FrequencyMHz != nil {
- freqHz := *u.FrequencyMHz * 1e6
- e.pendingFreq.Store(&freqHz)
- }
-
- // --- RDS text: forward to encoder atomics ---
- if u.PS != nil || u.RadioText != nil {
- if enc := e.generator.RDSEncoder(); enc != nil {
- ps, rt := "", ""
- if u.PS != nil {
- ps = *u.PS
- }
- if u.RadioText != nil {
- rt = *u.RadioText
- }
- enc.UpdateText(ps, rt)
- }
- }
-
- // --- DSP params: build new LiveParams from current + patch ---
- // Read current, apply deltas, store new
- current := e.generator.CurrentLiveParams()
- next := current // copy
-
- if u.OutputDrive != nil {
- next.OutputDrive = *u.OutputDrive
- }
- if u.StereoEnabled != nil {
- next.StereoEnabled = *u.StereoEnabled
- }
- if u.PilotLevel != nil {
- next.PilotLevel = *u.PilotLevel
- }
- if u.RDSInjection != nil {
- next.RDSInjection = *u.RDSInjection
- }
- if u.RDSEnabled != nil {
- next.RDSEnabled = *u.RDSEnabled
- }
- if u.LimiterEnabled != nil {
- next.LimiterEnabled = *u.LimiterEnabled
- }
- if u.LimiterCeiling != nil {
- next.LimiterCeiling = *u.LimiterCeiling
- }
- if u.ToneLeftHz != nil {
- next.ToneLeftHz = *u.ToneLeftHz
- }
- if u.ToneRightHz != nil {
- next.ToneRightHz = *u.ToneRightHz
- }
- if u.ToneAmplitude != nil {
- next.ToneAmplitude = *u.ToneAmplitude
- }
- if u.AudioGain != nil {
- next.AudioGain = *u.AudioGain
- }
-
- e.generator.UpdateLive(next)
- return nil
- }
-
- func (e *Engine) Start(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineIdle {
- e.mu.Unlock()
- return fmt.Errorf("engine already in state %s", e.state)
- }
-
- if err := e.driver.Start(ctx); err != nil {
- e.mu.Unlock()
- return fmt.Errorf("driver start: %w", err)
- }
-
- runCtx, cancel := context.WithCancel(ctx)
- e.cancel = cancel
- e.state = EngineRunning
- e.setRuntimeState(RuntimeStateArming)
- e.startedAt = time.Now()
- e.wg.Add(1)
- e.mu.Unlock()
-
- go e.run(runCtx)
- return nil
- }
-
- func (e *Engine) Stop(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineRunning {
- e.mu.Unlock()
- return nil
- }
- e.state = EngineStopping
- e.setRuntimeState(RuntimeStateStopping)
- e.cancel()
- e.mu.Unlock()
-
- // Wait for run() goroutine to exit — deterministic, no guessing
- e.wg.Wait()
-
- if err := e.driver.Flush(ctx); err != nil {
- return err
- }
- if err := e.driver.Stop(ctx); err != nil {
- return err
- }
-
- e.mu.Lock()
- e.state = EngineIdle
- e.setRuntimeState(RuntimeStateIdle)
- e.mu.Unlock()
- return nil
- }
-
- func (e *Engine) Stats() EngineStats {
- e.mu.Lock()
- state := e.state
- startedAt := e.startedAt
- e.mu.Unlock()
-
- var uptime float64
- if state == EngineRunning {
- uptime = time.Since(startedAt).Seconds()
- }
- errVal, _ := e.lastError.Load().(string)
-
- queue := e.frameQueue.Stats()
- lateBuffers := e.lateBuffers.Load()
- hasRecentLateBuffers := e.hasRecentLateBuffers()
- ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
- lastFault := e.lastFaultEvent()
- activePS, activeRT := "", ""
- if enc := e.generator.RDSEncoder(); enc != nil {
- activePS, activeRT = enc.CurrentText()
- }
- return EngineStats{
- State: string(e.currentRuntimeState()),
- RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
- ChunksProduced: e.chunksProduced.Load(),
- TotalSamples: e.totalSamples.Load(),
- Underruns: e.underruns.Load(),
- LateBuffers: lateBuffers,
- LastError: errVal,
- UptimeSeconds: uptime,
- MaxCycleMs: durationMs(e.maxCycleNs.Load()),
- MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
- MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
- MaxWriteMs: durationMs(e.maxWriteNs.Load()),
- MaxQueueResidenceMs: durationMs(e.maxQueueResidenceNs.Load()),
- MaxPipelineLatencyMs: durationMs(e.maxPipelineNs.Load()),
- Queue: queue,
- RuntimeIndicator: ri,
- RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
- AppliedFrequencyMHz: e.appliedFrequencyMHz(),
- ActivePS: activePS,
- ActiveRadioText: activeRT,
- LastFault: lastFault,
- DegradedTransitions: e.degradedTransitions.Load(),
- MutedTransitions: e.mutedTransitions.Load(),
- FaultedTransitions: e.faultedTransitions.Load(),
- FaultCount: e.faultEvents.Load(),
- FaultHistory: e.FaultHistory(),
- TransitionHistory: e.TransitionHistory(),
- }
- }
-
- func (e *Engine) appliedFrequencyMHz() float64 {
- bits := e.appliedFreqHz.Load()
- return math.Float64frombits(bits) / 1e6
- }
-
- func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
- switch {
- case queueHealth == output.QueueHealthCritical:
- return RuntimeIndicatorQueueCritical
- case queueHealth == output.QueueHealthLow || recentLateBuffers:
- return RuntimeIndicatorDegraded
- default:
- return RuntimeIndicatorNormal
- }
- }
-
- func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
- switch {
- case queueHealth == output.QueueHealthCritical:
- return "queue health critical"
- case recentLateBuffers:
- return "late buffers"
- case queueHealth == output.QueueHealthLow:
- return "queue health low"
- default:
- return ""
- }
- }
-
- func runtimeStateSeverity(state RuntimeState) string {
- switch state {
- case RuntimeStateRunning:
- return "ok"
- case RuntimeStateDegraded, RuntimeStateMuted:
- return "warn"
- case RuntimeStateFaulted:
- return "err"
- default:
- return "info"
- }
- }
-
// run is the producer goroutine launched by Start. It enters the
// prebuffering state, starts writerLoop, and then loops until ctx is
// cancelled or the frame queue reports that it is closed: apply any pending
// retune, generate one chunk, optionally upsample it, and push a private
// copy onto the frame queue.
func (e *Engine) run(ctx context.Context) {
	e.setRuntimeState(RuntimeStatePrebuffering)
	// The writer is registered on the same WaitGroup that Stop waits on.
	e.wg.Add(1)
	go e.writerLoop(ctx)
	defer e.wg.Done()

	for {
		if ctx.Err() != nil {
			return
		}

		// Apply pending frequency change between chunks
		// Swap(nil) consumes the request so each update is tuned once.
		if pf := e.pendingFreq.Swap(nil); pf != nil {
			if err := e.driver.Tune(ctx, *pf); err != nil {
				e.lastError.Store(fmt.Sprintf("tune: %v", err))
			} else {
				e.appliedFreqHz.Store(math.Float64bits(*pf))
				log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
			}
		}

		// t0..t1 brackets generation, t1..t2 brackets upsampling.
		t0 := time.Now()
		frame := e.generator.GenerateFrame(e.chunkDuration)
		frame.GeneratedAt = t0
		t1 := time.Now()
		if e.upsampler != nil {
			frame = e.upsampler.Process(frame)
			// Process returns a (possibly different) frame, so re-stamp it.
			frame.GeneratedAt = t0
		}
		t2 := time.Now()

		genDur := t1.Sub(t0)
		upDur := t2.Sub(t1)
		updateMaxDuration(&e.maxGenerateNs, genDur)
		updateMaxDuration(&e.maxUpsampleNs, upDur)

		// cloneFrame never returns nil when src is non-nil (NEW-3: dead nil check removed)
		// The queue receives a copy with its own sample slice.
		enqueued := cloneFrame(frame)
		enqueued.EnqueuedAt = time.Now()

		if err := e.frameQueue.Push(ctx, enqueued); err != nil {
			if ctx.Err() != nil {
				return
			}
			if errors.Is(err, output.ErrFrameQueueClosed) {
				return
			}
			// Any other push failure is counted as an underrun; back off for
			// one chunk (or until cancellation) before retrying.
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			select {
			case <-time.After(e.chunkDuration):
			case <-ctx.Done():
				return
			}
			continue
		}
		// Re-evaluate the runtime state after every successful push.
		queueStats := e.frameQueue.Stats()
		e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
	}
}
-
// writerLoop is the consumer goroutine started by run. It pops frames from
// the frame queue, writes them to the driver, records timing maxima, tracks
// late writes and re-evaluates the runtime state after every write. It exits
// when ctx is cancelled or the queue reports that it is closed.
func (e *Engine) writerLoop(ctx context.Context) {
	defer e.wg.Done()
	for {
		frame, err := e.frameQueue.Pop(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			if errors.Is(err, output.ErrFrameQueueClosed) {
				return
			}
			// Other pop errors are counted as underruns and retried
			// immediately (no back-off here).
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			continue
		}

		frame.DequeuedAt = time.Now()
		// Time the frame spent waiting in the queue; zero when the frame
		// carries no enqueue timestamp.
		queueResidence := time.Duration(0)
		if !frame.EnqueuedAt.IsZero() {
			queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt)
		}

		writeStart := time.Now()
		frame.WriteStartedAt = writeStart
		n, err := e.driver.Write(ctx, frame)
		writeDur := time.Since(writeStart)

		// Generation-to-write-completion latency; falls back to the write
		// duration when the frame has no generation timestamp.
		pipelineLatency := writeDur
		if !frame.GeneratedAt.IsZero() {
			pipelineLatency = time.Since(frame.GeneratedAt)
		}

		updateMaxDuration(&e.maxWriteNs, writeDur)
		updateMaxDuration(&e.maxQueueResidenceNs, queueResidence)
		updateMaxDuration(&e.maxPipelineNs, pipelineLatency)
		// NOTE(review): maxCycleNs is fed the write duration, so it tracks
		// the same quantity as maxWriteNs — confirm this is intended.
		updateMaxDuration(&e.maxCycleNs, writeDur)
		queueStats := e.frameQueue.Stats()
		e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())

		// A write is late when it overruns the chunk budget by more than
		// writeLateTolerance. This runs before the write error is examined,
		// so failed writes are included.
		lateOver := writeDur - e.chunkDuration
		if lateOver > writeLateTolerance {
			streak := e.lateBufferStreak.Add(1)
			late := e.lateBuffers.Add(1)
			// Only arm the alert window once the streak threshold is reached.
			// Isolated OS-scheduling or USB jitter spikes (single late writes)
			// are normal on a loaded system and must not trigger degraded state.
			// This mirrors the queue-health streak logic.
			if streak >= lateBufferStreakThreshold {
				e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
			}
			// Log the first five late writes, then every twentieth.
			if late <= 5 || late%20 == 0 {
				log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s",
					streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
			}
		} else {
			// Clean write — reset the consecutive streak so isolated spikes
			// never accumulate toward the threshold.
			e.lateBufferStreak.Store(0)
		}

		if err != nil {
			if ctx.Err() != nil {
				return
			}
			// Record the failure, count an underrun, and back off for one
			// chunk (or until cancellation) before popping the next frame.
			e.recordFault(FaultReasonWriteTimeout, FaultSeverityWarn, fmt.Sprintf("driver write error: %v", err))
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			select {
			case <-time.After(e.chunkDuration):
			case <-ctx.Done():
				return
			}
			continue
		}

		// Only successful writes advance the production counters.
		e.chunksProduced.Add(1)
		e.totalSamples.Add(uint64(n))
	}
}
-
- func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
- if src == nil {
- return nil
- }
- samples := make([]output.IQSample, len(src.Samples))
- copy(samples, src.Samples)
- return &output.CompositeFrame{
- Samples: samples,
- SampleRateHz: src.SampleRateHz,
- Timestamp: src.Timestamp,
- GeneratedAt: src.GeneratedAt,
- EnqueuedAt: src.EnqueuedAt,
- DequeuedAt: src.DequeuedAt,
- WriteStartedAt: src.WriteStartedAt,
- Sequence: src.Sequence,
- }
- }
-
- func (e *Engine) setRuntimeState(state RuntimeState) {
- // NEW-2 fix: hold stateMu so that concurrent calls from run() and
- // writerLoop() cannot both see prev != state and both record a
- // spurious duplicate transition.
- e.stateMu.Lock()
- defer e.stateMu.Unlock()
- now := time.Now()
- prev := e.currentRuntimeState()
- if prev != state {
- e.recordRuntimeTransition(prev, state, now)
- switch state {
- case RuntimeStateDegraded:
- e.degradedTransitions.Add(1)
- case RuntimeStateMuted:
- e.mutedTransitions.Add(1)
- case RuntimeStateFaulted:
- e.faultedTransitions.Add(1)
- }
- e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
- } else if e.runtimeStateEnteredAt.Load() == 0 {
- e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
- }
- e.runtimeState.Store(state)
- }
-
- func (e *Engine) currentRuntimeState() RuntimeState {
- if v := e.runtimeState.Load(); v != nil {
- if rs, ok := v.(RuntimeState); ok {
- return rs
- }
- }
- return RuntimeStateIdle
- }
-
- func (e *Engine) runtimeStateDurationSeconds() float64 {
- if ts := e.runtimeStateEnteredAt.Load(); ts != 0 {
- return time.Since(time.Unix(0, int64(ts))).Seconds()
- }
- return 0
- }
-
- func (e *Engine) hasRecentLateBuffers() bool {
- lateAlertAt := e.lateBufferAlertAt.Load()
- if lateAlertAt == 0 {
- return false
- }
- return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
- }
-
- func (e *Engine) lastFaultEvent() *FaultEvent {
- return copyFaultEvent(e.loadLastFault())
- }
-
- // LastFault exposes the most recent captured fault, if any.
- func (e *Engine) LastFault() *FaultEvent {
- return e.lastFaultEvent()
- }
-
- func (e *Engine) FaultHistory() []FaultEvent {
- e.faultHistoryMu.Lock()
- defer e.faultHistoryMu.Unlock()
- history := make([]FaultEvent, len(e.faultHistory))
- copy(history, e.faultHistory)
- return history
- }
-
- func (e *Engine) TransitionHistory() []RuntimeTransition {
- e.transitionHistoryMu.Lock()
- defer e.transitionHistoryMu.Unlock()
- history := make([]RuntimeTransition, len(e.transitionHistory))
- copy(history, e.transitionHistory)
- return history
- }
-
- func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) {
- if when.IsZero() {
- when = time.Now()
- }
- ev := RuntimeTransition{
- Time: when,
- From: from,
- To: to,
- Severity: runtimeStateSeverity(to),
- }
- e.transitionHistoryMu.Lock()
- defer e.transitionHistoryMu.Unlock()
- if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity {
- copy(e.transitionHistory, e.transitionHistory[1:])
- e.transitionHistory[len(e.transitionHistory)-1] = ev
- return
- }
- e.transitionHistory = append(e.transitionHistory, ev)
- }
-
- func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
- if reason == "" {
- reason = FaultReasonUnknown
- }
- now := time.Now()
- if last := e.loadLastFault(); last != nil {
- if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
- return
- }
- }
- ev := &FaultEvent{
- Time: now,
- Reason: reason,
- Severity: severity,
- Message: message,
- }
- e.lastFault.Store(ev)
- e.appendFaultHistory(ev)
- e.faultEvents.Add(1)
- }
-
- func (e *Engine) loadLastFault() *FaultEvent {
- if v := e.lastFault.Load(); v != nil {
- if ev, ok := v.(*FaultEvent); ok {
- return ev
- }
- }
- return nil
- }
-
- func copyFaultEvent(source *FaultEvent) *FaultEvent {
- if source == nil {
- return nil
- }
- copy := *source
- return ©
- }
-
- func (e *Engine) appendFaultHistory(ev *FaultEvent) {
- e.faultHistoryMu.Lock()
- defer e.faultHistoryMu.Unlock()
- if len(e.faultHistory) >= faultHistoryCapacity {
- copy(e.faultHistory, e.faultHistory[1:])
- e.faultHistory[len(e.faultHistory)-1] = *ev
- return
- }
- e.faultHistory = append(e.faultHistory, *ev)
- }
-
// evaluateRuntimeState advances the runtime state machine from one
// observation of the frame queue (queue) and the recent-late-buffer flag
// (hasLateBuffers). It is called by both run (after each push) and
// writerLoop (after each write), so the streak counters below count checks
// from both goroutines.
//
// Summary of the transitions implemented here:
//   - stopping, faulted: left untouched.
//   - muted: a critical-queue streak of queueFaultedStreakThreshold moves to
//     faulted; a streak of queueMutedRecoveryThreshold healthy checks moves
//     to degraded.
//   - prebuffering: moves to running once the queue depth is at least 1.
//   - otherwise: a critical-queue streak moves to muted (longer) or degraded
//     (shorter); recent late buffers move to degraded; else running.
func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
	state := e.currentRuntimeState()
	switch state {
	case RuntimeStateStopping, RuntimeStateFaulted:
		// Only Stop and ResetFault leave these states.
		return
	case RuntimeStateMuted:
		// Escalation: the queue stays critical while muted.
		if queue.Health == output.QueueHealthCritical {
			if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
				e.mutedFaultStreak.Store(0)
				e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
					fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
				e.setRuntimeState(RuntimeStateFaulted)
				return
			}
		} else {
			e.mutedFaultStreak.Store(0)
		}
		// Recovery: requires consecutive checks with a normal queue and no
		// recent late buffers; any other check resets the streak.
		if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
			if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
				e.mutedRecoveryStreak.Store(0)
				e.mutedFaultStreak.Store(0)
				e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
					fmt.Sprintf("queue healthy for %d checks after mute", count))
				e.setRuntimeState(RuntimeStateDegraded)
			}
		} else {
			e.mutedRecoveryStreak.Store(0)
		}
		return
	}
	if state == RuntimeStatePrebuffering {
		// Stay in prebuffering until the queue holds at least one frame.
		if queue.Depth >= 1 {
			e.setRuntimeState(RuntimeStateRunning)
		}
		return
	}
	critical := queue.Health == output.QueueHealthCritical
	if critical {
		count := e.criticalStreak.Add(1)
		// The longer (muted) threshold is checked first so it takes
		// precedence over the shorter (degraded) one.
		if count >= queueMutedStreakThreshold {
			e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
				fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
			e.setRuntimeState(RuntimeStateMuted)
			return
		}
		if count >= queueCriticalStreakThreshold {
			e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
				fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
			e.setRuntimeState(RuntimeStateDegraded)
			return
		}
		// Below both thresholds: fall through to the late-buffer check.
	} else {
		e.criticalStreak.Store(0)
	}
	if hasLateBuffers {
		e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
			fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
		e.setRuntimeState(RuntimeStateDegraded)
		return
	}
	e.setRuntimeState(RuntimeStateRunning)
}
-
- // ResetFault attempts to move the engine out of the faulted state.
- func (e *Engine) ResetFault() error {
- state := e.currentRuntimeState()
- if state != RuntimeStateFaulted {
- return fmt.Errorf("engine not in faulted state (current=%s)", state)
- }
-
- e.criticalStreak.Store(0)
- e.mutedRecoveryStreak.Store(0)
- e.mutedFaultStreak.Store(0)
- e.setRuntimeState(RuntimeStateDegraded)
- return nil
- }
|