소스 검색

feat: add live audio ingest pipeline for on-air streaming

Add a lock-free stdin PCM ingest path, streaming resampler, stereo-linked limiting and pre-MPX audio filtering, plus the engine/control wiring needed to drive live audio into TX mode. Also document the ingest API and include a helper batch script for piping ffmpeg audio into fmrtx.
tags/v0.9.0
Jan Svabenik 1 개월 전
부모
커밋
59c338ebda
12개의 변경된 파일988개의 추가작업 그리고 24개의 파일을 삭제
  1. +26
    -2
      cmd/fmrtx/main.go
  2. +100
    -0
      docs/API.md
  3. +25
    -0
      internal/app/engine.go
  4. +196
    -0
      internal/audio/stream.go
  5. +376
    -0
      internal/audio/stream_test.go
  6. +61
    -4
      internal/control/control.go
  7. +54
    -0
      internal/dsp/biquad.go
  8. +68
    -0
      internal/dsp/stereolimiter.go
  9. +72
    -16
      internal/offline/generator.go
  10. +4
    -1
      internal/offline/generator_test.go
  11. +4
    -1
      internal/rds/encoder.go
  12. +2
    -0
      stream_tx.bat

+ 26
- 2
cmd/fmrtx/main.go 파일 보기

@@ -12,6 +12,7 @@ import (
"time"

apppkg "github.com/jan/fm-rds-tx/internal/app"
"github.com/jan/fm-rds-tx/internal/audio"
cfgpkg "github.com/jan/fm-rds-tx/internal/config"
ctrlpkg "github.com/jan/fm-rds-tx/internal/control"
drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
@@ -31,6 +32,8 @@ func main() {
txMode := flag.Bool("tx", false, "start real TX mode (requires hardware + build tags)")
txAutoStart := flag.Bool("tx-auto-start", false, "auto-start TX on launch")
listDevices := flag.Bool("list-devices", false, "enumerate SoapySDR devices and exit")
audioStdin := flag.Bool("audio-stdin", false, "read S16LE stereo PCM audio from stdin")
audioRate := flag.Int("audio-rate", 44100, "sample rate of stdin audio input (Hz)")
flag.Parse()

// --- list-devices (SoapySDR) ---
@@ -99,7 +102,7 @@ func main() {
if driver == nil {
log.Fatal("no hardware driver available — build with -tags pluto (or -tags soapy)")
}
runTXMode(cfg, driver, *txAutoStart)
runTXMode(cfg, driver, *txAutoStart, *audioStdin, *audioRate)
return
}

@@ -142,7 +145,7 @@ func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver {
return nil
}

func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool) {
func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, audioStdin bool, audioRate int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -172,10 +175,31 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool) {
// Engine
engine := apppkg.NewEngine(cfg, driver)

// Live audio stream source (optional)
var streamSrc *audio.StreamSource
if audioStdin {
// Buffer: 2 seconds at input rate — enough to absorb jitter
streamSrc = audio.NewStreamSource(audioRate*2, audioRate)
engine.SetStreamSource(streamSrc)

// Stdin ingest goroutine
go func() {
log.Printf("audio: reading S16LE stereo PCM from stdin at %d Hz", audioRate)
if err := audio.IngestReader(os.Stdin, streamSrc); err != nil {
log.Printf("audio: stdin ingest ended: %v", err)
} else {
log.Println("audio: stdin EOF")
}
}()
}

// Control plane
srv := ctrlpkg.NewServer(cfg)
srv.SetDriver(driver)
srv.SetTXController(&txBridge{engine: engine})
if streamSrc != nil {
srv.SetStreamSource(streamSrc)
}

if autoStart {
log.Println("TX: auto-start enabled")


+ 100
- 0
docs/API.md 파일 보기

@@ -218,3 +218,103 @@ These cannot be hot-reloaded (they affect DSP pipeline structure):
- `rds.pi` / `rds.pty` — rarely change, baked into encoder init
- `audio.inputPath` — audio source selection
- `backend.kind` / `backend.device` — hardware selection

---

### `POST /audio/stream`

Push raw audio data into the live stream buffer. Format: **S16LE stereo PCM** at the configured `--audio-rate` (default 44100 Hz).

Requires `--audio-stdin` or a configured stream source.

**Request:** Binary body, `application/octet-stream`, raw S16LE stereo PCM bytes.

**Response:**
```json
{
"ok": true,
"frames": 4096,
"stats": {
"available": 12000,
"capacity": 131072,
"buffered": 0.09,
"written": 890000,
"underruns": 0,
"overflows": 0
}
}
```

**Example:**
```bash
# Push a file
ffmpeg -i song.mp3 -f s16le -ar 44100 -ac 2 - | \
curl -X POST --data-binary @- http://pluto:8088/audio/stream
```

**Errors:**
- `405` if not POST
- `503` if no audio stream configured

---

## Audio Streaming

### Stdin pipe (primary method)

Pipe any audio source through ffmpeg into the transmitter:

```bash
# Internet radio stream
ffmpeg -i "http://stream.example.com/radio.mp3" -f s16le -ar 44100 -ac 2 - | \
fmrtx --tx --tx-auto-start --audio-stdin --config config.json

# Local music file
ffmpeg -i music.flac -f s16le -ar 44100 -ac 2 - | \
fmrtx --tx --tx-auto-start --audio-stdin

# Playlist (ffmpeg concat)
ffmpeg -f concat -i playlist.txt -f s16le -ar 44100 -ac 2 - | \
fmrtx --tx --tx-auto-start --audio-stdin

# PulseAudio / ALSA capture (Linux)
parecord --format=s16le --rate=44100 --channels=2 - | \
fmrtx --tx --tx-auto-start --audio-stdin

# Custom sample rate (e.g. 48kHz source)
ffmpeg -i source.wav -f s16le -ar 48000 -ac 2 - | \
fmrtx --tx --tx-auto-start --audio-stdin --audio-rate 48000
```

### HTTP audio push

Push audio from a remote machine via the HTTP API:

```bash
# From another machine on the network
ffmpeg -i music.mp3 -f s16le -ar 44100 -ac 2 - | \
curl -X POST --data-binary @- http://pluto-host:8088/audio/stream
```

### Audio buffer

The stream uses a lock-free ring buffer (default: 2 seconds at input rate). Buffer stats are available in `GET /runtime` under `audioStream`:

```json
{
"audioStream": {
"available": 12000,
"capacity": 131072,
"buffered": 0.09,
"written": 890000,
"underruns": 0,
"overflows": 0
}
}
```

- **underruns**: DSP consumed faster than audio arrived (silence inserted)
- **overflows**: Audio arrived faster than DSP consumed (data dropped)
- **buffered**: Fill ratio (0.0 = empty, 1.0 = full)

When no audio is streaming, the transmitter falls back to the configured tone generator or silence.

+ 25
- 0
internal/app/engine.go 파일 보기

@@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"

"github.com/jan/fm-rds-tx/internal/audio"
cfgpkg "github.com/jan/fm-rds-tx/internal/config"
"github.com/jan/fm-rds-tx/internal/dsp"
offpkg "github.com/jan/fm-rds-tx/internal/offline"
@@ -70,6 +71,30 @@ type Engine struct {

// Live config: pending frequency change, applied between chunks
pendingFreq atomic.Pointer[float64]

// Live audio stream (optional)
streamSrc *audio.StreamSource
}

// SetStreamSource configures a live audio stream as the audio source.
// Must be called before Start(). The StreamResampler is created internally
// to convert from the stream's sample rate to the DSP composite rate.
func (e *Engine) SetStreamSource(src *audio.StreamSource) {
e.streamSrc = src
compositeRate := float64(e.cfg.FM.CompositeRateHz)
if compositeRate <= 0 {
compositeRate = 228000
}
resampler := audio.NewStreamResampler(src, compositeRate)
e.generator.SetExternalSource(resampler)
log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
src.SampleRate, compositeRate, src.Stats().Capacity)
}

// StreamSource returns the live audio stream source, or nil.
// Used by the control server for stats and HTTP audio ingest.
func (e *Engine) StreamSource() *audio.StreamSource {
return e.streamSrc
}

func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {


+ 196
- 0
internal/audio/stream.go 파일 보기

@@ -0,0 +1,196 @@
package audio

import (
"encoding/binary"
"fmt"
"io"
"sync/atomic"
)

// StreamSource is a lock-free SPSC (single-producer, single-consumer) ring buffer
// for real-time audio streaming. One goroutine writes PCM frames, the DSP
// goroutine reads them via NextFrame(). Returns silence on underrun.
//
// Zero allocations in steady state. No mutex in the read or write path.
type StreamSource struct {
ring []Frame
size int
mask int // size-1, for fast modulo (size must be power of 2)
SampleRate int

writePos atomic.Int64
readPos atomic.Int64

Underruns atomic.Uint64
Overflows atomic.Uint64
Written atomic.Uint64
}

// NewStreamSource creates a ring buffer with the given capacity (rounded up
// to next power of 2) and input sample rate.
func NewStreamSource(capacity, sampleRate int) *StreamSource {
// Round up to power of 2
size := 1
for size < capacity {
size <<= 1
}
return &StreamSource{
ring: make([]Frame, size),
size: size,
mask: size - 1,
SampleRate: sampleRate,
}
}

// WriteFrame pushes a single frame into the ring buffer.
// Returns false if the buffer is full (overflow).
func (s *StreamSource) WriteFrame(f Frame) bool {
wp := s.writePos.Load()
rp := s.readPos.Load()
if wp-rp >= int64(s.size) {
s.Overflows.Add(1)
return false
}
s.ring[int(wp)&s.mask] = f
s.writePos.Add(1)
s.Written.Add(1)
return true
}

// WritePCM decodes interleaved S16LE stereo PCM bytes and writes frames
// to the ring buffer. Returns the number of frames written.
func (s *StreamSource) WritePCM(data []byte) int {
frames := len(data) / 4 // 2 channels × 2 bytes per sample
written := 0
for i := 0; i < frames; i++ {
off := i * 4
l := int16(binary.LittleEndian.Uint16(data[off:]))
r := int16(binary.LittleEndian.Uint16(data[off+2:]))
f := NewFrame(
Sample(float64(l)/32768.0),
Sample(float64(r)/32768.0),
)
if !s.WriteFrame(f) {
break
}
written++
}
return written
}

// ReadFrame consumes one frame from the ring buffer.
// Returns silence (0,0) on underrun.
func (s *StreamSource) ReadFrame() Frame {
rp := s.readPos.Load()
wp := s.writePos.Load()
if rp >= wp {
s.Underruns.Add(1)
return NewFrame(0, 0)
}
f := s.ring[int(rp)&s.mask]
s.readPos.Add(1)
return f
}

// NextFrame implements the frameSource interface.
func (s *StreamSource) NextFrame() Frame {
return s.ReadFrame()
}

// Available returns the number of frames currently buffered.
func (s *StreamSource) Available() int {
return int(s.writePos.Load() - s.readPos.Load())
}

// Buffered returns the fill ratio (0.0 = empty, 1.0 = full).
func (s *StreamSource) Buffered() float64 {
return float64(s.Available()) / float64(s.size)
}

// Stats returns diagnostic counters.
func (s *StreamSource) Stats() StreamStats {
return StreamStats{
Available: s.Available(),
Capacity: s.size,
Buffered: s.Buffered(),
Written: s.Written.Load(),
Underruns: s.Underruns.Load(),
Overflows: s.Overflows.Load(),
}
}

// StreamStats exposes runtime telemetry for the stream buffer.
type StreamStats struct {
Available int `json:"available"`
Capacity int `json:"capacity"`
Buffered float64 `json:"buffered"`
Written uint64 `json:"written"`
Underruns uint64 `json:"underruns"`
Overflows uint64 `json:"overflows"`
}

// --- StreamResampler ---

// StreamResampler wraps a StreamSource and rate-converts from the stream's
// native sample rate to the target output rate using linear interpolation.
// Consumes input frames on demand — no buffering beyond the ring buffer.
type StreamResampler struct {
src *StreamSource
ratio float64 // inputRate / outputRate (< 1 when upsampling)
pos float64
prev Frame
curr Frame
}

// NewStreamResampler creates a streaming resampler.
func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler {
if src == nil || outputRate <= 0 || src.SampleRate <= 0 {
return &StreamResampler{src: src, ratio: 1.0}
}
return &StreamResampler{
src: src,
ratio: float64(src.SampleRate) / outputRate,
}
}

// NextFrame returns the next interpolated frame at the output rate.
// Implements the frameSource interface.
func (r *StreamResampler) NextFrame() Frame {
if r.src == nil {
return NewFrame(0, 0)
}

// Consume input samples as the fractional position advances
for r.pos >= 1.0 {
r.prev = r.curr
r.curr = r.src.ReadFrame()
r.pos -= 1.0
}

frac := r.pos
l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac
ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac
r.pos += r.ratio
return NewFrame(Sample(l), Sample(ri))
}

// --- Ingest helpers ---

// IngestReader continuously reads S16LE stereo PCM from an io.Reader into
// a StreamSource. Blocks until the reader returns an error or io.EOF.
// Designed to run as a goroutine.
func IngestReader(r io.Reader, dst *StreamSource) error {
buf := make([]byte, 16384) // 4096 frames per read (16KB)
for {
n, err := r.Read(buf)
if n > 0 {
dst.WritePCM(buf[:n])
}
if err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("audio ingest: %w", err)
}
}
}

+ 376
- 0
internal/audio/stream_test.go 파일 보기

@@ -0,0 +1,376 @@
package audio

import (
"bytes"
"encoding/binary"
"io"
"math"
"sync"
"sync/atomic"
"testing"
)

func TestStreamSource_WriteRead(t *testing.T) {
s := NewStreamSource(1024, 44100)
if s.size != 1024 {
t.Fatalf("expected size 1024, got %d", s.size)
}

// Write and read a frame
f := NewFrame(0.5, -0.3)
if !s.WriteFrame(f) {
t.Fatal("write failed")
}
if s.Available() != 1 {
t.Fatalf("expected 1 available, got %d", s.Available())
}

out := s.ReadFrame()
if out.L != 0.5 || out.R != -0.3 {
t.Fatalf("read mismatch: got L=%.2f R=%.2f", out.L, out.R)
}
if s.Available() != 0 {
t.Fatalf("expected 0 available, got %d", s.Available())
}
}

func TestStreamSource_Underrun(t *testing.T) {
s := NewStreamSource(16, 44100)

// Read from empty buffer — should return silence
f := s.ReadFrame()
if f.L != 0 || f.R != 0 {
t.Fatal("expected silence on underrun")
}
if s.Underruns.Load() != 1 {
t.Fatalf("expected 1 underrun, got %d", s.Underruns.Load())
}
}

func TestStreamSource_Overflow(t *testing.T) {
s := NewStreamSource(4, 44100) // size rounds up to 4

// Fill completely
for i := 0; i < 4; i++ {
if !s.WriteFrame(NewFrame(Sample(float64(i)/10), 0)) {
t.Fatalf("write %d failed", i)
}
}

// Next write should overflow
if s.WriteFrame(NewFrame(1, 1)) {
t.Fatal("expected overflow")
}
if s.Overflows.Load() != 1 {
t.Fatalf("expected 1 overflow, got %d", s.Overflows.Load())
}
}

func TestStreamSource_PowerOf2Rounding(t *testing.T) {
tests := []struct{ in, expect int }{
{1, 1}, {2, 2}, {3, 4}, {5, 8}, {100, 128}, {1024, 1024}, {1025, 2048},
}
for _, tt := range tests {
s := NewStreamSource(tt.in, 44100)
if s.size != tt.expect {
t.Fatalf("NewStreamSource(%d): size=%d, expected %d", tt.in, s.size, tt.expect)
}
}
}

func TestStreamSource_FIFO(t *testing.T) {
s := NewStreamSource(64, 44100)
n := 50
for i := 0; i < n; i++ {
s.WriteFrame(NewFrame(Sample(float64(i)), 0))
}
for i := 0; i < n; i++ {
f := s.ReadFrame()
if int(f.L) != i {
t.Fatalf("FIFO order broken at %d: got %d", i, int(f.L))
}
}
}

func TestStreamSource_Wraparound(t *testing.T) {
s := NewStreamSource(8, 44100) // size = 8

// Write and read more than buffer size to test wraparound
for round := 0; round < 10; round++ {
for i := 0; i < 8; i++ {
val := float64(round*8 + i)
if !s.WriteFrame(NewFrame(Sample(val), 0)) {
t.Fatalf("write failed round=%d i=%d", round, i)
}
}
for i := 0; i < 8; i++ {
expected := float64(round*8 + i)
f := s.ReadFrame()
if float64(f.L) != expected {
t.Fatalf("round=%d i=%d: got %f expected %f", round, i, float64(f.L), expected)
}
}
}

stats := s.Stats()
if stats.Underruns != 0 || stats.Overflows != 0 {
t.Fatalf("unexpected errors: underruns=%d overflows=%d", stats.Underruns, stats.Overflows)
}
}

func TestStreamSource_WritePCM(t *testing.T) {
s := NewStreamSource(256, 44100)

// Create 10 stereo frames of S16LE PCM
var buf bytes.Buffer
for i := 0; i < 10; i++ {
l := int16(i * 1000)
r := int16(-i * 1000)
binary.Write(&buf, binary.LittleEndian, l)
binary.Write(&buf, binary.LittleEndian, r)
}

written := s.WritePCM(buf.Bytes())
if written != 10 {
t.Fatalf("expected 10 frames, wrote %d", written)
}

// Verify first frame
f := s.ReadFrame()
if f.L != 0 || f.R != 0 {
t.Fatalf("frame 0: L=%.4f R=%.4f, expected 0", f.L, f.R)
}
// Verify frame 5
for i := 1; i < 5; i++ {
s.ReadFrame()
}
f = s.ReadFrame()
expectedL := 5000.0 / 32768.0
if math.Abs(float64(f.L)-expectedL) > 0.001 {
t.Fatalf("frame 5 L=%.4f, expected %.4f", f.L, expectedL)
}
}

func TestStreamSource_ConcurrentSPSC(t *testing.T) {
s := NewStreamSource(4096, 44100)
frames := 50000
var producerDone atomic.Bool

var wg sync.WaitGroup
wg.Add(2)

// Producer
go func() {
defer wg.Done()
for i := 0; i < frames; i++ {
for !s.WriteFrame(NewFrame(Sample(float64(i+1)), 0)) {
// Buffer full — yield
}
}
producerDone.Store(true)
}()

// Consumer
var lastVal float64
var orderOK = true
var readCount int
go func() {
defer wg.Done()
for {
if s.Available() == 0 {
if producerDone.Load() {
break
}
continue
}
f := s.ReadFrame()
readCount++
v := float64(f.L)
if v > 0 && v < lastVal {
orderOK = false
}
if v > 0 {
lastVal = v
}
}
}()

wg.Wait()

if !orderOK {
t.Fatal("FIFO order broken in concurrent SPSC")
}
if readCount < frames/2 {
t.Fatalf("read too few frames: %d (expected ~%d)", readCount, frames)
}
}

// --- StreamResampler tests ---

func TestStreamResampler_1to1(t *testing.T) {
s := NewStreamSource(256, 44100)
r := NewStreamResampler(s, 44100) // 1:1

for i := 0; i < 100; i++ {
s.WriteFrame(NewFrame(Sample(float64(i)/100), 0))
}

// At 1:1 ratio, output should track input with a small startup delay.
// Skip first few samples (resampler priming), then verify monotonic increase.
prev := -1.0
for i := 0; i < 90; i++ {
f := r.NextFrame()
v := float64(f.L)
if i > 5 && v < prev-0.001 {
t.Fatalf("sample %d: non-monotonic %.4f < %.4f", i, v, prev)
}
if v > 0 {
prev = v
}
}
// Final value should be close to 0.9 (we wrote 0..0.99)
if prev < 0.5 {
t.Fatalf("final value %.4f too low (expected > 0.5)", prev)
}
}

func TestStreamResampler_Upsample(t *testing.T) {
// 44100 → 228000 (ratio ≈ 0.1934, ~5.17× upsampling)
s := NewStreamSource(4096, 44100)
r := NewStreamResampler(s, 228000)

// Write 1000 frames of a 1kHz sine at 44100 Hz
for i := 0; i < 1000; i++ {
v := math.Sin(2 * math.Pi * 1000 * float64(i) / 44100)
s.WriteFrame(NewFrame(Sample(v), Sample(v)))
}

// Read upsampled output — should be ~5170 samples for 1000 input
// (minus a few for resampler priming)
out := make([]float64, 0, 5200)
for i := 0; i < 5000; i++ {
f := r.NextFrame()
out = append(out, float64(f.L))
}

// Verify the output is a smooth sine, not clicks or zeros
// Check that max amplitude is close to 1.0
maxAmp := 0.0
for _, v := range out[100:] { // skip initial ramp
if math.Abs(v) > maxAmp {
maxAmp = math.Abs(v)
}
}
if maxAmp < 0.8 {
t.Fatalf("max amplitude %.4f too low (expected ~1.0)", maxAmp)
}

// Check smoothness: no sudden jumps > 0.1 between adjacent samples
maxJump := 0.0
for i := 101; i < len(out); i++ {
d := math.Abs(out[i] - out[i-1])
if d > maxJump {
maxJump = d
}
}
// At 228kHz with 1kHz tone: max step ≈ sin(2π*1000/228000) ≈ 0.0276
if maxJump > 0.05 {
t.Fatalf("max inter-sample jump %.4f (expected < 0.05 for smooth sine)", maxJump)
}
}

func TestStreamResampler_Downsample(t *testing.T) {
// 96000 → 44100 (ratio ≈ 2.177, downsampling)
s := NewStreamSource(8192, 96000)
r := NewStreamResampler(s, 44100)

// Write 4000 frames at 96kHz
for i := 0; i < 4000; i++ {
v := math.Sin(2 * math.Pi * 440 * float64(i) / 96000)
s.WriteFrame(NewFrame(Sample(v), 0))
}

// Should get ~1837 output frames (4000 × 44100/96000)
count := 0
for i := 0; i < 1800; i++ {
f := r.NextFrame()
_ = f
count++
}
if count != 1800 {
t.Fatalf("expected 1800 reads, got %d", count)
}
}

func TestStreamResampler_NilSource(t *testing.T) {
r := NewStreamResampler(nil, 228000)
f := r.NextFrame()
if f.L != 0 || f.R != 0 {
t.Fatal("expected silence from nil source")
}
}

// --- IngestReader test ---

func TestIngestReader(t *testing.T) {
s := NewStreamSource(4096, 44100)

// Create PCM data: 100 stereo frames
var buf bytes.Buffer
for i := 0; i < 100; i++ {
l := int16(i * 100)
r := int16(-i * 100)
binary.Write(&buf, binary.LittleEndian, l)
binary.Write(&buf, binary.LittleEndian, r)
}

// IngestReader should read all data and return nil (EOF)
err := IngestReader(bytes.NewReader(buf.Bytes()), s)
if err != nil {
t.Fatalf("IngestReader: %v", err)
}

if s.Available() != 100 {
t.Fatalf("expected 100 frames, got %d", s.Available())
}

// Verify first and last
f := s.ReadFrame()
if f.L != 0 {
t.Fatalf("frame 0 L=%.4f, expected 0", f.L)
}
for i := 1; i < 99; i++ {
s.ReadFrame()
}
f = s.ReadFrame()
expectedL := 9900.0 / 32768.0
if math.Abs(float64(f.L)-expectedL) > 0.01 {
t.Fatalf("frame 99 L=%.4f, expected ~%.4f", f.L, expectedL)
}
}

func TestIngestReader_Error(t *testing.T) {
s := NewStreamSource(256, 44100)
errReader := &errAfterN{n: 10}
err := IngestReader(errReader, s)
if err == nil {
t.Fatal("expected error")
}
}

type errAfterN struct {
n, count int
}

func (r *errAfterN) Read(p []byte) (int, error) {
if r.count >= r.n {
return 0, io.ErrUnexpectedEOF
}
r.count++
// Return 4 bytes (one stereo frame)
if len(p) >= 4 {
p[0], p[1], p[2], p[3] = 0, 0, 0, 0
return 4, nil
}
return 0, nil
}

+ 61
- 4
internal/control/control.go 파일 보기

@@ -3,9 +3,11 @@ package control
import (
_ "embed"
"encoding/json"
"io"
"net/http"
"sync"

"github.com/jan/fm-rds-tx/internal/audio"
"github.com/jan/fm-rds-tx/internal/config"
drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
"github.com/jan/fm-rds-tx/internal/platform"
@@ -39,10 +41,11 @@ type LivePatch struct {
}

type Server struct {
mu sync.RWMutex
cfg config.Config
tx TXController
drv platform.SoapyDriver // optional, for runtime stats
mu sync.RWMutex
cfg config.Config
tx TXController
drv platform.SoapyDriver // optional, for runtime stats
streamSrc *audio.StreamSource // optional, for live audio ingest
}

type ConfigPatch struct {
@@ -78,6 +81,12 @@ func (s *Server) SetDriver(drv platform.SoapyDriver) {
s.mu.Unlock()
}

func (s *Server) SetStreamSource(src *audio.StreamSource) {
s.mu.Lock()
s.streamSrc = src
s.mu.Unlock()
}

func (s *Server) Handler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/", s.handleUI)
@@ -88,6 +97,7 @@ func (s *Server) Handler() http.Handler {
mux.HandleFunc("/runtime", s.handleRuntime)
mux.HandleFunc("/tx/start", s.handleTXStart)
mux.HandleFunc("/tx/stop", s.handleTXStop)
mux.HandleFunc("/audio/stream", s.handleAudioStream)
return mux
}

@@ -128,6 +138,7 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
s.mu.RLock()
drv := s.drv
tx := s.tx
stream := s.streamSrc
s.mu.RUnlock()

result := map[string]any{}
@@ -137,10 +148,56 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
if tx != nil {
result["engine"] = tx.TXStats()
}
if stream != nil {
result["audioStream"] = stream.Stats()
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(result)
}

// handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
// it into the live audio ring buffer. Use with:
// curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
// ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
s.mu.RLock()
stream := s.streamSrc
s.mu.RUnlock()

if stream == nil {
http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
return
}

// Read body in chunks and push to ring buffer
buf := make([]byte, 32768)
totalFrames := 0
for {
n, err := r.Body.Read(buf)
if n > 0 {
totalFrames += stream.WritePCM(buf[:n])
}
if err != nil {
if err == io.EOF {
break
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"ok": true,
"frames": totalFrames,
"stats": stream.Stats(),
})
}

func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)


+ 54
- 0
internal/dsp/biquad.go 파일 보기

@@ -0,0 +1,54 @@
package dsp

import "math"

// BiquadLPF is a second-order Butterworth lowpass filter (biquad, direct form II transposed).
// Used after the audio limiter to remove intermodulation products and harmonics
// that could fall into the 19kHz pilot, 38kHz stereo sub, or 57kHz RDS bands.
//
// At 228kHz with fc=15kHz:
// 15kHz: -3 dB (corner)
// 19kHz: -5 dB
// 38kHz: -18 dB
// 57kHz: -27 dB ← protects RDS band
type BiquadLPF struct {
b0, b1, b2 float64
a1, a2 float64
z1, z2 float64 // state (direct form II transposed)
}

// NewBiquadLPF creates a 2nd-order Butterworth lowpass at the given cutoff.
func NewBiquadLPF(cutoffHz, sampleRate float64) *BiquadLPF {
if cutoffHz <= 0 || sampleRate <= 0 || cutoffHz >= sampleRate/2 {
// Passthrough: return unity filter
return &BiquadLPF{b0: 1}
}

omega := 2 * math.Pi * cutoffHz / sampleRate
cosW := math.Cos(omega)
sinW := math.Sin(omega)
alpha := sinW / (2 * math.Sqrt2) // Q = 1/√2 for Butterworth

a0 := 1 + alpha
return &BiquadLPF{
b0: (1 - cosW) / 2 / a0,
b1: (1 - cosW) / a0,
b2: (1 - cosW) / 2 / a0,
a1: (-2 * cosW) / a0,
a2: (1 - alpha) / a0,
}
}

// Process filters a single sample.
func (f *BiquadLPF) Process(in float64) float64 {
out := f.b0*in + f.z1
f.z1 = f.b1*in - f.a1*out + f.z2
f.z2 = f.b2*in - f.a2*out
return out
}

// Reset clears the filter state.
func (f *BiquadLPF) Reset() {
f.z1 = 0
f.z2 = 0
}

+ 68
- 0
internal/dsp/stereolimiter.go 파일 보기

@@ -0,0 +1,68 @@
package dsp

import "math"

// StereoLimiter applies identical gain reduction to L and R channels,
// driven by the peak of max(|L|, |R|). This preserves the stereo image
// while preventing either channel from exceeding the ceiling.
//
// Attack is INSTANTANEOUS — gain is reduced in the same sample that
// exceeds the ceiling. This avoids overshoot entirely, which is critical
// because overshoot causes composite clipping that destroys pilot/RDS.
// Unlike hard clipping, gain scaling preserves the waveform shape and
// does not create harmonics.
//
// Release is smooth (exponential decay) to avoid audible pumping.
type StereoLimiter struct {
ceiling float64
releaseCoeff float64
gainReduction float64
}

// NewStereoLimiter creates a stereo-linked limiter with instant attack.
// releaseMs controls how quickly gain recovers after a peak (typ. 50-200ms).
func NewStereoLimiter(ceiling, attackMs, releaseMs, sampleRate float64) *StereoLimiter {
if ceiling <= 0 {
ceiling = 1.0
}
if releaseMs <= 0 {
releaseMs = 100
}
releaseSamples := releaseMs * sampleRate / 1000

return &StereoLimiter{
ceiling: ceiling,
releaseCoeff: 1.0 - math.Exp(-1.0/releaseSamples),
}
}

// Process applies stereo-linked limiting. Both channels receive the
// same gain factor, determined by the louder of the two.
//
// If the peak exceeds ceiling, gain is INSTANTLY reduced (zero overshoot).
// When the signal drops below ceiling, gain recovers smoothly via release.
func (l *StereoLimiter) Process(left, right float64) (float64, float64) {
peak := math.Max(math.Abs(left), math.Abs(right))

// Target: how much gain reduction do we need right now?
targetReduction := 0.0
if peak > l.ceiling {
targetReduction = 1.0 - l.ceiling/peak
}

// Instant attack: if we need MORE reduction, apply it NOW.
// Smooth release: if we need LESS reduction, decay slowly.
if targetReduction > l.gainReduction {
l.gainReduction = targetReduction // instant
} else {
l.gainReduction += l.releaseCoeff * (targetReduction - l.gainReduction) // smooth
}

gain := 1.0 - l.gainReduction
return left * gain, right * gain
}

// Reset clears the limiter state.
func (l *StereoLimiter) Reset() {
l.gainReduction = 0
}

+ 72
- 16
internal/offline/generator.go 파일 보기

@@ -77,7 +77,8 @@ type Generator struct {
stereoEncoder stereo.StereoEncoder
rdsEnc *rds.Encoder
combiner mpx.DefaultCombiner
limiter *dsp.MPXLimiter
limiter *dsp.StereoLimiter // stereo-linked, operates on L/R BEFORE stereo encoding
lpfL, lpfR *dsp.BiquadLPF // 15kHz lowpass after limiter, protects RDS band
fmMod *dsp.FMModulator
sampleRate float64
initialized bool
@@ -89,12 +90,23 @@ type Generator struct {

// Live-updatable DSP parameters — written by control API, read per chunk.
liveParams atomic.Pointer[LiveParams]

// Optional external audio source (e.g. StreamResampler for live audio).
// When set, takes priority over WAV/tones in sourceFor().
externalSource frameSource
}

func NewGenerator(cfg cfgpkg.Config) *Generator {
return &Generator{cfg: cfg}
}

// SetExternalSource sets a live audio source (e.g. StreamResampler) that
// takes priority over WAV/tone sources. Must be called before the first
// GenerateFrame() call (i.e. before init).
func (g *Generator) SetExternalSource(src frameSource) {
g.externalSource = src
}

// UpdateLive hot-swaps DSP parameters. Thread-safe — called from control API,
// applied at the next chunk boundary by the DSP goroutine.
func (g *Generator) UpdateLive(p LiveParams) {
@@ -140,9 +152,18 @@ func (g *Generator) init() {
}
ceiling := g.cfg.FM.LimiterCeiling
if ceiling <= 0 { ceiling = 1.0 }
// Audio ceiling leaves headroom for pilot + RDS so total ≤ ceiling
pilotAmp := g.cfg.FM.PilotLevel * g.cfg.FM.OutputDrive
rdsAmp := g.cfg.FM.RDSInjection * g.cfg.FM.OutputDrive
audioCeiling := ceiling - pilotAmp - rdsAmp
if audioCeiling < 0.3 { audioCeiling = 0.3 }
if g.cfg.FM.LimiterEnabled {
g.limiter = dsp.NewMPXLimiter(ceiling, 0.1, 50, g.sampleRate)
g.limiter = dsp.NewStereoLimiter(audioCeiling, 0.5, 100, g.sampleRate)
}
// 15kHz lowpass after limiter — removes limiter gain-step intermodulation
// products that would otherwise fall into pilot/stereo/RDS bands.
g.lpfL = dsp.NewBiquadLPF(15000, g.sampleRate)
g.lpfR = dsp.NewBiquadLPF(15000, g.sampleRate)
if g.cfg.FM.FMModulationEnabled {
g.fmMod = dsp.NewFMModulator(g.sampleRate)
if g.cfg.FM.MaxDeviationHz > 0 { g.fmMod.MaxDeviation = g.cfg.FM.MaxDeviationHz }
@@ -163,6 +184,9 @@ func (g *Generator) init() {
}

func (g *Generator) sourceFor(sampleRate float64) (frameSource, SourceInfo) {
if g.externalSource != nil {
return g.externalSource, SourceInfo{Kind: "stream", SampleRate: sampleRate, Detail: "live audio"}
}
if g.cfg.Audio.InputPath != "" {
if src, err := audio.LoadWAVSource(g.cfg.Audio.InputPath); err == nil {
return audio.NewResampledSource(src, sampleRate), SourceInfo{Kind: "wav", SampleRate: float64(src.SampleRate), Detail: g.cfg.Audio.InputPath}
@@ -199,32 +223,64 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame
lp = &LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0}
}

// Apply live combiner gains
g.combiner.PilotGain = lp.PilotLevel
g.combiner.RDSGain = lp.RDSInjection

// Signal path (matches professional broadcast processors):
// Audio L/R → × Drive → Stereo-linked limiter → Stereo encoder
// → Mono + Stereo sub (from limited audio, natural levels)
// → + Pilot (fixed) → + RDS (fixed) → FM modulator
//
// The limiter never sees the 38kHz subcarrier, so it can't pump
// the stereo difference signal. Pilot and RDS are post-encoder
// at fixed amplitudes, unaffected by audio dynamics.
//
// Audio ceiling is auto-reduced to leave headroom for pilot + RDS,
// so total composite stays within ±ceiling (= ±75kHz deviation).
ceiling := lp.LimiterCeiling
if ceiling <= 0 { ceiling = 1.0 }
pilotAmp := lp.PilotLevel * lp.OutputDrive
rdsAmp := lp.RDSInjection * lp.OutputDrive
audioCeiling := ceiling - pilotAmp - rdsAmp
if audioCeiling < 0.3 { audioCeiling = 0.3 } // safety floor

for i := 0; i < samples; i++ {
in := g.source.NextFrame()

comps := g.stereoEncoder.Encode(in)
if !lp.StereoEnabled {
comps.Stereo = 0; comps.Pilot = 0
// --- Stage 1: Band-limit pre-emphasized audio ---
// The 15kHz LPF goes BEFORE drive+limiter. Pre-emphasis boosts
// HF by up to +13.5dB. Without the LPF, the limiter would waste
// gain reduction on HF peaks that get filtered later, causing
// wild modulation swings (30-163%). With LPF first, the limiter
// sees the final audio bandwidth and sets gain correctly.
l := g.lpfL.Process(float64(in.L))
r := g.lpfR.Process(float64(in.R))

// --- Stage 2: Scale and limit ---
l *= lp.OutputDrive
r *= lp.OutputDrive

if lp.LimiterEnabled && g.limiter != nil {
l, r = g.limiter.Process(l, r)
}

rdsValue := 0.0
// --- Stage 3: Stereo encode the limited, filtered audio ---
limited := audio.NewFrame(audio.Sample(l), audio.Sample(r))
comps := g.stereoEncoder.Encode(limited)

// --- Stage 3: Combine at fixed levels ---
composite := float64(comps.Mono)
if lp.StereoEnabled {
composite += float64(comps.Stereo)
composite += pilotAmp * comps.Pilot
}
if g.rdsEnc != nil && lp.RDSEnabled {
rdsCarrier := g.stereoEncoder.RDSCarrier()
rdsValue = g.rdsEnc.NextSampleWithCarrier(rdsCarrier)
rdsValue := g.rdsEnc.NextSampleWithCarrier(rdsCarrier)
composite += rdsAmp * rdsValue
}

composite := g.combiner.Combine(comps.Mono, comps.Stereo, comps.Pilot, rdsValue)
composite *= lp.OutputDrive

if lp.LimiterEnabled && g.limiter != nil {
composite = g.limiter.Process(composite)
// Final composite safety clip — only fires on brief limiter
// overshoots during fast transients. Clips the entire composite,
// not individual audio bands, so harmonics don't target RDS.
if lp.LimiterEnabled {
composite = dsp.HardClip(composite, ceiling)
}



+ 4
- 1
internal/offline/generator_test.go 파일 보기

@@ -83,8 +83,11 @@ func TestLimiterPreventsClipping(t *testing.T) {
cfg.FM.FMModulationEnabled = false
cfg.Audio.ToneAmplitude = 0.9; cfg.Audio.Gain = 2.0; cfg.FM.OutputDrive = 1.0
frame := NewGenerator(cfg).GenerateFrame(50 * time.Millisecond)
// Total composite (audio + pilot + RDS) should stay within ceiling.
// Audio ceiling is auto-reduced to leave headroom for pilot + RDS.
maxAllowed := cfg.FM.LimiterCeiling + 0.02 // small tolerance for limiter settling
for i, s := range frame.Samples {
if math.Abs(float64(s.I)) > 1.01 { t.Fatalf("sample %d: %.4f exceeds ceiling", i, s.I) }
if math.Abs(float64(s.I)) > maxAllowed { t.Fatalf("sample %d: %.4f exceeds max %.4f", i, s.I, maxAllowed) }
}
}



+ 4
- 1
internal/rds/encoder.go 파일 보기

@@ -178,14 +178,17 @@ func (e *Encoder) NextSampleWithCarrier(carrier float64) float64 {
if e.sampleCount >= e.spb {
if e.bitPos >= bitsPerGroup {
// Apply live text updates at group boundaries (~88ms at 228kHz).
// This is the only place we read the atomics — zero per-sample overhead.
// Atomics are consumed (cleared) after reading to prevent
// re-applying the same text every group and toggling A/B flag.
if ps, ok := e.livePS.Load().(string); ok && ps != "" {
e.scheduler.cfg.PS = ps
e.livePS.Store("") // consumed
}
if rt, ok := e.liveRT.Load().(string); ok && rt != "" {
e.scheduler.cfg.RT = rt
e.scheduler.rtIdx = 0 // restart RT transmission for new text
e.scheduler.rtABFlag = !e.scheduler.rtABFlag // toggle A/B per RDS spec
e.liveRT.Store("") // consumed
}
e.getRDSGroup()
e.bitPos = 0


+ 2
- 0
stream_tx.bat 파일 보기

@@ -0,0 +1,2 @@
@echo off
ffmpeg -i "http://stream.srg-ssr.ch/m/drs3/mp3_128" -f s16le -ar 44100 -ac 2 - | fmrtx.exe --tx --tx-auto-start --audio-stdin --config docs/config.plutosdr.json

불러오는 중...
취소
저장