diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index 6264bbb..7696692 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -4,24 +4,16 @@ import ( "context" "encoding/json" "log" - "math" "os" "runtime/debug" "strings" "sync" - "sync/atomic" "time" - "sdr-wideband-suite/internal/classifier" "sdr-wideband-suite/internal/config" - "sdr-wideband-suite/internal/demod" "sdr-wideband-suite/internal/detector" "sdr-wideband-suite/internal/dsp" - fftutil "sdr-wideband-suite/internal/fft" - "sdr-wideband-suite/internal/fft/gpufft" - "sdr-wideband-suite/internal/rds" "sdr-wideband-suite/internal/recorder" - "sdr-wideband-suite/internal/pipeline" ) func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot, extractMgr *extractionManager) { @@ -30,49 +22,13 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack()) } }() + rt := newDSPRuntime(cfg, det, window, gpuState) ticker := time.NewTicker(cfg.FrameInterval()) defer ticker.Stop() logTicker := time.NewTicker(5 * time.Second) defer logTicker.Stop() enc := json.NewEncoder(eventFile) dcBlocker := dsp.NewDCBlocker(0.995) - dcEnabled := cfg.DCBlock - iqEnabled := cfg.IQBalance - plan := fftutil.NewCmplxPlan(cfg.FFTSize) - useGPU := cfg.UseGPUFFT - - // Persistent RDS decoders per signal — async ring-buffer based - type rdsState struct { - dec rds.Decoder - result rds.Result - lastDecode time.Time - busy int32 // atomic: 1 = goroutine running - mu sync.Mutex - } - rdsMap := map[int64]*rdsState{} - // Streaming extraction state: per-signal phase + IQ overlap for FIR halo - streamPhaseState := map[int64]*streamExtractState{} - streamOverlap := &streamIQOverlap{} - var gpuEngine *gpufft.Engine - if useGPU && gpuState != nil { - snap := gpuState.snapshot() - if snap.Available { - if eng, err := gpufft.New(cfg.FFTSize); err == nil { - gpuEngine = eng - gpuState.set(true, nil) - } else { - gpuState.set(false, err) - useGPU = false - } - } else { - gpuState.set(false, nil) - useGPU = false - } - } else if gpuState != nil { - gpuState.set(false, nil) - } - - gotSamples := false for { select { case <-ctx.Done(): @@ -81,290 +37,33 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * st := srcMgr.Stats() log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs) case upd := <-updates: - prevFFT := cfg.FFTSize - prevUseGPU := useGPU - cfg = upd.cfg - if rec != nil { - rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{ - Enabled: cfg.Recorder.Enabled, - MinSNRDb: cfg.Recorder.MinSNRDb, - MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second), - MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second), - PrerollMs: cfg.Recorder.PrerollMs, - RecordIQ: cfg.Recorder.RecordIQ, - RecordAudio: cfg.Recorder.RecordAudio, - AutoDemod: cfg.Recorder.AutoDemod, - AutoDecode: cfg.Recorder.AutoDecode, - MaxDiskMB: cfg.Recorder.MaxDiskMB, - OutputDir: cfg.Recorder.OutputDir, - ClassFilter: cfg.Recorder.ClassFilter, - RingSeconds: cfg.Recorder.RingSeconds, - DeemphasisUs: cfg.Recorder.DeemphasisUs, - ExtractionTaps: cfg.Recorder.ExtractionTaps, - ExtractionBwMult: cfg.Recorder.ExtractionBwMult, - }, cfg.CenterHz, buildDecoderMap(cfg)) - } - if upd.det != nil { - det = upd.det - } - if upd.window != nil { - window = upd.window - plan = fftutil.NewCmplxPlan(cfg.FFTSize) - } - dcEnabled = upd.dcBlock - iqEnabled = upd.iqBalance - if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU { - srcMgr.Flush() - gotSamples = false - if gpuEngine != nil { - gpuEngine.Close() - gpuEngine = nil - } - useGPU = cfg.UseGPUFFT - if useGPU && gpuState != nil { - snap := gpuState.snapshot() - if snap.Available { - if eng, err := gpufft.New(cfg.FFTSize); err == nil { - gpuEngine = eng - gpuState.set(true, nil) - } else { - gpuState.set(false, err) - useGPU = false - } - } else { - gpuState.set(false, nil) - useGPU = false - } - } else if gpuState != nil { - gpuState.set(false, nil) - } - } + rt.applyUpdate(upd, srcMgr, rec, gpuState) dcBlocker.Reset() - ticker.Reset(cfg.FrameInterval()) + ticker.Reset(rt.cfg.FrameInterval()) case <-ticker.C: - // Pipeline phases: - // 1) ingest IQ - // 2) build surveillance spectrum - // 3) detect coarse candidates - // 4) refine locally with IQ snippets + classification - // 5) update tracking / events / recorder / presentation - // Read all available IQ data — not just one FFT block. - // This ensures the ring buffer captures 100% of IQ for recording/demod. - available := cfg.FFTSize - st := srcMgr.Stats() - if st.BufferSamples > cfg.FFTSize { - // Round down to multiple of FFTSize for clean processing - available = (st.BufferSamples / cfg.FFTSize) * cfg.FFTSize - if available < cfg.FFTSize { - available = cfg.FFTSize - } - } - allIQ, err := srcMgr.ReadIQ(available) + art, err := rt.captureSpectrum(srcMgr, rec, dcBlocker, gpuState) if err != nil { log.Printf("read IQ: %v", err) if strings.Contains(err.Error(), "timeout") { - if err := srcMgr.Restart(cfg); err != nil { + if err := srcMgr.Restart(rt.cfg); err != nil { log.Printf("restart failed: %v", err) } } continue } - // Ingest ALL IQ data into the ring buffer for recording - if rec != nil { - rec.Ingest(time.Now(), allIQ) - } - // Use only the last FFT block for spectrum display - iq := allIQ - if len(allIQ) > cfg.FFTSize { - iq = allIQ[len(allIQ)-cfg.FFTSize:] - } - if !gotSamples { + if !rt.gotSamples { log.Printf("received IQ samples") - gotSamples = true - } - if dcEnabled { - dcBlocker.Apply(iq) - } - if iqEnabled { - dsp.IQBalance(iq) - } - var spectrum []float64 - if useGPU && gpuEngine != nil { - // GPU FFT: apply window to a COPY — allIQ must stay unmodified - // for extractForStreaming which needs raw IQ for signal extraction. - gpuBuf := make([]complex64, len(iq)) - if len(window) == len(iq) { - for i := 0; i < len(iq); i++ { - v := iq[i] - w := float32(window[i]) - gpuBuf[i] = complex(real(v)*w, imag(v)*w) - } - } else { - copy(gpuBuf, iq) - } - out, err := gpuEngine.Exec(gpuBuf) - if err != nil { - if gpuState != nil { - gpuState.set(false, err) - } - useGPU = false - spectrum = fftutil.SpectrumWithPlan(gpuBuf, nil, plan) - } else { - spectrum = fftutil.SpectrumFromFFT(out) - } - } else { - spectrum = fftutil.SpectrumWithPlan(iq, window, plan) + rt.gotSamples = true } - for i := range spectrum { - if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) { - spectrum[i] = -200 - } - } - now := time.Now() - finished, detectedSignals := det.Process(now, spectrum, cfg.CenterHz) - candidates := pipeline.CandidatesFromSignals(detectedSignals, "surveillance-detector") - thresholds := det.LastThresholds() - noiseFloor := det.LastNoiseFloor() + finished := art.finished + thresholds := art.thresholds + noiseFloor := art.noiseFloor var displaySignals []detector.Signal - if len(iq) > 0 { - snips, snipRates := extractSignalIQBatch(extractMgr, iq, cfg.SampleRate, cfg.CenterHz, detectedSignals) - refined := pipeline.RefineCandidates(candidates, spectrum, cfg.SampleRate, cfg.FFTSize, snips, snipRates, classifier.ClassifierMode(cfg.ClassifierMode)) - signals := make([]detector.Signal, 0, len(refined)) - for i, ref := range refined { - sig := ref.Signal - signals = append(signals, sig) - cls := sig.Class - snipRate := ref.SnippetRate - if cls != nil { - pll := classifier.PLLResult{} - if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 { - pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType) - cls.PLL = &pll - signals[i].PLL = &pll - if cls.ModType == classifier.ClassWFM && pll.Stereo { - cls.ModType = classifier.ClassWFMStereo - } - } - // RDS decode for WFM — async, uses ring buffer for continuous IQ - if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil { - keyHz := pll.ExactHz - if keyHz == 0 { - keyHz = signals[i].CenterHz - } - key := int64(math.Round(keyHz / 25000.0)) - st := rdsMap[key] - if st == nil { - st = &rdsState{} - rdsMap[key] = st - } - // Launch async decode every 4 seconds, skip if previous still running - if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 { - st.lastDecode = now - atomic.StoreInt32(&st.busy, 1) - go func(st *rdsState, sigHz float64) { - defer atomic.StoreInt32(&st.busy, 0) - ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0) - if len(ringIQ) < ringSR || ringSR <= 0 { - return - } - // Shift FM station to center - offset := sigHz - ringCenter - shifted := dsp.FreqShift(ringIQ, ringSR, offset) - - // Two-stage decimation to ~250kHz with proper anti-alias - // Stage 1: 4MHz → 1MHz (decim 4), LP at 400kHz - decim1 := ringSR / 1000000 - if decim1 < 1 { - decim1 = 1 - } - lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51) - f1 := dsp.ApplyFIR(shifted, lp1) - d1 := dsp.Decimate(f1, decim1) - rate1 := ringSR / decim1 - - // Stage 2: 1MHz → 250kHz (decim 4), LP at 100kHz - decim2 := rate1 / 250000 - if decim2 < 1 { - decim2 = 1 - } - lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101) - f2 := dsp.ApplyFIR(d1, lp2) - decimated := dsp.Decimate(f2, decim2) - actualRate := rate1 / decim2 - - // RDS baseband extraction on the clean decimated block - rdsBase := demod.RDSBasebandComplex(decimated, actualRate) - if len(rdsBase.Samples) == 0 { - return - } - st.mu.Lock() - result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate) - diag := st.dec.LastDiag - if result.PS != "" { - st.result = result - } - st.mu.Unlock() - log.Printf("RDS TRACE: ring decode freq=%.1fMHz decIQ=%d decSR=%d bbLen=%d bbRate=%d PI=%04X PS=%q %s", - sigHz/1e6, len(decimated), actualRate, len(rdsBase.Samples), rdsBase.SampleRate, - result.PI, result.PS, diag) - if result.PS != "" { - log.Printf("RDS decoded: PI=%04X PS=%q RT=%q freq=%.1fMHz", result.PI, result.PS, result.RT, sigHz/1e6) - } - }(st, signals[i].CenterHz) - } - // Read last known result (lock-free for display) - st.mu.Lock() - ps := st.result.PS - st.mu.Unlock() - if ps != "" { - pll.RDSStation = strings.TrimSpace(ps) - cls.PLL = &pll - signals[i].PLL = &pll - } - } - } - } - det.UpdateClasses(signals) - - // Cleanup RDS accumulators for signals that no longer exist - if len(rdsMap) > 0 { - activeIDs := make(map[int64]bool, len(signals)) - for _, s := range signals { - keyHz := s.CenterHz - if s.PLL != nil && s.PLL.ExactHz != 0 { - keyHz = s.PLL.ExactHz - } - activeIDs[int64(math.Round(keyHz/25000.0))] = true - } - for id := range rdsMap { - if !activeIDs[id] { - delete(rdsMap, id) - } - } - } - - // Cleanup streamPhaseState for disappeared signals - if len(streamPhaseState) > 0 { - sigIDs := make(map[int64]bool, len(signals)) - for _, s := range signals { - sigIDs[s.ID] = true - } - for id := range streamPhaseState { - if !sigIDs[id] { - delete(streamPhaseState, id) - } - } - } - - // GPU-extract signal snippets with phase-continuous FreqShift and - // IQ overlap for FIR halo. Heavy work on GPU, only demod runs async. - displaySignals = det.StableSignals() - if rec != nil && len(displaySignals) > 0 && len(allIQ) > 0 { - aqCfg := extractionConfig{ - firTaps: cfg.Recorder.ExtractionTaps, - bwMult: cfg.Recorder.ExtractionBwMult, - } - streamSnips, streamRates := extractForStreaming(extractMgr, allIQ, cfg.SampleRate, cfg.CenterHz, displaySignals, streamPhaseState, streamOverlap, aqCfg) + if len(art.iq) > 0 { + displaySignals = rt.refineSignals(art, extractMgr, rec) + if rec != nil && len(displaySignals) > 0 && len(art.allIQ) > 0 { + aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult} + streamSnips, streamRates := extractForStreaming(extractMgr, art.allIQ, rt.cfg.SampleRate, rt.cfg.CenterHz, displaySignals, rt.streamPhaseState, rt.streamOverlap, aqCfg) items := make([]recorder.StreamFeedItem, 0, len(displaySignals)) for j, ds := range displaySignals { if ds.ID == 0 || ds.Class == nil { @@ -373,23 +72,19 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * if j >= len(streamSnips) || len(streamSnips[j]) == 0 { continue } - snipRate := cfg.SampleRate + snipRate := rt.cfg.SampleRate if j < len(streamRates) && streamRates[j] > 0 { snipRate = streamRates[j] } - items = append(items, recorder.StreamFeedItem{ - Signal: ds, - Snippet: streamSnips[j], - SnipRate: snipRate, - }) + items = append(items, recorder.StreamFeedItem{Signal: ds, Snippet: streamSnips[j], SnipRate: snipRate}) } if len(items) > 0 { rec.FeedSnippets(items) } } + rt.maintenance(displaySignals, rec) } else { - // No IQ data this frame — still need displaySignals for broadcast - displaySignals = det.StableSignals() + displaySignals = rt.det.StableSignals() } if sigSnap != nil { @@ -427,7 +122,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } debugInfo = &SpectrumDebug{Thresholds: thresholds, NoiseFloor: noiseFloor, Scores: scoreDebug} } - h.broadcast(SpectrumFrame{Timestamp: now.UnixMilli(), CenterHz: cfg.CenterHz, SampleHz: cfg.SampleRate, FFTSize: cfg.FFTSize, Spectrum: spectrum, Signals: displaySignals, Debug: debugInfo}) + h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.spectrum, Signals: displaySignals, Debug: debugInfo}) } } } diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go new file mode 100644 index 0000000..6c65d72 --- /dev/null +++ b/cmd/sdrd/pipeline_runtime.go @@ -0,0 +1,355 @@ +package main + +import ( + "math" + "strings" + "sync" + "sync/atomic" + "time" + + "sdr-wideband-suite/internal/classifier" + "sdr-wideband-suite/internal/config" + "sdr-wideband-suite/internal/demod" + "sdr-wideband-suite/internal/detector" + "sdr-wideband-suite/internal/dsp" + fftutil "sdr-wideband-suite/internal/fft" + "sdr-wideband-suite/internal/fft/gpufft" + "sdr-wideband-suite/internal/pipeline" + "sdr-wideband-suite/internal/rds" + "sdr-wideband-suite/internal/recorder" +) + +type rdsState struct { + dec rds.Decoder + result rds.Result + lastDecode time.Time + busy int32 + mu sync.Mutex +} + +type dspRuntime struct { + cfg config.Config + det *detector.Detector + window []float64 + plan *fftutil.CmplxPlan + dcEnabled bool + iqEnabled bool + useGPU bool + gpuEngine *gpufft.Engine + rdsMap map[int64]*rdsState + streamPhaseState map[int64]*streamExtractState + streamOverlap *streamIQOverlap + gotSamples bool +} + +type spectrumArtifacts struct { + allIQ []complex64 + iq []complex64 + spectrum []float64 + finished []detector.Event + detected []detector.Signal + thresholds []float64 + noiseFloor float64 + now time.Time +} + +func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime { + rt := &dspRuntime{ + cfg: cfg, + det: det, + window: window, + plan: fftutil.NewCmplxPlan(cfg.FFTSize), + dcEnabled: cfg.DCBlock, + iqEnabled: cfg.IQBalance, + useGPU: cfg.UseGPUFFT, + rdsMap: map[int64]*rdsState{}, + streamPhaseState: map[int64]*streamExtractState{}, + streamOverlap: &streamIQOverlap{}, + } + if rt.useGPU && gpuState != nil { + snap := gpuState.snapshot() + if snap.Available { + if eng, err := gpufft.New(cfg.FFTSize); err == nil { + rt.gpuEngine = eng + gpuState.set(true, nil) + } else { + gpuState.set(false, err) + rt.useGPU = false + } + } + } + return rt +} + +func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) { + prevFFT := rt.cfg.FFTSize + prevUseGPU := rt.useGPU + rt.cfg = upd.cfg + if rec != nil { + rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{ + Enabled: rt.cfg.Recorder.Enabled, + MinSNRDb: rt.cfg.Recorder.MinSNRDb, + MinDuration: mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second), + MaxDuration: mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second), + PrerollMs: rt.cfg.Recorder.PrerollMs, + RecordIQ: rt.cfg.Recorder.RecordIQ, + RecordAudio: rt.cfg.Recorder.RecordAudio, + AutoDemod: rt.cfg.Recorder.AutoDemod, + AutoDecode: rt.cfg.Recorder.AutoDecode, + MaxDiskMB: rt.cfg.Recorder.MaxDiskMB, + OutputDir: rt.cfg.Recorder.OutputDir, + ClassFilter: rt.cfg.Recorder.ClassFilter, + RingSeconds: rt.cfg.Recorder.RingSeconds, + DeemphasisUs: rt.cfg.Recorder.DeemphasisUs, + ExtractionTaps: rt.cfg.Recorder.ExtractionTaps, + ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult, + }, rt.cfg.CenterHz, buildDecoderMap(rt.cfg)) + } + if upd.det != nil { + rt.det = upd.det + } + if upd.window != nil { + rt.window = upd.window + rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize) + } + rt.dcEnabled = upd.dcBlock + rt.iqEnabled = upd.iqBalance + if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU { + srcMgr.Flush() + rt.gotSamples = false + if rt.gpuEngine != nil { + rt.gpuEngine.Close() + rt.gpuEngine = nil + } + rt.useGPU = rt.cfg.UseGPUFFT + if rt.useGPU && gpuState != nil { + snap := gpuState.snapshot() + if snap.Available { + if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil { + rt.gpuEngine = eng + gpuState.set(true, nil) + } else { + gpuState.set(false, err) + rt.useGPU = false + } + } else { + gpuState.set(false, nil) + rt.useGPU = false + } + } else if gpuState != nil { + gpuState.set(false, nil) + } + } +} + +func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) { + available := rt.cfg.FFTSize + st := srcMgr.Stats() + if st.BufferSamples > rt.cfg.FFTSize { + available = (st.BufferSamples / rt.cfg.FFTSize) * rt.cfg.FFTSize + if available < rt.cfg.FFTSize { + available = rt.cfg.FFTSize + } + } + allIQ, err := srcMgr.ReadIQ(available) + if err != nil { + return nil, err + } + if rec != nil { + rec.Ingest(time.Now(), allIQ) + } + iq := allIQ + if len(allIQ) > rt.cfg.FFTSize { + iq = allIQ[len(allIQ)-rt.cfg.FFTSize:] + } + if rt.dcEnabled { + dcBlocker.Apply(iq) + } + if rt.iqEnabled { + dsp.IQBalance(iq) + } + var spectrum []float64 + if rt.useGPU && rt.gpuEngine != nil { + gpuBuf := make([]complex64, len(iq)) + if len(rt.window) == len(iq) { + for i := 0; i < len(iq); i++ { + v := iq[i] + w := float32(rt.window[i]) + gpuBuf[i] = complex(real(v)*w, imag(v)*w) + } + } else { + copy(gpuBuf, iq) + } + out, err := rt.gpuEngine.Exec(gpuBuf) + if err != nil { + if gpuState != nil { + gpuState.set(false, err) + } + rt.useGPU = false + spectrum = fftutil.SpectrumWithPlan(gpuBuf, nil, rt.plan) + } else { + spectrum = fftutil.SpectrumFromFFT(out) + } + } else { + spectrum = fftutil.SpectrumWithPlan(iq, rt.window, rt.plan) + } + for i := range spectrum { + if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) { + spectrum[i] = -200 + } + } + now := time.Now() + finished, detected := rt.det.Process(now, spectrum, rt.cfg.CenterHz) + return &spectrumArtifacts{ + allIQ: allIQ, + iq: iq, + spectrum: spectrum, + finished: finished, + detected: detected, + thresholds: rt.det.LastThresholds(), + noiseFloor: rt.det.LastNoiseFloor(), + now: now, + }, nil +} + +func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, extractMgr *extractionManager, rec *recorder.Manager) []detector.Signal { + if art == nil || len(art.iq) == 0 { + return nil + } + policy := pipeline.PolicyFromConfig(rt.cfg) + candidates := pipeline.CandidatesFromSignals(art.detected, "surveillance-detector") + scheduled := pipeline.ScheduleCandidates(candidates, policy) + selected := make([]detector.Signal, 0, len(scheduled)) + for _, sc := range scheduled { + selected = append(selected, detector.Signal{ + ID: sc.Candidate.ID, + FirstBin: sc.Candidate.FirstBin, + LastBin: sc.Candidate.LastBin, + CenterHz: sc.Candidate.CenterHz, + BWHz: sc.Candidate.BandwidthHz, + PeakDb: sc.Candidate.PeakDb, + SNRDb: sc.Candidate.SNRDb, + NoiseDb: sc.Candidate.NoiseDb, + }) + } + snips, snipRates := extractSignalIQBatch(extractMgr, art.iq, rt.cfg.SampleRate, rt.cfg.CenterHz, selected) + refined := pipeline.RefineCandidates(pipeline.CandidatesFromSignals(selected, "scheduled-candidate"), art.spectrum, rt.cfg.SampleRate, rt.cfg.FFTSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode)) + signals := make([]detector.Signal, 0, len(refined)) + for i, ref := range refined { + sig := ref.Signal + signals = append(signals, sig) + cls := sig.Class + snipRate := ref.SnippetRate + if cls != nil { + pll := classifier.PLLResult{} + if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 { + pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType) + cls.PLL = &pll + signals[i].PLL = &pll + if cls.ModType == classifier.ClassWFM && pll.Stereo { + cls.ModType = classifier.ClassWFMStereo + } + } + if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil { + rt.updateRDS(art.now, rec, &signals[i], cls) + } + } + } + rt.det.UpdateClasses(signals) + return signals +} + +func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) { + if sig == nil || cls == nil { + return + } + keyHz := sig.CenterHz + if sig.PLL != nil && sig.PLL.ExactHz != 0 { + keyHz = sig.PLL.ExactHz + } + key := int64(math.Round(keyHz / 25000.0)) + st := rt.rdsMap[key] + if st == nil { + st = &rdsState{} + rt.rdsMap[key] = st + } + if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 { + st.lastDecode = now + atomic.StoreInt32(&st.busy, 1) + go func(st *rdsState, sigHz float64) { + defer atomic.StoreInt32(&st.busy, 0) + ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0) + if len(ringIQ) < ringSR || ringSR <= 0 { + return + } + offset := sigHz - ringCenter + shifted := dsp.FreqShift(ringIQ, ringSR, offset) + decim1 := ringSR / 1000000 + if decim1 < 1 { + decim1 = 1 + } + lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51) + f1 := dsp.ApplyFIR(shifted, lp1) + d1 := dsp.Decimate(f1, decim1) + rate1 := ringSR / decim1 + decim2 := rate1 / 250000 + if decim2 < 1 { + decim2 = 1 + } + lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101) + f2 := dsp.ApplyFIR(d1, lp2) + decimated := dsp.Decimate(f2, decim2) + actualRate := rate1 / decim2 + rdsBase := demod.RDSBasebandComplex(decimated, actualRate) + if len(rdsBase.Samples) == 0 { + return + } + st.mu.Lock() + result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate) + if result.PS != "" { + st.result = result + } + st.mu.Unlock() + }(st, sig.CenterHz) + } + st.mu.Lock() + ps := st.result.PS + st.mu.Unlock() + if ps != "" && sig.PLL != nil { + sig.PLL.RDSStation = strings.TrimSpace(ps) + cls.PLL = sig.PLL + } +} + +func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) { + if len(rt.rdsMap) > 0 { + activeIDs := make(map[int64]bool, len(displaySignals)) + for _, s := range displaySignals { + keyHz := s.CenterHz + if s.PLL != nil && s.PLL.ExactHz != 0 { + keyHz = s.PLL.ExactHz + } + activeIDs[int64(math.Round(keyHz/25000.0))] = true + } + for id := range rt.rdsMap { + if !activeIDs[id] { + delete(rt.rdsMap, id) + } + } + } + if len(rt.streamPhaseState) > 0 { + sigIDs := make(map[int64]bool, len(displaySignals)) + for _, s := range displaySignals { + sigIDs[s.ID] = true + } + for id := range rt.streamPhaseState { + if !sigIDs[id] { + delete(rt.streamPhaseState, id) + } + } + } + if rec != nil && len(displaySignals) > 0 { + aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult} + _ = aqCfg + } +} diff --git a/cmd/sdrd/pipeline_runtime_test.go b/cmd/sdrd/pipeline_runtime_test.go new file mode 100644 index 0000000..8ff9b77 --- /dev/null +++ b/cmd/sdrd/pipeline_runtime_test.go @@ -0,0 +1,44 @@ +package main + +import ( + "testing" + + "sdr-wideband-suite/internal/config" + "sdr-wideband-suite/internal/detector" + fftutil "sdr-wideband-suite/internal/fft" + "sdr-wideband-suite/internal/pipeline" +) + +func TestNewDSPRuntime(t *testing.T) { + cfg := config.Default() + det := detector.New(cfg.Detector, cfg.SampleRate, cfg.FFTSize) + window := fftutil.Hann(cfg.FFTSize) + rt := newDSPRuntime(cfg, det, window, &gpuStatus{}) + if rt == nil { + t.Fatalf("runtime is nil") + } + if rt.plan == nil { + t.Fatalf("fft plan is nil") + } + if rt.cfg.FFTSize != cfg.FFTSize { + t.Fatalf("unexpected fft size: %d", rt.cfg.FFTSize) + } +} + +func TestScheduledCandidateSelectionUsesPolicy(t *testing.T) { + cfg := config.Default() + cfg.Resources.MaxRefinementJobs = 1 + cfg.Refinement.MinCandidateSNRDb = 6 + policy := pipeline.PolicyFromConfig(cfg) + got := pipeline.ScheduleCandidates([]pipeline.Candidate{ + {ID: 1, SNRDb: 3, BandwidthHz: 1000}, + {ID: 2, SNRDb: 12, BandwidthHz: 5000}, + {ID: 3, SNRDb: 8, BandwidthHz: 7000}, + }, policy) + if len(got) != 1 { + t.Fatalf("expected 1 scheduled candidate, got %d", len(got)) + } + if got[0].Candidate.ID != 2 { + t.Fatalf("expected highest priority candidate, got %d", got[0].Candidate.ID) + } +} diff --git a/internal/pipeline/scheduler.go b/internal/pipeline/scheduler.go new file mode 100644 index 0000000..57b3ad5 --- /dev/null +++ b/internal/pipeline/scheduler.go @@ -0,0 +1,49 @@ +package pipeline + +import "sort" + +type ScheduledCandidate struct { + Candidate Candidate `json:"candidate"` + Priority float64 `json:"priority"` +} + +// ScheduleCandidates picks the most valuable candidates for costly local refinement. +// Current heuristic is intentionally simple and deterministic; later phases can add +// richer scoring (novelty, persistence, profile-aware band priorities, decoder value). +func ScheduleCandidates(candidates []Candidate, policy Policy) []ScheduledCandidate { + if len(candidates) == 0 { + return nil + } + out := make([]ScheduledCandidate, 0, len(candidates)) + for _, c := range candidates { + if c.SNRDb < policy.MinCandidateSNRDb { + continue + } + priority := c.SNRDb + if c.BandwidthHz > 0 { + priority += minFloat64(c.BandwidthHz/25000.0, 6) + } + if c.PeakDb > 0 { + priority += c.PeakDb / 20.0 + } + out = append(out, ScheduledCandidate{Candidate: c, Priority: priority}) + } + sort.Slice(out, func(i, j int) bool { + if out[i].Priority == out[j].Priority { + return out[i].Candidate.CenterHz < out[j].Candidate.CenterHz + } + return out[i].Priority > out[j].Priority + }) + limit := policy.MaxRefinementJobs + if limit <= 0 || limit > len(out) { + limit = len(out) + } + return out[:limit] +} + +func minFloat64(a, b float64) float64 { + if a < b { + return a + } + return b +} diff --git a/internal/pipeline/scheduler_test.go b/internal/pipeline/scheduler_test.go new file mode 100644 index 0000000..7dc060a --- /dev/null +++ b/internal/pipeline/scheduler_test.go @@ -0,0 +1,23 @@ +package pipeline + +import "testing" + +func TestScheduleCandidates(t *testing.T) { + policy := Policy{MaxRefinementJobs: 2, MinCandidateSNRDb: 5} + cands := []Candidate{ + {ID: 1, CenterHz: 100, SNRDb: 4, BandwidthHz: 10000, PeakDb: 1}, + {ID: 2, CenterHz: 200, SNRDb: 12, BandwidthHz: 50000, PeakDb: 3}, + {ID: 3, CenterHz: 300, SNRDb: 10, BandwidthHz: 25000, PeakDb: 2}, + {ID: 4, CenterHz: 400, SNRDb: 20, BandwidthHz: 100000, PeakDb: 5}, + } + got := ScheduleCandidates(cands, policy) + if len(got) != 2 { + t.Fatalf("expected 2 scheduled candidates, got %d", len(got)) + } + if got[0].Candidate.ID != 4 { + t.Fatalf("expected strongest candidate first, got id=%d", got[0].Candidate.ID) + } + if got[1].Candidate.ID != 2 { + t.Fatalf("expected next strongest candidate second, got id=%d", got[1].Candidate.ID) + } +}