Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

382 lignes
9.4KB

  1. package recorder
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "sdr-wideband-suite/internal/demod/gpudemod"
  12. "sdr-wideband-suite/internal/detector"
  13. )
  14. type Policy struct {
  15. Enabled bool `yaml:"enabled" json:"enabled"`
  16. MinSNRDb float64 `yaml:"min_snr_db" json:"min_snr_db"`
  17. MinDuration time.Duration `yaml:"min_duration" json:"min_duration"`
  18. MaxDuration time.Duration `yaml:"max_duration" json:"max_duration"`
  19. PrerollMs int `yaml:"preroll_ms" json:"preroll_ms"`
  20. RecordIQ bool `yaml:"record_iq" json:"record_iq"`
  21. RecordAudio bool `yaml:"record_audio" json:"record_audio"`
  22. AutoDemod bool `yaml:"auto_demod" json:"auto_demod"`
  23. AutoDecode bool `yaml:"auto_decode" json:"auto_decode"`
  24. MaxDiskMB int `yaml:"max_disk_mb" json:"max_disk_mb"`
  25. OutputDir string `yaml:"output_dir" json:"output_dir"`
  26. ClassFilter []string `yaml:"class_filter" json:"class_filter"`
  27. RingSeconds int `yaml:"ring_seconds" json:"ring_seconds"`
  28. // Audio quality (AQ-2, AQ-3, AQ-5)
  29. DeemphasisUs float64 `yaml:"deemphasis_us" json:"deemphasis_us"`
  30. ExtractionTaps int `yaml:"extraction_fir_taps" json:"extraction_fir_taps"`
  31. ExtractionBwMult float64 `yaml:"extraction_bw_mult" json:"extraction_bw_mult"`
  32. DebugLiveAudio bool `yaml:"debug_live_audio" json:"debug_live_audio"`
  33. }
  34. type Manager struct {
  35. mu sync.RWMutex
  36. policy Policy
  37. ring *Ring
  38. sampleRate int
  39. blockSize int
  40. centerHz float64
  41. decodeCommands map[string]string
  42. queue chan detector.Event
  43. gpuDemod *gpudemod.Engine
  44. closed bool
  45. closeOnce sync.Once
  46. workerWG sync.WaitGroup
  47. // Streaming recorder
  48. streamer *Streamer
  49. streamedIDs map[int64]bool // signal IDs that were streamed (skip retroactive recording)
  50. streamedMu sync.Mutex
  51. }
  52. func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) *Manager {
  53. if policy.OutputDir == "" {
  54. policy.OutputDir = "data/recordings"
  55. }
  56. if policy.RingSeconds <= 0 {
  57. policy.RingSeconds = 8
  58. }
  59. m := &Manager{
  60. policy: policy,
  61. ring: NewRing(sampleRate, blockSize, policy.RingSeconds),
  62. sampleRate: sampleRate,
  63. blockSize: blockSize,
  64. centerHz: centerHz,
  65. decodeCommands: decodeCommands,
  66. queue: make(chan detector.Event, 64),
  67. streamer: newStreamer(policy, centerHz),
  68. streamedIDs: make(map[int64]bool),
  69. }
  70. m.initGPUDemod(sampleRate, blockSize)
  71. m.workerWG.Add(1)
  72. go m.worker()
  73. return m
  74. }
  75. func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) {
  76. m.mu.Lock()
  77. defer m.mu.Unlock()
  78. m.policy = policy
  79. m.centerHz = centerHz
  80. m.decodeCommands = decodeCommands
  81. // Only reset ring and GPU engine if sample parameters actually changed
  82. needRingReset := m.sampleRate != sampleRate || m.blockSize != blockSize
  83. m.sampleRate = sampleRate
  84. m.blockSize = blockSize
  85. if needRingReset {
  86. m.initGPUDemodLocked(sampleRate, blockSize)
  87. if m.ring == nil {
  88. m.ring = NewRing(sampleRate, blockSize, policy.RingSeconds)
  89. } else {
  90. m.ring.Reset(sampleRate, blockSize, policy.RingSeconds)
  91. }
  92. } else if m.ring == nil {
  93. m.ring = NewRing(sampleRate, blockSize, policy.RingSeconds)
  94. }
  95. if m.streamer != nil {
  96. m.streamer.updatePolicy(policy, centerHz)
  97. }
  98. }
  99. func (m *Manager) Ingest(t0 time.Time, samples []complex64) {
  100. if m == nil {
  101. return
  102. }
  103. m.mu.RLock()
  104. ring := m.ring
  105. m.mu.RUnlock()
  106. if ring == nil {
  107. return
  108. }
  109. ring.Push(t0, samples)
  110. }
  111. func (m *Manager) OnEvents(events []detector.Event) {
  112. if m == nil || len(events) == 0 {
  113. return
  114. }
  115. m.mu.RLock()
  116. enabled := m.policy.Enabled
  117. closed := m.closed
  118. m.mu.RUnlock()
  119. if !enabled || closed {
  120. return
  121. }
  122. for _, ev := range events {
  123. select {
  124. case m.queue <- ev:
  125. default:
  126. // drop if queue full
  127. }
  128. }
  129. }
  130. func (m *Manager) worker() {
  131. defer m.workerWG.Done()
  132. for ev := range m.queue {
  133. _ = m.recordEvent(ev)
  134. }
  135. }
  136. func (m *Manager) initGPUDemod(sampleRate int, blockSize int) {
  137. m.mu.Lock()
  138. defer m.mu.Unlock()
  139. m.initGPUDemodLocked(sampleRate, blockSize)
  140. }
  141. func (m *Manager) gpuEngine() *gpudemod.Engine {
  142. m.mu.RLock()
  143. defer m.mu.RUnlock()
  144. return m.gpuDemod
  145. }
  146. func (m *Manager) initGPUDemodLocked(sampleRate int, blockSize int) {
  147. if m.gpuDemod != nil {
  148. m.gpuDemod.Close()
  149. m.gpuDemod = nil
  150. }
  151. if !gpudemod.Available() {
  152. return
  153. }
  154. eng, err := gpudemod.New(blockSize, sampleRate)
  155. if err != nil {
  156. return
  157. }
  158. m.gpuDemod = eng
  159. }
  160. func (m *Manager) Close() {
  161. if m == nil {
  162. return
  163. }
  164. m.closeOnce.Do(func() {
  165. // Close all active streaming sessions first
  166. if m.streamer != nil {
  167. m.streamer.CloseAll()
  168. }
  169. m.mu.Lock()
  170. m.closed = true
  171. if m.queue != nil {
  172. close(m.queue)
  173. }
  174. gpu := m.gpuDemod
  175. m.gpuDemod = nil
  176. m.mu.Unlock()
  177. m.workerWG.Wait()
  178. if gpu != nil {
  179. gpu.Close()
  180. }
  181. })
  182. }
  183. func (m *Manager) recordEvent(ev detector.Event) error {
  184. // Skip events that were already recorded via streaming
  185. m.streamedMu.Lock()
  186. wasStreamed := m.streamedIDs[ev.ID]
  187. delete(m.streamedIDs, ev.ID) // clean up — event is finished
  188. m.streamedMu.Unlock()
  189. if wasStreamed {
  190. log.Printf("STREAM: skipping retroactive recording for signal %d (already streamed)", ev.ID)
  191. return nil
  192. }
  193. m.mu.RLock()
  194. policy := m.policy
  195. ring := m.ring
  196. sampleRate := m.sampleRate
  197. centerHz := m.centerHz
  198. m.mu.RUnlock()
  199. if !policy.Enabled {
  200. return nil
  201. }
  202. if ev.SNRDb < policy.MinSNRDb {
  203. return nil
  204. }
  205. dur := ev.End.Sub(ev.Start)
  206. if policy.MinDuration > 0 && dur < policy.MinDuration {
  207. return nil
  208. }
  209. if policy.MaxDuration > 0 && dur > policy.MaxDuration {
  210. return nil
  211. }
  212. if len(policy.ClassFilter) > 0 && ev.Class != nil {
  213. match := false
  214. for _, c := range policy.ClassFilter {
  215. if strings.EqualFold(c, string(ev.Class.ModType)) {
  216. match = true
  217. break
  218. }
  219. }
  220. if !match {
  221. return nil
  222. }
  223. }
  224. if !policy.RecordIQ && !policy.RecordAudio {
  225. return nil
  226. }
  227. start := ev.Start.Add(-time.Duration(policy.PrerollMs) * time.Millisecond)
  228. end := ev.End
  229. if start.After(end) {
  230. return errors.New("invalid event window")
  231. }
  232. if ring == nil {
  233. return errors.New("no ring buffer")
  234. }
  235. segment := ring.Slice(start, end)
  236. if len(segment) == 0 {
  237. return errors.New("no iq in ring")
  238. }
  239. dir := filepath.Join(policy.OutputDir, fmt.Sprintf("%s_%0.fHz_evt%d", ev.Start.Format("2006-01-02T15-04-05"), ev.CenterHz, ev.ID))
  240. if err := os.MkdirAll(dir, 0o755); err != nil {
  241. return err
  242. }
  243. files := map[string]any{}
  244. var iqPath string
  245. if policy.RecordIQ {
  246. iqPath = filepath.Join(dir, "signal.cf32")
  247. if err := writeCF32(iqPath, segment); err != nil {
  248. return err
  249. }
  250. files["iq"] = "signal.cf32"
  251. files["iq_format"] = "cf32"
  252. files["iq_sample_rate"] = sampleRate
  253. }
  254. if policy.RecordAudio && policy.AutoDemod && ev.Class != nil {
  255. if err := m.demodAndWrite(dir, ev, segment, files); err != nil {
  256. return err
  257. }
  258. }
  259. if policy.AutoDecode && iqPath != "" && ev.Class != nil {
  260. m.runDecodeIfConfigured(string(ev.Class.ModType), iqPath, sampleRate, files, dir)
  261. }
  262. _ = centerHz
  263. return writeMeta(dir, ev, sampleRate, files)
  264. }
  265. // SliceRecent returns the most recent `seconds` of raw IQ from the ring buffer.
  266. // Returns the IQ samples, sample rate, and center frequency.
  267. func (m *Manager) SliceRecent(seconds float64) ([]complex64, int, float64) {
  268. if m == nil {
  269. return nil, 0, 0
  270. }
  271. m.mu.RLock()
  272. ring := m.ring
  273. sr := m.sampleRate
  274. center := m.centerHz
  275. m.mu.RUnlock()
  276. if ring == nil || sr <= 0 {
  277. return nil, 0, 0
  278. }
  279. end := time.Now()
  280. start := end.Add(-time.Duration(seconds * float64(time.Second)))
  281. iq := ring.Slice(start, end)
  282. return iq, sr, center
  283. }
  284. // FeedSnippets is called once per DSP frame with pre-extracted IQ snippets
  285. // (GPU-accelerated FreqShift+FIR+Decimate). The Streamer handles demod with
  286. // persistent state (overlap-save, stereo decode, de-emphasis) asynchronously.
  287. func (m *Manager) FeedSnippets(items []StreamFeedItem) {
  288. if m == nil || m.streamer == nil || len(items) == 0 {
  289. return
  290. }
  291. m.mu.RLock()
  292. closed := m.closed
  293. m.mu.RUnlock()
  294. if closed {
  295. return
  296. }
  297. // Mark all signal IDs so recordEvent skips them
  298. m.streamedMu.Lock()
  299. for _, item := range items {
  300. if item.Signal.ID != 0 {
  301. m.streamedIDs[item.Signal.ID] = true
  302. }
  303. }
  304. m.streamedMu.Unlock()
  305. // Convert to internal type
  306. internal := make([]streamFeedItem, len(items))
  307. for i, item := range items {
  308. internal[i] = streamFeedItem{
  309. signal: item.Signal,
  310. snippet: item.Snippet,
  311. snipRate: item.SnipRate,
  312. }
  313. }
  314. m.streamer.FeedSnippets(internal)
  315. }
  316. // StreamFeedItem is the public type for passing extracted snippets from DSP loop.
  317. type StreamFeedItem struct {
  318. Signal detector.Signal
  319. Snippet []complex64
  320. SnipRate int
  321. }
  322. // Streamer returns the underlying Streamer for live-listen subscriptions.
  323. func (m *Manager) StreamerRef() *Streamer {
  324. if m == nil {
  325. return nil
  326. }
  327. return m.streamer
  328. }
  329. func (m *Manager) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  330. if m == nil || m.streamer == nil {
  331. return nil
  332. }
  333. return m.streamer.RuntimeInfoBySignalID()
  334. }
  335. // ActiveStreams returns info about currently active streaming sessions.
  336. func (m *Manager) ActiveStreams() int {
  337. if m == nil || m.streamer == nil {
  338. return 0
  339. }
  340. return m.streamer.ActiveSessions()
  341. }
  342. // HasListeners returns true if any live-listen subscribers are active or pending.
  343. func (m *Manager) HasListeners() bool {
  344. if m == nil || m.streamer == nil {
  345. return false
  346. }
  347. return m.streamer.HasListeners()
  348. }