Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

956 wiersze
29KB

  1. package main
  2. import (
  3. "fmt"
  4. "math"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "sdr-wideband-suite/internal/classifier"
  10. "sdr-wideband-suite/internal/config"
  11. "sdr-wideband-suite/internal/demod"
  12. "sdr-wideband-suite/internal/detector"
  13. "sdr-wideband-suite/internal/dsp"
  14. fftutil "sdr-wideband-suite/internal/fft"
  15. "sdr-wideband-suite/internal/fft/gpufft"
  16. "sdr-wideband-suite/internal/pipeline"
  17. "sdr-wideband-suite/internal/rds"
  18. "sdr-wideband-suite/internal/recorder"
  19. )
// rdsState holds the RDS decoding state for one entry of dspRuntime.rdsMap.
type rdsState struct {
	dec        rds.Decoder // decoder invoked by the background decode goroutine while mu is held
	result     rds.Result  // most recent decode result that carried a non-empty PS; guarded by mu
	lastDecode time.Time   // time at which the last background decode was launched
	busy       int32       // 1 while a decode goroutine is running; accessed through sync/atomic
	mu         sync.Mutex  // guards dec and result
}
// dspRuntime bundles the configuration, FFT resources, detectors and
// per-signal state used by the capture / surveillance / refinement steps
// implemented in this file.
type dspRuntime struct {
	cfg              config.Config               // active configuration; replaced wholesale by applyUpdate
	det              *detector.Detector          // primary detector fed with the surveillance spectrum
	derivedDetectors map[string]*derivedDetector // detectors for derived levels, keyed by level name (or "rate:fft")
	nextDerivedBase  int64                       // ID base handed to the next derived detector; starts at -derivedIDBlock
	window           []float64                   // analysis window used for cfg.FFTSize
	plan             *fftutil.CmplxPlan          // FFT plan for cfg.FFTSize
	detailWindow     []float64                   // Hann window of length detailFFT
	detailPlan       *fftutil.CmplxPlan          // FFT plan for detailFFT
	detailFFT        int                         // detail FFT size (cfg.Refinement.DetailFFTSize, or cfg.FFTSize when unset)
	survWindows      map[int][]float64           // lazily built Hann windows keyed by FFT size
	survPlans        map[int]*fftutil.CmplxPlan  // lazily built FFT plans keyed by FFT size
	survFIR          map[int][]float64           // lazily built decimation low-pass taps keyed by decimation factor
	dcEnabled        bool                        // when true captureSpectrum runs the DC blocker over the captured IQ
	iqEnabled        bool                        // when true captureSpectrum applies dsp.IQBalance
	useGPU           bool                        // GPU FFT requested and not (yet) disabled by a failure
	gpuEngine        *gpufft.Engine              // GPU FFT engine, nil when unavailable
	rdsMap           map[int64]*rdsState         // RDS state keyed by round(frequency / 25 kHz)
	streamPhaseState map[int64]*streamExtractState // keyed by signal ID; pruned in maintenance
	streamOverlap    *streamIQOverlap            // NOTE(review): not used in the code visible here — confirm purpose at its users
	arbiter          *pipeline.Arbiter           // admits refinement work and applies decisions
	arbitration      pipeline.ArbitrationState   // last state produced by setArbitration
	gotSamples       bool                        // cleared by applyUpdate when the FFT size or GPU setting changes
}
// spectrumArtifacts is the output of one captureSpectrum call: the IQ that
// was read, the spectra computed from it, and the primary detector results.
type spectrumArtifacts struct {
	allIQ                []complex64                         // every sample returned by the source for this frame
	surveillanceIQ       []complex64                         // tail of allIQ, at most cfg.FFTSize samples
	detailIQ             []complex64                         // tail of allIQ of length detailFFT, or surveillanceIQ
	surveillanceSpectrum []float64                           // spectrum of surveillanceIQ
	surveillanceSpectra  []pipeline.SurveillanceLevelSpectrum // one spectrum per surveillance plan spec that produced output
	surveillancePlan     surveillancePlan                    // plan used to build surveillanceSpectra
	detailSpectrum       []float64                           // spectrum of detailIQ (surveillanceSpectrum when buffers coincide)
	finished             []detector.Event                    // first result of det.Process
	detected             []detector.Signal                   // second result of det.Process
	thresholds           []float64                           // det.LastThresholds() after processing
	noiseFloor           float64                             // det.LastNoiseFloor() after processing
	now                  time.Time                           // timestamp passed to the detectors for this frame
}
// derivedDetector is a detector dedicated to one derived surveillance level.
type derivedDetector struct {
	det        *detector.Detector // detector created for the level's sample rate and FFT size
	sampleRate int                // sample rate the detector was created for
	fftSize    int                // FFT size the detector was created for
	idBase     int64              // candidate IDs are remapped to idBase - originalID
}
// surveillanceLevelSpec describes how captureSpectrum computes the spectrum
// of one surveillance level.
type surveillanceLevelSpec struct {
	Level    pipeline.AnalysisLevel // level the spectrum is produced for
	Decim    int                    // decimation factor; values <= 1 mean the IQ is used undecimated
	AllowGPU bool                   // whether the GPU FFT path may be used for an undecimated level
}
// surveillancePlan is the result of buildSurveillancePlan: the analysis
// levels of one surveillance pass and the specs used to compute them.
type surveillancePlan struct {
	Primary      pipeline.AnalysisLevel        // full-rate surveillance level
	Levels       []pipeline.AnalysisLevel      // primary followed by any derived levels
	LevelSet     pipeline.SurveillanceLevelSet // primary, derived and presentation levels grouped together
	Presentation pipeline.AnalysisLevel        // level built from cfg.Surveillance.DisplayBins
	Context      pipeline.AnalysisContext      // context populated with the levels above
	Specs        []surveillanceLevelSpec       // per-level computation specs consumed by captureSpectrum
}
// derivedIDBlock is the spacing between the ID bases assigned to derived
// detectors: the first base is -derivedIDBlock and each further detector
// gets a base one block lower.
const derivedIDBlock = int64(1_000_000_000)
  85. func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
  86. detailFFT := cfg.Refinement.DetailFFTSize
  87. if detailFFT <= 0 {
  88. detailFFT = cfg.FFTSize
  89. }
  90. rt := &dspRuntime{
  91. cfg: cfg,
  92. det: det,
  93. derivedDetectors: map[string]*derivedDetector{},
  94. nextDerivedBase: -derivedIDBlock,
  95. window: window,
  96. plan: fftutil.NewCmplxPlan(cfg.FFTSize),
  97. detailWindow: fftutil.Hann(detailFFT),
  98. detailPlan: fftutil.NewCmplxPlan(detailFFT),
  99. detailFFT: detailFFT,
  100. survWindows: map[int][]float64{},
  101. survPlans: map[int]*fftutil.CmplxPlan{},
  102. survFIR: map[int][]float64{},
  103. dcEnabled: cfg.DCBlock,
  104. iqEnabled: cfg.IQBalance,
  105. useGPU: cfg.UseGPUFFT,
  106. rdsMap: map[int64]*rdsState{},
  107. streamPhaseState: map[int64]*streamExtractState{},
  108. streamOverlap: &streamIQOverlap{},
  109. arbiter: pipeline.NewArbiter(),
  110. }
  111. if rt.useGPU && gpuState != nil {
  112. snap := gpuState.snapshot()
  113. if snap.Available {
  114. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  115. rt.gpuEngine = eng
  116. gpuState.set(true, nil)
  117. } else {
  118. gpuState.set(false, err)
  119. rt.useGPU = false
  120. }
  121. }
  122. }
  123. return rt
  124. }
  125. func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
  126. prevFFT := rt.cfg.FFTSize
  127. prevSampleRate := rt.cfg.SampleRate
  128. prevUseGPU := rt.useGPU
  129. prevDetailFFT := rt.detailFFT
  130. rt.cfg = upd.cfg
  131. if rec != nil {
  132. rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
  133. Enabled: rt.cfg.Recorder.Enabled,
  134. MinSNRDb: rt.cfg.Recorder.MinSNRDb,
  135. MinDuration: mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second),
  136. MaxDuration: mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second),
  137. PrerollMs: rt.cfg.Recorder.PrerollMs,
  138. RecordIQ: rt.cfg.Recorder.RecordIQ,
  139. RecordAudio: rt.cfg.Recorder.RecordAudio,
  140. AutoDemod: rt.cfg.Recorder.AutoDemod,
  141. AutoDecode: rt.cfg.Recorder.AutoDecode,
  142. MaxDiskMB: rt.cfg.Recorder.MaxDiskMB,
  143. OutputDir: rt.cfg.Recorder.OutputDir,
  144. ClassFilter: rt.cfg.Recorder.ClassFilter,
  145. RingSeconds: rt.cfg.Recorder.RingSeconds,
  146. DeemphasisUs: rt.cfg.Recorder.DeemphasisUs,
  147. ExtractionTaps: rt.cfg.Recorder.ExtractionTaps,
  148. ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult,
  149. }, rt.cfg.CenterHz, buildDecoderMap(rt.cfg))
  150. }
  151. if upd.det != nil {
  152. rt.det = upd.det
  153. }
  154. if upd.window != nil {
  155. rt.window = upd.window
  156. rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
  157. }
  158. detailFFT := rt.cfg.Refinement.DetailFFTSize
  159. if detailFFT <= 0 {
  160. detailFFT = rt.cfg.FFTSize
  161. }
  162. if detailFFT != prevDetailFFT {
  163. rt.detailFFT = detailFFT
  164. rt.detailWindow = fftutil.Hann(detailFFT)
  165. rt.detailPlan = fftutil.NewCmplxPlan(detailFFT)
  166. }
  167. if prevSampleRate != rt.cfg.SampleRate {
  168. rt.survFIR = map[int][]float64{}
  169. }
  170. if prevFFT != rt.cfg.FFTSize {
  171. rt.survWindows = map[int][]float64{}
  172. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  173. }
  174. if upd.det != nil || prevSampleRate != rt.cfg.SampleRate || prevFFT != rt.cfg.FFTSize {
  175. rt.derivedDetectors = map[string]*derivedDetector{}
  176. rt.nextDerivedBase = -derivedIDBlock
  177. }
  178. rt.dcEnabled = upd.dcBlock
  179. rt.iqEnabled = upd.iqBalance
  180. if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU {
  181. srcMgr.Flush()
  182. rt.gotSamples = false
  183. if rt.gpuEngine != nil {
  184. rt.gpuEngine.Close()
  185. rt.gpuEngine = nil
  186. }
  187. rt.useGPU = rt.cfg.UseGPUFFT
  188. if rt.useGPU && gpuState != nil {
  189. snap := gpuState.snapshot()
  190. if snap.Available {
  191. if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil {
  192. rt.gpuEngine = eng
  193. gpuState.set(true, nil)
  194. } else {
  195. gpuState.set(false, err)
  196. rt.useGPU = false
  197. }
  198. } else {
  199. gpuState.set(false, nil)
  200. rt.useGPU = false
  201. }
  202. } else if gpuState != nil {
  203. gpuState.set(false, nil)
  204. }
  205. }
  206. }
  207. func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 {
  208. return rt.spectrumFromIQWithPlan(iq, rt.window, rt.plan, gpuState, true)
  209. }
  210. func (rt *dspRuntime) spectrumFromIQWithPlan(iq []complex64, window []float64, plan *fftutil.CmplxPlan, gpuState *gpuStatus, allowGPU bool) []float64 {
  211. if len(iq) == 0 {
  212. return nil
  213. }
  214. if allowGPU && rt.useGPU && rt.gpuEngine != nil {
  215. gpuBuf := make([]complex64, len(iq))
  216. if len(window) == len(iq) {
  217. for i := 0; i < len(iq); i++ {
  218. v := iq[i]
  219. w := float32(window[i])
  220. gpuBuf[i] = complex(real(v)*w, imag(v)*w)
  221. }
  222. } else {
  223. copy(gpuBuf, iq)
  224. }
  225. out, err := rt.gpuEngine.Exec(gpuBuf)
  226. if err != nil {
  227. if gpuState != nil {
  228. gpuState.set(false, err)
  229. }
  230. rt.useGPU = false
  231. return fftutil.SpectrumWithPlan(gpuBuf, nil, plan)
  232. }
  233. return fftutil.SpectrumFromFFT(out)
  234. }
  235. return fftutil.SpectrumWithPlan(iq, window, plan)
  236. }
  237. func (rt *dspRuntime) windowForFFT(fftSize int) []float64 {
  238. if fftSize <= 0 {
  239. return nil
  240. }
  241. if fftSize == rt.cfg.FFTSize {
  242. return rt.window
  243. }
  244. if rt.survWindows == nil {
  245. rt.survWindows = map[int][]float64{}
  246. }
  247. if window, ok := rt.survWindows[fftSize]; ok {
  248. return window
  249. }
  250. window := fftutil.Hann(fftSize)
  251. rt.survWindows[fftSize] = window
  252. return window
  253. }
  254. func (rt *dspRuntime) planForFFT(fftSize int) *fftutil.CmplxPlan {
  255. if fftSize <= 0 {
  256. return nil
  257. }
  258. if fftSize == rt.cfg.FFTSize {
  259. return rt.plan
  260. }
  261. if rt.survPlans == nil {
  262. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  263. }
  264. if plan, ok := rt.survPlans[fftSize]; ok {
  265. return plan
  266. }
  267. plan := fftutil.NewCmplxPlan(fftSize)
  268. rt.survPlans[fftSize] = plan
  269. return plan
  270. }
  271. func (rt *dspRuntime) spectrumForLevel(iq []complex64, fftSize int, gpuState *gpuStatus, allowGPU bool) []float64 {
  272. if len(iq) == 0 || fftSize <= 0 {
  273. return nil
  274. }
  275. if len(iq) > fftSize {
  276. iq = iq[len(iq)-fftSize:]
  277. }
  278. window := rt.windowForFFT(fftSize)
  279. plan := rt.planForFFT(fftSize)
  280. return rt.spectrumFromIQWithPlan(iq, window, plan, gpuState, allowGPU)
  281. }
  282. func sanitizeSpectrum(spectrum []float64) {
  283. for i := range spectrum {
  284. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  285. spectrum[i] = -200
  286. }
  287. }
  288. }
  289. func (rt *dspRuntime) decimationTaps(factor int) []float64 {
  290. if factor <= 1 {
  291. return nil
  292. }
  293. if rt.survFIR == nil {
  294. rt.survFIR = map[int][]float64{}
  295. }
  296. if taps, ok := rt.survFIR[factor]; ok {
  297. return taps
  298. }
  299. cutoff := float64(rt.cfg.SampleRate/factor) * 0.5 * 0.8
  300. taps := dsp.LowpassFIR(cutoff, rt.cfg.SampleRate, 101)
  301. rt.survFIR[factor] = taps
  302. return taps
  303. }
  304. func (rt *dspRuntime) decimateSurveillanceIQ(iq []complex64, factor int) []complex64 {
  305. if factor <= 1 {
  306. return iq
  307. }
  308. taps := rt.decimationTaps(factor)
  309. if len(taps) == 0 {
  310. return dsp.Decimate(iq, factor)
  311. }
  312. filtered := dsp.ApplyFIR(iq, taps)
  313. return dsp.Decimate(filtered, factor)
  314. }
  315. func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
  316. required := rt.cfg.FFTSize
  317. if rt.detailFFT > required {
  318. required = rt.detailFFT
  319. }
  320. available := required
  321. st := srcMgr.Stats()
  322. if st.BufferSamples > required {
  323. available = (st.BufferSamples / required) * required
  324. if available < required {
  325. available = required
  326. }
  327. }
  328. allIQ, err := srcMgr.ReadIQ(available)
  329. if err != nil {
  330. return nil, err
  331. }
  332. if rec != nil {
  333. rec.Ingest(time.Now(), allIQ)
  334. }
  335. survIQ := allIQ
  336. if len(allIQ) > rt.cfg.FFTSize {
  337. survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:]
  338. }
  339. detailIQ := survIQ
  340. if rt.detailFFT > 0 && len(allIQ) >= rt.detailFFT {
  341. detailIQ = allIQ[len(allIQ)-rt.detailFFT:]
  342. }
  343. if rt.dcEnabled {
  344. dcBlocker.Apply(allIQ)
  345. }
  346. if rt.iqEnabled {
  347. dsp.IQBalance(survIQ)
  348. if !sameIQBuffer(detailIQ, survIQ) {
  349. detailIQ = append([]complex64(nil), detailIQ...)
  350. dsp.IQBalance(detailIQ)
  351. }
  352. }
  353. survSpectrum := rt.spectrumFromIQ(survIQ, gpuState)
  354. sanitizeSpectrum(survSpectrum)
  355. detailSpectrum := survSpectrum
  356. if !sameIQBuffer(detailIQ, survIQ) {
  357. detailSpectrum = rt.spectrumFromIQWithPlan(detailIQ, rt.detailWindow, rt.detailPlan, gpuState, false)
  358. sanitizeSpectrum(detailSpectrum)
  359. }
  360. policy := pipeline.PolicyFromConfig(rt.cfg)
  361. plan := rt.buildSurveillancePlan(policy)
  362. surveillanceSpectra := make([]pipeline.SurveillanceLevelSpectrum, 0, len(plan.Specs))
  363. for _, spec := range plan.Specs {
  364. if spec.Level.FFTSize <= 0 {
  365. continue
  366. }
  367. var spectrum []float64
  368. if spec.Decim <= 1 {
  369. if spec.Level.FFTSize == len(survSpectrum) {
  370. spectrum = survSpectrum
  371. } else {
  372. spectrum = rt.spectrumForLevel(survIQ, spec.Level.FFTSize, gpuState, spec.AllowGPU)
  373. sanitizeSpectrum(spectrum)
  374. }
  375. } else {
  376. required := spec.Level.FFTSize * spec.Decim
  377. if required > len(survIQ) {
  378. continue
  379. }
  380. src := survIQ
  381. if len(src) > required {
  382. src = src[len(src)-required:]
  383. }
  384. decimated := rt.decimateSurveillanceIQ(src, spec.Decim)
  385. spectrum = rt.spectrumForLevel(decimated, spec.Level.FFTSize, gpuState, false)
  386. sanitizeSpectrum(spectrum)
  387. }
  388. if len(spectrum) == 0 {
  389. continue
  390. }
  391. surveillanceSpectra = append(surveillanceSpectra, pipeline.SurveillanceLevelSpectrum{Level: spec.Level, Spectrum: spectrum})
  392. }
  393. now := time.Now()
  394. finished, detected := rt.det.Process(now, survSpectrum, rt.cfg.CenterHz)
  395. return &spectrumArtifacts{
  396. allIQ: allIQ,
  397. surveillanceIQ: survIQ,
  398. detailIQ: detailIQ,
  399. surveillanceSpectrum: survSpectrum,
  400. surveillanceSpectra: surveillanceSpectra,
  401. surveillancePlan: plan,
  402. detailSpectrum: detailSpectrum,
  403. finished: finished,
  404. detected: detected,
  405. thresholds: rt.det.LastThresholds(),
  406. noiseFloor: rt.det.LastNoiseFloor(),
  407. now: now,
  408. }, nil
  409. }
  410. func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.SurveillanceResult {
  411. if art == nil {
  412. return pipeline.SurveillanceResult{}
  413. }
  414. policy := pipeline.PolicyFromConfig(rt.cfg)
  415. plan := art.surveillancePlan
  416. if plan.Primary.Name == "" {
  417. plan = rt.buildSurveillancePlan(policy)
  418. }
  419. primaryCandidates := pipeline.CandidatesFromSignalsWithLevel(art.detected, "surveillance-detector", plan.Primary)
  420. derivedCandidates := rt.detectDerivedCandidates(art, plan)
  421. candidates := pipeline.FuseCandidates(primaryCandidates, derivedCandidates)
  422. scheduled := pipeline.ScheduleCandidates(candidates, policy)
  423. return pipeline.SurveillanceResult{
  424. Level: plan.Primary,
  425. Levels: plan.Levels,
  426. LevelSet: plan.LevelSet,
  427. DisplayLevel: plan.Presentation,
  428. Context: plan.Context,
  429. Spectra: art.surveillanceSpectra,
  430. Candidates: candidates,
  431. Scheduled: scheduled,
  432. Finished: art.finished,
  433. Signals: art.detected,
  434. NoiseFloor: art.noiseFloor,
  435. Thresholds: art.thresholds,
  436. }
  437. }
  438. func (rt *dspRuntime) detectDerivedCandidates(art *spectrumArtifacts, plan surveillancePlan) []pipeline.Candidate {
  439. if art == nil || len(plan.LevelSet.Derived) == 0 {
  440. return nil
  441. }
  442. spectra := map[string][]float64{}
  443. for _, spec := range art.surveillanceSpectra {
  444. if spec.Level.Name == "" || len(spec.Spectrum) == 0 {
  445. continue
  446. }
  447. spectra[spec.Level.Name] = spec.Spectrum
  448. }
  449. if len(spectra) == 0 {
  450. return nil
  451. }
  452. out := make([]pipeline.Candidate, 0, len(plan.LevelSet.Derived))
  453. for _, level := range plan.LevelSet.Derived {
  454. if level.Name == "" {
  455. continue
  456. }
  457. if !pipeline.IsDetectionLevel(level) {
  458. continue
  459. }
  460. spectrum := spectra[level.Name]
  461. if len(spectrum) == 0 {
  462. continue
  463. }
  464. entry := rt.derivedDetectorForLevel(level)
  465. if entry == nil || entry.det == nil {
  466. continue
  467. }
  468. _, signals := entry.det.Process(art.now, spectrum, level.CenterHz)
  469. if len(signals) == 0 {
  470. continue
  471. }
  472. cands := pipeline.CandidatesFromSignalsWithLevel(signals, "surveillance-derived", level)
  473. for i := range cands {
  474. if cands[i].ID == 0 {
  475. continue
  476. }
  477. cands[i].ID = entry.idBase - cands[i].ID
  478. }
  479. out = append(out, cands...)
  480. }
  481. if len(out) == 0 {
  482. return nil
  483. }
  484. return out
  485. }
  486. func (rt *dspRuntime) derivedDetectorForLevel(level pipeline.AnalysisLevel) *derivedDetector {
  487. if level.SampleRate <= 0 || level.FFTSize <= 0 {
  488. return nil
  489. }
  490. if rt.derivedDetectors == nil {
  491. rt.derivedDetectors = map[string]*derivedDetector{}
  492. }
  493. key := level.Name
  494. if key == "" {
  495. key = fmt.Sprintf("%d:%d", level.SampleRate, level.FFTSize)
  496. }
  497. entry := rt.derivedDetectors[key]
  498. if entry != nil && entry.sampleRate == level.SampleRate && entry.fftSize == level.FFTSize {
  499. return entry
  500. }
  501. if rt.nextDerivedBase == 0 {
  502. rt.nextDerivedBase = -derivedIDBlock
  503. }
  504. entry = &derivedDetector{
  505. det: detector.New(rt.cfg.Detector, level.SampleRate, level.FFTSize),
  506. sampleRate: level.SampleRate,
  507. fftSize: level.FFTSize,
  508. idBase: rt.nextDerivedBase,
  509. }
  510. rt.nextDerivedBase -= derivedIDBlock
  511. rt.derivedDetectors[key] = entry
  512. return entry
  513. }
  514. func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now time.Time) pipeline.RefinementInput {
  515. policy := pipeline.PolicyFromConfig(rt.cfg)
  516. plan := pipeline.BuildRefinementPlan(surv.Candidates, policy)
  517. admission := rt.arbiter.AdmitRefinement(plan, policy, now)
  518. plan = admission.Plan
  519. workItems := make([]pipeline.RefinementWorkItem, 0, len(admission.WorkItems))
  520. if len(admission.WorkItems) > 0 {
  521. workItems = append(workItems, admission.WorkItems...)
  522. }
  523. scheduled := append([]pipeline.ScheduledCandidate(nil), admission.Admitted...)
  524. workIndex := map[int64]int{}
  525. for i := range workItems {
  526. if workItems[i].Candidate.ID == 0 {
  527. continue
  528. }
  529. workIndex[workItems[i].Candidate.ID] = i
  530. }
  531. windows := make([]pipeline.RefinementWindow, 0, len(scheduled))
  532. for _, sc := range scheduled {
  533. window := pipeline.RefinementWindowForCandidate(policy, sc.Candidate)
  534. windows = append(windows, window)
  535. if idx, ok := workIndex[sc.Candidate.ID]; ok {
  536. workItems[idx].Window = window
  537. }
  538. }
  539. detailFFT := rt.cfg.Refinement.DetailFFTSize
  540. if detailFFT <= 0 {
  541. detailFFT = rt.cfg.FFTSize
  542. }
  543. levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate))
  544. if _, maxSpan, ok := windowSpanBounds(windows); ok {
  545. levelSpan = maxSpan
  546. }
  547. level := analysisLevel("refinement", "refinement", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "refinement-window", 1, rt.cfg.SampleRate)
  548. detailLevel := analysisLevel("detail", "detail", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "detail-spectrum", 1, rt.cfg.SampleRate)
  549. if len(workItems) > 0 {
  550. for i := range workItems {
  551. item := &workItems[i]
  552. if item.Window.SpanHz <= 0 {
  553. continue
  554. }
  555. item.Execution = &pipeline.RefinementExecution{
  556. Stage: "refine",
  557. SampleRate: rt.cfg.SampleRate,
  558. FFTSize: detailFFT,
  559. CenterHz: item.Window.CenterHz,
  560. SpanHz: item.Window.SpanHz,
  561. Source: detailLevel.Source,
  562. }
  563. }
  564. }
  565. input := pipeline.RefinementInput{
  566. Level: level,
  567. Detail: detailLevel,
  568. Context: surv.Context,
  569. Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan},
  570. Budgets: pipeline.BudgetModelFromPolicy(policy),
  571. Admission: admission.Admission,
  572. Candidates: append([]pipeline.Candidate(nil), surv.Candidates...),
  573. Scheduled: scheduled,
  574. WorkItems: workItems,
  575. Plan: plan,
  576. Windows: windows,
  577. SampleRate: rt.cfg.SampleRate,
  578. FFTSize: detailFFT,
  579. CenterHz: rt.cfg.CenterHz,
  580. Source: "surveillance-detector",
  581. }
  582. input.Context.Refinement = level
  583. input.Context.Detail = detailLevel
  584. if !policy.RefinementEnabled {
  585. for i := range input.WorkItems {
  586. item := &input.WorkItems[i]
  587. if item.Status == pipeline.RefinementStatusDropped {
  588. continue
  589. }
  590. item.Status = pipeline.RefinementStatusDropped
  591. item.Reason = pipeline.RefinementReasonDisabled
  592. }
  593. input.Scheduled = nil
  594. input.Request.Reason = pipeline.ReasonAdmissionDisabled
  595. input.Admission.Reason = pipeline.ReasonAdmissionDisabled
  596. input.Admission.Admitted = 0
  597. input.Admission.Skipped = 0
  598. input.Admission.Displaced = 0
  599. input.Plan.Selected = nil
  600. input.Plan.DroppedByBudget = 0
  601. }
  602. rt.setArbitration(policy, input.Budgets, input.Admission, rt.arbitration.Queue)
  603. return input
  604. }
  605. func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep {
  606. input := rt.buildRefinementInput(surv, art.now)
  607. markWorkItemsStatus(input.WorkItems, pipeline.RefinementStatusAdmitted, pipeline.RefinementStatusRunning, pipeline.RefinementReasonRunning)
  608. result := rt.refineSignals(art, input, extractMgr, rec)
  609. markWorkItemsCompleted(input.WorkItems, result.Candidates)
  610. return pipeline.RefinementStep{Input: input, Result: result}
  611. }
  612. func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.RefinementInput, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult {
  613. if art == nil || len(art.detailIQ) == 0 || len(input.Scheduled) == 0 {
  614. return pipeline.RefinementResult{}
  615. }
  616. policy := pipeline.PolicyFromConfig(rt.cfg)
  617. selectedCandidates := make([]pipeline.Candidate, 0, len(input.Scheduled))
  618. selectedSignals := make([]detector.Signal, 0, len(input.Scheduled))
  619. for _, sc := range input.Scheduled {
  620. selectedCandidates = append(selectedCandidates, sc.Candidate)
  621. selectedSignals = append(selectedSignals, detector.Signal{
  622. ID: sc.Candidate.ID,
  623. FirstBin: sc.Candidate.FirstBin,
  624. LastBin: sc.Candidate.LastBin,
  625. CenterHz: sc.Candidate.CenterHz,
  626. BWHz: sc.Candidate.BandwidthHz,
  627. PeakDb: sc.Candidate.PeakDb,
  628. SNRDb: sc.Candidate.SNRDb,
  629. NoiseDb: sc.Candidate.NoiseDb,
  630. })
  631. }
  632. sampleRate := input.SampleRate
  633. fftSize := input.FFTSize
  634. centerHz := input.CenterHz
  635. if sampleRate <= 0 {
  636. sampleRate = rt.cfg.SampleRate
  637. }
  638. if fftSize <= 0 {
  639. fftSize = rt.cfg.FFTSize
  640. }
  641. if centerHz == 0 {
  642. centerHz = rt.cfg.CenterHz
  643. }
  644. snips, snipRates := extractSignalIQBatch(extractMgr, art.detailIQ, sampleRate, centerHz, selectedSignals)
  645. refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.detailSpectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
  646. signals := make([]detector.Signal, 0, len(refined))
  647. decisions := make([]pipeline.SignalDecision, 0, len(refined))
  648. for i, ref := range refined {
  649. sig := ref.Signal
  650. signals = append(signals, sig)
  651. cls := sig.Class
  652. snipRate := ref.SnippetRate
  653. decision := pipeline.DecideSignalAction(policy, ref.Candidate, cls)
  654. decisions = append(decisions, decision)
  655. if cls != nil {
  656. pll := classifier.PLLResult{}
  657. if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
  658. pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
  659. cls.PLL = &pll
  660. signals[i].PLL = &pll
  661. if cls.ModType == classifier.ClassWFM && pll.Stereo {
  662. cls.ModType = classifier.ClassWFMStereo
  663. }
  664. }
  665. if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
  666. rt.updateRDS(art.now, rec, &signals[i], cls)
  667. }
  668. }
  669. }
  670. budget := pipeline.BudgetModelFromPolicy(policy)
  671. queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy)
  672. rt.setArbitration(policy, budget, input.Admission, queueStats)
  673. summary := summarizeDecisions(decisions)
  674. if rec != nil {
  675. if summary.RecordEnabled > 0 {
  676. rt.cfg.Recorder.Enabled = true
  677. }
  678. if summary.DecodeEnabled > 0 {
  679. rt.cfg.Recorder.AutoDecode = true
  680. }
  681. }
  682. rt.det.UpdateClasses(signals)
  683. return pipeline.RefinementResult{Level: input.Level, Signals: signals, Decisions: decisions, Candidates: selectedCandidates}
  684. }
  685. func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) {
  686. if sig == nil || cls == nil {
  687. return
  688. }
  689. keyHz := sig.CenterHz
  690. if sig.PLL != nil && sig.PLL.ExactHz != 0 {
  691. keyHz = sig.PLL.ExactHz
  692. }
  693. key := int64(math.Round(keyHz / 25000.0))
  694. st := rt.rdsMap[key]
  695. if st == nil {
  696. st = &rdsState{}
  697. rt.rdsMap[key] = st
  698. }
  699. if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
  700. st.lastDecode = now
  701. atomic.StoreInt32(&st.busy, 1)
  702. go func(st *rdsState, sigHz float64) {
  703. defer atomic.StoreInt32(&st.busy, 0)
  704. ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
  705. if len(ringIQ) < ringSR || ringSR <= 0 {
  706. return
  707. }
  708. offset := sigHz - ringCenter
  709. shifted := dsp.FreqShift(ringIQ, ringSR, offset)
  710. decim1 := ringSR / 1000000
  711. if decim1 < 1 {
  712. decim1 = 1
  713. }
  714. lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
  715. f1 := dsp.ApplyFIR(shifted, lp1)
  716. d1 := dsp.Decimate(f1, decim1)
  717. rate1 := ringSR / decim1
  718. decim2 := rate1 / 250000
  719. if decim2 < 1 {
  720. decim2 = 1
  721. }
  722. lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
  723. f2 := dsp.ApplyFIR(d1, lp2)
  724. decimated := dsp.Decimate(f2, decim2)
  725. actualRate := rate1 / decim2
  726. rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
  727. if len(rdsBase.Samples) == 0 {
  728. return
  729. }
  730. st.mu.Lock()
  731. result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
  732. if result.PS != "" {
  733. st.result = result
  734. }
  735. st.mu.Unlock()
  736. }(st, sig.CenterHz)
  737. }
  738. st.mu.Lock()
  739. ps := st.result.PS
  740. st.mu.Unlock()
  741. if ps != "" && sig.PLL != nil {
  742. sig.PLL.RDSStation = strings.TrimSpace(ps)
  743. cls.PLL = sig.PLL
  744. }
  745. }
  746. func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) {
  747. if len(rt.rdsMap) > 0 {
  748. activeIDs := make(map[int64]bool, len(displaySignals))
  749. for _, s := range displaySignals {
  750. keyHz := s.CenterHz
  751. if s.PLL != nil && s.PLL.ExactHz != 0 {
  752. keyHz = s.PLL.ExactHz
  753. }
  754. activeIDs[int64(math.Round(keyHz/25000.0))] = true
  755. }
  756. for id := range rt.rdsMap {
  757. if !activeIDs[id] {
  758. delete(rt.rdsMap, id)
  759. }
  760. }
  761. }
  762. if len(rt.streamPhaseState) > 0 {
  763. sigIDs := make(map[int64]bool, len(displaySignals))
  764. for _, s := range displaySignals {
  765. sigIDs[s.ID] = true
  766. }
  767. for id := range rt.streamPhaseState {
  768. if !sigIDs[id] {
  769. delete(rt.streamPhaseState, id)
  770. }
  771. }
  772. }
  773. if rec != nil && len(displaySignals) > 0 {
  774. aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
  775. _ = aqCfg
  776. }
  777. }
  778. func spanForPolicy(policy pipeline.Policy, fallback float64) float64 {
  779. if policy.MonitorSpanHz > 0 {
  780. return policy.MonitorSpanHz
  781. }
  782. if policy.MonitorStartHz != 0 && policy.MonitorEndHz != 0 && policy.MonitorEndHz > policy.MonitorStartHz {
  783. return policy.MonitorEndHz - policy.MonitorStartHz
  784. }
  785. return fallback
  786. }
  787. func windowSpanBounds(windows []pipeline.RefinementWindow) (float64, float64, bool) {
  788. minSpan := 0.0
  789. maxSpan := 0.0
  790. ok := false
  791. for _, w := range windows {
  792. if w.SpanHz <= 0 {
  793. continue
  794. }
  795. if !ok || w.SpanHz < minSpan {
  796. minSpan = w.SpanHz
  797. }
  798. if !ok || w.SpanHz > maxSpan {
  799. maxSpan = w.SpanHz
  800. }
  801. ok = true
  802. }
  803. return minSpan, maxSpan, ok
  804. }
  805. func analysisLevel(name, role, truth string, sampleRate int, fftSize int, centerHz float64, spanHz float64, source string, decimation int, baseRate int) pipeline.AnalysisLevel {
  806. level := pipeline.AnalysisLevel{
  807. Name: name,
  808. Role: role,
  809. Truth: truth,
  810. SampleRate: sampleRate,
  811. FFTSize: fftSize,
  812. CenterHz: centerHz,
  813. SpanHz: spanHz,
  814. Source: source,
  815. }
  816. if level.SampleRate > 0 && level.FFTSize > 0 {
  817. level.BinHz = float64(level.SampleRate) / float64(level.FFTSize)
  818. }
  819. if decimation > 0 {
  820. level.Decimation = decimation
  821. } else if baseRate > 0 && level.SampleRate > 0 && baseRate%level.SampleRate == 0 {
  822. level.Decimation = baseRate / level.SampleRate
  823. }
  824. return level
  825. }
  826. func (rt *dspRuntime) buildSurveillancePlan(policy pipeline.Policy) surveillancePlan {
  827. baseRate := rt.cfg.SampleRate
  828. baseFFT := rt.cfg.Surveillance.AnalysisFFTSize
  829. if baseFFT <= 0 {
  830. baseFFT = rt.cfg.FFTSize
  831. }
  832. span := spanForPolicy(policy, float64(baseRate))
  833. primary := analysisLevel("surveillance", "surveillance", "surveillance", baseRate, baseFFT, rt.cfg.CenterHz, span, "baseband", 1, baseRate)
  834. levels := []pipeline.AnalysisLevel{primary}
  835. specs := []surveillanceLevelSpec{{Level: primary, Decim: 1, AllowGPU: true}}
  836. context := pipeline.AnalysisContext{Surveillance: primary}
  837. derivedLevels := make([]pipeline.AnalysisLevel, 0, 2)
  838. strategy := strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy))
  839. switch strategy {
  840. case "multi-res", "multi-resolution", "multi", "multi_res":
  841. decim := 2
  842. derivedRate := baseRate / decim
  843. derivedFFT := baseFFT / decim
  844. if derivedRate >= 200000 && derivedFFT >= 256 {
  845. derivedSpan := spanForPolicy(policy, float64(derivedRate))
  846. derived := analysisLevel("surveillance-lowres", "surveillance-lowres", "surveillance", derivedRate, derivedFFT, rt.cfg.CenterHz, derivedSpan, "decimated", decim, baseRate)
  847. levels = append(levels, derived)
  848. specs = append(specs, surveillanceLevelSpec{Level: derived, Decim: decim, AllowGPU: false})
  849. context.Derived = append(context.Derived, derived)
  850. derivedLevels = append(derivedLevels, derived)
  851. }
  852. }
  853. presentation := analysisLevel("presentation", "presentation", "presentation", baseRate, rt.cfg.Surveillance.DisplayBins, rt.cfg.CenterHz, span, "display", 1, baseRate)
  854. context.Presentation = presentation
  855. levelSet := pipeline.SurveillanceLevelSet{
  856. Primary: primary,
  857. Derived: append([]pipeline.AnalysisLevel(nil), derivedLevels...),
  858. Presentation: presentation,
  859. }
  860. allLevels := make([]pipeline.AnalysisLevel, 0, 1+len(derivedLevels)+1)
  861. allLevels = append(allLevels, primary)
  862. allLevels = append(allLevels, derivedLevels...)
  863. if presentation.Name != "" {
  864. allLevels = append(allLevels, presentation)
  865. }
  866. levelSet.All = allLevels
  867. return surveillancePlan{
  868. Primary: primary,
  869. Levels: levels,
  870. LevelSet: levelSet,
  871. Presentation: presentation,
  872. Context: context,
  873. Specs: specs,
  874. }
  875. }
  876. func sameIQBuffer(a []complex64, b []complex64) bool {
  877. if len(a) != len(b) {
  878. return false
  879. }
  880. if len(a) == 0 {
  881. return true
  882. }
  883. return &a[0] == &b[0]
  884. }
  885. func markWorkItemsStatus(items []pipeline.RefinementWorkItem, from string, to string, reason string) {
  886. for i := range items {
  887. if items[i].Status != from {
  888. continue
  889. }
  890. items[i].Status = to
  891. if reason != "" {
  892. items[i].Reason = reason
  893. }
  894. }
  895. }
  896. func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pipeline.Candidate) {
  897. if len(items) == 0 || len(candidates) == 0 {
  898. return
  899. }
  900. done := map[int64]struct{}{}
  901. for _, cand := range candidates {
  902. if cand.ID != 0 {
  903. done[cand.ID] = struct{}{}
  904. }
  905. }
  906. for i := range items {
  907. if _, ok := done[items[i].Candidate.ID]; !ok {
  908. continue
  909. }
  910. items[i].Status = pipeline.RefinementStatusCompleted
  911. items[i].Reason = pipeline.RefinementReasonCompleted
  912. }
  913. }
  914. func (rt *dspRuntime) setArbitration(policy pipeline.Policy, budget pipeline.BudgetModel, admission pipeline.RefinementAdmission, queue pipeline.DecisionQueueStats) {
  915. rt.arbitration = pipeline.BuildArbitrationState(policy, budget, admission, queue)
  916. }