Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

708 lignes
19KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  // EngineState is the coarse lifecycle state of an Engine (guarded by Engine.mu).
type EngineState int

// Lifecycle states. Start moves Idle → Running; Stop moves Running → Stopping → Idle.
const (
	EngineIdle EngineState = iota
	EngineRunning
	EngineStopping
)

// String returns the lowercase state name, or "unknown" for values outside the
// declared constants.
func (s EngineState) String() string {
	names := [...]string{
		EngineIdle:     "idle",
		EngineRunning:  "running",
		EngineStopping: "stopping",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
  // RuntimeState is the fine-grained runtime/health state. Its string value is
// what EngineStats.State reports.
type RuntimeState string

// Runtime states. Each constant's value is its JSON/string form.
const (
	RuntimeStateIdle         = RuntimeState("idle")
	RuntimeStateArming       = RuntimeState("arming")
	RuntimeStatePrebuffering = RuntimeState("prebuffering")
	RuntimeStateRunning      = RuntimeState("running")
	RuntimeStateDegraded     = RuntimeState("degraded")
	RuntimeStateMuted        = RuntimeState("muted")
	RuntimeStateFaulted      = RuntimeState("faulted")
	RuntimeStateStopping     = RuntimeState("stopping")
)
  // updateMaxDuration raises *dst to d (stored as nanoseconds) when d is larger
// than the current value. It retries the compare-and-swap until it either wins
// or observes a value that is already >= d, so concurrent callers never lower
// the recorded maximum.
func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
	candidate := uint64(d)
	for seen := dst.Load(); candidate > seen; seen = dst.Load() {
		if dst.CompareAndSwap(seen, candidate) {
			return
		}
	}
}
  // durationMs converts a nanosecond count, such as the maxima recorded by
// updateMaxDuration, into fractional milliseconds for reporting.
func durationMs(ns uint64) float64 {
	const nsPerMillisecond = float64(time.Millisecond)
	return float64(ns) / nsPerMillisecond
}
  61. type EngineStats struct {
  62. State string `json:"state"`
  63. ChunksProduced uint64 `json:"chunksProduced"`
  64. TotalSamples uint64 `json:"totalSamples"`
  65. Underruns uint64 `json:"underruns"`
  66. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  67. LastError string `json:"lastError,omitempty"`
  68. UptimeSeconds float64 `json:"uptimeSeconds"`
  69. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  70. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  71. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  72. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  73. Queue output.QueueStats `json:"queue"`
  74. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  75. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  76. LastFault *FaultEvent `json:"lastFault,omitempty"`
  77. }
  // RuntimeIndicator is the coarse health summary exposed in EngineStats.
// runtimeIndicator derives it from queue health and recent late buffers.
type RuntimeIndicator string

// Indicator values. Each constant's value is its JSON/string form.
const (
	RuntimeIndicatorNormal        = RuntimeIndicator("normal")
	RuntimeIndicatorDegraded      = RuntimeIndicator("degraded")
	RuntimeIndicatorQueueCritical = RuntimeIndicator("queueCritical")
)
  // Tuning knobs for runtime health evaluation and fault bookkeeping.
const (
	// lateBufferIndicatorWindow is how long after the most recent late buffer
	// the engine keeps reporting late buffers.
	lateBufferIndicatorWindow = 5 * time.Second

	// queueCriticalStreakThreshold is the number of consecutive critical queue
	// checks before the runtime state degrades.
	queueCriticalStreakThreshold = 3
	// queueMutedStreakThreshold is the number of consecutive critical checks
	// before muting: twice the degrade threshold.
	queueMutedStreakThreshold = 2 * queueCriticalStreakThreshold
	// queueMutedRecoveryThreshold is the number of consecutive healthy checks
	// required to leave the muted state.
	queueMutedRecoveryThreshold = queueCriticalStreakThreshold

	// faultRepeatWindow suppresses re-recording an identical fault (same reason
	// and severity) within this interval.
	faultRepeatWindow = time.Second
	// faultHistoryCapacity bounds the retained fault history; the oldest entry is dropped.
	faultHistoryCapacity = 8
)
  92. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  93. // resamples to device rate, and pushes to hardware in a tight loop.
  94. // The hardware buffer_push call is blocking — it returns when the hardware
  95. // has consumed the previous buffer and is ready for the next one.
  96. // This naturally paces the loop to real-time without a ticker.
  97. type Engine struct {
  98. cfg cfgpkg.Config
  99. driver platform.SoapyDriver
  100. generator *offpkg.Generator
  101. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  102. chunkDuration time.Duration
  103. deviceRate float64
  104. frameQueue *output.FrameQueue
  105. mu sync.Mutex
  106. state EngineState
  107. cancel context.CancelFunc
  108. startedAt time.Time
  109. wg sync.WaitGroup
  110. runtimeState atomic.Value
  111. chunksProduced atomic.Uint64
  112. totalSamples atomic.Uint64
  113. underruns atomic.Uint64
  114. lateBuffers atomic.Uint64
  115. lateBufferAlertAt atomic.Uint64
  116. criticalStreak atomic.Uint64
  117. mutedRecoveryStreak atomic.Uint64
  118. maxCycleNs atomic.Uint64
  119. maxGenerateNs atomic.Uint64
  120. maxUpsampleNs atomic.Uint64
  121. maxWriteNs atomic.Uint64
  122. lastError atomic.Value // string
  123. lastFault atomic.Value // *FaultEvent
  124. faultHistoryMu sync.Mutex
  125. faultHistory []FaultEvent
  126. // Live config: pending frequency change, applied between chunks
  127. pendingFreq atomic.Pointer[float64]
  128. // Live audio stream (optional)
  129. streamSrc *audio.StreamSource
  130. }
  131. // SetStreamSource configures a live audio stream as the audio source.
  132. // Must be called before Start(). The StreamResampler is created internally
  133. // to convert from the stream's sample rate to the DSP composite rate.
  134. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  135. e.streamSrc = src
  136. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  137. if compositeRate <= 0 {
  138. compositeRate = 228000
  139. }
  140. resampler := audio.NewStreamResampler(src, compositeRate)
  141. e.generator.SetExternalSource(resampler)
  142. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  143. src.SampleRate, compositeRate, src.Stats().Capacity)
  144. }
  145. // StreamSource returns the live audio stream source, or nil.
  146. // Used by the control server for stats and HTTP audio ingest.
  147. func (e *Engine) StreamSource() *audio.StreamSource {
  148. return e.streamSrc
  149. }
  150. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  151. deviceRate := cfg.EffectiveDeviceRate()
  152. compositeRate := float64(cfg.FM.CompositeRateHz)
  153. if compositeRate <= 0 {
  154. compositeRate = 228000
  155. }
  156. var upsampler *dsp.FMUpsampler
  157. if deviceRate > compositeRate*1.001 {
  158. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  159. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  160. // This halves CPU load compared to running all DSP at deviceRate.
  161. cfg.FM.FMModulationEnabled = false
  162. maxDev := cfg.FM.MaxDeviationHz
  163. if maxDev <= 0 {
  164. maxDev = 75000
  165. }
  166. // mpxGain scales the FM deviation to compensate for hardware
  167. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  168. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  169. maxDev *= cfg.FM.MpxGain
  170. }
  171. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  172. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  173. compositeRate, deviceRate, deviceRate/compositeRate)
  174. } else {
  175. // Same-rate: entire DSP chain runs at deviceRate.
  176. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  177. if deviceRate > 0 {
  178. cfg.FM.CompositeRateHz = int(deviceRate)
  179. }
  180. cfg.FM.FMModulationEnabled = true
  181. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  182. }
  183. engine := &Engine{
  184. cfg: cfg,
  185. driver: driver,
  186. generator: offpkg.NewGenerator(cfg),
  187. upsampler: upsampler,
  188. chunkDuration: 50 * time.Millisecond,
  189. deviceRate: deviceRate,
  190. state: EngineIdle,
  191. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  192. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  193. }
  194. engine.setRuntimeState(RuntimeStateIdle)
  195. return engine
  196. }
  197. func (e *Engine) SetChunkDuration(d time.Duration) {
  198. e.chunkDuration = d
  199. }
  // LiveConfigUpdate carries hot-reloadable parameters from the control API.
// A nil pointer means "leave unchanged". Engine.UpdateConfig validates the
// ranges noted below before applying anything.
type LiveConfigUpdate struct {
	FrequencyMHz   *float64 // 65–110; applied between chunks via driver.Tune
	OutputDrive    *float64 // 0–10
	StereoEnabled  *bool
	PilotLevel     *float64 // 0–0.2
	RDSInjection   *float64 // 0–0.15
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64 // 0–2
	PS             *string  // RDS PS text, forwarded to the RDS encoder
	RadioText      *string  // RDS RadioText, forwarded to the RDS encoder
}
  214. // UpdateConfig applies live parameter changes without restarting the engine.
  215. // DSP params take effect at the next chunk boundary (~50ms max).
  216. // Frequency changes are applied between chunks via driver.Tune().
  217. // RDS text updates are applied at the next RDS group boundary (~88ms).
  218. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  219. // --- Validate ---
  220. if u.FrequencyMHz != nil {
  221. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  222. return fmt.Errorf("frequencyMHz out of range (65-110)")
  223. }
  224. }
  225. if u.OutputDrive != nil {
  226. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  227. return fmt.Errorf("outputDrive out of range (0-10)")
  228. }
  229. }
  230. if u.PilotLevel != nil {
  231. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  232. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  233. }
  234. }
  235. if u.RDSInjection != nil {
  236. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  237. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  238. }
  239. }
  240. if u.LimiterCeiling != nil {
  241. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  242. return fmt.Errorf("limiterCeiling out of range (0-2)")
  243. }
  244. }
  245. // --- Frequency: store for run loop to apply via driver.Tune() ---
  246. if u.FrequencyMHz != nil {
  247. freqHz := *u.FrequencyMHz * 1e6
  248. e.pendingFreq.Store(&freqHz)
  249. }
  250. // --- RDS text: forward to encoder atomics ---
  251. if u.PS != nil || u.RadioText != nil {
  252. if enc := e.generator.RDSEncoder(); enc != nil {
  253. ps, rt := "", ""
  254. if u.PS != nil {
  255. ps = *u.PS
  256. }
  257. if u.RadioText != nil {
  258. rt = *u.RadioText
  259. }
  260. enc.UpdateText(ps, rt)
  261. }
  262. }
  263. // --- DSP params: build new LiveParams from current + patch ---
  264. // Read current, apply deltas, store new
  265. current := e.generator.CurrentLiveParams()
  266. next := current // copy
  267. if u.OutputDrive != nil {
  268. next.OutputDrive = *u.OutputDrive
  269. }
  270. if u.StereoEnabled != nil {
  271. next.StereoEnabled = *u.StereoEnabled
  272. }
  273. if u.PilotLevel != nil {
  274. next.PilotLevel = *u.PilotLevel
  275. }
  276. if u.RDSInjection != nil {
  277. next.RDSInjection = *u.RDSInjection
  278. }
  279. if u.RDSEnabled != nil {
  280. next.RDSEnabled = *u.RDSEnabled
  281. }
  282. if u.LimiterEnabled != nil {
  283. next.LimiterEnabled = *u.LimiterEnabled
  284. }
  285. if u.LimiterCeiling != nil {
  286. next.LimiterCeiling = *u.LimiterCeiling
  287. }
  288. e.generator.UpdateLive(next)
  289. return nil
  290. }
  291. func (e *Engine) Start(ctx context.Context) error {
  292. e.mu.Lock()
  293. if e.state != EngineIdle {
  294. e.mu.Unlock()
  295. return fmt.Errorf("engine already in state %s", e.state)
  296. }
  297. if err := e.driver.Start(ctx); err != nil {
  298. e.mu.Unlock()
  299. return fmt.Errorf("driver start: %w", err)
  300. }
  301. runCtx, cancel := context.WithCancel(ctx)
  302. e.cancel = cancel
  303. e.state = EngineRunning
  304. e.setRuntimeState(RuntimeStateArming)
  305. e.startedAt = time.Now()
  306. e.wg.Add(1)
  307. e.mu.Unlock()
  308. go e.run(runCtx)
  309. return nil
  310. }
  311. func (e *Engine) Stop(ctx context.Context) error {
  312. e.mu.Lock()
  313. if e.state != EngineRunning {
  314. e.mu.Unlock()
  315. return nil
  316. }
  317. e.state = EngineStopping
  318. e.setRuntimeState(RuntimeStateStopping)
  319. e.cancel()
  320. e.mu.Unlock()
  321. // Wait for run() goroutine to exit — deterministic, no guessing
  322. e.wg.Wait()
  323. if err := e.driver.Flush(ctx); err != nil {
  324. return err
  325. }
  326. if err := e.driver.Stop(ctx); err != nil {
  327. return err
  328. }
  329. e.mu.Lock()
  330. e.state = EngineIdle
  331. e.setRuntimeState(RuntimeStateIdle)
  332. e.mu.Unlock()
  333. return nil
  334. }
  335. func (e *Engine) Stats() EngineStats {
  336. e.mu.Lock()
  337. state := e.state
  338. startedAt := e.startedAt
  339. e.mu.Unlock()
  340. var uptime float64
  341. if state == EngineRunning {
  342. uptime = time.Since(startedAt).Seconds()
  343. }
  344. errVal, _ := e.lastError.Load().(string)
  345. queue := e.frameQueue.Stats()
  346. lateBuffers := e.lateBuffers.Load()
  347. hasRecentLateBuffers := e.hasRecentLateBuffers()
  348. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  349. lastFault := e.lastFaultEvent()
  350. return EngineStats{
  351. State: string(e.currentRuntimeState()),
  352. ChunksProduced: e.chunksProduced.Load(),
  353. TotalSamples: e.totalSamples.Load(),
  354. Underruns: e.underruns.Load(),
  355. LateBuffers: lateBuffers,
  356. LastError: errVal,
  357. UptimeSeconds: uptime,
  358. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  359. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  360. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  361. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  362. Queue: queue,
  363. RuntimeIndicator: ri,
  364. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  365. LastFault: lastFault,
  366. }
  367. }
  368. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  369. switch {
  370. case queueHealth == output.QueueHealthCritical:
  371. return RuntimeIndicatorQueueCritical
  372. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  373. return RuntimeIndicatorDegraded
  374. default:
  375. return RuntimeIndicatorNormal
  376. }
  377. }
  378. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  379. switch {
  380. case queueHealth == output.QueueHealthCritical:
  381. return "queue health critical"
  382. case recentLateBuffers:
  383. return "late buffers"
  384. case queueHealth == output.QueueHealthLow:
  385. return "queue health low"
  386. default:
  387. return ""
  388. }
  389. }
  390. func (e *Engine) run(ctx context.Context) {
  391. e.setRuntimeState(RuntimeStatePrebuffering)
  392. e.wg.Add(1)
  393. go e.writerLoop(ctx)
  394. defer e.wg.Done()
  395. for {
  396. if ctx.Err() != nil {
  397. return
  398. }
  399. // Apply pending frequency change between chunks
  400. if pf := e.pendingFreq.Swap(nil); pf != nil {
  401. if err := e.driver.Tune(ctx, *pf); err != nil {
  402. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  403. } else {
  404. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  405. }
  406. }
  407. t0 := time.Now()
  408. frame := e.generator.GenerateFrame(e.chunkDuration)
  409. frame.GeneratedAt = t0
  410. t1 := time.Now()
  411. if e.upsampler != nil {
  412. frame = e.upsampler.Process(frame)
  413. frame.GeneratedAt = t0
  414. }
  415. t2 := time.Now()
  416. genDur := t1.Sub(t0)
  417. upDur := t2.Sub(t1)
  418. updateMaxDuration(&e.maxGenerateNs, genDur)
  419. updateMaxDuration(&e.maxUpsampleNs, upDur)
  420. enqueued := cloneFrame(frame)
  421. if enqueued == nil {
  422. e.lastError.Store("engine: frame clone failed")
  423. e.underruns.Add(1)
  424. continue
  425. }
  426. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  427. if ctx.Err() != nil {
  428. return
  429. }
  430. if errors.Is(err, output.ErrFrameQueueClosed) {
  431. return
  432. }
  433. e.lastError.Store(err.Error())
  434. e.underruns.Add(1)
  435. select {
  436. case <-time.After(e.chunkDuration):
  437. case <-ctx.Done():
  438. return
  439. }
  440. continue
  441. }
  442. queueStats := e.frameQueue.Stats()
  443. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  444. }
  445. }
  446. func (e *Engine) writerLoop(ctx context.Context) {
  447. defer e.wg.Done()
  448. for {
  449. frame, err := e.frameQueue.Pop(ctx)
  450. if err != nil {
  451. if ctx.Err() != nil {
  452. return
  453. }
  454. if errors.Is(err, output.ErrFrameQueueClosed) {
  455. return
  456. }
  457. e.lastError.Store(err.Error())
  458. e.underruns.Add(1)
  459. continue
  460. }
  461. writeStart := time.Now()
  462. n, err := e.driver.Write(ctx, frame)
  463. writeDur := time.Since(writeStart)
  464. cycleDur := writeDur
  465. if !frame.GeneratedAt.IsZero() {
  466. cycleDur = time.Since(frame.GeneratedAt)
  467. }
  468. updateMaxDuration(&e.maxWriteNs, writeDur)
  469. updateMaxDuration(&e.maxCycleNs, cycleDur)
  470. queueStats := e.frameQueue.Stats()
  471. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  472. if cycleDur > e.chunkDuration {
  473. late := e.lateBuffers.Add(1)
  474. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  475. if late <= 5 || late%20 == 0 {
  476. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  477. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  478. }
  479. }
  480. if err != nil {
  481. if ctx.Err() != nil {
  482. return
  483. }
  484. e.lastError.Store(err.Error())
  485. e.underruns.Add(1)
  486. select {
  487. case <-time.After(e.chunkDuration):
  488. case <-ctx.Done():
  489. return
  490. }
  491. continue
  492. }
  493. e.chunksProduced.Add(1)
  494. e.totalSamples.Add(uint64(n))
  495. }
  496. }
  497. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  498. if src == nil {
  499. return nil
  500. }
  501. samples := make([]output.IQSample, len(src.Samples))
  502. copy(samples, src.Samples)
  503. return &output.CompositeFrame{
  504. Samples: samples,
  505. SampleRateHz: src.SampleRateHz,
  506. Timestamp: src.Timestamp,
  507. GeneratedAt: src.GeneratedAt,
  508. Sequence: src.Sequence,
  509. }
  510. }
  511. func (e *Engine) setRuntimeState(state RuntimeState) {
  512. e.runtimeState.Store(state)
  513. }
  514. func (e *Engine) currentRuntimeState() RuntimeState {
  515. if v := e.runtimeState.Load(); v != nil {
  516. if rs, ok := v.(RuntimeState); ok {
  517. return rs
  518. }
  519. }
  520. return RuntimeStateIdle
  521. }
  522. func (e *Engine) hasRecentLateBuffers() bool {
  523. lateAlertAt := e.lateBufferAlertAt.Load()
  524. if lateAlertAt == 0 {
  525. return false
  526. }
  527. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  528. }
  529. func (e *Engine) lastFaultEvent() *FaultEvent {
  530. return copyFaultEvent(e.loadLastFault())
  531. }
  532. // LastFault exposes the most recent captured fault, if any.
  533. func (e *Engine) LastFault() *FaultEvent {
  534. return e.lastFaultEvent()
  535. }
  536. func (e *Engine) FaultHistory() []FaultEvent {
  537. e.faultHistoryMu.Lock()
  538. defer e.faultHistoryMu.Unlock()
  539. history := make([]FaultEvent, len(e.faultHistory))
  540. copy(history, e.faultHistory)
  541. return history
  542. }
  543. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  544. if reason == "" {
  545. reason = FaultReasonUnknown
  546. }
  547. now := time.Now()
  548. if last := e.loadLastFault(); last != nil {
  549. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  550. return
  551. }
  552. }
  553. ev := &FaultEvent{
  554. Time: now,
  555. Reason: reason,
  556. Severity: severity,
  557. Message: message,
  558. }
  559. e.lastFault.Store(ev)
  560. e.appendFaultHistory(ev)
  561. }
  562. func (e *Engine) loadLastFault() *FaultEvent {
  563. if v := e.lastFault.Load(); v != nil {
  564. if ev, ok := v.(*FaultEvent); ok {
  565. return ev
  566. }
  567. }
  568. return nil
  569. }
  570. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  571. if source == nil {
  572. return nil
  573. }
  574. copy := *source
  575. return &copy
  576. }
  577. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  578. e.faultHistoryMu.Lock()
  579. defer e.faultHistoryMu.Unlock()
  580. if len(e.faultHistory) >= faultHistoryCapacity {
  581. copy(e.faultHistory, e.faultHistory[1:])
  582. e.faultHistory[len(e.faultHistory)-1] = *ev
  583. return
  584. }
  585. e.faultHistory = append(e.faultHistory, *ev)
  586. }
  587. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  588. state := e.currentRuntimeState()
  589. switch state {
  590. case RuntimeStateStopping, RuntimeStateFaulted:
  591. return
  592. case RuntimeStateMuted:
  593. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  594. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  595. e.mutedRecoveryStreak.Store(0)
  596. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  597. fmt.Sprintf("queue healthy for %d checks after mute", count))
  598. e.setRuntimeState(RuntimeStateDegraded)
  599. }
  600. } else {
  601. e.mutedRecoveryStreak.Store(0)
  602. }
  603. return
  604. }
  605. if state == RuntimeStatePrebuffering {
  606. if queue.Depth >= 1 {
  607. e.setRuntimeState(RuntimeStateRunning)
  608. }
  609. return
  610. }
  611. critical := queue.Health == output.QueueHealthCritical
  612. if critical {
  613. count := e.criticalStreak.Add(1)
  614. if count >= queueMutedStreakThreshold {
  615. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  616. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  617. e.setRuntimeState(RuntimeStateMuted)
  618. return
  619. }
  620. if count >= queueCriticalStreakThreshold {
  621. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  622. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  623. e.setRuntimeState(RuntimeStateDegraded)
  624. return
  625. }
  626. } else {
  627. e.criticalStreak.Store(0)
  628. }
  629. if hasLateBuffers {
  630. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  631. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  632. e.setRuntimeState(RuntimeStateDegraded)
  633. return
  634. }
  635. e.setRuntimeState(RuntimeStateRunning)
  636. }