Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

434 рядки
14KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "log"
  6. "math"
  7. "os"
  8. "runtime/debug"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "sdr-wideband-suite/internal/classifier"
  14. "sdr-wideband-suite/internal/config"
  15. "sdr-wideband-suite/internal/demod"
  16. "sdr-wideband-suite/internal/detector"
  17. "sdr-wideband-suite/internal/dsp"
  18. fftutil "sdr-wideband-suite/internal/fft"
  19. "sdr-wideband-suite/internal/fft/gpufft"
  20. "sdr-wideband-suite/internal/rds"
  21. "sdr-wideband-suite/internal/recorder"
  22. "sdr-wideband-suite/internal/pipeline"
  23. )
  24. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot, extractMgr *extractionManager) {
  25. defer func() {
  26. if r := recover(); r != nil {
  27. log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack())
  28. }
  29. }()
  30. ticker := time.NewTicker(cfg.FrameInterval())
  31. defer ticker.Stop()
  32. logTicker := time.NewTicker(5 * time.Second)
  33. defer logTicker.Stop()
  34. enc := json.NewEncoder(eventFile)
  35. dcBlocker := dsp.NewDCBlocker(0.995)
  36. dcEnabled := cfg.DCBlock
  37. iqEnabled := cfg.IQBalance
  38. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  39. useGPU := cfg.UseGPUFFT
  40. // Persistent RDS decoders per signal — async ring-buffer based
  41. type rdsState struct {
  42. dec rds.Decoder
  43. result rds.Result
  44. lastDecode time.Time
  45. busy int32 // atomic: 1 = goroutine running
  46. mu sync.Mutex
  47. }
  48. rdsMap := map[int64]*rdsState{}
  49. // Streaming extraction state: per-signal phase + IQ overlap for FIR halo
  50. streamPhaseState := map[int64]*streamExtractState{}
  51. streamOverlap := &streamIQOverlap{}
  52. var gpuEngine *gpufft.Engine
  53. if useGPU && gpuState != nil {
  54. snap := gpuState.snapshot()
  55. if snap.Available {
  56. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  57. gpuEngine = eng
  58. gpuState.set(true, nil)
  59. } else {
  60. gpuState.set(false, err)
  61. useGPU = false
  62. }
  63. } else {
  64. gpuState.set(false, nil)
  65. useGPU = false
  66. }
  67. } else if gpuState != nil {
  68. gpuState.set(false, nil)
  69. }
  70. gotSamples := false
  71. for {
  72. select {
  73. case <-ctx.Done():
  74. return
  75. case <-logTicker.C:
  76. st := srcMgr.Stats()
  77. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  78. case upd := <-updates:
  79. prevFFT := cfg.FFTSize
  80. prevUseGPU := useGPU
  81. cfg = upd.cfg
  82. if rec != nil {
  83. rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  84. Enabled: cfg.Recorder.Enabled,
  85. MinSNRDb: cfg.Recorder.MinSNRDb,
  86. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  87. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  88. PrerollMs: cfg.Recorder.PrerollMs,
  89. RecordIQ: cfg.Recorder.RecordIQ,
  90. RecordAudio: cfg.Recorder.RecordAudio,
  91. AutoDemod: cfg.Recorder.AutoDemod,
  92. AutoDecode: cfg.Recorder.AutoDecode,
  93. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  94. OutputDir: cfg.Recorder.OutputDir,
  95. ClassFilter: cfg.Recorder.ClassFilter,
  96. RingSeconds: cfg.Recorder.RingSeconds,
  97. DeemphasisUs: cfg.Recorder.DeemphasisUs,
  98. ExtractionTaps: cfg.Recorder.ExtractionTaps,
  99. ExtractionBwMult: cfg.Recorder.ExtractionBwMult,
  100. }, cfg.CenterHz, buildDecoderMap(cfg))
  101. }
  102. if upd.det != nil {
  103. det = upd.det
  104. }
  105. if upd.window != nil {
  106. window = upd.window
  107. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  108. }
  109. dcEnabled = upd.dcBlock
  110. iqEnabled = upd.iqBalance
  111. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  112. srcMgr.Flush()
  113. gotSamples = false
  114. if gpuEngine != nil {
  115. gpuEngine.Close()
  116. gpuEngine = nil
  117. }
  118. useGPU = cfg.UseGPUFFT
  119. if useGPU && gpuState != nil {
  120. snap := gpuState.snapshot()
  121. if snap.Available {
  122. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  123. gpuEngine = eng
  124. gpuState.set(true, nil)
  125. } else {
  126. gpuState.set(false, err)
  127. useGPU = false
  128. }
  129. } else {
  130. gpuState.set(false, nil)
  131. useGPU = false
  132. }
  133. } else if gpuState != nil {
  134. gpuState.set(false, nil)
  135. }
  136. }
  137. dcBlocker.Reset()
  138. ticker.Reset(cfg.FrameInterval())
  139. case <-ticker.C:
  140. // Pipeline phases:
  141. // 1) ingest IQ
  142. // 2) build surveillance spectrum
  143. // 3) detect coarse candidates
  144. // 4) refine locally with IQ snippets + classification
  145. // 5) update tracking / events / recorder / presentation
  146. // Read all available IQ data — not just one FFT block.
  147. // This ensures the ring buffer captures 100% of IQ for recording/demod.
  148. available := cfg.FFTSize
  149. st := srcMgr.Stats()
  150. if st.BufferSamples > cfg.FFTSize {
  151. // Round down to multiple of FFTSize for clean processing
  152. available = (st.BufferSamples / cfg.FFTSize) * cfg.FFTSize
  153. if available < cfg.FFTSize {
  154. available = cfg.FFTSize
  155. }
  156. }
  157. allIQ, err := srcMgr.ReadIQ(available)
  158. if err != nil {
  159. log.Printf("read IQ: %v", err)
  160. if strings.Contains(err.Error(), "timeout") {
  161. if err := srcMgr.Restart(cfg); err != nil {
  162. log.Printf("restart failed: %v", err)
  163. }
  164. }
  165. continue
  166. }
  167. // Ingest ALL IQ data into the ring buffer for recording
  168. if rec != nil {
  169. rec.Ingest(time.Now(), allIQ)
  170. }
  171. // Use only the last FFT block for spectrum display
  172. iq := allIQ
  173. if len(allIQ) > cfg.FFTSize {
  174. iq = allIQ[len(allIQ)-cfg.FFTSize:]
  175. }
  176. if !gotSamples {
  177. log.Printf("received IQ samples")
  178. gotSamples = true
  179. }
  180. if dcEnabled {
  181. dcBlocker.Apply(iq)
  182. }
  183. if iqEnabled {
  184. dsp.IQBalance(iq)
  185. }
  186. var spectrum []float64
  187. if useGPU && gpuEngine != nil {
  188. // GPU FFT: apply window to a COPY — allIQ must stay unmodified
  189. // for extractForStreaming which needs raw IQ for signal extraction.
  190. gpuBuf := make([]complex64, len(iq))
  191. if len(window) == len(iq) {
  192. for i := 0; i < len(iq); i++ {
  193. v := iq[i]
  194. w := float32(window[i])
  195. gpuBuf[i] = complex(real(v)*w, imag(v)*w)
  196. }
  197. } else {
  198. copy(gpuBuf, iq)
  199. }
  200. out, err := gpuEngine.Exec(gpuBuf)
  201. if err != nil {
  202. if gpuState != nil {
  203. gpuState.set(false, err)
  204. }
  205. useGPU = false
  206. spectrum = fftutil.SpectrumWithPlan(gpuBuf, nil, plan)
  207. } else {
  208. spectrum = fftutil.SpectrumFromFFT(out)
  209. }
  210. } else {
  211. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  212. }
  213. for i := range spectrum {
  214. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  215. spectrum[i] = -200
  216. }
  217. }
  218. now := time.Now()
  219. finished, detectedSignals := det.Process(now, spectrum, cfg.CenterHz)
  220. candidates := pipeline.CandidatesFromSignals(detectedSignals, "surveillance-detector")
  221. thresholds := det.LastThresholds()
  222. noiseFloor := det.LastNoiseFloor()
  223. var displaySignals []detector.Signal
  224. if len(iq) > 0 {
  225. snips, snipRates := extractSignalIQBatch(extractMgr, iq, cfg.SampleRate, cfg.CenterHz, detectedSignals)
  226. refined := pipeline.RefineCandidates(candidates, spectrum, cfg.SampleRate, cfg.FFTSize, snips, snipRates, classifier.ClassifierMode(cfg.ClassifierMode))
  227. signals := make([]detector.Signal, 0, len(refined))
  228. for i, ref := range refined {
  229. sig := ref.Signal
  230. signals = append(signals, sig)
  231. cls := sig.Class
  232. snipRate := ref.SnippetRate
  233. if cls != nil {
  234. pll := classifier.PLLResult{}
  235. if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
  236. pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
  237. cls.PLL = &pll
  238. signals[i].PLL = &pll
  239. if cls.ModType == classifier.ClassWFM && pll.Stereo {
  240. cls.ModType = classifier.ClassWFMStereo
  241. }
  242. }
  243. // RDS decode for WFM — async, uses ring buffer for continuous IQ
  244. if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
  245. keyHz := pll.ExactHz
  246. if keyHz == 0 {
  247. keyHz = signals[i].CenterHz
  248. }
  249. key := int64(math.Round(keyHz / 25000.0))
  250. st := rdsMap[key]
  251. if st == nil {
  252. st = &rdsState{}
  253. rdsMap[key] = st
  254. }
  255. // Launch async decode every 4 seconds, skip if previous still running
  256. if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
  257. st.lastDecode = now
  258. atomic.StoreInt32(&st.busy, 1)
  259. go func(st *rdsState, sigHz float64) {
  260. defer atomic.StoreInt32(&st.busy, 0)
  261. ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
  262. if len(ringIQ) < ringSR || ringSR <= 0 {
  263. return
  264. }
  265. // Shift FM station to center
  266. offset := sigHz - ringCenter
  267. shifted := dsp.FreqShift(ringIQ, ringSR, offset)
  268. // Two-stage decimation to ~250kHz with proper anti-alias
  269. // Stage 1: 4MHz → 1MHz (decim 4), LP at 400kHz
  270. decim1 := ringSR / 1000000
  271. if decim1 < 1 {
  272. decim1 = 1
  273. }
  274. lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
  275. f1 := dsp.ApplyFIR(shifted, lp1)
  276. d1 := dsp.Decimate(f1, decim1)
  277. rate1 := ringSR / decim1
  278. // Stage 2: 1MHz → 250kHz (decim 4), LP at 100kHz
  279. decim2 := rate1 / 250000
  280. if decim2 < 1 {
  281. decim2 = 1
  282. }
  283. lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
  284. f2 := dsp.ApplyFIR(d1, lp2)
  285. decimated := dsp.Decimate(f2, decim2)
  286. actualRate := rate1 / decim2
  287. // RDS baseband extraction on the clean decimated block
  288. rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
  289. if len(rdsBase.Samples) == 0 {
  290. return
  291. }
  292. st.mu.Lock()
  293. result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
  294. diag := st.dec.LastDiag
  295. if result.PS != "" {
  296. st.result = result
  297. }
  298. st.mu.Unlock()
  299. log.Printf("RDS TRACE: ring decode freq=%.1fMHz decIQ=%d decSR=%d bbLen=%d bbRate=%d PI=%04X PS=%q %s",
  300. sigHz/1e6, len(decimated), actualRate, len(rdsBase.Samples), rdsBase.SampleRate,
  301. result.PI, result.PS, diag)
  302. if result.PS != "" {
  303. log.Printf("RDS decoded: PI=%04X PS=%q RT=%q freq=%.1fMHz", result.PI, result.PS, result.RT, sigHz/1e6)
  304. }
  305. }(st, signals[i].CenterHz)
  306. }
  307. // Read last known result (lock-free for display)
  308. st.mu.Lock()
  309. ps := st.result.PS
  310. st.mu.Unlock()
  311. if ps != "" {
  312. pll.RDSStation = strings.TrimSpace(ps)
  313. cls.PLL = &pll
  314. signals[i].PLL = &pll
  315. }
  316. }
  317. }
  318. }
  319. det.UpdateClasses(signals)
  320. // Cleanup RDS accumulators for signals that no longer exist
  321. if len(rdsMap) > 0 {
  322. activeIDs := make(map[int64]bool, len(signals))
  323. for _, s := range signals {
  324. keyHz := s.CenterHz
  325. if s.PLL != nil && s.PLL.ExactHz != 0 {
  326. keyHz = s.PLL.ExactHz
  327. }
  328. activeIDs[int64(math.Round(keyHz/25000.0))] = true
  329. }
  330. for id := range rdsMap {
  331. if !activeIDs[id] {
  332. delete(rdsMap, id)
  333. }
  334. }
  335. }
  336. // Cleanup streamPhaseState for disappeared signals
  337. if len(streamPhaseState) > 0 {
  338. sigIDs := make(map[int64]bool, len(signals))
  339. for _, s := range signals {
  340. sigIDs[s.ID] = true
  341. }
  342. for id := range streamPhaseState {
  343. if !sigIDs[id] {
  344. delete(streamPhaseState, id)
  345. }
  346. }
  347. }
  348. // GPU-extract signal snippets with phase-continuous FreqShift and
  349. // IQ overlap for FIR halo. Heavy work on GPU, only demod runs async.
  350. displaySignals = det.StableSignals()
  351. if rec != nil && len(displaySignals) > 0 && len(allIQ) > 0 {
  352. aqCfg := extractionConfig{
  353. firTaps: cfg.Recorder.ExtractionTaps,
  354. bwMult: cfg.Recorder.ExtractionBwMult,
  355. }
  356. streamSnips, streamRates := extractForStreaming(extractMgr, allIQ, cfg.SampleRate, cfg.CenterHz, displaySignals, streamPhaseState, streamOverlap, aqCfg)
  357. items := make([]recorder.StreamFeedItem, 0, len(displaySignals))
  358. for j, ds := range displaySignals {
  359. if ds.ID == 0 || ds.Class == nil {
  360. continue
  361. }
  362. if j >= len(streamSnips) || len(streamSnips[j]) == 0 {
  363. continue
  364. }
  365. snipRate := cfg.SampleRate
  366. if j < len(streamRates) && streamRates[j] > 0 {
  367. snipRate = streamRates[j]
  368. }
  369. items = append(items, recorder.StreamFeedItem{
  370. Signal: ds,
  371. Snippet: streamSnips[j],
  372. SnipRate: snipRate,
  373. })
  374. }
  375. if len(items) > 0 {
  376. rec.FeedSnippets(items)
  377. }
  378. }
  379. } else {
  380. // No IQ data this frame — still need displaySignals for broadcast
  381. displaySignals = det.StableSignals()
  382. }
  383. if sigSnap != nil {
  384. sigSnap.set(displaySignals)
  385. }
  386. eventMu.Lock()
  387. for _, ev := range finished {
  388. _ = enc.Encode(ev)
  389. }
  390. eventMu.Unlock()
  391. if rec != nil && len(finished) > 0 {
  392. evCopy := make([]detector.Event, len(finished))
  393. copy(evCopy, finished)
  394. rec.OnEvents(evCopy)
  395. }
  396. var debugInfo *SpectrumDebug
  397. if len(thresholds) > 0 || len(displaySignals) > 0 || noiseFloor != 0 {
  398. scoreDebug := make([]map[string]any, 0, len(displaySignals))
  399. for _, s := range displaySignals {
  400. if s.Class == nil || len(s.Class.Scores) == 0 {
  401. scoreDebug = append(scoreDebug, map[string]any{"center_hz": s.CenterHz, "class": nil})
  402. continue
  403. }
  404. scores := make(map[string]float64, len(s.Class.Scores))
  405. for k, v := range s.Class.Scores {
  406. scores[string(k)] = v
  407. }
  408. scoreDebug = append(scoreDebug, map[string]any{
  409. "center_hz": s.CenterHz,
  410. "mod_type": s.Class.ModType,
  411. "confidence": s.Class.Confidence,
  412. "second_best": s.Class.SecondBest,
  413. "scores": scores,
  414. })
  415. }
  416. debugInfo = &SpectrumDebug{Thresholds: thresholds, NoiseFloor: noiseFloor, Scores: scoreDebug}
  417. }
  418. h.broadcast(SpectrumFrame{Timestamp: now.UnixMilli(), CenterHz: cfg.CenterHz, SampleHz: cfg.SampleRate, FFTSize: cfg.FFTSize, Spectrum: spectrum, Signals: displaySignals, Debug: debugInfo})
  419. }
  420. }
  421. }