package app import ( "context" "errors" "fmt" "log" "sync" "sync/atomic" "time" "github.com/jan/fm-rds-tx/internal/audio" cfgpkg "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/dsp" offpkg "github.com/jan/fm-rds-tx/internal/offline" "github.com/jan/fm-rds-tx/internal/output" "github.com/jan/fm-rds-tx/internal/platform" ) type EngineState int const ( EngineIdle EngineState = iota EngineRunning EngineStopping ) func (s EngineState) String() string { switch s { case EngineIdle: return "idle" case EngineRunning: return "running" case EngineStopping: return "stopping" default: return "unknown" } } func updateMaxDuration(dst *atomic.Uint64, d time.Duration) { v := uint64(d) for { cur := dst.Load() if v <= cur { return } if dst.CompareAndSwap(cur, v) { return } } } func durationMs(ns uint64) float64 { return float64(ns) / float64(time.Millisecond) } type EngineStats struct { State string `json:"state"` ChunksProduced uint64 `json:"chunksProduced"` TotalSamples uint64 `json:"totalSamples"` Underruns uint64 `json:"underruns"` LateBuffers uint64 `json:"lateBuffers,omitempty"` LastError string `json:"lastError,omitempty"` UptimeSeconds float64 `json:"uptimeSeconds"` MaxCycleMs float64 `json:"maxCycleMs,omitempty"` MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"` MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"` MaxWriteMs float64 `json:"maxWriteMs,omitempty"` Queue output.QueueStats `json:"queue"` RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"` RuntimeAlert string `json:"runtimeAlert,omitempty"` } type RuntimeIndicator string const ( RuntimeIndicatorNormal RuntimeIndicator = "normal" RuntimeIndicatorDegraded RuntimeIndicator = "degraded" RuntimeIndicatorQueueCritical RuntimeIndicator = "queueCritical" ) // Engine is the continuous TX loop. It generates composite IQ in chunks, // resamples to device rate, and pushes to hardware in a tight loop. // The hardware buffer_push call is blocking — it returns when the hardware // has consumed the previous buffer and is ready for the next one. // This naturally paces the loop to real-time without a ticker. type Engine struct { cfg cfgpkg.Config driver platform.SoapyDriver generator *offpkg.Generator upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate chunkDuration time.Duration deviceRate float64 frameQueue *output.FrameQueue mu sync.Mutex state EngineState cancel context.CancelFunc startedAt time.Time wg sync.WaitGroup chunksProduced atomic.Uint64 totalSamples atomic.Uint64 underruns atomic.Uint64 lateBuffers atomic.Uint64 maxCycleNs atomic.Uint64 maxGenerateNs atomic.Uint64 maxUpsampleNs atomic.Uint64 maxWriteNs atomic.Uint64 lastError atomic.Value // string // Live config: pending frequency change, applied between chunks pendingFreq atomic.Pointer[float64] // Live audio stream (optional) streamSrc *audio.StreamSource } // SetStreamSource configures a live audio stream as the audio source. // Must be called before Start(). The StreamResampler is created internally // to convert from the stream's sample rate to the DSP composite rate. func (e *Engine) SetStreamSource(src *audio.StreamSource) { e.streamSrc = src compositeRate := float64(e.cfg.FM.CompositeRateHz) if compositeRate <= 0 { compositeRate = 228000 } resampler := audio.NewStreamResampler(src, compositeRate) e.generator.SetExternalSource(resampler) log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)", src.SampleRate, compositeRate, src.Stats().Capacity) } // StreamSource returns the live audio stream source, or nil. // Used by the control server for stats and HTTP audio ingest. func (e *Engine) StreamSource() *audio.StreamSource { return e.streamSrc } func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine { deviceRate := cfg.EffectiveDeviceRate() compositeRate := float64(cfg.FM.CompositeRateHz) if compositeRate <= 0 { compositeRate = 228000 } var upsampler *dsp.FMUpsampler if deviceRate > compositeRate*1.001 { // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz), // FMUpsampler handles FM modulation + interpolation to deviceRate. // This halves CPU load compared to running all DSP at deviceRate. cfg.FM.FMModulationEnabled = false maxDev := cfg.FM.MaxDeviationHz if maxDev <= 0 { maxDev = 75000 } // mpxGain scales the FM deviation to compensate for hardware // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels. if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 { maxDev *= cfg.FM.MpxGain } upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev) log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)", compositeRate, deviceRate, deviceRate/compositeRate) } else { // Same-rate: entire DSP chain runs at deviceRate. // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz). if deviceRate > 0 { cfg.FM.CompositeRateHz = int(deviceRate) } cfg.FM.FMModulationEnabled = true log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz) } return &Engine{ cfg: cfg, driver: driver, generator: offpkg.NewGenerator(cfg), upsampler: upsampler, chunkDuration: 50 * time.Millisecond, deviceRate: deviceRate, state: EngineIdle, frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity), } } func (e *Engine) SetChunkDuration(d time.Duration) { e.chunkDuration = d } // LiveConfigUpdate carries hot-reloadable parameters from the control API. // nil pointers mean "no change". Validated before applying. type LiveConfigUpdate struct { FrequencyMHz *float64 OutputDrive *float64 StereoEnabled *bool PilotLevel *float64 RDSInjection *float64 RDSEnabled *bool LimiterEnabled *bool LimiterCeiling *float64 PS *string RadioText *string } // UpdateConfig applies live parameter changes without restarting the engine. // DSP params take effect at the next chunk boundary (~50ms max). // Frequency changes are applied between chunks via driver.Tune(). // RDS text updates are applied at the next RDS group boundary (~88ms). func (e *Engine) UpdateConfig(u LiveConfigUpdate) error { // --- Validate --- if u.FrequencyMHz != nil { if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 { return fmt.Errorf("frequencyMHz out of range (65-110)") } } if u.OutputDrive != nil { if *u.OutputDrive < 0 || *u.OutputDrive > 10 { return fmt.Errorf("outputDrive out of range (0-10)") } } if u.PilotLevel != nil { if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 { return fmt.Errorf("pilotLevel out of range (0-0.2)") } } if u.RDSInjection != nil { if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 { return fmt.Errorf("rdsInjection out of range (0-0.15)") } } if u.LimiterCeiling != nil { if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 { return fmt.Errorf("limiterCeiling out of range (0-2)") } } // --- Frequency: store for run loop to apply via driver.Tune() --- if u.FrequencyMHz != nil { freqHz := *u.FrequencyMHz * 1e6 e.pendingFreq.Store(&freqHz) } // --- RDS text: forward to encoder atomics --- if u.PS != nil || u.RadioText != nil { if enc := e.generator.RDSEncoder(); enc != nil { ps, rt := "", "" if u.PS != nil { ps = *u.PS } if u.RadioText != nil { rt = *u.RadioText } enc.UpdateText(ps, rt) } } // --- DSP params: build new LiveParams from current + patch --- // Read current, apply deltas, store new current := e.generator.CurrentLiveParams() next := current // copy if u.OutputDrive != nil { next.OutputDrive = *u.OutputDrive } if u.StereoEnabled != nil { next.StereoEnabled = *u.StereoEnabled } if u.PilotLevel != nil { next.PilotLevel = *u.PilotLevel } if u.RDSInjection != nil { next.RDSInjection = *u.RDSInjection } if u.RDSEnabled != nil { next.RDSEnabled = *u.RDSEnabled } if u.LimiterEnabled != nil { next.LimiterEnabled = *u.LimiterEnabled } if u.LimiterCeiling != nil { next.LimiterCeiling = *u.LimiterCeiling } e.generator.UpdateLive(next) return nil } func (e *Engine) Start(ctx context.Context) error { e.mu.Lock() if e.state != EngineIdle { e.mu.Unlock() return fmt.Errorf("engine already in state %s", e.state) } if err := e.driver.Start(ctx); err != nil { e.mu.Unlock() return fmt.Errorf("driver start: %w", err) } runCtx, cancel := context.WithCancel(ctx) e.cancel = cancel e.state = EngineRunning e.startedAt = time.Now() e.wg.Add(1) e.mu.Unlock() go e.run(runCtx) return nil } func (e *Engine) Stop(ctx context.Context) error { e.mu.Lock() if e.state != EngineRunning { e.mu.Unlock() return nil } e.state = EngineStopping e.cancel() e.mu.Unlock() // Wait for run() goroutine to exit — deterministic, no guessing e.wg.Wait() if err := e.driver.Flush(ctx); err != nil { return err } if err := e.driver.Stop(ctx); err != nil { return err } e.mu.Lock() e.state = EngineIdle e.mu.Unlock() return nil } func (e *Engine) Stats() EngineStats { e.mu.Lock() state := e.state startedAt := e.startedAt e.mu.Unlock() var uptime float64 if state == EngineRunning { uptime = time.Since(startedAt).Seconds() } errVal, _ := e.lastError.Load().(string) queue := e.frameQueue.Stats() lateBuffers := e.lateBuffers.Load() ri := runtimeIndicator(queue.Health, lateBuffers) return EngineStats{ State: state.String(), ChunksProduced: e.chunksProduced.Load(), TotalSamples: e.totalSamples.Load(), Underruns: e.underruns.Load(), LateBuffers: lateBuffers, LastError: errVal, UptimeSeconds: uptime, MaxCycleMs: durationMs(e.maxCycleNs.Load()), MaxGenerateMs: durationMs(e.maxGenerateNs.Load()), MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()), MaxWriteMs: durationMs(e.maxWriteNs.Load()), Queue: queue, RuntimeIndicator: ri, RuntimeAlert: runtimeAlert(queue.Health, lateBuffers), } } func runtimeIndicator(queueHealth output.QueueHealth, lateBuffers uint64) RuntimeIndicator { switch { case queueHealth == output.QueueHealthCritical: return RuntimeIndicatorQueueCritical case queueHealth == output.QueueHealthLow || lateBuffers > 0: return RuntimeIndicatorDegraded default: return RuntimeIndicatorNormal } } func runtimeAlert(queueHealth output.QueueHealth, lateBuffers uint64) string { switch { case queueHealth == output.QueueHealthCritical: return "queue health critical" case lateBuffers > 0: return "late buffers" case queueHealth == output.QueueHealthLow: return "queue health low" default: return "" } } func (e *Engine) run(ctx context.Context) { e.wg.Add(1) go e.writerLoop(ctx) defer e.wg.Done() for { if ctx.Err() != nil { return } // Apply pending frequency change between chunks if pf := e.pendingFreq.Swap(nil); pf != nil { if err := e.driver.Tune(ctx, *pf); err != nil { e.lastError.Store(fmt.Sprintf("tune: %v", err)) } else { log.Printf("engine: tuned to %.3f MHz", *pf/1e6) } } t0 := time.Now() frame := e.generator.GenerateFrame(e.chunkDuration) frame.GeneratedAt = t0 t1 := time.Now() if e.upsampler != nil { frame = e.upsampler.Process(frame) frame.GeneratedAt = t0 } t2 := time.Now() genDur := t1.Sub(t0) upDur := t2.Sub(t1) updateMaxDuration(&e.maxGenerateNs, genDur) updateMaxDuration(&e.maxUpsampleNs, upDur) enqueued := cloneFrame(frame) if enqueued == nil { e.lastError.Store("engine: frame clone failed") e.underruns.Add(1) continue } if err := e.frameQueue.Push(ctx, enqueued); err != nil { if ctx.Err() != nil { return } if errors.Is(err, output.ErrFrameQueueClosed) { return } e.lastError.Store(err.Error()) e.underruns.Add(1) select { case <-time.After(e.chunkDuration): case <-ctx.Done(): return } continue } } } func (e *Engine) writerLoop(ctx context.Context) { defer e.wg.Done() for { frame, err := e.frameQueue.Pop(ctx) if err != nil { if ctx.Err() != nil { return } if errors.Is(err, output.ErrFrameQueueClosed) { return } e.lastError.Store(err.Error()) e.underruns.Add(1) continue } writeStart := time.Now() n, err := e.driver.Write(ctx, frame) writeDur := time.Since(writeStart) cycleDur := writeDur if !frame.GeneratedAt.IsZero() { cycleDur = time.Since(frame.GeneratedAt) } updateMaxDuration(&e.maxWriteNs, writeDur) updateMaxDuration(&e.maxCycleNs, cycleDur) if cycleDur > e.chunkDuration { late := e.lateBuffers.Add(1) if late <= 5 || late%20 == 0 { log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s", cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration) } } if err != nil { if ctx.Err() != nil { return } e.lastError.Store(err.Error()) e.underruns.Add(1) select { case <-time.After(e.chunkDuration): case <-ctx.Done(): return } continue } e.chunksProduced.Add(1) e.totalSamples.Add(uint64(n)) } } func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame { if src == nil { return nil } samples := make([]output.IQSample, len(src.Samples)) copy(samples, src.Samples) return &output.CompositeFrame{ Samples: samples, SampleRateHz: src.SampleRateHz, Timestamp: src.Timestamp, GeneratedAt: src.GeneratedAt, Sequence: src.Sequence, } }