Wideband autonomous SDR analysis engine forked from sdr-visual-suite
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

791 lines
27KB

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "math"
  6. "os"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "sdr-wideband-suite/internal/config"
  12. "sdr-wideband-suite/internal/demod/gpudemod"
  13. "sdr-wideband-suite/internal/detector"
  14. "sdr-wideband-suite/internal/dsp"
  15. "sdr-wideband-suite/internal/logging"
  16. "sdr-wideband-suite/internal/telemetry"
  17. )
  18. func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  19. if raw == "" {
  20. return fallback
  21. }
  22. if d, err := time.ParseDuration(raw); err == nil {
  23. return d
  24. }
  25. return fallback
  26. }
  27. func buildDecoderMap(cfg config.Config) map[string]string {
  28. out := map[string]string{}
  29. if cfg.Decoder.FT8Cmd != "" {
  30. out["FT8"] = cfg.Decoder.FT8Cmd
  31. }
  32. if cfg.Decoder.WSPRCmd != "" {
  33. out["WSPR"] = cfg.Decoder.WSPRCmd
  34. }
  35. if cfg.Decoder.DMRCmd != "" {
  36. out["DMR"] = cfg.Decoder.DMRCmd
  37. }
  38. if cfg.Decoder.DStarCmd != "" {
  39. out["D-STAR"] = cfg.Decoder.DStarCmd
  40. }
  41. if cfg.Decoder.FSKCmd != "" {
  42. out["FSK"] = cfg.Decoder.FSKCmd
  43. }
  44. if cfg.Decoder.PSKCmd != "" {
  45. out["PSK"] = cfg.Decoder.PSKCmd
  46. }
  47. return out
  48. }
  49. func decoderKeys(cfg config.Config) []string {
  50. m := buildDecoderMap(cfg)
  51. keys := make([]string, 0, len(m))
  52. for k := range m {
  53. keys = append(keys, k)
  54. }
  55. sort.Strings(keys)
  56. return keys
  57. }
  58. func (m *extractionManager) reset() {
  59. if m == nil {
  60. return
  61. }
  62. m.mu.Lock()
  63. defer m.mu.Unlock()
  64. if m.runner != nil {
  65. m.runner.Close()
  66. m.runner = nil
  67. }
  68. }
  69. func (m *extractionManager) get(sampleCount int, sampleRate int) *gpudemod.BatchRunner {
  70. if m == nil || sampleCount <= 0 || sampleRate <= 0 || !gpudemod.Available() {
  71. return nil
  72. }
  73. m.mu.Lock()
  74. defer m.mu.Unlock()
  75. if m.runner != nil && sampleCount > m.maxSamples {
  76. m.runner.Close()
  77. m.runner = nil
  78. }
  79. if m.runner == nil {
  80. // Allocate generously: enough for full allIQ (sampleRate/10 ≈ 100ms)
  81. // so the runner never needs re-allocation when used for both
  82. // classification (FFT-block ~65k) and streaming (allIQ ~273k+).
  83. allocSize := sampleCount
  84. generous := sampleRate/10 + 1024 // ~400k at 4MHz — covers any scenario
  85. if generous > allocSize {
  86. allocSize = generous
  87. }
  88. if r, err := gpudemod.NewBatchRunner(allocSize, sampleRate); err == nil {
  89. m.runner = r
  90. m.maxSamples = allocSize
  91. } else {
  92. log.Printf("gpudemod: batch runner init failed: %v", err)
  93. }
  94. return m.runner
  95. }
  96. return m.runner
  97. }
  98. func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
  99. if len(iq) == 0 || sampleRate <= 0 {
  100. return nil
  101. }
  102. results, _ := extractSignalIQBatch(nil, iq, sampleRate, centerHz, []detector.Signal{{CenterHz: sigHz, BWHz: bwHz}})
  103. if len(results) == 0 {
  104. return nil
  105. }
  106. return results[0]
  107. }
  108. func extractSignalIQBatch(extractMgr *extractionManager, iq []complex64, sampleRate int, centerHz float64, signals []detector.Signal) ([][]complex64, []int) {
  109. out := make([][]complex64, len(signals))
  110. rates := make([]int, len(signals))
  111. if len(iq) == 0 || sampleRate <= 0 || len(signals) == 0 {
  112. return out, rates
  113. }
  114. decimTarget := 200000
  115. if decimTarget <= 0 {
  116. decimTarget = sampleRate
  117. }
  118. runner := extractMgr.get(len(iq), sampleRate)
  119. if runner != nil {
  120. jobs := make([]gpudemod.ExtractJob, len(signals))
  121. for i, sig := range signals {
  122. bw := sig.BWHz
  123. sigMHz := sig.CenterHz / 1e6
  124. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  125. jobOutRate := decimTarget
  126. if isWFM {
  127. jobOutRate = wfmStreamOutRate
  128. }
  129. // Minimum extraction BW: ensure enough bandwidth for demod features
  130. // FM broadcast (87.5-108 MHz) needs >=250kHz for stereo pilot + RDS at 57kHz
  131. // Also widen for any signal classified as WFM (in case of re-extraction)
  132. if isWFM {
  133. if bw < wfmStreamMinBW {
  134. bw = wfmStreamMinBW
  135. }
  136. } else if bw < 20000 {
  137. bw = 20000
  138. }
  139. jobs[i] = gpudemod.ExtractJob{OffsetHz: sig.CenterHz - centerHz, BW: bw, OutRate: jobOutRate}
  140. }
  141. if gpuOuts, gpuRates, err := runner.ShiftFilterDecimateBatch(iq, jobs); err == nil && len(gpuOuts) == len(signals) {
  142. // batch extraction OK (silent)
  143. for i := range gpuOuts {
  144. out[i] = gpuOuts[i]
  145. if i < len(gpuRates) {
  146. rates[i] = gpuRates[i]
  147. }
  148. }
  149. return out, rates
  150. } else if err != nil {
  151. log.Printf("gpudemod: batch extraction failed for %d signals: %v", len(signals), err)
  152. }
  153. }
  154. // CPU extraction fallback (silent — see batch extraction failed above if applicable)
  155. for i, sig := range signals {
  156. offset := sig.CenterHz - centerHz
  157. shifted := dsp.FreqShift(iq, sampleRate, offset)
  158. bw := sig.BWHz
  159. // FM broadcast (87.5-108 MHz) needs >=250kHz for stereo + RDS
  160. sigMHz := sig.CenterHz / 1e6
  161. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  162. if isWFM {
  163. if bw < wfmStreamMinBW {
  164. bw = wfmStreamMinBW
  165. }
  166. } else if bw < 20000 {
  167. bw = 20000
  168. }
  169. cutoff := bw / 2
  170. if cutoff < 200 {
  171. cutoff = 200
  172. }
  173. if cutoff > float64(sampleRate)/2-1 {
  174. cutoff = float64(sampleRate)/2 - 1
  175. }
  176. taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
  177. filtered := dsp.ApplyFIR(shifted, taps)
  178. decim := sampleRate / decimTarget
  179. if decim < 1 {
  180. decim = 1
  181. }
  182. out[i] = dsp.Decimate(filtered, decim)
  183. rates[i] = sampleRate / decim
  184. }
  185. return out, rates
  186. }
  187. func parseSince(raw string) (time.Time, error) {
  188. if raw == "" {
  189. return time.Time{}, nil
  190. }
  191. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  192. if ms > 1e12 {
  193. return time.UnixMilli(ms), nil
  194. }
  195. return time.Unix(ms, 0), nil
  196. }
  197. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  198. return t, nil
  199. }
  200. return time.Parse(time.RFC3339, raw)
  201. }
  202. // streamExtractState holds per-signal persistent state for phase-continuous
  203. // GPU extraction. Stored in the DSP loop, keyed by signal ID.
  204. type streamExtractState struct {
  205. phase float64 // FreqShift phase accumulator
  206. }
  207. // streamIQOverlap holds the tail of the previous allIQ for FIR halo prepend.
  208. type streamIQOverlap struct {
  209. tail []complex64
  210. }
  211. // extractionConfig holds audio quality settings for signal extraction.
  212. type extractionConfig struct {
  213. firTaps int // AQ-3: FIR tap count (default 101)
  214. bwMult float64 // AQ-5: BW multiplier (default 1.2)
  215. }
  216. const streamOverlapLen = 512 // must be >= FIR tap count with margin
  217. const (
  218. wfmStreamOutRate = 500000
  219. wfmStreamMinBW = 250000
  220. )
  221. var forceCPUStreamExtract = func() bool {
  222. raw := strings.TrimSpace(os.Getenv("SDR_FORCE_CPU_STREAM_EXTRACT"))
  223. if raw == "" {
  224. return false
  225. }
  226. v, err := strconv.ParseBool(raw)
  227. if err != nil {
  228. return false
  229. }
  230. return v
  231. }()
  232. // extractForStreaming performs GPU-accelerated extraction with:
  233. // - Per-signal phase-continuous FreqShift (via PhaseStart in ExtractJob)
  234. // - IQ overlap prepended to allIQ so FIR kernel has real data in halo
  235. //
  236. // Returns extracted snippets with overlap trimmed, and updates phase state.
  237. func extractForStreaming(
  238. extractMgr *extractionManager,
  239. allIQ []complex64,
  240. sampleRate int,
  241. centerHz float64,
  242. signals []detector.Signal,
  243. phaseState map[int64]*streamExtractState,
  244. overlap *streamIQOverlap,
  245. aqCfg extractionConfig,
  246. coll *telemetry.Collector,
  247. ) ([][]complex64, []int) {
  248. out := make([][]complex64, len(signals))
  249. rates := make([]int, len(signals))
  250. if len(allIQ) == 0 || sampleRate <= 0 || len(signals) == 0 {
  251. return out, rates
  252. }
  253. // AQ-3: Use configured overlap length (must cover FIR taps)
  254. overlapNeeded := streamOverlapLen
  255. if aqCfg.firTaps > 0 && aqCfg.firTaps+64 > overlapNeeded {
  256. overlapNeeded = aqCfg.firTaps + 64
  257. }
  258. // Prepend overlap from previous frame so FIR kernel has real halo data
  259. var gpuIQ []complex64
  260. overlapLen := len(overlap.tail)
  261. logging.Debug("extract", "overlap", "len", overlapLen, "needed", overlapNeeded, "allIQ", len(allIQ))
  262. if overlapLen > 0 {
  263. gpuIQ = make([]complex64, overlapLen+len(allIQ))
  264. copy(gpuIQ, overlap.tail)
  265. copy(gpuIQ[overlapLen:], allIQ)
  266. } else {
  267. gpuIQ = allIQ
  268. overlapLen = 0
  269. }
  270. // Save tail for next frame (sized to cover configured FIR taps)
  271. if len(allIQ) > overlapNeeded {
  272. overlap.tail = append(overlap.tail[:0], allIQ[len(allIQ)-overlapNeeded:]...)
  273. } else {
  274. overlap.tail = append(overlap.tail[:0], allIQ...)
  275. }
  276. decimTarget := 200000
  277. // AQ-5: BW multiplier for extraction (wider = better S/N for weak signals)
  278. bwMult := aqCfg.bwMult
  279. if bwMult <= 0 {
  280. bwMult = 1.0
  281. }
  282. if coll != nil {
  283. coll.SetGauge("iq.extract.input.length", float64(len(allIQ)), nil)
  284. coll.SetGauge("iq.extract.input.overlap_length", float64(overlapLen), nil)
  285. headMean, tailMean, boundaryScore, _ := boundaryMetrics(overlap.tail, allIQ, 32)
  286. coll.SetGauge("iq.extract.input.head_mean_mag", headMean, nil)
  287. coll.SetGauge("iq.extract.input.prev_tail_mean_mag", tailMean, nil)
  288. coll.Observe("iq.extract.input.discontinuity_score", boundaryScore, nil)
  289. }
  290. rawBoundary := make(map[int64]boundaryProbeState, len(signals))
  291. trimmedBoundary := make(map[int64]boundaryProbeState, len(signals))
  292. // Build jobs with per-signal phase
  293. jobs := make([]gpudemod.ExtractJob, len(signals))
  294. for i, sig := range signals {
  295. bw := sig.BWHz * bwMult // AQ-5: widen extraction BW
  296. sigMHz := sig.CenterHz / 1e6
  297. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) ||
  298. (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  299. jobOutRate := decimTarget
  300. if isWFM {
  301. jobOutRate = wfmStreamOutRate
  302. if bw < wfmStreamMinBW {
  303. bw = wfmStreamMinBW
  304. }
  305. } else if bw < 20000 {
  306. bw = 20000
  307. }
  308. ps := phaseState[sig.ID]
  309. if ps == nil {
  310. ps = &streamExtractState{}
  311. phaseState[sig.ID] = ps
  312. }
  313. // PhaseStart is where the NEW data begins. But gpuIQ has overlap
  314. // prepended, so the GPU kernel starts processing at the overlap.
  315. // We need to rewind the phase by overlapLen samples so that the
  316. // overlap region gets the correct phase, and the new data region
  317. // starts at ps.phase exactly.
  318. phaseInc := -2.0 * math.Pi * (sig.CenterHz - centerHz) / float64(sampleRate)
  319. gpuPhaseStart := ps.phase - phaseInc*float64(overlapLen)
  320. jobs[i] = gpudemod.ExtractJob{
  321. OffsetHz: sig.CenterHz - centerHz,
  322. BW: bw,
  323. OutRate: jobOutRate,
  324. PhaseStart: gpuPhaseStart,
  325. }
  326. if coll != nil {
  327. tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID), "path", "gpu")
  328. inputHead := probeHead(gpuIQ, 16, 1e-6)
  329. coll.SetGauge("iq.extract.input_head.zero_count", float64(inputHead.zeroCount), tags)
  330. coll.SetGauge("iq.extract.input_head.first_nonzero_index", float64(inputHead.firstNonZeroIndex), tags)
  331. coll.SetGauge("iq.extract.input_head.max_step", inputHead.maxStep, tags)
  332. coll.Event("extract_input_head_probe", "info", "extractor input head probe", tags, map[string]any{
  333. "mags": inputHead.mags,
  334. "zero_count": inputHead.zeroCount,
  335. "first_nonzero_index": inputHead.firstNonZeroIndex,
  336. "head_max_step": inputHead.maxStep,
  337. "center_offset_hz": jobs[i].OffsetHz,
  338. "bandwidth_hz": bw,
  339. "out_rate": jobOutRate,
  340. "trim_samples": (overlapLen + int(math.Max(1, math.Round(float64(sampleRate)/float64(jobOutRate)))) - 1) / int(math.Max(1, math.Round(float64(sampleRate)/float64(jobOutRate)))),
  341. })
  342. }
  343. }
  344. // Try GPU BatchRunner with phase unless CPU-only debug is forced.
  345. var runner *gpudemod.BatchRunner
  346. if forceCPUStreamExtract {
  347. logging.Warn("boundary", "force_cpu_stream_extract", "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "signals", len(signals))
  348. } else {
  349. runner = extractMgr.get(len(gpuIQ), sampleRate)
  350. }
  351. if runner != nil {
  352. results, err := runner.ShiftFilterDecimateBatchWithPhase(gpuIQ, jobs)
  353. if err == nil && len(results) == len(signals) {
  354. for i, res := range results {
  355. outRate := res.Rate
  356. if outRate <= 0 {
  357. outRate = decimTarget
  358. }
  359. sigMHz := signals[i].CenterHz / 1e6
  360. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (signals[i].Class != nil && (signals[i].Class.ModType == "WFM" || signals[i].Class.ModType == "WFM_STEREO"))
  361. if isWFM {
  362. outRate = wfmStreamOutRate
  363. }
  364. decim := sampleRate / outRate
  365. if decim < 1 {
  366. decim = 1
  367. }
  368. trimSamples := (overlapLen + decim - 1) / decim
  369. if i == 0 {
  370. logging.Debug("extract", "gpu_result", "rate", res.Rate, "outRate", outRate, "decim", decim, "trim", trimSamples)
  371. }
  372. // Update phase state — advance only by NEW data length, not overlap
  373. phaseInc := -2.0 * math.Pi * jobs[i].OffsetHz / float64(sampleRate)
  374. phaseState[signals[i].ID].phase += phaseInc * float64(len(allIQ))
  375. // Normalize to [-π, π) to prevent float64 drift over long runs
  376. phaseState[signals[i].ID].phase = math.Remainder(phaseState[signals[i].ID].phase, 2*math.Pi)
  377. // Trim overlap from output
  378. iq := res.IQ
  379. rawLen := len(iq)
  380. if trimSamples > 0 && trimSamples < len(iq) {
  381. iq = iq[trimSamples:]
  382. }
  383. if i == 0 {
  384. logging.Debug("boundary", "extract_trim", "path", "gpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(iq), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID)
  385. logExtractorHeadComparison(signals[i].ID, "gpu", overlapLen, res.IQ, trimSamples, iq)
  386. }
  387. if coll != nil {
  388. tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", signals[i].ID), "path", "gpu")
  389. stats := computeIQHeadStats(iq, 64)
  390. coll.SetGauge("iq.extract.output.length", float64(len(iq)), tags)
  391. coll.Observe("iq.extract.output.head_mean_mag", stats.meanMag, tags)
  392. coll.Observe("iq.extract.output.head_min_mag", stats.minMag, tags)
  393. coll.Observe("iq.extract.output.head_max_step", stats.maxStep, tags)
  394. coll.Observe("iq.extract.output.head_p95_step", stats.p95Step, tags)
  395. coll.Observe("iq.extract.output.head_tail_ratio", stats.headTail, tags)
  396. coll.SetGauge("iq.extract.output.head_low_magnitude_count", float64(stats.lowMag), tags)
  397. coll.SetGauge("iq.extract.raw.length", float64(rawLen), tags)
  398. coll.SetGauge("iq.extract.trim.trim_samples", float64(trimSamples), tags)
  399. if rawLen > 0 {
  400. coll.SetGauge("iq.extract.raw.head_mag", math.Hypot(float64(real(res.IQ[0])), float64(imag(res.IQ[0]))), tags)
  401. coll.SetGauge("iq.extract.raw.tail_mag", math.Hypot(float64(real(res.IQ[rawLen-1])), float64(imag(res.IQ[rawLen-1]))), tags)
  402. rawHead := probeHead(res.IQ, 16, 1e-6)
  403. coll.SetGauge("iq.extract.raw.head_zero_count", float64(rawHead.zeroCount), tags)
  404. coll.SetGauge("iq.extract.raw.first_nonzero_index", float64(rawHead.firstNonZeroIndex), tags)
  405. coll.SetGauge("iq.extract.raw.head_max_step", rawHead.maxStep, tags)
  406. coll.Event("extract_raw_head_probe", "info", "raw extractor head probe", tags, map[string]any{
  407. "mags": rawHead.mags,
  408. "zero_count": rawHead.zeroCount,
  409. "first_nonzero_index": rawHead.firstNonZeroIndex,
  410. "head_max_step": rawHead.maxStep,
  411. "trim_samples": trimSamples,
  412. })
  413. }
  414. if len(iq) > 0 {
  415. coll.SetGauge("iq.extract.trimmed.head_mag", math.Hypot(float64(real(iq[0])), float64(imag(iq[0]))), tags)
  416. coll.SetGauge("iq.extract.trimmed.tail_mag", math.Hypot(float64(real(iq[len(iq)-1])), float64(imag(iq[len(iq)-1]))), tags)
  417. trimmedHead := probeHead(iq, 16, 1e-6)
  418. coll.SetGauge("iq.extract.trimmed.head_zero_count", float64(trimmedHead.zeroCount), tags)
  419. coll.SetGauge("iq.extract.trimmed.first_nonzero_index", float64(trimmedHead.firstNonZeroIndex), tags)
  420. coll.SetGauge("iq.extract.trimmed.head_max_step", trimmedHead.maxStep, tags)
  421. coll.Event("extract_trimmed_head_probe", "info", "trimmed extractor head probe", tags, map[string]any{
  422. "mags": trimmedHead.mags,
  423. "zero_count": trimmedHead.zeroCount,
  424. "first_nonzero_index": trimmedHead.firstNonZeroIndex,
  425. "head_max_step": trimmedHead.maxStep,
  426. "trim_samples": trimSamples,
  427. })
  428. }
  429. if rb := rawBoundary[signals[i].ID]; rb.set && rawLen > 0 {
  430. prevMag := math.Hypot(float64(real(rb.last)), float64(imag(rb.last)))
  431. currMag := math.Hypot(float64(real(res.IQ[0])), float64(imag(res.IQ[0])))
  432. coll.SetGauge("iq.extract.raw.boundary.prev_tail_mag", prevMag, tags)
  433. coll.SetGauge("iq.extract.raw.boundary.curr_head_mag", currMag, tags)
  434. coll.Event("extract_raw_boundary", "info", "raw extractor boundary", tags, map[string]any{
  435. "delta_mag": math.Abs(currMag - prevMag),
  436. "trim_samples": trimSamples,
  437. "raw_len": rawLen,
  438. })
  439. }
  440. if tb := trimmedBoundary[signals[i].ID]; tb.set && len(iq) > 0 {
  441. prevMag := math.Hypot(float64(real(tb.last)), float64(imag(tb.last)))
  442. currMag := math.Hypot(float64(real(iq[0])), float64(imag(iq[0])))
  443. coll.SetGauge("iq.extract.trimmed.boundary.prev_tail_mag", prevMag, tags)
  444. coll.SetGauge("iq.extract.trimmed.boundary.curr_head_mag", currMag, tags)
  445. coll.Event("extract_trimmed_boundary", "info", "trimmed extractor boundary", tags, map[string]any{
  446. "delta_mag": math.Abs(currMag - prevMag),
  447. "trim_samples": trimSamples,
  448. "out_len": len(iq),
  449. })
  450. }
  451. }
  452. if rawLen > 0 {
  453. rawBoundary[signals[i].ID] = boundaryProbeState{last: res.IQ[rawLen-1], set: true}
  454. }
  455. if len(iq) > 0 {
  456. trimmedBoundary[signals[i].ID] = boundaryProbeState{last: iq[len(iq)-1], set: true}
  457. }
  458. out[i] = iq
  459. rates[i] = res.Rate
  460. }
  461. return out, rates
  462. } else if err != nil {
  463. log.Printf("gpudemod: stream batch extraction failed: %v", err)
  464. }
  465. }
  466. // CPU fallback (with phase tracking)
  467. for i, sig := range signals {
  468. offset := sig.CenterHz - centerHz
  469. bw := jobs[i].BW
  470. ps := phaseState[sig.ID]
  471. // Phase-continuous FreqShift — rewind by overlap so new data starts at ps.phase
  472. shifted := make([]complex64, len(gpuIQ))
  473. inc := -2.0 * math.Pi * offset / float64(sampleRate)
  474. phase := ps.phase - inc*float64(overlapLen)
  475. for k, v := range gpuIQ {
  476. phase += inc
  477. re := math.Cos(phase)
  478. im := math.Sin(phase)
  479. shifted[k] = complex(
  480. float32(float64(real(v))*re-float64(imag(v))*im),
  481. float32(float64(real(v))*im+float64(imag(v))*re),
  482. )
  483. }
  484. // Advance phase by NEW data length only
  485. ps.phase += inc * float64(len(allIQ))
  486. ps.phase = math.Remainder(ps.phase, 2*math.Pi)
  487. cutoff := bw / 2
  488. if cutoff < 200 {
  489. cutoff = 200
  490. }
  491. if cutoff > float64(sampleRate)/2-1 {
  492. cutoff = float64(sampleRate)/2 - 1
  493. }
  494. firTaps := 101
  495. if aqCfg.firTaps > 0 {
  496. firTaps = aqCfg.firTaps
  497. }
  498. taps := dsp.LowpassFIR(cutoff, sampleRate, firTaps)
  499. filtered := dsp.ApplyFIR(shifted, taps)
  500. sigMHz := sig.CenterHz / 1e6
  501. isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO"))
  502. outRate := decimTarget
  503. if isWFM {
  504. outRate = wfmStreamOutRate
  505. }
  506. decim := sampleRate / outRate
  507. if decim < 1 {
  508. decim = 1
  509. }
  510. decimated := dsp.Decimate(filtered, decim)
  511. rates[i] = sampleRate / decim
  512. // Trim overlap — use ceil to ensure ALL overlap samples are removed.
  513. // Floor trim (overlapLen/decim) leaves a remainder for non-divisible
  514. // factors (e.g. 512/20=25 trims only 500 of 512 samples → 12 leak).
  515. trimSamples := (overlapLen + decim - 1) / decim
  516. if i == 0 {
  517. logging.Debug("extract", "cpu_result", "outRate", outRate, "decim", decim, "trim", trimSamples)
  518. }
  519. rawIQ := decimated
  520. rawLen := len(rawIQ)
  521. if trimSamples > 0 && trimSamples < len(decimated) {
  522. decimated = decimated[trimSamples:]
  523. }
  524. if i == 0 {
  525. logging.Debug("boundary", "extract_trim", "path", "cpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(decimated), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID)
  526. logExtractorHeadComparison(signals[i].ID, "cpu", overlapLen, decimated, trimSamples, decimated)
  527. }
  528. if coll != nil {
  529. tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", signals[i].ID), "path", "cpu")
  530. stats := computeIQHeadStats(decimated, 64)
  531. coll.SetGauge("iq.extract.output.length", float64(len(decimated)), tags)
  532. coll.Observe("iq.extract.output.head_mean_mag", stats.meanMag, tags)
  533. coll.Observe("iq.extract.output.head_min_mag", stats.minMag, tags)
  534. coll.Observe("iq.extract.output.head_max_step", stats.maxStep, tags)
  535. coll.Observe("iq.extract.output.head_p95_step", stats.p95Step, tags)
  536. coll.Observe("iq.extract.output.head_tail_ratio", stats.headTail, tags)
  537. coll.SetGauge("iq.extract.output.head_low_magnitude_count", float64(stats.lowMag), tags)
  538. coll.SetGauge("iq.extract.raw.length", float64(rawLen), tags)
  539. coll.SetGauge("iq.extract.trim.trim_samples", float64(trimSamples), tags)
  540. if rb := rawBoundary[signals[i].ID]; rb.set && rawLen > 0 {
  541. observeBoundarySample(coll, "iq.extract.raw.boundary", tags, rb.last, rawIQ[0])
  542. }
  543. if tb := trimmedBoundary[signals[i].ID]; tb.set && len(decimated) > 0 {
  544. observeBoundarySample(coll, "iq.extract.trimmed.boundary", tags, tb.last, decimated[0])
  545. }
  546. }
  547. if rawLen > 0 {
  548. rawBoundary[signals[i].ID] = boundaryProbeState{last: rawIQ[rawLen-1], set: true}
  549. }
  550. if len(decimated) > 0 {
  551. trimmedBoundary[signals[i].ID] = boundaryProbeState{last: decimated[len(decimated)-1], set: true}
  552. }
  553. out[i] = decimated
  554. }
  555. return out, rates
  556. }
  557. type iqHeadStats struct {
  558. length int
  559. minMag float64
  560. maxMag float64
  561. meanMag float64
  562. lowMag int
  563. maxStep float64
  564. maxStepIdx int
  565. p95Step float64
  566. headTail float64
  567. headMinIdx int
  568. stepSamples []float64
  569. }
  570. type boundaryProbeState struct {
  571. last complex64
  572. set bool
  573. }
  574. type headProbe struct {
  575. zeroCount int
  576. firstNonZeroIndex int
  577. maxStep float64
  578. mags []float64
  579. }
  580. func probeHead(samples []complex64, n int, zeroThreshold float64) headProbe {
  581. if n <= 0 || len(samples) == 0 {
  582. return headProbe{firstNonZeroIndex: -1}
  583. }
  584. if len(samples) < n {
  585. n = len(samples)
  586. }
  587. if zeroThreshold <= 0 {
  588. zeroThreshold = 1e-6
  589. }
  590. out := headProbe{firstNonZeroIndex: -1, mags: make([]float64, 0, n)}
  591. for i := 0; i < n; i++ {
  592. v := samples[i]
  593. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  594. out.mags = append(out.mags, mag)
  595. if mag <= zeroThreshold {
  596. out.zeroCount++
  597. } else if out.firstNonZeroIndex < 0 {
  598. out.firstNonZeroIndex = i
  599. }
  600. if i > 0 {
  601. p := samples[i-1]
  602. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  603. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  604. step := math.Abs(math.Atan2(num, den))
  605. if step > out.maxStep {
  606. out.maxStep = step
  607. }
  608. }
  609. }
  610. return out
  611. }
  612. func observeBoundarySample(coll *telemetry.Collector, metricPrefix string, tags map[string]string, prev complex64, curr complex64) {
  613. prevMag := math.Hypot(float64(real(prev)), float64(imag(prev)))
  614. currMag := math.Hypot(float64(real(curr)), float64(imag(curr)))
  615. deltaMag := math.Abs(currMag - prevMag)
  616. num := float64(real(prev))*float64(imag(curr)) - float64(imag(prev))*float64(real(curr))
  617. den := float64(real(prev))*float64(real(curr)) + float64(imag(prev))*float64(imag(curr))
  618. deltaPhase := math.Abs(math.Atan2(num, den))
  619. d2 := float64(real(curr-prev))*float64(real(curr-prev)) + float64(imag(curr-prev))*float64(imag(curr-prev))
  620. coll.Observe(metricPrefix+".delta_mag", deltaMag, tags)
  621. coll.Observe(metricPrefix+".delta_phase", deltaPhase, tags)
  622. coll.Observe(metricPrefix+".d2", d2, tags)
  623. coll.Observe(metricPrefix+".discontinuity_score", deltaMag+deltaPhase, tags)
  624. }
  625. func computeIQHeadStats(iq []complex64, headLen int) iqHeadStats {
  626. stats := iqHeadStats{minMag: math.MaxFloat64, headMinIdx: -1, maxStepIdx: -1}
  627. if len(iq) == 0 {
  628. stats.minMag = 0
  629. return stats
  630. }
  631. n := len(iq)
  632. if headLen > 0 && headLen < n {
  633. n = headLen
  634. }
  635. stats.length = n
  636. stats.stepSamples = make([]float64, 0, max(0, n-1))
  637. sumMag := 0.0
  638. headSum := 0.0
  639. tailSum := 0.0
  640. tailCount := 0
  641. for i := 0; i < n; i++ {
  642. v := iq[i]
  643. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  644. if mag < stats.minMag {
  645. stats.minMag = mag
  646. stats.headMinIdx = i
  647. }
  648. if mag > stats.maxMag {
  649. stats.maxMag = mag
  650. }
  651. sumMag += mag
  652. if mag < 0.05 {
  653. stats.lowMag++
  654. }
  655. if i < min(16, n) {
  656. headSum += mag
  657. }
  658. if i >= max(0, n-16) {
  659. tailSum += mag
  660. tailCount++
  661. }
  662. if i > 0 {
  663. p := iq[i-1]
  664. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  665. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  666. step := math.Abs(math.Atan2(num, den))
  667. if step > stats.maxStep {
  668. stats.maxStep = step
  669. stats.maxStepIdx = i - 1
  670. }
  671. stats.stepSamples = append(stats.stepSamples, step)
  672. }
  673. }
  674. stats.meanMag = sumMag / float64(n)
  675. if len(stats.stepSamples) > 0 {
  676. sorted := append([]float64(nil), stats.stepSamples...)
  677. sort.Float64s(sorted)
  678. idx := int(float64(len(sorted)-1) * 0.95)
  679. stats.p95Step = sorted[idx]
  680. } else {
  681. stats.p95Step = stats.maxStep
  682. }
  683. if headSum > 0 && tailCount > 0 {
  684. headMean := headSum / float64(min(16, n))
  685. tailMean := tailSum / float64(tailCount)
  686. if tailMean > 0 {
  687. stats.headTail = headMean / tailMean
  688. }
  689. }
  690. return stats
  691. }
  692. func observeIQStats(coll *telemetry.Collector, stage string, iq []complex64, tags telemetry.Tags) {
  693. if coll == nil || len(iq) == 0 {
  694. return
  695. }
  696. stats := computeIQHeadStats(iq, len(iq))
  697. stageTags := telemetry.TagsWith(tags, "stage", stage)
  698. coll.Observe("iq.magnitude.min", stats.minMag, stageTags)
  699. coll.Observe("iq.magnitude.max", stats.maxMag, stageTags)
  700. coll.Observe("iq.magnitude.mean", stats.meanMag, stageTags)
  701. coll.Observe("iq.phase_step.max", stats.maxStep, stageTags)
  702. coll.Observe("iq.phase_step.p95", stats.p95Step, stageTags)
  703. coll.Observe("iq.low_magnitude.count", float64(stats.lowMag), stageTags)
  704. coll.SetGauge("iq.length", float64(stats.length), stageTags)
  705. }
  706. func logExtractorHeadComparison(signalID int64, path string, overlapLen int, raw []complex64, trimSamples int, out []complex64) {
  707. rawStats := computeIQHeadStats(raw, 96)
  708. trimmedStats := computeIQHeadStats(out, 96)
  709. logging.Debug("boundary", "extract_head_compare",
  710. "signal", signalID,
  711. "path", path,
  712. "raw_len", len(raw),
  713. "trim", trimSamples,
  714. "out_len", len(out),
  715. "overlap_len", overlapLen,
  716. "raw_min_mag", rawStats.minMag,
  717. "raw_min_idx", rawStats.headMinIdx,
  718. "raw_max_step", rawStats.maxStep,
  719. "raw_max_step_idx", rawStats.maxStepIdx,
  720. "raw_head_tail", rawStats.headTail,
  721. "trimmed_min_mag", trimmedStats.minMag,
  722. "trimmed_min_idx", trimmedStats.headMinIdx,
  723. "trimmed_max_step", trimmedStats.maxStep,
  724. "trimmed_max_step_idx", trimmedStats.maxStepIdx,
  725. "trimmed_head_tail", trimmedStats.headTail,
  726. )
  727. for _, off := range []int{2, 4, 8, 16} {
  728. if len(out) <= off+8 {
  729. continue
  730. }
  731. offStats := computeIQHeadStats(out[off:], 96)
  732. logging.Debug("boundary", "extract_head_offset_compare",
  733. "signal", signalID,
  734. "path", path,
  735. "offset", off,
  736. "base_min_mag", trimmedStats.minMag,
  737. "base_min_idx", trimmedStats.headMinIdx,
  738. "base_max_step", trimmedStats.maxStep,
  739. "base_max_step_idx", trimmedStats.maxStepIdx,
  740. "offset_min_mag", offStats.minMag,
  741. "offset_min_idx", offStats.headMinIdx,
  742. "offset_max_step", offStats.maxStep,
  743. "offset_max_step_idx", offStats.maxStepIdx,
  744. "offset_head_tail", offStats.headTail,
  745. )
  746. }
  747. }