Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

467 строки
14KB

  1. package main
  2. import (
  3. "math"
  4. "strings"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "sdr-wideband-suite/internal/classifier"
  9. "sdr-wideband-suite/internal/config"
  10. "sdr-wideband-suite/internal/demod"
  11. "sdr-wideband-suite/internal/detector"
  12. "sdr-wideband-suite/internal/dsp"
  13. fftutil "sdr-wideband-suite/internal/fft"
  14. "sdr-wideband-suite/internal/fft/gpufft"
  15. "sdr-wideband-suite/internal/pipeline"
  16. "sdr-wideband-suite/internal/rds"
  17. "sdr-wideband-suite/internal/recorder"
  18. )
  19. type rdsState struct {
  20. dec rds.Decoder
  21. result rds.Result
  22. lastDecode time.Time
  23. busy int32
  24. mu sync.Mutex
  25. }
  26. type dspRuntime struct {
  27. cfg config.Config
  28. det *detector.Detector
  29. window []float64
  30. plan *fftutil.CmplxPlan
  31. dcEnabled bool
  32. iqEnabled bool
  33. useGPU bool
  34. gpuEngine *gpufft.Engine
  35. rdsMap map[int64]*rdsState
  36. streamPhaseState map[int64]*streamExtractState
  37. streamOverlap *streamIQOverlap
  38. gotSamples bool
  39. }
  40. type spectrumArtifacts struct {
  41. allIQ []complex64
  42. iq []complex64
  43. spectrum []float64
  44. finished []detector.Event
  45. detected []detector.Signal
  46. thresholds []float64
  47. noiseFloor float64
  48. now time.Time
  49. }
  50. func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
  51. rt := &dspRuntime{
  52. cfg: cfg,
  53. det: det,
  54. window: window,
  55. plan: fftutil.NewCmplxPlan(cfg.FFTSize),
  56. dcEnabled: cfg.DCBlock,
  57. iqEnabled: cfg.IQBalance,
  58. useGPU: cfg.UseGPUFFT,
  59. rdsMap: map[int64]*rdsState{},
  60. streamPhaseState: map[int64]*streamExtractState{},
  61. streamOverlap: &streamIQOverlap{},
  62. }
  63. if rt.useGPU && gpuState != nil {
  64. snap := gpuState.snapshot()
  65. if snap.Available {
  66. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  67. rt.gpuEngine = eng
  68. gpuState.set(true, nil)
  69. } else {
  70. gpuState.set(false, err)
  71. rt.useGPU = false
  72. }
  73. }
  74. }
  75. return rt
  76. }
  77. func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
  78. prevFFT := rt.cfg.FFTSize
  79. prevUseGPU := rt.useGPU
  80. rt.cfg = upd.cfg
  81. if rec != nil {
  82. rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
  83. Enabled: rt.cfg.Recorder.Enabled,
  84. MinSNRDb: rt.cfg.Recorder.MinSNRDb,
  85. MinDuration: mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second),
  86. MaxDuration: mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second),
  87. PrerollMs: rt.cfg.Recorder.PrerollMs,
  88. RecordIQ: rt.cfg.Recorder.RecordIQ,
  89. RecordAudio: rt.cfg.Recorder.RecordAudio,
  90. AutoDemod: rt.cfg.Recorder.AutoDemod,
  91. AutoDecode: rt.cfg.Recorder.AutoDecode,
  92. MaxDiskMB: rt.cfg.Recorder.MaxDiskMB,
  93. OutputDir: rt.cfg.Recorder.OutputDir,
  94. ClassFilter: rt.cfg.Recorder.ClassFilter,
  95. RingSeconds: rt.cfg.Recorder.RingSeconds,
  96. DeemphasisUs: rt.cfg.Recorder.DeemphasisUs,
  97. ExtractionTaps: rt.cfg.Recorder.ExtractionTaps,
  98. ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult,
  99. }, rt.cfg.CenterHz, buildDecoderMap(rt.cfg))
  100. }
  101. if upd.det != nil {
  102. rt.det = upd.det
  103. }
  104. if upd.window != nil {
  105. rt.window = upd.window
  106. rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
  107. }
  108. rt.dcEnabled = upd.dcBlock
  109. rt.iqEnabled = upd.iqBalance
  110. if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU {
  111. srcMgr.Flush()
  112. rt.gotSamples = false
  113. if rt.gpuEngine != nil {
  114. rt.gpuEngine.Close()
  115. rt.gpuEngine = nil
  116. }
  117. rt.useGPU = rt.cfg.UseGPUFFT
  118. if rt.useGPU && gpuState != nil {
  119. snap := gpuState.snapshot()
  120. if snap.Available {
  121. if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil {
  122. rt.gpuEngine = eng
  123. gpuState.set(true, nil)
  124. } else {
  125. gpuState.set(false, err)
  126. rt.useGPU = false
  127. }
  128. } else {
  129. gpuState.set(false, nil)
  130. rt.useGPU = false
  131. }
  132. } else if gpuState != nil {
  133. gpuState.set(false, nil)
  134. }
  135. }
  136. }
  137. func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
  138. available := rt.cfg.FFTSize
  139. st := srcMgr.Stats()
  140. if st.BufferSamples > rt.cfg.FFTSize {
  141. available = (st.BufferSamples / rt.cfg.FFTSize) * rt.cfg.FFTSize
  142. if available < rt.cfg.FFTSize {
  143. available = rt.cfg.FFTSize
  144. }
  145. }
  146. allIQ, err := srcMgr.ReadIQ(available)
  147. if err != nil {
  148. return nil, err
  149. }
  150. if rec != nil {
  151. rec.Ingest(time.Now(), allIQ)
  152. }
  153. iq := allIQ
  154. if len(allIQ) > rt.cfg.FFTSize {
  155. iq = allIQ[len(allIQ)-rt.cfg.FFTSize:]
  156. }
  157. if rt.dcEnabled {
  158. dcBlocker.Apply(iq)
  159. }
  160. if rt.iqEnabled {
  161. dsp.IQBalance(iq)
  162. }
  163. var spectrum []float64
  164. if rt.useGPU && rt.gpuEngine != nil {
  165. gpuBuf := make([]complex64, len(iq))
  166. if len(rt.window) == len(iq) {
  167. for i := 0; i < len(iq); i++ {
  168. v := iq[i]
  169. w := float32(rt.window[i])
  170. gpuBuf[i] = complex(real(v)*w, imag(v)*w)
  171. }
  172. } else {
  173. copy(gpuBuf, iq)
  174. }
  175. out, err := rt.gpuEngine.Exec(gpuBuf)
  176. if err != nil {
  177. if gpuState != nil {
  178. gpuState.set(false, err)
  179. }
  180. rt.useGPU = false
  181. spectrum = fftutil.SpectrumWithPlan(gpuBuf, nil, rt.plan)
  182. } else {
  183. spectrum = fftutil.SpectrumFromFFT(out)
  184. }
  185. } else {
  186. spectrum = fftutil.SpectrumWithPlan(iq, rt.window, rt.plan)
  187. }
  188. for i := range spectrum {
  189. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  190. spectrum[i] = -200
  191. }
  192. }
  193. now := time.Now()
  194. finished, detected := rt.det.Process(now, spectrum, rt.cfg.CenterHz)
  195. return &spectrumArtifacts{
  196. allIQ: allIQ,
  197. iq: iq,
  198. spectrum: spectrum,
  199. finished: finished,
  200. detected: detected,
  201. thresholds: rt.det.LastThresholds(),
  202. noiseFloor: rt.det.LastNoiseFloor(),
  203. now: now,
  204. }, nil
  205. }
  206. func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.SurveillanceResult {
  207. if art == nil {
  208. return pipeline.SurveillanceResult{}
  209. }
  210. policy := pipeline.PolicyFromConfig(rt.cfg)
  211. candidates := pipeline.CandidatesFromSignals(art.detected, "surveillance-detector")
  212. scheduled := pipeline.ScheduleCandidates(candidates, policy)
  213. level := pipeline.AnalysisLevel{
  214. Name: "surveillance",
  215. SampleRate: rt.cfg.SampleRate,
  216. FFTSize: rt.cfg.Surveillance.AnalysisFFTSize,
  217. CenterHz: rt.cfg.CenterHz,
  218. SpanHz: float64(rt.cfg.SampleRate),
  219. Source: "baseband",
  220. }
  221. return pipeline.SurveillanceResult{
  222. Level: level,
  223. Candidates: candidates,
  224. Scheduled: scheduled,
  225. Finished: art.finished,
  226. Signals: art.detected,
  227. NoiseFloor: art.noiseFloor,
  228. Thresholds: art.thresholds,
  229. }
  230. }
  231. func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pipeline.RefinementInput {
  232. policy := pipeline.PolicyFromConfig(rt.cfg)
  233. plan := pipeline.BuildRefinementPlan(surv.Candidates, policy)
  234. scheduled := append([]pipeline.ScheduledCandidate(nil), surv.Scheduled...)
  235. if len(scheduled) == 0 && len(plan.Selected) > 0 {
  236. scheduled = append([]pipeline.ScheduledCandidate(nil), plan.Selected...)
  237. }
  238. windows := make([]pipeline.RefinementWindow, 0, len(scheduled))
  239. for _, sc := range scheduled {
  240. span := sc.Candidate.BandwidthHz
  241. windowSource := "candidate"
  242. if policy.RefinementAutoSpan && (span <= 0 || span < 2000 || span > 400000) {
  243. autoSpan, autoSource := pipeline.AutoSpanForHint(sc.Candidate.Hint)
  244. if autoSpan > 0 {
  245. span = autoSpan
  246. windowSource = autoSource
  247. }
  248. }
  249. if policy.RefinementMinSpanHz > 0 && span < policy.RefinementMinSpanHz {
  250. span = policy.RefinementMinSpanHz
  251. }
  252. if policy.RefinementMaxSpanHz > 0 && span > policy.RefinementMaxSpanHz {
  253. span = policy.RefinementMaxSpanHz
  254. }
  255. if span <= 0 {
  256. span = 12000
  257. }
  258. windows = append(windows, pipeline.RefinementWindow{
  259. CenterHz: sc.Candidate.CenterHz,
  260. SpanHz: span,
  261. Source: windowSource,
  262. })
  263. }
  264. level := pipeline.AnalysisLevel{
  265. Name: "refinement",
  266. SampleRate: rt.cfg.SampleRate,
  267. FFTSize: rt.cfg.FFTSize,
  268. CenterHz: rt.cfg.CenterHz,
  269. SpanHz: float64(rt.cfg.SampleRate),
  270. Source: "refinement-window",
  271. }
  272. input := pipeline.RefinementInput{
  273. Level: level,
  274. Candidates: append([]pipeline.Candidate(nil), surv.Candidates...),
  275. Scheduled: scheduled,
  276. Plan: plan,
  277. Windows: windows,
  278. SampleRate: rt.cfg.SampleRate,
  279. FFTSize: rt.cfg.FFTSize,
  280. CenterHz: rt.cfg.CenterHz,
  281. Source: "surveillance-detector",
  282. }
  283. if !policy.RefinementEnabled {
  284. input.Scheduled = nil
  285. }
  286. return input
  287. }
  288. func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.RefinementInput, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult {
  289. if art == nil || len(art.iq) == 0 || len(input.Scheduled) == 0 {
  290. return pipeline.RefinementResult{}
  291. }
  292. policy := pipeline.PolicyFromConfig(rt.cfg)
  293. selectedCandidates := make([]pipeline.Candidate, 0, len(input.Scheduled))
  294. selectedSignals := make([]detector.Signal, 0, len(input.Scheduled))
  295. for _, sc := range input.Scheduled {
  296. selectedCandidates = append(selectedCandidates, sc.Candidate)
  297. selectedSignals = append(selectedSignals, detector.Signal{
  298. ID: sc.Candidate.ID,
  299. FirstBin: sc.Candidate.FirstBin,
  300. LastBin: sc.Candidate.LastBin,
  301. CenterHz: sc.Candidate.CenterHz,
  302. BWHz: sc.Candidate.BandwidthHz,
  303. PeakDb: sc.Candidate.PeakDb,
  304. SNRDb: sc.Candidate.SNRDb,
  305. NoiseDb: sc.Candidate.NoiseDb,
  306. })
  307. }
  308. sampleRate := input.SampleRate
  309. fftSize := input.FFTSize
  310. centerHz := input.CenterHz
  311. if sampleRate <= 0 {
  312. sampleRate = rt.cfg.SampleRate
  313. }
  314. if fftSize <= 0 {
  315. fftSize = rt.cfg.FFTSize
  316. }
  317. if centerHz == 0 {
  318. centerHz = rt.cfg.CenterHz
  319. }
  320. snips, snipRates := extractSignalIQBatch(extractMgr, art.iq, sampleRate, centerHz, selectedSignals)
  321. refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.spectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
  322. signals := make([]detector.Signal, 0, len(refined))
  323. decisions := make([]pipeline.SignalDecision, 0, len(refined))
  324. for i, ref := range refined {
  325. sig := ref.Signal
  326. signals = append(signals, sig)
  327. cls := sig.Class
  328. snipRate := ref.SnippetRate
  329. decision := pipeline.DecideSignalAction(policy, ref.Candidate, cls)
  330. decisions = append(decisions, decision)
  331. if cls != nil {
  332. pll := classifier.PLLResult{}
  333. if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
  334. pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
  335. cls.PLL = &pll
  336. signals[i].PLL = &pll
  337. if cls.ModType == classifier.ClassWFM && pll.Stereo {
  338. cls.ModType = classifier.ClassWFMStereo
  339. }
  340. }
  341. if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
  342. rt.updateRDS(art.now, rec, &signals[i], cls)
  343. }
  344. }
  345. }
  346. maxRecord := rt.cfg.Resources.MaxRecordingStreams
  347. maxDecode := rt.cfg.Resources.MaxDecodeJobs
  348. enforceDecisionBudgets(decisions, maxRecord, maxDecode)
  349. summary := summarizeDecisions(decisions)
  350. if rec != nil {
  351. if summary.RecordEnabled > 0 {
  352. rt.cfg.Recorder.Enabled = true
  353. }
  354. if summary.DecodeEnabled > 0 {
  355. rt.cfg.Recorder.AutoDecode = true
  356. }
  357. }
  358. rt.det.UpdateClasses(signals)
  359. return pipeline.RefinementResult{Level: input.Level, Signals: signals, Decisions: decisions, Candidates: selectedCandidates}
  360. }
  361. func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) {
  362. if sig == nil || cls == nil {
  363. return
  364. }
  365. keyHz := sig.CenterHz
  366. if sig.PLL != nil && sig.PLL.ExactHz != 0 {
  367. keyHz = sig.PLL.ExactHz
  368. }
  369. key := int64(math.Round(keyHz / 25000.0))
  370. st := rt.rdsMap[key]
  371. if st == nil {
  372. st = &rdsState{}
  373. rt.rdsMap[key] = st
  374. }
  375. if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
  376. st.lastDecode = now
  377. atomic.StoreInt32(&st.busy, 1)
  378. go func(st *rdsState, sigHz float64) {
  379. defer atomic.StoreInt32(&st.busy, 0)
  380. ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
  381. if len(ringIQ) < ringSR || ringSR <= 0 {
  382. return
  383. }
  384. offset := sigHz - ringCenter
  385. shifted := dsp.FreqShift(ringIQ, ringSR, offset)
  386. decim1 := ringSR / 1000000
  387. if decim1 < 1 {
  388. decim1 = 1
  389. }
  390. lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
  391. f1 := dsp.ApplyFIR(shifted, lp1)
  392. d1 := dsp.Decimate(f1, decim1)
  393. rate1 := ringSR / decim1
  394. decim2 := rate1 / 250000
  395. if decim2 < 1 {
  396. decim2 = 1
  397. }
  398. lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
  399. f2 := dsp.ApplyFIR(d1, lp2)
  400. decimated := dsp.Decimate(f2, decim2)
  401. actualRate := rate1 / decim2
  402. rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
  403. if len(rdsBase.Samples) == 0 {
  404. return
  405. }
  406. st.mu.Lock()
  407. result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
  408. if result.PS != "" {
  409. st.result = result
  410. }
  411. st.mu.Unlock()
  412. }(st, sig.CenterHz)
  413. }
  414. st.mu.Lock()
  415. ps := st.result.PS
  416. st.mu.Unlock()
  417. if ps != "" && sig.PLL != nil {
  418. sig.PLL.RDSStation = strings.TrimSpace(ps)
  419. cls.PLL = sig.PLL
  420. }
  421. }
  422. func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) {
  423. if len(rt.rdsMap) > 0 {
  424. activeIDs := make(map[int64]bool, len(displaySignals))
  425. for _, s := range displaySignals {
  426. keyHz := s.CenterHz
  427. if s.PLL != nil && s.PLL.ExactHz != 0 {
  428. keyHz = s.PLL.ExactHz
  429. }
  430. activeIDs[int64(math.Round(keyHz/25000.0))] = true
  431. }
  432. for id := range rt.rdsMap {
  433. if !activeIDs[id] {
  434. delete(rt.rdsMap, id)
  435. }
  436. }
  437. }
  438. if len(rt.streamPhaseState) > 0 {
  439. sigIDs := make(map[int64]bool, len(displaySignals))
  440. for _, s := range displaySignals {
  441. sigIDs[s.ID] = true
  442. }
  443. for id := range rt.streamPhaseState {
  444. if !sigIDs[id] {
  445. delete(rt.streamPhaseState, id)
  446. }
  447. }
  448. }
  449. if rec != nil && len(displaySignals) > 0 {
  450. aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
  451. _ = aqCfg
  452. }
  453. }