瀏覽代碼

feat: add pipeline runtime and candidate scheduler

master
Jan Svabenik 10 小時之前
父節點
當前提交
e40f5825c0
共有 5 個文件被更改,包括 491 次插入325 次删除
  1. +20
    -325
      cmd/sdrd/dsp_loop.go
  2. +355
    -0
      cmd/sdrd/pipeline_runtime.go
  3. +44
    -0
      cmd/sdrd/pipeline_runtime_test.go
  4. +49
    -0
      internal/pipeline/scheduler.go
  5. +23
    -0
      internal/pipeline/scheduler_test.go

+ 20
- 325
cmd/sdrd/dsp_loop.go 查看文件

@@ -4,24 +4,16 @@ import (
"context"
"encoding/json"
"log"
"math"
"os"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"

"sdr-wideband-suite/internal/classifier"
"sdr-wideband-suite/internal/config"
"sdr-wideband-suite/internal/demod"
"sdr-wideband-suite/internal/detector"
"sdr-wideband-suite/internal/dsp"
fftutil "sdr-wideband-suite/internal/fft"
"sdr-wideband-suite/internal/fft/gpufft"
"sdr-wideband-suite/internal/rds"
"sdr-wideband-suite/internal/recorder"
"sdr-wideband-suite/internal/pipeline"
)

func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot, extractMgr *extractionManager) {
@@ -30,49 +22,13 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack())
}
}()
rt := newDSPRuntime(cfg, det, window, gpuState)
ticker := time.NewTicker(cfg.FrameInterval())
defer ticker.Stop()
logTicker := time.NewTicker(5 * time.Second)
defer logTicker.Stop()
enc := json.NewEncoder(eventFile)
dcBlocker := dsp.NewDCBlocker(0.995)
dcEnabled := cfg.DCBlock
iqEnabled := cfg.IQBalance
plan := fftutil.NewCmplxPlan(cfg.FFTSize)
useGPU := cfg.UseGPUFFT

// Persistent RDS decoders per signal — async ring-buffer based
type rdsState struct {
dec rds.Decoder
result rds.Result
lastDecode time.Time
busy int32 // atomic: 1 = goroutine running
mu sync.Mutex
}
rdsMap := map[int64]*rdsState{}
// Streaming extraction state: per-signal phase + IQ overlap for FIR halo
streamPhaseState := map[int64]*streamExtractState{}
streamOverlap := &streamIQOverlap{}
var gpuEngine *gpufft.Engine
if useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
useGPU = false
}
} else {
gpuState.set(false, nil)
useGPU = false
}
} else if gpuState != nil {
gpuState.set(false, nil)
}

gotSamples := false
for {
select {
case <-ctx.Done():
@@ -81,290 +37,33 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
st := srcMgr.Stats()
log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
case upd := <-updates:
prevFFT := cfg.FFTSize
prevUseGPU := useGPU
cfg = upd.cfg
if rec != nil {
rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
Enabled: cfg.Recorder.Enabled,
MinSNRDb: cfg.Recorder.MinSNRDb,
MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
PrerollMs: cfg.Recorder.PrerollMs,
RecordIQ: cfg.Recorder.RecordIQ,
RecordAudio: cfg.Recorder.RecordAudio,
AutoDemod: cfg.Recorder.AutoDemod,
AutoDecode: cfg.Recorder.AutoDecode,
MaxDiskMB: cfg.Recorder.MaxDiskMB,
OutputDir: cfg.Recorder.OutputDir,
ClassFilter: cfg.Recorder.ClassFilter,
RingSeconds: cfg.Recorder.RingSeconds,
DeemphasisUs: cfg.Recorder.DeemphasisUs,
ExtractionTaps: cfg.Recorder.ExtractionTaps,
ExtractionBwMult: cfg.Recorder.ExtractionBwMult,
}, cfg.CenterHz, buildDecoderMap(cfg))
}
if upd.det != nil {
det = upd.det
}
if upd.window != nil {
window = upd.window
plan = fftutil.NewCmplxPlan(cfg.FFTSize)
}
dcEnabled = upd.dcBlock
iqEnabled = upd.iqBalance
if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
srcMgr.Flush()
gotSamples = false
if gpuEngine != nil {
gpuEngine.Close()
gpuEngine = nil
}
useGPU = cfg.UseGPUFFT
if useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
useGPU = false
}
} else {
gpuState.set(false, nil)
useGPU = false
}
} else if gpuState != nil {
gpuState.set(false, nil)
}
}
rt.applyUpdate(upd, srcMgr, rec, gpuState)
dcBlocker.Reset()
ticker.Reset(cfg.FrameInterval())
ticker.Reset(rt.cfg.FrameInterval())
case <-ticker.C:
// Pipeline phases:
// 1) ingest IQ
// 2) build surveillance spectrum
// 3) detect coarse candidates
// 4) refine locally with IQ snippets + classification
// 5) update tracking / events / recorder / presentation
// Read all available IQ data — not just one FFT block.
// This ensures the ring buffer captures 100% of IQ for recording/demod.
available := cfg.FFTSize
st := srcMgr.Stats()
if st.BufferSamples > cfg.FFTSize {
// Round down to multiple of FFTSize for clean processing
available = (st.BufferSamples / cfg.FFTSize) * cfg.FFTSize
if available < cfg.FFTSize {
available = cfg.FFTSize
}
}
allIQ, err := srcMgr.ReadIQ(available)
art, err := rt.captureSpectrum(srcMgr, rec, dcBlocker, gpuState)
if err != nil {
log.Printf("read IQ: %v", err)
if strings.Contains(err.Error(), "timeout") {
if err := srcMgr.Restart(cfg); err != nil {
if err := srcMgr.Restart(rt.cfg); err != nil {
log.Printf("restart failed: %v", err)
}
}
continue
}
// Ingest ALL IQ data into the ring buffer for recording
if rec != nil {
rec.Ingest(time.Now(), allIQ)
}
// Use only the last FFT block for spectrum display
iq := allIQ
if len(allIQ) > cfg.FFTSize {
iq = allIQ[len(allIQ)-cfg.FFTSize:]
}
if !gotSamples {
if !rt.gotSamples {
log.Printf("received IQ samples")
gotSamples = true
}
if dcEnabled {
dcBlocker.Apply(iq)
}
if iqEnabled {
dsp.IQBalance(iq)
}
var spectrum []float64
if useGPU && gpuEngine != nil {
// GPU FFT: apply window to a COPY — allIQ must stay unmodified
// for extractForStreaming which needs raw IQ for signal extraction.
gpuBuf := make([]complex64, len(iq))
if len(window) == len(iq) {
for i := 0; i < len(iq); i++ {
v := iq[i]
w := float32(window[i])
gpuBuf[i] = complex(real(v)*w, imag(v)*w)
}
} else {
copy(gpuBuf, iq)
}
out, err := gpuEngine.Exec(gpuBuf)
if err != nil {
if gpuState != nil {
gpuState.set(false, err)
}
useGPU = false
spectrum = fftutil.SpectrumWithPlan(gpuBuf, nil, plan)
} else {
spectrum = fftutil.SpectrumFromFFT(out)
}
} else {
spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
rt.gotSamples = true
}
for i := range spectrum {
if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
spectrum[i] = -200
}
}
now := time.Now()
finished, detectedSignals := det.Process(now, spectrum, cfg.CenterHz)
candidates := pipeline.CandidatesFromSignals(detectedSignals, "surveillance-detector")
thresholds := det.LastThresholds()
noiseFloor := det.LastNoiseFloor()
finished := art.finished
thresholds := art.thresholds
noiseFloor := art.noiseFloor
var displaySignals []detector.Signal
if len(iq) > 0 {
snips, snipRates := extractSignalIQBatch(extractMgr, iq, cfg.SampleRate, cfg.CenterHz, detectedSignals)
refined := pipeline.RefineCandidates(candidates, spectrum, cfg.SampleRate, cfg.FFTSize, snips, snipRates, classifier.ClassifierMode(cfg.ClassifierMode))
signals := make([]detector.Signal, 0, len(refined))
for i, ref := range refined {
sig := ref.Signal
signals = append(signals, sig)
cls := sig.Class
snipRate := ref.SnippetRate
if cls != nil {
pll := classifier.PLLResult{}
if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
cls.PLL = &pll
signals[i].PLL = &pll
if cls.ModType == classifier.ClassWFM && pll.Stereo {
cls.ModType = classifier.ClassWFMStereo
}
}
// RDS decode for WFM — async, uses ring buffer for continuous IQ
if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
keyHz := pll.ExactHz
if keyHz == 0 {
keyHz = signals[i].CenterHz
}
key := int64(math.Round(keyHz / 25000.0))
st := rdsMap[key]
if st == nil {
st = &rdsState{}
rdsMap[key] = st
}
// Launch async decode every 4 seconds, skip if previous still running
if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
st.lastDecode = now
atomic.StoreInt32(&st.busy, 1)
go func(st *rdsState, sigHz float64) {
defer atomic.StoreInt32(&st.busy, 0)
ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
if len(ringIQ) < ringSR || ringSR <= 0 {
return
}
// Shift FM station to center
offset := sigHz - ringCenter
shifted := dsp.FreqShift(ringIQ, ringSR, offset)

// Two-stage decimation to ~250kHz with proper anti-alias
// Stage 1: 4MHz → 1MHz (decim 4), LP at 400kHz
decim1 := ringSR / 1000000
if decim1 < 1 {
decim1 = 1
}
lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
f1 := dsp.ApplyFIR(shifted, lp1)
d1 := dsp.Decimate(f1, decim1)
rate1 := ringSR / decim1

// Stage 2: 1MHz → 250kHz (decim 4), LP at 100kHz
decim2 := rate1 / 250000
if decim2 < 1 {
decim2 = 1
}
lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
f2 := dsp.ApplyFIR(d1, lp2)
decimated := dsp.Decimate(f2, decim2)
actualRate := rate1 / decim2

// RDS baseband extraction on the clean decimated block
rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
if len(rdsBase.Samples) == 0 {
return
}
st.mu.Lock()
result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
diag := st.dec.LastDiag
if result.PS != "" {
st.result = result
}
st.mu.Unlock()
log.Printf("RDS TRACE: ring decode freq=%.1fMHz decIQ=%d decSR=%d bbLen=%d bbRate=%d PI=%04X PS=%q %s",
sigHz/1e6, len(decimated), actualRate, len(rdsBase.Samples), rdsBase.SampleRate,
result.PI, result.PS, diag)
if result.PS != "" {
log.Printf("RDS decoded: PI=%04X PS=%q RT=%q freq=%.1fMHz", result.PI, result.PS, result.RT, sigHz/1e6)
}
}(st, signals[i].CenterHz)
}
// Read last known result (lock-free for display)
st.mu.Lock()
ps := st.result.PS
st.mu.Unlock()
if ps != "" {
pll.RDSStation = strings.TrimSpace(ps)
cls.PLL = &pll
signals[i].PLL = &pll
}
}
}
}
det.UpdateClasses(signals)

// Cleanup RDS accumulators for signals that no longer exist
if len(rdsMap) > 0 {
activeIDs := make(map[int64]bool, len(signals))
for _, s := range signals {
keyHz := s.CenterHz
if s.PLL != nil && s.PLL.ExactHz != 0 {
keyHz = s.PLL.ExactHz
}
activeIDs[int64(math.Round(keyHz/25000.0))] = true
}
for id := range rdsMap {
if !activeIDs[id] {
delete(rdsMap, id)
}
}
}

// Cleanup streamPhaseState for disappeared signals
if len(streamPhaseState) > 0 {
sigIDs := make(map[int64]bool, len(signals))
for _, s := range signals {
sigIDs[s.ID] = true
}
for id := range streamPhaseState {
if !sigIDs[id] {
delete(streamPhaseState, id)
}
}
}

// GPU-extract signal snippets with phase-continuous FreqShift and
// IQ overlap for FIR halo. Heavy work on GPU, only demod runs async.
displaySignals = det.StableSignals()
if rec != nil && len(displaySignals) > 0 && len(allIQ) > 0 {
aqCfg := extractionConfig{
firTaps: cfg.Recorder.ExtractionTaps,
bwMult: cfg.Recorder.ExtractionBwMult,
}
streamSnips, streamRates := extractForStreaming(extractMgr, allIQ, cfg.SampleRate, cfg.CenterHz, displaySignals, streamPhaseState, streamOverlap, aqCfg)
if len(art.iq) > 0 {
displaySignals = rt.refineSignals(art, extractMgr, rec)
if rec != nil && len(displaySignals) > 0 && len(art.allIQ) > 0 {
aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
streamSnips, streamRates := extractForStreaming(extractMgr, art.allIQ, rt.cfg.SampleRate, rt.cfg.CenterHz, displaySignals, rt.streamPhaseState, rt.streamOverlap, aqCfg)
items := make([]recorder.StreamFeedItem, 0, len(displaySignals))
for j, ds := range displaySignals {
if ds.ID == 0 || ds.Class == nil {
@@ -373,23 +72,19 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
if j >= len(streamSnips) || len(streamSnips[j]) == 0 {
continue
}
snipRate := cfg.SampleRate
snipRate := rt.cfg.SampleRate
if j < len(streamRates) && streamRates[j] > 0 {
snipRate = streamRates[j]
}
items = append(items, recorder.StreamFeedItem{
Signal: ds,
Snippet: streamSnips[j],
SnipRate: snipRate,
})
items = append(items, recorder.StreamFeedItem{Signal: ds, Snippet: streamSnips[j], SnipRate: snipRate})
}
if len(items) > 0 {
rec.FeedSnippets(items)
}
}
rt.maintenance(displaySignals, rec)
} else {
// No IQ data this frame — still need displaySignals for broadcast
displaySignals = det.StableSignals()
displaySignals = rt.det.StableSignals()
}

if sigSnap != nil {
@@ -427,7 +122,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
}
debugInfo = &SpectrumDebug{Thresholds: thresholds, NoiseFloor: noiseFloor, Scores: scoreDebug}
}
h.broadcast(SpectrumFrame{Timestamp: now.UnixMilli(), CenterHz: cfg.CenterHz, SampleHz: cfg.SampleRate, FFTSize: cfg.FFTSize, Spectrum: spectrum, Signals: displaySignals, Debug: debugInfo})
h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.spectrum, Signals: displaySignals, Debug: debugInfo})
}
}
}

+ 355
- 0
cmd/sdrd/pipeline_runtime.go 查看文件

@@ -0,0 +1,355 @@
package main

import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"sdr-wideband-suite/internal/classifier"
"sdr-wideband-suite/internal/config"
"sdr-wideband-suite/internal/demod"
"sdr-wideband-suite/internal/detector"
"sdr-wideband-suite/internal/dsp"
fftutil "sdr-wideband-suite/internal/fft"
"sdr-wideband-suite/internal/fft/gpufft"
"sdr-wideband-suite/internal/pipeline"
"sdr-wideband-suite/internal/rds"
"sdr-wideband-suite/internal/recorder"
)

type rdsState struct {
dec rds.Decoder
result rds.Result
lastDecode time.Time
busy int32
mu sync.Mutex
}

type dspRuntime struct {
cfg config.Config
det *detector.Detector
window []float64
plan *fftutil.CmplxPlan
dcEnabled bool
iqEnabled bool
useGPU bool
gpuEngine *gpufft.Engine
rdsMap map[int64]*rdsState
streamPhaseState map[int64]*streamExtractState
streamOverlap *streamIQOverlap
gotSamples bool
}

type spectrumArtifacts struct {
allIQ []complex64
iq []complex64
spectrum []float64
finished []detector.Event
detected []detector.Signal
thresholds []float64
noiseFloor float64
now time.Time
}

func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
rt := &dspRuntime{
cfg: cfg,
det: det,
window: window,
plan: fftutil.NewCmplxPlan(cfg.FFTSize),
dcEnabled: cfg.DCBlock,
iqEnabled: cfg.IQBalance,
useGPU: cfg.UseGPUFFT,
rdsMap: map[int64]*rdsState{},
streamPhaseState: map[int64]*streamExtractState{},
streamOverlap: &streamIQOverlap{},
}
if rt.useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
rt.gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
rt.useGPU = false
}
}
}
return rt
}

func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
prevFFT := rt.cfg.FFTSize
prevUseGPU := rt.useGPU
rt.cfg = upd.cfg
if rec != nil {
rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
Enabled: rt.cfg.Recorder.Enabled,
MinSNRDb: rt.cfg.Recorder.MinSNRDb,
MinDuration: mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second),
MaxDuration: mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second),
PrerollMs: rt.cfg.Recorder.PrerollMs,
RecordIQ: rt.cfg.Recorder.RecordIQ,
RecordAudio: rt.cfg.Recorder.RecordAudio,
AutoDemod: rt.cfg.Recorder.AutoDemod,
AutoDecode: rt.cfg.Recorder.AutoDecode,
MaxDiskMB: rt.cfg.Recorder.MaxDiskMB,
OutputDir: rt.cfg.Recorder.OutputDir,
ClassFilter: rt.cfg.Recorder.ClassFilter,
RingSeconds: rt.cfg.Recorder.RingSeconds,
DeemphasisUs: rt.cfg.Recorder.DeemphasisUs,
ExtractionTaps: rt.cfg.Recorder.ExtractionTaps,
ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult,
}, rt.cfg.CenterHz, buildDecoderMap(rt.cfg))
}
if upd.det != nil {
rt.det = upd.det
}
if upd.window != nil {
rt.window = upd.window
rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
}
rt.dcEnabled = upd.dcBlock
rt.iqEnabled = upd.iqBalance
if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU {
srcMgr.Flush()
rt.gotSamples = false
if rt.gpuEngine != nil {
rt.gpuEngine.Close()
rt.gpuEngine = nil
}
rt.useGPU = rt.cfg.UseGPUFFT
if rt.useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil {
rt.gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
rt.useGPU = false
}
} else {
gpuState.set(false, nil)
rt.useGPU = false
}
} else if gpuState != nil {
gpuState.set(false, nil)
}
}
}

func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
available := rt.cfg.FFTSize
st := srcMgr.Stats()
if st.BufferSamples > rt.cfg.FFTSize {
available = (st.BufferSamples / rt.cfg.FFTSize) * rt.cfg.FFTSize
if available < rt.cfg.FFTSize {
available = rt.cfg.FFTSize
}
}
allIQ, err := srcMgr.ReadIQ(available)
if err != nil {
return nil, err
}
if rec != nil {
rec.Ingest(time.Now(), allIQ)
}
iq := allIQ
if len(allIQ) > rt.cfg.FFTSize {
iq = allIQ[len(allIQ)-rt.cfg.FFTSize:]
}
if rt.dcEnabled {
dcBlocker.Apply(iq)
}
if rt.iqEnabled {
dsp.IQBalance(iq)
}
var spectrum []float64
if rt.useGPU && rt.gpuEngine != nil {
gpuBuf := make([]complex64, len(iq))
if len(rt.window) == len(iq) {
for i := 0; i < len(iq); i++ {
v := iq[i]
w := float32(rt.window[i])
gpuBuf[i] = complex(real(v)*w, imag(v)*w)
}
} else {
copy(gpuBuf, iq)
}
out, err := rt.gpuEngine.Exec(gpuBuf)
if err != nil {
if gpuState != nil {
gpuState.set(false, err)
}
rt.useGPU = false
spectrum = fftutil.SpectrumWithPlan(gpuBuf, nil, rt.plan)
} else {
spectrum = fftutil.SpectrumFromFFT(out)
}
} else {
spectrum = fftutil.SpectrumWithPlan(iq, rt.window, rt.plan)
}
for i := range spectrum {
if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
spectrum[i] = -200
}
}
now := time.Now()
finished, detected := rt.det.Process(now, spectrum, rt.cfg.CenterHz)
return &spectrumArtifacts{
allIQ: allIQ,
iq: iq,
spectrum: spectrum,
finished: finished,
detected: detected,
thresholds: rt.det.LastThresholds(),
noiseFloor: rt.det.LastNoiseFloor(),
now: now,
}, nil
}

func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, extractMgr *extractionManager, rec *recorder.Manager) []detector.Signal {
if art == nil || len(art.iq) == 0 {
return nil
}
policy := pipeline.PolicyFromConfig(rt.cfg)
candidates := pipeline.CandidatesFromSignals(art.detected, "surveillance-detector")
scheduled := pipeline.ScheduleCandidates(candidates, policy)
selected := make([]detector.Signal, 0, len(scheduled))
for _, sc := range scheduled {
selected = append(selected, detector.Signal{
ID: sc.Candidate.ID,
FirstBin: sc.Candidate.FirstBin,
LastBin: sc.Candidate.LastBin,
CenterHz: sc.Candidate.CenterHz,
BWHz: sc.Candidate.BandwidthHz,
PeakDb: sc.Candidate.PeakDb,
SNRDb: sc.Candidate.SNRDb,
NoiseDb: sc.Candidate.NoiseDb,
})
}
snips, snipRates := extractSignalIQBatch(extractMgr, art.iq, rt.cfg.SampleRate, rt.cfg.CenterHz, selected)
refined := pipeline.RefineCandidates(pipeline.CandidatesFromSignals(selected, "scheduled-candidate"), art.spectrum, rt.cfg.SampleRate, rt.cfg.FFTSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
signals := make([]detector.Signal, 0, len(refined))
for i, ref := range refined {
sig := ref.Signal
signals = append(signals, sig)
cls := sig.Class
snipRate := ref.SnippetRate
if cls != nil {
pll := classifier.PLLResult{}
if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
cls.PLL = &pll
signals[i].PLL = &pll
if cls.ModType == classifier.ClassWFM && pll.Stereo {
cls.ModType = classifier.ClassWFMStereo
}
}
if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
rt.updateRDS(art.now, rec, &signals[i], cls)
}
}
}
rt.det.UpdateClasses(signals)
return signals
}

func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) {
if sig == nil || cls == nil {
return
}
keyHz := sig.CenterHz
if sig.PLL != nil && sig.PLL.ExactHz != 0 {
keyHz = sig.PLL.ExactHz
}
key := int64(math.Round(keyHz / 25000.0))
st := rt.rdsMap[key]
if st == nil {
st = &rdsState{}
rt.rdsMap[key] = st
}
if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
st.lastDecode = now
atomic.StoreInt32(&st.busy, 1)
go func(st *rdsState, sigHz float64) {
defer atomic.StoreInt32(&st.busy, 0)
ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
if len(ringIQ) < ringSR || ringSR <= 0 {
return
}
offset := sigHz - ringCenter
shifted := dsp.FreqShift(ringIQ, ringSR, offset)
decim1 := ringSR / 1000000
if decim1 < 1 {
decim1 = 1
}
lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
f1 := dsp.ApplyFIR(shifted, lp1)
d1 := dsp.Decimate(f1, decim1)
rate1 := ringSR / decim1
decim2 := rate1 / 250000
if decim2 < 1 {
decim2 = 1
}
lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
f2 := dsp.ApplyFIR(d1, lp2)
decimated := dsp.Decimate(f2, decim2)
actualRate := rate1 / decim2
rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
if len(rdsBase.Samples) == 0 {
return
}
st.mu.Lock()
result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
if result.PS != "" {
st.result = result
}
st.mu.Unlock()
}(st, sig.CenterHz)
}
st.mu.Lock()
ps := st.result.PS
st.mu.Unlock()
if ps != "" && sig.PLL != nil {
sig.PLL.RDSStation = strings.TrimSpace(ps)
cls.PLL = sig.PLL
}
}

func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) {
if len(rt.rdsMap) > 0 {
activeIDs := make(map[int64]bool, len(displaySignals))
for _, s := range displaySignals {
keyHz := s.CenterHz
if s.PLL != nil && s.PLL.ExactHz != 0 {
keyHz = s.PLL.ExactHz
}
activeIDs[int64(math.Round(keyHz/25000.0))] = true
}
for id := range rt.rdsMap {
if !activeIDs[id] {
delete(rt.rdsMap, id)
}
}
}
if len(rt.streamPhaseState) > 0 {
sigIDs := make(map[int64]bool, len(displaySignals))
for _, s := range displaySignals {
sigIDs[s.ID] = true
}
for id := range rt.streamPhaseState {
if !sigIDs[id] {
delete(rt.streamPhaseState, id)
}
}
}
if rec != nil && len(displaySignals) > 0 {
aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
_ = aqCfg
}
}

+ 44
- 0
cmd/sdrd/pipeline_runtime_test.go 查看文件

@@ -0,0 +1,44 @@
package main

import (
"testing"

"sdr-wideband-suite/internal/config"
"sdr-wideband-suite/internal/detector"
fftutil "sdr-wideband-suite/internal/fft"
"sdr-wideband-suite/internal/pipeline"
)

func TestNewDSPRuntime(t *testing.T) {
cfg := config.Default()
det := detector.New(cfg.Detector, cfg.SampleRate, cfg.FFTSize)
window := fftutil.Hann(cfg.FFTSize)
rt := newDSPRuntime(cfg, det, window, &gpuStatus{})
if rt == nil {
t.Fatalf("runtime is nil")
}
if rt.plan == nil {
t.Fatalf("fft plan is nil")
}
if rt.cfg.FFTSize != cfg.FFTSize {
t.Fatalf("unexpected fft size: %d", rt.cfg.FFTSize)
}
}

func TestScheduledCandidateSelectionUsesPolicy(t *testing.T) {
cfg := config.Default()
cfg.Resources.MaxRefinementJobs = 1
cfg.Refinement.MinCandidateSNRDb = 6
policy := pipeline.PolicyFromConfig(cfg)
got := pipeline.ScheduleCandidates([]pipeline.Candidate{
{ID: 1, SNRDb: 3, BandwidthHz: 1000},
{ID: 2, SNRDb: 12, BandwidthHz: 5000},
{ID: 3, SNRDb: 8, BandwidthHz: 7000},
}, policy)
if len(got) != 1 {
t.Fatalf("expected 1 scheduled candidate, got %d", len(got))
}
if got[0].Candidate.ID != 2 {
t.Fatalf("expected highest priority candidate, got %d", got[0].Candidate.ID)
}
}

+ 49
- 0
internal/pipeline/scheduler.go 查看文件

@@ -0,0 +1,49 @@
package pipeline

import "sort"

type ScheduledCandidate struct {
Candidate Candidate `json:"candidate"`
Priority float64 `json:"priority"`
}

// ScheduleCandidates picks the most valuable candidates for costly local refinement.
// Current heuristic is intentionally simple and deterministic; later phases can add
// richer scoring (novelty, persistence, profile-aware band priorities, decoder value).
func ScheduleCandidates(candidates []Candidate, policy Policy) []ScheduledCandidate {
if len(candidates) == 0 {
return nil
}
out := make([]ScheduledCandidate, 0, len(candidates))
for _, c := range candidates {
if c.SNRDb < policy.MinCandidateSNRDb {
continue
}
priority := c.SNRDb
if c.BandwidthHz > 0 {
priority += minFloat64(c.BandwidthHz/25000.0, 6)
}
if c.PeakDb > 0 {
priority += c.PeakDb / 20.0
}
out = append(out, ScheduledCandidate{Candidate: c, Priority: priority})
}
sort.Slice(out, func(i, j int) bool {
if out[i].Priority == out[j].Priority {
return out[i].Candidate.CenterHz < out[j].Candidate.CenterHz
}
return out[i].Priority > out[j].Priority
})
limit := policy.MaxRefinementJobs
if limit <= 0 || limit > len(out) {
limit = len(out)
}
return out[:limit]
}

func minFloat64(a, b float64) float64 {
if a < b {
return a
}
return b
}

+ 23
- 0
internal/pipeline/scheduler_test.go 查看文件

@@ -0,0 +1,23 @@
package pipeline

import "testing"

func TestScheduleCandidates(t *testing.T) {
policy := Policy{MaxRefinementJobs: 2, MinCandidateSNRDb: 5}
cands := []Candidate{
{ID: 1, CenterHz: 100, SNRDb: 4, BandwidthHz: 10000, PeakDb: 1},
{ID: 2, CenterHz: 200, SNRDb: 12, BandwidthHz: 50000, PeakDb: 3},
{ID: 3, CenterHz: 300, SNRDb: 10, BandwidthHz: 25000, PeakDb: 2},
{ID: 4, CenterHz: 400, SNRDb: 20, BandwidthHz: 100000, PeakDb: 5},
}
got := ScheduleCandidates(cands, policy)
if len(got) != 2 {
t.Fatalf("expected 2 scheduled candidates, got %d", len(got))
}
if got[0].Candidate.ID != 4 {
t.Fatalf("expected strongest candidate first, got id=%d", got[0].Candidate.ID)
}
if got[1].Candidate.ID != 2 {
t.Fatalf("expected next strongest candidate second, got id=%d", got[1].Candidate.ID)
}
}

Loading…
取消
儲存