Procházet zdrojové kódy

ingest: rewire tx/control to runtime and http raw adapter

main
Jan před 1 měsícem
rodič
revize
4ad70d4ae4
5 změnil soubory, kde provedl 285 přidání a 63 odebrání
  1. +75
    -36
      cmd/fmrtx/main.go
  2. +45
    -13
      internal/control/control.go
  3. +18
    -13
      internal/control/control_test.go
  4. +133
    -0
      internal/ingest/adapters/httpraw/source.go
  5. +14
    -1
      internal/ingest/runtime.go

+ 75
- 36
cmd/fmrtx/main.go Zobrazit soubor

@@ -15,6 +15,9 @@ import (
cfgpkg "github.com/jan/fm-rds-tx/internal/config"
ctrlpkg "github.com/jan/fm-rds-tx/internal/control"
drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
"github.com/jan/fm-rds-tx/internal/ingest"
"github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw"
"github.com/jan/fm-rds-tx/internal/ingest/adapters/stdinpcm"
"github.com/jan/fm-rds-tx/internal/platform"
"github.com/jan/fm-rds-tx/internal/platform/plutosdr"
"github.com/jan/fm-rds-tx/internal/platform/soapysdr"
@@ -36,7 +39,6 @@ func main() {
audioHTTP := flag.Bool("audio-http", false, "enable HTTP audio ingest via /audio/stream")
flag.Parse()

// --- list-devices (SoapySDR) ---
if *listDevices {
devices, err := soapysdr.Enumerate()
if err != nil {
@@ -60,13 +62,12 @@ func main() {
log.Fatalf("load config: %v", err)
}

// --- print-config ---
if *printConfig {
preemph := "off"
if cfg.FM.PreEmphasisTauUS > 0 {
preemph = fmt.Sprintf("%.0fµs", cfg.FM.PreEmphasisTauUS)
preemph = fmt.Sprintf("%.0fus", cfg.FM.PreEmphasisTauUS)
}
fmt.Printf("backend=%s freq=%.1fMHz stereo=%t rds=%t preemph=%s limiter=%t fmmod=%t deviation=±%.0fHz compositeRate=%dHz deviceRate=%.0fHz listen=%s pluto=%t soapy=%t\n",
fmt.Printf("backend=%s freq=%.1fMHz stereo=%t rds=%t preemph=%s limiter=%t fmmod=%t deviation=+-%.0fHz compositeRate=%dHz deviceRate=%.0fHz listen=%s pluto=%t soapy=%t\n",
cfg.Backend.Kind, cfg.FM.FrequencyMHz, cfg.FM.StereoEnabled, cfg.RDS.Enabled,
preemph, cfg.FM.LimiterEnabled, cfg.FM.FMModulationEnabled, cfg.FM.MaxDeviationHz,
cfg.FM.CompositeRateHz, cfg.EffectiveDeviceRate(), cfg.Control.ListenAddress,
@@ -74,7 +75,6 @@ func main() {
return
}

// --- dry-run ---
if *dryRun {
frame := drypkg.Generate(cfg)
if err := drypkg.WriteJSON(*dryOutput, frame); err != nil {
@@ -86,7 +86,6 @@ func main() {
return
}

// --- simulate ---
if *simulate {
summary, err := apppkg.RunSimulatedTransmit(cfg, *simulateOutput, *simulateDuration)
if err != nil {
@@ -96,28 +95,24 @@ func main() {
return
}

// --- TX mode ---
if *txMode {
driver := selectDriver(cfg)
if driver == nil {
log.Fatal("no hardware driver available build with -tags pluto (or -tags soapy)")
log.Fatal("no hardware driver available - build with -tags pluto (or -tags soapy)")
}
runTXMode(cfg, driver, *txAutoStart, *audioStdin, *audioRate, *audioHTTP)
return
}

// --- default: HTTP only ---
srv := ctrlpkg.NewServer(cfg)
server := ctrlpkg.NewHTTPServer(cfg, srv.Handler())
log.Printf("fm-rds-tx listening on %s (TX default: off, use --tx for hardware)", server.Addr)
log.Fatal(server.ListenAndServe())
}

// selectDriver picks the best available driver based on config and build tags.
func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver {
kind := cfg.Backend.Kind

// Explicit PlutoSDR
if kind == "pluto" || kind == "plutosdr" {
if plutosdr.Available() {
return plutosdr.NewPlutoDriver()
@@ -125,7 +120,6 @@ func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver {
log.Printf("warning: backend=%s but pluto driver not available (%s)", kind, plutosdr.AvailableError())
}

// Explicit SoapySDR
if kind == "soapy" || kind == "soapysdr" {
if soapysdr.Available() {
return soapysdr.NewNativeDriver()
@@ -133,7 +127,6 @@ func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver {
log.Printf("warning: backend=%s but soapy driver not available", kind)
}

// Auto-detect: prefer PlutoSDR, fall back to SoapySDR
if plutosdr.Available() {
log.Println("auto-selected: pluto-iio driver")
return plutosdr.NewPlutoDriver()
@@ -150,14 +143,11 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Configure driver
// OutputDrive controls composite signal level, NOT hardware gain.
// Hardware TX gain is always 0 dB (max power). Use external attenuator for power control.
soapyCfg := platform.SoapyConfig{
Driver: cfg.Backend.Driver,
Device: cfg.Backend.Device,
CenterFreqHz: cfg.FM.FrequencyMHz * 1e6,
GainDB: 0, // 0 dB = max TX power on PlutoSDR
GainDB: 0,
DeviceArgs: map[string]string{},
}
if cfg.Backend.URI != "" {
@@ -181,42 +171,45 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a
caps.GainMinDB, caps.GainMaxDB, caps.MinSampleRate, caps.MaxSampleRate)
}

// Engine
engine := apppkg.NewEngine(cfg, driver)
cfg = applyLegacyAudioFlags(cfg, audioStdin, audioRate, audioHTTP)

// Live audio stream source (optional)
var streamSrc *audio.StreamSource
if audioStdin || audioHTTP {
// Buffer: 2 seconds at input rate — enough to absorb jitter
bufferFrames := audioRate * 2
var ingestRuntime *ingest.Runtime
var ingress ctrlpkg.AudioIngress
if cfg.Ingest.Kind != "" && cfg.Ingest.Kind != "none" {
rate := ingestSampleRate(cfg)
bufferFrames := rate * 2
if bufferFrames <= 0 {
bufferFrames = 1
}
streamSrc = audio.NewStreamSource(bufferFrames, audioRate)
streamSrc = audio.NewStreamSource(bufferFrames, rate)
engine.SetStreamSource(streamSrc)

if audioStdin {
go func() {
log.Printf("audio: reading S16LE stereo PCM from stdin at %d Hz", audioRate)
if err := audio.IngestReader(os.Stdin, streamSrc); err != nil {
log.Printf("audio: stdin ingest ended: %v", err)
} else {
log.Println("audio: stdin EOF")
}
}()
source, sourceIngress, err := buildPhase1Source(cfg)
if err != nil {
log.Fatalf("ingest source: %v", err)
}
if audioHTTP {
log.Printf("audio: HTTP ingest enabled on /audio/stream (rate=%dHz, buffer=%d frames)", audioRate, streamSrc.Stats().Capacity)
ingestRuntime = ingest.NewRuntime(streamSrc, source)
if err := ingestRuntime.Start(ctx); err != nil {
log.Fatalf("ingest start: %v", err)
}
ingress = sourceIngress
log.Printf("ingest: kind=%s rate=%dHz buffer=%d frames", cfg.Ingest.Kind, rate, streamSrc.Stats().Capacity)
}

// Control plane
srv := ctrlpkg.NewServer(cfg)
srv.SetDriver(driver)
srv.SetTXController(&txBridge{engine: engine})
if streamSrc != nil {
srv.SetStreamSource(streamSrc)
}
if ingress != nil {
srv.SetAudioIngress(ingress)
}
if ingestRuntime != nil {
srv.SetIngestRuntime(ingestRuntime)
}

if autoStart {
log.Println("TX: auto-start enabled")
@@ -225,7 +218,7 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a
}
log.Printf("TX ACTIVE: freq=%.3fMHz rate=%.0fHz", cfg.FM.FrequencyMHz, cfg.EffectiveDeviceRate())
} else {
log.Println("TX ready (idle) POST /tx/start to begin")
log.Println("TX ready (idle) - POST /tx/start to begin")
}

ctrlServer := ctrlpkg.NewHTTPServer(cfg, srv.Handler())
@@ -242,10 +235,56 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a
log.Printf("received %s, shutting down...", sig)

_ = engine.Stop(ctx)
if ingestRuntime != nil {
_ = ingestRuntime.Stop()
}
_ = driver.Close(ctx)
log.Println("shutdown complete")
}

func applyLegacyAudioFlags(cfg cfgpkg.Config, audioStdin bool, audioRate int, audioHTTP bool) cfgpkg.Config {
if audioRate > 0 {
cfg.Ingest.Stdin.SampleRateHz = audioRate
cfg.Ingest.HTTPRaw.SampleRateHz = audioRate
}
if audioStdin && audioHTTP {
log.Printf("audio: both --audio-stdin and --audio-http set; using ingest kind=stdin")
}
if audioStdin {
cfg.Ingest.Kind = "stdin"
}
if audioHTTP && !audioStdin {
cfg.Ingest.Kind = "http-raw"
}
return cfg
}

func ingestSampleRate(cfg cfgpkg.Config) int {
switch cfg.Ingest.Kind {
case "stdin", "stdin-pcm":
return cfg.Ingest.Stdin.SampleRateHz
case "http-raw":
return cfg.Ingest.HTTPRaw.SampleRateHz
default:
return 44100
}
}

func buildPhase1Source(cfg cfgpkg.Config) (ingest.Source, ctrlpkg.AudioIngress, error) {
switch cfg.Ingest.Kind {
case "stdin", "stdin-pcm":
src := stdinpcm.New("stdin-main", os.Stdin, cfg.Ingest.Stdin.SampleRateHz, cfg.Ingest.Stdin.Channels, 1024)
return src, nil, nil
case "http-raw":
src := httpraw.New("http-raw-main", cfg.Ingest.HTTPRaw.SampleRateHz, cfg.Ingest.HTTPRaw.Channels)
return src, src, nil
case "", "none":
return nil, nil, nil
default:
return nil, nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind)
}
}

type txBridge struct{ engine *apppkg.Engine }

func (b *txBridge) StartTX() error { return b.engine.Start(context.Background()) }


+ 45
- 13
internal/control/control.go Zobrazit soubor

@@ -14,6 +14,7 @@ import (
"github.com/jan/fm-rds-tx/internal/audio"
"github.com/jan/fm-rds-tx/internal/config"
drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
"github.com/jan/fm-rds-tx/internal/ingest"
"github.com/jan/fm-rds-tx/internal/platform"
)

@@ -46,12 +47,22 @@ type LivePatch struct {
}

type Server struct {
mu sync.RWMutex
cfg config.Config
tx TXController
drv platform.SoapyDriver // optional, for runtime stats
streamSrc *audio.StreamSource // optional, for live audio ingest
audit auditCounters
mu sync.RWMutex
cfg config.Config
tx TXController
drv platform.SoapyDriver // optional, for runtime stats
streamSrc *audio.StreamSource // optional, for live audio ring stats
audioIngress AudioIngress // optional, for /audio/stream
ingestRt IngestRuntime // optional, for /runtime ingest stats
audit auditCounters
}

type AudioIngress interface {
WritePCM16(data []byte) (int, error)
}

type IngestRuntime interface {
Stats() ingest.Stats
}

type auditEvent string
@@ -196,6 +207,18 @@ func (s *Server) SetStreamSource(src *audio.StreamSource) {
s.mu.Unlock()
}

func (s *Server) SetAudioIngress(ingress AudioIngress) {
s.mu.Lock()
s.audioIngress = ingress
s.mu.Unlock()
}

func (s *Server) SetIngestRuntime(rt IngestRuntime) {
s.mu.Lock()
s.ingestRt = rt
s.mu.Unlock()
}

func (s *Server) Handler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/", s.handleUI)
@@ -268,6 +291,7 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
drv := s.drv
tx := s.tx
stream := s.streamSrc
ingestRt := s.ingestRt
s.mu.RUnlock()

result := map[string]any{}
@@ -280,6 +304,9 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
if stream != nil {
result["audioStream"] = stream.Stats()
}
if ingestRt != nil {
result["ingest"] = ingestRt.Stats()
}
result["controlAudit"] = s.auditSnapshot()
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(result)
@@ -311,8 +338,9 @@ func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request)

// handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
// it into the live audio ring buffer. Use with:
// curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
// ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
//
// curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
// ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
s.recordAudit(auditMethodNotAllowed)
@@ -325,11 +353,11 @@ func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
return
}
s.mu.RLock()
stream := s.streamSrc
ingress := s.audioIngress
s.mu.RUnlock()

if stream == nil {
http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
if ingress == nil {
http.Error(w, "audio ingest not configured (use --audio-http with ingest runtime)", http.StatusServiceUnavailable)
return
}

@@ -341,7 +369,12 @@ func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
for {
n, err := r.Body.Read(buf)
if n > 0 {
totalFrames += stream.WritePCM(buf[:n])
written, writeErr := ingress.WritePCM16(buf[:n])
totalFrames += written
if writeErr != nil {
http.Error(w, writeErr.Error(), http.StatusServiceUnavailable)
return
}
}
if err != nil {
if err == io.EOF {
@@ -362,7 +395,6 @@ func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
_ = json.NewEncoder(w).Encode(map[string]any{
"ok": true,
"frames": totalFrames,
"stats": stream.Stats(),
})
}



+ 18
- 13
internal/control/control_test.go Zobrazit soubor

@@ -9,7 +9,6 @@ import (
"strings"
"testing"

"github.com/jan/fm-rds-tx/internal/audio"
cfgpkg "github.com/jan/fm-rds-tx/internal/config"
"github.com/jan/fm-rds-tx/internal/output"
)
@@ -317,8 +316,8 @@ func TestAudioStreamRequiresSource(t *testing.T) {
func TestAudioStreamPushesPCM(t *testing.T) {
cfg := cfgpkg.Default()
srv := NewServer(cfg)
stream := audio.NewStreamSource(256, 44100)
srv.SetStreamSource(stream)
ingress := &fakeAudioIngress{}
srv.SetAudioIngress(ingress)
pcm := []byte{0, 0, 0, 0}
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader(pcm))
@@ -338,12 +337,8 @@ func TestAudioStreamPushesPCM(t *testing.T) {
if frames != 1 {
t.Fatalf("expected 1 frame, got %v", frames)
}
stats, ok := body["stats"].(map[string]any)
if !ok {
t.Fatalf("missing stats: %v", body["stats"])
}
if avail, _ := stats["available"].(float64); avail < 1 {
t.Fatalf("expected stats.available >= 1, got %v", avail)
if ingress.totalFrames != 1 {
t.Fatalf("expected ingress frames=1, got %d", ingress.totalFrames)
}
}

@@ -360,7 +355,7 @@ func TestAudioStreamRejectsNonPost(t *testing.T) {
func TestAudioStreamRejectsMissingContentType(t *testing.T) {
cfg := cfgpkg.Default()
srv := NewServer(cfg)
srv.SetStreamSource(audio.NewStreamSource(256, 44100))
srv.SetAudioIngress(&fakeAudioIngress{})
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader([]byte{0, 0}))
srv.Handler().ServeHTTP(rec, req)
@@ -375,7 +370,7 @@ func TestAudioStreamRejectsMissingContentType(t *testing.T) {
func TestAudioStreamRejectsUnsupportedContentType(t *testing.T) {
cfg := cfgpkg.Default()
srv := NewServer(cfg)
srv.SetStreamSource(audio.NewStreamSource(256, 44100))
srv.SetAudioIngress(&fakeAudioIngress{})
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader([]byte{0, 0}))
req.Header.Set("Content-Type", "text/plain")
@@ -397,7 +392,7 @@ func TestAudioStreamRejectsBodyTooLarge(t *testing.T) {
limit := int(audioStreamBodyLimit)
body := make([]byte, limit+1)
srv := NewServer(cfgpkg.Default())
srv.SetStreamSource(audio.NewStreamSource(256, 44100))
srv.SetAudioIngress(&fakeAudioIngress{})
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/octet-stream")
@@ -524,7 +519,7 @@ func TestControlAuditTracksMethodNotAllowed(t *testing.T) {

func TestControlAuditTracksUnsupportedMediaType(t *testing.T) {
srv := NewServer(cfgpkg.Default())
srv.SetStreamSource(audio.NewStreamSource(256, 44100))
srv.SetAudioIngress(&fakeAudioIngress{})
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader([]byte{0, 0}))
srv.Handler().ServeHTTP(rec, req)
@@ -605,6 +600,16 @@ type fakeTXController struct {
stats map[string]any
}

type fakeAudioIngress struct {
totalFrames int
}

func (f *fakeAudioIngress) WritePCM16(data []byte) (int, error) {
frames := len(data) / 4
f.totalFrames += frames
return frames, nil
}

func (f *fakeTXController) StartTX() error { return nil }
func (f *fakeTXController) StopTX() error { return nil }
func (f *fakeTXController) TXStats() map[string]any {


+ 133
- 0
internal/ingest/adapters/httpraw/source.go Zobrazit soubor

@@ -0,0 +1,133 @@
package httpraw

import (
"context"
"encoding/binary"
"fmt"
"sync/atomic"
"time"

"github.com/jan/fm-rds-tx/internal/ingest"
)

type Source struct {
id string
sampleRate int
channels int

chunks chan ingest.PCMChunk
errs chan error

sequence atomic.Uint64
state atomic.Value // string
chunksIn atomic.Uint64
samplesIn atomic.Uint64
discontinuities atomic.Uint64
lastChunkAtUnix atomic.Int64
lastError atomic.Value // string
}

func New(id string, sampleRate, channels int) *Source {
if id == "" {
id = "http-raw"
}
if sampleRate <= 0 {
sampleRate = 44100
}
if channels <= 0 {
channels = 2
}
s := &Source{
id: id,
sampleRate: sampleRate,
channels: channels,
chunks: make(chan ingest.PCMChunk, 32),
errs: make(chan error, 8),
}
s.state.Store("idle")
return s
}

func (s *Source) Descriptor() ingest.SourceDescriptor {
return ingest.SourceDescriptor{
ID: s.id,
Kind: "http-raw",
Family: "raw",
Transport: "http",
Codec: "pcm_s16le",
Channels: s.channels,
SampleRateHz: s.sampleRate,
Detail: "HTTP push /audio/stream",
}
}

func (s *Source) Start(_ context.Context) error {
s.state.Store("running")
return nil
}

func (s *Source) Stop() error {
s.state.Store("stopped")
return nil
}

func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
func (s *Source) Errors() <-chan error { return s.errs }

func (s *Source) Stats() ingest.SourceStats {
state, _ := s.state.Load().(string)
last := s.lastChunkAtUnix.Load()
errStr, _ := s.lastError.Load().(string)
var lastChunkAt time.Time
if last > 0 {
lastChunkAt = time.Unix(0, last)
}
return ingest.SourceStats{
State: state,
Connected: state == "running",
LastChunkAt: lastChunkAt,
ChunksIn: s.chunksIn.Load(),
SamplesIn: s.samplesIn.Load(),
Discontinuities: s.discontinuities.Load(),
LastError: errStr,
}
}

func (s *Source) WritePCM16(data []byte) (int, error) {
if s.channels != 1 && s.channels != 2 {
return 0, fmt.Errorf("unsupported configured channels: %d", s.channels)
}
if len(data) == 0 {
return 0, nil
}
frameBytes := s.channels * 2
usable := len(data) - (len(data) % frameBytes)
if usable == 0 {
return 0, nil
}
samples := make([]int32, 0, usable/2)
for i := 0; i+1 < usable; i += 2 {
v := int16(binary.LittleEndian.Uint16(data[i : i+2]))
samples = append(samples, int32(v)<<16)
}
seq := s.sequence.Add(1) - 1
chunk := ingest.PCMChunk{
Samples: samples,
Channels: s.channels,
SampleRateHz: s.sampleRate,
Sequence: seq,
Timestamp: time.Now(),
SourceID: s.id,
}
select {
case s.chunks <- chunk:
default:
s.discontinuities.Add(1)
return 0, fmt.Errorf("http raw ingress overflow")
}
frames := usable / frameBytes
s.chunksIn.Add(1)
s.samplesIn.Add(uint64(len(samples)))
s.lastChunkAtUnix.Store(time.Now().UnixNano())
return frames, nil
}

+ 14
- 1
internal/ingest/runtime.go Zobrazit soubor

@@ -34,6 +34,12 @@ func NewRuntime(sink *audio.StreamSource, src Source) *Runtime {
}

func (r *Runtime) Start(ctx context.Context) error {
if r.sink == nil {
r.mu.Lock()
r.stats.State = "failed"
r.mu.Unlock()
return nil
}
if r.source == nil {
r.mu.Lock()
r.stats.State = "idle"
@@ -91,7 +97,11 @@ func (r *Runtime) run() {
select {
case <-r.ctx.Done():
return
case err := <-errCh:
case err, ok := <-errCh:
if !ok {
errCh = nil
continue
}
if err == nil {
continue
}
@@ -100,6 +110,9 @@ func (r *Runtime) run() {
r.mu.Unlock()
case chunk, ok := <-ch:
if !ok {
r.mu.Lock()
r.stats.State = "stopped"
r.mu.Unlock()
return
}
r.handleChunk(chunk)


Načítá se…
Zrušit
Uložit