Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

969 строки
26KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "math"
  8. "net/http"
  9. "os"
  10. "os/signal"
  11. "path/filepath"
  12. "runtime/debug"
  13. "sort"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "syscall"
  18. "time"
  19. "github.com/gorilla/websocket"
  20. "sdr-visual-suite/internal/classifier"
  21. "sdr-visual-suite/internal/config"
  22. "sdr-visual-suite/internal/detector"
  23. "sdr-visual-suite/internal/dsp"
  24. "sdr-visual-suite/internal/events"
  25. fftutil "sdr-visual-suite/internal/fft"
  26. "sdr-visual-suite/internal/fft/gpufft"
  27. "sdr-visual-suite/internal/mock"
  28. "sdr-visual-suite/internal/recorder"
  29. "sdr-visual-suite/internal/runtime"
  30. "sdr-visual-suite/internal/sdr"
  31. "sdr-visual-suite/internal/sdrplay"
  32. )
  33. type SpectrumDebug struct {
  34. Thresholds []float64 `json:"thresholds,omitempty"`
  35. NoiseFloor float64 `json:"noise_floor,omitempty"`
  36. Scores []map[string]any `json:"scores,omitempty"`
  37. }
  38. type SpectrumFrame struct {
  39. Timestamp int64 `json:"ts"`
  40. CenterHz float64 `json:"center_hz"`
  41. SampleHz int `json:"sample_rate"`
  42. FFTSize int `json:"fft_size"`
  43. Spectrum []float64 `json:"spectrum_db"`
  44. Signals []detector.Signal `json:"signals"`
  45. Debug *SpectrumDebug `json:"debug,omitempty"`
  46. }
  47. type client struct {
  48. conn *websocket.Conn
  49. send chan []byte
  50. done chan struct{}
  51. closeOnce sync.Once
  52. }
  53. type hub struct {
  54. mu sync.Mutex
  55. clients map[*client]struct{}
  56. frameCnt int64
  57. lastLogTs time.Time
  58. }
  59. type gpuStatus struct {
  60. mu sync.RWMutex
  61. Available bool `json:"available"`
  62. Active bool `json:"active"`
  63. Error string `json:"error"`
  64. }
  65. type signalSnapshot struct {
  66. mu sync.RWMutex
  67. signals []detector.Signal
  68. }
  69. func (s *signalSnapshot) set(sig []detector.Signal) {
  70. s.mu.Lock()
  71. defer s.mu.Unlock()
  72. s.signals = append([]detector.Signal(nil), sig...)
  73. }
  74. func (s *signalSnapshot) get() []detector.Signal {
  75. s.mu.RLock()
  76. defer s.mu.RUnlock()
  77. return append([]detector.Signal(nil), s.signals...)
  78. }
  79. func (g *gpuStatus) set(active bool, err error) {
  80. g.mu.Lock()
  81. defer g.mu.Unlock()
  82. g.Active = active
  83. if err != nil {
  84. g.Error = err.Error()
  85. } else {
  86. g.Error = ""
  87. }
  88. }
  89. func (g *gpuStatus) snapshot() gpuStatus {
  90. g.mu.RLock()
  91. defer g.mu.RUnlock()
  92. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  93. }
  94. func newHub() *hub {
  95. return &hub{clients: map[*client]struct{}{}, lastLogTs: time.Now()}
  96. }
  97. func (h *hub) add(c *client) {
  98. h.mu.Lock()
  99. defer h.mu.Unlock()
  100. h.clients[c] = struct{}{}
  101. log.Printf("ws connected (%d clients)", len(h.clients))
  102. }
  103. func (h *hub) remove(c *client) {
  104. c.closeOnce.Do(func() { close(c.done) })
  105. h.mu.Lock()
  106. defer h.mu.Unlock()
  107. delete(h.clients, c)
  108. log.Printf("ws disconnected (%d clients)", len(h.clients))
  109. }
  110. func (h *hub) broadcast(frame SpectrumFrame) {
  111. b, err := json.Marshal(frame)
  112. if err != nil {
  113. log.Printf("marshal frame: %v", err)
  114. return
  115. }
  116. h.mu.Lock()
  117. clients := make([]*client, 0, len(h.clients))
  118. for c := range h.clients {
  119. clients = append(clients, c)
  120. }
  121. h.mu.Unlock()
  122. for _, c := range clients {
  123. select {
  124. case c.send <- b:
  125. default:
  126. h.remove(c)
  127. }
  128. }
  129. h.frameCnt++
  130. if time.Since(h.lastLogTs) > 2*time.Second {
  131. h.lastLogTs = time.Now()
  132. log.Printf("broadcast frames=%d clients=%d", h.frameCnt, len(clients))
  133. }
  134. }
  135. type sourceManager struct {
  136. mu sync.RWMutex
  137. src sdr.Source
  138. newSource func(cfg config.Config) (sdr.Source, error)
  139. }
  140. func (m *sourceManager) Restart(cfg config.Config) error {
  141. m.mu.Lock()
  142. defer m.mu.Unlock()
  143. old := m.src
  144. _ = old.Stop()
  145. next, err := m.newSource(cfg)
  146. if err != nil {
  147. _ = old.Start()
  148. m.src = old
  149. return err
  150. }
  151. if err := next.Start(); err != nil {
  152. _ = next.Stop()
  153. _ = old.Start()
  154. m.src = old
  155. return err
  156. }
  157. m.src = next
  158. return nil
  159. }
  160. func (m *sourceManager) Stats() sdr.SourceStats {
  161. m.mu.RLock()
  162. defer m.mu.RUnlock()
  163. if sp, ok := m.src.(sdr.StatsProvider); ok {
  164. return sp.Stats()
  165. }
  166. return sdr.SourceStats{}
  167. }
  168. func (m *sourceManager) Flush() {
  169. m.mu.RLock()
  170. defer m.mu.RUnlock()
  171. if fl, ok := m.src.(sdr.Flushable); ok {
  172. fl.Flush()
  173. }
  174. }
  175. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  176. return &sourceManager{src: src, newSource: newSource}
  177. }
  178. func (m *sourceManager) Start() error {
  179. m.mu.RLock()
  180. defer m.mu.RUnlock()
  181. return m.src.Start()
  182. }
  183. func (m *sourceManager) Stop() error {
  184. m.mu.RLock()
  185. defer m.mu.RUnlock()
  186. return m.src.Stop()
  187. }
  188. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  189. m.mu.RLock()
  190. defer m.mu.RUnlock()
  191. return m.src.ReadIQ(n)
  192. }
  193. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  194. m.mu.Lock()
  195. defer m.mu.Unlock()
  196. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  197. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  198. return nil
  199. }
  200. }
  201. old := m.src
  202. _ = old.Stop()
  203. next, err := m.newSource(cfg)
  204. if err != nil {
  205. _ = old.Start()
  206. return err
  207. }
  208. if err := next.Start(); err != nil {
  209. _ = next.Stop()
  210. _ = old.Start()
  211. return err
  212. }
  213. m.src = next
  214. return nil
  215. }
  216. type dspUpdate struct {
  217. cfg config.Config
  218. det *detector.Detector
  219. window []float64
  220. dcBlock bool
  221. iqBalance bool
  222. useGPUFFT bool
  223. }
  224. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  225. select {
  226. case ch <- update:
  227. default:
  228. select {
  229. case <-ch:
  230. default:
  231. }
  232. ch <- update
  233. }
  234. }
  235. func main() {
  236. var cfgPath string
  237. var mockFlag bool
  238. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  239. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  240. flag.Parse()
  241. cfg, err := config.Load(cfgPath)
  242. if err != nil {
  243. log.Fatalf("load config: %v", err)
  244. }
  245. cfgManager := runtime.New(cfg)
  246. gpuState := &gpuStatus{Available: gpufft.Available()}
  247. newSource := func(cfg config.Config) (sdr.Source, error) {
  248. if mockFlag {
  249. src := mock.New(cfg.SampleRate)
  250. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  251. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  252. }
  253. return src, nil
  254. }
  255. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  256. if err != nil {
  257. return nil, err
  258. }
  259. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  260. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  261. }
  262. return src, nil
  263. }
  264. src, err := newSource(cfg)
  265. if err != nil {
  266. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  267. }
  268. srcMgr := newSourceManager(src, newSource)
  269. if err := srcMgr.Start(); err != nil {
  270. log.Fatalf("source start: %v", err)
  271. }
  272. defer srcMgr.Stop()
  273. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  274. log.Fatalf("event path: %v", err)
  275. }
  276. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  277. if err != nil {
  278. log.Fatalf("open events: %v", err)
  279. }
  280. defer eventFile.Close()
  281. eventMu := &sync.RWMutex{}
  282. det := detector.New(cfg.Detector, cfg.SampleRate, cfg.FFTSize)
  283. window := fftutil.Hann(cfg.FFTSize)
  284. h := newHub()
  285. dspUpdates := make(chan dspUpdate, 1)
  286. ctx, cancel := context.WithCancel(context.Background())
  287. defer cancel()
  288. decodeMap := buildDecoderMap(cfg)
  289. recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  290. Enabled: cfg.Recorder.Enabled,
  291. MinSNRDb: cfg.Recorder.MinSNRDb,
  292. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  293. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  294. PrerollMs: cfg.Recorder.PrerollMs,
  295. RecordIQ: cfg.Recorder.RecordIQ,
  296. RecordAudio: cfg.Recorder.RecordAudio,
  297. AutoDemod: cfg.Recorder.AutoDemod,
  298. AutoDecode: cfg.Recorder.AutoDecode,
  299. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  300. OutputDir: cfg.Recorder.OutputDir,
  301. ClassFilter: cfg.Recorder.ClassFilter,
  302. RingSeconds: cfg.Recorder.RingSeconds,
  303. }, cfg.CenterHz, decodeMap)
  304. sigSnap := &signalSnapshot{}
  305. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr, sigSnap)
  306. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  307. origin := r.Header.Get("Origin")
  308. if origin == "" || origin == "null" {
  309. return true
  310. }
  311. // allow same-host or any local IP
  312. return true
  313. }}
  314. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  315. conn, err := upgrader.Upgrade(w, r, nil)
  316. if err != nil {
  317. log.Printf("ws upgrade failed: %v (origin: %s)", err, r.Header.Get("Origin"))
  318. return
  319. }
  320. c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})}
  321. h.add(c)
  322. defer func() {
  323. h.remove(c)
  324. _ = conn.Close()
  325. }()
  326. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  327. conn.SetPongHandler(func(string) error {
  328. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  329. return nil
  330. })
  331. go func() {
  332. ping := time.NewTicker(30 * time.Second)
  333. defer ping.Stop()
  334. for {
  335. select {
  336. case msg, ok := <-c.send:
  337. if !ok {
  338. return
  339. }
  340. _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  341. if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
  342. return
  343. }
  344. case <-ping.C:
  345. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  346. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  347. log.Printf("ws ping error: %v", err)
  348. return
  349. }
  350. }
  351. }
  352. }()
  353. for {
  354. _, _, err := conn.ReadMessage()
  355. if err != nil {
  356. return
  357. }
  358. }
  359. })
  360. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  361. w.Header().Set("Content-Type", "application/json")
  362. switch r.Method {
  363. case http.MethodGet:
  364. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  365. case http.MethodPost:
  366. var update runtime.ConfigUpdate
  367. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  368. http.Error(w, "invalid json", http.StatusBadRequest)
  369. return
  370. }
  371. prev := cfgManager.Snapshot()
  372. next, err := cfgManager.ApplyConfig(update)
  373. if err != nil {
  374. http.Error(w, err.Error(), http.StatusBadRequest)
  375. return
  376. }
  377. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  378. if sourceChanged {
  379. if err := srcMgr.ApplyConfig(next); err != nil {
  380. cfgManager.Replace(prev)
  381. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  382. return
  383. }
  384. }
  385. if err := config.Save(cfgPath, next); err != nil {
  386. log.Printf("config save failed: %v", err)
  387. }
  388. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  389. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  390. prev.Detector.HoldMs != next.Detector.HoldMs ||
  391. prev.Detector.EmaAlpha != next.Detector.EmaAlpha ||
  392. prev.Detector.HysteresisDb != next.Detector.HysteresisDb ||
  393. prev.Detector.MinStableFrames != next.Detector.MinStableFrames ||
  394. prev.Detector.GapToleranceMs != next.Detector.GapToleranceMs ||
  395. prev.Detector.CFARMode != next.Detector.CFARMode ||
  396. prev.Detector.CFARGuardCells != next.Detector.CFARGuardCells ||
  397. prev.Detector.CFARTrainCells != next.Detector.CFARTrainCells ||
  398. prev.Detector.CFARRank != next.Detector.CFARRank ||
  399. prev.Detector.CFARScaleDb != next.Detector.CFARScaleDb ||
  400. prev.Detector.CFARWrapAround != next.Detector.CFARWrapAround ||
  401. prev.SampleRate != next.SampleRate ||
  402. prev.FFTSize != next.FFTSize
  403. windowChanged := prev.FFTSize != next.FFTSize
  404. var newDet *detector.Detector
  405. var newWindow []float64
  406. if detChanged {
  407. newDet = detector.New(next.Detector, next.SampleRate, next.FFTSize)
  408. }
  409. if windowChanged {
  410. newWindow = fftutil.Hann(next.FFTSize)
  411. }
  412. pushDSPUpdate(dspUpdates, dspUpdate{
  413. cfg: next,
  414. det: newDet,
  415. window: newWindow,
  416. dcBlock: next.DCBlock,
  417. iqBalance: next.IQBalance,
  418. useGPUFFT: next.UseGPUFFT,
  419. })
  420. _ = json.NewEncoder(w).Encode(next)
  421. default:
  422. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  423. }
  424. })
  425. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  426. w.Header().Set("Content-Type", "application/json")
  427. if r.Method != http.MethodPost {
  428. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  429. return
  430. }
  431. var update runtime.SettingsUpdate
  432. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  433. http.Error(w, "invalid json", http.StatusBadRequest)
  434. return
  435. }
  436. prev := cfgManager.Snapshot()
  437. next, err := cfgManager.ApplySettings(update)
  438. if err != nil {
  439. http.Error(w, err.Error(), http.StatusBadRequest)
  440. return
  441. }
  442. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  443. if err := srcMgr.ApplyConfig(next); err != nil {
  444. cfgManager.Replace(prev)
  445. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  446. return
  447. }
  448. }
  449. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  450. pushDSPUpdate(dspUpdates, dspUpdate{
  451. cfg: next,
  452. dcBlock: next.DCBlock,
  453. iqBalance: next.IQBalance,
  454. })
  455. }
  456. if err := config.Save(cfgPath, next); err != nil {
  457. log.Printf("config save failed: %v", err)
  458. }
  459. _ = json.NewEncoder(w).Encode(next)
  460. })
  461. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  462. w.Header().Set("Content-Type", "application/json")
  463. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  464. })
  465. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  466. w.Header().Set("Content-Type", "application/json")
  467. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  468. })
  469. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  470. w.Header().Set("Content-Type", "application/json")
  471. limit := 200
  472. if v := r.URL.Query().Get("limit"); v != "" {
  473. if parsed, err := strconv.Atoi(v); err == nil {
  474. limit = parsed
  475. }
  476. }
  477. var since time.Time
  478. if v := r.URL.Query().Get("since"); v != "" {
  479. if parsed, err := parseSince(v); err == nil {
  480. since = parsed
  481. } else {
  482. http.Error(w, "invalid since", http.StatusBadRequest)
  483. return
  484. }
  485. }
  486. snap := cfgManager.Snapshot()
  487. eventMu.RLock()
  488. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  489. eventMu.RUnlock()
  490. if err != nil {
  491. http.Error(w, "failed to read events", http.StatusInternalServerError)
  492. return
  493. }
  494. _ = json.NewEncoder(w).Encode(evs)
  495. })
  496. http.HandleFunc("/api/signals", func(w http.ResponseWriter, r *http.Request) {
  497. w.Header().Set("Content-Type", "application/json")
  498. if sigSnap == nil {
  499. _ = json.NewEncoder(w).Encode([]detector.Signal{})
  500. return
  501. }
  502. _ = json.NewEncoder(w).Encode(sigSnap.get())
  503. })
  504. http.HandleFunc("/api/decoders", func(w http.ResponseWriter, r *http.Request) {
  505. w.Header().Set("Content-Type", "application/json")
  506. _ = json.NewEncoder(w).Encode(decoderKeys(cfgManager.Snapshot()))
  507. })
  508. http.HandleFunc("/api/recordings", func(w http.ResponseWriter, r *http.Request) {
  509. if r.Method != http.MethodGet {
  510. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  511. return
  512. }
  513. w.Header().Set("Content-Type", "application/json")
  514. snap := cfgManager.Snapshot()
  515. list, err := recorder.ListRecordings(snap.Recorder.OutputDir)
  516. if err != nil {
  517. http.Error(w, "failed to list recordings", http.StatusInternalServerError)
  518. return
  519. }
  520. _ = json.NewEncoder(w).Encode(list)
  521. })
  522. http.HandleFunc("/api/recordings/", func(w http.ResponseWriter, r *http.Request) {
  523. w.Header().Set("Content-Type", "application/json")
  524. id := strings.TrimPrefix(r.URL.Path, "/api/recordings/")
  525. if id == "" {
  526. http.Error(w, "missing id", http.StatusBadRequest)
  527. return
  528. }
  529. snap := cfgManager.Snapshot()
  530. base := filepath.Clean(filepath.Join(snap.Recorder.OutputDir, id))
  531. if !strings.HasPrefix(base, filepath.Clean(snap.Recorder.OutputDir)) {
  532. http.Error(w, "invalid path", http.StatusBadRequest)
  533. return
  534. }
  535. if r.URL.Path == "/api/recordings/"+id+"/audio" {
  536. http.ServeFile(w, r, filepath.Join(base, "audio.wav"))
  537. return
  538. }
  539. if r.URL.Path == "/api/recordings/"+id+"/iq" {
  540. http.ServeFile(w, r, filepath.Join(base, "signal.cf32"))
  541. return
  542. }
  543. if r.URL.Path == "/api/recordings/"+id+"/decode" {
  544. mode := r.URL.Query().Get("mode")
  545. cmd := buildDecoderMap(cfgManager.Snapshot())[mode]
  546. if cmd == "" {
  547. http.Error(w, "decoder not configured", http.StatusBadRequest)
  548. return
  549. }
  550. meta, err := recorder.ReadMeta(filepath.Join(base, "meta.json"))
  551. if err != nil {
  552. http.Error(w, "meta read failed", http.StatusInternalServerError)
  553. return
  554. }
  555. audioPath := filepath.Join(base, "audio.wav")
  556. if _, errStat := os.Stat(audioPath); errStat != nil {
  557. audioPath = ""
  558. }
  559. res, err := recorder.DecodeOnDemand(cmd, filepath.Join(base, "signal.cf32"), meta.SampleRate, audioPath)
  560. if err != nil {
  561. http.Error(w, res.Stderr, http.StatusInternalServerError)
  562. return
  563. }
  564. _ = json.NewEncoder(w).Encode(res)
  565. return
  566. }
  567. // default: meta.json
  568. http.ServeFile(w, r, filepath.Join(base, "meta.json"))
  569. })
  570. http.HandleFunc("/api/demod", func(w http.ResponseWriter, r *http.Request) {
  571. if r.Method != http.MethodGet {
  572. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  573. return
  574. }
  575. q := r.URL.Query()
  576. freq, _ := strconv.ParseFloat(q.Get("freq"), 64)
  577. bw, _ := strconv.ParseFloat(q.Get("bw"), 64)
  578. sec, _ := strconv.Atoi(q.Get("sec"))
  579. if sec < 1 {
  580. sec = 1
  581. }
  582. if sec > 10 {
  583. sec = 10
  584. }
  585. mode := q.Get("mode")
  586. data, _, err := recMgr.DemodLive(freq, bw, mode, sec)
  587. if err != nil {
  588. http.Error(w, err.Error(), http.StatusBadRequest)
  589. return
  590. }
  591. w.Header().Set("Content-Type", "audio/wav")
  592. _, _ = w.Write(data)
  593. })
  594. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  595. server := &http.Server{Addr: cfg.WebAddr}
  596. go func() {
  597. log.Printf("web listening on %s", cfg.WebAddr)
  598. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  599. log.Fatalf("server: %v", err)
  600. }
  601. }()
  602. stop := make(chan os.Signal, 1)
  603. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  604. <-stop
  605. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  606. defer cancelTimeout()
  607. _ = server.Shutdown(ctxTimeout)
  608. }
  609. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) {
  610. defer func() {
  611. if r := recover(); r != nil {
  612. log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack())
  613. }
  614. }()
  615. ticker := time.NewTicker(cfg.FrameInterval())
  616. defer ticker.Stop()
  617. logTicker := time.NewTicker(5 * time.Second)
  618. defer logTicker.Stop()
  619. enc := json.NewEncoder(eventFile)
  620. dcBlocker := dsp.NewDCBlocker(0.995)
  621. dcEnabled := cfg.DCBlock
  622. iqEnabled := cfg.IQBalance
  623. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  624. useGPU := cfg.UseGPUFFT
  625. var gpuEngine *gpufft.Engine
  626. if useGPU && gpuState != nil {
  627. snap := gpuState.snapshot()
  628. if snap.Available {
  629. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  630. gpuEngine = eng
  631. gpuState.set(true, nil)
  632. } else {
  633. gpuState.set(false, err)
  634. useGPU = false
  635. }
  636. } else {
  637. gpuState.set(false, nil)
  638. useGPU = false
  639. }
  640. } else if gpuState != nil {
  641. gpuState.set(false, nil)
  642. }
  643. gotSamples := false
  644. for {
  645. select {
  646. case <-ctx.Done():
  647. return
  648. case <-logTicker.C:
  649. st := srcMgr.Stats()
  650. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  651. case upd := <-updates:
  652. prevFFT := cfg.FFTSize
  653. prevUseGPU := useGPU
  654. cfg = upd.cfg
  655. if rec != nil {
  656. rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  657. Enabled: cfg.Recorder.Enabled,
  658. MinSNRDb: cfg.Recorder.MinSNRDb,
  659. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  660. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  661. PrerollMs: cfg.Recorder.PrerollMs,
  662. RecordIQ: cfg.Recorder.RecordIQ,
  663. RecordAudio: cfg.Recorder.RecordAudio,
  664. AutoDemod: cfg.Recorder.AutoDemod,
  665. AutoDecode: cfg.Recorder.AutoDecode,
  666. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  667. OutputDir: cfg.Recorder.OutputDir,
  668. ClassFilter: cfg.Recorder.ClassFilter,
  669. RingSeconds: cfg.Recorder.RingSeconds,
  670. }, cfg.CenterHz, buildDecoderMap(cfg))
  671. }
  672. if upd.det != nil {
  673. det = upd.det
  674. }
  675. if upd.window != nil {
  676. window = upd.window
  677. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  678. }
  679. dcEnabled = upd.dcBlock
  680. iqEnabled = upd.iqBalance
  681. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  682. srcMgr.Flush()
  683. gotSamples = false
  684. if gpuEngine != nil {
  685. gpuEngine.Close()
  686. gpuEngine = nil
  687. }
  688. useGPU = cfg.UseGPUFFT
  689. if useGPU && gpuState != nil {
  690. snap := gpuState.snapshot()
  691. if snap.Available {
  692. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  693. gpuEngine = eng
  694. gpuState.set(true, nil)
  695. } else {
  696. gpuState.set(false, err)
  697. useGPU = false
  698. }
  699. } else {
  700. gpuState.set(false, nil)
  701. useGPU = false
  702. }
  703. } else if gpuState != nil {
  704. gpuState.set(false, nil)
  705. }
  706. }
  707. dcBlocker.Reset()
  708. ticker.Reset(cfg.FrameInterval())
  709. case <-ticker.C:
  710. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  711. if err != nil {
  712. log.Printf("read IQ: %v", err)
  713. if strings.Contains(err.Error(), "timeout") {
  714. if err := srcMgr.Restart(cfg); err != nil {
  715. log.Printf("restart failed: %v", err)
  716. }
  717. }
  718. continue
  719. }
  720. if rec != nil {
  721. rec.Ingest(time.Now(), iq)
  722. }
  723. if !gotSamples {
  724. log.Printf("received IQ samples")
  725. gotSamples = true
  726. }
  727. if dcEnabled {
  728. dcBlocker.Apply(iq)
  729. }
  730. if iqEnabled {
  731. dsp.IQBalance(iq)
  732. }
  733. var spectrum []float64
  734. if useGPU && gpuEngine != nil {
  735. if len(window) == len(iq) {
  736. for i := 0; i < len(iq); i++ {
  737. v := iq[i]
  738. w := float32(window[i])
  739. iq[i] = complex(real(v)*w, imag(v)*w)
  740. }
  741. }
  742. out, err := gpuEngine.Exec(iq)
  743. if err != nil {
  744. if gpuState != nil {
  745. gpuState.set(false, err)
  746. }
  747. useGPU = false
  748. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  749. } else {
  750. spectrum = fftutil.SpectrumFromFFT(out)
  751. }
  752. } else {
  753. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  754. }
  755. for i := range spectrum {
  756. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  757. spectrum[i] = -200
  758. }
  759. }
  760. now := time.Now()
  761. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  762. thresholds := det.LastThresholds()
  763. noiseFloor := det.LastNoiseFloor()
  764. // enrich classification with temporal IQ features on per-signal snippet
  765. if len(iq) > 0 {
  766. for i := range signals {
  767. snip := extractSignalIQ(iq, cfg.SampleRate, cfg.CenterHz, signals[i].CenterHz, signals[i].BWHz)
  768. cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, snip)
  769. signals[i].Class = cls
  770. }
  771. det.UpdateClasses(signals)
  772. }
  773. if sigSnap != nil {
  774. sigSnap.set(signals)
  775. }
  776. eventMu.Lock()
  777. for _, ev := range finished {
  778. _ = enc.Encode(ev)
  779. }
  780. eventMu.Unlock()
  781. if rec != nil && len(finished) > 0 {
  782. evCopy := make([]detector.Event, len(finished))
  783. copy(evCopy, finished)
  784. go rec.OnEvents(evCopy)
  785. }
  786. var debugInfo *SpectrumDebug
  787. if len(thresholds) > 0 || len(signals) > 0 || noiseFloor != 0 {
  788. scoreDebug := make([]map[string]any, 0, len(signals))
  789. for _, s := range signals {
  790. if s.Class == nil || len(s.Class.Scores) == 0 {
  791. scoreDebug = append(scoreDebug, map[string]any{
  792. "center_hz": s.CenterHz,
  793. "class": nil,
  794. })
  795. continue
  796. }
  797. scores := make(map[string]float64, len(s.Class.Scores))
  798. for k, v := range s.Class.Scores {
  799. scores[string(k)] = v
  800. }
  801. scoreDebug = append(scoreDebug, map[string]any{
  802. "center_hz": s.CenterHz,
  803. "mod_type": s.Class.ModType,
  804. "confidence": s.Class.Confidence,
  805. "second_best": s.Class.SecondBest,
  806. "scores": scores,
  807. })
  808. }
  809. debugInfo = &SpectrumDebug{
  810. Thresholds: thresholds,
  811. NoiseFloor: noiseFloor,
  812. Scores: scoreDebug,
  813. }
  814. }
  815. h.broadcast(SpectrumFrame{
  816. Timestamp: now.UnixMilli(),
  817. CenterHz: cfg.CenterHz,
  818. SampleHz: cfg.SampleRate,
  819. FFTSize: cfg.FFTSize,
  820. Spectrum: spectrum,
  821. Signals: signals,
  822. Debug: debugInfo,
  823. })
  824. }
  825. }
  826. }
  827. func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  828. if raw == "" {
  829. return fallback
  830. }
  831. if d, err := time.ParseDuration(raw); err == nil {
  832. return d
  833. }
  834. return fallback
  835. }
  836. func buildDecoderMap(cfg config.Config) map[string]string {
  837. out := map[string]string{}
  838. if cfg.Decoder.FT8Cmd != "" {
  839. out["FT8"] = cfg.Decoder.FT8Cmd
  840. }
  841. if cfg.Decoder.WSPRCmd != "" {
  842. out["WSPR"] = cfg.Decoder.WSPRCmd
  843. }
  844. if cfg.Decoder.DMRCmd != "" {
  845. out["DMR"] = cfg.Decoder.DMRCmd
  846. }
  847. if cfg.Decoder.DStarCmd != "" {
  848. out["D-STAR"] = cfg.Decoder.DStarCmd
  849. }
  850. if cfg.Decoder.FSKCmd != "" {
  851. out["FSK"] = cfg.Decoder.FSKCmd
  852. }
  853. if cfg.Decoder.PSKCmd != "" {
  854. out["PSK"] = cfg.Decoder.PSKCmd
  855. }
  856. return out
  857. }
  858. func decoderKeys(cfg config.Config) []string {
  859. m := buildDecoderMap(cfg)
  860. keys := make([]string, 0, len(m))
  861. for k := range m {
  862. keys = append(keys, k)
  863. }
  864. sort.Strings(keys)
  865. return keys
  866. }
  867. func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
  868. if len(iq) == 0 || sampleRate <= 0 {
  869. return nil
  870. }
  871. offset := sigHz - centerHz
  872. shifted := dsp.FreqShift(iq, sampleRate, offset)
  873. cutoff := bwHz / 2
  874. if cutoff < 200 {
  875. cutoff = 200
  876. }
  877. if cutoff > float64(sampleRate)/2-1 {
  878. cutoff = float64(sampleRate)/2 - 1
  879. }
  880. taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
  881. filtered := dsp.ApplyFIR(shifted, taps)
  882. decim := sampleRate / 200000
  883. if decim < 1 {
  884. decim = 1
  885. }
  886. return dsp.Decimate(filtered, decim)
  887. }
  888. func parseSince(raw string) (time.Time, error) {
  889. if raw == "" {
  890. return time.Time{}, nil
  891. }
  892. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  893. if ms > 1e12 {
  894. return time.UnixMilli(ms), nil
  895. }
  896. return time.Unix(ms, 0), nil
  897. }
  898. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  899. return t, nil
  900. }
  901. return time.Parse(time.RFC3339, raw)
  902. }