您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

633 行
16KB

  1. package main
  import (
  	"context"
  	"encoding/json"
  	"flag"
  	"log"
  	"net/http"
  	"net/url"
  	"os"
  	"os/signal"
  	"path/filepath"
  	"strconv"
  	"strings"
  	"sync"
  	"syscall"
  	"time"

  	"github.com/gorilla/websocket"

  	"sdr-visual-suite/internal/config"
  	"sdr-visual-suite/internal/detector"
  	"sdr-visual-suite/internal/dsp"
  	"sdr-visual-suite/internal/events"
  	fftutil "sdr-visual-suite/internal/fft"
  	"sdr-visual-suite/internal/fft/gpufft"
  	"sdr-visual-suite/internal/mock"
  	"sdr-visual-suite/internal/runtime"
  	"sdr-visual-suite/internal/sdr"
  	"sdr-visual-suite/internal/sdrplay"
  )
  28. type SpectrumFrame struct {
  29. Timestamp int64 `json:"ts"`
  30. CenterHz float64 `json:"center_hz"`
  31. SampleHz int `json:"sample_rate"`
  32. FFTSize int `json:"fft_size"`
  33. Spectrum []float64 `json:"spectrum_db"`
  34. Signals []detector.Signal `json:"signals"`
  35. }
  36. type client struct {
  37. conn *websocket.Conn
  38. send chan []byte
  39. }
  40. type hub struct {
  41. mu sync.Mutex
  42. clients map[*client]struct{}
  43. }
  // gpuStatus is the GPU FFT state reported by the /api/gpu endpoint.
  //
  // The mutex is unexported and therefore never appears in the JSON output.
  type gpuStatus struct {
  	// mu guards the exported fields below.
  	mu sync.RWMutex

  	// Available is set once at construction from gpufft.Available().
  	Available bool `json:"available"`
  	// Active reports whether the DSP loop is currently using a GPU engine.
  	Active bool `json:"active"`
  	// Error holds the text of the last GPU error, or "" when there is none.
  	Error string `json:"error"`
  }
  50. func (g *gpuStatus) set(active bool, err error) {
  51. g.mu.Lock()
  52. defer g.mu.Unlock()
  53. g.Active = active
  54. if err != nil {
  55. g.Error = err.Error()
  56. } else {
  57. g.Error = ""
  58. }
  59. }
  60. func (g *gpuStatus) snapshot() gpuStatus {
  61. g.mu.RLock()
  62. defer g.mu.RUnlock()
  63. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  64. }
  65. func newHub() *hub {
  66. return &hub{clients: map[*client]struct{}{}}
  67. }
  68. func (h *hub) add(c *client) {
  69. h.mu.Lock()
  70. defer h.mu.Unlock()
  71. h.clients[c] = struct{}{}
  72. }
  73. func (h *hub) remove(c *client) {
  74. h.mu.Lock()
  75. defer h.mu.Unlock()
  76. delete(h.clients, c)
  77. close(c.send)
  78. }
  79. func (h *hub) broadcast(frame SpectrumFrame) {
  80. b, err := json.Marshal(frame)
  81. if err != nil {
  82. log.Printf("marshal frame: %v", err)
  83. return
  84. }
  85. h.mu.Lock()
  86. clients := make([]*client, 0, len(h.clients))
  87. for c := range h.clients {
  88. clients = append(clients, c)
  89. }
  90. h.mu.Unlock()
  91. for _, c := range clients {
  92. select {
  93. case c.send <- b:
  94. default:
  95. h.remove(c)
  96. }
  97. }
  98. }
  99. type sourceManager struct {
  100. mu sync.RWMutex
  101. src sdr.Source
  102. newSource func(cfg config.Config) (sdr.Source, error)
  103. }
  104. func (m *sourceManager) Restart(cfg config.Config) error {
  105. m.mu.Lock()
  106. defer m.mu.Unlock()
  107. old := m.src
  108. _ = old.Stop()
  109. next, err := m.newSource(cfg)
  110. if err != nil {
  111. _ = old.Start()
  112. m.src = old
  113. return err
  114. }
  115. if err := next.Start(); err != nil {
  116. _ = next.Stop()
  117. _ = old.Start()
  118. m.src = old
  119. return err
  120. }
  121. m.src = next
  122. return nil
  123. }
  124. func (m *sourceManager) Stats() sdr.SourceStats {
  125. m.mu.RLock()
  126. defer m.mu.RUnlock()
  127. if sp, ok := m.src.(sdr.StatsProvider); ok {
  128. return sp.Stats()
  129. }
  130. return sdr.SourceStats{}
  131. }
  132. func (m *sourceManager) Flush() {
  133. m.mu.RLock()
  134. defer m.mu.RUnlock()
  135. if fl, ok := m.src.(sdr.Flushable); ok {
  136. fl.Flush()
  137. }
  138. }
  139. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  140. return &sourceManager{src: src, newSource: newSource}
  141. }
  142. func (m *sourceManager) Start() error {
  143. m.mu.RLock()
  144. defer m.mu.RUnlock()
  145. return m.src.Start()
  146. }
  147. func (m *sourceManager) Stop() error {
  148. m.mu.RLock()
  149. defer m.mu.RUnlock()
  150. return m.src.Stop()
  151. }
  152. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  153. m.mu.RLock()
  154. defer m.mu.RUnlock()
  155. return m.src.ReadIQ(n)
  156. }
  157. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  158. m.mu.Lock()
  159. defer m.mu.Unlock()
  160. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  161. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  162. return nil
  163. }
  164. }
  165. old := m.src
  166. _ = old.Stop()
  167. next, err := m.newSource(cfg)
  168. if err != nil {
  169. _ = old.Start()
  170. return err
  171. }
  172. if err := next.Start(); err != nil {
  173. _ = next.Stop()
  174. _ = old.Start()
  175. return err
  176. }
  177. m.src = next
  178. return nil
  179. }
  180. type dspUpdate struct {
  181. cfg config.Config
  182. det *detector.Detector
  183. window []float64
  184. dcBlock bool
  185. iqBalance bool
  186. useGPUFFT bool
  187. }
  188. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  189. select {
  190. case ch <- update:
  191. default:
  192. select {
  193. case <-ch:
  194. default:
  195. }
  196. ch <- update
  197. }
  198. }
  199. func main() {
  200. var cfgPath string
  201. var mockFlag bool
  202. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  203. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  204. flag.Parse()
  205. cfg, err := config.Load(cfgPath)
  206. if err != nil {
  207. log.Fatalf("load config: %v", err)
  208. }
  209. cfgManager := runtime.New(cfg)
  210. gpuState := &gpuStatus{Available: gpufft.Available()}
  211. newSource := func(cfg config.Config) (sdr.Source, error) {
  212. if mockFlag {
  213. src := mock.New(cfg.SampleRate)
  214. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  215. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  216. }
  217. return src, nil
  218. }
  219. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  220. if err != nil {
  221. return nil, err
  222. }
  223. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  224. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  225. }
  226. return src, nil
  227. }
  228. src, err := newSource(cfg)
  229. if err != nil {
  230. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  231. }
  232. srcMgr := newSourceManager(src, newSource)
  233. if err := srcMgr.Start(); err != nil {
  234. log.Fatalf("source start: %v", err)
  235. }
  236. defer srcMgr.Stop()
  237. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  238. log.Fatalf("event path: %v", err)
  239. }
  240. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  241. if err != nil {
  242. log.Fatalf("open events: %v", err)
  243. }
  244. defer eventFile.Close()
  245. eventMu := &sync.Mutex{}
  246. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  247. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  248. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  249. window := fftutil.Hann(cfg.FFTSize)
  250. h := newHub()
  251. dspUpdates := make(chan dspUpdate, 1)
  252. ctx, cancel := context.WithCancel(context.Background())
  253. defer cancel()
  254. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState)
  255. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  256. origin := r.Header.Get("Origin")
  257. return origin == "" || strings.HasPrefix(origin, "http://localhost") || strings.HasPrefix(origin, "http://127.0.0.1")
  258. }}
  259. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  260. conn, err := upgrader.Upgrade(w, r, nil)
  261. if err != nil {
  262. return
  263. }
  264. c := &client{conn: conn, send: make(chan []byte, 32)}
  265. h.add(c)
  266. defer func() {
  267. h.remove(c)
  268. _ = conn.Close()
  269. }()
  270. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  271. conn.SetPongHandler(func(string) error {
  272. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  273. return nil
  274. })
  275. go func() {
  276. ping := time.NewTicker(30 * time.Second)
  277. defer ping.Stop()
  278. for {
  279. select {
  280. case msg, ok := <-c.send:
  281. if !ok {
  282. return
  283. }
  284. _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  285. if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
  286. return
  287. }
  288. case <-ping.C:
  289. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  290. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  291. return
  292. }
  293. }
  294. }
  295. }()
  296. for {
  297. _, _, err := conn.ReadMessage()
  298. if err != nil {
  299. return
  300. }
  301. }
  302. })
  303. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  304. w.Header().Set("Content-Type", "application/json")
  305. switch r.Method {
  306. case http.MethodGet:
  307. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  308. case http.MethodPost:
  309. var update runtime.ConfigUpdate
  310. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  311. http.Error(w, "invalid json", http.StatusBadRequest)
  312. return
  313. }
  314. prev := cfgManager.Snapshot()
  315. next, err := cfgManager.ApplyConfig(update)
  316. if err != nil {
  317. http.Error(w, err.Error(), http.StatusBadRequest)
  318. return
  319. }
  320. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  321. if sourceChanged {
  322. if err := srcMgr.ApplyConfig(next); err != nil {
  323. cfgManager.Replace(prev)
  324. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  325. return
  326. }
  327. }
  328. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  329. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  330. prev.Detector.HoldMs != next.Detector.HoldMs ||
  331. prev.SampleRate != next.SampleRate ||
  332. prev.FFTSize != next.FFTSize
  333. windowChanged := prev.FFTSize != next.FFTSize
  334. var newDet *detector.Detector
  335. var newWindow []float64
  336. if detChanged {
  337. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  338. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  339. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  340. }
  341. if windowChanged {
  342. newWindow = fftutil.Hann(next.FFTSize)
  343. }
  344. pushDSPUpdate(dspUpdates, dspUpdate{
  345. cfg: next,
  346. det: newDet,
  347. window: newWindow,
  348. dcBlock: next.DCBlock,
  349. iqBalance: next.IQBalance,
  350. useGPUFFT: next.UseGPUFFT,
  351. })
  352. _ = json.NewEncoder(w).Encode(next)
  353. default:
  354. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  355. }
  356. })
  357. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  358. w.Header().Set("Content-Type", "application/json")
  359. if r.Method != http.MethodPost {
  360. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  361. return
  362. }
  363. var update runtime.SettingsUpdate
  364. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  365. http.Error(w, "invalid json", http.StatusBadRequest)
  366. return
  367. }
  368. prev := cfgManager.Snapshot()
  369. next, err := cfgManager.ApplySettings(update)
  370. if err != nil {
  371. http.Error(w, err.Error(), http.StatusBadRequest)
  372. return
  373. }
  374. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  375. if err := srcMgr.ApplyConfig(next); err != nil {
  376. cfgManager.Replace(prev)
  377. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  378. return
  379. }
  380. }
  381. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  382. pushDSPUpdate(dspUpdates, dspUpdate{
  383. cfg: next,
  384. dcBlock: next.DCBlock,
  385. iqBalance: next.IQBalance,
  386. })
  387. }
  388. _ = json.NewEncoder(w).Encode(next)
  389. })
  390. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  391. w.Header().Set("Content-Type", "application/json")
  392. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  393. })
  394. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  395. w.Header().Set("Content-Type", "application/json")
  396. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  397. })
  398. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  399. w.Header().Set("Content-Type", "application/json")
  400. limit := 200
  401. if v := r.URL.Query().Get("limit"); v != "" {
  402. if parsed, err := strconv.Atoi(v); err == nil {
  403. limit = parsed
  404. }
  405. }
  406. var since time.Time
  407. if v := r.URL.Query().Get("since"); v != "" {
  408. if parsed, err := parseSince(v); err == nil {
  409. since = parsed
  410. } else {
  411. http.Error(w, "invalid since", http.StatusBadRequest)
  412. return
  413. }
  414. }
  415. snap := cfgManager.Snapshot()
  416. eventMu.Lock()
  417. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  418. eventMu.Unlock()
  419. if err != nil {
  420. http.Error(w, "failed to read events", http.StatusInternalServerError)
  421. return
  422. }
  423. _ = json.NewEncoder(w).Encode(evs)
  424. })
  425. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  426. server := &http.Server{Addr: cfg.WebAddr}
  427. go func() {
  428. log.Printf("web listening on %s", cfg.WebAddr)
  429. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  430. log.Fatalf("server: %v", err)
  431. }
  432. }()
  433. stop := make(chan os.Signal, 1)
  434. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  435. <-stop
  436. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  437. defer cancelTimeout()
  438. _ = server.Shutdown(ctxTimeout)
  439. }
  440. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.Mutex, updates <-chan dspUpdate, gpuState *gpuStatus) {
  441. ticker := time.NewTicker(cfg.FrameInterval())
  442. defer ticker.Stop()
  443. logTicker := time.NewTicker(5 * time.Second)
  444. defer logTicker.Stop()
  445. enc := json.NewEncoder(eventFile)
  446. dcBlocker := dsp.NewDCBlocker(0.995)
  447. dcEnabled := cfg.DCBlock
  448. iqEnabled := cfg.IQBalance
  449. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  450. useGPU := cfg.UseGPUFFT
  451. var gpuEngine *gpufft.Engine
  452. if useGPU && gpuState != nil && gpuState.Available {
  453. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  454. gpuEngine = eng
  455. gpuState.set(true, nil)
  456. } else {
  457. gpuState.set(false, err)
  458. useGPU = false
  459. }
  460. } else if gpuState != nil {
  461. gpuState.set(false, nil)
  462. }
  463. gotSamples := false
  464. for {
  465. select {
  466. case <-ctx.Done():
  467. return
  468. case <-logTicker.C:
  469. st := srcMgr.Stats()
  470. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  471. case upd := <-updates:
  472. prevFFT := cfg.FFTSize
  473. prevUseGPU := useGPU
  474. cfg = upd.cfg
  475. if upd.det != nil {
  476. det = upd.det
  477. }
  478. if upd.window != nil {
  479. window = upd.window
  480. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  481. }
  482. dcEnabled = upd.dcBlock
  483. iqEnabled = upd.iqBalance
  484. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  485. srcMgr.Flush()
  486. gotSamples = false
  487. if gpuEngine != nil {
  488. gpuEngine.Close()
  489. gpuEngine = nil
  490. }
  491. useGPU = cfg.UseGPUFFT
  492. if useGPU && gpuState != nil && gpuState.Available {
  493. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  494. gpuEngine = eng
  495. gpuState.set(true, nil)
  496. } else {
  497. gpuState.set(false, err)
  498. useGPU = false
  499. }
  500. } else if gpuState != nil {
  501. gpuState.set(false, nil)
  502. }
  503. }
  504. dcBlocker.Reset()
  505. ticker.Reset(cfg.FrameInterval())
  506. case <-ticker.C:
  507. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  508. if err != nil {
  509. log.Printf("read IQ: %v", err)
  510. if strings.Contains(err.Error(), "timeout") {
  511. if err := srcMgr.Restart(cfg); err != nil {
  512. log.Printf("restart failed: %v", err)
  513. }
  514. }
  515. continue
  516. }
  517. if !gotSamples {
  518. log.Printf("received IQ samples")
  519. gotSamples = true
  520. }
  521. if dcEnabled {
  522. dcBlocker.Apply(iq)
  523. }
  524. if iqEnabled {
  525. dsp.IQBalance(iq)
  526. }
  527. var spectrum []float64
  528. if useGPU && gpuEngine != nil {
  529. if len(window) == len(iq) {
  530. for i := 0; i < len(iq); i++ {
  531. v := iq[i]
  532. w := float32(window[i])
  533. iq[i] = complex(real(v)*w, imag(v)*w)
  534. }
  535. }
  536. out, err := gpuEngine.Exec(iq)
  537. if err != nil {
  538. if gpuState != nil {
  539. gpuState.set(false, err)
  540. }
  541. useGPU = false
  542. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  543. } else {
  544. spectrum = fftutil.SpectrumFromFFT(out)
  545. }
  546. } else {
  547. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  548. }
  549. now := time.Now()
  550. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  551. eventMu.Lock()
  552. for _, ev := range finished {
  553. _ = enc.Encode(ev)
  554. }
  555. eventMu.Unlock()
  556. h.broadcast(SpectrumFrame{
  557. Timestamp: now.UnixMilli(),
  558. CenterHz: cfg.CenterHz,
  559. SampleHz: cfg.SampleRate,
  560. FFTSize: cfg.FFTSize,
  561. Spectrum: spectrum,
  562. Signals: signals,
  563. })
  564. }
  565. }
  566. }
  // parseSince converts the "since" query value into a time.
  //
  // Accepted forms:
  //   - "" yields the zero time and no error;
  //   - a base-10 integer is a Unix timestamp, read as milliseconds when it
  //     is greater than 1e12 and as seconds otherwise;
  //   - anything else must parse as RFC 3339 (with or without fractional
  //     seconds), otherwise the parse error is returned.
  func parseSince(raw string) (time.Time, error) {
  	if raw == "" {
  		return time.Time{}, nil
  	}

  	n, numErr := strconv.ParseInt(raw, 10, 64)
  	switch {
  	case numErr != nil:
  		// Not an integer: try the textual formats below.
  	case n > 1e12:
  		return time.UnixMilli(n), nil
  	default:
  		return time.Unix(n, 0), nil
  	}

  	parsed, err := time.Parse(time.RFC3339Nano, raw)
  	if err != nil {
  		return time.Parse(time.RFC3339, raw)
  	}
  	return parsed, nil
  }