Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

432 lines
11KB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "path/filepath"
  11. "strconv"
  12. "sync"
  13. "syscall"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. "sdr-visual-suite/internal/config"
  17. "sdr-visual-suite/internal/detector"
  18. "sdr-visual-suite/internal/dsp"
  19. "sdr-visual-suite/internal/events"
  20. fftutil "sdr-visual-suite/internal/fft"
  21. "sdr-visual-suite/internal/mock"
  22. "sdr-visual-suite/internal/runtime"
  23. "sdr-visual-suite/internal/sdr"
  24. "sdr-visual-suite/internal/sdrplay"
  25. )
  26. type SpectrumFrame struct {
  27. Timestamp int64 `json:"ts"`
  28. CenterHz float64 `json:"center_hz"`
  29. SampleHz int `json:"sample_rate"`
  30. FFTSize int `json:"fft_size"`
  31. Spectrum []float64 `json:"spectrum_db"`
  32. Signals []detector.Signal `json:"signals"`
  33. }
  34. type hub struct {
  35. mu sync.Mutex
  36. clients map[*websocket.Conn]struct{}
  37. }
  38. func newHub() *hub {
  39. return &hub{clients: map[*websocket.Conn]struct{}{}}
  40. }
  41. func (h *hub) add(c *websocket.Conn) {
  42. h.mu.Lock()
  43. defer h.mu.Unlock()
  44. h.clients[c] = struct{}{}
  45. }
  46. func (h *hub) remove(c *websocket.Conn) {
  47. h.mu.Lock()
  48. defer h.mu.Unlock()
  49. delete(h.clients, c)
  50. }
  51. func (h *hub) broadcast(frame SpectrumFrame) {
  52. h.mu.Lock()
  53. defer h.mu.Unlock()
  54. b, _ := json.Marshal(frame)
  55. for c := range h.clients {
  56. _ = c.WriteMessage(websocket.TextMessage, b)
  57. }
  58. }
  59. type sourceManager struct {
  60. mu sync.RWMutex
  61. src sdr.Source
  62. newSource func(cfg config.Config) (sdr.Source, error)
  63. }
  64. func newSourceManager(src sdr.Source, newSource func(cfg config.Config) (sdr.Source, error)) *sourceManager {
  65. return &sourceManager{src: src, newSource: newSource}
  66. }
  67. func (m *sourceManager) Start() error {
  68. m.mu.RLock()
  69. defer m.mu.RUnlock()
  70. return m.src.Start()
  71. }
  72. func (m *sourceManager) Stop() error {
  73. m.mu.RLock()
  74. defer m.mu.RUnlock()
  75. return m.src.Stop()
  76. }
  77. func (m *sourceManager) ReadIQ(n int) ([]complex64, error) {
  78. m.mu.RLock()
  79. defer m.mu.RUnlock()
  80. return m.src.ReadIQ(n)
  81. }
  82. func (m *sourceManager) ApplyConfig(cfg config.Config) error {
  83. m.mu.Lock()
  84. defer m.mu.Unlock()
  85. if updatable, ok := m.src.(sdr.ConfigurableSource); ok {
  86. if err := updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz); err == nil {
  87. return nil
  88. }
  89. }
  90. old := m.src
  91. _ = old.Stop()
  92. next, err := m.newSource(cfg)
  93. if err != nil {
  94. _ = old.Start()
  95. return err
  96. }
  97. if err := next.Start(); err != nil {
  98. _ = next.Stop()
  99. _ = old.Start()
  100. return err
  101. }
  102. m.src = next
  103. return nil
  104. }
  105. type dspUpdate struct {
  106. cfg config.Config
  107. det *detector.Detector
  108. window []float64
  109. dcBlock bool
  110. iqBalance bool
  111. }
  112. func pushDSPUpdate(ch chan dspUpdate, update dspUpdate) {
  113. select {
  114. case ch <- update:
  115. default:
  116. select {
  117. case <-ch:
  118. default:
  119. }
  120. ch <- update
  121. }
  122. }
  123. func main() {
  124. var cfgPath string
  125. var mockFlag bool
  126. flag.StringVar(&cfgPath, "config", "config.yaml", "path to config YAML")
  127. flag.BoolVar(&mockFlag, "mock", false, "use synthetic IQ source")
  128. flag.Parse()
  129. cfg, err := config.Load(cfgPath)
  130. if err != nil {
  131. log.Fatalf("load config: %v", err)
  132. }
  133. cfgManager := runtime.New(cfg)
  134. newSource := func(cfg config.Config) (sdr.Source, error) {
  135. if mockFlag {
  136. src := mock.New(cfg.SampleRate)
  137. if updatable, ok := interface{}(src).(sdr.ConfigurableSource); ok {
  138. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  139. }
  140. return src, nil
  141. }
  142. src, err := sdrplay.New(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.TunerBwKHz)
  143. if err != nil {
  144. return nil, err
  145. }
  146. if updatable, ok := src.(sdr.ConfigurableSource); ok {
  147. _ = updatable.UpdateConfig(cfg.SampleRate, cfg.CenterHz, cfg.GainDb, cfg.AGC, cfg.TunerBwKHz)
  148. }
  149. return src, nil
  150. }
  151. src, err := newSource(cfg)
  152. if err != nil {
  153. log.Fatalf("sdrplay init failed: %v (try --mock or build with -tags sdrplay)", err)
  154. }
  155. srcMgr := newSourceManager(src, newSource)
  156. if err := srcMgr.Start(); err != nil {
  157. log.Fatalf("source start: %v", err)
  158. }
  159. defer srcMgr.Stop()
  160. if err := os.MkdirAll(filepath.Dir(cfg.EventPath), 0o755); err != nil {
  161. log.Fatalf("event path: %v", err)
  162. }
  163. eventFile, err := os.OpenFile(cfg.EventPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  164. if err != nil {
  165. log.Fatalf("open events: %v", err)
  166. }
  167. defer eventFile.Close()
  168. det := detector.New(cfg.Detector.ThresholdDb, cfg.SampleRate, cfg.FFTSize,
  169. time.Duration(cfg.Detector.MinDurationMs)*time.Millisecond,
  170. time.Duration(cfg.Detector.HoldMs)*time.Millisecond)
  171. window := fftutil.Hann(cfg.FFTSize)
  172. h := newHub()
  173. dspUpdates := make(chan dspUpdate, 1)
  174. ctx, cancel := context.WithCancel(context.Background())
  175. defer cancel()
  176. go runDSP(ctx, srcMgr, cfg, det, window, h, eventFile, dspUpdates)
  177. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
  178. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  179. c, err := upgrader.Upgrade(w, r, nil)
  180. if err != nil {
  181. return
  182. }
  183. h.add(c)
  184. defer func() {
  185. h.remove(c)
  186. _ = c.Close()
  187. }()
  188. for {
  189. _, _, err := c.ReadMessage()
  190. if err != nil {
  191. return
  192. }
  193. }
  194. })
  195. http.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
  196. w.Header().Set("Content-Type", "application/json")
  197. switch r.Method {
  198. case http.MethodGet:
  199. _ = json.NewEncoder(w).Encode(cfgManager.Snapshot())
  200. case http.MethodPost:
  201. var update runtime.ConfigUpdate
  202. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  203. http.Error(w, "invalid json", http.StatusBadRequest)
  204. return
  205. }
  206. prev := cfgManager.Snapshot()
  207. next, err := cfgManager.ApplyConfig(update)
  208. if err != nil {
  209. http.Error(w, err.Error(), http.StatusBadRequest)
  210. return
  211. }
  212. sourceChanged := prev.CenterHz != next.CenterHz || prev.SampleRate != next.SampleRate || prev.GainDb != next.GainDb || prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz
  213. if sourceChanged {
  214. if err := srcMgr.ApplyConfig(next); err != nil {
  215. cfgManager.Replace(prev)
  216. http.Error(w, "failed to apply source config", http.StatusInternalServerError)
  217. return
  218. }
  219. }
  220. detChanged := prev.Detector.ThresholdDb != next.Detector.ThresholdDb ||
  221. prev.Detector.MinDurationMs != next.Detector.MinDurationMs ||
  222. prev.Detector.HoldMs != next.Detector.HoldMs ||
  223. prev.SampleRate != next.SampleRate ||
  224. prev.FFTSize != next.FFTSize
  225. windowChanged := prev.FFTSize != next.FFTSize
  226. var newDet *detector.Detector
  227. var newWindow []float64
  228. if detChanged {
  229. newDet = detector.New(next.Detector.ThresholdDb, next.SampleRate, next.FFTSize,
  230. time.Duration(next.Detector.MinDurationMs)*time.Millisecond,
  231. time.Duration(next.Detector.HoldMs)*time.Millisecond)
  232. }
  233. if windowChanged {
  234. newWindow = fftutil.Hann(next.FFTSize)
  235. }
  236. pushDSPUpdate(dspUpdates, dspUpdate{
  237. cfg: next,
  238. det: newDet,
  239. window: newWindow,
  240. dcBlock: next.DCBlock,
  241. iqBalance: next.IQBalance,
  242. })
  243. _ = json.NewEncoder(w).Encode(next)
  244. default:
  245. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  246. }
  247. })
  248. http.HandleFunc("/api/sdr/settings", func(w http.ResponseWriter, r *http.Request) {
  249. w.Header().Set("Content-Type", "application/json")
  250. if r.Method != http.MethodPost {
  251. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  252. return
  253. }
  254. var update runtime.SettingsUpdate
  255. if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
  256. http.Error(w, "invalid json", http.StatusBadRequest)
  257. return
  258. }
  259. prev := cfgManager.Snapshot()
  260. next, err := cfgManager.ApplySettings(update)
  261. if err != nil {
  262. http.Error(w, err.Error(), http.StatusBadRequest)
  263. return
  264. }
  265. if prev.AGC != next.AGC || prev.TunerBwKHz != next.TunerBwKHz {
  266. if err := srcMgr.ApplyConfig(next); err != nil {
  267. cfgManager.Replace(prev)
  268. http.Error(w, "failed to apply sdr settings", http.StatusInternalServerError)
  269. return
  270. }
  271. }
  272. if prev.DCBlock != next.DCBlock || prev.IQBalance != next.IQBalance {
  273. pushDSPUpdate(dspUpdates, dspUpdate{
  274. cfg: next,
  275. dcBlock: next.DCBlock,
  276. iqBalance: next.IQBalance,
  277. })
  278. }
  279. _ = json.NewEncoder(w).Encode(next)
  280. })
  281. http.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
  282. w.Header().Set("Content-Type", "application/json")
  283. limit := 200
  284. if v := r.URL.Query().Get("limit"); v != "" {
  285. if parsed, err := strconv.Atoi(v); err == nil {
  286. limit = parsed
  287. }
  288. }
  289. var since time.Time
  290. if v := r.URL.Query().Get("since"); v != "" {
  291. if parsed, err := parseSince(v); err == nil {
  292. since = parsed
  293. } else {
  294. http.Error(w, "invalid since", http.StatusBadRequest)
  295. return
  296. }
  297. }
  298. evs, err := events.ReadRecent(cfg.EventPath, limit, since)
  299. if err != nil {
  300. http.Error(w, "failed to read events", http.StatusInternalServerError)
  301. return
  302. }
  303. _ = json.NewEncoder(w).Encode(evs)
  304. })
  305. http.Handle("/", http.FileServer(http.Dir(cfg.WebRoot)))
  306. server := &http.Server{Addr: cfg.WebAddr}
  307. go func() {
  308. log.Printf("web listening on %s", cfg.WebAddr)
  309. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  310. log.Fatalf("server: %v", err)
  311. }
  312. }()
  313. stop := make(chan os.Signal, 1)
  314. signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
  315. <-stop
  316. ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second)
  317. defer cancelTimeout()
  318. _ = server.Shutdown(ctxTimeout)
  319. }
  320. func runDSP(ctx context.Context, src sdr.Source, cfg config.Config, det *detector.Detector, window []float64, h *hub, eventFile *os.File, updates <-chan dspUpdate) {
  321. ticker := time.NewTicker(cfg.FrameInterval())
  322. defer ticker.Stop()
  323. enc := json.NewEncoder(eventFile)
  324. dcBlocker := dsp.NewDCBlocker(0.995)
  325. dcEnabled := cfg.DCBlock
  326. iqEnabled := cfg.IQBalance
  327. gotSamples := false
  328. for {
  329. select {
  330. case <-ctx.Done():
  331. return
  332. case upd := <-updates:
  333. cfg = upd.cfg
  334. if upd.det != nil {
  335. det = upd.det
  336. }
  337. if upd.window != nil {
  338. window = upd.window
  339. }
  340. dcEnabled = upd.dcBlock
  341. iqEnabled = upd.iqBalance
  342. dcBlocker.Reset()
  343. ticker.Reset(cfg.FrameInterval())
  344. case <-ticker.C:
  345. iq, err := src.ReadIQ(cfg.FFTSize)
  346. if err != nil {
  347. log.Printf("read IQ: %v", err)
  348. continue
  349. }
  350. if !gotSamples {
  351. log.Printf("received IQ samples")
  352. gotSamples = true
  353. }
  354. if dcEnabled {
  355. dcBlocker.Apply(iq)
  356. }
  357. if iqEnabled {
  358. dsp.IQBalance(iq)
  359. }
  360. spectrum := fftutil.Spectrum(iq, window)
  361. now := time.Now()
  362. finished, signals := det.Process(now, spectrum, cfg.CenterHz)
  363. for _, ev := range finished {
  364. _ = enc.Encode(ev)
  365. }
  366. h.broadcast(SpectrumFrame{
  367. Timestamp: now.UnixMilli(),
  368. CenterHz: cfg.CenterHz,
  369. SampleHz: cfg.SampleRate,
  370. FFTSize: cfg.FFTSize,
  371. Spectrum: spectrum,
  372. Signals: signals,
  373. })
  374. }
  375. }
  376. }
  377. func parseSince(raw string) (time.Time, error) {
  378. if raw == "" {
  379. return time.Time{}, nil
  380. }
  381. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  382. if ms > 1e12 {
  383. return time.UnixMilli(ms), nil
  384. }
  385. return time.Unix(ms, 0), nil
  386. }
  387. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  388. return t, nil
  389. }
  390. return time.Parse(time.RFC3339, raw)
  391. }