You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

609 lines
15KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/gorilla/websocket"
  17. "sdr-visual-suite/internal/config"
  18. "sdr-visual-suite/internal/detector"
  19. "sdr-visual-suite/internal/dsp"
  20. "sdr-visual-suite/internal/events"
  21. fftutil "sdr-visual-suite/internal/fft"
  22. "sdr-visual-suite/internal/fft/gpufft"
  23. "sdr-visual-suite/internal/mock"
  24. "sdr-visual-suite/internal/runtime"
  25. "sdr-visual-suite/internal/sdr"
  26. "sdr-visual-suite/internal/sdrplay"
  27. )
  28. type SpectrumFrame struct {
  29. Timestamp int64 `json:"ts"`
  30. CenterHz float64 `json:"center_hz"`
  31. SampleHz int `json:"sample_rate"`
  32. FFTSize int `json:"fft_size"`
  33. Spectrum []float64 `json:"spectrum_db"`
  34. Signals []detector.Signal `json:"signals"`
  35. }
  36. type hub struct {
  37. mu sync.Mutex
  38. clients map[*websocket.Conn]struct{}
  39. }
  40. type gpuStatus struct {
  41. mu sync.RWMutex
  42. Available bool `json:"available"`
  43. Active bool `json:"active"`
  44. Error string `json:"error"`
  45. }
  46. func (g *gpuStatus) set(active bool, err error) {
  47. g.mu.Lock()
  48. defer g.mu.Unlock()
  49. g.Active = active
  50. if err != nil {
  51. g.Error = err.Error()
  52. } else {
  53. g.Error = ""
  54. }
  55. }
  56. func (g *gpuStatus) snapshot() gpuStatus {
  57. g.mu.RLock()
  58. defer g.mu.RUnlock()
  59. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  60. }
  61. func newHub() *hub {
  62. return &hub{clients: map[*websocket.Conn]struct{}{}}
  63. }
  64. func (h *hub) add(c *websocket.Conn) {
  65. h.mu.Lock()
  66. defer h.mu.Unlock()
  67. h.clients[c] = struct{}{}
  68. }
  69. func (h *hub) remove(c *websocket.Conn) {
  70. h.mu.Lock()
  71. defer h.mu.Unlock()
  72. delete(h.clients, c)
  73. }
  74. func (h *hub) broadcast(frame SpectrumFrame) {
  75. h.mu.Lock()
  76. clients := make([]*websocket.Conn, 0, len(h.clients))
  77. for c := range h.clients {
  78. clients = append(clients, c)
  79. }
  80. h.mu.Unlock()
  81. b, _ := json.Marshal(frame)
  82. for _, c := range clients {
  83. _ = c.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  84. if err := c.WriteMessage(websocket.TextMessage, b); err != nil {
  85. h.remove(c)
  86. }
  87. }
  88. }
  89. type sourceManager struct {
  90. mu sync.RWMutex
  91. src sdr.Source
  92. newSource func(cfg config.Config) (sdr.Source, error)
  93. }
  94. func (m *sourceManager) Restart(cfg config.Config) error {
  95. m.mu.Lock()
  96. defer m.mu.Unlock()
  97. old := m.src
  98. _ = old.Stop()
  99. next, err := m.newSource(cfg)
  100. if err != nil {
  101. _ = old.Start()
  102. m.src = old
  103. return err
  104. }
  105. if err := next.Start(); err != nil {
  106. _ = next.Stop()
  107. _ = old.Start()
  108. m.src = old
  109. return err
  110. }
  111. m.src = next
  112. return nil
  113. }
  114. func (m *sourceManager) Stats() sdr.SourceStats {
  115. m.mu.RLock()
  116. defer m.mu.RUnlock()
  117. if sp, ok := m.src.(sdr.StatsProvider); ok {
  118. return sp.Stats()
  119. }
  120. return sdr.SourceStats{}
  121. }
  122. func (m *sourceManager) Flush() {
  123. m.mu.RLock()
  124. defer m.mu.RUnlock()
  125. if fl, ok := m.src.(sdr.Flushable); ok {
  126. fl.Flush()
  127. }
  128. }
  129. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  130. return &sourceManager{src: src, newSource: newSource}
  131. }
  132. func (m *sourceManager) Start() error {
  133. m.mu.RLock()
  134. defer m.mu.RUnlock()
  135. return m.src.Start()
  136. }
  137. func (m *sourceManager) Stop() error {
  138. m.mu.RLock()
  139. defer m.mu.RUnlock()
  140. return m.src.Stop()
  141. }
  142. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  143. m.mu.RLock()
  144. defer m.mu.RUnlock()
  145. return m.src.ReadIQ(n)
  146. }
  147. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  148. m.mu.Lock()
  149. defer m.mu.Unlock()
  150. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  151. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  152. return nil
  153. }
  154. }
  155. old := m.src
  156. _ = old.Stop()
  157. next, err := m.newSource(cfg)
  158. if err != nil {
  159. _ = old.Start()
  160. return err
  161. }
  162. if err := next.Start(); err != nil {
  163. _ = next.Stop()
  164. _ = old.Start()
  165. return err
  166. }
  167. m.src = next
  168. return nil
  169. }
  170. type dspUpdate struct {
  171. cfg config.Config
  172. det *detector.Detector
  173. window []float64
  174. dcBlock bool
  175. iqBalance bool
  176. useGPUFFT bool
  177. }
  178. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  179. select {
  180. case ch <- update:
  181. default:
  182. select {
  183. case <-ch:
  184. default:
  185. }
  186. ch <- update
  187. }
  188. }
  189. func main() {
  190. var cfgPath string
  191. var mockFlag bool
  192. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  193. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  194. flag.Parse()
  195. cfg, err := config.Load(cfgPath)
  196. if err != nil {
  197. log.Fatalf("load config: %v", err)
  198. }
  199. cfgManager := runtime.New(cfg)
  200. gpuState := &gpuStatus{Available: gpufft.Available()}
  201. newSource := func(cfg config.Config) (sdr.Source, error) {
  202. if mockFlag {
  203. src := mock.New(cfg.SampleRate)
  204. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  205. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  206. }
  207. return src, nil
  208. }
  209. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  210. if err != nil {
  211. return nil, err
  212. }
  213. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  214. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  215. }
  216. return src, nil
  217. }
  218. src, err := newSource(cfg)
  219. if err != nil {
  220. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  221. }
  222. srcMgr := newSourceManager(src, newSource)
  223. if err := srcMgr.Start(); err != nil {
  224. log.Fatalf("source start: %v", err)
  225. }
  226. defer srcMgr.Stop()
  227. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  228. log.Fatalf("event path: %v", err)
  229. }
  230. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  231. if err != nil {
  232. log.Fatalf("open events: %v", err)
  233. }
  234. defer eventFile.Close()
  235. eventMu := &sync.Mutex{}
  236. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  237. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  238. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  239. window := fftutil.Hann(cfg.FFTSize)
  240. h := newHub()
  241. dspUpdates := make(chan dspUpdate, 1)
  242. ctx, cancel := context.WithCancel(context.Background())
  243. defer cancel()
  244. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState)
  245. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  246. origin := r.Header.Get("Origin")
  247. return origin == "" || strings.HasPrefix(origin, "http://localhost") || strings.HasPrefix(origin, "http://127.0.0.1")
  248. }}
  249. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  250. c, err := upgrader.Upgrade(w, r, nil)
  251. if err != nil {
  252. return
  253. }
  254. h.add(c)
  255. defer func() {
  256. h.remove(c)
  257. _ = c.Close()
  258. }()
  259. c.SetReadDeadline(time.Now().Add(60 * time.Second))
  260. c.SetPongHandler(func(string) error {
  261. c.SetReadDeadline(time.Now().Add(60 * time.Second))
  262. return nil
  263. })
  264. go func() {
  265. ticker := time.NewTicker(30 * time.Second)
  266. defer ticker.Stop()
  267. for range ticker.C {
  268. _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second))
  269. if err := c.WriteMessage(websocket.PingMessage, nil); err != nil {
  270. return
  271. }
  272. }
  273. }()
  274. for {
  275. _, _, err := c.ReadMessage()
  276. if err != nil {
  277. return
  278. }
  279. }
  280. })
  281. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  282. w.Header().Set("Content-Type", "application/json")
  283. switch r.Method {
  284. case http.MethodGet:
  285. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  286. case http.MethodPost:
  287. var update runtime.ConfigUpdate
  288. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  289. http.Error(w, "invalid json", http.StatusBadRequest)
  290. return
  291. }
  292. prev := cfgManager.Snapshot()
  293. next, err := cfgManager.ApplyConfig(update)
  294. if err != nil {
  295. http.Error(w, err.Error(), http.StatusBadRequest)
  296. return
  297. }
  298. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  299. if sourceChanged {
  300. if err := srcMgr.ApplyConfig(next); err != nil {
  301. cfgManager.Replace(prev)
  302. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  303. return
  304. }
  305. }
  306. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  307. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  308. prev.Detector.HoldMs != next.Detector.HoldMs ||
  309. prev.SampleRate != next.SampleRate ||
  310. prev.FFTSize != next.FFTSize
  311. windowChanged := prev.FFTSize != next.FFTSize
  312. var newDet *detector.Detector
  313. var newWindow []float64
  314. if detChanged {
  315. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  316. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  317. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  318. }
  319. if windowChanged {
  320. newWindow = fftutil.Hann(next.FFTSize)
  321. }
  322. pushDSPUpdate(dspUpdates, dspUpdate{
  323. cfg: next,
  324. det: newDet,
  325. window: newWindow,
  326. dcBlock: next.DCBlock,
  327. iqBalance: next.IQBalance,
  328. useGPUFFT: next.UseGPUFFT,
  329. })
  330. _ = json.NewEncoder(w).Encode(next)
  331. default:
  332. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  333. }
  334. })
  335. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  336. w.Header().Set("Content-Type", "application/json")
  337. if r.Method != http.MethodPost {
  338. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  339. return
  340. }
  341. var update runtime.SettingsUpdate
  342. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  343. http.Error(w, "invalid json", http.StatusBadRequest)
  344. return
  345. }
  346. prev := cfgManager.Snapshot()
  347. next, err := cfgManager.ApplySettings(update)
  348. if err != nil {
  349. http.Error(w, err.Error(), http.StatusBadRequest)
  350. return
  351. }
  352. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  353. if err := srcMgr.ApplyConfig(next); err != nil {
  354. cfgManager.Replace(prev)
  355. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  356. return
  357. }
  358. }
  359. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  360. pushDSPUpdate(dspUpdates, dspUpdate{
  361. cfg: next,
  362. dcBlock: next.DCBlock,
  363. iqBalance: next.IQBalance,
  364. })
  365. }
  366. _ = json.NewEncoder(w).Encode(next)
  367. })
  368. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  369. w.Header().Set("Content-Type", "application/json")
  370. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  371. })
  372. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  373. w.Header().Set("Content-Type", "application/json")
  374. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  375. })
  376. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  377. w.Header().Set("Content-Type", "application/json")
  378. limit := 200
  379. if v := r.URL.Query().Get("limit"); v != "" {
  380. if parsed, err := strconv.Atoi(v); err == nil {
  381. limit = parsed
  382. }
  383. }
  384. var since time.Time
  385. if v := r.URL.Query().Get("since"); v != "" {
  386. if parsed, err := parseSince(v); err == nil {
  387. since = parsed
  388. } else {
  389. http.Error(w, "invalid since", http.StatusBadRequest)
  390. return
  391. }
  392. }
  393. snap := cfgManager.Snapshot()
  394. eventMu.Lock()
  395. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  396. eventMu.Unlock()
  397. if err != nil {
  398. http.Error(w, "failed to read events", http.StatusInternalServerError)
  399. return
  400. }
  401. _ = json.NewEncoder(w).Encode(evs)
  402. })
  403. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  404. server := &http.Server{Addr: cfg.WebAddr}
  405. go func() {
  406. log.Printf("web listening on %s", cfg.WebAddr)
  407. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  408. log.Fatalf("server: %v", err)
  409. }
  410. }()
  411. stop := make(chan os.Signal, 1)
  412. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  413. <-stop
  414. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  415. defer cancelTimeout()
  416. _ = server.Shutdown(ctxTimeout)
  417. }
  418. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.Mutex, updates <-chan dspUpdate, gpuState *gpuStatus) {
  419. ticker := time.NewTicker(cfg.FrameInterval())
  420. defer ticker.Stop()
  421. logTicker := time.NewTicker(5 * time.Second)
  422. defer logTicker.Stop()
  423. enc := json.NewEncoder(eventFile)
  424. dcBlocker := dsp.NewDCBlocker(0.995)
  425. dcEnabled := cfg.DCBlock
  426. iqEnabled := cfg.IQBalance
  427. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  428. useGPU := cfg.UseGPUFFT
  429. var gpuEngine *gpufft.Engine
  430. if useGPU && gpuState != nil && gpuState.Available {
  431. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  432. gpuEngine = eng
  433. gpuState.set(true, nil)
  434. } else {
  435. gpuState.set(false, err)
  436. useGPU = false
  437. }
  438. } else if gpuState != nil {
  439. gpuState.set(false, nil)
  440. }
  441. gotSamples := false
  442. for {
  443. select {
  444. case <-ctx.Done():
  445. return
  446. case <-logTicker.C:
  447. st := srcMgr.Stats()
  448. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  449. case upd := <-updates:
  450. prevFFT := cfg.FFTSize
  451. prevUseGPU := useGPU
  452. cfg = upd.cfg
  453. if upd.det != nil {
  454. det = upd.det
  455. }
  456. if upd.window != nil {
  457. window = upd.window
  458. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  459. }
  460. dcEnabled = upd.dcBlock
  461. iqEnabled = upd.iqBalance
  462. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  463. srcMgr.Flush()
  464. gotSamples = false
  465. if gpuEngine != nil {
  466. gpuEngine.Close()
  467. gpuEngine = nil
  468. }
  469. useGPU = cfg.UseGPUFFT
  470. if useGPU && gpuState != nil && gpuState.Available {
  471. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  472. gpuEngine = eng
  473. gpuState.set(true, nil)
  474. } else {
  475. gpuState.set(false, err)
  476. useGPU = false
  477. }
  478. } else if gpuState != nil {
  479. gpuState.set(false, nil)
  480. }
  481. }
  482. dcBlocker.Reset()
  483. ticker.Reset(cfg.FrameInterval())
  484. case <-ticker.C:
  485. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  486. if err != nil {
  487. log.Printf("read IQ: %v", err)
  488. if strings.Contains(err.Error(), "timeout") {
  489. if err := srcMgr.Restart(cfg); err != nil {
  490. log.Printf("restart failed: %v", err)
  491. }
  492. }
  493. continue
  494. }
  495. if !gotSamples {
  496. log.Printf("received IQ samples")
  497. gotSamples = true
  498. }
  499. if dcEnabled {
  500. dcBlocker.Apply(iq)
  501. }
  502. if iqEnabled {
  503. dsp.IQBalance(iq)
  504. }
  505. var spectrum []float64
  506. if useGPU && gpuEngine != nil {
  507. if len(window) == len(iq) {
  508. for i := 0; i < len(iq); i++ {
  509. v := iq[i]
  510. w := float32(window[i])
  511. iq[i] = complex(real(v)*w, imag(v)*w)
  512. }
  513. }
  514. out, err := gpuEngine.Exec(iq)
  515. if err != nil {
  516. if gpuState != nil {
  517. gpuState.set(false, err)
  518. }
  519. useGPU = false
  520. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  521. } else {
  522. spectrum = fftutil.SpectrumFromFFT(out)
  523. }
  524. } else {
  525. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  526. }
  527. now := time.Now()
  528. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  529. eventMu.Lock()
  530. for _, ev := range finished {
  531. _ = enc.Encode(ev)
  532. }
  533. eventMu.Unlock()
  534. h.broadcast(SpectrumFrame{
  535. Timestamp: now.UnixMilli(),
  536. CenterHz: cfg.CenterHz,
  537. SampleHz: cfg.SampleRate,
  538. FFTSize: cfg.FFTSize,
  539. Spectrum: spectrum,
  540. Signals: signals,
  541. })
  542. }
  543. }
  544. }
  545. func parseSince(raw string) (time.Time, error) {
  546. if raw == "" {
  547. return time.Time{}, nil
  548. }
  549. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  550. if ms > 1e12 {
  551. return time.UnixMilli(ms), nil
  552. }
  553. return time.Unix(ms, 0), nil
  554. }
  555. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  556. return t, nil
  557. }
  558. return time.Parse(time.RFC3339, raw)
  559. }