Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

708 wiersze
21KB

  1. package main
  2. import (
  3. "math"
  4. "strings"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "sdr-wideband-suite/internal/classifier"
  9. "sdr-wideband-suite/internal/config"
  10. "sdr-wideband-suite/internal/demod"
  11. "sdr-wideband-suite/internal/detector"
  12. "sdr-wideband-suite/internal/dsp"
  13. fftutil "sdr-wideband-suite/internal/fft"
  14. "sdr-wideband-suite/internal/fft/gpufft"
  15. "sdr-wideband-suite/internal/pipeline"
  16. "sdr-wideband-suite/internal/rds"
  17. "sdr-wideband-suite/internal/recorder"
  18. )
  19. type rdsState struct {
  20. dec rds.Decoder
  21. result rds.Result
  22. lastDecode time.Time
  23. busy int32
  24. mu sync.Mutex
  25. }
  26. type dspRuntime struct {
  27. cfg config.Config
  28. det *detector.Detector
  29. window []float64
  30. plan *fftutil.CmplxPlan
  31. detailWindow []float64
  32. detailPlan *fftutil.CmplxPlan
  33. detailFFT int
  34. dcEnabled bool
  35. iqEnabled bool
  36. useGPU bool
  37. gpuEngine *gpufft.Engine
  38. rdsMap map[int64]*rdsState
  39. streamPhaseState map[int64]*streamExtractState
  40. streamOverlap *streamIQOverlap
  41. arbiter *pipeline.Arbiter
  42. arbitration pipeline.ArbitrationState
  43. gotSamples bool
  44. }
  45. type spectrumArtifacts struct {
  46. allIQ []complex64
  47. surveillanceIQ []complex64
  48. detailIQ []complex64
  49. surveillanceSpectrum []float64
  50. detailSpectrum []float64
  51. finished []detector.Event
  52. detected []detector.Signal
  53. thresholds []float64
  54. noiseFloor float64
  55. now time.Time
  56. }
  57. func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
  58. detailFFT := cfg.Refinement.DetailFFTSize
  59. if detailFFT <= 0 {
  60. detailFFT = cfg.FFTSize
  61. }
  62. rt := &dspRuntime{
  63. cfg: cfg,
  64. det: det,
  65. window: window,
  66. plan: fftutil.NewCmplxPlan(cfg.FFTSize),
  67. detailWindow: fftutil.Hann(detailFFT),
  68. detailPlan: fftutil.NewCmplxPlan(detailFFT),
  69. detailFFT: detailFFT,
  70. dcEnabled: cfg.DCBlock,
  71. iqEnabled: cfg.IQBalance,
  72. useGPU: cfg.UseGPUFFT,
  73. rdsMap: map[int64]*rdsState{},
  74. streamPhaseState: map[int64]*streamExtractState{},
  75. streamOverlap: &streamIQOverlap{},
  76. arbiter: pipeline.NewArbiter(),
  77. }
  78. if rt.useGPU && gpuState != nil {
  79. snap := gpuState.snapshot()
  80. if snap.Available {
  81. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  82. rt.gpuEngine = eng
  83. gpuState.set(true, nil)
  84. } else {
  85. gpuState.set(false, err)
  86. rt.useGPU = false
  87. }
  88. }
  89. }
  90. return rt
  91. }
  92. func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
  93. prevFFT := rt.cfg.FFTSize
  94. prevUseGPU := rt.useGPU
  95. prevDetailFFT := rt.detailFFT
  96. rt.cfg = upd.cfg
  97. if rec != nil {
  98. rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
  99. Enabled: rt.cfg.Recorder.Enabled,
  100. MinSNRDb: rt.cfg.Recorder.MinSNRDb,
  101. MinDuration: mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second),
  102. MaxDuration: mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second),
  103. PrerollMs: rt.cfg.Recorder.PrerollMs,
  104. RecordIQ: rt.cfg.Recorder.RecordIQ,
  105. RecordAudio: rt.cfg.Recorder.RecordAudio,
  106. AutoDemod: rt.cfg.Recorder.AutoDemod,
  107. AutoDecode: rt.cfg.Recorder.AutoDecode,
  108. MaxDiskMB: rt.cfg.Recorder.MaxDiskMB,
  109. OutputDir: rt.cfg.Recorder.OutputDir,
  110. ClassFilter: rt.cfg.Recorder.ClassFilter,
  111. RingSeconds: rt.cfg.Recorder.RingSeconds,
  112. DeemphasisUs: rt.cfg.Recorder.DeemphasisUs,
  113. ExtractionTaps: rt.cfg.Recorder.ExtractionTaps,
  114. ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult,
  115. }, rt.cfg.CenterHz, buildDecoderMap(rt.cfg))
  116. }
  117. if upd.det != nil {
  118. rt.det = upd.det
  119. }
  120. if upd.window != nil {
  121. rt.window = upd.window
  122. rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
  123. }
  124. detailFFT := rt.cfg.Refinement.DetailFFTSize
  125. if detailFFT <= 0 {
  126. detailFFT = rt.cfg.FFTSize
  127. }
  128. if detailFFT != prevDetailFFT {
  129. rt.detailFFT = detailFFT
  130. rt.detailWindow = fftutil.Hann(detailFFT)
  131. rt.detailPlan = fftutil.NewCmplxPlan(detailFFT)
  132. }
  133. rt.dcEnabled = upd.dcBlock
  134. rt.iqEnabled = upd.iqBalance
  135. if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU {
  136. srcMgr.Flush()
  137. rt.gotSamples = false
  138. if rt.gpuEngine != nil {
  139. rt.gpuEngine.Close()
  140. rt.gpuEngine = nil
  141. }
  142. rt.useGPU = rt.cfg.UseGPUFFT
  143. if rt.useGPU && gpuState != nil {
  144. snap := gpuState.snapshot()
  145. if snap.Available {
  146. if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil {
  147. rt.gpuEngine = eng
  148. gpuState.set(true, nil)
  149. } else {
  150. gpuState.set(false, err)
  151. rt.useGPU = false
  152. }
  153. } else {
  154. gpuState.set(false, nil)
  155. rt.useGPU = false
  156. }
  157. } else if gpuState != nil {
  158. gpuState.set(false, nil)
  159. }
  160. }
  161. }
  162. func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 {
  163. return rt.spectrumFromIQWithPlan(iq, rt.window, rt.plan, gpuState, true)
  164. }
  165. func (rt *dspRuntime) spectrumFromIQWithPlan(iq []complex64, window []float64, plan *fftutil.CmplxPlan, gpuState *gpuStatus, allowGPU bool) []float64 {
  166. if len(iq) == 0 {
  167. return nil
  168. }
  169. if allowGPU && rt.useGPU && rt.gpuEngine != nil {
  170. gpuBuf := make([]complex64, len(iq))
  171. if len(window) == len(iq) {
  172. for i := 0; i < len(iq); i++ {
  173. v := iq[i]
  174. w := float32(window[i])
  175. gpuBuf[i] = complex(real(v)*w, imag(v)*w)
  176. }
  177. } else {
  178. copy(gpuBuf, iq)
  179. }
  180. out, err := rt.gpuEngine.Exec(gpuBuf)
  181. if err != nil {
  182. if gpuState != nil {
  183. gpuState.set(false, err)
  184. }
  185. rt.useGPU = false
  186. return fftutil.SpectrumWithPlan(gpuBuf, nil, plan)
  187. }
  188. return fftutil.SpectrumFromFFT(out)
  189. }
  190. return fftutil.SpectrumWithPlan(iq, window, plan)
  191. }
  192. func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
  193. required := rt.cfg.FFTSize
  194. if rt.detailFFT > required {
  195. required = rt.detailFFT
  196. }
  197. available := required
  198. st := srcMgr.Stats()
  199. if st.BufferSamples > required {
  200. available = (st.BufferSamples / required) * required
  201. if available < required {
  202. available = required
  203. }
  204. }
  205. allIQ, err := srcMgr.ReadIQ(available)
  206. if err != nil {
  207. return nil, err
  208. }
  209. if rec != nil {
  210. rec.Ingest(time.Now(), allIQ)
  211. }
  212. survIQ := allIQ
  213. if len(allIQ) > rt.cfg.FFTSize {
  214. survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:]
  215. }
  216. detailIQ := survIQ
  217. if rt.detailFFT > 0 && len(allIQ) >= rt.detailFFT {
  218. detailIQ = allIQ[len(allIQ)-rt.detailFFT:]
  219. }
  220. if rt.dcEnabled {
  221. dcBlocker.Apply(allIQ)
  222. }
  223. if rt.iqEnabled {
  224. dsp.IQBalance(survIQ)
  225. if !sameIQBuffer(detailIQ, survIQ) {
  226. detailIQ = append([]complex64(nil), detailIQ...)
  227. dsp.IQBalance(detailIQ)
  228. }
  229. }
  230. survSpectrum := rt.spectrumFromIQ(survIQ, gpuState)
  231. for i := range survSpectrum {
  232. if math.IsNaN(survSpectrum[i]) || math.IsInf(survSpectrum[i], 0) {
  233. survSpectrum[i] = -200
  234. }
  235. }
  236. detailSpectrum := survSpectrum
  237. if !sameIQBuffer(detailIQ, survIQ) {
  238. detailSpectrum = rt.spectrumFromIQWithPlan(detailIQ, rt.detailWindow, rt.detailPlan, gpuState, false)
  239. for i := range detailSpectrum {
  240. if math.IsNaN(detailSpectrum[i]) || math.IsInf(detailSpectrum[i], 0) {
  241. detailSpectrum[i] = -200
  242. }
  243. }
  244. }
  245. now := time.Now()
  246. finished, detected := rt.det.Process(now, survSpectrum, rt.cfg.CenterHz)
  247. return &spectrumArtifacts{
  248. allIQ: allIQ,
  249. surveillanceIQ: survIQ,
  250. detailIQ: detailIQ,
  251. surveillanceSpectrum: survSpectrum,
  252. detailSpectrum: detailSpectrum,
  253. finished: finished,
  254. detected: detected,
  255. thresholds: rt.det.LastThresholds(),
  256. noiseFloor: rt.det.LastNoiseFloor(),
  257. now: now,
  258. }, nil
  259. }
  260. func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.SurveillanceResult {
  261. if art == nil {
  262. return pipeline.SurveillanceResult{}
  263. }
  264. policy := pipeline.PolicyFromConfig(rt.cfg)
  265. candidates := pipeline.CandidatesFromSignals(art.detected, "surveillance-detector")
  266. scheduled := pipeline.ScheduleCandidates(candidates, policy)
  267. level := pipeline.AnalysisLevel{
  268. Name: "surveillance",
  269. Role: "surveillance",
  270. Truth: "surveillance",
  271. SampleRate: rt.cfg.SampleRate,
  272. FFTSize: rt.cfg.Surveillance.AnalysisFFTSize,
  273. CenterHz: rt.cfg.CenterHz,
  274. SpanHz: spanForPolicy(policy, float64(rt.cfg.SampleRate)),
  275. Source: "baseband",
  276. }
  277. lowRate := rt.cfg.SampleRate / 2
  278. lowFFT := rt.cfg.Surveillance.AnalysisFFTSize / 2
  279. if lowRate < 200000 {
  280. lowRate = rt.cfg.SampleRate
  281. }
  282. if lowFFT < 256 {
  283. lowFFT = rt.cfg.Surveillance.AnalysisFFTSize
  284. }
  285. lowLevel := pipeline.AnalysisLevel{
  286. Name: "surveillance-lowres",
  287. Role: "surveillance-lowres",
  288. Truth: "surveillance",
  289. SampleRate: lowRate,
  290. FFTSize: lowFFT,
  291. CenterHz: rt.cfg.CenterHz,
  292. SpanHz: spanForPolicy(policy, float64(lowRate)),
  293. Source: "downsampled",
  294. }
  295. displayLevel := pipeline.AnalysisLevel{
  296. Name: "presentation",
  297. Role: "presentation",
  298. Truth: "presentation",
  299. SampleRate: rt.cfg.SampleRate,
  300. FFTSize: rt.cfg.Surveillance.DisplayBins,
  301. CenterHz: rt.cfg.CenterHz,
  302. SpanHz: spanForPolicy(policy, float64(rt.cfg.SampleRate)),
  303. Source: "display",
  304. }
  305. levels, context := surveillanceLevels(policy, level, lowLevel, displayLevel)
  306. return pipeline.SurveillanceResult{
  307. Level: level,
  308. Levels: levels,
  309. DisplayLevel: displayLevel,
  310. Context: context,
  311. Candidates: candidates,
  312. Scheduled: scheduled,
  313. Finished: art.finished,
  314. Signals: art.detected,
  315. NoiseFloor: art.noiseFloor,
  316. Thresholds: art.thresholds,
  317. }
  318. }
  319. func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now time.Time) pipeline.RefinementInput {
  320. policy := pipeline.PolicyFromConfig(rt.cfg)
  321. plan := pipeline.BuildRefinementPlan(surv.Candidates, policy)
  322. admission := rt.arbiter.AdmitRefinement(plan, policy, now)
  323. plan = admission.Plan
  324. workItems := make([]pipeline.RefinementWorkItem, 0, len(admission.WorkItems))
  325. if len(admission.WorkItems) > 0 {
  326. workItems = append(workItems, admission.WorkItems...)
  327. }
  328. scheduled := append([]pipeline.ScheduledCandidate(nil), admission.Admitted...)
  329. workIndex := map[int64]int{}
  330. for i := range workItems {
  331. if workItems[i].Candidate.ID == 0 {
  332. continue
  333. }
  334. workIndex[workItems[i].Candidate.ID] = i
  335. }
  336. windows := make([]pipeline.RefinementWindow, 0, len(scheduled))
  337. for _, sc := range scheduled {
  338. window := pipeline.RefinementWindowForCandidate(policy, sc.Candidate)
  339. windows = append(windows, window)
  340. if idx, ok := workIndex[sc.Candidate.ID]; ok {
  341. workItems[idx].Window = window
  342. }
  343. }
  344. detailFFT := rt.cfg.Refinement.DetailFFTSize
  345. if detailFFT <= 0 {
  346. detailFFT = rt.cfg.FFTSize
  347. }
  348. levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate))
  349. if _, maxSpan, ok := windowSpanBounds(windows); ok {
  350. levelSpan = maxSpan
  351. }
  352. level := pipeline.AnalysisLevel{
  353. Name: "refinement",
  354. Role: "refinement",
  355. Truth: "refinement",
  356. SampleRate: rt.cfg.SampleRate,
  357. FFTSize: detailFFT,
  358. CenterHz: rt.cfg.CenterHz,
  359. SpanHz: levelSpan,
  360. Source: "refinement-window",
  361. }
  362. detailLevel := pipeline.AnalysisLevel{
  363. Name: "detail",
  364. Role: "detail",
  365. Truth: "refinement",
  366. SampleRate: rt.cfg.SampleRate,
  367. FFTSize: detailFFT,
  368. CenterHz: rt.cfg.CenterHz,
  369. SpanHz: levelSpan,
  370. Source: "detail-spectrum",
  371. }
  372. if len(workItems) > 0 {
  373. for i := range workItems {
  374. item := &workItems[i]
  375. if item.Window.SpanHz <= 0 {
  376. continue
  377. }
  378. item.Execution = &pipeline.RefinementExecution{
  379. Stage: "refine",
  380. SampleRate: rt.cfg.SampleRate,
  381. FFTSize: detailFFT,
  382. CenterHz: item.Window.CenterHz,
  383. SpanHz: item.Window.SpanHz,
  384. Source: detailLevel.Source,
  385. }
  386. }
  387. }
  388. input := pipeline.RefinementInput{
  389. Level: level,
  390. Detail: detailLevel,
  391. Context: surv.Context,
  392. Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan},
  393. Budgets: pipeline.BudgetModelFromPolicy(policy),
  394. Admission: admission.Admission,
  395. Candidates: append([]pipeline.Candidate(nil), surv.Candidates...),
  396. Scheduled: scheduled,
  397. WorkItems: workItems,
  398. Plan: plan,
  399. Windows: windows,
  400. SampleRate: rt.cfg.SampleRate,
  401. FFTSize: detailFFT,
  402. CenterHz: rt.cfg.CenterHz,
  403. Source: "surveillance-detector",
  404. }
  405. input.Context.Refinement = level
  406. input.Context.Detail = detailLevel
  407. if !policy.RefinementEnabled {
  408. for i := range input.WorkItems {
  409. item := &input.WorkItems[i]
  410. if item.Status == pipeline.RefinementStatusDropped {
  411. continue
  412. }
  413. item.Status = pipeline.RefinementStatusDropped
  414. item.Reason = pipeline.RefinementReasonDisabled
  415. }
  416. input.Scheduled = nil
  417. input.Request.Reason = pipeline.ReasonAdmissionDisabled
  418. input.Admission.Reason = pipeline.ReasonAdmissionDisabled
  419. input.Admission.Admitted = 0
  420. input.Admission.Skipped = 0
  421. input.Admission.Displaced = 0
  422. input.Plan.Selected = nil
  423. input.Plan.DroppedByBudget = 0
  424. }
  425. rt.setArbitration(policy, input.Budgets, input.Admission, rt.arbitration.Queue)
  426. return input
  427. }
  428. func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep {
  429. input := rt.buildRefinementInput(surv, art.now)
  430. markWorkItemsStatus(input.WorkItems, pipeline.RefinementStatusAdmitted, pipeline.RefinementStatusRunning, pipeline.RefinementReasonRunning)
  431. result := rt.refineSignals(art, input, extractMgr, rec)
  432. markWorkItemsCompleted(input.WorkItems, result.Candidates)
  433. return pipeline.RefinementStep{Input: input, Result: result}
  434. }
  435. func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.RefinementInput, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult {
  436. if art == nil || len(art.detailIQ) == 0 || len(input.Scheduled) == 0 {
  437. return pipeline.RefinementResult{}
  438. }
  439. policy := pipeline.PolicyFromConfig(rt.cfg)
  440. selectedCandidates := make([]pipeline.Candidate, 0, len(input.Scheduled))
  441. selectedSignals := make([]detector.Signal, 0, len(input.Scheduled))
  442. for _, sc := range input.Scheduled {
  443. selectedCandidates = append(selectedCandidates, sc.Candidate)
  444. selectedSignals = append(selectedSignals, detector.Signal{
  445. ID: sc.Candidate.ID,
  446. FirstBin: sc.Candidate.FirstBin,
  447. LastBin: sc.Candidate.LastBin,
  448. CenterHz: sc.Candidate.CenterHz,
  449. BWHz: sc.Candidate.BandwidthHz,
  450. PeakDb: sc.Candidate.PeakDb,
  451. SNRDb: sc.Candidate.SNRDb,
  452. NoiseDb: sc.Candidate.NoiseDb,
  453. })
  454. }
  455. sampleRate := input.SampleRate
  456. fftSize := input.FFTSize
  457. centerHz := input.CenterHz
  458. if sampleRate <= 0 {
  459. sampleRate = rt.cfg.SampleRate
  460. }
  461. if fftSize <= 0 {
  462. fftSize = rt.cfg.FFTSize
  463. }
  464. if centerHz == 0 {
  465. centerHz = rt.cfg.CenterHz
  466. }
  467. snips, snipRates := extractSignalIQBatch(extractMgr, art.detailIQ, sampleRate, centerHz, selectedSignals)
  468. refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.detailSpectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
  469. signals := make([]detector.Signal, 0, len(refined))
  470. decisions := make([]pipeline.SignalDecision, 0, len(refined))
  471. for i, ref := range refined {
  472. sig := ref.Signal
  473. signals = append(signals, sig)
  474. cls := sig.Class
  475. snipRate := ref.SnippetRate
  476. decision := pipeline.DecideSignalAction(policy, ref.Candidate, cls)
  477. decisions = append(decisions, decision)
  478. if cls != nil {
  479. pll := classifier.PLLResult{}
  480. if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
  481. pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
  482. cls.PLL = &pll
  483. signals[i].PLL = &pll
  484. if cls.ModType == classifier.ClassWFM && pll.Stereo {
  485. cls.ModType = classifier.ClassWFMStereo
  486. }
  487. }
  488. if (cls.ModType == classifier.ClassWFM || cls.ModType == classifier.ClassWFMStereo) && rec != nil {
  489. rt.updateRDS(art.now, rec, &signals[i], cls)
  490. }
  491. }
  492. }
  493. budget := pipeline.BudgetModelFromPolicy(policy)
  494. queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy)
  495. rt.setArbitration(policy, budget, input.Admission, queueStats)
  496. summary := summarizeDecisions(decisions)
  497. if rec != nil {
  498. if summary.RecordEnabled > 0 {
  499. rt.cfg.Recorder.Enabled = true
  500. }
  501. if summary.DecodeEnabled > 0 {
  502. rt.cfg.Recorder.AutoDecode = true
  503. }
  504. }
  505. rt.det.UpdateClasses(signals)
  506. return pipeline.RefinementResult{Level: input.Level, Signals: signals, Decisions: decisions, Candidates: selectedCandidates}
  507. }
  508. func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) {
  509. if sig == nil || cls == nil {
  510. return
  511. }
  512. keyHz := sig.CenterHz
  513. if sig.PLL != nil && sig.PLL.ExactHz != 0 {
  514. keyHz = sig.PLL.ExactHz
  515. }
  516. key := int64(math.Round(keyHz / 25000.0))
  517. st := rt.rdsMap[key]
  518. if st == nil {
  519. st = &rdsState{}
  520. rt.rdsMap[key] = st
  521. }
  522. if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
  523. st.lastDecode = now
  524. atomic.StoreInt32(&st.busy, 1)
  525. go func(st *rdsState, sigHz float64) {
  526. defer atomic.StoreInt32(&st.busy, 0)
  527. ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
  528. if len(ringIQ) < ringSR || ringSR <= 0 {
  529. return
  530. }
  531. offset := sigHz - ringCenter
  532. shifted := dsp.FreqShift(ringIQ, ringSR, offset)
  533. decim1 := ringSR / 1000000
  534. if decim1 < 1 {
  535. decim1 = 1
  536. }
  537. lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
  538. f1 := dsp.ApplyFIR(shifted, lp1)
  539. d1 := dsp.Decimate(f1, decim1)
  540. rate1 := ringSR / decim1
  541. decim2 := rate1 / 250000
  542. if decim2 < 1 {
  543. decim2 = 1
  544. }
  545. lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
  546. f2 := dsp.ApplyFIR(d1, lp2)
  547. decimated := dsp.Decimate(f2, decim2)
  548. actualRate := rate1 / decim2
  549. rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
  550. if len(rdsBase.Samples) == 0 {
  551. return
  552. }
  553. st.mu.Lock()
  554. result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
  555. if result.PS != "" {
  556. st.result = result
  557. }
  558. st.mu.Unlock()
  559. }(st, sig.CenterHz)
  560. }
  561. st.mu.Lock()
  562. ps := st.result.PS
  563. st.mu.Unlock()
  564. if ps != "" && sig.PLL != nil {
  565. sig.PLL.RDSStation = strings.TrimSpace(ps)
  566. cls.PLL = sig.PLL
  567. }
  568. }
  569. func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) {
  570. if len(rt.rdsMap) > 0 {
  571. activeIDs := make(map[int64]bool, len(displaySignals))
  572. for _, s := range displaySignals {
  573. keyHz := s.CenterHz
  574. if s.PLL != nil && s.PLL.ExactHz != 0 {
  575. keyHz = s.PLL.ExactHz
  576. }
  577. activeIDs[int64(math.Round(keyHz/25000.0))] = true
  578. }
  579. for id := range rt.rdsMap {
  580. if !activeIDs[id] {
  581. delete(rt.rdsMap, id)
  582. }
  583. }
  584. }
  585. if len(rt.streamPhaseState) > 0 {
  586. sigIDs := make(map[int64]bool, len(displaySignals))
  587. for _, s := range displaySignals {
  588. sigIDs[s.ID] = true
  589. }
  590. for id := range rt.streamPhaseState {
  591. if !sigIDs[id] {
  592. delete(rt.streamPhaseState, id)
  593. }
  594. }
  595. }
  596. if rec != nil && len(displaySignals) > 0 {
  597. aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
  598. _ = aqCfg
  599. }
  600. }
  601. func spanForPolicy(policy pipeline.Policy, fallback float64) float64 {
  602. if policy.MonitorSpanHz > 0 {
  603. return policy.MonitorSpanHz
  604. }
  605. if policy.MonitorStartHz != 0 && policy.MonitorEndHz != 0 && policy.MonitorEndHz > policy.MonitorStartHz {
  606. return policy.MonitorEndHz - policy.MonitorStartHz
  607. }
  608. return fallback
  609. }
  610. func windowSpanBounds(windows []pipeline.RefinementWindow) (float64, float64, bool) {
  611. minSpan := 0.0
  612. maxSpan := 0.0
  613. ok := false
  614. for _, w := range windows {
  615. if w.SpanHz <= 0 {
  616. continue
  617. }
  618. if !ok || w.SpanHz < minSpan {
  619. minSpan = w.SpanHz
  620. }
  621. if !ok || w.SpanHz > maxSpan {
  622. maxSpan = w.SpanHz
  623. }
  624. ok = true
  625. }
  626. return minSpan, maxSpan, ok
  627. }
  628. func surveillanceLevels(policy pipeline.Policy, primary pipeline.AnalysisLevel, secondary pipeline.AnalysisLevel, presentation pipeline.AnalysisLevel) ([]pipeline.AnalysisLevel, pipeline.AnalysisContext) {
  629. levels := []pipeline.AnalysisLevel{primary}
  630. context := pipeline.AnalysisContext{
  631. Surveillance: primary,
  632. Presentation: presentation,
  633. }
  634. strategy := strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy))
  635. switch strategy {
  636. case "multi-res", "multi-resolution", "multi", "multi_res":
  637. if secondary.SampleRate != primary.SampleRate || secondary.FFTSize != primary.FFTSize {
  638. levels = append(levels, secondary)
  639. context.Derived = append(context.Derived, secondary)
  640. }
  641. }
  642. return levels, context
  643. }
  644. func sameIQBuffer(a []complex64, b []complex64) bool {
  645. if len(a) != len(b) {
  646. return false
  647. }
  648. if len(a) == 0 {
  649. return true
  650. }
  651. return &a[0] == &b[0]
  652. }
  653. func markWorkItemsStatus(items []pipeline.RefinementWorkItem, from string, to string, reason string) {
  654. for i := range items {
  655. if items[i].Status != from {
  656. continue
  657. }
  658. items[i].Status = to
  659. if reason != "" {
  660. items[i].Reason = reason
  661. }
  662. }
  663. }
  664. func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pipeline.Candidate) {
  665. if len(items) == 0 || len(candidates) == 0 {
  666. return
  667. }
  668. done := map[int64]struct{}{}
  669. for _, cand := range candidates {
  670. if cand.ID != 0 {
  671. done[cand.ID] = struct{}{}
  672. }
  673. }
  674. for i := range items {
  675. if _, ok := done[items[i].Candidate.ID]; !ok {
  676. continue
  677. }
  678. items[i].Status = pipeline.RefinementStatusCompleted
  679. items[i].Reason = pipeline.RefinementReasonCompleted
  680. }
  681. }
  682. func (rt *dspRuntime) setArbitration(policy pipeline.Policy, budget pipeline.BudgetModel, admission pipeline.RefinementAdmission, queue pipeline.DecisionQueueStats) {
  683. rt.arbitration = pipeline.BuildArbitrationState(policy, budget, admission, queue)
  684. }