Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

963 lines
28KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "math"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/jan/fm-rds-tx/internal/audio"
  12. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  13. "github.com/jan/fm-rds-tx/internal/dsp"
  14. "github.com/jan/fm-rds-tx/internal/license"
  15. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  16. "github.com/jan/fm-rds-tx/internal/output"
  17. "github.com/jan/fm-rds-tx/internal/platform"
  18. )
  19. type EngineState int
  20. const (
  21. EngineIdle EngineState = iota
  22. EngineRunning
  23. EngineStopping
  24. )
  25. func (s EngineState) String() string {
  26. switch s {
  27. case EngineIdle:
  28. return "idle"
  29. case EngineRunning:
  30. return "running"
  31. case EngineStopping:
  32. return "stopping"
  33. default:
  34. return "unknown"
  35. }
  36. }
  37. type RuntimeState string
  38. const (
  39. RuntimeStateIdle RuntimeState = "idle"
  40. RuntimeStateArming RuntimeState = "arming"
  41. RuntimeStatePrebuffering RuntimeState = "prebuffering"
  42. RuntimeStateRunning RuntimeState = "running"
  43. RuntimeStateDegraded RuntimeState = "degraded"
  44. RuntimeStateMuted RuntimeState = "muted"
  45. RuntimeStateFaulted RuntimeState = "faulted"
  46. RuntimeStateStopping RuntimeState = "stopping"
  47. )
  48. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  49. v := uint64(d)
  50. for {
  51. cur := dst.Load()
  52. if v <= cur {
  53. return
  54. }
  55. if dst.CompareAndSwap(cur, v) {
  56. return
  57. }
  58. }
  59. }
  60. func durationMs(ns uint64) float64 {
  61. return float64(ns) / float64(time.Millisecond)
  62. }
  63. type EngineStats struct {
  64. State string `json:"state"`
  65. RuntimeStateDurationSeconds float64 `json:"runtimeStateDurationSeconds"`
  66. ChunksProduced uint64 `json:"chunksProduced"`
  67. TotalSamples uint64 `json:"totalSamples"`
  68. Underruns uint64 `json:"underruns"`
  69. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  70. LastError string `json:"lastError,omitempty"`
  71. UptimeSeconds float64 `json:"uptimeSeconds"`
  72. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  73. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  74. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  75. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  76. MaxQueueResidenceMs float64 `json:"maxQueueResidenceMs,omitempty"`
  77. MaxPipelineLatencyMs float64 `json:"maxPipelineLatencyMs,omitempty"`
  78. Queue output.QueueStats `json:"queue"`
  79. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  80. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  81. AppliedFrequencyMHz float64 `json:"appliedFrequencyMHz"`
  82. ActivePS string `json:"activePS,omitempty"`
  83. ActiveRadioText string `json:"activeRadioText,omitempty"`
  84. LastFault *FaultEvent `json:"lastFault,omitempty"`
  85. DegradedTransitions uint64 `json:"degradedTransitions"`
  86. MutedTransitions uint64 `json:"mutedTransitions"`
  87. FaultedTransitions uint64 `json:"faultedTransitions"`
  88. FaultCount uint64 `json:"faultCount"`
  89. FaultHistory []FaultEvent `json:"faultHistory,omitempty"`
  90. TransitionHistory []RuntimeTransition `json:"transitionHistory,omitempty"`
  91. }
  92. type RuntimeIndicator string
  93. const (
  94. RuntimeIndicatorNormal RuntimeIndicator = "normal"
  95. RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
  96. RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
  97. )
  98. type RuntimeTransition struct {
  99. Time time.Time `json:"time"`
  100. From RuntimeState `json:"from"`
  101. To RuntimeState `json:"to"`
  102. Severity string `json:"severity"`
  103. }
  104. const (
  105. lateBufferIndicatorWindow = 2 * time.Second
  106. writeLateTolerance = 10 * time.Millisecond
  107. queueCriticalStreakThreshold = 3
  108. queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
  109. queueMutedRecoveryThreshold = queueCriticalStreakThreshold
  110. queueFaultedStreakThreshold = queueCriticalStreakThreshold
  111. faultRepeatWindow = 1 * time.Second
  112. lateBufferStreakThreshold = 3 // consecutive late writes required before alerting
  113. faultHistoryCapacity = 8
  114. runtimeTransitionHistoryCapacity = 8
  115. )
  116. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  117. // resamples to device rate, and pushes to hardware in a tight loop.
  118. // The hardware buffer_push call is blocking — it returns when the hardware
  119. // has consumed the previous buffer and is ready for the next one.
  120. // This naturally paces the loop to real-time without a ticker.
  121. type Engine struct {
  122. cfg cfgpkg.Config
  123. driver platform.SoapyDriver
  124. generator *offpkg.Generator
  125. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  126. chunkDuration time.Duration
  127. deviceRate float64
  128. frameQueue *output.FrameQueue
  129. mu sync.Mutex
  130. state EngineState
  131. cancel context.CancelFunc
  132. startedAt time.Time
  133. wg sync.WaitGroup
  134. runtimeState atomic.Value
  135. stateMu sync.Mutex // guards setRuntimeState check-then-store (NEW-2 fix)
  136. chunksProduced atomic.Uint64
  137. totalSamples atomic.Uint64
  138. underruns atomic.Uint64
  139. lateBuffers atomic.Uint64
  140. lateBufferAlertAt atomic.Uint64
  141. lateBufferStreak atomic.Uint64 // consecutive late writes; reset on clean write
  142. criticalStreak atomic.Uint64
  143. mutedRecoveryStreak atomic.Uint64
  144. mutedFaultStreak atomic.Uint64
  145. maxCycleNs atomic.Uint64
  146. maxGenerateNs atomic.Uint64
  147. maxUpsampleNs atomic.Uint64
  148. maxWriteNs atomic.Uint64
  149. maxQueueResidenceNs atomic.Uint64
  150. maxPipelineNs atomic.Uint64
  151. lastError atomic.Value // string
  152. lastFault atomic.Value // *FaultEvent
  153. faultHistoryMu sync.Mutex
  154. faultHistory []FaultEvent
  155. transitionHistoryMu sync.Mutex
  156. transitionHistory []RuntimeTransition
  157. degradedTransitions atomic.Uint64
  158. mutedTransitions atomic.Uint64
  159. faultedTransitions atomic.Uint64
  160. faultEvents atomic.Uint64
  161. runtimeStateEnteredAt atomic.Uint64
  162. // Live config: pending frequency change, applied between chunks
  163. pendingFreq atomic.Pointer[float64]
  164. // Most recently tuned frequency (Hz)
  165. appliedFreqHz atomic.Uint64
  166. // Live audio stream (optional)
  167. streamSrc *audio.StreamSource
  168. }
  169. // SetStreamSource configures a live audio stream as the audio source.
  170. // Must be called before Start(). The StreamResampler is created internally
  171. // to convert from the stream's sample rate to the DSP composite rate.
  172. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  173. e.streamSrc = src
  174. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  175. if compositeRate <= 0 {
  176. compositeRate = 228000
  177. }
  178. resampler := audio.NewStreamResampler(src, compositeRate)
  179. if err := e.generator.SetExternalSource(resampler); err != nil {
  180. // Should never happen: SetStreamSource must be called before Start().
  181. log.Printf("engine: SetExternalSource failed (called too late): %v", err)
  182. return
  183. }
  184. log.Printf("engine: live audio stream wired — initial %d Hz → %.0f Hz composite (buffer %d frames); actual decoded rate auto-corrects on first chunk",
  185. src.SampleRate, compositeRate, src.Stats().Capacity)
  186. }
  187. // SetLicenseState passes license/jingle state to the generator.
  188. // Must be called before Start(). It does not implicitly enable watermarking.
  189. func (e *Engine) SetLicenseState(s *license.State, _ string) {
  190. e.generator.SetLicense(s)
  191. }
  192. // ConfigureWatermark explicitly enables or disables the optional program-audio watermark.
  193. // Must be called before Start().
  194. func (e *Engine) ConfigureWatermark(enabled bool, key string) {
  195. e.generator.ConfigureWatermark(enabled, key)
  196. }
  197. // StreamSource returns the live audio stream source, or nil.
  198. // Used by the control server for stats and HTTP audio ingest.
  199. func (e *Engine) StreamSource() *audio.StreamSource {
  200. return e.streamSrc
  201. }
  202. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  203. deviceRate := cfg.EffectiveDeviceRate()
  204. compositeRate := float64(cfg.FM.CompositeRateHz)
  205. if compositeRate <= 0 {
  206. compositeRate = 228000
  207. }
  208. var upsampler *dsp.FMUpsampler
  209. if deviceRate > compositeRate*1.001 {
  210. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  211. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  212. // This halves CPU load compared to running all DSP at deviceRate.
  213. cfg.FM.FMModulationEnabled = false
  214. maxDev := cfg.FM.MaxDeviationHz
  215. if maxDev <= 0 {
  216. maxDev = 75000
  217. }
  218. // mpxGain scales the FM deviation to compensate for hardware
  219. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  220. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  221. maxDev *= cfg.FM.MpxGain
  222. }
  223. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  224. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  225. compositeRate, deviceRate, deviceRate/compositeRate)
  226. } else {
  227. // Same-rate: entire DSP chain runs at deviceRate.
  228. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  229. if deviceRate > 0 {
  230. cfg.FM.CompositeRateHz = int(deviceRate)
  231. }
  232. cfg.FM.FMModulationEnabled = true
  233. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  234. }
  235. engine := &Engine{
  236. cfg: cfg,
  237. driver: driver,
  238. generator: offpkg.NewGenerator(cfg),
  239. upsampler: upsampler,
  240. chunkDuration: 50 * time.Millisecond,
  241. deviceRate: deviceRate,
  242. state: EngineIdle,
  243. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  244. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  245. transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
  246. }
  247. initFreqHz := cfg.FM.FrequencyMHz * 1e6
  248. engine.appliedFreqHz.Store(math.Float64bits(initFreqHz))
  249. engine.setRuntimeState(RuntimeStateIdle)
  250. return engine
  251. }
  252. func (e *Engine) SetChunkDuration(d time.Duration) {
  253. e.chunkDuration = d
  254. }
  255. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  256. // nil pointers mean "no change". Validated before applying.
  257. type LiveConfigUpdate struct {
  258. FrequencyMHz *float64
  259. OutputDrive *float64
  260. StereoEnabled *bool
  261. StereoMode *string
  262. PilotLevel *float64
  263. RDSInjection *float64
  264. RDSEnabled *bool
  265. LimiterEnabled *bool
  266. LimiterCeiling *float64
  267. PS *string
  268. RadioText *string
  269. TA *bool
  270. TP *bool
  271. // Tone and gain: live-patchable without engine restart.
  272. ToneLeftHz *float64
  273. ToneRightHz *float64
  274. ToneAmplitude *float64
  275. AudioGain *float64
  276. CompositeClipperEnabled *bool
  277. }
  278. // UpdateConfig applies live parameter changes without restarting the engine.
  279. // DSP params take effect at the next chunk boundary (~50ms max).
  280. // Frequency changes are applied between chunks via driver.Tune().
  281. // RDS text updates are applied at the next RDS group boundary (~88ms).
  282. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  283. // --- Validate ---
  284. if u.FrequencyMHz != nil {
  285. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  286. return fmt.Errorf("frequencyMHz out of range (65-110)")
  287. }
  288. }
  289. if u.OutputDrive != nil {
  290. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  291. return fmt.Errorf("outputDrive out of range (0-10)")
  292. }
  293. }
  294. if u.PilotLevel != nil {
  295. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  296. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  297. }
  298. }
  299. if u.RDSInjection != nil {
  300. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  301. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  302. }
  303. }
  304. if u.LimiterCeiling != nil {
  305. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  306. return fmt.Errorf("limiterCeiling out of range (0-2)")
  307. }
  308. }
  309. if u.ToneAmplitude != nil {
  310. if *u.ToneAmplitude < 0 || *u.ToneAmplitude > 1 {
  311. return fmt.Errorf("toneAmplitude out of range (0-1)")
  312. }
  313. }
  314. if u.AudioGain != nil {
  315. if *u.AudioGain < 0 || *u.AudioGain > 4 {
  316. return fmt.Errorf("audioGain out of range (0-4)")
  317. }
  318. }
  319. // --- Frequency: store for run loop to apply via driver.Tune() ---
  320. if u.FrequencyMHz != nil {
  321. freqHz := *u.FrequencyMHz * 1e6
  322. e.pendingFreq.Store(&freqHz)
  323. }
  324. // --- RDS text: forward to encoder atomics ---
  325. if u.PS != nil || u.RadioText != nil {
  326. if enc := e.generator.RDSEncoder(); enc != nil {
  327. ps, rt := "", ""
  328. if u.PS != nil {
  329. ps = *u.PS
  330. }
  331. if u.RadioText != nil {
  332. rt = *u.RadioText
  333. }
  334. enc.UpdateText(ps, rt)
  335. }
  336. }
  337. // --- RDS traffic flags: live-update ---
  338. if u.TA != nil || u.TP != nil {
  339. if enc := e.generator.RDSEncoder(); enc != nil {
  340. if u.TA != nil {
  341. enc.UpdateTA(*u.TA)
  342. }
  343. if u.TP != nil {
  344. enc.UpdateTP(*u.TP)
  345. }
  346. }
  347. }
  348. // --- DSP params: build new LiveParams from current + patch ---
  349. // Read current, apply deltas, store new
  350. current := e.generator.CurrentLiveParams()
  351. next := current // copy
  352. if u.OutputDrive != nil {
  353. next.OutputDrive = *u.OutputDrive
  354. }
  355. if u.StereoEnabled != nil {
  356. next.StereoEnabled = *u.StereoEnabled
  357. }
  358. if u.StereoMode != nil {
  359. next.StereoMode = *u.StereoMode
  360. }
  361. if u.PilotLevel != nil {
  362. next.PilotLevel = *u.PilotLevel
  363. }
  364. if u.RDSInjection != nil {
  365. next.RDSInjection = *u.RDSInjection
  366. }
  367. if u.RDSEnabled != nil {
  368. next.RDSEnabled = *u.RDSEnabled
  369. }
  370. if u.LimiterEnabled != nil {
  371. next.LimiterEnabled = *u.LimiterEnabled
  372. }
  373. if u.LimiterCeiling != nil {
  374. next.LimiterCeiling = *u.LimiterCeiling
  375. }
  376. if u.ToneLeftHz != nil {
  377. next.ToneLeftHz = *u.ToneLeftHz
  378. }
  379. if u.ToneRightHz != nil {
  380. next.ToneRightHz = *u.ToneRightHz
  381. }
  382. if u.ToneAmplitude != nil {
  383. next.ToneAmplitude = *u.ToneAmplitude
  384. }
  385. if u.AudioGain != nil {
  386. next.AudioGain = *u.AudioGain
  387. }
  388. if u.CompositeClipperEnabled != nil {
  389. next.CompositeClipperEnabled = *u.CompositeClipperEnabled
  390. }
  391. e.generator.UpdateLive(next)
  392. return nil
  393. }
  394. func (e *Engine) Start(ctx context.Context) error {
  395. e.mu.Lock()
  396. if e.state != EngineIdle {
  397. e.mu.Unlock()
  398. return fmt.Errorf("engine already in state %s", e.state)
  399. }
  400. if err := e.driver.Start(ctx); err != nil {
  401. e.mu.Unlock()
  402. return fmt.Errorf("driver start: %w", err)
  403. }
  404. e.generator.Reset()
  405. if e.upsampler != nil {
  406. e.upsampler.Reset()
  407. }
  408. runCtx, cancel := context.WithCancel(ctx)
  409. e.cancel = cancel
  410. e.state = EngineRunning
  411. e.setRuntimeState(RuntimeStateArming)
  412. e.startedAt = time.Now()
  413. // BUG-A fix: discard any frames left from a previous run so writerLoop
  414. // does not send stale data with expired timestamps on restart.
  415. e.frameQueue.Drain()
  416. e.wg.Add(1)
  417. e.mu.Unlock()
  418. go e.run(runCtx)
  419. return nil
  420. }
  421. func (e *Engine) Stop(ctx context.Context) error {
  422. e.mu.Lock()
  423. if e.state != EngineRunning {
  424. e.mu.Unlock()
  425. return nil
  426. }
  427. e.state = EngineStopping
  428. e.setRuntimeState(RuntimeStateStopping)
  429. e.cancel()
  430. e.mu.Unlock()
  431. // Wait for run() goroutine to exit — deterministic, no guessing
  432. e.wg.Wait()
  433. if err := e.driver.Flush(ctx); err != nil {
  434. return err
  435. }
  436. if err := e.driver.Stop(ctx); err != nil {
  437. return err
  438. }
  439. e.mu.Lock()
  440. e.state = EngineIdle
  441. e.setRuntimeState(RuntimeStateIdle)
  442. e.mu.Unlock()
  443. return nil
  444. }
  445. func (e *Engine) Stats() EngineStats {
  446. e.mu.Lock()
  447. state := e.state
  448. startedAt := e.startedAt
  449. e.mu.Unlock()
  450. var uptime float64
  451. if state == EngineRunning {
  452. uptime = time.Since(startedAt).Seconds()
  453. }
  454. errVal, _ := e.lastError.Load().(string)
  455. queue := e.frameQueue.Stats()
  456. lateBuffers := e.lateBuffers.Load()
  457. hasRecentLateBuffers := e.hasRecentLateBuffers()
  458. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  459. lastFault := e.lastFaultEvent()
  460. activePS, activeRT := "", ""
  461. if enc := e.generator.RDSEncoder(); enc != nil {
  462. activePS, activeRT = enc.CurrentText()
  463. }
  464. return EngineStats{
  465. State: string(e.currentRuntimeState()),
  466. RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
  467. ChunksProduced: e.chunksProduced.Load(),
  468. TotalSamples: e.totalSamples.Load(),
  469. Underruns: e.underruns.Load(),
  470. LateBuffers: lateBuffers,
  471. LastError: errVal,
  472. UptimeSeconds: uptime,
  473. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  474. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  475. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  476. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  477. MaxQueueResidenceMs: durationMs(e.maxQueueResidenceNs.Load()),
  478. MaxPipelineLatencyMs: durationMs(e.maxPipelineNs.Load()),
  479. Queue: queue,
  480. RuntimeIndicator: ri,
  481. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  482. AppliedFrequencyMHz: e.appliedFrequencyMHz(),
  483. ActivePS: activePS,
  484. ActiveRadioText: activeRT,
  485. LastFault: lastFault,
  486. DegradedTransitions: e.degradedTransitions.Load(),
  487. MutedTransitions: e.mutedTransitions.Load(),
  488. FaultedTransitions: e.faultedTransitions.Load(),
  489. FaultCount: e.faultEvents.Load(),
  490. FaultHistory: e.FaultHistory(),
  491. TransitionHistory: e.TransitionHistory(),
  492. }
  493. }
  494. func (e *Engine) appliedFrequencyMHz() float64 {
  495. bits := e.appliedFreqHz.Load()
  496. return math.Float64frombits(bits) / 1e6
  497. }
  498. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  499. switch {
  500. case queueHealth == output.QueueHealthCritical:
  501. return RuntimeIndicatorQueueCritical
  502. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  503. return RuntimeIndicatorDegraded
  504. default:
  505. return RuntimeIndicatorNormal
  506. }
  507. }
  508. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  509. switch {
  510. case queueHealth == output.QueueHealthCritical:
  511. return "queue health critical"
  512. case recentLateBuffers:
  513. return "late buffers"
  514. case queueHealth == output.QueueHealthLow:
  515. return "queue health low"
  516. default:
  517. return ""
  518. }
  519. }
  520. func runtimeStateSeverity(state RuntimeState) string {
  521. switch state {
  522. case RuntimeStateRunning:
  523. return "ok"
  524. case RuntimeStateDegraded, RuntimeStateMuted:
  525. return "warn"
  526. case RuntimeStateFaulted:
  527. return "err"
  528. default:
  529. return "info"
  530. }
  531. }
  532. func (e *Engine) run(ctx context.Context) {
  533. e.setRuntimeState(RuntimeStatePrebuffering)
  534. e.wg.Add(1)
  535. go e.writerLoop(ctx)
  536. defer e.wg.Done()
  537. for {
  538. if ctx.Err() != nil {
  539. return
  540. }
  541. // Apply pending frequency change between chunks
  542. if pf := e.pendingFreq.Swap(nil); pf != nil {
  543. if err := e.driver.Tune(ctx, *pf); err != nil {
  544. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  545. } else {
  546. e.appliedFreqHz.Store(math.Float64bits(*pf))
  547. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  548. }
  549. }
  550. t0 := time.Now()
  551. frame := e.generator.GenerateFrame(e.chunkDuration)
  552. frame.GeneratedAt = t0
  553. t1 := time.Now()
  554. if e.upsampler != nil {
  555. frame = e.upsampler.Process(frame)
  556. frame.GeneratedAt = t0
  557. }
  558. t2 := time.Now()
  559. genDur := t1.Sub(t0)
  560. upDur := t2.Sub(t1)
  561. updateMaxDuration(&e.maxGenerateNs, genDur)
  562. updateMaxDuration(&e.maxUpsampleNs, upDur)
  563. // cloneFrame never returns nil when src is non-nil (NEW-3: dead nil check removed)
  564. enqueued := cloneFrame(frame)
  565. enqueued.EnqueuedAt = time.Now()
  566. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  567. if ctx.Err() != nil {
  568. return
  569. }
  570. if errors.Is(err, output.ErrFrameQueueClosed) {
  571. return
  572. }
  573. e.lastError.Store(err.Error())
  574. e.underruns.Add(1)
  575. select {
  576. case <-time.After(e.chunkDuration):
  577. case <-ctx.Done():
  578. return
  579. }
  580. continue
  581. }
  582. queueStats := e.frameQueue.Stats()
  583. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  584. }
  585. }
  586. func (e *Engine) writerLoop(ctx context.Context) {
  587. defer e.wg.Done()
  588. for {
  589. frame, err := e.frameQueue.Pop(ctx)
  590. if err != nil {
  591. if ctx.Err() != nil {
  592. return
  593. }
  594. if errors.Is(err, output.ErrFrameQueueClosed) {
  595. return
  596. }
  597. e.lastError.Store(err.Error())
  598. e.underruns.Add(1)
  599. continue
  600. }
  601. frame.DequeuedAt = time.Now()
  602. queueResidence := time.Duration(0)
  603. if !frame.EnqueuedAt.IsZero() {
  604. queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt)
  605. }
  606. writeStart := time.Now()
  607. frame.WriteStartedAt = writeStart
  608. n, err := e.driver.Write(ctx, frame)
  609. writeDur := time.Since(writeStart)
  610. pipelineLatency := writeDur
  611. if !frame.GeneratedAt.IsZero() {
  612. pipelineLatency = time.Since(frame.GeneratedAt)
  613. }
  614. updateMaxDuration(&e.maxWriteNs, writeDur)
  615. updateMaxDuration(&e.maxQueueResidenceNs, queueResidence)
  616. updateMaxDuration(&e.maxPipelineNs, pipelineLatency)
  617. updateMaxDuration(&e.maxCycleNs, writeDur)
  618. queueStats := e.frameQueue.Stats()
  619. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  620. lateOver := writeDur - e.chunkDuration
  621. if lateOver > writeLateTolerance {
  622. streak := e.lateBufferStreak.Add(1)
  623. late := e.lateBuffers.Add(1)
  624. // Only arm the alert window once the streak threshold is reached.
  625. // Isolated OS-scheduling or USB jitter spikes (single late writes)
  626. // are normal on a loaded system and must not trigger degraded state.
  627. // This mirrors the queue-health streak logic.
  628. if streak >= lateBufferStreakThreshold {
  629. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  630. }
  631. if late <= 5 || late%20 == 0 {
  632. log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s",
  633. streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
  634. }
  635. } else {
  636. // Clean write — reset the consecutive streak so isolated spikes
  637. // never accumulate toward the threshold.
  638. e.lateBufferStreak.Store(0)
  639. }
  640. if err != nil {
  641. if ctx.Err() != nil {
  642. return
  643. }
  644. e.recordFault(FaultReasonWriteTimeout, FaultSeverityWarn, fmt.Sprintf("driver write error: %v", err))
  645. e.lastError.Store(err.Error())
  646. e.underruns.Add(1)
  647. select {
  648. case <-time.After(e.chunkDuration):
  649. case <-ctx.Done():
  650. return
  651. }
  652. continue
  653. }
  654. e.chunksProduced.Add(1)
  655. e.totalSamples.Add(uint64(n))
  656. }
  657. }
  658. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  659. if src == nil {
  660. return nil
  661. }
  662. samples := make([]output.IQSample, len(src.Samples))
  663. copy(samples, src.Samples)
  664. return &output.CompositeFrame{
  665. Samples: samples,
  666. SampleRateHz: src.SampleRateHz,
  667. Timestamp: src.Timestamp,
  668. GeneratedAt: src.GeneratedAt,
  669. EnqueuedAt: src.EnqueuedAt,
  670. DequeuedAt: src.DequeuedAt,
  671. WriteStartedAt: src.WriteStartedAt,
  672. Sequence: src.Sequence,
  673. }
  674. }
  675. func (e *Engine) setRuntimeState(state RuntimeState) {
  676. // NEW-2 fix: hold stateMu so that concurrent calls from run() and
  677. // writerLoop() cannot both see prev != state and both record a
  678. // spurious duplicate transition.
  679. e.stateMu.Lock()
  680. defer e.stateMu.Unlock()
  681. now := time.Now()
  682. prev := e.currentRuntimeState()
  683. if prev != state {
  684. e.recordRuntimeTransition(prev, state, now)
  685. switch state {
  686. case RuntimeStateDegraded:
  687. e.degradedTransitions.Add(1)
  688. case RuntimeStateMuted:
  689. e.mutedTransitions.Add(1)
  690. case RuntimeStateFaulted:
  691. e.faultedTransitions.Add(1)
  692. }
  693. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  694. } else if e.runtimeStateEnteredAt.Load() == 0 {
  695. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  696. }
  697. e.runtimeState.Store(state)
  698. }
  699. func (e *Engine) currentRuntimeState() RuntimeState {
  700. if v := e.runtimeState.Load(); v != nil {
  701. if rs, ok := v.(RuntimeState); ok {
  702. return rs
  703. }
  704. }
  705. return RuntimeStateIdle
  706. }
  707. func (e *Engine) runtimeStateDurationSeconds() float64 {
  708. if ts := e.runtimeStateEnteredAt.Load(); ts != 0 {
  709. return time.Since(time.Unix(0, int64(ts))).Seconds()
  710. }
  711. return 0
  712. }
  713. func (e *Engine) hasRecentLateBuffers() bool {
  714. lateAlertAt := e.lateBufferAlertAt.Load()
  715. if lateAlertAt == 0 {
  716. return false
  717. }
  718. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  719. }
  720. func (e *Engine) lastFaultEvent() *FaultEvent {
  721. return copyFaultEvent(e.loadLastFault())
  722. }
  723. // LastFault exposes the most recent captured fault, if any.
  724. func (e *Engine) LastFault() *FaultEvent {
  725. return e.lastFaultEvent()
  726. }
  727. func (e *Engine) FaultHistory() []FaultEvent {
  728. e.faultHistoryMu.Lock()
  729. defer e.faultHistoryMu.Unlock()
  730. history := make([]FaultEvent, len(e.faultHistory))
  731. copy(history, e.faultHistory)
  732. return history
  733. }
  734. func (e *Engine) TransitionHistory() []RuntimeTransition {
  735. e.transitionHistoryMu.Lock()
  736. defer e.transitionHistoryMu.Unlock()
  737. history := make([]RuntimeTransition, len(e.transitionHistory))
  738. copy(history, e.transitionHistory)
  739. return history
  740. }
  741. func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) {
  742. if when.IsZero() {
  743. when = time.Now()
  744. }
  745. ev := RuntimeTransition{
  746. Time: when,
  747. From: from,
  748. To: to,
  749. Severity: runtimeStateSeverity(to),
  750. }
  751. e.transitionHistoryMu.Lock()
  752. defer e.transitionHistoryMu.Unlock()
  753. if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity {
  754. copy(e.transitionHistory, e.transitionHistory[1:])
  755. e.transitionHistory[len(e.transitionHistory)-1] = ev
  756. return
  757. }
  758. e.transitionHistory = append(e.transitionHistory, ev)
  759. }
  760. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  761. if reason == "" {
  762. reason = FaultReasonUnknown
  763. }
  764. now := time.Now()
  765. if last := e.loadLastFault(); last != nil {
  766. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  767. return
  768. }
  769. }
  770. ev := &FaultEvent{
  771. Time: now,
  772. Reason: reason,
  773. Severity: severity,
  774. Message: message,
  775. }
  776. e.lastFault.Store(ev)
  777. e.appendFaultHistory(ev)
  778. e.faultEvents.Add(1)
  779. }
  780. func (e *Engine) loadLastFault() *FaultEvent {
  781. if v := e.lastFault.Load(); v != nil {
  782. if ev, ok := v.(*FaultEvent); ok {
  783. return ev
  784. }
  785. }
  786. return nil
  787. }
  788. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  789. if source == nil {
  790. return nil
  791. }
  792. copy := *source
  793. return &copy
  794. }
  795. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  796. e.faultHistoryMu.Lock()
  797. defer e.faultHistoryMu.Unlock()
  798. if len(e.faultHistory) >= faultHistoryCapacity {
  799. copy(e.faultHistory, e.faultHistory[1:])
  800. e.faultHistory[len(e.faultHistory)-1] = *ev
  801. return
  802. }
  803. e.faultHistory = append(e.faultHistory, *ev)
  804. }
  805. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  806. state := e.currentRuntimeState()
  807. switch state {
  808. case RuntimeStateStopping, RuntimeStateFaulted:
  809. return
  810. case RuntimeStateMuted:
  811. if queue.Health == output.QueueHealthCritical {
  812. if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
  813. e.mutedFaultStreak.Store(0)
  814. e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
  815. fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
  816. e.setRuntimeState(RuntimeStateFaulted)
  817. return
  818. }
  819. } else {
  820. e.mutedFaultStreak.Store(0)
  821. }
  822. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  823. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  824. e.mutedRecoveryStreak.Store(0)
  825. e.mutedFaultStreak.Store(0)
  826. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  827. fmt.Sprintf("queue healthy for %d checks after mute", count))
  828. e.setRuntimeState(RuntimeStateDegraded)
  829. }
  830. } else {
  831. e.mutedRecoveryStreak.Store(0)
  832. }
  833. return
  834. }
  835. if state == RuntimeStatePrebuffering {
  836. if queue.Depth >= 1 {
  837. e.setRuntimeState(RuntimeStateRunning)
  838. }
  839. return
  840. }
  841. critical := queue.Health == output.QueueHealthCritical
  842. if critical {
  843. count := e.criticalStreak.Add(1)
  844. if count >= queueMutedStreakThreshold {
  845. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  846. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  847. e.setRuntimeState(RuntimeStateMuted)
  848. return
  849. }
  850. if count >= queueCriticalStreakThreshold {
  851. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  852. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  853. e.setRuntimeState(RuntimeStateDegraded)
  854. return
  855. }
  856. } else {
  857. e.criticalStreak.Store(0)
  858. }
  859. if hasLateBuffers {
  860. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  861. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  862. e.setRuntimeState(RuntimeStateDegraded)
  863. return
  864. }
  865. e.setRuntimeState(RuntimeStateRunning)
  866. }
  867. // ResetFault attempts to move the engine out of the faulted state.
  868. func (e *Engine) ResetFault() error {
  869. state := e.currentRuntimeState()
  870. if state != RuntimeStateFaulted {
  871. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  872. }
  873. e.criticalStreak.Store(0)
  874. e.mutedRecoveryStreak.Store(0)
  875. e.mutedFaultStreak.Store(0)
  876. e.setRuntimeState(RuntimeStateDegraded)
  877. return nil
  878. }