Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

786 řádků
21KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/gorilla/websocket"
  17. "sdr-visual-suite/internal/classifier"
  18. "sdr-visual-suite/internal/config"
  19. "sdr-visual-suite/internal/detector"
  20. "sdr-visual-suite/internal/dsp"
  21. "sdr-visual-suite/internal/events"
  22. fftutil "sdr-visual-suite/internal/fft"
  23. "sdr-visual-suite/internal/fft/gpufft"
  24. "sdr-visual-suite/internal/mock"
  25. "sdr-visual-suite/internal/recorder"
  26. "sdr-visual-suite/internal/runtime"
  27. "sdr-visual-suite/internal/sdr"
  28. "sdr-visual-suite/internal/sdrplay"
  29. )
  30. type SpectrumFrame struct {
  31. Timestamp int64 `json:"ts"`
  32. CenterHz float64 `json:"center_hz"`
  33. SampleHz int `json:"sample_rate"`
  34. FFTSize int `json:"fft_size"`
  35. Spectrum []float64 `json:"spectrum_db"`
  36. Signals []detector.Signal `json:"signals"`
  37. }
  38. type client struct {
  39. conn *websocket.Conn
  40. send chan []byte
  41. done chan struct{}
  42. closeOnce sync.Once
  43. }
  44. type hub struct {
  45. mu sync.Mutex
  46. clients map[*client]struct{}
  47. }
  48. type gpuStatus struct {
  49. mu sync.RWMutex
  50. Available bool `json:"available"`
  51. Active bool `json:"active"`
  52. Error string `json:"error"`
  53. }
  54. func (g *gpuStatus) set(active bool, err error) {
  55. g.mu.Lock()
  56. defer g.mu.Unlock()
  57. g.Active = active
  58. if err != nil {
  59. g.Error = err.Error()
  60. } else {
  61. g.Error = ""
  62. }
  63. }
  64. func (g *gpuStatus) snapshot() gpuStatus {
  65. g.mu.RLock()
  66. defer g.mu.RUnlock()
  67. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  68. }
  69. func newHub() *hub {
  70. return &hub{clients: map[*client]struct{}{}}
  71. }
  72. func (h *hub) add(c *client) {
  73. h.mu.Lock()
  74. defer h.mu.Unlock()
  75. h.clients[c] = struct{}{}
  76. }
  77. func (h *hub) remove(c *client) {
  78. c.closeOnce.Do(func() { close(c.done) })
  79. h.mu.Lock()
  80. defer h.mu.Unlock()
  81. delete(h.clients, c)
  82. }
  83. func (h *hub) broadcast(frame SpectrumFrame) {
  84. b, err := json.Marshal(frame)
  85. if err != nil {
  86. log.Printf("marshal frame: %v", err)
  87. return
  88. }
  89. h.mu.Lock()
  90. clients := make([]*client, 0, len(h.clients))
  91. for c := range h.clients {
  92. clients = append(clients, c)
  93. }
  94. h.mu.Unlock()
  95. for _, c := range clients {
  96. select {
  97. case c.send <- b:
  98. default:
  99. h.remove(c)
  100. }
  101. }
  102. }
  103. type sourceManager struct {
  104. mu sync.RWMutex
  105. src sdr.Source
  106. newSource func(cfg config.Config) (sdr.Source, error)
  107. }
  108. func (m *sourceManager) Restart(cfg config.Config) error {
  109. m.mu.Lock()
  110. defer m.mu.Unlock()
  111. old := m.src
  112. _ = old.Stop()
  113. next, err := m.newSource(cfg)
  114. if err != nil {
  115. _ = old.Start()
  116. m.src = old
  117. return err
  118. }
  119. if err := next.Start(); err != nil {
  120. _ = next.Stop()
  121. _ = old.Start()
  122. m.src = old
  123. return err
  124. }
  125. m.src = next
  126. return nil
  127. }
  128. func (m *sourceManager) Stats() sdr.SourceStats {
  129. m.mu.RLock()
  130. defer m.mu.RUnlock()
  131. if sp, ok := m.src.(sdr.StatsProvider); ok {
  132. return sp.Stats()
  133. }
  134. return sdr.SourceStats{}
  135. }
  136. func (m *sourceManager) Flush() {
  137. m.mu.RLock()
  138. defer m.mu.RUnlock()
  139. if fl, ok := m.src.(sdr.Flushable); ok {
  140. fl.Flush()
  141. }
  142. }
  143. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  144. return &sourceManager{src: src, newSource: newSource}
  145. }
  146. func (m *sourceManager) Start() error {
  147. m.mu.RLock()
  148. defer m.mu.RUnlock()
  149. return m.src.Start()
  150. }
  151. func (m *sourceManager) Stop() error {
  152. m.mu.RLock()
  153. defer m.mu.RUnlock()
  154. return m.src.Stop()
  155. }
  156. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  157. m.mu.RLock()
  158. defer m.mu.RUnlock()
  159. return m.src.ReadIQ(n)
  160. }
  161. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  162. m.mu.Lock()
  163. defer m.mu.Unlock()
  164. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  165. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  166. return nil
  167. }
  168. }
  169. old := m.src
  170. _ = old.Stop()
  171. next, err := m.newSource(cfg)
  172. if err != nil {
  173. _ = old.Start()
  174. return err
  175. }
  176. if err := next.Start(); err != nil {
  177. _ = next.Stop()
  178. _ = old.Start()
  179. return err
  180. }
  181. m.src = next
  182. return nil
  183. }
  184. type dspUpdate struct {
  185. cfg config.Config
  186. det *detector.Detector
  187. window []float64
  188. dcBlock bool
  189. iqBalance bool
  190. useGPUFFT bool
  191. }
  192. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  193. select {
  194. case ch <- update:
  195. default:
  196. select {
  197. case <-ch:
  198. default:
  199. }
  200. ch <- update
  201. }
  202. }
  203. func main() {
  204. var cfgPath string
  205. var mockFlag bool
  206. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  207. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  208. flag.Parse()
  209. cfg, err := config.Load(cfgPath)
  210. if err != nil {
  211. log.Fatalf("load config: %v", err)
  212. }
  213. cfgManager := runtime.New(cfg)
  214. gpuState := &gpuStatus{Available: gpufft.Available()}
  215. newSource := func(cfg config.Config) (sdr.Source, error) {
  216. if mockFlag {
  217. src := mock.New(cfg.SampleRate)
  218. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  219. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  220. }
  221. return src, nil
  222. }
  223. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  224. if err != nil {
  225. return nil, err
  226. }
  227. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  228. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  229. }
  230. return src, nil
  231. }
  232. src, err := newSource(cfg)
  233. if err != nil {
  234. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  235. }
  236. srcMgr := newSourceManager(src, newSource)
  237. if err := srcMgr.Start(); err != nil {
  238. log.Fatalf("source start: %v", err)
  239. }
  240. defer srcMgr.Stop()
  241. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  242. log.Fatalf("event path: %v", err)
  243. }
  244. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  245. if err != nil {
  246. log.Fatalf("open events: %v", err)
  247. }
  248. defer eventFile.Close()
  249. eventMu := &sync.RWMutex{}
  250. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  251. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  252. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  253. window := fftutil.Hann(cfg.FFTSize)
  254. h := newHub()
  255. dspUpdates := make(chan dspUpdate, 1)
  256. ctx, cancel := context.WithCancel(context.Background())
  257. defer cancel()
  258. decodeMap := buildDecoderMap(cfg)
  259. recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  260. Enabled: cfg.Recorder.Enabled,
  261. MinSNRDb: cfg.Recorder.MinSNRDb,
  262. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  263. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  264. PrerollMs: cfg.Recorder.PrerollMs,
  265. RecordIQ: cfg.Recorder.RecordIQ,
  266. RecordAudio: cfg.Recorder.RecordAudio,
  267. AutoDemod: cfg.Recorder.AutoDemod,
  268. AutoDecode: cfg.Recorder.AutoDecode,
  269. OutputDir: cfg.Recorder.OutputDir,
  270. ClassFilter: cfg.Recorder.ClassFilter,
  271. RingSeconds: cfg.Recorder.RingSeconds,
  272. }, cfg.CenterHz, decodeMap)
  273. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr)
  274. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  275. origin := r.Header.Get("Origin")
  276. return origin == "" || strings.HasPrefix(origin, "http://localhost") || strings.HasPrefix(origin, "http://127.0.0.1")
  277. }}
  278. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  279. conn, err := upgrader.Upgrade(w, r, nil)
  280. if err != nil {
  281. return
  282. }
  283. c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})}
  284. h.add(c)
  285. defer func() {
  286. h.remove(c)
  287. _ = conn.Close()
  288. }()
  289. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  290. conn.SetPongHandler(func(string) error {
  291. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  292. return nil
  293. })
  294. go func() {
  295. ping := time.NewTicker(30 * time.Second)
  296. defer ping.Stop()
  297. for {
  298. select {
  299. case msg, ok := <-c.send:
  300. if !ok {
  301. return
  302. }
  303. _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  304. if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
  305. return
  306. }
  307. case <-ping.C:
  308. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  309. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  310. return
  311. }
  312. }
  313. }
  314. }()
  315. for {
  316. _, _, err := conn.ReadMessage()
  317. if err != nil {
  318. return
  319. }
  320. }
  321. })
  322. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  323. w.Header().Set("Content-Type", "application/json")
  324. switch r.Method {
  325. case http.MethodGet:
  326. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  327. case http.MethodPost:
  328. var update runtime.ConfigUpdate
  329. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  330. http.Error(w, "invalid json", http.StatusBadRequest)
  331. return
  332. }
  333. prev := cfgManager.Snapshot()
  334. next, err := cfgManager.ApplyConfig(update)
  335. if err != nil {
  336. http.Error(w, err.Error(), http.StatusBadRequest)
  337. return
  338. }
  339. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  340. if sourceChanged {
  341. if err := srcMgr.ApplyConfig(next); err != nil {
  342. cfgManager.Replace(prev)
  343. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  344. return
  345. }
  346. }
  347. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  348. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  349. prev.Detector.HoldMs != next.Detector.HoldMs ||
  350. prev.SampleRate != next.SampleRate ||
  351. prev.FFTSize != next.FFTSize
  352. windowChanged := prev.FFTSize != next.FFTSize
  353. var newDet *detector.Detector
  354. var newWindow []float64
  355. if detChanged {
  356. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  357. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  358. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  359. }
  360. if windowChanged {
  361. newWindow = fftutil.Hann(next.FFTSize)
  362. }
  363. pushDSPUpdate(dspUpdates, dspUpdate{
  364. cfg: next,
  365. det: newDet,
  366. window: newWindow,
  367. dcBlock: next.DCBlock,
  368. iqBalance: next.IQBalance,
  369. useGPUFFT: next.UseGPUFFT,
  370. })
  371. _ = json.NewEncoder(w).Encode(next)
  372. default:
  373. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  374. }
  375. })
  376. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  377. w.Header().Set("Content-Type", "application/json")
  378. if r.Method != http.MethodPost {
  379. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  380. return
  381. }
  382. var update runtime.SettingsUpdate
  383. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  384. http.Error(w, "invalid json", http.StatusBadRequest)
  385. return
  386. }
  387. prev := cfgManager.Snapshot()
  388. next, err := cfgManager.ApplySettings(update)
  389. if err != nil {
  390. http.Error(w, err.Error(), http.StatusBadRequest)
  391. return
  392. }
  393. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  394. if err := srcMgr.ApplyConfig(next); err != nil {
  395. cfgManager.Replace(prev)
  396. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  397. return
  398. }
  399. }
  400. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  401. pushDSPUpdate(dspUpdates, dspUpdate{
  402. cfg: next,
  403. dcBlock: next.DCBlock,
  404. iqBalance: next.IQBalance,
  405. })
  406. }
  407. _ = json.NewEncoder(w).Encode(next)
  408. })
  409. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  410. w.Header().Set("Content-Type", "application/json")
  411. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  412. })
  413. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  414. w.Header().Set("Content-Type", "application/json")
  415. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  416. })
  417. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  418. w.Header().Set("Content-Type", "application/json")
  419. limit := 200
  420. if v := r.URL.Query().Get("limit"); v != "" {
  421. if parsed, err := strconv.Atoi(v); err == nil {
  422. limit = parsed
  423. }
  424. }
  425. var since time.Time
  426. if v := r.URL.Query().Get("since"); v != "" {
  427. if parsed, err := parseSince(v); err == nil {
  428. since = parsed
  429. } else {
  430. http.Error(w, "invalid since", http.StatusBadRequest)
  431. return
  432. }
  433. }
  434. snap := cfgManager.Snapshot()
  435. eventMu.RLock()
  436. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  437. eventMu.RUnlock()
  438. if err != nil {
  439. http.Error(w, "failed to read events", http.StatusInternalServerError)
  440. return
  441. }
  442. _ = json.NewEncoder(w).Encode(evs)
  443. })
  444. http.HandleFunc("/api/recordings", func(w http.ResponseWriter, r *http.Request) {
  445. if r.Method != http.MethodGet {
  446. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  447. return
  448. }
  449. w.Header().Set("Content-Type", "application/json")
  450. snap := cfgManager.Snapshot()
  451. list, err := recorder.ListRecordings(snap.Recorder.OutputDir)
  452. if err != nil {
  453. http.Error(w, "failed to list recordings", http.StatusInternalServerError)
  454. return
  455. }
  456. _ = json.NewEncoder(w).Encode(list)
  457. })
  458. http.HandleFunc("/api/recordings/", func(w http.ResponseWriter, r *http.Request) {
  459. w.Header().Set("Content-Type", "application/json")
  460. id := strings.TrimPrefix(r.URL.Path, "/api/recordings/")
  461. if id == "" {
  462. http.Error(w, "missing id", http.StatusBadRequest)
  463. return
  464. }
  465. snap := cfgManager.Snapshot()
  466. base := filepath.Clean(filepath.Join(snap.Recorder.OutputDir, id))
  467. if !strings.HasPrefix(base, filepath.Clean(snap.Recorder.OutputDir)) {
  468. http.Error(w, "invalid path", http.StatusBadRequest)
  469. return
  470. }
  471. if r.URL.Path == "/api/recordings/"+id+"/audio" {
  472. http.ServeFile(w, r, filepath.Join(base, "audio.wav"))
  473. return
  474. }
  475. if r.URL.Path == "/api/recordings/"+id+"/iq" {
  476. http.ServeFile(w, r, filepath.Join(base, "signal.cf32"))
  477. return
  478. }
  479. // default: meta.json
  480. http.ServeFile(w, r, filepath.Join(base, "meta.json"))
  481. })
  482. http.HandleFunc("/api/demod", func(w http.ResponseWriter, r *http.Request) {
  483. if r.Method != http.MethodGet {
  484. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  485. return
  486. }
  487. q := r.URL.Query()
  488. freq, _ := strconv.ParseFloat(q.Get("freq"), 64)
  489. bw, _ := strconv.ParseFloat(q.Get("bw"), 64)
  490. sec, _ := strconv.Atoi(q.Get("sec"))
  491. mode := q.Get("mode")
  492. data, _, err := recMgr.DemodLive(freq, bw, mode, sec)
  493. if err != nil {
  494. http.Error(w, err.Error(), http.StatusBadRequest)
  495. return
  496. }
  497. w.Header().Set("Content-Type", "audio/wav")
  498. _, _ = w.Write(data)
  499. })
  500. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  501. server := &http.Server{Addr: cfg.WebAddr}
  502. go func() {
  503. log.Printf("web listening on %s", cfg.WebAddr)
  504. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  505. log.Fatalf("server: %v", err)
  506. }
  507. }()
  508. stop := make(chan os.Signal, 1)
  509. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  510. <-stop
  511. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  512. defer cancelTimeout()
  513. _ = server.Shutdown(ctxTimeout)
  514. }
  515. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager) {
  516. ticker := time.NewTicker(cfg.FrameInterval())
  517. defer ticker.Stop()
  518. logTicker := time.NewTicker(5 * time.Second)
  519. defer logTicker.Stop()
  520. enc := json.NewEncoder(eventFile)
  521. dcBlocker := dsp.NewDCBlocker(0.995)
  522. dcEnabled := cfg.DCBlock
  523. iqEnabled := cfg.IQBalance
  524. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  525. useGPU := cfg.UseGPUFFT
  526. var gpuEngine *gpufft.Engine
  527. if useGPU && gpuState != nil {
  528. snap := gpuState.snapshot()
  529. if snap.Available {
  530. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  531. gpuEngine = eng
  532. gpuState.set(true, nil)
  533. } else {
  534. gpuState.set(false, err)
  535. useGPU = false
  536. }
  537. } else {
  538. gpuState.set(false, nil)
  539. useGPU = false
  540. }
  541. } else if gpuState != nil {
  542. gpuState.set(false, nil)
  543. }
  544. gotSamples := false
  545. for {
  546. select {
  547. case <-ctx.Done():
  548. return
  549. case <-logTicker.C:
  550. st := srcMgr.Stats()
  551. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  552. case upd := <-updates:
  553. prevFFT := cfg.FFTSize
  554. prevUseGPU := useGPU
  555. cfg = upd.cfg
  556. if rec != nil {
  557. rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  558. Enabled: cfg.Recorder.Enabled,
  559. MinSNRDb: cfg.Recorder.MinSNRDb,
  560. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  561. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  562. PrerollMs: cfg.Recorder.PrerollMs,
  563. RecordIQ: cfg.Recorder.RecordIQ,
  564. RecordAudio: cfg.Recorder.RecordAudio,
  565. AutoDemod: cfg.Recorder.AutoDemod,
  566. AutoDecode: cfg.Recorder.AutoDecode,
  567. OutputDir: cfg.Recorder.OutputDir,
  568. ClassFilter: cfg.Recorder.ClassFilter,
  569. RingSeconds: cfg.Recorder.RingSeconds,
  570. }, cfg.CenterHz, buildDecoderMap(cfg))
  571. }
  572. if upd.det != nil {
  573. det = upd.det
  574. }
  575. if upd.window != nil {
  576. window = upd.window
  577. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  578. }
  579. dcEnabled = upd.dcBlock
  580. iqEnabled = upd.iqBalance
  581. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  582. srcMgr.Flush()
  583. gotSamples = false
  584. if gpuEngine != nil {
  585. gpuEngine.Close()
  586. gpuEngine = nil
  587. }
  588. useGPU = cfg.UseGPUFFT
  589. if useGPU && gpuState != nil {
  590. snap := gpuState.snapshot()
  591. if snap.Available {
  592. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  593. gpuEngine = eng
  594. gpuState.set(true, nil)
  595. } else {
  596. gpuState.set(false, err)
  597. useGPU = false
  598. }
  599. } else {
  600. gpuState.set(false, nil)
  601. useGPU = false
  602. }
  603. } else if gpuState != nil {
  604. gpuState.set(false, nil)
  605. }
  606. }
  607. dcBlocker.Reset()
  608. ticker.Reset(cfg.FrameInterval())
  609. case <-ticker.C:
  610. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  611. if err != nil {
  612. log.Printf("read IQ: %v", err)
  613. if strings.Contains(err.Error(), "timeout") {
  614. if err := srcMgr.Restart(cfg); err != nil {
  615. log.Printf("restart failed: %v", err)
  616. }
  617. }
  618. continue
  619. }
  620. if rec != nil {
  621. rec.Ingest(time.Now(), iq)
  622. }
  623. if !gotSamples {
  624. log.Printf("received IQ samples")
  625. gotSamples = true
  626. }
  627. if dcEnabled {
  628. dcBlocker.Apply(iq)
  629. }
  630. if iqEnabled {
  631. dsp.IQBalance(iq)
  632. }
  633. var spectrum []float64
  634. if useGPU && gpuEngine != nil {
  635. if len(window) == len(iq) {
  636. for i := 0; i < len(iq); i++ {
  637. v := iq[i]
  638. w := float32(window[i])
  639. iq[i] = complex(real(v)*w, imag(v)*w)
  640. }
  641. }
  642. out, err := gpuEngine.Exec(iq)
  643. if err != nil {
  644. if gpuState != nil {
  645. gpuState.set(false, err)
  646. }
  647. useGPU = false
  648. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  649. } else {
  650. spectrum = fftutil.SpectrumFromFFT(out)
  651. }
  652. } else {
  653. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  654. }
  655. now := time.Now()
  656. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  657. // enrich classification with temporal IQ features
  658. if len(iq) > 0 {
  659. for i := range signals {
  660. cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, iq)
  661. signals[i].Class = cls
  662. }
  663. }
  664. eventMu.Lock()
  665. for _, ev := range finished {
  666. _ = enc.Encode(ev)
  667. }
  668. eventMu.Unlock()
  669. if rec != nil {
  670. rec.OnEvents(finished)
  671. }
  672. h.broadcast(SpectrumFrame{
  673. Timestamp: now.UnixMilli(),
  674. CenterHz: cfg.CenterHz,
  675. SampleHz: cfg.SampleRate,
  676. FFTSize: cfg.FFTSize,
  677. Spectrum: spectrum,
  678. Signals: signals,
  679. })
  680. }
  681. }
  682. }
  683. func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  684. if raw == "" {
  685. return fallback
  686. }
  687. if d, err := time.ParseDuration(raw); err == nil {
  688. return d
  689. }
  690. return fallback
  691. }
  692. func buildDecoderMap(cfg config.Config) map[string]string {
  693. out := map[string]string{}
  694. if cfg.Decoder.FT8Cmd != "" {
  695. out["FT8"] = cfg.Decoder.FT8Cmd
  696. }
  697. if cfg.Decoder.WSPRCmd != "" {
  698. out["WSPR"] = cfg.Decoder.WSPRCmd
  699. }
  700. if cfg.Decoder.DMRCmd != "" {
  701. out["DMR"] = cfg.Decoder.DMRCmd
  702. }
  703. if cfg.Decoder.DStarCmd != "" {
  704. out["D-STAR"] = cfg.Decoder.DStarCmd
  705. }
  706. if cfg.Decoder.FSKCmd != "" {
  707. out["FSK"] = cfg.Decoder.FSKCmd
  708. }
  709. if cfg.Decoder.PSKCmd != "" {
  710. out["PSK"] = cfg.Decoder.PSKCmd
  711. }
  712. return out
  713. }
  714. func parseSince(raw string) (time.Time, error) {
  715. if raw == "" {
  716. return time.Time{}, nil
  717. }
  718. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  719. if ms > 1e12 {
  720. return time.UnixMilli(ms), nil
  721. }
  722. return time.Unix(ms, 0), nil
  723. }
  724. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  725. return t, nil
  726. }
  727. return time.Parse(time.RFC3339, raw)
  728. }