Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

462 lines
16KB

  1. package pipeline
  2. import (
  3. "sort"
  4. "time"
  5. )
  // DecisionQueueStats is the per-call summary returned by decisionQueues.Apply.
  // Every counter exists once for the record queue and once for the decode queue.
  type DecisionQueueStats struct {
  	// Entries present in each queue after entries not seen this call were pruned.
  	RecordQueued int `json:"record_queued"`
  	DecodeQueued int `json:"decode_queued"`
  	// Entries admitted by the selection pass.
  	RecordSelected int `json:"record_selected"`
  	DecodeSelected int `json:"decode_selected"`
  	// Size of each hold map once selection has finished.
  	RecordActive int `json:"record_active"`
  	DecodeActive int `json:"decode_active"`
  	// Age in seconds, measured from FirstSeen, of the oldest queued entry.
  	RecordOldestS float64 `json:"record_oldest_sec"`
  	DecodeOldestS float64 `json:"decode_oldest_sec"`
  	// Queue limits derived from the budget model.
  	RecordBudget int `json:"record_budget"`
  	DecodeBudget int `json:"decode_budget"`
  	// HoldMs and DecisionHoldMs both carry the hold policy's base duration;
  	// RecordHoldMs and DecodeHoldMs carry the per-queue durations.
  	HoldMs         int `json:"hold_ms"`
  	DecisionHoldMs int `json:"decision_hold_ms,omitempty"`
  	RecordHoldMs   int `json:"record_hold_ms"`
  	DecodeHoldMs   int `json:"decode_hold_ms"`
  	// Decisions whose ShouldRecord / ShouldAutoDecode flag was cleared because
  	// they were not selected.
  	RecordDropped int `json:"record_dropped"`
  	DecodeDropped int `json:"decode_dropped"`
  	// Held entries that stayed selected (held minus displaced).
  	RecordHoldSelected int `json:"record_hold_selected"`
  	DecodeHoldSelected int `json:"decode_hold_selected"`
  	// Held entries whose tier is protected from displacement.
  	RecordHoldProtected int `json:"record_hold_protected"`
  	DecodeHoldProtected int `json:"decode_hold_protected"`
  	// Number of IDs reported as expired by expireHold for this call.
  	RecordHoldExpired int `json:"record_hold_expired"`
  	DecodeHoldExpired int `json:"decode_hold_expired"`
  	// Held entries that lost their slot to a higher-tier candidate.
  	RecordHoldDisplaced int `json:"record_hold_displaced"`
  	DecodeHoldDisplaced int `json:"decode_hold_displaced"`
  	// Entries admitted by displacing a held entry.
  	RecordOpportunistic int `json:"record_opportunistic"`
  	DecodeOpportunistic int `json:"decode_opportunistic"`
  	// Unselected entries scoring at or above the selection cutoff. The
  	// *Displaced and *DisplacedByHold pairs are filled with the same count.
  	RecordDisplaced       int `json:"record_displaced"`
  	DecodeDisplaced       int `json:"decode_displaced"`
  	RecordDisplacedByHold int `json:"record_displaced_by_hold,omitempty"`
  	DecodeDisplacedByHold int `json:"decode_displaced_by_hold,omitempty"`
  }
  // queuedDecision is one candidate waiting in a record or decode queue.
  type queuedDecision struct {
  	ID    int64   // candidate ID; also the key in the owning queue map
  	SNRDb float64 // latest SNR copied from the candidate; base term of the selection score
  	Hint  string  // latest candidate hint; selection falls back to Class when empty
  	Class string  // latest class copied from the decision
  	// FirstSeen is set once when the entry is created and drives the age
  	// boost; LastSeen is refreshed every time the candidate is seen again.
  	FirstSeen time.Time
  	LastSeen  time.Time
  }
  // queueSelection is the outcome of one selectQueued pass over a single queue.
  // All maps are keyed by candidate ID.
  type queueSelection struct {
  	selected        map[int64]struct{} // entries admitted this pass
  	held            map[int64]struct{} // queued entries that already had a hold when the pass started
  	protected       map[int64]struct{} // held entries whose tier shields them from displacement
  	displacedByHold map[int64]struct{} // unselected, non-displaced entries scoring at or above cutoff
  	displaced       map[int64]struct{} // held entries pushed out by a higher-tier candidate
  	opportunistic   map[int64]struct{} // entries admitted by displacing a held entry
  	expired         map[int64]struct{} // copy of the expired-hold set handed to selectQueued

  	scores      map[int64]float64 // selection score per entry
  	tiers       map[int64]string  // priority tier after the tier floor is applied
  	families    map[int64]string  // family reported by signalPriorityMatch
  	familyRanks map[int64]int     // family rank reported by signalPriorityMatch
  	tierFloors  map[int64]string  // tier floor derived from the family rank

  	minScore float64 // lowest score among all queued entries
  	maxScore float64 // highest score among all queued entries
  	cutoff   float64 // lowest score among the selected entries
  }
  63. type decisionQueues struct {
  64. record map[int64]*queuedDecision
  65. decode map[int64]*queuedDecision
  66. recordHold map[int64]time.Time
  67. decodeHold map[int64]time.Time
  68. }
  69. func newDecisionQueues() *decisionQueues {
  70. return &decisionQueues{
  71. record: map[int64]*queuedDecision{},
  72. decode: map[int64]*queuedDecision{},
  73. recordHold: map[int64]time.Time{},
  74. decodeHold: map[int64]time.Time{},
  75. }
  76. }
  77. func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats {
  78. if dq == nil {
  79. return DecisionQueueStats{}
  80. }
  81. holdPolicy := HoldPolicyFromPolicy(policy)
  82. recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond
  83. decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond
  84. recSeen := map[int64]bool{}
  85. decSeen := map[int64]bool{}
  86. for i := range decisions {
  87. id := decisions[i].Candidate.ID
  88. if id == 0 {
  89. continue
  90. }
  91. if decisions[i].ShouldRecord {
  92. qd := dq.record[id]
  93. if qd == nil {
  94. qd = &queuedDecision{ID: id, FirstSeen: now}
  95. dq.record[id] = qd
  96. }
  97. qd.SNRDb = decisions[i].Candidate.SNRDb
  98. qd.Hint = decisions[i].Candidate.Hint
  99. qd.Class = decisions[i].Class
  100. qd.LastSeen = now
  101. recSeen[id] = true
  102. }
  103. if decisions[i].ShouldAutoDecode {
  104. qd := dq.decode[id]
  105. if qd == nil {
  106. qd = &queuedDecision{ID: id, FirstSeen: now}
  107. dq.decode[id] = qd
  108. }
  109. qd.SNRDb = decisions[i].Candidate.SNRDb
  110. qd.Hint = decisions[i].Candidate.Hint
  111. qd.Class = decisions[i].Class
  112. qd.LastSeen = now
  113. decSeen[id] = true
  114. }
  115. }
  116. for id := range dq.record {
  117. if !recSeen[id] {
  118. delete(dq.record, id)
  119. }
  120. }
  121. for id := range dq.decode {
  122. if !decSeen[id] {
  123. delete(dq.decode, id)
  124. }
  125. }
  126. recExpired := expireHold(dq.recordHold, now)
  127. decExpired := expireHold(dq.decodeHold, now)
  128. recordBudget := budgetQueueLimit(budget.Record)
  129. decodeBudget := budgetQueueLimit(budget.Decode)
  130. recSelected := selectQueued("record", dq.record, dq.recordHold, recordBudget, recordHold, now, policy, recExpired)
  131. decSelected := selectQueued("decode", dq.decode, dq.decodeHold, decodeBudget, decodeHold, now, policy, decExpired)
  132. recPressure := buildQueuePressure(budget.Record, len(dq.record), len(recSelected.selected), len(dq.recordHold))
  133. decPressure := buildQueuePressure(budget.Decode, len(dq.decode), len(decSelected.selected), len(dq.decodeHold))
  134. recPressureTag := pressureReasonTag(recPressure)
  135. decPressureTag := pressureReasonTag(decPressure)
  136. stats := DecisionQueueStats{
  137. RecordQueued: len(dq.record),
  138. DecodeQueued: len(dq.decode),
  139. RecordSelected: len(recSelected.selected),
  140. DecodeSelected: len(decSelected.selected),
  141. RecordActive: len(dq.recordHold),
  142. DecodeActive: len(dq.decodeHold),
  143. RecordOldestS: oldestAge(dq.record, now),
  144. DecodeOldestS: oldestAge(dq.decode, now),
  145. RecordBudget: recordBudget,
  146. DecodeBudget: decodeBudget,
  147. HoldMs: holdPolicy.BaseMs,
  148. DecisionHoldMs: holdPolicy.BaseMs,
  149. RecordHoldMs: holdPolicy.RecordMs,
  150. DecodeHoldMs: holdPolicy.DecodeMs,
  151. RecordHoldSelected: len(recSelected.held) - len(recSelected.displaced),
  152. DecodeHoldSelected: len(decSelected.held) - len(decSelected.displaced),
  153. RecordHoldProtected: len(recSelected.protected),
  154. DecodeHoldProtected: len(decSelected.protected),
  155. RecordHoldExpired: len(recExpired),
  156. DecodeHoldExpired: len(decExpired),
  157. RecordHoldDisplaced: len(recSelected.displaced),
  158. DecodeHoldDisplaced: len(decSelected.displaced),
  159. RecordOpportunistic: len(recSelected.opportunistic),
  160. DecodeOpportunistic: len(decSelected.opportunistic),
  161. RecordDisplaced: len(recSelected.displacedByHold),
  162. DecodeDisplaced: len(decSelected.displacedByHold),
  163. RecordDisplacedByHold: len(recSelected.displacedByHold),
  164. DecodeDisplacedByHold: len(decSelected.displacedByHold),
  165. }
  166. for i := range decisions {
  167. id := decisions[i].Candidate.ID
  168. if decisions[i].ShouldRecord {
  169. decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source, recPressureTag)
  170. if _, ok := recSelected.selected[id]; !ok {
  171. decisions[i].ShouldRecord = false
  172. extras := []string{recPressureTag, "pressure:budget", "budget:" + slugToken(budget.Record.Source)}
  173. if _, ok := recSelected.displaced[id]; ok {
  174. extras = []string{recPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Record.Source)}
  175. } else if _, ok := recSelected.displacedByHold[id]; ok {
  176. extras = []string{recPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Record.Source)}
  177. } else if _, ok := recSelected.expired[id]; ok {
  178. extras = append(extras, ReasonTagHoldExpired)
  179. }
  180. decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, extras...)
  181. stats.RecordDropped++
  182. }
  183. }
  184. if decisions[i].ShouldAutoDecode {
  185. decisions[i].DecodeAdmission = buildQueueAdmission("decode", id, decSelected, policy, holdPolicy, budget.Decode.Source, decPressureTag)
  186. if _, ok := decSelected.selected[id]; !ok {
  187. decisions[i].ShouldAutoDecode = false
  188. if decisions[i].Reason == "" {
  189. extras := []string{decPressureTag, "pressure:budget", "budget:" + slugToken(budget.Decode.Source)}
  190. if _, ok := decSelected.displaced[id]; ok {
  191. extras = []string{decPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Decode.Source)}
  192. } else if _, ok := decSelected.displacedByHold[id]; ok {
  193. extras = []string{decPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Decode.Source)}
  194. } else if _, ok := decSelected.expired[id]; ok {
  195. extras = append(extras, ReasonTagHoldExpired)
  196. }
  197. decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, extras...)
  198. }
  199. stats.DecodeDropped++
  200. }
  201. }
  202. }
  203. return stats
  204. }
  205. func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy, expired map[int64]struct{}) queueSelection {
  206. selection := queueSelection{
  207. selected: map[int64]struct{}{},
  208. held: map[int64]struct{}{},
  209. protected: map[int64]struct{}{},
  210. displacedByHold: map[int64]struct{}{},
  211. displaced: map[int64]struct{}{},
  212. opportunistic: map[int64]struct{}{},
  213. expired: map[int64]struct{}{},
  214. scores: map[int64]float64{},
  215. tiers: map[int64]string{},
  216. families: map[int64]string{},
  217. familyRanks: map[int64]int{},
  218. tierFloors: map[int64]string{},
  219. }
  220. if len(queue) == 0 {
  221. return selection
  222. }
  223. for id := range expired {
  224. selection.expired[id] = struct{}{}
  225. }
  226. type scored struct {
  227. id int64
  228. score float64
  229. }
  230. scoredList := make([]scored, 0, len(queue))
  231. for id, qd := range queue {
  232. age := now.Sub(qd.FirstSeen).Seconds()
  233. boost := age / 2.0
  234. if boost > 5 {
  235. boost = 5
  236. }
  237. hint := qd.Hint
  238. if hint == "" {
  239. hint = qd.Class
  240. }
  241. policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName)
  242. family, familyRank := signalPriorityMatch(policy, qd.Hint, qd.Class)
  243. selection.families[id] = family
  244. selection.familyRanks[id] = familyRank
  245. selection.tierFloors[id] = signalPriorityTierFloor(familyRank)
  246. score := qd.SNRDb + boost + policyBoost
  247. selection.scores[id] = score
  248. if len(scoredList) == 0 || score < selection.minScore {
  249. selection.minScore = score
  250. }
  251. if len(scoredList) == 0 || score > selection.maxScore {
  252. selection.maxScore = score
  253. }
  254. scoredList = append(scoredList, scored{id: id, score: score})
  255. }
  256. sort.Slice(scoredList, func(i, j int) bool {
  257. return scoredList[i].score > scoredList[j].score
  258. })
  259. for id, score := range selection.scores {
  260. baseTier := PriorityTierFromRange(score, selection.minScore, selection.maxScore)
  261. selection.tiers[id] = applyTierFloor(baseTier, selection.tierFloors[id])
  262. }
  263. limit := max
  264. if limit <= 0 || limit > len(scoredList) {
  265. limit = len(scoredList)
  266. }
  267. if len(hold) > 0 && len(hold) > limit {
  268. limit = len(hold)
  269. if limit > len(scoredList) {
  270. limit = len(scoredList)
  271. }
  272. }
  273. for id := range hold {
  274. if _, ok := queue[id]; ok {
  275. selection.selected[id] = struct{}{}
  276. selection.held[id] = struct{}{}
  277. if isProtectedTier(selection.tiers[id]) {
  278. selection.protected[id] = struct{}{}
  279. }
  280. }
  281. }
  282. displaceable := buildDisplaceableHold(selection.held, selection.protected, selection.tiers, selection.scores, selection.familyRanks)
  283. for _, s := range scoredList {
  284. if _, ok := selection.selected[s.id]; ok {
  285. continue
  286. }
  287. if len(selection.selected) < limit {
  288. selection.selected[s.id] = struct{}{}
  289. continue
  290. }
  291. if len(displaceable) == 0 {
  292. continue
  293. }
  294. target := displaceable[0]
  295. if priorityTierRank(selection.tiers[s.id]) <= priorityTierRank(selection.tiers[target]) {
  296. continue
  297. }
  298. displaceable = displaceable[1:]
  299. delete(selection.selected, target)
  300. selection.displaced[target] = struct{}{}
  301. selection.selected[s.id] = struct{}{}
  302. selection.opportunistic[s.id] = struct{}{}
  303. }
  304. if holdDur > 0 {
  305. for id := range selection.displaced {
  306. delete(hold, id)
  307. }
  308. for id := range selection.selected {
  309. hold[id] = now.Add(holdDur)
  310. }
  311. }
  312. if len(selection.selected) > 0 {
  313. first := true
  314. for id := range selection.selected {
  315. score := selection.scores[id]
  316. if first || score < selection.cutoff {
  317. selection.cutoff = score
  318. first = false
  319. }
  320. }
  321. }
  322. if len(selection.selected) > 0 {
  323. for id := range selection.scores {
  324. if _, ok := selection.selected[id]; ok {
  325. continue
  326. }
  327. if _, ok := selection.displaced[id]; ok {
  328. continue
  329. }
  330. if selection.scores[id] >= selection.cutoff {
  331. selection.displacedByHold[id] = struct{}{}
  332. }
  333. }
  334. }
  335. return selection
  336. }
  337. func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{}, tiers map[int64]string, scores map[int64]float64, familyRanks map[int64]int) []int64 {
  338. type entry struct {
  339. id int64
  340. rank int
  341. familyOrder int
  342. score float64
  343. }
  344. candidates := make([]entry, 0, len(held))
  345. for id := range held {
  346. if _, ok := protected[id]; ok {
  347. continue
  348. }
  349. score := 0.0
  350. if scores != nil {
  351. score = scores[id]
  352. }
  353. familyRank := -1
  354. if familyRanks != nil {
  355. familyRank = familyRanks[id]
  356. }
  357. candidates = append(candidates, entry{
  358. id: id,
  359. rank: priorityTierRank(tiers[id]),
  360. familyOrder: familyDisplaceOrder(familyRank),
  361. score: score,
  362. })
  363. }
  364. if len(candidates) == 0 {
  365. return nil
  366. }
  367. sort.Slice(candidates, func(i, j int) bool {
  368. if candidates[i].rank == candidates[j].rank {
  369. if candidates[i].familyOrder != candidates[j].familyOrder {
  370. return candidates[i].familyOrder > candidates[j].familyOrder
  371. }
  372. return candidates[i].score < candidates[j].score
  373. }
  374. return candidates[i].rank < candidates[j].rank
  375. })
  376. out := make([]int64, 0, len(candidates))
  377. for _, c := range candidates {
  378. out = append(out, c.id)
  379. }
  380. return out
  381. }
  382. func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string, pressureTag string) *PriorityAdmission {
  383. score, ok := selection.scores[id]
  384. if !ok {
  385. return nil
  386. }
  387. admission := &PriorityAdmission{
  388. Basis: queueName,
  389. Score: score,
  390. Cutoff: selection.cutoff,
  391. Tier: selection.tiers[id],
  392. }
  393. admission.TierFloor = selection.tierFloors[id]
  394. admission.Family = selection.families[id]
  395. admission.FamilyRank = familyRankForOutput(selection.familyRanks[id])
  396. if _, ok := selection.selected[id]; ok {
  397. if _, held := selection.held[id]; held {
  398. admission.Class = AdmissionClassHold
  399. extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)}
  400. if _, ok := selection.protected[id]; ok {
  401. extras = append(extras, ReasonTagHoldProtected)
  402. }
  403. admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, extras...)
  404. } else {
  405. admission.Class = AdmissionClassAdmit
  406. extras := []string{pressureTag, "budget:" + slugToken(budgetSource)}
  407. if _, ok := selection.opportunistic[id]; ok {
  408. extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced)
  409. }
  410. admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, extras...)
  411. }
  412. return admission
  413. }
  414. if _, ok := selection.displaced[id]; ok {
  415. admission.Class = AdmissionClassDisplace
  416. admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:"+slugToken(budgetSource))
  417. return admission
  418. }
  419. if _, ok := selection.displacedByHold[id]; ok {
  420. admission.Class = AdmissionClassDisplace
  421. admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:"+slugToken(budgetSource))
  422. return admission
  423. }
  424. admission.Class = AdmissionClassDefer
  425. extras := []string{pressureTag, "pressure:budget", "budget:" + slugToken(budgetSource)}
  426. if _, ok := selection.expired[id]; ok {
  427. extras = append(extras, ReasonTagHoldExpired)
  428. }
  429. admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, extras...)
  430. return admission
  431. }
  432. func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 {
  433. oldest := 0.0
  434. first := true
  435. for _, qd := range queue {
  436. age := now.Sub(qd.FirstSeen).Seconds()
  437. if first || age > oldest {
  438. oldest = age
  439. first = false
  440. }
  441. }
  442. if first {
  443. return 0
  444. }
  445. return oldest
  446. }