Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

437 Zeilen
15KB

  1. package pipeline
  2. import (
  3. "sort"
  4. "time"
  5. )
// DecisionQueueStats is the per-round summary returned by decisionQueues.Apply
// for the record and decode admission queues. Every counter exists twice, once
// per queue, and is serialized under the JSON key given in its tag.
type DecisionQueueStats struct {
	// Number of entries left in each queue after entries not seen this round
	// were pruned.
	RecordQueued int `json:"record_queued"`
	DecodeQueued int `json:"decode_queued"`
	// Number of queue entries admitted by the selection step this round.
	RecordSelected int `json:"record_selected"`
	DecodeSelected int `json:"decode_selected"`
	// Size of each hold map once selection has finished.
	RecordActive int `json:"record_active"`
	DecodeActive int `json:"decode_active"`
	// Age in seconds (measured from FirstSeen) of the oldest queued entry;
	// 0 when the queue is empty.
	RecordOldestS float64 `json:"record_oldest_sec"`
	DecodeOldestS float64 `json:"decode_oldest_sec"`
	// Budget maximum handed to the selection step for each queue.
	RecordBudget int `json:"record_budget"`
	DecodeBudget int `json:"decode_budget"`
	// HoldMs and DecisionHoldMs are both filled from the hold policy's BaseMs;
	// RecordHoldMs and DecodeHoldMs carry the per-queue hold durations.
	HoldMs int `json:"hold_ms"`
	DecisionHoldMs int `json:"decision_hold_ms,omitempty"`
	RecordHoldMs int `json:"record_hold_ms"`
	DecodeHoldMs int `json:"decode_hold_ms"`
	// Number of decisions whose ShouldRecord / ShouldAutoDecode flag was
	// cleared because the candidate was not selected.
	RecordDropped int `json:"record_dropped"`
	DecodeDropped int `json:"decode_dropped"`
	// Held entries that kept their slot (held minus displaced).
	RecordHoldSelected int `json:"record_hold_selected"`
	DecodeHoldSelected int `json:"decode_hold_selected"`
	// Held entries whose tier is protected from displacement.
	RecordHoldProtected int `json:"record_hold_protected"`
	DecodeHoldProtected int `json:"decode_hold_protected"`
	// Number of holds reported as expired by the expiry step this round.
	RecordHoldExpired int `json:"record_hold_expired"`
	DecodeHoldExpired int `json:"decode_hold_expired"`
	// Held entries that lost their slot to a higher-tier candidate.
	RecordHoldDisplaced int `json:"record_hold_displaced"`
	DecodeHoldDisplaced int `json:"decode_hold_displaced"`
	// Candidates admitted by displacing a held entry.
	RecordOpportunistic int `json:"record_opportunistic"`
	DecodeOpportunistic int `json:"decode_opportunistic"`
	// Unselected candidates whose score reached the selection cutoff. The
	// *Displaced and *DisplacedByHold fields are filled from the same count.
	RecordDisplaced int `json:"record_displaced"`
	DecodeDisplaced int `json:"decode_displaced"`
	RecordDisplacedByHold int `json:"record_displaced_by_hold,omitempty"`
	DecodeDisplacedByHold int `json:"decode_displaced_by_hold,omitempty"`
}
  38. type queuedDecision struct {
  39. ID int64
  40. SNRDb float64
  41. Hint string
  42. Class string
  43. FirstSeen time.Time
  44. LastSeen time.Time
  45. }
  46. type queueSelection struct {
  47. selected map[int64]struct{}
  48. held map[int64]struct{}
  49. protected map[int64]struct{}
  50. displacedByHold map[int64]struct{}
  51. displaced map[int64]struct{}
  52. opportunistic map[int64]struct{}
  53. expired map[int64]struct{}
  54. scores map[int64]float64
  55. tiers map[int64]string
  56. minScore float64
  57. maxScore float64
  58. cutoff float64
  59. }
  60. type decisionQueues struct {
  61. record map[int64]*queuedDecision
  62. decode map[int64]*queuedDecision
  63. recordHold map[int64]time.Time
  64. decodeHold map[int64]time.Time
  65. }
  66. func newDecisionQueues() *decisionQueues {
  67. return &decisionQueues{
  68. record: map[int64]*queuedDecision{},
  69. decode: map[int64]*queuedDecision{},
  70. recordHold: map[int64]time.Time{},
  71. decodeHold: map[int64]time.Time{},
  72. }
  73. }
  74. func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats {
  75. if dq == nil {
  76. return DecisionQueueStats{}
  77. }
  78. holdPolicy := HoldPolicyFromPolicy(policy)
  79. recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond
  80. decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond
  81. recSeen := map[int64]bool{}
  82. decSeen := map[int64]bool{}
  83. for i := range decisions {
  84. id := decisions[i].Candidate.ID
  85. if id == 0 {
  86. continue
  87. }
  88. if decisions[i].ShouldRecord {
  89. qd := dq.record[id]
  90. if qd == nil {
  91. qd = &queuedDecision{ID: id, FirstSeen: now}
  92. dq.record[id] = qd
  93. }
  94. qd.SNRDb = decisions[i].Candidate.SNRDb
  95. qd.Hint = decisions[i].Candidate.Hint
  96. qd.Class = decisions[i].Class
  97. qd.LastSeen = now
  98. recSeen[id] = true
  99. }
  100. if decisions[i].ShouldAutoDecode {
  101. qd := dq.decode[id]
  102. if qd == nil {
  103. qd = &queuedDecision{ID: id, FirstSeen: now}
  104. dq.decode[id] = qd
  105. }
  106. qd.SNRDb = decisions[i].Candidate.SNRDb
  107. qd.Hint = decisions[i].Candidate.Hint
  108. qd.Class = decisions[i].Class
  109. qd.LastSeen = now
  110. decSeen[id] = true
  111. }
  112. }
  113. for id := range dq.record {
  114. if !recSeen[id] {
  115. delete(dq.record, id)
  116. }
  117. }
  118. for id := range dq.decode {
  119. if !decSeen[id] {
  120. delete(dq.decode, id)
  121. }
  122. }
  123. recExpired := expireHold(dq.recordHold, now)
  124. decExpired := expireHold(dq.decodeHold, now)
  125. recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy, recExpired)
  126. decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy, decExpired)
  127. recPressure := buildQueuePressure(budget.Record, len(dq.record), len(recSelected.selected), len(dq.recordHold))
  128. decPressure := buildQueuePressure(budget.Decode, len(dq.decode), len(decSelected.selected), len(dq.decodeHold))
  129. recPressureTag := pressureReasonTag(recPressure)
  130. decPressureTag := pressureReasonTag(decPressure)
  131. stats := DecisionQueueStats{
  132. RecordQueued: len(dq.record),
  133. DecodeQueued: len(dq.decode),
  134. RecordSelected: len(recSelected.selected),
  135. DecodeSelected: len(decSelected.selected),
  136. RecordActive: len(dq.recordHold),
  137. DecodeActive: len(dq.decodeHold),
  138. RecordOldestS: oldestAge(dq.record, now),
  139. DecodeOldestS: oldestAge(dq.decode, now),
  140. RecordBudget: budget.Record.Max,
  141. DecodeBudget: budget.Decode.Max,
  142. HoldMs: holdPolicy.BaseMs,
  143. DecisionHoldMs: holdPolicy.BaseMs,
  144. RecordHoldMs: holdPolicy.RecordMs,
  145. DecodeHoldMs: holdPolicy.DecodeMs,
  146. RecordHoldSelected: len(recSelected.held) - len(recSelected.displaced),
  147. DecodeHoldSelected: len(decSelected.held) - len(decSelected.displaced),
  148. RecordHoldProtected: len(recSelected.protected),
  149. DecodeHoldProtected: len(decSelected.protected),
  150. RecordHoldExpired: len(recExpired),
  151. DecodeHoldExpired: len(decExpired),
  152. RecordHoldDisplaced: len(recSelected.displaced),
  153. DecodeHoldDisplaced: len(decSelected.displaced),
  154. RecordOpportunistic: len(recSelected.opportunistic),
  155. DecodeOpportunistic: len(decSelected.opportunistic),
  156. RecordDisplaced: len(recSelected.displacedByHold),
  157. DecodeDisplaced: len(decSelected.displacedByHold),
  158. RecordDisplacedByHold: len(recSelected.displacedByHold),
  159. DecodeDisplacedByHold: len(decSelected.displacedByHold),
  160. }
  161. for i := range decisions {
  162. id := decisions[i].Candidate.ID
  163. if decisions[i].ShouldRecord {
  164. decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source, recPressureTag)
  165. if _, ok := recSelected.selected[id]; !ok {
  166. decisions[i].ShouldRecord = false
  167. extras := []string{recPressureTag, "pressure:budget", "budget:" + slugToken(budget.Record.Source)}
  168. if _, ok := recSelected.displaced[id]; ok {
  169. extras = []string{recPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Record.Source)}
  170. } else if _, ok := recSelected.displacedByHold[id]; ok {
  171. extras = []string{recPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Record.Source)}
  172. } else if _, ok := recSelected.expired[id]; ok {
  173. extras = append(extras, ReasonTagHoldExpired)
  174. }
  175. decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, extras...)
  176. stats.RecordDropped++
  177. }
  178. }
  179. if decisions[i].ShouldAutoDecode {
  180. decisions[i].DecodeAdmission = buildQueueAdmission("decode", id, decSelected, policy, holdPolicy, budget.Decode.Source, decPressureTag)
  181. if _, ok := decSelected.selected[id]; !ok {
  182. decisions[i].ShouldAutoDecode = false
  183. if decisions[i].Reason == "" {
  184. extras := []string{decPressureTag, "pressure:budget", "budget:" + slugToken(budget.Decode.Source)}
  185. if _, ok := decSelected.displaced[id]; ok {
  186. extras = []string{decPressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:" + slugToken(budget.Decode.Source)}
  187. } else if _, ok := decSelected.displacedByHold[id]; ok {
  188. extras = []string{decPressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budget.Decode.Source)}
  189. } else if _, ok := decSelected.expired[id]; ok {
  190. extras = append(extras, ReasonTagHoldExpired)
  191. }
  192. decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, extras...)
  193. }
  194. stats.DecodeDropped++
  195. }
  196. }
  197. }
  198. return stats
  199. }
  200. func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy, expired map[int64]struct{}) queueSelection {
  201. selection := queueSelection{
  202. selected: map[int64]struct{}{},
  203. held: map[int64]struct{}{},
  204. protected: map[int64]struct{}{},
  205. displacedByHold: map[int64]struct{}{},
  206. displaced: map[int64]struct{}{},
  207. opportunistic: map[int64]struct{}{},
  208. expired: map[int64]struct{}{},
  209. scores: map[int64]float64{},
  210. tiers: map[int64]string{},
  211. }
  212. if len(queue) == 0 {
  213. return selection
  214. }
  215. for id := range expired {
  216. selection.expired[id] = struct{}{}
  217. }
  218. type scored struct {
  219. id int64
  220. score float64
  221. }
  222. scoredList := make([]scored, 0, len(queue))
  223. for id, qd := range queue {
  224. age := now.Sub(qd.FirstSeen).Seconds()
  225. boost := age / 2.0
  226. if boost > 5 {
  227. boost = 5
  228. }
  229. hint := qd.Hint
  230. if hint == "" {
  231. hint = qd.Class
  232. }
  233. policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName)
  234. score := qd.SNRDb + boost + policyBoost
  235. selection.scores[id] = score
  236. if len(scoredList) == 0 || score < selection.minScore {
  237. selection.minScore = score
  238. }
  239. if len(scoredList) == 0 || score > selection.maxScore {
  240. selection.maxScore = score
  241. }
  242. scoredList = append(scoredList, scored{id: id, score: score})
  243. }
  244. sort.Slice(scoredList, func(i, j int) bool {
  245. return scoredList[i].score > scoredList[j].score
  246. })
  247. for id, score := range selection.scores {
  248. selection.tiers[id] = PriorityTierFromRange(score, selection.minScore, selection.maxScore)
  249. }
  250. limit := max
  251. if limit <= 0 || limit > len(scoredList) {
  252. limit = len(scoredList)
  253. }
  254. if len(hold) > 0 && len(hold) > limit {
  255. limit = len(hold)
  256. if limit > len(scoredList) {
  257. limit = len(scoredList)
  258. }
  259. }
  260. for id := range hold {
  261. if _, ok := queue[id]; ok {
  262. selection.selected[id] = struct{}{}
  263. selection.held[id] = struct{}{}
  264. if isProtectedTier(selection.tiers[id]) {
  265. selection.protected[id] = struct{}{}
  266. }
  267. }
  268. }
  269. displaceable := buildDisplaceableHold(selection.held, selection.protected, selection.tiers, selection.scores)
  270. for _, s := range scoredList {
  271. if _, ok := selection.selected[s.id]; ok {
  272. continue
  273. }
  274. if len(selection.selected) < limit {
  275. selection.selected[s.id] = struct{}{}
  276. continue
  277. }
  278. if len(displaceable) == 0 {
  279. continue
  280. }
  281. target := displaceable[0]
  282. if priorityTierRank(selection.tiers[s.id]) <= priorityTierRank(selection.tiers[target]) {
  283. continue
  284. }
  285. displaceable = displaceable[1:]
  286. delete(selection.selected, target)
  287. selection.displaced[target] = struct{}{}
  288. selection.selected[s.id] = struct{}{}
  289. selection.opportunistic[s.id] = struct{}{}
  290. }
  291. if holdDur > 0 {
  292. for id := range selection.displaced {
  293. delete(hold, id)
  294. }
  295. for id := range selection.selected {
  296. hold[id] = now.Add(holdDur)
  297. }
  298. }
  299. if len(selection.selected) > 0 {
  300. first := true
  301. for id := range selection.selected {
  302. score := selection.scores[id]
  303. if first || score < selection.cutoff {
  304. selection.cutoff = score
  305. first = false
  306. }
  307. }
  308. }
  309. if len(selection.selected) > 0 {
  310. for id := range selection.scores {
  311. if _, ok := selection.selected[id]; ok {
  312. continue
  313. }
  314. if _, ok := selection.displaced[id]; ok {
  315. continue
  316. }
  317. if selection.scores[id] >= selection.cutoff {
  318. selection.displacedByHold[id] = struct{}{}
  319. }
  320. }
  321. }
  322. return selection
  323. }
  324. func buildDisplaceableHold(held map[int64]struct{}, protected map[int64]struct{}, tiers map[int64]string, scores map[int64]float64) []int64 {
  325. type entry struct {
  326. id int64
  327. rank int
  328. score float64
  329. }
  330. candidates := make([]entry, 0, len(held))
  331. for id := range held {
  332. if _, ok := protected[id]; ok {
  333. continue
  334. }
  335. score := 0.0
  336. if scores != nil {
  337. score = scores[id]
  338. }
  339. candidates = append(candidates, entry{
  340. id: id,
  341. rank: priorityTierRank(tiers[id]),
  342. score: score,
  343. })
  344. }
  345. if len(candidates) == 0 {
  346. return nil
  347. }
  348. sort.Slice(candidates, func(i, j int) bool {
  349. if candidates[i].rank == candidates[j].rank {
  350. return candidates[i].score < candidates[j].score
  351. }
  352. return candidates[i].rank < candidates[j].rank
  353. })
  354. out := make([]int64, 0, len(candidates))
  355. for _, c := range candidates {
  356. out = append(out, c.id)
  357. }
  358. return out
  359. }
  360. func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string, pressureTag string) *PriorityAdmission {
  361. score, ok := selection.scores[id]
  362. if !ok {
  363. return nil
  364. }
  365. admission := &PriorityAdmission{
  366. Basis: queueName,
  367. Score: score,
  368. Cutoff: selection.cutoff,
  369. Tier: selection.tiers[id],
  370. }
  371. if _, ok := selection.selected[id]; ok {
  372. if _, held := selection.held[id]; held {
  373. admission.Class = AdmissionClassHold
  374. extras := []string{pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:" + slugToken(budgetSource)}
  375. if _, ok := selection.protected[id]; ok {
  376. extras = append(extras, ReasonTagHoldProtected)
  377. }
  378. admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, extras...)
  379. } else {
  380. admission.Class = AdmissionClassAdmit
  381. extras := []string{pressureTag, "budget:" + slugToken(budgetSource)}
  382. if _, ok := selection.opportunistic[id]; ok {
  383. extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced)
  384. }
  385. admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, extras...)
  386. }
  387. return admission
  388. }
  389. if _, ok := selection.displaced[id]; ok {
  390. admission.Class = AdmissionClassDisplace
  391. admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:"+slugToken(budgetSource))
  392. return admission
  393. }
  394. if _, ok := selection.displacedByHold[id]; ok {
  395. admission.Class = AdmissionClassDisplace
  396. admission.Reason = admissionReason("queue:"+queueName+":displace", policy, holdPolicy, pressureTag, "pressure:hold", ReasonTagHoldActive, "budget:"+slugToken(budgetSource))
  397. return admission
  398. }
  399. admission.Class = AdmissionClassDefer
  400. extras := []string{pressureTag, "pressure:budget", "budget:" + slugToken(budgetSource)}
  401. if _, ok := selection.expired[id]; ok {
  402. extras = append(extras, ReasonTagHoldExpired)
  403. }
  404. admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, extras...)
  405. return admission
  406. }
  407. func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 {
  408. oldest := 0.0
  409. first := true
  410. for _, qd := range queue {
  411. age := now.Sub(qd.FirstSeen).Seconds()
  412. if first || age > oldest {
  413. oldest = age
  414. first = false
  415. }
  416. }
  417. if first {
  418. return 0
  419. }
  420. return oldest
  421. }