Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

339 строки
9.1KB

  1. package app
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/jan/fm-rds-tx/internal/audio"
  10. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  11. "github.com/jan/fm-rds-tx/internal/dsp"
  12. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  13. "github.com/jan/fm-rds-tx/internal/platform"
  14. )
  15. type EngineState int
  16. const (
  17. EngineIdle EngineState = iota
  18. EngineRunning
  19. EngineStopping
  20. )
  21. func (s EngineState) String() string {
  22. switch s {
  23. case EngineIdle:
  24. return "idle"
  25. case EngineRunning:
  26. return "running"
  27. case EngineStopping:
  28. return "stopping"
  29. default:
  30. return "unknown"
  31. }
  32. }
  33. type EngineStats struct {
  34. State string `json:"state"`
  35. ChunksProduced uint64 `json:"chunksProduced"`
  36. TotalSamples uint64 `json:"totalSamples"`
  37. Underruns uint64 `json:"underruns"`
  38. LastError string `json:"lastError,omitempty"`
  39. UptimeSeconds float64 `json:"uptimeSeconds"`
  40. }
  41. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  42. // resamples to device rate, and pushes to hardware in a tight loop.
  43. // The hardware buffer_push call is blocking — it returns when the hardware
  44. // has consumed the previous buffer and is ready for the next one.
  45. // This naturally paces the loop to real-time without a ticker.
  46. type Engine struct {
  47. cfg cfgpkg.Config
  48. driver platform.SoapyDriver
  49. generator *offpkg.Generator
  50. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  51. chunkDuration time.Duration
  52. deviceRate float64
  53. mu sync.Mutex
  54. state EngineState
  55. cancel context.CancelFunc
  56. startedAt time.Time
  57. wg sync.WaitGroup
  58. chunksProduced atomic.Uint64
  59. totalSamples atomic.Uint64
  60. underruns atomic.Uint64
  61. lastError atomic.Value // string
  62. // Live config: pending frequency change, applied between chunks
  63. pendingFreq atomic.Pointer[float64]
  64. // Live audio stream (optional)
  65. streamSrc *audio.StreamSource
  66. }
  67. // SetStreamSource configures a live audio stream as the audio source.
  68. // Must be called before Start(). The StreamResampler is created internally
  69. // to convert from the stream's sample rate to the DSP composite rate.
  70. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  71. e.streamSrc = src
  72. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  73. if compositeRate <= 0 {
  74. compositeRate = 228000
  75. }
  76. resampler := audio.NewStreamResampler(src, compositeRate)
  77. e.generator.SetExternalSource(resampler)
  78. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  79. src.SampleRate, compositeRate, src.Stats().Capacity)
  80. }
  81. // StreamSource returns the live audio stream source, or nil.
  82. // Used by the control server for stats and HTTP audio ingest.
  83. func (e *Engine) StreamSource() *audio.StreamSource {
  84. return e.streamSrc
  85. }
  86. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  87. deviceRate := cfg.EffectiveDeviceRate()
  88. compositeRate := float64(cfg.FM.CompositeRateHz)
  89. if compositeRate <= 0 {
  90. compositeRate = 228000
  91. }
  92. var upsampler *dsp.FMUpsampler
  93. if deviceRate > compositeRate*1.001 {
  94. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  95. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  96. // This halves CPU load compared to running all DSP at deviceRate.
  97. cfg.FM.FMModulationEnabled = false
  98. maxDev := cfg.FM.MaxDeviationHz
  99. if maxDev <= 0 {
  100. maxDev = 75000
  101. }
  102. // mpxGain scales the FM deviation to compensate for hardware
  103. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  104. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  105. maxDev *= cfg.FM.MpxGain
  106. }
  107. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  108. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  109. compositeRate, deviceRate, deviceRate/compositeRate)
  110. } else {
  111. // Same-rate: entire DSP chain runs at deviceRate.
  112. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  113. if deviceRate > 0 {
  114. cfg.FM.CompositeRateHz = int(deviceRate)
  115. }
  116. cfg.FM.FMModulationEnabled = true
  117. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  118. }
  119. return &Engine{
  120. cfg: cfg,
  121. driver: driver,
  122. generator: offpkg.NewGenerator(cfg),
  123. upsampler: upsampler,
  124. chunkDuration: 50 * time.Millisecond,
  125. deviceRate: deviceRate,
  126. state: EngineIdle,
  127. }
  128. }
  129. func (e *Engine) SetChunkDuration(d time.Duration) {
  130. e.chunkDuration = d
  131. }
  132. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  133. // nil pointers mean "no change". Validated before applying.
  134. type LiveConfigUpdate struct {
  135. FrequencyMHz *float64
  136. OutputDrive *float64
  137. StereoEnabled *bool
  138. PilotLevel *float64
  139. RDSInjection *float64
  140. RDSEnabled *bool
  141. LimiterEnabled *bool
  142. LimiterCeiling *float64
  143. PS *string
  144. RadioText *string
  145. }
  146. // UpdateConfig applies live parameter changes without restarting the engine.
  147. // DSP params take effect at the next chunk boundary (~50ms max).
  148. // Frequency changes are applied between chunks via driver.Tune().
  149. // RDS text updates are applied at the next RDS group boundary (~88ms).
  150. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  151. // --- Validate ---
  152. if u.FrequencyMHz != nil {
  153. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  154. return fmt.Errorf("frequencyMHz out of range (65-110)")
  155. }
  156. }
  157. if u.OutputDrive != nil {
  158. if *u.OutputDrive < 0 || *u.OutputDrive > 3 {
  159. return fmt.Errorf("outputDrive out of range (0-3)")
  160. }
  161. }
  162. if u.PilotLevel != nil {
  163. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  164. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  165. }
  166. }
  167. if u.RDSInjection != nil {
  168. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  169. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  170. }
  171. }
  172. if u.LimiterCeiling != nil {
  173. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  174. return fmt.Errorf("limiterCeiling out of range (0-2)")
  175. }
  176. }
  177. // --- Frequency: store for run loop to apply via driver.Tune() ---
  178. if u.FrequencyMHz != nil {
  179. freqHz := *u.FrequencyMHz * 1e6
  180. e.pendingFreq.Store(&freqHz)
  181. }
  182. // --- RDS text: forward to encoder atomics ---
  183. if u.PS != nil || u.RadioText != nil {
  184. if enc := e.generator.RDSEncoder(); enc != nil {
  185. ps, rt := "", ""
  186. if u.PS != nil { ps = *u.PS }
  187. if u.RadioText != nil { rt = *u.RadioText }
  188. enc.UpdateText(ps, rt)
  189. }
  190. }
  191. // --- DSP params: build new LiveParams from current + patch ---
  192. // Read current, apply deltas, store new
  193. current := e.generator.CurrentLiveParams()
  194. next := current // copy
  195. if u.OutputDrive != nil { next.OutputDrive = *u.OutputDrive }
  196. if u.StereoEnabled != nil { next.StereoEnabled = *u.StereoEnabled }
  197. if u.PilotLevel != nil { next.PilotLevel = *u.PilotLevel }
  198. if u.RDSInjection != nil { next.RDSInjection = *u.RDSInjection }
  199. if u.RDSEnabled != nil { next.RDSEnabled = *u.RDSEnabled }
  200. if u.LimiterEnabled != nil { next.LimiterEnabled = *u.LimiterEnabled }
  201. if u.LimiterCeiling != nil { next.LimiterCeiling = *u.LimiterCeiling }
  202. e.generator.UpdateLive(next)
  203. return nil
  204. }
  205. func (e *Engine) Start(ctx context.Context) error {
  206. e.mu.Lock()
  207. if e.state != EngineIdle {
  208. e.mu.Unlock()
  209. return fmt.Errorf("engine already in state %s", e.state)
  210. }
  211. if err := e.driver.Start(ctx); err != nil {
  212. e.mu.Unlock()
  213. return fmt.Errorf("driver start: %w", err)
  214. }
  215. runCtx, cancel := context.WithCancel(ctx)
  216. e.cancel = cancel
  217. e.state = EngineRunning
  218. e.startedAt = time.Now()
  219. e.wg.Add(1)
  220. e.mu.Unlock()
  221. go e.run(runCtx)
  222. return nil
  223. }
  224. func (e *Engine) Stop(ctx context.Context) error {
  225. e.mu.Lock()
  226. if e.state != EngineRunning {
  227. e.mu.Unlock()
  228. return nil
  229. }
  230. e.state = EngineStopping
  231. e.cancel()
  232. e.mu.Unlock()
  233. // Wait for run() goroutine to exit — deterministic, no guessing
  234. e.wg.Wait()
  235. if err := e.driver.Flush(ctx); err != nil {
  236. return err
  237. }
  238. if err := e.driver.Stop(ctx); err != nil {
  239. return err
  240. }
  241. e.mu.Lock()
  242. e.state = EngineIdle
  243. e.mu.Unlock()
  244. return nil
  245. }
  246. func (e *Engine) Stats() EngineStats {
  247. e.mu.Lock()
  248. state := e.state
  249. startedAt := e.startedAt
  250. e.mu.Unlock()
  251. var uptime float64
  252. if state == EngineRunning {
  253. uptime = time.Since(startedAt).Seconds()
  254. }
  255. errVal, _ := e.lastError.Load().(string)
  256. return EngineStats{
  257. State: state.String(),
  258. ChunksProduced: e.chunksProduced.Load(),
  259. TotalSamples: e.totalSamples.Load(),
  260. Underruns: e.underruns.Load(),
  261. LastError: errVal,
  262. UptimeSeconds: uptime,
  263. }
  264. }
  265. func (e *Engine) run(ctx context.Context) {
  266. defer e.wg.Done()
  267. for {
  268. if ctx.Err() != nil {
  269. return
  270. }
  271. // Apply pending frequency change between chunks
  272. if pf := e.pendingFreq.Swap(nil); pf != nil {
  273. if err := e.driver.Tune(ctx, *pf); err != nil {
  274. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  275. } else {
  276. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  277. }
  278. }
  279. frame := e.generator.GenerateFrame(e.chunkDuration)
  280. if e.upsampler != nil {
  281. frame = e.upsampler.Process(frame)
  282. }
  283. n, err := e.driver.Write(ctx, frame)
  284. if err != nil {
  285. if ctx.Err() != nil { return }
  286. e.lastError.Store(err.Error())
  287. e.underruns.Add(1)
  288. // Back off to avoid pegging CPU on persistent errors
  289. select {
  290. case <-time.After(e.chunkDuration):
  291. case <-ctx.Done():
  292. return
  293. }
  294. continue
  295. }
  296. e.chunksProduced.Add(1)
  297. e.totalSamples.Add(uint64(n))
  298. }
  299. }