Переглянути джерело

Add priority tiers and admission classes to pipeline

master
Jan Svabenik 7 години тому
джерело
коміт
4ebd51d22b
5 змінених файлів з 251 додано та 21 видалено
  1. +37
    -0
      internal/pipeline/arbitration.go
  2. +76
    -16
      internal/pipeline/decision_queue.go
  3. +7
    -5
      internal/pipeline/decisions.go
  4. +98
    -0
      internal/pipeline/priority.go
  5. +33
    -0
      internal/pipeline/scheduler.go

+ 37
- 0
internal/pipeline/arbitration.go Переглянути файл

@@ -30,6 +30,7 @@ type RefinementAdmission struct {
Skipped int `json:"skipped"` Skipped int `json:"skipped"`
Displaced int `json:"displaced"` Displaced int `json:"displaced"`
PriorityCutoff float64 `json:"priority_cutoff,omitempty"` PriorityCutoff float64 `json:"priority_cutoff,omitempty"`
PriorityTier string `json:"priority_tier,omitempty"`
Reason string `json:"reason,omitempty"` Reason string `json:"reason,omitempty"`
} }


@@ -123,11 +124,13 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
planned := len(ranked) planned := len(ranked)
admission.Planned = planned admission.Planned = planned
selected := map[int64]struct{}{} selected := map[int64]struct{}{}
held := map[int64]struct{}{}
if hold != nil { if hold != nil {
purgeHold(hold.Active, now) purgeHold(hold.Active, now)
for id := range hold.Active { for id := range hold.Active {
if rankedContains(ranked, id) { if rankedContains(ranked, id) {
selected[id] = struct{}{} selected[id] = struct{}{}
held[id] = struct{}{}
} }
} }
} }
@@ -185,6 +188,10 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
} }
} }
admission.Displaced = len(displaced) admission.Displaced = len(displaced)
admission.PriorityTier = PriorityTierFromRange(admission.PriorityCutoff, plan.PriorityMin, plan.PriorityMax)
if admission.PriorityCutoff > 0 {
admission.Reason = admissionReason("admission:budget", policy, holdPolicy, "budget:"+slugToken(plan.BudgetSource))
}


plan.Selected = admitted plan.Selected = admitted
plan.PriorityCutoff = admission.PriorityCutoff plan.PriorityCutoff = admission.PriorityCutoff
@@ -198,15 +205,45 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
if _, ok := selected[id]; ok { if _, ok := selected[id]; ok {
item.Status = RefinementStatusAdmitted item.Status = RefinementStatusAdmitted
item.Reason = RefinementReasonAdmitted item.Reason = RefinementReasonAdmitted
class := AdmissionClassAdmit
reason := "refinement:admit:budget"
if _, wasHeld := held[id]; wasHeld {
class = AdmissionClassHold
reason = "refinement:admit:hold"
}
if item.Admission == nil {
item.Admission = &PriorityAdmission{Basis: "refinement"}
}
item.Admission.Class = class
item.Admission.Score = item.Priority
item.Admission.Cutoff = admission.PriorityCutoff
item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax)
item.Admission.Reason = admissionReason(reason, policy, holdPolicy, "budget:"+slugToken(plan.BudgetSource))
continue continue
} }
if _, ok := displaced[id]; ok { if _, ok := displaced[id]; ok {
item.Status = RefinementStatusDisplaced item.Status = RefinementStatusDisplaced
item.Reason = RefinementReasonDisplaced item.Reason = RefinementReasonDisplaced
if item.Admission == nil {
item.Admission = &PriorityAdmission{Basis: "refinement"}
}
item.Admission.Class = AdmissionClassDisplace
item.Admission.Score = item.Priority
item.Admission.Cutoff = admission.PriorityCutoff
item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax)
item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, "pressure:hold", "budget:"+slugToken(plan.BudgetSource))
continue continue
} }
item.Status = RefinementStatusSkipped item.Status = RefinementStatusSkipped
item.Reason = RefinementReasonBudget item.Reason = RefinementReasonBudget
if item.Admission == nil {
item.Admission = &PriorityAdmission{Basis: "refinement"}
}
item.Admission.Class = AdmissionClassDefer
item.Admission.Score = item.Priority
item.Admission.Cutoff = admission.PriorityCutoff
item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax)
item.Admission.Reason = admissionReason("refinement:skip:budget", policy, holdPolicy, "pressure:budget", "budget:"+slugToken(plan.BudgetSource))
} }
return RefinementAdmissionResult{ return RefinementAdmissionResult{
Plan: plan, Plan: plan,


+ 76
- 16
internal/pipeline/decision_queue.go Переглянути файл

@@ -32,6 +32,15 @@ type queuedDecision struct {
LastSeen time.Time LastSeen time.Time
} }


type queueSelection struct {
selected map[int64]struct{}
held map[int64]struct{}
scores map[int64]float64
minScore float64
maxScore float64
cutoff float64
}

type decisionQueues struct { type decisionQueues struct {
record map[int64]*queuedDecision record map[int64]*queuedDecision
decode map[int64]*queuedDecision decode map[int64]*queuedDecision
@@ -107,8 +116,8 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel,
stats := DecisionQueueStats{ stats := DecisionQueueStats{
RecordQueued: len(dq.record), RecordQueued: len(dq.record),
DecodeQueued: len(dq.decode), DecodeQueued: len(dq.decode),
RecordSelected: len(recSelected),
DecodeSelected: len(decSelected),
RecordSelected: len(recSelected.selected),
DecodeSelected: len(decSelected.selected),
RecordActive: len(dq.recordHold), RecordActive: len(dq.recordHold),
DecodeActive: len(dq.decodeHold), DecodeActive: len(dq.decodeHold),
RecordOldestS: oldestAge(dq.record, now), RecordOldestS: oldestAge(dq.record, now),
@@ -123,17 +132,19 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel,
for i := range decisions { for i := range decisions {
id := decisions[i].Candidate.ID id := decisions[i].Candidate.ID
if decisions[i].ShouldRecord { if decisions[i].ShouldRecord {
if _, ok := recSelected[id]; !ok {
decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source)
if _, ok := recSelected.selected[id]; !ok {
decisions[i].ShouldRecord = false decisions[i].ShouldRecord = false
decisions[i].Reason = DecisionReasonQueueRecord
decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, "pressure:budget", "budget:"+slugToken(budget.Record.Source))
stats.RecordDropped++ stats.RecordDropped++
} }
} }
if decisions[i].ShouldAutoDecode { if decisions[i].ShouldAutoDecode {
if _, ok := decSelected[id]; !ok {
decisions[i].DecodeAdmission = buildQueueAdmission("decode", id, decSelected, policy, holdPolicy, budget.Decode.Source)
if _, ok := decSelected.selected[id]; !ok {
decisions[i].ShouldAutoDecode = false decisions[i].ShouldAutoDecode = false
if decisions[i].Reason == "" { if decisions[i].Reason == "" {
decisions[i].Reason = DecisionReasonQueueDecode
decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, "pressure:budget", "budget:"+slugToken(budget.Decode.Source))
} }
stats.DecodeDropped++ stats.DecodeDropped++
} }
@@ -142,10 +153,14 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel,
return stats return stats
} }


func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy) map[int64]struct{} {
selected := map[int64]struct{}{}
func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy) queueSelection {
selection := queueSelection{
selected: map[int64]struct{}{},
held: map[int64]struct{}{},
scores: map[int64]float64{},
}
if len(queue) == 0 { if len(queue) == 0 {
return selected
return selection
} }
type scored struct { type scored struct {
id int64 id int64
@@ -163,7 +178,15 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in
hint = qd.Class hint = qd.Class
} }
policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName) policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName)
scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost})
score := qd.SNRDb + boost + policyBoost
selection.scores[id] = score
if len(scoredList) == 0 || score < selection.minScore {
selection.minScore = score
}
if len(scoredList) == 0 || score > selection.maxScore {
selection.maxScore = score
}
scoredList = append(scoredList, scored{id: id, score: score})
} }
sort.Slice(scoredList, func(i, j int) bool { sort.Slice(scoredList, func(i, j int) bool {
return scoredList[i].score > scoredList[j].score return scoredList[i].score > scoredList[j].score
@@ -180,24 +203,61 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in
} }
for id := range hold { for id := range hold {
if _, ok := queue[id]; ok { if _, ok := queue[id]; ok {
selected[id] = struct{}{}
selection.selected[id] = struct{}{}
selection.held[id] = struct{}{}
} }
} }
for _, s := range scoredList { for _, s := range scoredList {
if len(selected) >= limit {
if len(selection.selected) >= limit {
break break
} }
if _, ok := selected[s.id]; ok {
if _, ok := selection.selected[s.id]; ok {
continue continue
} }
selected[s.id] = struct{}{}
selection.selected[s.id] = struct{}{}
} }
if holdDur > 0 { if holdDur > 0 {
for id := range selected {
for id := range selection.selected {
hold[id] = now.Add(holdDur) hold[id] = now.Add(holdDur)
} }
} }
return selected
if len(selection.selected) > 0 {
first := true
for id := range selection.selected {
score := selection.scores[id]
if first || score < selection.cutoff {
selection.cutoff = score
first = false
}
}
}
return selection
}

func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string) *PriorityAdmission {
score, ok := selection.scores[id]
if !ok {
return nil
}
admission := &PriorityAdmission{
Basis: queueName,
Score: score,
Cutoff: selection.cutoff,
Tier: PriorityTierFromRange(score, selection.minScore, selection.maxScore),
}
if _, ok := selection.selected[id]; ok {
if _, held := selection.held[id]; held {
admission.Class = AdmissionClassHold
admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, "pressure:hold", "budget:"+slugToken(budgetSource))
} else {
admission.Class = AdmissionClassAdmit
admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, "budget:"+slugToken(budgetSource))
}
return admission
}
admission.Class = AdmissionClassDefer
admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, "pressure:budget", "budget:"+slugToken(budgetSource))
return admission
} }


func purgeExpired(hold map[int64]time.Time, now time.Time) { func purgeExpired(hold map[int64]time.Time, now time.Time) {


+ 7
- 5
internal/pipeline/decisions.go Переглянути файл

@@ -7,11 +7,13 @@ import (
) )


type SignalDecision struct { type SignalDecision struct {
Candidate Candidate `json:"candidate"`
Class string `json:"class,omitempty"`
ShouldRecord bool `json:"should_record"`
ShouldAutoDecode bool `json:"should_auto_decode"`
Reason string `json:"reason,omitempty"`
Candidate Candidate `json:"candidate"`
Class string `json:"class,omitempty"`
ShouldRecord bool `json:"should_record"`
ShouldAutoDecode bool `json:"should_auto_decode"`
Reason string `json:"reason,omitempty"`
RecordAdmission *PriorityAdmission `json:"record_admission,omitempty"`
DecodeAdmission *PriorityAdmission `json:"decode_admission,omitempty"`
} }


func DecideSignalAction(policy Policy, candidate Candidate, cls *classifier.Classification) SignalDecision { func DecideSignalAction(policy Policy, candidate Candidate, cls *classifier.Classification) SignalDecision {


+ 98
- 0
internal/pipeline/priority.go Переглянути файл

@@ -0,0 +1,98 @@
package pipeline

import (
"strings"
)

const (
PriorityTierCritical = "critical"
PriorityTierHigh = "high"
PriorityTierMedium = "medium"
PriorityTierLow = "low"
PriorityTierBackground = "background"
)

const (
AdmissionClassPlanned = "plan"
AdmissionClassAdmit = "admit"
AdmissionClassHold = "hold"
AdmissionClassDefer = "defer"
AdmissionClassDisplace = "displace"
AdmissionClassDrop = "drop"
)

type PriorityAdmission struct {
Tier string `json:"tier,omitempty"`
Class string `json:"class,omitempty"`
Score float64 `json:"score,omitempty"`
Cutoff float64 `json:"cutoff,omitempty"`
Basis string `json:"basis,omitempty"`
Reason string `json:"reason,omitempty"`
}

func PriorityTierFromRange(score, min, max float64) string {
if max <= min {
return PriorityTierHigh
}
norm := (score - min) / (max - min)
switch {
case norm >= 0.85:
return PriorityTierCritical
case norm >= 0.65:
return PriorityTierHigh
case norm >= 0.45:
return PriorityTierMedium
case norm >= 0.25:
return PriorityTierLow
default:
return PriorityTierBackground
}
}

func admissionReason(base string, policy Policy, holdPolicy HoldPolicy, extras ...string) string {
tags := uniqueReasonTags(policy, holdPolicy, extras...)
if len(tags) == 0 {
return base
}
return base + ":" + strings.Join(tags, ":")
}

func uniqueReasonTags(policy Policy, holdPolicy HoldPolicy, extras ...string) []string {
seen := map[string]struct{}{}
tags := make([]string, 0, 6)
add := func(tag string) {
if tag == "" {
return
}
if _, ok := seen[tag]; ok {
return
}
seen[tag] = struct{}{}
tags = append(tags, tag)
}
if policy.Profile != "" {
add("profile:" + slugToken(policy.Profile))
}
if policy.Intent != "" {
add("intent:" + slugToken(policy.Intent))
}
if policy.RefinementStrategy != "" {
add("strategy:" + slugToken(policy.RefinementStrategy))
}
for _, reason := range holdPolicy.Reasons {
add(reason)
}
for _, extra := range extras {
add(extra)
}
return tags
}

func slugToken(input string) string {
input = strings.TrimSpace(strings.ToLower(input))
if input == "" {
return ""
}
parts := strings.Fields(input)
return strings.Join(parts, "-")
}

+ 33
- 0
internal/pipeline/scheduler.go Переглянути файл

@@ -8,6 +8,7 @@ import (
type ScheduledCandidate struct { type ScheduledCandidate struct {
Candidate Candidate `json:"candidate"` Candidate Candidate `json:"candidate"`
Priority float64 `json:"priority"` Priority float64 `json:"priority"`
Tier string `json:"tier,omitempty"`
Score *RefinementScore `json:"score,omitempty"` Score *RefinementScore `json:"score,omitempty"`
Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"`
} }
@@ -43,6 +44,7 @@ type RefinementWorkItem struct {
Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"`
Status string `json:"status,omitempty"` Status string `json:"status,omitempty"`
Reason string `json:"reason,omitempty"` Reason string `json:"reason,omitempty"`
Admission *PriorityAdmission `json:"admission,omitempty"`
} }


type RefinementExecution struct { type RefinementExecution struct {
@@ -85,6 +87,7 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan {
strategy, strategyReason := refinementStrategy(policy) strategy, strategyReason := refinementStrategy(policy)
budgetModel := BudgetModelFromPolicy(policy) budgetModel := BudgetModelFromPolicy(policy)
budget := budgetModel.Refinement.Max budget := budgetModel.Refinement.Max
holdPolicy := HoldPolicyFromPolicy(policy)
plan := RefinementPlan{ plan := RefinementPlan{
TotalCandidates: len(candidates), TotalCandidates: len(candidates),
MinCandidateSNRDb: policy.MinCandidateSNRDb, MinCandidateSNRDb: policy.MinCandidateSNRDb,
@@ -123,6 +126,12 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan {
Candidate: candidate, Candidate: candidate,
Status: RefinementStatusDropped, Status: RefinementStatusDropped,
Reason: RefinementReasonMonitorGate, Reason: RefinementReasonMonitorGate,
Admission: &PriorityAdmission{
Tier: PriorityTierBackground,
Class: AdmissionClassDrop,
Basis: "refinement",
Reason: admissionReason(RefinementReasonMonitorGate, policy, holdPolicy),
},
}) })
continue continue
} }
@@ -132,6 +141,12 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan {
Candidate: candidate, Candidate: candidate,
Status: RefinementStatusDropped, Status: RefinementStatusDropped,
Reason: RefinementReasonBelowSNR, Reason: RefinementReasonBelowSNR,
Admission: &PriorityAdmission{
Tier: PriorityTierBackground,
Class: AdmissionClassDrop,
Basis: "refinement",
Reason: admissionReason(RefinementReasonBelowSNR, policy, holdPolicy),
},
}) })
continue continue
} }
@@ -177,6 +192,12 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan {
Breakdown: &score.Breakdown, Breakdown: &score.Breakdown,
Status: RefinementStatusPlanned, Status: RefinementStatusPlanned,
Reason: RefinementReasonPlanned, Reason: RefinementReasonPlanned,
Admission: &PriorityAdmission{
Class: AdmissionClassPlanned,
Score: priority,
Basis: "refinement",
Reason: admissionReason(RefinementReasonPlanned, policy, holdPolicy),
},
}) })
} }
sort.Slice(scored, func(i, j int) bool { sort.Slice(scored, func(i, j int) bool {
@@ -201,6 +222,18 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan {
plan.PriorityMin = minPriority plan.PriorityMin = minPriority
plan.PriorityMax = maxPriority plan.PriorityMax = maxPriority
plan.PriorityAvg = sumPriority / float64(len(scored)) plan.PriorityAvg = sumPriority / float64(len(scored))
for i := range scored {
scored[i].Tier = PriorityTierFromRange(scored[i].Priority, minPriority, maxPriority)
}
for i := range workItems {
if workItems[i].Admission == nil {
continue
}
if workItems[i].Status != RefinementStatusPlanned {
continue
}
workItems[i].Admission.Tier = PriorityTierFromRange(workItems[i].Priority, minPriority, maxPriority)
}
} }
plan.Ranked = append(plan.Ranked, scored...) plan.Ranked = append(plan.Ranked, scored...)
plan.WorkItems = workItems plan.WorkItems = workItems


Завантаження…
Відмінити
Зберегти