diff --git a/internal/pipeline/arbitration.go b/internal/pipeline/arbitration.go index 30feaef..ca1dbf7 100644 --- a/internal/pipeline/arbitration.go +++ b/internal/pipeline/arbitration.go @@ -21,24 +21,26 @@ type RefinementHold struct { } type RefinementAdmission struct { - Budget int `json:"budget"` - BudgetSource string `json:"budget_source,omitempty"` - HoldMs int `json:"hold_ms"` - HoldSource string `json:"hold_source,omitempty"` - Planned int `json:"planned"` - Admitted int `json:"admitted"` - Skipped int `json:"skipped"` - Displaced int `json:"displaced"` - HoldActive int `json:"hold_active"` - HoldSelected int `json:"hold_selected"` - HoldProtected int `json:"hold_protected"` - HoldExpired int `json:"hold_expired"` - HoldDisplaced int `json:"hold_displaced"` - Opportunistic int `json:"opportunistic"` - PriorityCutoff float64 `json:"priority_cutoff,omitempty"` - PriorityTier string `json:"priority_tier,omitempty"` - Reason string `json:"reason,omitempty"` - Pressure BudgetPressure `json:"pressure,omitempty"` + Budget int `json:"budget"` + BudgetSource string `json:"budget_source,omitempty"` + DecisionHoldMs int `json:"decision_hold_ms,omitempty"` + HoldMs int `json:"hold_ms"` + HoldSource string `json:"hold_source,omitempty"` + Planned int `json:"planned"` + Admitted int `json:"admitted"` + Skipped int `json:"skipped"` + Displaced int `json:"displaced"` + DisplacedByHold int `json:"displaced_by_hold,omitempty"` + HoldActive int `json:"hold_active"` + HoldSelected int `json:"hold_selected"` + HoldProtected int `json:"hold_protected"` + HoldExpired int `json:"hold_expired"` + HoldDisplaced int `json:"hold_displaced"` + Opportunistic int `json:"opportunistic"` + PriorityCutoff float64 `json:"priority_cutoff,omitempty"` + PriorityTier string `json:"priority_tier,omitempty"` + Reason string `json:"reason,omitempty"` + Pressure BudgetPressure `json:"pressure,omitempty"` } type RefinementAdmissionResult struct { @@ -123,6 +125,7 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold holdPolicy := HoldPolicyFromPolicy(policy) budgetModel := BudgetModelFromPolicy(policy) + admission.DecisionHoldMs = holdPolicy.BaseMs admission.HoldMs = holdPolicy.RefinementMs admission.HoldSource = "resources.decision_hold_ms" if len(holdPolicy.Reasons) > 0 { @@ -238,6 +241,7 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold } } admission.Displaced = len(displacedByHold) + len(displacedHold) + admission.DisplacedByHold = len(displacedByHold) admission.PriorityTier = PriorityTierFromRange(admission.PriorityCutoff, plan.PriorityMin, plan.PriorityMax) admission.Pressure = buildRefinementPressure(budgetModel, admission) if admission.PriorityCutoff > 0 { diff --git a/internal/pipeline/decision_queue.go b/internal/pipeline/decision_queue.go index 1b78eb3..605a4ec 100644 --- a/internal/pipeline/decision_queue.go +++ b/internal/pipeline/decision_queue.go @@ -6,33 +6,36 @@ import ( ) type DecisionQueueStats struct { - RecordQueued int `json:"record_queued"` - DecodeQueued int `json:"decode_queued"` - RecordSelected int `json:"record_selected"` - DecodeSelected int `json:"decode_selected"` - RecordActive int `json:"record_active"` - DecodeActive int `json:"decode_active"` - RecordOldestS float64 `json:"record_oldest_sec"` - DecodeOldestS float64 `json:"decode_oldest_sec"` - RecordBudget int `json:"record_budget"` - DecodeBudget int `json:"decode_budget"` - HoldMs int `json:"hold_ms"` - RecordHoldMs int `json:"record_hold_ms"` - DecodeHoldMs int `json:"decode_hold_ms"` - RecordDropped int `json:"record_dropped"` - DecodeDropped int `json:"decode_dropped"` - RecordHoldSelected int `json:"record_hold_selected"` - DecodeHoldSelected int `json:"decode_hold_selected"` - RecordHoldProtected int `json:"record_hold_protected"` - DecodeHoldProtected int `json:"decode_hold_protected"` - RecordHoldExpired int `json:"record_hold_expired"` - DecodeHoldExpired int `json:"decode_hold_expired"` - RecordHoldDisplaced int `json:"record_hold_displaced"` - DecodeHoldDisplaced int `json:"decode_hold_displaced"` - RecordOpportunistic int `json:"record_opportunistic"` - DecodeOpportunistic int `json:"decode_opportunistic"` - RecordDisplaced int `json:"record_displaced"` - DecodeDisplaced int `json:"decode_displaced"` + RecordQueued int `json:"record_queued"` + DecodeQueued int `json:"decode_queued"` + RecordSelected int `json:"record_selected"` + DecodeSelected int `json:"decode_selected"` + RecordActive int `json:"record_active"` + DecodeActive int `json:"decode_active"` + RecordOldestS float64 `json:"record_oldest_sec"` + DecodeOldestS float64 `json:"decode_oldest_sec"` + RecordBudget int `json:"record_budget"` + DecodeBudget int `json:"decode_budget"` + HoldMs int `json:"hold_ms"` + DecisionHoldMs int `json:"decision_hold_ms,omitempty"` + RecordHoldMs int `json:"record_hold_ms"` + DecodeHoldMs int `json:"decode_hold_ms"` + RecordDropped int `json:"record_dropped"` + DecodeDropped int `json:"decode_dropped"` + RecordHoldSelected int `json:"record_hold_selected"` + DecodeHoldSelected int `json:"decode_hold_selected"` + RecordHoldProtected int `json:"record_hold_protected"` + DecodeHoldProtected int `json:"decode_hold_protected"` + RecordHoldExpired int `json:"record_hold_expired"` + DecodeHoldExpired int `json:"decode_hold_expired"` + RecordHoldDisplaced int `json:"record_hold_displaced"` + DecodeHoldDisplaced int `json:"decode_hold_displaced"` + RecordOpportunistic int `json:"record_opportunistic"` + DecodeOpportunistic int `json:"decode_opportunistic"` + RecordDisplaced int `json:"record_displaced"` + DecodeDisplaced int `json:"decode_displaced"` + RecordDisplacedByHold int `json:"record_displaced_by_hold,omitempty"` + DecodeDisplacedByHold int `json:"decode_displaced_by_hold,omitempty"` } type queuedDecision struct { @@ -136,31 +139,34 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, decPressureTag := pressureReasonTag(decPressure) stats := DecisionQueueStats{ - RecordQueued: len(dq.record), - DecodeQueued: len(dq.decode), - RecordSelected: len(recSelected.selected), - DecodeSelected: len(decSelected.selected), - RecordActive: len(dq.recordHold), - DecodeActive: len(dq.decodeHold), - RecordOldestS: oldestAge(dq.record, now), - DecodeOldestS: oldestAge(dq.decode, now), - RecordBudget: budget.Record.Max, - DecodeBudget: budget.Decode.Max, - HoldMs: budget.HoldMs, - RecordHoldMs: holdPolicy.RecordMs, - DecodeHoldMs: holdPolicy.DecodeMs, - RecordHoldSelected: len(recSelected.held) - len(recSelected.displaced), - DecodeHoldSelected: len(decSelected.held) - len(decSelected.displaced), - RecordHoldProtected: len(recSelected.protected), - DecodeHoldProtected: len(decSelected.protected), - RecordHoldExpired: len(recExpired), - DecodeHoldExpired: len(decExpired), - RecordHoldDisplaced: len(recSelected.displaced), - DecodeHoldDisplaced: len(decSelected.displaced), - RecordOpportunistic: len(recSelected.opportunistic), - DecodeOpportunistic: len(decSelected.opportunistic), - RecordDisplaced: len(recSelected.displacedByHold), - DecodeDisplaced: len(decSelected.displacedByHold), + RecordQueued: len(dq.record), + DecodeQueued: len(dq.decode), + RecordSelected: len(recSelected.selected), + DecodeSelected: len(decSelected.selected), + RecordActive: len(dq.recordHold), + DecodeActive: len(dq.decodeHold), + RecordOldestS: oldestAge(dq.record, now), + DecodeOldestS: oldestAge(dq.decode, now), + RecordBudget: budget.Record.Max, + DecodeBudget: budget.Decode.Max, + HoldMs: holdPolicy.BaseMs, + DecisionHoldMs: holdPolicy.BaseMs, + RecordHoldMs: holdPolicy.RecordMs, + DecodeHoldMs: holdPolicy.DecodeMs, + RecordHoldSelected: len(recSelected.held) - len(recSelected.displaced), + DecodeHoldSelected: len(decSelected.held) - len(decSelected.displaced), + RecordHoldProtected: len(recSelected.protected), + DecodeHoldProtected: len(decSelected.protected), + RecordHoldExpired: len(recExpired), + DecodeHoldExpired: len(decExpired), + RecordHoldDisplaced: len(recSelected.displaced), + DecodeHoldDisplaced: len(decSelected.displaced), + RecordOpportunistic: len(recSelected.opportunistic), + DecodeOpportunistic: len(decSelected.opportunistic), + RecordDisplaced: len(recSelected.displacedByHold), + DecodeDisplaced: len(decSelected.displacedByHold), + RecordDisplacedByHold: len(recSelected.displacedByHold), + DecodeDisplacedByHold: len(decSelected.displacedByHold), } for i := range decisions { diff --git a/internal/pipeline/decision_queue_test.go b/internal/pipeline/decision_queue_test.go index 7741943..deaa154 100644 --- a/internal/pipeline/decision_queue_test.go +++ b/internal/pipeline/decision_queue_test.go @@ -85,7 +85,7 @@ func TestDecisionQueueHoldKeepsSelection(t *testing.T) { {Candidate: Candidate{ID: 2, SNRDb: 30}, ShouldRecord: true, ShouldAutoDecode: true}, {Candidate: Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: true}, } - arbiter.ApplyDecisions(decisions, budget, now.Add(100*time.Millisecond), policy) + stats := arbiter.ApplyDecisions(decisions, budget, now.Add(100*time.Millisecond), policy) if !decisions[1].ShouldRecord || !decisions[1].ShouldAutoDecode { t.Fatalf("expected held candidate 2 to remain selected") } @@ -95,6 +95,12 @@ func TestDecisionQueueHoldKeepsSelection(t *testing.T) { if decisions[1].RecordAdmission == nil || decisions[1].RecordAdmission.Class != AdmissionClassHold { t.Fatalf("expected record admission hold class, got %+v", decisions[1].RecordAdmission) } + if stats.DecisionHoldMs != policy.DecisionHoldMs { + t.Fatalf("expected decision hold ms %d, got %d", policy.DecisionHoldMs, stats.DecisionHoldMs) + } + if stats.RecordDisplacedByHold != 1 || stats.RecordDisplaced != 1 { + t.Fatalf("expected displaced-by-hold count 1, got %+v", stats) + } } func TestDecisionQueueHighTierHoldProtected(t *testing.T) { diff --git a/internal/pipeline/scheduler_test.go b/internal/pipeline/scheduler_test.go index e2a15f0..88e8d41 100644 --- a/internal/pipeline/scheduler_test.go +++ b/internal/pipeline/scheduler_test.go @@ -301,7 +301,7 @@ func TestAdmitRefinementPlanAppliesBudget(t *testing.T) { } func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) { - policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0} + policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0, DecisionHoldMs: 500} cands := []Candidate{ {ID: 1, CenterHz: 100, SNRDb: 9}, {ID: 2, CenterHz: 200, SNRDb: 12}, @@ -320,6 +320,12 @@ func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) { if item2.Admission == nil || item2.Admission.Class != AdmissionClassDisplace { t.Fatalf("expected displaced admission class, got %+v", item2.Admission) } + if res.Admission.DisplacedByHold != 1 || res.Admission.Displaced != 1 { + t.Fatalf("expected displaced-by-hold count 1, got %+v", res.Admission) + } + if res.Admission.DecisionHoldMs != policy.DecisionHoldMs { + t.Fatalf("expected decision hold ms %d, got %d", policy.DecisionHoldMs, res.Admission.DecisionHoldMs) + } } func TestAdmitRefinementPlanOpportunisticDisplacement(t *testing.T) {