Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

412 line
14KB

  1. package pipeline
  2. import (
  3. "math"
  4. "strings"
  5. "time"
  6. )
  // HoldPolicy carries the hold durations derived from a Policy by
  // HoldPolicyFromPolicy. All durations are expressed in milliseconds.
  type HoldPolicy struct {
  	// BaseMs is the policy's decision hold, clamped so it is never negative.
  	BaseMs int `json:"base_ms"`

  	// RefinementMs, RecordMs and DecodeMs are BaseMs scaled by the
  	// refinement, record and decode multipliers respectively.
  	RefinementMs int `json:"refinement_ms"`
  	RecordMs     int `json:"record_ms"`
  	DecodeMs     int `json:"decode_ms"`

  	// Profile and Strategy echo the policy's Profile and RefinementStrategy
  	// exactly as supplied (not normalised).
  	Profile  string `json:"profile,omitempty"`
  	Strategy string `json:"strategy,omitempty"`

  	// Reasons lists, in application order, the tags of every rule that
  	// adjusted a multiplier.
  	Reasons []string `json:"reasons,omitempty"`
  }
  // RefinementHold is the hold state threaded through successive admission
  // passes. It is mutated in place by AdmitRefinementPlanWithBudget.
  type RefinementHold struct {
  	// Active maps a candidate ID to the time its hold runs until. A nil map
  	// is accepted; the admission pass allocates it before writing.
  	Active map[int64]time.Time
  }
  19. type RefinementAdmission struct {
  20. Budget int `json:"budget"`
  21. BudgetSource string `json:"budget_source,omitempty"`
  22. DecisionHoldMs int `json:"decision_hold_ms,omitempty"`
  23. HoldMs int `json:"hold_ms"`
  24. HoldSource string `json:"hold_source,omitempty"`
  25. Planned int `json:"planned"`
  26. Admitted int `json:"admitted"`
  27. Skipped int `json:"skipped"`
  28. Displaced int `json:"displaced"`
  29. DisplacedByHold int `json:"displaced_by_hold,omitempty"`
  30. HoldActive int `json:"hold_active"`
  31. HoldSelected int `json:"hold_selected"`
  32. HoldProtected int `json:"hold_protected"`
  33. HoldExpired int `json:"hold_expired"`
  34. HoldDisplaced int `json:"hold_displaced"`
  35. Opportunistic int `json:"opportunistic"`
  36. PriorityCutoff float64 `json:"priority_cutoff,omitempty"`
  37. PriorityTier string `json:"priority_tier,omitempty"`
  38. Reason string `json:"reason,omitempty"`
  39. Pressure BudgetPressure `json:"pressure,omitempty"`
  40. }
  41. type RefinementAdmissionResult struct {
  42. Plan RefinementPlan
  43. WorkItems []RefinementWorkItem
  44. Admitted []ScheduledCandidate
  45. Admission RefinementAdmission
  46. }
  47. func HoldPolicyFromPolicy(policy Policy) HoldPolicy {
  48. base := policy.DecisionHoldMs
  49. if base < 0 {
  50. base = 0
  51. }
  52. refMult := 1.0
  53. recMult := 1.0
  54. decMult := 1.0
  55. reasons := make([]string, 0, 2)
  56. profile := strings.ToLower(strings.TrimSpace(policy.Profile))
  57. strategy := strings.ToLower(strings.TrimSpace(policy.RefinementStrategy))
  58. intent := strings.ToLower(strings.TrimSpace(policy.Intent))
  59. archiveProfile := profileContains(profile, "archive")
  60. archiveStrategy := strategyContains(strategy, "archive")
  61. if archiveProfile || archiveStrategy {
  62. recMult *= 1.5
  63. decMult *= 1.1
  64. refMult *= 1.2
  65. if archiveProfile {
  66. reasons = append(reasons, HoldReasonProfileArchive)
  67. }
  68. if archiveStrategy {
  69. reasons = append(reasons, HoldReasonStrategyArchive)
  70. }
  71. }
  72. digitalProfile := profileContains(profile, "digital")
  73. digitalStrategy := strategyContains(strategy, "digital")
  74. if digitalProfile || digitalStrategy {
  75. decMult *= 1.6
  76. recMult *= 0.85
  77. refMult *= 1.1
  78. if digitalProfile {
  79. reasons = append(reasons, HoldReasonProfileDigital)
  80. }
  81. if digitalStrategy {
  82. reasons = append(reasons, HoldReasonStrategyDigital)
  83. }
  84. }
  85. if profileContains(profile, "aggressive") {
  86. refMult *= 1.15
  87. reasons = append(reasons, HoldReasonProfileAggressive)
  88. }
  89. if strategyContains(strings.ToLower(strings.TrimSpace(policy.SurveillanceStrategy)), "multi") {
  90. refMult *= 1.1
  91. reasons = append(reasons, HoldReasonStrategyMultiRes)
  92. }
  93. intentArchive := strings.Contains(intent, "archive") || strings.Contains(intent, "triage") || strings.Contains(intent, "record")
  94. intentDecode := strings.Contains(intent, "decode") || strings.Contains(intent, "digital") || strings.Contains(intent, "analysis")
  95. intentSurveillance := strings.Contains(intent, "surveillance") || strings.Contains(intent, "wideband")
  96. if intentArchive {
  97. recMult *= 1.25
  98. refMult *= 1.1
  99. decMult *= 1.05
  100. reasons = append(reasons, HoldReasonIntentArchive)
  101. }
  102. if intentDecode {
  103. decMult *= 1.25
  104. refMult *= 1.05
  105. reasons = append(reasons, HoldReasonIntentDecode)
  106. }
  107. if intentSurveillance {
  108. refMult *= 1.1
  109. reasons = append(reasons, HoldReasonIntentSurveillance)
  110. }
  111. return HoldPolicy{
  112. BaseMs: base,
  113. RefinementMs: scaleHold(base, refMult),
  114. RecordMs: scaleHold(base, recMult),
  115. DecodeMs: scaleHold(base, decMult),
  116. Profile: policy.Profile,
  117. Strategy: policy.RefinementStrategy,
  118. Reasons: reasons,
  119. }
  120. }
  121. func AdmitRefinementPlan(plan RefinementPlan, policy Policy, now time.Time, hold *RefinementHold) RefinementAdmissionResult {
  122. budget := BudgetModelFromPolicy(policy)
  123. return AdmitRefinementPlanWithBudget(plan, policy, budget, now, hold)
  124. }
  125. func AdmitRefinementPlanWithBudget(plan RefinementPlan, policy Policy, budgetModel BudgetModel, now time.Time, hold *RefinementHold) RefinementAdmissionResult {
  126. ranked := plan.Ranked
  127. if len(ranked) == 0 {
  128. ranked = plan.Selected
  129. }
  130. workItems := append([]RefinementWorkItem(nil), plan.WorkItems...)
  131. admission := RefinementAdmission{
  132. Budget: plan.Budget,
  133. BudgetSource: plan.BudgetSource,
  134. }
  135. if len(ranked) == 0 {
  136. admission.Reason = ReasonAdmissionNoCandidates
  137. return RefinementAdmissionResult{Plan: plan, WorkItems: workItems, Admission: admission}
  138. }
  139. holdPolicy := HoldPolicyFromPolicy(policy)
  140. admission.DecisionHoldMs = holdPolicy.BaseMs
  141. admission.HoldMs = holdPolicy.RefinementMs
  142. admission.HoldSource = "resources.decision_hold_ms"
  143. if len(holdPolicy.Reasons) > 0 {
  144. admission.HoldSource += ":" + strings.Join(holdPolicy.Reasons, ",")
  145. }
  146. planned := len(ranked)
  147. admission.Planned = planned
  148. selected := map[int64]struct{}{}
  149. held := map[int64]struct{}{}
  150. protected := map[int64]struct{}{}
  151. expired := map[int64]struct{}{}
  152. if hold != nil {
  153. expired = expireHold(hold.Active, now)
  154. for id := range hold.Active {
  155. if rankedContains(ranked, id) {
  156. selected[id] = struct{}{}
  157. held[id] = struct{}{}
  158. }
  159. }
  160. }
  161. limit := plan.Budget
  162. if limit <= 0 || limit > planned {
  163. limit = planned
  164. }
  165. if len(selected) > limit {
  166. limit = len(selected)
  167. if limit > planned {
  168. limit = planned
  169. }
  170. }
  171. tierByID := map[int64]string{}
  172. scoreByID := map[int64]float64{}
  173. familyByID := map[int64]string{}
  174. familyRankByID := map[int64]int{}
  175. familyFloorByID := map[int64]string{}
  176. for _, cand := range ranked {
  177. id := cand.Candidate.ID
  178. family, familyRank := signalPriorityMatch(policy, cand.Candidate.Hint, "")
  179. familyByID[id] = family
  180. familyRankByID[id] = familyRank
  181. familyFloorByID[id] = signalPriorityTierFloor(familyRank)
  182. baseTier := PriorityTierFromRange(cand.Priority, plan.PriorityMin, plan.PriorityMax)
  183. tierByID[id] = applyTierFloor(baseTier, familyFloorByID[id])
  184. scoreByID[id] = cand.Priority
  185. }
  186. for id := range held {
  187. if isProtectedTier(tierByID[id]) {
  188. protected[id] = struct{}{}
  189. }
  190. }
  191. displaceable := buildDisplaceableHold(held, protected, tierByID, scoreByID, familyRankByID)
  192. opportunistic := map[int64]struct{}{}
  193. displacedHold := map[int64]struct{}{}
  194. for _, cand := range ranked {
  195. if _, ok := selected[cand.Candidate.ID]; ok {
  196. continue
  197. }
  198. if len(selected) < limit {
  199. selected[cand.Candidate.ID] = struct{}{}
  200. continue
  201. }
  202. if len(displaceable) == 0 {
  203. continue
  204. }
  205. target := displaceable[0]
  206. if priorityTierRank(tierByID[cand.Candidate.ID]) <= priorityTierRank(tierByID[target]) {
  207. continue
  208. }
  209. displaceable = displaceable[1:]
  210. delete(selected, target)
  211. displacedHold[target] = struct{}{}
  212. selected[cand.Candidate.ID] = struct{}{}
  213. opportunistic[cand.Candidate.ID] = struct{}{}
  214. }
  215. if hold != nil && admission.HoldMs > 0 {
  216. until := now.Add(time.Duration(admission.HoldMs) * time.Millisecond)
  217. if hold.Active == nil {
  218. hold.Active = map[int64]time.Time{}
  219. }
  220. for id := range displacedHold {
  221. delete(hold.Active, id)
  222. }
  223. for id := range selected {
  224. hold.Active[id] = until
  225. }
  226. }
  227. admitted := make([]ScheduledCandidate, 0, len(selected))
  228. for _, cand := range ranked {
  229. if _, ok := selected[cand.Candidate.ID]; ok {
  230. admitted = append(admitted, cand)
  231. }
  232. }
  233. admission.Admitted = len(admitted)
  234. admission.Skipped = planned - admission.Admitted
  235. if admission.Skipped < 0 {
  236. admission.Skipped = 0
  237. }
  238. if hold != nil {
  239. admission.HoldActive = len(hold.Active)
  240. }
  241. admission.HoldSelected = len(held) - len(displacedHold)
  242. admission.HoldProtected = len(protected)
  243. admission.HoldExpired = len(expired)
  244. admission.HoldDisplaced = len(displacedHold)
  245. admission.Opportunistic = len(opportunistic)
  246. displacedByHold := map[int64]struct{}{}
  247. if len(admitted) > 0 {
  248. admission.PriorityCutoff = admitted[len(admitted)-1].Priority
  249. for _, cand := range ranked {
  250. if _, ok := selected[cand.Candidate.ID]; ok {
  251. continue
  252. }
  253. if cand.Priority >= admission.PriorityCutoff {
  254. if _, ok := displacedHold[cand.Candidate.ID]; ok {
  255. continue
  256. }
  257. displacedByHold[cand.Candidate.ID] = struct{}{}
  258. }
  259. }
  260. }
  261. admission.Displaced = len(displacedByHold) + len(displacedHold)
  262. admission.DisplacedByHold = len(displacedByHold)
  263. admission.PriorityTier = PriorityTierFromRange(admission.PriorityCutoff, plan.PriorityMin, plan.PriorityMax)
  264. admission.Pressure = buildRefinementPressure(budgetModel, admission)
  265. if admission.PriorityCutoff > 0 {
  266. admission.Reason = admissionReason("admission:budget", policy, holdPolicy, pressureReasonTag(admission.Pressure), "budget:"+slugToken(plan.BudgetSource))
  267. }
  268. plan.Selected = admitted
  269. plan.PriorityCutoff = admission.PriorityCutoff
  270. plan.DroppedByBudget = admission.Skipped
  271. for i := range workItems {
  272. item := &workItems[i]
  273. if item.Status != RefinementStatusPlanned {
  274. continue
  275. }
  276. id := item.Candidate.ID
  277. familyRankOut := familyRankForOutput(familyRankByID[id])
  278. if _, ok := selected[id]; ok {
  279. item.Status = RefinementStatusAdmitted
  280. item.Reason = RefinementReasonAdmitted
  281. class := AdmissionClassAdmit
  282. reason := "refinement:admit:budget"
  283. if _, wasHeld := held[id]; wasHeld {
  284. class = AdmissionClassHold
  285. reason = "refinement:admit:hold"
  286. }
  287. if item.Admission == nil {
  288. item.Admission = &PriorityAdmission{Basis: "refinement"}
  289. }
  290. item.Admission.Class = class
  291. item.Admission.Score = item.Priority
  292. item.Admission.Cutoff = admission.PriorityCutoff
  293. item.Admission.Tier = tierByID[id]
  294. item.Admission.TierFloor = familyFloorByID[id]
  295. item.Admission.Family = familyByID[id]
  296. item.Admission.FamilyRank = familyRankOut
  297. extras := []string{pressureReasonTag(admission.Pressure), "budget:" + slugToken(plan.BudgetSource)}
  298. if _, wasHeld := held[id]; wasHeld {
  299. extras = append(extras, "pressure:hold", ReasonTagHoldActive)
  300. if _, ok := protected[id]; ok {
  301. extras = append(extras, ReasonTagHoldProtected)
  302. }
  303. }
  304. if _, ok := opportunistic[id]; ok {
  305. extras = append(extras, "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced)
  306. }
  307. item.Admission.Reason = admissionReason(reason, policy, holdPolicy, extras...)
  308. continue
  309. }
  310. if _, ok := displacedHold[id]; ok {
  311. item.Status = RefinementStatusDisplaced
  312. item.Reason = RefinementReasonDisplaced
  313. if item.Admission == nil {
  314. item.Admission = &PriorityAdmission{Basis: "refinement"}
  315. }
  316. item.Admission.Class = AdmissionClassDisplace
  317. item.Admission.Score = item.Priority
  318. item.Admission.Cutoff = admission.PriorityCutoff
  319. item.Admission.Tier = tierByID[id]
  320. item.Admission.TierFloor = familyFloorByID[id]
  321. item.Admission.Family = familyByID[id]
  322. item.Admission.FamilyRank = familyRankOut
  323. item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", ReasonTagDisplaceOpportunist, ReasonTagDisplaceTier, ReasonTagHoldDisplaced, "budget:"+slugToken(plan.BudgetSource))
  324. continue
  325. }
  326. if _, ok := displacedByHold[id]; ok {
  327. item.Status = RefinementStatusDisplaced
  328. item.Reason = RefinementReasonDisplaced
  329. if item.Admission == nil {
  330. item.Admission = &PriorityAdmission{Basis: "refinement"}
  331. }
  332. item.Admission.Class = AdmissionClassDisplace
  333. item.Admission.Score = item.Priority
  334. item.Admission.Cutoff = admission.PriorityCutoff
  335. item.Admission.Tier = tierByID[id]
  336. item.Admission.TierFloor = familyFloorByID[id]
  337. item.Admission.Family = familyByID[id]
  338. item.Admission.FamilyRank = familyRankOut
  339. item.Admission.Reason = admissionReason("refinement:displace:hold", policy, holdPolicy, pressureReasonTag(admission.Pressure), "pressure:hold", ReasonTagHoldActive, "budget:"+slugToken(plan.BudgetSource))
  340. continue
  341. }
  342. item.Status = RefinementStatusSkipped
  343. item.Reason = RefinementReasonBudget
  344. if item.Admission == nil {
  345. item.Admission = &PriorityAdmission{Basis: "refinement"}
  346. }
  347. item.Admission.Class = AdmissionClassDefer
  348. item.Admission.Score = item.Priority
  349. item.Admission.Cutoff = admission.PriorityCutoff
  350. item.Admission.Tier = tierByID[id]
  351. item.Admission.TierFloor = familyFloorByID[id]
  352. item.Admission.Family = familyByID[id]
  353. item.Admission.FamilyRank = familyRankOut
  354. extras := []string{pressureReasonTag(admission.Pressure), "pressure:budget", "budget:" + slugToken(plan.BudgetSource)}
  355. if _, ok := expired[id]; ok {
  356. extras = append(extras, ReasonTagHoldExpired)
  357. }
  358. item.Admission.Reason = admissionReason("refinement:skip:budget", policy, holdPolicy, extras...)
  359. }
  360. return RefinementAdmissionResult{
  361. Plan: plan,
  362. WorkItems: workItems,
  363. Admitted: admitted,
  364. Admission: admission,
  365. }
  366. }
  367. func rankedContains(items []ScheduledCandidate, id int64) bool {
  368. for _, item := range items {
  369. if item.Candidate.ID == id {
  370. return true
  371. }
  372. }
  373. return false
  374. }
  // scaleHold multiplies a base hold (milliseconds) by mult and rounds to the
  // nearest integer, halves away from zero.
  //
  // The result is always in [0, max int]: a non-positive base, a NaN product
  // or a non-positive product yields 0, and a product too large for int
  // saturates. Without these guards the float-to-int conversion would be
  // implementation-dependent for NaN, infinities and out-of-range values, and
  // a negative multiplier would produce a negative hold.
  func scaleHold(base int, mult float64) int {
  	if base <= 0 {
  		return 0
  	}
  	scaled := math.Round(float64(base) * mult)
  	if math.IsNaN(scaled) || scaled <= 0 {
  		return 0
  	}
  	const maxInt = int(^uint(0) >> 1)
  	if scaled >= float64(maxInt) {
  		return maxInt
  	}
  	return int(scaled)
  }
  // profileContains reports whether profile contains token, ignoring case.
  // An empty profile or an empty token never matches.
  //
  // Both sides are lower-cased, so the result does not depend on the caller
  // having normalised profile beforehand.
  func profileContains(profile string, token string) bool {
  	if profile == "" || token == "" {
  		return false
  	}
  	return strings.Contains(strings.ToLower(profile), strings.ToLower(token))
  }
  // strategyContains reports whether strategy contains token, ignoring case.
  // An empty strategy or an empty token never matches.
  //
  // Both sides are lower-cased, so the result does not depend on the caller
  // having normalised strategy beforehand.
  func strategyContains(strategy string, token string) bool {
  	if strategy == "" || token == "" {
  		return false
  	}
  	return strings.Contains(strings.ToLower(strategy), strings.ToLower(token))
  }