Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

228 lignes
5.8KB

  1. package main
  2. import (
  3. "sort"
  4. "time"
  5. "sdr-wideband-suite/internal/pipeline"
  6. )
  7. type decisionQueueStats struct {
  8. RecordQueued int `json:"record_queued"`
  9. DecodeQueued int `json:"decode_queued"`
  10. RecordSelected int `json:"record_selected"`
  11. DecodeSelected int `json:"decode_selected"`
  12. RecordActive int `json:"record_active"`
  13. DecodeActive int `json:"decode_active"`
  14. RecordOldestS float64 `json:"record_oldest_sec"`
  15. DecodeOldestS float64 `json:"decode_oldest_sec"`
  16. RecordBudget int `json:"record_budget"`
  17. DecodeBudget int `json:"decode_budget"`
  18. HoldMs int `json:"hold_ms"`
  19. RecordHoldMs int `json:"record_hold_ms"`
  20. DecodeHoldMs int `json:"decode_hold_ms"`
  21. RecordDropped int `json:"record_dropped"`
  22. DecodeDropped int `json:"decode_dropped"`
  23. }
  24. type queuedDecision struct {
  25. ID int64
  26. SNRDb float64
  27. Hint string
  28. Class string
  29. FirstSeen time.Time
  30. LastSeen time.Time
  31. }
  32. type decisionQueues struct {
  33. record map[int64]*queuedDecision
  34. decode map[int64]*queuedDecision
  35. recordHold map[int64]time.Time
  36. decodeHold map[int64]time.Time
  37. }
  38. func newDecisionQueues() *decisionQueues {
  39. return &decisionQueues{
  40. record: map[int64]*queuedDecision{},
  41. decode: map[int64]*queuedDecision{},
  42. recordHold: map[int64]time.Time{},
  43. decodeHold: map[int64]time.Time{},
  44. }
  45. }
  46. func (dq *decisionQueues) Apply(decisions []pipeline.SignalDecision, budget pipeline.BudgetModel, now time.Time, policy pipeline.Policy) decisionQueueStats {
  47. if dq == nil {
  48. return decisionQueueStats{}
  49. }
  50. holdPolicy := pipeline.HoldPolicyFromPolicy(policy)
  51. recordHold := time.Duration(holdPolicy.RecordMs) * time.Millisecond
  52. decodeHold := time.Duration(holdPolicy.DecodeMs) * time.Millisecond
  53. recSeen := map[int64]bool{}
  54. decSeen := map[int64]bool{}
  55. for i := range decisions {
  56. id := decisions[i].Candidate.ID
  57. if id == 0 {
  58. continue
  59. }
  60. if decisions[i].ShouldRecord {
  61. qd := dq.record[id]
  62. if qd == nil {
  63. qd = &queuedDecision{ID: id, FirstSeen: now}
  64. dq.record[id] = qd
  65. }
  66. qd.SNRDb = decisions[i].Candidate.SNRDb
  67. qd.Hint = decisions[i].Candidate.Hint
  68. qd.Class = decisions[i].Class
  69. qd.LastSeen = now
  70. recSeen[id] = true
  71. }
  72. if decisions[i].ShouldAutoDecode {
  73. qd := dq.decode[id]
  74. if qd == nil {
  75. qd = &queuedDecision{ID: id, FirstSeen: now}
  76. dq.decode[id] = qd
  77. }
  78. qd.SNRDb = decisions[i].Candidate.SNRDb
  79. qd.Hint = decisions[i].Candidate.Hint
  80. qd.Class = decisions[i].Class
  81. qd.LastSeen = now
  82. decSeen[id] = true
  83. }
  84. }
  85. for id := range dq.record {
  86. if !recSeen[id] {
  87. delete(dq.record, id)
  88. }
  89. }
  90. for id := range dq.decode {
  91. if !decSeen[id] {
  92. delete(dq.decode, id)
  93. }
  94. }
  95. purgeExpired(dq.recordHold, now)
  96. purgeExpired(dq.decodeHold, now)
  97. recSelected := selectQueued("record", dq.record, dq.recordHold, budget.Record.Max, recordHold, now, policy)
  98. decSelected := selectQueued("decode", dq.decode, dq.decodeHold, budget.Decode.Max, decodeHold, now, policy)
  99. stats := decisionQueueStats{
  100. RecordQueued: len(dq.record),
  101. DecodeQueued: len(dq.decode),
  102. RecordSelected: len(recSelected),
  103. DecodeSelected: len(decSelected),
  104. RecordActive: len(dq.recordHold),
  105. DecodeActive: len(dq.decodeHold),
  106. RecordOldestS: oldestAge(dq.record, now),
  107. DecodeOldestS: oldestAge(dq.decode, now),
  108. RecordBudget: budget.Record.Max,
  109. DecodeBudget: budget.Decode.Max,
  110. HoldMs: budget.HoldMs,
  111. RecordHoldMs: holdPolicy.RecordMs,
  112. DecodeHoldMs: holdPolicy.DecodeMs,
  113. }
  114. for i := range decisions {
  115. id := decisions[i].Candidate.ID
  116. if decisions[i].ShouldRecord {
  117. if _, ok := recSelected[id]; !ok {
  118. decisions[i].ShouldRecord = false
  119. decisions[i].Reason = "queued: record budget"
  120. stats.RecordDropped++
  121. }
  122. }
  123. if decisions[i].ShouldAutoDecode {
  124. if _, ok := decSelected[id]; !ok {
  125. decisions[i].ShouldAutoDecode = false
  126. if decisions[i].Reason == "" {
  127. decisions[i].Reason = "queued: decode budget"
  128. }
  129. stats.DecodeDropped++
  130. }
  131. }
  132. }
  133. return stats
  134. }
  135. func selectQueued(queueName string, queue map[int64]*queuedDecision, hold map[int64]time.Time, max int, holdDur time.Duration, now time.Time, policy pipeline.Policy) map[int64]struct{} {
  136. selected := map[int64]struct{}{}
  137. if len(queue) == 0 {
  138. return selected
  139. }
  140. type scored struct {
  141. id int64
  142. score float64
  143. }
  144. scoredList := make([]scored, 0, len(queue))
  145. for id, qd := range queue {
  146. age := now.Sub(qd.FirstSeen).Seconds()
  147. boost := age / 2.0
  148. if boost > 5 {
  149. boost = 5
  150. }
  151. hint := qd.Hint
  152. if hint == "" {
  153. hint = qd.Class
  154. }
  155. policyBoost := pipeline.DecisionPriorityBoost(policy, hint, qd.Class, queueName)
  156. scoredList = append(scoredList, scored{id: id, score: qd.SNRDb + boost + policyBoost})
  157. }
  158. sort.Slice(scoredList, func(i, j int) bool {
  159. return scoredList[i].score > scoredList[j].score
  160. })
  161. limit := max
  162. if limit <= 0 || limit > len(scoredList) {
  163. limit = len(scoredList)
  164. }
  165. if len(hold) > 0 && len(hold) > limit {
  166. limit = len(hold)
  167. if limit > len(scoredList) {
  168. limit = len(scoredList)
  169. }
  170. }
  171. for id := range hold {
  172. if _, ok := queue[id]; ok {
  173. selected[id] = struct{}{}
  174. }
  175. }
  176. for _, s := range scoredList {
  177. if len(selected) >= limit {
  178. break
  179. }
  180. if _, ok := selected[s.id]; ok {
  181. continue
  182. }
  183. selected[s.id] = struct{}{}
  184. }
  185. if holdDur > 0 {
  186. for id := range selected {
  187. hold[id] = now.Add(holdDur)
  188. }
  189. }
  190. return selected
  191. }
  192. func purgeExpired(hold map[int64]time.Time, now time.Time) {
  193. for id, until := range hold {
  194. if now.After(until) {
  195. delete(hold, id)
  196. }
  197. }
  198. }
  199. func oldestAge(queue map[int64]*queuedDecision, now time.Time) float64 {
  200. oldest := 0.0
  201. first := true
  202. for _, qd := range queue {
  203. age := now.Sub(qd.FirstSeen).Seconds()
  204. if first || age > oldest {
  205. oldest = age
  206. first = false
  207. }
  208. }
  209. if first {
  210. return 0
  211. }
  212. return oldest
  213. }