diff --git a/cmd/sdrd/arbitration_snapshot.go b/cmd/sdrd/arbitration_snapshot.go new file mode 100644 index 0000000..0793e4e --- /dev/null +++ b/cmd/sdrd/arbitration_snapshot.go @@ -0,0 +1,13 @@ +package main + +import "sdr-wideband-suite/internal/pipeline" + +func buildArbitrationSnapshot(step pipeline.RefinementStep, queue decisionQueueStats) *ArbitrationSnapshot { + return &ArbitrationSnapshot{ + Budgets: &step.Input.Budgets, + RefinementPlan: &step.Input.Plan, + Queue: queue, + DecisionSummary: summarizeDecisions(step.Result.Decisions), + DecisionItems: compactDecisions(step.Result.Decisions), + } +} diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index aca74e2..f841f0f 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -159,9 +159,8 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * refinementDebug.Windows = windowStats } refinementDebug.Queue = state.queueStats - policy := pipeline.PolicyFromConfig(rt.cfg) - budget := pipeline.BudgetModelFromPolicy(policy) - refinementDebug.Budgets = &budget + refinementDebug.Budgets = &state.refinement.Input.Budgets + refinementDebug.Arbitration = buildArbitrationSnapshot(state.refinement, state.queueStats) debugInfo.Refinement = refinementDebug debugInfo.Decisions = &DecisionDebug{ Summary: summarizeDecisions(state.refinement.Result.Decisions), diff --git a/cmd/sdrd/http_handlers.go b/cmd/sdrd/http_handlers.go index 64e88e9..ed59e6c 100644 --- a/cmd/sdrd/http_handlers.go +++ b/cmd/sdrd/http_handlers.go @@ -152,6 +152,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime "auto_record_classes": policy.AutoRecordClasses, "auto_decode_classes": policy.AutoDecodeClasses, "refinement_jobs": policy.MaxRefinementJobs, + "refinement_detail_fft": policy.RefinementDetailFFTSize, "refinement_auto_span": policy.RefinementAutoSpan, "refinement_min_span_hz": policy.RefinementMinSpanHz, "refinement_max_span_hz": policy.RefinementMaxSpanHz, @@ -163,6 +164,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime w.Header().Set("Content-Type", "application/json") snap := phaseSnap.Snapshot() windowStats := buildWindowStats(snap.refinement.Input.Windows) + arbitration := buildArbitrationSnapshot(snap.refinement, snap.queueStats) out := map[string]any{ "plan": snap.refinement.Input.Plan, "windows": snap.refinement.Input.Windows, @@ -172,6 +174,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime "context": snap.refinement.Input.Context, "detail_level": snap.refinement.Input.Detail, "budgets": snap.refinement.Input.Budgets, + "arbitration": arbitration, "work_items": snap.refinement.Input.WorkItems, "candidates": len(snap.refinement.Input.Candidates), "scheduled": len(snap.refinement.Input.Scheduled), diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index 9b5628a..7053751 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -32,6 +32,9 @@ type dspRuntime struct { det *detector.Detector window []float64 plan *fftutil.CmplxPlan + detailWindow []float64 + detailPlan *fftutil.CmplxPlan + detailFFT int dcEnabled bool iqEnabled bool useGPU bool @@ -58,11 +61,18 @@ type spectrumArtifacts struct { } func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime { + detailFFT := cfg.Refinement.DetailFFTSize + if detailFFT <= 0 { + detailFFT = cfg.FFTSize + } rt := &dspRuntime{ cfg: cfg, det: det, window: window, plan: fftutil.NewCmplxPlan(cfg.FFTSize), + detailWindow: fftutil.Hann(detailFFT), + detailPlan: fftutil.NewCmplxPlan(detailFFT), + detailFFT: detailFFT, dcEnabled: cfg.DCBlock, iqEnabled: cfg.IQBalance, useGPU: cfg.UseGPUFFT, @@ -88,6 +98,7 @@ func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) { prevFFT := rt.cfg.FFTSize prevUseGPU := rt.useGPU + prevDetailFFT := rt.detailFFT rt.cfg = upd.cfg if rec != nil { rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{ @@ -116,6 +127,15 @@ func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *rec rt.window = upd.window rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize) } + detailFFT := rt.cfg.Refinement.DetailFFTSize + if detailFFT <= 0 { + detailFFT = rt.cfg.FFTSize + } + if detailFFT != prevDetailFFT { + rt.detailFFT = detailFFT + rt.detailWindow = fftutil.Hann(detailFFT) + rt.detailPlan = fftutil.NewCmplxPlan(detailFFT) + } rt.dcEnabled = upd.dcBlock rt.iqEnabled = upd.iqBalance if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU { @@ -147,15 +167,19 @@ func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *rec } func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 { + return rt.spectrumFromIQWithPlan(iq, rt.window, rt.plan, gpuState, true) +} + +func (rt *dspRuntime) spectrumFromIQWithPlan(iq []complex64, window []float64, plan *fftutil.CmplxPlan, gpuState *gpuStatus, allowGPU bool) []float64 { if len(iq) == 0 { return nil } - if rt.useGPU && rt.gpuEngine != nil { + if allowGPU && rt.useGPU && rt.gpuEngine != nil { gpuBuf := make([]complex64, len(iq)) - if len(rt.window) == len(iq) { + if len(window) == len(iq) { for i := 0; i < len(iq); i++ { v := iq[i] - w := float32(rt.window[i]) + w := float32(window[i]) gpuBuf[i] = complex(real(v)*w, imag(v)*w) } } else { @@ -167,20 +191,24 @@ func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []floa gpuState.set(false, err) } rt.useGPU = false - return fftutil.SpectrumWithPlan(gpuBuf, nil, rt.plan) + return fftutil.SpectrumWithPlan(gpuBuf, nil, plan) } return fftutil.SpectrumFromFFT(out) } - return fftutil.SpectrumWithPlan(iq, rt.window, rt.plan) + return fftutil.SpectrumWithPlan(iq, window, plan) } func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) { - available := rt.cfg.FFTSize + required := rt.cfg.FFTSize + if rt.detailFFT > required { + required = rt.detailFFT + } + available := required st := srcMgr.Stats() - if st.BufferSamples > rt.cfg.FFTSize { - available = (st.BufferSamples / rt.cfg.FFTSize) * rt.cfg.FFTSize - if available < rt.cfg.FFTSize { - available = rt.cfg.FFTSize + if st.BufferSamples > required { + available = (st.BufferSamples / required) * required + if available < required { + available = required } } allIQ, err := srcMgr.ReadIQ(available) @@ -194,11 +222,19 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag if len(allIQ) > rt.cfg.FFTSize { survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:] } + detailIQ := survIQ + if rt.detailFFT > 0 && len(allIQ) >= rt.detailFFT { + detailIQ = allIQ[len(allIQ)-rt.detailFFT:] + } if rt.dcEnabled { - dcBlocker.Apply(survIQ) + dcBlocker.Apply(allIQ) } if rt.iqEnabled { dsp.IQBalance(survIQ) + if !sameIQBuffer(detailIQ, survIQ) { + detailIQ = append([]complex64(nil), detailIQ...) + dsp.IQBalance(detailIQ) + } } survSpectrum := rt.spectrumFromIQ(survIQ, gpuState) for i := range survSpectrum { @@ -206,10 +242,9 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag survSpectrum[i] = -200 } } - detailIQ := survIQ detailSpectrum := survSpectrum if !sameIQBuffer(detailIQ, survIQ) { - detailSpectrum = rt.spectrumFromIQ(detailIQ, gpuState) + detailSpectrum = rt.spectrumFromIQWithPlan(detailIQ, rt.detailWindow, rt.detailPlan, gpuState, false) for i := range detailSpectrum { if math.IsNaN(detailSpectrum[i]) || math.IsInf(detailSpectrum[i], 0) { detailSpectrum[i] = -200 @@ -318,6 +353,10 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip workItems[idx].Window = window } } + detailFFT := rt.cfg.Refinement.DetailFFTSize + if detailFFT <= 0 { + detailFFT = rt.cfg.FFTSize + } levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate)) if _, maxSpan, ok := windowSpanBounds(windows); ok { levelSpan = maxSpan @@ -327,7 +366,7 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip Role: "refinement", Truth: "refinement", SampleRate: rt.cfg.SampleRate, - FFTSize: rt.cfg.FFTSize, + FFTSize: detailFFT, CenterHz: rt.cfg.CenterHz, SpanHz: levelSpan, Source: "refinement-window", @@ -337,11 +376,27 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip Role: "detail", Truth: "refinement", SampleRate: rt.cfg.SampleRate, - FFTSize: rt.cfg.FFTSize, + FFTSize: detailFFT, CenterHz: rt.cfg.CenterHz, SpanHz: levelSpan, Source: "detail-spectrum", } + if len(workItems) > 0 { + for i := range workItems { + item := &workItems[i] + if item.Window.SpanHz <= 0 { + continue + } + item.Execution = &pipeline.RefinementExecution{ + Stage: "refine", + SampleRate: rt.cfg.SampleRate, + FFTSize: detailFFT, + CenterHz: item.Window.CenterHz, + SpanHz: item.Window.SpanHz, + Source: detailLevel.Source, + } + } + } input := pipeline.RefinementInput{ Level: level, Detail: detailLevel, @@ -354,7 +409,7 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip Plan: plan, Windows: windows, SampleRate: rt.cfg.SampleRate, - FFTSize: rt.cfg.FFTSize, + FFTSize: detailFFT, CenterHz: rt.cfg.CenterHz, Source: "surveillance-detector", } diff --git a/cmd/sdrd/types.go b/cmd/sdrd/types.go index 57a02b0..0c0571c 100644 --- a/cmd/sdrd/types.go +++ b/cmd/sdrd/types.go @@ -32,12 +32,13 @@ type RefinementWindowStats struct { } type RefinementDebug struct { - Plan *pipeline.RefinementPlan `json:"plan,omitempty"` - Request *pipeline.RefinementRequest `json:"request,omitempty"` - WorkItems []pipeline.RefinementWorkItem `json:"work_items,omitempty"` - Windows *RefinementWindowStats `json:"windows,omitempty"` - Queue decisionQueueStats `json:"queue,omitempty"` - Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` + Plan *pipeline.RefinementPlan `json:"plan,omitempty"` + Request *pipeline.RefinementRequest `json:"request,omitempty"` + WorkItems []pipeline.RefinementWorkItem `json:"work_items,omitempty"` + Windows *RefinementWindowStats `json:"windows,omitempty"` + Queue decisionQueueStats `json:"queue,omitempty"` + Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` + Arbitration *ArbitrationSnapshot `json:"arbitration,omitempty"` } type DecisionDebug struct { @@ -45,6 +46,14 @@ type DecisionDebug struct { Items []compactDecision `json:"items,omitempty"` } +type ArbitrationSnapshot struct { + Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` + RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` + Queue decisionQueueStats `json:"queue,omitempty"` + DecisionSummary decisionSummary `json:"decision_summary,omitempty"` + DecisionItems []compactDecision `json:"decision_items,omitempty"` +} + type SpectrumFrame struct { Timestamp int64 `json:"ts"` CenterHz float64 `json:"center_hz"` diff --git a/internal/config/config.go b/internal/config/config.go index 96e8df0..7b720bb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -97,6 +97,7 @@ type SurveillanceConfig struct { type RefinementConfig struct { Enabled bool `yaml:"enabled" json:"enabled"` MaxConcurrent int `yaml:"max_concurrent" json:"max_concurrent"` + DetailFFTSize int `yaml:"detail_fft_size" json:"detail_fft_size"` MinCandidateSNRDb float64 `yaml:"min_candidate_snr_db" json:"min_candidate_snr_db"` MinSpanHz float64 `yaml:"min_span_hz" json:"min_span_hz"` MaxSpanHz float64 `yaml:"max_span_hz" json:"max_span_hz"` @@ -178,6 +179,7 @@ func Default() Config { Refinement: RefinementConfig{ Enabled: true, MaxConcurrent: 8, + DetailFFTSize: 0, MinCandidateSNRDb: 0, MinSpanHz: 0, MaxSpanHz: 0, @@ -205,6 +207,7 @@ func Default() Config { Refinement: &RefinementConfig{ Enabled: true, MaxConcurrent: 8, + DetailFFTSize: 0, MinCandidateSNRDb: 0, MinSpanHz: 0, MaxSpanHz: 0, @@ -235,6 +238,7 @@ func Default() Config { Refinement: &RefinementConfig{ Enabled: true, MaxConcurrent: 16, + DetailFFTSize: 0, MinCandidateSNRDb: 0, MinSpanHz: 4000, MaxSpanHz: 200000, @@ -265,6 +269,7 @@ func Default() Config { Refinement: &RefinementConfig{ Enabled: true, MaxConcurrent: 32, + DetailFFTSize: 0, MinCandidateSNRDb: 0, MinSpanHz: 6000, MaxSpanHz: 250000, @@ -295,6 +300,7 @@ func Default() Config { Refinement: &RefinementConfig{ Enabled: true, MaxConcurrent: 12, + DetailFFTSize: 0, MinCandidateSNRDb: 0, MinSpanHz: 4000, MaxSpanHz: 200000, @@ -325,6 +331,7 @@ func Default() Config { Refinement: &RefinementConfig{ Enabled: true, MaxConcurrent: 16, + DetailFFTSize: 0, MinCandidateSNRDb: 0, MinSpanHz: 3000, MaxSpanHz: 120000, @@ -503,6 +510,12 @@ func applyDefaults(cfg Config) Config { if cfg.Refinement.MaxConcurrent <= 0 { cfg.Refinement.MaxConcurrent = 8 } + if cfg.Refinement.DetailFFTSize <= 0 { + cfg.Refinement.DetailFFTSize = cfg.Surveillance.AnalysisFFTSize + } + if cfg.Refinement.DetailFFTSize&(cfg.Refinement.DetailFFTSize-1) != 0 { + cfg.Refinement.DetailFFTSize = cfg.Surveillance.AnalysisFFTSize + } if cfg.Refinement.MinSpanHz < 0 { cfg.Refinement.MinSpanHz = 0 } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index be87026..f60ce43 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -44,6 +44,9 @@ func TestLoadConfig(t *testing.T) { if cfg.Refinement.AutoSpan == nil || !*cfg.Refinement.AutoSpan { t.Fatalf("refinement auto_span default not applied") } + if cfg.Refinement.DetailFFTSize != cfg.Surveillance.AnalysisFFTSize { + t.Fatalf("refinement detail fft not aligned: %d vs %d", cfg.Refinement.DetailFFTSize, cfg.Surveillance.AnalysisFFTSize) + } if cfg.EventPath == "" { t.Fatalf("event path default not applied") } diff --git a/internal/pipeline/budget.go b/internal/pipeline/budget.go index 303d46e..af40772 100644 --- a/internal/pipeline/budget.go +++ b/internal/pipeline/budget.go @@ -20,10 +20,11 @@ type BudgetModel struct { func BudgetModelFromPolicy(policy Policy) BudgetModel { recordBias, decodeBias := budgetIntentBias(policy.Intent) + refBudget, refSource := refinementBudgetFromPolicy(policy) return BudgetModel{ Refinement: BudgetQueue{ - Max: policy.MaxRefinementJobs, - Source: "resources.max_refinement_jobs", + Max: refBudget, + Source: refSource, }, Record: BudgetQueue{ Max: policy.MaxRecordingStreams, @@ -42,6 +43,16 @@ func BudgetModelFromPolicy(policy Policy) BudgetModel { } } +func refinementBudgetFromPolicy(policy Policy) (int, string) { + budget := policy.MaxRefinementJobs + source := "resources.max_refinement_jobs" + if policy.RefinementMaxConcurrent > 0 && (budget <= 0 || policy.RefinementMaxConcurrent < budget) { + budget = policy.RefinementMaxConcurrent + source = "refinement.max_concurrent" + } + return budget, source +} + func budgetIntentBias(intent string) (float64, float64) { if intent == "" { return 0, 0 diff --git a/internal/pipeline/phases.go b/internal/pipeline/phases.go index abe83c8..bc705fb 100644 --- a/internal/pipeline/phases.go +++ b/internal/pipeline/phases.go @@ -38,6 +38,7 @@ type RefinementPlan struct { TotalCandidates int `json:"total_candidates"` MinCandidateSNRDb float64 `json:"min_candidate_snr_db"` Budget int `json:"budget"` + BudgetSource string `json:"budget_source,omitempty"` Strategy string `json:"strategy,omitempty"` StrategyReason string `json:"strategy_reason,omitempty"` MonitorStartHz float64 `json:"monitor_start_hz,omitempty"` diff --git a/internal/pipeline/policy.go b/internal/pipeline/policy.go index 84cb472..01565ed 100644 --- a/internal/pipeline/policy.go +++ b/internal/pipeline/policy.go @@ -22,6 +22,7 @@ type Policy struct { RefinementEnabled bool `json:"refinement_enabled"` MaxRefinementJobs int `json:"max_refinement_jobs"` RefinementMaxConcurrent int `json:"refinement_max_concurrent"` + RefinementDetailFFTSize int `json:"refinement_detail_fft_size"` MinCandidateSNRDb float64 `json:"min_candidate_snr_db"` RefinementMinSpanHz float64 `json:"refinement_min_span_hz"` RefinementMaxSpanHz float64 `json:"refinement_max_span_hz"` @@ -33,6 +34,10 @@ type Policy struct { } func PolicyFromConfig(cfg config.Config) Policy { + detailFFT := cfg.Refinement.DetailFFTSize + if detailFFT <= 0 { + detailFFT = cfg.Surveillance.AnalysisFFTSize + } p := Policy{ Mode: cfg.Pipeline.Mode, Profile: cfg.Pipeline.Profile, @@ -52,6 +57,7 @@ func PolicyFromConfig(cfg config.Config) Policy { RefinementEnabled: cfg.Refinement.Enabled, MaxRefinementJobs: cfg.Resources.MaxRefinementJobs, RefinementMaxConcurrent: cfg.Refinement.MaxConcurrent, + RefinementDetailFFTSize: detailFFT, MinCandidateSNRDb: cfg.Refinement.MinCandidateSNRDb, RefinementMinSpanHz: cfg.Refinement.MinSpanHz, RefinementMaxSpanHz: cfg.Refinement.MaxSpanHz, @@ -204,5 +210,8 @@ func ApplyNamedProfile(cfg *config.Config, name string) { if cfg.Resources.MaxDecodeJobs <= 0 { cfg.Resources.MaxDecodeJobs = cfg.Resources.MaxRecordingStreams } + if cfg.Refinement.DetailFFTSize <= 0 { + cfg.Refinement.DetailFFTSize = cfg.Surveillance.AnalysisFFTSize + } cfg.FFTSize = cfg.Surveillance.AnalysisFFTSize } diff --git a/internal/pipeline/policy_test.go b/internal/pipeline/policy_test.go index 2e3c066..432460c 100644 --- a/internal/pipeline/policy_test.go +++ b/internal/pipeline/policy_test.go @@ -30,6 +30,9 @@ func TestApplyNamedProfile(t *testing.T) { if cfg.Resources.MaxRefinementJobs < 16 { t.Fatalf("refinement jobs too small: %d", cfg.Resources.MaxRefinementJobs) } + if cfg.Refinement.DetailFFTSize != cfg.Surveillance.AnalysisFFTSize { + t.Fatalf("detail fft not aligned: %d vs %d", cfg.Refinement.DetailFFTSize, cfg.Surveillance.AnalysisFFTSize) + } } func TestPolicyFromConfig(t *testing.T) { @@ -45,6 +48,7 @@ func TestPolicyFromConfig(t *testing.T) { cfg.Surveillance.DisplayBins = 1200 cfg.Surveillance.DisplayFPS = 6 cfg.Refinement.Enabled = true + cfg.Refinement.DetailFFTSize = 4096 cfg.Resources.MaxRefinementJobs = 5 cfg.Refinement.MinCandidateSNRDb = 2.5 cfg.Resources.PreferGPU = true @@ -62,6 +66,9 @@ func TestPolicyFromConfig(t *testing.T) { if !p.RefinementEnabled || p.MaxRefinementJobs != 5 || p.MinCandidateSNRDb != 2.5 || !p.PreferGPU { t.Fatalf("unexpected policy details: %+v", p) } + if p.RefinementDetailFFTSize != 4096 { + t.Fatalf("unexpected refinement detail fft: %+v", p.RefinementDetailFFTSize) + } if p.MaxRecordingStreams != 7 { t.Fatalf("unexpected record budget: %+v", p.MaxRecordingStreams) } diff --git a/internal/pipeline/scheduler.go b/internal/pipeline/scheduler.go index 93f5ab6..3cf5e82 100644 --- a/internal/pipeline/scheduler.go +++ b/internal/pipeline/scheduler.go @@ -34,6 +34,7 @@ type RefinementScore struct { type RefinementWorkItem struct { Candidate Candidate `json:"candidate"` Window RefinementWindow `json:"window,omitempty"` + Execution *RefinementExecution `json:"execution,omitempty"` Priority float64 `json:"priority,omitempty"` Score *RefinementScore `json:"score,omitempty"` Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` @@ -41,6 +42,15 @@ type RefinementWorkItem struct { Reason string `json:"reason,omitempty"` } +type RefinementExecution struct { + Stage string `json:"stage,omitempty"` + SampleRate int `json:"sample_rate,omitempty"` + FFTSize int `json:"fft_size,omitempty"` + CenterHz float64 `json:"center_hz,omitempty"` + SpanHz float64 `json:"span_hz,omitempty"` + Source string `json:"source,omitempty"` +} + const ( RefinementStatusSelected = "selected" RefinementStatusDropped = "dropped" @@ -60,12 +70,14 @@ const ( // Current heuristic is intentionally simple and deterministic; later phases can add // richer scoring (novelty, persistence, profile-aware band priorities, decoder value). func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { - budget := refinementBudget(policy) strategy, strategyReason := refinementStrategy(policy) + budgetModel := BudgetModelFromPolicy(policy) + budget := budgetModel.Refinement.Max plan := RefinementPlan{ TotalCandidates: len(candidates), MinCandidateSNRDb: policy.MinCandidateSNRDb, Budget: budget, + BudgetSource: budgetModel.Refinement.Source, Strategy: strategy, StrategyReason: strategyReason, } @@ -85,6 +97,7 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { BandwidthWeight: bwWeight, PeakWeight: peakWeight, } + scoreModel = applyStrategyWeights(strategy, scoreModel) plan.ScoreModel = scoreModel scored := make([]ScheduledCandidate, 0, len(candidates)) workItems := make([]RefinementWorkItem, 0, len(candidates)) @@ -107,15 +120,15 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { }) continue } - snrScore := c.SNRDb * snrWeight + snrScore := c.SNRDb * scoreModel.SNRWeight bwScore := 0.0 peakScore := 0.0 policyBoost := CandidatePriorityBoost(policy, c.Hint) if c.BandwidthHz > 0 { - bwScore = minFloat64(c.BandwidthHz/25000.0, 6) * bwWeight + bwScore = minFloat64(c.BandwidthHz/25000.0, 6) * scoreModel.BandwidthWeight } if c.PeakDb > 0 { - peakScore = (c.PeakDb / 20.0) * peakWeight + peakScore = (c.PeakDb / 20.0) * scoreModel.PeakWeight } priority := snrScore + bwScore + peakScore + policyBoost score := &RefinementScore{ @@ -198,14 +211,6 @@ func ScheduleCandidates(candidates []Candidate, policy Policy) []ScheduledCandid return BuildRefinementPlan(candidates, policy).Selected } -func refinementBudget(policy Policy) int { - budget := policy.MaxRefinementJobs - if policy.RefinementMaxConcurrent > 0 && (budget <= 0 || policy.RefinementMaxConcurrent < budget) { - budget = policy.RefinementMaxConcurrent - } - return budget -} - func refinementStrategy(policy Policy) (string, string) { intent := strings.ToLower(strings.TrimSpace(policy.Intent)) switch { @@ -220,6 +225,28 @@ func refinementStrategy(policy Policy) (string, string) { } } +func applyStrategyWeights(strategy string, model RefinementScoreModel) RefinementScoreModel { + switch strings.ToLower(strings.TrimSpace(strategy)) { + case "digital-hunting": + model.SNRWeight *= 1.4 + model.BandwidthWeight *= 0.75 + model.PeakWeight *= 1.2 + case "archive-oriented": + model.SNRWeight *= 1.1 + model.BandwidthWeight *= 1.6 + model.PeakWeight *= 1.05 + case "multi-resolution", "multi", "multi-res", "multi_res": + model.SNRWeight *= 1.15 + model.BandwidthWeight *= 1.1 + model.PeakWeight *= 1.15 + case "single-resolution": + model.SNRWeight *= 1.1 + model.BandwidthWeight *= 1.0 + model.PeakWeight *= 1.0 + } + return model +} + func minFloat64(a, b float64) float64 { if a < b { return a diff --git a/internal/pipeline/scheduler_test.go b/internal/pipeline/scheduler_test.go index a37f748..fcfe36e 100644 --- a/internal/pipeline/scheduler_test.go +++ b/internal/pipeline/scheduler_test.go @@ -70,6 +70,9 @@ func TestBuildRefinementPlanRespectsMaxConcurrent(t *testing.T) { if plan.Budget != 2 { t.Fatalf("expected budget 2, got %d", plan.Budget) } + if plan.BudgetSource != "refinement.max_concurrent" { + t.Fatalf("expected budget source refinement.max_concurrent, got %s", plan.BudgetSource) + } if len(plan.Selected) != 2 { t.Fatalf("expected 2 selected, got %d", len(plan.Selected)) } @@ -159,6 +162,21 @@ func TestBuildRefinementPlanPriorityStats(t *testing.T) { } } +func TestBuildRefinementPlanStrategyBias(t *testing.T) { + policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0, Intent: "archive-and-triage"} + cands := []Candidate{ + {ID: 1, CenterHz: 100, SNRDb: 12, BandwidthHz: 5000, PeakDb: 1}, + {ID: 2, CenterHz: 200, SNRDb: 11, BandwidthHz: 100000, PeakDb: 1}, + } + plan := BuildRefinementPlan(cands, policy) + if len(plan.Selected) != 1 { + t.Fatalf("expected 1 selected, got %d", len(plan.Selected)) + } + if plan.Selected[0].Candidate.ID != 2 { + t.Fatalf("expected archive-oriented strategy to favor wider candidate, got %+v", plan.Selected[0]) + } +} + func findWorkItem(items []RefinementWorkItem, id int64) *RefinementWorkItem { for i := range items { if items[i].Candidate.ID == id { diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index 418f4d7..50001ce 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -32,6 +32,7 @@ type SurveillanceUpdate struct { type RefinementUpdate struct { Enabled *bool `json:"enabled"` MaxConcurrent *int `json:"max_concurrent"` + DetailFFTSize *int `json:"detail_fft_size"` MinCandidateSNRDb *float64 `json:"min_candidate_snr_db"` MinSpanHz *float64 `json:"min_span_hz"` MaxSpanHz *float64 `json:"max_span_hz"` @@ -261,6 +262,16 @@ func (m *Manager) ApplyConfig(update ConfigUpdate) (config.Config, error) { } next.Refinement.MaxConcurrent = *update.Refinement.MaxConcurrent } + if update.Refinement.DetailFFTSize != nil { + v := *update.Refinement.DetailFFTSize + if v <= 0 { + return m.cfg, errors.New("refinement.detail_fft_size must be > 0") + } + if v&(v-1) != 0 { + return m.cfg, errors.New("refinement.detail_fft_size must be a power of 2") + } + next.Refinement.DetailFFTSize = v + } if update.Refinement.MinCandidateSNRDb != nil { next.Refinement.MinCandidateSNRDb = *update.Refinement.MinCandidateSNRDb } diff --git a/internal/runtime/runtime_test.go b/internal/runtime/runtime_test.go index 4f78d78..aa8e6f9 100644 --- a/internal/runtime/runtime_test.go +++ b/internal/runtime/runtime_test.go @@ -36,6 +36,7 @@ func TestApplyConfigUpdate(t *testing.T) { minSpan := 4000.0 maxSpan := 200000.0 autoSpan := false + detailFFT := 1024 maxDecode := 12 decisionHold := 1500 updated, err := mgr.ApplyConfig(ConfigUpdate{ @@ -53,7 +54,7 @@ func TestApplyConfigUpdate(t *testing.T) { AutoDecodeClasses: &autoDecode, }, Surveillance: &SurveillanceUpdate{FrameRate: &survFPS, DisplayBins: &displayBins, DisplayFPS: &displayFPS}, - Refinement: &RefinementUpdate{MinSpanHz: &minSpan, MaxSpanHz: &maxSpan, AutoSpan: &autoSpan}, + Refinement: &RefinementUpdate{MinSpanHz: &minSpan, MaxSpanHz: &maxSpan, AutoSpan: &autoSpan, DetailFFTSize: &detailFFT}, Resources: &ResourcesUpdate{MaxRefinementJobs: &maxRefJobs, MaxDecodeJobs: &maxDecode, DecisionHoldMs: &decisionHold}, Detector: &DetectorUpdate{ ThresholdDb: &threshold, @@ -143,6 +144,9 @@ func TestApplyConfigUpdate(t *testing.T) { if updated.Refinement.MinSpanHz != minSpan || updated.Refinement.MaxSpanHz != maxSpan { t.Fatalf("refinement span not applied: %v / %v", updated.Refinement.MinSpanHz, updated.Refinement.MaxSpanHz) } + if updated.Refinement.DetailFFTSize != detailFFT { + t.Fatalf("refinement detail fft not applied: %v", updated.Refinement.DetailFFTSize) + } if updated.Refinement.AutoSpan == nil || *updated.Refinement.AutoSpan != autoSpan { t.Fatalf("refinement auto span not applied") } @@ -217,6 +221,17 @@ func TestApplyConfigRejectsInvalid(t *testing.T) { t.Fatalf("gap_tolerance_ms changed on error") } } + + { + mgr := New(cfg) + badDetail := 123 + if _, err := mgr.ApplyConfig(ConfigUpdate{Refinement: &RefinementUpdate{DetailFFTSize: &badDetail}}); err == nil { + t.Fatalf("expected refinement.detail_fft_size error") + } + if mgr.Snapshot().Refinement.DetailFFTSize != cfg.Refinement.DetailFFTSize { + t.Fatalf("detail fft changed on error") + } + } } func TestApplySettings(t *testing.T) {