Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

686 linhas
18KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  17. type EngineState int
  18. const (
  19. EngineIdle EngineState = iota
  20. EngineRunning
  21. EngineStopping
  22. )
  23. func (s EngineState) String() string {
  24. switch s {
  25. case EngineIdle:
  26. return "idle"
  27. case EngineRunning:
  28. return "running"
  29. case EngineStopping:
  30. return "stopping"
  31. default:
  32. return "unknown"
  33. }
  34. }
  35. type RuntimeState string
  36. const (
  37. RuntimeStateIdle RuntimeState = "idle"
  38. RuntimeStateArming RuntimeState = "arming"
  39. RuntimeStatePrebuffering RuntimeState = "prebuffering"
  40. RuntimeStateRunning RuntimeState = "running"
  41. RuntimeStateDegraded RuntimeState = "degraded"
  42. RuntimeStateMuted RuntimeState = "muted"
  43. RuntimeStateFaulted RuntimeState = "faulted"
  44. RuntimeStateStopping RuntimeState = "stopping"
  45. )
  46. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  47. v := uint64(d)
  48. for {
  49. cur := dst.Load()
  50. if v <= cur {
  51. return
  52. }
  53. if dst.CompareAndSwap(cur, v) {
  54. return
  55. }
  56. }
  57. }
  58. func durationMs(ns uint64) float64 {
  59. return float64(ns) / float64(time.Millisecond)
  60. }
  61. type EngineStats struct {
  62. State string `json:"state"`
  63. ChunksProduced uint64 `json:"chunksProduced"`
  64. TotalSamples uint64 `json:"totalSamples"`
  65. Underruns uint64 `json:"underruns"`
  66. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  67. LastError string `json:"lastError,omitempty"`
  68. UptimeSeconds float64 `json:"uptimeSeconds"`
  69. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  70. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  71. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  72. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  73. Queue output.QueueStats `json:"queue"`
  74. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  75. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  76. LastFault *FaultEvent `json:"lastFault,omitempty"`
  77. }
  78. type RuntimeIndicator string
  79. const (
  80. RuntimeIndicatorNormal RuntimeIndicator = "normal"
  81. RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
  82. RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
  83. )
  84. const (
  85. lateBufferIndicatorWindow = 5 * time.Second
  86. queueCriticalStreakThreshold = 3
  87. faultRepeatWindow = 1 * time.Second
  88. faultHistoryCapacity = 8
  89. )
  90. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  91. // resamples to device rate, and pushes to hardware in a tight loop.
  92. // The hardware buffer_push call is blocking — it returns when the hardware
  93. // has consumed the previous buffer and is ready for the next one.
  94. // This naturally paces the loop to real-time without a ticker.
  95. type Engine struct {
  96. cfg cfgpkg.Config
  97. driver platform.SoapyDriver
  98. generator *offpkg.Generator
  99. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  100. chunkDuration time.Duration
  101. deviceRate float64
  102. frameQueue *output.FrameQueue
  103. mu sync.Mutex
  104. state EngineState
  105. cancel context.CancelFunc
  106. startedAt time.Time
  107. wg sync.WaitGroup
  108. runtimeState atomic.Value
  109. chunksProduced atomic.Uint64
  110. totalSamples atomic.Uint64
  111. underruns atomic.Uint64
  112. lateBuffers atomic.Uint64
  113. lateBufferAlertAt atomic.Uint64
  114. criticalStreak atomic.Uint64
  115. maxCycleNs atomic.Uint64
  116. maxGenerateNs atomic.Uint64
  117. maxUpsampleNs atomic.Uint64
  118. maxWriteNs atomic.Uint64
  119. lastError atomic.Value // string
  120. lastFault atomic.Value // *FaultEvent
  121. faultHistoryMu sync.Mutex
  122. faultHistory []FaultEvent
  123. // Live config: pending frequency change, applied between chunks
  124. pendingFreq atomic.Pointer[float64]
  125. // Live audio stream (optional)
  126. streamSrc *audio.StreamSource
  127. }
  128. // SetStreamSource configures a live audio stream as the audio source.
  129. // Must be called before Start(). The StreamResampler is created internally
  130. // to convert from the stream's sample rate to the DSP composite rate.
  131. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  132. e.streamSrc = src
  133. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  134. if compositeRate <= 0 {
  135. compositeRate = 228000
  136. }
  137. resampler := audio.NewStreamResampler(src, compositeRate)
  138. e.generator.SetExternalSource(resampler)
  139. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  140. src.SampleRate, compositeRate, src.Stats().Capacity)
  141. }
  142. // StreamSource returns the live audio stream source, or nil.
  143. // Used by the control server for stats and HTTP audio ingest.
  144. func (e *Engine) StreamSource() *audio.StreamSource {
  145. return e.streamSrc
  146. }
  147. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  148. deviceRate := cfg.EffectiveDeviceRate()
  149. compositeRate := float64(cfg.FM.CompositeRateHz)
  150. if compositeRate <= 0 {
  151. compositeRate = 228000
  152. }
  153. var upsampler *dsp.FMUpsampler
  154. if deviceRate > compositeRate*1.001 {
  155. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  156. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  157. // This halves CPU load compared to running all DSP at deviceRate.
  158. cfg.FM.FMModulationEnabled = false
  159. maxDev := cfg.FM.MaxDeviationHz
  160. if maxDev <= 0 {
  161. maxDev = 75000
  162. }
  163. // mpxGain scales the FM deviation to compensate for hardware
  164. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  165. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  166. maxDev *= cfg.FM.MpxGain
  167. }
  168. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  169. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  170. compositeRate, deviceRate, deviceRate/compositeRate)
  171. } else {
  172. // Same-rate: entire DSP chain runs at deviceRate.
  173. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  174. if deviceRate > 0 {
  175. cfg.FM.CompositeRateHz = int(deviceRate)
  176. }
  177. cfg.FM.FMModulationEnabled = true
  178. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  179. }
  180. engine := &Engine{
  181. cfg: cfg,
  182. driver: driver,
  183. generator: offpkg.NewGenerator(cfg),
  184. upsampler: upsampler,
  185. chunkDuration: 50 * time.Millisecond,
  186. deviceRate: deviceRate,
  187. state: EngineIdle,
  188. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  189. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  190. }
  191. engine.setRuntimeState(RuntimeStateIdle)
  192. return engine
  193. }
  194. func (e *Engine) SetChunkDuration(d time.Duration) {
  195. e.chunkDuration = d
  196. }
  197. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  198. // nil pointers mean "no change". Validated before applying.
  199. type LiveConfigUpdate struct {
  200. FrequencyMHz *float64
  201. OutputDrive *float64
  202. StereoEnabled *bool
  203. PilotLevel *float64
  204. RDSInjection *float64
  205. RDSEnabled *bool
  206. LimiterEnabled *bool
  207. LimiterCeiling *float64
  208. PS *string
  209. RadioText *string
  210. }
  211. // UpdateConfig applies live parameter changes without restarting the engine.
  212. // DSP params take effect at the next chunk boundary (~50ms max).
  213. // Frequency changes are applied between chunks via driver.Tune().
  214. // RDS text updates are applied at the next RDS group boundary (~88ms).
  215. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  216. // --- Validate ---
  217. if u.FrequencyMHz != nil {
  218. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  219. return fmt.Errorf("frequencyMHz out of range (65-110)")
  220. }
  221. }
  222. if u.OutputDrive != nil {
  223. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  224. return fmt.Errorf("outputDrive out of range (0-10)")
  225. }
  226. }
  227. if u.PilotLevel != nil {
  228. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  229. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  230. }
  231. }
  232. if u.RDSInjection != nil {
  233. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  234. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  235. }
  236. }
  237. if u.LimiterCeiling != nil {
  238. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  239. return fmt.Errorf("limiterCeiling out of range (0-2)")
  240. }
  241. }
  242. // --- Frequency: store for run loop to apply via driver.Tune() ---
  243. if u.FrequencyMHz != nil {
  244. freqHz := *u.FrequencyMHz * 1e6
  245. e.pendingFreq.Store(&freqHz)
  246. }
  247. // --- RDS text: forward to encoder atomics ---
  248. if u.PS != nil || u.RadioText != nil {
  249. if enc := e.generator.RDSEncoder(); enc != nil {
  250. ps, rt := "", ""
  251. if u.PS != nil {
  252. ps = *u.PS
  253. }
  254. if u.RadioText != nil {
  255. rt = *u.RadioText
  256. }
  257. enc.UpdateText(ps, rt)
  258. }
  259. }
  260. // --- DSP params: build new LiveParams from current + patch ---
  261. // Read current, apply deltas, store new
  262. current := e.generator.CurrentLiveParams()
  263. next := current // copy
  264. if u.OutputDrive != nil {
  265. next.OutputDrive = *u.OutputDrive
  266. }
  267. if u.StereoEnabled != nil {
  268. next.StereoEnabled = *u.StereoEnabled
  269. }
  270. if u.PilotLevel != nil {
  271. next.PilotLevel = *u.PilotLevel
  272. }
  273. if u.RDSInjection != nil {
  274. next.RDSInjection = *u.RDSInjection
  275. }
  276. if u.RDSEnabled != nil {
  277. next.RDSEnabled = *u.RDSEnabled
  278. }
  279. if u.LimiterEnabled != nil {
  280. next.LimiterEnabled = *u.LimiterEnabled
  281. }
  282. if u.LimiterCeiling != nil {
  283. next.LimiterCeiling = *u.LimiterCeiling
  284. }
  285. e.generator.UpdateLive(next)
  286. return nil
  287. }
  288. func (e *Engine) Start(ctx context.Context) error {
  289. e.mu.Lock()
  290. if e.state != EngineIdle {
  291. e.mu.Unlock()
  292. return fmt.Errorf("engine already in state %s", e.state)
  293. }
  294. if err := e.driver.Start(ctx); err != nil {
  295. e.mu.Unlock()
  296. return fmt.Errorf("driver start: %w", err)
  297. }
  298. runCtx, cancel := context.WithCancel(ctx)
  299. e.cancel = cancel
  300. e.state = EngineRunning
  301. e.setRuntimeState(RuntimeStateArming)
  302. e.startedAt = time.Now()
  303. e.wg.Add(1)
  304. e.mu.Unlock()
  305. go e.run(runCtx)
  306. return nil
  307. }
  308. func (e *Engine) Stop(ctx context.Context) error {
  309. e.mu.Lock()
  310. if e.state != EngineRunning {
  311. e.mu.Unlock()
  312. return nil
  313. }
  314. e.state = EngineStopping
  315. e.setRuntimeState(RuntimeStateStopping)
  316. e.cancel()
  317. e.mu.Unlock()
  318. // Wait for run() goroutine to exit — deterministic, no guessing
  319. e.wg.Wait()
  320. if err := e.driver.Flush(ctx); err != nil {
  321. return err
  322. }
  323. if err := e.driver.Stop(ctx); err != nil {
  324. return err
  325. }
  326. e.mu.Lock()
  327. e.state = EngineIdle
  328. e.setRuntimeState(RuntimeStateIdle)
  329. e.mu.Unlock()
  330. return nil
  331. }
  332. func (e *Engine) Stats() EngineStats {
  333. e.mu.Lock()
  334. state := e.state
  335. startedAt := e.startedAt
  336. e.mu.Unlock()
  337. var uptime float64
  338. if state == EngineRunning {
  339. uptime = time.Since(startedAt).Seconds()
  340. }
  341. errVal, _ := e.lastError.Load().(string)
  342. queue := e.frameQueue.Stats()
  343. lateBuffers := e.lateBuffers.Load()
  344. hasRecentLateBuffers := e.hasRecentLateBuffers()
  345. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  346. lastFault := e.lastFaultEvent()
  347. return EngineStats{
  348. State: string(e.currentRuntimeState()),
  349. ChunksProduced: e.chunksProduced.Load(),
  350. TotalSamples: e.totalSamples.Load(),
  351. Underruns: e.underruns.Load(),
  352. LateBuffers: lateBuffers,
  353. LastError: errVal,
  354. UptimeSeconds: uptime,
  355. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  356. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  357. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  358. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  359. Queue: queue,
  360. RuntimeIndicator: ri,
  361. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  362. LastFault: lastFault,
  363. }
  364. }
  365. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  366. switch {
  367. case queueHealth == output.QueueHealthCritical:
  368. return RuntimeIndicatorQueueCritical
  369. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  370. return RuntimeIndicatorDegraded
  371. default:
  372. return RuntimeIndicatorNormal
  373. }
  374. }
  375. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  376. switch {
  377. case queueHealth == output.QueueHealthCritical:
  378. return "queue health critical"
  379. case recentLateBuffers:
  380. return "late buffers"
  381. case queueHealth == output.QueueHealthLow:
  382. return "queue health low"
  383. default:
  384. return ""
  385. }
  386. }
  387. func (e *Engine) run(ctx context.Context) {
  388. e.setRuntimeState(RuntimeStatePrebuffering)
  389. e.wg.Add(1)
  390. go e.writerLoop(ctx)
  391. defer e.wg.Done()
  392. for {
  393. if ctx.Err() != nil {
  394. return
  395. }
  396. // Apply pending frequency change between chunks
  397. if pf := e.pendingFreq.Swap(nil); pf != nil {
  398. if err := e.driver.Tune(ctx, *pf); err != nil {
  399. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  400. } else {
  401. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  402. }
  403. }
  404. t0 := time.Now()
  405. frame := e.generator.GenerateFrame(e.chunkDuration)
  406. frame.GeneratedAt = t0
  407. t1 := time.Now()
  408. if e.upsampler != nil {
  409. frame = e.upsampler.Process(frame)
  410. frame.GeneratedAt = t0
  411. }
  412. t2 := time.Now()
  413. genDur := t1.Sub(t0)
  414. upDur := t2.Sub(t1)
  415. updateMaxDuration(&e.maxGenerateNs, genDur)
  416. updateMaxDuration(&e.maxUpsampleNs, upDur)
  417. enqueued := cloneFrame(frame)
  418. if enqueued == nil {
  419. e.lastError.Store("engine: frame clone failed")
  420. e.underruns.Add(1)
  421. continue
  422. }
  423. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  424. if ctx.Err() != nil {
  425. return
  426. }
  427. if errors.Is(err, output.ErrFrameQueueClosed) {
  428. return
  429. }
  430. e.lastError.Store(err.Error())
  431. e.underruns.Add(1)
  432. select {
  433. case <-time.After(e.chunkDuration):
  434. case <-ctx.Done():
  435. return
  436. }
  437. continue
  438. }
  439. queueStats := e.frameQueue.Stats()
  440. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  441. }
  442. }
  443. func (e *Engine) writerLoop(ctx context.Context) {
  444. defer e.wg.Done()
  445. for {
  446. frame, err := e.frameQueue.Pop(ctx)
  447. if err != nil {
  448. if ctx.Err() != nil {
  449. return
  450. }
  451. if errors.Is(err, output.ErrFrameQueueClosed) {
  452. return
  453. }
  454. e.lastError.Store(err.Error())
  455. e.underruns.Add(1)
  456. continue
  457. }
  458. writeStart := time.Now()
  459. n, err := e.driver.Write(ctx, frame)
  460. writeDur := time.Since(writeStart)
  461. cycleDur := writeDur
  462. if !frame.GeneratedAt.IsZero() {
  463. cycleDur = time.Since(frame.GeneratedAt)
  464. }
  465. updateMaxDuration(&e.maxWriteNs, writeDur)
  466. updateMaxDuration(&e.maxCycleNs, cycleDur)
  467. queueStats := e.frameQueue.Stats()
  468. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  469. if cycleDur > e.chunkDuration {
  470. late := e.lateBuffers.Add(1)
  471. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  472. if late <= 5 || late%20 == 0 {
  473. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  474. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  475. }
  476. }
  477. if err != nil {
  478. if ctx.Err() != nil {
  479. return
  480. }
  481. e.lastError.Store(err.Error())
  482. e.underruns.Add(1)
  483. select {
  484. case <-time.After(e.chunkDuration):
  485. case <-ctx.Done():
  486. return
  487. }
  488. continue
  489. }
  490. e.chunksProduced.Add(1)
  491. e.totalSamples.Add(uint64(n))
  492. }
  493. }
  494. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  495. if src == nil {
  496. return nil
  497. }
  498. samples := make([]output.IQSample, len(src.Samples))
  499. copy(samples, src.Samples)
  500. return &output.CompositeFrame{
  501. Samples: samples,
  502. SampleRateHz: src.SampleRateHz,
  503. Timestamp: src.Timestamp,
  504. GeneratedAt: src.GeneratedAt,
  505. Sequence: src.Sequence,
  506. }
  507. }
  508. func (e *Engine) setRuntimeState(state RuntimeState) {
  509. e.runtimeState.Store(state)
  510. }
  511. func (e *Engine) currentRuntimeState() RuntimeState {
  512. if v := e.runtimeState.Load(); v != nil {
  513. if rs, ok := v.(RuntimeState); ok {
  514. return rs
  515. }
  516. }
  517. return RuntimeStateIdle
  518. }
  519. func (e *Engine) hasRecentLateBuffers() bool {
  520. lateAlertAt := e.lateBufferAlertAt.Load()
  521. if lateAlertAt == 0 {
  522. return false
  523. }
  524. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  525. }
  526. func (e *Engine) lastFaultEvent() *FaultEvent {
  527. return copyFaultEvent(e.loadLastFault())
  528. }
  529. // LastFault exposes the most recent captured fault, if any.
  530. func (e *Engine) LastFault() *FaultEvent {
  531. return e.lastFaultEvent()
  532. }
  533. func (e *Engine) FaultHistory() []FaultEvent {
  534. e.faultHistoryMu.Lock()
  535. defer e.faultHistoryMu.Unlock()
  536. history := make([]FaultEvent, len(e.faultHistory))
  537. copy(history, e.faultHistory)
  538. return history
  539. }
  540. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  541. if reason == "" {
  542. reason = FaultReasonUnknown
  543. }
  544. now := time.Now()
  545. if last := e.loadLastFault(); last != nil {
  546. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  547. return
  548. }
  549. }
  550. ev := &FaultEvent{
  551. Time: now,
  552. Reason: reason,
  553. Severity: severity,
  554. Message: message,
  555. }
  556. e.lastFault.Store(ev)
  557. e.appendFaultHistory(ev)
  558. }
  559. func (e *Engine) loadLastFault() *FaultEvent {
  560. if v := e.lastFault.Load(); v != nil {
  561. if ev, ok := v.(*FaultEvent); ok {
  562. return ev
  563. }
  564. }
  565. return nil
  566. }
  567. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  568. if source == nil {
  569. return nil
  570. }
  571. copy := *source
  572. return &copy
  573. }
  574. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  575. e.faultHistoryMu.Lock()
  576. defer e.faultHistoryMu.Unlock()
  577. if len(e.faultHistory) >= faultHistoryCapacity {
  578. copy(e.faultHistory, e.faultHistory[1:])
  579. e.faultHistory[len(e.faultHistory)-1] = *ev
  580. return
  581. }
  582. e.faultHistory = append(e.faultHistory, *ev)
  583. }
  584. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  585. state := e.currentRuntimeState()
  586. switch state {
  587. case RuntimeStateStopping, RuntimeStateFaulted:
  588. return
  589. }
  590. if state == RuntimeStatePrebuffering {
  591. if queue.Depth >= 1 {
  592. e.setRuntimeState(RuntimeStateRunning)
  593. }
  594. return
  595. }
  596. critical := queue.Health == output.QueueHealthCritical
  597. if critical {
  598. if e.criticalStreak.Add(1) >= queueCriticalStreakThreshold {
  599. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  600. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  601. e.setRuntimeState(RuntimeStateDegraded)
  602. return
  603. }
  604. } else {
  605. e.criticalStreak.Store(0)
  606. }
  607. if hasLateBuffers {
  608. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  609. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  610. e.setRuntimeState(RuntimeStateDegraded)
  611. return
  612. }
  613. e.setRuntimeState(RuntimeStateRunning)
  614. }