| @@ -2,12 +2,14 @@ package main | |||
| import "sdr-wideband-suite/internal/pipeline" | |||
| func buildArbitrationSnapshot(step pipeline.RefinementStep, queue decisionQueueStats) *ArbitrationSnapshot { | |||
| func buildArbitrationSnapshot(step pipeline.RefinementStep, arb arbitrationState) *ArbitrationSnapshot { | |||
| return &ArbitrationSnapshot{ | |||
| Budgets: &step.Input.Budgets, | |||
| RefinementPlan: &step.Input.Plan, | |||
| Queue: queue, | |||
| DecisionSummary: summarizeDecisions(step.Result.Decisions), | |||
| DecisionItems: compactDecisions(step.Result.Decisions), | |||
| Budgets: &arb.Budgets, | |||
| HoldPolicy: &arb.HoldPolicy, | |||
| RefinementPlan: &step.Input.Plan, | |||
| RefinementAdmission: &step.Input.Admission, | |||
| Queue: arb.Queue, | |||
| DecisionSummary: summarizeDecisions(step.Result.Decisions), | |||
| DecisionItems: compactDecisions(step.Result.Decisions), | |||
| } | |||
| } | |||
| @@ -0,0 +1,36 @@ | |||
| package main | |||
| import ( | |||
| "time" | |||
| "sdr-wideband-suite/internal/pipeline" | |||
| ) | |||
| type arbitrator struct { | |||
| refinementHold *pipeline.RefinementHold | |||
| queues *decisionQueues | |||
| } | |||
| func newArbitrator() *arbitrator { | |||
| return &arbitrator{ | |||
| refinementHold: &pipeline.RefinementHold{Active: map[int64]time.Time{}}, | |||
| queues: newDecisionQueues(), | |||
| } | |||
| } | |||
| func (a *arbitrator) AdmitRefinement(plan pipeline.RefinementPlan, policy pipeline.Policy, now time.Time) pipeline.RefinementAdmissionResult { | |||
| if a == nil { | |||
| return pipeline.AdmitRefinementPlan(plan, policy, now, nil) | |||
| } | |||
| if a.refinementHold == nil { | |||
| a.refinementHold = &pipeline.RefinementHold{Active: map[int64]time.Time{}} | |||
| } | |||
| return pipeline.AdmitRefinementPlan(plan, policy, now, a.refinementHold) | |||
| } | |||
| func (a *arbitrator) ApplyDecisions(decisions []pipeline.SignalDecision, budget pipeline.BudgetModel, now time.Time, policy pipeline.Policy) decisionQueueStats { | |||
| if a == nil || a.queues == nil { | |||
| return decisionQueueStats{} | |||
| } | |||
| return a.queues.Apply(decisions, budget, now, policy) | |||
| } | |||
| @@ -19,6 +19,8 @@ type decisionQueueStats struct { | |||
| RecordBudget int `json:"record_budget"` | |||
| DecodeBudget int `json:"decode_budget"` | |||
| HoldMs int `json:"hold_ms"` | |||
| RecordHoldMs int `json:"record_hold_ms"` | |||
| DecodeHoldMs int `json:"decode_hold_ms"` | |||
| RecordDropped int `json:"record_dropped"` | |||
| DecodeDropped int `json:"decode_dropped"` | |||
| } | |||
| @@ -52,7 +54,9 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe | |||
| if dq == nil { | |||
| return decisionQueueStats{} | |||
| } | |||
| hold := time.Duration(budget.HoldMs) * time.Millisecond | |||
| holdPolicy := pipeline.HoldPolicyFromPolicy(policy) | |||
| recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond | |||
| decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond | |||
| recSeen := map[int64]bool{} | |||
| decSeen := map[int64]bool{} | |||
| for i := range decisions { | |||
| @@ -99,8 +103,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe | |||
| purgeExpired(dq.recordHold, now) | |||
| purgeExpired(dq.decodeHold, now) | |||
| recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, hold, now, policy) | |||
| decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, hold, now, policy) | |||
| recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy) | |||
| decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy) | |||
| stats := decisionQueueStats{ | |||
| RecordQueued: len(dq.record), | |||
| @@ -114,6 +118,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipe | |||
| RecordBudget: budget.Record.Max, | |||
| DecodeBudget: budget.Decode.Max, | |||
| HoldMs: budget.HoldMs, | |||
| RecordHoldMs: holdPolicy.RecordMs, | |||
| DecodeHoldMs: holdPolicy.DecodeMs, | |||
| } | |||
| for i := range decisions { | |||
| @@ -90,7 +90,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * | |||
| } else { | |||
| displaySignals = rt.det.StableSignals() | |||
| } | |||
| state.queueStats = rt.queueStats | |||
| state.arbitration = rt.arbitration | |||
| state.presentation = pipeline.AnalysisLevel{ | |||
| Name: "presentation", | |||
| Role: "presentation", | |||
| @@ -158,14 +158,8 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * | |||
| if hasWindows { | |||
| refinementDebug.Windows = windowStats | |||
| } | |||
| refinementDebug.Queue = state.queueStats | |||
| refinementDebug.Budgets = &state.refinement.Input.Budgets | |||
| refinementDebug.Arbitration = buildArbitrationSnapshot(state.refinement, state.queueStats) | |||
| refinementDebug.Arbitration = buildArbitrationSnapshot(state.refinement, state.arbitration) | |||
| debugInfo.Refinement = refinementDebug | |||
| debugInfo.Decisions = &DecisionDebug{ | |||
| Summary: summarizeDecisions(state.refinement.Result.Decisions), | |||
| Items: compactDecisions(state.refinement.Result.Decisions), | |||
| } | |||
| } | |||
| h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.surveillanceSpectrum, Signals: displaySignals, Debug: debugInfo}) | |||
| } | |||
| @@ -164,24 +164,20 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime | |||
| w.Header().Set("Content-Type", "application/json") | |||
| snap := phaseSnap.Snapshot() | |||
| windowStats := buildWindowStats(snap.refinement.Input.Windows) | |||
| arbitration := buildArbitrationSnapshot(snap.refinement, snap.queueStats) | |||
| arbitration := buildArbitrationSnapshot(snap.refinement, snap.arbitration) | |||
| out := map[string]any{ | |||
| "plan": snap.refinement.Input.Plan, | |||
| "windows": snap.refinement.Input.Windows, | |||
| "window_stats": windowStats, | |||
| "queue_stats": snap.queueStats, | |||
| "request": snap.refinement.Input.Request, | |||
| "context": snap.refinement.Input.Context, | |||
| "detail_level": snap.refinement.Input.Detail, | |||
| "budgets": snap.refinement.Input.Budgets, | |||
| "arbitration": arbitration, | |||
| "work_items": snap.refinement.Input.WorkItems, | |||
| "candidates": len(snap.refinement.Input.Candidates), | |||
| "scheduled": len(snap.refinement.Input.Scheduled), | |||
| "signals": len(snap.refinement.Result.Signals), | |||
| "decisions": len(snap.refinement.Result.Decisions), | |||
| "decision_summary": summarizeDecisions(snap.refinement.Result.Decisions), | |||
| "decision_items": compactDecisions(snap.refinement.Result.Decisions), | |||
| "surveillance_level": snap.surveillance.Level, | |||
| "surveillance_levels": snap.surveillance.Levels, | |||
| "display_level": snap.surveillance.DisplayLevel, | |||
| @@ -5,6 +5,6 @@ import "sdr-wideband-suite/internal/pipeline" | |||
| type phaseState struct { | |||
| surveillance pipeline.SurveillanceResult | |||
| refinement pipeline.RefinementStep | |||
| queueStats decisionQueueStats | |||
| arbitration arbitrationState | |||
| presentation pipeline.AnalysisLevel | |||
| } | |||
| @@ -13,7 +13,7 @@ func TestPhaseStateCarriesPhaseResults(t *testing.T) { | |||
| Input: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6}, | |||
| Result: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}}, | |||
| }, | |||
| queueStats: decisionQueueStats{RecordQueued: 1}, | |||
| arbitration: arbitrationState{Queue: decisionQueueStats{RecordQueued: 1}}, | |||
| presentation: pipeline.AnalysisLevel{Name: "presentation"}, | |||
| } | |||
| if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 { | |||
| @@ -42,8 +42,8 @@ type dspRuntime struct { | |||
| rdsMap map[int64]*rdsState | |||
| streamPhaseState map[int64]*streamExtractState | |||
| streamOverlap *streamIQOverlap | |||
| decisionQueues *decisionQueues | |||
| queueStats decisionQueueStats | |||
| arbiter *arbitrator | |||
| arbitration arbitrationState | |||
| gotSamples bool | |||
| } | |||
| @@ -79,6 +79,7 @@ func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, | |||
| rdsMap: map[int64]*rdsState{}, | |||
| streamPhaseState: map[int64]*streamExtractState{}, | |||
| streamOverlap: &streamIQOverlap{}, | |||
| arbiter: newArbitrator(), | |||
| } | |||
| if rt.useGPU && gpuState != nil { | |||
| snap := gpuState.snapshot() | |||
| @@ -327,17 +328,16 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S | |||
| } | |||
| } | |||
| func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pipeline.RefinementInput { | |||
| func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now time.Time) pipeline.RefinementInput { | |||
| policy := pipeline.PolicyFromConfig(rt.cfg) | |||
| plan := pipeline.BuildRefinementPlan(surv.Candidates, policy) | |||
| scheduled := append([]pipeline.ScheduledCandidate(nil), surv.Scheduled...) | |||
| if len(scheduled) == 0 && len(plan.Selected) > 0 { | |||
| scheduled = append([]pipeline.ScheduledCandidate(nil), plan.Selected...) | |||
| } | |||
| workItems := make([]pipeline.RefinementWorkItem, 0, len(plan.WorkItems)) | |||
| if len(plan.WorkItems) > 0 { | |||
| workItems = append(workItems, plan.WorkItems...) | |||
| admission := rt.arbiter.AdmitRefinement(plan, policy, now) | |||
| plan = admission.Plan | |||
| workItems := make([]pipeline.RefinementWorkItem, 0, len(admission.WorkItems)) | |||
| if len(admission.WorkItems) > 0 { | |||
| workItems = append(workItems, admission.WorkItems...) | |||
| } | |||
| scheduled := append([]pipeline.ScheduledCandidate(nil), admission.Admitted...) | |||
| workIndex := map[int64]int{} | |||
| for i := range workItems { | |||
| if workItems[i].Candidate.ID == 0 { | |||
| @@ -403,6 +403,7 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip | |||
| Context: surv.Context, | |||
| Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan}, | |||
| Budgets: pipeline.BudgetModelFromPolicy(policy), | |||
| Admission: admission.Admission, | |||
| Candidates: append([]pipeline.Candidate(nil), surv.Candidates...), | |||
| Scheduled: scheduled, | |||
| WorkItems: workItems, | |||
| @@ -416,16 +417,34 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip | |||
| input.Context.Refinement = level | |||
| input.Context.Detail = detailLevel | |||
| if !policy.RefinementEnabled { | |||
| for i := range input.WorkItems { | |||
| item := &input.WorkItems[i] | |||
| if item.Status == pipeline.RefinementStatusDropped { | |||
| continue | |||
| } | |||
| item.Status = pipeline.RefinementStatusDropped | |||
| item.Reason = pipeline.RefinementReasonDisabled | |||
| } | |||
| input.Scheduled = nil | |||
| input.WorkItems = nil | |||
| input.Request.Reason = pipeline.RefinementReasonDisabled | |||
| } | |||
| input.Admission.Reason = pipeline.RefinementReasonDisabled | |||
| input.Admission.Admitted = 0 | |||
| input.Admission.Skipped = 0 | |||
| input.Admission.Displaced = 0 | |||
| input.Plan.Selected = nil | |||
| input.Plan.DroppedByBudget = 0 | |||
| } | |||
| rt.arbitration.Budgets = input.Budgets | |||
| rt.arbitration.Refinement = input.Admission | |||
| rt.arbitration.HoldPolicy = pipeline.HoldPolicyFromPolicy(policy) | |||
| return input | |||
| } | |||
| func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep { | |||
| input := rt.buildRefinementInput(surv) | |||
| input := rt.buildRefinementInput(surv, art.now) | |||
| markWorkItemsStatus(input.WorkItems, pipeline.RefinementStatusAdmitted, pipeline.RefinementStatusRunning, pipeline.RefinementReasonRunning) | |||
| result := rt.refineSignals(art, input, extractMgr, rec) | |||
| markWorkItemsCompleted(input.WorkItems, result.Candidates) | |||
| return pipeline.RefinementStep{Input: input, Result: result} | |||
| } | |||
| @@ -488,8 +507,10 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin | |||
| } | |||
| } | |||
| budget := pipeline.BudgetModelFromPolicy(policy) | |||
| queueStats := rt.decisionQueues.Apply(decisions, budget, art.now, policy) | |||
| rt.queueStats = queueStats | |||
| queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy) | |||
| rt.arbitration.Budgets = budget | |||
| rt.arbitration.HoldPolicy = pipeline.HoldPolicyFromPolicy(policy) | |||
| rt.arbitration.Queue = queueStats | |||
| summary := summarizeDecisions(decisions) | |||
| if rec != nil { | |||
| if summary.RecordEnabled > 0 { | |||
| @@ -653,3 +674,34 @@ func sameIQBuffer(a []complex64, b []complex64) bool { | |||
| } | |||
| return &a[0] == &b[0] | |||
| } | |||
| func markWorkItemsStatus(items []pipeline.RefinementWorkItem, from string, to string, reason string) { | |||
| for i := range items { | |||
| if items[i].Status != from { | |||
| continue | |||
| } | |||
| items[i].Status = to | |||
| if reason != "" { | |||
| items[i].Reason = reason | |||
| } | |||
| } | |||
| } | |||
| func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pipeline.Candidate) { | |||
| if len(items) == 0 || len(candidates) == 0 { | |||
| return | |||
| } | |||
| done := map[int64]struct{}{} | |||
| for _, cand := range candidates { | |||
| if cand.ID != 0 { | |||
| done[cand.ID] = struct{}{} | |||
| } | |||
| } | |||
| for i := range items { | |||
| if _, ok := done[items[i].Candidate.ID]; !ok { | |||
| continue | |||
| } | |||
| items[i].Status = pipeline.RefinementStatusCompleted | |||
| items[i].Reason = pipeline.RefinementReasonCompleted | |||
| } | |||
| } | |||
| @@ -35,8 +35,8 @@ func TestScheduledCandidateSelectionUsesPolicy(t *testing.T) { | |||
| {ID: 2, SNRDb: 12, BandwidthHz: 5000}, | |||
| {ID: 3, SNRDb: 8, BandwidthHz: 7000}, | |||
| }, policy) | |||
| if len(got) != 1 { | |||
| t.Fatalf("expected 1 scheduled candidate, got %d", len(got)) | |||
| if len(got) != 2 { | |||
| t.Fatalf("expected 2 scheduled candidates after gating, got %d", len(got)) | |||
| } | |||
| if got[0].Candidate.ID != 2 { | |||
| t.Fatalf("expected highest priority candidate, got %d", got[0].Candidate.ID) | |||
| @@ -36,8 +36,6 @@ type RefinementDebug struct { | |||
| Request *pipeline.RefinementRequest `json:"request,omitempty"` | |||
| WorkItems []pipeline.RefinementWorkItem `json:"work_items,omitempty"` | |||
| Windows *RefinementWindowStats `json:"windows,omitempty"` | |||
| Queue decisionQueueStats `json:"queue,omitempty"` | |||
| Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` | |||
| Arbitration *ArbitrationSnapshot `json:"arbitration,omitempty"` | |||
| } | |||
| @@ -47,11 +45,20 @@ type DecisionDebug struct { | |||
| } | |||
| type ArbitrationSnapshot struct { | |||
| Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` | |||
| RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` | |||
| Queue decisionQueueStats `json:"queue,omitempty"` | |||
| DecisionSummary decisionSummary `json:"decision_summary,omitempty"` | |||
| DecisionItems []compactDecision `json:"decision_items,omitempty"` | |||
| Budgets *pipeline.BudgetModel `json:"budgets,omitempty"` | |||
| HoldPolicy *pipeline.HoldPolicy `json:"hold_policy,omitempty"` | |||
| RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"` | |||
| RefinementAdmission *pipeline.RefinementAdmission `json:"refinement_admission,omitempty"` | |||
| Queue decisionQueueStats `json:"queue,omitempty"` | |||
| DecisionSummary decisionSummary `json:"decision_summary,omitempty"` | |||
| DecisionItems []compactDecision `json:"decision_items,omitempty"` | |||
| } | |||
| type arbitrationState struct { | |||
| Budgets pipeline.BudgetModel | |||
| HoldPolicy pipeline.HoldPolicy | |||
| Refinement pipeline.RefinementAdmission | |||
| Queue decisionQueueStats | |||
| } | |||
| type SpectrumFrame struct { | |||
| @@ -0,0 +1,241 @@ | |||
| package pipeline | |||
| import ( | |||
| "math" | |||
| "strings" | |||
| "time" | |||
| ) | |||
| type HoldPolicy struct { | |||
| BaseMs int `json:"base_ms"` | |||
| RefinementMs int `json:"refinement_ms"` | |||
| RecordMs int `json:"record_ms"` | |||
| DecodeMs int `json:"decode_ms"` | |||
| Profile string `json:"profile,omitempty"` | |||
| Strategy string `json:"strategy,omitempty"` | |||
| Reasons []string `json:"reasons,omitempty"` | |||
| } | |||
| type RefinementHold struct { | |||
| Active map[int64]time.Time | |||
| } | |||
| type RefinementAdmission struct { | |||
| Budget int `json:"budget"` | |||
| BudgetSource string `json:"budget_source,omitempty"` | |||
| HoldMs int `json:"hold_ms"` | |||
| HoldSource string `json:"hold_source,omitempty"` | |||
| Planned int `json:"planned"` | |||
| Admitted int `json:"admitted"` | |||
| Skipped int `json:"skipped"` | |||
| Displaced int `json:"displaced"` | |||
| PriorityCutoff float64 `json:"priority_cutoff,omitempty"` | |||
| Reason string `json:"reason,omitempty"` | |||
| } | |||
| type RefinementAdmissionResult struct { | |||
| Plan RefinementPlan | |||
| WorkItems []RefinementWorkItem | |||
| Admitted []ScheduledCandidate | |||
| Admission RefinementAdmission | |||
| } | |||
| func HoldPolicyFromPolicy(policy Policy) HoldPolicy { | |||
| base := policy.DecisionHoldMs | |||
| if base < 0 { | |||
| base = 0 | |||
| } | |||
| refMult := 1.0 | |||
| recMult := 1.0 | |||
| decMult := 1.0 | |||
| reasons := make([]string, 0, 2) | |||
| profile := strings.ToLower(strings.TrimSpace(policy.Profile)) | |||
| strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) | |||
| if profileContains(profile, "archive") || strategyContains(strategy, "archive") { | |||
| recMult *= 1.5 | |||
| decMult *= 1.1 | |||
| refMult *= 1.2 | |||
| reasons = append(reasons, "archive") | |||
| } | |||
| if profileContains(profile, "digital") || strategyContains(strategy, "digital") { | |||
| decMult *= 1.6 | |||
| recMult *= 0.85 | |||
| refMult *= 1.1 | |||
| reasons = append(reasons, "digital") | |||
| } | |||
| if profileContains(profile, "aggressive") { | |||
| refMult *= 1.15 | |||
| reasons = append(reasons, "aggressive") | |||
| } | |||
| if strategyContains(strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy)), "multi") { | |||
| refMult *= 1.1 | |||
| reasons = append(reasons, "multi-resolution") | |||
| } | |||
| return HoldPolicy{ | |||
| BaseMs: base, | |||
| RefinementMs: scaleHold(base, refMult), | |||
| RecordMs: scaleHold(base, recMult), | |||
| DecodeMs: scaleHold(base, decMult), | |||
| Profile: policy.Profile, | |||
| Strategy: policy.RefinementStrategy, | |||
| Reasons: reasons, | |||
| } | |||
| } | |||
| func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold *RefinementHold) RefinementAdmissionResult { | |||
| ranked := plan.Ranked | |||
| if len(ranked) == 0 { | |||
| ranked = plan.Selected | |||
| } | |||
| workItems := append([]RefinementWorkItem(nil), plan.WorkItems...) | |||
| admission := RefinementAdmission{ | |||
| Budget: plan.Budget, | |||
| BudgetSource: plan.BudgetSource, | |||
| } | |||
| if len(ranked) == 0 { | |||
| admission.Reason = "no-candidates" | |||
| return RefinementAdmissionResult{Plan: plan, WorkItems: workItems, Admission: admission} | |||
| } | |||
| holdPolicy := HoldPolicyFromPolicy(policy) | |||
| admission.HoldMs = holdPolicy.RefinementMs | |||
| admission.HoldSource = "resources.decision_hold_ms" | |||
| if len(holdPolicy.Reasons) > 0 { | |||
| admission.HoldSource += ":" + strings.Join(holdPolicy.Reasons, ",") | |||
| } | |||
| planned := len(ranked) | |||
| admission.Planned = planned | |||
| selected := map[int64]struct{}{} | |||
| if hold != nil { | |||
| purgeHold(hold.Active, now) | |||
| for id := range hold.Active { | |||
| if rankedContains(ranked, id) { | |||
| selected[id] = struct{}{} | |||
| } | |||
| } | |||
| } | |||
| limit := plan.Budget | |||
| if limit <= 0 || limit > planned { | |||
| limit = planned | |||
| } | |||
| if len(selected) > limit { | |||
| limit = len(selected) | |||
| if limit > planned { | |||
| limit = planned | |||
| } | |||
| } | |||
| for _, cand := range ranked { | |||
| if len(selected) >= limit { | |||
| break | |||
| } | |||
| if _, ok := selected[cand.Candidate.ID]; ok { | |||
| continue | |||
| } | |||
| selected[cand.Candidate.ID] = struct{}{} | |||
| } | |||
| if hold != nil && admission.HoldMs > 0 { | |||
| until := now.Add(time.Duration(admission.HoldMs) * time.Millisecond) | |||
| if hold.Active == nil { | |||
| hold.Active = map[int64]time.Time{} | |||
| } | |||
| for id := range selected { | |||
| hold.Active[id] = until | |||
| } | |||
| } | |||
| admitted := make([]ScheduledCandidate, 0, len(selected)) | |||
| for _, cand := range ranked { | |||
| if _, ok := selected[cand.Candidate.ID]; ok { | |||
| admitted = append(admitted, cand) | |||
| } | |||
| } | |||
| admission.Admitted = len(admitted) | |||
| admission.Skipped = planned - admission.Admitted | |||
| if admission.Skipped < 0 { | |||
| admission.Skipped = 0 | |||
| } | |||
| displaced := map[int64]struct{}{} | |||
| if len(admitted) > 0 { | |||
| admission.PriorityCutoff = admitted[len(admitted)-1].Priority | |||
| for _, cand := range ranked { | |||
| if _, ok := selected[cand.Candidate.ID]; ok { | |||
| continue | |||
| } | |||
| if cand.Priority >= admission.PriorityCutoff { | |||
| displaced[cand.Candidate.ID] = struct{}{} | |||
| } | |||
| } | |||
| } | |||
| admission.Displaced = len(displaced) | |||
| plan.Selected = admitted | |||
| plan.PriorityCutoff = admission.PriorityCutoff | |||
| plan.DroppedByBudget = admission.Skipped | |||
| for i := range workItems { | |||
| item := &workItems[i] | |||
| if item.Status != RefinementStatusPlanned { | |||
| continue | |||
| } | |||
| id := item.Candidate.ID | |||
| if _, ok := selected[id]; ok { | |||
| item.Status = RefinementStatusAdmitted | |||
| item.Reason = RefinementReasonAdmitted | |||
| continue | |||
| } | |||
| if _, ok := displaced[id]; ok { | |||
| item.Status = RefinementStatusDisplaced | |||
| item.Reason = RefinementReasonDisplaced | |||
| continue | |||
| } | |||
| item.Status = RefinementStatusSkipped | |||
| item.Reason = RefinementReasonBudget | |||
| } | |||
| return RefinementAdmissionResult{ | |||
| Plan: plan, | |||
| WorkItems: workItems, | |||
| Admitted: admitted, | |||
| Admission: admission, | |||
| } | |||
| } | |||
| func purgeHold(active map[int64]time.Time, now time.Time) { | |||
| for id, until := range active { | |||
| if now.After(until) { | |||
| delete(active, id) | |||
| } | |||
| } | |||
| } | |||
| func rankedContains(items []ScheduledCandidate, id int64) bool { | |||
| for _, item := range items { | |||
| if item.Candidate.ID == id { | |||
| return true | |||
| } | |||
| } | |||
| return false | |||
| } | |||
| func scaleHold(base int, mult float64) int { | |||
| if base <= 0 { | |||
| return 0 | |||
| } | |||
| return int(math.Round(float64(base) * mult)) | |||
| } | |||
| func profileContains(profile string, token string) bool { | |||
| if profile == "" || token == "" { | |||
| return false | |||
| } | |||
| return strings.Contains(profile, strings.ToLower(token)) | |||
| } | |||
| func strategyContains(strategy string, token string) bool { | |||
| if strategy == "" || token == "" { | |||
| return false | |||
| } | |||
| return strings.Contains(strategy, strings.ToLower(token)) | |||
| } | |||
| @@ -0,0 +1,22 @@ | |||
| package pipeline | |||
| import "testing" | |||
| func TestHoldPolicyArchiveBiasesRecord(t *testing.T) { | |||
| policy := Policy{DecisionHoldMs: 1000, Profile: "archive", RefinementStrategy: "archive-oriented"} | |||
| hold := HoldPolicyFromPolicy(policy) | |||
| if hold.RecordMs <= hold.BaseMs { | |||
| t.Fatalf("expected archive profile to extend record hold, got %d vs %d", hold.RecordMs, hold.BaseMs) | |||
| } | |||
| if hold.RefinementMs <= hold.BaseMs { | |||
| t.Fatalf("expected archive profile to extend refinement hold, got %d vs %d", hold.RefinementMs, hold.BaseMs) | |||
| } | |||
| } | |||
| func TestHoldPolicyDigitalBiasesDecode(t *testing.T) { | |||
| policy := Policy{DecisionHoldMs: 1000, Profile: "digital-hunting", RefinementStrategy: "digital-hunting"} | |||
| hold := HoldPolicyFromPolicy(policy) | |||
| if hold.DecodeMs <= hold.RecordMs { | |||
| t.Fatalf("expected digital profile to favor decode hold, got decode=%d record=%d", hold.DecodeMs, hold.RecordMs) | |||
| } | |||
| } | |||
| @@ -35,6 +35,7 @@ func DecisionPriorityBoost(policy Policy, hint string, class string, queue strin | |||
| boost += hintMatchBoost(policy.AutoDecodeClasses, tag, 3.0) | |||
| } | |||
| boost += intentQueueBoost(policy.Intent, queue) | |||
| boost += queueStrategyBoost(policy, queue) | |||
| return boost | |||
| } | |||
| @@ -89,6 +90,31 @@ func intentQueueBoost(intent string, queue string) float64 { | |||
| return boost | |||
| } | |||
| func queueStrategyBoost(policy Policy, queue string) float64 { | |||
| queue = strings.ToLower(strings.TrimSpace(queue)) | |||
| if queue == "" { | |||
| return 0 | |||
| } | |||
| boost := 0.0 | |||
| profile := strings.ToLower(strings.TrimSpace(policy.Profile)) | |||
| strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy)) | |||
| if strings.Contains(profile, "archive") || strings.Contains(strategy, "archive") { | |||
| if queue == "record" { | |||
| boost += 1.5 | |||
| } else if queue == "decode" { | |||
| boost += 0.5 | |||
| } | |||
| } | |||
| if strings.Contains(profile, "digital") || strings.Contains(strategy, "digital") { | |||
| if queue == "decode" { | |||
| boost += 1.5 | |||
| } else if queue == "record" { | |||
| boost += 0.3 | |||
| } | |||
| } | |||
| return boost | |||
| } | |||
| func refinementIntentWeights(intent string) (float64, float64, float64) { | |||
| if intent == "" { | |||
| return 1.0, 1.0, 1.0 | |||
| @@ -52,6 +52,7 @@ type RefinementPlan struct { | |||
| PriorityMax float64 `json:"priority_max,omitempty"` | |||
| PriorityAvg float64 `json:"priority_avg,omitempty"` | |||
| PriorityCutoff float64 `json:"priority_cutoff,omitempty"` | |||
| Ranked []ScheduledCandidate `json:"ranked,omitempty"` | |||
| Selected []ScheduledCandidate `json:"selected,omitempty"` | |||
| WorkItems []RefinementWorkItem `json:"work_items,omitempty"` | |||
| } | |||
| @@ -68,6 +69,7 @@ type RefinementInput struct { | |||
| Context AnalysisContext `json:"context,omitempty"` | |||
| Request RefinementRequest `json:"request,omitempty"` | |||
| Budgets BudgetModel `json:"budgets,omitempty"` | |||
| Admission RefinementAdmission `json:"admission,omitempty"` | |||
| Candidates []Candidate `json:"candidates,omitempty"` | |||
| Scheduled []ScheduledCandidate `json:"scheduled,omitempty"` | |||
| WorkItems []RefinementWorkItem `json:"work_items,omitempty"` | |||
| @@ -52,21 +52,30 @@ type RefinementExecution struct { | |||
| } | |||
| const ( | |||
| RefinementStatusSelected = "selected" | |||
| RefinementStatusDropped = "dropped" | |||
| RefinementStatusDeferred = "deferred" | |||
| RefinementStatusPlanned = "planned" | |||
| RefinementStatusAdmitted = "admitted" | |||
| RefinementStatusRunning = "running" | |||
| RefinementStatusCompleted = "completed" | |||
| RefinementStatusDropped = "dropped" | |||
| RefinementStatusSkipped = "skipped" | |||
| RefinementStatusDisplaced = "displaced" | |||
| ) | |||
| const ( | |||
| RefinementReasonSelected = "selected" | |||
| RefinementReasonPlanned = "planned" | |||
| RefinementReasonAdmitted = "admitted" | |||
| RefinementReasonRunning = "running" | |||
| RefinementReasonCompleted = "completed" | |||
| RefinementReasonMonitorGate = "dropped:monitor" | |||
| RefinementReasonBelowSNR = "dropped:snr" | |||
| RefinementReasonBudget = "dropped:budget" | |||
| RefinementReasonBudget = "skipped:budget" | |||
| RefinementReasonDisabled = "dropped:disabled" | |||
| RefinementReasonUnclassified = "dropped:unclassified" | |||
| RefinementReasonDisplaced = "skipped:displaced" | |||
| ) | |||
| // BuildRefinementPlan scores and budgets candidates for costly local refinement. | |||
| // BuildRefinementPlan scores and ranks candidates for costly local refinement. | |||
| // Admission/budget enforcement is handled by arbitration to keep refinement/record/decode consistent. | |||
| // Current heuristic is intentionally simple and deterministic; later phases can add | |||
| // richer scoring (novelty, persistence, profile-aware band priorities, decoder value). | |||
| func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { | |||
| @@ -152,7 +161,8 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { | |||
| Priority: priority, | |||
| Score: score, | |||
| Breakdown: &score.Breakdown, | |||
| Status: RefinementStatusDeferred, | |||
| Status: RefinementStatusPlanned, | |||
| Reason: RefinementReasonPlanned, | |||
| }) | |||
| } | |||
| sort.Slice(scored, func(i, j int) bool { | |||
| @@ -178,37 +188,17 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { | |||
| plan.PriorityMax = maxPriority | |||
| plan.PriorityAvg = sumPriority / float64(len(scored)) | |||
| } | |||
| limit := plan.Budget | |||
| if limit <= 0 || limit > len(scored) { | |||
| limit = len(scored) | |||
| } | |||
| plan.Selected = scored[:limit] | |||
| if len(plan.Selected) > 0 { | |||
| plan.PriorityCutoff = plan.Selected[len(plan.Selected)-1].Priority | |||
| } | |||
| plan.DroppedByBudget = len(scored) - len(plan.Selected) | |||
| if len(plan.Selected) > 0 { | |||
| selected := map[int64]struct{}{} | |||
| for _, s := range plan.Selected { | |||
| selected[s.Candidate.ID] = struct{}{} | |||
| } | |||
| for i := range workItems { | |||
| item := &workItems[i] | |||
| if _, ok := selected[item.Candidate.ID]; ok { | |||
| item.Status = RefinementStatusSelected | |||
| item.Reason = RefinementReasonSelected | |||
| } else if item.Status == RefinementStatusDeferred { | |||
| item.Status = RefinementStatusDropped | |||
| item.Reason = RefinementReasonBudget | |||
| } | |||
| } | |||
| } | |||
| plan.Ranked = append(plan.Ranked, scored...) | |||
| plan.WorkItems = workItems | |||
| return plan | |||
| } | |||
| func ScheduleCandidates(candidates []Candidate, policy Policy) []ScheduledCandidate { | |||
| return BuildRefinementPlan(candidates, policy).Selected | |||
| plan := BuildRefinementPlan(candidates, policy) | |||
| if len(plan.Ranked) > 0 { | |||
| return plan.Ranked | |||
| } | |||
| return plan.Selected | |||
| } | |||
| func refinementStrategy(policy Policy) (string, string) { | |||
| @@ -1,6 +1,9 @@ | |||
| package pipeline | |||
| import "testing" | |||
| import ( | |||
| "testing" | |||
| "time" | |||
| ) | |||
| func TestScheduleCandidates(t *testing.T) { | |||
| policy := Policy{MaxRefinementJobs: 2, MinCandidateSNRDb: 5} | |||
| @@ -11,8 +14,8 @@ func TestScheduleCandidates(t *testing.T) { | |||
| {ID: 4, CenterHz: 400, SNRDb: 20, BandwidthHz: 100000, PeakDb: 5}, | |||
| } | |||
| got := ScheduleCandidates(cands, policy) | |||
| if len(got) != 2 { | |||
| t.Fatalf("expected 2 scheduled candidates, got %d", len(got)) | |||
| if len(got) != 3 { | |||
| t.Fatalf("expected 3 scheduled candidates, got %d", len(got)) | |||
| } | |||
| if got[0].Candidate.ID != 4 { | |||
| t.Fatalf("expected strongest candidate first, got id=%d", got[0].Candidate.ID) | |||
| @@ -36,26 +39,29 @@ func TestBuildRefinementPlanTracksDrops(t *testing.T) { | |||
| if plan.DroppedBySNR != 1 { | |||
| t.Fatalf("expected 1 dropped by SNR, got %d", plan.DroppedBySNR) | |||
| } | |||
| if plan.DroppedByBudget != 1 { | |||
| t.Fatalf("expected 1 dropped by budget, got %d", plan.DroppedByBudget) | |||
| if plan.DroppedByBudget != 0 { | |||
| t.Fatalf("expected 0 dropped by budget in plan stage, got %d", plan.DroppedByBudget) | |||
| } | |||
| if len(plan.Selected) != 1 || plan.Selected[0].Candidate.ID != 2 { | |||
| t.Fatalf("unexpected plan selection: %+v", plan.Selected) | |||
| if len(plan.Selected) != 0 { | |||
| t.Fatalf("expected no admitted selection in plan stage, got %+v", plan.Selected) | |||
| } | |||
| if len(plan.Ranked) != 2 { | |||
| t.Fatalf("expected ranked candidates after gating, got %d", len(plan.Ranked)) | |||
| } | |||
| if len(plan.WorkItems) != len(cands) { | |||
| t.Fatalf("expected work items for all candidates, got %d", len(plan.WorkItems)) | |||
| } | |||
| item2 := findWorkItem(plan.WorkItems, 2) | |||
| if item2 == nil || item2.Status != RefinementStatusSelected || item2.Reason != RefinementReasonSelected { | |||
| t.Fatalf("expected candidate 2 selected with reason, got %+v", item2) | |||
| if item2 == nil || item2.Status != RefinementStatusPlanned || item2.Reason != RefinementReasonPlanned { | |||
| t.Fatalf("expected candidate 2 planned with reason, got %+v", item2) | |||
| } | |||
| item1 := findWorkItem(plan.WorkItems, 1) | |||
| if item1 == nil || item1.Reason != RefinementReasonBelowSNR { | |||
| t.Fatalf("expected candidate 1 dropped by snr, got %+v", item1) | |||
| } | |||
| item3 := findWorkItem(plan.WorkItems, 3) | |||
| if item3 == nil || item3.Reason != RefinementReasonBudget { | |||
| t.Fatalf("expected candidate 3 dropped by budget, got %+v", item3) | |||
| if item3 == nil || item3.Status != RefinementStatusPlanned { | |||
| t.Fatalf("expected candidate 3 planned pre-admission, got %+v", item3) | |||
| } | |||
| } | |||
| @@ -73,8 +79,8 @@ func TestBuildRefinementPlanRespectsMaxConcurrent(t *testing.T) { | |||
| if plan.BudgetSource != "refinement.max_concurrent" { | |||
| t.Fatalf("expected budget source refinement.max_concurrent, got %s", plan.BudgetSource) | |||
| } | |||
| if len(plan.Selected) != 2 { | |||
| t.Fatalf("expected 2 selected, got %d", len(plan.Selected)) | |||
| if len(plan.Selected) != 0 { | |||
| t.Fatalf("expected no selected until admission, got %d", len(plan.Selected)) | |||
| } | |||
| } | |||
| @@ -90,8 +96,8 @@ func TestBuildRefinementPlanAppliesMonitorSpan(t *testing.T) { | |||
| if plan.DroppedByMonitor != 2 { | |||
| t.Fatalf("expected 2 dropped by monitor, got %d", plan.DroppedByMonitor) | |||
| } | |||
| if len(plan.Selected) != 2 { | |||
| t.Fatalf("expected 2 selected within monitor, got %d", len(plan.Selected)) | |||
| if len(plan.Ranked) != 2 { | |||
| t.Fatalf("expected 2 ranked within monitor, got %d", len(plan.Ranked)) | |||
| } | |||
| } | |||
| @@ -107,8 +113,8 @@ func TestBuildRefinementPlanAppliesMonitorSpanCentered(t *testing.T) { | |||
| if plan.DroppedByMonitor != 1 { | |||
| t.Fatalf("expected 1 dropped by monitor, got %d", plan.DroppedByMonitor) | |||
| } | |||
| if len(plan.Selected) != 3 { | |||
| t.Fatalf("expected 3 selected within monitor, got %d", len(plan.Selected)) | |||
| if len(plan.Ranked) != 3 { | |||
| t.Fatalf("expected 3 ranked within monitor, got %d", len(plan.Ranked)) | |||
| } | |||
| } | |||
| @@ -133,7 +139,7 @@ func TestScheduleCandidatesPriorityBoost(t *testing.T) { | |||
| {ID: 1, SNRDb: 15, Hint: "voice"}, | |||
| {ID: 2, SNRDb: 14, Hint: "digital-burst"}, | |||
| }, policy) | |||
| if len(got) != 1 || got[0].Candidate.ID != 2 { | |||
| if len(got) != 2 || got[0].Candidate.ID != 2 { | |||
| t.Fatalf("expected priority boost to favor digital candidate, got %+v", got) | |||
| } | |||
| } | |||
| @@ -148,16 +154,17 @@ func TestBuildRefinementPlanPriorityStats(t *testing.T) { | |||
| if plan.PriorityMax < plan.PriorityMin { | |||
| t.Fatalf("priority bounds invalid: %+v", plan) | |||
| } | |||
| if len(plan.Selected) != 1 { | |||
| t.Fatalf("expected 1 selected, got %d", len(plan.Selected)) | |||
| res := AdmitRefinementPlan(plan, policy, time.Now(), &RefinementHold{Active: map[int64]time.Time{}}) | |||
| if len(res.Plan.Selected) != 1 { | |||
| t.Fatalf("expected 1 admitted, got %d", len(res.Plan.Selected)) | |||
| } | |||
| if plan.PriorityCutoff != plan.Selected[0].Priority { | |||
| t.Fatalf("expected cutoff to match selection, got %.2f vs %.2f", plan.PriorityCutoff, plan.Selected[0].Priority) | |||
| if res.Plan.PriorityCutoff != res.Plan.Selected[0].Priority { | |||
| t.Fatalf("expected cutoff to match selection, got %.2f vs %.2f", res.Plan.PriorityCutoff, res.Plan.Selected[0].Priority) | |||
| } | |||
| if plan.Selected[0].Breakdown == nil { | |||
| if res.Plan.Selected[0].Breakdown == nil { | |||
| t.Fatalf("expected breakdown on selected candidate") | |||
| } | |||
| if plan.Selected[0].Score == nil || plan.Selected[0].Score.Total == 0 { | |||
| if res.Plan.Selected[0].Score == nil || res.Plan.Selected[0].Score.Total == 0 { | |||
| t.Fatalf("expected score on selected candidate") | |||
| } | |||
| } | |||
| @@ -169,11 +176,53 @@ func TestBuildRefinementPlanStrategyBias(t *testing.T) { | |||
| {ID: 2, CenterHz: 200, SNRDb: 11, BandwidthHz: 100000, PeakDb: 1}, | |||
| } | |||
| plan := BuildRefinementPlan(cands, policy) | |||
| if len(plan.Selected) != 1 { | |||
| t.Fatalf("expected 1 selected, got %d", len(plan.Selected)) | |||
| if len(plan.Ranked) != 2 { | |||
| t.Fatalf("expected ranked candidates, got %d", len(plan.Ranked)) | |||
| } | |||
| if plan.Ranked[0].Candidate.ID != 2 { | |||
| t.Fatalf("expected archive-oriented strategy to favor wider candidate, got %+v", plan.Ranked[0]) | |||
| } | |||
| } | |||
| func TestAdmitRefinementPlanAppliesBudget(t *testing.T) { | |||
| policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 10} | |||
| cands := []Candidate{ | |||
| {ID: 2, CenterHz: 200, SNRDb: 12, BandwidthHz: 50000, PeakDb: 3}, | |||
| {ID: 3, CenterHz: 300, SNRDb: 11, BandwidthHz: 25000, PeakDb: 2}, | |||
| } | |||
| plan := BuildRefinementPlan(cands, policy) | |||
| res := AdmitRefinementPlan(plan, policy, time.Now(), &RefinementHold{Active: map[int64]time.Time{}}) | |||
| if len(res.Plan.Selected) != 1 || res.Plan.Selected[0].Candidate.ID != 2 { | |||
| t.Fatalf("unexpected admission selection: %+v", res.Plan.Selected) | |||
| } | |||
| if res.Plan.DroppedByBudget != 1 { | |||
| t.Fatalf("expected 1 dropped by budget, got %d", res.Plan.DroppedByBudget) | |||
| } | |||
| item2 := findWorkItem(res.WorkItems, 2) | |||
| if item2 == nil || item2.Status != RefinementStatusAdmitted { | |||
| t.Fatalf("expected candidate 2 admitted, got %+v", item2) | |||
| } | |||
| item3 := findWorkItem(res.WorkItems, 3) | |||
| if item3 == nil || item3.Status != RefinementStatusSkipped { | |||
| t.Fatalf("expected candidate 3 skipped, got %+v", item3) | |||
| } | |||
| } | |||
| func TestAdmitRefinementPlanDisplacedByHold(t *testing.T) { | |||
| policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0} | |||
| cands := []Candidate{ | |||
| {ID: 1, CenterHz: 100, SNRDb: 5}, | |||
| {ID: 2, CenterHz: 200, SNRDb: 12}, | |||
| } | |||
| if plan.Selected[0].Candidate.ID != 2 { | |||
| t.Fatalf("expected archive-oriented strategy to favor wider candidate, got %+v", plan.Selected[0]) | |||
| plan := BuildRefinementPlan(cands, policy) | |||
| hold := &RefinementHold{Active: map[int64]time.Time{1: time.Now().Add(2 * time.Second)}} | |||
| res := AdmitRefinementPlan(plan, policy, time.Now(), hold) | |||
| if len(res.Plan.Selected) != 1 || res.Plan.Selected[0].Candidate.ID != 1 { | |||
| t.Fatalf("expected held candidate to remain admitted, got %+v", res.Plan.Selected) | |||
| } | |||
| item2 := findWorkItem(res.WorkItems, 2) | |||
| if item2 == nil || item2.Status != RefinementStatusDisplaced { | |||
| t.Fatalf("expected higher priority candidate displaced, got %+v", item2) | |||
| } | |||
| } | |||