diff --git a/README.md b/README.md index e4260de..59305f1 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,7 @@ Edit `config.yaml` (autosave goes to `config.autosave.yaml`). ### New phase-1 pipeline fields - `pipeline.mode` — operating mode label (`legacy`, `wideband-balanced`, ...) +- `pipeline.profile` — last applied operating profile name (if any) - `pipeline.goals.*` — declarative target/intent layer for future autonomous operation - `intent` - `monitor_start_hz` / `monitor_end_hz` / `monitor_span_hz` @@ -64,7 +65,7 @@ Edit `config.yaml` (autosave goes to `config.autosave.yaml`). - `auto_decode_classes` - `surveillance.analysis_fft_size` — analysis FFT size used by the surveillance layer - `surveillance.frame_rate` — surveillance cadence target -- `surveillance.strategy` — currently `single-resolution`, reserved for future multi-resolution modes +- `surveillance.strategy` — `single-resolution` or `multi-resolution` - `surveillance.display_bins` — preferred presentation density for clients/UI - `surveillance.display_fps` — preferred presentation cadence for clients/UI - `refinement.enabled` — enables explicit candidate refinement stage @@ -74,9 +75,11 @@ Edit `config.yaml` (autosave goes to `config.autosave.yaml`). - `refinement.auto_span` — use mod-type heuristics when candidate bandwidth is missing/odd - `resources.prefer_gpu` — GPU preference hint -**Profile defaults (wideband)** -- `wideband-balanced`: min_span_hz=4000, max_span_hz=200000 -- `wideband-aggressive`: min_span_hz=6000, max_span_hz=250000 +**Operating profiles (wideband)** +- `wideband-balanced`: multi-resolution, 4096 FFT, refinement span 4000-200000 Hz +- `wideband-aggressive`: multi-resolution, 8192 FFT, refinement span 6000-250000 Hz +- `archive`: record-forward bias, higher record/decode budgets +- `digital-hunting`: digital-first priorities and decode bias - `resources.max_refinement_jobs` — processing budget hint - `resources.max_recording_streams` — recording/streaming budget hint - `resources.max_decode_jobs` — decode budget hint @@ -155,7 +158,7 @@ go build -tags sdrplay ./cmd/sdrd - `GET /api/gpu` - `GET /api/pipeline/policy` - `GET /api/pipeline/recommendations` -- `GET /api/refinement` → latest refinement plan/windows snapshot (includes `window_stats`, `queue_stats`, `decision_summary`, `decision_items`, levels) +- `GET /api/refinement` → latest refinement plan/windows snapshot (includes `window_stats`, `queue_stats`, `decision_summary`, `decision_items`, levels, request/context/budgets/work_items) ### Signals / Events - `GET /api/signals` → current live signals diff --git a/cmd/sdrd/decision_budget.go b/cmd/sdrd/decision_budget.go index cda24e6..261faa7 100644 --- a/cmd/sdrd/decision_budget.go +++ b/cmd/sdrd/decision_budget.go @@ -16,6 +16,11 @@ type decisionQueueStats struct { DecodeActive int `json:"decode_active"` RecordOldestS float64 `json:"record_oldest_sec"` DecodeOldestS float64 `json:"decode_oldest_sec"` + RecordBudget int `json:"record_budget"` + DecodeBudget int `json:"decode_budget"` + HoldMs int `json:"hold_ms"` + RecordDropped int `json:"record_dropped"` + DecodeDropped int `json:"decode_dropped"` } type queuedDecision struct { @@ -43,10 +48,11 @@ func newDecisionQueues() *decisionQueues { } } -func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int, hold time.Duration, now time.Time, policy pipeline.Policy) decisionQueueStats { +func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipeline.BudgetModel, now time.Time, policy pipeline.Policy) decisionQueueStats { if dq == nil { return decisionQueueStats{} } + hold := time.Duration(budget.HoldMs) * time.Millisecond recSeen := map[int64]bool{} decSeen := map[int64]bool{} for i := range decisions { @@ -93,8 +99,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i purgeExpired(dq.recordHold, now) purgeExpired(dq.decodeHold, now) - recSelected := selectQueued("record", dq.record, dq.recordHold, maxRecord, hold, now, policy) - decSelected := selectQueued("decode", dq.decode, dq.decodeHold, maxDecode, hold, now, policy) + recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, hold, now, policy) + decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, hold, now, policy) stats := decisionQueueStats{ RecordQueued: len(dq.record), @@ -105,6 +111,9 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i DecodeActive: len(dq.decodeHold), RecordOldestS: oldestAge(dq.record, now), DecodeOldestS: oldestAge(dq.decode, now), + RecordBudget: budget.Record.Max, + DecodeBudget: budget.Decode.Max, + HoldMs: budget.HoldMs, } for i := range decisions { @@ -113,6 +122,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i if _, ok := recSelected[id]; !ok { decisions[i].ShouldRecord = false decisions[i].Reason = "queued: record budget" + stats.RecordDropped++ } } if decisions[i].ShouldAutoDecode { @@ -121,6 +131,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i if decisions[i].Reason == "" { decisions[i].Reason = "queued: decode budget" } + stats.DecodeDropped++ } } } diff --git a/cmd/sdrd/decision_budget_test.go b/cmd/sdrd/decision_budget_test.go index 5a674cc..5a2eeb1 100644 --- a/cmd/sdrd/decision_budget_test.go +++ b/cmd/sdrd/decision_budget_test.go @@ -14,7 +14,9 @@ func TestEnforceDecisionBudgets(t *testing.T) { {Candidate: pipeline.Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false}, } q := newDecisionQueues() - stats := q.Apply(decisions, 1, 1, 0, time.Now(), pipeline.Policy{SignalPriorities: []string{"digital"}}) + policy := pipeline.Policy{SignalPriorities: []string{"digital"}, MaxRecordingStreams: 1, MaxDecodeJobs: 1} + budget := pipeline.BudgetModelFromPolicy(policy) + stats := q.Apply(decisions, budget, time.Now(), policy) if stats.RecordSelected != 1 || stats.DecodeSelected != 1 { t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected) } diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index af888cb..aca74e2 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -93,6 +93,8 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * state.queueStats = rt.queueStats state.presentation = pipeline.AnalysisLevel{ Name: "presentation", + Role: "presentation", + Truth: "presentation", SampleRate: rt.cfg.SampleRate, FFTSize: rt.cfg.Surveillance.DisplayBins, CenterHz: rt.cfg.CenterHz, @@ -147,6 +149,24 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * if hasWindows { debugInfo.Windows = windowStats } + refinementDebug := &RefinementDebug{} + if hasPlan { + refinementDebug.Plan = &plan + refinementDebug.Request = &state.refinement.Input.Request + refinementDebug.WorkItems = state.refinement.Input.WorkItems + } + if hasWindows { + refinementDebug.Windows = windowStats + } + refinementDebug.Queue = state.queueStats + policy := pipeline.PolicyFromConfig(rt.cfg) + budget := pipeline.BudgetModelFromPolicy(policy) + refinementDebug.Budgets = &budget + debugInfo.Refinement = refinementDebug + debugInfo.Decisions = &DecisionDebug{ + Summary: summarizeDecisions(state.refinement.Result.Decisions), + Items: compactDecisions(state.refinement.Result.Decisions), + } } h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.surveillanceSpectrum, Signals: displaySignals, Debug: debugInfo}) } diff --git a/cmd/sdrd/http_handlers.go b/cmd/sdrd/http_handlers.go index 303ceed..64e88e9 100644 --- a/cmd/sdrd/http_handlers.go +++ b/cmd/sdrd/http_handlers.go @@ -137,9 +137,13 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime w.Header().Set("Content-Type", "application/json") cfg := cfgManager.Snapshot() policy := pipeline.PolicyFromConfig(cfg) + budget := pipeline.BudgetModelFromPolicy(policy) recommend := map[string]any{ + "profile": policy.Profile, "mode": policy.Mode, "intent": policy.Intent, + "surveillance_strategy": policy.SurveillanceStrategy, + "refinement_strategy": policy.RefinementStrategy, "monitor_center_hz": policy.MonitorCenterHz, "monitor_start_hz": policy.MonitorStartHz, "monitor_end_hz": policy.MonitorEndHz, @@ -151,6 +155,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime "refinement_auto_span": policy.RefinementAutoSpan, "refinement_min_span_hz": policy.RefinementMinSpanHz, "refinement_max_span_hz": policy.RefinementMaxSpanHz, + "budgets": budget, } _ = json.NewEncoder(w).Encode(recommend) }) @@ -163,6 +168,11 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime "windows": snap.refinement.Input.Windows, "window_stats": windowStats, "queue_stats": snap.queueStats, + "request": snap.refinement.Input.Request, + "context": snap.refinement.Input.Context, + "detail_level": snap.refinement.Input.Detail, + "budgets": snap.refinement.Input.Budgets, + "work_items": snap.refinement.Input.WorkItems, "candidates": len(snap.refinement.Input.Candidates), "scheduled": len(snap.refinement.Input.Scheduled), "signals": len(snap.refinement.Result.Signals), diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index 4a8c72b..9b5628a 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -241,6 +241,8 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S scheduled := pipeline.ScheduleCandidates(candidates, policy) level := pipeline.AnalysisLevel{ Name: "surveillance", + Role: "surveillance", + Truth: "surveillance", SampleRate: rt.cfg.SampleRate, FFTSize: rt.cfg.Surveillance.AnalysisFFTSize, CenterHz: rt.cfg.CenterHz, @@ -257,6 +259,8 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S } lowLevel := pipeline.AnalysisLevel{ Name: "surveillance-lowres", + Role: "surveillance-lowres", + Truth: "surveillance", SampleRate: lowRate, FFTSize: lowFFT, CenterHz: rt.cfg.CenterHz, @@ -265,17 +269,20 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S } displayLevel := pipeline.AnalysisLevel{ Name: "presentation", + Role: "presentation", + Truth: "presentation", SampleRate: rt.cfg.SampleRate, FFTSize: rt.cfg.Surveillance.DisplayBins, CenterHz: rt.cfg.CenterHz, SpanHz: spanForPolicy(policy, float64(rt.cfg.SampleRate)), Source: "display", } - levels := surveillanceLevels(policy, level, lowLevel) + levels, context := surveillanceLevels(policy, level, lowLevel, displayLevel) return pipeline.SurveillanceResult{ Level: level, Levels: levels, DisplayLevel: displayLevel, + Context: context, Candidates: candidates, Scheduled: scheduled, Finished: art.finished, @@ -292,9 +299,24 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip if len(scheduled) == 0 && len(plan.Selected) > 0 { scheduled = append([]pipeline.ScheduledCandidate(nil), plan.Selected...) } + workItems := make([]pipeline.RefinementWorkItem, 0, len(plan.WorkItems)) + if len(plan.WorkItems) > 0 { + workItems = append(workItems, plan.WorkItems...) + } + workIndex := map[int64]int{} + for i := range workItems { + if workItems[i].Candidate.ID == 0 { + continue + } + workIndex[workItems[i].Candidate.ID] = i + } windows := make([]pipeline.RefinementWindow, 0, len(scheduled)) for _, sc := range scheduled { - windows = append(windows, pipeline.RefinementWindowForCandidate(policy, sc.Candidate)) + window := pipeline.RefinementWindowForCandidate(policy, sc.Candidate) + windows = append(windows, window) + if idx, ok := workIndex[sc.Candidate.ID]; ok { + workItems[idx].Window = window + } } levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate)) if _, maxSpan, ok := windowSpanBounds(windows); ok { @@ -302,16 +324,33 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip } level := pipeline.AnalysisLevel{ Name: "refinement", + Role: "refinement", + Truth: "refinement", SampleRate: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, CenterHz: rt.cfg.CenterHz, SpanHz: levelSpan, Source: "refinement-window", } + detailLevel := pipeline.AnalysisLevel{ + Name: "detail", + Role: "detail", + Truth: "refinement", + SampleRate: rt.cfg.SampleRate, + FFTSize: rt.cfg.FFTSize, + CenterHz: rt.cfg.CenterHz, + SpanHz: levelSpan, + Source: "detail-spectrum", + } input := pipeline.RefinementInput{ Level: level, + Detail: detailLevel, + Context: surv.Context, + Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan}, + Budgets: pipeline.BudgetModelFromPolicy(policy), Candidates: append([]pipeline.Candidate(nil), surv.Candidates...), Scheduled: scheduled, + WorkItems: workItems, Plan: plan, Windows: windows, SampleRate: rt.cfg.SampleRate, @@ -319,8 +358,12 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip CenterHz: rt.cfg.CenterHz, Source: "surveillance-detector", } + input.Context.Refinement = level + input.Context.Detail = detailLevel if !policy.RefinementEnabled { input.Scheduled = nil + input.WorkItems = nil + input.Request.Reason = pipeline.RefinementReasonDisabled } return input } @@ -389,10 +432,8 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin } } } - maxRecord := rt.cfg.Resources.MaxRecordingStreams - maxDecode := rt.cfg.Resources.MaxDecodeJobs - hold := time.Duration(rt.cfg.Resources.DecisionHoldMs) * time.Millisecond - queueStats := rt.decisionQueues.Apply(decisions, maxRecord, maxDecode, hold, art.now, policy) + budget := pipeline.BudgetModelFromPolicy(policy) + queueStats := rt.decisionQueues.Apply(decisions, budget, art.now, policy) rt.queueStats = queueStats summary := summarizeDecisions(decisions) if rec != nil { @@ -531,16 +572,21 @@ func windowSpanBounds(windows []pipeline.RefinementWindow) (float64, float64, bo return minSpan, maxSpan, ok } -func surveillanceLevels(policy pipeline.Policy, primary pipeline.AnalysisLevel, secondary pipeline.AnalysisLevel) []pipeline.AnalysisLevel { +func surveillanceLevels(policy pipeline.Policy, primary pipeline.AnalysisLevel, secondary pipeline.AnalysisLevel, presentation pipeline.AnalysisLevel) ([]pipeline.AnalysisLevel, pipeline.AnalysisContext) { levels := []pipeline.AnalysisLevel{primary} + context := pipeline.AnalysisContext{ + Surveillance: primary, + Presentation: presentation, + } strategy := strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy)) switch strategy { case "multi-res", "multi-resolution", "multi", "multi_res": if secondary.SampleRate != primary.SampleRate || secondary.FFTSize != primary.FFTSize { levels = append(levels, secondary) + context.Derived = append(context.Derived, secondary) } } - return levels + return levels, context } func sameIQBuffer(a []complex64, b []complex64) bool { diff --git a/cmd/sdrd/pipeline_runtime_test.go b/cmd/sdrd/pipeline_runtime_test.go index 0067dbf..447262f 100644 --- a/cmd/sdrd/pipeline_runtime_test.go +++ b/cmd/sdrd/pipeline_runtime_test.go @@ -47,12 +47,13 @@ func TestSurveillanceLevelsRespectStrategy(t *testing.T) { policy := pipeline.Policy{SurveillanceStrategy: "single-resolution"} primary := pipeline.AnalysisLevel{Name: "primary", SampleRate: 2000000, FFTSize: 2048} secondary := pipeline.AnalysisLevel{Name: "secondary", SampleRate: 1000000, FFTSize: 1024} - levels := surveillanceLevels(policy, primary, secondary) + presentation := pipeline.AnalysisLevel{Name: "presentation", SampleRate: 2000000, FFTSize: 2048} + levels, _ := surveillanceLevels(policy, primary, secondary, presentation) if len(levels) != 1 { t.Fatalf("expected single level for single-resolution, got %d", len(levels)) } policy.SurveillanceStrategy = "multi-res" - levels = surveillanceLevels(policy, primary, secondary) + levels, _ = surveillanceLevels(policy, primary, secondary, presentation) if len(levels) != 2 { t.Fatalf("expected secondary level for multi-res, got %d", len(levels)) } diff --git a/cmd/sdrd/types.go b/cmd/sdrd/types.go index 8841abc..57a02b0 100644 --- a/cmd/sdrd/types.go +++ b/cmd/sdrd/types.go @@ -19,6 +19,8 @@ type SpectrumDebug struct { Scores []map[string]any `json:"scores,omitempty"` RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` Windows *RefinementWindowStats `json:"refinement_windows,omitempty"` + Refinement *RefinementDebug `json:"refinement,omitempty"` + Decisions *DecisionDebug `json:"decisions,omitempty"` } type RefinementWindowStats struct { @@ -29,6 +31,20 @@ type RefinementWindowStats struct { Sources map[string]int `json:"sources,omitempty"` } +type RefinementDebug struct { + Plan *pipeline.RefinementPlan `json:"plan,omitempty"` + Request *pipeline.RefinementRequest `json:"request,omitempty"` + WorkItems []pipeline.RefinementWorkItem `json:"work_items,omitempty"` + Windows *RefinementWindowStats `json:"windows,omitempty"` + Queue decisionQueueStats `json:"queue,omitempty"` + Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` +} + +type DecisionDebug struct { + Summary decisionSummary `json:"summary"` + Items []compactDecision `json:"items,omitempty"` +} + type SpectrumFrame struct { Timestamp int64 `json:"ts"` CenterHz float64 `json:"center_hz"` diff --git a/internal/config/config.go b/internal/config/config.go index 75e580c..96e8df0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -81,8 +81,9 @@ type PipelineGoalConfig struct { } type PipelineConfig struct { - Mode string `yaml:"mode" json:"mode"` - Goals PipelineGoalConfig `yaml:"goals" json:"goals"` + Mode string `yaml:"mode" json:"mode"` + Profile string `yaml:"profile,omitempty" json:"profile,omitempty"` + Goals PipelineGoalConfig `yaml:"goals" json:"goals"` } type SurveillanceConfig struct { @@ -190,10 +191,153 @@ func Default() Config { DecisionHoldMs: 2000, }, Profiles: []ProfileConfig{ - {Name: "legacy", Description: "Current single-band pipeline behavior", Pipeline: &PipelineConfig{Mode: "legacy", Goals: PipelineGoalConfig{Intent: "general-monitoring"}}}, - {Name: "wideband-balanced", Description: "Prepared baseline for scalable wideband surveillance", Pipeline: &PipelineConfig{Mode: "wideband-balanced", Goals: PipelineGoalConfig{Intent: "wideband-surveillance"}}}, - {Name: "wideband-aggressive", Description: "Higher surveillance/refinement budgets for future broad-span monitoring", Pipeline: &PipelineConfig{Mode: "wideband-aggressive", Goals: PipelineGoalConfig{Intent: "high-density-wideband-surveillance"}}}, - {Name: "archive", Description: "Record-first monitoring profile", Pipeline: &PipelineConfig{Mode: "archive", Goals: PipelineGoalConfig{Intent: "archive-and-triage"}}}, + { + Name: "legacy", + Description: "Current single-band pipeline behavior", + Pipeline: &PipelineConfig{Mode: "legacy", Profile: "legacy", Goals: PipelineGoalConfig{Intent: "general-monitoring"}}, + Surveillance: &SurveillanceConfig{ + AnalysisFFTSize: 2048, + FrameRate: 15, + Strategy: "single-resolution", + DisplayBins: 2048, + DisplayFPS: 15, + }, + Refinement: &RefinementConfig{ + Enabled: true, + MaxConcurrent: 8, + MinCandidateSNRDb: 0, + MinSpanHz: 0, + MaxSpanHz: 0, + AutoSpan: boolPtr(true), + }, + Resources: &ResourceConfig{ + PreferGPU: false, + MaxRefinementJobs: 8, + MaxRecordingStreams: 16, + MaxDecodeJobs: 16, + DecisionHoldMs: 2000, + }, + }, + { + Name: "wideband-balanced", + Description: "Baseline multi-resolution wideband surveillance", + Pipeline: &PipelineConfig{Mode: "wideband-balanced", Profile: "wideband-balanced", Goals: PipelineGoalConfig{ + Intent: "wideband-surveillance", + SignalPriorities: []string{"digital", "wfm"}, + }}, + Surveillance: &SurveillanceConfig{ + AnalysisFFTSize: 4096, + FrameRate: 12, + Strategy: "multi-resolution", + DisplayBins: 2048, + DisplayFPS: 12, + }, + Refinement: &RefinementConfig{ + Enabled: true, + MaxConcurrent: 16, + MinCandidateSNRDb: 0, + MinSpanHz: 4000, + MaxSpanHz: 200000, + AutoSpan: boolPtr(true), + }, + Resources: &ResourceConfig{ + PreferGPU: true, + MaxRefinementJobs: 16, + MaxRecordingStreams: 16, + MaxDecodeJobs: 12, + DecisionHoldMs: 2000, + }, + }, + { + Name: "wideband-aggressive", + Description: "Higher surveillance/refinement budgets for dense wideband monitoring", + Pipeline: &PipelineConfig{Mode: "wideband-aggressive", Profile: "wideband-aggressive", Goals: PipelineGoalConfig{ + Intent: "high-density-wideband-surveillance", + SignalPriorities: []string{"digital", "wfm", "trunk"}, + }}, + Surveillance: &SurveillanceConfig{ + AnalysisFFTSize: 8192, + FrameRate: 10, + Strategy: "multi-resolution", + DisplayBins: 4096, + DisplayFPS: 10, + }, + Refinement: &RefinementConfig{ + Enabled: true, + MaxConcurrent: 32, + MinCandidateSNRDb: 0, + MinSpanHz: 6000, + MaxSpanHz: 250000, + AutoSpan: boolPtr(true), + }, + Resources: &ResourceConfig{ + PreferGPU: true, + MaxRefinementJobs: 32, + MaxRecordingStreams: 24, + MaxDecodeJobs: 16, + DecisionHoldMs: 2000, + }, + }, + { + Name: "archive", + Description: "Record-first monitoring profile", + Pipeline: &PipelineConfig{Mode: "archive", Profile: "archive", Goals: PipelineGoalConfig{ + Intent: "archive-and-triage", + SignalPriorities: []string{"wfm", "nfm", "digital"}, + }}, + Surveillance: &SurveillanceConfig{ + AnalysisFFTSize: 4096, + FrameRate: 12, + Strategy: "single-resolution", + DisplayBins: 2048, + DisplayFPS: 12, + }, + Refinement: &RefinementConfig{ + Enabled: true, + MaxConcurrent: 12, + MinCandidateSNRDb: 0, + MinSpanHz: 4000, + MaxSpanHz: 200000, + AutoSpan: boolPtr(true), + }, + Resources: &ResourceConfig{ + PreferGPU: true, + MaxRefinementJobs: 12, + MaxRecordingStreams: 24, + MaxDecodeJobs: 12, + DecisionHoldMs: 2500, + }, + }, + { + Name: "digital-hunting", + Description: "Digital-first refinement and decode focus", + Pipeline: &PipelineConfig{Mode: "digital-hunting", Profile: "digital-hunting", Goals: PipelineGoalConfig{ + Intent: "digital-surveillance", + SignalPriorities: []string{"ft8", "wspr", "fsk", "psk", "dmr"}, + }}, + Surveillance: &SurveillanceConfig{ + AnalysisFFTSize: 4096, + FrameRate: 12, + Strategy: "multi-resolution", + DisplayBins: 2048, + DisplayFPS: 12, + }, + Refinement: &RefinementConfig{ + Enabled: true, + MaxConcurrent: 16, + MinCandidateSNRDb: 0, + MinSpanHz: 3000, + MaxSpanHz: 120000, + AutoSpan: boolPtr(true), + }, + Resources: &ResourceConfig{ + PreferGPU: true, + MaxRefinementJobs: 16, + MaxRecordingStreams: 12, + MaxDecodeJobs: 16, + DecisionHoldMs: 2000, + }, + }, }, Detector: DetectorConfig{ ThresholdDb: -20, diff --git a/internal/pipeline/budget.go b/internal/pipeline/budget.go new file mode 100644 index 0000000..303d46e --- /dev/null +++ b/internal/pipeline/budget.go @@ -0,0 +1,66 @@ +package pipeline + +import "strings" + +type BudgetQueue struct { + Max int `json:"max"` + IntentBias float64 `json:"intent_bias,omitempty"` + Source string `json:"source,omitempty"` +} + +type BudgetModel struct { + Refinement BudgetQueue `json:"refinement"` + Record BudgetQueue `json:"record"` + Decode BudgetQueue `json:"decode"` + HoldMs int `json:"hold_ms"` + Intent string `json:"intent,omitempty"` + Profile string `json:"profile,omitempty"` + Strategy string `json:"strategy,omitempty"` +} + +func BudgetModelFromPolicy(policy Policy) BudgetModel { + recordBias, decodeBias := budgetIntentBias(policy.Intent) + return BudgetModel{ + Refinement: BudgetQueue{ + Max: policy.MaxRefinementJobs, + Source: "resources.max_refinement_jobs", + }, + Record: BudgetQueue{ + Max: policy.MaxRecordingStreams, + IntentBias: recordBias, + Source: "resources.max_recording_streams", + }, + Decode: BudgetQueue{ + Max: policy.MaxDecodeJobs, + IntentBias: decodeBias, + Source: "resources.max_decode_jobs", + }, + HoldMs: policy.DecisionHoldMs, + Intent: policy.Intent, + Profile: policy.Profile, + Strategy: policy.RefinementStrategy, + } +} + +func budgetIntentBias(intent string) (float64, float64) { + if intent == "" { + return 0, 0 + } + recordBias := 0.0 + decodeBias := 0.0 + intent = strings.ToLower(intent) + if strings.Contains(intent, "archive") || strings.Contains(intent, "record") { + recordBias += 1.5 + } + if strings.Contains(intent, "triage") { + recordBias += 0.5 + decodeBias += 0.5 + } + if strings.Contains(intent, "decode") || strings.Contains(intent, "analysis") { + decodeBias += 1.0 + } + if strings.Contains(intent, "digital") { + decodeBias += 0.5 + } + return recordBias, decodeBias +} diff --git a/internal/pipeline/phases.go b/internal/pipeline/phases.go index 4118b24..abe83c8 100644 --- a/internal/pipeline/phases.go +++ b/internal/pipeline/phases.go @@ -4,6 +4,8 @@ import "sdr-wideband-suite/internal/detector" type AnalysisLevel struct { Name string `json:"name"` + Role string `json:"role,omitempty"` + Truth string `json:"truth,omitempty"` SampleRate int `json:"sample_rate"` FFTSize int `json:"fft_size"` CenterHz float64 `json:"center_hz"` @@ -11,6 +13,14 @@ type AnalysisLevel struct { Source string `json:"source,omitempty"` } +type AnalysisContext struct { + Surveillance AnalysisLevel `json:"surveillance,omitempty"` + Refinement AnalysisLevel `json:"refinement,omitempty"` + Presentation AnalysisLevel `json:"presentation,omitempty"` + Detail AnalysisLevel `json:"detail,omitempty"` + Derived []AnalysisLevel `json:"derived,omitempty"` +} + type SurveillanceResult struct { Level AnalysisLevel `json:"level"` Levels []AnalysisLevel `json:"levels,omitempty"` @@ -21,29 +31,45 @@ type SurveillanceResult struct { NoiseFloor float64 `json:"noise_floor"` Thresholds []float64 `json:"thresholds,omitempty"` DisplayLevel AnalysisLevel `json:"display_level"` + Context AnalysisContext `json:"context,omitempty"` } type RefinementPlan struct { TotalCandidates int `json:"total_candidates"` MinCandidateSNRDb float64 `json:"min_candidate_snr_db"` Budget int `json:"budget"` + Strategy string `json:"strategy,omitempty"` + StrategyReason string `json:"strategy_reason,omitempty"` MonitorStartHz float64 `json:"monitor_start_hz,omitempty"` MonitorEndHz float64 `json:"monitor_end_hz,omitempty"` MonitorSpanHz float64 `json:"monitor_span_hz,omitempty"` DroppedByMonitor int `json:"dropped_by_monitor"` DroppedBySNR int `json:"dropped_by_snr"` DroppedByBudget int `json:"dropped_by_budget"` + ScoreModel RefinementScoreModel `json:"score_model,omitempty"` PriorityMin float64 `json:"priority_min,omitempty"` PriorityMax float64 `json:"priority_max,omitempty"` PriorityAvg float64 `json:"priority_avg,omitempty"` PriorityCutoff float64 `json:"priority_cutoff,omitempty"` Selected []ScheduledCandidate `json:"selected,omitempty"` + WorkItems []RefinementWorkItem `json:"work_items,omitempty"` +} + +type RefinementRequest struct { + Strategy string `json:"strategy,omitempty"` + Reason string `json:"reason,omitempty"` + SpanHintHz float64 `json:"span_hint_hz,omitempty"` } type RefinementInput struct { Level AnalysisLevel `json:"level"` + Detail AnalysisLevel `json:"detail,omitempty"` + Context AnalysisContext `json:"context,omitempty"` + Request RefinementRequest `json:"request,omitempty"` + Budgets BudgetModel `json:"budgets,omitempty"` Candidates []Candidate `json:"candidates,omitempty"` Scheduled []ScheduledCandidate `json:"scheduled,omitempty"` + WorkItems []RefinementWorkItem `json:"work_items,omitempty"` Plan RefinementPlan `json:"plan,omitempty"` Windows []RefinementWindow `json:"windows,omitempty"` SampleRate int `json:"sample_rate"` diff --git a/internal/pipeline/policy.go b/internal/pipeline/policy.go index 2f94354..84cb472 100644 --- a/internal/pipeline/policy.go +++ b/internal/pipeline/policy.go @@ -4,6 +4,7 @@ import "sdr-wideband-suite/internal/config" type Policy struct { Mode string `json:"mode"` + Profile string `json:"profile,omitempty"` Intent string `json:"intent"` MonitorCenterHz float64 `json:"monitor_center_hz,omitempty"` MonitorStartHz float64 `json:"monitor_start_hz,omitempty"` @@ -17,6 +18,7 @@ type Policy struct { DisplayBins int `json:"display_bins"` DisplayFPS int `json:"display_fps"` SurveillanceStrategy string `json:"surveillance_strategy"` + RefinementStrategy string `json:"refinement_strategy,omitempty"` RefinementEnabled bool `json:"refinement_enabled"` MaxRefinementJobs int `json:"max_refinement_jobs"` RefinementMaxConcurrent int `json:"refinement_max_concurrent"` @@ -25,6 +27,7 @@ type Policy struct { RefinementMaxSpanHz float64 `json:"refinement_max_span_hz"` RefinementAutoSpan bool `json:"refinement_auto_span"` PreferGPU bool `json:"prefer_gpu"` + MaxRecordingStreams int `json:"max_recording_streams"` MaxDecodeJobs int `json:"max_decode_jobs"` DecisionHoldMs int `json:"decision_hold_ms"` } @@ -32,6 +35,7 @@ type Policy struct { func PolicyFromConfig(cfg config.Config) Policy { p := Policy{ Mode: cfg.Pipeline.Mode, + Profile: cfg.Pipeline.Profile, Intent: cfg.Pipeline.Goals.Intent, MonitorCenterHz: cfg.CenterHz, MonitorStartHz: cfg.Pipeline.Goals.MonitorStartHz, @@ -53,9 +57,11 @@ func PolicyFromConfig(cfg config.Config) Policy { RefinementMaxSpanHz: cfg.Refinement.MaxSpanHz, RefinementAutoSpan: config.BoolValue(cfg.Refinement.AutoSpan, true), PreferGPU: cfg.Resources.PreferGPU, + MaxRecordingStreams: cfg.Resources.MaxRecordingStreams, MaxDecodeJobs: cfg.Resources.MaxDecodeJobs, DecisionHoldMs: cfg.Resources.DecisionHoldMs, } + p.RefinementStrategy, _ = refinementStrategy(p) if p.MonitorSpanHz <= 0 && p.MonitorStartHz != 0 && p.MonitorEndHz != 0 && p.MonitorEndHz > p.MonitorStartHz { p.MonitorSpanHz = p.MonitorEndHz - p.MonitorStartHz } @@ -66,6 +72,10 @@ func ApplyNamedProfile(cfg *config.Config, name string) { if cfg == nil || name == "" { return } + cfg.Pipeline.Profile = name + if prof, ok := ResolveProfile(*cfg, name); ok { + MergeProfile(cfg, prof) + } switch name { case "legacy": cfg.Pipeline.Mode = "legacy" @@ -78,7 +88,7 @@ func ApplyNamedProfile(cfg *config.Config, name string) { case "wideband-balanced": cfg.Pipeline.Mode = "wideband-balanced" cfg.Pipeline.Goals.Intent = "wideband-surveillance" - cfg.Surveillance.Strategy = "single-resolution" + cfg.Surveillance.Strategy = "multi-resolution" if cfg.Surveillance.AnalysisFFTSize < 4096 { cfg.Surveillance.AnalysisFFTSize = 4096 } @@ -101,11 +111,14 @@ func ApplyNamedProfile(cfg *config.Config, name string) { if cfg.Refinement.MaxSpanHz <= 0 { cfg.Refinement.MaxSpanHz = 200000 } + if len(cfg.Pipeline.Goals.SignalPriorities) == 0 { + cfg.Pipeline.Goals.SignalPriorities = []string{"digital", "wfm"} + } cfg.Resources.PreferGPU = true case "wideband-aggressive": cfg.Pipeline.Mode = "wideband-aggressive" cfg.Pipeline.Goals.Intent = "high-density-wideband-surveillance" - cfg.Surveillance.Strategy = "single-resolution" + cfg.Surveillance.Strategy = "multi-resolution" if cfg.Surveillance.AnalysisFFTSize < 8192 { cfg.Surveillance.AnalysisFFTSize = 8192 } @@ -128,6 +141,9 @@ func ApplyNamedProfile(cfg *config.Config, name string) { if cfg.Refinement.MaxSpanHz <= 0 { cfg.Refinement.MaxSpanHz = 250000 } + if len(cfg.Pipeline.Goals.SignalPriorities) == 0 { + cfg.Pipeline.Goals.SignalPriorities = []string{"digital", "wfm", "trunk"} + } cfg.Resources.PreferGPU = true case "archive": cfg.Pipeline.Mode = "archive" @@ -139,9 +155,51 @@ func ApplyNamedProfile(cfg *config.Config, name string) { if cfg.Resources.MaxRefinementJobs < 12 { cfg.Resources.MaxRefinementJobs = 12 } + if cfg.Resources.MaxRecordingStreams < 24 { + cfg.Resources.MaxRecordingStreams = 24 + } + if cfg.Resources.MaxDecodeJobs < 12 { + cfg.Resources.MaxDecodeJobs = 12 + } + if len(cfg.Pipeline.Goals.SignalPriorities) == 0 { + cfg.Pipeline.Goals.SignalPriorities = []string{"wfm", "nfm", "digital"} + } if !cfg.Recorder.Enabled { cfg.Recorder.Enabled = true } + case "digital-hunting": + cfg.Pipeline.Mode = "digital-hunting" + cfg.Pipeline.Goals.Intent = "digital-surveillance" + cfg.Surveillance.Strategy = "multi-resolution" + if cfg.Surveillance.AnalysisFFTSize < 4096 { + cfg.Surveillance.AnalysisFFTSize = 4096 + } + if cfg.FrameRate < 12 { + cfg.FrameRate = 12 + } + if cfg.Surveillance.FrameRate < 12 { + cfg.Surveillance.FrameRate = 12 + } + cfg.Refinement.Enabled = true + if cfg.Refinement.MaxConcurrent < 16 { + cfg.Refinement.MaxConcurrent = 16 + } + if cfg.Resources.MaxRefinementJobs < 16 { + cfg.Resources.MaxRefinementJobs = 16 + } + if cfg.Refinement.MinSpanHz <= 0 { + cfg.Refinement.MinSpanHz = 3000 + } + if cfg.Refinement.MaxSpanHz <= 0 { + cfg.Refinement.MaxSpanHz = 120000 + } + if len(cfg.Pipeline.Goals.SignalPriorities) == 0 { + cfg.Pipeline.Goals.SignalPriorities = []string{"ft8", "wspr", "fsk", "psk", "dmr"} + } + cfg.Resources.PreferGPU = true + } + if cfg.Pipeline.Goals.MonitorSpanHz <= 0 && cfg.SampleRate > 0 { + cfg.Pipeline.Goals.MonitorSpanHz = float64(cfg.SampleRate) } if cfg.Resources.MaxDecodeJobs <= 0 { cfg.Resources.MaxDecodeJobs = cfg.Resources.MaxRecordingStreams diff --git a/internal/pipeline/policy_test.go b/internal/pipeline/policy_test.go index b2c186d..2e3c066 100644 --- a/internal/pipeline/policy_test.go +++ b/internal/pipeline/policy_test.go @@ -12,12 +12,18 @@ func TestApplyNamedProfile(t *testing.T) { if cfg.Pipeline.Mode != "wideband-balanced" { t.Fatalf("mode not applied: %s", cfg.Pipeline.Mode) } + if cfg.Pipeline.Profile != "wideband-balanced" { + t.Fatalf("profile not applied: %s", cfg.Pipeline.Profile) + } if cfg.Pipeline.Goals.Intent != "wideband-surveillance" { t.Fatalf("intent not applied: %s", cfg.Pipeline.Goals.Intent) } if cfg.Surveillance.AnalysisFFTSize < 4096 { t.Fatalf("analysis fft too small: %d", cfg.Surveillance.AnalysisFFTSize) } + if cfg.Surveillance.Strategy != "multi-resolution" { + t.Fatalf("strategy not applied: %s", cfg.Surveillance.Strategy) + } if !cfg.Refinement.Enabled { t.Fatalf("refinement should stay enabled") } @@ -42,6 +48,7 @@ func TestPolicyFromConfig(t *testing.T) { cfg.Resources.MaxRefinementJobs = 5 cfg.Refinement.MinCandidateSNRDb = 2.5 cfg.Resources.PreferGPU = true + cfg.Resources.MaxRecordingStreams = 7 p := PolicyFromConfig(cfg) if p.Mode != "archive" || p.Intent != "archive-and-triage" || p.SurveillanceFFTSize != 8192 || p.SurveillanceFPS != 9 || p.DisplayBins != 1200 || p.DisplayFPS != 6 { t.Fatalf("unexpected policy: %+v", p) @@ -55,4 +62,10 @@ func TestPolicyFromConfig(t *testing.T) { if !p.RefinementEnabled || p.MaxRefinementJobs != 5 || p.MinCandidateSNRDb != 2.5 || !p.PreferGPU { t.Fatalf("unexpected policy details: %+v", p) } + if p.MaxRecordingStreams != 7 { + t.Fatalf("unexpected record budget: %+v", p.MaxRecordingStreams) + } + if p.RefinementStrategy == "" { + t.Fatalf("expected refinement strategy to be set") + } } diff --git a/internal/pipeline/profile.go b/internal/pipeline/profile.go index e4e9ce0..0cf34c6 100644 --- a/internal/pipeline/profile.go +++ b/internal/pipeline/profile.go @@ -1,10 +1,15 @@ package pipeline -import "sdr-wideband-suite/internal/config" +import ( + "strings" + + "sdr-wideband-suite/internal/config" +) func ResolveProfile(cfg config.Config, name string) (config.ProfileConfig, bool) { + name = strings.ToLower(strings.TrimSpace(name)) for _, p := range cfg.Profiles { - if p.Name == name { + if strings.ToLower(strings.TrimSpace(p.Name)) == name { return p, true } } @@ -15,8 +20,14 @@ func MergeProfile(cfg *config.Config, profile config.ProfileConfig) { if cfg == nil { return } + if profile.Name != "" { + cfg.Pipeline.Profile = profile.Name + } if profile.Pipeline != nil { cfg.Pipeline = *profile.Pipeline + if profile.Name != "" { + cfg.Pipeline.Profile = profile.Name + } } if profile.Surveillance != nil { cfg.Surveillance = *profile.Surveillance diff --git a/internal/pipeline/profile_test.go b/internal/pipeline/profile_test.go index f121ad9..625c230 100644 --- a/internal/pipeline/profile_test.go +++ b/internal/pipeline/profile_test.go @@ -9,12 +9,12 @@ import ( func TestResolveAndMergeProfile(t *testing.T) { cfg := config.Default() cfg.Profiles = append(cfg.Profiles, config.ProfileConfig{ - Name: "custom-test", - Description: "test profile", - Pipeline: &config.PipelineConfig{Mode: "custom", Goals: config.PipelineGoalConfig{Intent: "custom-intent", MonitorSpanHz: 12.5e6}}, + Name: "custom-test", + Description: "test profile", + Pipeline: &config.PipelineConfig{Mode: "custom", Goals: config.PipelineGoalConfig{Intent: "custom-intent", MonitorSpanHz: 12.5e6}}, Surveillance: &config.SurveillanceConfig{AnalysisFFTSize: 16384, FrameRate: 8, Strategy: "single-resolution"}, - Refinement: &config.RefinementConfig{Enabled: true, MaxConcurrent: 20, MinCandidateSNRDb: 4}, - Resources: &config.ResourceConfig{PreferGPU: true, MaxRefinementJobs: 20, MaxRecordingStreams: 32}, + Refinement: &config.RefinementConfig{Enabled: true, MaxConcurrent: 20, MinCandidateSNRDb: 4}, + Resources: &config.ResourceConfig{PreferGPU: true, MaxRefinementJobs: 20, MaxRecordingStreams: 32}, }) p, ok := ResolveProfile(cfg, "custom-test") if !ok { @@ -24,6 +24,9 @@ func TestResolveAndMergeProfile(t *testing.T) { if cfg.Pipeline.Mode != "custom" || cfg.Pipeline.Goals.Intent != "custom-intent" { t.Fatalf("pipeline not merged: %+v", cfg.Pipeline) } + if cfg.Pipeline.Profile != "custom-test" { + t.Fatalf("profile not applied: %+v", cfg.Pipeline.Profile) + } if cfg.FFTSize != 16384 || cfg.FrameRate != 8 { t.Fatalf("surveillance not merged into legacy fields: fft=%d fps=%d", cfg.FFTSize, cfg.FrameRate) } diff --git a/internal/pipeline/scheduler.go b/internal/pipeline/scheduler.go index 273243f..93f5ab6 100644 --- a/internal/pipeline/scheduler.go +++ b/internal/pipeline/scheduler.go @@ -1,32 +1,73 @@ package pipeline -import "sort" +import ( + "sort" + "strings" +) type ScheduledCandidate struct { - Candidate Candidate `json:"candidate"` - Priority float64 `json:"priority"` - Breakdown *PriorityBreakdown `json:"breakdown,omitempty"` + Candidate Candidate `json:"candidate"` + Priority float64 `json:"priority"` + Score *RefinementScore `json:"score,omitempty"` + Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` } -type PriorityBreakdown struct { +type RefinementScoreModel struct { + SNRWeight float64 `json:"snr_weight"` + BandwidthWeight float64 `json:"bandwidth_weight"` + PeakWeight float64 `json:"peak_weight"` +} + +type RefinementScoreDetails struct { SNRScore float64 `json:"snr_score"` BandwidthScore float64 `json:"bandwidth_score"` PeakScore float64 `json:"peak_score"` PolicyBoost float64 `json:"policy_boost"` } +type RefinementScore struct { + Total float64 `json:"total"` + Breakdown RefinementScoreDetails `json:"breakdown"` + Weights *RefinementScoreModel `json:"weights,omitempty"` +} + +type RefinementWorkItem struct { + Candidate Candidate `json:"candidate"` + Window RefinementWindow `json:"window,omitempty"` + Priority float64 `json:"priority,omitempty"` + Score *RefinementScore `json:"score,omitempty"` + Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` + Status string `json:"status,omitempty"` + Reason string `json:"reason,omitempty"` +} + +const ( + RefinementStatusSelected = "selected" + RefinementStatusDropped = "dropped" + RefinementStatusDeferred = "deferred" +) + +const ( + RefinementReasonSelected = "selected" + RefinementReasonMonitorGate = "dropped:monitor" + RefinementReasonBelowSNR = "dropped:snr" + RefinementReasonBudget = "dropped:budget" + RefinementReasonDisabled = "dropped:disabled" + RefinementReasonUnclassified = "dropped:unclassified" +) + // BuildRefinementPlan scores and budgets candidates for costly local refinement. // Current heuristic is intentionally simple and deterministic; later phases can add // richer scoring (novelty, persistence, profile-aware band priorities, decoder value). func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { - budget := policy.MaxRefinementJobs - if policy.RefinementMaxConcurrent > 0 && (budget <= 0 || policy.RefinementMaxConcurrent < budget) { - budget = policy.RefinementMaxConcurrent - } + budget := refinementBudget(policy) + strategy, strategyReason := refinementStrategy(policy) plan := RefinementPlan{ TotalCandidates: len(candidates), MinCandidateSNRDb: policy.MinCandidateSNRDb, Budget: budget, + Strategy: strategy, + StrategyReason: strategyReason, } if start, end, ok := monitorBounds(policy); ok { plan.MonitorStartHz = start @@ -39,14 +80,31 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { return plan } snrWeight, bwWeight, peakWeight := refinementIntentWeights(policy.Intent) + scoreModel := RefinementScoreModel{ + SNRWeight: snrWeight, + BandwidthWeight: bwWeight, + PeakWeight: peakWeight, + } + plan.ScoreModel = scoreModel scored := make([]ScheduledCandidate, 0, len(candidates)) + workItems := make([]RefinementWorkItem, 0, len(candidates)) for _, c := range candidates { if !candidateInMonitor(policy, c) { plan.DroppedByMonitor++ + workItems = append(workItems, RefinementWorkItem{ + Candidate: c, + Status: RefinementStatusDropped, + Reason: RefinementReasonMonitorGate, + }) continue } if c.SNRDb < policy.MinCandidateSNRDb { plan.DroppedBySNR++ + workItems = append(workItems, RefinementWorkItem{ + Candidate: c, + Status: RefinementStatusDropped, + Reason: RefinementReasonBelowSNR, + }) continue } snrScore := c.SNRDb * snrWeight @@ -60,15 +118,28 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { peakScore = (c.PeakDb / 20.0) * peakWeight } priority := snrScore + bwScore + peakScore + policyBoost - scored = append(scored, ScheduledCandidate{ - Candidate: c, - Priority: priority, - Breakdown: &PriorityBreakdown{ + score := &RefinementScore{ + Total: priority, + Breakdown: RefinementScoreDetails{ SNRScore: snrScore, BandwidthScore: bwScore, PeakScore: peakScore, PolicyBoost: policyBoost, }, + Weights: &scoreModel, + } + scored = append(scored, ScheduledCandidate{ + Candidate: c, + Priority: priority, + Score: score, + Breakdown: &score.Breakdown, + }) + workItems = append(workItems, RefinementWorkItem{ + Candidate: c, + Priority: priority, + Score: score, + Breakdown: &score.Breakdown, + Status: RefinementStatusDeferred, }) } sort.Slice(scored, func(i, j int) bool { @@ -103,6 +174,23 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { plan.PriorityCutoff = plan.Selected[len(plan.Selected)-1].Priority } plan.DroppedByBudget = len(scored) - len(plan.Selected) + if len(plan.Selected) > 0 { + selected := map[int64]struct{}{} + for _, s := range plan.Selected { + selected[s.Candidate.ID] = struct{}{} + } + for i := range workItems { + item := &workItems[i] + if _, ok := selected[item.Candidate.ID]; ok { + item.Status = RefinementStatusSelected + item.Reason = RefinementReasonSelected + } else if item.Status == RefinementStatusDeferred { + item.Status = RefinementStatusDropped + item.Reason = RefinementReasonBudget + } + } + } + plan.WorkItems = workItems return plan } @@ -110,6 +198,28 @@ func ScheduleCandidates(candidates []Candidate, policy Policy) []ScheduledCandid return BuildRefinementPlan(candidates, policy).Selected } +func refinementBudget(policy Policy) int { + budget := policy.MaxRefinementJobs + if policy.RefinementMaxConcurrent > 0 && (budget <= 0 || policy.RefinementMaxConcurrent < budget) { + budget = policy.RefinementMaxConcurrent + } + return budget +} + +func refinementStrategy(policy Policy) (string, string) { + intent := strings.ToLower(strings.TrimSpace(policy.Intent)) + switch { + case strings.Contains(intent, "digital") || strings.Contains(intent, "hunt") || strings.Contains(intent, "decode"): + return "digital-hunting", "intent" + case strings.Contains(intent, "archive") || strings.Contains(intent, "triage") || strings.Contains(policy.Mode, "archive"): + return "archive-oriented", "intent" + case strings.Contains(strings.ToLower(policy.SurveillanceStrategy), "multi"): + return "multi-resolution", "surveillance-strategy" + default: + return "single-resolution", "default" + } +} + func minFloat64(a, b float64) float64 { if a < b { return a diff --git a/internal/pipeline/scheduler_test.go b/internal/pipeline/scheduler_test.go index 93eaa5d..a37f748 100644 --- a/internal/pipeline/scheduler_test.go +++ b/internal/pipeline/scheduler_test.go @@ -42,6 +42,21 @@ func TestBuildRefinementPlanTracksDrops(t *testing.T) { if len(plan.Selected) != 1 || plan.Selected[0].Candidate.ID != 2 { t.Fatalf("unexpected plan selection: %+v", plan.Selected) } + if len(plan.WorkItems) != len(cands) { + t.Fatalf("expected work items for all candidates, got %d", len(plan.WorkItems)) + } + item2 := findWorkItem(plan.WorkItems, 2) + if item2 == nil || item2.Status != RefinementStatusSelected || item2.Reason != RefinementReasonSelected { + t.Fatalf("expected candidate 2 selected with reason, got %+v", item2) + } + item1 := findWorkItem(plan.WorkItems, 1) + if item1 == nil || item1.Reason != RefinementReasonBelowSNR { + t.Fatalf("expected candidate 1 dropped by snr, got %+v", item1) + } + item3 := findWorkItem(plan.WorkItems, 3) + if item3 == nil || item3.Reason != RefinementReasonBudget { + t.Fatalf("expected candidate 3 dropped by budget, got %+v", item3) + } } func TestBuildRefinementPlanRespectsMaxConcurrent(t *testing.T) { @@ -139,4 +154,16 @@ func TestBuildRefinementPlanPriorityStats(t *testing.T) { if plan.Selected[0].Breakdown == nil { t.Fatalf("expected breakdown on selected candidate") } + if plan.Selected[0].Score == nil || plan.Selected[0].Score.Total == 0 { + t.Fatalf("expected score on selected candidate") + } +} + +func findWorkItem(items []RefinementWorkItem, id int64) *RefinementWorkItem { + for i := range items { + if items[i].Candidate.ID == id { + return &items[i] + } + } + return nil } diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index cdbe1fa..418f4d7 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -63,23 +63,23 @@ type ConfigUpdate struct { } type DetectorUpdate struct { - ThresholdDb *float64 `json:"threshold_db"` - MinDuration *int `json:"min_duration_ms"` - HoldMs *int `json:"hold_ms"` - EmaAlpha *float64 `json:"ema_alpha"` - HysteresisDb *float64 `json:"hysteresis_db"` - MinStableFrames *int `json:"min_stable_frames"` - GapToleranceMs *int `json:"gap_tolerance_ms"` - CFARMode *string `json:"cfar_mode"` - CFARGuardHz *float64 `json:"cfar_guard_hz"` - CFARTrainHz *float64 `json:"cfar_train_hz"` - CFARGuardCells *int `json:"cfar_guard_cells"` - CFARTrainCells *int `json:"cfar_train_cells"` - CFARRank *int `json:"cfar_rank"` - CFARScaleDb *float64 `json:"cfar_scale_db"` - CFARWrapAround *bool `json:"cfar_wrap_around"` - EdgeMarginDb *float64 `json:"edge_margin_db"` - MergeGapHz *float64 `json:"merge_gap_hz"` + ThresholdDb *float64 `json:"threshold_db"` + MinDuration *int `json:"min_duration_ms"` + HoldMs *int `json:"hold_ms"` + EmaAlpha *float64 `json:"ema_alpha"` + HysteresisDb *float64 `json:"hysteresis_db"` + MinStableFrames *int `json:"min_stable_frames"` + GapToleranceMs *int `json:"gap_tolerance_ms"` + CFARMode *string `json:"cfar_mode"` + CFARGuardHz *float64 `json:"cfar_guard_hz"` + CFARTrainHz *float64 `json:"cfar_train_hz"` + CFARGuardCells *int `json:"cfar_guard_cells"` + CFARTrainCells *int `json:"cfar_train_cells"` + CFARRank *int `json:"cfar_rank"` + CFARScaleDb *float64 `json:"cfar_scale_db"` + CFARWrapAround *bool `json:"cfar_wrap_around"` + EdgeMarginDb *float64 `json:"edge_margin_db"` + MergeGapHz *float64 `json:"merge_gap_hz"` ClassHistorySize *int `json:"class_history_size"` ClassSwitchRatio *float64 `json:"class_switch_ratio"` } @@ -179,6 +179,9 @@ func (m *Manager) ApplyConfig(update ConfigUpdate) (config.Config, error) { if update.Pipeline.Mode != nil { next.Pipeline.Mode = *update.Pipeline.Mode } + if update.Pipeline.Profile != nil { + next.Pipeline.Profile = *update.Pipeline.Profile + } if update.Pipeline.Intent != nil { next.Pipeline.Goals.Intent = *update.Pipeline.Intent } diff --git a/internal/runtime/runtime_test.go b/internal/runtime/runtime_test.go index 3381ad2..4f78d78 100644 --- a/internal/runtime/runtime_test.go +++ b/internal/runtime/runtime_test.go @@ -53,8 +53,8 @@ func TestApplyConfigUpdate(t *testing.T) { AutoDecodeClasses: &autoDecode, }, Surveillance: &SurveillanceUpdate{FrameRate: &survFPS, DisplayBins: &displayBins, DisplayFPS: &displayFPS}, - Refinement: &RefinementUpdate{MinSpanHz: &minSpan, MaxSpanHz: &maxSpan, AutoSpan: &autoSpan}, - Resources: &ResourcesUpdate{MaxRefinementJobs: &maxRefJobs, MaxDecodeJobs: &maxDecode, DecisionHoldMs: &decisionHold}, + Refinement: &RefinementUpdate{MinSpanHz: &minSpan, MaxSpanHz: &maxSpan, AutoSpan: &autoSpan}, + Resources: &ResourcesUpdate{MaxRefinementJobs: &maxRefJobs, MaxDecodeJobs: &maxDecode, DecisionHoldMs: &decisionHold}, Detector: &DetectorUpdate{ ThresholdDb: &threshold, CFARMode: &cfarMode, @@ -107,6 +107,9 @@ func TestApplyConfigUpdate(t *testing.T) { if updated.Pipeline.Mode != mode { t.Fatalf("pipeline mode: %v", updated.Pipeline.Mode) } + if updated.Pipeline.Profile != profile { + t.Fatalf("pipeline profile: %v", updated.Pipeline.Profile) + } if updated.Pipeline.Goals.Intent != intent { t.Fatalf("pipeline intent: %v", updated.Pipeline.Goals.Intent) }