Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

826 řádků
28KB

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "math"
  6. "os"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "sdr-wideband-suite/internal/config"
  12. "sdr-wideband-suite/internal/demod/gpudemod"
  13. "sdr-wideband-suite/internal/detector"
  14. "sdr-wideband-suite/internal/dsp"
  15. "sdr-wideband-suite/internal/logging"
  16. "sdr-wideband-suite/internal/telemetry"
  17. )
  // mustParseDuration parses raw with time.ParseDuration and never fails:
// an empty or unparseable value yields fallback instead.
func mustParseDuration(raw string, fallback time.Duration) time.Duration {
	if raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err == nil {
			return parsed
		}
	}
	return fallback
}
  27. func buildDecoderMap(cfg config.Config) map[string]string {
  28. out := map[string]string{}
  29. if cfg.Decoder.FT8Cmd != "" {
  30. out["FT8"] = cfg.Decoder.FT8Cmd
  31. }
  32. if cfg.Decoder.WSPRCmd != "" {
  33. out["WSPR"] = cfg.Decoder.WSPRCmd
  34. }
  35. if cfg.Decoder.DMRCmd != "" {
  36. out["DMR"] = cfg.Decoder.DMRCmd
  37. }
  38. if cfg.Decoder.DStarCmd != "" {
  39. out["D-STAR"] = cfg.Decoder.DStarCmd
  40. }
  41. if cfg.Decoder.FSKCmd != "" {
  42. out["FSK"] = cfg.Decoder.FSKCmd
  43. }
  44. if cfg.Decoder.PSKCmd != "" {
  45. out["PSK"] = cfg.Decoder.PSKCmd
  46. }
  47. return out
  48. }
  49. func decoderKeys(cfg config.Config) []string {
  50. m := buildDecoderMap(cfg)
  51. keys := make([]string, 0, len(m))
  52. for k := range m {
  53. keys = append(keys, k)
  54. }
  55. sort.Strings(keys)
  56. return keys
  57. }
  58. func (m *extractionManager) reset() {
  59. if m == nil {
  60. return
  61. }
  62. m.mu.Lock()
  63. defer m.mu.Unlock()
  64. if m.runner != nil {
  65. m.runner.Close()
  66. m.runner = nil
  67. }
  68. }
  69. func (m *extractionManager) get(sampleCount int, sampleRate int) *gpudemod.BatchRunner {
  70. if m == nil || sampleCount <= 0 || sampleRate <= 0 || !gpudemod.Available() {
  71. return nil
  72. }
  73. m.mu.Lock()
  74. defer m.mu.Unlock()
  75. if m.runner != nil && sampleCount > m.maxSamples {
  76. m.runner.Close()
  77. m.runner = nil
  78. }
  79. if m.runner == nil {
  80. // Allocate generously: enough for full allIQ (sampleRate/10 ≈ 100ms)
  81. // so the runner never needs re-allocation when used for both
  82. // classification (FFT-block ~65k) and streaming (allIQ ~273k+).
  83. allocSize := sampleCount
  84. generous := sampleRate/10 + 1024 // ~400k at 4MHz — covers any scenario
  85. if generous > allocSize {
  86. allocSize = generous
  87. }
  88. if r, err := gpudemod.NewBatchRunner(allocSize, sampleRate); err == nil {
  89. m.runner = r
  90. m.maxSamples = allocSize
  91. } else {
  92. log.Printf("gpudemod: batch runner init failed: %v", err)
  93. }
  94. return m.runner
  95. }
  96. return m.runner
  97. }
  98. func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
  99. if len(iq) == 0 || sampleRate <= 0 {
  100. return nil
  101. }
  102. results, _ := extractSignalIQBatch(nil, iq, sampleRate, centerHz, []detector.Signal{{CenterHz: sigHz, BWHz: bwHz}})
  103. if len(results) == 0 {
  104. return nil
  105. }
  106. return results[0]
  107. }
  108. func extractSignalIQBatch(extractMgr *extractionManager, iq []complex64, sampleRate int, centerHz float64, signals []detector.Signal) ([][]complex64, []int) {
  109. out := make([][]complex64, len(signals))
  110. rates := make([]int, len(signals))
  111. if len(iq) == 0 || sampleRate <= 0 || len(signals) == 0 {
  112. return out, rates
  113. }
  114. decimTarget := 200000
  115. if decimTarget <= 0 {
  116. decimTarget = sampleRate
  117. }
  118. runner := extractMgr.get(len(iq), sampleRate)
  119. if runner != nil {
  120. jobs := make([]gpudemod.ExtractJob, len(signals))
  121. for i, sig := range signals {
  122. bw := sig.BWHz
  123. sigMHz := sig.CenterHz / 1e6
  124. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  125. jobOutRate := decimTarget
  126. if isWFM {
  127. jobOutRate = wfmStreamOutRate
  128. }
  129. // Minimum extraction BW: ensure enough bandwidth for demod features
  130. // FM broadcast (87.5-108 MHz) needs >=250kHz for stereo pilot + RDS at 57kHz
  131. // Also widen for any signal classified as WFM (in case of re-extraction)
  132. if isWFM {
  133. if bw < wfmStreamMinBW {
  134. bw = wfmStreamMinBW
  135. }
  136. } else if bw < 20000 {
  137. bw = 20000
  138. }
  139. jobs[i] = gpudemod.ExtractJob{OffsetHz: sig.CenterHz - centerHz, BW: bw, OutRate: jobOutRate}
  140. }
  141. if gpuOuts, gpuRates, err := runner.ShiftFilterDecimateBatch(iq, jobs); err == nil && len(gpuOuts) == len(signals) {
  142. // batch extraction OK (silent)
  143. for i := range gpuOuts {
  144. out[i] = gpuOuts[i]
  145. if i < len(gpuRates) {
  146. rates[i] = gpuRates[i]
  147. }
  148. }
  149. return out, rates
  150. } else if err != nil {
  151. log.Printf("gpudemod: batch extraction failed for %d signals: %v", len(signals), err)
  152. }
  153. }
  154. // CPU extraction fallback (silent — see batch extraction failed above if applicable)
  155. for i, sig := range signals {
  156. offset := sig.CenterHz - centerHz
  157. shifted := dsp.FreqShift(iq, sampleRate, offset)
  158. bw := sig.BWHz
  159. // FM broadcast (87.5-108 MHz) needs >=250kHz for stereo + RDS
  160. sigMHz := sig.CenterHz / 1e6
  161. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  162. if isWFM {
  163. if bw < wfmStreamMinBW {
  164. bw = wfmStreamMinBW
  165. }
  166. } else if bw < 20000 {
  167. bw = 20000
  168. }
  169. cutoff := bw / 2
  170. if cutoff < 200 {
  171. cutoff = 200
  172. }
  173. if cutoff > float64(sampleRate)/2-1 {
  174. cutoff = float64(sampleRate)/2 - 1
  175. }
  176. taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
  177. filtered := dsp.ApplyFIR(shifted, taps)
  178. decim := sampleRate / decimTarget
  179. if decim < 1 {
  180. decim = 1
  181. }
  182. out[i] = dsp.Decimate(filtered, decim)
  183. rates[i] = sampleRate / decim
  184. }
  185. return out, rates
  186. }
  // parseSince converts a "since" value into a time.Time.
//
// Accepted forms:
//   - "": the zero time, no error;
//   - a base-10 integer: Unix milliseconds when greater than 1e12, otherwise
//     Unix seconds;
//   - an RFC 3339 timestamp, with or without fractional seconds.
//
// Any other input returns the error from parsing it with time.RFC3339.
func parseSince(raw string) (time.Time, error) {
	if raw == "" {
		return time.Time{}, nil
	}
	n, err := strconv.ParseInt(raw, 10, 64)
	switch {
	case err == nil && n > 1e12:
		return time.UnixMilli(n), nil
	case err == nil:
		return time.Unix(n, 0), nil
	}
	ts, err := time.Parse(time.RFC3339Nano, raw)
	if err != nil {
		return time.Parse(time.RFC3339, raw)
	}
	return ts, nil
}
  type (
	// streamExtractState is the per-signal state kept across frames for
	// phase-continuous streaming extraction. The DSP loop stores one per
	// signal, keyed by signal ID.
	streamExtractState struct {
		phase float64 // frequency-shift phase accumulator, in radians
	}

	// streamIQOverlap keeps the last samples of the previous allIQ frame so
	// they can be prepended as FIR halo to the next frame.
	streamIQOverlap struct {
		tail []complex64
	}
)
  // extractionConfig carries the audio-quality knobs applied during streaming
// signal extraction. A non-positive value means "use the built-in fallback".
type extractionConfig struct {
	// firTaps (AQ-3) is the CPU FIR tap count; it also enlarges the carried
	// overlap to firTaps+64 samples when that exceeds streamOverlapLen.
	// Non-positive selects 101 taps.
	firTaps int
	// bwMult (AQ-5) multiplies each signal's bandwidth before extraction.
	// Non-positive is treated as 1.0; the configured default is noted as 1.2
	// (presumably set by the caller — confirm).
	bwMult float64
}
  const (
	// streamOverlapLen is the minimum number of input samples carried from
	// one frame to the next as FIR halo; it must be >= the FIR tap count
	// with some margin.
	streamOverlapLen = 512

	// wfmStreamOutRate is the target output sample rate (Hz) for wideband FM.
	wfmStreamOutRate = 512000
	// wfmStreamMinBW is the minimum extraction bandwidth (Hz) for wideband
	// FM, wide enough to keep the stereo pilot and RDS.
	wfmStreamMinBW = 250000
)
  // forceCPUStreamExtract is read once, at package initialisation, from the
// SDR_FORCE_CPU_STREAM_EXTRACT environment variable. Surrounding whitespace
// is ignored and the value uses strconv.ParseBool syntax. When it is true,
// the legacy streaming extractor skips the GPU runner and uses its CPU path.
// Unset, empty or malformed values leave it false.
var forceCPUStreamExtract = func() bool {
	enabled, err := strconv.ParseBool(strings.TrimSpace(os.Getenv("SDR_FORCE_CPU_STREAM_EXTRACT")))
	return err == nil && enabled
}()
  232. // extractForStreaming performs GPU-accelerated extraction with:
  233. // - Per-signal phase-continuous FreqShift (via PhaseStart in ExtractJob)
  234. // - IQ overlap prepended to allIQ so FIR kernel has real data in halo
  235. //
  236. // Returns extracted snippets with overlap trimmed, and updates phase state.
  237. // extractForStreaming is the current legacy production path.
  238. // It still relies on overlap-prepend + trim semantics and is intentionally
  239. // kept separate from the new streaming refactor/oracle path under development.
  240. func extractForStreaming(
  241. extractMgr *extractionManager,
  242. allIQ []complex64,
  243. sampleRate int,
  244. centerHz float64,
  245. signals []detector.Signal,
  246. phaseState map[int64]*streamExtractState,
  247. overlap *streamIQOverlap,
  248. aqCfg extractionConfig,
  249. coll *telemetry.Collector,
  250. ) ([][]complex64, []int) {
  251. if useStreamingProductionPath {
  252. if out, rates, err := extractForStreamingProduction(extractMgr, allIQ, sampleRate, centerHz, signals, aqCfg, coll); err == nil {
  253. return out, rates
  254. }
  255. }
  256. if useStreamingOraclePath {
  257. if out, rates, err := extractForStreamingOracle(allIQ, sampleRate, centerHz, signals, aqCfg, coll); err == nil {
  258. return out, rates
  259. }
  260. }
  261. out := make([][]complex64, len(signals))
  262. rates := make([]int, len(signals))
  263. if len(allIQ) == 0 || sampleRate <= 0 || len(signals) == 0 {
  264. return out, rates
  265. }
  266. // AQ-3: Use configured overlap length (must cover FIR taps)
  267. overlapNeeded := streamOverlapLen
  268. if aqCfg.firTaps > 0 && aqCfg.firTaps+64 > overlapNeeded {
  269. overlapNeeded = aqCfg.firTaps + 64
  270. }
  271. // Prepend overlap from previous frame so FIR kernel has real halo data
  272. var gpuIQ []complex64
  273. overlapLen := len(overlap.tail)
  274. logging.Debug("extract", "overlap", "len", overlapLen, "needed", overlapNeeded, "allIQ", len(allIQ))
  275. if overlapLen > 0 {
  276. gpuIQ = make([]complex64, overlapLen+len(allIQ))
  277. copy(gpuIQ, overlap.tail)
  278. copy(gpuIQ[overlapLen:], allIQ)
  279. } else {
  280. gpuIQ = allIQ
  281. overlapLen = 0
  282. }
  283. // Save tail for next frame (sized to cover configured FIR taps)
  284. if len(allIQ) > overlapNeeded {
  285. overlap.tail = append(overlap.tail[:0], allIQ[len(allIQ)-overlapNeeded:]...)
  286. } else {
  287. overlap.tail = append(overlap.tail[:0], allIQ...)
  288. }
  289. decimTarget := 200000
  290. // AQ-5: BW multiplier for extraction (wider = better S/N for weak signals)
  291. bwMult := aqCfg.bwMult
  292. if bwMult <= 0 {
  293. bwMult = 1.0
  294. }
  295. if coll != nil {
  296. coll.SetGauge("iq.extract.input.length", float64(len(allIQ)), nil)
  297. coll.SetGauge("iq.extract.input.overlap_length", float64(overlapLen), nil)
  298. headMean, tailMean, boundaryScore, _ := boundaryMetrics(overlap.tail, allIQ, 32)
  299. coll.SetGauge("iq.extract.input.head_mean_mag", headMean, nil)
  300. coll.SetGauge("iq.extract.input.prev_tail_mean_mag", tailMean, nil)
  301. coll.Observe("iq.extract.input.discontinuity_score", boundaryScore, nil)
  302. }
  303. rawBoundary := make(map[int64]boundaryProbeState, len(signals))
  304. trimmedBoundary := make(map[int64]boundaryProbeState, len(signals))
  305. // Build jobs with per-signal phase
  306. jobs := make([]gpudemod.ExtractJob, len(signals))
  307. for i, sig := range signals {
  308. bw := sig.BWHz * bwMult // AQ-5: widen extraction BW
  309. sigMHz := sig.CenterHz / 1e6
  310. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) ||
  311. (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  312. jobOutRate := decimTarget
  313. if isWFM {
  314. jobOutRate = wfmStreamOutRate
  315. if bw < wfmStreamMinBW {
  316. bw = wfmStreamMinBW
  317. }
  318. } else if bw < 20000 {
  319. bw = 20000
  320. }
  321. ps := phaseState[sig.ID]
  322. if ps == nil {
  323. ps = &streamExtractState{}
  324. phaseState[sig.ID] = ps
  325. }
  326. // PhaseStart is where the NEW data begins. But gpuIQ has overlap
  327. // prepended, so the GPU kernel starts processing at the overlap.
  328. // We need to rewind the phase by overlapLen samples so that the
  329. // overlap region gets the correct phase, and the new data region
  330. // starts at ps.phase exactly.
  331. phaseInc := -2.0 * math.Pi * (sig.CenterHz - centerHz) / float64(sampleRate)
  332. gpuPhaseStart := ps.phase - phaseInc*float64(overlapLen)
  333. jobs[i] = gpudemod.ExtractJob{
  334. OffsetHz: sig.CenterHz - centerHz,
  335. BW: bw,
  336. OutRate: jobOutRate,
  337. PhaseStart: gpuPhaseStart,
  338. }
  339. if coll != nil {
  340. tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID), "path", "gpu")
  341. inputHead := probeHead(gpuIQ, 16, 1e-6)
  342. coll.SetGauge("iq.extract.input_head.zero_count", float64(inputHead.zeroCount), tags)
  343. coll.SetGauge("iq.extract.input_head.first_nonzero_index", float64(inputHead.firstNonZeroIndex), tags)
  344. coll.SetGauge("iq.extract.input_head.max_step", inputHead.maxStep, tags)
  345. coll.Event("extract_input_head_probe", "info", "extractor input head probe", tags, map[string]any{
  346. "mags": inputHead.mags,
  347. "zero_count": inputHead.zeroCount,
  348. "first_nonzero_index": inputHead.firstNonZeroIndex,
  349. "head_max_step": inputHead.maxStep,
  350. "center_offset_hz": jobs[i].OffsetHz,
  351. "bandwidth_hz": bw,
  352. "out_rate": jobOutRate,
  353. "trim_samples": (overlapLen + int(math.Max(1, math.Round(float64(sampleRate)/float64(jobOutRate)))) - 1) / int(math.Max(1, math.Round(float64(sampleRate)/float64(jobOutRate)))),
  354. })
  355. }
  356. }
  357. // Try GPU BatchRunner with phase unless CPU-only debug is forced.
  358. var runner *gpudemod.BatchRunner
  359. if forceCPUStreamExtract {
  360. logging.Warn("boundary", "force_cpu_stream_extract", "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "signals", len(signals))
  361. } else {
  362. runner = extractMgr.get(len(gpuIQ), sampleRate)
  363. }
  364. if runner != nil {
  365. if coll != nil && len(gpuIQ) > 0 {
  366. inputProbe := probeHead(gpuIQ, 16, 1e-6)
  367. coll.Event("gpu_kernel_input_head_probe", "info", "gpu kernel input head probe", nil, map[string]any{
  368. "mags": inputProbe.mags,
  369. "zero_count": inputProbe.zeroCount,
  370. "first_nonzero_index": inputProbe.firstNonZeroIndex,
  371. "head_max_step": inputProbe.maxStep,
  372. "gpuIQ_len": len(gpuIQ),
  373. "sample_rate": sampleRate,
  374. "signals": len(signals),
  375. })
  376. }
  377. results, err := runner.ShiftFilterDecimateBatchWithPhase(gpuIQ, jobs)
  378. if err == nil && len(results) == len(signals) {
  379. for i, res := range results {
  380. outRate := res.Rate
  381. if outRate <= 0 {
  382. outRate = decimTarget
  383. }
  384. sigMHz := signals[i].CenterHz / 1e6
  385. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (signals[i].Class != nil && (signals[i].Class.ModType == "WFM" || signals[i].Class.ModType == "WFM_STEREO"))
  386. if isWFM {
  387. outRate = wfmStreamOutRate
  388. }
  389. decim := sampleRate / outRate
  390. if decim < 1 {
  391. decim = 1
  392. }
  393. trimSamples := (overlapLen + decim - 1) / decim
  394. if i == 0 {
  395. logging.Debug("extract", "gpu_result", "rate", res.Rate, "outRate", outRate, "decim", decim, "trim", trimSamples)
  396. }
  397. // Update phase state — advance only by NEW data length, not overlap
  398. phaseInc := -2.0 * math.Pi * jobs[i].OffsetHz / float64(sampleRate)
  399. phaseState[signals[i].ID].phase += phaseInc * float64(len(allIQ))
  400. // Normalize to [-π, π) to prevent float64 drift over long runs
  401. phaseState[signals[i].ID].phase = math.Remainder(phaseState[signals[i].ID].phase, 2*math.Pi)
  402. // Trim overlap from output
  403. iq := res.IQ
  404. rawLen := len(iq)
  405. if trimSamples > 0 && trimSamples < len(iq) {
  406. iq = iq[trimSamples:]
  407. }
  408. if i == 0 {
  409. logging.Debug("boundary", "extract_trim", "path", "gpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(iq), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID)
  410. logExtractorHeadComparison(signals[i].ID, "gpu", overlapLen, res.IQ, trimSamples, iq)
  411. }
  412. if coll != nil {
  413. tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", signals[i].ID), "path", "gpu")
  414. kernelProbe := probeHead(res.IQ, 16, 1e-6)
  415. coll.Event("gpu_kernel_output_head_probe", "info", "gpu kernel output head probe", tags, map[string]any{
  416. "mags": kernelProbe.mags,
  417. "zero_count": kernelProbe.zeroCount,
  418. "first_nonzero_index": kernelProbe.firstNonZeroIndex,
  419. "head_max_step": kernelProbe.maxStep,
  420. "raw_len": rawLen,
  421. "out_rate": outRate,
  422. "trim_samples": trimSamples,
  423. })
  424. stats := computeIQHeadStats(iq, 64)
  425. coll.SetGauge("iq.extract.output.length", float64(len(iq)), tags)
  426. coll.Observe("iq.extract.output.head_mean_mag", stats.meanMag, tags)
  427. coll.Observe("iq.extract.output.head_min_mag", stats.minMag, tags)
  428. coll.Observe("iq.extract.output.head_max_step", stats.maxStep, tags)
  429. coll.Observe("iq.extract.output.head_p95_step", stats.p95Step, tags)
  430. coll.Observe("iq.extract.output.head_tail_ratio", stats.headTail, tags)
  431. coll.SetGauge("iq.extract.output.head_low_magnitude_count", float64(stats.lowMag), tags)
  432. coll.SetGauge("iq.extract.raw.length", float64(rawLen), tags)
  433. coll.SetGauge("iq.extract.trim.trim_samples", float64(trimSamples), tags)
  434. if rawLen > 0 {
  435. coll.SetGauge("iq.extract.raw.head_mag", math.Hypot(float64(real(res.IQ[0])), float64(imag(res.IQ[0]))), tags)
  436. coll.SetGauge("iq.extract.raw.tail_mag", math.Hypot(float64(real(res.IQ[rawLen-1])), float64(imag(res.IQ[rawLen-1]))), tags)
  437. rawHead := probeHead(res.IQ, 16, 1e-6)
  438. coll.SetGauge("iq.extract.raw.head_zero_count", float64(rawHead.zeroCount), tags)
  439. coll.SetGauge("iq.extract.raw.first_nonzero_index", float64(rawHead.firstNonZeroIndex), tags)
  440. coll.SetGauge("iq.extract.raw.head_max_step", rawHead.maxStep, tags)
  441. coll.Event("extract_raw_head_probe", "info", "raw extractor head probe", tags, map[string]any{
  442. "mags": rawHead.mags,
  443. "zero_count": rawHead.zeroCount,
  444. "first_nonzero_index": rawHead.firstNonZeroIndex,
  445. "head_max_step": rawHead.maxStep,
  446. "trim_samples": trimSamples,
  447. })
  448. }
  449. if len(iq) > 0 {
  450. coll.SetGauge("iq.extract.trimmed.head_mag", math.Hypot(float64(real(iq[0])), float64(imag(iq[0]))), tags)
  451. coll.SetGauge("iq.extract.trimmed.tail_mag", math.Hypot(float64(real(iq[len(iq)-1])), float64(imag(iq[len(iq)-1]))), tags)
  452. trimmedHead := probeHead(iq, 16, 1e-6)
  453. coll.SetGauge("iq.extract.trimmed.head_zero_count", float64(trimmedHead.zeroCount), tags)
  454. coll.SetGauge("iq.extract.trimmed.first_nonzero_index", float64(trimmedHead.firstNonZeroIndex), tags)
  455. coll.SetGauge("iq.extract.trimmed.head_max_step", trimmedHead.maxStep, tags)
  456. coll.Event("extract_trimmed_head_probe", "info", "trimmed extractor head probe", tags, map[string]any{
  457. "mags": trimmedHead.mags,
  458. "zero_count": trimmedHead.zeroCount,
  459. "first_nonzero_index": trimmedHead.firstNonZeroIndex,
  460. "head_max_step": trimmedHead.maxStep,
  461. "trim_samples": trimSamples,
  462. })
  463. }
  464. if rb := rawBoundary[signals[i].ID]; rb.set && rawLen > 0 {
  465. prevMag := math.Hypot(float64(real(rb.last)), float64(imag(rb.last)))
  466. currMag := math.Hypot(float64(real(res.IQ[0])), float64(imag(res.IQ[0])))
  467. coll.SetGauge("iq.extract.raw.boundary.prev_tail_mag", prevMag, tags)
  468. coll.SetGauge("iq.extract.raw.boundary.curr_head_mag", currMag, tags)
  469. coll.Event("extract_raw_boundary", "info", "raw extractor boundary", tags, map[string]any{
  470. "delta_mag": math.Abs(currMag - prevMag),
  471. "trim_samples": trimSamples,
  472. "raw_len": rawLen,
  473. })
  474. }
  475. if tb := trimmedBoundary[signals[i].ID]; tb.set && len(iq) > 0 {
  476. prevMag := math.Hypot(float64(real(tb.last)), float64(imag(tb.last)))
  477. currMag := math.Hypot(float64(real(iq[0])), float64(imag(iq[0])))
  478. coll.SetGauge("iq.extract.trimmed.boundary.prev_tail_mag", prevMag, tags)
  479. coll.SetGauge("iq.extract.trimmed.boundary.curr_head_mag", currMag, tags)
  480. coll.Event("extract_trimmed_boundary", "info", "trimmed extractor boundary", tags, map[string]any{
  481. "delta_mag": math.Abs(currMag - prevMag),
  482. "trim_samples": trimSamples,
  483. "out_len": len(iq),
  484. })
  485. }
  486. }
  487. if rawLen > 0 {
  488. rawBoundary[signals[i].ID] = boundaryProbeState{last: res.IQ[rawLen-1], set: true}
  489. }
  490. if len(iq) > 0 {
  491. trimmedBoundary[signals[i].ID] = boundaryProbeState{last: iq[len(iq)-1], set: true}
  492. }
  493. out[i] = iq
  494. rates[i] = res.Rate
  495. }
  496. return out, rates
  497. } else if err != nil {
  498. log.Printf("gpudemod: stream batch extraction failed: %v", err)
  499. }
  500. }
  501. // CPU fallback (with phase tracking)
  502. for i, sig := range signals {
  503. offset := sig.CenterHz - centerHz
  504. bw := jobs[i].BW
  505. ps := phaseState[sig.ID]
  506. // Phase-continuous FreqShift — rewind by overlap so new data starts at ps.phase
  507. shifted := make([]complex64, len(gpuIQ))
  508. inc := -2.0 * math.Pi * offset / float64(sampleRate)
  509. phase := ps.phase - inc*float64(overlapLen)
  510. for k, v := range gpuIQ {
  511. phase += inc
  512. re := math.Cos(phase)
  513. im := math.Sin(phase)
  514. shifted[k] = complex(
  515. float32(float64(real(v))*re-float64(imag(v))*im),
  516. float32(float64(real(v))*im+float64(imag(v))*re),
  517. )
  518. }
  519. // Advance phase by NEW data length only
  520. ps.phase += inc * float64(len(allIQ))
  521. ps.phase = math.Remainder(ps.phase, 2*math.Pi)
  522. cutoff := bw / 2
  523. if cutoff < 200 {
  524. cutoff = 200
  525. }
  526. if cutoff > float64(sampleRate)/2-1 {
  527. cutoff = float64(sampleRate)/2 - 1
  528. }
  529. firTaps := 101
  530. if aqCfg.firTaps > 0 {
  531. firTaps = aqCfg.firTaps
  532. }
  533. taps := dsp.LowpassFIR(cutoff, sampleRate, firTaps)
  534. filtered := dsp.ApplyFIR(shifted, taps)
  535. sigMHz := sig.CenterHz / 1e6
  536. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  537. outRate := decimTarget
  538. if isWFM {
  539. outRate = wfmStreamOutRate
  540. }
  541. decim := sampleRate / outRate
  542. if decim < 1 {
  543. decim = 1
  544. }
  545. decimated := dsp.Decimate(filtered, decim)
  546. rates[i] = sampleRate / decim
  547. // Trim overlap — use ceil to ensure ALL overlap samples are removed.
  548. // Floor trim (overlapLen/decim) leaves a remainder for non-divisible
  549. // factors (e.g. 512/20=25 trims only 500 of 512 samples → 12 leak).
  550. trimSamples := (overlapLen + decim - 1) / decim
  551. if i == 0 {
  552. logging.Debug("extract", "cpu_result", "outRate", outRate, "decim", decim, "trim", trimSamples)
  553. }
  554. rawIQ := decimated
  555. rawLen := len(rawIQ)
  556. if trimSamples > 0 && trimSamples < len(decimated) {
  557. decimated = decimated[trimSamples:]
  558. }
  559. if i == 0 {
  560. logging.Debug("boundary", "extract_trim", "path", "cpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(decimated), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID)
  561. logExtractorHeadComparison(signals[i].ID, "cpu", overlapLen, decimated, trimSamples, decimated)
  562. }
  563. if coll != nil {
  564. tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", signals[i].ID), "path", "cpu")
  565. stats := computeIQHeadStats(decimated, 64)
  566. coll.SetGauge("iq.extract.output.length", float64(len(decimated)), tags)
  567. coll.Observe("iq.extract.output.head_mean_mag", stats.meanMag, tags)
  568. coll.Observe("iq.extract.output.head_min_mag", stats.minMag, tags)
  569. coll.Observe("iq.extract.output.head_max_step", stats.maxStep, tags)
  570. coll.Observe("iq.extract.output.head_p95_step", stats.p95Step, tags)
  571. coll.Observe("iq.extract.output.head_tail_ratio", stats.headTail, tags)
  572. coll.SetGauge("iq.extract.output.head_low_magnitude_count", float64(stats.lowMag), tags)
  573. coll.SetGauge("iq.extract.raw.length", float64(rawLen), tags)
  574. coll.SetGauge("iq.extract.trim.trim_samples", float64(trimSamples), tags)
  575. if rb := rawBoundary[signals[i].ID]; rb.set && rawLen > 0 {
  576. observeBoundarySample(coll, "iq.extract.raw.boundary", tags, rb.last, rawIQ[0])
  577. }
  578. if tb := trimmedBoundary[signals[i].ID]; tb.set && len(decimated) > 0 {
  579. observeBoundarySample(coll, "iq.extract.trimmed.boundary", tags, tb.last, decimated[0])
  580. }
  581. }
  582. if rawLen > 0 {
  583. rawBoundary[signals[i].ID] = boundaryProbeState{last: rawIQ[rawLen-1], set: true}
  584. }
  585. if len(decimated) > 0 {
  586. trimmedBoundary[signals[i].ID] = boundaryProbeState{last: decimated[len(decimated)-1], set: true}
  587. }
  588. out[i] = decimated
  589. }
  590. return out, rates
  591. }
  type (
	// iqHeadStats summarises the leading samples of an IQ slice; see
	// computeIQHeadStats for how each field is filled in.
	iqHeadStats struct {
		length      int
		minMag      float64
		maxMag      float64
		meanMag     float64
		lowMag      int
		maxStep     float64
		maxStepIdx  int
		p95Step     float64
		headTail    float64
		headMinIdx  int
		stepSamples []float64
	}

	// boundaryProbeState remembers the last sample emitted for a signal so the
	// first sample of the next block can be compared against it; set reports
	// whether last holds a real sample.
	boundaryProbeState struct {
		last complex64
		set  bool
	}

	// headProbe is a small magnitude / phase-step probe over the first few
	// samples of an IQ slice, as produced by probeHead.
	headProbe struct {
		zeroCount         int
		firstNonZeroIndex int
		maxStep           float64
		mags              []float64
	}
)

// probeHead inspects at most n leading samples and reports:
//   - mags: the magnitude of each inspected sample;
//   - zeroCount: how many magnitudes are <= zeroThreshold (a non-positive
//     threshold selects 1e-6);
//   - firstNonZeroIndex: index of the first magnitude above the threshold,
//     or -1 if none;
//   - maxStep: the largest absolute phase change (radians) between
//     neighbouring inspected samples.
//
// For n <= 0 or empty input it returns firstNonZeroIndex -1, nil mags and
// zero counts.
func probeHead(samples []complex64, n int, zeroThreshold float64) headProbe {
	probe := headProbe{firstNonZeroIndex: -1}
	if n <= 0 || len(samples) == 0 {
		return probe
	}
	n = min(n, len(samples))
	if zeroThreshold <= 0 {
		zeroThreshold = 1e-6
	}
	probe.mags = make([]float64, n)
	var prev complex64
	for idx, cur := range samples[:n] {
		mag := math.Hypot(float64(real(cur)), float64(imag(cur)))
		probe.mags[idx] = mag
		switch {
		case mag <= zeroThreshold:
			probe.zeroCount++
		case probe.firstNonZeroIndex == -1:
			probe.firstNonZeroIndex = idx
		}
		if idx > 0 {
			cross := float64(real(prev))*float64(imag(cur)) - float64(imag(prev))*float64(real(cur))
			dot := float64(real(prev))*float64(real(cur)) + float64(imag(prev))*float64(imag(cur))
			if step := math.Abs(math.Atan2(cross, dot)); step > probe.maxStep {
				probe.maxStep = step
			}
		}
		prev = cur
	}
	return probe
}
  647. func observeBoundarySample(coll *telemetry.Collector, metricPrefix string, tags map[string]string, prev complex64, curr complex64) {
  648. prevMag := math.Hypot(float64(real(prev)), float64(imag(prev)))
  649. currMag := math.Hypot(float64(real(curr)), float64(imag(curr)))
  650. deltaMag := math.Abs(currMag - prevMag)
  651. num := float64(real(prev))*float64(imag(curr)) - float64(imag(prev))*float64(real(curr))
  652. den := float64(real(prev))*float64(real(curr)) + float64(imag(prev))*float64(imag(curr))
  653. deltaPhase := math.Abs(math.Atan2(num, den))
  654. d2 := float64(real(curr-prev))*float64(real(curr-prev)) + float64(imag(curr-prev))*float64(imag(curr-prev))
  655. coll.Observe(metricPrefix+".delta_mag", deltaMag, tags)
  656. coll.Observe(metricPrefix+".delta_phase", deltaPhase, tags)
  657. coll.Observe(metricPrefix+".d2", d2, tags)
  658. coll.Observe(metricPrefix+".discontinuity_score", deltaMag+deltaPhase, tags)
  659. }
  660. func computeIQHeadStats(iq []complex64, headLen int) iqHeadStats {
  661. stats := iqHeadStats{minMag: math.MaxFloat64, headMinIdx: -1, maxStepIdx: -1}
  662. if len(iq) == 0 {
  663. stats.minMag = 0
  664. return stats
  665. }
  666. n := len(iq)
  667. if headLen > 0 && headLen < n {
  668. n = headLen
  669. }
  670. stats.length = n
  671. stats.stepSamples = make([]float64, 0, max(0, n-1))
  672. sumMag := 0.0
  673. headSum := 0.0
  674. tailSum := 0.0
  675. tailCount := 0
  676. for i := 0; i < n; i++ {
  677. v := iq[i]
  678. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  679. if mag < stats.minMag {
  680. stats.minMag = mag
  681. stats.headMinIdx = i
  682. }
  683. if mag > stats.maxMag {
  684. stats.maxMag = mag
  685. }
  686. sumMag += mag
  687. if mag < 0.05 {
  688. stats.lowMag++
  689. }
  690. if i < min(16, n) {
  691. headSum += mag
  692. }
  693. if i >= max(0, n-16) {
  694. tailSum += mag
  695. tailCount++
  696. }
  697. if i > 0 {
  698. p := iq[i-1]
  699. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  700. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  701. step := math.Abs(math.Atan2(num, den))
  702. if step > stats.maxStep {
  703. stats.maxStep = step
  704. stats.maxStepIdx = i - 1
  705. }
  706. stats.stepSamples = append(stats.stepSamples, step)
  707. }
  708. }
  709. stats.meanMag = sumMag / float64(n)
  710. if len(stats.stepSamples) > 0 {
  711. sorted := append([]float64(nil), stats.stepSamples...)
  712. sort.Float64s(sorted)
  713. idx := int(float64(len(sorted)-1) * 0.95)
  714. stats.p95Step = sorted[idx]
  715. } else {
  716. stats.p95Step = stats.maxStep
  717. }
  718. if headSum > 0 && tailCount > 0 {
  719. headMean := headSum / float64(min(16, n))
  720. tailMean := tailSum / float64(tailCount)
  721. if tailMean > 0 {
  722. stats.headTail = headMean / tailMean
  723. }
  724. }
  725. return stats
  726. }
  727. func observeIQStats(coll *telemetry.Collector, stage string, iq []complex64, tags telemetry.Tags) {
  728. if coll == nil || len(iq) == 0 {
  729. return
  730. }
  731. stats := computeIQHeadStats(iq, len(iq))
  732. stageTags := telemetry.TagsWith(tags, "stage", stage)
  733. coll.Observe("iq.magnitude.min", stats.minMag, stageTags)
  734. coll.Observe("iq.magnitude.max", stats.maxMag, stageTags)
  735. coll.Observe("iq.magnitude.mean", stats.meanMag, stageTags)
  736. coll.Observe("iq.phase_step.max", stats.maxStep, stageTags)
  737. coll.Observe("iq.phase_step.p95", stats.p95Step, stageTags)
  738. coll.Observe("iq.low_magnitude.count", float64(stats.lowMag), stageTags)
  739. coll.SetGauge("iq.length", float64(stats.length), stageTags)
  740. }
  741. func logExtractorHeadComparison(signalID int64, path string, overlapLen int, raw []complex64, trimSamples int, out []complex64) {
  742. rawStats := computeIQHeadStats(raw, 96)
  743. trimmedStats := computeIQHeadStats(out, 96)
  744. logging.Debug("boundary", "extract_head_compare",
  745. "signal", signalID,
  746. "path", path,
  747. "raw_len", len(raw),
  748. "trim", trimSamples,
  749. "out_len", len(out),
  750. "overlap_len", overlapLen,
  751. "raw_min_mag", rawStats.minMag,
  752. "raw_min_idx", rawStats.headMinIdx,
  753. "raw_max_step", rawStats.maxStep,
  754. "raw_max_step_idx", rawStats.maxStepIdx,
  755. "raw_head_tail", rawStats.headTail,
  756. "trimmed_min_mag", trimmedStats.minMag,
  757. "trimmed_min_idx", trimmedStats.headMinIdx,
  758. "trimmed_max_step", trimmedStats.maxStep,
  759. "trimmed_max_step_idx", trimmedStats.maxStepIdx,
  760. "trimmed_head_tail", trimmedStats.headTail,
  761. )
  762. for _, off := range []int{2, 4, 8, 16} {
  763. if len(out) <= off+8 {
  764. continue
  765. }
  766. offStats := computeIQHeadStats(out[off:], 96)
  767. logging.Debug("boundary", "extract_head_offset_compare",
  768. "signal", signalID,
  769. "path", path,
  770. "offset", off,
  771. "base_min_mag", trimmedStats.minMag,
  772. "base_min_idx", trimmedStats.headMinIdx,
  773. "base_max_step", trimmedStats.maxStep,
  774. "base_max_step_idx", trimmedStats.maxStepIdx,
  775. "offset_min_mag", offStats.minMag,
  776. "offset_min_idx", offStats.headMinIdx,
  777. "offset_max_step", offStats.maxStep,
  778. "offset_max_step_idx", offStats.maxStepIdx,
  779. "offset_head_tail", offStats.headTail,
  780. )
  781. }
  782. }