From 327f220da130b481e352de6c71106d8e5b95670d Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Wed, 18 Mar 2026 06:35:05 +0100 Subject: [PATCH] Add recorder ring buffer and IQ recording policy --- cmd/sdrd/main.go | 100 +++++++++++++++++++++------ internal/config/config.go | 49 ++++++++++--- internal/recorder/iqwriter.go | 23 +++++++ internal/recorder/metadata.go | 46 +++++++++++++ internal/recorder/recorder.go | 125 ++++++++++++++++++++++++++++++++++ internal/recorder/ring.go | 107 +++++++++++++++++++++++++++++ 6 files changed, 418 insertions(+), 32 deletions(-) create mode 100644 internal/recorder/iqwriter.go create mode 100644 internal/recorder/metadata.go create mode 100644 internal/recorder/recorder.go create mode 100644 internal/recorder/ring.go diff --git a/cmd/sdrd/main.go b/cmd/sdrd/main.go index 68912b8..b5c00be 100644 --- a/cmd/sdrd/main.go +++ b/cmd/sdrd/main.go @@ -24,6 +24,7 @@ import ( fftutil "sdr-visual-suite/internal/fft" "sdr-visual-suite/internal/fft/gpufft" "sdr-visual-suite/internal/mock" + "sdr-visual-suite/internal/recorder" "sdr-visual-suite/internal/runtime" "sdr-visual-suite/internal/sdr" "sdr-visual-suite/internal/sdrplay" @@ -39,8 +40,10 @@ type SpectrumFrame struct { } type client struct { - conn *websocket.Conn - send chan []byte + conn *websocket.Conn + send chan []byte + done chan struct{} + closeOnce sync.Once } type hub struct { @@ -83,10 +86,10 @@ func (h *hub) add(c *client) { } func (h *hub) remove(c *client) { + c.closeOnce.Do(func() { close(c.done) }) h.mu.Lock() defer h.mu.Unlock() delete(h.clients, c) - close(c.send) } func (h *hub) broadcast(frame SpectrumFrame) { @@ -277,7 +280,7 @@ func main() { log.Fatalf("open events: %v", err) } defer eventFile.Close() - eventMu := &sync.Mutex{} + eventMu := &sync.RWMutex{} det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize, time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond, @@ -290,18 +293,30 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState) + recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{ + Enabled: cfg.Recorder.Enabled, + MinSNRDb: cfg.Recorder.MinSNRDb, + MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second), + MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second), + PrerollMs: cfg.Recorder.PrerollMs, + RecordIQ: cfg.Recorder.RecordIQ, + OutputDir: cfg.Recorder.OutputDir, + ClassFilter: cfg.Recorder.ClassFilter, + RingSeconds: cfg.Recorder.RingSeconds, + }) + + go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr) upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { - origin := r.Header.Get("Origin") - return origin == "" || strings.HasPrefix(origin, "http://localhost") || strings.HasPrefix(origin, "http://127.0.0.1") -}} + origin := r.Header.Get("Origin") + return origin == "" || strings.HasPrefix(origin, "http://localhost") || strings.HasPrefix(origin, "http://127.0.0.1") + }} http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { return } - c := &client{conn: conn, send: make(chan []byte, 32)} + c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})} h.add(c) defer func() { h.remove(c) @@ -458,9 +473,9 @@ func main() { } } snap := cfgManager.Snapshot() - eventMu.Lock() + eventMu.RLock() evs, err := events.ReadRecent(snap.EventPath, limit, since) - eventMu.Unlock() + eventMu.RUnlock() if err != nil { http.Error(w, "failed to read events", http.StatusInternalServerError) return @@ -486,7 +501,7 @@ func main() { _ = server.Shutdown(ctxTimeout) } -func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.Mutex, updates <-chan dspUpdate, gpuState *gpuStatus) { +func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager) { ticker := time.NewTicker(cfg.FrameInterval()) defer ticker.Stop() logTicker := time.NewTicker(5 * time.Second) @@ -498,12 +513,18 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * plan := fftutil.NewCmplxPlan(cfg.FFTSize) useGPU := cfg.UseGPUFFT var gpuEngine *gpufft.Engine - if useGPU && gpuState != nil && gpuState.Available { - if eng, err := gpufft.New(cfg.FFTSize); err == nil { - gpuEngine = eng - gpuState.set(true, nil) + if useGPU && gpuState != nil { + snap := gpuState.snapshot() + if snap.Available { + if eng, err := gpufft.New(cfg.FFTSize); err == nil { + gpuEngine = eng + gpuState.set(true, nil) + } else { + gpuState.set(false, err) + useGPU = false + } } else { - gpuState.set(false, err) + gpuState.set(false, nil) useGPU = false } } else if gpuState != nil { @@ -522,6 +543,19 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * prevFFT := cfg.FFTSize prevUseGPU := useGPU cfg = upd.cfg + if rec != nil { + rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{ + Enabled: cfg.Recorder.Enabled, + MinSNRDb: cfg.Recorder.MinSNRDb, + MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second), + MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second), + PrerollMs: cfg.Recorder.PrerollMs, + RecordIQ: cfg.Recorder.RecordIQ, + OutputDir: cfg.Recorder.OutputDir, + ClassFilter: cfg.Recorder.ClassFilter, + RingSeconds: cfg.Recorder.RingSeconds, + }) + } if upd.det != nil { det = upd.det } @@ -539,12 +573,18 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * gpuEngine = nil } useGPU = cfg.UseGPUFFT - if useGPU && gpuState != nil && gpuState.Available { - if eng, err := gpufft.New(cfg.FFTSize); err == nil { - gpuEngine = eng - gpuState.set(true, nil) + if useGPU && gpuState != nil { + snap := gpuState.snapshot() + if snap.Available { + if eng, err := gpufft.New(cfg.FFTSize); err == nil { + gpuEngine = eng + gpuState.set(true, nil) + } else { + gpuState.set(false, err) + useGPU = false + } } else { - gpuState.set(false, err) + gpuState.set(false, nil) useGPU = false } } else if gpuState != nil { @@ -564,6 +604,9 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } continue } + if rec != nil { + rec.Ingest(time.Now(), iq) + } if !gotSamples { log.Printf("received IQ samples") gotSamples = true @@ -603,6 +646,9 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * _ = enc.Encode(ev) } eventMu.Unlock() + if rec != nil { + rec.OnEvents(finished) + } h.broadcast(SpectrumFrame{ Timestamp: now.UnixMilli(), CenterHz: cfg.CenterHz, @@ -615,6 +661,16 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } } +func mustParseDuration(raw string, fallback time.Duration) time.Duration { + if raw == "" { + return fallback + } + if d, err := time.ParseDuration(raw); err == nil { + return d + } + return fallback +} + func parseSince(raw string) (time.Time, error) { if raw == "" { return time.Time{}, nil diff --git a/internal/config/config.go b/internal/config/config.go index 916cf35..cf7187c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,6 +19,18 @@ type DetectorConfig struct { HoldMs int `yaml:"hold_ms" json:"hold_ms"` } +type RecorderConfig struct { + Enabled bool `yaml:"enabled" json:"enabled"` + MinSNRDb float64 `yaml:"min_snr_db" json:"min_snr_db"` + MinDuration string `yaml:"min_duration" json:"min_duration"` + MaxDuration string `yaml:"max_duration" json:"max_duration"` + PrerollMs int `yaml:"preroll_ms" json:"preroll_ms"` + RecordIQ bool `yaml:"record_iq" json:"record_iq"` + OutputDir string `yaml:"output_dir" json:"output_dir"` + ClassFilter []string `yaml:"class_filter" json:"class_filter"` + RingSeconds int `yaml:"ring_seconds" json:"ring_seconds"` +} + type Config struct { Bands []Band `yaml:"bands" json:"bands"` CenterHz float64 `yaml:"center_hz" json:"center_hz"` @@ -31,6 +43,7 @@ type Config struct { DCBlock bool `yaml:"dc_block" json:"dc_block"` IQBalance bool `yaml:"iq_balance" json:"iq_balance"` Detector DetectorConfig `yaml:"detector" json:"detector"` + Recorder RecorderConfig `yaml:"recorder" json:"recorder"` WebAddr string `yaml:"web_addr" json:"web_addr"` EventPath string `yaml:"event_path" json:"event_path"` FrameRate int `yaml:"frame_rate" json:"frame_rate"` @@ -43,16 +56,26 @@ func Default() Config { Bands: []Band{ {Name: "example", StartHz: 99.5e6, EndHz: 100.5e6}, }, - CenterHz: 100.0e6, - SampleRate: 2_048_000, - FFTSize: 2048, - GainDb: 30, - TunerBwKHz: 1536, - UseGPUFFT: false, - AGC: false, - DCBlock: false, - IQBalance: false, - Detector: DetectorConfig{ThresholdDb: -20, MinDurationMs: 250, HoldMs: 500}, + CenterHz: 100.0e6, + SampleRate: 2_048_000, + FFTSize: 2048, + GainDb: 30, + TunerBwKHz: 1536, + UseGPUFFT: false, + AGC: false, + DCBlock: false, + IQBalance: false, + Detector: DetectorConfig{ThresholdDb: -20, MinDurationMs: 250, HoldMs: 500}, + Recorder: RecorderConfig{ + Enabled: false, + MinSNRDb: 10, + MinDuration: "1s", + MaxDuration: "300s", + PrerollMs: 500, + RecordIQ: true, + OutputDir: "data/recordings", + RingSeconds: 8, + }, WebAddr: ":8080", EventPath: "data/events.jsonl", FrameRate: 15, @@ -103,6 +126,12 @@ func Load(path string) (Config, error) { if cfg.CenterHz == 0 { cfg.CenterHz = 100.0e6 } + if cfg.Recorder.OutputDir == "" { + cfg.Recorder.OutputDir = "data/recordings" + } + if cfg.Recorder.RingSeconds <= 0 { + cfg.Recorder.RingSeconds = 8 + } return cfg, nil } diff --git a/internal/recorder/iqwriter.go b/internal/recorder/iqwriter.go new file mode 100644 index 0000000..2b697f8 --- /dev/null +++ b/internal/recorder/iqwriter.go @@ -0,0 +1,23 @@ +package recorder + +import ( + "encoding/binary" + "os" +) + +func writeCF32(path string, samples []complex64) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + for _, v := range samples { + if err := binary.Write(f, binary.LittleEndian, real(v)); err != nil { + return err + } + if err := binary.Write(f, binary.LittleEndian, imag(v)); err != nil { + return err + } + } + return nil +} diff --git a/internal/recorder/metadata.go b/internal/recorder/metadata.go new file mode 100644 index 0000000..2899f45 --- /dev/null +++ b/internal/recorder/metadata.go @@ -0,0 +1,46 @@ +package recorder + +import ( + "encoding/json" + "os" + "path/filepath" + "time" + + "sdr-visual-suite/internal/classifier" + "sdr-visual-suite/internal/detector" +) + +type Meta struct { + EventID int64 `json:"event_id"` + Start time.Time `json:"start"` + End time.Time `json:"end"` + CenterHz float64 `json:"center_hz"` + BandwidthHz float64 `json:"bandwidth_hz"` + SampleRate int `json:"sample_rate"` + SNRDb float64 `json:"snr_db"` + PeakDb float64 `json:"peak_db"` + Class *classifier.Classification `json:"classification,omitempty"` + DurationMs int64 `json:"duration_ms"` + Files map[string]any `json:"files"` +} + +func writeMeta(dir string, ev detector.Event, sampleRate int, files map[string]any) error { + m := Meta{ + EventID: ev.ID, + Start: ev.Start, + End: ev.End, + CenterHz: ev.CenterHz, + BandwidthHz: ev.Bandwidth, + SampleRate: sampleRate, + SNRDb: ev.SNRDb, + PeakDb: ev.PeakDb, + Class: ev.Class, + DurationMs: ev.End.Sub(ev.Start).Milliseconds(), + Files: files, + } + b, err := json.MarshalIndent(m, "", " ") + if err != nil { + return err + } + return os.WriteFile(filepath.Join(dir, "meta.json"), b, 0o644) +} diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go new file mode 100644 index 0000000..a3aab31 --- /dev/null +++ b/internal/recorder/recorder.go @@ -0,0 +1,125 @@ +package recorder + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "sdr-visual-suite/internal/detector" +) + +type Policy struct { + Enabled bool `yaml:"enabled" json:"enabled"` + MinSNRDb float64 `yaml:"min_snr_db" json:"min_snr_db"` + MinDuration time.Duration `yaml:"min_duration" json:"min_duration"` + MaxDuration time.Duration `yaml:"max_duration" json:"max_duration"` + PrerollMs int `yaml:"preroll_ms" json:"preroll_ms"` + RecordIQ bool `yaml:"record_iq" json:"record_iq"` + OutputDir string `yaml:"output_dir" json:"output_dir"` + ClassFilter []string `yaml:"class_filter" json:"class_filter"` + RingSeconds int `yaml:"ring_seconds" json:"ring_seconds"` +} + +type Manager struct { + policy Policy + ring *Ring + sampleRate int + blockSize int +} + +func New(sampleRate int, blockSize int, policy Policy) *Manager { + if policy.OutputDir == "" { + policy.OutputDir = "data/recordings" + } + if policy.RingSeconds <= 0 { + policy.RingSeconds = 8 + } + return &Manager{policy: policy, ring: NewRing(sampleRate, blockSize, policy.RingSeconds), sampleRate: sampleRate, blockSize: blockSize} +} + +func (m *Manager) Update(sampleRate int, blockSize int, policy Policy) { + m.policy = policy + m.sampleRate = sampleRate + m.blockSize = blockSize + if m.ring == nil { + m.ring = NewRing(sampleRate, blockSize, policy.RingSeconds) + return + } + m.ring.Reset(sampleRate, blockSize, policy.RingSeconds) +} + +func (m *Manager) Ingest(t0 time.Time, samples []complex64) { + if m == nil || m.ring == nil { + return + } + m.ring.Push(t0, samples) +} + +func (m *Manager) OnEvents(events []detector.Event) { + if m == nil || !m.policy.Enabled || len(events) == 0 { + return + } + for _, ev := range events { + _ = m.recordEvent(ev) + } +} + +func (m *Manager) recordEvent(ev detector.Event) error { + if !m.policy.Enabled { + return nil + } + if ev.SNRDb < m.policy.MinSNRDb { + return nil + } + dur := ev.End.Sub(ev.Start) + if m.policy.MinDuration > 0 && dur < m.policy.MinDuration { + return nil + } + if m.policy.MaxDuration > 0 && dur > m.policy.MaxDuration { + return nil + } + if len(m.policy.ClassFilter) > 0 && ev.Class != nil { + match := false + for _, c := range m.policy.ClassFilter { + if strings.EqualFold(c, string(ev.Class.ModType)) { + match = true + break + } + } + if !match { + return nil + } + } + if !m.policy.RecordIQ { + return nil + } + + start := ev.Start.Add(-time.Duration(m.policy.PrerollMs) * time.Millisecond) + end := ev.End + if start.After(end) { + return errors.New("invalid event window") + } + + segment := m.ring.Slice(start, end) + if len(segment) == 0 { + return errors.New("no iq in ring") + } + + dir := filepath.Join(m.policy.OutputDir, fmt.Sprintf("%s_%0.fHz_evt%d", ev.Start.Format("2006-01-02T15-04-05"), ev.CenterHz, ev.ID)) + if err := os.MkdirAll(dir, 0o755); err != nil { + return err + } + files := map[string]any{} + path := filepath.Join(dir, "signal.cf32") + if err := writeCF32(path, segment); err != nil { + return err + } + files["iq"] = "signal.cf32" + files["iq_format"] = "cf32" + files["iq_sample_rate"] = m.sampleRate + + return writeMeta(dir, ev, m.sampleRate, files) +} diff --git a/internal/recorder/ring.go b/internal/recorder/ring.go new file mode 100644 index 0000000..812d1a7 --- /dev/null +++ b/internal/recorder/ring.go @@ -0,0 +1,107 @@ +package recorder + +import ( + "sync" + "time" +) + +type iqBlock struct { + t0 time.Time + samples []complex64 +} + +// Ring keeps recent IQ blocks for preroll capture. +type Ring struct { + mu sync.RWMutex + blocks []iqBlock + maxBlocks int + sampleRate int + blockSize int +} + +func NewRing(sampleRate int, blockSize int, seconds int) *Ring { + if seconds <= 0 { + seconds = 5 + } + if sampleRate <= 0 { + sampleRate = 2_048_000 + } + if blockSize <= 0 { + blockSize = 2048 + } + blocksPerSec := sampleRate / blockSize + if blocksPerSec <= 0 { + blocksPerSec = 1 + } + maxBlocks := blocksPerSec * seconds + if maxBlocks < 2 { + maxBlocks = 2 + } + return &Ring{maxBlocks: maxBlocks, sampleRate: sampleRate, blockSize: blockSize} +} + +func (r *Ring) Reset(sampleRate int, blockSize int, seconds int) { + *r = *NewRing(sampleRate, blockSize, seconds) +} + +func (r *Ring) Push(t0 time.Time, samples []complex64) { + if r == nil || len(samples) == 0 { + return + } + r.mu.Lock() + defer r.mu.Unlock() + r.blocks = append(r.blocks, iqBlock{t0: t0, samples: append([]complex64(nil), samples...)}) + if len(r.blocks) > r.maxBlocks { + drop := len(r.blocks) - r.maxBlocks + r.blocks = r.blocks[drop:] + } +} + +// Slice returns IQ samples between [start,end] (best-effort). +func (r *Ring) Slice(start, end time.Time) []complex64 { + if r == nil || end.Before(start) { + return nil + } + r.mu.RLock() + defer r.mu.RUnlock() + var out []complex64 + for _, b := range r.blocks { + blockDur := time.Duration(float64(len(b.samples)) / float64(r.sampleRate) * float64(time.Second)) + bEnd := b.t0.Add(blockDur) + if bEnd.Before(start) || b.t0.After(end) { + continue + } + // compute overlap + oStart := maxTime(start, b.t0) + oEnd := minTime(end, bEnd) + if oEnd.Before(oStart) { + continue + } + startIdx := int(float64(oStart.Sub(b.t0)) / float64(time.Second) * float64(r.sampleRate)) + endIdx := int(float64(oEnd.Sub(b.t0)) / float64(time.Second) * float64(r.sampleRate)) + if startIdx < 0 { + startIdx = 0 + } + if endIdx > len(b.samples) { + endIdx = len(b.samples) + } + if endIdx > startIdx { + out = append(out, b.samples[startIdx:endIdx]...) + } + } + return out +} + +func minTime(a, b time.Time) time.Time { + if a.Before(b) { + return a + } + return b +} + +func maxTime(a, b time.Time) time.Time { + if a.After(b) { + return a + } + return b +}