diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index a579752..f8149a8 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -32,6 +32,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * enc := json.NewEncoder(eventFile) dcBlocker := dsp.NewDCBlocker(0.995) state := &phaseState{} + var frameID uint64 for { select { case <-ctx.Done(): @@ -44,6 +45,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * dcBlocker.Reset() ticker.Reset(rt.cfg.FrameInterval()) case <-ticker.C: + frameID++ art, err := rt.captureSpectrum(srcMgr, rec, dcBlocker, gpuState) if err != nil { log.Printf("read IQ: %v", err) @@ -58,6 +60,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * log.Printf("received IQ samples") rt.gotSamples = true } + logging.Debug("trace", "capture_done", "trace", frameID, "allIQ", len(art.allIQ), "detailIQ", len(art.detailIQ)) state.surveillance = rt.buildSurveillanceResult(art) state.refinement = rt.runRefinement(art, state.surveillance, extractMgr, rec) finished := state.surveillance.Finished @@ -77,6 +80,23 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult} streamSnips, streamRates := extractForStreaming(extractMgr, art.allIQ, rt.cfg.SampleRate, rt.cfg.CenterHz, streamSignals, rt.streamPhaseState, rt.streamOverlap, aqCfg) + nonEmpty := 0 + minLen := 0 + maxLen := 0 + for i := range streamSnips { + l := len(streamSnips[i]) + if l == 0 { + continue + } + nonEmpty++ + if minLen == 0 || l < minLen { + minLen = l + } + if l > maxLen { + maxLen = l + } + } + logging.Debug("trace", "extract_stats", "trace", frameID, "signals", len(streamSignals), "nonempty", nonEmpty, "minLen", minLen, "maxLen", maxLen) items := make([]recorder.StreamFeedItem, 0, len(streamSignals)) for j, ds := range streamSignals { className := "" @@ -107,9 +127,10 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * log.Printf("LIVEAUDIO DSP: feedItems=%d", len(items)) } if len(items) > 0 { - rec.FeedSnippets(items) + rec.FeedSnippets(items, frameID) + logging.Debug("trace", "feed", "trace", frameID, "items", len(items), "signals", len(streamSignals), "allIQ", len(art.allIQ)) } else { - logging.Warn("gap", "feed_empty", "signals", len(streamSignals)) + logging.Warn("gap", "feed_empty", "signals", len(streamSignals), "trace", frameID) } } rt.maintenance(displaySignals, rec) diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go index d8a796f..7e473a9 100644 --- a/internal/recorder/recorder.go +++ b/internal/recorder/recorder.go @@ -310,7 +310,7 @@ func (m *Manager) SliceRecent(seconds float64) ([]complex64, int, float64) { // FeedSnippets is called once per DSP frame with pre-extracted IQ snippets // (GPU-accelerated FreqShift+FIR+Decimate). The Streamer handles demod with // persistent state (overlap-save, stereo decode, de-emphasis) asynchronously. -func (m *Manager) FeedSnippets(items []StreamFeedItem) { +func (m *Manager) FeedSnippets(items []StreamFeedItem, traceID uint64) { if m == nil || m.streamer == nil || len(items) == 0 { return } @@ -339,7 +339,7 @@ func (m *Manager) FeedSnippets(items []StreamFeedItem) { snipRate: item.SnipRate, } } - m.streamer.FeedSnippets(internal) + m.streamer.FeedSnippets(internal, traceID) } // StreamFeedItem is the public type for passing extracted snippets from DSP loop. diff --git a/internal/recorder/streamer.go b/internal/recorder/streamer.go index afb965f..b26cea7 100644 --- a/internal/recorder/streamer.go +++ b/internal/recorder/streamer.go @@ -155,7 +155,8 @@ type streamFeedItem struct { } type streamFeedMsg struct { - items []streamFeedItem + traceID uint64 + items []streamFeedItem } type Streamer struct { @@ -257,7 +258,7 @@ func (st *Streamer) hasListenersLocked() bool { // // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet // data, so items can be passed directly without another copy. -func (st *Streamer) FeedSnippets(items []streamFeedItem) { +func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) { st.mu.Lock() recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ) hasListeners := st.hasListenersLocked() @@ -281,7 +282,7 @@ func (st *Streamer) FeedSnippets(items []streamFeedItem) { } select { - case st.feedCh <- streamFeedMsg{items: items}: + case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}: default: st.droppedFeed++ logging.Warn("drop", "feed_drop", "count", st.droppedFeed) @@ -297,12 +298,14 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { if !st.lastProcTS.IsZero() { gap := now.Sub(st.lastProcTS) if gap > 150*time.Millisecond { - logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds()) + logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID) } } st.lastProcTS = now defer st.mu.Unlock() + logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items)) + if !recEnabled && !hasListeners { return } @@ -390,7 +393,12 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { } // Demod with persistent state + logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate) audio, audioRate := sess.processSnippet(item.snippet, item.snipRate) + logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate) + if len(audio) == 0 { + logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate) + } if len(audio) > 0 { if sess.wavSamples == 0 && audioRate > 0 { sess.sampleRate = audioRate