Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

564 lines
19KB

  1. package pipeline
  2. import (
  3. "sort"
  4. "strconv"
  5. "strings"
  6. "time"
  7. )
  8. type DecisionQueueStats struct {
  9. RecordQueued int `json:"record_queued"`
  10. DecodeQueued int `json:"decode_queued"`
  11. RecordSelected int `json:"record_selected"`
  12. DecodeSelected int `json:"decode_selected"`
  13. RecordActive int `json:"record_active"`
  14. DecodeActive int `json:"decode_active"`
  15. RecordOldestS float64 `json:"record_oldest_sec"`
  16. DecodeOldestS float64 `json:"decode_oldest_sec"`
  17. RecordBudget int `json:"record_budget"`
  18. DecodeBudget int `json:"decode_budget"`
  19. HoldMs int `json:"hold_ms"`
  20. DecisionHoldMs int `json:"decision_hold_ms,omitempty"`
  21. RecordHoldMs int `json:"record_hold_ms"`
  22. DecodeHoldMs int `json:"decode_hold_ms"`
  23. RecordDropped int `json:"record_dropped"`
  24. DecodeDropped int `json:"decode_dropped"`
  25. RecordHoldSelected int `json:"record_hold_selected"`
  26. DecodeHoldSelected int `json:"decode_hold_selected"`
  27. RecordHoldProtected int `json:"record_hold_protected"`
  28. DecodeHoldProtected int `json:"decode_hold_protected"`
  29. RecordHoldExpired int `json:"record_hold_expired"`
  30. DecodeHoldExpired int `json:"decode_hold_expired"`
  31. RecordHoldDisplaced int `json:"record_hold_displaced"`
  32. DecodeHoldDisplaced int `json:"decode_hold_displaced"`
  33. RecordOpportunistic int `json:"record_opportunistic"`
  34. DecodeOpportunistic int `json:"decode_opportunistic"`
  35. RecordDisplaced int `json:"record_displaced"`
  36. DecodeDisplaced int `json:"decode_displaced"`
  37. RecordDisplacedByHold int `json:"record_displaced_by_hold,omitempty"`
  38. DecodeDisplacedByHold int `json:"decode_displaced_by_hold,omitempty"`
  39. }
  40. type queuedDecision struct {
  41. ID int64
  42. SNRDb float64
  43. Hint string
  44. Class string
  45. WindowTag string
  46. WindowZone string
  47. WindowBias float64
  48. FirstSeen time.Time
  49. LastSeen time.Time
  50. }
  51. type queueSelection struct {
  52. selected map[int64]struct{}
  53. held map[int64]struct{}
  54. protected map[int64]struct{}
  55. displacedByHold map[int64]struct{}
  56. displaced map[int64]struct{}
  57. opportunistic map[int64]struct{}
  58. expired map[int64]struct{}
  59. scores map[int64]float64
  60. tiers map[int64]string
  61. families map[int64]string
  62. familyRanks map[int64]int
  63. tierFloors map[int64]string
  64. windowTags map[int64]string
  65. windowZones map[int64]string
  66. minScore float64
  67. maxScore float64
  68. cutoff float64
  69. }
  70. type decisionQueues struct {
  71. record map[int64]*queuedDecision
  72. decode map[int64]*queuedDecision
  73. recordHold map[int64]time.Time
  74. decodeHold map[int64]time.Time
  75. }
  76. func newDecisionQueues() *decisionQueues {
  77. return &decisionQueues{
  78. record: map[int64]*queuedDecision{},
  79. decode: map[int64]*queuedDecision{},
  80. recordHold: map[int64]time.Time{},
  81. decodeHold: map[int64]time.Time{},
  82. }
  83. }
  84. func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats {
  85. if dq == nil {
  86. return DecisionQueueStats{}
  87. }
  88. holdPolicy := HoldPolicyFromPolicy(policy)
  89. recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond
  90. decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond
  91. recSeen := map[int64]bool{}
  92. decSeen := map[int64]bool{}
  93. for i := range decisions {
  94. id := decisions[i].Candidate.ID
  95. if id == 0 {
  96. continue
  97. }
  98. if decisions[i].ShouldRecord {
  99. qd := dq.record[id]
  100. if qd == nil {
  101. qd = &queuedDecision{ID: id, FirstSeen: now}
  102. dq.record[id] = qd
  103. }
  104. qd.SNRDb = decisions[i].Candidate.SNRDb
  105. qd.Hint = decisions[i].Candidate.Hint
  106. qd.Class = decisions[i].Class
  107. qd.WindowTag = windowTagForDecision(decisions[i], "record")
  108. qd.WindowZone = windowZoneForDecision(decisions[i], "record")
  109. qd.WindowBias = decisions[i].MonitorBias + decisions[i].RecordBias
  110. qd.LastSeen = now
  111. recSeen[id] = true
  112. }
  113. if decisions[i].ShouldAutoDecode {
  114. qd := dq.decode[id]
  115. if qd == nil {
  116. qd = &queuedDecision{ID: id, FirstSeen: now}
  117. dq.decode[id] = qd
  118. }
  119. qd.SNRDb = decisions[i].Candidate.SNRDb
  120. qd.Hint = decisions[i].Candidate.Hint
  121. qd.Class = decisions[i].Class
  122. qd.WindowTag = windowTagForDecision(decisions[i], "decode")
  123. qd.WindowZone = windowZoneForDecision(decisions[i], "decode")
  124. qd.WindowBias = decisions[i].MonitorBias + decisions[i].DecodeBias
  125. qd.LastSeen = now
  126. decSeen[id] = true
  127. }
  128. }
  129. for id := range dq.record {
  130. if !recSeen[id] {
  131. delete(dq.record, id)
  132. }
  133. }
  134. for id := range dq.decode {
  135. if !decSeen[id] {
  136. delete(dq.decode, id)
  137. }
  138. }
  139. recExpired := expireHold(dq.recordHold, now)
  140. decExpired := expireHold(dq.decodeHold, now)
  141. recordBudget := budgetQueueLimit(budget.Record)
  142. decodeBudget := budgetQueueLimit(budget.Decode)
  143. recSelected := selectQueued("record", dq.record, dq.recordHold, recordBudget, recordHold, now, policy, recExpired)
  144. decSelected := selectQueued("decode", dq.decode, dq.decodeHold, decodeBudget, decodeHold, now, policy, decExpired)
  145. recPressure := buildQueuePressure(budget.Record, len(dq.record), len(recSelected.selected), len(dq.recordHold))
  146. decPressure := buildQueuePressure(budget.Decode, len(dq.decode), len(decSelected.selected), len(dq.decodeHold))
  147. recPressureTag := pressureReasonTag(recPressure)
  148. decPressureTag := pressureReasonTag(decPressure)
  149. stats := DecisionQueueStats{
  150. RecordQueued: len(dq.record),
  151. DecodeQueued: len(dq.decode),
  152. RecordSelected: len(recSelected.selected),
  153. DecodeSelected: len(decSelected.selected),
  154. RecordActive: len(dq.recordHold),
  155. DecodeActive: len(dq.decodeHold),
  156. RecordOldestS: oldestAge(dq.record, now),
  157. DecodeOldestS: oldestAge(dq.decode, now),
  158. RecordBudget: recordBudget,
  159. DecodeBudget: decodeBudget,
  160. HoldMs: holdPolicy.BaseMs,
  161. DecisionHoldMs: holdPolicy.BaseMs,
  162. RecordHoldMs: holdPolicy.RecordMs,
  163. DecodeHoldMs: holdPolicy.DecodeMs,
  164. RecordHoldSelected: len(recSelected.held) - len(recSelected.displaced),
  165. DecodeHoldSelected: len(decSelected.held) - len(decSelected.displaced),
  166. RecordHoldProtected: len(recSelected.protected),
  167. DecodeHoldProtected: len(decSelected.protected),
  168. RecordHoldExpired: len(recExpired),
  169. DecodeHoldExpired: len(decExpired),
  170. RecordHoldDisplaced: len(recSelected.displaced),
  171. DecodeHoldDisplaced: len(decSelected.displaced),
  172. RecordOpportunistic: len(recSelected.opportunistic),
  173. DecodeOpportunistic: len(decSelected.opportunistic),
  174. RecordDisplaced: len(recSelected.displacedByHold),
  175. DecodeDisplaced: len(decSelected.displacedByHold),
  176. RecordDisplacedByHold: len(recSelected.displacedByHold),
  177. DecodeDisplacedByHold: len(decSelected.displacedByHold),
  178. }
  179. for i := range decisions {
  180. id := decisions[i].Candidate.ID
  181. if decisions[i].ShouldRecord {
  182. decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source, recPressureTag)
  183. if _, ok := recSelected.selected[id]; !ok {
  184. decisions[i].ShouldRecord = false
  185. extras := []string{recPressureTag, "pressure:budget", "budget:" + slugToken(budget.Record.Source)}
  186. if _, ok := recSelected.displaced[id]; ok {
  187. extras = []string{recPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Record.Source)}
  188. } else if _, ok := recSelected.displacedByHold[id]; ok {
  189. extras = []string{recPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Record.Source)}
  190. } else if _, ok := recSelected.expired[id]; ok {
  191. extras = append(extras, ReasonTagHoldExpired)
  192. }
  193. decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, extras...)
  194. stats.RecordDropped++
  195. }
  196. }
  197. if decisions[i].ShouldAutoDecode {
  198. decisions[i].DecodeAdmission = buildQueueAdmission("decode", id, decSelected, policy, holdPolicy, budget.Decode.Source, decPressureTag)
  199. if _, ok := decSelected.selected[id]; !ok {
  200. decisions[i].ShouldAutoDecode = false
  201. if decisions[i].Reason == "" {
  202. extras := []string{decPressureTag, "pressure:budget", "budget:" + slugToken(budget.Decode.Source)}
  203. if _, ok := decSelected.displaced[id]; ok {
  204. extras = []string{decPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Decode.Source)}
  205. } else if _, ok := decSelected.displacedByHold[id]; ok {
  206. extras = []string{decPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Decode.Source)}
  207. } else if _, ok := decSelected.expired[id]; ok {
  208. extras = append(extras, ReasonTagHoldExpired)
  209. }
  210. decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, extras...)
  211. }
  212. stats.DecodeDropped++
  213. }
  214. }
  215. }
  216. return stats
  217. }
  218. func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy, expired map[int64]struct{}) queueSelection {
  219. selection := queueSelection{
  220. selected: map[int64]struct{}{},
  221. held: map[int64]struct{}{},
  222. protected: map[int64]struct{}{},
  223. displacedByHold: map[int64]struct{}{},
  224. displaced: map[int64]struct{}{},
  225. opportunistic: map[int64]struct{}{},
  226. expired: map[int64]struct{}{},
  227. scores: map[int64]float64{},
  228. tiers: map[int64]string{},
  229. families: map[int64]string{},
  230. familyRanks: map[int64]int{},
  231. tierFloors: map[int64]string{},
  232. windowTags: map[int64]string{},
  233. windowZones: map[int64]string{},
  234. }
  235. if len(queue) == 0 {
  236. return selection
  237. }
  238. for id := range expired {
  239. selection.expired[id] = struct{}{}
  240. }
  241. type scored struct {
  242. id int64
  243. score float64
  244. }
  245. scoredList := make([]scored, 0, len(queue))
  246. for id, qd := range queue {
  247. age := now.Sub(qd.FirstSeen).Seconds()
  248. boost := age / 2.0
  249. if boost > 5 {
  250. boost = 5
  251. }
  252. hint := qd.Hint
  253. if hint == "" {
  254. hint = qd.Class
  255. }
  256. policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName)
  257. family, familyRank := signalPriorityMatch(policy, qd.Hint, qd.Class)
  258. selection.families[id] = family
  259. selection.familyRanks[id] = familyRank
  260. selection.tierFloors[id] = signalPriorityTierFloor(familyRank)
  261. if qd.WindowTag != "" {
  262. selection.windowTags[id] = qd.WindowTag
  263. }
  264. if qd.WindowZone != "" {
  265. selection.windowZones[id] = qd.WindowZone
  266. }
  267. score := qd.SNRDb + boost + policyBoost + qd.WindowBias
  268. selection.scores[id] = score
  269. if len(scoredList) == 0 || score < selection.minScore {
  270. selection.minScore = score
  271. }
  272. if len(scoredList) == 0 || score > selection.maxScore {
  273. selection.maxScore = score
  274. }
  275. scoredList = append(scoredList, scored{id: id, score: score})
  276. }
  277. sort.Slice(scoredList, func(i, j int) bool {
  278. return scoredList[i].score > scoredList[j].score
  279. })
  280. for id, score := range selection.scores {
  281. baseTier := PriorityTierFromRange(score, selection.minScore, selection.maxScore)
  282. selection.tiers[id] = applyTierFloor(baseTier, selection.tierFloors[id])
  283. }
  284. limit := max
  285. if limit <= 0 || limit > len(scoredList) {
  286. limit = len(scoredList)
  287. }
  288. if len(hold) > 0 && len(hold) > limit {
  289. limit = len(hold)
  290. if limit > len(scoredList) {
  291. limit = len(scoredList)
  292. }
  293. }
  294. for id := range hold {
  295. if _, ok := queue[id]; ok {
  296. selection.selected[id] = struct{}{}
  297. selection.held[id] = struct{}{}
  298. if isProtectedTier(selection.tiers[id]) {
  299. selection.protected[id] = struct{}{}
  300. }
  301. }
  302. }
  303. displaceable := buildDisplaceableHold(selection.held, selection.protected, selection.tiers, selection.scores, selection.familyRanks)
  304. for _, s := range scoredList {
  305. if _, ok := selection.selected[s.id]; ok {
  306. continue
  307. }
  308. if len(selection.selected) < limit {
  309. selection.selected[s.id] = struct{}{}
  310. continue
  311. }
  312. if len(displaceable) == 0 {
  313. continue
  314. }
  315. target := displaceable[0]
  316. if priorityTierRank(selection.tiers[s.id]) <= priorityTierRank(selection.tiers[target]) {
  317. continue
  318. }
  319. displaceable = displaceable[1:]
  320. delete(selection.selected, target)
  321. selection.displaced[target] = struct{}{}
  322. selection.selected[s.id] = struct{}{}
  323. selection.opportunistic[s.id] = struct{}{}
  324. }
  325. if holdDur > 0 {
  326. for id := range selection.displaced {
  327. delete(hold, id)
  328. }
  329. for id := range selection.selected {
  330. hold[id] = now.Add(holdDur)
  331. }
  332. }
  333. if len(selection.selected) > 0 {
  334. first := true
  335. for id := range selection.selected {
  336. score := selection.scores[id]
  337. if first || score < selection.cutoff {
  338. selection.cutoff = score
  339. first = false
  340. }
  341. }
  342. }
  343. if len(selection.selected) > 0 {
  344. for id := range selection.scores {
  345. if _, ok := selection.selected[id]; ok {
  346. continue
  347. }
  348. if _, ok := selection.displaced[id]; ok {
  349. continue
  350. }
  351. if selection.scores[id] >= selection.cutoff {
  352. selection.displacedByHold[id] = struct{}{}
  353. }
  354. }
  355. }
  356. return selection
  357. }
  358. func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{}, tiers map[int64]string, scores map[int64]float64, familyRanks map[int64]int) []int64 {
  359. type entry struct {
  360. id int64
  361. rank int
  362. familyOrder int
  363. score float64
  364. }
  365. candidates := make([]entry, 0, len(held))
  366. for id := range held {
  367. if _, ok := protected[id]; ok {
  368. continue
  369. }
  370. score := 0.0
  371. if scores != nil {
  372. score = scores[id]
  373. }
  374. familyRank := -1
  375. if familyRanks != nil {
  376. familyRank = familyRanks[id]
  377. }
  378. candidates = append(candidates, entry{
  379. id: id,
  380. rank: priorityTierRank(tiers[id]),
  381. familyOrder: familyDisplaceOrder(familyRank),
  382. score: score,
  383. })
  384. }
  385. if len(candidates) == 0 {
  386. return nil
  387. }
  388. sort.Slice(candidates, func(i, j int) bool {
  389. if candidates[i].rank == candidates[j].rank {
  390. if candidates[i].familyOrder != candidates[j].familyOrder {
  391. return candidates[i].familyOrder > candidates[j].familyOrder
  392. }
  393. return candidates[i].score < candidates[j].score
  394. }
  395. return candidates[i].rank < candidates[j].rank
  396. })
  397. out := make([]int64, 0, len(candidates))
  398. for _, c := range candidates {
  399. out = append(out, c.id)
  400. }
  401. return out
  402. }
  403. func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string, pressureTag string) *PriorityAdmission {
  404. score, ok := selection.scores[id]
  405. if !ok {
  406. return nil
  407. }
  408. windowTag := selection.windowTags[id]
  409. windowZone := selection.windowZones[id]
  410. admission := &PriorityAdmission{
  411. Basis: queueName,
  412. Score: score,
  413. Cutoff: selection.cutoff,
  414. Tier: selection.tiers[id],
  415. }
  416. zoneTag := windowZoneTag(windowZone)
  417. admission.TierFloor = selection.tierFloors[id]
  418. admission.Family = selection.families[id]
  419. admission.FamilyRank = familyRankForOutput(selection.familyRanks[id])
  420. if _, ok := selection.selected[id]; ok {
  421. if _, held := selection.held[id]; held {
  422. admission.Class = AdmissionClassHold
  423. extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)}
  424. if windowTag != "" {
  425. extras = append(extras, windowTag)
  426. }
  427. if zoneTag != "" {
  428. extras = append(extras, zoneTag)
  429. }
  430. if _, ok := selection.protected[id]; ok {
  431. extras = append(extras, ReasonTagHoldProtected)
  432. }
  433. admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, extras...)
  434. } else {
  435. admission.Class = AdmissionClassAdmit
  436. extras := []string{pressureTag, "budget:" + slugToken(budgetSource)}
  437. if windowTag != "" {
  438. extras = append(extras, windowTag)
  439. }
  440. if zoneTag != "" {
  441. extras = append(extras, zoneTag)
  442. }
  443. if _, ok := selection.opportunistic[id]; ok {
  444. extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced)
  445. }
  446. admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, extras...)
  447. }
  448. return admission
  449. }
  450. if _, ok := selection.displaced[id]; ok {
  451. admission.Class = AdmissionClassDisplace
  452. extras := []string{pressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budgetSource)}
  453. if windowTag != "" {
  454. extras = append(extras, windowTag)
  455. }
  456. if zoneTag != "" {
  457. extras = append(extras, zoneTag)
  458. }
  459. admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, extras...)
  460. return admission
  461. }
  462. if _, ok := selection.displacedByHold[id]; ok {
  463. admission.Class = AdmissionClassDisplace
  464. extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)}
  465. if windowTag != "" {
  466. extras = append(extras, windowTag)
  467. }
  468. if zoneTag != "" {
  469. extras = append(extras, zoneTag)
  470. }
  471. admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, extras...)
  472. return admission
  473. }
  474. admission.Class = AdmissionClassDefer
  475. extras := []string{pressureTag, "pressure:budget", "budget:" + slugToken(budgetSource)}
  476. if windowTag != "" {
  477. extras = append(extras, windowTag)
  478. }
  479. if zoneTag != "" {
  480. extras = append(extras, zoneTag)
  481. }
  482. if _, ok := selection.expired[id]; ok {
  483. extras = append(extras, ReasonTagHoldExpired)
  484. }
  485. admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, extras...)
  486. return admission
  487. }
  488. func windowTagForDecision(decision SignalDecision, action string) string {
  489. match := windowMatchForDecision(decision, action)
  490. if match == nil {
  491. return ""
  492. }
  493. label := strings.TrimSpace(match.Label)
  494. if label == "" {
  495. label = "index-" + strconv.Itoa(match.Index)
  496. }
  497. label = slugToken(label)
  498. if label == "" {
  499. return ""
  500. }
  501. return "window:" + label
  502. }
  503. func windowZoneForDecision(decision SignalDecision, action string) string {
  504. match := windowMatchForDecision(decision, action)
  505. if match == nil {
  506. return ""
  507. }
  508. return match.Zone
  509. }
  510. func windowMatchForDecision(decision SignalDecision, action string) *MonitorWindowMatch {
  511. switch strings.ToLower(strings.TrimSpace(action)) {
  512. case "record":
  513. if decision.RecordWindow != nil {
  514. return decision.RecordWindow
  515. }
  516. case "decode":
  517. if decision.DecodeWindow != nil {
  518. return decision.DecodeWindow
  519. }
  520. }
  521. return decision.MonitorDetail
  522. }
  523. func windowZoneTag(zone string) string {
  524. zone = slugToken(zone)
  525. if zone == "" {
  526. return ""
  527. }
  528. return "window-zone:" + zone
  529. }
  530. func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 {
  531. oldest := 0.0
  532. first := true
  533. for _, qd := range queue {
  534. age := now.Sub(qd.FirstSeen).Seconds()
  535. if first || age > oldest {
  536. oldest = age
  537. first = false
  538. }
  539. }
  540. if first {
  541. return 0
  542. }
  543. return oldest
  544. }