From aaf95591ef047e9aad023205f8ad0c3f898f3559 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sun, 22 Mar 2026 08:21:51 +0100 Subject: [PATCH] Add arbitration-driven refinement admission and queue holds --- cmd/sdrd/arbitration_snapshot.go | 14 +- cmd/sdrd/arbitrator.go | 36 ++++ cmd/sdrd/decision_budget.go | 12 +- cmd/sdrd/dsp_loop.go | 10 +- cmd/sdrd/http_handlers.go | 6 +- cmd/sdrd/phase_state.go | 2 +- cmd/sdrd/phase_state_test.go | 2 +- cmd/sdrd/pipeline_runtime.go | 82 +++++++-- cmd/sdrd/pipeline_runtime_test.go | 4 +- cmd/sdrd/types.go | 21 ++- internal/pipeline/arbitration.go | 241 ++++++++++++++++++++++++++ internal/pipeline/arbitration_test.go | 22 +++ internal/pipeline/goals.go | 26 +++ internal/pipeline/phases.go | 2 + internal/pipeline/scheduler.go | 56 +++--- internal/pipeline/scheduler_test.go | 105 ++++++++--- 16 files changed, 532 insertions(+), 109 deletions(-) create mode 100644 cmd/sdrd/arbitrator.go create mode 100644 internal/pipeline/arbitration.go create mode 100644 internal/pipeline/arbitration_test.go diff --git a/cmd/sdrd/arbitration_snapshot.go b/cmd/sdrd/arbitration_snapshot.go index 0793e4e..0572cad 100644 --- a/cmd/sdrd/arbitration_snapshot.go +++ b/cmd/sdrd/arbitration_snapshot.go @@ -2,12 +2,14 @@ package main import "sdr-wideband-suite/internal/pipeline" -func buildArbitrationSnapshot(step pipeline.RefinementStep, queue decisionQueueStats) *ArbitrationSnapshot { +func buildArbitrationSnapshot(step pipeline.RefinementStep, arb arbitrationState) *ArbitrationSnapshot { return &ArbitrationSnapshot{ - Budgets: &step.Input.Budgets, - RefinementPlan: &step.Input.Plan, - Queue: queue, - DecisionSummary: summarizeDecisions(step.Result.Decisions), - DecisionItems: compactDecisions(step.Result.Decisions), + Budgets: &arb.Budgets, + HoldPolicy: &arb.HoldPolicy, + RefinementPlan: &step.Input.Plan, + RefinementAdmission: &step.Input.Admission, + Queue: arb.Queue, + DecisionSummary: summarizeDecisions(step.Result.Decisions), + DecisionItems: compactDecisions(step.Result.Decisions), } } diff --git a/cmd/sdrd/arbitrator.go b/cmd/sdrd/arbitrator.go new file mode 100644 index 0000000..6ff4712 --- /dev/null +++ b/cmd/sdrd/arbitrator.go @@ -0,0 +1,36 @@ +package main + +import ( + "time" + + "sdr-wideband-suite/internal/pipeline" +) + +type arbitrator struct { + refinementHold *pipeline.RefinementHold + queues *decisionQueues +} + +func newArbitrator() *arbitrator { + return &arbitrator{ + refinementHold: &pipeline.RefinementHold{Active: map[int64]time.Time{}}, + queues: newDecisionQueues(), + } +} + +func (a *arbitrator) AdmitRefinement(plan pipeline.RefinementPlan, policy pipeline.Policy, now time.Time) pipeline.RefinementAdmissionResult { + if a == nil { + return pipeline.AdmitRefinementPlan(plan, policy, now, nil) + } + if a.refinementHold == nil { + a.refinementHold = &pipeline.RefinementHold{Active: map[int64]time.Time{}} + } + return pipeline.AdmitRefinementPlan(plan, policy, now, a.refinementHold) +} + +func (a *arbitrator) ApplyDecisions(decisions []pipeline.SignalDecision, budget pipeline.BudgetModel, now time.Time, policy pipeline.Policy) decisionQueueStats { + if a == nil || a.queues == nil { + return decisionQueueStats{} + } + return a.queues.Apply(decisions, budget, now, policy) +} diff --git a/cmd/sdrd/decision_budget.go b/cmd/sdrd/decision_budget.go index 261faa7..77c409f 100644 --- a/cmd/sdrd/decision_budget.go +++ b/cmd/sdrd/decision_budget.go @@ -19,6 +19,8 @@ type decisionQueueStats struct { RecordBudget int `json:"record_budget"` DecodeBudget int `json:"decode_budget"` HoldMs int `json:"hold_ms"` + RecordHoldMs int `json:"record_hold_ms"` + DecodeHoldMs int `json:"decode_hold_ms"` RecordDropped int `json:"record_dropped"` DecodeDropped int `json:"decode_dropped"` } @@ -52,7 +54,9 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe if dq == nil { return decisionQueueStats{} } - hold := time.Duration(budget.HoldMs) * time.Millisecond + holdPolicy := pipeline.HoldPolicyFromPolicy(policy) + recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond + decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond recSeen := map[int64]bool{} decSeen := map[int64]bool{} for i := range decisions { @@ -99,8 +103,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe purgeExpired(dq.recordHold, now) purgeExpired(dq.decodeHold, now) - recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, hold, now, policy) - decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, hold, now, policy) + recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy) + decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy) stats := decisionQueueStats{ RecordQueued: len(dq.record), @@ -114,6 +118,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe RecordBudget: budget.Record.Max, DecodeBudget: budget.Decode.Max, HoldMs: budget.HoldMs, + RecordHoldMs: holdPolicy.RecordMs, + DecodeHoldMs: holdPolicy.DecodeMs, } for i := range decisions { diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index f841f0f..ed8def3 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -90,7 +90,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } else { displaySignals = rt.det.StableSignals() } - state.queueStats = rt.queueStats + state.arbitration = rt.arbitration state.presentation = pipeline.AnalysisLevel{ Name: "presentation", Role: "presentation", @@ -158,14 +158,8 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * if hasWindows { refinementDebug.Windows = windowStats } - refinementDebug.Queue = state.queueStats - refinementDebug.Budgets = &state.refinement.Input.Budgets - refinementDebug.Arbitration = buildArbitrationSnapshot(state.refinement, state.queueStats) + refinementDebug.Arbitration = buildArbitrationSnapshot(state.refinement, state.arbitration) debugInfo.Refinement = refinementDebug - debugInfo.Decisions = &DecisionDebug{ - Summary: summarizeDecisions(state.refinement.Result.Decisions), - Items: compactDecisions(state.refinement.Result.Decisions), - } } h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.surveillanceSpectrum, Signals: displaySignals, Debug: debugInfo}) } diff --git a/cmd/sdrd/http_handlers.go b/cmd/sdrd/http_handlers.go index ed59e6c..b20650b 100644 --- a/cmd/sdrd/http_handlers.go +++ b/cmd/sdrd/http_handlers.go @@ -164,24 +164,20 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime w.Header().Set("Content-Type", "application/json") snap := phaseSnap.Snapshot() windowStats := buildWindowStats(snap.refinement.Input.Windows) - arbitration := buildArbitrationSnapshot(snap.refinement, snap.queueStats) + arbitration := buildArbitrationSnapshot(snap.refinement, snap.arbitration) out := map[string]any{ "plan": snap.refinement.Input.Plan, "windows": snap.refinement.Input.Windows, "window_stats": windowStats, - "queue_stats": snap.queueStats, "request": snap.refinement.Input.Request, "context": snap.refinement.Input.Context, "detail_level": snap.refinement.Input.Detail, - "budgets": snap.refinement.Input.Budgets, "arbitration": arbitration, "work_items": snap.refinement.Input.WorkItems, "candidates": len(snap.refinement.Input.Candidates), "scheduled": len(snap.refinement.Input.Scheduled), "signals": len(snap.refinement.Result.Signals), "decisions": len(snap.refinement.Result.Decisions), - "decision_summary": summarizeDecisions(snap.refinement.Result.Decisions), - "decision_items": compactDecisions(snap.refinement.Result.Decisions), "surveillance_level": snap.surveillance.Level, "surveillance_levels": snap.surveillance.Levels, "display_level": snap.surveillance.DisplayLevel, diff --git a/cmd/sdrd/phase_state.go b/cmd/sdrd/phase_state.go index f30fdcb..2a14df3 100644 --- a/cmd/sdrd/phase_state.go +++ b/cmd/sdrd/phase_state.go @@ -5,6 +5,6 @@ import "sdr-wideband-suite/internal/pipeline" type phaseState struct { surveillance pipeline.SurveillanceResult refinement pipeline.RefinementStep - queueStats decisionQueueStats + arbitration arbitrationState presentation pipeline.AnalysisLevel } diff --git a/cmd/sdrd/phase_state_test.go b/cmd/sdrd/phase_state_test.go index 0eeb7c8..4ba2ac6 100644 --- a/cmd/sdrd/phase_state_test.go +++ b/cmd/sdrd/phase_state_test.go @@ -13,7 +13,7 @@ func TestPhaseStateCarriesPhaseResults(t *testing.T) { Input: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6}, Result: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}}, }, - queueStats: decisionQueueStats{RecordQueued: 1}, + arbitration: arbitrationState{Queue: decisionQueueStats{RecordQueued: 1}}, presentation: pipeline.AnalysisLevel{Name: "presentation"}, } if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 { diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index 7053751..2535498 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -42,8 +42,8 @@ type dspRuntime struct { rdsMap map[int64]*rdsState streamPhaseState map[int64]*streamExtractState streamOverlap *streamIQOverlap - decisionQueues *decisionQueues - queueStats decisionQueueStats + arbiter *arbitrator + arbitration arbitrationState gotSamples bool } @@ -79,6 +79,7 @@ func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, rdsMap: map[int64]*rdsState{}, streamPhaseState: map[int64]*streamExtractState{}, streamOverlap: &streamIQOverlap{}, + arbiter: newArbitrator(), } if rt.useGPU && gpuState != nil { snap := gpuState.snapshot() @@ -327,17 +328,16 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S } } -func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pipeline.RefinementInput { +func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now time.Time) pipeline.RefinementInput { policy := pipeline.PolicyFromConfig(rt.cfg) plan := pipeline.BuildRefinementPlan(surv.Candidates, policy) - scheduled := append([]pipeline.ScheduledCandidate(nil), surv.Scheduled...) - if len(scheduled) == 0 && len(plan.Selected) > 0 { - scheduled = append([]pipeline.ScheduledCandidate(nil), plan.Selected...) - } - workItems := make([]pipeline.RefinementWorkItem, 0, len(plan.WorkItems)) - if len(plan.WorkItems) > 0 { - workItems = append(workItems, plan.WorkItems...) + admission := rt.arbiter.AdmitRefinement(plan, policy, now) + plan = admission.Plan + workItems := make([]pipeline.RefinementWorkItem, 0, len(admission.WorkItems)) + if len(admission.WorkItems) > 0 { + workItems = append(workItems, admission.WorkItems...) } + scheduled := append([]pipeline.ScheduledCandidate(nil), admission.Admitted...) workIndex := map[int64]int{} for i := range workItems { if workItems[i].Candidate.ID == 0 { @@ -403,6 +403,7 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip Context: surv.Context, Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan}, Budgets: pipeline.BudgetModelFromPolicy(policy), + Admission: admission.Admission, Candidates: append([]pipeline.Candidate(nil), surv.Candidates...), Scheduled: scheduled, WorkItems: workItems, @@ -416,16 +417,34 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip input.Context.Refinement = level input.Context.Detail = detailLevel if !policy.RefinementEnabled { + for i := range input.WorkItems { + item := &input.WorkItems[i] + if item.Status == pipeline.RefinementStatusDropped { + continue + } + item.Status = pipeline.RefinementStatusDropped + item.Reason = pipeline.RefinementReasonDisabled + } input.Scheduled = nil - input.WorkItems = nil input.Request.Reason = pipeline.RefinementReasonDisabled - } + input.Admission.Reason = pipeline.RefinementReasonDisabled + input.Admission.Admitted = 0 + input.Admission.Skipped = 0 + input.Admission.Displaced = 0 + input.Plan.Selected = nil + input.Plan.DroppedByBudget = 0 + } + rt.arbitration.Budgets = input.Budgets + rt.arbitration.Refinement = input.Admission + rt.arbitration.HoldPolicy = pipeline.HoldPolicyFromPolicy(policy) return input } func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep { - input := rt.buildRefinementInput(surv) + input := rt.buildRefinementInput(surv, art.now) + markWorkItemsStatus(input.WorkItems, pipeline.RefinementStatusAdmitted, pipeline.RefinementStatusRunning, pipeline.RefinementReasonRunning) result := rt.refineSignals(art, input, extractMgr, rec) + markWorkItemsCompleted(input.WorkItems, result.Candidates) return pipeline.RefinementStep{Input: input, Result: result} } @@ -488,8 +507,10 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin } } budget := pipeline.BudgetModelFromPolicy(policy) - queueStats := rt.decisionQueues.Apply(decisions, budget, art.now, policy) - rt.queueStats = queueStats + queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy) + rt.arbitration.Budgets = budget + rt.arbitration.HoldPolicy = pipeline.HoldPolicyFromPolicy(policy) + rt.arbitration.Queue = queueStats summary := summarizeDecisions(decisions) if rec != nil { if summary.RecordEnabled > 0 { @@ -653,3 +674,34 @@ func sameIQBuffer(a []complex64, b []complex64) bool { } return &a[0] == &b[0] } + +func markWorkItemsStatus(items []pipeline.RefinementWorkItem, from string, to string, reason string) { + for i := range items { + if items[i].Status != from { + continue + } + items[i].Status = to + if reason != "" { + items[i].Reason = reason + } + } +} + +func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pipeline.Candidate) { + if len(items) == 0 || len(candidates) == 0 { + return + } + done := map[int64]struct{}{} + for _, cand := range candidates { + if cand.ID != 0 { + done[cand.ID] = struct{}{} + } + } + for i := range items { + if _, ok := done[items[i].Candidate.ID]; !ok { + continue + } + items[i].Status = pipeline.RefinementStatusCompleted + items[i].Reason = pipeline.RefinementReasonCompleted + } +} diff --git a/cmd/sdrd/pipeline_runtime_test.go b/cmd/sdrd/pipeline_runtime_test.go index 447262f..95fe89e 100644 --- a/cmd/sdrd/pipeline_runtime_test.go +++ b/cmd/sdrd/pipeline_runtime_test.go @@ -35,8 +35,8 @@ func TestScheduledCandidateSelectionUsesPolicy(t *testing.T) { {ID: 2, SNRDb: 12, BandwidthHz: 5000}, {ID: 3, SNRDb: 8, BandwidthHz: 7000}, }, policy) - if len(got) != 1 { - t.Fatalf("expected 1 scheduled candidate, got %d", len(got)) + if len(got) != 2 { + t.Fatalf("expected 2 scheduled candidates after gating, got %d", len(got)) } if got[0].Candidate.ID != 2 { t.Fatalf("expected highest priority candidate, got %d", got[0].Candidate.ID) diff --git a/cmd/sdrd/types.go b/cmd/sdrd/types.go index 0c0571c..e596984 100644 --- a/cmd/sdrd/types.go +++ b/cmd/sdrd/types.go @@ -36,8 +36,6 @@ type RefinementDebug struct { Request *pipeline.RefinementRequest `json:"request,omitempty"` WorkItems []pipeline.RefinementWorkItem `json:"work_items,omitempty"` Windows *RefinementWindowStats `json:"windows,omitempty"` - Queue decisionQueueStats `json:"queue,omitempty"` - Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` Arbitration *ArbitrationSnapshot `json:"arbitration,omitempty"` } @@ -47,11 +45,20 @@ type DecisionDebug struct { } type ArbitrationSnapshot struct { - Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` - RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` - Queue decisionQueueStats `json:"queue,omitempty"` - DecisionSummary decisionSummary `json:"decision_summary,omitempty"` - DecisionItems []compactDecision `json:"decision_items,omitempty"` + Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` + HoldPolicy *pipeline.HoldPolicy `json:"hold_policy,omitempty"` + RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` + RefinementAdmission *pipeline.RefinementAdmission `json:"refinement_admission,omitempty"` + Queue decisionQueueStats `json:"queue,omitempty"` + DecisionSummary decisionSummary `json:"decision_summary,omitempty"` + DecisionItems []compactDecision `json:"decision_items,omitempty"` +} + +type arbitrationState struct { + Budgets pipeline.BudgetModel + HoldPolicy pipeline.HoldPolicy + Refinement pipeline.RefinementAdmission + Queue decisionQueueStats } type SpectrumFrame struct { diff --git a/internal/pipeline/arbitration.go b/internal/pipeline/arbitration.go new file mode 100644 index 0000000..b617c7d --- /dev/null +++ b/internal/pipeline/arbitration.go @@ -0,0 +1,241 @@ +package pipeline + +import ( + "math" + "strings" + "time" +) + +type HoldPolicy struct { + BaseMs int `json:"base_ms"` + RefinementMs int `json:"refinement_ms"` + RecordMs int `json:"record_ms"` + DecodeMs int `json:"decode_ms"` + Profile string `json:"profile,omitempty"` + Strategy string `json:"strategy,omitempty"` + Reasons []string `json:"reasons,omitempty"` +} + +type RefinementHold struct { + Active map[int64]time.Time +} + +type RefinementAdmission struct { + Budget int `json:"budget"` + BudgetSource string `json:"budget_source,omitempty"` + HoldMs int `json:"hold_ms"` + HoldSource string `json:"hold_source,omitempty"` + Planned int `json:"planned"` + Admitted int `json:"admitted"` + Skipped int `json:"skipped"` + Displaced int `json:"displaced"` + PriorityCutoff float64 `json:"priority_cutoff,omitempty"` + Reason string `json:"reason,omitempty"` +} + +type RefinementAdmissionResult struct { + Plan RefinementPlan + WorkItems []RefinementWorkItem + Admitted []ScheduledCandidate + Admission RefinementAdmission +} + +func HoldPolicyFromPolicy(policy Policy) HoldPolicy { + base := policy.DecisionHoldMs + if base < 0 { + base = 0 + } + refMult := 1.0 + recMult := 1.0 + decMult := 1.0 + reasons := make([]string, 0, 2) + profile := strings.ToLower(strings.TrimSpace(policy.Profile)) + strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) + + if profileContains(profile, "archive") || strategyContains(strategy, "archive") { + recMult *= 1.5 + decMult *= 1.1 + refMult *= 1.2 + reasons = append(reasons, "archive") + } + if profileContains(profile, "digital") || strategyContains(strategy, "digital") { + decMult *= 1.6 + recMult *= 0.85 + refMult *= 1.1 + reasons = append(reasons, "digital") + } + if profileContains(profile, "aggressive") { + refMult *= 1.15 + reasons = append(reasons, "aggressive") + } + if strategyContains(strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy)), "multi") { + refMult *= 1.1 + reasons = append(reasons, "multi-resolution") + } + + return HoldPolicy{ + BaseMs: base, + RefinementMs: scaleHold(base, refMult), + RecordMs: scaleHold(base, recMult), + DecodeMs: scaleHold(base, decMult), + Profile: policy.Profile, + Strategy: policy.RefinementStrategy, + Reasons: reasons, + } +} + +func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold *RefinementHold) RefinementAdmissionResult { + ranked := plan.Ranked + if len(ranked) == 0 { + ranked = plan.Selected + } + workItems := append([]RefinementWorkItem(nil), plan.WorkItems...) + admission := RefinementAdmission{ + Budget: plan.Budget, + BudgetSource: plan.BudgetSource, + } + if len(ranked) == 0 { + admission.Reason = "no-candidates" + return RefinementAdmissionResult{Plan: plan, WorkItems: workItems, Admission: admission} + } + + holdPolicy := HoldPolicyFromPolicy(policy) + admission.HoldMs = holdPolicy.RefinementMs + admission.HoldSource = "resources.decision_hold_ms" + if len(holdPolicy.Reasons) > 0 { + admission.HoldSource += ":" + strings.Join(holdPolicy.Reasons, ",") + } + + planned := len(ranked) + admission.Planned = planned + selected := map[int64]struct{}{} + if hold != nil { + purgeHold(hold.Active, now) + for id := range hold.Active { + if rankedContains(ranked, id) { + selected[id] = struct{}{} + } + } + } + limit := plan.Budget + if limit <= 0 || limit > planned { + limit = planned + } + if len(selected) > limit { + limit = len(selected) + if limit > planned { + limit = planned + } + } + for _, cand := range ranked { + if len(selected) >= limit { + break + } + if _, ok := selected[cand.Candidate.ID]; ok { + continue + } + selected[cand.Candidate.ID] = struct{}{} + } + if hold != nil && admission.HoldMs > 0 { + until := now.Add(time.Duration(admission.HoldMs) * time.Millisecond) + if hold.Active == nil { + hold.Active = map[int64]time.Time{} + } + for id := range selected { + hold.Active[id] = until + } + } + + admitted := make([]ScheduledCandidate, 0, len(selected)) + for _, cand := range ranked { + if _, ok := selected[cand.Candidate.ID]; ok { + admitted = append(admitted, cand) + } + } + admission.Admitted = len(admitted) + admission.Skipped = planned - admission.Admitted + if admission.Skipped < 0 { + admission.Skipped = 0 + } + + displaced := map[int64]struct{}{} + if len(admitted) > 0 { + admission.PriorityCutoff = admitted[len(admitted)-1].Priority + for _, cand := range ranked { + if _, ok := selected[cand.Candidate.ID]; ok { + continue + } + if cand.Priority >= admission.PriorityCutoff { + displaced[cand.Candidate.ID] = struct{}{} + } + } + } + admission.Displaced = len(displaced) + + plan.Selected = admitted + plan.PriorityCutoff = admission.PriorityCutoff + plan.DroppedByBudget = admission.Skipped + for i := range workItems { + item := &workItems[i] + if item.Status != RefinementStatusPlanned { + continue + } + id := item.Candidate.ID + if _, ok := selected[id]; ok { + item.Status = RefinementStatusAdmitted + item.Reason = RefinementReasonAdmitted + continue + } + if _, ok := displaced[id]; ok { + item.Status = RefinementStatusDisplaced + item.Reason = RefinementReasonDisplaced + continue + } + item.Status = RefinementStatusSkipped + item.Reason = RefinementReasonBudget + } + return RefinementAdmissionResult{ + Plan: plan, + WorkItems: workItems, + Admitted: admitted, + Admission: admission, + } +} + +func purgeHold(active map[int64]time.Time, now time.Time) { + for id, until := range active { + if now.After(until) { + delete(active, id) + } + } +} + +func rankedContains(items []ScheduledCandidate, id int64) bool { + for _, item := range items { + if item.Candidate.ID == id { + return true + } + } + return false +} + +func scaleHold(base int, mult float64) int { + if base <= 0 { + return 0 + } + return int(math.Round(float64(base) * mult)) +} + +func profileContains(profile string, token string) bool { + if profile == "" || token == "" { + return false + } + return strings.Contains(profile, strings.ToLower(token)) +} + +func strategyContains(strategy string, token string) bool { + if strategy == "" || token == "" { + return false + } + return strings.Contains(strategy, strings.ToLower(token)) +} diff --git a/internal/pipeline/arbitration_test.go b/internal/pipeline/arbitration_test.go new file mode 100644 index 0000000..a24ed3c --- /dev/null +++ b/internal/pipeline/arbitration_test.go @@ -0,0 +1,22 @@ +package pipeline + +import "testing" + +func TestHoldPolicyArchiveBiasesRecord(t *testing.T) { + policy := Policy{DecisionHoldMs: 1000, Profile: "archive", RefinementStrategy: "archive-oriented"} + hold := HoldPolicyFromPolicy(policy) + if hold.RecordMs <= hold.BaseMs { + t.Fatalf("expected archive profile to extend record hold, got %d vs %d", hold.RecordMs, hold.BaseMs) + } + if hold.RefinementMs <= hold.BaseMs { + t.Fatalf("expected archive profile to extend refinement hold, got %d vs %d", hold.RefinementMs, hold.BaseMs) + } +} + +func TestHoldPolicyDigitalBiasesDecode(t *testing.T) { + policy := Policy{DecisionHoldMs: 1000, Profile: "digital-hunting", RefinementStrategy: "digital-hunting"} + hold := HoldPolicyFromPolicy(policy) + if hold.DecodeMs <= hold.RecordMs { + t.Fatalf("expected digital profile to favor decode hold, got decode=%d record=%d", hold.DecodeMs, hold.RecordMs) + } +} diff --git a/internal/pipeline/goals.go b/internal/pipeline/goals.go index 7222a59..d93ecd8 100644 --- a/internal/pipeline/goals.go +++ b/internal/pipeline/goals.go @@ -35,6 +35,7 @@ func DecisionPriorityBoost(policy Policy, hint string, class string, queue strin boost += hintMatchBoost(policy.AutoDecodeClasses, tag, 3.0) } boost += intentQueueBoost(policy.Intent, queue) + boost += queueStrategyBoost(policy, queue) return boost } @@ -89,6 +90,31 @@ func intentQueueBoost(intent string, queue string) float64 { return boost } +func queueStrategyBoost(policy Policy, queue string) float64 { + queue = strings.ToLower(strings.TrimSpace(queue)) + if queue == "" { + return 0 + } + boost := 0.0 + profile := strings.ToLower(strings.TrimSpace(policy.Profile)) + strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) + if strings.Contains(profile, "archive") || strings.Contains(strategy, "archive") { + if queue == "record" { + boost += 1.5 + } else if queue == "decode" { + boost += 0.5 + } + } + if strings.Contains(profile, "digital") || strings.Contains(strategy, "digital") { + if queue == "decode" { + boost += 1.5 + } else if queue == "record" { + boost += 0.3 + } + } + return boost +} + func refinementIntentWeights(intent string) (float64, float64, float64) { if intent == "" { return 1.0, 1.0, 1.0 diff --git a/internal/pipeline/phases.go b/internal/pipeline/phases.go index bc705fb..3f9d7d7 100644 --- a/internal/pipeline/phases.go +++ b/internal/pipeline/phases.go @@ -52,6 +52,7 @@ type RefinementPlan struct { PriorityMax float64 `json:"priority_max,omitempty"` PriorityAvg float64 `json:"priority_avg,omitempty"` PriorityCutoff float64 `json:"priority_cutoff,omitempty"` + Ranked []ScheduledCandidate `json:"ranked,omitempty"` Selected []ScheduledCandidate `json:"selected,omitempty"` WorkItems []RefinementWorkItem `json:"work_items,omitempty"` } @@ -68,6 +69,7 @@ type RefinementInput struct { Context AnalysisContext `json:"context,omitempty"` Request RefinementRequest `json:"request,omitempty"` Budgets BudgetModel `json:"budgets,omitempty"` + Admission RefinementAdmission `json:"admission,omitempty"` Candidates []Candidate `json:"candidates,omitempty"` Scheduled []ScheduledCandidate `json:"scheduled,omitempty"` WorkItems []RefinementWorkItem `json:"work_items,omitempty"` diff --git a/internal/pipeline/scheduler.go b/internal/pipeline/scheduler.go index 3cf5e82..21c0d89 100644 --- a/internal/pipeline/scheduler.go +++ b/internal/pipeline/scheduler.go @@ -52,21 +52,30 @@ type RefinementExecution struct { } const ( - RefinementStatusSelected = "selected" - RefinementStatusDropped = "dropped" - RefinementStatusDeferred = "deferred" + RefinementStatusPlanned = "planned" + RefinementStatusAdmitted = "admitted" + RefinementStatusRunning = "running" + RefinementStatusCompleted = "completed" + RefinementStatusDropped = "dropped" + RefinementStatusSkipped = "skipped" + RefinementStatusDisplaced = "displaced" ) const ( - RefinementReasonSelected = "selected" + RefinementReasonPlanned = "planned" + RefinementReasonAdmitted = "admitted" + RefinementReasonRunning = "running" + RefinementReasonCompleted = "completed" RefinementReasonMonitorGate = "dropped:monitor" RefinementReasonBelowSNR = "dropped:snr" - RefinementReasonBudget = "dropped:budget" + RefinementReasonBudget = "skipped:budget" RefinementReasonDisabled = "dropped:disabled" RefinementReasonUnclassified = "dropped:unclassified" + RefinementReasonDisplaced = "skipped:displaced" ) -// BuildRefinementPlan scores and budgets candidates for costly local refinement. +// BuildRefinementPlan scores and ranks candidates for costly local refinement. +// Admission/budget enforcement is handled by arbitration to keep refinement/record/decode consistent. // Current heuristic is intentionally simple and deterministic; later phases can add // richer scoring (novelty, persistence, profile-aware band priorities, decoder value). func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { @@ -152,7 +161,8 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { Priority: priority, Score: score, Breakdown: &score.Breakdown, - Status: RefinementStatusDeferred, + Status: RefinementStatusPlanned, + Reason: RefinementReasonPlanned, }) } sort.Slice(scored, func(i, j int) bool { @@ -178,37 +188,17 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { plan.PriorityMax = maxPriority plan.PriorityAvg = sumPriority / float64(len(scored)) } - limit := plan.Budget - if limit <= 0 || limit > len(scored) { - limit = len(scored) - } - plan.Selected = scored[:limit] - if len(plan.Selected) > 0 { - plan.PriorityCutoff = plan.Selected[len(plan.Selected)-1].Priority - } - plan.DroppedByBudget = len(scored) - len(plan.Selected) - if len(plan.Selected) > 0 { - selected := map[int64]struct{}{} - for _, s := range plan.Selected { - selected[s.Candidate.ID] = struct{}{} - } - for i := range workItems { - item := &workItems[i] - if _, ok := selected[item.Candidate.ID]; ok { - item.Status = RefinementStatusSelected - item.Reason = RefinementReasonSelected - } else if item.Status == RefinementStatusDeferred { - item.Status = RefinementStatusDropped - item.Reason = RefinementReasonBudget - } - } - } + plan.Ranked = append(plan.Ranked, scored...) plan.WorkItems = workItems return plan } func ScheduleCandidates(candidates []Candidate, policy Policy) []ScheduledCandidate { - return BuildRefinementPlan(candidates, policy).Selected + plan := BuildRefinementPlan(candidates, policy) + if len(plan.Ranked) > 0 { + return plan.Ranked + } + return plan.Selected } func refinementStrategy(policy Policy) (string, string) { diff --git a/internal/pipeline/scheduler_test.go b/internal/pipeline/scheduler_test.go index fcfe36e..10018bd 100644 --- a/internal/pipeline/scheduler_test.go +++ b/internal/pipeline/scheduler_test.go @@ -1,6 +1,9 @@ package pipeline -import "testing" +import ( + "testing" + "time" +) func TestScheduleCandidates(t *testing.T) { policy := Policy{MaxRefinementJobs: 2, MinCandidateSNRDb: 5} @@ -11,8 +14,8 @@ func TestScheduleCandidates(t *testing.T) { {ID: 4, CenterHz: 400, SNRDb: 20, BandwidthHz: 100000, PeakDb: 5}, } got := ScheduleCandidates(cands, policy) - if len(got) != 2 { - t.Fatalf("expected 2 scheduled candidates, got %d", len(got)) + if len(got) != 3 { + t.Fatalf("expected 3 scheduled candidates, got %d", len(got)) } if got[0].Candidate.ID != 4 { t.Fatalf("expected strongest candidate first, got id=%d", got[0].Candidate.ID) @@ -36,26 +39,29 @@ func TestBuildRefinementPlanTracksDrops(t *testing.T) { if plan.DroppedBySNR != 1 { t.Fatalf("expected 1 dropped by SNR, got %d", plan.DroppedBySNR) } - if plan.DroppedByBudget != 1 { - t.Fatalf("expected 1 dropped by budget, got %d", plan.DroppedByBudget) + if plan.DroppedByBudget != 0 { + t.Fatalf("expected 0 dropped by budget in plan stage, got %d", plan.DroppedByBudget) } - if len(plan.Selected) != 1 || plan.Selected[0].Candidate.ID != 2 { - t.Fatalf("unexpected plan selection: %+v", plan.Selected) + if len(plan.Selected) != 0 { + t.Fatalf("expected no admitted selection in plan stage, got %+v", plan.Selected) + } + if len(plan.Ranked) != 2 { + t.Fatalf("expected ranked candidates after gating, got %d", len(plan.Ranked)) } if len(plan.WorkItems) != len(cands) { t.Fatalf("expected work items for all candidates, got %d", len(plan.WorkItems)) } item2 := findWorkItem(plan.WorkItems, 2) - if item2 == nil || item2.Status != RefinementStatusSelected || item2.Reason != RefinementReasonSelected { - t.Fatalf("expected candidate 2 selected with reason, got %+v", item2) + if item2 == nil || item2.Status != RefinementStatusPlanned || item2.Reason != RefinementReasonPlanned { + t.Fatalf("expected candidate 2 planned with reason, got %+v", item2) } item1 := findWorkItem(plan.WorkItems, 1) if item1 == nil || item1.Reason != RefinementReasonBelowSNR { t.Fatalf("expected candidate 1 dropped by snr, got %+v", item1) } item3 := findWorkItem(plan.WorkItems, 3) - if item3 == nil || item3.Reason != RefinementReasonBudget { - t.Fatalf("expected candidate 3 dropped by budget, got %+v", item3) + if item3 == nil || item3.Status != RefinementStatusPlanned { + t.Fatalf("expected candidate 3 planned pre-admission, got %+v", item3) } } @@ -73,8 +79,8 @@ func TestBuildRefinementPlanRespectsMaxConcurrent(t *testing.T) { if plan.BudgetSource != "refinement.max_concurrent" { t.Fatalf("expected budget source refinement.max_concurrent, got %s", plan.BudgetSource) } - if len(plan.Selected) != 2 { - t.Fatalf("expected 2 selected, got %d", len(plan.Selected)) + if len(plan.Selected) != 0 { + t.Fatalf("expected no selected until admission, got %d", len(plan.Selected)) } } @@ -90,8 +96,8 @@ func TestBuildRefinementPlanAppliesMonitorSpan(t *testing.T) { if plan.DroppedByMonitor != 2 { t.Fatalf("expected 2 dropped by monitor, got %d", plan.DroppedByMonitor) } - if len(plan.Selected) != 2 { - t.Fatalf("expected 2 selected within monitor, got %d", len(plan.Selected)) + if len(plan.Ranked) != 2 { + t.Fatalf("expected 2 ranked within monitor, got %d", len(plan.Ranked)) } } @@ -107,8 +113,8 @@ func TestBuildRefinementPlanAppliesMonitorSpanCentered(t *testing.T) { if plan.DroppedByMonitor != 1 { t.Fatalf("expected 1 dropped by monitor, got %d", plan.DroppedByMonitor) } - if len(plan.Selected) != 3 { - t.Fatalf("expected 3 selected within monitor, got %d", len(plan.Selected)) + if len(plan.Ranked) != 3 { + t.Fatalf("expected 3 ranked within monitor, got %d", len(plan.Ranked)) } } @@ -133,7 +139,7 @@ func TestScheduleCandidatesPriorityBoost(t *testing.T) { {ID: 1, SNRDb: 15, Hint: "voice"}, {ID: 2, SNRDb: 14, Hint: "digital-burst"}, }, policy) - if len(got) != 1 || got[0].Candidate.ID != 2 { + if len(got) != 2 || got[0].Candidate.ID != 2 { t.Fatalf("expected priority boost to favor digital candidate, got %+v", got) } } @@ -148,16 +154,17 @@ func TestBuildRefinementPlanPriorityStats(t *testing.T) { if plan.PriorityMax < plan.PriorityMin { t.Fatalf("priority bounds invalid: %+v", plan) } - if len(plan.Selected) != 1 { - t.Fatalf("expected 1 selected, got %d", len(plan.Selected)) + res := AdmitRefinementPlan(plan, policy, time.Now(), &RefinementHold{Active: map[int64]time.Time{}}) + if len(res.Plan.Selected) != 1 { + t.Fatalf("expected 1 admitted, got %d", len(res.Plan.Selected)) } - if plan.PriorityCutoff != plan.Selected[0].Priority { - t.Fatalf("expected cutoff to match selection, got %.2f vs %.2f", plan.PriorityCutoff, plan.Selected[0].Priority) + if res.Plan.PriorityCutoff != res.Plan.Selected[0].Priority { + t.Fatalf("expected cutoff to match selection, got %.2f vs %.2f", res.Plan.PriorityCutoff, res.Plan.Selected[0].Priority) } - if plan.Selected[0].Breakdown == nil { + if res.Plan.Selected[0].Breakdown == nil { t.Fatalf("expected breakdown on selected candidate") } - if plan.Selected[0].Score == nil || plan.Selected[0].Score.Total == 0 { + if res.Plan.Selected[0].Score == nil || res.Plan.Selected[0].Score.Total == 0 { t.Fatalf("expected score on selected candidate") } } @@ -169,11 +176,53 @@ func TestBuildRefinementPlanStrategyBias(t *testing.T) { {ID: 2, CenterHz: 200, SNRDb: 11, BandwidthHz: 100000, PeakDb: 1}, } plan := BuildRefinementPlan(cands, policy) - if len(plan.Selected) != 1 { - t.Fatalf("expected 1 selected, got %d", len(plan.Selected)) + if len(plan.Ranked) != 2 { + t.Fatalf("expected ranked candidates, got %d", len(plan.Ranked)) + } + if plan.Ranked[0].Candidate.ID != 2 { + t.Fatalf("expected archive-oriented strategy to favor wider candidate, got %+v", plan.Ranked[0]) + } +} + +func TestAdmitRefinementPlanAppliesBudget(t *testing.T) { + policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 10} + cands := []Candidate{ + {ID: 2, CenterHz: 200, SNRDb: 12, BandwidthHz: 50000, PeakDb: 3}, + {ID: 3, CenterHz: 300, SNRDb: 11, BandwidthHz: 25000, PeakDb: 2}, + } + plan := BuildRefinementPlan(cands, policy) + res := AdmitRefinementPlan(plan, policy, time.Now(), &RefinementHold{Active: map[int64]time.Time{}}) + if len(res.Plan.Selected) != 1 || res.Plan.Selected[0].Candidate.ID != 2 { + t.Fatalf("unexpected admission selection: %+v", res.Plan.Selected) + } + if res.Plan.DroppedByBudget != 1 { + t.Fatalf("expected 1 dropped by budget, got %d", res.Plan.DroppedByBudget) + } + item2 := findWorkItem(res.WorkItems, 2) + if item2 == nil || item2.Status != RefinementStatusAdmitted { + t.Fatalf("expected candidate 2 admitted, got %+v", item2) + } + item3 := findWorkItem(res.WorkItems, 3) + if item3 == nil || item3.Status != RefinementStatusSkipped { + t.Fatalf("expected candidate 3 skipped, got %+v", item3) + } +} + +func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) { + policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0} + cands := []Candidate{ + {ID: 1, CenterHz: 100, SNRDb: 5}, + {ID: 2, CenterHz: 200, SNRDb: 12}, } - if plan.Selected[0].Candidate.ID != 2 { - t.Fatalf("expected archive-oriented strategy to favor wider candidate, got %+v", plan.Selected[0]) + plan := BuildRefinementPlan(cands, policy) + hold := &RefinementHold{Active: map[int64]time.Time{1: time.Now().Add(2 * time.Second)}} + res := AdmitRefinementPlan(plan, policy, time.Now(), hold) + if len(res.Plan.Selected) != 1 || res.Plan.Selected[0].Candidate.ID != 1 { + t.Fatalf("expected held candidate to remain admitted, got %+v", res.Plan.Selected) + } + item2 := findWorkItem(res.WorkItems, 2) + if item2 == nil || item2.Status != RefinementStatusDisplaced { + t.Fatalf("expected higher priority candidate displaced, got %+v", item2) } }