From cac709f2ca1377ea265a56497ff04bcf4a580d21 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sat, 28 Mar 2026 15:28:24 +0100 Subject: [PATCH] Add minimal audio stutter instrumentation for server and browser summaries --- cmd/sdrd/http_handlers.go | 32 ++++++ internal/recorder/audio_stutter_debug.go | 119 ++++++++++++++++++++ internal/recorder/streamer.go | 136 ++++++++++++++++++++--- web/app.js | 92 +++++++++++++++ 4 files changed, 365 insertions(+), 14 deletions(-) create mode 100644 internal/recorder/audio_stutter_debug.go diff --git a/cmd/sdrd/http_handlers.go b/cmd/sdrd/http_handlers.go index a633fde..acf944f 100644 --- a/cmd/sdrd/http_handlers.go +++ b/cmd/sdrd/http_handlers.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "io" "log" "net/http" "os" @@ -516,6 +517,37 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } }) + mux.HandleFunc("/api/debug/audio-stutter/browser-summary", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + streamer := recMgr.StreamerRef() + if streamer == nil { + http.Error(w, "streamer unavailable", http.StatusServiceUnavailable) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, 64*1024)) + if err != nil { + http.Error(w, "read failed", http.StatusBadRequest) + return + } + if len(body) == 0 { + http.Error(w, "empty body", http.StatusBadRequest) + return + } + var payload any + if err := json.Unmarshal(body, &payload); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + if err := streamer.AppendBrowserAudioSummary(payload); err != nil { + http.Error(w, "persist failed", http.StatusInternalServerError) + return + } + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) + }) } func newHTTPServer(addr string, webRoot string, h *hub, cfgPath string, cfgManager *runtime.Manager, srcMgr *sourceManager, dspUpdates chan dspUpdate, gpuState *gpuStatus, recMgr *recorder.Manager, sigSnap *signalSnapshot, eventMu *sync.RWMutex, phaseSnap *phaseSnapshot, telem *telemetry.Collector) *http.Server { diff --git a/internal/recorder/audio_stutter_debug.go b/internal/recorder/audio_stutter_debug.go new file mode 100644 index 0000000..91bdc53 --- /dev/null +++ b/internal/recorder/audio_stutter_debug.go @@ -0,0 +1,119 @@ +package recorder + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "sync" + "time" +) + +const ( + audioStutterDebugDir = "debug/audio-stutter" + serverStreamSummaryFile = "server_stream_summary.jsonl" + browserAudioSummaryFile = "browser_audio_summary.jsonl" +) + +type audioStutterDebugLogger struct { + mu sync.Mutex + serverFile *os.File + serverWriter *bufio.Writer + browserFile *os.File + browserWriter *bufio.Writer +} + +func newAudioStutterDebugLogger() *audioStutterDebugLogger { + return &audioStutterDebugLogger{} +} + +func (l *audioStutterDebugLogger) WriteServerSummary(v any) error { + l.mu.Lock() + defer l.mu.Unlock() + if err := l.ensureServerWriterLocked(); err != nil { + return err + } + return writeJSONLLineLocked(l.serverWriter, v) +} + +func (l *audioStutterDebugLogger) WriteBrowserSummary(v any) error { + l.mu.Lock() + defer l.mu.Unlock() + if err := l.ensureBrowserWriterLocked(); err != nil { + return err + } + envelope := map[string]any{ + "ts_server": time.Now().UTC().Format(time.RFC3339Nano), + "payload": v, + } + return writeJSONLLineLocked(l.browserWriter, envelope) +} + +func (l *audioStutterDebugLogger) Close() { + l.mu.Lock() + defer l.mu.Unlock() + if l.serverWriter != nil { + _ = l.serverWriter.Flush() + } + if l.serverFile != nil { + _ = l.serverFile.Close() + } + if l.browserWriter != nil { + _ = l.browserWriter.Flush() + } + if l.browserFile != nil { + _ = l.browserFile.Close() + } + l.serverWriter = nil + l.serverFile = nil + l.browserWriter = nil + l.browserFile = nil +} + +func (l *audioStutterDebugLogger) ensureServerWriterLocked() error { + if l.serverWriter != nil { + return nil + } + if err := os.MkdirAll(audioStutterDebugDir, 0o755); err != nil { + return err + } + path := filepath.Join(audioStutterDebugDir, serverStreamSummaryFile) + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return err + } + l.serverFile = f + l.serverWriter = bufio.NewWriterSize(f, 16*1024) + return nil +} + +func (l *audioStutterDebugLogger) ensureBrowserWriterLocked() error { + if l.browserWriter != nil { + return nil + } + if err := os.MkdirAll(audioStutterDebugDir, 0o755); err != nil { + return err + } + path := filepath.Join(audioStutterDebugDir, browserAudioSummaryFile) + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return err + } + l.browserFile = f + l.browserWriter = bufio.NewWriterSize(f, 16*1024) + return nil +} + +func writeJSONLLineLocked(w *bufio.Writer, v any) error { + b, err := json.Marshal(v) + if err != nil { + return err + } + if _, err := w.Write(b); err != nil { + return err + } + if err := w.WriteByte('\n'); err != nil { + return err + } + return w.Flush() +} diff --git a/internal/recorder/streamer.go b/internal/recorder/streamer.go index d7fee44..5919845 100644 --- a/internal/recorder/streamer.go +++ b/internal/recorder/streamer.go @@ -10,9 +10,9 @@ import ( "math" "os" "path/filepath" + "sort" "strconv" "strings" - "sort" "sync" "time" @@ -46,8 +46,8 @@ type streamSession struct { debugDumpUntil time.Time debugDumpBase string - demodDump []float32 - finalDump []float32 + demodDump []float32 + finalDump []float32 lastAudioL float32 lastAudioR float32 prevAudioL float64 // second-to-last L sample for boundary transient detection @@ -136,12 +136,12 @@ type streamSession struct { // Stateful pre-demod anti-alias FIR (eliminates cold-start transients // and avoids per-frame FIR recomputation) - preDemodFIR *dsp.StatefulFIRComplex - preDemodDecimator *dsp.StatefulDecimatingFIRComplex - preDemodDecim int // cached decimation factor - preDemodRate int // cached snipRate this FIR was built for - preDemodCutoff float64 // cached cutoff - preDemodDecimPhase int // retained for backward compatibility in snapshots/debug + preDemodFIR *dsp.StatefulFIRComplex + preDemodDecimator *dsp.StatefulDecimatingFIRComplex + preDemodDecim int // cached decimation factor + preDemodRate int // cached snipRate this FIR was built for + preDemodCutoff float64 // cached cutoff + preDemodDecimPhase int // retained for backward compatibility in snapshots/debug // AQ-2: De-emphasis config (µs, 0 = disabled) deemphasisUs float64 @@ -244,8 +244,8 @@ type streamFeedItem struct { } type streamFeedMsg struct { - traceID uint64 - items []streamFeedItem + traceID uint64 + items []streamFeedItem enqueuedAt time.Time } @@ -267,6 +267,16 @@ type Streamer struct { // pendingListens are subscribers waiting for a matching session. pendingListens map[int64]*pendingListen telemetry *telemetry.Collector + + debugSummary *audioStutterDebugLogger + summaryStop chan struct{} + summaryWG sync.WaitGroup + + // Stream summary counters (cheap to maintain, sampled every ~5s) + producedPCMFrames uint64 + processLoopCount uint64 + processLoopSumMs float64 + processLoopMaxMs float64 } type pendingListen struct { @@ -285,8 +295,12 @@ func newStreamer(policy Policy, centerHz float64, coll *telemetry.Collector) *St done: make(chan struct{}), pendingListens: make(map[int64]*pendingListen), telemetry: coll, + debugSummary: newAudioStutterDebugLogger(), + summaryStop: make(chan struct{}), } go st.worker() + st.summaryWG.Add(1) + go st.summaryWorker() return st } @@ -386,7 +400,7 @@ func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) { if st.telemetry != nil { st.telemetry.IncCounter("streamer.feed.drop", 1, nil) st.telemetry.Event("stream_feed_drop", "warn", "feed queue full", nil, map[string]any{ - "trace_id": traceID, + "trace_id": traceID, "queue_len": len(st.feedCh), }) } @@ -415,8 +429,14 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { st.lastProcTS = now defer st.mu.Unlock() defer func() { + procMs := float64(time.Since(procStart).Microseconds()) / 1000.0 + st.processLoopCount++ + st.processLoopSumMs += procMs + if procMs > st.processLoopMaxMs { + st.processLoopMaxMs = procMs + } if st.telemetry != nil { - st.telemetry.Observe("streamer.process.total_ms", float64(time.Since(procStart).Microseconds())/1000.0, nil) + st.telemetry.Observe("streamer.process.total_ms", procMs, nil) st.telemetry.Observe("streamer.lock_wait_ms", float64(lockWait.Microseconds())/1000.0, telemetry.TagsFromPairs("lock", "process")) } }() @@ -538,6 +558,11 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { } } if len(audio) > 0 { + ch := sess.channels + if ch <= 0 { + ch = 1 + } + st.producedPCMFrames += uint64(len(audio) / ch) if sess.wavSamples == 0 && audioRate > 0 { sess.sampleRate = audioRate } @@ -691,7 +716,7 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { if st.telemetry != nil { st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID)) st.telemetry.Event("session_close", "info", "stream session closed", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID), map[string]any{ - "reason": "signal_missing", + "reason": "signal_missing", "listen_only": sess.listenOnly, }) } @@ -775,6 +800,8 @@ func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo { } func (st *Streamer) CloseAll() { + close(st.summaryStop) + st.summaryWG.Wait() close(st.feedCh) <-st.done @@ -800,6 +827,9 @@ func (st *Streamer) CloseAll() { if st.telemetry != nil { st.telemetry.Event("streamer_close_all", "info", "all stream sessions closed", nil, nil) } + if st.debugSummary != nil { + st.debugSummary.Close() + } } // ActiveSessions returns the number of open streaming sessions. @@ -2009,6 +2039,84 @@ func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) { } } +func (st *Streamer) summaryWorker() { + defer st.summaryWG.Done() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + st.writePeriodicSummary() + case <-st.summaryStop: + return + } + } +} + +func (st *Streamer) writePeriodicSummary() { + st.mu.Lock() + activeSubscribers := 0 + for _, sess := range st.sessions { + activeSubscribers += len(sess.audioSubs) + } + processAvg := 0.0 + if st.processLoopCount > 0 { + processAvg = st.processLoopSumMs / float64(st.processLoopCount) + } + summary := map[string]any{ + "ts": time.Now().UTC().Format(time.RFC3339Nano), + "feed_drop_total": st.droppedFeed, + "pcm_drop_total": st.droppedPCM, + "active_subscribers": activeSubscribers, + "pending_listeners": len(st.pendingListens), + "active_sessions": len(st.sessions), + "produced_pcm_frames": st.producedPCMFrames, + "process_loop_ms_avg": processAvg, + "process_loop_ms_max": st.processLoopMaxMs, + "feed_queue_len": len(st.feedCh), + "feed_queue_cap": cap(st.feedCh), + "feed_queue_fill_ratio": safeRatio(float64(len(st.feedCh)), float64(cap(st.feedCh))), + "backpressure_hint": st.backpressureHintLocked(), + } + st.processLoopCount = 0 + st.processLoopSumMs = 0 + st.processLoopMaxMs = 0 + st.mu.Unlock() + + if st.debugSummary != nil { + _ = st.debugSummary.WriteServerSummary(summary) + } +} + +func (st *Streamer) backpressureHintLocked() string { + queueLen := len(st.feedCh) + queueCap := cap(st.feedCh) + if queueCap > 0 && float64(queueLen)/float64(queueCap) >= 0.8 { + return "feed_queue_high" + } + if st.droppedFeed > 0 || st.droppedPCM > 0 { + return "drops_seen" + } + if len(st.pendingListens) > 0 { + return "pending_listeners" + } + return "ok" +} + +func safeRatio(a float64, b float64) float64 { + if b <= 0 { + return 0 + } + return a / b +} + +func (st *Streamer) AppendBrowserAudioSummary(v any) error { + if st == nil || st.debugSummary == nil { + return nil + } + return st.debugSummary.WriteBrowserSummary(v) +} + func (st *Streamer) classAllowed(cls *classifier.Classification) bool { if len(st.policy.ClassFilter) == 0 { return true diff --git a/web/app.js b/web/app.js index 1d15071..d72ef74 100644 --- a/web/app.js +++ b/web/app.js @@ -218,6 +218,20 @@ class LiveListenWS { this._flushTimer = 0; // Fade state for soft resync this._lastEndSample = null; // last sample value per channel for crossfade + this._summaryTimer = 0; + this._stats = { + startedAtMs: performance.now(), + pcmChunksRx: 0, + pcmSamplesRx: 0, + acceptedChunks: 0, + droppedMaxBuffered: 0, + underruns: 0, + resyncs: 0, + lastAcceptedChunkAtMs: 0, + lastLeadMs: 0, + maxLeadMs: Number.NEGATIVE_INFINITY, + minLeadMs: Number.POSITIVE_INFINITY + }; } start() { @@ -226,6 +240,7 @@ class LiveListenWS { this.ws = new WebSocket(url); this.ws.binaryType = 'arraybuffer'; this.playing = true; + this._startSummaryTicker(); this.ws.onmessage = (ev) => { if (typeof ev.data === 'string') { @@ -248,15 +263,20 @@ class LiveListenWS { return; } if (!this.audioCtx || !this.playing) return; + this._stats.pcmChunksRx++; this._onPCM(ev.data); }; this.ws.onclose = () => { this.playing = false; + this._emitSummary('ws_close'); + this._stopSummaryTicker(); if (this._onStop) this._onStop(); }; this.ws.onerror = () => { this.playing = false; + this._emitSummary('ws_error'); + this._stopSummaryTicker(); if (this._onStop) this._onStop(); }; @@ -266,6 +286,8 @@ class LiveListenWS { } stop() { + this._emitSummary('stop'); + this._stopSummaryTicker(); this.playing = false; if (this.ws) { this.ws.close(); this.ws = null; } this._teardownAudio(); @@ -283,6 +305,21 @@ class LiveListenWS { this._lastEndSample = null; } + _startSummaryTicker() { + this._stopSummaryTicker(); + this._summaryTimer = setInterval(() => { + if (!this.playing) return; + this._emitSummary('periodic'); + }, 5000); + } + + _stopSummaryTicker() { + if (this._summaryTimer) { + clearInterval(this._summaryTimer); + this._summaryTimer = 0; + } + } + _initAudio() { if (this.audioCtx) return; this.audioCtx = new (window.AudioContext || window.webkitAudioContext)({ @@ -296,6 +333,7 @@ class LiveListenWS { _onPCM(buf) { const chunk = new Int16Array(buf); + this._stats.pcmSamplesRx += chunk.length; const maxPendingFrames = Math.ceil(this.sampleRate * 0.25); const maxPendingSamples = maxPendingFrames * Math.max(1, this.channels); @@ -365,6 +403,10 @@ class LiveListenWS { } const now = ctx.currentTime; + const leadMsBefore = (this.nextTime - now) * 1000; + this._stats.lastLeadMs = leadMsBefore; + if (leadMsBefore > this._stats.maxLeadMs) this._stats.maxLeadMs = leadMsBefore; + if (leadMsBefore < this._stats.minLeadMs) this._stats.minLeadMs = leadMsBefore; // Target latency: 400ms. This means we schedule audio to play 400ms // from now. Even if the main thread hangs for 300ms, the already- @@ -377,6 +419,10 @@ class LiveListenWS { if (!this.started || this.nextTime < now) { // First chunk or underrun. // Apply fade-in to avoid click at resync point. + if (this.started && this.nextTime < now) { + this._stats.underruns++; + this._stats.resyncs++; + } const fadeIn = Math.min(64, nFrames); for (let ch = 0; ch < this.channels; ch++) { const data = audioBuffer.getChannelData(ch); @@ -390,6 +436,7 @@ class LiveListenWS { if (this.nextTime > now + maxBuffered) { // Too much buffered — drop to cap latency + this._stats.droppedMaxBuffered++; return; } @@ -397,8 +444,53 @@ class LiveListenWS { source.buffer = audioBuffer; source.connect(ctx.destination); source.start(this.nextTime); + this._stats.acceptedChunks++; + this._stats.lastAcceptedChunkAtMs = performance.now(); this.nextTime += audioBuffer.duration; } + + _emitSummary(reason) { + const nowMs = performance.now(); + const audioNow = this.audioCtx ? this.audioCtx.currentTime : null; + const leadMs = this.audioCtx ? (this.nextTime - this.audioCtx.currentTime) * 1000 : null; + const sinceAcceptedMs = this._stats.lastAcceptedChunkAtMs > 0 + ? nowMs - this._stats.lastAcceptedChunkAtMs + : -1; + const payload = { + ts_client: new Date().toISOString(), + reason, + freq_hz: this.freq, + bw_hz: this.bw, + mode: this.mode, + sample_rate: this.sampleRate, + channels: this.channels, + playing: this.playing, + audio_current_time: audioNow, + audio_next_time: this.nextTime, + lead_ms: leadMs, + lead_ms_last: this._stats.lastLeadMs, + lead_ms_max: Number.isFinite(this._stats.maxLeadMs) ? this._stats.maxLeadMs : null, + lead_ms_min: Number.isFinite(this._stats.minLeadMs) ? this._stats.minLeadMs : null, + pcm_chunks_rx: this._stats.pcmChunksRx, + pcm_samples_rx: this._stats.pcmSamplesRx, + accepted_chunks: this._stats.acceptedChunks, + max_buffered_drops: this._stats.droppedMaxBuffered, + underruns: this._stats.underruns, + resyncs: this._stats.resyncs, + ms_since_last_accepted_chunk: sinceAcceptedMs, + uptime_ms: nowMs - this._stats.startedAtMs + }; + postBrowserAudioSummary(payload); + } +} + +function postBrowserAudioSummary(payload) { + fetch('/api/debug/audio-stutter/browser-summary', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(payload), + keepalive: true + }).catch(() => {}); } const liveListenDefaults = {