diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go new file mode 100644 index 0000000..76a5989 --- /dev/null +++ b/cmd/sdrd/dsp_loop.go @@ -0,0 +1,225 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "math" + "os" + "runtime/debug" + "strings" + "sync" + "time" + + "sdr-visual-suite/internal/classifier" + "sdr-visual-suite/internal/config" + "sdr-visual-suite/internal/detector" + "sdr-visual-suite/internal/dsp" + fftutil "sdr-visual-suite/internal/fft" + "sdr-visual-suite/internal/fft/gpufft" + "sdr-visual-suite/internal/recorder" +) + +func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) { + defer func() { + if r := recover(); r != nil { + log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack()) + } + }() + ticker := time.NewTicker(cfg.FrameInterval()) + defer ticker.Stop() + logTicker := time.NewTicker(5 * time.Second) + defer logTicker.Stop() + enc := json.NewEncoder(eventFile) + dcBlocker := dsp.NewDCBlocker(0.995) + dcEnabled := cfg.DCBlock + iqEnabled := cfg.IQBalance + plan := fftutil.NewCmplxPlan(cfg.FFTSize) + useGPU := cfg.UseGPUFFT + var gpuEngine *gpufft.Engine + if useGPU && gpuState != nil { + snap := gpuState.snapshot() + if snap.Available { + if eng, err := gpufft.New(cfg.FFTSize); err == nil { + gpuEngine = eng + gpuState.set(true, nil) + } else { + gpuState.set(false, err) + useGPU = false + } + } else { + gpuState.set(false, nil) + useGPU = false + } + } else if gpuState != nil { + gpuState.set(false, nil) + } + + gotSamples := false + for { + select { + case <-ctx.Done(): + return + case <-logTicker.C: + st := srcMgr.Stats() + log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs) + case upd := <-updates: + prevFFT := cfg.FFTSize + prevUseGPU := useGPU + cfg = upd.cfg + if rec != nil { + rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{ + Enabled: cfg.Recorder.Enabled, + MinSNRDb: cfg.Recorder.MinSNRDb, + MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second), + MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second), + PrerollMs: cfg.Recorder.PrerollMs, + RecordIQ: cfg.Recorder.RecordIQ, + RecordAudio: cfg.Recorder.RecordAudio, + AutoDemod: cfg.Recorder.AutoDemod, + AutoDecode: cfg.Recorder.AutoDecode, + MaxDiskMB: cfg.Recorder.MaxDiskMB, + OutputDir: cfg.Recorder.OutputDir, + ClassFilter: cfg.Recorder.ClassFilter, + RingSeconds: cfg.Recorder.RingSeconds, + }, cfg.CenterHz, buildDecoderMap(cfg)) + } + if upd.det != nil { + det = upd.det + } + if upd.window != nil { + window = upd.window + plan = fftutil.NewCmplxPlan(cfg.FFTSize) + } + dcEnabled = upd.dcBlock + iqEnabled = upd.iqBalance + if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU { + srcMgr.Flush() + gotSamples = false + if gpuEngine != nil { + gpuEngine.Close() + gpuEngine = nil + } + useGPU = cfg.UseGPUFFT + if useGPU && gpuState != nil { + snap := gpuState.snapshot() + if snap.Available { + if eng, err := gpufft.New(cfg.FFTSize); err == nil { + gpuEngine = eng + gpuState.set(true, nil) + } else { + gpuState.set(false, err) + useGPU = false + } + } else { + gpuState.set(false, nil) + useGPU = false + } + } else if gpuState != nil { + gpuState.set(false, nil) + } + } + dcBlocker.Reset() + ticker.Reset(cfg.FrameInterval()) + case <-ticker.C: + iq, err := srcMgr.ReadIQ(cfg.FFTSize) + if err != nil { + log.Printf("read IQ: %v", err) + if strings.Contains(err.Error(), "timeout") { + if err := srcMgr.Restart(cfg); err != nil { + log.Printf("restart failed: %v", err) + } + } + continue + } + if rec != nil { + rec.Ingest(time.Now(), iq) + } + if !gotSamples { + log.Printf("received IQ samples") + gotSamples = true + } + if dcEnabled { + dcBlocker.Apply(iq) + } + if iqEnabled { + dsp.IQBalance(iq) + } + var spectrum []float64 + if useGPU && gpuEngine != nil { + if len(window) == len(iq) { + for i := 0; i < len(iq); i++ { + v := iq[i] + w := float32(window[i]) + iq[i] = complex(real(v)*w, imag(v)*w) + } + } + out, err := gpuEngine.Exec(iq) + if err != nil { + if gpuState != nil { + gpuState.set(false, err) + } + useGPU = false + spectrum = fftutil.SpectrumWithPlan(iq, nil, plan) + } else { + spectrum = fftutil.SpectrumFromFFT(out) + } + } else { + spectrum = fftutil.SpectrumWithPlan(iq, window, plan) + } + for i := range spectrum { + if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) { + spectrum[i] = -200 + } + } + now := time.Now() + finished, signals := det.Process(now, spectrum, cfg.CenterHz) + thresholds := det.LastThresholds() + noiseFloor := det.LastNoiseFloor() + if len(iq) > 0 { + for i := range signals { + snip := extractSignalIQ(iq, cfg.SampleRate, cfg.CenterHz, signals[i].CenterHz, signals[i].BWHz) + cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, snip) + signals[i].Class = cls + } + det.UpdateClasses(signals) + } + if sigSnap != nil { + sigSnap.set(signals) + } + eventMu.Lock() + for _, ev := range finished { + _ = enc.Encode(ev) + } + eventMu.Unlock() + if rec != nil && len(finished) > 0 { + evCopy := make([]detector.Event, len(finished)) + copy(evCopy, finished) + go rec.OnEvents(evCopy) + } + var debugInfo *SpectrumDebug + if len(thresholds) > 0 || len(signals) > 0 || noiseFloor != 0 { + scoreDebug := make([]map[string]any, 0, len(signals)) + for _, s := range signals { + if s.Class == nil || len(s.Class.Scores) == 0 { + scoreDebug = append(scoreDebug, map[string]any{"center_hz": s.CenterHz, "class": nil}) + continue + } + scores := make(map[string]float64, len(s.Class.Scores)) + for k, v := range s.Class.Scores { + scores[string(k)] = v + } + scoreDebug = append(scoreDebug, map[string]any{ + "center_hz": s.CenterHz, + "mod_type": s.Class.ModType, + "confidence": s.Class.Confidence, + "second_best": s.Class.SecondBest, + "scores": scores, + }) + } + debugInfo = &SpectrumDebug{Thresholds: thresholds, NoiseFloor: noiseFloor, Scores: scoreDebug} + } + h.broadcast(SpectrumFrame{Timestamp: now.UnixMilli(), CenterHz: cfg.CenterHz, SampleHz: cfg.SampleRate, FFTSize: cfg.FFTSize, Spectrum: spectrum, Signals: signals, Debug: debugInfo}) + } + } +} diff --git a/cmd/sdrd/helpers.go b/cmd/sdrd/helpers.go new file mode 100644 index 0000000..5f27563 --- /dev/null +++ b/cmd/sdrd/helpers.go @@ -0,0 +1,92 @@ +package main + +import ( + "sort" + "strconv" + "time" + + "sdr-visual-suite/internal/config" + "sdr-visual-suite/internal/dsp" +) + +func mustParseDuration(raw string, fallback time.Duration) time.Duration { + if raw == "" { + return fallback + } + if d, err := time.ParseDuration(raw); err == nil { + return d + } + return fallback +} + +func buildDecoderMap(cfg config.Config) map[string]string { + out := map[string]string{} + if cfg.Decoder.FT8Cmd != "" { + out["FT8"] = cfg.Decoder.FT8Cmd + } + if cfg.Decoder.WSPRCmd != "" { + out["WSPR"] = cfg.Decoder.WSPRCmd + } + if cfg.Decoder.DMRCmd != "" { + out["DMR"] = cfg.Decoder.DMRCmd + } + if cfg.Decoder.DStarCmd != "" { + out["D-STAR"] = cfg.Decoder.DStarCmd + } + if cfg.Decoder.FSKCmd != "" { + out["FSK"] = cfg.Decoder.FSKCmd + } + if cfg.Decoder.PSKCmd != "" { + out["PSK"] = cfg.Decoder.PSKCmd + } + return out +} + +func decoderKeys(cfg config.Config) []string { + m := buildDecoderMap(cfg) + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 { + if len(iq) == 0 || sampleRate <= 0 { + return nil + } + offset := sigHz - centerHz + shifted := dsp.FreqShift(iq, sampleRate, offset) + cutoff := bwHz / 2 + if cutoff < 200 { + cutoff = 200 + } + if cutoff > float64(sampleRate)/2-1 { + cutoff = float64(sampleRate)/2 - 1 + } + taps := dsp.LowpassFIR(cutoff, sampleRate, 101) + filtered := dsp.ApplyFIR(shifted, taps) + decim := sampleRate / 200000 + if decim < 1 { + decim = 1 + } + return dsp.Decimate(filtered, decim) +} + +func parseSince(raw string) (time.Time, error) { + if raw == "" { + return time.Time{}, nil + } + if ms, err := strconv.ParseInt(raw, 10, 64); err == nil { + if ms > 1e12 { + return time.UnixMilli(ms), nil + } + return time.Unix(ms, 0), nil + } + if t, err := time.Parse(time.RFC3339Nano, raw); err == nil { + return t, nil + } + return time.Parse(time.RFC3339, raw) +} + diff --git a/cmd/sdrd/main.go b/cmd/sdrd/main.go index f9cf8fa..492cf2e 100644 --- a/cmd/sdrd/main.go +++ b/cmd/sdrd/main.go @@ -5,13 +5,10 @@ import ( "encoding/json" "flag" "log" - "math" "net/http" "os" "os/signal" "path/filepath" - "runtime/debug" - "sort" "strconv" "strings" "sync" @@ -20,10 +17,8 @@ import ( "github.com/gorilla/websocket" - "sdr-visual-suite/internal/classifier" "sdr-visual-suite/internal/config" "sdr-visual-suite/internal/detector" - "sdr-visual-suite/internal/dsp" "sdr-visual-suite/internal/events" fftutil "sdr-visual-suite/internal/fft" "sdr-visual-suite/internal/fft/gpufft" @@ -435,304 +430,4 @@ func main() { _ = server.Shutdown(ctxTimeout) } -func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) { - defer func() { - if r := recover(); r != nil { - log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack()) - } - }() - ticker := time.NewTicker(cfg.FrameInterval()) - defer ticker.Stop() - logTicker := time.NewTicker(5 * time.Second) - defer logTicker.Stop() - enc := json.NewEncoder(eventFile) - dcBlocker := dsp.NewDCBlocker(0.995) - dcEnabled := cfg.DCBlock - iqEnabled := cfg.IQBalance - plan := fftutil.NewCmplxPlan(cfg.FFTSize) - useGPU := cfg.UseGPUFFT - var gpuEngine *gpufft.Engine - if useGPU && gpuState != nil { - snap := gpuState.snapshot() - if snap.Available { - if eng, err := gpufft.New(cfg.FFTSize); err == nil { - gpuEngine = eng - gpuState.set(true, nil) - } else { - gpuState.set(false, err) - useGPU = false - } - } else { - gpuState.set(false, nil) - useGPU = false - } - } else if gpuState != nil { - gpuState.set(false, nil) - } - - gotSamples := false - for { - select { - case <-ctx.Done(): - return - case <-logTicker.C: - st := srcMgr.Stats() - log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs) - case upd := <-updates: - prevFFT := cfg.FFTSize - prevUseGPU := useGPU - cfg = upd.cfg - if rec != nil { - rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{ - Enabled: cfg.Recorder.Enabled, - MinSNRDb: cfg.Recorder.MinSNRDb, - MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second), - MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second), - PrerollMs: cfg.Recorder.PrerollMs, - RecordIQ: cfg.Recorder.RecordIQ, - RecordAudio: cfg.Recorder.RecordAudio, - AutoDemod: cfg.Recorder.AutoDemod, - AutoDecode: cfg.Recorder.AutoDecode, - MaxDiskMB: cfg.Recorder.MaxDiskMB, - OutputDir: cfg.Recorder.OutputDir, - ClassFilter: cfg.Recorder.ClassFilter, - RingSeconds: cfg.Recorder.RingSeconds, - }, cfg.CenterHz, buildDecoderMap(cfg)) - } - if upd.det != nil { - det = upd.det - } - if upd.window != nil { - window = upd.window - plan = fftutil.NewCmplxPlan(cfg.FFTSize) - } - dcEnabled = upd.dcBlock - iqEnabled = upd.iqBalance - if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU { - srcMgr.Flush() - gotSamples = false - if gpuEngine != nil { - gpuEngine.Close() - gpuEngine = nil - } - useGPU = cfg.UseGPUFFT - if useGPU && gpuState != nil { - snap := gpuState.snapshot() - if snap.Available { - if eng, err := gpufft.New(cfg.FFTSize); err == nil { - gpuEngine = eng - gpuState.set(true, nil) - } else { - gpuState.set(false, err) - useGPU = false - } - } else { - gpuState.set(false, nil) - useGPU = false - } - } else if gpuState != nil { - gpuState.set(false, nil) - } - } - dcBlocker.Reset() - ticker.Reset(cfg.FrameInterval()) - case <-ticker.C: - iq, err := srcMgr.ReadIQ(cfg.FFTSize) - if err != nil { - log.Printf("read IQ: %v", err) - if strings.Contains(err.Error(), "timeout") { - if err := srcMgr.Restart(cfg); err != nil { - log.Printf("restart failed: %v", err) - } - } - continue - } - if rec != nil { - rec.Ingest(time.Now(), iq) - } - if !gotSamples { - log.Printf("received IQ samples") - gotSamples = true - } - if dcEnabled { - dcBlocker.Apply(iq) - } - if iqEnabled { - dsp.IQBalance(iq) - } - var spectrum []float64 - if useGPU && gpuEngine != nil { - if len(window) == len(iq) { - for i := 0; i < len(iq); i++ { - v := iq[i] - w := float32(window[i]) - iq[i] = complex(real(v)*w, imag(v)*w) - } - } - out, err := gpuEngine.Exec(iq) - if err != nil { - if gpuState != nil { - gpuState.set(false, err) - } - useGPU = false - spectrum = fftutil.SpectrumWithPlan(iq, nil, plan) - } else { - spectrum = fftutil.SpectrumFromFFT(out) - } - } else { - spectrum = fftutil.SpectrumWithPlan(iq, window, plan) - } - for i := range spectrum { - if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) { - spectrum[i] = -200 - } - } - now := time.Now() - finished, signals := det.Process(now, spectrum, cfg.CenterHz) - thresholds := det.LastThresholds() - noiseFloor := det.LastNoiseFloor() - // enrich classification with temporal IQ features on per-signal snippet - if len(iq) > 0 { - for i := range signals { - snip := extractSignalIQ(iq, cfg.SampleRate, cfg.CenterHz, signals[i].CenterHz, signals[i].BWHz) - cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, snip) - signals[i].Class = cls - } - det.UpdateClasses(signals) - } - if sigSnap != nil { - sigSnap.set(signals) - } - eventMu.Lock() - for _, ev := range finished { - _ = enc.Encode(ev) - } - eventMu.Unlock() - if rec != nil && len(finished) > 0 { - evCopy := make([]detector.Event, len(finished)) - copy(evCopy, finished) - go rec.OnEvents(evCopy) - } - var debugInfo *SpectrumDebug - if len(thresholds) > 0 || len(signals) > 0 || noiseFloor != 0 { - scoreDebug := make([]map[string]any, 0, len(signals)) - for _, s := range signals { - if s.Class == nil || len(s.Class.Scores) == 0 { - scoreDebug = append(scoreDebug, map[string]any{ - "center_hz": s.CenterHz, - "class": nil, - }) - continue - } - scores := make(map[string]float64, len(s.Class.Scores)) - for k, v := range s.Class.Scores { - scores[string(k)] = v - } - scoreDebug = append(scoreDebug, map[string]any{ - "center_hz": s.CenterHz, - "mod_type": s.Class.ModType, - "confidence": s.Class.Confidence, - "second_best": s.Class.SecondBest, - "scores": scores, - }) - } - debugInfo = &SpectrumDebug{ - Thresholds: thresholds, - NoiseFloor: noiseFloor, - Scores: scoreDebug, - } - } - h.broadcast(SpectrumFrame{ - Timestamp: now.UnixMilli(), - CenterHz: cfg.CenterHz, - SampleHz: cfg.SampleRate, - FFTSize: cfg.FFTSize, - Spectrum: spectrum, - Signals: signals, - Debug: debugInfo, - }) - } - } -} - -func mustParseDuration(raw string, fallback time.Duration) time.Duration { - if raw == "" { - return fallback - } - if d, err := time.ParseDuration(raw); err == nil { - return d - } - return fallback -} - -func buildDecoderMap(cfg config.Config) map[string]string { - out := map[string]string{} - if cfg.Decoder.FT8Cmd != "" { - out["FT8"] = cfg.Decoder.FT8Cmd - } - if cfg.Decoder.WSPRCmd != "" { - out["WSPR"] = cfg.Decoder.WSPRCmd - } - if cfg.Decoder.DMRCmd != "" { - out["DMR"] = cfg.Decoder.DMRCmd - } - if cfg.Decoder.DStarCmd != "" { - out["D-STAR"] = cfg.Decoder.DStarCmd - } - if cfg.Decoder.FSKCmd != "" { - out["FSK"] = cfg.Decoder.FSKCmd - } - if cfg.Decoder.PSKCmd != "" { - out["PSK"] = cfg.Decoder.PSKCmd - } - return out -} - -func decoderKeys(cfg config.Config) []string { - m := buildDecoderMap(cfg) - keys := make([]string, 0, len(m)) - for k := range m { - keys = append(keys, k) - } - sort.Strings(keys) - return keys -} - -func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 { - if len(iq) == 0 || sampleRate <= 0 { - return nil - } - offset := sigHz - centerHz - shifted := dsp.FreqShift(iq, sampleRate, offset) - cutoff := bwHz / 2 - if cutoff < 200 { - cutoff = 200 - } - if cutoff > float64(sampleRate)/2-1 { - cutoff = float64(sampleRate)/2 - 1 - } - taps := dsp.LowpassFIR(cutoff, sampleRate, 101) - filtered := dsp.ApplyFIR(shifted, taps) - decim := sampleRate / 200000 - if decim < 1 { - decim = 1 - } - return dsp.Decimate(filtered, decim) -} - -func parseSince(raw string) (time.Time, error) { - if raw == "" { - return time.Time{}, nil - } - if ms, err := strconv.ParseInt(raw, 10, 64); err == nil { - if ms > 1e12 { - return time.UnixMilli(ms), nil - } - return time.Unix(ms, 0), nil - } - if t, err := time.Parse(time.RFC3339Nano, raw); err == nil { - return t, nil - } - return time.Parse(time.RFC3339, raw) -}