Procházet zdrojové kódy

fix: align live MPX phase state and frame sequencing

Keep frame sequence numbers monotonic, lock the RDS carrier to the exact pilot phase used by the stereo encoder, and apply the accompanying DSP/control cleanups needed for stable live transmission behaviour.
tags/v0.7.0-pre
Jan Svabenik před 1 měsícem
rodič
revize
d4673b647d
7 změnil soubory, kde provedl 51 přidání a 24 odebrání
  1. +11
    -1
      internal/app/engine.go
  2. +4
    -0
      internal/control/control.go
  3. +1
    -1
      internal/dsp/fmmod.go
  4. +3
    -5
      internal/dsp/preemphasis.go
  5. +8
    -5
      internal/offline/generator.go
  6. +12
    -5
      internal/rds/encoder.go
  7. +12
    -7
      internal/stereo/encoder.go

+ 11
- 1
internal/app/engine.go Zobrazit soubor

@@ -58,6 +58,7 @@ type Engine struct {
state EngineState state EngineState
cancel context.CancelFunc cancel context.CancelFunc
startedAt time.Time startedAt time.Time
wg sync.WaitGroup


chunksProduced atomic.Uint64 chunksProduced atomic.Uint64
totalSamples atomic.Uint64 totalSamples atomic.Uint64
@@ -104,6 +105,7 @@ func (e *Engine) Start(ctx context.Context) error {
e.cancel = cancel e.cancel = cancel
e.state = EngineRunning e.state = EngineRunning
e.startedAt = time.Now() e.startedAt = time.Now()
e.wg.Add(1)
e.mu.Unlock() e.mu.Unlock()


go e.run(runCtx) go e.run(runCtx)
@@ -120,7 +122,8 @@ func (e *Engine) Stop(ctx context.Context) error {
e.cancel() e.cancel()
e.mu.Unlock() e.mu.Unlock()


time.Sleep(e.chunkDuration * 2)
// Wait for run() goroutine to exit — deterministic, no guessing
e.wg.Wait()


if err := e.driver.Flush(ctx); err != nil { if err := e.driver.Flush(ctx); err != nil {
return err return err
@@ -158,6 +161,7 @@ func (e *Engine) Stats() EngineStats {
} }


func (e *Engine) run(ctx context.Context) { func (e *Engine) run(ctx context.Context) {
defer e.wg.Done()
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
return return
@@ -168,6 +172,12 @@ func (e *Engine) run(ctx context.Context) {
if ctx.Err() != nil { return } if ctx.Err() != nil { return }
e.lastError.Store(err.Error()) e.lastError.Store(err.Error())
e.underruns.Add(1) e.underruns.Add(1)
// Back off to avoid pegging CPU on persistent errors
select {
case <-time.After(e.chunkDuration):
case <-ctx.Done():
return
}
continue continue
} }
e.chunksProduced.Add(1) e.chunksProduced.Add(1)


+ 4
- 0
internal/control/control.go Zobrazit soubor

@@ -162,6 +162,10 @@ func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(cfg) _ = json.NewEncoder(w).Encode(cfg)
case http.MethodPost: case http.MethodPost:
// TODO: config changes only update the control server's copy.
// The running Engine/Generator holds its own snapshot and won't
// pick up these changes until restarted. Wire up a hot-reload
// path or document this limitation clearly for operators.
var patch ConfigPatch var patch ConfigPatch
if err := json.NewDecoder(r.Body).Decode(&patch); err != nil { if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)


+ 1
- 1
internal/dsp/fmmod.go Zobrazit soubor

@@ -31,7 +31,7 @@ func (m *FMModulator) Modulate(composite float64) (i, q float64) {
m.phase += 2 * math.Pi * freqOffset / m.SampleRate m.phase += 2 * math.Pi * freqOffset / m.SampleRate


// Keep phase bounded to avoid float64 precision loss over long runs // Keep phase bounded to avoid float64 precision loss over long runs
if m.phase > math.Pi {
if m.phase > math.Pi || m.phase < -math.Pi {
m.phase -= 2 * math.Pi * math.Floor((m.phase+math.Pi)/(2*math.Pi)) m.phase -= 2 * math.Pi * math.Floor((m.phase+math.Pi)/(2*math.Pi))
} }




+ 3
- 5
internal/dsp/preemphasis.go Zobrazit soubor

@@ -9,9 +9,9 @@ import "math"
// Transfer function: H(s) = 1 + s*τ // Transfer function: H(s) = 1 + s*τ
// Bilinear transform to discrete: H(z) = (b0 + b1*z^-1) / (1 + a1*z^-1) // Bilinear transform to discrete: H(z) = (b0 + b1*z^-1) / (1 + a1*z^-1)
type PreEmphasis struct { type PreEmphasis struct {
b0, b1, a1 float64
x1, y1 float64 // state
enabled bool
b0, b1 float64
x1 float64 // state
enabled bool
} }


// NewPreEmphasis creates a pre-emphasis filter for the given time constant // NewPreEmphasis creates a pre-emphasis filter for the given time constant
@@ -38,7 +38,6 @@ func NewPreEmphasis(tauMicroseconds, sampleRate float64) *PreEmphasis {
return &PreEmphasis{ return &PreEmphasis{
b0: gain, b0: gain,
b1: -alpha * gain, b1: -alpha * gain,
a1: 0, // FIR, no feedback
enabled: true, enabled: true,
} }
} }
@@ -56,7 +55,6 @@ func (p *PreEmphasis) Process(in float64) float64 {
// Reset clears the filter state. // Reset clears the filter state.
func (p *PreEmphasis) Reset() { func (p *PreEmphasis) Reset() {
p.x1 = 0 p.x1 = 0
p.y1 = 0
} }


// DeEmphasis implements the complementary de-emphasis filter. // DeEmphasis implements the complementary de-emphasis filter.


+ 8
- 5
internal/offline/generator.go Zobrazit soubor

@@ -20,9 +20,9 @@ type frameSource interface {
NextFrame() audio.Frame NextFrame() audio.Frame
} }


// PreEmphasizedSource wraps an audio source and applies pre-emphasis at the
// audio input rate, before upsampling to composite rate. This is more
// efficient than filtering at composite rate and is the correct signal path.
// PreEmphasizedSource wraps an audio source and applies pre-emphasis.
// The source is expected to already output at composite rate (resampled
// upstream). Pre-emphasis is applied per-sample at that rate.
type PreEmphasizedSource struct { type PreEmphasizedSource struct {
src frameSource src frameSource
preL *dsp.PreEmphasis preL *dsp.PreEmphasis
@@ -68,6 +68,7 @@ type Generator struct {
fmMod *dsp.FMModulator fmMod *dsp.FMModulator
sampleRate float64 sampleRate float64
initialized bool initialized bool
frameSeq uint64
} }


func NewGenerator(cfg cfgpkg.Config) *Generator { func NewGenerator(cfg cfgpkg.Config) *Generator {
@@ -124,11 +125,12 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame
samples := int(duration.Seconds() * g.sampleRate) samples := int(duration.Seconds() * g.sampleRate)
if samples <= 0 { samples = int(g.sampleRate / 10) } if samples <= 0 { samples = int(g.sampleRate / 10) }


g.frameSeq++
frame := &output.CompositeFrame{ frame := &output.CompositeFrame{
Samples: make([]output.IQSample, samples), Samples: make([]output.IQSample, samples),
SampleRateHz: g.sampleRate, SampleRateHz: g.sampleRate,
Timestamp: time.Now().UTC(), Timestamp: time.Now().UTC(),
Sequence: 1,
Sequence: g.frameSeq,
} }


ceiling := g.cfg.FM.LimiterCeiling ceiling := g.cfg.FM.LimiterCeiling
@@ -144,7 +146,8 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame


rdsValue := 0.0 rdsValue := 0.0
if g.rdsEnc != nil { if g.rdsEnc != nil {
rdsValue = g.rdsEnc.NextSample()
rdsCarrier := g.stereoEncoder.RDSCarrier()
rdsValue = g.rdsEnc.NextSampleWithCarrier(rdsCarrier)
} }


composite := g.combiner.Combine(comps.Mono, comps.Stereo, comps.Pilot, rdsValue) composite := g.combiner.Combine(comps.Mono, comps.Stereo, comps.Pilot, rdsValue)


+ 12
- 5
internal/rds/encoder.go Zobrazit soubor

@@ -142,7 +142,19 @@ func (e *Encoder) Reset() {
} }


// NextSample returns the next RDS subcarrier sample at the configured rate. // NextSample returns the next RDS subcarrier sample at the configured rate.
// Uses the internal free-running 57 kHz carrier. Prefer NextSampleWithCarrier
// for phase-locked operation in a stereo MPX chain.
func (e *Encoder) NextSample() float64 { func (e *Encoder) NextSample() float64 {
carrier := math.Sin(2 * math.Pi * e.carrierPhase)
e.carrierPhase += e.carrierStep
if e.carrierPhase >= 1.0 { e.carrierPhase -= 1.0 }
return e.NextSampleWithCarrier(carrier)
}

// NextSampleWithCarrier returns the next RDS sample modulated onto the
// supplied carrier value. The caller must provide sin(3 × pilotPhase × 2π)
// so that the 57 kHz RDS carrier is phase-locked to the 19 kHz pilot.
func (e *Encoder) NextSampleWithCarrier(carrier float64) float64 {
if e.sampleCount >= e.spb { if e.sampleCount >= e.spb {
if e.bitPos >= bitsPerGroup { if e.bitPos >= bitsPerGroup {
e.getRDSGroup() e.getRDSGroup()
@@ -173,11 +185,6 @@ func (e *Encoder) NextSample() float64 {
e.ring[e.outSampleIdx] = 0 e.ring[e.outSampleIdx] = 0
e.outSampleIdx++; if e.outSampleIdx >= e.ringSize { e.outSampleIdx = 0 } e.outSampleIdx++; if e.outSampleIdx >= e.ringSize { e.outSampleIdx = 0 }
// 57 kHz carrier
carrier := math.Sin(2 * math.Pi * e.carrierPhase)
e.carrierPhase += e.carrierStep
if e.carrierPhase >= 1.0 { e.carrierPhase -= 1.0 }
e.sampleCount++ e.sampleCount++
return envelope * carrier return envelope * carrier
} }


+ 12
- 7
internal/stereo/encoder.go Zobrazit soubor

@@ -19,7 +19,8 @@ type Components struct {
// The 38 kHz subcarrier is derived from the pilot phase (2× multiplication), // The 38 kHz subcarrier is derived from the pilot phase (2× multiplication),
// guaranteeing perfect phase coherence as required by the FM stereo standard. // guaranteeing perfect phase coherence as required by the FM stereo standard.
type StereoEncoder struct { type StereoEncoder struct {
pilot dsp.PilotGenerator
pilot dsp.PilotGenerator
lastPhase float64 // phase captured in last Encode(), for coherent RDS carrier
} }


// NewStereoEncoder creates a StereoEncoder configured for the provided sample rate. // NewStereoEncoder creates a StereoEncoder configured for the provided sample rate.
@@ -33,9 +34,11 @@ func NewStereoEncoder(sampleRate float64) StereoEncoder {
// The 38 kHz subcarrier is sin(2*pilotPhase), derived directly from the pilot // The 38 kHz subcarrier is sin(2*pilotPhase), derived directly from the pilot
// oscillator's phase — not from a separate oscillator. // oscillator's phase — not from a separate oscillator.
func (s *StereoEncoder) Encode(frame audio.Frame) Components { func (s *StereoEncoder) Encode(frame audio.Frame) Components {
// Advance pilot and capture its phase BEFORE generating the sample
pilot := s.pilot.Sample() // sin(2π * 19000 * t)
// Capture phase BEFORE advancing — the 38 kHz subcarrier must use the
// same phase instant as the pilot sample to maintain coherence.
pilotPhase := s.pilot.Phase() pilotPhase := s.pilot.Phase()
s.lastPhase = pilotPhase
pilot := s.pilot.Sample() // sin(2π * 19000 * t), then advances phase


// 38 kHz subcarrier = sin(2 * pilotPhase * 2π) = sin(4π * 19000 * t) // 38 kHz subcarrier = sin(2 * pilotPhase * 2π) = sin(4π * 19000 * t)
// This is mathematically identical to sin(2π * 38000 * t) but guaranteed // This is mathematically identical to sin(2π * 38000 * t) but guaranteed
@@ -55,14 +58,16 @@ func (s *StereoEncoder) Reset() {
s.pilot.Reset() s.pilot.Reset()
} }


// PilotPhase returns the current pilot oscillator phase in [0, 1).
// Used to derive phase-coherent subcarriers (38 kHz = 2×, 57 kHz = 3×).
// PilotPhase returns the pilot phase used in the most recent Encode() call.
// This is the coherent phase instant for deriving subcarriers (38 kHz = 2×, 57 kHz = 3×).
func (s *StereoEncoder) PilotPhase() float64 { func (s *StereoEncoder) PilotPhase() float64 {
return s.pilot.Phase()
return s.lastPhase
} }


// RDSCarrier returns sin(3 * pilotPhase * 2π) — the 57 kHz carrier // RDSCarrier returns sin(3 * pilotPhase * 2π) — the 57 kHz carrier
// phase-locked to the pilot, as required by the RDS standard. // phase-locked to the pilot, as required by the RDS standard.
// Uses the phase captured in the most recent Encode() call so that
// pilot, 38 kHz subcarrier, and 57 kHz RDS carrier are all coherent.
func (s *StereoEncoder) RDSCarrier() float64 { func (s *StereoEncoder) RDSCarrier() float64 {
return math.Sin(2 * math.Pi * 3 * s.pilot.Phase())
return math.Sin(2 * math.Pi * 3 * s.lastPhase)
} }

Načítá se…
Zrušit
Uložit