|
- package app
-
- import (
- "context"
- "fmt"
- "log"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/jan/fm-rds-tx/internal/audio"
- cfgpkg "github.com/jan/fm-rds-tx/internal/config"
- "github.com/jan/fm-rds-tx/internal/dsp"
- offpkg "github.com/jan/fm-rds-tx/internal/offline"
- "github.com/jan/fm-rds-tx/internal/platform"
- )
-
// EngineState identifies the lifecycle phase an Engine is currently in.
type EngineState int

// Lifecycle phases, in the order an Engine moves through them.
const (
	// EngineIdle is the zero value: the run loop is not active.
	EngineIdle EngineState = iota
	// EngineRunning means the run loop has been started.
	EngineRunning
	// EngineStopping means a stop was requested and is still in progress.
	EngineStopping
)

// String returns the lower-case name of the state, or "unknown" for any
// value outside the declared constants.
func (s EngineState) String() string {
	names := [...]string{
		EngineIdle:     "idle",
		EngineRunning:  "running",
		EngineStopping: "stopping",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
-
// updateMaxDuration raises the nanosecond value held in dst to d when d is
// larger than what is currently stored; otherwise dst is left untouched.
//
// The update is a compare-and-swap retry loop, so concurrent callers cannot
// overwrite a larger value with a smaller one.
//
// Non-positive durations are ignored. Converting a negative Duration with
// uint64(d) wraps around to an enormous value, which would otherwise be
// recorded as the maximum and never be replaced.
func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
	if d <= 0 {
		return
	}
	v := uint64(d)
	for {
		cur := dst.Load()
		if v <= cur {
			return
		}
		if dst.CompareAndSwap(cur, v) {
			return
		}
	}
}
-
// durationMs converts a nanosecond count into fractional milliseconds.
func durationMs(ns uint64) float64 {
	const nsPerMs = float64(time.Millisecond)
	return float64(ns) / nsPerMs
}
-
// EngineStats is the JSON-serialisable snapshot returned by Engine.Stats.
// Counters and maxima are cumulative; nothing in this file resets them.
type EngineStats struct {
	// State is the engine state name as produced by EngineState.String.
	State string `json:"state"`
	// ChunksProduced counts chunks whose driver write returned no error.
	ChunksProduced uint64 `json:"chunksProduced"`
	// TotalSamples is the sum of the counts returned by successful driver writes.
	TotalSamples uint64 `json:"totalSamples"`
	// Underruns counts driver writes that returned an error while the run
	// context was still live.
	Underruns uint64 `json:"underruns"`
	// LateBuffers counts cycles (generate + upsample + write) that took
	// longer than the configured chunk duration.
	LateBuffers uint64 `json:"lateBuffers,omitempty"`
	// LastError is the most recently stored error text (driver write or
	// tune failure); empty when none has been recorded.
	LastError string `json:"lastError,omitempty"`
	// UptimeSeconds is the time since Start, reported only while the engine
	// is in the running state; otherwise 0.
	UptimeSeconds float64 `json:"uptimeSeconds"`
	// MaxCycleMs is the longest observed full cycle, in milliseconds.
	MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
	// MaxGenerateMs is the longest observed frame generation, in milliseconds.
	MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
	// MaxUpsampleMs is the longest observed upsampling step, in milliseconds.
	MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
	// MaxWriteMs is the longest observed driver write, in milliseconds.
	MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
}
-
- // Engine is the continuous TX loop. It generates composite IQ in chunks,
- // resamples to device rate, and pushes to hardware in a tight loop.
- // The hardware buffer_push call is blocking — it returns when the hardware
- // has consumed the previous buffer and is ready for the next one.
- // This naturally paces the loop to real-time without a ticker.
- type Engine struct {
- cfg cfgpkg.Config
- driver platform.SoapyDriver
- generator *offpkg.Generator
- upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
- chunkDuration time.Duration
- deviceRate float64
-
- mu sync.Mutex
- state EngineState
- cancel context.CancelFunc
- startedAt time.Time
- wg sync.WaitGroup
-
- chunksProduced atomic.Uint64
- totalSamples atomic.Uint64
- underruns atomic.Uint64
- lateBuffers atomic.Uint64
- maxCycleNs atomic.Uint64
- maxGenerateNs atomic.Uint64
- maxUpsampleNs atomic.Uint64
- maxWriteNs atomic.Uint64
- lastError atomic.Value // string
-
- // Live config: pending frequency change, applied between chunks
- pendingFreq atomic.Pointer[float64]
-
- // Live audio stream (optional)
- streamSrc *audio.StreamSource
- }
-
- // SetStreamSource configures a live audio stream as the audio source.
- // Must be called before Start(). The StreamResampler is created internally
- // to convert from the stream's sample rate to the DSP composite rate.
- func (e *Engine) SetStreamSource(src *audio.StreamSource) {
- e.streamSrc = src
- compositeRate := float64(e.cfg.FM.CompositeRateHz)
- if compositeRate <= 0 {
- compositeRate = 228000
- }
- resampler := audio.NewStreamResampler(src, compositeRate)
- e.generator.SetExternalSource(resampler)
- log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
- src.SampleRate, compositeRate, src.Stats().Capacity)
- }
-
// StreamSource returns the live audio stream source previously installed with
// SetStreamSource, or nil if none was set.
// Used by the control server for stats and HTTP audio ingest.
func (e *Engine) StreamSource() *audio.StreamSource {
	return e.streamSrc
}
-
- func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
- deviceRate := cfg.EffectiveDeviceRate()
- compositeRate := float64(cfg.FM.CompositeRateHz)
- if compositeRate <= 0 {
- compositeRate = 228000
- }
-
- var upsampler *dsp.FMUpsampler
-
- if deviceRate > compositeRate*1.001 {
- // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
- // FMUpsampler handles FM modulation + interpolation to deviceRate.
- // This halves CPU load compared to running all DSP at deviceRate.
- cfg.FM.FMModulationEnabled = false
- maxDev := cfg.FM.MaxDeviationHz
- if maxDev <= 0 {
- maxDev = 75000
- }
- // mpxGain scales the FM deviation to compensate for hardware
- // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
- if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
- maxDev *= cfg.FM.MpxGain
- }
- upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
- log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
- compositeRate, deviceRate, deviceRate/compositeRate)
- } else {
- // Same-rate: entire DSP chain runs at deviceRate.
- // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
- if deviceRate > 0 {
- cfg.FM.CompositeRateHz = int(deviceRate)
- }
- cfg.FM.FMModulationEnabled = true
- log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
- }
-
- return &Engine{
- cfg: cfg,
- driver: driver,
- generator: offpkg.NewGenerator(cfg),
- upsampler: upsampler,
- chunkDuration: 50 * time.Millisecond,
- deviceRate: deviceRate,
- state: EngineIdle,
- }
- }
-
// SetChunkDuration sets the length of audio generated per loop iteration,
// which is also the per-cycle time budget used for late-buffer detection and
// the back-off delay after a failed write.
//
// The field is written without holding e.mu and the run loop reads it without
// synchronisation, so this is presumably meant to be called before Start.
// NOTE(review): d is not validated; confirm callers never pass a
// non-positive value.
func (e *Engine) SetChunkDuration(d time.Duration) {
	e.chunkDuration = d
}
-
// LiveConfigUpdate is the set of parameters that Engine.UpdateConfig can
// change while the engine keeps running. Every field is a pointer: nil means
// "leave unchanged", non-nil carries the new value. Numeric values are
// range-checked by UpdateConfig before anything is applied.
type LiveConfigUpdate struct {
	// Carrier frequency in MHz and output drive level.
	FrequencyMHz, OutputDrive *float64
	// Stereo encoding on/off.
	StereoEnabled *bool
	// Pilot tone level and RDS injection level.
	PilotLevel, RDSInjection *float64
	// RDS subcarrier on/off and limiter on/off.
	RDSEnabled, LimiterEnabled *bool
	// Limiter ceiling.
	LimiterCeiling *float64
	// RDS programme service name and radio text.
	PS, RadioText *string
}
-
- // UpdateConfig applies live parameter changes without restarting the engine.
- // DSP params take effect at the next chunk boundary (~50ms max).
- // Frequency changes are applied between chunks via driver.Tune().
- // RDS text updates are applied at the next RDS group boundary (~88ms).
- func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
- // --- Validate ---
- if u.FrequencyMHz != nil {
- if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
- return fmt.Errorf("frequencyMHz out of range (65-110)")
- }
- }
- if u.OutputDrive != nil {
- if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
- return fmt.Errorf("outputDrive out of range (0-10)")
- }
- }
- if u.PilotLevel != nil {
- if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
- return fmt.Errorf("pilotLevel out of range (0-0.2)")
- }
- }
- if u.RDSInjection != nil {
- if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
- return fmt.Errorf("rdsInjection out of range (0-0.15)")
- }
- }
- if u.LimiterCeiling != nil {
- if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
- return fmt.Errorf("limiterCeiling out of range (0-2)")
- }
- }
-
- // --- Frequency: store for run loop to apply via driver.Tune() ---
- if u.FrequencyMHz != nil {
- freqHz := *u.FrequencyMHz * 1e6
- e.pendingFreq.Store(&freqHz)
- }
-
- // --- RDS text: forward to encoder atomics ---
- if u.PS != nil || u.RadioText != nil {
- if enc := e.generator.RDSEncoder(); enc != nil {
- ps, rt := "", ""
- if u.PS != nil {
- ps = *u.PS
- }
- if u.RadioText != nil {
- rt = *u.RadioText
- }
- enc.UpdateText(ps, rt)
- }
- }
-
- // --- DSP params: build new LiveParams from current + patch ---
- // Read current, apply deltas, store new
- current := e.generator.CurrentLiveParams()
- next := current // copy
-
- if u.OutputDrive != nil {
- next.OutputDrive = *u.OutputDrive
- }
- if u.StereoEnabled != nil {
- next.StereoEnabled = *u.StereoEnabled
- }
- if u.PilotLevel != nil {
- next.PilotLevel = *u.PilotLevel
- }
- if u.RDSInjection != nil {
- next.RDSInjection = *u.RDSInjection
- }
- if u.RDSEnabled != nil {
- next.RDSEnabled = *u.RDSEnabled
- }
- if u.LimiterEnabled != nil {
- next.LimiterEnabled = *u.LimiterEnabled
- }
- if u.LimiterCeiling != nil {
- next.LimiterCeiling = *u.LimiterCeiling
- }
-
- e.generator.UpdateLive(next)
- return nil
- }
-
- func (e *Engine) Start(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineIdle {
- e.mu.Unlock()
- return fmt.Errorf("engine already in state %s", e.state)
- }
-
- if err := e.driver.Start(ctx); err != nil {
- e.mu.Unlock()
- return fmt.Errorf("driver start: %w", err)
- }
-
- runCtx, cancel := context.WithCancel(ctx)
- e.cancel = cancel
- e.state = EngineRunning
- e.startedAt = time.Now()
- e.wg.Add(1)
- e.mu.Unlock()
-
- go e.run(runCtx)
- return nil
- }
-
- func (e *Engine) Stop(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineRunning {
- e.mu.Unlock()
- return nil
- }
- e.state = EngineStopping
- e.cancel()
- e.mu.Unlock()
-
- // Wait for run() goroutine to exit — deterministic, no guessing
- e.wg.Wait()
-
- if err := e.driver.Flush(ctx); err != nil {
- return err
- }
- if err := e.driver.Stop(ctx); err != nil {
- return err
- }
-
- e.mu.Lock()
- e.state = EngineIdle
- e.mu.Unlock()
- return nil
- }
-
- func (e *Engine) Stats() EngineStats {
- e.mu.Lock()
- state := e.state
- startedAt := e.startedAt
- e.mu.Unlock()
-
- var uptime float64
- if state == EngineRunning {
- uptime = time.Since(startedAt).Seconds()
- }
- errVal, _ := e.lastError.Load().(string)
-
- return EngineStats{
- State: state.String(),
- ChunksProduced: e.chunksProduced.Load(),
- TotalSamples: e.totalSamples.Load(),
- Underruns: e.underruns.Load(),
- LateBuffers: e.lateBuffers.Load(),
- LastError: errVal,
- UptimeSeconds: uptime,
- MaxCycleMs: durationMs(e.maxCycleNs.Load()),
- MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
- MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
- MaxWriteMs: durationMs(e.maxWriteNs.Load()),
- }
- }
-
- func (e *Engine) run(ctx context.Context) {
- defer e.wg.Done()
- for {
- if ctx.Err() != nil {
- return
- }
-
- // Apply pending frequency change between chunks
- if pf := e.pendingFreq.Swap(nil); pf != nil {
- if err := e.driver.Tune(ctx, *pf); err != nil {
- e.lastError.Store(fmt.Sprintf("tune: %v", err))
- } else {
- log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
- }
- }
-
- t0 := time.Now()
- frame := e.generator.GenerateFrame(e.chunkDuration)
- t1 := time.Now()
- if e.upsampler != nil {
- frame = e.upsampler.Process(frame)
- }
- t2 := time.Now()
- n, err := e.driver.Write(ctx, frame)
- t3 := time.Now()
-
- genDur := t1.Sub(t0)
- upDur := t2.Sub(t1)
- writeDur := t3.Sub(t2)
- cycleDur := t3.Sub(t0)
-
- updateMaxDuration(&e.maxGenerateNs, genDur)
- updateMaxDuration(&e.maxUpsampleNs, upDur)
- updateMaxDuration(&e.maxWriteNs, writeDur)
- updateMaxDuration(&e.maxCycleNs, cycleDur)
-
- if cycleDur > e.chunkDuration {
- late := e.lateBuffers.Add(1)
- if late <= 5 || late%20 == 0 {
- log.Printf("TX LATE: cycle=%s budget=%s gen=%s up=%s write=%s over=%s",
- cycleDur, e.chunkDuration, genDur, upDur, writeDur, cycleDur-e.chunkDuration)
- }
- }
-
- if err != nil {
- if ctx.Err() != nil {
- return
- }
- e.lastError.Store(err.Error())
- e.underruns.Add(1)
- // Back off to avoid pegging CPU on persistent errors
- select {
- case <-time.After(e.chunkDuration):
- case <-ctx.Done():
- return
- }
- continue
- }
- e.chunksProduced.Add(1)
- e.totalSamples.Add(uint64(n))
- }
- }
|