Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

309 lines
8.0KB

  1. package app
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  10. "github.com/jan/fm-rds-tx/internal/dsp"
  11. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  12. "github.com/jan/fm-rds-tx/internal/platform"
  13. )
  14. type EngineState int
  15. const (
  16. EngineIdle EngineState = iota
  17. EngineRunning
  18. EngineStopping
  19. )
  20. func (s EngineState) String() string {
  21. switch s {
  22. case EngineIdle:
  23. return "idle"
  24. case EngineRunning:
  25. return "running"
  26. case EngineStopping:
  27. return "stopping"
  28. default:
  29. return "unknown"
  30. }
  31. }
// EngineStats is a point-in-time snapshot of an Engine's state and counters,
// as produced by Engine.Stats. The JSON tags define its serialized form.
type EngineStats struct {
	// State is the engine state rendered via EngineState.String.
	State string `json:"state"`
	// ChunksProduced counts chunks the driver accepted without error.
	ChunksProduced uint64 `json:"chunksProduced"`
	// TotalSamples is the sum of the sample counts returned by successful
	// driver writes.
	TotalSamples uint64 `json:"totalSamples"`
	// Underruns counts driver writes that returned an error.
	Underruns uint64 `json:"underruns"`
	// LastError is the most recently recorded write or tune error message;
	// it is omitted from JSON when empty.
	LastError string `json:"lastError,omitempty"`
	// UptimeSeconds is the time since the engine was started, reported only
	// while the engine is running and 0 otherwise.
	UptimeSeconds float64 `json:"uptimeSeconds"`
}
// Engine is the continuous TX loop. It generates composite IQ in chunks,
// resamples to device rate, and pushes to hardware in a tight loop.
// The hardware buffer_push call is blocking — it returns when the hardware
// has consumed the previous buffer and is ready for the next one.
// This naturally paces the loop to real-time without a ticker.
type Engine struct {
	// cfg is the configuration the engine was built with, including the
	// mode-specific adjustments made by NewEngine.
	cfg cfgpkg.Config
	// driver is the hardware backend the loop tunes and writes frames to.
	driver platform.SoapyDriver
	// generator produces one frame per chunk and holds the live DSP params.
	generator *offpkg.Generator
	upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
	// chunkDuration is the length of each generated chunk and also the
	// back-off delay after a failed write.
	chunkDuration time.Duration
	// deviceRate is cfg.EffectiveDeviceRate() captured at construction.
	deviceRate float64

	// mu guards state, cancel and startedAt.
	mu    sync.Mutex
	state EngineState
	// cancel stops the context handed to the run goroutine.
	cancel    context.CancelFunc
	startedAt time.Time
	// wg tracks the run goroutine so Stop can wait for it to exit.
	wg sync.WaitGroup

	// Counters updated by the run loop and read by Stats without locking.
	chunksProduced atomic.Uint64
	totalSamples   atomic.Uint64
	underruns      atomic.Uint64
	lastError      atomic.Value // string

	// Live config: pending frequency change, applied between chunks
	pendingFreq atomic.Pointer[float64]
}
  64. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  65. deviceRate := cfg.EffectiveDeviceRate()
  66. compositeRate := float64(cfg.FM.CompositeRateHz)
  67. if compositeRate <= 0 {
  68. compositeRate = 228000
  69. }
  70. var upsampler *dsp.FMUpsampler
  71. if deviceRate > compositeRate*1.001 {
  72. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  73. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  74. // This halves CPU load compared to running all DSP at deviceRate.
  75. cfg.FM.FMModulationEnabled = false
  76. maxDev := cfg.FM.MaxDeviationHz
  77. if maxDev <= 0 {
  78. maxDev = 75000
  79. }
  80. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  81. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  82. compositeRate, deviceRate, deviceRate/compositeRate)
  83. } else {
  84. // Same-rate: entire DSP chain runs at deviceRate.
  85. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  86. if deviceRate > 0 {
  87. cfg.FM.CompositeRateHz = int(deviceRate)
  88. }
  89. cfg.FM.FMModulationEnabled = true
  90. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  91. }
  92. return &Engine{
  93. cfg: cfg,
  94. driver: driver,
  95. generator: offpkg.NewGenerator(cfg),
  96. upsampler: upsampler,
  97. chunkDuration: 50 * time.Millisecond,
  98. deviceRate: deviceRate,
  99. state: EngineIdle,
  100. }
  101. }
// SetChunkDuration overrides the length of each generated chunk (50 ms by
// default). The same value is used as the back-off delay after a failed
// driver write.
//
// The field is written without synchronization and the run loop reads it
// without holding a lock, so call this before Start.
// NOTE(review): d is not validated; confirm callers never pass a
// non-positive duration.
func (e *Engine) SetChunkDuration(d time.Duration) {
	e.chunkDuration = d
}
// LiveConfigUpdate carries hot-reloadable parameters from the control API.
// nil pointers mean "no change". Validated before applying.
type LiveConfigUpdate struct {
	// FrequencyMHz is the new carrier frequency in MHz; accepted range 65-110.
	FrequencyMHz *float64
	// OutputDrive is the new output drive; accepted range 0-3.
	OutputDrive *float64
	// StereoEnabled switches the stereo flag in the live DSP parameters.
	StereoEnabled *bool
	// PilotLevel is the new pilot level; accepted range 0-0.2.
	PilotLevel *float64
	// RDSInjection is the new RDS injection level; accepted range 0-0.15.
	RDSInjection *float64
	// RDSEnabled switches the RDS flag in the live DSP parameters.
	RDSEnabled *bool
	// LimiterEnabled switches the limiter flag in the live DSP parameters.
	LimiterEnabled *bool
	// LimiterCeiling is the new limiter ceiling; accepted range 0-2.
	LimiterCeiling *float64
	// PS is new RDS text forwarded to the RDS encoder as its first text
	// argument (presumably the Programme Service name).
	PS *string
	// RadioText is new RDS text forwarded to the RDS encoder as its second
	// text argument.
	RadioText *string
}
  119. // UpdateConfig applies live parameter changes without restarting the engine.
  120. // DSP params take effect at the next chunk boundary (~50ms max).
  121. // Frequency changes are applied between chunks via driver.Tune().
  122. // RDS text updates are applied at the next RDS group boundary (~88ms).
  123. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  124. // --- Validate ---
  125. if u.FrequencyMHz != nil {
  126. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  127. return fmt.Errorf("frequencyMHz out of range (65-110)")
  128. }
  129. }
  130. if u.OutputDrive != nil {
  131. if *u.OutputDrive < 0 || *u.OutputDrive > 3 {
  132. return fmt.Errorf("outputDrive out of range (0-3)")
  133. }
  134. }
  135. if u.PilotLevel != nil {
  136. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  137. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  138. }
  139. }
  140. if u.RDSInjection != nil {
  141. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  142. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  143. }
  144. }
  145. if u.LimiterCeiling != nil {
  146. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  147. return fmt.Errorf("limiterCeiling out of range (0-2)")
  148. }
  149. }
  150. // --- Frequency: store for run loop to apply via driver.Tune() ---
  151. if u.FrequencyMHz != nil {
  152. freqHz := *u.FrequencyMHz * 1e6
  153. e.pendingFreq.Store(&freqHz)
  154. }
  155. // --- RDS text: forward to encoder atomics ---
  156. if u.PS != nil || u.RadioText != nil {
  157. if enc := e.generator.RDSEncoder(); enc != nil {
  158. ps, rt := "", ""
  159. if u.PS != nil { ps = *u.PS }
  160. if u.RadioText != nil { rt = *u.RadioText }
  161. enc.UpdateText(ps, rt)
  162. }
  163. }
  164. // --- DSP params: build new LiveParams from current + patch ---
  165. // Read current, apply deltas, store new
  166. current := e.generator.CurrentLiveParams()
  167. next := current // copy
  168. if u.OutputDrive != nil { next.OutputDrive = *u.OutputDrive }
  169. if u.StereoEnabled != nil { next.StereoEnabled = *u.StereoEnabled }
  170. if u.PilotLevel != nil { next.PilotLevel = *u.PilotLevel }
  171. if u.RDSInjection != nil { next.RDSInjection = *u.RDSInjection }
  172. if u.RDSEnabled != nil { next.RDSEnabled = *u.RDSEnabled }
  173. if u.LimiterEnabled != nil { next.LimiterEnabled = *u.LimiterEnabled }
  174. if u.LimiterCeiling != nil { next.LimiterCeiling = *u.LimiterCeiling }
  175. e.generator.UpdateLive(next)
  176. return nil
  177. }
  178. func (e *Engine) Start(ctx context.Context) error {
  179. e.mu.Lock()
  180. if e.state != EngineIdle {
  181. e.mu.Unlock()
  182. return fmt.Errorf("engine already in state %s", e.state)
  183. }
  184. if err := e.driver.Start(ctx); err != nil {
  185. e.mu.Unlock()
  186. return fmt.Errorf("driver start: %w", err)
  187. }
  188. runCtx, cancel := context.WithCancel(ctx)
  189. e.cancel = cancel
  190. e.state = EngineRunning
  191. e.startedAt = time.Now()
  192. e.wg.Add(1)
  193. e.mu.Unlock()
  194. go e.run(runCtx)
  195. return nil
  196. }
  197. func (e *Engine) Stop(ctx context.Context) error {
  198. e.mu.Lock()
  199. if e.state != EngineRunning {
  200. e.mu.Unlock()
  201. return nil
  202. }
  203. e.state = EngineStopping
  204. e.cancel()
  205. e.mu.Unlock()
  206. // Wait for run() goroutine to exit — deterministic, no guessing
  207. e.wg.Wait()
  208. if err := e.driver.Flush(ctx); err != nil {
  209. return err
  210. }
  211. if err := e.driver.Stop(ctx); err != nil {
  212. return err
  213. }
  214. e.mu.Lock()
  215. e.state = EngineIdle
  216. e.mu.Unlock()
  217. return nil
  218. }
  219. func (e *Engine) Stats() EngineStats {
  220. e.mu.Lock()
  221. state := e.state
  222. startedAt := e.startedAt
  223. e.mu.Unlock()
  224. var uptime float64
  225. if state == EngineRunning {
  226. uptime = time.Since(startedAt).Seconds()
  227. }
  228. errVal, _ := e.lastError.Load().(string)
  229. return EngineStats{
  230. State: state.String(),
  231. ChunksProduced: e.chunksProduced.Load(),
  232. TotalSamples: e.totalSamples.Load(),
  233. Underruns: e.underruns.Load(),
  234. LastError: errVal,
  235. UptimeSeconds: uptime,
  236. }
  237. }
  238. func (e *Engine) run(ctx context.Context) {
  239. defer e.wg.Done()
  240. for {
  241. if ctx.Err() != nil {
  242. return
  243. }
  244. // Apply pending frequency change between chunks
  245. if pf := e.pendingFreq.Swap(nil); pf != nil {
  246. if err := e.driver.Tune(ctx, *pf); err != nil {
  247. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  248. } else {
  249. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  250. }
  251. }
  252. frame := e.generator.GenerateFrame(e.chunkDuration)
  253. if e.upsampler != nil {
  254. frame = e.upsampler.Process(frame)
  255. }
  256. n, err := e.driver.Write(ctx, frame)
  257. if err != nil {
  258. if ctx.Err() != nil { return }
  259. e.lastError.Store(err.Error())
  260. e.underruns.Add(1)
  261. // Back off to avoid pegging CPU on persistent errors
  262. select {
  263. case <-time.After(e.chunkDuration):
  264. case <-ctx.Done():
  265. return
  266. }
  267. continue
  268. }
  269. e.chunksProduced.Add(1)
  270. e.totalSamples.Add(uint64(n))
  271. }
  272. }