Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

606 строки
16KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  17. type EngineState int
  18. const (
  19. EngineIdle EngineState = iota
  20. EngineRunning
  21. EngineStopping
  22. )
  23. func (s EngineState) String() string {
  24. switch s {
  25. case EngineIdle:
  26. return "idle"
  27. case EngineRunning:
  28. return "running"
  29. case EngineStopping:
  30. return "stopping"
  31. default:
  32. return "unknown"
  33. }
  34. }
  35. type RuntimeState string
  36. const (
  37. RuntimeStateIdle RuntimeState = "idle"
  38. RuntimeStateArming RuntimeState = "arming"
  39. RuntimeStatePrebuffering RuntimeState = "prebuffering"
  40. RuntimeStateRunning RuntimeState = "running"
  41. RuntimeStateDegraded RuntimeState = "degraded"
  42. RuntimeStateMuted RuntimeState = "muted"
  43. RuntimeStateFaulted RuntimeState = "faulted"
  44. RuntimeStateStopping RuntimeState = "stopping"
  45. )
  46. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  47. v := uint64(d)
  48. for {
  49. cur := dst.Load()
  50. if v <= cur {
  51. return
  52. }
  53. if dst.CompareAndSwap(cur, v) {
  54. return
  55. }
  56. }
  57. }
  58. func durationMs(ns uint64) float64 {
  59. return float64(ns) / float64(time.Millisecond)
  60. }
  61. type EngineStats struct {
  62. State string `json:"state"`
  63. ChunksProduced uint64 `json:"chunksProduced"`
  64. TotalSamples uint64 `json:"totalSamples"`
  65. Underruns uint64 `json:"underruns"`
  66. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  67. LastError string `json:"lastError,omitempty"`
  68. UptimeSeconds float64 `json:"uptimeSeconds"`
  69. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  70. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  71. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  72. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  73. Queue output.QueueStats `json:"queue"`
  74. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  75. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  76. }
  77. type RuntimeIndicator string
  78. const (
  79. RuntimeIndicatorNormal RuntimeIndicator = "normal"
  80. RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
  81. RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
  82. )
  83. const lateBufferIndicatorWindow = 5 * time.Second
  84. const queueCriticalStreakThreshold = 3
  85. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  86. // resamples to device rate, and pushes to hardware in a tight loop.
  87. // The hardware buffer_push call is blocking — it returns when the hardware
  88. // has consumed the previous buffer and is ready for the next one.
  89. // This naturally paces the loop to real-time without a ticker.
  90. type Engine struct {
  91. cfg cfgpkg.Config
  92. driver platform.SoapyDriver
  93. generator *offpkg.Generator
  94. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  95. chunkDuration time.Duration
  96. deviceRate float64
  97. frameQueue *output.FrameQueue
  98. mu sync.Mutex
  99. state EngineState
  100. cancel context.CancelFunc
  101. startedAt time.Time
  102. wg sync.WaitGroup
  103. runtimeState atomic.Value
  104. chunksProduced atomic.Uint64
  105. totalSamples atomic.Uint64
  106. underruns atomic.Uint64
  107. lateBuffers atomic.Uint64
  108. lateBufferAlertAt atomic.Uint64
  109. criticalStreak atomic.Uint64
  110. maxCycleNs atomic.Uint64
  111. maxGenerateNs atomic.Uint64
  112. maxUpsampleNs atomic.Uint64
  113. maxWriteNs atomic.Uint64
  114. lastError atomic.Value // string
  115. // Live config: pending frequency change, applied between chunks
  116. pendingFreq atomic.Pointer[float64]
  117. // Live audio stream (optional)
  118. streamSrc *audio.StreamSource
  119. }
  120. // SetStreamSource configures a live audio stream as the audio source.
  121. // Must be called before Start(). The StreamResampler is created internally
  122. // to convert from the stream's sample rate to the DSP composite rate.
  123. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  124. e.streamSrc = src
  125. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  126. if compositeRate <= 0 {
  127. compositeRate = 228000
  128. }
  129. resampler := audio.NewStreamResampler(src, compositeRate)
  130. e.generator.SetExternalSource(resampler)
  131. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  132. src.SampleRate, compositeRate, src.Stats().Capacity)
  133. }
  134. // StreamSource returns the live audio stream source, or nil.
  135. // Used by the control server for stats and HTTP audio ingest.
  136. func (e *Engine) StreamSource() *audio.StreamSource {
  137. return e.streamSrc
  138. }
  139. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  140. deviceRate := cfg.EffectiveDeviceRate()
  141. compositeRate := float64(cfg.FM.CompositeRateHz)
  142. if compositeRate <= 0 {
  143. compositeRate = 228000
  144. }
  145. var upsampler *dsp.FMUpsampler
  146. if deviceRate > compositeRate*1.001 {
  147. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  148. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  149. // This halves CPU load compared to running all DSP at deviceRate.
  150. cfg.FM.FMModulationEnabled = false
  151. maxDev := cfg.FM.MaxDeviationHz
  152. if maxDev <= 0 {
  153. maxDev = 75000
  154. }
  155. // mpxGain scales the FM deviation to compensate for hardware
  156. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  157. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  158. maxDev *= cfg.FM.MpxGain
  159. }
  160. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  161. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  162. compositeRate, deviceRate, deviceRate/compositeRate)
  163. } else {
  164. // Same-rate: entire DSP chain runs at deviceRate.
  165. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  166. if deviceRate > 0 {
  167. cfg.FM.CompositeRateHz = int(deviceRate)
  168. }
  169. cfg.FM.FMModulationEnabled = true
  170. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  171. }
  172. engine := &Engine{
  173. cfg: cfg,
  174. driver: driver,
  175. generator: offpkg.NewGenerator(cfg),
  176. upsampler: upsampler,
  177. chunkDuration: 50 * time.Millisecond,
  178. deviceRate: deviceRate,
  179. state: EngineIdle,
  180. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  181. }
  182. engine.setRuntimeState(RuntimeStateIdle)
  183. return engine
  184. }
  185. func (e *Engine) SetChunkDuration(d time.Duration) {
  186. e.chunkDuration = d
  187. }
  188. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  189. // nil pointers mean "no change". Validated before applying.
  190. type LiveConfigUpdate struct {
  191. FrequencyMHz *float64
  192. OutputDrive *float64
  193. StereoEnabled *bool
  194. PilotLevel *float64
  195. RDSInjection *float64
  196. RDSEnabled *bool
  197. LimiterEnabled *bool
  198. LimiterCeiling *float64
  199. PS *string
  200. RadioText *string
  201. }
  202. // UpdateConfig applies live parameter changes without restarting the engine.
  203. // DSP params take effect at the next chunk boundary (~50ms max).
  204. // Frequency changes are applied between chunks via driver.Tune().
  205. // RDS text updates are applied at the next RDS group boundary (~88ms).
  206. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  207. // --- Validate ---
  208. if u.FrequencyMHz != nil {
  209. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  210. return fmt.Errorf("frequencyMHz out of range (65-110)")
  211. }
  212. }
  213. if u.OutputDrive != nil {
  214. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  215. return fmt.Errorf("outputDrive out of range (0-10)")
  216. }
  217. }
  218. if u.PilotLevel != nil {
  219. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  220. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  221. }
  222. }
  223. if u.RDSInjection != nil {
  224. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  225. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  226. }
  227. }
  228. if u.LimiterCeiling != nil {
  229. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  230. return fmt.Errorf("limiterCeiling out of range (0-2)")
  231. }
  232. }
  233. // --- Frequency: store for run loop to apply via driver.Tune() ---
  234. if u.FrequencyMHz != nil {
  235. freqHz := *u.FrequencyMHz * 1e6
  236. e.pendingFreq.Store(&freqHz)
  237. }
  238. // --- RDS text: forward to encoder atomics ---
  239. if u.PS != nil || u.RadioText != nil {
  240. if enc := e.generator.RDSEncoder(); enc != nil {
  241. ps, rt := "", ""
  242. if u.PS != nil {
  243. ps = *u.PS
  244. }
  245. if u.RadioText != nil {
  246. rt = *u.RadioText
  247. }
  248. enc.UpdateText(ps, rt)
  249. }
  250. }
  251. // --- DSP params: build new LiveParams from current + patch ---
  252. // Read current, apply deltas, store new
  253. current := e.generator.CurrentLiveParams()
  254. next := current // copy
  255. if u.OutputDrive != nil {
  256. next.OutputDrive = *u.OutputDrive
  257. }
  258. if u.StereoEnabled != nil {
  259. next.StereoEnabled = *u.StereoEnabled
  260. }
  261. if u.PilotLevel != nil {
  262. next.PilotLevel = *u.PilotLevel
  263. }
  264. if u.RDSInjection != nil {
  265. next.RDSInjection = *u.RDSInjection
  266. }
  267. if u.RDSEnabled != nil {
  268. next.RDSEnabled = *u.RDSEnabled
  269. }
  270. if u.LimiterEnabled != nil {
  271. next.LimiterEnabled = *u.LimiterEnabled
  272. }
  273. if u.LimiterCeiling != nil {
  274. next.LimiterCeiling = *u.LimiterCeiling
  275. }
  276. e.generator.UpdateLive(next)
  277. return nil
  278. }
  279. func (e *Engine) Start(ctx context.Context) error {
  280. e.mu.Lock()
  281. if e.state != EngineIdle {
  282. e.mu.Unlock()
  283. return fmt.Errorf("engine already in state %s", e.state)
  284. }
  285. if err := e.driver.Start(ctx); err != nil {
  286. e.mu.Unlock()
  287. return fmt.Errorf("driver start: %w", err)
  288. }
  289. runCtx, cancel := context.WithCancel(ctx)
  290. e.cancel = cancel
  291. e.state = EngineRunning
  292. e.setRuntimeState(RuntimeStateArming)
  293. e.startedAt = time.Now()
  294. e.wg.Add(1)
  295. e.mu.Unlock()
  296. go e.run(runCtx)
  297. return nil
  298. }
  299. func (e *Engine) Stop(ctx context.Context) error {
  300. e.mu.Lock()
  301. if e.state != EngineRunning {
  302. e.mu.Unlock()
  303. return nil
  304. }
  305. e.state = EngineStopping
  306. e.setRuntimeState(RuntimeStateStopping)
  307. e.cancel()
  308. e.mu.Unlock()
  309. // Wait for run() goroutine to exit — deterministic, no guessing
  310. e.wg.Wait()
  311. if err := e.driver.Flush(ctx); err != nil {
  312. return err
  313. }
  314. if err := e.driver.Stop(ctx); err != nil {
  315. return err
  316. }
  317. e.mu.Lock()
  318. e.state = EngineIdle
  319. e.setRuntimeState(RuntimeStateIdle)
  320. e.mu.Unlock()
  321. return nil
  322. }
  323. func (e *Engine) Stats() EngineStats {
  324. e.mu.Lock()
  325. state := e.state
  326. startedAt := e.startedAt
  327. e.mu.Unlock()
  328. var uptime float64
  329. if state == EngineRunning {
  330. uptime = time.Since(startedAt).Seconds()
  331. }
  332. errVal, _ := e.lastError.Load().(string)
  333. queue := e.frameQueue.Stats()
  334. lateBuffers := e.lateBuffers.Load()
  335. hasRecentLateBuffers := e.hasRecentLateBuffers()
  336. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  337. return EngineStats{
  338. State: string(e.currentRuntimeState()),
  339. ChunksProduced: e.chunksProduced.Load(),
  340. TotalSamples: e.totalSamples.Load(),
  341. Underruns: e.underruns.Load(),
  342. LateBuffers: lateBuffers,
  343. LastError: errVal,
  344. UptimeSeconds: uptime,
  345. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  346. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  347. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  348. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  349. Queue: queue,
  350. RuntimeIndicator: ri,
  351. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  352. }
  353. }
  354. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  355. switch {
  356. case queueHealth == output.QueueHealthCritical:
  357. return RuntimeIndicatorQueueCritical
  358. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  359. return RuntimeIndicatorDegraded
  360. default:
  361. return RuntimeIndicatorNormal
  362. }
  363. }
  364. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  365. switch {
  366. case queueHealth == output.QueueHealthCritical:
  367. return "queue health critical"
  368. case recentLateBuffers:
  369. return "late buffers"
  370. case queueHealth == output.QueueHealthLow:
  371. return "queue health low"
  372. default:
  373. return ""
  374. }
  375. }
  376. func (e *Engine) run(ctx context.Context) {
  377. e.setRuntimeState(RuntimeStatePrebuffering)
  378. e.wg.Add(1)
  379. go e.writerLoop(ctx)
  380. defer e.wg.Done()
  381. for {
  382. if ctx.Err() != nil {
  383. return
  384. }
  385. // Apply pending frequency change between chunks
  386. if pf := e.pendingFreq.Swap(nil); pf != nil {
  387. if err := e.driver.Tune(ctx, *pf); err != nil {
  388. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  389. } else {
  390. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  391. }
  392. }
  393. t0 := time.Now()
  394. frame := e.generator.GenerateFrame(e.chunkDuration)
  395. frame.GeneratedAt = t0
  396. t1 := time.Now()
  397. if e.upsampler != nil {
  398. frame = e.upsampler.Process(frame)
  399. frame.GeneratedAt = t0
  400. }
  401. t2 := time.Now()
  402. genDur := t1.Sub(t0)
  403. upDur := t2.Sub(t1)
  404. updateMaxDuration(&e.maxGenerateNs, genDur)
  405. updateMaxDuration(&e.maxUpsampleNs, upDur)
  406. enqueued := cloneFrame(frame)
  407. if enqueued == nil {
  408. e.lastError.Store("engine: frame clone failed")
  409. e.underruns.Add(1)
  410. continue
  411. }
  412. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  413. if ctx.Err() != nil {
  414. return
  415. }
  416. if errors.Is(err, output.ErrFrameQueueClosed) {
  417. return
  418. }
  419. e.lastError.Store(err.Error())
  420. e.underruns.Add(1)
  421. select {
  422. case <-time.After(e.chunkDuration):
  423. case <-ctx.Done():
  424. return
  425. }
  426. continue
  427. }
  428. queueStats := e.frameQueue.Stats()
  429. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  430. }
  431. }
  432. func (e *Engine) writerLoop(ctx context.Context) {
  433. defer e.wg.Done()
  434. for {
  435. frame, err := e.frameQueue.Pop(ctx)
  436. if err != nil {
  437. if ctx.Err() != nil {
  438. return
  439. }
  440. if errors.Is(err, output.ErrFrameQueueClosed) {
  441. return
  442. }
  443. e.lastError.Store(err.Error())
  444. e.underruns.Add(1)
  445. continue
  446. }
  447. writeStart := time.Now()
  448. n, err := e.driver.Write(ctx, frame)
  449. writeDur := time.Since(writeStart)
  450. cycleDur := writeDur
  451. if !frame.GeneratedAt.IsZero() {
  452. cycleDur = time.Since(frame.GeneratedAt)
  453. }
  454. updateMaxDuration(&e.maxWriteNs, writeDur)
  455. updateMaxDuration(&e.maxCycleNs, cycleDur)
  456. queueStats := e.frameQueue.Stats()
  457. e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers())
  458. if cycleDur > e.chunkDuration {
  459. late := e.lateBuffers.Add(1)
  460. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  461. if late <= 5 || late%20 == 0 {
  462. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  463. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  464. }
  465. }
  466. if err != nil {
  467. if ctx.Err() != nil {
  468. return
  469. }
  470. e.lastError.Store(err.Error())
  471. e.underruns.Add(1)
  472. select {
  473. case <-time.After(e.chunkDuration):
  474. case <-ctx.Done():
  475. return
  476. }
  477. continue
  478. }
  479. e.chunksProduced.Add(1)
  480. e.totalSamples.Add(uint64(n))
  481. }
  482. }
  483. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  484. if src == nil {
  485. return nil
  486. }
  487. samples := make([]output.IQSample, len(src.Samples))
  488. copy(samples, src.Samples)
  489. return &output.CompositeFrame{
  490. Samples: samples,
  491. SampleRateHz: src.SampleRateHz,
  492. Timestamp: src.Timestamp,
  493. GeneratedAt: src.GeneratedAt,
  494. Sequence: src.Sequence,
  495. }
  496. }
  497. func (e *Engine) setRuntimeState(state RuntimeState) {
  498. e.runtimeState.Store(state)
  499. }
  500. func (e *Engine) currentRuntimeState() RuntimeState {
  501. if v := e.runtimeState.Load(); v != nil {
  502. if rs, ok := v.(RuntimeState); ok {
  503. return rs
  504. }
  505. }
  506. return RuntimeStateIdle
  507. }
  508. func (e *Engine) hasRecentLateBuffers() bool {
  509. lateAlertAt := e.lateBufferAlertAt.Load()
  510. if lateAlertAt == 0 {
  511. return false
  512. }
  513. return time.Since(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  514. }
  515. func (e *Engine) evaluateRuntimeState(queue output.QueueStats, hasLateBuffers bool) {
  516. state := e.currentRuntimeState()
  517. switch state {
  518. case RuntimeStateStopping, RuntimeStateFaulted:
  519. return
  520. }
  521. if state == RuntimeStatePrebuffering {
  522. if queue.Depth >= 1 {
  523. e.setRuntimeState(RuntimeStateRunning)
  524. }
  525. return
  526. }
  527. critical := queue.Health == output.QueueHealthCritical
  528. if critical {
  529. if e.criticalStreak.Add(1) >= queueCriticalStreakThreshold {
  530. e.setRuntimeState(RuntimeStateDegraded)
  531. return
  532. }
  533. } else {
  534. e.criticalStreak.Store(0)
  535. }
  536. if hasLateBuffers {
  537. e.setRuntimeState(RuntimeStateDegraded)
  538. return
  539. }
  540. e.setRuntimeState(RuntimeStateRunning)
  541. }