Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

761 lines
21KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  // EngineState is the coarse lifecycle state of an Engine (Engine.state),
// read and written under Engine.mu by Start, Stop and Stats.
type EngineState int

const (
	// EngineIdle: no TX loop is running; Start is allowed.
	EngineIdle EngineState = iota
	// EngineRunning: Start succeeded and the TX goroutines were launched.
	EngineRunning
	// EngineStopping: Stop has cancelled the loop and is waiting for it to exit.
	EngineStopping
)

// String implements fmt.Stringer. Values outside the declared set render
// as "unknown".
func (s EngineState) String() string {
	name := "unknown"
	switch s {
	case EngineIdle:
		name = "idle"
	case EngineRunning:
		name = "running"
	case EngineStopping:
		name = "stopping"
	}
	return name
}
  // RuntimeState is the fine-grained operating state reported as
// EngineStats.State. It is stored atomically through setRuntimeState.
type RuntimeState string

// The runtime states. Transitions into degraded, muted and faulted are
// counted separately in EngineStats.
const (
	RuntimeStateIdle         RuntimeState = "idle"
	RuntimeStateArming       RuntimeState = "arming"
	RuntimeStatePrebuffering RuntimeState = "prebuffering"
	RuntimeStateRunning      RuntimeState = "running"
	RuntimeStateDegraded     RuntimeState = "degraded"
	RuntimeStateMuted        RuntimeState = "muted"
	RuntimeStateFaulted      RuntimeState = "faulted"
	RuntimeStateStopping     RuntimeState = "stopping"
)
  // updateMaxDuration raises the nanosecond high-water mark in dst to d when d
// is larger than the value currently stored. It is safe for concurrent use:
// the compare-and-swap loop retries until either the stored maximum is
// already >= d or this call installs d.
//
// Negative durations are ignored. Converting them to uint64 would wrap to a
// huge value and permanently poison the recorded maximum.
func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
	if d < 0 {
		return
	}
	v := uint64(d)
	for {
		cur := dst.Load()
		if v <= cur || dst.CompareAndSwap(cur, v) {
			return
		}
	}
}
  // durationMs converts a nanosecond count, as kept in the max*Ns atomics, to
// fractional milliseconds for reporting.
func durationMs(ns uint64) float64 {
	const nsPerMs = float64(time.Millisecond)
	return float64(ns) / nsPerMs
}
  61. type EngineStats struct {
  62. State string `json:"state"`
  63. ChunksProduced uint64 `json:"chunksProduced"`
  64. TotalSamples uint64 `json:"totalSamples"`
  65. Underruns uint64 `json:"underruns"`
  66. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  67. LastError string `json:"lastError,omitempty"`
  68. UptimeSeconds float64 `json:"uptimeSeconds"`
  69. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  70. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  71. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  72. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  73. Queue output.QueueStats `json:"queue"`
  74. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  75. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  76. LastFault *FaultEvent `json:"lastFault,omitempty"`
  77. DegradedTransitions uint64 `json:"degradedTransitions"`
  78. MutedTransitions uint64 `json:"mutedTransitions"`
  79. FaultedTransitions uint64 `json:"faultedTransitions"`
  80. FaultCount uint64 `json:"faultCount"`
  81. }
  // RuntimeIndicator is a coarse health summary exposed in EngineStats. It is
// derived by runtimeIndicator from queue health and recent late buffers.
type RuntimeIndicator string

const (
	// RuntimeIndicatorNormal: queue healthy and no recent late buffers.
	RuntimeIndicatorNormal RuntimeIndicator = "normal"
	// RuntimeIndicatorDegraded: queue health low or late buffers seen recently.
	RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
	// RuntimeIndicatorQueueCritical: queue health is critical.
	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
)
  // Tuning constants for the runtime state machine (evaluateRuntimeState) and
// fault bookkeeping. Streak thresholds count consecutive evaluations.
const (
	// queueCriticalStreakThreshold: consecutive critical-queue checks before
	// the engine is marked Degraded.
	queueCriticalStreakThreshold = 3
	// queueMutedStreakThreshold: consecutive critical-queue checks before Muted.
	queueMutedStreakThreshold = 2 * queueCriticalStreakThreshold
	// queueMutedRecoveryThreshold: consecutive healthy checks while Muted
	// before recovering to Degraded.
	queueMutedRecoveryThreshold = queueCriticalStreakThreshold
	// queueFaultedStreakThreshold: consecutive critical checks while Muted
	// before escalating to Faulted.
	queueFaultedStreakThreshold = queueCriticalStreakThreshold

	// lateBufferIndicatorWindow: how long after the most recent late buffer
	// hasRecentLateBuffers keeps reporting true.
	lateBufferIndicatorWindow = 5 * time.Second
	// faultRepeatWindow: a fault with the same reason and severity as the
	// last one is dropped if it arrives within this window.
	faultRepeatWindow = time.Second
	// faultHistoryCapacity: maximum number of events kept in FaultHistory.
	faultHistoryCapacity = 8
)
  97. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  98. // resamples to device rate, and pushes to hardware in a tight loop.
  99. // The hardware buffer_push call is blocking — it returns when the hardware
  100. // has consumed the previous buffer and is ready for the next one.
  101. // This naturally paces the loop to real-time without a ticker.
  102. type Engine struct {
  103. cfg cfgpkg.Config
  104. driver platform.SoapyDriver
  105. generator *offpkg.Generator
  106. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  107. chunkDuration time.Duration
  108. deviceRate float64
  109. frameQueue *output.FrameQueue
  110. mu sync.Mutex
  111. state EngineState
  112. cancel context.CancelFunc
  113. startedAt time.Time
  114. wg sync.WaitGroup
  115. runtimeState atomic.Value
  116. chunksProduced atomic.Uint64
  117. totalSamples atomic.Uint64
  118. underruns atomic.Uint64
  119. lateBuffers atomic.Uint64
  120. lateBufferAlertAt atomic.Uint64
  121. criticalStreak atomic.Uint64
  122. mutedRecoveryStreak atomic.Uint64
  123. mutedFaultStreak atomic.Uint64
  124. maxCycleNs atomic.Uint64
  125. maxGenerateNs atomic.Uint64
  126. maxUpsampleNs atomic.Uint64
  127. maxWriteNs atomic.Uint64
  128. lastError atomic.Value // string
  129. lastFault atomic.Value // *FaultEvent
  130. faultHistoryMu sync.Mutex
  131. faultHistory []FaultEvent
  132. degradedTransitions atomic.Uint64
  133. mutedTransitions atomic.Uint64
  134. faultedTransitions atomic.Uint64
  135. faultEvents atomic.Uint64
  136. // Live config: pending frequency change, applied between chunks
  137. pendingFreq atomic.Pointer[float64]
  138. // Live audio stream (optional)
  139. streamSrc *audio.StreamSource
  140. }
  141. // SetStreamSource configures a live audio stream as the audio source.
  142. // Must be called before Start(). The StreamResampler is created internally
  143. // to convert from the stream's sample rate to the DSP composite rate.
  144. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  145. e.streamSrc = src
  146. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  147. if compositeRate <= 0 {
  148. compositeRate = 228000
  149. }
  150. resampler := audio.NewStreamResampler(src, compositeRate)
  151. e.generator.SetExternalSource(resampler)
  152. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  153. src.SampleRate, compositeRate, src.Stats().Capacity)
  154. }
  155. // StreamSource returns the live audio stream source, or nil.
  156. // Used by the control server for stats and HTTP audio ingest.
  157. func (e *Engine) StreamSource() *audio.StreamSource {
  158. return e.streamSrc
  159. }
  160. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  161. deviceRate := cfg.EffectiveDeviceRate()
  162. compositeRate := float64(cfg.FM.CompositeRateHz)
  163. if compositeRate <= 0 {
  164. compositeRate = 228000
  165. }
  166. var upsampler *dsp.FMUpsampler
  167. if deviceRate > compositeRate*1.001 {
  168. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  169. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  170. // This halves CPU load compared to running all DSP at deviceRate.
  171. cfg.FM.FMModulationEnabled = false
  172. maxDev := cfg.FM.MaxDeviationHz
  173. if maxDev <= 0 {
  174. maxDev = 75000
  175. }
  176. // mpxGain scales the FM deviation to compensate for hardware
  177. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  178. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  179. maxDev *= cfg.FM.MpxGain
  180. }
  181. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  182. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  183. compositeRate, deviceRate, deviceRate/compositeRate)
  184. } else {
  185. // Same-rate: entire DSP chain runs at deviceRate.
  186. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  187. if deviceRate > 0 {
  188. cfg.FM.CompositeRateHz = int(deviceRate)
  189. }
  190. cfg.FM.FMModulationEnabled = true
  191. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  192. }
  193. engine := &Engine{
  194. cfg: cfg,
  195. driver: driver,
  196. generator: offpkg.NewGenerator(cfg),
  197. upsampler: upsampler,
  198. chunkDuration: 50 * time.Millisecond,
  199. deviceRate: deviceRate,
  200. state: EngineIdle,
  201. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  202. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  203. }
  204. engine.setRuntimeState(RuntimeStateIdle)
  205. return engine
  206. }
  207. func (e *Engine) SetChunkDuration(d time.Duration) {
  208. e.chunkDuration = d
  209. }
  // LiveConfigUpdate is a patch of hot-reloadable parameters submitted through
// the control API. A nil field leaves the corresponding setting unchanged.
// UpdateConfig validates every non-nil field before applying any of them.
type LiveConfigUpdate struct {
	FrequencyMHz   *float64 // carrier frequency in MHz, 65–110
	OutputDrive    *float64 // 0–10
	StereoEnabled  *bool
	PilotLevel     *float64 // 0–0.2
	RDSInjection   *float64 // 0–0.15
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64 // 0–2
	PS             *string  // forwarded to the RDS encoder
	RadioText      *string  // forwarded to the RDS encoder
}
  224. // UpdateConfig applies live parameter changes without restarting the engine.
  225. // DSP params take effect at the next chunk boundary (~50ms max).
  226. // Frequency changes are applied between chunks via driver.Tune().
  227. // RDS text updates are applied at the next RDS group boundary (~88ms).
  228. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  229. // --- Validate ---
  230. if u.FrequencyMHz != nil {
  231. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  232. return fmt.Errorf("frequencyMHz out of range (65-110)")
  233. }
  234. }
  235. if u.OutputDrive != nil {
  236. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  237. return fmt.Errorf("outputDrive out of range (0-10)")
  238. }
  239. }
  240. if u.PilotLevel != nil {
  241. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  242. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  243. }
  244. }
  245. if u.RDSInjection != nil {
  246. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  247. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  248. }
  249. }
  250. if u.LimiterCeiling != nil {
  251. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  252. return fmt.Errorf("limiterCeiling out of range (0-2)")
  253. }
  254. }
  255. // --- Frequency: store for run loop to apply via driver.Tune() ---
  256. if u.FrequencyMHz != nil {
  257. freqHz := *u.FrequencyMHz * 1e6
  258. e.pendingFreq.Store(&freqHz)
  259. }
  260. // --- RDS text: forward to encoder atomics ---
  261. if u.PS != nil || u.RadioText != nil {
  262. if enc := e.generator.RDSEncoder(); enc != nil {
  263. ps, rt := "", ""
  264. if u.PS != nil {
  265. ps = *u.PS
  266. }
  267. if u.RadioText != nil {
  268. rt = *u.RadioText
  269. }
  270. enc.UpdateText(ps, rt)
  271. }
  272. }
  273. // --- DSP params: build new LiveParams from current + patch ---
  274. // Read current, apply deltas, store new
  275. current := e.generator.CurrentLiveParams()
  276. next := current // copy
  277. if u.OutputDrive != nil {
  278. next.OutputDrive = *u.OutputDrive
  279. }
  280. if u.StereoEnabled != nil {
  281. next.StereoEnabled = *u.StereoEnabled
  282. }
  283. if u.PilotLevel != nil {
  284. next.PilotLevel = *u.PilotLevel
  285. }
  286. if u.RDSInjection != nil {
  287. next.RDSInjection = *u.RDSInjection
  288. }
  289. if u.RDSEnabled != nil {
  290. next.RDSEnabled = *u.RDSEnabled
  291. }
  292. if u.LimiterEnabled != nil {
  293. next.LimiterEnabled = *u.LimiterEnabled
  294. }
  295. if u.LimiterCeiling != nil {
  296. next.LimiterCeiling = *u.LimiterCeiling
  297. }
  298. e.generator.UpdateLive(next)
  299. return nil
  300. }
  301. func (e *Engine) Start(ctx context.Context) error {
  302. e.mu.Lock()
  303. if e.state != EngineIdle {
  304. e.mu.Unlock()
  305. return fmt.Errorf("engine already in state %s", e.state)
  306. }
  307. if err := e.driver.Start(ctx); err != nil {
  308. e.mu.Unlock()
  309. return fmt.Errorf("driver start: %w", err)
  310. }
  311. runCtx, cancel := context.WithCancel(ctx)
  312. e.cancel = cancel
  313. e.state = EngineRunning
  314. e.setRuntimeState(RuntimeStateArming)
  315. e.startedAt = time.Now()
  316. e.wg.Add(1)
  317. e.mu.Unlock()
  318. go e.run(runCtx)
  319. return nil
  320. }
  321. func (e *Engine) Stop(ctx context.Context) error {
  322. e.mu.Lock()
  323. if e.state != EngineRunning {
  324. e.mu.Unlock()
  325. return nil
  326. }
  327. e.state = EngineStopping
  328. e.setRuntimeState(RuntimeStateStopping)
  329. e.cancel()
  330. e.mu.Unlock()
  331. // Wait for run() goroutine to exit — deterministic, no guessing
  332. e.wg.Wait()
  333. if err := e.driver.Flush(ctx); err != nil {
  334. return err
  335. }
  336. if err := e.driver.Stop(ctx); err != nil {
  337. return err
  338. }
  339. e.mu.Lock()
  340. e.state = EngineIdle
  341. e.setRuntimeState(RuntimeStateIdle)
  342. e.mu.Unlock()
  343. return nil
  344. }
  345. func (e *Engine) Stats() EngineStats {
  346. e.mu.Lock()
  347. state := e.state
  348. startedAt := e.startedAt
  349. e.mu.Unlock()
  350. var uptime float64
  351. if state == EngineRunning {
  352. uptime = time.Since(startedAt).Seconds()
  353. }
  354. errVal, _ := e.lastError.Load().(string)
  355. queue := e.frameQueue.Stats()
  356. lateBuffers := e.lateBuffers.Load()
  357. hasRecentLateBuffers := e.hasRecentLateBuffers()
  358. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  359. lastFault := e.lastFaultEvent()
  360. return EngineStats{
  361. State: string(e.currentRuntimeState()),
  362. ChunksProduced: e.chunksProduced.Load(),
  363. TotalSamples: e.totalSamples.Load(),
  364. Underruns: e.underruns.Load(),
  365. LateBuffers: lateBuffers,
  366. LastError: errVal,
  367. UptimeSeconds: uptime,
  368. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  369. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  370. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  371. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  372. Queue: queue,
  373. RuntimeIndicator: ri,
  374. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  375. LastFault: lastFault,
  376. DegradedTransitions: e.degradedTransitions.Load(),
  377. MutedTransitions: e.mutedTransitions.Load(),
  378. FaultedTransitions: e.faultedTransitions.Load(),
  379. FaultCount: e.faultEvents.Load(),
  380. }
  381. }
  382. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  383. switch {
  384. case queueHealth == output.QueueHealthCritical:
  385. return RuntimeIndicatorQueueCritical
  386. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  387. return RuntimeIndicatorDegraded
  388. default:
  389. return RuntimeIndicatorNormal
  390. }
  391. }
  392. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  393. switch {
  394. case queueHealth == output.QueueHealthCritical:
  395. return "queue health critical"
  396. case recentLateBuffers:
  397. return "late buffers"
  398. case queueHealth == output.QueueHealthLow:
  399. return "queue health low"
  400. default:
  401. return ""
  402. }
  403. }
  404. func (e *Engine) run(ctx context.Context) {
  405. e.setRuntimeState(RuntimeStatePrebuffering)
  406. e.wg.Add(1)
  407. go e.writerLoop(ctx)
  408. defer e.wg.Done()
  409. for {
  410. if ctx.Err() != nil {
  411. return
  412. }
  413. // Apply pending frequency change between chunks
  414. if pf := e.pendingFreq.Swap(nil); pf != nil {
  415. if err := e.driver.Tune(ctx, *pf); err != nil {
  416. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  417. } else {
  418. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  419. }
  420. }
  421. t0 := time.Now()
  422. frame := e.generator.GenerateFrame(e.chunkDuration)
  423. frame.GeneratedAt = t0
  424. t1 := time.Now()
  425. if e.upsampler != nil {
  426. frame = e.upsampler.Process(frame)
  427. frame.GeneratedAt = t0
  428. }
  429. t2 := time.Now()
  430. genDur := t1.Sub(t0)
  431. upDur := t2.Sub(t1)
  432. updateMaxDuration(&e.maxGenerateNs, genDur)
  433. updateMaxDuration(&e.maxUpsampleNs, upDur)
  434. enqueued := cloneFrame(frame)
  435. if enqueued == nil {
  436. e.lastError.Store("engine: frame clone failed")
  437. e.underruns.Add(1)
  438. continue
  439. }
  440. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  441. if ctx.Err() != nil {
  442. return
  443. }
  444. if errors.Is(err, output.ErrFrameQueueClosed) {
  445. return
  446. }
  447. e.lastError.Store(err.Error())
  448. e.underruns.Add(1)
  449. select {
  450. case <-time.After(e.chunkDuration):
  451. case <-ctx.Done():
  452. return
  453. }
  454. continue
  455. }
  456. queueStats := e.frameQueue.Stats()
  457. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  458. }
  459. }
  460. func (e *Engine) writerLoop(ctx context.Context) {
  461. defer e.wg.Done()
  462. for {
  463. frame, err := e.frameQueue.Pop(ctx)
  464. if err != nil {
  465. if ctx.Err() != nil {
  466. return
  467. }
  468. if errors.Is(err, output.ErrFrameQueueClosed) {
  469. return
  470. }
  471. e.lastError.Store(err.Error())
  472. e.underruns.Add(1)
  473. continue
  474. }
  475. writeStart := time.Now()
  476. n, err := e.driver.Write(ctx, frame)
  477. writeDur := time.Since(writeStart)
  478. cycleDur := writeDur
  479. if !frame.GeneratedAt.IsZero() {
  480. cycleDur = time.Since(frame.GeneratedAt)
  481. }
  482. updateMaxDuration(&e.maxWriteNs, writeDur)
  483. updateMaxDuration(&e.maxCycleNs, cycleDur)
  484. queueStats := e.frameQueue.Stats()
  485. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  486. if cycleDur > e.chunkDuration {
  487. late := e.lateBuffers.Add(1)
  488. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  489. if late <= 5 || late%20 == 0 {
  490. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  491. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  492. }
  493. }
  494. if err != nil {
  495. if ctx.Err() != nil {
  496. return
  497. }
  498. e.lastError.Store(err.Error())
  499. e.underruns.Add(1)
  500. select {
  501. case <-time.After(e.chunkDuration):
  502. case <-ctx.Done():
  503. return
  504. }
  505. continue
  506. }
  507. e.chunksProduced.Add(1)
  508. e.totalSamples.Add(uint64(n))
  509. }
  510. }
  511. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  512. if src == nil {
  513. return nil
  514. }
  515. samples := make([]output.IQSample, len(src.Samples))
  516. copy(samples, src.Samples)
  517. return &output.CompositeFrame{
  518. Samples: samples,
  519. SampleRateHz: src.SampleRateHz,
  520. Timestamp: src.Timestamp,
  521. GeneratedAt: src.GeneratedAt,
  522. Sequence: src.Sequence,
  523. }
  524. }
  525. func (e *Engine) setRuntimeState(state RuntimeState) {
  526. prev := e.currentRuntimeState()
  527. if prev != state {
  528. switch state {
  529. case RuntimeStateDegraded:
  530. e.degradedTransitions.Add(1)
  531. case RuntimeStateMuted:
  532. e.mutedTransitions.Add(1)
  533. case RuntimeStateFaulted:
  534. e.faultedTransitions.Add(1)
  535. }
  536. }
  537. e.runtimeState.Store(state)
  538. }
  539. func (e *Engine) currentRuntimeState() RuntimeState {
  540. if v := e.runtimeState.Load(); v != nil {
  541. if rs, ok := v.(RuntimeState); ok {
  542. return rs
  543. }
  544. }
  545. return RuntimeStateIdle
  546. }
  547. func (e *Engine) hasRecentLateBuffers() bool {
  548. lateAlertAt := e.lateBufferAlertAt.Load()
  549. if lateAlertAt == 0 {
  550. return false
  551. }
  552. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  553. }
  554. func (e *Engine) lastFaultEvent() *FaultEvent {
  555. return copyFaultEvent(e.loadLastFault())
  556. }
  557. // LastFault exposes the most recent captured fault, if any.
  558. func (e *Engine) LastFault() *FaultEvent {
  559. return e.lastFaultEvent()
  560. }
  561. func (e *Engine) FaultHistory() []FaultEvent {
  562. e.faultHistoryMu.Lock()
  563. defer e.faultHistoryMu.Unlock()
  564. history := make([]FaultEvent, len(e.faultHistory))
  565. copy(history, e.faultHistory)
  566. return history
  567. }
  568. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  569. if reason == "" {
  570. reason = FaultReasonUnknown
  571. }
  572. now := time.Now()
  573. if last := e.loadLastFault(); last != nil {
  574. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  575. return
  576. }
  577. }
  578. ev := &FaultEvent{
  579. Time: now,
  580. Reason: reason,
  581. Severity: severity,
  582. Message: message,
  583. }
  584. e.lastFault.Store(ev)
  585. e.appendFaultHistory(ev)
  586. e.faultEvents.Add(1)
  587. }
  588. func (e *Engine) loadLastFault() *FaultEvent {
  589. if v := e.lastFault.Load(); v != nil {
  590. if ev, ok := v.(*FaultEvent); ok {
  591. return ev
  592. }
  593. }
  594. return nil
  595. }
  596. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  597. if source == nil {
  598. return nil
  599. }
  600. copy := *source
  601. return &copy
  602. }
  603. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  604. e.faultHistoryMu.Lock()
  605. defer e.faultHistoryMu.Unlock()
  606. if len(e.faultHistory) >= faultHistoryCapacity {
  607. copy(e.faultHistory, e.faultHistory[1:])
  608. e.faultHistory[len(e.faultHistory)-1] = *ev
  609. return
  610. }
  611. e.faultHistory = append(e.faultHistory, *ev)
  612. }
  613. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  614. state := e.currentRuntimeState()
  615. switch state {
  616. case RuntimeStateStopping, RuntimeStateFaulted:
  617. return
  618. case RuntimeStateMuted:
  619. if queue.Health == output.QueueHealthCritical {
  620. if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
  621. e.mutedFaultStreak.Store(0)
  622. e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
  623. fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
  624. e.setRuntimeState(RuntimeStateFaulted)
  625. return
  626. }
  627. } else {
  628. e.mutedFaultStreak.Store(0)
  629. }
  630. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  631. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  632. e.mutedRecoveryStreak.Store(0)
  633. e.mutedFaultStreak.Store(0)
  634. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  635. fmt.Sprintf("queue healthy for %d checks after mute", count))
  636. e.setRuntimeState(RuntimeStateDegraded)
  637. }
  638. } else {
  639. e.mutedRecoveryStreak.Store(0)
  640. }
  641. return
  642. }
  643. if state == RuntimeStatePrebuffering {
  644. if queue.Depth >= 1 {
  645. e.setRuntimeState(RuntimeStateRunning)
  646. }
  647. return
  648. }
  649. critical := queue.Health == output.QueueHealthCritical
  650. if critical {
  651. count := e.criticalStreak.Add(1)
  652. if count >= queueMutedStreakThreshold {
  653. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  654. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  655. e.setRuntimeState(RuntimeStateMuted)
  656. return
  657. }
  658. if count >= queueCriticalStreakThreshold {
  659. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  660. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  661. e.setRuntimeState(RuntimeStateDegraded)
  662. return
  663. }
  664. } else {
  665. e.criticalStreak.Store(0)
  666. }
  667. if hasLateBuffers {
  668. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  669. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  670. e.setRuntimeState(RuntimeStateDegraded)
  671. return
  672. }
  673. e.setRuntimeState(RuntimeStateRunning)
  674. }
  675. // ResetFault attempts to move the engine out of the faulted state.
  676. func (e *Engine) ResetFault() error {
  677. state := e.currentRuntimeState()
  678. if state != RuntimeStateFaulted {
  679. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  680. }
  681. e.criticalStreak.Store(0)
  682. e.mutedRecoveryStreak.Store(0)
  683. e.mutedFaultStreak.Store(0)
  684. e.setRuntimeState(RuntimeStateDegraded)
  685. return nil
  686. }