Просмотр исходного кода

Merge branch 'CLIP-FILTER-CLIP'

tags/v0.9.0
Jan Svabenik 1 месяц назад
Родитель
Сommit
99ce317bad
24 измененных файлов: 4226 добавлений и 610 удалений
  1. +2
    -0
      .gitignore
  2. +35
    -3
      cmd/fmrtx/main.go
  3. +100
    -0
      docs/API.md
  4. +287
    -0
      docs/DSP-CHAIN.md
  5. +44
    -0
      docs/config.orangepi-pluto-soapy.json
  6. +10
    -4
      docs/config.plutosdr.json
  7. +30
    -0
      internal/app/engine.go
  8. +196
    -0
      internal/audio/stream.go
  9. +376
    -0
      internal/audio/stream_test.go
  10. +20
    -9
      internal/config/config.go
  11. +61
    -4
      internal/control/control.go
  12. +1631
    -527
      internal/control/ui.html
  13. +228
    -0
      internal/dsp/biquad.go
  14. +154
    -0
      internal/dsp/bs412.go
  15. +68
    -0
      internal/dsp/stereolimiter.go
  16. +153
    -23
      internal/offline/generator.go
  17. +23
    -10
      internal/offline/generator_test.go
  18. +1
    -1
      internal/platform/plutosdr/available_pluto.go
  19. +364
    -0
      internal/platform/plutosdr/pluto_linux.go
  20. +1
    -1
      internal/platform/plutosdr/stub.go
  21. +59
    -27
      internal/platform/soapysdr/lib_unix.go
  22. +16
    -1
      internal/rds/encoder.go
  23. +365
    -0
      scripts/orangepi-build-libiio.sh
  24. +2
    -0
      stream_tx.bat

+ 2
- 0
.gitignore Просмотреть файл

@@ -7,3 +7,5 @@ build/
*.iq
*.raw
*.bak
*.zip
*.exe

+ 35
- 3
cmd/fmrtx/main.go Просмотреть файл

@@ -12,6 +12,7 @@ import (
"time"

apppkg "github.com/jan/fm-rds-tx/internal/app"
"github.com/jan/fm-rds-tx/internal/audio"
cfgpkg "github.com/jan/fm-rds-tx/internal/config"
ctrlpkg "github.com/jan/fm-rds-tx/internal/control"
drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
@@ -31,6 +32,8 @@ func main() {
txMode := flag.Bool("tx", false, "start real TX mode (requires hardware + build tags)")
txAutoStart := flag.Bool("tx-auto-start", false, "auto-start TX on launch")
listDevices := flag.Bool("list-devices", false, "enumerate SoapySDR devices and exit")
audioStdin := flag.Bool("audio-stdin", false, "read S16LE stereo PCM audio from stdin")
audioRate := flag.Int("audio-rate", 44100, "sample rate of stdin audio input (Hz)")
flag.Parse()

// --- list-devices (SoapySDR) ---
@@ -99,7 +102,7 @@ func main() {
if driver == nil {
log.Fatal("no hardware driver available — build with -tags pluto (or -tags soapy)")
}
runTXMode(cfg, driver, *txAutoStart)
runTXMode(cfg, driver, *txAutoStart, *audioStdin, *audioRate)
return
}

@@ -142,7 +145,7 @@ func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver {
return nil
}

func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool) {
func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, audioStdin bool, audioRate int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -150,9 +153,17 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool) {
// OutputDrive controls composite signal level, NOT hardware gain.
// Hardware TX gain is always 0 dB (max power). Use external attenuator for power control.
soapyCfg := platform.SoapyConfig{
Driver: cfg.Backend.Device,
Driver: cfg.Backend.Driver,
Device: cfg.Backend.Device,
CenterFreqHz: cfg.FM.FrequencyMHz * 1e6,
GainDB: 0, // 0 dB = max TX power on PlutoSDR
DeviceArgs: map[string]string{},
}
if cfg.Backend.URI != "" {
soapyCfg.DeviceArgs["uri"] = cfg.Backend.URI
}
for k, v := range cfg.Backend.DeviceArgs {
soapyCfg.DeviceArgs[k] = v
}
soapyCfg.SampleRateHz = cfg.EffectiveDeviceRate()

@@ -172,10 +183,31 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool) {
// Engine
engine := apppkg.NewEngine(cfg, driver)

// Live audio stream source (optional)
var streamSrc *audio.StreamSource
if audioStdin {
// Buffer: 2 seconds at input rate — enough to absorb jitter
streamSrc = audio.NewStreamSource(audioRate*2, audioRate)
engine.SetStreamSource(streamSrc)

// Stdin ingest goroutine
go func() {
log.Printf("audio: reading S16LE stereo PCM from stdin at %d Hz", audioRate)
if err := audio.IngestReader(os.Stdin, streamSrc); err != nil {
log.Printf("audio: stdin ingest ended: %v", err)
} else {
log.Println("audio: stdin EOF")
}
}()
}

// Control plane
srv := ctrlpkg.NewServer(cfg)
srv.SetDriver(driver)
srv.SetTXController(&txBridge{engine: engine})
if streamSrc != nil {
srv.SetStreamSource(streamSrc)
}

if autoStart {
log.Println("TX: auto-start enabled")


+ 100
- 0
docs/API.md Просмотреть файл

@@ -218,3 +218,103 @@ These cannot be hot-reloaded (they affect DSP pipeline structure):
- `rds.pi` / `rds.pty` — rarely change, baked into encoder init
- `audio.inputPath` — audio source selection
- `backend.kind` / `backend.device` — hardware selection

---

### `POST /audio/stream`

Push raw audio data into the live stream buffer. Format: **S16LE stereo PCM** at the configured `--audio-rate` (default 44100 Hz).

Requires `--audio-stdin` or a configured stream source.

**Request:** Binary body, `application/octet-stream`, raw S16LE stereo PCM bytes.

**Response:**
```json
{
"ok": true,
"frames": 4096,
"stats": {
"available": 12000,
"capacity": 131072,
"buffered": 0.09,
"written": 890000,
"underruns": 0,
"overflows": 0
}
}
```

**Example:**
```bash
# Push a file
ffmpeg -i song.mp3 -f s16le -ar 44100 -ac 2 - | \
curl -X POST --data-binary @- http://pluto:8088/audio/stream
```

**Errors:**
- `405` if not POST
- `503` if no audio stream configured

---

## Audio Streaming

### Stdin pipe (primary method)

Pipe any audio source through ffmpeg into the transmitter:

```bash
# Internet radio stream
ffmpeg -i "http://stream.example.com/radio.mp3" -f s16le -ar 44100 -ac 2 - | \
fmrtx --tx --tx-auto-start --audio-stdin --config config.json

# Local music file
ffmpeg -i music.flac -f s16le -ar 44100 -ac 2 - | \
fmrtx --tx --tx-auto-start --audio-stdin

# Playlist (ffmpeg concat)
ffmpeg -f concat -i playlist.txt -f s16le -ar 44100 -ac 2 - | \
fmrtx --tx --tx-auto-start --audio-stdin

# PulseAudio / ALSA capture (Linux)
parecord --format=s16le --rate=44100 --channels=2 - | \
fmrtx --tx --tx-auto-start --audio-stdin

# Custom sample rate (e.g. 48kHz source)
ffmpeg -i source.wav -f s16le -ar 48000 -ac 2 - | \
fmrtx --tx --tx-auto-start --audio-stdin --audio-rate 48000
```

### HTTP audio push

Push audio from a remote machine via the HTTP API:

```bash
# From another machine on the network
ffmpeg -i music.mp3 -f s16le -ar 44100 -ac 2 - | \
curl -X POST --data-binary @- http://pluto-host:8088/audio/stream
```

### Audio buffer

The stream uses a lock-free ring buffer (default: 2 seconds at input rate). Buffer stats are available in `GET /runtime` under `audioStream`:

```json
{
"audioStream": {
"available": 12000,
"capacity": 131072,
"buffered": 0.09,
"written": 890000,
"underruns": 0,
"overflows": 0
}
}
```

- **underruns**: DSP consumed faster than audio arrived (silence inserted)
- **overflows**: Audio arrived faster than DSP consumed (data dropped)
- **buffered**: Fill ratio (0.0 = empty, 1.0 = full)

When no audio is streaming, the transmitter falls back to the configured tone generator or silence.

+ 287
- 0
docs/DSP-CHAIN.md Просмотреть файл

@@ -0,0 +1,287 @@
# fm-rds-tx — DSP Signal Chain & Konfiguration

## Übersicht

fm-rds-tx ist ein broadcast-konformer FM-Stereo-MPX-Encoder mit RDS für PlutoSDR/SoapySDR.
Die DSP-Kette folgt dem Industriestandard (Omnia, Orban, Stereo Tool) mit Clip-Filter-Clip-
Architektur und ITU-R BS.412 MPX Power Limiting.

---

## Signalkette

```
┌─────────────────────────────────────────────────────────────────┐
│ AUDIO INPUT │
│ S16LE Stereo PCM via stdin (ffmpeg) oder interner Tongenerator │
└──────────────────────────┬──────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ STAGE 1: Pre-Emphasis + Band-Limiting (pro Kanal L/R) │
│ │
│ Audio × gain ──→ Pre-Emphasis (50µs EU / 75µs US) │
│ ──→ 15kHz LPF (8th-order Chebyshev Type I, 0.5dB Ripple) │
│ ──→ 19kHz Notch (Q=15, double-cascade) │
│ │
│ Frequenzantwort (verifiziert): │
│ 10kHz: +0.1dB (flat) 15kHz: -0.2dB │
│ 17kHz: -21dB 18.5kHz: -40dB │
│ 19kHz: -155dB (tot) 22kHz: -51dB │
└──────────────────────────┬──────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ STAGE 2: Drive + Kompression + Clip₁ │
│ │
│ × OutputDrive │
│ ──→ StereoLimiter (5ms Attack / 200ms Release, ceiling) │
│ ──→ HardClip₁ (ceiling) │
│ │
│ Der Limiter komprimiert die Dynamik (bringt Average hoch). │
│ Der Clip fängt Peaks die der Limiter's Attack verpasst. │
│ "Slow-to-fast Progression" — Broadcast-Standard. │
└──────────────────────────┬──────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ STAGE 3: Cleanup LPF + Clip₂ (Overshoot-Kompensator) │
│ │
│ ──→ 15kHz LPF (8th-order Chebyshev, identisch zu Stage 1) │
│ ──→ HardClip₂ (ceiling) │
│ │
│ Der zweite LPF-Pass entfernt Clip₁-Harmonische. │
│ Clip₂ fängt die LPF-Overshoots (IIR-Filter erzeugen diese). │
│ Doppelter LPF-Pass verdoppelt die Guard-Band-Dämpfung. │
└──────────────────────────┬──────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ STAGE 4: Stereo-Encode │
│ │
│ L/R ──→ Mono: (L+R)/2 (0–15kHz Baseband) │
│ ──→ Stereo: (L-R)/2 × sin(38kHz) (23–53kHz DSB-SC) │
│ ──→ Pilot: sin(19kHz) (phase-locked, kohärent) │
│ ──→ RDS Carrier: sin(57kHz) (3× Pilot-Phase, kohärent) │
└──────────────────────────┬──────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ STAGE 5: Composite Clip + Schutzfilter │
│ │
│ Audio-MPX (Mono + Stereo-Sub) │
│ ──→ HardClip₃ (ceiling) — finale Deviations-Kontrolle │
│ ──→ 19kHz Notch (Q=10, double) — Clip-Harmonische bei Pilot │
│ ──→ 57kHz Notch (Q=10, double) — Clip-Harmonische bei RDS │
│ │
│ Guard Bands (total, 2× LPF + Notches): │
│ 19kHz: >-80dB broadband, >-90dB exakt │
│ 57kHz: >-100dB │
│ (Omnia 11 Spezifikation: >80dB — wir sind on par) │
└──────────────────────────┬──────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ STAGE 6: BS.412 MPX Power Limiter (optional) │
│ │
│ ──→ × BS412 Gain │
│ │
│ Rolling 60-Sekunden RMS-Messung auf dem Audio-Composite. │
│ Langsamer Gain-Regler (2s Attack / 5s Release). │
│ Zieht Pilot+RDS-Power automatisch vom Budget ab. │
│ Pflicht in CH, DE, NL, FR für lizenzierte FM-Sender. │
│ ~5dB Lautheitsverlust bei 0 dBr Threshold. │
└──────────────────────────┬──────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ STAGE 7: Pilot + RDS Injection (fixe Amplitude) │
│ │
│ composite = audioMPX │
│ + pilotLevel × sin(19kHz) — IMMER 9% │
│ + rdsInjection × rdsWaveform — IMMER 4% │
│ │
│ Pilot und RDS werden NIE geclippt, NIE gefiltert, NIE vom │
│ BS.412-Limiter berührt. Konstante Amplitude, immer. │
│ │
│ Peak Composite = ceiling + pilotLevel + rdsInjection ≈ 113% │
│ (Standard-Broadcast-Praxis — Pilot/RDS werden von den meisten │
│ Regulierungsbehörden aus dem Modulationslimit ausgenommen) │
└──────────────────────────┬──────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ STAGE 8: FM-Modulation │
│ │
│ Split-Rate: Composite @ 228kHz ──→ FMUpsampler ──→ IQ @ 2.28MHz│
│ maxDeviation × mpxGain = effektive Deviation │
│ composite=1.0 → ±75kHz Deviation (bei mpxGain=1.0) │
│ │
│ Ausgabe: IQ-Samples (float32) an PlutoSDR via libiio │
└──────────────────────────────────────────────────────────────────┘
```

---

## Konfiguration

### Audio

| Parameter | Typ | Default | Beschreibung |
|---|---|---|---|
| `audio.gain` | float | 1.0 | Eingangsverstärkung vor Pre-Emphasis. 1.0 = unity. |
| `audio.inputPath` | string | "" | WAV-Datei als Quelle (leer = stdin oder Tongenerator) |

**Empfehlung:** `gain: 1.0`. Pegel-Kontrolle über `outputDrive`.

### FM — Audio-Processing

| Parameter | Typ | Default | Bereich | Beschreibung |
|---|---|---|---|---|
| `outputDrive` | float | 0.5 | 0–10 | Eingangsverstärkung vor Limiter/Clip. Bestimmt wie aggressiv die Kompression arbeitet. |
| `limiterEnabled` | bool | true | — | Aktiviert den StereoLimiter (5ms/200ms). |
| `limiterCeiling` | float | 1.0 | 0–2 | Maximum-Amplitude für Audio L/R und Composite. 1.0 = ±75kHz. |
| `preEmphasisTauUS` | float | 50 | 0/50/75 | Pre-Emphasis Zeitkonstante. 50µs = Europa/CH, 75µs = USA, 0 = aus. |

**outputDrive im Detail:**

Der Drive bestimmt den *Klangcharakter*, nicht die Lautstärke (wenn BS.412 aktiv ist):

| Drive | Effekt | Einsatz |
|---|---|---|
| 1–2 | Wenig Kompression, dynamisch, sauber | Klassik, Jazz, Wortbeiträge |
| 3–4 | Moderate Kompression, ausgewogen | **Empfohlen für die meisten Formate** |
| 5–7 | Aggressive Kompression, dichter Sound | Pop/Rock-Formatradio |
| 8–10 | Maximale Dichte, hörbare Clip-Artefakte | Nicht empfohlen |

**Empfehlung:** `outputDrive: 3.0` für sauberen, broadcast-fähigen Sound.

### FM — Pilot & RDS

| Parameter | Typ | Default | Bereich | Beschreibung |
|---|---|---|---|---|
| `pilotLevel` | float | 0.09 | 0–0.2 | 19kHz Pilot-Amplitude. **Direkte Prozentangabe von ±75kHz.** 0.09 = 9% = ITU-Standard. |
| `rdsInjection` | float | 0.04 | 0–0.15 | RDS-Amplitude. **Direkte Prozentangabe.** 0.04 = 4%. Waveform ist unity-normalisiert. |
| `stereoEnabled` | bool | true | — | Stereo-Encode an/aus. Aus = nur Mono (L+R)/2, kein Pilot. |

**Empfehlung:** `pilotLevel: 0.09`, `rdsInjection: 0.04`. Nicht ändern ausser es gibt einen guten Grund.

Pilot und RDS sind **unabhängig vom OutputDrive** — sie bleiben immer bei der konfigurierten Amplitude, egal wie hart das Audio komprimiert wird.

### FM — BS.412 (ITU-R MPX Power Limiter)

| Parameter | Typ | Default | Beschreibung |
|---|---|---|---|
| `bs412Enabled` | bool | false | Aktiviert den BS.412 MPX Power Limiter. **Pflicht in CH, DE, NL, FR.** |
| `bs412ThresholdDBr` | float | 0 | Power-Limit in dBr. 0 = Standard. +3 = relaxiert. |

**Was BS.412 macht:**
Begrenzt die durchschnittliche MPX-Leistung über ein rollendes 60-Sekunden-Fenster.
Reduziert die Audio-Amplitude langsam wenn die Power den Threshold überschreitet.
Pilot und RDS werden automatisch vom Power-Budget abgezogen.

**Effekt auf den Sound:**
- ~5dB Lautheitsverlust bei 0 dBr mit aggressiver Kompression
- Weniger Verlust bei dynamischerem Material
- OutputDrive beeinflusst bei aktivem BS.412 nur den Klangcharakter, nicht die Lautheit

**Empfehlung:** `bs412Enabled: true`, `bs412ThresholdDBr: 0` für BAKOM-Compliance.

### FM — Hardware-Kalibrierung

| Parameter | Typ | Default | Bereich | Beschreibung |
|---|---|---|---|---|
| `mpxGain` | float | 1.0 | 0.1–5 | Skaliert die FM-Deviation (nicht den Composite!). Kompensiert DAC/SDR-Hardware-Faktoren. |
| `maxDeviationHz` | float | 75000 | 0–150000 | Maximale FM-Deviation in Hz. 75000 = Standard. |
| `compositeRateHz` | int | 228000 | — | Interne DSP-Sample-Rate. 228000 = 12×19kHz (optimal für Pilot-Kohärenz). |

**MpxTool-Kalibrierung:**
1. `mpxGain: 1.0` setzen (keine Skalierung)
2. MpxTool Ref Level so einstellen dass **Pilot Level = 9.0%** anzeigt
3. Für PlutoSDR typisch: Ref Level ca. **-7.5 dBFS**
4. Einmal kalibrieren, nie wieder anfassen

**Empfehlung:** `mpxGain: 1.0`, `maxDeviationHz: 75000`. Kalibrierung über MpxTool Ref Level.

### RDS

| Parameter | Typ | Default | Beschreibung |
|---|---|---|---|
| `rds.enabled` | bool | true | RDS an/aus |
| `rds.pi` | string | "1234" | Programme Identification (4-stellig hex). Muss mit BAKOM-Zuteilung übereinstimmen. |
| `rds.ps` | string | "FMRTX" | Programme Service Name (max 8 Zeichen). Stationsname auf dem Display. |
| `rds.radioText` | string | "" | Radio Text (max 64 Zeichen). Scrolltext auf dem Display. |
| `rds.pty` | int | 0 | Programme Type. 0=undefined, 1=News, 3=Info, 10=Pop, 15=Other Music, etc. |

### Backend

| Parameter | Typ | Default | Beschreibung |
|---|---|---|---|
| `backend.kind` | string | "file" | `"pluto"` für PlutoSDR, `"soapy"` für SoapySDR, `"file"` für Dateiausgabe |
| `backend.device` | string | "" | Device-String. PlutoSDR: `"usb:"` oder `"ip:192.168.2.1"` |
| `backend.deviceSampleRateHz` | float | 0 | SDR-Device-Rate. 2280000 = 10× compositeRate (optimal). |

---

## Referenz-Konfiguration (BAKOM-konform, PlutoSDR)

```json
{
"audio": {
"gain": 1.0
},
"rds": {
"enabled": true,
"pi": "BEEF",
"ps": "RADIO-ZH",
"radioText": "Ihr Zürcher Kurzradio",
"pty": 0
},
"fm": {
"frequencyMHz": 100.0,
"stereoEnabled": true,
"pilotLevel": 0.09,
"rdsInjection": 0.04,
"preEmphasisTauUS": 50,
"outputDrive": 3.0,
"limiterEnabled": true,
"limiterCeiling": 1.0,
"bs412Enabled": true,
"bs412ThresholdDBr": 0,
"mpxGain": 1.0,
"compositeRateHz": 228000,
"maxDeviationHz": 75000,
"fmModulationEnabled": true
},
"backend": {
"kind": "pluto",
"device": "usb:",
"deviceSampleRateHz": 2280000
},
"control": {
"listenAddress": "127.0.0.1:8088"
}
}
```

---

## Audio-Streaming (Produktionsbetrieb)

```bash
ffmpeg -i http://stream-url/stream -f s16le -ar 44100 -ac 2 - | fmrtx.exe --tx --tx-auto-start --audio-stdin --audio-rate 44100 --config config.json
```

**Hinweis:** Unter Windows `cmd.exe` verwenden, nicht PowerShell (korrumpiert die Binary-Pipe).

---

## Verifizierte Messwerte (MpxTool, PlutoSDR @ 100MHz)

| Parameter | Messung | Soll |
|---|---|---|
| Pilot Level | 9.0% | 9% ✓ |
| RDS Injection | 3.4% | 4% (≈, BPSK-Mittelung) |
| MPX Peak | 105–110% | 100–113% ✓ |
| Guard Band 19kHz | >-80dB | >-80dB (Omnia 11: >80dB) ✓ |
| Audio Bandwidth | flat bis 15kHz | 15kHz ✓ |

+ 44
- 0
docs/config.orangepi-pluto-soapy.json Просмотреть файл

@@ -0,0 +1,44 @@
{
"audio": {
"inputPath": "",
"gain": 1.0,
"toneLeftHz": 1000,
"toneRightHz": 1600,
"toneAmplitude": 0.2
},
"rds": {
"enabled": true,
"pi": "BEEF",
"ps": "PLUTOOPI",
"radioText": "Orange Pi Pluto Soapy test",
"pty": 0
},
"fm": {
"frequencyMHz": 100.0,
"stereoEnabled": true,
"pilotLevel": 0.09,
"rdsInjection": 0.04,
"preEmphasisTauUS": 50,
"outputDrive": 0.5,
"compositeRateHz": 228000,
"maxDeviationHz": 75000,
"limiterEnabled": true,
"limiterCeiling": 1.0,
"fmModulationEnabled": true,
"mpxGain": 1.0,
"bs412Enabled": false,
"bs412ThresholdDBr": 0
},
"backend": {
"kind": "soapy",
"driver": "plutosdr",
"device": "",
"uri": "ip:pluto.local",
"deviceArgs": {},
"outputPath": "",
"deviceSampleRateHz": 2280000
},
"control": {
"listenAddress": "127.0.0.1:8088"
}
}

+ 10
- 4
docs/config.plutosdr.json Просмотреть файл

@@ -10,16 +10,19 @@
"enabled": true,
"pi": "BEEF",
"ps": "PLUTO-TX",
"radioText": "Hello from PlutoSDR",
"radioText": "TESTATSSENDUNG 1mW",
"pty": 0
},
"fm": {
"bs412Enabled": true,
"bs412ThresholdDBr": 0,
"frequencyMHz": 100.0,
"stereoEnabled": true,
"pilotLevel": 0.041,
"rdsInjection": 0.021,
"pilotLevel": 0.09,
"rdsInjection": 0.04,
"preEmphasisTauUS": 50,
"outputDrive": 2.2,
"outputDrive": 1.0,
"mpxGain": 1.0,
"compositeRateHz": 228000,
"maxDeviationHz": 75000,
"limiterEnabled": true,
@@ -29,6 +32,9 @@
"backend": {
"kind": "pluto",
"device": "usb:",
"driver": "",
"uri": "",
"deviceArgs": {},
"outputPath": "",
"deviceSampleRateHz": 2280000
},


+ 30
- 0
internal/app/engine.go Просмотреть файл

@@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"

"github.com/jan/fm-rds-tx/internal/audio"
cfgpkg "github.com/jan/fm-rds-tx/internal/config"
"github.com/jan/fm-rds-tx/internal/dsp"
offpkg "github.com/jan/fm-rds-tx/internal/offline"
@@ -70,6 +71,30 @@ type Engine struct {

// Live config: pending frequency change, applied between chunks
pendingFreq atomic.Pointer[float64]

// Live audio stream (optional)
streamSrc *audio.StreamSource
}

// SetStreamSource configures a live audio stream as the audio source.
// Must be called before Start(). The StreamResampler is created internally
// to convert from the stream's sample rate to the DSP composite rate.
func (e *Engine) SetStreamSource(src *audio.StreamSource) {
e.streamSrc = src
compositeRate := float64(e.cfg.FM.CompositeRateHz)
if compositeRate <= 0 {
compositeRate = 228000
}
resampler := audio.NewStreamResampler(src, compositeRate)
e.generator.SetExternalSource(resampler)
log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
src.SampleRate, compositeRate, src.Stats().Capacity)
}

// StreamSource returns the live audio stream source, or nil.
// Used by the control server for stats and HTTP audio ingest.
func (e *Engine) StreamSource() *audio.StreamSource {
return e.streamSrc
}

func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
@@ -90,6 +115,11 @@ func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
if maxDev <= 0 {
maxDev = 75000
}
// mpxGain scales the FM deviation to compensate for hardware
// DAC/SDR scaling factors. DSP chain stays at logical 0-1.0 levels.
if cfg.FM.MpxGain > 0 && cfg.FM.MpxGain != 1.0 {
maxDev *= cfg.FM.MpxGain
}
upsampler = dsp.NewFMUpsampler(compositeRate, deviceRate, maxDev)
log.Printf("engine: split-rate mode — DSP@%.0fHz → upsample@%.0fHz (ratio %.2f)",
compositeRate, deviceRate, deviceRate/compositeRate)


+ 196
- 0
internal/audio/stream.go Просмотреть файл

@@ -0,0 +1,196 @@
package audio

import (
"encoding/binary"
"fmt"
"io"
"sync/atomic"
)

// StreamSource is a lock-free SPSC (single-producer, single-consumer) ring buffer
// for real-time audio streaming. One goroutine writes PCM frames, the DSP
// goroutine reads them via NextFrame(). Returns silence on underrun.
//
// Zero allocations in steady state. No mutex in the read or write path.
type StreamSource struct {
ring []Frame
size int
mask int // size-1, for fast modulo (size must be power of 2)
SampleRate int

writePos atomic.Int64
readPos atomic.Int64

Underruns atomic.Uint64
Overflows atomic.Uint64
Written atomic.Uint64
}

// NewStreamSource creates a ring buffer with the given capacity (rounded up
// to next power of 2) and input sample rate.
func NewStreamSource(capacity, sampleRate int) *StreamSource {
// Round up to power of 2
size := 1
for size < capacity {
size <<= 1
}
return &StreamSource{
ring: make([]Frame, size),
size: size,
mask: size - 1,
SampleRate: sampleRate,
}
}

// WriteFrame pushes a single frame into the ring buffer.
// Returns false if the buffer is full (overflow).
func (s *StreamSource) WriteFrame(f Frame) bool {
wp := s.writePos.Load()
rp := s.readPos.Load()
if wp-rp >= int64(s.size) {
s.Overflows.Add(1)
return false
}
s.ring[int(wp)&s.mask] = f
s.writePos.Add(1)
s.Written.Add(1)
return true
}

// WritePCM decodes interleaved S16LE stereo PCM bytes and writes frames
// to the ring buffer. Returns the number of frames written.
func (s *StreamSource) WritePCM(data []byte) int {
frames := len(data) / 4 // 2 channels × 2 bytes per sample
written := 0
for i := 0; i < frames; i++ {
off := i * 4
l := int16(binary.LittleEndian.Uint16(data[off:]))
r := int16(binary.LittleEndian.Uint16(data[off+2:]))
f := NewFrame(
Sample(float64(l)/32768.0),
Sample(float64(r)/32768.0),
)
if !s.WriteFrame(f) {
break
}
written++
}
return written
}

// ReadFrame consumes one frame from the ring buffer.
// Returns silence (0,0) on underrun.
func (s *StreamSource) ReadFrame() Frame {
rp := s.readPos.Load()
wp := s.writePos.Load()
if rp >= wp {
s.Underruns.Add(1)
return NewFrame(0, 0)
}
f := s.ring[int(rp)&s.mask]
s.readPos.Add(1)
return f
}

// NextFrame implements the frameSource interface.
func (s *StreamSource) NextFrame() Frame {
return s.ReadFrame()
}

// Available returns the number of frames currently buffered.
func (s *StreamSource) Available() int {
return int(s.writePos.Load() - s.readPos.Load())
}

// Buffered returns the fill ratio (0.0 = empty, 1.0 = full).
func (s *StreamSource) Buffered() float64 {
return float64(s.Available()) / float64(s.size)
}

// Stats returns diagnostic counters.
func (s *StreamSource) Stats() StreamStats {
return StreamStats{
Available: s.Available(),
Capacity: s.size,
Buffered: s.Buffered(),
Written: s.Written.Load(),
Underruns: s.Underruns.Load(),
Overflows: s.Overflows.Load(),
}
}

// StreamStats exposes runtime telemetry for the stream buffer.
type StreamStats struct {
Available int `json:"available"`
Capacity int `json:"capacity"`
Buffered float64 `json:"buffered"`
Written uint64 `json:"written"`
Underruns uint64 `json:"underruns"`
Overflows uint64 `json:"overflows"`
}

// --- StreamResampler ---

// StreamResampler wraps a StreamSource and rate-converts from the stream's
// native sample rate to the target output rate using linear interpolation.
// Consumes input frames on demand — no buffering beyond the ring buffer.
type StreamResampler struct {
src *StreamSource
ratio float64 // inputRate / outputRate (< 1 when upsampling)
pos float64
prev Frame
curr Frame
}

// NewStreamResampler creates a streaming resampler.
func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler {
if src == nil || outputRate <= 0 || src.SampleRate <= 0 {
return &StreamResampler{src: src, ratio: 1.0}
}
return &StreamResampler{
src: src,
ratio: float64(src.SampleRate) / outputRate,
}
}

// NextFrame returns the next interpolated frame at the output rate.
// Implements the frameSource interface.
func (r *StreamResampler) NextFrame() Frame {
if r.src == nil {
return NewFrame(0, 0)
}

// Consume input samples as the fractional position advances
for r.pos >= 1.0 {
r.prev = r.curr
r.curr = r.src.ReadFrame()
r.pos -= 1.0
}

frac := r.pos
l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac
ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac
r.pos += r.ratio
return NewFrame(Sample(l), Sample(ri))
}

// --- Ingest helpers ---

// IngestReader continuously reads S16LE stereo PCM from an io.Reader into
// a StreamSource. Blocks until the reader returns an error or io.EOF.
// Designed to run as a goroutine.
func IngestReader(r io.Reader, dst *StreamSource) error {
buf := make([]byte, 16384) // 4096 frames per read (16KB)
for {
n, err := r.Read(buf)
if n > 0 {
dst.WritePCM(buf[:n])
}
if err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("audio ingest: %w", err)
}
}
}

+ 376
- 0
internal/audio/stream_test.go Просмотреть файл

@@ -0,0 +1,376 @@
package audio

import (
"bytes"
"encoding/binary"
"io"
"math"
"sync"
"sync/atomic"
"testing"
)

func TestStreamSource_WriteRead(t *testing.T) {
s := NewStreamSource(1024, 44100)
if s.size != 1024 {
t.Fatalf("expected size 1024, got %d", s.size)
}

// Write and read a frame
f := NewFrame(0.5, -0.3)
if !s.WriteFrame(f) {
t.Fatal("write failed")
}
if s.Available() != 1 {
t.Fatalf("expected 1 available, got %d", s.Available())
}

out := s.ReadFrame()
if out.L != 0.5 || out.R != -0.3 {
t.Fatalf("read mismatch: got L=%.2f R=%.2f", out.L, out.R)
}
if s.Available() != 0 {
t.Fatalf("expected 0 available, got %d", s.Available())
}
}

func TestStreamSource_Underrun(t *testing.T) {
s := NewStreamSource(16, 44100)

// Read from empty buffer — should return silence
f := s.ReadFrame()
if f.L != 0 || f.R != 0 {
t.Fatal("expected silence on underrun")
}
if s.Underruns.Load() != 1 {
t.Fatalf("expected 1 underrun, got %d", s.Underruns.Load())
}
}

func TestStreamSource_Overflow(t *testing.T) {
s := NewStreamSource(4, 44100) // size rounds up to 4

// Fill completely
for i := 0; i < 4; i++ {
if !s.WriteFrame(NewFrame(Sample(float64(i)/10), 0)) {
t.Fatalf("write %d failed", i)
}
}

// Next write should overflow
if s.WriteFrame(NewFrame(1, 1)) {
t.Fatal("expected overflow")
}
if s.Overflows.Load() != 1 {
t.Fatalf("expected 1 overflow, got %d", s.Overflows.Load())
}
}

func TestStreamSource_PowerOf2Rounding(t *testing.T) {
tests := []struct{ in, expect int }{
{1, 1}, {2, 2}, {3, 4}, {5, 8}, {100, 128}, {1024, 1024}, {1025, 2048},
}
for _, tt := range tests {
s := NewStreamSource(tt.in, 44100)
if s.size != tt.expect {
t.Fatalf("NewStreamSource(%d): size=%d, expected %d", tt.in, s.size, tt.expect)
}
}
}

func TestStreamSource_FIFO(t *testing.T) {
s := NewStreamSource(64, 44100)
n := 50
for i := 0; i < n; i++ {
s.WriteFrame(NewFrame(Sample(float64(i)), 0))
}
for i := 0; i < n; i++ {
f := s.ReadFrame()
if int(f.L) != i {
t.Fatalf("FIFO order broken at %d: got %d", i, int(f.L))
}
}
}

func TestStreamSource_Wraparound(t *testing.T) {
s := NewStreamSource(8, 44100) // size = 8

// Write and read more than buffer size to test wraparound
for round := 0; round < 10; round++ {
for i := 0; i < 8; i++ {
val := float64(round*8 + i)
if !s.WriteFrame(NewFrame(Sample(val), 0)) {
t.Fatalf("write failed round=%d i=%d", round, i)
}
}
for i := 0; i < 8; i++ {
expected := float64(round*8 + i)
f := s.ReadFrame()
if float64(f.L) != expected {
t.Fatalf("round=%d i=%d: got %f expected %f", round, i, float64(f.L), expected)
}
}
}

stats := s.Stats()
if stats.Underruns != 0 || stats.Overflows != 0 {
t.Fatalf("unexpected errors: underruns=%d overflows=%d", stats.Underruns, stats.Overflows)
}
}

func TestStreamSource_WritePCM(t *testing.T) {
s := NewStreamSource(256, 44100)

// Create 10 stereo frames of S16LE PCM
var buf bytes.Buffer
for i := 0; i < 10; i++ {
l := int16(i * 1000)
r := int16(-i * 1000)
binary.Write(&buf, binary.LittleEndian, l)
binary.Write(&buf, binary.LittleEndian, r)
}

written := s.WritePCM(buf.Bytes())
if written != 10 {
t.Fatalf("expected 10 frames, wrote %d", written)
}

// Verify first frame
f := s.ReadFrame()
if f.L != 0 || f.R != 0 {
t.Fatalf("frame 0: L=%.4f R=%.4f, expected 0", f.L, f.R)
}
// Verify frame 5
for i := 1; i < 5; i++ {
s.ReadFrame()
}
f = s.ReadFrame()
expectedL := 5000.0 / 32768.0
if math.Abs(float64(f.L)-expectedL) > 0.001 {
t.Fatalf("frame 5 L=%.4f, expected %.4f", f.L, expectedL)
}
}

func TestStreamSource_ConcurrentSPSC(t *testing.T) {
s := NewStreamSource(4096, 44100)
frames := 50000
var producerDone atomic.Bool

var wg sync.WaitGroup
wg.Add(2)

// Producer
go func() {
defer wg.Done()
for i := 0; i < frames; i++ {
for !s.WriteFrame(NewFrame(Sample(float64(i+1)), 0)) {
// Buffer full — yield
}
}
producerDone.Store(true)
}()

// Consumer
var lastVal float64
var orderOK = true
var readCount int
go func() {
defer wg.Done()
for {
if s.Available() == 0 {
if producerDone.Load() {
break
}
continue
}
f := s.ReadFrame()
readCount++
v := float64(f.L)
if v > 0 && v < lastVal {
orderOK = false
}
if v > 0 {
lastVal = v
}
}
}()

wg.Wait()

if !orderOK {
t.Fatal("FIFO order broken in concurrent SPSC")
}
if readCount < frames/2 {
t.Fatalf("read too few frames: %d (expected ~%d)", readCount, frames)
}
}

// --- StreamResampler tests ---

func TestStreamResampler_1to1(t *testing.T) {
s := NewStreamSource(256, 44100)
r := NewStreamResampler(s, 44100) // 1:1

for i := 0; i < 100; i++ {
s.WriteFrame(NewFrame(Sample(float64(i)/100), 0))
}

// At 1:1 ratio, output should track input with a small startup delay.
// Skip first few samples (resampler priming), then verify monotonic increase.
prev := -1.0
for i := 0; i < 90; i++ {
f := r.NextFrame()
v := float64(f.L)
if i > 5 && v < prev-0.001 {
t.Fatalf("sample %d: non-monotonic %.4f < %.4f", i, v, prev)
}
if v > 0 {
prev = v
}
}
// Final value should be close to 0.9 (we wrote 0..0.99)
if prev < 0.5 {
t.Fatalf("final value %.4f too low (expected > 0.5)", prev)
}
}

func TestStreamResampler_Upsample(t *testing.T) {
// 44100 → 228000 (ratio ≈ 0.1934, ~5.17× upsampling)
s := NewStreamSource(4096, 44100)
r := NewStreamResampler(s, 228000)

// Write 1000 frames of a 1kHz sine at 44100 Hz
for i := 0; i < 1000; i++ {
v := math.Sin(2 * math.Pi * 1000 * float64(i) / 44100)
s.WriteFrame(NewFrame(Sample(v), Sample(v)))
}

// Read upsampled output — should be ~5170 samples for 1000 input
// (minus a few for resampler priming)
out := make([]float64, 0, 5200)
for i := 0; i < 5000; i++ {
f := r.NextFrame()
out = append(out, float64(f.L))
}

// Verify the output is a smooth sine, not clicks or zeros
// Check that max amplitude is close to 1.0
maxAmp := 0.0
for _, v := range out[100:] { // skip initial ramp
if math.Abs(v) > maxAmp {
maxAmp = math.Abs(v)
}
}
if maxAmp < 0.8 {
t.Fatalf("max amplitude %.4f too low (expected ~1.0)", maxAmp)
}

// Check smoothness: no sudden jumps > 0.1 between adjacent samples
maxJump := 0.0
for i := 101; i < len(out); i++ {
d := math.Abs(out[i] - out[i-1])
if d > maxJump {
maxJump = d
}
}
// At 228kHz with 1kHz tone: max step ≈ sin(2π*1000/228000) ≈ 0.0276
if maxJump > 0.05 {
t.Fatalf("max inter-sample jump %.4f (expected < 0.05 for smooth sine)", maxJump)
}
}

func TestStreamResampler_Downsample(t *testing.T) {
// 96000 → 44100 (ratio ≈ 2.177, downsampling)
s := NewStreamSource(8192, 96000)
r := NewStreamResampler(s, 44100)

// Write 4000 frames at 96kHz
for i := 0; i < 4000; i++ {
v := math.Sin(2 * math.Pi * 440 * float64(i) / 96000)
s.WriteFrame(NewFrame(Sample(v), 0))
}

// Should get ~1837 output frames (4000 × 44100/96000)
count := 0
for i := 0; i < 1800; i++ {
f := r.NextFrame()
_ = f
count++
}
if count != 1800 {
t.Fatalf("expected 1800 reads, got %d", count)
}
}

func TestStreamResampler_NilSource(t *testing.T) {
r := NewStreamResampler(nil, 228000)
f := r.NextFrame()
if f.L != 0 || f.R != 0 {
t.Fatal("expected silence from nil source")
}
}

// --- IngestReader test ---

func TestIngestReader(t *testing.T) {
s := NewStreamSource(4096, 44100)

// Create PCM data: 100 stereo frames
var buf bytes.Buffer
for i := 0; i < 100; i++ {
l := int16(i * 100)
r := int16(-i * 100)
binary.Write(&buf, binary.LittleEndian, l)
binary.Write(&buf, binary.LittleEndian, r)
}

// IngestReader should read all data and return nil (EOF)
err := IngestReader(bytes.NewReader(buf.Bytes()), s)
if err != nil {
t.Fatalf("IngestReader: %v", err)
}

if s.Available() != 100 {
t.Fatalf("expected 100 frames, got %d", s.Available())
}

// Verify first and last
f := s.ReadFrame()
if f.L != 0 {
t.Fatalf("frame 0 L=%.4f, expected 0", f.L)
}
for i := 1; i < 99; i++ {
s.ReadFrame()
}
f = s.ReadFrame()
expectedL := 9900.0 / 32768.0
if math.Abs(float64(f.L)-expectedL) > 0.01 {
t.Fatalf("frame 99 L=%.4f, expected ~%.4f", f.L, expectedL)
}
}

func TestIngestReader_Error(t *testing.T) {
s := NewStreamSource(256, 44100)
errReader := &errAfterN{n: 10}
err := IngestReader(errReader, s)
if err == nil {
t.Fatal("expected error")
}
}

type errAfterN struct {
n, count int
}

func (r *errAfterN) Read(p []byte) (int, error) {
if r.count >= r.n {
return 0, io.ErrUnexpectedEOF
}
r.count++
// Return 4 bytes (one stereo frame)
if len(p) >= 4 {
p[0], p[1], p[2], p[3] = 0, 0, 0, 0
return 4, nil
}
return 0, nil
}

+ 20
- 9
internal/config/config.go Просмотреть файл

@@ -35,8 +35,8 @@ type RDSConfig struct {
type FMConfig struct {
FrequencyMHz float64 `json:"frequencyMHz"`
StereoEnabled bool `json:"stereoEnabled"`
PilotLevel float64 `json:"pilotLevel"` // linear injection level in composite (e.g. 0.1 = 10%)
RDSInjection float64 `json:"rdsInjection"` // linear injection level in composite (e.g. 0.05 = 5%)
PilotLevel float64 `json:"pilotLevel"` // fraction of ±75kHz deviation (0.09 = 9%, ITU standard)
RDSInjection float64 `json:"rdsInjection"` // fraction of ±75kHz deviation (0.04 = 4%, typical)
PreEmphasisTauUS float64 `json:"preEmphasisTauUS"` // time constant in µs: 50 (EU) or 75 (US), 0=off
OutputDrive float64 `json:"outputDrive"`
CompositeRateHz int `json:"compositeRateHz"` // internal DSP/MPX sample rate
@@ -44,13 +44,19 @@ type FMConfig struct {
LimiterEnabled bool `json:"limiterEnabled"`
LimiterCeiling float64 `json:"limiterCeiling"`
FMModulationEnabled bool `json:"fmModulationEnabled"`
MpxGain float64 `json:"mpxGain"` // hardware calibration: scales entire composite output (default 1.0)
BS412Enabled bool `json:"bs412Enabled"` // ITU-R BS.412 MPX power limiter (EU requirement)
BS412ThresholdDBr float64 `json:"bs412ThresholdDBr"` // power limit in dBr (0 = standard, +3 = relaxed)
}

type BackendConfig struct {
Kind string `json:"kind"`
Device string `json:"device"`
OutputPath string `json:"outputPath"`
DeviceSampleRateHz float64 `json:"deviceSampleRateHz"` // actual SDR device rate; 0 = same as compositeRateHz
Kind string `json:"kind"`
Driver string `json:"driver,omitempty"`
Device string `json:"device"`
URI string `json:"uri,omitempty"`
DeviceArgs map[string]string `json:"deviceArgs,omitempty"`
OutputPath string `json:"outputPath"`
DeviceSampleRateHz float64 `json:"deviceSampleRateHz"` // actual SDR device rate; 0 = same as compositeRateHz
}

type ControlConfig struct {
@@ -64,8 +70,8 @@ func Default() Config {
FM: FMConfig{
FrequencyMHz: 100.0,
StereoEnabled: true,
PilotLevel: 0.1,
RDSInjection: 0.05,
PilotLevel: 0.09,
RDSInjection: 0.04,
PreEmphasisTauUS: 50,
OutputDrive: 0.5,
CompositeRateHz: 228000,
@@ -73,6 +79,7 @@ func Default() Config {
LimiterEnabled: true,
LimiterCeiling: 1.0,
FMModulationEnabled: true,
MpxGain: 1.0,
},
Backend: BackendConfig{Kind: "file", OutputPath: "build/out/composite.f32"},
Control: ControlConfig{ListenAddress: "127.0.0.1:8088"},
@@ -128,7 +135,7 @@ func (c Config) Validate() error {
if c.FM.RDSInjection < 0 || c.FM.RDSInjection > 0.15 {
return fmt.Errorf("fm.rdsInjection out of range")
}
if c.FM.OutputDrive < 0 || c.FM.OutputDrive > 3 {
if c.FM.OutputDrive < 0 || c.FM.OutputDrive > 10 {
return fmt.Errorf("fm.outputDrive out of range (0..3)")
}
if c.FM.CompositeRateHz < 96000 || c.FM.CompositeRateHz > 1520000 {
@@ -143,6 +150,10 @@ func (c Config) Validate() error {
if c.FM.LimiterCeiling < 0 || c.FM.LimiterCeiling > 2 {
return fmt.Errorf("fm.limiterCeiling out of range")
}
if c.FM.MpxGain == 0 { c.FM.MpxGain = 1.0 } // default if omitted from JSON
if c.FM.MpxGain < 0.1 || c.FM.MpxGain > 5 {
return fmt.Errorf("fm.mpxGain out of range (0.1..5)")
}
if c.Backend.Kind == "" {
return fmt.Errorf("backend.kind is required")
}


+ 61
- 4
internal/control/control.go Просмотреть файл

@@ -3,9 +3,11 @@ package control
import (
_ "embed"
"encoding/json"
"io"
"net/http"
"sync"

"github.com/jan/fm-rds-tx/internal/audio"
"github.com/jan/fm-rds-tx/internal/config"
drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
"github.com/jan/fm-rds-tx/internal/platform"
@@ -39,10 +41,11 @@ type LivePatch struct {
}

type Server struct {
mu sync.RWMutex
cfg config.Config
tx TXController
drv platform.SoapyDriver // optional, for runtime stats
mu sync.RWMutex
cfg config.Config
tx TXController
drv platform.SoapyDriver // optional, for runtime stats
streamSrc *audio.StreamSource // optional, for live audio ingest
}

type ConfigPatch struct {
@@ -78,6 +81,12 @@ func (s *Server) SetDriver(drv platform.SoapyDriver) {
s.mu.Unlock()
}

func (s *Server) SetStreamSource(src *audio.StreamSource) {
s.mu.Lock()
s.streamSrc = src
s.mu.Unlock()
}

func (s *Server) Handler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/", s.handleUI)
@@ -88,6 +97,7 @@ func (s *Server) Handler() http.Handler {
mux.HandleFunc("/runtime", s.handleRuntime)
mux.HandleFunc("/tx/start", s.handleTXStart)
mux.HandleFunc("/tx/stop", s.handleTXStop)
mux.HandleFunc("/audio/stream", s.handleAudioStream)
return mux
}

@@ -128,6 +138,7 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
s.mu.RLock()
drv := s.drv
tx := s.tx
stream := s.streamSrc
s.mu.RUnlock()

result := map[string]any{}
@@ -137,10 +148,56 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
if tx != nil {
result["engine"] = tx.TXStats()
}
if stream != nil {
result["audioStream"] = stream.Stats()
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(result)
}

// handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
// it into the live audio ring buffer. Use with:
// curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
// ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
s.mu.RLock()
stream := s.streamSrc
s.mu.RUnlock()

if stream == nil {
http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
return
}

// Read body in chunks and push to ring buffer
buf := make([]byte, 32768)
totalFrames := 0
for {
n, err := r.Body.Read(buf)
if n > 0 {
totalFrames += stream.WritePCM(buf[:n])
}
if err != nil {
if err == io.EOF {
break
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"ok": true,
"frames": totalFrames,
"stats": stream.Stats(),
})
}

func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)


+ 1631
- 527
internal/control/ui.html
Разница между файлами не показана из-за своего большого размера
Просмотреть файл


+ 228
- 0
internal/dsp/biquad.go Просмотреть файл

@@ -0,0 +1,228 @@
package dsp

import "math"

// Biquad is a generic second-order IIR filter (direct form II transposed).
type Biquad struct {
b0, b1, b2 float64
a1, a2 float64
z1, z2 float64
}

// Process filters one sample.
func (f *Biquad) Process(in float64) float64 {
out := f.b0*in + f.z1
f.z1 = f.b1*in - f.a1*out + f.z2
f.z2 = f.b2*in - f.a2*out
return out
}

// Reset clears state.
func (f *Biquad) Reset() { f.z1 = 0; f.z2 = 0 }

// FilterChain cascades multiple biquad sections in series.
// Used for higher-order filters (e.g. 4th-order = 2 biquads).
type FilterChain struct {
Stages []Biquad
}

// Process runs input through all stages in series.
func (c *FilterChain) Process(in float64) float64 {
x := in
for i := range c.Stages {
x = c.Stages[i].Process(x)
}
return x
}

// Reset clears all filter state.
func (c *FilterChain) Reset() {
for i := range c.Stages {
c.Stages[i].Reset()
}
}

// --- Factory functions ---

// NewBiquadLPF creates a 2nd-order Butterworth lowpass (Q = 1/√2).
func NewBiquadLPF(cutoffHz, sampleRate float64) *Biquad {
return newBiquadLPFWithQ(cutoffHz, sampleRate, math.Sqrt2/2)
}

func newBiquadLPFWithQ(cutoffHz, sampleRate, q float64) *Biquad {
if cutoffHz <= 0 || sampleRate <= 0 || cutoffHz >= sampleRate/2 {
return &Biquad{b0: 1} // passthrough
}
omega := 2 * math.Pi * cutoffHz / sampleRate
cosW := math.Cos(omega)
sinW := math.Sin(omega)
alpha := sinW / (2 * q)
a0 := 1 + alpha
return &Biquad{
b0: (1 - cosW) / 2 / a0,
b1: (1 - cosW) / a0,
b2: (1 - cosW) / 2 / a0,
a1: (-2 * cosW) / a0,
a2: (1 - alpha) / a0,
}
}

// NewLPF4 creates a 4th-order Butterworth lowpass (two cascaded biquads).
func NewLPF4(cutoffHz, sampleRate float64) *FilterChain {
q1 := 1.0 / (2 * math.Cos(math.Pi/8)) // ≈ 0.5412
q2 := 1.0 / (2 * math.Cos(3*math.Pi/8)) // ≈ 1.3066
return &FilterChain{
Stages: []Biquad{
*newBiquadLPFWithQ(cutoffHz, sampleRate, q1),
*newBiquadLPFWithQ(cutoffHz, sampleRate, q2),
},
}
}

// NewLPF8 creates an 8th-order Butterworth lowpass (four cascaded biquads).
// Provides -48dB/octave rolloff. At 228kHz with fc=15kHz:
//
// 15kHz: -6dB, 19kHz: -28dB, 38kHz: -72dB, 57kHz: -108dB
func NewLPF8(cutoffHz, sampleRate float64) *FilterChain {
// 8th-order Butterworth pole angles: π/16, 3π/16, 5π/16, 7π/16
q1 := 1.0 / (2 * math.Cos(math.Pi/16)) // ≈ 0.5098
q2 := 1.0 / (2 * math.Cos(3*math.Pi/16)) // ≈ 0.6013
q3 := 1.0 / (2 * math.Cos(5*math.Pi/16)) // ≈ 0.8999
q4 := 1.0 / (2 * math.Cos(7*math.Pi/16)) // ≈ 2.5629
return &FilterChain{
Stages: []Biquad{
*newBiquadLPFWithQ(cutoffHz, sampleRate, q1),
*newBiquadLPFWithQ(cutoffHz, sampleRate, q2),
*newBiquadLPFWithQ(cutoffHz, sampleRate, q3),
*newBiquadLPFWithQ(cutoffHz, sampleRate, q4),
},
}
}

// NewNotch creates a 2nd-order IIR notch (bandstop) filter.
// Q controls width: higher Q = narrower notch.
// Typical: Q=5 → ~4kHz wide at -3dB, Q=10 → ~2kHz wide.
func NewNotch(centerHz, sampleRate, q float64) *Biquad {
if centerHz <= 0 || sampleRate <= 0 || centerHz >= sampleRate/2 {
return &Biquad{b0: 1}
}
omega := 2 * math.Pi * centerHz / sampleRate
cosW := math.Cos(omega)
alpha := math.Sin(omega) / (2 * q)
a0 := 1 + alpha
return &Biquad{
b0: 1 / a0,
b1: -2 * cosW / a0,
b2: 1 / a0,
a1: -2 * cosW / a0,
a2: (1 - alpha) / a0,
}
}

// NewChebyshevI creates an Nth-order Chebyshev Type I lowpass filter.
// Passband ripple in dB (typ. 0.5), then steep rolloff into stopband.
// Much steeper transition band than Butterworth at the same order.
// At 228kHz, 8th-order, 0.5dB ripple, fc=15kHz: -40dB@19kHz (vs -17dB Butterworth).
func NewChebyshevI(order int, rippleDB, cutoffHz, sampleRate float64) *FilterChain {
if order < 2 || order%2 != 0 {
return &FilterChain{Stages: []Biquad{{b0: 1}}}
}
if cutoffHz <= 0 || sampleRate <= 0 || cutoffHz >= sampleRate/2 {
return &FilterChain{Stages: []Biquad{{b0: 1}}}
}

N := order
nSections := N / 2

// Chebyshev parameters
epsilon := math.Sqrt(math.Pow(10, rippleDB/10) - 1)
v := math.Asinh(1/epsilon) / float64(N)

// Bilinear transform constant and frequency pre-warp
c := 2.0 * sampleRate
warp := c * math.Tan(math.Pi*cutoffHz/sampleRate)

stages := make([]Biquad, nSections)

for i := 0; i < nSections; i++ {
// Analog prototype pole (normalized Ωc=1)
angle := float64(2*i+1) * math.Pi / float64(2*N)
sigmaN := -math.Sinh(v) * math.Sin(angle)
omegaN := math.Cosh(v) * math.Cos(angle)

// Scale to actual cutoff frequency
sigma := sigmaN * warp
omega := omegaN * warp

// Analog section: H(s) = A / (s² + Bs + A)
A := sigma*sigma + omega*omega
B := -2 * sigma // positive (sigma is negative)

// Bilinear transform to digital biquad
c2 := c * c
a0 := c2 + B*c + A

stages[i] = Biquad{
b0: A / a0,
b1: 2 * A / a0,
b2: A / a0,
a1: (-2*c2 + 2*A) / a0,
a2: (c2 - B*c + A) / a0,
}
}

// Normalize DC gain to unity (Chebyshev even-order has -ripple at DC)
dcGain := 1.0
for _, s := range stages {
dcGain *= (s.b0 + s.b1 + s.b2) / (1 + s.a1 + s.a2)
}
if dcGain > 0 {
corr := 1.0 / dcGain
stages[0].b0 *= corr
stages[0].b1 *= corr
stages[0].b2 *= corr
}

return &FilterChain{Stages: stages}
}

// --- Broadcast-specific filter factories ---

// NewAudioLPF creates the broadcast-standard audio lowpass at 15kHz.
// 8th-order Chebyshev Type I with 0.5dB passband ripple.
// Flat to 15kHz, then steep wall: -40dB@19kHz (vs -17dB Butterworth).
// Two passes through clip-filter-clip: -80dB broadband at 19kHz.
func NewAudioLPF(sampleRate float64) *FilterChain {
return NewChebyshevI(8, 0.5, 15000, sampleRate)
}

// NewPilotNotch creates a double-cascade 19kHz notch for maximum
// rejection at the pilot frequency. Q=15: only 1.3kHz wide (18.4–19.6kHz).
// The 8th-order LPF handles broadband; this kills the exact 19kHz peak.
func NewPilotNotch(sampleRate float64) *FilterChain {
return &FilterChain{
Stages: []Biquad{
*NewNotch(19000, sampleRate, 15),
*NewNotch(19000, sampleRate, 15),
},
}
}

// NewCompositeProtection creates double-cascade notch filters for the
// composite clipper. Q=10: ~1.9kHz wide at 19kHz, ~5.7kHz wide at 57kHz.
// Narrow enough to preserve audio/stereo, deep enough to protect pilot/RDS.
func NewCompositeProtection(sampleRate float64) (notch19, notch57 *FilterChain) {
notch19 = &FilterChain{
Stages: []Biquad{
*NewNotch(19000, sampleRate, 10),
*NewNotch(19000, sampleRate, 10),
},
}
notch57 = &FilterChain{
Stages: []Biquad{
*NewNotch(57000, sampleRate, 10),
*NewNotch(57000, sampleRate, 10),
},
}
return
}

+ 154
- 0
internal/dsp/bs412.go Просмотреть файл

@@ -0,0 +1,154 @@
package dsp

import "math"

// BS412Limiter implements ITU-R BS.412 MPX power limiting.
// Measures the rolling 60-second average power of the composite signal
// and reduces audio gain when the power exceeds the threshold.
//
// The threshold is specified in dBr where 0 dBr is the reference power
// of a fully modulated mono signal (composite peak = 1.0, power = 0.5).
//
// Pilot and RDS power are accounted for: the audio power budget is
// reduced by their constant contribution so the total stays within limits.
type BS412Limiter struct {
enabled bool
thresholdPow float64 // linear power threshold for total MPX
audioBudget float64 // = thresholdPow - pilotPow - rdsPow

// Rolling 60-second power integrator
powerBuf []float64 // per-chunk average power values
bufIdx int
bufFull bool // true once the buffer has wrapped at least once
powerSum float64

// Slow gain controller
gain float64 // current output gain (0..1)
attackCoeff float64 // gain reduction speed
releaseCoeff float64 // gain recovery speed
}

// NewBS412Limiter creates a BS.412 MPX power limiter.
//
// Parameters:
// - thresholdDBr: power limit in dBr (0 = standard, +3 = relaxed)
// - pilotLevel: pilot amplitude in composite (e.g. 0.09)
// - rdsInjection: RDS amplitude in composite (e.g. 0.04)
// - chunkDurationSec: duration of each processing chunk (e.g. 0.05 for 50ms)
func NewBS412Limiter(thresholdDBr, pilotLevel, rdsInjection, chunkDurationSec float64) *BS412Limiter {
// Reference power: 0 dBr = power of mono sine at peak=1.0 = 0.5
refPower := 0.5
thresholdPow := refPower * math.Pow(10, thresholdDBr/10)

// Constant power contributions from pilot and RDS
pilotPow := pilotLevel * pilotLevel / 2 // sine wave RMS²
rdsPow := rdsInjection * rdsInjection / 4 // BPSK has ~half the power of a sine

audioBudget := thresholdPow - pilotPow - rdsPow
if audioBudget < 0.01 {
audioBudget = 0.01
}

// 60-second window in chunks
windowSec := 60.0
bufLen := int(math.Ceil(windowSec / chunkDurationSec))
if bufLen < 10 {
bufLen = 10
}

// Attack: ~2 seconds (slow, avoids pumping)
// Release: ~5 seconds (very slow, smooth recovery)
attackTC := 2.0 / chunkDurationSec // time constant in chunks
releaseTC := 5.0 / chunkDurationSec

return &BS412Limiter{
enabled: true,
thresholdPow: thresholdPow,
audioBudget: audioBudget,
powerBuf: make([]float64, bufLen),
gain: 1.0,
attackCoeff: 1.0 - math.Exp(-1.0/attackTC),
releaseCoeff: 1.0 - math.Exp(-1.0/releaseTC),
}
}

// ProcessChunk measures the audio power of a chunk and returns the
// gain factor to apply to the audio composite for BS.412 compliance.
// Call once per chunk with the average audio power of that chunk.
//
// audioPower = (1/N) × Σ sample² over the chunk's audio composite samples.
func (l *BS412Limiter) ProcessChunk(audioPower float64) float64 {
if !l.enabled {
return 1.0
}

// Update rolling 60-second power average
old := l.powerBuf[l.bufIdx]
l.powerBuf[l.bufIdx] = audioPower
l.powerSum += audioPower - old
l.bufIdx++
if l.bufIdx >= len(l.powerBuf) {
l.bufIdx = 0
l.bufFull = true
}

// Calculate average power over the window
var count int
if l.bufFull {
count = len(l.powerBuf)
} else {
count = l.bufIdx
}
if count < 1 {
return 1.0
}
avgPower := l.powerSum / float64(count)

// Target gain: bring average audio power to budget
targetGain := 1.0
if avgPower > l.audioBudget && avgPower > 0 {
targetGain = math.Sqrt(l.audioBudget / avgPower)
}

// Smooth gain changes (slow attack, slower release)
if targetGain < l.gain {
l.gain += l.attackCoeff * (targetGain - l.gain)
} else {
l.gain += l.releaseCoeff * (targetGain - l.gain)
}

// Clamp
if l.gain < 0.01 {
l.gain = 0.01
}
if l.gain > 1.0 {
l.gain = 1.0
}

return l.gain
}

// CurrentGain returns the current gain factor (0..1).
// Called at the start of each chunk to get the gain to apply.
func (l *BS412Limiter) CurrentGain() float64 {
return l.gain
}

// CurrentGainDB returns the current gain reduction in dB (negative = reducing).
func (l *BS412Limiter) CurrentGainDB() float64 {
if l.gain <= 0 {
return -100
}
return 20 * math.Log10(l.gain)
}

// Reset clears the power history and restores unity gain.
func (l *BS412Limiter) Reset() {
for i := range l.powerBuf {
l.powerBuf[i] = 0
}
l.bufIdx = 0
l.bufFull = false
l.powerSum = 0
l.gain = 1.0
}

+ 68
- 0
internal/dsp/stereolimiter.go Просмотреть файл

@@ -0,0 +1,68 @@
package dsp

import "math"

// StereoLimiter applies identical gain reduction to L and R channels,
// driven by the peak of max(|L|, |R|). This preserves the stereo image
// while preventing either channel from exceeding the ceiling.
//
// Attack is INSTANTANEOUS — gain is reduced in the same sample that
// exceeds the ceiling. This avoids overshoot entirely, which is critical
// because overshoot causes composite clipping that destroys pilot/RDS.
// Unlike hard clipping, gain scaling preserves the waveform shape and
// does not create harmonics.
//
// Release is smooth (exponential decay) to avoid audible pumping.
type StereoLimiter struct {
ceiling float64
releaseCoeff float64
gainReduction float64
}

// NewStereoLimiter creates a stereo-linked limiter with instant attack.
// releaseMs controls how quickly gain recovers after a peak (typ. 50-200ms).
func NewStereoLimiter(ceiling, attackMs, releaseMs, sampleRate float64) *StereoLimiter {
if ceiling <= 0 {
ceiling = 1.0
}
if releaseMs <= 0 {
releaseMs = 100
}
releaseSamples := releaseMs * sampleRate / 1000

return &StereoLimiter{
ceiling: ceiling,
releaseCoeff: 1.0 - math.Exp(-1.0/releaseSamples),
}
}

// Process applies stereo-linked limiting. Both channels receive the
// same gain factor, determined by the louder of the two.
//
// If the peak exceeds ceiling, gain is INSTANTLY reduced (zero overshoot).
// When the signal drops below ceiling, gain recovers smoothly via release.
func (l *StereoLimiter) Process(left, right float64) (float64, float64) {
peak := math.Max(math.Abs(left), math.Abs(right))

// Target: how much gain reduction do we need right now?
targetReduction := 0.0
if peak > l.ceiling {
targetReduction = 1.0 - l.ceiling/peak
}

// Instant attack: if we need MORE reduction, apply it NOW.
// Smooth release: if we need LESS reduction, decay slowly.
if targetReduction > l.gainReduction {
l.gainReduction = targetReduction // instant
} else {
l.gainReduction += l.releaseCoeff * (targetReduction - l.gainReduction) // smooth
}

gain := 1.0 - l.gainReduction
return left * gain, right * gain
}

// Reset clears the limiter state.
func (l *StereoLimiter) Reset() {
l.gainReduction = 0
}

+ 153
- 23
internal/offline/generator.go Просмотреть файл

@@ -31,6 +31,7 @@ type LiveParams struct {
RDSEnabled bool
LimiterEnabled bool
LimiterCeiling float64
MpxGain float64 // hardware calibration factor for composite output
}

// PreEmphasizedSource wraps an audio source and applies pre-emphasis.
@@ -77,24 +78,53 @@ type Generator struct {
stereoEncoder stereo.StereoEncoder
rdsEnc *rds.Encoder
combiner mpx.DefaultCombiner
limiter *dsp.MPXLimiter
fmMod *dsp.FMModulator
sampleRate float64
initialized bool
frameSeq uint64

// Broadcast-standard clip-filter-clip chain (per channel L/R):
//
// PreEmph → LPF₁(14kHz) → Notch(19kHz) → ×Drive
// → StereoLimiter (slow AGC: raises average level)
// → Clip₁ → LPF₂(14kHz) [cleanup] → Clip₂ [catches LPF overshoots]
// → Stereo Encode → Composite Clip → Notch₁₉ → Notch₅₇
// → + Pilot → + RDS → FM
//
audioLPF_L *dsp.FilterChain // 14kHz 8th-order (pre-clip)
audioLPF_R *dsp.FilterChain
pilotNotchL *dsp.FilterChain // 19kHz double-notch (guard band)
pilotNotchR *dsp.FilterChain
limiter *dsp.StereoLimiter // slow compressor (raises average, clips catch peaks)
cleanupLPF_L *dsp.FilterChain // 14kHz 8th-order (post-clip cleanup)
cleanupLPF_R *dsp.FilterChain
mpxNotch19 *dsp.FilterChain // composite clipper protection
mpxNotch57 *dsp.FilterChain
bs412 *dsp.BS412Limiter // ITU-R BS.412 MPX power limiter (optional)

// Pre-allocated frame buffer — reused every GenerateFrame call.
frameBuf *output.CompositeFrame
bufCap int

// Live-updatable DSP parameters — written by control API, read per chunk.
liveParams atomic.Pointer[LiveParams]

// Optional external audio source (e.g. StreamResampler for live audio).
// When set, takes priority over WAV/tones in sourceFor().
externalSource frameSource
}

func NewGenerator(cfg cfgpkg.Config) *Generator {
return &Generator{cfg: cfg}
}

// SetExternalSource sets a live audio source (e.g. StreamResampler) that
// takes priority over WAV/tone sources. Must be called before the first
// GenerateFrame() call (i.e. before init).
func (g *Generator) SetExternalSource(src frameSource) {
g.externalSource = src
}

// UpdateLive hot-swaps DSP parameters. Thread-safe — called from control API,
// applied at the next chunk boundary by the DSP goroutine.
func (g *Generator) UpdateLive(p LiveParams) {
@@ -107,7 +137,7 @@ func (g *Generator) CurrentLiveParams() LiveParams {
if lp := g.liveParams.Load(); lp != nil {
return *lp
}
return LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0}
return LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0, MpxGain: 1.0}
}

// RDSEncoder returns the live RDS encoder, or nil if RDS is disabled.
@@ -140,12 +170,42 @@ func (g *Generator) init() {
}
ceiling := g.cfg.FM.LimiterCeiling
if ceiling <= 0 { ceiling = 1.0 }
if g.cfg.FM.LimiterEnabled {
g.limiter = dsp.NewMPXLimiter(ceiling, 0.1, 50, g.sampleRate)

// Broadcast clip-filter-clip chain:
// Pre-clip: 14kHz LPF (8th-order) + 19kHz double-notch (per channel)
g.audioLPF_L = dsp.NewAudioLPF(g.sampleRate)
g.audioLPF_R = dsp.NewAudioLPF(g.sampleRate)
g.pilotNotchL = dsp.NewPilotNotch(g.sampleRate)
g.pilotNotchR = dsp.NewPilotNotch(g.sampleRate)
// Slow compressor: 5ms attack / 200ms release. Brings average level UP.
// The clips after it catch the peaks the limiter's attack time misses.
// This is the "slow-to-fast progression" from broadcast processing:
// slow limiter → fast clips.
g.limiter = dsp.NewStereoLimiter(ceiling, 5, 200, g.sampleRate)
// Post-clip cleanup: second 14kHz LPF pass (removes clip harmonics)
g.cleanupLPF_L = dsp.NewAudioLPF(g.sampleRate)
g.cleanupLPF_R = dsp.NewAudioLPF(g.sampleRate)
// Composite clipper protection: double-notch at 19kHz + 57kHz
g.mpxNotch19, g.mpxNotch57 = dsp.NewCompositeProtection(g.sampleRate)
// BS.412 MPX power limiter (EU/CH requirement for licensed FM)
if g.cfg.FM.BS412Enabled {
chunkSec := 0.05 // 50ms chunks (matches engine default)
g.bs412 = dsp.NewBS412Limiter(
g.cfg.FM.BS412ThresholdDBr,
g.cfg.FM.PilotLevel,
g.cfg.FM.RDSInjection,
chunkSec,
)
}
if g.cfg.FM.FMModulationEnabled {
g.fmMod = dsp.NewFMModulator(g.sampleRate)
if g.cfg.FM.MaxDeviationHz > 0 { g.fmMod.MaxDeviation = g.cfg.FM.MaxDeviationHz }
maxDev := g.cfg.FM.MaxDeviationHz
if maxDev > 0 {
if g.cfg.FM.MpxGain > 0 && g.cfg.FM.MpxGain != 1.0 {
maxDev *= g.cfg.FM.MpxGain
}
g.fmMod.MaxDeviation = maxDev
}
}

// Seed initial live params from config
@@ -157,12 +217,16 @@ func (g *Generator) init() {
RDSEnabled: g.cfg.RDS.Enabled,
LimiterEnabled: g.cfg.FM.LimiterEnabled,
LimiterCeiling: ceiling,
MpxGain: g.cfg.FM.MpxGain,
})

g.initialized = true
}

func (g *Generator) sourceFor(sampleRate float64) (frameSource, SourceInfo) {
if g.externalSource != nil {
return g.externalSource, SourceInfo{Kind: "stream", SampleRate: sampleRate, Detail: "live audio"}
}
if g.cfg.Audio.InputPath != "" {
if src, err := audio.LoadWAVSource(g.cfg.Audio.InputPath); err == nil {
return audio.NewResampledSource(src, sampleRate), SourceInfo{Kind: "wav", SampleRate: float64(src.SampleRate), Detail: g.cfg.Audio.InputPath}
@@ -196,36 +260,96 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame
lp := g.liveParams.Load()
if lp == nil {
// Fallback: should never happen after init(), but be safe
lp = &LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0}
lp = &LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0, MpxGain: 1.0}
}

// Apply live combiner gains
g.combiner.PilotGain = lp.PilotLevel
g.combiner.RDSGain = lp.RDSInjection

// Broadcast clip-filter-clip FM MPX signal chain:
//
// Audio L/R → PreEmphasis
// → LPF₁ (14kHz, 8th-order) → 19kHz Notch (double)
// → × OutputDrive → HardClip₁ (ceiling)
// → LPF₂ (14kHz, 8th-order) [removes clip₁ harmonics]
// → HardClip₂ (ceiling) [catches LPF₂ overshoots]
// → Stereo Encode
// Audio MPX (mono + stereo sub)
// → HardClip₃ (ceiling) [composite deviation control]
// → 19kHz Notch (double) [protect pilot band]
// → 57kHz Notch (double) [protect RDS band]
// + Pilot 19kHz (fixed, NEVER clipped)
// + RDS 57kHz (fixed, NEVER clipped)
// → FM Modulator
//
// Guard band depth at 19kHz: LPF₁(-21dB) + Notch(-60dB) + LPF₂(-21dB)
// + CompNotch(-60dB) → broadband floor -42dB, exact 19kHz >-90dB
ceiling := lp.LimiterCeiling
if ceiling <= 0 { ceiling = 1.0 }
// Pilot and RDS are FIXED injection levels, independent of OutputDrive.
// Config values directly represent percentage of ±75kHz deviation:
// pilotLevel: 0.09 = 9% = ±6.75kHz (ITU standard)
// rdsInjection: 0.04 = 4% = ±3.0kHz (typical)
pilotAmp := lp.PilotLevel
rdsAmp := lp.RDSInjection

// BS.412 MPX power limiter: uses previous chunk's measurement to set gain.
// Power is measured during this chunk and fed back at the end.
bs412Gain := 1.0
var bs412PowerAccum float64
if g.bs412 != nil {
bs412Gain = g.bs412.CurrentGain()
}

for i := 0; i < samples; i++ {
in := g.source.NextFrame()

comps := g.stereoEncoder.Encode(in)
if !lp.StereoEnabled {
comps.Stereo = 0; comps.Pilot = 0
// --- Stage 1: Band-limit pre-emphasized audio ---
l := g.audioLPF_L.Process(float64(in.L))
l = g.pilotNotchL.Process(l)
r := g.audioLPF_R.Process(float64(in.R))
r = g.pilotNotchR.Process(r)

// --- Stage 2: Drive + Compress + Clip₁ ---
l *= lp.OutputDrive
r *= lp.OutputDrive
if g.limiter != nil {
l, r = g.limiter.Process(l, r)
}

rdsValue := 0.0
if g.rdsEnc != nil && lp.RDSEnabled {
rdsCarrier := g.stereoEncoder.RDSCarrier()
rdsValue = g.rdsEnc.NextSampleWithCarrier(rdsCarrier)
l = dsp.HardClip(l, ceiling)
r = dsp.HardClip(r, ceiling)

// --- Stage 3: Cleanup LPF + Clip₂ (overshoot compensator) ---
l = g.cleanupLPF_L.Process(l)
r = g.cleanupLPF_R.Process(r)
l = dsp.HardClip(l, ceiling)
r = dsp.HardClip(r, ceiling)

// --- Stage 4: Stereo encode ---
limited := audio.NewFrame(audio.Sample(l), audio.Sample(r))
comps := g.stereoEncoder.Encode(limited)

// --- Stage 5: Composite clip + protection ---
audioMPX := float64(comps.Mono)
if lp.StereoEnabled {
audioMPX += float64(comps.Stereo)
}
audioMPX = dsp.HardClip(audioMPX, ceiling)
audioMPX = g.mpxNotch19.Process(audioMPX)
audioMPX = g.mpxNotch57.Process(audioMPX)

composite := g.combiner.Combine(comps.Mono, comps.Stereo, comps.Pilot, rdsValue)
composite *= lp.OutputDrive
// BS.412: apply gain and measure power
if bs412Gain < 1.0 {
audioMPX *= bs412Gain
}
bs412PowerAccum += audioMPX * audioMPX

if lp.LimiterEnabled && g.limiter != nil {
composite = g.limiter.Process(composite)
composite = dsp.HardClip(composite, ceiling)
// --- Stage 6: Add protected components ---
composite := audioMPX
if lp.StereoEnabled {
composite += pilotAmp * comps.Pilot
}
if g.rdsEnc != nil && lp.RDSEnabled {
rdsCarrier := g.stereoEncoder.RDSCarrier()
rdsValue := g.rdsEnc.NextSampleWithCarrier(rdsCarrier)
composite += rdsAmp * rdsValue
}

if g.fmMod != nil {
@@ -235,6 +359,12 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame
frame.Samples[i] = output.IQSample{I: float32(composite), Q: 0}
}
}

// BS.412: feed this chunk's average audio power for next chunk's gain calculation
if g.bs412 != nil && samples > 0 {
g.bs412.ProcessChunk(bs412PowerAccum / float64(samples))
}

return frame
}



+ 23
- 10
internal/offline/generator_test.go Просмотреть файл

@@ -83,8 +83,13 @@ func TestLimiterPreventsClipping(t *testing.T) {
cfg.FM.FMModulationEnabled = false
cfg.Audio.ToneAmplitude = 0.9; cfg.Audio.Gain = 2.0; cfg.FM.OutputDrive = 1.0
frame := NewGenerator(cfg).GenerateFrame(50 * time.Millisecond)
// Audio clipped to ceiling, pilot+RDS added on top (standard broadcast).
// Total = ceiling + pilotLevel*drive + rdsInjection*drive
maxAllowed := cfg.FM.LimiterCeiling +
cfg.FM.PilotLevel*cfg.FM.OutputDrive +
cfg.FM.RDSInjection*cfg.FM.OutputDrive + 0.02
for i, s := range frame.Samples {
if math.Abs(float64(s.I)) > 1.01 { t.Fatalf("sample %d: %.4f exceeds ceiling", i, s.I) }
if math.Abs(float64(s.I)) > maxAllowed { t.Fatalf("sample %d: %.4f exceeds max %.4f", i, s.I, maxAllowed) }
}
}

@@ -110,20 +115,28 @@ func TestFMModDisabledMeansComposite(t *testing.T) {
}
}

func TestLimiterDisabledAllowsHigherPeaks(t *testing.T) {
func TestClipFilterClipAlwaysActive(t *testing.T) {
// With clip-filter-clip architecture, peak control is always active
// regardless of LimiterEnabled (legacy flag). Both configs should
// produce the same peak level.
base := cfgpkg.Default()
base.FM.FMModulationEnabled = false
base.Audio.ToneAmplitude = 0.9; base.Audio.Gain = 2.0; base.FM.OutputDrive = 1.0

cfgLim := base; cfgLim.FM.LimiterEnabled = true; cfgLim.FM.LimiterCeiling = 1.0
cfgNoLim := base; cfgNoLim.FM.LimiterEnabled = false
cfgA := base; cfgA.FM.LimiterEnabled = true; cfgA.FM.LimiterCeiling = 1.0
cfgB := base; cfgB.FM.LimiterEnabled = false; cfgB.FM.LimiterCeiling = 1.0

fLim := NewGenerator(cfgLim).GenerateFrame(50 * time.Millisecond)
fNoLim := NewGenerator(cfgNoLim).GenerateFrame(50 * time.Millisecond)
fA := NewGenerator(cfgA).GenerateFrame(50 * time.Millisecond)
fB := NewGenerator(cfgB).GenerateFrame(50 * time.Millisecond)

var maxLim, maxNoLim float64
for _, s := range fLim.Samples { if math.Abs(float64(s.I)) > maxLim { maxLim = math.Abs(float64(s.I)) } }
for _, s := range fNoLim.Samples { if math.Abs(float64(s.I)) > maxNoLim { maxNoLim = math.Abs(float64(s.I)) } }
var maxA, maxB float64
for _, s := range fA.Samples { if math.Abs(float64(s.I)) > maxA { maxA = math.Abs(float64(s.I)) } }
for _, s := range fB.Samples { if math.Abs(float64(s.I)) > maxB { maxB = math.Abs(float64(s.I)) } }

if maxNoLim <= maxLim { t.Fatalf("limiter disabled should allow higher peaks: lim=%.4f nolim=%.4f", maxLim, maxNoLim) }
// Both should be within ceiling + pilot + RDS
maxAllowed := cfgA.FM.LimiterCeiling +
cfgA.FM.PilotLevel*cfgA.FM.OutputDrive +
cfgA.FM.RDSInjection*cfgA.FM.OutputDrive + 0.02
if maxA > maxAllowed { t.Fatalf("cfgA peak %.4f exceeds %.4f", maxA, maxAllowed) }
if maxB > maxAllowed { t.Fatalf("cfgB peak %.4f exceeds %.4f", maxB, maxAllowed) }
}

+ 1
- 1
internal/platform/plutosdr/available_pluto.go Просмотреть файл

@@ -1,4 +1,4 @@
//go:build pluto && windows
//go:build pluto && (windows || linux)

package plutosdr



+ 364
- 0
internal/platform/plutosdr/pluto_linux.go Просмотреть файл

@@ -0,0 +1,364 @@
//go:build pluto && linux

package plutosdr

/*
#cgo pkg-config: libiio
#include <iio.h>
#include <stdlib.h>
#include <stdint.h>
*/
import "C"

import (
"context"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/jan/fm-rds-tx/internal/output"
"github.com/jan/fm-rds-tx/internal/platform"
)

type PlutoDriver struct {
mu sync.Mutex
cfg platform.SoapyConfig

ctx *C.struct_iio_context
txDev *C.struct_iio_device
phyDev *C.struct_iio_device
chanI *C.struct_iio_channel
chanQ *C.struct_iio_channel
chanLO *C.struct_iio_channel
buf *C.struct_iio_buffer
bufSize int

started bool
configured bool
framesWritten atomic.Uint64
samplesWritten atomic.Uint64
underruns atomic.Uint64
lastError string
lastErrorAt string
layoutLogged bool
}

func NewPlutoDriver() platform.SoapyDriver {
return &PlutoDriver{}
}

func (d *PlutoDriver) Name() string { return "pluto-iio" }

func (d *PlutoDriver) Configure(_ context.Context, cfg platform.SoapyConfig) error {
d.mu.Lock()
defer d.mu.Unlock()

d.cleanup()
d.cfg = cfg

uri := "usb:"
if cfg.Device != "" && cfg.Device != "plutosdr" {
uri = cfg.Device
}
if v, ok := cfg.DeviceArgs["uri"]; ok && v != "" {
uri = v
}

cURI := C.CString(uri)
defer C.free(unsafe.Pointer(cURI))
ctx := C.iio_create_context_from_uri(cURI)
if ctx == nil {
return fmt.Errorf("pluto: failed to create IIO context (uri=%s)", uri)
}
d.ctx = ctx

txDev := d.findDevice("cf-ad9361-dds-core-lpc")
if txDev == nil {
return fmt.Errorf("pluto: TX device 'cf-ad9361-dds-core-lpc' not found")
}
d.txDev = txDev

phyDev := d.findDevice("ad9361-phy")
if phyDev == nil {
return fmt.Errorf("pluto: PHY device 'ad9361-phy' not found")
}
d.phyDev = phyDev

phyChanTX := d.findChannel(phyDev, "voltage3", true)
if phyChanTX == nil {
phyChanTX = d.findChannel(phyDev, "voltage0", true)
}
if phyChanTX == nil {
return fmt.Errorf("pluto: PHY TX channel not found (tried voltage3, voltage0)")
}

rate := int64(cfg.SampleRateHz)
if rate < 2084000 {
rate = 2084000
}
d.cfg.SampleRateHz = float64(rate)
if err := d.writeChanAttrLL(phyChanTX, "sampling_frequency", rate); err != nil {
return err
}

bw := rate
if bw > 2000000 {
bw = 2000000
}
if err := d.writeChanAttrLL(phyChanTX, "rf_bandwidth", bw); err != nil {
return err
}

phyChanLO := d.findChannel(phyDev, "altvoltage1", true)
d.chanLO = phyChanLO
if phyChanLO != nil {
freqHz := int64(cfg.CenterFreqHz)
if freqHz <= 0 {
freqHz = 100000000
}
if err := d.writeChanAttrLL(phyChanLO, "frequency", freqHz); err != nil {
return err
}
}

attenDB := int64(0)
if cfg.GainDB > 0 {
attenDB = -int64(89 - cfg.GainDB)
if attenDB > 0 {
attenDB = 0
}
if attenDB < -89 {
attenDB = -89
}
}
_ = d.writeChanAttrLL(phyChanTX, "hardwaregain", attenDB*1000)

chanI := d.findChannel(txDev, "voltage0", true)
chanQ := d.findChannel(txDev, "voltage1", true)
if chanI == nil || chanQ == nil {
return fmt.Errorf("pluto: TX I/Q channels not found on streaming device")
}
C.iio_channel_enable(chanI)
C.iio_channel_enable(chanQ)
d.chanI = chanI
d.chanQ = chanQ

d.bufSize = int(rate) / 20
if d.bufSize < 4096 {
d.bufSize = 4096
}
buf := C.iio_device_create_buffer(txDev, C.size_t(d.bufSize), C.bool(false))
if buf == nil {
return fmt.Errorf("pluto: failed to create TX buffer (size=%d)", d.bufSize)
}
d.buf = buf
d.configured = true
return nil
}

func (d *PlutoDriver) Capabilities(_ context.Context) (platform.DeviceCaps, error) {
return platform.DeviceCaps{
MinSampleRate: 521e3,
MaxSampleRate: 61.44e6,
HasGain: true,
GainMinDB: -89,
GainMaxDB: 0,
Channels: []int{0},
}, nil
}

func (d *PlutoDriver) Start(_ context.Context) error {
d.mu.Lock()
defer d.mu.Unlock()
if !d.configured {
return fmt.Errorf("pluto: not configured")
}
if d.started {
return fmt.Errorf("pluto: already started")
}
d.started = true
return nil
}

func (d *PlutoDriver) Write(_ context.Context, frame *output.CompositeFrame) (int, error) {
d.mu.Lock()
buf := d.buf
chanI := d.chanI
chanQ := d.chanQ
started := d.started
bufSize := d.bufSize
d.mu.Unlock()

if !started || buf == nil {
return 0, fmt.Errorf("pluto: not active")
}
if frame == nil || len(frame.Samples) == 0 {
return 0, nil
}

written := 0
total := len(frame.Samples)

for written < total {
chunk := total - written
if chunk > bufSize {
chunk = bufSize
}

step := uintptr(C.iio_buffer_step(buf))
if step == 0 {
return written, fmt.Errorf("pluto: buffer step is 0")
}

ptrI := uintptr(C.iio_buffer_first(buf, chanI))
ptrQ := uintptr(C.iio_buffer_first(buf, chanQ))
if ptrI == 0 || ptrQ == 0 {
return written, fmt.Errorf("pluto: buffer_first returned null")
}

end := uintptr(C.iio_buffer_end(buf))
d.mu.Lock()
if !d.layoutLogged {
delta := int64(ptrQ) - int64(ptrI)
span := int64(0)
if end > ptrI {
span = int64(end - ptrI)
}
log.Printf("pluto-linux: buffer layout step=%d ptrI=%#x ptrQ=%#x delta=%d end=%#x span=%d bufSize=%d", step, ptrI, ptrQ, delta, end, span, bufSize)
d.layoutLogged = true
}
d.mu.Unlock()
if end > 0 {
bufSamples := int((end - ptrI) / step)
if bufSamples > 0 && chunk > bufSamples {
chunk = bufSamples
}
}

for i := 0; i < chunk; i++ {
s := frame.Samples[written+i]
*(*int16)(unsafe.Pointer(ptrI)) = int16(s.I * 32767)
*(*int16)(unsafe.Pointer(ptrQ)) = int16(s.Q * 32767)
ptrI += step
ptrQ += step
}

pushed := int(C.iio_buffer_push(buf))
if pushed < 0 {
d.mu.Lock()
d.lastError = fmt.Sprintf("buffer_push: %d", pushed)
d.lastErrorAt = time.Now().UTC().Format(time.RFC3339)
d.underruns.Add(1)
d.mu.Unlock()
return written, fmt.Errorf("pluto: buffer_push returned %d", pushed)
}

written += chunk
}

d.framesWritten.Add(1)
d.samplesWritten.Add(uint64(written))
return written, nil
}

func (d *PlutoDriver) Stop(_ context.Context) error {
d.mu.Lock()
defer d.mu.Unlock()
d.started = false
return nil
}

func (d *PlutoDriver) Flush(_ context.Context) error { return nil }

func (d *PlutoDriver) Tune(_ context.Context, freqHz float64) error {
d.mu.Lock()
defer d.mu.Unlock()
if !d.configured || d.chanLO == nil {
return fmt.Errorf("pluto: not configured or LO channel not available")
}
return d.writeChanAttrLL(d.chanLO, "frequency", int64(freqHz))
}

func (d *PlutoDriver) Close(_ context.Context) error {
d.mu.Lock()
defer d.mu.Unlock()
d.started = false
d.cleanup()
return nil
}

func (d *PlutoDriver) Stats() platform.RuntimeStats {
d.mu.Lock()
defer d.mu.Unlock()
return platform.RuntimeStats{
TXEnabled: d.started,
StreamActive: d.started && d.buf != nil,
FramesWritten: d.framesWritten.Load(),
SamplesWritten: d.samplesWritten.Load(),
Underruns: d.underruns.Load(),
LastError: d.lastError,
LastErrorAt: d.lastErrorAt,
EffectiveRate: d.cfg.SampleRateHz,
}
}

func (d *PlutoDriver) cleanup() {
if d.buf != nil {
C.iio_buffer_destroy(d.buf)
d.buf = nil
}
if d.chanI != nil {
C.iio_channel_disable(d.chanI)
d.chanI = nil
}
if d.chanQ != nil {
C.iio_channel_disable(d.chanQ)
d.chanQ = nil
}
d.chanLO = nil
if d.ctx != nil {
C.iio_context_destroy(d.ctx)
d.ctx = nil
}
d.txDev = nil
d.phyDev = nil
d.configured = false
d.layoutLogged = false
}

func (d *PlutoDriver) findDevice(name string) *C.struct_iio_device {
if d.ctx == nil {
return nil
}
cName := C.CString(name)
defer C.free(unsafe.Pointer(cName))
return C.iio_context_find_device(d.ctx, cName)
}

func (d *PlutoDriver) findChannel(dev *C.struct_iio_device, name string, isOutput bool) *C.struct_iio_channel {
if dev == nil {
return nil
}
cName := C.CString(name)
defer C.free(unsafe.Pointer(cName))
if isOutput {
return C.iio_device_find_channel(dev, cName, C.bool(true))
}
return C.iio_device_find_channel(dev, cName, C.bool(false))
}

func (d *PlutoDriver) writeChanAttrLL(ch *C.struct_iio_channel, attr string, val int64) error {
if ch == nil {
return fmt.Errorf("pluto: channel missing for attr %s", attr)
}
cAttr := C.CString(attr)
defer C.free(unsafe.Pointer(cAttr))
ret := C.iio_channel_attr_write_longlong(ch, cAttr, C.longlong(val))
if ret < 0 {
return fmt.Errorf("pluto: write attr %s failed (rc=%d)", attr, int(ret))
}
return nil
}

+ 1
- 1
internal/platform/plutosdr/stub.go Просмотреть файл

@@ -1,4 +1,4 @@
//go:build !pluto || !windows
//go:build !pluto || (!windows && !linux)

package plutosdr



+ 59
- 27
internal/platform/soapysdr/lib_unix.go Просмотреть файл

@@ -4,7 +4,9 @@ package soapysdr

import (
"fmt"
"log"
"math"
"sort"
"unsafe"
)

@@ -29,6 +31,13 @@ static const char* soapy_dlerror() {
return dlerror();
}

// Try to resolve SoapySDR_getLastError dynamically when available.
typedef const char* (*last_error_fn)(void);
static const char* call_last_error(void* fn) {
if (fn == NULL) return NULL;
return ((last_error_fn)fn)();
}

// Function call trampolines — we call function pointers loaded via dlsym.
// These avoid the complexity of calling C function pointers from Go directly.

@@ -102,22 +111,23 @@ static void call_kwargs_set(void* fn, void* kw, const char* key, const char* val
import "C"

type soapyLib struct {
handle unsafe.Pointer
fnEnumerate unsafe.Pointer
fnKwargsListClear unsafe.Pointer
fnKwargsSet unsafe.Pointer
fnMake unsafe.Pointer
fnUnmake unsafe.Pointer
fnSetSampleRate unsafe.Pointer
fnSetFrequency unsafe.Pointer
fnSetGain unsafe.Pointer
fnGetGainRange unsafe.Pointer
fnSetupStream unsafe.Pointer
fnCloseStream unsafe.Pointer
fnGetStreamMTU unsafe.Pointer
fnActivateStream unsafe.Pointer
fnDeactivateStream unsafe.Pointer
fnWriteStream unsafe.Pointer
handle unsafe.Pointer
fnEnumerate unsafe.Pointer
fnKwargsListClear unsafe.Pointer
fnKwargsSet unsafe.Pointer
fnMake unsafe.Pointer
fnUnmake unsafe.Pointer
fnSetSampleRate unsafe.Pointer
fnSetFrequency unsafe.Pointer
fnSetGain unsafe.Pointer
fnGetGainRange unsafe.Pointer
fnSetupStream unsafe.Pointer
fnCloseStream unsafe.Pointer
fnGetStreamMTU unsafe.Pointer
fnActivateStream unsafe.Pointer
fnDeactivateStream unsafe.Pointer
fnWriteStream unsafe.Pointer
fnGetLastError unsafe.Pointer
}

var libNames = []string{
@@ -164,6 +174,7 @@ func loadSoapyLib() (*soapyLib, error) {
fnActivateStream: sym("SoapySDRDevice_activateStream"),
fnDeactivateStream: sym("SoapySDRDevice_deactivateStream"),
fnWriteStream: sym("SoapySDRDevice_writeStream"),
fnGetLastError: sym("SoapySDR_getLastError"),
}, nil
}

@@ -192,20 +203,20 @@ func (lib *soapyLib) enumerate() ([]map[string]string, error) {
}
}()

devices := make([]map[string]string, int(length))
kwSize := unsafe.Sizeof(kwargs{})
devices := make([]map[string]string, 0, int(length))
kwSize := unsafe.Sizeof(C.GoKwargs{})
base := uintptr(ret)
for i := 0; i < int(length); i++ {
kw := (*kwargs)(unsafe.Pointer(base + uintptr(i)*kwSize))
entry := (*C.GoKwargs)(unsafe.Pointer(base + uintptr(i)*kwSize))
m := make(map[string]string)
for j := 0; j < int(kw.size); j++ {
keyPtr := *(*uintptr)(unsafe.Pointer(uintptr(unsafe.Pointer(kw.keys)) + uintptr(j)*unsafe.Sizeof(uintptr(0))))
valPtr := *(*uintptr)(unsafe.Pointer(uintptr(unsafe.Pointer(kw.vals)) + uintptr(j)*unsafe.Sizeof(uintptr(0))))
if keyPtr != 0 && valPtr != 0 {
m[C.GoString((*C.char)(unsafe.Pointer(keyPtr)))] = C.GoString((*C.char)(unsafe.Pointer(valPtr)))
for j := 0; j < int(entry.size); j++ {
keyPtr := *(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(entry.keys)) + uintptr(j)*unsafe.Sizeof(uintptr(0))))
valPtr := *(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(entry.vals)) + uintptr(j)*unsafe.Sizeof(uintptr(0))))
if keyPtr != nil && valPtr != nil {
m[C.GoString(keyPtr)] = C.GoString(valPtr)
}
}
devices[i] = m
devices = append(devices, m)
}
return devices, nil
}
@@ -217,9 +228,30 @@ func (lib *soapyLib) makeDevice(driver, device string, args map[string]string) (
var kw kwargs
if driver != "" { lib.kwargsSet(&kw, "driver", driver) }
if device != "" { lib.kwargsSet(&kw, "device", device) }
for k, v := range args { lib.kwargsSet(&kw, k, v) }

keys := make([]string, 0, len(args))
for k := range args {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
lib.kwargsSet(&kw, k, args[k])
}

log.Printf("soapy: makeDevice driver=%q device=%q args=%v", driver, device, args)
ret := C.call_make(lib.fnMake, unsafe.Pointer(&kw))
if ret == nil { return 0, fmt.Errorf("soapy: failed to open device") }
if ret == nil {
msg := ""
if lib.fnGetLastError != nil {
if p := C.call_last_error(lib.fnGetLastError); p != nil {
msg = C.GoString(p)
}
}
if msg != "" {
return 0, fmt.Errorf("soapy: failed to open device: %s", msg)
}
return 0, fmt.Errorf("soapy: failed to open device")
}
return uintptr(ret), nil
}



+ 16
- 1
internal/rds/encoder.go Просмотреть файл

@@ -119,6 +119,18 @@ func NewEncoder(cfg RDSConfig) (*Encoder, error) {
waveform[i] = refWaveform[idx]
}
}
// Normalize to peak=1.0 so rdsInjection directly maps to injection %.
// The raw PiFmRds waveform peaks at ~0.543, which would make config
// values misleading (0.05 would give 2.7% instead of 5%).
var peak float64
for _, v := range waveform {
if a := math.Abs(v); a > peak { peak = a }
}
if peak > 0 {
for i := range waveform {
waveform[i] /= peak
}
}
ringSize := spb + wfLen
@@ -178,14 +190,17 @@ func (e *Encoder) NextSampleWithCarrier(carrier float64) float64 {
if e.sampleCount >= e.spb {
if e.bitPos >= bitsPerGroup {
// Apply live text updates at group boundaries (~88ms at 228kHz).
// This is the only place we read the atomics — zero per-sample overhead.
// Atomics are consumed (cleared) after reading to prevent
// re-applying the same text every group and toggling A/B flag.
if ps, ok := e.livePS.Load().(string); ok && ps != "" {
e.scheduler.cfg.PS = ps
e.livePS.Store("") // consumed
}
if rt, ok := e.liveRT.Load().(string); ok && rt != "" {
e.scheduler.cfg.RT = rt
e.scheduler.rtIdx = 0 // restart RT transmission for new text
e.scheduler.rtABFlag = !e.scheduler.rtABFlag // toggle A/B per RDS spec
e.liveRT.Store("") // consumed
}
e.getRDSGroup()
e.bitPos = 0


+ 365
- 0
scripts/orangepi-build-libiio.sh Просмотреть файл

@@ -0,0 +1,365 @@
#!/usr/bin/env bash
set -Eeuo pipefail

# Orange Pi Plus 2E / Armbian Bookworm build helper for fm-rds-tx
#
# Goals:
# - install build + runtime dependencies
# - install libiio / Pluto-related userspace bits
# - build fmrtx for Linux ARM
# - collect binary + shared libraries into dist/orangepi/
#
# Notes:
# - Linux Pluto build is libiio-first (`-tags pluto`).
# - SoapySDR is optional fallback/debug tooling, not the primary Pluto path.
# - Windows Pluto path remains separate and untouched by this script.
#
# Usage:
# chmod +x scripts/orangepi-build-libiio.sh
# ./scripts/orangepi-build-libiio.sh
#
# Optional env:
# PREFIX=/opt/fm-rds-tx
# DIST_DIR=dist/orangepi
# GO_VERSION=1.22.12
# SKIP_APT=1
# SKIP_GO_INSTALL=1
# BUILD_TAGS=pluto
#
# If you want to install the packaged result into a target directory:
# PREFIX=/opt/fm-rds-tx ./scripts/orangepi-build-libiio.sh

SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
REPO_DIR="$(cd -- "${SCRIPT_DIR}/.." && pwd)"
DIST_DIR="${DIST_DIR:-${REPO_DIR}/dist/orangepi}"
BUILD_DIR="${DIST_DIR}/build"
RUNTIME_DIR="${DIST_DIR}/runtime"
PREFIX="${PREFIX:-/opt/fm-rds-tx}"
GO_VERSION="${GO_VERSION:-1.22.12}"
BUILD_TAGS="${BUILD_TAGS:-pluto}"
ARCH="$(dpkg --print-architecture 2>/dev/null || true)"

log() {
printf '\n[%s] %s\n' "$(date '+%H:%M:%S')" "$*"
}

need_cmd() {
command -v "$1" >/dev/null 2>&1 || {
echo "Missing required command: $1" >&2
exit 1
}
}

apt_install_if_missing() {
local missing=()
for pkg in "$@"; do
if ! dpkg -s "$pkg" >/dev/null 2>&1; then
missing+=("$pkg")
fi
done
if ((${#missing[@]})); then
log "Installing missing packages: ${missing[*]}"
sudo apt-get install -y "${missing[@]}"
fi
}

install_go() {
if command -v go >/dev/null 2>&1; then
log "Go already present: $(go version)"
return
fi

if [[ "${SKIP_GO_INSTALL:-0}" == "1" ]]; then
echo "go not found and SKIP_GO_INSTALL=1 set" >&2
exit 1
fi

local go_arch
case "$ARCH" in
armhf) go_arch="armv6l" ;;
arm64) go_arch="arm64" ;;
amd64) go_arch="amd64" ;;
*)
echo "Unsupported architecture for automated Go install: $ARCH" >&2
exit 1
;;
esac

local tarball="go${GO_VERSION}.linux-${go_arch}.tar.gz"
local url="https://go.dev/dl/${tarball}"
local tmp="/tmp/${tarball}"

log "Installing Go ${GO_VERSION} for ${go_arch}"
wget -O "$tmp" "$url"
sudo rm -rf /usr/local/go
sudo tar -C /usr/local -xzf "$tmp"
export PATH="/usr/local/go/bin:${PATH}"

if ! grep -q '/usr/local/go/bin' "$HOME/.profile" 2>/dev/null; then
printf '\nexport PATH="/usr/local/go/bin:$PATH"\n' >> "$HOME/.profile"
fi

log "Go installed: $(/usr/local/go/bin/go version)"
}

resolve_lib() {
local name="$1"
local ldconfig_bin=""
if command -v ldconfig >/dev/null 2>&1; then
ldconfig_bin="$(command -v ldconfig)"
elif [[ -x /sbin/ldconfig ]]; then
ldconfig_bin="/sbin/ldconfig"
elif [[ -x /usr/sbin/ldconfig ]]; then
ldconfig_bin="/usr/sbin/ldconfig"
fi

if [[ -n "$ldconfig_bin" ]]; then
"$ldconfig_bin" -p 2>/dev/null | awk -v lib="$name" '$1 == lib { print $NF; exit }'
return 0
fi

find /lib /usr/lib /usr/local/lib -name "$name" 2>/dev/null | head -n 1
}

copy_lib_if_found() {
local libname="$1"
local path
path="$(resolve_lib "$libname" || true)"
if [[ -z "$path" ]]; then
path="$(find /lib /usr/lib /usr/local/lib -name "$libname" 2>/dev/null | head -n 1 || true)"
fi
if [[ -n "$path" && -f "$path" ]]; then
cp -Lv "$path" "$RUNTIME_DIR/lib/"
else
log "Library not found: $libname"
fi
}

find_soapy_plugin_path() {
local candidate=""

if command -v SoapySDRUtil >/dev/null 2>&1; then
candidate="$(SoapySDRUtil --info 2>/dev/null | awk -F': ' '/Search path:/ {print $2; exit}')"
if [[ -n "$candidate" && -d "$candidate" ]]; then
printf '%s\n' "$candidate"
return 0
fi
fi

for candidate in \
/usr/local/lib/SoapySDR/modules0.8-3 \
/usr/local/lib/SoapySDR/modules0.8 \
/usr/lib/arm-linux-gnueabihf/SoapySDR/modules0.8-3 \
/usr/lib/arm-linux-gnueabihf/SoapySDR/modules0.8 \
/usr/lib/SoapySDR/modules0.8-3 \
/usr/lib/SoapySDR/modules0.8
do
if [[ -d "$candidate" ]]; then
printf '%s\n' "$candidate"
return 0
fi
done

return 1
}

copy_soapy_plugins_if_found() {
local plugin_path
plugin_path="$(find_soapy_plugin_path || true)"
if [[ -n "$plugin_path" && -d "$plugin_path" ]]; then
mkdir -p "$RUNTIME_DIR/soapy-modules"
cp -Lv "$plugin_path"/* "$RUNTIME_DIR/soapy-modules/" 2>/dev/null || true
printf '%s\n' "$plugin_path" > "$DIST_DIR/SOAPY_PLUGIN_PATH.txt"
log "Copied Soapy plugins from: $plugin_path"
else
log "Soapy plugin path not found"
fi
}

write_runner() {
cat > "${DIST_DIR}/run-fmrtx.sh" <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"

SYSTEM_LIB_DIRS="/usr/local/lib:/usr/lib:/usr/lib/arm-linux-gnueabihf:/lib:/lib/arm-linux-gnueabihf"
export LD_LIBRARY_PATH="${SCRIPT_DIR}/runtime/lib:${SYSTEM_LIB_DIRS}:${LD_LIBRARY_PATH:-}"

if [[ -d "${SCRIPT_DIR}/runtime/soapy-modules" ]]; then
export SOAPY_SDR_PLUGIN_PATH="${SCRIPT_DIR}/runtime/soapy-modules:${SOAPY_SDR_PLUGIN_PATH:-}"
elif [[ -f "${SCRIPT_DIR}/SOAPY_PLUGIN_PATH.txt" ]]; then
export SOAPY_SDR_PLUGIN_PATH="$(cat "${SCRIPT_DIR}/SOAPY_PLUGIN_PATH.txt"):${SOAPY_SDR_PLUGIN_PATH:-}"
fi

exec "${SCRIPT_DIR}/runtime/bin/fmrtx" "$@"
EOF
chmod +x "${DIST_DIR}/run-fmrtx.sh"
}

write_install_helper() {
cat > "${DIST_DIR}/install.sh" <<EOF
#!/usr/bin/env bash
set -euo pipefail
PREFIX="{1:-$PREFIX}"
sudo mkdir -p "\$PREFIX/bin" "\$PREFIX/lib"
sudo cp -v "${DIST_DIR}/runtime/bin/fmrtx" "\$PREFIX/bin/"
sudo cp -v ${DIST_DIR}/runtime/lib/* "\$PREFIX/lib/" 2>/dev/null || true
cat <<'EON'
Installed.
Run with e.g.:
LD_LIBRARY_PATH=\$PREFIX/lib \$PREFIX/bin/fmrtx --help
EON
EOF
# replace placeholder introduced to avoid accidental shell expansion confusion
sed -i 's/{1:-/\${1:-/g' "${DIST_DIR}/install.sh"
chmod +x "${DIST_DIR}/install.sh"
}

main() {
need_cmd bash
need_cmd uname
need_cmd awk
need_cmd sed
need_cmd cp
need_cmd mkdir
need_cmd ldd

mkdir -p "$BUILD_DIR" "$RUNTIME_DIR/bin" "$RUNTIME_DIR/lib"

if [[ "${SKIP_APT:-0}" != "1" ]]; then
log "Refreshing apt metadata"
sudo apt-get update

apt_install_if_missing \
ca-certificates \
curl \
wget \
git \
build-essential \
pkg-config \
gcc \
g++ \
make \
file \
binutils \
tar \
xz-utils \
libiio0 \
libiio-dev \
libusb-1.0-0 \
libxml2 \
libxml2-dev

# Optional / best-effort packages. Not all repos expose them on every arch.
sudo apt-get install -y soapysdr-tools libsoapysdr0.8 libsoapysdr-dev 2>/dev/null || true
sudo apt-get install -y soapy-module-plutosdr 2>/dev/null || true
sudo apt-get install -y iio-oscilloscope 2>/dev/null || true
sudo apt-get install -y libusb-1.0-0-dev 2>/dev/null || true
fi

install_go
need_cmd go

export PATH="/usr/local/go/bin:${PATH}"
export CGO_ENABLED=1
export GOOS=linux

case "$ARCH" in
armhf)
export GOARCH=arm
export GOARM=7
;;
arm64)
export GOARCH=arm64
;;
amd64)
export GOARCH=amd64
;;
*)
echo "Unsupported architecture: $ARCH" >&2
exit 1
;;
esac

log "Build environment"
go version
echo "ARCH=${ARCH} GOOS=${GOOS} GOARCH=${GOARCH:-} GOARM=${GOARM:-} CGO_ENABLED=${CGO_ENABLED}"

log "Tidying modules"
(cd "$REPO_DIR" && go mod tidy)

log "Building fmrtx with Linux Pluto/libiio-first backend (tags: $BUILD_TAGS)"
(cd "$REPO_DIR" && go build -v -tags "$BUILD_TAGS" -o "$RUNTIME_DIR/bin/fmrtx" ./cmd/fmrtx)

log "Collecting runtime libraries"
copy_lib_if_found "libiio.so.0"
copy_lib_if_found "libSoapySDR.so.0.8"
copy_lib_if_found "libusb-1.0.so.0"
copy_lib_if_found "libxml2.so.2"
copy_lib_if_found "libstdc++.so.6"
copy_lib_if_found "libgcc_s.so.1"
copy_lib_if_found "libm.so.6"
copy_lib_if_found "libc.so.6"
if [[ "$BUILD_TAGS" == *soapy* ]]; then
copy_soapy_plugins_if_found
else
log "Skipping Soapy plugin copy (BUILD_TAGS=$BUILD_TAGS)"
fi

log "Writing helper scripts"
write_runner
write_install_helper

log "Writing build manifest"
cat > "${DIST_DIR}/BUILD-INFO.txt" <<EOF
fm-rds-tx Orange Pi build
========================
Date: $(date -Is)
Host: $(uname -a)
Repo: $REPO_DIR
Dist: $DIST_DIR
Architecture: $ARCH
Go: $(go version)

Build command:
go build -tags $BUILD_TAGS -o runtime/bin/fmrtx ./cmd/fmrtx

Important note:
Linux Pluto builds should prefer the native libiio path (`-tags pluto`).
SoapySDR is optional fallback/debug infrastructure only.
Windows Pluto handling is separate and intentionally untouched by this script.
EOF

log "Binary info"
file "$RUNTIME_DIR/bin/fmrtx" || true

log "Dynamic dependencies of built binary"
ldd "$RUNTIME_DIR/bin/fmrtx" || true

if [[ "$BUILD_TAGS" == *soapy* ]]; then
log "Wrapper self-test: fmrtx --list-devices"
"${DIST_DIR}/run-fmrtx.sh" --list-devices || true
else
log "Wrapper self-test: fmrtx --print-config"
"${DIST_DIR}/run-fmrtx.sh" --print-config || true
fi

log "Done. Artifacts are in: $DIST_DIR"
cat <<EOF

Next steps:
1. Copy/use: $DIST_DIR/runtime/bin/fmrtx
2. Libraries are in: $DIST_DIR/runtime/lib/
3. Launch via wrapper:
$DIST_DIR/run-fmrtx.sh --help
4. Install to target prefix:
$DIST_DIR/install.sh $PREFIX

Reminder:
Default Linux packaging now prefers the native libiio Pluto backend.
Use BUILD_TAGS=soapy only when you explicitly want the Soapy path.
Windows Pluto support is intentionally left on its separate path.
EOF
}

main "$@"

+ 2
- 0
stream_tx.bat Просмотреть файл

@@ -0,0 +1,2 @@
@echo off
ffmpeg -i "http://stream.srg-ssr.ch/m/drs3/mp3_128" -f s16le -ar 44100 -ac 2 - | fmrtx.exe --tx --tx-auto-start --audio-stdin --config docs/config.plutosdr.json

Загрузка…
Отмена
Сохранить