From 2c4710cc5582f683a49eb09da65e9327f617cc5f Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sat, 21 Mar 2026 16:29:35 +0100 Subject: [PATCH] feat: add policy-driven pipeline controls --- cmd/sdrd/http_handlers.go | 5 ++ internal/pipeline/policy.go | 93 ++++++++++++++++++++++++++++ internal/pipeline/policy_test.go | 42 +++++++++++++ internal/runtime/runtime.go | 102 ++++++++++++++++++++++++++++--- internal/runtime/runtime_test.go | 15 +++++ 5 files changed, 248 insertions(+), 9 deletions(-) create mode 100644 internal/pipeline/policy.go create mode 100644 internal/pipeline/policy_test.go diff --git a/cmd/sdrd/http_handlers.go b/cmd/sdrd/http_handlers.go index 7dc904e..19a1338 100644 --- a/cmd/sdrd/http_handlers.go +++ b/cmd/sdrd/http_handlers.go @@ -122,6 +122,11 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(gpuState.snapshot()) }) + mux.HandleFunc("/api/pipeline/policy", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + cfg := cfgManager.Snapshot() + _ = json.NewEncoder(w).Encode(pipeline.PolicyFromConfig(cfg)) + }) mux.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") limit := 200 diff --git a/internal/pipeline/policy.go b/internal/pipeline/policy.go new file mode 100644 index 0000000..db94c97 --- /dev/null +++ b/internal/pipeline/policy.go @@ -0,0 +1,93 @@ +package pipeline + +import "sdr-wideband-suite/internal/config" + +type Policy struct { + Mode string `json:"mode"` + SurveillanceFFTSize int `json:"surveillance_fft_size"` + SurveillanceFPS int `json:"surveillance_fps"` + RefinementEnabled bool `json:"refinement_enabled"` + MaxRefinementJobs int `json:"max_refinement_jobs"` + MinCandidateSNRDb float64 `json:"min_candidate_snr_db"` + PreferGPU bool `json:"prefer_gpu"` +} + +func PolicyFromConfig(cfg config.Config) Policy { + return Policy{ + Mode: cfg.Pipeline.Mode, + SurveillanceFFTSize: cfg.Surveillance.AnalysisFFTSize, + SurveillanceFPS: cfg.Surveillance.FrameRate, + RefinementEnabled: cfg.Refinement.Enabled, + MaxRefinementJobs: cfg.Resources.MaxRefinementJobs, + MinCandidateSNRDb: cfg.Refinement.MinCandidateSNRDb, + PreferGPU: cfg.Resources.PreferGPU, + } +} + +func ApplyNamedProfile(cfg *config.Config, name string) { + if cfg == nil || name == "" { + return + } + switch name { + case "legacy": + cfg.Pipeline.Mode = "legacy" + cfg.Surveillance.Strategy = "single-resolution" + cfg.Refinement.Enabled = true + if cfg.Resources.MaxRefinementJobs <= 0 { + cfg.Resources.MaxRefinementJobs = 8 + } + case "wideband-balanced": + cfg.Pipeline.Mode = "wideband-balanced" + cfg.Surveillance.Strategy = "single-resolution" + if cfg.Surveillance.AnalysisFFTSize < 4096 { + cfg.Surveillance.AnalysisFFTSize = 4096 + } + if cfg.FrameRate < 12 { + cfg.FrameRate = 12 + } + if cfg.Surveillance.FrameRate < 12 { + cfg.Surveillance.FrameRate = 12 + } + cfg.Refinement.Enabled = true + if cfg.Refinement.MaxConcurrent < 16 { + cfg.Refinement.MaxConcurrent = 16 + } + if cfg.Resources.MaxRefinementJobs < 16 { + cfg.Resources.MaxRefinementJobs = 16 + } + cfg.Resources.PreferGPU = true + case "wideband-aggressive": + cfg.Pipeline.Mode = "wideband-aggressive" + cfg.Surveillance.Strategy = "single-resolution" + if cfg.Surveillance.AnalysisFFTSize < 8192 { + cfg.Surveillance.AnalysisFFTSize = 8192 + } + if cfg.FrameRate < 10 { + cfg.FrameRate = 10 + } + if cfg.Surveillance.FrameRate < 10 { + cfg.Surveillance.FrameRate = 10 + } + cfg.Refinement.Enabled = true + if cfg.Refinement.MaxConcurrent < 32 { + cfg.Refinement.MaxConcurrent = 32 + } + if cfg.Resources.MaxRefinementJobs < 32 { + cfg.Resources.MaxRefinementJobs = 32 + } + cfg.Resources.PreferGPU = true + case "archive": + cfg.Pipeline.Mode = "archive" + cfg.Refinement.Enabled = true + if cfg.Refinement.MaxConcurrent < 12 { + cfg.Refinement.MaxConcurrent = 12 + } + if cfg.Resources.MaxRefinementJobs < 12 { + cfg.Resources.MaxRefinementJobs = 12 + } + if !cfg.Recorder.Enabled { + cfg.Recorder.Enabled = true + } + } + cfg.FFTSize = cfg.Surveillance.AnalysisFFTSize +} diff --git a/internal/pipeline/policy_test.go b/internal/pipeline/policy_test.go new file mode 100644 index 0000000..8c87271 --- /dev/null +++ b/internal/pipeline/policy_test.go @@ -0,0 +1,42 @@ +package pipeline + +import ( + "testing" + + "sdr-wideband-suite/internal/config" +) + +func TestApplyNamedProfile(t *testing.T) { + cfg := config.Default() + ApplyNamedProfile(&cfg, "wideband-balanced") + if cfg.Pipeline.Mode != "wideband-balanced" { + t.Fatalf("mode not applied: %s", cfg.Pipeline.Mode) + } + if cfg.Surveillance.AnalysisFFTSize < 4096 { + t.Fatalf("analysis fft too small: %d", cfg.Surveillance.AnalysisFFTSize) + } + if !cfg.Refinement.Enabled { + t.Fatalf("refinement should stay enabled") + } + if cfg.Resources.MaxRefinementJobs < 16 { + t.Fatalf("refinement jobs too small: %d", cfg.Resources.MaxRefinementJobs) + } +} + +func TestPolicyFromConfig(t *testing.T) { + cfg := config.Default() + cfg.Pipeline.Mode = "archive" + cfg.Surveillance.AnalysisFFTSize = 8192 + cfg.Surveillance.FrameRate = 9 + cfg.Refinement.Enabled = true + cfg.Resources.MaxRefinementJobs = 5 + cfg.Refinement.MinCandidateSNRDb = 2.5 + cfg.Resources.PreferGPU = true + p := PolicyFromConfig(cfg) + if p.Mode != "archive" || p.SurveillanceFFTSize != 8192 || p.SurveillanceFPS != 9 { + t.Fatalf("unexpected policy: %+v", p) + } + if !p.RefinementEnabled || p.MaxRefinementJobs != 5 || p.MinCandidateSNRDb != 2.5 || !p.PreferGPU { + t.Fatalf("unexpected policy details: %+v", p) + } +} diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index 3dc4d1a..8cb5f2e 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -9,16 +9,42 @@ import ( "sdr-wideband-suite/internal/config" ) +type PipelineUpdate struct { + Mode *string `json:"mode"` +} + +type SurveillanceUpdate struct { + AnalysisFFTSize *int `json:"analysis_fft_size"` + FrameRate *int `json:"frame_rate"` + Strategy *string `json:"strategy"` +} + +type RefinementUpdate struct { + Enabled *bool `json:"enabled"` + MaxConcurrent *int `json:"max_concurrent"` + MinCandidateSNRDb *float64 `json:"min_candidate_snr_db"` +} + +type ResourcesUpdate struct { + PreferGPU *bool `json:"prefer_gpu"` + MaxRefinementJobs *int `json:"max_refinement_jobs"` + MaxRecordingStreams *int `json:"max_recording_streams"` +} + type ConfigUpdate struct { - CenterHz *float64 `json:"center_hz"` - SampleRate *int `json:"sample_rate"` - FFTSize *int `json:"fft_size"` - GainDb *float64 `json:"gain_db"` - TunerBwKHz *int `json:"tuner_bw_khz"` - UseGPUFFT *bool `json:"use_gpu_fft"` - ClassifierMode *string `json:"classifier_mode"` - Detector *DetectorUpdate `json:"detector"` - Recorder *RecorderUpdate `json:"recorder"` + CenterHz *float64 `json:"center_hz"` + SampleRate *int `json:"sample_rate"` + FFTSize *int `json:"fft_size"` + GainDb *float64 `json:"gain_db"` + TunerBwKHz *int `json:"tuner_bw_khz"` + UseGPUFFT *bool `json:"use_gpu_fft"` + ClassifierMode *string `json:"classifier_mode"` + Pipeline *PipelineUpdate `json:"pipeline"` + Surveillance *SurveillanceUpdate `json:"surveillance"` + Refinement *RefinementUpdate `json:"refinement"` + Resources *ResourcesUpdate `json:"resources"` + Detector *DetectorUpdate `json:"detector"` + Recorder *RecorderUpdate `json:"recorder"` } type DetectorUpdate struct { @@ -134,6 +160,64 @@ func (m *Manager) ApplyConfig(update ConfigUpdate) (config.Config, error) { return m.cfg, errors.New("classifier_mode must be rule, math, or combined") } } + if update.Pipeline != nil && update.Pipeline.Mode != nil { + next.Pipeline.Mode = *update.Pipeline.Mode + } + if update.Surveillance != nil { + if update.Surveillance.AnalysisFFTSize != nil { + v := *update.Surveillance.AnalysisFFTSize + if v <= 0 { + return m.cfg, errors.New("surveillance.analysis_fft_size must be > 0") + } + if v&(v-1) != 0 { + return m.cfg, errors.New("surveillance.analysis_fft_size must be a power of 2") + } + next.Surveillance.AnalysisFFTSize = v + next.FFTSize = v + } + if update.Surveillance.FrameRate != nil { + v := *update.Surveillance.FrameRate + if v <= 0 { + return m.cfg, errors.New("surveillance.frame_rate must be > 0") + } + next.Surveillance.FrameRate = v + next.FrameRate = v + } + if update.Surveillance.Strategy != nil { + next.Surveillance.Strategy = *update.Surveillance.Strategy + } + } + if update.Refinement != nil { + if update.Refinement.Enabled != nil { + next.Refinement.Enabled = *update.Refinement.Enabled + } + if update.Refinement.MaxConcurrent != nil { + if *update.Refinement.MaxConcurrent <= 0 { + return m.cfg, errors.New("refinement.max_concurrent must be > 0") + } + next.Refinement.MaxConcurrent = *update.Refinement.MaxConcurrent + } + if update.Refinement.MinCandidateSNRDb != nil { + next.Refinement.MinCandidateSNRDb = *update.Refinement.MinCandidateSNRDb + } + } + if update.Resources != nil { + if update.Resources.PreferGPU != nil { + next.Resources.PreferGPU = *update.Resources.PreferGPU + } + if update.Resources.MaxRefinementJobs != nil { + if *update.Resources.MaxRefinementJobs <= 0 { + return m.cfg, errors.New("resources.max_refinement_jobs must be > 0") + } + next.Resources.MaxRefinementJobs = *update.Resources.MaxRefinementJobs + } + if update.Resources.MaxRecordingStreams != nil { + if *update.Resources.MaxRecordingStreams <= 0 { + return m.cfg, errors.New("resources.max_recording_streams must be > 0") + } + next.Resources.MaxRecordingStreams = *update.Resources.MaxRecordingStreams + } + } if update.Detector != nil { if update.Detector.ThresholdDb != nil { next.Detector.ThresholdDb = *update.Detector.ThresholdDb diff --git a/internal/runtime/runtime_test.go b/internal/runtime/runtime_test.go index 10a3213..2bc5171 100644 --- a/internal/runtime/runtime_test.go +++ b/internal/runtime/runtime_test.go @@ -22,11 +22,17 @@ func TestApplyConfigUpdate(t *testing.T) { cfarRank := 18 cfarScale := 5.5 + mode := "wideband-balanced" + survFPS := 12 + maxRefJobs := 24 updated, err := mgr.ApplyConfig(ConfigUpdate{ CenterHz: ¢er, SampleRate: &sampleRate, FFTSize: &fftSize, TunerBwKHz: &bw, + Pipeline: &PipelineUpdate{Mode: &mode}, + Surveillance: &SurveillanceUpdate{FrameRate: &survFPS}, + Resources: &ResourcesUpdate{MaxRefinementJobs: &maxRefJobs}, Detector: &DetectorUpdate{ ThresholdDb: &threshold, CFARMode: &cfarMode, @@ -76,6 +82,15 @@ func TestApplyConfigUpdate(t *testing.T) { if updated.TunerBwKHz != bw { t.Fatalf("tuner bw: %v", updated.TunerBwKHz) } + if updated.Pipeline.Mode != mode { + t.Fatalf("pipeline mode: %v", updated.Pipeline.Mode) + } + if updated.Surveillance.FrameRate != survFPS || updated.FrameRate != survFPS { + t.Fatalf("surveillance frame rate: %v / %v", updated.Surveillance.FrameRate, updated.FrameRate) + } + if updated.Resources.MaxRefinementJobs != maxRefJobs { + t.Fatalf("max refinement jobs: %v", updated.Resources.MaxRefinementJobs) + } } func TestApplyConfigRejectsInvalid(t *testing.T) {