Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

414 satır
11KB

  1. package app
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/jan/fm-rds-tx/internal/audio"
  10. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  11. "github.com/jan/fm-rds-tx/internal/dsp"
  12. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  13. "github.com/jan/fm-rds-tx/internal/platform"
  14. )
  15. type EngineState int
  16. const (
  17. EngineIdle EngineState = iota
  18. EngineRunning
  19. EngineStopping
  20. )
  21. func (s EngineState) String() string {
  22. switch s {
  23. case EngineIdle:
  24. return "idle"
  25. case EngineRunning:
  26. return "running"
  27. case EngineStopping:
  28. return "stopping"
  29. default:
  30. return "unknown"
  31. }
  32. }
  33. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  34. v := uint64(d)
  35. for {
  36. cur := dst.Load()
  37. if v <= cur {
  38. return
  39. }
  40. if dst.CompareAndSwap(cur, v) {
  41. return
  42. }
  43. }
  44. }
  45. func durationMs(ns uint64) float64 {
  46. return float64(ns) / float64(time.Millisecond)
  47. }
  48. type EngineStats struct {
  49. State string `json:"state"`
  50. ChunksProduced uint64 `json:"chunksProduced"`
  51. TotalSamples uint64 `json:"totalSamples"`
  52. Underruns uint64 `json:"underruns"`
  53. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  54. LastError string `json:"lastError,omitempty"`
  55. UptimeSeconds float64 `json:"uptimeSeconds"`
  56. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  57. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  58. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  59. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  60. }
  61. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  62. // resamples to device rate, and pushes to hardware in a tight loop.
  63. // The hardware buffer_push call is blocking — it returns when the hardware
  64. // has consumed the previous buffer and is ready for the next one.
  65. // This naturally paces the loop to real-time without a ticker.
  66. type Engine struct {
  67. cfg cfgpkg.Config
  68. driver platform.SoapyDriver
  69. generator *offpkg.Generator
  70. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  71. chunkDuration time.Duration
  72. deviceRate float64
  73. mu sync.Mutex
  74. state EngineState
  75. cancel context.CancelFunc
  76. startedAt time.Time
  77. wg sync.WaitGroup
  78. chunksProduced atomic.Uint64
  79. totalSamples atomic.Uint64
  80. underruns atomic.Uint64
  81. lateBuffers atomic.Uint64
  82. maxCycleNs atomic.Uint64
  83. maxGenerateNs atomic.Uint64
  84. maxUpsampleNs atomic.Uint64
  85. maxWriteNs atomic.Uint64
  86. lastError atomic.Value // string
  87. // Live config: pending frequency change, applied between chunks
  88. pendingFreq atomic.Pointer[float64]
  89. // Live audio stream (optional)
  90. streamSrc *audio.StreamSource
  91. }
  92. // SetStreamSource configures a live audio stream as the audio source.
  93. // Must be called before Start(). The StreamResampler is created internally
  94. // to convert from the stream's sample rate to the DSP composite rate.
  95. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  96. e.streamSrc = src
  97. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  98. if compositeRate <= 0 {
  99. compositeRate = 228000
  100. }
  101. resampler := audio.NewStreamResampler(src, compositeRate)
  102. e.generator.SetExternalSource(resampler)
  103. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  104. src.SampleRate, compositeRate, src.Stats().Capacity)
  105. }
  106. // StreamSource returns the live audio stream source, or nil.
  107. // Used by the control server for stats and HTTP audio ingest.
  108. func (e *Engine) StreamSource() *audio.StreamSource {
  109. return e.streamSrc
  110. }
  111. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  112. deviceRate := cfg.EffectiveDeviceRate()
  113. compositeRate := float64(cfg.FM.CompositeRateHz)
  114. if compositeRate <= 0 {
  115. compositeRate = 228000
  116. }
  117. var upsampler *dsp.FMUpsampler
  118. if deviceRate > compositeRate*1.001 {
  119. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  120. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  121. // This halves CPU load compared to running all DSP at deviceRate.
  122. cfg.FM.FMModulationEnabled = false
  123. maxDev := cfg.FM.MaxDeviationHz
  124. if maxDev <= 0 {
  125. maxDev = 75000
  126. }
  127. // mpxGain scales the FM deviation to compensate for hardware
  128. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  129. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  130. maxDev *= cfg.FM.MpxGain
  131. }
  132. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  133. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  134. compositeRate, deviceRate, deviceRate/compositeRate)
  135. } else {
  136. // Same-rate: entire DSP chain runs at deviceRate.
  137. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  138. if deviceRate > 0 {
  139. cfg.FM.CompositeRateHz = int(deviceRate)
  140. }
  141. cfg.FM.FMModulationEnabled = true
  142. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  143. }
  144. return &Engine{
  145. cfg: cfg,
  146. driver: driver,
  147. generator: offpkg.NewGenerator(cfg),
  148. upsampler: upsampler,
  149. chunkDuration: 50 * time.Millisecond,
  150. deviceRate: deviceRate,
  151. state: EngineIdle,
  152. }
  153. }
  154. func (e *Engine) SetChunkDuration(d time.Duration) {
  155. e.chunkDuration = d
  156. }
  157. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  158. // nil pointers mean "no change". Validated before applying.
  159. type LiveConfigUpdate struct {
  160. FrequencyMHz *float64
  161. OutputDrive *float64
  162. StereoEnabled *bool
  163. PilotLevel *float64
  164. RDSInjection *float64
  165. RDSEnabled *bool
  166. LimiterEnabled *bool
  167. LimiterCeiling *float64
  168. PS *string
  169. RadioText *string
  170. }
  171. // UpdateConfig applies live parameter changes without restarting the engine.
  172. // DSP params take effect at the next chunk boundary (~50ms max).
  173. // Frequency changes are applied between chunks via driver.Tune().
  174. // RDS text updates are applied at the next RDS group boundary (~88ms).
  175. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  176. // --- Validate ---
  177. if u.FrequencyMHz != nil {
  178. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  179. return fmt.Errorf("frequencyMHz out of range (65-110)")
  180. }
  181. }
  182. if u.OutputDrive != nil {
  183. if *u.OutputDrive < 0 || *u.OutputDrive > 3 {
  184. return fmt.Errorf("outputDrive out of range (0-3)")
  185. }
  186. }
  187. if u.PilotLevel != nil {
  188. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  189. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  190. }
  191. }
  192. if u.RDSInjection != nil {
  193. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  194. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  195. }
  196. }
  197. if u.LimiterCeiling != nil {
  198. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  199. return fmt.Errorf("limiterCeiling out of range (0-2)")
  200. }
  201. }
  202. // --- Frequency: store for run loop to apply via driver.Tune() ---
  203. if u.FrequencyMHz != nil {
  204. freqHz := *u.FrequencyMHz * 1e6
  205. e.pendingFreq.Store(&freqHz)
  206. }
  207. // --- RDS text: forward to encoder atomics ---
  208. if u.PS != nil || u.RadioText != nil {
  209. if enc := e.generator.RDSEncoder(); enc != nil {
  210. ps, rt := "", ""
  211. if u.PS != nil {
  212. ps = *u.PS
  213. }
  214. if u.RadioText != nil {
  215. rt = *u.RadioText
  216. }
  217. enc.UpdateText(ps, rt)
  218. }
  219. }
  220. // --- DSP params: build new LiveParams from current + patch ---
  221. // Read current, apply deltas, store new
  222. current := e.generator.CurrentLiveParams()
  223. next := current // copy
  224. if u.OutputDrive != nil {
  225. next.OutputDrive = *u.OutputDrive
  226. }
  227. if u.StereoEnabled != nil {
  228. next.StereoEnabled = *u.StereoEnabled
  229. }
  230. if u.PilotLevel != nil {
  231. next.PilotLevel = *u.PilotLevel
  232. }
  233. if u.RDSInjection != nil {
  234. next.RDSInjection = *u.RDSInjection
  235. }
  236. if u.RDSEnabled != nil {
  237. next.RDSEnabled = *u.RDSEnabled
  238. }
  239. if u.LimiterEnabled != nil {
  240. next.LimiterEnabled = *u.LimiterEnabled
  241. }
  242. if u.LimiterCeiling != nil {
  243. next.LimiterCeiling = *u.LimiterCeiling
  244. }
  245. e.generator.UpdateLive(next)
  246. return nil
  247. }
  248. func (e *Engine) Start(ctx context.Context) error {
  249. e.mu.Lock()
  250. if e.state != EngineIdle {
  251. e.mu.Unlock()
  252. return fmt.Errorf("engine already in state %s", e.state)
  253. }
  254. if err := e.driver.Start(ctx); err != nil {
  255. e.mu.Unlock()
  256. return fmt.Errorf("driver start: %w", err)
  257. }
  258. runCtx, cancel := context.WithCancel(ctx)
  259. e.cancel = cancel
  260. e.state = EngineRunning
  261. e.startedAt = time.Now()
  262. e.wg.Add(1)
  263. e.mu.Unlock()
  264. go e.run(runCtx)
  265. return nil
  266. }
  267. func (e *Engine) Stop(ctx context.Context) error {
  268. e.mu.Lock()
  269. if e.state != EngineRunning {
  270. e.mu.Unlock()
  271. return nil
  272. }
  273. e.state = EngineStopping
  274. e.cancel()
  275. e.mu.Unlock()
  276. // Wait for run() goroutine to exit — deterministic, no guessing
  277. e.wg.Wait()
  278. if err := e.driver.Flush(ctx); err != nil {
  279. return err
  280. }
  281. if err := e.driver.Stop(ctx); err != nil {
  282. return err
  283. }
  284. e.mu.Lock()
  285. e.state = EngineIdle
  286. e.mu.Unlock()
  287. return nil
  288. }
  289. func (e *Engine) Stats() EngineStats {
  290. e.mu.Lock()
  291. state := e.state
  292. startedAt := e.startedAt
  293. e.mu.Unlock()
  294. var uptime float64
  295. if state == EngineRunning {
  296. uptime = time.Since(startedAt).Seconds()
  297. }
  298. errVal, _ := e.lastError.Load().(string)
  299. return EngineStats{
  300. State: state.String(),
  301. ChunksProduced: e.chunksProduced.Load(),
  302. TotalSamples: e.totalSamples.Load(),
  303. Underruns: e.underruns.Load(),
  304. LateBuffers: e.lateBuffers.Load(),
  305. LastError: errVal,
  306. UptimeSeconds: uptime,
  307. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  308. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  309. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  310. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  311. }
  312. }
  313. func (e *Engine) run(ctx context.Context) {
  314. defer e.wg.Done()
  315. for {
  316. if ctx.Err() != nil {
  317. return
  318. }
  319. // Apply pending frequency change between chunks
  320. if pf := e.pendingFreq.Swap(nil); pf != nil {
  321. if err := e.driver.Tune(ctx, *pf); err != nil {
  322. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  323. } else {
  324. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  325. }
  326. }
  327. t0 := time.Now()
  328. frame := e.generator.GenerateFrame(e.chunkDuration)
  329. t1 := time.Now()
  330. if e.upsampler != nil {
  331. frame = e.upsampler.Process(frame)
  332. }
  333. t2 := time.Now()
  334. n, err := e.driver.Write(ctx, frame)
  335. t3 := time.Now()
  336. genDur := t1.Sub(t0)
  337. upDur := t2.Sub(t1)
  338. writeDur := t3.Sub(t2)
  339. cycleDur := t3.Sub(t0)
  340. updateMaxDuration(&e.maxGenerateNs, genDur)
  341. updateMaxDuration(&e.maxUpsampleNs, upDur)
  342. updateMaxDuration(&e.maxWriteNs, writeDur)
  343. updateMaxDuration(&e.maxCycleNs, cycleDur)
  344. if cycleDur > e.chunkDuration {
  345. late := e.lateBuffers.Add(1)
  346. if late <= 5 || late%20 == 0 {
  347. log.Printf("TX LATE: cycle=%s budget=%s gen=%s up=%s write=%s over=%s",
  348. cycleDur, e.chunkDuration, genDur, upDur, writeDur, cycleDur-e.chunkDuration)
  349. }
  350. }
  351. if err != nil {
  352. if ctx.Err() != nil {
  353. return
  354. }
  355. e.lastError.Store(err.Error())
  356. e.underruns.Add(1)
  357. // Back off to avoid pegging CPU on persistent errors
  358. select {
  359. case <-time.After(e.chunkDuration):
  360. case <-ctx.Done():
  361. return
  362. }
  363. continue
  364. }
  365. e.chunksProduced.Add(1)
  366. e.totalSamples.Add(uint64(n))
  367. }
  368. }