diff --git a/internal/pipeline/arbitration.go b/internal/pipeline/arbitration.go index d7d5e2e..3a33910 100644 --- a/internal/pipeline/arbitration.go +++ b/internal/pipeline/arbitration.go @@ -21,17 +21,18 @@ type RefinementHold struct { } type RefinementAdmission struct { - Budget int `json:"budget"` - BudgetSource string `json:"budget_source,omitempty"` - HoldMs int `json:"hold_ms"` - HoldSource string `json:"hold_source,omitempty"` - Planned int `json:"planned"` - Admitted int `json:"admitted"` - Skipped int `json:"skipped"` - Displaced int `json:"displaced"` - PriorityCutoff float64 `json:"priority_cutoff,omitempty"` - PriorityTier string `json:"priority_tier,omitempty"` - Reason string `json:"reason,omitempty"` + Budget int `json:"budget"` + BudgetSource string `json:"budget_source,omitempty"` + HoldMs int `json:"hold_ms"` + HoldSource string `json:"hold_source,omitempty"` + Planned int `json:"planned"` + Admitted int `json:"admitted"` + Skipped int `json:"skipped"` + Displaced int `json:"displaced"` + PriorityCutoff float64 `json:"priority_cutoff,omitempty"` + PriorityTier string `json:"priority_tier,omitempty"` + Reason string `json:"reason,omitempty"` + Pressure BudgetPressure `json:"pressure,omitempty"` } type RefinementAdmissionResult struct { @@ -115,6 +116,7 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold } holdPolicy := HoldPolicyFromPolicy(policy) + budgetModel := BudgetModelFromPolicy(policy) admission.HoldMs = holdPolicy.RefinementMs admission.HoldSource = "resources.decision_hold_ms" if len(holdPolicy.Reasons) > 0 { @@ -189,8 +191,9 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold } admission.Displaced = len(displaced) admission.PriorityTier = PriorityTierFromRange(admission.PriorityCutoff, plan.PriorityMin, plan.PriorityMax) + admission.Pressure = buildRefinementPressure(budgetModel, admission) if admission.PriorityCutoff > 0 { - admission.Reason = admissionReason("admission:budget", policy, holdPolicy, "budget:"+slugToken(plan.BudgetSource)) + admission.Reason = admissionReason("admission:budget", policy, holdPolicy, pressureReasonTag(admission.Pressure), "budget:"+slugToken(plan.BudgetSource)) } plan.Selected = admitted @@ -218,7 +221,7 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax) - item.Admission.Reason = admissionReason(reason, policy, holdPolicy, "budget:"+slugToken(plan.BudgetSource)) + item.Admission.Reason = admissionReason(reason, policy, holdPolicy, pressureReasonTag(admission.Pressure), "budget:"+slugToken(plan.BudgetSource)) continue } if _, ok := displaced[id]; ok { @@ -231,7 +234,7 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax) - item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, "pressure:hold", "budget:"+slugToken(plan.BudgetSource)) + item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", "budget:"+slugToken(plan.BudgetSource)) continue } item.Status = RefinementStatusSkipped @@ -243,7 +246,7 @@ func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold item.Admission.Score = item.Priority item.Admission.Cutoff = admission.PriorityCutoff item.Admission.Tier = PriorityTierFromRange(item.Priority, plan.PriorityMin, plan.PriorityMax) - item.Admission.Reason = admissionReason("refinement:skip:budget", policy, holdPolicy, "pressure:budget", "budget:"+slugToken(plan.BudgetSource)) + item.Admission.Reason = admissionReason("refinement:skip:budget", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:budget", "budget:"+slugToken(plan.BudgetSource)) } return RefinementAdmissionResult{ Plan: plan, diff --git a/internal/pipeline/arbitration_state.go b/internal/pipeline/arbitration_state.go index 99a84c3..f1065d3 100644 --- a/internal/pipeline/arbitration_state.go +++ b/internal/pipeline/arbitration_state.go @@ -1,10 +1,11 @@ package pipeline type ArbitrationState struct { - Budgets BudgetModel `json:"budgets,omitempty"` - HoldPolicy HoldPolicy `json:"hold_policy,omitempty"` - Refinement RefinementAdmission `json:"refinement,omitempty"` - Queue DecisionQueueStats `json:"queue,omitempty"` + Budgets BudgetModel `json:"budgets,omitempty"` + HoldPolicy HoldPolicy `json:"hold_policy,omitempty"` + Refinement RefinementAdmission `json:"refinement,omitempty"` + Queue DecisionQueueStats `json:"queue,omitempty"` + Pressure BudgetPressureSummary `json:"pressure,omitempty"` } func BuildArbitrationState(policy Policy, budget BudgetModel, admission RefinementAdmission, queue DecisionQueueStats) ArbitrationState { @@ -13,5 +14,6 @@ func BuildArbitrationState(policy Policy, budget BudgetModel, admission Refineme HoldPolicy: HoldPolicyFromPolicy(policy), Refinement: admission, Queue: queue, + Pressure: BuildBudgetPressureSummary(budget, admission, queue), } } diff --git a/internal/pipeline/budget.go b/internal/pipeline/budget.go index af40772..84b3813 100644 --- a/internal/pipeline/budget.go +++ b/internal/pipeline/budget.go @@ -3,43 +3,64 @@ package pipeline import "strings" type BudgetQueue struct { - Max int `json:"max"` - IntentBias float64 `json:"intent_bias,omitempty"` - Source string `json:"source,omitempty"` + Max int `json:"max"` + IntentBias float64 `json:"intent_bias,omitempty"` + Preference float64 `json:"preference,omitempty"` + EffectiveMax float64 `json:"effective_max,omitempty"` + Source string `json:"source,omitempty"` +} + +type BudgetPreference struct { + Refinement float64 `json:"refinement"` + Record float64 `json:"record"` + Decode float64 `json:"decode"` + Reasons []string `json:"reasons,omitempty"` } type BudgetModel struct { - Refinement BudgetQueue `json:"refinement"` - Record BudgetQueue `json:"record"` - Decode BudgetQueue `json:"decode"` - HoldMs int `json:"hold_ms"` - Intent string `json:"intent,omitempty"` - Profile string `json:"profile,omitempty"` - Strategy string `json:"strategy,omitempty"` + Refinement BudgetQueue `json:"refinement"` + Record BudgetQueue `json:"record"` + Decode BudgetQueue `json:"decode"` + HoldMs int `json:"hold_ms"` + Intent string `json:"intent,omitempty"` + Profile string `json:"profile,omitempty"` + Strategy string `json:"strategy,omitempty"` + Preference BudgetPreference `json:"preference,omitempty"` } func BudgetModelFromPolicy(policy Policy) BudgetModel { recordBias, decodeBias := budgetIntentBias(policy.Intent) refBudget, refSource := refinementBudgetFromPolicy(policy) + preference := BudgetPreferenceFromPolicy(policy) + refEffective := effectiveBudget(refBudget, preference.Refinement) + recordEffective := effectiveBudget(policy.MaxRecordingStreams, preference.Record) + decodeEffective := effectiveBudget(policy.MaxDecodeJobs, preference.Decode) return BudgetModel{ Refinement: BudgetQueue{ - Max: refBudget, - Source: refSource, + Max: refBudget, + Preference: preference.Refinement, + EffectiveMax: refEffective, + Source: refSource, }, Record: BudgetQueue{ - Max: policy.MaxRecordingStreams, - IntentBias: recordBias, - Source: "resources.max_recording_streams", + Max: policy.MaxRecordingStreams, + IntentBias: recordBias, + Preference: preference.Record, + EffectiveMax: recordEffective, + Source: "resources.max_recording_streams", }, Decode: BudgetQueue{ - Max: policy.MaxDecodeJobs, - IntentBias: decodeBias, - Source: "resources.max_decode_jobs", + Max: policy.MaxDecodeJobs, + IntentBias: decodeBias, + Preference: preference.Decode, + EffectiveMax: decodeEffective, + Source: "resources.max_decode_jobs", }, - HoldMs: policy.DecisionHoldMs, - Intent: policy.Intent, - Profile: policy.Profile, - Strategy: policy.RefinementStrategy, + HoldMs: policy.DecisionHoldMs, + Intent: policy.Intent, + Profile: policy.Profile, + Strategy: policy.RefinementStrategy, + Preference: preference, } } @@ -75,3 +96,94 @@ func budgetIntentBias(intent string) (float64, float64) { } return recordBias, decodeBias } + +func BudgetPreferenceFromPolicy(policy Policy) BudgetPreference { + pref := BudgetPreference{Refinement: 1.0, Record: 1.0, Decode: 1.0} + reasons := make([]string, 0, 6) + addReason := func(tag string) { + if tag == "" { + return + } + for _, r := range reasons { + if r == tag { + return + } + } + reasons = append(reasons, tag) + } + + profile := strings.ToLower(strings.TrimSpace(policy.Profile)) + intent := strings.ToLower(strings.TrimSpace(policy.Intent)) + strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) + + if strings.Contains(profile, "archive") { + pref.Record += 0.6 + pref.Decode += 0.2 + pref.Refinement += 0.15 + addReason("profile:archive") + } + if strings.Contains(profile, "digital") { + pref.Decode += 0.6 + pref.Record += 0.1 + pref.Refinement += 0.15 + addReason("profile:digital") + } + if strings.Contains(profile, "aggressive") { + pref.Refinement += 0.35 + addReason("profile:aggressive") + } + + if strings.Contains(intent, "archive") || strings.Contains(intent, "record") { + pref.Record += 0.5 + addReason("intent:record") + } + if strings.Contains(intent, "decode") || strings.Contains(intent, "analysis") || strings.Contains(intent, "classif") { + pref.Decode += 0.5 + addReason("intent:decode") + } + if strings.Contains(intent, "digital") || strings.Contains(intent, "hunt") { + pref.Decode += 0.25 + addReason("intent:digital") + } + if strings.Contains(intent, "wideband") || strings.Contains(intent, "surveillance") { + pref.Refinement += 0.25 + addReason("intent:wideband") + } + + if strings.Contains(strategy, "archive") { + pref.Record += 0.2 + pref.Refinement += 0.1 + addReason("strategy:archive") + } + if strings.Contains(strategy, "digital") { + pref.Decode += 0.2 + addReason("strategy:digital") + } + if strings.Contains(strategy, "multi") { + pref.Refinement += 0.2 + addReason("strategy:multi-resolution") + } + + pref.Refinement = clampPreference(pref.Refinement) + pref.Record = clampPreference(pref.Record) + pref.Decode = clampPreference(pref.Decode) + pref.Reasons = reasons + return pref +} + +func clampPreference(value float64) float64 { + if value < 0.35 { + return 0.35 + } + return value +} + +func effectiveBudget(max int, preference float64) float64 { + if max <= 0 { + return 0 + } + if preference <= 0 { + preference = 1.0 + } + return float64(max) * preference +} diff --git a/internal/pipeline/decision_queue.go b/internal/pipeline/decision_queue.go index 4220fb6..6666397 100644 --- a/internal/pipeline/decision_queue.go +++ b/internal/pipeline/decision_queue.go @@ -112,6 +112,10 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy) decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy) + recPressure := buildQueuePressure(budget.Record, len(dq.record), len(recSelected.selected), len(dq.recordHold)) + decPressure := buildQueuePressure(budget.Decode, len(dq.decode), len(decSelected.selected), len(dq.decodeHold)) + recPressureTag := pressureReasonTag(recPressure) + decPressureTag := pressureReasonTag(decPressure) stats := DecisionQueueStats{ RecordQueued: len(dq.record), @@ -132,19 +136,19 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, for i := range decisions { id := decisions[i].Candidate.ID if decisions[i].ShouldRecord { - decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source) + decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source, recPressureTag) if _, ok := recSelected.selected[id]; !ok { decisions[i].ShouldRecord = false - decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, "pressure:budget", "budget:"+slugToken(budget.Record.Source)) + decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, recPressureTag, "pressure:budget", "budget:"+slugToken(budget.Record.Source)) stats.RecordDropped++ } } if decisions[i].ShouldAutoDecode { - decisions[i].DecodeAdmission = buildQueueAdmission("decode", id, decSelected, policy, holdPolicy, budget.Decode.Source) + decisions[i].DecodeAdmission = buildQueueAdmission("decode", id, decSelected, policy, holdPolicy, budget.Decode.Source, decPressureTag) if _, ok := decSelected.selected[id]; !ok { decisions[i].ShouldAutoDecode = false if decisions[i].Reason == "" { - decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, "pressure:budget", "budget:"+slugToken(budget.Decode.Source)) + decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, decPressureTag, "pressure:budget", "budget:"+slugToken(budget.Decode.Source)) } stats.DecodeDropped++ } @@ -234,7 +238,7 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in return selection } -func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string) *PriorityAdmission { +func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string, pressureTag string) *PriorityAdmission { score, ok := selection.scores[id] if !ok { return nil @@ -248,15 +252,15 @@ func buildQueueAdmission(queueName string, id int64, selection queueSelection, p if _, ok := selection.selected[id]; ok { if _, held := selection.held[id]; held { admission.Class = AdmissionClassHold - admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, "pressure:hold", "budget:"+slugToken(budgetSource)) + admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, pressureTag, "pressure:hold", "budget:"+slugToken(budgetSource)) } else { admission.Class = AdmissionClassAdmit - admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, "budget:"+slugToken(budgetSource)) + admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, pressureTag, "budget:"+slugToken(budgetSource)) } return admission } admission.Class = AdmissionClassDefer - admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, "pressure:budget", "budget:"+slugToken(budgetSource)) + admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, pressureTag, "pressure:budget", "budget:"+slugToken(budgetSource)) return admission } diff --git a/internal/pipeline/pressure.go b/internal/pipeline/pressure.go new file mode 100644 index 0000000..4efb983 --- /dev/null +++ b/internal/pipeline/pressure.go @@ -0,0 +1,110 @@ +package pipeline + +import "math" + +type BudgetPressure struct { + Max int `json:"max"` + Effective float64 `json:"effective,omitempty"` + Preference float64 `json:"preference,omitempty"` + Demand int `json:"demand"` + Queued int `json:"queued,omitempty"` + Selected int `json:"selected,omitempty"` + Active int `json:"active,omitempty"` + Pressure float64 `json:"pressure,omitempty"` + Level string `json:"level,omitempty"` +} + +type BudgetPressureSummary struct { + Refinement BudgetPressure `json:"refinement"` + Record BudgetPressure `json:"record"` + Decode BudgetPressure `json:"decode"` +} + +func BuildBudgetPressureSummary(budget BudgetModel, admission RefinementAdmission, queue DecisionQueueStats) BudgetPressureSummary { + return BudgetPressureSummary{ + Refinement: buildRefinementPressure(budget, admission), + Record: buildQueuePressure(budget.Record, queue.RecordQueued, queue.RecordSelected, queue.RecordActive), + Decode: buildQueuePressure(budget.Decode, queue.DecodeQueued, queue.DecodeSelected, queue.DecodeActive), + } +} + +func buildRefinementPressure(budget BudgetModel, admission RefinementAdmission) BudgetPressure { + demand := admission.Planned + selected := admission.Admitted + return buildPressure(budget.Refinement, demand, 0, selected, 0) +} + +func buildQueuePressure(queue BudgetQueue, queued, selected, active int) BudgetPressure { + demand := queued + if demand < selected { + demand = selected + } + return buildPressure(queue, demand, queued, selected, active) +} + +func buildPressure(queue BudgetQueue, demand int, queued int, selected int, active int) BudgetPressure { + effective := queue.EffectiveMax + preference := queue.Preference + if effective <= 0 && queue.Max > 0 { + if preference <= 0 { + preference = 1.0 + } + effective = float64(queue.Max) * preference + } + pressure := 0.0 + level := "" + switch { + case demand == 0: + level = "idle" + case queue.Max <= 0: + level = "blocked" + case effective > 0: + pressure = float64(demand) / effective + level = pressureLevel(pressure) + } + return BudgetPressure{ + Max: queue.Max, + Effective: roundFloat(pressureEffectiveMax(effective)), + Preference: preference, + Demand: demand, + Queued: queued, + Selected: selected, + Active: active, + Pressure: roundFloat(pressure), + Level: level, + } +} + +func pressureLevel(pressure float64) string { + switch { + case pressure >= 1.5: + return "critical" + case pressure >= 1.15: + return "high" + case pressure >= 0.85: + return "elevated" + default: + return "steady" + } +} + +func pressureReasonTag(pressure BudgetPressure) string { + if pressure.Level == "" || pressure.Level == "idle" { + return "" + } + return "pressure:" + pressure.Level +} + +func pressureEffectiveMax(value float64) float64 { + if value < 0 { + return 0 + } + return value +} + +func roundFloat(value float64) float64 { + if value == 0 { + return 0 + } + return math.Round(value*100) / 100 +} diff --git a/internal/pipeline/pressure_test.go b/internal/pipeline/pressure_test.go new file mode 100644 index 0000000..020d1c7 --- /dev/null +++ b/internal/pipeline/pressure_test.go @@ -0,0 +1,78 @@ +package pipeline + +import ( + "strings" + "testing" + "time" +) + +func TestBudgetPreferenceAffectsEffectiveBudgets(t *testing.T) { + archivePolicy := Policy{ + Profile: "archive", + Intent: "archive-and-triage", + MaxRecordingStreams: 10, + MaxDecodeJobs: 10, + MaxRefinementJobs: 6, + } + archiveBudget := BudgetModelFromPolicy(archivePolicy) + if archiveBudget.Record.EffectiveMax <= archiveBudget.Decode.EffectiveMax { + t.Fatalf("expected archive preference to favor record, got record=%.2f decode=%.2f", archiveBudget.Record.EffectiveMax, archiveBudget.Decode.EffectiveMax) + } + if len(archiveBudget.Preference.Reasons) == 0 { + t.Fatalf("expected archive preference reasons to be populated") + } + + digitalPolicy := Policy{ + Profile: "digital-hunting", + Intent: "decode-digital", + MaxRecordingStreams: 10, + MaxDecodeJobs: 10, + MaxRefinementJobs: 6, + } + digitalBudget := BudgetModelFromPolicy(digitalPolicy) + if digitalBudget.Decode.EffectiveMax <= digitalBudget.Record.EffectiveMax { + t.Fatalf("expected digital preference to favor decode, got record=%.2f decode=%.2f", digitalBudget.Record.EffectiveMax, digitalBudget.Decode.EffectiveMax) + } +} + +func TestPressureSummaryReflectsPreference(t *testing.T) { + policy := Policy{ + Profile: "digital-hunting", + Intent: "decode-digital", + MaxRecordingStreams: 4, + MaxDecodeJobs: 4, + MaxRefinementJobs: 2, + } + budget := BudgetModelFromPolicy(policy) + queue := DecisionQueueStats{ + RecordQueued: 4, + DecodeQueued: 4, + RecordSelected: 2, + DecodeSelected: 2, + RecordActive: 1, + DecodeActive: 1, + } + pressure := BuildBudgetPressureSummary(budget, RefinementAdmission{}, queue) + if pressure.Record.Pressure <= 0 || pressure.Decode.Pressure <= 0 { + t.Fatalf("expected non-zero pressure ratios, got record=%.2f decode=%.2f", pressure.Record.Pressure, pressure.Decode.Pressure) + } + if pressure.Record.Pressure <= pressure.Decode.Pressure { + t.Fatalf("expected record pressure to be higher than decode under digital preference, got record=%.2f decode=%.2f", pressure.Record.Pressure, pressure.Decode.Pressure) + } +} + +func TestRefinementPressureTagsAdmission(t *testing.T) { + policy := Policy{Profile: "archive", MaxRefinementJobs: 1, MinCandidateSNRDb: 0} + cands := []Candidate{ + {ID: 1, CenterHz: 100, SNRDb: 10}, + {ID: 2, CenterHz: 200, SNRDb: 9}, + } + plan := BuildRefinementPlan(cands, policy) + res := AdmitRefinementPlan(plan, policy, time.Now(), &RefinementHold{Active: map[int64]time.Time{}}) + if res.Admission.Pressure.Level == "" || res.Admission.Pressure.Level == "idle" { + t.Fatalf("expected pressure level to be set, got %+v", res.Admission.Pressure) + } + if res.Admission.Reason == "" || !strings.Contains(res.Admission.Reason, "pressure:") { + t.Fatalf("expected admission reason to include pressure tag, got %s", res.Admission.Reason) + } +}