Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

880 lines
26KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "math"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/jan/fm-rds-tx/internal/audio"
  12. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  13. "github.com/jan/fm-rds-tx/internal/dsp"
  14. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  15. "github.com/jan/fm-rds-tx/internal/output"
  16. "github.com/jan/fm-rds-tx/internal/platform"
  17. )
  18. type EngineState int
  19. const (
  20. EngineIdle EngineState = iota
  21. EngineRunning
  22. EngineStopping
  23. )
  24. func (s EngineState) String() string {
  25. switch s {
  26. case EngineIdle:
  27. return "idle"
  28. case EngineRunning:
  29. return "running"
  30. case EngineStopping:
  31. return "stopping"
  32. default:
  33. return "unknown"
  34. }
  35. }
  36. type RuntimeState string
  37. const (
  38. RuntimeStateIdle RuntimeState = "idle"
  39. RuntimeStateArming RuntimeState = "arming"
  40. RuntimeStatePrebuffering RuntimeState = "prebuffering"
  41. RuntimeStateRunning RuntimeState = "running"
  42. RuntimeStateDegraded RuntimeState = "degraded"
  43. RuntimeStateMuted RuntimeState = "muted"
  44. RuntimeStateFaulted RuntimeState = "faulted"
  45. RuntimeStateStopping RuntimeState = "stopping"
  46. )
  47. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  48. v := uint64(d)
  49. for {
  50. cur := dst.Load()
  51. if v <= cur {
  52. return
  53. }
  54. if dst.CompareAndSwap(cur, v) {
  55. return
  56. }
  57. }
  58. }
  59. func durationMs(ns uint64) float64 {
  60. return float64(ns) / float64(time.Millisecond)
  61. }
  62. type EngineStats struct {
  63. State string `json:"state"`
  64. RuntimeStateDurationSeconds float64 `json:"runtimeStateDurationSeconds"`
  65. ChunksProduced uint64 `json:"chunksProduced"`
  66. TotalSamples uint64 `json:"totalSamples"`
  67. Underruns uint64 `json:"underruns"`
  68. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  69. LastError string `json:"lastError,omitempty"`
  70. UptimeSeconds float64 `json:"uptimeSeconds"`
  71. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  72. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  73. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  74. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  75. MaxQueueResidenceMs float64 `json:"maxQueueResidenceMs,omitempty"`
  76. MaxPipelineLatencyMs float64 `json:"maxPipelineLatencyMs,omitempty"`
  77. Queue output.QueueStats `json:"queue"`
  78. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  79. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  80. AppliedFrequencyMHz float64 `json:"appliedFrequencyMHz"`
  81. LastFault *FaultEvent `json:"lastFault,omitempty"`
  82. DegradedTransitions uint64 `json:"degradedTransitions"`
  83. MutedTransitions uint64 `json:"mutedTransitions"`
  84. FaultedTransitions uint64 `json:"faultedTransitions"`
  85. FaultCount uint64 `json:"faultCount"`
  86. FaultHistory []FaultEvent `json:"faultHistory,omitempty"`
  87. TransitionHistory []RuntimeTransition `json:"transitionHistory,omitempty"`
  88. }
  89. type RuntimeIndicator string
  90. const (
  91. RuntimeIndicatorNormal RuntimeIndicator = "normal"
  92. RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
  93. RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
  94. )
  95. type RuntimeTransition struct {
  96. Time time.Time `json:"time"`
  97. From RuntimeState `json:"from"`
  98. To RuntimeState `json:"to"`
  99. Severity string `json:"severity"`
  100. }
  101. const (
  102. lateBufferIndicatorWindow = 2 * time.Second
  103. writeLateTolerance = 10 * time.Millisecond
  104. queueCriticalStreakThreshold = 3
  105. queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
  106. queueMutedRecoveryThreshold = queueCriticalStreakThreshold
  107. queueFaultedStreakThreshold = queueCriticalStreakThreshold
  108. faultRepeatWindow = 1 * time.Second
  109. lateBufferStreakThreshold = 3 // consecutive late writes required before alerting
  110. faultHistoryCapacity = 8
  111. runtimeTransitionHistoryCapacity = 8
  112. )
  113. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  114. // resamples to device rate, and pushes to hardware in a tight loop.
  115. // The hardware buffer_push call is blocking — it returns when the hardware
  116. // has consumed the previous buffer and is ready for the next one.
  117. // This naturally paces the loop to real-time without a ticker.
  118. type Engine struct {
  119. cfg cfgpkg.Config
  120. driver platform.SoapyDriver
  121. generator *offpkg.Generator
  122. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  123. chunkDuration time.Duration
  124. deviceRate float64
  125. frameQueue *output.FrameQueue
  126. mu sync.Mutex
  127. state EngineState
  128. cancel context.CancelFunc
  129. startedAt time.Time
  130. wg sync.WaitGroup
  131. runtimeState atomic.Value
  132. chunksProduced atomic.Uint64
  133. totalSamples atomic.Uint64
  134. underruns atomic.Uint64
  135. lateBuffers atomic.Uint64
  136. lateBufferAlertAt atomic.Uint64
  137. lateBufferStreak atomic.Uint64 // consecutive late writes; reset on clean write
  138. criticalStreak atomic.Uint64
  139. mutedRecoveryStreak atomic.Uint64
  140. mutedFaultStreak atomic.Uint64
  141. maxCycleNs atomic.Uint64
  142. maxGenerateNs atomic.Uint64
  143. maxUpsampleNs atomic.Uint64
  144. maxWriteNs atomic.Uint64
  145. maxQueueResidenceNs atomic.Uint64
  146. maxPipelineNs atomic.Uint64
  147. lastError atomic.Value // string
  148. lastFault atomic.Value // *FaultEvent
  149. faultHistoryMu sync.Mutex
  150. faultHistory []FaultEvent
  151. transitionHistoryMu sync.Mutex
  152. transitionHistory []RuntimeTransition
  153. degradedTransitions atomic.Uint64
  154. mutedTransitions atomic.Uint64
  155. faultedTransitions atomic.Uint64
  156. faultEvents atomic.Uint64
  157. runtimeStateEnteredAt atomic.Uint64
  158. // Live config: pending frequency change, applied between chunks
  159. pendingFreq atomic.Pointer[float64]
  160. // Most recently tuned frequency (Hz)
  161. appliedFreqHz atomic.Uint64
  162. // Live audio stream (optional)
  163. streamSrc *audio.StreamSource
  164. }
  165. // SetStreamSource configures a live audio stream as the audio source.
  166. // Must be called before Start(). The StreamResampler is created internally
  167. // to convert from the stream's sample rate to the DSP composite rate.
  168. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  169. e.streamSrc = src
  170. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  171. if compositeRate <= 0 {
  172. compositeRate = 228000
  173. }
  174. resampler := audio.NewStreamResampler(src, compositeRate)
  175. e.generator.SetExternalSource(resampler)
  176. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  177. src.SampleRate, compositeRate, src.Stats().Capacity)
  178. }
  179. // StreamSource returns the live audio stream source, or nil.
  180. // Used by the control server for stats and HTTP audio ingest.
  181. func (e *Engine) StreamSource() *audio.StreamSource {
  182. return e.streamSrc
  183. }
  184. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  185. deviceRate := cfg.EffectiveDeviceRate()
  186. compositeRate := float64(cfg.FM.CompositeRateHz)
  187. if compositeRate <= 0 {
  188. compositeRate = 228000
  189. }
  190. var upsampler *dsp.FMUpsampler
  191. if deviceRate > compositeRate*1.001 {
  192. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  193. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  194. // This halves CPU load compared to running all DSP at deviceRate.
  195. cfg.FM.FMModulationEnabled = false
  196. maxDev := cfg.FM.MaxDeviationHz
  197. if maxDev <= 0 {
  198. maxDev = 75000
  199. }
  200. // mpxGain scales the FM deviation to compensate for hardware
  201. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  202. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  203. maxDev *= cfg.FM.MpxGain
  204. }
  205. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  206. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  207. compositeRate, deviceRate, deviceRate/compositeRate)
  208. } else {
  209. // Same-rate: entire DSP chain runs at deviceRate.
  210. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  211. if deviceRate > 0 {
  212. cfg.FM.CompositeRateHz = int(deviceRate)
  213. }
  214. cfg.FM.FMModulationEnabled = true
  215. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  216. }
  217. engine := &Engine{
  218. cfg: cfg,
  219. driver: driver,
  220. generator: offpkg.NewGenerator(cfg),
  221. upsampler: upsampler,
  222. chunkDuration: 50 * time.Millisecond,
  223. deviceRate: deviceRate,
  224. state: EngineIdle,
  225. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  226. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  227. transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
  228. }
  229. initFreqHz := cfg.FM.FrequencyMHz * 1e6
  230. engine.appliedFreqHz.Store(math.Float64bits(initFreqHz))
  231. engine.setRuntimeState(RuntimeStateIdle)
  232. return engine
  233. }
  234. func (e *Engine) SetChunkDuration(d time.Duration) {
  235. e.chunkDuration = d
  236. }
  237. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  238. // nil pointers mean "no change". Validated before applying.
  239. type LiveConfigUpdate struct {
  240. FrequencyMHz *float64
  241. OutputDrive *float64
  242. StereoEnabled *bool
  243. PilotLevel *float64
  244. RDSInjection *float64
  245. RDSEnabled *bool
  246. LimiterEnabled *bool
  247. LimiterCeiling *float64
  248. PS *string
  249. RadioText *string
  250. }
  251. // UpdateConfig applies live parameter changes without restarting the engine.
  252. // DSP params take effect at the next chunk boundary (~50ms max).
  253. // Frequency changes are applied between chunks via driver.Tune().
  254. // RDS text updates are applied at the next RDS group boundary (~88ms).
  255. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  256. // --- Validate ---
  257. if u.FrequencyMHz != nil {
  258. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  259. return fmt.Errorf("frequencyMHz out of range (65-110)")
  260. }
  261. }
  262. if u.OutputDrive != nil {
  263. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  264. return fmt.Errorf("outputDrive out of range (0-10)")
  265. }
  266. }
  267. if u.PilotLevel != nil {
  268. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  269. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  270. }
  271. }
  272. if u.RDSInjection != nil {
  273. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  274. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  275. }
  276. }
  277. if u.LimiterCeiling != nil {
  278. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  279. return fmt.Errorf("limiterCeiling out of range (0-2)")
  280. }
  281. }
  282. // --- Frequency: store for run loop to apply via driver.Tune() ---
  283. if u.FrequencyMHz != nil {
  284. freqHz := *u.FrequencyMHz * 1e6
  285. e.pendingFreq.Store(&freqHz)
  286. }
  287. // --- RDS text: forward to encoder atomics ---
  288. if u.PS != nil || u.RadioText != nil {
  289. if enc := e.generator.RDSEncoder(); enc != nil {
  290. ps, rt := "", ""
  291. if u.PS != nil {
  292. ps = *u.PS
  293. }
  294. if u.RadioText != nil {
  295. rt = *u.RadioText
  296. }
  297. enc.UpdateText(ps, rt)
  298. }
  299. }
  300. // --- DSP params: build new LiveParams from current + patch ---
  301. // Read current, apply deltas, store new
  302. current := e.generator.CurrentLiveParams()
  303. next := current // copy
  304. if u.OutputDrive != nil {
  305. next.OutputDrive = *u.OutputDrive
  306. }
  307. if u.StereoEnabled != nil {
  308. next.StereoEnabled = *u.StereoEnabled
  309. }
  310. if u.PilotLevel != nil {
  311. next.PilotLevel = *u.PilotLevel
  312. }
  313. if u.RDSInjection != nil {
  314. next.RDSInjection = *u.RDSInjection
  315. }
  316. if u.RDSEnabled != nil {
  317. next.RDSEnabled = *u.RDSEnabled
  318. }
  319. if u.LimiterEnabled != nil {
  320. next.LimiterEnabled = *u.LimiterEnabled
  321. }
  322. if u.LimiterCeiling != nil {
  323. next.LimiterCeiling = *u.LimiterCeiling
  324. }
  325. e.generator.UpdateLive(next)
  326. return nil
  327. }
  328. func (e *Engine) Start(ctx context.Context) error {
  329. e.mu.Lock()
  330. if e.state != EngineIdle {
  331. e.mu.Unlock()
  332. return fmt.Errorf("engine already in state %s", e.state)
  333. }
  334. if err := e.driver.Start(ctx); err != nil {
  335. e.mu.Unlock()
  336. return fmt.Errorf("driver start: %w", err)
  337. }
  338. runCtx, cancel := context.WithCancel(ctx)
  339. e.cancel = cancel
  340. e.state = EngineRunning
  341. e.setRuntimeState(RuntimeStateArming)
  342. e.startedAt = time.Now()
  343. e.wg.Add(1)
  344. e.mu.Unlock()
  345. go e.run(runCtx)
  346. return nil
  347. }
  348. func (e *Engine) Stop(ctx context.Context) error {
  349. e.mu.Lock()
  350. if e.state != EngineRunning {
  351. e.mu.Unlock()
  352. return nil
  353. }
  354. e.state = EngineStopping
  355. e.setRuntimeState(RuntimeStateStopping)
  356. e.cancel()
  357. e.mu.Unlock()
  358. // Wait for run() goroutine to exit — deterministic, no guessing
  359. e.wg.Wait()
  360. if err := e.driver.Flush(ctx); err != nil {
  361. return err
  362. }
  363. if err := e.driver.Stop(ctx); err != nil {
  364. return err
  365. }
  366. e.mu.Lock()
  367. e.state = EngineIdle
  368. e.setRuntimeState(RuntimeStateIdle)
  369. e.mu.Unlock()
  370. return nil
  371. }
  372. func (e *Engine) Stats() EngineStats {
  373. e.mu.Lock()
  374. state := e.state
  375. startedAt := e.startedAt
  376. e.mu.Unlock()
  377. var uptime float64
  378. if state == EngineRunning {
  379. uptime = time.Since(startedAt).Seconds()
  380. }
  381. errVal, _ := e.lastError.Load().(string)
  382. queue := e.frameQueue.Stats()
  383. lateBuffers := e.lateBuffers.Load()
  384. hasRecentLateBuffers := e.hasRecentLateBuffers()
  385. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  386. lastFault := e.lastFaultEvent()
  387. return EngineStats{
  388. State: string(e.currentRuntimeState()),
  389. RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
  390. ChunksProduced: e.chunksProduced.Load(),
  391. TotalSamples: e.totalSamples.Load(),
  392. Underruns: e.underruns.Load(),
  393. LateBuffers: lateBuffers,
  394. LastError: errVal,
  395. UptimeSeconds: uptime,
  396. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  397. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  398. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  399. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  400. MaxQueueResidenceMs: durationMs(e.maxQueueResidenceNs.Load()),
  401. MaxPipelineLatencyMs: durationMs(e.maxPipelineNs.Load()),
  402. Queue: queue,
  403. RuntimeIndicator: ri,
  404. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  405. AppliedFrequencyMHz: e.appliedFrequencyMHz(),
  406. LastFault: lastFault,
  407. DegradedTransitions: e.degradedTransitions.Load(),
  408. MutedTransitions: e.mutedTransitions.Load(),
  409. FaultedTransitions: e.faultedTransitions.Load(),
  410. FaultCount: e.faultEvents.Load(),
  411. FaultHistory: e.FaultHistory(),
  412. TransitionHistory: e.TransitionHistory(),
  413. }
  414. }
  415. func (e *Engine) appliedFrequencyMHz() float64 {
  416. bits := e.appliedFreqHz.Load()
  417. return math.Float64frombits(bits) / 1e6
  418. }
  419. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  420. switch {
  421. case queueHealth == output.QueueHealthCritical:
  422. return RuntimeIndicatorQueueCritical
  423. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  424. return RuntimeIndicatorDegraded
  425. default:
  426. return RuntimeIndicatorNormal
  427. }
  428. }
  429. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  430. switch {
  431. case queueHealth == output.QueueHealthCritical:
  432. return "queue health critical"
  433. case recentLateBuffers:
  434. return "late buffers"
  435. case queueHealth == output.QueueHealthLow:
  436. return "queue health low"
  437. default:
  438. return ""
  439. }
  440. }
  441. func runtimeStateSeverity(state RuntimeState) string {
  442. switch state {
  443. case RuntimeStateRunning:
  444. return "ok"
  445. case RuntimeStateDegraded, RuntimeStateMuted:
  446. return "warn"
  447. case RuntimeStateFaulted:
  448. return "err"
  449. default:
  450. return "info"
  451. }
  452. }
  453. func (e *Engine) run(ctx context.Context) {
  454. e.setRuntimeState(RuntimeStatePrebuffering)
  455. e.wg.Add(1)
  456. go e.writerLoop(ctx)
  457. defer e.wg.Done()
  458. for {
  459. if ctx.Err() != nil {
  460. return
  461. }
  462. // Apply pending frequency change between chunks
  463. if pf := e.pendingFreq.Swap(nil); pf != nil {
  464. if err := e.driver.Tune(ctx, *pf); err != nil {
  465. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  466. } else {
  467. e.appliedFreqHz.Store(math.Float64bits(*pf))
  468. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  469. }
  470. }
  471. t0 := time.Now()
  472. frame := e.generator.GenerateFrame(e.chunkDuration)
  473. frame.GeneratedAt = t0
  474. t1 := time.Now()
  475. if e.upsampler != nil {
  476. frame = e.upsampler.Process(frame)
  477. frame.GeneratedAt = t0
  478. }
  479. t2 := time.Now()
  480. genDur := t1.Sub(t0)
  481. upDur := t2.Sub(t1)
  482. updateMaxDuration(&e.maxGenerateNs, genDur)
  483. updateMaxDuration(&e.maxUpsampleNs, upDur)
  484. enqueued := cloneFrame(frame)
  485. enqueued.EnqueuedAt = time.Now()
  486. if enqueued == nil {
  487. e.lastError.Store("engine: frame clone failed")
  488. e.underruns.Add(1)
  489. continue
  490. }
  491. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  492. if ctx.Err() != nil {
  493. return
  494. }
  495. if errors.Is(err, output.ErrFrameQueueClosed) {
  496. return
  497. }
  498. e.lastError.Store(err.Error())
  499. e.underruns.Add(1)
  500. select {
  501. case <-time.After(e.chunkDuration):
  502. case <-ctx.Done():
  503. return
  504. }
  505. continue
  506. }
  507. queueStats := e.frameQueue.Stats()
  508. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  509. }
  510. }
  511. func (e *Engine) writerLoop(ctx context.Context) {
  512. defer e.wg.Done()
  513. for {
  514. frame, err := e.frameQueue.Pop(ctx)
  515. if err != nil {
  516. if ctx.Err() != nil {
  517. return
  518. }
  519. if errors.Is(err, output.ErrFrameQueueClosed) {
  520. return
  521. }
  522. e.lastError.Store(err.Error())
  523. e.underruns.Add(1)
  524. continue
  525. }
  526. frame.DequeuedAt = time.Now()
  527. queueResidence := time.Duration(0)
  528. if !frame.EnqueuedAt.IsZero() {
  529. queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt)
  530. }
  531. writeStart := time.Now()
  532. frame.WriteStartedAt = writeStart
  533. n, err := e.driver.Write(ctx, frame)
  534. writeDur := time.Since(writeStart)
  535. pipelineLatency := writeDur
  536. if !frame.GeneratedAt.IsZero() {
  537. pipelineLatency = time.Since(frame.GeneratedAt)
  538. }
  539. updateMaxDuration(&e.maxWriteNs, writeDur)
  540. updateMaxDuration(&e.maxQueueResidenceNs, queueResidence)
  541. updateMaxDuration(&e.maxPipelineNs, pipelineLatency)
  542. updateMaxDuration(&e.maxCycleNs, writeDur)
  543. queueStats := e.frameQueue.Stats()
  544. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  545. lateOver := writeDur - e.chunkDuration
  546. if lateOver > writeLateTolerance {
  547. streak := e.lateBufferStreak.Add(1)
  548. late := e.lateBuffers.Add(1)
  549. // Only arm the alert window once the streak threshold is reached.
  550. // Isolated OS-scheduling or USB jitter spikes (single late writes)
  551. // are normal on a loaded system and must not trigger degraded state.
  552. // This mirrors the queue-health streak logic.
  553. if streak >= lateBufferStreakThreshold {
  554. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  555. }
  556. if late <= 5 || late%20 == 0 {
  557. log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s",
  558. streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
  559. }
  560. } else {
  561. // Clean write — reset the consecutive streak so isolated spikes
  562. // never accumulate toward the threshold.
  563. e.lateBufferStreak.Store(0)
  564. }
  565. if err != nil {
  566. if ctx.Err() != nil {
  567. return
  568. }
  569. e.recordFault(FaultReasonWriteTimeout, FaultSeverityWarn, fmt.Sprintf("driver write error: %v", err))
  570. e.lastError.Store(err.Error())
  571. e.underruns.Add(1)
  572. select {
  573. case <-time.After(e.chunkDuration):
  574. case <-ctx.Done():
  575. return
  576. }
  577. continue
  578. }
  579. e.chunksProduced.Add(1)
  580. e.totalSamples.Add(uint64(n))
  581. }
  582. }
  583. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  584. if src == nil {
  585. return nil
  586. }
  587. samples := make([]output.IQSample, len(src.Samples))
  588. copy(samples, src.Samples)
  589. return &output.CompositeFrame{
  590. Samples: samples,
  591. SampleRateHz: src.SampleRateHz,
  592. Timestamp: src.Timestamp,
  593. GeneratedAt: src.GeneratedAt,
  594. EnqueuedAt: src.EnqueuedAt,
  595. DequeuedAt: src.DequeuedAt,
  596. WriteStartedAt: src.WriteStartedAt,
  597. Sequence: src.Sequence,
  598. }
  599. }
  600. func (e *Engine) setRuntimeState(state RuntimeState) {
  601. now := time.Now()
  602. prev := e.currentRuntimeState()
  603. if prev != state {
  604. e.recordRuntimeTransition(prev, state, now)
  605. switch state {
  606. case RuntimeStateDegraded:
  607. e.degradedTransitions.Add(1)
  608. case RuntimeStateMuted:
  609. e.mutedTransitions.Add(1)
  610. case RuntimeStateFaulted:
  611. e.faultedTransitions.Add(1)
  612. }
  613. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  614. } else if e.runtimeStateEnteredAt.Load() == 0 {
  615. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  616. }
  617. e.runtimeState.Store(state)
  618. }
  619. func (e *Engine) currentRuntimeState() RuntimeState {
  620. if v := e.runtimeState.Load(); v != nil {
  621. if rs, ok := v.(RuntimeState); ok {
  622. return rs
  623. }
  624. }
  625. return RuntimeStateIdle
  626. }
  627. func (e *Engine) runtimeStateDurationSeconds() float64 {
  628. if ts := e.runtimeStateEnteredAt.Load(); ts != 0 {
  629. return time.Since(time.Unix(0, int64(ts))).Seconds()
  630. }
  631. return 0
  632. }
  633. func (e *Engine) hasRecentLateBuffers() bool {
  634. lateAlertAt := e.lateBufferAlertAt.Load()
  635. if lateAlertAt == 0 {
  636. return false
  637. }
  638. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  639. }
  640. func (e *Engine) lastFaultEvent() *FaultEvent {
  641. return copyFaultEvent(e.loadLastFault())
  642. }
  643. // LastFault exposes the most recent captured fault, if any.
  644. func (e *Engine) LastFault() *FaultEvent {
  645. return e.lastFaultEvent()
  646. }
  647. func (e *Engine) FaultHistory() []FaultEvent {
  648. e.faultHistoryMu.Lock()
  649. defer e.faultHistoryMu.Unlock()
  650. history := make([]FaultEvent, len(e.faultHistory))
  651. copy(history, e.faultHistory)
  652. return history
  653. }
  654. func (e *Engine) TransitionHistory() []RuntimeTransition {
  655. e.transitionHistoryMu.Lock()
  656. defer e.transitionHistoryMu.Unlock()
  657. history := make([]RuntimeTransition, len(e.transitionHistory))
  658. copy(history, e.transitionHistory)
  659. return history
  660. }
  661. func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) {
  662. if when.IsZero() {
  663. when = time.Now()
  664. }
  665. ev := RuntimeTransition{
  666. Time: when,
  667. From: from,
  668. To: to,
  669. Severity: runtimeStateSeverity(to),
  670. }
  671. e.transitionHistoryMu.Lock()
  672. defer e.transitionHistoryMu.Unlock()
  673. if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity {
  674. copy(e.transitionHistory, e.transitionHistory[1:])
  675. e.transitionHistory[len(e.transitionHistory)-1] = ev
  676. return
  677. }
  678. e.transitionHistory = append(e.transitionHistory, ev)
  679. }
  680. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  681. if reason == "" {
  682. reason = FaultReasonUnknown
  683. }
  684. now := time.Now()
  685. if last := e.loadLastFault(); last != nil {
  686. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  687. return
  688. }
  689. }
  690. ev := &FaultEvent{
  691. Time: now,
  692. Reason: reason,
  693. Severity: severity,
  694. Message: message,
  695. }
  696. e.lastFault.Store(ev)
  697. e.appendFaultHistory(ev)
  698. e.faultEvents.Add(1)
  699. }
  700. func (e *Engine) loadLastFault() *FaultEvent {
  701. if v := e.lastFault.Load(); v != nil {
  702. if ev, ok := v.(*FaultEvent); ok {
  703. return ev
  704. }
  705. }
  706. return nil
  707. }
  708. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  709. if source == nil {
  710. return nil
  711. }
  712. copy := *source
  713. return &copy
  714. }
  715. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  716. e.faultHistoryMu.Lock()
  717. defer e.faultHistoryMu.Unlock()
  718. if len(e.faultHistory) >= faultHistoryCapacity {
  719. copy(e.faultHistory, e.faultHistory[1:])
  720. e.faultHistory[len(e.faultHistory)-1] = *ev
  721. return
  722. }
  723. e.faultHistory = append(e.faultHistory, *ev)
  724. }
  725. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  726. state := e.currentRuntimeState()
  727. switch state {
  728. case RuntimeStateStopping, RuntimeStateFaulted:
  729. return
  730. case RuntimeStateMuted:
  731. if queue.Health == output.QueueHealthCritical {
  732. if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
  733. e.mutedFaultStreak.Store(0)
  734. e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
  735. fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
  736. e.setRuntimeState(RuntimeStateFaulted)
  737. return
  738. }
  739. } else {
  740. e.mutedFaultStreak.Store(0)
  741. }
  742. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  743. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  744. e.mutedRecoveryStreak.Store(0)
  745. e.mutedFaultStreak.Store(0)
  746. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  747. fmt.Sprintf("queue healthy for %d checks after mute", count))
  748. e.setRuntimeState(RuntimeStateDegraded)
  749. }
  750. } else {
  751. e.mutedRecoveryStreak.Store(0)
  752. }
  753. return
  754. }
  755. if state == RuntimeStatePrebuffering {
  756. if queue.Depth >= 1 {
  757. e.setRuntimeState(RuntimeStateRunning)
  758. }
  759. return
  760. }
  761. critical := queue.Health == output.QueueHealthCritical
  762. if critical {
  763. count := e.criticalStreak.Add(1)
  764. if count >= queueMutedStreakThreshold {
  765. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  766. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  767. e.setRuntimeState(RuntimeStateMuted)
  768. return
  769. }
  770. if count >= queueCriticalStreakThreshold {
  771. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  772. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  773. e.setRuntimeState(RuntimeStateDegraded)
  774. return
  775. }
  776. } else {
  777. e.criticalStreak.Store(0)
  778. }
  779. if hasLateBuffers {
  780. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  781. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  782. e.setRuntimeState(RuntimeStateDegraded)
  783. return
  784. }
  785. e.setRuntimeState(RuntimeStateRunning)
  786. }
  787. // ResetFault attempts to move the engine out of the faulted state.
  788. func (e *Engine) ResetFault() error {
  789. state := e.currentRuntimeState()
  790. if state != RuntimeStateFaulted {
  791. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  792. }
  793. e.criticalStreak.Store(0)
  794. e.mutedRecoveryStreak.Store(0)
  795. e.mutedFaultStreak.Store(0)
  796. e.setRuntimeState(RuntimeStateDegraded)
  797. return nil
  798. }