Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

841 строка
25KB

  1. package main
  2. import (
  3. "math"
  4. "strings"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "sdr-wideband-suite/internal/classifier"
  9. "sdr-wideband-suite/internal/config"
  10. "sdr-wideband-suite/internal/demod"
  11. "sdr-wideband-suite/internal/detector"
  12. "sdr-wideband-suite/internal/dsp"
  13. fftutil "sdr-wideband-suite/internal/fft"
  14. "sdr-wideband-suite/internal/fft/gpufft"
  15. "sdr-wideband-suite/internal/pipeline"
  16. "sdr-wideband-suite/internal/rds"
  17. "sdr-wideband-suite/internal/recorder"
  18. )
  19. type rdsState struct {
  20. dec rds.Decoder
  21. result rds.Result
  22. lastDecode time.Time
  23. busy int32
  24. mu sync.Mutex
  25. }
  26. type dspRuntime struct {
  27. cfg config.Config
  28. det *detector.Detector
  29. window []float64
  30. plan *fftutil.CmplxPlan
  31. detailWindow []float64
  32. detailPlan *fftutil.CmplxPlan
  33. detailFFT int
  34. survWindows map[int][]float64
  35. survPlans map[int]*fftutil.CmplxPlan
  36. survFIR map[int][]float64
  37. dcEnabled bool
  38. iqEnabled bool
  39. useGPU bool
  40. gpuEngine *gpufft.Engine
  41. rdsMap map[int64]*rdsState
  42. streamPhaseState map[int64]*streamExtractState
  43. streamOverlap *streamIQOverlap
  44. arbiter *pipeline.Arbiter
  45. arbitration pipeline.ArbitrationState
  46. gotSamples bool
  47. }
  48. type spectrumArtifacts struct {
  49. allIQ []complex64
  50. surveillanceIQ []complex64
  51. detailIQ []complex64
  52. surveillanceSpectrum []float64
  53. surveillanceSpectra []pipeline.SurveillanceLevelSpectrum
  54. surveillancePlan surveillancePlan
  55. detailSpectrum []float64
  56. finished []detector.Event
  57. detected []detector.Signal
  58. thresholds []float64
  59. noiseFloor float64
  60. now time.Time
  61. }
  62. type surveillanceLevelSpec struct {
  63. Level pipeline.AnalysisLevel
  64. Decim int
  65. AllowGPU bool
  66. }
  67. type surveillancePlan struct {
  68. Primary pipeline.AnalysisLevel
  69. Levels []pipeline.AnalysisLevel
  70. Presentation pipeline.AnalysisLevel
  71. Context pipeline.AnalysisContext
  72. Specs []surveillanceLevelSpec
  73. }
  74. func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
  75. detailFFT := cfg.Refinement.DetailFFTSize
  76. if detailFFT <= 0 {
  77. detailFFT = cfg.FFTSize
  78. }
  79. rt := &dspRuntime{
  80. cfg: cfg,
  81. det: det,
  82. window: window,
  83. plan: fftutil.NewCmplxPlan(cfg.FFTSize),
  84. detailWindow: fftutil.Hann(detailFFT),
  85. detailPlan: fftutil.NewCmplxPlan(detailFFT),
  86. detailFFT: detailFFT,
  87. survWindows: map[int][]float64{},
  88. survPlans: map[int]*fftutil.CmplxPlan{},
  89. survFIR: map[int][]float64{},
  90. dcEnabled: cfg.DCBlock,
  91. iqEnabled: cfg.IQBalance,
  92. useGPU: cfg.UseGPUFFT,
  93. rdsMap: map[int64]*rdsState{},
  94. streamPhaseState: map[int64]*streamExtractState{},
  95. streamOverlap: &streamIQOverlap{},
  96. arbiter: pipeline.NewArbiter(),
  97. }
  98. if rt.useGPU && gpuState != nil {
  99. snap := gpuState.snapshot()
  100. if snap.Available {
  101. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  102. rt.gpuEngine = eng
  103. gpuState.set(true, nil)
  104. } else {
  105. gpuState.set(false, err)
  106. rt.useGPU = false
  107. }
  108. }
  109. }
  110. return rt
  111. }
  112. func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
  113. prevFFT := rt.cfg.FFTSize
  114. prevSampleRate := rt.cfg.SampleRate
  115. prevUseGPU := rt.useGPU
  116. prevDetailFFT := rt.detailFFT
  117. rt.cfg = upd.cfg
  118. if rec != nil {
  119. rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
  120. Enabled: rt.cfg.Recorder.Enabled,
  121. MinSNRDb: rt.cfg.Recorder.MinSNRDb,
  122. MinDuration: mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second),
  123. MaxDuration: mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second),
  124. PrerollMs: rt.cfg.Recorder.PrerollMs,
  125. RecordIQ: rt.cfg.Recorder.RecordIQ,
  126. RecordAudio: rt.cfg.Recorder.RecordAudio,
  127. AutoDemod: rt.cfg.Recorder.AutoDemod,
  128. AutoDecode: rt.cfg.Recorder.AutoDecode,
  129. MaxDiskMB: rt.cfg.Recorder.MaxDiskMB,
  130. OutputDir: rt.cfg.Recorder.OutputDir,
  131. ClassFilter: rt.cfg.Recorder.ClassFilter,
  132. RingSeconds: rt.cfg.Recorder.RingSeconds,
  133. DeemphasisUs: rt.cfg.Recorder.DeemphasisUs,
  134. ExtractionTaps: rt.cfg.Recorder.ExtractionTaps,
  135. ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult,
  136. }, rt.cfg.CenterHz, buildDecoderMap(rt.cfg))
  137. }
  138. if upd.det != nil {
  139. rt.det = upd.det
  140. }
  141. if upd.window != nil {
  142. rt.window = upd.window
  143. rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
  144. }
  145. detailFFT := rt.cfg.Refinement.DetailFFTSize
  146. if detailFFT <= 0 {
  147. detailFFT = rt.cfg.FFTSize
  148. }
  149. if detailFFT != prevDetailFFT {
  150. rt.detailFFT = detailFFT
  151. rt.detailWindow = fftutil.Hann(detailFFT)
  152. rt.detailPlan = fftutil.NewCmplxPlan(detailFFT)
  153. }
  154. if prevSampleRate != rt.cfg.SampleRate {
  155. rt.survFIR = map[int][]float64{}
  156. }
  157. if prevFFT != rt.cfg.FFTSize {
  158. rt.survWindows = map[int][]float64{}
  159. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  160. }
  161. rt.dcEnabled = upd.dcBlock
  162. rt.iqEnabled = upd.iqBalance
  163. if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU {
  164. srcMgr.Flush()
  165. rt.gotSamples = false
  166. if rt.gpuEngine != nil {
  167. rt.gpuEngine.Close()
  168. rt.gpuEngine = nil
  169. }
  170. rt.useGPU = rt.cfg.UseGPUFFT
  171. if rt.useGPU && gpuState != nil {
  172. snap := gpuState.snapshot()
  173. if snap.Available {
  174. if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil {
  175. rt.gpuEngine = eng
  176. gpuState.set(true, nil)
  177. } else {
  178. gpuState.set(false, err)
  179. rt.useGPU = false
  180. }
  181. } else {
  182. gpuState.set(false, nil)
  183. rt.useGPU = false
  184. }
  185. } else if gpuState != nil {
  186. gpuState.set(false, nil)
  187. }
  188. }
  189. }
  190. func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 {
  191. return rt.spectrumFromIQWithPlan(iq, rt.window, rt.plan, gpuState, true)
  192. }
  193. func (rt *dspRuntime) spectrumFromIQWithPlan(iq []complex64, window []float64, plan *fftutil.CmplxPlan, gpuState *gpuStatus, allowGPU bool) []float64 {
  194. if len(iq) == 0 {
  195. return nil
  196. }
  197. if allowGPU && rt.useGPU && rt.gpuEngine != nil {
  198. gpuBuf := make([]complex64, len(iq))
  199. if len(window) == len(iq) {
  200. for i := 0; i < len(iq); i++ {
  201. v := iq[i]
  202. w := float32(window[i])
  203. gpuBuf[i] = complex(real(v)*w, imag(v)*w)
  204. }
  205. } else {
  206. copy(gpuBuf, iq)
  207. }
  208. out, err := rt.gpuEngine.Exec(gpuBuf)
  209. if err != nil {
  210. if gpuState != nil {
  211. gpuState.set(false, err)
  212. }
  213. rt.useGPU = false
  214. return fftutil.SpectrumWithPlan(gpuBuf, nil, plan)
  215. }
  216. return fftutil.SpectrumFromFFT(out)
  217. }
  218. return fftutil.SpectrumWithPlan(iq, window, plan)
  219. }
  220. func (rt *dspRuntime) windowForFFT(fftSize int) []float64 {
  221. if fftSize <= 0 {
  222. return nil
  223. }
  224. if fftSize == rt.cfg.FFTSize {
  225. return rt.window
  226. }
  227. if rt.survWindows == nil {
  228. rt.survWindows = map[int][]float64{}
  229. }
  230. if window, ok := rt.survWindows[fftSize]; ok {
  231. return window
  232. }
  233. window := fftutil.Hann(fftSize)
  234. rt.survWindows[fftSize] = window
  235. return window
  236. }
  237. func (rt *dspRuntime) planForFFT(fftSize int) *fftutil.CmplxPlan {
  238. if fftSize <= 0 {
  239. return nil
  240. }
  241. if fftSize == rt.cfg.FFTSize {
  242. return rt.plan
  243. }
  244. if rt.survPlans == nil {
  245. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  246. }
  247. if plan, ok := rt.survPlans[fftSize]; ok {
  248. return plan
  249. }
  250. plan := fftutil.NewCmplxPlan(fftSize)
  251. rt.survPlans[fftSize] = plan
  252. return plan
  253. }
  254. func (rt *dspRuntime) spectrumForLevel(iq []complex64, fftSize int, gpuState *gpuStatus, allowGPU bool) []float64 {
  255. if len(iq) == 0 || fftSize <= 0 {
  256. return nil
  257. }
  258. if len(iq) > fftSize {
  259. iq = iq[len(iq)-fftSize:]
  260. }
  261. window := rt.windowForFFT(fftSize)
  262. plan := rt.planForFFT(fftSize)
  263. return rt.spectrumFromIQWithPlan(iq, window, plan, gpuState, allowGPU)
  264. }
  265. func sanitizeSpectrum(spectrum []float64) {
  266. for i := range spectrum {
  267. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  268. spectrum[i] = -200
  269. }
  270. }
  271. }
  272. func (rt *dspRuntime) decimationTaps(factor int) []float64 {
  273. if factor <= 1 {
  274. return nil
  275. }
  276. if rt.survFIR == nil {
  277. rt.survFIR = map[int][]float64{}
  278. }
  279. if taps, ok := rt.survFIR[factor]; ok {
  280. return taps
  281. }
  282. cutoff := float64(rt.cfg.SampleRate/factor) * 0.5 * 0.8
  283. taps := dsp.LowpassFIR(cutoff, rt.cfg.SampleRate, 101)
  284. rt.survFIR[factor] = taps
  285. return taps
  286. }
  287. func (rt *dspRuntime) decimateSurveillanceIQ(iq []complex64, factor int) []complex64 {
  288. if factor <= 1 {
  289. return iq
  290. }
  291. taps := rt.decimationTaps(factor)
  292. if len(taps) == 0 {
  293. return dsp.Decimate(iq, factor)
  294. }
  295. filtered := dsp.ApplyFIR(iq, taps)
  296. return dsp.Decimate(filtered, factor)
  297. }
  298. func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
  299. required := rt.cfg.FFTSize
  300. if rt.detailFFT > required {
  301. required = rt.detailFFT
  302. }
  303. available := required
  304. st := srcMgr.Stats()
  305. if st.BufferSamples > required {
  306. available = (st.BufferSamples / required) * required
  307. if available < required {
  308. available = required
  309. }
  310. }
  311. allIQ, err := srcMgr.ReadIQ(available)
  312. if err != nil {
  313. return nil, err
  314. }
  315. if rec != nil {
  316. rec.Ingest(time.Now(), allIQ)
  317. }
  318. survIQ := allIQ
  319. if len(allIQ) > rt.cfg.FFTSize {
  320. survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:]
  321. }
  322. detailIQ := survIQ
  323. if rt.detailFFT > 0 && len(allIQ) >= rt.detailFFT {
  324. detailIQ = allIQ[len(allIQ)-rt.detailFFT:]
  325. }
  326. if rt.dcEnabled {
  327. dcBlocker.Apply(allIQ)
  328. }
  329. if rt.iqEnabled {
  330. dsp.IQBalance(survIQ)
  331. if !sameIQBuffer(detailIQ, survIQ) {
  332. detailIQ = append([]complex64(nil), detailIQ...)
  333. dsp.IQBalance(detailIQ)
  334. }
  335. }
  336. survSpectrum := rt.spectrumFromIQ(survIQ, gpuState)
  337. sanitizeSpectrum(survSpectrum)
  338. detailSpectrum := survSpectrum
  339. if !sameIQBuffer(detailIQ, survIQ) {
  340. detailSpectrum = rt.spectrumFromIQWithPlan(detailIQ, rt.detailWindow, rt.detailPlan, gpuState, false)
  341. sanitizeSpectrum(detailSpectrum)
  342. }
  343. policy := pipeline.PolicyFromConfig(rt.cfg)
  344. plan := rt.buildSurveillancePlan(policy)
  345. surveillanceSpectra := make([]pipeline.SurveillanceLevelSpectrum, 0, len(plan.Specs))
  346. for _, spec := range plan.Specs {
  347. if spec.Level.FFTSize <= 0 {
  348. continue
  349. }
  350. var spectrum []float64
  351. if spec.Decim <= 1 {
  352. if spec.Level.FFTSize == len(survSpectrum) {
  353. spectrum = survSpectrum
  354. } else {
  355. spectrum = rt.spectrumForLevel(survIQ, spec.Level.FFTSize, gpuState, spec.AllowGPU)
  356. sanitizeSpectrum(spectrum)
  357. }
  358. } else {
  359. required := spec.Level.FFTSize * spec.Decim
  360. if required > len(survIQ) {
  361. continue
  362. }
  363. src := survIQ
  364. if len(src) > required {
  365. src = src[len(src)-required:]
  366. }
  367. decimated := rt.decimateSurveillanceIQ(src, spec.Decim)
  368. spectrum = rt.spectrumForLevel(decimated, spec.Level.FFTSize, gpuState, false)
  369. sanitizeSpectrum(spectrum)
  370. }
  371. if len(spectrum) == 0 {
  372. continue
  373. }
  374. surveillanceSpectra = append(surveillanceSpectra, pipeline.SurveillanceLevelSpectrum{Level: spec.Level, Spectrum: spectrum})
  375. }
  376. now := time.Now()
  377. finished, detected := rt.det.Process(now, survSpectrum, rt.cfg.CenterHz)
  378. return &spectrumArtifacts{
  379. allIQ: allIQ,
  380. surveillanceIQ: survIQ,
  381. detailIQ: detailIQ,
  382. surveillanceSpectrum: survSpectrum,
  383. surveillanceSpectra: surveillanceSpectra,
  384. surveillancePlan: plan,
  385. detailSpectrum: detailSpectrum,
  386. finished: finished,
  387. detected: detected,
  388. thresholds: rt.det.LastThresholds(),
  389. noiseFloor: rt.det.LastNoiseFloor(),
  390. now: now,
  391. }, nil
  392. }
  393. func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.SurveillanceResult {
  394. if art == nil {
  395. return pipeline.SurveillanceResult{}
  396. }
  397. policy := pipeline.PolicyFromConfig(rt.cfg)
  398. candidates := pipeline.CandidatesFromSignals(art.detected, "surveillance-detector")
  399. scheduled := pipeline.ScheduleCandidates(candidates, policy)
  400. plan := art.surveillancePlan
  401. if plan.Primary.Name == "" {
  402. plan = rt.buildSurveillancePlan(policy)
  403. }
  404. return pipeline.SurveillanceResult{
  405. Level: plan.Primary,
  406. Levels: plan.Levels,
  407. DisplayLevel: plan.Presentation,
  408. Context: plan.Context,
  409. Spectra: art.surveillanceSpectra,
  410. Candidates: candidates,
  411. Scheduled: scheduled,
  412. Finished: art.finished,
  413. Signals: art.detected,
  414. NoiseFloor: art.noiseFloor,
  415. Thresholds: art.thresholds,
  416. }
  417. }
  418. func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now time.Time) pipeline.RefinementInput {
  419. policy := pipeline.PolicyFromConfig(rt.cfg)
  420. plan := pipeline.BuildRefinementPlan(surv.Candidates, policy)
  421. admission := rt.arbiter.AdmitRefinement(plan, policy, now)
  422. plan = admission.Plan
  423. workItems := make([]pipeline.RefinementWorkItem, 0, len(admission.WorkItems))
  424. if len(admission.WorkItems) > 0 {
  425. workItems = append(workItems, admission.WorkItems...)
  426. }
  427. scheduled := append([]pipeline.ScheduledCandidate(nil), admission.Admitted...)
  428. workIndex := map[int64]int{}
  429. for i := range workItems {
  430. if workItems[i].Candidate.ID == 0 {
  431. continue
  432. }
  433. workIndex[workItems[i].Candidate.ID] = i
  434. }
  435. windows := make([]pipeline.RefinementWindow, 0, len(scheduled))
  436. for _, sc := range scheduled {
  437. window := pipeline.RefinementWindowForCandidate(policy, sc.Candidate)
  438. windows = append(windows, window)
  439. if idx, ok := workIndex[sc.Candidate.ID]; ok {
  440. workItems[idx].Window = window
  441. }
  442. }
  443. detailFFT := rt.cfg.Refinement.DetailFFTSize
  444. if detailFFT <= 0 {
  445. detailFFT = rt.cfg.FFTSize
  446. }
  447. levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate))
  448. if _, maxSpan, ok := windowSpanBounds(windows); ok {
  449. levelSpan = maxSpan
  450. }
  451. level := analysisLevel("refinement", "refinement", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "refinement-window", 1, rt.cfg.SampleRate)
  452. detailLevel := analysisLevel("detail", "detail", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "detail-spectrum", 1, rt.cfg.SampleRate)
  453. if len(workItems) > 0 {
  454. for i := range workItems {
  455. item := &workItems[i]
  456. if item.Window.SpanHz <= 0 {
  457. continue
  458. }
  459. item.Execution = &pipeline.RefinementExecution{
  460. Stage: "refine",
  461. SampleRate: rt.cfg.SampleRate,
  462. FFTSize: detailFFT,
  463. CenterHz: item.Window.CenterHz,
  464. SpanHz: item.Window.SpanHz,
  465. Source: detailLevel.Source,
  466. }
  467. }
  468. }
  469. input := pipeline.RefinementInput{
  470. Level: level,
  471. Detail: detailLevel,
  472. Context: surv.Context,
  473. Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan},
  474. Budgets: pipeline.BudgetModelFromPolicy(policy),
  475. Admission: admission.Admission,
  476. Candidates: append([]pipeline.Candidate(nil), surv.Candidates...),
  477. Scheduled: scheduled,
  478. WorkItems: workItems,
  479. Plan: plan,
  480. Windows: windows,
  481. SampleRate: rt.cfg.SampleRate,
  482. FFTSize: detailFFT,
  483. CenterHz: rt.cfg.CenterHz,
  484. Source: "surveillance-detector",
  485. }
  486. input.Context.Refinement = level
  487. input.Context.Detail = detailLevel
  488. if !policy.RefinementEnabled {
  489. for i := range input.WorkItems {
  490. item := &input.WorkItems[i]
  491. if item.Status == pipeline.RefinementStatusDropped {
  492. continue
  493. }
  494. item.Status = pipeline.RefinementStatusDropped
  495. item.Reason = pipeline.RefinementReasonDisabled
  496. }
  497. input.Scheduled = nil
  498. input.Request.Reason = pipeline.ReasonAdmissionDisabled
  499. input.Admission.Reason = pipeline.ReasonAdmissionDisabled
  500. input.Admission.Admitted = 0
  501. input.Admission.Skipped = 0
  502. input.Admission.Displaced = 0
  503. input.Plan.Selected = nil
  504. input.Plan.DroppedByBudget = 0
  505. }
  506. rt.setArbitration(policy, input.Budgets, input.Admission, rt.arbitration.Queue)
  507. return input
  508. }
  509. func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep {
  510. input := rt.buildRefinementInput(surv, art.now)
  511. markWorkItemsStatus(input.WorkItems, pipeline.RefinementStatusAdmitted, pipeline.RefinementStatusRunning, pipeline.RefinementReasonRunning)
  512. result := rt.refineSignals(art, input, extractMgr, rec)
  513. markWorkItemsCompleted(input.WorkItems, result.Candidates)
  514. return pipeline.RefinementStep{Input: input, Result: result}
  515. }
  516. func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.RefinementInput, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult {
  517. if art == nil || len(art.detailIQ) == 0 || len(input.Scheduled) == 0 {
  518. return pipeline.RefinementResult{}
  519. }
  520. policy := pipeline.PolicyFromConfig(rt.cfg)
  521. selectedCandidates := make([]pipeline.Candidate, 0, len(input.Scheduled))
  522. selectedSignals := make([]detector.Signal, 0, len(input.Scheduled))
  523. for _, sc := range input.Scheduled {
  524. selectedCandidates = append(selectedCandidates, sc.Candidate)
  525. selectedSignals = append(selectedSignals, detector.Signal{
  526. ID: sc.Candidate.ID,
  527. FirstBin: sc.Candidate.FirstBin,
  528. LastBin: sc.Candidate.LastBin,
  529. CenterHz: sc.Candidate.CenterHz,
  530. BWHz: sc.Candidate.BandwidthHz,
  531. PeakDb: sc.Candidate.PeakDb,
  532. SNRDb: sc.Candidate.SNRDb,
  533. NoiseDb: sc.Candidate.NoiseDb,
  534. })
  535. }
  536. sampleRate := input.SampleRate
  537. fftSize := input.FFTSize
  538. centerHz := input.CenterHz
  539. if sampleRate <= 0 {
  540. sampleRate = rt.cfg.SampleRate
  541. }
  542. if fftSize <= 0 {
  543. fftSize = rt.cfg.FFTSize
  544. }
  545. if centerHz == 0 {
  546. centerHz = rt.cfg.CenterHz
  547. }
  548. snips, snipRates := extractSignalIQBatch(extractMgr, art.detailIQ, sampleRate, centerHz, selectedSignals)
  549. refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.detailSpectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
  550. signals := make([]detector.Signal, 0, len(refined))
  551. decisions := make([]pipeline.SignalDecision, 0, len(refined))
  552. for i, ref := range refined {
  553. sig := ref.Signal
  554. signals = append(signals, sig)
  555. cls := sig.Class
  556. snipRate := ref.SnippetRate
  557. decision := pipeline.DecideSignalAction(policy, ref.Candidate, cls)
  558. decisions = append(decisions, decision)
  559. if cls != nil {
  560. pll := classifier.PLLResult{}
  561. if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
  562. pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
  563. cls.PLL = &pll
  564. signals[i].PLL = &pll
  565. if cls.ModType == classifier.ClassWFM && pll.Stereo {
  566. cls.ModType = classifier.ClassWFMStereo
  567. }
  568. }
  569. if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
  570. rt.updateRDS(art.now, rec, &signals[i], cls)
  571. }
  572. }
  573. }
  574. budget := pipeline.BudgetModelFromPolicy(policy)
  575. queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy)
  576. rt.setArbitration(policy, budget, input.Admission, queueStats)
  577. summary := summarizeDecisions(decisions)
  578. if rec != nil {
  579. if summary.RecordEnabled > 0 {
  580. rt.cfg.Recorder.Enabled = true
  581. }
  582. if summary.DecodeEnabled > 0 {
  583. rt.cfg.Recorder.AutoDecode = true
  584. }
  585. }
  586. rt.det.UpdateClasses(signals)
  587. return pipeline.RefinementResult{Level: input.Level, Signals: signals, Decisions: decisions, Candidates: selectedCandidates}
  588. }
  589. func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) {
  590. if sig == nil || cls == nil {
  591. return
  592. }
  593. keyHz := sig.CenterHz
  594. if sig.PLL != nil && sig.PLL.ExactHz != 0 {
  595. keyHz = sig.PLL.ExactHz
  596. }
  597. key := int64(math.Round(keyHz / 25000.0))
  598. st := rt.rdsMap[key]
  599. if st == nil {
  600. st = &rdsState{}
  601. rt.rdsMap[key] = st
  602. }
  603. if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
  604. st.lastDecode = now
  605. atomic.StoreInt32(&st.busy, 1)
  606. go func(st *rdsState, sigHz float64) {
  607. defer atomic.StoreInt32(&st.busy, 0)
  608. ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
  609. if len(ringIQ) < ringSR || ringSR <= 0 {
  610. return
  611. }
  612. offset := sigHz - ringCenter
  613. shifted := dsp.FreqShift(ringIQ, ringSR, offset)
  614. decim1 := ringSR / 1000000
  615. if decim1 < 1 {
  616. decim1 = 1
  617. }
  618. lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
  619. f1 := dsp.ApplyFIR(shifted, lp1)
  620. d1 := dsp.Decimate(f1, decim1)
  621. rate1 := ringSR / decim1
  622. decim2 := rate1 / 250000
  623. if decim2 < 1 {
  624. decim2 = 1
  625. }
  626. lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
  627. f2 := dsp.ApplyFIR(d1, lp2)
  628. decimated := dsp.Decimate(f2, decim2)
  629. actualRate := rate1 / decim2
  630. rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
  631. if len(rdsBase.Samples) == 0 {
  632. return
  633. }
  634. st.mu.Lock()
  635. result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
  636. if result.PS != "" {
  637. st.result = result
  638. }
  639. st.mu.Unlock()
  640. }(st, sig.CenterHz)
  641. }
  642. st.mu.Lock()
  643. ps := st.result.PS
  644. st.mu.Unlock()
  645. if ps != "" && sig.PLL != nil {
  646. sig.PLL.RDSStation = strings.TrimSpace(ps)
  647. cls.PLL = sig.PLL
  648. }
  649. }
  650. func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) {
  651. if len(rt.rdsMap) > 0 {
  652. activeIDs := make(map[int64]bool, len(displaySignals))
  653. for _, s := range displaySignals {
  654. keyHz := s.CenterHz
  655. if s.PLL != nil && s.PLL.ExactHz != 0 {
  656. keyHz = s.PLL.ExactHz
  657. }
  658. activeIDs[int64(math.Round(keyHz/25000.0))] = true
  659. }
  660. for id := range rt.rdsMap {
  661. if !activeIDs[id] {
  662. delete(rt.rdsMap, id)
  663. }
  664. }
  665. }
  666. if len(rt.streamPhaseState) > 0 {
  667. sigIDs := make(map[int64]bool, len(displaySignals))
  668. for _, s := range displaySignals {
  669. sigIDs[s.ID] = true
  670. }
  671. for id := range rt.streamPhaseState {
  672. if !sigIDs[id] {
  673. delete(rt.streamPhaseState, id)
  674. }
  675. }
  676. }
  677. if rec != nil && len(displaySignals) > 0 {
  678. aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
  679. _ = aqCfg
  680. }
  681. }
  682. func spanForPolicy(policy pipeline.Policy, fallback float64) float64 {
  683. if policy.MonitorSpanHz > 0 {
  684. return policy.MonitorSpanHz
  685. }
  686. if policy.MonitorStartHz != 0 && policy.MonitorEndHz != 0 && policy.MonitorEndHz > policy.MonitorStartHz {
  687. return policy.MonitorEndHz - policy.MonitorStartHz
  688. }
  689. return fallback
  690. }
  691. func windowSpanBounds(windows []pipeline.RefinementWindow) (float64, float64, bool) {
  692. minSpan := 0.0
  693. maxSpan := 0.0
  694. ok := false
  695. for _, w := range windows {
  696. if w.SpanHz <= 0 {
  697. continue
  698. }
  699. if !ok || w.SpanHz < minSpan {
  700. minSpan = w.SpanHz
  701. }
  702. if !ok || w.SpanHz > maxSpan {
  703. maxSpan = w.SpanHz
  704. }
  705. ok = true
  706. }
  707. return minSpan, maxSpan, ok
  708. }
  709. func analysisLevel(name, role, truth string, sampleRate int, fftSize int, centerHz float64, spanHz float64, source string, decimation int, baseRate int) pipeline.AnalysisLevel {
  710. level := pipeline.AnalysisLevel{
  711. Name: name,
  712. Role: role,
  713. Truth: truth,
  714. SampleRate: sampleRate,
  715. FFTSize: fftSize,
  716. CenterHz: centerHz,
  717. SpanHz: spanHz,
  718. Source: source,
  719. }
  720. if level.SampleRate > 0 && level.FFTSize > 0 {
  721. level.BinHz = float64(level.SampleRate) / float64(level.FFTSize)
  722. }
  723. if decimation > 0 {
  724. level.Decimation = decimation
  725. } else if baseRate > 0 && level.SampleRate > 0 && baseRate%level.SampleRate == 0 {
  726. level.Decimation = baseRate / level.SampleRate
  727. }
  728. return level
  729. }
  730. func (rt *dspRuntime) buildSurveillancePlan(policy pipeline.Policy) surveillancePlan {
  731. baseRate := rt.cfg.SampleRate
  732. baseFFT := rt.cfg.Surveillance.AnalysisFFTSize
  733. if baseFFT <= 0 {
  734. baseFFT = rt.cfg.FFTSize
  735. }
  736. span := spanForPolicy(policy, float64(baseRate))
  737. primary := analysisLevel("surveillance", "surveillance", "surveillance", baseRate, baseFFT, rt.cfg.CenterHz, span, "baseband", 1, baseRate)
  738. levels := []pipeline.AnalysisLevel{primary}
  739. specs := []surveillanceLevelSpec{{Level: primary, Decim: 1, AllowGPU: true}}
  740. context := pipeline.AnalysisContext{Surveillance: primary}
  741. strategy := strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy))
  742. switch strategy {
  743. case "multi-res", "multi-resolution", "multi", "multi_res":
  744. decim := 2
  745. derivedRate := baseRate / decim
  746. derivedFFT := baseFFT / decim
  747. if derivedRate >= 200000 && derivedFFT >= 256 {
  748. derivedSpan := spanForPolicy(policy, float64(derivedRate))
  749. derived := analysisLevel("surveillance-lowres", "surveillance-lowres", "surveillance", derivedRate, derivedFFT, rt.cfg.CenterHz, derivedSpan, "decimated", decim, baseRate)
  750. levels = append(levels, derived)
  751. specs = append(specs, surveillanceLevelSpec{Level: derived, Decim: decim, AllowGPU: false})
  752. context.Derived = append(context.Derived, derived)
  753. }
  754. }
  755. presentation := analysisLevel("presentation", "presentation", "presentation", baseRate, rt.cfg.Surveillance.DisplayBins, rt.cfg.CenterHz, span, "display", 1, baseRate)
  756. context.Presentation = presentation
  757. return surveillancePlan{
  758. Primary: primary,
  759. Levels: levels,
  760. Presentation: presentation,
  761. Context: context,
  762. Specs: specs,
  763. }
  764. }
  765. func sameIQBuffer(a []complex64, b []complex64) bool {
  766. if len(a) != len(b) {
  767. return false
  768. }
  769. if len(a) == 0 {
  770. return true
  771. }
  772. return &a[0] == &b[0]
  773. }
  774. func markWorkItemsStatus(items []pipeline.RefinementWorkItem, from string, to string, reason string) {
  775. for i := range items {
  776. if items[i].Status != from {
  777. continue
  778. }
  779. items[i].Status = to
  780. if reason != "" {
  781. items[i].Reason = reason
  782. }
  783. }
  784. }
  785. func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pipeline.Candidate) {
  786. if len(items) == 0 || len(candidates) == 0 {
  787. return
  788. }
  789. done := map[int64]struct{}{}
  790. for _, cand := range candidates {
  791. if cand.ID != 0 {
  792. done[cand.ID] = struct{}{}
  793. }
  794. }
  795. for i := range items {
  796. if _, ok := done[items[i].Candidate.ID]; !ok {
  797. continue
  798. }
  799. items[i].Status = pipeline.RefinementStatusCompleted
  800. items[i].Reason = pipeline.RefinementReasonCompleted
  801. }
  802. }
  803. func (rt *dspRuntime) setArbitration(policy pipeline.Policy, budget pipeline.BudgetModel, admission pipeline.RefinementAdmission, queue pipeline.DecisionQueueStats) {
  804. rt.arbitration = pipeline.BuildArbitrationState(policy, budget, admission, queue)
  805. }