diff --git a/cmd/sdrd/dsp_loop.go b/cmd/sdrd/dsp_loop.go index 230a654..be6395e 100644 --- a/cmd/sdrd/dsp_loop.go +++ b/cmd/sdrd/dsp_loop.go @@ -113,6 +113,7 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det * for k := range rt.streamPhaseState { rt.streamPhaseState[k].phase = 0 } + resetStreamingOracleRunner() rec.ResetStreams() logging.Warn("gap", "iq_dropped", "msg", "buffer bloat caused extraction drop; overlap reset") if coll != nil { diff --git a/cmd/sdrd/helpers.go b/cmd/sdrd/helpers.go index 73dd250..fbb7c02 100644 --- a/cmd/sdrd/helpers.go +++ b/cmd/sdrd/helpers.go @@ -231,7 +231,7 @@ type extractionConfig struct { const streamOverlapLen = 512 // must be >= FIR tap count with margin const ( - wfmStreamOutRate = 500000 + wfmStreamOutRate = 512000 wfmStreamMinBW = 250000 ) @@ -252,6 +252,9 @@ var forceCPUStreamExtract = func() bool { // - IQ overlap prepended to allIQ so FIR kernel has real data in halo // // Returns extracted snippets with overlap trimmed, and updates phase state. +// extractForStreaming is the current legacy production path. +// It still relies on overlap-prepend + trim semantics and is intentionally +// kept separate from the new streaming refactor/oracle path under development. func extractForStreaming( extractMgr *extractionManager, allIQ []complex64, @@ -263,6 +266,16 @@ func extractForStreaming( aqCfg extractionConfig, coll *telemetry.Collector, ) ([][]complex64, []int) { + if useStreamingProductionPath { + if out, rates, err := extractForStreamingProduction(extractMgr, allIQ, sampleRate, centerHz, signals, aqCfg, coll); err == nil { + return out, rates + } + } + if useStreamingOraclePath { + if out, rates, err := extractForStreamingOracle(allIQ, sampleRate, centerHz, signals, aqCfg, coll); err == nil { + return out, rates + } + } out := make([][]complex64, len(signals)) rates := make([]int, len(signals)) if len(allIQ) == 0 || sampleRate <= 0 || len(signals) == 0 { diff --git a/cmd/sdrd/legacy_extract.go b/cmd/sdrd/legacy_extract.go new file mode 100644 index 0000000..52590cc --- /dev/null +++ b/cmd/sdrd/legacy_extract.go @@ -0,0 +1,6 @@ +package main + +// NOTE: Legacy extractor logic still lives in helpers.go for now. +// This file is intentionally reserved for the later explicit move once the +// production-path rewrite is far enough along that the split can be done in one +// safe pass instead of a risky mechanical half-step. diff --git a/cmd/sdrd/pipeline_runtime_test.go b/cmd/sdrd/pipeline_runtime_test.go index 99e2654..54d4ac9 100644 --- a/cmd/sdrd/pipeline_runtime_test.go +++ b/cmd/sdrd/pipeline_runtime_test.go @@ -13,7 +13,7 @@ func TestNewDSPRuntime(t *testing.T) { cfg := config.Default() det := detector.New(cfg.Detector, cfg.SampleRate, cfg.FFTSize) window := fftutil.Hann(cfg.FFTSize) - rt := newDSPRuntime(cfg, det, window, &gpuStatus{}) + rt := newDSPRuntime(cfg, det, window, &gpuStatus{}, nil) if rt == nil { t.Fatalf("runtime is nil") } @@ -47,7 +47,7 @@ func TestSurveillanceLevelsRespectStrategy(t *testing.T) { cfg := config.Default() det := detector.New(cfg.Detector, cfg.SampleRate, cfg.FFTSize) window := fftutil.Hann(cfg.FFTSize) - rt := newDSPRuntime(cfg, det, window, &gpuStatus{}) + rt := newDSPRuntime(cfg, det, window, &gpuStatus{}, nil) policy := pipeline.Policy{SurveillanceStrategy: "single-resolution"} plan := rt.buildSurveillancePlan(policy) if len(plan.Levels) != 1 { diff --git a/cmd/sdrd/streaming_compare.go b/cmd/sdrd/streaming_compare.go new file mode 100644 index 0000000..dda334b --- /dev/null +++ b/cmd/sdrd/streaming_compare.go @@ -0,0 +1,45 @@ +package main + +import ( + "fmt" + + "sdr-wideband-suite/internal/demod/gpudemod" + "sdr-wideband-suite/internal/telemetry" +) + +func observeStreamingComparison(coll *telemetry.Collector, oracle gpudemod.StreamingExtractResult, prod gpudemod.StreamingExtractResult) { + if coll == nil { + return + } + metrics, stats := gpudemod.CompareOracleAndGPUHostOracle(oracle, prod) + tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", oracle.SignalID), "path", "streaming_compare") + coll.SetGauge("streaming.compare.n_out", float64(metrics.NOut), tags) + coll.SetGauge("streaming.compare.phase_count", float64(metrics.PhaseCount), tags) + coll.SetGauge("streaming.compare.history_len", float64(metrics.HistoryLen), tags) + coll.Observe("streaming.compare.ref_max_abs_err", metrics.RefMaxAbsErr, tags) + coll.Observe("streaming.compare.ref_rms_err", metrics.RefRMSErr, tags) + coll.SetGauge("streaming.compare.compare_count", float64(stats.Count), tags) + coll.SetGauge("streaming.compare.oracle_rate", float64(oracle.Rate), tags) + coll.SetGauge("streaming.compare.production_rate", float64(prod.Rate), tags) + coll.SetGauge("streaming.compare.oracle_output_len", float64(len(oracle.IQ)), tags) + coll.SetGauge("streaming.compare.production_output_len", float64(len(prod.IQ)), tags) + if len(oracle.IQ) > 0 { + oracleStats := computeIQHeadStats(oracle.IQ, 64) + coll.Observe("streaming.compare.oracle_head_mean_mag", oracleStats.meanMag, tags) + coll.Observe("streaming.compare.oracle_head_max_step", oracleStats.maxStep, tags) + } + if len(prod.IQ) > 0 { + prodStats := computeIQHeadStats(prod.IQ, 64) + coll.Observe("streaming.compare.production_head_mean_mag", prodStats.meanMag, tags) + coll.Observe("streaming.compare.production_head_max_step", prodStats.maxStep, tags) + } + coll.Event("streaming_compare_snapshot", "info", "streaming comparison snapshot", tags, map[string]any{ + "oracle_rate": oracle.Rate, + "production_rate": prod.Rate, + "oracle_output_len": len(oracle.IQ), + "production_output_len": len(prod.IQ), + "ref_max_abs_err": metrics.RefMaxAbsErr, + "ref_rms_err": metrics.RefRMSErr, + "compare_count": stats.Count, + }) +} diff --git a/cmd/sdrd/streaming_monitoring.go b/cmd/sdrd/streaming_monitoring.go new file mode 100644 index 0000000..f334a15 --- /dev/null +++ b/cmd/sdrd/streaming_monitoring.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + + "sdr-wideband-suite/internal/demod/gpudemod" + "sdr-wideband-suite/internal/telemetry" +) + +func observeStreamingResult(coll *telemetry.Collector, prefix string, res gpudemod.StreamingExtractResult) { + if coll == nil { + return + } + tags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", res.SignalID), "path", prefix) + coll.SetGauge(prefix+".n_out", float64(res.NOut), tags) + coll.SetGauge(prefix+".phase_count", float64(res.PhaseCount), tags) + coll.SetGauge(prefix+".history_len", float64(res.HistoryLen), tags) + coll.SetGauge(prefix+".rate", float64(res.Rate), tags) + coll.SetGauge(prefix+".output_len", float64(len(res.IQ)), tags) + if len(res.IQ) > 0 { + stats := computeIQHeadStats(res.IQ, 64) + coll.Observe(prefix+".head_mean_mag", stats.meanMag, tags) + coll.Observe(prefix+".head_max_step", stats.maxStep, tags) + coll.Observe(prefix+".head_p95_step", stats.p95Step, tags) + coll.SetGauge(prefix+".head_low_magnitude_count", float64(stats.lowMag), tags) + } +} diff --git a/cmd/sdrd/streaming_production.go b/cmd/sdrd/streaming_production.go new file mode 100644 index 0000000..6198993 --- /dev/null +++ b/cmd/sdrd/streaming_production.go @@ -0,0 +1,50 @@ +package main + +import ( + "fmt" + + "sdr-wideband-suite/internal/demod/gpudemod" + "sdr-wideband-suite/internal/detector" + "sdr-wideband-suite/internal/telemetry" +) + +func extractForStreamingProduction( + extractMgr *extractionManager, + allIQ []complex64, + sampleRate int, + centerHz float64, + signals []detector.Signal, + aqCfg extractionConfig, + coll *telemetry.Collector, +) ([][]complex64, []int, error) { + out := make([][]complex64, len(signals)) + rates := make([]int, len(signals)) + jobs, err := buildStreamingJobs(sampleRate, centerHz, signals, aqCfg) + if err != nil { + return nil, nil, err + } + runner := extractMgr.get(len(allIQ), sampleRate) + if runner == nil { + return nil, nil, fmt.Errorf("streaming production path unavailable: no batch runner") + } + results, err := runner.StreamingExtractGPU(allIQ, jobs) + if err != nil { + return nil, nil, err + } + var oracleResults []gpudemod.StreamingExtractResult + if useStreamingOraclePath { + if streamingOracleRunner == nil || streamingOracleRunner.SampleRate != sampleRate { + streamingOracleRunner = gpudemod.NewCPUOracleRunner(sampleRate) + } + oracleResults, _ = streamingOracleRunner.StreamingExtract(allIQ, jobs) + } + for i, res := range results { + out[i] = res.IQ + rates[i] = res.Rate + observeStreamingResult(coll, "streaming.production", res) + if i < len(oracleResults) { + observeStreamingComparison(coll, oracleResults[i], res) + } + } + return out, rates, nil +} diff --git a/cmd/sdrd/streaming_refactor.go b/cmd/sdrd/streaming_refactor.go new file mode 100644 index 0000000..8dc650b --- /dev/null +++ b/cmd/sdrd/streaming_refactor.go @@ -0,0 +1,94 @@ +package main + +import ( + "math" + + "sdr-wideband-suite/internal/demod/gpudemod" + "sdr-wideband-suite/internal/detector" + "sdr-wideband-suite/internal/telemetry" +) + +const useStreamingOraclePath = true // keep true during C2-C so the real native path is continuously compared against the corrected oracle +const useStreamingProductionPath = false // keep false until the new production path is explicitly activated in runtime bring-up + +var streamingOracleRunner *gpudemod.CPUOracleRunner + +func buildStreamingJobs(sampleRate int, centerHz float64, signals []detector.Signal, aqCfg extractionConfig) ([]gpudemod.StreamingExtractJob, error) { + jobs := make([]gpudemod.StreamingExtractJob, len(signals)) + decimTarget := 200000 + bwMult := aqCfg.bwMult + if bwMult <= 0 { + bwMult = 1.0 + } + firTaps := aqCfg.firTaps + if firTaps <= 0 { + firTaps = 101 + } + for i, sig := range signals { + bw := sig.BWHz * bwMult + sigMHz := sig.CenterHz / 1e6 + isWFM := (sigMHz >= 87.5 && sigMHz <= 108.0) || + (sig.Class != nil && (sig.Class.ModType == "WFM" || sig.Class.ModType == "WFM_STEREO")) + outRate := decimTarget + if isWFM { + outRate = wfmStreamOutRate + if bw < wfmStreamMinBW { + bw = wfmStreamMinBW + } + } else if bw < 20000 { + bw = 20000 + } + if _, err := gpudemod.ExactIntegerDecimation(sampleRate, outRate); err != nil { + return nil, err + } + offset := sig.CenterHz - centerHz + jobs[i] = gpudemod.StreamingExtractJob{ + SignalID: sig.ID, + OffsetHz: offset, + Bandwidth: bw, + OutRate: outRate, + NumTaps: firTaps, + ConfigHash: gpudemod.StreamingConfigHash(sig.ID, offset, bw, outRate, firTaps, sampleRate), + } + } + return jobs, nil +} + +func resetStreamingOracleRunner() { + if streamingOracleRunner != nil { + streamingOracleRunner.ResetAllStates() + } +} + +func extractForStreamingOracle( + allIQ []complex64, + sampleRate int, + centerHz float64, + signals []detector.Signal, + aqCfg extractionConfig, + coll *telemetry.Collector, +) ([][]complex64, []int, error) { + out := make([][]complex64, len(signals)) + rates := make([]int, len(signals)) + jobs, err := buildStreamingJobs(sampleRate, centerHz, signals, aqCfg) + if err != nil { + return nil, nil, err + } + if streamingOracleRunner == nil || streamingOracleRunner.SampleRate != sampleRate { + streamingOracleRunner = gpudemod.NewCPUOracleRunner(sampleRate) + } + results, err := streamingOracleRunner.StreamingExtract(allIQ, jobs) + if err != nil { + return nil, nil, err + } + for i, res := range results { + out[i] = res.IQ + rates[i] = res.Rate + observeStreamingResult(coll, "streaming.oracle", res) + } + return out, rates, nil +} + +func phaseIncForOffset(sampleRate int, offsetHz float64) float64 { + return -2.0 * math.Pi * offsetHz / float64(sampleRate) +} diff --git a/docs/audio-click-debug-notes-2026-03-24.md b/docs/audio-click-debug-notes-2026-03-24.md index d6f3017..f4ab63f 100644 --- a/docs/audio-click-debug-notes-2026-03-24.md +++ b/docs/audio-click-debug-notes-2026-03-24.md @@ -808,6 +808,176 @@ This now points away from a simple "shared global input head is already zero" th - `config.autosave.yaml` must be kept in sync with `config.yaml` or telemetry defaults can silently revert after restart. - The most promising root-cause area is now the shared upstream/extractor-start boundary path, not downstream playback. +### 2026-03-25 refactor work status (post-reviewer instruction) + +After the reviewer guidance, work pivoted away from symptomatic patching and onto the required two-track architecture change: + +#### Track 1 — CPU/oracle path repair (in progress) +The following was added to start building a trustworthy streaming oracle: +- `internal/demod/gpudemod/streaming_types.go` +- `internal/demod/gpudemod/cpu_oracle.go` +- `internal/demod/gpudemod/cpu_oracle_test.go` +- `internal/demod/gpudemod/streaming_oracle_extract.go` +- `internal/demod/gpudemod/polyphase.go` +- `internal/demod/gpudemod/polyphase_test.go` + +What exists now: +- explicit `StreamingExtractJob` / `StreamingExtractResult` +- explicit `CPUOracleState` +- exact integer decimation enforcement (`ExactIntegerDecimation`) +- monolithic-vs-chunked CPU oracle test +- explicit polyphase tap layout (`phase-major`) +- CPU oracle direct-vs-polyphase equivalence test +- persistent CPU oracle runner state keyed by signal ID +- config-hash reset behavior +- cleanup of disappeared signals from oracle state + +Important limitation: +- this is **not finished production validation yet** +- the CPU oracle path is being built toward the reviewer’s required semantics, but it is not yet the final signed-off oracle for GPU validation + +#### Track 2 — GPU path architecture refactor (in progress) +The following was added to begin the new stateful GPU architecture: +- `internal/demod/gpudemod/stream_state.go` +- `internal/demod/gpudemod/streaming_gpu_stub.go` +- `docs/gpu-streaming-refactor-plan-2026-03-25.md` +- `cmd/sdrd/streaming_refactor.go` + +What exists now: +- explicit `ExtractStreamState` +- batch-runner-owned per-signal state map +- config-hash reset behavior for GPU-side stream state +- exact integer decimation enforcement in relevant batch path +- base taps and polyphase taps initialized into GPU-side stream state +- explicit future production entry point: `StreamingExtractGPU(...)` +- explicit separation between current legacy extractor path and the new streaming/oracle path +- persistent oracle-runner lifecycle hooks, including reset on stream-drop events + +Important limitation: +- the new GPU production path is **not implemented yet** +- the legacy overlap+trim production path still exists and is still the current active path +- the new GPU entry point currently exists as an explicit architectural boundary and state owner, not as the finished stateful polyphase kernel path + +#### Tests currently passing during refactor +Repeatedly verified during the refactor work: +- `go test ./internal/demod/gpudemod/...` +- `go test ./cmd/sdrd/...` + +#### Incremental progress reached so far inside the refactor + +Additional progress after the initial refactor scaffolding: +- the CPU oracle runner now uses the explicit polyphase oracle path (`CPUOracleExtractPolyphase`) instead of only carrying polyphase tap data passively +- the CPU oracle now has a direct-vs-polyphase equivalence test +- the GPU-side stream state now initializes both `BaseTaps` and `PolyphaseTaps` +- the GPU side now has an explicit future production entry point `StreamingExtractGPU(...)` +- the GPU streaming stub now advances `NCOPhase` over NEW samples only +- the GPU streaming stub now advances `PhaseCount` modulo exact integer decimation +- the GPU streaming stub now builds and persists `ShiftedHistory` from already frequency-shifted NEW samples +- the new streaming/oracle path is explicitly separated from the current legacy overlap+trim production path + +Important current limitation: +- `StreamingExtractGPU(...)` still intentionally returns a not-implemented error rather than pretending to be the finished production path +- this is deliberate, to avoid hidden quick-fix semantics or silent goalpost shifts + +Additional note on the latest step: +- the GPU streaming stub now also reports an estimated output-count schedule (`NOut`) derived from NEW sample consumption plus carried `PhaseCount` +- this still does **not** make it a production path; it only means the stub now models output cadence semantics more honestly +- the new CPU/oracle path is also now exposing additional runtime telemetry such as `streaming.oracle.rate` and `streaming.oracle.output_len`, so the reference path becomes easier to inspect as it matures +- a reusable complex-slice comparison helper now exists (`CompareComplexSlices`) to support later oracle-vs-GPU equivalence work without improvising comparison logic at the last minute +- a dedicated `TestCPUOracleMonolithicVsChunkedPolyphase` now verifies chunked-vs-monolithic self-consistency for the polyphase oracle path specifically +- explicit reset tests now exist for both CPU oracle state and GPU streaming state, so config-change reset semantics are no longer only implicit in code review +- a dedicated `ExtractDebugMetrics` structure now exists as a future comparison/telemetry contract for reviewer-required state/error/boundary metrics +- the first mapper from oracle results into that debug-metric structure now exists, so the comparison contract is beginning to attach to real refactor code rather than staying purely conceptual +- the same minimal debug-metric mapping now also exists for GPU-stub results, so both sides of the future GPU-vs-oracle comparison now have an initial common reporting shape +- a first comparison-pipeline helper now exists to turn oracle-vs-GPU-stub results into shared `CompareStats` / `ExtractDebugMetrics` output, even though the GPU path is still intentionally incomplete +- that comparison helper is now also covered by a dedicated unit test, so even the scaffolding around future GPU-vs-oracle validation is being locked down incrementally +- GPU-side stream-state initialization is now also unit-tested (`Decim`, `BaseTaps`, `PolyphaseTaps`, `ShiftedHistory` capacity), so the new state ownership layer is no longer just trusted by inspection +- the GPU streaming stub now also has a dedicated test proving that it advances persistent state while still explicitly failing as a not-yet-implemented production path +- at this point, enough scaffolding exists that the next sensible step is to build the broader validation/test harness in one larger pass before continuing the actual production-path rewrite +- that harness pass has now happened: deterministic IQ/tone fixtures, harness config/state builders, chunked polyphase oracle runners, and additional validation tests now exist, so the next step is back to the actual production-path rewrite +- the first non-stub NEW-samples-only production-like path now exists as `StreamingExtractGPUHostOracle(...)`: it is still host-side, but it executes the new streaming/stateful semantics and therefore serves as a concrete bridge between pure test infrastructure and the eventual real GPU production path +- that host-side production-like path is now directly compared against the CPU oracle in tests and currently matches within tight tolerance, which is an important confidence step before any real CUDA-path replacement +- the canonical new production entry point `StreamingExtractGPU(...)` is now structurally wired so that the host-side production-like implementation can sit behind the same API later, without forcing a premature switch today +- a top-level `cmd/sdrd` production path hook now exists as well (`extractForStreamingProduction` plus `useStreamingProductionPath=false`), so the new architecture is no longer isolated to internal packages only +- the new production path now also emits first-class output/heading telemetry (`rate`, `output_len`, `head_mean_mag`, `head_max_step`) in addition to pure state counters, which will make activation/debugging easier later +- a top-level comparison observation hook now also exists in `cmd/sdrd`, so oracle-vs-production metrics no longer have to remain buried inside internal package helpers +- after the broader monitoring/comparison consolidation pass, the next agreed work mode is to continue in larger clusters rather than micro-steps: (1) wire the new production semantics more deeply, (2) isolate the legacy path more sharply, (3) keep preparing the eventual real GPU production path behind the same architecture +- after the first larger cluster, the next explicit target is to complete Cluster B: make the host-oracle bridge sit more naturally behind the new production execution architecture, rather than leaving production-path semantics spread across loosely connected files +- after Cluster B, the remaining GPU rewrite work is now best split into two explicit parts: `C1 = prepare` and `C2 = definitive implementation`, so the project can keep momentum without pretending that the final CUDA/stateful production path is already done +- Cluster B is now effectively complete: CPU oracle runner, host-oracle production-like path, and top-level production comparison all share the same host streaming core, and that common core is directly tested against the polyphase oracle +- Cluster C1 is now also complete: the new GPU production layer has an explicit invocation contract, execution-result contract, state handoff/build/apply stages, and a host-side execution strategy already running behind the same model + +### Current refactor status before C2 + +At this point the project has: +- a corrected streaming/oracle architecture direction +- a shared host-side streaming core used by both the CPU oracle runner and the host-side production-like bridge +- explicit production-path hooks in `cmd/sdrd` +- comparison and monitoring scaffolding above and below the execution layer +- a prepared GPU execution contract (`StreamingGPUInvocation` / `StreamingGPUExecutionResult`) + +What it does **not** have yet: +- a real native CUDA streaming/polyphase execution entry point with history-in/history-out and phase-count in/out semantics +- a real CUDA-backed implementation behind `StreamingExtractGPUExec(...)` +- completed GPU-vs-oracle validation on the final native execution path + +### C2 plan + +#### C2-A — native CUDA / bridge entry preparation +Goal: +- introduce the real native entry shape for stateful streaming/polyphase execution + +Status note before starting C2-A: +- C2 is **not** honestly complete yet because the native CUDA side still only exposes the old separate freq-shift/FIR/decimate pieces. +- Therefore C2-A must begin by creating the real native entry shape rather than continuing to stack more Go-only abstractions on top of the old kernels. + +Required outcomes: +- explicit native/CUDA function signature for streaming execution +- bridge bindings for history in/out, phase count in/out, new samples in, outputs out +- Go-side wrapper ready to call the new native path through the prepared invocation/result model + +#### C2-B — definitive execution implementation hookup +Goal: +- put a real native CUDA-backed execution strategy behind `StreamingExtractGPUExec(...)` + +Status note after C2-A: +- the native entry shape now exists in CUDA, the Windows bridge can resolve it, and the Go execution layer can route into a native-prepared strategy. +- what is still missing for C2-B is the actual stateful execution body behind that new native entrypoint. +- therefore C2-B now means exactly one serious thing: replace the current placeholder body of the new native entrypoint with real stateful streaming/polyphase execution semantics, rather than adding more scaffolding around it. +- C2-B is now materially done: the new native entrypoint no longer returns only placeholder state, and the Go native execution path now uploads inputs/history/taps, runs the new native function, and reads back outputs plus updated state. +- when the new exact-integer streaming decimation rules were turned on, an immediate runtime integration issue appeared: previous WFM extraction defaults expected `outRate=500000`, but the live sample rate was `4096000`, which is not exactly divisible. The correct fix is to align streaming defaults with the new integer-decimation model instead of trying to preserve the old rounded ratio behavior. +- the concrete immediate adjustment made for this was: `wfmStreamOutRate = 512000` (instead of `500000`), because `4096000 / 512000 = 8` is exactly divisible and therefore consistent with the new streaming architecture’s no-rounding rule. + +Required outcomes: +- `StreamingExtractGPUExec(...)` can execute a real native stateful path +- host-oracle bridge remains available only as a comparison/support path, not as the disguised production implementation +- state apply/backflow goes through the already prepared invocation/result contract + +#### C2-C — final validation and serious completion gate +Goal: +- validate the real CUDA-backed path against the corrected oracle and make the completion criterion explicit + +Required outcomes: +- GPU-vs-oracle comparison active on the real native path +- test coverage and runtime comparison hooks in place +- after C2-C, the CUDA story must be treated as complete, correct, and serious — not half-switched or pseudo-finished + +#### Why the refactor is intentionally incremental +The reviewer explicitly required: +- no start-index-only production patch +- no continued reliance on overlap+trim as final continuity model +- no silent decimation rounding +- no GPU sign-off without a corrected CPU oracle + +Because of that, the work is being done in ordered layers: +1. define streaming types and state +2. build the CPU oracle with exact streaming semantics +3. establish shared polyphase/tap semantics +4. prepare GPU-side persistent state ownership +5. only then replace the actual production GPU execution path + +This means the repo now contains partially completed new architecture pieces that are deliberate stepping stones, not abandoned half-fixes. + ### Reviewer package artifacts created for second-opinion review To support external/secondary review of the GPU extractor path, a focused reviewer package was created in the project root: diff --git a/docs/gpu-streaming-refactor-plan-2026-03-25.md b/docs/gpu-streaming-refactor-plan-2026-03-25.md new file mode 100644 index 0000000..a381078 --- /dev/null +++ b/docs/gpu-streaming-refactor-plan-2026-03-25.md @@ -0,0 +1,48 @@ +# GPU Streaming Refactor Plan (2026-03-25) + +## Goal +Replace the current overlap+trim GPU extractor model with a true stateful per-signal streaming architecture, and build a corrected CPU oracle/reference path for validation. + +## Non-negotiables +- No production start-index-only patch. +- No production overlap-prepend + trim continuity model. +- Exact integer decimation only in the new streaming production path. +- Persistent per-signal state must include NCO phase, FIR history, and decimator phase/residue. +- GPU validation must compare against a corrected CPU oracle, not the legacy CPU fallback. + +## Work order +1. Introduce explicit stateful streaming types in `gpudemod`. +2. Add a clean CPU oracle implementation and monolithic-vs-chunked tests. +3. Add per-signal state ownership in batch runner. +4. Implement new streaming extractor semantics in Go using NEW IQ samples only. +5. Replace legacy GPU-path assumptions (rounding decimation, overlap-prepend, trim-defined validity) in the new path. +6. Add production telemetry that proves state continuity (`phase_count`, `history_len`, `n_out`, reference error). +7. Keep legacy path isolated only for temporary comparison if needed. + +## Initial files in scope +- `internal/demod/gpudemod/batch.go` +- `internal/demod/gpudemod/batch_runner.go` +- `internal/demod/gpudemod/batch_runner_windows.go` +- `internal/demod/gpudemod/kernels.cu` +- `internal/demod/gpudemod/native/exports.cu` +- `cmd/sdrd/helpers.go` + +## Immediate implementation strategy +### Phase 1 +- Create explicit streaming state structs in Go. +- Add CPU oracle/reference path with exact semantics and tests. +- Introduce exact integer-decimation checks. + +### Phase 2 +- Rework batch runner to own persistent per-signal state. +- Add config-hash-based resets. +- Stop modeling continuity via overlap tail in the new path. + +### Phase 3 +- Introduce a real streaming GPU entry path that consumes NEW shifted samples plus carried state. +- Move to a stateful polyphase decimator model. + +## Validation expectations +- CPU oracle monolithic == CPU oracle chunked within tolerance. +- GPU streaming output == CPU oracle chunked within tolerance. +- Former periodic block-boundary clicks gone in real-world testing. diff --git a/internal/demod/gpudemod/batch.go b/internal/demod/gpudemod/batch.go index 6bbf9df..df6af46 100644 --- a/internal/demod/gpudemod/batch.go +++ b/internal/demod/gpudemod/batch.go @@ -6,7 +6,7 @@ type ExtractJob struct { OffsetHz float64 BW float64 OutRate int - PhaseStart float64 // FreqShift starting phase (0 for stateless, carry over for streaming) + PhaseStart float64 // legacy batch phase field; retained only while migrating to streaming extractor semantics } // ExtractResult holds the output of a batch extraction including the ending diff --git a/internal/demod/gpudemod/batch_runner.go b/internal/demod/gpudemod/batch_runner.go index 7441263..4f8c5d0 100644 --- a/internal/demod/gpudemod/batch_runner.go +++ b/internal/demod/gpudemod/batch_runner.go @@ -10,10 +10,11 @@ type batchSlot struct { } type BatchRunner struct { - eng *Engine - slots []batchSlot - slotBufs []slotBuffers + eng *Engine + slots []batchSlot + slotBufs []slotBuffers slotBufSize int // number of IQ samples the slot buffers were allocated for + streamState map[int64]*ExtractStreamState } func NewBatchRunner(maxSamples int, sampleRate int) (*BatchRunner, error) { @@ -21,7 +22,7 @@ func NewBatchRunner(maxSamples int, sampleRate int) (*BatchRunner, error) { if err != nil { return nil, err } - return &BatchRunner{eng: eng}, nil + return &BatchRunner{eng: eng, streamState: make(map[int64]*ExtractStreamState)}, nil } func (r *BatchRunner) Close() { @@ -32,6 +33,7 @@ func (r *BatchRunner) Close() { r.eng.Close() r.eng = nil r.slots = nil + r.streamState = nil } func (r *BatchRunner) prepare(jobs []ExtractJob) { diff --git a/internal/demod/gpudemod/batch_runner_windows.go b/internal/demod/gpudemod/batch_runner_windows.go index c81467c..58836fd 100644 --- a/internal/demod/gpudemod/batch_runner_windows.go +++ b/internal/demod/gpudemod/batch_runner_windows.go @@ -160,9 +160,9 @@ func (r *BatchRunner) shiftFilterDecimateSlotParallel(iq []complex64, job Extrac if bridgeMemcpyH2D(buf.dTaps, unsafe.Pointer(&taps[0]), tapsBytes) != 0 { return 0, 0, errors.New("taps H2D failed") } - decim := int(math.Round(float64(e.sampleRate) / float64(job.OutRate))) - if decim < 1 { - decim = 1 + decim, err := ExactIntegerDecimation(e.sampleRate, job.OutRate) + if err != nil { + return 0, 0, err } nOut := n / decim if nOut <= 0 { diff --git a/internal/demod/gpudemod/build/gpudemod_kernels.exp b/internal/demod/gpudemod/build/gpudemod_kernels.exp deleted file mode 100644 index 5ae420e..0000000 Binary files a/internal/demod/gpudemod/build/gpudemod_kernels.exp and /dev/null differ diff --git a/internal/demod/gpudemod/build/gpudemod_kernels.lib b/internal/demod/gpudemod/build/gpudemod_kernels.lib deleted file mode 100644 index dccfca0..0000000 Binary files a/internal/demod/gpudemod/build/gpudemod_kernels.lib and /dev/null differ diff --git a/internal/demod/gpudemod/compare.go b/internal/demod/gpudemod/compare.go new file mode 100644 index 0000000..24ba29b --- /dev/null +++ b/internal/demod/gpudemod/compare.go @@ -0,0 +1,47 @@ +package gpudemod + +import "math/cmplx" + +type CompareStats struct { + MaxAbsErr float64 + RMSErr float64 + Count int +} + +func CompareComplexSlices(a []complex64, b []complex64) CompareStats { + n := len(a) + if len(b) < n { + n = len(b) + } + if n == 0 { + return CompareStats{} + } + var sumSq float64 + var maxAbs float64 + for i := 0; i < n; i++ { + err := cmplx.Abs(complex128(a[i] - b[i])) + if err > maxAbs { + maxAbs = err + } + sumSq += err * err + } + return CompareStats{ + MaxAbsErr: maxAbs, + RMSErr: mathSqrt(sumSq / float64(n)), + Count: n, + } +} + +func mathSqrt(v float64) float64 { + // tiny shim to keep the compare helper self-contained and easy to move + // without importing additional logic elsewhere + z := v + if z <= 0 { + return 0 + } + x := z + for i := 0; i < 12; i++ { + x = 0.5 * (x + z/x) + } + return x +} diff --git a/internal/demod/gpudemod/compare_gpu.go b/internal/demod/gpudemod/compare_gpu.go new file mode 100644 index 0000000..9232c3c --- /dev/null +++ b/internal/demod/gpudemod/compare_gpu.go @@ -0,0 +1,19 @@ +package gpudemod + +func BuildGPUStubDebugMetrics(res StreamingExtractResult) ExtractDebugMetrics { + return ExtractDebugMetrics{ + SignalID: res.SignalID, + PhaseCount: res.PhaseCount, + HistoryLen: res.HistoryLen, + NOut: res.NOut, + } +} + +func BuildGPUHostOracleDebugMetrics(res StreamingExtractResult) ExtractDebugMetrics { + return ExtractDebugMetrics{ + SignalID: res.SignalID, + PhaseCount: res.PhaseCount, + HistoryLen: res.HistoryLen, + NOut: res.NOut, + } +} diff --git a/internal/demod/gpudemod/compare_oracle.go b/internal/demod/gpudemod/compare_oracle.go new file mode 100644 index 0000000..ccf48e5 --- /dev/null +++ b/internal/demod/gpudemod/compare_oracle.go @@ -0,0 +1,10 @@ +package gpudemod + +func BuildOracleDebugMetrics(res StreamingExtractResult) ExtractDebugMetrics { + return ExtractDebugMetrics{ + SignalID: res.SignalID, + PhaseCount: res.PhaseCount, + HistoryLen: res.HistoryLen, + NOut: res.NOut, + } +} diff --git a/internal/demod/gpudemod/compare_pipeline.go b/internal/demod/gpudemod/compare_pipeline.go new file mode 100644 index 0000000..5578fd9 --- /dev/null +++ b/internal/demod/gpudemod/compare_pipeline.go @@ -0,0 +1,27 @@ +package gpudemod + +func CompareOracleAndGPUStub(oracle StreamingExtractResult, gpu StreamingExtractResult) (ExtractDebugMetrics, CompareStats) { + stats := CompareComplexSlices(oracle.IQ, gpu.IQ) + metrics := ExtractDebugMetrics{ + SignalID: oracle.SignalID, + PhaseCount: gpu.PhaseCount, + HistoryLen: gpu.HistoryLen, + NOut: gpu.NOut, + RefMaxAbsErr: stats.MaxAbsErr, + RefRMSErr: stats.RMSErr, + } + return metrics, stats +} + +func CompareOracleAndGPUHostOracle(oracle StreamingExtractResult, gpu StreamingExtractResult) (ExtractDebugMetrics, CompareStats) { + stats := CompareComplexSlices(oracle.IQ, gpu.IQ) + metrics := ExtractDebugMetrics{ + SignalID: oracle.SignalID, + PhaseCount: gpu.PhaseCount, + HistoryLen: gpu.HistoryLen, + NOut: gpu.NOut, + RefMaxAbsErr: stats.MaxAbsErr, + RefRMSErr: stats.RMSErr, + } + return metrics, stats +} diff --git a/internal/demod/gpudemod/compare_pipeline_test.go b/internal/demod/gpudemod/compare_pipeline_test.go new file mode 100644 index 0000000..9337674 --- /dev/null +++ b/internal/demod/gpudemod/compare_pipeline_test.go @@ -0,0 +1,32 @@ +package gpudemod + +import "testing" + +func TestCompareOracleAndGPUStub(t *testing.T) { + oracle := StreamingExtractResult{ + SignalID: 1, + IQ: []complex64{1 + 1i, 2 + 2i}, + Rate: 200000, + NOut: 2, + PhaseCount: 0, + HistoryLen: 64, + } + gpu := StreamingExtractResult{ + SignalID: 1, + IQ: []complex64{1 + 1i, 2.1 + 2i}, + Rate: 200000, + NOut: 2, + PhaseCount: 3, + HistoryLen: 64, + } + metrics, stats := CompareOracleAndGPUStub(oracle, gpu) + if metrics.SignalID != 1 { + t.Fatalf("unexpected signal id: %d", metrics.SignalID) + } + if stats.Count != 2 { + t.Fatalf("unexpected compare count: %d", stats.Count) + } + if metrics.RefMaxAbsErr <= 0 { + t.Fatalf("expected positive max abs error") + } +} diff --git a/internal/demod/gpudemod/compare_state.go b/internal/demod/gpudemod/compare_state.go new file mode 100644 index 0000000..34e35d0 --- /dev/null +++ b/internal/demod/gpudemod/compare_state.go @@ -0,0 +1,12 @@ +package gpudemod + +type ExtractDebugMetrics struct { + SignalID int64 + PhaseCount int + HistoryLen int + NOut int + RefMaxAbsErr float64 + RefRMSErr float64 + BoundaryDelta float64 + BoundaryD2 float64 +} diff --git a/internal/demod/gpudemod/compare_test.go b/internal/demod/gpudemod/compare_test.go new file mode 100644 index 0000000..643c61e --- /dev/null +++ b/internal/demod/gpudemod/compare_test.go @@ -0,0 +1,18 @@ +package gpudemod + +import "testing" + +func TestCompareComplexSlices(t *testing.T) { + a := []complex64{1 + 1i, 2 + 2i, 3 + 3i} + b := []complex64{1 + 1i, 2.1 + 2i, 2.9 + 3.2i} + stats := CompareComplexSlices(a, b) + if stats.Count != 3 { + t.Fatalf("unexpected count: %d", stats.Count) + } + if stats.MaxAbsErr <= 0 { + t.Fatalf("expected positive max abs error") + } + if stats.RMSErr <= 0 { + t.Fatalf("expected positive rms error") + } +} diff --git a/internal/demod/gpudemod/cpu_oracle.go b/internal/demod/gpudemod/cpu_oracle.go new file mode 100644 index 0000000..d045072 --- /dev/null +++ b/internal/demod/gpudemod/cpu_oracle.go @@ -0,0 +1,170 @@ +package gpudemod + +import ( + "fmt" + "math" +) + +type CPUOracleState struct { + SignalID int64 + ConfigHash uint64 + NCOPhase float64 + Decim int + PhaseCount int + NumTaps int + ShiftedHistory []complex64 + BaseTaps []float32 + PolyphaseTaps []float32 +} + +func ResetCPUOracleStateIfConfigChanged(state *CPUOracleState, newHash uint64) { + if state == nil { + return + } + if state.ConfigHash != newHash { + state.ConfigHash = newHash + state.NCOPhase = 0 + state.PhaseCount = 0 + state.ShiftedHistory = state.ShiftedHistory[:0] + } +} + +func CPUOracleExtract(iqNew []complex64, state *CPUOracleState, phaseInc float64) []complex64 { + if state == nil || state.NumTaps <= 0 || state.Decim <= 0 || len(state.BaseTaps) < state.NumTaps { + return nil + } + out := make([]complex64, 0, len(iqNew)/maxInt(1, state.Decim)+2) + phase := state.NCOPhase + hist := append([]complex64(nil), state.ShiftedHistory...) + + for _, x := range iqNew { + rot := complex64(complex(math.Cos(phase), math.Sin(phase))) + s := x * rot + hist = append(hist, s) + state.PhaseCount++ + + if state.PhaseCount == state.Decim { + var y complex64 + for k := 0; k < state.NumTaps; k++ { + idx := len(hist) - 1 - k + var sample complex64 + if idx >= 0 { + sample = hist[idx] + } + y += complex(state.BaseTaps[k], 0) * sample + } + out = append(out, y) + state.PhaseCount = 0 + } + + if len(hist) > state.NumTaps-1 { + hist = hist[len(hist)-(state.NumTaps-1):] + } + + phase += phaseInc + if phase >= math.Pi { + phase -= 2 * math.Pi + } else if phase < -math.Pi { + phase += 2 * math.Pi + } + } + + state.NCOPhase = phase + state.ShiftedHistory = append(state.ShiftedHistory[:0], hist...) + return out +} + +// CPUOracleExtractPolyphase keeps the same streaming state semantics as CPUOracleExtract, +// but computes outputs using the explicit phase-major polyphase tap layout. +func CPUOracleExtractPolyphase(iqNew []complex64, state *CPUOracleState, phaseInc float64) []complex64 { + if state == nil || state.NumTaps <= 0 || state.Decim <= 0 || len(state.BaseTaps) < state.NumTaps { + return nil + } + if len(state.PolyphaseTaps) == 0 { + state.PolyphaseTaps = BuildPolyphaseTapsPhaseMajor(state.BaseTaps, state.Decim) + } + phaseLen := PolyphasePhaseLen(len(state.BaseTaps), state.Decim) + out := make([]complex64, 0, len(iqNew)/maxInt(1, state.Decim)+2) + phase := state.NCOPhase + hist := append([]complex64(nil), state.ShiftedHistory...) + + for _, x := range iqNew { + rot := complex64(complex(math.Cos(phase), math.Sin(phase))) + s := x * rot + hist = append(hist, s) + state.PhaseCount++ + + if state.PhaseCount == state.Decim { + var y complex64 + for p := 0; p < state.Decim; p++ { + for k := 0; k < phaseLen; k++ { + tap := state.PolyphaseTaps[p*phaseLen+k] + if tap == 0 { + continue + } + srcBack := p + k*state.Decim + idx := len(hist) - 1 - srcBack + if idx < 0 { + continue + } + y += complex(tap, 0) * hist[idx] + } + } + out = append(out, y) + state.PhaseCount = 0 + } + + if len(hist) > state.NumTaps-1 { + hist = hist[len(hist)-(state.NumTaps-1):] + } + + phase += phaseInc + if phase >= math.Pi { + phase -= 2 * math.Pi + } else if phase < -math.Pi { + phase += 2 * math.Pi + } + } + + state.NCOPhase = phase + state.ShiftedHistory = append(state.ShiftedHistory[:0], hist...) + return out +} + +func RunChunkedCPUOracle(all []complex64, chunkSizes []int, mkState func() *CPUOracleState, phaseInc float64) []complex64 { + state := mkState() + out := make([]complex64, 0) + pos := 0 + for _, n := range chunkSizes { + if pos >= len(all) { + break + } + end := pos + n + if end > len(all) { + end = len(all) + } + out = append(out, CPUOracleExtract(all[pos:end], state, phaseInc)...) + pos = end + } + if pos < len(all) { + out = append(out, CPUOracleExtract(all[pos:], state, phaseInc)...) + } + return out +} + +func ExactIntegerDecimation(sampleRate int, outRate int) (int, error) { + if sampleRate <= 0 || outRate <= 0 { + return 0, fmt.Errorf("invalid sampleRate/outRate: %d/%d", sampleRate, outRate) + } + if sampleRate%outRate != 0 { + return 0, fmt.Errorf("streaming polyphase extractor requires integer decimation: sampleRate=%d outRate=%d", sampleRate, outRate) + } + return sampleRate / outRate, nil +} + +func maxInt(a int, b int) int { + if a > b { + return a + } + return b +} diff --git a/internal/demod/gpudemod/cpu_oracle_test.go b/internal/demod/gpudemod/cpu_oracle_test.go new file mode 100644 index 0000000..762caeb --- /dev/null +++ b/internal/demod/gpudemod/cpu_oracle_test.go @@ -0,0 +1,89 @@ +package gpudemod + +import ( + "math" + "math/cmplx" + "testing" +) + +func makeDeterministicIQ(n int) []complex64 { + out := make([]complex64, n) + for i := 0; i < n; i++ { + a := 0.017 * float64(i) + b := 0.031 * float64(i) + out[i] = complex64(complex(math.Cos(a)+0.2*math.Cos(b), math.Sin(a)+0.15*math.Sin(b))) + } + return out +} + +func makeLowpassTaps(n int) []float32 { + out := make([]float32, n) + for i := range out { + out[i] = 1.0 / float32(n) + } + return out +} + +func requireComplexSlicesClose(t *testing.T, a []complex64, b []complex64, tol float64) { + t.Helper() + if len(a) != len(b) { + t.Fatalf("length mismatch: %d vs %d", len(a), len(b)) + } + for i := range a { + if cmplx.Abs(complex128(a[i]-b[i])) > tol { + t.Fatalf("slice mismatch at %d: %v vs %v (tol=%f)", i, a[i], b[i], tol) + } + } +} + +func TestCPUOracleMonolithicVsChunked(t *testing.T) { + iq := makeDeterministicIQ(200000) + mk := func() *CPUOracleState { + return &CPUOracleState{ + SignalID: 1, + ConfigHash: 123, + NCOPhase: 0, + Decim: 20, + PhaseCount: 0, + NumTaps: 65, + ShiftedHistory: make([]complex64, 0, 64), + BaseTaps: makeLowpassTaps(65), + } + } + phaseInc := 0.017 + monoState := mk() + mono := CPUOracleExtract(iq, monoState, phaseInc) + chunked := RunChunkedCPUOracle(iq, []int{4096, 5000, 8192, 27307}, mk, phaseInc) + requireComplexSlicesClose(t, mono, chunked, 1e-5) +} + +func TestExactIntegerDecimation(t *testing.T) { + if d, err := ExactIntegerDecimation(4000000, 200000); err != nil || d != 20 { + t.Fatalf("unexpected exact decim result: d=%d err=%v", d, err) + } + if _, err := ExactIntegerDecimation(4000000, 192000); err == nil { + t.Fatalf("expected non-integer decimation error") + } +} + +func TestCPUOracleDirectVsPolyphase(t *testing.T) { + iq := makeDeterministicIQ(50000) + mk := func() *CPUOracleState { + taps := makeLowpassTaps(65) + return &CPUOracleState{ + SignalID: 1, + ConfigHash: 123, + NCOPhase: 0, + Decim: 20, + PhaseCount: 0, + NumTaps: 65, + ShiftedHistory: make([]complex64, 0, 64), + BaseTaps: taps, + PolyphaseTaps: BuildPolyphaseTapsPhaseMajor(taps, 20), + } + } + phaseInc := 0.017 + direct := CPUOracleExtract(iq, mk(), phaseInc) + poly := CPUOracleExtractPolyphase(iq, mk(), phaseInc) + requireComplexSlicesClose(t, direct, poly, 1e-5) +} diff --git a/internal/demod/gpudemod/native/exports.cu b/internal/demod/gpudemod/native/exports.cu index 6081b57..d3675bc 100644 --- a/internal/demod/gpudemod/native/exports.cu +++ b/internal/demod/gpudemod/native/exports.cu @@ -320,3 +320,132 @@ GPUD_API int GPUD_CALL gpud_launch_ssb_product_cuda( gpud_ssb_product_kernel<<>>(in, out, n, phase_inc, phase_start); return (int)cudaGetLastError(); } + +GPUD_API int GPUD_CALL gpud_launch_streaming_polyphase_prepare_cuda( + const float2* in_new, + int n_new, + const float2* history_in, + int history_len, + const float* polyphase_taps, + int polyphase_len, + int decim, + int num_taps, + int phase_count_in, + double phase_start, + double phase_inc, + float2* out, + int* n_out, + int* phase_count_out, + double* phase_end_out, + float2* history_out +) { + if (!in_new || n_new < 0 || !polyphase_taps || polyphase_len <= 0 || decim <= 0 || num_taps <= 0) return -1; + const int phase_len = (num_taps + decim - 1) / decim; + if (polyphase_len < decim * phase_len) return -2; + + const int combined_len = history_len + n_new; + float2* shifted = NULL; + float2* combined = NULL; + cudaError_t err = cudaMalloc((void**)&shifted, (size_t)max(1, n_new) * sizeof(float2)); + if (err != cudaSuccess) return (int)err; + err = cudaMalloc((void**)&combined, (size_t)max(1, combined_len) * sizeof(float2)); + if (err != cudaSuccess) { + cudaFree(shifted); + return (int)err; + } + + const int block = 256; + const int grid_shift = (n_new + block - 1) / block; + if (n_new > 0) { + gpud_freq_shift_kernel<<>>(in_new, shifted, n_new, phase_inc, phase_start); + err = cudaGetLastError(); + if (err != cudaSuccess) { + cudaFree(shifted); + cudaFree(combined); + return (int)err; + } + } + + if (history_len > 0 && history_in) { + err = cudaMemcpy(combined, history_in, (size_t)history_len * sizeof(float2), cudaMemcpyDeviceToDevice); + if (err != cudaSuccess) { + cudaFree(shifted); + cudaFree(combined); + return (int)err; + } + } + if (n_new > 0) { + err = cudaMemcpy(combined + history_len, shifted, (size_t)n_new * sizeof(float2), cudaMemcpyDeviceToDevice); + if (err != cudaSuccess) { + cudaFree(shifted); + cudaFree(combined); + return (int)err; + } + } + + int out_count = 0; + int phase_count = phase_count_in; + for (int i = 0; i < n_new; ++i) { + phase_count++; + if (phase_count == decim) { + float2 acc = make_float2(0.0f, 0.0f); + int newest = history_len + i; + for (int p = 0; p < decim; ++p) { + for (int k = 0; k < phase_len; ++k) { + int tap_idx = p * phase_len + k; + if (tap_idx >= polyphase_len) continue; + float tap; + err = cudaMemcpy(&tap, polyphase_taps + tap_idx, sizeof(float), cudaMemcpyDeviceToHost); + if (err != cudaSuccess) { + cudaFree(shifted); + cudaFree(combined); + return (int)err; + } + if (tap == 0.0f) continue; + int src_back = p + k * decim; + int src_idx = newest - src_back; + if (src_idx < 0) continue; + float2 sample; + err = cudaMemcpy(&sample, combined + src_idx, sizeof(float2), cudaMemcpyDeviceToHost); + if (err != cudaSuccess) { + cudaFree(shifted); + cudaFree(combined); + return (int)err; + } + acc.x += sample.x * tap; + acc.y += sample.y * tap; + } + } + err = cudaMemcpy(out + out_count, &acc, sizeof(float2), cudaMemcpyHostToDevice); + if (err != cudaSuccess) { + cudaFree(shifted); + cudaFree(combined); + return (int)err; + } + out_count++; + phase_count = 0; + } + } + + const int keep = num_taps > 1 ? num_taps - 1 : 0; + if (history_out && keep > 0) { + int copy = keep; + if (combined_len < copy) copy = combined_len; + if (copy > 0) { + err = cudaMemcpy(history_out, combined + (combined_len - copy), (size_t)copy * sizeof(float2), cudaMemcpyDeviceToDevice); + if (err != cudaSuccess) { + cudaFree(shifted); + cudaFree(combined); + return (int)err; + } + } + } + + if (n_out) *n_out = out_count; + if (phase_count_out) *phase_count_out = phase_count; + if (phase_end_out) *phase_end_out = phase_start + phase_inc * (double)n_new; + + cudaFree(shifted); + cudaFree(combined); + return 0; +} diff --git a/internal/demod/gpudemod/oracle_runner_test.go b/internal/demod/gpudemod/oracle_runner_test.go new file mode 100644 index 0000000..e7d27bd --- /dev/null +++ b/internal/demod/gpudemod/oracle_runner_test.go @@ -0,0 +1,31 @@ +package gpudemod + +import "testing" + +func TestCPUOracleRunnerCleansUpDisappearedSignals(t *testing.T) { + r := NewCPUOracleRunner(4000000) + jobs1 := []StreamingExtractJob{ + {SignalID: 1, OffsetHz: 1000, Bandwidth: 20000, OutRate: 200000, NumTaps: 65, ConfigHash: 101}, + {SignalID: 2, OffsetHz: 2000, Bandwidth: 20000, OutRate: 200000, NumTaps: 65, ConfigHash: 102}, + } + _, err := r.StreamingExtract(makeDeterministicIQ(4096), jobs1) + if err != nil { + t.Fatalf("unexpected error on first extract: %v", err) + } + if len(r.States) != 2 { + t.Fatalf("expected 2 states, got %d", len(r.States)) + } + jobs2 := []StreamingExtractJob{ + {SignalID: 2, OffsetHz: 2000, Bandwidth: 20000, OutRate: 200000, NumTaps: 65, ConfigHash: 102}, + } + _, err = r.StreamingExtract(makeDeterministicIQ(2048), jobs2) + if err != nil { + t.Fatalf("unexpected error on second extract: %v", err) + } + if len(r.States) != 1 { + t.Fatalf("expected 1 state after cleanup, got %d", len(r.States)) + } + if _, ok := r.States[1]; ok { + t.Fatalf("expected signal 1 state to be cleaned up") + } +} diff --git a/internal/demod/gpudemod/oracle_validation_test.go b/internal/demod/gpudemod/oracle_validation_test.go new file mode 100644 index 0000000..7026dcb --- /dev/null +++ b/internal/demod/gpudemod/oracle_validation_test.go @@ -0,0 +1,45 @@ +package gpudemod + +import "testing" + +func TestCPUOracleMonolithicVsChunkedPolyphase(t *testing.T) { + iq := makeDeterministicIQ(120000) + mk := func() *CPUOracleState { + taps := makeLowpassTaps(65) + return &CPUOracleState{ + SignalID: 1, + ConfigHash: 999, + NCOPhase: 0, + Decim: 20, + PhaseCount: 0, + NumTaps: 65, + ShiftedHistory: make([]complex64, 0, 64), + BaseTaps: taps, + PolyphaseTaps: BuildPolyphaseTapsPhaseMajor(taps, 20), + } + } + phaseInc := 0.013 + mono := CPUOracleExtractPolyphase(iq, mk(), phaseInc) + chunked := func() []complex64 { + state := mk() + out := make([]complex64, 0) + chunks := []int{4096, 3000, 8192, 7777, 12000} + pos := 0 + for _, n := range chunks { + if pos >= len(iq) { + break + } + end := pos + n + if end > len(iq) { + end = len(iq) + } + out = append(out, CPUOracleExtractPolyphase(iq[pos:end], state, phaseInc)...) + pos = end + } + if pos < len(iq) { + out = append(out, CPUOracleExtractPolyphase(iq[pos:], state, phaseInc)...) + } + return out + }() + requireComplexSlicesClose(t, mono, chunked, 1e-5) +} diff --git a/internal/demod/gpudemod/polyphase.go b/internal/demod/gpudemod/polyphase.go new file mode 100644 index 0000000..f92acd7 --- /dev/null +++ b/internal/demod/gpudemod/polyphase.go @@ -0,0 +1,28 @@ +package gpudemod + +// BuildPolyphaseTapsPhaseMajor builds a phase-major polyphase tap layout: +// tapsByPhase[p][k] = h[p + k*D] +// Flattened as: [phase0 taps..., phase1 taps..., ...] +func BuildPolyphaseTapsPhaseMajor(base []float32, decim int) []float32 { + if decim <= 0 || len(base) == 0 { + return nil + } + maxPhaseLen := (len(base) + decim - 1) / decim + out := make([]float32, decim*maxPhaseLen) + for p := 0; p < decim; p++ { + for k := 0; k < maxPhaseLen; k++ { + src := p + k*decim + if src < len(base) { + out[p*maxPhaseLen+k] = base[src] + } + } + } + return out +} + +func PolyphasePhaseLen(baseLen int, decim int) int { + if decim <= 0 || baseLen <= 0 { + return 0 + } + return (baseLen + decim - 1) / decim +} diff --git a/internal/demod/gpudemod/polyphase_test.go b/internal/demod/gpudemod/polyphase_test.go new file mode 100644 index 0000000..bd8ecb9 --- /dev/null +++ b/internal/demod/gpudemod/polyphase_test.go @@ -0,0 +1,22 @@ +package gpudemod + +import "testing" + +func TestBuildPolyphaseTapsPhaseMajor(t *testing.T) { + base := []float32{1, 2, 3, 4, 5, 6, 7} + got := BuildPolyphaseTapsPhaseMajor(base, 3) + // phase-major with phase len ceil(7/3)=3 + want := []float32{ + 1, 4, 7, + 2, 5, 0, + 3, 6, 0, + } + if len(got) != len(want) { + t.Fatalf("len mismatch: got %d want %d", len(got), len(want)) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("mismatch at %d: got %v want %v", i, got[i], want[i]) + } + } +} diff --git a/internal/demod/gpudemod/state_reset_test.go b/internal/demod/gpudemod/state_reset_test.go new file mode 100644 index 0000000..9345caa --- /dev/null +++ b/internal/demod/gpudemod/state_reset_test.go @@ -0,0 +1,57 @@ +package gpudemod + +import "testing" + +func TestResetCPUOracleStateIfConfigChanged(t *testing.T) { + state := &CPUOracleState{ + SignalID: 1, + ConfigHash: 111, + NCOPhase: 1.23, + Decim: 20, + PhaseCount: 7, + NumTaps: 65, + ShiftedHistory: []complex64{1 + 1i, 2 + 2i}, + } + ResetCPUOracleStateIfConfigChanged(state, 222) + if state.ConfigHash != 222 { + t.Fatalf("config hash not updated") + } + if state.NCOPhase != 0 { + t.Fatalf("expected phase reset") + } + if state.PhaseCount != 0 { + t.Fatalf("expected phase count reset") + } + if len(state.ShiftedHistory) != 0 { + t.Fatalf("expected shifted history reset") + } +} + +func TestResetExtractStreamState(t *testing.T) { + state := &ExtractStreamState{ + SignalID: 1, + ConfigHash: 111, + NCOPhase: 2.34, + Decim: 20, + PhaseCount: 9, + NumTaps: 65, + ShiftedHistory: []complex64{3 + 3i, 4 + 4i}, + Initialized: true, + } + ResetExtractStreamState(state, 333) + if state.ConfigHash != 333 { + t.Fatalf("config hash not updated") + } + if state.NCOPhase != 0 { + t.Fatalf("expected phase reset") + } + if state.PhaseCount != 0 { + t.Fatalf("expected phase count reset") + } + if len(state.ShiftedHistory) != 0 { + t.Fatalf("expected shifted history reset") + } + if state.Initialized { + t.Fatalf("expected initialized=false after reset") + } +} diff --git a/internal/demod/gpudemod/stream_state.go b/internal/demod/gpudemod/stream_state.go new file mode 100644 index 0000000..ef8d400 --- /dev/null +++ b/internal/demod/gpudemod/stream_state.go @@ -0,0 +1,60 @@ +package gpudemod + +import "sdr-wideband-suite/internal/dsp" + +func (r *BatchRunner) ResetSignalState(signalID int64) { + if r == nil || r.streamState == nil { + return + } + delete(r.streamState, signalID) +} + +func (r *BatchRunner) ResetAllSignalStates() { + if r == nil { + return + } + r.streamState = make(map[int64]*ExtractStreamState) +} + +func (r *BatchRunner) getOrInitExtractState(job StreamingExtractJob, sampleRate int) (*ExtractStreamState, error) { + if r == nil { + return nil, ErrUnavailable + } + if r.streamState == nil { + r.streamState = make(map[int64]*ExtractStreamState) + } + decim, err := ExactIntegerDecimation(sampleRate, job.OutRate) + if err != nil { + return nil, err + } + state := r.streamState[job.SignalID] + if state == nil { + state = &ExtractStreamState{SignalID: job.SignalID} + r.streamState[job.SignalID] = state + } + if state.ConfigHash != job.ConfigHash { + ResetExtractStreamState(state, job.ConfigHash) + } + state.Decim = decim + state.NumTaps = job.NumTaps + if state.NumTaps <= 0 { + state.NumTaps = 101 + } + cutoff := job.Bandwidth / 2 + if cutoff < 200 { + cutoff = 200 + } + base := dsp.LowpassFIR(cutoff, sampleRate, state.NumTaps) + state.BaseTaps = make([]float32, len(base)) + for i, v := range base { + state.BaseTaps[i] = float32(v) + } + state.PolyphaseTaps = BuildPolyphaseTapsPhaseMajor(state.BaseTaps, state.Decim) + if cap(state.ShiftedHistory) < maxInt(0, state.NumTaps-1) { + state.ShiftedHistory = make([]complex64, 0, maxInt(0, state.NumTaps-1)) + } else if state.ShiftedHistory == nil { + state.ShiftedHistory = make([]complex64, 0, maxInt(0, state.NumTaps-1)) + } + state.Initialized = true + return state, nil +} diff --git a/internal/demod/gpudemod/stream_state_test.go b/internal/demod/gpudemod/stream_state_test.go new file mode 100644 index 0000000..b86c5f5 --- /dev/null +++ b/internal/demod/gpudemod/stream_state_test.go @@ -0,0 +1,31 @@ +package gpudemod + +import "testing" + +func TestGetOrInitExtractStateInitializesPolyphaseAndHistory(t *testing.T) { + r := &BatchRunner{streamState: make(map[int64]*ExtractStreamState)} + job := StreamingExtractJob{ + SignalID: 7, + OffsetHz: 12500, + Bandwidth: 20000, + OutRate: 200000, + NumTaps: 65, + ConfigHash: 555, + } + state, err := r.getOrInitExtractState(job, 4000000) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if state.Decim != 20 { + t.Fatalf("unexpected decim: %d", state.Decim) + } + if len(state.BaseTaps) != 65 { + t.Fatalf("unexpected base taps len: %d", len(state.BaseTaps)) + } + if len(state.PolyphaseTaps) == 0 { + t.Fatalf("expected polyphase taps") + } + if cap(state.ShiftedHistory) < 64 { + t.Fatalf("expected shifted history capacity >= 64, got %d", cap(state.ShiftedHistory)) + } +} diff --git a/internal/demod/gpudemod/streaming_gpu_contract.go b/internal/demod/gpudemod/streaming_gpu_contract.go new file mode 100644 index 0000000..6a059c1 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_contract.go @@ -0,0 +1,38 @@ +package gpudemod + +type StreamingGPUExecutionMode string + +const ( + StreamingGPUExecUnavailable StreamingGPUExecutionMode = "unavailable" + StreamingGPUExecHostOracle StreamingGPUExecutionMode = "host_oracle" + StreamingGPUExecCUDA StreamingGPUExecutionMode = "cuda" +) + +type StreamingGPUInvocation struct { + SignalID int64 + OffsetHz float64 + OutRate int + Bandwidth float64 + SampleRate int + NumTaps int + Decim int + PhaseCountIn int + NCOPhaseIn float64 + HistoryLen int + BaseTaps []float32 + PolyphaseTaps []float32 + ShiftedHistory []complex64 + IQNew []complex64 +} + +type StreamingGPUExecutionResult struct { + SignalID int64 + Mode StreamingGPUExecutionMode + IQ []complex64 + Rate int + NOut int + PhaseCountOut int + NCOPhaseOut float64 + HistoryOut []complex64 + HistoryLenOut int +} diff --git a/internal/demod/gpudemod/streaming_gpu_exec.go b/internal/demod/gpudemod/streaming_gpu_exec.go new file mode 100644 index 0000000..6e74dc1 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_exec.go @@ -0,0 +1,27 @@ +package gpudemod + +// StreamingExtractGPUExec is the internal execution selector for the new +// production-path semantics. It intentionally keeps the public API stable while +// allowing the implementation to evolve from host-side oracle execution toward +// a real GPU polyphase path. +func (r *BatchRunner) StreamingExtractGPUExec(iqNew []complex64, jobs []StreamingExtractJob) ([]StreamingExtractResult, error) { + invocations, err := r.buildStreamingGPUInvocations(iqNew, jobs) + if err != nil { + return nil, err + } + if useGPUHostOracleExecution { + execResults, err := r.executeStreamingGPUHostOraclePrepared(invocations) + if err != nil { + return nil, err + } + return r.applyStreamingGPUExecutionResults(execResults), nil + } + if useGPUNativePreparedExecution { + execResults, err := r.executeStreamingGPUNativePrepared(invocations) + if err != nil { + return nil, err + } + return r.applyStreamingGPUExecutionResults(execResults), nil + } + return nil, ErrUnavailable +} diff --git a/internal/demod/gpudemod/streaming_gpu_exec_test.go b/internal/demod/gpudemod/streaming_gpu_exec_test.go new file mode 100644 index 0000000..3d4a5b6 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_exec_test.go @@ -0,0 +1,19 @@ +package gpudemod + +import "testing" + +func TestStreamingExtractGPUExecUnavailableByDefault(t *testing.T) { + r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} + job := StreamingExtractJob{ + SignalID: 1, + OffsetHz: 12500, + Bandwidth: 20000, + OutRate: 200000, + NumTaps: 65, + ConfigHash: 777, + } + _, err := r.StreamingExtractGPUExec(makeDeterministicIQ(2048), []StreamingExtractJob{job}) + if err == nil { + t.Fatalf("expected unavailable/disabled execution path by default") + } +} diff --git a/internal/demod/gpudemod/streaming_gpu_host_exec.go b/internal/demod/gpudemod/streaming_gpu_host_exec.go new file mode 100644 index 0000000..02d5953 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_host_exec.go @@ -0,0 +1,30 @@ +package gpudemod + +func (r *BatchRunner) executeStreamingGPUHostOraclePrepared(invocations []StreamingGPUInvocation) ([]StreamingGPUExecutionResult, error) { + results := make([]StreamingGPUExecutionResult, len(invocations)) + for i, inv := range invocations { + out, phase, phaseCount, hist := runStreamingPolyphaseHostCore( + inv.IQNew, + inv.SampleRate, + inv.OffsetHz, + inv.NCOPhaseIn, + inv.PhaseCountIn, + inv.NumTaps, + inv.Decim, + inv.ShiftedHistory, + inv.PolyphaseTaps, + ) + results[i] = StreamingGPUExecutionResult{ + SignalID: inv.SignalID, + Mode: StreamingGPUExecHostOracle, + IQ: out, + Rate: inv.OutRate, + NOut: len(out), + PhaseCountOut: phaseCount, + NCOPhaseOut: phase, + HistoryOut: hist, + HistoryLenOut: len(hist), + } + } + return results, nil +} diff --git a/internal/demod/gpudemod/streaming_gpu_host_oracle.go b/internal/demod/gpudemod/streaming_gpu_host_oracle.go new file mode 100644 index 0000000..aa2825e --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_host_oracle.go @@ -0,0 +1,49 @@ +package gpudemod + +// StreamingExtractGPUHostOracle is a temporary host-side execution of the intended +// streaming semantics using GPU-owned stream state. It is not the final GPU +// production implementation, but it allows the new production entrypoint to move +// from pure stub semantics toward real NEW-samples-only streaming behavior +// without reintroducing overlap+trim. +func (r *BatchRunner) StreamingExtractGPUHostOracle(iqNew []complex64, jobs []StreamingExtractJob) ([]StreamingExtractResult, error) { + if r == nil || r.eng == nil { + return nil, ErrUnavailable + } + results := make([]StreamingExtractResult, len(jobs)) + active := make(map[int64]struct{}, len(jobs)) + for i, job := range jobs { + active[job.SignalID] = struct{}{} + state, err := r.getOrInitExtractState(job, r.eng.sampleRate) + if err != nil { + return nil, err + } + out, phase, phaseCount, hist := runStreamingPolyphaseHostCore( + iqNew, + r.eng.sampleRate, + job.OffsetHz, + state.NCOPhase, + state.PhaseCount, + state.NumTaps, + state.Decim, + state.ShiftedHistory, + state.PolyphaseTaps, + ) + state.NCOPhase = phase + state.PhaseCount = phaseCount + state.ShiftedHistory = append(state.ShiftedHistory[:0], hist...) + results[i] = StreamingExtractResult{ + SignalID: job.SignalID, + IQ: out, + Rate: job.OutRate, + NOut: len(out), + PhaseCount: state.PhaseCount, + HistoryLen: len(state.ShiftedHistory), + } + } + for signalID := range r.streamState { + if _, ok := active[signalID]; !ok { + delete(r.streamState, signalID) + } + } + return results, nil +} diff --git a/internal/demod/gpudemod/streaming_gpu_host_oracle_test.go b/internal/demod/gpudemod/streaming_gpu_host_oracle_test.go new file mode 100644 index 0000000..b889ba5 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_host_oracle_test.go @@ -0,0 +1,35 @@ +package gpudemod + +import "testing" + +func TestStreamingGPUHostOracleComparableToCPUOracle(t *testing.T) { + r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} + job := StreamingExtractJob{ + SignalID: 1, + OffsetHz: 12500, + Bandwidth: 20000, + OutRate: 200000, + NumTaps: 65, + ConfigHash: 777, + } + iq := makeDeterministicIQ(16000) + gpuLike, err := r.StreamingExtractGPUHostOracle(iq, []StreamingExtractJob{job}) + if err != nil { + t.Fatalf("unexpected host-oracle error: %v", err) + } + oracleRunner := NewCPUOracleRunner(4000000) + oracle, err := oracleRunner.StreamingExtract(iq, []StreamingExtractJob{job}) + if err != nil { + t.Fatalf("unexpected oracle error: %v", err) + } + if len(gpuLike) != 1 || len(oracle) != 1 { + t.Fatalf("unexpected result lengths: gpuLike=%d oracle=%d", len(gpuLike), len(oracle)) + } + metrics, stats := CompareOracleAndGPUHostOracle(oracle[0], gpuLike[0]) + if stats.Count == 0 { + t.Fatalf("expected compare count > 0") + } + if metrics.RefMaxAbsErr > 1e-5 { + t.Fatalf("expected host-oracle path to match cpu oracle closely, got max abs err %f", metrics.RefMaxAbsErr) + } +} diff --git a/internal/demod/gpudemod/streaming_gpu_modes.go b/internal/demod/gpudemod/streaming_gpu_modes.go new file mode 100644 index 0000000..c5e858d --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_modes.go @@ -0,0 +1,4 @@ +package gpudemod + +const useGPUHostOracleExecution = false +const useGPUNativePreparedExecution = true diff --git a/internal/demod/gpudemod/streaming_gpu_native_prepare.go b/internal/demod/gpudemod/streaming_gpu_native_prepare.go new file mode 100644 index 0000000..a62b37e --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_native_prepare.go @@ -0,0 +1,115 @@ +//go:build cufft && windows + +package gpudemod + +/* +#cgo windows CFLAGS: -I"C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v13.2/include" +#include +typedef struct { float x; float y; } gpud_float2; +*/ +import "C" + +import ( + "math" + "unsafe" +) + +func (r *BatchRunner) executeStreamingGPUNativePrepared(invocations []StreamingGPUInvocation) ([]StreamingGPUExecutionResult, error) { + results := make([]StreamingGPUExecutionResult, len(invocations)) + for i, inv := range invocations { + phaseInc := -2.0 * math.Pi * inv.OffsetHz / float64(inv.SampleRate) + outCap := len(inv.IQNew)/maxInt(1, inv.Decim) + 2 + outHost := make([]complex64, outCap) + histCap := maxInt(0, inv.NumTaps-1) + histHost := make([]complex64, histCap) + var nOut C.int + var phaseCountOut C.int + var phaseEndOut C.double + + var dInNew, dHistIn, dOut, dHistOut unsafe.Pointer + var dTaps unsafe.Pointer + if len(inv.IQNew) > 0 { + if bridgeCudaMalloc(&dInNew, uintptr(len(inv.IQNew))*unsafe.Sizeof(C.gpud_float2{})) != 0 { + return nil, ErrUnavailable + } + defer bridgeCudaFree(dInNew) + if bridgeMemcpyH2D(dInNew, unsafe.Pointer(&inv.IQNew[0]), uintptr(len(inv.IQNew))*unsafe.Sizeof(complex64(0))) != 0 { + return nil, ErrUnavailable + } + } + if len(inv.ShiftedHistory) > 0 { + if bridgeCudaMalloc(&dHistIn, uintptr(len(inv.ShiftedHistory))*unsafe.Sizeof(C.gpud_float2{})) != 0 { + return nil, ErrUnavailable + } + defer bridgeCudaFree(dHistIn) + if bridgeMemcpyH2D(dHistIn, unsafe.Pointer(&inv.ShiftedHistory[0]), uintptr(len(inv.ShiftedHistory))*unsafe.Sizeof(complex64(0))) != 0 { + return nil, ErrUnavailable + } + } + if len(inv.PolyphaseTaps) > 0 { + if bridgeCudaMalloc(&dTaps, uintptr(len(inv.PolyphaseTaps))*unsafe.Sizeof(C.float(0))) != 0 { + return nil, ErrUnavailable + } + defer bridgeCudaFree(dTaps) + if bridgeMemcpyH2D(dTaps, unsafe.Pointer(&inv.PolyphaseTaps[0]), uintptr(len(inv.PolyphaseTaps))*unsafe.Sizeof(float32(0))) != 0 { + return nil, ErrUnavailable + } + } + if outCap > 0 { + if bridgeCudaMalloc(&dOut, uintptr(outCap)*unsafe.Sizeof(C.gpud_float2{})) != 0 { + return nil, ErrUnavailable + } + defer bridgeCudaFree(dOut) + } + if histCap > 0 { + if bridgeCudaMalloc(&dHistOut, uintptr(histCap)*unsafe.Sizeof(C.gpud_float2{})) != 0 { + return nil, ErrUnavailable + } + defer bridgeCudaFree(dHistOut) + } + + res := bridgeLaunchStreamingPolyphasePrepare( + (*C.gpud_float2)(dInNew), + len(inv.IQNew), + (*C.gpud_float2)(dHistIn), + len(inv.ShiftedHistory), + (*C.float)(dTaps), + len(inv.PolyphaseTaps), + inv.Decim, + inv.NumTaps, + inv.PhaseCountIn, + inv.NCOPhaseIn, + phaseInc, + (*C.gpud_float2)(dOut), + &nOut, + &phaseCountOut, + &phaseEndOut, + (*C.gpud_float2)(dHistOut), + ) + if res != 0 { + return nil, ErrUnavailable + } + if int(nOut) > 0 { + if bridgeMemcpyD2H(unsafe.Pointer(&outHost[0]), dOut, uintptr(int(nOut))*unsafe.Sizeof(complex64(0))) != 0 { + return nil, ErrUnavailable + } + } + if histCap > 0 { + if bridgeMemcpyD2H(unsafe.Pointer(&histHost[0]), dHistOut, uintptr(histCap)*unsafe.Sizeof(complex64(0))) != 0 { + return nil, ErrUnavailable + } + } + results[i] = StreamingGPUExecutionResult{ + SignalID: inv.SignalID, + Mode: StreamingGPUExecCUDA, + IQ: append([]complex64(nil), outHost[:int(nOut)]...), + Rate: inv.OutRate, + NOut: int(nOut), + PhaseCountOut: int(phaseCountOut), + NCOPhaseOut: float64(phaseEndOut), + HistoryOut: append([]complex64(nil), histHost...), + HistoryLenOut: histCap, + } + } + return results, nil +} diff --git a/internal/demod/gpudemod/streaming_gpu_native_prepare_stub.go b/internal/demod/gpudemod/streaming_gpu_native_prepare_stub.go new file mode 100644 index 0000000..3180b99 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_native_prepare_stub.go @@ -0,0 +1,8 @@ +//go:build !cufft || !windows + +package gpudemod + +func (r *BatchRunner) executeStreamingGPUNativePrepared(invocations []StreamingGPUInvocation) ([]StreamingGPUExecutionResult, error) { + _ = invocations + return nil, ErrUnavailable +} diff --git a/internal/demod/gpudemod/streaming_gpu_native_prepare_test.go b/internal/demod/gpudemod/streaming_gpu_native_prepare_test.go new file mode 100644 index 0000000..20b8c18 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_native_prepare_test.go @@ -0,0 +1,37 @@ +//go:build cufft && windows + +package gpudemod + +import "testing" + +func TestStreamingGPUNativePreparedComparableToCPUOracle(t *testing.T) { + r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} + job := StreamingExtractJob{ + SignalID: 1, + OffsetHz: 12500, + Bandwidth: 20000, + OutRate: 200000, + NumTaps: 65, + ConfigHash: 777, + } + iq := makeDeterministicIQ(16000) + gpuRes, err := r.StreamingExtractGPU(iq, []StreamingExtractJob{job}) + if err != nil { + t.Fatalf("unexpected native prepared GPU error: %v", err) + } + oracleRunner := NewCPUOracleRunner(4000000) + oracleRes, err := oracleRunner.StreamingExtract(iq, []StreamingExtractJob{job}) + if err != nil { + t.Fatalf("unexpected oracle error: %v", err) + } + if len(gpuRes) != 1 || len(oracleRes) != 1 { + t.Fatalf("unexpected result sizes: gpu=%d oracle=%d", len(gpuRes), len(oracleRes)) + } + metrics, stats := CompareOracleAndGPUHostOracle(oracleRes[0], gpuRes[0]) + if stats.Count == 0 { + t.Fatalf("expected compare count > 0") + } + if metrics.RefMaxAbsErr > 1e-4 { + t.Fatalf("native prepared path diverges too much from oracle: max abs err=%f", metrics.RefMaxAbsErr) + } +} diff --git a/internal/demod/gpudemod/streaming_gpu_prepare.go b/internal/demod/gpudemod/streaming_gpu_prepare.go new file mode 100644 index 0000000..74e4f69 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_prepare.go @@ -0,0 +1,59 @@ +package gpudemod + +func (r *BatchRunner) buildStreamingGPUInvocations(iqNew []complex64, jobs []StreamingExtractJob) ([]StreamingGPUInvocation, error) { + if r == nil || r.eng == nil { + return nil, ErrUnavailable + } + invocations := make([]StreamingGPUInvocation, len(jobs)) + active := make(map[int64]struct{}, len(jobs)) + for i, job := range jobs { + active[job.SignalID] = struct{}{} + state, err := r.getOrInitExtractState(job, r.eng.sampleRate) + if err != nil { + return nil, err + } + invocations[i] = StreamingGPUInvocation{ + SignalID: job.SignalID, + OffsetHz: job.OffsetHz, + OutRate: job.OutRate, + Bandwidth: job.Bandwidth, + SampleRate: r.eng.sampleRate, + NumTaps: state.NumTaps, + Decim: state.Decim, + PhaseCountIn: state.PhaseCount, + NCOPhaseIn: state.NCOPhase, + HistoryLen: len(state.ShiftedHistory), + BaseTaps: append([]float32(nil), state.BaseTaps...), + PolyphaseTaps: append([]float32(nil), state.PolyphaseTaps...), + ShiftedHistory: append([]complex64(nil), state.ShiftedHistory...), + IQNew: iqNew, + } + } + for signalID := range r.streamState { + if _, ok := active[signalID]; !ok { + delete(r.streamState, signalID) + } + } + return invocations, nil +} + +func (r *BatchRunner) applyStreamingGPUExecutionResults(results []StreamingGPUExecutionResult) []StreamingExtractResult { + out := make([]StreamingExtractResult, len(results)) + for i, res := range results { + state := r.streamState[res.SignalID] + if state != nil { + state.NCOPhase = res.NCOPhaseOut + state.PhaseCount = res.PhaseCountOut + state.ShiftedHistory = append(state.ShiftedHistory[:0], res.HistoryOut...) + } + out[i] = StreamingExtractResult{ + SignalID: res.SignalID, + IQ: res.IQ, + Rate: res.Rate, + NOut: res.NOut, + PhaseCount: res.PhaseCountOut, + HistoryLen: res.HistoryLenOut, + } + } + return out +} diff --git a/internal/demod/gpudemod/streaming_gpu_stub.go b/internal/demod/gpudemod/streaming_gpu_stub.go new file mode 100644 index 0000000..da85af4 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_stub.go @@ -0,0 +1,39 @@ +package gpudemod + +import "fmt" + +func updateShiftedHistory(prev []complex64, shiftedNew []complex64, numTaps int) []complex64 { + need := numTaps - 1 + if need <= 0 { + return nil + } + combined := append(append(make([]complex64, 0, len(prev)+len(shiftedNew)), prev...), shiftedNew...) + if len(combined) <= need { + out := make([]complex64, len(combined)) + copy(out, combined) + return out + } + out := make([]complex64, need) + copy(out, combined[len(combined)-need:]) + return out +} + +// StreamingExtractGPU is the planned production entry point for the stateful +// GPU extractor path. It intentionally exists early as an explicit boundary so +// callers can migrate away from legacy overlap+trim semantics. +// +// Current status: +// - validates jobs against persistent per-signal state ownership +// - enforces exact integer decimation +// - initializes per-signal state (config hash, taps, history capacity) +// - does not yet execute the final stateful polyphase GPU kernel path +func (r *BatchRunner) StreamingExtractGPU(iqNew []complex64, jobs []StreamingExtractJob) ([]StreamingExtractResult, error) { + if r == nil || r.eng == nil { + return nil, ErrUnavailable + } + if results, err := r.StreamingExtractGPUExec(iqNew, jobs); err == nil { + return results, nil + } + _, _ = iqNew, jobs + return nil, fmt.Errorf("StreamingExtractGPU not implemented yet: stateful polyphase GPU path pending") +} diff --git a/internal/demod/gpudemod/streaming_gpu_stub_test.go b/internal/demod/gpudemod/streaming_gpu_stub_test.go new file mode 100644 index 0000000..9889315 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_stub_test.go @@ -0,0 +1,53 @@ +package gpudemod + +import "testing" + +func TestStreamingGPUStubRemainsExplicitlyUnimplemented(t *testing.T) { + r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} + job := StreamingExtractJob{ + SignalID: 1, + OffsetHz: 12500, + Bandwidth: 20000, + OutRate: 200000, + NumTaps: 65, + ConfigHash: 777, + } + iq := makeDeterministicIQ(1000) + _, err := r.StreamingExtractGPU(iq, []StreamingExtractJob{job}) + if err == nil { + t.Fatalf("expected not-implemented error from GPU stub") + } +} + +func TestStreamingGPUHostOracleAdvancesState(t *testing.T) { + r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} + job := StreamingExtractJob{ + SignalID: 1, + OffsetHz: 12500, + Bandwidth: 20000, + OutRate: 200000, + NumTaps: 65, + ConfigHash: 777, + } + iq := makeDeterministicIQ(1000) + results, err := r.StreamingExtractGPUHostOracle(iq, []StreamingExtractJob{job}) + if err != nil { + t.Fatalf("unexpected host-oracle error: %v", err) + } + if len(results) != 1 { + t.Fatalf("expected 1 result, got %d", len(results)) + } + state := r.streamState[1] + if state == nil { + t.Fatalf("expected state to be initialized") + } + if state.NCOPhase == 0 { + t.Fatalf("expected phase to advance") + } + if len(state.ShiftedHistory) == 0 { + t.Fatalf("expected shifted history to be updated") + } + if results[0].NOut == 0 { + t.Fatalf("expected non-zero output count from host oracle path") + } +} diff --git a/internal/demod/gpudemod/streaming_host_core.go b/internal/demod/gpudemod/streaming_host_core.go new file mode 100644 index 0000000..f9b75aa --- /dev/null +++ b/internal/demod/gpudemod/streaming_host_core.go @@ -0,0 +1,64 @@ +package gpudemod + +import "math" + +func runStreamingPolyphaseHostCore( + iqNew []complex64, + sampleRate int, + offsetHz float64, + stateNCOPhase float64, + statePhaseCount int, + stateNumTaps int, + stateDecim int, + stateHistory []complex64, + polyphaseTaps []float32, +) ([]complex64, float64, int, []complex64) { + out := make([]complex64, 0, len(iqNew)/maxInt(1, stateDecim)+2) + phase := stateNCOPhase + phaseCount := statePhaseCount + hist := append([]complex64(nil), stateHistory...) + phaseLen := PolyphasePhaseLen(len(polyphaseTaps)/maxInt(1, stateDecim)*maxInt(1, stateDecim), stateDecim) + if phaseLen == 0 { + phaseLen = PolyphasePhaseLen(len(polyphaseTaps), stateDecim) + } + phaseInc := -2.0 * math.Pi * offsetHz / float64(sampleRate) + for _, x := range iqNew { + rot := complex64(complex(math.Cos(phase), math.Sin(phase))) + s := x * rot + hist = append(hist, s) + phaseCount++ + if phaseCount == stateDecim { + var y complex64 + for p := 0; p < stateDecim; p++ { + for k := 0; k < phaseLen; k++ { + idxTap := p*phaseLen + k + if idxTap >= len(polyphaseTaps) { + continue + } + tap := polyphaseTaps[idxTap] + if tap == 0 { + continue + } + srcBack := p + k*stateDecim + idx := len(hist) - 1 - srcBack + if idx < 0 { + continue + } + y += complex(tap, 0) * hist[idx] + } + } + out = append(out, y) + phaseCount = 0 + } + if len(hist) > stateNumTaps-1 { + hist = hist[len(hist)-(stateNumTaps-1):] + } + phase += phaseInc + if phase >= math.Pi { + phase -= 2 * math.Pi + } else if phase < -math.Pi { + phase += 2 * math.Pi + } + } + return out, phase, phaseCount, append([]complex64(nil), hist...) +} diff --git a/internal/demod/gpudemod/streaming_host_core_test.go b/internal/demod/gpudemod/streaming_host_core_test.go new file mode 100644 index 0000000..099c755 --- /dev/null +++ b/internal/demod/gpudemod/streaming_host_core_test.go @@ -0,0 +1,40 @@ +package gpudemod + +import "testing" + +func TestRunStreamingPolyphaseHostCoreMatchesCPUOraclePolyphase(t *testing.T) { + cfg := OracleHarnessConfig{ + SignalID: 1, + ConfigHash: 123, + NCOPhase: 0, + Decim: 20, + NumTaps: 65, + PhaseInc: 0.017, + } + state := MakeCPUOracleState(cfg) + iq := MakeDeterministicIQ(12000) + oracle := CPUOracleExtractPolyphase(iq, state, cfg.PhaseInc) + + state2 := MakeCPUOracleState(cfg) + out, phase, phaseCount, hist := runStreamingPolyphaseHostCore( + iq, + 4000000, + -cfg.PhaseInc*4000000/(2*3.141592653589793), + state2.NCOPhase, + state2.PhaseCount, + state2.NumTaps, + state2.Decim, + state2.ShiftedHistory, + state2.PolyphaseTaps, + ) + requireComplexSlicesClose(t, oracle, out, 1e-5) + if phase == 0 && len(iq) > 0 { + t.Fatalf("expected phase to advance") + } + if phaseCount < 0 || phaseCount >= state2.Decim { + t.Fatalf("unexpected phaseCount: %d", phaseCount) + } + if len(hist) == 0 { + t.Fatalf("expected history to be retained") + } +} diff --git a/internal/demod/gpudemod/streaming_oracle_extract.go b/internal/demod/gpudemod/streaming_oracle_extract.go new file mode 100644 index 0000000..eb89b7e --- /dev/null +++ b/internal/demod/gpudemod/streaming_oracle_extract.go @@ -0,0 +1,111 @@ +package gpudemod + +import ( + "fmt" + + "sdr-wideband-suite/internal/dsp" +) + +type CPUOracleRunner struct { + SampleRate int + States map[int64]*CPUOracleState +} + +func (r *CPUOracleRunner) ResetAllStates() { + if r == nil { + return + } + r.States = make(map[int64]*CPUOracleState) +} + +func NewCPUOracleRunner(sampleRate int) *CPUOracleRunner { + return &CPUOracleRunner{ + SampleRate: sampleRate, + States: make(map[int64]*CPUOracleState), + } +} + +func (r *CPUOracleRunner) ResetSignalState(signalID int64) { + if r == nil || r.States == nil { + return + } + delete(r.States, signalID) +} + +func (r *CPUOracleRunner) getOrInitState(job StreamingExtractJob) (*CPUOracleState, error) { + if r == nil { + return nil, fmt.Errorf("nil CPUOracleRunner") + } + if r.States == nil { + r.States = make(map[int64]*CPUOracleState) + } + decim, err := ExactIntegerDecimation(r.SampleRate, job.OutRate) + if err != nil { + return nil, err + } + state := r.States[job.SignalID] + if state == nil { + state = &CPUOracleState{SignalID: job.SignalID} + r.States[job.SignalID] = state + } + ResetCPUOracleStateIfConfigChanged(state, job.ConfigHash) + state.Decim = decim + state.NumTaps = job.NumTaps + if state.NumTaps <= 0 { + state.NumTaps = 101 + } + cutoff := job.Bandwidth / 2 + if cutoff < 200 { + cutoff = 200 + } + base := dsp.LowpassFIR(cutoff, r.SampleRate, state.NumTaps) + state.BaseTaps = make([]float32, len(base)) + for i, v := range base { + state.BaseTaps[i] = float32(v) + } + state.PolyphaseTaps = BuildPolyphaseTapsPhaseMajor(state.BaseTaps, state.Decim) + if state.ShiftedHistory == nil { + state.ShiftedHistory = make([]complex64, 0, maxInt(0, state.NumTaps-1)) + } + return state, nil +} + +func (r *CPUOracleRunner) StreamingExtract(iqNew []complex64, jobs []StreamingExtractJob) ([]StreamingExtractResult, error) { + results := make([]StreamingExtractResult, len(jobs)) + active := make(map[int64]struct{}, len(jobs)) + for i, job := range jobs { + active[job.SignalID] = struct{}{} + state, err := r.getOrInitState(job) + if err != nil { + return nil, err + } + out, phase, phaseCount, hist := runStreamingPolyphaseHostCore( + iqNew, + r.SampleRate, + job.OffsetHz, + state.NCOPhase, + state.PhaseCount, + state.NumTaps, + state.Decim, + state.ShiftedHistory, + state.PolyphaseTaps, + ) + state.NCOPhase = phase + state.PhaseCount = phaseCount + state.ShiftedHistory = append(state.ShiftedHistory[:0], hist...) + results[i] = StreamingExtractResult{ + SignalID: job.SignalID, + IQ: out, + Rate: job.OutRate, + NOut: len(out), + PhaseCount: state.PhaseCount, + HistoryLen: len(state.ShiftedHistory), + } + } + for signalID := range r.States { + if _, ok := active[signalID]; !ok { + delete(r.States, signalID) + } + } + return results, nil +} diff --git a/internal/demod/gpudemod/streaming_types.go b/internal/demod/gpudemod/streaming_types.go new file mode 100644 index 0000000..c6ee6b7 --- /dev/null +++ b/internal/demod/gpudemod/streaming_types.go @@ -0,0 +1,54 @@ +package gpudemod + +import ( + "fmt" + "hash/fnv" +) + +type StreamingExtractJob struct { + SignalID int64 + OffsetHz float64 + Bandwidth float64 + OutRate int + NumTaps int + ConfigHash uint64 +} + +type StreamingExtractResult struct { + SignalID int64 + IQ []complex64 + Rate int + NOut int + PhaseCount int + HistoryLen int +} + +type ExtractStreamState struct { + SignalID int64 + ConfigHash uint64 + NCOPhase float64 + Decim int + PhaseCount int + NumTaps int + ShiftedHistory []complex64 + BaseTaps []float32 + PolyphaseTaps []float32 + Initialized bool +} + +func ResetExtractStreamState(state *ExtractStreamState, cfgHash uint64) { + if state == nil { + return + } + state.ConfigHash = cfgHash + state.NCOPhase = 0 + state.PhaseCount = 0 + state.ShiftedHistory = state.ShiftedHistory[:0] + state.Initialized = false +} + +func StreamingConfigHash(signalID int64, offsetHz float64, bandwidth float64, outRate int, numTaps int, sampleRate int) uint64 { + h := fnv.New64a() + _, _ = h.Write([]byte(fmt.Sprintf("sig=%d|off=%.9f|bw=%.9f|out=%d|taps=%d|sr=%d", signalID, offsetHz, bandwidth, outRate, numTaps, sampleRate))) + return h.Sum64() +} diff --git a/internal/demod/gpudemod/test_harness.go b/internal/demod/gpudemod/test_harness.go new file mode 100644 index 0000000..2a74d0b --- /dev/null +++ b/internal/demod/gpudemod/test_harness.go @@ -0,0 +1,78 @@ +package gpudemod + +import ( + "math" +) + +type OracleHarnessConfig struct { + SignalID int64 + ConfigHash uint64 + NCOPhase float64 + Decim int + NumTaps int + PhaseInc float64 +} + +func MakeDeterministicIQ(n int) []complex64 { + out := make([]complex64, n) + for i := 0; i < n; i++ { + a := 0.017 * float64(i) + b := 0.031 * float64(i) + out[i] = complex64(complex(math.Cos(a)+0.2*math.Cos(b), math.Sin(a)+0.15*math.Sin(b))) + } + return out +} + +func MakeToneIQ(n int, phaseInc float64) []complex64 { + out := make([]complex64, n) + phase := 0.0 + for i := 0; i < n; i++ { + out[i] = complex64(complex(math.Cos(phase), math.Sin(phase))) + phase += phaseInc + } + return out +} + +func MakeLowpassTaps(n int) []float32 { + out := make([]float32, n) + for i := range out { + out[i] = 1.0 / float32(n) + } + return out +} + +func MakeCPUOracleState(cfg OracleHarnessConfig) *CPUOracleState { + taps := MakeLowpassTaps(cfg.NumTaps) + return &CPUOracleState{ + SignalID: cfg.SignalID, + ConfigHash: cfg.ConfigHash, + NCOPhase: cfg.NCOPhase, + Decim: cfg.Decim, + PhaseCount: 0, + NumTaps: cfg.NumTaps, + ShiftedHistory: make([]complex64, 0, maxInt(0, cfg.NumTaps-1)), + BaseTaps: taps, + PolyphaseTaps: BuildPolyphaseTapsPhaseMajor(taps, cfg.Decim), + } +} + +func RunChunkedCPUOraclePolyphase(all []complex64, chunkSizes []int, mkState func() *CPUOracleState, phaseInc float64) []complex64 { + state := mkState() + out := make([]complex64, 0) + pos := 0 + for _, n := range chunkSizes { + if pos >= len(all) { + break + } + end := pos + n + if end > len(all) { + end = len(all) + } + out = append(out, CPUOracleExtractPolyphase(all[pos:end], state, phaseInc)...) + pos = end + } + if pos < len(all) { + out = append(out, CPUOracleExtractPolyphase(all[pos:], state, phaseInc)...) + } + return out +} diff --git a/internal/demod/gpudemod/test_harness_test.go b/internal/demod/gpudemod/test_harness_test.go new file mode 100644 index 0000000..c4621b1 --- /dev/null +++ b/internal/demod/gpudemod/test_harness_test.go @@ -0,0 +1,39 @@ +package gpudemod + +import "testing" + +func requireComplexSlicesCloseHarness(t *testing.T, a []complex64, b []complex64, tol float64) { + t.Helper() + if len(a) != len(b) { + t.Fatalf("length mismatch: %d vs %d", len(a), len(b)) + } + for i := range a { + d := CompareComplexSlices([]complex64{a[i]}, []complex64{b[i]}) + if d.MaxAbsErr > tol { + t.Fatalf("slice mismatch at %d: %v vs %v (tol=%f)", i, a[i], b[i], tol) + } + } +} + +func TestHarnessChunkedCPUOraclePolyphase(t *testing.T) { + cfg := OracleHarnessConfig{ + SignalID: 1, + ConfigHash: 123, + NCOPhase: 0, + Decim: 20, + NumTaps: 65, + PhaseInc: 0.017, + } + iq := MakeDeterministicIQ(150000) + mk := func() *CPUOracleState { return MakeCPUOracleState(cfg) } + mono := CPUOracleExtractPolyphase(iq, mk(), cfg.PhaseInc) + chunked := RunChunkedCPUOraclePolyphase(iq, []int{4096, 5000, 8192, 27307}, mk, cfg.PhaseInc) + requireComplexSlicesCloseHarness(t, mono, chunked, 1e-5) +} + +func TestHarnessToneIQ(t *testing.T) { + iq := MakeToneIQ(1024, 0.05) + if len(iq) != 1024 { + t.Fatalf("unexpected tone iq length: %d", len(iq)) + } +} diff --git a/internal/demod/gpudemod/windows_bridge.go b/internal/demod/gpudemod/windows_bridge.go index 3371be7..2ff9e98 100644 --- a/internal/demod/gpudemod/windows_bridge.go +++ b/internal/demod/gpudemod/windows_bridge.go @@ -26,6 +26,7 @@ typedef int (__stdcall *gpud_launch_decimate_stream_fn)(const gpud_float2* in, g typedef int (__stdcall *gpud_launch_decimate_fn)(const gpud_float2* in, gpud_float2* out, int n_out, int factor); typedef int (__stdcall *gpud_launch_am_envelope_fn)(const gpud_float2* in, float* out, int n); typedef int (__stdcall *gpud_launch_ssb_product_fn)(const gpud_float2* in, float* out, int n, double phase_inc, double phase_start); +typedef int (__stdcall *gpud_launch_streaming_polyphase_prepare_fn)(const gpud_float2* in_new, int n_new, const gpud_float2* history_in, int history_len, const float* polyphase_taps, int polyphase_len, int decim, int num_taps, int phase_count_in, double phase_start, double phase_inc, gpud_float2* out, int* n_out, int* phase_count_out, double* phase_end_out, gpud_float2* history_out); static HMODULE gpud_mod = NULL; static gpud_stream_create_fn gpud_p_stream_create = NULL; @@ -42,6 +43,7 @@ static gpud_launch_decimate_stream_fn gpud_p_launch_decimate_stream = NULL; static gpud_launch_decimate_fn gpud_p_launch_decimate = NULL; static gpud_launch_am_envelope_fn gpud_p_launch_am_envelope = NULL; static gpud_launch_ssb_product_fn gpud_p_launch_ssb_product = NULL; +static gpud_launch_streaming_polyphase_prepare_fn gpud_p_launch_streaming_polyphase_prepare = NULL; static int gpud_cuda_malloc(void **ptr, size_t bytes) { return (int)cudaMalloc(ptr, bytes); } static int gpud_cuda_free(void *ptr) { return (int)cudaFree(ptr); } @@ -67,6 +69,7 @@ static int gpud_load_library(const char* path) { gpud_p_launch_decimate = (gpud_launch_decimate_fn)GetProcAddress(gpud_mod, "gpud_launch_decimate_cuda"); gpud_p_launch_am_envelope = (gpud_launch_am_envelope_fn)GetProcAddress(gpud_mod, "gpud_launch_am_envelope_cuda"); gpud_p_launch_ssb_product = (gpud_launch_ssb_product_fn)GetProcAddress(gpud_mod, "gpud_launch_ssb_product_cuda"); + gpud_p_launch_streaming_polyphase_prepare = (gpud_launch_streaming_polyphase_prepare_fn)GetProcAddress(gpud_mod, "gpud_launch_streaming_polyphase_prepare_cuda"); if (!gpud_p_stream_create || !gpud_p_stream_destroy || !gpud_p_stream_sync || !gpud_p_upload_fir_taps || !gpud_p_launch_freq_shift_stream || !gpud_p_launch_freq_shift || !gpud_p_launch_fm_discrim || !gpud_p_launch_fir_stream || !gpud_p_launch_fir || !gpud_p_launch_decimate_stream || !gpud_p_launch_decimate || !gpud_p_launch_am_envelope || !gpud_p_launch_ssb_product) { FreeLibrary(gpud_mod); gpud_mod = NULL; @@ -89,6 +92,7 @@ static int gpud_launch_decimate_stream(gpud_float2 *in, gpud_float2 *out, int n_ static int gpud_launch_decimate(gpud_float2 *in, gpud_float2 *out, int n_out, int factor) { if (!gpud_p_launch_decimate) return -1; return gpud_p_launch_decimate(in, out, n_out, factor); } static int gpud_launch_am_envelope(gpud_float2 *in, float *out, int n) { if (!gpud_p_launch_am_envelope) return -1; return gpud_p_launch_am_envelope(in, out, n); } static int gpud_launch_ssb_product(gpud_float2 *in, float *out, int n, double phase_inc, double phase_start) { if (!gpud_p_launch_ssb_product) return -1; return gpud_p_launch_ssb_product(in, out, n, phase_inc, phase_start); } +static int gpud_launch_streaming_polyphase_prepare(gpud_float2 *in_new, int n_new, gpud_float2 *history_in, int history_len, float *polyphase_taps, int polyphase_len, int decim, int num_taps, int phase_count_in, double phase_start, double phase_inc, gpud_float2 *out, int *n_out, int *phase_count_out, double *phase_end_out, gpud_float2 *history_out) { if (!gpud_p_launch_streaming_polyphase_prepare) return -1; return gpud_p_launch_streaming_polyphase_prepare(in_new, n_new, history_in, history_len, polyphase_taps, polyphase_len, decim, num_taps, phase_count_in, phase_start, phase_inc, out, n_out, phase_count_out, phase_end_out, history_out); } */ import "C" @@ -131,6 +135,9 @@ func bridgeLaunchAMEnvelope(in *C.gpud_float2, out *C.float, n int) int { return func bridgeLaunchSSBProduct(in *C.gpud_float2, out *C.float, n int, phaseInc float64, phaseStart float64) int { return int(C.gpud_launch_ssb_product(in, out, C.int(n), C.double(phaseInc), C.double(phaseStart))) } +func bridgeLaunchStreamingPolyphasePrepare(inNew *C.gpud_float2, nNew int, historyIn *C.gpud_float2, historyLen int, polyphaseTaps *C.float, polyphaseLen int, decim int, numTaps int, phaseCountIn int, phaseStart float64, phaseInc float64, out *C.gpud_float2, nOut *C.int, phaseCountOut *C.int, phaseEndOut *C.double, historyOut *C.gpud_float2) int { + return int(C.gpud_launch_streaming_polyphase_prepare(inNew, C.int(nNew), historyIn, C.int(historyLen), polyphaseTaps, C.int(polyphaseLen), C.int(decim), C.int(numTaps), C.int(phaseCountIn), C.double(phaseStart), C.double(phaseInc), out, nOut, phaseCountOut, phaseEndOut, historyOut)) +} func bridgeStreamCreate() (streamHandle, int) { var s C.gpud_stream_handle res := int(C.gpud_stream_create(&s))