Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

510 lines
13KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  17. type EngineState int
  18. const (
  19. EngineIdle EngineState = iota
  20. EngineRunning
  21. EngineStopping
  22. )
  23. func (s EngineState) String() string {
  24. switch s {
  25. case EngineIdle:
  26. return "idle"
  27. case EngineRunning:
  28. return "running"
  29. case EngineStopping:
  30. return "stopping"
  31. default:
  32. return "unknown"
  33. }
  34. }
  35. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  36. v := uint64(d)
  37. for {
  38. cur := dst.Load()
  39. if v <= cur {
  40. return
  41. }
  42. if dst.CompareAndSwap(cur, v) {
  43. return
  44. }
  45. }
  46. }
  47. func durationMs(ns uint64) float64 {
  48. return float64(ns) / float64(time.Millisecond)
  49. }
  50. type EngineStats struct {
  51. State string `json:"state"`
  52. ChunksProduced uint64 `json:"chunksProduced"`
  53. TotalSamples uint64 `json:"totalSamples"`
  54. Underruns uint64 `json:"underruns"`
  55. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  56. LastError string `json:"lastError,omitempty"`
  57. UptimeSeconds float64 `json:"uptimeSeconds"`
  58. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  59. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  60. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  61. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  62. Queue output.QueueStats `json:"queue"`
  63. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  64. }
  65. type RuntimeIndicator string
  66. const (
  67. RuntimeIndicatorNormal RuntimeIndicator = "normal"
  68. RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
  69. RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
  70. )
  71. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  72. // resamples to device rate, and pushes to hardware in a tight loop.
  73. // The hardware buffer_push call is blocking — it returns when the hardware
  74. // has consumed the previous buffer and is ready for the next one.
  75. // This naturally paces the loop to real-time without a ticker.
  76. type Engine struct {
  77. cfg cfgpkg.Config
  78. driver platform.SoapyDriver
  79. generator *offpkg.Generator
  80. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  81. chunkDuration time.Duration
  82. deviceRate float64
  83. frameQueue *output.FrameQueue
  84. mu sync.Mutex
  85. state EngineState
  86. cancel context.CancelFunc
  87. startedAt time.Time
  88. wg sync.WaitGroup
  89. chunksProduced atomic.Uint64
  90. totalSamples atomic.Uint64
  91. underruns atomic.Uint64
  92. lateBuffers atomic.Uint64
  93. maxCycleNs atomic.Uint64
  94. maxGenerateNs atomic.Uint64
  95. maxUpsampleNs atomic.Uint64
  96. maxWriteNs atomic.Uint64
  97. lastError atomic.Value // string
  98. // Live config: pending frequency change, applied between chunks
  99. pendingFreq atomic.Pointer[float64]
  100. // Live audio stream (optional)
  101. streamSrc *audio.StreamSource
  102. }
  103. // SetStreamSource configures a live audio stream as the audio source.
  104. // Must be called before Start(). The StreamResampler is created internally
  105. // to convert from the stream's sample rate to the DSP composite rate.
  106. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  107. e.streamSrc = src
  108. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  109. if compositeRate <= 0 {
  110. compositeRate = 228000
  111. }
  112. resampler := audio.NewStreamResampler(src, compositeRate)
  113. e.generator.SetExternalSource(resampler)
  114. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  115. src.SampleRate, compositeRate, src.Stats().Capacity)
  116. }
  117. // StreamSource returns the live audio stream source, or nil.
  118. // Used by the control server for stats and HTTP audio ingest.
  119. func (e *Engine) StreamSource() *audio.StreamSource {
  120. return e.streamSrc
  121. }
  122. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  123. deviceRate := cfg.EffectiveDeviceRate()
  124. compositeRate := float64(cfg.FM.CompositeRateHz)
  125. if compositeRate <= 0 {
  126. compositeRate = 228000
  127. }
  128. var upsampler *dsp.FMUpsampler
  129. if deviceRate > compositeRate*1.001 {
  130. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  131. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  132. // This halves CPU load compared to running all DSP at deviceRate.
  133. cfg.FM.FMModulationEnabled = false
  134. maxDev := cfg.FM.MaxDeviationHz
  135. if maxDev <= 0 {
  136. maxDev = 75000
  137. }
  138. // mpxGain scales the FM deviation to compensate for hardware
  139. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  140. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  141. maxDev *= cfg.FM.MpxGain
  142. }
  143. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  144. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  145. compositeRate, deviceRate, deviceRate/compositeRate)
  146. } else {
  147. // Same-rate: entire DSP chain runs at deviceRate.
  148. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  149. if deviceRate > 0 {
  150. cfg.FM.CompositeRateHz = int(deviceRate)
  151. }
  152. cfg.FM.FMModulationEnabled = true
  153. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  154. }
  155. return &Engine{
  156. cfg: cfg,
  157. driver: driver,
  158. generator: offpkg.NewGenerator(cfg),
  159. upsampler: upsampler,
  160. chunkDuration: 50 * time.Millisecond,
  161. deviceRate: deviceRate,
  162. state: EngineIdle,
  163. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  164. }
  165. }
  166. func (e *Engine) SetChunkDuration(d time.Duration) {
  167. e.chunkDuration = d
  168. }
  169. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  170. // nil pointers mean "no change". Validated before applying.
  171. type LiveConfigUpdate struct {
  172. FrequencyMHz *float64
  173. OutputDrive *float64
  174. StereoEnabled *bool
  175. PilotLevel *float64
  176. RDSInjection *float64
  177. RDSEnabled *bool
  178. LimiterEnabled *bool
  179. LimiterCeiling *float64
  180. PS *string
  181. RadioText *string
  182. }
  183. // UpdateConfig applies live parameter changes without restarting the engine.
  184. // DSP params take effect at the next chunk boundary (~50ms max).
  185. // Frequency changes are applied between chunks via driver.Tune().
  186. // RDS text updates are applied at the next RDS group boundary (~88ms).
  187. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  188. // --- Validate ---
  189. if u.FrequencyMHz != nil {
  190. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  191. return fmt.Errorf("frequencyMHz out of range (65-110)")
  192. }
  193. }
  194. if u.OutputDrive != nil {
  195. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  196. return fmt.Errorf("outputDrive out of range (0-10)")
  197. }
  198. }
  199. if u.PilotLevel != nil {
  200. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  201. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  202. }
  203. }
  204. if u.RDSInjection != nil {
  205. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  206. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  207. }
  208. }
  209. if u.LimiterCeiling != nil {
  210. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  211. return fmt.Errorf("limiterCeiling out of range (0-2)")
  212. }
  213. }
  214. // --- Frequency: store for run loop to apply via driver.Tune() ---
  215. if u.FrequencyMHz != nil {
  216. freqHz := *u.FrequencyMHz * 1e6
  217. e.pendingFreq.Store(&freqHz)
  218. }
  219. // --- RDS text: forward to encoder atomics ---
  220. if u.PS != nil || u.RadioText != nil {
  221. if enc := e.generator.RDSEncoder(); enc != nil {
  222. ps, rt := "", ""
  223. if u.PS != nil {
  224. ps = *u.PS
  225. }
  226. if u.RadioText != nil {
  227. rt = *u.RadioText
  228. }
  229. enc.UpdateText(ps, rt)
  230. }
  231. }
  232. // --- DSP params: build new LiveParams from current + patch ---
  233. // Read current, apply deltas, store new
  234. current := e.generator.CurrentLiveParams()
  235. next := current // copy
  236. if u.OutputDrive != nil {
  237. next.OutputDrive = *u.OutputDrive
  238. }
  239. if u.StereoEnabled != nil {
  240. next.StereoEnabled = *u.StereoEnabled
  241. }
  242. if u.PilotLevel != nil {
  243. next.PilotLevel = *u.PilotLevel
  244. }
  245. if u.RDSInjection != nil {
  246. next.RDSInjection = *u.RDSInjection
  247. }
  248. if u.RDSEnabled != nil {
  249. next.RDSEnabled = *u.RDSEnabled
  250. }
  251. if u.LimiterEnabled != nil {
  252. next.LimiterEnabled = *u.LimiterEnabled
  253. }
  254. if u.LimiterCeiling != nil {
  255. next.LimiterCeiling = *u.LimiterCeiling
  256. }
  257. e.generator.UpdateLive(next)
  258. return nil
  259. }
  260. func (e *Engine) Start(ctx context.Context) error {
  261. e.mu.Lock()
  262. if e.state != EngineIdle {
  263. e.mu.Unlock()
  264. return fmt.Errorf("engine already in state %s", e.state)
  265. }
  266. if err := e.driver.Start(ctx); err != nil {
  267. e.mu.Unlock()
  268. return fmt.Errorf("driver start: %w", err)
  269. }
  270. runCtx, cancel := context.WithCancel(ctx)
  271. e.cancel = cancel
  272. e.state = EngineRunning
  273. e.startedAt = time.Now()
  274. e.wg.Add(1)
  275. e.mu.Unlock()
  276. go e.run(runCtx)
  277. return nil
  278. }
  279. func (e *Engine) Stop(ctx context.Context) error {
  280. e.mu.Lock()
  281. if e.state != EngineRunning {
  282. e.mu.Unlock()
  283. return nil
  284. }
  285. e.state = EngineStopping
  286. e.cancel()
  287. e.mu.Unlock()
  288. // Wait for run() goroutine to exit — deterministic, no guessing
  289. e.wg.Wait()
  290. if err := e.driver.Flush(ctx); err != nil {
  291. return err
  292. }
  293. if err := e.driver.Stop(ctx); err != nil {
  294. return err
  295. }
  296. e.mu.Lock()
  297. e.state = EngineIdle
  298. e.mu.Unlock()
  299. return nil
  300. }
  301. func (e *Engine) Stats() EngineStats {
  302. e.mu.Lock()
  303. state := e.state
  304. startedAt := e.startedAt
  305. e.mu.Unlock()
  306. var uptime float64
  307. if state == EngineRunning {
  308. uptime = time.Since(startedAt).Seconds()
  309. }
  310. errVal, _ := e.lastError.Load().(string)
  311. queue := e.frameQueue.Stats()
  312. lateBuffers := e.lateBuffers.Load()
  313. return EngineStats{
  314. State: state.String(),
  315. ChunksProduced: e.chunksProduced.Load(),
  316. TotalSamples: e.totalSamples.Load(),
  317. Underruns: e.underruns.Load(),
  318. LateBuffers: lateBuffers,
  319. LastError: errVal,
  320. UptimeSeconds: uptime,
  321. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  322. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  323. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  324. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  325. Queue: queue,
  326. RuntimeIndicator: runtimeIndicator(queue.Health, lateBuffers),
  327. }
  328. }
  329. func runtimeIndicator(queueHealth output.QueueHealth, lateBuffers uint64) RuntimeIndicator {
  330. switch {
  331. case queueHealth == output.QueueHealthCritical:
  332. return RuntimeIndicatorQueueCritical
  333. case queueHealth == output.QueueHealthLow || lateBuffers > 0:
  334. return RuntimeIndicatorDegraded
  335. default:
  336. return RuntimeIndicatorNormal
  337. }
  338. }
  339. func (e *Engine) run(ctx context.Context) {
  340. e.wg.Add(1)
  341. go e.writerLoop(ctx)
  342. defer e.wg.Done()
  343. for {
  344. if ctx.Err() != nil {
  345. return
  346. }
  347. // Apply pending frequency change between chunks
  348. if pf := e.pendingFreq.Swap(nil); pf != nil {
  349. if err := e.driver.Tune(ctx, *pf); err != nil {
  350. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  351. } else {
  352. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  353. }
  354. }
  355. t0 := time.Now()
  356. frame := e.generator.GenerateFrame(e.chunkDuration)
  357. frame.GeneratedAt = t0
  358. t1 := time.Now()
  359. if e.upsampler != nil {
  360. frame = e.upsampler.Process(frame)
  361. frame.GeneratedAt = t0
  362. }
  363. t2 := time.Now()
  364. genDur := t1.Sub(t0)
  365. upDur := t2.Sub(t1)
  366. updateMaxDuration(&e.maxGenerateNs, genDur)
  367. updateMaxDuration(&e.maxUpsampleNs, upDur)
  368. enqueued := cloneFrame(frame)
  369. if enqueued == nil {
  370. e.lastError.Store("engine: frame clone failed")
  371. e.underruns.Add(1)
  372. continue
  373. }
  374. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  375. if ctx.Err() != nil {
  376. return
  377. }
  378. if errors.Is(err, output.ErrFrameQueueClosed) {
  379. return
  380. }
  381. e.lastError.Store(err.Error())
  382. e.underruns.Add(1)
  383. select {
  384. case <-time.After(e.chunkDuration):
  385. case <-ctx.Done():
  386. return
  387. }
  388. continue
  389. }
  390. }
  391. }
  392. func (e *Engine) writerLoop(ctx context.Context) {
  393. defer e.wg.Done()
  394. for {
  395. frame, err := e.frameQueue.Pop(ctx)
  396. if err != nil {
  397. if ctx.Err() != nil {
  398. return
  399. }
  400. if errors.Is(err, output.ErrFrameQueueClosed) {
  401. return
  402. }
  403. e.lastError.Store(err.Error())
  404. e.underruns.Add(1)
  405. continue
  406. }
  407. writeStart := time.Now()
  408. n, err := e.driver.Write(ctx, frame)
  409. writeDur := time.Since(writeStart)
  410. cycleDur := writeDur
  411. if !frame.GeneratedAt.IsZero() {
  412. cycleDur = time.Since(frame.GeneratedAt)
  413. }
  414. updateMaxDuration(&e.maxWriteNs, writeDur)
  415. updateMaxDuration(&e.maxCycleNs, cycleDur)
  416. if cycleDur > e.chunkDuration {
  417. late := e.lateBuffers.Add(1)
  418. if late <= 5 || late%20 == 0 {
  419. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  420. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  421. }
  422. }
  423. if err != nil {
  424. if ctx.Err() != nil {
  425. return
  426. }
  427. e.lastError.Store(err.Error())
  428. e.underruns.Add(1)
  429. select {
  430. case <-time.After(e.chunkDuration):
  431. case <-ctx.Done():
  432. return
  433. }
  434. continue
  435. }
  436. e.chunksProduced.Add(1)
  437. e.totalSamples.Add(uint64(n))
  438. }
  439. }
  440. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  441. if src == nil {
  442. return nil
  443. }
  444. samples := make([]output.IQSample, len(src.Samples))
  445. copy(samples, src.Samples)
  446. return &output.CompositeFrame{
  447. Samples: samples,
  448. SampleRateHz: src.SampleRateHz,
  449. Timestamp: src.Timestamp,
  450. GeneratedAt: src.GeneratedAt,
  451. Sequence: src.Sequence,
  452. }
  453. }