You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

230 line
5.9KB

  1. package main
import (
	"encoding/json"
	"log"
	"math"
	"net/http"
	"strconv"
	"time"

	"github.com/gorilla/websocket"

	"sdr-visual-suite/internal/recorder"
)
  11. func registerWSHandlers(mux *http.ServeMux, h *hub, recMgr *recorder.Manager) {
  12. upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
  13. origin := r.Header.Get("Origin")
  14. if origin == "" || origin == "null" {
  15. return true
  16. }
  17. return true
  18. }}
  19. mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  20. conn, err := upgrader.Upgrade(w, r, nil)
  21. if err != nil {
  22. log.Printf("ws upgrade failed: %v (origin: %s)", err, r.Header.Get("Origin"))
  23. return
  24. }
  25. // Parse query params for remote clients: ?binary=1&bins=2048&fps=5
  26. q := r.URL.Query()
  27. c := &client{conn: conn, send: make(chan []byte, 64), done: make(chan struct{})}
  28. if q.Get("binary") == "1" || q.Get("binary") == "true" {
  29. c.binary = true
  30. }
  31. if v, err := strconv.Atoi(q.Get("bins")); err == nil && v > 0 {
  32. c.maxBins = v
  33. }
  34. if v, err := strconv.Atoi(q.Get("fps")); err == nil && v > 0 {
  35. c.targetFps = v
  36. // frameSkip: if server runs at ~15fps and client wants 5fps → skip 3
  37. c.frameSkip = 15 / v
  38. if c.frameSkip < 1 {
  39. c.frameSkip = 1
  40. }
  41. }
  42. h.add(c)
  43. defer func() {
  44. h.remove(c)
  45. _ = conn.Close()
  46. }()
  47. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  48. conn.SetPongHandler(func(string) error {
  49. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  50. return nil
  51. })
  52. go func() {
  53. ping := time.NewTicker(30 * time.Second)
  54. defer ping.Stop()
  55. for {
  56. select {
  57. case msg, ok := <-c.send:
  58. if !ok {
  59. return
  60. }
  61. // Binary frames can be large (130KB+) — need more time
  62. deadline := 500 * time.Millisecond
  63. if !c.binary {
  64. deadline = 200 * time.Millisecond
  65. }
  66. _ = conn.SetWriteDeadline(time.Now().Add(deadline))
  67. msgType := websocket.TextMessage
  68. if c.binary {
  69. msgType = websocket.BinaryMessage
  70. }
  71. if err := conn.WriteMessage(msgType, msg); err != nil {
  72. return
  73. }
  74. case <-ping.C:
  75. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  76. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  77. return
  78. }
  79. case <-c.done:
  80. return
  81. }
  82. }
  83. }()
  84. // Read loop: handle config messages from client + keep-alive
  85. for {
  86. _, msg, err := conn.ReadMessage()
  87. if err != nil {
  88. return
  89. }
  90. // Try to parse as client config update
  91. var cfg struct {
  92. Binary *bool `json:"binary,omitempty"`
  93. Bins *int `json:"bins,omitempty"`
  94. FPS *int `json:"fps,omitempty"`
  95. }
  96. if json.Unmarshal(msg, &cfg) == nil {
  97. if cfg.Binary != nil {
  98. c.binary = *cfg.Binary
  99. }
  100. if cfg.Bins != nil && *cfg.Bins > 0 {
  101. c.maxBins = *cfg.Bins
  102. }
  103. if cfg.FPS != nil && *cfg.FPS > 0 {
  104. c.targetFps = *cfg.FPS
  105. c.frameSkip = 15 / *cfg.FPS
  106. if c.frameSkip < 1 {
  107. c.frameSkip = 1
  108. }
  109. }
  110. }
  111. }
  112. })
  113. // /ws/audio — WebSocket endpoint for continuous live-listen audio streaming.
  114. // Client connects with query params: freq, bw, mode
  115. // Server sends binary frames of PCM s16le audio at 48kHz.
  116. mux.HandleFunc("/ws/audio", func(w http.ResponseWriter, r *http.Request) {
  117. q := r.URL.Query()
  118. freq, _ := strconv.ParseFloat(q.Get("freq"), 64)
  119. bw, _ := strconv.ParseFloat(q.Get("bw"), 64)
  120. mode := q.Get("mode")
  121. if freq <= 0 {
  122. http.Error(w, "freq required", http.StatusBadRequest)
  123. return
  124. }
  125. if bw <= 0 {
  126. bw = 12000
  127. }
  128. streamer := recMgr.StreamerRef()
  129. if streamer == nil {
  130. http.Error(w, "streamer not available", http.StatusServiceUnavailable)
  131. return
  132. }
  133. // LL-3: Subscribe BEFORE upgrading WebSocket.
  134. // SubscribeAudio now returns AudioInfo and never immediately closes
  135. // the channel — it queues pending listeners instead.
  136. subID, ch, audioInfo, err := streamer.SubscribeAudio(freq, bw, mode)
  137. if err != nil {
  138. http.Error(w, err.Error(), http.StatusServiceUnavailable)
  139. return
  140. }
  141. conn, err := upgrader.Upgrade(w, r, nil)
  142. if err != nil {
  143. streamer.UnsubscribeAudio(subID)
  144. log.Printf("ws/audio upgrade failed: %v", err)
  145. return
  146. }
  147. defer func() {
  148. streamer.UnsubscribeAudio(subID)
  149. _ = conn.Close()
  150. }()
  151. log.Printf("ws/audio: client connected freq=%.1fMHz mode=%s", freq/1e6, mode)
  152. // LL-2: Send actual audio info (channels, sample rate from session)
  153. info := map[string]any{
  154. "type": "audio_info",
  155. "sample_rate": audioInfo.SampleRate,
  156. "channels": audioInfo.Channels,
  157. "format": audioInfo.Format,
  158. "demod": audioInfo.DemodName,
  159. "freq": freq,
  160. "mode": mode,
  161. }
  162. if infoBytes, err := json.Marshal(info); err == nil {
  163. _ = conn.WriteMessage(websocket.TextMessage, infoBytes)
  164. }
  165. // Read goroutine (to detect disconnect)
  166. done := make(chan struct{})
  167. go func() {
  168. defer close(done)
  169. for {
  170. _, _, err := conn.ReadMessage()
  171. if err != nil {
  172. return
  173. }
  174. }
  175. }()
  176. ping := time.NewTicker(30 * time.Second)
  177. defer ping.Stop()
  178. for {
  179. select {
  180. case data, ok := <-ch:
  181. if !ok {
  182. log.Printf("ws/audio: stream ended freq=%.1fMHz", freq/1e6)
  183. return
  184. }
  185. if len(data) == 0 {
  186. continue
  187. }
  188. _ = conn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond))
  189. // Tag protocol: first byte is message type
  190. // 0x00 = AudioInfo JSON (send as TextMessage, strip tag)
  191. // 0x01 = PCM audio (send as BinaryMessage, strip tag)
  192. tag := data[0]
  193. payload := data[1:]
  194. msgType := websocket.BinaryMessage
  195. if tag == 0x00 {
  196. msgType = websocket.TextMessage
  197. }
  198. if err := conn.WriteMessage(msgType, payload); err != nil {
  199. log.Printf("ws/audio: write error: %v", err)
  200. return
  201. }
  202. case <-ping.C:
  203. _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  204. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  205. return
  206. }
  207. case <-done:
  208. log.Printf("ws/audio: client disconnected freq=%.1fMHz", freq/1e6)
  209. return
  210. }
  211. }
  212. })
  213. }