From 25fe02b0ae57efaf7e2065b888222857162334c6 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sat, 28 Mar 2026 08:30:03 +0100 Subject: [PATCH] recorder: cap ring ingest chunks and enforce sample-based ring retention --- internal/recorder/recorder.go | 31 +++++++++++++++--- internal/recorder/ring.go | 48 +++++++++++++++++++-------- internal/recorder/ring_test.go | 60 ++++++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+), 17 deletions(-) create mode 100644 internal/recorder/ring_test.go diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go index a03b378..f669af5 100644 --- a/internal/recorder/recorder.go +++ b/internal/recorder/recorder.go @@ -86,11 +86,12 @@ func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeC func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) { m.mu.Lock() defer m.mu.Unlock() + prevRingSeconds := m.policy.RingSeconds m.policy = policy m.centerHz = centerHz m.decodeCommands = decodeCommands // Only reset ring and GPU engine if sample parameters actually changed - needRingReset := m.sampleRate != sampleRate || m.blockSize != blockSize + needRingReset := m.sampleRate != sampleRate || m.blockSize != blockSize || prevRingSeconds != policy.RingSeconds m.sampleRate = sampleRate m.blockSize = blockSize if needRingReset { @@ -116,16 +117,38 @@ func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz } func (m *Manager) Ingest(t0 time.Time, samples []complex64) { - if m == nil { + if m == nil || len(samples) == 0 { return } m.mu.RLock() ring := m.ring + sampleRate := m.sampleRate + blockSize := m.blockSize m.mu.RUnlock() - if ring == nil { + if ring == nil || sampleRate <= 0 { return } - ring.Push(t0, samples) + + chunkSamples := blockSize * 16 + if chunkSamples < 65_536 { + chunkSamples = 65_536 + } + maxRingSamples := ring.MaxSamples() + if maxRingSamples > 0 && chunkSamples > maxRingSamples { + chunkSamples = maxRingSamples + } + if chunkSamples <= 0 { + chunkSamples = len(samples) + } + for off := 0; off < len(samples); off += chunkSamples { + end := off + chunkSamples + if end > len(samples) { + end = len(samples) + } + chunkStart := t0.Add(time.Duration(float64(off) / float64(sampleRate) * float64(time.Second))) + ring.Push(chunkStart, samples[off:end]) + } + if m.telemetry != nil { m.telemetry.SetGauge("recorder.ring.push_samples", float64(len(samples)), nil) } diff --git a/internal/recorder/ring.go b/internal/recorder/ring.go index 812d1a7..09cae87 100644 --- a/internal/recorder/ring.go +++ b/internal/recorder/ring.go @@ -14,9 +14,9 @@ type iqBlock struct { type Ring struct { mu sync.RWMutex blocks []iqBlock - maxBlocks int + maxSamples int + total int sampleRate int - blockSize int } func NewRing(sampleRate int, blockSize int, seconds int) *Ring { @@ -29,15 +29,15 @@ func NewRing(sampleRate int, blockSize int, seconds int) *Ring { if blockSize <= 0 { blockSize = 2048 } - blocksPerSec := sampleRate / blockSize - if blocksPerSec <= 0 { - blocksPerSec = 1 + maxSamples := sampleRate * seconds + minSamples := blockSize * 2 + if minSamples < blockSize { + minSamples = blockSize } - maxBlocks := blocksPerSec * seconds - if maxBlocks < 2 { - maxBlocks = 2 + if maxSamples < minSamples { + maxSamples = minSamples } - return &Ring{maxBlocks: maxBlocks, sampleRate: sampleRate, blockSize: blockSize} + return &Ring{maxSamples: maxSamples, sampleRate: sampleRate} } func (r *Ring) Reset(sampleRate int, blockSize int, seconds int) { @@ -50,13 +50,35 @@ func (r *Ring) Push(t0 time.Time, samples []complex64) { } r.mu.Lock() defer r.mu.Unlock() - r.blocks = append(r.blocks, iqBlock{t0: t0, samples: append([]complex64(nil), samples...)}) - if len(r.blocks) > r.maxBlocks { - drop := len(r.blocks) - r.maxBlocks - r.blocks = r.blocks[drop:] + cp := append([]complex64(nil), samples...) + r.blocks = append(r.blocks, iqBlock{t0: t0, samples: cp}) + r.total += len(cp) + for r.total > r.maxSamples && len(r.blocks) > 0 { + overflow := r.total - r.maxSamples + head := r.blocks[0] + if overflow >= len(head.samples) { + r.total -= len(head.samples) + r.blocks = r.blocks[1:] + continue + } + trim := overflow + advance := time.Duration(float64(trim) / float64(r.sampleRate) * float64(time.Second)) + head.t0 = head.t0.Add(advance) + head.samples = head.samples[trim:] + r.blocks[0] = head + r.total -= trim } } +func (r *Ring) MaxSamples() int { + if r == nil { + return 0 + } + r.mu.RLock() + defer r.mu.RUnlock() + return r.maxSamples +} + // Slice returns IQ samples between [start,end] (best-effort). func (r *Ring) Slice(start, end time.Time) []complex64 { if r == nil || end.Before(start) { diff --git a/internal/recorder/ring_test.go b/internal/recorder/ring_test.go new file mode 100644 index 0000000..5ca709d --- /dev/null +++ b/internal/recorder/ring_test.go @@ -0,0 +1,60 @@ +package recorder + +import ( + "testing" + "time" +) + +func TestRingSampleCapacityPartialTrim(t *testing.T) { + r := NewRing(10, 2, 2) // 20 samples capacity + base := time.Unix(1700000000, 0) + + push := func(start int, n int, t0 time.Time) { + s := make([]complex64, n) + for i := range s { + s[i] = complex(float32(start+i), 0) + } + r.Push(t0, s) + } + + push(0, 8, base) + push(8, 8, base.Add(800*time.Millisecond)) + push(16, 8, base.Add(1600*time.Millisecond)) + + out := r.Slice(base, base.Add(4*time.Second)) + if got, want := len(out), 20; got != want { + t.Fatalf("len mismatch: got %d want %d", got, want) + } + if got, want := int(real(out[0])), 4; got != want { + t.Fatalf("first sample mismatch: got %d want %d", got, want) + } + if got, want := int(real(out[len(out)-1])), 23; got != want { + t.Fatalf("last sample mismatch: got %d want %d", got, want) + } +} + +func TestRingSampleCapacityVariablePushSizes(t *testing.T) { + r := NewRing(100, 10, 1) // 100 samples capacity + base := time.Unix(1700001000, 0) + offset := 0 + for i := 0; i < 10; i++ { + block := make([]complex64, 15) + for j := range block { + block[j] = complex(float32(offset+j), 0) + } + t0 := base.Add(time.Duration(float64(offset) / 100.0 * float64(time.Second))) + r.Push(t0, block) + offset += len(block) + } + + out := r.Slice(base, base.Add(3*time.Second)) + if got, want := len(out), 100; got != want { + t.Fatalf("len mismatch: got %d want %d", got, want) + } + if got, want := int(real(out[0])), 50; got != want { + t.Fatalf("first sample mismatch: got %d want %d", got, want) + } + if got, want := int(real(out[len(out)-1])), 149; got != want { + t.Fatalf("last sample mismatch: got %d want %d", got, want) + } +}