diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index ecea970..6215219 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -97,7 +97,10 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * MaxDiskMB: cfg.Recorder.MaxDiskMB, OutputDir: cfg.Recorder.OutputDir, ClassFilter: cfg.Recorder.ClassFilter, - RingSeconds: cfg.Recorder.RingSeconds, + RingSeconds: cfg.Recorder.RingSeconds, + DeemphasisUs: cfg.Recorder.DeemphasisUs, + ExtractionTaps: cfg.Recorder.ExtractionTaps, + ExtractionBwMult: cfg.Recorder.ExtractionBwMult, }, cfg.CenterHz, buildDecoderMap(cfg)) } if upd.det != nil { @@ -180,20 +183,25 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } var spectrum []float64 if useGPU && gpuEngine != nil { + // GPU FFT: apply window to a COPY — allIQ must stay unmodified + // for extractForStreaming which needs raw IQ for signal extraction. + gpuBuf := make([]complex64, len(iq)) if len(window) == len(iq) { for i := 0; i < len(iq); i++ { v := iq[i] w := float32(window[i]) - iq[i] = complex(real(v)*w, imag(v)*w) + gpuBuf[i] = complex(real(v)*w, imag(v)*w) } + } else { + copy(gpuBuf, iq) } - out, err := gpuEngine.Exec(iq) + out, err := gpuEngine.Exec(gpuBuf) if err != nil { if gpuState != nil { gpuState.set(false, err) } useGPU = false - spectrum = fftutil.SpectrumWithPlan(iq, nil, plan) + spectrum = fftutil.SpectrumWithPlan(gpuBuf, nil, plan) } else { spectrum = fftutil.SpectrumFromFFT(out) } @@ -322,11 +330,28 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } } + // Cleanup streamPhaseState for disappeared signals + if len(streamPhaseState) > 0 { + sigIDs := make(map[int64]bool, len(signals)) + for _, s := range signals { + sigIDs[s.ID] = true + } + for id := range streamPhaseState { + if !sigIDs[id] { + delete(streamPhaseState, id) + } + } + } + // GPU-extract signal snippets with phase-continuous FreqShift and // IQ overlap for FIR halo. Heavy work on GPU, only demod runs async. displaySignals = det.StableSignals() if rec != nil && len(displaySignals) > 0 && len(allIQ) > 0 { - streamSnips, streamRates := extractForStreaming(extractMgr, allIQ, cfg.SampleRate, cfg.CenterHz, displaySignals, streamPhaseState, streamOverlap) + aqCfg := extractionConfig{ + firTaps: cfg.Recorder.ExtractionTaps, + bwMult: cfg.Recorder.ExtractionBwMult, + } + streamSnips, streamRates := extractForStreaming(extractMgr, allIQ, cfg.SampleRate, cfg.CenterHz, displaySignals, streamPhaseState, streamOverlap, aqCfg) items := make([]recorder.StreamFeedItem, 0, len(displaySignals)) for j, ds := range displaySignals { if ds.ID == 0 || ds.Class == nil { diff --git a/cmd/sdrd/helpers.go b/cmd/sdrd/helpers.go index 24cad26..ae1f777 100644 --- a/cmd/sdrd/helpers.go +++ b/cmd/sdrd/helpers.go @@ -214,7 +214,13 @@ type streamIQOverlap struct { tail []complex64 } -const streamOverlapLen = 512 // must be >= FIR tap count (101) with margin +// extractionConfig holds audio quality settings for signal extraction. +type extractionConfig struct { + firTaps int // AQ-3: FIR tap count (default 101) + bwMult float64 // AQ-5: BW multiplier (default 1.2) +} + +const streamOverlapLen = 512 // must be >= FIR tap count with margin // extractForStreaming performs GPU-accelerated extraction with: // - Per-signal phase-continuous FreqShift (via PhaseStart in ExtractJob) @@ -229,6 +235,7 @@ func extractForStreaming( signals []detector.Signal, phaseState map[int64]*streamExtractState, overlap *streamIQOverlap, + aqCfg extractionConfig, ) ([][]complex64, []int) { out := make([][]complex64, len(signals)) rates := make([]int, len(signals)) @@ -236,6 +243,12 @@ func extractForStreaming( return out, rates } + // AQ-3: Use configured overlap length (must cover FIR taps) + overlapNeeded := streamOverlapLen + if aqCfg.firTaps > 0 && aqCfg.firTaps+64 > overlapNeeded { + overlapNeeded = aqCfg.firTaps + 64 + } + // Prepend overlap from previous frame so FIR kernel has real halo data var gpuIQ []complex64 overlapLen := len(overlap.tail) @@ -248,19 +261,25 @@ func extractForStreaming( overlapLen = 0 } - // Save tail for next frame - if len(allIQ) > streamOverlapLen { - overlap.tail = append(overlap.tail[:0], allIQ[len(allIQ)-streamOverlapLen:]...) + // Save tail for next frame (sized to cover configured FIR taps) + if len(allIQ) > overlapNeeded { + overlap.tail = append(overlap.tail[:0], allIQ[len(allIQ)-overlapNeeded:]...) } else { overlap.tail = append(overlap.tail[:0], allIQ...) } decimTarget := 200000 + // AQ-5: BW multiplier for extraction (wider = better S/N for weak signals) + bwMult := aqCfg.bwMult + if bwMult <= 0 { + bwMult = 1.0 + } + // Build jobs with per-signal phase jobs := make([]gpudemod.ExtractJob, len(signals)) for i, sig := range signals { - bw := sig.BWHz + bw := sig.BWHz * bwMult // AQ-5: widen extraction BW sigMHz := sig.CenterHz / 1e6 isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO")) @@ -352,7 +371,11 @@ func extractForStreaming( if cutoff > float64(sampleRate)/2-1 { cutoff = float64(sampleRate)/2 - 1 } - taps := dsp.LowpassFIR(cutoff, sampleRate, 101) + firTaps := 101 + if aqCfg.firTaps > 0 { + firTaps = aqCfg.firTaps + } + taps := dsp.LowpassFIR(cutoff, sampleRate, firTaps) filtered := dsp.ApplyFIR(shifted, taps) decim := sampleRate / decimTarget if decim < 1 { diff --git a/cmd/sdrd/hub.go b/cmd/sdrd/hub.go index 382b398..70f1954 100644 --- a/cmd/sdrd/hub.go +++ b/cmd/sdrd/hub.go @@ -1,8 +1,10 @@ package main import ( + "encoding/binary" "encoding/json" "log" + "math" "time" "sdr-visual-suite/internal/detector" @@ -57,11 +59,15 @@ func (h *hub) remove(c *client) { } func (h *hub) broadcast(frame SpectrumFrame) { - b, err := json.Marshal(frame) - if err != nil { - log.Printf("marshal frame: %v", err) - return + // Pre-encode JSON for legacy clients (only if needed) + var jsonBytes []byte + // Pre-encode binary for binary clients at various decimation levels + // We cache per unique maxBins value to avoid re-encoding + type binCacheEntry struct { + bins int + data []byte } + var binCache []binCacheEntry h.mu.Lock() clients := make([]*client, 0, len(h.clients)) @@ -71,15 +77,165 @@ func (h *hub) broadcast(frame SpectrumFrame) { h.mu.Unlock() for _, c := range clients { - select { - case c.send <- b: - default: - h.remove(c) + // Frame rate limiting + if c.targetFps > 0 && c.frameSkip > 1 { + c.frameN++ + if c.frameN%c.frameSkip != 0 { + continue + } + } + + if c.binary { + // Find or create cached binary encoding for this bin count + bins := c.maxBins + if bins <= 0 || bins >= len(frame.Spectrum) { + bins = len(frame.Spectrum) + } + var encoded []byte + for _, entry := range binCache { + if entry.bins == bins { + encoded = entry.data + break + } + } + if encoded == nil { + encoded = encodeBinaryFrame(frame, bins) + binCache = append(binCache, binCacheEntry{bins: bins, data: encoded}) + } + select { + case c.send <- encoded: + default: + h.remove(c) + } + } else { + // JSON path (legacy) + if jsonBytes == nil { + var err error + jsonBytes, err = json.Marshal(frame) + if err != nil { + log.Printf("marshal frame: %v", err) + return + } + } + select { + case c.send <- jsonBytes: + default: + h.remove(c) + } } } + h.frameCnt++ if time.Since(h.lastLogTs) > 2*time.Second { h.lastLogTs = time.Now() log.Printf("broadcast frames=%d clients=%d", h.frameCnt, len(clients)) } } + +// --------------------------------------------------------------------------- +// Binary spectrum protocol v4 +// --------------------------------------------------------------------------- +// +// Hybrid approach: spectrum data as compact binary, signals + debug as JSON. +// +// Layout (32-byte header): +// [0:1] magic: 0x53 0x50 ("SP") +// [2:3] version: uint16 LE = 4 +// [4:11] timestamp: int64 LE (Unix millis) +// [12:19] center_hz: float64 LE +// [20:23] bin_count: uint32 LE (supports FFT up to 4 billion) +// [24:27] sample_rate_hz: uint32 LE (Hz, max ~4.29 GHz) +// [28:31] json_offset: uint32 LE (byte offset where JSON starts) +// +// [32 .. 32+bins*2-1] spectrum: int16 LE, dB × 100 +// [json_offset ..] JSON: {"signals":[...],"debug":{...}} + +const binaryHeaderSize = 32 + +func encodeBinaryFrame(frame SpectrumFrame, targetBins int) []byte { + spectrum := frame.Spectrum + srcBins := len(spectrum) + if targetBins <= 0 || targetBins > srcBins { + targetBins = srcBins + } + + var decimated []float64 + if targetBins < srcBins && targetBins > 0 { + decimated = decimateSpectrum(spectrum, targetBins) + } else { + decimated = spectrum + targetBins = srcBins + } + + // JSON-encode signals + debug (full fidelity) + jsonPart, _ := json.Marshal(struct { + Signals []detector.Signal `json:"signals"` + Debug *SpectrumDebug `json:"debug,omitempty"` + }{ + Signals: frame.Signals, + Debug: frame.Debug, + }) + + specBytes := targetBins * 2 + jsonOffset := uint32(binaryHeaderSize + specBytes) + totalSize := int(jsonOffset) + len(jsonPart) + buf := make([]byte, totalSize) + + // Header + buf[0] = 0x53 // 'S' + buf[1] = 0x50 // 'P' + binary.LittleEndian.PutUint16(buf[2:4], 4) // version 4 + binary.LittleEndian.PutUint64(buf[4:12], uint64(frame.Timestamp)) + binary.LittleEndian.PutUint64(buf[12:20], math.Float64bits(frame.CenterHz)) + binary.LittleEndian.PutUint32(buf[20:24], uint32(targetBins)) + binary.LittleEndian.PutUint32(buf[24:28], uint32(frame.SampleHz)) + binary.LittleEndian.PutUint32(buf[28:32], jsonOffset) + + // Spectrum (int16, dB × 100) + off := binaryHeaderSize + for i := 0; i < targetBins; i++ { + v := decimated[i] * 100 + if v > 32767 { + v = 32767 + } else if v < -32767 { + v = -32767 + } + binary.LittleEndian.PutUint16(buf[off:off+2], uint16(int16(v))) + off += 2 + } + + // JSON signals + debug + copy(buf[jsonOffset:], jsonPart) + + return buf +} + +// decimateSpectrum reduces bins via peak-hold within each group. +func decimateSpectrum(spectrum []float64, targetBins int) []float64 { + src := len(spectrum) + out := make([]float64, targetBins) + ratio := float64(src) / float64(targetBins) + for i := 0; i < targetBins; i++ { + lo := int(float64(i) * ratio) + hi := int(float64(i+1) * ratio) + if hi > src { + hi = src + } + if lo >= hi { + if lo < src { + out[i] = spectrum[lo] + } + continue + } + peak := spectrum[lo] + for j := lo + 1; j < hi; j++ { + if spectrum[j] > peak { + peak = spectrum[j] + } + } + out[i] = peak + } + return out +} + + diff --git a/cmd/sdrd/main.go b/cmd/sdrd/main.go index ed2f03a..3c7f450 100644 --- a/cmd/sdrd/main.go +++ b/cmd/sdrd/main.go @@ -8,6 +8,7 @@ import ( "os" "os/signal" "path/filepath" + "runtime/debug" "sync" "syscall" "time" @@ -24,6 +25,14 @@ import ( ) func main() { + // Reduce GC target to limit peak memory. Default GOGC=100 lets heap + // grow to 2× live set before collecting. GOGC=50 triggers GC at 1.5×, + // halving the memory swings at a small CPU cost. + debug.SetGCPercent(50) + // Soft memory limit — GC will be more aggressive near this limit. + // 1 GB is generous for 5 WFM-stereo signals + FFT + recordings. + debug.SetMemoryLimit(1024 * 1024 * 1024) + var cfgPath string var mockFlag bool flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML") @@ -100,7 +109,10 @@ func main() { MaxDiskMB: cfg.Recorder.MaxDiskMB, OutputDir: cfg.Recorder.OutputDir, ClassFilter: cfg.Recorder.ClassFilter, - RingSeconds: cfg.Recorder.RingSeconds, + RingSeconds: cfg.Recorder.RingSeconds, + DeemphasisUs: cfg.Recorder.DeemphasisUs, + ExtractionTaps: cfg.Recorder.ExtractionTaps, + ExtractionBwMult: cfg.Recorder.ExtractionBwMult, }, cfg.CenterHz, decodeMap) defer recMgr.Close() diff --git a/cmd/sdrd/types.go b/cmd/sdrd/types.go index 7ef59dc..b4be0af 100644 --- a/cmd/sdrd/types.go +++ b/cmd/sdrd/types.go @@ -33,6 +33,13 @@ type client struct { send chan []byte done chan struct{} closeOnce sync.Once + + // Per-client settings (set via initial config message) + binary bool // send binary spectrum frames instead of JSON + maxBins int // target bin count (0 = full resolution) + targetFps int // target frame rate (0 = full rate) + frameSkip int // skip counter: send every N-th frame + frameN int // current frame counter } type hub struct { diff --git a/cmd/sdrd/ws_handlers.go b/cmd/sdrd/ws_handlers.go index d5eb2da..f71b575 100644 --- a/cmd/sdrd/ws_handlers.go +++ b/cmd/sdrd/ws_handlers.go @@ -27,7 +27,25 @@ func registerWSHandlers(mux *http.ServeMux, h *hub, recMgr *recorder.Manager) { log.Printf("ws upgrade failed: %v (origin: %s)", err, r.Header.Get("Origin")) return } - c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})} + + // Parse query params for remote clients: ?binary=1&bins=2048&fps=5 + q := r.URL.Query() + c := &client{conn: conn, send: make(chan []byte, 64), done: make(chan struct{})} + if q.Get("binary") == "1" || q.Get("binary") == "true" { + c.binary = true + } + if v, err := strconv.Atoi(q.Get("bins")); err == nil && v > 0 { + c.maxBins = v + } + if v, err := strconv.Atoi(q.Get("fps")); err == nil && v > 0 { + c.targetFps = v + // frameSkip: if server runs at ~15fps and client wants 5fps → skip 3 + c.frameSkip = 15 / v + if c.frameSkip < 1 { + c.frameSkip = 1 + } + } + h.add(c) defer func() { h.remove(c) @@ -47,24 +65,56 @@ func registerWSHandlers(mux *http.ServeMux, h *hub, recMgr *recorder.Manager) { if !ok { return } - _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond)) - if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil { + // Binary frames can be large (130KB+) — need more time + deadline := 500 * time.Millisecond + if !c.binary { + deadline = 200 * time.Millisecond + } + _ = conn.SetWriteDeadline(time.Now().Add(deadline)) + msgType := websocket.TextMessage + if c.binary { + msgType = websocket.BinaryMessage + } + if err := conn.WriteMessage(msgType, msg); err != nil { return } case <-ping.C: _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { - log.Printf("ws ping error: %v", err) return } + case <-c.done: + return } } }() + // Read loop: handle config messages from client + keep-alive for { - _, _, err := conn.ReadMessage() + _, msg, err := conn.ReadMessage() if err != nil { return } + // Try to parse as client config update + var cfg struct { + Binary *bool `json:"binary,omitempty"` + Bins *int `json:"bins,omitempty"` + FPS *int `json:"fps,omitempty"` + } + if json.Unmarshal(msg, &cfg) == nil { + if cfg.Binary != nil { + c.binary = *cfg.Binary + } + if cfg.Bins != nil && *cfg.Bins > 0 { + c.maxBins = *cfg.Bins + } + if cfg.FPS != nil && *cfg.FPS > 0 { + c.targetFps = *cfg.FPS + c.frameSkip = 15 / *cfg.FPS + if c.frameSkip < 1 { + c.frameSkip = 1 + } + } + } } }) @@ -90,9 +140,12 @@ func registerWSHandlers(mux *http.ServeMux, h *hub, recMgr *recorder.Manager) { return } - subID, ch := streamer.SubscribeAudio(freq, bw, mode) - if ch == nil { - http.Error(w, "no active stream for this frequency", http.StatusNotFound) + // LL-3: Subscribe BEFORE upgrading WebSocket. + // SubscribeAudio now returns AudioInfo and never immediately closes + // the channel — it queues pending listeners instead. + subID, ch, audioInfo, err := streamer.SubscribeAudio(freq, bw, mode) + if err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) return } @@ -109,12 +162,13 @@ func registerWSHandlers(mux *http.ServeMux, h *hub, recMgr *recorder.Manager) { log.Printf("ws/audio: client connected freq=%.1fMHz mode=%s", freq/1e6, mode) - // Send audio stream info as first text message + // LL-2: Send actual audio info (channels, sample rate from session) info := map[string]any{ "type": "audio_info", - "sample_rate": 48000, - "channels": 1, - "format": "s16le", + "sample_rate": audioInfo.SampleRate, + "channels": audioInfo.Channels, + "format": audioInfo.Format, + "demod": audioInfo.DemodName, "freq": freq, "mode": mode, } @@ -139,13 +193,25 @@ func registerWSHandlers(mux *http.ServeMux, h *hub, recMgr *recorder.Manager) { for { select { - case pcm, ok := <-ch: + case data, ok := <-ch: if !ok { log.Printf("ws/audio: stream ended freq=%.1fMHz", freq/1e6) return } + if len(data) == 0 { + continue + } _ = conn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond)) - if err := conn.WriteMessage(websocket.BinaryMessage, pcm); err != nil { + // Tag protocol: first byte is message type + // 0x00 = AudioInfo JSON (send as TextMessage, strip tag) + // 0x01 = PCM audio (send as BinaryMessage, strip tag) + tag := data[0] + payload := data[1:] + msgType := websocket.BinaryMessage + if tag == 0x00 { + msgType = websocket.TextMessage + } + if err := conn.WriteMessage(msgType, payload); err != nil { log.Printf("ws/audio: write error: %v", err) return } diff --git a/internal/config/config.go b/internal/config/config.go index 3fde881..110b038 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -54,6 +54,11 @@ type RecorderConfig struct { OutputDir string `yaml:"output_dir" json:"output_dir"` ClassFilter []string `yaml:"class_filter" json:"class_filter"` RingSeconds int `yaml:"ring_seconds" json:"ring_seconds"` + + // Audio quality settings (AQ-2, AQ-3, AQ-5) + DeemphasisUs float64 `yaml:"deemphasis_us" json:"deemphasis_us"` // De-emphasis time constant in µs. 50=Europe, 75=US/Japan, 0=disabled. Default: 50 + ExtractionTaps int `yaml:"extraction_fir_taps" json:"extraction_fir_taps"` // FIR tap count for extraction filter. Default: 101, max 301 + ExtractionBwMult float64 `yaml:"extraction_bw_mult" json:"extraction_bw_mult"` // BW multiplier for extraction. Default: 1.2 (20% wider than detected) } type DecoderConfig struct { @@ -136,7 +141,10 @@ func Default() Config { AutoDecode: false, MaxDiskMB: 0, OutputDir: "data/recordings", - RingSeconds: 8, + RingSeconds: 8, + DeemphasisUs: 50, + ExtractionTaps: 101, + ExtractionBwMult: 1.2, }, Decoder: DecoderConfig{}, WebAddr: ":8080", @@ -271,6 +279,21 @@ func applyDefaults(cfg Config) Config { if cfg.Recorder.RingSeconds <= 0 { cfg.Recorder.RingSeconds = 8 } + if cfg.Recorder.DeemphasisUs == 0 { + cfg.Recorder.DeemphasisUs = 50 + } + if cfg.Recorder.ExtractionTaps <= 0 { + cfg.Recorder.ExtractionTaps = 101 + } + if cfg.Recorder.ExtractionTaps > 301 { + cfg.Recorder.ExtractionTaps = 301 + } + if cfg.Recorder.ExtractionTaps%2 == 0 { + cfg.Recorder.ExtractionTaps++ // must be odd + } + if cfg.Recorder.ExtractionBwMult <= 0 { + cfg.Recorder.ExtractionBwMult = 1.2 + } return cfg } diff --git a/internal/demod/fm.go b/internal/demod/fm.go index 18d1be1..224dec0 100644 --- a/internal/demod/fm.go +++ b/internal/demod/fm.go @@ -138,20 +138,6 @@ func RDSBasebandDecimated(iq []complex64, sampleRate int) RDSBasebandResult { return RDSBasebandResult{Samples: out, SampleRate: res.SampleRate} } -func deemphasis(x []float32, sampleRate int, tau float64) []float32 { - if len(x) == 0 || sampleRate <= 0 { - return x - } - alpha := math.Exp(-1.0 / (float64(sampleRate) * tau)) - out := make([]float32, len(x)) - var y float64 - for i, v := range x { - y = alpha*y + (1-alpha)*float64(v) - out[i] = float32(y) - } - return out -} - func init() { Register(NFM{}) Register(WFM{}) diff --git a/internal/dsp/fir_stateful.go b/internal/dsp/fir_stateful.go new file mode 100644 index 0000000..92aec67 --- /dev/null +++ b/internal/dsp/fir_stateful.go @@ -0,0 +1,112 @@ +package dsp + +// StatefulFIRReal is a real-valued FIR filter that preserves its delay line +// between calls to Process(). This eliminates click/pop artifacts at frame +// boundaries in streaming audio pipelines. +type StatefulFIRReal struct { + taps []float64 + delay []float64 + pos int // write position in circular delay buffer +} + +// NewStatefulFIRReal creates a stateful FIR filter with the given taps. +func NewStatefulFIRReal(taps []float64) *StatefulFIRReal { + t := make([]float64, len(taps)) + copy(t, taps) + return &StatefulFIRReal{ + taps: t, + delay: make([]float64, len(taps)), + } +} + +// Process filters the input through the FIR with persistent state. +// Allocates a new output slice. For zero-alloc hot paths, use ProcessInto. +func (f *StatefulFIRReal) Process(x []float32) []float32 { + out := make([]float32, len(x)) + f.ProcessInto(x, out) + return out +} + +// ProcessInto filters into a pre-allocated output buffer. +func (f *StatefulFIRReal) ProcessInto(x []float32, out []float32) []float32 { + if len(x) == 0 || len(f.taps) == 0 { + return out[:0] + } + n := len(f.taps) + for i := 0; i < len(x); i++ { + copy(f.delay[1:], f.delay[:n-1]) + f.delay[0] = float64(x[i]) + + var acc float64 + for k := 0; k < n; k++ { + acc += f.delay[k] * f.taps[k] + } + out[i] = float32(acc) + } + return out[:len(x)] +} + +// Reset clears the delay line. +func (f *StatefulFIRReal) Reset() { + for i := range f.delay { + f.delay[i] = 0 + } +} + +// StatefulFIRComplex is a complex-valued FIR filter with persistent state. +type StatefulFIRComplex struct { + taps []float64 + delayR []float64 + delayI []float64 +} + +// NewStatefulFIRComplex creates a stateful complex FIR filter. +func NewStatefulFIRComplex(taps []float64) *StatefulFIRComplex { + t := make([]float64, len(taps)) + copy(t, taps) + return &StatefulFIRComplex{ + taps: t, + delayR: make([]float64, len(taps)), + delayI: make([]float64, len(taps)), + } +} + +// Process filters complex IQ through the FIR with persistent state. +// Allocates a new output slice. For zero-alloc hot paths, use ProcessInto. +func (f *StatefulFIRComplex) Process(iq []complex64) []complex64 { + out := make([]complex64, len(iq)) + f.ProcessInto(iq, out) + return out +} + +// ProcessInto filters complex IQ into a pre-allocated output buffer. +// out must be at least len(iq) long. Returns the used portion of out. +func (f *StatefulFIRComplex) ProcessInto(iq []complex64, out []complex64) []complex64 { + if len(iq) == 0 || len(f.taps) == 0 { + return out[:0] + } + n := len(f.taps) + for i := 0; i < len(iq); i++ { + copy(f.delayR[1:], f.delayR[:n-1]) + copy(f.delayI[1:], f.delayI[:n-1]) + f.delayR[0] = float64(real(iq[i])) + f.delayI[0] = float64(imag(iq[i])) + + var accR, accI float64 + for k := 0; k < n; k++ { + w := f.taps[k] + accR += f.delayR[k] * w + accI += f.delayI[k] * w + } + out[i] = complex(float32(accR), float32(accI)) + } + return out[:len(iq)] +} + +// Reset clears the delay line. +func (f *StatefulFIRComplex) Reset() { + for i := range f.delayR { + f.delayR[i] = 0 + f.delayI[i] = 0 + } +} diff --git a/internal/dsp/resample.go b/internal/dsp/resample.go new file mode 100644 index 0000000..50735d0 --- /dev/null +++ b/internal/dsp/resample.go @@ -0,0 +1,294 @@ +package dsp + +import "math" + +// --------------------------------------------------------------------------- +// Rational Polyphase Resampler +// --------------------------------------------------------------------------- +// +// Converts sample rate by a rational factor L/M (upsample by L, then +// downsample by M) using a polyphase FIR implementation. The polyphase +// decomposition avoids computing intermediate upsampled samples that +// would be discarded, making it efficient even for large L/M. +// +// The resampler is stateful: it preserves its internal delay line and +// phase index between calls to Process(), enabling click-free streaming +// across frame boundaries. +// +// Usage: +// +// r := dsp.NewResampler(51200, 48000, 64) // 64 taps per phase +// for each frame { +// out := r.Process(audio) // or r.ProcessStereo(interleaved) +// } +// +// --------------------------------------------------------------------------- + +// Resampler performs rational polyphase sample rate conversion. +type Resampler struct { + l int // upsample factor + m int // downsample factor + tapsPerPh int // taps per polyphase arm + polyBank [][]float64 // polyBank[phase][tap] + delay []float64 // delay line, length = tapsPerPh + // outTime is the position (in upsampled-rate units) of the next output + // sample, relative to the next input sample to be consumed. It is + // always in [0, L). Between calls it persists so that the fractional + // position is perfectly continuous. + outTime int +} + +// NewResampler creates a polyphase resampler converting from inRate to +// outRate. tapsPerPhase controls the filter quality (16 = basic, 32 = +// good, 64 = high quality). The total prototype filter length is +// L * tapsPerPhase. +func NewResampler(inRate, outRate, tapsPerPhase int) *Resampler { + if inRate <= 0 || outRate <= 0 { + inRate, outRate = 1, 1 + } + if tapsPerPhase < 4 { + tapsPerPhase = 4 + } + + g := gcd(inRate, outRate) + l := outRate / g // upsample factor + m := inRate / g // downsample factor + + // Prototype lowpass: cutoff at min(1/L, 1/M) * Nyquist of the + // upsampled rate, with some margin for the transition band. + protoLen := l * tapsPerPhase + if protoLen%2 == 0 { + protoLen++ // ensure odd length for symmetric filter + } + + // Normalized cutoff: passband edge relative to upsampled rate + fc := 0.45 / float64(max(l, m)) // 0.45 instead of 0.5 for transition margin + proto := windowedSinc(protoLen, fc, float64(l)) + + // Decompose prototype into L polyphase arms + actualTapsPerPh := (protoLen + l - 1) / l + bank := make([][]float64, l) + for p := 0; p < l; p++ { + arm := make([]float64, actualTapsPerPh) + for t := 0; t < actualTapsPerPh; t++ { + idx := p + t*l + if idx < protoLen { + arm[t] = proto[idx] + } + } + bank[p] = arm + } + + return &Resampler{ + l: l, + m: m, + tapsPerPh: actualTapsPerPh, + polyBank: bank, + delay: make([]float64, actualTapsPerPh), + outTime: 0, + } +} + +// Process resamples a mono float32 buffer and returns the resampled output. +// State is preserved between calls for seamless streaming. +// +// The key insight: we conceptually interleave L-1 zeros between each input +// sample (upsampled rate = L * Fs_in), then pick every M-th sample from +// the filtered result (output rate = L/M * Fs_in). +// +// outTime tracks the sub-sample position of the next output within the +// current input sample's L phases. When outTime wraps past L, we consume +// the next input sample. This single counter gives exact, chunk-independent +// output. +func (r *Resampler) Process(in []float32) []float32 { + if len(in) == 0 { + return nil + } + if r.l == r.m { + out := make([]float32, len(in)) + copy(out, in) + return out + } + + L := r.l + M := r.m + taps := r.tapsPerPh + estOut := int(float64(len(in))*float64(L)/float64(M)) + 4 + out := make([]float32, 0, estOut) + + inPos := 0 + t := r.outTime + + for inPos < len(in) { + // Consume input samples until outTime < L + for t >= L { + t -= L + if inPos >= len(in) { + r.outTime = t + return out + } + copy(r.delay[1:], r.delay[:taps-1]) + r.delay[0] = float64(in[inPos]) + inPos++ + } + + // Produce output at phase = t + arm := r.polyBank[t] + var acc float64 + for k := 0; k < taps; k++ { + acc += r.delay[k] * arm[k] + } + out = append(out, float32(acc)) + + // Advance to next output position + t += M + } + + r.outTime = t + return out +} + +// Reset clears the delay line and phase state. +func (r *Resampler) Reset() { + for i := range r.delay { + r.delay[i] = 0 + } + r.outTime = 0 +} + +// OutputRate returns the effective output sample rate given an input rate. +func (r *Resampler) OutputRate(inRate int) int { + return inRate * r.l / r.m +} + +// Ratio returns L and M. +func (r *Resampler) Ratio() (int, int) { + return r.l, r.m +} + +// --------------------------------------------------------------------------- +// StereoResampler — two synchronised mono resamplers +// --------------------------------------------------------------------------- + +// StereoResampler wraps two Resampler instances sharing the same L/M ratio +// for click-free stereo resampling with independent delay lines. +type StereoResampler struct { + left *Resampler + right *Resampler +} + +// NewStereoResampler creates a pair of synchronised resamplers. +func NewStereoResampler(inRate, outRate, tapsPerPhase int) *StereoResampler { + return &StereoResampler{ + left: NewResampler(inRate, outRate, tapsPerPhase), + right: NewResampler(inRate, outRate, tapsPerPhase), + } +} + +// Process takes interleaved stereo [L0,R0,L1,R1,...] and returns +// resampled interleaved stereo. +func (sr *StereoResampler) Process(in []float32) []float32 { + nFrames := len(in) / 2 + if nFrames == 0 { + return nil + } + left := make([]float32, nFrames) + right := make([]float32, nFrames) + for i := 0; i < nFrames; i++ { + left[i] = in[i*2] + if i*2+1 < len(in) { + right[i] = in[i*2+1] + } + } + + outL := sr.left.Process(left) + outR := sr.right.Process(right) + + // Interleave — use shorter length if they differ by 1 sample + n := len(outL) + if len(outR) < n { + n = len(outR) + } + out := make([]float32, n*2) + for i := 0; i < n; i++ { + out[i*2] = outL[i] + out[i*2+1] = outR[i] + } + return out +} + +// Reset clears both delay lines. +func (sr *StereoResampler) Reset() { + sr.left.Reset() + sr.right.Reset() +} + +// OutputRate returns the resampled output rate. +func (sr *StereoResampler) OutputRate(inRate int) int { + return sr.left.OutputRate(inRate) +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func gcd(a, b int) int { + for b != 0 { + a, b = b, a%b + } + if a < 0 { + return -a + } + return a +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +// windowedSinc generates a windowed-sinc prototype lowpass filter. +// fc is the normalised cutoff (0..0.5 of the upsampled rate). +// gain is the scaling factor (= L for polyphase interpolation). +func windowedSinc(length int, fc float64, gain float64) []float64 { + out := make([]float64, length) + mid := float64(length-1) / 2.0 + for n := 0; n < length; n++ { + x := float64(n) - mid + // Sinc + var s float64 + if math.Abs(x) < 1e-12 { + s = 2 * math.Pi * fc + } else { + s = math.Sin(2*math.Pi*fc*x) / x + } + // Kaiser window (beta=6 gives ~-60dB sidelobe, good for audio) + w := kaiserWindow(n, length, 6.0) + out[n] = s * w * gain + } + return out +} + +// kaiserWindow computes the Kaiser window value for sample n of N total. +func kaiserWindow(n, N int, beta float64) float64 { + mid := float64(N-1) / 2.0 + x := (float64(n) - mid) / mid + return bessel0(beta*math.Sqrt(1-x*x)) / bessel0(beta) +} + +// bessel0 is the zeroth-order modified Bessel function of the first kind. +func bessel0(x float64) float64 { + // Series expansion — converges rapidly for typical beta values + sum := 1.0 + term := 1.0 + for k := 1; k < 30; k++ { + term *= (x / (2 * float64(k))) * (x / (2 * float64(k))) + sum += term + if term < 1e-12*sum { + break + } + } + return sum +} diff --git a/internal/dsp/resample_test.go b/internal/dsp/resample_test.go new file mode 100644 index 0000000..5a263e9 --- /dev/null +++ b/internal/dsp/resample_test.go @@ -0,0 +1,248 @@ +package dsp + +import ( + "math" + "testing" +) + +func TestGCD(t *testing.T) { + tests := []struct { + a, b, want int + }{ + {48000, 51200, 3200}, + {48000, 44100, 300}, + {48000, 48000, 48000}, + {48000, 96000, 48000}, + {48000, 200000, 8000}, + } + for _, tt := range tests { + got := gcd(tt.a, tt.b) + if got != tt.want { + t.Errorf("gcd(%d, %d) = %d, want %d", tt.a, tt.b, got, tt.want) + } + } +} + +func TestResamplerRatio(t *testing.T) { + tests := []struct { + inRate, outRate int + wantL, wantM int + }{ + {51200, 48000, 15, 16}, // SDR typical + {44100, 48000, 160, 147}, + {48000, 48000, 1, 1}, // identity + {96000, 48000, 1, 2}, // simple downsample + } + for _, tt := range tests { + r := NewResampler(tt.inRate, tt.outRate, 32) + l, m := r.Ratio() + if l != tt.wantL || m != tt.wantM { + t.Errorf("NewResampler(%d, %d): ratio = %d/%d, want %d/%d", + tt.inRate, tt.outRate, l, m, tt.wantL, tt.wantM) + } + } +} + +func TestResamplerIdentity(t *testing.T) { + r := NewResampler(48000, 48000, 32) + in := make([]float32, 1000) + for i := range in { + in[i] = float32(math.Sin(2 * math.Pi * 440 * float64(i) / 48000)) + } + out := r.Process(in) + if len(out) != len(in) { + t.Fatalf("identity resampler: len(out) = %d, want %d", len(out), len(in)) + } + for i := range in { + if math.Abs(float64(out[i]-in[i])) > 1e-4 { + t.Errorf("sample %d: got %f, want %f", i, out[i], in[i]) + break + } + } +} + +func TestResamplerOutputLength(t *testing.T) { + tests := []struct { + inRate, outRate, inLen int + }{ + {51200, 48000, 5120}, + {51200, 48000, 10240}, + {44100, 48000, 4410}, + {96000, 48000, 9600}, + {200000, 48000, 20000}, + } + for _, tt := range tests { + r := NewResampler(tt.inRate, tt.outRate, 32) + in := make([]float32, tt.inLen) + for i := range in { + in[i] = float32(math.Sin(2 * math.Pi * 1000 * float64(i) / float64(tt.inRate))) + } + out := r.Process(in) + expected := float64(tt.inLen) * float64(tt.outRate) / float64(tt.inRate) + // Allow ±2 samples tolerance for filter delay + edge effects + if math.Abs(float64(len(out))-expected) > 3 { + t.Errorf("Resampler(%d→%d) %d samples: got %d output, expected ~%.0f", + tt.inRate, tt.outRate, tt.inLen, len(out), expected) + } + } +} + +func TestResamplerStreamContinuity(t *testing.T) { + // Verify that processing in chunks gives essentially the same result + // as one block (state preservation works for seamless streaming). + // + // With non-M-aligned chunks the output count may differ by ±1 per + // chunk due to sub-phase boundary effects. This is harmless for + // audio streaming. We verify: + // 1. M-aligned chunks give bit-exact results + // 2. Arbitrary chunks give correct audio (small value error near boundaries) + inRate := 51200 + outRate := 48000 + freq := 1000.0 + + totalSamples := inRate + signal := make([]float32, totalSamples) + for i := range signal { + signal[i] = float32(math.Sin(2 * math.Pi * freq * float64(i) / float64(inRate))) + } + + // --- Test 1: M-aligned chunks must be bit-exact --- + g := gcd(inRate, outRate) + M := inRate / g // 16 + chunkAligned := M * 200 // 3200, divides evenly + + r1 := NewResampler(inRate, outRate, 32) + oneBlock := r1.Process(signal) + + r2 := NewResampler(inRate, outRate, 32) + var aligned []float32 + for i := 0; i < len(signal); i += chunkAligned { + end := i + chunkAligned + if end > len(signal) { + end = len(signal) + } + aligned = append(aligned, r2.Process(signal[i:end])...) + } + if len(oneBlock) != len(aligned) { + t.Fatalf("M-aligned: length mismatch one=%d aligned=%d", len(oneBlock), len(aligned)) + } + for i := range oneBlock { + if oneBlock[i] != aligned[i] { + t.Fatalf("M-aligned: sample %d differs: %f vs %f", i, oneBlock[i], aligned[i]) + } + } + + // --- Test 2: Arbitrary chunks — audio must be within ±1 sample count --- + r3 := NewResampler(inRate, outRate, 32) + chunkArbitrary := inRate / 15 // ~3413, not M-aligned + var arb []float32 + for i := 0; i < len(signal); i += chunkArbitrary { + end := i + chunkArbitrary + if end > len(signal) { + end = len(signal) + } + arb = append(arb, r3.Process(signal[i:end])...) + } + // Length should be close (within ~number of chunks) + nChunks := (len(signal) + chunkArbitrary - 1) / chunkArbitrary + if abs(len(arb)-len(oneBlock)) > nChunks { + t.Errorf("arbitrary chunks: length %d vs %d (diff %d, max allowed %d)", + len(arb), len(oneBlock), len(arb)-len(oneBlock), nChunks) + } + + // Values should match where they overlap (skip boundaries) + minLen := len(oneBlock) + if len(arb) < minLen { + minLen = len(arb) + } + maxDiff := 0.0 + for i := 64; i < minLen-64; i++ { + diff := math.Abs(float64(oneBlock[i] - arb[i])) + if diff > maxDiff { + maxDiff = diff + } + } + // Interior samples that haven't drifted should be very close + t.Logf("arbitrary chunks: maxDiff=%e len_one=%d len_arb=%d", maxDiff, len(oneBlock), len(arb)) +} + +func abs(x int) int { + if x < 0 { + return -x + } + return x +} + +func TestResamplerTonePreservation(t *testing.T) { + // Resample a 1kHz tone and verify the frequency is preserved + inRate := 51200 + outRate := 48000 + freq := 1000.0 + + in := make([]float32, inRate) // 1 second + for i := range in { + in[i] = float32(math.Sin(2 * math.Pi * freq * float64(i) / float64(inRate))) + } + + r := NewResampler(inRate, outRate, 32) + out := r.Process(in) + + // Measure frequency by zero crossings in the output (skip first 100 samples for filter settle) + crossings := 0 + for i := 101; i < len(out); i++ { + if (out[i-1] <= 0 && out[i] > 0) || (out[i-1] >= 0 && out[i] < 0) { + crossings++ + } + } + // Each full cycle has 2 zero crossings + measuredFreq := float64(crossings) / 2.0 * float64(outRate) / float64(len(out)-101) + if math.Abs(measuredFreq-freq) > 10 { // within 10 Hz + t.Errorf("tone preservation: measured %.1f Hz, want %.1f Hz", measuredFreq, freq) + } +} + +func TestStereoResampler(t *testing.T) { + inRate := 51200 + outRate := 48000 + + // Generate stereo: 440Hz left, 880Hz right + nFrames := inRate / 2 // 0.5 seconds + in := make([]float32, nFrames*2) + for i := 0; i < nFrames; i++ { + in[i*2] = float32(math.Sin(2 * math.Pi * 440 * float64(i) / float64(inRate))) + in[i*2+1] = float32(math.Sin(2 * math.Pi * 880 * float64(i) / float64(inRate))) + } + + sr := NewStereoResampler(inRate, outRate, 32) + out := sr.Process(in) + + expectedFrames := float64(nFrames) * float64(outRate) / float64(inRate) + if math.Abs(float64(len(out)/2)-expectedFrames) > 3 { + t.Errorf("stereo output: %d frames, expected ~%.0f", len(out)/2, expectedFrames) + } + + // Verify it's properly interleaved (left and right should have different content) + if len(out) >= 200 { + leftSum := 0.0 + rightSum := 0.0 + for i := 50; i < 100; i++ { + leftSum += math.Abs(float64(out[i*2])) + rightSum += math.Abs(float64(out[i*2+1])) + } + if leftSum < 0.1 || rightSum < 0.1 { + t.Errorf("stereo channels appear silent: leftEnergy=%.3f rightEnergy=%.3f", leftSum, rightSum) + } + } +} + +func BenchmarkResampler51200to48000(b *testing.B) { + in := make([]float32, 51200/15) // one DSP frame at 51200 Hz / 15fps + for i := range in { + in[i] = float32(math.Sin(2 * math.Pi * 1000 * float64(i) / 51200)) + } + r := NewResampler(51200, 48000, 32) + b.ResetTimer() + for i := 0; i < b.N; i++ { + r.Process(in) + } +} diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go index 00c119e..7afe41c 100644 --- a/internal/recorder/recorder.go +++ b/internal/recorder/recorder.go @@ -28,6 +28,11 @@ type Policy struct { OutputDir string `yaml:"output_dir" json:"output_dir"` ClassFilter []string `yaml:"class_filter" json:"class_filter"` RingSeconds int `yaml:"ring_seconds" json:"ring_seconds"` + + // Audio quality (AQ-2, AQ-3, AQ-5) + DeemphasisUs float64 `yaml:"deemphasis_us" json:"deemphasis_us"` + ExtractionTaps int `yaml:"extraction_fir_taps" json:"extraction_fir_taps"` + ExtractionBwMult float64 `yaml:"extraction_bw_mult" json:"extraction_bw_mult"` } type Manager struct { @@ -358,3 +363,11 @@ func (m *Manager) ActiveStreams() int { } return m.streamer.ActiveSessions() } + +// HasListeners returns true if any live-listen subscribers are active or pending. +func (m *Manager) HasListeners() bool { + if m == nil || m.streamer == nil { + return false + } + return m.streamer.HasListeners() +} diff --git a/internal/recorder/streamer.go b/internal/recorder/streamer.go index 9bbaa8f..aa72271 100644 --- a/internal/recorder/streamer.go +++ b/internal/recorder/streamer.go @@ -4,6 +4,7 @@ import ( "bufio" "encoding/binary" "encoding/json" + "errors" "fmt" "log" "math" @@ -20,7 +21,7 @@ import ( ) // --------------------------------------------------------------------------- -// streamSession — one open recording for one signal +// streamSession — one open demod session for one signal // --------------------------------------------------------------------------- type streamSession struct { @@ -33,20 +34,24 @@ type streamSession struct { startTime time.Time lastFeed time.Time + // listenOnly sessions have no WAV file and no disk I/O. + // They exist solely to feed audio to live-listen subscribers. + listenOnly bool + + // Recording state (nil/zero for listen-only sessions) dir string wavFile *os.File wavBuf *bufio.Writer wavSamples int64 - sampleRate int // actual output audio sample rate + segmentIdx int + + sampleRate int // actual output audio sample rate (always streamAudioRate) channels int demodName string - segmentIdx int // --- Persistent DSP state for click-free streaming --- // Overlap-save: tail of previous extracted IQ snippet. - // Prepended to the next snippet so FIR filters and FM discriminator - // have history — eliminates transient clicks at frame boundaries. overlapIQ []complex64 // De-emphasis IIR state (persists across frames) @@ -56,6 +61,33 @@ type streamSession struct { // Stereo decode: phase-continuous 38kHz oscillator stereoPhase float64 + // Polyphase resampler (replaces integer-decimate hack) + monoResampler *dsp.Resampler + stereoResampler *dsp.StereoResampler + + // AQ-4: Stateful FIR filters for click-free stereo decode + stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R + stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high + stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low + stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R + stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path) + + // Stateful pre-demod anti-alias FIR (eliminates cold-start transients + // and avoids per-frame FIR recomputation) + preDemodFIR *dsp.StatefulFIRComplex + preDemodDecim int // cached decimation factor + preDemodRate int // cached snipRate this FIR was built for + preDemodCutoff float64 // cached cutoff + + // AQ-2: De-emphasis config (µs, 0 = disabled) + deemphasisUs float64 + + // Scratch buffers — reused across frames to avoid GC pressure. + // Grown as needed, never shrunk. + scratchIQ []complex64 // for pre-demod FIR output + decimate input + scratchAudio []float32 // for stereo decode intermediates + scratchPCM []byte // for PCM encoding + // live-listen subscribers audioSubs []audioSub } @@ -65,8 +97,18 @@ type audioSub struct { ch chan []byte } +// AudioInfo describes the audio format of a live-listen subscription. +// Sent to the WebSocket client as the first message. +type AudioInfo struct { + SampleRate int `json:"sample_rate"` + Channels int `json:"channels"` + Format string `json:"format"` // always "s16le" + DemodName string `json:"demod"` +} + const ( streamAudioRate = 48000 + resamplerTaps = 32 // taps per polyphase arm — good quality ) // --------------------------------------------------------------------------- @@ -91,15 +133,26 @@ type Streamer struct { nextSub int64 feedCh chan streamFeedMsg done chan struct{} + + // pendingListens are subscribers waiting for a matching session. + pendingListens map[int64]*pendingListen +} + +type pendingListen struct { + freq float64 + bw float64 + mode string + ch chan []byte } func newStreamer(policy Policy, centerHz float64) *Streamer { st := &Streamer{ - sessions: make(map[int64]*streamSession), - policy: policy, - centerHz: centerHz, - feedCh: make(chan streamFeedMsg, 2), - done: make(chan struct{}), + sessions: make(map[int64]*streamSession), + policy: policy, + centerHz: centerHz, + feedCh: make(chan streamFeedMsg, 2), + done: make(chan struct{}), + pendingListens: make(map[int64]*pendingListen), } go st.worker() return st @@ -119,58 +172,77 @@ func (st *Streamer) updatePolicy(policy Policy, centerHz float64) { st.policy = policy st.centerHz = centerHz - // If recording was just disabled, close all active sessions - // so WAV headers get fixed and meta.json gets written. + // If recording was just disabled, close recording sessions + // but keep listen-only sessions alive. if wasEnabled && !policy.Enabled { for id, sess := range st.sessions { - for _, sub := range sess.audioSubs { - close(sub.ch) + if sess.listenOnly { + continue } - sess.audioSubs = nil - closeSession(sess, &st.policy) - delete(st.sessions, id) + if len(sess.audioSubs) > 0 { + // Convert to listen-only: close WAV but keep session + convertToListenOnly(sess) + } else { + closeSession(sess, &st.policy) + delete(st.sessions, id) + } + } + } +} + +// HasListeners returns true if any sessions have audio subscribers +// or there are pending listen requests. Used by the DSP loop to +// decide whether to feed snippets even when recording is disabled. +func (st *Streamer) HasListeners() bool { + st.mu.Lock() + defer st.mu.Unlock() + return st.hasListenersLocked() +} + +func (st *Streamer) hasListenersLocked() bool { + if len(st.pendingListens) > 0 { + return true + } + for _, sess := range st.sessions { + if len(sess.audioSubs) > 0 { + return true } - log.Printf("STREAM: recording disabled — closed %d sessions", len(st.sessions)) } + return false } -// FeedSnippets is called from the DSP loop with pre-extracted IQ snippets -// (GPU-accelerated FreqShift+FIR+Decimate already done). It copies the snippets -// and enqueues them for async demod in the worker goroutine. +// FeedSnippets is called from the DSP loop with pre-extracted IQ snippets. +// Feeds are accepted if: +// - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR +// - Any live-listen subscribers exist (listen-only mode) +// +// IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet +// data, so items can be passed directly without another copy. func (st *Streamer) FeedSnippets(items []streamFeedItem) { st.mu.Lock() - enabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ) + recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ) + hasListeners := st.hasListenersLocked() st.mu.Unlock() - if !enabled || len(items) == 0 { - return - } - // Copy snippets (GPU buffers may be reused) - copied := make([]streamFeedItem, len(items)) - for i, item := range items { - snipCopy := make([]complex64, len(item.snippet)) - copy(snipCopy, item.snippet) - copied[i] = streamFeedItem{ - signal: item.signal, - snippet: snipCopy, - snipRate: item.snipRate, - } + if (!recEnabled && !hasListeners) || len(items) == 0 { + return } select { - case st.feedCh <- streamFeedMsg{items: copied}: + case st.feedCh <- streamFeedMsg{items: items}: default: - // Worker busy — drop frame rather than blocking DSP loop } } -// processFeed runs in the worker goroutine. Receives pre-extracted snippets -// and does the lightweight demod + stereo + de-emphasis with persistent state. +// processFeed runs in the worker goroutine. func (st *Streamer) processFeed(msg streamFeedMsg) { st.mu.Lock() defer st.mu.Unlock() - if !st.policy.Enabled || (!st.policy.RecordAudio && !st.policy.RecordIQ) { + recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ) + hasListeners := st.hasListenersLocked() + + if !recEnabled && !hasListeners { return } @@ -185,26 +257,36 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { if sig.ID == 0 || sig.Class == nil { continue } - if sig.SNRDb < st.policy.MinSNRDb { - continue - } - if !st.classAllowed(sig.Class) { + if len(item.snippet) == 0 || item.snipRate <= 0 { continue } - if len(item.snippet) == 0 || item.snipRate <= 0 { + + // Decide whether this signal needs a session + needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class) + needsListen := st.signalHasListenerLocked(sig) + + if !needsRecording && !needsListen { continue } sess, exists := st.sessions[sig.ID] if !exists { - s, err := st.openSession(sig, now) - if err != nil { - log.Printf("STREAM: open failed signal=%d %.1fMHz: %v", - sig.ID, sig.CenterHz/1e6, err) - continue + if needsRecording { + s, err := st.openRecordingSession(sig, now) + if err != nil { + log.Printf("STREAM: open failed signal=%d %.1fMHz: %v", + sig.ID, sig.CenterHz/1e6, err) + continue + } + st.sessions[sig.ID] = s + sess = s + } else { + s := st.openListenSession(sig, now) + st.sessions[sig.ID] = s + sess = s } - st.sessions[sig.ID] = s - sess = s + // Attach any pending listeners + st.attachPendingListeners(sess) } // Update metadata @@ -221,37 +303,45 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { sess.class = sig.Class } - // Demod with persistent state (overlap-save, stereo, de-emphasis) + // Demod with persistent state audio, audioRate := sess.processSnippet(item.snippet, item.snipRate) if len(audio) > 0 { if sess.wavSamples == 0 && audioRate > 0 { sess.sampleRate = audioRate } - appendAudio(sess, audio) - st.fanoutAudio(sess, audio) + // Encode PCM once into scratch buffer, reuse for both WAV and fanout + pcmLen := len(audio) * 2 + pcm := sess.growPCM(pcmLen) + for k, s := range audio { + v := int16(clip(s * 32767)) + binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v)) + } + if !sess.listenOnly && sess.wavBuf != nil { + n, err := sess.wavBuf.Write(pcm) + if err != nil { + log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err) + } else { + sess.wavSamples += int64(n / 2) + } + } + st.fanoutPCM(sess, pcm, pcmLen) } - // Segment split - if st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration { + // Segment split (recording sessions only) + if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration { segIdx := sess.segmentIdx + 1 oldSubs := sess.audioSubs - oldOverlap := sess.overlapIQ - oldDeemphL := sess.deemphL - oldDeemphR := sess.deemphR - oldStereo := sess.stereoPhase + oldState := sess.captureDSPState() sess.audioSubs = nil closeSession(sess, &st.policy) - s, err := st.openSession(sig, now) + s, err := st.openRecordingSession(sig, now) if err != nil { delete(st.sessions, sig.ID) continue } s.segmentIdx = segIdx s.audioSubs = oldSubs - s.overlapIQ = oldOverlap - s.deemphL = oldDeemphL - s.deemphR = oldDeemphR - s.stereoPhase = oldStereo + s.restoreDSPState(oldState) st.sessions[sig.ID] = s } } @@ -261,16 +351,65 @@ func (st *Streamer) processFeed(msg streamFeedMsg) { if seen[id] { continue } - if now.Sub(sess.lastFeed) > 3*time.Second { - closeSession(sess, &st.policy) + gracePeriod := 3 * time.Second + if sess.listenOnly { + gracePeriod = 5 * time.Second + } + if now.Sub(sess.lastFeed) > gracePeriod { + for _, sub := range sess.audioSubs { + close(sub.ch) + } + sess.audioSubs = nil + if !sess.listenOnly { + closeSession(sess, &st.policy) + } delete(st.sessions, id) } } } +func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool { + if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 { + return true + } + for _, pl := range st.pendingListens { + if math.Abs(sig.CenterHz-pl.freq) < 200000 { + return true + } + } + return false +} + +func (st *Streamer) attachPendingListeners(sess *streamSession) { + for subID, pl := range st.pendingListens { + if math.Abs(sess.centerHz-pl.freq) < 200000 { + sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch}) + delete(st.pendingListens, subID) + + // Send updated audio_info now that we know the real session params. + // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage. + infoJSON, _ := json.Marshal(AudioInfo{ + SampleRate: sess.sampleRate, + Channels: sess.channels, + Format: "s16le", + DemodName: sess.demodName, + }) + tagged := make([]byte, 1+len(infoJSON)) + tagged[0] = 0x00 // tag: audio_info + copy(tagged[1:], infoJSON) + select { + case pl.ch <- tagged: + default: + } + + log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)", + subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels) + } + } +} + // CloseAll finalises all sessions and stops the worker goroutine. func (st *Streamer) CloseAll() { - // Stop accepting new feeds and wait for worker to finish close(st.feedCh) <-st.done @@ -281,9 +420,15 @@ func (st *Streamer) CloseAll() { close(sub.ch) } sess.audioSubs = nil - closeSession(sess, &st.policy) + if !sess.listenOnly { + closeSession(sess, &st.policy) + } delete(st.sessions, id) } + for _, pl := range st.pendingListens { + close(pl.ch) + } + st.pendingListens = nil } // ActiveSessions returns the number of open streaming sessions. @@ -294,13 +439,21 @@ func (st *Streamer) ActiveSessions() int { } // SubscribeAudio registers a live-listen subscriber for a given frequency. -func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte) { +// +// LL-2: Returns AudioInfo with correct channels and sample rate. +// LL-3: Returns error only on hard failures (nil streamer etc). +// +// If a matching session exists, attaches immediately. Otherwise, the +// subscriber is held as "pending" and will be attached when a matching +// signal appears in the next DSP frame. +func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) { ch := make(chan []byte, 64) st.mu.Lock() defer st.mu.Unlock() st.nextSub++ subID := st.nextSub + // Try to find a matching session var bestSess *streamSession bestDist := math.MaxFloat64 for _, sess := range st.sessions { @@ -310,19 +463,48 @@ func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64 bestSess = sess } } + if bestSess != nil && bestDist < 200000 { bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch}) - } else { - log.Printf("STREAM: audio subscriber %d has no matching session (freq=%.1fMHz)", subID, freq/1e6) - close(ch) + info := AudioInfo{ + SampleRate: bestSess.sampleRate, + Channels: bestSess.channels, + Format: "s16le", + DemodName: bestSess.demodName, + } + log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)", + subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName) + return subID, ch, info, nil + } + + // No matching session yet — add as pending listener + st.pendingListens[subID] = &pendingListen{ + freq: freq, + bw: bw, + mode: mode, + ch: ch, } - return subID, ch + info := AudioInfo{ + SampleRate: streamAudioRate, + Channels: 1, + Format: "s16le", + DemodName: "NFM", + } + log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6) + return subID, ch, info, nil } // UnsubscribeAudio removes a live-listen subscriber. func (st *Streamer) UnsubscribeAudio(subID int64) { st.mu.Lock() defer st.mu.Unlock() + + if pl, ok := st.pendingListens[subID]; ok { + close(pl.ch) + delete(st.pendingListens, subID) + return + } + for _, sess := range st.sessions { for i, sub := range sess.audioSubs { if sub.id == subID { @@ -338,19 +520,9 @@ func (st *Streamer) UnsubscribeAudio(subID int64) { // Session: stateful extraction + demod // --------------------------------------------------------------------------- -// processSnippet takes a pre-extracted IQ snippet (from GPU or CPU -// extractSignalIQBatch) and demodulates it with persistent state. -// -// The overlap-save operates on the EXTRACTED snippet level: we prepend -// the tail of the previous snippet so that: -// - FM discriminator has iq[i-1] for the first sample -// - The ~50-sample transient from FreqShift phase reset and FIR startup -// falls into the overlap region and gets trimmed from the output -// -// Stateful components (across frames): -// - overlapIQ: tail of previous extracted snippet -// - stereoPhase: 38kHz oscillator for L-R decode -// - deemphL/R: de-emphasis IIR accumulators +// processSnippet takes a pre-extracted IQ snippet and demodulates it with +// persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz +// output with zero transient artifacts. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) { if len(snippet) == 0 || snipRate <= 0 { return nil, 0 @@ -361,7 +533,7 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] demodName := sess.demodName if isWFMStereo { - demodName = "WFM" // mono FM demod, then stateful stereo post-process + demodName = "WFM" } d := demod.Get(demodName) if d == nil { @@ -371,18 +543,16 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] return nil, 0 } - // --- Minimal overlap: prepend last sample from previous snippet --- - // The FM discriminator computes atan2(iq[i] * conj(iq[i-1])), so the - // first output sample needs iq[-1] from the previous frame. - // FIR halo is already handled by extractForStreaming's IQ-level overlap, - // so we only need 1 sample here. + // --- FM discriminator overlap: prepend 1 sample from previous frame --- + // The FM discriminator needs iq[i-1] to compute the first output. + // All FIR filtering is now stateful, so no additional overlap is needed. var fullSnip []complex64 trimSamples := 0 - if len(sess.overlapIQ) > 0 { - fullSnip = make([]complex64, len(sess.overlapIQ)+len(snippet)) - copy(fullSnip, sess.overlapIQ) - copy(fullSnip[len(sess.overlapIQ):], snippet) - trimSamples = len(sess.overlapIQ) + if len(sess.overlapIQ) == 1 { + fullSnip = make([]complex64, 1+len(snippet)) + fullSnip[0] = sess.overlapIQ[0] + copy(fullSnip[1:], snippet) + trimSamples = 1 } else { fullSnip = snippet } @@ -392,7 +562,7 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] sess.overlapIQ = []complex64{snippet[len(snippet)-1]} } - // --- Decimate to demod-preferred rate with anti-alias --- + // --- Stateful anti-alias FIR + decimation to demod rate --- demodRate := d.OutputSampleRate() decim1 := int(math.Round(float64(snipRate) / float64(demodRate))) if decim1 < 1 { @@ -403,8 +573,17 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] var dec []complex64 if decim1 > 1 { cutoff := float64(actualDemodRate) / 2.0 * 0.8 - aaTaps := dsp.LowpassFIR(cutoff, snipRate, 101) - filtered := dsp.ApplyFIR(fullSnip, aaTaps) + + // Lazy-init or reinit stateful FIR if parameters changed + if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff { + taps := dsp.LowpassFIR(cutoff, snipRate, 101) + sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps) + sess.preDemodRate = snipRate + sess.preDemodCutoff = cutoff + sess.preDemodDecim = decim1 + } + + filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip))) dec = dsp.Decimate(filtered, decim1) } else { dec = fullSnip @@ -416,13 +595,15 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] return nil, 0 } - // --- Trim the overlap sample(s) from audio --- - audioTrim := trimSamples / decim1 - if decim1 <= 1 { - audioTrim = trimSamples - } - if audioTrim > 0 && audioTrim < len(audio) { - audio = audio[audioTrim:] + // --- Trim the 1-sample FM discriminator overlap --- + if trimSamples > 0 { + audioTrim := trimSamples / decim1 + if audioTrim < 1 { + audioTrim = 1 // at minimum trim 1 audio sample + } + if audioTrim > 0 && audioTrim < len(audio) { + audio = audio[audioTrim:] + } } // --- Stateful stereo decode --- @@ -432,53 +613,25 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] audio = sess.stereoDecodeStateful(audio, actualDemodRate) } - // --- Resample towards 48kHz --- - outputRate := actualDemodRate - if actualDemodRate > streamAudioRate { - decim2 := actualDemodRate / streamAudioRate - if decim2 < 1 { - decim2 = 1 - } - outputRate = actualDemodRate / decim2 - - aaTaps := dsp.LowpassFIR(float64(outputRate)/2.0*0.9, actualDemodRate, 63) - + // --- Polyphase resample to exact 48kHz --- + if actualDemodRate != streamAudioRate { if channels > 1 { - nFrames := len(audio) / channels - left := make([]float32, nFrames) - right := make([]float32, nFrames) - for i := 0; i < nFrames; i++ { - left[i] = audio[i*2] - if i*2+1 < len(audio) { - right[i] = audio[i*2+1] - } - } - left = dsp.ApplyFIRReal(left, aaTaps) - right = dsp.ApplyFIRReal(right, aaTaps) - outFrames := nFrames / decim2 - if outFrames < 1 { - return nil, 0 - } - resampled := make([]float32, outFrames*2) - for i := 0; i < outFrames; i++ { - resampled[i*2] = left[i*decim2] - resampled[i*2+1] = right[i*decim2] + if sess.stereoResampler == nil { + sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps) } - audio = resampled + audio = sess.stereoResampler.Process(audio) } else { - audio = dsp.ApplyFIRReal(audio, aaTaps) - resampled := make([]float32, 0, len(audio)/decim2+1) - for i := 0; i < len(audio); i += decim2 { - resampled = append(resampled, audio[i]) + if sess.monoResampler == nil { + sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps) } - audio = resampled + audio = sess.monoResampler.Process(audio) } } - // --- De-emphasis (50µs Europe) --- - if isWFM && outputRate > 0 { - const tau = 50e-6 - alpha := math.Exp(-1.0 / (float64(outputRate) * tau)) + // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) --- + if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 { + tau := sess.deemphasisUs * 1e-6 + alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau)) if channels > 1 { nFrames := len(audio) / channels yL, yR := sess.deemphL, sess.deemphR @@ -499,28 +652,44 @@ func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([] } } - return audio, outputRate + return audio, streamAudioRate } // stereoDecodeStateful: phase-continuous 38kHz oscillator for L-R extraction. +// AQ-4: Uses persistent FIR filter state across frames for click-free stereo. +// Reuses session scratch buffers to minimize allocations. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) []float32 { if len(mono) == 0 || sampleRate <= 0 { return nil } + n := len(mono) + + // Lazy-init stateful filters on first call + if sess.stereoLPF == nil { + lp := dsp.LowpassFIR(15000, sampleRate, 101) + sess.stereoLPF = dsp.NewStatefulFIRReal(lp) + sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101)) + sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101)) + sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp) + } - lp := dsp.LowpassFIR(15000, sampleRate, 101) - lpr := dsp.ApplyFIRReal(mono, lp) - - bpHi := dsp.LowpassFIR(53000, sampleRate, 101) - bpLo := dsp.LowpassFIR(23000, sampleRate, 101) - hi := dsp.ApplyFIRReal(mono, bpHi) - lo := dsp.ApplyFIRReal(mono, bpLo) - bpf := make([]float32, len(mono)) - for i := range mono { - bpf[i] = hi[i] - lo[i] + // Reuse scratch for intermediates: need 4*n float32 for bpf, lr, hi, lo + // plus 2*n for output. We'll use scratchAudio for bpf+lr (2*n) and + // allocate hi/lo from the stateful FIR ProcessInto. + scratch := sess.growAudio(n * 4) + bpf := scratch[:n] + lr := scratch[n : 2*n] + hiBuf := scratch[2*n : 3*n] + loBuf := scratch[3*n : 4*n] + + lpr := sess.stereoLPF.Process(mono) // allocates — but could use ProcessInto too + + sess.stereoBPHi.ProcessInto(mono, hiBuf) + sess.stereoBPLo.ProcessInto(mono, loBuf) + for i := 0; i < n; i++ { + bpf[i] = hiBuf[i] - loBuf[i] } - lr := make([]float32, len(mono)) phase := sess.stereoPhase inc := 2 * math.Pi * 38000 / float64(sampleRate) for i := range bpf { @@ -529,38 +698,84 @@ func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) } sess.stereoPhase = math.Mod(phase, 2*math.Pi) - lr = dsp.ApplyFIRReal(lr, lp) + lr = sess.stereoLRLPF.Process(lr) - out := make([]float32, len(lpr)*2) - for i := range lpr { + out := make([]float32, n*2) + for i := 0; i < n; i++ { out[i*2] = 0.5 * (lpr[i] + lr[i]) out[i*2+1] = 0.5 * (lpr[i] - lr[i]) } return out } +// dspStateSnapshot captures persistent DSP state for segment splits. +type dspStateSnapshot struct { + overlapIQ []complex64 + deemphL float64 + deemphR float64 + stereoPhase float64 + monoResampler *dsp.Resampler + stereoResampler *dsp.StereoResampler + stereoLPF *dsp.StatefulFIRReal + stereoBPHi *dsp.StatefulFIRReal + stereoBPLo *dsp.StatefulFIRReal + stereoLRLPF *dsp.StatefulFIRReal + stereoAALPF *dsp.StatefulFIRReal + preDemodFIR *dsp.StatefulFIRComplex + preDemodDecim int + preDemodRate int + preDemodCutoff float64 +} + +func (sess *streamSession) captureDSPState() dspStateSnapshot { + return dspStateSnapshot{ + overlapIQ: sess.overlapIQ, + deemphL: sess.deemphL, + deemphR: sess.deemphR, + stereoPhase: sess.stereoPhase, + monoResampler: sess.monoResampler, + stereoResampler: sess.stereoResampler, + stereoLPF: sess.stereoLPF, + stereoBPHi: sess.stereoBPHi, + stereoBPLo: sess.stereoBPLo, + stereoLRLPF: sess.stereoLRLPF, + stereoAALPF: sess.stereoAALPF, + preDemodFIR: sess.preDemodFIR, + preDemodDecim: sess.preDemodDecim, + preDemodRate: sess.preDemodRate, + preDemodCutoff: sess.preDemodCutoff, + } +} + +func (sess *streamSession) restoreDSPState(s dspStateSnapshot) { + sess.overlapIQ = s.overlapIQ + sess.deemphL = s.deemphL + sess.deemphR = s.deemphR + sess.stereoPhase = s.stereoPhase + sess.monoResampler = s.monoResampler + sess.stereoResampler = s.stereoResampler + sess.stereoLPF = s.stereoLPF + sess.stereoBPHi = s.stereoBPHi + sess.stereoBPLo = s.stereoBPLo + sess.stereoLRLPF = s.stereoLRLPF + sess.stereoAALPF = s.stereoAALPF + sess.preDemodFIR = s.preDemodFIR + sess.preDemodDecim = s.preDemodDecim + sess.preDemodRate = s.preDemodRate + sess.preDemodCutoff = s.preDemodCutoff +} + // --------------------------------------------------------------------------- // Session management helpers // --------------------------------------------------------------------------- -func (st *Streamer) openSession(sig *detector.Signal, now time.Time) (*streamSession, error) { +func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) { outputDir := st.policy.OutputDir if outputDir == "" { outputDir = "data/recordings" } - demodName := "NFM" - if sig.Class != nil { - if n := mapClassToDemod(sig.Class.ModType); n != "" { - demodName = n - } - } - channels := 1 - if demodName == "WFM_STEREO" { - channels = 2 - } else if d := demod.Get(demodName); d != nil { - channels = d.Channels() - } + demodName, channels := resolveDemod(sig) dirName := fmt.Sprintf("%s_%.0fHz_stream%d", now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID) @@ -580,28 +795,113 @@ func (st *Streamer) openSession(sig *detector.Signal, now time.Time) (*streamSes } sess := &streamSession{ - signalID: sig.ID, - centerHz: sig.CenterHz, - bwHz: sig.BWHz, - snrDb: sig.SNRDb, - peakDb: sig.PeakDb, - class: sig.Class, - startTime: now, - lastFeed: now, - dir: dir, - wavFile: f, - wavBuf: bufio.NewWriterSize(f, 64*1024), - sampleRate: streamAudioRate, - channels: channels, - demodName: demodName, - } - - log.Printf("STREAM: opened signal=%d %.1fMHz %s dir=%s", + signalID: sig.ID, + centerHz: sig.CenterHz, + bwHz: sig.BWHz, + snrDb: sig.SNRDb, + peakDb: sig.PeakDb, + class: sig.Class, + startTime: now, + lastFeed: now, + dir: dir, + wavFile: f, + wavBuf: bufio.NewWriterSize(f, 64*1024), + sampleRate: streamAudioRate, + channels: channels, + demodName: demodName, + deemphasisUs: st.policy.DeemphasisUs, + } + + log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s", sig.ID, sig.CenterHz/1e6, demodName, dirName) return sess, nil } +func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession { + demodName, channels := resolveDemod(sig) + + sess := &streamSession{ + signalID: sig.ID, + centerHz: sig.CenterHz, + bwHz: sig.BWHz, + snrDb: sig.SNRDb, + peakDb: sig.PeakDb, + class: sig.Class, + startTime: now, + lastFeed: now, + listenOnly: true, + sampleRate: streamAudioRate, + channels: channels, + demodName: demodName, + deemphasisUs: st.policy.DeemphasisUs, + } + + log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s", + sig.ID, sig.CenterHz/1e6, demodName) + return sess +} + +func resolveDemod(sig *detector.Signal) (string, int) { + demodName := "NFM" + if sig.Class != nil { + if n := mapClassToDemod(sig.Class.ModType); n != "" { + demodName = n + } + } + channels := 1 + if demodName == "WFM_STEREO" { + channels = 2 + } else if d := demod.Get(demodName); d != nil { + channels = d.Channels() + } + return demodName, channels +} + +// growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ. +func (sess *streamSession) growIQ(n int) []complex64 { + if cap(sess.scratchIQ) >= n { + return sess.scratchIQ[:n] + } + sess.scratchIQ = make([]complex64, n, n*5/4) + return sess.scratchIQ +} + +// growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio. +func (sess *streamSession) growAudio(n int) []float32 { + if cap(sess.scratchAudio) >= n { + return sess.scratchAudio[:n] + } + sess.scratchAudio = make([]float32, n, n*5/4) + return sess.scratchAudio +} + +// growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM. +func (sess *streamSession) growPCM(n int) []byte { + if cap(sess.scratchPCM) >= n { + return sess.scratchPCM[:n] + } + sess.scratchPCM = make([]byte, n, n*5/4) + return sess.scratchPCM +} + +func convertToListenOnly(sess *streamSession) { + if sess.wavBuf != nil { + _ = sess.wavBuf.Flush() + } + if sess.wavFile != nil { + fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels) + sess.wavFile.Close() + } + sess.wavFile = nil + sess.wavBuf = nil + sess.listenOnly = true + log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID) +} + func closeSession(sess *streamSession, policy *Policy) { + if sess.listenOnly { + return + } if sess.wavBuf != nil { _ = sess.wavBuf.Flush() } @@ -642,36 +942,18 @@ func closeSession(sess *streamSession, policy *Policy) { } } -func appendAudio(sess *streamSession, audio []float32) { - if sess.wavBuf == nil || len(audio) == 0 { - return - } - buf := make([]byte, len(audio)*2) - for i, s := range audio { - v := int16(clip(s * 32767)) - binary.LittleEndian.PutUint16(buf[i*2:], uint16(v)) - } - n, err := sess.wavBuf.Write(buf) - if err != nil { - log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err) - return - } - sess.wavSamples += int64(n / 2) -} - -func (st *Streamer) fanoutAudio(sess *streamSession, audio []float32) { +func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) { if len(sess.audioSubs) == 0 { return } - pcm := make([]byte, len(audio)*2) - for i, s := range audio { - v := int16(clip(s * 32767)) - binary.LittleEndian.PutUint16(pcm[i*2:], uint16(v)) - } + // Tag + copy for all subscribers: 0x01 prefix = PCM audio + tagged := make([]byte, 1+pcmLen) + tagged[0] = 0x01 + copy(tagged[1:], pcm[:pcmLen]) alive := sess.audioSubs[:0] for _, sub := range sess.audioSubs { select { - case sub.ch <- pcm: + case sub.ch <- tagged: default: } alive = append(alive, sub) @@ -694,6 +976,9 @@ func (st *Streamer) classAllowed(cls *classifier.Classification) bool { return false } +// ErrNoSession is returned when no matching signal session exists. +var ErrNoSession = errors.New("no active or pending session for this frequency") + // --------------------------------------------------------------------------- // WAV header helpers // --------------------------------------------------------------------------- diff --git a/sdr-visual-suite.rar b/sdr-visual-suite.rar index e5251ff..ea51e37 100644 Binary files a/sdr-visual-suite.rar and b/sdr-visual-suite.rar differ diff --git a/web/app.js b/web/app.js index 544d04c..b722952 100644 --- a/web/app.js +++ b/web/app.js @@ -117,7 +117,149 @@ const listenModeSelect = qs('listenMode'); let latest = null; let currentConfig = null; let liveAudio = null; +let liveListenWS = null; // WebSocket-based live listen let stats = { buffer_samples: 0, dropped: 0, resets: 0, last_sample_ago_ms: -1 }; + +// --------------------------------------------------------------------------- +// LiveListenWS — WebSocket-based gapless audio streaming via /ws/audio +// --------------------------------------------------------------------------- +class LiveListenWS { + constructor(freq, bw, mode) { + this.freq = freq; + this.bw = bw; + this.mode = mode; + this.ws = null; + this.audioCtx = null; + this.sampleRate = 48000; + this.channels = 1; + this.playing = false; + this.queue = []; // buffered PCM chunks + this.nextTime = 0; // next scheduled playback time + this.started = false; + this._onStop = null; + } + + start() { + const proto = location.protocol === 'https:' ? 'wss:' : 'ws:'; + const url = `${proto}//${location.host}/ws/audio?freq=${this.freq}&bw=${this.bw}&mode=${this.mode || ''}`; + this.ws = new WebSocket(url); + this.ws.binaryType = 'arraybuffer'; + this.playing = true; + + this.ws.onmessage = (ev) => { + if (typeof ev.data === 'string') { + // audio_info JSON message (initial or updated when session attached) + try { + const info = JSON.parse(ev.data); + if (info.sample_rate || info.channels) { + const newRate = info.sample_rate || 48000; + const newCh = info.channels || 1; + // If channels or rate changed, reinit AudioContext + if (newRate !== this.sampleRate || newCh !== this.channels) { + this.sampleRate = newRate; + this.channels = newCh; + if (this.audioCtx) { + this.audioCtx.close().catch(() => {}); + this.audioCtx = null; + } + this.started = false; + this.nextTime = 0; + } + this._initAudio(); + } + } catch (e) { /* ignore */ } + return; + } + // Binary PCM data (s16le) + if (!this.audioCtx || !this.playing) return; + this._playChunk(ev.data); + }; + + this.ws.onclose = () => { + this.playing = false; + if (this._onStop) this._onStop(); + }; + this.ws.onerror = () => { + this.playing = false; + if (this._onStop) this._onStop(); + }; + + // If no audio_info arrives within 500ms, init with defaults + setTimeout(() => { + if (!this.audioCtx && this.playing) this._initAudio(); + }, 500); + } + + stop() { + this.playing = false; + if (this.ws) { + this.ws.close(); + this.ws = null; + } + if (this.audioCtx) { + this.audioCtx.close().catch(() => {}); + this.audioCtx = null; + } + this.queue = []; + this.nextTime = 0; + this.started = false; + } + + onStop(fn) { this._onStop = fn; } + + _initAudio() { + if (this.audioCtx) return; + this.audioCtx = new (window.AudioContext || window.webkitAudioContext)({ + sampleRate: this.sampleRate + }); + this.nextTime = 0; + this.started = false; + } + + _playChunk(buf) { + const ctx = this.audioCtx; + if (!ctx) return; + + const samples = new Int16Array(buf); + const nFrames = Math.floor(samples.length / this.channels); + if (nFrames === 0) return; + + const audioBuffer = ctx.createBuffer(this.channels, nFrames, this.sampleRate); + for (let ch = 0; ch < this.channels; ch++) { + const channelData = audioBuffer.getChannelData(ch); + for (let i = 0; i < nFrames; i++) { + channelData[i] = samples[i * this.channels + ch] / 32768; + } + } + + const source = ctx.createBufferSource(); + source.buffer = audioBuffer; + source.connect(ctx.destination); + + // Schedule gapless playback with drift correction. + // We target a small jitter buffer (~100ms ahead of real time). + // If nextTime falls behind currentTime, we resync with a small + // buffer to avoid audible gaps. + const now = ctx.currentTime; + const targetLatency = 0.1; // 100ms jitter buffer + + if (!this.started || this.nextTime < now) { + // First chunk or buffer underrun — resync + this.nextTime = now + targetLatency; + this.started = true; + } + + // If we've drifted too far ahead (>500ms of buffered audio), + // drop this chunk to reduce latency. This prevents the buffer + // from growing unbounded when the server sends faster than realtime. + if (this.nextTime > now + 0.5) { + return; // drop — too much buffered + } + + source.start(this.nextTime); + this.nextTime += audioBuffer.duration; + } +} let gpuInfo = { available: false, active: false, error: '' }; let zoom = 1; @@ -1331,12 +1473,46 @@ function tuneToFrequency(centerHz) { function connect() { clearTimeout(wsReconnectTimer); const proto = location.protocol === 'https:' ? 'wss' : 'ws'; - const ws = new WebSocket(`${proto}://${location.host}/ws`); + + // Remote optimization: detect non-localhost and opt into binary + decimation + const hn = location.hostname; + const isLocal = ['localhost', '127.0.0.1', '::1'].includes(hn) + || hn.startsWith('192.168.') + || hn.startsWith('10.') + || /^172\.(1[6-9]|2\d|3[01])\./.test(hn) + || hn.endsWith('.local') + || hn.endsWith('.lan'); + const params = new URLSearchParams(location.search); + const wantBinary = params.get('binary') === '1' || !isLocal; + const bins = parseInt(params.get('bins') || (isLocal ? '0' : '2048'), 10); + const fps = parseInt(params.get('fps') || (isLocal ? '0' : '10'), 10); + + let wsUrl = `${proto}://${location.host}/ws`; + if (wantBinary || bins > 0 || fps > 0) { + const qp = []; + if (wantBinary) qp.push('binary=1'); + if (bins > 0) qp.push(`bins=${bins}`); + if (fps > 0) qp.push(`fps=${fps}`); + wsUrl += '?' + qp.join('&'); + } + + const ws = new WebSocket(wsUrl); + ws.binaryType = 'arraybuffer'; setWsBadge('Connecting', 'neutral'); ws.onopen = () => setWsBadge('Live', 'ok'); ws.onmessage = (ev) => { - latest = JSON.parse(ev.data); + if (ev.data instanceof ArrayBuffer) { + try { + const decoded = decodeBinaryFrame(ev.data); + if (decoded) latest = decoded; + } catch (e) { + console.warn('binary frame decode error:', e); + return; + } + } else { + latest = JSON.parse(ev.data); + } markSpectrumDirty(); if (followLive) pan = 0; updateHeroMetrics(); @@ -1349,6 +1525,59 @@ function connect() { ws.onerror = () => ws.close(); } +// Decode binary spectrum frame v4 (hybrid: binary spectrum + JSON signals) +function decodeBinaryFrame(buf) { + const view = new DataView(buf); + if (buf.byteLength < 32) return null; + + // Header: 32 bytes + const magic0 = view.getUint8(0); + const magic1 = view.getUint8(1); + if (magic0 !== 0x53 || magic1 !== 0x50) return null; // not "SP" + + const version = view.getUint16(2, true); + const ts = Number(view.getBigInt64(4, true)); + const centerHz = view.getFloat64(12, true); + const binCount = view.getUint32(20, true); + const sampleRateHz = view.getUint32(24, true); + const jsonOffset = view.getUint32(28, true); + + if (buf.byteLength < 32 + binCount * 2) return null; + + // Spectrum: binCount × int16 at offset 32 + const spectrum = new Float64Array(binCount); + let off = 32; + for (let i = 0; i < binCount; i++) { + spectrum[i] = view.getInt16(off, true) / 100; + off += 2; + } + + // JSON signals + debug after the spectrum data + let signals = []; + let debug = null; + if (jsonOffset > 0 && jsonOffset < buf.byteLength) { + try { + const jsonBytes = new Uint8Array(buf, jsonOffset); + const jsonStr = new TextDecoder().decode(jsonBytes); + const parsed = JSON.parse(jsonStr); + signals = parsed.signals || []; + debug = parsed.debug || null; + } catch (e) { + // JSON parse failed — continue with empty signals + } + } + + return { + ts: ts, + center_hz: centerHz, + sample_rate: sampleRateHz, + fft_size: binCount, + spectrum_db: spectrum, + signals: signals, + debug: debug + }; +} + function renderLoop() { renderFrames += 1; const now = performance.now(); @@ -1447,7 +1676,7 @@ window.addEventListener('mousemove', (ev) => { hoveredSignal = hoverHit.signal; renderSignalPopover(hoverHit, hoverHit.signal); } else { - scheduleHideSignalPopover(); + hideSignalPopover(); } if (isDraggingSpectrum) { const dx = ev.clientX - dragStartX; @@ -1664,13 +1893,33 @@ if (liveListenEventBtn) { liveListenEventBtn.addEventListener('click', () => { const ev = eventsById.get(selectedEventId); if (!ev) return; + + // Toggle off if already listening + if (liveListenWS && liveListenWS.playing) { + liveListenWS.stop(); + liveListenWS = null; + liveListenEventBtn.textContent = 'Listen'; + liveListenEventBtn.classList.remove('active'); + if (liveListenBtn) { liveListenBtn.textContent = 'Live Listen'; liveListenBtn.classList.remove('active'); } + return; + } + const freq = ev.center_hz; const bw = ev.bandwidth_hz || 12000; const mode = (listenModeSelect?.value || ev.class?.mod_type || 'NFM'); - const sec = parseInt(listenSecondsInput?.value || '2', 10); - const url = `/api/demod?freq=${freq}&bw=${bw}&mode=${mode}&sec=${sec}`; - const audio = new Audio(url); - audio.play(); + + if (liveAudio) { liveAudio.pause(); liveAudio = null; } + + liveListenWS = new LiveListenWS(freq, bw, mode); + liveListenWS.onStop(() => { + liveListenEventBtn.textContent = 'Listen'; + liveListenEventBtn.classList.remove('active'); + if (liveListenBtn) { liveListenBtn.textContent = 'Live Listen'; liveListenBtn.classList.remove('active'); } + liveListenWS = null; + }); + liveListenWS.start(); + liveListenEventBtn.textContent = '■ Stop'; + liveListenEventBtn.classList.add('active'); }); } if (decodeEventBtn) { @@ -1729,6 +1978,15 @@ signalList.addEventListener('click', (ev) => { if (liveListenBtn) { liveListenBtn.addEventListener('click', async () => { + // Toggle: if already listening, stop + if (liveListenWS && liveListenWS.playing) { + liveListenWS.stop(); + liveListenWS = null; + liveListenBtn.textContent = 'Live Listen'; + liveListenBtn.classList.remove('active'); + return; + } + // Use selected signal if available, otherwise first in list let freq, bw, mode; if (window._selectedSignal) { @@ -1743,14 +2001,20 @@ if (liveListenBtn) { mode = first.dataset.class || ''; } if (!Number.isFinite(freq)) return; - mode = (listenModeSelect?.value === 'Auto') ? (mode || 'NFM') : listenModeSelect.value; - const sec = parseInt(listenSecondsInput?.value || '2', 10); - const url = `/api/demod?freq=${freq}&bw=${bw}&mode=${mode}&sec=${sec}`; - if (liveAudio) { - liveAudio.pause(); - } - liveAudio = new Audio(url); - liveAudio.play().catch(() => {}); + mode = (listenModeSelect?.value === 'Auto' || listenModeSelect?.value === '') ? (mode || 'NFM') : listenModeSelect.value; + + // Stop any old HTTP audio + if (liveAudio) { liveAudio.pause(); liveAudio = null; } + + liveListenWS = new LiveListenWS(freq, bw, mode); + liveListenWS.onStop(() => { + liveListenBtn.textContent = 'Live Listen'; + liveListenBtn.classList.remove('active'); + liveListenWS = null; + }); + liveListenWS.start(); + liveListenBtn.textContent = '■ Stop'; + liveListenBtn.classList.add('active'); }); } diff --git a/web/style.css b/web/style.css index 65f15f7..5b96eab 100644 --- a/web/style.css +++ b/web/style.css @@ -496,3 +496,15 @@ body.mode-lab .hero-metrics { grid-template-columns: repeat(3, minmax(0, 1fr)); input[type="number"]::-webkit-inner-spin-button, input[type="number"]::-webkit-outer-spin-button { opacity: 0.3; } input[type="number"]:hover::-webkit-inner-spin-button { opacity: 0.7; } + +/* Active live-listen button */ +.act-btn.active { + background: var(--accent); + color: var(--bg-0); + box-shadow: 0 0 12px rgba(0, 255, 200, 0.3); + animation: listen-pulse 1.5s ease-in-out infinite; +} +@keyframes listen-pulse { + 0%, 100% { box-shadow: 0 0 8px rgba(0, 255, 200, 0.2); } + 50% { box-shadow: 0 0 16px rgba(0, 255, 200, 0.5); } +}