From a57b0034e5caaf1e6a0a1b86e29f897cd33aad54 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sat, 21 Mar 2026 21:59:21 +0100 Subject: [PATCH] feat: queue hold policy weighting and lowres level --- cmd/sdrd/decision_budget.go | 21 ++++++++++++++++----- cmd/sdrd/decision_budget_test.go | 2 +- cmd/sdrd/pipeline_runtime.go | 24 ++++++++++++++++++++++-- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/cmd/sdrd/decision_budget.go b/cmd/sdrd/decision_budget.go index 6824398..dcaf4d1 100644 --- a/cmd/sdrd/decision_budget.go +++ b/cmd/sdrd/decision_budget.go @@ -21,6 +21,8 @@ type decisionQueueStats struct { type queuedDecision struct { ID int64 SNRDb float64 + Hint string + Class string FirstSeen time.Time LastSeen time.Time } @@ -41,7 +43,7 @@ func newDecisionQueues() *decisionQueues { } } -func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int, hold time.Duration, now time.Time) decisionQueueStats { +func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int, hold time.Duration, now time.Time, policy pipeline.Policy) decisionQueueStats { if dq == nil { return decisionQueueStats{} } @@ -59,6 +61,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i dq.record[id] = qd } qd.SNRDb = decisions[i].Candidate.SNRDb + qd.Hint = decisions[i].Candidate.Hint + qd.Class = decisions[i].Class qd.LastSeen = now recSeen[id] = true } @@ -69,6 +73,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i dq.decode[id] = qd } qd.SNRDb = decisions[i].Candidate.SNRDb + qd.Hint = decisions[i].Candidate.Hint + qd.Class = decisions[i].Class qd.LastSeen = now decSeen[id] = true } @@ -87,8 +93,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i purgeExpired(dq.recordHold, now) purgeExpired(dq.decodeHold, now) - recSelected := selectQueued(dq.record, dq.recordHold, maxRecord, hold, now) - decSelected := selectQueued(dq.decode, dq.decodeHold, maxDecode, hold, now) + recSelected := selectQueued(dq.record, dq.recordHold, maxRecord, hold, now, policy) + decSelected := selectQueued(dq.decode, dq.decodeHold, maxDecode, hold, now, policy) stats := decisionQueueStats{ RecordQueued: len(dq.record), @@ -121,7 +127,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i return stats } -func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time) map[int64]struct{} { +func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy pipeline.Policy) map[int64]struct{} { selected := map[int64]struct{}{} if len(queue) == 0 { return selected @@ -137,7 +143,12 @@ func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max if boost > 5 { boost = 5 } - scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost}) + hint := qd.Hint + if hint == "" { + hint = qd.Class + } + policyBoost := pipeline.CandidatePriorityBoost(policy, hint) + scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost}) } sort.Slice(scoredList, func(i, j int) bool { return scoredList[i].score > scoredList[j].score diff --git a/cmd/sdrd/decision_budget_test.go b/cmd/sdrd/decision_budget_test.go index c3d5c40..5a674cc 100644 --- a/cmd/sdrd/decision_budget_test.go +++ b/cmd/sdrd/decision_budget_test.go @@ -14,7 +14,7 @@ func TestEnforceDecisionBudgets(t *testing.T) { {Candidate: pipeline.Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false}, } q := newDecisionQueues() - stats := q.Apply(decisions, 1, 1, 0, time.Now()) + stats := q.Apply(decisions, 1, 1, 0, time.Now(), pipeline.Policy{SignalPriorities: []string{"digital"}}) if stats.RecordSelected != 1 || stats.DecodeSelected != 1 { t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected) } diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index 4a89cf4..b1cc846 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -229,6 +229,22 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S SpanHz: float64(rt.cfg.SampleRate), Source: "baseband", } + lowRate := rt.cfg.SampleRate / 2 + lowFFT := rt.cfg.Surveillance.AnalysisFFTSize / 2 + if lowRate < 200000 { + lowRate = rt.cfg.SampleRate + } + if lowFFT < 256 { + lowFFT = rt.cfg.Surveillance.AnalysisFFTSize + } + lowLevel := pipeline.AnalysisLevel{ + Name: "surveillance-lowres", + SampleRate: lowRate, + FFTSize: lowFFT, + CenterHz: rt.cfg.CenterHz, + SpanHz: float64(lowRate), + Source: "downsampled", + } displayLevel := pipeline.AnalysisLevel{ Name: "presentation", SampleRate: rt.cfg.SampleRate, @@ -237,9 +253,13 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S SpanHz: float64(rt.cfg.SampleRate), Source: "display", } + levels := []pipeline.AnalysisLevel{level} + if lowLevel.SampleRate != level.SampleRate || lowLevel.FFTSize != level.FFTSize { + levels = append(levels, lowLevel) + } return pipeline.SurveillanceResult{ Level: level, - Levels: []pipeline.AnalysisLevel{level}, + Levels: levels, DisplayLevel: displayLevel, Candidates: candidates, Scheduled: scheduled, @@ -369,7 +389,7 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin maxRecord := rt.cfg.Resources.MaxRecordingStreams maxDecode := rt.cfg.Resources.MaxDecodeJobs hold := time.Duration(rt.cfg.Resources.DecisionHoldMs) * time.Millisecond - queueStats := rt.decisionQueues.Apply(decisions, maxRecord, maxDecode, hold, art.now) + queueStats := rt.decisionQueues.Apply(decisions, maxRecord, maxDecode, hold, art.now, policy) rt.queueStats = queueStats summary := summarizeDecisions(decisions) if rec != nil {