Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

508 строки
17KB

  1. package pipeline
  2. import (
  3. "sort"
  4. "strconv"
  5. "strings"
  6. "time"
  7. )
  8. type DecisionQueueStats struct {
  9. RecordQueued int `json:"record_queued"`
  10. DecodeQueued int `json:"decode_queued"`
  11. RecordSelected int `json:"record_selected"`
  12. DecodeSelected int `json:"decode_selected"`
  13. RecordActive int `json:"record_active"`
  14. DecodeActive int `json:"decode_active"`
  15. RecordOldestS float64 `json:"record_oldest_sec"`
  16. DecodeOldestS float64 `json:"decode_oldest_sec"`
  17. RecordBudget int `json:"record_budget"`
  18. DecodeBudget int `json:"decode_budget"`
  19. HoldMs int `json:"hold_ms"`
  20. DecisionHoldMs int `json:"decision_hold_ms,omitempty"`
  21. RecordHoldMs int `json:"record_hold_ms"`
  22. DecodeHoldMs int `json:"decode_hold_ms"`
  23. RecordDropped int `json:"record_dropped"`
  24. DecodeDropped int `json:"decode_dropped"`
  25. RecordHoldSelected int `json:"record_hold_selected"`
  26. DecodeHoldSelected int `json:"decode_hold_selected"`
  27. RecordHoldProtected int `json:"record_hold_protected"`
  28. DecodeHoldProtected int `json:"decode_hold_protected"`
  29. RecordHoldExpired int `json:"record_hold_expired"`
  30. DecodeHoldExpired int `json:"decode_hold_expired"`
  31. RecordHoldDisplaced int `json:"record_hold_displaced"`
  32. DecodeHoldDisplaced int `json:"decode_hold_displaced"`
  33. RecordOpportunistic int `json:"record_opportunistic"`
  34. DecodeOpportunistic int `json:"decode_opportunistic"`
  35. RecordDisplaced int `json:"record_displaced"`
  36. DecodeDisplaced int `json:"decode_displaced"`
  37. RecordDisplacedByHold int `json:"record_displaced_by_hold,omitempty"`
  38. DecodeDisplacedByHold int `json:"decode_displaced_by_hold,omitempty"`
  39. }
  40. type queuedDecision struct {
  41. ID int64
  42. SNRDb float64
  43. Hint string
  44. Class string
  45. WindowTag string
  46. WindowBias float64
  47. FirstSeen time.Time
  48. LastSeen time.Time
  49. }
  50. type queueSelection struct {
  51. selected map[int64]struct{}
  52. held map[int64]struct{}
  53. protected map[int64]struct{}
  54. displacedByHold map[int64]struct{}
  55. displaced map[int64]struct{}
  56. opportunistic map[int64]struct{}
  57. expired map[int64]struct{}
  58. scores map[int64]float64
  59. tiers map[int64]string
  60. families map[int64]string
  61. familyRanks map[int64]int
  62. tierFloors map[int64]string
  63. windowTags map[int64]string
  64. minScore float64
  65. maxScore float64
  66. cutoff float64
  67. }
  68. type decisionQueues struct {
  69. record map[int64]*queuedDecision
  70. decode map[int64]*queuedDecision
  71. recordHold map[int64]time.Time
  72. decodeHold map[int64]time.Time
  73. }
  74. func newDecisionQueues() *decisionQueues {
  75. return &decisionQueues{
  76. record: map[int64]*queuedDecision{},
  77. decode: map[int64]*queuedDecision{},
  78. recordHold: map[int64]time.Time{},
  79. decodeHold: map[int64]time.Time{},
  80. }
  81. }
  82. func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats {
  83. if dq == nil {
  84. return DecisionQueueStats{}
  85. }
  86. holdPolicy := HoldPolicyFromPolicy(policy)
  87. recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond
  88. decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond
  89. recSeen := map[int64]bool{}
  90. decSeen := map[int64]bool{}
  91. for i := range decisions {
  92. id := decisions[i].Candidate.ID
  93. if id == 0 {
  94. continue
  95. }
  96. if decisions[i].ShouldRecord {
  97. qd := dq.record[id]
  98. if qd == nil {
  99. qd = &queuedDecision{ID: id, FirstSeen: now}
  100. dq.record[id] = qd
  101. }
  102. qd.SNRDb = decisions[i].Candidate.SNRDb
  103. qd.Hint = decisions[i].Candidate.Hint
  104. qd.Class = decisions[i].Class
  105. qd.WindowTag = windowTagForDecision(decisions[i])
  106. qd.WindowBias = decisions[i].MonitorBias
  107. qd.LastSeen = now
  108. recSeen[id] = true
  109. }
  110. if decisions[i].ShouldAutoDecode {
  111. qd := dq.decode[id]
  112. if qd == nil {
  113. qd = &queuedDecision{ID: id, FirstSeen: now}
  114. dq.decode[id] = qd
  115. }
  116. qd.SNRDb = decisions[i].Candidate.SNRDb
  117. qd.Hint = decisions[i].Candidate.Hint
  118. qd.Class = decisions[i].Class
  119. qd.WindowTag = windowTagForDecision(decisions[i])
  120. qd.WindowBias = decisions[i].MonitorBias
  121. qd.LastSeen = now
  122. decSeen[id] = true
  123. }
  124. }
  125. for id := range dq.record {
  126. if !recSeen[id] {
  127. delete(dq.record, id)
  128. }
  129. }
  130. for id := range dq.decode {
  131. if !decSeen[id] {
  132. delete(dq.decode, id)
  133. }
  134. }
  135. recExpired := expireHold(dq.recordHold, now)
  136. decExpired := expireHold(dq.decodeHold, now)
  137. recordBudget := budgetQueueLimit(budget.Record)
  138. decodeBudget := budgetQueueLimit(budget.Decode)
  139. recSelected := selectQueued("record", dq.record, dq.recordHold, recordBudget, recordHold, now, policy, recExpired)
  140. decSelected := selectQueued("decode", dq.decode, dq.decodeHold, decodeBudget, decodeHold, now, policy, decExpired)
  141. recPressure := buildQueuePressure(budget.Record, len(dq.record), len(recSelected.selected), len(dq.recordHold))
  142. decPressure := buildQueuePressure(budget.Decode, len(dq.decode), len(decSelected.selected), len(dq.decodeHold))
  143. recPressureTag := pressureReasonTag(recPressure)
  144. decPressureTag := pressureReasonTag(decPressure)
  145. stats := DecisionQueueStats{
  146. RecordQueued: len(dq.record),
  147. DecodeQueued: len(dq.decode),
  148. RecordSelected: len(recSelected.selected),
  149. DecodeSelected: len(decSelected.selected),
  150. RecordActive: len(dq.recordHold),
  151. DecodeActive: len(dq.decodeHold),
  152. RecordOldestS: oldestAge(dq.record, now),
  153. DecodeOldestS: oldestAge(dq.decode, now),
  154. RecordBudget: recordBudget,
  155. DecodeBudget: decodeBudget,
  156. HoldMs: holdPolicy.BaseMs,
  157. DecisionHoldMs: holdPolicy.BaseMs,
  158. RecordHoldMs: holdPolicy.RecordMs,
  159. DecodeHoldMs: holdPolicy.DecodeMs,
  160. RecordHoldSelected: len(recSelected.held) - len(recSelected.displaced),
  161. DecodeHoldSelected: len(decSelected.held) - len(decSelected.displaced),
  162. RecordHoldProtected: len(recSelected.protected),
  163. DecodeHoldProtected: len(decSelected.protected),
  164. RecordHoldExpired: len(recExpired),
  165. DecodeHoldExpired: len(decExpired),
  166. RecordHoldDisplaced: len(recSelected.displaced),
  167. DecodeHoldDisplaced: len(decSelected.displaced),
  168. RecordOpportunistic: len(recSelected.opportunistic),
  169. DecodeOpportunistic: len(decSelected.opportunistic),
  170. RecordDisplaced: len(recSelected.displacedByHold),
  171. DecodeDisplaced: len(decSelected.displacedByHold),
  172. RecordDisplacedByHold: len(recSelected.displacedByHold),
  173. DecodeDisplacedByHold: len(decSelected.displacedByHold),
  174. }
  175. for i := range decisions {
  176. id := decisions[i].Candidate.ID
  177. if decisions[i].ShouldRecord {
  178. decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source, recPressureTag)
  179. if _, ok := recSelected.selected[id]; !ok {
  180. decisions[i].ShouldRecord = false
  181. extras := []string{recPressureTag, "pressure:budget", "budget:" + slugToken(budget.Record.Source)}
  182. if _, ok := recSelected.displaced[id]; ok {
  183. extras = []string{recPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Record.Source)}
  184. } else if _, ok := recSelected.displacedByHold[id]; ok {
  185. extras = []string{recPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Record.Source)}
  186. } else if _, ok := recSelected.expired[id]; ok {
  187. extras = append(extras, ReasonTagHoldExpired)
  188. }
  189. decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, extras...)
  190. stats.RecordDropped++
  191. }
  192. }
  193. if decisions[i].ShouldAutoDecode {
  194. decisions[i].DecodeAdmission = buildQueueAdmission("decode", id, decSelected, policy, holdPolicy, budget.Decode.Source, decPressureTag)
  195. if _, ok := decSelected.selected[id]; !ok {
  196. decisions[i].ShouldAutoDecode = false
  197. if decisions[i].Reason == "" {
  198. extras := []string{decPressureTag, "pressure:budget", "budget:" + slugToken(budget.Decode.Source)}
  199. if _, ok := decSelected.displaced[id]; ok {
  200. extras = []string{decPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Decode.Source)}
  201. } else if _, ok := decSelected.displacedByHold[id]; ok {
  202. extras = []string{decPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Decode.Source)}
  203. } else if _, ok := decSelected.expired[id]; ok {
  204. extras = append(extras, ReasonTagHoldExpired)
  205. }
  206. decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, extras...)
  207. }
  208. stats.DecodeDropped++
  209. }
  210. }
  211. }
  212. return stats
  213. }
  214. func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy, expired map[int64]struct{}) queueSelection {
  215. selection := queueSelection{
  216. selected: map[int64]struct{}{},
  217. held: map[int64]struct{}{},
  218. protected: map[int64]struct{}{},
  219. displacedByHold: map[int64]struct{}{},
  220. displaced: map[int64]struct{}{},
  221. opportunistic: map[int64]struct{}{},
  222. expired: map[int64]struct{}{},
  223. scores: map[int64]float64{},
  224. tiers: map[int64]string{},
  225. families: map[int64]string{},
  226. familyRanks: map[int64]int{},
  227. tierFloors: map[int64]string{},
  228. windowTags: map[int64]string{},
  229. }
  230. if len(queue) == 0 {
  231. return selection
  232. }
  233. for id := range expired {
  234. selection.expired[id] = struct{}{}
  235. }
  236. type scored struct {
  237. id int64
  238. score float64
  239. }
  240. scoredList := make([]scored, 0, len(queue))
  241. for id, qd := range queue {
  242. age := now.Sub(qd.FirstSeen).Seconds()
  243. boost := age / 2.0
  244. if boost > 5 {
  245. boost = 5
  246. }
  247. hint := qd.Hint
  248. if hint == "" {
  249. hint = qd.Class
  250. }
  251. policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName)
  252. family, familyRank := signalPriorityMatch(policy, qd.Hint, qd.Class)
  253. selection.families[id] = family
  254. selection.familyRanks[id] = familyRank
  255. selection.tierFloors[id] = signalPriorityTierFloor(familyRank)
  256. if qd.WindowTag != "" {
  257. selection.windowTags[id] = qd.WindowTag
  258. }
  259. score := qd.SNRDb + boost + policyBoost + qd.WindowBias
  260. selection.scores[id] = score
  261. if len(scoredList) == 0 || score < selection.minScore {
  262. selection.minScore = score
  263. }
  264. if len(scoredList) == 0 || score > selection.maxScore {
  265. selection.maxScore = score
  266. }
  267. scoredList = append(scoredList, scored{id: id, score: score})
  268. }
  269. sort.Slice(scoredList, func(i, j int) bool {
  270. return scoredList[i].score > scoredList[j].score
  271. })
  272. for id, score := range selection.scores {
  273. baseTier := PriorityTierFromRange(score, selection.minScore, selection.maxScore)
  274. selection.tiers[id] = applyTierFloor(baseTier, selection.tierFloors[id])
  275. }
  276. limit := max
  277. if limit <= 0 || limit > len(scoredList) {
  278. limit = len(scoredList)
  279. }
  280. if len(hold) > 0 && len(hold) > limit {
  281. limit = len(hold)
  282. if limit > len(scoredList) {
  283. limit = len(scoredList)
  284. }
  285. }
  286. for id := range hold {
  287. if _, ok := queue[id]; ok {
  288. selection.selected[id] = struct{}{}
  289. selection.held[id] = struct{}{}
  290. if isProtectedTier(selection.tiers[id]) {
  291. selection.protected[id] = struct{}{}
  292. }
  293. }
  294. }
  295. displaceable := buildDisplaceableHold(selection.held, selection.protected, selection.tiers, selection.scores, selection.familyRanks)
  296. for _, s := range scoredList {
  297. if _, ok := selection.selected[s.id]; ok {
  298. continue
  299. }
  300. if len(selection.selected) < limit {
  301. selection.selected[s.id] = struct{}{}
  302. continue
  303. }
  304. if len(displaceable) == 0 {
  305. continue
  306. }
  307. target := displaceable[0]
  308. if priorityTierRank(selection.tiers[s.id]) <= priorityTierRank(selection.tiers[target]) {
  309. continue
  310. }
  311. displaceable = displaceable[1:]
  312. delete(selection.selected, target)
  313. selection.displaced[target] = struct{}{}
  314. selection.selected[s.id] = struct{}{}
  315. selection.opportunistic[s.id] = struct{}{}
  316. }
  317. if holdDur > 0 {
  318. for id := range selection.displaced {
  319. delete(hold, id)
  320. }
  321. for id := range selection.selected {
  322. hold[id] = now.Add(holdDur)
  323. }
  324. }
  325. if len(selection.selected) > 0 {
  326. first := true
  327. for id := range selection.selected {
  328. score := selection.scores[id]
  329. if first || score < selection.cutoff {
  330. selection.cutoff = score
  331. first = false
  332. }
  333. }
  334. }
  335. if len(selection.selected) > 0 {
  336. for id := range selection.scores {
  337. if _, ok := selection.selected[id]; ok {
  338. continue
  339. }
  340. if _, ok := selection.displaced[id]; ok {
  341. continue
  342. }
  343. if selection.scores[id] >= selection.cutoff {
  344. selection.displacedByHold[id] = struct{}{}
  345. }
  346. }
  347. }
  348. return selection
  349. }
  350. func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{}, tiers map[int64]string, scores map[int64]float64, familyRanks map[int64]int) []int64 {
  351. type entry struct {
  352. id int64
  353. rank int
  354. familyOrder int
  355. score float64
  356. }
  357. candidates := make([]entry, 0, len(held))
  358. for id := range held {
  359. if _, ok := protected[id]; ok {
  360. continue
  361. }
  362. score := 0.0
  363. if scores != nil {
  364. score = scores[id]
  365. }
  366. familyRank := -1
  367. if familyRanks != nil {
  368. familyRank = familyRanks[id]
  369. }
  370. candidates = append(candidates, entry{
  371. id: id,
  372. rank: priorityTierRank(tiers[id]),
  373. familyOrder: familyDisplaceOrder(familyRank),
  374. score: score,
  375. })
  376. }
  377. if len(candidates) == 0 {
  378. return nil
  379. }
  380. sort.Slice(candidates, func(i, j int) bool {
  381. if candidates[i].rank == candidates[j].rank {
  382. if candidates[i].familyOrder != candidates[j].familyOrder {
  383. return candidates[i].familyOrder > candidates[j].familyOrder
  384. }
  385. return candidates[i].score < candidates[j].score
  386. }
  387. return candidates[i].rank < candidates[j].rank
  388. })
  389. out := make([]int64, 0, len(candidates))
  390. for _, c := range candidates {
  391. out = append(out, c.id)
  392. }
  393. return out
  394. }
  395. func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string, pressureTag string) *PriorityAdmission {
  396. score, ok := selection.scores[id]
  397. if !ok {
  398. return nil
  399. }
  400. windowTag := selection.windowTags[id]
  401. admission := &PriorityAdmission{
  402. Basis: queueName,
  403. Score: score,
  404. Cutoff: selection.cutoff,
  405. Tier: selection.tiers[id],
  406. }
  407. admission.TierFloor = selection.tierFloors[id]
  408. admission.Family = selection.families[id]
  409. admission.FamilyRank = familyRankForOutput(selection.familyRanks[id])
  410. if _, ok := selection.selected[id]; ok {
  411. if _, held := selection.held[id]; held {
  412. admission.Class = AdmissionClassHold
  413. extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)}
  414. if windowTag != "" {
  415. extras = append(extras, windowTag)
  416. }
  417. if _, ok := selection.protected[id]; ok {
  418. extras = append(extras, ReasonTagHoldProtected)
  419. }
  420. admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, extras...)
  421. } else {
  422. admission.Class = AdmissionClassAdmit
  423. extras := []string{pressureTag, "budget:" + slugToken(budgetSource)}
  424. if windowTag != "" {
  425. extras = append(extras, windowTag)
  426. }
  427. if _, ok := selection.opportunistic[id]; ok {
  428. extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced)
  429. }
  430. admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, extras...)
  431. }
  432. return admission
  433. }
  434. if _, ok := selection.displaced[id]; ok {
  435. admission.Class = AdmissionClassDisplace
  436. extras := []string{pressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budgetSource)}
  437. if windowTag != "" {
  438. extras = append(extras, windowTag)
  439. }
  440. admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, extras...)
  441. return admission
  442. }
  443. if _, ok := selection.displacedByHold[id]; ok {
  444. admission.Class = AdmissionClassDisplace
  445. extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)}
  446. if windowTag != "" {
  447. extras = append(extras, windowTag)
  448. }
  449. admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, extras...)
  450. return admission
  451. }
  452. admission.Class = AdmissionClassDefer
  453. extras := []string{pressureTag, "pressure:budget", "budget:" + slugToken(budgetSource)}
  454. if windowTag != "" {
  455. extras = append(extras, windowTag)
  456. }
  457. if _, ok := selection.expired[id]; ok {
  458. extras = append(extras, ReasonTagHoldExpired)
  459. }
  460. admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, extras...)
  461. return admission
  462. }
  463. func windowTagForDecision(decision SignalDecision) string {
  464. if decision.MonitorDetail == nil {
  465. return ""
  466. }
  467. label := strings.TrimSpace(decision.MonitorDetail.Label)
  468. if label == "" {
  469. label = "index-" + strconv.Itoa(decision.MonitorDetail.Index)
  470. }
  471. label = slugToken(label)
  472. if label == "" {
  473. return ""
  474. }
  475. return "window:" + label
  476. }
  477. func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 {
  478. oldest := 0.0
  479. first := true
  480. for _, qd := range queue {
  481. age := now.Sub(qd.FirstSeen).Seconds()
  482. if first || age > oldest {
  483. oldest = age
  484. first = false
  485. }
  486. }
  487. if first {
  488. return 0
  489. }
  490. return oldest
  491. }