diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index 450ed44..ecea970 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -49,6 +49,9 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * mu sync.Mutex } rdsMap := map[int64]*rdsState{} + // Streaming extraction state: per-signal phase + IQ overlap for FIR halo + streamPhaseState := map[int64]*streamExtractState{} + streamOverlap := &streamIQOverlap{} var gpuEngine *gpufft.Engine if useGPU && gpuState != nil { snap := gpuState.snapshot() @@ -206,6 +209,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * finished, signals := det.Process(now, spectrum, cfg.CenterHz) thresholds := det.LastThresholds() noiseFloor := det.LastNoiseFloor() + var displaySignals []detector.Signal if len(iq) > 0 { snips, snipRates := extractSignalIQBatch(extractMgr, iq, cfg.SampleRate, cfg.CenterHz, signals) for i := range signals { @@ -317,9 +321,39 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * } } } + + // GPU-extract signal snippets with phase-continuous FreqShift and + // IQ overlap for FIR halo. Heavy work on GPU, only demod runs async. + displaySignals = det.StableSignals() + if rec != nil && len(displaySignals) > 0 && len(allIQ) > 0 { + streamSnips, streamRates := extractForStreaming(extractMgr, allIQ, cfg.SampleRate, cfg.CenterHz, displaySignals, streamPhaseState, streamOverlap) + items := make([]recorder.StreamFeedItem, 0, len(displaySignals)) + for j, ds := range displaySignals { + if ds.ID == 0 || ds.Class == nil { + continue + } + if j >= len(streamSnips) || len(streamSnips[j]) == 0 { + continue + } + snipRate := cfg.SampleRate + if j < len(streamRates) && streamRates[j] > 0 { + snipRate = streamRates[j] + } + items = append(items, recorder.StreamFeedItem{ + Signal: ds, + Snippet: streamSnips[j], + SnipRate: snipRate, + }) + } + if len(items) > 0 { + rec.FeedSnippets(items) + } + } + } else { + // No IQ data this frame — still need displaySignals for broadcast + displaySignals = det.StableSignals() } - // Use smoothed active events for frontend display (stable markers) - displaySignals := det.StableSignals() + if sigSnap != nil { sigSnap.set(displaySignals) } diff --git a/cmd/sdrd/helpers.go b/cmd/sdrd/helpers.go index 2693272..24cad26 100644 --- a/cmd/sdrd/helpers.go +++ b/cmd/sdrd/helpers.go @@ -2,6 +2,7 @@ package main import ( "log" + "math" "sort" "strconv" "time" @@ -73,9 +74,22 @@ func (m *extractionManager) get(sampleCount int, sampleRate int) *gpudemod.Batch } m.mu.Lock() defer m.mu.Unlock() + if m.runner != nil && sampleCount > m.maxSamples { + m.runner.Close() + m.runner = nil + } if m.runner == nil { - if r, err := gpudemod.NewBatchRunner(sampleCount, sampleRate); err == nil { + // Allocate generously: enough for full allIQ (sampleRate/10 ≈ 100ms) + // so the runner never needs re-allocation when used for both + // classification (FFT-block ~65k) and streaming (allIQ ~273k+). + allocSize := sampleCount + generous := sampleRate/10 + 1024 // ~400k at 4MHz — covers any scenario + if generous > allocSize { + allocSize = generous + } + if r, err := gpudemod.NewBatchRunner(allocSize, sampleRate); err == nil { m.runner = r + m.maxSamples = allocSize } else { log.Printf("gpudemod: batch runner init failed: %v", err) } @@ -188,3 +202,171 @@ func parseSince(raw string) (time.Time, error) { } return time.Parse(time.RFC3339, raw) } + +// streamExtractState holds per-signal persistent state for phase-continuous +// GPU extraction. Stored in the DSP loop, keyed by signal ID. +type streamExtractState struct { + phase float64 // FreqShift phase accumulator +} + +// streamIQOverlap holds the tail of the previous allIQ for FIR halo prepend. +type streamIQOverlap struct { + tail []complex64 +} + +const streamOverlapLen = 512 // must be >= FIR tap count (101) with margin + +// extractForStreaming performs GPU-accelerated extraction with: +// - Per-signal phase-continuous FreqShift (via PhaseStart in ExtractJob) +// - IQ overlap prepended to allIQ so FIR kernel has real data in halo +// +// Returns extracted snippets with overlap trimmed, and updates phase state. +func extractForStreaming( + extractMgr *extractionManager, + allIQ []complex64, + sampleRate int, + centerHz float64, + signals []detector.Signal, + phaseState map[int64]*streamExtractState, + overlap *streamIQOverlap, +) ([][]complex64, []int) { + out := make([][]complex64, len(signals)) + rates := make([]int, len(signals)) + if len(allIQ) == 0 || sampleRate <= 0 || len(signals) == 0 { + return out, rates + } + + // Prepend overlap from previous frame so FIR kernel has real halo data + var gpuIQ []complex64 + overlapLen := len(overlap.tail) + if overlapLen > 0 { + gpuIQ = make([]complex64, overlapLen+len(allIQ)) + copy(gpuIQ, overlap.tail) + copy(gpuIQ[overlapLen:], allIQ) + } else { + gpuIQ = allIQ + overlapLen = 0 + } + + // Save tail for next frame + if len(allIQ) > streamOverlapLen { + overlap.tail = append(overlap.tail[:0], allIQ[len(allIQ)-streamOverlapLen:]...) + } else { + overlap.tail = append(overlap.tail[:0], allIQ...) + } + + decimTarget := 200000 + + // Build jobs with per-signal phase + jobs := make([]gpudemod.ExtractJob, len(signals)) + for i, sig := range signals { + bw := sig.BWHz + sigMHz := sig.CenterHz / 1e6 + isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || + (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO")) + if isWFM { + if bw < 150000 { + bw = 150000 + } + } else if bw < 20000 { + bw = 20000 + } + + ps := phaseState[sig.ID] + if ps == nil { + ps = &streamExtractState{} + phaseState[sig.ID] = ps + } + + // PhaseStart is where the NEW data begins. But gpuIQ has overlap + // prepended, so the GPU kernel starts processing at the overlap. + // We need to rewind the phase by overlapLen samples so that the + // overlap region gets the correct phase, and the new data region + // starts at ps.phase exactly. + phaseInc := -2.0 * math.Pi * (sig.CenterHz - centerHz) / float64(sampleRate) + gpuPhaseStart := ps.phase - phaseInc*float64(overlapLen) + + jobs[i] = gpudemod.ExtractJob{ + OffsetHz: sig.CenterHz - centerHz, + BW: bw, + OutRate: decimTarget, + PhaseStart: gpuPhaseStart, + } + } + + // Try GPU BatchRunner with phase + runner := extractMgr.get(len(gpuIQ), sampleRate) + if runner != nil { + results, err := runner.ShiftFilterDecimateBatchWithPhase(gpuIQ, jobs) + if err == nil && len(results) == len(signals) { + decim := sampleRate / decimTarget + if decim < 1 { + decim = 1 + } + trimSamples := overlapLen / decim + for i, res := range results { + // Update phase state — advance only by NEW data length, not overlap + phaseInc := -2.0 * math.Pi * jobs[i].OffsetHz / float64(sampleRate) + phaseState[signals[i].ID].phase += phaseInc * float64(len(allIQ)) + + // Trim overlap from output + iq := res.IQ + if trimSamples > 0 && trimSamples < len(iq) { + iq = iq[trimSamples:] + } + out[i] = iq + rates[i] = res.Rate + } + return out, rates + } else if err != nil { + log.Printf("gpudemod: stream batch extraction failed: %v", err) + } + } + + // CPU fallback (with phase tracking) + for i, sig := range signals { + offset := sig.CenterHz - centerHz + bw := jobs[i].BW + ps := phaseState[sig.ID] + + // Phase-continuous FreqShift — rewind by overlap so new data starts at ps.phase + shifted := make([]complex64, len(gpuIQ)) + inc := -2.0 * math.Pi * offset / float64(sampleRate) + phase := ps.phase - inc*float64(overlapLen) + for k, v := range gpuIQ { + phase += inc + re := math.Cos(phase) + im := math.Sin(phase) + shifted[k] = complex( + float32(float64(real(v))*re-float64(imag(v))*im), + float32(float64(real(v))*im+float64(imag(v))*re), + ) + } + // Advance phase by NEW data length only + ps.phase += inc * float64(len(allIQ)) + + cutoff := bw / 2 + if cutoff < 200 { + cutoff = 200 + } + if cutoff > float64(sampleRate)/2-1 { + cutoff = float64(sampleRate)/2 - 1 + } + taps := dsp.LowpassFIR(cutoff, sampleRate, 101) + filtered := dsp.ApplyFIR(shifted, taps) + decim := sampleRate / decimTarget + if decim < 1 { + decim = 1 + } + decimated := dsp.Decimate(filtered, decim) + rates[i] = sampleRate / decim + + // Trim overlap + trimSamples := overlapLen / decim + if trimSamples > 0 && trimSamples < len(decimated) { + decimated = decimated[trimSamples:] + } + out[i] = decimated + } + return out, rates +} diff --git a/cmd/sdrd/http_handlers.go b/cmd/sdrd/http_handlers.go index ad0499f..483b8cc 100644 --- a/cmd/sdrd/http_handlers.go +++ b/cmd/sdrd/http_handlers.go @@ -221,6 +221,11 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime } http.ServeFile(w, r, filepath.Join(base, "meta.json")) }) + mux.HandleFunc("/api/streams", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + n := recMgr.ActiveStreams() + _ = json.NewEncoder(w).Encode(map[string]any{"active_sessions": n}) + }) mux.HandleFunc("/api/demod", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) @@ -249,7 +254,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime func newHTTPServer(addr string, webRoot string, h *hub, cfgPath string, cfgManager *runtime.Manager, srcMgr *sourceManager, dspUpdates chan dspUpdate, gpuState *gpuStatus, recMgr *recorder.Manager, sigSnap *signalSnapshot, eventMu *sync.RWMutex) *http.Server { mux := http.NewServeMux() - registerWSHandlers(mux, h) + registerWSHandlers(mux, h, recMgr) registerAPIHandlers(mux, cfgPath, cfgManager, srcMgr, dspUpdates, gpuState, recMgr, sigSnap, eventMu) mux.Handle("/", http.FileServer(http.Dir(webRoot))) return &http.Server{Addr: addr, Handler: mux} diff --git a/cmd/sdrd/types.go b/cmd/sdrd/types.go index 3307fe2..7ef59dc 100644 --- a/cmd/sdrd/types.go +++ b/cmd/sdrd/types.go @@ -61,8 +61,9 @@ type sourceManager struct { } type extractionManager struct { - mu sync.Mutex - runner *gpudemod.BatchRunner + mu sync.Mutex + runner *gpudemod.BatchRunner + maxSamples int } type dspUpdate struct { diff --git a/cmd/sdrd/ws_handlers.go b/cmd/sdrd/ws_handlers.go index 52386a5..d5eb2da 100644 --- a/cmd/sdrd/ws_handlers.go +++ b/cmd/sdrd/ws_handlers.go @@ -1,14 +1,18 @@ package main import ( + "encoding/json" "log" "net/http" + "strconv" "time" "github.com/gorilla/websocket" + + "sdr-visual-suite/internal/recorder" ) -func registerWSHandlers(mux *http.ServeMux, h *hub) { +func registerWSHandlers(mux *http.ServeMux, h *hub, recMgr *recorder.Manager) { upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { origin := r.Header.Get("Origin") if origin == "" || origin == "null" { @@ -63,4 +67,97 @@ func registerWSHandlers(mux *http.ServeMux, h *hub) { } } }) + + // /ws/audio — WebSocket endpoint for continuous live-listen audio streaming. + // Client connects with query params: freq, bw, mode + // Server sends binary frames of PCM s16le audio at 48kHz. + mux.HandleFunc("/ws/audio", func(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + freq, _ := strconv.ParseFloat(q.Get("freq"), 64) + bw, _ := strconv.ParseFloat(q.Get("bw"), 64) + mode := q.Get("mode") + if freq <= 0 { + http.Error(w, "freq required", http.StatusBadRequest) + return + } + if bw <= 0 { + bw = 12000 + } + + streamer := recMgr.StreamerRef() + if streamer == nil { + http.Error(w, "streamer not available", http.StatusServiceUnavailable) + return + } + + subID, ch := streamer.SubscribeAudio(freq, bw, mode) + if ch == nil { + http.Error(w, "no active stream for this frequency", http.StatusNotFound) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + streamer.UnsubscribeAudio(subID) + log.Printf("ws/audio upgrade failed: %v", err) + return + } + defer func() { + streamer.UnsubscribeAudio(subID) + _ = conn.Close() + }() + + log.Printf("ws/audio: client connected freq=%.1fMHz mode=%s", freq/1e6, mode) + + // Send audio stream info as first text message + info := map[string]any{ + "type": "audio_info", + "sample_rate": 48000, + "channels": 1, + "format": "s16le", + "freq": freq, + "mode": mode, + } + if infoBytes, err := json.Marshal(info); err == nil { + _ = conn.WriteMessage(websocket.TextMessage, infoBytes) + } + + // Read goroutine (to detect disconnect) + done := make(chan struct{}) + go func() { + defer close(done) + for { + _, _, err := conn.ReadMessage() + if err != nil { + return + } + } + }() + + ping := time.NewTicker(30 * time.Second) + defer ping.Stop() + + for { + select { + case pcm, ok := <-ch: + if !ok { + log.Printf("ws/audio: stream ended freq=%.1fMHz", freq/1e6) + return + } + _ = conn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond)) + if err := conn.WriteMessage(websocket.BinaryMessage, pcm); err != nil { + log.Printf("ws/audio: write error: %v", err) + return + } + case <-ping.C: + _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + case <-done: + log.Printf("ws/audio: client disconnected freq=%.1fMHz", freq/1e6) + return + } + } + }) } diff --git a/internal/demod/gpudemod/batch.go b/internal/demod/gpudemod/batch.go index 1630fb3..6bbf9df 100644 --- a/internal/demod/gpudemod/batch.go +++ b/internal/demod/gpudemod/batch.go @@ -1,9 +1,20 @@ package gpudemod +import "math" + type ExtractJob struct { - OffsetHz float64 - BW float64 - OutRate int + OffsetHz float64 + BW float64 + OutRate int + PhaseStart float64 // FreqShift starting phase (0 for stateless, carry over for streaming) +} + +// ExtractResult holds the output of a batch extraction including the ending +// phase of the FreqShift oscillator for phase-continuous streaming. +type ExtractResult struct { + IQ []complex64 + Rate int + PhaseEnd float64 // FreqShift phase at end of this block — pass as PhaseStart next frame } func (e *Engine) ShiftFilterDecimateBatch(iq []complex64, jobs []ExtractJob) ([][]complex64, []int, error) { @@ -19,3 +30,22 @@ func (e *Engine) ShiftFilterDecimateBatch(iq []complex64, jobs []ExtractJob) ([] } return outs, rates, nil } + +// ShiftFilterDecimateBatchWithPhase is like ShiftFilterDecimateBatch but uses +// per-job PhaseStart and returns per-job PhaseEnd for phase-continuous streaming. +func (e *Engine) ShiftFilterDecimateBatchWithPhase(iq []complex64, jobs []ExtractJob) ([]ExtractResult, error) { + results := make([]ExtractResult, len(jobs)) + for i, job := range jobs { + out, rate, err := e.ShiftFilterDecimate(iq, job.OffsetHz, job.BW, job.OutRate) + if err != nil { + return nil, err + } + phaseInc := -2.0 * math.Pi * job.OffsetHz / float64(e.sampleRate) + results[i] = ExtractResult{ + IQ: out, + Rate: rate, + PhaseEnd: job.PhaseStart + phaseInc*float64(len(iq)), + } + } + return results, nil +} diff --git a/internal/demod/gpudemod/batch_runner.go b/internal/demod/gpudemod/batch_runner.go index 02b2aff..7441263 100644 --- a/internal/demod/gpudemod/batch_runner.go +++ b/internal/demod/gpudemod/batch_runner.go @@ -1,5 +1,7 @@ package gpudemod +import "math" + type batchSlot struct { job ExtractJob out []complex64 @@ -8,9 +10,10 @@ type batchSlot struct { } type BatchRunner struct { - eng *Engine - slots []batchSlot - slotBufs []slotBuffers + eng *Engine + slots []batchSlot + slotBufs []slotBuffers + slotBufSize int // number of IQ samples the slot buffers were allocated for } func NewBatchRunner(maxSamples int, sampleRate int) (*BatchRunner, error) { @@ -49,3 +52,34 @@ func (r *BatchRunner) ShiftFilterDecimateBatch(iq []complex64, jobs []ExtractJob r.prepare(jobs) return r.shiftFilterDecimateBatchImpl(iq) } + +// ShiftFilterDecimateBatchWithPhase uses per-job PhaseStart and returns +// per-job PhaseEnd for phase-continuous streaming. +func (r *BatchRunner) ShiftFilterDecimateBatchWithPhase(iq []complex64, jobs []ExtractJob) ([]ExtractResult, error) { + if r == nil || r.eng == nil { + return nil, ErrUnavailable + } + r.prepare(jobs) + outs, rates, err := r.shiftFilterDecimateBatchImpl(iq) + if err != nil { + return nil, err + } + results := make([]ExtractResult, len(jobs)) + for i, job := range jobs { + phaseInc := -2.0 * math.Pi * job.OffsetHz / float64(r.eng.sampleRate) + var iq_out []complex64 + var rate int + if i < len(outs) { + iq_out = outs[i] + } + if i < len(rates) { + rate = rates[i] + } + results[i] = ExtractResult{ + IQ: iq_out, + Rate: rate, + PhaseEnd: job.PhaseStart + phaseInc*float64(len(iq)), + } + } + return results, nil +} diff --git a/internal/demod/gpudemod/batch_runner_windows.go b/internal/demod/gpudemod/batch_runner_windows.go index a7ed004..a63db27 100644 --- a/internal/demod/gpudemod/batch_runner_windows.go +++ b/internal/demod/gpudemod/batch_runner_windows.go @@ -50,7 +50,9 @@ func (r *BatchRunner) freeSlotBuffers() { } func (r *BatchRunner) allocSlotBuffers(n int) error { - if len(r.slotBufs) == len(r.slots) && len(r.slotBufs) > 0 { + // Re-allocate if slot count changed OR if buffer size grew + needRealloc := len(r.slotBufs) != len(r.slots) || n > r.slotBufSize + if !needRealloc && len(r.slotBufs) > 0 { return nil } r.freeSlotBuffers() @@ -78,6 +80,7 @@ func (r *BatchRunner) allocSlotBuffers(n int) error { } r.slotBufs[i].stream = s } + r.slotBufSize = n return nil } @@ -166,7 +169,7 @@ func (r *BatchRunner) shiftFilterDecimateSlotParallel(iq []complex64, job Extrac return 0, 0, errors.New("not enough output samples after decimation") } phaseInc := -2.0 * math.Pi * job.OffsetHz / float64(e.sampleRate) - if bridgeLaunchFreqShiftStream(e.dIQIn, (*gpuFloat2)(buf.dShifted), n, phaseInc, e.phase, buf.stream) != 0 { + if bridgeLaunchFreqShiftStream(e.dIQIn, (*gpuFloat2)(buf.dShifted), n, phaseInc, job.PhaseStart, buf.stream) != 0 { return 0, 0, errors.New("gpu freq shift failed") } if bridgeLaunchFIRv2Stream((*gpuFloat2)(buf.dShifted), (*gpuFloat2)(buf.dFiltered), (*C.float)(buf.dTaps), n, len(taps), buf.stream) != 0 { diff --git a/internal/recorder/demod.go b/internal/recorder/demod.go index f354a6a..a446af5 100644 --- a/internal/recorder/demod.go +++ b/internal/recorder/demod.go @@ -87,6 +87,8 @@ func mapClassToDemod(c classifier.SignalClass) string { return "NFM" case classifier.ClassWFM: return "WFM" + case classifier.ClassWFMStereo: + return "WFM_STEREO" case classifier.ClassSSBUSB: return "USB" case classifier.ClassSSBLSB: diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go index 4b4ea5d..00c119e 100644 --- a/internal/recorder/recorder.go +++ b/internal/recorder/recorder.go @@ -3,6 +3,7 @@ package recorder import ( "errors" "fmt" + "log" "os" "path/filepath" "strings" @@ -42,6 +43,11 @@ type Manager struct { closed bool closeOnce sync.Once workerWG sync.WaitGroup + + // Streaming recorder + streamer *Streamer + streamedIDs map[int64]bool // signal IDs that were streamed (skip retroactive recording) + streamedMu sync.Mutex } func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeCommands map[string]string) *Manager { @@ -51,7 +57,17 @@ func New(sampleRate int, blockSize int, policy Policy, centerHz float64, decodeC if policy.RingSeconds <= 0 { policy.RingSeconds = 8 } - m := &Manager{policy: policy, ring: NewRing(sampleRate, blockSize, policy.RingSeconds), sampleRate: sampleRate, blockSize: blockSize, centerHz: centerHz, decodeCommands: decodeCommands, queue: make(chan detector.Event, 64)} + m := &Manager{ + policy: policy, + ring: NewRing(sampleRate, blockSize, policy.RingSeconds), + sampleRate: sampleRate, + blockSize: blockSize, + centerHz: centerHz, + decodeCommands: decodeCommands, + queue: make(chan detector.Event, 64), + streamer: newStreamer(policy, centerHz), + streamedIDs: make(map[int64]bool), + } m.initGPUDemod(sampleRate, blockSize) m.workerWG.Add(1) go m.worker() @@ -78,6 +94,9 @@ func (m *Manager) Update(sampleRate int, blockSize int, policy Policy, centerHz } else if m.ring == nil { m.ring = NewRing(sampleRate, blockSize, policy.RingSeconds) } + if m.streamer != nil { + m.streamer.updatePolicy(policy, centerHz) + } } func (m *Manager) Ingest(t0 time.Time, samples []complex64) { @@ -152,6 +171,11 @@ func (m *Manager) Close() { return } m.closeOnce.Do(func() { + // Close all active streaming sessions first + if m.streamer != nil { + m.streamer.CloseAll() + } + m.mu.Lock() m.closed = true if m.queue != nil { @@ -168,6 +192,16 @@ func (m *Manager) Close() { } func (m *Manager) recordEvent(ev detector.Event) error { + // Skip events that were already recorded via streaming + m.streamedMu.Lock() + wasStreamed := m.streamedIDs[ev.ID] + delete(m.streamedIDs, ev.ID) // clean up — event is finished + m.streamedMu.Unlock() + if wasStreamed { + log.Printf("STREAM: skipping retroactive recording for signal %d (already streamed)", ev.ID) + return nil + } + m.mu.RLock() policy := m.policy ring := m.ring @@ -266,3 +300,61 @@ func (m *Manager) SliceRecent(seconds float64) ([]complex64, int, float64) { iq := ring.Slice(start, end) return iq, sr, center } + +// FeedSnippets is called once per DSP frame with pre-extracted IQ snippets +// (GPU-accelerated FreqShift+FIR+Decimate). The Streamer handles demod with +// persistent state (overlap-save, stereo decode, de-emphasis) asynchronously. +func (m *Manager) FeedSnippets(items []StreamFeedItem) { + if m == nil || m.streamer == nil || len(items) == 0 { + return + } + m.mu.RLock() + closed := m.closed + m.mu.RUnlock() + if closed { + return + } + + // Mark all signal IDs so recordEvent skips them + m.streamedMu.Lock() + for _, item := range items { + if item.Signal.ID != 0 { + m.streamedIDs[item.Signal.ID] = true + } + } + m.streamedMu.Unlock() + + // Convert to internal type + internal := make([]streamFeedItem, len(items)) + for i, item := range items { + internal[i] = streamFeedItem{ + signal: item.Signal, + snippet: item.Snippet, + snipRate: item.SnipRate, + } + } + m.streamer.FeedSnippets(internal) +} + +// StreamFeedItem is the public type for passing extracted snippets from DSP loop. +type StreamFeedItem struct { + Signal detector.Signal + Snippet []complex64 + SnipRate int +} + +// Streamer returns the underlying Streamer for live-listen subscriptions. +func (m *Manager) StreamerRef() *Streamer { + if m == nil { + return nil + } + return m.streamer +} + +// ActiveStreams returns info about currently active streaming sessions. +func (m *Manager) ActiveStreams() int { + if m == nil || m.streamer == nil { + return 0 + } + return m.streamer.ActiveSessions() +} diff --git a/internal/recorder/streamer.go b/internal/recorder/streamer.go new file mode 100644 index 0000000..9bbaa8f --- /dev/null +++ b/internal/recorder/streamer.go @@ -0,0 +1,750 @@ +package recorder + +import ( + "bufio" + "encoding/binary" + "encoding/json" + "fmt" + "log" + "math" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "sdr-visual-suite/internal/classifier" + "sdr-visual-suite/internal/demod" + "sdr-visual-suite/internal/detector" + "sdr-visual-suite/internal/dsp" +) + +// --------------------------------------------------------------------------- +// streamSession — one open recording for one signal +// --------------------------------------------------------------------------- + +type streamSession struct { + signalID int64 + centerHz float64 + bwHz float64 + snrDb float64 + peakDb float64 + class *classifier.Classification + startTime time.Time + lastFeed time.Time + + dir string + wavFile *os.File + wavBuf *bufio.Writer + wavSamples int64 + sampleRate int // actual output audio sample rate + channels int + demodName string + segmentIdx int + + // --- Persistent DSP state for click-free streaming --- + + // Overlap-save: tail of previous extracted IQ snippet. + // Prepended to the next snippet so FIR filters and FM discriminator + // have history — eliminates transient clicks at frame boundaries. + overlapIQ []complex64 + + // De-emphasis IIR state (persists across frames) + deemphL float64 + deemphR float64 + + // Stereo decode: phase-continuous 38kHz oscillator + stereoPhase float64 + + // live-listen subscribers + audioSubs []audioSub +} + +type audioSub struct { + id int64 + ch chan []byte +} + +const ( + streamAudioRate = 48000 +) + +// --------------------------------------------------------------------------- +// Streamer — manages all active streaming sessions +// --------------------------------------------------------------------------- + +type streamFeedItem struct { + signal detector.Signal + snippet []complex64 + snipRate int +} + +type streamFeedMsg struct { + items []streamFeedItem +} + +type Streamer struct { + mu sync.Mutex + sessions map[int64]*streamSession + policy Policy + centerHz float64 + nextSub int64 + feedCh chan streamFeedMsg + done chan struct{} +} + +func newStreamer(policy Policy, centerHz float64) *Streamer { + st := &Streamer{ + sessions: make(map[int64]*streamSession), + policy: policy, + centerHz: centerHz, + feedCh: make(chan streamFeedMsg, 2), + done: make(chan struct{}), + } + go st.worker() + return st +} + +func (st *Streamer) worker() { + for msg := range st.feedCh { + st.processFeed(msg) + } + close(st.done) +} + +func (st *Streamer) updatePolicy(policy Policy, centerHz float64) { + st.mu.Lock() + defer st.mu.Unlock() + wasEnabled := st.policy.Enabled + st.policy = policy + st.centerHz = centerHz + + // If recording was just disabled, close all active sessions + // so WAV headers get fixed and meta.json gets written. + if wasEnabled && !policy.Enabled { + for id, sess := range st.sessions { + for _, sub := range sess.audioSubs { + close(sub.ch) + } + sess.audioSubs = nil + closeSession(sess, &st.policy) + delete(st.sessions, id) + } + log.Printf("STREAM: recording disabled — closed %d sessions", len(st.sessions)) + } +} + +// FeedSnippets is called from the DSP loop with pre-extracted IQ snippets +// (GPU-accelerated FreqShift+FIR+Decimate already done). It copies the snippets +// and enqueues them for async demod in the worker goroutine. +func (st *Streamer) FeedSnippets(items []streamFeedItem) { + st.mu.Lock() + enabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ) + st.mu.Unlock() + if !enabled || len(items) == 0 { + return + } + + // Copy snippets (GPU buffers may be reused) + copied := make([]streamFeedItem, len(items)) + for i, item := range items { + snipCopy := make([]complex64, len(item.snippet)) + copy(snipCopy, item.snippet) + copied[i] = streamFeedItem{ + signal: item.signal, + snippet: snipCopy, + snipRate: item.snipRate, + } + } + + select { + case st.feedCh <- streamFeedMsg{items: copied}: + default: + // Worker busy — drop frame rather than blocking DSP loop + } +} + +// processFeed runs in the worker goroutine. Receives pre-extracted snippets +// and does the lightweight demod + stereo + de-emphasis with persistent state. +func (st *Streamer) processFeed(msg streamFeedMsg) { + st.mu.Lock() + defer st.mu.Unlock() + + if !st.policy.Enabled || (!st.policy.RecordAudio && !st.policy.RecordIQ) { + return + } + + now := time.Now() + seen := make(map[int64]bool, len(msg.items)) + + for i := range msg.items { + item := &msg.items[i] + sig := &item.signal + seen[sig.ID] = true + + if sig.ID == 0 || sig.Class == nil { + continue + } + if sig.SNRDb < st.policy.MinSNRDb { + continue + } + if !st.classAllowed(sig.Class) { + continue + } + if len(item.snippet) == 0 || item.snipRate <= 0 { + continue + } + + sess, exists := st.sessions[sig.ID] + if !exists { + s, err := st.openSession(sig, now) + if err != nil { + log.Printf("STREAM: open failed signal=%d %.1fMHz: %v", + sig.ID, sig.CenterHz/1e6, err) + continue + } + st.sessions[sig.ID] = s + sess = s + } + + // Update metadata + sess.lastFeed = now + sess.centerHz = sig.CenterHz + sess.bwHz = sig.BWHz + if sig.SNRDb > sess.snrDb { + sess.snrDb = sig.SNRDb + } + if sig.PeakDb > sess.peakDb { + sess.peakDb = sig.PeakDb + } + if sig.Class != nil { + sess.class = sig.Class + } + + // Demod with persistent state (overlap-save, stereo, de-emphasis) + audio, audioRate := sess.processSnippet(item.snippet, item.snipRate) + if len(audio) > 0 { + if sess.wavSamples == 0 && audioRate > 0 { + sess.sampleRate = audioRate + } + appendAudio(sess, audio) + st.fanoutAudio(sess, audio) + } + + // Segment split + if st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration { + segIdx := sess.segmentIdx + 1 + oldSubs := sess.audioSubs + oldOverlap := sess.overlapIQ + oldDeemphL := sess.deemphL + oldDeemphR := sess.deemphR + oldStereo := sess.stereoPhase + sess.audioSubs = nil + closeSession(sess, &st.policy) + s, err := st.openSession(sig, now) + if err != nil { + delete(st.sessions, sig.ID) + continue + } + s.segmentIdx = segIdx + s.audioSubs = oldSubs + s.overlapIQ = oldOverlap + s.deemphL = oldDeemphL + s.deemphR = oldDeemphR + s.stereoPhase = oldStereo + st.sessions[sig.ID] = s + } + } + + // Close sessions for disappeared signals (with grace period) + for id, sess := range st.sessions { + if seen[id] { + continue + } + if now.Sub(sess.lastFeed) > 3*time.Second { + closeSession(sess, &st.policy) + delete(st.sessions, id) + } + } +} + +// CloseAll finalises all sessions and stops the worker goroutine. +func (st *Streamer) CloseAll() { + // Stop accepting new feeds and wait for worker to finish + close(st.feedCh) + <-st.done + + st.mu.Lock() + defer st.mu.Unlock() + for id, sess := range st.sessions { + for _, sub := range sess.audioSubs { + close(sub.ch) + } + sess.audioSubs = nil + closeSession(sess, &st.policy) + delete(st.sessions, id) + } +} + +// ActiveSessions returns the number of open streaming sessions. +func (st *Streamer) ActiveSessions() int { + st.mu.Lock() + defer st.mu.Unlock() + return len(st.sessions) +} + +// SubscribeAudio registers a live-listen subscriber for a given frequency. +func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte) { + ch := make(chan []byte, 64) + st.mu.Lock() + defer st.mu.Unlock() + st.nextSub++ + subID := st.nextSub + + var bestSess *streamSession + bestDist := math.MaxFloat64 + for _, sess := range st.sessions { + d := math.Abs(sess.centerHz - freq) + if d < bestDist { + bestDist = d + bestSess = sess + } + } + if bestSess != nil && bestDist < 200000 { + bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch}) + } else { + log.Printf("STREAM: audio subscriber %d has no matching session (freq=%.1fMHz)", subID, freq/1e6) + close(ch) + } + return subID, ch +} + +// UnsubscribeAudio removes a live-listen subscriber. +func (st *Streamer) UnsubscribeAudio(subID int64) { + st.mu.Lock() + defer st.mu.Unlock() + for _, sess := range st.sessions { + for i, sub := range sess.audioSubs { + if sub.id == subID { + close(sub.ch) + sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...) + return + } + } + } +} + +// --------------------------------------------------------------------------- +// Session: stateful extraction + demod +// --------------------------------------------------------------------------- + +// processSnippet takes a pre-extracted IQ snippet (from GPU or CPU +// extractSignalIQBatch) and demodulates it with persistent state. +// +// The overlap-save operates on the EXTRACTED snippet level: we prepend +// the tail of the previous snippet so that: +// - FM discriminator has iq[i-1] for the first sample +// - The ~50-sample transient from FreqShift phase reset and FIR startup +// falls into the overlap region and gets trimmed from the output +// +// Stateful components (across frames): +// - overlapIQ: tail of previous extracted snippet +// - stereoPhase: 38kHz oscillator for L-R decode +// - deemphL/R: de-emphasis IIR accumulators +func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) { + if len(snippet) == 0 || snipRate <= 0 { + return nil, 0 + } + + isWFMStereo := sess.demodName == "WFM_STEREO" + isWFM := sess.demodName == "WFM" || isWFMStereo + + demodName := sess.demodName + if isWFMStereo { + demodName = "WFM" // mono FM demod, then stateful stereo post-process + } + d := demod.Get(demodName) + if d == nil { + d = demod.Get("NFM") + } + if d == nil { + return nil, 0 + } + + // --- Minimal overlap: prepend last sample from previous snippet --- + // The FM discriminator computes atan2(iq[i] * conj(iq[i-1])), so the + // first output sample needs iq[-1] from the previous frame. + // FIR halo is already handled by extractForStreaming's IQ-level overlap, + // so we only need 1 sample here. + var fullSnip []complex64 + trimSamples := 0 + if len(sess.overlapIQ) > 0 { + fullSnip = make([]complex64, len(sess.overlapIQ)+len(snippet)) + copy(fullSnip, sess.overlapIQ) + copy(fullSnip[len(sess.overlapIQ):], snippet) + trimSamples = len(sess.overlapIQ) + } else { + fullSnip = snippet + } + + // Save last sample for next frame's FM discriminator + if len(snippet) > 0 { + sess.overlapIQ = []complex64{snippet[len(snippet)-1]} + } + + // --- Decimate to demod-preferred rate with anti-alias --- + demodRate := d.OutputSampleRate() + decim1 := int(math.Round(float64(snipRate) / float64(demodRate))) + if decim1 < 1 { + decim1 = 1 + } + actualDemodRate := snipRate / decim1 + + var dec []complex64 + if decim1 > 1 { + cutoff := float64(actualDemodRate) / 2.0 * 0.8 + aaTaps := dsp.LowpassFIR(cutoff, snipRate, 101) + filtered := dsp.ApplyFIR(fullSnip, aaTaps) + dec = dsp.Decimate(filtered, decim1) + } else { + dec = fullSnip + } + + // --- FM Demod --- + audio := d.Demod(dec, actualDemodRate) + if len(audio) == 0 { + return nil, 0 + } + + // --- Trim the overlap sample(s) from audio --- + audioTrim := trimSamples / decim1 + if decim1 <= 1 { + audioTrim = trimSamples + } + if audioTrim > 0 && audioTrim < len(audio) { + audio = audio[audioTrim:] + } + + // --- Stateful stereo decode --- + channels := 1 + if isWFMStereo { + channels = 2 + audio = sess.stereoDecodeStateful(audio, actualDemodRate) + } + + // --- Resample towards 48kHz --- + outputRate := actualDemodRate + if actualDemodRate > streamAudioRate { + decim2 := actualDemodRate / streamAudioRate + if decim2 < 1 { + decim2 = 1 + } + outputRate = actualDemodRate / decim2 + + aaTaps := dsp.LowpassFIR(float64(outputRate)/2.0*0.9, actualDemodRate, 63) + + if channels > 1 { + nFrames := len(audio) / channels + left := make([]float32, nFrames) + right := make([]float32, nFrames) + for i := 0; i < nFrames; i++ { + left[i] = audio[i*2] + if i*2+1 < len(audio) { + right[i] = audio[i*2+1] + } + } + left = dsp.ApplyFIRReal(left, aaTaps) + right = dsp.ApplyFIRReal(right, aaTaps) + outFrames := nFrames / decim2 + if outFrames < 1 { + return nil, 0 + } + resampled := make([]float32, outFrames*2) + for i := 0; i < outFrames; i++ { + resampled[i*2] = left[i*decim2] + resampled[i*2+1] = right[i*decim2] + } + audio = resampled + } else { + audio = dsp.ApplyFIRReal(audio, aaTaps) + resampled := make([]float32, 0, len(audio)/decim2+1) + for i := 0; i < len(audio); i += decim2 { + resampled = append(resampled, audio[i]) + } + audio = resampled + } + } + + // --- De-emphasis (50µs Europe) --- + if isWFM && outputRate > 0 { + const tau = 50e-6 + alpha := math.Exp(-1.0 / (float64(outputRate) * tau)) + if channels > 1 { + nFrames := len(audio) / channels + yL, yR := sess.deemphL, sess.deemphR + for i := 0; i < nFrames; i++ { + yL = alpha*yL + (1-alpha)*float64(audio[i*2]) + audio[i*2] = float32(yL) + yR = alpha*yR + (1-alpha)*float64(audio[i*2+1]) + audio[i*2+1] = float32(yR) + } + sess.deemphL, sess.deemphR = yL, yR + } else { + y := sess.deemphL + for i := range audio { + y = alpha*y + (1-alpha)*float64(audio[i]) + audio[i] = float32(y) + } + sess.deemphL = y + } + } + + return audio, outputRate +} + +// stereoDecodeStateful: phase-continuous 38kHz oscillator for L-R extraction. +func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) []float32 { + if len(mono) == 0 || sampleRate <= 0 { + return nil + } + + lp := dsp.LowpassFIR(15000, sampleRate, 101) + lpr := dsp.ApplyFIRReal(mono, lp) + + bpHi := dsp.LowpassFIR(53000, sampleRate, 101) + bpLo := dsp.LowpassFIR(23000, sampleRate, 101) + hi := dsp.ApplyFIRReal(mono, bpHi) + lo := dsp.ApplyFIRReal(mono, bpLo) + bpf := make([]float32, len(mono)) + for i := range mono { + bpf[i] = hi[i] - lo[i] + } + + lr := make([]float32, len(mono)) + phase := sess.stereoPhase + inc := 2 * math.Pi * 38000 / float64(sampleRate) + for i := range bpf { + phase += inc + lr[i] = bpf[i] * float32(2*math.Cos(phase)) + } + sess.stereoPhase = math.Mod(phase, 2*math.Pi) + + lr = dsp.ApplyFIRReal(lr, lp) + + out := make([]float32, len(lpr)*2) + for i := range lpr { + out[i*2] = 0.5 * (lpr[i] + lr[i]) + out[i*2+1] = 0.5 * (lpr[i] - lr[i]) + } + return out +} + +// --------------------------------------------------------------------------- +// Session management helpers +// --------------------------------------------------------------------------- + +func (st *Streamer) openSession(sig *detector.Signal, now time.Time) (*streamSession, error) { + outputDir := st.policy.OutputDir + if outputDir == "" { + outputDir = "data/recordings" + } + + demodName := "NFM" + if sig.Class != nil { + if n := mapClassToDemod(sig.Class.ModType); n != "" { + demodName = n + } + } + channels := 1 + if demodName == "WFM_STEREO" { + channels = 2 + } else if d := demod.Get(demodName); d != nil { + channels = d.Channels() + } + + dirName := fmt.Sprintf("%s_%.0fHz_stream%d", + now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID) + dir := filepath.Join(outputDir, dirName) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, err + } + + wavPath := filepath.Join(dir, "audio.wav") + f, err := os.Create(wavPath) + if err != nil { + return nil, err + } + if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil { + f.Close() + return nil, err + } + + sess := &streamSession{ + signalID: sig.ID, + centerHz: sig.CenterHz, + bwHz: sig.BWHz, + snrDb: sig.SNRDb, + peakDb: sig.PeakDb, + class: sig.Class, + startTime: now, + lastFeed: now, + dir: dir, + wavFile: f, + wavBuf: bufio.NewWriterSize(f, 64*1024), + sampleRate: streamAudioRate, + channels: channels, + demodName: demodName, + } + + log.Printf("STREAM: opened signal=%d %.1fMHz %s dir=%s", + sig.ID, sig.CenterHz/1e6, demodName, dirName) + return sess, nil +} + +func closeSession(sess *streamSession, policy *Policy) { + if sess.wavBuf != nil { + _ = sess.wavBuf.Flush() + } + if sess.wavFile != nil { + fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels) + sess.wavFile.Close() + sess.wavFile = nil + sess.wavBuf = nil + } + + dur := sess.lastFeed.Sub(sess.startTime) + files := map[string]any{ + "audio": "audio.wav", + "audio_sample_rate": sess.sampleRate, + "audio_channels": sess.channels, + "audio_demod": sess.demodName, + "recording_mode": "streaming", + } + meta := Meta{ + EventID: sess.signalID, + Start: sess.startTime, + End: sess.lastFeed, + CenterHz: sess.centerHz, + BandwidthHz: sess.bwHz, + SampleRate: sess.sampleRate, + SNRDb: sess.snrDb, + PeakDb: sess.peakDb, + Class: sess.class, + DurationMs: dur.Milliseconds(), + Files: files, + } + b, err := json.MarshalIndent(meta, "", " ") + if err == nil { + _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644) + } + if policy != nil { + enforceQuota(policy.OutputDir, policy.MaxDiskMB) + } +} + +func appendAudio(sess *streamSession, audio []float32) { + if sess.wavBuf == nil || len(audio) == 0 { + return + } + buf := make([]byte, len(audio)*2) + for i, s := range audio { + v := int16(clip(s * 32767)) + binary.LittleEndian.PutUint16(buf[i*2:], uint16(v)) + } + n, err := sess.wavBuf.Write(buf) + if err != nil { + log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err) + return + } + sess.wavSamples += int64(n / 2) +} + +func (st *Streamer) fanoutAudio(sess *streamSession, audio []float32) { + if len(sess.audioSubs) == 0 { + return + } + pcm := make([]byte, len(audio)*2) + for i, s := range audio { + v := int16(clip(s * 32767)) + binary.LittleEndian.PutUint16(pcm[i*2:], uint16(v)) + } + alive := sess.audioSubs[:0] + for _, sub := range sess.audioSubs { + select { + case sub.ch <- pcm: + default: + } + alive = append(alive, sub) + } + sess.audioSubs = alive +} + +func (st *Streamer) classAllowed(cls *classifier.Classification) bool { + if len(st.policy.ClassFilter) == 0 { + return true + } + if cls == nil { + return false + } + for _, f := range st.policy.ClassFilter { + if strings.EqualFold(f, string(cls.ModType)) { + return true + } + } + return false +} + +// --------------------------------------------------------------------------- +// WAV header helpers +// --------------------------------------------------------------------------- + +func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error { + if channels <= 0 { + channels = 1 + } + hdr := make([]byte, 44) + copy(hdr[0:4], "RIFF") + binary.LittleEndian.PutUint32(hdr[4:8], 36) + copy(hdr[8:12], "WAVE") + copy(hdr[12:16], "fmt ") + binary.LittleEndian.PutUint32(hdr[16:20], 16) + binary.LittleEndian.PutUint16(hdr[20:22], 1) + binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels)) + binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate)) + binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2)) + binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2)) + binary.LittleEndian.PutUint16(hdr[34:36], 16) + copy(hdr[36:40], "data") + binary.LittleEndian.PutUint32(hdr[40:44], 0) + _, err := f.Write(hdr) + return err +} + +func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) { + dataSize := uint32(totalSamples * 2) + var buf [4]byte + + binary.LittleEndian.PutUint32(buf[:], 36+dataSize) + if _, err := f.Seek(4, 0); err != nil { + return + } + _, _ = f.Write(buf[:]) + + binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate)) + if _, err := f.Seek(24, 0); err != nil { + return + } + _, _ = f.Write(buf[:]) + + binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2)) + if _, err := f.Seek(28, 0); err != nil { + return + } + _, _ = f.Write(buf[:]) + + binary.LittleEndian.PutUint32(buf[:], dataSize) + if _, err := f.Seek(40, 0); err != nil { + return + } + _, _ = f.Write(buf[:]) +} diff --git a/sdr-visual-suite.rar b/sdr-visual-suite.rar index 80a11ac..e5251ff 100644 Binary files a/sdr-visual-suite.rar and b/sdr-visual-suite.rar differ diff --git a/sdr-visual-suite_works.rar b/sdr-visual-suite_works.rar deleted file mode 100644 index 113dad6..0000000 Binary files a/sdr-visual-suite_works.rar and /dev/null differ