Просмотр исходного кода

Add refinement detail FFT and arbitration scaffolding

master
Jan Svabenik 3 часов назад
Родитель
Сommit
9671722691
15 измененных файлов: 234 добавлений и 40 удалений
  1. +13
    -0
      cmd/sdrd/arbitration_snapshot.go
  2. +2
    -3
      cmd/sdrd/dsp_loop.go
  3. +3
    -0
      cmd/sdrd/http_handlers.go
  4. +71
    -16
      cmd/sdrd/pipeline_runtime.go
  5. +15
    -6
      cmd/sdrd/types.go
  6. +13
    -0
      internal/config/config.go
  7. +3
    -0
      internal/config/config_test.go
  8. +13
    -2
      internal/pipeline/budget.go
  9. +1
    -0
      internal/pipeline/phases.go
  10. +9
    -0
      internal/pipeline/policy.go
  11. +7
    -0
      internal/pipeline/policy_test.go
  12. +39
    -12
      internal/pipeline/scheduler.go
  13. +18
    -0
      internal/pipeline/scheduler_test.go
  14. +11
    -0
      internal/runtime/runtime.go
  15. +16
    -1
      internal/runtime/runtime_test.go

+ 13
- 0
cmd/sdrd/arbitration_snapshot.go Просмотреть файл

@@ -0,0 +1,13 @@
package main

import "sdr-wideband-suite/internal/pipeline"

func buildArbitrationSnapshot(step pipeline.RefinementStep, queue decisionQueueStats) *ArbitrationSnapshot {
return &ArbitrationSnapshot{
Budgets: &step.Input.Budgets,
RefinementPlan: &step.Input.Plan,
Queue: queue,
DecisionSummary: summarizeDecisions(step.Result.Decisions),
DecisionItems: compactDecisions(step.Result.Decisions),
}
}

+ 2
- 3
cmd/sdrd/dsp_loop.go Просмотреть файл

@@ -159,9 +159,8 @@ func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *
refinementDebug.Windows = windowStats refinementDebug.Windows = windowStats
} }
refinementDebug.Queue = state.queueStats refinementDebug.Queue = state.queueStats
policy := pipeline.PolicyFromConfig(rt.cfg)
budget := pipeline.BudgetModelFromPolicy(policy)
refinementDebug.Budgets = &budget
refinementDebug.Budgets = &state.refinement.Input.Budgets
refinementDebug.Arbitration = buildArbitrationSnapshot(state.refinement, state.queueStats)
debugInfo.Refinement = refinementDebug debugInfo.Refinement = refinementDebug
debugInfo.Decisions = &DecisionDebug{ debugInfo.Decisions = &DecisionDebug{
Summary: summarizeDecisions(state.refinement.Result.Decisions), Summary: summarizeDecisions(state.refinement.Result.Decisions),


+ 3
- 0
cmd/sdrd/http_handlers.go Просмотреть файл

@@ -152,6 +152,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime
"auto_record_classes": policy.AutoRecordClasses, "auto_record_classes": policy.AutoRecordClasses,
"auto_decode_classes": policy.AutoDecodeClasses, "auto_decode_classes": policy.AutoDecodeClasses,
"refinement_jobs": policy.MaxRefinementJobs, "refinement_jobs": policy.MaxRefinementJobs,
"refinement_detail_fft": policy.RefinementDetailFFTSize,
"refinement_auto_span": policy.RefinementAutoSpan, "refinement_auto_span": policy.RefinementAutoSpan,
"refinement_min_span_hz": policy.RefinementMinSpanHz, "refinement_min_span_hz": policy.RefinementMinSpanHz,
"refinement_max_span_hz": policy.RefinementMaxSpanHz, "refinement_max_span_hz": policy.RefinementMaxSpanHz,
@@ -163,6 +164,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
snap := phaseSnap.Snapshot() snap := phaseSnap.Snapshot()
windowStats := buildWindowStats(snap.refinement.Input.Windows) windowStats := buildWindowStats(snap.refinement.Input.Windows)
arbitration := buildArbitrationSnapshot(snap.refinement, snap.queueStats)
out := map[string]any{ out := map[string]any{
"plan": snap.refinement.Input.Plan, "plan": snap.refinement.Input.Plan,
"windows": snap.refinement.Input.Windows, "windows": snap.refinement.Input.Windows,
@@ -172,6 +174,7 @@ func registerAPIHandlers(mux *http.ServeMux, cfgPath string, cfgManager *runtime
"context": snap.refinement.Input.Context, "context": snap.refinement.Input.Context,
"detail_level": snap.refinement.Input.Detail, "detail_level": snap.refinement.Input.Detail,
"budgets": snap.refinement.Input.Budgets, "budgets": snap.refinement.Input.Budgets,
"arbitration": arbitration,
"work_items": snap.refinement.Input.WorkItems, "work_items": snap.refinement.Input.WorkItems,
"candidates": len(snap.refinement.Input.Candidates), "candidates": len(snap.refinement.Input.Candidates),
"scheduled": len(snap.refinement.Input.Scheduled), "scheduled": len(snap.refinement.Input.Scheduled),


+ 71
- 16
cmd/sdrd/pipeline_runtime.go Просмотреть файл

@@ -32,6 +32,9 @@ type dspRuntime struct {
det *detector.Detector det *detector.Detector
window []float64 window []float64
plan *fftutil.CmplxPlan plan *fftutil.CmplxPlan
detailWindow []float64
detailPlan *fftutil.CmplxPlan
detailFFT int
dcEnabled bool dcEnabled bool
iqEnabled bool iqEnabled bool
useGPU bool useGPU bool
@@ -58,11 +61,18 @@ type spectrumArtifacts struct {
} }


func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime { func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64, gpuState *gpuStatus) *dspRuntime {
detailFFT := cfg.Refinement.DetailFFTSize
if detailFFT <= 0 {
detailFFT = cfg.FFTSize
}
rt := &dspRuntime{ rt := &dspRuntime{
cfg: cfg, cfg: cfg,
det: det, det: det,
window: window, window: window,
plan: fftutil.NewCmplxPlan(cfg.FFTSize), plan: fftutil.NewCmplxPlan(cfg.FFTSize),
detailWindow: fftutil.Hann(detailFFT),
detailPlan: fftutil.NewCmplxPlan(detailFFT),
detailFFT: detailFFT,
dcEnabled: cfg.DCBlock, dcEnabled: cfg.DCBlock,
iqEnabled: cfg.IQBalance, iqEnabled: cfg.IQBalance,
useGPU: cfg.UseGPUFFT, useGPU: cfg.UseGPUFFT,
@@ -88,6 +98,7 @@ func newDSPRuntime(cfg config.Config, det *detector.Detector, window []float64,
func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) { func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *recorder.Manager, gpuState *gpuStatus) {
prevFFT := rt.cfg.FFTSize prevFFT := rt.cfg.FFTSize
prevUseGPU := rt.useGPU prevUseGPU := rt.useGPU
prevDetailFFT := rt.detailFFT
rt.cfg = upd.cfg rt.cfg = upd.cfg
if rec != nil { if rec != nil {
rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{ rec.Update(rt.cfg.SampleRate, rt.cfg.FFTSize, recorder.Policy{
@@ -116,6 +127,15 @@ func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *rec
rt.window = upd.window rt.window = upd.window
rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize) rt.plan = fftutil.NewCmplxPlan(rt.cfg.FFTSize)
} }
detailFFT := rt.cfg.Refinement.DetailFFTSize
if detailFFT <= 0 {
detailFFT = rt.cfg.FFTSize
}
if detailFFT != prevDetailFFT {
rt.detailFFT = detailFFT
rt.detailWindow = fftutil.Hann(detailFFT)
rt.detailPlan = fftutil.NewCmplxPlan(detailFFT)
}
rt.dcEnabled = upd.dcBlock rt.dcEnabled = upd.dcBlock
rt.iqEnabled = upd.iqBalance rt.iqEnabled = upd.iqBalance
if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU { if rt.cfg.FFTSize != prevFFT || rt.cfg.UseGPUFFT != prevUseGPU {
@@ -147,15 +167,19 @@ func (rt *dspRuntime) applyUpdate(upd dspUpdate, srcMgr *sourceManager, rec *rec
} }


func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 { func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []float64 {
return rt.spectrumFromIQWithPlan(iq, rt.window, rt.plan, gpuState, true)
}

func (rt *dspRuntime) spectrumFromIQWithPlan(iq []complex64, window []float64, plan *fftutil.CmplxPlan, gpuState *gpuStatus, allowGPU bool) []float64 {
if len(iq) == 0 { if len(iq) == 0 {
return nil return nil
} }
if rt.useGPU && rt.gpuEngine != nil {
if allowGPU && rt.useGPU && rt.gpuEngine != nil {
gpuBuf := make([]complex64, len(iq)) gpuBuf := make([]complex64, len(iq))
if len(rt.window) == len(iq) {
if len(window) == len(iq) {
for i := 0; i < len(iq); i++ { for i := 0; i < len(iq); i++ {
v := iq[i] v := iq[i]
w := float32(rt.window[i])
w := float32(window[i])
gpuBuf[i] = complex(real(v)*w, imag(v)*w) gpuBuf[i] = complex(real(v)*w, imag(v)*w)
} }
} else { } else {
@@ -167,20 +191,24 @@ func (rt *dspRuntime) spectrumFromIQ(iq []complex64, gpuState *gpuStatus) []floa
gpuState.set(false, err) gpuState.set(false, err)
} }
rt.useGPU = false rt.useGPU = false
return fftutil.SpectrumWithPlan(gpuBuf, nil, rt.plan)
return fftutil.SpectrumWithPlan(gpuBuf, nil, plan)
} }
return fftutil.SpectrumFromFFT(out) return fftutil.SpectrumFromFFT(out)
} }
return fftutil.SpectrumWithPlan(iq, rt.window, rt.plan)
return fftutil.SpectrumWithPlan(iq, window, plan)
} }


func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) { func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manager, dcBlocker *dsp.DCBlocker, gpuState *gpuStatus) (*spectrumArtifacts, error) {
available := rt.cfg.FFTSize
required := rt.cfg.FFTSize
if rt.detailFFT > required {
required = rt.detailFFT
}
available := required
st := srcMgr.Stats() st := srcMgr.Stats()
if st.BufferSamples > rt.cfg.FFTSize {
available = (st.BufferSamples / rt.cfg.FFTSize) * rt.cfg.FFTSize
if available < rt.cfg.FFTSize {
available = rt.cfg.FFTSize
if st.BufferSamples > required {
available = (st.BufferSamples / required) * required
if available < required {
available = required
} }
} }
allIQ, err := srcMgr.ReadIQ(available) allIQ, err := srcMgr.ReadIQ(available)
@@ -194,11 +222,19 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag
if len(allIQ) > rt.cfg.FFTSize { if len(allIQ) > rt.cfg.FFTSize {
survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:] survIQ = allIQ[len(allIQ)-rt.cfg.FFTSize:]
} }
detailIQ := survIQ
if rt.detailFFT > 0 && len(allIQ) >= rt.detailFFT {
detailIQ = allIQ[len(allIQ)-rt.detailFFT:]
}
if rt.dcEnabled { if rt.dcEnabled {
dcBlocker.Apply(survIQ)
dcBlocker.Apply(allIQ)
} }
if rt.iqEnabled { if rt.iqEnabled {
dsp.IQBalance(survIQ) dsp.IQBalance(survIQ)
if !sameIQBuffer(detailIQ, survIQ) {
detailIQ = append([]complex64(nil), detailIQ...)
dsp.IQBalance(detailIQ)
}
} }
survSpectrum := rt.spectrumFromIQ(survIQ, gpuState) survSpectrum := rt.spectrumFromIQ(survIQ, gpuState)
for i := range survSpectrum { for i := range survSpectrum {
@@ -206,10 +242,9 @@ func (rt *dspRuntime) captureSpectrum(srcMgr *sourceManager, rec *recorder.Manag
survSpectrum[i] = -200 survSpectrum[i] = -200
} }
} }
detailIQ := survIQ
detailSpectrum := survSpectrum detailSpectrum := survSpectrum
if !sameIQBuffer(detailIQ, survIQ) { if !sameIQBuffer(detailIQ, survIQ) {
detailSpectrum = rt.spectrumFromIQ(detailIQ, gpuState)
detailSpectrum = rt.spectrumFromIQWithPlan(detailIQ, rt.detailWindow, rt.detailPlan, gpuState, false)
for i := range detailSpectrum { for i := range detailSpectrum {
if math.IsNaN(detailSpectrum[i]) || math.IsInf(detailSpectrum[i], 0) { if math.IsNaN(detailSpectrum[i]) || math.IsInf(detailSpectrum[i], 0) {
detailSpectrum[i] = -200 detailSpectrum[i] = -200
@@ -318,6 +353,10 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip
workItems[idx].Window = window workItems[idx].Window = window
} }
} }
detailFFT := rt.cfg.Refinement.DetailFFTSize
if detailFFT <= 0 {
detailFFT = rt.cfg.FFTSize
}
levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate)) levelSpan := spanForPolicy(policy, float64(rt.cfg.SampleRate))
if _, maxSpan, ok := windowSpanBounds(windows); ok { if _, maxSpan, ok := windowSpanBounds(windows); ok {
levelSpan = maxSpan levelSpan = maxSpan
@@ -327,7 +366,7 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip
Role: "refinement", Role: "refinement",
Truth: "refinement", Truth: "refinement",
SampleRate: rt.cfg.SampleRate, SampleRate: rt.cfg.SampleRate,
FFTSize: rt.cfg.FFTSize,
FFTSize: detailFFT,
CenterHz: rt.cfg.CenterHz, CenterHz: rt.cfg.CenterHz,
SpanHz: levelSpan, SpanHz: levelSpan,
Source: "refinement-window", Source: "refinement-window",
@@ -337,11 +376,27 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip
Role: "detail", Role: "detail",
Truth: "refinement", Truth: "refinement",
SampleRate: rt.cfg.SampleRate, SampleRate: rt.cfg.SampleRate,
FFTSize: rt.cfg.FFTSize,
FFTSize: detailFFT,
CenterHz: rt.cfg.CenterHz, CenterHz: rt.cfg.CenterHz,
SpanHz: levelSpan, SpanHz: levelSpan,
Source: "detail-spectrum", Source: "detail-spectrum",
} }
if len(workItems) > 0 {
for i := range workItems {
item := &workItems[i]
if item.Window.SpanHz <= 0 {
continue
}
item.Execution = &pipeline.RefinementExecution{
Stage: "refine",
SampleRate: rt.cfg.SampleRate,
FFTSize: detailFFT,
CenterHz: item.Window.CenterHz,
SpanHz: item.Window.SpanHz,
Source: detailLevel.Source,
}
}
}
input := pipeline.RefinementInput{ input := pipeline.RefinementInput{
Level: level, Level: level,
Detail: detailLevel, Detail: detailLevel,
@@ -354,7 +409,7 @@ func (rt *dspRuntime) buildRefinementInput(surv pipeline.SurveillanceResult) pip
Plan: plan, Plan: plan,
Windows: windows, Windows: windows,
SampleRate: rt.cfg.SampleRate, SampleRate: rt.cfg.SampleRate,
FFTSize: rt.cfg.FFTSize,
FFTSize: detailFFT,
CenterHz: rt.cfg.CenterHz, CenterHz: rt.cfg.CenterHz,
Source: "surveillance-detector", Source: "surveillance-detector",
} }


+ 15
- 6
cmd/sdrd/types.go Просмотреть файл

@@ -32,12 +32,13 @@ type RefinementWindowStats struct {
} }


type RefinementDebug struct { type RefinementDebug struct {
Plan *pipeline.RefinementPlan `json:"plan,omitempty"`
Request *pipeline.RefinementRequest `json:"request,omitempty"`
WorkItems []pipeline.RefinementWorkItem `json:"work_items,omitempty"`
Windows *RefinementWindowStats `json:"windows,omitempty"`
Queue decisionQueueStats `json:"queue,omitempty"`
Budgets *pipeline.BudgetModel `json:"budgets,omitempty"`
Plan *pipeline.RefinementPlan `json:"plan,omitempty"`
Request *pipeline.RefinementRequest `json:"request,omitempty"`
WorkItems []pipeline.RefinementWorkItem `json:"work_items,omitempty"`
Windows *RefinementWindowStats `json:"windows,omitempty"`
Queue decisionQueueStats `json:"queue,omitempty"`
Budgets *pipeline.BudgetModel `json:"budgets,omitempty"`
Arbitration *ArbitrationSnapshot `json:"arbitration,omitempty"`
} }


type DecisionDebug struct { type DecisionDebug struct {
@@ -45,6 +46,14 @@ type DecisionDebug struct {
Items []compactDecision `json:"items,omitempty"` Items []compactDecision `json:"items,omitempty"`
} }


type ArbitrationSnapshot struct {
Budgets *pipeline.BudgetModel `json:"budgets,omitempty"`
RefinementPlan *pipeline.RefinementPlan `json:"refinement_plan,omitempty"`
Queue decisionQueueStats `json:"queue,omitempty"`
DecisionSummary decisionSummary `json:"decision_summary,omitempty"`
DecisionItems []compactDecision `json:"decision_items,omitempty"`
}

type SpectrumFrame struct { type SpectrumFrame struct {
Timestamp int64 `json:"ts"` Timestamp int64 `json:"ts"`
CenterHz float64 `json:"center_hz"` CenterHz float64 `json:"center_hz"`


+ 13
- 0
internal/config/config.go Просмотреть файл

@@ -97,6 +97,7 @@ type SurveillanceConfig struct {
type RefinementConfig struct { type RefinementConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"` Enabled bool `yaml:"enabled" json:"enabled"`
MaxConcurrent int `yaml:"max_concurrent" json:"max_concurrent"` MaxConcurrent int `yaml:"max_concurrent" json:"max_concurrent"`
DetailFFTSize int `yaml:"detail_fft_size" json:"detail_fft_size"`
MinCandidateSNRDb float64 `yaml:"min_candidate_snr_db" json:"min_candidate_snr_db"` MinCandidateSNRDb float64 `yaml:"min_candidate_snr_db" json:"min_candidate_snr_db"`
MinSpanHz float64 `yaml:"min_span_hz" json:"min_span_hz"` MinSpanHz float64 `yaml:"min_span_hz" json:"min_span_hz"`
MaxSpanHz float64 `yaml:"max_span_hz" json:"max_span_hz"` MaxSpanHz float64 `yaml:"max_span_hz" json:"max_span_hz"`
@@ -178,6 +179,7 @@ func Default() Config {
Refinement: RefinementConfig{ Refinement: RefinementConfig{
Enabled: true, Enabled: true,
MaxConcurrent: 8, MaxConcurrent: 8,
DetailFFTSize: 0,
MinCandidateSNRDb: 0, MinCandidateSNRDb: 0,
MinSpanHz: 0, MinSpanHz: 0,
MaxSpanHz: 0, MaxSpanHz: 0,
@@ -205,6 +207,7 @@ func Default() Config {
Refinement: &RefinementConfig{ Refinement: &RefinementConfig{
Enabled: true, Enabled: true,
MaxConcurrent: 8, MaxConcurrent: 8,
DetailFFTSize: 0,
MinCandidateSNRDb: 0, MinCandidateSNRDb: 0,
MinSpanHz: 0, MinSpanHz: 0,
MaxSpanHz: 0, MaxSpanHz: 0,
@@ -235,6 +238,7 @@ func Default() Config {
Refinement: &RefinementConfig{ Refinement: &RefinementConfig{
Enabled: true, Enabled: true,
MaxConcurrent: 16, MaxConcurrent: 16,
DetailFFTSize: 0,
MinCandidateSNRDb: 0, MinCandidateSNRDb: 0,
MinSpanHz: 4000, MinSpanHz: 4000,
MaxSpanHz: 200000, MaxSpanHz: 200000,
@@ -265,6 +269,7 @@ func Default() Config {
Refinement: &RefinementConfig{ Refinement: &RefinementConfig{
Enabled: true, Enabled: true,
MaxConcurrent: 32, MaxConcurrent: 32,
DetailFFTSize: 0,
MinCandidateSNRDb: 0, MinCandidateSNRDb: 0,
MinSpanHz: 6000, MinSpanHz: 6000,
MaxSpanHz: 250000, MaxSpanHz: 250000,
@@ -295,6 +300,7 @@ func Default() Config {
Refinement: &RefinementConfig{ Refinement: &RefinementConfig{
Enabled: true, Enabled: true,
MaxConcurrent: 12, MaxConcurrent: 12,
DetailFFTSize: 0,
MinCandidateSNRDb: 0, MinCandidateSNRDb: 0,
MinSpanHz: 4000, MinSpanHz: 4000,
MaxSpanHz: 200000, MaxSpanHz: 200000,
@@ -325,6 +331,7 @@ func Default() Config {
Refinement: &RefinementConfig{ Refinement: &RefinementConfig{
Enabled: true, Enabled: true,
MaxConcurrent: 16, MaxConcurrent: 16,
DetailFFTSize: 0,
MinCandidateSNRDb: 0, MinCandidateSNRDb: 0,
MinSpanHz: 3000, MinSpanHz: 3000,
MaxSpanHz: 120000, MaxSpanHz: 120000,
@@ -503,6 +510,12 @@ func applyDefaults(cfg Config) Config {
if cfg.Refinement.MaxConcurrent <= 0 { if cfg.Refinement.MaxConcurrent <= 0 {
cfg.Refinement.MaxConcurrent = 8 cfg.Refinement.MaxConcurrent = 8
} }
if cfg.Refinement.DetailFFTSize <= 0 {
cfg.Refinement.DetailFFTSize = cfg.Surveillance.AnalysisFFTSize
}
if cfg.Refinement.DetailFFTSize&(cfg.Refinement.DetailFFTSize-1) != 0 {
cfg.Refinement.DetailFFTSize = cfg.Surveillance.AnalysisFFTSize
}
if cfg.Refinement.MinSpanHz < 0 { if cfg.Refinement.MinSpanHz < 0 {
cfg.Refinement.MinSpanHz = 0 cfg.Refinement.MinSpanHz = 0
} }


+ 3
- 0
internal/config/config_test.go Просмотреть файл

@@ -44,6 +44,9 @@ func TestLoadConfig(t *testing.T) {
if cfg.Refinement.AutoSpan == nil || !*cfg.Refinement.AutoSpan { if cfg.Refinement.AutoSpan == nil || !*cfg.Refinement.AutoSpan {
t.Fatalf("refinement auto_span default not applied") t.Fatalf("refinement auto_span default not applied")
} }
if cfg.Refinement.DetailFFTSize != cfg.Surveillance.AnalysisFFTSize {
t.Fatalf("refinement detail fft not aligned: %d vs %d", cfg.Refinement.DetailFFTSize, cfg.Surveillance.AnalysisFFTSize)
}
if cfg.EventPath == "" { if cfg.EventPath == "" {
t.Fatalf("event path default not applied") t.Fatalf("event path default not applied")
} }


+ 13
- 2
internal/pipeline/budget.go Просмотреть файл

@@ -20,10 +20,11 @@ type BudgetModel struct {


func BudgetModelFromPolicy(policy Policy) BudgetModel { func BudgetModelFromPolicy(policy Policy) BudgetModel {
recordBias, decodeBias := budgetIntentBias(policy.Intent) recordBias, decodeBias := budgetIntentBias(policy.Intent)
refBudget, refSource := refinementBudgetFromPolicy(policy)
return BudgetModel{ return BudgetModel{
Refinement: BudgetQueue{ Refinement: BudgetQueue{
Max: policy.MaxRefinementJobs,
Source: "resources.max_refinement_jobs",
Max: refBudget,
Source: refSource,
}, },
Record: BudgetQueue{ Record: BudgetQueue{
Max: policy.MaxRecordingStreams, Max: policy.MaxRecordingStreams,
@@ -42,6 +43,16 @@ func BudgetModelFromPolicy(policy Policy) BudgetModel {
} }
} }


func refinementBudgetFromPolicy(policy Policy) (int, string) {
budget := policy.MaxRefinementJobs
source := "resources.max_refinement_jobs"
if policy.RefinementMaxConcurrent > 0 && (budget <= 0 || policy.RefinementMaxConcurrent < budget) {
budget = policy.RefinementMaxConcurrent
source = "refinement.max_concurrent"
}
return budget, source
}

func budgetIntentBias(intent string) (float64, float64) { func budgetIntentBias(intent string) (float64, float64) {
if intent == "" { if intent == "" {
return 0, 0 return 0, 0


+ 1
- 0
internal/pipeline/phases.go Просмотреть файл

@@ -38,6 +38,7 @@ type RefinementPlan struct {
TotalCandidates int `json:"total_candidates"` TotalCandidates int `json:"total_candidates"`
MinCandidateSNRDb float64 `json:"min_candidate_snr_db"` MinCandidateSNRDb float64 `json:"min_candidate_snr_db"`
Budget int `json:"budget"` Budget int `json:"budget"`
BudgetSource string `json:"budget_source,omitempty"`
Strategy string `json:"strategy,omitempty"` Strategy string `json:"strategy,omitempty"`
StrategyReason string `json:"strategy_reason,omitempty"` StrategyReason string `json:"strategy_reason,omitempty"`
MonitorStartHz float64 `json:"monitor_start_hz,omitempty"` MonitorStartHz float64 `json:"monitor_start_hz,omitempty"`


+ 9
- 0
internal/pipeline/policy.go Просмотреть файл

@@ -22,6 +22,7 @@ type Policy struct {
RefinementEnabled bool `json:"refinement_enabled"` RefinementEnabled bool `json:"refinement_enabled"`
MaxRefinementJobs int `json:"max_refinement_jobs"` MaxRefinementJobs int `json:"max_refinement_jobs"`
RefinementMaxConcurrent int `json:"refinement_max_concurrent"` RefinementMaxConcurrent int `json:"refinement_max_concurrent"`
RefinementDetailFFTSize int `json:"refinement_detail_fft_size"`
MinCandidateSNRDb float64 `json:"min_candidate_snr_db"` MinCandidateSNRDb float64 `json:"min_candidate_snr_db"`
RefinementMinSpanHz float64 `json:"refinement_min_span_hz"` RefinementMinSpanHz float64 `json:"refinement_min_span_hz"`
RefinementMaxSpanHz float64 `json:"refinement_max_span_hz"` RefinementMaxSpanHz float64 `json:"refinement_max_span_hz"`
@@ -33,6 +34,10 @@ type Policy struct {
} }


func PolicyFromConfig(cfg config.Config) Policy { func PolicyFromConfig(cfg config.Config) Policy {
detailFFT := cfg.Refinement.DetailFFTSize
if detailFFT <= 0 {
detailFFT = cfg.Surveillance.AnalysisFFTSize
}
p := Policy{ p := Policy{
Mode: cfg.Pipeline.Mode, Mode: cfg.Pipeline.Mode,
Profile: cfg.Pipeline.Profile, Profile: cfg.Pipeline.Profile,
@@ -52,6 +57,7 @@ func PolicyFromConfig(cfg config.Config) Policy {
RefinementEnabled: cfg.Refinement.Enabled, RefinementEnabled: cfg.Refinement.Enabled,
MaxRefinementJobs: cfg.Resources.MaxRefinementJobs, MaxRefinementJobs: cfg.Resources.MaxRefinementJobs,
RefinementMaxConcurrent: cfg.Refinement.MaxConcurrent, RefinementMaxConcurrent: cfg.Refinement.MaxConcurrent,
RefinementDetailFFTSize: detailFFT,
MinCandidateSNRDb: cfg.Refinement.MinCandidateSNRDb, MinCandidateSNRDb: cfg.Refinement.MinCandidateSNRDb,
RefinementMinSpanHz: cfg.Refinement.MinSpanHz, RefinementMinSpanHz: cfg.Refinement.MinSpanHz,
RefinementMaxSpanHz: cfg.Refinement.MaxSpanHz, RefinementMaxSpanHz: cfg.Refinement.MaxSpanHz,
@@ -204,5 +210,8 @@ func ApplyNamedProfile(cfg *config.Config, name string) {
if cfg.Resources.MaxDecodeJobs <= 0 { if cfg.Resources.MaxDecodeJobs <= 0 {
cfg.Resources.MaxDecodeJobs = cfg.Resources.MaxRecordingStreams cfg.Resources.MaxDecodeJobs = cfg.Resources.MaxRecordingStreams
} }
if cfg.Refinement.DetailFFTSize <= 0 {
cfg.Refinement.DetailFFTSize = cfg.Surveillance.AnalysisFFTSize
}
cfg.FFTSize = cfg.Surveillance.AnalysisFFTSize cfg.FFTSize = cfg.Surveillance.AnalysisFFTSize
} }

+ 7
- 0
internal/pipeline/policy_test.go Просмотреть файл

@@ -30,6 +30,9 @@ func TestApplyNamedProfile(t *testing.T) {
if cfg.Resources.MaxRefinementJobs < 16 { if cfg.Resources.MaxRefinementJobs < 16 {
t.Fatalf("refinement jobs too small: %d", cfg.Resources.MaxRefinementJobs) t.Fatalf("refinement jobs too small: %d", cfg.Resources.MaxRefinementJobs)
} }
if cfg.Refinement.DetailFFTSize != cfg.Surveillance.AnalysisFFTSize {
t.Fatalf("detail fft not aligned: %d vs %d", cfg.Refinement.DetailFFTSize, cfg.Surveillance.AnalysisFFTSize)
}
} }


func TestPolicyFromConfig(t *testing.T) { func TestPolicyFromConfig(t *testing.T) {
@@ -45,6 +48,7 @@ func TestPolicyFromConfig(t *testing.T) {
cfg.Surveillance.DisplayBins = 1200 cfg.Surveillance.DisplayBins = 1200
cfg.Surveillance.DisplayFPS = 6 cfg.Surveillance.DisplayFPS = 6
cfg.Refinement.Enabled = true cfg.Refinement.Enabled = true
cfg.Refinement.DetailFFTSize = 4096
cfg.Resources.MaxRefinementJobs = 5 cfg.Resources.MaxRefinementJobs = 5
cfg.Refinement.MinCandidateSNRDb = 2.5 cfg.Refinement.MinCandidateSNRDb = 2.5
cfg.Resources.PreferGPU = true cfg.Resources.PreferGPU = true
@@ -62,6 +66,9 @@ func TestPolicyFromConfig(t *testing.T) {
if !p.RefinementEnabled || p.MaxRefinementJobs != 5 || p.MinCandidateSNRDb != 2.5 || !p.PreferGPU { if !p.RefinementEnabled || p.MaxRefinementJobs != 5 || p.MinCandidateSNRDb != 2.5 || !p.PreferGPU {
t.Fatalf("unexpected policy details: %+v", p) t.Fatalf("unexpected policy details: %+v", p)
} }
if p.RefinementDetailFFTSize != 4096 {
t.Fatalf("unexpected refinement detail fft: %+v", p.RefinementDetailFFTSize)
}
if p.MaxRecordingStreams != 7 { if p.MaxRecordingStreams != 7 {
t.Fatalf("unexpected record budget: %+v", p.MaxRecordingStreams) t.Fatalf("unexpected record budget: %+v", p.MaxRecordingStreams)
} }


+ 39
- 12
internal/pipeline/scheduler.go Просмотреть файл

@@ -34,6 +34,7 @@ type RefinementScore struct {
type RefinementWorkItem struct { type RefinementWorkItem struct {
Candidate Candidate `json:"candidate"` Candidate Candidate `json:"candidate"`
Window RefinementWindow `json:"window,omitempty"` Window RefinementWindow `json:"window,omitempty"`
Execution *RefinementExecution `json:"execution,omitempty"`
Priority float64 `json:"priority,omitempty"` Priority float64 `json:"priority,omitempty"`
Score *RefinementScore `json:"score,omitempty"` Score *RefinementScore `json:"score,omitempty"`
Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"` Breakdown *RefinementScoreDetails `json:"breakdown,omitempty"`
@@ -41,6 +42,15 @@ type RefinementWorkItem struct {
Reason string `json:"reason,omitempty"` Reason string `json:"reason,omitempty"`
} }


type RefinementExecution struct {
Stage string `json:"stage,omitempty"`
SampleRate int `json:"sample_rate,omitempty"`
FFTSize int `json:"fft_size,omitempty"`
CenterHz float64 `json:"center_hz,omitempty"`
SpanHz float64 `json:"span_hz,omitempty"`
Source string `json:"source,omitempty"`
}

const ( const (
RefinementStatusSelected = "selected" RefinementStatusSelected = "selected"
RefinementStatusDropped = "dropped" RefinementStatusDropped = "dropped"
@@ -60,12 +70,14 @@ const (
// Current heuristic is intentionally simple and deterministic; later phases can add // Current heuristic is intentionally simple and deterministic; later phases can add
// richer scoring (novelty, persistence, profile-aware band priorities, decoder value). // richer scoring (novelty, persistence, profile-aware band priorities, decoder value).
func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan { func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan {
budget := refinementBudget(policy)
strategy, strategyReason := refinementStrategy(policy) strategy, strategyReason := refinementStrategy(policy)
budgetModel := BudgetModelFromPolicy(policy)
budget := budgetModel.Refinement.Max
plan := RefinementPlan{ plan := RefinementPlan{
TotalCandidates: len(candidates), TotalCandidates: len(candidates),
MinCandidateSNRDb: policy.MinCandidateSNRDb, MinCandidateSNRDb: policy.MinCandidateSNRDb,
Budget: budget, Budget: budget,
BudgetSource: budgetModel.Refinement.Source,
Strategy: strategy, Strategy: strategy,
StrategyReason: strategyReason, StrategyReason: strategyReason,
} }
@@ -85,6 +97,7 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan {
BandwidthWeight: bwWeight, BandwidthWeight: bwWeight,
PeakWeight: peakWeight, PeakWeight: peakWeight,
} }
scoreModel = applyStrategyWeights(strategy, scoreModel)
plan.ScoreModel = scoreModel plan.ScoreModel = scoreModel
scored := make([]ScheduledCandidate, 0, len(candidates)) scored := make([]ScheduledCandidate, 0, len(candidates))
workItems := make([]RefinementWorkItem, 0, len(candidates)) workItems := make([]RefinementWorkItem, 0, len(candidates))
@@ -107,15 +120,15 @@ func BuildRefinementPlan(candidates []Candidate, policy Policy) RefinementPlan {
}) })
continue continue
} }
snrScore := c.SNRDb * snrWeight
snrScore := c.SNRDb * scoreModel.SNRWeight
bwScore := 0.0 bwScore := 0.0
peakScore := 0.0 peakScore := 0.0
policyBoost := CandidatePriorityBoost(policy, c.Hint) policyBoost := CandidatePriorityBoost(policy, c.Hint)
if c.BandwidthHz > 0 { if c.BandwidthHz > 0 {
bwScore = minFloat64(c.BandwidthHz/25000.0, 6) * bwWeight
bwScore = minFloat64(c.BandwidthHz/25000.0, 6) * scoreModel.BandwidthWeight
} }
if c.PeakDb > 0 { if c.PeakDb > 0 {
peakScore = (c.PeakDb / 20.0) * peakWeight
peakScore = (c.PeakDb / 20.0) * scoreModel.PeakWeight
} }
priority := snrScore + bwScore + peakScore + policyBoost priority := snrScore + bwScore + peakScore + policyBoost
score := &RefinementScore{ score := &RefinementScore{
@@ -198,14 +211,6 @@ func ScheduleCandidates(candidates []Candidate, policy Policy) []ScheduledCandid
return BuildRefinementPlan(candidates, policy).Selected return BuildRefinementPlan(candidates, policy).Selected
} }


func refinementBudget(policy Policy) int {
budget := policy.MaxRefinementJobs
if policy.RefinementMaxConcurrent > 0 && (budget <= 0 || policy.RefinementMaxConcurrent < budget) {
budget = policy.RefinementMaxConcurrent
}
return budget
}

func refinementStrategy(policy Policy) (string, string) { func refinementStrategy(policy Policy) (string, string) {
intent := strings.ToLower(strings.TrimSpace(policy.Intent)) intent := strings.ToLower(strings.TrimSpace(policy.Intent))
switch { switch {
@@ -220,6 +225,28 @@ func refinementStrategy(policy Policy) (string, string) {
} }
} }


func applyStrategyWeights(strategy string, model RefinementScoreModel) RefinementScoreModel {
switch strings.ToLower(strings.TrimSpace(strategy)) {
case "digital-hunting":
model.SNRWeight *= 1.4
model.BandwidthWeight *= 0.75
model.PeakWeight *= 1.2
case "archive-oriented":
model.SNRWeight *= 1.1
model.BandwidthWeight *= 1.6
model.PeakWeight *= 1.05
case "multi-resolution", "multi", "multi-res", "multi_res":
model.SNRWeight *= 1.15
model.BandwidthWeight *= 1.1
model.PeakWeight *= 1.15
case "single-resolution":
model.SNRWeight *= 1.1
model.BandwidthWeight *= 1.0
model.PeakWeight *= 1.0
}
return model
}

func minFloat64(a, b float64) float64 { func minFloat64(a, b float64) float64 {
if a < b { if a < b {
return a return a


+ 18
- 0
internal/pipeline/scheduler_test.go Просмотреть файл

@@ -70,6 +70,9 @@ func TestBuildRefinementPlanRespectsMaxConcurrent(t *testing.T) {
if plan.Budget != 2 { if plan.Budget != 2 {
t.Fatalf("expected budget 2, got %d", plan.Budget) t.Fatalf("expected budget 2, got %d", plan.Budget)
} }
if plan.BudgetSource != "refinement.max_concurrent" {
t.Fatalf("expected budget source refinement.max_concurrent, got %s", plan.BudgetSource)
}
if len(plan.Selected) != 2 { if len(plan.Selected) != 2 {
t.Fatalf("expected 2 selected, got %d", len(plan.Selected)) t.Fatalf("expected 2 selected, got %d", len(plan.Selected))
} }
@@ -159,6 +162,21 @@ func TestBuildRefinementPlanPriorityStats(t *testing.T) {
} }
} }


func TestBuildRefinementPlanStrategyBias(t *testing.T) {
policy := Policy{MaxRefinementJobs: 1, MinCandidateSNRDb: 0, Intent: "archive-and-triage"}
cands := []Candidate{
{ID: 1, CenterHz: 100, SNRDb: 12, BandwidthHz: 5000, PeakDb: 1},
{ID: 2, CenterHz: 200, SNRDb: 11, BandwidthHz: 100000, PeakDb: 1},
}
plan := BuildRefinementPlan(cands, policy)
if len(plan.Selected) != 1 {
t.Fatalf("expected 1 selected, got %d", len(plan.Selected))
}
if plan.Selected[0].Candidate.ID != 2 {
t.Fatalf("expected archive-oriented strategy to favor wider candidate, got %+v", plan.Selected[0])
}
}

func findWorkItem(items []RefinementWorkItem, id int64) *RefinementWorkItem { func findWorkItem(items []RefinementWorkItem, id int64) *RefinementWorkItem {
for i := range items { for i := range items {
if items[i].Candidate.ID == id { if items[i].Candidate.ID == id {


+ 11
- 0
internal/runtime/runtime.go Просмотреть файл

@@ -32,6 +32,7 @@ type SurveillanceUpdate struct {
type RefinementUpdate struct { type RefinementUpdate struct {
Enabled *bool `json:"enabled"` Enabled *bool `json:"enabled"`
MaxConcurrent *int `json:"max_concurrent"` MaxConcurrent *int `json:"max_concurrent"`
DetailFFTSize *int `json:"detail_fft_size"`
MinCandidateSNRDb *float64 `json:"min_candidate_snr_db"` MinCandidateSNRDb *float64 `json:"min_candidate_snr_db"`
MinSpanHz *float64 `json:"min_span_hz"` MinSpanHz *float64 `json:"min_span_hz"`
MaxSpanHz *float64 `json:"max_span_hz"` MaxSpanHz *float64 `json:"max_span_hz"`
@@ -261,6 +262,16 @@ func (m *Manager) ApplyConfig(update ConfigUpdate) (config.Config, error) {
} }
next.Refinement.MaxConcurrent = *update.Refinement.MaxConcurrent next.Refinement.MaxConcurrent = *update.Refinement.MaxConcurrent
} }
if update.Refinement.DetailFFTSize != nil {
v := *update.Refinement.DetailFFTSize
if v <= 0 {
return m.cfg, errors.New("refinement.detail_fft_size must be > 0")
}
if v&(v-1) != 0 {
return m.cfg, errors.New("refinement.detail_fft_size must be a power of 2")
}
next.Refinement.DetailFFTSize = v
}
if update.Refinement.MinCandidateSNRDb != nil { if update.Refinement.MinCandidateSNRDb != nil {
next.Refinement.MinCandidateSNRDb = *update.Refinement.MinCandidateSNRDb next.Refinement.MinCandidateSNRDb = *update.Refinement.MinCandidateSNRDb
} }


+ 16
- 1
internal/runtime/runtime_test.go Просмотреть файл

@@ -36,6 +36,7 @@ func TestApplyConfigUpdate(t *testing.T) {
minSpan := 4000.0 minSpan := 4000.0
maxSpan := 200000.0 maxSpan := 200000.0
autoSpan := false autoSpan := false
detailFFT := 1024
maxDecode := 12 maxDecode := 12
decisionHold := 1500 decisionHold := 1500
updated, err := mgr.ApplyConfig(ConfigUpdate{ updated, err := mgr.ApplyConfig(ConfigUpdate{
@@ -53,7 +54,7 @@ func TestApplyConfigUpdate(t *testing.T) {
AutoDecodeClasses: &autoDecode, AutoDecodeClasses: &autoDecode,
}, },
Surveillance: &SurveillanceUpdate{FrameRate: &survFPS, DisplayBins: &displayBins, DisplayFPS: &displayFPS}, Surveillance: &SurveillanceUpdate{FrameRate: &survFPS, DisplayBins: &displayBins, DisplayFPS: &displayFPS},
Refinement: &RefinementUpdate{MinSpanHz: &minSpan, MaxSpanHz: &maxSpan, AutoSpan: &autoSpan},
Refinement: &RefinementUpdate{MinSpanHz: &minSpan, MaxSpanHz: &maxSpan, AutoSpan: &autoSpan, DetailFFTSize: &detailFFT},
Resources: &ResourcesUpdate{MaxRefinementJobs: &maxRefJobs, MaxDecodeJobs: &maxDecode, DecisionHoldMs: &decisionHold}, Resources: &ResourcesUpdate{MaxRefinementJobs: &maxRefJobs, MaxDecodeJobs: &maxDecode, DecisionHoldMs: &decisionHold},
Detector: &DetectorUpdate{ Detector: &DetectorUpdate{
ThresholdDb: &threshold, ThresholdDb: &threshold,
@@ -143,6 +144,9 @@ func TestApplyConfigUpdate(t *testing.T) {
if updated.Refinement.MinSpanHz != minSpan || updated.Refinement.MaxSpanHz != maxSpan { if updated.Refinement.MinSpanHz != minSpan || updated.Refinement.MaxSpanHz != maxSpan {
t.Fatalf("refinement span not applied: %v / %v", updated.Refinement.MinSpanHz, updated.Refinement.MaxSpanHz) t.Fatalf("refinement span not applied: %v / %v", updated.Refinement.MinSpanHz, updated.Refinement.MaxSpanHz)
} }
if updated.Refinement.DetailFFTSize != detailFFT {
t.Fatalf("refinement detail fft not applied: %v", updated.Refinement.DetailFFTSize)
}
if updated.Refinement.AutoSpan == nil || *updated.Refinement.AutoSpan != autoSpan { if updated.Refinement.AutoSpan == nil || *updated.Refinement.AutoSpan != autoSpan {
t.Fatalf("refinement auto span not applied") t.Fatalf("refinement auto span not applied")
} }
@@ -217,6 +221,17 @@ func TestApplyConfigRejectsInvalid(t *testing.T) {
t.Fatalf("gap_tolerance_ms changed on error") t.Fatalf("gap_tolerance_ms changed on error")
} }
} }

{
mgr := New(cfg)
badDetail := 123
if _, err := mgr.ApplyConfig(ConfigUpdate{Refinement: &RefinementUpdate{DetailFFTSize: &badDetail}}); err == nil {
t.Fatalf("expected refinement.detail_fft_size error")
}
if mgr.Snapshot().Refinement.DetailFFTSize != cfg.Refinement.DetailFFTSize {
t.Fatalf("detail fft changed on error")
}
}
} }


func TestApplySettings(t *testing.T) { func TestApplySettings(t *testing.T) {


Загрузка…
Отмена
Сохранить