Преглед изворни кода

dsp: decouple refinement step

master
Jan Svabenik пре 9 часа
родитељ
комит
4b9a48fe3f
7 измењених фајлова са 109 додато и 99 уклоњено
  1. +4
    -4
      cmd/sdrd/decision_budget.go
  2. +6
    -8
      cmd/sdrd/dsp_loop.go
  3. +10
    -10
      cmd/sdrd/http_handlers.go
  4. +3
    -3
      cmd/sdrd/phase_snapshot_test.go
  5. +4
    -5
      cmd/sdrd/phase_state.go
  6. +11
    -9
      cmd/sdrd/phase_state_test.go
  7. +71
    -60
      cmd/sdrd/pipeline_runtime.go

+ 4
- 4
cmd/sdrd/decision_budget.go Прегледај датотеку

@@ -93,8 +93,8 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i
purgeExpired(dq.recordHold, now)
purgeExpired(dq.decodeHold, now)

recSelected := selectQueued(dq.record, dq.recordHold, maxRecord, hold, now, policy)
decSelected := selectQueued(dq.decode, dq.decodeHold, maxDecode, hold, now, policy)
recSelected := selectQueued("record", dq.record, dq.recordHold, maxRecord, hold, now, policy)
decSelected := selectQueued("decode", dq.decode, dq.decodeHold, maxDecode, hold, now, policy)

stats := decisionQueueStats{
RecordQueued: len(dq.record),
@@ -127,7 +127,7 @@ func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, maxRecord i
return stats
}

func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy pipeline.Policy) map[int64]struct{} {
func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy pipeline.Policy) map[int64]struct{} {
selected := map[int64]struct{}{}
if len(queue) == 0 {
return selected
@@ -147,7 +147,7 @@ func selectQueued(queue map[int64]*queuedDecision, hold map[int64]time.Time, max
if hint == "" {
hint = qd.Class
}
policyBoost := pipeline.CandidatePriorityBoost(policy, hint)
policyBoost := pipeline.DecisionPriorityBoost(policy, hint, qd.Class, queueName)
scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost})
}
sort.Slice(scoredList, func(i, j int) bool {


+ 6
- 8
cmd/sdrd/dsp_loop.go Прегледај датотеку

@@ -58,14 +58,13 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
rt.gotSamples = true
}
state.surveillance = rt.buildSurveillanceResult(art)
state.refinementInput = rt.buildRefinementInput(state.surveillance)
state.refinement = rt.runRefinement(art, state.surveillance, extractMgr, rec)
finished := state.surveillance.Finished
thresholds := state.surveillance.Thresholds
noiseFloor := state.surveillance.NoiseFloor
var displaySignals []detector.Signal
if len(art.iq) > 0 {
state.refinement = rt.refineSignals(art, state.refinementInput, extractMgr, rec)
displaySignals = state.refinement.Signals
if len(art.detailIQ) > 0 {
displaySignals = state.refinement.Result.Signals
if rec != nil && len(displaySignals) > 0 && len(art.allIQ) > 0 {
aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
streamSnips, streamRates := extractForStreaming(extractMgr, art.allIQ, rt.cfg.SampleRate, rt.cfg.CenterHz, displaySignals, rt.streamPhaseState, rt.streamOverlap, aqCfg)
@@ -89,7 +88,6 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
}
rt.maintenance(displaySignals, rec)
} else {
state.refinement = pipeline.RefinementResult{}
displaySignals = rt.det.StableSignals()
}
state.queueStats = rt.queueStats
@@ -119,8 +117,8 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
rec.OnEvents(evCopy)
}
var debugInfo *SpectrumDebug
plan := state.refinementInput.Plan
windowStats := buildWindowStats(state.refinementInput.Windows)
plan := state.refinement.Input.Plan
windowStats := buildWindowStats(state.refinement.Input.Windows)
hasPlan := plan.TotalCandidates > 0 || plan.Budget > 0 || plan.DroppedBySNR > 0 || plan.DroppedByBudget > 0
hasWindows := windowStats != nil && windowStats.Count > 0
if len(thresholds) > 0 || len(displaySignals) > 0 || noiseFloor != 0 || hasPlan || hasWindows {
@@ -150,7 +148,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
debugInfo.Windows = windowStats
}
}
h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.spectrum, Signals: displaySignals, Debug: debugInfo})
h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.surveillanceSpectrum, Signals: displaySignals, Debug: debugInfo})
}
}
}

+ 10
- 10
cmd/sdrd/http_handlers.go Прегледај датотеку

@@ -154,22 +154,22 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime
mux.HandleFunc("/api/refinement", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
snap := phaseSnap.Snapshot()
windowStats := buildWindowStats(snap.refinementInput.Windows)
windowStats := buildWindowStats(snap.refinement.Input.Windows)
out := map[string]any{
"plan": snap.refinementInput.Plan,
"windows": snap.refinementInput.Windows,
"plan": snap.refinement.Input.Plan,
"windows": snap.refinement.Input.Windows,
"window_stats": windowStats,
"queue_stats": snap.queueStats,
"candidates": len(snap.refinementInput.Candidates),
"scheduled": len(snap.refinementInput.Scheduled),
"signals": len(snap.refinement.Signals),
"decisions": len(snap.refinement.Decisions),
"decision_summary": summarizeDecisions(snap.refinement.Decisions),
"decision_items": compactDecisions(snap.refinement.Decisions),
"candidates": len(snap.refinement.Input.Candidates),
"scheduled": len(snap.refinement.Input.Scheduled),
"signals": len(snap.refinement.Result.Signals),
"decisions": len(snap.refinement.Result.Decisions),
"decision_summary": summarizeDecisions(snap.refinement.Result.Decisions),
"decision_items": compactDecisions(snap.refinement.Result.Decisions),
"surveillance_level": snap.surveillance.Level,
"surveillance_levels": snap.surveillance.Levels,
"display_level": snap.surveillance.DisplayLevel,
"refinement_level": snap.refinementInput.Level,
"refinement_level": snap.refinement.Input.Level,
"presentation_level": snap.presentation,
}
_ = json.NewEncoder(w).Encode(out)


+ 3
- 3
cmd/sdrd/phase_snapshot_test.go Прегледај датотеку

@@ -6,13 +6,13 @@ func TestPhaseSnapshotSetGet(t *testing.T) {
snap := &phaseSnapshot{}
state := phaseState{}
state.surveillance.NoiseFloor = -91
state.refinementInput.SampleRate = 2048000
state.refinement.Input.SampleRate = 2048000
snap.Set(state)
got := snap.Snapshot()
if got.surveillance.NoiseFloor != -91 {
t.Fatalf("unexpected noise floor: %v", got.surveillance.NoiseFloor)
}
if got.refinementInput.SampleRate != 2048000 {
t.Fatalf("unexpected sample rate: %v", got.refinementInput.SampleRate)
if got.refinement.Input.SampleRate != 2048000 {
t.Fatalf("unexpected sample rate: %v", got.refinement.Input.SampleRate)
}
}

+ 4
- 5
cmd/sdrd/phase_state.go Прегледај датотеку

@@ -3,9 +3,8 @@ package main
import "sdr-wideband-suite/internal/pipeline"

type phaseState struct {
surveillance pipeline.SurveillanceResult
refinementInput pipeline.RefinementInput
refinement pipeline.RefinementResult
queueStats decisionQueueStats
presentation pipeline.AnalysisLevel
surveillance pipeline.SurveillanceResult
refinement pipeline.RefinementStep
queueStats decisionQueueStats
presentation pipeline.AnalysisLevel
}

+ 11
- 9
cmd/sdrd/phase_state_test.go Прегледај датотеку

@@ -8,19 +8,21 @@ import (

func TestPhaseStateCarriesPhaseResults(t *testing.T) {
ps := &phaseState{
surveillance: pipeline.SurveillanceResult{Level: pipeline.AnalysisLevel{Name: "surveillance"}, NoiseFloor: -90, Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}},
refinementInput: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6},
refinement: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}},
queueStats: decisionQueueStats{RecordQueued: 1},
presentation: pipeline.AnalysisLevel{Name: "presentation"},
surveillance: pipeline.SurveillanceResult{Level: pipeline.AnalysisLevel{Name: "surveillance"}, NoiseFloor: -90, Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}},
refinement: pipeline.RefinementStep{
Input: pipeline.RefinementInput{Scheduled: []pipeline.ScheduledCandidate{{Candidate: pipeline.Candidate{ID: 1}, Priority: 5}}, SampleRate: 2048000, FFTSize: 2048, CenterHz: 7.1e6},
Result: pipeline.RefinementResult{Level: pipeline.AnalysisLevel{Name: "refinement"}, Decisions: []pipeline.SignalDecision{{ShouldRecord: true}}, Candidates: []pipeline.Candidate{{ID: 1}}},
},
queueStats: decisionQueueStats{RecordQueued: 1},
presentation: pipeline.AnalysisLevel{Name: "presentation"},
}
if ps.surveillance.NoiseFloor != -90 || len(ps.surveillance.Scheduled) != 1 {
t.Fatalf("unexpected surveillance state: %+v", ps.surveillance)
}
if len(ps.refinementInput.Scheduled) != 1 || ps.refinementInput.SampleRate != 2048000 {
t.Fatalf("unexpected refinement input: %+v", ps.refinementInput)
if len(ps.refinement.Input.Scheduled) != 1 || ps.refinement.Input.SampleRate != 2048000 {
t.Fatalf("unexpected refinement input: %+v", ps.refinement.Input)
}
if len(ps.refinement.Decisions) != 1 || !ps.refinement.Decisions[0].ShouldRecord || len(ps.refinement.Candidates) != 1 {
t.Fatalf("unexpected refinement state: %+v", ps.refinement)
if len(ps.refinement.Result.Decisions) != 1 || !ps.refinement.Result.Decisions[0].ShouldRecord || len(ps.refinement.Result.Candidates) != 1 {
t.Fatalf("unexpected refinement state: %+v", ps.refinement.Result)
}
}

+ 71
- 60
cmd/sdrd/pipeline_runtime.go Прегледај датотеку

@@ -45,14 +45,16 @@ type dspRuntime struct {
}

type spectrumArtifacts struct {
allIQ []complex64
iq []complex64
spectrum []float64
finished []detector.Event
detected []detector.Signal
thresholds []float64
noiseFloor float64
now time.Time
allIQ []complex64
surveillanceIQ []complex64
detailIQ []complex64
surveillanceSpectrum []float64
detailSpectrum []float64
finished []detector.Event
detected []detector.Signal
thresholds []float64
noiseFloor float64
now time.Time
}

func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
@@ -160,27 +162,27 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag
if rec != nil {
rec.Ingest(time.Now(), allIQ)
}
iq := allIQ
survIQ := allIQ
if len(allIQ) > rt.cfg.FFTSize {
iq = allIQ[len(allIQ)-rt.cfg.FFTSize:]
survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:]
}
if rt.dcEnabled {
dcBlocker.Apply(iq)
dcBlocker.Apply(survIQ)
}
if rt.iqEnabled {
dsp.IQBalance(iq)
dsp.IQBalance(survIQ)
}
var spectrum []float64
if rt.useGPU && rt.gpuEngine != nil {
gpuBuf := make([]complex64, len(iq))
if len(rt.window) == len(iq) {
for i := 0; i < len(iq); i++ {
v := iq[i]
gpuBuf := make([]complex64, len(survIQ))
if len(rt.window) == len(survIQ) {
for i := 0; i < len(survIQ); i++ {
v := survIQ[i]
w := float32(rt.window[i])
gpuBuf[i] = complex(real(v)*w, imag(v)*w)
}
} else {
copy(gpuBuf, iq)
copy(gpuBuf, survIQ)
}
out, err := rt.gpuEngine.Exec(gpuBuf)
if err != nil {
@@ -193,7 +195,7 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag
spectrum = fftutil.SpectrumFromFFT(out)
}
} else {
spectrum = fftutil.SpectrumWithPlan(iq, rt.window, rt.plan)
spectrum = fftutil.SpectrumWithPlan(survIQ, rt.window, rt.plan)
}
for i := range spectrum {
if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
@@ -203,14 +205,16 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag
now := time.Now()
finished, detected := rt.det.Process(now, spectrum, rt.cfg.CenterHz)
return &spectrumArtifacts{
allIQ: allIQ,
iq: iq,
spectrum: spectrum,
finished: finished,
detected: detected,
thresholds: rt.det.LastThresholds(),
noiseFloor: rt.det.LastNoiseFloor(),
now: now,
allIQ: allIQ,
surveillanceIQ: survIQ,
detailIQ: survIQ,
surveillanceSpectrum: spectrum,
detailSpectrum: spectrum,
finished: finished,
detected: detected,
thresholds: rt.det.LastThresholds(),
noiseFloor: rt.det.LastNoiseFloor(),
now: now,
}, nil
}

@@ -226,7 +230,7 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S
SampleRate: rt.cfg.SampleRate,
FFTSize: rt.cfg.Surveillance.AnalysisFFTSize,
CenterHz: rt.cfg.CenterHz,
SpanHz: float64(rt.cfg.SampleRate),
SpanHz: spanForPolicy(policy, float64(rt.cfg.SampleRate)),
Source: "baseband",
}
lowRate := rt.cfg.SampleRate / 2
@@ -242,7 +246,7 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S
SampleRate: lowRate,
FFTSize: lowFFT,
CenterHz: rt.cfg.CenterHz,
SpanHz: float64(lowRate),
SpanHz: spanForPolicy(policy, float64(lowRate)),
Source: "downsampled",
}
displayLevel := pipeline.AnalysisLevel{
@@ -250,13 +254,10 @@ func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.S
SampleRate: rt.cfg.SampleRate,
FFTSize: rt.cfg.Surveillance.DisplayBins,
CenterHz: rt.cfg.CenterHz,
SpanHz: float64(rt.cfg.SampleRate),
SpanHz: spanForPolicy(policy, float64(rt.cfg.SampleRate)),
Source: "display",
}
levels := []pipeline.AnalysisLevel{level}
if lowLevel.SampleRate != level.SampleRate || lowLevel.FFTSize != level.FFTSize {
levels = append(levels, lowLevel)
}
levels := surveillanceLevels(policy, level, lowLevel)
return pipeline.SurveillanceResult{
Level: level,
Levels: levels,
@@ -279,36 +280,14 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip
}
windows := make([]pipeline.RefinementWindow, 0, len(scheduled))
for _, sc := range scheduled {
span := sc.Candidate.BandwidthHz
windowSource := "candidate"
if policy.RefinementAutoSpan && (span <= 0 || span < 2000 || span > 400000) {
autoSpan, autoSource := pipeline.AutoSpanForHint(sc.Candidate.Hint)
if autoSpan > 0 {
span = autoSpan
windowSource = autoSource
}
}
if policy.RefinementMinSpanHz > 0 && span < policy.RefinementMinSpanHz {
span = policy.RefinementMinSpanHz
}
if policy.RefinementMaxSpanHz > 0 && span > policy.RefinementMaxSpanHz {
span = policy.RefinementMaxSpanHz
}
if span <= 0 {
span = 12000
}
windows = append(windows, pipeline.RefinementWindow{
CenterHz: sc.Candidate.CenterHz,
SpanHz: span,
Source: windowSource,
})
windows = append(windows, pipeline.RefinementWindowForCandidate(policy, sc.Candidate))
}
level := pipeline.AnalysisLevel{
Name: "refinement",
SampleRate: rt.cfg.SampleRate,
FFTSize: rt.cfg.FFTSize,
CenterHz: rt.cfg.CenterHz,
SpanHz: float64(rt.cfg.SampleRate),
SpanHz: spanForPolicy(policy, float64(rt.cfg.SampleRate)),
Source: "refinement-window",
}
input := pipeline.RefinementInput{
@@ -328,8 +307,14 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip
return input
}

func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep {
input := rt.buildRefinementInput(surv)
result := rt.refineSignals(art, input, extractMgr, rec)
return pipeline.RefinementStep{Input: input, Result: result}
}

func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.RefinementInput, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult {
if art == nil || len(art.iq) == 0 || len(input.Scheduled) == 0 {
if art == nil || len(art.detailIQ) == 0 || len(input.Scheduled) == 0 {
return pipeline.RefinementResult{}
}
policy := pipeline.PolicyFromConfig(rt.cfg)
@@ -360,8 +345,8 @@ func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.Refin
if centerHz == 0 {
centerHz = rt.cfg.CenterHz
}
snips, snipRates := extractSignalIQBatch(extractMgr, art.iq, sampleRate, centerHz, selectedSignals)
refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.spectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
snips, snipRates := extractSignalIQBatch(extractMgr, art.detailIQ, sampleRate, centerHz, selectedSignals)
refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.detailSpectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
signals := make([]detector.Signal, 0, len(refined))
decisions := make([]pipeline.SignalDecision, 0, len(refined))
for i, ref := range refined {
@@ -498,3 +483,29 @@ func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorde
_ = aqCfg
}
}

func spanForPolicy(policy pipeline.Policy, fallback float64) float64 {
if policy.MonitorSpanHz > 0 {
return policy.MonitorSpanHz
}
if policy.MonitorStartHz != 0 && policy.MonitorEndHz != 0 && policy.MonitorEndHz > policy.MonitorStartHz {
return policy.MonitorEndHz - policy.MonitorStartHz
}
return fallback
}

func surveillanceLevels(policy pipeline.Policy, primary pipeline.AnalysisLevel, secondary pipeline.AnalysisLevel) []pipeline.AnalysisLevel {
levels := []pipeline.AnalysisLevel{primary}
strategy := strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy))
switch strategy {
case "", "single-resolution":
if secondary.SampleRate != primary.SampleRate || secondary.FFTSize != primary.FFTSize {
levels = append(levels, secondary)
}
default:
if secondary.SampleRate != primary.SampleRate || secondary.FFTSize != primary.FFTSize {
levels = append(levels, secondary)
}
}
return levels
}

Loading…
Откажи
Сачувај