|
- package app
-
- import (
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-
- cfgpkg "github.com/jan/fm-rds-tx/internal/config"
- offpkg "github.com/jan/fm-rds-tx/internal/offline"
- "github.com/jan/fm-rds-tx/internal/platform"
- )
-
- type EngineState int
-
- const (
- EngineIdle EngineState = iota
- EngineRunning
- EngineStopping
- )
-
- func (s EngineState) String() string {
- switch s {
- case EngineIdle:
- return "idle"
- case EngineRunning:
- return "running"
- case EngineStopping:
- return "stopping"
- default:
- return "unknown"
- }
- }
-
- type EngineStats struct {
- State string `json:"state"`
- ChunksProduced uint64 `json:"chunksProduced"`
- TotalSamples uint64 `json:"totalSamples"`
- Underruns uint64 `json:"underruns"`
- LastError string `json:"lastError,omitempty"`
- UptimeSeconds float64 `json:"uptimeSeconds"`
- }
-
- // Engine is the continuous TX loop. It generates composite IQ in chunks,
- // resamples to device rate, and pushes to hardware in a tight loop.
- // The hardware buffer_push call is blocking — it returns when the hardware
- // has consumed the previous buffer and is ready for the next one.
- // This naturally paces the loop to real-time without a ticker.
- type Engine struct {
- cfg cfgpkg.Config
- driver platform.SoapyDriver
- generator *offpkg.Generator
- chunkDuration time.Duration
- deviceRate float64
-
- mu sync.Mutex
- state EngineState
- cancel context.CancelFunc
- startedAt time.Time
- wg sync.WaitGroup
-
- chunksProduced atomic.Uint64
- totalSamples atomic.Uint64
- underruns atomic.Uint64
- lastError atomic.Value // string
- }
-
- func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
- // Run entire DSP chain at device rate. RDS encoder resamples its
- // PiFmRds waveform internally. No phase upsampling needed.
- deviceRate := cfg.EffectiveDeviceRate()
- if deviceRate > 0 {
- cfg.FM.CompositeRateHz = int(deviceRate)
- }
- cfg.FM.FMModulationEnabled = true
-
- return &Engine{
- cfg: cfg,
- driver: driver,
- generator: offpkg.NewGenerator(cfg),
- chunkDuration: 50 * time.Millisecond,
- deviceRate: deviceRate,
- state: EngineIdle,
- }
- }
-
- func (e *Engine) SetChunkDuration(d time.Duration) {
- e.chunkDuration = d
- }
-
- func (e *Engine) Start(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineIdle {
- e.mu.Unlock()
- return fmt.Errorf("engine already in state %s", e.state)
- }
-
- if err := e.driver.Start(ctx); err != nil {
- e.mu.Unlock()
- return fmt.Errorf("driver start: %w", err)
- }
-
- runCtx, cancel := context.WithCancel(ctx)
- e.cancel = cancel
- e.state = EngineRunning
- e.startedAt = time.Now()
- e.wg.Add(1)
- e.mu.Unlock()
-
- go e.run(runCtx)
- return nil
- }
-
- func (e *Engine) Stop(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineRunning {
- e.mu.Unlock()
- return nil
- }
- e.state = EngineStopping
- e.cancel()
- e.mu.Unlock()
-
- // Wait for run() goroutine to exit — deterministic, no guessing
- e.wg.Wait()
-
- if err := e.driver.Flush(ctx); err != nil {
- return err
- }
- if err := e.driver.Stop(ctx); err != nil {
- return err
- }
-
- e.mu.Lock()
- e.state = EngineIdle
- e.mu.Unlock()
- return nil
- }
-
- func (e *Engine) Stats() EngineStats {
- e.mu.Lock()
- state := e.state
- startedAt := e.startedAt
- e.mu.Unlock()
-
- var uptime float64
- if state == EngineRunning {
- uptime = time.Since(startedAt).Seconds()
- }
- errVal, _ := e.lastError.Load().(string)
-
- return EngineStats{
- State: state.String(),
- ChunksProduced: e.chunksProduced.Load(),
- TotalSamples: e.totalSamples.Load(),
- Underruns: e.underruns.Load(),
- LastError: errVal,
- UptimeSeconds: uptime,
- }
- }
-
- func (e *Engine) run(ctx context.Context) {
- defer e.wg.Done()
- for {
- if ctx.Err() != nil {
- return
- }
- frame := e.generator.GenerateFrame(e.chunkDuration)
- n, err := e.driver.Write(ctx, frame)
- if err != nil {
- if ctx.Err() != nil { return }
- e.lastError.Store(err.Error())
- e.underruns.Add(1)
- // Back off to avoid pegging CPU on persistent errors
- select {
- case <-time.After(e.chunkDuration):
- case <-ctx.Done():
- return
- }
- continue
- }
- e.chunksProduced.Add(1)
- e.totalSamples.Add(uint64(n))
- }
- }
|