|
- package app
-
- import (
- "context"
- "errors"
- "fmt"
- "log"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/jan/fm-rds-tx/internal/audio"
- cfgpkg "github.com/jan/fm-rds-tx/internal/config"
- "github.com/jan/fm-rds-tx/internal/dsp"
- offpkg "github.com/jan/fm-rds-tx/internal/offline"
- "github.com/jan/fm-rds-tx/internal/output"
- "github.com/jan/fm-rds-tx/internal/platform"
- )
-
// EngineState enumerates the coarse lifecycle phases of an Engine.
type EngineState int

// Lifecycle phases, numbered from zero in declaration order.
const (
	EngineIdle     EngineState = iota // no run loop active
	EngineRunning                     // Start succeeded, run loop launched
	EngineStopping                    // Stop requested, shutdown in progress
)

// String returns the lower-case label for s, or "unknown" for any value
// outside the declared constants.
func (s EngineState) String() string {
	labels := [...]string{
		EngineIdle:     "idle",
		EngineRunning:  "running",
		EngineStopping: "stopping",
	}
	if s < 0 || int(s) >= len(labels) {
		return "unknown"
	}
	return labels[s]
}
-
// RuntimeState is the fine-grained runtime label an Engine stores via
// setRuntimeState and reports as EngineStats.State.
type RuntimeState string

// Known runtime states. The string values are what appears in the stats JSON.
const (
	RuntimeStateIdle         = RuntimeState("idle")
	RuntimeStateArming       = RuntimeState("arming")
	RuntimeStatePrebuffering = RuntimeState("prebuffering")
	RuntimeStateRunning      = RuntimeState("running")
	RuntimeStateDegraded     = RuntimeState("degraded")
	RuntimeStateMuted        = RuntimeState("muted")
	RuntimeStateFaulted      = RuntimeState("faulted")
	RuntimeStateStopping     = RuntimeState("stopping")
)
-
// updateMaxDuration raises the nanosecond value held in dst to d when d is
// larger than what is currently stored. It retries with compare-and-swap so
// that concurrent callers can only ever increase the stored maximum.
//
// Non-positive durations are ignored: converting a negative Duration to
// uint64 would wrap to an enormous value and pin the maximum permanently.
func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
	if d <= 0 {
		return
	}
	sample := uint64(d)
	for {
		cur := dst.Load()
		// Either nothing to do, or we won the race to publish the new max.
		if sample <= cur || dst.CompareAndSwap(cur, sample) {
			return
		}
	}
}
-
// durationMs converts a nanosecond count into fractional milliseconds.
func durationMs(ns uint64) float64 {
	const nsPerMs = float64(time.Millisecond)
	return float64(ns) / nsPerMs
}
-
- type EngineStats struct {
- State string `json:"state"`
- ChunksProduced uint64 `json:"chunksProduced"`
- TotalSamples uint64 `json:"totalSamples"`
- Underruns uint64 `json:"underruns"`
- LateBuffers uint64 `json:"lateBuffers,omitempty"`
- LastError string `json:"lastError,omitempty"`
- UptimeSeconds float64 `json:"uptimeSeconds"`
- MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
- MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
- MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
- MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
- Queue output.QueueStats `json:"queue"`
- RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"`
- RuntimeAlert string `json:"runtimeAlert,omitempty"`
- }
-
// RuntimeIndicator is the coarse health label reported in EngineStats.
type RuntimeIndicator string

// Indicator values, in increasing order of severity.
const (
	RuntimeIndicatorNormal        = RuntimeIndicator("normal")
	RuntimeIndicatorDegraded      = RuntimeIndicator("degraded")
	RuntimeIndicatorQueueCritical = RuntimeIndicator("queueCritical")
)

// lateBufferIndicatorWindow is how long after the most recent late buffer
// Stats keeps treating late buffers as "recent".
const lateBufferIndicatorWindow = 5 * time.Second
-
- // Engine is the continuous TX loop. It generates composite IQ in chunks,
- // resamples to device rate, and pushes to hardware in a tight loop.
- // The hardware buffer_push call is blocking — it returns when the hardware
- // has consumed the previous buffer and is ready for the next one.
- // This naturally paces the loop to real-time without a ticker.
- type Engine struct {
- cfg cfgpkg.Config
- driver platform.SoapyDriver
- generator *offpkg.Generator
- upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
- chunkDuration time.Duration
- deviceRate float64
- frameQueue *output.FrameQueue
-
- mu sync.Mutex
- state EngineState
- cancel context.CancelFunc
- startedAt time.Time
- wg sync.WaitGroup
- runtimeState atomic.Value
-
- chunksProduced atomic.Uint64
- totalSamples atomic.Uint64
- underruns atomic.Uint64
- lateBuffers atomic.Uint64
- lateBufferAlertAt atomic.Uint64
- maxCycleNs atomic.Uint64
- maxGenerateNs atomic.Uint64
- maxUpsampleNs atomic.Uint64
- maxWriteNs atomic.Uint64
- lastError atomic.Value // string
-
- // Live config: pending frequency change, applied between chunks
- pendingFreq atomic.Pointer[float64]
-
- // Live audio stream (optional)
- streamSrc *audio.StreamSource
- }
-
- // SetStreamSource configures a live audio stream as the audio source.
- // Must be called before Start(). The StreamResampler is created internally
- // to convert from the stream's sample rate to the DSP composite rate.
- func (e *Engine) SetStreamSource(src *audio.StreamSource) {
- e.streamSrc = src
- compositeRate := float64(e.cfg.FM.CompositeRateHz)
- if compositeRate <= 0 {
- compositeRate = 228000
- }
- resampler := audio.NewStreamResampler(src, compositeRate)
- e.generator.SetExternalSource(resampler)
- log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
- src.SampleRate, compositeRate, src.Stats().Capacity)
- }
-
- // StreamSource returns the live audio stream source, or nil.
- // Used by the control server for stats and HTTP audio ingest.
- func (e *Engine) StreamSource() *audio.StreamSource {
- return e.streamSrc
- }
-
- func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
- deviceRate := cfg.EffectiveDeviceRate()
- compositeRate := float64(cfg.FM.CompositeRateHz)
- if compositeRate <= 0 {
- compositeRate = 228000
- }
-
- var upsampler *dsp.FMUpsampler
-
- if deviceRate > compositeRate*1.001 {
- // Split-rate: DSP chain runs at compositeRate (typ. 228 kHz),
- // FMUpsampler handles FM modulation + interpolation to deviceRate.
- // This halves CPU load compared to running all DSP at deviceRate.
- cfg.FM.FMModulationEnabled = false
- maxDev := cfg.FM.MaxDeviationHz
- if maxDev <= 0 {
- maxDev = 75000
- }
- // mpxGain scales the FM deviation to compensate for hardware
- // DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
- if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
- maxDev *= cfg.FM.MpxGain
- }
- upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
- log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
- compositeRate, deviceRate, deviceRate/compositeRate)
- } else {
- // Same-rate: entire DSP chain runs at deviceRate.
- // Used when deviceRate ≈ compositeRate (e.g. LimeSDR at 228 kHz).
- if deviceRate > 0 {
- cfg.FM.CompositeRateHz = int(deviceRate)
- }
- cfg.FM.FMModulationEnabled = true
- log.Printf("engine: same-rate mode — DSP@%dHz", cfg.FM.CompositeRateHz)
- }
-
- engine := &Engine{
- cfg: cfg,
- driver: driver,
- generator: offpkg.NewGenerator(cfg),
- upsampler: upsampler,
- chunkDuration: 50 * time.Millisecond,
- deviceRate: deviceRate,
- state: EngineIdle,
- frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
- }
- engine.setRuntimeState(RuntimeStateIdle)
- return engine
- }
-
- func (e *Engine) SetChunkDuration(d time.Duration) {
- e.chunkDuration = d
- }
-
// LiveConfigUpdate is the set of parameters that Engine.UpdateConfig can
// change while the engine is running. Every field is optional: a nil pointer
// leaves the corresponding setting untouched.
type LiveConfigUpdate struct {
	// Carrier frequency in MHz; UpdateConfig accepts 65-110.
	FrequencyMHz *float64

	// DSP parameters. UpdateConfig accepts OutputDrive 0-10, PilotLevel
	// 0-0.2, RDSInjection 0-0.15 and LimiterCeiling 0-2.
	OutputDrive    *float64
	StereoEnabled  *bool
	PilotLevel     *float64
	RDSInjection   *float64
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64

	// RDS text forwarded to the RDS encoder.
	PS        *string
	RadioText *string
}
-
- // UpdateConfig applies live parameter changes without restarting the engine.
- // DSP params take effect at the next chunk boundary (~50ms max).
- // Frequency changes are applied between chunks via driver.Tune().
- // RDS text updates are applied at the next RDS group boundary (~88ms).
- func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
- // --- Validate ---
- if u.FrequencyMHz != nil {
- if *u.FrequencyMHz < 65 || *u.FrequencyMHz > 110 {
- return fmt.Errorf("frequencyMHz out of range (65-110)")
- }
- }
- if u.OutputDrive != nil {
- if *u.OutputDrive < 0 || *u.OutputDrive > 10 {
- return fmt.Errorf("outputDrive out of range (0-10)")
- }
- }
- if u.PilotLevel != nil {
- if *u.PilotLevel < 0 || *u.PilotLevel > 0.2 {
- return fmt.Errorf("pilotLevel out of range (0-0.2)")
- }
- }
- if u.RDSInjection != nil {
- if *u.RDSInjection < 0 || *u.RDSInjection > 0.15 {
- return fmt.Errorf("rdsInjection out of range (0-0.15)")
- }
- }
- if u.LimiterCeiling != nil {
- if *u.LimiterCeiling < 0 || *u.LimiterCeiling > 2 {
- return fmt.Errorf("limiterCeiling out of range (0-2)")
- }
- }
-
- // --- Frequency: store for run loop to apply via driver.Tune() ---
- if u.FrequencyMHz != nil {
- freqHz := *u.FrequencyMHz * 1e6
- e.pendingFreq.Store(&freqHz)
- }
-
- // --- RDS text: forward to encoder atomics ---
- if u.PS != nil || u.RadioText != nil {
- if enc := e.generator.RDSEncoder(); enc != nil {
- ps, rt := "", ""
- if u.PS != nil {
- ps = *u.PS
- }
- if u.RadioText != nil {
- rt = *u.RadioText
- }
- enc.UpdateText(ps, rt)
- }
- }
-
- // --- DSP params: build new LiveParams from current + patch ---
- // Read current, apply deltas, store new
- current := e.generator.CurrentLiveParams()
- next := current // copy
-
- if u.OutputDrive != nil {
- next.OutputDrive = *u.OutputDrive
- }
- if u.StereoEnabled != nil {
- next.StereoEnabled = *u.StereoEnabled
- }
- if u.PilotLevel != nil {
- next.PilotLevel = *u.PilotLevel
- }
- if u.RDSInjection != nil {
- next.RDSInjection = *u.RDSInjection
- }
- if u.RDSEnabled != nil {
- next.RDSEnabled = *u.RDSEnabled
- }
- if u.LimiterEnabled != nil {
- next.LimiterEnabled = *u.LimiterEnabled
- }
- if u.LimiterCeiling != nil {
- next.LimiterCeiling = *u.LimiterCeiling
- }
-
- e.generator.UpdateLive(next)
- return nil
- }
-
- func (e *Engine) Start(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineIdle {
- e.mu.Unlock()
- return fmt.Errorf("engine already in state %s", e.state)
- }
-
- if err := e.driver.Start(ctx); err != nil {
- e.mu.Unlock()
- return fmt.Errorf("driver start: %w", err)
- }
-
- runCtx, cancel := context.WithCancel(ctx)
- e.cancel = cancel
- e.state = EngineRunning
- e.setRuntimeState(RuntimeStateArming)
- e.startedAt = time.Now()
- e.wg.Add(1)
- e.mu.Unlock()
-
- go e.run(runCtx)
- return nil
- }
-
- func (e *Engine) Stop(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineRunning {
- e.mu.Unlock()
- return nil
- }
- e.state = EngineStopping
- e.setRuntimeState(RuntimeStateStopping)
- e.cancel()
- e.mu.Unlock()
-
- // Wait for run() goroutine to exit — deterministic, no guessing
- e.wg.Wait()
-
- if err := e.driver.Flush(ctx); err != nil {
- return err
- }
- if err := e.driver.Stop(ctx); err != nil {
- return err
- }
-
- e.mu.Lock()
- e.state = EngineIdle
- e.setRuntimeState(RuntimeStateIdle)
- e.mu.Unlock()
- return nil
- }
-
- func (e *Engine) Stats() EngineStats {
- e.mu.Lock()
- state := e.state
- startedAt := e.startedAt
- e.mu.Unlock()
-
- var uptime float64
- if state == EngineRunning {
- uptime = time.Since(startedAt).Seconds()
- }
- errVal, _ := e.lastError.Load().(string)
-
- queue := e.frameQueue.Stats()
- lateBuffers := e.lateBuffers.Load()
- now := time.Now()
- lateAlertAt := e.lateBufferAlertAt.Load()
- hasRecentLateBuffers := lateAlertAt > 0 && now.Sub(time.Unix(0, int64(lateAlertAt))) <= lateBufferIndicatorWindow
- ri := runtimeIndicator(queue.Health, hasRecentLateBuffers)
- return EngineStats{
- State: string(e.currentRuntimeState()),
- ChunksProduced: e.chunksProduced.Load(),
- TotalSamples: e.totalSamples.Load(),
- Underruns: e.underruns.Load(),
- LateBuffers: lateBuffers,
- LastError: errVal,
- UptimeSeconds: uptime,
- MaxCycleMs: durationMs(e.maxCycleNs.Load()),
- MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
- MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
- MaxWriteMs: durationMs(e.maxWriteNs.Load()),
- Queue: queue,
- RuntimeIndicator: ri,
- RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers),
- }
- }
-
- func runtimeIndicator(queueHealth output.QueueHealth, recentLateBuffers bool) RuntimeIndicator {
- switch {
- case queueHealth == output.QueueHealthCritical:
- return RuntimeIndicatorQueueCritical
- case queueHealth == output.QueueHealthLow || recentLateBuffers:
- return RuntimeIndicatorDegraded
- default:
- return RuntimeIndicatorNormal
- }
- }
-
- func runtimeAlert(queueHealth output.QueueHealth, recentLateBuffers bool) string {
- switch {
- case queueHealth == output.QueueHealthCritical:
- return "queue health critical"
- case recentLateBuffers:
- return "late buffers"
- case queueHealth == output.QueueHealthLow:
- return "queue health low"
- default:
- return ""
- }
- }
-
- func (e *Engine) run(ctx context.Context) {
- e.setRuntimeState(RuntimeStateRunning)
- e.wg.Add(1)
- go e.writerLoop(ctx)
- defer e.wg.Done()
-
- for {
- if ctx.Err() != nil {
- return
- }
-
- // Apply pending frequency change between chunks
- if pf := e.pendingFreq.Swap(nil); pf != nil {
- if err := e.driver.Tune(ctx, *pf); err != nil {
- e.lastError.Store(fmt.Sprintf("tune: %v", err))
- } else {
- log.Printf("engine: tuned to %.3f MHz", *pf/1e6)
- }
- }
-
- t0 := time.Now()
- frame := e.generator.GenerateFrame(e.chunkDuration)
- frame.GeneratedAt = t0
- t1 := time.Now()
- if e.upsampler != nil {
- frame = e.upsampler.Process(frame)
- frame.GeneratedAt = t0
- }
- t2 := time.Now()
-
- genDur := t1.Sub(t0)
- upDur := t2.Sub(t1)
- updateMaxDuration(&e.maxGenerateNs, genDur)
- updateMaxDuration(&e.maxUpsampleNs, upDur)
-
- enqueued := cloneFrame(frame)
- if enqueued == nil {
- e.lastError.Store("engine: frame clone failed")
- e.underruns.Add(1)
- continue
- }
-
- if err := e.frameQueue.Push(ctx, enqueued); err != nil {
- if ctx.Err() != nil {
- return
- }
- if errors.Is(err, output.ErrFrameQueueClosed) {
- return
- }
- e.lastError.Store(err.Error())
- e.underruns.Add(1)
- select {
- case <-time.After(e.chunkDuration):
- case <-ctx.Done():
- return
- }
- continue
- }
- }
- }
-
- func (e *Engine) writerLoop(ctx context.Context) {
- defer e.wg.Done()
- for {
- frame, err := e.frameQueue.Pop(ctx)
- if err != nil {
- if ctx.Err() != nil {
- return
- }
- if errors.Is(err, output.ErrFrameQueueClosed) {
- return
- }
- e.lastError.Store(err.Error())
- e.underruns.Add(1)
- continue
- }
-
- writeStart := time.Now()
- n, err := e.driver.Write(ctx, frame)
- writeDur := time.Since(writeStart)
-
- cycleDur := writeDur
- if !frame.GeneratedAt.IsZero() {
- cycleDur = time.Since(frame.GeneratedAt)
- }
-
- updateMaxDuration(&e.maxWriteNs, writeDur)
- updateMaxDuration(&e.maxCycleNs, cycleDur)
-
- if cycleDur > e.chunkDuration {
- late := e.lateBuffers.Add(1)
- e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
- if late <= 5 || late%20 == 0 {
- log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s",
- cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration)
- }
- }
-
- if err != nil {
- if ctx.Err() != nil {
- return
- }
- e.lastError.Store(err.Error())
- e.underruns.Add(1)
- select {
- case <-time.After(e.chunkDuration):
- case <-ctx.Done():
- return
- }
- continue
- }
-
- e.chunksProduced.Add(1)
- e.totalSamples.Add(uint64(n))
- }
- }
-
- func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame {
- if src == nil {
- return nil
- }
- samples := make([]output.IQSample, len(src.Samples))
- copy(samples, src.Samples)
- return &output.CompositeFrame{
- Samples: samples,
- SampleRateHz: src.SampleRateHz,
- Timestamp: src.Timestamp,
- GeneratedAt: src.GeneratedAt,
- Sequence: src.Sequence,
- }
- }
-
- func (e *Engine) setRuntimeState(state RuntimeState) {
- e.runtimeState.Store(state)
- }
-
- func (e *Engine) currentRuntimeState() RuntimeState {
- if v := e.runtimeState.Load(); v != nil {
- if rs, ok := v.(RuntimeState); ok {
- return rs
- }
- }
- return RuntimeStateIdle
- }
|