Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

832 строки
23KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  17. type EngineState int
  18. const (
  19. EngineIdle EngineState = iota
  20. EngineRunning
  21. EngineStopping
  22. )
  23. func (s EngineState) String() string {
  24. switch s {
  25. case EngineIdle:
  26. return "idle"
  27. case EngineRunning:
  28. return "running"
  29. case EngineStopping:
  30. return "stopping"
  31. default:
  32. return "unknown"
  33. }
  34. }
  35. type RuntimeState string
  36. const (
  37. RuntimeStateIdle RuntimeState = "idle"
  38. RuntimeStateArming RuntimeState = "arming"
  39. RuntimeStatePrebuffering RuntimeState = "prebuffering"
  40. RuntimeStateRunning RuntimeState = "running"
  41. RuntimeStateDegraded RuntimeState = "degraded"
  42. RuntimeStateMuted RuntimeState = "muted"
  43. RuntimeStateFaulted RuntimeState = "faulted"
  44. RuntimeStateStopping RuntimeState = "stopping"
  45. )
  46. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  47. v := uint64(d)
  48. for {
  49. cur := dst.Load()
  50. if v <= cur {
  51. return
  52. }
  53. if dst.CompareAndSwap(cur, v) {
  54. return
  55. }
  56. }
  57. }
  58. func durationMs(ns uint64) float64 {
  59. return float64(ns) / float64(time.Millisecond)
  60. }
  61. type EngineStats struct {
  62. State string `json:"state"`
  63. RuntimeStateDurationSeconds float64 `json:"runtimeStateDurationSeconds"`
  64. ChunksProduced uint64 `json:"chunksProduced"`
  65. TotalSamples uint64 `json:"totalSamples"`
  66. Underruns uint64 `json:"underruns"`
  67. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  68. LastError string `json:"lastError,omitempty"`
  69. UptimeSeconds float64 `json:"uptimeSeconds"`
  70. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  71. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  72. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  73. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  74. Queue output.QueueStats `json:"queue"`
  75. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  76. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  77. LastFault *FaultEvent `json:"lastFault,omitempty"`
  78. DegradedTransitions uint64 `json:"degradedTransitions"`
  79. MutedTransitions uint64 `json:"mutedTransitions"`
  80. FaultedTransitions uint64 `json:"faultedTransitions"`
  81. FaultCount uint64 `json:"faultCount"`
  82. FaultHistory []FaultEvent `json:"faultHistory,omitempty"`
  83. TransitionHistory []RuntimeTransition `json:"transitionHistory,omitempty"`
  84. }
  85. type RuntimeIndicator string
  86. const (
  87. RuntimeIndicatorNormal RuntimeIndicator = "normal"
  88. RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
  89. RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
  90. )
  91. type RuntimeTransition struct {
  92. Time time.Time `json:"time"`
  93. From RuntimeState `json:"from"`
  94. To RuntimeState `json:"to"`
  95. Severity string `json:"severity"`
  96. }
  97. const (
  98. lateBufferIndicatorWindow = 5 * time.Second
  99. queueCriticalStreakThreshold = 3
  100. queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
  101. queueMutedRecoveryThreshold = queueCriticalStreakThreshold
  102. queueFaultedStreakThreshold = queueCriticalStreakThreshold
  103. faultRepeatWindow = 1 * time.Second
  104. faultHistoryCapacity = 8
  105. runtimeTransitionHistoryCapacity = 8
  106. )
  107. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  108. // resamples to device rate, and pushes to hardware in a tight loop.
  109. // The hardware buffer_push call is blocking — it returns when the hardware
  110. // has consumed the previous buffer and is ready for the next one.
  111. // This naturally paces the loop to real-time without a ticker.
  112. type Engine struct {
  113. cfg cfgpkg.Config
  114. driver platform.SoapyDriver
  115. generator *offpkg.Generator
  116. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  117. chunkDuration time.Duration
  118. deviceRate float64
  119. frameQueue *output.FrameQueue
  120. mu sync.Mutex
  121. state EngineState
  122. cancel context.CancelFunc
  123. startedAt time.Time
  124. wg sync.WaitGroup
  125. runtimeState atomic.Value
  126. chunksProduced atomic.Uint64
  127. totalSamples atomic.Uint64
  128. underruns atomic.Uint64
  129. lateBuffers atomic.Uint64
  130. lateBufferAlertAt atomic.Uint64
  131. criticalStreak atomic.Uint64
  132. mutedRecoveryStreak atomic.Uint64
  133. mutedFaultStreak atomic.Uint64
  134. maxCycleNs atomic.Uint64
  135. maxGenerateNs atomic.Uint64
  136. maxUpsampleNs atomic.Uint64
  137. maxWriteNs atomic.Uint64
  138. lastError atomic.Value // string
  139. lastFault atomic.Value // *FaultEvent
  140. faultHistoryMu sync.Mutex
  141. faultHistory []FaultEvent
  142. transitionHistoryMu sync.Mutex
  143. transitionHistory []RuntimeTransition
  144. degradedTransitions atomic.Uint64
  145. mutedTransitions atomic.Uint64
  146. faultedTransitions atomic.Uint64
  147. faultEvents atomic.Uint64
  148. runtimeStateEnteredAt atomic.Uint64
  149. // Live config: pending frequency change, applied between chunks
  150. pendingFreq atomic.Pointer[float64]
  151. // Live audio stream (optional)
  152. streamSrc *audio.StreamSource
  153. }
  154. // SetStreamSource configures a live audio stream as the audio source.
  155. // Must be called before Start(). The StreamResampler is created internally
  156. // to convert from the stream's sample rate to the DSP composite rate.
  157. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  158. e.streamSrc = src
  159. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  160. if compositeRate <= 0 {
  161. compositeRate = 228000
  162. }
  163. resampler := audio.NewStreamResampler(src, compositeRate)
  164. e.generator.SetExternalSource(resampler)
  165. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  166. src.SampleRate, compositeRate, src.Stats().Capacity)
  167. }
  168. // StreamSource returns the live audio stream source, or nil.
  169. // Used by the control server for stats and HTTP audio ingest.
  170. func (e *Engine) StreamSource() *audio.StreamSource {
  171. return e.streamSrc
  172. }
  173. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  174. deviceRate := cfg.EffectiveDeviceRate()
  175. compositeRate := float64(cfg.FM.CompositeRateHz)
  176. if compositeRate <= 0 {
  177. compositeRate = 228000
  178. }
  179. var upsampler *dsp.FMUpsampler
  180. if deviceRate > compositeRate*1.001 {
  181. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  182. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  183. // This halves CPU load compared to running all DSP at deviceRate.
  184. cfg.FM.FMModulationEnabled = false
  185. maxDev := cfg.FM.MaxDeviationHz
  186. if maxDev <= 0 {
  187. maxDev = 75000
  188. }
  189. // mpxGain scales the FM deviation to compensate for hardware
  190. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  191. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  192. maxDev *= cfg.FM.MpxGain
  193. }
  194. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  195. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  196. compositeRate, deviceRate, deviceRate/compositeRate)
  197. } else {
  198. // Same-rate: entire DSP chain runs at deviceRate.
  199. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  200. if deviceRate > 0 {
  201. cfg.FM.CompositeRateHz = int(deviceRate)
  202. }
  203. cfg.FM.FMModulationEnabled = true
  204. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  205. }
  206. engine := &Engine{
  207. cfg: cfg,
  208. driver: driver,
  209. generator: offpkg.NewGenerator(cfg),
  210. upsampler: upsampler,
  211. chunkDuration: 50 * time.Millisecond,
  212. deviceRate: deviceRate,
  213. state: EngineIdle,
  214. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  215. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  216. transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
  217. }
  218. engine.setRuntimeState(RuntimeStateIdle)
  219. return engine
  220. }
  221. func (e *Engine) SetChunkDuration(d time.Duration) {
  222. e.chunkDuration = d
  223. }
  224. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  225. // nil pointers mean "no change". Validated before applying.
  226. type LiveConfigUpdate struct {
  227. FrequencyMHz *float64
  228. OutputDrive *float64
  229. StereoEnabled *bool
  230. PilotLevel *float64
  231. RDSInjection *float64
  232. RDSEnabled *bool
  233. LimiterEnabled *bool
  234. LimiterCeiling *float64
  235. PS *string
  236. RadioText *string
  237. }
  238. // UpdateConfig applies live parameter changes without restarting the engine.
  239. // DSP params take effect at the next chunk boundary (~50ms max).
  240. // Frequency changes are applied between chunks via driver.Tune().
  241. // RDS text updates are applied at the next RDS group boundary (~88ms).
  242. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  243. // --- Validate ---
  244. if u.FrequencyMHz != nil {
  245. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  246. return fmt.Errorf("frequencyMHz out of range (65-110)")
  247. }
  248. }
  249. if u.OutputDrive != nil {
  250. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  251. return fmt.Errorf("outputDrive out of range (0-10)")
  252. }
  253. }
  254. if u.PilotLevel != nil {
  255. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  256. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  257. }
  258. }
  259. if u.RDSInjection != nil {
  260. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  261. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  262. }
  263. }
  264. if u.LimiterCeiling != nil {
  265. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  266. return fmt.Errorf("limiterCeiling out of range (0-2)")
  267. }
  268. }
  269. // --- Frequency: store for run loop to apply via driver.Tune() ---
  270. if u.FrequencyMHz != nil {
  271. freqHz := *u.FrequencyMHz * 1e6
  272. e.pendingFreq.Store(&freqHz)
  273. }
  274. // --- RDS text: forward to encoder atomics ---
  275. if u.PS != nil || u.RadioText != nil {
  276. if enc := e.generator.RDSEncoder(); enc != nil {
  277. ps, rt := "", ""
  278. if u.PS != nil {
  279. ps = *u.PS
  280. }
  281. if u.RadioText != nil {
  282. rt = *u.RadioText
  283. }
  284. enc.UpdateText(ps, rt)
  285. }
  286. }
  287. // --- DSP params: build new LiveParams from current + patch ---
  288. // Read current, apply deltas, store new
  289. current := e.generator.CurrentLiveParams()
  290. next := current // copy
  291. if u.OutputDrive != nil {
  292. next.OutputDrive = *u.OutputDrive
  293. }
  294. if u.StereoEnabled != nil {
  295. next.StereoEnabled = *u.StereoEnabled
  296. }
  297. if u.PilotLevel != nil {
  298. next.PilotLevel = *u.PilotLevel
  299. }
  300. if u.RDSInjection != nil {
  301. next.RDSInjection = *u.RDSInjection
  302. }
  303. if u.RDSEnabled != nil {
  304. next.RDSEnabled = *u.RDSEnabled
  305. }
  306. if u.LimiterEnabled != nil {
  307. next.LimiterEnabled = *u.LimiterEnabled
  308. }
  309. if u.LimiterCeiling != nil {
  310. next.LimiterCeiling = *u.LimiterCeiling
  311. }
  312. e.generator.UpdateLive(next)
  313. return nil
  314. }
  315. func (e *Engine) Start(ctx context.Context) error {
  316. e.mu.Lock()
  317. if e.state != EngineIdle {
  318. e.mu.Unlock()
  319. return fmt.Errorf("engine already in state %s", e.state)
  320. }
  321. if err := e.driver.Start(ctx); err != nil {
  322. e.mu.Unlock()
  323. return fmt.Errorf("driver start: %w", err)
  324. }
  325. runCtx, cancel := context.WithCancel(ctx)
  326. e.cancel = cancel
  327. e.state = EngineRunning
  328. e.setRuntimeState(RuntimeStateArming)
  329. e.startedAt = time.Now()
  330. e.wg.Add(1)
  331. e.mu.Unlock()
  332. go e.run(runCtx)
  333. return nil
  334. }
  335. func (e *Engine) Stop(ctx context.Context) error {
  336. e.mu.Lock()
  337. if e.state != EngineRunning {
  338. e.mu.Unlock()
  339. return nil
  340. }
  341. e.state = EngineStopping
  342. e.setRuntimeState(RuntimeStateStopping)
  343. e.cancel()
  344. e.mu.Unlock()
  345. // Wait for run() goroutine to exit — deterministic, no guessing
  346. e.wg.Wait()
  347. if err := e.driver.Flush(ctx); err != nil {
  348. return err
  349. }
  350. if err := e.driver.Stop(ctx); err != nil {
  351. return err
  352. }
  353. e.mu.Lock()
  354. e.state = EngineIdle
  355. e.setRuntimeState(RuntimeStateIdle)
  356. e.mu.Unlock()
  357. return nil
  358. }
  359. func (e *Engine) Stats() EngineStats {
  360. e.mu.Lock()
  361. state := e.state
  362. startedAt := e.startedAt
  363. e.mu.Unlock()
  364. var uptime float64
  365. if state == EngineRunning {
  366. uptime = time.Since(startedAt).Seconds()
  367. }
  368. errVal, _ := e.lastError.Load().(string)
  369. queue := e.frameQueue.Stats()
  370. lateBuffers := e.lateBuffers.Load()
  371. hasRecentLateBuffers := e.hasRecentLateBuffers()
  372. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  373. lastFault := e.lastFaultEvent()
  374. return EngineStats{
  375. State: string(e.currentRuntimeState()),
  376. RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
  377. ChunksProduced: e.chunksProduced.Load(),
  378. TotalSamples: e.totalSamples.Load(),
  379. Underruns: e.underruns.Load(),
  380. LateBuffers: lateBuffers,
  381. LastError: errVal,
  382. UptimeSeconds: uptime,
  383. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  384. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  385. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  386. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  387. Queue: queue,
  388. RuntimeIndicator: ri,
  389. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  390. LastFault: lastFault,
  391. DegradedTransitions: e.degradedTransitions.Load(),
  392. MutedTransitions: e.mutedTransitions.Load(),
  393. FaultedTransitions: e.faultedTransitions.Load(),
  394. FaultCount: e.faultEvents.Load(),
  395. FaultHistory: e.FaultHistory(),
  396. TransitionHistory: e.TransitionHistory(),
  397. }
  398. }
  399. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  400. switch {
  401. case queueHealth == output.QueueHealthCritical:
  402. return RuntimeIndicatorQueueCritical
  403. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  404. return RuntimeIndicatorDegraded
  405. default:
  406. return RuntimeIndicatorNormal
  407. }
  408. }
  409. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  410. switch {
  411. case queueHealth == output.QueueHealthCritical:
  412. return "queue health critical"
  413. case recentLateBuffers:
  414. return "late buffers"
  415. case queueHealth == output.QueueHealthLow:
  416. return "queue health low"
  417. default:
  418. return ""
  419. }
  420. }
  421. func runtimeStateSeverity(state RuntimeState) string {
  422. switch state {
  423. case RuntimeStateRunning:
  424. return "ok"
  425. case RuntimeStateDegraded, RuntimeStateMuted:
  426. return "warn"
  427. case RuntimeStateFaulted:
  428. return "err"
  429. default:
  430. return "info"
  431. }
  432. }
  433. func (e *Engine) run(ctx context.Context) {
  434. e.setRuntimeState(RuntimeStatePrebuffering)
  435. e.wg.Add(1)
  436. go e.writerLoop(ctx)
  437. defer e.wg.Done()
  438. for {
  439. if ctx.Err() != nil {
  440. return
  441. }
  442. // Apply pending frequency change between chunks
  443. if pf := e.pendingFreq.Swap(nil); pf != nil {
  444. if err := e.driver.Tune(ctx, *pf); err != nil {
  445. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  446. } else {
  447. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  448. }
  449. }
  450. t0 := time.Now()
  451. frame := e.generator.GenerateFrame(e.chunkDuration)
  452. frame.GeneratedAt = t0
  453. t1 := time.Now()
  454. if e.upsampler != nil {
  455. frame = e.upsampler.Process(frame)
  456. frame.GeneratedAt = t0
  457. }
  458. t2 := time.Now()
  459. genDur := t1.Sub(t0)
  460. upDur := t2.Sub(t1)
  461. updateMaxDuration(&e.maxGenerateNs, genDur)
  462. updateMaxDuration(&e.maxUpsampleNs, upDur)
  463. enqueued := cloneFrame(frame)
  464. if enqueued == nil {
  465. e.lastError.Store("engine: frame clone failed")
  466. e.underruns.Add(1)
  467. continue
  468. }
  469. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  470. if ctx.Err() != nil {
  471. return
  472. }
  473. if errors.Is(err, output.ErrFrameQueueClosed) {
  474. return
  475. }
  476. e.lastError.Store(err.Error())
  477. e.underruns.Add(1)
  478. select {
  479. case <-time.After(e.chunkDuration):
  480. case <-ctx.Done():
  481. return
  482. }
  483. continue
  484. }
  485. queueStats := e.frameQueue.Stats()
  486. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  487. }
  488. }
  489. func (e *Engine) writerLoop(ctx context.Context) {
  490. defer e.wg.Done()
  491. for {
  492. frame, err := e.frameQueue.Pop(ctx)
  493. if err != nil {
  494. if ctx.Err() != nil {
  495. return
  496. }
  497. if errors.Is(err, output.ErrFrameQueueClosed) {
  498. return
  499. }
  500. e.lastError.Store(err.Error())
  501. e.underruns.Add(1)
  502. continue
  503. }
  504. writeStart := time.Now()
  505. n, err := e.driver.Write(ctx, frame)
  506. writeDur := time.Since(writeStart)
  507. cycleDur := writeDur
  508. if !frame.GeneratedAt.IsZero() {
  509. cycleDur = time.Since(frame.GeneratedAt)
  510. }
  511. updateMaxDuration(&e.maxWriteNs, writeDur)
  512. updateMaxDuration(&e.maxCycleNs, cycleDur)
  513. queueStats := e.frameQueue.Stats()
  514. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  515. if cycleDur > e.chunkDuration {
  516. late := e.lateBuffers.Add(1)
  517. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  518. if late <= 5 || late%20 == 0 {
  519. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  520. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  521. }
  522. }
  523. if err != nil {
  524. if ctx.Err() != nil {
  525. return
  526. }
  527. e.lastError.Store(err.Error())
  528. e.underruns.Add(1)
  529. select {
  530. case <-time.After(e.chunkDuration):
  531. case <-ctx.Done():
  532. return
  533. }
  534. continue
  535. }
  536. e.chunksProduced.Add(1)
  537. e.totalSamples.Add(uint64(n))
  538. }
  539. }
  540. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  541. if src == nil {
  542. return nil
  543. }
  544. samples := make([]output.IQSample, len(src.Samples))
  545. copy(samples, src.Samples)
  546. return &output.CompositeFrame{
  547. Samples: samples,
  548. SampleRateHz: src.SampleRateHz,
  549. Timestamp: src.Timestamp,
  550. GeneratedAt: src.GeneratedAt,
  551. Sequence: src.Sequence,
  552. }
  553. }
  554. func (e *Engine) setRuntimeState(state RuntimeState) {
  555. now := time.Now()
  556. prev := e.currentRuntimeState()
  557. if prev != state {
  558. e.recordRuntimeTransition(prev, state, now)
  559. switch state {
  560. case RuntimeStateDegraded:
  561. e.degradedTransitions.Add(1)
  562. case RuntimeStateMuted:
  563. e.mutedTransitions.Add(1)
  564. case RuntimeStateFaulted:
  565. e.faultedTransitions.Add(1)
  566. }
  567. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  568. } else if e.runtimeStateEnteredAt.Load() == 0 {
  569. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  570. }
  571. e.runtimeState.Store(state)
  572. }
  573. func (e *Engine) currentRuntimeState() RuntimeState {
  574. if v := e.runtimeState.Load(); v != nil {
  575. if rs, ok := v.(RuntimeState); ok {
  576. return rs
  577. }
  578. }
  579. return RuntimeStateIdle
  580. }
  581. func (e *Engine) runtimeStateDurationSeconds() float64 {
  582. if ts := e.runtimeStateEnteredAt.Load(); ts != 0 {
  583. return time.Since(time.Unix(0, int64(ts))).Seconds()
  584. }
  585. return 0
  586. }
  587. func (e *Engine) hasRecentLateBuffers() bool {
  588. lateAlertAt := e.lateBufferAlertAt.Load()
  589. if lateAlertAt == 0 {
  590. return false
  591. }
  592. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  593. }
  594. func (e *Engine) lastFaultEvent() *FaultEvent {
  595. return copyFaultEvent(e.loadLastFault())
  596. }
  597. // LastFault exposes the most recent captured fault, if any.
  598. func (e *Engine) LastFault() *FaultEvent {
  599. return e.lastFaultEvent()
  600. }
  601. func (e *Engine) FaultHistory() []FaultEvent {
  602. e.faultHistoryMu.Lock()
  603. defer e.faultHistoryMu.Unlock()
  604. history := make([]FaultEvent, len(e.faultHistory))
  605. copy(history, e.faultHistory)
  606. return history
  607. }
  608. func (e *Engine) TransitionHistory() []RuntimeTransition {
  609. e.transitionHistoryMu.Lock()
  610. defer e.transitionHistoryMu.Unlock()
  611. history := make([]RuntimeTransition, len(e.transitionHistory))
  612. copy(history, e.transitionHistory)
  613. return history
  614. }
  615. func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) {
  616. if when.IsZero() {
  617. when = time.Now()
  618. }
  619. ev := RuntimeTransition{
  620. Time: when,
  621. From: from,
  622. To: to,
  623. Severity: runtimeStateSeverity(to),
  624. }
  625. e.transitionHistoryMu.Lock()
  626. defer e.transitionHistoryMu.Unlock()
  627. if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity {
  628. copy(e.transitionHistory, e.transitionHistory[1:])
  629. e.transitionHistory[len(e.transitionHistory)-1] = ev
  630. return
  631. }
  632. e.transitionHistory = append(e.transitionHistory, ev)
  633. }
  634. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  635. if reason == "" {
  636. reason = FaultReasonUnknown
  637. }
  638. now := time.Now()
  639. if last := e.loadLastFault(); last != nil {
  640. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  641. return
  642. }
  643. }
  644. ev := &FaultEvent{
  645. Time: now,
  646. Reason: reason,
  647. Severity: severity,
  648. Message: message,
  649. }
  650. e.lastFault.Store(ev)
  651. e.appendFaultHistory(ev)
  652. e.faultEvents.Add(1)
  653. }
  654. func (e *Engine) loadLastFault() *FaultEvent {
  655. if v := e.lastFault.Load(); v != nil {
  656. if ev, ok := v.(*FaultEvent); ok {
  657. return ev
  658. }
  659. }
  660. return nil
  661. }
  662. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  663. if source == nil {
  664. return nil
  665. }
  666. copy := *source
  667. return &copy
  668. }
  669. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  670. e.faultHistoryMu.Lock()
  671. defer e.faultHistoryMu.Unlock()
  672. if len(e.faultHistory) >= faultHistoryCapacity {
  673. copy(e.faultHistory, e.faultHistory[1:])
  674. e.faultHistory[len(e.faultHistory)-1] = *ev
  675. return
  676. }
  677. e.faultHistory = append(e.faultHistory, *ev)
  678. }
  679. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  680. state := e.currentRuntimeState()
  681. switch state {
  682. case RuntimeStateStopping, RuntimeStateFaulted:
  683. return
  684. case RuntimeStateMuted:
  685. if queue.Health == output.QueueHealthCritical {
  686. if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
  687. e.mutedFaultStreak.Store(0)
  688. e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
  689. fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
  690. e.setRuntimeState(RuntimeStateFaulted)
  691. return
  692. }
  693. } else {
  694. e.mutedFaultStreak.Store(0)
  695. }
  696. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  697. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  698. e.mutedRecoveryStreak.Store(0)
  699. e.mutedFaultStreak.Store(0)
  700. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  701. fmt.Sprintf("queue healthy for %d checks after mute", count))
  702. e.setRuntimeState(RuntimeStateDegraded)
  703. }
  704. } else {
  705. e.mutedRecoveryStreak.Store(0)
  706. }
  707. return
  708. }
  709. if state == RuntimeStatePrebuffering {
  710. if queue.Depth >= 1 {
  711. e.setRuntimeState(RuntimeStateRunning)
  712. }
  713. return
  714. }
  715. critical := queue.Health == output.QueueHealthCritical
  716. if critical {
  717. count := e.criticalStreak.Add(1)
  718. if count >= queueMutedStreakThreshold {
  719. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  720. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  721. e.setRuntimeState(RuntimeStateMuted)
  722. return
  723. }
  724. if count >= queueCriticalStreakThreshold {
  725. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  726. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  727. e.setRuntimeState(RuntimeStateDegraded)
  728. return
  729. }
  730. } else {
  731. e.criticalStreak.Store(0)
  732. }
  733. if hasLateBuffers {
  734. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  735. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  736. e.setRuntimeState(RuntimeStateDegraded)
  737. return
  738. }
  739. e.setRuntimeState(RuntimeStateRunning)
  740. }
  741. // ResetFault attempts to move the engine out of the faulted state.
  742. func (e *Engine) ResetFault() error {
  743. state := e.currentRuntimeState()
  744. if state != RuntimeStateFaulted {
  745. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  746. }
  747. e.criticalStreak.Store(0)
  748. e.mutedRecoveryStreak.Store(0)
  749. e.mutedFaultStreak.Store(0)
  750. e.setRuntimeState(RuntimeStateDegraded)
  751. return nil
  752. }