Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

954 wiersze
26KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "math"
  8. "net/http"
  9. "os"
  10. "os/signal"
  11. "path/filepath"
  12. "runtime/debug"
  13. "sort"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "syscall"
  18. "time"
  19. "github.com/gorilla/websocket"
  20. "sdr-visual-suite/internal/classifier"
  21. "sdr-visual-suite/internal/config"
  22. "sdr-visual-suite/internal/detector"
  23. "sdr-visual-suite/internal/dsp"
  24. "sdr-visual-suite/internal/events"
  25. fftutil "sdr-visual-suite/internal/fft"
  26. "sdr-visual-suite/internal/fft/gpufft"
  27. "sdr-visual-suite/internal/mock"
  28. "sdr-visual-suite/internal/recorder"
  29. "sdr-visual-suite/internal/runtime"
  30. "sdr-visual-suite/internal/sdr"
  31. "sdr-visual-suite/internal/sdrplay"
  32. )
  33. type SpectrumFrame struct {
  34. Timestamp int64 `json:"ts"`
  35. CenterHz float64 `json:"center_hz"`
  36. SampleHz int `json:"sample_rate"`
  37. FFTSize int `json:"fft_size"`
  38. Spectrum []float64 `json:"spectrum_db"`
  39. Signals []detector.Signal `json:"signals"`
  40. }
  41. type client struct {
  42. conn *websocket.Conn
  43. send chan []byte
  44. done chan struct{}
  45. closeOnce sync.Once
  46. }
  47. type hub struct {
  48. mu sync.Mutex
  49. clients map[*client]struct{}
  50. frameCnt int64
  51. lastLogTs time.Time
  52. }
  // gpuStatus describes the GPU FFT backend. The exported fields are what gets
  // JSON-encoded; mu guards them and is not part of the encoded output.
  type gpuStatus struct {
  	mu sync.RWMutex

  	// Available reports whether a GPU backend can be used at all.
  	Available bool `json:"available"`
  	// Active reports whether the GPU backend is currently in use.
  	Active bool `json:"active"`
  	// Error holds the text of the last GPU error, or "" when there is none.
  	Error string `json:"error"`
  }
  59. type signalSnapshot struct {
  60. mu sync.RWMutex
  61. signals []detector.Signal
  62. }
  63. func (s *signalSnapshot) set(sig []detector.Signal) {
  64. s.mu.Lock()
  65. defer s.mu.Unlock()
  66. s.signals = append([]detector.Signal(nil), sig...)
  67. }
  68. func (s *signalSnapshot) get() []detector.Signal {
  69. s.mu.RLock()
  70. defer s.mu.RUnlock()
  71. return append([]detector.Signal(nil), s.signals...)
  72. }
  73. func (g *gpuStatus) set(active bool, err error) {
  74. g.mu.Lock()
  75. defer g.mu.Unlock()
  76. g.Active = active
  77. if err != nil {
  78. g.Error = err.Error()
  79. } else {
  80. g.Error = ""
  81. }
  82. }
  83. func (g *gpuStatus) snapshot() gpuStatus {
  84. g.mu.RLock()
  85. defer g.mu.RUnlock()
  86. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  87. }
  88. func newHub() *hub {
  89. return &hub{clients: map[*client]struct{}{}, lastLogTs: time.Now()}
  90. }
  91. func (h *hub) add(c *client) {
  92. h.mu.Lock()
  93. defer h.mu.Unlock()
  94. h.clients[c] = struct{}{}
  95. log.Printf("ws connected (%d clients)", len(h.clients))
  96. }
  97. func (h *hub) remove(c *client) {
  98. c.closeOnce.Do(func() { close(c.done) })
  99. h.mu.Lock()
  100. defer h.mu.Unlock()
  101. delete(h.clients, c)
  102. log.Printf("ws disconnected (%d clients)", len(h.clients))
  103. }
  104. func (h *hub) broadcast(frame SpectrumFrame) {
  105. b, err := json.Marshal(frame)
  106. if err != nil {
  107. log.Printf("marshal frame: %v", err)
  108. return
  109. }
  110. h.mu.Lock()
  111. clients := make([]*client, 0, len(h.clients))
  112. for c := range h.clients {
  113. clients = append(clients, c)
  114. }
  115. h.mu.Unlock()
  116. for _, c := range clients {
  117. select {
  118. case c.send <- b:
  119. default:
  120. h.remove(c)
  121. }
  122. }
  123. h.frameCnt++
  124. if time.Since(h.lastLogTs) > 2*time.Second {
  125. h.lastLogTs = time.Now()
  126. log.Printf("broadcast frames=%d clients=%d", h.frameCnt, len(clients))
  127. }
  128. }
  129. type sourceManager struct {
  130. mu sync.RWMutex
  131. src sdr.Source
  132. newSource func(cfg config.Config) (sdr.Source, error)
  133. }
  134. func (m *sourceManager) Restart(cfg config.Config) error {
  135. m.mu.Lock()
  136. defer m.mu.Unlock()
  137. old := m.src
  138. _ = old.Stop()
  139. next, err := m.newSource(cfg)
  140. if err != nil {
  141. _ = old.Start()
  142. m.src = old
  143. return err
  144. }
  145. if err := next.Start(); err != nil {
  146. _ = next.Stop()
  147. _ = old.Start()
  148. m.src = old
  149. return err
  150. }
  151. m.src = next
  152. return nil
  153. }
  154. func (m *sourceManager) Stats() sdr.SourceStats {
  155. m.mu.RLock()
  156. defer m.mu.RUnlock()
  157. if sp, ok := m.src.(sdr.StatsProvider); ok {
  158. return sp.Stats()
  159. }
  160. return sdr.SourceStats{}
  161. }
  162. func (m *sourceManager) Flush() {
  163. m.mu.RLock()
  164. defer m.mu.RUnlock()
  165. if fl, ok := m.src.(sdr.Flushable); ok {
  166. fl.Flush()
  167. }
  168. }
  169. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  170. return &sourceManager{src: src, newSource: newSource}
  171. }
  172. func (m *sourceManager) Start() error {
  173. m.mu.RLock()
  174. defer m.mu.RUnlock()
  175. return m.src.Start()
  176. }
  177. func (m *sourceManager) Stop() error {
  178. m.mu.RLock()
  179. defer m.mu.RUnlock()
  180. return m.src.Stop()
  181. }
  182. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  183. m.mu.RLock()
  184. defer m.mu.RUnlock()
  185. return m.src.ReadIQ(n)
  186. }
  187. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  188. m.mu.Lock()
  189. defer m.mu.Unlock()
  190. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  191. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  192. return nil
  193. }
  194. }
  195. old := m.src
  196. _ = old.Stop()
  197. next, err := m.newSource(cfg)
  198. if err != nil {
  199. _ = old.Start()
  200. return err
  201. }
  202. if err := next.Start(); err != nil {
  203. _ = next.Stop()
  204. _ = old.Start()
  205. return err
  206. }
  207. m.src = next
  208. return nil
  209. }
  210. type dspUpdate struct {
  211. cfg config.Config
  212. det *detector.Detector
  213. window []float64
  214. dcBlock bool
  215. iqBalance bool
  216. useGPUFFT bool
  217. }
  218. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  219. select {
  220. case ch <- update:
  221. default:
  222. select {
  223. case <-ch:
  224. default:
  225. }
  226. ch <- update
  227. }
  228. }
  229. func main() {
  230. var cfgPath string
  231. var mockFlag bool
  232. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  233. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  234. flag.Parse()
  235. cfg, err := config.Load(cfgPath)
  236. if err != nil {
  237. log.Fatalf("load config: %v", err)
  238. }
  239. cfgManager := runtime.New(cfg)
  240. gpuState := &gpuStatus{Available: gpufft.Available()}
  241. newSource := func(cfg config.Config) (sdr.Source, error) {
  242. if mockFlag {
  243. src := mock.New(cfg.SampleRate)
  244. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  245. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  246. }
  247. return src, nil
  248. }
  249. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  250. if err != nil {
  251. return nil, err
  252. }
  253. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  254. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  255. }
  256. return src, nil
  257. }
  258. src, err := newSource(cfg)
  259. if err != nil {
  260. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  261. }
  262. srcMgr := newSourceManager(src, newSource)
  263. if err := srcMgr.Start(); err != nil {
  264. log.Fatalf("source start: %v", err)
  265. }
  266. defer srcMgr.Stop()
  267. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  268. log.Fatalf("event path: %v", err)
  269. }
  270. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  271. if err != nil {
  272. log.Fatalf("open events: %v", err)
  273. }
  274. defer eventFile.Close()
  275. eventMu := &sync.RWMutex{}
  276. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  277. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  278. time.Duration(cfg.Detector.HoldMs)*time.Millisecond,
  279. cfg.Detector.EmaAlpha,
  280. cfg.Detector.HysteresisDb,
  281. cfg.Detector.MinStableFrames,
  282. time.Duration(cfg.Detector.GapToleranceMs)*time.Millisecond,
  283. cfg.Detector.CFARMode,
  284. cfg.Detector.CFARGuardCells,
  285. cfg.Detector.CFARTrainCells,
  286. cfg.Detector.CFARRank,
  287. cfg.Detector.CFARScaleDb,
  288. cfg.Detector.CFARWrapAround)
  289. window := fftutil.Hann(cfg.FFTSize)
  290. h := newHub()
  291. dspUpdates := make(chan dspUpdate, 1)
  292. ctx, cancel := context.WithCancel(context.Background())
  293. defer cancel()
  294. decodeMap := buildDecoderMap(cfg)
  295. recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  296. Enabled: cfg.Recorder.Enabled,
  297. MinSNRDb: cfg.Recorder.MinSNRDb,
  298. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  299. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  300. PrerollMs: cfg.Recorder.PrerollMs,
  301. RecordIQ: cfg.Recorder.RecordIQ,
  302. RecordAudio: cfg.Recorder.RecordAudio,
  303. AutoDemod: cfg.Recorder.AutoDemod,
  304. AutoDecode: cfg.Recorder.AutoDecode,
  305. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  306. OutputDir: cfg.Recorder.OutputDir,
  307. ClassFilter: cfg.Recorder.ClassFilter,
  308. RingSeconds: cfg.Recorder.RingSeconds,
  309. }, cfg.CenterHz, decodeMap)
  310. sigSnap := &signalSnapshot{}
  311. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr, sigSnap)
  312. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  313. origin := r.Header.Get("Origin")
  314. if origin == "" || origin == "null" {
  315. return true
  316. }
  317. // allow same-host or any local IP
  318. return true
  319. }}
  320. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  321. conn, err := upgrader.Upgrade(w, r, nil)
  322. if err != nil {
  323. log.Printf("ws upgrade failed: %v (origin: %s)", err, r.Header.Get("Origin"))
  324. return
  325. }
  326. c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})}
  327. h.add(c)
  328. defer func() {
  329. h.remove(c)
  330. _ = conn.Close()
  331. }()
  332. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  333. conn.SetPongHandler(func(string) error {
  334. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  335. return nil
  336. })
  337. go func() {
  338. ping := time.NewTicker(30 * time.Second)
  339. defer ping.Stop()
  340. for {
  341. select {
  342. case msg, ok := <-c.send:
  343. if !ok {
  344. return
  345. }
  346. _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  347. if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
  348. return
  349. }
  350. case <-ping.C:
  351. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  352. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  353. log.Printf("ws ping error: %v", err)
  354. return
  355. }
  356. }
  357. }
  358. }()
  359. for {
  360. _, _, err := conn.ReadMessage()
  361. if err != nil {
  362. return
  363. }
  364. }
  365. })
  366. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  367. w.Header().Set("Content-Type", "application/json")
  368. switch r.Method {
  369. case http.MethodGet:
  370. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  371. case http.MethodPost:
  372. var update runtime.ConfigUpdate
  373. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  374. http.Error(w, "invalid json", http.StatusBadRequest)
  375. return
  376. }
  377. prev := cfgManager.Snapshot()
  378. next, err := cfgManager.ApplyConfig(update)
  379. if err != nil {
  380. http.Error(w, err.Error(), http.StatusBadRequest)
  381. return
  382. }
  383. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  384. if sourceChanged {
  385. if err := srcMgr.ApplyConfig(next); err != nil {
  386. cfgManager.Replace(prev)
  387. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  388. return
  389. }
  390. }
  391. if err := config.Save(cfgPath, next); err != nil {
  392. log.Printf("config save failed: %v", err)
  393. }
  394. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  395. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  396. prev.Detector.HoldMs != next.Detector.HoldMs ||
  397. prev.Detector.EmaAlpha != next.Detector.EmaAlpha ||
  398. prev.Detector.HysteresisDb != next.Detector.HysteresisDb ||
  399. prev.Detector.MinStableFrames != next.Detector.MinStableFrames ||
  400. prev.Detector.GapToleranceMs != next.Detector.GapToleranceMs ||
  401. prev.Detector.CFARMode != next.Detector.CFARMode ||
  402. prev.Detector.CFARGuardCells != next.Detector.CFARGuardCells ||
  403. prev.Detector.CFARTrainCells != next.Detector.CFARTrainCells ||
  404. prev.Detector.CFARRank != next.Detector.CFARRank ||
  405. prev.Detector.CFARScaleDb != next.Detector.CFARScaleDb ||
  406. prev.Detector.CFARWrapAround != next.Detector.CFARWrapAround ||
  407. prev.SampleRate != next.SampleRate ||
  408. prev.FFTSize != next.FFTSize
  409. windowChanged := prev.FFTSize != next.FFTSize
  410. var newDet *detector.Detector
  411. var newWindow []float64
  412. if detChanged {
  413. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  414. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  415. time.Duration(next.Detector.HoldMs)*time.Millisecond,
  416. next.Detector.EmaAlpha,
  417. next.Detector.HysteresisDb,
  418. next.Detector.MinStableFrames,
  419. time.Duration(next.Detector.GapToleranceMs)*time.Millisecond,
  420. next.Detector.CFARMode,
  421. next.Detector.CFARGuardCells,
  422. next.Detector.CFARTrainCells,
  423. next.Detector.CFARRank,
  424. next.Detector.CFARScaleDb,
  425. next.Detector.CFARWrapAround)
  426. }
  427. if windowChanged {
  428. newWindow = fftutil.Hann(next.FFTSize)
  429. }
  430. pushDSPUpdate(dspUpdates, dspUpdate{
  431. cfg: next,
  432. det: newDet,
  433. window: newWindow,
  434. dcBlock: next.DCBlock,
  435. iqBalance: next.IQBalance,
  436. useGPUFFT: next.UseGPUFFT,
  437. })
  438. _ = json.NewEncoder(w).Encode(next)
  439. default:
  440. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  441. }
  442. })
  443. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  444. w.Header().Set("Content-Type", "application/json")
  445. if r.Method != http.MethodPost {
  446. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  447. return
  448. }
  449. var update runtime.SettingsUpdate
  450. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  451. http.Error(w, "invalid json", http.StatusBadRequest)
  452. return
  453. }
  454. prev := cfgManager.Snapshot()
  455. next, err := cfgManager.ApplySettings(update)
  456. if err != nil {
  457. http.Error(w, err.Error(), http.StatusBadRequest)
  458. return
  459. }
  460. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  461. if err := srcMgr.ApplyConfig(next); err != nil {
  462. cfgManager.Replace(prev)
  463. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  464. return
  465. }
  466. }
  467. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  468. pushDSPUpdate(dspUpdates, dspUpdate{
  469. cfg: next,
  470. dcBlock: next.DCBlock,
  471. iqBalance: next.IQBalance,
  472. })
  473. }
  474. if err := config.Save(cfgPath, next); err != nil {
  475. log.Printf("config save failed: %v", err)
  476. }
  477. _ = json.NewEncoder(w).Encode(next)
  478. })
  479. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  480. w.Header().Set("Content-Type", "application/json")
  481. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  482. })
  483. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  484. w.Header().Set("Content-Type", "application/json")
  485. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  486. })
  487. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  488. w.Header().Set("Content-Type", "application/json")
  489. limit := 200
  490. if v := r.URL.Query().Get("limit"); v != "" {
  491. if parsed, err := strconv.Atoi(v); err == nil {
  492. limit = parsed
  493. }
  494. }
  495. var since time.Time
  496. if v := r.URL.Query().Get("since"); v != "" {
  497. if parsed, err := parseSince(v); err == nil {
  498. since = parsed
  499. } else {
  500. http.Error(w, "invalid since", http.StatusBadRequest)
  501. return
  502. }
  503. }
  504. snap := cfgManager.Snapshot()
  505. eventMu.RLock()
  506. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  507. eventMu.RUnlock()
  508. if err != nil {
  509. http.Error(w, "failed to read events", http.StatusInternalServerError)
  510. return
  511. }
  512. _ = json.NewEncoder(w).Encode(evs)
  513. })
  514. http.HandleFunc("/api/signals", func(w http.ResponseWriter, r *http.Request) {
  515. w.Header().Set("Content-Type", "application/json")
  516. if sigSnap == nil {
  517. _ = json.NewEncoder(w).Encode([]detector.Signal{})
  518. return
  519. }
  520. _ = json.NewEncoder(w).Encode(sigSnap.get())
  521. })
  522. http.HandleFunc("/api/decoders", func(w http.ResponseWriter, r *http.Request) {
  523. w.Header().Set("Content-Type", "application/json")
  524. _ = json.NewEncoder(w).Encode(decoderKeys(cfgManager.Snapshot()))
  525. })
  526. http.HandleFunc("/api/recordings", func(w http.ResponseWriter, r *http.Request) {
  527. if r.Method != http.MethodGet {
  528. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  529. return
  530. }
  531. w.Header().Set("Content-Type", "application/json")
  532. snap := cfgManager.Snapshot()
  533. list, err := recorder.ListRecordings(snap.Recorder.OutputDir)
  534. if err != nil {
  535. http.Error(w, "failed to list recordings", http.StatusInternalServerError)
  536. return
  537. }
  538. _ = json.NewEncoder(w).Encode(list)
  539. })
  540. http.HandleFunc("/api/recordings/", func(w http.ResponseWriter, r *http.Request) {
  541. w.Header().Set("Content-Type", "application/json")
  542. id := strings.TrimPrefix(r.URL.Path, "/api/recordings/")
  543. if id == "" {
  544. http.Error(w, "missing id", http.StatusBadRequest)
  545. return
  546. }
  547. snap := cfgManager.Snapshot()
  548. base := filepath.Clean(filepath.Join(snap.Recorder.OutputDir, id))
  549. if !strings.HasPrefix(base, filepath.Clean(snap.Recorder.OutputDir)) {
  550. http.Error(w, "invalid path", http.StatusBadRequest)
  551. return
  552. }
  553. if r.URL.Path == "/api/recordings/"+id+"/audio" {
  554. http.ServeFile(w, r, filepath.Join(base, "audio.wav"))
  555. return
  556. }
  557. if r.URL.Path == "/api/recordings/"+id+"/iq" {
  558. http.ServeFile(w, r, filepath.Join(base, "signal.cf32"))
  559. return
  560. }
  561. if r.URL.Path == "/api/recordings/"+id+"/decode" {
  562. mode := r.URL.Query().Get("mode")
  563. cmd := buildDecoderMap(cfgManager.Snapshot())[mode]
  564. if cmd == "" {
  565. http.Error(w, "decoder not configured", http.StatusBadRequest)
  566. return
  567. }
  568. meta, err := recorder.ReadMeta(filepath.Join(base, "meta.json"))
  569. if err != nil {
  570. http.Error(w, "meta read failed", http.StatusInternalServerError)
  571. return
  572. }
  573. audioPath := filepath.Join(base, "audio.wav")
  574. if _, errStat := os.Stat(audioPath); errStat != nil {
  575. audioPath = ""
  576. }
  577. res, err := recorder.DecodeOnDemand(cmd, filepath.Join(base, "signal.cf32"), meta.SampleRate, audioPath)
  578. if err != nil {
  579. http.Error(w, res.Stderr, http.StatusInternalServerError)
  580. return
  581. }
  582. _ = json.NewEncoder(w).Encode(res)
  583. return
  584. }
  585. // default: meta.json
  586. http.ServeFile(w, r, filepath.Join(base, "meta.json"))
  587. })
  588. http.HandleFunc("/api/demod", func(w http.ResponseWriter, r *http.Request) {
  589. if r.Method != http.MethodGet {
  590. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  591. return
  592. }
  593. q := r.URL.Query()
  594. freq, _ := strconv.ParseFloat(q.Get("freq"), 64)
  595. bw, _ := strconv.ParseFloat(q.Get("bw"), 64)
  596. sec, _ := strconv.Atoi(q.Get("sec"))
  597. if sec < 1 {
  598. sec = 1
  599. }
  600. if sec > 10 {
  601. sec = 10
  602. }
  603. mode := q.Get("mode")
  604. data, _, err := recMgr.DemodLive(freq, bw, mode, sec)
  605. if err != nil {
  606. http.Error(w, err.Error(), http.StatusBadRequest)
  607. return
  608. }
  609. w.Header().Set("Content-Type", "audio/wav")
  610. _, _ = w.Write(data)
  611. })
  612. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  613. server := &http.Server{Addr: cfg.WebAddr}
  614. go func() {
  615. log.Printf("web listening on %s", cfg.WebAddr)
  616. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  617. log.Fatalf("server: %v", err)
  618. }
  619. }()
  620. stop := make(chan os.Signal, 1)
  621. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  622. <-stop
  623. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  624. defer cancelTimeout()
  625. _ = server.Shutdown(ctxTimeout)
  626. }
  627. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) {
  628. defer func() {
  629. if r := recover(); r != nil {
  630. log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack())
  631. }
  632. }()
  633. ticker := time.NewTicker(cfg.FrameInterval())
  634. defer ticker.Stop()
  635. logTicker := time.NewTicker(5 * time.Second)
  636. defer logTicker.Stop()
  637. enc := json.NewEncoder(eventFile)
  638. dcBlocker := dsp.NewDCBlocker(0.995)
  639. dcEnabled := cfg.DCBlock
  640. iqEnabled := cfg.IQBalance
  641. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  642. useGPU := cfg.UseGPUFFT
  643. var gpuEngine *gpufft.Engine
  644. if useGPU && gpuState != nil {
  645. snap := gpuState.snapshot()
  646. if snap.Available {
  647. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  648. gpuEngine = eng
  649. gpuState.set(true, nil)
  650. } else {
  651. gpuState.set(false, err)
  652. useGPU = false
  653. }
  654. } else {
  655. gpuState.set(false, nil)
  656. useGPU = false
  657. }
  658. } else if gpuState != nil {
  659. gpuState.set(false, nil)
  660. }
  661. gotSamples := false
  662. for {
  663. select {
  664. case <-ctx.Done():
  665. return
  666. case <-logTicker.C:
  667. st := srcMgr.Stats()
  668. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  669. case upd := <-updates:
  670. prevFFT := cfg.FFTSize
  671. prevUseGPU := useGPU
  672. cfg = upd.cfg
  673. if rec != nil {
  674. rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  675. Enabled: cfg.Recorder.Enabled,
  676. MinSNRDb: cfg.Recorder.MinSNRDb,
  677. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  678. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  679. PrerollMs: cfg.Recorder.PrerollMs,
  680. RecordIQ: cfg.Recorder.RecordIQ,
  681. RecordAudio: cfg.Recorder.RecordAudio,
  682. AutoDemod: cfg.Recorder.AutoDemod,
  683. AutoDecode: cfg.Recorder.AutoDecode,
  684. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  685. OutputDir: cfg.Recorder.OutputDir,
  686. ClassFilter: cfg.Recorder.ClassFilter,
  687. RingSeconds: cfg.Recorder.RingSeconds,
  688. }, cfg.CenterHz, buildDecoderMap(cfg))
  689. }
  690. if upd.det != nil {
  691. det = upd.det
  692. }
  693. if upd.window != nil {
  694. window = upd.window
  695. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  696. }
  697. dcEnabled = upd.dcBlock
  698. iqEnabled = upd.iqBalance
  699. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  700. srcMgr.Flush()
  701. gotSamples = false
  702. if gpuEngine != nil {
  703. gpuEngine.Close()
  704. gpuEngine = nil
  705. }
  706. useGPU = cfg.UseGPUFFT
  707. if useGPU && gpuState != nil {
  708. snap := gpuState.snapshot()
  709. if snap.Available {
  710. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  711. gpuEngine = eng
  712. gpuState.set(true, nil)
  713. } else {
  714. gpuState.set(false, err)
  715. useGPU = false
  716. }
  717. } else {
  718. gpuState.set(false, nil)
  719. useGPU = false
  720. }
  721. } else if gpuState != nil {
  722. gpuState.set(false, nil)
  723. }
  724. }
  725. dcBlocker.Reset()
  726. ticker.Reset(cfg.FrameInterval())
  727. case <-ticker.C:
  728. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  729. if err != nil {
  730. log.Printf("read IQ: %v", err)
  731. if strings.Contains(err.Error(), "timeout") {
  732. if err := srcMgr.Restart(cfg); err != nil {
  733. log.Printf("restart failed: %v", err)
  734. }
  735. }
  736. continue
  737. }
  738. if rec != nil {
  739. rec.Ingest(time.Now(), iq)
  740. }
  741. if !gotSamples {
  742. log.Printf("received IQ samples")
  743. gotSamples = true
  744. }
  745. if dcEnabled {
  746. dcBlocker.Apply(iq)
  747. }
  748. if iqEnabled {
  749. dsp.IQBalance(iq)
  750. }
  751. var spectrum []float64
  752. if useGPU && gpuEngine != nil {
  753. if len(window) == len(iq) {
  754. for i := 0; i < len(iq); i++ {
  755. v := iq[i]
  756. w := float32(window[i])
  757. iq[i] = complex(real(v)*w, imag(v)*w)
  758. }
  759. }
  760. out, err := gpuEngine.Exec(iq)
  761. if err != nil {
  762. if gpuState != nil {
  763. gpuState.set(false, err)
  764. }
  765. useGPU = false
  766. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  767. } else {
  768. spectrum = fftutil.SpectrumFromFFT(out)
  769. }
  770. } else {
  771. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  772. }
  773. for i := range spectrum {
  774. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  775. spectrum[i] = -200
  776. }
  777. }
  778. now := time.Now()
  779. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  780. // enrich classification with temporal IQ features on per-signal snippet
  781. if len(iq) > 0 {
  782. for i := range signals {
  783. snip := extractSignalIQ(iq, cfg.SampleRate, cfg.CenterHz, signals[i].CenterHz, signals[i].BWHz)
  784. cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, snip)
  785. signals[i].Class = cls
  786. }
  787. det.UpdateClasses(signals)
  788. }
  789. if sigSnap != nil {
  790. sigSnap.set(signals)
  791. }
  792. eventMu.Lock()
  793. for _, ev := range finished {
  794. _ = enc.Encode(ev)
  795. }
  796. eventMu.Unlock()
  797. if rec != nil && len(finished) > 0 {
  798. evCopy := make([]detector.Event, len(finished))
  799. copy(evCopy, finished)
  800. go rec.OnEvents(evCopy)
  801. }
  802. h.broadcast(SpectrumFrame{
  803. Timestamp: now.UnixMilli(),
  804. CenterHz: cfg.CenterHz,
  805. SampleHz: cfg.SampleRate,
  806. FFTSize: cfg.FFTSize,
  807. Spectrum: spectrum,
  808. Signals: signals,
  809. })
  810. }
  811. }
  812. }
  // mustParseDuration parses raw with time.ParseDuration and returns fallback
  // when raw is empty or cannot be parsed. Despite its name it never panics.
  func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  	if raw != "" {
  		if parsed, err := time.ParseDuration(raw); err == nil {
  			return parsed
  		}
  	}
  	return fallback
  }
  822. func buildDecoderMap(cfg config.Config) map[string]string {
  823. out := map[string]string{}
  824. if cfg.Decoder.FT8Cmd != "" {
  825. out["FT8"] = cfg.Decoder.FT8Cmd
  826. }
  827. if cfg.Decoder.WSPRCmd != "" {
  828. out["WSPR"] = cfg.Decoder.WSPRCmd
  829. }
  830. if cfg.Decoder.DMRCmd != "" {
  831. out["DMR"] = cfg.Decoder.DMRCmd
  832. }
  833. if cfg.Decoder.DStarCmd != "" {
  834. out["D-STAR"] = cfg.Decoder.DStarCmd
  835. }
  836. if cfg.Decoder.FSKCmd != "" {
  837. out["FSK"] = cfg.Decoder.FSKCmd
  838. }
  839. if cfg.Decoder.PSKCmd != "" {
  840. out["PSK"] = cfg.Decoder.PSKCmd
  841. }
  842. return out
  843. }
  844. func decoderKeys(cfg config.Config) []string {
  845. m := buildDecoderMap(cfg)
  846. keys := make([]string, 0, len(m))
  847. for k := range m {
  848. keys = append(keys, k)
  849. }
  850. sort.Strings(keys)
  851. return keys
  852. }
  853. func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
  854. if len(iq) == 0 || sampleRate <= 0 {
  855. return nil
  856. }
  857. offset := sigHz - centerHz
  858. shifted := dsp.FreqShift(iq, sampleRate, offset)
  859. cutoff := bwHz / 2
  860. if cutoff < 200 {
  861. cutoff = 200
  862. }
  863. if cutoff > float64(sampleRate)/2-1 {
  864. cutoff = float64(sampleRate)/2 - 1
  865. }
  866. taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
  867. filtered := dsp.ApplyFIR(shifted, taps)
  868. decim := sampleRate / 200000
  869. if decim < 1 {
  870. decim = 1
  871. }
  872. return dsp.Decimate(filtered, decim)
  873. }
  // parseSince converts the "since" query value into a time.
  //
  // An empty string yields the zero time. An integer is taken as Unix
  // milliseconds when it is greater than 1e12 and as Unix seconds otherwise.
  // Anything else is parsed as RFC 3339, first with and then without fractional
  // seconds; the error of the last attempt is returned when both fail.
  func parseSince(raw string) (time.Time, error) {
  	if raw == "" {
  		return time.Time{}, nil
  	}

  	if n, convErr := strconv.ParseInt(raw, 10, 64); convErr == nil {
  		if n > 1e12 {
  			return time.UnixMilli(n), nil
  		}
  		return time.Unix(n, 0), nil
  	}

  	var (
  		parsed time.Time
  		err    error
  	)
  	for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
  		if parsed, err = time.Parse(layout, raw); err == nil {
  			return parsed, nil
  		}
  	}
  	return parsed, err
  }