Wideband autonomous SDR analysis engine forked from sdr-visual-suite
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

1208 行
38KB

  1. package main
  2. import (
  3. "fmt"
  4. "math"
  5. "os"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "sdr-wideband-suite/internal/classifier"
  12. "sdr-wideband-suite/internal/config"
  13. "sdr-wideband-suite/internal/demod"
  14. "sdr-wideband-suite/internal/detector"
  15. "sdr-wideband-suite/internal/dsp"
  16. "sdr-wideband-suite/internal/logging"
  17. fftutil "sdr-wideband-suite/internal/fft"
  18. "sdr-wideband-suite/internal/fft/gpufft"
  19. "sdr-wideband-suite/internal/pipeline"
  20. "sdr-wideband-suite/internal/rds"
  21. "sdr-wideband-suite/internal/recorder"
  22. "sdr-wideband-suite/internal/telemetry"
  23. )
  24. type rdsState struct {
  25. dec rds.Decoder
  26. result rds.Result
  27. lastDecode time.Time
  28. busy int32
  29. mu sync.Mutex
  30. }
  31. var forceFixedStreamReadSamples = func() int {
  32. raw := strings.TrimSpace(os.Getenv("SDR_FORCE_FIXED_STREAM_READ_SAMPLES"))
  33. if raw == "" {
  34. return 0
  35. }
  36. v, err := strconv.Atoi(raw)
  37. if err != nil || v <= 0 {
  38. return 0
  39. }
  40. return v
  41. }()
  42. type dspRuntime struct {
  43. cfg config.Config
  44. det *detector.Detector
  45. derivedDetectors map[string]*derivedDetector
  46. nextDerivedBase int64
  47. window []float64
  48. plan *fftutil.CmplxPlan
  49. detailWindow []float64
  50. detailPlan *fftutil.CmplxPlan
  51. detailFFT int
  52. survWindows map[int][]float64
  53. survPlans map[int]*fftutil.CmplxPlan
  54. survFIR map[int][]float64
  55. dcEnabled bool
  56. iqEnabled bool
  57. useGPU bool
  58. gpuEngine *gpufft.Engine
  59. rdsMap map[int64]*rdsState
  60. streamPhaseState map[int64]*streamExtractState
  61. streamOverlap *streamIQOverlap
  62. arbiter *pipeline.Arbiter
  63. arbitration pipeline.ArbitrationState
  64. gotSamples bool
  65. telemetry *telemetry.Collector
  66. lastAllIQTail []complex64
  67. }
  68. type spectrumArtifacts struct {
  69. allIQ []complex64
  70. streamDropped bool
  71. surveillanceIQ []complex64
  72. detailIQ []complex64
  73. surveillanceSpectrum []float64
  74. surveillanceSpectra []pipeline.SurveillanceLevelSpectrum
  75. surveillancePlan surveillancePlan
  76. detailSpectrum []float64
  77. finished []detector.Event
  78. detected []detector.Signal
  79. thresholds []float64
  80. noiseFloor float64
  81. now time.Time
  82. }
  83. type derivedDetector struct {
  84. det *detector.Detector
  85. sampleRate int
  86. fftSize int
  87. idBase int64
  88. }
  89. type surveillanceLevelSpec struct {
  90. Level pipeline.AnalysisLevel
  91. Decim int
  92. AllowGPU bool
  93. }
  94. type surveillancePlan struct {
  95. Primary pipeline.AnalysisLevel
  96. Levels []pipeline.AnalysisLevel
  97. LevelSet pipeline.SurveillanceLevelSet
  98. Presentation pipeline.AnalysisLevel
  99. Context pipeline.AnalysisContext
  100. DetectionPolicy pipeline.SurveillanceDetectionPolicy
  101. Specs []surveillanceLevelSpec
  102. }
  103. const derivedIDBlock = int64(1_000_000_000)
  104. func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus, coll *telemetry.Collector) *dspRuntime {
  105. detailFFT := cfg.Refinement.DetailFFTSize
  106. if detailFFT <= 0 {
  107. detailFFT = cfg.FFTSize
  108. }
  109. rt := &dspRuntime{
  110. cfg: cfg,
  111. det: det,
  112. derivedDetectors: map[string]*derivedDetector{},
  113. nextDerivedBase: -derivedIDBlock,
  114. window: window,
  115. plan: fftutil.NewCmplxPlan(cfg.FFTSize),
  116. detailWindow: fftutil.Hann(detailFFT),
  117. detailPlan: fftutil.NewCmplxPlan(detailFFT),
  118. detailFFT: detailFFT,
  119. survWindows: map[int][]float64{},
  120. survPlans: map[int]*fftutil.CmplxPlan{},
  121. survFIR: map[int][]float64{},
  122. dcEnabled: cfg.DCBlock,
  123. iqEnabled: cfg.IQBalance,
  124. useGPU: cfg.UseGPUFFT,
  125. rdsMap: map[int64]*rdsState{},
  126. streamPhaseState: map[int64]*streamExtractState{},
  127. streamOverlap: &streamIQOverlap{},
  128. arbiter: pipeline.NewArbiter(),
  129. telemetry: coll,
  130. }
  131. if rt.useGPU && gpuState != nil {
  132. snap := gpuState.snapshot()
  133. if snap.Available {
  134. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  135. rt.gpuEngine = eng
  136. gpuState.set(true, nil)
  137. } else {
  138. gpuState.set(false, err)
  139. rt.useGPU = false
  140. }
  141. }
  142. }
  143. return rt
  144. }
  145. func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
  146. prevFFT := rt.cfg.FFTSize
  147. prevSampleRate := rt.cfg.SampleRate
  148. prevUseGPU := rt.useGPU
  149. prevDetailFFT := rt.detailFFT
  150. rt.cfg = upd.cfg
  151. if rec != nil {
  152. rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
  153. Enabled: rt.cfg.Recorder.Enabled,
  154. MinSNRDb: rt.cfg.Recorder.MinSNRDb,
  155. MinDuration: mustParseDuration(rt.cfg.Recorder.MinDuration, 1*time.Second),
  156. MaxDuration: mustParseDuration(rt.cfg.Recorder.MaxDuration, 300*time.Second),
  157. PrerollMs: rt.cfg.Recorder.PrerollMs,
  158. RecordIQ: rt.cfg.Recorder.RecordIQ,
  159. RecordAudio: rt.cfg.Recorder.RecordAudio,
  160. AutoDemod: rt.cfg.Recorder.AutoDemod,
  161. AutoDecode: rt.cfg.Recorder.AutoDecode,
  162. MaxDiskMB: rt.cfg.Recorder.MaxDiskMB,
  163. OutputDir: rt.cfg.Recorder.OutputDir,
  164. ClassFilter: rt.cfg.Recorder.ClassFilter,
  165. RingSeconds: rt.cfg.Recorder.RingSeconds,
  166. DeemphasisUs: rt.cfg.Recorder.DeemphasisUs,
  167. ExtractionTaps: rt.cfg.Recorder.ExtractionTaps,
  168. ExtractionBwMult: rt.cfg.Recorder.ExtractionBwMult,
  169. }, rt.cfg.CenterHz, buildDecoderMap(rt.cfg))
  170. }
  171. if upd.det != nil {
  172. rt.det = upd.det
  173. }
  174. if upd.window != nil {
  175. rt.window = upd.window
  176. rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
  177. }
  178. detailFFT := rt.cfg.Refinement.DetailFFTSize
  179. if detailFFT <= 0 {
  180. detailFFT = rt.cfg.FFTSize
  181. }
  182. if detailFFT != prevDetailFFT {
  183. rt.detailFFT = detailFFT
  184. rt.detailWindow = fftutil.Hann(detailFFT)
  185. rt.detailPlan = fftutil.NewCmplxPlan(detailFFT)
  186. }
  187. if prevSampleRate != rt.cfg.SampleRate {
  188. rt.survFIR = map[int][]float64{}
  189. }
  190. if prevFFT != rt.cfg.FFTSize {
  191. rt.survWindows = map[int][]float64{}
  192. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  193. }
  194. if upd.det != nil || prevSampleRate != rt.cfg.SampleRate || prevFFT != rt.cfg.FFTSize {
  195. rt.derivedDetectors = map[string]*derivedDetector{}
  196. rt.nextDerivedBase = -derivedIDBlock
  197. }
  198. rt.dcEnabled = upd.dcBlock
  199. rt.iqEnabled = upd.iqBalance
  200. if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU {
  201. srcMgr.Flush()
  202. rt.gotSamples = false
  203. if rt.gpuEngine != nil {
  204. rt.gpuEngine.Close()
  205. rt.gpuEngine = nil
  206. }
  207. rt.useGPU = rt.cfg.UseGPUFFT
  208. if rt.useGPU && gpuState != nil {
  209. snap := gpuState.snapshot()
  210. if snap.Available {
  211. if eng, err := gpufft.New(rt.cfg.FFTSize); err == nil {
  212. rt.gpuEngine = eng
  213. gpuState.set(true, nil)
  214. } else {
  215. gpuState.set(false, err)
  216. rt.useGPU = false
  217. }
  218. } else {
  219. gpuState.set(false, nil)
  220. rt.useGPU = false
  221. }
  222. } else if gpuState != nil {
  223. gpuState.set(false, nil)
  224. }
  225. }
  226. if rt.telemetry != nil {
  227. rt.telemetry.Event("dsp_config_update", "info", "dsp runtime configuration updated", nil, map[string]any{
  228. "fft_size": rt.cfg.FFTSize,
  229. "sample_rate": rt.cfg.SampleRate,
  230. "use_gpu_fft": rt.cfg.UseGPUFFT,
  231. "detail_fft": rt.detailFFT,
  232. "surv_strategy": rt.cfg.Surveillance.Strategy,
  233. })
  234. }
  235. }
  236. func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 {
  237. return rt.spectrumFromIQWithPlan(iq, rt.window, rt.plan, gpuState, true)
  238. }
  239. func (rt *dspRuntime) spectrumFromIQWithPlan(iq []complex64, window []float64, plan *fftutil.CmplxPlan, gpuState *gpuStatus, allowGPU bool) []float64 {
  240. if len(iq) == 0 {
  241. return nil
  242. }
  243. if allowGPU && rt.useGPU && rt.gpuEngine != nil {
  244. gpuBuf := make([]complex64, len(iq))
  245. if len(window) == len(iq) {
  246. for i := 0; i < len(iq); i++ {
  247. v := iq[i]
  248. w := float32(window[i])
  249. gpuBuf[i] = complex(real(v)*w, imag(v)*w)
  250. }
  251. } else {
  252. copy(gpuBuf, iq)
  253. }
  254. out, err := rt.gpuEngine.Exec(gpuBuf)
  255. if err != nil {
  256. if gpuState != nil {
  257. gpuState.set(false, err)
  258. }
  259. rt.useGPU = false
  260. return fftutil.SpectrumWithPlan(gpuBuf, nil, plan)
  261. }
  262. return fftutil.SpectrumFromFFT(out)
  263. }
  264. return fftutil.SpectrumWithPlan(iq, window, plan)
  265. }
  266. func (rt *dspRuntime) windowForFFT(fftSize int) []float64 {
  267. if fftSize <= 0 {
  268. return nil
  269. }
  270. if fftSize == rt.cfg.FFTSize {
  271. return rt.window
  272. }
  273. if rt.survWindows == nil {
  274. rt.survWindows = map[int][]float64{}
  275. }
  276. if window, ok := rt.survWindows[fftSize]; ok {
  277. return window
  278. }
  279. window := fftutil.Hann(fftSize)
  280. rt.survWindows[fftSize] = window
  281. return window
  282. }
  283. func (rt *dspRuntime) planForFFT(fftSize int) *fftutil.CmplxPlan {
  284. if fftSize <= 0 {
  285. return nil
  286. }
  287. if fftSize == rt.cfg.FFTSize {
  288. return rt.plan
  289. }
  290. if rt.survPlans == nil {
  291. rt.survPlans = map[int]*fftutil.CmplxPlan{}
  292. }
  293. if plan, ok := rt.survPlans[fftSize]; ok {
  294. return plan
  295. }
  296. plan := fftutil.NewCmplxPlan(fftSize)
  297. rt.survPlans[fftSize] = plan
  298. return plan
  299. }
  300. func (rt *dspRuntime) spectrumForLevel(iq []complex64, fftSize int, gpuState *gpuStatus, allowGPU bool) []float64 {
  301. if len(iq) == 0 || fftSize <= 0 {
  302. return nil
  303. }
  304. if len(iq) > fftSize {
  305. iq = iq[len(iq)-fftSize:]
  306. }
  307. window := rt.windowForFFT(fftSize)
  308. plan := rt.planForFFT(fftSize)
  309. return rt.spectrumFromIQWithPlan(iq, window, plan, gpuState, allowGPU)
  310. }
  311. func sanitizeSpectrum(spectrum []float64) {
  312. for i := range spectrum {
  313. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  314. spectrum[i] = -200
  315. }
  316. }
  317. }
  318. func (rt *dspRuntime) decimationTaps(factor int) []float64 {
  319. if factor <= 1 {
  320. return nil
  321. }
  322. if rt.survFIR == nil {
  323. rt.survFIR = map[int][]float64{}
  324. }
  325. if taps, ok := rt.survFIR[factor]; ok {
  326. return taps
  327. }
  328. cutoff := float64(rt.cfg.SampleRate/factor) * 0.5 * 0.8
  329. taps := dsp.LowpassFIR(cutoff, rt.cfg.SampleRate, 101)
  330. rt.survFIR[factor] = taps
  331. return taps
  332. }
  333. func (rt *dspRuntime) decimateSurveillanceIQ(iq []complex64, factor int) []complex64 {
  334. if factor <= 1 {
  335. return iq
  336. }
  337. taps := rt.decimationTaps(factor)
  338. if len(taps) == 0 {
  339. return dsp.Decimate(iq, factor)
  340. }
  341. filtered := dsp.ApplyFIR(iq, taps)
  342. return dsp.Decimate(filtered, factor)
  343. }
  344. func meanMagComplex(samples []complex64) float64 {
  345. if len(samples) == 0 {
  346. return 0
  347. }
  348. var sum float64
  349. for _, v := range samples {
  350. sum += math.Hypot(float64(real(v)), float64(imag(v)))
  351. }
  352. return sum / float64(len(samples))
  353. }
  354. func phaseStepAbs(a, b complex64) float64 {
  355. num := float64(real(a))*float64(imag(b)) - float64(imag(a))*float64(real(b))
  356. den := float64(real(a))*float64(real(b)) + float64(imag(a))*float64(imag(b))
  357. return math.Abs(math.Atan2(num, den))
  358. }
  359. func boundaryMetrics(prevTail []complex64, curr []complex64, window int) (float64, float64, float64, int) {
  360. if len(curr) == 0 {
  361. return 0, 0, 0, 0
  362. }
  363. if window <= 0 {
  364. window = 16
  365. }
  366. headN := window
  367. if len(curr) < headN {
  368. headN = len(curr)
  369. }
  370. headMean := meanMagComplex(curr[:headN])
  371. if len(prevTail) == 0 {
  372. return headMean, 0, 0, headN
  373. }
  374. tailN := window
  375. if len(prevTail) < tailN {
  376. tailN = len(prevTail)
  377. }
  378. tailMean := meanMagComplex(prevTail[len(prevTail)-tailN:])
  379. deltaMag := math.Abs(headMean - tailMean)
  380. phaseJump := phaseStepAbs(prevTail[len(prevTail)-1], curr[0])
  381. score := deltaMag + phaseJump
  382. return headMean, tailMean, score, headN
  383. }
  384. func tailWindowComplex(src []complex64, n int) []complex64 {
  385. if n <= 0 || len(src) == 0 {
  386. return nil
  387. }
  388. if len(src) <= n {
  389. out := make([]complex64, len(src))
  390. copy(out, src)
  391. return out
  392. }
  393. out := make([]complex64, n)
  394. copy(out, src[len(src)-n:])
  395. return out
  396. }
  397. func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
  398. start := time.Now()
  399. required := rt.cfg.FFTSize
  400. if rt.detailFFT > required {
  401. required = rt.detailFFT
  402. }
  403. available := required
  404. st := srcMgr.Stats()
  405. if rt.telemetry != nil {
  406. rt.telemetry.SetGauge("source.buffer_samples", float64(st.BufferSamples), nil)
  407. rt.telemetry.SetGauge("source.last_sample_ago_ms", float64(st.LastSampleAgoMs), nil)
  408. rt.telemetry.SetGauge("source.dropped", float64(st.Dropped), nil)
  409. rt.telemetry.SetGauge("source.resets", float64(st.Resets), nil)
  410. }
  411. if forceFixedStreamReadSamples > 0 {
  412. available = forceFixedStreamReadSamples
  413. if available < required {
  414. available = required
  415. }
  416. available = (available / required) * required
  417. if available < required {
  418. available = required
  419. }
  420. logging.Warn("boundary", "fixed_stream_read_samples", "configured", forceFixedStreamReadSamples, "effective", available, "required", required)
  421. } else if st.BufferSamples > required {
  422. available = (st.BufferSamples / required) * required
  423. if available < required {
  424. available = required
  425. }
  426. }
  427. logging.Debug("capture", "read_iq", "required", required, "available", available, "buf", st.BufferSamples, "reset", st.Resets, "drop", st.Dropped)
  428. readStart := time.Now()
  429. allIQ, err := srcMgr.ReadIQ(available)
  430. if err != nil {
  431. if rt.telemetry != nil {
  432. rt.telemetry.IncCounter("capture.read.error", 1, nil)
  433. }
  434. return nil, err
  435. }
  436. if rt.telemetry != nil {
  437. rt.telemetry.Observe("capture.read.duration_ms", float64(time.Since(readStart).Microseconds())/1000.0, nil)
  438. rt.telemetry.Observe("capture.read.samples", float64(len(allIQ)), nil)
  439. }
  440. if rec != nil {
  441. ingestStart := time.Now()
  442. rec.Ingest(time.Now(), allIQ)
  443. if rt.telemetry != nil {
  444. rt.telemetry.Observe("capture.ingest.duration_ms", float64(time.Since(ingestStart).Microseconds())/1000.0, nil)
  445. }
  446. }
  447. // Cap allIQ for downstream extraction to prevent buffer bloat.
  448. // Without this cap, buffer accumulation during processing stalls causes
  449. // increasingly large reads → longer extraction → more accumulation
  450. // (positive feedback loop). For audio streaming this creates >150ms
  451. // feed gaps that produce audible clicks.
  452. // The ring buffer (Ingest above) gets the full data; only extraction is capped.
  453. maxStreamSamples := rt.cfg.SampleRate / rt.cfg.FrameRate * 2
  454. if maxStreamSamples < required {
  455. maxStreamSamples = required
  456. }
  457. maxStreamSamples = (maxStreamSamples / required) * required
  458. streamDropped := false
  459. if len(allIQ) > maxStreamSamples {
  460. allIQ = allIQ[len(allIQ)-maxStreamSamples:]
  461. streamDropped = true
  462. if rt.telemetry != nil {
  463. rt.telemetry.IncCounter("capture.stream_drop.count", 1, nil)
  464. rt.telemetry.Event("iq_dropped", "warn", "capture IQ dropped before extraction", nil, map[string]any{
  465. "max_stream_samples": maxStreamSamples,
  466. "required": required,
  467. })
  468. }
  469. }
  470. logging.Debug("capture", "iq_len", "len", len(allIQ), "surv_fft", rt.cfg.FFTSize, "detail_fft", rt.detailFFT)
  471. survIQ := allIQ
  472. if len(allIQ) > rt.cfg.FFTSize {
  473. survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:]
  474. }
  475. detailIQ := survIQ
  476. if rt.detailFFT > 0 && len(allIQ) >= rt.detailFFT {
  477. detailIQ = allIQ[len(allIQ)-rt.detailFFT:]
  478. }
  479. if rt.dcEnabled {
  480. dcBlocker.Apply(allIQ)
  481. if rt.telemetry != nil {
  482. rt.telemetry.IncCounter("dsp.dc_block.apply", 1, nil)
  483. }
  484. }
  485. if rt.iqEnabled {
  486. // IQBalance must NOT modify allIQ in-place: allIQ goes to the extraction
  487. // pipeline and any in-place modification creates a phase/amplitude
  488. // discontinuity at the survIQ boundary (len-FFTSize) that the polyphase
  489. // extractor then sees as paired click artifacts in the FM discriminator.
  490. detailIsSurv := sameIQBuffer(detailIQ, survIQ)
  491. survIQ = append([]complex64(nil), survIQ...)
  492. dsp.IQBalance(survIQ)
  493. if detailIsSurv {
  494. detailIQ = survIQ
  495. } else {
  496. detailIQ = append([]complex64(nil), detailIQ...)
  497. dsp.IQBalance(detailIQ)
  498. }
  499. }
  500. if rt.telemetry != nil {
  501. rt.telemetry.SetGauge("iq.stage.all.length", float64(len(allIQ)), nil)
  502. rt.telemetry.SetGauge("iq.stage.surveillance.length", float64(len(survIQ)), nil)
  503. rt.telemetry.SetGauge("iq.stage.detail.length", float64(len(detailIQ)), nil)
  504. rt.telemetry.Observe("capture.total.duration_ms", float64(time.Since(start).Microseconds())/1000.0, nil)
  505. headMean, tailMean, boundaryScore, boundaryWindow := boundaryMetrics(rt.lastAllIQTail, allIQ, 32)
  506. rt.telemetry.SetGauge("iq.boundary.all.head_mean_mag", headMean, nil)
  507. rt.telemetry.SetGauge("iq.boundary.all.prev_tail_mean_mag", tailMean, nil)
  508. rt.telemetry.Observe("iq.boundary.all.discontinuity_score", boundaryScore, nil)
  509. if len(rt.lastAllIQTail) > 0 && len(allIQ) > 0 {
  510. deltaMag := math.Abs(math.Hypot(float64(real(allIQ[0])), float64(imag(allIQ[0]))) - math.Hypot(float64(real(rt.lastAllIQTail[len(rt.lastAllIQTail)-1])), float64(imag(rt.lastAllIQTail[len(rt.lastAllIQTail)-1]))))
  511. phaseJump := phaseStepAbs(rt.lastAllIQTail[len(rt.lastAllIQTail)-1], allIQ[0])
  512. rt.telemetry.Observe("iq.boundary.all.delta_mag", deltaMag, nil)
  513. rt.telemetry.Observe("iq.boundary.all.delta_phase", phaseJump, nil)
  514. if rt.telemetry.ShouldSampleHeavy() {
  515. rt.telemetry.Event("alliq_boundary", "info", "allIQ boundary snapshot", nil, map[string]any{
  516. "window": boundaryWindow,
  517. "head_mean_mag": headMean,
  518. "prev_tail_mean_mag": tailMean,
  519. "delta_mag": deltaMag,
  520. "delta_phase": phaseJump,
  521. "discontinuity_score": boundaryScore,
  522. "alliq_len": len(allIQ),
  523. "stream_dropped": streamDropped,
  524. })
  525. }
  526. }
  527. if rt.telemetry.ShouldSampleHeavy() {
  528. observeIQStats(rt.telemetry, "capture_all", allIQ, nil)
  529. observeIQStats(rt.telemetry, "capture_surveillance", survIQ, nil)
  530. observeIQStats(rt.telemetry, "capture_detail", detailIQ, nil)
  531. }
  532. }
  533. rt.lastAllIQTail = tailWindowComplex(allIQ, 32)
  534. survSpectrum := rt.spectrumFromIQ(survIQ, gpuState)
  535. sanitizeSpectrum(survSpectrum)
  536. detailSpectrum := survSpectrum
  537. if !sameIQBuffer(detailIQ, survIQ) {
  538. detailSpectrum = rt.spectrumFromIQWithPlan(detailIQ, rt.detailWindow, rt.detailPlan, gpuState, false)
  539. sanitizeSpectrum(detailSpectrum)
  540. }
  541. policy := pipeline.PolicyFromConfig(rt.cfg)
  542. plan := rt.buildSurveillancePlan(policy)
  543. surveillanceSpectra := make([]pipeline.SurveillanceLevelSpectrum, 0, len(plan.Specs))
  544. for _, spec := range plan.Specs {
  545. if spec.Level.FFTSize <= 0 {
  546. continue
  547. }
  548. var spectrum []float64
  549. if spec.Decim <= 1 {
  550. if spec.Level.FFTSize == len(survSpectrum) {
  551. spectrum = survSpectrum
  552. } else {
  553. spectrum = rt.spectrumForLevel(survIQ, spec.Level.FFTSize, gpuState, spec.AllowGPU)
  554. sanitizeSpectrum(spectrum)
  555. }
  556. } else {
  557. required := spec.Level.FFTSize * spec.Decim
  558. if required > len(survIQ) {
  559. continue
  560. }
  561. src := survIQ
  562. if len(src) > required {
  563. src = src[len(src)-required:]
  564. }
  565. decimated := rt.decimateSurveillanceIQ(src, spec.Decim)
  566. spectrum = rt.spectrumForLevel(decimated, spec.Level.FFTSize, gpuState, false)
  567. sanitizeSpectrum(spectrum)
  568. }
  569. if len(spectrum) == 0 {
  570. continue
  571. }
  572. surveillanceSpectra = append(surveillanceSpectra, pipeline.SurveillanceLevelSpectrum{Level: spec.Level, Spectrum: spectrum})
  573. }
  574. now := time.Now()
  575. finished, detected := rt.det.Process(now, survSpectrum, rt.cfg.CenterHz)
  576. if rt.telemetry != nil {
  577. rt.telemetry.SetGauge("signals.detected.count", float64(len(detected)), nil)
  578. rt.telemetry.SetGauge("signals.finished.count", float64(len(finished)), nil)
  579. }
  580. return &spectrumArtifacts{
  581. allIQ: allIQ,
  582. streamDropped: streamDropped,
  583. surveillanceIQ: survIQ,
  584. detailIQ: detailIQ,
  585. surveillanceSpectrum: survSpectrum,
  586. surveillanceSpectra: surveillanceSpectra,
  587. surveillancePlan: plan,
  588. detailSpectrum: detailSpectrum,
  589. finished: finished,
  590. detected: detected,
  591. thresholds: rt.det.LastThresholds(),
  592. noiseFloor: rt.det.LastNoiseFloor(),
  593. now: now,
  594. }, nil
  595. }
  596. func (rt *dspRuntime) buildSurveillanceResult(art *spectrumArtifacts) pipeline.SurveillanceResult {
  597. if art == nil {
  598. return pipeline.SurveillanceResult{}
  599. }
  600. policy := pipeline.PolicyFromConfig(rt.cfg)
  601. plan := art.surveillancePlan
  602. if plan.Primary.Name == "" {
  603. plan = rt.buildSurveillancePlan(policy)
  604. }
  605. primaryCandidates := pipeline.CandidatesFromSignalsWithLevel(art.detected, "surveillance-detector", plan.Primary)
  606. derivedCandidates := rt.detectDerivedCandidates(art, plan)
  607. candidates := pipeline.FuseCandidates(primaryCandidates, derivedCandidates)
  608. pipeline.ApplyMonitorWindowMatchesToCandidates(policy, candidates)
  609. scheduled := pipeline.ScheduleCandidates(candidates, policy)
  610. return pipeline.SurveillanceResult{
  611. Level: plan.Primary,
  612. Levels: plan.Levels,
  613. LevelSet: plan.LevelSet,
  614. DetectionPolicy: plan.DetectionPolicy,
  615. DisplayLevel: plan.Presentation,
  616. Context: plan.Context,
  617. Spectra: art.surveillanceSpectra,
  618. Candidates: candidates,
  619. Scheduled: scheduled,
  620. Finished: art.finished,
  621. Signals: art.detected,
  622. NoiseFloor: art.noiseFloor,
  623. Thresholds: art.thresholds,
  624. }
  625. }
  626. func (rt *dspRuntime) detectDerivedCandidates(art *spectrumArtifacts, plan surveillancePlan) []pipeline.Candidate {
  627. if art == nil || len(plan.LevelSet.Derived) == 0 {
  628. return nil
  629. }
  630. spectra := map[string][]float64{}
  631. for _, spec := range art.surveillanceSpectra {
  632. if spec.Level.Name == "" || len(spec.Spectrum) == 0 {
  633. continue
  634. }
  635. spectra[spec.Level.Name] = spec.Spectrum
  636. }
  637. if len(spectra) == 0 {
  638. return nil
  639. }
  640. out := make([]pipeline.Candidate, 0, len(plan.LevelSet.Derived))
  641. for _, level := range plan.LevelSet.Derived {
  642. if level.Name == "" {
  643. continue
  644. }
  645. if !pipeline.IsDetectionLevel(level) {
  646. continue
  647. }
  648. spectrum := spectra[level.Name]
  649. if len(spectrum) == 0 {
  650. continue
  651. }
  652. entry := rt.derivedDetectorForLevel(level)
  653. if entry == nil || entry.det == nil {
  654. continue
  655. }
  656. _, signals := entry.det.Process(art.now, spectrum, level.CenterHz)
  657. if len(signals) == 0 {
  658. continue
  659. }
  660. cands := pipeline.CandidatesFromSignalsWithLevel(signals, "surveillance-derived", level)
  661. for i := range cands {
  662. if cands[i].ID == 0 {
  663. continue
  664. }
  665. cands[i].ID = entry.idBase - cands[i].ID
  666. }
  667. out = append(out, cands...)
  668. }
  669. if len(out) == 0 {
  670. return nil
  671. }
  672. return out
  673. }
  674. func (rt *dspRuntime) derivedDetectorForLevel(level pipeline.AnalysisLevel) *derivedDetector {
  675. if level.SampleRate <= 0 || level.FFTSize <= 0 {
  676. return nil
  677. }
  678. if rt.derivedDetectors == nil {
  679. rt.derivedDetectors = map[string]*derivedDetector{}
  680. }
  681. key := level.Name
  682. if key == "" {
  683. key = fmt.Sprintf("%d:%d", level.SampleRate, level.FFTSize)
  684. }
  685. entry := rt.derivedDetectors[key]
  686. if entry != nil && entry.sampleRate == level.SampleRate && entry.fftSize == level.FFTSize {
  687. return entry
  688. }
  689. if rt.nextDerivedBase == 0 {
  690. rt.nextDerivedBase = -derivedIDBlock
  691. }
  692. entry = &derivedDetector{
  693. det: detector.New(rt.cfg.Detector, level.SampleRate, level.FFTSize),
  694. sampleRate: level.SampleRate,
  695. fftSize: level.FFTSize,
  696. idBase: rt.nextDerivedBase,
  697. }
  698. rt.nextDerivedBase -= derivedIDBlock
  699. rt.derivedDetectors[key] = entry
  700. return entry
  701. }
  702. func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult, now time.Time) pipeline.RefinementInput {
  703. policy := pipeline.PolicyFromConfig(rt.cfg)
  704. baseBudget := pipeline.BudgetModelFromPolicy(policy)
  705. pressure := pipeline.BuildBudgetPressureSummary(baseBudget, rt.arbitration.Refinement, rt.arbitration.Queue)
  706. budget := pipeline.ApplyBudgetRebalance(policy, baseBudget, pressure)
  707. plan := pipeline.BuildRefinementPlanWithBudget(surv.Candidates, policy, budget)
  708. admission := rt.arbiter.AdmitRefinementWithBudget(plan, policy, budget, now)
  709. plan = admission.Plan
  710. workItems := make([]pipeline.RefinementWorkItem, 0, len(admission.WorkItems))
  711. if len(admission.WorkItems) > 0 {
  712. workItems = append(workItems, admission.WorkItems...)
  713. }
  714. scheduled := append([]pipeline.ScheduledCandidate(nil), admission.Admitted...)
  715. workIndex := map[int64]int{}
  716. for i := range workItems {
  717. if workItems[i].Candidate.ID == 0 {
  718. continue
  719. }
  720. workIndex[workItems[i].Candidate.ID] = i
  721. }
  722. windows := make([]pipeline.RefinementWindow, 0, len(scheduled))
  723. for _, sc := range scheduled {
  724. window := pipeline.RefinementWindowForCandidate(policy, sc.Candidate)
  725. windows = append(windows, window)
  726. if idx, ok := workIndex[sc.Candidate.ID]; ok {
  727. workItems[idx].Window = window
  728. }
  729. }
  730. detailFFT := rt.cfg.Refinement.DetailFFTSize
  731. if detailFFT <= 0 {
  732. detailFFT = rt.cfg.FFTSize
  733. }
  734. levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate))
  735. if _, maxSpan, ok := windowSpanBounds(windows); ok {
  736. levelSpan = maxSpan
  737. }
  738. level := analysisLevel("refinement", "refinement", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "refinement-window", 1, rt.cfg.SampleRate)
  739. detailLevel := analysisLevel("detail", "detail", "refinement", rt.cfg.SampleRate, detailFFT, rt.cfg.CenterHz, levelSpan, "detail-spectrum", 1, rt.cfg.SampleRate)
  740. if len(workItems) > 0 {
  741. for i := range workItems {
  742. item := &workItems[i]
  743. if item.Window.SpanHz <= 0 {
  744. continue
  745. }
  746. item.Execution = &pipeline.RefinementExecution{
  747. Stage: "refine",
  748. SampleRate: rt.cfg.SampleRate,
  749. FFTSize: detailFFT,
  750. CenterHz: item.Window.CenterHz,
  751. SpanHz: item.Window.SpanHz,
  752. Source: detailLevel.Source,
  753. }
  754. }
  755. }
  756. input := pipeline.RefinementInput{
  757. Level: level,
  758. Detail: detailLevel,
  759. Context: surv.Context,
  760. Request: pipeline.RefinementRequest{Strategy: plan.Strategy, Reason: "surveillance-plan", SpanHintHz: levelSpan},
  761. Budgets: budget,
  762. Admission: admission.Admission,
  763. Candidates: append([]pipeline.Candidate(nil), surv.Candidates...),
  764. Scheduled: scheduled,
  765. WorkItems: workItems,
  766. Plan: plan,
  767. Windows: windows,
  768. SampleRate: rt.cfg.SampleRate,
  769. FFTSize: detailFFT,
  770. CenterHz: rt.cfg.CenterHz,
  771. Source: "surveillance-detector",
  772. }
  773. input.Context.Refinement = level
  774. input.Context.Detail = detailLevel
  775. if !policy.RefinementEnabled {
  776. for i := range input.WorkItems {
  777. item := &input.WorkItems[i]
  778. if item.Status == pipeline.RefinementStatusDropped {
  779. continue
  780. }
  781. item.Status = pipeline.RefinementStatusDropped
  782. item.Reason = pipeline.RefinementReasonDisabled
  783. }
  784. input.Scheduled = nil
  785. input.Request.Reason = pipeline.ReasonAdmissionDisabled
  786. input.Admission.Reason = pipeline.ReasonAdmissionDisabled
  787. input.Admission.Admitted = 0
  788. input.Admission.Skipped = 0
  789. input.Admission.Displaced = 0
  790. input.Plan.Selected = nil
  791. input.Plan.DroppedByBudget = 0
  792. }
  793. rt.setArbitration(policy, input.Budgets, input.Admission, rt.arbitration.Queue)
  794. return input
  795. }
  796. func (rt *dspRuntime) runRefinement(art *spectrumArtifacts, surv pipeline.SurveillanceResult, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementStep {
  797. input := rt.buildRefinementInput(surv, art.now)
  798. markWorkItemsStatus(input.WorkItems, pipeline.RefinementStatusAdmitted, pipeline.RefinementStatusRunning, pipeline.RefinementReasonRunning)
  799. result := rt.refineSignals(art, input, extractMgr, rec)
  800. markWorkItemsCompleted(input.WorkItems, result.Candidates)
  801. return pipeline.RefinementStep{Input: input, Result: result}
  802. }
  803. func (rt *dspRuntime) refineSignals(art *spectrumArtifacts, input pipeline.RefinementInput, extractMgr *extractionManager, rec *recorder.Manager) pipeline.RefinementResult {
  804. if art == nil || len(art.detailIQ) == 0 || len(input.Scheduled) == 0 {
  805. return pipeline.RefinementResult{}
  806. }
  807. policy := pipeline.PolicyFromConfig(rt.cfg)
  808. selectedCandidates := make([]pipeline.Candidate, 0, len(input.Scheduled))
  809. selectedSignals := make([]detector.Signal, 0, len(input.Scheduled))
  810. for _, sc := range input.Scheduled {
  811. selectedCandidates = append(selectedCandidates, sc.Candidate)
  812. selectedSignals = append(selectedSignals, detector.Signal{
  813. ID: sc.Candidate.ID,
  814. FirstBin: sc.Candidate.FirstBin,
  815. LastBin: sc.Candidate.LastBin,
  816. CenterHz: sc.Candidate.CenterHz,
  817. BWHz: sc.Candidate.BandwidthHz,
  818. PeakDb: sc.Candidate.PeakDb,
  819. SNRDb: sc.Candidate.SNRDb,
  820. NoiseDb: sc.Candidate.NoiseDb,
  821. })
  822. }
  823. sampleRate := input.SampleRate
  824. fftSize := input.FFTSize
  825. centerHz := input.CenterHz
  826. if sampleRate <= 0 {
  827. sampleRate = rt.cfg.SampleRate
  828. }
  829. if fftSize <= 0 {
  830. fftSize = rt.cfg.FFTSize
  831. }
  832. if centerHz == 0 {
  833. centerHz = rt.cfg.CenterHz
  834. }
  835. snips, snipRates := extractSignalIQBatch(extractMgr, art.detailIQ, sampleRate, centerHz, selectedSignals)
  836. refined := pipeline.RefineCandidates(selectedCandidates, input.Windows, art.detailSpectrum, sampleRate, fftSize, snips, snipRates, classifier.ClassifierMode(rt.cfg.ClassifierMode))
  837. signals := make([]detector.Signal, 0, len(refined))
  838. decisions := make([]pipeline.SignalDecision, 0, len(refined))
  839. for i, ref := range refined {
  840. sig := ref.Signal
  841. signals = append(signals, sig)
  842. cls := sig.Class
  843. snipRate := ref.SnippetRate
  844. decision := pipeline.DecideSignalAction(policy, ref.Candidate, cls)
  845. decisions = append(decisions, decision)
  846. if cls != nil {
  847. if cls.ModType == classifier.ClassWFM {
  848. cls.ModType = classifier.ClassWFMStereo
  849. signals[i].PlaybackMode = string(classifier.ClassWFMStereo)
  850. signals[i].DemodName = string(classifier.ClassWFMStereo)
  851. signals[i].StereoState = "searching"
  852. }
  853. pll := classifier.PLLResult{}
  854. if i < len(snips) && snips[i] != nil && len(snips[i]) > 256 {
  855. pll = classifier.EstimateExactFrequency(snips[i], snipRate, signals[i].CenterHz, cls.ModType)
  856. cls.PLL = &pll
  857. signals[i].PLL = &pll
  858. if cls.ModType == classifier.ClassWFMStereo {
  859. if pll.Stereo {
  860. signals[i].StereoState = "locked"
  861. } else if signals[i].StereoState == "" {
  862. signals[i].StereoState = "searching"
  863. }
  864. signals[i].PlaybackMode = string(classifier.ClassWFMStereo)
  865. signals[i].DemodName = string(classifier.ClassWFMStereo)
  866. }
  867. }
  868. if cls.ModType == classifier.ClassWFMStereo && rec != nil {
  869. rt.updateRDS(art.now, rec, &signals[i], cls)
  870. if signals[i].PLL != nil && signals[i].PLL.RDSStation != "" {
  871. signals[i].StereoState = "locked"
  872. }
  873. }
  874. }
  875. }
  876. budget := input.Budgets
  877. queueStats := rt.arbiter.ApplyDecisions(decisions, budget, art.now, policy)
  878. rt.setArbitration(policy, budget, input.Admission, queueStats)
  879. summary := summarizeDecisions(decisions)
  880. if rec != nil {
  881. if summary.RecordEnabled > 0 {
  882. rt.cfg.Recorder.Enabled = true
  883. }
  884. if summary.DecodeEnabled > 0 {
  885. rt.cfg.Recorder.AutoDecode = true
  886. }
  887. }
  888. rt.det.UpdateClasses(signals)
  889. return pipeline.RefinementResult{Level: input.Level, Signals: signals, Decisions: decisions, Candidates: selectedCandidates}
  890. }
  891. func (rt *dspRuntime) updateRDS(now time.Time, rec *recorder.Manager, sig *detector.Signal, cls *classifier.Classification) {
  892. if sig == nil || cls == nil {
  893. return
  894. }
  895. keyHz := sig.CenterHz
  896. if sig.PLL != nil && sig.PLL.ExactHz != 0 {
  897. keyHz = sig.PLL.ExactHz
  898. }
  899. key := int64(math.Round(keyHz / 25000.0))
  900. st := rt.rdsMap[key]
  901. if st == nil {
  902. st = &rdsState{}
  903. rt.rdsMap[key] = st
  904. }
  905. if now.Sub(st.lastDecode) >= 4*time.Second && atomic.LoadInt32(&st.busy) == 0 {
  906. st.lastDecode = now
  907. atomic.StoreInt32(&st.busy, 1)
  908. go func(st *rdsState, sigHz float64) {
  909. defer atomic.StoreInt32(&st.busy, 0)
  910. ringIQ, ringSR, ringCenter := rec.SliceRecent(4.0)
  911. if len(ringIQ) < ringSR || ringSR <= 0 {
  912. return
  913. }
  914. offset := sigHz - ringCenter
  915. shifted := dsp.FreqShift(ringIQ, ringSR, offset)
  916. decim1 := ringSR / 1000000
  917. if decim1 < 1 {
  918. decim1 = 1
  919. }
  920. lp1 := dsp.LowpassFIR(float64(ringSR/decim1)/2.0*0.8, ringSR, 51)
  921. f1 := dsp.ApplyFIR(shifted, lp1)
  922. d1 := dsp.Decimate(f1, decim1)
  923. rate1 := ringSR / decim1
  924. decim2 := rate1 / 250000
  925. if decim2 < 1 {
  926. decim2 = 1
  927. }
  928. lp2 := dsp.LowpassFIR(float64(rate1/decim2)/2.0*0.8, rate1, 101)
  929. f2 := dsp.ApplyFIR(d1, lp2)
  930. decimated := dsp.Decimate(f2, decim2)
  931. actualRate := rate1 / decim2
  932. rdsBase := demod.RDSBasebandComplex(decimated, actualRate)
  933. if len(rdsBase.Samples) == 0 {
  934. return
  935. }
  936. st.mu.Lock()
  937. result := st.dec.Decode(rdsBase.Samples, rdsBase.SampleRate)
  938. if result.PS != "" {
  939. st.result = result
  940. }
  941. st.mu.Unlock()
  942. }(st, sig.CenterHz)
  943. }
  944. st.mu.Lock()
  945. ps := st.result.PS
  946. st.mu.Unlock()
  947. if ps != "" && sig.PLL != nil {
  948. sig.PLL.RDSStation = strings.TrimSpace(ps)
  949. cls.PLL = sig.PLL
  950. }
  951. }
  952. func (rt *dspRuntime) maintenance(displaySignals []detector.Signal, rec *recorder.Manager) {
  953. if len(rt.rdsMap) > 0 {
  954. activeIDs := make(map[int64]bool, len(displaySignals))
  955. for _, s := range displaySignals {
  956. keyHz := s.CenterHz
  957. if s.PLL != nil && s.PLL.ExactHz != 0 {
  958. keyHz = s.PLL.ExactHz
  959. }
  960. activeIDs[int64(math.Round(keyHz/25000.0))] = true
  961. }
  962. for id := range rt.rdsMap {
  963. if !activeIDs[id] {
  964. delete(rt.rdsMap, id)
  965. }
  966. }
  967. }
  968. if len(rt.streamPhaseState) > 0 {
  969. sigIDs := make(map[int64]bool, len(displaySignals))
  970. for _, s := range displaySignals {
  971. sigIDs[s.ID] = true
  972. }
  973. for id := range rt.streamPhaseState {
  974. if !sigIDs[id] {
  975. delete(rt.streamPhaseState, id)
  976. }
  977. }
  978. }
  979. if rec != nil && len(displaySignals) > 0 {
  980. aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
  981. _ = aqCfg
  982. }
  983. }
  984. func spanForPolicy(policy pipeline.Policy, fallback float64) float64 {
  985. if policy.MonitorSpanHz > 0 {
  986. return policy.MonitorSpanHz
  987. }
  988. if len(policy.MonitorWindows) > 0 {
  989. maxSpan := 0.0
  990. for _, w := range policy.MonitorWindows {
  991. if w.SpanHz > maxSpan {
  992. maxSpan = w.SpanHz
  993. }
  994. }
  995. if maxSpan > 0 {
  996. return maxSpan
  997. }
  998. }
  999. if policy.MonitorStartHz != 0 && policy.MonitorEndHz != 0 && policy.MonitorEndHz > policy.MonitorStartHz {
  1000. return policy.MonitorEndHz - policy.MonitorStartHz
  1001. }
  1002. return fallback
  1003. }
  1004. func windowSpanBounds(windows []pipeline.RefinementWindow) (float64, float64, bool) {
  1005. minSpan := 0.0
  1006. maxSpan := 0.0
  1007. ok := false
  1008. for _, w := range windows {
  1009. if w.SpanHz <= 0 {
  1010. continue
  1011. }
  1012. if !ok || w.SpanHz < minSpan {
  1013. minSpan = w.SpanHz
  1014. }
  1015. if !ok || w.SpanHz > maxSpan {
  1016. maxSpan = w.SpanHz
  1017. }
  1018. ok = true
  1019. }
  1020. return minSpan, maxSpan, ok
  1021. }
  1022. func analysisLevel(name, role, truth string, sampleRate int, fftSize int, centerHz float64, spanHz float64, source string, decimation int, baseRate int) pipeline.AnalysisLevel {
  1023. level := pipeline.AnalysisLevel{
  1024. Name: name,
  1025. Role: role,
  1026. Truth: truth,
  1027. SampleRate: sampleRate,
  1028. FFTSize: fftSize,
  1029. CenterHz: centerHz,
  1030. SpanHz: spanHz,
  1031. Source: source,
  1032. }
  1033. if level.SampleRate > 0 && level.FFTSize > 0 {
  1034. level.BinHz = float64(level.SampleRate) / float64(level.FFTSize)
  1035. }
  1036. if decimation > 0 {
  1037. level.Decimation = decimation
  1038. } else if baseRate > 0 && level.SampleRate > 0 && baseRate%level.SampleRate == 0 {
  1039. level.Decimation = baseRate / level.SampleRate
  1040. }
  1041. return level
  1042. }
  1043. func (rt *dspRuntime) buildSurveillancePlan(policy pipeline.Policy) surveillancePlan {
  1044. baseRate := rt.cfg.SampleRate
  1045. baseFFT := rt.cfg.Surveillance.AnalysisFFTSize
  1046. if baseFFT <= 0 {
  1047. baseFFT = rt.cfg.FFTSize
  1048. }
  1049. span := spanForPolicy(policy, float64(baseRate))
  1050. detectionPolicy := pipeline.SurveillanceDetectionPolicyFromPolicy(policy)
  1051. primary := analysisLevel("surveillance", pipeline.RoleSurveillancePrimary, "surveillance", baseRate, baseFFT, rt.cfg.CenterHz, span, "baseband", 1, baseRate)
  1052. levels := []pipeline.AnalysisLevel{primary}
  1053. specs := []surveillanceLevelSpec{{Level: primary, Decim: 1, AllowGPU: true}}
  1054. context := pipeline.AnalysisContext{Surveillance: primary}
  1055. derivedLevels := make([]pipeline.AnalysisLevel, 0, 2)
  1056. supportLevels := make([]pipeline.AnalysisLevel, 0, 2)
  1057. strategy := strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy))
  1058. switch strategy {
  1059. case "multi-res", "multi-resolution", "multi", "multi_res":
  1060. decim := 2
  1061. derivedRate := baseRate / decim
  1062. derivedFFT := baseFFT / decim
  1063. if derivedRate >= 200000 && derivedFFT >= 256 {
  1064. derivedSpan := spanForPolicy(policy, float64(derivedRate))
  1065. role := pipeline.RoleSurveillanceSupport
  1066. if detectionPolicy.DerivedDetectionEnabled {
  1067. role = pipeline.RoleSurveillanceDerived
  1068. }
  1069. derived := analysisLevel("surveillance-lowres", role, "surveillance", derivedRate, derivedFFT, rt.cfg.CenterHz, derivedSpan, "decimated", decim, baseRate)
  1070. if detectionPolicy.DerivedDetectionEnabled {
  1071. levels = append(levels, derived)
  1072. derivedLevels = append(derivedLevels, derived)
  1073. } else {
  1074. supportLevels = append(supportLevels, derived)
  1075. }
  1076. specs = append(specs, surveillanceLevelSpec{Level: derived, Decim: decim, AllowGPU: false})
  1077. context.Derived = append(context.Derived, derived)
  1078. }
  1079. }
  1080. presentation := analysisLevel("presentation", pipeline.RolePresentation, "presentation", baseRate, rt.cfg.Surveillance.DisplayBins, rt.cfg.CenterHz, span, "display", 1, baseRate)
  1081. context.Presentation = presentation
  1082. if len(derivedLevels) == 0 && detectionPolicy.DerivedDetectionEnabled {
  1083. detectionPolicy.DerivedDetectionEnabled = false
  1084. detectionPolicy.DerivedDetectionReason = "levels"
  1085. }
  1086. switch {
  1087. case len(derivedLevels) > 0:
  1088. detectionPolicy.DerivedDetectionMode = "detection"
  1089. case len(supportLevels) > 0:
  1090. detectionPolicy.DerivedDetectionMode = "support"
  1091. default:
  1092. detectionPolicy.DerivedDetectionMode = "disabled"
  1093. }
  1094. levelSet := pipeline.SurveillanceLevelSet{
  1095. Primary: primary,
  1096. Derived: append([]pipeline.AnalysisLevel(nil), derivedLevels...),
  1097. Support: append([]pipeline.AnalysisLevel(nil), supportLevels...),
  1098. Presentation: presentation,
  1099. }
  1100. detectionLevels := make([]pipeline.AnalysisLevel, 0, 1+len(derivedLevels))
  1101. detectionLevels = append(detectionLevels, primary)
  1102. detectionLevels = append(detectionLevels, derivedLevels...)
  1103. levelSet.Detection = detectionLevels
  1104. allLevels := make([]pipeline.AnalysisLevel, 0, 1+len(derivedLevels)+len(supportLevels)+1)
  1105. allLevels = append(allLevels, primary)
  1106. allLevels = append(allLevels, derivedLevels...)
  1107. allLevels = append(allLevels, supportLevels...)
  1108. if presentation.Name != "" {
  1109. allLevels = append(allLevels, presentation)
  1110. }
  1111. levelSet.All = allLevels
  1112. return surveillancePlan{
  1113. Primary: primary,
  1114. Levels: levels,
  1115. LevelSet: levelSet,
  1116. Presentation: presentation,
  1117. Context: context,
  1118. DetectionPolicy: detectionPolicy,
  1119. Specs: specs,
  1120. }
  1121. }
  1122. func sameIQBuffer(a []complex64, b []complex64) bool {
  1123. if len(a) != len(b) {
  1124. return false
  1125. }
  1126. if len(a) == 0 {
  1127. return true
  1128. }
  1129. return &a[0] == &b[0]
  1130. }
  1131. func markWorkItemsStatus(items []pipeline.RefinementWorkItem, from string, to string, reason string) {
  1132. for i := range items {
  1133. if items[i].Status != from {
  1134. continue
  1135. }
  1136. items[i].Status = to
  1137. if reason != "" {
  1138. items[i].Reason = reason
  1139. }
  1140. }
  1141. }
  1142. func markWorkItemsCompleted(items []pipeline.RefinementWorkItem, candidates []pipeline.Candidate) {
  1143. if len(items) == 0 || len(candidates) == 0 {
  1144. return
  1145. }
  1146. done := map[int64]struct{}{}
  1147. for _, cand := range candidates {
  1148. if cand.ID != 0 {
  1149. done[cand.ID] = struct{}{}
  1150. }
  1151. }
  1152. for i := range items {
  1153. if _, ok := done[items[i].Candidate.ID]; !ok {
  1154. continue
  1155. }
  1156. items[i].Status = pipeline.RefinementStatusCompleted
  1157. items[i].Reason = pipeline.RefinementReasonCompleted
  1158. }
  1159. }
  1160. func (rt *dspRuntime) setArbitration(policy pipeline.Policy, budget pipeline.BudgetModel, admission pipeline.RefinementAdmission, queue pipeline.DecisionQueueStats) {
  1161. rt.arbitration = pipeline.BuildArbitrationState(policy, budget, admission, queue)
  1162. }