| @@ -3,14 +3,16 @@ package main | |||
| import "sdr-wideband-suite/internal/pipeline" | |||
| type compactDecision struct { | |||
| ID int64 `json:"id"` | |||
| Class string `json:"class,omitempty"` | |||
| Record bool `json:"record"` | |||
| Decode bool `json:"decode"` | |||
| Reason string `json:"reason,omitempty"` | |||
| RecordAdmission *pipeline.PriorityAdmission `json:"record_admission,omitempty"` | |||
| DecodeAdmission *pipeline.PriorityAdmission `json:"decode_admission,omitempty"` | |||
| Candidate pipeline.Candidate `json:"candidate"` | |||
| ID int64 `json:"id"` | |||
| Class string `json:"class,omitempty"` | |||
| Record bool `json:"record"` | |||
| Decode bool `json:"decode"` | |||
| Reason string `json:"reason,omitempty"` | |||
| MonitorBias float64 `json:"monitor_bias,omitempty"` | |||
| MonitorDetail *pipeline.MonitorWindowMatch `json:"monitor_detail,omitempty"` | |||
| RecordAdmission *pipeline.PriorityAdmission `json:"record_admission,omitempty"` | |||
| DecodeAdmission *pipeline.PriorityAdmission `json:"decode_admission,omitempty"` | |||
| Candidate pipeline.Candidate `json:"candidate"` | |||
| } | |||
| func compactDecisions(decisions []pipeline.SignalDecision) []compactDecision { | |||
| @@ -22,6 +24,8 @@ func compactDecisions(decisions []pipeline.SignalDecision) []compactDecision { | |||
| Record: d.ShouldRecord, | |||
| Decode: d.ShouldAutoDecode, | |||
| Reason: d.Reason, | |||
| MonitorBias: d.MonitorBias, | |||
| MonitorDetail: d.MonitorDetail, | |||
| RecordAdmission: d.RecordAdmission, | |||
| DecodeAdmission: d.DecodeAdmission, | |||
| Candidate: d.Candidate, | |||
| @@ -51,6 +51,7 @@ type CandidateWindowSummary struct { | |||
| EndHz float64 `json:"end_hz,omitempty"` | |||
| CenterHz float64 `json:"center_hz,omitempty"` | |||
| SpanHz float64 `json:"span_hz,omitempty"` | |||
| Priority float64 `json:"priority,omitempty"` | |||
| PriorityBias float64 `json:"priority_bias,omitempty"` | |||
| Candidates int `json:"candidates"` | |||
| } | |||
| @@ -232,6 +233,7 @@ func buildCandidateWindowSummary(candidates []pipeline.Candidate, windows []pipe | |||
| EndHz: win.EndHz, | |||
| CenterHz: win.CenterHz, | |||
| SpanHz: win.SpanHz, | |||
| Priority: win.Priority, | |||
| PriorityBias: win.PriorityBias, | |||
| } | |||
| index[win.Index] = len(out) | |||
| @@ -21,6 +21,7 @@ type MonitorWindow struct { | |||
| EndHz float64 `yaml:"end_hz" json:"end_hz"` | |||
| CenterHz float64 `yaml:"center_hz" json:"center_hz"` | |||
| SpanHz float64 `yaml:"span_hz" json:"span_hz"` | |||
| Priority float64 `yaml:"priority" json:"priority"` | |||
| } | |||
| type DetectorConfig struct { | |||
| @@ -2,6 +2,8 @@ package pipeline | |||
| import ( | |||
| "sort" | |||
| "strconv" | |||
| "strings" | |||
| "time" | |||
| ) | |||
| @@ -39,12 +41,14 @@ type DecisionQueueStats struct { | |||
| } | |||
| type queuedDecision struct { | |||
| ID int64 | |||
| SNRDb float64 | |||
| Hint string | |||
| Class string | |||
| FirstSeen time.Time | |||
| LastSeen time.Time | |||
| ID int64 | |||
| SNRDb float64 | |||
| Hint string | |||
| Class string | |||
| WindowTag string | |||
| WindowBias float64 | |||
| FirstSeen time.Time | |||
| LastSeen time.Time | |||
| } | |||
| type queueSelection struct { | |||
| @@ -60,6 +64,7 @@ type queueSelection struct { | |||
| families map[int64]string | |||
| familyRanks map[int64]int | |||
| tierFloors map[int64]string | |||
| windowTags map[int64]string | |||
| minScore float64 | |||
| maxScore float64 | |||
| cutoff float64 | |||
| @@ -104,6 +109,8 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, | |||
| qd.SNRDb = decisions[i].Candidate.SNRDb | |||
| qd.Hint = decisions[i].Candidate.Hint | |||
| qd.Class = decisions[i].Class | |||
| qd.WindowTag = windowTagForDecision(decisions[i]) | |||
| qd.WindowBias = decisions[i].MonitorBias | |||
| qd.LastSeen = now | |||
| recSeen[id] = true | |||
| } | |||
| @@ -116,6 +123,8 @@ func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, | |||
| qd.SNRDb = decisions[i].Candidate.SNRDb | |||
| qd.Hint = decisions[i].Candidate.Hint | |||
| qd.Class = decisions[i].Class | |||
| qd.WindowTag = windowTagForDecision(decisions[i]) | |||
| qd.WindowBias = decisions[i].MonitorBias | |||
| qd.LastSeen = now | |||
| decSeen[id] = true | |||
| } | |||
| @@ -228,6 +237,7 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in | |||
| families: map[int64]string{}, | |||
| familyRanks: map[int64]int{}, | |||
| tierFloors: map[int64]string{}, | |||
| windowTags: map[int64]string{}, | |||
| } | |||
| if len(queue) == 0 { | |||
| return selection | |||
| @@ -255,7 +265,10 @@ func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[in | |||
| selection.families[id] = family | |||
| selection.familyRanks[id] = familyRank | |||
| selection.tierFloors[id] = signalPriorityTierFloor(familyRank) | |||
| score := qd.SNRDb + boost + policyBoost | |||
| if qd.WindowTag != "" { | |||
| selection.windowTags[id] = qd.WindowTag | |||
| } | |||
| score := qd.SNRDb + boost + policyBoost + qd.WindowBias | |||
| selection.scores[id] = score | |||
| if len(scoredList) == 0 || score < selection.minScore { | |||
| selection.minScore = score | |||
| @@ -398,6 +411,7 @@ func buildQueueAdmission(queueName string, id int64, selection queueSelection, p | |||
| if !ok { | |||
| return nil | |||
| } | |||
| windowTag := selection.windowTags[id] | |||
| admission := &PriorityAdmission{ | |||
| Basis: queueName, | |||
| Score: score, | |||
| @@ -411,6 +425,9 @@ func buildQueueAdmission(queueName string, id int64, selection queueSelection, p | |||
| if _, held := selection.held[id]; held { | |||
| admission.Class = AdmissionClassHold | |||
| extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)} | |||
| if windowTag != "" { | |||
| extras = append(extras, windowTag) | |||
| } | |||
| if _, ok := selection.protected[id]; ok { | |||
| extras = append(extras, ReasonTagHoldProtected) | |||
| } | |||
| @@ -418,6 +435,9 @@ func buildQueueAdmission(queueName string, id int64, selection queueSelection, p | |||
| } else { | |||
| admission.Class = AdmissionClassAdmit | |||
| extras := []string{pressureTag, "budget:" + slugToken(budgetSource)} | |||
| if windowTag != "" { | |||
| extras = append(extras, windowTag) | |||
| } | |||
| if _, ok := selection.opportunistic[id]; ok { | |||
| extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced) | |||
| } | |||
| @@ -427,16 +447,27 @@ func buildQueueAdmission(queueName string, id int64, selection queueSelection, p | |||
| } | |||
| if _, ok := selection.displaced[id]; ok { | |||
| admission.Class = AdmissionClassDisplace | |||
| admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:"+slugToken(budgetSource)) | |||
| extras := []string{pressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budgetSource)} | |||
| if windowTag != "" { | |||
| extras = append(extras, windowTag) | |||
| } | |||
| admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, extras...) | |||
| return admission | |||
| } | |||
| if _, ok := selection.displacedByHold[id]; ok { | |||
| admission.Class = AdmissionClassDisplace | |||
| admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:"+slugToken(budgetSource)) | |||
| extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)} | |||
| if windowTag != "" { | |||
| extras = append(extras, windowTag) | |||
| } | |||
| admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, extras...) | |||
| return admission | |||
| } | |||
| admission.Class = AdmissionClassDefer | |||
| extras := []string{pressureTag, "pressure:budget", "budget:" + slugToken(budgetSource)} | |||
| if windowTag != "" { | |||
| extras = append(extras, windowTag) | |||
| } | |||
| if _, ok := selection.expired[id]; ok { | |||
| extras = append(extras, ReasonTagHoldExpired) | |||
| } | |||
| @@ -444,6 +475,21 @@ func buildQueueAdmission(queueName string, id int64, selection queueSelection, p | |||
| return admission | |||
| } | |||
| func windowTagForDecision(decision SignalDecision) string { | |||
| if decision.MonitorBias == 0 || decision.MonitorDetail == nil { | |||
| return "" | |||
| } | |||
| label := strings.TrimSpace(decision.MonitorDetail.Label) | |||
| if label == "" { | |||
| label = "index-" + strconv.Itoa(decision.MonitorDetail.Index) | |||
| } | |||
| label = slugToken(label) | |||
| if label == "" { | |||
| return "" | |||
| } | |||
| return "window:" + label | |||
| } | |||
| func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 { | |||
| oldest := 0.0 | |||
| first := true | |||
| @@ -65,6 +65,42 @@ func TestDecisionQueueEnforcesBudgets(t *testing.T) { | |||
| } | |||
| } | |||
| func TestDecisionQueueMonitorWindowBiasSelectsPreferred(t *testing.T) { | |||
| arbiter := NewArbiter() | |||
| policy := Policy{ | |||
| DecisionHoldMs: 250, | |||
| AutoRecordClasses: []string{"test"}, | |||
| MonitorWindows: finalizeMonitorWindows([]MonitorWindow{ | |||
| {Label: "low", StartHz: 100, EndHz: 200, SpanHz: 100, Priority: -1}, | |||
| {Label: "high", StartHz: 300, EndHz: 400, SpanHz: 100, Priority: 1}, | |||
| }), | |||
| } | |||
| budget := BudgetModel{Record: BudgetQueue{Max: 1}} | |||
| now := time.Now() | |||
| decisions := []SignalDecision{ | |||
| DecideSignalAction(policy, Candidate{ID: 1, CenterHz: 150, SNRDb: 10, Hint: "test"}, nil), | |||
| DecideSignalAction(policy, Candidate{ID: 2, CenterHz: 350, SNRDb: 10, Hint: "test"}, nil), | |||
| } | |||
| arbiter.ApplyDecisions(decisions, budget, now, policy) | |||
| if decisions[0].MonitorBias == 0 || decisions[1].MonitorBias == 0 { | |||
| t.Fatalf("expected monitor bias to be applied to both decisions") | |||
| } | |||
| if decisions[0].ShouldRecord { | |||
| t.Fatalf("expected low-priority window decision to be deferred") | |||
| } | |||
| if !decisions[1].ShouldRecord { | |||
| t.Fatalf("expected high-priority window decision to be selected") | |||
| } | |||
| if decisions[1].RecordAdmission == nil || decisions[1].RecordAdmission.Class != AdmissionClassAdmit { | |||
| t.Fatalf("expected admit admission, got %+v", decisions[1].RecordAdmission) | |||
| } | |||
| if decisions[1].RecordAdmission == nil || !strings.Contains(decisions[1].RecordAdmission.Reason, "window:high") { | |||
| t.Fatalf("expected window tag in admission reason, got %+v", decisions[1].RecordAdmission) | |||
| } | |||
| } | |||
| func TestDecisionQueueHoldKeepsSelection(t *testing.T) { | |||
| arbiter := NewArbiter() | |||
| policy := Policy{DecisionHoldMs: 500} | |||
| @@ -7,16 +7,21 @@ import ( | |||
| ) | |||
| type SignalDecision struct { | |||
| Candidate Candidate `json:"candidate"` | |||
| Class string `json:"class,omitempty"` | |||
| ShouldRecord bool `json:"should_record"` | |||
| ShouldAutoDecode bool `json:"should_auto_decode"` | |||
| Reason string `json:"reason,omitempty"` | |||
| RecordAdmission *PriorityAdmission `json:"record_admission,omitempty"` | |||
| DecodeAdmission *PriorityAdmission `json:"decode_admission,omitempty"` | |||
| Candidate Candidate `json:"candidate"` | |||
| Class string `json:"class,omitempty"` | |||
| ShouldRecord bool `json:"should_record"` | |||
| ShouldAutoDecode bool `json:"should_auto_decode"` | |||
| Reason string `json:"reason,omitempty"` | |||
| MonitorBias float64 `json:"monitor_bias,omitempty"` | |||
| MonitorDetail *MonitorWindowMatch `json:"monitor_detail,omitempty"` | |||
| RecordAdmission *PriorityAdmission `json:"record_admission,omitempty"` | |||
| DecodeAdmission *PriorityAdmission `json:"decode_admission,omitempty"` | |||
| } | |||
| func DecideSignalAction(policy Policy, candidate Candidate, cls *classifier.Classification) SignalDecision { | |||
| if len(policy.MonitorWindows) > 0 { | |||
| _ = ApplyMonitorWindowMatches(policy, &candidate) | |||
| } | |||
| decision := SignalDecision{Candidate: candidate} | |||
| classTag := "" | |||
| hintTag := strings.TrimSpace(candidate.Hint) | |||
| @@ -45,5 +50,10 @@ func DecideSignalAction(policy Policy, candidate Candidate, cls *classifier.Clas | |||
| if decision.Reason == "" && candidate.Hint != "" { | |||
| decision.Reason = DecisionReasonHintOnly | |||
| } | |||
| monitorBias, monitorDetail := MonitorWindowBias(policy, candidate) | |||
| if monitorBias != 0 { | |||
| decision.MonitorBias = monitorBias | |||
| decision.MonitorDetail = monitorDetail | |||
| } | |||
| return decision | |||
| } | |||
| @@ -61,13 +61,23 @@ func finalizeMonitorWindows(windows []MonitorWindow) []MonitorWindow { | |||
| } | |||
| for i := range windows { | |||
| windows[i].Index = i | |||
| priority := normalizeMonitorPriority(windows[i].Priority) | |||
| windows[i].Priority = priority | |||
| spanBias := 0.0 | |||
| if maxSpan > 0 && len(windows) > 1 && windows[i].SpanHz > 0 { | |||
| bias := maxMonitorWindowBias * (1 - (windows[i].SpanHz / maxSpan)) | |||
| if bias < 0 { | |||
| bias = 0 | |||
| spanBias = maxMonitorWindowBias * (1 - (windows[i].SpanHz / maxSpan)) | |||
| if spanBias < 0 { | |||
| spanBias = 0 | |||
| } | |||
| windows[i].PriorityBias = bias | |||
| } | |||
| policyBias := priority * maxMonitorWindowBias | |||
| totalBias := spanBias + policyBias | |||
| if totalBias > maxMonitorWindowBias { | |||
| totalBias = maxMonitorWindowBias | |||
| } else if totalBias < -maxMonitorWindowBias { | |||
| totalBias = -maxMonitorWindowBias | |||
| } | |||
| windows[i].PriorityBias = totalBias | |||
| } | |||
| return windows | |||
| } | |||
| @@ -101,6 +111,7 @@ func normalizeGoalWindow(raw config.MonitorWindow, fallbackCenter float64) (Moni | |||
| CenterHz: (raw.StartHz + raw.EndHz) / 2, | |||
| SpanHz: span, | |||
| Source: "goals:window:start_end", | |||
| Priority: raw.Priority, | |||
| }, true | |||
| } | |||
| center := raw.CenterHz | |||
| @@ -120,11 +131,25 @@ func normalizeGoalWindow(raw config.MonitorWindow, fallbackCenter float64) (Moni | |||
| CenterHz: center, | |||
| SpanHz: raw.SpanHz, | |||
| Source: source, | |||
| Priority: raw.Priority, | |||
| }, true | |||
| } | |||
| return MonitorWindow{}, false | |||
| } | |||
| func normalizeMonitorPriority(priority float64) float64 { | |||
| if math.IsNaN(priority) || math.IsInf(priority, 0) { | |||
| return 0 | |||
| } | |||
| if priority > 1 { | |||
| return 1 | |||
| } | |||
| if priority < -1 { | |||
| return -1 | |||
| } | |||
| return priority | |||
| } | |||
| func monitorBounds(policy Policy) (float64, float64, bool) { | |||
| if len(policy.MonitorWindows) > 0 { | |||
| return MonitorWindowBounds(policy.MonitorWindows) | |||
| @@ -90,3 +90,32 @@ func TestMonitorWindowBiasPrefersNarrowWindow(t *testing.T) { | |||
| t.Fatalf("expected positive bias, got %.3f", bias) | |||
| } | |||
| } | |||
| func TestMonitorWindowPriorityBiasUsesPriority(t *testing.T) { | |||
| goals := config.PipelineGoalConfig{ | |||
| MonitorWindows: []config.MonitorWindow{ | |||
| {Label: "low", StartHz: 100, EndHz: 200, Priority: -1}, | |||
| {Label: "high", StartHz: 300, EndHz: 400, Priority: 1}, | |||
| }, | |||
| } | |||
| policy := Policy{MonitorWindows: NormalizeMonitorWindows(goals, 0)} | |||
| var low, high *MonitorWindow | |||
| for i := range policy.MonitorWindows { | |||
| win := &policy.MonitorWindows[i] | |||
| switch win.Label { | |||
| case "low": | |||
| low = win | |||
| case "high": | |||
| high = win | |||
| } | |||
| } | |||
| if low == nil || high == nil { | |||
| t.Fatalf("expected both windows") | |||
| } | |||
| if low.Priority != -1 || high.Priority != 1 { | |||
| t.Fatalf("unexpected priority values: low=%.2f high=%.2f", low.Priority, high.Priority) | |||
| } | |||
| if high.PriorityBias <= low.PriorityBias { | |||
| t.Fatalf("expected high priority bias > low priority bias, got %.3f vs %.3f", high.PriorityBias, low.PriorityBias) | |||
| } | |||
| } | |||
| @@ -420,6 +420,7 @@ func buildMonitorWindowStats(windows []MonitorWindow) []MonitorWindowStats { | |||
| EndHz: win.EndHz, | |||
| CenterHz: win.CenterHz, | |||
| SpanHz: win.SpanHz, | |||
| Priority: win.Priority, | |||
| PriorityBias: win.PriorityBias, | |||
| }) | |||
| } | |||
| @@ -39,6 +39,7 @@ type MonitorWindow struct { | |||
| CenterHz float64 `json:"center_hz,omitempty"` | |||
| SpanHz float64 `json:"span_hz,omitempty"` | |||
| Source string `json:"source,omitempty"` | |||
| Priority float64 `json:"priority,omitempty"` | |||
| PriorityBias float64 `json:"priority_bias,omitempty"` | |||
| } | |||
| @@ -66,6 +67,7 @@ type MonitorWindowStats struct { | |||
| EndHz float64 `json:"end_hz,omitempty"` | |||
| CenterHz float64 `json:"center_hz,omitempty"` | |||
| SpanHz float64 `json:"span_hz,omitempty"` | |||
| Priority float64 `json:"priority,omitempty"` | |||
| PriorityBias float64 `json:"priority_bias,omitempty"` | |||
| Candidates int `json:"candidates,omitempty"` | |||
| Planned int `json:"planned,omitempty"` | |||
| @@ -508,6 +508,9 @@ func (m *Manager) ApplyConfig(update ConfigUpdate) (config.Config, error) { | |||
| func validateMonitorWindows(windows []config.MonitorWindow) error { | |||
| for i, w := range windows { | |||
| if math.IsNaN(w.Priority) || math.IsInf(w.Priority, 0) || w.Priority < -1 || w.Priority > 1 { | |||
| return fmt.Errorf("monitor_windows[%d] priority must be between -1 and 1", i) | |||
| } | |||
| hasStart := w.StartHz != 0 || w.EndHz != 0 | |||
| if hasStart { | |||
| if w.StartHz <= 0 || w.EndHz <= 0 || w.EndHz <= w.StartHz { | |||