Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

763 строки
21KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  17. type EngineState int
  18. const (
  19. EngineIdle EngineState = iota
  20. EngineRunning
  21. EngineStopping
  22. )
  23. func (s EngineState) String() string {
  24. switch s {
  25. case EngineIdle:
  26. return "idle"
  27. case EngineRunning:
  28. return "running"
  29. case EngineStopping:
  30. return "stopping"
  31. default:
  32. return "unknown"
  33. }
  34. }
// RuntimeState is the fine-grained health/lifecycle state reported in
// EngineStats.State. It is stored atomically and updated by the engine's
// run/writer goroutines as well as by Start, Stop and ResetFault.
type RuntimeState string

const (
	// RuntimeStateIdle is the initial state and the state after Stop completes.
	RuntimeStateIdle RuntimeState = "idle"
	// RuntimeStateArming is set by Start before the run goroutine is launched.
	RuntimeStateArming RuntimeState = "arming"
	// RuntimeStatePrebuffering is set when the run loop starts and lasts
	// until the frame queue reports a depth of at least one.
	RuntimeStatePrebuffering RuntimeState = "prebuffering"
	// RuntimeStateRunning means no critical streak or recent late buffer was observed.
	RuntimeStateRunning RuntimeState = "running"
	// RuntimeStateDegraded is entered on recent late buffers, on a sustained
	// critical queue, after recovery from mute, and after ResetFault.
	RuntimeStateDegraded RuntimeState = "degraded"
	// RuntimeStateMuted is entered after a longer streak of critical queue checks.
	RuntimeStateMuted RuntimeState = "muted"
	// RuntimeStateFaulted is entered when the queue stays critical while muted;
	// evaluateRuntimeState does not leave it on its own (see ResetFault).
	RuntimeStateFaulted RuntimeState = "faulted"
	// RuntimeStateStopping is set by Stop while the goroutines are shut down.
	RuntimeStateStopping RuntimeState = "stopping"
)
  46. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  47. v := uint64(d)
  48. for {
  49. cur := dst.Load()
  50. if v <= cur {
  51. return
  52. }
  53. if dst.CompareAndSwap(cur, v) {
  54. return
  55. }
  56. }
  57. }
  58. func durationMs(ns uint64) float64 {
  59. return float64(ns) / float64(time.Millisecond)
  60. }
// EngineStats is the JSON-serialisable snapshot returned by Engine.Stats.
type EngineStats struct {
	// State is the current RuntimeState rendered as a string.
	State string `json:"state"`
	// ChunksProduced counts frames successfully written to the driver.
	ChunksProduced uint64 `json:"chunksProduced"`
	// TotalSamples sums the sample counts returned by successful driver writes.
	TotalSamples uint64 `json:"totalSamples"`
	// Underruns counts queue push/pop failures and driver write errors.
	Underruns uint64 `json:"underruns"`
	// LateBuffers counts frames whose generate-to-write cycle exceeded the chunk duration.
	LateBuffers uint64 `json:"lateBuffers,omitempty"`
	// LastError is the most recently recorded error message, if any.
	LastError string `json:"lastError,omitempty"`
	// UptimeSeconds is the time since Start; it is zero unless the engine is running.
	UptimeSeconds float64 `json:"uptimeSeconds"`
	// Max*Ms are the largest observed durations, in milliseconds, of the
	// full cycle, frame generation, upsampling and driver write.
	MaxCycleMs    float64 `json:"maxCycleMs,omitempty"`
	MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
	MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
	MaxWriteMs    float64 `json:"maxWriteMs,omitempty"`
	// Queue is the frame queue's own statistics snapshot.
	Queue output.QueueStats `json:"queue"`
	// RuntimeIndicator and RuntimeAlert summarise queue health and recent late buffers.
	RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
	RuntimeAlert     string           `json:"runtimeAlert,omitempty"`
	// LastFault is a copy of the most recently recorded fault, or nil.
	LastFault *FaultEvent `json:"lastFault,omitempty"`
	// *Transitions count how often the runtime state changed into the named state.
	DegradedTransitions uint64 `json:"degradedTransitions"`
	MutedTransitions    uint64 `json:"mutedTransitions"`
	FaultedTransitions  uint64 `json:"faultedTransitions"`
	// FaultCount is the number of fault events recorded (repeats inside the
	// de-duplication window are not counted).
	FaultCount uint64 `json:"faultCount"`
	// FaultHistory is a copy of the bounded list of recent fault events.
	FaultHistory []FaultEvent `json:"faultHistory,omitempty"`
}
// RuntimeIndicator is a three-level health summary derived from the frame
// queue health and the presence of recent late buffers.
type RuntimeIndicator string

const (
	// RuntimeIndicatorNormal: queue is neither low nor critical and no recent late buffers.
	RuntimeIndicatorNormal RuntimeIndicator = "normal"
	// RuntimeIndicatorDegraded: queue health is low, or late buffers were seen recently.
	RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
	// RuntimeIndicatorQueueCritical: queue health is critical (takes precedence).
	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
)
// Tuning constants for the runtime state machine and fault bookkeeping.
const (
	// lateBufferIndicatorWindow is how long after a late buffer it still
	// counts as "recent".
	lateBufferIndicatorWindow = 5 * time.Second
	// queueCriticalStreakThreshold is the number of consecutive critical
	// queue checks after which the engine becomes degraded.
	queueCriticalStreakThreshold = 3
	// queueMutedStreakThreshold is the number of consecutive critical
	// queue checks after which the engine becomes muted.
	queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
	// queueMutedRecoveryThreshold is the number of consecutive healthy
	// checks, while muted, needed to return to degraded.
	queueMutedRecoveryThreshold = queueCriticalStreakThreshold
	// queueFaultedStreakThreshold is the number of consecutive critical
	// checks, while muted, after which the engine becomes faulted.
	queueFaultedStreakThreshold = queueCriticalStreakThreshold
	// faultRepeatWindow suppresses a fault with the same reason and
	// severity as the previous one when it recurs within this window.
	faultRepeatWindow = 1 * time.Second
	// faultHistoryCapacity bounds the number of fault events retained.
	faultHistoryCapacity = 8
)
// Engine is the continuous TX loop. It generates composite IQ in chunks,
// resamples to device rate, and pushes to hardware in a tight loop.
// The hardware buffer_push call is blocking — it returns when the hardware
// has consumed the previous buffer and is ready for the next one.
// This naturally paces the loop to real-time without a ticker.
//
// Frames are produced by the run goroutine, handed over through frameQueue,
// and written to the driver by the writerLoop goroutine.
type Engine struct {
	cfg           cfgpkg.Config       // configuration as adjusted by NewEngine
	driver        platform.SoapyDriver // hardware driver (Start/Tune/Write/Flush/Stop)
	generator     *offpkg.Generator   // produces one composite frame per chunk
	upsampler     *dsp.FMUpsampler    // nil = same-rate, non-nil = split-rate
	chunkDuration time.Duration       // frame length requested per chunk; also the late-buffer budget
	deviceRate    float64             // cfg.EffectiveDeviceRate() captured at construction
	frameQueue    *output.FrameQueue  // hand-off between run and writerLoop

	// mu guards state, cancel and startedAt.
	mu        sync.Mutex
	state     EngineState
	cancel    context.CancelFunc // cancels the context of the run/writer goroutines
	startedAt time.Time
	wg        sync.WaitGroup // tracks the run and writerLoop goroutines

	runtimeState atomic.Value // holds a RuntimeState

	// Counters reported through Stats.
	chunksProduced atomic.Uint64
	totalSamples   atomic.Uint64
	underruns      atomic.Uint64
	lateBuffers    atomic.Uint64
	// lateBufferAlertAt is the UnixNano timestamp of the most recent late
	// buffer; zero means none was recorded.
	lateBufferAlertAt atomic.Uint64

	// Streak counters used by evaluateRuntimeState.
	criticalStreak      atomic.Uint64
	mutedRecoveryStreak atomic.Uint64
	mutedFaultStreak    atomic.Uint64

	// Largest observed stage durations, in nanoseconds.
	maxCycleNs    atomic.Uint64
	maxGenerateNs atomic.Uint64
	maxUpsampleNs atomic.Uint64
	maxWriteNs    atomic.Uint64

	lastError atomic.Value // string
	lastFault atomic.Value // *FaultEvent

	// faultHistoryMu guards faultHistory, a bounded list of recent faults.
	faultHistoryMu sync.Mutex
	faultHistory   []FaultEvent

	degradedTransitions atomic.Uint64
	mutedTransitions    atomic.Uint64
	faultedTransitions  atomic.Uint64
	faultEvents         atomic.Uint64

	// Live config: pending frequency change, applied between chunks
	pendingFreq atomic.Pointer[float64]

	// Live audio stream (optional)
	streamSrc *audio.StreamSource
}
  142. // SetStreamSource configures a live audio stream as the audio source.
  143. // Must be called before Start(). The StreamResampler is created internally
  144. // to convert from the stream's sample rate to the DSP composite rate.
  145. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  146. e.streamSrc = src
  147. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  148. if compositeRate <= 0 {
  149. compositeRate = 228000
  150. }
  151. resampler := audio.NewStreamResampler(src, compositeRate)
  152. e.generator.SetExternalSource(resampler)
  153. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  154. src.SampleRate, compositeRate, src.Stats().Capacity)
  155. }
  156. // StreamSource returns the live audio stream source, or nil.
  157. // Used by the control server for stats and HTTP audio ingest.
  158. func (e *Engine) StreamSource() *audio.StreamSource {
  159. return e.streamSrc
  160. }
  161. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  162. deviceRate := cfg.EffectiveDeviceRate()
  163. compositeRate := float64(cfg.FM.CompositeRateHz)
  164. if compositeRate <= 0 {
  165. compositeRate = 228000
  166. }
  167. var upsampler *dsp.FMUpsampler
  168. if deviceRate > compositeRate*1.001 {
  169. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  170. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  171. // This halves CPU load compared to running all DSP at deviceRate.
  172. cfg.FM.FMModulationEnabled = false
  173. maxDev := cfg.FM.MaxDeviationHz
  174. if maxDev <= 0 {
  175. maxDev = 75000
  176. }
  177. // mpxGain scales the FM deviation to compensate for hardware
  178. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  179. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  180. maxDev *= cfg.FM.MpxGain
  181. }
  182. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  183. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  184. compositeRate, deviceRate, deviceRate/compositeRate)
  185. } else {
  186. // Same-rate: entire DSP chain runs at deviceRate.
  187. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  188. if deviceRate > 0 {
  189. cfg.FM.CompositeRateHz = int(deviceRate)
  190. }
  191. cfg.FM.FMModulationEnabled = true
  192. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  193. }
  194. engine := &Engine{
  195. cfg: cfg,
  196. driver: driver,
  197. generator: offpkg.NewGenerator(cfg),
  198. upsampler: upsampler,
  199. chunkDuration: 50 * time.Millisecond,
  200. deviceRate: deviceRate,
  201. state: EngineIdle,
  202. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  203. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  204. }
  205. engine.setRuntimeState(RuntimeStateIdle)
  206. return engine
  207. }
  208. func (e *Engine) SetChunkDuration(d time.Duration) {
  209. e.chunkDuration = d
  210. }
// LiveConfigUpdate carries hot-reloadable parameters from the control API.
// nil pointers mean "no change". Validated before applying.
type LiveConfigUpdate struct {
	FrequencyMHz   *float64 // carrier frequency in MHz; accepted range 65-110
	OutputDrive    *float64 // accepted range 0-10
	StereoEnabled  *bool
	PilotLevel     *float64 // accepted range 0-0.2
	RDSInjection   *float64 // accepted range 0-0.15
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64 // accepted range 0-2
	PS             *string  // RDS text, forwarded to the RDS encoder
	RadioText      *string  // RDS text, forwarded to the RDS encoder
}
  225. // UpdateConfig applies live parameter changes without restarting the engine.
  226. // DSP params take effect at the next chunk boundary (~50ms max).
  227. // Frequency changes are applied between chunks via driver.Tune().
  228. // RDS text updates are applied at the next RDS group boundary (~88ms).
  229. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  230. // --- Validate ---
  231. if u.FrequencyMHz != nil {
  232. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  233. return fmt.Errorf("frequencyMHz out of range (65-110)")
  234. }
  235. }
  236. if u.OutputDrive != nil {
  237. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  238. return fmt.Errorf("outputDrive out of range (0-10)")
  239. }
  240. }
  241. if u.PilotLevel != nil {
  242. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  243. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  244. }
  245. }
  246. if u.RDSInjection != nil {
  247. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  248. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  249. }
  250. }
  251. if u.LimiterCeiling != nil {
  252. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  253. return fmt.Errorf("limiterCeiling out of range (0-2)")
  254. }
  255. }
  256. // --- Frequency: store for run loop to apply via driver.Tune() ---
  257. if u.FrequencyMHz != nil {
  258. freqHz := *u.FrequencyMHz * 1e6
  259. e.pendingFreq.Store(&freqHz)
  260. }
  261. // --- RDS text: forward to encoder atomics ---
  262. if u.PS != nil || u.RadioText != nil {
  263. if enc := e.generator.RDSEncoder(); enc != nil {
  264. ps, rt := "", ""
  265. if u.PS != nil {
  266. ps = *u.PS
  267. }
  268. if u.RadioText != nil {
  269. rt = *u.RadioText
  270. }
  271. enc.UpdateText(ps, rt)
  272. }
  273. }
  274. // --- DSP params: build new LiveParams from current + patch ---
  275. // Read current, apply deltas, store new
  276. current := e.generator.CurrentLiveParams()
  277. next := current // copy
  278. if u.OutputDrive != nil {
  279. next.OutputDrive = *u.OutputDrive
  280. }
  281. if u.StereoEnabled != nil {
  282. next.StereoEnabled = *u.StereoEnabled
  283. }
  284. if u.PilotLevel != nil {
  285. next.PilotLevel = *u.PilotLevel
  286. }
  287. if u.RDSInjection != nil {
  288. next.RDSInjection = *u.RDSInjection
  289. }
  290. if u.RDSEnabled != nil {
  291. next.RDSEnabled = *u.RDSEnabled
  292. }
  293. if u.LimiterEnabled != nil {
  294. next.LimiterEnabled = *u.LimiterEnabled
  295. }
  296. if u.LimiterCeiling != nil {
  297. next.LimiterCeiling = *u.LimiterCeiling
  298. }
  299. e.generator.UpdateLive(next)
  300. return nil
  301. }
  302. func (e *Engine) Start(ctx context.Context) error {
  303. e.mu.Lock()
  304. if e.state != EngineIdle {
  305. e.mu.Unlock()
  306. return fmt.Errorf("engine already in state %s", e.state)
  307. }
  308. if err := e.driver.Start(ctx); err != nil {
  309. e.mu.Unlock()
  310. return fmt.Errorf("driver start: %w", err)
  311. }
  312. runCtx, cancel := context.WithCancel(ctx)
  313. e.cancel = cancel
  314. e.state = EngineRunning
  315. e.setRuntimeState(RuntimeStateArming)
  316. e.startedAt = time.Now()
  317. e.wg.Add(1)
  318. e.mu.Unlock()
  319. go e.run(runCtx)
  320. return nil
  321. }
  322. func (e *Engine) Stop(ctx context.Context) error {
  323. e.mu.Lock()
  324. if e.state != EngineRunning {
  325. e.mu.Unlock()
  326. return nil
  327. }
  328. e.state = EngineStopping
  329. e.setRuntimeState(RuntimeStateStopping)
  330. e.cancel()
  331. e.mu.Unlock()
  332. // Wait for run() goroutine to exit — deterministic, no guessing
  333. e.wg.Wait()
  334. if err := e.driver.Flush(ctx); err != nil {
  335. return err
  336. }
  337. if err := e.driver.Stop(ctx); err != nil {
  338. return err
  339. }
  340. e.mu.Lock()
  341. e.state = EngineIdle
  342. e.setRuntimeState(RuntimeStateIdle)
  343. e.mu.Unlock()
  344. return nil
  345. }
  346. func (e *Engine) Stats() EngineStats {
  347. e.mu.Lock()
  348. state := e.state
  349. startedAt := e.startedAt
  350. e.mu.Unlock()
  351. var uptime float64
  352. if state == EngineRunning {
  353. uptime = time.Since(startedAt).Seconds()
  354. }
  355. errVal, _ := e.lastError.Load().(string)
  356. queue := e.frameQueue.Stats()
  357. lateBuffers := e.lateBuffers.Load()
  358. hasRecentLateBuffers := e.hasRecentLateBuffers()
  359. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  360. lastFault := e.lastFaultEvent()
  361. return EngineStats{
  362. State: string(e.currentRuntimeState()),
  363. ChunksProduced: e.chunksProduced.Load(),
  364. TotalSamples: e.totalSamples.Load(),
  365. Underruns: e.underruns.Load(),
  366. LateBuffers: lateBuffers,
  367. LastError: errVal,
  368. UptimeSeconds: uptime,
  369. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  370. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  371. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  372. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  373. Queue: queue,
  374. RuntimeIndicator: ri,
  375. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  376. LastFault: lastFault,
  377. DegradedTransitions: e.degradedTransitions.Load(),
  378. MutedTransitions: e.mutedTransitions.Load(),
  379. FaultedTransitions: e.faultedTransitions.Load(),
  380. FaultCount: e.faultEvents.Load(),
  381. FaultHistory: e.FaultHistory(),
  382. }
  383. }
  384. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  385. switch {
  386. case queueHealth == output.QueueHealthCritical:
  387. return RuntimeIndicatorQueueCritical
  388. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  389. return RuntimeIndicatorDegraded
  390. default:
  391. return RuntimeIndicatorNormal
  392. }
  393. }
  394. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  395. switch {
  396. case queueHealth == output.QueueHealthCritical:
  397. return "queue health critical"
  398. case recentLateBuffers:
  399. return "late buffers"
  400. case queueHealth == output.QueueHealthLow:
  401. return "queue health low"
  402. default:
  403. return ""
  404. }
  405. }
  406. func (e *Engine) run(ctx context.Context) {
  407. e.setRuntimeState(RuntimeStatePrebuffering)
  408. e.wg.Add(1)
  409. go e.writerLoop(ctx)
  410. defer e.wg.Done()
  411. for {
  412. if ctx.Err() != nil {
  413. return
  414. }
  415. // Apply pending frequency change between chunks
  416. if pf := e.pendingFreq.Swap(nil); pf != nil {
  417. if err := e.driver.Tune(ctx, *pf); err != nil {
  418. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  419. } else {
  420. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  421. }
  422. }
  423. t0 := time.Now()
  424. frame := e.generator.GenerateFrame(e.chunkDuration)
  425. frame.GeneratedAt = t0
  426. t1 := time.Now()
  427. if e.upsampler != nil {
  428. frame = e.upsampler.Process(frame)
  429. frame.GeneratedAt = t0
  430. }
  431. t2 := time.Now()
  432. genDur := t1.Sub(t0)
  433. upDur := t2.Sub(t1)
  434. updateMaxDuration(&e.maxGenerateNs, genDur)
  435. updateMaxDuration(&e.maxUpsampleNs, upDur)
  436. enqueued := cloneFrame(frame)
  437. if enqueued == nil {
  438. e.lastError.Store("engine: frame clone failed")
  439. e.underruns.Add(1)
  440. continue
  441. }
  442. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  443. if ctx.Err() != nil {
  444. return
  445. }
  446. if errors.Is(err, output.ErrFrameQueueClosed) {
  447. return
  448. }
  449. e.lastError.Store(err.Error())
  450. e.underruns.Add(1)
  451. select {
  452. case <-time.After(e.chunkDuration):
  453. case <-ctx.Done():
  454. return
  455. }
  456. continue
  457. }
  458. queueStats := e.frameQueue.Stats()
  459. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  460. }
  461. }
  462. func (e *Engine) writerLoop(ctx context.Context) {
  463. defer e.wg.Done()
  464. for {
  465. frame, err := e.frameQueue.Pop(ctx)
  466. if err != nil {
  467. if ctx.Err() != nil {
  468. return
  469. }
  470. if errors.Is(err, output.ErrFrameQueueClosed) {
  471. return
  472. }
  473. e.lastError.Store(err.Error())
  474. e.underruns.Add(1)
  475. continue
  476. }
  477. writeStart := time.Now()
  478. n, err := e.driver.Write(ctx, frame)
  479. writeDur := time.Since(writeStart)
  480. cycleDur := writeDur
  481. if !frame.GeneratedAt.IsZero() {
  482. cycleDur = time.Since(frame.GeneratedAt)
  483. }
  484. updateMaxDuration(&e.maxWriteNs, writeDur)
  485. updateMaxDuration(&e.maxCycleNs, cycleDur)
  486. queueStats := e.frameQueue.Stats()
  487. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  488. if cycleDur > e.chunkDuration {
  489. late := e.lateBuffers.Add(1)
  490. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  491. if late <= 5 || late%20 == 0 {
  492. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  493. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  494. }
  495. }
  496. if err != nil {
  497. if ctx.Err() != nil {
  498. return
  499. }
  500. e.lastError.Store(err.Error())
  501. e.underruns.Add(1)
  502. select {
  503. case <-time.After(e.chunkDuration):
  504. case <-ctx.Done():
  505. return
  506. }
  507. continue
  508. }
  509. e.chunksProduced.Add(1)
  510. e.totalSamples.Add(uint64(n))
  511. }
  512. }
  513. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  514. if src == nil {
  515. return nil
  516. }
  517. samples := make([]output.IQSample, len(src.Samples))
  518. copy(samples, src.Samples)
  519. return &output.CompositeFrame{
  520. Samples: samples,
  521. SampleRateHz: src.SampleRateHz,
  522. Timestamp: src.Timestamp,
  523. GeneratedAt: src.GeneratedAt,
  524. Sequence: src.Sequence,
  525. }
  526. }
  527. func (e *Engine) setRuntimeState(state RuntimeState) {
  528. prev := e.currentRuntimeState()
  529. if prev != state {
  530. switch state {
  531. case RuntimeStateDegraded:
  532. e.degradedTransitions.Add(1)
  533. case RuntimeStateMuted:
  534. e.mutedTransitions.Add(1)
  535. case RuntimeStateFaulted:
  536. e.faultedTransitions.Add(1)
  537. }
  538. }
  539. e.runtimeState.Store(state)
  540. }
  541. func (e *Engine) currentRuntimeState() RuntimeState {
  542. if v := e.runtimeState.Load(); v != nil {
  543. if rs, ok := v.(RuntimeState); ok {
  544. return rs
  545. }
  546. }
  547. return RuntimeStateIdle
  548. }
  549. func (e *Engine) hasRecentLateBuffers() bool {
  550. lateAlertAt := e.lateBufferAlertAt.Load()
  551. if lateAlertAt == 0 {
  552. return false
  553. }
  554. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  555. }
  556. func (e *Engine) lastFaultEvent() *FaultEvent {
  557. return copyFaultEvent(e.loadLastFault())
  558. }
  559. // LastFault exposes the most recent captured fault, if any.
  560. func (e *Engine) LastFault() *FaultEvent {
  561. return e.lastFaultEvent()
  562. }
  563. func (e *Engine) FaultHistory() []FaultEvent {
  564. e.faultHistoryMu.Lock()
  565. defer e.faultHistoryMu.Unlock()
  566. history := make([]FaultEvent, len(e.faultHistory))
  567. copy(history, e.faultHistory)
  568. return history
  569. }
  570. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  571. if reason == "" {
  572. reason = FaultReasonUnknown
  573. }
  574. now := time.Now()
  575. if last := e.loadLastFault(); last != nil {
  576. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  577. return
  578. }
  579. }
  580. ev := &FaultEvent{
  581. Time: now,
  582. Reason: reason,
  583. Severity: severity,
  584. Message: message,
  585. }
  586. e.lastFault.Store(ev)
  587. e.appendFaultHistory(ev)
  588. e.faultEvents.Add(1)
  589. }
  590. func (e *Engine) loadLastFault() *FaultEvent {
  591. if v := e.lastFault.Load(); v != nil {
  592. if ev, ok := v.(*FaultEvent); ok {
  593. return ev
  594. }
  595. }
  596. return nil
  597. }
  598. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  599. if source == nil {
  600. return nil
  601. }
  602. copy := *source
  603. return &copy
  604. }
  605. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  606. e.faultHistoryMu.Lock()
  607. defer e.faultHistoryMu.Unlock()
  608. if len(e.faultHistory) >= faultHistoryCapacity {
  609. copy(e.faultHistory, e.faultHistory[1:])
  610. e.faultHistory[len(e.faultHistory)-1] = *ev
  611. return
  612. }
  613. e.faultHistory = append(e.faultHistory, *ev)
  614. }
  615. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  616. state := e.currentRuntimeState()
  617. switch state {
  618. case RuntimeStateStopping, RuntimeStateFaulted:
  619. return
  620. case RuntimeStateMuted:
  621. if queue.Health == output.QueueHealthCritical {
  622. if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
  623. e.mutedFaultStreak.Store(0)
  624. e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
  625. fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
  626. e.setRuntimeState(RuntimeStateFaulted)
  627. return
  628. }
  629. } else {
  630. e.mutedFaultStreak.Store(0)
  631. }
  632. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  633. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  634. e.mutedRecoveryStreak.Store(0)
  635. e.mutedFaultStreak.Store(0)
  636. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  637. fmt.Sprintf("queue healthy for %d checks after mute", count))
  638. e.setRuntimeState(RuntimeStateDegraded)
  639. }
  640. } else {
  641. e.mutedRecoveryStreak.Store(0)
  642. }
  643. return
  644. }
  645. if state == RuntimeStatePrebuffering {
  646. if queue.Depth >= 1 {
  647. e.setRuntimeState(RuntimeStateRunning)
  648. }
  649. return
  650. }
  651. critical := queue.Health == output.QueueHealthCritical
  652. if critical {
  653. count := e.criticalStreak.Add(1)
  654. if count >= queueMutedStreakThreshold {
  655. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  656. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  657. e.setRuntimeState(RuntimeStateMuted)
  658. return
  659. }
  660. if count >= queueCriticalStreakThreshold {
  661. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  662. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  663. e.setRuntimeState(RuntimeStateDegraded)
  664. return
  665. }
  666. } else {
  667. e.criticalStreak.Store(0)
  668. }
  669. if hasLateBuffers {
  670. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  671. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  672. e.setRuntimeState(RuntimeStateDegraded)
  673. return
  674. }
  675. e.setRuntimeState(RuntimeStateRunning)
  676. }
  677. // ResetFault attempts to move the engine out of the faulted state.
  678. func (e *Engine) ResetFault() error {
  679. state := e.currentRuntimeState()
  680. if state != RuntimeStateFaulted {
  681. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  682. }
  683. e.criticalStreak.Store(0)
  684. e.mutedRecoveryStreak.Store(0)
  685. e.mutedFaultStreak.Store(0)
  686. e.setRuntimeState(RuntimeStateDegraded)
  687. return nil
  688. }