From 4b9a48fe3f4598fd426836ee62a2589999d466d4 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sun, 22 Mar 2026 04:30:51 +0100 Subject: [PATCH] dsp: decouple refinement step --- cmd/sdrd/decision_budget.go | 8 +- cmd/sdrd/dsp_loop.go | 14 ++-- cmd/sdrd/http_handlers.go | 20 ++--- cmd/sdrd/phase_snapshot_test.go | 6 +- cmd/sdrd/phase_state.go | 9 +-- cmd/sdrd/phase_state_test.go | 20 ++--- cmd/sdrd/pipeline_runtime.go | 131 +++++++++++++++++--------------- 7 files changed, 109 insertions(+), 99 deletions(-) diff --git a/cmd/sdrd/decision_budget.go b/cmd/sdrd/decision_budget.go index dcaf4d1..cda24e6 100644 --- a/cmd/sdrd/decision_budget.go +++ b/cmd/sdrd/decision_budget.go @@ -93,8 +93,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i purgeExpired(dq.recordHold, now) purgeExpired(dq.decodeHold, now) - recSelected := selectQueued(dq.record, dq.recordHold, maxRecord, hold, now, policy) - decSelected := selectQueued(dq.decode, dq.decodeHold, maxDecode, hold, now, policy) + recSelected := selectQueued("record", dq.record, dq.recordHold, maxRecord, hold, now, policy) + decSelected := selectQueued("decode", dq.decode, dq.decodeHold, maxDecode, hold, now, policy) stats := decisionQueueStats{ RecordQueued: len(dq.record), @@ -127,7 +127,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i return stats } -func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy pipeline.Policy) map[int64]struct{} { +func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy pipeline.Policy) map[int64]struct{} { selected := map[int64]struct{}{} if len(queue) == 0 { return selected @@ -147,7 +147,7 @@ func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max if hint == "" { hint = qd.Class } - policyBoost := pipeline.CandidatePriorityBoost(policy, hint) + policyBoost := pipeline.DecisionPriorityBoost(policy, hint, qd.Class, queueName) scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost}) } sort.Slice(scoredList, func(i, j int) bool { diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index b0ac21e..af888cb 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -58,14 +58,13 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * rt.gotSamples = true } state.surveillance = rt.buildSurveillanceResult(art) - state.refinementInput = rt.buildRefinementInput(state.surveillance) + state.refinement = rt.runRefinement(art, state.surveillance, extractMgr, rec) finished := state.surveillance.Finished thresholds := state.surveillance.Thresholds noiseFloor := state.surveillance.NoiseFloor var displaySignals []detector.Signal - if len(art.iq) > 0 { - state.refinement = rt.refineSignals(art, state.refinementInput, extractMgr, rec) - displaySignals = state.refinement.Signals + if len(art.detailIQ) > 0 { + displaySignals = state.refinement.Result.Signals if rec != nil && len(displaySignals) > 0 && len(art.allIQ) > 0 { aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult} streamSnips, streamRates := extractForStreaming(extractMgr, art.allIQ, rt.cfg.SampleRate, rt.cfg.CenterHz, displaySignals, rt.streamPhaseState, rt.streamOverlap, aqCfg) @@ -89,7 +88,6 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } rt.maintenance(displaySignals, rec) } else { - state.refinement = pipeline.RefinementResult{} displaySignals = rt.det.StableSignals() } state.queueStats = rt.queueStats @@ -119,8 +117,8 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * rec.OnEvents(evCopy) } var debugInfo *SpectrumDebug - plan := state.refinementInput.Plan - windowStats := buildWindowStats(state.refinementInput.Windows) + plan := state.refinement.Input.Plan + windowStats := buildWindowStats(state.refinement.Input.Windows) hasPlan := plan.TotalCandidates > 0 || plan.Budget > 0 || plan.DroppedBySNR > 0 || plan.DroppedByBudget > 0 hasWindows := windowStats != nil && windowStats.Count > 0 if len(thresholds) > 0 || len(displaySignals) > 0 || noiseFloor != 0 || hasPlan || hasWindows { @@ -150,7 +148,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * debugInfo.Windows = windowStats } } - h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.spectrum, Signals: displaySignals, Debug: debugInfo}) + h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.surveillanceSpectrum, Signals: displaySignals, Debug: debugInfo}) } } } diff --git a/cmd/sdrd/http_handlers.go b/cmd/sdrd/http_handlers.go index b145cc8..6e645f9 100644 --- a/cmd/sdrd/http_handlers.go +++ b/cmd/sdrd/http_handlers.go @@ -154,22 +154,22 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime mux.HandleFunc("/api/refinement", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") snap := phaseSnap.Snapshot() - windowStats := buildWindowStats(snap.refinementInput.Windows) + windowStats := buildWindowStats(snap.refinement.Input.Windows) out := map[string]any{ - "plan": snap.refinementInput.Plan, - "windows": snap.refinementInput.Windows, + "plan": snap.refinement.Input.Plan, + "windows": snap.refinement.Input.Windows, "window_stats": windowStats, "queue_stats": snap.queueStats, - "candidates": len(snap.refinementInput.Candidates), - "scheduled": len(snap.refinementInput.Scheduled), - "signals": len(snap.refinement.Signals), - "decisions": len(snap.refinement.Decisions), - "decision_summary": summarizeDecisions(snap.refinement.Decisions), - "decision_items": compactDecisions(snap.refinement.Decisions), + "candidates": len(snap.refinement.Input.Candidates), + "scheduled": len(snap.refinement.Input.Scheduled), + "signals": len(snap.refinement.Result.Signals), + "decisions": len(snap.refinement.Result.Decisions), + "decision_summary": summarizeDecisions(snap.refinement.Result.Decisions), + "decision_items": compactDecisions(snap.refinement.Result.Decisions), "surveillance_level": snap.surveillance.Level, "surveillance_levels": snap.surveillance.Levels, "display_level": snap.surveillance.DisplayLevel, - "refinement_level": snap.refinementInput.Level, + "refinement_level": snap.refinement.Input.Level, "presentation_level": snap.presentation, } _ = json.NewEncoder(w).Encode(out) diff --git a/cmd/sdrd/phase_snapshot_test.go b/cmd/sdrd/phase_snapshot_test.go index 10a217d..62a5a39 100644 --- a/cmd/sdrd/phase_snapshot_test.go +++ b/cmd/sdrd/phase_snapshot_test.go @@ -6,13 +6,13 @@ func TestPhaseSnapshotSetGet(t *testing.T) { snap := &phaseSnapshot{} state := phaseState{} state.surveillance.NoiseFloor = -91 - state.refinementInput.SampleRate = 2048000 + state.refinement.Input.SampleRate = 2048000 snap.Set(state) got := snap.Snapshot() if got.surveillance.NoiseFloor != -91 { t.Fatalf("unexpected noise floor: %v", got.surveillance.NoiseFloor) } - if got.refinementInput.SampleRate != 2048000 { - t.Fatalf("unexpected sample rate: %v", got.refinementInput.SampleRate) + if got.refinement.Input.SampleRate != 2048000 { + t.Fatalf("unexpected sample rate: %v", got.refinement.Input.SampleRate) } } diff --git a/cmd/sdrd/phase_state.go b/cmd/sdrd/phase_state.go index c3bc58d..f30fdcb 100644 --- a/cmd/sdrd/phase_state.go +++ b/cmd/sdrd/phase_state.go @@ -3,9 +3,8 @@ package main import "sdr-wideband-suite/internal/pipeline" type phaseState struct { - surveillance pipeline.SurveillanceResult - refinementInput pipeline.RefinementInput - refinement pipeline.RefinementResult - queueStats decisionQueueStats - presentation pipeline.AnalysisLevel + surveillance pipeline.SurveillanceResult + refinement pipeline.RefinementStep + queueStats decisionQueueStats + presentation pipeline.AnalysisLevel } diff --git a/cmd/sdrd/phase_state_test.go b/cmd/sdrd/phase_state_test.go index ab29a04..0eeb7c8 100644 --- a/cmd/sdrd/phase_state_test.go +++ b/cmd/sdrd/phase_state_test.go @@ -8,19 +8,21 @@ import ( func TestPhaseStateCarriesPhaseResults(t *testing.T) { ps := &phaseState{ - surveillance: pipeline.SurveillanceResult{Level: pipeline.AnalysisLevel{Name: "surveillance"}, NoiseFloor: -90, Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}}, - refinementInput: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6}, - refinement: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}}, - queueStats: decisionQueueStats{RecordQueued: 1}, - presentation: pipeline.AnalysisLevel{Name: "presentation"}, + surveillance: pipeline.SurveillanceResult{Level: pipeline.AnalysisLevel{Name: "surveillance"}, NoiseFloor: -90, Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}}, + refinement: pipeline.RefinementStep{ + Input: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6}, + Result: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}}, + }, + queueStats: decisionQueueStats{RecordQueued: 1}, + presentation: pipeline.AnalysisLevel{Name: "presentation"}, } if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 { t.Fatalf("unexpected surveillance state: %+v", ps.surveillance) } - if len(ps.refinementInput.Scheduled) != 1 || ps.refinementInput.SampleRate != 2048000 { - t.Fatalf("unexpected refinement input: %+v", ps.refinementInput) + if len(ps.refinement.Input.Scheduled) != 1 || ps.refinement.Input.SampleRate != 2048000 { + t.Fatalf("unexpected refinement input: %+v", ps.refinement.Input) } - if len(ps.refinement.Decisions) != 1 || !ps.refinement.Decisions[0].ShouldRecord || len(ps.refinement.Candidates) != 1 { - t.Fatalf("unexpected refinement state: %+v", ps.refinement) + if len(ps.refinement.Result.Decisions) != 1 || !ps.refinement.Result.Decisions[0].ShouldRecord || len(ps.refinement.Result.Candidates) != 1 { + t.Fatalf("unexpected refinement state: %+v", ps.refinement.Result) } } diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index b1cc846..2d10d89 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -45,14 +45,16 @@ type dspRuntime struct { } type spectrumArtifacts struct { - allIQ []complex64 - iq []complex64 - spectrum []float64 - finished []detector.Event - detected []detector.Signal - thresholds []float64 - noiseFloor float64 - now time.Time + allIQ []complex64 + surveillanceIQ []complex64 + detailIQ []complex64 + surveillanceSpectrum []float64 + detailSpectrum []float64 + finished []detector.Event + detected []detector.Signal + thresholds []float64 + noiseFloor float64 + now time.Time } func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime { @@ -160,27 +162,27 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag if rec != nil { rec.Ingest(time.Now(), allIQ) } - iq := allIQ + survIQ := allIQ if len(allIQ) > rt.cfg.FFTSize { - iq = allIQ[len(allIQ)-rt.cfg.FFTSize:] + survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:] } if rt.dcEnabled { - dcBlocker.Apply(iq) + dcBlocker.Apply(survIQ) } if rt.iqEnabled { - dsp.IQBalance(iq) + dsp.IQBalance(survIQ) } var spectrum []float64 if rt.useGPU && rt.gpuEngine != nil { - gpuBuf := make([]complex64, len(iq)) - if len(rt.window) == len(iq) { - for i := 0; i < len(iq); i++ { - v := iq[i] + gpuBuf := make([]complex64, len(survIQ)) + if len(rt.window) == len(survIQ) { + for i := 0; i < len(survIQ); i++ { + v := survIQ[i] w := float32(rt.window[i]) gpuBuf[i] = complex(real(v)*w, imag(v)*w) } } else { - copy(gpuBuf, iq) + copy(gpuBuf, survIQ) } out, err := rt.gpuEngine.Exec(gpuBuf) if err != nil { @@ -193,7 +195,7 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag spectrum = fftutil.SpectrumFromFFT(out) } } else { - spectrum = fftutil.SpectrumWithPlan(iq, rt.window, rt.plan) + spectrum = fftutil.SpectrumWithPlan(survIQ, rt.window, rt.plan) } for i := range spectrum { if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) { @@ -203,14 +205,16 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag now := time.Now() finished, detected := rt.det.Process(now, spectrum, rt.cfg.CenterHz) return &spectrumArtifacts{ - allIQ: allIQ, - iq: iq, - spectrum: spectrum, - finished: finished, - detected: detected, - thresholds: rt.det.LastThresholds(), - noiseFloor: rt.det.LastNoiseFloor(), - now: now, + allIQ: allIQ, + surveillanceIQ: survIQ, + detailIQ: survIQ, + surveillanceSpectrum: spectrum, + detailSpectrum: spectrum, + finished: finished, + detected: detected, + thresholds: rt.det.LastThresholds(), + noiseFloor: rt.det.LastNoiseFloor(), + now: now, }, nil } @@ -226,7 +230,7 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S SampleRate: rt.cfg.SampleRate, FFTSize: rt.cfg.Surveillance.AnalysisFFTSize, CenterHz: rt.cfg.CenterHz, - SpanHz: float64(rt.cfg.SampleRate), + SpanHz: spanForPolicy(policy, float64(rt.cfg.SampleRate)), Source: "baseband", } lowRate := rt.cfg.SampleRate / 2 @@ -242,7 +246,7 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S SampleRate: lowRate, FFTSize: lowFFT, CenterHz: rt.cfg.CenterHz, - SpanHz: float64(lowRate), + SpanHz: spanForPolicy(policy, float64(lowRate)), Source: "downsampled", } displayLevel := pipeline.AnalysisLevel{ @@ -250,13 +254,10 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S SampleRate: rt.cfg.SampleRate, FFTSize: rt.cfg.Surveillance.DisplayBins, CenterHz: rt.cfg.CenterHz, - SpanHz: float64(rt.cfg.SampleRate), + SpanHz: spanForPolicy(policy, float64(rt.cfg.SampleRate)), Source: "display", } - levels := []pipeline.AnalysisLevel{level} - if lowLevel.SampleRate != level.SampleRate || lowLevel.FFTSize != level.FFTSize { - levels = append(levels, lowLevel) - } + levels := surveillanceLevels(policy, level, lowLevel) return pipeline.SurveillanceResult{ Level: level, Levels: levels, @@ -279,36 +280,14 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip } windows := make([]pipeline.RefinementWindow, 0, len(scheduled)) for _, sc := range scheduled { - span := sc.Candidate.BandwidthHz - windowSource := "candidate" - if policy.RefinementAutoSpan && (span <= 0 || span < 2000 || span > 400000) { - autoSpan, autoSource := pipeline.AutoSpanForHint(sc.Candidate.Hint) - if autoSpan > 0 { - span = autoSpan - windowSource = autoSource - } - } - if policy.RefinementMinSpanHz > 0 && span < policy.RefinementMinSpanHz { - span = policy.RefinementMinSpanHz - } - if policy.RefinementMaxSpanHz > 0 && span > policy.RefinementMaxSpanHz { - span = policy.RefinementMaxSpanHz - } - if span <= 0 { - span = 12000 - } - windows = append(windows, pipeline.RefinementWindow{ - CenterHz: sc.Candidate.CenterHz, - SpanHz: span, - Source: windowSource, - }) + windows = append(windows, pipeline.RefinementWindowForCandidate(policy, sc.Candidate)) } level := pipeline.AnalysisLevel{ Name: "refinement", SampleRate: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, CenterHz: rt.cfg.CenterHz, - SpanHz: float64(rt.cfg.SampleRate), + SpanHz: spanForPolicy(policy, float64(rt.cfg.SampleRate)), Source: "refinement-window", } input := pipeline.RefinementInput{ @@ -328,8 +307,14 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip return input } +func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep { + input := rt.buildRefinementInput(surv) + result := rt.refineSignals(art, input, extractMgr, rec) + return pipeline.RefinementStep{Input: input, Result: result} +} + func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.RefinementInput, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult { - if art == nil || len(art.iq) == 0 || len(input.Scheduled) == 0 { + if art == nil || len(art.detailIQ) == 0 || len(input.Scheduled) == 0 { return pipeline.RefinementResult{} } policy := pipeline.PolicyFromConfig(rt.cfg) @@ -360,8 +345,8 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin if centerHz == 0 { centerHz = rt.cfg.CenterHz } - snips, snipRates := extractSignalIQBatch(extractMgr, art.iq, sampleRate, centerHz, selectedSignals) - refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.spectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode)) + snips, snipRates := extractSignalIQBatch(extractMgr, art.detailIQ, sampleRate, centerHz, selectedSignals) + refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.detailSpectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode)) signals := make([]detector.Signal, 0, len(refined)) decisions := make([]pipeline.SignalDecision, 0, len(refined)) for i, ref := range refined { @@ -498,3 +483,29 @@ func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorde _ = aqCfg } } + +func spanForPolicy(policy pipeline.Policy, fallback float64) float64 { + if policy.MonitorSpanHz > 0 { + return policy.MonitorSpanHz + } + if policy.MonitorStartHz != 0 && policy.MonitorEndHz != 0 && policy.MonitorEndHz > policy.MonitorStartHz { + return policy.MonitorEndHz - policy.MonitorStartHz + } + return fallback +} + +func surveillanceLevels(policy pipeline.Policy, primary pipeline.AnalysisLevel, secondary pipeline.AnalysisLevel) []pipeline.AnalysisLevel { + levels := []pipeline.AnalysisLevel{primary} + strategy := strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy)) + switch strategy { + case "", "single-resolution": + if secondary.SampleRate != primary.SampleRate || secondary.FFTSize != primary.FFTSize { + levels = append(levels, secondary) + } + default: + if secondary.SampleRate != primary.SampleRate || secondary.FFTSize != primary.FFTSize { + levels = append(levels, secondary) + } + } + return levels +}