Sfoglia il codice sorgente

recorder: cap ring ingest chunks and enforce sample-based ring retention

master
Jan Svabenik 1 mese fa
parent
commit
25fe02b0ae
3 ha cambiato i file con 122 aggiunte e 17 eliminazioni
  1. +27
    -4
      internal/recorder/recorder.go
  2. +35
    -13
      internal/recorder/ring.go
  3. +60
    -0
      internal/recorder/ring_test.go

+ 27
- 4
internal/recorder/recorder.go Vedi File

@@ -86,11 +86,12 @@ func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeC
func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) { func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
prevRingSeconds := m.policy.RingSeconds
m.policy = policy m.policy = policy
m.centerHz = centerHz m.centerHz = centerHz
m.decodeCommands = decodeCommands m.decodeCommands = decodeCommands
// Only reset ring and GPU engine if sample parameters actually changed // Only reset ring and GPU engine if sample parameters actually changed
needRingReset := m.sampleRate != sampleRate || m.blockSize != blockSize
needRingReset := m.sampleRate != sampleRate || m.blockSize != blockSize || prevRingSeconds != policy.RingSeconds
m.sampleRate = sampleRate m.sampleRate = sampleRate
m.blockSize = blockSize m.blockSize = blockSize
if needRingReset { if needRingReset {
@@ -116,16 +117,38 @@ func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz
} }


func (m *Manager) Ingest(t0 time.Time, samples []complex64) { func (m *Manager) Ingest(t0 time.Time, samples []complex64) {
if m == nil {
if m == nil || len(samples) == 0 {
return return
} }
m.mu.RLock() m.mu.RLock()
ring := m.ring ring := m.ring
sampleRate := m.sampleRate
blockSize := m.blockSize
m.mu.RUnlock() m.mu.RUnlock()
if ring == nil {
if ring == nil || sampleRate <= 0 {
return return
} }
ring.Push(t0, samples)

chunkSamples := blockSize * 16
if chunkSamples < 65_536 {
chunkSamples = 65_536
}
maxRingSamples := ring.MaxSamples()
if maxRingSamples > 0 && chunkSamples > maxRingSamples {
chunkSamples = maxRingSamples
}
if chunkSamples <= 0 {
chunkSamples = len(samples)
}
for off := 0; off < len(samples); off += chunkSamples {
end := off + chunkSamples
if end > len(samples) {
end = len(samples)
}
chunkStart := t0.Add(time.Duration(float64(off) / float64(sampleRate) * float64(time.Second)))
ring.Push(chunkStart, samples[off:end])
}

if m.telemetry != nil { if m.telemetry != nil {
m.telemetry.SetGauge("recorder.ring.push_samples", float64(len(samples)), nil) m.telemetry.SetGauge("recorder.ring.push_samples", float64(len(samples)), nil)
} }


+ 35
- 13
internal/recorder/ring.go Vedi File

@@ -14,9 +14,9 @@ type iqBlock struct {
type Ring struct { type Ring struct {
mu sync.RWMutex mu sync.RWMutex
blocks []iqBlock blocks []iqBlock
maxBlocks int
maxSamples int
total int
sampleRate int sampleRate int
blockSize int
} }


func NewRing(sampleRate int, blockSize int, seconds int) *Ring { func NewRing(sampleRate int, blockSize int, seconds int) *Ring {
@@ -29,15 +29,15 @@ func NewRing(sampleRate int, blockSize int, seconds int) *Ring {
if blockSize <= 0 { if blockSize <= 0 {
blockSize = 2048 blockSize = 2048
} }
blocksPerSec := sampleRate / blockSize
if blocksPerSec <= 0 {
blocksPerSec = 1
maxSamples := sampleRate * seconds
minSamples := blockSize * 2
if minSamples < blockSize {
minSamples = blockSize
} }
maxBlocks := blocksPerSec * seconds
if maxBlocks < 2 {
maxBlocks = 2
if maxSamples < minSamples {
maxSamples = minSamples
} }
return &Ring{maxBlocks: maxBlocks, sampleRate: sampleRate, blockSize: blockSize}
return &Ring{maxSamples: maxSamples, sampleRate: sampleRate}
} }


func (r *Ring) Reset(sampleRate int, blockSize int, seconds int) { func (r *Ring) Reset(sampleRate int, blockSize int, seconds int) {
@@ -50,13 +50,35 @@ func (r *Ring) Push(t0 time.Time, samples []complex64) {
} }
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.blocks = append(r.blocks, iqBlock{t0: t0, samples: append([]complex64(nil), samples...)})
if len(r.blocks) > r.maxBlocks {
drop := len(r.blocks) - r.maxBlocks
r.blocks = r.blocks[drop:]
cp := append([]complex64(nil), samples...)
r.blocks = append(r.blocks, iqBlock{t0: t0, samples: cp})
r.total += len(cp)
for r.total > r.maxSamples && len(r.blocks) > 0 {
overflow := r.total - r.maxSamples
head := r.blocks[0]
if overflow >= len(head.samples) {
r.total -= len(head.samples)
r.blocks = r.blocks[1:]
continue
}
trim := overflow
advance := time.Duration(float64(trim) / float64(r.sampleRate) * float64(time.Second))
head.t0 = head.t0.Add(advance)
head.samples = head.samples[trim:]
r.blocks[0] = head
r.total -= trim
} }
} }


func (r *Ring) MaxSamples() int {
if r == nil {
return 0
}
r.mu.RLock()
defer r.mu.RUnlock()
return r.maxSamples
}

// Slice returns IQ samples between [start,end] (best-effort). // Slice returns IQ samples between [start,end] (best-effort).
func (r *Ring) Slice(start, end time.Time) []complex64 { func (r *Ring) Slice(start, end time.Time) []complex64 {
if r == nil || end.Before(start) { if r == nil || end.Before(start) {


+ 60
- 0
internal/recorder/ring_test.go Vedi File

@@ -0,0 +1,60 @@
package recorder

import (
"testing"
"time"
)

func TestRingSampleCapacityPartialTrim(t *testing.T) {
r := NewRing(10, 2, 2) // 20 samples capacity
base := time.Unix(1700000000, 0)

push := func(start int, n int, t0 time.Time) {
s := make([]complex64, n)
for i := range s {
s[i] = complex(float32(start+i), 0)
}
r.Push(t0, s)
}

push(0, 8, base)
push(8, 8, base.Add(800*time.Millisecond))
push(16, 8, base.Add(1600*time.Millisecond))

out := r.Slice(base, base.Add(4*time.Second))
if got, want := len(out), 20; got != want {
t.Fatalf("len mismatch: got %d want %d", got, want)
}
if got, want := int(real(out[0])), 4; got != want {
t.Fatalf("first sample mismatch: got %d want %d", got, want)
}
if got, want := int(real(out[len(out)-1])), 23; got != want {
t.Fatalf("last sample mismatch: got %d want %d", got, want)
}
}

func TestRingSampleCapacityVariablePushSizes(t *testing.T) {
r := NewRing(100, 10, 1) // 100 samples capacity
base := time.Unix(1700001000, 0)
offset := 0
for i := 0; i < 10; i++ {
block := make([]complex64, 15)
for j := range block {
block[j] = complex(float32(offset+j), 0)
}
t0 := base.Add(time.Duration(float64(offset) / 100.0 * float64(time.Second)))
r.Push(t0, block)
offset += len(block)
}

out := r.Slice(base, base.Add(3*time.Second))
if got, want := len(out), 100; got != want {
t.Fatalf("len mismatch: got %d want %d", got, want)
}
if got, want := int(real(out[0])), 50; got != want {
t.Fatalf("first sample mismatch: got %d want %d", got, want)
}
if got, want := int(real(out[len(out)-1])), 149; got != want {
t.Fatalf("last sample mismatch: got %d want %d", got, want)
}
}

Loading…
Annulla
Salva