Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

917 wiersze
27KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "math"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/jan/fm-rds-tx/internal/audio"
  12. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  13. "github.com/jan/fm-rds-tx/internal/dsp"
  14. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  15. "github.com/jan/fm-rds-tx/internal/output"
  16. "github.com/jan/fm-rds-tx/internal/platform"
  17. )
  18. type EngineState int
  19. const (
  20. EngineIdle EngineState = iota
  21. EngineRunning
  22. EngineStopping
  23. )
  24. func (s EngineState) String() string {
  25. switch s {
  26. case EngineIdle:
  27. return "idle"
  28. case EngineRunning:
  29. return "running"
  30. case EngineStopping:
  31. return "stopping"
  32. default:
  33. return "unknown"
  34. }
  35. }
  36. type RuntimeState string
  37. const (
  38. RuntimeStateIdle RuntimeState = "idle"
  39. RuntimeStateArming RuntimeState = "arming"
  40. RuntimeStatePrebuffering RuntimeState = "prebuffering"
  41. RuntimeStateRunning RuntimeState = "running"
  42. RuntimeStateDegraded RuntimeState = "degraded"
  43. RuntimeStateMuted RuntimeState = "muted"
  44. RuntimeStateFaulted RuntimeState = "faulted"
  45. RuntimeStateStopping RuntimeState = "stopping"
  46. )
  47. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  48. v := uint64(d)
  49. for {
  50. cur := dst.Load()
  51. if v <= cur {
  52. return
  53. }
  54. if dst.CompareAndSwap(cur, v) {
  55. return
  56. }
  57. }
  58. }
  59. func durationMs(ns uint64) float64 {
  60. return float64(ns) / float64(time.Millisecond)
  61. }
  62. type EngineStats struct {
  63. State string `json:"state"`
  64. RuntimeStateDurationSeconds float64 `json:"runtimeStateDurationSeconds"`
  65. ChunksProduced uint64 `json:"chunksProduced"`
  66. TotalSamples uint64 `json:"totalSamples"`
  67. Underruns uint64 `json:"underruns"`
  68. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  69. LastError string `json:"lastError,omitempty"`
  70. UptimeSeconds float64 `json:"uptimeSeconds"`
  71. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  72. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  73. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  74. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  75. MaxQueueResidenceMs float64 `json:"maxQueueResidenceMs,omitempty"`
  76. MaxPipelineLatencyMs float64 `json:"maxPipelineLatencyMs,omitempty"`
  77. Queue output.QueueStats `json:"queue"`
  78. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  79. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  80. AppliedFrequencyMHz float64 `json:"appliedFrequencyMHz"`
  81. ActivePS string `json:"activePS,omitempty"`
  82. ActiveRadioText string `json:"activeRadioText,omitempty"`
  83. LastFault *FaultEvent `json:"lastFault,omitempty"`
  84. DegradedTransitions uint64 `json:"degradedTransitions"`
  85. MutedTransitions uint64 `json:"mutedTransitions"`
  86. FaultedTransitions uint64 `json:"faultedTransitions"`
  87. FaultCount uint64 `json:"faultCount"`
  88. FaultHistory []FaultEvent `json:"faultHistory,omitempty"`
  89. TransitionHistory []RuntimeTransition `json:"transitionHistory,omitempty"`
  90. }
  91. type RuntimeIndicator string
  92. const (
  93. RuntimeIndicatorNormal RuntimeIndicator = "normal"
  94. RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
  95. RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
  96. )
  97. type RuntimeTransition struct {
  98. Time time.Time `json:"time"`
  99. From RuntimeState `json:"from"`
  100. To RuntimeState `json:"to"`
  101. Severity string `json:"severity"`
  102. }
  103. const (
  104. lateBufferIndicatorWindow = 2 * time.Second
  105. writeLateTolerance = 10 * time.Millisecond
  106. queueCriticalStreakThreshold = 3
  107. queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
  108. queueMutedRecoveryThreshold = queueCriticalStreakThreshold
  109. queueFaultedStreakThreshold = queueCriticalStreakThreshold
  110. faultRepeatWindow = 1 * time.Second
  111. lateBufferStreakThreshold = 3 // consecutive late writes required before alerting
  112. faultHistoryCapacity = 8
  113. runtimeTransitionHistoryCapacity = 8
  114. )
  115. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  116. // resamples to device rate, and pushes to hardware in a tight loop.
  117. // The hardware buffer_push call is blocking — it returns when the hardware
  118. // has consumed the previous buffer and is ready for the next one.
  119. // This naturally paces the loop to real-time without a ticker.
  120. type Engine struct {
  121. cfg cfgpkg.Config
  122. driver platform.SoapyDriver
  123. generator *offpkg.Generator
  124. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  125. chunkDuration time.Duration
  126. deviceRate float64
  127. frameQueue *output.FrameQueue
  128. mu sync.Mutex
  129. state EngineState
  130. cancel context.CancelFunc
  131. startedAt time.Time
  132. wg sync.WaitGroup
  133. runtimeState atomic.Value
  134. stateMu sync.Mutex // guards setRuntimeState check-then-store (NEW-2 fix)
  135. chunksProduced atomic.Uint64
  136. totalSamples atomic.Uint64
  137. underruns atomic.Uint64
  138. lateBuffers atomic.Uint64
  139. lateBufferAlertAt atomic.Uint64
  140. lateBufferStreak atomic.Uint64 // consecutive late writes; reset on clean write
  141. criticalStreak atomic.Uint64
  142. mutedRecoveryStreak atomic.Uint64
  143. mutedFaultStreak atomic.Uint64
  144. maxCycleNs atomic.Uint64
  145. maxGenerateNs atomic.Uint64
  146. maxUpsampleNs atomic.Uint64
  147. maxWriteNs atomic.Uint64
  148. maxQueueResidenceNs atomic.Uint64
  149. maxPipelineNs atomic.Uint64
  150. lastError atomic.Value // string
  151. lastFault atomic.Value // *FaultEvent
  152. faultHistoryMu sync.Mutex
  153. faultHistory []FaultEvent
  154. transitionHistoryMu sync.Mutex
  155. transitionHistory []RuntimeTransition
  156. degradedTransitions atomic.Uint64
  157. mutedTransitions atomic.Uint64
  158. faultedTransitions atomic.Uint64
  159. faultEvents atomic.Uint64
  160. runtimeStateEnteredAt atomic.Uint64
  161. // Live config: pending frequency change, applied between chunks
  162. pendingFreq atomic.Pointer[float64]
  163. // Most recently tuned frequency (Hz)
  164. appliedFreqHz atomic.Uint64
  165. // Live audio stream (optional)
  166. streamSrc *audio.StreamSource
  167. }
  168. // SetStreamSource configures a live audio stream as the audio source.
  169. // Must be called before Start(). The StreamResampler is created internally
  170. // to convert from the stream's sample rate to the DSP composite rate.
  171. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  172. e.streamSrc = src
  173. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  174. if compositeRate <= 0 {
  175. compositeRate = 228000
  176. }
  177. resampler := audio.NewStreamResampler(src, compositeRate)
  178. e.generator.SetExternalSource(resampler)
  179. log.Printf("engine: live audio stream wired — initial %d Hz → %.0f Hz composite (buffer %d frames); actual decoded rate auto-corrects on first chunk",
  180. src.SampleRate, compositeRate, src.Stats().Capacity)
  181. }
  182. // StreamSource returns the live audio stream source, or nil.
  183. // Used by the control server for stats and HTTP audio ingest.
  184. func (e *Engine) StreamSource() *audio.StreamSource {
  185. return e.streamSrc
  186. }
  187. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  188. deviceRate := cfg.EffectiveDeviceRate()
  189. compositeRate := float64(cfg.FM.CompositeRateHz)
  190. if compositeRate <= 0 {
  191. compositeRate = 228000
  192. }
  193. var upsampler *dsp.FMUpsampler
  194. if deviceRate > compositeRate*1.001 {
  195. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  196. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  197. // This halves CPU load compared to running all DSP at deviceRate.
  198. cfg.FM.FMModulationEnabled = false
  199. maxDev := cfg.FM.MaxDeviationHz
  200. if maxDev <= 0 {
  201. maxDev = 75000
  202. }
  203. // mpxGain scales the FM deviation to compensate for hardware
  204. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  205. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  206. maxDev *= cfg.FM.MpxGain
  207. }
  208. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  209. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  210. compositeRate, deviceRate, deviceRate/compositeRate)
  211. } else {
  212. // Same-rate: entire DSP chain runs at deviceRate.
  213. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  214. if deviceRate > 0 {
  215. cfg.FM.CompositeRateHz = int(deviceRate)
  216. }
  217. cfg.FM.FMModulationEnabled = true
  218. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  219. }
  220. engine := &Engine{
  221. cfg: cfg,
  222. driver: driver,
  223. generator: offpkg.NewGenerator(cfg),
  224. upsampler: upsampler,
  225. chunkDuration: 50 * time.Millisecond,
  226. deviceRate: deviceRate,
  227. state: EngineIdle,
  228. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  229. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  230. transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
  231. }
  232. initFreqHz := cfg.FM.FrequencyMHz * 1e6
  233. engine.appliedFreqHz.Store(math.Float64bits(initFreqHz))
  234. engine.setRuntimeState(RuntimeStateIdle)
  235. return engine
  236. }
  237. func (e *Engine) SetChunkDuration(d time.Duration) {
  238. e.chunkDuration = d
  239. }
  240. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  241. // nil pointers mean "no change". Validated before applying.
  242. type LiveConfigUpdate struct {
  243. FrequencyMHz *float64
  244. OutputDrive *float64
  245. StereoEnabled *bool
  246. PilotLevel *float64
  247. RDSInjection *float64
  248. RDSEnabled *bool
  249. LimiterEnabled *bool
  250. LimiterCeiling *float64
  251. PS *string
  252. RadioText *string
  253. // Tone and gain: live-patchable without engine restart.
  254. ToneLeftHz *float64
  255. ToneRightHz *float64
  256. ToneAmplitude *float64
  257. AudioGain *float64
  258. }
  259. // UpdateConfig applies live parameter changes without restarting the engine.
  260. // DSP params take effect at the next chunk boundary (~50ms max).
  261. // Frequency changes are applied between chunks via driver.Tune().
  262. // RDS text updates are applied at the next RDS group boundary (~88ms).
  263. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  264. // --- Validate ---
  265. if u.FrequencyMHz != nil {
  266. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  267. return fmt.Errorf("frequencyMHz out of range (65-110)")
  268. }
  269. }
  270. if u.OutputDrive != nil {
  271. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  272. return fmt.Errorf("outputDrive out of range (0-10)")
  273. }
  274. }
  275. if u.PilotLevel != nil {
  276. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  277. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  278. }
  279. }
  280. if u.RDSInjection != nil {
  281. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  282. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  283. }
  284. }
  285. if u.LimiterCeiling != nil {
  286. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  287. return fmt.Errorf("limiterCeiling out of range (0-2)")
  288. }
  289. }
  290. if u.ToneAmplitude != nil {
  291. if *u.ToneAmplitude < 0 || *u.ToneAmplitude > 1 {
  292. return fmt.Errorf("toneAmplitude out of range (0-1)")
  293. }
  294. }
  295. if u.AudioGain != nil {
  296. if *u.AudioGain < 0 || *u.AudioGain > 4 {
  297. return fmt.Errorf("audioGain out of range (0-4)")
  298. }
  299. }
  300. // --- Frequency: store for run loop to apply via driver.Tune() ---
  301. if u.FrequencyMHz != nil {
  302. freqHz := *u.FrequencyMHz * 1e6
  303. e.pendingFreq.Store(&freqHz)
  304. }
  305. // --- RDS text: forward to encoder atomics ---
  306. if u.PS != nil || u.RadioText != nil {
  307. if enc := e.generator.RDSEncoder(); enc != nil {
  308. ps, rt := "", ""
  309. if u.PS != nil {
  310. ps = *u.PS
  311. }
  312. if u.RadioText != nil {
  313. rt = *u.RadioText
  314. }
  315. enc.UpdateText(ps, rt)
  316. }
  317. }
  318. // --- DSP params: build new LiveParams from current + patch ---
  319. // Read current, apply deltas, store new
  320. current := e.generator.CurrentLiveParams()
  321. next := current // copy
  322. if u.OutputDrive != nil {
  323. next.OutputDrive = *u.OutputDrive
  324. }
  325. if u.StereoEnabled != nil {
  326. next.StereoEnabled = *u.StereoEnabled
  327. }
  328. if u.PilotLevel != nil {
  329. next.PilotLevel = *u.PilotLevel
  330. }
  331. if u.RDSInjection != nil {
  332. next.RDSInjection = *u.RDSInjection
  333. }
  334. if u.RDSEnabled != nil {
  335. next.RDSEnabled = *u.RDSEnabled
  336. }
  337. if u.LimiterEnabled != nil {
  338. next.LimiterEnabled = *u.LimiterEnabled
  339. }
  340. if u.LimiterCeiling != nil {
  341. next.LimiterCeiling = *u.LimiterCeiling
  342. }
  343. if u.ToneLeftHz != nil {
  344. next.ToneLeftHz = *u.ToneLeftHz
  345. }
  346. if u.ToneRightHz != nil {
  347. next.ToneRightHz = *u.ToneRightHz
  348. }
  349. if u.ToneAmplitude != nil {
  350. next.ToneAmplitude = *u.ToneAmplitude
  351. }
  352. if u.AudioGain != nil {
  353. next.AudioGain = *u.AudioGain
  354. }
  355. e.generator.UpdateLive(next)
  356. return nil
  357. }
  358. func (e *Engine) Start(ctx context.Context) error {
  359. e.mu.Lock()
  360. if e.state != EngineIdle {
  361. e.mu.Unlock()
  362. return fmt.Errorf("engine already in state %s", e.state)
  363. }
  364. if err := e.driver.Start(ctx); err != nil {
  365. e.mu.Unlock()
  366. return fmt.Errorf("driver start: %w", err)
  367. }
  368. runCtx, cancel := context.WithCancel(ctx)
  369. e.cancel = cancel
  370. e.state = EngineRunning
  371. e.setRuntimeState(RuntimeStateArming)
  372. e.startedAt = time.Now()
  373. e.wg.Add(1)
  374. e.mu.Unlock()
  375. go e.run(runCtx)
  376. return nil
  377. }
  378. func (e *Engine) Stop(ctx context.Context) error {
  379. e.mu.Lock()
  380. if e.state != EngineRunning {
  381. e.mu.Unlock()
  382. return nil
  383. }
  384. e.state = EngineStopping
  385. e.setRuntimeState(RuntimeStateStopping)
  386. e.cancel()
  387. e.mu.Unlock()
  388. // Wait for run() goroutine to exit — deterministic, no guessing
  389. e.wg.Wait()
  390. if err := e.driver.Flush(ctx); err != nil {
  391. return err
  392. }
  393. if err := e.driver.Stop(ctx); err != nil {
  394. return err
  395. }
  396. e.mu.Lock()
  397. e.state = EngineIdle
  398. e.setRuntimeState(RuntimeStateIdle)
  399. e.mu.Unlock()
  400. return nil
  401. }
  402. func (e *Engine) Stats() EngineStats {
  403. e.mu.Lock()
  404. state := e.state
  405. startedAt := e.startedAt
  406. e.mu.Unlock()
  407. var uptime float64
  408. if state == EngineRunning {
  409. uptime = time.Since(startedAt).Seconds()
  410. }
  411. errVal, _ := e.lastError.Load().(string)
  412. queue := e.frameQueue.Stats()
  413. lateBuffers := e.lateBuffers.Load()
  414. hasRecentLateBuffers := e.hasRecentLateBuffers()
  415. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  416. lastFault := e.lastFaultEvent()
  417. activePS, activeRT := "", ""
  418. if enc := e.generator.RDSEncoder(); enc != nil {
  419. activePS, activeRT = enc.CurrentText()
  420. }
  421. return EngineStats{
  422. State: string(e.currentRuntimeState()),
  423. RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
  424. ChunksProduced: e.chunksProduced.Load(),
  425. TotalSamples: e.totalSamples.Load(),
  426. Underruns: e.underruns.Load(),
  427. LateBuffers: lateBuffers,
  428. LastError: errVal,
  429. UptimeSeconds: uptime,
  430. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  431. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  432. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  433. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  434. MaxQueueResidenceMs: durationMs(e.maxQueueResidenceNs.Load()),
  435. MaxPipelineLatencyMs: durationMs(e.maxPipelineNs.Load()),
  436. Queue: queue,
  437. RuntimeIndicator: ri,
  438. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  439. AppliedFrequencyMHz: e.appliedFrequencyMHz(),
  440. ActivePS: activePS,
  441. ActiveRadioText: activeRT,
  442. LastFault: lastFault,
  443. DegradedTransitions: e.degradedTransitions.Load(),
  444. MutedTransitions: e.mutedTransitions.Load(),
  445. FaultedTransitions: e.faultedTransitions.Load(),
  446. FaultCount: e.faultEvents.Load(),
  447. FaultHistory: e.FaultHistory(),
  448. TransitionHistory: e.TransitionHistory(),
  449. }
  450. }
  451. func (e *Engine) appliedFrequencyMHz() float64 {
  452. bits := e.appliedFreqHz.Load()
  453. return math.Float64frombits(bits) / 1e6
  454. }
  455. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  456. switch {
  457. case queueHealth == output.QueueHealthCritical:
  458. return RuntimeIndicatorQueueCritical
  459. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  460. return RuntimeIndicatorDegraded
  461. default:
  462. return RuntimeIndicatorNormal
  463. }
  464. }
  465. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  466. switch {
  467. case queueHealth == output.QueueHealthCritical:
  468. return "queue health critical"
  469. case recentLateBuffers:
  470. return "late buffers"
  471. case queueHealth == output.QueueHealthLow:
  472. return "queue health low"
  473. default:
  474. return ""
  475. }
  476. }
  477. func runtimeStateSeverity(state RuntimeState) string {
  478. switch state {
  479. case RuntimeStateRunning:
  480. return "ok"
  481. case RuntimeStateDegraded, RuntimeStateMuted:
  482. return "warn"
  483. case RuntimeStateFaulted:
  484. return "err"
  485. default:
  486. return "info"
  487. }
  488. }
  489. func (e *Engine) run(ctx context.Context) {
  490. e.setRuntimeState(RuntimeStatePrebuffering)
  491. e.wg.Add(1)
  492. go e.writerLoop(ctx)
  493. defer e.wg.Done()
  494. for {
  495. if ctx.Err() != nil {
  496. return
  497. }
  498. // Apply pending frequency change between chunks
  499. if pf := e.pendingFreq.Swap(nil); pf != nil {
  500. if err := e.driver.Tune(ctx, *pf); err != nil {
  501. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  502. } else {
  503. e.appliedFreqHz.Store(math.Float64bits(*pf))
  504. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  505. }
  506. }
  507. t0 := time.Now()
  508. frame := e.generator.GenerateFrame(e.chunkDuration)
  509. frame.GeneratedAt = t0
  510. t1 := time.Now()
  511. if e.upsampler != nil {
  512. frame = e.upsampler.Process(frame)
  513. frame.GeneratedAt = t0
  514. }
  515. t2 := time.Now()
  516. genDur := t1.Sub(t0)
  517. upDur := t2.Sub(t1)
  518. updateMaxDuration(&e.maxGenerateNs, genDur)
  519. updateMaxDuration(&e.maxUpsampleNs, upDur)
  520. // cloneFrame never returns nil when src is non-nil (NEW-3: dead nil check removed)
  521. enqueued := cloneFrame(frame)
  522. enqueued.EnqueuedAt = time.Now()
  523. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  524. if ctx.Err() != nil {
  525. return
  526. }
  527. if errors.Is(err, output.ErrFrameQueueClosed) {
  528. return
  529. }
  530. e.lastError.Store(err.Error())
  531. e.underruns.Add(1)
  532. select {
  533. case <-time.After(e.chunkDuration):
  534. case <-ctx.Done():
  535. return
  536. }
  537. continue
  538. }
  539. queueStats := e.frameQueue.Stats()
  540. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  541. }
  542. }
  543. func (e *Engine) writerLoop(ctx context.Context) {
  544. defer e.wg.Done()
  545. for {
  546. frame, err := e.frameQueue.Pop(ctx)
  547. if err != nil {
  548. if ctx.Err() != nil {
  549. return
  550. }
  551. if errors.Is(err, output.ErrFrameQueueClosed) {
  552. return
  553. }
  554. e.lastError.Store(err.Error())
  555. e.underruns.Add(1)
  556. continue
  557. }
  558. frame.DequeuedAt = time.Now()
  559. queueResidence := time.Duration(0)
  560. if !frame.EnqueuedAt.IsZero() {
  561. queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt)
  562. }
  563. writeStart := time.Now()
  564. frame.WriteStartedAt = writeStart
  565. n, err := e.driver.Write(ctx, frame)
  566. writeDur := time.Since(writeStart)
  567. pipelineLatency := writeDur
  568. if !frame.GeneratedAt.IsZero() {
  569. pipelineLatency = time.Since(frame.GeneratedAt)
  570. }
  571. updateMaxDuration(&e.maxWriteNs, writeDur)
  572. updateMaxDuration(&e.maxQueueResidenceNs, queueResidence)
  573. updateMaxDuration(&e.maxPipelineNs, pipelineLatency)
  574. updateMaxDuration(&e.maxCycleNs, writeDur)
  575. queueStats := e.frameQueue.Stats()
  576. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  577. lateOver := writeDur - e.chunkDuration
  578. if lateOver > writeLateTolerance {
  579. streak := e.lateBufferStreak.Add(1)
  580. late := e.lateBuffers.Add(1)
  581. // Only arm the alert window once the streak threshold is reached.
  582. // Isolated OS-scheduling or USB jitter spikes (single late writes)
  583. // are normal on a loaded system and must not trigger degraded state.
  584. // This mirrors the queue-health streak logic.
  585. if streak >= lateBufferStreakThreshold {
  586. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  587. }
  588. if late <= 5 || late%20 == 0 {
  589. log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s",
  590. streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
  591. }
  592. } else {
  593. // Clean write — reset the consecutive streak so isolated spikes
  594. // never accumulate toward the threshold.
  595. e.lateBufferStreak.Store(0)
  596. }
  597. if err != nil {
  598. if ctx.Err() != nil {
  599. return
  600. }
  601. e.recordFault(FaultReasonWriteTimeout, FaultSeverityWarn, fmt.Sprintf("driver write error: %v", err))
  602. e.lastError.Store(err.Error())
  603. e.underruns.Add(1)
  604. select {
  605. case <-time.After(e.chunkDuration):
  606. case <-ctx.Done():
  607. return
  608. }
  609. continue
  610. }
  611. e.chunksProduced.Add(1)
  612. e.totalSamples.Add(uint64(n))
  613. }
  614. }
  615. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  616. if src == nil {
  617. return nil
  618. }
  619. samples := make([]output.IQSample, len(src.Samples))
  620. copy(samples, src.Samples)
  621. return &output.CompositeFrame{
  622. Samples: samples,
  623. SampleRateHz: src.SampleRateHz,
  624. Timestamp: src.Timestamp,
  625. GeneratedAt: src.GeneratedAt,
  626. EnqueuedAt: src.EnqueuedAt,
  627. DequeuedAt: src.DequeuedAt,
  628. WriteStartedAt: src.WriteStartedAt,
  629. Sequence: src.Sequence,
  630. }
  631. }
  632. func (e *Engine) setRuntimeState(state RuntimeState) {
  633. // NEW-2 fix: hold stateMu so that concurrent calls from run() and
  634. // writerLoop() cannot both see prev != state and both record a
  635. // spurious duplicate transition.
  636. e.stateMu.Lock()
  637. defer e.stateMu.Unlock()
  638. now := time.Now()
  639. prev := e.currentRuntimeState()
  640. if prev != state {
  641. e.recordRuntimeTransition(prev, state, now)
  642. switch state {
  643. case RuntimeStateDegraded:
  644. e.degradedTransitions.Add(1)
  645. case RuntimeStateMuted:
  646. e.mutedTransitions.Add(1)
  647. case RuntimeStateFaulted:
  648. e.faultedTransitions.Add(1)
  649. }
  650. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  651. } else if e.runtimeStateEnteredAt.Load() == 0 {
  652. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  653. }
  654. e.runtimeState.Store(state)
  655. }
  656. func (e *Engine) currentRuntimeState() RuntimeState {
  657. if v := e.runtimeState.Load(); v != nil {
  658. if rs, ok := v.(RuntimeState); ok {
  659. return rs
  660. }
  661. }
  662. return RuntimeStateIdle
  663. }
  664. func (e *Engine) runtimeStateDurationSeconds() float64 {
  665. if ts := e.runtimeStateEnteredAt.Load(); ts != 0 {
  666. return time.Since(time.Unix(0, int64(ts))).Seconds()
  667. }
  668. return 0
  669. }
  670. func (e *Engine) hasRecentLateBuffers() bool {
  671. lateAlertAt := e.lateBufferAlertAt.Load()
  672. if lateAlertAt == 0 {
  673. return false
  674. }
  675. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  676. }
  677. func (e *Engine) lastFaultEvent() *FaultEvent {
  678. return copyFaultEvent(e.loadLastFault())
  679. }
  680. // LastFault exposes the most recent captured fault, if any.
  681. func (e *Engine) LastFault() *FaultEvent {
  682. return e.lastFaultEvent()
  683. }
  684. func (e *Engine) FaultHistory() []FaultEvent {
  685. e.faultHistoryMu.Lock()
  686. defer e.faultHistoryMu.Unlock()
  687. history := make([]FaultEvent, len(e.faultHistory))
  688. copy(history, e.faultHistory)
  689. return history
  690. }
  691. func (e *Engine) TransitionHistory() []RuntimeTransition {
  692. e.transitionHistoryMu.Lock()
  693. defer e.transitionHistoryMu.Unlock()
  694. history := make([]RuntimeTransition, len(e.transitionHistory))
  695. copy(history, e.transitionHistory)
  696. return history
  697. }
  698. func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) {
  699. if when.IsZero() {
  700. when = time.Now()
  701. }
  702. ev := RuntimeTransition{
  703. Time: when,
  704. From: from,
  705. To: to,
  706. Severity: runtimeStateSeverity(to),
  707. }
  708. e.transitionHistoryMu.Lock()
  709. defer e.transitionHistoryMu.Unlock()
  710. if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity {
  711. copy(e.transitionHistory, e.transitionHistory[1:])
  712. e.transitionHistory[len(e.transitionHistory)-1] = ev
  713. return
  714. }
  715. e.transitionHistory = append(e.transitionHistory, ev)
  716. }
  717. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  718. if reason == "" {
  719. reason = FaultReasonUnknown
  720. }
  721. now := time.Now()
  722. if last := e.loadLastFault(); last != nil {
  723. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  724. return
  725. }
  726. }
  727. ev := &FaultEvent{
  728. Time: now,
  729. Reason: reason,
  730. Severity: severity,
  731. Message: message,
  732. }
  733. e.lastFault.Store(ev)
  734. e.appendFaultHistory(ev)
  735. e.faultEvents.Add(1)
  736. }
  737. func (e *Engine) loadLastFault() *FaultEvent {
  738. if v := e.lastFault.Load(); v != nil {
  739. if ev, ok := v.(*FaultEvent); ok {
  740. return ev
  741. }
  742. }
  743. return nil
  744. }
  745. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  746. if source == nil {
  747. return nil
  748. }
  749. copy := *source
  750. return &copy
  751. }
  752. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  753. e.faultHistoryMu.Lock()
  754. defer e.faultHistoryMu.Unlock()
  755. if len(e.faultHistory) >= faultHistoryCapacity {
  756. copy(e.faultHistory, e.faultHistory[1:])
  757. e.faultHistory[len(e.faultHistory)-1] = *ev
  758. return
  759. }
  760. e.faultHistory = append(e.faultHistory, *ev)
  761. }
  762. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  763. state := e.currentRuntimeState()
  764. switch state {
  765. case RuntimeStateStopping, RuntimeStateFaulted:
  766. return
  767. case RuntimeStateMuted:
  768. if queue.Health == output.QueueHealthCritical {
  769. if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
  770. e.mutedFaultStreak.Store(0)
  771. e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
  772. fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
  773. e.setRuntimeState(RuntimeStateFaulted)
  774. return
  775. }
  776. } else {
  777. e.mutedFaultStreak.Store(0)
  778. }
  779. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  780. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  781. e.mutedRecoveryStreak.Store(0)
  782. e.mutedFaultStreak.Store(0)
  783. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  784. fmt.Sprintf("queue healthy for %d checks after mute", count))
  785. e.setRuntimeState(RuntimeStateDegraded)
  786. }
  787. } else {
  788. e.mutedRecoveryStreak.Store(0)
  789. }
  790. return
  791. }
  792. if state == RuntimeStatePrebuffering {
  793. if queue.Depth >= 1 {
  794. e.setRuntimeState(RuntimeStateRunning)
  795. }
  796. return
  797. }
  798. critical := queue.Health == output.QueueHealthCritical
  799. if critical {
  800. count := e.criticalStreak.Add(1)
  801. if count >= queueMutedStreakThreshold {
  802. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  803. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  804. e.setRuntimeState(RuntimeStateMuted)
  805. return
  806. }
  807. if count >= queueCriticalStreakThreshold {
  808. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  809. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  810. e.setRuntimeState(RuntimeStateDegraded)
  811. return
  812. }
  813. } else {
  814. e.criticalStreak.Store(0)
  815. }
  816. if hasLateBuffers {
  817. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  818. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  819. e.setRuntimeState(RuntimeStateDegraded)
  820. return
  821. }
  822. e.setRuntimeState(RuntimeStateRunning)
  823. }
  824. // ResetFault attempts to move the engine out of the faulted state.
  825. func (e *Engine) ResetFault() error {
  826. state := e.currentRuntimeState()
  827. if state != RuntimeStateFaulted {
  828. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  829. }
  830. e.criticalStreak.Store(0)
  831. e.mutedRecoveryStreak.Store(0)
  832. e.mutedFaultStreak.Store(0)
  833. e.setRuntimeState(RuntimeStateDegraded)
  834. return nil
  835. }