Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

374 line
9.1KB

  1. package recorder
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "sdr-wideband-suite/internal/demod/gpudemod"
  12. "sdr-wideband-suite/internal/detector"
  13. )
  14. type Policy struct {
  15. Enabled bool `yaml:"enabled" json:"enabled"`
  16. MinSNRDb float64 `yaml:"min_snr_db" json:"min_snr_db"`
  17. MinDuration time.Duration `yaml:"min_duration" json:"min_duration"`
  18. MaxDuration time.Duration `yaml:"max_duration" json:"max_duration"`
  19. PrerollMs int `yaml:"preroll_ms" json:"preroll_ms"`
  20. RecordIQ bool `yaml:"record_iq" json:"record_iq"`
  21. RecordAudio bool `yaml:"record_audio" json:"record_audio"`
  22. AutoDemod bool `yaml:"auto_demod" json:"auto_demod"`
  23. AutoDecode bool `yaml:"auto_decode" json:"auto_decode"`
  24. MaxDiskMB int `yaml:"max_disk_mb" json:"max_disk_mb"`
  25. OutputDir string `yaml:"output_dir" json:"output_dir"`
  26. ClassFilter []string `yaml:"class_filter" json:"class_filter"`
  27. RingSeconds int `yaml:"ring_seconds" json:"ring_seconds"`
  28. // Audio quality (AQ-2, AQ-3, AQ-5)
  29. DeemphasisUs float64 `yaml:"deemphasis_us" json:"deemphasis_us"`
  30. ExtractionTaps int `yaml:"extraction_fir_taps" json:"extraction_fir_taps"`
  31. ExtractionBwMult float64 `yaml:"extraction_bw_mult" json:"extraction_bw_mult"`
  32. }
  33. type Manager struct {
  34. mu sync.RWMutex
  35. policy Policy
  36. ring *Ring
  37. sampleRate int
  38. blockSize int
  39. centerHz float64
  40. decodeCommands map[string]string
  41. queue chan detector.Event
  42. gpuDemod *gpudemod.Engine
  43. closed bool
  44. closeOnce sync.Once
  45. workerWG sync.WaitGroup
  46. // Streaming recorder
  47. streamer *Streamer
  48. streamedIDs map[int64]bool // signal IDs that were streamed (skip retroactive recording)
  49. streamedMu sync.Mutex
  50. }
  51. func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) *Manager {
  52. if policy.OutputDir == "" {
  53. policy.OutputDir = "data/recordings"
  54. }
  55. if policy.RingSeconds <= 0 {
  56. policy.RingSeconds = 8
  57. }
  58. m := &Manager{
  59. policy: policy,
  60. ring: NewRing(sampleRate, blockSize, policy.RingSeconds),
  61. sampleRate: sampleRate,
  62. blockSize: blockSize,
  63. centerHz: centerHz,
  64. decodeCommands: decodeCommands,
  65. queue: make(chan detector.Event, 64),
  66. streamer: newStreamer(policy, centerHz),
  67. streamedIDs: make(map[int64]bool),
  68. }
  69. m.initGPUDemod(sampleRate, blockSize)
  70. m.workerWG.Add(1)
  71. go m.worker()
  72. return m
  73. }
  74. func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) {
  75. m.mu.Lock()
  76. defer m.mu.Unlock()
  77. m.policy = policy
  78. m.centerHz = centerHz
  79. m.decodeCommands = decodeCommands
  80. // Only reset ring and GPU engine if sample parameters actually changed
  81. needRingReset := m.sampleRate != sampleRate || m.blockSize != blockSize
  82. m.sampleRate = sampleRate
  83. m.blockSize = blockSize
  84. if needRingReset {
  85. m.initGPUDemodLocked(sampleRate, blockSize)
  86. if m.ring == nil {
  87. m.ring = NewRing(sampleRate, blockSize, policy.RingSeconds)
  88. } else {
  89. m.ring.Reset(sampleRate, blockSize, policy.RingSeconds)
  90. }
  91. } else if m.ring == nil {
  92. m.ring = NewRing(sampleRate, blockSize, policy.RingSeconds)
  93. }
  94. if m.streamer != nil {
  95. m.streamer.updatePolicy(policy, centerHz)
  96. }
  97. }
  98. func (m *Manager) Ingest(t0 time.Time, samples []complex64) {
  99. if m == nil {
  100. return
  101. }
  102. m.mu.RLock()
  103. ring := m.ring
  104. m.mu.RUnlock()
  105. if ring == nil {
  106. return
  107. }
  108. ring.Push(t0, samples)
  109. }
  110. func (m *Manager) OnEvents(events []detector.Event) {
  111. if m == nil || len(events) == 0 {
  112. return
  113. }
  114. m.mu.RLock()
  115. enabled := m.policy.Enabled
  116. closed := m.closed
  117. m.mu.RUnlock()
  118. if !enabled || closed {
  119. return
  120. }
  121. for _, ev := range events {
  122. select {
  123. case m.queue <- ev:
  124. default:
  125. // drop if queue full
  126. }
  127. }
  128. }
  129. func (m *Manager) worker() {
  130. defer m.workerWG.Done()
  131. for ev := range m.queue {
  132. _ = m.recordEvent(ev)
  133. }
  134. }
  135. func (m *Manager) initGPUDemod(sampleRate int, blockSize int) {
  136. m.mu.Lock()
  137. defer m.mu.Unlock()
  138. m.initGPUDemodLocked(sampleRate, blockSize)
  139. }
  140. func (m *Manager) gpuEngine() *gpudemod.Engine {
  141. m.mu.RLock()
  142. defer m.mu.RUnlock()
  143. return m.gpuDemod
  144. }
  145. func (m *Manager) initGPUDemodLocked(sampleRate int, blockSize int) {
  146. if m.gpuDemod != nil {
  147. m.gpuDemod.Close()
  148. m.gpuDemod = nil
  149. }
  150. if !gpudemod.Available() {
  151. return
  152. }
  153. eng, err := gpudemod.New(blockSize, sampleRate)
  154. if err != nil {
  155. return
  156. }
  157. m.gpuDemod = eng
  158. }
  159. func (m *Manager) Close() {
  160. if m == nil {
  161. return
  162. }
  163. m.closeOnce.Do(func() {
  164. // Close all active streaming sessions first
  165. if m.streamer != nil {
  166. m.streamer.CloseAll()
  167. }
  168. m.mu.Lock()
  169. m.closed = true
  170. if m.queue != nil {
  171. close(m.queue)
  172. }
  173. gpu := m.gpuDemod
  174. m.gpuDemod = nil
  175. m.mu.Unlock()
  176. m.workerWG.Wait()
  177. if gpu != nil {
  178. gpu.Close()
  179. }
  180. })
  181. }
  182. func (m *Manager) recordEvent(ev detector.Event) error {
  183. // Skip events that were already recorded via streaming
  184. m.streamedMu.Lock()
  185. wasStreamed := m.streamedIDs[ev.ID]
  186. delete(m.streamedIDs, ev.ID) // clean up — event is finished
  187. m.streamedMu.Unlock()
  188. if wasStreamed {
  189. log.Printf("STREAM: skipping retroactive recording for signal %d (already streamed)", ev.ID)
  190. return nil
  191. }
  192. m.mu.RLock()
  193. policy := m.policy
  194. ring := m.ring
  195. sampleRate := m.sampleRate
  196. centerHz := m.centerHz
  197. m.mu.RUnlock()
  198. if !policy.Enabled {
  199. return nil
  200. }
  201. if ev.SNRDb < policy.MinSNRDb {
  202. return nil
  203. }
  204. dur := ev.End.Sub(ev.Start)
  205. if policy.MinDuration > 0 && dur < policy.MinDuration {
  206. return nil
  207. }
  208. if policy.MaxDuration > 0 && dur > policy.MaxDuration {
  209. return nil
  210. }
  211. if len(policy.ClassFilter) > 0 && ev.Class != nil {
  212. match := false
  213. for _, c := range policy.ClassFilter {
  214. if strings.EqualFold(c, string(ev.Class.ModType)) {
  215. match = true
  216. break
  217. }
  218. }
  219. if !match {
  220. return nil
  221. }
  222. }
  223. if !policy.RecordIQ && !policy.RecordAudio {
  224. return nil
  225. }
  226. start := ev.Start.Add(-time.Duration(policy.PrerollMs) * time.Millisecond)
  227. end := ev.End
  228. if start.After(end) {
  229. return errors.New("invalid event window")
  230. }
  231. if ring == nil {
  232. return errors.New("no ring buffer")
  233. }
  234. segment := ring.Slice(start, end)
  235. if len(segment) == 0 {
  236. return errors.New("no iq in ring")
  237. }
  238. dir := filepath.Join(policy.OutputDir, fmt.Sprintf("%s_%0.fHz_evt%d", ev.Start.Format("2006-01-02T15-04-05"), ev.CenterHz, ev.ID))
  239. if err := os.MkdirAll(dir, 0o755); err != nil {
  240. return err
  241. }
  242. files := map[string]any{}
  243. var iqPath string
  244. if policy.RecordIQ {
  245. iqPath = filepath.Join(dir, "signal.cf32")
  246. if err := writeCF32(iqPath, segment); err != nil {
  247. return err
  248. }
  249. files["iq"] = "signal.cf32"
  250. files["iq_format"] = "cf32"
  251. files["iq_sample_rate"] = sampleRate
  252. }
  253. if policy.RecordAudio && policy.AutoDemod && ev.Class != nil {
  254. if err := m.demodAndWrite(dir, ev, segment, files); err != nil {
  255. return err
  256. }
  257. }
  258. if policy.AutoDecode && iqPath != "" && ev.Class != nil {
  259. m.runDecodeIfConfigured(string(ev.Class.ModType), iqPath, sampleRate, files, dir)
  260. }
  261. _ = centerHz
  262. return writeMeta(dir, ev, sampleRate, files)
  263. }
  264. // SliceRecent returns the most recent `seconds` of raw IQ from the ring buffer.
  265. // Returns the IQ samples, sample rate, and center frequency.
  266. func (m *Manager) SliceRecent(seconds float64) ([]complex64, int, float64) {
  267. if m == nil {
  268. return nil, 0, 0
  269. }
  270. m.mu.RLock()
  271. ring := m.ring
  272. sr := m.sampleRate
  273. center := m.centerHz
  274. m.mu.RUnlock()
  275. if ring == nil || sr <= 0 {
  276. return nil, 0, 0
  277. }
  278. end := time.Now()
  279. start := end.Add(-time.Duration(seconds * float64(time.Second)))
  280. iq := ring.Slice(start, end)
  281. return iq, sr, center
  282. }
  283. // FeedSnippets is called once per DSP frame with pre-extracted IQ snippets
  284. // (GPU-accelerated FreqShift+FIR+Decimate). The Streamer handles demod with
  285. // persistent state (overlap-save, stereo decode, de-emphasis) asynchronously.
  286. func (m *Manager) FeedSnippets(items []StreamFeedItem) {
  287. if m == nil || m.streamer == nil || len(items) == 0 {
  288. return
  289. }
  290. m.mu.RLock()
  291. closed := m.closed
  292. m.mu.RUnlock()
  293. if closed {
  294. return
  295. }
  296. // Mark all signal IDs so recordEvent skips them
  297. m.streamedMu.Lock()
  298. for _, item := range items {
  299. if item.Signal.ID != 0 {
  300. m.streamedIDs[item.Signal.ID] = true
  301. }
  302. }
  303. m.streamedMu.Unlock()
  304. // Convert to internal type
  305. internal := make([]streamFeedItem, len(items))
  306. for i, item := range items {
  307. internal[i] = streamFeedItem{
  308. signal: item.Signal,
  309. snippet: item.Snippet,
  310. snipRate: item.SnipRate,
  311. }
  312. }
  313. m.streamer.FeedSnippets(internal)
  314. }
  315. // StreamFeedItem is the public type for passing extracted snippets from DSP loop.
  316. type StreamFeedItem struct {
  317. Signal detector.Signal
  318. Snippet []complex64
  319. SnipRate int
  320. }
  321. // Streamer returns the underlying Streamer for live-listen subscriptions.
  322. func (m *Manager) StreamerRef() *Streamer {
  323. if m == nil {
  324. return nil
  325. }
  326. return m.streamer
  327. }
  328. // ActiveStreams returns info about currently active streaming sessions.
  329. func (m *Manager) ActiveStreams() int {
  330. if m == nil || m.streamer == nil {
  331. return 0
  332. }
  333. return m.streamer.ActiveSessions()
  334. }
  335. // HasListeners returns true if any live-listen subscribers are active or pending.
  336. func (m *Manager) HasListeners() bool {
  337. if m == nil || m.streamer == nil {
  338. return false
  339. }
  340. return m.streamer.HasListeners()
  341. }