diff --git a/internal/pipeline/arbitration.go b/internal/pipeline/arbitration.go index 3a33910..30feaef 100644 --- a/internal/pipeline/arbitration.go +++ b/internal/pipeline/arbitration.go @@ -29,6 +29,12 @@ type RefinementAdmission struct { Admitted int `json:"admitted"` Skipped int `json:"skipped"` Displaced int `json:"displaced"` + HoldActive int `json:"hold_active"` + HoldSelected int `json:"hold_selected"` + HoldProtected int `json:"hold_protected"` + HoldExpired int `json:"hold_expired"` + HoldDisplaced int `json:"hold_displaced"` + Opportunistic int `json:"opportunistic"` PriorityCutoff float64 `json:"priority_cutoff,omitempty"` PriorityTier string `json:"priority_tier,omitempty"` Reason string `json:"reason,omitempty"` @@ -127,8 +133,10 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold admission.Planned = planned selected := map[int64]struct{}{} held := map[int64]struct{}{} + protected := map[int64]struct{}{} + expired := map[int64]struct{}{} if hold != nil { - purgeHold(hold.Active, now) + expired = expireHold(hold.Active, now) for id := range hold.Active { if rankedContains(ranked, id) { selected[id] = struct{}{} @@ -146,20 +154,49 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold limit = planned } } + tierByID := map[int64]string{} + scoreByID := map[int64]float64{} for _, cand := range ranked { - if len(selected) >= limit { - break + tierByID[cand.Candidate.ID] = PriorityTierFromRange(cand.Priority, plan.PriorityMin, plan.PriorityMax) + scoreByID[cand.Candidate.ID] = cand.Priority + } + for id := range held { + if isProtectedTier(tierByID[id]) { + protected[id] = struct{}{} } + } + displaceable := buildDisplaceableHold(held, protected, tierByID, scoreByID) + opportunistic := map[int64]struct{}{} + displacedHold := map[int64]struct{}{} + for _, cand := range ranked { if _, ok := selected[cand.Candidate.ID]; ok { continue } + if len(selected) < limit { + selected[cand.Candidate.ID] = struct{}{} + continue + } + if len(displaceable) == 0 { + continue + } + target := displaceable[0] + if priorityTierRank(tierByID[cand.Candidate.ID]) <= priorityTierRank(tierByID[target]) { + continue + } + displaceable = displaceable[1:] + delete(selected, target) + displacedHold[target] = struct{}{} selected[cand.Candidate.ID] = struct{}{} + opportunistic[cand.Candidate.ID] = struct{}{} } if hold != nil && admission.HoldMs > 0 { until := now.Add(time.Duration(admission.HoldMs) * time.Millisecond) if hold.Active == nil { hold.Active = map[int64]time.Time{} } + for id := range displacedHold { + delete(hold.Active, id) + } for id := range selected { hold.Active[id] = until } @@ -176,8 +213,16 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold if admission.Skipped < 0 { admission.Skipped = 0 } + if hold != nil { + admission.HoldActive = len(hold.Active) + } + admission.HoldSelected = len(held) - len(displacedHold) + admission.HoldProtected = len(protected) + admission.HoldExpired = len(expired) + admission.HoldDisplaced = len(displacedHold) + admission.Opportunistic = len(opportunistic) - displaced := map[int64]struct{}{} + displacedByHold := map[int64]struct{}{} if len(admitted) > 0 { admission.PriorityCutoff = admitted[len(admitted)-1].Priority for _, cand := range ranked { @@ -185,11 +230,14 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold continue } if cand.Priority >= admission.PriorityCutoff { - displaced[cand.Candidate.ID] = struct{}{} + if _, ok := displacedHold[cand.Candidate.ID]; ok { + continue + } + displacedByHold[cand.Candidate.ID] = struct{}{} } } } - admission.Displaced = len(displaced) + admission.Displaced = len(displacedByHold) + len(displacedHold) admission.PriorityTier = PriorityTierFromRange(admission.PriorityCutoff, plan.PriorityMin, plan.PriorityMax) admission.Pressure = buildRefinementPressure(budgetModel, admission) if admission.PriorityCutoff > 0 { @@ -220,11 +268,34 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Class = class item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff - item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax) - item.Admission.Reason = admissionReason(reason, policy, holdPolicy, pressureReasonTag(admission.Pressure), "budget:"+slugToken(plan.BudgetSource)) + item.Admission.Tier = tierByID[id] + extras := []string{pressureReasonTag(admission.Pressure), "budget:" + slugToken(plan.BudgetSource)} + if _, wasHeld := held[id]; wasHeld { + extras = append(extras, "pressure:hold", ReasonTagHoldActive) + if _, ok := protected[id]; ok { + extras = append(extras, ReasonTagHoldProtected) + } + } + if _, ok := opportunistic[id]; ok { + extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced) + } + item.Admission.Reason = admissionReason(reason, policy, holdPolicy, extras...) + continue + } + if _, ok := displacedHold[id]; ok { + item.Status = RefinementStatusDisplaced + item.Reason = RefinementReasonDisplaced + if item.Admission == nil { + item.Admission = &PriorityAdmission{Basis: "refinement"} + } + item.Admission.Class = AdmissionClassDisplace + item.Admission.Score = item.Priority + item.Admission.Cutoff = admission.PriorityCutoff + item.Admission.Tier = tierByID[id] + item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:"+slugToken(plan.BudgetSource)) continue } - if _, ok := displaced[id]; ok { + if _, ok := displacedByHold[id]; ok { item.Status = RefinementStatusDisplaced item.Reason = RefinementReasonDisplaced if item.Admission == nil { @@ -233,8 +304,8 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Class = AdmissionClassDisplace item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff - item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax) - item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", "budget:"+slugToken(plan.BudgetSource)) + item.Admission.Tier = tierByID[id] + item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", ReasonTagHoldActive, "budget:"+slugToken(plan.BudgetSource)) continue } item.Status = RefinementStatusSkipped @@ -245,8 +316,12 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Class = AdmissionClassDefer item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff - item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax) - item.Admission.Reason = admissionReason("refinement:skip:budget", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:budget", "budget:"+slugToken(plan.BudgetSource)) + item.Admission.Tier = tierByID[id] + extras := []string{pressureReasonTag(admission.Pressure), "pressure:budget", "budget:" + slugToken(plan.BudgetSource)} + if _, ok := expired[id]; ok { + extras = append(extras, ReasonTagHoldExpired) + } + item.Admission.Reason = admissionReason("refinement:skip:budget", policy, holdPolicy, extras...) } return RefinementAdmissionResult{ Plan: plan, @@ -256,14 +331,6 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold } } -func purgeHold(active map[int64]time.Time, now time.Time) { - for id, until := range active { - if now.After(until) { - delete(active, id) - } - } -} - func rankedContains(items []ScheduledCandidate, id int64) bool { for _, item := range items { if item.Candidate.ID == id { diff --git a/internal/pipeline/arbitration_reasons.go b/internal/pipeline/arbitration_reasons.go index 8485bf6..3694e96 100644 --- a/internal/pipeline/arbitration_reasons.go +++ b/internal/pipeline/arbitration_reasons.go @@ -24,3 +24,12 @@ const ( HoldReasonStrategyDigital = "strategy:digital" HoldReasonStrategyMultiRes = "strategy:multi-resolution" ) + +const ( + ReasonTagHoldActive = "hold:active" + ReasonTagHoldExpired = "hold:expired" + ReasonTagHoldProtected = "hold:protected" + ReasonTagHoldDisplaced = "hold:displaced" + ReasonTagDisplaceOpportunist = "displace:opportunistic" + ReasonTagDisplaceTier = "displace:tier" +) diff --git a/internal/pipeline/decision_queue.go b/internal/pipeline/decision_queue.go index 6666397..1b78eb3 100644 --- a/internal/pipeline/decision_queue.go +++ b/internal/pipeline/decision_queue.go @@ -6,21 +6,33 @@ import ( ) type DecisionQueueStats struct { - RecordQueued int `json:"record_queued"` - DecodeQueued int `json:"decode_queued"` - RecordSelected int `json:"record_selected"` - DecodeSelected int `json:"decode_selected"` - RecordActive int `json:"record_active"` - DecodeActive int `json:"decode_active"` - RecordOldestS float64 `json:"record_oldest_sec"` - DecodeOldestS float64 `json:"decode_oldest_sec"` - RecordBudget int `json:"record_budget"` - DecodeBudget int `json:"decode_budget"` - HoldMs int `json:"hold_ms"` - RecordHoldMs int `json:"record_hold_ms"` - DecodeHoldMs int `json:"decode_hold_ms"` - RecordDropped int `json:"record_dropped"` - DecodeDropped int `json:"decode_dropped"` + RecordQueued int `json:"record_queued"` + DecodeQueued int `json:"decode_queued"` + RecordSelected int `json:"record_selected"` + DecodeSelected int `json:"decode_selected"` + RecordActive int `json:"record_active"` + DecodeActive int `json:"decode_active"` + RecordOldestS float64 `json:"record_oldest_sec"` + DecodeOldestS float64 `json:"decode_oldest_sec"` + RecordBudget int `json:"record_budget"` + DecodeBudget int `json:"decode_budget"` + HoldMs int `json:"hold_ms"` + RecordHoldMs int `json:"record_hold_ms"` + DecodeHoldMs int `json:"decode_hold_ms"` + RecordDropped int `json:"record_dropped"` + DecodeDropped int `json:"decode_dropped"` + RecordHoldSelected int `json:"record_hold_selected"` + DecodeHoldSelected int `json:"decode_hold_selected"` + RecordHoldProtected int `json:"record_hold_protected"` + DecodeHoldProtected int `json:"decode_hold_protected"` + RecordHoldExpired int `json:"record_hold_expired"` + DecodeHoldExpired int `json:"decode_hold_expired"` + RecordHoldDisplaced int `json:"record_hold_displaced"` + DecodeHoldDisplaced int `json:"decode_hold_displaced"` + RecordOpportunistic int `json:"record_opportunistic"` + DecodeOpportunistic int `json:"decode_opportunistic"` + RecordDisplaced int `json:"record_displaced"` + DecodeDisplaced int `json:"decode_displaced"` } type queuedDecision struct { @@ -33,12 +45,18 @@ type queuedDecision struct { } type queueSelection struct { - selected map[int64]struct{} - held map[int64]struct{} - scores map[int64]float64 - minScore float64 - maxScore float64 - cutoff float64 + selected map[int64]struct{} + held map[int64]struct{} + protected map[int64]struct{} + displacedByHold map[int64]struct{} + displaced map[int64]struct{} + opportunistic map[int64]struct{} + expired map[int64]struct{} + scores map[int64]float64 + tiers map[int64]string + minScore float64 + maxScore float64 + cutoff float64 } type decisionQueues struct { @@ -107,30 +125,42 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, } } - purgeExpired(dq.recordHold, now) - purgeExpired(dq.decodeHold, now) + recExpired := expireHold(dq.recordHold, now) + decExpired := expireHold(dq.decodeHold, now) - recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy) - decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy) + recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy, recExpired) + decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy, decExpired) recPressure := buildQueuePressure(budget.Record, len(dq.record), len(recSelected.selected), len(dq.recordHold)) decPressure := buildQueuePressure(budget.Decode, len(dq.decode), len(decSelected.selected), len(dq.decodeHold)) recPressureTag := pressureReasonTag(recPressure) decPressureTag := pressureReasonTag(decPressure) stats := DecisionQueueStats{ - RecordQueued: len(dq.record), - DecodeQueued: len(dq.decode), - RecordSelected: len(recSelected.selected), - DecodeSelected: len(decSelected.selected), - RecordActive: len(dq.recordHold), - DecodeActive: len(dq.decodeHold), - RecordOldestS: oldestAge(dq.record, now), - DecodeOldestS: oldestAge(dq.decode, now), - RecordBudget: budget.Record.Max, - DecodeBudget: budget.Decode.Max, - HoldMs: budget.HoldMs, - RecordHoldMs: holdPolicy.RecordMs, - DecodeHoldMs: holdPolicy.DecodeMs, + RecordQueued: len(dq.record), + DecodeQueued: len(dq.decode), + RecordSelected: len(recSelected.selected), + DecodeSelected: len(decSelected.selected), + RecordActive: len(dq.recordHold), + DecodeActive: len(dq.decodeHold), + RecordOldestS: oldestAge(dq.record, now), + DecodeOldestS: oldestAge(dq.decode, now), + RecordBudget: budget.Record.Max, + DecodeBudget: budget.Decode.Max, + HoldMs: budget.HoldMs, + RecordHoldMs: holdPolicy.RecordMs, + DecodeHoldMs: holdPolicy.DecodeMs, + RecordHoldSelected: len(recSelected.held) - len(recSelected.displaced), + DecodeHoldSelected: len(decSelected.held) - len(decSelected.displaced), + RecordHoldProtected: len(recSelected.protected), + DecodeHoldProtected: len(decSelected.protected), + RecordHoldExpired: len(recExpired), + DecodeHoldExpired: len(decExpired), + RecordHoldDisplaced: len(recSelected.displaced), + DecodeHoldDisplaced: len(decSelected.displaced), + RecordOpportunistic: len(recSelected.opportunistic), + DecodeOpportunistic: len(decSelected.opportunistic), + RecordDisplaced: len(recSelected.displacedByHold), + DecodeDisplaced: len(decSelected.displacedByHold), } for i := range decisions { @@ -139,7 +169,15 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source, recPressureTag) if _, ok := recSelected.selected[id]; !ok { decisions[i].ShouldRecord = false - decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, recPressureTag, "pressure:budget", "budget:"+slugToken(budget.Record.Source)) + extras := []string{recPressureTag, "pressure:budget", "budget:" + slugToken(budget.Record.Source)} + if _, ok := recSelected.displaced[id]; ok { + extras = []string{recPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Record.Source)} + } else if _, ok := recSelected.displacedByHold[id]; ok { + extras = []string{recPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Record.Source)} + } else if _, ok := recSelected.expired[id]; ok { + extras = append(extras, ReasonTagHoldExpired) + } + decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, extras...) stats.RecordDropped++ } } @@ -148,7 +186,15 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, if _, ok := decSelected.selected[id]; !ok { decisions[i].ShouldAutoDecode = false if decisions[i].Reason == "" { - decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, decPressureTag, "pressure:budget", "budget:"+slugToken(budget.Decode.Source)) + extras := []string{decPressureTag, "pressure:budget", "budget:" + slugToken(budget.Decode.Source)} + if _, ok := decSelected.displaced[id]; ok { + extras = []string{decPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Decode.Source)} + } else if _, ok := decSelected.displacedByHold[id]; ok { + extras = []string{decPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Decode.Source)} + } else if _, ok := decSelected.expired[id]; ok { + extras = append(extras, ReasonTagHoldExpired) + } + decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, extras...) } stats.DecodeDropped++ } @@ -157,15 +203,24 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, return stats } -func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy) queueSelection { +func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy, expired map[int64]struct{}) queueSelection { selection := queueSelection{ - selected: map[int64]struct{}{}, - held: map[int64]struct{}{}, - scores: map[int64]float64{}, + selected: map[int64]struct{}{}, + held: map[int64]struct{}{}, + protected: map[int64]struct{}{}, + displacedByHold: map[int64]struct{}{}, + displaced: map[int64]struct{}{}, + opportunistic: map[int64]struct{}{}, + expired: map[int64]struct{}{}, + scores: map[int64]float64{}, + tiers: map[int64]string{}, } if len(queue) == 0 { return selection } + for id := range expired { + selection.expired[id] = struct{}{} + } type scored struct { id int64 score float64 @@ -195,6 +250,9 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in sort.Slice(scoredList, func(i, j int) bool { return scoredList[i].score > scoredList[j].score }) + for id, score := range selection.scores { + selection.tiers[id] = PriorityTierFromRange(score, selection.minScore, selection.maxScore) + } limit := max if limit <= 0 || limit > len(scoredList) { limit = len(scoredList) @@ -209,18 +267,37 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in if _, ok := queue[id]; ok { selection.selected[id] = struct{}{} selection.held[id] = struct{}{} + if isProtectedTier(selection.tiers[id]) { + selection.protected[id] = struct{}{} + } } } + displaceable := buildDisplaceableHold(selection.held, selection.protected, selection.tiers, selection.scores) for _, s := range scoredList { - if len(selection.selected) >= limit { - break - } if _, ok := selection.selected[s.id]; ok { continue } + if len(selection.selected) < limit { + selection.selected[s.id] = struct{}{} + continue + } + if len(displaceable) == 0 { + continue + } + target := displaceable[0] + if priorityTierRank(selection.tiers[s.id]) <= priorityTierRank(selection.tiers[target]) { + continue + } + displaceable = displaceable[1:] + delete(selection.selected, target) + selection.displaced[target] = struct{}{} selection.selected[s.id] = struct{}{} + selection.opportunistic[s.id] = struct{}{} } if holdDur > 0 { + for id := range selection.displaced { + delete(hold, id) + } for id := range selection.selected { hold[id] = now.Add(holdDur) } @@ -235,9 +312,59 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in } } } + if len(selection.selected) > 0 { + for id := range selection.scores { + if _, ok := selection.selected[id]; ok { + continue + } + if _, ok := selection.displaced[id]; ok { + continue + } + if selection.scores[id] >= selection.cutoff { + selection.displacedByHold[id] = struct{}{} + } + } + } return selection } +func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{}, tiers map[int64]string, scores map[int64]float64) []int64 { + type entry struct { + id int64 + rank int + score float64 + } + candidates := make([]entry, 0, len(held)) + for id := range held { + if _, ok := protected[id]; ok { + continue + } + score := 0.0 + if scores != nil { + score = scores[id] + } + candidates = append(candidates, entry{ + id: id, + rank: priorityTierRank(tiers[id]), + score: score, + }) + } + if len(candidates) == 0 { + return nil + } + sort.Slice(candidates, func(i, j int) bool { + if candidates[i].rank == candidates[j].rank { + return candidates[i].score < candidates[j].score + } + return candidates[i].rank < candidates[j].rank + }) + out := make([]int64, 0, len(candidates)) + for _, c := range candidates { + out = append(out, c.id) + } + return out +} + func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string, pressureTag string) *PriorityAdmission { score, ok := selection.scores[id] if !ok { @@ -247,29 +374,43 @@ func buildQueueAdmission(queueName string, id int64, selection queueSelection, p Basis: queueName, Score: score, Cutoff: selection.cutoff, - Tier: PriorityTierFromRange(score, selection.minScore, selection.maxScore), + Tier: selection.tiers[id], } if _, ok := selection.selected[id]; ok { if _, held := selection.held[id]; held { admission.Class = AdmissionClassHold - admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, pressureTag, "pressure:hold", "budget:"+slugToken(budgetSource)) + extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)} + if _, ok := selection.protected[id]; ok { + extras = append(extras, ReasonTagHoldProtected) + } + admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, extras...) } else { admission.Class = AdmissionClassAdmit - admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, pressureTag, "budget:"+slugToken(budgetSource)) + extras := []string{pressureTag, "budget:" + slugToken(budgetSource)} + if _, ok := selection.opportunistic[id]; ok { + extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced) + } + admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, extras...) } return admission } + if _, ok := selection.displaced[id]; ok { + admission.Class = AdmissionClassDisplace + admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:"+slugToken(budgetSource)) + return admission + } + if _, ok := selection.displacedByHold[id]; ok { + admission.Class = AdmissionClassDisplace + admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:"+slugToken(budgetSource)) + return admission + } admission.Class = AdmissionClassDefer - admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, pressureTag, "pressure:budget", "budget:"+slugToken(budgetSource)) - return admission -} - -func purgeExpired(hold map[int64]time.Time, now time.Time) { - for id, until := range hold { - if now.After(until) { - delete(hold, id) - } + extras := []string{pressureTag, "pressure:budget", "budget:" + slugToken(budgetSource)} + if _, ok := selection.expired[id]; ok { + extras = append(extras, ReasonTagHoldExpired) } + admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, extras...) + return admission } func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 { diff --git a/internal/pipeline/decision_queue_test.go b/internal/pipeline/decision_queue_test.go index 942893e..7741943 100644 --- a/internal/pipeline/decision_queue_test.go +++ b/internal/pipeline/decision_queue_test.go @@ -81,8 +81,9 @@ func TestDecisionQueueHoldKeepsSelection(t *testing.T) { } decisions = []SignalDecision{ - {Candidate: Candidate{ID: 1, SNRDb: 25}, ShouldRecord: true, ShouldAutoDecode: true}, - {Candidate: Candidate{ID: 2, SNRDb: 2}, ShouldRecord: true, ShouldAutoDecode: true}, + {Candidate: Candidate{ID: 1, SNRDb: 32}, ShouldRecord: true, ShouldAutoDecode: true}, + {Candidate: Candidate{ID: 2, SNRDb: 30}, ShouldRecord: true, ShouldAutoDecode: true}, + {Candidate: Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: true}, } arbiter.ApplyDecisions(decisions, budget, now.Add(100*time.Millisecond), policy) if !decisions[1].ShouldRecord || !decisions[1].ShouldAutoDecode { @@ -95,3 +96,118 @@ func TestDecisionQueueHoldKeepsSelection(t *testing.T) { t.Fatalf("expected record admission hold class, got %+v", decisions[1].RecordAdmission) } } + +func TestDecisionQueueHighTierHoldProtected(t *testing.T) { + arbiter := NewArbiter() + policy := Policy{DecisionHoldMs: 500} + budget := BudgetModel{Record: BudgetQueue{Max: 1}} + now := time.Now() + + decisions := []SignalDecision{ + {Candidate: Candidate{ID: 1, SNRDb: 30}, ShouldRecord: true}, + {Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true}, + } + arbiter.ApplyDecisions(decisions, budget, now, policy) + if !decisions[0].ShouldRecord { + t.Fatalf("expected candidate 1 to be selected initially") + } + + decisions = []SignalDecision{ + {Candidate: Candidate{ID: 1, SNRDb: 30}, ShouldRecord: true}, + {Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true}, + {Candidate: Candidate{ID: 3, SNRDb: 32}, ShouldRecord: true}, + } + arbiter.ApplyDecisions(decisions, budget, now.Add(100*time.Millisecond), policy) + if !decisions[0].ShouldRecord { + t.Fatalf("expected protected hold to keep candidate 1") + } + if decisions[2].ShouldRecord { + t.Fatalf("expected candidate 3 to remain deferred behind protected hold") + } + if decisions[0].RecordAdmission == nil || decisions[0].RecordAdmission.Class != AdmissionClassHold { + t.Fatalf("expected hold admission for candidate 1, got %+v", decisions[0].RecordAdmission) + } + if decisions[2].RecordAdmission == nil || decisions[2].RecordAdmission.Class != AdmissionClassDisplace { + t.Fatalf("expected displacement admission for candidate 3, got %+v", decisions[2].RecordAdmission) + } +} + +func TestDecisionQueueOpportunisticDisplacement(t *testing.T) { + arbiter := NewArbiter() + policy := Policy{DecisionHoldMs: 500} + budget := BudgetModel{Record: BudgetQueue{Max: 1}} + now := time.Now() + + decisions := []SignalDecision{ + {Candidate: Candidate{ID: 1, SNRDb: 15}, ShouldRecord: true}, + {Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true}, + } + arbiter.ApplyDecisions(decisions, budget, now, policy) + if !decisions[0].ShouldRecord { + t.Fatalf("expected candidate 1 to be selected initially") + } + + decisions = []SignalDecision{ + {Candidate: Candidate{ID: 1, SNRDb: 5}, ShouldRecord: true}, + {Candidate: Candidate{ID: 2, SNRDb: 4}, ShouldRecord: true}, + {Candidate: Candidate{ID: 3, SNRDb: 30}, ShouldRecord: true}, + } + arbiter.ApplyDecisions(decisions, budget, now.Add(100*time.Millisecond), policy) + if decisions[0].ShouldRecord { + t.Fatalf("expected candidate 1 to be displaced") + } + if !decisions[2].ShouldRecord { + t.Fatalf("expected candidate 3 to opportunistically displace hold") + } + if decisions[0].RecordAdmission == nil || decisions[0].RecordAdmission.Class != AdmissionClassDisplace { + t.Fatalf("expected displacement admission for candidate 1, got %+v", decisions[0].RecordAdmission) + } + if decisions[2].RecordAdmission == nil || decisions[2].RecordAdmission.Class != AdmissionClassAdmit { + t.Fatalf("expected admit admission for candidate 3, got %+v", decisions[2].RecordAdmission) + } + if decisions[2].RecordAdmission == nil || !strings.Contains(decisions[2].RecordAdmission.Reason, ReasonTagDisplaceOpportunist) { + t.Fatalf("expected opportunistic displacement reason, got %+v", decisions[2].RecordAdmission) + } +} + +func TestDecisionQueueHoldExpiryChurn(t *testing.T) { + arbiter := NewArbiter() + policy := Policy{DecisionHoldMs: 100} + budget := BudgetModel{Record: BudgetQueue{Max: 1}} + now := time.Now() + + decisions := []SignalDecision{ + {Candidate: Candidate{ID: 1, SNRDb: 12}, ShouldRecord: true}, + {Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true}, + } + arbiter.ApplyDecisions(decisions, budget, now, policy) + if !decisions[0].ShouldRecord { + t.Fatalf("expected candidate 1 to be selected initially") + } + + decisions = []SignalDecision{ + {Candidate: Candidate{ID: 1, SNRDb: 30}, ShouldRecord: true}, + {Candidate: Candidate{ID: 2, SNRDb: 32}, ShouldRecord: true}, + {Candidate: Candidate{ID: 3, SNRDb: 5}, ShouldRecord: true}, + } + arbiter.ApplyDecisions(decisions, budget, now.Add(50*time.Millisecond), policy) + if !decisions[0].ShouldRecord { + t.Fatalf("expected hold to keep candidate 1 before expiry") + } + + decisions = []SignalDecision{ + {Candidate: Candidate{ID: 1, SNRDb: 30}, ShouldRecord: true}, + {Candidate: Candidate{ID: 2, SNRDb: 32}, ShouldRecord: true}, + {Candidate: Candidate{ID: 3, SNRDb: 5}, ShouldRecord: true}, + } + arbiter.ApplyDecisions(decisions, budget, now.Add(200*time.Millisecond), policy) + if decisions[0].ShouldRecord { + t.Fatalf("expected candidate 1 to be released after hold expiry") + } + if !decisions[1].ShouldRecord { + t.Fatalf("expected candidate 2 to be selected after hold expiry") + } + if decisions[0].RecordAdmission == nil || !strings.Contains(decisions[0].RecordAdmission.Reason, ReasonTagHoldExpired) { + t.Fatalf("expected hold expiry reason, got %+v", decisions[0].RecordAdmission) + } +} diff --git a/internal/pipeline/hold.go b/internal/pipeline/hold.go new file mode 100644 index 0000000..aaac2bf --- /dev/null +++ b/internal/pipeline/hold.go @@ -0,0 +1,21 @@ +package pipeline + +import "time" + +func expireHold(hold map[int64]time.Time, now time.Time) map[int64]struct{} { + if len(hold) == 0 { + return map[int64]struct{}{} + } + expired := map[int64]struct{}{} + for id, until := range hold { + if now.After(until) { + expired[id] = struct{}{} + delete(hold, id) + } + } + return expired +} + +func isProtectedTier(tier string) bool { + return priorityTierRank(tier) >= priorityTierRank(PriorityTierHigh) +} diff --git a/internal/pipeline/priority.go b/internal/pipeline/priority.go index cd303da..a9f3c6b 100644 --- a/internal/pipeline/priority.go +++ b/internal/pipeline/priority.go @@ -49,6 +49,21 @@ func PriorityTierFromRange(score, min, max float64) string { } } +func priorityTierRank(tier string) int { + switch tier { + case PriorityTierCritical: + return 4 + case PriorityTierHigh: + return 3 + case PriorityTierMedium: + return 2 + case PriorityTierLow: + return 1 + default: + return 0 + } +} + func admissionReason(base string, policy Policy, holdPolicy HoldPolicy, extras ...string) string { tags := uniqueReasonTags(policy, holdPolicy, extras...) if len(tags) == 0 { diff --git a/internal/pipeline/scheduler_test.go b/internal/pipeline/scheduler_test.go index 7b84c4e..e2a15f0 100644 --- a/internal/pipeline/scheduler_test.go +++ b/internal/pipeline/scheduler_test.go @@ -1,6 +1,7 @@ package pipeline import ( + "strings" "testing" "time" ) @@ -302,8 +303,9 @@ func TestAdmitRefinementPlanAppliesBudget(t *testing.T) { func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) { policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0} cands := []Candidate{ - {ID: 1, CenterHz: 100, SNRDb: 5}, + {ID: 1, CenterHz: 100, SNRDb: 9}, {ID: 2, CenterHz: 200, SNRDb: 12}, + {ID: 3, CenterHz: 300, SNRDb: 2}, } plan := BuildRefinementPlan(cands, policy) hold := &RefinementHold{Active: map[int64]time.Time{1: time.Now().Add(2 * time.Second)}} @@ -320,6 +322,30 @@ func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) { } } +func TestAdmitRefinementPlanOpportunisticDisplacement(t *testing.T) { + policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0, DecisionHoldMs: 500} + cands := []Candidate{ + {ID: 1, CenterHz: 100, SNRDb: 5}, + {ID: 2, CenterHz: 200, SNRDb: 25}, + } + plan := BuildRefinementPlan(cands, policy) + hold := &RefinementHold{Active: map[int64]time.Time{1: time.Now().Add(2 * time.Second)}} + res := AdmitRefinementPlan(plan, policy, time.Now(), hold) + if len(res.Plan.Selected) != 1 || res.Plan.Selected[0].Candidate.ID != 2 { + t.Fatalf("expected opportunistic displacement to admit candidate 2, got %+v", res.Plan.Selected) + } + item1 := findWorkItem(res.WorkItems, 1) + if item1 == nil || item1.Status != RefinementStatusDisplaced { + t.Fatalf("expected candidate 1 displaced, got %+v", item1) + } + if item1.Admission == nil || item1.Admission.Class != AdmissionClassDisplace { + t.Fatalf("expected displaced admission class, got %+v", item1.Admission) + } + if item1.Admission == nil || !strings.Contains(item1.Admission.Reason, ReasonTagDisplaceOpportunist) { + t.Fatalf("expected opportunistic displacement reason, got %+v", item1.Admission) + } +} + func TestRefinementStrategyUsesProfile(t *testing.T) { strategy, reason := refinementStrategy(Policy{Profile: "digital-hunting"}) if strategy != "digital-hunting" || reason != "profile" {