Parcourir la source

feat: add decision queues and queue stats

master
Jan Svabenik il y a 8 heures
Parent
révision
39611f981d
7 fichiers modifiés avec 165 ajouts et 30 suppressions
  1. +138
    -23
      cmd/sdrd/decision_budget.go
  2. +8
    -6
      cmd/sdrd/decision_budget_test.go
  3. +9
    -0
      cmd/sdrd/dsp_loop.go
  4. +2
    -0
      cmd/sdrd/http_handlers.go
  5. +2
    -0
      cmd/sdrd/phase_state.go
  6. +2
    -0
      cmd/sdrd/phase_state_test.go
  7. +4
    -1
      cmd/sdrd/pipeline_runtime.go

+ 138
- 23
cmd/sdrd/decision_budget.go Voir le fichier

@@ -2,39 +2,154 @@ package main

import (
"sort"
"time"

"sdr-wideband-suite/internal/pipeline"
)

func enforceDecisionBudgets(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int) (int, int) {
recorded := 0
decoded := 0
order := make([]int, len(decisions))
type decisionQueueStats struct {
RecordQueued int `json:"record_queued"`
DecodeQueued int `json:"decode_queued"`
RecordSelected int `json:"record_selected"`
DecodeSelected int `json:"decode_selected"`
RecordOldestS float64 `json:"record_oldest_sec"`
DecodeOldestS float64 `json:"decode_oldest_sec"`
}

type queuedDecision struct {
ID int64
SNRDb float64
FirstSeen time.Time
LastSeen time.Time
}

type decisionQueues struct {
record map[int64]*queuedDecision
decode map[int64]*queuedDecision
}

func newDecisionQueues() *decisionQueues {
return &decisionQueues{record: map[int64]*queuedDecision{}, decode: map[int64]*queuedDecision{}}
}

func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int, now time.Time) decisionQueueStats {
if dq == nil {
return decisionQueueStats{}
}
recSeen := map[int64]bool{}
decSeen := map[int64]bool{}
for i := range decisions {
order[i] = i
id := decisions[i].Candidate.ID
if id == 0 {
continue
}
if decisions[i].ShouldRecord {
qd := dq.record[id]
if qd == nil {
qd = &queuedDecision{ID: id, FirstSeen: now}
dq.record[id] = qd
}
qd.SNRDb = decisions[i].Candidate.SNRDb
qd.LastSeen = now
recSeen[id] = true
}
if decisions[i].ShouldAutoDecode {
qd := dq.decode[id]
if qd == nil {
qd = &queuedDecision{ID: id, FirstSeen: now}
dq.decode[id] = qd
}
qd.SNRDb = decisions[i].Candidate.SNRDb
qd.LastSeen = now
decSeen[id] = true
}
}
sort.SliceStable(order, func(i, j int) bool {
return decisions[order[i]].Candidate.SNRDb > decisions[order[j]].Candidate.SNRDb
})
for _, idx := range order {
if decisions[idx].ShouldRecord {
if maxRecord > 0 && recorded >= maxRecord {
decisions[idx].ShouldRecord = false
decisions[idx].Reason = "recording budget exceeded"
} else {
recorded++
for id := range dq.record {
if !recSeen[id] {
delete(dq.record, id)
}
}
for id := range dq.decode {
if !decSeen[id] {
delete(dq.decode, id)
}
}

recSelected := selectQueued(dq.record, maxRecord, now)
decSelected := selectQueued(dq.decode, maxDecode, now)

stats := decisionQueueStats{
RecordQueued: len(dq.record),
DecodeQueued: len(dq.decode),
RecordSelected: len(recSelected),
DecodeSelected: len(decSelected),
RecordOldestS: oldestAge(dq.record, now),
DecodeOldestS: oldestAge(dq.decode, now),
}

for i := range decisions {
id := decisions[i].Candidate.ID
if decisions[i].ShouldRecord {
if _, ok := recSelected[id]; !ok {
decisions[i].ShouldRecord = false
decisions[i].Reason = "queued: record budget"
}
}
if decisions[idx].ShouldAutoDecode {
if maxDecode > 0 && decoded >= maxDecode {
decisions[idx].ShouldAutoDecode = false
if decisions[idx].Reason == "" {
decisions[idx].Reason = "decode budget exceeded"
if decisions[i].ShouldAutoDecode {
if _, ok := decSelected[id]; !ok {
decisions[i].ShouldAutoDecode = false
if decisions[i].Reason == "" {
decisions[i].Reason = "queued: decode budget"
}
} else {
decoded++
}
}
}
return recorded, decoded
return stats
}

func selectQueued(queue map[int64]*queuedDecision, max int, now time.Time) map[int64]struct{} {
selected := map[int64]struct{}{}
if len(queue) == 0 {
return selected
}
type scored struct {
id int64
score float64
}
scoredList := make([]scored, 0, len(queue))
for id, qd := range queue {
age := now.Sub(qd.FirstSeen).Seconds()
boost := age / 2.0
if boost > 5 {
boost = 5
}
scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost})
}
sort.Slice(scoredList, func(i, j int) bool {
return scoredList[i].score > scoredList[j].score
})
limit := max
if limit <= 0 || limit > len(scoredList) {
limit = len(scoredList)
}
for i := 0; i < limit; i++ {
selected[scoredList[i].id] = struct{}{}
}
return selected
}

func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 {
oldest := 0.0
first := true
for _, qd := range queue {
age := now.Sub(qd.FirstSeen).Seconds()
if first || age > oldest {
oldest = age
first = false
}
}
if first {
return 0
}
return oldest
}

+ 8
- 6
cmd/sdrd/decision_budget_test.go Voir le fichier

@@ -2,19 +2,21 @@ package main

import (
"testing"
"time"

"sdr-wideband-suite/internal/pipeline"
)

func TestEnforceDecisionBudgets(t *testing.T) {
decisions := []pipeline.SignalDecision{
{Candidate: pipeline.Candidate{SNRDb: 5}, ShouldRecord: true, ShouldAutoDecode: true},
{Candidate: pipeline.Candidate{SNRDb: 15}, ShouldRecord: true, ShouldAutoDecode: true},
{Candidate: pipeline.Candidate{SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false},
{Candidate: pipeline.Candidate{ID: 1, SNRDb: 5}, ShouldRecord: true, ShouldAutoDecode: true},
{Candidate: pipeline.Candidate{ID: 2, SNRDb: 15}, ShouldRecord: true, ShouldAutoDecode: true},
{Candidate: pipeline.Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false},
}
recorded, decoded := enforceDecisionBudgets(decisions, 1, 1)
if recorded != 1 || decoded != 1 {
t.Fatalf("unexpected counts: record=%d decode=%d", recorded, decoded)
q := newDecisionQueues()
stats := q.Apply(decisions, 1, 1, time.Now())
if stats.RecordSelected != 1 || stats.DecodeSelected != 1 {
t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected)
}
if !decisions[1].ShouldRecord || !decisions[1].ShouldAutoDecode {
t.Fatalf("expected highest SNR decision to remain allowed")


+ 9
- 0
cmd/sdrd/dsp_loop.go Voir le fichier

@@ -92,6 +92,15 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
state.refinement = pipeline.RefinementResult{}
displaySignals = rt.det.StableSignals()
}
state.queueStats = rt.queueStats
state.presentation = pipeline.AnalysisLevel{
Name: "presentation",
SampleRate: rt.cfg.SampleRate,
FFTSize: rt.cfg.Surveillance.DisplayBins,
CenterHz: rt.cfg.CenterHz,
SpanHz: float64(rt.cfg.SampleRate),
Source: "display",
}
if phaseSnap != nil {
phaseSnap.Set(*state)
}


+ 2
- 0
cmd/sdrd/http_handlers.go Voir le fichier

@@ -159,6 +159,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime
"plan": snap.refinementInput.Plan,
"windows": snap.refinementInput.Windows,
"window_stats": windowStats,
"queue_stats": snap.queueStats,
"candidates": len(snap.refinementInput.Candidates),
"scheduled": len(snap.refinementInput.Scheduled),
"signals": len(snap.refinement.Signals),
@@ -167,6 +168,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime
"decision_items": compactDecisions(snap.refinement.Decisions),
"surveillance_level": snap.surveillance.Level,
"refinement_level": snap.refinementInput.Level,
"presentation_level": snap.presentation,
}
_ = json.NewEncoder(w).Encode(out)
})


+ 2
- 0
cmd/sdrd/phase_state.go Voir le fichier

@@ -6,4 +6,6 @@ type phaseState struct {
surveillance pipeline.SurveillanceResult
refinementInput pipeline.RefinementInput
refinement pipeline.RefinementResult
queueStats decisionQueueStats
presentation pipeline.AnalysisLevel
}

+ 2
- 0
cmd/sdrd/phase_state_test.go Voir le fichier

@@ -11,6 +11,8 @@ func TestPhaseStateCarriesPhaseResults(t *testing.T) {
surveillance: pipeline.SurveillanceResult{NoiseFloor: -90, Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}},
refinementInput: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6},
refinement: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}},
queueStats: decisionQueueStats{RecordQueued: 1},
presentation: pipeline.AnalysisLevel{Name: "presentation"},
}
if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 {
t.Fatalf("unexpected surveillance state: %+v", ps.surveillance)


+ 4
- 1
cmd/sdrd/pipeline_runtime.go Voir le fichier

@@ -39,6 +39,8 @@ type dspRuntime struct {
rdsMap map[int64]*rdsState
streamPhaseState map[int64]*streamExtractState
streamOverlap *streamIQOverlap
decisionQueues *decisionQueues
queueStats decisionQueueStats
gotSamples bool
}

@@ -356,7 +358,8 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin
}
maxRecord := rt.cfg.Resources.MaxRecordingStreams
maxDecode := rt.cfg.Resources.MaxDecodeJobs
enforceDecisionBudgets(decisions, maxRecord, maxDecode)
queueStats := rt.decisionQueues.Apply(decisions, maxRecord, maxDecode, art.now)
rt.queueStats = queueStats
summary := summarizeDecisions(decisions)
if rec != nil {
if summary.RecordEnabled > 0 {


Chargement…
Annuler
Enregistrer