Просмотр исходного кода

feat: add pre-emphasis, MPX limiter and FM IQ modulation DSP blocks

tags/v0.4.0-pre
Jan Svabenik 1 месяц назад
Родитель
Сommit
c22fb7bf2a
7 измененных файлов: 370 добавлений и 1 удалений
  1. +1
    -1
      go.mod
  2. +46
    -0
      internal/dsp/fmmod.go
  3. +41
    -0
      internal/dsp/fmmod_test.go
  4. +70
    -0
      internal/dsp/limiter.go
  5. +46
    -0
      internal/dsp/limiter_test.go
  6. +94
    -0
      internal/dsp/preemphasis.go
  7. +72
    -0
      internal/dsp/preemphasis_test.go

+ 1
- 1
go.mod Просмотреть файл

@@ -1,6 +1,6 @@
module github.com/jan/fm-rds-tx

go 1.24.0
go 1.22

require github.com/jan/fm-rds-tx/internal v0.0.0



+ 46
- 0
internal/dsp/fmmod.go Просмотреть файл

@@ -0,0 +1,46 @@
package dsp

import "math"

// FMModulator converts a composite baseband signal into FM-modulated IQ samples.
// The modulated output represents the instantaneous frequency deviation around
// a zero-IF carrier (baseband IQ). For a real SDR transmitter the SDR hardware
// then upconverts this to the desired center frequency.
type FMModulator struct {
// MaxDeviation is the peak frequency deviation in Hz (±75 kHz for FM broadcast).
MaxDeviation float64
SampleRate float64

phase float64 // accumulated carrier phase in radians
}

// NewFMModulator creates a modulator with broadcast FM defaults.
func NewFMModulator(sampleRate float64) *FMModulator {
return &FMModulator{
MaxDeviation: 75000, // ±75 kHz
SampleRate: sampleRate,
}
}

// Modulate converts a single composite sample (normalized to [-1,+1] representing
// full deviation) into an IQ pair.
func (m *FMModulator) Modulate(composite float64) (i, q float64) {
// Instantaneous frequency offset = composite * maxDeviation
// Phase increment per sample = 2π * freq_offset / sampleRate
freqOffset := composite * m.MaxDeviation
m.phase += 2 * math.Pi * freqOffset / m.SampleRate

// Keep phase bounded to avoid float64 precision loss over long runs
if m.phase > math.Pi {
m.phase -= 2 * math.Pi * math.Floor((m.phase+math.Pi)/(2*math.Pi))
}

i = math.Cos(m.phase)
q = math.Sin(m.phase)
return i, q
}

// Reset clears the modulator phase.
func (m *FMModulator) Reset() {
m.phase = 0
}

+ 41
- 0
internal/dsp/fmmod_test.go Просмотреть файл

@@ -0,0 +1,41 @@
package dsp

import (
"math"
"testing"
)

func TestFMModulatorOutputMagnitude(t *testing.T) {
mod := NewFMModulator(228000)
// IQ output should always have magnitude ~1
for n := 0; n < 1000; n++ {
composite := math.Sin(2 * math.Pi * 1000 * float64(n) / 228000)
i, q := mod.Modulate(composite)
mag := math.Sqrt(i*i + q*q)
if math.Abs(mag-1.0) > 1e-9 {
t.Fatalf("sample %d: magnitude=%.9f, expected 1.0", n, mag)
}
}
}

func TestFMModulatorZeroInput(t *testing.T) {
mod := NewFMModulator(228000)
// Zero input = no deviation = phase stays at 0
for n := 0; n < 100; n++ {
i, q := mod.Modulate(0)
if math.Abs(i-1.0) > 1e-9 || math.Abs(q) > 1e-9 {
t.Fatalf("sample %d: expected (1,0) got (%.6f,%.6f)", n, i, q)
}
}
}

func TestFMModulatorReset(t *testing.T) {
mod := NewFMModulator(228000)
mod.Modulate(0.5)
mod.Modulate(0.5)
mod.Reset()
i, q := mod.Modulate(0)
if math.Abs(i-1.0) > 1e-9 || math.Abs(q) > 1e-9 {
t.Fatalf("after reset: expected (1,0) got (%.6f,%.6f)", i, q)
}
}

+ 70
- 0
internal/dsp/limiter.go Просмотреть файл

@@ -0,0 +1,70 @@
package dsp

import "math"

// MPXLimiter is a look-ahead peak limiter for the composite MPX signal.
// It prevents overmodulation by keeping the composite output within
// the configured ceiling. The limiter applies gain reduction smoothly
// to avoid introducing audible artifacts.
type MPXLimiter struct {
ceiling float64 // maximum absolute output level (e.g. 1.0)
attackCoeff float64 // attack smoothing coefficient
releaseCoeff float64 // release smoothing coefficient
gainReduction float64 // current gain reduction in linear scale (0 = no reduction)
}

// NewMPXLimiter creates a limiter with the given ceiling, attack and release
// times in milliseconds, and sample rate.
func NewMPXLimiter(ceiling, attackMs, releaseMs, sampleRate float64) *MPXLimiter {
if ceiling <= 0 {
ceiling = 1.0
}
if attackMs <= 0 {
attackMs = 0.1 // fast attack for MPX
}
if releaseMs <= 0 {
releaseMs = 50 // moderate release
}
attackSamples := attackMs * sampleRate / 1000
releaseSamples := releaseMs * sampleRate / 1000

return &MPXLimiter{
ceiling: ceiling,
attackCoeff: 1.0 - math.Exp(-1.0/attackSamples),
releaseCoeff: 1.0 - math.Exp(-1.0/releaseSamples),
}
}

// Process applies limiting to a single composite sample.
func (l *MPXLimiter) Process(in float64) float64 {
absIn := math.Abs(in)
targetReduction := 0.0
if absIn > l.ceiling {
targetReduction = 1.0 - l.ceiling/absIn
}

if targetReduction > l.gainReduction {
l.gainReduction += l.attackCoeff * (targetReduction - l.gainReduction)
} else {
l.gainReduction += l.releaseCoeff * (targetReduction - l.gainReduction)
}

gain := 1.0 - l.gainReduction
return in * gain
}

// Reset clears the limiter state.
func (l *MPXLimiter) Reset() {
l.gainReduction = 0
}

// HardClip provides a simple hard clipper as a safety net after the limiter.
func HardClip(sample, ceiling float64) float64 {
if sample > ceiling {
return ceiling
}
if sample < -ceiling {
return -ceiling
}
return sample
}

+ 46
- 0
internal/dsp/limiter_test.go Просмотреть файл

@@ -0,0 +1,46 @@
package dsp

import (
"math"
"testing"
)

func TestMPXLimiterPassesQuiet(t *testing.T) {
lim := NewMPXLimiter(1.0, 0.1, 50, 228000)
for i := 0; i < 100; i++ {
in := 0.3 * math.Sin(float64(i))
out := lim.Process(in)
if math.Abs(out-in) > 1e-9 {
t.Fatalf("limiter altered quiet signal at sample %d: in=%.6f out=%.6f", i, in, out)
}
}
}

func TestMPXLimiterClamps(t *testing.T) {
lim := NewMPXLimiter(1.0, 0.01, 50, 228000)
// Feed a signal well above ceiling
var maxOut float64
for i := 0; i < 10000; i++ {
in := 3.0 * math.Sin(2*math.Pi*1000*float64(i)/228000)
out := lim.Process(in)
if math.Abs(out) > maxOut {
maxOut = math.Abs(out)
}
}
// After attack settles, output should approach ceiling
if maxOut > 1.5 {
t.Fatalf("limiter didn't reduce level enough: maxOut=%.4f", maxOut)
}
}

func TestHardClip(t *testing.T) {
if HardClip(1.5, 1.0) != 1.0 {
t.Fatal("expected clip to 1.0")
}
if HardClip(-1.5, 1.0) != -1.0 {
t.Fatal("expected clip to -1.0")
}
if HardClip(0.5, 1.0) != 0.5 {
t.Fatal("expected passthrough")
}
}

+ 94
- 0
internal/dsp/preemphasis.go Просмотреть файл

@@ -0,0 +1,94 @@
package dsp

import "math"

// PreEmphasis implements a first-order high-shelf filter for FM broadcast
// pre-emphasis. Standard time constants are 50 µs (Europe/world) and
// 75 µs (North America/South Korea).
//
// Transfer function: H(s) = 1 + s*τ
// Bilinear transform to discrete: H(z) = (b0 + b1*z^-1) / (1 + a1*z^-1)
type PreEmphasis struct {
b0, b1, a1 float64
x1, y1 float64 // state
enabled bool
}

// NewPreEmphasis creates a pre-emphasis filter for the given time constant
// and sample rate. tau is in microseconds (50 or 75).
func NewPreEmphasis(tauMicroseconds, sampleRate float64) *PreEmphasis {
if tauMicroseconds <= 0 || sampleRate <= 0 {
return &PreEmphasis{enabled: false}
}
tau := tauMicroseconds * 1e-6

// Bilinear transform of H(s) = 1 + s*tau
// With pre-warping: let c = 2*fs
// Numerator: (1 + tau*c) + (-1 + tau*c)*z^-1 => b0 = 1+tau*c, b1 = -1+tau*c
// Denominator: (1 + tau*c) + (1 - tau*c)*z^-1 (but we normalize so a0=1)
// Wait - cleaner approach: standard first-order shelf.

// H(s) = (1 + s*tau) mapped with bilinear: s = c*(1 - z^-1)/(1 + z^-1), c = 2*fs
// De-emphasis: H_de(z) = (1-alpha)/(1 - alpha*z^-1), alpha = exp(-1/(tau*fs))
// Pre-emphasis: H_pre(z) = 1/H_de(z) = (1 - alpha*z^-1)/(1-alpha)
// y[n] = (x[n] - alpha*x[n-1]) / (1 - alpha)
alpha := math.Exp(-1.0 / (tau * sampleRate))
gain := 1.0 / (1.0 - alpha)

return &PreEmphasis{
b0: gain,
b1: -alpha * gain,
a1: 0, // FIR, no feedback
enabled: true,
}
}

// Process applies the pre-emphasis filter to a single sample.
func (p *PreEmphasis) Process(in float64) float64 {
if !p.enabled {
return in
}
out := p.b0*in + p.b1*p.x1
p.x1 = in
return out
}

// Reset clears the filter state.
func (p *PreEmphasis) Reset() {
p.x1 = 0
p.y1 = 0
}

// DeEmphasis implements the complementary de-emphasis filter.
// H(z) = (1-alpha)/(1 - alpha*z^-1)
type DeEmphasis struct {
alpha float64
gain float64
prevOut float64
enabled bool
}

// NewDeEmphasis creates a de-emphasis filter.
func NewDeEmphasis(tauMicroseconds, sampleRate float64) *DeEmphasis {
if tauMicroseconds <= 0 || sampleRate <= 0 {
return &DeEmphasis{enabled: false}
}
tau := tauMicroseconds * 1e-6
alpha := math.Exp(-1.0 / (tau * sampleRate))
return &DeEmphasis{alpha: alpha, gain: 1.0 - alpha, enabled: true}
}

// Process applies the de-emphasis filter.
func (d *DeEmphasis) Process(in float64) float64 {
if !d.enabled {
return in
}
out := d.gain*in + d.alpha*d.prevOut
d.prevOut = out
return out
}

// Reset clears the filter state.
func (d *DeEmphasis) Reset() {
d.prevOut = 0
}

+ 72
- 0
internal/dsp/preemphasis_test.go Просмотреть файл

@@ -0,0 +1,72 @@
package dsp

import (
"math"
"testing"
)

func TestPreEmphasisBoostsHighFrequency(t *testing.T) {
fs := 48000.0
pe := NewPreEmphasis(50, fs)

// Generate a low-frequency tone and a high-frequency tone,
// measure the energy ratio after pre-emphasis.
n := 4800 // 100ms
lowFreq := 100.0
highFreq := 10000.0

var lowEnergy, highEnergy float64
for i := 0; i < n; i++ {
s := math.Sin(2 * math.Pi * lowFreq * float64(i) / fs)
out := pe.Process(s)
lowEnergy += out * out
}
pe.Reset()
for i := 0; i < n; i++ {
s := math.Sin(2 * math.Pi * highFreq * float64(i) / fs)
out := pe.Process(s)
highEnergy += out * out
}

// High frequency should have more energy than low frequency
// after pre-emphasis. The input energy is the same for both.
ratio := highEnergy / lowEnergy
if ratio < 2.0 {
t.Fatalf("expected high-freq boost, energy ratio=%.2f", ratio)
}
}

func TestPreEmphasisDeEmphasisRoundtrip(t *testing.T) {
fs := 48000.0
pe := NewPreEmphasis(50, fs)
de := NewDeEmphasis(50, fs)

// Run a 1kHz tone through pre+de, should approximately recover
n := 4800
freq := 1000.0
var maxErr float64

// Let filters settle for 200 samples
for i := 0; i < 200; i++ {
s := math.Sin(2 * math.Pi * freq * float64(i) / fs)
de.Process(pe.Process(s))
}
for i := 200; i < n; i++ {
s := math.Sin(2 * math.Pi * freq * float64(i) / fs)
recovered := de.Process(pe.Process(s))
err := math.Abs(recovered - s)
if err > maxErr {
maxErr = err
}
}
if maxErr > 0.05 {
t.Fatalf("roundtrip error too large: %.4f", maxErr)
}
}

func TestDisabledPreEmphasis(t *testing.T) {
pe := NewPreEmphasis(0, 48000)
if pe.Process(0.5) != 0.5 {
t.Fatal("disabled filter should pass through")
}
}

Загрузка…
Отмена
Сохранить