From 7c29f37ef4f499fd08547add596d295ea648507d Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sun, 22 Mar 2026 08:41:21 +0100 Subject: [PATCH] Consolidate arbitration core and normalize reasons --- cmd/sdrd/arbitration_snapshot.go | 4 +- cmd/sdrd/arbitrator.go | 36 ----------- cmd/sdrd/decision_budget_test.go | 32 ---------- cmd/sdrd/decision_summary.go | 2 +- cmd/sdrd/phase_state.go | 2 +- cmd/sdrd/phase_state_test.go | 2 +- cmd/sdrd/pipeline_runtime.go | 22 +++---- cmd/sdrd/types.go | 19 ++---- internal/pipeline/arbiter.go | 32 ++++++++++ internal/pipeline/arbitration.go | 28 ++++++--- internal/pipeline/arbitration_reasons.go | 26 ++++++++ internal/pipeline/arbitration_state.go | 17 ++++++ internal/pipeline/arbitration_test.go | 27 ++++++++- .../pipeline/decision_queue.go | 22 ++++--- internal/pipeline/decision_queue_test.go | 59 +++++++++++++++++++ internal/pipeline/decisions.go | 10 ++-- internal/pipeline/scheduler.go | 27 +++++---- internal/pipeline/scheduler_test.go | 11 ++++ 18 files changed, 246 insertions(+), 132 deletions(-) delete mode 100644 cmd/sdrd/arbitrator.go delete mode 100644 cmd/sdrd/decision_budget_test.go create mode 100644 internal/pipeline/arbiter.go create mode 100644 internal/pipeline/arbitration_reasons.go create mode 100644 internal/pipeline/arbitration_state.go rename cmd/sdrd/decision_budget.go => internal/pipeline/decision_queue.go (89%) create mode 100644 internal/pipeline/decision_queue_test.go diff --git a/cmd/sdrd/arbitration_snapshot.go b/cmd/sdrd/arbitration_snapshot.go index 0572cad..fe048e0 100644 --- a/cmd/sdrd/arbitration_snapshot.go +++ b/cmd/sdrd/arbitration_snapshot.go @@ -2,12 +2,12 @@ package main import "sdr-wideband-suite/internal/pipeline" -func buildArbitrationSnapshot(step pipeline.RefinementStep, arb arbitrationState) *ArbitrationSnapshot { +func buildArbitrationSnapshot(step pipeline.RefinementStep, arb pipeline.ArbitrationState) *ArbitrationSnapshot { return &ArbitrationSnapshot{ Budgets: &arb.Budgets, HoldPolicy: &arb.HoldPolicy, RefinementPlan: &step.Input.Plan, - RefinementAdmission: &step.Input.Admission, + RefinementAdmission: &arb.Refinement, Queue: arb.Queue, DecisionSummary: summarizeDecisions(step.Result.Decisions), DecisionItems: compactDecisions(step.Result.Decisions), diff --git a/cmd/sdrd/arbitrator.go b/cmd/sdrd/arbitrator.go deleted file mode 100644 index 6ff4712..0000000 --- a/cmd/sdrd/arbitrator.go +++ /dev/null @@ -1,36 +0,0 @@ -package main - -import ( - "time" - - "sdr-wideband-suite/internal/pipeline" -) - -type arbitrator struct { - refinementHold *pipeline.RefinementHold - queues *decisionQueues -} - -func newArbitrator() *arbitrator { - return &arbitrator{ - refinementHold: &pipeline.RefinementHold{Active: map[int64]time.Time{}}, - queues: newDecisionQueues(), - } -} - -func (a *arbitrator) AdmitRefinement(plan pipeline.RefinementPlan, policy pipeline.Policy, now time.Time) pipeline.RefinementAdmissionResult { - if a == nil { - return pipeline.AdmitRefinementPlan(plan, policy, now, nil) - } - if a.refinementHold == nil { - a.refinementHold = &pipeline.RefinementHold{Active: map[int64]time.Time{}} - } - return pipeline.AdmitRefinementPlan(plan, policy, now, a.refinementHold) -} - -func (a *arbitrator) ApplyDecisions(decisions []pipeline.SignalDecision, budget pipeline.BudgetModel, now time.Time, policy pipeline.Policy) decisionQueueStats { - if a == nil || a.queues == nil { - return decisionQueueStats{} - } - return a.queues.Apply(decisions, budget, now, policy) -} diff --git a/cmd/sdrd/decision_budget_test.go b/cmd/sdrd/decision_budget_test.go deleted file mode 100644 index 5a2eeb1..0000000 --- a/cmd/sdrd/decision_budget_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package main - -import ( - "testing" - "time" - - "sdr-wideband-suite/internal/pipeline" -) - -func TestEnforceDecisionBudgets(t *testing.T) { - decisions := []pipeline.SignalDecision{ - {Candidate: pipeline.Candidate{ID: 1, SNRDb: 5}, ShouldRecord: true, ShouldAutoDecode: true}, - {Candidate: pipeline.Candidate{ID: 2, SNRDb: 15}, ShouldRecord: true, ShouldAutoDecode: true}, - {Candidate: pipeline.Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false}, - } - q := newDecisionQueues() - policy := pipeline.Policy{SignalPriorities: []string{"digital"}, MaxRecordingStreams: 1, MaxDecodeJobs: 1} - budget := pipeline.BudgetModelFromPolicy(policy) - stats := q.Apply(decisions, budget, time.Now(), policy) - if stats.RecordSelected != 1 || stats.DecodeSelected != 1 { - t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected) - } - if !decisions[1].ShouldRecord || !decisions[1].ShouldAutoDecode { - t.Fatalf("expected highest SNR decision to remain allowed") - } - if decisions[0].ShouldRecord || decisions[0].ShouldAutoDecode { - t.Fatalf("expected lowest SNR decision to be budgeted off") - } - if decisions[2].ShouldRecord { - t.Fatalf("expected mid SNR decision to be budgeted off by record budget") - } -} diff --git a/cmd/sdrd/decision_summary.go b/cmd/sdrd/decision_summary.go index 9ffdab5..ec58804 100644 --- a/cmd/sdrd/decision_summary.go +++ b/cmd/sdrd/decision_summary.go @@ -21,7 +21,7 @@ func summarizeDecisions(decisions []pipeline.SignalDecision) decisionSummary { } reason := d.Reason if reason == "" { - reason = "unspecified" + reason = pipeline.DecisionReasonUnspecified } summary.Reasons[reason]++ } diff --git a/cmd/sdrd/phase_state.go b/cmd/sdrd/phase_state.go index 2a14df3..a2be478 100644 --- a/cmd/sdrd/phase_state.go +++ b/cmd/sdrd/phase_state.go @@ -5,6 +5,6 @@ import "sdr-wideband-suite/internal/pipeline" type phaseState struct { surveillance pipeline.SurveillanceResult refinement pipeline.RefinementStep - arbitration arbitrationState + arbitration pipeline.ArbitrationState presentation pipeline.AnalysisLevel } diff --git a/cmd/sdrd/phase_state_test.go b/cmd/sdrd/phase_state_test.go index 4ba2ac6..4acc2ba 100644 --- a/cmd/sdrd/phase_state_test.go +++ b/cmd/sdrd/phase_state_test.go @@ -13,7 +13,7 @@ func TestPhaseStateCarriesPhaseResults(t *testing.T) { Input: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6}, Result: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}}, }, - arbitration: arbitrationState{Queue: decisionQueueStats{RecordQueued: 1}}, + arbitration: pipeline.ArbitrationState{Queue: pipeline.DecisionQueueStats{RecordQueued: 1}}, presentation: pipeline.AnalysisLevel{Name: "presentation"}, } if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 { diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index 2535498..33676ae 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -42,8 +42,8 @@ type dspRuntime struct { rdsMap map[int64]*rdsState streamPhaseState map[int64]*streamExtractState streamOverlap *streamIQOverlap - arbiter *arbitrator - arbitration arbitrationState + arbiter *pipeline.Arbiter + arbitration pipeline.ArbitrationState gotSamples bool } @@ -79,7 +79,7 @@ func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, rdsMap: map[int64]*rdsState{}, streamPhaseState: map[int64]*streamExtractState{}, streamOverlap: &streamIQOverlap{}, - arbiter: newArbitrator(), + arbiter: pipeline.NewArbiter(), } if rt.useGPU && gpuState != nil { snap := gpuState.snapshot() @@ -426,17 +426,15 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now item.Reason = pipeline.RefinementReasonDisabled } input.Scheduled = nil - input.Request.Reason = pipeline.RefinementReasonDisabled - input.Admission.Reason = pipeline.RefinementReasonDisabled + input.Request.Reason = pipeline.ReasonAdmissionDisabled + input.Admission.Reason = pipeline.ReasonAdmissionDisabled input.Admission.Admitted = 0 input.Admission.Skipped = 0 input.Admission.Displaced = 0 input.Plan.Selected = nil input.Plan.DroppedByBudget = 0 } - rt.arbitration.Budgets = input.Budgets - rt.arbitration.Refinement = input.Admission - rt.arbitration.HoldPolicy = pipeline.HoldPolicyFromPolicy(policy) + rt.setArbitration(policy, input.Budgets, input.Admission, rt.arbitration.Queue) return input } @@ -508,9 +506,7 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin } budget := pipeline.BudgetModelFromPolicy(policy) queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy) - rt.arbitration.Budgets = budget - rt.arbitration.HoldPolicy = pipeline.HoldPolicyFromPolicy(policy) - rt.arbitration.Queue = queueStats + rt.setArbitration(policy, budget, input.Admission, queueStats) summary := summarizeDecisions(decisions) if rec != nil { if summary.RecordEnabled > 0 { @@ -705,3 +701,7 @@ func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pi items[i].Reason = pipeline.RefinementReasonCompleted } } + +func (rt *dspRuntime) setArbitration(policy pipeline.Policy, budget pipeline.BudgetModel, admission pipeline.RefinementAdmission, queue pipeline.DecisionQueueStats) { + rt.arbitration = pipeline.BuildArbitrationState(policy, budget, admission, queue) +} diff --git a/cmd/sdrd/types.go b/cmd/sdrd/types.go index e596984..50454a5 100644 --- a/cmd/sdrd/types.go +++ b/cmd/sdrd/types.go @@ -45,20 +45,13 @@ type DecisionDebug struct { } type ArbitrationSnapshot struct { - Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` - HoldPolicy *pipeline.HoldPolicy `json:"hold_policy,omitempty"` - RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` + Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` + HoldPolicy *pipeline.HoldPolicy `json:"hold_policy,omitempty"` + RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` RefinementAdmission *pipeline.RefinementAdmission `json:"refinement_admission,omitempty"` - Queue decisionQueueStats `json:"queue,omitempty"` - DecisionSummary decisionSummary `json:"decision_summary,omitempty"` - DecisionItems []compactDecision `json:"decision_items,omitempty"` -} - -type arbitrationState struct { - Budgets pipeline.BudgetModel - HoldPolicy pipeline.HoldPolicy - Refinement pipeline.RefinementAdmission - Queue decisionQueueStats + Queue pipeline.DecisionQueueStats `json:"queue,omitempty"` + DecisionSummary decisionSummary `json:"decision_summary,omitempty"` + DecisionItems []compactDecision `json:"decision_items,omitempty"` } type SpectrumFrame struct { diff --git a/internal/pipeline/arbiter.go b/internal/pipeline/arbiter.go new file mode 100644 index 0000000..60a1118 --- /dev/null +++ b/internal/pipeline/arbiter.go @@ -0,0 +1,32 @@ +package pipeline + +import "time" + +type Arbiter struct { + refinementHold *RefinementHold + queues *decisionQueues +} + +func NewArbiter() *Arbiter { + return &Arbiter{ + refinementHold: &RefinementHold{Active: map[int64]time.Time{}}, + queues: newDecisionQueues(), + } +} + +func (a *Arbiter) AdmitRefinement(plan RefinementPlan, policy Policy, now time.Time) RefinementAdmissionResult { + if a == nil { + return AdmitRefinementPlan(plan, policy, now, nil) + } + if a.refinementHold == nil { + a.refinementHold = &RefinementHold{Active: map[int64]time.Time{}} + } + return AdmitRefinementPlan(plan, policy, now, a.refinementHold) +} + +func (a *Arbiter) ApplyDecisions(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats { + if a == nil || a.queues == nil { + return DecisionQueueStats{} + } + return a.queues.Apply(decisions, budget, now, policy) +} diff --git a/internal/pipeline/arbitration.go b/internal/pipeline/arbitration.go index b617c7d..00f3193 100644 --- a/internal/pipeline/arbitration.go +++ b/internal/pipeline/arbitration.go @@ -52,25 +52,39 @@ func HoldPolicyFromPolicy(policy Policy) HoldPolicy { profile := strings.ToLower(strings.TrimSpace(policy.Profile)) strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) - if profileContains(profile, "archive") || strategyContains(strategy, "archive") { + archiveProfile := profileContains(profile, "archive") + archiveStrategy := strategyContains(strategy, "archive") + if archiveProfile || archiveStrategy { recMult *= 1.5 decMult *= 1.1 refMult *= 1.2 - reasons = append(reasons, "archive") + if archiveProfile { + reasons = append(reasons, HoldReasonProfileArchive) + } + if archiveStrategy { + reasons = append(reasons, HoldReasonStrategyArchive) + } } - if profileContains(profile, "digital") || strategyContains(strategy, "digital") { + digitalProfile := profileContains(profile, "digital") + digitalStrategy := strategyContains(strategy, "digital") + if digitalProfile || digitalStrategy { decMult *= 1.6 recMult *= 0.85 refMult *= 1.1 - reasons = append(reasons, "digital") + if digitalProfile { + reasons = append(reasons, HoldReasonProfileDigital) + } + if digitalStrategy { + reasons = append(reasons, HoldReasonStrategyDigital) + } } if profileContains(profile, "aggressive") { refMult *= 1.15 - reasons = append(reasons, "aggressive") + reasons = append(reasons, HoldReasonProfileAggressive) } if strategyContains(strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy)), "multi") { refMult *= 1.1 - reasons = append(reasons, "multi-resolution") + reasons = append(reasons, HoldReasonStrategyMultiRes) } return HoldPolicy{ @@ -95,7 +109,7 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold BudgetSource: plan.BudgetSource, } if len(ranked) == 0 { - admission.Reason = "no-candidates" + admission.Reason = ReasonAdmissionNoCandidates return RefinementAdmissionResult{Plan: plan, WorkItems: workItems, Admission: admission} } diff --git a/internal/pipeline/arbitration_reasons.go b/internal/pipeline/arbitration_reasons.go new file mode 100644 index 0000000..8485bf6 --- /dev/null +++ b/internal/pipeline/arbitration_reasons.go @@ -0,0 +1,26 @@ +package pipeline + +const ( + ReasonAdmissionNoCandidates = "admission:none:candidates" + ReasonAdmissionDisabled = "admission:disabled" +) + +const ( + DecisionReasonRecordClass = "decision:record:class" + DecisionReasonRecordHint = "decision:record:hint" + DecisionReasonDecodeClass = "decision:decode:class" + DecisionReasonDecodeHint = "decision:decode:hint" + DecisionReasonHintOnly = "decision:hint" + DecisionReasonQueueRecord = "queue:record:budget" + DecisionReasonQueueDecode = "queue:decode:budget" + DecisionReasonUnspecified = "decision:unspecified" +) + +const ( + HoldReasonProfileArchive = "profile:archive" + HoldReasonProfileDigital = "profile:digital" + HoldReasonProfileAggressive = "profile:aggressive" + HoldReasonStrategyArchive = "strategy:archive" + HoldReasonStrategyDigital = "strategy:digital" + HoldReasonStrategyMultiRes = "strategy:multi-resolution" +) diff --git a/internal/pipeline/arbitration_state.go b/internal/pipeline/arbitration_state.go new file mode 100644 index 0000000..99a84c3 --- /dev/null +++ b/internal/pipeline/arbitration_state.go @@ -0,0 +1,17 @@ +package pipeline + +type ArbitrationState struct { + Budgets BudgetModel `json:"budgets,omitempty"` + HoldPolicy HoldPolicy `json:"hold_policy,omitempty"` + Refinement RefinementAdmission `json:"refinement,omitempty"` + Queue DecisionQueueStats `json:"queue,omitempty"` +} + +func BuildArbitrationState(policy Policy, budget BudgetModel, admission RefinementAdmission, queue DecisionQueueStats) ArbitrationState { + return ArbitrationState{ + Budgets: budget, + HoldPolicy: HoldPolicyFromPolicy(policy), + Refinement: admission, + Queue: queue, + } +} diff --git a/internal/pipeline/arbitration_test.go b/internal/pipeline/arbitration_test.go index a24ed3c..de58f41 100644 --- a/internal/pipeline/arbitration_test.go +++ b/internal/pipeline/arbitration_test.go @@ -1,6 +1,9 @@ package pipeline -import "testing" +import ( + "testing" + "time" +) func TestHoldPolicyArchiveBiasesRecord(t *testing.T) { policy := Policy{DecisionHoldMs: 1000, Profile: "archive", RefinementStrategy: "archive-oriented"} @@ -11,6 +14,9 @@ func TestHoldPolicyArchiveBiasesRecord(t *testing.T) { if hold.RefinementMs <= hold.BaseMs { t.Fatalf("expected archive profile to extend refinement hold, got %d vs %d", hold.RefinementMs, hold.BaseMs) } + if !containsReason(hold.Reasons, HoldReasonProfileArchive) { + t.Fatalf("expected profile archive reason, got %+v", hold.Reasons) + } } func TestHoldPolicyDigitalBiasesDecode(t *testing.T) { @@ -19,4 +25,23 @@ func TestHoldPolicyDigitalBiasesDecode(t *testing.T) { if hold.DecodeMs <= hold.RecordMs { t.Fatalf("expected digital profile to favor decode hold, got decode=%d record=%d", hold.DecodeMs, hold.RecordMs) } + if !containsReason(hold.Reasons, HoldReasonProfileDigital) { + t.Fatalf("expected profile digital reason, got %+v", hold.Reasons) + } +} + +func TestAdmitRefinementPlanNoCandidatesReason(t *testing.T) { + res := AdmitRefinementPlan(RefinementPlan{}, Policy{}, time.Now(), &RefinementHold{Active: map[int64]time.Time{}}) + if res.Admission.Reason != ReasonAdmissionNoCandidates { + t.Fatalf("expected no-candidates reason, got %s", res.Admission.Reason) + } +} + +func containsReason(reasons []string, target string) bool { + for _, r := range reasons { + if r == target { + return true + } + } + return false } diff --git a/cmd/sdrd/decision_budget.go b/internal/pipeline/decision_queue.go similarity index 89% rename from cmd/sdrd/decision_budget.go rename to internal/pipeline/decision_queue.go index 77c409f..0b2604c 100644 --- a/cmd/sdrd/decision_budget.go +++ b/internal/pipeline/decision_queue.go @@ -1,13 +1,11 @@ -package main +package pipeline import ( "sort" "time" - - "sdr-wideband-suite/internal/pipeline" ) -type decisionQueueStats struct { +type DecisionQueueStats struct { RecordQueued int `json:"record_queued"` DecodeQueued int `json:"decode_queued"` RecordSelected int `json:"record_selected"` @@ -50,11 +48,11 @@ func newDecisionQueues() *decisionQueues { } } -func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipeline.BudgetModel, now time.Time, policy pipeline.Policy) decisionQueueStats { +func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats { if dq == nil { - return decisionQueueStats{} + return DecisionQueueStats{} } - holdPolicy := pipeline.HoldPolicyFromPolicy(policy) + holdPolicy := HoldPolicyFromPolicy(policy) recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond recSeen := map[int64]bool{} @@ -106,7 +104,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy) decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy) - stats := decisionQueueStats{ + stats := DecisionQueueStats{ RecordQueued: len(dq.record), DecodeQueued: len(dq.decode), RecordSelected: len(recSelected), @@ -127,7 +125,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe if decisions[i].ShouldRecord { if _, ok := recSelected[id]; !ok { decisions[i].ShouldRecord = false - decisions[i].Reason = "queued: record budget" + decisions[i].Reason = DecisionReasonQueueRecord stats.RecordDropped++ } } @@ -135,7 +133,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe if _, ok := decSelected[id]; !ok { decisions[i].ShouldAutoDecode = false if decisions[i].Reason == "" { - decisions[i].Reason = "queued: decode budget" + decisions[i].Reason = DecisionReasonQueueDecode } stats.DecodeDropped++ } @@ -144,7 +142,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe return stats } -func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy pipeline.Policy) map[int64]struct{} { +func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy) map[int64]struct{} { selected := map[int64]struct{}{} if len(queue) == 0 { return selected @@ -164,7 +162,7 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in if hint == "" { hint = qd.Class } - policyBoost := pipeline.DecisionPriorityBoost(policy, hint, qd.Class, queueName) + policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName) scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost}) } sort.Slice(scoredList, func(i, j int) bool { diff --git a/internal/pipeline/decision_queue_test.go b/internal/pipeline/decision_queue_test.go new file mode 100644 index 0000000..2edb80f --- /dev/null +++ b/internal/pipeline/decision_queue_test.go @@ -0,0 +1,59 @@ +package pipeline + +import ( + "testing" + "time" +) + +func TestDecisionQueueDropsByBudget(t *testing.T) { + arbiter := NewArbiter() + decisions := []SignalDecision{ + {Candidate: Candidate{ID: 1, SNRDb: 12}, ShouldRecord: true, ShouldAutoDecode: true}, + {Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: true}, + } + budget := BudgetModel{ + Record: BudgetQueue{Max: 1}, + Decode: BudgetQueue{Max: 1}, + } + stats := arbiter.ApplyDecisions(decisions, budget, time.Now(), Policy{DecisionHoldMs: 250}) + if stats.RecordDropped == 0 || stats.DecodeDropped == 0 { + t.Fatalf("expected drops by budget, got %+v", stats) + } + allowed := 0 + for _, d := range decisions { + if d.ShouldRecord || d.ShouldAutoDecode { + allowed++ + continue + } + if d.Reason != DecisionReasonQueueRecord && d.Reason != DecisionReasonQueueDecode { + t.Fatalf("unexpected decision reason: %s", d.Reason) + } + } + if allowed != 1 { + t.Fatalf("expected 1 decision allowed, got %d", allowed) + } +} + +func TestDecisionQueueEnforcesBudgets(t *testing.T) { + decisions := []SignalDecision{ + {Candidate: Candidate{ID: 1, SNRDb: 5}, ShouldRecord: true, ShouldAutoDecode: true}, + {Candidate: Candidate{ID: 2, SNRDb: 15}, ShouldRecord: true, ShouldAutoDecode: true}, + {Candidate: Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false}, + } + arbiter := NewArbiter() + policy := Policy{SignalPriorities: []string{"digital"}, MaxRecordingStreams: 1, MaxDecodeJobs: 1} + budget := BudgetModelFromPolicy(policy) + stats := arbiter.ApplyDecisions(decisions, budget, time.Now(), policy) + if stats.RecordSelected != 1 || stats.DecodeSelected != 1 { + t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected) + } + if !decisions[1].ShouldRecord || !decisions[1].ShouldAutoDecode { + t.Fatalf("expected highest SNR decision to remain allowed") + } + if decisions[0].ShouldRecord || decisions[0].ShouldAutoDecode { + t.Fatalf("expected lowest SNR decision to be budgeted off") + } + if decisions[2].ShouldRecord { + t.Fatalf("expected mid SNR decision to be budgeted off by record budget") + } +} diff --git a/internal/pipeline/decisions.go b/internal/pipeline/decisions.go index 4e6a18a..b458d03 100644 --- a/internal/pipeline/decisions.go +++ b/internal/pipeline/decisions.go @@ -24,24 +24,24 @@ func DecideSignalAction(policy Policy, candidate Candidate, cls *classifier.Clas } if classTag != "" && WantsClass(policy.AutoRecordClasses, classTag) { decision.ShouldRecord = true - decision.Reason = "matched auto_record_classes" + decision.Reason = DecisionReasonRecordClass } else if classTag == "" && hintTag != "" && WantsClass(policy.AutoRecordClasses, hintTag) { decision.ShouldRecord = true - decision.Reason = "matched auto_record_classes (hint)" + decision.Reason = DecisionReasonRecordHint } if classTag != "" && WantsClass(policy.AutoDecodeClasses, classTag) { decision.ShouldAutoDecode = true if decision.Reason == "" { - decision.Reason = "matched auto_decode_classes" + decision.Reason = DecisionReasonDecodeClass } } else if classTag == "" && hintTag != "" && WantsClass(policy.AutoDecodeClasses, hintTag) { decision.ShouldAutoDecode = true if decision.Reason == "" { - decision.Reason = "matched auto_decode_classes (hint)" + decision.Reason = DecisionReasonDecodeHint } } if decision.Reason == "" && candidate.Hint != "" { - decision.Reason = "policy evaluated candidate hint" + decision.Reason = DecisionReasonHintOnly } return decision } diff --git a/internal/pipeline/scheduler.go b/internal/pipeline/scheduler.go index 21c0d89..16892db 100644 --- a/internal/pipeline/scheduler.go +++ b/internal/pipeline/scheduler.go @@ -62,16 +62,16 @@ const ( ) const ( - RefinementReasonPlanned = "planned" - RefinementReasonAdmitted = "admitted" - RefinementReasonRunning = "running" - RefinementReasonCompleted = "completed" - RefinementReasonMonitorGate = "dropped:monitor" - RefinementReasonBelowSNR = "dropped:snr" - RefinementReasonBudget = "skipped:budget" - RefinementReasonDisabled = "dropped:disabled" - RefinementReasonUnclassified = "dropped:unclassified" - RefinementReasonDisplaced = "skipped:displaced" + RefinementReasonPlanned = "refinement:planned" + RefinementReasonAdmitted = "refinement:admitted" + RefinementReasonRunning = "refinement:running" + RefinementReasonCompleted = "refinement:completed" + RefinementReasonMonitorGate = "refinement:drop:monitor" + RefinementReasonBelowSNR = "refinement:drop:snr" + RefinementReasonBudget = "refinement:skip:budget" + RefinementReasonDisabled = "refinement:drop:disabled" + RefinementReasonUnclassified = "refinement:drop:unclassified" + RefinementReasonDisplaced = "refinement:skip:displaced" ) // BuildRefinementPlan scores and ranks candidates for costly local refinement. @@ -203,7 +203,14 @@ func ScheduleCandidates(candidates []Candidate, policy Policy) []ScheduledCandid func refinementStrategy(policy Policy) (string, string) { intent := strings.ToLower(strings.TrimSpace(policy.Intent)) + profile := strings.ToLower(strings.TrimSpace(policy.Profile)) switch { + case strings.Contains(profile, "digital"): + return "digital-hunting", "profile" + case strings.Contains(profile, "archive"): + return "archive-oriented", "profile" + case strings.Contains(profile, "aggressive"): + return "multi-resolution", "profile" case strings.Contains(intent, "digital") || strings.Contains(intent, "hunt") || strings.Contains(intent, "decode"): return "digital-hunting", "intent" case strings.Contains(intent, "archive") || strings.Contains(intent, "triage") || strings.Contains(policy.Mode, "archive"): diff --git a/internal/pipeline/scheduler_test.go b/internal/pipeline/scheduler_test.go index 10018bd..b1012b4 100644 --- a/internal/pipeline/scheduler_test.go +++ b/internal/pipeline/scheduler_test.go @@ -226,6 +226,17 @@ func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) { } } +func TestRefinementStrategyUsesProfile(t *testing.T) { + strategy, reason := refinementStrategy(Policy{Profile: "digital-hunting"}) + if strategy != "digital-hunting" || reason != "profile" { + t.Fatalf("expected digital profile to set strategy, got %s (%s)", strategy, reason) + } + strategy, reason = refinementStrategy(Policy{Profile: "archive"}) + if strategy != "archive-oriented" || reason != "profile" { + t.Fatalf("expected archive profile to set strategy, got %s (%s)", strategy, reason) + } +} + func findWorkItem(items []RefinementWorkItem, id int64) *RefinementWorkItem { for i := range items { if items[i].Candidate.ID == id {