From 8794d484cd9f95e3dad79c9cfd55737c8dfebd7c Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 08:21:58 +0200 Subject: [PATCH] ingest: use unified factory in main and harden ingest config validation --- cmd/fmrtx/main.go | 43 ++------------------------- internal/config/config.go | 32 +++++++++++++++----- internal/config/config_test.go | 53 ++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 48 deletions(-) diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index 5631bca..203b2e2 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -16,9 +16,7 @@ import ( ctrlpkg "github.com/jan/fm-rds-tx/internal/control" drypkg "github.com/jan/fm-rds-tx/internal/dryrun" "github.com/jan/fm-rds-tx/internal/ingest" - "github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw" - "github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast" - "github.com/jan/fm-rds-tx/internal/ingest/adapters/stdinpcm" + ingestfactory "github.com/jan/fm-rds-tx/internal/ingest/factory" "github.com/jan/fm-rds-tx/internal/platform" "github.com/jan/fm-rds-tx/internal/platform/plutosdr" "github.com/jan/fm-rds-tx/internal/platform/soapysdr" @@ -179,7 +177,7 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a var ingestRuntime *ingest.Runtime var ingress ctrlpkg.AudioIngress if cfg.Ingest.Kind != "" && cfg.Ingest.Kind != "none" { - rate := ingestSampleRate(cfg) + rate := ingestfactory.SampleRateForKind(cfg) bufferFrames := rate * 2 if bufferFrames <= 0 { bufferFrames = 1 @@ -187,7 +185,7 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a streamSrc = audio.NewStreamSource(bufferFrames, rate) engine.SetStreamSource(streamSrc) - source, sourceIngress, err := buildPhase1Source(cfg) + source, sourceIngress, err := ingestfactory.BuildSource(cfg, ingestfactory.Deps{Stdin: os.Stdin}) if err != nil { log.Fatalf("ingest source: %v", err) } @@ -260,41 +258,6 @@ func applyLegacyAudioFlags(cfg cfgpkg.Config, audioStdin bool, audioRate int, au return cfg } -func ingestSampleRate(cfg cfgpkg.Config) int { - switch cfg.Ingest.Kind { - case "stdin", "stdin-pcm": - return cfg.Ingest.Stdin.SampleRateHz - case "http-raw": - return cfg.Ingest.HTTPRaw.SampleRateHz - case "icecast": - return 44100 - default: - return 44100 - } -} - -func buildPhase1Source(cfg cfgpkg.Config) (ingest.Source, ctrlpkg.AudioIngress, error) { - switch cfg.Ingest.Kind { - case "stdin", "stdin-pcm": - src := stdinpcm.New("stdin-main", os.Stdin, cfg.Ingest.Stdin.SampleRateHz, cfg.Ingest.Stdin.Channels, 1024) - return src, nil, nil - case "http-raw": - src := httpraw.New("http-raw-main", cfg.Ingest.HTTPRaw.SampleRateHz, cfg.Ingest.HTTPRaw.Channels) - return src, src, nil - case "icecast": - src := icecast.New("icecast-main", cfg.Ingest.Icecast.URL, nil, icecast.ReconnectConfig{ - Enabled: cfg.Ingest.Reconnect.Enabled, - InitialBackoffMs: cfg.Ingest.Reconnect.InitialBackoffMs, - MaxBackoffMs: cfg.Ingest.Reconnect.MaxBackoffMs, - }) - return src, nil, nil - case "", "none": - return nil, nil, nil - default: - return nil, nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind) - } -} - type txBridge struct{ engine *apppkg.Engine } func (b *txBridge) StartTX() error { return b.engine.Start(context.Background()) } diff --git a/internal/config/config.go b/internal/config/config.go index 5ea3506..2dff082 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -137,7 +137,7 @@ func Default() Config { Format: "s16le", }, Icecast: IngestIcecastConfig{ - Decoder: "native", + Decoder: "auto", }, }, } @@ -207,9 +207,6 @@ func (c Config) Validate() error { if c.FM.LimiterCeiling < 0 || c.FM.LimiterCeiling > 2 { return fmt.Errorf("fm.limiterCeiling out of range") } - if c.FM.MpxGain == 0 { - c.FM.MpxGain = 1.0 - } // default if omitted from JSON if c.FM.MpxGain < 0.1 || c.FM.MpxGain > 5 { return fmt.Errorf("fm.mpxGain out of range (0.1..5)") } @@ -228,6 +225,11 @@ func (c Config) Validate() error { if c.Ingest.Kind == "" { c.Ingest.Kind = "none" } + switch strings.ToLower(strings.TrimSpace(c.Ingest.Kind)) { + case "none", "stdin", "stdin-pcm", "http-raw", "icecast": + default: + return fmt.Errorf("ingest.kind unsupported: %s", c.Ingest.Kind) + } if c.Ingest.PrebufferMs < 0 { return fmt.Errorf("ingest.prebufferMs must be >= 0") } @@ -237,18 +239,32 @@ func (c Config) Validate() error { if c.Ingest.Reconnect.InitialBackoffMs < 0 || c.Ingest.Reconnect.MaxBackoffMs < 0 { return fmt.Errorf("ingest.reconnect backoff must be >= 0") } + if c.Ingest.Reconnect.Enabled && c.Ingest.Reconnect.InitialBackoffMs <= 0 { + return fmt.Errorf("ingest.reconnect.initialBackoffMs must be > 0 when reconnect is enabled") + } + if c.Ingest.Reconnect.Enabled && c.Ingest.Reconnect.MaxBackoffMs <= 0 { + return fmt.Errorf("ingest.reconnect.maxBackoffMs must be > 0 when reconnect is enabled") + } if c.Ingest.Reconnect.MaxBackoffMs > 0 && c.Ingest.Reconnect.InitialBackoffMs > c.Ingest.Reconnect.MaxBackoffMs { return fmt.Errorf("ingest.reconnect.initialBackoffMs must be <= maxBackoffMs") } - if c.Ingest.Stdin.SampleRateHz < 0 || c.Ingest.HTTPRaw.SampleRateHz < 0 { - return fmt.Errorf("ingest pcm sampleRateHz must be >= 0") + if c.Ingest.Stdin.SampleRateHz <= 0 || c.Ingest.HTTPRaw.SampleRateHz <= 0 { + return fmt.Errorf("ingest pcm sampleRateHz must be > 0") } - if c.Ingest.Stdin.Channels < 0 || c.Ingest.HTTPRaw.Channels < 0 { - return fmt.Errorf("ingest pcm channels must be >= 0") + if (c.Ingest.Stdin.Channels != 1 && c.Ingest.Stdin.Channels != 2) || (c.Ingest.HTTPRaw.Channels != 1 && c.Ingest.HTTPRaw.Channels != 2) { + return fmt.Errorf("ingest pcm channels must be 1 or 2") + } + if strings.ToLower(strings.TrimSpace(c.Ingest.Stdin.Format)) != "s16le" || strings.ToLower(strings.TrimSpace(c.Ingest.HTTPRaw.Format)) != "s16le" { + return fmt.Errorf("ingest pcm format must be s16le") } if c.Ingest.Kind == "icecast" && strings.TrimSpace(c.Ingest.Icecast.URL) == "" { return fmt.Errorf("ingest.icecast.url is required when ingest.kind=icecast") } + switch strings.ToLower(strings.TrimSpace(c.Ingest.Icecast.Decoder)) { + case "", "auto", "native", "ffmpeg", "fallback": + default: + return fmt.Errorf("ingest.icecast.decoder unsupported: %s", c.Ingest.Icecast.Decoder) + } // Fail-loud PI validation if c.RDS.Enabled { if _, err := ParsePI(c.RDS.PI); err != nil { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 079d9c0..7236eff 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -123,3 +123,56 @@ func TestEffectiveDeviceRate(t *testing.T) { t.Fatal("expected 912000") } } + +func TestValidateRejectsUnsupportedIngestKind(t *testing.T) { + cfg := Default() + cfg.Ingest.Kind = "unsupported" + if err := cfg.Validate(); err == nil { + t.Fatal("expected error") + } +} + +func TestValidateRejectsUnsupportedIngestPCMShape(t *testing.T) { + cfg := Default() + cfg.Ingest.Stdin.SampleRateHz = 0 + if err := cfg.Validate(); err == nil { + t.Fatal("expected sampleRate error") + } + + cfg = Default() + cfg.Ingest.HTTPRaw.Channels = 6 + if err := cfg.Validate(); err == nil { + t.Fatal("expected channels error") + } + + cfg = Default() + cfg.Ingest.Stdin.Format = "f32le" + if err := cfg.Validate(); err == nil { + t.Fatal("expected format error") + } +} + +func TestValidateRejectsUnsupportedIcecastDecoder(t *testing.T) { + cfg := Default() + cfg.Ingest.Icecast.Decoder = "mystery" + if err := cfg.Validate(); err == nil { + t.Fatal("expected decoder error") + } +} + +func TestValidateRejectsReconnectWithMissingBackoff(t *testing.T) { + cfg := Default() + cfg.Ingest.Reconnect.Enabled = true + cfg.Ingest.Reconnect.InitialBackoffMs = 0 + if err := cfg.Validate(); err == nil { + t.Fatal("expected reconnect backoff error") + } +} + +func TestValidateRejectsZeroMpxGain(t *testing.T) { + cfg := Default() + cfg.FM.MpxGain = 0 + if err := cfg.Validate(); err == nil { + t.Fatal("expected mpxGain error") + } +}