Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

960 rindas
26KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "math"
  8. "net/http"
  9. "os"
  10. "os/signal"
  11. "path/filepath"
  12. "runtime/debug"
  13. "sort"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "syscall"
  18. "time"
  19. "github.com/gorilla/websocket"
  20. "sdr-visual-suite/internal/classifier"
  21. "sdr-visual-suite/internal/config"
  22. "sdr-visual-suite/internal/detector"
  23. "sdr-visual-suite/internal/dsp"
  24. "sdr-visual-suite/internal/events"
  25. fftutil "sdr-visual-suite/internal/fft"
  26. "sdr-visual-suite/internal/fft/gpufft"
  27. "sdr-visual-suite/internal/mock"
  28. "sdr-visual-suite/internal/recorder"
  29. "sdr-visual-suite/internal/runtime"
  30. "sdr-visual-suite/internal/sdr"
  31. "sdr-visual-suite/internal/sdrplay"
  32. )
  33. type SpectrumFrame struct {
  34. Timestamp int64 `json:"ts"`
  35. CenterHz float64 `json:"center_hz"`
  36. SampleHz int `json:"sample_rate"`
  37. FFTSize int `json:"fft_size"`
  38. Spectrum []float64 `json:"spectrum_db"`
  39. Thresholds []float64 `json:"thresholds,omitempty"`
  40. NoiseFloor float64 `json:"noise_floor,omitempty"`
  41. Signals []detector.Signal `json:"signals"`
  42. }
  43. type client struct {
  44. conn *websocket.Conn
  45. send chan []byte
  46. done chan struct{}
  47. closeOnce sync.Once
  48. }
  49. type hub struct {
  50. mu sync.Mutex
  51. clients map[*client]struct{}
  52. frameCnt int64
  53. lastLogTs time.Time
  54. }
  55. type gpuStatus struct {
  56. mu sync.RWMutex
  57. Available bool `json:"available"`
  58. Active bool `json:"active"`
  59. Error string `json:"error"`
  60. }
  61. type signalSnapshot struct {
  62. mu sync.RWMutex
  63. signals []detector.Signal
  64. }
  65. func (s *signalSnapshot) set(sig []detector.Signal) {
  66. s.mu.Lock()
  67. defer s.mu.Unlock()
  68. s.signals = append([]detector.Signal(nil), sig...)
  69. }
  70. func (s *signalSnapshot) get() []detector.Signal {
  71. s.mu.RLock()
  72. defer s.mu.RUnlock()
  73. return append([]detector.Signal(nil), s.signals...)
  74. }
  75. func (g *gpuStatus) set(active bool, err error) {
  76. g.mu.Lock()
  77. defer g.mu.Unlock()
  78. g.Active = active
  79. if err != nil {
  80. g.Error = err.Error()
  81. } else {
  82. g.Error = ""
  83. }
  84. }
  85. func (g *gpuStatus) snapshot() gpuStatus {
  86. g.mu.RLock()
  87. defer g.mu.RUnlock()
  88. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  89. }
  90. func newHub() *hub {
  91. return &hub{clients: map[*client]struct{}{}, lastLogTs: time.Now()}
  92. }
  93. func (h *hub) add(c *client) {
  94. h.mu.Lock()
  95. defer h.mu.Unlock()
  96. h.clients[c] = struct{}{}
  97. log.Printf("ws connected (%d clients)", len(h.clients))
  98. }
  99. func (h *hub) remove(c *client) {
  100. c.closeOnce.Do(func() { close(c.done) })
  101. h.mu.Lock()
  102. defer h.mu.Unlock()
  103. delete(h.clients, c)
  104. log.Printf("ws disconnected (%d clients)", len(h.clients))
  105. }
  106. func (h *hub) broadcast(frame SpectrumFrame) {
  107. b, err := json.Marshal(frame)
  108. if err != nil {
  109. log.Printf("marshal frame: %v", err)
  110. return
  111. }
  112. h.mu.Lock()
  113. clients := make([]*client, 0, len(h.clients))
  114. for c := range h.clients {
  115. clients = append(clients, c)
  116. }
  117. h.mu.Unlock()
  118. for _, c := range clients {
  119. select {
  120. case c.send <- b:
  121. default:
  122. h.remove(c)
  123. }
  124. }
  125. h.frameCnt++
  126. if time.Since(h.lastLogTs) > 2*time.Second {
  127. h.lastLogTs = time.Now()
  128. log.Printf("broadcast frames=%d clients=%d", h.frameCnt, len(clients))
  129. }
  130. }
  131. type sourceManager struct {
  132. mu sync.RWMutex
  133. src sdr.Source
  134. newSource func(cfg config.Config) (sdr.Source, error)
  135. }
  136. func (m *sourceManager) Restart(cfg config.Config) error {
  137. m.mu.Lock()
  138. defer m.mu.Unlock()
  139. old := m.src
  140. _ = old.Stop()
  141. next, err := m.newSource(cfg)
  142. if err != nil {
  143. _ = old.Start()
  144. m.src = old
  145. return err
  146. }
  147. if err := next.Start(); err != nil {
  148. _ = next.Stop()
  149. _ = old.Start()
  150. m.src = old
  151. return err
  152. }
  153. m.src = next
  154. return nil
  155. }
  156. func (m *sourceManager) Stats() sdr.SourceStats {
  157. m.mu.RLock()
  158. defer m.mu.RUnlock()
  159. if sp, ok := m.src.(sdr.StatsProvider); ok {
  160. return sp.Stats()
  161. }
  162. return sdr.SourceStats{}
  163. }
  164. func (m *sourceManager) Flush() {
  165. m.mu.RLock()
  166. defer m.mu.RUnlock()
  167. if fl, ok := m.src.(sdr.Flushable); ok {
  168. fl.Flush()
  169. }
  170. }
  171. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  172. return &sourceManager{src: src, newSource: newSource}
  173. }
  174. func (m *sourceManager) Start() error {
  175. m.mu.RLock()
  176. defer m.mu.RUnlock()
  177. return m.src.Start()
  178. }
  179. func (m *sourceManager) Stop() error {
  180. m.mu.RLock()
  181. defer m.mu.RUnlock()
  182. return m.src.Stop()
  183. }
  184. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  185. m.mu.RLock()
  186. defer m.mu.RUnlock()
  187. return m.src.ReadIQ(n)
  188. }
  189. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  190. m.mu.Lock()
  191. defer m.mu.Unlock()
  192. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  193. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  194. return nil
  195. }
  196. }
  197. old := m.src
  198. _ = old.Stop()
  199. next, err := m.newSource(cfg)
  200. if err != nil {
  201. _ = old.Start()
  202. return err
  203. }
  204. if err := next.Start(); err != nil {
  205. _ = next.Stop()
  206. _ = old.Start()
  207. return err
  208. }
  209. m.src = next
  210. return nil
  211. }
  212. type dspUpdate struct {
  213. cfg config.Config
  214. det *detector.Detector
  215. window []float64
  216. dcBlock bool
  217. iqBalance bool
  218. useGPUFFT bool
  219. }
  220. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  221. select {
  222. case ch <- update:
  223. default:
  224. select {
  225. case <-ch:
  226. default:
  227. }
  228. ch <- update
  229. }
  230. }
  231. func main() {
  232. var cfgPath string
  233. var mockFlag bool
  234. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  235. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  236. flag.Parse()
  237. cfg, err := config.Load(cfgPath)
  238. if err != nil {
  239. log.Fatalf("load config: %v", err)
  240. }
  241. cfgManager := runtime.New(cfg)
  242. gpuState := &gpuStatus{Available: gpufft.Available()}
  243. newSource := func(cfg config.Config) (sdr.Source, error) {
  244. if mockFlag {
  245. src := mock.New(cfg.SampleRate)
  246. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  247. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  248. }
  249. return src, nil
  250. }
  251. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  252. if err != nil {
  253. return nil, err
  254. }
  255. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  256. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  257. }
  258. return src, nil
  259. }
  260. src, err := newSource(cfg)
  261. if err != nil {
  262. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  263. }
  264. srcMgr := newSourceManager(src, newSource)
  265. if err := srcMgr.Start(); err != nil {
  266. log.Fatalf("source start: %v", err)
  267. }
  268. defer srcMgr.Stop()
  269. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  270. log.Fatalf("event path: %v", err)
  271. }
  272. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  273. if err != nil {
  274. log.Fatalf("open events: %v", err)
  275. }
  276. defer eventFile.Close()
  277. eventMu := &sync.RWMutex{}
  278. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  279. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  280. time.Duration(cfg.Detector.HoldMs)*time.Millisecond,
  281. cfg.Detector.EmaAlpha,
  282. cfg.Detector.HysteresisDb,
  283. cfg.Detector.MinStableFrames,
  284. time.Duration(cfg.Detector.GapToleranceMs)*time.Millisecond,
  285. cfg.Detector.CFARMode,
  286. cfg.Detector.CFARGuardCells,
  287. cfg.Detector.CFARTrainCells,
  288. cfg.Detector.CFARRank,
  289. cfg.Detector.CFARScaleDb,
  290. cfg.Detector.CFARWrapAround)
  291. window := fftutil.Hann(cfg.FFTSize)
  292. h := newHub()
  293. dspUpdates := make(chan dspUpdate, 1)
  294. ctx, cancel := context.WithCancel(context.Background())
  295. defer cancel()
  296. decodeMap := buildDecoderMap(cfg)
  297. recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  298. Enabled: cfg.Recorder.Enabled,
  299. MinSNRDb: cfg.Recorder.MinSNRDb,
  300. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  301. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  302. PrerollMs: cfg.Recorder.PrerollMs,
  303. RecordIQ: cfg.Recorder.RecordIQ,
  304. RecordAudio: cfg.Recorder.RecordAudio,
  305. AutoDemod: cfg.Recorder.AutoDemod,
  306. AutoDecode: cfg.Recorder.AutoDecode,
  307. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  308. OutputDir: cfg.Recorder.OutputDir,
  309. ClassFilter: cfg.Recorder.ClassFilter,
  310. RingSeconds: cfg.Recorder.RingSeconds,
  311. }, cfg.CenterHz, decodeMap)
  312. sigSnap := &signalSnapshot{}
  313. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr, sigSnap)
  314. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  315. origin := r.Header.Get("Origin")
  316. if origin == "" || origin == "null" {
  317. return true
  318. }
  319. // allow same-host or any local IP
  320. return true
  321. }}
  322. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  323. conn, err := upgrader.Upgrade(w, r, nil)
  324. if err != nil {
  325. log.Printf("ws upgrade failed: %v (origin: %s)", err, r.Header.Get("Origin"))
  326. return
  327. }
  328. c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})}
  329. h.add(c)
  330. defer func() {
  331. h.remove(c)
  332. _ = conn.Close()
  333. }()
  334. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  335. conn.SetPongHandler(func(string) error {
  336. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  337. return nil
  338. })
  339. go func() {
  340. ping := time.NewTicker(30 * time.Second)
  341. defer ping.Stop()
  342. for {
  343. select {
  344. case msg, ok := <-c.send:
  345. if !ok {
  346. return
  347. }
  348. _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  349. if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
  350. return
  351. }
  352. case <-ping.C:
  353. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  354. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  355. log.Printf("ws ping error: %v", err)
  356. return
  357. }
  358. }
  359. }
  360. }()
  361. for {
  362. _, _, err := conn.ReadMessage()
  363. if err != nil {
  364. return
  365. }
  366. }
  367. })
  368. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  369. w.Header().Set("Content-Type", "application/json")
  370. switch r.Method {
  371. case http.MethodGet:
  372. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  373. case http.MethodPost:
  374. var update runtime.ConfigUpdate
  375. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  376. http.Error(w, "invalid json", http.StatusBadRequest)
  377. return
  378. }
  379. prev := cfgManager.Snapshot()
  380. next, err := cfgManager.ApplyConfig(update)
  381. if err != nil {
  382. http.Error(w, err.Error(), http.StatusBadRequest)
  383. return
  384. }
  385. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  386. if sourceChanged {
  387. if err := srcMgr.ApplyConfig(next); err != nil {
  388. cfgManager.Replace(prev)
  389. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  390. return
  391. }
  392. }
  393. if err := config.Save(cfgPath, next); err != nil {
  394. log.Printf("config save failed: %v", err)
  395. }
  396. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  397. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  398. prev.Detector.HoldMs != next.Detector.HoldMs ||
  399. prev.Detector.EmaAlpha != next.Detector.EmaAlpha ||
  400. prev.Detector.HysteresisDb != next.Detector.HysteresisDb ||
  401. prev.Detector.MinStableFrames != next.Detector.MinStableFrames ||
  402. prev.Detector.GapToleranceMs != next.Detector.GapToleranceMs ||
  403. prev.Detector.CFARMode != next.Detector.CFARMode ||
  404. prev.Detector.CFARGuardCells != next.Detector.CFARGuardCells ||
  405. prev.Detector.CFARTrainCells != next.Detector.CFARTrainCells ||
  406. prev.Detector.CFARRank != next.Detector.CFARRank ||
  407. prev.Detector.CFARScaleDb != next.Detector.CFARScaleDb ||
  408. prev.Detector.CFARWrapAround != next.Detector.CFARWrapAround ||
  409. prev.SampleRate != next.SampleRate ||
  410. prev.FFTSize != next.FFTSize
  411. windowChanged := prev.FFTSize != next.FFTSize
  412. var newDet *detector.Detector
  413. var newWindow []float64
  414. if detChanged {
  415. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  416. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  417. time.Duration(next.Detector.HoldMs)*time.Millisecond,
  418. next.Detector.EmaAlpha,
  419. next.Detector.HysteresisDb,
  420. next.Detector.MinStableFrames,
  421. time.Duration(next.Detector.GapToleranceMs)*time.Millisecond,
  422. next.Detector.CFARMode,
  423. next.Detector.CFARGuardCells,
  424. next.Detector.CFARTrainCells,
  425. next.Detector.CFARRank,
  426. next.Detector.CFARScaleDb,
  427. next.Detector.CFARWrapAround)
  428. }
  429. if windowChanged {
  430. newWindow = fftutil.Hann(next.FFTSize)
  431. }
  432. pushDSPUpdate(dspUpdates, dspUpdate{
  433. cfg: next,
  434. det: newDet,
  435. window: newWindow,
  436. dcBlock: next.DCBlock,
  437. iqBalance: next.IQBalance,
  438. useGPUFFT: next.UseGPUFFT,
  439. })
  440. _ = json.NewEncoder(w).Encode(next)
  441. default:
  442. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  443. }
  444. })
  445. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  446. w.Header().Set("Content-Type", "application/json")
  447. if r.Method != http.MethodPost {
  448. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  449. return
  450. }
  451. var update runtime.SettingsUpdate
  452. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  453. http.Error(w, "invalid json", http.StatusBadRequest)
  454. return
  455. }
  456. prev := cfgManager.Snapshot()
  457. next, err := cfgManager.ApplySettings(update)
  458. if err != nil {
  459. http.Error(w, err.Error(), http.StatusBadRequest)
  460. return
  461. }
  462. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  463. if err := srcMgr.ApplyConfig(next); err != nil {
  464. cfgManager.Replace(prev)
  465. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  466. return
  467. }
  468. }
  469. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  470. pushDSPUpdate(dspUpdates, dspUpdate{
  471. cfg: next,
  472. dcBlock: next.DCBlock,
  473. iqBalance: next.IQBalance,
  474. })
  475. }
  476. if err := config.Save(cfgPath, next); err != nil {
  477. log.Printf("config save failed: %v", err)
  478. }
  479. _ = json.NewEncoder(w).Encode(next)
  480. })
  481. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  482. w.Header().Set("Content-Type", "application/json")
  483. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  484. })
  485. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  486. w.Header().Set("Content-Type", "application/json")
  487. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  488. })
  489. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  490. w.Header().Set("Content-Type", "application/json")
  491. limit := 200
  492. if v := r.URL.Query().Get("limit"); v != "" {
  493. if parsed, err := strconv.Atoi(v); err == nil {
  494. limit = parsed
  495. }
  496. }
  497. var since time.Time
  498. if v := r.URL.Query().Get("since"); v != "" {
  499. if parsed, err := parseSince(v); err == nil {
  500. since = parsed
  501. } else {
  502. http.Error(w, "invalid since", http.StatusBadRequest)
  503. return
  504. }
  505. }
  506. snap := cfgManager.Snapshot()
  507. eventMu.RLock()
  508. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  509. eventMu.RUnlock()
  510. if err != nil {
  511. http.Error(w, "failed to read events", http.StatusInternalServerError)
  512. return
  513. }
  514. _ = json.NewEncoder(w).Encode(evs)
  515. })
  516. http.HandleFunc("/api/signals", func(w http.ResponseWriter, r *http.Request) {
  517. w.Header().Set("Content-Type", "application/json")
  518. if sigSnap == nil {
  519. _ = json.NewEncoder(w).Encode([]detector.Signal{})
  520. return
  521. }
  522. _ = json.NewEncoder(w).Encode(sigSnap.get())
  523. })
  524. http.HandleFunc("/api/decoders", func(w http.ResponseWriter, r *http.Request) {
  525. w.Header().Set("Content-Type", "application/json")
  526. _ = json.NewEncoder(w).Encode(decoderKeys(cfgManager.Snapshot()))
  527. })
  528. http.HandleFunc("/api/recordings", func(w http.ResponseWriter, r *http.Request) {
  529. if r.Method != http.MethodGet {
  530. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  531. return
  532. }
  533. w.Header().Set("Content-Type", "application/json")
  534. snap := cfgManager.Snapshot()
  535. list, err := recorder.ListRecordings(snap.Recorder.OutputDir)
  536. if err != nil {
  537. http.Error(w, "failed to list recordings", http.StatusInternalServerError)
  538. return
  539. }
  540. _ = json.NewEncoder(w).Encode(list)
  541. })
  542. http.HandleFunc("/api/recordings/", func(w http.ResponseWriter, r *http.Request) {
  543. w.Header().Set("Content-Type", "application/json")
  544. id := strings.TrimPrefix(r.URL.Path, "/api/recordings/")
  545. if id == "" {
  546. http.Error(w, "missing id", http.StatusBadRequest)
  547. return
  548. }
  549. snap := cfgManager.Snapshot()
  550. base := filepath.Clean(filepath.Join(snap.Recorder.OutputDir, id))
  551. if !strings.HasPrefix(base, filepath.Clean(snap.Recorder.OutputDir)) {
  552. http.Error(w, "invalid path", http.StatusBadRequest)
  553. return
  554. }
  555. if r.URL.Path == "/api/recordings/"+id+"/audio" {
  556. http.ServeFile(w, r, filepath.Join(base, "audio.wav"))
  557. return
  558. }
  559. if r.URL.Path == "/api/recordings/"+id+"/iq" {
  560. http.ServeFile(w, r, filepath.Join(base, "signal.cf32"))
  561. return
  562. }
  563. if r.URL.Path == "/api/recordings/"+id+"/decode" {
  564. mode := r.URL.Query().Get("mode")
  565. cmd := buildDecoderMap(cfgManager.Snapshot())[mode]
  566. if cmd == "" {
  567. http.Error(w, "decoder not configured", http.StatusBadRequest)
  568. return
  569. }
  570. meta, err := recorder.ReadMeta(filepath.Join(base, "meta.json"))
  571. if err != nil {
  572. http.Error(w, "meta read failed", http.StatusInternalServerError)
  573. return
  574. }
  575. audioPath := filepath.Join(base, "audio.wav")
  576. if _, errStat := os.Stat(audioPath); errStat != nil {
  577. audioPath = ""
  578. }
  579. res, err := recorder.DecodeOnDemand(cmd, filepath.Join(base, "signal.cf32"), meta.SampleRate, audioPath)
  580. if err != nil {
  581. http.Error(w, res.Stderr, http.StatusInternalServerError)
  582. return
  583. }
  584. _ = json.NewEncoder(w).Encode(res)
  585. return
  586. }
  587. // default: meta.json
  588. http.ServeFile(w, r, filepath.Join(base, "meta.json"))
  589. })
  590. http.HandleFunc("/api/demod", func(w http.ResponseWriter, r *http.Request) {
  591. if r.Method != http.MethodGet {
  592. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  593. return
  594. }
  595. q := r.URL.Query()
  596. freq, _ := strconv.ParseFloat(q.Get("freq"), 64)
  597. bw, _ := strconv.ParseFloat(q.Get("bw"), 64)
  598. sec, _ := strconv.Atoi(q.Get("sec"))
  599. if sec < 1 {
  600. sec = 1
  601. }
  602. if sec > 10 {
  603. sec = 10
  604. }
  605. mode := q.Get("mode")
  606. data, _, err := recMgr.DemodLive(freq, bw, mode, sec)
  607. if err != nil {
  608. http.Error(w, err.Error(), http.StatusBadRequest)
  609. return
  610. }
  611. w.Header().Set("Content-Type", "audio/wav")
  612. _, _ = w.Write(data)
  613. })
  614. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  615. server := &http.Server{Addr: cfg.WebAddr}
  616. go func() {
  617. log.Printf("web listening on %s", cfg.WebAddr)
  618. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  619. log.Fatalf("server: %v", err)
  620. }
  621. }()
  622. stop := make(chan os.Signal, 1)
  623. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  624. <-stop
  625. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  626. defer cancelTimeout()
  627. _ = server.Shutdown(ctxTimeout)
  628. }
  629. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) {
  630. defer func() {
  631. if r := recover(); r != nil {
  632. log.Printf("FATAL: runDSP goroutine panic: %v\n%s", r, debug.Stack())
  633. }
  634. }()
  635. ticker := time.NewTicker(cfg.FrameInterval())
  636. defer ticker.Stop()
  637. logTicker := time.NewTicker(5 * time.Second)
  638. defer logTicker.Stop()
  639. enc := json.NewEncoder(eventFile)
  640. dcBlocker := dsp.NewDCBlocker(0.995)
  641. dcEnabled := cfg.DCBlock
  642. iqEnabled := cfg.IQBalance
  643. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  644. useGPU := cfg.UseGPUFFT
  645. var gpuEngine *gpufft.Engine
  646. if useGPU && gpuState != nil {
  647. snap := gpuState.snapshot()
  648. if snap.Available {
  649. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  650. gpuEngine = eng
  651. gpuState.set(true, nil)
  652. } else {
  653. gpuState.set(false, err)
  654. useGPU = false
  655. }
  656. } else {
  657. gpuState.set(false, nil)
  658. useGPU = false
  659. }
  660. } else if gpuState != nil {
  661. gpuState.set(false, nil)
  662. }
  663. gotSamples := false
  664. for {
  665. select {
  666. case <-ctx.Done():
  667. return
  668. case <-logTicker.C:
  669. st := srcMgr.Stats()
  670. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  671. case upd := <-updates:
  672. prevFFT := cfg.FFTSize
  673. prevUseGPU := useGPU
  674. cfg = upd.cfg
  675. if rec != nil {
  676. rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  677. Enabled: cfg.Recorder.Enabled,
  678. MinSNRDb: cfg.Recorder.MinSNRDb,
  679. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  680. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  681. PrerollMs: cfg.Recorder.PrerollMs,
  682. RecordIQ: cfg.Recorder.RecordIQ,
  683. RecordAudio: cfg.Recorder.RecordAudio,
  684. AutoDemod: cfg.Recorder.AutoDemod,
  685. AutoDecode: cfg.Recorder.AutoDecode,
  686. MaxDiskMB: cfg.Recorder.MaxDiskMB,
  687. OutputDir: cfg.Recorder.OutputDir,
  688. ClassFilter: cfg.Recorder.ClassFilter,
  689. RingSeconds: cfg.Recorder.RingSeconds,
  690. }, cfg.CenterHz, buildDecoderMap(cfg))
  691. }
  692. if upd.det != nil {
  693. det = upd.det
  694. }
  695. if upd.window != nil {
  696. window = upd.window
  697. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  698. }
  699. dcEnabled = upd.dcBlock
  700. iqEnabled = upd.iqBalance
  701. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  702. srcMgr.Flush()
  703. gotSamples = false
  704. if gpuEngine != nil {
  705. gpuEngine.Close()
  706. gpuEngine = nil
  707. }
  708. useGPU = cfg.UseGPUFFT
  709. if useGPU && gpuState != nil {
  710. snap := gpuState.snapshot()
  711. if snap.Available {
  712. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  713. gpuEngine = eng
  714. gpuState.set(true, nil)
  715. } else {
  716. gpuState.set(false, err)
  717. useGPU = false
  718. }
  719. } else {
  720. gpuState.set(false, nil)
  721. useGPU = false
  722. }
  723. } else if gpuState != nil {
  724. gpuState.set(false, nil)
  725. }
  726. }
  727. dcBlocker.Reset()
  728. ticker.Reset(cfg.FrameInterval())
  729. case <-ticker.C:
  730. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  731. if err != nil {
  732. log.Printf("read IQ: %v", err)
  733. if strings.Contains(err.Error(), "timeout") {
  734. if err := srcMgr.Restart(cfg); err != nil {
  735. log.Printf("restart failed: %v", err)
  736. }
  737. }
  738. continue
  739. }
  740. if rec != nil {
  741. rec.Ingest(time.Now(), iq)
  742. }
  743. if !gotSamples {
  744. log.Printf("received IQ samples")
  745. gotSamples = true
  746. }
  747. if dcEnabled {
  748. dcBlocker.Apply(iq)
  749. }
  750. if iqEnabled {
  751. dsp.IQBalance(iq)
  752. }
  753. var spectrum []float64
  754. if useGPU && gpuEngine != nil {
  755. if len(window) == len(iq) {
  756. for i := 0; i < len(iq); i++ {
  757. v := iq[i]
  758. w := float32(window[i])
  759. iq[i] = complex(real(v)*w, imag(v)*w)
  760. }
  761. }
  762. out, err := gpuEngine.Exec(iq)
  763. if err != nil {
  764. if gpuState != nil {
  765. gpuState.set(false, err)
  766. }
  767. useGPU = false
  768. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  769. } else {
  770. spectrum = fftutil.SpectrumFromFFT(out)
  771. }
  772. } else {
  773. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  774. }
  775. for i := range spectrum {
  776. if math.IsNaN(spectrum[i]) || math.IsInf(spectrum[i], 0) {
  777. spectrum[i] = -200
  778. }
  779. }
  780. now := time.Now()
  781. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  782. thresholds := det.LastThresholds()
  783. noiseFloor := det.LastNoiseFloor()
  784. // enrich classification with temporal IQ features on per-signal snippet
  785. if len(iq) > 0 {
  786. for i := range signals {
  787. snip := extractSignalIQ(iq, cfg.SampleRate, cfg.CenterHz, signals[i].CenterHz, signals[i].BWHz)
  788. cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, snip)
  789. signals[i].Class = cls
  790. }
  791. det.UpdateClasses(signals)
  792. }
  793. if sigSnap != nil {
  794. sigSnap.set(signals)
  795. }
  796. eventMu.Lock()
  797. for _, ev := range finished {
  798. _ = enc.Encode(ev)
  799. }
  800. eventMu.Unlock()
  801. if rec != nil && len(finished) > 0 {
  802. evCopy := make([]detector.Event, len(finished))
  803. copy(evCopy, finished)
  804. go rec.OnEvents(evCopy)
  805. }
  806. h.broadcast(SpectrumFrame{
  807. Timestamp: now.UnixMilli(),
  808. CenterHz: cfg.CenterHz,
  809. SampleHz: cfg.SampleRate,
  810. FFTSize: cfg.FFTSize,
  811. Spectrum: spectrum,
  812. Thresholds: thresholds,
  813. NoiseFloor: noiseFloor,
  814. Signals: signals,
  815. })
  816. }
  817. }
  818. }
  819. func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  820. if raw == "" {
  821. return fallback
  822. }
  823. if d, err := time.ParseDuration(raw); err == nil {
  824. return d
  825. }
  826. return fallback
  827. }
  828. func buildDecoderMap(cfg config.Config) map[string]string {
  829. out := map[string]string{}
  830. if cfg.Decoder.FT8Cmd != "" {
  831. out["FT8"] = cfg.Decoder.FT8Cmd
  832. }
  833. if cfg.Decoder.WSPRCmd != "" {
  834. out["WSPR"] = cfg.Decoder.WSPRCmd
  835. }
  836. if cfg.Decoder.DMRCmd != "" {
  837. out["DMR"] = cfg.Decoder.DMRCmd
  838. }
  839. if cfg.Decoder.DStarCmd != "" {
  840. out["D-STAR"] = cfg.Decoder.DStarCmd
  841. }
  842. if cfg.Decoder.FSKCmd != "" {
  843. out["FSK"] = cfg.Decoder.FSKCmd
  844. }
  845. if cfg.Decoder.PSKCmd != "" {
  846. out["PSK"] = cfg.Decoder.PSKCmd
  847. }
  848. return out
  849. }
  850. func decoderKeys(cfg config.Config) []string {
  851. m := buildDecoderMap(cfg)
  852. keys := make([]string, 0, len(m))
  853. for k := range m {
  854. keys = append(keys, k)
  855. }
  856. sort.Strings(keys)
  857. return keys
  858. }
  859. func extractSignalIQ(iq []complex64, sampleRate int, centerHz float64, sigHz float64, bwHz float64) []complex64 {
  860. if len(iq) == 0 || sampleRate <= 0 {
  861. return nil
  862. }
  863. offset := sigHz - centerHz
  864. shifted := dsp.FreqShift(iq, sampleRate, offset)
  865. cutoff := bwHz / 2
  866. if cutoff < 200 {
  867. cutoff = 200
  868. }
  869. if cutoff > float64(sampleRate)/2-1 {
  870. cutoff = float64(sampleRate)/2 - 1
  871. }
  872. taps := dsp.LowpassFIR(cutoff, sampleRate, 101)
  873. filtered := dsp.ApplyFIR(shifted, taps)
  874. decim := sampleRate / 200000
  875. if decim < 1 {
  876. decim = 1
  877. }
  878. return dsp.Decimate(filtered, decim)
  879. }
  880. func parseSince(raw string) (time.Time, error) {
  881. if raw == "" {
  882. return time.Time{}, nil
  883. }
  884. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  885. if ms > 1e12 {
  886. return time.UnixMilli(ms), nil
  887. }
  888. return time.Unix(ms, 0), nil
  889. }
  890. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  891. return t, nil
  892. }
  893. return time.Parse(time.RFC3339, raw)
  894. }