Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

853 řádky
24KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  17. type EngineState int
  18. const (
  19. EngineIdle EngineState = iota
  20. EngineRunning
  21. EngineStopping
  22. )
  23. func (s EngineState) String() string {
  24. switch s {
  25. case EngineIdle:
  26. return "idle"
  27. case EngineRunning:
  28. return "running"
  29. case EngineStopping:
  30. return "stopping"
  31. default:
  32. return "unknown"
  33. }
  34. }
// RuntimeState is the string-valued health/runtime state of the engine.
// Its current value is what Stats reports in EngineStats.State.
type RuntimeState string

const (
	// RuntimeStateIdle is the initial state and the state after a completed Stop.
	RuntimeStateIdle RuntimeState = "idle"
	// RuntimeStateArming is set by Start before the run goroutine is launched.
	RuntimeStateArming RuntimeState = "arming"
	// RuntimeStatePrebuffering is set when the run loop begins; it lasts
	// until the frame queue reports a depth of at least one.
	RuntimeStatePrebuffering RuntimeState = "prebuffering"
	// RuntimeStateRunning is the healthy steady state.
	RuntimeStateRunning RuntimeState = "running"
	// RuntimeStateDegraded is entered on a sustained critical queue or recent late buffers.
	RuntimeStateDegraded RuntimeState = "degraded"
	// RuntimeStateMuted is entered after a longer streak of critical queue checks.
	RuntimeStateMuted RuntimeState = "muted"
	// RuntimeStateFaulted is entered when the queue stays critical while muted;
	// it is only left via ResetFault or Stop.
	RuntimeStateFaulted RuntimeState = "faulted"
	// RuntimeStateStopping is set by Stop while the goroutines are shut down.
	RuntimeStateStopping RuntimeState = "stopping"
)
  46. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  47. v := uint64(d)
  48. for {
  49. cur := dst.Load()
  50. if v <= cur {
  51. return
  52. }
  53. if dst.CompareAndSwap(cur, v) {
  54. return
  55. }
  56. }
  57. }
  58. func durationMs(ns uint64) float64 {
  59. return float64(ns) / float64(time.Millisecond)
  60. }
// EngineStats is the JSON-serializable snapshot returned by Engine.Stats.
type EngineStats struct {
	// State is the current RuntimeState as a string.
	State string `json:"state"`
	// RuntimeStateDurationSeconds is the time spent in the current runtime state.
	RuntimeStateDurationSeconds float64 `json:"runtimeStateDurationSeconds"`
	// ChunksProduced counts frames successfully written to the driver.
	ChunksProduced uint64 `json:"chunksProduced"`
	// TotalSamples sums the sample counts returned by successful driver writes.
	TotalSamples uint64 `json:"totalSamples"`
	// Underruns counts queue push/pop failures and driver write errors.
	Underruns uint64 `json:"underruns"`
	// LateBuffers counts writes that exceeded the chunk duration plus tolerance.
	LateBuffers uint64 `json:"lateBuffers,omitempty"`
	// LastError is the most recently stored error message, if any.
	LastError string `json:"lastError,omitempty"`
	// UptimeSeconds is time since Start; it is zero unless the engine is running.
	UptimeSeconds float64 `json:"uptimeSeconds"`
	// Max*Ms fields are the largest durations observed, in milliseconds.
	MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
	MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
	MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
	MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
	MaxQueueResidenceMs float64 `json:"maxQueueResidenceMs,omitempty"`
	MaxPipelineLatencyMs float64 `json:"maxPipelineLatencyMs,omitempty"`
	// Queue is the frame queue's own statistics snapshot.
	Queue output.QueueStats `json:"queue"`
	// RuntimeIndicator and RuntimeAlert are derived from queue health and
	// whether late buffers were seen recently.
	RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
	RuntimeAlert string `json:"runtimeAlert,omitempty"`
	// LastFault is a copy of the most recent recorded fault, or nil.
	LastFault *FaultEvent `json:"lastFault,omitempty"`
	// *Transitions count entries into the corresponding runtime state.
	DegradedTransitions uint64 `json:"degradedTransitions"`
	MutedTransitions uint64 `json:"mutedTransitions"`
	FaultedTransitions uint64 `json:"faultedTransitions"`
	// FaultCount is the number of faults recorded (after de-duplication).
	FaultCount uint64 `json:"faultCount"`
	// FaultHistory and TransitionHistory are bounded, oldest-first copies
	// of the engine's recent fault and state-transition logs.
	FaultHistory []FaultEvent `json:"faultHistory,omitempty"`
	TransitionHistory []RuntimeTransition `json:"transitionHistory,omitempty"`
}
// RuntimeIndicator is a coarse health indicator derived from queue health
// and recent late buffers (see runtimeIndicator).
type RuntimeIndicator string

const (
	// RuntimeIndicatorNormal: queue is neither low nor critical and no recent late buffers.
	RuntimeIndicatorNormal RuntimeIndicator = "normal"
	// RuntimeIndicatorDegraded: queue health is low or late buffers were seen recently.
	RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
	// RuntimeIndicatorQueueCritical: queue health is critical.
	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
)
// RuntimeTransition records one change of the engine's RuntimeState.
type RuntimeTransition struct {
	// Time is when the transition was recorded.
	Time time.Time `json:"time"`
	// From and To are the states before and after the transition.
	From RuntimeState `json:"from"`
	To RuntimeState `json:"to"`
	// Severity is the severity label of the destination state
	// ("ok", "warn", "err" or "info"; see runtimeStateSeverity).
	Severity string `json:"severity"`
}
// Tuning constants for late-buffer detection, the runtime state machine,
// and the bounded history buffers.
const (
	// lateBufferIndicatorWindow is how long after the last late buffer
	// hasRecentLateBuffers keeps reporting true.
	lateBufferIndicatorWindow = 5 * time.Second
	// writeLateTolerance is the slack beyond chunkDuration before a driver
	// write is counted as late.
	writeLateTolerance = 1 * time.Millisecond
	// queueCriticalStreakThreshold is the number of consecutive critical
	// queue checks before the engine is marked degraded.
	queueCriticalStreakThreshold = 3
	// queueMutedStreakThreshold is the number of consecutive critical
	// queue checks before the engine is marked muted.
	queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
	// queueMutedRecoveryThreshold is the number of healthy checks while
	// muted before the engine returns to degraded.
	queueMutedRecoveryThreshold = queueCriticalStreakThreshold
	// queueFaultedStreakThreshold is the number of critical checks while
	// muted before the engine is marked faulted.
	queueFaultedStreakThreshold = queueCriticalStreakThreshold
	// faultRepeatWindow suppresses a fault that repeats the previous one's
	// reason and severity within this window.
	faultRepeatWindow = 1 * time.Second
	// History buffers keep at most this many most-recent entries.
	faultHistoryCapacity = 8
	runtimeTransitionHistoryCapacity = 8
)
// Engine is the continuous TX loop. It generates composite IQ in chunks,
// resamples to device rate, and pushes to hardware in a tight loop.
// The hardware buffer_push call is blocking — it returns when the hardware
// has consumed the previous buffer and is ready for the next one.
// This naturally paces the loop to real-time without a ticker.
//
// Frames are produced by the run goroutine, handed over through frameQueue,
// and written to the driver by the writerLoop goroutine.
type Engine struct {
	cfg cfgpkg.Config // configuration as adjusted by NewEngine
	driver platform.SoapyDriver
	generator *offpkg.Generator
	upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
	chunkDuration time.Duration // frame length requested per chunk; also the write-time budget
	deviceRate float64
	frameQueue *output.FrameQueue // hand-off between run (Push) and writerLoop (Pop)
	mu sync.Mutex // guards state, cancel and startedAt
	state EngineState
	cancel context.CancelFunc // cancels the context shared by run and writerLoop
	startedAt time.Time
	wg sync.WaitGroup // tracks the run and writerLoop goroutines
	runtimeState atomic.Value // RuntimeState
	chunksProduced atomic.Uint64
	totalSamples atomic.Uint64
	underruns atomic.Uint64
	lateBuffers atomic.Uint64
	lateBufferAlertAt atomic.Uint64 // UnixNano of the most recent late buffer; 0 = never
	// Streak counters used by evaluateRuntimeState.
	criticalStreak atomic.Uint64
	mutedRecoveryStreak atomic.Uint64
	mutedFaultStreak atomic.Uint64
	// Maximum observed durations, in nanoseconds.
	maxCycleNs atomic.Uint64
	maxGenerateNs atomic.Uint64
	maxUpsampleNs atomic.Uint64
	maxWriteNs atomic.Uint64
	maxQueueResidenceNs atomic.Uint64
	maxPipelineNs atomic.Uint64
	lastError atomic.Value // string
	lastFault atomic.Value // *FaultEvent
	faultHistoryMu sync.Mutex // guards faultHistory
	faultHistory []FaultEvent
	transitionHistoryMu sync.Mutex // guards transitionHistory
	transitionHistory []RuntimeTransition
	degradedTransitions atomic.Uint64
	mutedTransitions atomic.Uint64
	faultedTransitions atomic.Uint64
	faultEvents atomic.Uint64
	runtimeStateEnteredAt atomic.Uint64 // UnixNano when the current runtime state was entered
	// Live config: pending frequency change, applied between chunks
	pendingFreq atomic.Pointer[float64]
	// Live audio stream (optional)
	streamSrc *audio.StreamSource
}
  159. // SetStreamSource configures a live audio stream as the audio source.
  160. // Must be called before Start(). The StreamResampler is created internally
  161. // to convert from the stream's sample rate to the DSP composite rate.
  162. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  163. e.streamSrc = src
  164. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  165. if compositeRate <= 0 {
  166. compositeRate = 228000
  167. }
  168. resampler := audio.NewStreamResampler(src, compositeRate)
  169. e.generator.SetExternalSource(resampler)
  170. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  171. src.SampleRate, compositeRate, src.Stats().Capacity)
  172. }
// StreamSource returns the live audio stream source, or nil if
// SetStreamSource has not been called.
// Used by the control server for stats and HTTP audio ingest.
func (e *Engine) StreamSource() *audio.StreamSource {
	return e.streamSrc
}
  178. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  179. deviceRate := cfg.EffectiveDeviceRate()
  180. compositeRate := float64(cfg.FM.CompositeRateHz)
  181. if compositeRate <= 0 {
  182. compositeRate = 228000
  183. }
  184. var upsampler *dsp.FMUpsampler
  185. if deviceRate > compositeRate*1.001 {
  186. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  187. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  188. // This halves CPU load compared to running all DSP at deviceRate.
  189. cfg.FM.FMModulationEnabled = false
  190. maxDev := cfg.FM.MaxDeviationHz
  191. if maxDev <= 0 {
  192. maxDev = 75000
  193. }
  194. // mpxGain scales the FM deviation to compensate for hardware
  195. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  196. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  197. maxDev *= cfg.FM.MpxGain
  198. }
  199. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  200. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  201. compositeRate, deviceRate, deviceRate/compositeRate)
  202. } else {
  203. // Same-rate: entire DSP chain runs at deviceRate.
  204. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  205. if deviceRate > 0 {
  206. cfg.FM.CompositeRateHz = int(deviceRate)
  207. }
  208. cfg.FM.FMModulationEnabled = true
  209. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  210. }
  211. engine := &Engine{
  212. cfg: cfg,
  213. driver: driver,
  214. generator: offpkg.NewGenerator(cfg),
  215. upsampler: upsampler,
  216. chunkDuration: 50 * time.Millisecond,
  217. deviceRate: deviceRate,
  218. state: EngineIdle,
  219. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  220. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  221. transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
  222. }
  223. engine.setRuntimeState(RuntimeStateIdle)
  224. return engine
  225. }
// SetChunkDuration overrides the chunk length used for frame generation and
// as the write-time budget. The field is written without synchronization and
// the value is not validated.
// NOTE(review): presumably intended to be called before Start — confirm.
func (e *Engine) SetChunkDuration(d time.Duration) {
	e.chunkDuration = d
}
// LiveConfigUpdate carries hot-reloadable parameters from the control API.
// nil pointers mean "no change". Validated before applying.
type LiveConfigUpdate struct {
	FrequencyMHz *float64 // accepted range 65-110
	OutputDrive *float64 // accepted range 0-10
	StereoEnabled *bool
	PilotLevel *float64 // accepted range 0-0.2
	RDSInjection *float64 // accepted range 0-0.15
	RDSEnabled *bool
	LimiterEnabled *bool
	LimiterCeiling *float64 // accepted range 0-2
	PS *string // RDS text forwarded to the RDS encoder
	RadioText *string // RDS text forwarded to the RDS encoder
}
  243. // UpdateConfig applies live parameter changes without restarting the engine.
  244. // DSP params take effect at the next chunk boundary (~50ms max).
  245. // Frequency changes are applied between chunks via driver.Tune().
  246. // RDS text updates are applied at the next RDS group boundary (~88ms).
  247. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  248. // --- Validate ---
  249. if u.FrequencyMHz != nil {
  250. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  251. return fmt.Errorf("frequencyMHz out of range (65-110)")
  252. }
  253. }
  254. if u.OutputDrive != nil {
  255. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  256. return fmt.Errorf("outputDrive out of range (0-10)")
  257. }
  258. }
  259. if u.PilotLevel != nil {
  260. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  261. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  262. }
  263. }
  264. if u.RDSInjection != nil {
  265. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  266. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  267. }
  268. }
  269. if u.LimiterCeiling != nil {
  270. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  271. return fmt.Errorf("limiterCeiling out of range (0-2)")
  272. }
  273. }
  274. // --- Frequency: store for run loop to apply via driver.Tune() ---
  275. if u.FrequencyMHz != nil {
  276. freqHz := *u.FrequencyMHz * 1e6
  277. e.pendingFreq.Store(&freqHz)
  278. }
  279. // --- RDS text: forward to encoder atomics ---
  280. if u.PS != nil || u.RadioText != nil {
  281. if enc := e.generator.RDSEncoder(); enc != nil {
  282. ps, rt := "", ""
  283. if u.PS != nil {
  284. ps = *u.PS
  285. }
  286. if u.RadioText != nil {
  287. rt = *u.RadioText
  288. }
  289. enc.UpdateText(ps, rt)
  290. }
  291. }
  292. // --- DSP params: build new LiveParams from current + patch ---
  293. // Read current, apply deltas, store new
  294. current := e.generator.CurrentLiveParams()
  295. next := current // copy
  296. if u.OutputDrive != nil {
  297. next.OutputDrive = *u.OutputDrive
  298. }
  299. if u.StereoEnabled != nil {
  300. next.StereoEnabled = *u.StereoEnabled
  301. }
  302. if u.PilotLevel != nil {
  303. next.PilotLevel = *u.PilotLevel
  304. }
  305. if u.RDSInjection != nil {
  306. next.RDSInjection = *u.RDSInjection
  307. }
  308. if u.RDSEnabled != nil {
  309. next.RDSEnabled = *u.RDSEnabled
  310. }
  311. if u.LimiterEnabled != nil {
  312. next.LimiterEnabled = *u.LimiterEnabled
  313. }
  314. if u.LimiterCeiling != nil {
  315. next.LimiterCeiling = *u.LimiterCeiling
  316. }
  317. e.generator.UpdateLive(next)
  318. return nil
  319. }
  320. func (e *Engine) Start(ctx context.Context) error {
  321. e.mu.Lock()
  322. if e.state != EngineIdle {
  323. e.mu.Unlock()
  324. return fmt.Errorf("engine already in state %s", e.state)
  325. }
  326. if err := e.driver.Start(ctx); err != nil {
  327. e.mu.Unlock()
  328. return fmt.Errorf("driver start: %w", err)
  329. }
  330. runCtx, cancel := context.WithCancel(ctx)
  331. e.cancel = cancel
  332. e.state = EngineRunning
  333. e.setRuntimeState(RuntimeStateArming)
  334. e.startedAt = time.Now()
  335. e.wg.Add(1)
  336. e.mu.Unlock()
  337. go e.run(runCtx)
  338. return nil
  339. }
  340. func (e *Engine) Stop(ctx context.Context) error {
  341. e.mu.Lock()
  342. if e.state != EngineRunning {
  343. e.mu.Unlock()
  344. return nil
  345. }
  346. e.state = EngineStopping
  347. e.setRuntimeState(RuntimeStateStopping)
  348. e.cancel()
  349. e.mu.Unlock()
  350. // Wait for run() goroutine to exit — deterministic, no guessing
  351. e.wg.Wait()
  352. if err := e.driver.Flush(ctx); err != nil {
  353. return err
  354. }
  355. if err := e.driver.Stop(ctx); err != nil {
  356. return err
  357. }
  358. e.mu.Lock()
  359. e.state = EngineIdle
  360. e.setRuntimeState(RuntimeStateIdle)
  361. e.mu.Unlock()
  362. return nil
  363. }
  364. func (e *Engine) Stats() EngineStats {
  365. e.mu.Lock()
  366. state := e.state
  367. startedAt := e.startedAt
  368. e.mu.Unlock()
  369. var uptime float64
  370. if state == EngineRunning {
  371. uptime = time.Since(startedAt).Seconds()
  372. }
  373. errVal, _ := e.lastError.Load().(string)
  374. queue := e.frameQueue.Stats()
  375. lateBuffers := e.lateBuffers.Load()
  376. hasRecentLateBuffers := e.hasRecentLateBuffers()
  377. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  378. lastFault := e.lastFaultEvent()
  379. return EngineStats{
  380. State: string(e.currentRuntimeState()),
  381. RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
  382. ChunksProduced: e.chunksProduced.Load(),
  383. TotalSamples: e.totalSamples.Load(),
  384. Underruns: e.underruns.Load(),
  385. LateBuffers: lateBuffers,
  386. LastError: errVal,
  387. UptimeSeconds: uptime,
  388. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  389. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  390. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  391. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  392. MaxQueueResidenceMs: durationMs(e.maxQueueResidenceNs.Load()),
  393. MaxPipelineLatencyMs: durationMs(e.maxPipelineNs.Load()),
  394. Queue: queue,
  395. RuntimeIndicator: ri,
  396. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  397. LastFault: lastFault,
  398. DegradedTransitions: e.degradedTransitions.Load(),
  399. MutedTransitions: e.mutedTransitions.Load(),
  400. FaultedTransitions: e.faultedTransitions.Load(),
  401. FaultCount: e.faultEvents.Load(),
  402. FaultHistory: e.FaultHistory(),
  403. TransitionHistory: e.TransitionHistory(),
  404. }
  405. }
  406. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  407. switch {
  408. case queueHealth == output.QueueHealthCritical:
  409. return RuntimeIndicatorQueueCritical
  410. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  411. return RuntimeIndicatorDegraded
  412. default:
  413. return RuntimeIndicatorNormal
  414. }
  415. }
  416. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  417. switch {
  418. case queueHealth == output.QueueHealthCritical:
  419. return "queue health critical"
  420. case recentLateBuffers:
  421. return "late buffers"
  422. case queueHealth == output.QueueHealthLow:
  423. return "queue health low"
  424. default:
  425. return ""
  426. }
  427. }
  428. func runtimeStateSeverity(state RuntimeState) string {
  429. switch state {
  430. case RuntimeStateRunning:
  431. return "ok"
  432. case RuntimeStateDegraded, RuntimeStateMuted:
  433. return "warn"
  434. case RuntimeStateFaulted:
  435. return "err"
  436. default:
  437. return "info"
  438. }
  439. }
  440. func (e *Engine) run(ctx context.Context) {
  441. e.setRuntimeState(RuntimeStatePrebuffering)
  442. e.wg.Add(1)
  443. go e.writerLoop(ctx)
  444. defer e.wg.Done()
  445. for {
  446. if ctx.Err() != nil {
  447. return
  448. }
  449. // Apply pending frequency change between chunks
  450. if pf := e.pendingFreq.Swap(nil); pf != nil {
  451. if err := e.driver.Tune(ctx, *pf); err != nil {
  452. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  453. } else {
  454. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  455. }
  456. }
  457. t0 := time.Now()
  458. frame := e.generator.GenerateFrame(e.chunkDuration)
  459. frame.GeneratedAt = t0
  460. t1 := time.Now()
  461. if e.upsampler != nil {
  462. frame = e.upsampler.Process(frame)
  463. frame.GeneratedAt = t0
  464. }
  465. t2 := time.Now()
  466. genDur := t1.Sub(t0)
  467. upDur := t2.Sub(t1)
  468. updateMaxDuration(&e.maxGenerateNs, genDur)
  469. updateMaxDuration(&e.maxUpsampleNs, upDur)
  470. enqueued := cloneFrame(frame)
  471. enqueued.EnqueuedAt = time.Now()
  472. if enqueued == nil {
  473. e.lastError.Store("engine: frame clone failed")
  474. e.underruns.Add(1)
  475. continue
  476. }
  477. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  478. if ctx.Err() != nil {
  479. return
  480. }
  481. if errors.Is(err, output.ErrFrameQueueClosed) {
  482. return
  483. }
  484. e.lastError.Store(err.Error())
  485. e.underruns.Add(1)
  486. select {
  487. case <-time.After(e.chunkDuration):
  488. case <-ctx.Done():
  489. return
  490. }
  491. continue
  492. }
  493. queueStats := e.frameQueue.Stats()
  494. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  495. }
  496. }
// writerLoop is the consumer side of the frame queue. It pops frames, writes
// them to the driver, records timing maxima, re-evaluates the runtime state,
// and detects late writes. It exits when ctx is cancelled or the queue is
// closed, releasing the wait-group slot added by run.
func (e *Engine) writerLoop(ctx context.Context) {
	defer e.wg.Done()
	for {
		frame, err := e.frameQueue.Pop(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			if errors.Is(err, output.ErrFrameQueueClosed) {
				return
			}
			// Any other pop failure is counted as an underrun and retried.
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			continue
		}
		frame.DequeuedAt = time.Now()
		// Queue residence is only measurable when the producer stamped the frame.
		queueResidence := time.Duration(0)
		if !frame.EnqueuedAt.IsZero() {
			queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt)
		}
		writeStart := time.Now()
		frame.WriteStartedAt = writeStart
		n, err := e.driver.Write(ctx, frame)
		writeDur := time.Since(writeStart)
		// Pipeline latency spans generation to end of write; it falls back to
		// the write duration when the frame carries no generation timestamp.
		pipelineLatency := writeDur
		if !frame.GeneratedAt.IsZero() {
			pipelineLatency = time.Since(frame.GeneratedAt)
		}
		updateMaxDuration(&e.maxWriteNs, writeDur)
		updateMaxDuration(&e.maxQueueResidenceNs, queueResidence)
		updateMaxDuration(&e.maxPipelineNs, pipelineLatency)
		// The cycle maximum is tracked from the write duration as well.
		updateMaxDuration(&e.maxCycleNs, writeDur)
		queueStats := e.frameQueue.Stats()
		e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
		// A write is late when it took longer than one chunk plus tolerance.
		lateOver := writeDur - e.chunkDuration
		if lateOver > writeLateTolerance {
			late := e.lateBuffers.Add(1)
			e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
			// Log the first five late buffers, then every twentieth.
			if late <= 5 || late%20 == 0 {
				log.Printf("TX LATE: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s",
					writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
			}
		}
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			// Write failure: record it, count an underrun, and back off for
			// one chunk (or until cancelled) before trying the next frame.
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			select {
			case <-time.After(e.chunkDuration):
			case <-ctx.Done():
				return
			}
			continue
		}
		e.chunksProduced.Add(1)
		e.totalSamples.Add(uint64(n))
	}
}
  557. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  558. if src == nil {
  559. return nil
  560. }
  561. samples := make([]output.IQSample, len(src.Samples))
  562. copy(samples, src.Samples)
  563. return &output.CompositeFrame{
  564. Samples: samples,
  565. SampleRateHz: src.SampleRateHz,
  566. Timestamp: src.Timestamp,
  567. GeneratedAt: src.GeneratedAt,
  568. EnqueuedAt: src.EnqueuedAt,
  569. DequeuedAt: src.DequeuedAt,
  570. WriteStartedAt: src.WriteStartedAt,
  571. Sequence: src.Sequence,
  572. }
  573. }
  574. func (e *Engine) setRuntimeState(state RuntimeState) {
  575. now := time.Now()
  576. prev := e.currentRuntimeState()
  577. if prev != state {
  578. e.recordRuntimeTransition(prev, state, now)
  579. switch state {
  580. case RuntimeStateDegraded:
  581. e.degradedTransitions.Add(1)
  582. case RuntimeStateMuted:
  583. e.mutedTransitions.Add(1)
  584. case RuntimeStateFaulted:
  585. e.faultedTransitions.Add(1)
  586. }
  587. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  588. } else if e.runtimeStateEnteredAt.Load() == 0 {
  589. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  590. }
  591. e.runtimeState.Store(state)
  592. }
  593. func (e *Engine) currentRuntimeState() RuntimeState {
  594. if v := e.runtimeState.Load(); v != nil {
  595. if rs, ok := v.(RuntimeState); ok {
  596. return rs
  597. }
  598. }
  599. return RuntimeStateIdle
  600. }
  601. func (e *Engine) runtimeStateDurationSeconds() float64 {
  602. if ts := e.runtimeStateEnteredAt.Load(); ts != 0 {
  603. return time.Since(time.Unix(0, int64(ts))).Seconds()
  604. }
  605. return 0
  606. }
  607. func (e *Engine) hasRecentLateBuffers() bool {
  608. lateAlertAt := e.lateBufferAlertAt.Load()
  609. if lateAlertAt == 0 {
  610. return false
  611. }
  612. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  613. }
  614. func (e *Engine) lastFaultEvent() *FaultEvent {
  615. return copyFaultEvent(e.loadLastFault())
  616. }
// LastFault exposes the most recent captured fault, if any.
// The returned event is a copy; it is nil when no fault has been recorded.
func (e *Engine) LastFault() *FaultEvent {
	return e.lastFaultEvent()
}
  621. func (e *Engine) FaultHistory() []FaultEvent {
  622. e.faultHistoryMu.Lock()
  623. defer e.faultHistoryMu.Unlock()
  624. history := make([]FaultEvent, len(e.faultHistory))
  625. copy(history, e.faultHistory)
  626. return history
  627. }
  628. func (e *Engine) TransitionHistory() []RuntimeTransition {
  629. e.transitionHistoryMu.Lock()
  630. defer e.transitionHistoryMu.Unlock()
  631. history := make([]RuntimeTransition, len(e.transitionHistory))
  632. copy(history, e.transitionHistory)
  633. return history
  634. }
  635. func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) {
  636. if when.IsZero() {
  637. when = time.Now()
  638. }
  639. ev := RuntimeTransition{
  640. Time: when,
  641. From: from,
  642. To: to,
  643. Severity: runtimeStateSeverity(to),
  644. }
  645. e.transitionHistoryMu.Lock()
  646. defer e.transitionHistoryMu.Unlock()
  647. if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity {
  648. copy(e.transitionHistory, e.transitionHistory[1:])
  649. e.transitionHistory[len(e.transitionHistory)-1] = ev
  650. return
  651. }
  652. e.transitionHistory = append(e.transitionHistory, ev)
  653. }
  654. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  655. if reason == "" {
  656. reason = FaultReasonUnknown
  657. }
  658. now := time.Now()
  659. if last := e.loadLastFault(); last != nil {
  660. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  661. return
  662. }
  663. }
  664. ev := &FaultEvent{
  665. Time: now,
  666. Reason: reason,
  667. Severity: severity,
  668. Message: message,
  669. }
  670. e.lastFault.Store(ev)
  671. e.appendFaultHistory(ev)
  672. e.faultEvents.Add(1)
  673. }
  674. func (e *Engine) loadLastFault() *FaultEvent {
  675. if v := e.lastFault.Load(); v != nil {
  676. if ev, ok := v.(*FaultEvent); ok {
  677. return ev
  678. }
  679. }
  680. return nil
  681. }
  682. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  683. if source == nil {
  684. return nil
  685. }
  686. copy := *source
  687. return &copy
  688. }
  689. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  690. e.faultHistoryMu.Lock()
  691. defer e.faultHistoryMu.Unlock()
  692. if len(e.faultHistory) >= faultHistoryCapacity {
  693. copy(e.faultHistory, e.faultHistory[1:])
  694. e.faultHistory[len(e.faultHistory)-1] = *ev
  695. return
  696. }
  697. e.faultHistory = append(e.faultHistory, *ev)
  698. }
// evaluateRuntimeState advances the runtime state machine from one queue
// snapshot and the recent-late-buffer flag. It is called after every push
// (run) and every write (writerLoop).
//
//   - stopping / faulted: never changed here.
//   - muted: a streak of critical checks escalates to faulted; a streak of
//     healthy checks (normal queue, no late buffers) recovers to degraded.
//   - prebuffering: becomes running once the queue holds at least one frame.
//   - otherwise: a critical streak escalates to degraded and then muted;
//     late buffers mark the engine degraded; anything else is running.
func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
	state := e.currentRuntimeState()
	switch state {
	case RuntimeStateStopping, RuntimeStateFaulted:
		return
	case RuntimeStateMuted:
		if queue.Health == output.QueueHealthCritical {
			if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
				e.mutedFaultStreak.Store(0)
				e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
					fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
				e.setRuntimeState(RuntimeStateFaulted)
				return
			}
		} else {
			// Any non-critical check breaks the escalation streak.
			e.mutedFaultStreak.Store(0)
		}
		if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
			if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
				e.mutedRecoveryStreak.Store(0)
				e.mutedFaultStreak.Store(0)
				// The recovery is logged as a degraded-severity event.
				e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
					fmt.Sprintf("queue healthy for %d checks after mute", count))
				e.setRuntimeState(RuntimeStateDegraded)
			}
		} else {
			// Any unhealthy check breaks the recovery streak.
			e.mutedRecoveryStreak.Store(0)
		}
		return
	}
	if state == RuntimeStatePrebuffering {
		if queue.Depth >= 1 {
			e.setRuntimeState(RuntimeStateRunning)
		}
		return
	}
	critical := queue.Health == output.QueueHealthCritical
	if critical {
		count := e.criticalStreak.Add(1)
		// Check the higher (muted) threshold first so it wins over degraded.
		if count >= queueMutedStreakThreshold {
			e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
				fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
			e.setRuntimeState(RuntimeStateMuted)
			return
		}
		if count >= queueCriticalStreakThreshold {
			e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
				fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
			e.setRuntimeState(RuntimeStateDegraded)
			return
		}
		// Below the degraded threshold: fall through to the late-buffer check.
	} else {
		e.criticalStreak.Store(0)
	}
	if hasLateBuffers {
		e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
			fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
		e.setRuntimeState(RuntimeStateDegraded)
		return
	}
	e.setRuntimeState(RuntimeStateRunning)
}
  761. // ResetFault attempts to move the engine out of the faulted state.
  762. func (e *Engine) ResetFault() error {
  763. state := e.currentRuntimeState()
  764. if state != RuntimeStateFaulted {
  765. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  766. }
  767. e.criticalStreak.Store(0)
  768. e.mutedRecoveryStreak.Store(0)
  769. e.mutedFaultStreak.Store(0)
  770. e.setRuntimeState(RuntimeStateDegraded)
  771. return nil
  772. }