diff --git a/build-gpudemod-dll.ps1 b/build-gpudemod-dll.ps1 index da19b08..4e095c2 100644 --- a/build-gpudemod-dll.ps1 +++ b/build-gpudemod-dll.ps1 @@ -16,12 +16,25 @@ if (!(Test-Path $outDir)) { New-Item -ItemType Directory -Path $outDir | Out-Nul Remove-Item $dll,$lib,$exp -Force -ErrorAction SilentlyContinue -$cmd = @" -call "$vcvars" && "$nvcc" -shared "$src" -o "$dll" -cudart=hybrid -Xcompiler "/MD" -arch=sm_75 -gencode arch=compute_75,code=sm_75 -gencode arch=compute_80,code=sm_80 -gencode arch=compute_86,code=sm_86 -gencode arch=compute_89,code=sm_89 -gencode arch=compute_90,code=sm_90 +$bat = Join-Path $env:TEMP 'build-gpudemod-dll.bat' +$batContent = @" +@echo off +call "$vcvars" +if errorlevel 1 exit /b %errorlevel% +"$nvcc" -shared "$src" -o "$dll" -cudart=hybrid -Xcompiler "/MD" -arch=sm_75 ^ + -gencode arch=compute_75,code=sm_75 ^ + -gencode arch=compute_80,code=sm_80 ^ + -gencode arch=compute_86,code=sm_86 ^ + -gencode arch=compute_89,code=sm_89 ^ + -gencode arch=compute_90,code=sm_90 +exit /b %errorlevel% "@ +Set-Content -Path $bat -Value $batContent -Encoding ASCII Write-Host 'Building gpudemod CUDA DLL...' -ForegroundColor Cyan -cmd.exe /c $cmd -if ($LASTEXITCODE -ne 0) { throw 'gpudemod DLL build failed' } +cmd.exe /c ""$bat"" +$exitCode = $LASTEXITCODE +Remove-Item $bat -Force -ErrorAction SilentlyContinue +if ($exitCode -ne 0) { throw 'gpudemod DLL build failed' } Write-Host "Built: $dll" -ForegroundColor Green diff --git a/build-sdrplay.ps1 b/build-sdrplay.ps1 index 5f5e2bb..89c5507 100644 --- a/build-sdrplay.ps1 +++ b/build-sdrplay.ps1 @@ -21,10 +21,13 @@ if (Test-Path $sdrplayBin) { $env:PATH = "$sdrplayBin;" + $env:PATH } # CUDA runtime / cuFFT $cudaInc = 'C:\CUDA\include' $cudaBin = 'C:\CUDA\bin' +$cudaBinX64 = 'C:\CUDA\bin\x64' if (-not (Test-Path $cudaInc)) { $cudaInc = 'C:\PROGRA~1\NVIDIA~2\CUDA\v13.2\include' } if (-not (Test-Path $cudaBin)) { $cudaBin = 'C:\PROGRA~1\NVIDIA~2\CUDA\v13.2\bin' } +if (-not (Test-Path $cudaBinX64)) { $cudaBinX64 = 'C:\PROGRA~1\NVIDIA~2\CUDA\v13.2\bin\x64' } $cudaMingw = Join-Path $PSScriptRoot 'cuda-mingw' if (Test-Path $cudaInc) { $env:CGO_CFLAGS = "$env:CGO_CFLAGS -I$cudaInc" } +if (Test-Path $cudaBinX64) { $env:PATH = "$cudaBinX64;" + $env:PATH } if (Test-Path $cudaBin) { $env:PATH = "$cudaBin;" + $env:PATH } if (Test-Path $cudaMingw) { $env:CGO_LDFLAGS = "$env:CGO_LDFLAGS -L$cudaMingw -lcudart64_13 -lcufft64_12 -lkernel32" } @@ -68,8 +71,11 @@ if ($dllSrc) { } $cudartCandidates = @( + (Join-Path $cudaBinX64 'cudart64_13.dll'), (Join-Path $cudaBin 'cudart64_13.dll'), + 'C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v13.2\bin\x64\cudart64_13.dll', 'C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v13.2\bin\cudart64_13.dll', + 'C:\CUDA\bin\x64\cudart64_13.dll', 'C:\CUDA\bin\cudart64_13.dll' ) $cudartSrc = $cudartCandidates | Where-Object { $_ -and (Test-Path $_) } | Select-Object -First 1 diff --git a/internal/demod/gpudemod/batch_runner.go b/internal/demod/gpudemod/batch_runner.go index 4f8c5d0..3933c1b 100644 --- a/internal/demod/gpudemod/batch_runner.go +++ b/internal/demod/gpudemod/batch_runner.go @@ -15,6 +15,7 @@ type BatchRunner struct { slotBufs []slotBuffers slotBufSize int // number of IQ samples the slot buffers were allocated for streamState map[int64]*ExtractStreamState + nativeState map[int64]*nativeStreamingSignalState } func NewBatchRunner(maxSamples int, sampleRate int) (*BatchRunner, error) { @@ -22,7 +23,11 @@ func NewBatchRunner(maxSamples int, sampleRate int) (*BatchRunner, error) { if err != nil { return nil, err } - return &BatchRunner{eng: eng, streamState: make(map[int64]*ExtractStreamState)}, nil + return &BatchRunner{ + eng: eng, + streamState: make(map[int64]*ExtractStreamState), + nativeState: make(map[int64]*nativeStreamingSignalState), + }, nil } func (r *BatchRunner) Close() { @@ -30,10 +35,12 @@ func (r *BatchRunner) Close() { return } r.freeSlotBuffers() + r.freeAllNativeStreamingStates() r.eng.Close() r.eng = nil r.slots = nil r.streamState = nil + r.nativeState = nil } func (r *BatchRunner) prepare(jobs []ExtractJob) { diff --git a/internal/demod/gpudemod/native/exports.cu b/internal/demod/gpudemod/native/exports.cu index d3675bc..d2bceae 100644 --- a/internal/demod/gpudemod/native/exports.cu +++ b/internal/demod/gpudemod/native/exports.cu @@ -11,6 +11,10 @@ typedef void* gpud_stream_handle; +static __forceinline__ int gpud_max_i(int a, int b) { + return a > b ? a : b; +} + GPUD_API int GPUD_CALL gpud_stream_create(gpud_stream_handle* out) { if (!out) return -1; cudaStream_t stream; @@ -321,6 +325,34 @@ GPUD_API int GPUD_CALL gpud_launch_ssb_product_cuda( return (int)cudaGetLastError(); } +__global__ void gpud_streaming_polyphase_accum_kernel( + const float2* __restrict__ history_state, + int history_len, + const float2* __restrict__ shifted_new, + int n_new, + const float* __restrict__ polyphase_taps, + int polyphase_len, + int decim, + int phase_len, + int start_idx, + int n_out, + float2* __restrict__ out +); + +__global__ void gpud_streaming_history_tail_kernel( + const float2* __restrict__ history_state, + int history_len, + const float2* __restrict__ shifted_new, + int n_new, + int keep, + float2* __restrict__ history_out +); + +static __forceinline__ double gpud_reduce_phase(double phase); + +// Transitional legacy entrypoint retained for bring-up and comparison. +// The production-native streaming path is gpud_launch_streaming_polyphase_stateful_cuda, +// which preserves per-signal carry state across NEW-samples-only chunks. GPUD_API int GPUD_CALL gpud_launch_streaming_polyphase_prepare_cuda( const float2* in_new, int n_new, @@ -339,113 +371,261 @@ GPUD_API int GPUD_CALL gpud_launch_streaming_polyphase_prepare_cuda( double* phase_end_out, float2* history_out ) { - if (!in_new || n_new < 0 || !polyphase_taps || polyphase_len <= 0 || decim <= 0 || num_taps <= 0) return -1; + if (n_new < 0 || !polyphase_taps || polyphase_len <= 0 || decim <= 0 || num_taps <= 0) return -1; const int phase_len = (num_taps + decim - 1) / decim; if (polyphase_len < decim * phase_len) return -2; - const int combined_len = history_len + n_new; - float2* shifted = NULL; - float2* combined = NULL; - cudaError_t err = cudaMalloc((void**)&shifted, (size_t)max(1, n_new) * sizeof(float2)); - if (err != cudaSuccess) return (int)err; - err = cudaMalloc((void**)&combined, (size_t)max(1, combined_len) * sizeof(float2)); - if (err != cudaSuccess) { - cudaFree(shifted); - return (int)err; - } + const int keep = num_taps > 1 ? num_taps - 1 : 0; + int clamped_history_len = history_len; + if (clamped_history_len < 0) clamped_history_len = 0; + if (clamped_history_len > keep) clamped_history_len = keep; + if (clamped_history_len > 0 && !history_in) return -5; - const int block = 256; - const int grid_shift = (n_new + block - 1) / block; + float2* shifted = NULL; + cudaError_t err = cudaSuccess; if (n_new > 0) { + if (!in_new) return -3; + err = cudaMalloc((void**)&shifted, (size_t)gpud_max_i(1, n_new) * sizeof(float2)); + if (err != cudaSuccess) return (int)err; + const int block = 256; + const int grid_shift = (n_new + block - 1) / block; gpud_freq_shift_kernel<<>>(in_new, shifted, n_new, phase_inc, phase_start); err = cudaGetLastError(); if (err != cudaSuccess) { cudaFree(shifted); - cudaFree(combined); return (int)err; } } - if (history_len > 0 && history_in) { - err = cudaMemcpy(combined, history_in, (size_t)history_len * sizeof(float2), cudaMemcpyDeviceToDevice); - if (err != cudaSuccess) { + int phase_count = phase_count_in; + if (phase_count < 0) phase_count = 0; + if (phase_count >= decim) phase_count %= decim; + const int total_phase = phase_count + n_new; + const int out_count = total_phase / decim; + if (out_count > 0) { + if (!out) { cudaFree(shifted); - cudaFree(combined); - return (int)err; + return -4; } - } - if (n_new > 0) { - err = cudaMemcpy(combined + history_len, shifted, (size_t)n_new * sizeof(float2), cudaMemcpyDeviceToDevice); + const int block = 256; + const int grid = (out_count + block - 1) / block; + const int start_idx = decim - phase_count - 1; + gpud_streaming_polyphase_accum_kernel<<>>( + history_in, + clamped_history_len, + shifted, + n_new, + polyphase_taps, + polyphase_len, + decim, + phase_len, + start_idx, + out_count, + out + ); + err = cudaGetLastError(); if (err != cudaSuccess) { cudaFree(shifted); - cudaFree(combined); return (int)err; } } - int out_count = 0; - int phase_count = phase_count_in; - for (int i = 0; i < n_new; ++i) { - phase_count++; - if (phase_count == decim) { - float2 acc = make_float2(0.0f, 0.0f); - int newest = history_len + i; - for (int p = 0; p < decim; ++p) { - for (int k = 0; k < phase_len; ++k) { - int tap_idx = p * phase_len + k; - if (tap_idx >= polyphase_len) continue; - float tap; - err = cudaMemcpy(&tap, polyphase_taps + tap_idx, sizeof(float), cudaMemcpyDeviceToHost); - if (err != cudaSuccess) { - cudaFree(shifted); - cudaFree(combined); - return (int)err; - } - if (tap == 0.0f) continue; - int src_back = p + k * decim; - int src_idx = newest - src_back; - if (src_idx < 0) continue; - float2 sample; - err = cudaMemcpy(&sample, combined + src_idx, sizeof(float2), cudaMemcpyDeviceToHost); - if (err != cudaSuccess) { - cudaFree(shifted); - cudaFree(combined); - return (int)err; - } - acc.x += sample.x * tap; - acc.y += sample.y * tap; - } - } - err = cudaMemcpy(out + out_count, &acc, sizeof(float2), cudaMemcpyHostToDevice); + if (history_out && keep > 0) { + const int new_history_len = clamped_history_len + n_new < keep ? clamped_history_len + n_new : keep; + if (new_history_len > 0) { + const int block = 256; + const int grid = (new_history_len + block - 1) / block; + gpud_streaming_history_tail_kernel<<>>( + history_in, + clamped_history_len, + shifted, + n_new, + new_history_len, + history_out + ); + err = cudaGetLastError(); if (err != cudaSuccess) { cudaFree(shifted); - cudaFree(combined); return (int)err; } - out_count++; - phase_count = 0; } } - const int keep = num_taps > 1 ? num_taps - 1 : 0; - if (history_out && keep > 0) { - int copy = keep; - if (combined_len < copy) copy = combined_len; - if (copy > 0) { - err = cudaMemcpy(history_out, combined + (combined_len - copy), (size_t)copy * sizeof(float2), cudaMemcpyDeviceToDevice); - if (err != cudaSuccess) { - cudaFree(shifted); - cudaFree(combined); - return (int)err; - } + if (n_out) *n_out = out_count; + if (phase_count_out) *phase_count_out = total_phase % decim; + if (phase_end_out) *phase_end_out = gpud_reduce_phase(phase_start + phase_inc * (double)n_new); + + if (shifted) cudaFree(shifted); + return 0; +} + +static __device__ __forceinline__ float2 gpud_stream_sample_at( + const float2* __restrict__ history_state, + int history_len, + const float2* __restrict__ shifted_new, + int n_new, + int idx +) { + if (idx < 0) return make_float2(0.0f, 0.0f); + if (idx < history_len) return history_state[idx]; + int shifted_idx = idx - history_len; + if (shifted_idx < 0 || shifted_idx >= n_new) return make_float2(0.0f, 0.0f); + return shifted_new[shifted_idx]; +} + +__global__ void gpud_streaming_polyphase_accum_kernel( + const float2* __restrict__ history_state, + int history_len, + const float2* __restrict__ shifted_new, + int n_new, + const float* __restrict__ polyphase_taps, + int polyphase_len, + int decim, + int phase_len, + int start_idx, + int n_out, + float2* __restrict__ out +) { + int out_idx = blockIdx.x * blockDim.x + threadIdx.x; + if (out_idx >= n_out) return; + + int newest = history_len + start_idx + out_idx * decim; + float acc_r = 0.0f; + float acc_i = 0.0f; + for (int p = 0; p < decim; ++p) { + for (int k = 0; k < phase_len; ++k) { + int tap_idx = p * phase_len + k; + if (tap_idx >= polyphase_len) continue; + float tap = polyphase_taps[tap_idx]; + if (tap == 0.0f) continue; + int src_back = p + k * decim; + int src_idx = newest - src_back; + float2 sample = gpud_stream_sample_at(history_state, history_len, shifted_new, n_new, src_idx); + acc_r += sample.x * tap; + acc_i += sample.y * tap; } } + out[out_idx] = make_float2(acc_r, acc_i); +} - if (n_out) *n_out = out_count; - if (phase_count_out) *phase_count_out = phase_count; - if (phase_end_out) *phase_end_out = phase_start + phase_inc * (double)n_new; +__global__ void gpud_streaming_history_tail_kernel( + const float2* __restrict__ history_state, + int history_len, + const float2* __restrict__ shifted_new, + int n_new, + int keep, + float2* __restrict__ history_out +) { + int idx = blockIdx.x * blockDim.x + threadIdx.x; + if (idx >= keep) return; + int combined_len = history_len + n_new; + int src_idx = combined_len - keep + idx; + history_out[idx] = gpud_stream_sample_at(history_state, history_len, shifted_new, n_new, src_idx); +} + +static __forceinline__ double gpud_reduce_phase(double phase) { + const double TWO_PI = 6.283185307179586; + return phase - rint(phase / TWO_PI) * TWO_PI; +} + +// Production-native candidate entrypoint for the stateful streaming extractor. +// Callers provide only NEW samples; overlap+trim is intentionally not part of this path. +GPUD_API int GPUD_CALL gpud_launch_streaming_polyphase_stateful_cuda( + const float2* in_new, + int n_new, + float2* shifted_new_tmp, + const float* polyphase_taps, + int polyphase_len, + int decim, + int num_taps, + float2* history_state, + float2* history_scratch, + int history_cap, + int* history_len_io, + int* phase_count_state, + double* phase_state, + double phase_inc, + float2* out, + int out_cap, + int* n_out +) { + if (!polyphase_taps || decim <= 0 || num_taps <= 0 || !history_len_io || !phase_count_state || !phase_state || !n_out) return -10; + if (n_new < 0 || out_cap < 0 || history_cap < 0) return -11; + const int phase_len = (num_taps + decim - 1) / decim; + if (polyphase_len < decim * phase_len) return -12; + + int history_len = *history_len_io; + if (history_len < 0) history_len = 0; + if (history_len > history_cap) history_len = history_cap; + + int phase_count = *phase_count_state; + if (phase_count < 0) phase_count = 0; + if (phase_count >= decim) phase_count %= decim; + + double phase_start = *phase_state; + if (n_new > 0) { + if (!in_new || !shifted_new_tmp) return -13; + const int block = 256; + const int grid = (n_new + block - 1) / block; + gpud_freq_shift_kernel<<>>(in_new, shifted_new_tmp, n_new, phase_inc, phase_start); + cudaError_t err = cudaGetLastError(); + if (err != cudaSuccess) return (int)err; + } + + const int total_phase = phase_count + n_new; + const int out_count = total_phase / decim; + if (out_count > out_cap) return -14; + + if (out_count > 0) { + if (!out) return -15; + const int block = 256; + const int grid = (out_count + block - 1) / block; + const int start_idx = decim - phase_count - 1; + gpud_streaming_polyphase_accum_kernel<<>>( + history_state, + history_len, + shifted_new_tmp, + n_new, + polyphase_taps, + polyphase_len, + decim, + phase_len, + start_idx, + out_count, + out + ); + cudaError_t err = cudaGetLastError(); + if (err != cudaSuccess) return (int)err; + } + + int new_history_len = history_len; + if (history_cap > 0) { + new_history_len = history_len + n_new; + if (new_history_len > history_cap) new_history_len = history_cap; + if (new_history_len > 0) { + if (!history_state || !history_scratch) return -16; + const int block = 256; + const int grid = (new_history_len + block - 1) / block; + gpud_streaming_history_tail_kernel<<>>( + history_state, + history_len, + shifted_new_tmp, + n_new, + new_history_len, + history_scratch + ); + cudaError_t err = cudaGetLastError(); + if (err != cudaSuccess) return (int)err; + err = cudaMemcpy(history_state, history_scratch, (size_t)new_history_len * sizeof(float2), cudaMemcpyDeviceToDevice); + if (err != cudaSuccess) return (int)err; + } + } else { + new_history_len = 0; + } - cudaFree(shifted); - cudaFree(combined); + *history_len_io = new_history_len; + *phase_count_state = total_phase % decim; + *phase_state = gpud_reduce_phase(phase_start + phase_inc * (double)n_new); + *n_out = out_count; return 0; } diff --git a/internal/demod/gpudemod/stream_state.go b/internal/demod/gpudemod/stream_state.go index ef8d400..5c1db48 100644 --- a/internal/demod/gpudemod/stream_state.go +++ b/internal/demod/gpudemod/stream_state.go @@ -7,6 +7,7 @@ func (r *BatchRunner) ResetSignalState(signalID int64) { return } delete(r.streamState, signalID) + r.resetNativeStreamingState(signalID) } func (r *BatchRunner) ResetAllSignalStates() { @@ -14,6 +15,7 @@ func (r *BatchRunner) ResetAllSignalStates() { return } r.streamState = make(map[int64]*ExtractStreamState) + r.resetAllNativeStreamingStates() } func (r *BatchRunner) getOrInitExtractState(job StreamingExtractJob, sampleRate int) (*ExtractStreamState, error) { diff --git a/internal/demod/gpudemod/streaming_gpu_contract.go b/internal/demod/gpudemod/streaming_gpu_contract.go index 6a059c1..c978f22 100644 --- a/internal/demod/gpudemod/streaming_gpu_contract.go +++ b/internal/demod/gpudemod/streaming_gpu_contract.go @@ -10,6 +10,7 @@ const ( type StreamingGPUInvocation struct { SignalID int64 + ConfigHash uint64 OffsetHz float64 OutRate int Bandwidth float64 diff --git a/internal/demod/gpudemod/streaming_gpu_exec.go b/internal/demod/gpudemod/streaming_gpu_exec.go index 6e74dc1..23ec814 100644 --- a/internal/demod/gpudemod/streaming_gpu_exec.go +++ b/internal/demod/gpudemod/streaming_gpu_exec.go @@ -9,15 +9,17 @@ func (r *BatchRunner) StreamingExtractGPUExec(iqNew []complex64, jobs []Streamin if err != nil { return nil, err } - if useGPUHostOracleExecution { - execResults, err := r.executeStreamingGPUHostOraclePrepared(invocations) - if err != nil { + if useGPUNativePreparedExecution { + execResults, err := r.executeStreamingGPUNativePrepared(invocations) + if err == nil { + return r.applyStreamingGPUExecutionResults(execResults), nil + } + if !useGPUHostOracleExecution { return nil, err } - return r.applyStreamingGPUExecutionResults(execResults), nil } - if useGPUNativePreparedExecution { - execResults, err := r.executeStreamingGPUNativePrepared(invocations) + if useGPUHostOracleExecution { + execResults, err := r.executeStreamingGPUHostOraclePrepared(invocations) if err != nil { return nil, err } diff --git a/internal/demod/gpudemod/streaming_gpu_exec_test.go b/internal/demod/gpudemod/streaming_gpu_exec_test.go index 3d4a5b6..9933cdf 100644 --- a/internal/demod/gpudemod/streaming_gpu_exec_test.go +++ b/internal/demod/gpudemod/streaming_gpu_exec_test.go @@ -2,7 +2,7 @@ package gpudemod import "testing" -func TestStreamingExtractGPUExecUnavailableByDefault(t *testing.T) { +func TestStreamingExtractGPUExecUsesSafeDefaultMode(t *testing.T) { r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} job := StreamingExtractJob{ SignalID: 1, @@ -12,8 +12,101 @@ func TestStreamingExtractGPUExecUnavailableByDefault(t *testing.T) { NumTaps: 65, ConfigHash: 777, } - _, err := r.StreamingExtractGPUExec(makeDeterministicIQ(2048), []StreamingExtractJob{job}) - if err == nil { - t.Fatalf("expected unavailable/disabled execution path by default") + res, err := r.StreamingExtractGPUExec(makeDeterministicIQ(2048), []StreamingExtractJob{job}) + if err != nil { + t.Fatalf("expected safe default execution path, got error: %v", err) + } + if len(res) != 1 { + t.Fatalf("expected 1 result, got %d", len(res)) + } + if res[0].Rate != job.OutRate { + t.Fatalf("expected output rate %d, got %d", job.OutRate, res[0].Rate) + } + if res[0].NOut <= 0 { + t.Fatalf("expected streaming output samples") + } +} + +func TestStreamingGPUExecMatchesCPUOracleAcrossChunkPatterns(t *testing.T) { + job := StreamingExtractJob{ + SignalID: 1, + OffsetHz: 12500, + Bandwidth: 20000, + OutRate: 200000, + NumTaps: 65, + ConfigHash: 777, + } + t.Run("DeterministicIQ", func(t *testing.T) { + r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} + steps := makeStreamingValidationSteps( + makeDeterministicIQ(1500), + []int{0, 1, 2, 17, 63, 64, 65, 129, 511}, + []StreamingExtractJob{job}, + ) + runStreamingExecSequenceAgainstOracle(t, r, steps, 1e-5, 1e-9) + }) + t.Run("ToneNoiseIQ", func(t *testing.T) { + r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} + steps := makeStreamingValidationSteps( + makeToneNoiseIQ(4096, 0.023), + []int{7, 20, 3, 63, 64, 65, 777}, + []StreamingExtractJob{job}, + ) + runStreamingExecSequenceAgainstOracle(t, r, steps, 1e-5, 1e-9) + }) +} + +func TestStreamingGPUExecLifecycleMatchesCPUOracle(t *testing.T) { + r := &BatchRunner{ + eng: &Engine{sampleRate: 4000000}, + streamState: make(map[int64]*ExtractStreamState), + nativeState: make(map[int64]*nativeStreamingSignalState), + } + baseA := StreamingExtractJob{ + SignalID: 11, + OffsetHz: 12500, + Bandwidth: 20000, + OutRate: 200000, + NumTaps: 65, + ConfigHash: 1001, + } + baseB := StreamingExtractJob{ + SignalID: 22, + OffsetHz: -18750, + Bandwidth: 16000, + OutRate: 100000, + NumTaps: 33, + ConfigHash: 2002, + } + steps := []streamingValidationStep{ + { + name: "prime_both_signals", + iq: makeDeterministicIQ(512), + jobs: []StreamingExtractJob{baseA, baseB}, + }, + { + name: "config_reset_with_zero_new", + iq: nil, + jobs: []StreamingExtractJob{{SignalID: baseA.SignalID, OffsetHz: baseA.OffsetHz, Bandwidth: baseA.Bandwidth, OutRate: baseA.OutRate, NumTaps: baseA.NumTaps, ConfigHash: baseA.ConfigHash + 1}, baseB}, + }, + { + name: "signal_b_disappears", + iq: makeToneNoiseIQ(96, 0.041), + jobs: []StreamingExtractJob{baseA}, + }, + { + name: "signal_b_reappears_fresh", + iq: makeDeterministicIQ(160), + jobs: []StreamingExtractJob{baseA, baseB}, + }, + { + name: "small_history_boundary_chunk", + iq: makeToneNoiseIQ(65, 0.017), + jobs: []StreamingExtractJob{baseA, baseB}, + }, + } + runStreamingExecSequenceAgainstOracle(t, r, steps, 1e-5, 1e-9) + if _, ok := r.nativeState[baseB.SignalID]; ok { + t.Fatalf("expected safe host-oracle path to keep native state inactive while gate is off") } } diff --git a/internal/demod/gpudemod/streaming_gpu_native_prepare.go b/internal/demod/gpudemod/streaming_gpu_native_prepare.go index a62b37e..247998d 100644 --- a/internal/demod/gpudemod/streaming_gpu_native_prepare.go +++ b/internal/demod/gpudemod/streaming_gpu_native_prepare.go @@ -15,101 +15,270 @@ import ( ) func (r *BatchRunner) executeStreamingGPUNativePrepared(invocations []StreamingGPUInvocation) ([]StreamingGPUExecutionResult, error) { + if r == nil || r.eng == nil { + return nil, ErrUnavailable + } + if r.nativeState == nil { + r.nativeState = make(map[int64]*nativeStreamingSignalState) + } results := make([]StreamingGPUExecutionResult, len(invocations)) for i, inv := range invocations { - phaseInc := -2.0 * math.Pi * inv.OffsetHz / float64(inv.SampleRate) - outCap := len(inv.IQNew)/maxInt(1, inv.Decim) + 2 - outHost := make([]complex64, outCap) - histCap := maxInt(0, inv.NumTaps-1) - histHost := make([]complex64, histCap) - var nOut C.int - var phaseCountOut C.int - var phaseEndOut C.double - - var dInNew, dHistIn, dOut, dHistOut unsafe.Pointer - var dTaps unsafe.Pointer - if len(inv.IQNew) > 0 { - if bridgeCudaMalloc(&dInNew, uintptr(len(inv.IQNew))*unsafe.Sizeof(C.gpud_float2{})) != 0 { - return nil, ErrUnavailable - } - defer bridgeCudaFree(dInNew) - if bridgeMemcpyH2D(dInNew, unsafe.Pointer(&inv.IQNew[0]), uintptr(len(inv.IQNew))*unsafe.Sizeof(complex64(0))) != 0 { - return nil, ErrUnavailable - } + state, err := r.getOrInitNativeStreamingState(inv) + if err != nil { + return nil, err } - if len(inv.ShiftedHistory) > 0 { - if bridgeCudaMalloc(&dHistIn, uintptr(len(inv.ShiftedHistory))*unsafe.Sizeof(C.gpud_float2{})) != 0 { - return nil, ErrUnavailable - } - defer bridgeCudaFree(dHistIn) - if bridgeMemcpyH2D(dHistIn, unsafe.Pointer(&inv.ShiftedHistory[0]), uintptr(len(inv.ShiftedHistory))*unsafe.Sizeof(complex64(0))) != 0 { - return nil, ErrUnavailable - } - } - if len(inv.PolyphaseTaps) > 0 { - if bridgeCudaMalloc(&dTaps, uintptr(len(inv.PolyphaseTaps))*unsafe.Sizeof(C.float(0))) != 0 { - return nil, ErrUnavailable + if len(inv.IQNew) > 0 { + if err := ensureNativeBuffer(&state.dInNew, &state.inNewCap, len(inv.IQNew), unsafe.Sizeof(C.gpud_float2{})); err != nil { + return nil, err } - defer bridgeCudaFree(dTaps) - if bridgeMemcpyH2D(dTaps, unsafe.Pointer(&inv.PolyphaseTaps[0]), uintptr(len(inv.PolyphaseTaps))*unsafe.Sizeof(float32(0))) != 0 { + if bridgeMemcpyH2D(state.dInNew, unsafe.Pointer(&inv.IQNew[0]), uintptr(len(inv.IQNew))*unsafe.Sizeof(complex64(0))) != 0 { return nil, ErrUnavailable } } + outCap := len(inv.IQNew)/maxInt(1, inv.Decim) + 2 if outCap > 0 { - if bridgeCudaMalloc(&dOut, uintptr(outCap)*unsafe.Sizeof(C.gpud_float2{})) != 0 { - return nil, ErrUnavailable - } - defer bridgeCudaFree(dOut) - } - if histCap > 0 { - if bridgeCudaMalloc(&dHistOut, uintptr(histCap)*unsafe.Sizeof(C.gpud_float2{})) != 0 { - return nil, ErrUnavailable + if err := ensureNativeBuffer(&state.dOut, &state.outCap, outCap, unsafe.Sizeof(C.gpud_float2{})); err != nil { + return nil, err } - defer bridgeCudaFree(dHistOut) } - res := bridgeLaunchStreamingPolyphasePrepare( - (*C.gpud_float2)(dInNew), + phaseInc := -2.0 * math.Pi * inv.OffsetHz / float64(inv.SampleRate) + // The native export consumes phase carry as host scalars while sample/history + // buffers remain device-resident, so keep these counters in nativeState. + var nOut C.int + historyLen := C.int(state.historyLen) + phaseCount := C.int(state.phaseCount) + phaseNCO := C.double(state.phaseNCO) + res := bridgeLaunchStreamingPolyphaseStateful( + (*C.gpud_float2)(state.dInNew), len(inv.IQNew), - (*C.gpud_float2)(dHistIn), - len(inv.ShiftedHistory), - (*C.float)(dTaps), - len(inv.PolyphaseTaps), - inv.Decim, - inv.NumTaps, - inv.PhaseCountIn, - inv.NCOPhaseIn, + (*C.gpud_float2)(state.dShifted), + (*C.float)(state.dTaps), + state.tapsLen, + state.decim, + state.numTaps, + (*C.gpud_float2)(state.dHistory), + (*C.gpud_float2)(state.dHistoryScratch), + state.historyCap, + &historyLen, + &phaseCount, + &phaseNCO, phaseInc, - (*C.gpud_float2)(dOut), + (*C.gpud_float2)(state.dOut), + outCap, &nOut, - &phaseCountOut, - &phaseEndOut, - (*C.gpud_float2)(dHistOut), ) if res != 0 { return nil, ErrUnavailable } - if int(nOut) > 0 { - if bridgeMemcpyD2H(unsafe.Pointer(&outHost[0]), dOut, uintptr(int(nOut))*unsafe.Sizeof(complex64(0))) != 0 { + state.historyLen = int(historyLen) + state.phaseCount = int(phaseCount) + state.phaseNCO = float64(phaseNCO) + + outHost := make([]complex64, int(nOut)) + if len(outHost) > 0 { + if bridgeMemcpyD2H(unsafe.Pointer(&outHost[0]), state.dOut, uintptr(len(outHost))*unsafe.Sizeof(complex64(0))) != 0 { return nil, ErrUnavailable } } - if histCap > 0 { - if bridgeMemcpyD2H(unsafe.Pointer(&histHost[0]), dHistOut, uintptr(histCap)*unsafe.Sizeof(complex64(0))) != 0 { + histHost := make([]complex64, state.historyLen) + if state.historyLen > 0 { + if bridgeMemcpyD2H(unsafe.Pointer(&histHost[0]), state.dHistory, uintptr(state.historyLen)*unsafe.Sizeof(complex64(0))) != 0 { return nil, ErrUnavailable } } + results[i] = StreamingGPUExecutionResult{ SignalID: inv.SignalID, Mode: StreamingGPUExecCUDA, - IQ: append([]complex64(nil), outHost[:int(nOut)]...), + IQ: outHost, Rate: inv.OutRate, - NOut: int(nOut), - PhaseCountOut: int(phaseCountOut), - NCOPhaseOut: float64(phaseEndOut), - HistoryOut: append([]complex64(nil), histHost...), - HistoryLenOut: histCap, + NOut: len(outHost), + PhaseCountOut: state.phaseCount, + NCOPhaseOut: state.phaseNCO, + HistoryOut: histHost, + HistoryLenOut: len(histHost), } } return results, nil } + +func (r *BatchRunner) getOrInitNativeStreamingState(inv StreamingGPUInvocation) (*nativeStreamingSignalState, error) { + state := r.nativeState[inv.SignalID] + needReset := false + historyCap := maxInt(0, inv.NumTaps-1) + if state == nil { + state = &nativeStreamingSignalState{signalID: inv.SignalID} + r.nativeState[inv.SignalID] = state + needReset = true + } + if state.configHash != inv.ConfigHash { + needReset = true + } + if state.decim != inv.Decim || state.numTaps != inv.NumTaps || state.tapsLen != len(inv.PolyphaseTaps) { + needReset = true + } + if state.historyCap != historyCap { + needReset = true + } + if needReset { + releaseNativeStreamingSignalState(state) + } + if len(inv.PolyphaseTaps) == 0 { + return nil, ErrUnavailable + } + if state.dTaps == nil && len(inv.PolyphaseTaps) > 0 { + if bridgeCudaMalloc(&state.dTaps, uintptr(len(inv.PolyphaseTaps))*unsafe.Sizeof(C.float(0))) != 0 { + return nil, ErrUnavailable + } + if bridgeMemcpyH2D(state.dTaps, unsafe.Pointer(&inv.PolyphaseTaps[0]), uintptr(len(inv.PolyphaseTaps))*unsafe.Sizeof(float32(0))) != 0 { + return nil, ErrUnavailable + } + state.tapsLen = len(inv.PolyphaseTaps) + } + if state.dShifted == nil { + minCap := maxInt(1, len(inv.IQNew)) + if bridgeCudaMalloc(&state.dShifted, uintptr(minCap)*unsafe.Sizeof(C.gpud_float2{})) != 0 { + return nil, ErrUnavailable + } + state.shiftedCap = minCap + } + if state.shiftedCap < len(inv.IQNew) { + if bridgeCudaFree(state.dShifted) != 0 { + return nil, ErrUnavailable + } + state.dShifted = nil + state.shiftedCap = 0 + if bridgeCudaMalloc(&state.dShifted, uintptr(len(inv.IQNew))*unsafe.Sizeof(C.gpud_float2{})) != 0 { + return nil, ErrUnavailable + } + state.shiftedCap = len(inv.IQNew) + } + if state.dHistory == nil && historyCap > 0 { + if bridgeCudaMalloc(&state.dHistory, uintptr(historyCap)*unsafe.Sizeof(C.gpud_float2{})) != 0 { + return nil, ErrUnavailable + } + } + if state.dHistoryScratch == nil && historyCap > 0 { + if bridgeCudaMalloc(&state.dHistoryScratch, uintptr(historyCap)*unsafe.Sizeof(C.gpud_float2{})) != 0 { + return nil, ErrUnavailable + } + state.historyScratchCap = historyCap + } + if needReset { + state.phaseCount = inv.PhaseCountIn + state.phaseNCO = inv.NCOPhaseIn + state.historyLen = minInt(len(inv.ShiftedHistory), historyCap) + if state.historyLen > 0 { + if bridgeMemcpyH2D(state.dHistory, unsafe.Pointer(&inv.ShiftedHistory[len(inv.ShiftedHistory)-state.historyLen]), uintptr(state.historyLen)*unsafe.Sizeof(complex64(0))) != 0 { + return nil, ErrUnavailable + } + } + } + state.decim = inv.Decim + state.numTaps = inv.NumTaps + state.historyCap = historyCap + state.historyScratchCap = historyCap + state.configHash = inv.ConfigHash + return state, nil +} + +func ensureNativeBuffer(ptr *unsafe.Pointer, capRef *int, need int, elemSize uintptr) error { + if need <= 0 { + return nil + } + if *ptr != nil && *capRef >= need { + return nil + } + if *ptr != nil { + if bridgeCudaFree(*ptr) != 0 { + return ErrUnavailable + } + *ptr = nil + *capRef = 0 + } + if bridgeCudaMalloc(ptr, uintptr(need)*elemSize) != 0 { + return ErrUnavailable + } + *capRef = need + return nil +} + +func (r *BatchRunner) syncNativeStreamingStates(active map[int64]struct{}) { + if r == nil || r.nativeState == nil { + return + } + for id, state := range r.nativeState { + if _, ok := active[id]; ok { + continue + } + releaseNativeStreamingSignalState(state) + delete(r.nativeState, id) + } +} + +func (r *BatchRunner) resetNativeStreamingState(signalID int64) { + if r == nil || r.nativeState == nil { + return + } + if state := r.nativeState[signalID]; state != nil { + releaseNativeStreamingSignalState(state) + } + delete(r.nativeState, signalID) +} + +func (r *BatchRunner) resetAllNativeStreamingStates() { + if r == nil { + return + } + r.freeAllNativeStreamingStates() + r.nativeState = make(map[int64]*nativeStreamingSignalState) +} + +func (r *BatchRunner) freeAllNativeStreamingStates() { + if r == nil || r.nativeState == nil { + return + } + for id, state := range r.nativeState { + releaseNativeStreamingSignalState(state) + delete(r.nativeState, id) + } +} + +func releaseNativeStreamingSignalState(state *nativeStreamingSignalState) { + if state == nil { + return + } + for _, ptr := range []*unsafe.Pointer{ + &state.dInNew, + &state.dShifted, + &state.dOut, + &state.dTaps, + &state.dHistory, + &state.dHistoryScratch, + } { + if *ptr != nil { + _ = bridgeCudaFree(*ptr) + *ptr = nil + } + } + state.inNewCap = 0 + state.shiftedCap = 0 + state.outCap = 0 + state.tapsLen = 0 + state.historyCap = 0 + state.historyLen = 0 + state.historyScratchCap = 0 + state.phaseCount = 0 + state.phaseNCO = 0 + state.decim = 0 + state.numTaps = 0 + state.configHash = 0 +} + +func minInt(a int, b int) int { + if a < b { + return a + } + return b +} diff --git a/internal/demod/gpudemod/streaming_gpu_native_prepare_stub.go b/internal/demod/gpudemod/streaming_gpu_native_prepare_stub.go index 3180b99..7f1e4c0 100644 --- a/internal/demod/gpudemod/streaming_gpu_native_prepare_stub.go +++ b/internal/demod/gpudemod/streaming_gpu_native_prepare_stub.go @@ -6,3 +6,39 @@ func (r *BatchRunner) executeStreamingGPUNativePrepared(invocations []StreamingG _ = invocations return nil, ErrUnavailable } + +func (r *BatchRunner) syncNativeStreamingStates(active map[int64]struct{}) { + _ = active + if r == nil { + return + } + if r.nativeState == nil { + r.nativeState = make(map[int64]*nativeStreamingSignalState) + } + for id := range r.nativeState { + if _, ok := active[id]; !ok { + delete(r.nativeState, id) + } + } +} + +func (r *BatchRunner) resetNativeStreamingState(signalID int64) { + if r == nil || r.nativeState == nil { + return + } + delete(r.nativeState, signalID) +} + +func (r *BatchRunner) resetAllNativeStreamingStates() { + if r == nil { + return + } + r.nativeState = make(map[int64]*nativeStreamingSignalState) +} + +func (r *BatchRunner) freeAllNativeStreamingStates() { + if r == nil { + return + } + r.nativeState = nil +} diff --git a/internal/demod/gpudemod/streaming_gpu_native_prepare_test.go b/internal/demod/gpudemod/streaming_gpu_native_prepare_test.go index 20b8c18..9312d65 100644 --- a/internal/demod/gpudemod/streaming_gpu_native_prepare_test.go +++ b/internal/demod/gpudemod/streaming_gpu_native_prepare_test.go @@ -2,10 +2,49 @@ package gpudemod -import "testing" +import ( + "os" + "path/filepath" + "testing" +) -func TestStreamingGPUNativePreparedComparableToCPUOracle(t *testing.T) { - r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} +func configureNativePreparedDLLPath(t *testing.T) { + t.Helper() + candidates := []string{ + filepath.Join("build", "gpudemod_kernels.dll"), + filepath.Join("internal", "demod", "gpudemod", "build", "gpudemod_kernels.dll"), + "gpudemod_kernels.dll", + } + for _, candidate := range candidates { + if _, err := os.Stat(candidate); err == nil { + abs, err := filepath.Abs(candidate) + if err != nil { + t.Fatalf("resolve native prepared DLL path: %v", err) + } + t.Setenv("GPUMOD_DLL", abs) + return + } + } +} + +func requireNativePreparedTestRunner(t *testing.T) *BatchRunner { + t.Helper() + configureNativePreparedDLLPath(t) + if err := ensureDLLLoaded(); err != nil { + t.Skipf("native prepared path unavailable: %v", err) + } + if !Available() { + t.Skip("native prepared path unavailable: cuda device not available") + } + r, err := NewBatchRunner(32768, 4000000) + if err != nil { + t.Skipf("native prepared path unavailable: %v", err) + } + t.Cleanup(r.Close) + return r +} + +func TestStreamingGPUNativePreparedMatchesCPUOracleAcrossChunkPatterns(t *testing.T) { job := StreamingExtractJob{ SignalID: 1, OffsetHz: 12500, @@ -14,24 +53,154 @@ func TestStreamingGPUNativePreparedComparableToCPUOracle(t *testing.T) { NumTaps: 65, ConfigHash: 777, } - iq := makeDeterministicIQ(16000) - gpuRes, err := r.StreamingExtractGPU(iq, []StreamingExtractJob{job}) - if err != nil { - t.Fatalf("unexpected native prepared GPU error: %v", err) + exec := func(r *BatchRunner, invocations []StreamingGPUInvocation) ([]StreamingGPUExecutionResult, error) { + return r.executeStreamingGPUNativePrepared(invocations) } - oracleRunner := NewCPUOracleRunner(4000000) - oracleRes, err := oracleRunner.StreamingExtract(iq, []StreamingExtractJob{job}) - if err != nil { - t.Fatalf("unexpected oracle error: %v", err) + t.Run("DeterministicIQ", func(t *testing.T) { + r := requireNativePreparedTestRunner(t) + steps := makeStreamingValidationSteps( + makeDeterministicIQ(8192), + []int{0, 1, 2, 17, 63, 64, 65, 129, 511, 2048}, + []StreamingExtractJob{job}, + ) + runPreparedSequenceAgainstOracle(t, r, exec, steps, 1e-4, 1e-8) + }) + t.Run("ToneNoiseIQ", func(t *testing.T) { + r := requireNativePreparedTestRunner(t) + steps := makeStreamingValidationSteps( + makeToneNoiseIQ(12288, 0.023), + []int{7, 20, 3, 63, 64, 65, 777, 2048, 4096}, + []StreamingExtractJob{job}, + ) + runPreparedSequenceAgainstOracle(t, r, exec, steps, 1e-4, 1e-8) + }) +} + +func TestStreamingGPUNativePreparedLifecycleResetAndCapacity(t *testing.T) { + r := requireNativePreparedTestRunner(t) + exec := func(invocations []StreamingGPUInvocation) ([]StreamingGPUExecutionResult, error) { + return r.executeStreamingGPUNativePrepared(invocations) } - if len(gpuRes) != 1 || len(oracleRes) != 1 { - t.Fatalf("unexpected result sizes: gpu=%d oracle=%d", len(gpuRes), len(oracleRes)) + jobA := StreamingExtractJob{ + SignalID: 11, + OffsetHz: 12500, + Bandwidth: 20000, + OutRate: 200000, + NumTaps: 65, + ConfigHash: 3001, + } + jobB := StreamingExtractJob{ + SignalID: 22, + OffsetHz: -18750, + Bandwidth: 16000, + OutRate: 100000, + NumTaps: 33, + ConfigHash: 4002, } - metrics, stats := CompareOracleAndGPUHostOracle(oracleRes[0], gpuRes[0]) - if stats.Count == 0 { - t.Fatalf("expected compare count > 0") + + steps := []streamingValidationStep{ + { + name: "prime_both_signals", + iq: makeDeterministicIQ(256), + jobs: []StreamingExtractJob{jobA, jobB}, + }, + { + name: "grow_capacity", + iq: makeToneNoiseIQ(4096, 0.037), + jobs: []StreamingExtractJob{jobA, jobB}, + }, + { + name: "config_reset_zero_new", + iq: nil, + jobs: []StreamingExtractJob{{SignalID: jobA.SignalID, OffsetHz: jobA.OffsetHz, Bandwidth: jobA.Bandwidth, OutRate: jobA.OutRate, NumTaps: jobA.NumTaps, ConfigHash: jobA.ConfigHash + 1}, jobB}, + }, + { + name: "signal_b_disappears", + iq: makeDeterministicIQ(64), + jobs: []StreamingExtractJob{jobA}, + }, + { + name: "signal_b_reappears", + iq: makeToneNoiseIQ(96, 0.017), + jobs: []StreamingExtractJob{jobA, jobB}, + }, + { + name: "history_boundary", + iq: makeDeterministicIQ(65), + jobs: []StreamingExtractJob{jobA, jobB}, + }, } - if metrics.RefMaxAbsErr > 1e-4 { - t.Fatalf("native prepared path diverges too much from oracle: max abs err=%f", metrics.RefMaxAbsErr) + + oracle := NewCPUOracleRunner(r.eng.sampleRate) + var grownCap int + for idx, step := range steps { + invocations, err := r.buildStreamingGPUInvocations(step.iq, step.jobs) + if err != nil { + t.Fatalf("step %d (%s): build invocations failed: %v", idx, step.name, err) + } + got, err := exec(invocations) + if err != nil { + t.Fatalf("step %d (%s): native prepared exec failed: %v", idx, step.name, err) + } + want, err := oracle.StreamingExtract(step.iq, step.jobs) + if err != nil { + t.Fatalf("step %d (%s): oracle failed: %v", idx, step.name, err) + } + if len(got) != len(want) { + t.Fatalf("step %d (%s): result count mismatch: got=%d want=%d", idx, step.name, len(got), len(want)) + } + applied := r.applyStreamingGPUExecutionResults(got) + for i, job := range step.jobs { + oracleState := oracle.States[job.SignalID] + requirePreparedExecutionResultMatchesOracle(t, got[i], want[i], oracleState, 1e-4, 1e-8) + requireStreamingExtractResultMatchesOracle(t, applied[i], want[i]) + requireExtractStateMatchesOracle(t, r.streamState[job.SignalID], oracleState, 1e-8, 1e-4) + + state := r.nativeState[job.SignalID] + if state == nil { + t.Fatalf("step %d (%s): missing native state for signal %d", idx, step.name, job.SignalID) + } + if state.configHash != job.ConfigHash { + t.Fatalf("step %d (%s): native config hash mismatch for signal %d: got=%d want=%d", idx, step.name, job.SignalID, state.configHash, job.ConfigHash) + } + if state.decim != oracleState.Decim { + t.Fatalf("step %d (%s): native decim mismatch for signal %d: got=%d want=%d", idx, step.name, job.SignalID, state.decim, oracleState.Decim) + } + if state.numTaps != oracleState.NumTaps { + t.Fatalf("step %d (%s): native num taps mismatch for signal %d: got=%d want=%d", idx, step.name, job.SignalID, state.numTaps, oracleState.NumTaps) + } + if state.historyCap != maxInt(0, oracleState.NumTaps-1) { + t.Fatalf("step %d (%s): native history cap mismatch for signal %d: got=%d want=%d", idx, step.name, job.SignalID, state.historyCap, maxInt(0, oracleState.NumTaps-1)) + } + if state.historyLen != len(oracleState.ShiftedHistory) { + t.Fatalf("step %d (%s): native history len mismatch for signal %d: got=%d want=%d", idx, step.name, job.SignalID, state.historyLen, len(oracleState.ShiftedHistory)) + } + if len(step.iq) > 0 && state.shiftedCap < len(step.iq) { + t.Fatalf("step %d (%s): native shifted capacity too small for signal %d: got=%d need>=%d", idx, step.name, job.SignalID, state.shiftedCap, len(step.iq)) + } + if state.outCap < got[i].NOut { + t.Fatalf("step %d (%s): native out capacity too small for signal %d: got=%d need>=%d", idx, step.name, job.SignalID, state.outCap, got[i].NOut) + } + if job.SignalID == jobA.SignalID && state.shiftedCap > grownCap { + grownCap = state.shiftedCap + } + } + if step.name == "grow_capacity" && grownCap < len(step.iq) { + t.Fatalf("expected capacity growth for signal %d, got=%d want>=%d", jobA.SignalID, grownCap, len(step.iq)) + } + if step.name == "config_reset_zero_new" { + state := r.nativeState[jobA.SignalID] + if state == nil { + t.Fatalf("missing native state for signal %d after config reset", jobA.SignalID) + } + if state.historyLen != 0 { + t.Fatalf("expected cleared native history after config reset, got=%d", state.historyLen) + } + } + if step.name == "signal_b_disappears" { + if _, ok := r.nativeState[jobB.SignalID]; ok { + t.Fatalf("expected native state for signal %d to be removed on disappearance", jobB.SignalID) + } + } } } diff --git a/internal/demod/gpudemod/streaming_gpu_native_state.go b/internal/demod/gpudemod/streaming_gpu_native_state.go new file mode 100644 index 0000000..e1b6460 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_native_state.go @@ -0,0 +1,28 @@ +package gpudemod + +import "unsafe" + +type nativeStreamingSignalState struct { + signalID int64 + + configHash uint64 + decim int + numTaps int + + dInNew unsafe.Pointer + dShifted unsafe.Pointer + dOut unsafe.Pointer + dTaps unsafe.Pointer + dHistory unsafe.Pointer + dHistoryScratch unsafe.Pointer + + inNewCap int + shiftedCap int + outCap int + tapsLen int + historyCap int + historyLen int + historyScratchCap int + phaseCount int + phaseNCO float64 +} diff --git a/internal/demod/gpudemod/streaming_gpu_prepare.go b/internal/demod/gpudemod/streaming_gpu_prepare.go index 74e4f69..8e8a957 100644 --- a/internal/demod/gpudemod/streaming_gpu_prepare.go +++ b/internal/demod/gpudemod/streaming_gpu_prepare.go @@ -14,6 +14,7 @@ func (r *BatchRunner) buildStreamingGPUInvocations(iqNew []complex64, jobs []Str } invocations[i] = StreamingGPUInvocation{ SignalID: job.SignalID, + ConfigHash: state.ConfigHash, OffsetHz: job.OffsetHz, OutRate: job.OutRate, Bandwidth: job.Bandwidth, @@ -34,6 +35,7 @@ func (r *BatchRunner) buildStreamingGPUInvocations(iqNew []complex64, jobs []Str delete(r.streamState, signalID) } } + r.syncNativeStreamingStates(active) return invocations, nil } diff --git a/internal/demod/gpudemod/streaming_gpu_stub.go b/internal/demod/gpudemod/streaming_gpu_stub.go index da85af4..500e235 100644 --- a/internal/demod/gpudemod/streaming_gpu_stub.go +++ b/internal/demod/gpudemod/streaming_gpu_stub.go @@ -1,7 +1,5 @@ package gpudemod -import "fmt" - func updateShiftedHistory(prev []complex64, shiftedNew []complex64, numTaps int) []complex64 { need := numTaps - 1 if need <= 0 { @@ -18,22 +16,11 @@ func updateShiftedHistory(prev []complex64, shiftedNew []complex64, numTaps int) return out } -// StreamingExtractGPU is the planned production entry point for the stateful -// GPU extractor path. It intentionally exists early as an explicit boundary so -// callers can migrate away from legacy overlap+trim semantics. -// -// Current status: -// - validates jobs against persistent per-signal state ownership -// - enforces exact integer decimation -// - initializes per-signal state (config hash, taps, history capacity) -// - does not yet execute the final stateful polyphase GPU kernel path +// StreamingExtractGPU is the production entry point for the stateful streaming +// extractor path. Execution strategy is selected by StreamingExtractGPUExec. func (r *BatchRunner) StreamingExtractGPU(iqNew []complex64, jobs []StreamingExtractJob) ([]StreamingExtractResult, error) { if r == nil || r.eng == nil { return nil, ErrUnavailable } - if results, err := r.StreamingExtractGPUExec(iqNew, jobs); err == nil { - return results, nil - } - _, _ = iqNew, jobs - return nil, fmt.Errorf("StreamingExtractGPU not implemented yet: stateful polyphase GPU path pending") + return r.StreamingExtractGPUExec(iqNew, jobs) } diff --git a/internal/demod/gpudemod/streaming_gpu_stub_test.go b/internal/demod/gpudemod/streaming_gpu_stub_test.go index 9889315..2c947d3 100644 --- a/internal/demod/gpudemod/streaming_gpu_stub_test.go +++ b/internal/demod/gpudemod/streaming_gpu_stub_test.go @@ -2,7 +2,7 @@ package gpudemod import "testing" -func TestStreamingGPUStubRemainsExplicitlyUnimplemented(t *testing.T) { +func TestStreamingGPUUsesSafeProductionDefault(t *testing.T) { r := &BatchRunner{eng: &Engine{sampleRate: 4000000}, streamState: make(map[int64]*ExtractStreamState)} job := StreamingExtractJob{ SignalID: 1, @@ -13,9 +13,15 @@ func TestStreamingGPUStubRemainsExplicitlyUnimplemented(t *testing.T) { ConfigHash: 777, } iq := makeDeterministicIQ(1000) - _, err := r.StreamingExtractGPU(iq, []StreamingExtractJob{job}) - if err == nil { - t.Fatalf("expected not-implemented error from GPU stub") + results, err := r.StreamingExtractGPU(iq, []StreamingExtractJob{job}) + if err != nil { + t.Fatalf("expected safe production default path, got error: %v", err) + } + if len(results) != 1 { + t.Fatalf("expected 1 result, got %d", len(results)) + } + if results[0].NOut == 0 { + t.Fatalf("expected non-zero output count from safe production path") } } diff --git a/internal/demod/gpudemod/streaming_gpu_validation_helpers_test.go b/internal/demod/gpudemod/streaming_gpu_validation_helpers_test.go new file mode 100644 index 0000000..b88b102 --- /dev/null +++ b/internal/demod/gpudemod/streaming_gpu_validation_helpers_test.go @@ -0,0 +1,213 @@ +package gpudemod + +import ( + "math" + "testing" +) + +type streamingValidationStep struct { + name string + iq []complex64 + jobs []StreamingExtractJob +} + +type streamingPreparedExecutor func(*BatchRunner, []StreamingGPUInvocation) ([]StreamingGPUExecutionResult, error) + +func makeToneNoiseIQ(n int, phaseInc float64) []complex64 { + out := make([]complex64, n) + phase := 0.0 + for i := 0; i < n; i++ { + tone := complex(math.Cos(phase), math.Sin(phase)) + noiseI := 0.17*math.Cos(0.113*float64(i)+0.31) + 0.07*math.Sin(0.071*float64(i)) + noiseQ := 0.13*math.Sin(0.097*float64(i)+0.11) - 0.05*math.Cos(0.043*float64(i)) + out[i] = complex64(0.85*tone + 0.15*complex(noiseI, noiseQ)) + phase += phaseInc + } + return out +} + +func makeStreamingValidationSteps(iq []complex64, chunkSizes []int, jobs []StreamingExtractJob) []streamingValidationStep { + steps := make([]streamingValidationStep, 0, len(chunkSizes)+1) + pos := 0 + for idx, n := range chunkSizes { + if n < 0 { + n = 0 + } + end := pos + n + if end > len(iq) { + end = len(iq) + } + steps = append(steps, streamingValidationStep{ + name: "chunk", + iq: append([]complex64(nil), iq[pos:end]...), + jobs: append([]StreamingExtractJob(nil), jobs...), + }) + _ = idx + pos = end + } + if pos < len(iq) { + steps = append(steps, streamingValidationStep{ + name: "remainder", + iq: append([]complex64(nil), iq[pos:]...), + jobs: append([]StreamingExtractJob(nil), jobs...), + }) + } + return steps +} + +func requirePhaseClose(t *testing.T, got float64, want float64, tol float64) { + t.Helper() + diff := got - want + for diff > math.Pi { + diff -= 2 * math.Pi + } + for diff < -math.Pi { + diff += 2 * math.Pi + } + if math.Abs(diff) > tol { + t.Fatalf("phase mismatch: got=%0.12f want=%0.12f diff=%0.12f tol=%0.12f", got, want, diff, tol) + } +} + +func requireStreamingExtractResultMatchesOracle(t *testing.T, got StreamingExtractResult, want StreamingExtractResult) { + t.Helper() + if got.SignalID != want.SignalID { + t.Fatalf("signal id mismatch: got=%d want=%d", got.SignalID, want.SignalID) + } + if got.Rate != want.Rate { + t.Fatalf("rate mismatch for signal %d: got=%d want=%d", got.SignalID, got.Rate, want.Rate) + } + if got.NOut != want.NOut { + t.Fatalf("n_out mismatch for signal %d: got=%d want=%d", got.SignalID, got.NOut, want.NOut) + } + if got.PhaseCount != want.PhaseCount { + t.Fatalf("phase count mismatch for signal %d: got=%d want=%d", got.SignalID, got.PhaseCount, want.PhaseCount) + } + if got.HistoryLen != want.HistoryLen { + t.Fatalf("history len mismatch for signal %d: got=%d want=%d", got.SignalID, got.HistoryLen, want.HistoryLen) + } +} + +func requirePreparedExecutionResultMatchesOracle(t *testing.T, got StreamingGPUExecutionResult, want StreamingExtractResult, oracleState *CPUOracleState, sampleTol float64, phaseTol float64) { + t.Helper() + if oracleState == nil { + t.Fatalf("missing oracle state for signal %d", got.SignalID) + } + if got.SignalID != want.SignalID { + t.Fatalf("signal id mismatch: got=%d want=%d", got.SignalID, want.SignalID) + } + if got.Rate != want.Rate { + t.Fatalf("rate mismatch for signal %d: got=%d want=%d", got.SignalID, got.Rate, want.Rate) + } + if got.NOut != want.NOut { + t.Fatalf("n_out mismatch for signal %d: got=%d want=%d", got.SignalID, got.NOut, want.NOut) + } + if got.PhaseCountOut != oracleState.PhaseCount { + t.Fatalf("phase count mismatch for signal %d: got=%d want=%d", got.SignalID, got.PhaseCountOut, oracleState.PhaseCount) + } + requirePhaseClose(t, got.NCOPhaseOut, oracleState.NCOPhase, phaseTol) + if got.HistoryLenOut != len(oracleState.ShiftedHistory) { + t.Fatalf("history len mismatch for signal %d: got=%d want=%d", got.SignalID, got.HistoryLenOut, len(oracleState.ShiftedHistory)) + } + requireComplexSlicesClose(t, got.IQ, want.IQ, sampleTol) + requireComplexSlicesClose(t, got.HistoryOut, oracleState.ShiftedHistory, sampleTol) +} + +func requireExtractStateMatchesOracle(t *testing.T, got *ExtractStreamState, want *CPUOracleState, phaseTol float64, sampleTol float64) { + t.Helper() + if got == nil || want == nil { + t.Fatalf("state mismatch: got nil=%t want nil=%t", got == nil, want == nil) + } + if got.SignalID != want.SignalID { + t.Fatalf("signal id mismatch: got=%d want=%d", got.SignalID, want.SignalID) + } + if got.ConfigHash != want.ConfigHash { + t.Fatalf("config hash mismatch for signal %d: got=%d want=%d", got.SignalID, got.ConfigHash, want.ConfigHash) + } + if got.Decim != want.Decim { + t.Fatalf("decim mismatch for signal %d: got=%d want=%d", got.SignalID, got.Decim, want.Decim) + } + if got.NumTaps != want.NumTaps { + t.Fatalf("num taps mismatch for signal %d: got=%d want=%d", got.SignalID, got.NumTaps, want.NumTaps) + } + if got.PhaseCount != want.PhaseCount { + t.Fatalf("phase count mismatch for signal %d: got=%d want=%d", got.SignalID, got.PhaseCount, want.PhaseCount) + } + requirePhaseClose(t, got.NCOPhase, want.NCOPhase, phaseTol) + requireComplexSlicesClose(t, got.ShiftedHistory, want.ShiftedHistory, sampleTol) +} + +func requireStateKeysMatchOracle(t *testing.T, got map[int64]*ExtractStreamState, want map[int64]*CPUOracleState) { + t.Helper() + if len(got) != len(want) { + t.Fatalf("active state count mismatch: got=%d want=%d", len(got), len(want)) + } + for signalID := range want { + if got[signalID] == nil { + t.Fatalf("missing active state for signal %d", signalID) + } + } + for signalID := range got { + if want[signalID] == nil { + t.Fatalf("unexpected active state for signal %d", signalID) + } + } +} + +func runStreamingExecSequenceAgainstOracle(t *testing.T, runner *BatchRunner, steps []streamingValidationStep, sampleTol float64, phaseTol float64) { + t.Helper() + oracle := NewCPUOracleRunner(runner.eng.sampleRate) + for idx, step := range steps { + got, err := runner.StreamingExtractGPUExec(step.iq, step.jobs) + if err != nil { + t.Fatalf("step %d (%s): exec failed: %v", idx, step.name, err) + } + want, err := oracle.StreamingExtract(step.iq, step.jobs) + if err != nil { + t.Fatalf("step %d (%s): oracle failed: %v", idx, step.name, err) + } + if len(got) != len(want) { + t.Fatalf("step %d (%s): result count mismatch: got=%d want=%d", idx, step.name, len(got), len(want)) + } + for i, job := range step.jobs { + requireStreamingExtractResultMatchesOracle(t, got[i], want[i]) + requireComplexSlicesClose(t, got[i].IQ, want[i].IQ, sampleTol) + requireExtractStateMatchesOracle(t, runner.streamState[job.SignalID], oracle.States[job.SignalID], phaseTol, sampleTol) + } + requireStateKeysMatchOracle(t, runner.streamState, oracle.States) + } +} + +func runPreparedSequenceAgainstOracle(t *testing.T, runner *BatchRunner, exec streamingPreparedExecutor, steps []streamingValidationStep, sampleTol float64, phaseTol float64) { + t.Helper() + oracle := NewCPUOracleRunner(runner.eng.sampleRate) + for idx, step := range steps { + invocations, err := runner.buildStreamingGPUInvocations(step.iq, step.jobs) + if err != nil { + t.Fatalf("step %d (%s): build invocations failed: %v", idx, step.name, err) + } + got, err := exec(runner, invocations) + if err != nil { + t.Fatalf("step %d (%s): prepared exec failed: %v", idx, step.name, err) + } + want, err := oracle.StreamingExtract(step.iq, step.jobs) + if err != nil { + t.Fatalf("step %d (%s): oracle failed: %v", idx, step.name, err) + } + if len(got) != len(want) { + t.Fatalf("step %d (%s): result count mismatch: got=%d want=%d", idx, step.name, len(got), len(want)) + } + applied := runner.applyStreamingGPUExecutionResults(got) + if len(applied) != len(want) { + t.Fatalf("step %d (%s): applied result count mismatch: got=%d want=%d", idx, step.name, len(applied), len(want)) + } + for i, job := range step.jobs { + oracleState := oracle.States[job.SignalID] + requirePreparedExecutionResultMatchesOracle(t, got[i], want[i], oracleState, sampleTol, phaseTol) + requireStreamingExtractResultMatchesOracle(t, applied[i], want[i]) + requireComplexSlicesClose(t, applied[i].IQ, want[i].IQ, sampleTol) + requireExtractStateMatchesOracle(t, runner.streamState[job.SignalID], oracleState, phaseTol, sampleTol) + } + requireStateKeysMatchOracle(t, runner.streamState, oracle.States) + } +} diff --git a/internal/demod/gpudemod/windows_bridge.go b/internal/demod/gpudemod/windows_bridge.go index 2ff9e98..fbfcc9a 100644 --- a/internal/demod/gpudemod/windows_bridge.go +++ b/internal/demod/gpudemod/windows_bridge.go @@ -4,7 +4,7 @@ package gpudemod /* #cgo windows CFLAGS: -I"C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v13.2/include" -#cgo windows LDFLAGS: -lcudart64_13 -lkernel32 +#cgo windows LDFLAGS: -L"C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v13.2/bin/x64" -l:cudart64_13.dll -lkernel32 #include #include #include @@ -27,6 +27,7 @@ typedef int (__stdcall *gpud_launch_decimate_fn)(const gpud_float2* in, gpud_flo typedef int (__stdcall *gpud_launch_am_envelope_fn)(const gpud_float2* in, float* out, int n); typedef int (__stdcall *gpud_launch_ssb_product_fn)(const gpud_float2* in, float* out, int n, double phase_inc, double phase_start); typedef int (__stdcall *gpud_launch_streaming_polyphase_prepare_fn)(const gpud_float2* in_new, int n_new, const gpud_float2* history_in, int history_len, const float* polyphase_taps, int polyphase_len, int decim, int num_taps, int phase_count_in, double phase_start, double phase_inc, gpud_float2* out, int* n_out, int* phase_count_out, double* phase_end_out, gpud_float2* history_out); +typedef int (__stdcall *gpud_launch_streaming_polyphase_stateful_fn)(const gpud_float2* in_new, int n_new, gpud_float2* shifted_new_tmp, const float* polyphase_taps, int polyphase_len, int decim, int num_taps, gpud_float2* history_state, gpud_float2* history_scratch, int history_cap, int* history_len_io, int* phase_count_state, double* phase_state, double phase_inc, gpud_float2* out, int out_cap, int* n_out); static HMODULE gpud_mod = NULL; static gpud_stream_create_fn gpud_p_stream_create = NULL; @@ -44,6 +45,7 @@ static gpud_launch_decimate_fn gpud_p_launch_decimate = NULL; static gpud_launch_am_envelope_fn gpud_p_launch_am_envelope = NULL; static gpud_launch_ssb_product_fn gpud_p_launch_ssb_product = NULL; static gpud_launch_streaming_polyphase_prepare_fn gpud_p_launch_streaming_polyphase_prepare = NULL; +static gpud_launch_streaming_polyphase_stateful_fn gpud_p_launch_streaming_polyphase_stateful = NULL; static int gpud_cuda_malloc(void **ptr, size_t bytes) { return (int)cudaMalloc(ptr, bytes); } static int gpud_cuda_free(void *ptr) { return (int)cudaFree(ptr); } @@ -70,6 +72,7 @@ static int gpud_load_library(const char* path) { gpud_p_launch_am_envelope = (gpud_launch_am_envelope_fn)GetProcAddress(gpud_mod, "gpud_launch_am_envelope_cuda"); gpud_p_launch_ssb_product = (gpud_launch_ssb_product_fn)GetProcAddress(gpud_mod, "gpud_launch_ssb_product_cuda"); gpud_p_launch_streaming_polyphase_prepare = (gpud_launch_streaming_polyphase_prepare_fn)GetProcAddress(gpud_mod, "gpud_launch_streaming_polyphase_prepare_cuda"); + gpud_p_launch_streaming_polyphase_stateful = (gpud_launch_streaming_polyphase_stateful_fn)GetProcAddress(gpud_mod, "gpud_launch_streaming_polyphase_stateful_cuda"); if (!gpud_p_stream_create || !gpud_p_stream_destroy || !gpud_p_stream_sync || !gpud_p_upload_fir_taps || !gpud_p_launch_freq_shift_stream || !gpud_p_launch_freq_shift || !gpud_p_launch_fm_discrim || !gpud_p_launch_fir_stream || !gpud_p_launch_fir || !gpud_p_launch_decimate_stream || !gpud_p_launch_decimate || !gpud_p_launch_am_envelope || !gpud_p_launch_ssb_product) { FreeLibrary(gpud_mod); gpud_mod = NULL; @@ -93,6 +96,7 @@ static int gpud_launch_decimate(gpud_float2 *in, gpud_float2 *out, int n_out, in static int gpud_launch_am_envelope(gpud_float2 *in, float *out, int n) { if (!gpud_p_launch_am_envelope) return -1; return gpud_p_launch_am_envelope(in, out, n); } static int gpud_launch_ssb_product(gpud_float2 *in, float *out, int n, double phase_inc, double phase_start) { if (!gpud_p_launch_ssb_product) return -1; return gpud_p_launch_ssb_product(in, out, n, phase_inc, phase_start); } static int gpud_launch_streaming_polyphase_prepare(gpud_float2 *in_new, int n_new, gpud_float2 *history_in, int history_len, float *polyphase_taps, int polyphase_len, int decim, int num_taps, int phase_count_in, double phase_start, double phase_inc, gpud_float2 *out, int *n_out, int *phase_count_out, double *phase_end_out, gpud_float2 *history_out) { if (!gpud_p_launch_streaming_polyphase_prepare) return -1; return gpud_p_launch_streaming_polyphase_prepare(in_new, n_new, history_in, history_len, polyphase_taps, polyphase_len, decim, num_taps, phase_count_in, phase_start, phase_inc, out, n_out, phase_count_out, phase_end_out, history_out); } +static int gpud_launch_streaming_polyphase_stateful(gpud_float2 *in_new, int n_new, gpud_float2 *shifted_new_tmp, float *polyphase_taps, int polyphase_len, int decim, int num_taps, gpud_float2 *history_state, gpud_float2 *history_scratch, int history_cap, int *history_len_io, int *phase_count_state, double *phase_state, double phase_inc, gpud_float2 *out, int out_cap, int *n_out) { if (!gpud_p_launch_streaming_polyphase_stateful) return -1; return gpud_p_launch_streaming_polyphase_stateful(in_new, n_new, shifted_new_tmp, polyphase_taps, polyphase_len, decim, num_taps, history_state, history_scratch, history_cap, history_len_io, phase_count_state, phase_state, phase_inc, out, out_cap, n_out); } */ import "C" @@ -107,41 +111,68 @@ func bridgeLoadLibrary(path string) int { defer C.free(unsafe.Pointer(cp)) return int(C.gpud_load_library(cp)) } -func bridgeCudaMalloc(ptr *unsafe.Pointer, bytes uintptr) int { return int(C.gpud_cuda_malloc(ptr, C.size_t(bytes))) } +func bridgeCudaMalloc(ptr *unsafe.Pointer, bytes uintptr) int { + return int(C.gpud_cuda_malloc(ptr, C.size_t(bytes))) +} func bridgeCudaFree(ptr unsafe.Pointer) int { return int(C.gpud_cuda_free(ptr)) } -func bridgeMemcpyH2D(dst unsafe.Pointer, src unsafe.Pointer, bytes uintptr) int { return int(C.gpud_memcpy_h2d(dst, src, C.size_t(bytes))) } -func bridgeMemcpyD2H(dst unsafe.Pointer, src unsafe.Pointer, bytes uintptr) int { return int(C.gpud_memcpy_d2h(dst, src, C.size_t(bytes))) } +func bridgeMemcpyH2D(dst unsafe.Pointer, src unsafe.Pointer, bytes uintptr) int { + return int(C.gpud_memcpy_h2d(dst, src, C.size_t(bytes))) +} +func bridgeMemcpyD2H(dst unsafe.Pointer, src unsafe.Pointer, bytes uintptr) int { + return int(C.gpud_memcpy_d2h(dst, src, C.size_t(bytes))) +} func bridgeDeviceSync() int { return int(C.gpud_device_sync()) } -func bridgeUploadFIRTaps(taps *C.float, n int) int { return int(C.gpud_upload_fir_taps(taps, C.int(n))) } +func bridgeUploadFIRTaps(taps *C.float, n int) int { + return int(C.gpud_upload_fir_taps(taps, C.int(n))) +} func bridgeLaunchFreqShift(in *C.gpud_float2, out *C.gpud_float2, n int, phaseInc float64, phaseStart float64) int { return int(C.gpud_launch_freq_shift(in, out, C.int(n), C.double(phaseInc), C.double(phaseStart))) } func bridgeLaunchFreqShiftStream(in *C.gpud_float2, out *C.gpud_float2, n int, phaseInc float64, phaseStart float64, stream streamHandle) int { return int(C.gpud_launch_freq_shift_stream(in, out, C.int(n), C.double(phaseInc), C.double(phaseStart), C.gpud_stream_handle(stream))) } -func bridgeLaunchFIR(in *C.gpud_float2, out *C.gpud_float2, n int, numTaps int) int { return int(C.gpud_launch_fir(in, out, C.int(n), C.int(numTaps))) } +func bridgeLaunchFIR(in *C.gpud_float2, out *C.gpud_float2, n int, numTaps int) int { + return int(C.gpud_launch_fir(in, out, C.int(n), C.int(numTaps))) +} func bridgeLaunchFIRStream(in *C.gpud_float2, out *C.gpud_float2, n int, numTaps int, stream streamHandle) int { return int(C.gpud_launch_fir_stream(in, out, C.int(n), C.int(numTaps), C.gpud_stream_handle(stream))) } func bridgeLaunchFIRv2Stream(in *C.gpud_float2, out *C.gpud_float2, taps *C.float, n int, numTaps int, stream streamHandle) int { return int(C.gpud_launch_fir_v2_stream(in, out, taps, C.int(n), C.int(numTaps), C.gpud_stream_handle(stream))) } -func bridgeLaunchDecimate(in *C.gpud_float2, out *C.gpud_float2, nOut int, factor int) int { return int(C.gpud_launch_decimate(in, out, C.int(nOut), C.int(factor))) } +func bridgeLaunchDecimate(in *C.gpud_float2, out *C.gpud_float2, nOut int, factor int) int { + return int(C.gpud_launch_decimate(in, out, C.int(nOut), C.int(factor))) +} func bridgeLaunchDecimateStream(in *C.gpud_float2, out *C.gpud_float2, nOut int, factor int, stream streamHandle) int { return int(C.gpud_launch_decimate_stream(in, out, C.int(nOut), C.int(factor), C.gpud_stream_handle(stream))) } -func bridgeLaunchFMDiscrim(in *C.gpud_float2, out *C.float, n int) int { return int(C.gpud_launch_fm_discrim(in, out, C.int(n))) } -func bridgeLaunchAMEnvelope(in *C.gpud_float2, out *C.float, n int) int { return int(C.gpud_launch_am_envelope(in, out, C.int(n))) } +func bridgeLaunchFMDiscrim(in *C.gpud_float2, out *C.float, n int) int { + return int(C.gpud_launch_fm_discrim(in, out, C.int(n))) +} +func bridgeLaunchAMEnvelope(in *C.gpud_float2, out *C.float, n int) int { + return int(C.gpud_launch_am_envelope(in, out, C.int(n))) +} func bridgeLaunchSSBProduct(in *C.gpud_float2, out *C.float, n int, phaseInc float64, phaseStart float64) int { return int(C.gpud_launch_ssb_product(in, out, C.int(n), C.double(phaseInc), C.double(phaseStart))) } + +// bridgeLaunchStreamingPolyphasePrepare is a transitional bridge for the +// legacy single-call prepare path. The stateful native path uses +// bridgeLaunchStreamingPolyphaseStateful. func bridgeLaunchStreamingPolyphasePrepare(inNew *C.gpud_float2, nNew int, historyIn *C.gpud_float2, historyLen int, polyphaseTaps *C.float, polyphaseLen int, decim int, numTaps int, phaseCountIn int, phaseStart float64, phaseInc float64, out *C.gpud_float2, nOut *C.int, phaseCountOut *C.int, phaseEndOut *C.double, historyOut *C.gpud_float2) int { return int(C.gpud_launch_streaming_polyphase_prepare(inNew, C.int(nNew), historyIn, C.int(historyLen), polyphaseTaps, C.int(polyphaseLen), C.int(decim), C.int(numTaps), C.int(phaseCountIn), C.double(phaseStart), C.double(phaseInc), out, nOut, phaseCountOut, phaseEndOut, historyOut)) } +func bridgeLaunchStreamingPolyphaseStateful(inNew *C.gpud_float2, nNew int, shiftedNewTmp *C.gpud_float2, polyphaseTaps *C.float, polyphaseLen int, decim int, numTaps int, historyState *C.gpud_float2, historyScratch *C.gpud_float2, historyCap int, historyLenIO *C.int, phaseCountState *C.int, phaseState *C.double, phaseInc float64, out *C.gpud_float2, outCap int, nOut *C.int) int { + return int(C.gpud_launch_streaming_polyphase_stateful(inNew, C.int(nNew), shiftedNewTmp, polyphaseTaps, C.int(polyphaseLen), C.int(decim), C.int(numTaps), historyState, historyScratch, C.int(historyCap), historyLenIO, phaseCountState, phaseState, C.double(phaseInc), out, C.int(outCap), nOut)) +} func bridgeStreamCreate() (streamHandle, int) { var s C.gpud_stream_handle res := int(C.gpud_stream_create(&s)) return streamHandle(s), res } -func bridgeStreamDestroy(stream streamHandle) int { return int(C.gpud_stream_destroy(C.gpud_stream_handle(stream))) } -func bridgeStreamSync(stream streamHandle) int { return int(C.gpud_stream_sync(C.gpud_stream_handle(stream))) } +func bridgeStreamDestroy(stream streamHandle) int { + return int(C.gpud_stream_destroy(C.gpud_stream_handle(stream))) +} +func bridgeStreamSync(stream streamHandle) int { + return int(C.gpud_stream_sync(C.gpud_stream_handle(stream))) +}