Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

1001 строка
31KB

  1. package main
  2. import (
  3. "fmt"
  4. "math"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "sdr-wideband-suite/internal/classifier"
  10. "sdr-wideband-suite/internal/config"
  11. "sdr-wideband-suite/internal/demod"
  12. "sdr-wideband-suite/internal/detector"
  13. "sdr-wideband-suite/internal/dsp"
  14. fftutil "sdr-wideband-suite/internal/fft"
  15. "sdr-wideband-suite/internal/fft/gpufft"
  16. "sdr-wideband-suite/internal/pipeline"
  17. "sdr-wideband-suite/internal/rds"
  18. "sdr-wideband-suite/internal/recorder"
  19. )
  20. type rdsState struct {
  21. dec rds.Decoder
  22. result rds.Result
  23. lastDecode time.Time
  24. busy int32
  25. mu sync.Mutex
  26. }
  27. type dspRuntime struct {
  28. cfg config.Config
  29. det *detector.Detector
  30. derivedDetectors map[string]*derivedDetector
  31. nextDerivedBase int64
  32. window []float64
  33. plan *fftutil.CmplxPlan
  34. detailWindow []float64
  35. detailPlan *fftutil.CmplxPlan
  36. detailFFT int
  37. survWindows map[int][]float64
  38. survPlans map[int]*fftutil.CmplxPlan
  39. survFIR map[int][]float64
  40. dcEnabled bool
  41. iqEnabled bool
  42. useGPU bool
  43. gpuEngine *gpufft.Engine
  44. rdsMap map[int64]*rdsState
  45. streamPhaseState map[int64]*streamExtractState
  46. streamOverlap *streamIQOverlap
  47. arbiter *pipeline.Arbiter
  48. arbitration pipeline.ArbitrationState
  49. gotSamples bool
  50. }
  51. type spectrumArtifacts struct {
  52. allIQ []complex64
  53. surveillanceIQ []complex64
  54. detailIQ []complex64
  55. surveillanceSpectrum []float64
  56. surveillanceSpectra []pipeline.SurveillanceLevelSpectrum
  57. surveillancePlan surveillancePlan
  58. detailSpectrum []float64
  59. finished []detector.Event
  60. detected []detector.Signal
  61. thresholds []float64
  62. noiseFloor float64
  63. now time.Time
  64. }
  65. type derivedDetector struct {
  66. det *detector.Detector
  67. sampleRate int
  68. fftSize int
  69. idBase int64
  70. }
  71. type surveillanceLevelSpec struct {
  72. Level pipeline.AnalysisLevel
  73. Decim int
  74. AllowGPU bool
  75. }
  76. type surveillancePlan struct {
  77. Primary pipeline.AnalysisLevel
  78. Levels []pipeline.AnalysisLevel
  79. LevelSet pipeline.SurveillanceLevelSet
  80. Presentation pipeline.AnalysisLevel
  81. Context pipeline.AnalysisContext
  82. DetectionPolicy pipeline.SurveillanceDetectionPolicy
  83. Specs []surveillanceLevelSpec
  84. }
  85. const derivedIDBlock = int64(1_000_000_000)
  86. func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
  87. detailFFT := cfg.Refinement.DetailFFTSize
  88. if detailFFT <= 0 {
  89. detailFFT = cfg.FFTSize
  90. }
  91. rt := &dspRuntime{
  92. cfg: cfg,
  93. det: det,
  94. derivedDetectors: map[string]*derivedDetector{},
  95. nextDerivedBase: -derivedIDBlock,
  96. window: window,
  97. plan: fftutil.NewCmplxPlan(cfg.FFTSize),
  98. detailWindow: fftutil.Hann(detailFFT),
  99. detailPlan: fftutil.NewCmplxPlan(detailFFT),
  100. detailFFT: detailFFT,
  101. survWindows: map[int][]float64{},
  102. survPlans: map[int]*fftutil.CmplxPlan{},
  103. survFIR: map[int][]float64{},
  104. dcEnabled: cfg.DCBlock,
  105. iqEnabled: cfg.IQBalance,
  106. useGPU: cfg.UseGPUFFT,
  107. rdsMap: map[int64]*rdsState{},
  108. streamPhaseState: map[int64]*streamExtractState{},
  109. streamOverlap: &streamIQOverlap{},
  110. arbiter: pipeline.NewArbiter(),
  111. }
  112. if rt.useGPU && gpuState != nil {
  113. snap := gpuState.snapshot()
  114. if snap.Available {
  115. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  116. rt.gpuEngine = eng
  117. gpuState.set(true, nil)
  118. } else {
  119. gpuState.set(false, err)
  120. rt.useGPU = false
  121. }
  122. }
  123. }
  124. return rt
  125. }
  126. func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
  127. prevFFT := rt.cfg.FFTSize
  128. prevSampleRate := rt.cfg.SampleRate
  129. prevUseGPU := rt.useGPU
  130. prevDetailFFT := rt.detailFFT
  131. rt.cfg = upd.cfg
  132. if rec != nil {
  133. rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
  134. Enabled: rt.cfg.Recorder.Enabled,
  135. MinSNRDb: rt.cfg.Recorder.MinSNRDb,
  136. MinDuration: mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second),
  137. MaxDuration: mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second),
  138. PrerollMs: rt.cfg.Recorder.PrerollMs,
  139. RecordIQ: rt.cfg.Recorder.RecordIQ,
  140. RecordAudio: rt.cfg.Recorder.RecordAudio,
  141. AutoDemod: rt.cfg.Recorder.AutoDemod,
  142. AutoDecode: rt.cfg.Recorder.AutoDecode,
  143. MaxDiskMB: rt.cfg.Recorder.MaxDiskMB,
  144. OutputDir: rt.cfg.Recorder.OutputDir,
  145. ClassFilter: rt.cfg.Recorder.ClassFilter,
  146. RingSeconds: rt.cfg.Recorder.RingSeconds,
  147. DeemphasisUs: rt.cfg.Recorder.DeemphasisUs,
  148. ExtractionTaps: rt.cfg.Recorder.ExtractionTaps,
  149. ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult,
  150. }, rt.cfg.CenterHz, buildDecoderMap(rt.cfg))
  151. }
  152. if upd.det != nil {
  153. rt.det = upd.det
  154. }
  155. if upd.window != nil {
  156. rt.window = upd.window
  157. rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
  158. }
  159. detailFFT := rt.cfg.Refinement.DetailFFTSize
  160. if detailFFT <= 0 {
  161. detailFFT = rt.cfg.FFTSize
  162. }
  163. if detailFFT != prevDetailFFT {
  164. rt.detailFFT = detailFFT
  165. rt.detailWindow = fftutil.Hann(detailFFT)
  166. rt.detailPlan = fftutil.NewCmplxPlan(detailFFT)
  167. }
  168. if prevSampleRate != rt.cfg.SampleRate {
  169. rt.survFIR = map[int][]float64{}
  170. }
  171. if prevFFT != rt.cfg.FFTSize {
  172. rt.survWindows = map[int][]float64{}
  173. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  174. }
  175. if upd.det != nil || prevSampleRate != rt.cfg.SampleRate || prevFFT != rt.cfg.FFTSize {
  176. rt.derivedDetectors = map[string]*derivedDetector{}
  177. rt.nextDerivedBase = -derivedIDBlock
  178. }
  179. rt.dcEnabled = upd.dcBlock
  180. rt.iqEnabled = upd.iqBalance
  181. if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU {
  182. srcMgr.Flush()
  183. rt.gotSamples = false
  184. if rt.gpuEngine != nil {
  185. rt.gpuEngine.Close()
  186. rt.gpuEngine = nil
  187. }
  188. rt.useGPU = rt.cfg.UseGPUFFT
  189. if rt.useGPU && gpuState != nil {
  190. snap := gpuState.snapshot()
  191. if snap.Available {
  192. if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil {
  193. rt.gpuEngine = eng
  194. gpuState.set(true, nil)
  195. } else {
  196. gpuState.set(false, err)
  197. rt.useGPU = false
  198. }
  199. } else {
  200. gpuState.set(false, nil)
  201. rt.useGPU = false
  202. }
  203. } else if gpuState != nil {
  204. gpuState.set(false, nil)
  205. }
  206. }
  207. }
  208. func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 {
  209. return rt.spectrumFromIQWithPlan(iq, rt.window, rt.plan, gpuState, true)
  210. }
  211. func (rt *dspRuntime) spectrumFromIQWithPlan(iq []complex64, window []float64, plan *fftutil.CmplxPlan, gpuState *gpuStatus, allowGPU bool) []float64 {
  212. if len(iq) == 0 {
  213. return nil
  214. }
  215. if allowGPU && rt.useGPU && rt.gpuEngine != nil {
  216. gpuBuf := make([]complex64, len(iq))
  217. if len(window) == len(iq) {
  218. for i := 0; i < len(iq); i++ {
  219. v := iq[i]
  220. w := float32(window[i])
  221. gpuBuf[i] = complex(real(v)*w, imag(v)*w)
  222. }
  223. } else {
  224. copy(gpuBuf, iq)
  225. }
  226. out, err := rt.gpuEngine.Exec(gpuBuf)
  227. if err != nil {
  228. if gpuState != nil {
  229. gpuState.set(false, err)
  230. }
  231. rt.useGPU = false
  232. return fftutil.SpectrumWithPlan(gpuBuf, nil, plan)
  233. }
  234. return fftutil.SpectrumFromFFT(out)
  235. }
  236. return fftutil.SpectrumWithPlan(iq, window, plan)
  237. }
  238. func (rt *dspRuntime) windowForFFT(fftSize int) []float64 {
  239. if fftSize <= 0 {
  240. return nil
  241. }
  242. if fftSize == rt.cfg.FFTSize {
  243. return rt.window
  244. }
  245. if rt.survWindows == nil {
  246. rt.survWindows = map[int][]float64{}
  247. }
  248. if window, ok := rt.survWindows[fftSize]; ok {
  249. return window
  250. }
  251. window := fftutil.Hann(fftSize)
  252. rt.survWindows[fftSize] = window
  253. return window
  254. }
  255. func (rt *dspRuntime) planForFFT(fftSize int) *fftutil.CmplxPlan {
  256. if fftSize <= 0 {
  257. return nil
  258. }
  259. if fftSize == rt.cfg.FFTSize {
  260. return rt.plan
  261. }
  262. if rt.survPlans == nil {
  263. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  264. }
  265. if plan, ok := rt.survPlans[fftSize]; ok {
  266. return plan
  267. }
  268. plan := fftutil.NewCmplxPlan(fftSize)
  269. rt.survPlans[fftSize] = plan
  270. return plan
  271. }
  272. func (rt *dspRuntime) spectrumForLevel(iq []complex64, fftSize int, gpuState *gpuStatus, allowGPU bool) []float64 {
  273. if len(iq) == 0 || fftSize <= 0 {
  274. return nil
  275. }
  276. if len(iq) > fftSize {
  277. iq = iq[len(iq)-fftSize:]
  278. }
  279. window := rt.windowForFFT(fftSize)
  280. plan := rt.planForFFT(fftSize)
  281. return rt.spectrumFromIQWithPlan(iq, window, plan, gpuState, allowGPU)
  282. }
  283. func sanitizeSpectrum(spectrum []float64) {
  284. for i := range spectrum {
  285. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  286. spectrum[i] = -200
  287. }
  288. }
  289. }
  290. func (rt *dspRuntime) decimationTaps(factor int) []float64 {
  291. if factor <= 1 {
  292. return nil
  293. }
  294. if rt.survFIR == nil {
  295. rt.survFIR = map[int][]float64{}
  296. }
  297. if taps, ok := rt.survFIR[factor]; ok {
  298. return taps
  299. }
  300. cutoff := float64(rt.cfg.SampleRate/factor) * 0.5 * 0.8
  301. taps := dsp.LowpassFIR(cutoff, rt.cfg.SampleRate, 101)
  302. rt.survFIR[factor] = taps
  303. return taps
  304. }
  305. func (rt *dspRuntime) decimateSurveillanceIQ(iq []complex64, factor int) []complex64 {
  306. if factor <= 1 {
  307. return iq
  308. }
  309. taps := rt.decimationTaps(factor)
  310. if len(taps) == 0 {
  311. return dsp.Decimate(iq, factor)
  312. }
  313. filtered := dsp.ApplyFIR(iq, taps)
  314. return dsp.Decimate(filtered, factor)
  315. }
  316. func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
  317. required := rt.cfg.FFTSize
  318. if rt.detailFFT > required {
  319. required = rt.detailFFT
  320. }
  321. available := required
  322. st := srcMgr.Stats()
  323. if st.BufferSamples > required {
  324. available = (st.BufferSamples / required) * required
  325. if available < required {
  326. available = required
  327. }
  328. }
  329. allIQ, err := srcMgr.ReadIQ(available)
  330. if err != nil {
  331. return nil, err
  332. }
  333. if rec != nil {
  334. rec.Ingest(time.Now(), allIQ)
  335. }
  336. survIQ := allIQ
  337. if len(allIQ) > rt.cfg.FFTSize {
  338. survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:]
  339. }
  340. detailIQ := survIQ
  341. if rt.detailFFT > 0 && len(allIQ) >= rt.detailFFT {
  342. detailIQ = allIQ[len(allIQ)-rt.detailFFT:]
  343. }
  344. if rt.dcEnabled {
  345. dcBlocker.Apply(allIQ)
  346. }
  347. if rt.iqEnabled {
  348. dsp.IQBalance(survIQ)
  349. if !sameIQBuffer(detailIQ, survIQ) {
  350. detailIQ = append([]complex64(nil), detailIQ...)
  351. dsp.IQBalance(detailIQ)
  352. }
  353. }
  354. survSpectrum := rt.spectrumFromIQ(survIQ, gpuState)
  355. sanitizeSpectrum(survSpectrum)
  356. detailSpectrum := survSpectrum
  357. if !sameIQBuffer(detailIQ, survIQ) {
  358. detailSpectrum = rt.spectrumFromIQWithPlan(detailIQ, rt.detailWindow, rt.detailPlan, gpuState, false)
  359. sanitizeSpectrum(detailSpectrum)
  360. }
  361. policy := pipeline.PolicyFromConfig(rt.cfg)
  362. plan := rt.buildSurveillancePlan(policy)
  363. surveillanceSpectra := make([]pipeline.SurveillanceLevelSpectrum, 0, len(plan.Specs))
  364. for _, spec := range plan.Specs {
  365. if spec.Level.FFTSize <= 0 {
  366. continue
  367. }
  368. var spectrum []float64
  369. if spec.Decim <= 1 {
  370. if spec.Level.FFTSize == len(survSpectrum) {
  371. spectrum = survSpectrum
  372. } else {
  373. spectrum = rt.spectrumForLevel(survIQ, spec.Level.FFTSize, gpuState, spec.AllowGPU)
  374. sanitizeSpectrum(spectrum)
  375. }
  376. } else {
  377. required := spec.Level.FFTSize * spec.Decim
  378. if required > len(survIQ) {
  379. continue
  380. }
  381. src := survIQ
  382. if len(src) > required {
  383. src = src[len(src)-required:]
  384. }
  385. decimated := rt.decimateSurveillanceIQ(src, spec.Decim)
  386. spectrum = rt.spectrumForLevel(decimated, spec.Level.FFTSize, gpuState, false)
  387. sanitizeSpectrum(spectrum)
  388. }
  389. if len(spectrum) == 0 {
  390. continue
  391. }
  392. surveillanceSpectra = append(surveillanceSpectra, pipeline.SurveillanceLevelSpectrum{Level: spec.Level, Spectrum: spectrum})
  393. }
  394. now := time.Now()
  395. finished, detected := rt.det.Process(now, survSpectrum, rt.cfg.CenterHz)
  396. return &spectrumArtifacts{
  397. allIQ: allIQ,
  398. surveillanceIQ: survIQ,
  399. detailIQ: detailIQ,
  400. surveillanceSpectrum: survSpectrum,
  401. surveillanceSpectra: surveillanceSpectra,
  402. surveillancePlan: plan,
  403. detailSpectrum: detailSpectrum,
  404. finished: finished,
  405. detected: detected,
  406. thresholds: rt.det.LastThresholds(),
  407. noiseFloor: rt.det.LastNoiseFloor(),
  408. now: now,
  409. }, nil
  410. }
  411. func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.SurveillanceResult {
  412. if art == nil {
  413. return pipeline.SurveillanceResult{}
  414. }
  415. policy := pipeline.PolicyFromConfig(rt.cfg)
  416. plan := art.surveillancePlan
  417. if plan.Primary.Name == "" {
  418. plan = rt.buildSurveillancePlan(policy)
  419. }
  420. primaryCandidates := pipeline.CandidatesFromSignalsWithLevel(art.detected, "surveillance-detector", plan.Primary)
  421. derivedCandidates := rt.detectDerivedCandidates(art, plan)
  422. candidates := pipeline.FuseCandidates(primaryCandidates, derivedCandidates)
  423. scheduled := pipeline.ScheduleCandidates(candidates, policy)
  424. return pipeline.SurveillanceResult{
  425. Level: plan.Primary,
  426. Levels: plan.Levels,
  427. LevelSet: plan.LevelSet,
  428. DetectionPolicy: plan.DetectionPolicy,
  429. DisplayLevel: plan.Presentation,
  430. Context: plan.Context,
  431. Spectra: art.surveillanceSpectra,
  432. Candidates: candidates,
  433. Scheduled: scheduled,
  434. Finished: art.finished,
  435. Signals: art.detected,
  436. NoiseFloor: art.noiseFloor,
  437. Thresholds: art.thresholds,
  438. }
  439. }
  440. func (rt *dspRuntime) detectDerivedCandidates(art *spectrumArtifacts, plan surveillancePlan) []pipeline.Candidate {
  441. if art == nil || len(plan.LevelSet.Derived) == 0 {
  442. return nil
  443. }
  444. spectra := map[string][]float64{}
  445. for _, spec := range art.surveillanceSpectra {
  446. if spec.Level.Name == "" || len(spec.Spectrum) == 0 {
  447. continue
  448. }
  449. spectra[spec.Level.Name] = spec.Spectrum
  450. }
  451. if len(spectra) == 0 {
  452. return nil
  453. }
  454. out := make([]pipeline.Candidate, 0, len(plan.LevelSet.Derived))
  455. for _, level := range plan.LevelSet.Derived {
  456. if level.Name == "" {
  457. continue
  458. }
  459. if !pipeline.IsDetectionLevel(level) {
  460. continue
  461. }
  462. spectrum := spectra[level.Name]
  463. if len(spectrum) == 0 {
  464. continue
  465. }
  466. entry := rt.derivedDetectorForLevel(level)
  467. if entry == nil || entry.det == nil {
  468. continue
  469. }
  470. _, signals := entry.det.Process(art.now, spectrum, level.CenterHz)
  471. if len(signals) == 0 {
  472. continue
  473. }
  474. cands := pipeline.CandidatesFromSignalsWithLevel(signals, "surveillance-derived", level)
  475. for i := range cands {
  476. if cands[i].ID == 0 {
  477. continue
  478. }
  479. cands[i].ID = entry.idBase - cands[i].ID
  480. }
  481. out = append(out, cands...)
  482. }
  483. if len(out) == 0 {
  484. return nil
  485. }
  486. return out
  487. }
  488. func (rt *dspRuntime) derivedDetectorForLevel(level pipeline.AnalysisLevel) *derivedDetector {
  489. if level.SampleRate <= 0 || level.FFTSize <= 0 {
  490. return nil
  491. }
  492. if rt.derivedDetectors == nil {
  493. rt.derivedDetectors = map[string]*derivedDetector{}
  494. }
  495. key := level.Name
  496. if key == "" {
  497. key = fmt.Sprintf("%d:%d", level.SampleRate, level.FFTSize)
  498. }
  499. entry := rt.derivedDetectors[key]
  500. if entry != nil && entry.sampleRate == level.SampleRate && entry.fftSize == level.FFTSize {
  501. return entry
  502. }
  503. if rt.nextDerivedBase == 0 {
  504. rt.nextDerivedBase = -derivedIDBlock
  505. }
  506. entry = &derivedDetector{
  507. det: detector.New(rt.cfg.Detector, level.SampleRate, level.FFTSize),
  508. sampleRate: level.SampleRate,
  509. fftSize: level.FFTSize,
  510. idBase: rt.nextDerivedBase,
  511. }
  512. rt.nextDerivedBase -= derivedIDBlock
  513. rt.derivedDetectors[key] = entry
  514. return entry
  515. }
  516. func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now time.Time) pipeline.RefinementInput {
  517. policy := pipeline.PolicyFromConfig(rt.cfg)
  518. baseBudget := pipeline.BudgetModelFromPolicy(policy)
  519. pressure := pipeline.BuildBudgetPressureSummary(baseBudget, rt.arbitration.Refinement, rt.arbitration.Queue)
  520. budget := pipeline.ApplyBudgetRebalance(policy, baseBudget, pressure)
  521. plan := pipeline.BuildRefinementPlanWithBudget(surv.Candidates, policy, budget)
  522. admission := rt.arbiter.AdmitRefinementWithBudget(plan, policy, budget, now)
  523. plan = admission.Plan
  524. workItems := make([]pipeline.RefinementWorkItem, 0, len(admission.WorkItems))
  525. if len(admission.WorkItems) > 0 {
  526. workItems = append(workItems, admission.WorkItems...)
  527. }
  528. scheduled := append([]pipeline.ScheduledCandidate(nil), admission.Admitted...)
  529. workIndex := map[int64]int{}
  530. for i := range workItems {
  531. if workItems[i].Candidate.ID == 0 {
  532. continue
  533. }
  534. workIndex[workItems[i].Candidate.ID] = i
  535. }
  536. windows := make([]pipeline.RefinementWindow, 0, len(scheduled))
  537. for _, sc := range scheduled {
  538. window := pipeline.RefinementWindowForCandidate(policy, sc.Candidate)
  539. windows = append(windows, window)
  540. if idx, ok := workIndex[sc.Candidate.ID]; ok {
  541. workItems[idx].Window = window
  542. }
  543. }
  544. detailFFT := rt.cfg.Refinement.DetailFFTSize
  545. if detailFFT <= 0 {
  546. detailFFT = rt.cfg.FFTSize
  547. }
  548. levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate))
  549. if _, maxSpan, ok := windowSpanBounds(windows); ok {
  550. levelSpan = maxSpan
  551. }
  552. level := analysisLevel("refinement", "refinement", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "refinement-window", 1, rt.cfg.SampleRate)
  553. detailLevel := analysisLevel("detail", "detail", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "detail-spectrum", 1, rt.cfg.SampleRate)
  554. if len(workItems) > 0 {
  555. for i := range workItems {
  556. item := &workItems[i]
  557. if item.Window.SpanHz <= 0 {
  558. continue
  559. }
  560. item.Execution = &pipeline.RefinementExecution{
  561. Stage: "refine",
  562. SampleRate: rt.cfg.SampleRate,
  563. FFTSize: detailFFT,
  564. CenterHz: item.Window.CenterHz,
  565. SpanHz: item.Window.SpanHz,
  566. Source: detailLevel.Source,
  567. }
  568. }
  569. }
  570. input := pipeline.RefinementInput{
  571. Level: level,
  572. Detail: detailLevel,
  573. Context: surv.Context,
  574. Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan},
  575. Budgets: budget,
  576. Admission: admission.Admission,
  577. Candidates: append([]pipeline.Candidate(nil), surv.Candidates...),
  578. Scheduled: scheduled,
  579. WorkItems: workItems,
  580. Plan: plan,
  581. Windows: windows,
  582. SampleRate: rt.cfg.SampleRate,
  583. FFTSize: detailFFT,
  584. CenterHz: rt.cfg.CenterHz,
  585. Source: "surveillance-detector",
  586. }
  587. input.Context.Refinement = level
  588. input.Context.Detail = detailLevel
  589. if !policy.RefinementEnabled {
  590. for i := range input.WorkItems {
  591. item := &input.WorkItems[i]
  592. if item.Status == pipeline.RefinementStatusDropped {
  593. continue
  594. }
  595. item.Status = pipeline.RefinementStatusDropped
  596. item.Reason = pipeline.RefinementReasonDisabled
  597. }
  598. input.Scheduled = nil
  599. input.Request.Reason = pipeline.ReasonAdmissionDisabled
  600. input.Admission.Reason = pipeline.ReasonAdmissionDisabled
  601. input.Admission.Admitted = 0
  602. input.Admission.Skipped = 0
  603. input.Admission.Displaced = 0
  604. input.Plan.Selected = nil
  605. input.Plan.DroppedByBudget = 0
  606. }
  607. rt.setArbitration(policy, input.Budgets, input.Admission, rt.arbitration.Queue)
  608. return input
  609. }
  610. func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep {
  611. input := rt.buildRefinementInput(surv, art.now)
  612. markWorkItemsStatus(input.WorkItems, pipeline.RefinementStatusAdmitted, pipeline.RefinementStatusRunning, pipeline.RefinementReasonRunning)
  613. result := rt.refineSignals(art, input, extractMgr, rec)
  614. markWorkItemsCompleted(input.WorkItems, result.Candidates)
  615. return pipeline.RefinementStep{Input: input, Result: result}
  616. }
  617. func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.RefinementInput, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult {
  618. if art == nil || len(art.detailIQ) == 0 || len(input.Scheduled) == 0 {
  619. return pipeline.RefinementResult{}
  620. }
  621. policy := pipeline.PolicyFromConfig(rt.cfg)
  622. selectedCandidates := make([]pipeline.Candidate, 0, len(input.Scheduled))
  623. selectedSignals := make([]detector.Signal, 0, len(input.Scheduled))
  624. for _, sc := range input.Scheduled {
  625. selectedCandidates = append(selectedCandidates, sc.Candidate)
  626. selectedSignals = append(selectedSignals, detector.Signal{
  627. ID: sc.Candidate.ID,
  628. FirstBin: sc.Candidate.FirstBin,
  629. LastBin: sc.Candidate.LastBin,
  630. CenterHz: sc.Candidate.CenterHz,
  631. BWHz: sc.Candidate.BandwidthHz,
  632. PeakDb: sc.Candidate.PeakDb,
  633. SNRDb: sc.Candidate.SNRDb,
  634. NoiseDb: sc.Candidate.NoiseDb,
  635. })
  636. }
  637. sampleRate := input.SampleRate
  638. fftSize := input.FFTSize
  639. centerHz := input.CenterHz
  640. if sampleRate <= 0 {
  641. sampleRate = rt.cfg.SampleRate
  642. }
  643. if fftSize <= 0 {
  644. fftSize = rt.cfg.FFTSize
  645. }
  646. if centerHz == 0 {
  647. centerHz = rt.cfg.CenterHz
  648. }
  649. snips, snipRates := extractSignalIQBatch(extractMgr, art.detailIQ, sampleRate, centerHz, selectedSignals)
  650. refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.detailSpectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
  651. signals := make([]detector.Signal, 0, len(refined))
  652. decisions := make([]pipeline.SignalDecision, 0, len(refined))
  653. for i, ref := range refined {
  654. sig := ref.Signal
  655. signals = append(signals, sig)
  656. cls := sig.Class
  657. snipRate := ref.SnippetRate
  658. decision := pipeline.DecideSignalAction(policy, ref.Candidate, cls)
  659. decisions = append(decisions, decision)
  660. if cls != nil {
  661. pll := classifier.PLLResult{}
  662. if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
  663. pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
  664. cls.PLL = &pll
  665. signals[i].PLL = &pll
  666. if cls.ModType == classifier.ClassWFM && pll.Stereo {
  667. cls.ModType = classifier.ClassWFMStereo
  668. }
  669. }
  670. if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
  671. rt.updateRDS(art.now, rec, &signals[i], cls)
  672. }
  673. }
  674. }
  675. budget := input.Budgets
  676. queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy)
  677. rt.setArbitration(policy, budget, input.Admission, queueStats)
  678. summary := summarizeDecisions(decisions)
  679. if rec != nil {
  680. if summary.RecordEnabled > 0 {
  681. rt.cfg.Recorder.Enabled = true
  682. }
  683. if summary.DecodeEnabled > 0 {
  684. rt.cfg.Recorder.AutoDecode = true
  685. }
  686. }
  687. rt.det.UpdateClasses(signals)
  688. return pipeline.RefinementResult{Level: input.Level, Signals: signals, Decisions: decisions, Candidates: selectedCandidates}
  689. }
  690. func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) {
  691. if sig == nil || cls == nil {
  692. return
  693. }
  694. keyHz := sig.CenterHz
  695. if sig.PLL != nil && sig.PLL.ExactHz != 0 {
  696. keyHz = sig.PLL.ExactHz
  697. }
  698. key := int64(math.Round(keyHz / 25000.0))
  699. st := rt.rdsMap[key]
  700. if st == nil {
  701. st = &rdsState{}
  702. rt.rdsMap[key] = st
  703. }
  704. if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
  705. st.lastDecode = now
  706. atomic.StoreInt32(&st.busy, 1)
  707. go func(st *rdsState, sigHz float64) {
  708. defer atomic.StoreInt32(&st.busy, 0)
  709. ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
  710. if len(ringIQ) < ringSR || ringSR <= 0 {
  711. return
  712. }
  713. offset := sigHz - ringCenter
  714. shifted := dsp.FreqShift(ringIQ, ringSR, offset)
  715. decim1 := ringSR / 1000000
  716. if decim1 < 1 {
  717. decim1 = 1
  718. }
  719. lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
  720. f1 := dsp.ApplyFIR(shifted, lp1)
  721. d1 := dsp.Decimate(f1, decim1)
  722. rate1 := ringSR / decim1
  723. decim2 := rate1 / 250000
  724. if decim2 < 1 {
  725. decim2 = 1
  726. }
  727. lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
  728. f2 := dsp.ApplyFIR(d1, lp2)
  729. decimated := dsp.Decimate(f2, decim2)
  730. actualRate := rate1 / decim2
  731. rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
  732. if len(rdsBase.Samples) == 0 {
  733. return
  734. }
  735. st.mu.Lock()
  736. result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
  737. if result.PS != "" {
  738. st.result = result
  739. }
  740. st.mu.Unlock()
  741. }(st, sig.CenterHz)
  742. }
  743. st.mu.Lock()
  744. ps := st.result.PS
  745. st.mu.Unlock()
  746. if ps != "" && sig.PLL != nil {
  747. sig.PLL.RDSStation = strings.TrimSpace(ps)
  748. cls.PLL = sig.PLL
  749. }
  750. }
  751. func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) {
  752. if len(rt.rdsMap) > 0 {
  753. activeIDs := make(map[int64]bool, len(displaySignals))
  754. for _, s := range displaySignals {
  755. keyHz := s.CenterHz
  756. if s.PLL != nil && s.PLL.ExactHz != 0 {
  757. keyHz = s.PLL.ExactHz
  758. }
  759. activeIDs[int64(math.Round(keyHz/25000.0))] = true
  760. }
  761. for id := range rt.rdsMap {
  762. if !activeIDs[id] {
  763. delete(rt.rdsMap, id)
  764. }
  765. }
  766. }
  767. if len(rt.streamPhaseState) > 0 {
  768. sigIDs := make(map[int64]bool, len(displaySignals))
  769. for _, s := range displaySignals {
  770. sigIDs[s.ID] = true
  771. }
  772. for id := range rt.streamPhaseState {
  773. if !sigIDs[id] {
  774. delete(rt.streamPhaseState, id)
  775. }
  776. }
  777. }
  778. if rec != nil && len(displaySignals) > 0 {
  779. aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
  780. _ = aqCfg
  781. }
  782. }
  783. func spanForPolicy(policy pipeline.Policy, fallback float64) float64 {
  784. if policy.MonitorSpanHz > 0 {
  785. return policy.MonitorSpanHz
  786. }
  787. if len(policy.MonitorWindows) > 0 {
  788. maxSpan := 0.0
  789. for _, w := range policy.MonitorWindows {
  790. if w.SpanHz > maxSpan {
  791. maxSpan = w.SpanHz
  792. }
  793. }
  794. if maxSpan > 0 {
  795. return maxSpan
  796. }
  797. }
  798. if policy.MonitorStartHz != 0 && policy.MonitorEndHz != 0 && policy.MonitorEndHz > policy.MonitorStartHz {
  799. return policy.MonitorEndHz - policy.MonitorStartHz
  800. }
  801. return fallback
  802. }
  803. func windowSpanBounds(windows []pipeline.RefinementWindow) (float64, float64, bool) {
  804. minSpan := 0.0
  805. maxSpan := 0.0
  806. ok := false
  807. for _, w := range windows {
  808. if w.SpanHz <= 0 {
  809. continue
  810. }
  811. if !ok || w.SpanHz < minSpan {
  812. minSpan = w.SpanHz
  813. }
  814. if !ok || w.SpanHz > maxSpan {
  815. maxSpan = w.SpanHz
  816. }
  817. ok = true
  818. }
  819. return minSpan, maxSpan, ok
  820. }
  821. func analysisLevel(name, role, truth string, sampleRate int, fftSize int, centerHz float64, spanHz float64, source string, decimation int, baseRate int) pipeline.AnalysisLevel {
  822. level := pipeline.AnalysisLevel{
  823. Name: name,
  824. Role: role,
  825. Truth: truth,
  826. SampleRate: sampleRate,
  827. FFTSize: fftSize,
  828. CenterHz: centerHz,
  829. SpanHz: spanHz,
  830. Source: source,
  831. }
  832. if level.SampleRate > 0 && level.FFTSize > 0 {
  833. level.BinHz = float64(level.SampleRate) / float64(level.FFTSize)
  834. }
  835. if decimation > 0 {
  836. level.Decimation = decimation
  837. } else if baseRate > 0 && level.SampleRate > 0 && baseRate%level.SampleRate == 0 {
  838. level.Decimation = baseRate / level.SampleRate
  839. }
  840. return level
  841. }
  842. func (rt *dspRuntime) buildSurveillancePlan(policy pipeline.Policy) surveillancePlan {
  843. baseRate := rt.cfg.SampleRate
  844. baseFFT := rt.cfg.Surveillance.AnalysisFFTSize
  845. if baseFFT <= 0 {
  846. baseFFT = rt.cfg.FFTSize
  847. }
  848. span := spanForPolicy(policy, float64(baseRate))
  849. detectionPolicy := pipeline.SurveillanceDetectionPolicyFromPolicy(policy)
  850. primary := analysisLevel("surveillance", pipeline.RoleSurveillancePrimary, "surveillance", baseRate, baseFFT, rt.cfg.CenterHz, span, "baseband", 1, baseRate)
  851. levels := []pipeline.AnalysisLevel{primary}
  852. specs := []surveillanceLevelSpec{{Level: primary, Decim: 1, AllowGPU: true}}
  853. context := pipeline.AnalysisContext{Surveillance: primary}
  854. derivedLevels := make([]pipeline.AnalysisLevel, 0, 2)
  855. supportLevels := make([]pipeline.AnalysisLevel, 0, 2)
  856. strategy := strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy))
  857. switch strategy {
  858. case "multi-res", "multi-resolution", "multi", "multi_res":
  859. decim := 2
  860. derivedRate := baseRate / decim
  861. derivedFFT := baseFFT / decim
  862. if derivedRate >= 200000 && derivedFFT >= 256 {
  863. derivedSpan := spanForPolicy(policy, float64(derivedRate))
  864. role := pipeline.RoleSurveillanceSupport
  865. if detectionPolicy.DerivedDetectionEnabled {
  866. role = pipeline.RoleSurveillanceDerived
  867. }
  868. derived := analysisLevel("surveillance-lowres", role, "surveillance", derivedRate, derivedFFT, rt.cfg.CenterHz, derivedSpan, "decimated", decim, baseRate)
  869. if detectionPolicy.DerivedDetectionEnabled {
  870. levels = append(levels, derived)
  871. derivedLevels = append(derivedLevels, derived)
  872. } else {
  873. supportLevels = append(supportLevels, derived)
  874. }
  875. specs = append(specs, surveillanceLevelSpec{Level: derived, Decim: decim, AllowGPU: false})
  876. context.Derived = append(context.Derived, derived)
  877. }
  878. }
  879. presentation := analysisLevel("presentation", pipeline.RolePresentation, "presentation", baseRate, rt.cfg.Surveillance.DisplayBins, rt.cfg.CenterHz, span, "display", 1, baseRate)
  880. context.Presentation = presentation
  881. if len(derivedLevels) == 0 && detectionPolicy.DerivedDetectionEnabled {
  882. detectionPolicy.DerivedDetectionEnabled = false
  883. detectionPolicy.DerivedDetectionReason = "levels"
  884. }
  885. switch {
  886. case len(derivedLevels) > 0:
  887. detectionPolicy.DerivedDetectionMode = "detection"
  888. case len(supportLevels) > 0:
  889. detectionPolicy.DerivedDetectionMode = "support"
  890. default:
  891. detectionPolicy.DerivedDetectionMode = "disabled"
  892. }
  893. levelSet := pipeline.SurveillanceLevelSet{
  894. Primary: primary,
  895. Derived: append([]pipeline.AnalysisLevel(nil), derivedLevels...),
  896. Support: append([]pipeline.AnalysisLevel(nil), supportLevels...),
  897. Presentation: presentation,
  898. }
  899. detectionLevels := make([]pipeline.AnalysisLevel, 0, 1+len(derivedLevels))
  900. detectionLevels = append(detectionLevels, primary)
  901. detectionLevels = append(detectionLevels, derivedLevels...)
  902. levelSet.Detection = detectionLevels
  903. allLevels := make([]pipeline.AnalysisLevel, 0, 1+len(derivedLevels)+len(supportLevels)+1)
  904. allLevels = append(allLevels, primary)
  905. allLevels = append(allLevels, derivedLevels...)
  906. allLevels = append(allLevels, supportLevels...)
  907. if presentation.Name != "" {
  908. allLevels = append(allLevels, presentation)
  909. }
  910. levelSet.All = allLevels
  911. return surveillancePlan{
  912. Primary: primary,
  913. Levels: levels,
  914. LevelSet: levelSet,
  915. Presentation: presentation,
  916. Context: context,
  917. DetectionPolicy: detectionPolicy,
  918. Specs: specs,
  919. }
  920. }
  921. func sameIQBuffer(a []complex64, b []complex64) bool {
  922. if len(a) != len(b) {
  923. return false
  924. }
  925. if len(a) == 0 {
  926. return true
  927. }
  928. return &a[0] == &b[0]
  929. }
  930. func markWorkItemsStatus(items []pipeline.RefinementWorkItem, from string, to string, reason string) {
  931. for i := range items {
  932. if items[i].Status != from {
  933. continue
  934. }
  935. items[i].Status = to
  936. if reason != "" {
  937. items[i].Reason = reason
  938. }
  939. }
  940. }
  941. func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pipeline.Candidate) {
  942. if len(items) == 0 || len(candidates) == 0 {
  943. return
  944. }
  945. done := map[int64]struct{}{}
  946. for _, cand := range candidates {
  947. if cand.ID != 0 {
  948. done[cand.ID] = struct{}{}
  949. }
  950. }
  951. for i := range items {
  952. if _, ok := done[items[i].Candidate.ID]; !ok {
  953. continue
  954. }
  955. items[i].Status = pipeline.RefinementStatusCompleted
  956. items[i].Reason = pipeline.RefinementReasonCompleted
  957. }
  958. }
  959. func (rt *dspRuntime) setArbitration(policy pipeline.Policy, budget pipeline.BudgetModel, admission pipeline.RefinementAdmission, queue pipeline.DecisionQueueStats) {
  960. rt.arbitration = pipeline.BuildArbitrationState(policy, budget, admission, queue)
  961. }