Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

843 rindas
22KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/gorilla/websocket"
  17. "sdr-visual-suite/internal/classifier"
  18. "sdr-visual-suite/internal/config"
  19. "sdr-visual-suite/internal/detector"
  20. "sdr-visual-suite/internal/dsp"
  21. "sdr-visual-suite/internal/events"
  22. fftutil "sdr-visual-suite/internal/fft"
  23. "sdr-visual-suite/internal/fft/gpufft"
  24. "sdr-visual-suite/internal/mock"
  25. "sdr-visual-suite/internal/recorder"
  26. "sdr-visual-suite/internal/runtime"
  27. "sdr-visual-suite/internal/sdr"
  28. "sdr-visual-suite/internal/sdrplay"
  29. )
  30. type SpectrumFrame struct {
  31. Timestamp int64 `json:"ts"`
  32. CenterHz float64 `json:"center_hz"`
  33. SampleHz int `json:"sample_rate"`
  34. FFTSize int `json:"fft_size"`
  35. Spectrum []float64 `json:"spectrum_db"`
  36. Signals []detector.Signal `json:"signals"`
  37. }
  38. type client struct {
  39. conn *websocket.Conn
  40. send chan []byte
  41. done chan struct{}
  42. closeOnce sync.Once
  43. }
  44. type hub struct {
  45. mu sync.Mutex
  46. clients map[*client]struct{}
  47. }
  48. type gpuStatus struct {
  49. mu sync.RWMutex
  50. Available bool `json:"available"`
  51. Active bool `json:"active"`
  52. Error string `json:"error"`
  53. }
  54. type signalSnapshot struct {
  55. mu sync.RWMutex
  56. signals []detector.Signal
  57. }
  58. func (s *signalSnapshot) set(sig []detector.Signal) {
  59. s.mu.Lock()
  60. defer s.mu.Unlock()
  61. s.signals = append([]detector.Signal(nil), sig...)
  62. }
  63. func (s *signalSnapshot) get() []detector.Signal {
  64. s.mu.RLock()
  65. defer s.mu.RUnlock()
  66. return append([]detector.Signal(nil), s.signals...)
  67. }
  68. func (g *gpuStatus) set(active bool, err error) {
  69. g.mu.Lock()
  70. defer g.mu.Unlock()
  71. g.Active = active
  72. if err != nil {
  73. g.Error = err.Error()
  74. } else {
  75. g.Error = ""
  76. }
  77. }
  78. func (g *gpuStatus) snapshot() gpuStatus {
  79. g.mu.RLock()
  80. defer g.mu.RUnlock()
  81. return gpuStatus{Available: g.Available, Active: g.Active, Error: g.Error}
  82. }
  83. func newHub() *hub {
  84. return &hub{clients: map[*client]struct{}{}}
  85. }
  86. func (h *hub) add(c *client) {
  87. h.mu.Lock()
  88. defer h.mu.Unlock()
  89. h.clients[c] = struct{}{}
  90. }
  91. func (h *hub) remove(c *client) {
  92. c.closeOnce.Do(func() { close(c.done) })
  93. h.mu.Lock()
  94. defer h.mu.Unlock()
  95. delete(h.clients, c)
  96. }
  97. func (h *hub) broadcast(frame SpectrumFrame) {
  98. b, err := json.Marshal(frame)
  99. if err != nil {
  100. log.Printf("marshal frame: %v", err)
  101. return
  102. }
  103. h.mu.Lock()
  104. clients := make([]*client, 0, len(h.clients))
  105. for c := range h.clients {
  106. clients = append(clients, c)
  107. }
  108. h.mu.Unlock()
  109. for _, c := range clients {
  110. select {
  111. case c.send <- b:
  112. default:
  113. h.remove(c)
  114. }
  115. }
  116. }
  117. type sourceManager struct {
  118. mu sync.RWMutex
  119. src sdr.Source
  120. newSource func(cfg config.Config) (sdr.Source, error)
  121. }
  122. func (m *sourceManager) Restart(cfg config.Config) error {
  123. m.mu.Lock()
  124. defer m.mu.Unlock()
  125. old := m.src
  126. _ = old.Stop()
  127. next, err := m.newSource(cfg)
  128. if err != nil {
  129. _ = old.Start()
  130. m.src = old
  131. return err
  132. }
  133. if err := next.Start(); err != nil {
  134. _ = next.Stop()
  135. _ = old.Start()
  136. m.src = old
  137. return err
  138. }
  139. m.src = next
  140. return nil
  141. }
  142. func (m *sourceManager) Stats() sdr.SourceStats {
  143. m.mu.RLock()
  144. defer m.mu.RUnlock()
  145. if sp, ok := m.src.(sdr.StatsProvider); ok {
  146. return sp.Stats()
  147. }
  148. return sdr.SourceStats{}
  149. }
  150. func (m *sourceManager) Flush() {
  151. m.mu.RLock()
  152. defer m.mu.RUnlock()
  153. if fl, ok := m.src.(sdr.Flushable); ok {
  154. fl.Flush()
  155. }
  156. }
  157. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  158. return &sourceManager{src: src, newSource: newSource}
  159. }
  160. func (m *sourceManager) Start() error {
  161. m.mu.RLock()
  162. defer m.mu.RUnlock()
  163. return m.src.Start()
  164. }
  165. func (m *sourceManager) Stop() error {
  166. m.mu.RLock()
  167. defer m.mu.RUnlock()
  168. return m.src.Stop()
  169. }
  170. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  171. m.mu.RLock()
  172. defer m.mu.RUnlock()
  173. return m.src.ReadIQ(n)
  174. }
  175. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  176. m.mu.Lock()
  177. defer m.mu.Unlock()
  178. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  179. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  180. return nil
  181. }
  182. }
  183. old := m.src
  184. _ = old.Stop()
  185. next, err := m.newSource(cfg)
  186. if err != nil {
  187. _ = old.Start()
  188. return err
  189. }
  190. if err := next.Start(); err != nil {
  191. _ = next.Stop()
  192. _ = old.Start()
  193. return err
  194. }
  195. m.src = next
  196. return nil
  197. }
  198. type dspUpdate struct {
  199. cfg config.Config
  200. det *detector.Detector
  201. window []float64
  202. dcBlock bool
  203. iqBalance bool
  204. useGPUFFT bool
  205. }
  206. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  207. select {
  208. case ch <- update:
  209. default:
  210. select {
  211. case <-ch:
  212. default:
  213. }
  214. ch <- update
  215. }
  216. }
  217. func main() {
  218. var cfgPath string
  219. var mockFlag bool
  220. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  221. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  222. flag.Parse()
  223. cfg, err := config.Load(cfgPath)
  224. if err != nil {
  225. log.Fatalf("load config: %v", err)
  226. }
  227. cfgManager := runtime.New(cfg)
  228. gpuState := &gpuStatus{Available: gpufft.Available()}
  229. newSource := func(cfg config.Config) (sdr.Source, error) {
  230. if mockFlag {
  231. src := mock.New(cfg.SampleRate)
  232. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  233. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  234. }
  235. return src, nil
  236. }
  237. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  238. if err != nil {
  239. return nil, err
  240. }
  241. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  242. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  243. }
  244. return src, nil
  245. }
  246. src, err := newSource(cfg)
  247. if err != nil {
  248. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  249. }
  250. srcMgr := newSourceManager(src, newSource)
  251. if err := srcMgr.Start(); err != nil {
  252. log.Fatalf("source start: %v", err)
  253. }
  254. defer srcMgr.Stop()
  255. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  256. log.Fatalf("event path: %v", err)
  257. }
  258. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  259. if err != nil {
  260. log.Fatalf("open events: %v", err)
  261. }
  262. defer eventFile.Close()
  263. eventMu := &sync.RWMutex{}
  264. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  265. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  266. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  267. window := fftutil.Hann(cfg.FFTSize)
  268. h := newHub()
  269. dspUpdates := make(chan dspUpdate, 1)
  270. ctx, cancel := context.WithCancel(context.Background())
  271. defer cancel()
  272. decodeMap := buildDecoderMap(cfg)
  273. recMgr := recorder.New(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  274. Enabled: cfg.Recorder.Enabled,
  275. MinSNRDb: cfg.Recorder.MinSNRDb,
  276. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  277. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  278. PrerollMs: cfg.Recorder.PrerollMs,
  279. RecordIQ: cfg.Recorder.RecordIQ,
  280. RecordAudio: cfg.Recorder.RecordAudio,
  281. AutoDemod: cfg.Recorder.AutoDemod,
  282. AutoDecode: cfg.Recorder.AutoDecode,
  283. OutputDir: cfg.Recorder.OutputDir,
  284. ClassFilter: cfg.Recorder.ClassFilter,
  285. RingSeconds: cfg.Recorder.RingSeconds,
  286. }, cfg.CenterHz, decodeMap)
  287. sigSnap := &signalSnapshot{}
  288. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, eventMu, dspUpdates, gpuState, recMgr, sigSnap)
  289. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  290. origin := r.Header.Get("Origin")
  291. return origin == "" || strings.HasPrefix(origin, "http://localhost") || strings.HasPrefix(origin, "http://127.0.0.1")
  292. }}
  293. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  294. conn, err := upgrader.Upgrade(w, r, nil)
  295. if err != nil {
  296. return
  297. }
  298. c := &client{conn: conn, send: make(chan []byte, 32), done: make(chan struct{})}
  299. h.add(c)
  300. defer func() {
  301. h.remove(c)
  302. _ = conn.Close()
  303. }()
  304. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  305. conn.SetPongHandler(func(string) error {
  306. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  307. return nil
  308. })
  309. go func() {
  310. ping := time.NewTicker(30 * time.Second)
  311. defer ping.Stop()
  312. for {
  313. select {
  314. case msg, ok := <-c.send:
  315. if !ok {
  316. return
  317. }
  318. _ = conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
  319. if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
  320. return
  321. }
  322. case <-ping.C:
  323. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  324. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  325. return
  326. }
  327. }
  328. }
  329. }()
  330. for {
  331. _, _, err := conn.ReadMessage()
  332. if err != nil {
  333. return
  334. }
  335. }
  336. })
  337. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  338. w.Header().Set("Content-Type", "application/json")
  339. switch r.Method {
  340. case http.MethodGet:
  341. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  342. case http.MethodPost:
  343. var update runtime.ConfigUpdate
  344. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  345. http.Error(w, "invalid json", http.StatusBadRequest)
  346. return
  347. }
  348. prev := cfgManager.Snapshot()
  349. next, err := cfgManager.ApplyConfig(update)
  350. if err != nil {
  351. http.Error(w, err.Error(), http.StatusBadRequest)
  352. return
  353. }
  354. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  355. if sourceChanged {
  356. if err := srcMgr.ApplyConfig(next); err != nil {
  357. cfgManager.Replace(prev)
  358. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  359. return
  360. }
  361. }
  362. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  363. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  364. prev.Detector.HoldMs != next.Detector.HoldMs ||
  365. prev.SampleRate != next.SampleRate ||
  366. prev.FFTSize != next.FFTSize
  367. windowChanged := prev.FFTSize != next.FFTSize
  368. var newDet *detector.Detector
  369. var newWindow []float64
  370. if detChanged {
  371. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  372. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  373. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  374. }
  375. if windowChanged {
  376. newWindow = fftutil.Hann(next.FFTSize)
  377. }
  378. pushDSPUpdate(dspUpdates, dspUpdate{
  379. cfg: next,
  380. det: newDet,
  381. window: newWindow,
  382. dcBlock: next.DCBlock,
  383. iqBalance: next.IQBalance,
  384. useGPUFFT: next.UseGPUFFT,
  385. })
  386. _ = json.NewEncoder(w).Encode(next)
  387. default:
  388. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  389. }
  390. })
  391. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  392. w.Header().Set("Content-Type", "application/json")
  393. if r.Method != http.MethodPost {
  394. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  395. return
  396. }
  397. var update runtime.SettingsUpdate
  398. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  399. http.Error(w, "invalid json", http.StatusBadRequest)
  400. return
  401. }
  402. prev := cfgManager.Snapshot()
  403. next, err := cfgManager.ApplySettings(update)
  404. if err != nil {
  405. http.Error(w, err.Error(), http.StatusBadRequest)
  406. return
  407. }
  408. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  409. if err := srcMgr.ApplyConfig(next); err != nil {
  410. cfgManager.Replace(prev)
  411. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  412. return
  413. }
  414. }
  415. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  416. pushDSPUpdate(dspUpdates, dspUpdate{
  417. cfg: next,
  418. dcBlock: next.DCBlock,
  419. iqBalance: next.IQBalance,
  420. })
  421. }
  422. _ = json.NewEncoder(w).Encode(next)
  423. })
  424. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  425. w.Header().Set("Content-Type", "application/json")
  426. _ = json.NewEncoder(w).Encode(srcMgr.Stats())
  427. })
  428. http.HandleFunc("/api/gpu", func(w http.ResponseWriter, r *http.Request) {
  429. w.Header().Set("Content-Type", "application/json")
  430. _ = json.NewEncoder(w).Encode(gpuState.snapshot())
  431. })
  432. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  433. w.Header().Set("Content-Type", "application/json")
  434. limit := 200
  435. if v := r.URL.Query().Get("limit"); v != "" {
  436. if parsed, err := strconv.Atoi(v); err == nil {
  437. limit = parsed
  438. }
  439. }
  440. var since time.Time
  441. if v := r.URL.Query().Get("since"); v != "" {
  442. if parsed, err := parseSince(v); err == nil {
  443. since = parsed
  444. } else {
  445. http.Error(w, "invalid since", http.StatusBadRequest)
  446. return
  447. }
  448. }
  449. snap := cfgManager.Snapshot()
  450. eventMu.RLock()
  451. evs, err := events.ReadRecent(snap.EventPath, limit, since)
  452. eventMu.RUnlock()
  453. if err != nil {
  454. http.Error(w, "failed to read events", http.StatusInternalServerError)
  455. return
  456. }
  457. _ = json.NewEncoder(w).Encode(evs)
  458. })
  459. http.HandleFunc("/api/signals", func(w http.ResponseWriter, r *http.Request) {
  460. w.Header().Set("Content-Type", "application/json")
  461. if sigSnap == nil {
  462. _ = json.NewEncoder(w).Encode([]detector.Signal{})
  463. return
  464. }
  465. _ = json.NewEncoder(w).Encode(sigSnap.get())
  466. })
  467. http.HandleFunc("/api/recordings", func(w http.ResponseWriter, r *http.Request) {
  468. if r.Method != http.MethodGet {
  469. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  470. return
  471. }
  472. w.Header().Set("Content-Type", "application/json")
  473. snap := cfgManager.Snapshot()
  474. list, err := recorder.ListRecordings(snap.Recorder.OutputDir)
  475. if err != nil {
  476. http.Error(w, "failed to list recordings", http.StatusInternalServerError)
  477. return
  478. }
  479. _ = json.NewEncoder(w).Encode(list)
  480. })
  481. http.HandleFunc("/api/recordings/", func(w http.ResponseWriter, r *http.Request) {
  482. w.Header().Set("Content-Type", "application/json")
  483. id := strings.TrimPrefix(r.URL.Path, "/api/recordings/")
  484. if id == "" {
  485. http.Error(w, "missing id", http.StatusBadRequest)
  486. return
  487. }
  488. snap := cfgManager.Snapshot()
  489. base := filepath.Clean(filepath.Join(snap.Recorder.OutputDir, id))
  490. if !strings.HasPrefix(base, filepath.Clean(snap.Recorder.OutputDir)) {
  491. http.Error(w, "invalid path", http.StatusBadRequest)
  492. return
  493. }
  494. if r.URL.Path == "/api/recordings/"+id+"/audio" {
  495. http.ServeFile(w, r, filepath.Join(base, "audio.wav"))
  496. return
  497. }
  498. if r.URL.Path == "/api/recordings/"+id+"/iq" {
  499. http.ServeFile(w, r, filepath.Join(base, "signal.cf32"))
  500. return
  501. }
  502. if r.URL.Path == "/api/recordings/"+id+"/decode" {
  503. mode := r.URL.Query().Get("mode")
  504. cmd := buildDecoderMap(cfgManager.Snapshot())[mode]
  505. if cmd == "" {
  506. http.Error(w, "decoder not configured", http.StatusBadRequest)
  507. return
  508. }
  509. meta, err := recorder.ReadMeta(filepath.Join(base, "meta.json"))
  510. if err != nil {
  511. http.Error(w, "meta read failed", http.StatusInternalServerError)
  512. return
  513. }
  514. res, err := recorder.DecodeOnDemand(cmd, filepath.Join(base, "signal.cf32"), meta.SampleRate)
  515. if err != nil {
  516. http.Error(w, res.Stderr, http.StatusInternalServerError)
  517. return
  518. }
  519. _ = json.NewEncoder(w).Encode(res)
  520. return
  521. }
  522. // default: meta.json
  523. http.ServeFile(w, r, filepath.Join(base, "meta.json"))
  524. })
  525. http.HandleFunc("/api/demod", func(w http.ResponseWriter, r *http.Request) {
  526. if r.Method != http.MethodGet {
  527. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  528. return
  529. }
  530. q := r.URL.Query()
  531. freq, _ := strconv.ParseFloat(q.Get("freq"), 64)
  532. bw, _ := strconv.ParseFloat(q.Get("bw"), 64)
  533. sec, _ := strconv.Atoi(q.Get("sec"))
  534. if sec < 1 {
  535. sec = 1
  536. }
  537. if sec > 10 {
  538. sec = 10
  539. }
  540. mode := q.Get("mode")
  541. data, _, err := recMgr.DemodLive(freq, bw, mode, sec)
  542. if err != nil {
  543. http.Error(w, err.Error(), http.StatusBadRequest)
  544. return
  545. }
  546. w.Header().Set("Content-Type", "audio/wav")
  547. _, _ = w.Write(data)
  548. })
  549. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  550. server := &http.Server{Addr: cfg.WebAddr}
  551. go func() {
  552. log.Printf("web listening on %s", cfg.WebAddr)
  553. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  554. log.Fatalf("server: %v", err)
  555. }
  556. }()
  557. stop := make(chan os.Signal, 1)
  558. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  559. <-stop
  560. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  561. defer cancelTimeout()
  562. _ = server.Shutdown(ctxTimeout)
  563. }
  564. func runDSP(ctx context.Context, srcMgr *sourceManager, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, eventMu *sync.RWMutex, updates <-chan dspUpdate, gpuState *gpuStatus, rec *recorder.Manager, sigSnap *signalSnapshot) {
  565. ticker := time.NewTicker(cfg.FrameInterval())
  566. defer ticker.Stop()
  567. logTicker := time.NewTicker(5 * time.Second)
  568. defer logTicker.Stop()
  569. enc := json.NewEncoder(eventFile)
  570. dcBlocker := dsp.NewDCBlocker(0.995)
  571. dcEnabled := cfg.DCBlock
  572. iqEnabled := cfg.IQBalance
  573. plan := fftutil.NewCmplxPlan(cfg.FFTSize)
  574. useGPU := cfg.UseGPUFFT
  575. var gpuEngine *gpufft.Engine
  576. if useGPU && gpuState != nil {
  577. snap := gpuState.snapshot()
  578. if snap.Available {
  579. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  580. gpuEngine = eng
  581. gpuState.set(true, nil)
  582. } else {
  583. gpuState.set(false, err)
  584. useGPU = false
  585. }
  586. } else {
  587. gpuState.set(false, nil)
  588. useGPU = false
  589. }
  590. } else if gpuState != nil {
  591. gpuState.set(false, nil)
  592. }
  593. gotSamples := false
  594. for {
  595. select {
  596. case <-ctx.Done():
  597. return
  598. case <-logTicker.C:
  599. st := srcMgr.Stats()
  600. log.Printf("stats: buf=%d drop=%d reset=%d last=%dms", st.BufferSamples, st.Dropped, st.Resets, st.LastSampleAgoMs)
  601. case upd := <-updates:
  602. prevFFT := cfg.FFTSize
  603. prevUseGPU := useGPU
  604. cfg = upd.cfg
  605. if rec != nil {
  606. rec.Update(cfg.SampleRate, cfg.FFTSize, recorder.Policy{
  607. Enabled: cfg.Recorder.Enabled,
  608. MinSNRDb: cfg.Recorder.MinSNRDb,
  609. MinDuration: mustParseDuration(cfg.Recorder.MinDuration, 1*time.Second),
  610. MaxDuration: mustParseDuration(cfg.Recorder.MaxDuration, 300*time.Second),
  611. PrerollMs: cfg.Recorder.PrerollMs,
  612. RecordIQ: cfg.Recorder.RecordIQ,
  613. RecordAudio: cfg.Recorder.RecordAudio,
  614. AutoDemod: cfg.Recorder.AutoDemod,
  615. AutoDecode: cfg.Recorder.AutoDecode,
  616. OutputDir: cfg.Recorder.OutputDir,
  617. ClassFilter: cfg.Recorder.ClassFilter,
  618. RingSeconds: cfg.Recorder.RingSeconds,
  619. }, cfg.CenterHz, buildDecoderMap(cfg))
  620. }
  621. if upd.det != nil {
  622. det = upd.det
  623. }
  624. if upd.window != nil {
  625. window = upd.window
  626. plan = fftutil.NewCmplxPlan(cfg.FFTSize)
  627. }
  628. dcEnabled = upd.dcBlock
  629. iqEnabled = upd.iqBalance
  630. if cfg.FFTSize != prevFFT || cfg.UseGPUFFT != prevUseGPU {
  631. srcMgr.Flush()
  632. gotSamples = false
  633. if gpuEngine != nil {
  634. gpuEngine.Close()
  635. gpuEngine = nil
  636. }
  637. useGPU = cfg.UseGPUFFT
  638. if useGPU && gpuState != nil {
  639. snap := gpuState.snapshot()
  640. if snap.Available {
  641. if eng, err := gpufft.New(cfg.FFTSize); err == nil {
  642. gpuEngine = eng
  643. gpuState.set(true, nil)
  644. } else {
  645. gpuState.set(false, err)
  646. useGPU = false
  647. }
  648. } else {
  649. gpuState.set(false, nil)
  650. useGPU = false
  651. }
  652. } else if gpuState != nil {
  653. gpuState.set(false, nil)
  654. }
  655. }
  656. dcBlocker.Reset()
  657. ticker.Reset(cfg.FrameInterval())
  658. case <-ticker.C:
  659. iq, err := srcMgr.ReadIQ(cfg.FFTSize)
  660. if err != nil {
  661. log.Printf("read IQ: %v", err)
  662. if strings.Contains(err.Error(), "timeout") {
  663. if err := srcMgr.Restart(cfg); err != nil {
  664. log.Printf("restart failed: %v", err)
  665. }
  666. }
  667. continue
  668. }
  669. if rec != nil {
  670. rec.Ingest(time.Now(), iq)
  671. }
  672. if !gotSamples {
  673. log.Printf("received IQ samples")
  674. gotSamples = true
  675. }
  676. if dcEnabled {
  677. dcBlocker.Apply(iq)
  678. }
  679. if iqEnabled {
  680. dsp.IQBalance(iq)
  681. }
  682. var spectrum []float64
  683. if useGPU && gpuEngine != nil {
  684. if len(window) == len(iq) {
  685. for i := 0; i < len(iq); i++ {
  686. v := iq[i]
  687. w := float32(window[i])
  688. iq[i] = complex(real(v)*w, imag(v)*w)
  689. }
  690. }
  691. out, err := gpuEngine.Exec(iq)
  692. if err != nil {
  693. if gpuState != nil {
  694. gpuState.set(false, err)
  695. }
  696. useGPU = false
  697. spectrum = fftutil.SpectrumWithPlan(iq, nil, plan)
  698. } else {
  699. spectrum = fftutil.SpectrumFromFFT(out)
  700. }
  701. } else {
  702. spectrum = fftutil.SpectrumWithPlan(iq, window, plan)
  703. }
  704. now := time.Now()
  705. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  706. // enrich classification with temporal IQ features
  707. if len(iq) > 0 {
  708. for i := range signals {
  709. cls := classifier.Classify(classifier.SignalInput{FirstBin: signals[i].FirstBin, LastBin: signals[i].LastBin, SNRDb: signals[i].SNRDb}, spectrum, cfg.SampleRate, cfg.FFTSize, iq)
  710. signals[i].Class = cls
  711. }
  712. }
  713. if sigSnap != nil {
  714. sigSnap.set(signals)
  715. }
  716. eventMu.Lock()
  717. for _, ev := range finished {
  718. _ = enc.Encode(ev)
  719. }
  720. eventMu.Unlock()
  721. if rec != nil {
  722. rec.OnEvents(finished)
  723. }
  724. h.broadcast(SpectrumFrame{
  725. Timestamp: now.UnixMilli(),
  726. CenterHz: cfg.CenterHz,
  727. SampleHz: cfg.SampleRate,
  728. FFTSize: cfg.FFTSize,
  729. Spectrum: spectrum,
  730. Signals: signals,
  731. })
  732. }
  733. }
  734. }
  735. func mustParseDuration(raw string, fallback time.Duration) time.Duration {
  736. if raw == "" {
  737. return fallback
  738. }
  739. if d, err := time.ParseDuration(raw); err == nil {
  740. return d
  741. }
  742. return fallback
  743. }
  744. func buildDecoderMap(cfg config.Config) map[string]string {
  745. out := map[string]string{}
  746. if cfg.Decoder.FT8Cmd != "" {
  747. out["FT8"] = cfg.Decoder.FT8Cmd
  748. }
  749. if cfg.Decoder.WSPRCmd != "" {
  750. out["WSPR"] = cfg.Decoder.WSPRCmd
  751. }
  752. if cfg.Decoder.DMRCmd != "" {
  753. out["DMR"] = cfg.Decoder.DMRCmd
  754. }
  755. if cfg.Decoder.DStarCmd != "" {
  756. out["D-STAR"] = cfg.Decoder.DStarCmd
  757. }
  758. if cfg.Decoder.FSKCmd != "" {
  759. out["FSK"] = cfg.Decoder.FSKCmd
  760. }
  761. if cfg.Decoder.PSKCmd != "" {
  762. out["PSK"] = cfg.Decoder.PSKCmd
  763. }
  764. return out
  765. }
  766. func parseSince(raw string) (time.Time, error) {
  767. if raw == "" {
  768. return time.Time{}, nil
  769. }
  770. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  771. if ms > 1e12 {
  772. return time.UnixMilli(ms), nil
  773. }
  774. return time.Unix(ms, 0), nil
  775. }
  776. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  777. return t, nil
  778. }
  779. return time.Parse(time.RFC3339, raw)
  780. }