Bläddra i källkod

Add minimal audio stutter instrumentation for server and browser summaries

debug/audio-stutter-after-10min
Jan Svabenik 3 månader sedan
förälder
incheckning
cac709f2ca
4 ändrade filer med 365 tillägg och 14 borttagningar
  1. +32
    -0
      cmd/sdrd/http_handlers.go
  2. +119
    -0
      internal/recorder/audio_stutter_debug.go
  3. +122
    -14
      internal/recorder/streamer.go
  4. +92
    -0
      web/app.js

+ 32
- 0
cmd/sdrd/http_handlers.go Visa fil

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"io"
"log"
"net/http"
"os"
@@ -516,6 +517,37 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
})
mux.HandleFunc("/api/debug/audio-stutter/browser-summary", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
streamer := recMgr.StreamerRef()
if streamer == nil {
http.Error(w, "streamer unavailable", http.StatusServiceUnavailable)
return
}
body, err := io.ReadAll(io.LimitReader(r.Body, 64*1024))
if err != nil {
http.Error(w, "read failed", http.StatusBadRequest)
return
}
if len(body) == 0 {
http.Error(w, "empty body", http.StatusBadRequest)
return
}
var payload any
if err := json.Unmarshal(body, &payload); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
if err := streamer.AppendBrowserAudioSummary(payload); err != nil {
http.Error(w, "persist failed", http.StatusInternalServerError)
return
}
_ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
})
}

func newHTTPServer(addr string, webRoot string, h *hub, cfgPath string, cfgManager *runtime.Manager, srcMgr *sourceManager, dspUpdates chan dspUpdate, gpuState *gpuStatus, recMgr *recorder.Manager, sigSnap *signalSnapshot, eventMu *sync.RWMutex, phaseSnap *phaseSnapshot, telem *telemetry.Collector) *http.Server {


+ 119
- 0
internal/recorder/audio_stutter_debug.go Visa fil

@@ -0,0 +1,119 @@
package recorder

import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"sync"
"time"
)

const (
audioStutterDebugDir = "debug/audio-stutter"
serverStreamSummaryFile = "server_stream_summary.jsonl"
browserAudioSummaryFile = "browser_audio_summary.jsonl"
)

type audioStutterDebugLogger struct {
mu sync.Mutex
serverFile *os.File
serverWriter *bufio.Writer
browserFile *os.File
browserWriter *bufio.Writer
}

func newAudioStutterDebugLogger() *audioStutterDebugLogger {
return &audioStutterDebugLogger{}
}

func (l *audioStutterDebugLogger) WriteServerSummary(v any) error {
l.mu.Lock()
defer l.mu.Unlock()
if err := l.ensureServerWriterLocked(); err != nil {
return err
}
return writeJSONLLineLocked(l.serverWriter, v)
}

func (l *audioStutterDebugLogger) WriteBrowserSummary(v any) error {
l.mu.Lock()
defer l.mu.Unlock()
if err := l.ensureBrowserWriterLocked(); err != nil {
return err
}
envelope := map[string]any{
"ts_server": time.Now().UTC().Format(time.RFC3339Nano),
"payload": v,
}
return writeJSONLLineLocked(l.browserWriter, envelope)
}

func (l *audioStutterDebugLogger) Close() {
l.mu.Lock()
defer l.mu.Unlock()
if l.serverWriter != nil {
_ = l.serverWriter.Flush()
}
if l.serverFile != nil {
_ = l.serverFile.Close()
}
if l.browserWriter != nil {
_ = l.browserWriter.Flush()
}
if l.browserFile != nil {
_ = l.browserFile.Close()
}
l.serverWriter = nil
l.serverFile = nil
l.browserWriter = nil
l.browserFile = nil
}

func (l *audioStutterDebugLogger) ensureServerWriterLocked() error {
if l.serverWriter != nil {
return nil
}
if err := os.MkdirAll(audioStutterDebugDir, 0o755); err != nil {
return err
}
path := filepath.Join(audioStutterDebugDir, serverStreamSummaryFile)
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
return err
}
l.serverFile = f
l.serverWriter = bufio.NewWriterSize(f, 16*1024)
return nil
}

func (l *audioStutterDebugLogger) ensureBrowserWriterLocked() error {
if l.browserWriter != nil {
return nil
}
if err := os.MkdirAll(audioStutterDebugDir, 0o755); err != nil {
return err
}
path := filepath.Join(audioStutterDebugDir, browserAudioSummaryFile)
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
return err
}
l.browserFile = f
l.browserWriter = bufio.NewWriterSize(f, 16*1024)
return nil
}

func writeJSONLLineLocked(w *bufio.Writer, v any) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
if _, err := w.Write(b); err != nil {
return err
}
if err := w.WriteByte('\n'); err != nil {
return err
}
return w.Flush()
}

+ 122
- 14
internal/recorder/streamer.go Visa fil

@@ -10,9 +10,9 @@ import (
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sort"
"sync"
"time"

@@ -46,8 +46,8 @@ type streamSession struct {
debugDumpUntil time.Time
debugDumpBase string

demodDump []float32
finalDump []float32
demodDump []float32
finalDump []float32
lastAudioL float32
lastAudioR float32
prevAudioL float64 // second-to-last L sample for boundary transient detection
@@ -136,12 +136,12 @@ type streamSession struct {

// Stateful pre-demod anti-alias FIR (eliminates cold-start transients
// and avoids per-frame FIR recomputation)
preDemodFIR *dsp.StatefulFIRComplex
preDemodDecimator *dsp.StatefulDecimatingFIRComplex
preDemodDecim int // cached decimation factor
preDemodRate int // cached snipRate this FIR was built for
preDemodCutoff float64 // cached cutoff
preDemodDecimPhase int // retained for backward compatibility in snapshots/debug
preDemodFIR *dsp.StatefulFIRComplex
preDemodDecimator *dsp.StatefulDecimatingFIRComplex
preDemodDecim int // cached decimation factor
preDemodRate int // cached snipRate this FIR was built for
preDemodCutoff float64 // cached cutoff
preDemodDecimPhase int // retained for backward compatibility in snapshots/debug

// AQ-2: De-emphasis config (µs, 0 = disabled)
deemphasisUs float64
@@ -244,8 +244,8 @@ type streamFeedItem struct {
}

type streamFeedMsg struct {
traceID uint64
items []streamFeedItem
traceID uint64
items []streamFeedItem
enqueuedAt time.Time
}

@@ -267,6 +267,16 @@ type Streamer struct {
// pendingListens are subscribers waiting for a matching session.
pendingListens map[int64]*pendingListen
telemetry *telemetry.Collector

debugSummary *audioStutterDebugLogger
summaryStop chan struct{}
summaryWG sync.WaitGroup

// Stream summary counters (cheap to maintain, sampled every ~5s)
producedPCMFrames uint64
processLoopCount uint64
processLoopSumMs float64
processLoopMaxMs float64
}

type pendingListen struct {
@@ -285,8 +295,12 @@ func newStreamer(policy Policy, centerHz float64, coll *telemetry.Collector) *St
done: make(chan struct{}),
pendingListens: make(map[int64]*pendingListen),
telemetry: coll,
debugSummary: newAudioStutterDebugLogger(),
summaryStop: make(chan struct{}),
}
go st.worker()
st.summaryWG.Add(1)
go st.summaryWorker()
return st
}

@@ -386,7 +400,7 @@ func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
if st.telemetry != nil {
st.telemetry.IncCounter("streamer.feed.drop", 1, nil)
st.telemetry.Event("stream_feed_drop", "warn", "feed queue full", nil, map[string]any{
"trace_id": traceID,
"trace_id": traceID,
"queue_len": len(st.feedCh),
})
}
@@ -415,8 +429,14 @@ func (st *Streamer) processFeed(msg streamFeedMsg) {
st.lastProcTS = now
defer st.mu.Unlock()
defer func() {
procMs := float64(time.Since(procStart).Microseconds()) / 1000.0
st.processLoopCount++
st.processLoopSumMs += procMs
if procMs > st.processLoopMaxMs {
st.processLoopMaxMs = procMs
}
if st.telemetry != nil {
st.telemetry.Observe("streamer.process.total_ms", float64(time.Since(procStart).Microseconds())/1000.0, nil)
st.telemetry.Observe("streamer.process.total_ms", procMs, nil)
st.telemetry.Observe("streamer.lock_wait_ms", float64(lockWait.Microseconds())/1000.0, telemetry.TagsFromPairs("lock", "process"))
}
}()
@@ -538,6 +558,11 @@ func (st *Streamer) processFeed(msg streamFeedMsg) {
}
}
if len(audio) > 0 {
ch := sess.channels
if ch <= 0 {
ch = 1
}
st.producedPCMFrames += uint64(len(audio) / ch)
if sess.wavSamples == 0 && audioRate > 0 {
sess.sampleRate = audioRate
}
@@ -691,7 +716,7 @@ func (st *Streamer) processFeed(msg streamFeedMsg) {
if st.telemetry != nil {
st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID))
st.telemetry.Event("session_close", "info", "stream session closed", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID), map[string]any{
"reason": "signal_missing",
"reason": "signal_missing",
"listen_only": sess.listenOnly,
})
}
@@ -775,6 +800,8 @@ func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
}

func (st *Streamer) CloseAll() {
close(st.summaryStop)
st.summaryWG.Wait()
close(st.feedCh)
<-st.done

@@ -800,6 +827,9 @@ func (st *Streamer) CloseAll() {
if st.telemetry != nil {
st.telemetry.Event("streamer_close_all", "info", "all stream sessions closed", nil, nil)
}
if st.debugSummary != nil {
st.debugSummary.Close()
}
}

// ActiveSessions returns the number of open streaming sessions.
@@ -2009,6 +2039,84 @@ func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
}
}

func (st *Streamer) summaryWorker() {
defer st.summaryWG.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
st.writePeriodicSummary()
case <-st.summaryStop:
return
}
}
}

func (st *Streamer) writePeriodicSummary() {
st.mu.Lock()
activeSubscribers := 0
for _, sess := range st.sessions {
activeSubscribers += len(sess.audioSubs)
}
processAvg := 0.0
if st.processLoopCount > 0 {
processAvg = st.processLoopSumMs / float64(st.processLoopCount)
}
summary := map[string]any{
"ts": time.Now().UTC().Format(time.RFC3339Nano),
"feed_drop_total": st.droppedFeed,
"pcm_drop_total": st.droppedPCM,
"active_subscribers": activeSubscribers,
"pending_listeners": len(st.pendingListens),
"active_sessions": len(st.sessions),
"produced_pcm_frames": st.producedPCMFrames,
"process_loop_ms_avg": processAvg,
"process_loop_ms_max": st.processLoopMaxMs,
"feed_queue_len": len(st.feedCh),
"feed_queue_cap": cap(st.feedCh),
"feed_queue_fill_ratio": safeRatio(float64(len(st.feedCh)), float64(cap(st.feedCh))),
"backpressure_hint": st.backpressureHintLocked(),
}
st.processLoopCount = 0
st.processLoopSumMs = 0
st.processLoopMaxMs = 0
st.mu.Unlock()

if st.debugSummary != nil {
_ = st.debugSummary.WriteServerSummary(summary)
}
}

func (st *Streamer) backpressureHintLocked() string {
queueLen := len(st.feedCh)
queueCap := cap(st.feedCh)
if queueCap > 0 && float64(queueLen)/float64(queueCap) >= 0.8 {
return "feed_queue_high"
}
if st.droppedFeed > 0 || st.droppedPCM > 0 {
return "drops_seen"
}
if len(st.pendingListens) > 0 {
return "pending_listeners"
}
return "ok"
}

func safeRatio(a float64, b float64) float64 {
if b <= 0 {
return 0
}
return a / b
}

func (st *Streamer) AppendBrowserAudioSummary(v any) error {
if st == nil || st.debugSummary == nil {
return nil
}
return st.debugSummary.WriteBrowserSummary(v)
}

func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
if len(st.policy.ClassFilter) == 0 {
return true


+ 92
- 0
web/app.js Visa fil

@@ -218,6 +218,20 @@ class LiveListenWS {
this._flushTimer = 0;
// Fade state for soft resync
this._lastEndSample = null; // last sample value per channel for crossfade
this._summaryTimer = 0;
this._stats = {
startedAtMs: performance.now(),
pcmChunksRx: 0,
pcmSamplesRx: 0,
acceptedChunks: 0,
droppedMaxBuffered: 0,
underruns: 0,
resyncs: 0,
lastAcceptedChunkAtMs: 0,
lastLeadMs: 0,
maxLeadMs: Number.NEGATIVE_INFINITY,
minLeadMs: Number.POSITIVE_INFINITY
};
}

start() {
@@ -226,6 +240,7 @@ class LiveListenWS {
this.ws = new WebSocket(url);
this.ws.binaryType = 'arraybuffer';
this.playing = true;
this._startSummaryTicker();

this.ws.onmessage = (ev) => {
if (typeof ev.data === 'string') {
@@ -248,15 +263,20 @@ class LiveListenWS {
return;
}
if (!this.audioCtx || !this.playing) return;
this._stats.pcmChunksRx++;
this._onPCM(ev.data);
};

this.ws.onclose = () => {
this.playing = false;
this._emitSummary('ws_close');
this._stopSummaryTicker();
if (this._onStop) this._onStop();
};
this.ws.onerror = () => {
this.playing = false;
this._emitSummary('ws_error');
this._stopSummaryTicker();
if (this._onStop) this._onStop();
};

@@ -266,6 +286,8 @@ class LiveListenWS {
}

stop() {
this._emitSummary('stop');
this._stopSummaryTicker();
this.playing = false;
if (this.ws) { this.ws.close(); this.ws = null; }
this._teardownAudio();
@@ -283,6 +305,21 @@ class LiveListenWS {
this._lastEndSample = null;
}

_startSummaryTicker() {
this._stopSummaryTicker();
this._summaryTimer = setInterval(() => {
if (!this.playing) return;
this._emitSummary('periodic');
}, 5000);
}

_stopSummaryTicker() {
if (this._summaryTimer) {
clearInterval(this._summaryTimer);
this._summaryTimer = 0;
}
}

_initAudio() {
if (this.audioCtx) return;
this.audioCtx = new (window.AudioContext || window.webkitAudioContext)({
@@ -296,6 +333,7 @@ class LiveListenWS {

_onPCM(buf) {
const chunk = new Int16Array(buf);
this._stats.pcmSamplesRx += chunk.length;
const maxPendingFrames = Math.ceil(this.sampleRate * 0.25);
const maxPendingSamples = maxPendingFrames * Math.max(1, this.channels);

@@ -365,6 +403,10 @@ class LiveListenWS {
}

const now = ctx.currentTime;
const leadMsBefore = (this.nextTime - now) * 1000;
this._stats.lastLeadMs = leadMsBefore;
if (leadMsBefore > this._stats.maxLeadMs) this._stats.maxLeadMs = leadMsBefore;
if (leadMsBefore < this._stats.minLeadMs) this._stats.minLeadMs = leadMsBefore;

// Target latency: 400ms. This means we schedule audio to play 400ms
// from now. Even if the main thread hangs for 300ms, the already-
@@ -377,6 +419,10 @@ class LiveListenWS {
if (!this.started || this.nextTime < now) {
// First chunk or underrun.
// Apply fade-in to avoid click at resync point.
if (this.started && this.nextTime < now) {
this._stats.underruns++;
this._stats.resyncs++;
}
const fadeIn = Math.min(64, nFrames);
for (let ch = 0; ch < this.channels; ch++) {
const data = audioBuffer.getChannelData(ch);
@@ -390,6 +436,7 @@ class LiveListenWS {

if (this.nextTime > now + maxBuffered) {
// Too much buffered — drop to cap latency
this._stats.droppedMaxBuffered++;
return;
}

@@ -397,8 +444,53 @@ class LiveListenWS {
source.buffer = audioBuffer;
source.connect(ctx.destination);
source.start(this.nextTime);
this._stats.acceptedChunks++;
this._stats.lastAcceptedChunkAtMs = performance.now();
this.nextTime += audioBuffer.duration;
}

_emitSummary(reason) {
const nowMs = performance.now();
const audioNow = this.audioCtx ? this.audioCtx.currentTime : null;
const leadMs = this.audioCtx ? (this.nextTime - this.audioCtx.currentTime) * 1000 : null;
const sinceAcceptedMs = this._stats.lastAcceptedChunkAtMs > 0
? nowMs - this._stats.lastAcceptedChunkAtMs
: -1;
const payload = {
ts_client: new Date().toISOString(),
reason,
freq_hz: this.freq,
bw_hz: this.bw,
mode: this.mode,
sample_rate: this.sampleRate,
channels: this.channels,
playing: this.playing,
audio_current_time: audioNow,
audio_next_time: this.nextTime,
lead_ms: leadMs,
lead_ms_last: this._stats.lastLeadMs,
lead_ms_max: Number.isFinite(this._stats.maxLeadMs) ? this._stats.maxLeadMs : null,
lead_ms_min: Number.isFinite(this._stats.minLeadMs) ? this._stats.minLeadMs : null,
pcm_chunks_rx: this._stats.pcmChunksRx,
pcm_samples_rx: this._stats.pcmSamplesRx,
accepted_chunks: this._stats.acceptedChunks,
max_buffered_drops: this._stats.droppedMaxBuffered,
underruns: this._stats.underruns,
resyncs: this._stats.resyncs,
ms_since_last_accepted_chunk: sinceAcceptedMs,
uptime_ms: nowMs - this._stats.startedAtMs
};
postBrowserAudioSummary(payload);
}
}

function postBrowserAudioSummary(payload) {
fetch('/api/debug/audio-stutter/browser-summary', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
keepalive: true
}).catch(() => {});
}

const liveListenDefaults = {


Laddar…
Avbryt
Spara