package aoiprxkit import ( "bufio" "context" "crypto/sha1" "encoding/base64" "encoding/json" "io" "net" "net/http" "strings" "time" ) type MeterServer struct { meter *LiveMeter srv *http.Server } func NewMeterServer(listenAddress string, meter *LiveMeter) *MeterServer { if meter == nil { meter = NewLiveMeter() } ms := &MeterServer{meter: meter} mux := http.NewServeMux() mux.HandleFunc("/", ms.handleIndex) mux.HandleFunc("/healthz", ms.handleHealth) mux.HandleFunc("/api/meter", ms.handleSnapshot) mux.HandleFunc("/ws/live", ms.handleWS) ms.srv = &http.Server{ Addr: listenAddress, Handler: mux, ReadHeaderTimeout: 5 * time.Second, ReadTimeout: 10 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, } return ms } func (m *MeterServer) Meter() *LiveMeter { return m.meter } func (m *MeterServer) Start() error { go func() { _ = m.srv.ListenAndServe() }() return nil } func (m *MeterServer) Shutdown(ctx context.Context) error { return m.srv.Shutdown(ctx) } func (m *MeterServer) handleHealth(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) } func (m *MeterServer) handleSnapshot(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(m.meter.Snapshot()) } func (m *MeterServer) handleIndex(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") _, _ = io.WriteString(w, meterIndexHTML) } func (m *MeterServer) handleWS(w http.ResponseWriter, r *http.Request) { if !headerContainsToken(r.Header, "Connection", "Upgrade") || !strings.EqualFold(r.Header.Get("Upgrade"), "websocket") { http.Error(w, "upgrade required", http.StatusUpgradeRequired) return } key := strings.TrimSpace(r.Header.Get("Sec-WebSocket-Key")) if key == "" { http.Error(w, "missing Sec-WebSocket-Key", http.StatusBadRequest) return } hj, ok := w.(http.Hijacker) if !ok { http.Error(w, "hijacking not supported", http.StatusInternalServerError) return } conn, rw, err := hj.Hijack() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } accept := computeWebSocketAccept(key) _, _ = rw.WriteString("HTTP/1.1 101 Switching Protocols\r\n") _, _ = rw.WriteString("Upgrade: websocket\r\n") _, _ = rw.WriteString("Connection: Upgrade\r\n") _, _ = rw.WriteString("Sec-WebSocket-Accept: " + accept + "\r\n\r\n") _ = rw.Flush() ch, unsubscribe := m.meter.Subscribe() defer unsubscribe() defer conn.Close() _ = conn.SetDeadline(time.Time{}) for snap := range ch { payload, err := json.Marshal(snap) if err != nil { return } if err := writeWebSocketTextFrame(conn, payload); err != nil { return } } } func headerContainsToken(h http.Header, key, token string) bool { for _, v := range h.Values(key) { parts := strings.Split(v, ",") for _, part := range parts { if strings.EqualFold(strings.TrimSpace(part), token) { return true } } } return false } func computeWebSocketAccept(key string) string { const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" sum := sha1.Sum([]byte(key + magic)) return base64.StdEncoding.EncodeToString(sum[:]) } func writeWebSocketTextFrame(conn net.Conn, payload []byte) error { bw := bufio.NewWriter(conn) header := []byte{0x81} switch { case len(payload) < 126: header = append(header, byte(len(payload))) case len(payload) <= 65535: header = append(header, 126, byte(len(payload)>>8), byte(len(payload))) default: header = append(header, 127, byte(uint64(len(payload))>>56), byte(uint64(len(payload))>>48), byte(uint64(len(payload))>>40), byte(uint64(len(payload))>>32), byte(uint64(len(payload))>>24), byte(uint64(len(payload))>>16), byte(uint64(len(payload))>>8), byte(uint64(len(payload))), ) } if _, err := bw.Write(header); err != nil { return err } if _, err := bw.Write(payload); err != nil { return err } return bw.Flush() } const meterIndexHTML = `