Explorar el Código

Add graceful shutdown for recorder worker

master
Jan Svabenik hace 2 días
padre
commit
74dc899907
Se han modificado 2 ficheros con 28 adiciones y 1 borrados
  1. +1
    -0
      cmd/sdrd/main.go
  2. +27
    -1
      internal/recorder/recorder.go

+ 1
- 0
cmd/sdrd/main.go Ver fichero

@@ -102,6 +102,7 @@ func main() {
ClassFilter: cfg.Recorder.ClassFilter,
RingSeconds: cfg.Recorder.RingSeconds,
}, cfg.CenterHz, decodeMap)
defer recMgr.Close()

sigSnap := &signalSnapshot{}



+ 27
- 1
internal/recorder/recorder.go Ver fichero

@@ -39,6 +39,9 @@ type Manager struct {
decodeCommands map[string]string
queue chan detector.Event
gpuDemod *gpudemod.Engine
closed bool
closeOnce sync.Once
workerWG sync.WaitGroup
}

func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) *Manager {
@@ -50,6 +53,7 @@ func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeC
}
m := &Manager{policy: policy, ring: NewRing(sampleRate, blockSize, policy.RingSeconds), sampleRate: sampleRate, blockSize: blockSize, centerHz: centerHz, decodeCommands: decodeCommands, queue: make(chan detector.Event, 64)}
m.initGPUDemod(sampleRate, blockSize)
m.workerWG.Add(1)
go m.worker()
return m
}
@@ -89,8 +93,9 @@ func (m *Manager) OnEvents(events []detector.Event) {
}
m.mu.RLock()
enabled := m.policy.Enabled
closed := m.closed
m.mu.RUnlock()
if !enabled {
if !enabled || closed {
return
}
for _, ev := range events {
@@ -103,6 +108,7 @@ func (m *Manager) OnEvents(events []detector.Event) {
}

func (m *Manager) worker() {
defer m.workerWG.Done()
for ev := range m.queue {
_ = m.recordEvent(ev)
}
@@ -129,6 +135,26 @@ func (m *Manager) initGPUDemodLocked(sampleRate int, blockSize int) {
m.gpuDemod = eng
}

func (m *Manager) Close() {
if m == nil {
return
}
m.closeOnce.Do(func() {
m.mu.Lock()
m.closed = true
if m.queue != nil {
close(m.queue)
}
gpu := m.gpuDemod
m.gpuDemod = nil
m.mu.Unlock()
m.workerWG.Wait()
if gpu != nil {
gpu.Close()
}
})
}

func (m *Manager) recordEvent(ev detector.Event) error {
m.mu.RLock()
policy := m.policy


Cargando…
Cancelar
Guardar