ソースを参照

refactor: split DSP loop and helpers from main

master
Jan Svabenik 2日前
コミット
b011378b28
3個のファイルの変更317行の追加305行の削除
  1. +225
    -0
      cmd/sdrd/dsp_loop.go
  2. +92
    -0
      cmd/sdrd/helpers.go
  3. +0
    -305
      cmd/sdrd/main.go

+ 225
- 0
cmd/sdrd/dsp_loop.go ファイルの表示

@@ -0,0 +1,225 @@
package main

import (
"context"
"encoding/json"
"log"
"math"
"os"
"runtime/debug"
"strings"
"sync"
"time"

"sdr-visual-suite/internal/classifier"
"sdr-visual-suite/internal/config"
"sdr-visual-suite/internal/detector"
"sdr-visual-suite/internal/dsp"
fftutil "sdr-visual-suite/internal/fft"
"sdr-visual-suite/internal/fft/gpufft"
"sdr-visual-suite/internal/recorder"
)

func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) {
defer func() {
if r := recover(); r != nil {
log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack())
}
}()
ticker := time.NewTicker(cfg.FrameInterval())
defer ticker.Stop()
logTicker := time.NewTicker(5 * time.Second)
defer logTicker.Stop()
enc := json.NewEncoder(eventFile)
dcBlocker := dsp.NewDCBlocker(0.995)
dcEnabled := cfg.DCBlock
iqEnabled := cfg.IQBalance
plan := fftutil.NewCmplxPlan(cfg.FFTSize)
useGPU := cfg.UseGPUFFT
var gpuEngine *gpufft.Engine
if useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
useGPU = false
}
} else {
gpuState.set(false, nil)
useGPU = false
}
} else if gpuState != nil {
gpuState.set(false, nil)
}

gotSamples := false
for {
select {
case <-ctx.Done():
return
case <-logTicker.C:
st := srcMgr.Stats()
log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
case upd := <-updates:
prevFFT := cfg.FFTSize
prevUseGPU := useGPU
cfg = upd.cfg
if rec != nil {
rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
Enabled: cfg.Recorder.Enabled,
MinSNRDb: cfg.Recorder.MinSNRDb,
MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
PrerollMs: cfg.Recorder.PrerollMs,
RecordIQ: cfg.Recorder.RecordIQ,
RecordAudio: cfg.Recorder.RecordAudio,
AutoDemod: cfg.Recorder.AutoDemod,
AutoDecode: cfg.Recorder.AutoDecode,
MaxDiskMB: cfg.Recorder.MaxDiskMB,
OutputDir: cfg.Recorder.OutputDir,
ClassFilter: cfg.Recorder.ClassFilter,
RingSeconds: cfg.Recorder.RingSeconds,
}, cfg.CenterHz, buildDecoderMap(cfg))
}
if upd.det != nil {
det = upd.det
}
if upd.window != nil {
window = upd.window
plan = fftutil.NewCmplxPlan(cfg.FFTSize)
}
dcEnabled = upd.dcBlock
iqEnabled = upd.iqBalance
if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
srcMgr.Flush()
gotSamples = false
if gpuEngine != nil {
gpuEngine.Close()
gpuEngine = nil
}
useGPU = cfg.UseGPUFFT
if useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
useGPU = false
}
} else {
gpuState.set(false, nil)
useGPU = false
}
} else if gpuState != nil {
gpuState.set(false, nil)
}
}
dcBlocker.Reset()
ticker.Reset(cfg.FrameInterval())
case <-ticker.C:
iq, err := srcMgr.ReadIQ(cfg.FFTSize)
if err != nil {
log.Printf("read IQ: %v", err)
if strings.Contains(err.Error(), "timeout") {
if err := srcMgr.Restart(cfg); err != nil {
log.Printf("restart failed: %v", err)
}
}
continue
}
if rec != nil {
rec.Ingest(time.Now(), iq)
}
if !gotSamples {
log.Printf("received IQ samples")
gotSamples = true
}
if dcEnabled {
dcBlocker.Apply(iq)
}
if iqEnabled {
dsp.IQBalance(iq)
}
var spectrum []float64
if useGPU && gpuEngine != nil {
if len(window) == len(iq) {
for i := 0; i < len(iq); i++ {
v := iq[i]
w := float32(window[i])
iq[i] = complex(real(v)*w, imag(v)*w)
}
}
out, err := gpuEngine.Exec(iq)
if err != nil {
if gpuState != nil {
gpuState.set(false, err)
}
useGPU = false
spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
} else {
spectrum = fftutil.SpectrumFromFFT(out)
}
} else {
spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
}
for i := range spectrum {
if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
spectrum[i] = -200
}
}
now := time.Now()
finished, signals := det.Process(now, spectrum, cfg.CenterHz)
thresholds := det.LastThresholds()
noiseFloor := det.LastNoiseFloor()
if len(iq) > 0 {
for i := range signals {
snip := extractSignalIQ(iq, cfg.SampleRate, cfg.CenterHz, signals[i].CenterHz, signals[i].BWHz)
cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, snip)
signals[i].Class = cls
}
det.UpdateClasses(signals)
}
if sigSnap != nil {
sigSnap.set(signals)
}
eventMu.Lock()
for _, ev := range finished {
_ = enc.Encode(ev)
}
eventMu.Unlock()
if rec != nil && len(finished) > 0 {
evCopy := make([]detector.Event, len(finished))
copy(evCopy, finished)
go rec.OnEvents(evCopy)
}
var debugInfo *SpectrumDebug
if len(thresholds) > 0 || len(signals) > 0 || noiseFloor != 0 {
scoreDebug := make([]map[string]any, 0, len(signals))
for _, s := range signals {
if s.Class == nil || len(s.Class.Scores) == 0 {
scoreDebug = append(scoreDebug, map[string]any{"center_hz": s.CenterHz, "class": nil})
continue
}
scores := make(map[string]float64, len(s.Class.Scores))
for k, v := range s.Class.Scores {
scores[string(k)] = v
}
scoreDebug = append(scoreDebug, map[string]any{
"center_hz": s.CenterHz,
"mod_type": s.Class.ModType,
"confidence": s.Class.Confidence,
"second_best": s.Class.SecondBest,
"scores": scores,
})
}
debugInfo = &SpectrumDebug{Thresholds: thresholds, NoiseFloor: noiseFloor, Scores: scoreDebug}
}
h.broadcast(SpectrumFrame{Timestamp: now.UnixMilli(), CenterHz: cfg.CenterHz, SampleHz: cfg.SampleRate, FFTSize: cfg.FFTSize, Spectrum: spectrum, Signals: signals, Debug: debugInfo})
}
}
}

+ 92
- 0
cmd/sdrd/helpers.go ファイルの表示

@@ -0,0 +1,92 @@
package main

import (
"sort"
"strconv"
"time"

"sdr-visual-suite/internal/config"
"sdr-visual-suite/internal/dsp"
)

func mustParseDuration(raw string, fallback time.Duration) time.Duration {
if raw == "" {
return fallback
}
if d, err := time.ParseDuration(raw); err == nil {
return d
}
return fallback
}

func buildDecoderMap(cfg config.Config) map[string]string {
out := map[string]string{}
if cfg.Decoder.FT8Cmd != "" {
out["FT8"] = cfg.Decoder.FT8Cmd
}
if cfg.Decoder.WSPRCmd != "" {
out["WSPR"] = cfg.Decoder.WSPRCmd
}
if cfg.Decoder.DMRCmd != "" {
out["DMR"] = cfg.Decoder.DMRCmd
}
if cfg.Decoder.DStarCmd != "" {
out["D-STAR"] = cfg.Decoder.DStarCmd
}
if cfg.Decoder.FSKCmd != "" {
out["FSK"] = cfg.Decoder.FSKCmd
}
if cfg.Decoder.PSKCmd != "" {
out["PSK"] = cfg.Decoder.PSKCmd
}
return out
}

func decoderKeys(cfg config.Config) []string {
m := buildDecoderMap(cfg)
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}

func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
if len(iq) == 0 || sampleRate <= 0 {
return nil
}
offset := sigHz - centerHz
shifted := dsp.FreqShift(iq, sampleRate, offset)
cutoff := bwHz / 2
if cutoff < 200 {
cutoff = 200
}
if cutoff > float64(sampleRate)/2-1 {
cutoff = float64(sampleRate)/2 - 1
}
taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
filtered := dsp.ApplyFIR(shifted, taps)
decim := sampleRate / 200000
if decim < 1 {
decim = 1
}
return dsp.Decimate(filtered, decim)
}

func parseSince(raw string) (time.Time, error) {
if raw == "" {
return time.Time{}, nil
}
if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
if ms > 1e12 {
return time.UnixMilli(ms), nil
}
return time.Unix(ms, 0), nil
}
if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
return t, nil
}
return time.Parse(time.RFC3339, raw)
}


+ 0
- 305
cmd/sdrd/main.go ファイルの表示

@@ -5,13 +5,10 @@ import (
"encoding/json"
"flag"
"log"
"math"
"net/http"
"os"
"os/signal"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
@@ -20,10 +17,8 @@ import (

"github.com/gorilla/websocket"

"sdr-visual-suite/internal/classifier"
"sdr-visual-suite/internal/config"
"sdr-visual-suite/internal/detector"
"sdr-visual-suite/internal/dsp"
"sdr-visual-suite/internal/events"
fftutil "sdr-visual-suite/internal/fft"
"sdr-visual-suite/internal/fft/gpufft"
@@ -435,304 +430,4 @@ func main() {
_ = server.Shutdown(ctxTimeout)
}

func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) {
defer func() {
if r := recover(); r != nil {
log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack())
}
}()
ticker := time.NewTicker(cfg.FrameInterval())
defer ticker.Stop()
logTicker := time.NewTicker(5 * time.Second)
defer logTicker.Stop()
enc := json.NewEncoder(eventFile)
dcBlocker := dsp.NewDCBlocker(0.995)
dcEnabled := cfg.DCBlock
iqEnabled := cfg.IQBalance
plan := fftutil.NewCmplxPlan(cfg.FFTSize)
useGPU := cfg.UseGPUFFT
var gpuEngine *gpufft.Engine
if useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
useGPU = false
}
} else {
gpuState.set(false, nil)
useGPU = false
}
} else if gpuState != nil {
gpuState.set(false, nil)
}

gotSamples := false
for {
select {
case <-ctx.Done():
return
case <-logTicker.C:
st := srcMgr.Stats()
log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
case upd := <-updates:
prevFFT := cfg.FFTSize
prevUseGPU := useGPU
cfg = upd.cfg
if rec != nil {
rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
Enabled: cfg.Recorder.Enabled,
MinSNRDb: cfg.Recorder.MinSNRDb,
MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
PrerollMs: cfg.Recorder.PrerollMs,
RecordIQ: cfg.Recorder.RecordIQ,
RecordAudio: cfg.Recorder.RecordAudio,
AutoDemod: cfg.Recorder.AutoDemod,
AutoDecode: cfg.Recorder.AutoDecode,
MaxDiskMB: cfg.Recorder.MaxDiskMB,
OutputDir: cfg.Recorder.OutputDir,
ClassFilter: cfg.Recorder.ClassFilter,
RingSeconds: cfg.Recorder.RingSeconds,
}, cfg.CenterHz, buildDecoderMap(cfg))
}
if upd.det != nil {
det = upd.det
}
if upd.window != nil {
window = upd.window
plan = fftutil.NewCmplxPlan(cfg.FFTSize)
}
dcEnabled = upd.dcBlock
iqEnabled = upd.iqBalance
if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
srcMgr.Flush()
gotSamples = false
if gpuEngine != nil {
gpuEngine.Close()
gpuEngine = nil
}
useGPU = cfg.UseGPUFFT
if useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
useGPU = false
}
} else {
gpuState.set(false, nil)
useGPU = false
}
} else if gpuState != nil {
gpuState.set(false, nil)
}
}
dcBlocker.Reset()
ticker.Reset(cfg.FrameInterval())
case <-ticker.C:
iq, err := srcMgr.ReadIQ(cfg.FFTSize)
if err != nil {
log.Printf("read IQ: %v", err)
if strings.Contains(err.Error(), "timeout") {
if err := srcMgr.Restart(cfg); err != nil {
log.Printf("restart failed: %v", err)
}
}
continue
}
if rec != nil {
rec.Ingest(time.Now(), iq)
}
if !gotSamples {
log.Printf("received IQ samples")
gotSamples = true
}
if dcEnabled {
dcBlocker.Apply(iq)
}
if iqEnabled {
dsp.IQBalance(iq)
}
var spectrum []float64
if useGPU && gpuEngine != nil {
if len(window) == len(iq) {
for i := 0; i < len(iq); i++ {
v := iq[i]
w := float32(window[i])
iq[i] = complex(real(v)*w, imag(v)*w)
}
}
out, err := gpuEngine.Exec(iq)
if err != nil {
if gpuState != nil {
gpuState.set(false, err)
}
useGPU = false
spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
} else {
spectrum = fftutil.SpectrumFromFFT(out)
}
} else {
spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
}
for i := range spectrum {
if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
spectrum[i] = -200
}
}
now := time.Now()
finished, signals := det.Process(now, spectrum, cfg.CenterHz)
thresholds := det.LastThresholds()
noiseFloor := det.LastNoiseFloor()
// enrich classification with temporal IQ features on per-signal snippet
if len(iq) > 0 {
for i := range signals {
snip := extractSignalIQ(iq, cfg.SampleRate, cfg.CenterHz, signals[i].CenterHz, signals[i].BWHz)
cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, snip)
signals[i].Class = cls
}
det.UpdateClasses(signals)
}
if sigSnap != nil {
sigSnap.set(signals)
}
eventMu.Lock()
for _, ev := range finished {
_ = enc.Encode(ev)
}
eventMu.Unlock()
if rec != nil && len(finished) > 0 {
evCopy := make([]detector.Event, len(finished))
copy(evCopy, finished)
go rec.OnEvents(evCopy)
}
var debugInfo *SpectrumDebug
if len(thresholds) > 0 || len(signals) > 0 || noiseFloor != 0 {
scoreDebug := make([]map[string]any, 0, len(signals))
for _, s := range signals {
if s.Class == nil || len(s.Class.Scores) == 0 {
scoreDebug = append(scoreDebug, map[string]any{
"center_hz": s.CenterHz,
"class": nil,
})
continue
}
scores := make(map[string]float64, len(s.Class.Scores))
for k, v := range s.Class.Scores {
scores[string(k)] = v
}
scoreDebug = append(scoreDebug, map[string]any{
"center_hz": s.CenterHz,
"mod_type": s.Class.ModType,
"confidence": s.Class.Confidence,
"second_best": s.Class.SecondBest,
"scores": scores,
})
}
debugInfo = &SpectrumDebug{
Thresholds: thresholds,
NoiseFloor: noiseFloor,
Scores: scoreDebug,
}
}
h.broadcast(SpectrumFrame{
Timestamp: now.UnixMilli(),
CenterHz: cfg.CenterHz,
SampleHz: cfg.SampleRate,
FFTSize: cfg.FFTSize,
Spectrum: spectrum,
Signals: signals,
Debug: debugInfo,
})
}
}
}

func mustParseDuration(raw string, fallback time.Duration) time.Duration {
if raw == "" {
return fallback
}
if d, err := time.ParseDuration(raw); err == nil {
return d
}
return fallback
}

func buildDecoderMap(cfg config.Config) map[string]string {
out := map[string]string{}
if cfg.Decoder.FT8Cmd != "" {
out["FT8"] = cfg.Decoder.FT8Cmd
}
if cfg.Decoder.WSPRCmd != "" {
out["WSPR"] = cfg.Decoder.WSPRCmd
}
if cfg.Decoder.DMRCmd != "" {
out["DMR"] = cfg.Decoder.DMRCmd
}
if cfg.Decoder.DStarCmd != "" {
out["D-STAR"] = cfg.Decoder.DStarCmd
}
if cfg.Decoder.FSKCmd != "" {
out["FSK"] = cfg.Decoder.FSKCmd
}
if cfg.Decoder.PSKCmd != "" {
out["PSK"] = cfg.Decoder.PSKCmd
}
return out
}

func decoderKeys(cfg config.Config) []string {
m := buildDecoderMap(cfg)
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}

func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
if len(iq) == 0 || sampleRate <= 0 {
return nil
}
offset := sigHz - centerHz
shifted := dsp.FreqShift(iq, sampleRate, offset)
cutoff := bwHz / 2
if cutoff < 200 {
cutoff = 200
}
if cutoff > float64(sampleRate)/2-1 {
cutoff = float64(sampleRate)/2 - 1
}
taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
filtered := dsp.ApplyFIR(shifted, taps)
decim := sampleRate / 200000
if decim < 1 {
decim = 1
}
return dsp.Decimate(filtered, decim)
}

func parseSince(raw string) (time.Time, error) {
if raw == "" {
return time.Time{}, nil
}
if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
if ms > 1e12 {
return time.UnixMilli(ms), nil
}
return time.Unix(ms, 0), nil
}
if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
return t, nil
}
return time.Parse(time.RFC3339, raw)
}


読み込み中…
キャンセル
保存