Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

986 lignes
29KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "math"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/jan/fm-rds-tx/internal/audio"
  12. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  13. "github.com/jan/fm-rds-tx/internal/dsp"
  14. "github.com/jan/fm-rds-tx/internal/license"
  15. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  16. "github.com/jan/fm-rds-tx/internal/output"
  17. "github.com/jan/fm-rds-tx/internal/platform"
  18. )
  19. type EngineState int
  20. const (
  21. EngineIdle EngineState = iota
  22. EngineRunning
  23. EngineStopping
  24. )
  25. func (s EngineState) String() string {
  26. switch s {
  27. case EngineIdle:
  28. return "idle"
  29. case EngineRunning:
  30. return "running"
  31. case EngineStopping:
  32. return "stopping"
  33. default:
  34. return "unknown"
  35. }
  36. }
// RuntimeState is the fine-grained health/operational state of the engine.
// Its string value is what Stats reports in EngineStats.State.
type RuntimeState string

const (
	// RuntimeStateIdle is the initial state and the state restored by Stop.
	RuntimeStateIdle RuntimeState = "idle"
	// RuntimeStateArming is set by Start before the run goroutine is launched.
	RuntimeStateArming RuntimeState = "arming"
	// RuntimeStatePrebuffering is set when run begins; it is left once the
	// frame queue reports a depth of at least one.
	RuntimeStatePrebuffering RuntimeState = "prebuffering"
	// RuntimeStateRunning is the healthy transmitting state.
	RuntimeStateRunning RuntimeState = "running"
	// RuntimeStateDegraded is entered on sustained critical queue health or
	// recent late buffers.
	RuntimeStateDegraded RuntimeState = "degraded"
	// RuntimeStateMuted is entered after a longer streak of critical queue checks.
	RuntimeStateMuted RuntimeState = "muted"
	// RuntimeStateFaulted is entered when the queue stays critical while muted;
	// evaluateRuntimeState never leaves it on its own (see ResetFault).
	RuntimeStateFaulted RuntimeState = "faulted"
	// RuntimeStateStopping is set by Stop while the goroutines are shut down.
	RuntimeStateStopping RuntimeState = "stopping"
)
  48. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  49. v := uint64(d)
  50. for {
  51. cur := dst.Load()
  52. if v <= cur {
  53. return
  54. }
  55. if dst.CompareAndSwap(cur, v) {
  56. return
  57. }
  58. }
  59. }
  60. func durationMs(ns uint64) float64 {
  61. return float64(ns) / float64(time.Millisecond)
  62. }
// EngineStats is the JSON-serialisable snapshot returned by Engine.Stats.
type EngineStats struct {
	// State is the current RuntimeState as a string.
	State string `json:"state"`
	// RuntimeStateDurationSeconds is the time spent in the current runtime state.
	RuntimeStateDurationSeconds float64 `json:"runtimeStateDurationSeconds"`
	// ChunksProduced counts frames successfully written to the driver.
	ChunksProduced uint64 `json:"chunksProduced"`
	// TotalSamples sums the sample counts returned by successful driver writes.
	TotalSamples uint64 `json:"totalSamples"`
	// Underruns counts queue push/pop failures and driver write errors.
	Underruns uint64 `json:"underruns"`
	// LateBuffers counts writes that exceeded the chunk duration by more than
	// writeLateTolerance.
	LateBuffers uint64 `json:"lateBuffers,omitempty"`
	// LastError is the most recently stored error text, if any.
	LastError string `json:"lastError,omitempty"`
	// UptimeSeconds is the time since Start; zero unless the engine is running.
	UptimeSeconds float64 `json:"uptimeSeconds"`
	// Max*Ms fields are high-water marks in milliseconds.
	// MaxCycleMs is currently fed with the driver write duration, like MaxWriteMs.
	MaxCycleMs           float64 `json:"maxCycleMs,omitempty"`
	MaxGenerateMs        float64 `json:"maxGenerateMs,omitempty"`
	MaxUpsampleMs        float64 `json:"maxUpsampleMs,omitempty"`
	MaxWriteMs           float64 `json:"maxWriteMs,omitempty"`
	MaxQueueResidenceMs  float64 `json:"maxQueueResidenceMs,omitempty"`
	MaxPipelineLatencyMs float64 `json:"maxPipelineLatencyMs,omitempty"`
	// Queue is the frame queue's own statistics snapshot.
	Queue output.QueueStats `json:"queue"`
	// RuntimeIndicator and RuntimeAlert summarise queue health and recent late
	// buffers (see runtimeIndicator and runtimeAlert).
	RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
	RuntimeAlert     string           `json:"runtimeAlert,omitempty"`
	// AppliedFrequencyMHz is the most recently applied tuning frequency.
	AppliedFrequencyMHz float64 `json:"appliedFrequencyMHz"`
	// ActivePS and ActiveRadioText are the texts reported by the RDS encoder.
	ActivePS        string `json:"activePS,omitempty"`
	ActiveRadioText string `json:"activeRadioText,omitempty"`
	// Measurement is the generator's latest measurement snapshot, if any.
	Measurement *offpkg.MeasurementSnapshot `json:"measurement,omitempty"`
	// LastFault is a copy of the most recently recorded fault, if any.
	LastFault *FaultEvent `json:"lastFault,omitempty"`
	// *Transitions count entries into the respective runtime state.
	DegradedTransitions uint64 `json:"degradedTransitions"`
	MutedTransitions    uint64 `json:"mutedTransitions"`
	FaultedTransitions  uint64 `json:"faultedTransitions"`
	// FaultCount is the number of recorded (non-deduplicated) faults.
	FaultCount uint64 `json:"faultCount"`
	// FaultHistory and TransitionHistory are copies of the bounded histories.
	FaultHistory      []FaultEvent        `json:"faultHistory,omitempty"`
	TransitionHistory []RuntimeTransition `json:"transitionHistory,omitempty"`
}
// RuntimeIndicator is the coarse health indicator reported in EngineStats,
// derived from queue health and recent late buffers by runtimeIndicator.
type RuntimeIndicator string

const (
	// RuntimeIndicatorNormal means neither queue health nor late buffers raise concern.
	RuntimeIndicatorNormal RuntimeIndicator = "normal"
	// RuntimeIndicatorDegraded means queue health is low or late buffers were seen recently.
	RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
	// RuntimeIndicatorQueueCritical means queue health is critical.
	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
)
// RuntimeTransition records one change of RuntimeState for the bounded
// transition history exposed via Engine.TransitionHistory.
type RuntimeTransition struct {
	// Time is when the transition was recorded.
	Time time.Time `json:"time"`
	// From and To are the states before and after the transition.
	From RuntimeState `json:"from"`
	To   RuntimeState `json:"to"`
	// Severity is runtimeStateSeverity(To): "ok", "warn", "err" or "info".
	Severity string `json:"severity"`
}
// Tuning constants for late-buffer detection, queue-health escalation and the
// bounded fault/transition histories.
const (
	// How long after the last armed late-buffer alert it still counts as "recent".
	lateBufferIndicatorWindow = 2 * time.Second
	// A write is late when it exceeds the chunk duration by more than this.
	writeLateTolerance = 10 * time.Millisecond
	// Consecutive critical queue checks before entering degraded.
	queueCriticalStreakThreshold = 3
	// Consecutive critical queue checks before entering muted.
	queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
	// Healthy checks while muted before recovering to degraded.
	queueMutedRecoveryThreshold = queueCriticalStreakThreshold
	// Critical checks while muted before entering faulted.
	queueFaultedStreakThreshold = queueCriticalStreakThreshold
	// Identical faults (same reason and severity) within this window are dropped.
	faultRepeatWindow                = 1 * time.Second
	lateBufferStreakThreshold        = 3 // consecutive late writes required before alerting
	faultHistoryCapacity             = 8
	runtimeTransitionHistoryCapacity = 8
)
// Engine is the continuous TX loop. It generates composite IQ in chunks,
// resamples to device rate, and pushes to hardware in a tight loop.
// The hardware buffer_push call is blocking — it returns when the hardware
// has consumed the previous buffer and is ready for the next one.
// This naturally paces the loop to real-time without a ticker.
//
// Two goroutines cooperate through frameQueue: run produces frames and
// writerLoop hands them to the driver.
type Engine struct {
	cfg           cfgpkg.Config
	driver        platform.SoapyDriver
	generator     *offpkg.Generator
	upsampler     *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
	chunkDuration time.Duration
	deviceRate    float64
	frameQueue    *output.FrameQueue

	// mu guards state, cancel and startedAt.
	mu        sync.Mutex
	state     EngineState
	cancel    context.CancelFunc
	startedAt time.Time
	// wg tracks the run and writerLoop goroutines; Stop waits on it.
	wg sync.WaitGroup

	runtimeState atomic.Value // RuntimeState
	stateMu      sync.Mutex   // guards setRuntimeState check-then-store (NEW-2 fix)

	chunksProduced      atomic.Uint64
	totalSamples        atomic.Uint64
	underruns           atomic.Uint64
	lateBuffers         atomic.Uint64
	lateBufferAlertAt   atomic.Uint64 // UnixNano of the last armed late-buffer alert; 0 = never
	lateBufferStreak    atomic.Uint64 // consecutive late writes; reset on clean write
	criticalStreak      atomic.Uint64
	mutedRecoveryStreak atomic.Uint64
	mutedFaultStreak    atomic.Uint64

	// High-water marks in nanoseconds, maintained via updateMaxDuration.
	maxCycleNs          atomic.Uint64
	maxGenerateNs       atomic.Uint64
	maxUpsampleNs       atomic.Uint64
	maxWriteNs          atomic.Uint64
	maxQueueResidenceNs atomic.Uint64
	maxPipelineNs       atomic.Uint64

	lastError atomic.Value // string
	lastFault atomic.Value // *FaultEvent

	faultHistoryMu      sync.Mutex
	faultHistory        []FaultEvent
	transitionHistoryMu sync.Mutex
	transitionHistory   []RuntimeTransition

	degradedTransitions   atomic.Uint64
	mutedTransitions      atomic.Uint64
	faultedTransitions    atomic.Uint64
	faultEvents           atomic.Uint64
	runtimeStateEnteredAt atomic.Uint64 // UnixNano at which the current runtime state was entered

	// Live config: pending frequency change, applied between chunks
	pendingFreq atomic.Pointer[float64]
	// Most recently tuned frequency (Hz), stored as math.Float64bits
	appliedFreqHz atomic.Uint64
	// Live audio stream (optional)
	streamSrc *audio.StreamSource

	measurementPublisherMu sync.RWMutex
	measurementPublisher   func(*offpkg.MeasurementSnapshot)
	// Sequence number of the last measurement handed to the publisher.
	lastPublishedMeasSeq atomic.Uint64
}
  173. // SetStreamSource configures a live audio stream as the audio source.
  174. // Must be called before Start(). The StreamResampler is created internally
  175. // to convert from the stream's sample rate to the DSP composite rate.
  176. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  177. e.streamSrc = src
  178. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  179. if compositeRate <= 0 {
  180. compositeRate = 228000
  181. }
  182. resampler := audio.NewStreamResampler(src, compositeRate)
  183. if err := e.generator.SetExternalSource(resampler); err != nil {
  184. // Should never happen: SetStreamSource must be called before Start().
  185. log.Printf("engine: SetExternalSource failed (called too late): %v", err)
  186. return
  187. }
  188. log.Printf("engine: live audio stream wired — initial %d Hz → %.0f Hz composite (buffer %d frames); actual decoded rate auto-corrects on first chunk",
  189. src.SampleRate, compositeRate, src.Stats().Capacity)
  190. }
// SetLicenseState passes license/jingle state to the generator.
// Must be called before Start(). It does not implicitly enable watermarking.
// The second (string) parameter is accepted but ignored.
func (e *Engine) SetLicenseState(s *license.State, _ string) {
	e.generator.SetLicense(s)
}
// ConfigureWatermark explicitly enables or disables the optional program-audio watermark.
// Must be called before Start(). Both arguments are forwarded unchanged to
// the generator.
func (e *Engine) ConfigureWatermark(enabled bool, key string) {
	e.generator.ConfigureWatermark(enabled, key)
}
// StreamSource returns the live audio stream source, or nil.
// Used by the control server for stats and HTTP audio ingest.
// The value is whatever was last passed to SetStreamSource.
func (e *Engine) StreamSource() *audio.StreamSource {
	return e.streamSrc
}
  206. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  207. deviceRate := cfg.EffectiveDeviceRate()
  208. compositeRate := float64(cfg.FM.CompositeRateHz)
  209. if compositeRate <= 0 {
  210. compositeRate = 228000
  211. }
  212. var upsampler *dsp.FMUpsampler
  213. if deviceRate > compositeRate*1.001 {
  214. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  215. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  216. // This halves CPU load compared to running all DSP at deviceRate.
  217. cfg.FM.FMModulationEnabled = false
  218. maxDev := cfg.FM.MaxDeviationHz
  219. if maxDev <= 0 {
  220. maxDev = 75000
  221. }
  222. // mpxGain scales the FM deviation to compensate for hardware
  223. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  224. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  225. maxDev *= cfg.FM.MpxGain
  226. }
  227. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  228. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  229. compositeRate, deviceRate, deviceRate/compositeRate)
  230. } else {
  231. // Same-rate: entire DSP chain runs at deviceRate.
  232. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  233. if deviceRate > 0 {
  234. cfg.FM.CompositeRateHz = int(deviceRate)
  235. }
  236. cfg.FM.FMModulationEnabled = true
  237. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  238. }
  239. engine := &Engine{
  240. cfg: cfg,
  241. driver: driver,
  242. generator: offpkg.NewGenerator(cfg),
  243. upsampler: upsampler,
  244. chunkDuration: 50 * time.Millisecond,
  245. deviceRate: deviceRate,
  246. state: EngineIdle,
  247. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  248. faultHistory: make([]FaultEvent, 0, faultHistoryCapacity),
  249. transitionHistory: make([]RuntimeTransition, 0, runtimeTransitionHistoryCapacity),
  250. }
  251. initFreqHz := cfg.FM.FrequencyMHz * 1e6
  252. engine.appliedFreqHz.Store(math.Float64bits(initFreqHz))
  253. engine.setRuntimeState(RuntimeStateIdle)
  254. return engine
  255. }
// SetChunkDuration overrides the duration of audio generated per chunk.
// The field is written without synchronisation and d is not validated.
// NOTE(review): presumably intended to be called before Start() only — confirm.
func (e *Engine) SetChunkDuration(d time.Duration) {
	e.chunkDuration = d
}
  259. func (e *Engine) SetMeasurementPublisher(fn func(*offpkg.MeasurementSnapshot)) {
  260. e.measurementPublisherMu.Lock()
  261. e.measurementPublisher = fn
  262. e.measurementPublisherMu.Unlock()
  263. }
// LiveConfigUpdate carries hot-reloadable parameters from the control API.
// nil pointers mean "no change". Validated before applying.
type LiveConfigUpdate struct {
	// FrequencyMHz is range-checked (65-110) and applied by the run loop via driver.Tune.
	FrequencyMHz *float64
	// OutputDrive is range-checked (0-10).
	OutputDrive   *float64
	StereoEnabled *bool
	StereoMode    *string
	// PilotLevel is range-checked (0-0.2).
	PilotLevel *float64
	// RDSInjection is range-checked (0-0.15).
	RDSInjection   *float64
	RDSEnabled     *bool
	LimiterEnabled *bool
	// LimiterCeiling is range-checked (0-2).
	LimiterCeiling *float64
	// PS and RadioText are forwarded to the RDS encoder's UpdateText.
	PS        *string
	RadioText *string
	// TA and TP are forwarded to the RDS encoder's UpdateTA / UpdateTP.
	TA *bool
	TP *bool
	// Tone and gain: live-patchable without engine restart.
	ToneLeftHz  *float64
	ToneRightHz *float64
	// ToneAmplitude is range-checked (0-1).
	ToneAmplitude *float64
	// AudioGain is range-checked (0-4).
	AudioGain               *float64
	CompositeClipperEnabled *bool
}
  287. // UpdateConfig applies live parameter changes without restarting the engine.
  288. // DSP params take effect at the next chunk boundary (~50ms max).
  289. // Frequency changes are applied between chunks via driver.Tune().
  290. // RDS text updates are applied at the next RDS group boundary (~88ms).
  291. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  292. // --- Validate ---
  293. if u.FrequencyMHz != nil {
  294. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  295. return fmt.Errorf("frequencyMHz out of range (65-110)")
  296. }
  297. }
  298. if u.OutputDrive != nil {
  299. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  300. return fmt.Errorf("outputDrive out of range (0-10)")
  301. }
  302. }
  303. if u.PilotLevel != nil {
  304. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  305. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  306. }
  307. }
  308. if u.RDSInjection != nil {
  309. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  310. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  311. }
  312. }
  313. if u.LimiterCeiling != nil {
  314. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  315. return fmt.Errorf("limiterCeiling out of range (0-2)")
  316. }
  317. }
  318. if u.ToneAmplitude != nil {
  319. if *u.ToneAmplitude < 0 || *u.ToneAmplitude > 1 {
  320. return fmt.Errorf("toneAmplitude out of range (0-1)")
  321. }
  322. }
  323. if u.AudioGain != nil {
  324. if *u.AudioGain < 0 || *u.AudioGain > 4 {
  325. return fmt.Errorf("audioGain out of range (0-4)")
  326. }
  327. }
  328. // --- Frequency: store for run loop to apply via driver.Tune() ---
  329. if u.FrequencyMHz != nil {
  330. freqHz := *u.FrequencyMHz * 1e6
  331. e.pendingFreq.Store(&freqHz)
  332. }
  333. // --- RDS text: forward to encoder atomics ---
  334. if u.PS != nil || u.RadioText != nil {
  335. if enc := e.generator.RDSEncoder(); enc != nil {
  336. ps, rt := "", ""
  337. if u.PS != nil {
  338. ps = *u.PS
  339. }
  340. if u.RadioText != nil {
  341. rt = *u.RadioText
  342. }
  343. enc.UpdateText(ps, rt)
  344. }
  345. }
  346. // --- RDS traffic flags: live-update ---
  347. if u.TA != nil || u.TP != nil {
  348. if enc := e.generator.RDSEncoder(); enc != nil {
  349. if u.TA != nil {
  350. enc.UpdateTA(*u.TA)
  351. }
  352. if u.TP != nil {
  353. enc.UpdateTP(*u.TP)
  354. }
  355. }
  356. }
  357. // --- DSP params: build new LiveParams from current + patch ---
  358. // Read current, apply deltas, store new
  359. current := e.generator.CurrentLiveParams()
  360. next := current // copy
  361. if u.OutputDrive != nil {
  362. next.OutputDrive = *u.OutputDrive
  363. }
  364. if u.StereoEnabled != nil {
  365. next.StereoEnabled = *u.StereoEnabled
  366. }
  367. if u.StereoMode != nil {
  368. next.StereoMode = *u.StereoMode
  369. }
  370. if u.PilotLevel != nil {
  371. next.PilotLevel = *u.PilotLevel
  372. }
  373. if u.RDSInjection != nil {
  374. next.RDSInjection = *u.RDSInjection
  375. }
  376. if u.RDSEnabled != nil {
  377. next.RDSEnabled = *u.RDSEnabled
  378. }
  379. if u.LimiterEnabled != nil {
  380. next.LimiterEnabled = *u.LimiterEnabled
  381. }
  382. if u.LimiterCeiling != nil {
  383. next.LimiterCeiling = *u.LimiterCeiling
  384. }
  385. if u.ToneLeftHz != nil {
  386. next.ToneLeftHz = *u.ToneLeftHz
  387. }
  388. if u.ToneRightHz != nil {
  389. next.ToneRightHz = *u.ToneRightHz
  390. }
  391. if u.ToneAmplitude != nil {
  392. next.ToneAmplitude = *u.ToneAmplitude
  393. }
  394. if u.AudioGain != nil {
  395. next.AudioGain = *u.AudioGain
  396. }
  397. if u.CompositeClipperEnabled != nil {
  398. next.CompositeClipperEnabled = *u.CompositeClipperEnabled
  399. }
  400. e.generator.UpdateLive(next)
  401. return nil
  402. }
  403. func (e *Engine) Start(ctx context.Context) error {
  404. e.mu.Lock()
  405. if e.state != EngineIdle {
  406. e.mu.Unlock()
  407. return fmt.Errorf("engine already in state %s", e.state)
  408. }
  409. if err := e.driver.Start(ctx); err != nil {
  410. e.mu.Unlock()
  411. return fmt.Errorf("driver start: %w", err)
  412. }
  413. e.generator.Reset()
  414. if e.upsampler != nil {
  415. e.upsampler.Reset()
  416. }
  417. runCtx, cancel := context.WithCancel(ctx)
  418. e.cancel = cancel
  419. e.state = EngineRunning
  420. e.setRuntimeState(RuntimeStateArming)
  421. e.startedAt = time.Now()
  422. // BUG-A fix: discard any frames left from a previous run so writerLoop
  423. // does not send stale data with expired timestamps on restart.
  424. e.frameQueue.Drain()
  425. e.wg.Add(1)
  426. e.mu.Unlock()
  427. go e.run(runCtx)
  428. return nil
  429. }
  430. func (e *Engine) Stop(ctx context.Context) error {
  431. e.mu.Lock()
  432. if e.state != EngineRunning {
  433. e.mu.Unlock()
  434. return nil
  435. }
  436. e.state = EngineStopping
  437. e.setRuntimeState(RuntimeStateStopping)
  438. e.cancel()
  439. e.mu.Unlock()
  440. // Wait for run() goroutine to exit — deterministic, no guessing
  441. e.wg.Wait()
  442. if err := e.driver.Flush(ctx); err != nil {
  443. return err
  444. }
  445. if err := e.driver.Stop(ctx); err != nil {
  446. return err
  447. }
  448. e.mu.Lock()
  449. e.state = EngineIdle
  450. e.setRuntimeState(RuntimeStateIdle)
  451. e.mu.Unlock()
  452. return nil
  453. }
  454. func (e *Engine) Stats() EngineStats {
  455. e.mu.Lock()
  456. state := e.state
  457. startedAt := e.startedAt
  458. e.mu.Unlock()
  459. var uptime float64
  460. if state == EngineRunning {
  461. uptime = time.Since(startedAt).Seconds()
  462. }
  463. errVal, _ := e.lastError.Load().(string)
  464. queue := e.frameQueue.Stats()
  465. lateBuffers := e.lateBuffers.Load()
  466. hasRecentLateBuffers := e.hasRecentLateBuffers()
  467. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  468. lastFault := e.lastFaultEvent()
  469. activePS, activeRT := "", ""
  470. if enc := e.generator.RDSEncoder(); enc != nil {
  471. activePS, activeRT = enc.CurrentText()
  472. }
  473. return EngineStats{
  474. State: string(e.currentRuntimeState()),
  475. RuntimeStateDurationSeconds: e.runtimeStateDurationSeconds(),
  476. ChunksProduced: e.chunksProduced.Load(),
  477. TotalSamples: e.totalSamples.Load(),
  478. Underruns: e.underruns.Load(),
  479. LateBuffers: lateBuffers,
  480. LastError: errVal,
  481. UptimeSeconds: uptime,
  482. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  483. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  484. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  485. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  486. MaxQueueResidenceMs: durationMs(e.maxQueueResidenceNs.Load()),
  487. MaxPipelineLatencyMs: durationMs(e.maxPipelineNs.Load()),
  488. Queue: queue,
  489. RuntimeIndicator: ri,
  490. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  491. AppliedFrequencyMHz: e.appliedFrequencyMHz(),
  492. ActivePS: activePS,
  493. ActiveRadioText: activeRT,
  494. Measurement: e.generator.LatestMeasurement(),
  495. LastFault: lastFault,
  496. DegradedTransitions: e.degradedTransitions.Load(),
  497. MutedTransitions: e.mutedTransitions.Load(),
  498. FaultedTransitions: e.faultedTransitions.Load(),
  499. FaultCount: e.faultEvents.Load(),
  500. FaultHistory: e.FaultHistory(),
  501. TransitionHistory: e.TransitionHistory(),
  502. }
  503. }
  504. func (e *Engine) appliedFrequencyMHz() float64 {
  505. bits := e.appliedFreqHz.Load()
  506. return math.Float64frombits(bits) / 1e6
  507. }
  508. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  509. switch {
  510. case queueHealth == output.QueueHealthCritical:
  511. return RuntimeIndicatorQueueCritical
  512. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  513. return RuntimeIndicatorDegraded
  514. default:
  515. return RuntimeIndicatorNormal
  516. }
  517. }
  518. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  519. switch {
  520. case queueHealth == output.QueueHealthCritical:
  521. return "queue health critical"
  522. case recentLateBuffers:
  523. return "late buffers"
  524. case queueHealth == output.QueueHealthLow:
  525. return "queue health low"
  526. default:
  527. return ""
  528. }
  529. }
  530. func runtimeStateSeverity(state RuntimeState) string {
  531. switch state {
  532. case RuntimeStateRunning:
  533. return "ok"
  534. case RuntimeStateDegraded, RuntimeStateMuted:
  535. return "warn"
  536. case RuntimeStateFaulted:
  537. return "err"
  538. default:
  539. return "info"
  540. }
  541. }
// run is the producer goroutine launched by Start. It moves the runtime state
// to prebuffering, starts writerLoop, and then loops until ctx is cancelled
// or the frame queue is closed: apply a pending frequency change, generate
// one chunk, upsample it in split-rate mode, and push a copy into the frame
// queue. Generation and upsampling times feed the max-duration metrics.
func (e *Engine) run(ctx context.Context) {
	e.setRuntimeState(RuntimeStatePrebuffering)
	// writerLoop is registered on the same WaitGroup so Stop waits for both.
	e.wg.Add(1)
	go e.writerLoop(ctx)
	defer e.wg.Done()
	for {
		if ctx.Err() != nil {
			return
		}
		// Apply pending frequency change between chunks
		if pf := e.pendingFreq.Swap(nil); pf != nil {
			if err := e.driver.Tune(ctx, *pf); err != nil {
				// A failed tune is recorded only; the applied frequency is unchanged.
				e.lastError.Store(fmt.Sprintf("tune: %v", err))
			} else {
				e.appliedFreqHz.Store(math.Float64bits(*pf))
				log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
			}
		}
		t0 := time.Now()
		frame := e.generator.GenerateFrame(e.chunkDuration)
		frame.GeneratedAt = t0
		t1 := time.Now()
		if e.upsampler != nil {
			frame = e.upsampler.Process(frame)
			// Re-stamp: Process returns a frame value that may not carry the timestamp.
			frame.GeneratedAt = t0
		}
		t2 := time.Now()
		genDur := t1.Sub(t0)
		upDur := t2.Sub(t1)
		updateMaxDuration(&e.maxGenerateNs, genDur)
		updateMaxDuration(&e.maxUpsampleNs, upDur)
		// cloneFrame never returns nil when src is non-nil (NEW-3: dead nil check removed)
		enqueued := cloneFrame(frame)
		enqueued.EnqueuedAt = time.Now()
		if err := e.frameQueue.Push(ctx, enqueued); err != nil {
			if ctx.Err() != nil {
				return
			}
			if errors.Is(err, output.ErrFrameQueueClosed) {
				return
			}
			// Any other push failure counts as an underrun; back off for one
			// chunk duration (or until cancelled) before trying again.
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			select {
			case <-time.After(e.chunkDuration):
			case <-ctx.Done():
				return
			}
			continue
		}
		queueStats := e.frameQueue.Stats()
		e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
	}
}
// writerLoop is the consumer goroutine started by run. It pops frames from
// the frame queue and writes them to the driver until ctx is cancelled or the
// queue is closed. Per frame it records queue residence, write duration and
// end-to-end pipeline latency, tracks late writes, re-evaluates the runtime
// state, and publishes new measurement snapshots after successful writes.
func (e *Engine) writerLoop(ctx context.Context) {
	defer e.wg.Done()
	for {
		frame, err := e.frameQueue.Pop(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			if errors.Is(err, output.ErrFrameQueueClosed) {
				return
			}
			// Other pop failures count as an underrun and are retried immediately.
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			continue
		}
		frame.DequeuedAt = time.Now()
		queueResidence := time.Duration(0)
		if !frame.EnqueuedAt.IsZero() {
			queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt)
		}
		writeStart := time.Now()
		frame.WriteStartedAt = writeStart
		n, err := e.driver.Write(ctx, frame)
		writeDur := time.Since(writeStart)
		// Pipeline latency runs from generation to write completion; it falls
		// back to the write duration when the frame carries no GeneratedAt.
		pipelineLatency := writeDur
		if !frame.GeneratedAt.IsZero() {
			pipelineLatency = time.Since(frame.GeneratedAt)
		}
		updateMaxDuration(&e.maxWriteNs, writeDur)
		updateMaxDuration(&e.maxQueueResidenceNs, queueResidence)
		updateMaxDuration(&e.maxPipelineNs, pipelineLatency)
		// The cycle metric is fed with the write duration as well.
		updateMaxDuration(&e.maxCycleNs, writeDur)
		queueStats := e.frameQueue.Stats()
		e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
		// A write is late when it overruns the chunk duration by more than the tolerance.
		lateOver := writeDur - e.chunkDuration
		if lateOver > writeLateTolerance {
			streak := e.lateBufferStreak.Add(1)
			late := e.lateBuffers.Add(1)
			// Only arm the alert window once the streak threshold is reached.
			// Isolated OS-scheduling or USB jitter spikes (single late writes)
			// are normal on a loaded system and must not trigger degraded state.
			// This mirrors the queue-health streak logic.
			if streak >= lateBufferStreakThreshold {
				e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
			}
			// Log the first five late writes, then every twentieth.
			if late <= 5 || late%20 == 0 {
				log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s",
					streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
			}
		} else {
			// Clean write — reset the consecutive streak so isolated spikes
			// never accumulate toward the threshold.
			e.lateBufferStreak.Store(0)
		}
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			// A driver write error is recorded as a fault and an underrun,
			// followed by a one-chunk back-off (or until cancelled).
			e.recordFault(FaultReasonWriteTimeout, FaultSeverityWarn, fmt.Sprintf("driver write error: %v", err))
			e.lastError.Store(err.Error())
			e.underruns.Add(1)
			select {
			case <-time.After(e.chunkDuration):
			case <-ctx.Done():
				return
			}
			continue
		}
		e.chunksProduced.Add(1)
		e.totalSamples.Add(uint64(n))
		// Publish the latest measurement once per sequence number.
		if m := e.generator.LatestMeasurement(); m != nil {
			if m.Sequence != e.lastPublishedMeasSeq.Load() {
				e.measurementPublisherMu.RLock()
				pub := e.measurementPublisher
				e.measurementPublisherMu.RUnlock()
				// The callback runs outside the lock.
				if pub != nil {
					pub(m)
				}
				e.lastPublishedMeasSeq.Store(m.Sequence)
			}
		}
	}
}
  679. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  680. if src == nil {
  681. return nil
  682. }
  683. samples := make([]output.IQSample, len(src.Samples))
  684. copy(samples, src.Samples)
  685. return &output.CompositeFrame{
  686. Samples: samples,
  687. SampleRateHz: src.SampleRateHz,
  688. Timestamp: src.Timestamp,
  689. GeneratedAt: src.GeneratedAt,
  690. EnqueuedAt: src.EnqueuedAt,
  691. DequeuedAt: src.DequeuedAt,
  692. WriteStartedAt: src.WriteStartedAt,
  693. Sequence: src.Sequence,
  694. }
  695. }
  696. func (e *Engine) setRuntimeState(state RuntimeState) {
  697. // NEW-2 fix: hold stateMu so that concurrent calls from run() and
  698. // writerLoop() cannot both see prev != state and both record a
  699. // spurious duplicate transition.
  700. e.stateMu.Lock()
  701. defer e.stateMu.Unlock()
  702. now := time.Now()
  703. prev := e.currentRuntimeState()
  704. if prev != state {
  705. e.recordRuntimeTransition(prev, state, now)
  706. switch state {
  707. case RuntimeStateDegraded:
  708. e.degradedTransitions.Add(1)
  709. case RuntimeStateMuted:
  710. e.mutedTransitions.Add(1)
  711. case RuntimeStateFaulted:
  712. e.faultedTransitions.Add(1)
  713. }
  714. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  715. } else if e.runtimeStateEnteredAt.Load() == 0 {
  716. e.runtimeStateEnteredAt.Store(uint64(now.UnixNano()))
  717. }
  718. e.runtimeState.Store(state)
  719. }
  720. func (e *Engine) currentRuntimeState() RuntimeState {
  721. if v := e.runtimeState.Load(); v != nil {
  722. if rs, ok := v.(RuntimeState); ok {
  723. return rs
  724. }
  725. }
  726. return RuntimeStateIdle
  727. }
  728. func (e *Engine) runtimeStateDurationSeconds() float64 {
  729. if ts := e.runtimeStateEnteredAt.Load(); ts != 0 {
  730. return time.Since(time.Unix(0, int64(ts))).Seconds()
  731. }
  732. return 0
  733. }
  734. func (e *Engine) hasRecentLateBuffers() bool {
  735. lateAlertAt := e.lateBufferAlertAt.Load()
  736. if lateAlertAt == 0 {
  737. return false
  738. }
  739. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  740. }
// lastFaultEvent returns a copy of the most recently recorded fault, or nil
// when no fault has been recorded. The copy keeps callers from mutating the
// stored event.
func (e *Engine) lastFaultEvent() *FaultEvent {
	return copyFaultEvent(e.loadLastFault())
}
// LastFault exposes the most recent captured fault, if any.
// The returned event is a copy; nil means no fault has been recorded.
func (e *Engine) LastFault() *FaultEvent {
	return e.lastFaultEvent()
}
  748. func (e *Engine) FaultHistory() []FaultEvent {
  749. e.faultHistoryMu.Lock()
  750. defer e.faultHistoryMu.Unlock()
  751. history := make([]FaultEvent, len(e.faultHistory))
  752. copy(history, e.faultHistory)
  753. return history
  754. }
  755. func (e *Engine) TransitionHistory() []RuntimeTransition {
  756. e.transitionHistoryMu.Lock()
  757. defer e.transitionHistoryMu.Unlock()
  758. history := make([]RuntimeTransition, len(e.transitionHistory))
  759. copy(history, e.transitionHistory)
  760. return history
  761. }
  762. func (e *Engine) recordRuntimeTransition(from, to RuntimeState, when time.Time) {
  763. if when.IsZero() {
  764. when = time.Now()
  765. }
  766. ev := RuntimeTransition{
  767. Time: when,
  768. From: from,
  769. To: to,
  770. Severity: runtimeStateSeverity(to),
  771. }
  772. e.transitionHistoryMu.Lock()
  773. defer e.transitionHistoryMu.Unlock()
  774. if len(e.transitionHistory) >= runtimeTransitionHistoryCapacity {
  775. copy(e.transitionHistory, e.transitionHistory[1:])
  776. e.transitionHistory[len(e.transitionHistory)-1] = ev
  777. return
  778. }
  779. e.transitionHistory = append(e.transitionHistory, ev)
  780. }
  781. func (e *Engine) recordFault(reason FaultReason, severity FaultSeverity, message string) {
  782. if reason == "" {
  783. reason = FaultReasonUnknown
  784. }
  785. now := time.Now()
  786. if last := e.loadLastFault(); last != nil {
  787. if last.Reason == reason && last.Severity == severity && now.Sub(last.Time) < faultRepeatWindow {
  788. return
  789. }
  790. }
  791. ev := &FaultEvent{
  792. Time: now,
  793. Reason: reason,
  794. Severity: severity,
  795. Message: message,
  796. }
  797. e.lastFault.Store(ev)
  798. e.appendFaultHistory(ev)
  799. e.faultEvents.Add(1)
  800. }
  801. func (e *Engine) loadLastFault() *FaultEvent {
  802. if v := e.lastFault.Load(); v != nil {
  803. if ev, ok := v.(*FaultEvent); ok {
  804. return ev
  805. }
  806. }
  807. return nil
  808. }
  809. func copyFaultEvent(source *FaultEvent) *FaultEvent {
  810. if source == nil {
  811. return nil
  812. }
  813. copy := *source
  814. return &copy
  815. }
  816. func (e *Engine) appendFaultHistory(ev *FaultEvent) {
  817. e.faultHistoryMu.Lock()
  818. defer e.faultHistoryMu.Unlock()
  819. if len(e.faultHistory) >= faultHistoryCapacity {
  820. copy(e.faultHistory, e.faultHistory[1:])
  821. e.faultHistory[len(e.faultHistory)-1] = *ev
  822. return
  823. }
  824. e.faultHistory = append(e.faultHistory, *ev)
  825. }
  826. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  827. state := e.currentRuntimeState()
  828. switch state {
  829. case RuntimeStateStopping, RuntimeStateFaulted:
  830. return
  831. case RuntimeStateMuted:
  832. if queue.Health == output.QueueHealthCritical {
  833. if count := e.mutedFaultStreak.Add(1); count >= queueFaultedStreakThreshold {
  834. e.mutedFaultStreak.Store(0)
  835. e.recordFault(FaultReasonQueueCritical, FaultSeverityFaulted,
  836. fmt.Sprintf("queue health critical for %d checks while muted (depth=%d)", count, queue.Depth))
  837. e.setRuntimeState(RuntimeStateFaulted)
  838. return
  839. }
  840. } else {
  841. e.mutedFaultStreak.Store(0)
  842. }
  843. if queue.Health == output.QueueHealthNormal && !hasLateBuffers {
  844. if count := e.mutedRecoveryStreak.Add(1); count >= queueMutedRecoveryThreshold {
  845. e.mutedRecoveryStreak.Store(0)
  846. e.mutedFaultStreak.Store(0)
  847. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  848. fmt.Sprintf("queue healthy for %d checks after mute", count))
  849. e.setRuntimeState(RuntimeStateDegraded)
  850. }
  851. } else {
  852. e.mutedRecoveryStreak.Store(0)
  853. }
  854. return
  855. }
  856. if state == RuntimeStatePrebuffering {
  857. if queue.Depth >= 1 {
  858. e.setRuntimeState(RuntimeStateRunning)
  859. }
  860. return
  861. }
  862. critical := queue.Health == output.QueueHealthCritical
  863. if critical {
  864. count := e.criticalStreak.Add(1)
  865. if count >= queueMutedStreakThreshold {
  866. e.recordFault(FaultReasonQueueCritical, FaultSeverityMuted,
  867. fmt.Sprintf("queue health critical for %d consecutive checks (depth=%d)", count, queue.Depth))
  868. e.setRuntimeState(RuntimeStateMuted)
  869. return
  870. }
  871. if count >= queueCriticalStreakThreshold {
  872. e.recordFault(FaultReasonQueueCritical, FaultSeverityDegraded,
  873. fmt.Sprintf("queue health critical (depth=%d)", queue.Depth))
  874. e.setRuntimeState(RuntimeStateDegraded)
  875. return
  876. }
  877. } else {
  878. e.criticalStreak.Store(0)
  879. }
  880. if hasLateBuffers {
  881. e.recordFault(FaultReasonLateBuffers, FaultSeverityWarn,
  882. fmt.Sprintf("late buffers detected (health=%s)", queue.Health))
  883. e.setRuntimeState(RuntimeStateDegraded)
  884. return
  885. }
  886. e.setRuntimeState(RuntimeStateRunning)
  887. }
  888. // ResetFault attempts to move the engine out of the faulted state.
  889. func (e *Engine) ResetFault() error {
  890. state := e.currentRuntimeState()
  891. if state != RuntimeStateFaulted {
  892. return fmt.Errorf("engine not in faulted state (current=%s)", state)
  893. }
  894. e.criticalStreak.Store(0)
  895. e.mutedRecoveryStreak.Store(0)
  896. e.mutedFaultStreak.Store(0)
  897. e.setRuntimeState(RuntimeStateDegraded)
  898. return nil
  899. }