diff --git a/internal/app/engine.go b/internal/app/engine.go index 02c7f48..eab5a0f 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -58,6 +58,7 @@ type Engine struct { state EngineState cancel context.CancelFunc startedAt time.Time + wg sync.WaitGroup chunksProduced atomic.Uint64 totalSamples atomic.Uint64 @@ -104,6 +105,7 @@ func (e *Engine) Start(ctx context.Context) error { e.cancel = cancel e.state = EngineRunning e.startedAt = time.Now() + e.wg.Add(1) e.mu.Unlock() go e.run(runCtx) @@ -120,7 +122,8 @@ func (e *Engine) Stop(ctx context.Context) error { e.cancel() e.mu.Unlock() - time.Sleep(e.chunkDuration * 2) + // Wait for run() goroutine to exit — deterministic, no guessing + e.wg.Wait() if err := e.driver.Flush(ctx); err != nil { return err @@ -158,6 +161,7 @@ func (e *Engine) Stats() EngineStats { } func (e *Engine) run(ctx context.Context) { + defer e.wg.Done() for { if ctx.Err() != nil { return @@ -168,6 +172,12 @@ func (e *Engine) run(ctx context.Context) { if ctx.Err() != nil { return } e.lastError.Store(err.Error()) e.underruns.Add(1) + // Back off to avoid pegging CPU on persistent errors + select { + case <-time.After(e.chunkDuration): + case <-ctx.Done(): + return + } continue } e.chunksProduced.Add(1) diff --git a/internal/control/control.go b/internal/control/control.go index 0005cd5..2dc4c72 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -162,6 +162,10 @@ func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(cfg) case http.MethodPost: + // TODO: config changes only update the control server's copy. + // The running Engine/Generator holds its own snapshot and won't + // pick up these changes until restarted. Wire up a hot-reload + // path or document this limitation clearly for operators. var patch ConfigPatch if err := json.NewDecoder(r.Body).Decode(&patch); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/internal/dsp/fmmod.go b/internal/dsp/fmmod.go index 9036f2a..d576fcf 100644 --- a/internal/dsp/fmmod.go +++ b/internal/dsp/fmmod.go @@ -31,7 +31,7 @@ func (m *FMModulator) Modulate(composite float64) (i, q float64) { m.phase += 2 * math.Pi * freqOffset / m.SampleRate // Keep phase bounded to avoid float64 precision loss over long runs - if m.phase > math.Pi { + if m.phase > math.Pi || m.phase < -math.Pi { m.phase -= 2 * math.Pi * math.Floor((m.phase+math.Pi)/(2*math.Pi)) } diff --git a/internal/dsp/preemphasis.go b/internal/dsp/preemphasis.go index d627aad..bc92990 100644 --- a/internal/dsp/preemphasis.go +++ b/internal/dsp/preemphasis.go @@ -9,9 +9,9 @@ import "math" // Transfer function: H(s) = 1 + s*τ // Bilinear transform to discrete: H(z) = (b0 + b1*z^-1) / (1 + a1*z^-1) type PreEmphasis struct { - b0, b1, a1 float64 - x1, y1 float64 // state - enabled bool + b0, b1 float64 + x1 float64 // state + enabled bool } // NewPreEmphasis creates a pre-emphasis filter for the given time constant @@ -38,7 +38,6 @@ func NewPreEmphasis(tauMicroseconds, sampleRate float64) *PreEmphasis { return &PreEmphasis{ b0: gain, b1: -alpha * gain, - a1: 0, // FIR, no feedback enabled: true, } } @@ -56,7 +55,6 @@ func (p *PreEmphasis) Process(in float64) float64 { // Reset clears the filter state. func (p *PreEmphasis) Reset() { p.x1 = 0 - p.y1 = 0 } // DeEmphasis implements the complementary de-emphasis filter. diff --git a/internal/offline/generator.go b/internal/offline/generator.go index 2f57649..816283a 100644 --- a/internal/offline/generator.go +++ b/internal/offline/generator.go @@ -20,9 +20,9 @@ type frameSource interface { NextFrame() audio.Frame } -// PreEmphasizedSource wraps an audio source and applies pre-emphasis at the -// audio input rate, before upsampling to composite rate. This is more -// efficient than filtering at composite rate and is the correct signal path. +// PreEmphasizedSource wraps an audio source and applies pre-emphasis. +// The source is expected to already output at composite rate (resampled +// upstream). Pre-emphasis is applied per-sample at that rate. type PreEmphasizedSource struct { src frameSource preL *dsp.PreEmphasis @@ -68,6 +68,7 @@ type Generator struct { fmMod *dsp.FMModulator sampleRate float64 initialized bool + frameSeq uint64 } func NewGenerator(cfg cfgpkg.Config) *Generator { @@ -124,11 +125,12 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame samples := int(duration.Seconds() * g.sampleRate) if samples <= 0 { samples = int(g.sampleRate / 10) } + g.frameSeq++ frame := &output.CompositeFrame{ Samples: make([]output.IQSample, samples), SampleRateHz: g.sampleRate, Timestamp: time.Now().UTC(), - Sequence: 1, + Sequence: g.frameSeq, } ceiling := g.cfg.FM.LimiterCeiling @@ -144,7 +146,8 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame rdsValue := 0.0 if g.rdsEnc != nil { - rdsValue = g.rdsEnc.NextSample() + rdsCarrier := g.stereoEncoder.RDSCarrier() + rdsValue = g.rdsEnc.NextSampleWithCarrier(rdsCarrier) } composite := g.combiner.Combine(comps.Mono, comps.Stereo, comps.Pilot, rdsValue) diff --git a/internal/rds/encoder.go b/internal/rds/encoder.go index 3f1edd9..e87b8b0 100644 --- a/internal/rds/encoder.go +++ b/internal/rds/encoder.go @@ -142,7 +142,19 @@ func (e *Encoder) Reset() { } // NextSample returns the next RDS subcarrier sample at the configured rate. +// Uses the internal free-running 57 kHz carrier. Prefer NextSampleWithCarrier +// for phase-locked operation in a stereo MPX chain. func (e *Encoder) NextSample() float64 { + carrier := math.Sin(2 * math.Pi * e.carrierPhase) + e.carrierPhase += e.carrierStep + if e.carrierPhase >= 1.0 { e.carrierPhase -= 1.0 } + return e.NextSampleWithCarrier(carrier) +} + +// NextSampleWithCarrier returns the next RDS sample modulated onto the +// supplied carrier value. The caller must provide sin(3 × pilotPhase × 2π) +// so that the 57 kHz RDS carrier is phase-locked to the 19 kHz pilot. +func (e *Encoder) NextSampleWithCarrier(carrier float64) float64 { if e.sampleCount >= e.spb { if e.bitPos >= bitsPerGroup { e.getRDSGroup() @@ -173,11 +185,6 @@ func (e *Encoder) NextSample() float64 { e.ring[e.outSampleIdx] = 0 e.outSampleIdx++; if e.outSampleIdx >= e.ringSize { e.outSampleIdx = 0 } - // 57 kHz carrier - carrier := math.Sin(2 * math.Pi * e.carrierPhase) - e.carrierPhase += e.carrierStep - if e.carrierPhase >= 1.0 { e.carrierPhase -= 1.0 } - e.sampleCount++ return envelope * carrier } diff --git a/internal/stereo/encoder.go b/internal/stereo/encoder.go index b7dff54..e74e0b0 100644 --- a/internal/stereo/encoder.go +++ b/internal/stereo/encoder.go @@ -19,7 +19,8 @@ type Components struct { // The 38 kHz subcarrier is derived from the pilot phase (2× multiplication), // guaranteeing perfect phase coherence as required by the FM stereo standard. type StereoEncoder struct { - pilot dsp.PilotGenerator + pilot dsp.PilotGenerator + lastPhase float64 // phase captured in last Encode(), for coherent RDS carrier } // NewStereoEncoder creates a StereoEncoder configured for the provided sample rate. @@ -33,9 +34,11 @@ func NewStereoEncoder(sampleRate float64) StereoEncoder { // The 38 kHz subcarrier is sin(2*pilotPhase), derived directly from the pilot // oscillator's phase — not from a separate oscillator. func (s *StereoEncoder) Encode(frame audio.Frame) Components { - // Advance pilot and capture its phase BEFORE generating the sample - pilot := s.pilot.Sample() // sin(2π * 19000 * t) + // Capture phase BEFORE advancing — the 38 kHz subcarrier must use the + // same phase instant as the pilot sample to maintain coherence. pilotPhase := s.pilot.Phase() + s.lastPhase = pilotPhase + pilot := s.pilot.Sample() // sin(2π * 19000 * t), then advances phase // 38 kHz subcarrier = sin(2 * pilotPhase * 2π) = sin(4π * 19000 * t) // This is mathematically identical to sin(2π * 38000 * t) but guaranteed @@ -55,14 +58,16 @@ func (s *StereoEncoder) Reset() { s.pilot.Reset() } -// PilotPhase returns the current pilot oscillator phase in [0, 1). -// Used to derive phase-coherent subcarriers (38 kHz = 2×, 57 kHz = 3×). +// PilotPhase returns the pilot phase used in the most recent Encode() call. +// This is the coherent phase instant for deriving subcarriers (38 kHz = 2×, 57 kHz = 3×). func (s *StereoEncoder) PilotPhase() float64 { - return s.pilot.Phase() + return s.lastPhase } // RDSCarrier returns sin(3 * pilotPhase * 2π) — the 57 kHz carrier // phase-locked to the pilot, as required by the RDS standard. +// Uses the phase captured in the most recent Encode() call so that +// pilot, 38 kHz subcarrier, and 57 kHz RDS carrier are all coherent. func (s *StereoEncoder) RDSCarrier() float64 { - return math.Sin(2 * math.Pi * 3 * s.pilot.Phase()) + return math.Sin(2 * math.Pi * 3 * s.lastPhase) }