Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

851 lines
24KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  // EngineState is the coarse lifecycle state of an Engine. Start, Stop and
// Stats read and write it under Engine.mu.
type EngineState int

const (
	// EngineIdle: not transmitting; Start may be called.
	EngineIdle EngineState = iota
	// EngineRunning: Start succeeded and the pipeline goroutines were launched.
	EngineRunning
	// EngineStopping: Stop is tearing the pipeline down.
	EngineStopping
)

// String returns the lowercase name of s, or "unknown" for any value outside
// the declared constants.
func (s EngineState) String() string {
	names := [...]string{
		EngineIdle:     "idle",
		EngineRunning:  "running",
		EngineStopping: "stopping",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
  // RuntimeState is the fine-grained health state of the transmit pipeline.
// Its string value is what Engine.Stats reports as EngineStats.State and
// what appears in the transition history.
type RuntimeState string

// Runtime states, in roughly the order a run passes through them.
const (
	RuntimeStateIdle         RuntimeState = "idle"         // engine not running
	RuntimeStateArming       RuntimeState = "arming"       // Start accepted, pipeline not yet launched
	RuntimeStatePrebuffering RuntimeState = "prebuffering" // waiting for the first queued frame
	RuntimeStateRunning      RuntimeState = "running"      // healthy
	RuntimeStateDegraded     RuntimeState = "degraded"     // late buffers or a persistently critical queue
	RuntimeStateMuted        RuntimeState = "muted"        // queue critical for a longer streak
	RuntimeStateFaulted      RuntimeState = "faulted"      // left only via ResetFault
	RuntimeStateStopping     RuntimeState = "stopping"     // Stop in progress
)
  // updateMaxDuration raises *dst to d when d exceeds the stored value, so
// *dst only ever grows. It is lock-free: a compare-and-swap retry loop lets
// concurrent callers race, and the largest value wins.
func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
	candidate := uint64(d)
	for seen := dst.Load(); candidate > seen; seen = dst.Load() {
		if dst.CompareAndSwap(seen, candidate) {
			return
		}
	}
}
  58. func durationMs(ns uint64) float64 {
  59. return float64(ns) / float64(time.Millisecond)
  60. }
  61. type EngineStats struct {
  62. State string `json:"state"`
  63. RuntimeStateDurationSeconds float64 `json:"runtimeStateDurationSeconds"`
  64. ChunksProduced uint64 `json:"chunksProduced"`
  65. TotalSamples uint64 `json:"totalSamples"`
  66. Underruns uint64 `json:"underruns"`
  67. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  68. LastError string `json:"lastError,omitempty"`
  69. UptimeSeconds float64 `json:"uptimeSeconds"`
  70. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  71. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  72. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  73. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  74. MaxQueueResidenceMs float64 `json:"maxQueueResidenceMs,omitempty"`
  75. MaxPipelineLatencyMs float64 `json:"maxPipelineLatencyMs,omitempty"`
  76. Queue output.QueueStats `json:"queue"`
  77. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  78. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  79. LastFault *FaultEvent `json:"lastFault,omitempty"`
  80. DegradedTransitions uint64 `json:"degradedTransitions"`
  81. MutedTransitions uint64 `json:"mutedTransitions"`
  82. FaultedTransitions uint64 `json:"faultedTransitions"`
  83. FaultCount uint64 `json:"faultCount"`
  84. FaultHistory []FaultEvent `json:"faultHistory,omitempty"`
  85. TransitionHistory []RuntimeTransition `json:"transitionHistory,omitempty"`
  86. }
  // RuntimeIndicator is a coarse traffic-light summary of pipeline health,
// derived from queue health and recent late buffers by runtimeIndicator.
type RuntimeIndicator string

// Indicator values as exposed in EngineStats.RuntimeIndicator.
const (
	RuntimeIndicatorNormal        RuntimeIndicator = "normal"        // all good
	RuntimeIndicatorDegraded      RuntimeIndicator = "degraded"      // queue low or recent late buffers
	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical" // queue health critical
)
  93. type RuntimeTransition struct {
  94. Time time.Time `json:"time"`
  95. From RuntimeState `json:"from"`
  96. To RuntimeState `json:"to"`
  97. Severity string `json:"severity"`
  98. }
  99. const (
  100. lateBufferIndicatorWindow = 5 * time.Second
  101. queueCriticalStreakThreshold = 3
  102. queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
  103. queueMutedRecoveryThreshold = queueCriticalStreakThreshold
  104. queueFaultedStreakThreshold = queueCriticalStreakThreshold
  105. faultRepeatWindow = 1 * time.Second
  106. faultHistoryCapacity = 8
  107. runtimeTransitionHistoryCapacity = 8
  108. )
  109. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  110. // resamples to device rate, and pushes to hardware in a tight loop.
  111. // The hardware buffer_push call is blocking — it returns when the hardware
  112. // has consumed the previous buffer and is ready for the next one.
  113. // This naturally paces the loop to real-time without a ticker.
  114. type Engine struct {
  115. cfg cfgpkg.Config
  116. driver platform.SoapyDriver
  117. generator *offpkg.Generator
  118. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  119. chunkDuration time.Duration
  120. deviceRate float64
  121. frameQueue *output.FrameQueue
  122. mu sync.Mutex
  123. state EngineState
  124. cancel context.CancelFunc
  125. startedAt time.Time
  126. wg sync.WaitGroup
  127. runtimeState atomic.Value
  128. chunksProduced atomic.Uint64
  129. totalSamples atomic.Uint64
  130. underruns atomic.Uint64
  131. lateBuffers atomic.Uint64
  132. lateBufferAlertAt atomic.Uint64
  133. criticalStreak atomic.Uint64
  134. mutedRecoveryStreak atomic.Uint64
  135. mutedFaultStreak atomic.Uint64
  136. maxCycleNs atomic.Uint64
  137. maxGenerateNs atomic.Uint64
  138. maxUpsampleNs atomic.Uint64
  139. maxWriteNs atomic.Uint64
  140. maxQueueResidenceNs atomic.Uint64
  141. maxPipelineNs atomic.Uint64
  142. lastError atomic.Value // string
  143. lastFault atomic.Value // *FaultEvent
  144. faultHistoryMu sync.Mutex
  145. faultHistory []FaultEvent
  146. transitionHistoryMu sync.Mutex
  147. transitionHistory []RuntimeTransition
  148. degradedTransitions atomic.Uint64
  149. mutedTransitions atomic.Uint64
  150. faultedTransitions atomic.Uint64
  151. faultEvents atomic.Uint64
  152. runtimeStateEnteredAt atomic.Uint64
  153. // Live config: pending frequency change, applied between chunks
  154. pendingFreq atomic.Pointer[float64]
  155. // Live audio stream (optional)
  156. streamSrc *audio.StreamSource
  157. }
  // SetStreamSource installs src as the generator's live audio input. Call it
// before Start: streamSrc is a plain field with no synchronization. A
// StreamResampler converting from the stream's sample rate to the DSP
// composite rate (228 kHz when the configured rate is unset) is built here
// and handed to the generator.
func (e *Engine) SetStreamSource(src *audio.StreamSource) {
	e.streamSrc = src

	targetRate := float64(e.cfg.FM.CompositeRateHz)
	if targetRate <= 0 {
		targetRate = 228000
	}
	e.generator.SetExternalSource(audio.NewStreamResampler(src, targetRate))

	log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
		src.SampleRate, targetRate, src.Stats().Capacity)
}
  172. // StreamSource returns the live audio stream source, or nil.
  173. // Used by the control server for stats and HTTP audio ingest.
  174. func (e *Engine) StreamSource() *audio.StreamSource {
  175. return e.streamSrc
  176. }
  // NewEngine builds an idle Engine that drives driver with a copy of cfg.
//
// The copy is adjusted before the generator is created:
//   - Split-rate mode (device rate more than 0.1% above the composite rate):
//     the DSP chain runs at the composite rate with FM modulation disabled,
//     and a dsp.FMUpsampler performs FM modulation plus interpolation up to
//     the device rate. Its peak deviation is cfg.FM.MaxDeviationHz (75 kHz
//     when unset), multiplied by cfg.FM.MpxGain when that is positive. This
//     compensates hardware DAC/SDR scaling while the DSP chain keeps its
//     logical 0–1.0 levels.
//   - Same-rate mode: the whole DSP chain, FM modulation included, runs at
//     the device rate (e.g. a LimeSDR at 228 kHz).
//
// In both modes cfg.FM.CompositeRateHz is normalized to the rate the DSP
// chain actually runs at. An unset (<= 0) composite rate therefore means
// 228 kHz for the generator as well as for the upsampler. Previously only
// the upsampler saw the default, and the generator got the unset value.
func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
	deviceRate := cfg.EffectiveDeviceRate()
	compositeRate := float64(cfg.FM.CompositeRateHz)
	if compositeRate <= 0 {
		compositeRate = 228000
	}

	var upsampler *dsp.FMUpsampler
	if deviceRate > compositeRate*1.001 {
		// Split-rate: DSP at the composite rate, the upsampler does FM
		// modulation and interpolation (intended to cut CPU load versus
		// running all DSP at the device rate).
		cfg.FM.CompositeRateHz = int(compositeRate)
		cfg.FM.FMModulationEnabled = false
		maxDev := cfg.FM.MaxDeviationHz
		if maxDev <= 0 {
			maxDev = 75000
		}
		if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
			maxDev *= cfg.FM.MpxGain
		}
		upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
		log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
			compositeRate, deviceRate, deviceRate/compositeRate)
	} else {
		// Same-rate: the device rate is not meaningfully above the
		// composite rate, so the whole chain runs at the device rate (or
		// the composite rate when no device rate is known).
		if deviceRate > 0 {
			cfg.FM.CompositeRateHz = int(deviceRate)
		} else {
			cfg.FM.CompositeRateHz = int(compositeRate)
		}
		cfg.FM.FMModulationEnabled = true
		log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
	}

	engine := &Engine{
		cfg:               cfg,
		driver:            driver,
		generator:         offpkg.NewGenerator(cfg),
		upsampler:         upsampler,
		chunkDuration:     50 * time.Millisecond,
		deviceRate:        deviceRate,
		state:             EngineIdle,
		frameQueue:        output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
		faultHistory:      make([]FaultEvent, 0, faultHistoryCapacity),
		transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
	}
	engine.setRuntimeState(RuntimeStateIdle)
	return engine
}
  // SetChunkDuration sets how much signal each generated frame covers. The same
// value is the per-write budget above which a buffer counts as late, and the
// backoff used after queue or driver errors. Call it before Start: the field
// is read by the running goroutines without synchronization.
//
// Non-positive durations are ignored. With d <= 0 every write would count as
// late, and the error backoffs (time.After(chunkDuration)) would return
// immediately, turning error paths into busy loops.
func (e *Engine) SetChunkDuration(d time.Duration) {
	if d <= 0 {
		return
	}
	e.chunkDuration = d
}
  // LiveConfigUpdate is a partial, hot-reloadable configuration patch from the
// control API. A nil field means "leave unchanged". UpdateConfig validates
// every non-nil numeric field before applying anything.
type LiveConfigUpdate struct {
	FrequencyMHz   *float64 // 65–110; applied by the run loop via driver.Tune
	OutputDrive    *float64 // 0–10
	StereoEnabled  *bool
	PilotLevel     *float64 // 0–0.2
	RDSInjection   *float64 // 0–0.15
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64 // 0–2
	PS             *string  // RDS PS text, forwarded to the RDS encoder
	RadioText      *string  // RDS RadioText, forwarded to the RDS encoder
}
  242. // UpdateConfig applies live parameter changes without restarting the engine.
  243. // DSP params take effect at the next chunk boundary (~50ms max).
  244. // Frequency changes are applied between chunks via driver.Tune().
  245. // RDS text updates are applied at the next RDS group boundary (~88ms).
  246. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  247. // --- Validate ---
  248. if u.FrequencyMHz != nil {
  249. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  250. return fmt.Errorf("frequencyMHz out of range (65-110)")
  251. }
  252. }
  253. if u.OutputDrive != nil {
  254. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  255. return fmt.Errorf("outputDrive out of range (0-10)")
  256. }
  257. }
  258. if u.PilotLevel != nil {
  259. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  260. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  261. }
  262. }
  263. if u.RDSInjection != nil {
  264. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  265. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  266. }
  267. }
  268. if u.LimiterCeiling != nil {
  269. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  270. return fmt.Errorf("limiterCeiling out of range (0-2)")
  271. }
  272. }
  273. // --- Frequency: store for run loop to apply via driver.Tune() ---
  274. if u.FrequencyMHz != nil {
  275. freqHz := *u.FrequencyMHz * 1e6
  276. e.pendingFreq.Store(&freqHz)
  277. }
  278. // --- RDS text: forward to encoder atomics ---
  279. if u.PS != nil || u.RadioText != nil {
  280. if enc := e.generator.RDSEncoder(); enc != nil {
  281. ps, rt := "", ""
  282. if u.PS != nil {
  283. ps = *u.PS
  284. }
  285. if u.RadioText != nil {
  286. rt = *u.RadioText
  287. }
  288. enc.UpdateText(ps, rt)
  289. }
  290. }
  291. // --- DSP params: build new LiveParams from current + patch ---
  292. // Read current, apply deltas, store new
  293. current := e.generator.CurrentLiveParams()
  294. next := current // copy
  295. if u.OutputDrive != nil {
  296. next.OutputDrive = *u.OutputDrive
  297. }
  298. if u.StereoEnabled != nil {
  299. next.StereoEnabled = *u.StereoEnabled
  300. }
  301. if u.PilotLevel != nil {
  302. next.PilotLevel = *u.PilotLevel
  303. }
  304. if u.RDSInjection != nil {
  305. next.RDSInjection = *u.RDSInjection
  306. }
  307. if u.RDSEnabled != nil {
  308. next.RDSEnabled = *u.RDSEnabled
  309. }
  310. if u.LimiterEnabled != nil {
  311. next.LimiterEnabled = *u.LimiterEnabled
  312. }
  313. if u.LimiterCeiling != nil {
  314. next.LimiterCeiling = *u.LimiterCeiling
  315. }
  316. e.generator.UpdateLive(next)
  317. return nil
  318. }
  // Start starts the driver and launches the transmit pipeline (run, which in
// turn starts writerLoop). It fails if the engine is not idle or the driver
// refuses to start. The pipeline runs under a context derived from ctx, so
// cancelling ctx also halts transmission.
// NOTE(review): in that case the engine keeps reporting EngineRunning until
// Stop is called.
func (e *Engine) Start(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.state != EngineIdle {
		return fmt.Errorf("engine already in state %s", e.state)
	}
	if err := e.driver.Start(ctx); err != nil {
		return fmt.Errorf("driver start: %w", err)
	}

	runCtx, cancel := context.WithCancel(ctx)
	e.cancel = cancel
	e.state = EngineRunning
	e.setRuntimeState(RuntimeStateArming)
	e.startedAt = time.Now()

	// run never takes e.mu, so launching it while holding the lock is safe.
	e.wg.Add(1)
	go e.run(runCtx)
	return nil
}
  // Stop cancels the pipeline, waits for run and writerLoop to exit, then
// flushes and stops the driver. It is a no-op when the engine is not running.
//
// The engine always returns to EngineIdle, even when the driver reports an
// error. Previously a failed Flush or Stop returned early and left the engine
// in EngineStopping for good: Start refused to run and Stop was a no-op. The
// driver is also stopped when the flush fails, so the hardware is released.
// The flush error takes precedence in the returned error.
func (e *Engine) Stop(ctx context.Context) error {
	e.mu.Lock()
	if e.state != EngineRunning {
		e.mu.Unlock()
		return nil
	}
	e.state = EngineStopping
	e.setRuntimeState(RuntimeStateStopping)
	e.cancel()
	e.mu.Unlock()

	// Wait for both pipeline goroutines so nothing writes to the driver
	// while it is flushed and stopped.
	e.wg.Wait()

	flushErr := e.driver.Flush(ctx)
	stopErr := e.driver.Stop(ctx)

	e.mu.Lock()
	e.state = EngineIdle
	e.setRuntimeState(RuntimeStateIdle)
	e.mu.Unlock()

	switch {
	case flushErr != nil && stopErr != nil:
		return fmt.Errorf("%w; driver stop: %v", flushErr, stopErr)
	case flushErr != nil:
		return flushErr
	default:
		return stopErr
	}
}
  // Stats returns a point-in-time snapshot of counters, latency maxima, queue
// health and fault/transition history. UptimeSeconds is non-zero only while
// the lifecycle state is EngineRunning.
func (e *Engine) Stats() EngineStats {
	e.mu.Lock()
	lifecycle, since := e.state, e.startedAt
	e.mu.Unlock()

	var uptime float64
	if lifecycle == EngineRunning {
		uptime = time.Since(since).Seconds()
	}

	lastErr, _ := e.lastError.Load().(string) // an unset Value yields ""
	queue := e.frameQueue.Stats()
	lateRecently := e.hasRecentLateBuffers()

	return EngineStats{
		State:                       string(e.currentRuntimeState()),
		RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
		ChunksProduced:              e.chunksProduced.Load(),
		TotalSamples:                e.totalSamples.Load(),
		Underruns:                   e.underruns.Load(),
		LateBuffers:                 e.lateBuffers.Load(),
		LastError:                   lastErr,
		UptimeSeconds:               uptime,
		MaxCycleMs:                  durationMs(e.maxCycleNs.Load()),
		MaxGenerateMs:               durationMs(e.maxGenerateNs.Load()),
		MaxUpsampleMs:               durationMs(e.maxUpsampleNs.Load()),
		MaxWriteMs:                  durationMs(e.maxWriteNs.Load()),
		MaxQueueResidenceMs:         durationMs(e.maxQueueResidenceNs.Load()),
		MaxPipelineLatencyMs:        durationMs(e.maxPipelineNs.Load()),
		Queue:                       queue,
		RuntimeIndicator:            runtimeIndicator(queue.Health, lateRecently),
		RuntimeAlert:                runtimeAlert(queue.Health, lateRecently),
		LastFault:                   e.lastFaultEvent(),
		DegradedTransitions:         e.degradedTransitions.Load(),
		MutedTransitions:            e.mutedTransitions.Load(),
		FaultedTransitions:          e.faultedTransitions.Load(),
		FaultCount:                  e.faultEvents.Load(),
		FaultHistory:                e.FaultHistory(),
		TransitionHistory:           e.TransitionHistory(),
	}
}
  405. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  406. switch {
  407. case queueHealth == output.QueueHealthCritical:
  408. return RuntimeIndicatorQueueCritical
  409. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  410. return RuntimeIndicatorDegraded
  411. default:
  412. return RuntimeIndicatorNormal
  413. }
  414. }
  415. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  416. switch {
  417. case queueHealth == output.QueueHealthCritical:
  418. return "queue health critical"
  419. case recentLateBuffers:
  420. return "late buffers"
  421. case queueHealth == output.QueueHealthLow:
  422. return "queue health low"
  423. default:
  424. return ""
  425. }
  426. }
  427. func runtimeStateSeverity(state RuntimeState) string {
  428. switch state {
  429. case RuntimeStateRunning:
  430. return "ok"
  431. case RuntimeStateDegraded, RuntimeStateMuted:
  432. return "warn"
  433. case RuntimeStateFaulted:
  434. return "err"
  435. default:
  436. return "info"
  437. }
  438. }
  // run is the producer goroutine launched by Start. It starts writerLoop and
// then, until ctx is cancelled or the frame queue is closed, repeatedly:
//   - applies any frequency change queued by UpdateConfig;
//   - generates one chunk of composite signal;
//   - upsamples it in split-rate mode;
//   - pushes a private copy onto frameQueue and re-evaluates the runtime state.
//
// Generation and upsampling times feed the maxGenerate/maxUpsample
// high-water marks. A failed clone or push counts as an underrun and is
// followed by a one-chunk backoff.
//
// Fix: the clone is now nil-checked before it is dereferenced. Previously
// EnqueuedAt was assigned first, so the "frame clone failed" path panicked
// instead of being reported.
func (e *Engine) run(ctx context.Context) {
	e.setRuntimeState(RuntimeStatePrebuffering)
	e.wg.Add(1)
	go e.writerLoop(ctx)
	defer e.wg.Done()

	// backoff waits one chunk after a failure so a persistent error cannot
	// spin the loop; it reports false if ctx was cancelled meanwhile.
	backoff := func() bool {
		select {
		case <-time.After(e.chunkDuration):
			return true
		case <-ctx.Done():
			return false
		}
	}

	for ctx.Err() == nil {
		// Apply a pending frequency change between chunks.
		if pf := e.pendingFreq.Swap(nil); pf != nil {
			if err := e.driver.Tune(ctx, *pf); err != nil {
				e.lastError.Store(fmt.Sprintf("tune: %v", err))
			} else {
				log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
			}
		}

		t0 := time.Now()
		frame := e.generator.GenerateFrame(e.chunkDuration)
		frame.GeneratedAt = t0
		t1 := time.Now()
		if e.upsampler != nil {
			frame = e.upsampler.Process(frame)
		}
		t2 := time.Now()
		updateMaxDuration(&e.maxGenerateNs, t1.Sub(t0))
		updateMaxDuration(&e.maxUpsampleNs, t2.Sub(t1))

		enqueued := cloneFrame(frame)
		if enqueued == nil {
			e.lastError.Store("engine: frame clone failed")
			e.underruns.Add(1)
			if !backoff() {
				return
			}
			continue
		}
		// Stamp the copy so the generation time survives upsampling, which
		// may hand back a frame without it.
		enqueued.GeneratedAt = t0
		enqueued.EnqueuedAt = time.Now()

		if err := e.frameQueue.Push(ctx, enqueued); err != nil {
			if ctx.Err() != nil || errors.Is(err, output.ErrFrameQueueClosed) {
				return
			}
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			if !backoff() {
				return
			}
			continue
		}
		e.evaluateRuntimeState(e.frameQueue.Stats(), e.hasRecentLateBuffers())
	}
}
  496. func (e *Engine) writerLoop(ctx context.Context) {
  497. defer e.wg.Done()
  498. for {
  499. frame, err := e.frameQueue.Pop(ctx)
  500. if err != nil {
  501. if ctx.Err() != nil {
  502. return
  503. }
  504. if errors.Is(err, output.ErrFrameQueueClosed) {
  505. return
  506. }
  507. e.lastError.Store(err.Error())
  508. e.underruns.Add(1)
  509. continue
  510. }
  511. frame.DequeuedAt = time.Now()
  512. queueResidence := time.Duration(0)
  513. if !frame.EnqueuedAt.IsZero() {
  514. queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt)
  515. }
  516. writeStart := time.Now()
  517. frame.WriteStartedAt = writeStart
  518. n, err := e.driver.Write(ctx, frame)
  519. writeDur := time.Since(writeStart)
  520. pipelineLatency := writeDur
  521. if !frame.GeneratedAt.IsZero() {
  522. pipelineLatency = time.Since(frame.GeneratedAt)
  523. }
  524. updateMaxDuration(&e.maxWriteNs, writeDur)
  525. updateMaxDuration(&e.maxQueueResidenceNs, queueResidence)
  526. updateMaxDuration(&e.maxPipelineNs, pipelineLatency)
  527. updateMaxDuration(&e.maxCycleNs, writeDur)
  528. queueStats := e.frameQueue.Stats()
  529. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  530. if writeDur > e.chunkDuration {
  531. late := e.lateBuffers.Add(1)
  532. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  533. if late <= 5 || late%20 == 0 {
  534. log.Printf("TX LATE: write=%s budget=%s over=%s queueResidence=%s pipeline=%s",
  535. writeDur, e.chunkDuration, writeDur-e.chunkDuration, queueResidence, pipelineLatency)
  536. }
  537. }
  538. if err != nil {
  539. if ctx.Err() != nil {
  540. return
  541. }
  542. e.lastError.Store(err.Error())
  543. e.underruns.Add(1)
  544. select {
  545. case <-time.After(e.chunkDuration):
  546. case <-ctx.Done():
  547. return
  548. }
  549. continue
  550. }
  551. e.chunksProduced.Add(1)
  552. e.totalSamples.Add(uint64(n))
  553. }
  554. }
  555. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  556. if src == nil {
  557. return nil
  558. }
  559. samples := make([]output.IQSample, len(src.Samples))
  560. copy(samples, src.Samples)
  561. return &output.CompositeFrame{
  562. Samples: samples,
  563. SampleRateHz: src.SampleRateHz,
  564. Timestamp: src.Timestamp,
  565. GeneratedAt: src.GeneratedAt,
  566. EnqueuedAt: src.EnqueuedAt,
  567. DequeuedAt: src.DequeuedAt,
  568. WriteStartedAt: src.WriteStartedAt,
  569. Sequence: src.Sequence,
  570. }
  571. }
  // setRuntimeState switches the runtime state to state. On an actual change it:
//   - appends an entry to the transition history;
//   - bumps the degraded/muted/faulted transition counter for the new state;
//   - restarts the state-duration clock.
//
// Re-setting the current state only starts that clock if it was never set.
//
// The previous state now comes from an atomic Swap. run and writerLoop both
// call evaluateRuntimeState concurrently, and with the former load-then-store
// sequence both could observe the same predecessor. The same transition
// could then be recorded and counted twice, or logged with a stale From.
// Swap gives every caller a distinct predecessor.
func (e *Engine) setRuntimeState(state RuntimeState) {
	now := time.Now()

	// An empty Value reads as idle, matching currentRuntimeState.
	prev := RuntimeStateIdle
	if old, ok := e.runtimeState.Swap(state).(RuntimeState); ok {
		prev = old
	}

	if prev == state {
		if e.runtimeStateEnteredAt.Load() == 0 {
			e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
		}
		return
	}

	e.recordRuntimeTransition(prev, state, now)
	switch state {
	case RuntimeStateDegraded:
		e.degradedTransitions.Add(1)
	case RuntimeStateMuted:
		e.mutedTransitions.Add(1)
	case RuntimeStateFaulted:
		e.faultedTransitions.Add(1)
	}
	e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
}
  591. func (e *Engine) currentRuntimeState() RuntimeState {
  592. if v := e.runtimeState.Load(); v != nil {
  593. if rs, ok := v.(RuntimeState); ok {
  594. return rs
  595. }
  596. }
  597. return RuntimeStateIdle
  598. }
  // runtimeStateDurationSeconds reports how long the engine has been in its
// current runtime state, or 0 if the entry time was never recorded.
func (e *Engine) runtimeStateDurationSeconds() float64 {
	entered := e.runtimeStateEnteredAt.Load()
	if entered == 0 {
		return 0
	}
	return time.Since(time.Unix(0, int64(entered))).Seconds()
}
  // hasRecentLateBuffers reports whether a late write was recorded within the
// last lateBufferIndicatorWindow.
func (e *Engine) hasRecentLateBuffers() bool {
	stamp := e.lateBufferAlertAt.Load()
	return stamp != 0 && time.Since(time.Unix(0, int64(stamp))) <= lateBufferIndicatorWindow
}
  612. func (e *Engine) lastFaultEvent() *FaultEvent {
  613. return copyFaultEvent(e.loadLastFault())
  614. }
  615. // LastFault exposes the most recent captured fault, if any.
  616. func (e *Engine) LastFault() *FaultEvent {
  617. return e.lastFaultEvent()
  618. }
  619. func (e *Engine) FaultHistory() []FaultEvent {
  620. e.faultHistoryMu.Lock()
  621. defer e.faultHistoryMu.Unlock()
  622. history := make([]FaultEvent, len(e.faultHistory))
  623. copy(history, e.faultHistory)
  624. return history
  625. }
  626. func (e *Engine) TransitionHistory() []RuntimeTransition {
  627. e.transitionHistoryMu.Lock()
  628. defer e.transitionHistoryMu.Unlock()
  629. history := make([]RuntimeTransition, len(e.transitionHistory))
  630. copy(history, e.transitionHistory)
  631. return history
  632. }
  // recordRuntimeTransition appends a from→to entry, timestamped when (or now
// if when is zero), to the bounded transition history. Once
// runtimeTransitionHistoryCapacity is reached, the oldest entry is dropped.
func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) {
	if when.IsZero() {
		when = time.Now()
	}
	entry := RuntimeTransition{Time: when, From: from, To: to, Severity: runtimeStateSeverity(to)}

	e.transitionHistoryMu.Lock()
	defer e.transitionHistoryMu.Unlock()
	hist := e.transitionHistory
	if n := len(hist); n >= runtimeTransitionHistoryCapacity {
		// Shift left in place and reuse the freed tail slot.
		copy(hist, hist[1:])
		hist = hist[:n-1]
	}
	e.transitionHistory = append(hist, entry)
}
  // recordFault makes a new fault the last fault, appends it to the fault
// history and bumps the fault counter. An empty reason is recorded as
// FaultReasonUnknown. A fault with the same reason and severity as the
// previous one, arriving within faultRepeatWindow of it, is dropped. This
// keeps a condition re-detected on every check from flooding the history.
func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
	if reason == "" {
		reason = FaultReasonUnknown
	}
	now := time.Now()

	prev := e.loadLastFault()
	isRepeat := prev != nil &&
		prev.Reason == reason &&
		prev.Severity == severity &&
		now.Sub(prev.Time) < faultRepeatWindow
	if isRepeat {
		return
	}

	ev := &FaultEvent{Time: now, Reason: reason, Severity: severity, Message: message}
	e.lastFault.Store(ev)
	e.appendFaultHistory(ev)
	e.faultEvents.Add(1)
}
  672. func (e *Engine) loadLastFault() *FaultEvent {
  673. if v := e.lastFault.Load(); v != nil {
  674. if ev, ok := v.(*FaultEvent); ok {
  675. return ev
  676. }
  677. }
  678. return nil
  679. }
  680. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  681. if source == nil {
  682. return nil
  683. }
  684. copy := *source
  685. return &copy
  686. }
  687. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  688. e.faultHistoryMu.Lock()
  689. defer e.faultHistoryMu.Unlock()
  690. if len(e.faultHistory) >= faultHistoryCapacity {
  691. copy(e.faultHistory, e.faultHistory[1:])
  692. e.faultHistory[len(e.faultHistory)-1] = *ev
  693. return
  694. }
  695. e.faultHistory = append(e.faultHistory, *ev)
  696. }
  // evaluateRuntimeState advances the runtime state machine from the latest
// queue stats and the recent-late-buffer flag. Both run (after each push) and
// writerLoop (after each write) call it, so the streak counters count checks
// from both goroutines.
//
//   - Stopping and Faulted are left untouched (Faulted is cleared by
//     ResetFault).
//   - Muted: queueFaultedStreakThreshold consecutive critical checks escalate
//     to Faulted. queueMutedRecoveryThreshold consecutive checks with a normal
//     queue and no recent late buffers recover to Degraded.
//   - Prebuffering becomes Running once the queue holds at least one frame.
//   - Any other state, by consecutive critical checks: Degraded at
//     queueCriticalStreakThreshold, Muted at queueMutedStreakThreshold.
//     Without a critical queue, recent late buffers mean Degraded and
//     anything else means Running.
//
// Fix: recovering from Muted now also resets criticalStreak, as ResetFault
// does. Previously it kept its pre-mute value (at least
// queueMutedStreakThreshold), so a single critical check after recovery
// re-muted the engine immediately.
func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
	switch e.currentRuntimeState() {
	case RuntimeStateStopping, RuntimeStateFaulted:
		return

	case RuntimeStateMuted:
		if queue.Health == output.QueueHealthCritical {
			if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
				e.mutedFaultStreak.Store(0)
				e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
					fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
				e.setRuntimeState(RuntimeStateFaulted)
				return
			}
		} else {
			e.mutedFaultStreak.Store(0)
		}
		if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
			if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
				e.mutedRecoveryStreak.Store(0)
				e.mutedFaultStreak.Store(0)
				// Restart the pre-mute escalation from zero.
				e.criticalStreak.Store(0)
				e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
					fmt.Sprintf("queue healthy for %d checks after mute", count))
				e.setRuntimeState(RuntimeStateDegraded)
			}
		} else {
			e.mutedRecoveryStreak.Store(0)
		}
		return

	case RuntimeStatePrebuffering:
		if queue.Depth >= 1 {
			e.setRuntimeState(RuntimeStateRunning)
		}
		return
	}

	if queue.Health == output.QueueHealthCritical {
		count := e.criticalStreak.Add(1)
		switch {
		case count >= queueMutedStreakThreshold:
			e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
				fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
			e.setRuntimeState(RuntimeStateMuted)
			return
		case count >= queueCriticalStreakThreshold:
			e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
				fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
			e.setRuntimeState(RuntimeStateDegraded)
			return
		}
	} else {
		e.criticalStreak.Store(0)
	}

	if hasLateBuffers {
		e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
			fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
		e.setRuntimeState(RuntimeStateDegraded)
		return
	}
	e.setRuntimeState(RuntimeStateRunning)
}
  // ResetFault clears a Faulted runtime state. It zeroes all queue streak
// counters and moves the engine to Degraded, from where normal evaluation can
// return it to Running. It returns an error if the engine is not faulted.
func (e *Engine) ResetFault() error {
	if current := e.currentRuntimeState(); current != RuntimeStateFaulted {
		return fmt.Errorf("engine not in faulted state (current=%s)", current)
	}

	// Fresh streaks so escalation after the reset starts from zero.
	e.criticalStreak.Store(0)
	e.mutedRecoveryStreak.Store(0)
	e.mutedFaultStreak.Store(0)

	e.setRuntimeState(RuntimeStateDegraded)
	return nil
}