From 822829cc233f33a435e5220f054fee5ba2b8274e Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sun, 22 Mar 2026 11:45:42 +0100 Subject: [PATCH] Add conservative budget rebalance layer --- cmd/sdrd/arbitration_snapshot.go | 1 + cmd/sdrd/pipeline_runtime.go | 11 +- cmd/sdrd/types.go | 1 + internal/pipeline/arbiter.go | 10 + internal/pipeline/arbitration.go | 6 +- internal/pipeline/arbitration_state.go | 2 + internal/pipeline/budget.go | 28 +- internal/pipeline/decision_queue.go | 10 +- internal/pipeline/pressure.go | 9 +- internal/pipeline/rebalance.go | 365 +++++++++++++++++++++++++ internal/pipeline/rebalance_test.go | 119 ++++++++ internal/pipeline/scheduler.go | 7 +- 12 files changed, 549 insertions(+), 20 deletions(-) create mode 100644 internal/pipeline/rebalance.go create mode 100644 internal/pipeline/rebalance_test.go diff --git a/cmd/sdrd/arbitration_snapshot.go b/cmd/sdrd/arbitration_snapshot.go index d9e5d8a..3cb26a7 100644 --- a/cmd/sdrd/arbitration_snapshot.go +++ b/cmd/sdrd/arbitration_snapshot.go @@ -9,6 +9,7 @@ func buildArbitrationSnapshot(step pipeline.RefinementStep, arb pipeline.Arbitra RefinementAdmission: &arb.Refinement, Queue: arb.Queue, Pressure: &arb.Pressure, + Rebalance: &arb.Rebalance, DecisionSummary: summarizeDecisions(step.Result.Decisions), DecisionItems: compactDecisions(step.Result.Decisions), } diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index 1c0d371..f6231af 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -539,8 +539,11 @@ func (rt *dspRuntime) derivedDetectorForLevel(level pipeline.AnalysisLevel) *der func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now time.Time) pipeline.RefinementInput { policy := pipeline.PolicyFromConfig(rt.cfg) - plan := pipeline.BuildRefinementPlan(surv.Candidates, policy) - admission := rt.arbiter.AdmitRefinement(plan, policy, now) + baseBudget := pipeline.BudgetModelFromPolicy(policy) + pressure := pipeline.BuildBudgetPressureSummary(baseBudget, rt.arbitration.Refinement, rt.arbitration.Queue) + budget := pipeline.ApplyBudgetRebalance(policy, baseBudget, pressure) + plan := pipeline.BuildRefinementPlanWithBudget(surv.Candidates, policy, budget) + admission := rt.arbiter.AdmitRefinementWithBudget(plan, policy, budget, now) plan = admission.Plan workItems := make([]pipeline.RefinementWorkItem, 0, len(admission.WorkItems)) if len(admission.WorkItems) > 0 { @@ -593,7 +596,7 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now Detail: detailLevel, Context: surv.Context, Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan}, - Budgets: pipeline.BudgetModelFromPolicy(policy), + Budgets: budget, Admission: admission.Admission, Candidates: append([]pipeline.Candidate(nil), surv.Candidates...), Scheduled: scheduled, @@ -695,7 +698,7 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin } } } - budget := pipeline.BudgetModelFromPolicy(policy) + budget := input.Budgets queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy) rt.setArbitration(policy, budget, input.Admission, queueStats) summary := summarizeDecisions(decisions) diff --git a/cmd/sdrd/types.go b/cmd/sdrd/types.go index 0307789..aaf1098 100644 --- a/cmd/sdrd/types.go +++ b/cmd/sdrd/types.go @@ -53,6 +53,7 @@ type ArbitrationSnapshot struct { RefinementAdmission *pipeline.RefinementAdmission `json:"refinement_admission,omitempty"` Queue pipeline.DecisionQueueStats `json:"queue,omitempty"` Pressure *pipeline.BudgetPressureSummary `json:"pressure,omitempty"` + Rebalance *pipeline.BudgetRebalance `json:"rebalance,omitempty"` DecisionSummary decisionSummary `json:"decision_summary,omitempty"` DecisionItems []compactDecision `json:"decision_items,omitempty"` } diff --git a/internal/pipeline/arbiter.go b/internal/pipeline/arbiter.go index 60a1118..3a89d25 100644 --- a/internal/pipeline/arbiter.go +++ b/internal/pipeline/arbiter.go @@ -24,6 +24,16 @@ func (a *Arbiter) AdmitRefinement(plan RefinementPlan, policy Policy, now time.T return AdmitRefinementPlan(plan, policy, now, a.refinementHold) } +func (a *Arbiter) AdmitRefinementWithBudget(plan RefinementPlan, policy Policy, budget BudgetModel, now time.Time) RefinementAdmissionResult { + if a == nil { + return AdmitRefinementPlanWithBudget(plan, policy, budget, now, nil) + } + if a.refinementHold == nil { + a.refinementHold = &RefinementHold{Active: map[int64]time.Time{}} + } + return AdmitRefinementPlanWithBudget(plan, policy, budget, now, a.refinementHold) +} + func (a *Arbiter) ApplyDecisions(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats { if a == nil || a.queues == nil { return DecisionQueueStats{} diff --git a/internal/pipeline/arbitration.go b/internal/pipeline/arbitration.go index 2bc2ffe..f701b81 100644 --- a/internal/pipeline/arbitration.go +++ b/internal/pipeline/arbitration.go @@ -128,6 +128,11 @@ func HoldPolicyFromPolicy(policy Policy) HoldPolicy { } func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold *RefinementHold) RefinementAdmissionResult { + budget := BudgetModelFromPolicy(policy) + return AdmitRefinementPlanWithBudget(plan, policy, budget, now, hold) +} + +func AdmitRefinementPlanWithBudget(plan RefinementPlan, policy Policy, budgetModel BudgetModel, now time.Time, hold *RefinementHold) RefinementAdmissionResult { ranked := plan.Ranked if len(ranked) == 0 { ranked = plan.Selected @@ -143,7 +148,6 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold } holdPolicy := HoldPolicyFromPolicy(policy) - budgetModel := BudgetModelFromPolicy(policy) admission.DecisionHoldMs = holdPolicy.BaseMs admission.HoldMs = holdPolicy.RefinementMs admission.HoldSource = "resources.decision_hold_ms" diff --git a/internal/pipeline/arbitration_state.go b/internal/pipeline/arbitration_state.go index f1065d3..4e390ef 100644 --- a/internal/pipeline/arbitration_state.go +++ b/internal/pipeline/arbitration_state.go @@ -6,6 +6,7 @@ type ArbitrationState struct { Refinement RefinementAdmission `json:"refinement,omitempty"` Queue DecisionQueueStats `json:"queue,omitempty"` Pressure BudgetPressureSummary `json:"pressure,omitempty"` + Rebalance BudgetRebalance `json:"rebalance,omitempty"` } func BuildArbitrationState(policy Policy, budget BudgetModel, admission RefinementAdmission, queue DecisionQueueStats) ArbitrationState { @@ -15,5 +16,6 @@ func BuildArbitrationState(policy Policy, budget BudgetModel, admission Refineme Refinement: admission, Queue: queue, Pressure: BuildBudgetPressureSummary(budget, admission, queue), + Rebalance: budget.Rebalance, } } diff --git a/internal/pipeline/budget.go b/internal/pipeline/budget.go index 84b3813..65d5f35 100644 --- a/internal/pipeline/budget.go +++ b/internal/pipeline/budget.go @@ -3,11 +3,13 @@ package pipeline import "strings" type BudgetQueue struct { - Max int `json:"max"` - IntentBias float64 `json:"intent_bias,omitempty"` - Preference float64 `json:"preference,omitempty"` - EffectiveMax float64 `json:"effective_max,omitempty"` - Source string `json:"source,omitempty"` + Max int `json:"max"` + IntentBias float64 `json:"intent_bias,omitempty"` + Preference float64 `json:"preference,omitempty"` + EffectiveMax float64 `json:"effective_max,omitempty"` + RebalancedMax int `json:"rebalanced_max,omitempty"` + RebalanceDelta int `json:"rebalance_delta,omitempty"` + Source string `json:"source,omitempty"` } type BudgetPreference struct { @@ -26,6 +28,7 @@ type BudgetModel struct { Profile string `json:"profile,omitempty"` Strategy string `json:"strategy,omitempty"` Preference BudgetPreference `json:"preference,omitempty"` + Rebalance BudgetRebalance `json:"rebalance,omitempty"` } func BudgetModelFromPolicy(policy Policy) BudgetModel { @@ -64,6 +67,11 @@ func BudgetModelFromPolicy(policy Policy) BudgetModel { } } +func BudgetModelFromPolicyWithRebalance(policy Policy, pressure BudgetPressureSummary) BudgetModel { + base := BudgetModelFromPolicy(policy) + return ApplyBudgetRebalance(policy, base, pressure) +} + func refinementBudgetFromPolicy(policy Policy) (int, string) { budget := policy.MaxRefinementJobs source := "resources.max_refinement_jobs" @@ -187,3 +195,13 @@ func effectiveBudget(max int, preference float64) float64 { } return float64(max) * preference } + +func budgetQueueLimit(queue BudgetQueue) int { + if queue.RebalanceDelta != 0 { + return queue.RebalancedMax + } + if queue.RebalancedMax != 0 { + return queue.RebalancedMax + } + return queue.Max +} diff --git a/internal/pipeline/decision_queue.go b/internal/pipeline/decision_queue.go index 3b1d072..2d22172 100644 --- a/internal/pipeline/decision_queue.go +++ b/internal/pipeline/decision_queue.go @@ -134,8 +134,10 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, recExpired := expireHold(dq.recordHold, now) decExpired := expireHold(dq.decodeHold, now) - recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy, recExpired) - decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy, decExpired) + recordBudget := budgetQueueLimit(budget.Record) + decodeBudget := budgetQueueLimit(budget.Decode) + recSelected := selectQueued("record", dq.record, dq.recordHold, recordBudget, recordHold, now, policy, recExpired) + decSelected := selectQueued("decode", dq.decode, dq.decodeHold, decodeBudget, decodeHold, now, policy, decExpired) recPressure := buildQueuePressure(budget.Record, len(dq.record), len(recSelected.selected), len(dq.recordHold)) decPressure := buildQueuePressure(budget.Decode, len(dq.decode), len(decSelected.selected), len(dq.decodeHold)) recPressureTag := pressureReasonTag(recPressure) @@ -150,8 +152,8 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, DecodeActive: len(dq.decodeHold), RecordOldestS: oldestAge(dq.record, now), DecodeOldestS: oldestAge(dq.decode, now), - RecordBudget: budget.Record.Max, - DecodeBudget: budget.Decode.Max, + RecordBudget: recordBudget, + DecodeBudget: decodeBudget, HoldMs: holdPolicy.BaseMs, DecisionHoldMs: holdPolicy.BaseMs, RecordHoldMs: holdPolicy.RecordMs, diff --git a/internal/pipeline/pressure.go b/internal/pipeline/pressure.go index 4efb983..ec74039 100644 --- a/internal/pipeline/pressure.go +++ b/internal/pipeline/pressure.go @@ -43,27 +43,28 @@ func buildQueuePressure(queue BudgetQueue, queued, selected, active int) BudgetP } func buildPressure(queue BudgetQueue, demand int, queued int, selected int, active int) BudgetPressure { + maxBudget := budgetQueueLimit(queue) effective := queue.EffectiveMax preference := queue.Preference - if effective <= 0 && queue.Max > 0 { + if effective <= 0 && maxBudget > 0 { if preference <= 0 { preference = 1.0 } - effective = float64(queue.Max) * preference + effective = float64(maxBudget) * preference } pressure := 0.0 level := "" switch { case demand == 0: level = "idle" - case queue.Max <= 0: + case maxBudget <= 0: level = "blocked" case effective > 0: pressure = float64(demand) / effective level = pressureLevel(pressure) } return BudgetPressure{ - Max: queue.Max, + Max: maxBudget, Effective: roundFloat(pressureEffectiveMax(effective)), Preference: preference, Demand: demand, diff --git a/internal/pipeline/rebalance.go b/internal/pipeline/rebalance.go new file mode 100644 index 0000000..fe74976 --- /dev/null +++ b/internal/pipeline/rebalance.go @@ -0,0 +1,365 @@ +package pipeline + +import "strings" + +type BudgetRebalance struct { + Mode string `json:"mode,omitempty"` + MaxShift int `json:"max_shift,omitempty"` + Active bool `json:"active,omitempty"` + Protect []string `json:"protect,omitempty"` + Favor []string `json:"favor,omitempty"` + Reasons []string `json:"reasons,omitempty"` + Adjustments BudgetRebalanceAdjustments `json:"adjustments,omitempty"` + favorWeights map[string]float64 `json:"-"` + protectMap map[string]bool `json:"-"` +} + +type BudgetRebalanceAdjustments struct { + Refinement int `json:"refinement,omitempty"` + Record int `json:"record,omitempty"` + Decode int `json:"decode,omitempty"` +} + +type rebalanceQueue struct { + name string + baseMax int + max int + pressure BudgetPressure + protect bool + favor float64 +} + +func ApplyBudgetRebalance(policy Policy, budget BudgetModel, pressure BudgetPressureSummary) BudgetModel { + state := buildRebalanceState(policy) + budget.Rebalance = state + if state.MaxShift <= 0 { + return budget + } + queues := []rebalanceQueue{ + { + name: "refinement", + baseMax: budget.Refinement.Max, + max: budget.Refinement.Max, + pressure: pressure.Refinement, + protect: false, + favor: state.favorWeight("refinement"), + }, + { + name: "record", + baseMax: budget.Record.Max, + max: budget.Record.Max, + pressure: pressure.Record, + protect: state.protects("record"), + favor: state.favorWeight("record"), + }, + { + name: "decode", + baseMax: budget.Decode.Max, + max: budget.Decode.Max, + pressure: pressure.Decode, + protect: state.protects("decode"), + favor: state.favorWeight("decode"), + }, + } + + for i := 0; i < state.MaxShift; i++ { + recvIdx := pickRebalanceReceiver(queues) + donorIdx := pickRebalanceDonor(queues) + if recvIdx < 0 || donorIdx < 0 || recvIdx == donorIdx { + break + } + if queues[donorIdx].max <= 1 { + break + } + queues[donorIdx].max-- + queues[recvIdx].max++ + state.Active = true + } + + applyRebalanceQueue(&budget.Refinement, queues[0]) + applyRebalanceQueue(&budget.Record, queues[1]) + applyRebalanceQueue(&budget.Decode, queues[2]) + + if state.Active { + state.Adjustments = BudgetRebalanceAdjustments{ + Refinement: budget.Refinement.RebalanceDelta, + Record: budget.Record.RebalanceDelta, + Decode: budget.Decode.RebalanceDelta, + } + budget.Rebalance = state + } + + return budget +} + +func applyRebalanceQueue(queue *BudgetQueue, state rebalanceQueue) { + if queue == nil { + return + } + delta := state.max - state.baseMax + queue.RebalanceDelta = delta + if delta != 0 { + queue.RebalancedMax = state.max + } else { + queue.RebalancedMax = 0 + } + queue.EffectiveMax = effectiveBudget(budgetQueueLimit(*queue), queue.Preference) +} + +func buildRebalanceState(policy Policy) BudgetRebalance { + state := BudgetRebalance{ + Mode: "conservative", + MaxShift: 1, + } + profile := strings.ToLower(strings.TrimSpace(policy.Profile)) + intent := strings.ToLower(strings.TrimSpace(policy.Intent)) + strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) + + protect := map[string]bool{} + favor := map[string]float64{ + "refinement": 1.0, + "record": 1.0, + "decode": 1.0, + } + reasons := make([]string, 0, 6) + addReason := func(tag string) { + if tag == "" { + return + } + for _, r := range reasons { + if r == tag { + return + } + } + reasons = append(reasons, tag) + } + legacy := strings.Contains(profile, "legacy") + if legacy { + state.MaxShift = 0 + addReason("profile:legacy") + } + if strings.Contains(profile, "archive") { + protect["record"] = true + favor["record"] += 0.3 + addReason("profile:archive") + addReason("protect:record") + } + if strings.Contains(profile, "digital") { + protect["decode"] = true + favor["decode"] += 0.3 + addReason("profile:digital") + addReason("protect:decode") + } + if strings.Contains(profile, "aggressive") { + favor["refinement"] += 0.35 + if !legacy { + state.MaxShift = maxInt(state.MaxShift, 2) + } + addReason("profile:aggressive") + addReason("favor:refinement") + } + + if strings.Contains(intent, "wideband") || strings.Contains(intent, "surveillance") { + favor["refinement"] += 0.25 + if !legacy { + state.MaxShift = maxInt(state.MaxShift, 2) + } + addReason("intent:wideband") + addReason("favor:refinement") + } + if strings.Contains(intent, "archive") || strings.Contains(intent, "record") { + protect["record"] = true + addReason("intent:archive") + addReason("protect:record") + } + if strings.Contains(intent, "decode") || strings.Contains(intent, "digital") || strings.Contains(intent, "hunt") { + protect["decode"] = true + addReason("intent:decode") + addReason("protect:decode") + } + + if strings.Contains(strategy, "archive") { + protect["record"] = true + addReason("strategy:archive") + addReason("protect:record") + } + if strings.Contains(strategy, "digital") { + protect["decode"] = true + addReason("strategy:digital") + addReason("protect:decode") + } + if strings.Contains(strategy, "multi") { + favor["refinement"] += 0.2 + addReason("strategy:multi-resolution") + addReason("favor:refinement") + } + + state.Protect = mapKeysSorted(protect) + state.Favor = favorKeysSorted(favor) + state.Reasons = reasons + state.favorWeights = favor + state.protectMap = protect + return state +} + +func pickRebalanceReceiver(queues []rebalanceQueue) int { + best := -1 + bestScore := 0.0 + for i := range queues { + q := &queues[i] + if q.baseMax <= 0 || q.max <= 0 { + continue + } + if !pressureIsReceiver(q.pressure) { + continue + } + score := pressureScore(q.pressure) * q.favor + if best == -1 || score > bestScore { + best = i + bestScore = score + } + } + return best +} + +func pickRebalanceDonor(queues []rebalanceQueue) int { + best := -1 + bestScore := 0.0 + for i := range queues { + q := &queues[i] + if q.baseMax <= 1 || q.max <= 1 { + continue + } + if q.protect { + continue + } + if !pressureIsDonor(q.pressure) { + continue + } + score := pressureScore(q.pressure) + if best == -1 || score < bestScore { + best = i + bestScore = score + } + } + return best +} + +func pressureIsReceiver(pressure BudgetPressure) bool { + if pressure.Pressure >= 1.15 { + return true + } + switch pressure.Level { + case "high", "critical": + return true + default: + return false + } +} + +func pressureIsDonor(pressure BudgetPressure) bool { + if pressure.Level == "blocked" { + return false + } + if pressure.Pressure == 0 && pressure.Demand == 0 { + return true + } + if pressure.Pressure > 0 && pressure.Pressure <= 0.85 { + return true + } + switch pressure.Level { + case "steady", "idle": + return true + default: + return false + } +} + +func pressureScore(pressure BudgetPressure) float64 { + if pressure.Pressure > 0 { + return pressure.Pressure + } + switch pressure.Level { + case "critical": + return 1.6 + case "high": + return 1.2 + case "elevated": + return 0.9 + case "steady": + return 0.6 + case "idle": + return 0.0 + default: + return 0.0 + } +} + +func mapKeysSorted(values map[string]bool) []string { + if len(values) == 0 { + return nil + } + keys := make([]string, 0, len(values)) + for k, ok := range values { + if ok { + keys = append(keys, k) + } + } + sortStrings(keys) + return keys +} + +func favorKeysSorted(weights map[string]float64) []string { + keys := make([]string, 0, len(weights)) + for k, v := range weights { + if v > 1.01 { + keys = append(keys, k) + } + } + sortStrings(keys) + return keys +} + +func sortStrings(values []string) { + if len(values) <= 1 { + return + } + for i := 0; i < len(values)-1; i++ { + for j := i + 1; j < len(values); j++ { + if values[j] < values[i] { + values[i], values[j] = values[j], values[i] + } + } + } +} + +func (r *BudgetRebalance) favorWeight(queue string) float64 { + if r == nil { + return 1.0 + } + if r.favorWeights != nil { + if v, ok := r.favorWeights[queue]; ok { + return v + } + } + return 1.0 +} + +func (r *BudgetRebalance) protects(queue string) bool { + if r == nil { + return false + } + if r.protectMap != nil { + if v, ok := r.protectMap[queue]; ok { + return v + } + } + return false +} + +func maxInt(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/internal/pipeline/rebalance_test.go b/internal/pipeline/rebalance_test.go new file mode 100644 index 0000000..63c0fa1 --- /dev/null +++ b/internal/pipeline/rebalance_test.go @@ -0,0 +1,119 @@ +package pipeline + +import "testing" + +func TestRebalanceArchiveProtectsRecord(t *testing.T) { + policy := Policy{ + Profile: "archive", + Intent: "archive-and-triage", + MaxRefinementJobs: 4, + MaxRecordingStreams: 4, + MaxDecodeJobs: 4, + } + budget := BudgetModelFromPolicy(policy) + pressure := BudgetPressureSummary{ + Refinement: pressureFor(0.6), + Record: pressureFor(0.6), + Decode: pressureFor(1.3), + } + rebalanced := ApplyBudgetRebalance(policy, budget, pressure) + if rebalanced.Record.RebalanceDelta < 0 { + t.Fatalf("expected record to be protected from donating, got delta=%d", rebalanced.Record.RebalanceDelta) + } + if rebalanced.Decode.RebalanceDelta <= 0 { + t.Fatalf("expected decode to receive a slot, got delta=%d", rebalanced.Decode.RebalanceDelta) + } + if rebalanced.Refinement.RebalanceDelta >= 0 { + t.Fatalf("expected refinement to donate a slot, got delta=%d", rebalanced.Refinement.RebalanceDelta) + } +} + +func TestRebalanceDigitalProtectsDecode(t *testing.T) { + policy := Policy{ + Profile: "digital-hunting", + Intent: "decode-digital", + MaxRefinementJobs: 4, + MaxRecordingStreams: 4, + MaxDecodeJobs: 4, + } + budget := BudgetModelFromPolicy(policy) + pressure := BudgetPressureSummary{ + Refinement: pressureFor(0.6), + Record: pressureFor(1.3), + Decode: pressureFor(0.6), + } + rebalanced := ApplyBudgetRebalance(policy, budget, pressure) + if rebalanced.Decode.RebalanceDelta < 0 { + t.Fatalf("expected decode to be protected from donating, got delta=%d", rebalanced.Decode.RebalanceDelta) + } + if rebalanced.Record.RebalanceDelta <= 0 { + t.Fatalf("expected record to receive a slot, got delta=%d", rebalanced.Record.RebalanceDelta) + } + if rebalanced.Refinement.RebalanceDelta >= 0 { + t.Fatalf("expected refinement to donate a slot, got delta=%d", rebalanced.Refinement.RebalanceDelta) + } +} + +func TestRebalanceAggressiveFavorsRefinement(t *testing.T) { + policy := Policy{ + Profile: "wideband-aggressive", + Intent: "wideband-surveillance", + MaxRefinementJobs: 6, + MaxRecordingStreams: 4, + MaxDecodeJobs: 4, + } + budget := BudgetModelFromPolicy(policy) + pressure := BudgetPressureSummary{ + Refinement: pressureFor(1.3), + Record: pressureFor(0.5), + Decode: pressureFor(0.5), + } + rebalanced := ApplyBudgetRebalance(policy, budget, pressure) + if rebalanced.Refinement.RebalanceDelta <= 0 { + t.Fatalf("expected refinement to receive slots, got delta=%d", rebalanced.Refinement.RebalanceDelta) + } +} + +func TestRebalanceLegacyStaysConservative(t *testing.T) { + policy := Policy{ + Profile: "legacy", + Intent: "general-monitoring", + MaxRefinementJobs: 4, + MaxRecordingStreams: 4, + MaxDecodeJobs: 4, + } + budget := BudgetModelFromPolicy(policy) + pressure := BudgetPressureSummary{ + Refinement: pressureFor(0.5), + Record: pressureFor(1.3), + Decode: pressureFor(0.5), + } + rebalanced := ApplyBudgetRebalance(policy, budget, pressure) + if rebalanced.Rebalance.Active { + t.Fatalf("expected legacy rebalance to remain inactive") + } + if rebalanced.Refinement.RebalanceDelta != 0 || rebalanced.Record.RebalanceDelta != 0 || rebalanced.Decode.RebalanceDelta != 0 { + t.Fatalf("expected no rebalance deltas, got ref=%d record=%d decode=%d", rebalanced.Refinement.RebalanceDelta, rebalanced.Record.RebalanceDelta, rebalanced.Decode.RebalanceDelta) + } +} + +func pressureFor(value float64) BudgetPressure { + level := "" + switch { + case value >= 1.5: + level = "critical" + case value >= 1.15: + level = "high" + case value >= 0.85: + level = "elevated" + case value > 0: + level = "steady" + default: + level = "idle" + } + demand := 1 + if value == 0 { + demand = 0 + } + return BudgetPressure{Pressure: value, Level: level, Demand: demand} +} diff --git a/internal/pipeline/scheduler.go b/internal/pipeline/scheduler.go index ef7b85c..0e34ea0 100644 --- a/internal/pipeline/scheduler.go +++ b/internal/pipeline/scheduler.go @@ -87,9 +87,12 @@ const ( // Current heuristic is intentionally simple and deterministic; later phases can add // richer scoring (novelty, persistence, profile-aware band priorities, decoder value). func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { + return BuildRefinementPlanWithBudget(candidates, policy, BudgetModelFromPolicy(policy)) +} + +func BuildRefinementPlanWithBudget(candidates []Candidate, policy Policy, budgetModel BudgetModel) RefinementPlan { strategy, strategyReason := refinementStrategy(policy) - budgetModel := BudgetModelFromPolicy(policy) - budget := budgetModel.Refinement.Max + budget := budgetQueueLimit(budgetModel.Refinement) holdPolicy := HoldPolicyFromPolicy(policy) plan := RefinementPlan{ TotalCandidates: len(candidates),