package main

import (
	"math"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"sdr-wideband-suite/internal/classifier"
	"sdr-wideband-suite/internal/config"
	"sdr-wideband-suite/internal/demod"
	"sdr-wideband-suite/internal/detector"
	"sdr-wideband-suite/internal/dsp"
	fftutil "sdr-wideband-suite/internal/fft"
	"sdr-wideband-suite/internal/fft/gpufft"
	"sdr-wideband-suite/internal/pipeline"
	"sdr-wideband-suite/internal/rds"
	"sdr-wideband-suite/internal/recorder"
)

// rdsState holds the RDS decoding state kept for one entry of
// dspRuntime.rdsMap.
type rdsState struct {
	dec        rds.Decoder // decoder instance; only used while mu is held
	result     rds.Result  // last decode result that carried a non-empty PS; guarded by mu
	lastDecode time.Time   // time the most recent background decode was started
	busy       int32       // 1 while a decode goroutine is running; accessed via sync/atomic
	mu         sync.Mutex  // guards dec and result
}

// dspRuntime bundles the configuration and processing state used by the
// capture / refine / maintenance methods in this file.
type dspRuntime struct {
	cfg       config.Config
	det       *detector.Detector
	window    []float64          // FFT window coefficients
	plan      *fftutil.CmplxPlan // CPU FFT plan sized to cfg.FFTSize
	dcEnabled bool               // apply the DC blocker before the FFT
	iqEnabled bool               // apply IQ balancing before the FFT
	useGPU    bool               // effective GPU flag; cleared when the GPU path fails
	gpuEngine *gpufft.Engine     // nil unless a GPU engine was created successfully

	rdsMap           map[int64]*rdsState           // keyed by rdsKeyForSignal
	streamPhaseState map[int64]*streamExtractState // keyed by signal ID
	streamOverlap    *streamIQOverlap
	gotSamples       bool
}

// spectrumArtifacts is the result of one captureSpectrum call.
type spectrumArtifacts struct {
	allIQ      []complex64 // everything read from the source for this frame
	iq         []complex64 // the trailing cfg.FFTSize samples of allIQ (all of it if shorter)
	spectrum   []float64   // spectrum of iq with NaN/Inf bins replaced by -200
	finished   []detector.Event
	detected   []detector.Signal
	thresholds []float64
	noiseFloor float64
	now        time.Time // timestamp handed to the detector
}

// newDSPRuntime builds a runtime from cfg, det and window.
//
// When cfg.UseGPUFFT is set, gpuState is non-nil and its snapshot reports the
// GPU as available, a GPU FFT engine is created. If creation fails the error is
// recorded in gpuState and the runtime falls back to the CPU path.
func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
	rt := &dspRuntime{
		cfg:              cfg,
		det:              det,
		window:           window,
		plan:             fftutil.NewCmplxPlan(cfg.FFTSize),
		dcEnabled:        cfg.DCBlock,
		iqEnabled:        cfg.IQBalance,
		useGPU:           cfg.UseGPUFFT,
		rdsMap:           map[int64]*rdsState{},
		streamPhaseState: map[int64]*streamExtractState{},
		streamOverlap:    &streamIQOverlap{},
	}
	if rt.useGPU && gpuState != nil {
		snap := gpuState.snapshot()
		// Note: when the GPU is not available, useGPU stays true with a nil
		// engine; captureSpectrum checks both before taking the GPU path.
		if snap.Available {
			if eng, err := gpufft.New(cfg.FFTSize); err == nil {
				rt.gpuEngine = eng
				gpuState.set(true, nil)
			} else {
				gpuState.set(false, err)
				rt.useGPU = false
			}
		}
	}
	return rt
}

// applyUpdate installs a new configuration.
//
// The recorder (if any) receives the new sample rate, FFT size, recording
// policy, centre frequency and decoder map. A detector or window supplied in
// upd replaces the current one; a new window also rebuilds the CPU FFT plan.
// When the FFT size changed, or the requested GPU setting differs from the
// currently effective one, the source is flushed and the GPU engine is torn
// down and, if requested and available, recreated.
func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
	prevFFT := rt.cfg.FFTSize
	// NOTE(review): this is the *effective* flag, not the previously requested
	// one. After a GPU failure (useGPU == false while the config still asks for
	// the GPU) every update takes the re-init branch below, flushing the source
	// and retrying engine creation. Confirm that retry-on-every-update is the
	// intended behaviour.
	prevUseGPU := rt.useGPU
	rt.cfg = upd.cfg

	if rec != nil {
		rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
			Enabled:          rt.cfg.Recorder.Enabled,
			MinSNRDb:         rt.cfg.Recorder.MinSNRDb,
			MinDuration:      mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second),
			MaxDuration:      mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second),
			PrerollMs:        rt.cfg.Recorder.PrerollMs,
			RecordIQ:         rt.cfg.Recorder.RecordIQ,
			RecordAudio:      rt.cfg.Recorder.RecordAudio,
			AutoDemod:        rt.cfg.Recorder.AutoDemod,
			AutoDecode:       rt.cfg.Recorder.AutoDecode,
			MaxDiskMB:        rt.cfg.Recorder.MaxDiskMB,
			OutputDir:        rt.cfg.Recorder.OutputDir,
			ClassFilter:      rt.cfg.Recorder.ClassFilter,
			RingSeconds:      rt.cfg.Recorder.RingSeconds,
			DeemphasisUs:     rt.cfg.Recorder.DeemphasisUs,
			ExtractionTaps:   rt.cfg.Recorder.ExtractionTaps,
			ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult,
		}, rt.cfg.CenterHz, buildDecoderMap(rt.cfg))
	}

	if upd.det != nil {
		rt.det = upd.det
	}
	if upd.window != nil {
		rt.window = upd.window
		rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
	}
	rt.dcEnabled = upd.dcBlock
	rt.iqEnabled = upd.iqBalance

	if rt.cfg.FFTSize == prevFFT && rt.cfg.UseGPUFFT == prevUseGPU {
		return
	}

	// FFT geometry or GPU selection changed: drop buffered samples and rebuild
	// the GPU engine from scratch.
	srcMgr.Flush()
	rt.gotSamples = false
	if rt.gpuEngine != nil {
		rt.gpuEngine.Close()
		rt.gpuEngine = nil
	}
	rt.useGPU = rt.cfg.UseGPUFFT

	if rt.useGPU && gpuState != nil {
		snap := gpuState.snapshot()
		if snap.Available {
			if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil {
				rt.gpuEngine = eng
				gpuState.set(true, nil)
			} else {
				gpuState.set(false, err)
				rt.useGPU = false
			}
		} else {
			gpuState.set(false, nil)
			rt.useGPU = false
		}
	} else if gpuState != nil {
		gpuState.set(false, nil)
	}
}

// captureSpectrum reads one frame of IQ from srcMgr, computes its spectrum and
// runs the detector over it.
//
// All samples read are handed to the recorder (when rec is non-nil); only the
// trailing cfg.FFTSize samples are conditioned (DC block, IQ balance) and
// transformed. A GPU execution error is recorded in gpuState, disables the GPU
// path and falls back to the CPU plan for this frame. The read error from the
// source is returned unchanged.
func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
	available := rt.cfg.FFTSize
	st := srcMgr.Stats()
	if st.BufferSamples > rt.cfg.FFTSize {
		// Drain the backlog in whole FFT-sized multiples. Integer division
		// guarantees the result is at least one FFTSize here, so no further
		// lower-bound clamp is needed.
		available = (st.BufferSamples / rt.cfg.FFTSize) * rt.cfg.FFTSize
	}

	allIQ, err := srcMgr.ReadIQ(available)
	if err != nil {
		return nil, err
	}
	if rec != nil {
		rec.Ingest(time.Now(), allIQ)
	}

	// Only the newest FFTSize samples feed the spectrum. iq aliases the tail
	// of allIQ, so the in-place conditioning below is visible through allIQ.
	iq := allIQ
	if len(allIQ) > rt.cfg.FFTSize {
		iq = allIQ[len(allIQ)-rt.cfg.FFTSize:]
	}
	if rt.dcEnabled {
		dcBlocker.Apply(iq)
	}
	if rt.iqEnabled {
		dsp.IQBalance(iq)
	}

	var spectrum []float64
	if rt.useGPU && rt.gpuEngine != nil {
		// The GPU path gets a pre-windowed copy; when the window length does
		// not match, the samples are passed through unwindowed.
		gpuBuf := make([]complex64, len(iq))
		if len(rt.window) == len(iq) {
			for i := 0; i < len(iq); i++ {
				v := iq[i]
				w := float32(rt.window[i])
				gpuBuf[i] = complex(real(v)*w, imag(v)*w)
			}
		} else {
			copy(gpuBuf, iq)
		}
		out, err := rt.gpuEngine.Exec(gpuBuf)
		if err != nil {
			if gpuState != nil {
				gpuState.set(false, err)
			}
			rt.useGPU = false
			// gpuBuf already carries the window (if any), hence the nil window.
			spectrum = fftutil.SpectrumWithPlan(gpuBuf, nil, rt.plan)
		} else {
			spectrum = fftutil.SpectrumFromFFT(out)
		}
	} else {
		spectrum = fftutil.SpectrumWithPlan(iq, rt.window, rt.plan)
	}

	// Replace non-finite bins with a fixed floor value.
	for i := range spectrum {
		if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
			spectrum[i] = -200
		}
	}

	now := time.Now()
	finished, detected := rt.det.Process(now, spectrum, rt.cfg.CenterHz)
	return &spectrumArtifacts{
		allIQ:      allIQ,
		iq:         iq,
		spectrum:   spectrum,
		finished:   finished,
		detected:   detected,
		thresholds: rt.det.LastThresholds(),
		noiseFloor: rt.det.LastNoiseFloor(),
		now:        now,
	}, nil
}

// refineSignals schedules the detections in art, extracts an IQ snippet per
// scheduled candidate, refines/classifies them and returns the refined signals.
//
// For classified signals with a snippet longer than 256 samples an exact
// frequency estimate is attached (and WFM is promoted to WFM-stereo when the
// estimate reports stereo). WFM signals additionally get an RDS update when a
// recorder is present. The detector is told about the resulting classes.
// Returns nil when art is nil or carries no IQ.
func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, extractMgr *extractionManager, rec *recorder.Manager) []detector.Signal {
	if art == nil || len(art.iq) == 0 {
		return nil
	}

	policy := pipeline.PolicyFromConfig(rt.cfg)
	candidates := pipeline.CandidatesFromSignals(art.detected, "surveillance-detector")
	scheduled := pipeline.ScheduleCandidates(candidates, policy)

	selectedCandidates := make([]pipeline.Candidate, 0, len(scheduled))
	selectedSignals := make([]detector.Signal, 0, len(scheduled))
	for _, sc := range scheduled {
		selectedCandidates = append(selectedCandidates, sc.Candidate)
		selectedSignals = append(selectedSignals, detector.Signal{
			ID:       sc.Candidate.ID,
			FirstBin: sc.Candidate.FirstBin,
			LastBin:  sc.Candidate.LastBin,
			CenterHz: sc.Candidate.CenterHz,
			BWHz:     sc.Candidate.BandwidthHz,
			PeakDb:   sc.Candidate.PeakDb,
			SNRDb:    sc.Candidate.SNRDb,
			NoiseDb:  sc.Candidate.NoiseDb,
		})
	}

	snips, snipRates := extractSignalIQBatch(extractMgr, art.iq, rt.cfg.SampleRate, rt.cfg.CenterHz, selectedSignals)
	refined := pipeline.RefineCandidates(selectedCandidates, art.spectrum, rt.cfg.SampleRate, rt.cfg.FFTSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))

	signals := make([]detector.Signal, 0, len(refined))
	for i, ref := range refined {
		sig := ref.Signal
		signals = append(signals, sig)
		cls := sig.Class
		snipRate := ref.SnippetRate

		decision := pipeline.DecideSignalAction(policy, ref.Candidate, cls)
		if decision.ShouldAutoDecode && rec != nil {
			rt.cfg.Recorder.AutoDecode = true
		}
		if decision.ShouldRecord && rec != nil {
			rt.cfg.Recorder.Enabled = true
		}
		if cls == nil {
			continue
		}

		// len() of a nil snippet is 0, so the length test alone also rejects
		// missing snippets.
		if i < len(snips) && len(snips[i]) > 256 {
			pll := classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
			cls.PLL = &pll
			signals[i].PLL = &pll
			if cls.ModType == classifier.ClassWFM && pll.Stereo {
				cls.ModType = classifier.ClassWFMStereo
			}
		}
		if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
			rt.updateRDS(art.now, rec, &signals[i], cls)
		}
	}

	rt.det.UpdateClasses(signals)
	return signals
}

// rdsKeyForSignal maps a signal onto its rdsMap key: the signal frequency
// divided by 25 kHz and rounded to the nearest integer. The PLL's exact
// frequency is preferred over the detector centre when it is present and
// non-zero. sig must not be nil.
func rdsKeyForSignal(sig *detector.Signal) int64 {
	const rdsKeyStepHz = 25000.0

	keyHz := sig.CenterHz
	if sig.PLL != nil && sig.PLL.ExactHz != 0 {
		keyHz = sig.PLL.ExactHz
	}
	return int64(math.Round(keyHz / rdsKeyStepHz))
}

// updateRDS attaches the last decoded RDS station name (PS) to sig and cls and,
// at most once every four seconds per map entry, starts a background decode of
// the recorder's most recent four seconds of IQ.
//
// The background decode shifts the signal to baseband, low-pass filters and
// decimates in two stages (targets of 1 MHz and 250 kHz), extracts the RDS
// baseband and runs the decoder. Only results with a non-empty PS replace the
// stored result. The station name is written only when sig already carries a
// PLL result. A nil sig or cls is a no-op.
func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) {
	if sig == nil || cls == nil {
		return
	}

	key := rdsKeyForSignal(sig)
	st := rt.rdsMap[key]
	if st == nil {
		st = &rdsState{}
		rt.rdsMap[key] = st
	}

	if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
		st.lastDecode = now
		// busy is raised before the goroutine starts so that a second call
		// cannot launch a concurrent decode for the same entry.
		atomic.StoreInt32(&st.busy, 1)
		go func(st *rdsState, sigHz float64) {
			defer atomic.StoreInt32(&st.busy, 0)

			ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
			// Require at least ringSR samples in the ring before decoding.
			if len(ringIQ) < ringSR || ringSR <= 0 {
				return
			}

			offset := sigHz - ringCenter
			shifted := dsp.FreqShift(ringIQ, ringSR, offset)

			// Stage 1: decimate towards 1 MHz.
			decim1 := ringSR / 1000000
			if decim1 < 1 {
				decim1 = 1
			}
			lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
			f1 := dsp.ApplyFIR(shifted, lp1)
			d1 := dsp.Decimate(f1, decim1)
			rate1 := ringSR / decim1

			// Stage 2: decimate towards 250 kHz.
			decim2 := rate1 / 250000
			if decim2 < 1 {
				decim2 = 1
			}
			lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
			f2 := dsp.ApplyFIR(d1, lp2)
			decimated := dsp.Decimate(f2, decim2)
			actualRate := rate1 / decim2

			rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
			if len(rdsBase.Samples) == 0 {
				return
			}

			st.mu.Lock()
			result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
			if result.PS != "" {
				st.result = result
			}
			st.mu.Unlock()
		}(st, sig.CenterHz)
	}

	st.mu.Lock()
	ps := st.result.PS
	st.mu.Unlock()

	if ps != "" && sig.PLL != nil {
		sig.PLL.RDSStation = strings.TrimSpace(ps)
		cls.PLL = sig.PLL
	}
}

// maintenance drops per-signal state that no longer corresponds to any of
// displaySignals: rdsMap entries are matched by rdsKeyForSignal,
// streamPhaseState entries by signal ID.
//
// rec is accepted for interface compatibility and is not used.
func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) {
	if len(rt.rdsMap) > 0 {
		activeKeys := make(map[int64]struct{}, len(displaySignals))
		for i := range displaySignals {
			activeKeys[rdsKeyForSignal(&displaySignals[i])] = struct{}{}
		}
		for key := range rt.rdsMap {
			if _, ok := activeKeys[key]; !ok {
				delete(rt.rdsMap, key)
			}
		}
	}

	if len(rt.streamPhaseState) > 0 {
		activeIDs := make(map[int64]struct{}, len(displaySignals))
		for i := range displaySignals {
			activeIDs[displaySignals[i].ID] = struct{}{}
		}
		for id := range rt.streamPhaseState {
			if _, ok := activeIDs[id]; !ok {
				delete(rt.streamPhaseState, id)
			}
		}
	}
}