Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

487 строки
12KB

  1. package app
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  12. "github.com/jan/fm-rds-tx/internal/dsp"
  13. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  14. "github.com/jan/fm-rds-tx/internal/output"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
  17. type EngineState int
  18. const (
  19. EngineIdle EngineState = iota
  20. EngineRunning
  21. EngineStopping
  22. )
  23. func (s EngineState) String() string {
  24. switch s {
  25. case EngineIdle:
  26. return "idle"
  27. case EngineRunning:
  28. return "running"
  29. case EngineStopping:
  30. return "stopping"
  31. default:
  32. return "unknown"
  33. }
  34. }
  35. func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
  36. v := uint64(d)
  37. for {
  38. cur := dst.Load()
  39. if v <= cur {
  40. return
  41. }
  42. if dst.CompareAndSwap(cur, v) {
  43. return
  44. }
  45. }
  46. }
  47. func durationMs(ns uint64) float64 {
  48. return float64(ns) / float64(time.Millisecond)
  49. }
  50. type EngineStats struct {
  51. State string `json:"state"`
  52. ChunksProduced uint64 `json:"chunksProduced"`
  53. TotalSamples uint64 `json:"totalSamples"`
  54. Underruns uint64 `json:"underruns"`
  55. LateBuffers uint64 `json:"lateBuffers,omitempty"`
  56. LastError string `json:"lastError,omitempty"`
  57. UptimeSeconds float64 `json:"uptimeSeconds"`
  58. MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
  59. MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
  60. MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
  61. MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
  62. Queue output.QueueStats `json:"queue"`
  63. }
  64. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  65. // resamples to device rate, and pushes to hardware in a tight loop.
  66. // The hardware buffer_push call is blocking — it returns when the hardware
  67. // has consumed the previous buffer and is ready for the next one.
  68. // This naturally paces the loop to real-time without a ticker.
  69. type Engine struct {
  70. cfg cfgpkg.Config
  71. driver platform.SoapyDriver
  72. generator *offpkg.Generator
  73. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  74. chunkDuration time.Duration
  75. deviceRate float64
  76. frameQueue *output.FrameQueue
  77. mu sync.Mutex
  78. state EngineState
  79. cancel context.CancelFunc
  80. startedAt time.Time
  81. wg sync.WaitGroup
  82. chunksProduced atomic.Uint64
  83. totalSamples atomic.Uint64
  84. underruns atomic.Uint64
  85. lateBuffers atomic.Uint64
  86. maxCycleNs atomic.Uint64
  87. maxGenerateNs atomic.Uint64
  88. maxUpsampleNs atomic.Uint64
  89. maxWriteNs atomic.Uint64
  90. lastError atomic.Value // string
  91. // Live config: pending frequency change, applied between chunks
  92. pendingFreq atomic.Pointer[float64]
  93. // Live audio stream (optional)
  94. streamSrc *audio.StreamSource
  95. }
  96. // SetStreamSource configures a live audio stream as the audio source.
  97. // Must be called before Start(). The StreamResampler is created internally
  98. // to convert from the stream's sample rate to the DSP composite rate.
  99. func (e *Engine) SetStreamSource(src *audio.StreamSource) {
  100. e.streamSrc = src
  101. compositeRate := float64(e.cfg.FM.CompositeRateHz)
  102. if compositeRate <= 0 {
  103. compositeRate = 228000
  104. }
  105. resampler := audio.NewStreamResampler(src, compositeRate)
  106. e.generator.SetExternalSource(resampler)
  107. log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
  108. src.SampleRate, compositeRate, src.Stats().Capacity)
  109. }
  110. // StreamSource returns the live audio stream source, or nil.
  111. // Used by the control server for stats and HTTP audio ingest.
  112. func (e *Engine) StreamSource() *audio.StreamSource {
  113. return e.streamSrc
  114. }
  115. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  116. deviceRate := cfg.EffectiveDeviceRate()
  117. compositeRate := float64(cfg.FM.CompositeRateHz)
  118. if compositeRate <= 0 {
  119. compositeRate = 228000
  120. }
  121. var upsampler *dsp.FMUpsampler
  122. if deviceRate > compositeRate*1.001 {
  123. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  124. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  125. // This halves CPU load compared to running all DSP at deviceRate.
  126. cfg.FM.FMModulationEnabled = false
  127. maxDev := cfg.FM.MaxDeviationHz
  128. if maxDev <= 0 {
  129. maxDev = 75000
  130. }
  131. // mpxGain scales the FM deviation to compensate for hardware
  132. // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
  133. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
  134. maxDev *= cfg.FM.MpxGain
  135. }
  136. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  137. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  138. compositeRate, deviceRate, deviceRate/compositeRate)
  139. } else {
  140. // Same-rate: entire DSP chain runs at deviceRate.
  141. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  142. if deviceRate > 0 {
  143. cfg.FM.CompositeRateHz = int(deviceRate)
  144. }
  145. cfg.FM.FMModulationEnabled = true
  146. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  147. }
  148. return &Engine{
  149. cfg: cfg,
  150. driver: driver,
  151. generator: offpkg.NewGenerator(cfg),
  152. upsampler: upsampler,
  153. chunkDuration: 50 * time.Millisecond,
  154. deviceRate: deviceRate,
  155. state: EngineIdle,
  156. frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
  157. }
  158. }
  159. func (e *Engine) SetChunkDuration(d time.Duration) {
  160. e.chunkDuration = d
  161. }
  162. // LiveConfigUpdate carries hot-reloadable parameters from the control API.
  163. // nil pointers mean "no change". Validated before applying.
  164. type LiveConfigUpdate struct {
  165. FrequencyMHz *float64
  166. OutputDrive *float64
  167. StereoEnabled *bool
  168. PilotLevel *float64
  169. RDSInjection *float64
  170. RDSEnabled *bool
  171. LimiterEnabled *bool
  172. LimiterCeiling *float64
  173. PS *string
  174. RadioText *string
  175. }
  176. // UpdateConfig applies live parameter changes without restarting the engine.
  177. // DSP params take effect at the next chunk boundary (~50ms max).
  178. // Frequency changes are applied between chunks via driver.Tune().
  179. // RDS text updates are applied at the next RDS group boundary (~88ms).
  180. func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
  181. // --- Validate ---
  182. if u.FrequencyMHz != nil {
  183. if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
  184. return fmt.Errorf("frequencyMHz out of range (65-110)")
  185. }
  186. }
  187. if u.OutputDrive != nil {
  188. if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
  189. return fmt.Errorf("outputDrive out of range (0-10)")
  190. }
  191. }
  192. if u.PilotLevel != nil {
  193. if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
  194. return fmt.Errorf("pilotLevel out of range (0-0.2)")
  195. }
  196. }
  197. if u.RDSInjection != nil {
  198. if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
  199. return fmt.Errorf("rdsInjection out of range (0-0.15)")
  200. }
  201. }
  202. if u.LimiterCeiling != nil {
  203. if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
  204. return fmt.Errorf("limiterCeiling out of range (0-2)")
  205. }
  206. }
  207. // --- Frequency: store for run loop to apply via driver.Tune() ---
  208. if u.FrequencyMHz != nil {
  209. freqHz := *u.FrequencyMHz * 1e6
  210. e.pendingFreq.Store(&freqHz)
  211. }
  212. // --- RDS text: forward to encoder atomics ---
  213. if u.PS != nil || u.RadioText != nil {
  214. if enc := e.generator.RDSEncoder(); enc != nil {
  215. ps, rt := "", ""
  216. if u.PS != nil {
  217. ps = *u.PS
  218. }
  219. if u.RadioText != nil {
  220. rt = *u.RadioText
  221. }
  222. enc.UpdateText(ps, rt)
  223. }
  224. }
  225. // --- DSP params: build new LiveParams from current + patch ---
  226. // Read current, apply deltas, store new
  227. current := e.generator.CurrentLiveParams()
  228. next := current // copy
  229. if u.OutputDrive != nil {
  230. next.OutputDrive = *u.OutputDrive
  231. }
  232. if u.StereoEnabled != nil {
  233. next.StereoEnabled = *u.StereoEnabled
  234. }
  235. if u.PilotLevel != nil {
  236. next.PilotLevel = *u.PilotLevel
  237. }
  238. if u.RDSInjection != nil {
  239. next.RDSInjection = *u.RDSInjection
  240. }
  241. if u.RDSEnabled != nil {
  242. next.RDSEnabled = *u.RDSEnabled
  243. }
  244. if u.LimiterEnabled != nil {
  245. next.LimiterEnabled = *u.LimiterEnabled
  246. }
  247. if u.LimiterCeiling != nil {
  248. next.LimiterCeiling = *u.LimiterCeiling
  249. }
  250. e.generator.UpdateLive(next)
  251. return nil
  252. }
  253. func (e *Engine) Start(ctx context.Context) error {
  254. e.mu.Lock()
  255. if e.state != EngineIdle {
  256. e.mu.Unlock()
  257. return fmt.Errorf("engine already in state %s", e.state)
  258. }
  259. if err := e.driver.Start(ctx); err != nil {
  260. e.mu.Unlock()
  261. return fmt.Errorf("driver start: %w", err)
  262. }
  263. runCtx, cancel := context.WithCancel(ctx)
  264. e.cancel = cancel
  265. e.state = EngineRunning
  266. e.startedAt = time.Now()
  267. e.wg.Add(1)
  268. e.mu.Unlock()
  269. go e.run(runCtx)
  270. return nil
  271. }
  272. func (e *Engine) Stop(ctx context.Context) error {
  273. e.mu.Lock()
  274. if e.state != EngineRunning {
  275. e.mu.Unlock()
  276. return nil
  277. }
  278. e.state = EngineStopping
  279. e.cancel()
  280. e.mu.Unlock()
  281. // Wait for run() goroutine to exit — deterministic, no guessing
  282. e.wg.Wait()
  283. if err := e.driver.Flush(ctx); err != nil {
  284. return err
  285. }
  286. if err := e.driver.Stop(ctx); err != nil {
  287. return err
  288. }
  289. e.mu.Lock()
  290. e.state = EngineIdle
  291. e.mu.Unlock()
  292. return nil
  293. }
  294. func (e *Engine) Stats() EngineStats {
  295. e.mu.Lock()
  296. state := e.state
  297. startedAt := e.startedAt
  298. e.mu.Unlock()
  299. var uptime float64
  300. if state == EngineRunning {
  301. uptime = time.Since(startedAt).Seconds()
  302. }
  303. errVal, _ := e.lastError.Load().(string)
  304. return EngineStats{
  305. State: state.String(),
  306. ChunksProduced: e.chunksProduced.Load(),
  307. TotalSamples: e.totalSamples.Load(),
  308. Underruns: e.underruns.Load(),
  309. LateBuffers: e.lateBuffers.Load(),
  310. LastError: errVal,
  311. UptimeSeconds: uptime,
  312. MaxCycleMs: durationMs(e.maxCycleNs.Load()),
  313. MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
  314. MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
  315. MaxWriteMs: durationMs(e.maxWriteNs.Load()),
  316. Queue: e.frameQueue.Stats(),
  317. }
  318. }
  319. func (e *Engine) run(ctx context.Context) {
  320. e.wg.Add(1)
  321. go e.writerLoop(ctx)
  322. defer e.wg.Done()
  323. for {
  324. if ctx.Err() != nil {
  325. return
  326. }
  327. // Apply pending frequency change between chunks
  328. if pf := e.pendingFreq.Swap(nil); pf != nil {
  329. if err := e.driver.Tune(ctx, *pf); err != nil {
  330. e.lastError.Store(fmt.Sprintf("tune: %v", err))
  331. } else {
  332. log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
  333. }
  334. }
  335. t0 := time.Now()
  336. frame := e.generator.GenerateFrame(e.chunkDuration)
  337. frame.GeneratedAt = t0
  338. t1 := time.Now()
  339. if e.upsampler != nil {
  340. frame = e.upsampler.Process(frame)
  341. frame.GeneratedAt = t0
  342. }
  343. t2 := time.Now()
  344. genDur := t1.Sub(t0)
  345. upDur := t2.Sub(t1)
  346. updateMaxDuration(&e.maxGenerateNs, genDur)
  347. updateMaxDuration(&e.maxUpsampleNs, upDur)
  348. enqueued := cloneFrame(frame)
  349. if enqueued == nil {
  350. e.lastError.Store("engine: frame clone failed")
  351. e.underruns.Add(1)
  352. continue
  353. }
  354. if err := e.frameQueue.Push(ctx, enqueued); err != nil {
  355. if ctx.Err() != nil {
  356. return
  357. }
  358. if errors.Is(err, output.ErrFrameQueueClosed) {
  359. return
  360. }
  361. e.lastError.Store(err.Error())
  362. e.underruns.Add(1)
  363. select {
  364. case <-time.After(e.chunkDuration):
  365. case <-ctx.Done():
  366. return
  367. }
  368. continue
  369. }
  370. }
  371. }
  372. func (e *Engine) writerLoop(ctx context.Context) {
  373. defer e.wg.Done()
  374. for {
  375. frame, err := e.frameQueue.Pop(ctx)
  376. if err != nil {
  377. if ctx.Err() != nil {
  378. return
  379. }
  380. if errors.Is(err, output.ErrFrameQueueClosed) {
  381. return
  382. }
  383. e.lastError.Store(err.Error())
  384. e.underruns.Add(1)
  385. continue
  386. }
  387. writeStart := time.Now()
  388. n, err := e.driver.Write(ctx, frame)
  389. writeDur := time.Since(writeStart)
  390. cycleDur := writeDur
  391. if !frame.GeneratedAt.IsZero() {
  392. cycleDur = time.Since(frame.GeneratedAt)
  393. }
  394. updateMaxDuration(&e.maxWriteNs, writeDur)
  395. updateMaxDuration(&e.maxCycleNs, cycleDur)
  396. if cycleDur > e.chunkDuration {
  397. late := e.lateBuffers.Add(1)
  398. if late <= 5 || late%20 == 0 {
  399. log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
  400. cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
  401. }
  402. }
  403. if err != nil {
  404. if ctx.Err() != nil {
  405. return
  406. }
  407. e.lastError.Store(err.Error())
  408. e.underruns.Add(1)
  409. select {
  410. case <-time.After(e.chunkDuration):
  411. case <-ctx.Done():
  412. return
  413. }
  414. continue
  415. }
  416. e.chunksProduced.Add(1)
  417. e.totalSamples.Add(uint64(n))
  418. }
  419. }
  420. func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
  421. if src == nil {
  422. return nil
  423. }
  424. samples := make([]output.IQSample, len(src.Samples))
  425. copy(samples, src.Samples)
  426. return &output.CompositeFrame{
  427. Samples: samples,
  428. SampleRateHz: src.SampleRateHz,
  429. Timestamp: src.Timestamp,
  430. GeneratedAt: src.GeneratedAt,
  431. Sequence: src.Sequence,
  432. }
  433. }