Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

903 lines
24KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "sort"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "syscall"
  16. "time"
  17. "github.com/gorilla/websocket"
  18. "sdr-visual-suite/internal/classifier"
  19. "sdr-visual-suite/internal/config"
  20. "sdr-visual-suite/internal/detector"
  21. "sdr-visual-suite/internal/dsp"
  22. "sdr-visual-suite/internal/events"
  23. fftutil "sdr-visual-suite/internal/fft"
  24. "sdr-visual-suite/internal/fft/gpufft"
  25. "sdr-visual-suite/internal/mock"
  26. "sdr-visual-suite/internal/recorder"
  27. "sdr-visual-suite/internal/runtime"
  28. "sdr-visual-suite/internal/sdr"
  29. "sdr-visual-suite/internal/sdrplay"
  30. )
  31. type SpectrumFrame struct {
  32. Timestamp int64 `json:"ts"`
  33. CenterHz float64 `json:"center_hz"`
  34. SampleHz int `json:"sample_rate"`
  35. FFTSize int `json:"fft_size"`
  36. Spectrum []float64 `json:"spectrum_db"`
  37. Signals []detector.Signal `json:"signals"`
  38. }
  39. type client struct {
  40. conn *websocket.Conn
  41. send chan []byte
  42. done chan struct{}
  43. closeOnce sync.Once
  44. }
  45. type hub struct {
  46. mu sync.Mutex
  47. clients map[*client]struct{}
  48. frameCnt int64
  49. lastLogTs time.Time
  50. }
  51. type gpuStatus struct {
  52. mu sync.RWMutex
  53. Available bool `json:"available"`
  54. Active bool `json:"active"`
  55. Error string `json:"error"`
  56. }
  57. type signalSnapshot struct {
  58. mu sync.RWMutex
  59. signals []detector.Signal
  60. }
  61. func (s *signalSnapshot) set(sig []detector.Signal) {
  62. s.mu.Lock()
  63. defer s.mu.Unlock()
  64. s.signals = append([]detector.Signal(nil), sig...)
  65. }
  66. func (s *signalSnapshot) get() []detector.Signal {
  67. s.mu.RLock()
  68. defer s.mu.RUnlock()
  69. return append([]detector.Signal(nil), s.signals...)
  70. }
  71. func (g *gpuStatus) set(active bool, err error) {
  72. g.mu.Lock()
  73. defer g.mu.Unlock()
  74. g.Active = active
  75. if err != nil {
  76. g.Error = err.Error()
  77. } else {
  78. g.Error = ""
  79. }
  80. }
  81. func (g *gpuStatus) snapshot() gpuStatus {
  82. g.mu.RLock()
  83. defer g.mu.RUnlock()
  84. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  85. }
  86. func newHub() *hub {
  87. return &hub{clients: map[*client]struct{}{}, lastLogTs: time.Now()}
  88. }
  89. func (h *hub) add(c *client) {
  90. h.mu.Lock()
  91. defer h.mu.Unlock()
  92. h.clients[c] = struct{}{}
  93. log.Printf("ws connected (%d clients)", len(h.clients))
  94. }
  95. func (h *hub) remove(c *client) {
  96. c.closeOnce.Do(func() { close(c.done) })
  97. h.mu.Lock()
  98. defer h.mu.Unlock()
  99. delete(h.clients, c)
  100. log.Printf("ws disconnected (%d clients)", len(h.clients))
  101. }
  102. func (h *hub) broadcast(frame SpectrumFrame) {
  103. b, err := json.Marshal(frame)
  104. if err != nil {
  105. log.Printf("marshal frame: %v", err)
  106. return
  107. }
  108. h.mu.Lock()
  109. clients := make([]*client, 0, len(h.clients))
  110. for c := range h.clients {
  111. clients = append(clients, c)
  112. }
  113. h.mu.Unlock()
  114. for _, c := range clients {
  115. select {
  116. case c.send <- b:
  117. default:
  118. h.remove(c)
  119. }
  120. }
  121. h.frameCnt++
  122. if time.Since(h.lastLogTs) > 2*time.Second {
  123. h.lastLogTs = time.Now()
  124. log.Printf("broadcast frames=%d clients=%d", h.frameCnt, len(clients))
  125. }
  126. }
  127. type sourceManager struct {
  128. mu sync.RWMutex
  129. src sdr.Source
  130. newSource func(cfg config.Config) (sdr.Source, error)
  131. }
  132. func (m *sourceManager) Restart(cfg config.Config) error {
  133. m.mu.Lock()
  134. defer m.mu.Unlock()
  135. old := m.src
  136. _ = old.Stop()
  137. next, err := m.newSource(cfg)
  138. if err != nil {
  139. _ = old.Start()
  140. m.src = old
  141. return err
  142. }
  143. if err := next.Start(); err != nil {
  144. _ = next.Stop()
  145. _ = old.Start()
  146. m.src = old
  147. return err
  148. }
  149. m.src = next
  150. return nil
  151. }
  152. func (m *sourceManager) Stats() sdr.SourceStats {
  153. m.mu.RLock()
  154. defer m.mu.RUnlock()
  155. if sp, ok := m.src.(sdr.StatsProvider); ok {
  156. return sp.Stats()
  157. }
  158. return sdr.SourceStats{}
  159. }
  160. func (m *sourceManager) Flush() {
  161. m.mu.RLock()
  162. defer m.mu.RUnlock()
  163. if fl, ok := m.src.(sdr.Flushable); ok {
  164. fl.Flush()
  165. }
  166. }
  167. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  168. return &sourceManager{src: src, newSource: newSource}
  169. }
  170. func (m *sourceManager) Start() error {
  171. m.mu.RLock()
  172. defer m.mu.RUnlock()
  173. return m.src.Start()
  174. }
  175. func (m *sourceManager) Stop() error {
  176. m.mu.RLock()
  177. defer m.mu.RUnlock()
  178. return m.src.Stop()
  179. }
  180. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  181. m.mu.RLock()
  182. defer m.mu.RUnlock()
  183. return m.src.ReadIQ(n)
  184. }
  185. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  186. m.mu.Lock()
  187. defer m.mu.Unlock()
  188. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  189. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  190. return nil
  191. }
  192. }
  193. old := m.src
  194. _ = old.Stop()
  195. next, err := m.newSource(cfg)
  196. if err != nil {
  197. _ = old.Start()
  198. return err
  199. }
  200. if err := next.Start(); err != nil {
  201. _ = next.Stop()
  202. _ = old.Start()
  203. return err
  204. }
  205. m.src = next
  206. return nil
  207. }
  208. type dspUpdate struct {
  209. cfg config.Config
  210. det *detector.Detector
  211. window []float64
  212. dcBlock bool
  213. iqBalance bool
  214. useGPUFFT bool
  215. }
  216. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  217. select {
  218. case ch <- update:
  219. default:
  220. select {
  221. case <-ch:
  222. default:
  223. }
  224. ch <- update
  225. }
  226. }
  227. func main() {
  228. var cfgPath string
  229. var mockFlag bool
  230. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  231. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  232. flag.Parse()
  233. cfg, err := config.Load(cfgPath)
  234. if err != nil {
  235. log.Fatalf("load config: %v", err)
  236. }
  237. cfgManager := runtime.New(cfg)
  238. gpuState := &gpuStatus{Available: gpufft.Available()}
  239. newSource := func(cfg config.Config) (sdr.Source, error) {
  240. if mockFlag {
  241. src := mock.New(cfg.SampleRate)
  242. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  243. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  244. }
  245. return src, nil
  246. }
  247. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  248. if err != nil {
  249. return nil, err
  250. }
  251. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  252. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  253. }
  254. return src, nil
  255. }
  256. src, err := newSource(cfg)
  257. if err != nil {
  258. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  259. }
  260. srcMgr := newSourceManager(src, newSource)
  261. if err := srcMgr.Start(); err != nil {
  262. log.Fatalf("source start: %v", err)
  263. }
  264. defer srcMgr.Stop()
  265. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  266. log.Fatalf("event path: %v", err)
  267. }
  268. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  269. if err != nil {
  270. log.Fatalf("open events: %v", err)
  271. }
  272. defer eventFile.Close()
  273. eventMu := &sync.RWMutex{}
  274. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  275. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  276. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  277. window := fftutil.Hann(cfg.FFTSize)
  278. h := newHub()
  279. dspUpdates := make(chan dspUpdate, 1)
  280. ctx, cancel := context.WithCancel(context.Background())
  281. defer cancel()
  282. decodeMap := buildDecoderMap(cfg)
  283. recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  284. Enabled: cfg.Recorder.Enabled,
  285. MinSNRDb: cfg.Recorder.MinSNRDb,
  286. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  287. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  288. PrerollMs: cfg.Recorder.PrerollMs,
  289. RecordIQ: cfg.Recorder.RecordIQ,
  290. RecordAudio: cfg.Recorder.RecordAudio,
  291. AutoDemod: cfg.Recorder.AutoDemod,
  292. AutoDecode: cfg.Recorder.AutoDecode,
  293. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  294. OutputDir: cfg.Recorder.OutputDir,
  295. ClassFilter: cfg.Recorder.ClassFilter,
  296. RingSeconds: cfg.Recorder.RingSeconds,
  297. }, cfg.CenterHz, decodeMap)
  298. sigSnap := &signalSnapshot{}
  299. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr, sigSnap)
  300. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  301. origin := r.Header.Get("Origin")
  302. if origin == "" || origin == "null" {
  303. return true
  304. }
  305. // allow same-host or any local IP
  306. return true
  307. }}
  308. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  309. conn, err := upgrader.Upgrade(w, r, nil)
  310. if err != nil {
  311. return
  312. }
  313. c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})}
  314. h.add(c)
  315. defer func() {
  316. h.remove(c)
  317. _ = conn.Close()
  318. }()
  319. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  320. conn.SetPongHandler(func(string) error {
  321. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  322. return nil
  323. })
  324. go func() {
  325. ping := time.NewTicker(30 * time.Second)
  326. defer ping.Stop()
  327. for {
  328. select {
  329. case msg, ok := <-c.send:
  330. if !ok {
  331. return
  332. }
  333. _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  334. if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
  335. return
  336. }
  337. case <-ping.C:
  338. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  339. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  340. log.Printf("ws ping error: %v", err)
  341. return
  342. }
  343. }
  344. }
  345. }()
  346. for {
  347. _, _, err := conn.ReadMessage()
  348. if err != nil {
  349. return
  350. }
  351. }
  352. })
  353. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  354. w.Header().Set("Content-Type", "application/json")
  355. switch r.Method {
  356. case http.MethodGet:
  357. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  358. case http.MethodPost:
  359. var update runtime.ConfigUpdate
  360. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  361. http.Error(w, "invalid json", http.StatusBadRequest)
  362. return
  363. }
  364. prev := cfgManager.Snapshot()
  365. next, err := cfgManager.ApplyConfig(update)
  366. if err != nil {
  367. http.Error(w, err.Error(), http.StatusBadRequest)
  368. return
  369. }
  370. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  371. if sourceChanged {
  372. if err := srcMgr.ApplyConfig(next); err != nil {
  373. cfgManager.Replace(prev)
  374. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  375. return
  376. }
  377. }
  378. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  379. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  380. prev.Detector.HoldMs != next.Detector.HoldMs ||
  381. prev.SampleRate != next.SampleRate ||
  382. prev.FFTSize != next.FFTSize
  383. windowChanged := prev.FFTSize != next.FFTSize
  384. var newDet *detector.Detector
  385. var newWindow []float64
  386. if detChanged {
  387. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  388. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  389. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  390. }
  391. if windowChanged {
  392. newWindow = fftutil.Hann(next.FFTSize)
  393. }
  394. pushDSPUpdate(dspUpdates, dspUpdate{
  395. cfg: next,
  396. det: newDet,
  397. window: newWindow,
  398. dcBlock: next.DCBlock,
  399. iqBalance: next.IQBalance,
  400. useGPUFFT: next.UseGPUFFT,
  401. })
  402. _ = json.NewEncoder(w).Encode(next)
  403. default:
  404. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  405. }
  406. })
  407. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  408. w.Header().Set("Content-Type", "application/json")
  409. if r.Method != http.MethodPost {
  410. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  411. return
  412. }
  413. var update runtime.SettingsUpdate
  414. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  415. http.Error(w, "invalid json", http.StatusBadRequest)
  416. return
  417. }
  418. prev := cfgManager.Snapshot()
  419. next, err := cfgManager.ApplySettings(update)
  420. if err != nil {
  421. http.Error(w, err.Error(), http.StatusBadRequest)
  422. return
  423. }
  424. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  425. if err := srcMgr.ApplyConfig(next); err != nil {
  426. cfgManager.Replace(prev)
  427. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  428. return
  429. }
  430. }
  431. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  432. pushDSPUpdate(dspUpdates, dspUpdate{
  433. cfg: next,
  434. dcBlock: next.DCBlock,
  435. iqBalance: next.IQBalance,
  436. })
  437. }
  438. _ = json.NewEncoder(w).Encode(next)
  439. })
  440. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  441. w.Header().Set("Content-Type", "application/json")
  442. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  443. })
  444. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  445. w.Header().Set("Content-Type", "application/json")
  446. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  447. })
  448. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  449. w.Header().Set("Content-Type", "application/json")
  450. limit := 200
  451. if v := r.URL.Query().Get("limit"); v != "" {
  452. if parsed, err := strconv.Atoi(v); err == nil {
  453. limit = parsed
  454. }
  455. }
  456. var since time.Time
  457. if v := r.URL.Query().Get("since"); v != "" {
  458. if parsed, err := parseSince(v); err == nil {
  459. since = parsed
  460. } else {
  461. http.Error(w, "invalid since", http.StatusBadRequest)
  462. return
  463. }
  464. }
  465. snap := cfgManager.Snapshot()
  466. eventMu.RLock()
  467. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  468. eventMu.RUnlock()
  469. if err != nil {
  470. http.Error(w, "failed to read events", http.StatusInternalServerError)
  471. return
  472. }
  473. _ = json.NewEncoder(w).Encode(evs)
  474. })
  475. http.HandleFunc("/api/signals", func(w http.ResponseWriter, r *http.Request) {
  476. w.Header().Set("Content-Type", "application/json")
  477. if sigSnap == nil {
  478. _ = json.NewEncoder(w).Encode([]detector.Signal{})
  479. return
  480. }
  481. _ = json.NewEncoder(w).Encode(sigSnap.get())
  482. })
  483. http.HandleFunc("/api/decoders", func(w http.ResponseWriter, r *http.Request) {
  484. w.Header().Set("Content-Type", "application/json")
  485. _ = json.NewEncoder(w).Encode(decoderKeys(cfgManager.Snapshot()))
  486. })
  487. http.HandleFunc("/api/recordings", func(w http.ResponseWriter, r *http.Request) {
  488. if r.Method != http.MethodGet {
  489. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  490. return
  491. }
  492. w.Header().Set("Content-Type", "application/json")
  493. snap := cfgManager.Snapshot()
  494. list, err := recorder.ListRecordings(snap.Recorder.OutputDir)
  495. if err != nil {
  496. http.Error(w, "failed to list recordings", http.StatusInternalServerError)
  497. return
  498. }
  499. _ = json.NewEncoder(w).Encode(list)
  500. })
  501. http.HandleFunc("/api/recordings/", func(w http.ResponseWriter, r *http.Request) {
  502. w.Header().Set("Content-Type", "application/json")
  503. id := strings.TrimPrefix(r.URL.Path, "/api/recordings/")
  504. if id == "" {
  505. http.Error(w, "missing id", http.StatusBadRequest)
  506. return
  507. }
  508. snap := cfgManager.Snapshot()
  509. base := filepath.Clean(filepath.Join(snap.Recorder.OutputDir, id))
  510. if !strings.HasPrefix(base, filepath.Clean(snap.Recorder.OutputDir)) {
  511. http.Error(w, "invalid path", http.StatusBadRequest)
  512. return
  513. }
  514. if r.URL.Path == "/api/recordings/"+id+"/audio" {
  515. http.ServeFile(w, r, filepath.Join(base, "audio.wav"))
  516. return
  517. }
  518. if r.URL.Path == "/api/recordings/"+id+"/iq" {
  519. http.ServeFile(w, r, filepath.Join(base, "signal.cf32"))
  520. return
  521. }
  522. if r.URL.Path == "/api/recordings/"+id+"/decode" {
  523. mode := r.URL.Query().Get("mode")
  524. cmd := buildDecoderMap(cfgManager.Snapshot())[mode]
  525. if cmd == "" {
  526. http.Error(w, "decoder not configured", http.StatusBadRequest)
  527. return
  528. }
  529. meta, err := recorder.ReadMeta(filepath.Join(base, "meta.json"))
  530. if err != nil {
  531. http.Error(w, "meta read failed", http.StatusInternalServerError)
  532. return
  533. }
  534. audioPath := filepath.Join(base, "audio.wav")
  535. if _, errStat := os.Stat(audioPath); errStat != nil {
  536. audioPath = ""
  537. }
  538. res, err := recorder.DecodeOnDemand(cmd, filepath.Join(base, "signal.cf32"), meta.SampleRate, audioPath)
  539. if err != nil {
  540. http.Error(w, res.Stderr, http.StatusInternalServerError)
  541. return
  542. }
  543. _ = json.NewEncoder(w).Encode(res)
  544. return
  545. }
  546. // default: meta.json
  547. http.ServeFile(w, r, filepath.Join(base, "meta.json"))
  548. })
  549. http.HandleFunc("/api/demod", func(w http.ResponseWriter, r *http.Request) {
  550. if r.Method != http.MethodGet {
  551. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  552. return
  553. }
  554. q := r.URL.Query()
  555. freq, _ := strconv.ParseFloat(q.Get("freq"), 64)
  556. bw, _ := strconv.ParseFloat(q.Get("bw"), 64)
  557. sec, _ := strconv.Atoi(q.Get("sec"))
  558. if sec < 1 {
  559. sec = 1
  560. }
  561. if sec > 10 {
  562. sec = 10
  563. }
  564. mode := q.Get("mode")
  565. data, _, err := recMgr.DemodLive(freq, bw, mode, sec)
  566. if err != nil {
  567. http.Error(w, err.Error(), http.StatusBadRequest)
  568. return
  569. }
  570. w.Header().Set("Content-Type", "audio/wav")
  571. _, _ = w.Write(data)
  572. })
  573. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  574. server := &http.Server{Addr: cfg.WebAddr}
  575. go func() {
  576. log.Printf("web listening on %s", cfg.WebAddr)
  577. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  578. log.Fatalf("server: %v", err)
  579. }
  580. }()
  581. stop := make(chan os.Signal, 1)
  582. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  583. <-stop
  584. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  585. defer cancelTimeout()
  586. _ = server.Shutdown(ctxTimeout)
  587. }
  588. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) {
  589. ticker := time.NewTicker(cfg.FrameInterval())
  590. defer ticker.Stop()
  591. logTicker := time.NewTicker(5 * time.Second)
  592. defer logTicker.Stop()
  593. enc := json.NewEncoder(eventFile)
  594. dcBlocker := dsp.NewDCBlocker(0.995)
  595. dcEnabled := cfg.DCBlock
  596. iqEnabled := cfg.IQBalance
  597. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  598. useGPU := cfg.UseGPUFFT
  599. var gpuEngine *gpufft.Engine
  600. if useGPU && gpuState != nil {
  601. snap := gpuState.snapshot()
  602. if snap.Available {
  603. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  604. gpuEngine = eng
  605. gpuState.set(true, nil)
  606. } else {
  607. gpuState.set(false, err)
  608. useGPU = false
  609. }
  610. } else {
  611. gpuState.set(false, nil)
  612. useGPU = false
  613. }
  614. } else if gpuState != nil {
  615. gpuState.set(false, nil)
  616. }
  617. gotSamples := false
  618. for {
  619. select {
  620. case <-ctx.Done():
  621. return
  622. case <-logTicker.C:
  623. st := srcMgr.Stats()
  624. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  625. case upd := <-updates:
  626. prevFFT := cfg.FFTSize
  627. prevUseGPU := useGPU
  628. cfg = upd.cfg
  629. if rec != nil {
  630. rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  631. Enabled: cfg.Recorder.Enabled,
  632. MinSNRDb: cfg.Recorder.MinSNRDb,
  633. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  634. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  635. PrerollMs: cfg.Recorder.PrerollMs,
  636. RecordIQ: cfg.Recorder.RecordIQ,
  637. RecordAudio: cfg.Recorder.RecordAudio,
  638. AutoDemod: cfg.Recorder.AutoDemod,
  639. AutoDecode: cfg.Recorder.AutoDecode,
  640. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  641. OutputDir: cfg.Recorder.OutputDir,
  642. ClassFilter: cfg.Recorder.ClassFilter,
  643. RingSeconds: cfg.Recorder.RingSeconds,
  644. }, cfg.CenterHz, buildDecoderMap(cfg))
  645. }
  646. if upd.det != nil {
  647. det = upd.det
  648. }
  649. if upd.window != nil {
  650. window = upd.window
  651. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  652. }
  653. dcEnabled = upd.dcBlock
  654. iqEnabled = upd.iqBalance
  655. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  656. srcMgr.Flush()
  657. gotSamples = false
  658. if gpuEngine != nil {
  659. gpuEngine.Close()
  660. gpuEngine = nil
  661. }
  662. useGPU = cfg.UseGPUFFT
  663. if useGPU && gpuState != nil {
  664. snap := gpuState.snapshot()
  665. if snap.Available {
  666. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  667. gpuEngine = eng
  668. gpuState.set(true, nil)
  669. } else {
  670. gpuState.set(false, err)
  671. useGPU = false
  672. }
  673. } else {
  674. gpuState.set(false, nil)
  675. useGPU = false
  676. }
  677. } else if gpuState != nil {
  678. gpuState.set(false, nil)
  679. }
  680. }
  681. dcBlocker.Reset()
  682. ticker.Reset(cfg.FrameInterval())
  683. case <-ticker.C:
  684. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  685. if err != nil {
  686. log.Printf("read IQ: %v", err)
  687. if strings.Contains(err.Error(), "timeout") {
  688. if err := srcMgr.Restart(cfg); err != nil {
  689. log.Printf("restart failed: %v", err)
  690. }
  691. }
  692. continue
  693. }
  694. if rec != nil {
  695. rec.Ingest(time.Now(), iq)
  696. }
  697. if !gotSamples {
  698. log.Printf("received IQ samples")
  699. gotSamples = true
  700. }
  701. if dcEnabled {
  702. dcBlocker.Apply(iq)
  703. }
  704. if iqEnabled {
  705. dsp.IQBalance(iq)
  706. }
  707. var spectrum []float64
  708. if useGPU && gpuEngine != nil {
  709. if len(window) == len(iq) {
  710. for i := 0; i < len(iq); i++ {
  711. v := iq[i]
  712. w := float32(window[i])
  713. iq[i] = complex(real(v)*w, imag(v)*w)
  714. }
  715. }
  716. out, err := gpuEngine.Exec(iq)
  717. if err != nil {
  718. if gpuState != nil {
  719. gpuState.set(false, err)
  720. }
  721. useGPU = false
  722. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  723. } else {
  724. spectrum = fftutil.SpectrumFromFFT(out)
  725. }
  726. } else {
  727. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  728. }
  729. now := time.Now()
  730. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  731. // enrich classification with temporal IQ features on per-signal snippet
  732. if len(iq) > 0 {
  733. for i := range signals {
  734. snip := extractSignalIQ(iq, cfg.SampleRate, cfg.CenterHz, signals[i].CenterHz, signals[i].BWHz)
  735. cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, snip)
  736. signals[i].Class = cls
  737. }
  738. det.UpdateClasses(signals)
  739. }
  740. if sigSnap != nil {
  741. sigSnap.set(signals)
  742. }
  743. eventMu.Lock()
  744. for _, ev := range finished {
  745. _ = enc.Encode(ev)
  746. }
  747. eventMu.Unlock()
  748. if rec != nil {
  749. rec.OnEvents(finished)
  750. }
  751. h.broadcast(SpectrumFrame{
  752. Timestamp: now.UnixMilli(),
  753. CenterHz: cfg.CenterHz,
  754. SampleHz: cfg.SampleRate,
  755. FFTSize: cfg.FFTSize,
  756. Spectrum: spectrum,
  757. Signals: signals,
  758. })
  759. }
  760. }
  761. }
  762. func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  763. if raw == "" {
  764. return fallback
  765. }
  766. if d, err := time.ParseDuration(raw); err == nil {
  767. return d
  768. }
  769. return fallback
  770. }
  771. func buildDecoderMap(cfg config.Config) map[string]string {
  772. out := map[string]string{}
  773. if cfg.Decoder.FT8Cmd != "" {
  774. out["FT8"] = cfg.Decoder.FT8Cmd
  775. }
  776. if cfg.Decoder.WSPRCmd != "" {
  777. out["WSPR"] = cfg.Decoder.WSPRCmd
  778. }
  779. if cfg.Decoder.DMRCmd != "" {
  780. out["DMR"] = cfg.Decoder.DMRCmd
  781. }
  782. if cfg.Decoder.DStarCmd != "" {
  783. out["D-STAR"] = cfg.Decoder.DStarCmd
  784. }
  785. if cfg.Decoder.FSKCmd != "" {
  786. out["FSK"] = cfg.Decoder.FSKCmd
  787. }
  788. if cfg.Decoder.PSKCmd != "" {
  789. out["PSK"] = cfg.Decoder.PSKCmd
  790. }
  791. return out
  792. }
  793. func decoderKeys(cfg config.Config) []string {
  794. m := buildDecoderMap(cfg)
  795. keys := make([]string, 0, len(m))
  796. for k := range m {
  797. keys = append(keys, k)
  798. }
  799. sort.Strings(keys)
  800. return keys
  801. }
  802. func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
  803. if len(iq) == 0 || sampleRate <= 0 {
  804. return nil
  805. }
  806. offset := sigHz - centerHz
  807. shifted := dsp.FreqShift(iq, sampleRate, offset)
  808. cutoff := bwHz / 2
  809. if cutoff < 200 {
  810. cutoff = 200
  811. }
  812. if cutoff > float64(sampleRate)/2-1 {
  813. cutoff = float64(sampleRate)/2 - 1
  814. }
  815. taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
  816. filtered := dsp.ApplyFIR(shifted, taps)
  817. decim := sampleRate / 200000
  818. if decim < 1 {
  819. decim = 1
  820. }
  821. return dsp.Decimate(filtered, decim)
  822. }
  823. func parseSince(raw string) (time.Time, error) {
  824. if raw == "" {
  825. return time.Time{}, nil
  826. }
  827. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  828. if ms > 1e12 {
  829. return time.UnixMilli(ms), nil
  830. }
  831. return time.Unix(ms, 0), nil
  832. }
  833. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  834. return t, nil
  835. }
  836. return time.Parse(time.RFC3339, raw)
  837. }