Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

815 satır
22KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  17. type EngineState int
  18. const (
  19. EngineIdle EngineState = iota
  20. EngineRunning
  21. EngineStopping
  22. )
  23. func (s EngineState) String() string {
  24. switch s {
  25. case EngineIdle:
  26. return "idle"
  27. case EngineRunning:
  28. return "running"
  29. case EngineStopping:
  30. return "stopping"
  31. default:
  32. return "unknown"
  33. }
  34. }
// RuntimeState is the fine-grained health state of the TX pipeline. It is
// what Stats reports in EngineStats.State and what RuntimeTransition records.
type RuntimeState string

const (
	// RuntimeStateIdle is set by NewEngine and again once Stop has finished;
	// it is also what currentRuntimeState reports when nothing is stored.
	RuntimeStateIdle RuntimeState = "idle"
	// RuntimeStateArming is set by Start just before the run goroutine is launched.
	RuntimeStateArming RuntimeState = "arming"
	// RuntimeStatePrebuffering is set when run begins and lasts until the
	// frame queue reports a depth of at least one.
	RuntimeStatePrebuffering RuntimeState = "prebuffering"
	// RuntimeStateRunning means the last evaluation saw no critical streak
	// and no recent late buffers.
	RuntimeStateRunning RuntimeState = "running"
	// RuntimeStateDegraded is entered on a critical-queue streak, on recent
	// late buffers, after recovery from mute, and after ResetFault.
	RuntimeStateDegraded RuntimeState = "degraded"
	// RuntimeStateMuted is entered once the critical-queue streak reaches
	// queueMutedStreakThreshold.
	RuntimeStateMuted RuntimeState = "muted"
	// RuntimeStateFaulted is entered when the queue stays critical while
	// muted; evaluateRuntimeState never leaves it on its own.
	RuntimeStateFaulted RuntimeState = "faulted"
	// RuntimeStateStopping is set by Stop while the goroutines wind down.
	RuntimeStateStopping RuntimeState = "stopping"
)
  46. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  47. v := uint64(d)
  48. for {
  49. cur := dst.Load()
  50. if v <= cur {
  51. return
  52. }
  53. if dst.CompareAndSwap(cur, v) {
  54. return
  55. }
  56. }
  57. }
  58. func durationMs(ns uint64) float64 {
  59. return float64(ns) / float64(time.Millisecond)
  60. }
// EngineStats is the JSON-serialisable snapshot returned by Engine.Stats.
type EngineStats struct {
	// State is the current RuntimeState as a string.
	State string `json:"state"`
	// ChunksProduced counts frames the driver accepted without error.
	ChunksProduced uint64 `json:"chunksProduced"`
	// TotalSamples sums the sample counts returned by successful driver writes.
	TotalSamples uint64 `json:"totalSamples"`
	// Underruns counts failed clone, queue push/pop and driver write attempts.
	Underruns uint64 `json:"underruns"`
	// LateBuffers counts frames whose generate-to-written time exceeded the chunk duration.
	LateBuffers uint64 `json:"lateBuffers,omitempty"`
	// LastError is the most recent error message stored by the engine loops.
	LastError string `json:"lastError,omitempty"`
	// UptimeSeconds is the time since Start; it is zero unless the engine is running.
	UptimeSeconds float64 `json:"uptimeSeconds"`
	// MaxCycleMs is the longest observed generate-to-written time.
	MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
	// MaxGenerateMs is the longest observed frame generation time.
	MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
	// MaxUpsampleMs is the longest observed upsampling time.
	MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
	// MaxWriteMs is the longest observed driver write time.
	MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
	// Queue is the frame queue's own statistics snapshot.
	Queue output.QueueStats `json:"queue"`
	// RuntimeIndicator summarises queue health and recent late buffers.
	RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
	// RuntimeAlert is a short human-readable reason, empty when nothing is wrong.
	RuntimeAlert string `json:"runtimeAlert,omitempty"`
	// LastFault is a copy of the most recently recorded fault, if any.
	LastFault *FaultEvent `json:"lastFault,omitempty"`
	// DegradedTransitions counts state changes into RuntimeStateDegraded.
	DegradedTransitions uint64 `json:"degradedTransitions"`
	// MutedTransitions counts state changes into RuntimeStateMuted.
	MutedTransitions uint64 `json:"mutedTransitions"`
	// FaultedTransitions counts state changes into RuntimeStateFaulted.
	FaultedTransitions uint64 `json:"faultedTransitions"`
	// FaultCount is the number of faults recorded (repeats inside the
	// de-duplication window are not counted).
	FaultCount uint64 `json:"faultCount"`
	// FaultHistory holds the most recent faults, oldest first.
	FaultHistory []FaultEvent `json:"faultHistory,omitempty"`
	// TransitionHistory holds the most recent runtime state changes, oldest first.
	TransitionHistory []RuntimeTransition `json:"transitionHistory,omitempty"`
}
// RuntimeIndicator is the three-level health summary that Stats derives from
// queue health and recent late buffers.
type RuntimeIndicator string

const (
	// RuntimeIndicatorNormal: queue neither low nor critical, no recent late buffers.
	RuntimeIndicatorNormal RuntimeIndicator = "normal"
	// RuntimeIndicatorDegraded: queue health is low, or late buffers were seen recently.
	RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
	// RuntimeIndicatorQueueCritical: queue health is critical; takes precedence over degraded.
	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
)
// RuntimeTransition records one change of RuntimeState.
type RuntimeTransition struct {
	// Time is when the transition was recorded.
	Time time.Time `json:"time"`
	// From is the state being left.
	From RuntimeState `json:"from"`
	// To is the state being entered.
	To RuntimeState `json:"to"`
	// Severity is the severity label of the state being entered
	// ("ok", "warn", "err" or "info").
	Severity string `json:"severity"`
}
// Tuning constants for the runtime state machine and its bounded histories.
const (
	// lateBufferIndicatorWindow is how long after the most recent late buffer
	// the engine still reports "recent late buffers".
	lateBufferIndicatorWindow = 5 * time.Second
	// queueCriticalStreakThreshold is the number of consecutive critical
	// queue checks that moves the runtime state to degraded.
	queueCriticalStreakThreshold = 3
	// queueMutedStreakThreshold is the number of consecutive critical queue
	// checks that moves the runtime state to muted.
	queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
	// queueMutedRecoveryThreshold is the number of consecutive healthy checks
	// (normal queue, no recent late buffers) needed to leave the muted state.
	queueMutedRecoveryThreshold = queueCriticalStreakThreshold
	// queueFaultedStreakThreshold is the number of consecutive critical
	// checks while muted that moves the runtime state to faulted.
	queueFaultedStreakThreshold = queueCriticalStreakThreshold
	// faultRepeatWindow suppresses a fault that has the same reason and
	// severity as the previous one and arrives within this interval.
	faultRepeatWindow = 1 * time.Second
	// faultHistoryCapacity bounds the number of retained fault events.
	faultHistoryCapacity = 8
	// runtimeTransitionHistoryCapacity bounds the number of retained state transitions.
	runtimeTransitionHistoryCapacity = 8
)
// Engine is the continuous TX loop. It generates composite IQ in chunks,
// resamples to device rate, and pushes to hardware in a tight loop.
// The hardware buffer_push call is blocking — it returns when the hardware
// has consumed the previous buffer and is ready for the next one.
// This naturally paces the loop to real-time without a ticker.
//
// A producer goroutine (run) generates frames and pushes them into
// frameQueue; a consumer goroutine (writerLoop) pops them and writes them to
// the driver.
type Engine struct {
	// cfg is the configuration captured by NewEngine, after NewEngine has
	// adjusted the FM composite rate and modulation flag for the chosen mode.
	cfg cfgpkg.Config
	// driver is the SDR backend used for Start, Tune, Write, Flush and Stop.
	driver platform.SoapyDriver
	// generator produces one composite frame per chunk.
	generator *offpkg.Generator
	// upsampler is nil in same-rate mode and non-nil in split-rate mode.
	upsampler *dsp.FMUpsampler
	// chunkDuration is the frame length requested from the generator; it also
	// serves as the late-buffer budget and the retry backoff interval.
	chunkDuration time.Duration
	// deviceRate is the effective device sample rate computed from cfg.
	deviceRate float64
	// frameQueue hands frames from run to writerLoop.
	frameQueue *output.FrameQueue
	// mu guards state, cancel and startedAt.
	mu sync.Mutex
	// state is the coarse lifecycle state checked by Start and Stop.
	state EngineState
	// cancel stops the context the run and writer goroutines operate under.
	cancel context.CancelFunc
	// startedAt is the time of the most recent successful Start.
	startedAt time.Time
	// wg tracks the run and writerLoop goroutines so Stop can wait for them.
	wg sync.WaitGroup
	// runtimeState holds the current RuntimeState.
	runtimeState atomic.Value
	// chunksProduced counts frames written to the driver without error.
	chunksProduced atomic.Uint64
	// totalSamples sums the sample counts returned by successful writes.
	totalSamples atomic.Uint64
	// underruns counts failed clone, queue and write attempts.
	underruns atomic.Uint64
	// lateBuffers counts frames whose cycle time exceeded chunkDuration.
	lateBuffers atomic.Uint64
	// lateBufferAlertAt is the UnixNano time of the latest late buffer; 0 means none yet.
	lateBufferAlertAt atomic.Uint64
	// criticalStreak counts consecutive critical queue checks outside the muted state.
	criticalStreak atomic.Uint64
	// mutedRecoveryStreak counts consecutive healthy checks while muted.
	mutedRecoveryStreak atomic.Uint64
	// mutedFaultStreak counts consecutive critical checks while muted.
	mutedFaultStreak atomic.Uint64
	// maxCycleNs is the longest generate-to-written time, in nanoseconds.
	maxCycleNs atomic.Uint64
	// maxGenerateNs is the longest frame generation time, in nanoseconds.
	maxGenerateNs atomic.Uint64
	// maxUpsampleNs is the longest upsampling time, in nanoseconds.
	maxUpsampleNs atomic.Uint64
	// maxWriteNs is the longest driver write time, in nanoseconds.
	maxWriteNs atomic.Uint64
	// lastError holds the most recent error message as a string.
	lastError atomic.Value
	// lastFault holds the most recent fault as a *FaultEvent.
	lastFault atomic.Value
	// faultHistoryMu guards faultHistory.
	faultHistoryMu sync.Mutex
	// faultHistory is a bounded, oldest-first list of recorded faults.
	faultHistory []FaultEvent
	// transitionHistoryMu guards transitionHistory.
	transitionHistoryMu sync.Mutex
	// transitionHistory is a bounded, oldest-first list of runtime state changes.
	transitionHistory []RuntimeTransition
	// degradedTransitions counts state changes into the degraded state.
	degradedTransitions atomic.Uint64
	// mutedTransitions counts state changes into the muted state.
	mutedTransitions atomic.Uint64
	// faultedTransitions counts state changes into the faulted state.
	faultedTransitions atomic.Uint64
	// faultEvents counts recorded faults (suppressed repeats excluded).
	faultEvents atomic.Uint64
	// Live config: pending frequency change in Hz, applied between chunks.
	// nil means no retune is pending.
	pendingFreq atomic.Pointer[float64]
	// Live audio stream (optional); nil unless SetStreamSource was called.
	streamSrc *audio.StreamSource
}
  152. // SetStreamSource configures a live audio stream as the audio source.
  153. // Must be called before Start(). The StreamResampler is created internally
  154. // to convert from the stream's sample rate to the DSP composite rate.
  155. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  156. e.streamSrc = src
  157. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  158. if compositeRate <= 0 {
  159. compositeRate = 228000
  160. }
  161. resampler := audio.NewStreamResampler(src, compositeRate)
  162. e.generator.SetExternalSource(resampler)
  163. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  164. src.SampleRate, compositeRate, src.Stats().Capacity)
  165. }
// StreamSource returns the live audio stream source installed with
// SetStreamSource, or nil when none has been set.
// Used by the control server for stats and HTTP audio ingest.
func (e *Engine) StreamSource() *audio.StreamSource {
	return e.streamSrc
}
  171. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  172. deviceRate := cfg.EffectiveDeviceRate()
  173. compositeRate := float64(cfg.FM.CompositeRateHz)
  174. if compositeRate <= 0 {
  175. compositeRate = 228000
  176. }
  177. var upsampler *dsp.FMUpsampler
  178. if deviceRate > compositeRate*1.001 {
  179. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  180. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  181. // This halves CPU load compared to running all DSP at deviceRate.
  182. cfg.FM.FMModulationEnabled = false
  183. maxDev := cfg.FM.MaxDeviationHz
  184. if maxDev <= 0 {
  185. maxDev = 75000
  186. }
  187. // mpxGain scales the FM deviation to compensate for hardware
  188. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  189. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  190. maxDev *= cfg.FM.MpxGain
  191. }
  192. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  193. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  194. compositeRate, deviceRate, deviceRate/compositeRate)
  195. } else {
  196. // Same-rate: entire DSP chain runs at deviceRate.
  197. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  198. if deviceRate > 0 {
  199. cfg.FM.CompositeRateHz = int(deviceRate)
  200. }
  201. cfg.FM.FMModulationEnabled = true
  202. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  203. }
  204. engine := &Engine{
  205. cfg: cfg,
  206. driver: driver,
  207. generator: offpkg.NewGenerator(cfg),
  208. upsampler: upsampler,
  209. chunkDuration: 50 * time.Millisecond,
  210. deviceRate: deviceRate,
  211. state: EngineIdle,
  212. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  213. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  214. transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
  215. }
  216. engine.setRuntimeState(RuntimeStateIdle)
  217. return engine
  218. }
  219. func (e *Engine) SetChunkDuration(d time.Duration) {
  220. e.chunkDuration = d
  221. }
// LiveConfigUpdate carries hot-reloadable parameters from the control API.
// nil pointers mean "no change". Validated before applying.
type LiveConfigUpdate struct {
	// FrequencyMHz is the new carrier frequency; accepted range 65-110.
	FrequencyMHz *float64
	// OutputDrive is the new output drive; accepted range 0-10.
	OutputDrive *float64
	// StereoEnabled switches the stereo flag in the generator's live parameters.
	StereoEnabled *bool
	// PilotLevel is the new pilot level; accepted range 0-0.2.
	PilotLevel *float64
	// RDSInjection is the new RDS injection level; accepted range 0-0.15.
	RDSInjection *float64
	// RDSEnabled switches the RDS flag in the generator's live parameters.
	RDSEnabled *bool
	// LimiterEnabled switches the limiter flag in the generator's live parameters.
	LimiterEnabled *bool
	// LimiterCeiling is the new limiter ceiling; accepted range 0-2.
	LimiterCeiling *float64
	// PS is the new RDS programme service text, forwarded to the RDS encoder.
	PS *string
	// RadioText is the new RDS radio text, forwarded to the RDS encoder.
	RadioText *string
}
  236. // UpdateConfig applies live parameter changes without restarting the engine.
  237. // DSP params take effect at the next chunk boundary (~50ms max).
  238. // Frequency changes are applied between chunks via driver.Tune().
  239. // RDS text updates are applied at the next RDS group boundary (~88ms).
  240. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  241. // --- Validate ---
  242. if u.FrequencyMHz != nil {
  243. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  244. return fmt.Errorf("frequencyMHz out of range (65-110)")
  245. }
  246. }
  247. if u.OutputDrive != nil {
  248. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  249. return fmt.Errorf("outputDrive out of range (0-10)")
  250. }
  251. }
  252. if u.PilotLevel != nil {
  253. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  254. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  255. }
  256. }
  257. if u.RDSInjection != nil {
  258. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  259. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  260. }
  261. }
  262. if u.LimiterCeiling != nil {
  263. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  264. return fmt.Errorf("limiterCeiling out of range (0-2)")
  265. }
  266. }
  267. // --- Frequency: store for run loop to apply via driver.Tune() ---
  268. if u.FrequencyMHz != nil {
  269. freqHz := *u.FrequencyMHz * 1e6
  270. e.pendingFreq.Store(&freqHz)
  271. }
  272. // --- RDS text: forward to encoder atomics ---
  273. if u.PS != nil || u.RadioText != nil {
  274. if enc := e.generator.RDSEncoder(); enc != nil {
  275. ps, rt := "", ""
  276. if u.PS != nil {
  277. ps = *u.PS
  278. }
  279. if u.RadioText != nil {
  280. rt = *u.RadioText
  281. }
  282. enc.UpdateText(ps, rt)
  283. }
  284. }
  285. // --- DSP params: build new LiveParams from current + patch ---
  286. // Read current, apply deltas, store new
  287. current := e.generator.CurrentLiveParams()
  288. next := current // copy
  289. if u.OutputDrive != nil {
  290. next.OutputDrive = *u.OutputDrive
  291. }
  292. if u.StereoEnabled != nil {
  293. next.StereoEnabled = *u.StereoEnabled
  294. }
  295. if u.PilotLevel != nil {
  296. next.PilotLevel = *u.PilotLevel
  297. }
  298. if u.RDSInjection != nil {
  299. next.RDSInjection = *u.RDSInjection
  300. }
  301. if u.RDSEnabled != nil {
  302. next.RDSEnabled = *u.RDSEnabled
  303. }
  304. if u.LimiterEnabled != nil {
  305. next.LimiterEnabled = *u.LimiterEnabled
  306. }
  307. if u.LimiterCeiling != nil {
  308. next.LimiterCeiling = *u.LimiterCeiling
  309. }
  310. e.generator.UpdateLive(next)
  311. return nil
  312. }
  313. func (e *Engine) Start(ctx context.Context) error {
  314. e.mu.Lock()
  315. if e.state != EngineIdle {
  316. e.mu.Unlock()
  317. return fmt.Errorf("engine already in state %s", e.state)
  318. }
  319. if err := e.driver.Start(ctx); err != nil {
  320. e.mu.Unlock()
  321. return fmt.Errorf("driver start: %w", err)
  322. }
  323. runCtx, cancel := context.WithCancel(ctx)
  324. e.cancel = cancel
  325. e.state = EngineRunning
  326. e.setRuntimeState(RuntimeStateArming)
  327. e.startedAt = time.Now()
  328. e.wg.Add(1)
  329. e.mu.Unlock()
  330. go e.run(runCtx)
  331. return nil
  332. }
  333. func (e *Engine) Stop(ctx context.Context) error {
  334. e.mu.Lock()
  335. if e.state != EngineRunning {
  336. e.mu.Unlock()
  337. return nil
  338. }
  339. e.state = EngineStopping
  340. e.setRuntimeState(RuntimeStateStopping)
  341. e.cancel()
  342. e.mu.Unlock()
  343. // Wait for run() goroutine to exit — deterministic, no guessing
  344. e.wg.Wait()
  345. if err := e.driver.Flush(ctx); err != nil {
  346. return err
  347. }
  348. if err := e.driver.Stop(ctx); err != nil {
  349. return err
  350. }
  351. e.mu.Lock()
  352. e.state = EngineIdle
  353. e.setRuntimeState(RuntimeStateIdle)
  354. e.mu.Unlock()
  355. return nil
  356. }
  357. func (e *Engine) Stats() EngineStats {
  358. e.mu.Lock()
  359. state := e.state
  360. startedAt := e.startedAt
  361. e.mu.Unlock()
  362. var uptime float64
  363. if state == EngineRunning {
  364. uptime = time.Since(startedAt).Seconds()
  365. }
  366. errVal, _ := e.lastError.Load().(string)
  367. queue := e.frameQueue.Stats()
  368. lateBuffers := e.lateBuffers.Load()
  369. hasRecentLateBuffers := e.hasRecentLateBuffers()
  370. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  371. lastFault := e.lastFaultEvent()
  372. return EngineStats{
  373. State: string(e.currentRuntimeState()),
  374. ChunksProduced: e.chunksProduced.Load(),
  375. TotalSamples: e.totalSamples.Load(),
  376. Underruns: e.underruns.Load(),
  377. LateBuffers: lateBuffers,
  378. LastError: errVal,
  379. UptimeSeconds: uptime,
  380. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  381. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  382. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  383. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  384. Queue: queue,
  385. RuntimeIndicator: ri,
  386. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  387. LastFault: lastFault,
  388. DegradedTransitions: e.degradedTransitions.Load(),
  389. MutedTransitions: e.mutedTransitions.Load(),
  390. FaultedTransitions: e.faultedTransitions.Load(),
  391. FaultCount: e.faultEvents.Load(),
  392. FaultHistory: e.FaultHistory(),
  393. TransitionHistory: e.TransitionHistory(),
  394. }
  395. }
  396. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  397. switch {
  398. case queueHealth == output.QueueHealthCritical:
  399. return RuntimeIndicatorQueueCritical
  400. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  401. return RuntimeIndicatorDegraded
  402. default:
  403. return RuntimeIndicatorNormal
  404. }
  405. }
  406. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  407. switch {
  408. case queueHealth == output.QueueHealthCritical:
  409. return "queue health critical"
  410. case recentLateBuffers:
  411. return "late buffers"
  412. case queueHealth == output.QueueHealthLow:
  413. return "queue health low"
  414. default:
  415. return ""
  416. }
  417. }
  418. func runtimeStateSeverity(state RuntimeState) string {
  419. switch state {
  420. case RuntimeStateRunning:
  421. return "ok"
  422. case RuntimeStateDegraded, RuntimeStateMuted:
  423. return "warn"
  424. case RuntimeStateFaulted:
  425. return "err"
  426. default:
  427. return "info"
  428. }
  429. }
  430. func (e *Engine) run(ctx context.Context) {
  431. e.setRuntimeState(RuntimeStatePrebuffering)
  432. e.wg.Add(1)
  433. go e.writerLoop(ctx)
  434. defer e.wg.Done()
  435. for {
  436. if ctx.Err() != nil {
  437. return
  438. }
  439. // Apply pending frequency change between chunks
  440. if pf := e.pendingFreq.Swap(nil); pf != nil {
  441. if err := e.driver.Tune(ctx, *pf); err != nil {
  442. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  443. } else {
  444. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  445. }
  446. }
  447. t0 := time.Now()
  448. frame := e.generator.GenerateFrame(e.chunkDuration)
  449. frame.GeneratedAt = t0
  450. t1 := time.Now()
  451. if e.upsampler != nil {
  452. frame = e.upsampler.Process(frame)
  453. frame.GeneratedAt = t0
  454. }
  455. t2 := time.Now()
  456. genDur := t1.Sub(t0)
  457. upDur := t2.Sub(t1)
  458. updateMaxDuration(&e.maxGenerateNs, genDur)
  459. updateMaxDuration(&e.maxUpsampleNs, upDur)
  460. enqueued := cloneFrame(frame)
  461. if enqueued == nil {
  462. e.lastError.Store("engine: frame clone failed")
  463. e.underruns.Add(1)
  464. continue
  465. }
  466. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  467. if ctx.Err() != nil {
  468. return
  469. }
  470. if errors.Is(err, output.ErrFrameQueueClosed) {
  471. return
  472. }
  473. e.lastError.Store(err.Error())
  474. e.underruns.Add(1)
  475. select {
  476. case <-time.After(e.chunkDuration):
  477. case <-ctx.Done():
  478. return
  479. }
  480. continue
  481. }
  482. queueStats := e.frameQueue.Stats()
  483. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  484. }
  485. }
  486. func (e *Engine) writerLoop(ctx context.Context) {
  487. defer e.wg.Done()
  488. for {
  489. frame, err := e.frameQueue.Pop(ctx)
  490. if err != nil {
  491. if ctx.Err() != nil {
  492. return
  493. }
  494. if errors.Is(err, output.ErrFrameQueueClosed) {
  495. return
  496. }
  497. e.lastError.Store(err.Error())
  498. e.underruns.Add(1)
  499. continue
  500. }
  501. writeStart := time.Now()
  502. n, err := e.driver.Write(ctx, frame)
  503. writeDur := time.Since(writeStart)
  504. cycleDur := writeDur
  505. if !frame.GeneratedAt.IsZero() {
  506. cycleDur = time.Since(frame.GeneratedAt)
  507. }
  508. updateMaxDuration(&e.maxWriteNs, writeDur)
  509. updateMaxDuration(&e.maxCycleNs, cycleDur)
  510. queueStats := e.frameQueue.Stats()
  511. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  512. if cycleDur > e.chunkDuration {
  513. late := e.lateBuffers.Add(1)
  514. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  515. if late <= 5 || late%20 == 0 {
  516. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  517. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  518. }
  519. }
  520. if err != nil {
  521. if ctx.Err() != nil {
  522. return
  523. }
  524. e.lastError.Store(err.Error())
  525. e.underruns.Add(1)
  526. select {
  527. case <-time.After(e.chunkDuration):
  528. case <-ctx.Done():
  529. return
  530. }
  531. continue
  532. }
  533. e.chunksProduced.Add(1)
  534. e.totalSamples.Add(uint64(n))
  535. }
  536. }
  537. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  538. if src == nil {
  539. return nil
  540. }
  541. samples := make([]output.IQSample, len(src.Samples))
  542. copy(samples, src.Samples)
  543. return &output.CompositeFrame{
  544. Samples: samples,
  545. SampleRateHz: src.SampleRateHz,
  546. Timestamp: src.Timestamp,
  547. GeneratedAt: src.GeneratedAt,
  548. Sequence: src.Sequence,
  549. }
  550. }
  551. func (e *Engine) setRuntimeState(state RuntimeState) {
  552. prev := e.currentRuntimeState()
  553. if prev != state {
  554. e.recordRuntimeTransition(prev, state)
  555. switch state {
  556. case RuntimeStateDegraded:
  557. e.degradedTransitions.Add(1)
  558. case RuntimeStateMuted:
  559. e.mutedTransitions.Add(1)
  560. case RuntimeStateFaulted:
  561. e.faultedTransitions.Add(1)
  562. }
  563. }
  564. e.runtimeState.Store(state)
  565. }
  566. func (e *Engine) currentRuntimeState() RuntimeState {
  567. if v := e.runtimeState.Load(); v != nil {
  568. if rs, ok := v.(RuntimeState); ok {
  569. return rs
  570. }
  571. }
  572. return RuntimeStateIdle
  573. }
  574. func (e *Engine) hasRecentLateBuffers() bool {
  575. lateAlertAt := e.lateBufferAlertAt.Load()
  576. if lateAlertAt == 0 {
  577. return false
  578. }
  579. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  580. }
  581. func (e *Engine) lastFaultEvent() *FaultEvent {
  582. return copyFaultEvent(e.loadLastFault())
  583. }
  584. // LastFault exposes the most recent captured fault, if any.
  585. func (e *Engine) LastFault() *FaultEvent {
  586. return e.lastFaultEvent()
  587. }
  588. func (e *Engine) FaultHistory() []FaultEvent {
  589. e.faultHistoryMu.Lock()
  590. defer e.faultHistoryMu.Unlock()
  591. history := make([]FaultEvent, len(e.faultHistory))
  592. copy(history, e.faultHistory)
  593. return history
  594. }
  595. func (e *Engine) TransitionHistory() []RuntimeTransition {
  596. e.transitionHistoryMu.Lock()
  597. defer e.transitionHistoryMu.Unlock()
  598. history := make([]RuntimeTransition, len(e.transitionHistory))
  599. copy(history, e.transitionHistory)
  600. return history
  601. }
  602. func (e *Engine) recordRuntimeTransition(from, to RuntimeState) {
  603. ev := RuntimeTransition{
  604. Time: time.Now(),
  605. From: from,
  606. To: to,
  607. Severity: runtimeStateSeverity(to),
  608. }
  609. e.transitionHistoryMu.Lock()
  610. defer e.transitionHistoryMu.Unlock()
  611. if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity {
  612. copy(e.transitionHistory, e.transitionHistory[1:])
  613. e.transitionHistory[len(e.transitionHistory)-1] = ev
  614. return
  615. }
  616. e.transitionHistory = append(e.transitionHistory, ev)
  617. }
  618. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  619. if reason == "" {
  620. reason = FaultReasonUnknown
  621. }
  622. now := time.Now()
  623. if last := e.loadLastFault(); last != nil {
  624. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  625. return
  626. }
  627. }
  628. ev := &FaultEvent{
  629. Time: now,
  630. Reason: reason,
  631. Severity: severity,
  632. Message: message,
  633. }
  634. e.lastFault.Store(ev)
  635. e.appendFaultHistory(ev)
  636. e.faultEvents.Add(1)
  637. }
  638. func (e *Engine) loadLastFault() *FaultEvent {
  639. if v := e.lastFault.Load(); v != nil {
  640. if ev, ok := v.(*FaultEvent); ok {
  641. return ev
  642. }
  643. }
  644. return nil
  645. }
  646. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  647. if source == nil {
  648. return nil
  649. }
  650. copy := *source
  651. return &copy
  652. }
  653. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  654. e.faultHistoryMu.Lock()
  655. defer e.faultHistoryMu.Unlock()
  656. if len(e.faultHistory) >= faultHistoryCapacity {
  657. copy(e.faultHistory, e.faultHistory[1:])
  658. e.faultHistory[len(e.faultHistory)-1] = *ev
  659. return
  660. }
  661. e.faultHistory = append(e.faultHistory, *ev)
  662. }
  663. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  664. state := e.currentRuntimeState()
  665. switch state {
  666. case RuntimeStateStopping, RuntimeStateFaulted:
  667. return
  668. case RuntimeStateMuted:
  669. if queue.Health == output.QueueHealthCritical {
  670. if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
  671. e.mutedFaultStreak.Store(0)
  672. e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
  673. fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
  674. e.setRuntimeState(RuntimeStateFaulted)
  675. return
  676. }
  677. } else {
  678. e.mutedFaultStreak.Store(0)
  679. }
  680. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  681. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  682. e.mutedRecoveryStreak.Store(0)
  683. e.mutedFaultStreak.Store(0)
  684. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  685. fmt.Sprintf("queue healthy for %d checks after mute", count))
  686. e.setRuntimeState(RuntimeStateDegraded)
  687. }
  688. } else {
  689. e.mutedRecoveryStreak.Store(0)
  690. }
  691. return
  692. }
  693. if state == RuntimeStatePrebuffering {
  694. if queue.Depth >= 1 {
  695. e.setRuntimeState(RuntimeStateRunning)
  696. }
  697. return
  698. }
  699. critical := queue.Health == output.QueueHealthCritical
  700. if critical {
  701. count := e.criticalStreak.Add(1)
  702. if count >= queueMutedStreakThreshold {
  703. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  704. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  705. e.setRuntimeState(RuntimeStateMuted)
  706. return
  707. }
  708. if count >= queueCriticalStreakThreshold {
  709. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  710. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  711. e.setRuntimeState(RuntimeStateDegraded)
  712. return
  713. }
  714. } else {
  715. e.criticalStreak.Store(0)
  716. }
  717. if hasLateBuffers {
  718. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  719. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  720. e.setRuntimeState(RuntimeStateDegraded)
  721. return
  722. }
  723. e.setRuntimeState(RuntimeStateRunning)
  724. }
  725. // ResetFault attempts to move the engine out of the faulted state.
  726. func (e *Engine) ResetFault() error {
  727. state := e.currentRuntimeState()
  728. if state != RuntimeStateFaulted {
  729. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  730. }
  731. e.criticalStreak.Store(0)
  732. e.mutedRecoveryStreak.Store(0)
  733. e.mutedFaultStreak.Store(0)
  734. e.setRuntimeState(RuntimeStateDegraded)
  735. return nil
  736. }