Wideband autonomous SDR analysis engine forked from sdr-visual-suite
25'ten fazla konu seçemezsiniz. Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

170 satır
6.0KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "log"
  6. "os"
  7. "runtime/debug"
  8. "strings"
  9. "sync"
  10. "time"
  11. "sdr-wideband-suite/internal/config"
  12. "sdr-wideband-suite/internal/detector"
  13. "sdr-wideband-suite/internal/dsp"
  14. "sdr-wideband-suite/internal/recorder"
  15. )
  16. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot, extractMgr *extractionManager, phaseSnap *phaseSnapshot) {
  17. defer func() {
  18. if r := recover(); r != nil {
  19. log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack())
  20. }
  21. }()
  22. rt := newDSPRuntime(cfg, det, window, gpuState)
  23. ticker := time.NewTicker(cfg.FrameInterval())
  24. defer ticker.Stop()
  25. logTicker := time.NewTicker(5 * time.Second)
  26. defer logTicker.Stop()
  27. enc := json.NewEncoder(eventFile)
  28. dcBlocker := dsp.NewDCBlocker(0.995)
  29. state := &phaseState{}
  30. for {
  31. select {
  32. case <-ctx.Done():
  33. return
  34. case <-logTicker.C:
  35. st := srcMgr.Stats()
  36. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  37. case upd := <-updates:
  38. rt.applyUpdate(upd, srcMgr, rec, gpuState)
  39. dcBlocker.Reset()
  40. ticker.Reset(rt.cfg.FrameInterval())
  41. case <-ticker.C:
  42. art, err := rt.captureSpectrum(srcMgr, rec, dcBlocker, gpuState)
  43. if err != nil {
  44. log.Printf("read IQ: %v", err)
  45. if strings.Contains(err.Error(), "timeout") {
  46. if err := srcMgr.Restart(rt.cfg); err != nil {
  47. log.Printf("restart failed: %v", err)
  48. }
  49. }
  50. continue
  51. }
  52. if !rt.gotSamples {
  53. log.Printf("received IQ samples")
  54. rt.gotSamples = true
  55. }
  56. state.surveillance = rt.buildSurveillanceResult(art)
  57. state.refinement = rt.runRefinement(art, state.surveillance, extractMgr, rec)
  58. finished := state.surveillance.Finished
  59. thresholds := state.surveillance.Thresholds
  60. noiseFloor := state.surveillance.NoiseFloor
  61. var displaySignals []detector.Signal
  62. if len(art.detailIQ) > 0 {
  63. displaySignals = state.refinement.Result.Signals
  64. if rec != nil && len(displaySignals) > 0 && len(art.allIQ) > 0 {
  65. aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult}
  66. streamSnips, streamRates := extractForStreaming(extractMgr, art.allIQ, rt.cfg.SampleRate, rt.cfg.CenterHz, displaySignals, rt.streamPhaseState, rt.streamOverlap, aqCfg)
  67. items := make([]recorder.StreamFeedItem, 0, len(displaySignals))
  68. for j, ds := range displaySignals {
  69. if ds.ID == 0 || ds.Class == nil {
  70. continue
  71. }
  72. if j >= len(streamSnips) || len(streamSnips[j]) == 0 {
  73. continue
  74. }
  75. snipRate := rt.cfg.SampleRate
  76. if j < len(streamRates) && streamRates[j] > 0 {
  77. snipRate = streamRates[j]
  78. }
  79. items = append(items, recorder.StreamFeedItem{Signal: ds, Snippet: streamSnips[j], SnipRate: snipRate})
  80. }
  81. if len(items) > 0 {
  82. rec.FeedSnippets(items)
  83. }
  84. }
  85. rt.maintenance(displaySignals, rec)
  86. } else {
  87. displaySignals = rt.det.StableSignals()
  88. }
  89. state.arbitration = rt.arbitration
  90. state.presentation = state.surveillance.DisplayLevel
  91. if phaseSnap != nil {
  92. phaseSnap.Set(*state)
  93. }
  94. if sigSnap != nil {
  95. sigSnap.set(displaySignals)
  96. }
  97. eventMu.Lock()
  98. for _, ev := range finished {
  99. _ = enc.Encode(ev)
  100. }
  101. eventMu.Unlock()
  102. if rec != nil && len(finished) > 0 {
  103. evCopy := make([]detector.Event, len(finished))
  104. copy(evCopy, finished)
  105. rec.OnEvents(evCopy)
  106. }
  107. var debugInfo *SpectrumDebug
  108. plan := state.refinement.Input.Plan
  109. windowStats := buildWindowStats(state.refinement.Input.Windows)
  110. hasPlan := plan.TotalCandidates > 0 || plan.Budget > 0 || plan.DroppedBySNR > 0 || plan.DroppedByBudget > 0
  111. hasWindows := windowStats != nil && windowStats.Count > 0
  112. if len(thresholds) > 0 || len(displaySignals) > 0 || noiseFloor != 0 || hasPlan || hasWindows {
  113. scoreDebug := make([]map[string]any, 0, len(displaySignals))
  114. for _, s := range displaySignals {
  115. if s.Class == nil || len(s.Class.Scores) == 0 {
  116. scoreDebug = append(scoreDebug, map[string]any{"center_hz": s.CenterHz, "class": nil})
  117. continue
  118. }
  119. scores := make(map[string]float64, len(s.Class.Scores))
  120. for k, v := range s.Class.Scores {
  121. scores[string(k)] = v
  122. }
  123. scoreDebug = append(scoreDebug, map[string]any{
  124. "center_hz": s.CenterHz,
  125. "mod_type": s.Class.ModType,
  126. "confidence": s.Class.Confidence,
  127. "second_best": s.Class.SecondBest,
  128. "scores": scores,
  129. })
  130. }
  131. debugInfo = &SpectrumDebug{Thresholds: thresholds, NoiseFloor: noiseFloor, Scores: scoreDebug}
  132. candidateSources := buildCandidateSourceSummary(state.surveillance.Candidates)
  133. candidateEvidence := buildCandidateEvidenceSummary(state.surveillance.Candidates)
  134. candidateEvidenceStates := buildCandidateEvidenceStateSummary(state.surveillance.Candidates)
  135. if len(candidateSources) > 0 {
  136. debugInfo.CandidateSources = candidateSources
  137. }
  138. if len(candidateEvidence) > 0 {
  139. debugInfo.CandidateEvidence = candidateEvidence
  140. }
  141. if candidateEvidenceStates != nil {
  142. debugInfo.CandidateEvidenceStates = candidateEvidenceStates
  143. }
  144. if hasPlan {
  145. debugInfo.RefinementPlan = &plan
  146. }
  147. if hasWindows {
  148. debugInfo.Windows = windowStats
  149. }
  150. refinementDebug := &RefinementDebug{}
  151. if hasPlan {
  152. refinementDebug.Plan = &plan
  153. refinementDebug.Request = &state.refinement.Input.Request
  154. refinementDebug.WorkItems = state.refinement.Input.WorkItems
  155. }
  156. if hasWindows {
  157. refinementDebug.Windows = windowStats
  158. }
  159. refinementDebug.Arbitration = buildArbitrationSnapshot(state.refinement, state.arbitration)
  160. debugInfo.Refinement = refinementDebug
  161. }
  162. h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.surveillanceSpectrum, Signals: displaySignals, Debug: debugInfo})
  163. }
  164. }
  165. }