From 4ebd51d22b2382632df873b194505421e9d18d7f Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sun, 22 Mar 2026 10:53:37 +0100 Subject: [PATCH] Add priority tiers and admission classes to pipeline --- internal/pipeline/arbitration.go | 37 +++++++++++ internal/pipeline/decision_queue.go | 92 ++++++++++++++++++++++----- internal/pipeline/decisions.go | 12 ++-- internal/pipeline/priority.go | 98 +++++++++++++++++++++++++++++ internal/pipeline/scheduler.go | 33 ++++++++++ 5 files changed, 251 insertions(+), 21 deletions(-) create mode 100644 internal/pipeline/priority.go diff --git a/internal/pipeline/arbitration.go b/internal/pipeline/arbitration.go index 00f3193..d7d5e2e 100644 --- a/internal/pipeline/arbitration.go +++ b/internal/pipeline/arbitration.go @@ -30,6 +30,7 @@ type RefinementAdmission struct { Skipped int `json:"skipped"` Displaced int `json:"displaced"` PriorityCutoff float64 `json:"priority_cutoff,omitempty"` + PriorityTier string `json:"priority_tier,omitempty"` Reason string `json:"reason,omitempty"` } @@ -123,11 +124,13 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold planned := len(ranked) admission.Planned = planned selected := map[int64]struct{}{} + held := map[int64]struct{}{} if hold != nil { purgeHold(hold.Active, now) for id := range hold.Active { if rankedContains(ranked, id) { selected[id] = struct{}{} + held[id] = struct{}{} } } } @@ -185,6 +188,10 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold } } admission.Displaced = len(displaced) + admission.PriorityTier = PriorityTierFromRange(admission.PriorityCutoff, plan.PriorityMin, plan.PriorityMax) + if admission.PriorityCutoff > 0 { + admission.Reason = admissionReason("admission:budget", policy, holdPolicy, "budget:"+slugToken(plan.BudgetSource)) + } plan.Selected = admitted plan.PriorityCutoff = admission.PriorityCutoff @@ -198,15 +205,45 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold if _, ok := selected[id]; ok { item.Status = RefinementStatusAdmitted item.Reason = RefinementReasonAdmitted + class := AdmissionClassAdmit + reason := "refinement:admit:budget" + if _, wasHeld := held[id]; wasHeld { + class = AdmissionClassHold + reason = "refinement:admit:hold" + } + if item.Admission == nil { + item.Admission = &PriorityAdmission{Basis: "refinement"} + } + item.Admission.Class = class + item.Admission.Score = item.Priority + item.Admission.Cutoff = admission.PriorityCutoff + item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax) + item.Admission.Reason = admissionReason(reason, policy, holdPolicy, "budget:"+slugToken(plan.BudgetSource)) continue } if _, ok := displaced[id]; ok { item.Status = RefinementStatusDisplaced item.Reason = RefinementReasonDisplaced + if item.Admission == nil { + item.Admission = &PriorityAdmission{Basis: "refinement"} + } + item.Admission.Class = AdmissionClassDisplace + item.Admission.Score = item.Priority + item.Admission.Cutoff = admission.PriorityCutoff + item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax) + item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, "pressure:hold", "budget:"+slugToken(plan.BudgetSource)) continue } item.Status = RefinementStatusSkipped item.Reason = RefinementReasonBudget + if item.Admission == nil { + item.Admission = &PriorityAdmission{Basis: "refinement"} + } + item.Admission.Class = AdmissionClassDefer + item.Admission.Score = item.Priority + item.Admission.Cutoff = admission.PriorityCutoff + item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax) + item.Admission.Reason = admissionReason("refinement:skip:budget", policy, holdPolicy, "pressure:budget", "budget:"+slugToken(plan.BudgetSource)) } return RefinementAdmissionResult{ Plan: plan, diff --git a/internal/pipeline/decision_queue.go b/internal/pipeline/decision_queue.go index 0b2604c..4220fb6 100644 --- a/internal/pipeline/decision_queue.go +++ b/internal/pipeline/decision_queue.go @@ -32,6 +32,15 @@ type queuedDecision struct { LastSeen time.Time } +type queueSelection struct { + selected map[int64]struct{} + held map[int64]struct{} + scores map[int64]float64 + minScore float64 + maxScore float64 + cutoff float64 +} + type decisionQueues struct { record map[int64]*queuedDecision decode map[int64]*queuedDecision @@ -107,8 +116,8 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, stats := DecisionQueueStats{ RecordQueued: len(dq.record), DecodeQueued: len(dq.decode), - RecordSelected: len(recSelected), - DecodeSelected: len(decSelected), + RecordSelected: len(recSelected.selected), + DecodeSelected: len(decSelected.selected), RecordActive: len(dq.recordHold), DecodeActive: len(dq.decodeHold), RecordOldestS: oldestAge(dq.record, now), @@ -123,17 +132,19 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, for i := range decisions { id := decisions[i].Candidate.ID if decisions[i].ShouldRecord { - if _, ok := recSelected[id]; !ok { + decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source) + if _, ok := recSelected.selected[id]; !ok { decisions[i].ShouldRecord = false - decisions[i].Reason = DecisionReasonQueueRecord + decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, "pressure:budget", "budget:"+slugToken(budget.Record.Source)) stats.RecordDropped++ } } if decisions[i].ShouldAutoDecode { - if _, ok := decSelected[id]; !ok { + decisions[i].DecodeAdmission = buildQueueAdmission("decode", id, decSelected, policy, holdPolicy, budget.Decode.Source) + if _, ok := decSelected.selected[id]; !ok { decisions[i].ShouldAutoDecode = false if decisions[i].Reason == "" { - decisions[i].Reason = DecisionReasonQueueDecode + decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, "pressure:budget", "budget:"+slugToken(budget.Decode.Source)) } stats.DecodeDropped++ } @@ -142,10 +153,14 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, return stats } -func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy) map[int64]struct{} { - selected := map[int64]struct{}{} +func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy) queueSelection { + selection := queueSelection{ + selected: map[int64]struct{}{}, + held: map[int64]struct{}{}, + scores: map[int64]float64{}, + } if len(queue) == 0 { - return selected + return selection } type scored struct { id int64 @@ -163,7 +178,15 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in hint = qd.Class } policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName) - scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost}) + score := qd.SNRDb + boost + policyBoost + selection.scores[id] = score + if len(scoredList) == 0 || score < selection.minScore { + selection.minScore = score + } + if len(scoredList) == 0 || score > selection.maxScore { + selection.maxScore = score + } + scoredList = append(scoredList, scored{id: id, score: score}) } sort.Slice(scoredList, func(i, j int) bool { return scoredList[i].score > scoredList[j].score @@ -180,24 +203,61 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in } for id := range hold { if _, ok := queue[id]; ok { - selected[id] = struct{}{} + selection.selected[id] = struct{}{} + selection.held[id] = struct{}{} } } for _, s := range scoredList { - if len(selected) >= limit { + if len(selection.selected) >= limit { break } - if _, ok := selected[s.id]; ok { + if _, ok := selection.selected[s.id]; ok { continue } - selected[s.id] = struct{}{} + selection.selected[s.id] = struct{}{} } if holdDur > 0 { - for id := range selected { + for id := range selection.selected { hold[id] = now.Add(holdDur) } } - return selected + if len(selection.selected) > 0 { + first := true + for id := range selection.selected { + score := selection.scores[id] + if first || score < selection.cutoff { + selection.cutoff = score + first = false + } + } + } + return selection +} + +func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string) *PriorityAdmission { + score, ok := selection.scores[id] + if !ok { + return nil + } + admission := &PriorityAdmission{ + Basis: queueName, + Score: score, + Cutoff: selection.cutoff, + Tier: PriorityTierFromRange(score, selection.minScore, selection.maxScore), + } + if _, ok := selection.selected[id]; ok { + if _, held := selection.held[id]; held { + admission.Class = AdmissionClassHold + admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, "pressure:hold", "budget:"+slugToken(budgetSource)) + } else { + admission.Class = AdmissionClassAdmit + admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, "budget:"+slugToken(budgetSource)) + } + return admission + } + admission.Class = AdmissionClassDefer + admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, "pressure:budget", "budget:"+slugToken(budgetSource)) + return admission } func purgeExpired(hold map[int64]time.Time, now time.Time) { diff --git a/internal/pipeline/decisions.go b/internal/pipeline/decisions.go index b458d03..e0c7215 100644 --- a/internal/pipeline/decisions.go +++ b/internal/pipeline/decisions.go @@ -7,11 +7,13 @@ import ( ) type SignalDecision struct { - Candidate Candidate `json:"candidate"` - Class string `json:"class,omitempty"` - ShouldRecord bool `json:"should_record"` - ShouldAutoDecode bool `json:"should_auto_decode"` - Reason string `json:"reason,omitempty"` + Candidate Candidate `json:"candidate"` + Class string `json:"class,omitempty"` + ShouldRecord bool `json:"should_record"` + ShouldAutoDecode bool `json:"should_auto_decode"` + Reason string `json:"reason,omitempty"` + RecordAdmission *PriorityAdmission `json:"record_admission,omitempty"` + DecodeAdmission *PriorityAdmission `json:"decode_admission,omitempty"` } func DecideSignalAction(policy Policy, candidate Candidate, cls *classifier.Classification) SignalDecision { diff --git a/internal/pipeline/priority.go b/internal/pipeline/priority.go new file mode 100644 index 0000000..cd303da --- /dev/null +++ b/internal/pipeline/priority.go @@ -0,0 +1,98 @@ +package pipeline + +import ( + "strings" +) + +const ( + PriorityTierCritical = "critical" + PriorityTierHigh = "high" + PriorityTierMedium = "medium" + PriorityTierLow = "low" + PriorityTierBackground = "background" +) + +const ( + AdmissionClassPlanned = "plan" + AdmissionClassAdmit = "admit" + AdmissionClassHold = "hold" + AdmissionClassDefer = "defer" + AdmissionClassDisplace = "displace" + AdmissionClassDrop = "drop" +) + +type PriorityAdmission struct { + Tier string `json:"tier,omitempty"` + Class string `json:"class,omitempty"` + Score float64 `json:"score,omitempty"` + Cutoff float64 `json:"cutoff,omitempty"` + Basis string `json:"basis,omitempty"` + Reason string `json:"reason,omitempty"` +} + +func PriorityTierFromRange(score, min, max float64) string { + if max <= min { + return PriorityTierHigh + } + norm := (score - min) / (max - min) + switch { + case norm >= 0.85: + return PriorityTierCritical + case norm >= 0.65: + return PriorityTierHigh + case norm >= 0.45: + return PriorityTierMedium + case norm >= 0.25: + return PriorityTierLow + default: + return PriorityTierBackground + } +} + +func admissionReason(base string, policy Policy, holdPolicy HoldPolicy, extras ...string) string { + tags := uniqueReasonTags(policy, holdPolicy, extras...) + if len(tags) == 0 { + return base + } + return base + ":" + strings.Join(tags, ":") +} + +func uniqueReasonTags(policy Policy, holdPolicy HoldPolicy, extras ...string) []string { + seen := map[string]struct{}{} + tags := make([]string, 0, 6) + add := func(tag string) { + if tag == "" { + return + } + if _, ok := seen[tag]; ok { + return + } + seen[tag] = struct{}{} + tags = append(tags, tag) + } + if policy.Profile != "" { + add("profile:" + slugToken(policy.Profile)) + } + if policy.Intent != "" { + add("intent:" + slugToken(policy.Intent)) + } + if policy.RefinementStrategy != "" { + add("strategy:" + slugToken(policy.RefinementStrategy)) + } + for _, reason := range holdPolicy.Reasons { + add(reason) + } + for _, extra := range extras { + add(extra) + } + return tags +} + +func slugToken(input string) string { + input = strings.TrimSpace(strings.ToLower(input)) + if input == "" { + return "" + } + parts := strings.Fields(input) + return strings.Join(parts, "-") +} diff --git a/internal/pipeline/scheduler.go b/internal/pipeline/scheduler.go index 6459bb7..48e0377 100644 --- a/internal/pipeline/scheduler.go +++ b/internal/pipeline/scheduler.go @@ -8,6 +8,7 @@ import ( type ScheduledCandidate struct { Candidate Candidate `json:"candidate"` Priority float64 `json:"priority"` + Tier string `json:"tier,omitempty"` Score *RefinementScore `json:"score,omitempty"` Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` } @@ -43,6 +44,7 @@ type RefinementWorkItem struct { Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` Status string `json:"status,omitempty"` Reason string `json:"reason,omitempty"` + Admission *PriorityAdmission `json:"admission,omitempty"` } type RefinementExecution struct { @@ -85,6 +87,7 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { strategy, strategyReason := refinementStrategy(policy) budgetModel := BudgetModelFromPolicy(policy) budget := budgetModel.Refinement.Max + holdPolicy := HoldPolicyFromPolicy(policy) plan := RefinementPlan{ TotalCandidates: len(candidates), MinCandidateSNRDb: policy.MinCandidateSNRDb, @@ -123,6 +126,12 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { Candidate: candidate, Status: RefinementStatusDropped, Reason: RefinementReasonMonitorGate, + Admission: &PriorityAdmission{ + Tier: PriorityTierBackground, + Class: AdmissionClassDrop, + Basis: "refinement", + Reason: admissionReason(RefinementReasonMonitorGate, policy, holdPolicy), + }, }) continue } @@ -132,6 +141,12 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { Candidate: candidate, Status: RefinementStatusDropped, Reason: RefinementReasonBelowSNR, + Admission: &PriorityAdmission{ + Tier: PriorityTierBackground, + Class: AdmissionClassDrop, + Basis: "refinement", + Reason: admissionReason(RefinementReasonBelowSNR, policy, holdPolicy), + }, }) continue } @@ -177,6 +192,12 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { Breakdown: &score.Breakdown, Status: RefinementStatusPlanned, Reason: RefinementReasonPlanned, + Admission: &PriorityAdmission{ + Class: AdmissionClassPlanned, + Score: priority, + Basis: "refinement", + Reason: admissionReason(RefinementReasonPlanned, policy, holdPolicy), + }, }) } sort.Slice(scored, func(i, j int) bool { @@ -201,6 +222,18 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { plan.PriorityMin = minPriority plan.PriorityMax = maxPriority plan.PriorityAvg = sumPriority / float64(len(scored)) + for i := range scored { + scored[i].Tier = PriorityTierFromRange(scored[i].Priority, minPriority, maxPriority) + } + for i := range workItems { + if workItems[i].Admission == nil { + continue + } + if workItems[i].Status != RefinementStatusPlanned { + continue + } + workItems[i].Admission.Tier = PriorityTierFromRange(workItems[i].Priority, minPriority, maxPriority) + } } plan.Ranked = append(plan.Ranked, scored...) plan.WorkItems = workItems