Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

528 Zeilen
13KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "strconv"
  12. "sync"
  13. "syscall"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. "sdr-visual-suite/internal/config"
  17. "sdr-visual-suite/internal/detector"
  18. "sdr-visual-suite/internal/dsp"
  19. "sdr-visual-suite/internal/events"
  20. fftutil "sdr-visual-suite/internal/fft"
  21. "sdr-visual-suite/internal/fft/gpufft"
  22. "sdr-visual-suite/internal/mock"
  23. "sdr-visual-suite/internal/runtime"
  24. "sdr-visual-suite/internal/sdr"
  25. "sdr-visual-suite/internal/sdrplay"
  26. )
  27. type SpectrumFrame struct {
  28. Timestamp int64 `json:"ts"`
  29. CenterHz float64 `json:"center_hz"`
  30. SampleHz int `json:"sample_rate"`
  31. FFTSize int `json:"fft_size"`
  32. Spectrum []float64 `json:"spectrum_db"`
  33. Signals []detector.Signal `json:"signals"`
  34. }
  35. type hub struct {
  36. mu sync.Mutex
  37. clients map[*websocket.Conn]struct{}
  38. }
  39. type gpuStatus struct {
  40. mu sync.RWMutex
  41. Available bool `json:"available"`
  42. Active bool `json:"active"`
  43. Error string `json:"error"`
  44. }
  45. func (g *gpuStatus) set(active bool, err error) {
  46. g.mu.Lock()
  47. defer g.mu.Unlock()
  48. g.Active = active
  49. if err != nil {
  50. g.Error = err.Error()
  51. } else {
  52. g.Error = ""
  53. }
  54. }
  55. func (g *gpuStatus) snapshot() gpuStatus {
  56. g.mu.RLock()
  57. defer g.mu.RUnlock()
  58. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  59. }
  60. func newHub() *hub {
  61. return &hub{clients: map[*websocket.Conn]struct{}{}}
  62. }
  63. func (h *hub) add(c *websocket.Conn) {
  64. h.mu.Lock()
  65. defer h.mu.Unlock()
  66. h.clients[c] = struct{}{}
  67. }
  68. func (h *hub) remove(c *websocket.Conn) {
  69. h.mu.Lock()
  70. defer h.mu.Unlock()
  71. delete(h.clients, c)
  72. }
  73. func (h *hub) broadcast(frame SpectrumFrame) {
  74. h.mu.Lock()
  75. defer h.mu.Unlock()
  76. b, _ := json.Marshal(frame)
  77. for c := range h.clients {
  78. _ = c.WriteMessage(websocket.TextMessage, b)
  79. }
  80. }
  81. type sourceManager struct {
  82. mu sync.RWMutex
  83. src sdr.Source
  84. newSource func(cfg config.Config) (sdr.Source, error)
  85. }
  86. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  87. return &sourceManager{src: src, newSource: newSource}
  88. }
  89. func (m *sourceManager) Start() error {
  90. m.mu.RLock()
  91. defer m.mu.RUnlock()
  92. return m.src.Start()
  93. }
  94. func (m *sourceManager) Stop() error {
  95. m.mu.RLock()
  96. defer m.mu.RUnlock()
  97. return m.src.Stop()
  98. }
  99. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  100. m.mu.RLock()
  101. defer m.mu.RUnlock()
  102. return m.src.ReadIQ(n)
  103. }
  104. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  105. m.mu.Lock()
  106. defer m.mu.Unlock()
  107. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  108. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  109. return nil
  110. }
  111. }
  112. old := m.src
  113. _ = old.Stop()
  114. next, err := m.newSource(cfg)
  115. if err != nil {
  116. _ = old.Start()
  117. return err
  118. }
  119. if err := next.Start(); err != nil {
  120. _ = next.Stop()
  121. _ = old.Start()
  122. return err
  123. }
  124. m.src = next
  125. return nil
  126. }
  127. type dspUpdate struct {
  128. cfg config.Config
  129. det *detector.Detector
  130. window []float64
  131. dcBlock bool
  132. iqBalance bool
  133. useGPUFFT bool
  134. }
  135. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  136. select {
  137. case ch <- update:
  138. default:
  139. select {
  140. case <-ch:
  141. default:
  142. }
  143. ch <- update
  144. }
  145. }
  146. func main() {
  147. var cfgPath string
  148. var mockFlag bool
  149. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  150. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  151. flag.Parse()
  152. cfg, err := config.Load(cfgPath)
  153. if err != nil {
  154. log.Fatalf("load config: %v", err)
  155. }
  156. cfgManager := runtime.New(cfg)
  157. gpuState := &gpuStatus{Available: gpufft.Available()}
  158. newSource := func(cfg config.Config) (sdr.Source, error) {
  159. if mockFlag {
  160. src := mock.New(cfg.SampleRate)
  161. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  162. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  163. }
  164. return src, nil
  165. }
  166. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  167. if err != nil {
  168. return nil, err
  169. }
  170. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  171. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  172. }
  173. return src, nil
  174. }
  175. src, err := newSource(cfg)
  176. if err != nil {
  177. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  178. }
  179. srcMgr := newSourceManager(src, newSource)
  180. if err := srcMgr.Start(); err != nil {
  181. log.Fatalf("source start: %v", err)
  182. }
  183. defer srcMgr.Stop()
  184. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  185. log.Fatalf("event path: %v", err)
  186. }
  187. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  188. if err != nil {
  189. log.Fatalf("open events: %v", err)
  190. }
  191. defer eventFile.Close()
  192. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  193. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  194. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  195. window := fftutil.Hann(cfg.FFTSize)
  196. h := newHub()
  197. dspUpdates := make(chan dspUpdate, 1)
  198. ctx, cancel := context.WithCancel(context.Background())
  199. defer cancel()
  200. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, dspUpdates, gpuState)
  201. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
  202. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  203. c, err := upgrader.Upgrade(w, r, nil)
  204. if err != nil {
  205. return
  206. }
  207. h.add(c)
  208. defer func() {
  209. h.remove(c)
  210. _ = c.Close()
  211. }()
  212. for {
  213. _, _, err := c.ReadMessage()
  214. if err != nil {
  215. return
  216. }
  217. }
  218. })
  219. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  220. w.Header().Set("Content-Type", "application/json")
  221. switch r.Method {
  222. case http.MethodGet:
  223. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  224. case http.MethodPost:
  225. var update runtime.ConfigUpdate
  226. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  227. http.Error(w, "invalid json", http.StatusBadRequest)
  228. return
  229. }
  230. prev := cfgManager.Snapshot()
  231. next, err := cfgManager.ApplyConfig(update)
  232. if err != nil {
  233. http.Error(w, err.Error(), http.StatusBadRequest)
  234. return
  235. }
  236. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  237. if sourceChanged {
  238. if err := srcMgr.ApplyConfig(next); err != nil {
  239. cfgManager.Replace(prev)
  240. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  241. return
  242. }
  243. }
  244. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  245. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  246. prev.Detector.HoldMs != next.Detector.HoldMs ||
  247. prev.SampleRate != next.SampleRate ||
  248. prev.FFTSize != next.FFTSize
  249. windowChanged := prev.FFTSize != next.FFTSize
  250. var newDet *detector.Detector
  251. var newWindow []float64
  252. if detChanged {
  253. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  254. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  255. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  256. }
  257. if windowChanged {
  258. newWindow = fftutil.Hann(next.FFTSize)
  259. }
  260. pushDSPUpdate(dspUpdates, dspUpdate{
  261. cfg: next,
  262. det: newDet,
  263. window: newWindow,
  264. dcBlock: next.DCBlock,
  265. iqBalance: next.IQBalance,
  266. useGPUFFT: next.UseGPUFFT,
  267. })
  268. _ = json.NewEncoder(w).Encode(next)
  269. default:
  270. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  271. }
  272. })
  273. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  274. w.Header().Set("Content-Type", "application/json")
  275. if r.Method != http.MethodPost {
  276. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  277. return
  278. }
  279. var update runtime.SettingsUpdate
  280. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  281. http.Error(w, "invalid json", http.StatusBadRequest)
  282. return
  283. }
  284. prev := cfgManager.Snapshot()
  285. next, err := cfgManager.ApplySettings(update)
  286. if err != nil {
  287. http.Error(w, err.Error(), http.StatusBadRequest)
  288. return
  289. }
  290. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  291. if err := srcMgr.ApplyConfig(next); err != nil {
  292. cfgManager.Replace(prev)
  293. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  294. return
  295. }
  296. }
  297. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  298. pushDSPUpdate(dspUpdates, dspUpdate{
  299. cfg: next,
  300. dcBlock: next.DCBlock,
  301. iqBalance: next.IQBalance,
  302. })
  303. }
  304. _ = json.NewEncoder(w).Encode(next)
  305. })
  306. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  307. w.Header().Set("Content-Type", "application/json")
  308. if sp, ok := src.(sdr.StatsProvider); ok {
  309. _ = json.NewEncoder(w).Encode(sp.Stats())
  310. return
  311. }
  312. _ = json.NewEncoder(w).Encode(sdr.SourceStats{})
  313. })
  314. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  315. w.Header().Set("Content-Type", "application/json")
  316. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  317. })
  318. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  319. w.Header().Set("Content-Type", "application/json")
  320. limit := 200
  321. if v := r.URL.Query().Get("limit"); v != "" {
  322. if parsed, err := strconv.Atoi(v); err == nil {
  323. limit = parsed
  324. }
  325. }
  326. var since time.Time
  327. if v := r.URL.Query().Get("since"); v != "" {
  328. if parsed, err := parseSince(v); err == nil {
  329. since = parsed
  330. } else {
  331. http.Error(w, "invalid since", http.StatusBadRequest)
  332. return
  333. }
  334. }
  335. evs, err := events.ReadRecent(cfg.EventPath, limit, since)
  336. if err != nil {
  337. http.Error(w, "failed to read events", http.StatusInternalServerError)
  338. return
  339. }
  340. _ = json.NewEncoder(w).Encode(evs)
  341. })
  342. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  343. server := &http.Server{Addr: cfg.WebAddr}
  344. go func() {
  345. log.Printf("web listening on %s", cfg.WebAddr)
  346. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  347. log.Fatalf("server: %v", err)
  348. }
  349. }()
  350. stop := make(chan os.Signal, 1)
  351. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  352. <-stop
  353. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  354. defer cancelTimeout()
  355. _ = server.Shutdown(ctxTimeout)
  356. }
  357. func runDSP(ctx context.Context, src sdr.Source, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, updates <-chan dspUpdate, gpuState *gpuStatus) {
  358. ticker := time.NewTicker(cfg.FrameInterval())
  359. defer ticker.Stop()
  360. enc := json.NewEncoder(eventFile)
  361. dcBlocker := dsp.NewDCBlocker(0.995)
  362. dcEnabled := cfg.DCBlock
  363. iqEnabled := cfg.IQBalance
  364. useGPU := cfg.UseGPUFFT
  365. var gpuEngine *gpufft.Engine
  366. if useGPU && gpuState != nil && gpuState.Available {
  367. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  368. gpuEngine = eng
  369. gpuState.set(true, nil)
  370. } else {
  371. gpuState.set(false, err)
  372. useGPU = false
  373. }
  374. } else if gpuState != nil {
  375. gpuState.set(false, nil)
  376. }
  377. gotSamples := false
  378. for {
  379. select {
  380. case <-ctx.Done():
  381. return
  382. case upd := <-updates:
  383. prevFFT := cfg.FFTSize
  384. prevUseGPU := useGPU
  385. cfg = upd.cfg
  386. if upd.det != nil {
  387. det = upd.det
  388. }
  389. if upd.window != nil {
  390. window = upd.window
  391. }
  392. dcEnabled = upd.dcBlock
  393. iqEnabled = upd.iqBalance
  394. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  395. if gpuEngine != nil {
  396. gpuEngine.Close()
  397. gpuEngine = nil
  398. }
  399. useGPU = cfg.UseGPUFFT
  400. if useGPU && gpuState != nil && gpuState.Available {
  401. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  402. gpuEngine = eng
  403. gpuState.set(true, nil)
  404. } else {
  405. gpuState.set(false, err)
  406. useGPU = false
  407. }
  408. } else if gpuState != nil {
  409. gpuState.set(false, nil)
  410. }
  411. }
  412. dcBlocker.Reset()
  413. ticker.Reset(cfg.FrameInterval())
  414. case <-ticker.C:
  415. iq, err := src.ReadIQ(cfg.FFTSize)
  416. if err != nil {
  417. log.Printf("read IQ: %v", err)
  418. continue
  419. }
  420. if !gotSamples {
  421. log.Printf("received IQ samples")
  422. gotSamples = true
  423. }
  424. if dcEnabled {
  425. dcBlocker.Apply(iq)
  426. }
  427. if iqEnabled {
  428. dsp.IQBalance(iq)
  429. }
  430. var spectrum []float64
  431. if useGPU && gpuEngine != nil {
  432. if len(window) == len(iq) {
  433. for i := 0; i < len(iq); i++ {
  434. v := iq[i]
  435. w := float32(window[i])
  436. iq[i] = complex(real(v)*w, imag(v)*w)
  437. }
  438. }
  439. out, err := gpuEngine.Exec(iq)
  440. if err != nil {
  441. if gpuState != nil {
  442. gpuState.set(false, err)
  443. }
  444. useGPU = false
  445. spectrum = fftutil.Spectrum(iq, window)
  446. } else {
  447. spectrum = fftutil.SpectrumFromFFT(out)
  448. }
  449. } else {
  450. spectrum = fftutil.Spectrum(iq, window)
  451. }
  452. now := time.Now()
  453. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  454. for _, ev := range finished {
  455. _ = enc.Encode(ev)
  456. }
  457. h.broadcast(SpectrumFrame{
  458. Timestamp: now.UnixMilli(),
  459. CenterHz: cfg.CenterHz,
  460. SampleHz: cfg.SampleRate,
  461. FFTSize: cfg.FFTSize,
  462. Spectrum: spectrum,
  463. Signals: signals,
  464. })
  465. }
  466. }
  467. }
  468. func parseSince(raw string) (time.Time, error) {
  469. if raw == "" {
  470. return time.Time{}, nil
  471. }
  472. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  473. if ms > 1e12 {
  474. return time.UnixMilli(ms), nil
  475. }
  476. return time.Unix(ms, 0), nil
  477. }
  478. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  479. return t, nil
  480. }
  481. return time.Parse(time.RFC3339, raw)
  482. }