ソースを参照

Add recorder ring buffer and IQ recording policy

master
Jan Svabenik 3日前
コミット
327f220da1
6個のファイルの変更418行の追加32行の削除
  1. +78
    -22
      cmd/sdrd/main.go
  2. +39
    -10
      internal/config/config.go
  3. +23
    -0
      internal/recorder/iqwriter.go
  4. +46
    -0
      internal/recorder/metadata.go
  5. +125
    -0
      internal/recorder/recorder.go
  6. +107
    -0
      internal/recorder/ring.go

+ 78
- 22
cmd/sdrd/main.go ファイルの表示

@@ -24,6 +24,7 @@ import (
fftutil "sdr-visual-suite/internal/fft"
"sdr-visual-suite/internal/fft/gpufft"
"sdr-visual-suite/internal/mock"
"sdr-visual-suite/internal/recorder"
"sdr-visual-suite/internal/runtime"
"sdr-visual-suite/internal/sdr"
"sdr-visual-suite/internal/sdrplay"
@@ -39,8 +40,10 @@ type SpectrumFrame struct {
}

type client struct {
conn *websocket.Conn
send chan []byte
conn *websocket.Conn
send chan []byte
done chan struct{}
closeOnce sync.Once
}

type hub struct {
@@ -83,10 +86,10 @@ func (h *hub) add(c *client) {
}

func (h *hub) remove(c *client) {
c.closeOnce.Do(func() { close(c.done) })
h.mu.Lock()
defer h.mu.Unlock()
delete(h.clients, c)
close(c.send)
}

func (h *hub) broadcast(frame SpectrumFrame) {
@@ -277,7 +280,7 @@ func main() {
log.Fatalf("open events: %v", err)
}
defer eventFile.Close()
eventMu := &sync.Mutex{}
eventMu := &sync.RWMutex{}

det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
@@ -290,18 +293,30 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState)
recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
Enabled: cfg.Recorder.Enabled,
MinSNRDb: cfg.Recorder.MinSNRDb,
MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
PrerollMs: cfg.Recorder.PrerollMs,
RecordIQ: cfg.Recorder.RecordIQ,
OutputDir: cfg.Recorder.OutputDir,
ClassFilter: cfg.Recorder.ClassFilter,
RingSeconds: cfg.Recorder.RingSeconds,
})

go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr)

upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
origin := r.Header.Get("Origin")
return origin == "" || strings.HasPrefix(origin, "http://localhost") || strings.HasPrefix(origin, "http://127.0.0.1")
}}
origin := r.Header.Get("Origin")
return origin == "" || strings.HasPrefix(origin, "http://localhost") || strings.HasPrefix(origin, "http://127.0.0.1")
}}
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
c := &client{conn: conn, send: make(chan []byte, 32)}
c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})}
h.add(c)
defer func() {
h.remove(c)
@@ -458,9 +473,9 @@ func main() {
}
}
snap := cfgManager.Snapshot()
eventMu.Lock()
eventMu.RLock()
evs, err := events.ReadRecent(snap.EventPath, limit, since)
eventMu.Unlock()
eventMu.RUnlock()
if err != nil {
http.Error(w, "failed to read events", http.StatusInternalServerError)
return
@@ -486,7 +501,7 @@ func main() {
_ = server.Shutdown(ctxTimeout)
}

func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.Mutex, updates <-chan dspUpdate, gpuState *gpuStatus) {
func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager) {
ticker := time.NewTicker(cfg.FrameInterval())
defer ticker.Stop()
logTicker := time.NewTicker(5 * time.Second)
@@ -498,12 +513,18 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
plan := fftutil.NewCmplxPlan(cfg.FFTSize)
useGPU := cfg.UseGPUFFT
var gpuEngine *gpufft.Engine
if useGPU && gpuState != nil && gpuState.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
if useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
useGPU = false
}
} else {
gpuState.set(false, err)
gpuState.set(false, nil)
useGPU = false
}
} else if gpuState != nil {
@@ -522,6 +543,19 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
prevFFT := cfg.FFTSize
prevUseGPU := useGPU
cfg = upd.cfg
if rec != nil {
rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
Enabled: cfg.Recorder.Enabled,
MinSNRDb: cfg.Recorder.MinSNRDb,
MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
PrerollMs: cfg.Recorder.PrerollMs,
RecordIQ: cfg.Recorder.RecordIQ,
OutputDir: cfg.Recorder.OutputDir,
ClassFilter: cfg.Recorder.ClassFilter,
RingSeconds: cfg.Recorder.RingSeconds,
})
}
if upd.det != nil {
det = upd.det
}
@@ -539,12 +573,18 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
gpuEngine = nil
}
useGPU = cfg.UseGPUFFT
if useGPU && gpuState != nil && gpuState.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
if useGPU && gpuState != nil {
snap := gpuState.snapshot()
if snap.Available {
if eng, err := gpufft.New(cfg.FFTSize); err == nil {
gpuEngine = eng
gpuState.set(true, nil)
} else {
gpuState.set(false, err)
useGPU = false
}
} else {
gpuState.set(false, err)
gpuState.set(false, nil)
useGPU = false
}
} else if gpuState != nil {
@@ -564,6 +604,9 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
}
continue
}
if rec != nil {
rec.Ingest(time.Now(), iq)
}
if !gotSamples {
log.Printf("received IQ samples")
gotSamples = true
@@ -603,6 +646,9 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
_ = enc.Encode(ev)
}
eventMu.Unlock()
if rec != nil {
rec.OnEvents(finished)
}
h.broadcast(SpectrumFrame{
Timestamp: now.UnixMilli(),
CenterHz: cfg.CenterHz,
@@ -615,6 +661,16 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
}
}

func mustParseDuration(raw string, fallback time.Duration) time.Duration {
if raw == "" {
return fallback
}
if d, err := time.ParseDuration(raw); err == nil {
return d
}
return fallback
}

func parseSince(raw string) (time.Time, error) {
if raw == "" {
return time.Time{}, nil


+ 39
- 10
internal/config/config.go ファイルの表示

@@ -19,6 +19,18 @@ type DetectorConfig struct {
HoldMs int `yaml:"hold_ms" json:"hold_ms"`
}

type RecorderConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
MinSNRDb float64 `yaml:"min_snr_db" json:"min_snr_db"`
MinDuration string `yaml:"min_duration" json:"min_duration"`
MaxDuration string `yaml:"max_duration" json:"max_duration"`
PrerollMs int `yaml:"preroll_ms" json:"preroll_ms"`
RecordIQ bool `yaml:"record_iq" json:"record_iq"`
OutputDir string `yaml:"output_dir" json:"output_dir"`
ClassFilter []string `yaml:"class_filter" json:"class_filter"`
RingSeconds int `yaml:"ring_seconds" json:"ring_seconds"`
}

type Config struct {
Bands []Band `yaml:"bands" json:"bands"`
CenterHz float64 `yaml:"center_hz" json:"center_hz"`
@@ -31,6 +43,7 @@ type Config struct {
DCBlock bool `yaml:"dc_block" json:"dc_block"`
IQBalance bool `yaml:"iq_balance" json:"iq_balance"`
Detector DetectorConfig `yaml:"detector" json:"detector"`
Recorder RecorderConfig `yaml:"recorder" json:"recorder"`
WebAddr string `yaml:"web_addr" json:"web_addr"`
EventPath string `yaml:"event_path" json:"event_path"`
FrameRate int `yaml:"frame_rate" json:"frame_rate"`
@@ -43,16 +56,26 @@ func Default() Config {
Bands: []Band{
{Name: "example", StartHz: 99.5e6, EndHz: 100.5e6},
},
CenterHz: 100.0e6,
SampleRate: 2_048_000,
FFTSize: 2048,
GainDb: 30,
TunerBwKHz: 1536,
UseGPUFFT: false,
AGC: false,
DCBlock: false,
IQBalance: false,
Detector: DetectorConfig{ThresholdDb: -20, MinDurationMs: 250, HoldMs: 500},
CenterHz: 100.0e6,
SampleRate: 2_048_000,
FFTSize: 2048,
GainDb: 30,
TunerBwKHz: 1536,
UseGPUFFT: false,
AGC: false,
DCBlock: false,
IQBalance: false,
Detector: DetectorConfig{ThresholdDb: -20, MinDurationMs: 250, HoldMs: 500},
Recorder: RecorderConfig{
Enabled: false,
MinSNRDb: 10,
MinDuration: "1s",
MaxDuration: "300s",
PrerollMs: 500,
RecordIQ: true,
OutputDir: "data/recordings",
RingSeconds: 8,
},
WebAddr: ":8080",
EventPath: "data/events.jsonl",
FrameRate: 15,
@@ -103,6 +126,12 @@ func Load(path string) (Config, error) {
if cfg.CenterHz == 0 {
cfg.CenterHz = 100.0e6
}
if cfg.Recorder.OutputDir == "" {
cfg.Recorder.OutputDir = "data/recordings"
}
if cfg.Recorder.RingSeconds <= 0 {
cfg.Recorder.RingSeconds = 8
}
return cfg, nil
}



+ 23
- 0
internal/recorder/iqwriter.go ファイルの表示

@@ -0,0 +1,23 @@
package recorder

import (
"encoding/binary"
"os"
)

func writeCF32(path string, samples []complex64) error {
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
for _, v := range samples {
if err := binary.Write(f, binary.LittleEndian, real(v)); err != nil {
return err
}
if err := binary.Write(f, binary.LittleEndian, imag(v)); err != nil {
return err
}
}
return nil
}

+ 46
- 0
internal/recorder/metadata.go ファイルの表示

@@ -0,0 +1,46 @@
package recorder

import (
"encoding/json"
"os"
"path/filepath"
"time"

"sdr-visual-suite/internal/classifier"
"sdr-visual-suite/internal/detector"
)

type Meta struct {
EventID int64 `json:"event_id"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
CenterHz float64 `json:"center_hz"`
BandwidthHz float64 `json:"bandwidth_hz"`
SampleRate int `json:"sample_rate"`
SNRDb float64 `json:"snr_db"`
PeakDb float64 `json:"peak_db"`
Class *classifier.Classification `json:"classification,omitempty"`
DurationMs int64 `json:"duration_ms"`
Files map[string]any `json:"files"`
}

func writeMeta(dir string, ev detector.Event, sampleRate int, files map[string]any) error {
m := Meta{
EventID: ev.ID,
Start: ev.Start,
End: ev.End,
CenterHz: ev.CenterHz,
BandwidthHz: ev.Bandwidth,
SampleRate: sampleRate,
SNRDb: ev.SNRDb,
PeakDb: ev.PeakDb,
Class: ev.Class,
DurationMs: ev.End.Sub(ev.Start).Milliseconds(),
Files: files,
}
b, err := json.MarshalIndent(m, "", " ")
if err != nil {
return err
}
return os.WriteFile(filepath.Join(dir, "meta.json"), b, 0o644)
}

+ 125
- 0
internal/recorder/recorder.go ファイルの表示

@@ -0,0 +1,125 @@
package recorder

import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"sdr-visual-suite/internal/detector"
)

type Policy struct {
Enabled bool `yaml:"enabled" json:"enabled"`
MinSNRDb float64 `yaml:"min_snr_db" json:"min_snr_db"`
MinDuration time.Duration `yaml:"min_duration" json:"min_duration"`
MaxDuration time.Duration `yaml:"max_duration" json:"max_duration"`
PrerollMs int `yaml:"preroll_ms" json:"preroll_ms"`
RecordIQ bool `yaml:"record_iq" json:"record_iq"`
OutputDir string `yaml:"output_dir" json:"output_dir"`
ClassFilter []string `yaml:"class_filter" json:"class_filter"`
RingSeconds int `yaml:"ring_seconds" json:"ring_seconds"`
}

type Manager struct {
policy Policy
ring *Ring
sampleRate int
blockSize int
}

func New(sampleRate int, blockSize int, policy Policy) *Manager {
if policy.OutputDir == "" {
policy.OutputDir = "data/recordings"
}
if policy.RingSeconds <= 0 {
policy.RingSeconds = 8
}
return &Manager{policy: policy, ring: NewRing(sampleRate, blockSize, policy.RingSeconds), sampleRate: sampleRate, blockSize: blockSize}
}

func (m *Manager) Update(sampleRate int, blockSize int, policy Policy) {
m.policy = policy
m.sampleRate = sampleRate
m.blockSize = blockSize
if m.ring == nil {
m.ring = NewRing(sampleRate, blockSize, policy.RingSeconds)
return
}
m.ring.Reset(sampleRate, blockSize, policy.RingSeconds)
}

func (m *Manager) Ingest(t0 time.Time, samples []complex64) {
if m == nil || m.ring == nil {
return
}
m.ring.Push(t0, samples)
}

func (m *Manager) OnEvents(events []detector.Event) {
if m == nil || !m.policy.Enabled || len(events) == 0 {
return
}
for _, ev := range events {
_ = m.recordEvent(ev)
}
}

func (m *Manager) recordEvent(ev detector.Event) error {
if !m.policy.Enabled {
return nil
}
if ev.SNRDb < m.policy.MinSNRDb {
return nil
}
dur := ev.End.Sub(ev.Start)
if m.policy.MinDuration > 0 && dur < m.policy.MinDuration {
return nil
}
if m.policy.MaxDuration > 0 && dur > m.policy.MaxDuration {
return nil
}
if len(m.policy.ClassFilter) > 0 && ev.Class != nil {
match := false
for _, c := range m.policy.ClassFilter {
if strings.EqualFold(c, string(ev.Class.ModType)) {
match = true
break
}
}
if !match {
return nil
}
}
if !m.policy.RecordIQ {
return nil
}

start := ev.Start.Add(-time.Duration(m.policy.PrerollMs) * time.Millisecond)
end := ev.End
if start.After(end) {
return errors.New("invalid event window")
}

segment := m.ring.Slice(start, end)
if len(segment) == 0 {
return errors.New("no iq in ring")
}

dir := filepath.Join(m.policy.OutputDir, fmt.Sprintf("%s_%0.fHz_evt%d", ev.Start.Format("2006-01-02T15-04-05"), ev.CenterHz, ev.ID))
if err := os.MkdirAll(dir, 0o755); err != nil {
return err
}
files := map[string]any{}
path := filepath.Join(dir, "signal.cf32")
if err := writeCF32(path, segment); err != nil {
return err
}
files["iq"] = "signal.cf32"
files["iq_format"] = "cf32"
files["iq_sample_rate"] = m.sampleRate

return writeMeta(dir, ev, m.sampleRate, files)
}

+ 107
- 0
internal/recorder/ring.go ファイルの表示

@@ -0,0 +1,107 @@
package recorder

import (
"sync"
"time"
)

type iqBlock struct {
t0 time.Time
samples []complex64
}

// Ring keeps recent IQ blocks for preroll capture.
type Ring struct {
mu sync.RWMutex
blocks []iqBlock
maxBlocks int
sampleRate int
blockSize int
}

func NewRing(sampleRate int, blockSize int, seconds int) *Ring {
if seconds <= 0 {
seconds = 5
}
if sampleRate <= 0 {
sampleRate = 2_048_000
}
if blockSize <= 0 {
blockSize = 2048
}
blocksPerSec := sampleRate / blockSize
if blocksPerSec <= 0 {
blocksPerSec = 1
}
maxBlocks := blocksPerSec * seconds
if maxBlocks < 2 {
maxBlocks = 2
}
return &Ring{maxBlocks: maxBlocks, sampleRate: sampleRate, blockSize: blockSize}
}

func (r *Ring) Reset(sampleRate int, blockSize int, seconds int) {
*r = *NewRing(sampleRate, blockSize, seconds)
}

func (r *Ring) Push(t0 time.Time, samples []complex64) {
if r == nil || len(samples) == 0 {
return
}
r.mu.Lock()
defer r.mu.Unlock()
r.blocks = append(r.blocks, iqBlock{t0: t0, samples: append([]complex64(nil), samples...)})
if len(r.blocks) > r.maxBlocks {
drop := len(r.blocks) - r.maxBlocks
r.blocks = r.blocks[drop:]
}
}

// Slice returns IQ samples between [start,end] (best-effort).
func (r *Ring) Slice(start, end time.Time) []complex64 {
if r == nil || end.Before(start) {
return nil
}
r.mu.RLock()
defer r.mu.RUnlock()
var out []complex64
for _, b := range r.blocks {
blockDur := time.Duration(float64(len(b.samples)) / float64(r.sampleRate) * float64(time.Second))
bEnd := b.t0.Add(blockDur)
if bEnd.Before(start) || b.t0.After(end) {
continue
}
// compute overlap
oStart := maxTime(start, b.t0)
oEnd := minTime(end, bEnd)
if oEnd.Before(oStart) {
continue
}
startIdx := int(float64(oStart.Sub(b.t0)) / float64(time.Second) * float64(r.sampleRate))
endIdx := int(float64(oEnd.Sub(b.t0)) / float64(time.Second) * float64(r.sampleRate))
if startIdx < 0 {
startIdx = 0
}
if endIdx > len(b.samples) {
endIdx = len(b.samples)
}
if endIdx > startIdx {
out = append(out, b.samples[startIdx:endIdx]...)
}
}
return out
}

func minTime(a, b time.Time) time.Time {
if a.Before(b) {
return a
}
return b
}

func maxTime(a, b time.Time) time.Time {
if a.After(b) {
return a
}
return b
}

読み込み中…
キャンセル
保存