Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

186 рядки
4.2KB

  1. package app
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. cfgpkg "github.com/jan/fm-rds-tx/internal/config"
  9. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  10. "github.com/jan/fm-rds-tx/internal/platform"
  11. )
  12. type EngineState int
  13. const (
  14. EngineIdle EngineState = iota
  15. EngineRunning
  16. EngineStopping
  17. )
  18. func (s EngineState) String() string {
  19. switch s {
  20. case EngineIdle:
  21. return "idle"
  22. case EngineRunning:
  23. return "running"
  24. case EngineStopping:
  25. return "stopping"
  26. default:
  27. return "unknown"
  28. }
  29. }
  30. type EngineStats struct {
  31. State string `json:"state"`
  32. ChunksProduced uint64 `json:"chunksProduced"`
  33. TotalSamples uint64 `json:"totalSamples"`
  34. Underruns uint64 `json:"underruns"`
  35. LastError string `json:"lastError,omitempty"`
  36. UptimeSeconds float64 `json:"uptimeSeconds"`
  37. }
  38. // Engine is the continuous TX loop. It generates composite IQ in chunks,
  39. // resamples to device rate, and pushes to hardware in a tight loop.
  40. // The hardware buffer_push call is blocking — it returns when the hardware
  41. // has consumed the previous buffer and is ready for the next one.
  42. // This naturally paces the loop to real-time without a ticker.
  43. type Engine struct {
  44. cfg cfgpkg.Config
  45. driver platform.SoapyDriver
  46. generator *offpkg.Generator
  47. chunkDuration time.Duration
  48. deviceRate float64
  49. mu sync.Mutex
  50. state EngineState
  51. cancel context.CancelFunc
  52. startedAt time.Time
  53. chunksProduced atomic.Uint64
  54. totalSamples atomic.Uint64
  55. underruns atomic.Uint64
  56. lastError atomic.Value // string
  57. }
  58. func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
  59. // When device rate differs from composite rate, run the entire DSP chain
  60. // at device rate directly. This avoids resampling artifacts on the
  61. // 19/38/57 kHz subcarriers and gives much better spectral quality.
  62. deviceRate := cfg.EffectiveDeviceRate()
  63. if deviceRate > 0 && deviceRate != float64(cfg.FM.CompositeRateHz) {
  64. cfg.FM.CompositeRateHz = int(deviceRate)
  65. }
  66. return &Engine{
  67. cfg: cfg,
  68. driver: driver,
  69. generator: offpkg.NewGenerator(cfg),
  70. chunkDuration: 50 * time.Millisecond,
  71. deviceRate: deviceRate,
  72. state: EngineIdle,
  73. }
  74. }
  75. func (e *Engine) SetChunkDuration(d time.Duration) {
  76. e.chunkDuration = d
  77. }
  78. func (e *Engine) Start(ctx context.Context) error {
  79. e.mu.Lock()
  80. if e.state != EngineIdle {
  81. e.mu.Unlock()
  82. return fmt.Errorf("engine already in state %s", e.state)
  83. }
  84. if err := e.driver.Start(ctx); err != nil {
  85. e.mu.Unlock()
  86. return fmt.Errorf("driver start: %w", err)
  87. }
  88. runCtx, cancel := context.WithCancel(ctx)
  89. e.cancel = cancel
  90. e.state = EngineRunning
  91. e.startedAt = time.Now()
  92. e.mu.Unlock()
  93. go e.run(runCtx)
  94. return nil
  95. }
  96. func (e *Engine) Stop(ctx context.Context) error {
  97. e.mu.Lock()
  98. if e.state != EngineRunning {
  99. e.mu.Unlock()
  100. return nil
  101. }
  102. e.state = EngineStopping
  103. e.cancel()
  104. e.mu.Unlock()
  105. time.Sleep(e.chunkDuration * 2)
  106. if err := e.driver.Flush(ctx); err != nil {
  107. return err
  108. }
  109. if err := e.driver.Stop(ctx); err != nil {
  110. return err
  111. }
  112. e.mu.Lock()
  113. e.state = EngineIdle
  114. e.mu.Unlock()
  115. return nil
  116. }
  117. func (e *Engine) Stats() EngineStats {
  118. e.mu.Lock()
  119. state := e.state
  120. startedAt := e.startedAt
  121. e.mu.Unlock()
  122. var uptime float64
  123. if state == EngineRunning {
  124. uptime = time.Since(startedAt).Seconds()
  125. }
  126. errVal, _ := e.lastError.Load().(string)
  127. return EngineStats{
  128. State: state.String(),
  129. ChunksProduced: e.chunksProduced.Load(),
  130. TotalSamples: e.totalSamples.Load(),
  131. Underruns: e.underruns.Load(),
  132. LastError: errVal,
  133. UptimeSeconds: uptime,
  134. }
  135. }
  136. func (e *Engine) run(ctx context.Context) {
  137. // Tight loop: generate → resample → push.
  138. // The driver.Write/buffer_push call blocks until hardware is ready
  139. // for the next buffer. This naturally paces to real-time.
  140. // No ticker needed — the hardware clock drives the timing.
  141. for {
  142. if ctx.Err() != nil {
  143. return
  144. }
  145. frame := e.generator.GenerateFrame(e.chunkDuration)
  146. n, err := e.driver.Write(ctx, frame)
  147. if err != nil {
  148. if ctx.Err() != nil {
  149. return
  150. }
  151. e.lastError.Store(err.Error())
  152. e.underruns.Add(1)
  153. continue
  154. }
  155. e.chunksProduced.Add(1)
  156. e.totalSamples.Add(uint64(n))
  157. }
  158. }