Преглед на файлове

feat: queue hold policy weighting and lowres level

master
Jan Svabenik преди 9 часа
родител
ревизия
a57b0034e5
променени са 3 файла, в които са добавени 39 реда и са изтрити 8 реда
  1. +16
    -5
      cmd/sdrd/decision_budget.go
  2. +1
    -1
      cmd/sdrd/decision_budget_test.go
  3. +22
    -2
      cmd/sdrd/pipeline_runtime.go

+ 16
- 5
cmd/sdrd/decision_budget.go Целия файл

@@ -21,6 +21,8 @@ type decisionQueueStats struct {
type queuedDecision struct {
ID int64
SNRDb float64
Hint string
Class string
FirstSeen time.Time
LastSeen time.Time
}
@@ -41,7 +43,7 @@ func newDecisionQueues() *decisionQueues {
}
}

func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int, hold time.Duration, now time.Time) decisionQueueStats {
func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord int, maxDecode int, hold time.Duration, now time.Time, policy pipeline.Policy) decisionQueueStats {
if dq == nil {
return decisionQueueStats{}
}
@@ -59,6 +61,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i
dq.record[id] = qd
}
qd.SNRDb = decisions[i].Candidate.SNRDb
qd.Hint = decisions[i].Candidate.Hint
qd.Class = decisions[i].Class
qd.LastSeen = now
recSeen[id] = true
}
@@ -69,6 +73,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i
dq.decode[id] = qd
}
qd.SNRDb = decisions[i].Candidate.SNRDb
qd.Hint = decisions[i].Candidate.Hint
qd.Class = decisions[i].Class
qd.LastSeen = now
decSeen[id] = true
}
@@ -87,8 +93,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i
purgeExpired(dq.recordHold, now)
purgeExpired(dq.decodeHold, now)

recSelected := selectQueued(dq.record, dq.recordHold, maxRecord, hold, now)
decSelected := selectQueued(dq.decode, dq.decodeHold, maxDecode, hold, now)
recSelected := selectQueued(dq.record, dq.recordHold, maxRecord, hold, now, policy)
decSelected := selectQueued(dq.decode, dq.decodeHold, maxDecode, hold, now, policy)

stats := decisionQueueStats{
RecordQueued: len(dq.record),
@@ -121,7 +127,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i
return stats
}

func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time) map[int64]struct{} {
func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy pipeline.Policy) map[int64]struct{} {
selected := map[int64]struct{}{}
if len(queue) == 0 {
return selected
@@ -137,7 +143,12 @@ func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max
if boost > 5 {
boost = 5
}
scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost})
hint := qd.Hint
if hint == "" {
hint = qd.Class
}
policyBoost := pipeline.CandidatePriorityBoost(policy, hint)
scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost})
}
sort.Slice(scoredList, func(i, j int) bool {
return scoredList[i].score > scoredList[j].score


+ 1
- 1
cmd/sdrd/decision_budget_test.go Целия файл

@@ -14,7 +14,7 @@ func TestEnforceDecisionBudgets(t *testing.T) {
{Candidate: pipeline.Candidate{ID: 3, SNRDb: 10}, ShouldRecord: true, ShouldAutoDecode: false},
}
q := newDecisionQueues()
stats := q.Apply(decisions, 1, 1, 0, time.Now())
stats := q.Apply(decisions, 1, 1, 0, time.Now(), pipeline.Policy{SignalPriorities: []string{"digital"}})
if stats.RecordSelected != 1 || stats.DecodeSelected != 1 {
t.Fatalf("unexpected counts: record=%d decode=%d", stats.RecordSelected, stats.DecodeSelected)
}


+ 22
- 2
cmd/sdrd/pipeline_runtime.go Целия файл

@@ -229,6 +229,22 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S
SpanHz: float64(rt.cfg.SampleRate),
Source: "baseband",
}
lowRate := rt.cfg.SampleRate / 2
lowFFT := rt.cfg.Surveillance.AnalysisFFTSize / 2
if lowRate < 200000 {
lowRate = rt.cfg.SampleRate
}
if lowFFT < 256 {
lowFFT = rt.cfg.Surveillance.AnalysisFFTSize
}
lowLevel := pipeline.AnalysisLevel{
Name: "surveillance-lowres",
SampleRate: lowRate,
FFTSize: lowFFT,
CenterHz: rt.cfg.CenterHz,
SpanHz: float64(lowRate),
Source: "downsampled",
}
displayLevel := pipeline.AnalysisLevel{
Name: "presentation",
SampleRate: rt.cfg.SampleRate,
@@ -237,9 +253,13 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S
SpanHz: float64(rt.cfg.SampleRate),
Source: "display",
}
levels := []pipeline.AnalysisLevel{level}
if lowLevel.SampleRate != level.SampleRate || lowLevel.FFTSize != level.FFTSize {
levels = append(levels, lowLevel)
}
return pipeline.SurveillanceResult{
Level: level,
Levels: []pipeline.AnalysisLevel{level},
Levels: levels,
DisplayLevel: displayLevel,
Candidates: candidates,
Scheduled: scheduled,
@@ -369,7 +389,7 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin
maxRecord := rt.cfg.Resources.MaxRecordingStreams
maxDecode := rt.cfg.Resources.MaxDecodeJobs
hold := time.Duration(rt.cfg.Resources.DecisionHoldMs) * time.Millisecond
queueStats := rt.decisionQueues.Apply(decisions, maxRecord, maxDecode, hold, art.now)
queueStats := rt.decisionQueues.Apply(decisions, maxRecord, maxDecode, hold, art.now, policy)
rt.queueStats = queueStats
summary := summarizeDecisions(decisions)
if rec != nil {


Loading…
Отказ
Запис