Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

533 lines
14KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  // EngineState identifies the lifecycle phase an Engine is currently in.
  type EngineState int

  // Lifecycle phases. The numeric values are assigned in declaration order,
  // starting at zero.
  const (
  	// EngineIdle means no TX goroutines are running.
  	EngineIdle EngineState = iota
  	// EngineRunning means the engine has been started and not yet stopped.
  	EngineRunning
  	// EngineStopping means a stop has been requested and is still in progress.
  	EngineStopping
  )

  // String returns the lowercase name of the state, or "unknown" for any value
  // outside the declared set.
  func (s EngineState) String() string {
  	names := [...]string{
  		EngineIdle:     "idle",
  		EngineRunning:  "running",
  		EngineStopping: "stopping",
  	}
  	if s < 0 || int(s) >= len(names) {
  		return "unknown"
  	}
  	return names[s]
  }
  // updateMaxDuration raises the value stored in dst to d, expressed in
  // nanoseconds, when d is larger than what is currently stored. It is safe for
  // concurrent callers: the store is a compare-and-swap retried until either the
  // swap succeeds or another caller has already recorded a larger value.
  //
  // Non-positive durations are ignored. Converting a negative Duration to
  // uint64 would wrap to a huge value and permanently corrupt the maximum.
  func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  	if d <= 0 {
  		return
  	}
  	v := uint64(d)
  	for {
  		cur := dst.Load()
  		if v <= cur || dst.CompareAndSwap(cur, v) {
  			return
  		}
  	}
  }
  // durationMs converts a nanosecond count into fractional milliseconds.
  func durationMs(ns uint64) float64 {
  	const nanosPerMilli = float64(time.Millisecond)
  	return float64(ns) / nanosPerMilli
  }
  50. type EngineStats struct {
  51. State string `json:"state"`
  52. ChunksProduced uint64 `json:"chunksProduced"`
  53. TotalSamples uint64 `json:"totalSamples"`
  54. Underruns uint64 `json:"underruns"`
  55. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  56. LastError string `json:"lastError,omitempty"`
  57. UptimeSeconds float64 `json:"uptimeSeconds"`
  58. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  59. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  60. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  61. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  62. Queue output.QueueStats `json:"queue"`
  63. RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
  64. RuntimeAlert string `json:"runtimeAlert,omitempty"`
  65. }
  // RuntimeIndicator is a coarse health label reported in EngineStats.
  type RuntimeIndicator string

  // Health labels, from healthiest to most severe.
  const (
  	// RuntimeIndicatorNormal: no queue problem and no recent late buffers.
  	RuntimeIndicatorNormal RuntimeIndicator = "normal"
  	// RuntimeIndicatorDegraded: queue health is low or buffers were late recently.
  	RuntimeIndicatorDegraded RuntimeIndicator = "degraded"
  	// RuntimeIndicatorQueueCritical: queue health is critical.
  	RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical"
  )
  // lateBufferIndicatorWindow is how long after the most recent late buffer the
  // engine keeps reporting it in its runtime indicator and alert.
  const lateBufferIndicatorWindow time.Duration = 5 * time.Second
  73. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  74. // resamples to device rate, and pushes to hardware in a tight loop.
  75. // The hardware buffer_push call is blocking — it returns when the hardware
  76. // has consumed the previous buffer and is ready for the next one.
  77. // This naturally paces the loop to real-time without a ticker.
  78. type Engine struct {
  79. cfg cfgpkg.Config
  80. driver platform.SoapyDriver
  81. generator *offpkg.Generator
  82. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  83. chunkDuration time.Duration
  84. deviceRate float64
  85. frameQueue *output.FrameQueue
  86. mu sync.Mutex
  87. state EngineState
  88. cancel context.CancelFunc
  89. startedAt time.Time
  90. wg sync.WaitGroup
  91. chunksProduced atomic.Uint64
  92. totalSamples atomic.Uint64
  93. underruns atomic.Uint64
  94. lateBuffers atomic.Uint64
  95. lateBufferAlertAt atomic.Uint64
  96. maxCycleNs atomic.Uint64
  97. maxGenerateNs atomic.Uint64
  98. maxUpsampleNs atomic.Uint64
  99. maxWriteNs atomic.Uint64
  100. lastError atomic.Value // string
  101. // Live config: pending frequency change, applied between chunks
  102. pendingFreq atomic.Pointer[float64]
  103. // Live audio stream (optional)
  104. streamSrc *audio.StreamSource
  105. }
  106. // SetStreamSource configures a live audio stream as the audio source.
  107. // Must be called before Start(). The StreamResampler is created internally
  108. // to convert from the stream's sample rate to the DSP composite rate.
  109. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  110. e.streamSrc = src
  111. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  112. if compositeRate <= 0 {
  113. compositeRate = 228000
  114. }
  115. resampler := audio.NewStreamResampler(src, compositeRate)
  116. e.generator.SetExternalSource(resampler)
  117. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  118. src.SampleRate, compositeRate, src.Stats().Capacity)
  119. }
  120. // StreamSource returns the live audio stream source, or nil.
  121. // Used by the control server for stats and HTTP audio ingest.
  122. func (e *Engine) StreamSource() *audio.StreamSource {
  123. return e.streamSrc
  124. }
  125. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  126. deviceRate := cfg.EffectiveDeviceRate()
  127. compositeRate := float64(cfg.FM.CompositeRateHz)
  128. if compositeRate <= 0 {
  129. compositeRate = 228000
  130. }
  131. var upsampler *dsp.FMUpsampler
  132. if deviceRate > compositeRate*1.001 {
  133. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  134. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  135. // This halves CPU load compared to running all DSP at deviceRate.
  136. cfg.FM.FMModulationEnabled = false
  137. maxDev := cfg.FM.MaxDeviationHz
  138. if maxDev <= 0 {
  139. maxDev = 75000
  140. }
  141. // mpxGain scales the FM deviation to compensate for hardware
  142. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  143. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  144. maxDev *= cfg.FM.MpxGain
  145. }
  146. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  147. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  148. compositeRate, deviceRate, deviceRate/compositeRate)
  149. } else {
  150. // Same-rate: entire DSP chain runs at deviceRate.
  151. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  152. if deviceRate > 0 {
  153. cfg.FM.CompositeRateHz = int(deviceRate)
  154. }
  155. cfg.FM.FMModulationEnabled = true
  156. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  157. }
  158. return &Engine{
  159. cfg: cfg,
  160. driver: driver,
  161. generator: offpkg.NewGenerator(cfg),
  162. upsampler: upsampler,
  163. chunkDuration: 50 * time.Millisecond,
  164. deviceRate: deviceRate,
  165. state: EngineIdle,
  166. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  167. }
  168. }
  169. func (e *Engine) SetChunkDuration(d time.Duration) {
  170. e.chunkDuration = d
  171. }
  // LiveConfigUpdate is the set of parameters that can be changed while the
  // engine is running. Every field is optional: a nil pointer leaves the
  // corresponding setting untouched. Numeric ranges are checked by
  // Engine.UpdateConfig before anything is applied.
  type LiveConfigUpdate struct {
  	// Carrier frequency in MHz.
  	FrequencyMHz *float64

  	// DSP parameters.
  	OutputDrive    *float64
  	StereoEnabled  *bool
  	PilotLevel     *float64
  	RDSInjection   *float64
  	RDSEnabled     *bool
  	LimiterEnabled *bool
  	LimiterCeiling *float64

  	// RDS text fields.
  	PS        *string
  	RadioText *string
  }
  186. // UpdateConfig applies live parameter changes without restarting the engine.
  187. // DSP params take effect at the next chunk boundary (~50ms max).
  188. // Frequency changes are applied between chunks via driver.Tune().
  189. // RDS text updates are applied at the next RDS group boundary (~88ms).
  190. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  191. // --- Validate ---
  192. if u.FrequencyMHz != nil {
  193. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  194. return fmt.Errorf("frequencyMHz out of range (65-110)")
  195. }
  196. }
  197. if u.OutputDrive != nil {
  198. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  199. return fmt.Errorf("outputDrive out of range (0-10)")
  200. }
  201. }
  202. if u.PilotLevel != nil {
  203. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  204. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  205. }
  206. }
  207. if u.RDSInjection != nil {
  208. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  209. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  210. }
  211. }
  212. if u.LimiterCeiling != nil {
  213. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  214. return fmt.Errorf("limiterCeiling out of range (0-2)")
  215. }
  216. }
  217. // --- Frequency: store for run loop to apply via driver.Tune() ---
  218. if u.FrequencyMHz != nil {
  219. freqHz := *u.FrequencyMHz * 1e6
  220. e.pendingFreq.Store(&freqHz)
  221. }
  222. // --- RDS text: forward to encoder atomics ---
  223. if u.PS != nil || u.RadioText != nil {
  224. if enc := e.generator.RDSEncoder(); enc != nil {
  225. ps, rt := "", ""
  226. if u.PS != nil {
  227. ps = *u.PS
  228. }
  229. if u.RadioText != nil {
  230. rt = *u.RadioText
  231. }
  232. enc.UpdateText(ps, rt)
  233. }
  234. }
  235. // --- DSP params: build new LiveParams from current + patch ---
  236. // Read current, apply deltas, store new
  237. current := e.generator.CurrentLiveParams()
  238. next := current // copy
  239. if u.OutputDrive != nil {
  240. next.OutputDrive = *u.OutputDrive
  241. }
  242. if u.StereoEnabled != nil {
  243. next.StereoEnabled = *u.StereoEnabled
  244. }
  245. if u.PilotLevel != nil {
  246. next.PilotLevel = *u.PilotLevel
  247. }
  248. if u.RDSInjection != nil {
  249. next.RDSInjection = *u.RDSInjection
  250. }
  251. if u.RDSEnabled != nil {
  252. next.RDSEnabled = *u.RDSEnabled
  253. }
  254. if u.LimiterEnabled != nil {
  255. next.LimiterEnabled = *u.LimiterEnabled
  256. }
  257. if u.LimiterCeiling != nil {
  258. next.LimiterCeiling = *u.LimiterCeiling
  259. }
  260. e.generator.UpdateLive(next)
  261. return nil
  262. }
  263. func (e *Engine) Start(ctx context.Context) error {
  264. e.mu.Lock()
  265. if e.state != EngineIdle {
  266. e.mu.Unlock()
  267. return fmt.Errorf("engine already in state %s", e.state)
  268. }
  269. if err := e.driver.Start(ctx); err != nil {
  270. e.mu.Unlock()
  271. return fmt.Errorf("driver start: %w", err)
  272. }
  273. runCtx, cancel := context.WithCancel(ctx)
  274. e.cancel = cancel
  275. e.state = EngineRunning
  276. e.startedAt = time.Now()
  277. e.wg.Add(1)
  278. e.mu.Unlock()
  279. go e.run(runCtx)
  280. return nil
  281. }
  282. func (e *Engine) Stop(ctx context.Context) error {
  283. e.mu.Lock()
  284. if e.state != EngineRunning {
  285. e.mu.Unlock()
  286. return nil
  287. }
  288. e.state = EngineStopping
  289. e.cancel()
  290. e.mu.Unlock()
  291. // Wait for run() goroutine to exit — deterministic, no guessing
  292. e.wg.Wait()
  293. if err := e.driver.Flush(ctx); err != nil {
  294. return err
  295. }
  296. if err := e.driver.Stop(ctx); err != nil {
  297. return err
  298. }
  299. e.mu.Lock()
  300. e.state = EngineIdle
  301. e.mu.Unlock()
  302. return nil
  303. }
  304. func (e *Engine) Stats() EngineStats {
  305. e.mu.Lock()
  306. state := e.state
  307. startedAt := e.startedAt
  308. e.mu.Unlock()
  309. var uptime float64
  310. if state == EngineRunning {
  311. uptime = time.Since(startedAt).Seconds()
  312. }
  313. errVal, _ := e.lastError.Load().(string)
  314. queue := e.frameQueue.Stats()
  315. lateBuffers := e.lateBuffers.Load()
  316. now := time.Now()
  317. lateAlertAt := e.lateBufferAlertAt.Load()
  318. hasRecentLateBuffers := lateAlertAt > 0 && now.Sub(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
  319. ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
  320. return EngineStats{
  321. State: state.String(),
  322. ChunksProduced: e.chunksProduced.Load(),
  323. TotalSamples: e.totalSamples.Load(),
  324. Underruns: e.underruns.Load(),
  325. LateBuffers: lateBuffers,
  326. LastError: errVal,
  327. UptimeSeconds: uptime,
  328. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  329. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  330. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  331. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  332. Queue: queue,
  333. RuntimeIndicator: ri,
  334. RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
  335. }
  336. }
  337. func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
  338. switch {
  339. case queueHealth == output.QueueHealthCritical:
  340. return RuntimeIndicatorQueueCritical
  341. case queueHealth == output.QueueHealthLow || recentLateBuffers:
  342. return RuntimeIndicatorDegraded
  343. default:
  344. return RuntimeIndicatorNormal
  345. }
  346. }
  347. func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
  348. switch {
  349. case queueHealth == output.QueueHealthCritical:
  350. return "queue health critical"
  351. case recentLateBuffers:
  352. return "late buffers"
  353. case queueHealth == output.QueueHealthLow:
  354. return "queue health low"
  355. default:
  356. return ""
  357. }
  358. }
  359. func (e *Engine) run(ctx context.Context) {
  360. e.wg.Add(1)
  361. go e.writerLoop(ctx)
  362. defer e.wg.Done()
  363. for {
  364. if ctx.Err() != nil {
  365. return
  366. }
  367. // Apply pending frequency change between chunks
  368. if pf := e.pendingFreq.Swap(nil); pf != nil {
  369. if err := e.driver.Tune(ctx, *pf); err != nil {
  370. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  371. } else {
  372. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  373. }
  374. }
  375. t0 := time.Now()
  376. frame := e.generator.GenerateFrame(e.chunkDuration)
  377. frame.GeneratedAt = t0
  378. t1 := time.Now()
  379. if e.upsampler != nil {
  380. frame = e.upsampler.Process(frame)
  381. frame.GeneratedAt = t0
  382. }
  383. t2 := time.Now()
  384. genDur := t1.Sub(t0)
  385. upDur := t2.Sub(t1)
  386. updateMaxDuration(&e.maxGenerateNs, genDur)
  387. updateMaxDuration(&e.maxUpsampleNs, upDur)
  388. enqueued := cloneFrame(frame)
  389. if enqueued == nil {
  390. e.lastError.Store("engine: frame clone failed")
  391. e.underruns.Add(1)
  392. continue
  393. }
  394. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  395. if ctx.Err() != nil {
  396. return
  397. }
  398. if errors.Is(err, output.ErrFrameQueueClosed) {
  399. return
  400. }
  401. e.lastError.Store(err.Error())
  402. e.underruns.Add(1)
  403. select {
  404. case <-time.After(e.chunkDuration):
  405. case <-ctx.Done():
  406. return
  407. }
  408. continue
  409. }
  410. }
  411. }
  412. func (e *Engine) writerLoop(ctx context.Context) {
  413. defer e.wg.Done()
  414. for {
  415. frame, err := e.frameQueue.Pop(ctx)
  416. if err != nil {
  417. if ctx.Err() != nil {
  418. return
  419. }
  420. if errors.Is(err, output.ErrFrameQueueClosed) {
  421. return
  422. }
  423. e.lastError.Store(err.Error())
  424. e.underruns.Add(1)
  425. continue
  426. }
  427. writeStart := time.Now()
  428. n, err := e.driver.Write(ctx, frame)
  429. writeDur := time.Since(writeStart)
  430. cycleDur := writeDur
  431. if !frame.GeneratedAt.IsZero() {
  432. cycleDur = time.Since(frame.GeneratedAt)
  433. }
  434. updateMaxDuration(&e.maxWriteNs, writeDur)
  435. updateMaxDuration(&e.maxCycleNs, cycleDur)
  436. if cycleDur > e.chunkDuration {
  437. late := e.lateBuffers.Add(1)
  438. e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
  439. if late <= 5 || late%20 == 0 {
  440. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  441. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  442. }
  443. }
  444. if err != nil {
  445. if ctx.Err() != nil {
  446. return
  447. }
  448. e.lastError.Store(err.Error())
  449. e.underruns.Add(1)
  450. select {
  451. case <-time.After(e.chunkDuration):
  452. case <-ctx.Done():
  453. return
  454. }
  455. continue
  456. }
  457. e.chunksProduced.Add(1)
  458. e.totalSamples.Add(uint64(n))
  459. }
  460. }
  461. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  462. if src == nil {
  463. return nil
  464. }
  465. samples := make([]output.IQSample, len(src.Samples))
  466. copy(samples, src.Samples)
  467. return &output.CompositeFrame{
  468. Samples: samples,
  469. SampleRateHz: src.SampleRateHz,
  470. Timestamp: src.Timestamp,
  471. GeneratedAt: src.GeneratedAt,
  472. Sequence: src.Sequence,
  473. }
  474. }