From 30a5d11d537c62addd1e11dff6b7bc45cdf7c62e Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sun, 22 Mar 2026 11:33:27 +0100 Subject: [PATCH] pipeline: apply intent holds and family tier floors --- internal/pipeline/arbitration.go | 47 +++++++++++++++- internal/pipeline/arbitration_reasons.go | 15 +++-- internal/pipeline/decision_queue.go | 41 +++++++++++--- internal/pipeline/priority.go | 72 ++++++++++++++++++++++-- internal/pipeline/scheduler.go | 66 ++++++++++++++-------- 5 files changed, 194 insertions(+), 47 deletions(-) diff --git a/internal/pipeline/arbitration.go b/internal/pipeline/arbitration.go index ca1dbf7..2bc2ffe 100644 --- a/internal/pipeline/arbitration.go +++ b/internal/pipeline/arbitration.go @@ -61,6 +61,7 @@ func HoldPolicyFromPolicy(policy Policy) HoldPolicy { reasons := make([]string, 0, 2) profile := strings.ToLower(strings.TrimSpace(policy.Profile)) strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) + intent := strings.ToLower(strings.TrimSpace(policy.Intent)) archiveProfile := profileContains(profile, "archive") archiveStrategy := strategyContains(strategy, "archive") @@ -96,6 +97,24 @@ func HoldPolicyFromPolicy(policy Policy) HoldPolicy { refMult *= 1.1 reasons = append(reasons, HoldReasonStrategyMultiRes) } + intentArchive := strings.Contains(intent, "archive") || strings.Contains(intent, "triage") || strings.Contains(intent, "record") + intentDecode := strings.Contains(intent, "decode") || strings.Contains(intent, "digital") || strings.Contains(intent, "analysis") + intentSurveillance := strings.Contains(intent, "surveillance") || strings.Contains(intent, "wideband") + if intentArchive { + recMult *= 1.25 + refMult *= 1.1 + decMult *= 1.05 + reasons = append(reasons, HoldReasonIntentArchive) + } + if intentDecode { + decMult *= 1.25 + refMult *= 1.05 + reasons = append(reasons, HoldReasonIntentDecode) + } + if intentSurveillance { + refMult *= 1.1 + reasons = append(reasons, HoldReasonIntentSurveillance) + } return HoldPolicy{ BaseMs: base, @@ -159,16 +178,25 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold } tierByID := map[int64]string{} scoreByID := map[int64]float64{} + familyByID := map[int64]string{} + familyRankByID := map[int64]int{} + familyFloorByID := map[int64]string{} for _, cand := range ranked { - tierByID[cand.Candidate.ID] = PriorityTierFromRange(cand.Priority, plan.PriorityMin, plan.PriorityMax) - scoreByID[cand.Candidate.ID] = cand.Priority + id := cand.Candidate.ID + family, familyRank := signalPriorityMatch(policy, cand.Candidate.Hint, "") + familyByID[id] = family + familyRankByID[id] = familyRank + familyFloorByID[id] = signalPriorityTierFloor(familyRank) + baseTier := PriorityTierFromRange(cand.Priority, plan.PriorityMin, plan.PriorityMax) + tierByID[id] = applyTierFloor(baseTier, familyFloorByID[id]) + scoreByID[id] = cand.Priority } for id := range held { if isProtectedTier(tierByID[id]) { protected[id] = struct{}{} } } - displaceable := buildDisplaceableHold(held, protected, tierByID, scoreByID) + displaceable := buildDisplaceableHold(held, protected, tierByID, scoreByID, familyRankByID) opportunistic := map[int64]struct{}{} displacedHold := map[int64]struct{}{} for _, cand := range ranked { @@ -257,6 +285,7 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold continue } id := item.Candidate.ID + familyRankOut := familyRankForOutput(familyRankByID[id]) if _, ok := selected[id]; ok { item.Status = RefinementStatusAdmitted item.Reason = RefinementReasonAdmitted @@ -273,6 +302,9 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff item.Admission.Tier = tierByID[id] + item.Admission.TierFloor = familyFloorByID[id] + item.Admission.Family = familyByID[id] + item.Admission.FamilyRank = familyRankOut extras := []string{pressureReasonTag(admission.Pressure), "budget:" + slugToken(plan.BudgetSource)} if _, wasHeld := held[id]; wasHeld { extras = append(extras, "pressure:hold", ReasonTagHoldActive) @@ -296,6 +328,9 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff item.Admission.Tier = tierByID[id] + item.Admission.TierFloor = familyFloorByID[id] + item.Admission.Family = familyByID[id] + item.Admission.FamilyRank = familyRankOut item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:"+slugToken(plan.BudgetSource)) continue } @@ -309,6 +344,9 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff item.Admission.Tier = tierByID[id] + item.Admission.TierFloor = familyFloorByID[id] + item.Admission.Family = familyByID[id] + item.Admission.FamilyRank = familyRankOut item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", ReasonTagHoldActive, "budget:"+slugToken(plan.BudgetSource)) continue } @@ -321,6 +359,9 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff item.Admission.Tier = tierByID[id] + item.Admission.TierFloor = familyFloorByID[id] + item.Admission.Family = familyByID[id] + item.Admission.FamilyRank = familyRankOut extras := []string{pressureReasonTag(admission.Pressure), "pressure:budget", "budget:" + slugToken(plan.BudgetSource)} if _, ok := expired[id]; ok { extras = append(extras, ReasonTagHoldExpired) diff --git a/internal/pipeline/arbitration_reasons.go b/internal/pipeline/arbitration_reasons.go index 3694e96..ca8ae6a 100644 --- a/internal/pipeline/arbitration_reasons.go +++ b/internal/pipeline/arbitration_reasons.go @@ -17,12 +17,15 @@ const ( ) const ( - HoldReasonProfileArchive = "profile:archive" - HoldReasonProfileDigital = "profile:digital" - HoldReasonProfileAggressive = "profile:aggressive" - HoldReasonStrategyArchive = "strategy:archive" - HoldReasonStrategyDigital = "strategy:digital" - HoldReasonStrategyMultiRes = "strategy:multi-resolution" + HoldReasonProfileArchive = "profile:archive" + HoldReasonProfileDigital = "profile:digital" + HoldReasonProfileAggressive = "profile:aggressive" + HoldReasonStrategyArchive = "strategy:archive" + HoldReasonStrategyDigital = "strategy:digital" + HoldReasonStrategyMultiRes = "strategy:multi-resolution" + HoldReasonIntentArchive = "intent:archive" + HoldReasonIntentDecode = "intent:decode" + HoldReasonIntentSurveillance = "intent:surveillance" ) const ( diff --git a/internal/pipeline/decision_queue.go b/internal/pipeline/decision_queue.go index 605a4ec..3b1d072 100644 --- a/internal/pipeline/decision_queue.go +++ b/internal/pipeline/decision_queue.go @@ -57,6 +57,9 @@ type queueSelection struct { expired map[int64]struct{} scores map[int64]float64 tiers map[int64]string + families map[int64]string + familyRanks map[int64]int + tierFloors map[int64]string minScore float64 maxScore float64 cutoff float64 @@ -220,6 +223,9 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in expired: map[int64]struct{}{}, scores: map[int64]float64{}, tiers: map[int64]string{}, + families: map[int64]string{}, + familyRanks: map[int64]int{}, + tierFloors: map[int64]string{}, } if len(queue) == 0 { return selection @@ -243,6 +249,10 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in hint = qd.Class } policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName) + family, familyRank := signalPriorityMatch(policy, qd.Hint, qd.Class) + selection.families[id] = family + selection.familyRanks[id] = familyRank + selection.tierFloors[id] = signalPriorityTierFloor(familyRank) score := qd.SNRDb + boost + policyBoost selection.scores[id] = score if len(scoredList) == 0 || score < selection.minScore { @@ -257,7 +267,8 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in return scoredList[i].score > scoredList[j].score }) for id, score := range selection.scores { - selection.tiers[id] = PriorityTierFromRange(score, selection.minScore, selection.maxScore) + baseTier := PriorityTierFromRange(score, selection.minScore, selection.maxScore) + selection.tiers[id] = applyTierFloor(baseTier, selection.tierFloors[id]) } limit := max if limit <= 0 || limit > len(scoredList) { @@ -278,7 +289,7 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in } } } - displaceable := buildDisplaceableHold(selection.held, selection.protected, selection.tiers, selection.scores) + displaceable := buildDisplaceableHold(selection.held, selection.protected, selection.tiers, selection.scores, selection.familyRanks) for _, s := range scoredList { if _, ok := selection.selected[s.id]; ok { continue @@ -334,11 +345,12 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in return selection } -func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{}, tiers map[int64]string, scores map[int64]float64) []int64 { +func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{}, tiers map[int64]string, scores map[int64]float64, familyRanks map[int64]int) []int64 { type entry struct { - id int64 - rank int - score float64 + id int64 + rank int + familyOrder int + score float64 } candidates := make([]entry, 0, len(held)) for id := range held { @@ -349,10 +361,15 @@ func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{} if scores != nil { score = scores[id] } + familyRank := -1 + if familyRanks != nil { + familyRank = familyRanks[id] + } candidates = append(candidates, entry{ - id: id, - rank: priorityTierRank(tiers[id]), - score: score, + id: id, + rank: priorityTierRank(tiers[id]), + familyOrder: familyDisplaceOrder(familyRank), + score: score, }) } if len(candidates) == 0 { @@ -360,6 +377,9 @@ func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{} } sort.Slice(candidates, func(i, j int) bool { if candidates[i].rank == candidates[j].rank { + if candidates[i].familyOrder != candidates[j].familyOrder { + return candidates[i].familyOrder > candidates[j].familyOrder + } return candidates[i].score < candidates[j].score } return candidates[i].rank < candidates[j].rank @@ -382,6 +402,9 @@ func buildQueueAdmission(queueName string, id int64, selection queueSelection, p Cutoff: selection.cutoff, Tier: selection.tiers[id], } + admission.TierFloor = selection.tierFloors[id] + admission.Family = selection.families[id] + admission.FamilyRank = familyRankForOutput(selection.familyRanks[id]) if _, ok := selection.selected[id]; ok { if _, held := selection.held[id]; held { admission.Class = AdmissionClassHold diff --git a/internal/pipeline/priority.go b/internal/pipeline/priority.go index a9f3c6b..723fcaa 100644 --- a/internal/pipeline/priority.go +++ b/internal/pipeline/priority.go @@ -22,12 +22,15 @@ const ( ) type PriorityAdmission struct { - Tier string `json:"tier,omitempty"` - Class string `json:"class,omitempty"` - Score float64 `json:"score,omitempty"` - Cutoff float64 `json:"cutoff,omitempty"` - Basis string `json:"basis,omitempty"` - Reason string `json:"reason,omitempty"` + Tier string `json:"tier,omitempty"` + TierFloor string `json:"tier_floor,omitempty"` + Family string `json:"family,omitempty"` + FamilyRank int `json:"family_rank,omitempty"` + Class string `json:"class,omitempty"` + Score float64 `json:"score,omitempty"` + Cutoff float64 `json:"cutoff,omitempty"` + Basis string `json:"basis,omitempty"` + Reason string `json:"reason,omitempty"` } func PriorityTierFromRange(score, min, max float64) string { @@ -111,3 +114,60 @@ func slugToken(input string) string { parts := strings.Fields(input) return strings.Join(parts, "-") } + +func signalPriorityMatch(policy Policy, hint string, class string) (string, int) { + tag := strings.ToLower(strings.TrimSpace(hint)) + if tag == "" { + tag = strings.ToLower(strings.TrimSpace(class)) + } + if tag == "" || len(policy.SignalPriorities) == 0 { + return "", -1 + } + for i, want := range policy.SignalPriorities { + w := strings.ToLower(strings.TrimSpace(want)) + if w == "" { + continue + } + if strings.Contains(tag, w) || strings.Contains(w, tag) { + return w, i + } + } + return "", -1 +} + +func signalPriorityTierFloor(rank int) string { + switch rank { + case 0: + return PriorityTierHigh + case 1: + return PriorityTierMedium + case 2: + return PriorityTierLow + default: + return "" + } +} + +func applyTierFloor(tier string, floor string) string { + if floor == "" { + return tier + } + if priorityTierRank(tier) < priorityTierRank(floor) { + return floor + } + return tier +} + +func familyRankForOutput(rank int) int { + if rank < 0 { + return 0 + } + return rank + 1 +} + +func familyDisplaceOrder(rank int) int { + if rank < 0 { + return 100 + } + return rank +} diff --git a/internal/pipeline/scheduler.go b/internal/pipeline/scheduler.go index 48e0377..ef7b85c 100644 --- a/internal/pipeline/scheduler.go +++ b/internal/pipeline/scheduler.go @@ -6,11 +6,14 @@ import ( ) type ScheduledCandidate struct { - Candidate Candidate `json:"candidate"` - Priority float64 `json:"priority"` - Tier string `json:"tier,omitempty"` - Score *RefinementScore `json:"score,omitempty"` - Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` + Candidate Candidate `json:"candidate"` + Priority float64 `json:"priority"` + Tier string `json:"tier,omitempty"` + TierFloor string `json:"tier_floor,omitempty"` + Family string `json:"family,omitempty"` + FamilyRank int `json:"family_rank,omitempty"` + Score *RefinementScore `json:"score,omitempty"` + Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` } type RefinementScoreModel struct { @@ -120,6 +123,9 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { for _, c := range candidates { candidate := c RefreshCandidateEvidenceState(&candidate) + family, familyRank := signalPriorityMatch(policy, candidate.Hint, "") + familyFloor := signalPriorityTierFloor(familyRank) + familyRankOut := familyRankForOutput(familyRank) if !candidateInMonitor(policy, candidate) { plan.DroppedByMonitor++ workItems = append(workItems, RefinementWorkItem{ @@ -127,10 +133,13 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { Status: RefinementStatusDropped, Reason: RefinementReasonMonitorGate, Admission: &PriorityAdmission{ - Tier: PriorityTierBackground, - Class: AdmissionClassDrop, - Basis: "refinement", - Reason: admissionReason(RefinementReasonMonitorGate, policy, holdPolicy), + Tier: PriorityTierBackground, + TierFloor: familyFloor, + Family: family, + FamilyRank: familyRankOut, + Class: AdmissionClassDrop, + Basis: "refinement", + Reason: admissionReason(RefinementReasonMonitorGate, policy, holdPolicy), }, }) continue @@ -142,10 +151,13 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { Status: RefinementStatusDropped, Reason: RefinementReasonBelowSNR, Admission: &PriorityAdmission{ - Tier: PriorityTierBackground, - Class: AdmissionClassDrop, - Basis: "refinement", - Reason: admissionReason(RefinementReasonBelowSNR, policy, holdPolicy), + Tier: PriorityTierBackground, + TierFloor: familyFloor, + Family: family, + FamilyRank: familyRankOut, + Class: AdmissionClassDrop, + Basis: "refinement", + Reason: admissionReason(RefinementReasonBelowSNR, policy, holdPolicy), }, }) continue @@ -180,10 +192,13 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { Weights: &scoreModel, } scored = append(scored, ScheduledCandidate{ - Candidate: candidate, - Priority: priority, - Score: score, - Breakdown: &score.Breakdown, + Candidate: candidate, + Priority: priority, + TierFloor: familyFloor, + Family: family, + FamilyRank: familyRankOut, + Score: score, + Breakdown: &score.Breakdown, }) workItems = append(workItems, RefinementWorkItem{ Candidate: candidate, @@ -193,10 +208,13 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { Status: RefinementStatusPlanned, Reason: RefinementReasonPlanned, Admission: &PriorityAdmission{ - Class: AdmissionClassPlanned, - Score: priority, - Basis: "refinement", - Reason: admissionReason(RefinementReasonPlanned, policy, holdPolicy), + Class: AdmissionClassPlanned, + TierFloor: familyFloor, + Family: family, + FamilyRank: familyRankOut, + Score: priority, + Basis: "refinement", + Reason: admissionReason(RefinementReasonPlanned, policy, holdPolicy), }, }) } @@ -223,7 +241,8 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { plan.PriorityMax = maxPriority plan.PriorityAvg = sumPriority / float64(len(scored)) for i := range scored { - scored[i].Tier = PriorityTierFromRange(scored[i].Priority, minPriority, maxPriority) + baseTier := PriorityTierFromRange(scored[i].Priority, minPriority, maxPriority) + scored[i].Tier = applyTierFloor(baseTier, scored[i].TierFloor) } for i := range workItems { if workItems[i].Admission == nil { @@ -232,7 +251,8 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { if workItems[i].Status != RefinementStatusPlanned { continue } - workItems[i].Admission.Tier = PriorityTierFromRange(workItems[i].Priority, minPriority, maxPriority) + baseTier := PriorityTierFromRange(workItems[i].Priority, minPriority, maxPriority) + workItems[i].Admission.Tier = applyTierFloor(baseTier, workItems[i].Admission.TierFloor) } } plan.Ranked = append(plan.Ranked, scored...)