浏览代码

feat: add policy-driven pipeline controls

master
Jan Svabenik 13 小时前
父节点
当前提交
2c4710cc55
共有 5 个文件被更改,包括 248 次插入9 次删除
  1. +5
    -0
      cmd/sdrd/http_handlers.go
  2. +93
    -0
      internal/pipeline/policy.go
  3. +42
    -0
      internal/pipeline/policy_test.go
  4. +93
    -9
      internal/runtime/runtime.go
  5. +15
    -0
      internal/runtime/runtime_test.go

+ 5
- 0
cmd/sdrd/http_handlers.go 查看文件

@@ -122,6 +122,11 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(gpuState.snapshot())
})
mux.HandleFunc("/api/pipeline/policy", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
cfg := cfgManager.Snapshot()
_ = json.NewEncoder(w).Encode(pipeline.PolicyFromConfig(cfg))
})
mux.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
limit := 200


+ 93
- 0
internal/pipeline/policy.go 查看文件

@@ -0,0 +1,93 @@
package pipeline

import "sdr-wideband-suite/internal/config"

type Policy struct {
Mode string `json:"mode"`
SurveillanceFFTSize int `json:"surveillance_fft_size"`
SurveillanceFPS int `json:"surveillance_fps"`
RefinementEnabled bool `json:"refinement_enabled"`
MaxRefinementJobs int `json:"max_refinement_jobs"`
MinCandidateSNRDb float64 `json:"min_candidate_snr_db"`
PreferGPU bool `json:"prefer_gpu"`
}

func PolicyFromConfig(cfg config.Config) Policy {
return Policy{
Mode: cfg.Pipeline.Mode,
SurveillanceFFTSize: cfg.Surveillance.AnalysisFFTSize,
SurveillanceFPS: cfg.Surveillance.FrameRate,
RefinementEnabled: cfg.Refinement.Enabled,
MaxRefinementJobs: cfg.Resources.MaxRefinementJobs,
MinCandidateSNRDb: cfg.Refinement.MinCandidateSNRDb,
PreferGPU: cfg.Resources.PreferGPU,
}
}

func ApplyNamedProfile(cfg *config.Config, name string) {
if cfg == nil || name == "" {
return
}
switch name {
case "legacy":
cfg.Pipeline.Mode = "legacy"
cfg.Surveillance.Strategy = "single-resolution"
cfg.Refinement.Enabled = true
if cfg.Resources.MaxRefinementJobs <= 0 {
cfg.Resources.MaxRefinementJobs = 8
}
case "wideband-balanced":
cfg.Pipeline.Mode = "wideband-balanced"
cfg.Surveillance.Strategy = "single-resolution"
if cfg.Surveillance.AnalysisFFTSize < 4096 {
cfg.Surveillance.AnalysisFFTSize = 4096
}
if cfg.FrameRate < 12 {
cfg.FrameRate = 12
}
if cfg.Surveillance.FrameRate < 12 {
cfg.Surveillance.FrameRate = 12
}
cfg.Refinement.Enabled = true
if cfg.Refinement.MaxConcurrent < 16 {
cfg.Refinement.MaxConcurrent = 16
}
if cfg.Resources.MaxRefinementJobs < 16 {
cfg.Resources.MaxRefinementJobs = 16
}
cfg.Resources.PreferGPU = true
case "wideband-aggressive":
cfg.Pipeline.Mode = "wideband-aggressive"
cfg.Surveillance.Strategy = "single-resolution"
if cfg.Surveillance.AnalysisFFTSize < 8192 {
cfg.Surveillance.AnalysisFFTSize = 8192
}
if cfg.FrameRate < 10 {
cfg.FrameRate = 10
}
if cfg.Surveillance.FrameRate < 10 {
cfg.Surveillance.FrameRate = 10
}
cfg.Refinement.Enabled = true
if cfg.Refinement.MaxConcurrent < 32 {
cfg.Refinement.MaxConcurrent = 32
}
if cfg.Resources.MaxRefinementJobs < 32 {
cfg.Resources.MaxRefinementJobs = 32
}
cfg.Resources.PreferGPU = true
case "archive":
cfg.Pipeline.Mode = "archive"
cfg.Refinement.Enabled = true
if cfg.Refinement.MaxConcurrent < 12 {
cfg.Refinement.MaxConcurrent = 12
}
if cfg.Resources.MaxRefinementJobs < 12 {
cfg.Resources.MaxRefinementJobs = 12
}
if !cfg.Recorder.Enabled {
cfg.Recorder.Enabled = true
}
}
cfg.FFTSize = cfg.Surveillance.AnalysisFFTSize
}

+ 42
- 0
internal/pipeline/policy_test.go 查看文件

@@ -0,0 +1,42 @@
package pipeline

import (
"testing"

"sdr-wideband-suite/internal/config"
)

func TestApplyNamedProfile(t *testing.T) {
cfg := config.Default()
ApplyNamedProfile(&cfg, "wideband-balanced")
if cfg.Pipeline.Mode != "wideband-balanced" {
t.Fatalf("mode not applied: %s", cfg.Pipeline.Mode)
}
if cfg.Surveillance.AnalysisFFTSize < 4096 {
t.Fatalf("analysis fft too small: %d", cfg.Surveillance.AnalysisFFTSize)
}
if !cfg.Refinement.Enabled {
t.Fatalf("refinement should stay enabled")
}
if cfg.Resources.MaxRefinementJobs < 16 {
t.Fatalf("refinement jobs too small: %d", cfg.Resources.MaxRefinementJobs)
}
}

func TestPolicyFromConfig(t *testing.T) {
cfg := config.Default()
cfg.Pipeline.Mode = "archive"
cfg.Surveillance.AnalysisFFTSize = 8192
cfg.Surveillance.FrameRate = 9
cfg.Refinement.Enabled = true
cfg.Resources.MaxRefinementJobs = 5
cfg.Refinement.MinCandidateSNRDb = 2.5
cfg.Resources.PreferGPU = true
p := PolicyFromConfig(cfg)
if p.Mode != "archive" || p.SurveillanceFFTSize != 8192 || p.SurveillanceFPS != 9 {
t.Fatalf("unexpected policy: %+v", p)
}
if !p.RefinementEnabled || p.MaxRefinementJobs != 5 || p.MinCandidateSNRDb != 2.5 || !p.PreferGPU {
t.Fatalf("unexpected policy details: %+v", p)
}
}

+ 93
- 9
internal/runtime/runtime.go 查看文件

@@ -9,16 +9,42 @@ import (
"sdr-wideband-suite/internal/config"
)

type PipelineUpdate struct {
Mode *string `json:"mode"`
}

type SurveillanceUpdate struct {
AnalysisFFTSize *int `json:"analysis_fft_size"`
FrameRate *int `json:"frame_rate"`
Strategy *string `json:"strategy"`
}

type RefinementUpdate struct {
Enabled *bool `json:"enabled"`
MaxConcurrent *int `json:"max_concurrent"`
MinCandidateSNRDb *float64 `json:"min_candidate_snr_db"`
}

type ResourcesUpdate struct {
PreferGPU *bool `json:"prefer_gpu"`
MaxRefinementJobs *int `json:"max_refinement_jobs"`
MaxRecordingStreams *int `json:"max_recording_streams"`
}

type ConfigUpdate struct {
CenterHz *float64 `json:"center_hz"`
SampleRate *int `json:"sample_rate"`
FFTSize *int `json:"fft_size"`
GainDb *float64 `json:"gain_db"`
TunerBwKHz *int `json:"tuner_bw_khz"`
UseGPUFFT *bool `json:"use_gpu_fft"`
ClassifierMode *string `json:"classifier_mode"`
Detector *DetectorUpdate `json:"detector"`
Recorder *RecorderUpdate `json:"recorder"`
CenterHz *float64 `json:"center_hz"`
SampleRate *int `json:"sample_rate"`
FFTSize *int `json:"fft_size"`
GainDb *float64 `json:"gain_db"`
TunerBwKHz *int `json:"tuner_bw_khz"`
UseGPUFFT *bool `json:"use_gpu_fft"`
ClassifierMode *string `json:"classifier_mode"`
Pipeline *PipelineUpdate `json:"pipeline"`
Surveillance *SurveillanceUpdate `json:"surveillance"`
Refinement *RefinementUpdate `json:"refinement"`
Resources *ResourcesUpdate `json:"resources"`
Detector *DetectorUpdate `json:"detector"`
Recorder *RecorderUpdate `json:"recorder"`
}

type DetectorUpdate struct {
@@ -134,6 +160,64 @@ func (m *Manager) ApplyConfig(update ConfigUpdate) (config.Config, error) {
return m.cfg, errors.New("classifier_mode must be rule, math, or combined")
}
}
if update.Pipeline != nil && update.Pipeline.Mode != nil {
next.Pipeline.Mode = *update.Pipeline.Mode
}
if update.Surveillance != nil {
if update.Surveillance.AnalysisFFTSize != nil {
v := *update.Surveillance.AnalysisFFTSize
if v <= 0 {
return m.cfg, errors.New("surveillance.analysis_fft_size must be > 0")
}
if v&(v-1) != 0 {
return m.cfg, errors.New("surveillance.analysis_fft_size must be a power of 2")
}
next.Surveillance.AnalysisFFTSize = v
next.FFTSize = v
}
if update.Surveillance.FrameRate != nil {
v := *update.Surveillance.FrameRate
if v <= 0 {
return m.cfg, errors.New("surveillance.frame_rate must be > 0")
}
next.Surveillance.FrameRate = v
next.FrameRate = v
}
if update.Surveillance.Strategy != nil {
next.Surveillance.Strategy = *update.Surveillance.Strategy
}
}
if update.Refinement != nil {
if update.Refinement.Enabled != nil {
next.Refinement.Enabled = *update.Refinement.Enabled
}
if update.Refinement.MaxConcurrent != nil {
if *update.Refinement.MaxConcurrent <= 0 {
return m.cfg, errors.New("refinement.max_concurrent must be > 0")
}
next.Refinement.MaxConcurrent = *update.Refinement.MaxConcurrent
}
if update.Refinement.MinCandidateSNRDb != nil {
next.Refinement.MinCandidateSNRDb = *update.Refinement.MinCandidateSNRDb
}
}
if update.Resources != nil {
if update.Resources.PreferGPU != nil {
next.Resources.PreferGPU = *update.Resources.PreferGPU
}
if update.Resources.MaxRefinementJobs != nil {
if *update.Resources.MaxRefinementJobs <= 0 {
return m.cfg, errors.New("resources.max_refinement_jobs must be > 0")
}
next.Resources.MaxRefinementJobs = *update.Resources.MaxRefinementJobs
}
if update.Resources.MaxRecordingStreams != nil {
if *update.Resources.MaxRecordingStreams <= 0 {
return m.cfg, errors.New("resources.max_recording_streams must be > 0")
}
next.Resources.MaxRecordingStreams = *update.Resources.MaxRecordingStreams
}
}
if update.Detector != nil {
if update.Detector.ThresholdDb != nil {
next.Detector.ThresholdDb = *update.Detector.ThresholdDb


+ 15
- 0
internal/runtime/runtime_test.go 查看文件

@@ -22,11 +22,17 @@ func TestApplyConfigUpdate(t *testing.T) {
cfarRank := 18
cfarScale := 5.5

mode := "wideband-balanced"
survFPS := 12
maxRefJobs := 24
updated, err := mgr.ApplyConfig(ConfigUpdate{
CenterHz: &center,
SampleRate: &sampleRate,
FFTSize: &fftSize,
TunerBwKHz: &bw,
Pipeline: &PipelineUpdate{Mode: &mode},
Surveillance: &SurveillanceUpdate{FrameRate: &survFPS},
Resources: &ResourcesUpdate{MaxRefinementJobs: &maxRefJobs},
Detector: &DetectorUpdate{
ThresholdDb: &threshold,
CFARMode: &cfarMode,
@@ -76,6 +82,15 @@ func TestApplyConfigUpdate(t *testing.T) {
if updated.TunerBwKHz != bw {
t.Fatalf("tuner bw: %v", updated.TunerBwKHz)
}
if updated.Pipeline.Mode != mode {
t.Fatalf("pipeline mode: %v", updated.Pipeline.Mode)
}
if updated.Surveillance.FrameRate != survFPS || updated.FrameRate != survFPS {
t.Fatalf("surveillance frame rate: %v / %v", updated.Surveillance.FrameRate, updated.FrameRate)
}
if updated.Resources.MaxRefinementJobs != maxRefJobs {
t.Fatalf("max refinement jobs: %v", updated.Resources.MaxRefinementJobs)
}
}

func TestApplyConfigRejectsInvalid(t *testing.T) {


正在加载...
取消
保存