From 39611f981daaf21efd783cf423c2665a82fdfe79 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sat, 21 Mar 2026 21:22:43 +0100 Subject: [PATCH] feat: add decision queues and queue stats --- cmd/sdrd/decision_budget.go | 161 ++++++++++++++++++++++++++----- cmd/sdrd/decision_budget_test.go | 14 +-- cmd/sdrd/dsp_loop.go | 9 ++ cmd/sdrd/http_handlers.go | 2 + cmd/sdrd/phase_state.go | 2 + cmd/sdrd/phase_state_test.go | 2 + cmd/sdrd/pipeline_runtime.go | 5 +- 7 files changed, 165 insertions(+), 30 deletions(-) diff --git a/cmd/sdrd/decision_budget.go b/cmd/sdrd/decision_budget.go index 29553c6..7fe8517 100644 --- a/cmd/sdrd/decision_budget.go +++ b/cmd/sdrd/decision_budget.go @@ -2,39 +2,154 @@ package main import ( "sort" + "time" "sdr-wideband-suite/internal/pipeline" ) -func enforceDecisionBudgets(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int) (int, int) { - recorded := 0 - decoded := 0 - order := make([]int, len(decisions)) +type decisionQueueStats struct { + RecordQueued int `json:"record_queued"` + DecodeQueued int `json:"decode_queued"` + RecordSelected int `json:"record_selected"` + DecodeSelected int `json:"decode_selected"` + RecordOldestS float64 `json:"record_oldest_sec"` + DecodeOldestS float64 `json:"decode_oldest_sec"` +} + +type queuedDecision struct { + ID int64 + SNRDb float64 + FirstSeen time.Time + LastSeen time.Time +} + +type decisionQueues struct { + record map[int64]*queuedDecision + decode map[int64]*queuedDecision +} + +func newDecisionQueues() *decisionQueues { + return &decisionQueues{record: map[int64]*queuedDecision{}, decode: map[int64]*queuedDecision{}} +} + +func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int, now time.Time) decisionQueueStats { + if dq == nil { + return decisionQueueStats{} + } + recSeen := map[int64]bool{} + decSeen := map[int64]bool{} for i := range decisions { - order[i] = i + id := decisions[i].Candidate.ID + if id == 0 { + continue + } + if decisions[i].ShouldRecord { + qd := dq.record[id] + if qd == nil { + qd = &queuedDecision{ID: id, FirstSeen: now} + dq.record[id] = qd + } + qd.SNRDb = decisions[i].Candidate.SNRDb + qd.LastSeen = now + recSeen[id] = true + } + if decisions[i].ShouldAutoDecode { + qd := dq.decode[id] + if qd == nil { + qd = &queuedDecision{ID: id, FirstSeen: now} + dq.decode[id] = qd + } + qd.SNRDb = decisions[i].Candidate.SNRDb + qd.LastSeen = now + decSeen[id] = true + } } - sort.SliceStable(order, func(i, j int) bool { - return decisions[order[i]].Candidate.SNRDb > decisions[order[j]].Candidate.SNRDb - }) - for _, idx := range order { - if decisions[idx].ShouldRecord { - if maxRecord > 0 && recorded >= maxRecord { - decisions[idx].ShouldRecord = false - decisions[idx].Reason = "recording budget exceeded" - } else { - recorded++ + for id := range dq.record { + if !recSeen[id] { + delete(dq.record, id) + } + } + for id := range dq.decode { + if !decSeen[id] { + delete(dq.decode, id) + } + } + + recSelected := selectQueued(dq.record, maxRecord, now) + decSelected := selectQueued(dq.decode, maxDecode, now) + + stats := decisionQueueStats{ + RecordQueued: len(dq.record), + DecodeQueued: len(dq.decode), + RecordSelected: len(recSelected), + DecodeSelected: len(decSelected), + RecordOldestS: oldestAge(dq.record, now), + DecodeOldestS: oldestAge(dq.decode, now), + } + + for i := range decisions { + id := decisions[i].Candidate.ID + if decisions[i].ShouldRecord { + if _, ok := recSelected[id]; !ok { + decisions[i].ShouldRecord = false + decisions[i].Reason = "queued: record budget" } } - if decisions[idx].ShouldAutoDecode { - if maxDecode > 0 && decoded >= maxDecode { - decisions[idx].ShouldAutoDecode = false - if decisions[idx].Reason == "" { - decisions[idx].Reason = "decode budget exceeded" + if decisions[i].ShouldAutoDecode { + if _, ok := decSelected[id]; !ok { + decisions[i].ShouldAutoDecode = false + if decisions[i].Reason == "" { + decisions[i].Reason = "queued: decode budget" } - } else { - decoded++ } } } - return recorded, decoded + return stats +} + +func selectQueued(queue map[int64]*queuedDecision, max int, now time.Time) map[int64]struct{} { + selected := map[int64]struct{}{} + if len(queue) == 0 { + return selected + } + type scored struct { + id int64 + score float64 + } + scoredList := make([]scored, 0, len(queue)) + for id, qd := range queue { + age := now.Sub(qd.FirstSeen).Seconds() + boost := age / 2.0 + if boost > 5 { + boost = 5 + } + scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost}) + } + sort.Slice(scoredList, func(i, j int) bool { + return scoredList[i].score > scoredList[j].score + }) + limit := max + if limit <= 0 || limit > len(scoredList) { + limit = len(scoredList) + } + for i := 0; i < limit; i++ { + selected[scoredList[i].id] = struct{}{} + } + return selected +} + +func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 { + oldest := 0.0 + first := true + for _, qd := range queue { + age := now.Sub(qd.FirstSeen).Seconds() + if first || age > oldest { + oldest = age + first = false + } + } + if first { + return 0 + } + return oldest } diff --git a/cmd/sdrd/decision_budget_test.go b/cmd/sdrd/decision_budget_test.go index f519ea6..ef216fb 100644 --- a/cmd/sdrd/decision_budget_test.go +++ b/cmd/sdrd/decision_budget_test.go @@ -2,19 +2,21 @@ package main import ( "testing" + "time" "sdr-wideband-suite/internal/pipeline" ) func TestEnforceDecisionBudgets(t *testing.T) { decisions := []pipeline.SignalDecision{ - {Candidate: pipeline.Candidate{SNRDb: 5}, ShouldRecord: true, ShouldAutoDecode: true}, - {Candidate: pipeline.Candidate{SNRDb: 15}, ShouldRecord: true, ShouldAutoDecode: true}, - {Candidate: pipeline.Candidate{SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false}, + {Candidate: pipeline.Candidate{ID: 1, SNRDb: 5}, ShouldRecord: true, ShouldAutoDecode: true}, + {Candidate: pipeline.Candidate{ID: 2, SNRDb: 15}, ShouldRecord: true, ShouldAutoDecode: true}, + {Candidate: pipeline.Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false}, } - recorded, decoded := enforceDecisionBudgets(decisions, 1, 1) - if recorded != 1 || decoded != 1 { - t.Fatalf("unexpected counts: record=%d decode=%d", recorded, decoded) + q := newDecisionQueues() + stats := q.Apply(decisions, 1, 1, time.Now()) + if stats.RecordSelected != 1 || stats.DecodeSelected != 1 { + t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected) } if !decisions[1].ShouldRecord || !decisions[1].ShouldAutoDecode { t.Fatalf("expected highest SNR decision to remain allowed") diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index 10eb856..b0ac21e 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -92,6 +92,15 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * state.refinement = pipeline.RefinementResult{} displaySignals = rt.det.StableSignals() } + state.queueStats = rt.queueStats + state.presentation = pipeline.AnalysisLevel{ + Name: "presentation", + SampleRate: rt.cfg.SampleRate, + FFTSize: rt.cfg.Surveillance.DisplayBins, + CenterHz: rt.cfg.CenterHz, + SpanHz: float64(rt.cfg.SampleRate), + Source: "display", + } if phaseSnap != nil { phaseSnap.Set(*state) } diff --git a/cmd/sdrd/http_handlers.go b/cmd/sdrd/http_handlers.go index 1f33583..ef07bcb 100644 --- a/cmd/sdrd/http_handlers.go +++ b/cmd/sdrd/http_handlers.go @@ -159,6 +159,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime "plan": snap.refinementInput.Plan, "windows": snap.refinementInput.Windows, "window_stats": windowStats, + "queue_stats": snap.queueStats, "candidates": len(snap.refinementInput.Candidates), "scheduled": len(snap.refinementInput.Scheduled), "signals": len(snap.refinement.Signals), @@ -167,6 +168,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime "decision_items": compactDecisions(snap.refinement.Decisions), "surveillance_level": snap.surveillance.Level, "refinement_level": snap.refinementInput.Level, + "presentation_level": snap.presentation, } _ = json.NewEncoder(w).Encode(out) }) diff --git a/cmd/sdrd/phase_state.go b/cmd/sdrd/phase_state.go index dba7007..c3bc58d 100644 --- a/cmd/sdrd/phase_state.go +++ b/cmd/sdrd/phase_state.go @@ -6,4 +6,6 @@ type phaseState struct { surveillance pipeline.SurveillanceResult refinementInput pipeline.RefinementInput refinement pipeline.RefinementResult + queueStats decisionQueueStats + presentation pipeline.AnalysisLevel } diff --git a/cmd/sdrd/phase_state_test.go b/cmd/sdrd/phase_state_test.go index 2afb90e..9438501 100644 --- a/cmd/sdrd/phase_state_test.go +++ b/cmd/sdrd/phase_state_test.go @@ -11,6 +11,8 @@ func TestPhaseStateCarriesPhaseResults(t *testing.T) { surveillance: pipeline.SurveillanceResult{NoiseFloor: -90, Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}}, refinementInput: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6}, refinement: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}}, + queueStats: decisionQueueStats{RecordQueued: 1}, + presentation: pipeline.AnalysisLevel{Name: "presentation"}, } if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 { t.Fatalf("unexpected surveillance state: %+v", ps.surveillance) diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index 7acc9cc..465c99e 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -39,6 +39,8 @@ type dspRuntime struct { rdsMap map[int64]*rdsState streamPhaseState map[int64]*streamExtractState streamOverlap *streamIQOverlap + decisionQueues *decisionQueues + queueStats decisionQueueStats gotSamples bool } @@ -356,7 +358,8 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin } maxRecord := rt.cfg.Resources.MaxRecordingStreams maxDecode := rt.cfg.Resources.MaxDecodeJobs - enforceDecisionBudgets(decisions, maxRecord, maxDecode) + queueStats := rt.decisionQueues.Apply(decisions, maxRecord, maxDecode, art.now) + rt.queueStats = queueStats summary := summarizeDecisions(decisions) if rec != nil { if summary.RecordEnabled > 0 {