From 45a91bcd623ce683bd4e385e9d2781017d1b9963 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sat, 21 Mar 2026 18:18:11 +0100 Subject: [PATCH] feat: make scheduling an explicit phase handoff --- cmd/sdrd/dsp_loop.go | 2 +- cmd/sdrd/phase_state_test.go | 8 ++++---- cmd/sdrd/pipeline_runtime.go | 12 +++++++----- internal/pipeline/phases.go | 16 +++++++++------- internal/pipeline/phases_test.go | 17 ++++++++++++++--- 5 files changed, 35 insertions(+), 20 deletions(-) diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index 6047902..b1ba8e1 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -62,7 +62,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * noiseFloor := state.surveillance.NoiseFloor var displaySignals []detector.Signal if len(art.iq) > 0 { - state.refinement = rt.refineSignals(art, extractMgr, rec) + state.refinement = rt.refineSignals(art, state.surveillance.Scheduled, extractMgr, rec) displaySignals = state.refinement.Signals if rec != nil && len(displaySignals) > 0 && len(art.allIQ) > 0 { aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult} diff --git a/cmd/sdrd/phase_state_test.go b/cmd/sdrd/phase_state_test.go index 9aadbf4..7843b3d 100644 --- a/cmd/sdrd/phase_state_test.go +++ b/cmd/sdrd/phase_state_test.go @@ -8,13 +8,13 @@ import ( func TestPhaseStateCarriesPhaseResults(t *testing.T) { ps := &phaseState{ - surveillance: pipeline.SurveillanceResult{NoiseFloor: -90}, - refinement: pipeline.RefinementResult{Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}}, + surveillance: pipeline.SurveillanceResult{NoiseFloor: -90, Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}}, + refinement: pipeline.RefinementResult{Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}}, } - if ps.surveillance.NoiseFloor != -90 { + if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 { t.Fatalf("unexpected surveillance state: %+v", ps.surveillance) } - if len(ps.refinement.Decisions) != 1 || !ps.refinement.Decisions[0].ShouldRecord { + if len(ps.refinement.Decisions) != 1 || !ps.refinement.Decisions[0].ShouldRecord || len(ps.refinement.Candidates) != 1 { t.Fatalf("unexpected refinement state: %+v", ps.refinement) } } diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index 83a2aa4..f30830f 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -216,8 +216,12 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S if art == nil { return pipeline.SurveillanceResult{} } + policy := pipeline.PolicyFromConfig(rt.cfg) + candidates := pipeline.CandidatesFromSignals(art.detected, "surveillance-detector") + scheduled := pipeline.ScheduleCandidates(candidates, policy) return pipeline.SurveillanceResult{ - Candidates: pipeline.CandidatesFromSignals(art.detected, "surveillance-detector"), + Candidates: candidates, + Scheduled: scheduled, Finished: art.finished, Signals: art.detected, NoiseFloor: art.noiseFloor, @@ -225,13 +229,11 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S } } -func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult { +func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, scheduled []pipeline.ScheduledCandidate, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult { if art == nil || len(art.iq) == 0 { return pipeline.RefinementResult{} } policy := pipeline.PolicyFromConfig(rt.cfg) - candidates := pipeline.CandidatesFromSignals(art.detected, "surveillance-detector") - scheduled := pipeline.ScheduleCandidates(candidates, policy) selectedCandidates := make([]pipeline.Candidate, 0, len(scheduled)) selectedSignals := make([]detector.Signal, 0, len(scheduled)) for _, sc := range scheduled { @@ -280,7 +282,7 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, extractMgr *extracti } } rt.det.UpdateClasses(signals) - return pipeline.RefinementResult{Signals: signals, Decisions: decisions} + return pipeline.RefinementResult{Signals: signals, Decisions: decisions, Candidates: selectedCandidates} } func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) { diff --git a/internal/pipeline/phases.go b/internal/pipeline/phases.go index 2cfdd16..2a044d2 100644 --- a/internal/pipeline/phases.go +++ b/internal/pipeline/phases.go @@ -3,14 +3,16 @@ package pipeline import "sdr-wideband-suite/internal/detector" type SurveillanceResult struct { - Candidates []Candidate `json:"candidates"` - Finished []detector.Event `json:"finished"` - Signals []detector.Signal `json:"signals"` - NoiseFloor float64 `json:"noise_floor"` - Thresholds []float64 `json:"thresholds,omitempty"` + Candidates []Candidate `json:"candidates"` + Scheduled []ScheduledCandidate `json:"scheduled,omitempty"` + Finished []detector.Event `json:"finished"` + Signals []detector.Signal `json:"signals"` + NoiseFloor float64 `json:"noise_floor"` + Thresholds []float64 `json:"thresholds,omitempty"` } type RefinementResult struct { - Signals []detector.Signal `json:"signals"` - Decisions []SignalDecision `json:"decisions,omitempty"` + Signals []detector.Signal `json:"signals"` + Decisions []SignalDecision `json:"decisions,omitempty"` + Candidates []Candidate `json:"candidates,omitempty"` } diff --git a/internal/pipeline/phases_test.go b/internal/pipeline/phases_test.go index 356067f..141b6fb 100644 --- a/internal/pipeline/phases_test.go +++ b/internal/pipeline/phases_test.go @@ -8,10 +8,21 @@ import ( func TestRefinementResultCarriesDecisions(t *testing.T) { res := RefinementResult{ - Signals: []detector.Signal{{ID: 1}}, - Decisions: []SignalDecision{{ShouldRecord: true}}, + Signals: []detector.Signal{{ID: 1}}, + Decisions: []SignalDecision{{ShouldRecord: true}}, + Candidates: []Candidate{{ID: 1}}, } - if len(res.Signals) != 1 || len(res.Decisions) != 1 { + if len(res.Signals) != 1 || len(res.Decisions) != 1 || len(res.Candidates) != 1 { t.Fatalf("unexpected refinement result: %+v", res) } } + +func TestSurveillanceResultCarriesScheduledCandidates(t *testing.T) { + res := SurveillanceResult{ + Candidates: []Candidate{{ID: 1}}, + Scheduled: []ScheduledCandidate{{Candidate: Candidate{ID: 1}, Priority: 10}}, + } + if len(res.Candidates) != 1 || len(res.Scheduled) != 1 { + t.Fatalf("unexpected surveillance result: %+v", res) + } +}