Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

752 рядки
20KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/gorilla/websocket"
  17. "sdr-visual-suite/internal/config"
  18. "sdr-visual-suite/internal/detector"
  19. "sdr-visual-suite/internal/dsp"
  20. "sdr-visual-suite/internal/events"
  21. fftutil "sdr-visual-suite/internal/fft"
  22. "sdr-visual-suite/internal/fft/gpufft"
  23. "sdr-visual-suite/internal/mock"
  24. "sdr-visual-suite/internal/recorder"
  25. "sdr-visual-suite/internal/runtime"
  26. "sdr-visual-suite/internal/sdr"
  27. "sdr-visual-suite/internal/sdrplay"
  28. )
  29. type SpectrumFrame struct {
  30. Timestamp int64 `json:"ts"`
  31. CenterHz float64 `json:"center_hz"`
  32. SampleHz int `json:"sample_rate"`
  33. FFTSize int `json:"fft_size"`
  34. Spectrum []float64 `json:"spectrum_db"`
  35. Signals []detector.Signal `json:"signals"`
  36. }
  37. type client struct {
  38. conn *websocket.Conn
  39. send chan []byte
  40. done chan struct{}
  41. closeOnce sync.Once
  42. }
  43. type hub struct {
  44. mu sync.Mutex
  45. clients map[*client]struct{}
  46. }
  // gpuStatus is the mutex-protected GPU FFT state that /api/gpu reports.
  // The mutex is unexported, so only the three tagged fields reach JSON.
  type gpuStatus struct {
  	mu sync.RWMutex

  	// Available is set once at construction and never changed by set.
  	Available bool `json:"available"`
  	// Active records the flag passed to the most recent set call.
  	Active bool `json:"active"`
  	// Error holds the message of the last error passed to set, or "".
  	Error string `json:"error"`
  }

  // set records whether the GPU path is active together with the error text;
  // a nil err clears any previously stored message.
  func (g *gpuStatus) set(active bool, err error) {
  	var msg string
  	if err != nil {
  		msg = err.Error()
  	}

  	g.mu.Lock()
  	g.Active, g.Error = active, msg
  	g.mu.Unlock()
  }

  // snapshot returns a copy of the exported fields taken under the read lock.
  // The returned value carries its own zero-valued mutex.
  func (g *gpuStatus) snapshot() gpuStatus {
  	g.mu.RLock()
  	available, active, msg := g.Available, g.Active, g.Error
  	g.mu.RUnlock()

  	return gpuStatus{Available: available, Active: active, Error: msg}
  }
  68. func newHub() *hub {
  69. return &hub{clients: map[*client]struct{}{}}
  70. }
  71. func (h *hub) add(c *client) {
  72. h.mu.Lock()
  73. defer h.mu.Unlock()
  74. h.clients[c] = struct{}{}
  75. }
  76. func (h *hub) remove(c *client) {
  77. c.closeOnce.Do(func() { close(c.done) })
  78. h.mu.Lock()
  79. defer h.mu.Unlock()
  80. delete(h.clients, c)
  81. }
  82. func (h *hub) broadcast(frame SpectrumFrame) {
  83. b, err := json.Marshal(frame)
  84. if err != nil {
  85. log.Printf("marshal frame: %v", err)
  86. return
  87. }
  88. h.mu.Lock()
  89. clients := make([]*client, 0, len(h.clients))
  90. for c := range h.clients {
  91. clients = append(clients, c)
  92. }
  93. h.mu.Unlock()
  94. for _, c := range clients {
  95. select {
  96. case c.send <- b:
  97. default:
  98. h.remove(c)
  99. }
  100. }
  101. }
  102. type sourceManager struct {
  103. mu sync.RWMutex
  104. src sdr.Source
  105. newSource func(cfg config.Config) (sdr.Source, error)
  106. }
  107. func (m *sourceManager) Restart(cfg config.Config) error {
  108. m.mu.Lock()
  109. defer m.mu.Unlock()
  110. old := m.src
  111. _ = old.Stop()
  112. next, err := m.newSource(cfg)
  113. if err != nil {
  114. _ = old.Start()
  115. m.src = old
  116. return err
  117. }
  118. if err := next.Start(); err != nil {
  119. _ = next.Stop()
  120. _ = old.Start()
  121. m.src = old
  122. return err
  123. }
  124. m.src = next
  125. return nil
  126. }
  127. func (m *sourceManager) Stats() sdr.SourceStats {
  128. m.mu.RLock()
  129. defer m.mu.RUnlock()
  130. if sp, ok := m.src.(sdr.StatsProvider); ok {
  131. return sp.Stats()
  132. }
  133. return sdr.SourceStats{}
  134. }
  135. func (m *sourceManager) Flush() {
  136. m.mu.RLock()
  137. defer m.mu.RUnlock()
  138. if fl, ok := m.src.(sdr.Flushable); ok {
  139. fl.Flush()
  140. }
  141. }
  142. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  143. return &sourceManager{src: src, newSource: newSource}
  144. }
  145. func (m *sourceManager) Start() error {
  146. m.mu.RLock()
  147. defer m.mu.RUnlock()
  148. return m.src.Start()
  149. }
  150. func (m *sourceManager) Stop() error {
  151. m.mu.RLock()
  152. defer m.mu.RUnlock()
  153. return m.src.Stop()
  154. }
  155. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  156. m.mu.RLock()
  157. defer m.mu.RUnlock()
  158. return m.src.ReadIQ(n)
  159. }
  160. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  161. m.mu.Lock()
  162. defer m.mu.Unlock()
  163. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  164. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  165. return nil
  166. }
  167. }
  168. old := m.src
  169. _ = old.Stop()
  170. next, err := m.newSource(cfg)
  171. if err != nil {
  172. _ = old.Start()
  173. return err
  174. }
  175. if err := next.Start(); err != nil {
  176. _ = next.Stop()
  177. _ = old.Start()
  178. return err
  179. }
  180. m.src = next
  181. return nil
  182. }
  183. type dspUpdate struct {
  184. cfg config.Config
  185. det *detector.Detector
  186. window []float64
  187. dcBlock bool
  188. iqBalance bool
  189. useGPUFFT bool
  190. }
  191. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  192. select {
  193. case ch <- update:
  194. default:
  195. select {
  196. case <-ch:
  197. default:
  198. }
  199. ch <- update
  200. }
  201. }
  202. func main() {
  203. var cfgPath string
  204. var mockFlag bool
  205. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  206. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  207. flag.Parse()
  208. cfg, err := config.Load(cfgPath)
  209. if err != nil {
  210. log.Fatalf("load config: %v", err)
  211. }
  212. cfgManager := runtime.New(cfg)
  213. gpuState := &gpuStatus{Available: gpufft.Available()}
  214. newSource := func(cfg config.Config) (sdr.Source, error) {
  215. if mockFlag {
  216. src := mock.New(cfg.SampleRate)
  217. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  218. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  219. }
  220. return src, nil
  221. }
  222. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  223. if err != nil {
  224. return nil, err
  225. }
  226. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  227. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  228. }
  229. return src, nil
  230. }
  231. src, err := newSource(cfg)
  232. if err != nil {
  233. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  234. }
  235. srcMgr := newSourceManager(src, newSource)
  236. if err := srcMgr.Start(); err != nil {
  237. log.Fatalf("source start: %v", err)
  238. }
  239. defer srcMgr.Stop()
  240. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  241. log.Fatalf("event path: %v", err)
  242. }
  243. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  244. if err != nil {
  245. log.Fatalf("open events: %v", err)
  246. }
  247. defer eventFile.Close()
  248. eventMu := &sync.RWMutex{}
  249. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  250. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  251. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  252. window := fftutil.Hann(cfg.FFTSize)
  253. h := newHub()
  254. dspUpdates := make(chan dspUpdate, 1)
  255. ctx, cancel := context.WithCancel(context.Background())
  256. defer cancel()
  257. recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  258. Enabled: cfg.Recorder.Enabled,
  259. MinSNRDb: cfg.Recorder.MinSNRDb,
  260. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  261. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  262. PrerollMs: cfg.Recorder.PrerollMs,
  263. RecordIQ: cfg.Recorder.RecordIQ,
  264. RecordAudio: cfg.Recorder.RecordAudio,
  265. AutoDemod: cfg.Recorder.AutoDemod,
  266. OutputDir: cfg.Recorder.OutputDir,
  267. ClassFilter: cfg.Recorder.ClassFilter,
  268. RingSeconds: cfg.Recorder.RingSeconds,
  269. }, cfg.CenterHz)
  270. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr)
  271. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  272. origin := r.Header.Get("Origin")
  273. return origin == "" || strings.HasPrefix(origin, "http://localhost") || strings.HasPrefix(origin, "http://127.0.0.1")
  274. }}
  275. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  276. conn, err := upgrader.Upgrade(w, r, nil)
  277. if err != nil {
  278. return
  279. }
  280. c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})}
  281. h.add(c)
  282. defer func() {
  283. h.remove(c)
  284. _ = conn.Close()
  285. }()
  286. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  287. conn.SetPongHandler(func(string) error {
  288. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  289. return nil
  290. })
  291. go func() {
  292. ping := time.NewTicker(30 * time.Second)
  293. defer ping.Stop()
  294. for {
  295. select {
  296. case msg, ok := <-c.send:
  297. if !ok {
  298. return
  299. }
  300. _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  301. if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
  302. return
  303. }
  304. case <-ping.C:
  305. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  306. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  307. return
  308. }
  309. }
  310. }
  311. }()
  312. for {
  313. _, _, err := conn.ReadMessage()
  314. if err != nil {
  315. return
  316. }
  317. }
  318. })
  319. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  320. w.Header().Set("Content-Type", "application/json")
  321. switch r.Method {
  322. case http.MethodGet:
  323. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  324. case http.MethodPost:
  325. var update runtime.ConfigUpdate
  326. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  327. http.Error(w, "invalid json", http.StatusBadRequest)
  328. return
  329. }
  330. prev := cfgManager.Snapshot()
  331. next, err := cfgManager.ApplyConfig(update)
  332. if err != nil {
  333. http.Error(w, err.Error(), http.StatusBadRequest)
  334. return
  335. }
  336. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  337. if sourceChanged {
  338. if err := srcMgr.ApplyConfig(next); err != nil {
  339. cfgManager.Replace(prev)
  340. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  341. return
  342. }
  343. }
  344. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  345. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  346. prev.Detector.HoldMs != next.Detector.HoldMs ||
  347. prev.SampleRate != next.SampleRate ||
  348. prev.FFTSize != next.FFTSize
  349. windowChanged := prev.FFTSize != next.FFTSize
  350. var newDet *detector.Detector
  351. var newWindow []float64
  352. if detChanged {
  353. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  354. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  355. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  356. }
  357. if windowChanged {
  358. newWindow = fftutil.Hann(next.FFTSize)
  359. }
  360. pushDSPUpdate(dspUpdates, dspUpdate{
  361. cfg: next,
  362. det: newDet,
  363. window: newWindow,
  364. dcBlock: next.DCBlock,
  365. iqBalance: next.IQBalance,
  366. useGPUFFT: next.UseGPUFFT,
  367. })
  368. _ = json.NewEncoder(w).Encode(next)
  369. default:
  370. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  371. }
  372. })
  373. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  374. w.Header().Set("Content-Type", "application/json")
  375. if r.Method != http.MethodPost {
  376. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  377. return
  378. }
  379. var update runtime.SettingsUpdate
  380. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  381. http.Error(w, "invalid json", http.StatusBadRequest)
  382. return
  383. }
  384. prev := cfgManager.Snapshot()
  385. next, err := cfgManager.ApplySettings(update)
  386. if err != nil {
  387. http.Error(w, err.Error(), http.StatusBadRequest)
  388. return
  389. }
  390. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  391. if err := srcMgr.ApplyConfig(next); err != nil {
  392. cfgManager.Replace(prev)
  393. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  394. return
  395. }
  396. }
  397. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  398. pushDSPUpdate(dspUpdates, dspUpdate{
  399. cfg: next,
  400. dcBlock: next.DCBlock,
  401. iqBalance: next.IQBalance,
  402. })
  403. }
  404. _ = json.NewEncoder(w).Encode(next)
  405. })
  406. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  407. w.Header().Set("Content-Type", "application/json")
  408. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  409. })
  410. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  411. w.Header().Set("Content-Type", "application/json")
  412. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  413. })
  414. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  415. w.Header().Set("Content-Type", "application/json")
  416. limit := 200
  417. if v := r.URL.Query().Get("limit"); v != "" {
  418. if parsed, err := strconv.Atoi(v); err == nil {
  419. limit = parsed
  420. }
  421. }
  422. var since time.Time
  423. if v := r.URL.Query().Get("since"); v != "" {
  424. if parsed, err := parseSince(v); err == nil {
  425. since = parsed
  426. } else {
  427. http.Error(w, "invalid since", http.StatusBadRequest)
  428. return
  429. }
  430. }
  431. snap := cfgManager.Snapshot()
  432. eventMu.RLock()
  433. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  434. eventMu.RUnlock()
  435. if err != nil {
  436. http.Error(w, "failed to read events", http.StatusInternalServerError)
  437. return
  438. }
  439. _ = json.NewEncoder(w).Encode(evs)
  440. })
  441. http.HandleFunc("/api/recordings", func(w http.ResponseWriter, r *http.Request) {
  442. if r.Method != http.MethodGet {
  443. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  444. return
  445. }
  446. w.Header().Set("Content-Type", "application/json")
  447. snap := cfgManager.Snapshot()
  448. list, err := recorder.ListRecordings(snap.Recorder.OutputDir)
  449. if err != nil {
  450. http.Error(w, "failed to list recordings", http.StatusInternalServerError)
  451. return
  452. }
  453. _ = json.NewEncoder(w).Encode(list)
  454. })
  455. http.HandleFunc("/api/recordings/", func(w http.ResponseWriter, r *http.Request) {
  456. w.Header().Set("Content-Type", "application/json")
  457. id := strings.TrimPrefix(r.URL.Path, "/api/recordings/")
  458. if id == "" {
  459. http.Error(w, "missing id", http.StatusBadRequest)
  460. return
  461. }
  462. snap := cfgManager.Snapshot()
  463. base := filepath.Clean(filepath.Join(snap.Recorder.OutputDir, id))
  464. if !strings.HasPrefix(base, filepath.Clean(snap.Recorder.OutputDir)) {
  465. http.Error(w, "invalid path", http.StatusBadRequest)
  466. return
  467. }
  468. if r.URL.Path == "/api/recordings/"+id+"/audio" {
  469. http.ServeFile(w, r, filepath.Join(base, "audio.wav"))
  470. return
  471. }
  472. if r.URL.Path == "/api/recordings/"+id+"/iq" {
  473. http.ServeFile(w, r, filepath.Join(base, "signal.cf32"))
  474. return
  475. }
  476. // default: meta.json
  477. http.ServeFile(w, r, filepath.Join(base, "meta.json"))
  478. })
  479. http.HandleFunc("/api/demod", func(w http.ResponseWriter, r *http.Request) {
  480. if r.Method != http.MethodGet {
  481. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  482. return
  483. }
  484. q := r.URL.Query()
  485. freq, _ := strconv.ParseFloat(q.Get("freq"), 64)
  486. bw, _ := strconv.ParseFloat(q.Get("bw"), 64)
  487. sec, _ := strconv.Atoi(q.Get("sec"))
  488. mode := q.Get("mode")
  489. data, _, err := recMgr.DemodLive(freq, bw, mode, sec)
  490. if err != nil {
  491. http.Error(w, err.Error(), http.StatusBadRequest)
  492. return
  493. }
  494. w.Header().Set("Content-Type", "audio/wav")
  495. _, _ = w.Write(data)
  496. })
  497. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  498. server := &http.Server{Addr: cfg.WebAddr}
  499. go func() {
  500. log.Printf("web listening on %s", cfg.WebAddr)
  501. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  502. log.Fatalf("server: %v", err)
  503. }
  504. }()
  505. stop := make(chan os.Signal, 1)
  506. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  507. <-stop
  508. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  509. defer cancelTimeout()
  510. _ = server.Shutdown(ctxTimeout)
  511. }
  512. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager) {
  513. ticker := time.NewTicker(cfg.FrameInterval())
  514. defer ticker.Stop()
  515. logTicker := time.NewTicker(5 * time.Second)
  516. defer logTicker.Stop()
  517. enc := json.NewEncoder(eventFile)
  518. dcBlocker := dsp.NewDCBlocker(0.995)
  519. dcEnabled := cfg.DCBlock
  520. iqEnabled := cfg.IQBalance
  521. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  522. useGPU := cfg.UseGPUFFT
  523. var gpuEngine *gpufft.Engine
  524. if useGPU && gpuState != nil {
  525. snap := gpuState.snapshot()
  526. if snap.Available {
  527. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  528. gpuEngine = eng
  529. gpuState.set(true, nil)
  530. } else {
  531. gpuState.set(false, err)
  532. useGPU = false
  533. }
  534. } else {
  535. gpuState.set(false, nil)
  536. useGPU = false
  537. }
  538. } else if gpuState != nil {
  539. gpuState.set(false, nil)
  540. }
  541. gotSamples := false
  542. for {
  543. select {
  544. case <-ctx.Done():
  545. return
  546. case <-logTicker.C:
  547. st := srcMgr.Stats()
  548. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  549. case upd := <-updates:
  550. prevFFT := cfg.FFTSize
  551. prevUseGPU := useGPU
  552. cfg = upd.cfg
  553. if rec != nil {
  554. rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  555. Enabled: cfg.Recorder.Enabled,
  556. MinSNRDb: cfg.Recorder.MinSNRDb,
  557. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  558. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  559. PrerollMs: cfg.Recorder.PrerollMs,
  560. RecordIQ: cfg.Recorder.RecordIQ,
  561. RecordAudio: cfg.Recorder.RecordAudio,
  562. AutoDemod: cfg.Recorder.AutoDemod,
  563. OutputDir: cfg.Recorder.OutputDir,
  564. ClassFilter: cfg.Recorder.ClassFilter,
  565. RingSeconds: cfg.Recorder.RingSeconds,
  566. }, cfg.CenterHz)
  567. }
  568. if upd.det != nil {
  569. det = upd.det
  570. }
  571. if upd.window != nil {
  572. window = upd.window
  573. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  574. }
  575. dcEnabled = upd.dcBlock
  576. iqEnabled = upd.iqBalance
  577. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  578. srcMgr.Flush()
  579. gotSamples = false
  580. if gpuEngine != nil {
  581. gpuEngine.Close()
  582. gpuEngine = nil
  583. }
  584. useGPU = cfg.UseGPUFFT
  585. if useGPU && gpuState != nil {
  586. snap := gpuState.snapshot()
  587. if snap.Available {
  588. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  589. gpuEngine = eng
  590. gpuState.set(true, nil)
  591. } else {
  592. gpuState.set(false, err)
  593. useGPU = false
  594. }
  595. } else {
  596. gpuState.set(false, nil)
  597. useGPU = false
  598. }
  599. } else if gpuState != nil {
  600. gpuState.set(false, nil)
  601. }
  602. }
  603. dcBlocker.Reset()
  604. ticker.Reset(cfg.FrameInterval())
  605. case <-ticker.C:
  606. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  607. if err != nil {
  608. log.Printf("read IQ: %v", err)
  609. if strings.Contains(err.Error(), "timeout") {
  610. if err := srcMgr.Restart(cfg); err != nil {
  611. log.Printf("restart failed: %v", err)
  612. }
  613. }
  614. continue
  615. }
  616. if rec != nil {
  617. rec.Ingest(time.Now(), iq)
  618. }
  619. if !gotSamples {
  620. log.Printf("received IQ samples")
  621. gotSamples = true
  622. }
  623. if dcEnabled {
  624. dcBlocker.Apply(iq)
  625. }
  626. if iqEnabled {
  627. dsp.IQBalance(iq)
  628. }
  629. var spectrum []float64
  630. if useGPU && gpuEngine != nil {
  631. if len(window) == len(iq) {
  632. for i := 0; i < len(iq); i++ {
  633. v := iq[i]
  634. w := float32(window[i])
  635. iq[i] = complex(real(v)*w, imag(v)*w)
  636. }
  637. }
  638. out, err := gpuEngine.Exec(iq)
  639. if err != nil {
  640. if gpuState != nil {
  641. gpuState.set(false, err)
  642. }
  643. useGPU = false
  644. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  645. } else {
  646. spectrum = fftutil.SpectrumFromFFT(out)
  647. }
  648. } else {
  649. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  650. }
  651. now := time.Now()
  652. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  653. eventMu.Lock()
  654. for _, ev := range finished {
  655. _ = enc.Encode(ev)
  656. }
  657. eventMu.Unlock()
  658. if rec != nil {
  659. rec.OnEvents(finished)
  660. }
  661. h.broadcast(SpectrumFrame{
  662. Timestamp: now.UnixMilli(),
  663. CenterHz: cfg.CenterHz,
  664. SampleHz: cfg.SampleRate,
  665. FFTSize: cfg.FFTSize,
  666. Spectrum: spectrum,
  667. Signals: signals,
  668. })
  669. }
  670. }
  671. }
  // mustParseDuration parses raw with time.ParseDuration and returns fallback
  // when raw is empty or cannot be parsed. Despite the name it never panics.
  func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  	if raw != "" {
  		parsed, err := time.ParseDuration(raw)
  		if err == nil {
  			return parsed
  		}
  	}
  	return fallback
  }
  // parseSince converts the "since" query value into a time.
  //
  // Accepted forms:
  //   - "" yields the zero time and no error;
  //   - a base-10 integer: values greater than 1e12 are read as Unix
  //     milliseconds, all others as Unix seconds;
  //   - otherwise an RFC 3339 timestamp (tried with and without the
  //     nanosecond layout).
  //
  // The error of the final RFC 3339 attempt is returned when nothing matches.
  func parseSince(raw string) (time.Time, error) {
  	if raw == "" {
  		return time.Time{}, nil
  	}

  	n, numErr := strconv.ParseInt(raw, 10, 64)
  	if numErr != nil {
  		ts, err := time.Parse(time.RFC3339Nano, raw)
  		if err != nil {
  			return time.Parse(time.RFC3339, raw)
  		}
  		return ts, nil
  	}

  	if n > 1e12 {
  		return time.UnixMilli(n), nil
  	}
  	return time.Unix(n, 0), nil
  }
  695. }