From e48c11c21269ed3366d440125155d2246c14f057 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Wed, 25 Mar 2026 07:23:19 +0100 Subject: [PATCH] debug: add extractor boundary telemetry and notes --- cmd/sdrd/dsp_loop.go | 2 +- cmd/sdrd/helpers.go | 172 +++++++++- cmd/sdrd/pipeline_runtime.go | 82 +++++ config.yaml | 6 +- docs/audio-click-debug-notes-2026-03-24.md | 359 +++++++++++++++++++++ internal/recorder/streamer.go | 116 ++++++- internal/telemetry/telemetry.go | 3 +- 7 files changed, 730 insertions(+), 10 deletions(-) diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index c9b0a78..230a654 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -125,7 +125,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult} extractStart := time.Now() - streamSnips, streamRates := extractForStreaming(extractMgr, art.allIQ, rt.cfg.SampleRate, rt.cfg.CenterHz, streamSignals, rt.streamPhaseState, rt.streamOverlap, aqCfg) + streamSnips, streamRates := extractForStreaming(extractMgr, art.allIQ, rt.cfg.SampleRate, rt.cfg.CenterHz, streamSignals, rt.streamPhaseState, rt.streamOverlap, aqCfg, rt.telemetry) if coll != nil { coll.Observe("stage.extract_stream.duration_ms", float64(time.Since(extractStart).Microseconds())/1000.0, telemetry.TagsFromPairs("frame_id", fmt.Sprintf("%d", frameID))) coll.SetGauge("stage.extract_stream.signals", float64(len(streamSignals)), nil) diff --git a/cmd/sdrd/helpers.go b/cmd/sdrd/helpers.go index bfe3b4f..16bb810 100644 --- a/cmd/sdrd/helpers.go +++ b/cmd/sdrd/helpers.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "log" "math" "os" @@ -260,6 +261,7 @@ func extractForStreaming( phaseState map[int64]*streamExtractState, overlap *streamIQOverlap, aqCfg extractionConfig, + coll *telemetry.Collector, ) ([][]complex64, []int) { out := make([][]complex64, len(signals)) rates := make([]int, 
len(signals)) @@ -301,6 +303,18 @@ func extractForStreaming( bwMult = 1.0 } + if coll != nil { + coll.SetGauge("iq.extract.input.length", float64(len(allIQ)), nil) + coll.SetGauge("iq.extract.input.overlap_length", float64(overlapLen), nil) + headMean, tailMean, boundaryScore, _ := boundaryMetrics(overlap.tail, allIQ, 32) + coll.SetGauge("iq.extract.input.head_mean_mag", headMean, nil) + coll.SetGauge("iq.extract.input.prev_tail_mean_mag", tailMean, nil) + coll.Observe("iq.extract.input.discontinuity_score", boundaryScore, nil) + } + + rawBoundary := make(map[int64]boundaryProbeState, len(signals)) + trimmedBoundary := make(map[int64]boundaryProbeState, len(signals)) + // Build jobs with per-signal phase jobs := make([]gpudemod.ExtractJob, len(signals)) for i, sig := range signals { @@ -384,6 +398,77 @@ func extractForStreaming( logging.Debug("boundary", "extract_trim", "path", "gpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(iq), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID) logExtractorHeadComparison(signals[i].ID, "gpu", overlapLen, res.IQ, trimSamples, iq) } + if coll != nil { + tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", signals[i].ID), "path", "gpu") + stats := computeIQHeadStats(iq, 64) + coll.SetGauge("iq.extract.output.length", float64(len(iq)), tags) + coll.Observe("iq.extract.output.head_mean_mag", stats.meanMag, tags) + coll.Observe("iq.extract.output.head_min_mag", stats.minMag, tags) + coll.Observe("iq.extract.output.head_max_step", stats.maxStep, tags) + coll.Observe("iq.extract.output.head_p95_step", stats.p95Step, tags) + coll.Observe("iq.extract.output.head_tail_ratio", stats.headTail, tags) + coll.SetGauge("iq.extract.output.head_low_magnitude_count", float64(stats.lowMag), tags) + coll.SetGauge("iq.extract.raw.length", float64(rawLen), tags) + coll.SetGauge("iq.extract.trim.trim_samples", float64(trimSamples), tags) + if rawLen > 0 { + 
coll.SetGauge("iq.extract.raw.head_mag", math.Hypot(float64(real(res.IQ[0])), float64(imag(res.IQ[0]))), tags) + coll.SetGauge("iq.extract.raw.tail_mag", math.Hypot(float64(real(res.IQ[rawLen-1])), float64(imag(res.IQ[rawLen-1]))), tags) + rawHead := probeHead(res.IQ, 16, 1e-6) + coll.SetGauge("iq.extract.raw.head_zero_count", float64(rawHead.zeroCount), tags) + coll.SetGauge("iq.extract.raw.first_nonzero_index", float64(rawHead.firstNonZeroIndex), tags) + coll.SetGauge("iq.extract.raw.head_max_step", rawHead.maxStep, tags) + coll.Event("extract_raw_head_probe", "info", "raw extractor head probe", tags, map[string]any{ + "mags": rawHead.mags, + "zero_count": rawHead.zeroCount, + "first_nonzero_index": rawHead.firstNonZeroIndex, + "head_max_step": rawHead.maxStep, + "trim_samples": trimSamples, + }) + } + if len(iq) > 0 { + coll.SetGauge("iq.extract.trimmed.head_mag", math.Hypot(float64(real(iq[0])), float64(imag(iq[0]))), tags) + coll.SetGauge("iq.extract.trimmed.tail_mag", math.Hypot(float64(real(iq[len(iq)-1])), float64(imag(iq[len(iq)-1]))), tags) + trimmedHead := probeHead(iq, 16, 1e-6) + coll.SetGauge("iq.extract.trimmed.head_zero_count", float64(trimmedHead.zeroCount), tags) + coll.SetGauge("iq.extract.trimmed.first_nonzero_index", float64(trimmedHead.firstNonZeroIndex), tags) + coll.SetGauge("iq.extract.trimmed.head_max_step", trimmedHead.maxStep, tags) + coll.Event("extract_trimmed_head_probe", "info", "trimmed extractor head probe", tags, map[string]any{ + "mags": trimmedHead.mags, + "zero_count": trimmedHead.zeroCount, + "first_nonzero_index": trimmedHead.firstNonZeroIndex, + "head_max_step": trimmedHead.maxStep, + "trim_samples": trimSamples, + }) + } + if rb := rawBoundary[signals[i].ID]; rb.set && rawLen > 0 { + prevMag := math.Hypot(float64(real(rb.last)), float64(imag(rb.last))) + currMag := math.Hypot(float64(real(res.IQ[0])), float64(imag(res.IQ[0]))) + coll.SetGauge("iq.extract.raw.boundary.prev_tail_mag", prevMag, tags) + 
coll.SetGauge("iq.extract.raw.boundary.curr_head_mag", currMag, tags) + coll.Event("extract_raw_boundary", "info", "raw extractor boundary", tags, map[string]any{ + "delta_mag": math.Abs(currMag - prevMag), + "trim_samples": trimSamples, + "raw_len": rawLen, + }) + } + if tb := trimmedBoundary[signals[i].ID]; tb.set && len(iq) > 0 { + prevMag := math.Hypot(float64(real(tb.last)), float64(imag(tb.last))) + currMag := math.Hypot(float64(real(iq[0])), float64(imag(iq[0]))) + coll.SetGauge("iq.extract.trimmed.boundary.prev_tail_mag", prevMag, tags) + coll.SetGauge("iq.extract.trimmed.boundary.curr_head_mag", currMag, tags) + coll.Event("extract_trimmed_boundary", "info", "trimmed extractor boundary", tags, map[string]any{ + "delta_mag": math.Abs(currMag - prevMag), + "trim_samples": trimSamples, + "out_len": len(iq), + }) + } + } + if rawLen > 0 { + rawBoundary[signals[i].ID] = boundaryProbeState{last: res.IQ[rawLen-1], set: true} + } + if len(iq) > 0 { + trimmedBoundary[signals[i].ID] = boundaryProbeState{last: iq[len(iq)-1], set: true} + } out[i] = iq rates[i] = res.Rate } @@ -449,7 +534,8 @@ func extractForStreaming( if i == 0 { logging.Debug("extract", "cpu_result", "outRate", outRate, "decim", decim, "trim", trimSamples) } - rawLen := len(decimated) + rawIQ := decimated + rawLen := len(rawIQ) if trimSamples > 0 && trimSamples < len(decimated) { decimated = decimated[trimSamples:] } @@ -457,6 +543,31 @@ func extractForStreaming( logging.Debug("boundary", "extract_trim", "path", "cpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(decimated), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID) logExtractorHeadComparison(signals[i].ID, "cpu", overlapLen, decimated, trimSamples, decimated) } + if coll != nil { + tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", signals[i].ID), "path", "cpu") + stats := computeIQHeadStats(decimated, 64) + 
coll.SetGauge("iq.extract.output.length", float64(len(decimated)), tags) + coll.Observe("iq.extract.output.head_mean_mag", stats.meanMag, tags) + coll.Observe("iq.extract.output.head_min_mag", stats.minMag, tags) + coll.Observe("iq.extract.output.head_max_step", stats.maxStep, tags) + coll.Observe("iq.extract.output.head_p95_step", stats.p95Step, tags) + coll.Observe("iq.extract.output.head_tail_ratio", stats.headTail, tags) + coll.SetGauge("iq.extract.output.head_low_magnitude_count", float64(stats.lowMag), tags) + coll.SetGauge("iq.extract.raw.length", float64(rawLen), tags) + coll.SetGauge("iq.extract.trim.trim_samples", float64(trimSamples), tags) + if rb := rawBoundary[signals[i].ID]; rb.set && rawLen > 0 { + observeBoundarySample(coll, "iq.extract.raw.boundary", tags, rb.last, rawIQ[0]) + } + if tb := trimmedBoundary[signals[i].ID]; tb.set && len(decimated) > 0 { + observeBoundarySample(coll, "iq.extract.trimmed.boundary", tags, tb.last, decimated[0]) + } + } + if rawLen > 0 { + rawBoundary[signals[i].ID] = boundaryProbeState{last: rawIQ[rawLen-1], set: true} + } + if len(decimated) > 0 { + trimmedBoundary[signals[i].ID] = boundaryProbeState{last: decimated[len(decimated)-1], set: true} + } out[i] = decimated } return out, rates @@ -476,6 +587,65 @@ type iqHeadStats struct { stepSamples []float64 } +type boundaryProbeState struct { + last complex64 + set bool +} + +type headProbe struct { + zeroCount int + firstNonZeroIndex int + maxStep float64 + mags []float64 +} + +func probeHead(samples []complex64, n int, zeroThreshold float64) headProbe { + if n <= 0 || len(samples) == 0 { + return headProbe{firstNonZeroIndex: -1} + } + if len(samples) < n { + n = len(samples) + } + if zeroThreshold <= 0 { + zeroThreshold = 1e-6 + } + out := headProbe{firstNonZeroIndex: -1, mags: make([]float64, 0, n)} + for i := 0; i < n; i++ { + v := samples[i] + mag := math.Hypot(float64(real(v)), float64(imag(v))) + out.mags = append(out.mags, mag) + if mag <= zeroThreshold { + 
out.zeroCount++ + } else if out.firstNonZeroIndex < 0 { + out.firstNonZeroIndex = i + } + if i > 0 { + p := samples[i-1] + num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v)) + den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v)) + step := math.Abs(math.Atan2(num, den)) + if step > out.maxStep { + out.maxStep = step + } + } + } + return out +} + +func observeBoundarySample(coll *telemetry.Collector, metricPrefix string, tags map[string]string, prev complex64, curr complex64) { + prevMag := math.Hypot(float64(real(prev)), float64(imag(prev))) + currMag := math.Hypot(float64(real(curr)), float64(imag(curr))) + deltaMag := math.Abs(currMag - prevMag) + num := float64(real(prev))*float64(imag(curr)) - float64(imag(prev))*float64(real(curr)) + den := float64(real(prev))*float64(real(curr)) + float64(imag(prev))*float64(imag(curr)) + deltaPhase := math.Abs(math.Atan2(num, den)) + d2 := float64(real(curr-prev))*float64(real(curr-prev)) + float64(imag(curr-prev))*float64(imag(curr-prev)) + coll.Observe(metricPrefix+".delta_mag", deltaMag, tags) + coll.Observe(metricPrefix+".delta_phase", deltaPhase, tags) + coll.Observe(metricPrefix+".d2", d2, tags) + coll.Observe(metricPrefix+".discontinuity_score", deltaMag+deltaPhase, tags) +} + func computeIQHeadStats(iq []complex64, headLen int) iqHeadStats { stats := iqHeadStats{minMag: math.MaxFloat64, headMinIdx: -1, maxStepIdx: -1} if len(iq) == 0 { diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index f8cf665..8b182e2 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -68,6 +68,7 @@ type dspRuntime struct { arbitration pipeline.ArbitrationState gotSamples bool telemetry *telemetry.Collector + lastAllIQTail []complex64 } type spectrumArtifacts struct { @@ -361,6 +362,63 @@ func (rt *dspRuntime) decimateSurveillanceIQ(iq []complex64, factor int) []compl return dsp.Decimate(filtered, factor) } +func meanMagComplex(samples 
[]complex64) float64 { + if len(samples) == 0 { + return 0 + } + var sum float64 + for _, v := range samples { + sum += math.Hypot(float64(real(v)), float64(imag(v))) + } + return sum / float64(len(samples)) +} + +func phaseStepAbs(a, b complex64) float64 { + num := float64(real(a))*float64(imag(b)) - float64(imag(a))*float64(real(b)) + den := float64(real(a))*float64(real(b)) + float64(imag(a))*float64(imag(b)) + return math.Abs(math.Atan2(num, den)) +} + +func boundaryMetrics(prevTail []complex64, curr []complex64, window int) (float64, float64, float64, int) { + if len(curr) == 0 { + return 0, 0, 0, 0 + } + if window <= 0 { + window = 16 + } + headN := window + if len(curr) < headN { + headN = len(curr) + } + headMean := meanMagComplex(curr[:headN]) + if len(prevTail) == 0 { + return headMean, 0, 0, headN + } + tailN := window + if len(prevTail) < tailN { + tailN = len(prevTail) + } + tailMean := meanMagComplex(prevTail[len(prevTail)-tailN:]) + deltaMag := math.Abs(headMean - tailMean) + phaseJump := phaseStepAbs(prevTail[len(prevTail)-1], curr[0]) + score := deltaMag + phaseJump + return headMean, tailMean, score, headN +} + +func tailWindowComplex(src []complex64, n int) []complex64 { + if n <= 0 || len(src) == 0 { + return nil + } + if len(src) <= n { + out := make([]complex64, len(src)) + copy(out, src) + return out + } + out := make([]complex64, n) + copy(out, src[len(src)-n:]) + return out +} + func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) { start := time.Now() required := rt.cfg.FFTSize @@ -461,12 +519,36 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag rt.telemetry.SetGauge("iq.stage.surveillance.length", float64(len(survIQ)), nil) rt.telemetry.SetGauge("iq.stage.detail.length", float64(len(detailIQ)), nil) rt.telemetry.Observe("capture.total.duration_ms", float64(time.Since(start).Microseconds())/1000.0, 
nil) + + headMean, tailMean, boundaryScore, boundaryWindow := boundaryMetrics(rt.lastAllIQTail, allIQ, 32) + rt.telemetry.SetGauge("iq.boundary.all.head_mean_mag", headMean, nil) + rt.telemetry.SetGauge("iq.boundary.all.prev_tail_mean_mag", tailMean, nil) + rt.telemetry.Observe("iq.boundary.all.discontinuity_score", boundaryScore, nil) + if len(rt.lastAllIQTail) > 0 && len(allIQ) > 0 { + deltaMag := math.Abs(math.Hypot(float64(real(allIQ[0])), float64(imag(allIQ[0]))) - math.Hypot(float64(real(rt.lastAllIQTail[len(rt.lastAllIQTail)-1])), float64(imag(rt.lastAllIQTail[len(rt.lastAllIQTail)-1])))) + phaseJump := phaseStepAbs(rt.lastAllIQTail[len(rt.lastAllIQTail)-1], allIQ[0]) + rt.telemetry.Observe("iq.boundary.all.delta_mag", deltaMag, nil) + rt.telemetry.Observe("iq.boundary.all.delta_phase", phaseJump, nil) + if rt.telemetry.ShouldSampleHeavy() { + rt.telemetry.Event("alliq_boundary", "info", "allIQ boundary snapshot", nil, map[string]any{ + "window": boundaryWindow, + "head_mean_mag": headMean, + "prev_tail_mean_mag": tailMean, + "delta_mag": deltaMag, + "delta_phase": phaseJump, + "discontinuity_score": boundaryScore, + "alliq_len": len(allIQ), + "stream_dropped": streamDropped, + }) + } + } if rt.telemetry.ShouldSampleHeavy() { observeIQStats(rt.telemetry, "capture_all", allIQ, nil) observeIQStats(rt.telemetry, "capture_surveillance", survIQ, nil) observeIQStats(rt.telemetry, "capture_detail", detailIQ, nil) } } + rt.lastAllIQTail = tailWindowComplex(allIQ, 32) survSpectrum := rt.spectrumFromIQ(survIQ, gpuState) sanitizeSpectrum(survSpectrum) detailSpectrum := survSpectrum diff --git a/config.yaml b/config.yaml index 3d1b720..4485db3 100644 --- a/config.yaml +++ b/config.yaml @@ -255,9 +255,9 @@ debug: enabled: true heavy_enabled: false heavy_sample_every: 12 - metric_sample_every: 2 - metric_history_max: 12000 - event_history_max: 4000 + metric_sample_every: 8 + metric_history_max: 6000 + event_history_max: 1500 retention_seconds: 900 persist_enabled: true 
persist_dir: debug/telemetry diff --git a/docs/audio-click-debug-notes-2026-03-24.md b/docs/audio-click-debug-notes-2026-03-24.md index 4f4824f..55add8d 100644 --- a/docs/audio-click-debug-notes-2026-03-24.md +++ b/docs/audio-click-debug-notes-2026-03-24.md @@ -321,6 +321,364 @@ Later refinements to this theory: 4. When testing fixes, prefer low-overhead, theory-driven experiments over broad logging/dump spam. 5. Only re-enable audio dump windows selectively and briefly. +### Debug TODO / operational reminders + +- The current telemetry collector is **not** using a true ring buffer for metric/event history. +- Internally it keeps append-only history slices (`metricsHistory`, `events`) and periodically trims them by copying tail slices. +- Under heavy per-block telemetry this can add enough mutex/copy overhead to make the live stream start stuttering after a short run. +- Therefore: keep telemetry sampling conservative during live reproduction runs; do **not** leave full heavy telemetry enabled longer than needed. +- Follow-up engineering task: replace or redesign telemetry history storage to use a proper low-overhead ring-buffer style structure (or equivalent bounded lock-light design) if live telemetry is to remain a standard debugging tool. + +--- + +## 2026-03-25 update — extractor-focused live telemetry findings + +### Where the investigation moved + +The investigation was deliberately refocused away from browser/feed/demod-only suspicions and toward: +- shared upstream IQ cadence / block boundaries +- extractor input/output continuity +- raw vs trimmed extractor-head behaviour + +This was driven by two observations: +1. all signals still click +2. 
the newly added live telemetry made it possible to inspect the shared path while the system was running + +### Telemetry infrastructure / config notes + +Two config files matter for debug telemetry defaults: +- `config.yaml` +- `config.autosave.yaml` + +The autosave file can overwrite intended telemetry defaults after restart, so both must be updated together. + +Current conservative live-debug defaults that worked better: +- `heavy_enabled: false` +- `heavy_sample_every: 12` +- `metric_sample_every: 8` +- `metric_history_max: 6000` +- `event_history_max: 1500` + +Important operational lesson: +- runtime `POST /api/debug/telemetry/config` changes only affect the current `sdrd` process +- after restart, the process reloads config defaults again +- if autosave still contains older values (for example `heavy_enabled: true` or very large history limits), the debug run can accidentally become self-distorting again + +### Telemetry endpoints + +The live debug work used these HTTP endpoints on the `sdrd` web server (typically `http://127.0.0.1:8080`): + +#### `GET /api/debug/telemetry/config` +Returns the current effective telemetry configuration. +Useful for verifying: +- whether heavy telemetry is enabled +- history sizes +- persistence settings +- sample rates actually active in the running process + +Typical fields: +- `enabled` +- `heavy_enabled` +- `heavy_sample_every` +- `metric_sample_every` +- `metric_history_max` +- `event_history_max` +- `retention_seconds` +- `persist_enabled` +- `persist_dir` + +#### `POST /api/debug/telemetry/config` +Applies runtime telemetry config changes to the current process. +Used during investigation to temporarily reduce telemetry load without editing files. + +Example body used during investigation: +```json +{ + "heavy_enabled": true, + "heavy_sample_every": 12, + "metric_sample_every": 8 +} +``` + +#### `GET /api/debug/telemetry/live` +Returns the current live metric snapshot (gauges/counters/distributions). 
+Useful for: +- quick sanity checks +- verifying that a metric family exists +- confirming whether a new metric name is actually being emitted + +#### `GET /api/debug/telemetry/history?prefix=&limit=` +Returns stored metric history entries filtered by metric-name prefix. +This is the main endpoint for time-series debugging during live runs. + +Useful examples: +- `prefix=stage.` +- `prefix=source.` +- `prefix=iq.boundary.all` +- `prefix=iq.extract.input` +- `prefix=iq.extract.output` +- `prefix=iq.extract.raw.` +- `prefix=iq.extract.trimmed.` +- `prefix=iq.pre_demod` +- `prefix=audio.demod` + +#### `GET /api/debug/telemetry/events?limit=` +Returns recent structured telemetry events. +Used heavily once compact per-block event probes were added, because events were often easier to inspect reliably than sparsely sampled distribution histories. + +This ended up being especially useful for: +- raw extractor head probes +- trimmed extractor head probes +- boundary snapshots + +### Important telemetry families added/used + +#### Shared-path / global boundary metrics +- `iq.boundary.all.head_mean_mag` +- `iq.boundary.all.prev_tail_mean_mag` +- `iq.boundary.all.delta_mag` +- `iq.boundary.all.delta_phase` +- `iq.boundary.all.discontinuity_score` + +Purpose: +- detect whether the shared `allIQ` block boundary was already obviously broken before signal-specific extraction + +#### Extractor input/output metrics +- `iq.extract.input.length` +- `iq.extract.input.overlap_length` +- `iq.extract.input.head_mean_mag` +- `iq.extract.input.prev_tail_mean_mag` +- `iq.extract.input.discontinuity_score` +- `iq.extract.output.length` +- `iq.extract.output.head_mean_mag` +- `iq.extract.output.head_min_mag` +- `iq.extract.output.head_max_step` +- `iq.extract.output.head_p95_step` +- `iq.extract.output.head_tail_ratio` +- `iq.extract.output.head_low_magnitude_count` +- `iq.extract.output.boundary.delta_mag` +- `iq.extract.output.boundary.delta_phase` +- `iq.extract.output.boundary.d2` +- 
`iq.extract.output.boundary.discontinuity_score` + +Purpose: +- isolate whether the final per-signal extractor output itself was discontinuous across blocks + +#### Raw vs trimmed extractor-head telemetry +- `iq.extract.raw.length` +- `iq.extract.raw.head_mag` +- `iq.extract.raw.tail_mag` +- `iq.extract.raw.head_zero_count` +- `iq.extract.raw.first_nonzero_index` +- `iq.extract.raw.head_max_step` +- `iq.extract.trim.trim_samples` +- `iq.extract.trimmed.head_mag` +- `iq.extract.trimmed.tail_mag` +- `iq.extract.trimmed.head_zero_count` +- `iq.extract.trimmed.first_nonzero_index` +- `iq.extract.trimmed.head_max_step` +- event `extract_raw_head_probe` +- event `extract_trimmed_head_probe` + +Purpose: +- answer the key question: is the corruption already present in the raw extractor output head, or created by trimming/overlap logic afterward? + +#### Pre-demod / audio-stage metrics +- `iq.pre_demod.head_mean_mag` +- `iq.pre_demod.head_min_mag` +- `iq.pre_demod.head_max_step` +- `iq.pre_demod.head_p95_step` +- `iq.pre_demod.head_low_magnitude_count` +- `audio.demod.head_mean_abs` +- `audio.demod.tail_mean_abs` +- `audio.demod.edge_delta_abs` +- existing `audio.demod_boundary.*` + +Purpose: +- verify where artifacts become visible/audible downstream + +### What the 2026-03-25 telemetry actually showed + +#### 1. Feed / enqueue remained relatively uninteresting +`stage.feed_enqueue.duration_ms` was usually effectively zero. + +Representative values during live runs: +- mostly `0` +- occasional small spikes such as `0.5 ms` and `5.8 ms` + +Interpretation: +- feed enqueue is not the main source of clicks + +#### 2. Extract-stream time was usually modest +`stage.extract_stream.duration_ms` was usually small and stable compared with the main loop. + +Representative values: +- often `1–5 ms` +- occasional spikes such as `10.7 ms` and `18.9 ms` + +Interpretation: +- extraction is not free, but runtime cost alone does not explain the clicks + +#### 3. 
Shared capture / source cadence still fluctuated heavily +Representative live values: +- `dsp.frame.duration_ms`: often around `90–100 ms`, but also `110–150 ms`, with one observed spike around `212.6 ms` +- `source.read.duration_ms`: roughly `80–90 ms` often, but also about `60 ms`, `47 ms`, `19 ms`, and even `0.677 ms` +- `source.buffer_samples`: ranged from very small to very large bursts, including examples like `512`, `4608`, `94720`, `179200`, `304544` +- a `source_reset` event was seen and `source.resets=1` + +Interpretation: +- shared upstream cadence is clearly unstable enough to remain suspicious +- but this alone did not localize the final click mechanism + +#### 4. Pre-demod stage showed repeated hard phase anomalies even when energy looked healthy +Representative live values for normal non-vanishing signals: +- `iq.pre_demod.head_mean_mag` around `0.25–0.31` +- `iq.pre_demod.head_low_magnitude_count = 0` +- `iq.pre_demod.head_max_step` repeatedly high, including roughly: + - `1.5` + - `2.0` + - `2.4` + - `2.8` + - `3.08` + +Interpretation: +- not primarily an amplitude collapse +- rather a strong phase/continuity defect reaching the pre-demod stage + +#### 5. Audio stage still showed real block-edge artifacts +Representative values: +- `audio.demod.edge_delta_abs` repeatedly around `0.4–0.8` +- outliers up to roughly `1.21` and `1.26` +- `audio.demod_boundary.count` continued to fire repeatedly + +Interpretation: +- demod is where the problem becomes audible, but the root cause still appeared to be earlier/shared + +### Key extractor findings from the new telemetry + +#### A. 
Per-signal extractor output boundary is genuinely broken +For a representative strong signal (`signal_id=2`), `iq.extract.output.boundary.delta_phase` repeatedly showed very large jumps such as: +- `2.60` +- `3.06` +- `2.14` +- `2.71` +- `3.09` +- `2.92` +- `2.63` +- `2.78` + +Also observed for `iq.extract.output.boundary.discontinuity_score`: +- `2.86` +- `3.08` +- `2.92` +- `2.52` +- `2.40` +- `2.85` + +Later runs using `d2` made the discontinuity even easier to see. Representative `iq.extract.output.boundary.d2` values for the same strong signal included: +- `0.347` +- `0.303` +- `0.362` +- `0.359` +- `0.382` +- `0.344` +- `0.337` +- `0.206` + +At the same time, `iq.extract.output.boundary.delta_mag` was often comparatively small (examples around `0.0003–0.0038`). + +Interpretation: +- the main boundary defect is not primarily amplitude mismatch +- it is much more consistent with complex/phase discontinuity across output blocks + +#### B. The raw extractor head is systematically bad on all signals +The new `extract_raw_head_probe` events were the strongest finding of the day. + +Representative repeated pattern for strong signals (`signal_id=1` and `signal_id=2`): +- `first_nonzero_index = 1` +- `zero_count = 1` +- first magnitude sample exactly `0` +- then a short ramp: e.g. 
for `signal_id=2` + - `0` + - `0.000388` + - `0.002316` + - `0.004152` + - `0.019126` + - `0.011418` + - `0.124034` + - `0.257569` + - `0.317579` +- `head_max_step` often near π, e.g.: + - `3.141592653589793` + - `3.088773696463606` + - `3.0106854446936318` + - `2.9794833659932527` + +The same qualitative pattern appeared for weaker signals too: +- raw head starts at `0` +- a brief near-zero ramp follows +- only after several samples does the magnitude look like a normal extracted band + +Interpretation: +- the raw extractor output head is already damaged / settling / invalid before trimming +- this strongly supports an upstream/shared-start-condition problem rather than a trim-created artifact + +#### C. The trimmed extractor head usually looks sane +Representative repeated pattern for the same signals after `trim_samples = 64`: +- `first_nonzero_index = 0` +- `zero_count = 0` +- magnitudes look immediately plausible and stable +- `head_max_step` is dramatically lower than raw, often around `0.15–0.9` for strong channels + +Example trimmed head magnitudes for `signal_id=2`: +- `0.299350` +- `0.300954` +- `0.298032` +- `0.298738` +- `0.312258` +- `0.296932` +- `0.239010` +- `0.266881` +- `0.313193` + +Example trimmed head magnitudes for `signal_id=1`: +- `0.277400` +- `0.275994` +- `0.273718` +- `0.272846` +- `0.277842` +- `0.278398` +- `0.268829` +- `0.273790` +- `0.279031` + +Interpretation: +- trimming is removing a genuinely bad raw head region +- trimming is therefore **not** the main origin of the problem +- it acts more like cleanup of an already bad upstream/raw start region + +### Strongest current conclusion after the 2026-03-25 telemetry pass + +The current best reading is: + +> The click root cause is very likely **upstream of final trimming**, at or before the raw extractor output head, and likely tied to shared block-boundary / extractor-start conditions rather than to feed enqueue, browser playback, or trimming itself. 
+ +More specifically: +- all signals show a systematically bad raw extractor head +- the trimmed head usually looks healthier +- yet the final extractor output still exhibits significant complex-valued (IQ) boundary discontinuity from block to block +- therefore there are likely **two connected effects**: + 1. a bad / settling / zeroed raw extractor head + 2. remaining phase-continuity problems in the complex IQ samples across final output blocks even after that head is trimmed away + +### What should not be forgotten from this stage + +- The overlap-prepend bug was real and worth fixing, but was not sufficient on its own. +- The fixed read-size path (`SDR_FORCE_FIXED_STREAM_READ_SAMPLES=389120`) remains useful and likely worth promoting later, but it is not the root-cause fix. +- The telemetry system itself can perturb runs if overused; conservative sampling matters. +- `config.autosave.yaml` must be kept in sync with `config.yaml` or telemetry defaults can silently revert after restart. +- The most promising root-cause area is now the shared upstream/extractor-start boundary path, not downstream playback.
+ --- ## Meta note @@ -332,3 +690,4 @@ The most important thing not to forget is: - the click is already present in demod audio - whole-process CPU saturation is not the main explanation - excessive debug instrumentation can itself create misleading secondary problems +- the 2026-03-25 extractor telemetry strongly suggests the remaining root cause is upstream of the final trim stage diff --git a/internal/recorder/streamer.go b/internal/recorder/streamer.go index 3e61c6e..65cb34d 100644 --- a/internal/recorder/streamer.go +++ b/internal/recorder/streamer.go @@ -12,6 +12,7 @@ import ( "path/filepath" "strconv" "strings" + "sort" "sync" "time" @@ -56,6 +57,10 @@ type streamSession struct { prevDecIQ complex64 lastDecIQSet bool + lastExtractIQ complex64 + prevExtractIQ complex64 + lastExtractIQSet bool + lastDemodL float32 prevDemodL float64 lastDemodSet bool @@ -898,12 +903,95 @@ func (st *Streamer) UnsubscribeAudio(subID int64) { // processSnippet takes a pre-extracted IQ snippet and demodulates it with // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz // output with zero transient artifacts. 
+type iqHeadProbeStats struct { + meanMag float64 + minMag float64 + maxStep float64 + p95Step float64 + lowMag int +} + +func probeIQHeadStats(iq []complex64, probeLen int) iqHeadProbeStats { + if probeLen <= 0 || len(iq) == 0 { + return iqHeadProbeStats{} + } + if len(iq) < probeLen { + probeLen = len(iq) + } + stats := iqHeadProbeStats{minMag: math.MaxFloat64} + steps := make([]float64, 0, probeLen) + var sum float64 + for i := 0; i < probeLen; i++ { + v := iq[i] + mag := math.Hypot(float64(real(v)), float64(imag(v))) + sum += mag + if mag < stats.minMag { + stats.minMag = mag + } + if mag < 0.02 { + stats.lowMag++ + } + if i > 0 { + p := iq[i-1] + num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v)) + den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v)) + step := math.Abs(math.Atan2(num, den)) + steps = append(steps, step) + if step > stats.maxStep { + stats.maxStep = step + } + } + } + stats.meanMag = sum / float64(probeLen) + if len(steps) > 0 { + sorted := append([]float64(nil), steps...) 
+ sort.Float64s(sorted) + idx := int(math.Round(0.95 * float64(len(sorted)-1))) + if idx < 0 { + idx = 0 + } + if idx >= len(sorted) { + idx = len(sorted) - 1 + } + stats.p95Step = sorted[idx] + } + if stats.minMag == math.MaxFloat64 { + stats.minMag = 0 + } + return stats +} + func (sess *streamSession) processSnippet(snippet []complex64, snipRate int, coll *telemetry.Collector) ([]float32, int) { if len(snippet) == 0 || snipRate <= 0 { return nil, 0 } + baseTags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID) if coll != nil { - coll.SetGauge("iq.stage.snippet.length", float64(len(snippet)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + coll.SetGauge("iq.stage.snippet.length", float64(len(snippet)), baseTags) + stats := probeIQHeadStats(snippet, 64) + coll.Observe("iq.snippet.head_mean_mag", stats.meanMag, baseTags) + coll.Observe("iq.snippet.head_min_mag", stats.minMag, baseTags) + coll.Observe("iq.snippet.head_max_step", stats.maxStep, baseTags) + coll.Observe("iq.snippet.head_p95_step", stats.p95Step, baseTags) + coll.SetGauge("iq.snippet.head_low_magnitude_count", float64(stats.lowMag), baseTags) + if sess.lastExtractIQSet { + prevMag := math.Hypot(float64(real(sess.lastExtractIQ)), float64(imag(sess.lastExtractIQ))) + currMag := math.Hypot(float64(real(snippet[0])), float64(imag(snippet[0]))) + deltaMag := math.Abs(currMag - prevMag) + num := float64(real(sess.lastExtractIQ))*float64(imag(snippet[0])) - float64(imag(sess.lastExtractIQ))*float64(real(snippet[0])) + den := float64(real(sess.lastExtractIQ))*float64(real(snippet[0])) + float64(imag(sess.lastExtractIQ))*float64(imag(snippet[0])) + deltaPhase := math.Abs(math.Atan2(num, den)) + d2 := float64(real(snippet[0]-sess.lastExtractIQ))*float64(real(snippet[0]-sess.lastExtractIQ)) + float64(imag(snippet[0]-sess.lastExtractIQ))*float64(imag(snippet[0]-sess.lastExtractIQ)) + 
coll.Observe("iq.extract.output.boundary.delta_mag", deltaMag, baseTags) + coll.Observe("iq.extract.output.boundary.delta_phase", deltaPhase, baseTags) + coll.Observe("iq.extract.output.boundary.d2", d2, baseTags) + coll.Observe("iq.extract.output.boundary.discontinuity_score", deltaMag+deltaPhase, baseTags) + } + } + if len(snippet) > 0 { + sess.prevExtractIQ = sess.lastExtractIQ + sess.lastExtractIQ = snippet[len(snippet)-1] + sess.lastExtractIQSet = true } isWFMStereo := sess.demodName == "WFM_STEREO" @@ -1015,8 +1103,14 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int, col dec = sess.preDemodDecimator.Process(fullSnip) sess.preDemodDecimPhase = sess.preDemodDecimator.Phase() if coll != nil { - coll.Observe("dsp.pre_demod.decimation_factor", float64(decim1), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) - coll.SetGauge("iq.stage.pre_demod.length", float64(len(dec)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + coll.Observe("dsp.pre_demod.decimation_factor", float64(decim1), baseTags) + coll.SetGauge("iq.stage.pre_demod.length", float64(len(dec)), baseTags) + decStats := probeIQHeadStats(dec, 64) + coll.Observe("iq.pre_demod.head_mean_mag", decStats.meanMag, baseTags) + coll.Observe("iq.pre_demod.head_min_mag", decStats.minMag, baseTags) + coll.Observe("iq.pre_demod.head_max_step", decStats.maxStep, baseTags) + coll.Observe("iq.pre_demod.head_p95_step", decStats.p95Step, baseTags) + coll.SetGauge("iq.pre_demod.head_low_magnitude_count", float64(decStats.lowMag), baseTags) } logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase) } else { @@ -1149,7 +1243,21 @@ func (sess 
*streamSession) processSnippet(snippet []complex64, snipRate int, col return nil, 0 } if coll != nil { - coll.SetGauge("audio.stage.demod.length", float64(len(audio)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + coll.SetGauge("audio.stage.demod.length", float64(len(audio)), baseTags) + probe := 64 + if len(audio) < probe { + probe = len(audio) + } + if probe > 0 { + var headAbs, tailAbs float64 + for i := 0; i < probe; i++ { + headAbs += math.Abs(float64(audio[i])) + tailAbs += math.Abs(float64(audio[len(audio)-probe+i])) + } + coll.Observe("audio.demod.head_mean_abs", headAbs/float64(probe), baseTags) + coll.Observe("audio.demod.tail_mean_abs", tailAbs/float64(probe), baseTags) + coll.Observe("audio.demod.edge_delta_abs", math.Abs(float64(audio[0])-float64(audio[len(audio)-1])), baseTags) + } } if logging.EnabledCategory("boundary") { stride := d.Channels() diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 837f868..e57a6a1 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -356,7 +356,8 @@ func (c *Collector) recordMetric(kind string, name string, value float64, tags T } sampleN := c.cfg.MetricSampleEvery seq := atomic.AddUint64(&c.counterSeq, 1) - shouldStore := sampleN <= 1 || seq%uint64(sampleN) == 0 || kind == "counter" + forceStore := strings.HasPrefix(name, "iq.extract.raw.boundary.") || strings.HasPrefix(name, "iq.extract.trimmed.boundary.") + shouldStore := forceStore || sampleN <= 1 || seq%uint64(sampleN) == 0 || kind == "counter" var mp MetricPoint if shouldStore { mp = MetricPoint{