From 94c132d6fc7431ca220a3d11d5fe850523d9e8ad Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Tue, 24 Mar 2026 15:06:16 +0100 Subject: [PATCH] debug: instrument audio click investigation --- cmd/sdrd/dsp_loop.go | 8 ++++++ cmd/sdrd/helpers.go | 31 +++++++++++++++++++-- cmd/sdrd/pipeline_runtime.go | 30 ++++++++++++++++++++- internal/recorder/recorder.go | 7 +++++ internal/recorder/streamer.go | 51 ++++++++++++++++++----------------- 5 files changed, 100 insertions(+), 27 deletions(-) diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index f8149a8..73f95d1 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -75,6 +75,14 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * streamSignals = stableSignals } if rec != nil && len(art.allIQ) > 0 { + if art.streamDropped { + rt.streamOverlap = &streamIQOverlap{} + for k := range rt.streamPhaseState { + rt.streamPhaseState[k].phase = 0 + } + rec.ResetStreams() + logging.Warn("gap", "iq_dropped", "msg", "buffer bloat caused extraction drop; overlap reset") + } if rt.cfg.Recorder.DebugLiveAudio { log.Printf("LIVEAUDIO DSP: detailIQ=%d displaySignals=%d streamSignals=%d stableSignals=%d allIQ=%d", len(art.detailIQ), len(displaySignals), len(streamSignals), len(stableSignals), len(art.allIQ)) } diff --git a/cmd/sdrd/helpers.go b/cmd/sdrd/helpers.go index 76524e9..26a25ca 100644 --- a/cmd/sdrd/helpers.go +++ b/cmd/sdrd/helpers.go @@ -3,8 +3,10 @@ package main import ( "log" "math" + "os" "sort" "strconv" + "strings" "time" "sdr-wideband-suite/internal/config" @@ -231,6 +233,18 @@ const ( wfmStreamMinBW = 250000 ) +var forceCPUStreamExtract = func() bool { + raw := strings.TrimSpace(os.Getenv("SDR_FORCE_CPU_STREAM_EXTRACT")) + if raw == "" { + return false + } + v, err := strconv.ParseBool(raw) + if err != nil { + return false + } + return v +}() + // extractForStreaming performs GPU-accelerated extraction with: // - Per-signal phase-continuous FreqShift (via PhaseStart in ExtractJob) // - IQ overlap prepended to allIQ so FIR kernel has real data in halo @@ -325,8 +339,13 @@ func extractForStreaming( } } - // Try GPU BatchRunner with phase - runner := extractMgr.get(len(gpuIQ), sampleRate) + // Try GPU BatchRunner with phase unless CPU-only debug is forced. + var runner *gpudemod.BatchRunner + if forceCPUStreamExtract { + logging.Warn("boundary", "force_cpu_stream_extract", "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "signals", len(signals)) + } else { + runner = extractMgr.get(len(gpuIQ), sampleRate) + } if runner != nil { results, err := runner.ShiftFilterDecimateBatchWithPhase(gpuIQ, jobs) if err == nil && len(results) == len(signals) { @@ -356,9 +375,13 @@ func extractForStreaming( // Trim overlap from output iq := res.IQ + rawLen := len(iq) if trimSamples > 0 && trimSamples < len(iq) { iq = iq[trimSamples:] } + if i == 0 { + logging.Debug("boundary", "extract_trim", "path", "gpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(iq), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID) + } out[i] = iq rates[i] = res.Rate } @@ -424,9 +447,13 @@ func extractForStreaming( if i == 0 { logging.Debug("extract", "cpu_result", "outRate", outRate, "decim", decim, "trim", trimSamples) } + rawLen := len(decimated) if trimSamples > 0 && trimSamples < len(decimated) { decimated = decimated[trimSamples:] } + if i == 0 { + logging.Debug("boundary", "extract_trim", "path", "cpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(decimated), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID) + } out[i] = decimated } return out, rates diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index d2cec7f..1d498f6 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -3,6 +3,8 @@ package main import ( "fmt" "math" + "os" + "strconv" "strings" "sync" "sync/atomic" @@ -29,6 +31,18 @@ type rdsState struct { mu sync.Mutex } +var forceFixedStreamReadSamples = func() int { + raw := strings.TrimSpace(os.Getenv("SDR_FORCE_FIXED_STREAM_READ_SAMPLES")) + if raw == "" { + return 0 + } + v, err := strconv.Atoi(raw) + if err != nil || v <= 0 { + return 0 + } + return v +}() + type dspRuntime struct { cfg config.Config det *detector.Detector @@ -56,6 +70,7 @@ type dspRuntime struct { type spectrumArtifacts struct { allIQ []complex64 + streamDropped bool surveillanceIQ []complex64 detailIQ []complex64 surveillanceSpectrum []float64 @@ -341,7 +356,17 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag } available := required st := srcMgr.Stats() - if st.BufferSamples > required { + if forceFixedStreamReadSamples > 0 { + available = forceFixedStreamReadSamples + if available < required { + available = required + } + available = (available / required) * required + if available < required { + available = required + } + logging.Warn("boundary", "fixed_stream_read_samples", "configured", forceFixedStreamReadSamples, "effective", available, "required", required) + } else if st.BufferSamples > required { available = (st.BufferSamples / required) * required if available < required { available = required @@ -366,8 +391,10 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag maxStreamSamples = required } maxStreamSamples = (maxStreamSamples / required) * required + streamDropped := false if len(allIQ) > maxStreamSamples { allIQ = allIQ[len(allIQ)-maxStreamSamples:] + streamDropped = true } logging.Debug("capture", "iq_len", "len", len(allIQ), "surv_fft", rt.cfg.FFTSize, "detail_fft", rt.detailFFT) survIQ := allIQ @@ -432,6 +459,7 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag finished, detected := rt.det.Process(now, survSpectrum, rt.cfg.CenterHz) return &spectrumArtifacts{ allIQ: allIQ, + streamDropped: streamDropped, surveillanceIQ: survIQ, detailIQ: detailIQ, surveillanceSpectrum: survSpectrum, diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go index 7e473a9..ef1b113 100644 --- a/internal/recorder/recorder.go +++ b/internal/recorder/recorder.go @@ -357,6 +357,13 @@ func (m *Manager) StreamerRef() *Streamer { return m.streamer } +func (m *Manager) ResetStreams() { + if m == nil || m.streamer == nil { + return + } + m.streamer.ResetStreams() +} + func (m *Manager) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo { if m == nil || m.streamer == nil { return nil diff --git a/internal/recorder/streamer.go b/internal/recorder/streamer.go index 30d5248..f48253f 100644 --- a/internal/recorder/streamer.go +++ b/internal/recorder/streamer.go @@ -60,6 +60,8 @@ type streamSession struct { // --- Persistent DSP state for click-free streaming --- // Overlap-save: tail of previous extracted IQ snippet. + // Currently unused for live demod after removing the extra discriminator + // overlap prepend, but kept in DSP snapshot state for compatibility. overlapIQ []complex64 // De-emphasis IIR state (persists across frames) @@ -731,26 +733,13 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] return nil, 0 } - // --- FM discriminator overlap: prepend 1 sample from previous frame --- - // The FM discriminator needs iq[i-1] to compute the first output. - // All FIR filtering is now stateful, so no additional overlap is needed. - var fullSnip []complex64 - trimSamples := 0 - _ = trimSamples - if len(sess.overlapIQ) == 1 { - fullSnip = make([]complex64, 1+len(snippet)) - fullSnip[0] = sess.overlapIQ[0] - copy(fullSnip[1:], snippet) - trimSamples = 1 - logging.Debug("discrim", "overlap_applied", "signal", sess.signalID, "snip", len(snippet)) - } else { - fullSnip = snippet - } - - // Save last sample for next frame's FM discriminator - if len(snippet) > 0 { - sess.overlapIQ = []complex64{snippet[len(snippet)-1]} - } + // The extra 1-sample discriminator overlap prepend was removed after it was + // shown to shift the downstream decimation phase and create heavy click + // artifacts in steady-state streaming/recording. The upstream extraction path + // and the stateful FIR/decimation stages already provide continuity. + fullSnip := snippet + overlapApplied := false + prevTailValid := false // --- Stateful anti-alias FIR + decimation to demod rate --- demodRate := d.OutputSampleRate() @@ -788,20 +777,21 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] sess.preDemodDecimPhase = 0 } + decimPhaseBefore := sess.preDemodDecimPhase filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip))) dec = dsp.DecimateStateful(filtered, decim1, &sess.preDemodDecimPhase) + logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase) } else { + logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0) dec = fullSnip } - // --- FM Demod --- + // --- FM/AM/etc Demod --- audio := d.Demod(dec, actualDemodRate) if len(audio) == 0 { return nil, 0 } - - // --- Trim the 1-sample FM discriminator overlap --- - // TEMP: skip audio trim to test if per-block trimming causes ticks + logging.Debug("boundary", "audio_path", "signal", sess.signalID, "demod", demodName, "actual_rate", actualDemodRate, "audio_len", len(audio), "channels", d.Channels(), "overlap_applied", overlapApplied, "prev_tail_valid", prevTailValid) // --- Stateful stereo decode with conservative lock/hysteresis --- channels := 1 @@ -1483,3 +1473,16 @@ func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels } _, _ = f.Write(buf[:]) } + +// ResetStreams forces all active streaming sessions to discard their FIR states and decimation phases. +// This is used when the upstream DSP drops samples, creating a hard break in phase continuity. +func (st *Streamer) ResetStreams() { + st.mu.Lock() + defer st.mu.Unlock() + for _, sess := range st.sessions { + sess.preDemodFIR = nil + sess.preDemodDecimPhase = 0 + sess.stereoResampler = nil + sess.monoResampler = nil + } +}