From 0b9f86685a8a32fafdb1a24394aa0a5e0bb9e7c3 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Mon, 23 Mar 2026 12:30:23 +0100 Subject: [PATCH] Add feed/process gap diagnostics for live audio --- cmd/sdrd/dsp_loop.go | 4 ++++ internal/recorder/streamer.go | 23 ++++++++++++++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index 5d54d6c..a579752 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -13,6 +13,7 @@ import ( "sdr-wideband-suite/internal/config" "sdr-wideband-suite/internal/detector" "sdr-wideband-suite/internal/dsp" + "sdr-wideband-suite/internal/logging" "sdr-wideband-suite/internal/pipeline" "sdr-wideband-suite/internal/recorder" ) @@ -93,6 +94,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * continue } if j >= len(streamSnips) || len(streamSnips[j]) == 0 { + logging.Warn("gap", "snippet_empty", "signal", ds.ID) continue } snipRate := rt.cfg.SampleRate @@ -106,6 +108,8 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } if len(items) > 0 { rec.FeedSnippets(items) + } else { + logging.Warn("gap", "feed_empty", "signals", len(streamSignals)) } } rt.maintenance(displaySignals, rec) diff --git a/internal/recorder/streamer.go b/internal/recorder/streamer.go index 7ba3d53..afb965f 100644 --- a/internal/recorder/streamer.go +++ b/internal/recorder/streamer.go @@ -170,6 +170,9 @@ type Streamer struct { droppedFeed uint64 droppedPCM uint64 + lastFeedTS time.Time + lastProcTS time.Time + // pendingListens are subscribers waiting for a matching session. pendingListens map[int64]*pendingListen } @@ -260,6 +263,14 @@ func (st *Streamer) FeedSnippets(items []streamFeedItem) { hasListeners := st.hasListenersLocked() pending := len(st.pendingListens) debugLiveAudio := st.policy.DebugLiveAudio + now := time.Now() + if !st.lastFeedTS.IsZero() { + gap := now.Sub(st.lastFeedTS) + if gap > 150*time.Millisecond { + logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds()) + } + } + st.lastFeedTS = now st.mu.Unlock() if debugLiveAudio { @@ -280,16 +291,22 @@ func (st *Streamer) FeedSnippets(items []streamFeedItem) { // processFeed runs in the worker goroutine. func (st *Streamer) processFeed(msg streamFeedMsg) { st.mu.Lock() - defer st.mu.Unlock() - recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ) hasListeners := st.hasListenersLocked() + now := time.Now() + if !st.lastProcTS.IsZero() { + gap := now.Sub(st.lastProcTS) + if gap > 150*time.Millisecond { + logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds()) + } + } + st.lastProcTS = now + defer st.mu.Unlock() if !recEnabled && !hasListeners { return } - now := time.Now() seen := make(map[int64]bool, len(msg.items)) for i := range msg.items {