You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

269 lines
6.4KB

  1. package recorder
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "strings"
  8. "sync"
  9. "time"
  10. "sdr-visual-suite/internal/demod/gpudemod"
  11. "sdr-visual-suite/internal/detector"
  12. )
  // Policy holds the recording settings that the Manager consults for every
  // detector event. Each field is loadable from YAML or JSON under the key
  // given in its struct tag.
  type Policy struct {
  	// Enabled is the master switch: when false, events are neither queued
  	// nor recorded.
  	Enabled bool `yaml:"enabled" json:"enabled"`

  	// MinSNRDb is the lowest event SNR that is still recorded.
  	MinSNRDb float64 `yaml:"min_snr_db" json:"min_snr_db"`
  	// MinDuration and MaxDuration bound the event length (End - Start).
  	// A bound that is zero or negative is not enforced.
  	MinDuration time.Duration `yaml:"min_duration" json:"min_duration"`
  	MaxDuration time.Duration `yaml:"max_duration" json:"max_duration"`
  	// PrerollMs is the number of milliseconds of IQ taken from before the
  	// event start when the recording window is cut from the ring.
  	PrerollMs int `yaml:"preroll_ms" json:"preroll_ms"`

  	// RecordIQ writes the raw window as signal.cf32.
  	RecordIQ bool `yaml:"record_iq" json:"record_iq"`
  	// RecordAudio together with AutoDemod enables the demodulation step for
  	// classified events.
  	RecordAudio bool `yaml:"record_audio" json:"record_audio"`
  	AutoDemod   bool `yaml:"auto_demod" json:"auto_demod"`
  	// AutoDecode runs the configured decode step on the written IQ file of
  	// classified events.
  	AutoDecode bool `yaml:"auto_decode" json:"auto_decode"`

  	// MaxDiskMB is not read by the code in this part of the file;
  	// presumably a disk budget in megabytes (TODO confirm where enforced).
  	MaxDiskMB int `yaml:"max_disk_mb" json:"max_disk_mb"`
  	// OutputDir is the parent directory for per-event recording folders.
  	// New substitutes "data/recordings" when it is empty.
  	OutputDir string `yaml:"output_dir" json:"output_dir"`
  	// ClassFilter, when non-empty, restricts recording of classified events
  	// to these modulation types (compared case-insensitively).
  	ClassFilter []string `yaml:"class_filter" json:"class_filter"`
  	// RingSeconds is the ring length handed to NewRing / Ring.Reset.
  	// New substitutes 8 when it is zero or negative.
  	RingSeconds int `yaml:"ring_seconds" json:"ring_seconds"`
  }
  28. type Manager struct {
  29. mu sync.RWMutex
  30. policy Policy
  31. ring *Ring
  32. sampleRate int
  33. blockSize int
  34. centerHz float64
  35. decodeCommands map[string]string
  36. queue chan detector.Event
  37. gpuDemod *gpudemod.Engine
  38. closed bool
  39. closeOnce sync.Once
  40. workerWG sync.WaitGroup
  41. }
  42. func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) *Manager {
  43. if policy.OutputDir == "" {
  44. policy.OutputDir = "data/recordings"
  45. }
  46. if policy.RingSeconds <= 0 {
  47. policy.RingSeconds = 8
  48. }
  49. m := &Manager{policy: policy, ring: NewRing(sampleRate, blockSize, policy.RingSeconds), sampleRate: sampleRate, blockSize: blockSize, centerHz: centerHz, decodeCommands: decodeCommands, queue: make(chan detector.Event, 64)}
  50. m.initGPUDemod(sampleRate, blockSize)
  51. m.workerWG.Add(1)
  52. go m.worker()
  53. return m
  54. }
  55. func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) {
  56. m.mu.Lock()
  57. defer m.mu.Unlock()
  58. m.policy = policy
  59. m.centerHz = centerHz
  60. m.decodeCommands = decodeCommands
  61. // Only reset ring and GPU engine if sample parameters actually changed
  62. needRingReset := m.sampleRate != sampleRate || m.blockSize != blockSize
  63. m.sampleRate = sampleRate
  64. m.blockSize = blockSize
  65. if needRingReset {
  66. m.initGPUDemodLocked(sampleRate, blockSize)
  67. if m.ring == nil {
  68. m.ring = NewRing(sampleRate, blockSize, policy.RingSeconds)
  69. } else {
  70. m.ring.Reset(sampleRate, blockSize, policy.RingSeconds)
  71. }
  72. } else if m.ring == nil {
  73. m.ring = NewRing(sampleRate, blockSize, policy.RingSeconds)
  74. }
  75. }
  76. func (m *Manager) Ingest(t0 time.Time, samples []complex64) {
  77. if m == nil {
  78. return
  79. }
  80. m.mu.RLock()
  81. ring := m.ring
  82. m.mu.RUnlock()
  83. if ring == nil {
  84. return
  85. }
  86. ring.Push(t0, samples)
  87. }
  88. func (m *Manager) OnEvents(events []detector.Event) {
  89. if m == nil || len(events) == 0 {
  90. return
  91. }
  92. m.mu.RLock()
  93. enabled := m.policy.Enabled
  94. closed := m.closed
  95. m.mu.RUnlock()
  96. if !enabled || closed {
  97. return
  98. }
  99. for _, ev := range events {
  100. select {
  101. case m.queue <- ev:
  102. default:
  103. // drop if queue full
  104. }
  105. }
  106. }
  107. func (m *Manager) worker() {
  108. defer m.workerWG.Done()
  109. for ev := range m.queue {
  110. _ = m.recordEvent(ev)
  111. }
  112. }
  113. func (m *Manager) initGPUDemod(sampleRate int, blockSize int) {
  114. m.mu.Lock()
  115. defer m.mu.Unlock()
  116. m.initGPUDemodLocked(sampleRate, blockSize)
  117. }
  118. func (m *Manager) gpuEngine() *gpudemod.Engine {
  119. m.mu.RLock()
  120. defer m.mu.RUnlock()
  121. return m.gpuDemod
  122. }
  123. func (m *Manager) initGPUDemodLocked(sampleRate int, blockSize int) {
  124. if m.gpuDemod != nil {
  125. m.gpuDemod.Close()
  126. m.gpuDemod = nil
  127. }
  128. if !gpudemod.Available() {
  129. return
  130. }
  131. eng, err := gpudemod.New(blockSize, sampleRate)
  132. if err != nil {
  133. return
  134. }
  135. m.gpuDemod = eng
  136. }
  137. func (m *Manager) Close() {
  138. if m == nil {
  139. return
  140. }
  141. m.closeOnce.Do(func() {
  142. m.mu.Lock()
  143. m.closed = true
  144. if m.queue != nil {
  145. close(m.queue)
  146. }
  147. gpu := m.gpuDemod
  148. m.gpuDemod = nil
  149. m.mu.Unlock()
  150. m.workerWG.Wait()
  151. if gpu != nil {
  152. gpu.Close()
  153. }
  154. })
  155. }
  156. func (m *Manager) recordEvent(ev detector.Event) error {
  157. m.mu.RLock()
  158. policy := m.policy
  159. ring := m.ring
  160. sampleRate := m.sampleRate
  161. centerHz := m.centerHz
  162. m.mu.RUnlock()
  163. if !policy.Enabled {
  164. return nil
  165. }
  166. if ev.SNRDb < policy.MinSNRDb {
  167. return nil
  168. }
  169. dur := ev.End.Sub(ev.Start)
  170. if policy.MinDuration > 0 && dur < policy.MinDuration {
  171. return nil
  172. }
  173. if policy.MaxDuration > 0 && dur > policy.MaxDuration {
  174. return nil
  175. }
  176. if len(policy.ClassFilter) > 0 && ev.Class != nil {
  177. match := false
  178. for _, c := range policy.ClassFilter {
  179. if strings.EqualFold(c, string(ev.Class.ModType)) {
  180. match = true
  181. break
  182. }
  183. }
  184. if !match {
  185. return nil
  186. }
  187. }
  188. if !policy.RecordIQ && !policy.RecordAudio {
  189. return nil
  190. }
  191. start := ev.Start.Add(-time.Duration(policy.PrerollMs) * time.Millisecond)
  192. end := ev.End
  193. if start.After(end) {
  194. return errors.New("invalid event window")
  195. }
  196. if ring == nil {
  197. return errors.New("no ring buffer")
  198. }
  199. segment := ring.Slice(start, end)
  200. if len(segment) == 0 {
  201. return errors.New("no iq in ring")
  202. }
  203. dir := filepath.Join(policy.OutputDir, fmt.Sprintf("%s_%0.fHz_evt%d", ev.Start.Format("2006-01-02T15-04-05"), ev.CenterHz, ev.ID))
  204. if err := os.MkdirAll(dir, 0o755); err != nil {
  205. return err
  206. }
  207. files := map[string]any{}
  208. var iqPath string
  209. if policy.RecordIQ {
  210. iqPath = filepath.Join(dir, "signal.cf32")
  211. if err := writeCF32(iqPath, segment); err != nil {
  212. return err
  213. }
  214. files["iq"] = "signal.cf32"
  215. files["iq_format"] = "cf32"
  216. files["iq_sample_rate"] = sampleRate
  217. }
  218. if policy.RecordAudio && policy.AutoDemod && ev.Class != nil {
  219. if err := m.demodAndWrite(dir, ev, segment, files); err != nil {
  220. return err
  221. }
  222. }
  223. if policy.AutoDecode && iqPath != "" && ev.Class != nil {
  224. m.runDecodeIfConfigured(string(ev.Class.ModType), iqPath, sampleRate, files, dir)
  225. }
  226. _ = centerHz
  227. return writeMeta(dir, ev, sampleRate, files)
  228. }
  229. // SliceRecent returns the most recent `seconds` of raw IQ from the ring buffer.
  230. // Returns the IQ samples, sample rate, and center frequency.
  231. func (m *Manager) SliceRecent(seconds float64) ([]complex64, int, float64) {
  232. if m == nil {
  233. return nil, 0, 0
  234. }
  235. m.mu.RLock()
  236. ring := m.ring
  237. sr := m.sampleRate
  238. center := m.centerHz
  239. m.mu.RUnlock()
  240. if ring == nil || sr <= 0 {
  241. return nil, 0, 0
  242. }
  243. end := time.Now()
  244. start := end.Add(-time.Duration(seconds * float64(time.Second)))
  245. iq := ring.Slice(start, end)
  246. return iq, sr, center
  247. }