diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index cfa71f8..069f91e 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -12,6 +12,7 @@ import ( "time" apppkg "github.com/jan/fm-rds-tx/internal/app" + "github.com/jan/fm-rds-tx/internal/audio" cfgpkg "github.com/jan/fm-rds-tx/internal/config" ctrlpkg "github.com/jan/fm-rds-tx/internal/control" drypkg "github.com/jan/fm-rds-tx/internal/dryrun" @@ -31,6 +32,8 @@ func main() { txMode := flag.Bool("tx", false, "start real TX mode (requires hardware + build tags)") txAutoStart := flag.Bool("tx-auto-start", false, "auto-start TX on launch") listDevices := flag.Bool("list-devices", false, "enumerate SoapySDR devices and exit") + audioStdin := flag.Bool("audio-stdin", false, "read S16LE stereo PCM audio from stdin") + audioRate := flag.Int("audio-rate", 44100, "sample rate of stdin audio input (Hz)") flag.Parse() // --- list-devices (SoapySDR) --- @@ -99,7 +102,7 @@ func main() { if driver == nil { log.Fatal("no hardware driver available — build with -tags pluto (or -tags soapy)") } - runTXMode(cfg, driver, *txAutoStart) + runTXMode(cfg, driver, *txAutoStart, *audioStdin, *audioRate) return } @@ -142,7 +145,7 @@ func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver { return nil } -func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool) { +func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, audioStdin bool, audioRate int) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -172,10 +175,31 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool) { // Engine engine := apppkg.NewEngine(cfg, driver) + // Live audio stream source (optional) + var streamSrc *audio.StreamSource + if audioStdin { + // Buffer: 2 seconds at input rate — enough to absorb jitter + streamSrc = audio.NewStreamSource(audioRate*2, audioRate) + engine.SetStreamSource(streamSrc) + + // Stdin ingest goroutine + go func() { + log.Printf("audio: reading S16LE stereo PCM from stdin at %d Hz", audioRate) + if err := audio.IngestReader(os.Stdin, streamSrc); err != nil { + log.Printf("audio: stdin ingest ended: %v", err) + } else { + log.Println("audio: stdin EOF") + } + }() + } + // Control plane srv := ctrlpkg.NewServer(cfg) srv.SetDriver(driver) srv.SetTXController(&txBridge{engine: engine}) + if streamSrc != nil { + srv.SetStreamSource(streamSrc) + } if autoStart { log.Println("TX: auto-start enabled") diff --git a/docs/API.md b/docs/API.md index 8e3b096..b29d676 100644 --- a/docs/API.md +++ b/docs/API.md @@ -218,3 +218,103 @@ These cannot be hot-reloaded (they affect DSP pipeline structure): - `rds.pi` / `rds.pty` — rarely change, baked into encoder init - `audio.inputPath` — audio source selection - `backend.kind` / `backend.device` — hardware selection + +--- + +### `POST /audio/stream` + +Push raw audio data into the live stream buffer. Format: **S16LE stereo PCM** at the configured `--audio-rate` (default 44100 Hz). + +Requires `--audio-stdin` or a configured stream source. + +**Request:** Binary body, `application/octet-stream`, raw S16LE stereo PCM bytes. + +**Response:** +```json +{ + "ok": true, + "frames": 4096, + "stats": { + "available": 12000, + "capacity": 131072, + "buffered": 0.09, + "written": 890000, + "underruns": 0, + "overflows": 0 + } +} +``` + +**Example:** +```bash +# Push a file +ffmpeg -i song.mp3 -f s16le -ar 44100 -ac 2 - | \ + curl -X POST --data-binary @- http://pluto:8088/audio/stream +``` + +**Errors:** +- `405` if not POST +- `503` if no audio stream configured + +--- + +## Audio Streaming + +### Stdin pipe (primary method) + +Pipe any audio source through ffmpeg into the transmitter: + +```bash +# Internet radio stream +ffmpeg -i "http://stream.example.com/radio.mp3" -f s16le -ar 44100 -ac 2 - | \ + fmrtx --tx --tx-auto-start --audio-stdin --config config.json + +# Local music file +ffmpeg -i music.flac -f s16le -ar 44100 -ac 2 - | \ + fmrtx --tx --tx-auto-start --audio-stdin + +# Playlist (ffmpeg concat) +ffmpeg -f concat -i playlist.txt -f s16le -ar 44100 -ac 2 - | \ + fmrtx --tx --tx-auto-start --audio-stdin + +# PulseAudio / ALSA capture (Linux) +parecord --format=s16le --rate=44100 --channels=2 - | \ + fmrtx --tx --tx-auto-start --audio-stdin + +# Custom sample rate (e.g. 48kHz source) +ffmpeg -i source.wav -f s16le -ar 48000 -ac 2 - | \ + fmrtx --tx --tx-auto-start --audio-stdin --audio-rate 48000 +``` + +### HTTP audio push + +Push audio from a remote machine via the HTTP API: + +```bash +# From another machine on the network +ffmpeg -i music.mp3 -f s16le -ar 44100 -ac 2 - | \ + curl -X POST --data-binary @- http://pluto-host:8088/audio/stream +``` + +### Audio buffer + +The stream uses a lock-free ring buffer (default: 2 seconds at input rate). Buffer stats are available in `GET /runtime` under `audioStream`: + +```json +{ + "audioStream": { + "available": 12000, + "capacity": 131072, + "buffered": 0.09, + "written": 890000, + "underruns": 0, + "overflows": 0 + } +} +``` + +- **underruns**: DSP consumed faster than audio arrived (silence inserted) +- **overflows**: Audio arrived faster than DSP consumed (data dropped) +- **buffered**: Fill ratio (0.0 = empty, 1.0 = full) + +When no audio is streaming, the transmitter falls back to the configured tone generator or silence. diff --git a/internal/app/engine.go b/internal/app/engine.go index 7384d9a..0139a9b 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/jan/fm-rds-tx/internal/audio" cfgpkg "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/dsp" offpkg "github.com/jan/fm-rds-tx/internal/offline" @@ -70,6 +71,30 @@ type Engine struct { // Live config: pending frequency change, applied between chunks pendingFreq atomic.Pointer[float64] + + // Live audio stream (optional) + streamSrc *audio.StreamSource +} + +// SetStreamSource configures a live audio stream as the audio source. +// Must be called before Start(). The StreamResampler is created internally +// to convert from the stream's sample rate to the DSP composite rate. +func (e *Engine) SetStreamSource(src *audio.StreamSource) { + e.streamSrc = src + compositeRate := float64(e.cfg.FM.CompositeRateHz) + if compositeRate <= 0 { + compositeRate = 228000 + } + resampler := audio.NewStreamResampler(src, compositeRate) + e.generator.SetExternalSource(resampler) + log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)", + src.SampleRate, compositeRate, src.Stats().Capacity) +} + +// StreamSource returns the live audio stream source, or nil. +// Used by the control server for stats and HTTP audio ingest. +func (e *Engine) StreamSource() *audio.StreamSource { + return e.streamSrc } func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine { diff --git a/internal/audio/stream.go b/internal/audio/stream.go new file mode 100644 index 0000000..bf951a8 --- /dev/null +++ b/internal/audio/stream.go @@ -0,0 +1,196 @@ +package audio + +import ( + "encoding/binary" + "fmt" + "io" + "sync/atomic" +) + +// StreamSource is a lock-free SPSC (single-producer, single-consumer) ring buffer +// for real-time audio streaming. One goroutine writes PCM frames, the DSP +// goroutine reads them via NextFrame(). Returns silence on underrun. +// +// Zero allocations in steady state. No mutex in the read or write path. +type StreamSource struct { + ring []Frame + size int + mask int // size-1, for fast modulo (size must be power of 2) + SampleRate int + + writePos atomic.Int64 + readPos atomic.Int64 + + Underruns atomic.Uint64 + Overflows atomic.Uint64 + Written atomic.Uint64 +} + +// NewStreamSource creates a ring buffer with the given capacity (rounded up +// to next power of 2) and input sample rate. +func NewStreamSource(capacity, sampleRate int) *StreamSource { + // Round up to power of 2 + size := 1 + for size < capacity { + size <<= 1 + } + return &StreamSource{ + ring: make([]Frame, size), + size: size, + mask: size - 1, + SampleRate: sampleRate, + } +} + +// WriteFrame pushes a single frame into the ring buffer. +// Returns false if the buffer is full (overflow). +func (s *StreamSource) WriteFrame(f Frame) bool { + wp := s.writePos.Load() + rp := s.readPos.Load() + if wp-rp >= int64(s.size) { + s.Overflows.Add(1) + return false + } + s.ring[int(wp)&s.mask] = f + s.writePos.Add(1) + s.Written.Add(1) + return true +} + +// WritePCM decodes interleaved S16LE stereo PCM bytes and writes frames +// to the ring buffer. Returns the number of frames written. +func (s *StreamSource) WritePCM(data []byte) int { + frames := len(data) / 4 // 2 channels × 2 bytes per sample + written := 0 + for i := 0; i < frames; i++ { + off := i * 4 + l := int16(binary.LittleEndian.Uint16(data[off:])) + r := int16(binary.LittleEndian.Uint16(data[off+2:])) + f := NewFrame( + Sample(float64(l)/32768.0), + Sample(float64(r)/32768.0), + ) + if !s.WriteFrame(f) { + break + } + written++ + } + return written +} + +// ReadFrame consumes one frame from the ring buffer. +// Returns silence (0,0) on underrun. +func (s *StreamSource) ReadFrame() Frame { + rp := s.readPos.Load() + wp := s.writePos.Load() + if rp >= wp { + s.Underruns.Add(1) + return NewFrame(0, 0) + } + f := s.ring[int(rp)&s.mask] + s.readPos.Add(1) + return f +} + +// NextFrame implements the frameSource interface. +func (s *StreamSource) NextFrame() Frame { + return s.ReadFrame() +} + +// Available returns the number of frames currently buffered. +func (s *StreamSource) Available() int { + return int(s.writePos.Load() - s.readPos.Load()) +} + +// Buffered returns the fill ratio (0.0 = empty, 1.0 = full). +func (s *StreamSource) Buffered() float64 { + return float64(s.Available()) / float64(s.size) +} + +// Stats returns diagnostic counters. +func (s *StreamSource) Stats() StreamStats { + return StreamStats{ + Available: s.Available(), + Capacity: s.size, + Buffered: s.Buffered(), + Written: s.Written.Load(), + Underruns: s.Underruns.Load(), + Overflows: s.Overflows.Load(), + } +} + +// StreamStats exposes runtime telemetry for the stream buffer. +type StreamStats struct { + Available int `json:"available"` + Capacity int `json:"capacity"` + Buffered float64 `json:"buffered"` + Written uint64 `json:"written"` + Underruns uint64 `json:"underruns"` + Overflows uint64 `json:"overflows"` +} + +// --- StreamResampler --- + +// StreamResampler wraps a StreamSource and rate-converts from the stream's +// native sample rate to the target output rate using linear interpolation. +// Consumes input frames on demand — no buffering beyond the ring buffer. +type StreamResampler struct { + src *StreamSource + ratio float64 // inputRate / outputRate (< 1 when upsampling) + pos float64 + prev Frame + curr Frame +} + +// NewStreamResampler creates a streaming resampler. +func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler { + if src == nil || outputRate <= 0 || src.SampleRate <= 0 { + return &StreamResampler{src: src, ratio: 1.0} + } + return &StreamResampler{ + src: src, + ratio: float64(src.SampleRate) / outputRate, + } +} + +// NextFrame returns the next interpolated frame at the output rate. +// Implements the frameSource interface. +func (r *StreamResampler) NextFrame() Frame { + if r.src == nil { + return NewFrame(0, 0) + } + + // Consume input samples as the fractional position advances + for r.pos >= 1.0 { + r.prev = r.curr + r.curr = r.src.ReadFrame() + r.pos -= 1.0 + } + + frac := r.pos + l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac + ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac + r.pos += r.ratio + return NewFrame(Sample(l), Sample(ri)) +} + +// --- Ingest helpers --- + +// IngestReader continuously reads S16LE stereo PCM from an io.Reader into +// a StreamSource. Blocks until the reader returns an error or io.EOF. +// Designed to run as a goroutine. +func IngestReader(r io.Reader, dst *StreamSource) error { + buf := make([]byte, 16384) // 4096 frames per read (16KB) + for { + n, err := r.Read(buf) + if n > 0 { + dst.WritePCM(buf[:n]) + } + if err != nil { + if err == io.EOF { + return nil + } + return fmt.Errorf("audio ingest: %w", err) + } + } +} diff --git a/internal/audio/stream_test.go b/internal/audio/stream_test.go new file mode 100644 index 0000000..cc2820a --- /dev/null +++ b/internal/audio/stream_test.go @@ -0,0 +1,376 @@ +package audio + +import ( + "bytes" + "encoding/binary" + "io" + "math" + "sync" + "sync/atomic" + "testing" +) + +func TestStreamSource_WriteRead(t *testing.T) { + s := NewStreamSource(1024, 44100) + if s.size != 1024 { + t.Fatalf("expected size 1024, got %d", s.size) + } + + // Write and read a frame + f := NewFrame(0.5, -0.3) + if !s.WriteFrame(f) { + t.Fatal("write failed") + } + if s.Available() != 1 { + t.Fatalf("expected 1 available, got %d", s.Available()) + } + + out := s.ReadFrame() + if out.L != 0.5 || out.R != -0.3 { + t.Fatalf("read mismatch: got L=%.2f R=%.2f", out.L, out.R) + } + if s.Available() != 0 { + t.Fatalf("expected 0 available, got %d", s.Available()) + } +} + +func TestStreamSource_Underrun(t *testing.T) { + s := NewStreamSource(16, 44100) + + // Read from empty buffer — should return silence + f := s.ReadFrame() + if f.L != 0 || f.R != 0 { + t.Fatal("expected silence on underrun") + } + if s.Underruns.Load() != 1 { + t.Fatalf("expected 1 underrun, got %d", s.Underruns.Load()) + } +} + +func TestStreamSource_Overflow(t *testing.T) { + s := NewStreamSource(4, 44100) // size rounds up to 4 + + // Fill completely + for i := 0; i < 4; i++ { + if !s.WriteFrame(NewFrame(Sample(float64(i)/10), 0)) { + t.Fatalf("write %d failed", i) + } + } + + // Next write should overflow + if s.WriteFrame(NewFrame(1, 1)) { + t.Fatal("expected overflow") + } + if s.Overflows.Load() != 1 { + t.Fatalf("expected 1 overflow, got %d", s.Overflows.Load()) + } +} + +func TestStreamSource_PowerOf2Rounding(t *testing.T) { + tests := []struct{ in, expect int }{ + {1, 1}, {2, 2}, {3, 4}, {5, 8}, {100, 128}, {1024, 1024}, {1025, 2048}, + } + for _, tt := range tests { + s := NewStreamSource(tt.in, 44100) + if s.size != tt.expect { + t.Fatalf("NewStreamSource(%d): size=%d, expected %d", tt.in, s.size, tt.expect) + } + } +} + +func TestStreamSource_FIFO(t *testing.T) { + s := NewStreamSource(64, 44100) + n := 50 + for i := 0; i < n; i++ { + s.WriteFrame(NewFrame(Sample(float64(i)), 0)) + } + for i := 0; i < n; i++ { + f := s.ReadFrame() + if int(f.L) != i { + t.Fatalf("FIFO order broken at %d: got %d", i, int(f.L)) + } + } +} + +func TestStreamSource_Wraparound(t *testing.T) { + s := NewStreamSource(8, 44100) // size = 8 + + // Write and read more than buffer size to test wraparound + for round := 0; round < 10; round++ { + for i := 0; i < 8; i++ { + val := float64(round*8 + i) + if !s.WriteFrame(NewFrame(Sample(val), 0)) { + t.Fatalf("write failed round=%d i=%d", round, i) + } + } + for i := 0; i < 8; i++ { + expected := float64(round*8 + i) + f := s.ReadFrame() + if float64(f.L) != expected { + t.Fatalf("round=%d i=%d: got %f expected %f", round, i, float64(f.L), expected) + } + } + } + + stats := s.Stats() + if stats.Underruns != 0 || stats.Overflows != 0 { + t.Fatalf("unexpected errors: underruns=%d overflows=%d", stats.Underruns, stats.Overflows) + } +} + +func TestStreamSource_WritePCM(t *testing.T) { + s := NewStreamSource(256, 44100) + + // Create 10 stereo frames of S16LE PCM + var buf bytes.Buffer + for i := 0; i < 10; i++ { + l := int16(i * 1000) + r := int16(-i * 1000) + binary.Write(&buf, binary.LittleEndian, l) + binary.Write(&buf, binary.LittleEndian, r) + } + + written := s.WritePCM(buf.Bytes()) + if written != 10 { + t.Fatalf("expected 10 frames, wrote %d", written) + } + + // Verify first frame + f := s.ReadFrame() + if f.L != 0 || f.R != 0 { + t.Fatalf("frame 0: L=%.4f R=%.4f, expected 0", f.L, f.R) + } + // Verify frame 5 + for i := 1; i < 5; i++ { + s.ReadFrame() + } + f = s.ReadFrame() + expectedL := 5000.0 / 32768.0 + if math.Abs(float64(f.L)-expectedL) > 0.001 { + t.Fatalf("frame 5 L=%.4f, expected %.4f", f.L, expectedL) + } +} + +func TestStreamSource_ConcurrentSPSC(t *testing.T) { + s := NewStreamSource(4096, 44100) + frames := 50000 + var producerDone atomic.Bool + + var wg sync.WaitGroup + wg.Add(2) + + // Producer + go func() { + defer wg.Done() + for i := 0; i < frames; i++ { + for !s.WriteFrame(NewFrame(Sample(float64(i+1)), 0)) { + // Buffer full — yield + } + } + producerDone.Store(true) + }() + + // Consumer + var lastVal float64 + var orderOK = true + var readCount int + go func() { + defer wg.Done() + for { + if s.Available() == 0 { + if producerDone.Load() { + break + } + continue + } + f := s.ReadFrame() + readCount++ + v := float64(f.L) + if v > 0 && v < lastVal { + orderOK = false + } + if v > 0 { + lastVal = v + } + } + }() + + wg.Wait() + + if !orderOK { + t.Fatal("FIFO order broken in concurrent SPSC") + } + if readCount < frames/2 { + t.Fatalf("read too few frames: %d (expected ~%d)", readCount, frames) + } +} + +// --- StreamResampler tests --- + +func TestStreamResampler_1to1(t *testing.T) { + s := NewStreamSource(256, 44100) + r := NewStreamResampler(s, 44100) // 1:1 + + for i := 0; i < 100; i++ { + s.WriteFrame(NewFrame(Sample(float64(i)/100), 0)) + } + + // At 1:1 ratio, output should track input with a small startup delay. + // Skip first few samples (resampler priming), then verify monotonic increase. + prev := -1.0 + for i := 0; i < 90; i++ { + f := r.NextFrame() + v := float64(f.L) + if i > 5 && v < prev-0.001 { + t.Fatalf("sample %d: non-monotonic %.4f < %.4f", i, v, prev) + } + if v > 0 { + prev = v + } + } + // Final value should be close to 0.9 (we wrote 0..0.99) + if prev < 0.5 { + t.Fatalf("final value %.4f too low (expected > 0.5)", prev) + } +} + +func TestStreamResampler_Upsample(t *testing.T) { + // 44100 → 228000 (ratio ≈ 0.1934, ~5.17× upsampling) + s := NewStreamSource(4096, 44100) + r := NewStreamResampler(s, 228000) + + // Write 1000 frames of a 1kHz sine at 44100 Hz + for i := 0; i < 1000; i++ { + v := math.Sin(2 * math.Pi * 1000 * float64(i) / 44100) + s.WriteFrame(NewFrame(Sample(v), Sample(v))) + } + + // Read upsampled output — should be ~5170 samples for 1000 input + // (minus a few for resampler priming) + out := make([]float64, 0, 5200) + for i := 0; i < 5000; i++ { + f := r.NextFrame() + out = append(out, float64(f.L)) + } + + // Verify the output is a smooth sine, not clicks or zeros + // Check that max amplitude is close to 1.0 + maxAmp := 0.0 + for _, v := range out[100:] { // skip initial ramp + if math.Abs(v) > maxAmp { + maxAmp = math.Abs(v) + } + } + if maxAmp < 0.8 { + t.Fatalf("max amplitude %.4f too low (expected ~1.0)", maxAmp) + } + + // Check smoothness: no sudden jumps > 0.1 between adjacent samples + maxJump := 0.0 + for i := 101; i < len(out); i++ { + d := math.Abs(out[i] - out[i-1]) + if d > maxJump { + maxJump = d + } + } + // At 228kHz with 1kHz tone: max step ≈ sin(2π*1000/228000) ≈ 0.0276 + if maxJump > 0.05 { + t.Fatalf("max inter-sample jump %.4f (expected < 0.05 for smooth sine)", maxJump) + } +} + +func TestStreamResampler_Downsample(t *testing.T) { + // 96000 → 44100 (ratio ≈ 2.177, downsampling) + s := NewStreamSource(8192, 96000) + r := NewStreamResampler(s, 44100) + + // Write 4000 frames at 96kHz + for i := 0; i < 4000; i++ { + v := math.Sin(2 * math.Pi * 440 * float64(i) / 96000) + s.WriteFrame(NewFrame(Sample(v), 0)) + } + + // Should get ~1837 output frames (4000 × 44100/96000) + count := 0 + for i := 0; i < 1800; i++ { + f := r.NextFrame() + _ = f + count++ + } + if count != 1800 { + t.Fatalf("expected 1800 reads, got %d", count) + } +} + +func TestStreamResampler_NilSource(t *testing.T) { + r := NewStreamResampler(nil, 228000) + f := r.NextFrame() + if f.L != 0 || f.R != 0 { + t.Fatal("expected silence from nil source") + } +} + +// --- IngestReader test --- + +func TestIngestReader(t *testing.T) { + s := NewStreamSource(4096, 44100) + + // Create PCM data: 100 stereo frames + var buf bytes.Buffer + for i := 0; i < 100; i++ { + l := int16(i * 100) + r := int16(-i * 100) + binary.Write(&buf, binary.LittleEndian, l) + binary.Write(&buf, binary.LittleEndian, r) + } + + // IngestReader should read all data and return nil (EOF) + err := IngestReader(bytes.NewReader(buf.Bytes()), s) + if err != nil { + t.Fatalf("IngestReader: %v", err) + } + + if s.Available() != 100 { + t.Fatalf("expected 100 frames, got %d", s.Available()) + } + + // Verify first and last + f := s.ReadFrame() + if f.L != 0 { + t.Fatalf("frame 0 L=%.4f, expected 0", f.L) + } + for i := 1; i < 99; i++ { + s.ReadFrame() + } + f = s.ReadFrame() + expectedL := 9900.0 / 32768.0 + if math.Abs(float64(f.L)-expectedL) > 0.01 { + t.Fatalf("frame 99 L=%.4f, expected ~%.4f", f.L, expectedL) + } +} + +func TestIngestReader_Error(t *testing.T) { + s := NewStreamSource(256, 44100) + errReader := &errAfterN{n: 10} + err := IngestReader(errReader, s) + if err == nil { + t.Fatal("expected error") + } +} + +type errAfterN struct { + n, count int +} + +func (r *errAfterN) Read(p []byte) (int, error) { + if r.count >= r.n { + return 0, io.ErrUnexpectedEOF + } + r.count++ + // Return 4 bytes (one stereo frame) + if len(p) >= 4 { + p[0], p[1], p[2], p[3] = 0, 0, 0, 0 + return 4, nil + } + return 0, nil +} diff --git a/internal/control/control.go b/internal/control/control.go index e0cf0e8..7c74e7f 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -3,9 +3,11 @@ package control import ( _ "embed" "encoding/json" + "io" "net/http" "sync" + "github.com/jan/fm-rds-tx/internal/audio" "github.com/jan/fm-rds-tx/internal/config" drypkg "github.com/jan/fm-rds-tx/internal/dryrun" "github.com/jan/fm-rds-tx/internal/platform" @@ -39,10 +41,11 @@ type LivePatch struct { } type Server struct { - mu sync.RWMutex - cfg config.Config - tx TXController - drv platform.SoapyDriver // optional, for runtime stats + mu sync.RWMutex + cfg config.Config + tx TXController + drv platform.SoapyDriver // optional, for runtime stats + streamSrc *audio.StreamSource // optional, for live audio ingest } type ConfigPatch struct { @@ -78,6 +81,12 @@ func (s *Server) SetDriver(drv platform.SoapyDriver) { s.mu.Unlock() } +func (s *Server) SetStreamSource(src *audio.StreamSource) { + s.mu.Lock() + s.streamSrc = src + s.mu.Unlock() +} + func (s *Server) Handler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/", s.handleUI) @@ -88,6 +97,7 @@ func (s *Server) Handler() http.Handler { mux.HandleFunc("/runtime", s.handleRuntime) mux.HandleFunc("/tx/start", s.handleTXStart) mux.HandleFunc("/tx/stop", s.handleTXStop) + mux.HandleFunc("/audio/stream", s.handleAudioStream) return mux } @@ -128,6 +138,7 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) { s.mu.RLock() drv := s.drv tx := s.tx + stream := s.streamSrc s.mu.RUnlock() result := map[string]any{} @@ -137,10 +148,56 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) { if tx != nil { result["engine"] = tx.TXStats() } + if stream != nil { + result["audioStream"] = stream.Stats() + } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(result) } +// handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes +// it into the live audio ring buffer. Use with: +// curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw +// ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream +func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + s.mu.RLock() + stream := s.streamSrc + s.mu.RUnlock() + + if stream == nil { + http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable) + return + } + + // Read body in chunks and push to ring buffer + buf := make([]byte, 32768) + totalFrames := 0 + for { + n, err := r.Body.Read(buf) + if n > 0 { + totalFrames += stream.WritePCM(buf[:n]) + } + if err != nil { + if err == io.EOF { + break + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "ok": true, + "frames": totalFrames, + "stats": stream.Stats(), + }) +} + func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) diff --git a/internal/dsp/biquad.go b/internal/dsp/biquad.go new file mode 100644 index 0000000..90270aa --- /dev/null +++ b/internal/dsp/biquad.go @@ -0,0 +1,54 @@ +package dsp + +import "math" + +// BiquadLPF is a second-order Butterworth lowpass filter (biquad, direct form II transposed). +// Used after the audio limiter to remove intermodulation products and harmonics +// that could fall into the 19kHz pilot, 38kHz stereo sub, or 57kHz RDS bands. +// +// At 228kHz with fc=15kHz: +// 15kHz: -3 dB (corner) +// 19kHz: -5 dB +// 38kHz: -18 dB +// 57kHz: -27 dB ← protects RDS band +type BiquadLPF struct { + b0, b1, b2 float64 + a1, a2 float64 + z1, z2 float64 // state (direct form II transposed) +} + +// NewBiquadLPF creates a 2nd-order Butterworth lowpass at the given cutoff. +func NewBiquadLPF(cutoffHz, sampleRate float64) *BiquadLPF { + if cutoffHz <= 0 || sampleRate <= 0 || cutoffHz >= sampleRate/2 { + // Passthrough: return unity filter + return &BiquadLPF{b0: 1} + } + + omega := 2 * math.Pi * cutoffHz / sampleRate + cosW := math.Cos(omega) + sinW := math.Sin(omega) + alpha := sinW / (2 * math.Sqrt2) // Q = 1/√2 for Butterworth + + a0 := 1 + alpha + return &BiquadLPF{ + b0: (1 - cosW) / 2 / a0, + b1: (1 - cosW) / a0, + b2: (1 - cosW) / 2 / a0, + a1: (-2 * cosW) / a0, + a2: (1 - alpha) / a0, + } +} + +// Process filters a single sample. +func (f *BiquadLPF) Process(in float64) float64 { + out := f.b0*in + f.z1 + f.z1 = f.b1*in - f.a1*out + f.z2 + f.z2 = f.b2*in - f.a2*out + return out +} + +// Reset clears the filter state. +func (f *BiquadLPF) Reset() { + f.z1 = 0 + f.z2 = 0 +} diff --git a/internal/dsp/stereolimiter.go b/internal/dsp/stereolimiter.go new file mode 100644 index 0000000..08029ec --- /dev/null +++ b/internal/dsp/stereolimiter.go @@ -0,0 +1,68 @@ +package dsp + +import "math" + +// StereoLimiter applies identical gain reduction to L and R channels, +// driven by the peak of max(|L|, |R|). This preserves the stereo image +// while preventing either channel from exceeding the ceiling. +// +// Attack is INSTANTANEOUS — gain is reduced in the same sample that +// exceeds the ceiling. This avoids overshoot entirely, which is critical +// because overshoot causes composite clipping that destroys pilot/RDS. +// Unlike hard clipping, gain scaling preserves the waveform shape and +// does not create harmonics. +// +// Release is smooth (exponential decay) to avoid audible pumping. +type StereoLimiter struct { + ceiling float64 + releaseCoeff float64 + gainReduction float64 +} + +// NewStereoLimiter creates a stereo-linked limiter with instant attack. +// releaseMs controls how quickly gain recovers after a peak (typ. 50-200ms). +func NewStereoLimiter(ceiling, attackMs, releaseMs, sampleRate float64) *StereoLimiter { + if ceiling <= 0 { + ceiling = 1.0 + } + if releaseMs <= 0 { + releaseMs = 100 + } + releaseSamples := releaseMs * sampleRate / 1000 + + return &StereoLimiter{ + ceiling: ceiling, + releaseCoeff: 1.0 - math.Exp(-1.0/releaseSamples), + } +} + +// Process applies stereo-linked limiting. Both channels receive the +// same gain factor, determined by the louder of the two. +// +// If the peak exceeds ceiling, gain is INSTANTLY reduced (zero overshoot). +// When the signal drops below ceiling, gain recovers smoothly via release. +func (l *StereoLimiter) Process(left, right float64) (float64, float64) { + peak := math.Max(math.Abs(left), math.Abs(right)) + + // Target: how much gain reduction do we need right now? + targetReduction := 0.0 + if peak > l.ceiling { + targetReduction = 1.0 - l.ceiling/peak + } + + // Instant attack: if we need MORE reduction, apply it NOW. + // Smooth release: if we need LESS reduction, decay slowly. + if targetReduction > l.gainReduction { + l.gainReduction = targetReduction // instant + } else { + l.gainReduction += l.releaseCoeff * (targetReduction - l.gainReduction) // smooth + } + + gain := 1.0 - l.gainReduction + return left * gain, right * gain +} + +// Reset clears the limiter state. +func (l *StereoLimiter) Reset() { + l.gainReduction = 0 +} diff --git a/internal/offline/generator.go b/internal/offline/generator.go index e0513e4..51ec8b1 100644 --- a/internal/offline/generator.go +++ b/internal/offline/generator.go @@ -77,7 +77,8 @@ type Generator struct { stereoEncoder stereo.StereoEncoder rdsEnc *rds.Encoder combiner mpx.DefaultCombiner - limiter *dsp.MPXLimiter + limiter *dsp.StereoLimiter // stereo-linked, operates on L/R BEFORE stereo encoding + lpfL, lpfR *dsp.BiquadLPF // 15kHz lowpass after limiter, protects RDS band fmMod *dsp.FMModulator sampleRate float64 initialized bool @@ -89,12 +90,23 @@ type Generator struct { // Live-updatable DSP parameters — written by control API, read per chunk. liveParams atomic.Pointer[LiveParams] + + // Optional external audio source (e.g. StreamResampler for live audio). + // When set, takes priority over WAV/tones in sourceFor(). + externalSource frameSource } func NewGenerator(cfg cfgpkg.Config) *Generator { return &Generator{cfg: cfg} } +// SetExternalSource sets a live audio source (e.g. StreamResampler) that +// takes priority over WAV/tone sources. Must be called before the first +// GenerateFrame() call (i.e. before init). +func (g *Generator) SetExternalSource(src frameSource) { + g.externalSource = src +} + // UpdateLive hot-swaps DSP parameters. Thread-safe — called from control API, // applied at the next chunk boundary by the DSP goroutine. func (g *Generator) UpdateLive(p LiveParams) { @@ -140,9 +152,18 @@ func (g *Generator) init() { } ceiling := g.cfg.FM.LimiterCeiling if ceiling <= 0 { ceiling = 1.0 } + // Audio ceiling leaves headroom for pilot + RDS so total ≤ ceiling + pilotAmp := g.cfg.FM.PilotLevel * g.cfg.FM.OutputDrive + rdsAmp := g.cfg.FM.RDSInjection * g.cfg.FM.OutputDrive + audioCeiling := ceiling - pilotAmp - rdsAmp + if audioCeiling < 0.3 { audioCeiling = 0.3 } if g.cfg.FM.LimiterEnabled { - g.limiter = dsp.NewMPXLimiter(ceiling, 0.1, 50, g.sampleRate) + g.limiter = dsp.NewStereoLimiter(audioCeiling, 0.5, 100, g.sampleRate) } + // 15kHz lowpass after limiter — removes limiter gain-step intermodulation + // products that would otherwise fall into pilot/stereo/RDS bands. + g.lpfL = dsp.NewBiquadLPF(15000, g.sampleRate) + g.lpfR = dsp.NewBiquadLPF(15000, g.sampleRate) if g.cfg.FM.FMModulationEnabled { g.fmMod = dsp.NewFMModulator(g.sampleRate) if g.cfg.FM.MaxDeviationHz > 0 { g.fmMod.MaxDeviation = g.cfg.FM.MaxDeviationHz } @@ -163,6 +184,9 @@ func (g *Generator) init() { } func (g *Generator) sourceFor(sampleRate float64) (frameSource, SourceInfo) { + if g.externalSource != nil { + return g.externalSource, SourceInfo{Kind: "stream", SampleRate: sampleRate, Detail: "live audio"} + } if g.cfg.Audio.InputPath != "" { if src, err := audio.LoadWAVSource(g.cfg.Audio.InputPath); err == nil { return audio.NewResampledSource(src, sampleRate), SourceInfo{Kind: "wav", SampleRate: float64(src.SampleRate), Detail: g.cfg.Audio.InputPath} @@ -199,32 +223,64 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame lp = &LiveParams{OutputDrive: 1.0, LimiterCeiling: 1.0} } - // Apply live combiner gains - g.combiner.PilotGain = lp.PilotLevel - g.combiner.RDSGain = lp.RDSInjection - + // Signal path (matches professional broadcast processors): + // Audio L/R → × Drive → Stereo-linked limiter → Stereo encoder + // → Mono + Stereo sub (from limited audio, natural levels) + // → + Pilot (fixed) → + RDS (fixed) → FM modulator + // + // The limiter never sees the 38kHz subcarrier, so it can't pump + // the stereo difference signal. Pilot and RDS are post-encoder + // at fixed amplitudes, unaffected by audio dynamics. + // + // Audio ceiling is auto-reduced to leave headroom for pilot + RDS, + // so total composite stays within ±ceiling (= ±75kHz deviation). ceiling := lp.LimiterCeiling if ceiling <= 0 { ceiling = 1.0 } + pilotAmp := lp.PilotLevel * lp.OutputDrive + rdsAmp := lp.RDSInjection * lp.OutputDrive + audioCeiling := ceiling - pilotAmp - rdsAmp + if audioCeiling < 0.3 { audioCeiling = 0.3 } // safety floor for i := 0; i < samples; i++ { in := g.source.NextFrame() - comps := g.stereoEncoder.Encode(in) - if !lp.StereoEnabled { - comps.Stereo = 0; comps.Pilot = 0 + // --- Stage 1: Band-limit pre-emphasized audio --- + // The 15kHz LPF goes BEFORE drive+limiter. Pre-emphasis boosts + // HF by up to +13.5dB. Without the LPF, the limiter would waste + // gain reduction on HF peaks that get filtered later, causing + // wild modulation swings (30-163%). With LPF first, the limiter + // sees the final audio bandwidth and sets gain correctly. + l := g.lpfL.Process(float64(in.L)) + r := g.lpfR.Process(float64(in.R)) + + // --- Stage 2: Scale and limit --- + l *= lp.OutputDrive + r *= lp.OutputDrive + + if lp.LimiterEnabled && g.limiter != nil { + l, r = g.limiter.Process(l, r) } - rdsValue := 0.0 + // --- Stage 3: Stereo encode the limited, filtered audio --- + limited := audio.NewFrame(audio.Sample(l), audio.Sample(r)) + comps := g.stereoEncoder.Encode(limited) + + // --- Stage 3: Combine at fixed levels --- + composite := float64(comps.Mono) + if lp.StereoEnabled { + composite += float64(comps.Stereo) + composite += pilotAmp * comps.Pilot + } if g.rdsEnc != nil && lp.RDSEnabled { rdsCarrier := g.stereoEncoder.RDSCarrier() - rdsValue = g.rdsEnc.NextSampleWithCarrier(rdsCarrier) + rdsValue := g.rdsEnc.NextSampleWithCarrier(rdsCarrier) + composite += rdsAmp * rdsValue } - composite := g.combiner.Combine(comps.Mono, comps.Stereo, comps.Pilot, rdsValue) - composite *= lp.OutputDrive - - if lp.LimiterEnabled && g.limiter != nil { - composite = g.limiter.Process(composite) + // Final composite safety clip — only fires on brief limiter + // overshoots during fast transients. Clips the entire composite, + // not individual audio bands, so harmonics don't target RDS. + if lp.LimiterEnabled { composite = dsp.HardClip(composite, ceiling) } diff --git a/internal/offline/generator_test.go b/internal/offline/generator_test.go index 0a411f8..24b6b8b 100644 --- a/internal/offline/generator_test.go +++ b/internal/offline/generator_test.go @@ -83,8 +83,11 @@ func TestLimiterPreventsClipping(t *testing.T) { cfg.FM.FMModulationEnabled = false cfg.Audio.ToneAmplitude = 0.9; cfg.Audio.Gain = 2.0; cfg.FM.OutputDrive = 1.0 frame := NewGenerator(cfg).GenerateFrame(50 * time.Millisecond) + // Total composite (audio + pilot + RDS) should stay within ceiling. + // Audio ceiling is auto-reduced to leave headroom for pilot + RDS. + maxAllowed := cfg.FM.LimiterCeiling + 0.02 // small tolerance for limiter settling for i, s := range frame.Samples { - if math.Abs(float64(s.I)) > 1.01 { t.Fatalf("sample %d: %.4f exceeds ceiling", i, s.I) } + if math.Abs(float64(s.I)) > maxAllowed { t.Fatalf("sample %d: %.4f exceeds max %.4f", i, s.I, maxAllowed) } } } diff --git a/internal/rds/encoder.go b/internal/rds/encoder.go index 8abcaf5..fd54455 100644 --- a/internal/rds/encoder.go +++ b/internal/rds/encoder.go @@ -178,14 +178,17 @@ func (e *Encoder) NextSampleWithCarrier(carrier float64) float64 { if e.sampleCount >= e.spb { if e.bitPos >= bitsPerGroup { // Apply live text updates at group boundaries (~88ms at 228kHz). - // This is the only place we read the atomics — zero per-sample overhead. + // Atomics are consumed (cleared) after reading to prevent + // re-applying the same text every group and toggling A/B flag. if ps, ok := e.livePS.Load().(string); ok && ps != "" { e.scheduler.cfg.PS = ps + e.livePS.Store("") // consumed } if rt, ok := e.liveRT.Load().(string); ok && rt != "" { e.scheduler.cfg.RT = rt e.scheduler.rtIdx = 0 // restart RT transmission for new text e.scheduler.rtABFlag = !e.scheduler.rtABFlag // toggle A/B per RDS spec + e.liveRT.Store("") // consumed } e.getRDSGroup() e.bitPos = 0 diff --git a/stream_tx.bat b/stream_tx.bat new file mode 100644 index 0000000..69a9bfb --- /dev/null +++ b/stream_tx.bat @@ -0,0 +1,2 @@ +@echo off +ffmpeg -i "http://stream.srg-ssr.ch/m/drs3/mp3_128" -f s16le -ar 44100 -ac 2 - | fmrtx.exe --tx --tx-auto-start --audio-stdin --config docs/config.plutosdr.json \ No newline at end of file