From 645d9f40e7754e54697fc25da1cb4c6b229002ee Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sat, 21 Mar 2026 21:47:17 +0100 Subject: [PATCH] feat: add decision hold window for queues --- cmd/sdrd/decision_budget.go | 62 +++++++++++++++++++++++++++----- cmd/sdrd/decision_budget_test.go | 2 +- cmd/sdrd/pipeline_runtime.go | 3 +- config.yaml | 1 + internal/config/config.go | 8 +++++ internal/pipeline/policy.go | 2 ++ 6 files changed, 67 insertions(+), 11 deletions(-) diff --git a/cmd/sdrd/decision_budget.go b/cmd/sdrd/decision_budget.go index 7fe8517..6824398 100644 --- a/cmd/sdrd/decision_budget.go +++ b/cmd/sdrd/decision_budget.go @@ -12,6 +12,8 @@ type decisionQueueStats struct { DecodeQueued int `json:"decode_queued"` RecordSelected int `json:"record_selected"` DecodeSelected int `json:"decode_selected"` + RecordActive int `json:"record_active"` + DecodeActive int `json:"decode_active"` RecordOldestS float64 `json:"record_oldest_sec"` DecodeOldestS float64 `json:"decode_oldest_sec"` } @@ -24,15 +26,22 @@ type queuedDecision struct { } type decisionQueues struct { - record map[int64]*queuedDecision - decode map[int64]*queuedDecision + record map[int64]*queuedDecision + decode map[int64]*queuedDecision + recordHold map[int64]time.Time + decodeHold map[int64]time.Time } func newDecisionQueues() *decisionQueues { - return &decisionQueues{record: map[int64]*queuedDecision{}, decode: map[int64]*queuedDecision{}} + return &decisionQueues{ + record: map[int64]*queuedDecision{}, + decode: map[int64]*queuedDecision{}, + recordHold: map[int64]time.Time{}, + decodeHold: map[int64]time.Time{}, + } } -func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int, now time.Time) decisionQueueStats { +func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int, hold time.Duration, now time.Time) decisionQueueStats { if dq == nil { return decisionQueueStats{} } @@ -75,14 +84,19 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i } } - recSelected := selectQueued(dq.record, maxRecord, now) - decSelected := selectQueued(dq.decode, maxDecode, now) + purgeExpired(dq.recordHold, now) + purgeExpired(dq.decodeHold, now) + + recSelected := selectQueued(dq.record, dq.recordHold, maxRecord, hold, now) + decSelected := selectQueued(dq.decode, dq.decodeHold, maxDecode, hold, now) stats := decisionQueueStats{ RecordQueued: len(dq.record), DecodeQueued: len(dq.decode), RecordSelected: len(recSelected), DecodeSelected: len(decSelected), + RecordActive: len(dq.recordHold), + DecodeActive: len(dq.decodeHold), RecordOldestS: oldestAge(dq.record, now), DecodeOldestS: oldestAge(dq.decode, now), } @@ -107,7 +121,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i return stats } -func selectQueued(queue map[int64]*queuedDecision, max int, now time.Time) map[int64]struct{} { +func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time) map[int64]struct{} { selected := map[int64]struct{}{} if len(queue) == 0 { return selected @@ -132,12 +146,42 @@ func selectQueued(queue map[int64]*queuedDecision, max int, now time.Time) map[i if limit <= 0 || limit > len(scoredList) { limit = len(scoredList) } - for i := 0; i < limit; i++ { - selected[scoredList[i].id] = struct{}{} + if len(hold) > 0 && len(hold) > limit { + limit = len(hold) + if limit > len(scoredList) { + limit = len(scoredList) + } + } + for id := range hold { + if _, ok := queue[id]; ok { + selected[id] = struct{}{} + } + } + for _, s := range scoredList { + if len(selected) >= limit { + break + } + if _, ok := selected[s.id]; ok { + continue + } + selected[s.id] = struct{}{} + } + if holdDur > 0 { + for id := range selected { + hold[id] = now.Add(holdDur) + } } return selected } +func purgeExpired(hold map[int64]time.Time, now time.Time) { + for id, until := range hold { + if now.After(until) { + delete(hold, id) + } + } +} + func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 { oldest := 0.0 first := true diff --git a/cmd/sdrd/decision_budget_test.go b/cmd/sdrd/decision_budget_test.go index ef216fb..c3d5c40 100644 --- a/cmd/sdrd/decision_budget_test.go +++ b/cmd/sdrd/decision_budget_test.go @@ -14,7 +14,7 @@ func TestEnforceDecisionBudgets(t *testing.T) { {Candidate: pipeline.Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false}, } q := newDecisionQueues() - stats := q.Apply(decisions, 1, 1, time.Now()) + stats := q.Apply(decisions, 1, 1, 0, time.Now()) if stats.RecordSelected != 1 || stats.DecodeSelected != 1 { t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected) } diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index a9d8484..4a89cf4 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -368,7 +368,8 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin } maxRecord := rt.cfg.Resources.MaxRecordingStreams maxDecode := rt.cfg.Resources.MaxDecodeJobs - queueStats := rt.decisionQueues.Apply(decisions, maxRecord, maxDecode, art.now) + hold := time.Duration(rt.cfg.Resources.DecisionHoldMs) * time.Millisecond + queueStats := rt.decisionQueues.Apply(decisions, maxRecord, maxDecode, hold, art.now) rt.queueStats = queueStats summary := summarizeDecisions(decisions) if rec != nil { diff --git a/config.yaml b/config.yaml index 235ce49..93a8750 100644 --- a/config.yaml +++ b/config.yaml @@ -70,6 +70,7 @@ resources: max_refinement_jobs: 8 max_recording_streams: 16 max_decode_jobs: 16 + decision_hold_ms: 2000 detector: threshold_db: -20 min_duration_ms: 250 diff --git a/internal/config/config.go b/internal/config/config.go index d8cfa34..75e580c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -107,6 +107,7 @@ type ResourceConfig struct { MaxRefinementJobs int `yaml:"max_refinement_jobs" json:"max_refinement_jobs"` MaxRecordingStreams int `yaml:"max_recording_streams" json:"max_recording_streams"` MaxDecodeJobs int `yaml:"max_decode_jobs" json:"max_decode_jobs"` + DecisionHoldMs int `yaml:"decision_hold_ms" json:"decision_hold_ms"` } type ProfileConfig struct { @@ -186,6 +187,7 @@ func Default() Config { MaxRefinementJobs: 8, MaxRecordingStreams: 16, MaxDecodeJobs: 16, + DecisionHoldMs: 2000, }, Profiles: []ProfileConfig{ {Name: "legacy", Description: "Current single-band pipeline behavior", Pipeline: &PipelineConfig{Mode: "legacy", Goals: PipelineGoalConfig{Intent: "general-monitoring"}}}, @@ -375,6 +377,12 @@ func applyDefaults(cfg Config) Config { if cfg.Resources.MaxRecordingStreams <= 0 { cfg.Resources.MaxRecordingStreams = 16 } + if cfg.Resources.DecisionHoldMs < 0 { + cfg.Resources.DecisionHoldMs = 0 + } + if cfg.Resources.DecisionHoldMs == 0 { + cfg.Resources.DecisionHoldMs = 2000 + } if cfg.FrameRate <= 0 { cfg.FrameRate = 15 } diff --git a/internal/pipeline/policy.go b/internal/pipeline/policy.go index 9753c79..4b61a17 100644 --- a/internal/pipeline/policy.go +++ b/internal/pipeline/policy.go @@ -24,6 +24,7 @@ type Policy struct { RefinementAutoSpan bool `json:"refinement_auto_span"` PreferGPU bool `json:"prefer_gpu"` MaxDecodeJobs int `json:"max_decode_jobs"` + DecisionHoldMs int `json:"decision_hold_ms"` } func PolicyFromConfig(cfg config.Config) Policy { @@ -49,6 +50,7 @@ func PolicyFromConfig(cfg config.Config) Policy { RefinementAutoSpan: config.BoolValue(cfg.Refinement.AutoSpan, true), PreferGPU: cfg.Resources.PreferGPU, MaxDecodeJobs: cfg.Resources.MaxDecodeJobs, + DecisionHoldMs: cfg.Resources.DecisionHoldMs, } }