Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

813 wiersze
27KB

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "math"
  6. "os"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "sdr-wideband-suite/internal/config"
  12. "sdr-wideband-suite/internal/demod/gpudemod"
  13. "sdr-wideband-suite/internal/detector"
  14. "sdr-wideband-suite/internal/dsp"
  15. "sdr-wideband-suite/internal/logging"
  16. "sdr-wideband-suite/internal/telemetry"
  17. )
  18. func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  19. if raw == "" {
  20. return fallback
  21. }
  22. if d, err := time.ParseDuration(raw); err == nil {
  23. return d
  24. }
  25. return fallback
  26. }
  27. func buildDecoderMap(cfg config.Config) map[string]string {
  28. out := map[string]string{}
  29. if cfg.Decoder.FT8Cmd != "" {
  30. out["FT8"] = cfg.Decoder.FT8Cmd
  31. }
  32. if cfg.Decoder.WSPRCmd != "" {
  33. out["WSPR"] = cfg.Decoder.WSPRCmd
  34. }
  35. if cfg.Decoder.DMRCmd != "" {
  36. out["DMR"] = cfg.Decoder.DMRCmd
  37. }
  38. if cfg.Decoder.DStarCmd != "" {
  39. out["D-STAR"] = cfg.Decoder.DStarCmd
  40. }
  41. if cfg.Decoder.FSKCmd != "" {
  42. out["FSK"] = cfg.Decoder.FSKCmd
  43. }
  44. if cfg.Decoder.PSKCmd != "" {
  45. out["PSK"] = cfg.Decoder.PSKCmd
  46. }
  47. return out
  48. }
  49. func decoderKeys(cfg config.Config) []string {
  50. m := buildDecoderMap(cfg)
  51. keys := make([]string, 0, len(m))
  52. for k := range m {
  53. keys = append(keys, k)
  54. }
  55. sort.Strings(keys)
  56. return keys
  57. }
  58. func (m *extractionManager) reset() {
  59. if m == nil {
  60. return
  61. }
  62. m.mu.Lock()
  63. defer m.mu.Unlock()
  64. if m.runner != nil {
  65. m.runner.Close()
  66. m.runner = nil
  67. }
  68. }
  69. func (m *extractionManager) get(sampleCount int, sampleRate int) *gpudemod.BatchRunner {
  70. if m == nil || sampleCount <= 0 || sampleRate <= 0 || !gpudemod.Available() {
  71. return nil
  72. }
  73. m.mu.Lock()
  74. defer m.mu.Unlock()
  75. if m.runner != nil && sampleCount > m.maxSamples {
  76. m.runner.Close()
  77. m.runner = nil
  78. }
  79. if m.runner == nil {
  80. // Allocate generously: enough for full allIQ (sampleRate/10 ≈ 100ms)
  81. // so the runner never needs re-allocation when used for both
  82. // classification (FFT-block ~65k) and streaming (allIQ ~273k+).
  83. allocSize := sampleCount
  84. generous := sampleRate/10 + 1024 // ~400k at 4MHz — covers any scenario
  85. if generous > allocSize {
  86. allocSize = generous
  87. }
  88. if r, err := gpudemod.NewBatchRunner(allocSize, sampleRate); err == nil {
  89. m.runner = r
  90. m.maxSamples = allocSize
  91. } else {
  92. log.Printf("gpudemod: batch runner init failed: %v", err)
  93. }
  94. return m.runner
  95. }
  96. return m.runner
  97. }
  98. func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
  99. if len(iq) == 0 || sampleRate <= 0 {
  100. return nil
  101. }
  102. results, _ := extractSignalIQBatch(nil, iq, sampleRate, centerHz, []detector.Signal{{CenterHz: sigHz, BWHz: bwHz}})
  103. if len(results) == 0 {
  104. return nil
  105. }
  106. return results[0]
  107. }
  108. func extractSignalIQBatch(extractMgr *extractionManager, iq []complex64, sampleRate int, centerHz float64, signals []detector.Signal) ([][]complex64, []int) {
  109. out := make([][]complex64, len(signals))
  110. rates := make([]int, len(signals))
  111. if len(iq) == 0 || sampleRate <= 0 || len(signals) == 0 {
  112. return out, rates
  113. }
  114. decimTarget := 200000
  115. if decimTarget <= 0 {
  116. decimTarget = sampleRate
  117. }
  118. runner := extractMgr.get(len(iq), sampleRate)
  119. if runner != nil {
  120. jobs := make([]gpudemod.ExtractJob, len(signals))
  121. for i, sig := range signals {
  122. bw := sig.BWHz
  123. sigMHz := sig.CenterHz / 1e6
  124. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  125. jobOutRate := decimTarget
  126. if isWFM {
  127. jobOutRate = wfmStreamOutRate
  128. }
  129. // Minimum extraction BW: ensure enough bandwidth for demod features
  130. // FM broadcast (87.5-108 MHz) needs >=250kHz for stereo pilot + RDS at 57kHz
  131. // Also widen for any signal classified as WFM (in case of re-extraction)
  132. if isWFM {
  133. if bw < wfmStreamMinBW {
  134. bw = wfmStreamMinBW
  135. }
  136. } else if bw < 20000 {
  137. bw = 20000
  138. }
  139. jobs[i] = gpudemod.ExtractJob{OffsetHz: sig.CenterHz - centerHz, BW: bw, OutRate: jobOutRate}
  140. }
  141. if gpuOuts, gpuRates, err := runner.ShiftFilterDecimateBatch(iq, jobs); err == nil && len(gpuOuts) == len(signals) {
  142. // batch extraction OK (silent)
  143. for i := range gpuOuts {
  144. out[i] = gpuOuts[i]
  145. if i < len(gpuRates) {
  146. rates[i] = gpuRates[i]
  147. }
  148. }
  149. return out, rates
  150. } else if err != nil {
  151. log.Printf("gpudemod: batch extraction failed for %d signals: %v", len(signals), err)
  152. }
  153. }
  154. // CPU extraction fallback (silent — see batch extraction failed above if applicable)
  155. for i, sig := range signals {
  156. offset := sig.CenterHz - centerHz
  157. shifted := dsp.FreqShift(iq, sampleRate, offset)
  158. bw := sig.BWHz
  159. // FM broadcast (87.5-108 MHz) needs >=250kHz for stereo + RDS
  160. sigMHz := sig.CenterHz / 1e6
  161. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  162. if isWFM {
  163. if bw < wfmStreamMinBW {
  164. bw = wfmStreamMinBW
  165. }
  166. } else if bw < 20000 {
  167. bw = 20000
  168. }
  169. cutoff := bw / 2
  170. if cutoff < 200 {
  171. cutoff = 200
  172. }
  173. if cutoff > float64(sampleRate)/2-1 {
  174. cutoff = float64(sampleRate)/2 - 1
  175. }
  176. taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
  177. filtered := dsp.ApplyFIR(shifted, taps)
  178. decim := sampleRate / decimTarget
  179. if decim < 1 {
  180. decim = 1
  181. }
  182. out[i] = dsp.Decimate(filtered, decim)
  183. rates[i] = sampleRate / decim
  184. }
  185. return out, rates
  186. }
  187. func parseSince(raw string) (time.Time, error) {
  188. if raw == "" {
  189. return time.Time{}, nil
  190. }
  191. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  192. if ms > 1e12 {
  193. return time.UnixMilli(ms), nil
  194. }
  195. return time.Unix(ms, 0), nil
  196. }
  197. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  198. return t, nil
  199. }
  200. return time.Parse(time.RFC3339, raw)
  201. }
  202. // streamExtractState holds per-signal persistent state for phase-continuous
  203. // GPU extraction. Stored in the DSP loop, keyed by signal ID.
  204. type streamExtractState struct {
  205. phase float64 // FreqShift phase accumulator
  206. }
  207. // streamIQOverlap holds the tail of the previous allIQ for FIR halo prepend.
  208. type streamIQOverlap struct {
  209. tail []complex64
  210. }
  211. // extractionConfig holds audio quality settings for signal extraction.
  212. type extractionConfig struct {
  213. firTaps int // AQ-3: FIR tap count (default 101)
  214. bwMult float64 // AQ-5: BW multiplier (default 1.2)
  215. }
  216. const streamOverlapLen = 512 // must be >= FIR tap count with margin
  217. const (
  218. wfmStreamOutRate = 500000
  219. wfmStreamMinBW = 250000
  220. )
  221. var forceCPUStreamExtract = func() bool {
  222. raw := strings.TrimSpace(os.Getenv("SDR_FORCE_CPU_STREAM_EXTRACT"))
  223. if raw == "" {
  224. return false
  225. }
  226. v, err := strconv.ParseBool(raw)
  227. if err != nil {
  228. return false
  229. }
  230. return v
  231. }()
  232. // extractForStreaming performs GPU-accelerated extraction with:
  233. // - Per-signal phase-continuous FreqShift (via PhaseStart in ExtractJob)
  234. // - IQ overlap prepended to allIQ so FIR kernel has real data in halo
  235. //
  236. // Returns extracted snippets with overlap trimmed, and updates phase state.
  237. func extractForStreaming(
  238. extractMgr *extractionManager,
  239. allIQ []complex64,
  240. sampleRate int,
  241. centerHz float64,
  242. signals []detector.Signal,
  243. phaseState map[int64]*streamExtractState,
  244. overlap *streamIQOverlap,
  245. aqCfg extractionConfig,
  246. coll *telemetry.Collector,
  247. ) ([][]complex64, []int) {
  248. out := make([][]complex64, len(signals))
  249. rates := make([]int, len(signals))
  250. if len(allIQ) == 0 || sampleRate <= 0 || len(signals) == 0 {
  251. return out, rates
  252. }
  253. // AQ-3: Use configured overlap length (must cover FIR taps)
  254. overlapNeeded := streamOverlapLen
  255. if aqCfg.firTaps > 0 && aqCfg.firTaps+64 > overlapNeeded {
  256. overlapNeeded = aqCfg.firTaps + 64
  257. }
  258. // Prepend overlap from previous frame so FIR kernel has real halo data
  259. var gpuIQ []complex64
  260. overlapLen := len(overlap.tail)
  261. logging.Debug("extract", "overlap", "len", overlapLen, "needed", overlapNeeded, "allIQ", len(allIQ))
  262. if overlapLen > 0 {
  263. gpuIQ = make([]complex64, overlapLen+len(allIQ))
  264. copy(gpuIQ, overlap.tail)
  265. copy(gpuIQ[overlapLen:], allIQ)
  266. } else {
  267. gpuIQ = allIQ
  268. overlapLen = 0
  269. }
  270. // Save tail for next frame (sized to cover configured FIR taps)
  271. if len(allIQ) > overlapNeeded {
  272. overlap.tail = append(overlap.tail[:0], allIQ[len(allIQ)-overlapNeeded:]...)
  273. } else {
  274. overlap.tail = append(overlap.tail[:0], allIQ...)
  275. }
  276. decimTarget := 200000
  277. // AQ-5: BW multiplier for extraction (wider = better S/N for weak signals)
  278. bwMult := aqCfg.bwMult
  279. if bwMult <= 0 {
  280. bwMult = 1.0
  281. }
  282. if coll != nil {
  283. coll.SetGauge("iq.extract.input.length", float64(len(allIQ)), nil)
  284. coll.SetGauge("iq.extract.input.overlap_length", float64(overlapLen), nil)
  285. headMean, tailMean, boundaryScore, _ := boundaryMetrics(overlap.tail, allIQ, 32)
  286. coll.SetGauge("iq.extract.input.head_mean_mag", headMean, nil)
  287. coll.SetGauge("iq.extract.input.prev_tail_mean_mag", tailMean, nil)
  288. coll.Observe("iq.extract.input.discontinuity_score", boundaryScore, nil)
  289. }
  290. rawBoundary := make(map[int64]boundaryProbeState, len(signals))
  291. trimmedBoundary := make(map[int64]boundaryProbeState, len(signals))
  292. // Build jobs with per-signal phase
  293. jobs := make([]gpudemod.ExtractJob, len(signals))
  294. for i, sig := range signals {
  295. bw := sig.BWHz * bwMult // AQ-5: widen extraction BW
  296. sigMHz := sig.CenterHz / 1e6
  297. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) ||
  298. (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  299. jobOutRate := decimTarget
  300. if isWFM {
  301. jobOutRate = wfmStreamOutRate
  302. if bw < wfmStreamMinBW {
  303. bw = wfmStreamMinBW
  304. }
  305. } else if bw < 20000 {
  306. bw = 20000
  307. }
  308. ps := phaseState[sig.ID]
  309. if ps == nil {
  310. ps = &streamExtractState{}
  311. phaseState[sig.ID] = ps
  312. }
  313. // PhaseStart is where the NEW data begins. But gpuIQ has overlap
  314. // prepended, so the GPU kernel starts processing at the overlap.
  315. // We need to rewind the phase by overlapLen samples so that the
  316. // overlap region gets the correct phase, and the new data region
  317. // starts at ps.phase exactly.
  318. phaseInc := -2.0 * math.Pi * (sig.CenterHz - centerHz) / float64(sampleRate)
  319. gpuPhaseStart := ps.phase - phaseInc*float64(overlapLen)
  320. jobs[i] = gpudemod.ExtractJob{
  321. OffsetHz: sig.CenterHz - centerHz,
  322. BW: bw,
  323. OutRate: jobOutRate,
  324. PhaseStart: gpuPhaseStart,
  325. }
  326. if coll != nil {
  327. tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID), "path", "gpu")
  328. inputHead := probeHead(gpuIQ, 16, 1e-6)
  329. coll.SetGauge("iq.extract.input_head.zero_count", float64(inputHead.zeroCount), tags)
  330. coll.SetGauge("iq.extract.input_head.first_nonzero_index", float64(inputHead.firstNonZeroIndex), tags)
  331. coll.SetGauge("iq.extract.input_head.max_step", inputHead.maxStep, tags)
  332. coll.Event("extract_input_head_probe", "info", "extractor input head probe", tags, map[string]any{
  333. "mags": inputHead.mags,
  334. "zero_count": inputHead.zeroCount,
  335. "first_nonzero_index": inputHead.firstNonZeroIndex,
  336. "head_max_step": inputHead.maxStep,
  337. "center_offset_hz": jobs[i].OffsetHz,
  338. "bandwidth_hz": bw,
  339. "out_rate": jobOutRate,
  340. "trim_samples": (overlapLen + int(math.Max(1, math.Round(float64(sampleRate)/float64(jobOutRate)))) - 1) / int(math.Max(1, math.Round(float64(sampleRate)/float64(jobOutRate)))),
  341. })
  342. }
  343. }
  344. // Try GPU BatchRunner with phase unless CPU-only debug is forced.
  345. var runner *gpudemod.BatchRunner
  346. if forceCPUStreamExtract {
  347. logging.Warn("boundary", "force_cpu_stream_extract", "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "signals", len(signals))
  348. } else {
  349. runner = extractMgr.get(len(gpuIQ), sampleRate)
  350. }
  351. if runner != nil {
  352. if coll != nil && len(gpuIQ) > 0 {
  353. inputProbe := probeHead(gpuIQ, 16, 1e-6)
  354. coll.Event("gpu_kernel_input_head_probe", "info", "gpu kernel input head probe", nil, map[string]any{
  355. "mags": inputProbe.mags,
  356. "zero_count": inputProbe.zeroCount,
  357. "first_nonzero_index": inputProbe.firstNonZeroIndex,
  358. "head_max_step": inputProbe.maxStep,
  359. "gpuIQ_len": len(gpuIQ),
  360. "sample_rate": sampleRate,
  361. "signals": len(signals),
  362. })
  363. }
  364. results, err := runner.ShiftFilterDecimateBatchWithPhase(gpuIQ, jobs)
  365. if err == nil && len(results) == len(signals) {
  366. for i, res := range results {
  367. outRate := res.Rate
  368. if outRate <= 0 {
  369. outRate = decimTarget
  370. }
  371. sigMHz := signals[i].CenterHz / 1e6
  372. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (signals[i].Class != nil && (signals[i].Class.ModType == "WFM" || signals[i].Class.ModType == "WFM_STEREO"))
  373. if isWFM {
  374. outRate = wfmStreamOutRate
  375. }
  376. decim := sampleRate / outRate
  377. if decim < 1 {
  378. decim = 1
  379. }
  380. trimSamples := (overlapLen + decim - 1) / decim
  381. if i == 0 {
  382. logging.Debug("extract", "gpu_result", "rate", res.Rate, "outRate", outRate, "decim", decim, "trim", trimSamples)
  383. }
  384. // Update phase state — advance only by NEW data length, not overlap
  385. phaseInc := -2.0 * math.Pi * jobs[i].OffsetHz / float64(sampleRate)
  386. phaseState[signals[i].ID].phase += phaseInc * float64(len(allIQ))
  387. // Normalize to [-π, π) to prevent float64 drift over long runs
  388. phaseState[signals[i].ID].phase = math.Remainder(phaseState[signals[i].ID].phase, 2*math.Pi)
  389. // Trim overlap from output
  390. iq := res.IQ
  391. rawLen := len(iq)
  392. if trimSamples > 0 && trimSamples < len(iq) {
  393. iq = iq[trimSamples:]
  394. }
  395. if i == 0 {
  396. logging.Debug("boundary", "extract_trim", "path", "gpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(iq), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID)
  397. logExtractorHeadComparison(signals[i].ID, "gpu", overlapLen, res.IQ, trimSamples, iq)
  398. }
  399. if coll != nil {
  400. tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", signals[i].ID), "path", "gpu")
  401. kernelProbe := probeHead(res.IQ, 16, 1e-6)
  402. coll.Event("gpu_kernel_output_head_probe", "info", "gpu kernel output head probe", tags, map[string]any{
  403. "mags": kernelProbe.mags,
  404. "zero_count": kernelProbe.zeroCount,
  405. "first_nonzero_index": kernelProbe.firstNonZeroIndex,
  406. "head_max_step": kernelProbe.maxStep,
  407. "raw_len": rawLen,
  408. "out_rate": outRate,
  409. "trim_samples": trimSamples,
  410. })
  411. stats := computeIQHeadStats(iq, 64)
  412. coll.SetGauge("iq.extract.output.length", float64(len(iq)), tags)
  413. coll.Observe("iq.extract.output.head_mean_mag", stats.meanMag, tags)
  414. coll.Observe("iq.extract.output.head_min_mag", stats.minMag, tags)
  415. coll.Observe("iq.extract.output.head_max_step", stats.maxStep, tags)
  416. coll.Observe("iq.extract.output.head_p95_step", stats.p95Step, tags)
  417. coll.Observe("iq.extract.output.head_tail_ratio", stats.headTail, tags)
  418. coll.SetGauge("iq.extract.output.head_low_magnitude_count", float64(stats.lowMag), tags)
  419. coll.SetGauge("iq.extract.raw.length", float64(rawLen), tags)
  420. coll.SetGauge("iq.extract.trim.trim_samples", float64(trimSamples), tags)
  421. if rawLen > 0 {
  422. coll.SetGauge("iq.extract.raw.head_mag", math.Hypot(float64(real(res.IQ[0])), float64(imag(res.IQ[0]))), tags)
  423. coll.SetGauge("iq.extract.raw.tail_mag", math.Hypot(float64(real(res.IQ[rawLen-1])), float64(imag(res.IQ[rawLen-1]))), tags)
  424. rawHead := probeHead(res.IQ, 16, 1e-6)
  425. coll.SetGauge("iq.extract.raw.head_zero_count", float64(rawHead.zeroCount), tags)
  426. coll.SetGauge("iq.extract.raw.first_nonzero_index", float64(rawHead.firstNonZeroIndex), tags)
  427. coll.SetGauge("iq.extract.raw.head_max_step", rawHead.maxStep, tags)
  428. coll.Event("extract_raw_head_probe", "info", "raw extractor head probe", tags, map[string]any{
  429. "mags": rawHead.mags,
  430. "zero_count": rawHead.zeroCount,
  431. "first_nonzero_index": rawHead.firstNonZeroIndex,
  432. "head_max_step": rawHead.maxStep,
  433. "trim_samples": trimSamples,
  434. })
  435. }
  436. if len(iq) > 0 {
  437. coll.SetGauge("iq.extract.trimmed.head_mag", math.Hypot(float64(real(iq[0])), float64(imag(iq[0]))), tags)
  438. coll.SetGauge("iq.extract.trimmed.tail_mag", math.Hypot(float64(real(iq[len(iq)-1])), float64(imag(iq[len(iq)-1]))), tags)
  439. trimmedHead := probeHead(iq, 16, 1e-6)
  440. coll.SetGauge("iq.extract.trimmed.head_zero_count", float64(trimmedHead.zeroCount), tags)
  441. coll.SetGauge("iq.extract.trimmed.first_nonzero_index", float64(trimmedHead.firstNonZeroIndex), tags)
  442. coll.SetGauge("iq.extract.trimmed.head_max_step", trimmedHead.maxStep, tags)
  443. coll.Event("extract_trimmed_head_probe", "info", "trimmed extractor head probe", tags, map[string]any{
  444. "mags": trimmedHead.mags,
  445. "zero_count": trimmedHead.zeroCount,
  446. "first_nonzero_index": trimmedHead.firstNonZeroIndex,
  447. "head_max_step": trimmedHead.maxStep,
  448. "trim_samples": trimSamples,
  449. })
  450. }
  451. if rb := rawBoundary[signals[i].ID]; rb.set && rawLen > 0 {
  452. prevMag := math.Hypot(float64(real(rb.last)), float64(imag(rb.last)))
  453. currMag := math.Hypot(float64(real(res.IQ[0])), float64(imag(res.IQ[0])))
  454. coll.SetGauge("iq.extract.raw.boundary.prev_tail_mag", prevMag, tags)
  455. coll.SetGauge("iq.extract.raw.boundary.curr_head_mag", currMag, tags)
  456. coll.Event("extract_raw_boundary", "info", "raw extractor boundary", tags, map[string]any{
  457. "delta_mag": math.Abs(currMag - prevMag),
  458. "trim_samples": trimSamples,
  459. "raw_len": rawLen,
  460. })
  461. }
  462. if tb := trimmedBoundary[signals[i].ID]; tb.set && len(iq) > 0 {
  463. prevMag := math.Hypot(float64(real(tb.last)), float64(imag(tb.last)))
  464. currMag := math.Hypot(float64(real(iq[0])), float64(imag(iq[0])))
  465. coll.SetGauge("iq.extract.trimmed.boundary.prev_tail_mag", prevMag, tags)
  466. coll.SetGauge("iq.extract.trimmed.boundary.curr_head_mag", currMag, tags)
  467. coll.Event("extract_trimmed_boundary", "info", "trimmed extractor boundary", tags, map[string]any{
  468. "delta_mag": math.Abs(currMag - prevMag),
  469. "trim_samples": trimSamples,
  470. "out_len": len(iq),
  471. })
  472. }
  473. }
  474. if rawLen > 0 {
  475. rawBoundary[signals[i].ID] = boundaryProbeState{last: res.IQ[rawLen-1], set: true}
  476. }
  477. if len(iq) > 0 {
  478. trimmedBoundary[signals[i].ID] = boundaryProbeState{last: iq[len(iq)-1], set: true}
  479. }
  480. out[i] = iq
  481. rates[i] = res.Rate
  482. }
  483. return out, rates
  484. } else if err != nil {
  485. log.Printf("gpudemod: stream batch extraction failed: %v", err)
  486. }
  487. }
  488. // CPU fallback (with phase tracking)
  489. for i, sig := range signals {
  490. offset := sig.CenterHz - centerHz
  491. bw := jobs[i].BW
  492. ps := phaseState[sig.ID]
  493. // Phase-continuous FreqShift — rewind by overlap so new data starts at ps.phase
  494. shifted := make([]complex64, len(gpuIQ))
  495. inc := -2.0 * math.Pi * offset / float64(sampleRate)
  496. phase := ps.phase - inc*float64(overlapLen)
  497. for k, v := range gpuIQ {
  498. phase += inc
  499. re := math.Cos(phase)
  500. im := math.Sin(phase)
  501. shifted[k] = complex(
  502. float32(float64(real(v))*re-float64(imag(v))*im),
  503. float32(float64(real(v))*im+float64(imag(v))*re),
  504. )
  505. }
  506. // Advance phase by NEW data length only
  507. ps.phase += inc * float64(len(allIQ))
  508. ps.phase = math.Remainder(ps.phase, 2*math.Pi)
  509. cutoff := bw / 2
  510. if cutoff < 200 {
  511. cutoff = 200
  512. }
  513. if cutoff > float64(sampleRate)/2-1 {
  514. cutoff = float64(sampleRate)/2 - 1
  515. }
  516. firTaps := 101
  517. if aqCfg.firTaps > 0 {
  518. firTaps = aqCfg.firTaps
  519. }
  520. taps := dsp.LowpassFIR(cutoff, sampleRate, firTaps)
  521. filtered := dsp.ApplyFIR(shifted, taps)
  522. sigMHz := sig.CenterHz / 1e6
  523. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  524. outRate := decimTarget
  525. if isWFM {
  526. outRate = wfmStreamOutRate
  527. }
  528. decim := sampleRate / outRate
  529. if decim < 1 {
  530. decim = 1
  531. }
  532. decimated := dsp.Decimate(filtered, decim)
  533. rates[i] = sampleRate / decim
  534. // Trim overlap — use ceil to ensure ALL overlap samples are removed.
  535. // Floor trim (overlapLen/decim) leaves a remainder for non-divisible
  536. // factors (e.g. 512/20=25 trims only 500 of 512 samples → 12 leak).
  537. trimSamples := (overlapLen + decim - 1) / decim
  538. if i == 0 {
  539. logging.Debug("extract", "cpu_result", "outRate", outRate, "decim", decim, "trim", trimSamples)
  540. }
  541. rawIQ := decimated
  542. rawLen := len(rawIQ)
  543. if trimSamples > 0 && trimSamples < len(decimated) {
  544. decimated = decimated[trimSamples:]
  545. }
  546. if i == 0 {
  547. logging.Debug("boundary", "extract_trim", "path", "cpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(decimated), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID)
  548. logExtractorHeadComparison(signals[i].ID, "cpu", overlapLen, decimated, trimSamples, decimated)
  549. }
  550. if coll != nil {
  551. tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", signals[i].ID), "path", "cpu")
  552. stats := computeIQHeadStats(decimated, 64)
  553. coll.SetGauge("iq.extract.output.length", float64(len(decimated)), tags)
  554. coll.Observe("iq.extract.output.head_mean_mag", stats.meanMag, tags)
  555. coll.Observe("iq.extract.output.head_min_mag", stats.minMag, tags)
  556. coll.Observe("iq.extract.output.head_max_step", stats.maxStep, tags)
  557. coll.Observe("iq.extract.output.head_p95_step", stats.p95Step, tags)
  558. coll.Observe("iq.extract.output.head_tail_ratio", stats.headTail, tags)
  559. coll.SetGauge("iq.extract.output.head_low_magnitude_count", float64(stats.lowMag), tags)
  560. coll.SetGauge("iq.extract.raw.length", float64(rawLen), tags)
  561. coll.SetGauge("iq.extract.trim.trim_samples", float64(trimSamples), tags)
  562. if rb := rawBoundary[signals[i].ID]; rb.set && rawLen > 0 {
  563. observeBoundarySample(coll, "iq.extract.raw.boundary", tags, rb.last, rawIQ[0])
  564. }
  565. if tb := trimmedBoundary[signals[i].ID]; tb.set && len(decimated) > 0 {
  566. observeBoundarySample(coll, "iq.extract.trimmed.boundary", tags, tb.last, decimated[0])
  567. }
  568. }
  569. if rawLen > 0 {
  570. rawBoundary[signals[i].ID] = boundaryProbeState{last: rawIQ[rawLen-1], set: true}
  571. }
  572. if len(decimated) > 0 {
  573. trimmedBoundary[signals[i].ID] = boundaryProbeState{last: decimated[len(decimated)-1], set: true}
  574. }
  575. out[i] = decimated
  576. }
  577. return out, rates
  578. }
  579. type iqHeadStats struct {
  580. length int
  581. minMag float64
  582. maxMag float64
  583. meanMag float64
  584. lowMag int
  585. maxStep float64
  586. maxStepIdx int
  587. p95Step float64
  588. headTail float64
  589. headMinIdx int
  590. stepSamples []float64
  591. }
  592. type boundaryProbeState struct {
  593. last complex64
  594. set bool
  595. }
  596. type headProbe struct {
  597. zeroCount int
  598. firstNonZeroIndex int
  599. maxStep float64
  600. mags []float64
  601. }
  602. func probeHead(samples []complex64, n int, zeroThreshold float64) headProbe {
  603. if n <= 0 || len(samples) == 0 {
  604. return headProbe{firstNonZeroIndex: -1}
  605. }
  606. if len(samples) < n {
  607. n = len(samples)
  608. }
  609. if zeroThreshold <= 0 {
  610. zeroThreshold = 1e-6
  611. }
  612. out := headProbe{firstNonZeroIndex: -1, mags: make([]float64, 0, n)}
  613. for i := 0; i < n; i++ {
  614. v := samples[i]
  615. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  616. out.mags = append(out.mags, mag)
  617. if mag <= zeroThreshold {
  618. out.zeroCount++
  619. } else if out.firstNonZeroIndex < 0 {
  620. out.firstNonZeroIndex = i
  621. }
  622. if i > 0 {
  623. p := samples[i-1]
  624. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  625. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  626. step := math.Abs(math.Atan2(num, den))
  627. if step > out.maxStep {
  628. out.maxStep = step
  629. }
  630. }
  631. }
  632. return out
  633. }
  634. func observeBoundarySample(coll *telemetry.Collector, metricPrefix string, tags map[string]string, prev complex64, curr complex64) {
  635. prevMag := math.Hypot(float64(real(prev)), float64(imag(prev)))
  636. currMag := math.Hypot(float64(real(curr)), float64(imag(curr)))
  637. deltaMag := math.Abs(currMag - prevMag)
  638. num := float64(real(prev))*float64(imag(curr)) - float64(imag(prev))*float64(real(curr))
  639. den := float64(real(prev))*float64(real(curr)) + float64(imag(prev))*float64(imag(curr))
  640. deltaPhase := math.Abs(math.Atan2(num, den))
  641. d2 := float64(real(curr-prev))*float64(real(curr-prev)) + float64(imag(curr-prev))*float64(imag(curr-prev))
  642. coll.Observe(metricPrefix+".delta_mag", deltaMag, tags)
  643. coll.Observe(metricPrefix+".delta_phase", deltaPhase, tags)
  644. coll.Observe(metricPrefix+".d2", d2, tags)
  645. coll.Observe(metricPrefix+".discontinuity_score", deltaMag+deltaPhase, tags)
  646. }
  647. func computeIQHeadStats(iq []complex64, headLen int) iqHeadStats {
  648. stats := iqHeadStats{minMag: math.MaxFloat64, headMinIdx: -1, maxStepIdx: -1}
  649. if len(iq) == 0 {
  650. stats.minMag = 0
  651. return stats
  652. }
  653. n := len(iq)
  654. if headLen > 0 && headLen < n {
  655. n = headLen
  656. }
  657. stats.length = n
  658. stats.stepSamples = make([]float64, 0, max(0, n-1))
  659. sumMag := 0.0
  660. headSum := 0.0
  661. tailSum := 0.0
  662. tailCount := 0
  663. for i := 0; i < n; i++ {
  664. v := iq[i]
  665. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  666. if mag < stats.minMag {
  667. stats.minMag = mag
  668. stats.headMinIdx = i
  669. }
  670. if mag > stats.maxMag {
  671. stats.maxMag = mag
  672. }
  673. sumMag += mag
  674. if mag < 0.05 {
  675. stats.lowMag++
  676. }
  677. if i < min(16, n) {
  678. headSum += mag
  679. }
  680. if i >= max(0, n-16) {
  681. tailSum += mag
  682. tailCount++
  683. }
  684. if i > 0 {
  685. p := iq[i-1]
  686. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  687. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  688. step := math.Abs(math.Atan2(num, den))
  689. if step > stats.maxStep {
  690. stats.maxStep = step
  691. stats.maxStepIdx = i - 1
  692. }
  693. stats.stepSamples = append(stats.stepSamples, step)
  694. }
  695. }
  696. stats.meanMag = sumMag / float64(n)
  697. if len(stats.stepSamples) > 0 {
  698. sorted := append([]float64(nil), stats.stepSamples...)
  699. sort.Float64s(sorted)
  700. idx := int(float64(len(sorted)-1) * 0.95)
  701. stats.p95Step = sorted[idx]
  702. } else {
  703. stats.p95Step = stats.maxStep
  704. }
  705. if headSum > 0 && tailCount > 0 {
  706. headMean := headSum / float64(min(16, n))
  707. tailMean := tailSum / float64(tailCount)
  708. if tailMean > 0 {
  709. stats.headTail = headMean / tailMean
  710. }
  711. }
  712. return stats
  713. }
  714. func observeIQStats(coll *telemetry.Collector, stage string, iq []complex64, tags telemetry.Tags) {
  715. if coll == nil || len(iq) == 0 {
  716. return
  717. }
  718. stats := computeIQHeadStats(iq, len(iq))
  719. stageTags := telemetry.TagsWith(tags, "stage", stage)
  720. coll.Observe("iq.magnitude.min", stats.minMag, stageTags)
  721. coll.Observe("iq.magnitude.max", stats.maxMag, stageTags)
  722. coll.Observe("iq.magnitude.mean", stats.meanMag, stageTags)
  723. coll.Observe("iq.phase_step.max", stats.maxStep, stageTags)
  724. coll.Observe("iq.phase_step.p95", stats.p95Step, stageTags)
  725. coll.Observe("iq.low_magnitude.count", float64(stats.lowMag), stageTags)
  726. coll.SetGauge("iq.length", float64(stats.length), stageTags)
  727. }
  728. func logExtractorHeadComparison(signalID int64, path string, overlapLen int, raw []complex64, trimSamples int, out []complex64) {
  729. rawStats := computeIQHeadStats(raw, 96)
  730. trimmedStats := computeIQHeadStats(out, 96)
  731. logging.Debug("boundary", "extract_head_compare",
  732. "signal", signalID,
  733. "path", path,
  734. "raw_len", len(raw),
  735. "trim", trimSamples,
  736. "out_len", len(out),
  737. "overlap_len", overlapLen,
  738. "raw_min_mag", rawStats.minMag,
  739. "raw_min_idx", rawStats.headMinIdx,
  740. "raw_max_step", rawStats.maxStep,
  741. "raw_max_step_idx", rawStats.maxStepIdx,
  742. "raw_head_tail", rawStats.headTail,
  743. "trimmed_min_mag", trimmedStats.minMag,
  744. "trimmed_min_idx", trimmedStats.headMinIdx,
  745. "trimmed_max_step", trimmedStats.maxStep,
  746. "trimmed_max_step_idx", trimmedStats.maxStepIdx,
  747. "trimmed_head_tail", trimmedStats.headTail,
  748. )
  749. for _, off := range []int{2, 4, 8, 16} {
  750. if len(out) <= off+8 {
  751. continue
  752. }
  753. offStats := computeIQHeadStats(out[off:], 96)
  754. logging.Debug("boundary", "extract_head_offset_compare",
  755. "signal", signalID,
  756. "path", path,
  757. "offset", off,
  758. "base_min_mag", trimmedStats.minMag,
  759. "base_min_idx", trimmedStats.headMinIdx,
  760. "base_max_step", trimmedStats.maxStep,
  761. "base_max_step_idx", trimmedStats.maxStepIdx,
  762. "offset_min_mag", offStats.minMag,
  763. "offset_min_idx", offStats.headMinIdx,
  764. "offset_max_step", offStats.maxStep,
  765. "offset_max_step_idx", offStats.maxStepIdx,
  766. "offset_head_tail", offStats.headTail,
  767. )
  768. }
  769. }