Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

892 linhas
23KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "sort"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "syscall"
  16. "time"
  17. "github.com/gorilla/websocket"
  18. "sdr-visual-suite/internal/classifier"
  19. "sdr-visual-suite/internal/config"
  20. "sdr-visual-suite/internal/detector"
  21. "sdr-visual-suite/internal/dsp"
  22. "sdr-visual-suite/internal/events"
  23. fftutil "sdr-visual-suite/internal/fft"
  24. "sdr-visual-suite/internal/fft/gpufft"
  25. "sdr-visual-suite/internal/mock"
  26. "sdr-visual-suite/internal/recorder"
  27. "sdr-visual-suite/internal/runtime"
  28. "sdr-visual-suite/internal/sdr"
  29. "sdr-visual-suite/internal/sdrplay"
  30. )
  31. type SpectrumFrame struct {
  32. Timestamp int64 `json:"ts"`
  33. CenterHz float64 `json:"center_hz"`
  34. SampleHz int `json:"sample_rate"`
  35. FFTSize int `json:"fft_size"`
  36. Spectrum []float64 `json:"spectrum_db"`
  37. Signals []detector.Signal `json:"signals"`
  38. }
  39. type client struct {
  40. conn *websocket.Conn
  41. send chan []byte
  42. done chan struct{}
  43. closeOnce sync.Once
  44. }
  45. type hub struct {
  46. mu sync.Mutex
  47. clients map[*client]struct{}
  48. }
  49. type gpuStatus struct {
  50. mu sync.RWMutex
  51. Available bool `json:"available"`
  52. Active bool `json:"active"`
  53. Error string `json:"error"`
  54. }
  55. type signalSnapshot struct {
  56. mu sync.RWMutex
  57. signals []detector.Signal
  58. }
  59. func (s *signalSnapshot) set(sig []detector.Signal) {
  60. s.mu.Lock()
  61. defer s.mu.Unlock()
  62. s.signals = append([]detector.Signal(nil), sig...)
  63. }
  64. func (s *signalSnapshot) get() []detector.Signal {
  65. s.mu.RLock()
  66. defer s.mu.RUnlock()
  67. return append([]detector.Signal(nil), s.signals...)
  68. }
  69. func (g *gpuStatus) set(active bool, err error) {
  70. g.mu.Lock()
  71. defer g.mu.Unlock()
  72. g.Active = active
  73. if err != nil {
  74. g.Error = err.Error()
  75. } else {
  76. g.Error = ""
  77. }
  78. }
  79. func (g *gpuStatus) snapshot() gpuStatus {
  80. g.mu.RLock()
  81. defer g.mu.RUnlock()
  82. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  83. }
  84. func newHub() *hub {
  85. return &hub{clients: map[*client]struct{}{}}
  86. }
  87. func (h *hub) add(c *client) {
  88. h.mu.Lock()
  89. defer h.mu.Unlock()
  90. h.clients[c] = struct{}{}
  91. }
  92. func (h *hub) remove(c *client) {
  93. c.closeOnce.Do(func() { close(c.done) })
  94. h.mu.Lock()
  95. defer h.mu.Unlock()
  96. delete(h.clients, c)
  97. }
  98. func (h *hub) broadcast(frame SpectrumFrame) {
  99. b, err := json.Marshal(frame)
  100. if err != nil {
  101. log.Printf("marshal frame: %v", err)
  102. return
  103. }
  104. h.mu.Lock()
  105. clients := make([]*client, 0, len(h.clients))
  106. for c := range h.clients {
  107. clients = append(clients, c)
  108. }
  109. h.mu.Unlock()
  110. for _, c := range clients {
  111. select {
  112. case c.send <- b:
  113. default:
  114. h.remove(c)
  115. }
  116. }
  117. }
  118. type sourceManager struct {
  119. mu sync.RWMutex
  120. src sdr.Source
  121. newSource func(cfg config.Config) (sdr.Source, error)
  122. }
  123. func (m *sourceManager) Restart(cfg config.Config) error {
  124. m.mu.Lock()
  125. defer m.mu.Unlock()
  126. old := m.src
  127. _ = old.Stop()
  128. next, err := m.newSource(cfg)
  129. if err != nil {
  130. _ = old.Start()
  131. m.src = old
  132. return err
  133. }
  134. if err := next.Start(); err != nil {
  135. _ = next.Stop()
  136. _ = old.Start()
  137. m.src = old
  138. return err
  139. }
  140. m.src = next
  141. return nil
  142. }
  143. func (m *sourceManager) Stats() sdr.SourceStats {
  144. m.mu.RLock()
  145. defer m.mu.RUnlock()
  146. if sp, ok := m.src.(sdr.StatsProvider); ok {
  147. return sp.Stats()
  148. }
  149. return sdr.SourceStats{}
  150. }
  151. func (m *sourceManager) Flush() {
  152. m.mu.RLock()
  153. defer m.mu.RUnlock()
  154. if fl, ok := m.src.(sdr.Flushable); ok {
  155. fl.Flush()
  156. }
  157. }
  158. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  159. return &sourceManager{src: src, newSource: newSource}
  160. }
  161. func (m *sourceManager) Start() error {
  162. m.mu.RLock()
  163. defer m.mu.RUnlock()
  164. return m.src.Start()
  165. }
  166. func (m *sourceManager) Stop() error {
  167. m.mu.RLock()
  168. defer m.mu.RUnlock()
  169. return m.src.Stop()
  170. }
  171. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  172. m.mu.RLock()
  173. defer m.mu.RUnlock()
  174. return m.src.ReadIQ(n)
  175. }
  176. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  177. m.mu.Lock()
  178. defer m.mu.Unlock()
  179. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  180. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  181. return nil
  182. }
  183. }
  184. old := m.src
  185. _ = old.Stop()
  186. next, err := m.newSource(cfg)
  187. if err != nil {
  188. _ = old.Start()
  189. return err
  190. }
  191. if err := next.Start(); err != nil {
  192. _ = next.Stop()
  193. _ = old.Start()
  194. return err
  195. }
  196. m.src = next
  197. return nil
  198. }
  199. type dspUpdate struct {
  200. cfg config.Config
  201. det *detector.Detector
  202. window []float64
  203. dcBlock bool
  204. iqBalance bool
  205. useGPUFFT bool
  206. }
  207. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  208. select {
  209. case ch <- update:
  210. default:
  211. select {
  212. case <-ch:
  213. default:
  214. }
  215. ch <- update
  216. }
  217. }
  218. func main() {
  219. var cfgPath string
  220. var mockFlag bool
  221. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  222. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  223. flag.Parse()
  224. cfg, err := config.Load(cfgPath)
  225. if err != nil {
  226. log.Fatalf("load config: %v", err)
  227. }
  228. cfgManager := runtime.New(cfg)
  229. gpuState := &gpuStatus{Available: gpufft.Available()}
  230. newSource := func(cfg config.Config) (sdr.Source, error) {
  231. if mockFlag {
  232. src := mock.New(cfg.SampleRate)
  233. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  234. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  235. }
  236. return src, nil
  237. }
  238. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  239. if err != nil {
  240. return nil, err
  241. }
  242. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  243. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  244. }
  245. return src, nil
  246. }
  247. src, err := newSource(cfg)
  248. if err != nil {
  249. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  250. }
  251. srcMgr := newSourceManager(src, newSource)
  252. if err := srcMgr.Start(); err != nil {
  253. log.Fatalf("source start: %v", err)
  254. }
  255. defer srcMgr.Stop()
  256. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  257. log.Fatalf("event path: %v", err)
  258. }
  259. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  260. if err != nil {
  261. log.Fatalf("open events: %v", err)
  262. }
  263. defer eventFile.Close()
  264. eventMu := &sync.RWMutex{}
  265. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  266. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  267. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  268. window := fftutil.Hann(cfg.FFTSize)
  269. h := newHub()
  270. dspUpdates := make(chan dspUpdate, 1)
  271. ctx, cancel := context.WithCancel(context.Background())
  272. defer cancel()
  273. decodeMap := buildDecoderMap(cfg)
  274. recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  275. Enabled: cfg.Recorder.Enabled,
  276. MinSNRDb: cfg.Recorder.MinSNRDb,
  277. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  278. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  279. PrerollMs: cfg.Recorder.PrerollMs,
  280. RecordIQ: cfg.Recorder.RecordIQ,
  281. RecordAudio: cfg.Recorder.RecordAudio,
  282. AutoDemod: cfg.Recorder.AutoDemod,
  283. AutoDecode: cfg.Recorder.AutoDecode,
  284. OutputDir: cfg.Recorder.OutputDir,
  285. ClassFilter: cfg.Recorder.ClassFilter,
  286. RingSeconds: cfg.Recorder.RingSeconds,
  287. }, cfg.CenterHz, decodeMap)
  288. sigSnap := &signalSnapshot{}
  289. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr, sigSnap)
  290. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  291. origin := r.Header.Get("Origin")
  292. if origin == "" || origin == "null" {
  293. return true
  294. }
  295. // allow same-host or any local IP
  296. return true
  297. }}
  298. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  299. conn, err := upgrader.Upgrade(w, r, nil)
  300. if err != nil {
  301. return
  302. }
  303. c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})}
  304. h.add(c)
  305. defer func() {
  306. h.remove(c)
  307. _ = conn.Close()
  308. }()
  309. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  310. conn.SetPongHandler(func(string) error {
  311. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  312. return nil
  313. })
  314. go func() {
  315. ping := time.NewTicker(30 * time.Second)
  316. defer ping.Stop()
  317. for {
  318. select {
  319. case msg, ok := <-c.send:
  320. if !ok {
  321. return
  322. }
  323. _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  324. if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
  325. return
  326. }
  327. case <-ping.C:
  328. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  329. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  330. return
  331. }
  332. }
  333. }
  334. }()
  335. for {
  336. _, _, err := conn.ReadMessage()
  337. if err != nil {
  338. return
  339. }
  340. }
  341. })
  342. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  343. w.Header().Set("Content-Type", "application/json")
  344. switch r.Method {
  345. case http.MethodGet:
  346. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  347. case http.MethodPost:
  348. var update runtime.ConfigUpdate
  349. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  350. http.Error(w, "invalid json", http.StatusBadRequest)
  351. return
  352. }
  353. prev := cfgManager.Snapshot()
  354. next, err := cfgManager.ApplyConfig(update)
  355. if err != nil {
  356. http.Error(w, err.Error(), http.StatusBadRequest)
  357. return
  358. }
  359. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  360. if sourceChanged {
  361. if err := srcMgr.ApplyConfig(next); err != nil {
  362. cfgManager.Replace(prev)
  363. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  364. return
  365. }
  366. }
  367. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  368. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  369. prev.Detector.HoldMs != next.Detector.HoldMs ||
  370. prev.SampleRate != next.SampleRate ||
  371. prev.FFTSize != next.FFTSize
  372. windowChanged := prev.FFTSize != next.FFTSize
  373. var newDet *detector.Detector
  374. var newWindow []float64
  375. if detChanged {
  376. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  377. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  378. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  379. }
  380. if windowChanged {
  381. newWindow = fftutil.Hann(next.FFTSize)
  382. }
  383. pushDSPUpdate(dspUpdates, dspUpdate{
  384. cfg: next,
  385. det: newDet,
  386. window: newWindow,
  387. dcBlock: next.DCBlock,
  388. iqBalance: next.IQBalance,
  389. useGPUFFT: next.UseGPUFFT,
  390. })
  391. _ = json.NewEncoder(w).Encode(next)
  392. default:
  393. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  394. }
  395. })
  396. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  397. w.Header().Set("Content-Type", "application/json")
  398. if r.Method != http.MethodPost {
  399. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  400. return
  401. }
  402. var update runtime.SettingsUpdate
  403. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  404. http.Error(w, "invalid json", http.StatusBadRequest)
  405. return
  406. }
  407. prev := cfgManager.Snapshot()
  408. next, err := cfgManager.ApplySettings(update)
  409. if err != nil {
  410. http.Error(w, err.Error(), http.StatusBadRequest)
  411. return
  412. }
  413. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  414. if err := srcMgr.ApplyConfig(next); err != nil {
  415. cfgManager.Replace(prev)
  416. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  417. return
  418. }
  419. }
  420. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  421. pushDSPUpdate(dspUpdates, dspUpdate{
  422. cfg: next,
  423. dcBlock: next.DCBlock,
  424. iqBalance: next.IQBalance,
  425. })
  426. }
  427. _ = json.NewEncoder(w).Encode(next)
  428. })
  429. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  430. w.Header().Set("Content-Type", "application/json")
  431. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  432. })
  433. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  434. w.Header().Set("Content-Type", "application/json")
  435. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  436. })
  437. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  438. w.Header().Set("Content-Type", "application/json")
  439. limit := 200
  440. if v := r.URL.Query().Get("limit"); v != "" {
  441. if parsed, err := strconv.Atoi(v); err == nil {
  442. limit = parsed
  443. }
  444. }
  445. var since time.Time
  446. if v := r.URL.Query().Get("since"); v != "" {
  447. if parsed, err := parseSince(v); err == nil {
  448. since = parsed
  449. } else {
  450. http.Error(w, "invalid since", http.StatusBadRequest)
  451. return
  452. }
  453. }
  454. snap := cfgManager.Snapshot()
  455. eventMu.RLock()
  456. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  457. eventMu.RUnlock()
  458. if err != nil {
  459. http.Error(w, "failed to read events", http.StatusInternalServerError)
  460. return
  461. }
  462. _ = json.NewEncoder(w).Encode(evs)
  463. })
  464. http.HandleFunc("/api/signals", func(w http.ResponseWriter, r *http.Request) {
  465. w.Header().Set("Content-Type", "application/json")
  466. if sigSnap == nil {
  467. _ = json.NewEncoder(w).Encode([]detector.Signal{})
  468. return
  469. }
  470. _ = json.NewEncoder(w).Encode(sigSnap.get())
  471. })
  472. http.HandleFunc("/api/decoders", func(w http.ResponseWriter, r *http.Request) {
  473. w.Header().Set("Content-Type", "application/json")
  474. _ = json.NewEncoder(w).Encode(decoderKeys(cfgManager.Snapshot()))
  475. })
  476. http.HandleFunc("/api/recordings", func(w http.ResponseWriter, r *http.Request) {
  477. if r.Method != http.MethodGet {
  478. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  479. return
  480. }
  481. w.Header().Set("Content-Type", "application/json")
  482. snap := cfgManager.Snapshot()
  483. list, err := recorder.ListRecordings(snap.Recorder.OutputDir)
  484. if err != nil {
  485. http.Error(w, "failed to list recordings", http.StatusInternalServerError)
  486. return
  487. }
  488. _ = json.NewEncoder(w).Encode(list)
  489. })
  490. http.HandleFunc("/api/recordings/", func(w http.ResponseWriter, r *http.Request) {
  491. w.Header().Set("Content-Type", "application/json")
  492. id := strings.TrimPrefix(r.URL.Path, "/api/recordings/")
  493. if id == "" {
  494. http.Error(w, "missing id", http.StatusBadRequest)
  495. return
  496. }
  497. snap := cfgManager.Snapshot()
  498. base := filepath.Clean(filepath.Join(snap.Recorder.OutputDir, id))
  499. if !strings.HasPrefix(base, filepath.Clean(snap.Recorder.OutputDir)) {
  500. http.Error(w, "invalid path", http.StatusBadRequest)
  501. return
  502. }
  503. if r.URL.Path == "/api/recordings/"+id+"/audio" {
  504. http.ServeFile(w, r, filepath.Join(base, "audio.wav"))
  505. return
  506. }
  507. if r.URL.Path == "/api/recordings/"+id+"/iq" {
  508. http.ServeFile(w, r, filepath.Join(base, "signal.cf32"))
  509. return
  510. }
  511. if r.URL.Path == "/api/recordings/"+id+"/decode" {
  512. mode := r.URL.Query().Get("mode")
  513. cmd := buildDecoderMap(cfgManager.Snapshot())[mode]
  514. if cmd == "" {
  515. http.Error(w, "decoder not configured", http.StatusBadRequest)
  516. return
  517. }
  518. meta, err := recorder.ReadMeta(filepath.Join(base, "meta.json"))
  519. if err != nil {
  520. http.Error(w, "meta read failed", http.StatusInternalServerError)
  521. return
  522. }
  523. audioPath := filepath.Join(base, "audio.wav")
  524. if _, errStat := os.Stat(audioPath); errStat != nil {
  525. audioPath = ""
  526. }
  527. res, err := recorder.DecodeOnDemand(cmd, filepath.Join(base, "signal.cf32"), meta.SampleRate, audioPath)
  528. if err != nil {
  529. http.Error(w, res.Stderr, http.StatusInternalServerError)
  530. return
  531. }
  532. _ = json.NewEncoder(w).Encode(res)
  533. return
  534. }
  535. // default: meta.json
  536. http.ServeFile(w, r, filepath.Join(base, "meta.json"))
  537. })
  538. http.HandleFunc("/api/demod", func(w http.ResponseWriter, r *http.Request) {
  539. if r.Method != http.MethodGet {
  540. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  541. return
  542. }
  543. q := r.URL.Query()
  544. freq, _ := strconv.ParseFloat(q.Get("freq"), 64)
  545. bw, _ := strconv.ParseFloat(q.Get("bw"), 64)
  546. sec, _ := strconv.Atoi(q.Get("sec"))
  547. if sec < 1 {
  548. sec = 1
  549. }
  550. if sec > 10 {
  551. sec = 10
  552. }
  553. mode := q.Get("mode")
  554. data, _, err := recMgr.DemodLive(freq, bw, mode, sec)
  555. if err != nil {
  556. http.Error(w, err.Error(), http.StatusBadRequest)
  557. return
  558. }
  559. w.Header().Set("Content-Type", "audio/wav")
  560. _, _ = w.Write(data)
  561. })
  562. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  563. server := &http.Server{Addr: cfg.WebAddr}
  564. go func() {
  565. log.Printf("web listening on %s", cfg.WebAddr)
  566. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  567. log.Fatalf("server: %v", err)
  568. }
  569. }()
  570. stop := make(chan os.Signal, 1)
  571. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  572. <-stop
  573. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  574. defer cancelTimeout()
  575. _ = server.Shutdown(ctxTimeout)
  576. }
  577. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) {
  578. ticker := time.NewTicker(cfg.FrameInterval())
  579. defer ticker.Stop()
  580. logTicker := time.NewTicker(5 * time.Second)
  581. defer logTicker.Stop()
  582. enc := json.NewEncoder(eventFile)
  583. dcBlocker := dsp.NewDCBlocker(0.995)
  584. dcEnabled := cfg.DCBlock
  585. iqEnabled := cfg.IQBalance
  586. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  587. useGPU := cfg.UseGPUFFT
  588. var gpuEngine *gpufft.Engine
  589. if useGPU && gpuState != nil {
  590. snap := gpuState.snapshot()
  591. if snap.Available {
  592. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  593. gpuEngine = eng
  594. gpuState.set(true, nil)
  595. } else {
  596. gpuState.set(false, err)
  597. useGPU = false
  598. }
  599. } else {
  600. gpuState.set(false, nil)
  601. useGPU = false
  602. }
  603. } else if gpuState != nil {
  604. gpuState.set(false, nil)
  605. }
  606. gotSamples := false
  607. for {
  608. select {
  609. case <-ctx.Done():
  610. return
  611. case <-logTicker.C:
  612. st := srcMgr.Stats()
  613. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  614. case upd := <-updates:
  615. prevFFT := cfg.FFTSize
  616. prevUseGPU := useGPU
  617. cfg = upd.cfg
  618. if rec != nil {
  619. rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  620. Enabled: cfg.Recorder.Enabled,
  621. MinSNRDb: cfg.Recorder.MinSNRDb,
  622. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  623. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  624. PrerollMs: cfg.Recorder.PrerollMs,
  625. RecordIQ: cfg.Recorder.RecordIQ,
  626. RecordAudio: cfg.Recorder.RecordAudio,
  627. AutoDemod: cfg.Recorder.AutoDemod,
  628. AutoDecode: cfg.Recorder.AutoDecode,
  629. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  630. OutputDir: cfg.Recorder.OutputDir,
  631. ClassFilter: cfg.Recorder.ClassFilter,
  632. RingSeconds: cfg.Recorder.RingSeconds,
  633. }, cfg.CenterHz, buildDecoderMap(cfg))
  634. }
  635. if upd.det != nil {
  636. det = upd.det
  637. }
  638. if upd.window != nil {
  639. window = upd.window
  640. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  641. }
  642. dcEnabled = upd.dcBlock
  643. iqEnabled = upd.iqBalance
  644. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  645. srcMgr.Flush()
  646. gotSamples = false
  647. if gpuEngine != nil {
  648. gpuEngine.Close()
  649. gpuEngine = nil
  650. }
  651. useGPU = cfg.UseGPUFFT
  652. if useGPU && gpuState != nil {
  653. snap := gpuState.snapshot()
  654. if snap.Available {
  655. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  656. gpuEngine = eng
  657. gpuState.set(true, nil)
  658. } else {
  659. gpuState.set(false, err)
  660. useGPU = false
  661. }
  662. } else {
  663. gpuState.set(false, nil)
  664. useGPU = false
  665. }
  666. } else if gpuState != nil {
  667. gpuState.set(false, nil)
  668. }
  669. }
  670. dcBlocker.Reset()
  671. ticker.Reset(cfg.FrameInterval())
  672. case <-ticker.C:
  673. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  674. if err != nil {
  675. log.Printf("read IQ: %v", err)
  676. if strings.Contains(err.Error(), "timeout") {
  677. if err := srcMgr.Restart(cfg); err != nil {
  678. log.Printf("restart failed: %v", err)
  679. }
  680. }
  681. continue
  682. }
  683. if rec != nil {
  684. rec.Ingest(time.Now(), iq)
  685. }
  686. if !gotSamples {
  687. log.Printf("received IQ samples")
  688. gotSamples = true
  689. }
  690. if dcEnabled {
  691. dcBlocker.Apply(iq)
  692. }
  693. if iqEnabled {
  694. dsp.IQBalance(iq)
  695. }
  696. var spectrum []float64
  697. if useGPU && gpuEngine != nil {
  698. if len(window) == len(iq) {
  699. for i := 0; i < len(iq); i++ {
  700. v := iq[i]
  701. w := float32(window[i])
  702. iq[i] = complex(real(v)*w, imag(v)*w)
  703. }
  704. }
  705. out, err := gpuEngine.Exec(iq)
  706. if err != nil {
  707. if gpuState != nil {
  708. gpuState.set(false, err)
  709. }
  710. useGPU = false
  711. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  712. } else {
  713. spectrum = fftutil.SpectrumFromFFT(out)
  714. }
  715. } else {
  716. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  717. }
  718. now := time.Now()
  719. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  720. // enrich classification with temporal IQ features on per-signal snippet
  721. if len(iq) > 0 {
  722. for i := range signals {
  723. snip := extractSignalIQ(iq, cfg.SampleRate, cfg.CenterHz, signals[i].CenterHz, signals[i].BWHz)
  724. cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, snip)
  725. signals[i].Class = cls
  726. }
  727. det.UpdateClasses(signals)
  728. }
  729. if sigSnap != nil {
  730. sigSnap.set(signals)
  731. }
  732. eventMu.Lock()
  733. for _, ev := range finished {
  734. _ = enc.Encode(ev)
  735. }
  736. eventMu.Unlock()
  737. if rec != nil {
  738. rec.OnEvents(finished)
  739. }
  740. h.broadcast(SpectrumFrame{
  741. Timestamp: now.UnixMilli(),
  742. CenterHz: cfg.CenterHz,
  743. SampleHz: cfg.SampleRate,
  744. FFTSize: cfg.FFTSize,
  745. Spectrum: spectrum,
  746. Signals: signals,
  747. })
  748. }
  749. }
  750. }
  751. func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  752. if raw == "" {
  753. return fallback
  754. }
  755. if d, err := time.ParseDuration(raw); err == nil {
  756. return d
  757. }
  758. return fallback
  759. }
  760. func buildDecoderMap(cfg config.Config) map[string]string {
  761. out := map[string]string{}
  762. if cfg.Decoder.FT8Cmd != "" {
  763. out["FT8"] = cfg.Decoder.FT8Cmd
  764. }
  765. if cfg.Decoder.WSPRCmd != "" {
  766. out["WSPR"] = cfg.Decoder.WSPRCmd
  767. }
  768. if cfg.Decoder.DMRCmd != "" {
  769. out["DMR"] = cfg.Decoder.DMRCmd
  770. }
  771. if cfg.Decoder.DStarCmd != "" {
  772. out["D-STAR"] = cfg.Decoder.DStarCmd
  773. }
  774. if cfg.Decoder.FSKCmd != "" {
  775. out["FSK"] = cfg.Decoder.FSKCmd
  776. }
  777. if cfg.Decoder.PSKCmd != "" {
  778. out["PSK"] = cfg.Decoder.PSKCmd
  779. }
  780. return out
  781. }
  782. func decoderKeys(cfg config.Config) []string {
  783. m := buildDecoderMap(cfg)
  784. keys := make([]string, 0, len(m))
  785. for k := range m {
  786. keys = append(keys, k)
  787. }
  788. sort.Strings(keys)
  789. return keys
  790. }
  791. func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
  792. if len(iq) == 0 || sampleRate <= 0 {
  793. return nil
  794. }
  795. offset := sigHz - centerHz
  796. shifted := dsp.FreqShift(iq, sampleRate, offset)
  797. cutoff := bwHz / 2
  798. if cutoff < 200 {
  799. cutoff = 200
  800. }
  801. if cutoff > float64(sampleRate)/2-1 {
  802. cutoff = float64(sampleRate)/2 - 1
  803. }
  804. taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
  805. filtered := dsp.ApplyFIR(shifted, taps)
  806. decim := sampleRate / 200000
  807. if decim < 1 {
  808. decim = 1
  809. }
  810. return dsp.Decimate(filtered, decim)
  811. }
  812. func parseSince(raw string) (time.Time, error) {
  813. if raw == "" {
  814. return time.Time{}, nil
  815. }
  816. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  817. if ms > 1e12 {
  818. return time.UnixMilli(ms), nil
  819. }
  820. return time.Unix(ms, 0), nil
  821. }
  822. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  823. return t, nil
  824. }
  825. return time.Parse(time.RFC3339, raw)
  826. }