Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

290 lines
8.4KB

  1. package pipeline
  2. import (
  3. "sort"
  4. "time"
  5. )
  // DecisionQueueStats is the per-pass summary returned by
  // (*decisionQueues).Apply. Field order and JSON tags define the serialized
  // shape and are kept exactly as declared.
  type DecisionQueueStats struct {
  	// Number of entries left in each queue after entries that were not
  	// re-requested in this pass have been removed.
  	RecordQueued int `json:"record_queued"`
  	DecodeQueued int `json:"decode_queued"`

  	// Number of entries chosen by the selection step in this pass.
  	RecordSelected int `json:"record_selected"`
  	DecodeSelected int `json:"decode_selected"`

  	// Size of each hold table, read after selection has run.
  	RecordActive int `json:"record_active"`
  	DecodeActive int `json:"decode_active"`

  	// Age in seconds of the longest-queued entry; 0 for an empty queue.
  	RecordOldestS float64 `json:"record_oldest_sec"`
  	DecodeOldestS float64 `json:"decode_oldest_sec"`

  	// Budget maxima copied from the BudgetModel passed to Apply.
  	RecordBudget int `json:"record_budget"`
  	DecodeBudget int `json:"decode_budget"`

  	// HoldMs is copied from the budget; the per-queue values come from the
  	// hold policy derived from the active Policy.
  	HoldMs       int `json:"hold_ms"`
  	RecordHoldMs int `json:"record_hold_ms"`
  	DecodeHoldMs int `json:"decode_hold_ms"`

  	// Number of decisions whose record / auto-decode request was cleared
  	// because their ID was not in the selected set.
  	RecordDropped int `json:"record_dropped"`
  	DecodeDropped int `json:"decode_dropped"`
  }
  // queuedDecision is one pending record or decode request, keyed by the
  // candidate ID it was created for.
  type queuedDecision struct {
  	// ID is the candidate ID; it is also the key in the owning queue map.
  	ID int64
  	// SNRDb is the candidate's most recently reported SNR and is the base
  	// term of the selection score.
  	SNRDb float64
  	// Hint and Class are refreshed on every pass; selection falls back to
  	// Class when Hint is empty.
  	Hint, Class string
  	// FirstSeen is stamped when the entry is created and drives the age
  	// boost; LastSeen is re-stamped on every pass that requests the entry.
  	FirstSeen, LastSeen time.Time
  }
  // queueSelection is the result of one selection pass over a single queue.
  type queueSelection struct {
  	// selected holds every admitted ID; held is the subset that was admitted
  	// because it already had an unexpired hold.
  	selected, held map[int64]struct{}
  	// scores holds the computed score of every queued ID, selected or not.
  	scores map[int64]float64
  	// minScore and maxScore bound the scores of the whole queue; cutoff is
  	// the lowest score among the selected IDs (0 when nothing is selected).
  	minScore, maxScore, cutoff float64
  }
  39. type decisionQueues struct {
  40. record map[int64]*queuedDecision
  41. decode map[int64]*queuedDecision
  42. recordHold map[int64]time.Time
  43. decodeHold map[int64]time.Time
  44. }
  45. func newDecisionQueues() *decisionQueues {
  46. return &decisionQueues{
  47. record: map[int64]*queuedDecision{},
  48. decode: map[int64]*queuedDecision{},
  49. recordHold: map[int64]time.Time{},
  50. decodeHold: map[int64]time.Time{},
  51. }
  52. }
  53. func (dq *decisionQueues) Apply(decisions []SignalDecision, budget BudgetModel, now time.Time, policy Policy) DecisionQueueStats {
  54. if dq == nil {
  55. return DecisionQueueStats{}
  56. }
  57. holdPolicy := HoldPolicyFromPolicy(policy)
  58. recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond
  59. decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond
  60. recSeen := map[int64]bool{}
  61. decSeen := map[int64]bool{}
  62. for i := range decisions {
  63. id := decisions[i].Candidate.ID
  64. if id == 0 {
  65. continue
  66. }
  67. if decisions[i].ShouldRecord {
  68. qd := dq.record[id]
  69. if qd == nil {
  70. qd = &queuedDecision{ID: id, FirstSeen: now}
  71. dq.record[id] = qd
  72. }
  73. qd.SNRDb = decisions[i].Candidate.SNRDb
  74. qd.Hint = decisions[i].Candidate.Hint
  75. qd.Class = decisions[i].Class
  76. qd.LastSeen = now
  77. recSeen[id] = true
  78. }
  79. if decisions[i].ShouldAutoDecode {
  80. qd := dq.decode[id]
  81. if qd == nil {
  82. qd = &queuedDecision{ID: id, FirstSeen: now}
  83. dq.decode[id] = qd
  84. }
  85. qd.SNRDb = decisions[i].Candidate.SNRDb
  86. qd.Hint = decisions[i].Candidate.Hint
  87. qd.Class = decisions[i].Class
  88. qd.LastSeen = now
  89. decSeen[id] = true
  90. }
  91. }
  92. for id := range dq.record {
  93. if !recSeen[id] {
  94. delete(dq.record, id)
  95. }
  96. }
  97. for id := range dq.decode {
  98. if !decSeen[id] {
  99. delete(dq.decode, id)
  100. }
  101. }
  102. purgeExpired(dq.recordHold, now)
  103. purgeExpired(dq.decodeHold, now)
  104. recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy)
  105. decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy)
  106. recPressure := buildQueuePressure(budget.Record, len(dq.record), len(recSelected.selected), len(dq.recordHold))
  107. decPressure := buildQueuePressure(budget.Decode, len(dq.decode), len(decSelected.selected), len(dq.decodeHold))
  108. recPressureTag := pressureReasonTag(recPressure)
  109. decPressureTag := pressureReasonTag(decPressure)
  110. stats := DecisionQueueStats{
  111. RecordQueued: len(dq.record),
  112. DecodeQueued: len(dq.decode),
  113. RecordSelected: len(recSelected.selected),
  114. DecodeSelected: len(decSelected.selected),
  115. RecordActive: len(dq.recordHold),
  116. DecodeActive: len(dq.decodeHold),
  117. RecordOldestS: oldestAge(dq.record, now),
  118. DecodeOldestS: oldestAge(dq.decode, now),
  119. RecordBudget: budget.Record.Max,
  120. DecodeBudget: budget.Decode.Max,
  121. HoldMs: budget.HoldMs,
  122. RecordHoldMs: holdPolicy.RecordMs,
  123. DecodeHoldMs: holdPolicy.DecodeMs,
  124. }
  125. for i := range decisions {
  126. id := decisions[i].Candidate.ID
  127. if decisions[i].ShouldRecord {
  128. decisions[i].RecordAdmission = buildQueueAdmission("record", id, recSelected, policy, holdPolicy, budget.Record.Source, recPressureTag)
  129. if _, ok := recSelected.selected[id]; !ok {
  130. decisions[i].ShouldRecord = false
  131. decisions[i].Reason = admissionReason(DecisionReasonQueueRecord, policy, holdPolicy, recPressureTag, "pressure:budget", "budget:"+slugToken(budget.Record.Source))
  132. stats.RecordDropped++
  133. }
  134. }
  135. if decisions[i].ShouldAutoDecode {
  136. decisions[i].DecodeAdmission = buildQueueAdmission("decode", id, decSelected, policy, holdPolicy, budget.Decode.Source, decPressureTag)
  137. if _, ok := decSelected.selected[id]; !ok {
  138. decisions[i].ShouldAutoDecode = false
  139. if decisions[i].Reason == "" {
  140. decisions[i].Reason = admissionReason(DecisionReasonQueueDecode, policy, holdPolicy, decPressureTag, "pressure:budget", "budget:"+slugToken(budget.Decode.Source))
  141. }
  142. stats.DecodeDropped++
  143. }
  144. }
  145. }
  146. return stats
  147. }
  148. func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy Policy) queueSelection {
  149. selection := queueSelection{
  150. selected: map[int64]struct{}{},
  151. held: map[int64]struct{}{},
  152. scores: map[int64]float64{},
  153. }
  154. if len(queue) == 0 {
  155. return selection
  156. }
  157. type scored struct {
  158. id int64
  159. score float64
  160. }
  161. scoredList := make([]scored, 0, len(queue))
  162. for id, qd := range queue {
  163. age := now.Sub(qd.FirstSeen).Seconds()
  164. boost := age / 2.0
  165. if boost > 5 {
  166. boost = 5
  167. }
  168. hint := qd.Hint
  169. if hint == "" {
  170. hint = qd.Class
  171. }
  172. policyBoost := DecisionPriorityBoost(policy, hint, qd.Class, queueName)
  173. score := qd.SNRDb + boost + policyBoost
  174. selection.scores[id] = score
  175. if len(scoredList) == 0 || score < selection.minScore {
  176. selection.minScore = score
  177. }
  178. if len(scoredList) == 0 || score > selection.maxScore {
  179. selection.maxScore = score
  180. }
  181. scoredList = append(scoredList, scored{id: id, score: score})
  182. }
  183. sort.Slice(scoredList, func(i, j int) bool {
  184. return scoredList[i].score > scoredList[j].score
  185. })
  186. limit := max
  187. if limit <= 0 || limit > len(scoredList) {
  188. limit = len(scoredList)
  189. }
  190. if len(hold) > 0 && len(hold) > limit {
  191. limit = len(hold)
  192. if limit > len(scoredList) {
  193. limit = len(scoredList)
  194. }
  195. }
  196. for id := range hold {
  197. if _, ok := queue[id]; ok {
  198. selection.selected[id] = struct{}{}
  199. selection.held[id] = struct{}{}
  200. }
  201. }
  202. for _, s := range scoredList {
  203. if len(selection.selected) >= limit {
  204. break
  205. }
  206. if _, ok := selection.selected[s.id]; ok {
  207. continue
  208. }
  209. selection.selected[s.id] = struct{}{}
  210. }
  211. if holdDur > 0 {
  212. for id := range selection.selected {
  213. hold[id] = now.Add(holdDur)
  214. }
  215. }
  216. if len(selection.selected) > 0 {
  217. first := true
  218. for id := range selection.selected {
  219. score := selection.scores[id]
  220. if first || score < selection.cutoff {
  221. selection.cutoff = score
  222. first = false
  223. }
  224. }
  225. }
  226. return selection
  227. }
  228. func buildQueueAdmission(queueName string, id int64, selection queueSelection, policy Policy, holdPolicy HoldPolicy, budgetSource string, pressureTag string) *PriorityAdmission {
  229. score, ok := selection.scores[id]
  230. if !ok {
  231. return nil
  232. }
  233. admission := &PriorityAdmission{
  234. Basis: queueName,
  235. Score: score,
  236. Cutoff: selection.cutoff,
  237. Tier: PriorityTierFromRange(score, selection.minScore, selection.maxScore),
  238. }
  239. if _, ok := selection.selected[id]; ok {
  240. if _, held := selection.held[id]; held {
  241. admission.Class = AdmissionClassHold
  242. admission.Reason = admissionReason("queue:"+queueName+":hold", policy, holdPolicy, pressureTag, "pressure:hold", "budget:"+slugToken(budgetSource))
  243. } else {
  244. admission.Class = AdmissionClassAdmit
  245. admission.Reason = admissionReason("queue:"+queueName+":admit", policy, holdPolicy, pressureTag, "budget:"+slugToken(budgetSource))
  246. }
  247. return admission
  248. }
  249. admission.Class = AdmissionClassDefer
  250. admission.Reason = admissionReason("queue:"+queueName+":budget", policy, holdPolicy, pressureTag, "pressure:budget", "budget:"+slugToken(budgetSource))
  251. return admission
  252. }
  // purgeExpired deletes from hold every entry whose expiry lies strictly
  // before now. An entry that expires exactly at now is kept.
  func purgeExpired(hold map[int64]time.Time, now time.Time) {
  	for id := range hold {
  		if expiry := hold[id]; expiry.Before(now) {
  			delete(hold, id)
  		}
  	}
  }
  260. func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 {
  261. oldest := 0.0
  262. first := true
  263. for _, qd := range queue {
  264. age := now.Sub(qd.FirstSeen).Seconds()
  265. if first || age > oldest {
  266. oldest = age
  267. first = false
  268. }
  269. }
  270. if first {
  271. return 0
  272. }
  273. return oldest
  274. }