Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

441 rinda
11KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "strconv"
  12. "sync"
  13. "syscall"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. "sdr-visual-suite/internal/config"
  17. "sdr-visual-suite/internal/detector"
  18. "sdr-visual-suite/internal/dsp"
  19. "sdr-visual-suite/internal/events"
  20. fftutil "sdr-visual-suite/internal/fft"
  21. "sdr-visual-suite/internal/mock"
  22. "sdr-visual-suite/internal/runtime"
  23. "sdr-visual-suite/internal/sdr"
  24. "sdr-visual-suite/internal/sdrplay"
  25. )
  26. type SpectrumFrame struct {
  27. Timestamp int64 `json:"ts"`
  28. CenterHz float64 `json:"center_hz"`
  29. SampleHz int `json:"sample_rate"`
  30. FFTSize int `json:"fft_size"`
  31. Spectrum []float64 `json:"spectrum_db"`
  32. Signals []detector.Signal `json:"signals"`
  33. }
  34. type hub struct {
  35. mu sync.Mutex
  36. clients map[*websocket.Conn]struct{}
  37. }
  38. func newHub() *hub {
  39. return &hub{clients: map[*websocket.Conn]struct{}{}}
  40. }
  41. func (h *hub) add(c *websocket.Conn) {
  42. h.mu.Lock()
  43. defer h.mu.Unlock()
  44. h.clients[c] = struct{}{}
  45. }
  46. func (h *hub) remove(c *websocket.Conn) {
  47. h.mu.Lock()
  48. defer h.mu.Unlock()
  49. delete(h.clients, c)
  50. }
  51. func (h *hub) broadcast(frame SpectrumFrame) {
  52. h.mu.Lock()
  53. defer h.mu.Unlock()
  54. b, _ := json.Marshal(frame)
  55. for c := range h.clients {
  56. _ = c.WriteMessage(websocket.TextMessage, b)
  57. }
  58. }
  59. type sourceManager struct {
  60. mu sync.RWMutex
  61. src sdr.Source
  62. newSource func(cfg config.Config) (sdr.Source, error)
  63. }
  64. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  65. return &sourceManager{src: src, newSource: newSource}
  66. }
  67. func (m *sourceManager) Start() error {
  68. m.mu.RLock()
  69. defer m.mu.RUnlock()
  70. return m.src.Start()
  71. }
  72. func (m *sourceManager) Stop() error {
  73. m.mu.RLock()
  74. defer m.mu.RUnlock()
  75. return m.src.Stop()
  76. }
  77. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  78. m.mu.RLock()
  79. defer m.mu.RUnlock()
  80. return m.src.ReadIQ(n)
  81. }
  82. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  83. m.mu.Lock()
  84. defer m.mu.Unlock()
  85. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  86. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  87. return nil
  88. }
  89. }
  90. old := m.src
  91. _ = old.Stop()
  92. next, err := m.newSource(cfg)
  93. if err != nil {
  94. _ = old.Start()
  95. return err
  96. }
  97. if err := next.Start(); err != nil {
  98. _ = next.Stop()
  99. _ = old.Start()
  100. return err
  101. }
  102. m.src = next
  103. return nil
  104. }
  105. type dspUpdate struct {
  106. cfg config.Config
  107. det *detector.Detector
  108. window []float64
  109. dcBlock bool
  110. iqBalance bool
  111. }
  112. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  113. select {
  114. case ch <- update:
  115. default:
  116. select {
  117. case <-ch:
  118. default:
  119. }
  120. ch <- update
  121. }
  122. }
  123. func main() {
  124. var cfgPath string
  125. var mockFlag bool
  126. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  127. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  128. flag.Parse()
  129. cfg, err := config.Load(cfgPath)
  130. if err != nil {
  131. log.Fatalf("load config: %v", err)
  132. }
  133. cfgManager := runtime.New(cfg)
  134. newSource := func(cfg config.Config) (sdr.Source, error) {
  135. if mockFlag {
  136. src := mock.New(cfg.SampleRate)
  137. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  138. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  139. }
  140. return src, nil
  141. }
  142. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  143. if err != nil {
  144. return nil, err
  145. }
  146. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  147. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  148. }
  149. return src, nil
  150. }
  151. src, err := newSource(cfg)
  152. if err != nil {
  153. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  154. }
  155. srcMgr := newSourceManager(src, newSource)
  156. if err := srcMgr.Start(); err != nil {
  157. log.Fatalf("source start: %v", err)
  158. }
  159. defer srcMgr.Stop()
  160. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  161. log.Fatalf("event path: %v", err)
  162. }
  163. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  164. if err != nil {
  165. log.Fatalf("open events: %v", err)
  166. }
  167. defer eventFile.Close()
  168. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  169. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  170. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  171. window := fftutil.Hann(cfg.FFTSize)
  172. h := newHub()
  173. dspUpdates := make(chan dspUpdate, 1)
  174. ctx, cancel := context.WithCancel(context.Background())
  175. defer cancel()
  176. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, dspUpdates)
  177. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
  178. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  179. c, err := upgrader.Upgrade(w, r, nil)
  180. if err != nil {
  181. return
  182. }
  183. h.add(c)
  184. defer func() {
  185. h.remove(c)
  186. _ = c.Close()
  187. }()
  188. for {
  189. _, _, err := c.ReadMessage()
  190. if err != nil {
  191. return
  192. }
  193. }
  194. })
  195. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  196. w.Header().Set("Content-Type", "application/json")
  197. switch r.Method {
  198. case http.MethodGet:
  199. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  200. case http.MethodPost:
  201. var update runtime.ConfigUpdate
  202. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  203. http.Error(w, "invalid json", http.StatusBadRequest)
  204. return
  205. }
  206. prev := cfgManager.Snapshot()
  207. next, err := cfgManager.ApplyConfig(update)
  208. if err != nil {
  209. http.Error(w, err.Error(), http.StatusBadRequest)
  210. return
  211. }
  212. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  213. if sourceChanged {
  214. if err := srcMgr.ApplyConfig(next); err != nil {
  215. cfgManager.Replace(prev)
  216. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  217. return
  218. }
  219. }
  220. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  221. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  222. prev.Detector.HoldMs != next.Detector.HoldMs ||
  223. prev.SampleRate != next.SampleRate ||
  224. prev.FFTSize != next.FFTSize
  225. windowChanged := prev.FFTSize != next.FFTSize
  226. var newDet *detector.Detector
  227. var newWindow []float64
  228. if detChanged {
  229. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  230. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  231. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  232. }
  233. if windowChanged {
  234. newWindow = fftutil.Hann(next.FFTSize)
  235. }
  236. pushDSPUpdate(dspUpdates, dspUpdate{
  237. cfg: next,
  238. det: newDet,
  239. window: newWindow,
  240. dcBlock: next.DCBlock,
  241. iqBalance: next.IQBalance,
  242. })
  243. _ = json.NewEncoder(w).Encode(next)
  244. default:
  245. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  246. }
  247. })
  248. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  249. w.Header().Set("Content-Type", "application/json")
  250. if r.Method != http.MethodPost {
  251. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  252. return
  253. }
  254. var update runtime.SettingsUpdate
  255. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  256. http.Error(w, "invalid json", http.StatusBadRequest)
  257. return
  258. }
  259. prev := cfgManager.Snapshot()
  260. next, err := cfgManager.ApplySettings(update)
  261. if err != nil {
  262. http.Error(w, err.Error(), http.StatusBadRequest)
  263. return
  264. }
  265. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  266. if err := srcMgr.ApplyConfig(next); err != nil {
  267. cfgManager.Replace(prev)
  268. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  269. return
  270. }
  271. }
  272. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  273. pushDSPUpdate(dspUpdates, dspUpdate{
  274. cfg: next,
  275. dcBlock: next.DCBlock,
  276. iqBalance: next.IQBalance,
  277. })
  278. }
  279. _ = json.NewEncoder(w).Encode(next)
  280. })
  281. http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
  282. w.Header().Set("Content-Type", "application/json")
  283. if sp, ok := src.(sdr.StatsProvider); ok {
  284. _ = json.NewEncoder(w).Encode(sp.Stats())
  285. return
  286. }
  287. _ = json.NewEncoder(w).Encode(sdr.SourceStats{})
  288. })
  289. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  290. w.Header().Set("Content-Type", "application/json")
  291. limit := 200
  292. if v := r.URL.Query().Get("limit"); v != "" {
  293. if parsed, err := strconv.Atoi(v); err == nil {
  294. limit = parsed
  295. }
  296. }
  297. var since time.Time
  298. if v := r.URL.Query().Get("since"); v != "" {
  299. if parsed, err := parseSince(v); err == nil {
  300. since = parsed
  301. } else {
  302. http.Error(w, "invalid since", http.StatusBadRequest)
  303. return
  304. }
  305. }
  306. evs, err := events.ReadRecent(cfg.EventPath, limit, since)
  307. if err != nil {
  308. http.Error(w, "failed to read events", http.StatusInternalServerError)
  309. return
  310. }
  311. _ = json.NewEncoder(w).Encode(evs)
  312. })
  313. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  314. server := &http.Server{Addr: cfg.WebAddr}
  315. go func() {
  316. log.Printf("web listening on %s", cfg.WebAddr)
  317. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  318. log.Fatalf("server: %v", err)
  319. }
  320. }()
  321. stop := make(chan os.Signal, 1)
  322. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  323. <-stop
  324. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  325. defer cancelTimeout()
  326. _ = server.Shutdown(ctxTimeout)
  327. }
  328. func runDSP(ctx context.Context, src sdr.Source, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, updates <-chan dspUpdate) {
  329. ticker := time.NewTicker(cfg.FrameInterval())
  330. defer ticker.Stop()
  331. enc := json.NewEncoder(eventFile)
  332. dcBlocker := dsp.NewDCBlocker(0.995)
  333. dcEnabled := cfg.DCBlock
  334. iqEnabled := cfg.IQBalance
  335. gotSamples := false
  336. for {
  337. select {
  338. case <-ctx.Done():
  339. return
  340. case upd := <-updates:
  341. cfg = upd.cfg
  342. if upd.det != nil {
  343. det = upd.det
  344. }
  345. if upd.window != nil {
  346. window = upd.window
  347. }
  348. dcEnabled = upd.dcBlock
  349. iqEnabled = upd.iqBalance
  350. dcBlocker.Reset()
  351. ticker.Reset(cfg.FrameInterval())
  352. case <-ticker.C:
  353. iq, err := src.ReadIQ(cfg.FFTSize)
  354. if err != nil {
  355. log.Printf("read IQ: %v", err)
  356. continue
  357. }
  358. if !gotSamples {
  359. log.Printf("received IQ samples")
  360. gotSamples = true
  361. }
  362. if dcEnabled {
  363. dcBlocker.Apply(iq)
  364. }
  365. if iqEnabled {
  366. dsp.IQBalance(iq)
  367. }
  368. spectrum := fftutil.Spectrum(iq, window)
  369. now := time.Now()
  370. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  371. for _, ev := range finished {
  372. _ = enc.Encode(ev)
  373. }
  374. h.broadcast(SpectrumFrame{
  375. Timestamp: now.UnixMilli(),
  376. CenterHz: cfg.CenterHz,
  377. SampleHz: cfg.SampleRate,
  378. FFTSize: cfg.FFTSize,
  379. Spectrum: spectrum,
  380. Signals: signals,
  381. })
  382. }
  383. }
  384. }
  385. func parseSince(raw string) (time.Time, error) {
  386. if raw == "" {
  387. return time.Time{}, nil
  388. }
  389. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  390. if ms > 1e12 {
  391. return time.UnixMilli(ms), nil
  392. }
  393. return time.Unix(ms, 0), nil
  394. }
  395. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  396. return t, nil
  397. }
  398. return time.Parse(time.RFC3339, raw)
  399. }