Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

216 lines
5.1KB

  1. package app
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  10. "github.com/jan/fm-rds-tx/internal/dsp"
  11. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  12. "github.com/jan/fm-rds-tx/internal/platform"
  13. )
  14. type EngineState int
  15. const (
  16. EngineIdle EngineState = iota
  17. EngineRunning
  18. EngineStopping
  19. )
  20. func (s EngineState) String() string {
  21. switch s {
  22. case EngineIdle:
  23. return "idle"
  24. case EngineRunning:
  25. return "running"
  26. case EngineStopping:
  27. return "stopping"
  28. default:
  29. return "unknown"
  30. }
  31. }
  32. type EngineStats struct {
  33. State string `json:"state"`
  34. ChunksProduced uint64 `json:"chunksProduced"`
  35. TotalSamples uint64 `json:"totalSamples"`
  36. Underruns uint64 `json:"underruns"`
  37. LastError string `json:"lastError,omitempty"`
  38. UptimeSeconds float64 `json:"uptimeSeconds"`
  39. }
  40. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  41. // resamples to device rate, and pushes to hardware in a tight loop.
  42. // The hardware buffer_push call is blocking — it returns when the hardware
  43. // has consumed the previous buffer and is ready for the next one.
  44. // This naturally paces the loop to real-time without a ticker.
  45. type Engine struct {
  46. cfg cfgpkg.Config
  47. driver platform.SoapyDriver
  48. generator *offpkg.Generator
  49. upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
  50. chunkDuration time.Duration
  51. deviceRate float64
  52. mu sync.Mutex
  53. state EngineState
  54. cancel context.CancelFunc
  55. startedAt time.Time
  56. wg sync.WaitGroup
  57. chunksProduced atomic.Uint64
  58. totalSamples atomic.Uint64
  59. underruns atomic.Uint64
  60. lastError atomic.Value // string
  61. }
  62. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  63. deviceRate := cfg.EffectiveDeviceRate()
  64. compositeRate := float64(cfg.FM.CompositeRateHz)
  65. if compositeRate <= 0 {
  66. compositeRate = 228000
  67. }
  68. var upsampler *dsp.FMUpsampler
  69. if deviceRate > compositeRate*1.001 {
  70. // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
  71. // FMUpsampler handles FM modulation + interpolation to deviceRate.
  72. // This halves CPU load compared to running all DSP at deviceRate.
  73. cfg.FM.FMModulationEnabled = false
  74. maxDev := cfg.FM.MaxDeviationHz
  75. if maxDev <= 0 {
  76. maxDev = 75000
  77. }
  78. upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
  79. log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
  80. compositeRate, deviceRate, deviceRate/compositeRate)
  81. } else {
  82. // Same-rate: entire DSP chain runs at deviceRate.
  83. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
  84. if deviceRate > 0 {
  85. cfg.FM.CompositeRateHz = int(deviceRate)
  86. }
  87. cfg.FM.FMModulationEnabled = true
  88. log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
  89. }
  90. return &Engine{
  91. cfg: cfg,
  92. driver: driver,
  93. generator: offpkg.NewGenerator(cfg),
  94. upsampler: upsampler,
  95. chunkDuration: 50 * time.Millisecond,
  96. deviceRate: deviceRate,
  97. state: EngineIdle,
  98. }
  99. }
  100. func (e *Engine) SetChunkDuration(d time.Duration) {
  101. e.chunkDuration = d
  102. }
  103. func (e *Engine) Start(ctx context.Context) error {
  104. e.mu.Lock()
  105. if e.state != EngineIdle {
  106. e.mu.Unlock()
  107. return fmt.Errorf("engine already in state %s", e.state)
  108. }
  109. if err := e.driver.Start(ctx); err != nil {
  110. e.mu.Unlock()
  111. return fmt.Errorf("driver start: %w", err)
  112. }
  113. runCtx, cancel := context.WithCancel(ctx)
  114. e.cancel = cancel
  115. e.state = EngineRunning
  116. e.startedAt = time.Now()
  117. e.wg.Add(1)
  118. e.mu.Unlock()
  119. go e.run(runCtx)
  120. return nil
  121. }
  122. func (e *Engine) Stop(ctx context.Context) error {
  123. e.mu.Lock()
  124. if e.state != EngineRunning {
  125. e.mu.Unlock()
  126. return nil
  127. }
  128. e.state = EngineStopping
  129. e.cancel()
  130. e.mu.Unlock()
  131. // Wait for run() goroutine to exit — deterministic, no guessing
  132. e.wg.Wait()
  133. if err := e.driver.Flush(ctx); err != nil {
  134. return err
  135. }
  136. if err := e.driver.Stop(ctx); err != nil {
  137. return err
  138. }
  139. e.mu.Lock()
  140. e.state = EngineIdle
  141. e.mu.Unlock()
  142. return nil
  143. }
  144. func (e *Engine) Stats() EngineStats {
  145. e.mu.Lock()
  146. state := e.state
  147. startedAt := e.startedAt
  148. e.mu.Unlock()
  149. var uptime float64
  150. if state == EngineRunning {
  151. uptime = time.Since(startedAt).Seconds()
  152. }
  153. errVal, _ := e.lastError.Load().(string)
  154. return EngineStats{
  155. State: state.String(),
  156. ChunksProduced: e.chunksProduced.Load(),
  157. TotalSamples: e.totalSamples.Load(),
  158. Underruns: e.underruns.Load(),
  159. LastError: errVal,
  160. UptimeSeconds: uptime,
  161. }
  162. }
  163. func (e *Engine) run(ctx context.Context) {
  164. defer e.wg.Done()
  165. for {
  166. if ctx.Err() != nil {
  167. return
  168. }
  169. frame := e.generator.GenerateFrame(e.chunkDuration)
  170. if e.upsampler != nil {
  171. frame = e.upsampler.Process(frame)
  172. }
  173. n, err := e.driver.Write(ctx, frame)
  174. if err != nil {
  175. if ctx.Err() != nil { return }
  176. e.lastError.Store(err.Error())
  177. e.underruns.Add(1)
  178. // Back off to avoid pegging CPU on persistent errors
  179. select {
  180. case <-time.After(e.chunkDuration):
  181. case <-ctx.Done():
  182. return
  183. }
  184. continue
  185. }
  186. e.chunksProduced.Add(1)
  187. e.totalSamples.Add(uint64(n))
  188. }
  189. }