From bd608fda075b7eaae50a3283a766494df3df4ab2 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Wed, 25 Mar 2026 14:35:45 +0100 Subject: [PATCH] fix: stabilize streaming extraction and FM block boundaries --- cmd/sdrd/helpers.go | 43 +++++++++++++++++- cmd/sdrd/streaming_refactor.go | 51 ++++++++++++++++++++-- internal/demod/gpudemod/stream_state.go | 10 ++++- internal/demod/gpudemod/streaming_types.go | 14 +++++- internal/recorder/streamer.go | 26 ++++++++++- 5 files changed, 135 insertions(+), 9 deletions(-) diff --git a/cmd/sdrd/helpers.go b/cmd/sdrd/helpers.go index fbb7c02..905af0a 100644 --- a/cmd/sdrd/helpers.go +++ b/cmd/sdrd/helpers.go @@ -267,14 +267,53 @@ func extractForStreaming( coll *telemetry.Collector, ) ([][]complex64, []int) { if useStreamingProductionPath { - if out, rates, err := extractForStreamingProduction(extractMgr, allIQ, sampleRate, centerHz, signals, aqCfg, coll); err == nil { + out, rates, err := extractForStreamingProduction(extractMgr, allIQ, sampleRate, centerHz, signals, aqCfg, coll) + if err == nil { + logging.Debug("extract", "path_active", "path", "streaming_production", "signals", len(signals), "allIQ", len(allIQ)) + if coll != nil { + coll.IncCounter("extract.path.streaming_production", 1, nil) + } return out, rates } + // CRITICAL: the streaming production path failed — log WHY before falling through + log.Printf("EXTRACT PATH FALLTHROUGH: streaming production failed: %v — using legacy overlap+trim", err) + logging.Warn("extract", "streaming_production_fallthrough", + "err", err.Error(), + "signals", len(signals), + "allIQ", len(allIQ), + "sampleRate", sampleRate, + ) + if coll != nil { + coll.IncCounter("extract.path.streaming_production_failed", 1, nil) + coll.Event("extraction_path_fallthrough", "warn", + "streaming production path failed, using legacy overlap+trim", nil, + map[string]any{ + "error": err.Error(), + "signals": len(signals), + "allIQ_len": len(allIQ), + "sampleRate": sampleRate, + }) + } } if useStreamingOraclePath { - if out, rates, err := extractForStreamingOracle(allIQ, sampleRate, centerHz, signals, aqCfg, coll); err == nil { + out, rates, err := extractForStreamingOracle(allIQ, sampleRate, centerHz, signals, aqCfg, coll) + if err == nil { + logging.Debug("extract", "path_active", "path", "streaming_oracle", "signals", len(signals)) + if coll != nil { + coll.IncCounter("extract.path.streaming_oracle", 1, nil) + } return out, rates } + log.Printf("EXTRACT PATH FALLTHROUGH: streaming oracle failed: %v", err) + logging.Warn("extract", "streaming_oracle_fallthrough", "err", err.Error()) + if coll != nil { + coll.IncCounter("extract.path.streaming_oracle_failed", 1, nil) + } + } + // If we reach here, the legacy overlap+trim path is running + logging.Warn("extract", "path_active", "path", "legacy_overlap_trim", "signals", len(signals), "allIQ", len(allIQ)) + if coll != nil { + coll.IncCounter("extract.path.legacy_overlap_trim", 1, nil) } out := make([][]complex64, len(signals)) rates := make([]int, len(signals)) diff --git a/cmd/sdrd/streaming_refactor.go b/cmd/sdrd/streaming_refactor.go index 9ea78fe..9ad2260 100644 --- a/cmd/sdrd/streaming_refactor.go +++ b/cmd/sdrd/streaming_refactor.go @@ -15,7 +15,6 @@ var streamingOracleRunner *gpudemod.CPUOracleRunner func buildStreamingJobs(sampleRate int, centerHz float64, signals []detector.Signal, aqCfg extractionConfig) ([]gpudemod.StreamingExtractJob, error) { jobs := make([]gpudemod.StreamingExtractJob, len(signals)) - decimTarget := 200000 bwMult := aqCfg.bwMult if bwMult <= 0 { bwMult = 1.0 @@ -29,14 +28,20 @@ func buildStreamingJobs(sampleRate int, centerHz float64, signals []detector.Sig sigMHz := sig.CenterHz / 1e6 isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO")) - outRate := decimTarget + var outRate int if isWFM { outRate = wfmStreamOutRate if bw < wfmStreamMinBW { bw = wfmStreamMinBW } - } else if bw < 20000 { - bw = 20000 + } else { + // Non-WFM target: must be an exact integer divisor of sampleRate. + // The old hardcoded 200000 fails for common SDR rates (e.g. 4096000/200000=20.48). + // Find the nearest valid rate >= 128000 (enough for NFM/AM/SSB). + outRate = nearestExactDecimationRate(sampleRate, 200000, 128000) + if bw < 20000 { + bw = 20000 + } } if _, err := gpudemod.ExactIntegerDecimation(sampleRate, outRate); err != nil { return nil, err @@ -92,3 +97,41 @@ func extractForStreamingOracle( func phaseIncForOffset(sampleRate int, offsetHz float64) float64 { return -2.0 * math.Pi * offsetHz / float64(sampleRate) } + +// nearestExactDecimationRate finds the output rate closest to targetRate +// (but not below minRate) that is an exact integer divisor of sampleRate. +// This avoids the ExactIntegerDecimation check failing for rates like +// 4096000/200000=20.48 which silently killed the entire streaming batch. +func nearestExactDecimationRate(sampleRate int, targetRate int, minRate int) int { + if sampleRate <= 0 || targetRate <= 0 { + return targetRate + } + if sampleRate%targetRate == 0 { + return targetRate // already exact + } + // Try decimation factors near the target + targetDecim := sampleRate / targetRate // floor + bestRate := 0 + bestDist := sampleRate // impossibly large + for d := max(1, targetDecim-2); d <= targetDecim+2; d++ { + rate := sampleRate / d + if rate < minRate { + continue + } + if sampleRate%rate != 0 { + continue // not exact (shouldn't happen since rate = sampleRate/d, but guard) + } + dist := targetRate - rate + if dist < 0 { + dist = -dist + } + if dist < bestDist { + bestDist = dist + bestRate = rate + } + } + if bestRate > 0 { + return bestRate + } + return targetRate // fallback — will fail ExactIntegerDecimation and surface the error +} diff --git a/internal/demod/gpudemod/stream_state.go b/internal/demod/gpudemod/stream_state.go index 5c1db48..26bc5fd 100644 --- a/internal/demod/gpudemod/stream_state.go +++ b/internal/demod/gpudemod/stream_state.go @@ -1,6 +1,10 @@ package gpudemod -import "sdr-wideband-suite/internal/dsp" +import ( + "log" + + "sdr-wideband-suite/internal/dsp" +) func (r *BatchRunner) ResetSignalState(signalID int64) { if r == nil || r.streamState == nil { @@ -35,6 +39,10 @@ func (r *BatchRunner) getOrInitExtractState(job StreamingExtractJob, sampleRate r.streamState[job.SignalID] = state } if state.ConfigHash != job.ConfigHash { + if state.Initialized { + log.Printf("STREAMING STATE RESET: signal=%d oldHash=%d newHash=%d historyLen=%d", + job.SignalID, state.ConfigHash, job.ConfigHash, len(state.ShiftedHistory)) + } ResetExtractStreamState(state, job.ConfigHash) } state.Decim = decim diff --git a/internal/demod/gpudemod/streaming_types.go b/internal/demod/gpudemod/streaming_types.go index c6ee6b7..bc8e7b6 100644 --- a/internal/demod/gpudemod/streaming_types.go +++ b/internal/demod/gpudemod/streaming_types.go @@ -3,6 +3,7 @@ package gpudemod import ( "fmt" "hash/fnv" + "math" ) type StreamingExtractJob struct { @@ -48,7 +49,18 @@ func ResetExtractStreamState(state *ExtractStreamState, cfgHash uint64) { } func StreamingConfigHash(signalID int64, offsetHz float64, bandwidth float64, outRate int, numTaps int, sampleRate int) uint64 { + // Quantize offset and bandwidth to 1 kHz resolution before hashing. + // The detector's exponential smoothing causes CenterHz (and therefore offsetHz) + // to jitter by fractions of a Hz every frame. With %.9f formatting, this + // produced a new hash every frame → full state reset (NCOPhase=0, History=[], + // PhaseCount=0) → FIR settling + phase discontinuity → audible clicks. + // + // The NCO phase_inc is computed from the exact offset each frame, so small + // frequency changes are tracked smoothly without a reset. Only structural + // changes (bandwidth affecting FIR taps, decimation, tap count) need a reset. + qOff := math.Round(offsetHz / 1000) * 1000 + qBW := math.Round(bandwidth / 1000) * 1000 h := fnv.New64a() - _, _ = h.Write([]byte(fmt.Sprintf("sig=%d|off=%.9f|bw=%.9f|out=%d|taps=%d|sr=%d", signalID, offsetHz, bandwidth, outRate, numTaps, sampleRate))) + _, _ = h.Write([]byte(fmt.Sprintf("sig=%d|off=%.0f|bw=%.0f|out=%d|taps=%d|sr=%d", signalID, qOff, qBW, outRate, numTaps, sampleRate))) return h.Sum64() } diff --git a/internal/recorder/streamer.go b/internal/recorder/streamer.go index 65cb34d..f5a650d 100644 --- a/internal/recorder/streamer.go +++ b/internal/recorder/streamer.go @@ -61,6 +61,11 @@ type streamSession struct { prevExtractIQ complex64 lastExtractIQSet bool + // FM discriminator cross-block bridging: carry the last IQ sample so the + // discriminator can compute the phase step across block boundaries. + lastDiscrimIQ complex64 + lastDiscrimIQSet bool + lastDemodL float32 prevDemodL float64 lastDemodSet bool @@ -1238,7 +1243,26 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int, col } // --- FM/AM/etc Demod --- - audio := d.Demod(dec, actualDemodRate) + // For FM demod (NFM/WFM): bridge the block boundary by prepending the + // previous block's last IQ sample. Without this, the discriminator loses + // the cross-boundary phase step (1 audio sample missing per block) and + // any phase discontinuity at the seam becomes an unsmoothed audio transient. + var audio []float32 + isFMDemod := demodName == "NFM" || demodName == "WFM" + if isFMDemod && sess.lastDiscrimIQSet && len(dec) > 0 { + bridged := make([]complex64, len(dec)+1) + bridged[0] = sess.lastDiscrimIQ + copy(bridged[1:], dec) + audio = d.Demod(bridged, actualDemodRate) + // bridged produced len(dec) audio samples (= len(bridged)-1) + // which is exactly the correct count for the new data + } else { + audio = d.Demod(dec, actualDemodRate) + } + if len(dec) > 0 { + sess.lastDiscrimIQ = dec[len(dec)-1] + sess.lastDiscrimIQSet = true + } if len(audio) == 0 { return nil, 0 }