| @@ -2,12 +2,12 @@ package main | |||||
| import "sdr-wideband-suite/internal/pipeline" | import "sdr-wideband-suite/internal/pipeline" | ||||
| func buildArbitrationSnapshot(step pipeline.RefinementStep, arb arbitrationState) *ArbitrationSnapshot { | |||||
| func buildArbitrationSnapshot(step pipeline.RefinementStep, arb pipeline.ArbitrationState) *ArbitrationSnapshot { | |||||
| return &ArbitrationSnapshot{ | return &ArbitrationSnapshot{ | ||||
| Budgets: &arb.Budgets, | Budgets: &arb.Budgets, | ||||
| HoldPolicy: &arb.HoldPolicy, | HoldPolicy: &arb.HoldPolicy, | ||||
| RefinementPlan: &step.Input.Plan, | RefinementPlan: &step.Input.Plan, | ||||
| RefinementAdmission: &step.Input.Admission, | |||||
| RefinementAdmission: &arb.Refinement, | |||||
| Queue: arb.Queue, | Queue: arb.Queue, | ||||
| DecisionSummary: summarizeDecisions(step.Result.Decisions), | DecisionSummary: summarizeDecisions(step.Result.Decisions), | ||||
| DecisionItems: compactDecisions(step.Result.Decisions), | DecisionItems: compactDecisions(step.Result.Decisions), | ||||
| @@ -1,36 +0,0 @@ | |||||
| package main | |||||
| import ( | |||||
| "time" | |||||
| "sdr-wideband-suite/internal/pipeline" | |||||
| ) | |||||
| type arbitrator struct { | |||||
| refinementHold *pipeline.RefinementHold | |||||
| queues *decisionQueues | |||||
| } | |||||
| func newArbitrator() *arbitrator { | |||||
| return &arbitrator{ | |||||
| refinementHold: &pipeline.RefinementHold{Active: map[int64]time.Time{}}, | |||||
| queues: newDecisionQueues(), | |||||
| } | |||||
| } | |||||
| func (a *arbitrator) AdmitRefinement(plan pipeline.RefinementPlan, policy pipeline.Policy, now time.Time) pipeline.RefinementAdmissionResult { | |||||
| if a == nil { | |||||
| return pipeline.AdmitRefinementPlan(plan, policy, now, nil) | |||||
| } | |||||
| if a.refinementHold == nil { | |||||
| a.refinementHold = &pipeline.RefinementHold{Active: map[int64]time.Time{}} | |||||
| } | |||||
| return pipeline.AdmitRefinementPlan(plan, policy, now, a.refinementHold) | |||||
| } | |||||
| func (a *arbitrator) ApplyDecisions(decisions []pipeline.SignalDecision, budget pipeline.BudgetModel, now time.Time, policy pipeline.Policy) decisionQueueStats { | |||||
| if a == nil || a.queues == nil { | |||||
| return decisionQueueStats{} | |||||
| } | |||||
| return a.queues.Apply(decisions, budget, now, policy) | |||||
| } | |||||
| @@ -1,32 +0,0 @@ | |||||
| package main | |||||
| import ( | |||||
| "testing" | |||||
| "time" | |||||
| "sdr-wideband-suite/internal/pipeline" | |||||
| ) | |||||
| func TestEnforceDecisionBudgets(t *testing.T) { | |||||
| decisions := []pipeline.SignalDecision{ | |||||
| {Candidate: pipeline.Candidate{ID: 1, SNRDb: 5}, ShouldRecord: true, ShouldAutoDecode: true}, | |||||
| {Candidate: pipeline.Candidate{ID: 2, SNRDb: 15}, ShouldRecord: true, ShouldAutoDecode: true}, | |||||
| {Candidate: pipeline.Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false}, | |||||
| } | |||||
| q := newDecisionQueues() | |||||
| policy := pipeline.Policy{SignalPriorities: []string{"digital"}, MaxRecordingStreams: 1, MaxDecodeJobs: 1} | |||||
| budget := pipeline.BudgetModelFromPolicy(policy) | |||||
| stats := q.Apply(decisions, budget, time.Now(), policy) | |||||
| if stats.RecordSelected != 1 || stats.DecodeSelected != 1 { | |||||
| t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected) | |||||
| } | |||||
| if !decisions[1].ShouldRecord || !decisions[1].ShouldAutoDecode { | |||||
| t.Fatalf("expected highest SNR decision to remain allowed") | |||||
| } | |||||
| if decisions[0].ShouldRecord || decisions[0].ShouldAutoDecode { | |||||
| t.Fatalf("expected lowest SNR decision to be budgeted off") | |||||
| } | |||||
| if decisions[2].ShouldRecord { | |||||
| t.Fatalf("expected mid SNR decision to be budgeted off by record budget") | |||||
| } | |||||
| } | |||||
| @@ -21,7 +21,7 @@ func summarizeDecisions(decisions []pipeline.SignalDecision) decisionSummary { | |||||
| } | } | ||||
| reason := d.Reason | reason := d.Reason | ||||
| if reason == "" { | if reason == "" { | ||||
| reason = "unspecified" | |||||
| reason = pipeline.DecisionReasonUnspecified | |||||
| } | } | ||||
| summary.Reasons[reason]++ | summary.Reasons[reason]++ | ||||
| } | } | ||||
| @@ -5,6 +5,6 @@ import "sdr-wideband-suite/internal/pipeline" | |||||
| type phaseState struct { | type phaseState struct { | ||||
| surveillance pipeline.SurveillanceResult | surveillance pipeline.SurveillanceResult | ||||
| refinement pipeline.RefinementStep | refinement pipeline.RefinementStep | ||||
| arbitration arbitrationState | |||||
| arbitration pipeline.ArbitrationState | |||||
| presentation pipeline.AnalysisLevel | presentation pipeline.AnalysisLevel | ||||
| } | } | ||||
| @@ -13,7 +13,7 @@ func TestPhaseStateCarriesPhaseResults(t *testing.T) { | |||||
| Input: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6}, | Input: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6}, | ||||
| Result: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}}, | Result: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}}, | ||||
| }, | }, | ||||
| arbitration: arbitrationState{Queue: decisionQueueStats{RecordQueued: 1}}, | |||||
| arbitration: pipeline.ArbitrationState{Queue: pipeline.DecisionQueueStats{RecordQueued: 1}}, | |||||
| presentation: pipeline.AnalysisLevel{Name: "presentation"}, | presentation: pipeline.AnalysisLevel{Name: "presentation"}, | ||||
| } | } | ||||
| if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 { | if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 { | ||||
| @@ -42,8 +42,8 @@ type dspRuntime struct { | |||||
| rdsMap map[int64]*rdsState | rdsMap map[int64]*rdsState | ||||
| streamPhaseState map[int64]*streamExtractState | streamPhaseState map[int64]*streamExtractState | ||||
| streamOverlap *streamIQOverlap | streamOverlap *streamIQOverlap | ||||
| arbiter *arbitrator | |||||
| arbitration arbitrationState | |||||
| arbiter *pipeline.Arbiter | |||||
| arbitration pipeline.ArbitrationState | |||||
| gotSamples bool | gotSamples bool | ||||
| } | } | ||||
| @@ -79,7 +79,7 @@ func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, | |||||
| rdsMap: map[int64]*rdsState{}, | rdsMap: map[int64]*rdsState{}, | ||||
| streamPhaseState: map[int64]*streamExtractState{}, | streamPhaseState: map[int64]*streamExtractState{}, | ||||
| streamOverlap: &streamIQOverlap{}, | streamOverlap: &streamIQOverlap{}, | ||||
| arbiter: newArbitrator(), | |||||
| arbiter: pipeline.NewArbiter(), | |||||
| } | } | ||||
| if rt.useGPU && gpuState != nil { | if rt.useGPU && gpuState != nil { | ||||
| snap := gpuState.snapshot() | snap := gpuState.snapshot() | ||||
| @@ -426,17 +426,15 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now | |||||
| item.Reason = pipeline.RefinementReasonDisabled | item.Reason = pipeline.RefinementReasonDisabled | ||||
| } | } | ||||
| input.Scheduled = nil | input.Scheduled = nil | ||||
| input.Request.Reason = pipeline.RefinementReasonDisabled | |||||
| input.Admission.Reason = pipeline.RefinementReasonDisabled | |||||
| input.Request.Reason = pipeline.ReasonAdmissionDisabled | |||||
| input.Admission.Reason = pipeline.ReasonAdmissionDisabled | |||||
| input.Admission.Admitted = 0 | input.Admission.Admitted = 0 | ||||
| input.Admission.Skipped = 0 | input.Admission.Skipped = 0 | ||||
| input.Admission.Displaced = 0 | input.Admission.Displaced = 0 | ||||
| input.Plan.Selected = nil | input.Plan.Selected = nil | ||||
| input.Plan.DroppedByBudget = 0 | input.Plan.DroppedByBudget = 0 | ||||
| } | } | ||||
| rt.arbitration.Budgets = input.Budgets | |||||
| rt.arbitration.Refinement = input.Admission | |||||
| rt.arbitration.HoldPolicy = pipeline.HoldPolicyFromPolicy(policy) | |||||
| rt.setArbitration(policy, input.Budgets, input.Admission, rt.arbitration.Queue) | |||||
| return input | return input | ||||
| } | } | ||||
| @@ -508,9 +506,7 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin | |||||
| } | } | ||||
| budget := pipeline.BudgetModelFromPolicy(policy) | budget := pipeline.BudgetModelFromPolicy(policy) | ||||
| queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy) | queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy) | ||||
| rt.arbitration.Budgets = budget | |||||
| rt.arbitration.HoldPolicy = pipeline.HoldPolicyFromPolicy(policy) | |||||
| rt.arbitration.Queue = queueStats | |||||
| rt.setArbitration(policy, budget, input.Admission, queueStats) | |||||
| summary := summarizeDecisions(decisions) | summary := summarizeDecisions(decisions) | ||||
| if rec != nil { | if rec != nil { | ||||
| if summary.RecordEnabled > 0 { | if summary.RecordEnabled > 0 { | ||||
| @@ -705,3 +701,7 @@ func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pi | |||||
| items[i].Reason = pipeline.RefinementReasonCompleted | items[i].Reason = pipeline.RefinementReasonCompleted | ||||
| } | } | ||||
| } | } | ||||
| func (rt *dspRuntime) setArbitration(policy pipeline.Policy, budget pipeline.BudgetModel, admission pipeline.RefinementAdmission, queue pipeline.DecisionQueueStats) { | |||||
| rt.arbitration = pipeline.BuildArbitrationState(policy, budget, admission, queue) | |||||
| } | |||||
| @@ -45,20 +45,13 @@ type DecisionDebug struct { | |||||
| } | } | ||||
| type ArbitrationSnapshot struct { | type ArbitrationSnapshot struct { | ||||
| Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` | |||||
| HoldPolicy *pipeline.HoldPolicy `json:"hold_policy,omitempty"` | |||||
| RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` | |||||
| Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` | |||||
| HoldPolicy *pipeline.HoldPolicy `json:"hold_policy,omitempty"` | |||||
| RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` | |||||
| RefinementAdmission *pipeline.RefinementAdmission `json:"refinement_admission,omitempty"` | RefinementAdmission *pipeline.RefinementAdmission `json:"refinement_admission,omitempty"` | ||||
| Queue decisionQueueStats `json:"queue,omitempty"` | |||||
| DecisionSummary decisionSummary `json:"decision_summary,omitempty"` | |||||
| DecisionItems []compactDecision `json:"decision_items,omitempty"` | |||||
| } | |||||
| type arbitrationState struct { | |||||
| Budgets pipeline.BudgetModel | |||||
| HoldPolicy pipeline.HoldPolicy | |||||
| Refinement pipeline.RefinementAdmission | |||||
| Queue decisionQueueStats | |||||
| Queue pipeline.DecisionQueueStats `json:"queue,omitempty"` | |||||
| DecisionSummary decisionSummary `json:"decision_summary,omitempty"` | |||||
| DecisionItems []compactDecision `json:"decision_items,omitempty"` | |||||
| } | } | ||||
| type SpectrumFrame struct { | type SpectrumFrame struct { | ||||
| @@ -0,0 +1,32 @@ | |||||
| package pipeline | |||||
| import "time" | |||||
| type Arbiter struct { | |||||
| refinementHold *RefinementHold | |||||
| queues *decisionQueues | |||||
| } | |||||
| func NewArbiter() *Arbiter { | |||||
| return &Arbiter{ | |||||
| refinementHold: &RefinementHold{Active: map[int64]time.Time{}}, | |||||
| queues: newDecisionQueues(), | |||||
| } | |||||
| } | |||||
| func (a *Arbiter) AdmitRefinement(plan RefinementPlan, policy Policy, now time.Time) RefinementAdmissionResult { | |||||
| if a == nil { | |||||
| return AdmitRefinementPlan(plan, policy, now, nil) | |||||
| } | |||||
| if a.refinementHold == nil { | |||||
| a.refinementHold = &RefinementHold{Active: map[int64]time.Time{}} | |||||
| } | |||||
| return AdmitRefinementPlan(plan, policy, now, a.refinementHold) | |||||
| } | |||||
| func (a *Arbiter) ApplyDecisions(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats { | |||||
| if a == nil || a.queues == nil { | |||||
| return DecisionQueueStats{} | |||||
| } | |||||
| return a.queues.Apply(decisions, budget, now, policy) | |||||
| } | |||||
| @@ -52,25 +52,39 @@ func HoldPolicyFromPolicy(policy Policy) HoldPolicy { | |||||
| profile := strings.ToLower(strings.TrimSpace(policy.Profile)) | profile := strings.ToLower(strings.TrimSpace(policy.Profile)) | ||||
| strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) | strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) | ||||
| if profileContains(profile, "archive") || strategyContains(strategy, "archive") { | |||||
| archiveProfile := profileContains(profile, "archive") | |||||
| archiveStrategy := strategyContains(strategy, "archive") | |||||
| if archiveProfile || archiveStrategy { | |||||
| recMult *= 1.5 | recMult *= 1.5 | ||||
| decMult *= 1.1 | decMult *= 1.1 | ||||
| refMult *= 1.2 | refMult *= 1.2 | ||||
| reasons = append(reasons, "archive") | |||||
| if archiveProfile { | |||||
| reasons = append(reasons, HoldReasonProfileArchive) | |||||
| } | |||||
| if archiveStrategy { | |||||
| reasons = append(reasons, HoldReasonStrategyArchive) | |||||
| } | |||||
| } | } | ||||
| if profileContains(profile, "digital") || strategyContains(strategy, "digital") { | |||||
| digitalProfile := profileContains(profile, "digital") | |||||
| digitalStrategy := strategyContains(strategy, "digital") | |||||
| if digitalProfile || digitalStrategy { | |||||
| decMult *= 1.6 | decMult *= 1.6 | ||||
| recMult *= 0.85 | recMult *= 0.85 | ||||
| refMult *= 1.1 | refMult *= 1.1 | ||||
| reasons = append(reasons, "digital") | |||||
| if digitalProfile { | |||||
| reasons = append(reasons, HoldReasonProfileDigital) | |||||
| } | |||||
| if digitalStrategy { | |||||
| reasons = append(reasons, HoldReasonStrategyDigital) | |||||
| } | |||||
| } | } | ||||
| if profileContains(profile, "aggressive") { | if profileContains(profile, "aggressive") { | ||||
| refMult *= 1.15 | refMult *= 1.15 | ||||
| reasons = append(reasons, "aggressive") | |||||
| reasons = append(reasons, HoldReasonProfileAggressive) | |||||
| } | } | ||||
| if strategyContains(strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy)), "multi") { | if strategyContains(strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy)), "multi") { | ||||
| refMult *= 1.1 | refMult *= 1.1 | ||||
| reasons = append(reasons, "multi-resolution") | |||||
| reasons = append(reasons, HoldReasonStrategyMultiRes) | |||||
| } | } | ||||
| return HoldPolicy{ | return HoldPolicy{ | ||||
| @@ -95,7 +109,7 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold | |||||
| BudgetSource: plan.BudgetSource, | BudgetSource: plan.BudgetSource, | ||||
| } | } | ||||
| if len(ranked) == 0 { | if len(ranked) == 0 { | ||||
| admission.Reason = "no-candidates" | |||||
| admission.Reason = ReasonAdmissionNoCandidates | |||||
| return RefinementAdmissionResult{Plan: plan, WorkItems: workItems, Admission: admission} | return RefinementAdmissionResult{Plan: plan, WorkItems: workItems, Admission: admission} | ||||
| } | } | ||||
| @@ -0,0 +1,26 @@ | |||||
| package pipeline | |||||
| const ( | |||||
| ReasonAdmissionNoCandidates = "admission:none:candidates" | |||||
| ReasonAdmissionDisabled = "admission:disabled" | |||||
| ) | |||||
| const ( | |||||
| DecisionReasonRecordClass = "decision:record:class" | |||||
| DecisionReasonRecordHint = "decision:record:hint" | |||||
| DecisionReasonDecodeClass = "decision:decode:class" | |||||
| DecisionReasonDecodeHint = "decision:decode:hint" | |||||
| DecisionReasonHintOnly = "decision:hint" | |||||
| DecisionReasonQueueRecord = "queue:record:budget" | |||||
| DecisionReasonQueueDecode = "queue:decode:budget" | |||||
| DecisionReasonUnspecified = "decision:unspecified" | |||||
| ) | |||||
| const ( | |||||
| HoldReasonProfileArchive = "profile:archive" | |||||
| HoldReasonProfileDigital = "profile:digital" | |||||
| HoldReasonProfileAggressive = "profile:aggressive" | |||||
| HoldReasonStrategyArchive = "strategy:archive" | |||||
| HoldReasonStrategyDigital = "strategy:digital" | |||||
| HoldReasonStrategyMultiRes = "strategy:multi-resolution" | |||||
| ) | |||||
| @@ -0,0 +1,17 @@ | |||||
| package pipeline | |||||
| type ArbitrationState struct { | |||||
| Budgets BudgetModel `json:"budgets,omitempty"` | |||||
| HoldPolicy HoldPolicy `json:"hold_policy,omitempty"` | |||||
| Refinement RefinementAdmission `json:"refinement,omitempty"` | |||||
| Queue DecisionQueueStats `json:"queue,omitempty"` | |||||
| } | |||||
| func BuildArbitrationState(policy Policy, budget BudgetModel, admission RefinementAdmission, queue DecisionQueueStats) ArbitrationState { | |||||
| return ArbitrationState{ | |||||
| Budgets: budget, | |||||
| HoldPolicy: HoldPolicyFromPolicy(policy), | |||||
| Refinement: admission, | |||||
| Queue: queue, | |||||
| } | |||||
| } | |||||
| @@ -1,6 +1,9 @@ | |||||
| package pipeline | package pipeline | ||||
| import "testing" | |||||
| import ( | |||||
| "testing" | |||||
| "time" | |||||
| ) | |||||
| func TestHoldPolicyArchiveBiasesRecord(t *testing.T) { | func TestHoldPolicyArchiveBiasesRecord(t *testing.T) { | ||||
| policy := Policy{DecisionHoldMs: 1000, Profile: "archive", RefinementStrategy: "archive-oriented"} | policy := Policy{DecisionHoldMs: 1000, Profile: "archive", RefinementStrategy: "archive-oriented"} | ||||
| @@ -11,6 +14,9 @@ func TestHoldPolicyArchiveBiasesRecord(t *testing.T) { | |||||
| if hold.RefinementMs <= hold.BaseMs { | if hold.RefinementMs <= hold.BaseMs { | ||||
| t.Fatalf("expected archive profile to extend refinement hold, got %d vs %d", hold.RefinementMs, hold.BaseMs) | t.Fatalf("expected archive profile to extend refinement hold, got %d vs %d", hold.RefinementMs, hold.BaseMs) | ||||
| } | } | ||||
| if !containsReason(hold.Reasons, HoldReasonProfileArchive) { | |||||
| t.Fatalf("expected profile archive reason, got %+v", hold.Reasons) | |||||
| } | |||||
| } | } | ||||
| func TestHoldPolicyDigitalBiasesDecode(t *testing.T) { | func TestHoldPolicyDigitalBiasesDecode(t *testing.T) { | ||||
| @@ -19,4 +25,23 @@ func TestHoldPolicyDigitalBiasesDecode(t *testing.T) { | |||||
| if hold.DecodeMs <= hold.RecordMs { | if hold.DecodeMs <= hold.RecordMs { | ||||
| t.Fatalf("expected digital profile to favor decode hold, got decode=%d record=%d", hold.DecodeMs, hold.RecordMs) | t.Fatalf("expected digital profile to favor decode hold, got decode=%d record=%d", hold.DecodeMs, hold.RecordMs) | ||||
| } | } | ||||
| if !containsReason(hold.Reasons, HoldReasonProfileDigital) { | |||||
| t.Fatalf("expected profile digital reason, got %+v", hold.Reasons) | |||||
| } | |||||
| } | |||||
| func TestAdmitRefinementPlanNoCandidatesReason(t *testing.T) { | |||||
| res := AdmitRefinementPlan(RefinementPlan{}, Policy{}, time.Now(), &RefinementHold{Active: map[int64]time.Time{}}) | |||||
| if res.Admission.Reason != ReasonAdmissionNoCandidates { | |||||
| t.Fatalf("expected no-candidates reason, got %s", res.Admission.Reason) | |||||
| } | |||||
| } | |||||
| func containsReason(reasons []string, target string) bool { | |||||
| for _, r := range reasons { | |||||
| if r == target { | |||||
| return true | |||||
| } | |||||
| } | |||||
| return false | |||||
| } | } | ||||
| @@ -1,13 +1,11 @@ | |||||
| package main | |||||
| package pipeline | |||||
| import ( | import ( | ||||
| "sort" | "sort" | ||||
| "time" | "time" | ||||
| "sdr-wideband-suite/internal/pipeline" | |||||
| ) | ) | ||||
| type decisionQueueStats struct { | |||||
| type DecisionQueueStats struct { | |||||
| RecordQueued int `json:"record_queued"` | RecordQueued int `json:"record_queued"` | ||||
| DecodeQueued int `json:"decode_queued"` | DecodeQueued int `json:"decode_queued"` | ||||
| RecordSelected int `json:"record_selected"` | RecordSelected int `json:"record_selected"` | ||||
| @@ -50,11 +48,11 @@ func newDecisionQueues() *decisionQueues { | |||||
| } | } | ||||
| } | } | ||||
| func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipeline.BudgetModel, now time.Time, policy pipeline.Policy) decisionQueueStats { | |||||
| func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats { | |||||
| if dq == nil { | if dq == nil { | ||||
| return decisionQueueStats{} | |||||
| return DecisionQueueStats{} | |||||
| } | } | ||||
| holdPolicy := pipeline.HoldPolicyFromPolicy(policy) | |||||
| holdPolicy := HoldPolicyFromPolicy(policy) | |||||
| recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond | recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond | ||||
| decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond | decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond | ||||
| recSeen := map[int64]bool{} | recSeen := map[int64]bool{} | ||||
| @@ -106,7 +104,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe | |||||
| recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy) | recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy) | ||||
| decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy) | decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy) | ||||
| stats := decisionQueueStats{ | |||||
| stats := DecisionQueueStats{ | |||||
| RecordQueued: len(dq.record), | RecordQueued: len(dq.record), | ||||
| DecodeQueued: len(dq.decode), | DecodeQueued: len(dq.decode), | ||||
| RecordSelected: len(recSelected), | RecordSelected: len(recSelected), | ||||
| @@ -127,7 +125,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe | |||||
| if decisions[i].ShouldRecord { | if decisions[i].ShouldRecord { | ||||
| if _, ok := recSelected[id]; !ok { | if _, ok := recSelected[id]; !ok { | ||||
| decisions[i].ShouldRecord = false | decisions[i].ShouldRecord = false | ||||
| decisions[i].Reason = "queued: record budget" | |||||
| decisions[i].Reason = DecisionReasonQueueRecord | |||||
| stats.RecordDropped++ | stats.RecordDropped++ | ||||
| } | } | ||||
| } | } | ||||
| @@ -135,7 +133,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe | |||||
| if _, ok := decSelected[id]; !ok { | if _, ok := decSelected[id]; !ok { | ||||
| decisions[i].ShouldAutoDecode = false | decisions[i].ShouldAutoDecode = false | ||||
| if decisions[i].Reason == "" { | if decisions[i].Reason == "" { | ||||
| decisions[i].Reason = "queued: decode budget" | |||||
| decisions[i].Reason = DecisionReasonQueueDecode | |||||
| } | } | ||||
| stats.DecodeDropped++ | stats.DecodeDropped++ | ||||
| } | } | ||||
| @@ -144,7 +142,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe | |||||
| return stats | return stats | ||||
| } | } | ||||
| func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy pipeline.Policy) map[int64]struct{} { | |||||
| func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy) map[int64]struct{} { | |||||
| selected := map[int64]struct{}{} | selected := map[int64]struct{}{} | ||||
| if len(queue) == 0 { | if len(queue) == 0 { | ||||
| return selected | return selected | ||||
| @@ -164,7 +162,7 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in | |||||
| if hint == "" { | if hint == "" { | ||||
| hint = qd.Class | hint = qd.Class | ||||
| } | } | ||||
| policyBoost := pipeline.DecisionPriorityBoost(policy, hint, qd.Class, queueName) | |||||
| policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName) | |||||
| scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost}) | scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost}) | ||||
| } | } | ||||
| sort.Slice(scoredList, func(i, j int) bool { | sort.Slice(scoredList, func(i, j int) bool { | ||||
| @@ -0,0 +1,59 @@ | |||||
| package pipeline | |||||
| import ( | |||||
| "testing" | |||||
| "time" | |||||
| ) | |||||
| func TestDecisionQueueDropsByBudget(t *testing.T) { | |||||
| arbiter := NewArbiter() | |||||
| decisions := []SignalDecision{ | |||||
| {Candidate: Candidate{ID: 1, SNRDb: 12}, ShouldRecord: true, ShouldAutoDecode: true}, | |||||
| {Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: true}, | |||||
| } | |||||
| budget := BudgetModel{ | |||||
| Record: BudgetQueue{Max: 1}, | |||||
| Decode: BudgetQueue{Max: 1}, | |||||
| } | |||||
| stats := arbiter.ApplyDecisions(decisions, budget, time.Now(), Policy{DecisionHoldMs: 250}) | |||||
| if stats.RecordDropped == 0 || stats.DecodeDropped == 0 { | |||||
| t.Fatalf("expected drops by budget, got %+v", stats) | |||||
| } | |||||
| allowed := 0 | |||||
| for _, d := range decisions { | |||||
| if d.ShouldRecord || d.ShouldAutoDecode { | |||||
| allowed++ | |||||
| continue | |||||
| } | |||||
| if d.Reason != DecisionReasonQueueRecord && d.Reason != DecisionReasonQueueDecode { | |||||
| t.Fatalf("unexpected decision reason: %s", d.Reason) | |||||
| } | |||||
| } | |||||
| if allowed != 1 { | |||||
| t.Fatalf("expected 1 decision allowed, got %d", allowed) | |||||
| } | |||||
| } | |||||
| func TestDecisionQueueEnforcesBudgets(t *testing.T) { | |||||
| decisions := []SignalDecision{ | |||||
| {Candidate: Candidate{ID: 1, SNRDb: 5}, ShouldRecord: true, ShouldAutoDecode: true}, | |||||
| {Candidate: Candidate{ID: 2, SNRDb: 15}, ShouldRecord: true, ShouldAutoDecode: true}, | |||||
| {Candidate: Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false}, | |||||
| } | |||||
| arbiter := NewArbiter() | |||||
| policy := Policy{SignalPriorities: []string{"digital"}, MaxRecordingStreams: 1, MaxDecodeJobs: 1} | |||||
| budget := BudgetModelFromPolicy(policy) | |||||
| stats := arbiter.ApplyDecisions(decisions, budget, time.Now(), policy) | |||||
| if stats.RecordSelected != 1 || stats.DecodeSelected != 1 { | |||||
| t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected) | |||||
| } | |||||
| if !decisions[1].ShouldRecord || !decisions[1].ShouldAutoDecode { | |||||
| t.Fatalf("expected highest SNR decision to remain allowed") | |||||
| } | |||||
| if decisions[0].ShouldRecord || decisions[0].ShouldAutoDecode { | |||||
| t.Fatalf("expected lowest SNR decision to be budgeted off") | |||||
| } | |||||
| if decisions[2].ShouldRecord { | |||||
| t.Fatalf("expected mid SNR decision to be budgeted off by record budget") | |||||
| } | |||||
| } | |||||
| @@ -24,24 +24,24 @@ func DecideSignalAction(policy Policy, candidate Candidate, cls *classifier.Clas | |||||
| } | } | ||||
| if classTag != "" && WantsClass(policy.AutoRecordClasses, classTag) { | if classTag != "" && WantsClass(policy.AutoRecordClasses, classTag) { | ||||
| decision.ShouldRecord = true | decision.ShouldRecord = true | ||||
| decision.Reason = "matched auto_record_classes" | |||||
| decision.Reason = DecisionReasonRecordClass | |||||
| } else if classTag == "" && hintTag != "" && WantsClass(policy.AutoRecordClasses, hintTag) { | } else if classTag == "" && hintTag != "" && WantsClass(policy.AutoRecordClasses, hintTag) { | ||||
| decision.ShouldRecord = true | decision.ShouldRecord = true | ||||
| decision.Reason = "matched auto_record_classes (hint)" | |||||
| decision.Reason = DecisionReasonRecordHint | |||||
| } | } | ||||
| if classTag != "" && WantsClass(policy.AutoDecodeClasses, classTag) { | if classTag != "" && WantsClass(policy.AutoDecodeClasses, classTag) { | ||||
| decision.ShouldAutoDecode = true | decision.ShouldAutoDecode = true | ||||
| if decision.Reason == "" { | if decision.Reason == "" { | ||||
| decision.Reason = "matched auto_decode_classes" | |||||
| decision.Reason = DecisionReasonDecodeClass | |||||
| } | } | ||||
| } else if classTag == "" && hintTag != "" && WantsClass(policy.AutoDecodeClasses, hintTag) { | } else if classTag == "" && hintTag != "" && WantsClass(policy.AutoDecodeClasses, hintTag) { | ||||
| decision.ShouldAutoDecode = true | decision.ShouldAutoDecode = true | ||||
| if decision.Reason == "" { | if decision.Reason == "" { | ||||
| decision.Reason = "matched auto_decode_classes (hint)" | |||||
| decision.Reason = DecisionReasonDecodeHint | |||||
| } | } | ||||
| } | } | ||||
| if decision.Reason == "" && candidate.Hint != "" { | if decision.Reason == "" && candidate.Hint != "" { | ||||
| decision.Reason = "policy evaluated candidate hint" | |||||
| decision.Reason = DecisionReasonHintOnly | |||||
| } | } | ||||
| return decision | return decision | ||||
| } | } | ||||
| @@ -62,16 +62,16 @@ const ( | |||||
| ) | ) | ||||
| const ( | const ( | ||||
| RefinementReasonPlanned = "planned" | |||||
| RefinementReasonAdmitted = "admitted" | |||||
| RefinementReasonRunning = "running" | |||||
| RefinementReasonCompleted = "completed" | |||||
| RefinementReasonMonitorGate = "dropped:monitor" | |||||
| RefinementReasonBelowSNR = "dropped:snr" | |||||
| RefinementReasonBudget = "skipped:budget" | |||||
| RefinementReasonDisabled = "dropped:disabled" | |||||
| RefinementReasonUnclassified = "dropped:unclassified" | |||||
| RefinementReasonDisplaced = "skipped:displaced" | |||||
| RefinementReasonPlanned = "refinement:planned" | |||||
| RefinementReasonAdmitted = "refinement:admitted" | |||||
| RefinementReasonRunning = "refinement:running" | |||||
| RefinementReasonCompleted = "refinement:completed" | |||||
| RefinementReasonMonitorGate = "refinement:drop:monitor" | |||||
| RefinementReasonBelowSNR = "refinement:drop:snr" | |||||
| RefinementReasonBudget = "refinement:skip:budget" | |||||
| RefinementReasonDisabled = "refinement:drop:disabled" | |||||
| RefinementReasonUnclassified = "refinement:drop:unclassified" | |||||
| RefinementReasonDisplaced = "refinement:skip:displaced" | |||||
| ) | ) | ||||
| // BuildRefinementPlan scores and ranks candidates for costly local refinement. | // BuildRefinementPlan scores and ranks candidates for costly local refinement. | ||||
| @@ -203,7 +203,14 @@ func ScheduleCandidates(candidates []Candidate, policy Policy) []ScheduledCandid | |||||
| func refinementStrategy(policy Policy) (string, string) { | func refinementStrategy(policy Policy) (string, string) { | ||||
| intent := strings.ToLower(strings.TrimSpace(policy.Intent)) | intent := strings.ToLower(strings.TrimSpace(policy.Intent)) | ||||
| profile := strings.ToLower(strings.TrimSpace(policy.Profile)) | |||||
| switch { | switch { | ||||
| case strings.Contains(profile, "digital"): | |||||
| return "digital-hunting", "profile" | |||||
| case strings.Contains(profile, "archive"): | |||||
| return "archive-oriented", "profile" | |||||
| case strings.Contains(profile, "aggressive"): | |||||
| return "multi-resolution", "profile" | |||||
| case strings.Contains(intent, "digital") || strings.Contains(intent, "hunt") || strings.Contains(intent, "decode"): | case strings.Contains(intent, "digital") || strings.Contains(intent, "hunt") || strings.Contains(intent, "decode"): | ||||
| return "digital-hunting", "intent" | return "digital-hunting", "intent" | ||||
| case strings.Contains(intent, "archive") || strings.Contains(intent, "triage") || strings.Contains(policy.Mode, "archive"): | case strings.Contains(intent, "archive") || strings.Contains(intent, "triage") || strings.Contains(policy.Mode, "archive"): | ||||
| @@ -226,6 +226,17 @@ func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) { | |||||
| } | } | ||||
| } | } | ||||
| func TestRefinementStrategyUsesProfile(t *testing.T) { | |||||
| strategy, reason := refinementStrategy(Policy{Profile: "digital-hunting"}) | |||||
| if strategy != "digital-hunting" || reason != "profile" { | |||||
| t.Fatalf("expected digital profile to set strategy, got %s (%s)", strategy, reason) | |||||
| } | |||||
| strategy, reason = refinementStrategy(Policy{Profile: "archive"}) | |||||
| if strategy != "archive-oriented" || reason != "profile" { | |||||
| t.Fatalf("expected archive profile to set strategy, got %s (%s)", strategy, reason) | |||||
| } | |||||
| } | |||||
| func findWorkItem(items []RefinementWorkItem, id int64) *RefinementWorkItem { | func findWorkItem(items []RefinementWorkItem, id int64) *RefinementWorkItem { | ||||
| for i := range items { | for i := range items { | ||||
| if items[i].Candidate.ID == id { | if items[i].Candidate.ID == id { | ||||