Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

574 строки
14KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/gorilla/websocket"
  17. "sdr-visual-suite/internal/config"
  18. "sdr-visual-suite/internal/detector"
  19. "sdr-visual-suite/internal/dsp"
  20. "sdr-visual-suite/internal/events"
  21. fftutil "sdr-visual-suite/internal/fft"
  22. "sdr-visual-suite/internal/fft/gpufft"
  23. "sdr-visual-suite/internal/mock"
  24. "sdr-visual-suite/internal/runtime"
  25. "sdr-visual-suite/internal/sdr"
  26. "sdr-visual-suite/internal/sdrplay"
  27. )
  28. type SpectrumFrame struct {
  29. Timestamp int64 `json:"ts"`
  30. CenterHz float64 `json:"center_hz"`
  31. SampleHz int `json:"sample_rate"`
  32. FFTSize int `json:"fft_size"`
  33. Spectrum []float64 `json:"spectrum_db"`
  34. Signals []detector.Signal `json:"signals"`
  35. }
  36. type hub struct {
  37. mu sync.Mutex
  38. clients map[*websocket.Conn]struct{}
  39. }
  40. type gpuStatus struct {
  41. mu sync.RWMutex
  42. Available bool `json:"available"`
  43. Active bool `json:"active"`
  44. Error string `json:"error"`
  45. }
  46. func (g *gpuStatus) set(active bool, err error) {
  47. g.mu.Lock()
  48. defer g.mu.Unlock()
  49. g.Active = active
  50. if err != nil {
  51. g.Error = err.Error()
  52. } else {
  53. g.Error = ""
  54. }
  55. }
  56. func (g *gpuStatus) snapshot() gpuStatus {
  57. g.mu.RLock()
  58. defer g.mu.RUnlock()
  59. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  60. }
  61. func newHub() *hub {
  62. return &hub{clients: map[*websocket.Conn]struct{}{}}
  63. }
  64. func (h *hub) add(c *websocket.Conn) {
  65. h.mu.Lock()
  66. defer h.mu.Unlock()
  67. h.clients[c] = struct{}{}
  68. }
  69. func (h *hub) remove(c *websocket.Conn) {
  70. h.mu.Lock()
  71. defer h.mu.Unlock()
  72. delete(h.clients, c)
  73. }
  74. func (h *hub) broadcast(frame SpectrumFrame) {
  75. h.mu.Lock()
  76. defer h.mu.Unlock()
  77. b, _ := json.Marshal(frame)
  78. for c := range h.clients {
  79. _ = c.WriteMessage(websocket.TextMessage, b)
  80. }
  81. }
  82. type sourceManager struct {
  83. mu sync.RWMutex
  84. src sdr.Source
  85. newSource func(cfg config.Config) (sdr.Source, error)
  86. }
  87. func (m *sourceManager) Restart(cfg config.Config) error {
  88. m.mu.Lock()
  89. defer m.mu.Unlock()
  90. old := m.src
  91. _ = old.Stop()
  92. next, err := m.newSource(cfg)
  93. if err != nil {
  94. _ = old.Start()
  95. m.src = old
  96. return err
  97. }
  98. if err := next.Start(); err != nil {
  99. _ = next.Stop()
  100. _ = old.Start()
  101. m.src = old
  102. return err
  103. }
  104. m.src = next
  105. return nil
  106. }
  107. func (m *sourceManager) Stats() sdr.SourceStats {
  108. m.mu.RLock()
  109. defer m.mu.RUnlock()
  110. if sp, ok := m.src.(sdr.StatsProvider); ok {
  111. return sp.Stats()
  112. }
  113. return sdr.SourceStats{}
  114. }
  115. func (m *sourceManager) Flush() {
  116. m.mu.RLock()
  117. defer m.mu.RUnlock()
  118. if fl, ok := m.src.(sdr.Flushable); ok {
  119. fl.Flush()
  120. }
  121. }
  122. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  123. return &sourceManager{src: src, newSource: newSource}
  124. }
  125. func (m *sourceManager) Start() error {
  126. m.mu.RLock()
  127. defer m.mu.RUnlock()
  128. return m.src.Start()
  129. }
  130. func (m *sourceManager) Stop() error {
  131. m.mu.RLock()
  132. defer m.mu.RUnlock()
  133. return m.src.Stop()
  134. }
  135. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  136. m.mu.RLock()
  137. defer m.mu.RUnlock()
  138. return m.src.ReadIQ(n)
  139. }
  140. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  141. m.mu.Lock()
  142. defer m.mu.Unlock()
  143. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  144. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  145. return nil
  146. }
  147. }
  148. old := m.src
  149. _ = old.Stop()
  150. next, err := m.newSource(cfg)
  151. if err != nil {
  152. _ = old.Start()
  153. return err
  154. }
  155. if err := next.Start(); err != nil {
  156. _ = next.Stop()
  157. _ = old.Start()
  158. return err
  159. }
  160. m.src = next
  161. return nil
  162. }
  163. type dspUpdate struct {
  164. cfg config.Config
  165. det *detector.Detector
  166. window []float64
  167. dcBlock bool
  168. iqBalance bool
  169. useGPUFFT bool
  170. }
  171. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  172. select {
  173. case ch <- update:
  174. default:
  175. select {
  176. case <-ch:
  177. default:
  178. }
  179. ch <- update
  180. }
  181. }
  182. func main() {
  183. var cfgPath string
  184. var mockFlag bool
  185. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  186. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  187. flag.Parse()
  188. cfg, err := config.Load(cfgPath)
  189. if err != nil {
  190. log.Fatalf("load config: %v", err)
  191. }
  192. cfgManager := runtime.New(cfg)
  193. gpuState := &gpuStatus{Available: gpufft.Available()}
  194. newSource := func(cfg config.Config) (sdr.Source, error) {
  195. if mockFlag {
  196. src := mock.New(cfg.SampleRate)
  197. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  198. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  199. }
  200. return src, nil
  201. }
  202. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  203. if err != nil {
  204. return nil, err
  205. }
  206. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  207. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  208. }
  209. return src, nil
  210. }
  211. src, err := newSource(cfg)
  212. if err != nil {
  213. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  214. }
  215. srcMgr := newSourceManager(src, newSource)
  216. if err := srcMgr.Start(); err != nil {
  217. log.Fatalf("source start: %v", err)
  218. }
  219. defer srcMgr.Stop()
  220. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  221. log.Fatalf("event path: %v", err)
  222. }
  223. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  224. if err != nil {
  225. log.Fatalf("open events: %v", err)
  226. }
  227. defer eventFile.Close()
  228. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  229. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  230. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  231. window := fftutil.Hann(cfg.FFTSize)
  232. h := newHub()
  233. dspUpdates := make(chan dspUpdate, 1)
  234. ctx, cancel := context.WithCancel(context.Background())
  235. defer cancel()
  236. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, dspUpdates, gpuState)
  237. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
  238. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  239. c, err := upgrader.Upgrade(w, r, nil)
  240. if err != nil {
  241. return
  242. }
  243. h.add(c)
  244. defer func() {
  245. h.remove(c)
  246. _ = c.Close()
  247. }()
  248. for {
  249. _, _, err := c.ReadMessage()
  250. if err != nil {
  251. return
  252. }
  253. }
  254. })
  255. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  256. w.Header().Set("Content-Type", "application/json")
  257. switch r.Method {
  258. case http.MethodGet:
  259. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  260. case http.MethodPost:
  261. var update runtime.ConfigUpdate
  262. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  263. http.Error(w, "invalid json", http.StatusBadRequest)
  264. return
  265. }
  266. prev := cfgManager.Snapshot()
  267. next, err := cfgManager.ApplyConfig(update)
  268. if err != nil {
  269. http.Error(w, err.Error(), http.StatusBadRequest)
  270. return
  271. }
  272. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  273. if sourceChanged {
  274. if err := srcMgr.ApplyConfig(next); err != nil {
  275. cfgManager.Replace(prev)
  276. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  277. return
  278. }
  279. }
  280. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  281. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  282. prev.Detector.HoldMs != next.Detector.HoldMs ||
  283. prev.SampleRate != next.SampleRate ||
  284. prev.FFTSize != next.FFTSize
  285. windowChanged := prev.FFTSize != next.FFTSize
  286. var newDet *detector.Detector
  287. var newWindow []float64
  288. if detChanged {
  289. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  290. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  291. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  292. }
  293. if windowChanged {
  294. newWindow = fftutil.Hann(next.FFTSize)
  295. }
  296. pushDSPUpdate(dspUpdates, dspUpdate{
  297. cfg: next,
  298. det: newDet,
  299. window: newWindow,
  300. dcBlock: next.DCBlock,
  301. iqBalance: next.IQBalance,
  302. useGPUFFT: next.UseGPUFFT,
  303. })
  304. _ = json.NewEncoder(w).Encode(next)
  305. default:
  306. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  307. }
  308. })
  309. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  310. w.Header().Set("Content-Type", "application/json")
  311. if r.Method != http.MethodPost {
  312. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  313. return
  314. }
  315. var update runtime.SettingsUpdate
  316. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  317. http.Error(w, "invalid json", http.StatusBadRequest)
  318. return
  319. }
  320. prev := cfgManager.Snapshot()
  321. next, err := cfgManager.ApplySettings(update)
  322. if err != nil {
  323. http.Error(w, err.Error(), http.StatusBadRequest)
  324. return
  325. }
  326. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  327. if err := srcMgr.ApplyConfig(next); err != nil {
  328. cfgManager.Replace(prev)
  329. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  330. return
  331. }
  332. }
  333. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  334. pushDSPUpdate(dspUpdates, dspUpdate{
  335. cfg: next,
  336. dcBlock: next.DCBlock,
  337. iqBalance: next.IQBalance,
  338. })
  339. }
  340. _ = json.NewEncoder(w).Encode(next)
  341. })
  342. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  343. w.Header().Set("Content-Type", "application/json")
  344. if sp, ok := src.(sdr.StatsProvider); ok {
  345. _ = json.NewEncoder(w).Encode(sp.Stats())
  346. return
  347. }
  348. _ = json.NewEncoder(w).Encode(sdr.SourceStats{})
  349. })
  350. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  351. w.Header().Set("Content-Type", "application/json")
  352. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  353. })
  354. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  355. w.Header().Set("Content-Type", "application/json")
  356. limit := 200
  357. if v := r.URL.Query().Get("limit"); v != "" {
  358. if parsed, err := strconv.Atoi(v); err == nil {
  359. limit = parsed
  360. }
  361. }
  362. var since time.Time
  363. if v := r.URL.Query().Get("since"); v != "" {
  364. if parsed, err := parseSince(v); err == nil {
  365. since = parsed
  366. } else {
  367. http.Error(w, "invalid since", http.StatusBadRequest)
  368. return
  369. }
  370. }
  371. evs, err := events.ReadRecent(cfg.EventPath, limit, since)
  372. if err != nil {
  373. http.Error(w, "failed to read events", http.StatusInternalServerError)
  374. return
  375. }
  376. _ = json.NewEncoder(w).Encode(evs)
  377. })
  378. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  379. server := &http.Server{Addr: cfg.WebAddr}
  380. go func() {
  381. log.Printf("web listening on %s", cfg.WebAddr)
  382. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  383. log.Fatalf("server: %v", err)
  384. }
  385. }()
  386. stop := make(chan os.Signal, 1)
  387. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  388. <-stop
  389. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  390. defer cancelTimeout()
  391. _ = server.Shutdown(ctxTimeout)
  392. }
  393. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, updates <-chan dspUpdate, gpuState *gpuStatus) {
  394. ticker := time.NewTicker(cfg.FrameInterval())
  395. defer ticker.Stop()
  396. enc := json.NewEncoder(eventFile)
  397. dcBlocker := dsp.NewDCBlocker(0.995)
  398. dcEnabled := cfg.DCBlock
  399. iqEnabled := cfg.IQBalance
  400. useGPU := cfg.UseGPUFFT
  401. var gpuEngine *gpufft.Engine
  402. if useGPU && gpuState != nil && gpuState.Available {
  403. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  404. gpuEngine = eng
  405. gpuState.set(true, nil)
  406. } else {
  407. gpuState.set(false, err)
  408. useGPU = false
  409. }
  410. } else if gpuState != nil {
  411. gpuState.set(false, nil)
  412. }
  413. gotSamples := false
  414. for {
  415. select {
  416. case <-ctx.Done():
  417. return
  418. case upd := <-updates:
  419. prevFFT := cfg.FFTSize
  420. prevUseGPU := useGPU
  421. cfg = upd.cfg
  422. if upd.det != nil {
  423. det = upd.det
  424. }
  425. if upd.window != nil {
  426. window = upd.window
  427. }
  428. dcEnabled = upd.dcBlock
  429. iqEnabled = upd.iqBalance
  430. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  431. srcMgr.Flush()
  432. gotSamples = false
  433. if gpuEngine != nil {
  434. gpuEngine.Close()
  435. gpuEngine = nil
  436. }
  437. useGPU = cfg.UseGPUFFT
  438. if useGPU && gpuState != nil && gpuState.Available {
  439. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  440. gpuEngine = eng
  441. gpuState.set(true, nil)
  442. } else {
  443. gpuState.set(false, err)
  444. useGPU = false
  445. }
  446. } else if gpuState != nil {
  447. gpuState.set(false, nil)
  448. }
  449. }
  450. dcBlocker.Reset()
  451. ticker.Reset(cfg.FrameInterval())
  452. case <-ticker.C:
  453. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  454. if err != nil {
  455. log.Printf("read IQ: %v", err)
  456. if strings.Contains(err.Error(), "timeout") {
  457. if err := srcMgr.Restart(cfg); err != nil {
  458. log.Printf("restart failed: %v", err)
  459. }
  460. }
  461. continue
  462. }
  463. if !gotSamples {
  464. log.Printf("received IQ samples")
  465. gotSamples = true
  466. }
  467. if dcEnabled {
  468. dcBlocker.Apply(iq)
  469. }
  470. if iqEnabled {
  471. dsp.IQBalance(iq)
  472. }
  473. var spectrum []float64
  474. if useGPU && gpuEngine != nil {
  475. if len(window) == len(iq) {
  476. for i := 0; i < len(iq); i++ {
  477. v := iq[i]
  478. w := float32(window[i])
  479. iq[i] = complex(real(v)*w, imag(v)*w)
  480. }
  481. }
  482. out, err := gpuEngine.Exec(iq)
  483. if err != nil {
  484. if gpuState != nil {
  485. gpuState.set(false, err)
  486. }
  487. useGPU = false
  488. spectrum = fftutil.Spectrum(iq, window)
  489. } else {
  490. spectrum = fftutil.SpectrumFromFFT(out)
  491. }
  492. } else {
  493. spectrum = fftutil.Spectrum(iq, window)
  494. }
  495. now := time.Now()
  496. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  497. for _, ev := range finished {
  498. _ = enc.Encode(ev)
  499. }
  500. h.broadcast(SpectrumFrame{
  501. Timestamp: now.UnixMilli(),
  502. CenterHz: cfg.CenterHz,
  503. SampleHz: cfg.SampleRate,
  504. FFTSize: cfg.FFTSize,
  505. Spectrum: spectrum,
  506. Signals: signals,
  507. })
  508. }
  509. }
  510. }
  511. func parseSince(raw string) (time.Time, error) {
  512. if raw == "" {
  513. return time.Time{}, nil
  514. }
  515. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  516. if ms > 1e12 {
  517. return time.UnixMilli(ms), nil
  518. }
  519. return time.Unix(ms, 0), nil
  520. }
  521. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  522. return t, nil
  523. }
  524. return time.Parse(time.RFC3339, raw)
  525. }