diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index 05472da..fbd8578 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -15,6 +15,9 @@ import ( cfgpkg "github.com/jan/fm-rds-tx/internal/config" ctrlpkg "github.com/jan/fm-rds-tx/internal/control" drypkg "github.com/jan/fm-rds-tx/internal/dryrun" + "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw" + "github.com/jan/fm-rds-tx/internal/ingest/adapters/stdinpcm" "github.com/jan/fm-rds-tx/internal/platform" "github.com/jan/fm-rds-tx/internal/platform/plutosdr" "github.com/jan/fm-rds-tx/internal/platform/soapysdr" @@ -36,7 +39,6 @@ func main() { audioHTTP := flag.Bool("audio-http", false, "enable HTTP audio ingest via /audio/stream") flag.Parse() - // --- list-devices (SoapySDR) --- if *listDevices { devices, err := soapysdr.Enumerate() if err != nil { @@ -60,13 +62,12 @@ func main() { log.Fatalf("load config: %v", err) } - // --- print-config --- if *printConfig { preemph := "off" if cfg.FM.PreEmphasisTauUS > 0 { - preemph = fmt.Sprintf("%.0fµs", cfg.FM.PreEmphasisTauUS) + preemph = fmt.Sprintf("%.0fus", cfg.FM.PreEmphasisTauUS) } - fmt.Printf("backend=%s freq=%.1fMHz stereo=%t rds=%t preemph=%s limiter=%t fmmod=%t deviation=±%.0fHz compositeRate=%dHz deviceRate=%.0fHz listen=%s pluto=%t soapy=%t\n", + fmt.Printf("backend=%s freq=%.1fMHz stereo=%t rds=%t preemph=%s limiter=%t fmmod=%t deviation=+-%.0fHz compositeRate=%dHz deviceRate=%.0fHz listen=%s pluto=%t soapy=%t\n", cfg.Backend.Kind, cfg.FM.FrequencyMHz, cfg.FM.StereoEnabled, cfg.RDS.Enabled, preemph, cfg.FM.LimiterEnabled, cfg.FM.FMModulationEnabled, cfg.FM.MaxDeviationHz, cfg.FM.CompositeRateHz, cfg.EffectiveDeviceRate(), cfg.Control.ListenAddress, @@ -74,7 +75,6 @@ func main() { return } - // --- dry-run --- if *dryRun { frame := drypkg.Generate(cfg) if err := drypkg.WriteJSON(*dryOutput, frame); err != nil { @@ -86,7 +86,6 @@ func main() { return } - // --- simulate --- if *simulate { summary, err := apppkg.RunSimulatedTransmit(cfg, *simulateOutput, *simulateDuration) if err != nil { @@ -96,28 +95,24 @@ func main() { return } - // --- TX mode --- if *txMode { driver := selectDriver(cfg) if driver == nil { - log.Fatal("no hardware driver available — build with -tags pluto (or -tags soapy)") + log.Fatal("no hardware driver available - build with -tags pluto (or -tags soapy)") } runTXMode(cfg, driver, *txAutoStart, *audioStdin, *audioRate, *audioHTTP) return } - // --- default: HTTP only --- srv := ctrlpkg.NewServer(cfg) server := ctrlpkg.NewHTTPServer(cfg, srv.Handler()) log.Printf("fm-rds-tx listening on %s (TX default: off, use --tx for hardware)", server.Addr) log.Fatal(server.ListenAndServe()) } -// selectDriver picks the best available driver based on config and build tags. func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver { kind := cfg.Backend.Kind - // Explicit PlutoSDR if kind == "pluto" || kind == "plutosdr" { if plutosdr.Available() { return plutosdr.NewPlutoDriver() @@ -125,7 +120,6 @@ func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver { log.Printf("warning: backend=%s but pluto driver not available (%s)", kind, plutosdr.AvailableError()) } - // Explicit SoapySDR if kind == "soapy" || kind == "soapysdr" { if soapysdr.Available() { return soapysdr.NewNativeDriver() @@ -133,7 +127,6 @@ func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver { log.Printf("warning: backend=%s but soapy driver not available", kind) } - // Auto-detect: prefer PlutoSDR, fall back to SoapySDR if plutosdr.Available() { log.Println("auto-selected: pluto-iio driver") return plutosdr.NewPlutoDriver() @@ -150,14 +143,11 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Configure driver - // OutputDrive controls composite signal level, NOT hardware gain. - // Hardware TX gain is always 0 dB (max power). Use external attenuator for power control. soapyCfg := platform.SoapyConfig{ Driver: cfg.Backend.Driver, Device: cfg.Backend.Device, CenterFreqHz: cfg.FM.FrequencyMHz * 1e6, - GainDB: 0, // 0 dB = max TX power on PlutoSDR + GainDB: 0, DeviceArgs: map[string]string{}, } if cfg.Backend.URI != "" { @@ -181,42 +171,45 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a caps.GainMinDB, caps.GainMaxDB, caps.MinSampleRate, caps.MaxSampleRate) } - // Engine engine := apppkg.NewEngine(cfg, driver) + cfg = applyLegacyAudioFlags(cfg, audioStdin, audioRate, audioHTTP) - // Live audio stream source (optional) var streamSrc *audio.StreamSource - if audioStdin || audioHTTP { - // Buffer: 2 seconds at input rate — enough to absorb jitter - bufferFrames := audioRate * 2 + var ingestRuntime *ingest.Runtime + var ingress ctrlpkg.AudioIngress + if cfg.Ingest.Kind != "" && cfg.Ingest.Kind != "none" { + rate := ingestSampleRate(cfg) + bufferFrames := rate * 2 if bufferFrames <= 0 { bufferFrames = 1 } - streamSrc = audio.NewStreamSource(bufferFrames, audioRate) + streamSrc = audio.NewStreamSource(bufferFrames, rate) engine.SetStreamSource(streamSrc) - if audioStdin { - go func() { - log.Printf("audio: reading S16LE stereo PCM from stdin at %d Hz", audioRate) - if err := audio.IngestReader(os.Stdin, streamSrc); err != nil { - log.Printf("audio: stdin ingest ended: %v", err) - } else { - log.Println("audio: stdin EOF") - } - }() + source, sourceIngress, err := buildPhase1Source(cfg) + if err != nil { + log.Fatalf("ingest source: %v", err) } - if audioHTTP { - log.Printf("audio: HTTP ingest enabled on /audio/stream (rate=%dHz, buffer=%d frames)", audioRate, streamSrc.Stats().Capacity) + ingestRuntime = ingest.NewRuntime(streamSrc, source) + if err := ingestRuntime.Start(ctx); err != nil { + log.Fatalf("ingest start: %v", err) } + ingress = sourceIngress + log.Printf("ingest: kind=%s rate=%dHz buffer=%d frames", cfg.Ingest.Kind, rate, streamSrc.Stats().Capacity) } - // Control plane srv := ctrlpkg.NewServer(cfg) srv.SetDriver(driver) srv.SetTXController(&txBridge{engine: engine}) if streamSrc != nil { srv.SetStreamSource(streamSrc) } + if ingress != nil { + srv.SetAudioIngress(ingress) + } + if ingestRuntime != nil { + srv.SetIngestRuntime(ingestRuntime) + } if autoStart { log.Println("TX: auto-start enabled") @@ -225,7 +218,7 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a } log.Printf("TX ACTIVE: freq=%.3fMHz rate=%.0fHz", cfg.FM.FrequencyMHz, cfg.EffectiveDeviceRate()) } else { - log.Println("TX ready (idle) — POST /tx/start to begin") + log.Println("TX ready (idle) - POST /tx/start to begin") } ctrlServer := ctrlpkg.NewHTTPServer(cfg, srv.Handler()) @@ -242,10 +235,56 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a log.Printf("received %s, shutting down...", sig) _ = engine.Stop(ctx) + if ingestRuntime != nil { + _ = ingestRuntime.Stop() + } _ = driver.Close(ctx) log.Println("shutdown complete") } +func applyLegacyAudioFlags(cfg cfgpkg.Config, audioStdin bool, audioRate int, audioHTTP bool) cfgpkg.Config { + if audioRate > 0 { + cfg.Ingest.Stdin.SampleRateHz = audioRate + cfg.Ingest.HTTPRaw.SampleRateHz = audioRate + } + if audioStdin && audioHTTP { + log.Printf("audio: both --audio-stdin and --audio-http set; using ingest kind=stdin") + } + if audioStdin { + cfg.Ingest.Kind = "stdin" + } + if audioHTTP && !audioStdin { + cfg.Ingest.Kind = "http-raw" + } + return cfg +} + +func ingestSampleRate(cfg cfgpkg.Config) int { + switch cfg.Ingest.Kind { + case "stdin", "stdin-pcm": + return cfg.Ingest.Stdin.SampleRateHz + case "http-raw": + return cfg.Ingest.HTTPRaw.SampleRateHz + default: + return 44100 + } +} + +func buildPhase1Source(cfg cfgpkg.Config) (ingest.Source, ctrlpkg.AudioIngress, error) { + switch cfg.Ingest.Kind { + case "stdin", "stdin-pcm": + src := stdinpcm.New("stdin-main", os.Stdin, cfg.Ingest.Stdin.SampleRateHz, cfg.Ingest.Stdin.Channels, 1024) + return src, nil, nil + case "http-raw": + src := httpraw.New("http-raw-main", cfg.Ingest.HTTPRaw.SampleRateHz, cfg.Ingest.HTTPRaw.Channels) + return src, src, nil + case "", "none": + return nil, nil, nil + default: + return nil, nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind) + } +} + type txBridge struct{ engine *apppkg.Engine } func (b *txBridge) StartTX() error { return b.engine.Start(context.Background()) } diff --git a/internal/control/control.go b/internal/control/control.go index 381a637..1b93a05 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -14,6 +14,7 @@ import ( "github.com/jan/fm-rds-tx/internal/audio" "github.com/jan/fm-rds-tx/internal/config" drypkg "github.com/jan/fm-rds-tx/internal/dryrun" + "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/platform" ) @@ -46,12 +47,22 @@ type LivePatch struct { } type Server struct { - mu sync.RWMutex - cfg config.Config - tx TXController - drv platform.SoapyDriver // optional, for runtime stats - streamSrc *audio.StreamSource // optional, for live audio ingest - audit auditCounters + mu sync.RWMutex + cfg config.Config + tx TXController + drv platform.SoapyDriver // optional, for runtime stats + streamSrc *audio.StreamSource // optional, for live audio ring stats + audioIngress AudioIngress // optional, for /audio/stream + ingestRt IngestRuntime // optional, for /runtime ingest stats + audit auditCounters +} + +type AudioIngress interface { + WritePCM16(data []byte) (int, error) +} + +type IngestRuntime interface { + Stats() ingest.Stats } type auditEvent string @@ -196,6 +207,18 @@ func (s *Server) SetStreamSource(src *audio.StreamSource) { s.mu.Unlock() } +func (s *Server) SetAudioIngress(ingress AudioIngress) { + s.mu.Lock() + s.audioIngress = ingress + s.mu.Unlock() +} + +func (s *Server) SetIngestRuntime(rt IngestRuntime) { + s.mu.Lock() + s.ingestRt = rt + s.mu.Unlock() +} + func (s *Server) Handler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/", s.handleUI) @@ -268,6 +291,7 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) { drv := s.drv tx := s.tx stream := s.streamSrc + ingestRt := s.ingestRt s.mu.RUnlock() result := map[string]any{} @@ -280,6 +304,9 @@ func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) { if stream != nil { result["audioStream"] = stream.Stats() } + if ingestRt != nil { + result["ingest"] = ingestRt.Stats() + } result["controlAudit"] = s.auditSnapshot() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(result) @@ -311,8 +338,9 @@ func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) // handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes // it into the live audio ring buffer. Use with: -// curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw -// ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream +// +// curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw +// ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { s.recordAudit(auditMethodNotAllowed) @@ -325,11 +353,11 @@ func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) { return } s.mu.RLock() - stream := s.streamSrc + ingress := s.audioIngress s.mu.RUnlock() - if stream == nil { - http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable) + if ingress == nil { + http.Error(w, "audio ingest not configured (use --audio-http with ingest runtime)", http.StatusServiceUnavailable) return } @@ -341,7 +369,12 @@ func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) { for { n, err := r.Body.Read(buf) if n > 0 { - totalFrames += stream.WritePCM(buf[:n]) + written, writeErr := ingress.WritePCM16(buf[:n]) + totalFrames += written + if writeErr != nil { + http.Error(w, writeErr.Error(), http.StatusServiceUnavailable) + return + } } if err != nil { if err == io.EOF { @@ -362,7 +395,6 @@ func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(map[string]any{ "ok": true, "frames": totalFrames, - "stats": stream.Stats(), }) } diff --git a/internal/control/control_test.go b/internal/control/control_test.go index e25ea07..176ff7a 100644 --- a/internal/control/control_test.go +++ b/internal/control/control_test.go @@ -9,7 +9,6 @@ import ( "strings" "testing" - "github.com/jan/fm-rds-tx/internal/audio" cfgpkg "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/output" ) @@ -317,8 +316,8 @@ func TestAudioStreamRequiresSource(t *testing.T) { func TestAudioStreamPushesPCM(t *testing.T) { cfg := cfgpkg.Default() srv := NewServer(cfg) - stream := audio.NewStreamSource(256, 44100) - srv.SetStreamSource(stream) + ingress := &fakeAudioIngress{} + srv.SetAudioIngress(ingress) pcm := []byte{0, 0, 0, 0} rec := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader(pcm)) @@ -338,12 +337,8 @@ func TestAudioStreamPushesPCM(t *testing.T) { if frames != 1 { t.Fatalf("expected 1 frame, got %v", frames) } - stats, ok := body["stats"].(map[string]any) - if !ok { - t.Fatalf("missing stats: %v", body["stats"]) - } - if avail, _ := stats["available"].(float64); avail < 1 { - t.Fatalf("expected stats.available >= 1, got %v", avail) + if ingress.totalFrames != 1 { + t.Fatalf("expected ingress frames=1, got %d", ingress.totalFrames) } } @@ -360,7 +355,7 @@ func TestAudioStreamRejectsNonPost(t *testing.T) { func TestAudioStreamRejectsMissingContentType(t *testing.T) { cfg := cfgpkg.Default() srv := NewServer(cfg) - srv.SetStreamSource(audio.NewStreamSource(256, 44100)) + srv.SetAudioIngress(&fakeAudioIngress{}) rec := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader([]byte{0, 0})) srv.Handler().ServeHTTP(rec, req) @@ -375,7 +370,7 @@ func TestAudioStreamRejectsMissingContentType(t *testing.T) { func TestAudioStreamRejectsUnsupportedContentType(t *testing.T) { cfg := cfgpkg.Default() srv := NewServer(cfg) - srv.SetStreamSource(audio.NewStreamSource(256, 44100)) + srv.SetAudioIngress(&fakeAudioIngress{}) rec := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader([]byte{0, 0})) req.Header.Set("Content-Type", "text/plain") @@ -397,7 +392,7 @@ func TestAudioStreamRejectsBodyTooLarge(t *testing.T) { limit := int(audioStreamBodyLimit) body := make([]byte, limit+1) srv := NewServer(cfgpkg.Default()) - srv.SetStreamSource(audio.NewStreamSource(256, 44100)) + srv.SetAudioIngress(&fakeAudioIngress{}) rec := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/octet-stream") @@ -524,7 +519,7 @@ func TestControlAuditTracksMethodNotAllowed(t *testing.T) { func TestControlAuditTracksUnsupportedMediaType(t *testing.T) { srv := NewServer(cfgpkg.Default()) - srv.SetStreamSource(audio.NewStreamSource(256, 44100)) + srv.SetAudioIngress(&fakeAudioIngress{}) rec := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "/audio/stream", bytes.NewReader([]byte{0, 0})) srv.Handler().ServeHTTP(rec, req) @@ -605,6 +600,16 @@ type fakeTXController struct { stats map[string]any } +type fakeAudioIngress struct { + totalFrames int +} + +func (f *fakeAudioIngress) WritePCM16(data []byte) (int, error) { + frames := len(data) / 4 + f.totalFrames += frames + return frames, nil +} + func (f *fakeTXController) StartTX() error { return nil } func (f *fakeTXController) StopTX() error { return nil } func (f *fakeTXController) TXStats() map[string]any { diff --git a/internal/ingest/adapters/httpraw/source.go b/internal/ingest/adapters/httpraw/source.go new file mode 100644 index 0000000..e9ba054 --- /dev/null +++ b/internal/ingest/adapters/httpraw/source.go @@ -0,0 +1,133 @@ +package httpraw + +import ( + "context" + "encoding/binary" + "fmt" + "sync/atomic" + "time" + + "github.com/jan/fm-rds-tx/internal/ingest" +) + +type Source struct { + id string + sampleRate int + channels int + + chunks chan ingest.PCMChunk + errs chan error + + sequence atomic.Uint64 + state atomic.Value // string + chunksIn atomic.Uint64 + samplesIn atomic.Uint64 + discontinuities atomic.Uint64 + lastChunkAtUnix atomic.Int64 + lastError atomic.Value // string +} + +func New(id string, sampleRate, channels int) *Source { + if id == "" { + id = "http-raw" + } + if sampleRate <= 0 { + sampleRate = 44100 + } + if channels <= 0 { + channels = 2 + } + s := &Source{ + id: id, + sampleRate: sampleRate, + channels: channels, + chunks: make(chan ingest.PCMChunk, 32), + errs: make(chan error, 8), + } + s.state.Store("idle") + return s +} + +func (s *Source) Descriptor() ingest.SourceDescriptor { + return ingest.SourceDescriptor{ + ID: s.id, + Kind: "http-raw", + Family: "raw", + Transport: "http", + Codec: "pcm_s16le", + Channels: s.channels, + SampleRateHz: s.sampleRate, + Detail: "HTTP push /audio/stream", + } +} + +func (s *Source) Start(_ context.Context) error { + s.state.Store("running") + return nil +} + +func (s *Source) Stop() error { + s.state.Store("stopped") + return nil +} + +func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks } +func (s *Source) Errors() <-chan error { return s.errs } + +func (s *Source) Stats() ingest.SourceStats { + state, _ := s.state.Load().(string) + last := s.lastChunkAtUnix.Load() + errStr, _ := s.lastError.Load().(string) + var lastChunkAt time.Time + if last > 0 { + lastChunkAt = time.Unix(0, last) + } + return ingest.SourceStats{ + State: state, + Connected: state == "running", + LastChunkAt: lastChunkAt, + ChunksIn: s.chunksIn.Load(), + SamplesIn: s.samplesIn.Load(), + Discontinuities: s.discontinuities.Load(), + LastError: errStr, + } +} + +func (s *Source) WritePCM16(data []byte) (int, error) { + if s.channels != 1 && s.channels != 2 { + return 0, fmt.Errorf("unsupported configured channels: %d", s.channels) + } + if len(data) == 0 { + return 0, nil + } + frameBytes := s.channels * 2 + usable := len(data) - (len(data) % frameBytes) + if usable == 0 { + return 0, nil + } + samples := make([]int32, 0, usable/2) + for i := 0; i+1 < usable; i += 2 { + v := int16(binary.LittleEndian.Uint16(data[i : i+2])) + samples = append(samples, int32(v)<<16) + } + seq := s.sequence.Add(1) - 1 + chunk := ingest.PCMChunk{ + Samples: samples, + Channels: s.channels, + SampleRateHz: s.sampleRate, + Sequence: seq, + Timestamp: time.Now(), + SourceID: s.id, + } + select { + case s.chunks <- chunk: + default: + s.discontinuities.Add(1) + return 0, fmt.Errorf("http raw ingress overflow") + } + frames := usable / frameBytes + s.chunksIn.Add(1) + s.samplesIn.Add(uint64(len(samples))) + s.lastChunkAtUnix.Store(time.Now().UnixNano()) + return frames, nil +} diff --git a/internal/ingest/runtime.go b/internal/ingest/runtime.go index 27c7db5..df0048c 100644 --- a/internal/ingest/runtime.go +++ b/internal/ingest/runtime.go @@ -34,6 +34,12 @@ func NewRuntime(sink *audio.StreamSource, src Source) *Runtime { } func (r *Runtime) Start(ctx context.Context) error { + if r.sink == nil { + r.mu.Lock() + r.stats.State = "failed" + r.mu.Unlock() + return nil + } if r.source == nil { r.mu.Lock() r.stats.State = "idle" @@ -91,7 +97,11 @@ func (r *Runtime) run() { select { case <-r.ctx.Done(): return - case err := <-errCh: + case err, ok := <-errCh: + if !ok { + errCh = nil + continue + } if err == nil { continue } @@ -100,6 +110,9 @@ func (r *Runtime) run() { r.mu.Unlock() case chunk, ok := <-ch: if !ok { + r.mu.Lock() + r.stats.State = "stopped" + r.mu.Unlock() return } r.handleChunk(chunk)