|
- package main
-
- import (
- "context"
- "flag"
- "fmt"
- "log"
- "os"
- "os/signal"
- "strings"
- "syscall"
- "time"
-
- apppkg "github.com/jan/fm-rds-tx/internal/app"
- "github.com/jan/fm-rds-tx/internal/license"
- "github.com/jan/fm-rds-tx/internal/audio"
- cfgpkg "github.com/jan/fm-rds-tx/internal/config"
- ctrlpkg "github.com/jan/fm-rds-tx/internal/control"
- drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
- "github.com/jan/fm-rds-tx/internal/ingest"
- "github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast"
- ingestfactory "github.com/jan/fm-rds-tx/internal/ingest/factory"
- "github.com/jan/fm-rds-tx/internal/platform"
- "github.com/jan/fm-rds-tx/internal/platform/plutosdr"
- "github.com/jan/fm-rds-tx/internal/platform/soapysdr"
- )
-
- func main() {
- configPath := flag.String("config", "", "path to JSON config")
- printConfig := flag.Bool("print-config", false, "print effective config and exit")
- dryRun := flag.Bool("dry-run", false, "run no-hardware dry-run output")
- dryOutput := flag.String("dry-output", "-", "dry-run output path or - for stdout")
- simulate := flag.Bool("simulate-tx", false, "run simulated Soapy/backend transmit path")
- simulateOutput := flag.String("simulate-output", "", "simulated transmit output file")
- simulateDuration := flag.Duration("simulate-duration", 500*time.Millisecond, "simulated transmit duration")
- txMode := flag.Bool("tx", false, "start real TX mode (requires hardware + build tags)")
- txAutoStart := flag.Bool("tx-auto-start", false, "auto-start TX on launch")
- licenseKey := flag.String("license", "", "fm-rds-tx license key (omit for evaluation mode with jingle)")
- listDevices := flag.Bool("list-devices", false, "enumerate SoapySDR devices and exit")
- audioStdin := flag.Bool("audio-stdin", false, "read S16LE stereo PCM audio from stdin")
- audioRate := flag.Int("audio-rate", 44100, "sample rate of stdin audio input (Hz)")
- audioHTTP := flag.Bool("audio-http", false, "enable HTTP audio ingest via /audio/stream")
- flag.Parse()
-
- if *listDevices {
- devices, err := soapysdr.Enumerate()
- if err != nil {
- log.Fatalf("enumerate: %v", err)
- }
- if len(devices) == 0 {
- fmt.Println("no SoapySDR devices found")
- return
- }
- for i, dev := range devices {
- fmt.Printf("device %d:\n", i)
- for k, v := range dev {
- fmt.Printf(" %s = %s\n", k, v)
- }
- }
- return
- }
-
- cfg, err := cfgpkg.Load(*configPath)
- if err != nil {
- log.Fatalf("load config: %v", err)
- }
-
- if *printConfig {
- preemph := "off"
- if cfg.FM.PreEmphasisTauUS > 0 {
- preemph = fmt.Sprintf("%.0fus", cfg.FM.PreEmphasisTauUS)
- }
- fmt.Printf("backend=%s freq=%.1fMHz stereo=%t rds=%t preemph=%s limiter=%t fmmod=%t deviation=+-%.0fHz compositeRate=%dHz deviceRate=%.0fHz listen=%s pluto=%t soapy=%t\n",
- cfg.Backend.Kind, cfg.FM.FrequencyMHz, cfg.FM.StereoEnabled, cfg.RDS.Enabled,
- preemph, cfg.FM.LimiterEnabled, cfg.FM.FMModulationEnabled, cfg.FM.MaxDeviationHz,
- cfg.FM.CompositeRateHz, cfg.EffectiveDeviceRate(), cfg.Control.ListenAddress,
- plutosdr.Available(), soapysdr.Available())
- return
- }
-
- if *dryRun {
- frame := drypkg.Generate(cfg)
- if err := drypkg.WriteJSON(*dryOutput, frame); err != nil {
- log.Fatalf("dry-run: %v", err)
- }
- if *dryOutput != "" && *dryOutput != "-" {
- fmt.Fprintf(os.Stderr, "dry run frame written to %s\n", *dryOutput)
- }
- return
- }
-
- if *simulate {
- summary, err := apppkg.RunSimulatedTransmit(cfg, *simulateOutput, *simulateDuration)
- if err != nil {
- log.Fatalf("simulate-tx: %v", err)
- }
- fmt.Println(summary)
- return
- }
-
- if *txMode {
- driver := selectDriver(cfg)
- if driver == nil {
- log.Fatal("no hardware driver available - build with -tags pluto (or -tags soapy)")
- }
- runTXMode(cfg, *configPath, driver, *txAutoStart, *audioStdin, *audioRate, *audioHTTP, *licenseKey)
- return
- }
-
- srv := ctrlpkg.NewServer(cfg)
- configureControlPlanePersistence(srv, *configPath, nil)
- server := ctrlpkg.NewHTTPServer(cfg, srv.Handler())
- log.Printf("fm-rds-tx listening on %s (TX default: off, use --tx for hardware)", server.Addr)
- log.Fatal(server.ListenAndServe())
- }
-
- func selectDriver(cfg cfgpkg.Config) platform.SoapyDriver {
- kind := cfg.Backend.Kind
-
- if kind == "pluto" || kind == "plutosdr" {
- if plutosdr.Available() {
- return plutosdr.NewPlutoDriver()
- }
- log.Printf("warning: backend=%s but pluto driver not available (%s)", kind, plutosdr.AvailableError())
- }
-
- if kind == "soapy" || kind == "soapysdr" {
- if soapysdr.Available() {
- return soapysdr.NewNativeDriver()
- }
- log.Printf("warning: backend=%s but soapy driver not available", kind)
- }
-
- if plutosdr.Available() {
- log.Println("auto-selected: pluto-iio driver")
- return plutosdr.NewPlutoDriver()
- }
- if soapysdr.Available() {
- log.Println("auto-selected: soapy-native driver")
- return soapysdr.NewNativeDriver()
- }
-
- return nil
- }
-
- func runTXMode(cfg cfgpkg.Config, configPath string, driver platform.SoapyDriver, autoStart bool, audioStdin bool, audioRate int, audioHTTP bool, licenseKey string) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- soapyCfg := platform.SoapyConfig{
- Driver: cfg.Backend.Driver,
- Device: cfg.Backend.Device,
- CenterFreqHz: cfg.FM.FrequencyMHz * 1e6,
- GainDB: 0,
- DeviceArgs: map[string]string{},
- }
- if cfg.Backend.URI != "" {
- soapyCfg.DeviceArgs["uri"] = cfg.Backend.URI
- }
- for k, v := range cfg.Backend.DeviceArgs {
- soapyCfg.DeviceArgs[k] = v
- }
- soapyCfg.SampleRateHz = cfg.EffectiveDeviceRate()
-
- log.Printf("TX: configuring %s freq=%.3fMHz rate=%.0fHz gain=%.1fdB",
- driver.Name(), cfg.FM.FrequencyMHz, soapyCfg.SampleRateHz, soapyCfg.GainDB)
-
- if err := driver.Configure(ctx, soapyCfg); err != nil {
- log.Fatalf("configure: %v", err)
- }
-
- caps, err := driver.Capabilities(ctx)
- if err == nil {
- log.Printf("TX: device caps: gain=%.0f..%.0f dB, rate=%.0f..%.0f Hz",
- caps.GainMinDB, caps.GainMaxDB, caps.MinSampleRate, caps.MaxSampleRate)
- }
-
- engine := apppkg.NewEngine(cfg, driver)
-
- // License setup.
- licState := license.NewState(licenseKey)
- if licState.Licensed() {
- log.Println("license: valid key — evaluation jingle disabled")
- } else {
- log.Printf("license: no valid key — evaluation jingle every %d minutes", license.JingleIntervalMinutes)
- }
- engine.SetLicenseState(licState, licenseKey)
- cfg = applyLegacyAudioFlags(cfg, audioStdin, audioRate, audioHTTP)
-
- var streamSrc *audio.StreamSource
- var ingestRuntime *ingest.Runtime
- var ingress ctrlpkg.AudioIngress
- if ingestEnabled(cfg.Ingest.Kind) {
- rate := ingestfactory.SampleRateForKind(cfg)
- bufferFrames := rate * 2
- if bufferFrames <= 0 {
- bufferFrames = 1
- }
- streamSrc = audio.NewStreamSource(bufferFrames, rate)
- engine.SetStreamSource(streamSrc)
-
- source, sourceIngress, err := ingestfactory.BuildSource(ctx, cfg, ingestfactory.Deps{Stdin: os.Stdin})
- if err != nil {
- log.Fatalf("ingest source: %v", err)
- }
- runtimeOpts := []ingest.RuntimeOption{}
- runtimeOpts = append(runtimeOpts, ingest.WithPrebufferMs(cfg.Ingest.PrebufferMs))
- if cfg.Ingest.Icecast.RadioText.Enabled {
- relay := icecast.NewRadioTextRelay(
- icecast.RadioTextOptions{
- Enabled: true,
- Prefix: cfg.Ingest.Icecast.RadioText.Prefix,
- MaxLen: cfg.Ingest.Icecast.RadioText.MaxLen,
- OnlyOnChange: cfg.Ingest.Icecast.RadioText.OnlyOnChange,
- },
- cfg.RDS.RadioText,
- func(rt string) error {
- return engine.UpdateConfig(apppkg.LiveConfigUpdate{RadioText: &rt})
- },
- )
- runtimeOpts = append(runtimeOpts, ingest.WithStreamTitleHandler(func(streamTitle string) {
- if err := relay.HandleStreamTitle(streamTitle); err != nil {
- log.Printf("ingest: failed to forward StreamTitle to RDS RadioText: %v", err)
- }
- }))
- log.Printf(
- "ingest: ICY StreamTitle->RDS enabled (maxLen=%d onlyOnChange=%t prefix=%q)",
- cfg.Ingest.Icecast.RadioText.MaxLen,
- cfg.Ingest.Icecast.RadioText.OnlyOnChange,
- cfg.Ingest.Icecast.RadioText.Prefix,
- )
- }
- ingestRuntime = ingest.NewRuntime(streamSrc, source, runtimeOpts...)
- if err := ingestRuntime.Start(ctx); err != nil {
- log.Fatalf("ingest start: %v", err)
- }
- ingress = sourceIngress
- log.Printf("ingest: kind=%s rate=%dHz buffer=%d frames", cfg.Ingest.Kind, rate, streamSrc.Stats().Capacity)
- }
-
- srv := ctrlpkg.NewServer(cfg)
- configureControlPlanePersistence(srv, configPath, cancel)
- srv.SetDriver(driver)
- srv.SetTXController(&txBridge{engine: engine})
- if streamSrc != nil {
- srv.SetStreamSource(streamSrc)
- }
- if ingress != nil {
- srv.SetAudioIngress(ingress)
- }
- if ingestRuntime != nil {
- srv.SetIngestRuntime(ingestRuntime)
- }
-
- if autoStart {
- log.Println("TX: auto-start enabled")
- if err := engine.Start(ctx); err != nil {
- log.Fatalf("engine start: %v", err)
- }
- log.Printf("TX ACTIVE: freq=%.3fMHz rate=%.0fHz", cfg.FM.FrequencyMHz, cfg.EffectiveDeviceRate())
- } else {
- log.Println("TX ready (idle) - POST /tx/start to begin")
- }
-
- ctrlServer := ctrlpkg.NewHTTPServer(cfg, srv.Handler())
- go func() {
- log.Printf("control plane on %s (read=%s write=%s idle=%s)", ctrlServer.Addr, ctrlServer.ReadTimeout, ctrlServer.WriteTimeout, ctrlServer.IdleTimeout)
- if err := ctrlServer.ListenAndServe(); err != nil {
- log.Printf("http: %v", err)
- }
- }()
-
- sigCh := make(chan os.Signal, 1)
- signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
- sig := <-sigCh
- log.Printf("received %s, shutting down...", sig)
-
- _ = engine.Stop(ctx)
- if ingestRuntime != nil {
- _ = ingestRuntime.Stop()
- }
- _ = driver.Close(ctx)
- log.Println("shutdown complete")
- }
-
- func configureControlPlanePersistence(srv *ctrlpkg.Server, configPath string, cancel context.CancelFunc) {
- if strings.TrimSpace(configPath) == "" {
- return
- }
- srv.SetConfigSaver(func(next cfgpkg.Config) error {
- return cfgpkg.Save(configPath, next)
- })
- srv.SetHardReload(func() {
- // BUG-5 fix: cancel the app context instead of os.Exit(0).
- // os.Exit skips all defers, Flush/Stop calls, and driver cleanup.
- // Cancelling ctx lets the normal shutdown sequence run: engine.Stop,
- // ingestRuntime.Stop, driver.Close — then the process exits naturally.
- // The supervisor (systemd etc.) will restart the process as intended.
- log.Printf("control: hard reload — cancelling app context for clean restart")
- if cancel != nil {
- cancel()
- }
- })
- }
-
// ingestEnabled reports whether kind names an active ingest source. The
// comparison ignores case and surrounding whitespace; an empty kind and
// "none" both mean ingest is disabled.
func ingestEnabled(kind string) bool {
	switch strings.ToLower(strings.TrimSpace(kind)) {
	case "", "none":
		return false
	default:
		return true
	}
}
-
- func applyLegacyAudioFlags(cfg cfgpkg.Config, audioStdin bool, audioRate int, audioHTTP bool) cfgpkg.Config {
- if audioRate > 0 {
- cfg.Ingest.Stdin.SampleRateHz = audioRate
- cfg.Ingest.HTTPRaw.SampleRateHz = audioRate
- }
- if audioStdin && audioHTTP {
- log.Printf("audio: both --audio-stdin and --audio-http set; using ingest kind=stdin")
- }
- if audioStdin {
- cfg.Ingest.Kind = "stdin"
- }
- if audioHTTP && !audioStdin {
- cfg.Ingest.Kind = "http-raw"
- }
- return cfg
- }
-
- type txBridge struct{ engine *apppkg.Engine }
-
- func (b *txBridge) StartTX() error { return b.engine.Start(context.Background()) }
- func (b *txBridge) StopTX() error { return b.engine.Stop(context.Background()) }
- func (b *txBridge) TXStats() map[string]any {
- s := b.engine.Stats()
- return map[string]any{
- "runtimeStateDurationSeconds": s.RuntimeStateDurationSeconds,
- "state": s.State,
- "chunksProduced": s.ChunksProduced,
- "totalSamples": s.TotalSamples,
- "underruns": s.Underruns,
- "lateBuffers": s.LateBuffers,
- "lastError": s.LastError,
- "uptimeSeconds": s.UptimeSeconds,
- "maxCycleMs": s.MaxCycleMs,
- "maxGenerateMs": s.MaxGenerateMs,
- "maxUpsampleMs": s.MaxUpsampleMs,
- "maxWriteMs": s.MaxWriteMs,
- "queue": s.Queue,
- "runtimeIndicator": s.RuntimeIndicator,
- "runtimeAlert": s.RuntimeAlert,
- "appliedFrequencyMHz": s.AppliedFrequencyMHz,
- "activePS": s.ActivePS,
- "activeRadioText": s.ActiveRadioText,
- "degradedTransitions": s.DegradedTransitions,
- "mutedTransitions": s.MutedTransitions,
- "faultedTransitions": s.FaultedTransitions,
- "faultCount": s.FaultCount,
- "faultHistory": s.FaultHistory,
- "transitionHistory": s.TransitionHistory,
- "lastFault": s.LastFault,
- }
- }
- func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error {
- return b.engine.UpdateConfig(apppkg.LiveConfigUpdate{
- FrequencyMHz: lp.FrequencyMHz,
- OutputDrive: lp.OutputDrive,
- StereoEnabled: lp.StereoEnabled,
- PilotLevel: lp.PilotLevel,
- RDSInjection: lp.RDSInjection,
- RDSEnabled: lp.RDSEnabled,
- LimiterEnabled: lp.LimiterEnabled,
- LimiterCeiling: lp.LimiterCeiling,
- PS: lp.PS,
- RadioText: lp.RadioText,
- ToneLeftHz: lp.ToneLeftHz,
- ToneRightHz: lp.ToneRightHz,
- ToneAmplitude: lp.ToneAmplitude,
- AudioGain: lp.AudioGain,
- CompositeClipperEnabled: lp.CompositeClipperEnabled,
- })
- }
-
- func (b *txBridge) ResetFault() error {
- return b.engine.ResetFault()
- }
|