From edd4df0e5ded7451ab340271553feea7c0c8d038 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Wed, 25 Mar 2026 06:41:19 +0100 Subject: [PATCH] debug: add live telemetry for click investigation --- cmd/sdrd/dsp_loop.go | 93 +- cmd/sdrd/helpers.go | 143 +++ cmd/sdrd/http_handlers.go | 188 +++- cmd/sdrd/main.go | 28 +- cmd/sdrd/pipeline_runtime.go | 58 +- cmd/sdrd/source_manager.go | 43 +- cmd/sdrd/types.go | 2 + config.yaml | 12 + docs/audio-click-debug-notes-2026-03-24.md | 36 +- docs/telemetry-debug-runbook.md | 55 + internal/config/config.go | 52 + .../demod/gpudemod/build/gpudemod_kernels.exp | Bin 0 -> 2824 bytes internal/dsp/decimating_fir.go | 14 + internal/recorder/recorder.go | 23 +- internal/recorder/streamer.go | 177 +++- internal/telemetry/telemetry.go | 965 ++++++++++++++++++ 16 files changed, 1856 insertions(+), 33 deletions(-) create mode 100644 docs/telemetry-debug-runbook.md create mode 100644 internal/demod/gpudemod/build/gpudemod_kernels.exp create mode 100644 internal/telemetry/telemetry.go diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index 73f95d1..c9b0a78 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "fmt" "log" "os" "runtime/debug" @@ -16,15 +17,16 @@ import ( "sdr-wideband-suite/internal/logging" "sdr-wideband-suite/internal/pipeline" "sdr-wideband-suite/internal/recorder" + "sdr-wideband-suite/internal/telemetry" ) -func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot, extractMgr *extractionManager, phaseSnap *phaseSnapshot) { +func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec 
*recorder.Manager, sigSnap *signalSnapshot, extractMgr *extractionManager, phaseSnap *phaseSnapshot, coll *telemetry.Collector) { defer func() { if r := recover(); r != nil { log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack()) } }() - rt := newDSPRuntime(cfg, det, window, gpuState) + rt := newDSPRuntime(cfg, det, window, gpuState, coll) ticker := time.NewTicker(cfg.FrameInterval()) defer ticker.Stop() logTicker := time.NewTicker(5 * time.Second) @@ -33,6 +35,9 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * dcBlocker := dsp.NewDCBlocker(0.995) state := &phaseState{} var frameID uint64 + prevDisplayed := map[int64]detector.Signal{} + lastSourceDrops := uint64(0) + lastSourceResets := uint64(0) for { select { case <-ctx.Done(): @@ -40,11 +45,28 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * case <-logTicker.C: st := srcMgr.Stats() log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs) + if coll != nil { + coll.SetGauge("source.buffer_samples", float64(st.BufferSamples), nil) + coll.SetGauge("source.last_sample_ago_ms", float64(st.LastSampleAgoMs), nil) + if st.Dropped > lastSourceDrops { + coll.IncCounter("source.drop.count", float64(st.Dropped-lastSourceDrops), nil) + } + if st.Resets > lastSourceResets { + coll.IncCounter("source.reset.count", float64(st.Resets-lastSourceResets), nil) + coll.Event("source_reset", "warn", "source reset observed", nil, map[string]any{"resets": st.Resets}) + } + lastSourceDrops = st.Dropped + lastSourceResets = st.Resets + } case upd := <-updates: rt.applyUpdate(upd, srcMgr, rec, gpuState) dcBlocker.Reset() ticker.Reset(rt.cfg.FrameInterval()) + if coll != nil { + coll.IncCounter("dsp.update.apply", 1, nil) + } case <-ticker.C: + frameStart := time.Now() frameID++ art, err := rt.captureSpectrum(srcMgr, rec, dcBlocker, gpuState) if err != nil { @@ -61,8 +83,19 @@ func runDSP(ctx 
context.Context, srcMgr *sourceManager, cfg config.Config, det * rt.gotSamples = true } logging.Debug("trace", "capture_done", "trace", frameID, "allIQ", len(art.allIQ), "detailIQ", len(art.detailIQ)) + if coll != nil { + coll.Observe("stage.capture.duration_ms", float64(time.Since(frameStart).Microseconds())/1000.0, telemetry.TagsFromPairs("frame_id", fmt.Sprintf("%d", frameID))) + } + survStart := time.Now() state.surveillance = rt.buildSurveillanceResult(art) + if coll != nil { + coll.Observe("stage.surveillance.duration_ms", float64(time.Since(survStart).Microseconds())/1000.0, telemetry.TagsFromPairs("frame_id", fmt.Sprintf("%d", frameID))) + } + refineStart := time.Now() state.refinement = rt.runRefinement(art, state.surveillance, extractMgr, rec) + if coll != nil { + coll.Observe("stage.refinement.duration_ms", float64(time.Since(refineStart).Microseconds())/1000.0, telemetry.TagsFromPairs("frame_id", fmt.Sprintf("%d", frameID))) + } finished := state.surveillance.Finished thresholds := state.surveillance.Thresholds noiseFloor := state.surveillance.NoiseFloor @@ -82,12 +115,36 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } rec.ResetStreams() logging.Warn("gap", "iq_dropped", "msg", "buffer bloat caused extraction drop; overlap reset") + if coll != nil { + coll.IncCounter("capture.stream_reset", 1, nil) + coll.Event("iq_dropped", "warn", "stream overlap reset after dropped IQ", nil, map[string]any{"frame_id": frameID}) + } } if rt.cfg.Recorder.DebugLiveAudio { log.Printf("LIVEAUDIO DSP: detailIQ=%d displaySignals=%d streamSignals=%d stableSignals=%d allIQ=%d", len(art.detailIQ), len(displaySignals), len(streamSignals), len(stableSignals), len(art.allIQ)) } aqCfg := extractionConfig{firTaps: rt.cfg.Recorder.ExtractionTaps, bwMult: rt.cfg.Recorder.ExtractionBwMult} + extractStart := time.Now() streamSnips, streamRates := extractForStreaming(extractMgr, art.allIQ, rt.cfg.SampleRate, rt.cfg.CenterHz, streamSignals, 
rt.streamPhaseState, rt.streamOverlap, aqCfg) + if coll != nil { + coll.Observe("stage.extract_stream.duration_ms", float64(time.Since(extractStart).Microseconds())/1000.0, telemetry.TagsFromPairs("frame_id", fmt.Sprintf("%d", frameID))) + coll.SetGauge("stage.extract_stream.signals", float64(len(streamSignals)), nil) + if coll.ShouldSampleHeavy() { + for i := range streamSnips { + if i >= len(streamSignals) { + break + } + tags := telemetry.TagsFromPairs( + "signal_id", fmt.Sprintf("%d", streamSignals[i].ID), + "stage", "extract_stream", + ) + coll.SetGauge("iq.stage.extract.length", float64(len(streamSnips[i])), tags) + if len(streamSnips[i]) > 0 { + observeIQStats(coll, "extract_stream", streamSnips[i], tags) + } + } + } + } nonEmpty := 0 minLen := 0 maxLen := 0 @@ -135,10 +192,18 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * log.Printf("LIVEAUDIO DSP: feedItems=%d", len(items)) } if len(items) > 0 { + feedStart := time.Now() rec.FeedSnippets(items, frameID) + if coll != nil { + coll.Observe("stage.feed_enqueue.duration_ms", float64(time.Since(feedStart).Microseconds())/1000.0, telemetry.TagsFromPairs("frame_id", fmt.Sprintf("%d", frameID))) + coll.SetGauge("stage.feed.items", float64(len(items)), nil) + } logging.Debug("trace", "feed", "trace", frameID, "items", len(items), "signals", len(streamSignals), "allIQ", len(art.allIQ)) } else { logging.Warn("gap", "feed_empty", "signals", len(streamSignals), "trace", frameID) + if coll != nil { + coll.IncCounter("stage.feed.empty", 1, nil) + } } } rt.maintenance(displaySignals, rec) @@ -164,6 +229,27 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * if sigSnap != nil { sigSnap.set(displaySignals) } + if coll != nil { + coll.SetGauge("signals.display.count", float64(len(displaySignals)), nil) + current := make(map[int64]detector.Signal, len(displaySignals)) + for _, s := range displaySignals { + current[s.ID] = s + if _, ok := prevDisplayed[s.ID]; 
!ok { + coll.Event("signal_create", "info", "signal entered display set", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", s.ID)), map[string]any{ + "center_hz": s.CenterHz, + "bw_hz": s.BWHz, + }) + } + } + for id, prev := range prevDisplayed { + if _, ok := current[id]; !ok { + coll.Event("signal_remove", "info", "signal left display set", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id)), map[string]any{ + "center_hz": prev.CenterHz, + }) + } + } + prevDisplayed = current + } eventMu.Lock() for _, ev := range finished { _ = enc.Encode(ev) @@ -252,6 +338,9 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * debugInfo.Refinement = refinementDebug } h.broadcast(SpectrumFrame{Timestamp: art.now.UnixMilli(), CenterHz: rt.cfg.CenterHz, SampleHz: rt.cfg.SampleRate, FFTSize: rt.cfg.FFTSize, Spectrum: art.surveillanceSpectrum, Signals: displaySignals, Debug: debugInfo}) + if coll != nil { + coll.Observe("dsp.frame.duration_ms", float64(time.Since(frameStart).Microseconds())/1000.0, nil) + } } } } diff --git a/cmd/sdrd/helpers.go b/cmd/sdrd/helpers.go index 26a25ca..bfe3b4f 100644 --- a/cmd/sdrd/helpers.go +++ b/cmd/sdrd/helpers.go @@ -14,6 +14,7 @@ import ( "sdr-wideband-suite/internal/detector" "sdr-wideband-suite/internal/dsp" "sdr-wideband-suite/internal/logging" + "sdr-wideband-suite/internal/telemetry" ) func mustParseDuration(raw string, fallback time.Duration) time.Duration { @@ -381,6 +382,7 @@ func extractForStreaming( } if i == 0 { logging.Debug("boundary", "extract_trim", "path", "gpu", "raw_len", rawLen, "trim", trimSamples, "out_len", len(iq), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID) + logExtractorHeadComparison(signals[i].ID, "gpu", overlapLen, res.IQ, trimSamples, iq) } out[i] = iq rates[i] = res.Rate @@ -453,8 +455,149 @@ func extractForStreaming( } if i == 0 { logging.Debug("boundary", "extract_trim", "path", "cpu", "raw_len", 
rawLen, "trim", trimSamples, "out_len", len(decimated), "overlap_len", overlapLen, "allIQ_len", len(allIQ), "gpuIQ_len", len(gpuIQ), "outRate", outRate, "signal", signals[i].ID) + logExtractorHeadComparison(signals[i].ID, "cpu", overlapLen, decimated, trimSamples, decimated) } out[i] = decimated } return out, rates } + +type iqHeadStats struct { + length int + minMag float64 + maxMag float64 + meanMag float64 + lowMag int + maxStep float64 + maxStepIdx int + p95Step float64 + headTail float64 + headMinIdx int + stepSamples []float64 +} + +func computeIQHeadStats(iq []complex64, headLen int) iqHeadStats { + stats := iqHeadStats{minMag: math.MaxFloat64, headMinIdx: -1, maxStepIdx: -1} + if len(iq) == 0 { + stats.minMag = 0 + return stats + } + n := len(iq) + if headLen > 0 && headLen < n { + n = headLen + } + stats.length = n + stats.stepSamples = make([]float64, 0, max(0, n-1)) + sumMag := 0.0 + headSum := 0.0 + tailSum := 0.0 + tailCount := 0 + for i := 0; i < n; i++ { + v := iq[i] + mag := math.Hypot(float64(real(v)), float64(imag(v))) + if mag < stats.minMag { + stats.minMag = mag + stats.headMinIdx = i + } + if mag > stats.maxMag { + stats.maxMag = mag + } + sumMag += mag + if mag < 0.05 { + stats.lowMag++ + } + if i < min(16, n) { + headSum += mag + } + if i >= max(0, n-16) { + tailSum += mag + tailCount++ + } + if i > 0 { + p := iq[i-1] + num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v)) + den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v)) + step := math.Abs(math.Atan2(num, den)) + if step > stats.maxStep { + stats.maxStep = step + stats.maxStepIdx = i - 1 + } + stats.stepSamples = append(stats.stepSamples, step) + } + } + stats.meanMag = sumMag / float64(n) + if len(stats.stepSamples) > 0 { + sorted := append([]float64(nil), stats.stepSamples...) 
+ sort.Float64s(sorted) + idx := int(float64(len(sorted)-1) * 0.95) + stats.p95Step = sorted[idx] + } else { + stats.p95Step = stats.maxStep + } + if headSum > 0 && tailCount > 0 { + headMean := headSum / float64(min(16, n)) + tailMean := tailSum / float64(tailCount) + if tailMean > 0 { + stats.headTail = headMean / tailMean + } + } + return stats +} + +func observeIQStats(coll *telemetry.Collector, stage string, iq []complex64, tags telemetry.Tags) { + if coll == nil || len(iq) == 0 { + return + } + stats := computeIQHeadStats(iq, len(iq)) + stageTags := telemetry.TagsWith(tags, "stage", stage) + coll.Observe("iq.magnitude.min", stats.minMag, stageTags) + coll.Observe("iq.magnitude.max", stats.maxMag, stageTags) + coll.Observe("iq.magnitude.mean", stats.meanMag, stageTags) + coll.Observe("iq.phase_step.max", stats.maxStep, stageTags) + coll.Observe("iq.phase_step.p95", stats.p95Step, stageTags) + coll.Observe("iq.low_magnitude.count", float64(stats.lowMag), stageTags) + coll.SetGauge("iq.length", float64(stats.length), stageTags) +} + +func logExtractorHeadComparison(signalID int64, path string, overlapLen int, raw []complex64, trimSamples int, out []complex64) { + rawStats := computeIQHeadStats(raw, 96) + trimmedStats := computeIQHeadStats(out, 96) + logging.Debug("boundary", "extract_head_compare", + "signal", signalID, + "path", path, + "raw_len", len(raw), + "trim", trimSamples, + "out_len", len(out), + "overlap_len", overlapLen, + "raw_min_mag", rawStats.minMag, + "raw_min_idx", rawStats.headMinIdx, + "raw_max_step", rawStats.maxStep, + "raw_max_step_idx", rawStats.maxStepIdx, + "raw_head_tail", rawStats.headTail, + "trimmed_min_mag", trimmedStats.minMag, + "trimmed_min_idx", trimmedStats.headMinIdx, + "trimmed_max_step", trimmedStats.maxStep, + "trimmed_max_step_idx", trimmedStats.maxStepIdx, + "trimmed_head_tail", trimmedStats.headTail, + ) + for _, off := range []int{2, 4, 8, 16} { + if len(out) <= off+8 { + continue + } + offStats := 
computeIQHeadStats(out[off:], 96) + logging.Debug("boundary", "extract_head_offset_compare", + "signal", signalID, + "path", path, + "offset", off, + "base_min_mag", trimmedStats.minMag, + "base_min_idx", trimmedStats.headMinIdx, + "base_max_step", trimmedStats.maxStep, + "base_max_step_idx", trimmedStats.maxStepIdx, + "offset_min_mag", offStats.minMag, + "offset_min_idx", offStats.headMinIdx, + "offset_max_step", offStats.maxStep, + "offset_max_step_idx", offStats.maxStepIdx, + "offset_head_tail", offStats.headTail, + ) + } +} diff --git a/cmd/sdrd/http_handlers.go b/cmd/sdrd/http_handlers.go index 14c0846..a633fde 100644 --- a/cmd/sdrd/http_handlers.go +++ b/cmd/sdrd/http_handlers.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "errors" "log" "net/http" "os" @@ -19,9 +20,10 @@ import ( "sdr-wideband-suite/internal/pipeline" "sdr-wideband-suite/internal/recorder" "sdr-wideband-suite/internal/runtime" + "sdr-wideband-suite/internal/telemetry" ) -func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime.Manager, srcMgr *sourceManager, dspUpdates chan dspUpdate, gpuState *gpuStatus, recMgr *recorder.Manager, sigSnap *signalSnapshot, eventMu *sync.RWMutex, phaseSnap *phaseSnapshot) { +func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime.Manager, srcMgr *sourceManager, dspUpdates chan dspUpdate, gpuState *gpuStatus, recMgr *recorder.Manager, sigSnap *signalSnapshot, eventMu *sync.RWMutex, phaseSnap *phaseSnapshot, telem *telemetry.Collector) { mux.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") switch r.Method { @@ -378,16 +380,196 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime w.Header().Set("Content-Type", "audio/wav") _, _ = w.Write(data) }) + mux.HandleFunc("/api/debug/telemetry/live", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") 
+ if telem == nil { + _ = json.NewEncoder(w).Encode(map[string]any{"enabled": false, "error": "telemetry unavailable"}) + return + } + _ = json.NewEncoder(w).Encode(telem.LiveSnapshot()) + }) + mux.HandleFunc("/api/debug/telemetry/history", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if telem == nil { + http.Error(w, "telemetry unavailable", http.StatusServiceUnavailable) + return + } + query, err := telemetryQueryFromRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + items, err := telem.QueryMetrics(query) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _ = json.NewEncoder(w).Encode(map[string]any{"items": items, "count": len(items)}) + }) + mux.HandleFunc("/api/debug/telemetry/events", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if telem == nil { + http.Error(w, "telemetry unavailable", http.StatusServiceUnavailable) + return + } + query, err := telemetryQueryFromRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + items, err := telem.QueryEvents(query) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _ = json.NewEncoder(w).Encode(map[string]any{"items": items, "count": len(items)}) + }) + mux.HandleFunc("/api/debug/telemetry/config", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if telem == nil { + http.Error(w, "telemetry unavailable", http.StatusServiceUnavailable) + return + } + switch r.Method { + case http.MethodGet: + _ = json.NewEncoder(w).Encode(map[string]any{ + "collector": telem.Config(), + "config": cfgManager.Snapshot().Debug.Telemetry, + }) + case http.MethodPost: + var update struct { + Enabled *bool `json:"enabled"` + HeavyEnabled *bool `json:"heavy_enabled"` + HeavySampleEvery *int 
`json:"heavy_sample_every"` + MetricSampleEvery *int `json:"metric_sample_every"` + MetricHistoryMax *int `json:"metric_history_max"` + EventHistoryMax *int `json:"event_history_max"` + RetentionSeconds *int `json:"retention_seconds"` + PersistEnabled *bool `json:"persist_enabled"` + PersistDir *string `json:"persist_dir"` + RotateMB *int `json:"rotate_mb"` + KeepFiles *int `json:"keep_files"` + } + if err := json.NewDecoder(r.Body).Decode(&update); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + next := cfgManager.Snapshot() + cur := next.Debug.Telemetry + if update.Enabled != nil { + cur.Enabled = *update.Enabled + } + if update.HeavyEnabled != nil { + cur.HeavyEnabled = *update.HeavyEnabled + } + if update.HeavySampleEvery != nil { + cur.HeavySampleEvery = *update.HeavySampleEvery + } + if update.MetricSampleEvery != nil { + cur.MetricSampleEvery = *update.MetricSampleEvery + } + if update.MetricHistoryMax != nil { + cur.MetricHistoryMax = *update.MetricHistoryMax + } + if update.EventHistoryMax != nil { + cur.EventHistoryMax = *update.EventHistoryMax + } + if update.RetentionSeconds != nil { + cur.RetentionSeconds = *update.RetentionSeconds + } + if update.PersistEnabled != nil { + cur.PersistEnabled = *update.PersistEnabled + } + if update.PersistDir != nil && *update.PersistDir != "" { + cur.PersistDir = *update.PersistDir + } + if update.RotateMB != nil { + cur.RotateMB = *update.RotateMB + } + if update.KeepFiles != nil { + cur.KeepFiles = *update.KeepFiles + } + next.Debug.Telemetry = cur + cfgManager.Replace(next) + if err := config.Save(cfgPath, next); err != nil { + log.Printf("telemetry config save failed: %v", err) + } + err := telem.Configure(telemetry.Config{ + Enabled: cur.Enabled, + HeavyEnabled: cur.HeavyEnabled, + HeavySampleEvery: cur.HeavySampleEvery, + MetricSampleEvery: cur.MetricSampleEvery, + MetricHistoryMax: cur.MetricHistoryMax, + EventHistoryMax: cur.EventHistoryMax, + Retention: 
time.Duration(cur.RetentionSeconds) * time.Second, + PersistEnabled: cur.PersistEnabled, + PersistDir: cur.PersistDir, + RotateMB: cur.RotateMB, + KeepFiles: cur.KeepFiles, + }) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "collector": telem.Config(), "config": cur}) + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + } + }) } -func newHTTPServer(addr string, webRoot string, h *hub, cfgPath string, cfgManager *runtime.Manager, srcMgr *sourceManager, dspUpdates chan dspUpdate, gpuState *gpuStatus, recMgr *recorder.Manager, sigSnap *signalSnapshot, eventMu *sync.RWMutex, phaseSnap *phaseSnapshot) *http.Server { +func newHTTPServer(addr string, webRoot string, h *hub, cfgPath string, cfgManager *runtime.Manager, srcMgr *sourceManager, dspUpdates chan dspUpdate, gpuState *gpuStatus, recMgr *recorder.Manager, sigSnap *signalSnapshot, eventMu *sync.RWMutex, phaseSnap *phaseSnapshot, telem *telemetry.Collector) *http.Server { mux := http.NewServeMux() registerWSHandlers(mux, h, recMgr) - registerAPIHandlers(mux, cfgPath, cfgManager, srcMgr, dspUpdates, gpuState, recMgr, sigSnap, eventMu, phaseSnap) + registerAPIHandlers(mux, cfgPath, cfgManager, srcMgr, dspUpdates, gpuState, recMgr, sigSnap, eventMu, phaseSnap, telem) mux.Handle("/", http.FileServer(http.Dir(webRoot))) return &http.Server{Addr: addr, Handler: mux} } +func telemetryQueryFromRequest(r *http.Request) (telemetry.Query, error) { + q := r.URL.Query() + var out telemetry.Query + var err error + if out.From, err = telemetry.ParseTimeQuery(q.Get("since")); err != nil { + return out, errors.New("invalid since") + } + if out.To, err = telemetry.ParseTimeQuery(q.Get("until")); err != nil { + return out, errors.New("invalid until") + } + if v := q.Get("limit"); v != "" { + if parsed, parseErr := strconv.Atoi(v); parseErr == nil { + out.Limit = parsed + } + } + out.Name = q.Get("name") + 
out.NamePrefix = q.Get("prefix") + out.Level = q.Get("level") + out.IncludePersisted = true + if v := q.Get("include_persisted"); v != "" { + if b, parseErr := strconv.ParseBool(v); parseErr == nil { + out.IncludePersisted = b + } + } + tags := telemetry.Tags{} + for key, vals := range q { + if len(vals) == 0 { + continue + } + if strings.HasPrefix(key, "tag_") { + tags[strings.TrimPrefix(key, "tag_")] = vals[0] + } + } + for _, key := range []string{"signal_id", "session_id", "stage", "trace_id", "component"} { + if v := q.Get(key); v != "" { + tags[key] = v + } + } + if len(tags) > 0 { + out.Tags = tags + } + return out, nil +} + func shutdownServer(server *http.Server) { ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second) defer cancelTimeout() diff --git a/cmd/sdrd/main.go b/cmd/sdrd/main.go index 77a9814..361775d 100644 --- a/cmd/sdrd/main.go +++ b/cmd/sdrd/main.go @@ -23,6 +23,7 @@ import ( "sdr-wideband-suite/internal/runtime" "sdr-wideband-suite/internal/sdr" "sdr-wideband-suite/internal/sdrplay" + "sdr-wideband-suite/internal/telemetry" ) func main() { @@ -51,6 +52,25 @@ func main() { cfgManager := runtime.New(cfg) gpuState := &gpuStatus{Available: gpufft.Available()} + telemetryCfg := telemetry.Config{ + Enabled: cfg.Debug.Telemetry.Enabled, + HeavyEnabled: cfg.Debug.Telemetry.HeavyEnabled, + HeavySampleEvery: cfg.Debug.Telemetry.HeavySampleEvery, + MetricSampleEvery: cfg.Debug.Telemetry.MetricSampleEvery, + MetricHistoryMax: cfg.Debug.Telemetry.MetricHistoryMax, + EventHistoryMax: cfg.Debug.Telemetry.EventHistoryMax, + Retention: time.Duration(cfg.Debug.Telemetry.RetentionSeconds) * time.Second, + PersistEnabled: cfg.Debug.Telemetry.PersistEnabled, + PersistDir: cfg.Debug.Telemetry.PersistDir, + RotateMB: cfg.Debug.Telemetry.RotateMB, + KeepFiles: cfg.Debug.Telemetry.KeepFiles, + } + telemetryCollector, err := telemetry.New(telemetryCfg) + if err != nil { + log.Fatalf("telemetry init failed: %v", err) + } + defer 
telemetryCollector.Close() + telemetryCollector.SetStatus("build", "sdrd") newSource := func(cfg config.Config) (sdr.Source, error) { if mockFlag { @@ -74,7 +94,7 @@ func main() { if err != nil { log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err) } - srcMgr := newSourceManager(src, newSource) + srcMgr := newSourceManagerWithTelemetry(src, newSource, telemetryCollector) if err := srcMgr.Start(); err != nil { log.Fatalf("source start: %v", err) } @@ -118,7 +138,7 @@ func main() { DeemphasisUs: cfg.Recorder.DeemphasisUs, ExtractionTaps: cfg.Recorder.ExtractionTaps, ExtractionBwMult: cfg.Recorder.ExtractionBwMult, - }, cfg.CenterHz, decodeMap) + }, cfg.CenterHz, decodeMap, telemetryCollector) defer recMgr.Close() sigSnap := &signalSnapshot{} @@ -126,9 +146,9 @@ func main() { defer extractMgr.reset() phaseSnap := &phaseSnapshot{} - go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr, sigSnap, extractMgr, phaseSnap) + go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr, sigSnap, extractMgr, phaseSnap, telemetryCollector) - server := newHTTPServer(cfg.WebAddr, cfg.WebRoot, h, cfgPath, cfgManager, srcMgr, dspUpdates, gpuState, recMgr, sigSnap, eventMu, phaseSnap) + server := newHTTPServer(cfg.WebAddr, cfg.WebRoot, h, cfgPath, cfgManager, srcMgr, dspUpdates, gpuState, recMgr, sigSnap, eventMu, phaseSnap, telemetryCollector) go func() { log.Printf("web listening on %s", cfg.WebAddr) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { diff --git a/cmd/sdrd/pipeline_runtime.go b/cmd/sdrd/pipeline_runtime.go index 1d498f6..f8cf665 100644 --- a/cmd/sdrd/pipeline_runtime.go +++ b/cmd/sdrd/pipeline_runtime.go @@ -21,6 +21,7 @@ import ( "sdr-wideband-suite/internal/pipeline" "sdr-wideband-suite/internal/rds" "sdr-wideband-suite/internal/recorder" + "sdr-wideband-suite/internal/telemetry" ) type rdsState struct { @@ -66,6 +67,7 @@ type 
dspRuntime struct { arbiter *pipeline.Arbiter arbitration pipeline.ArbitrationState gotSamples bool + telemetry *telemetry.Collector } type spectrumArtifacts struct { @@ -109,7 +111,7 @@ type surveillancePlan struct { const derivedIDBlock = int64(1_000_000_000) -func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime { +func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus, coll *telemetry.Collector) *dspRuntime { detailFFT := cfg.Refinement.DetailFFTSize if detailFFT <= 0 { detailFFT = cfg.FFTSize @@ -134,6 +136,7 @@ func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, streamPhaseState: map[int64]*streamExtractState{}, streamOverlap: &streamIQOverlap{}, arbiter: pipeline.NewArbiter(), + telemetry: coll, } if rt.useGPU && gpuState != nil { snap := gpuState.snapshot() @@ -231,6 +234,15 @@ func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *rec gpuState.set(false, nil) } } + if rt.telemetry != nil { + rt.telemetry.Event("dsp_config_update", "info", "dsp runtime configuration updated", nil, map[string]any{ + "fft_size": rt.cfg.FFTSize, + "sample_rate": rt.cfg.SampleRate, + "use_gpu_fft": rt.cfg.UseGPUFFT, + "detail_fft": rt.detailFFT, + "surv_strategy": rt.cfg.Surveillance.Strategy, + }) + } } func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 { @@ -350,12 +362,19 @@ func (rt *dspRuntime) decimateSurveillanceIQ(iq []complex64, factor int) []compl } func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) { + start := time.Now() required := rt.cfg.FFTSize if rt.detailFFT > required { required = rt.detailFFT } available := required st := srcMgr.Stats() + if rt.telemetry != nil { + rt.telemetry.SetGauge("source.buffer_samples", float64(st.BufferSamples), nil) + 
rt.telemetry.SetGauge("source.last_sample_ago_ms", float64(st.LastSampleAgoMs), nil) + rt.telemetry.SetGauge("source.dropped", float64(st.Dropped), nil) + rt.telemetry.SetGauge("source.resets", float64(st.Resets), nil) + } if forceFixedStreamReadSamples > 0 { available = forceFixedStreamReadSamples if available < required { @@ -373,12 +392,24 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag } } logging.Debug("capture", "read_iq", "required", required, "available", available, "buf", st.BufferSamples, "reset", st.Resets, "drop", st.Dropped) + readStart := time.Now() allIQ, err := srcMgr.ReadIQ(available) if err != nil { + if rt.telemetry != nil { + rt.telemetry.IncCounter("capture.read.error", 1, nil) + } return nil, err } + if rt.telemetry != nil { + rt.telemetry.Observe("capture.read.duration_ms", float64(time.Since(readStart).Microseconds())/1000.0, nil) + rt.telemetry.Observe("capture.read.samples", float64(len(allIQ)), nil) + } if rec != nil { + ingestStart := time.Now() rec.Ingest(time.Now(), allIQ) + if rt.telemetry != nil { + rt.telemetry.Observe("capture.ingest.duration_ms", float64(time.Since(ingestStart).Microseconds())/1000.0, nil) + } } // Cap allIQ for downstream extraction to prevent buffer bloat. 
// Without this cap, buffer accumulation during processing stalls causes @@ -395,6 +426,13 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag if len(allIQ) > maxStreamSamples { allIQ = allIQ[len(allIQ)-maxStreamSamples:] streamDropped = true + if rt.telemetry != nil { + rt.telemetry.IncCounter("capture.stream_drop.count", 1, nil) + rt.telemetry.Event("iq_dropped", "warn", "capture IQ dropped before extraction", nil, map[string]any{ + "max_stream_samples": maxStreamSamples, + "required": required, + }) + } } logging.Debug("capture", "iq_len", "len", len(allIQ), "surv_fft", rt.cfg.FFTSize, "detail_fft", rt.detailFFT) survIQ := allIQ @@ -407,6 +445,9 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag } if rt.dcEnabled { dcBlocker.Apply(allIQ) + if rt.telemetry != nil { + rt.telemetry.IncCounter("dsp.dc_block.apply", 1, nil) + } } if rt.iqEnabled { dsp.IQBalance(survIQ) @@ -415,6 +456,17 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag dsp.IQBalance(detailIQ) } } + if rt.telemetry != nil { + rt.telemetry.SetGauge("iq.stage.all.length", float64(len(allIQ)), nil) + rt.telemetry.SetGauge("iq.stage.surveillance.length", float64(len(survIQ)), nil) + rt.telemetry.SetGauge("iq.stage.detail.length", float64(len(detailIQ)), nil) + rt.telemetry.Observe("capture.total.duration_ms", float64(time.Since(start).Microseconds())/1000.0, nil) + if rt.telemetry.ShouldSampleHeavy() { + observeIQStats(rt.telemetry, "capture_all", allIQ, nil) + observeIQStats(rt.telemetry, "capture_surveillance", survIQ, nil) + observeIQStats(rt.telemetry, "capture_detail", detailIQ, nil) + } + } survSpectrum := rt.spectrumFromIQ(survIQ, gpuState) sanitizeSpectrum(survSpectrum) detailSpectrum := survSpectrum @@ -457,6 +509,10 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag } now := time.Now() finished, detected := rt.det.Process(now, survSpectrum, rt.cfg.CenterHz) + if 
rt.telemetry != nil { + rt.telemetry.SetGauge("signals.detected.count", float64(len(detected)), nil) + rt.telemetry.SetGauge("signals.finished.count", float64(len(finished)), nil) + } return &spectrumArtifacts{ allIQ: allIQ, streamDropped: streamDropped, diff --git a/cmd/sdrd/source_manager.go b/cmd/sdrd/source_manager.go index 606e6e8..4f58a54 100644 --- a/cmd/sdrd/source_manager.go +++ b/cmd/sdrd/source_manager.go @@ -1,11 +1,16 @@ package main import ( + "fmt" + "time" + "sdr-wideband-suite/internal/config" "sdr-wideband-suite/internal/sdr" + "sdr-wideband-suite/internal/telemetry" ) func (m *sourceManager) Restart(cfg config.Config) error { + start := time.Now() m.mu.Lock() defer m.mu.Unlock() old := m.src @@ -14,15 +19,27 @@ func (m *sourceManager) Restart(cfg config.Config) error { if err != nil { _ = old.Start() m.src = old + if m.telemetry != nil { + m.telemetry.IncCounter("source.restart.error", 1, nil) + m.telemetry.Event("source_restart_failed", "warn", "source restart failed", nil, map[string]any{"error": err.Error()}) + } return err } if err := next.Start(); err != nil { _ = next.Stop() _ = old.Start() m.src = old + if m.telemetry != nil { + m.telemetry.IncCounter("source.restart.error", 1, nil) + m.telemetry.Event("source_restart_failed", "warn", "source restart failed", nil, map[string]any{"error": err.Error()}) + } return err } m.src = next + if m.telemetry != nil { + m.telemetry.IncCounter("source.restart.count", 1, nil) + m.telemetry.Observe("source.restart.duration_ms", float64(time.Since(start).Milliseconds()), nil) + } return nil } @@ -44,7 +61,11 @@ func (m *sourceManager) Flush() { } func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager { - return &sourceManager{src: src, newSource: newSource} + return newSourceManagerWithTelemetry(src, newSource, nil) +} + +func newSourceManagerWithTelemetry(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error), coll *telemetry.Collector) 
*sourceManager { + return &sourceManager{src: src, newSource: newSource, telemetry: coll} } func (m *sourceManager) Start() error { @@ -60,9 +81,27 @@ func (m *sourceManager) Stop() error { } func (m *sourceManager) ReadIQ(n int) ([]complex64, error) { + waitStart := time.Now() m.mu.RLock() + wait := time.Since(waitStart) defer m.mu.RUnlock() - return m.src.ReadIQ(n) + if m.telemetry != nil { + m.telemetry.Observe("source.lock_wait_ms", float64(wait.Microseconds())/1000.0, telemetry.TagsFromPairs("lock", "read")) + if wait > 2*time.Millisecond { + m.telemetry.IncCounter("source.lock_contention.count", 1, telemetry.TagsFromPairs("lock", "read")) + } + } + readStart := time.Now() + out, err := m.src.ReadIQ(n) + if m.telemetry != nil { + tags := telemetry.TagsFromPairs("requested", fmt.Sprintf("%d", n)) + m.telemetry.Observe("source.read.duration_ms", float64(time.Since(readStart).Microseconds())/1000.0, tags) + m.telemetry.SetGauge("source.read.samples", float64(len(out)), nil) + if err != nil { + m.telemetry.IncCounter("source.read.error", 1, nil) + } + } + return out, err } func (m *sourceManager) ApplyConfig(cfg config.Config) error { diff --git a/cmd/sdrd/types.go b/cmd/sdrd/types.go index c96e5c6..0e36748 100644 --- a/cmd/sdrd/types.go +++ b/cmd/sdrd/types.go @@ -11,6 +11,7 @@ import ( "sdr-wideband-suite/internal/detector" "sdr-wideband-suite/internal/pipeline" "sdr-wideband-suite/internal/sdr" + "sdr-wideband-suite/internal/telemetry" ) type SpectrumDebug struct { @@ -110,6 +111,7 @@ type sourceManager struct { mu sync.RWMutex src sdr.Source newSource func(cfg config.Config) (sdr.Source, error) + telemetry *telemetry.Collector } type extractionManager struct { diff --git a/config.yaml b/config.yaml index 6d57eb6..3d1b720 100644 --- a/config.yaml +++ b/config.yaml @@ -251,6 +251,18 @@ decoder: debug: audio_dump_enabled: false cpu_monitoring: false + telemetry: + enabled: true + heavy_enabled: false + heavy_sample_every: 12 + metric_sample_every: 2 + 
metric_history_max: 12000 + event_history_max: 4000 + retention_seconds: 900 + persist_enabled: true + persist_dir: debug/telemetry + rotate_mb: 16 + keep_files: 8 logging: level: debug categories: [gap, prefir, boundary] diff --git a/docs/audio-click-debug-notes-2026-03-24.md b/docs/audio-click-debug-notes-2026-03-24.md index a11c38c..4f4824f 100644 --- a/docs/audio-click-debug-notes-2026-03-24.md +++ b/docs/audio-click-debug-notes-2026-03-24.md @@ -210,6 +210,12 @@ This should **not** be reintroduced casually. A temporary mechanism exists to force stable extraction block sizes. This is useful diagnostically because it removes one source of pipeline variability. +**IMPORTANT DECISION / DO NOT LOSE:** +- The fixed read-size path currently lives behind the environment variable `SDR_FORCE_FIXED_STREAM_READ_SAMPLES`. +- The tested value `389120` clearly helps by making `allIQ`, `gpuIQ_len`, `raw_len`, and `out_len` much more stable and by reducing one major source of pipeline variability. +- Current plan: **once the remaining click root cause is solved, promote this behavior into the normal code path instead of leaving it as an env-var-only debug switch.** +- In other words: treat fixed read sizing as a likely permanent stabilization improvement, but do not bake it in blindly until the click investigation is complete. + ### 3. FM discriminator metering exists `internal/demod/fm.go` now emits targeted discriminator stats under `discrim` logging, including: - min/max IQ magnitude @@ -260,15 +266,18 @@ Interpretation: ### 8. Current architectural conclusion The likely clean fix is **not** to keep trimming samples away. 
-Instead, the likely correct direction is: -- replace the current “stateful FIR, then separate decimation” handoff with a **stateful decimating FIR / polyphase decimator** -- preserve phase and delay state explicitly -- ensure the first emitted decimated samples are already truly valid for demodulation +The FIR/decimation section is still suspicious, but later tests showed it is likely not the sole origin. Important nuance: - the currently suspicious FIR + decimation section is already running in **Go/CPU** (`processSnippet`), not in CUDA - therefore the next correctness fix should be developed and validated in Go first +Later update: +- a stateful decimating FIR / polyphase-style replacement was implemented in Go and tested +- it was architecturally cleaner than the old separated FIR->decimate handoff +- but it did **not** remove the recurring hot spot / clicks +- therefore the old handoff was not the whole root cause, even if the newer path is still cleaner + --- ## Best current hypothesis @@ -296,18 +305,21 @@ Crucially: This strongly suggests a **settling/transient zone at the beginning of the decimated IQ block**. +Later refinements to this theory: +- pre-FIR probing originally looked cleaner than post-FIR probing, which made FIR/decimation look like the main culprit +- however, a temporary FIR bypass showed the clicks were still present, only somewhat quieter / less aggressive +- this indicates the pre-demod FIR likely amplifies or sharpens an upstream issue, but is not the sole origin +- a cleaner stateful decimating FIR implementation also failed to eliminate the recurring hot spot, further weakening the idea that the old FIR->decimate handoff alone caused the bug + --- ## Recommended next steps -1. Run with reduced logging only (`demod`, `gap`, `boundary`) unless discriminator logging is specifically needed again. -2. Keep heavy dump features OFF unless explicitly needed. -3. 
Treat the beginning of the `dec` block as the highest-priority investigation zone. -4. Continue analysing whether the observed issue is: - - an expected FIR/decimation settling region being handled incorrectly, or - - evidence that corrupted IQ is already entering the pre-demod FIR -5. When testing fixes, prefer low-overhead, theory-driven experiments over broad logging/dump spam. -6. Only re-enable audio dump windows selectively and briefly. +1. Run with reduced logging only and keep heavy dump features OFF unless explicitly needed. +2. Continue investigating the extractor path and its immediate surroundings (`extractForStreaming`, signal parameter source, offset/BW stability, overlap/trim behavior). +3. Treat FIR/decimation as a possible amplifier/focuser of the issue, but not the only suspect. +4. When testing fixes, prefer low-overhead, theory-driven experiments over broad logging/dump spam. +5. Only re-enable audio dump windows selectively and briefly. --- diff --git a/docs/telemetry-debug-runbook.md b/docs/telemetry-debug-runbook.md new file mode 100644 index 0000000..4363b5e --- /dev/null +++ b/docs/telemetry-debug-runbook.md @@ -0,0 +1,55 @@ +# Debug Telemetry Runbook + +This project now includes structured server-side telemetry for the audio/DSP pipeline. + +## Endpoints + +- `GET /api/debug/telemetry/live` + - Current counters/gauges/distributions and recent events. +- `GET /api/debug/telemetry/history` + - Historical metric samples. + - Query params: + - `since`, `until`: unix seconds/ms or RFC3339 + - `limit` + - `name`, `prefix` + - `signal_id`, `session_id`, `stage`, `trace_id`, `component` + - `tag_<key>=<value>` for arbitrary tag filters + - `include_persisted=true|false` +- `GET /api/debug/telemetry/events` + - Historical events/anomalies. + - Same filters as history plus `level`. +- `GET /api/debug/telemetry/config` + - Active telemetry config from runtime + collector.
+- `POST /api/debug/telemetry/config` + - Runtime config update (also saved to autosave config). + +## Config knobs + +`debug.telemetry` in config: + +- `enabled` +- `heavy_enabled` +- `heavy_sample_every` +- `metric_sample_every` +- `metric_history_max` +- `event_history_max` +- `retention_seconds` +- `persist_enabled` +- `persist_dir` +- `rotate_mb` +- `keep_files` + +Persisted JSONL files rotate in `persist_dir` (default: `debug/telemetry`). + +## 5-10 minute debug flow + +1. Keep `enabled=true`, `heavy_enabled=false`, `persist_enabled=true`. +2. Run workload for 5-10 minutes. +3. Pull live state: + - `GET /api/debug/telemetry/live` +4. Pull anomalies: + - `GET /api/debug/telemetry/events?since=<t0>&level=warn` +5. Pull pipeline timing and queue/backpressure: + - `GET /api/debug/telemetry/history?since=<t0>&prefix=stage.` + - `GET /api/debug/telemetry/history?since=<t0>&prefix=streamer.` +6. If IQ boundary issues persist, temporarily set `heavy_enabled=true` (keep sampling coarse with `heavy_sample_every` > 1), rerun, then inspect `iq.*` metrics and `audio.*` anomalies by `signal_id`/`session_id`.
diff --git a/internal/config/config.go b/internal/config/config.go index 5651873..66f0c9a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -99,6 +99,21 @@ type DecoderConfig struct { type DebugConfig struct { AudioDumpEnabled bool `yaml:"audio_dump_enabled" json:"audio_dump_enabled"` CPUMonitoring bool `yaml:"cpu_monitoring" json:"cpu_monitoring"` + Telemetry TelemetryConfig `yaml:"telemetry" json:"telemetry"` +} + +type TelemetryConfig struct { + Enabled bool `yaml:"enabled" json:"enabled"` + HeavyEnabled bool `yaml:"heavy_enabled" json:"heavy_enabled"` + HeavySampleEvery int `yaml:"heavy_sample_every" json:"heavy_sample_every"` + MetricSampleEvery int `yaml:"metric_sample_every" json:"metric_sample_every"` + MetricHistoryMax int `yaml:"metric_history_max" json:"metric_history_max"` + EventHistoryMax int `yaml:"event_history_max" json:"event_history_max"` + RetentionSeconds int `yaml:"retention_seconds" json:"retention_seconds"` + PersistEnabled bool `yaml:"persist_enabled" json:"persist_enabled"` + PersistDir string `yaml:"persist_dir" json:"persist_dir"` + RotateMB int `yaml:"rotate_mb" json:"rotate_mb"` + KeepFiles int `yaml:"keep_files" json:"keep_files"` } type PipelineGoalConfig struct { @@ -430,6 +445,19 @@ func Default() Config { Debug: DebugConfig{ AudioDumpEnabled: false, CPUMonitoring: false, + Telemetry: TelemetryConfig{ + Enabled: true, + HeavyEnabled: false, + HeavySampleEvery: 12, + MetricSampleEvery: 2, + MetricHistoryMax: 12000, + EventHistoryMax: 4000, + RetentionSeconds: 900, + PersistEnabled: false, + PersistDir: "debug/telemetry", + RotateMB: 16, + KeepFiles: 8, + }, }, Logging: LogConfig{ Level: "informal", @@ -674,6 +702,30 @@ func applyDefaults(cfg Config) Config { if cfg.Recorder.ExtractionBwMult <= 0 { cfg.Recorder.ExtractionBwMult = 1.2 } + if cfg.Debug.Telemetry.HeavySampleEvery <= 0 { + cfg.Debug.Telemetry.HeavySampleEvery = 12 + } + if cfg.Debug.Telemetry.MetricSampleEvery <= 0 { + 
cfg.Debug.Telemetry.MetricSampleEvery = 2 + } + if cfg.Debug.Telemetry.MetricHistoryMax <= 0 { + cfg.Debug.Telemetry.MetricHistoryMax = 12000 + } + if cfg.Debug.Telemetry.EventHistoryMax <= 0 { + cfg.Debug.Telemetry.EventHistoryMax = 4000 + } + if cfg.Debug.Telemetry.RetentionSeconds <= 0 { + cfg.Debug.Telemetry.RetentionSeconds = 900 + } + if cfg.Debug.Telemetry.PersistDir == "" { + cfg.Debug.Telemetry.PersistDir = "debug/telemetry" + } + if cfg.Debug.Telemetry.RotateMB <= 0 { + cfg.Debug.Telemetry.RotateMB = 16 + } + if cfg.Debug.Telemetry.KeepFiles <= 0 { + cfg.Debug.Telemetry.KeepFiles = 8 + } return cfg } diff --git a/internal/demod/gpudemod/build/gpudemod_kernels.exp b/internal/demod/gpudemod/build/gpudemod_kernels.exp new file mode 100644 index 0000000000000000000000000000000000000000..5ae420e5db8ad3c2f0a4b1c6cd874778d112b0be GIT binary patch literal 2824 zcmeHJ&u`pB6n@FJO`Fn^@T;_ic3Xl-fUfpONE2G5Ra>E=wxMcD)q^G1@k|m^ukCp3 zWQ#a(K!^jCIB?*|5hRZM5g;MdKY;^6>XADVeD68lcM&neYeYBqr&;%W%Lo`W;=_xuw>rtw7FI4t@9S7Ps4dweH+P;$p z?uPC3Y#nTAKa8~Prpg&ID(!kbCwY>^B(cUAm9A@EJX(BGhpo1~r@3b<&$zKSYOYvs z+GfLBOGZq&YujsRV^-}b4pr(F`{&+xy+mibvLh)C5`4CR&CE$Kxb>IPEd}DEOVVwoOAnYFSps+WA$At0iJtV9Nd{Nj2a8lSj@VGD^ zcv#pX@Px1k$g8N)Mc_$c3Gj%pE|6DNqdt&tb&ak7Ul#T;@M&RJfv1Fh0-O?d9e7&U zr@(1pH-U9wp97x}b{lv`*cZTOh1~_774{|YIbjy?6=7cipBJ_d_y?X-h)b8ocoMq& zw)KJ0v9UgM0_#e+9q`5*%c%I=wuhK@0(H)$UZO28NO1if-;!9c@}RI*Q_oj}<{(CN zKcYA2;xvcSBc4ghQEbL9DbWqjjYAWzCH2``bM@=*EnlN=>78kEx79yi66wBX@0)SB z9y`7IyPmI2eYU^2FlX&faK|&LHd zIh;*S!jaFe=r~9XXJO<}C>x6dyb2!U3NN22#V;e@h8h*Wfqe8eG9N8FJ=~Zd(8_$`^=JzX$x*qI zdEG1cY`-5XqOS+a%yJW@ML(64HvL{wIx=%6Zx>DU0;5 zq%6^Q!%C}3KbMpiJuWG2`m>~TXbRf}5j!SpZeh&>1+rv)uDS5eW;sv@AX6^n&4I!> zFfW emit now) } +func (f *StatefulDecimatingFIRComplex) Phase() int { + if f == nil { + return 0 + } + return f.phase +} + +func (f *StatefulDecimatingFIRComplex) TapsLen() int { + if f == nil { + return 0 + } + return len(f.taps) +} + func NewStatefulDecimatingFIRComplex(taps []float64, factor int) 
*StatefulDecimatingFIRComplex { if factor < 1 { factor = 1 diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go index ef1b113..a03b378 100644 --- a/internal/recorder/recorder.go +++ b/internal/recorder/recorder.go @@ -12,6 +12,7 @@ import ( "sdr-wideband-suite/internal/demod/gpudemod" "sdr-wideband-suite/internal/detector" + "sdr-wideband-suite/internal/telemetry" ) type Policy struct { @@ -54,9 +55,10 @@ type Manager struct { streamer *Streamer streamedIDs map[int64]bool // signal IDs that were streamed (skip retroactive recording) streamedMu sync.Mutex + telemetry *telemetry.Collector } -func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) *Manager { +func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string, coll *telemetry.Collector) *Manager { if policy.OutputDir == "" { policy.OutputDir = "data/recordings" } @@ -71,8 +73,9 @@ func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeC centerHz: centerHz, decodeCommands: decodeCommands, queue: make(chan detector.Event, 64), - streamer: newStreamer(policy, centerHz), + streamer: newStreamer(policy, centerHz, coll), streamedIDs: make(map[int64]bool), + telemetry: coll, } m.initGPUDemod(sampleRate, blockSize) m.workerWG.Add(1) @@ -103,6 +106,13 @@ func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz if m.streamer != nil { m.streamer.updatePolicy(policy, centerHz) } + if m.telemetry != nil { + m.telemetry.Event("recorder_update", "info", "recorder policy updated", nil, map[string]any{ + "sample_rate": sampleRate, + "block_size": blockSize, + "enabled": policy.Enabled, + }) + } } func (m *Manager) Ingest(t0 time.Time, samples []complex64) { @@ -116,6 +126,9 @@ func (m *Manager) Ingest(t0 time.Time, samples []complex64) { return } ring.Push(t0, samples) + if m.telemetry != nil { + m.telemetry.SetGauge("recorder.ring.push_samples", 
float64(len(samples)), nil) + } } func (m *Manager) OnEvents(events []detector.Event) { @@ -134,8 +147,14 @@ func (m *Manager) OnEvents(events []detector.Event) { case m.queue <- ev: default: // drop if queue full + if m.telemetry != nil { + m.telemetry.IncCounter("recorder.event_queue.drop", 1, nil) + } } } + if m.telemetry != nil { + m.telemetry.SetGauge("recorder.event_queue.len", float64(len(m.queue)), nil) + } } func (m *Manager) worker() { diff --git a/internal/recorder/streamer.go b/internal/recorder/streamer.go index be7fb70..3e61c6e 100644 --- a/internal/recorder/streamer.go +++ b/internal/recorder/streamer.go @@ -20,6 +20,7 @@ import ( "sdr-wideband-suite/internal/detector" "sdr-wideband-suite/internal/dsp" "sdr-wideband-suite/internal/logging" + "sdr-wideband-suite/internal/telemetry" ) // --------------------------------------------------------------------------- @@ -27,6 +28,7 @@ import ( // --------------------------------------------------------------------------- type streamSession struct { + sessionID string signalID int64 centerHz float64 bwHz float64 @@ -54,9 +56,10 @@ type streamSession struct { prevDecIQ complex64 lastDecIQSet bool - lastDemodL float32 - prevDemodL float64 - lastDemodSet bool + lastDemodL float32 + prevDemodL float64 + lastDemodSet bool + snippetSeq uint64 // listenOnly sessions have no WAV file and no disk I/O. // They exist solely to feed audio to live-listen subscribers. @@ -226,6 +229,7 @@ type streamFeedItem struct { type streamFeedMsg struct { traceID uint64 items []streamFeedItem + enqueuedAt time.Time } type Streamer struct { @@ -245,6 +249,7 @@ type Streamer struct { // pendingListens are subscribers waiting for a matching session. 
pendingListens map[int64]*pendingListen + telemetry *telemetry.Collector } type pendingListen struct { @@ -254,7 +259,7 @@ type pendingListen struct { ch chan []byte } -func newStreamer(policy Policy, centerHz float64) *Streamer { +func newStreamer(policy Policy, centerHz float64, coll *telemetry.Collector) *Streamer { st := &Streamer{ sessions: make(map[int64]*streamSession), policy: policy, @@ -262,6 +267,7 @@ func newStreamer(policy Policy, centerHz float64) *Streamer { feedCh: make(chan streamFeedMsg, 2), done: make(chan struct{}), pendingListens: make(map[int64]*pendingListen), + telemetry: coll, } go st.worker() return st @@ -349,18 +355,33 @@ func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) { if (!recEnabled && !hasListeners) || len(items) == 0 { return } + if st.telemetry != nil { + st.telemetry.SetGauge("streamer.feed.queue_len", float64(len(st.feedCh)), nil) + st.telemetry.SetGauge("streamer.pending_listeners", float64(pending), nil) + st.telemetry.Observe("streamer.feed.batch_size", float64(len(items)), nil) + } select { - case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}: + case st.feedCh <- streamFeedMsg{traceID: traceID, items: items, enqueuedAt: time.Now()}: default: st.droppedFeed++ logging.Warn("drop", "feed_drop", "count", st.droppedFeed) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.feed.drop", 1, nil) + st.telemetry.Event("stream_feed_drop", "warn", "feed queue full", nil, map[string]any{ + "trace_id": traceID, + "queue_len": len(st.feedCh), + }) + } } } // processFeed runs in the worker goroutine. 
func (st *Streamer) processFeed(msg streamFeedMsg) { + procStart := time.Now() + lockStart := time.Now() st.mu.Lock() + lockWait := time.Since(lockStart) recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ) hasListeners := st.hasListenersLocked() now := time.Now() @@ -368,10 +389,24 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { gap := now.Sub(st.lastProcTS) if gap > 150*time.Millisecond { logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.process.gap.count", 1, nil) + st.telemetry.Observe("streamer.process.gap_ms", float64(gap.Milliseconds()), nil) + } } } st.lastProcTS = now defer st.mu.Unlock() + defer func() { + if st.telemetry != nil { + st.telemetry.Observe("streamer.process.total_ms", float64(time.Since(procStart).Microseconds())/1000.0, nil) + st.telemetry.Observe("streamer.lock_wait_ms", float64(lockWait.Microseconds())/1000.0, telemetry.TagsFromPairs("lock", "process")) + } + }() + if st.telemetry != nil { + st.telemetry.Observe("streamer.feed.enqueue_delay_ms", float64(now.Sub(msg.enqueuedAt).Microseconds())/1000.0, nil) + st.telemetry.SetGauge("streamer.sessions.active", float64(len(st.sessions)), nil) + } logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items)) @@ -434,6 +469,9 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { if err != nil { log.Printf("STREAM: open failed signal=%d %.1fMHz: %v", sig.ID, sig.CenterHz/1e6, err) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.session.open_error", 1, telemetry.TagsFromPairs("kind", "recording")) + } continue } st.sessions[sig.ID] = s @@ -445,6 +483,13 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { } // Attach any pending listeners st.attachPendingListeners(sess) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.session.open", 1, telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", 
fmt.Sprintf("%d", sig.ID))) + st.telemetry.Event("session_open", "info", "stream session opened", telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{ + "listen_only": sess.listenOnly, + "demod": sess.demodName, + }) + } } // Update metadata @@ -463,10 +508,17 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { // Demod with persistent state logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate) - audio, audioRate := sess.processSnippet(item.snippet, item.snipRate) + audioStart := time.Now() + audio, audioRate := sess.processSnippet(item.snippet, item.snipRate, st.telemetry) + if st.telemetry != nil { + st.telemetry.Observe("streamer.process_snippet_ms", float64(time.Since(audioStart).Microseconds())/1000.0, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + } logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate) if len(audio) == 0 { logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.audio.empty", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID))) + } } if len(audio) > 0 { if sess.wavSamples == 0 && audioRate > 0 { @@ -493,6 +545,10 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { gap := time.Since(sess.lastAudioTs) if gap > 150*time.Millisecond { logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds()) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.audio.gap.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID))) + st.telemetry.Observe("streamer.audio.gap_ms", float64(gap.Milliseconds()), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", 
sess.signalID))) + } } } // Transient click detector: finds short impulses (1-3 samples) @@ -519,6 +575,10 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first) if d2 > 0.15 { logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2) + if st.telemetry != nil { + st.telemetry.IncCounter("audio.boundary_click.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + st.telemetry.Observe("audio.boundary_click.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID))) + } } } @@ -541,6 +601,10 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { } if nClicks > 0 { logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames) + if st.telemetry != nil { + st.telemetry.IncCounter("audio.intra_click.count", float64(nClicks), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + st.telemetry.Observe("audio.intra_click.max_d2", maxD2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID))) + } } // Store last two samples for next frame's boundary check @@ -580,6 +644,13 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { s.audioSubs = oldSubs s.restoreDSPState(oldState) st.sessions[sig.ID] = s + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.session.reopen", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID))) + st.telemetry.Event("session_reopen", "info", "stream session rotated by max duration", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{ + "old_session": sess.sessionID, + "new_session": s.sessionID, + }) + } } } @@ -600,6 +671,13 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { if !sess.listenOnly { closeSession(sess, &st.policy) } + if st.telemetry != nil { + 
st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID)) + st.telemetry.Event("session_close", "info", "stream session closed", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID), map[string]any{ + "reason": "signal_missing", + "listen_only": sess.listenOnly, + }) + } delete(st.sessions, id) } } @@ -693,12 +771,18 @@ func (st *Streamer) CloseAll() { if !sess.listenOnly { closeSession(sess, &st.policy) } + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID)) + } delete(st.sessions, id) } for _, pl := range st.pendingListens { close(pl.ch) } st.pendingListens = nil + if st.telemetry != nil { + st.telemetry.Event("streamer_close_all", "info", "all stream sessions closed", nil, nil) + } } // ActiveSessions returns the number of open streaming sessions. 
@@ -755,6 +839,9 @@ func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64 if audioDumpEnabled { log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", bestSess.signalID, bestSess.debugDumpStart.Format(time.RFC3339), bestSess.debugDumpUntil.Format(time.RFC3339)) } + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.listener.attach", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", bestSess.signalID), "session_id", bestSess.sessionID)) + } return subID, ch, info, nil } @@ -768,6 +855,10 @@ func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64 info := defaultAudioInfoForMode(mode) log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6) log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.listener.pending", 1, nil) + st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil) + } return subID, ch, info, nil } @@ -779,6 +870,10 @@ func (st *Streamer) UnsubscribeAudio(subID int64) { if pl, ok := st.pendingListens[subID]; ok { close(pl.ch) delete(st.pendingListens, subID) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "pending")) + st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil) + } return } @@ -787,6 +882,9 @@ func (st *Streamer) UnsubscribeAudio(subID int64) { if sub.id == subID { close(sub.ch) sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "active", "session_id", sess.sessionID)) + } return } } @@ -800,10 +898,13 @@ func (st *Streamer) UnsubscribeAudio(subID int64) { // processSnippet takes a pre-extracted IQ snippet and demodulates it with // persistent state. 
Uses stateful FIR + polyphase resampler for exact 48kHz // output with zero transient artifacts. -func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) { +func (sess *streamSession) processSnippet(snippet []complex64, snipRate int, coll *telemetry.Collector) ([]float32, int) { if len(snippet) == 0 || snipRate <= 0 { return nil, 0 } + if coll != nil { + coll.SetGauge("iq.stage.snippet.length", float64(len(snippet)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + } isWFMStereo := sess.demodName == "WFM_STEREO" isWFM := sess.demodName == "WFM" || isWFMStereo @@ -899,11 +1000,24 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] sess.preDemodCutoff = cutoff sess.preDemodDecim = decim1 sess.preDemodDecimPhase = 0 + if coll != nil { + coll.IncCounter("dsp.pre_demod.init", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + coll.Event("prefir_reinit", "info", "pre-demod decimator reinitialized", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{ + "snip_rate": snipRate, + "cutoff_hz": cutoff, + "decim": decim1, + }) + } } decimPhaseBefore := sess.preDemodDecimPhase filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip))) dec = sess.preDemodDecimator.Process(fullSnip) + sess.preDemodDecimPhase = sess.preDemodDecimator.Phase() + if coll != nil { + coll.Observe("dsp.pre_demod.decimation_factor", float64(decim1), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + coll.SetGauge("iq.stage.pre_demod.length", float64(len(dec)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + } logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), 
"full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase) } else { logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0) @@ -913,6 +1027,9 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] if decHeadTrimSamples > 0 && decHeadTrimSamples < len(dec) { logging.Warn("boundary", "dec_head_trim_applied", "signal", sess.signalID, "trim", decHeadTrimSamples, "before_len", len(dec)) dec = dec[decHeadTrimSamples:] + if coll != nil { + coll.IncCounter("dsp.pre_demod.head_trim", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID))) + } } if logging.EnabledCategory("boundary") && len(dec) > 0 { @@ -923,6 +1040,10 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] d2Mag := math.Hypot(d2Re, d2Im) if d2Mag > 0.15 { logging.Warn("boundary", "dec_iq_boundary", "signal", sess.signalID, "d2", d2Mag) + if coll != nil { + coll.IncCounter("iq.dec.boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + coll.Observe("iq.dec.boundary.d2", d2Mag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID))) + } } } @@ -968,6 +1089,9 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] if ratio < 0.75 || ratio > 1.25 { logging.Warn("boundary", "dec_iq_head_tail_skew", "signal", sess.signalID, "head_avg", headAvg, "tail_avg", tailAvg, "ratio", ratio) } + if coll != nil { + coll.Observe("iq.dec.head_tail_ratio", ratio, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + } } probeN := 64 @@ -1003,6 +1127,11 @@ func (sess 
*streamSession) processSnippet(snippet []complex64, snipRate int) ([] if maxHeadStep > 1.5 { logging.Warn("boundary", "dec_iq_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx, "min_mag", minHeadMag, "min_idx", minHeadIdx) } + if coll != nil { + coll.Observe("iq.dec.magnitude.min", minMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + coll.Observe("iq.dec.magnitude.max", maxMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + coll.Observe("iq.dec.phase_step.max", maxHeadStep, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + } if len(dec) >= 2 { sess.prevDecIQ = dec[len(dec)-2] @@ -1019,6 +1148,9 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] if len(audio) == 0 { return nil, 0 } + if coll != nil { + coll.SetGauge("audio.stage.demod.length", float64(len(audio)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + } if logging.EnabledCategory("boundary") { stride := d.Channels() if stride < 1 { @@ -1031,6 +1163,10 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] d2 := math.Abs(2*float64(sess.lastDemodL) - sess.prevDemodL - first) if d2 > 0.15 { logging.Warn("boundary", "demod_boundary", "signal", sess.signalID, "d2", d2) + if coll != nil { + coll.IncCounter("audio.demod_boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + coll.Observe("audio.demod_boundary.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID))) + } } } if nFrames >= 2 { @@ -1099,6 +1235,12 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate) 
sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps) sess.stereoResamplerRate = actualDemodRate + if coll != nil { + coll.Event("resampler_reset", "info", "stereo resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{ + "mode": "stereo", + "rate": actualDemodRate, + }) + } } audio = sess.stereoResampler.Process(audio) } else { @@ -1106,10 +1248,19 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate) sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps) sess.monoResamplerRate = actualDemodRate + if coll != nil { + coll.Event("resampler_reset", "info", "mono resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{ + "mode": "mono", + "rate": actualDemodRate, + }) + } } audio = sess.monoResampler.Process(audio) } } + if coll != nil { + coll.SetGauge("audio.stage.output.length", float64(len(audio)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + } // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) --- if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 { @@ -1429,6 +1580,7 @@ func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (* playbackMode, stereoState := initialPlaybackState(demodName) sess := &streamSession{ + sessionID: fmt.Sprintf("%d-%d-r", sig.ID, now.UnixMilli()), signalID: sig.ID, centerHz: sig.CenterHz, bwHz: sig.BWHz, @@ -1473,6 +1625,7 @@ func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *stre playbackMode, stereoState := initialPlaybackState(demodName) sess := &streamSession{ + sessionID: fmt.Sprintf("%d-%d-l", sig.ID, now.UnixMilli()), signalID: sig.ID, centerHz: sig.CenterHz, bwHz: 
sig.BWHz, @@ -1677,10 +1830,16 @@ func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) { default: st.droppedPCM++ logging.Warn("drop", "pcm_drop", "count", st.droppedPCM) + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.pcm.drop", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + } } alive = append(alive, sub) } sess.audioSubs = alive + if st.telemetry != nil { + st.telemetry.SetGauge("streamer.subscribers.count", float64(len(alive)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)) + } } func (st *Streamer) classAllowed(cls *classifier.Classification) bool { @@ -1770,6 +1929,10 @@ func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels func (st *Streamer) ResetStreams() { st.mu.Lock() defer st.mu.Unlock() + if st.telemetry != nil { + st.telemetry.IncCounter("streamer.reset.count", 1, nil) + st.telemetry.Event("stream_reset", "warn", "stream DSP state reset", nil, map[string]any{"sessions": len(st.sessions)}) + } for _, sess := range st.sessions { sess.preDemodFIR = nil sess.preDemodDecimator = nil diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go new file mode 100644 index 0000000..837f868 --- /dev/null +++ b/internal/telemetry/telemetry.go @@ -0,0 +1,965 @@ +package telemetry + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +type Config struct { + Enabled bool `json:"enabled"` + HeavyEnabled bool `json:"heavy_enabled"` + HeavySampleEvery int `json:"heavy_sample_every"` + MetricSampleEvery int `json:"metric_sample_every"` + MetricHistoryMax int `json:"metric_history_max"` + EventHistoryMax int `json:"event_history_max"` + Retention time.Duration `json:"retention"` + PersistEnabled bool `json:"persist_enabled"` + PersistDir string `json:"persist_dir"` + RotateMB 
int `json:"rotate_mb"` + KeepFiles int `json:"keep_files"` +} + +func DefaultConfig() Config { + return Config{ + Enabled: true, + HeavyEnabled: false, + HeavySampleEvery: 12, + MetricSampleEvery: 2, + MetricHistoryMax: 12_000, + EventHistoryMax: 4_000, + Retention: 15 * time.Minute, + PersistEnabled: false, + PersistDir: "debug/telemetry", + RotateMB: 16, + KeepFiles: 8, + } +} + +type Tags map[string]string + +type MetricPoint struct { + Timestamp time.Time `json:"ts"` + Name string `json:"name"` + Type string `json:"type"` + Value float64 `json:"value"` + Tags Tags `json:"tags,omitempty"` +} + +type Event struct { + ID uint64 `json:"id"` + Timestamp time.Time `json:"ts"` + Name string `json:"name"` + Level string `json:"level"` + Message string `json:"message,omitempty"` + Tags Tags `json:"tags,omitempty"` + Fields map[string]any `json:"fields,omitempty"` +} + +type SeriesValue struct { + Name string `json:"name"` + Value float64 `json:"value"` + Tags Tags `json:"tags,omitempty"` +} + +type DistValue struct { + Name string `json:"name"` + Count int64 `json:"count"` + Min float64 `json:"min"` + Max float64 `json:"max"` + Mean float64 `json:"mean"` + Last float64 `json:"last"` + P95 float64 `json:"p95"` + Tags Tags `json:"tags,omitempty"` +} + +type LiveSnapshot struct { + Now time.Time `json:"now"` + StartedAt time.Time `json:"started_at"` + UptimeMs int64 `json:"uptime_ms"` + Config Config `json:"config"` + Counters []SeriesValue `json:"counters"` + Gauges []SeriesValue `json:"gauges"` + Distributions []DistValue `json:"distributions"` + RecentEvents []Event `json:"recent_events"` + Status map[string]any `json:"status,omitempty"` +} + +type Query struct { + From time.Time + To time.Time + Limit int + Name string + NamePrefix string + Level string + Tags Tags + IncludePersisted bool +} + +type collectorMetric struct { + name string + tags Tags + value float64 +} + +type distMetric struct { + name string + tags Tags + count int64 + sum float64 + min float64 + max 
float64 + last float64 + samples []float64 + next int + full bool +} + +type persistedEnvelope struct { + Kind string `json:"kind"` + Metric *MetricPoint `json:"metric,omitempty"` + Event *Event `json:"event,omitempty"` +} + +type Collector struct { + mu sync.RWMutex + cfg Config + startedAt time.Time + counterSeq uint64 + heavySeq uint64 + eventSeq uint64 + + counters map[string]*collectorMetric + gauges map[string]*collectorMetric + dists map[string]*distMetric + metricsHistory []MetricPoint + events []Event + status map[string]any + + writer *jsonlWriter +} + +func New(cfg Config) (*Collector, error) { + cfg = sanitizeConfig(cfg) + c := &Collector{ + cfg: cfg, + startedAt: time.Now().UTC(), + counters: map[string]*collectorMetric{}, + gauges: map[string]*collectorMetric{}, + dists: map[string]*distMetric{}, + metricsHistory: make([]MetricPoint, 0, cfg.MetricHistoryMax), + events: make([]Event, 0, cfg.EventHistoryMax), + status: map[string]any{}, + } + if cfg.PersistEnabled { + writer, err := newJSONLWriter(cfg) + if err != nil { + return nil, err + } + c.writer = writer + } + return c, nil +} + +func (c *Collector) Close() error { + if c == nil { + return nil + } + c.mu.Lock() + writer := c.writer + c.writer = nil + c.mu.Unlock() + if writer != nil { + return writer.Close() + } + return nil +} + +func (c *Collector) Configure(cfg Config) error { + if c == nil { + return nil + } + cfg = sanitizeConfig(cfg) + var writer *jsonlWriter + var err error + if cfg.PersistEnabled { + writer, err = newJSONLWriter(cfg) + if err != nil { + return err + } + } + c.mu.Lock() + old := c.writer + c.cfg = cfg + c.writer = writer + c.trimLocked(time.Now().UTC()) + c.mu.Unlock() + if old != nil { + _ = old.Close() + } + return nil +} + +func (c *Collector) Config() Config { + c.mu.RLock() + defer c.mu.RUnlock() + return c.cfg +} + +func (c *Collector) Enabled() bool { + if c == nil { + return false + } + c.mu.RLock() + defer c.mu.RUnlock() + return c.cfg.Enabled +} + +func (c 
*Collector) ShouldSampleHeavy() bool { + if c == nil { + return false + } + c.mu.RLock() + cfg := c.cfg + c.mu.RUnlock() + if !cfg.Enabled || !cfg.HeavyEnabled { + return false + } + n := cfg.HeavySampleEvery + if n <= 1 { + return true + } + seq := atomic.AddUint64(&c.heavySeq, 1) + return seq%uint64(n) == 0 +} + +func (c *Collector) SetStatus(key string, value any) { + if c == nil { + return + } + c.mu.Lock() + c.status[key] = value + c.mu.Unlock() +} + +func (c *Collector) IncCounter(name string, delta float64, tags Tags) { + c.recordMetric("counter", name, delta, tags, true) +} + +func (c *Collector) SetGauge(name string, value float64, tags Tags) { + c.recordMetric("gauge", name, value, tags, false) +} + +func (c *Collector) Observe(name string, value float64, tags Tags) { + c.recordMetric("distribution", name, value, tags, false) +} + +func (c *Collector) Event(name string, level string, message string, tags Tags, fields map[string]any) { + if c == nil { + return + } + now := time.Now().UTC() + c.mu.Lock() + if !c.cfg.Enabled { + c.mu.Unlock() + return + } + ev := Event{ + ID: atomic.AddUint64(&c.eventSeq, 1), + Timestamp: now, + Name: name, + Level: strings.TrimSpace(strings.ToLower(level)), + Message: message, + Tags: cloneTags(tags), + Fields: cloneFields(fields), + } + if ev.Level == "" { + ev.Level = "info" + } + c.events = append(c.events, ev) + c.trimLocked(now) + writer := c.writer + c.mu.Unlock() + if writer != nil { + _ = writer.Write(persistedEnvelope{Kind: "event", Event: &ev}) + } +} + +func (c *Collector) recordMetric(kind string, name string, value float64, tags Tags, add bool) { + if c == nil || strings.TrimSpace(name) == "" { + return + } + now := time.Now().UTC() + c.mu.Lock() + if !c.cfg.Enabled { + c.mu.Unlock() + return + } + key := metricKey(name, tags) + switch kind { + case "counter": + m := c.counters[key] + if m == nil { + m = &collectorMetric{name: name, tags: cloneTags(tags)} + c.counters[key] = m + } + if add { + m.value += value 
+ } else { + m.value = value + } + case "gauge": + m := c.gauges[key] + if m == nil { + m = &collectorMetric{name: name, tags: cloneTags(tags)} + c.gauges[key] = m + } + m.value = value + case "distribution": + d := c.dists[key] + if d == nil { + d = &distMetric{ + name: name, + tags: cloneTags(tags), + min: value, + max: value, + samples: make([]float64, 64), + } + c.dists[key] = d + } + d.count++ + d.sum += value + d.last = value + if d.count == 1 || value < d.min { + d.min = value + } + if d.count == 1 || value > d.max { + d.max = value + } + if len(d.samples) > 0 { + d.samples[d.next] = value + d.next++ + if d.next >= len(d.samples) { + d.next = 0 + d.full = true + } + } + } + sampleN := c.cfg.MetricSampleEvery + seq := atomic.AddUint64(&c.counterSeq, 1) + shouldStore := sampleN <= 1 || seq%uint64(sampleN) == 0 || kind == "counter" + var mp MetricPoint + if shouldStore { + mp = MetricPoint{ + Timestamp: now, + Name: name, + Type: kind, + Value: value, + Tags: cloneTags(tags), + } + c.metricsHistory = append(c.metricsHistory, mp) + } + c.trimLocked(now) + writer := c.writer + c.mu.Unlock() + + if writer != nil && shouldStore { + _ = writer.Write(persistedEnvelope{Kind: "metric", Metric: &mp}) + } +} + +func (c *Collector) LiveSnapshot() LiveSnapshot { + now := time.Now().UTC() + c.mu.RLock() + cfg := c.cfg + out := LiveSnapshot{ + Now: now, + StartedAt: c.startedAt, + UptimeMs: now.Sub(c.startedAt).Milliseconds(), + Config: cfg, + Counters: make([]SeriesValue, 0, len(c.counters)), + Gauges: make([]SeriesValue, 0, len(c.gauges)), + Distributions: make([]DistValue, 0, len(c.dists)), + RecentEvents: make([]Event, 0, min(40, len(c.events))), + Status: cloneFields(c.status), + } + for _, m := range c.counters { + out.Counters = append(out.Counters, SeriesValue{Name: m.name, Value: m.value, Tags: cloneTags(m.tags)}) + } + for _, m := range c.gauges { + out.Gauges = append(out.Gauges, SeriesValue{Name: m.name, Value: m.value, Tags: cloneTags(m.tags)}) + } + for _, d := 
range c.dists { + mean := 0.0 + if d.count > 0 { + mean = d.sum / float64(d.count) + } + out.Distributions = append(out.Distributions, DistValue{ + Name: d.name, + Count: d.count, + Min: d.min, + Max: d.max, + Mean: mean, + Last: d.last, + P95: p95FromDist(d), + Tags: cloneTags(d.tags), + }) + } + start := len(c.events) - cap(out.RecentEvents) + if start < 0 { + start = 0 + } + for _, ev := range c.events[start:] { + out.RecentEvents = append(out.RecentEvents, copyEvent(ev)) + } + c.mu.RUnlock() + sort.Slice(out.Counters, func(i, j int) bool { return out.Counters[i].Name < out.Counters[j].Name }) + sort.Slice(out.Gauges, func(i, j int) bool { return out.Gauges[i].Name < out.Gauges[j].Name }) + sort.Slice(out.Distributions, func(i, j int) bool { return out.Distributions[i].Name < out.Distributions[j].Name }) + return out +} + +func (c *Collector) QueryMetrics(q Query) ([]MetricPoint, error) { + if c == nil { + return nil, nil + } + q = normalizeQuery(q) + c.mu.RLock() + items := make([]MetricPoint, 0, len(c.metricsHistory)) + for _, m := range c.metricsHistory { + if metricMatch(m, q) { + items = append(items, copyMetric(m)) + } + } + cfg := c.cfg + c.mu.RUnlock() + if q.IncludePersisted && cfg.PersistEnabled { + persisted, err := readPersistedMetrics(cfg, q) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, err + } + items = append(items, persisted...) 
+ } + sort.Slice(items, func(i, j int) bool { + return items[i].Timestamp.Before(items[j].Timestamp) + }) + if q.Limit > 0 && len(items) > q.Limit { + items = items[len(items)-q.Limit:] + } + return items, nil +} + +func (c *Collector) QueryEvents(q Query) ([]Event, error) { + if c == nil { + return nil, nil + } + q = normalizeQuery(q) + c.mu.RLock() + items := make([]Event, 0, len(c.events)) + for _, ev := range c.events { + if eventMatch(ev, q) { + items = append(items, copyEvent(ev)) + } + } + cfg := c.cfg + c.mu.RUnlock() + if q.IncludePersisted && cfg.PersistEnabled { + persisted, err := readPersistedEvents(cfg, q) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, err + } + items = append(items, persisted...) + } + sort.Slice(items, func(i, j int) bool { + return items[i].Timestamp.Before(items[j].Timestamp) + }) + if q.Limit > 0 && len(items) > q.Limit { + items = items[len(items)-q.Limit:] + } + return items, nil +} + +func (c *Collector) trimLocked(now time.Time) { + if c.cfg.MetricHistoryMax > 0 && len(c.metricsHistory) > c.cfg.MetricHistoryMax { + c.metricsHistory = append([]MetricPoint(nil), c.metricsHistory[len(c.metricsHistory)-c.cfg.MetricHistoryMax:]...) + } + if c.cfg.EventHistoryMax > 0 && len(c.events) > c.cfg.EventHistoryMax { + c.events = append([]Event(nil), c.events[len(c.events)-c.cfg.EventHistoryMax:]...) + } + ret := c.cfg.Retention + if ret <= 0 { + return + } + cut := now.Add(-ret) + mStart := 0 + for mStart < len(c.metricsHistory) && c.metricsHistory[mStart].Timestamp.Before(cut) { + mStart++ + } + if mStart > 0 { + c.metricsHistory = append([]MetricPoint(nil), c.metricsHistory[mStart:]...) + } + eStart := 0 + for eStart < len(c.events) && c.events[eStart].Timestamp.Before(cut) { + eStart++ + } + if eStart > 0 { + c.events = append([]Event(nil), c.events[eStart:]...) 
+ } +} + +func sanitizeConfig(cfg Config) Config { + def := DefaultConfig() + if cfg.HeavySampleEvery <= 0 { + cfg.HeavySampleEvery = def.HeavySampleEvery + } + if cfg.MetricSampleEvery <= 0 { + cfg.MetricSampleEvery = def.MetricSampleEvery + } + if cfg.MetricHistoryMax <= 0 { + cfg.MetricHistoryMax = def.MetricHistoryMax + } + if cfg.EventHistoryMax <= 0 { + cfg.EventHistoryMax = def.EventHistoryMax + } + if cfg.Retention <= 0 { + cfg.Retention = def.Retention + } + if strings.TrimSpace(cfg.PersistDir) == "" { + cfg.PersistDir = def.PersistDir + } + if cfg.RotateMB <= 0 { + cfg.RotateMB = def.RotateMB + } + if cfg.KeepFiles <= 0 { + cfg.KeepFiles = def.KeepFiles + } + return cfg +} + +func normalizeQuery(q Query) Query { + if q.Limit <= 0 || q.Limit > 5000 { + q.Limit = 500 + } + if q.Tags == nil { + q.Tags = Tags{} + } + return q +} + +func metricMatch(m MetricPoint, q Query) bool { + if !q.From.IsZero() && m.Timestamp.Before(q.From) { + return false + } + if !q.To.IsZero() && m.Timestamp.After(q.To) { + return false + } + if q.Name != "" && m.Name != q.Name { + return false + } + if q.NamePrefix != "" && !strings.HasPrefix(m.Name, q.NamePrefix) { + return false + } + for k, v := range q.Tags { + if m.Tags[k] != v { + return false + } + } + return true +} + +func eventMatch(ev Event, q Query) bool { + if !q.From.IsZero() && ev.Timestamp.Before(q.From) { + return false + } + if !q.To.IsZero() && ev.Timestamp.After(q.To) { + return false + } + if q.Name != "" && ev.Name != q.Name { + return false + } + if q.NamePrefix != "" && !strings.HasPrefix(ev.Name, q.NamePrefix) { + return false + } + if q.Level != "" && !strings.EqualFold(q.Level, ev.Level) { + return false + } + for k, v := range q.Tags { + if ev.Tags[k] != v { + return false + } + } + return true +} + +func metricKey(name string, tags Tags) string { + if len(tags) == 0 { + return name + } + keys := make([]string, 0, len(tags)) + for k := range tags { + keys = append(keys, k) + } + sort.Strings(keys) + var 
b strings.Builder + b.Grow(len(name) + len(keys)*16) + b.WriteString(name) + for _, k := range keys { + b.WriteString("|") + b.WriteString(k) + b.WriteString("=") + b.WriteString(tags[k]) + } + return b.String() +} + +func cloneTags(tags Tags) Tags { + if len(tags) == 0 { + return nil + } + out := make(Tags, len(tags)) + for k, v := range tags { + out[k] = v + } + return out +} + +func cloneFields(fields map[string]any) map[string]any { + if len(fields) == 0 { + return nil + } + out := make(map[string]any, len(fields)) + for k, v := range fields { + out[k] = v + } + return out +} + +func copyMetric(m MetricPoint) MetricPoint { + return MetricPoint{ + Timestamp: m.Timestamp, + Name: m.Name, + Type: m.Type, + Value: m.Value, + Tags: cloneTags(m.Tags), + } +} + +func copyEvent(ev Event) Event { + return Event{ + ID: ev.ID, + Timestamp: ev.Timestamp, + Name: ev.Name, + Level: ev.Level, + Message: ev.Message, + Tags: cloneTags(ev.Tags), + Fields: cloneFields(ev.Fields), + } +} + +func p95FromDist(d *distMetric) float64 { + if d == nil || d.count == 0 { + return 0 + } + n := d.next + if d.full { + n = len(d.samples) + } + if n <= 0 { + return d.last + } + buf := make([]float64, n) + copy(buf, d.samples[:n]) + sort.Float64s(buf) + idx := int(float64(n-1) * 0.95) + if idx < 0 { + idx = 0 + } + if idx >= n { + idx = n - 1 + } + return buf[idx] +} + +type jsonlWriter struct { + cfg Config + mu sync.Mutex + dir string + f *os.File + w *bufio.Writer + currentPath string + currentSize int64 + seq int64 +} + +func newJSONLWriter(cfg Config) (*jsonlWriter, error) { + dir := filepath.Clean(cfg.PersistDir) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, err + } + w := &jsonlWriter{cfg: cfg, dir: dir} + if err := w.rotateLocked(); err != nil { + return nil, err + } + return w, nil +} + +func (w *jsonlWriter) Write(v persistedEnvelope) error { + w.mu.Lock() + defer w.mu.Unlock() + if w.f == nil || w.w == nil { + return nil + } + line, err := json.Marshal(v) + if err 
!= nil { + return err + } + line = append(line, '\n') + if w.currentSize+int64(len(line)) > int64(w.cfg.RotateMB)*1024*1024 { + if err := w.rotateLocked(); err != nil { + return err + } + } + n, err := w.w.Write(line) + w.currentSize += int64(n) + if err != nil { + return err + } + return w.w.Flush() +} + +func (w *jsonlWriter) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.w != nil { + _ = w.w.Flush() + } + if w.f != nil { + err := w.f.Close() + w.f = nil + w.w = nil + return err + } + return nil +} + +func (w *jsonlWriter) rotateLocked() error { + if w.w != nil { + _ = w.w.Flush() + } + if w.f != nil { + _ = w.f.Close() + } + w.seq++ + name := fmt.Sprintf("telemetry-%s-%04d.jsonl", time.Now().UTC().Format("20060102-150405"), w.seq) + path := filepath.Join(w.dir, name) + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return err + } + info, _ := f.Stat() + size := int64(0) + if info != nil { + size = info.Size() + } + w.f = f + w.w = bufio.NewWriterSize(f, 64*1024) + w.currentPath = path + w.currentSize = size + _ = pruneFiles(w.dir, w.cfg.KeepFiles) + return nil +} + +func pruneFiles(dir string, keep int) error { + if keep <= 0 { + return nil + } + ents, err := os.ReadDir(dir) + if err != nil { + return err + } + files := make([]string, 0, len(ents)) + for _, ent := range ents { + if ent.IsDir() { + continue + } + name := ent.Name() + if !strings.HasPrefix(name, "telemetry-") || !strings.HasSuffix(name, ".jsonl") { + continue + } + files = append(files, filepath.Join(dir, name)) + } + if len(files) <= keep { + return nil + } + sort.Strings(files) + for _, path := range files[:len(files)-keep] { + _ = os.Remove(path) + } + return nil +} + +func readPersistedMetrics(cfg Config, q Query) ([]MetricPoint, error) { + files, err := listPersistedFiles(cfg.PersistDir) + if err != nil { + return nil, err + } + out := make([]MetricPoint, 0, 256) + for _, path := range files { + points, err := parsePersistedFile(path, 
q) + if err != nil { + continue + } + for _, p := range points.metrics { + if metricMatch(p, q) { + out = append(out, p) + } + } + } + return out, nil +} + +func readPersistedEvents(cfg Config, q Query) ([]Event, error) { + files, err := listPersistedFiles(cfg.PersistDir) + if err != nil { + return nil, err + } + out := make([]Event, 0, 128) + for _, path := range files { + points, err := parsePersistedFile(path, q) + if err != nil { + continue + } + for _, ev := range points.events { + if eventMatch(ev, q) { + out = append(out, ev) + } + } + } + return out, nil +} + +type parsedFile struct { + metrics []MetricPoint + events []Event +} + +func parsePersistedFile(path string, q Query) (parsedFile, error) { + f, err := os.Open(path) + if err != nil { + return parsedFile{}, err + } + defer f.Close() + out := parsedFile{ + metrics: make([]MetricPoint, 0, 64), + events: make([]Event, 0, 32), + } + s := bufio.NewScanner(f) + s.Buffer(make([]byte, 0, 32*1024), 1024*1024) + for s.Scan() { + line := s.Bytes() + if len(line) == 0 { + continue + } + var env persistedEnvelope + if err := json.Unmarshal(line, &env); err != nil { + continue + } + if env.Metric != nil { + out.metrics = append(out.metrics, *env.Metric) + } else if env.Event != nil { + out.events = append(out.events, *env.Event) + } + if q.Limit > 0 && len(out.metrics)+len(out.events) > q.Limit*2 { + // keep bounded while scanning + if len(out.metrics) > q.Limit { + out.metrics = out.metrics[len(out.metrics)-q.Limit:] + } + if len(out.events) > q.Limit { + out.events = out.events[len(out.events)-q.Limit:] + } + } + } + return out, s.Err() +} + +func listPersistedFiles(dir string) ([]string, error) { + ents, err := os.ReadDir(dir) + if err != nil { + return nil, err + } + files := make([]string, 0, len(ents)) + for _, ent := range ents { + if ent.IsDir() { + continue + } + name := ent.Name() + if strings.HasPrefix(name, "telemetry-") && strings.HasSuffix(name, ".jsonl") { + files = append(files, filepath.Join(dir, 
name)) + } + } + sort.Strings(files) + return files, nil +} + +func ParseTimeQuery(raw string) (time.Time, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return time.Time{}, nil + } + if ms, err := strconv.ParseInt(raw, 10, 64); err == nil { + if ms > 1e12 { + return time.UnixMilli(ms).UTC(), nil + } + return time.Unix(ms, 0).UTC(), nil + } + if t, err := time.Parse(time.RFC3339Nano, raw); err == nil { + return t.UTC(), nil + } + if t, err := time.Parse(time.RFC3339, raw); err == nil { + return t.UTC(), nil + } + return time.Time{}, errors.New("invalid time query") +} + +func TagsWith(base Tags, key string, value any) Tags { + out := cloneTags(base) + if out == nil { + out = Tags{} + } + out[key] = fmt.Sprint(value) + return out +} + +func TagsFromPairs(kv ...string) Tags { + if len(kv) < 2 { + return nil + } + out := Tags{} + for i := 0; i+1 < len(kv); i += 2 { + k := strings.TrimSpace(kv[i]) + if k == "" { + continue + } + out[k] = kv[i+1] + } + if len(out) == 0 { + return nil + } + return out +} + +func min(a int, b int) int { + if a < b { + return a + } + return b +}