Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

867 行
25KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "math"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/jan/fm-rds-tx/internal/audio"
  12. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  13. "github.com/jan/fm-rds-tx/internal/dsp"
  14. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  15. "github.com/jan/fm-rds-tx/internal/output"
  16. "github.com/jan/fm-rds-tx/internal/platform"
  17. )
  // EngineState is the coarse lifecycle state of an Engine. Start, Stop and
  // Stats read and write it while holding Engine.mu.
  type EngineState int

  // Lifecycle states, in the order an engine normally moves through them.
  const (
  	// EngineIdle: never started, or Stop has finished tearing down.
  	EngineIdle EngineState = iota
  	// EngineRunning: Start succeeded and the run loop has been launched.
  	EngineRunning
  	// EngineStopping: Stop has cancelled the loops and is shutting the driver down.
  	EngineStopping
  )

  // String returns the lowercase name of s ("idle", "running", "stopping"),
  // or "unknown" for any value outside the defined set.
  func (s EngineState) String() string {
  	names := [...]string{
  		EngineIdle:     "idle",
  		EngineRunning:  "running",
  		EngineStopping: "stopping",
  	}
  	if s < 0 || int(s) >= len(names) {
  		return "unknown"
  	}
  	return names[s]
  }
  // RuntimeState is the fine-grained health state reported as EngineStats.State
  // and recorded in RuntimeTransition history. It is stored in
  // Engine.runtimeState and changed only through setRuntimeState.
  type RuntimeState string

  // Runtime states. The string values are what Stats and the transition
  // history report.
  const (
  	RuntimeStateIdle         RuntimeState = "idle"         // set by NewEngine and at the end of Stop
  	RuntimeStateArming       RuntimeState = "arming"       // set by Start just before the run loop is launched
  	RuntimeStatePrebuffering RuntimeState = "prebuffering" // run loop started; waiting for the first queued frame
  	RuntimeStateRunning      RuntimeState = "running"      // healthy
  	RuntimeStateDegraded     RuntimeState = "degraded"     // recent late buffers or a sustained critical queue
  	RuntimeStateMuted        RuntimeState = "muted"        // queue critical for an extended streak
  	RuntimeStateFaulted      RuntimeState = "faulted"      // still critical while muted; cleared via ResetFault
  	RuntimeStateStopping     RuntimeState = "stopping"     // Stop in progress
  )
  // updateMaxDuration raises the high-water mark stored in dst (nanoseconds)
  // to d if d is larger. It uses a compare-and-swap loop, so concurrent callers
  // never lower the stored value.
  //
  // Non-positive durations are ignored. Converting a negative time.Duration to
  // uint64 wraps to an enormous value, which would otherwise pin the maximum
  // permanently (e.g. if a duration is computed from timestamps that lack a
  // monotonic reading and the wall clock steps backwards).
  func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  	if d <= 0 {
  		return
  	}
  	next := uint64(d)
  	for {
  		cur := dst.Load()
  		if next <= cur || dst.CompareAndSwap(cur, next) {
  			return
  		}
  	}
  }
  // durationMs converts a nanosecond count, as stored by updateMaxDuration,
  // into fractional milliseconds for reporting.
  func durationMs(ns uint64) float64 {
  	const nsPerMs = float64(time.Millisecond)
  	return float64(ns) / nsPerMs
  }
  62. type EngineStats struct {
  63. State string `json:"state"`
  64. RuntimeStateDurationSeconds float64 `json:"runtimeStateDurationSeconds"`
  65. ChunksProduced uint64 `json:"chunksProduced"`
  66. TotalSamples uint64 `json:"totalSamples"`
  67. Underruns uint64 `json:"underruns"`
  68. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  69. LastError string `json:"lastError,omitempty"`
  70. UptimeSeconds float64 `json:"uptimeSeconds"`
  71. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  72. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  73. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  74. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  75. MaxQueueResidenceMs float64 `json:"maxQueueResidenceMs,omitempty"`
  76. MaxPipelineLatencyMs float64 `json:"maxPipelineLatencyMs,omitempty"`
  77. Queue output.QueueStats `json:"queue"`
  78. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  79. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  80. AppliedFrequencyMHz float64 `json:"appliedFrequencyMHz"`
  81. LastFault *FaultEvent `json:"lastFault,omitempty"`
  82. DegradedTransitions uint64 `json:"degradedTransitions"`
  83. MutedTransitions uint64 `json:"mutedTransitions"`
  84. FaultedTransitions uint64 `json:"faultedTransitions"`
  85. FaultCount uint64 `json:"faultCount"`
  86. FaultHistory []FaultEvent `json:"faultHistory,omitempty"`
  87. TransitionHistory []RuntimeTransition `json:"transitionHistory,omitempty"`
  88. }
  // RuntimeIndicator is a coarse health light derived from queue health and
  // recent late writes (see runtimeIndicator).
  type RuntimeIndicator string

  // Indicator values, from healthiest to worst.
  const (
  	RuntimeIndicatorNormal        RuntimeIndicator = "normal"        // queue healthy and no recent late buffers
  	RuntimeIndicatorDegraded      RuntimeIndicator = "degraded"      // queue low, or recent late buffers
  	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical" // queue health critical
  )
  95. type RuntimeTransition struct {
  96. Time time.Time `json:"time"`
  97. From RuntimeState `json:"from"`
  98. To RuntimeState `json:"to"`
  99. Severity string `json:"severity"`
  100. }
  // Runtime-health tuning knobs used by writerLoop and evaluateRuntimeState.
  const (
  	// lateBufferIndicatorWindow is how long after the most recent late write
  	// hasRecentLateBuffers keeps reporting true.
  	lateBufferIndicatorWindow = 5 * time.Second
  	// writeLateTolerance is the slack allowed beyond one chunk duration before
  	// a driver write is counted as late.
  	writeLateTolerance = time.Millisecond

  	// queueCriticalStreakThreshold is the number of consecutive critical-queue
  	// checks that demotes the engine to Degraded; the thresholds below derive
  	// from it.
  	queueCriticalStreakThreshold = 3
  	// queueMutedStreakThreshold: consecutive critical checks before Muted.
  	queueMutedStreakThreshold = 2 * queueCriticalStreakThreshold
  	// queueMutedRecoveryThreshold: consecutive healthy checks that lift Muted.
  	queueMutedRecoveryThreshold = queueCriticalStreakThreshold
  	// queueFaultedStreakThreshold: consecutive critical checks while Muted before Faulted.
  	queueFaultedStreakThreshold = queueCriticalStreakThreshold

  	// faultRepeatWindow suppresses identical (reason, severity) faults recorded
  	// closer together than this.
  	faultRepeatWindow = time.Second
  	// Ring sizes for the fault and transition histories.
  	faultHistoryCapacity             = 8
  	runtimeTransitionHistoryCapacity = 8
  )
  112. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  113. // resamples to device rate, and pushes to hardware in a tight loop.
  114. // The hardware buffer_push call is blocking — it returns when the hardware
  115. // has consumed the previous buffer and is ready for the next one.
  116. // This naturally paces the loop to real-time without a ticker.
  117. type Engine struct {
  118. cfg cfgpkg.Config
  119. driver platform.SoapyDriver
  120. generator *offpkg.Generator
  121. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  122. chunkDuration time.Duration
  123. deviceRate float64
  124. frameQueue *output.FrameQueue
  125. mu sync.Mutex
  126. state EngineState
  127. cancel context.CancelFunc
  128. startedAt time.Time
  129. wg sync.WaitGroup
  130. runtimeState atomic.Value
  131. chunksProduced atomic.Uint64
  132. totalSamples atomic.Uint64
  133. underruns atomic.Uint64
  134. lateBuffers atomic.Uint64
  135. lateBufferAlertAt atomic.Uint64
  136. criticalStreak atomic.Uint64
  137. mutedRecoveryStreak atomic.Uint64
  138. mutedFaultStreak atomic.Uint64
  139. maxCycleNs atomic.Uint64
  140. maxGenerateNs atomic.Uint64
  141. maxUpsampleNs atomic.Uint64
  142. maxWriteNs atomic.Uint64
  143. maxQueueResidenceNs atomic.Uint64
  144. maxPipelineNs atomic.Uint64
  145. lastError atomic.Value // string
  146. lastFault atomic.Value // *FaultEvent
  147. faultHistoryMu sync.Mutex
  148. faultHistory []FaultEvent
  149. transitionHistoryMu sync.Mutex
  150. transitionHistory []RuntimeTransition
  151. degradedTransitions atomic.Uint64
  152. mutedTransitions atomic.Uint64
  153. faultedTransitions atomic.Uint64
  154. faultEvents atomic.Uint64
  155. runtimeStateEnteredAt atomic.Uint64
  156. // Live config: pending frequency change, applied between chunks
  157. pendingFreq atomic.Pointer[float64]
  158. // Most recently tuned frequency (Hz)
  159. appliedFreqHz atomic.Uint64
  160. // Live audio stream (optional)
  161. streamSrc *audio.StreamSource
  162. }
  163. // SetStreamSource configures a live audio stream as the audio source.
  164. // Must be called before Start(). The StreamResampler is created internally
  165. // to convert from the stream's sample rate to the DSP composite rate.
  166. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  167. e.streamSrc = src
  168. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  169. if compositeRate <= 0 {
  170. compositeRate = 228000
  171. }
  172. resampler := audio.NewStreamResampler(src, compositeRate)
  173. e.generator.SetExternalSource(resampler)
  174. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  175. src.SampleRate, compositeRate, src.Stats().Capacity)
  176. }
  177. // StreamSource returns the live audio stream source, or nil.
  178. // Used by the control server for stats and HTTP audio ingest.
  179. func (e *Engine) StreamSource() *audio.StreamSource {
  180. return e.streamSrc
  181. }
  182. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  183. deviceRate := cfg.EffectiveDeviceRate()
  184. compositeRate := float64(cfg.FM.CompositeRateHz)
  185. if compositeRate <= 0 {
  186. compositeRate = 228000
  187. }
  188. var upsampler *dsp.FMUpsampler
  189. if deviceRate > compositeRate*1.001 {
  190. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  191. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  192. // This halves CPU load compared to running all DSP at deviceRate.
  193. cfg.FM.FMModulationEnabled = false
  194. maxDev := cfg.FM.MaxDeviationHz
  195. if maxDev <= 0 {
  196. maxDev = 75000
  197. }
  198. // mpxGain scales the FM deviation to compensate for hardware
  199. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  200. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  201. maxDev *= cfg.FM.MpxGain
  202. }
  203. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  204. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  205. compositeRate, deviceRate, deviceRate/compositeRate)
  206. } else {
  207. // Same-rate: entire DSP chain runs at deviceRate.
  208. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  209. if deviceRate > 0 {
  210. cfg.FM.CompositeRateHz = int(deviceRate)
  211. }
  212. cfg.FM.FMModulationEnabled = true
  213. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  214. }
  215. engine := &Engine{
  216. cfg: cfg,
  217. driver: driver,
  218. generator: offpkg.NewGenerator(cfg),
  219. upsampler: upsampler,
  220. chunkDuration: 50 * time.Millisecond,
  221. deviceRate: deviceRate,
  222. state: EngineIdle,
  223. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  224. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  225. transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
  226. }
  227. initFreqHz := cfg.FM.FrequencyMHz * 1e6
  228. engine.appliedFreqHz.Store(math.Float64bits(initFreqHz))
  229. engine.setRuntimeState(RuntimeStateIdle)
  230. return engine
  231. }
  232. func (e *Engine) SetChunkDuration(d time.Duration) {
  233. e.chunkDuration = d
  234. }
  // LiveConfigUpdate is a partial patch of hot-reloadable parameters, applied
  // by Engine.UpdateConfig. A nil field means "leave unchanged". UpdateConfig
  // checks the ranges noted below before applying anything.
  type LiveConfigUpdate struct {
  	FrequencyMHz   *float64 // 65–110; retuned between chunks
  	OutputDrive    *float64 // 0–10
  	StereoEnabled  *bool
  	PilotLevel     *float64 // 0–0.2
  	RDSInjection   *float64 // 0–0.15
  	RDSEnabled     *bool
  	LimiterEnabled *bool
  	LimiterCeiling *float64 // 0–2
  	PS             *string  // RDS programme service name
  	RadioText      *string  // RDS RadioText
  }
  249. // UpdateConfig applies live parameter changes without restarting the engine.
  250. // DSP params take effect at the next chunk boundary (~50ms max).
  251. // Frequency changes are applied between chunks via driver.Tune().
  252. // RDS text updates are applied at the next RDS group boundary (~88ms).
  253. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  254. // --- Validate ---
  255. if u.FrequencyMHz != nil {
  256. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  257. return fmt.Errorf("frequencyMHz out of range (65-110)")
  258. }
  259. }
  260. if u.OutputDrive != nil {
  261. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  262. return fmt.Errorf("outputDrive out of range (0-10)")
  263. }
  264. }
  265. if u.PilotLevel != nil {
  266. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  267. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  268. }
  269. }
  270. if u.RDSInjection != nil {
  271. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  272. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  273. }
  274. }
  275. if u.LimiterCeiling != nil {
  276. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  277. return fmt.Errorf("limiterCeiling out of range (0-2)")
  278. }
  279. }
  280. // --- Frequency: store for run loop to apply via driver.Tune() ---
  281. if u.FrequencyMHz != nil {
  282. freqHz := *u.FrequencyMHz * 1e6
  283. e.pendingFreq.Store(&freqHz)
  284. }
  285. // --- RDS text: forward to encoder atomics ---
  286. if u.PS != nil || u.RadioText != nil {
  287. if enc := e.generator.RDSEncoder(); enc != nil {
  288. ps, rt := "", ""
  289. if u.PS != nil {
  290. ps = *u.PS
  291. }
  292. if u.RadioText != nil {
  293. rt = *u.RadioText
  294. }
  295. enc.UpdateText(ps, rt)
  296. }
  297. }
  298. // --- DSP params: build new LiveParams from current + patch ---
  299. // Read current, apply deltas, store new
  300. current := e.generator.CurrentLiveParams()
  301. next := current // copy
  302. if u.OutputDrive != nil {
  303. next.OutputDrive = *u.OutputDrive
  304. }
  305. if u.StereoEnabled != nil {
  306. next.StereoEnabled = *u.StereoEnabled
  307. }
  308. if u.PilotLevel != nil {
  309. next.PilotLevel = *u.PilotLevel
  310. }
  311. if u.RDSInjection != nil {
  312. next.RDSInjection = *u.RDSInjection
  313. }
  314. if u.RDSEnabled != nil {
  315. next.RDSEnabled = *u.RDSEnabled
  316. }
  317. if u.LimiterEnabled != nil {
  318. next.LimiterEnabled = *u.LimiterEnabled
  319. }
  320. if u.LimiterCeiling != nil {
  321. next.LimiterCeiling = *u.LimiterCeiling
  322. }
  323. e.generator.UpdateLive(next)
  324. return nil
  325. }
  326. func (e *Engine) Start(ctx context.Context) error {
  327. e.mu.Lock()
  328. if e.state != EngineIdle {
  329. e.mu.Unlock()
  330. return fmt.Errorf("engine already in state %s", e.state)
  331. }
  332. if err := e.driver.Start(ctx); err != nil {
  333. e.mu.Unlock()
  334. return fmt.Errorf("driver start: %w", err)
  335. }
  336. runCtx, cancel := context.WithCancel(ctx)
  337. e.cancel = cancel
  338. e.state = EngineRunning
  339. e.setRuntimeState(RuntimeStateArming)
  340. e.startedAt = time.Now()
  341. e.wg.Add(1)
  342. e.mu.Unlock()
  343. go e.run(runCtx)
  344. return nil
  345. }
  346. func (e *Engine) Stop(ctx context.Context) error {
  347. e.mu.Lock()
  348. if e.state != EngineRunning {
  349. e.mu.Unlock()
  350. return nil
  351. }
  352. e.state = EngineStopping
  353. e.setRuntimeState(RuntimeStateStopping)
  354. e.cancel()
  355. e.mu.Unlock()
  356. // Wait for run() goroutine to exit — deterministic, no guessing
  357. e.wg.Wait()
  358. if err := e.driver.Flush(ctx); err != nil {
  359. return err
  360. }
  361. if err := e.driver.Stop(ctx); err != nil {
  362. return err
  363. }
  364. e.mu.Lock()
  365. e.state = EngineIdle
  366. e.setRuntimeState(RuntimeStateIdle)
  367. e.mu.Unlock()
  368. return nil
  369. }
  370. func (e *Engine) Stats() EngineStats {
  371. e.mu.Lock()
  372. state := e.state
  373. startedAt := e.startedAt
  374. e.mu.Unlock()
  375. var uptime float64
  376. if state == EngineRunning {
  377. uptime = time.Since(startedAt).Seconds()
  378. }
  379. errVal, _ := e.lastError.Load().(string)
  380. queue := e.frameQueue.Stats()
  381. lateBuffers := e.lateBuffers.Load()
  382. hasRecentLateBuffers := e.hasRecentLateBuffers()
  383. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  384. lastFault := e.lastFaultEvent()
  385. return EngineStats{
  386. State: string(e.currentRuntimeState()),
  387. RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
  388. ChunksProduced: e.chunksProduced.Load(),
  389. TotalSamples: e.totalSamples.Load(),
  390. Underruns: e.underruns.Load(),
  391. LateBuffers: lateBuffers,
  392. LastError: errVal,
  393. UptimeSeconds: uptime,
  394. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  395. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  396. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  397. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  398. MaxQueueResidenceMs: durationMs(e.maxQueueResidenceNs.Load()),
  399. MaxPipelineLatencyMs: durationMs(e.maxPipelineNs.Load()),
  400. Queue: queue,
  401. RuntimeIndicator: ri,
  402. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  403. AppliedFrequencyMHz: e.appliedFrequencyMHz(),
  404. LastFault: lastFault,
  405. DegradedTransitions: e.degradedTransitions.Load(),
  406. MutedTransitions: e.mutedTransitions.Load(),
  407. FaultedTransitions: e.faultedTransitions.Load(),
  408. FaultCount: e.faultEvents.Load(),
  409. FaultHistory: e.FaultHistory(),
  410. TransitionHistory: e.TransitionHistory(),
  411. }
  412. }
  413. func (e *Engine) appliedFrequencyMHz() float64 {
  414. bits := e.appliedFreqHz.Load()
  415. return math.Float64frombits(bits) / 1e6
  416. }
  417. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  418. switch {
  419. case queueHealth == output.QueueHealthCritical:
  420. return RuntimeIndicatorQueueCritical
  421. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  422. return RuntimeIndicatorDegraded
  423. default:
  424. return RuntimeIndicatorNormal
  425. }
  426. }
  427. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  428. switch {
  429. case queueHealth == output.QueueHealthCritical:
  430. return "queue health critical"
  431. case recentLateBuffers:
  432. return "late buffers"
  433. case queueHealth == output.QueueHealthLow:
  434. return "queue health low"
  435. default:
  436. return ""
  437. }
  438. }
  439. func runtimeStateSeverity(state RuntimeState) string {
  440. switch state {
  441. case RuntimeStateRunning:
  442. return "ok"
  443. case RuntimeStateDegraded, RuntimeStateMuted:
  444. return "warn"
  445. case RuntimeStateFaulted:
  446. return "err"
  447. default:
  448. return "info"
  449. }
  450. }
  451. func (e *Engine) run(ctx context.Context) {
  452. e.setRuntimeState(RuntimeStatePrebuffering)
  453. e.wg.Add(1)
  454. go e.writerLoop(ctx)
  455. defer e.wg.Done()
  456. for {
  457. if ctx.Err() != nil {
  458. return
  459. }
  460. // Apply pending frequency change between chunks
  461. if pf := e.pendingFreq.Swap(nil); pf != nil {
  462. if err := e.driver.Tune(ctx, *pf); err != nil {
  463. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  464. } else {
  465. e.appliedFreqHz.Store(math.Float64bits(*pf))
  466. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  467. }
  468. }
  469. t0 := time.Now()
  470. frame := e.generator.GenerateFrame(e.chunkDuration)
  471. frame.GeneratedAt = t0
  472. t1 := time.Now()
  473. if e.upsampler != nil {
  474. frame = e.upsampler.Process(frame)
  475. frame.GeneratedAt = t0
  476. }
  477. t2 := time.Now()
  478. genDur := t1.Sub(t0)
  479. upDur := t2.Sub(t1)
  480. updateMaxDuration(&e.maxGenerateNs, genDur)
  481. updateMaxDuration(&e.maxUpsampleNs, upDur)
  482. enqueued := cloneFrame(frame)
  483. enqueued.EnqueuedAt = time.Now()
  484. if enqueued == nil {
  485. e.lastError.Store("engine: frame clone failed")
  486. e.underruns.Add(1)
  487. continue
  488. }
  489. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  490. if ctx.Err() != nil {
  491. return
  492. }
  493. if errors.Is(err, output.ErrFrameQueueClosed) {
  494. return
  495. }
  496. e.lastError.Store(err.Error())
  497. e.underruns.Add(1)
  498. select {
  499. case <-time.After(e.chunkDuration):
  500. case <-ctx.Done():
  501. return
  502. }
  503. continue
  504. }
  505. queueStats := e.frameQueue.Stats()
  506. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  507. }
  508. }
  509. func (e *Engine) writerLoop(ctx context.Context) {
  510. defer e.wg.Done()
  511. for {
  512. frame, err := e.frameQueue.Pop(ctx)
  513. if err != nil {
  514. if ctx.Err() != nil {
  515. return
  516. }
  517. if errors.Is(err, output.ErrFrameQueueClosed) {
  518. return
  519. }
  520. e.lastError.Store(err.Error())
  521. e.underruns.Add(1)
  522. continue
  523. }
  524. frame.DequeuedAt = time.Now()
  525. queueResidence := time.Duration(0)
  526. if !frame.EnqueuedAt.IsZero() {
  527. queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt)
  528. }
  529. writeStart := time.Now()
  530. frame.WriteStartedAt = writeStart
  531. n, err := e.driver.Write(ctx, frame)
  532. writeDur := time.Since(writeStart)
  533. pipelineLatency := writeDur
  534. if !frame.GeneratedAt.IsZero() {
  535. pipelineLatency = time.Since(frame.GeneratedAt)
  536. }
  537. updateMaxDuration(&e.maxWriteNs, writeDur)
  538. updateMaxDuration(&e.maxQueueResidenceNs, queueResidence)
  539. updateMaxDuration(&e.maxPipelineNs, pipelineLatency)
  540. updateMaxDuration(&e.maxCycleNs, writeDur)
  541. queueStats := e.frameQueue.Stats()
  542. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  543. lateOver := writeDur - e.chunkDuration
  544. if lateOver > writeLateTolerance {
  545. late := e.lateBuffers.Add(1)
  546. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  547. if late <= 5 || late%20 == 0 {
  548. log.Printf("TX LATE: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s",
  549. writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
  550. }
  551. }
  552. if err != nil {
  553. if ctx.Err() != nil {
  554. return
  555. }
  556. e.recordFault(FaultReasonWriteTimeout, FaultSeverityWarn, fmt.Sprintf("driver write error: %v", err))
  557. e.lastError.Store(err.Error())
  558. e.underruns.Add(1)
  559. select {
  560. case <-time.After(e.chunkDuration):
  561. case <-ctx.Done():
  562. return
  563. }
  564. continue
  565. }
  566. e.chunksProduced.Add(1)
  567. e.totalSamples.Add(uint64(n))
  568. }
  569. }
  570. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  571. if src == nil {
  572. return nil
  573. }
  574. samples := make([]output.IQSample, len(src.Samples))
  575. copy(samples, src.Samples)
  576. return &output.CompositeFrame{
  577. Samples: samples,
  578. SampleRateHz: src.SampleRateHz,
  579. Timestamp: src.Timestamp,
  580. GeneratedAt: src.GeneratedAt,
  581. EnqueuedAt: src.EnqueuedAt,
  582. DequeuedAt: src.DequeuedAt,
  583. WriteStartedAt: src.WriteStartedAt,
  584. Sequence: src.Sequence,
  585. }
  586. }
  587. func (e *Engine) setRuntimeState(state RuntimeState) {
  588. now := time.Now()
  589. prev := e.currentRuntimeState()
  590. if prev != state {
  591. e.recordRuntimeTransition(prev, state, now)
  592. switch state {
  593. case RuntimeStateDegraded:
  594. e.degradedTransitions.Add(1)
  595. case RuntimeStateMuted:
  596. e.mutedTransitions.Add(1)
  597. case RuntimeStateFaulted:
  598. e.faultedTransitions.Add(1)
  599. }
  600. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  601. } else if e.runtimeStateEnteredAt.Load() == 0 {
  602. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  603. }
  604. e.runtimeState.Store(state)
  605. }
  606. func (e *Engine) currentRuntimeState() RuntimeState {
  607. if v := e.runtimeState.Load(); v != nil {
  608. if rs, ok := v.(RuntimeState); ok {
  609. return rs
  610. }
  611. }
  612. return RuntimeStateIdle
  613. }
  614. func (e *Engine) runtimeStateDurationSeconds() float64 {
  615. if ts := e.runtimeStateEnteredAt.Load(); ts != 0 {
  616. return time.Since(time.Unix(0, int64(ts))).Seconds()
  617. }
  618. return 0
  619. }
  620. func (e *Engine) hasRecentLateBuffers() bool {
  621. lateAlertAt := e.lateBufferAlertAt.Load()
  622. if lateAlertAt == 0 {
  623. return false
  624. }
  625. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  626. }
  627. func (e *Engine) lastFaultEvent() *FaultEvent {
  628. return copyFaultEvent(e.loadLastFault())
  629. }
  630. // LastFault exposes the most recent captured fault, if any.
  631. func (e *Engine) LastFault() *FaultEvent {
  632. return e.lastFaultEvent()
  633. }
  634. func (e *Engine) FaultHistory() []FaultEvent {
  635. e.faultHistoryMu.Lock()
  636. defer e.faultHistoryMu.Unlock()
  637. history := make([]FaultEvent, len(e.faultHistory))
  638. copy(history, e.faultHistory)
  639. return history
  640. }
  641. func (e *Engine) TransitionHistory() []RuntimeTransition {
  642. e.transitionHistoryMu.Lock()
  643. defer e.transitionHistoryMu.Unlock()
  644. history := make([]RuntimeTransition, len(e.transitionHistory))
  645. copy(history, e.transitionHistory)
  646. return history
  647. }
  648. func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) {
  649. if when.IsZero() {
  650. when = time.Now()
  651. }
  652. ev := RuntimeTransition{
  653. Time: when,
  654. From: from,
  655. To: to,
  656. Severity: runtimeStateSeverity(to),
  657. }
  658. e.transitionHistoryMu.Lock()
  659. defer e.transitionHistoryMu.Unlock()
  660. if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity {
  661. copy(e.transitionHistory, e.transitionHistory[1:])
  662. e.transitionHistory[len(e.transitionHistory)-1] = ev
  663. return
  664. }
  665. e.transitionHistory = append(e.transitionHistory, ev)
  666. }
  667. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  668. if reason == "" {
  669. reason = FaultReasonUnknown
  670. }
  671. now := time.Now()
  672. if last := e.loadLastFault(); last != nil {
  673. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  674. return
  675. }
  676. }
  677. ev := &FaultEvent{
  678. Time: now,
  679. Reason: reason,
  680. Severity: severity,
  681. Message: message,
  682. }
  683. e.lastFault.Store(ev)
  684. e.appendFaultHistory(ev)
  685. e.faultEvents.Add(1)
  686. }
  687. func (e *Engine) loadLastFault() *FaultEvent {
  688. if v := e.lastFault.Load(); v != nil {
  689. if ev, ok := v.(*FaultEvent); ok {
  690. return ev
  691. }
  692. }
  693. return nil
  694. }
  695. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  696. if source == nil {
  697. return nil
  698. }
  699. copy := *source
  700. return &copy
  701. }
  702. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  703. e.faultHistoryMu.Lock()
  704. defer e.faultHistoryMu.Unlock()
  705. if len(e.faultHistory) >= faultHistoryCapacity {
  706. copy(e.faultHistory, e.faultHistory[1:])
  707. e.faultHistory[len(e.faultHistory)-1] = *ev
  708. return
  709. }
  710. e.faultHistory = append(e.faultHistory, *ev)
  711. }
  712. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  713. state := e.currentRuntimeState()
  714. switch state {
  715. case RuntimeStateStopping, RuntimeStateFaulted:
  716. return
  717. case RuntimeStateMuted:
  718. if queue.Health == output.QueueHealthCritical {
  719. if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
  720. e.mutedFaultStreak.Store(0)
  721. e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
  722. fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
  723. e.setRuntimeState(RuntimeStateFaulted)
  724. return
  725. }
  726. } else {
  727. e.mutedFaultStreak.Store(0)
  728. }
  729. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  730. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  731. e.mutedRecoveryStreak.Store(0)
  732. e.mutedFaultStreak.Store(0)
  733. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  734. fmt.Sprintf("queue healthy for %d checks after mute", count))
  735. e.setRuntimeState(RuntimeStateDegraded)
  736. }
  737. } else {
  738. e.mutedRecoveryStreak.Store(0)
  739. }
  740. return
  741. }
  742. if state == RuntimeStatePrebuffering {
  743. if queue.Depth >= 1 {
  744. e.setRuntimeState(RuntimeStateRunning)
  745. }
  746. return
  747. }
  748. critical := queue.Health == output.QueueHealthCritical
  749. if critical {
  750. count := e.criticalStreak.Add(1)
  751. if count >= queueMutedStreakThreshold {
  752. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  753. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  754. e.setRuntimeState(RuntimeStateMuted)
  755. return
  756. }
  757. if count >= queueCriticalStreakThreshold {
  758. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  759. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  760. e.setRuntimeState(RuntimeStateDegraded)
  761. return
  762. }
  763. } else {
  764. e.criticalStreak.Store(0)
  765. }
  766. if hasLateBuffers {
  767. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  768. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  769. e.setRuntimeState(RuntimeStateDegraded)
  770. return
  771. }
  772. e.setRuntimeState(RuntimeStateRunning)
  773. }
  774. // ResetFault attempts to move the engine out of the faulted state.
  775. func (e *Engine) ResetFault() error {
  776. state := e.currentRuntimeState()
  777. if state != RuntimeStateFaulted {
  778. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  779. }
  780. e.criticalStreak.Store(0)
  781. e.mutedRecoveryStreak.Store(0)
  782. e.mutedFaultStreak.Store(0)
  783. e.setRuntimeState(RuntimeStateDegraded)
  784. return nil
  785. }