Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

694 Zeilen
18KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  // EngineState is the coarse lifecycle state of the engine, guarded by the
  // engine mutex.
  type EngineState int

  // Lifecycle states, numbered from zero in declaration order.
  const (
  	EngineIdle EngineState = iota
  	EngineRunning
  	EngineStopping
  )

  // String returns the lowercase name of the state, or "unknown" for any value
  // outside the declared constants.
  func (s EngineState) String() string {
  	labels := [...]string{
  		EngineIdle:     "idle",
  		EngineRunning:  "running",
  		EngineStopping: "stopping",
  	}
  	if s < 0 || int(s) >= len(labels) {
  		return "unknown"
  	}
  	return labels[s]
  }
  // RuntimeState is the fine-grained operational state of the engine. Its string
  // value is what Stats reports in EngineStats.State.
  type RuntimeState string

  // Runtime states the engine can report.
  const (
  	// Not started, or fully stopped.
  	RuntimeStateIdle RuntimeState = "idle"
  	// Set by Start before the run goroutine is launched.
  	RuntimeStateArming RuntimeState = "arming"
  	// Set when the run loop begins, until the frame queue holds a frame.
  	RuntimeStatePrebuffering RuntimeState = "prebuffering"
  	// Normal operation.
  	RuntimeStateRunning RuntimeState = "running"
  	// Late buffers or a sustained critical queue were observed.
  	RuntimeStateDegraded RuntimeState = "degraded"
  	// The queue stayed critical for the longer (muted) streak threshold.
  	RuntimeStateMuted RuntimeState = "muted"
  	// Terminal fault; state evaluation leaves it untouched.
  	RuntimeStateFaulted RuntimeState = "faulted"
  	// Set by Stop while the goroutines wind down.
  	RuntimeStateStopping RuntimeState = "stopping"
  )
  // updateMaxDuration raises the value stored in dst to d (in nanoseconds) if d
  // is larger than what is currently stored. It is safe for concurrent use: the
  // update is a compare-and-swap loop that retries when another writer wins.
  //
  // Non-positive durations are ignored. A negative time.Duration converted to
  // uint64 wraps to a huge value and would otherwise pin the maximum forever.
  func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  	if d <= 0 {
  		return
  	}
  	v := uint64(d)
  	for cur := dst.Load(); v > cur; cur = dst.Load() {
  		if dst.CompareAndSwap(cur, v) {
  			return
  		}
  	}
  }
  // durationMs converts a nanosecond count into fractional milliseconds.
  func durationMs(ns uint64) float64 {
  	const nanosPerMilli = float64(time.Millisecond)
  	return float64(ns) / nanosPerMilli
  }
  61. type EngineStats struct {
  62. State string `json:"state"`
  63. ChunksProduced uint64 `json:"chunksProduced"`
  64. TotalSamples uint64 `json:"totalSamples"`
  65. Underruns uint64 `json:"underruns"`
  66. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  67. LastError string `json:"lastError,omitempty"`
  68. UptimeSeconds float64 `json:"uptimeSeconds"`
  69. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  70. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  71. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  72. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  73. Queue output.QueueStats `json:"queue"`
  74. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  75. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  76. LastFault *FaultEvent `json:"lastFault,omitempty"`
  77. }
  // RuntimeIndicator is the health summary reported in EngineStats.
  type RuntimeIndicator string

  // Indicator values, from healthy to most severe.
  const (
  	// Queue healthy and no recent late buffers.
  	RuntimeIndicatorNormal RuntimeIndicator = "normal"
  	// Queue health low, or late buffers seen recently.
  	RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
  	// Queue health critical.
  	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
  )
  // Tuning constants for health reporting and fault bookkeeping.
  const (
  	// How long after a late buffer it still counts as "recent".
  	lateBufferIndicatorWindow = 5 * time.Second
  	// Consecutive critical-queue checks before the state becomes degraded.
  	queueCriticalStreakThreshold = 3
  	// Consecutive critical-queue checks before the state becomes muted.
  	queueMutedStreakThreshold = 2 * queueCriticalStreakThreshold
  	// Identical faults (same reason and severity) closer together than this
  	// are recorded only once.
  	faultRepeatWindow = time.Second
  	// Maximum number of entries kept in the fault history.
  	faultHistoryCapacity = 8
  )
  91. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  92. // resamples to device rate, and pushes to hardware in a tight loop.
  93. // The hardware buffer_push call is blocking — it returns when the hardware
  94. // has consumed the previous buffer and is ready for the next one.
  95. // This naturally paces the loop to real-time without a ticker.
  96. type Engine struct {
  97. cfg cfgpkg.Config
  98. driver platform.SoapyDriver
  99. generator *offpkg.Generator
  100. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  101. chunkDuration time.Duration
  102. deviceRate float64
  103. frameQueue *output.FrameQueue
  104. mu sync.Mutex
  105. state EngineState
  106. cancel context.CancelFunc
  107. startedAt time.Time
  108. wg sync.WaitGroup
  109. runtimeState atomic.Value
  110. chunksProduced atomic.Uint64
  111. totalSamples atomic.Uint64
  112. underruns atomic.Uint64
  113. lateBuffers atomic.Uint64
  114. lateBufferAlertAt atomic.Uint64
  115. criticalStreak atomic.Uint64
  116. maxCycleNs atomic.Uint64
  117. maxGenerateNs atomic.Uint64
  118. maxUpsampleNs atomic.Uint64
  119. maxWriteNs atomic.Uint64
  120. lastError atomic.Value // string
  121. lastFault atomic.Value // *FaultEvent
  122. faultHistoryMu sync.Mutex
  123. faultHistory []FaultEvent
  124. // Live config: pending frequency change, applied between chunks
  125. pendingFreq atomic.Pointer[float64]
  126. // Live audio stream (optional)
  127. streamSrc *audio.StreamSource
  128. }
  129. // SetStreamSource configures a live audio stream as the audio source.
  130. // Must be called before Start(). The StreamResampler is created internally
  131. // to convert from the stream's sample rate to the DSP composite rate.
  132. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  133. e.streamSrc = src
  134. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  135. if compositeRate <= 0 {
  136. compositeRate = 228000
  137. }
  138. resampler := audio.NewStreamResampler(src, compositeRate)
  139. e.generator.SetExternalSource(resampler)
  140. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  141. src.SampleRate, compositeRate, src.Stats().Capacity)
  142. }
  143. // StreamSource returns the live audio stream source, or nil.
  144. // Used by the control server for stats and HTTP audio ingest.
  145. func (e *Engine) StreamSource() *audio.StreamSource {
  146. return e.streamSrc
  147. }
  148. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  149. deviceRate := cfg.EffectiveDeviceRate()
  150. compositeRate := float64(cfg.FM.CompositeRateHz)
  151. if compositeRate <= 0 {
  152. compositeRate = 228000
  153. }
  154. var upsampler *dsp.FMUpsampler
  155. if deviceRate > compositeRate*1.001 {
  156. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  157. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  158. // This halves CPU load compared to running all DSP at deviceRate.
  159. cfg.FM.FMModulationEnabled = false
  160. maxDev := cfg.FM.MaxDeviationHz
  161. if maxDev <= 0 {
  162. maxDev = 75000
  163. }
  164. // mpxGain scales the FM deviation to compensate for hardware
  165. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  166. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  167. maxDev *= cfg.FM.MpxGain
  168. }
  169. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  170. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  171. compositeRate, deviceRate, deviceRate/compositeRate)
  172. } else {
  173. // Same-rate: entire DSP chain runs at deviceRate.
  174. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  175. if deviceRate > 0 {
  176. cfg.FM.CompositeRateHz = int(deviceRate)
  177. }
  178. cfg.FM.FMModulationEnabled = true
  179. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  180. }
  181. engine := &Engine{
  182. cfg: cfg,
  183. driver: driver,
  184. generator: offpkg.NewGenerator(cfg),
  185. upsampler: upsampler,
  186. chunkDuration: 50 * time.Millisecond,
  187. deviceRate: deviceRate,
  188. state: EngineIdle,
  189. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  190. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  191. }
  192. engine.setRuntimeState(RuntimeStateIdle)
  193. return engine
  194. }
  195. func (e *Engine) SetChunkDuration(d time.Duration) {
  196. e.chunkDuration = d
  197. }
  // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  // Every field is optional: a nil pointer means "leave unchanged". Values are
  // validated by Engine.UpdateConfig before anything is applied.
  type LiveConfigUpdate struct {
  	// Tuning; accepted range 65-110 MHz.
  	FrequencyMHz *float64

  	// DSP parameters.
  	OutputDrive    *float64 // accepted range 0-10
  	StereoEnabled  *bool
  	PilotLevel     *float64 // accepted range 0-0.2
  	RDSInjection   *float64 // accepted range 0-0.15
  	RDSEnabled     *bool
  	LimiterEnabled *bool
  	LimiterCeiling *float64 // accepted range 0-2

  	// RDS text.
  	PS        *string
  	RadioText *string
  }
  212. // UpdateConfig applies live parameter changes without restarting the engine.
  213. // DSP params take effect at the next chunk boundary (~50ms max).
  214. // Frequency changes are applied between chunks via driver.Tune().
  215. // RDS text updates are applied at the next RDS group boundary (~88ms).
  216. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  217. // --- Validate ---
  218. if u.FrequencyMHz != nil {
  219. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  220. return fmt.Errorf("frequencyMHz out of range (65-110)")
  221. }
  222. }
  223. if u.OutputDrive != nil {
  224. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  225. return fmt.Errorf("outputDrive out of range (0-10)")
  226. }
  227. }
  228. if u.PilotLevel != nil {
  229. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  230. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  231. }
  232. }
  233. if u.RDSInjection != nil {
  234. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  235. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  236. }
  237. }
  238. if u.LimiterCeiling != nil {
  239. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  240. return fmt.Errorf("limiterCeiling out of range (0-2)")
  241. }
  242. }
  243. // --- Frequency: store for run loop to apply via driver.Tune() ---
  244. if u.FrequencyMHz != nil {
  245. freqHz := *u.FrequencyMHz * 1e6
  246. e.pendingFreq.Store(&freqHz)
  247. }
  248. // --- RDS text: forward to encoder atomics ---
  249. if u.PS != nil || u.RadioText != nil {
  250. if enc := e.generator.RDSEncoder(); enc != nil {
  251. ps, rt := "", ""
  252. if u.PS != nil {
  253. ps = *u.PS
  254. }
  255. if u.RadioText != nil {
  256. rt = *u.RadioText
  257. }
  258. enc.UpdateText(ps, rt)
  259. }
  260. }
  261. // --- DSP params: build new LiveParams from current + patch ---
  262. // Read current, apply deltas, store new
  263. current := e.generator.CurrentLiveParams()
  264. next := current // copy
  265. if u.OutputDrive != nil {
  266. next.OutputDrive = *u.OutputDrive
  267. }
  268. if u.StereoEnabled != nil {
  269. next.StereoEnabled = *u.StereoEnabled
  270. }
  271. if u.PilotLevel != nil {
  272. next.PilotLevel = *u.PilotLevel
  273. }
  274. if u.RDSInjection != nil {
  275. next.RDSInjection = *u.RDSInjection
  276. }
  277. if u.RDSEnabled != nil {
  278. next.RDSEnabled = *u.RDSEnabled
  279. }
  280. if u.LimiterEnabled != nil {
  281. next.LimiterEnabled = *u.LimiterEnabled
  282. }
  283. if u.LimiterCeiling != nil {
  284. next.LimiterCeiling = *u.LimiterCeiling
  285. }
  286. e.generator.UpdateLive(next)
  287. return nil
  288. }
  289. func (e *Engine) Start(ctx context.Context) error {
  290. e.mu.Lock()
  291. if e.state != EngineIdle {
  292. e.mu.Unlock()
  293. return fmt.Errorf("engine already in state %s", e.state)
  294. }
  295. if err := e.driver.Start(ctx); err != nil {
  296. e.mu.Unlock()
  297. return fmt.Errorf("driver start: %w", err)
  298. }
  299. runCtx, cancel := context.WithCancel(ctx)
  300. e.cancel = cancel
  301. e.state = EngineRunning
  302. e.setRuntimeState(RuntimeStateArming)
  303. e.startedAt = time.Now()
  304. e.wg.Add(1)
  305. e.mu.Unlock()
  306. go e.run(runCtx)
  307. return nil
  308. }
  309. func (e *Engine) Stop(ctx context.Context) error {
  310. e.mu.Lock()
  311. if e.state != EngineRunning {
  312. e.mu.Unlock()
  313. return nil
  314. }
  315. e.state = EngineStopping
  316. e.setRuntimeState(RuntimeStateStopping)
  317. e.cancel()
  318. e.mu.Unlock()
  319. // Wait for run() goroutine to exit — deterministic, no guessing
  320. e.wg.Wait()
  321. if err := e.driver.Flush(ctx); err != nil {
  322. return err
  323. }
  324. if err := e.driver.Stop(ctx); err != nil {
  325. return err
  326. }
  327. e.mu.Lock()
  328. e.state = EngineIdle
  329. e.setRuntimeState(RuntimeStateIdle)
  330. e.mu.Unlock()
  331. return nil
  332. }
  333. func (e *Engine) Stats() EngineStats {
  334. e.mu.Lock()
  335. state := e.state
  336. startedAt := e.startedAt
  337. e.mu.Unlock()
  338. var uptime float64
  339. if state == EngineRunning {
  340. uptime = time.Since(startedAt).Seconds()
  341. }
  342. errVal, _ := e.lastError.Load().(string)
  343. queue := e.frameQueue.Stats()
  344. lateBuffers := e.lateBuffers.Load()
  345. hasRecentLateBuffers := e.hasRecentLateBuffers()
  346. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  347. lastFault := e.lastFaultEvent()
  348. return EngineStats{
  349. State: string(e.currentRuntimeState()),
  350. ChunksProduced: e.chunksProduced.Load(),
  351. TotalSamples: e.totalSamples.Load(),
  352. Underruns: e.underruns.Load(),
  353. LateBuffers: lateBuffers,
  354. LastError: errVal,
  355. UptimeSeconds: uptime,
  356. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  357. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  358. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  359. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  360. Queue: queue,
  361. RuntimeIndicator: ri,
  362. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  363. LastFault: lastFault,
  364. }
  365. }
  366. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  367. switch {
  368. case queueHealth == output.QueueHealthCritical:
  369. return RuntimeIndicatorQueueCritical
  370. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  371. return RuntimeIndicatorDegraded
  372. default:
  373. return RuntimeIndicatorNormal
  374. }
  375. }
  376. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  377. switch {
  378. case queueHealth == output.QueueHealthCritical:
  379. return "queue health critical"
  380. case recentLateBuffers:
  381. return "late buffers"
  382. case queueHealth == output.QueueHealthLow:
  383. return "queue health low"
  384. default:
  385. return ""
  386. }
  387. }
  388. func (e *Engine) run(ctx context.Context) {
  389. e.setRuntimeState(RuntimeStatePrebuffering)
  390. e.wg.Add(1)
  391. go e.writerLoop(ctx)
  392. defer e.wg.Done()
  393. for {
  394. if ctx.Err() != nil {
  395. return
  396. }
  397. // Apply pending frequency change between chunks
  398. if pf := e.pendingFreq.Swap(nil); pf != nil {
  399. if err := e.driver.Tune(ctx, *pf); err != nil {
  400. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  401. } else {
  402. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  403. }
  404. }
  405. t0 := time.Now()
  406. frame := e.generator.GenerateFrame(e.chunkDuration)
  407. frame.GeneratedAt = t0
  408. t1 := time.Now()
  409. if e.upsampler != nil {
  410. frame = e.upsampler.Process(frame)
  411. frame.GeneratedAt = t0
  412. }
  413. t2 := time.Now()
  414. genDur := t1.Sub(t0)
  415. upDur := t2.Sub(t1)
  416. updateMaxDuration(&e.maxGenerateNs, genDur)
  417. updateMaxDuration(&e.maxUpsampleNs, upDur)
  418. enqueued := cloneFrame(frame)
  419. if enqueued == nil {
  420. e.lastError.Store("engine: frame clone failed")
  421. e.underruns.Add(1)
  422. continue
  423. }
  424. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  425. if ctx.Err() != nil {
  426. return
  427. }
  428. if errors.Is(err, output.ErrFrameQueueClosed) {
  429. return
  430. }
  431. e.lastError.Store(err.Error())
  432. e.underruns.Add(1)
  433. select {
  434. case <-time.After(e.chunkDuration):
  435. case <-ctx.Done():
  436. return
  437. }
  438. continue
  439. }
  440. queueStats := e.frameQueue.Stats()
  441. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  442. }
  443. }
  444. func (e *Engine) writerLoop(ctx context.Context) {
  445. defer e.wg.Done()
  446. for {
  447. frame, err := e.frameQueue.Pop(ctx)
  448. if err != nil {
  449. if ctx.Err() != nil {
  450. return
  451. }
  452. if errors.Is(err, output.ErrFrameQueueClosed) {
  453. return
  454. }
  455. e.lastError.Store(err.Error())
  456. e.underruns.Add(1)
  457. continue
  458. }
  459. writeStart := time.Now()
  460. n, err := e.driver.Write(ctx, frame)
  461. writeDur := time.Since(writeStart)
  462. cycleDur := writeDur
  463. if !frame.GeneratedAt.IsZero() {
  464. cycleDur = time.Since(frame.GeneratedAt)
  465. }
  466. updateMaxDuration(&e.maxWriteNs, writeDur)
  467. updateMaxDuration(&e.maxCycleNs, cycleDur)
  468. queueStats := e.frameQueue.Stats()
  469. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  470. if cycleDur > e.chunkDuration {
  471. late := e.lateBuffers.Add(1)
  472. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  473. if late <= 5 || late%20 == 0 {
  474. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  475. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  476. }
  477. }
  478. if err != nil {
  479. if ctx.Err() != nil {
  480. return
  481. }
  482. e.lastError.Store(err.Error())
  483. e.underruns.Add(1)
  484. select {
  485. case <-time.After(e.chunkDuration):
  486. case <-ctx.Done():
  487. return
  488. }
  489. continue
  490. }
  491. e.chunksProduced.Add(1)
  492. e.totalSamples.Add(uint64(n))
  493. }
  494. }
  495. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  496. if src == nil {
  497. return nil
  498. }
  499. samples := make([]output.IQSample, len(src.Samples))
  500. copy(samples, src.Samples)
  501. return &output.CompositeFrame{
  502. Samples: samples,
  503. SampleRateHz: src.SampleRateHz,
  504. Timestamp: src.Timestamp,
  505. GeneratedAt: src.GeneratedAt,
  506. Sequence: src.Sequence,
  507. }
  508. }
  509. func (e *Engine) setRuntimeState(state RuntimeState) {
  510. e.runtimeState.Store(state)
  511. }
  512. func (e *Engine) currentRuntimeState() RuntimeState {
  513. if v := e.runtimeState.Load(); v != nil {
  514. if rs, ok := v.(RuntimeState); ok {
  515. return rs
  516. }
  517. }
  518. return RuntimeStateIdle
  519. }
  520. func (e *Engine) hasRecentLateBuffers() bool {
  521. lateAlertAt := e.lateBufferAlertAt.Load()
  522. if lateAlertAt == 0 {
  523. return false
  524. }
  525. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  526. }
  527. func (e *Engine) lastFaultEvent() *FaultEvent {
  528. return copyFaultEvent(e.loadLastFault())
  529. }
  530. // LastFault exposes the most recent captured fault, if any.
  531. func (e *Engine) LastFault() *FaultEvent {
  532. return e.lastFaultEvent()
  533. }
  534. func (e *Engine) FaultHistory() []FaultEvent {
  535. e.faultHistoryMu.Lock()
  536. defer e.faultHistoryMu.Unlock()
  537. history := make([]FaultEvent, len(e.faultHistory))
  538. copy(history, e.faultHistory)
  539. return history
  540. }
  541. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  542. if reason == "" {
  543. reason = FaultReasonUnknown
  544. }
  545. now := time.Now()
  546. if last := e.loadLastFault(); last != nil {
  547. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  548. return
  549. }
  550. }
  551. ev := &FaultEvent{
  552. Time: now,
  553. Reason: reason,
  554. Severity: severity,
  555. Message: message,
  556. }
  557. e.lastFault.Store(ev)
  558. e.appendFaultHistory(ev)
  559. }
  560. func (e *Engine) loadLastFault() *FaultEvent {
  561. if v := e.lastFault.Load(); v != nil {
  562. if ev, ok := v.(*FaultEvent); ok {
  563. return ev
  564. }
  565. }
  566. return nil
  567. }
  568. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  569. if source == nil {
  570. return nil
  571. }
  572. copy := *source
  573. return &copy
  574. }
  575. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  576. e.faultHistoryMu.Lock()
  577. defer e.faultHistoryMu.Unlock()
  578. if len(e.faultHistory) >= faultHistoryCapacity {
  579. copy(e.faultHistory, e.faultHistory[1:])
  580. e.faultHistory[len(e.faultHistory)-1] = *ev
  581. return
  582. }
  583. e.faultHistory = append(e.faultHistory, *ev)
  584. }
  585. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  586. state := e.currentRuntimeState()
  587. switch state {
  588. case RuntimeStateStopping, RuntimeStateFaulted:
  589. return
  590. }
  591. if state == RuntimeStatePrebuffering {
  592. if queue.Depth >= 1 {
  593. e.setRuntimeState(RuntimeStateRunning)
  594. }
  595. return
  596. }
  597. critical := queue.Health == output.QueueHealthCritical
  598. if critical {
  599. count := e.criticalStreak.Add(1)
  600. if count >= queueMutedStreakThreshold {
  601. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  602. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  603. e.setRuntimeState(RuntimeStateMuted)
  604. return
  605. }
  606. if count >= queueCriticalStreakThreshold {
  607. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  608. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  609. e.setRuntimeState(RuntimeStateDegraded)
  610. return
  611. }
  612. } else {
  613. e.criticalStreak.Store(0)
  614. }
  615. if hasLateBuffers {
  616. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  617. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  618. e.setRuntimeState(RuntimeStateDegraded)
  619. return
  620. }
  621. e.setRuntimeState(RuntimeStateRunning)
  622. }