Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

858 рядки
26KB

  1. package main
  2. import (
  3. "math"
  4. "strings"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "sdr-wideband-suite/internal/classifier"
  9. "sdr-wideband-suite/internal/config"
  10. "sdr-wideband-suite/internal/demod"
  11. "sdr-wideband-suite/internal/detector"
  12. "sdr-wideband-suite/internal/dsp"
  13. fftutil "sdr-wideband-suite/internal/fft"
  14. "sdr-wideband-suite/internal/fft/gpufft"
  15. "sdr-wideband-suite/internal/pipeline"
  16. "sdr-wideband-suite/internal/rds"
  17. "sdr-wideband-suite/internal/recorder"
  18. )
  19. type rdsState struct {
  20. dec rds.Decoder
  21. result rds.Result
  22. lastDecode time.Time
  23. busy int32
  24. mu sync.Mutex
  25. }
  26. type dspRuntime struct {
  27. cfg config.Config
  28. det *detector.Detector
  29. window []float64
  30. plan *fftutil.CmplxPlan
  31. detailWindow []float64
  32. detailPlan *fftutil.CmplxPlan
  33. detailFFT int
  34. survWindows map[int][]float64
  35. survPlans map[int]*fftutil.CmplxPlan
  36. survFIR map[int][]float64
  37. dcEnabled bool
  38. iqEnabled bool
  39. useGPU bool
  40. gpuEngine *gpufft.Engine
  41. rdsMap map[int64]*rdsState
  42. streamPhaseState map[int64]*streamExtractState
  43. streamOverlap *streamIQOverlap
  44. arbiter *pipeline.Arbiter
  45. arbitration pipeline.ArbitrationState
  46. gotSamples bool
  47. }
  48. type spectrumArtifacts struct {
  49. allIQ []complex64
  50. surveillanceIQ []complex64
  51. detailIQ []complex64
  52. surveillanceSpectrum []float64
  53. surveillanceSpectra []pipeline.SurveillanceLevelSpectrum
  54. surveillancePlan surveillancePlan
  55. detailSpectrum []float64
  56. finished []detector.Event
  57. detected []detector.Signal
  58. thresholds []float64
  59. noiseFloor float64
  60. now time.Time
  61. }
  62. type surveillanceLevelSpec struct {
  63. Level pipeline.AnalysisLevel
  64. Decim int
  65. AllowGPU bool
  66. }
  67. type surveillancePlan struct {
  68. Primary pipeline.AnalysisLevel
  69. Levels []pipeline.AnalysisLevel
  70. LevelSet pipeline.SurveillanceLevelSet
  71. Presentation pipeline.AnalysisLevel
  72. Context pipeline.AnalysisContext
  73. Specs []surveillanceLevelSpec
  74. }
  75. func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
  76. detailFFT := cfg.Refinement.DetailFFTSize
  77. if detailFFT <= 0 {
  78. detailFFT = cfg.FFTSize
  79. }
  80. rt := &dspRuntime{
  81. cfg: cfg,
  82. det: det,
  83. window: window,
  84. plan: fftutil.NewCmplxPlan(cfg.FFTSize),
  85. detailWindow: fftutil.Hann(detailFFT),
  86. detailPlan: fftutil.NewCmplxPlan(detailFFT),
  87. detailFFT: detailFFT,
  88. survWindows: map[int][]float64{},
  89. survPlans: map[int]*fftutil.CmplxPlan{},
  90. survFIR: map[int][]float64{},
  91. dcEnabled: cfg.DCBlock,
  92. iqEnabled: cfg.IQBalance,
  93. useGPU: cfg.UseGPUFFT,
  94. rdsMap: map[int64]*rdsState{},
  95. streamPhaseState: map[int64]*streamExtractState{},
  96. streamOverlap: &streamIQOverlap{},
  97. arbiter: pipeline.NewArbiter(),
  98. }
  99. if rt.useGPU && gpuState != nil {
  100. snap := gpuState.snapshot()
  101. if snap.Available {
  102. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  103. rt.gpuEngine = eng
  104. gpuState.set(true, nil)
  105. } else {
  106. gpuState.set(false, err)
  107. rt.useGPU = false
  108. }
  109. }
  110. }
  111. return rt
  112. }
  113. func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
  114. prevFFT := rt.cfg.FFTSize
  115. prevSampleRate := rt.cfg.SampleRate
  116. prevUseGPU := rt.useGPU
  117. prevDetailFFT := rt.detailFFT
  118. rt.cfg = upd.cfg
  119. if rec != nil {
  120. rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
  121. Enabled: rt.cfg.Recorder.Enabled,
  122. MinSNRDb: rt.cfg.Recorder.MinSNRDb,
  123. MinDuration: mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second),
  124. MaxDuration: mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second),
  125. PrerollMs: rt.cfg.Recorder.PrerollMs,
  126. RecordIQ: rt.cfg.Recorder.RecordIQ,
  127. RecordAudio: rt.cfg.Recorder.RecordAudio,
  128. AutoDemod: rt.cfg.Recorder.AutoDemod,
  129. AutoDecode: rt.cfg.Recorder.AutoDecode,
  130. MaxDiskMB: rt.cfg.Recorder.MaxDiskMB,
  131. OutputDir: rt.cfg.Recorder.OutputDir,
  132. ClassFilter: rt.cfg.Recorder.ClassFilter,
  133. RingSeconds: rt.cfg.Recorder.RingSeconds,
  134. DeemphasisUs: rt.cfg.Recorder.DeemphasisUs,
  135. ExtractionTaps: rt.cfg.Recorder.ExtractionTaps,
  136. ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult,
  137. }, rt.cfg.CenterHz, buildDecoderMap(rt.cfg))
  138. }
  139. if upd.det != nil {
  140. rt.det = upd.det
  141. }
  142. if upd.window != nil {
  143. rt.window = upd.window
  144. rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
  145. }
  146. detailFFT := rt.cfg.Refinement.DetailFFTSize
  147. if detailFFT <= 0 {
  148. detailFFT = rt.cfg.FFTSize
  149. }
  150. if detailFFT != prevDetailFFT {
  151. rt.detailFFT = detailFFT
  152. rt.detailWindow = fftutil.Hann(detailFFT)
  153. rt.detailPlan = fftutil.NewCmplxPlan(detailFFT)
  154. }
  155. if prevSampleRate != rt.cfg.SampleRate {
  156. rt.survFIR = map[int][]float64{}
  157. }
  158. if prevFFT != rt.cfg.FFTSize {
  159. rt.survWindows = map[int][]float64{}
  160. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  161. }
  162. rt.dcEnabled = upd.dcBlock
  163. rt.iqEnabled = upd.iqBalance
  164. if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU {
  165. srcMgr.Flush()
  166. rt.gotSamples = false
  167. if rt.gpuEngine != nil {
  168. rt.gpuEngine.Close()
  169. rt.gpuEngine = nil
  170. }
  171. rt.useGPU = rt.cfg.UseGPUFFT
  172. if rt.useGPU && gpuState != nil {
  173. snap := gpuState.snapshot()
  174. if snap.Available {
  175. if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil {
  176. rt.gpuEngine = eng
  177. gpuState.set(true, nil)
  178. } else {
  179. gpuState.set(false, err)
  180. rt.useGPU = false
  181. }
  182. } else {
  183. gpuState.set(false, nil)
  184. rt.useGPU = false
  185. }
  186. } else if gpuState != nil {
  187. gpuState.set(false, nil)
  188. }
  189. }
  190. }
  191. func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 {
  192. return rt.spectrumFromIQWithPlan(iq, rt.window, rt.plan, gpuState, true)
  193. }
  194. func (rt *dspRuntime) spectrumFromIQWithPlan(iq []complex64, window []float64, plan *fftutil.CmplxPlan, gpuState *gpuStatus, allowGPU bool) []float64 {
  195. if len(iq) == 0 {
  196. return nil
  197. }
  198. if allowGPU && rt.useGPU && rt.gpuEngine != nil {
  199. gpuBuf := make([]complex64, len(iq))
  200. if len(window) == len(iq) {
  201. for i := 0; i < len(iq); i++ {
  202. v := iq[i]
  203. w := float32(window[i])
  204. gpuBuf[i] = complex(real(v)*w, imag(v)*w)
  205. }
  206. } else {
  207. copy(gpuBuf, iq)
  208. }
  209. out, err := rt.gpuEngine.Exec(gpuBuf)
  210. if err != nil {
  211. if gpuState != nil {
  212. gpuState.set(false, err)
  213. }
  214. rt.useGPU = false
  215. return fftutil.SpectrumWithPlan(gpuBuf, nil, plan)
  216. }
  217. return fftutil.SpectrumFromFFT(out)
  218. }
  219. return fftutil.SpectrumWithPlan(iq, window, plan)
  220. }
  221. func (rt *dspRuntime) windowForFFT(fftSize int) []float64 {
  222. if fftSize <= 0 {
  223. return nil
  224. }
  225. if fftSize == rt.cfg.FFTSize {
  226. return rt.window
  227. }
  228. if rt.survWindows == nil {
  229. rt.survWindows = map[int][]float64{}
  230. }
  231. if window, ok := rt.survWindows[fftSize]; ok {
  232. return window
  233. }
  234. window := fftutil.Hann(fftSize)
  235. rt.survWindows[fftSize] = window
  236. return window
  237. }
  238. func (rt *dspRuntime) planForFFT(fftSize int) *fftutil.CmplxPlan {
  239. if fftSize <= 0 {
  240. return nil
  241. }
  242. if fftSize == rt.cfg.FFTSize {
  243. return rt.plan
  244. }
  245. if rt.survPlans == nil {
  246. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  247. }
  248. if plan, ok := rt.survPlans[fftSize]; ok {
  249. return plan
  250. }
  251. plan := fftutil.NewCmplxPlan(fftSize)
  252. rt.survPlans[fftSize] = plan
  253. return plan
  254. }
  255. func (rt *dspRuntime) spectrumForLevel(iq []complex64, fftSize int, gpuState *gpuStatus, allowGPU bool) []float64 {
  256. if len(iq) == 0 || fftSize <= 0 {
  257. return nil
  258. }
  259. if len(iq) > fftSize {
  260. iq = iq[len(iq)-fftSize:]
  261. }
  262. window := rt.windowForFFT(fftSize)
  263. plan := rt.planForFFT(fftSize)
  264. return rt.spectrumFromIQWithPlan(iq, window, plan, gpuState, allowGPU)
  265. }
  266. func sanitizeSpectrum(spectrum []float64) {
  267. for i := range spectrum {
  268. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  269. spectrum[i] = -200
  270. }
  271. }
  272. }
  273. func (rt *dspRuntime) decimationTaps(factor int) []float64 {
  274. if factor <= 1 {
  275. return nil
  276. }
  277. if rt.survFIR == nil {
  278. rt.survFIR = map[int][]float64{}
  279. }
  280. if taps, ok := rt.survFIR[factor]; ok {
  281. return taps
  282. }
  283. cutoff := float64(rt.cfg.SampleRate/factor) * 0.5 * 0.8
  284. taps := dsp.LowpassFIR(cutoff, rt.cfg.SampleRate, 101)
  285. rt.survFIR[factor] = taps
  286. return taps
  287. }
  288. func (rt *dspRuntime) decimateSurveillanceIQ(iq []complex64, factor int) []complex64 {
  289. if factor <= 1 {
  290. return iq
  291. }
  292. taps := rt.decimationTaps(factor)
  293. if len(taps) == 0 {
  294. return dsp.Decimate(iq, factor)
  295. }
  296. filtered := dsp.ApplyFIR(iq, taps)
  297. return dsp.Decimate(filtered, factor)
  298. }
  299. func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
  300. required := rt.cfg.FFTSize
  301. if rt.detailFFT > required {
  302. required = rt.detailFFT
  303. }
  304. available := required
  305. st := srcMgr.Stats()
  306. if st.BufferSamples > required {
  307. available = (st.BufferSamples / required) * required
  308. if available < required {
  309. available = required
  310. }
  311. }
  312. allIQ, err := srcMgr.ReadIQ(available)
  313. if err != nil {
  314. return nil, err
  315. }
  316. if rec != nil {
  317. rec.Ingest(time.Now(), allIQ)
  318. }
  319. survIQ := allIQ
  320. if len(allIQ) > rt.cfg.FFTSize {
  321. survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:]
  322. }
  323. detailIQ := survIQ
  324. if rt.detailFFT > 0 && len(allIQ) >= rt.detailFFT {
  325. detailIQ = allIQ[len(allIQ)-rt.detailFFT:]
  326. }
  327. if rt.dcEnabled {
  328. dcBlocker.Apply(allIQ)
  329. }
  330. if rt.iqEnabled {
  331. dsp.IQBalance(survIQ)
  332. if !sameIQBuffer(detailIQ, survIQ) {
  333. detailIQ = append([]complex64(nil), detailIQ...)
  334. dsp.IQBalance(detailIQ)
  335. }
  336. }
  337. survSpectrum := rt.spectrumFromIQ(survIQ, gpuState)
  338. sanitizeSpectrum(survSpectrum)
  339. detailSpectrum := survSpectrum
  340. if !sameIQBuffer(detailIQ, survIQ) {
  341. detailSpectrum = rt.spectrumFromIQWithPlan(detailIQ, rt.detailWindow, rt.detailPlan, gpuState, false)
  342. sanitizeSpectrum(detailSpectrum)
  343. }
  344. policy := pipeline.PolicyFromConfig(rt.cfg)
  345. plan := rt.buildSurveillancePlan(policy)
  346. surveillanceSpectra := make([]pipeline.SurveillanceLevelSpectrum, 0, len(plan.Specs))
  347. for _, spec := range plan.Specs {
  348. if spec.Level.FFTSize <= 0 {
  349. continue
  350. }
  351. var spectrum []float64
  352. if spec.Decim <= 1 {
  353. if spec.Level.FFTSize == len(survSpectrum) {
  354. spectrum = survSpectrum
  355. } else {
  356. spectrum = rt.spectrumForLevel(survIQ, spec.Level.FFTSize, gpuState, spec.AllowGPU)
  357. sanitizeSpectrum(spectrum)
  358. }
  359. } else {
  360. required := spec.Level.FFTSize * spec.Decim
  361. if required > len(survIQ) {
  362. continue
  363. }
  364. src := survIQ
  365. if len(src) > required {
  366. src = src[len(src)-required:]
  367. }
  368. decimated := rt.decimateSurveillanceIQ(src, spec.Decim)
  369. spectrum = rt.spectrumForLevel(decimated, spec.Level.FFTSize, gpuState, false)
  370. sanitizeSpectrum(spectrum)
  371. }
  372. if len(spectrum) == 0 {
  373. continue
  374. }
  375. surveillanceSpectra = append(surveillanceSpectra, pipeline.SurveillanceLevelSpectrum{Level: spec.Level, Spectrum: spectrum})
  376. }
  377. now := time.Now()
  378. finished, detected := rt.det.Process(now, survSpectrum, rt.cfg.CenterHz)
  379. return &spectrumArtifacts{
  380. allIQ: allIQ,
  381. surveillanceIQ: survIQ,
  382. detailIQ: detailIQ,
  383. surveillanceSpectrum: survSpectrum,
  384. surveillanceSpectra: surveillanceSpectra,
  385. surveillancePlan: plan,
  386. detailSpectrum: detailSpectrum,
  387. finished: finished,
  388. detected: detected,
  389. thresholds: rt.det.LastThresholds(),
  390. noiseFloor: rt.det.LastNoiseFloor(),
  391. now: now,
  392. }, nil
  393. }
  394. func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.SurveillanceResult {
  395. if art == nil {
  396. return pipeline.SurveillanceResult{}
  397. }
  398. policy := pipeline.PolicyFromConfig(rt.cfg)
  399. plan := art.surveillancePlan
  400. if plan.Primary.Name == "" {
  401. plan = rt.buildSurveillancePlan(policy)
  402. }
  403. candidates := pipeline.CandidatesFromSignalsWithLevel(art.detected, "surveillance-detector", plan.Primary)
  404. scheduled := pipeline.ScheduleCandidates(candidates, policy)
  405. return pipeline.SurveillanceResult{
  406. Level: plan.Primary,
  407. Levels: plan.Levels,
  408. LevelSet: plan.LevelSet,
  409. DisplayLevel: plan.Presentation,
  410. Context: plan.Context,
  411. Spectra: art.surveillanceSpectra,
  412. Candidates: candidates,
  413. Scheduled: scheduled,
  414. Finished: art.finished,
  415. Signals: art.detected,
  416. NoiseFloor: art.noiseFloor,
  417. Thresholds: art.thresholds,
  418. }
  419. }
  420. func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now time.Time) pipeline.RefinementInput {
  421. policy := pipeline.PolicyFromConfig(rt.cfg)
  422. plan := pipeline.BuildRefinementPlan(surv.Candidates, policy)
  423. admission := rt.arbiter.AdmitRefinement(plan, policy, now)
  424. plan = admission.Plan
  425. workItems := make([]pipeline.RefinementWorkItem, 0, len(admission.WorkItems))
  426. if len(admission.WorkItems) > 0 {
  427. workItems = append(workItems, admission.WorkItems...)
  428. }
  429. scheduled := append([]pipeline.ScheduledCandidate(nil), admission.Admitted...)
  430. workIndex := map[int64]int{}
  431. for i := range workItems {
  432. if workItems[i].Candidate.ID == 0 {
  433. continue
  434. }
  435. workIndex[workItems[i].Candidate.ID] = i
  436. }
  437. windows := make([]pipeline.RefinementWindow, 0, len(scheduled))
  438. for _, sc := range scheduled {
  439. window := pipeline.RefinementWindowForCandidate(policy, sc.Candidate)
  440. windows = append(windows, window)
  441. if idx, ok := workIndex[sc.Candidate.ID]; ok {
  442. workItems[idx].Window = window
  443. }
  444. }
  445. detailFFT := rt.cfg.Refinement.DetailFFTSize
  446. if detailFFT <= 0 {
  447. detailFFT = rt.cfg.FFTSize
  448. }
  449. levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate))
  450. if _, maxSpan, ok := windowSpanBounds(windows); ok {
  451. levelSpan = maxSpan
  452. }
  453. level := analysisLevel("refinement", "refinement", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "refinement-window", 1, rt.cfg.SampleRate)
  454. detailLevel := analysisLevel("detail", "detail", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "detail-spectrum", 1, rt.cfg.SampleRate)
  455. if len(workItems) > 0 {
  456. for i := range workItems {
  457. item := &workItems[i]
  458. if item.Window.SpanHz <= 0 {
  459. continue
  460. }
  461. item.Execution = &pipeline.RefinementExecution{
  462. Stage: "refine",
  463. SampleRate: rt.cfg.SampleRate,
  464. FFTSize: detailFFT,
  465. CenterHz: item.Window.CenterHz,
  466. SpanHz: item.Window.SpanHz,
  467. Source: detailLevel.Source,
  468. }
  469. }
  470. }
  471. input := pipeline.RefinementInput{
  472. Level: level,
  473. Detail: detailLevel,
  474. Context: surv.Context,
  475. Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan},
  476. Budgets: pipeline.BudgetModelFromPolicy(policy),
  477. Admission: admission.Admission,
  478. Candidates: append([]pipeline.Candidate(nil), surv.Candidates...),
  479. Scheduled: scheduled,
  480. WorkItems: workItems,
  481. Plan: plan,
  482. Windows: windows,
  483. SampleRate: rt.cfg.SampleRate,
  484. FFTSize: detailFFT,
  485. CenterHz: rt.cfg.CenterHz,
  486. Source: "surveillance-detector",
  487. }
  488. input.Context.Refinement = level
  489. input.Context.Detail = detailLevel
  490. if !policy.RefinementEnabled {
  491. for i := range input.WorkItems {
  492. item := &input.WorkItems[i]
  493. if item.Status == pipeline.RefinementStatusDropped {
  494. continue
  495. }
  496. item.Status = pipeline.RefinementStatusDropped
  497. item.Reason = pipeline.RefinementReasonDisabled
  498. }
  499. input.Scheduled = nil
  500. input.Request.Reason = pipeline.ReasonAdmissionDisabled
  501. input.Admission.Reason = pipeline.ReasonAdmissionDisabled
  502. input.Admission.Admitted = 0
  503. input.Admission.Skipped = 0
  504. input.Admission.Displaced = 0
  505. input.Plan.Selected = nil
  506. input.Plan.DroppedByBudget = 0
  507. }
  508. rt.setArbitration(policy, input.Budgets, input.Admission, rt.arbitration.Queue)
  509. return input
  510. }
  511. func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep {
  512. input := rt.buildRefinementInput(surv, art.now)
  513. markWorkItemsStatus(input.WorkItems, pipeline.RefinementStatusAdmitted, pipeline.RefinementStatusRunning, pipeline.RefinementReasonRunning)
  514. result := rt.refineSignals(art, input, extractMgr, rec)
  515. markWorkItemsCompleted(input.WorkItems, result.Candidates)
  516. return pipeline.RefinementStep{Input: input, Result: result}
  517. }
  518. func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.RefinementInput, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult {
  519. if art == nil || len(art.detailIQ) == 0 || len(input.Scheduled) == 0 {
  520. return pipeline.RefinementResult{}
  521. }
  522. policy := pipeline.PolicyFromConfig(rt.cfg)
  523. selectedCandidates := make([]pipeline.Candidate, 0, len(input.Scheduled))
  524. selectedSignals := make([]detector.Signal, 0, len(input.Scheduled))
  525. for _, sc := range input.Scheduled {
  526. selectedCandidates = append(selectedCandidates, sc.Candidate)
  527. selectedSignals = append(selectedSignals, detector.Signal{
  528. ID: sc.Candidate.ID,
  529. FirstBin: sc.Candidate.FirstBin,
  530. LastBin: sc.Candidate.LastBin,
  531. CenterHz: sc.Candidate.CenterHz,
  532. BWHz: sc.Candidate.BandwidthHz,
  533. PeakDb: sc.Candidate.PeakDb,
  534. SNRDb: sc.Candidate.SNRDb,
  535. NoiseDb: sc.Candidate.NoiseDb,
  536. })
  537. }
  538. sampleRate := input.SampleRate
  539. fftSize := input.FFTSize
  540. centerHz := input.CenterHz
  541. if sampleRate <= 0 {
  542. sampleRate = rt.cfg.SampleRate
  543. }
  544. if fftSize <= 0 {
  545. fftSize = rt.cfg.FFTSize
  546. }
  547. if centerHz == 0 {
  548. centerHz = rt.cfg.CenterHz
  549. }
  550. snips, snipRates := extractSignalIQBatch(extractMgr, art.detailIQ, sampleRate, centerHz, selectedSignals)
  551. refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.detailSpectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
  552. signals := make([]detector.Signal, 0, len(refined))
  553. decisions := make([]pipeline.SignalDecision, 0, len(refined))
  554. for i, ref := range refined {
  555. sig := ref.Signal
  556. signals = append(signals, sig)
  557. cls := sig.Class
  558. snipRate := ref.SnippetRate
  559. decision := pipeline.DecideSignalAction(policy, ref.Candidate, cls)
  560. decisions = append(decisions, decision)
  561. if cls != nil {
  562. pll := classifier.PLLResult{}
  563. if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
  564. pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
  565. cls.PLL = &pll
  566. signals[i].PLL = &pll
  567. if cls.ModType == classifier.ClassWFM && pll.Stereo {
  568. cls.ModType = classifier.ClassWFMStereo
  569. }
  570. }
  571. if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
  572. rt.updateRDS(art.now, rec, &signals[i], cls)
  573. }
  574. }
  575. }
  576. budget := pipeline.BudgetModelFromPolicy(policy)
  577. queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy)
  578. rt.setArbitration(policy, budget, input.Admission, queueStats)
  579. summary := summarizeDecisions(decisions)
  580. if rec != nil {
  581. if summary.RecordEnabled > 0 {
  582. rt.cfg.Recorder.Enabled = true
  583. }
  584. if summary.DecodeEnabled > 0 {
  585. rt.cfg.Recorder.AutoDecode = true
  586. }
  587. }
  588. rt.det.UpdateClasses(signals)
  589. return pipeline.RefinementResult{Level: input.Level, Signals: signals, Decisions: decisions, Candidates: selectedCandidates}
  590. }
  591. func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) {
  592. if sig == nil || cls == nil {
  593. return
  594. }
  595. keyHz := sig.CenterHz
  596. if sig.PLL != nil && sig.PLL.ExactHz != 0 {
  597. keyHz = sig.PLL.ExactHz
  598. }
  599. key := int64(math.Round(keyHz / 25000.0))
  600. st := rt.rdsMap[key]
  601. if st == nil {
  602. st = &rdsState{}
  603. rt.rdsMap[key] = st
  604. }
  605. if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
  606. st.lastDecode = now
  607. atomic.StoreInt32(&st.busy, 1)
  608. go func(st *rdsState, sigHz float64) {
  609. defer atomic.StoreInt32(&st.busy, 0)
  610. ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
  611. if len(ringIQ) < ringSR || ringSR <= 0 {
  612. return
  613. }
  614. offset := sigHz - ringCenter
  615. shifted := dsp.FreqShift(ringIQ, ringSR, offset)
  616. decim1 := ringSR / 1000000
  617. if decim1 < 1 {
  618. decim1 = 1
  619. }
  620. lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
  621. f1 := dsp.ApplyFIR(shifted, lp1)
  622. d1 := dsp.Decimate(f1, decim1)
  623. rate1 := ringSR / decim1
  624. decim2 := rate1 / 250000
  625. if decim2 < 1 {
  626. decim2 = 1
  627. }
  628. lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
  629. f2 := dsp.ApplyFIR(d1, lp2)
  630. decimated := dsp.Decimate(f2, decim2)
  631. actualRate := rate1 / decim2
  632. rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
  633. if len(rdsBase.Samples) == 0 {
  634. return
  635. }
  636. st.mu.Lock()
  637. result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
  638. if result.PS != "" {
  639. st.result = result
  640. }
  641. st.mu.Unlock()
  642. }(st, sig.CenterHz)
  643. }
  644. st.mu.Lock()
  645. ps := st.result.PS
  646. st.mu.Unlock()
  647. if ps != "" && sig.PLL != nil {
  648. sig.PLL.RDSStation = strings.TrimSpace(ps)
  649. cls.PLL = sig.PLL
  650. }
  651. }
  652. func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) {
  653. if len(rt.rdsMap) > 0 {
  654. activeIDs := make(map[int64]bool, len(displaySignals))
  655. for _, s := range displaySignals {
  656. keyHz := s.CenterHz
  657. if s.PLL != nil && s.PLL.ExactHz != 0 {
  658. keyHz = s.PLL.ExactHz
  659. }
  660. activeIDs[int64(math.Round(keyHz/25000.0))] = true
  661. }
  662. for id := range rt.rdsMap {
  663. if !activeIDs[id] {
  664. delete(rt.rdsMap, id)
  665. }
  666. }
  667. }
  668. if len(rt.streamPhaseState) > 0 {
  669. sigIDs := make(map[int64]bool, len(displaySignals))
  670. for _, s := range displaySignals {
  671. sigIDs[s.ID] = true
  672. }
  673. for id := range rt.streamPhaseState {
  674. if !sigIDs[id] {
  675. delete(rt.streamPhaseState, id)
  676. }
  677. }
  678. }
  679. if rec != nil && len(displaySignals) > 0 {
  680. aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
  681. _ = aqCfg
  682. }
  683. }
  684. func spanForPolicy(policy pipeline.Policy, fallback float64) float64 {
  685. if policy.MonitorSpanHz > 0 {
  686. return policy.MonitorSpanHz
  687. }
  688. if policy.MonitorStartHz != 0 && policy.MonitorEndHz != 0 && policy.MonitorEndHz > policy.MonitorStartHz {
  689. return policy.MonitorEndHz - policy.MonitorStartHz
  690. }
  691. return fallback
  692. }
  693. func windowSpanBounds(windows []pipeline.RefinementWindow) (float64, float64, bool) {
  694. minSpan := 0.0
  695. maxSpan := 0.0
  696. ok := false
  697. for _, w := range windows {
  698. if w.SpanHz <= 0 {
  699. continue
  700. }
  701. if !ok || w.SpanHz < minSpan {
  702. minSpan = w.SpanHz
  703. }
  704. if !ok || w.SpanHz > maxSpan {
  705. maxSpan = w.SpanHz
  706. }
  707. ok = true
  708. }
  709. return minSpan, maxSpan, ok
  710. }
  711. func analysisLevel(name, role, truth string, sampleRate int, fftSize int, centerHz float64, spanHz float64, source string, decimation int, baseRate int) pipeline.AnalysisLevel {
  712. level := pipeline.AnalysisLevel{
  713. Name: name,
  714. Role: role,
  715. Truth: truth,
  716. SampleRate: sampleRate,
  717. FFTSize: fftSize,
  718. CenterHz: centerHz,
  719. SpanHz: spanHz,
  720. Source: source,
  721. }
  722. if level.SampleRate > 0 && level.FFTSize > 0 {
  723. level.BinHz = float64(level.SampleRate) / float64(level.FFTSize)
  724. }
  725. if decimation > 0 {
  726. level.Decimation = decimation
  727. } else if baseRate > 0 && level.SampleRate > 0 && baseRate%level.SampleRate == 0 {
  728. level.Decimation = baseRate / level.SampleRate
  729. }
  730. return level
  731. }
  732. func (rt *dspRuntime) buildSurveillancePlan(policy pipeline.Policy) surveillancePlan {
  733. baseRate := rt.cfg.SampleRate
  734. baseFFT := rt.cfg.Surveillance.AnalysisFFTSize
  735. if baseFFT <= 0 {
  736. baseFFT = rt.cfg.FFTSize
  737. }
  738. span := spanForPolicy(policy, float64(baseRate))
  739. primary := analysisLevel("surveillance", "surveillance", "surveillance", baseRate, baseFFT, rt.cfg.CenterHz, span, "baseband", 1, baseRate)
  740. levels := []pipeline.AnalysisLevel{primary}
  741. specs := []surveillanceLevelSpec{{Level: primary, Decim: 1, AllowGPU: true}}
  742. context := pipeline.AnalysisContext{Surveillance: primary}
  743. derivedLevels := make([]pipeline.AnalysisLevel, 0, 2)
  744. strategy := strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy))
  745. switch strategy {
  746. case "multi-res", "multi-resolution", "multi", "multi_res":
  747. decim := 2
  748. derivedRate := baseRate / decim
  749. derivedFFT := baseFFT / decim
  750. if derivedRate >= 200000 && derivedFFT >= 256 {
  751. derivedSpan := spanForPolicy(policy, float64(derivedRate))
  752. derived := analysisLevel("surveillance-lowres", "surveillance-lowres", "surveillance", derivedRate, derivedFFT, rt.cfg.CenterHz, derivedSpan, "decimated", decim, baseRate)
  753. levels = append(levels, derived)
  754. specs = append(specs, surveillanceLevelSpec{Level: derived, Decim: decim, AllowGPU: false})
  755. context.Derived = append(context.Derived, derived)
  756. derivedLevels = append(derivedLevels, derived)
  757. }
  758. }
  759. presentation := analysisLevel("presentation", "presentation", "presentation", baseRate, rt.cfg.Surveillance.DisplayBins, rt.cfg.CenterHz, span, "display", 1, baseRate)
  760. context.Presentation = presentation
  761. levelSet := pipeline.SurveillanceLevelSet{
  762. Primary: primary,
  763. Derived: append([]pipeline.AnalysisLevel(nil), derivedLevels...),
  764. Presentation: presentation,
  765. }
  766. allLevels := make([]pipeline.AnalysisLevel, 0, 1+len(derivedLevels)+1)
  767. allLevels = append(allLevels, primary)
  768. allLevels = append(allLevels, derivedLevels...)
  769. if presentation.Name != "" {
  770. allLevels = append(allLevels, presentation)
  771. }
  772. levelSet.All = allLevels
  773. return surveillancePlan{
  774. Primary: primary,
  775. Levels: levels,
  776. LevelSet: levelSet,
  777. Presentation: presentation,
  778. Context: context,
  779. Specs: specs,
  780. }
  781. }
  782. func sameIQBuffer(a []complex64, b []complex64) bool {
  783. if len(a) != len(b) {
  784. return false
  785. }
  786. if len(a) == 0 {
  787. return true
  788. }
  789. return &a[0] == &b[0]
  790. }
  791. func markWorkItemsStatus(items []pipeline.RefinementWorkItem, from string, to string, reason string) {
  792. for i := range items {
  793. if items[i].Status != from {
  794. continue
  795. }
  796. items[i].Status = to
  797. if reason != "" {
  798. items[i].Reason = reason
  799. }
  800. }
  801. }
  802. func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pipeline.Candidate) {
  803. if len(items) == 0 || len(candidates) == 0 {
  804. return
  805. }
  806. done := map[int64]struct{}{}
  807. for _, cand := range candidates {
  808. if cand.ID != 0 {
  809. done[cand.ID] = struct{}{}
  810. }
  811. }
  812. for i := range items {
  813. if _, ok := done[items[i].Candidate.ID]; !ok {
  814. continue
  815. }
  816. items[i].Status = pipeline.RefinementStatusCompleted
  817. items[i].Reason = pipeline.RefinementReasonCompleted
  818. }
  819. }
  820. func (rt *dspRuntime) setArbitration(policy pipeline.Policy, budget pipeline.BudgetModel, admission pipeline.RefinementAdmission, queue pipeline.DecisionQueueStats) {
  821. rt.arbitration = pipeline.BuildArbitrationState(policy, budget, admission, queue)
  822. }