Kaynağa Gözat

feat: parallelize mixed-bandwidth GPU batch demod

master
Jan Svabenik 2 gün önce
ebeveyn
işleme
2363292f4f
6 değiştirilmiş dosya ile 271 ekleme ve 45 silme
  1. +80
    -0
      internal/demod/gpudemod/batch_runner_test.go
  2. +136
    -45
      internal/demod/gpudemod/batch_runner_windows.go
  3. BIN
      internal/demod/gpudemod/build/gpudemod_kernels.exp
  4. BIN
      internal/demod/gpudemod/build/gpudemod_kernels.lib
  5. +48
    -0
      internal/demod/gpudemod/native/exports.cu
  6. +7
    -0
      internal/demod/gpudemod/windows_bridge.go

+ 80
- 0
internal/demod/gpudemod/batch_runner_test.go Dosyayı Görüntüle

@@ -0,0 +1,80 @@
package gpudemod

import (
"math"
"math/cmplx"
"testing"

"sdr-visual-suite/internal/dsp"
)

func TestMixedBandwidthBatch(t *testing.T) {
if !Available() {
t.Skip("no GPU")
}
sampleRate := 2048000
n := 2048
iq := makeSyntheticIQ(n, sampleRate, []float64{50e3, -120e3, 300e3, -80e3})
jobs := []ExtractJob{
{OffsetHz: 50e3, BW: 12000, OutRate: 48000},
{OffsetHz: -120e3, BW: 150000, OutRate: 192000},
{OffsetHz: 300e3, BW: 3000, OutRate: 48000},
{OffsetHz: -80e3, BW: 500, OutRate: 48000},
}
cpuOuts := make([][]complex64, len(jobs))
for i, job := range jobs {
cpuOuts[i] = extractCPU(iq, sampleRate, job)
}
runner, err := NewBatchRunner(n, sampleRate)
if err != nil {
t.Fatalf("NewBatchRunner: %v", err)
}
defer runner.Close()
gpuOuts, rates, err := runner.ShiftFilterDecimateBatch(iq, jobs)
if err != nil {
t.Fatalf("Batch: %v", err)
}
for i := range jobs {
if !complexSliceClose(cpuOuts[i], gpuOuts[i], 1e-3) {
t.Errorf("job %d: GPU/CPU mismatch (rate=%d)", i, rates[i])
}
}
}

func makeSyntheticIQ(n int, sr int, freqs []float64) []complex64 {
iq := make([]complex64, n)
for _, f := range freqs {
for i := range iq {
phase := 2 * math.Pi * f * float64(i) / float64(sr)
iq[i] += complex(float32(math.Cos(phase)), float32(math.Sin(phase)))
}
}
return iq
}

func extractCPU(iq []complex64, sr int, job ExtractJob) []complex64 {
shifted := dsp.FreqShift(iq, sr, job.OffsetHz)
cutoff := job.BW / 2
if cutoff < 200 {
cutoff = 200
}
taps := dsp.LowpassFIR(cutoff, sr, 101)
filtered := dsp.ApplyFIR(shifted, taps)
decim := int(math.Round(float64(sr) / float64(job.OutRate)))
if decim < 1 {
decim = 1
}
return dsp.Decimate(filtered, decim)
}

func complexSliceClose(a, b []complex64, tol float64) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if cmplx.Abs(complex128(a[i]-b[i])) > tol {
return false
}
}
return true
}

+ 136
- 45
internal/demod/gpudemod/batch_runner_windows.go Dosyayı Görüntüle

@@ -2,6 +2,11 @@

package gpudemod

/*
#include <cuda_runtime.h>
*/
import "C"

import (
"errors"
"math"
@@ -10,89 +15,175 @@ import (
"sdr-visual-suite/internal/dsp"
)

type slotBuffers struct {
dShifted unsafe.Pointer
dFiltered unsafe.Pointer
dDecimated unsafe.Pointer
dTaps unsafe.Pointer
stream streamHandle
}

type windowsBatchRunner struct {
*BatchRunner
slotBufs []slotBuffers
}

func asWindowsBatchRunner(r *BatchRunner) *windowsBatchRunner {
return (*windowsBatchRunner)(unsafe.Pointer(r))
}

func (r *windowsBatchRunner) freeSlotBuffers() {
for i := range r.slotBufs {
if r.slotBufs[i].dShifted != nil {
_ = bridgeCudaFree(r.slotBufs[i].dShifted)
r.slotBufs[i].dShifted = nil
}
if r.slotBufs[i].dFiltered != nil {
_ = bridgeCudaFree(r.slotBufs[i].dFiltered)
r.slotBufs[i].dFiltered = nil
}
if r.slotBufs[i].dDecimated != nil {
_ = bridgeCudaFree(r.slotBufs[i].dDecimated)
r.slotBufs[i].dDecimated = nil
}
if r.slotBufs[i].dTaps != nil {
_ = bridgeCudaFree(r.slotBufs[i].dTaps)
r.slotBufs[i].dTaps = nil
}
if r.slotBufs[i].stream != nil {
_ = bridgeStreamDestroy(r.slotBufs[i].stream)
r.slotBufs[i].stream = nil
}
}
r.slotBufs = nil
}

func (r *windowsBatchRunner) allocSlotBuffers(n int) error {
if len(r.slotBufs) == len(r.slots) && len(r.slotBufs) > 0 {
return nil
}
r.freeSlotBuffers()
if len(r.slots) == 0 {
return nil
}
iqBytes := uintptr(n) * unsafe.Sizeof(complex64(0))
tapsBytes := uintptr(256) * unsafe.Sizeof(float32(0))
r.slotBufs = make([]slotBuffers, len(r.slots))
for i := range r.slotBufs {
for _, ptr := range []*unsafe.Pointer{&r.slotBufs[i].dShifted, &r.slotBufs[i].dFiltered, &r.slotBufs[i].dDecimated} {
if bridgeCudaMalloc(ptr, iqBytes) != 0 {
r.freeSlotBuffers()
return errors.New("cudaMalloc slot buffer failed")
}
}
if bridgeCudaMalloc(&r.slotBufs[i].dTaps, tapsBytes) != 0 {
r.freeSlotBuffers()
return errors.New("cudaMalloc slot taps failed")
}
s, res := bridgeStreamCreate()
if res != 0 {
r.freeSlotBuffers()
return errors.New("cudaStreamCreate failed")
}
r.slotBufs[i].stream = s
}
return nil
}

func (r *BatchRunner) shiftFilterDecimateBatchImpl(iq []complex64) ([][]complex64, []int, error) {
wr := asWindowsBatchRunner(r)
e := r.eng
if e == nil || !e.cudaReady {
return nil, nil, ErrUnavailable
}
outs := make([][]complex64, len(r.slots))
rates := make([]int, len(r.slots))
n := len(iq)
if n == 0 {
return outs, rates, nil
}
if err := wr.allocSlotBuffers(n); err != nil {
return nil, nil, err
}
bytesIn := uintptr(n) * unsafe.Sizeof(complex64(0))
if bridgeMemcpyH2D(unsafe.Pointer(e.dIQIn), unsafe.Pointer(&iq[0]), bytesIn) != 0 {
return nil, nil, errors.New("cudaMemcpy H2D failed")
}
for i := range r.slots {
if !r.slots[i].active {
continue
}
out, rate, err := r.shiftFilterDecimateSlot(iq, r.slots[i].job, nil)
nOut, rate, err := r.shiftFilterDecimateSlotParallel(iq, r.slots[i].job, wr.slotBufs[i])
if err != nil {
return nil, nil, err
}
r.slots[i].out = out
r.slots[i].rate = rate
outs[i] = out
outs[i] = make([]complex64, nOut)
rates[i] = rate
}
for i := range r.slots {
if !r.slots[i].active {
continue
}
buf := wr.slotBufs[i]
if bridgeStreamSync(buf.stream) != 0 {
return nil, nil, errors.New("cuda stream sync failed")
}
out := outs[i]
if len(out) == 0 {
continue
}
outBytes := uintptr(len(out)) * unsafe.Sizeof(complex64(0))
if bridgeMemcpyD2H(unsafe.Pointer(&out[0]), buf.dDecimated, outBytes) != 0 {
return nil, nil, errors.New("cudaMemcpy D2H failed")
}
r.slots[i].out = out
}
return outs, rates, nil
}

func (r *BatchRunner) shiftFilterDecimateSlot(iq []complex64, job ExtractJob, stream streamHandle) ([]complex64, int, error) {
func (r *BatchRunner) shiftFilterDecimateSlotParallel(iq []complex64, job ExtractJob, buf slotBuffers) (int, int, error) {
e := r.eng
if e == nil || !e.cudaReady {
return nil, 0, ErrUnavailable
return 0, 0, ErrUnavailable
}
if len(iq) == 0 {
return nil, 0, nil
n := len(iq)
if n == 0 {
return 0, 0, nil
}
cutoff := job.BW / 2
if cutoff < 200 {
cutoff = 200
}
base64 := dsp.LowpassFIR(cutoff, e.sampleRate, 101)
taps := make([]float32, len(base64))
for i, v := range base64 {
base := dsp.LowpassFIR(cutoff, e.sampleRate, 101)
taps := make([]float32, len(base))
for i, v := range base {
taps[i] = float32(v)
}
if len(taps) == 0 {
return nil, 0, errors.New("no FIR taps configured")
return 0, 0, errors.New("no FIR taps configured")
}
e.SetFIR(taps)
if stream == nil {
if bridgeDeviceSync() != 0 {
return nil, 0, errors.New("cudaDeviceSynchronize failed")
}
tapsBytes := uintptr(len(taps)) * unsafe.Sizeof(float32(0))
if bridgeMemcpyH2D(buf.dTaps, unsafe.Pointer(&taps[0]), tapsBytes) != 0 {
return 0, 0, errors.New("taps H2D failed")
}
decim := int(math.Round(float64(e.sampleRate) / float64(job.OutRate)))
if decim < 1 {
decim = 1
}
n := len(iq)
nOut := n / decim
if nOut <= 0 {
return nil, 0, errors.New("not enough output samples after decimation")
}
bytesIn := uintptr(n) * unsafe.Sizeof(complex64(0))
if bridgeMemcpyH2D(unsafe.Pointer(e.dIQIn), unsafe.Pointer(&iq[0]), bytesIn) != 0 {
return nil, 0, errors.New("cudaMemcpy H2D failed")
return 0, 0, errors.New("not enough output samples after decimation")
}
phaseInc := -2.0 * math.Pi * job.OffsetHz / float64(e.sampleRate)
phaseStart := e.phase
if bridgeLaunchFreqShiftStream(e.dIQIn, e.dShifted, n, phaseInc, phaseStart, stream) != 0 {
return nil, 0, errors.New("gpu freq shift failed")
}
if bridgeLaunchFIRStream(e.dShifted, e.dFiltered, n, len(taps), stream) != 0 {
return nil, 0, errors.New("gpu FIR failed")
if bridgeLaunchFreqShiftStream(e.dIQIn, (*gpuFloat2)(buf.dShifted), n, phaseInc, e.phase, buf.stream) != 0 {
return 0, 0, errors.New("gpu freq shift failed")
}
if bridgeLaunchDecimateStream(e.dFiltered, e.dDecimated, nOut, decim, stream) != 0 {
return nil, 0, errors.New("gpu decimate failed")
}
if stream != nil {
if bridgeStreamSync(stream) != 0 {
return nil, 0, errors.New("cuda stream sync failed")
}
} else {
if bridgeDeviceSync() != 0 {
return nil, 0, errors.New("cudaDeviceSynchronize failed")
}
if bridgeLaunchFIRv2Stream((*gpuFloat2)(buf.dShifted), (*gpuFloat2)(buf.dFiltered), (*C.float)(buf.dTaps), n, len(taps), buf.stream) != 0 {
return 0, 0, errors.New("gpu FIR v2 failed")
}
out := make([]complex64, nOut)
outBytes := uintptr(nOut) * unsafe.Sizeof(complex64(0))
if bridgeMemcpyD2H(unsafe.Pointer(&out[0]), unsafe.Pointer(e.dDecimated), outBytes) != 0 {
return nil, 0, errors.New("cudaMemcpy D2H failed")
if bridgeLaunchDecimateStream((*gpuFloat2)(buf.dFiltered), (*gpuFloat2)(buf.dDecimated), nOut, decim, buf.stream) != 0 {
return 0, 0, errors.New("gpu decimate failed")
}
e.phase = phaseStart + phaseInc*float64(n)
return out, e.sampleRate / decim, nil
return nOut, e.sampleRate / decim, nil
}

BIN
internal/demod/gpudemod/build/gpudemod_kernels.exp Dosyayı Görüntüle


BIN
internal/demod/gpudemod/build/gpudemod_kernels.lib Dosyayı Görüntüle


+ 48
- 0
internal/demod/gpudemod/native/exports.cu Dosyayı Görüntüle

@@ -170,6 +170,54 @@ GPUD_API int GPUD_CALL gpud_launch_fir_cuda(
return (int)cudaGetLastError();
}

__global__ void gpud_fir_kernel_v2(
const float2* __restrict__ in,
float2* __restrict__ out,
const float* __restrict__ taps,
int n,
int num_taps
) {
extern __shared__ float2 s_data[];
int gid = blockIdx.x * blockDim.x + threadIdx.x;
int lid = threadIdx.x;
int halo = num_taps - 1;

if (gid < n) s_data[lid + halo] = in[gid];
else s_data[lid + halo] = make_float2(0.0f, 0.0f);

if (lid < halo) {
int src = gid - halo;
s_data[lid] = (src >= 0) ? in[src] : make_float2(0.0f, 0.0f);
}
__syncthreads();
if (gid >= n) return;

float acc_r = 0.0f, acc_i = 0.0f;
for (int k = 0; k < num_taps; ++k) {
float2 v = s_data[lid + halo - k];
float t = taps[k];
acc_r += v.x * t;
acc_i += v.y * t;
}
out[gid] = make_float2(acc_r, acc_i);
}

GPUD_API int GPUD_CALL gpud_launch_fir_v2_stream_cuda(
const float2* in,
float2* out,
const float* taps,
int n,
int num_taps,
gpud_stream_handle stream
) {
if (n <= 0 || num_taps <= 0 || num_taps > 256) return 0;
const int block = 256;
const int grid = (n + block - 1) / block;
size_t sharedBytes = (size_t)(block + num_taps - 1) * sizeof(float2);
gpud_fir_kernel_v2<<<grid, block, sharedBytes, (cudaStream_t)stream>>>(in, out, taps, n, num_taps);
return (int)cudaGetLastError();
}

GPUD_API int GPUD_CALL gpud_launch_decimate_cuda(
const float2* in,
float2* out,


+ 7
- 0
internal/demod/gpudemod/windows_bridge.go Dosyayı Görüntüle

@@ -20,6 +20,7 @@ typedef int (__stdcall *gpud_launch_freq_shift_stream_fn)(const gpud_float2* in,
typedef int (__stdcall *gpud_launch_freq_shift_fn)(const gpud_float2* in, gpud_float2* out, int n, double phase_inc, double phase_start);
typedef int (__stdcall *gpud_launch_fm_discrim_fn)(const gpud_float2* in, float* out, int n);
typedef int (__stdcall *gpud_launch_fir_stream_fn)(const gpud_float2* in, gpud_float2* out, int n, int num_taps, gpud_stream_handle stream);
typedef int (__stdcall *gpud_launch_fir_v2_stream_fn)(const gpud_float2* in, gpud_float2* out, const float* taps, int n, int num_taps, gpud_stream_handle stream);
typedef int (__stdcall *gpud_launch_fir_fn)(const gpud_float2* in, gpud_float2* out, int n, int num_taps);
typedef int (__stdcall *gpud_launch_decimate_stream_fn)(const gpud_float2* in, gpud_float2* out, int n_out, int factor, gpud_stream_handle stream);
typedef int (__stdcall *gpud_launch_decimate_fn)(const gpud_float2* in, gpud_float2* out, int n_out, int factor);
@@ -35,6 +36,7 @@ static gpud_launch_freq_shift_stream_fn gpud_p_launch_freq_shift_stream = NULL;
static gpud_launch_freq_shift_fn gpud_p_launch_freq_shift = NULL;
static gpud_launch_fm_discrim_fn gpud_p_launch_fm_discrim = NULL;
static gpud_launch_fir_stream_fn gpud_p_launch_fir_stream = NULL;
static gpud_launch_fir_v2_stream_fn gpud_p_launch_fir_v2_stream = NULL;
static gpud_launch_fir_fn gpud_p_launch_fir = NULL;
static gpud_launch_decimate_stream_fn gpud_p_launch_decimate_stream = NULL;
static gpud_launch_decimate_fn gpud_p_launch_decimate = NULL;
@@ -59,6 +61,7 @@ static int gpud_load_library(const char* path) {
gpud_p_launch_freq_shift = (gpud_launch_freq_shift_fn)GetProcAddress(gpud_mod, "gpud_launch_freq_shift_cuda");
gpud_p_launch_fm_discrim = (gpud_launch_fm_discrim_fn)GetProcAddress(gpud_mod, "gpud_launch_fm_discrim_cuda");
gpud_p_launch_fir_stream = (gpud_launch_fir_stream_fn)GetProcAddress(gpud_mod, "gpud_launch_fir_stream_cuda");
gpud_p_launch_fir_v2_stream = (gpud_launch_fir_v2_stream_fn)GetProcAddress(gpud_mod, "gpud_launch_fir_v2_stream_cuda");
gpud_p_launch_fir = (gpud_launch_fir_fn)GetProcAddress(gpud_mod, "gpud_launch_fir_cuda");
gpud_p_launch_decimate_stream = (gpud_launch_decimate_stream_fn)GetProcAddress(gpud_mod, "gpud_launch_decimate_stream_cuda");
gpud_p_launch_decimate = (gpud_launch_decimate_fn)GetProcAddress(gpud_mod, "gpud_launch_decimate_cuda");
@@ -80,6 +83,7 @@ static int gpud_launch_freq_shift_stream(gpud_float2 *in, gpud_float2 *out, int
static int gpud_launch_freq_shift(gpud_float2 *in, gpud_float2 *out, int n, double phase_inc, double phase_start) { if (!gpud_p_launch_freq_shift) return -1; return gpud_p_launch_freq_shift(in, out, n, phase_inc, phase_start); }
static int gpud_launch_fm_discrim(gpud_float2 *in, float *out, int n) { if (!gpud_p_launch_fm_discrim) return -1; return gpud_p_launch_fm_discrim(in, out, n); }
static int gpud_launch_fir_stream(gpud_float2 *in, gpud_float2 *out, int n, int num_taps, gpud_stream_handle stream) { if (!gpud_p_launch_fir_stream) return -1; return gpud_p_launch_fir_stream(in, out, n, num_taps, stream); }
static int gpud_launch_fir_v2_stream(gpud_float2 *in, gpud_float2 *out, const float *taps, int n, int num_taps, gpud_stream_handle stream) { if (!gpud_p_launch_fir_v2_stream) return -1; return gpud_p_launch_fir_v2_stream(in, out, taps, n, num_taps, stream); }
static int gpud_launch_fir(gpud_float2 *in, gpud_float2 *out, int n, int num_taps) { if (!gpud_p_launch_fir) return -1; return gpud_p_launch_fir(in, out, n, num_taps); }
static int gpud_launch_decimate_stream(gpud_float2 *in, gpud_float2 *out, int n_out, int factor, gpud_stream_handle stream) { if (!gpud_p_launch_decimate_stream) return -1; return gpud_p_launch_decimate_stream(in, out, n_out, factor, stream); }
static int gpud_launch_decimate(gpud_float2 *in, gpud_float2 *out, int n_out, int factor) { if (!gpud_p_launch_decimate) return -1; return gpud_p_launch_decimate(in, out, n_out, factor); }
@@ -115,6 +119,9 @@ func bridgeLaunchFIR(in *C.gpud_float2, out *C.gpud_float2, n int, numTaps int)
func bridgeLaunchFIRStream(in *C.gpud_float2, out *C.gpud_float2, n int, numTaps int, stream streamHandle) int {
return int(C.gpud_launch_fir_stream(in, out, C.int(n), C.int(numTaps), C.gpud_stream_handle(stream)))
}
func bridgeLaunchFIRv2Stream(in *C.gpud_float2, out *C.gpud_float2, taps *C.float, n int, numTaps int, stream streamHandle) int {
return int(C.gpud_launch_fir_v2_stream(in, out, taps, C.int(n), C.int(numTaps), C.gpud_stream_handle(stream)))
}
func bridgeLaunchDecimate(in *C.gpud_float2, out *C.gpud_float2, nOut int, factor int) int { return int(C.gpud_launch_decimate(in, out, C.int(nOut), C.int(factor))) }
func bridgeLaunchDecimateStream(in *C.gpud_float2, out *C.gpud_float2, nOut int, factor int, stream streamHandle) int {
return int(C.gpud_launch_decimate_stream(in, out, C.int(nOut), C.int(factor), C.gpud_stream_handle(stream)))


Yükleniyor…
İptal
Kaydet