Просмотр исходного кода

pipeline: deepen hold/displacement semantics

master
Jan Svabenik 4 часов назад
Родитель
Сommit
592fa03b9c
7 измененных файлов: 478 добавлений и 83 удалений
  1. +88
    -21
      internal/pipeline/arbitration.go
  2. +9
    -0
      internal/pipeline/arbitration_reasons.go
  3. +200
    -59
      internal/pipeline/decision_queue.go
  4. +118
    -2
      internal/pipeline/decision_queue_test.go
  5. +21
    -0
      internal/pipeline/hold.go
  6. +15
    -0
      internal/pipeline/priority.go
  7. +27
    -1
      internal/pipeline/scheduler_test.go

+ 88
- 21
internal/pipeline/arbitration.go Просмотреть файл

@@ -29,6 +29,12 @@ type RefinementAdmission struct {
Admitted int `json:"admitted"`
Skipped int `json:"skipped"`
Displaced int `json:"displaced"`
HoldActive int `json:"hold_active"`
HoldSelected int `json:"hold_selected"`
HoldProtected int `json:"hold_protected"`
HoldExpired int `json:"hold_expired"`
HoldDisplaced int `json:"hold_displaced"`
Opportunistic int `json:"opportunistic"`
PriorityCutoff float64 `json:"priority_cutoff,omitempty"`
PriorityTier string `json:"priority_tier,omitempty"`
Reason string `json:"reason,omitempty"`
@@ -127,8 +133,10 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
admission.Planned = planned
selected := map[int64]struct{}{}
held := map[int64]struct{}{}
protected := map[int64]struct{}{}
expired := map[int64]struct{}{}
if hold != nil {
purgeHold(hold.Active, now)
expired = expireHold(hold.Active, now)
for id := range hold.Active {
if rankedContains(ranked, id) {
selected[id] = struct{}{}
@@ -146,20 +154,49 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
limit = planned
}
}
tierByID := map[int64]string{}
scoreByID := map[int64]float64{}
for _, cand := range ranked {
if len(selected) >= limit {
break
tierByID[cand.Candidate.ID] = PriorityTierFromRange(cand.Priority, plan.PriorityMin, plan.PriorityMax)
scoreByID[cand.Candidate.ID] = cand.Priority
}
for id := range held {
if isProtectedTier(tierByID[id]) {
protected[id] = struct{}{}
}
}
displaceable := buildDisplaceableHold(held, protected, tierByID, scoreByID)
opportunistic := map[int64]struct{}{}
displacedHold := map[int64]struct{}{}
for _, cand := range ranked {
if _, ok := selected[cand.Candidate.ID]; ok {
continue
}
if len(selected) < limit {
selected[cand.Candidate.ID] = struct{}{}
continue
}
if len(displaceable) == 0 {
continue
}
target := displaceable[0]
if priorityTierRank(tierByID[cand.Candidate.ID]) <= priorityTierRank(tierByID[target]) {
continue
}
displaceable = displaceable[1:]
delete(selected, target)
displacedHold[target] = struct{}{}
selected[cand.Candidate.ID] = struct{}{}
opportunistic[cand.Candidate.ID] = struct{}{}
}
if hold != nil && admission.HoldMs > 0 {
until := now.Add(time.Duration(admission.HoldMs) * time.Millisecond)
if hold.Active == nil {
hold.Active = map[int64]time.Time{}
}
for id := range displacedHold {
delete(hold.Active, id)
}
for id := range selected {
hold.Active[id] = until
}
@@ -176,8 +213,16 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
if admission.Skipped < 0 {
admission.Skipped = 0
}
if hold != nil {
admission.HoldActive = len(hold.Active)
}
admission.HoldSelected = len(held) - len(displacedHold)
admission.HoldProtected = len(protected)
admission.HoldExpired = len(expired)
admission.HoldDisplaced = len(displacedHold)
admission.Opportunistic = len(opportunistic)

displaced := map[int64]struct{}{}
displacedByHold := map[int64]struct{}{}
if len(admitted) > 0 {
admission.PriorityCutoff = admitted[len(admitted)-1].Priority
for _, cand := range ranked {
@@ -185,11 +230,14 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
continue
}
if cand.Priority >= admission.PriorityCutoff {
displaced[cand.Candidate.ID] = struct{}{}
if _, ok := displacedHold[cand.Candidate.ID]; ok {
continue
}
displacedByHold[cand.Candidate.ID] = struct{}{}
}
}
}
admission.Displaced = len(displaced)
admission.Displaced = len(displacedByHold) + len(displacedHold)
admission.PriorityTier = PriorityTierFromRange(admission.PriorityCutoff, plan.PriorityMin, plan.PriorityMax)
admission.Pressure = buildRefinementPressure(budgetModel, admission)
if admission.PriorityCutoff > 0 {
@@ -220,11 +268,34 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
item.Admission.Class = class
item.Admission.Score = item.Priority
item.Admission.Cutoff = admission.PriorityCutoff
item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax)
item.Admission.Reason = admissionReason(reason, policy, holdPolicy, pressureReasonTag(admission.Pressure), "budget:"+slugToken(plan.BudgetSource))
item.Admission.Tier = tierByID[id]
extras := []string{pressureReasonTag(admission.Pressure), "budget:" + slugToken(plan.BudgetSource)}
if _, wasHeld := held[id]; wasHeld {
extras = append(extras, "pressure:hold", ReasonTagHoldActive)
if _, ok := protected[id]; ok {
extras = append(extras, ReasonTagHoldProtected)
}
}
if _, ok := opportunistic[id]; ok {
extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced)
}
item.Admission.Reason = admissionReason(reason, policy, holdPolicy, extras...)
continue
}
if _, ok := displacedHold[id]; ok {
item.Status = RefinementStatusDisplaced
item.Reason = RefinementReasonDisplaced
if item.Admission == nil {
item.Admission = &PriorityAdmission{Basis: "refinement"}
}
item.Admission.Class = AdmissionClassDisplace
item.Admission.Score = item.Priority
item.Admission.Cutoff = admission.PriorityCutoff
item.Admission.Tier = tierByID[id]
item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:"+slugToken(plan.BudgetSource))
continue
}
if _, ok := displaced[id]; ok {
if _, ok := displacedByHold[id]; ok {
item.Status = RefinementStatusDisplaced
item.Reason = RefinementReasonDisplaced
if item.Admission == nil {
@@ -233,8 +304,8 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
item.Admission.Class = AdmissionClassDisplace
item.Admission.Score = item.Priority
item.Admission.Cutoff = admission.PriorityCutoff
item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax)
item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", "budget:"+slugToken(plan.BudgetSource))
item.Admission.Tier = tierByID[id]
item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", ReasonTagHoldActive, "budget:"+slugToken(plan.BudgetSource))
continue
}
item.Status = RefinementStatusSkipped
@@ -245,8 +316,12 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
item.Admission.Class = AdmissionClassDefer
item.Admission.Score = item.Priority
item.Admission.Cutoff = admission.PriorityCutoff
item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax)
item.Admission.Reason = admissionReason("refinement:skip:budget", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:budget", "budget:"+slugToken(plan.BudgetSource))
item.Admission.Tier = tierByID[id]
extras := []string{pressureReasonTag(admission.Pressure), "pressure:budget", "budget:" + slugToken(plan.BudgetSource)}
if _, ok := expired[id]; ok {
extras = append(extras, ReasonTagHoldExpired)
}
item.Admission.Reason = admissionReason("refinement:skip:budget", policy, holdPolicy, extras...)
}
return RefinementAdmissionResult{
Plan: plan,
@@ -256,14 +331,6 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold
}
}

func purgeHold(active map[int64]time.Time, now time.Time) {
for id, until := range active {
if now.After(until) {
delete(active, id)
}
}
}

func rankedContains(items []ScheduledCandidate, id int64) bool {
for _, item := range items {
if item.Candidate.ID == id {


+ 9
- 0
internal/pipeline/arbitration_reasons.go Просмотреть файл

@@ -24,3 +24,12 @@ const (
HoldReasonStrategyDigital = "strategy:digital"
HoldReasonStrategyMultiRes = "strategy:multi-resolution"
)

const (
ReasonTagHoldActive = "hold:active"
ReasonTagHoldExpired = "hold:expired"
ReasonTagHoldProtected = "hold:protected"
ReasonTagHoldDisplaced = "hold:displaced"
ReasonTagDisplaceOpportunist = "displace:opportunistic"
ReasonTagDisplaceTier = "displace:tier"
)

+ 200
- 59
internal/pipeline/decision_queue.go Просмотреть файл

@@ -6,21 +6,33 @@ import (
)

type DecisionQueueStats struct {
RecordQueued int `json:"record_queued"`
DecodeQueued int `json:"decode_queued"`
RecordSelected int `json:"record_selected"`
DecodeSelected int `json:"decode_selected"`
RecordActive int `json:"record_active"`
DecodeActive int `json:"decode_active"`
RecordOldestS float64 `json:"record_oldest_sec"`
DecodeOldestS float64 `json:"decode_oldest_sec"`
RecordBudget int `json:"record_budget"`
DecodeBudget int `json:"decode_budget"`
HoldMs int `json:"hold_ms"`
RecordHoldMs int `json:"record_hold_ms"`
DecodeHoldMs int `json:"decode_hold_ms"`
RecordDropped int `json:"record_dropped"`
DecodeDropped int `json:"decode_dropped"`
RecordQueued int `json:"record_queued"`
DecodeQueued int `json:"decode_queued"`
RecordSelected int `json:"record_selected"`
DecodeSelected int `json:"decode_selected"`
RecordActive int `json:"record_active"`
DecodeActive int `json:"decode_active"`
RecordOldestS float64 `json:"record_oldest_sec"`
DecodeOldestS float64 `json:"decode_oldest_sec"`
RecordBudget int `json:"record_budget"`
DecodeBudget int `json:"decode_budget"`
HoldMs int `json:"hold_ms"`
RecordHoldMs int `json:"record_hold_ms"`
DecodeHoldMs int `json:"decode_hold_ms"`
RecordDropped int `json:"record_dropped"`
DecodeDropped int `json:"decode_dropped"`
RecordHoldSelected int `json:"record_hold_selected"`
DecodeHoldSelected int `json:"decode_hold_selected"`
RecordHoldProtected int `json:"record_hold_protected"`
DecodeHoldProtected int `json:"decode_hold_protected"`
RecordHoldExpired int `json:"record_hold_expired"`
DecodeHoldExpired int `json:"decode_hold_expired"`
RecordHoldDisplaced int `json:"record_hold_displaced"`
DecodeHoldDisplaced int `json:"decode_hold_displaced"`
RecordOpportunistic int `json:"record_opportunistic"`
DecodeOpportunistic int `json:"decode_opportunistic"`
RecordDisplaced int `json:"record_displaced"`
DecodeDisplaced int `json:"decode_displaced"`
}

type queuedDecision struct {
@@ -33,12 +45,18 @@ type queuedDecision struct {
}

type queueSelection struct {
selected map[int64]struct{}
held map[int64]struct{}
scores map[int64]float64
minScore float64
maxScore float64
cutoff float64
selected map[int64]struct{}
held map[int64]struct{}
protected map[int64]struct{}
displacedByHold map[int64]struct{}
displaced map[int64]struct{}
opportunistic map[int64]struct{}
expired map[int64]struct{}
scores map[int64]float64
tiers map[int64]string
minScore float64
maxScore float64
cutoff float64
}

type decisionQueues struct {
@@ -107,30 +125,42 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel,
}
}

purgeExpired(dq.recordHold, now)
purgeExpired(dq.decodeHold, now)
recExpired := expireHold(dq.recordHold, now)
decExpired := expireHold(dq.decodeHold, now)

recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy)
decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy)
recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy, recExpired)
decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy, decExpired)
recPressure := buildQueuePressure(budget.Record, len(dq.record), len(recSelected.selected), len(dq.recordHold))
decPressure := buildQueuePressure(budget.Decode, len(dq.decode), len(decSelected.selected), len(dq.decodeHold))
recPressureTag := pressureReasonTag(recPressure)
decPressureTag := pressureReasonTag(decPressure)

stats := DecisionQueueStats{
RecordQueued: len(dq.record),
DecodeQueued: len(dq.decode),
RecordSelected: len(recSelected.selected),
DecodeSelected: len(decSelected.selected),
RecordActive: len(dq.recordHold),
DecodeActive: len(dq.decodeHold),
RecordOldestS: oldestAge(dq.record, now),
DecodeOldestS: oldestAge(dq.decode, now),
RecordBudget: budget.Record.Max,
DecodeBudget: budget.Decode.Max,
HoldMs: budget.HoldMs,
RecordHoldMs: holdPolicy.RecordMs,
DecodeHoldMs: holdPolicy.DecodeMs,
RecordQueued: len(dq.record),
DecodeQueued: len(dq.decode),
RecordSelected: len(recSelected.selected),
DecodeSelected: len(decSelected.selected),
RecordActive: len(dq.recordHold),
DecodeActive: len(dq.decodeHold),
RecordOldestS: oldestAge(dq.record, now),
DecodeOldestS: oldestAge(dq.decode, now),
RecordBudget: budget.Record.Max,
DecodeBudget: budget.Decode.Max,
HoldMs: budget.HoldMs,
RecordHoldMs: holdPolicy.RecordMs,
DecodeHoldMs: holdPolicy.DecodeMs,
RecordHoldSelected: len(recSelected.held) - len(recSelected.displaced),
DecodeHoldSelected: len(decSelected.held) - len(decSelected.displaced),
RecordHoldProtected: len(recSelected.protected),
DecodeHoldProtected: len(decSelected.protected),
RecordHoldExpired: len(recExpired),
DecodeHoldExpired: len(decExpired),
RecordHoldDisplaced: len(recSelected.displaced),
DecodeHoldDisplaced: len(decSelected.displaced),
RecordOpportunistic: len(recSelected.opportunistic),
DecodeOpportunistic: len(decSelected.opportunistic),
RecordDisplaced: len(recSelected.displacedByHold),
DecodeDisplaced: len(decSelected.displacedByHold),
}

for i := range decisions {
@@ -139,7 +169,15 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel,
decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source, recPressureTag)
if _, ok := recSelected.selected[id]; !ok {
decisions[i].ShouldRecord = false
decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, recPressureTag, "pressure:budget", "budget:"+slugToken(budget.Record.Source))
extras := []string{recPressureTag, "pressure:budget", "budget:" + slugToken(budget.Record.Source)}
if _, ok := recSelected.displaced[id]; ok {
extras = []string{recPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Record.Source)}
} else if _, ok := recSelected.displacedByHold[id]; ok {
extras = []string{recPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Record.Source)}
} else if _, ok := recSelected.expired[id]; ok {
extras = append(extras, ReasonTagHoldExpired)
}
decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, extras...)
stats.RecordDropped++
}
}
@@ -148,7 +186,15 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel,
if _, ok := decSelected.selected[id]; !ok {
decisions[i].ShouldAutoDecode = false
if decisions[i].Reason == "" {
decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, decPressureTag, "pressure:budget", "budget:"+slugToken(budget.Decode.Source))
extras := []string{decPressureTag, "pressure:budget", "budget:" + slugToken(budget.Decode.Source)}
if _, ok := decSelected.displaced[id]; ok {
extras = []string{decPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Decode.Source)}
} else if _, ok := decSelected.displacedByHold[id]; ok {
extras = []string{decPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Decode.Source)}
} else if _, ok := decSelected.expired[id]; ok {
extras = append(extras, ReasonTagHoldExpired)
}
decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, extras...)
}
stats.DecodeDropped++
}
@@ -157,15 +203,24 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel,
return stats
}

func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy) queueSelection {
func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy, expired map[int64]struct{}) queueSelection {
selection := queueSelection{
selected: map[int64]struct{}{},
held: map[int64]struct{}{},
scores: map[int64]float64{},
selected: map[int64]struct{}{},
held: map[int64]struct{}{},
protected: map[int64]struct{}{},
displacedByHold: map[int64]struct{}{},
displaced: map[int64]struct{}{},
opportunistic: map[int64]struct{}{},
expired: map[int64]struct{}{},
scores: map[int64]float64{},
tiers: map[int64]string{},
}
if len(queue) == 0 {
return selection
}
for id := range expired {
selection.expired[id] = struct{}{}
}
type scored struct {
id int64
score float64
@@ -195,6 +250,9 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in
sort.Slice(scoredList, func(i, j int) bool {
return scoredList[i].score > scoredList[j].score
})
for id, score := range selection.scores {
selection.tiers[id] = PriorityTierFromRange(score, selection.minScore, selection.maxScore)
}
limit := max
if limit <= 0 || limit > len(scoredList) {
limit = len(scoredList)
@@ -209,18 +267,37 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in
if _, ok := queue[id]; ok {
selection.selected[id] = struct{}{}
selection.held[id] = struct{}{}
if isProtectedTier(selection.tiers[id]) {
selection.protected[id] = struct{}{}
}
}
}
displaceable := buildDisplaceableHold(selection.held, selection.protected, selection.tiers, selection.scores)
for _, s := range scoredList {
if len(selection.selected) >= limit {
break
}
if _, ok := selection.selected[s.id]; ok {
continue
}
if len(selection.selected) < limit {
selection.selected[s.id] = struct{}{}
continue
}
if len(displaceable) == 0 {
continue
}
target := displaceable[0]
if priorityTierRank(selection.tiers[s.id]) <= priorityTierRank(selection.tiers[target]) {
continue
}
displaceable = displaceable[1:]
delete(selection.selected, target)
selection.displaced[target] = struct{}{}
selection.selected[s.id] = struct{}{}
selection.opportunistic[s.id] = struct{}{}
}
if holdDur > 0 {
for id := range selection.displaced {
delete(hold, id)
}
for id := range selection.selected {
hold[id] = now.Add(holdDur)
}
@@ -235,9 +312,59 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in
}
}
}
if len(selection.selected) > 0 {
for id := range selection.scores {
if _, ok := selection.selected[id]; ok {
continue
}
if _, ok := selection.displaced[id]; ok {
continue
}
if selection.scores[id] >= selection.cutoff {
selection.displacedByHold[id] = struct{}{}
}
}
}
return selection
}

func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{}, tiers map[int64]string, scores map[int64]float64) []int64 {
type entry struct {
id int64
rank int
score float64
}
candidates := make([]entry, 0, len(held))
for id := range held {
if _, ok := protected[id]; ok {
continue
}
score := 0.0
if scores != nil {
score = scores[id]
}
candidates = append(candidates, entry{
id: id,
rank: priorityTierRank(tiers[id]),
score: score,
})
}
if len(candidates) == 0 {
return nil
}
sort.Slice(candidates, func(i, j int) bool {
if candidates[i].rank == candidates[j].rank {
return candidates[i].score < candidates[j].score
}
return candidates[i].rank < candidates[j].rank
})
out := make([]int64, 0, len(candidates))
for _, c := range candidates {
out = append(out, c.id)
}
return out
}

func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string, pressureTag string) *PriorityAdmission {
score, ok := selection.scores[id]
if !ok {
@@ -247,29 +374,43 @@ func buildQueueAdmission(queueName string, id int64, selection queueSelection, p
Basis: queueName,
Score: score,
Cutoff: selection.cutoff,
Tier: PriorityTierFromRange(score, selection.minScore, selection.maxScore),
Tier: selection.tiers[id],
}
if _, ok := selection.selected[id]; ok {
if _, held := selection.held[id]; held {
admission.Class = AdmissionClassHold
admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, pressureTag, "pressure:hold", "budget:"+slugToken(budgetSource))
extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)}
if _, ok := selection.protected[id]; ok {
extras = append(extras, ReasonTagHoldProtected)
}
admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, extras...)
} else {
admission.Class = AdmissionClassAdmit
admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, pressureTag, "budget:"+slugToken(budgetSource))
extras := []string{pressureTag, "budget:" + slugToken(budgetSource)}
if _, ok := selection.opportunistic[id]; ok {
extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced)
}
admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, extras...)
}
return admission
}
if _, ok := selection.displaced[id]; ok {
admission.Class = AdmissionClassDisplace
admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:"+slugToken(budgetSource))
return admission
}
if _, ok := selection.displacedByHold[id]; ok {
admission.Class = AdmissionClassDisplace
admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:"+slugToken(budgetSource))
return admission
}
admission.Class = AdmissionClassDefer
admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, pressureTag, "pressure:budget", "budget:"+slugToken(budgetSource))
return admission
}

func purgeExpired(hold map[int64]time.Time, now time.Time) {
for id, until := range hold {
if now.After(until) {
delete(hold, id)
}
extras := []string{pressureTag, "pressure:budget", "budget:" + slugToken(budgetSource)}
if _, ok := selection.expired[id]; ok {
extras = append(extras, ReasonTagHoldExpired)
}
admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, extras...)
return admission
}

func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 {


+ 118
- 2
internal/pipeline/decision_queue_test.go Просмотреть файл

@@ -81,8 +81,9 @@ func TestDecisionQueueHoldKeepsSelection(t *testing.T) {
}

decisions = []SignalDecision{
{Candidate: Candidate{ID: 1, SNRDb: 25}, ShouldRecord: true, ShouldAutoDecode: true},
{Candidate: Candidate{ID: 2, SNRDb: 2}, ShouldRecord: true, ShouldAutoDecode: true},
{Candidate: Candidate{ID: 1, SNRDb: 32}, ShouldRecord: true, ShouldAutoDecode: true},
{Candidate: Candidate{ID: 2, SNRDb: 30}, ShouldRecord: true, ShouldAutoDecode: true},
{Candidate: Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: true},
}
arbiter.ApplyDecisions(decisions, budget, now.Add(100*time.Millisecond), policy)
if !decisions[1].ShouldRecord || !decisions[1].ShouldAutoDecode {
@@ -95,3 +96,118 @@ func TestDecisionQueueHoldKeepsSelection(t *testing.T) {
t.Fatalf("expected record admission hold class, got %+v", decisions[1].RecordAdmission)
}
}

func TestDecisionQueueHighTierHoldProtected(t *testing.T) {
arbiter := NewArbiter()
policy := Policy{DecisionHoldMs: 500}
budget := BudgetModel{Record: BudgetQueue{Max: 1}}
now := time.Now()

decisions := []SignalDecision{
{Candidate: Candidate{ID: 1, SNRDb: 30}, ShouldRecord: true},
{Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true},
}
arbiter.ApplyDecisions(decisions, budget, now, policy)
if !decisions[0].ShouldRecord {
t.Fatalf("expected candidate 1 to be selected initially")
}

decisions = []SignalDecision{
{Candidate: Candidate{ID: 1, SNRDb: 30}, ShouldRecord: true},
{Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true},
{Candidate: Candidate{ID: 3, SNRDb: 32}, ShouldRecord: true},
}
arbiter.ApplyDecisions(decisions, budget, now.Add(100*time.Millisecond), policy)
if !decisions[0].ShouldRecord {
t.Fatalf("expected protected hold to keep candidate 1")
}
if decisions[2].ShouldRecord {
t.Fatalf("expected candidate 3 to remain deferred behind protected hold")
}
if decisions[0].RecordAdmission == nil || decisions[0].RecordAdmission.Class != AdmissionClassHold {
t.Fatalf("expected hold admission for candidate 1, got %+v", decisions[0].RecordAdmission)
}
if decisions[2].RecordAdmission == nil || decisions[2].RecordAdmission.Class != AdmissionClassDisplace {
t.Fatalf("expected displacement admission for candidate 3, got %+v", decisions[2].RecordAdmission)
}
}

func TestDecisionQueueOpportunisticDisplacement(t *testing.T) {
arbiter := NewArbiter()
policy := Policy{DecisionHoldMs: 500}
budget := BudgetModel{Record: BudgetQueue{Max: 1}}
now := time.Now()

decisions := []SignalDecision{
{Candidate: Candidate{ID: 1, SNRDb: 15}, ShouldRecord: true},
{Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true},
}
arbiter.ApplyDecisions(decisions, budget, now, policy)
if !decisions[0].ShouldRecord {
t.Fatalf("expected candidate 1 to be selected initially")
}

decisions = []SignalDecision{
{Candidate: Candidate{ID: 1, SNRDb: 5}, ShouldRecord: true},
{Candidate: Candidate{ID: 2, SNRDb: 4}, ShouldRecord: true},
{Candidate: Candidate{ID: 3, SNRDb: 30}, ShouldRecord: true},
}
arbiter.ApplyDecisions(decisions, budget, now.Add(100*time.Millisecond), policy)
if decisions[0].ShouldRecord {
t.Fatalf("expected candidate 1 to be displaced")
}
if !decisions[2].ShouldRecord {
t.Fatalf("expected candidate 3 to opportunistically displace hold")
}
if decisions[0].RecordAdmission == nil || decisions[0].RecordAdmission.Class != AdmissionClassDisplace {
t.Fatalf("expected displacement admission for candidate 1, got %+v", decisions[0].RecordAdmission)
}
if decisions[2].RecordAdmission == nil || decisions[2].RecordAdmission.Class != AdmissionClassAdmit {
t.Fatalf("expected admit admission for candidate 3, got %+v", decisions[2].RecordAdmission)
}
if decisions[2].RecordAdmission == nil || !strings.Contains(decisions[2].RecordAdmission.Reason, ReasonTagDisplaceOpportunist) {
t.Fatalf("expected opportunistic displacement reason, got %+v", decisions[2].RecordAdmission)
}
}

func TestDecisionQueueHoldExpiryChurn(t *testing.T) {
arbiter := NewArbiter()
policy := Policy{DecisionHoldMs: 100}
budget := BudgetModel{Record: BudgetQueue{Max: 1}}
now := time.Now()

decisions := []SignalDecision{
{Candidate: Candidate{ID: 1, SNRDb: 12}, ShouldRecord: true},
{Candidate: Candidate{ID: 2, SNRDb: 10}, ShouldRecord: true},
}
arbiter.ApplyDecisions(decisions, budget, now, policy)
if !decisions[0].ShouldRecord {
t.Fatalf("expected candidate 1 to be selected initially")
}

decisions = []SignalDecision{
{Candidate: Candidate{ID: 1, SNRDb: 30}, ShouldRecord: true},
{Candidate: Candidate{ID: 2, SNRDb: 32}, ShouldRecord: true},
{Candidate: Candidate{ID: 3, SNRDb: 5}, ShouldRecord: true},
}
arbiter.ApplyDecisions(decisions, budget, now.Add(50*time.Millisecond), policy)
if !decisions[0].ShouldRecord {
t.Fatalf("expected hold to keep candidate 1 before expiry")
}

decisions = []SignalDecision{
{Candidate: Candidate{ID: 1, SNRDb: 30}, ShouldRecord: true},
{Candidate: Candidate{ID: 2, SNRDb: 32}, ShouldRecord: true},
{Candidate: Candidate{ID: 3, SNRDb: 5}, ShouldRecord: true},
}
arbiter.ApplyDecisions(decisions, budget, now.Add(200*time.Millisecond), policy)
if decisions[0].ShouldRecord {
t.Fatalf("expected candidate 1 to be released after hold expiry")
}
if !decisions[1].ShouldRecord {
t.Fatalf("expected candidate 2 to be selected after hold expiry")
}
if decisions[0].RecordAdmission == nil || !strings.Contains(decisions[0].RecordAdmission.Reason, ReasonTagHoldExpired) {
t.Fatalf("expected hold expiry reason, got %+v", decisions[0].RecordAdmission)
}
}

+ 21
- 0
internal/pipeline/hold.go Просмотреть файл

@@ -0,0 +1,21 @@
package pipeline

import "time"

func expireHold(hold map[int64]time.Time, now time.Time) map[int64]struct{} {
if len(hold) == 0 {
return map[int64]struct{}{}
}
expired := map[int64]struct{}{}
for id, until := range hold {
if now.After(until) {
expired[id] = struct{}{}
delete(hold, id)
}
}
return expired
}

func isProtectedTier(tier string) bool {
return priorityTierRank(tier) >= priorityTierRank(PriorityTierHigh)
}

+ 15
- 0
internal/pipeline/priority.go Просмотреть файл

@@ -49,6 +49,21 @@ func PriorityTierFromRange(score, min, max float64) string {
}
}

func priorityTierRank(tier string) int {
switch tier {
case PriorityTierCritical:
return 4
case PriorityTierHigh:
return 3
case PriorityTierMedium:
return 2
case PriorityTierLow:
return 1
default:
return 0
}
}

func admissionReason(base string, policy Policy, holdPolicy HoldPolicy, extras ...string) string {
tags := uniqueReasonTags(policy, holdPolicy, extras...)
if len(tags) == 0 {


+ 27
- 1
internal/pipeline/scheduler_test.go Просмотреть файл

@@ -1,6 +1,7 @@
package pipeline

import (
"strings"
"testing"
"time"
)
@@ -302,8 +303,9 @@ func TestAdmitRefinementPlanAppliesBudget(t *testing.T) {
func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) {
policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0}
cands := []Candidate{
{ID: 1, CenterHz: 100, SNRDb: 5},
{ID: 1, CenterHz: 100, SNRDb: 9},
{ID: 2, CenterHz: 200, SNRDb: 12},
{ID: 3, CenterHz: 300, SNRDb: 2},
}
plan := BuildRefinementPlan(cands, policy)
hold := &RefinementHold{Active: map[int64]time.Time{1: time.Now().Add(2 * time.Second)}}
@@ -320,6 +322,30 @@ func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) {
}
}

func TestAdmitRefinementPlanOpportunisticDisplacement(t *testing.T) {
policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0, DecisionHoldMs: 500}
cands := []Candidate{
{ID: 1, CenterHz: 100, SNRDb: 5},
{ID: 2, CenterHz: 200, SNRDb: 25},
}
plan := BuildRefinementPlan(cands, policy)
hold := &RefinementHold{Active: map[int64]time.Time{1: time.Now().Add(2 * time.Second)}}
res := AdmitRefinementPlan(plan, policy, time.Now(), hold)
if len(res.Plan.Selected) != 1 || res.Plan.Selected[0].Candidate.ID != 2 {
t.Fatalf("expected opportunistic displacement to admit candidate 2, got %+v", res.Plan.Selected)
}
item1 := findWorkItem(res.WorkItems, 1)
if item1 == nil || item1.Status != RefinementStatusDisplaced {
t.Fatalf("expected candidate 1 displaced, got %+v", item1)
}
if item1.Admission == nil || item1.Admission.Class != AdmissionClassDisplace {
t.Fatalf("expected displaced admission class, got %+v", item1.Admission)
}
if item1.Admission == nil || !strings.Contains(item1.Admission.Reason, ReasonTagDisplaceOpportunist) {
t.Fatalf("expected opportunistic displacement reason, got %+v", item1.Admission)
}
}

func TestRefinementStrategyUsesProfile(t *testing.T) {
strategy, reason := refinementStrategy(Policy{Profile: "digital-hunting"})
if strategy != "digital-hunting" || reason != "profile" {


Загрузка…
Отмена
Сохранить