diff --git a/internal/control/control.go b/internal/control/control.go index 1765bb0..2fd125f 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -423,10 +423,15 @@ func (s *Server) handleTelemetryWS(ws *websocket.Conn) { } } - for msg := range sub.ch { - _ = ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) - if err := websocket.JSON.Send(ws, msg); err != nil { + for { + select { + case <-sub.done: return + case msg := <-sub.ch: + _ = ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) + if err := websocket.JSON.Send(ws, msg); err != nil { + return + } } } } diff --git a/internal/control/telemetry.go b/internal/control/telemetry.go index 6a38fc0..7468c5c 100644 --- a/internal/control/telemetry.go +++ b/internal/control/telemetry.go @@ -8,37 +8,43 @@ import ( ) type TelemetryMessage struct { - Type string `json:"type"` - TS time.Time `json:"ts"` - Seq uint64 `json:"seq"` + Type string `json:"type"` + TS time.Time `json:"ts"` + Seq uint64 `json:"seq"` Data *offpkg.MeasurementSnapshot `json:"data,omitempty"` } type telemetrySubscriber struct { - ch chan TelemetryMessage + ch chan TelemetryMessage + done chan struct{} + once sync.Once } type TelemetryHub struct { mu sync.Mutex subscribers map[*telemetrySubscriber]struct{} + ingress chan *offpkg.MeasurementSnapshot } func NewTelemetryHub() *TelemetryHub { - return &TelemetryHub{subscribers: make(map[*telemetrySubscriber]struct{})} + h := &TelemetryHub{ + subscribers: make(map[*telemetrySubscriber]struct{}), + ingress: make(chan *offpkg.MeasurementSnapshot, 1), + } + go h.run() + return h } func (h *TelemetryHub) Subscribe() (*telemetrySubscriber, func()) { - sub := &telemetrySubscriber{ch: make(chan TelemetryMessage, 1)} + sub := &telemetrySubscriber{ch: make(chan TelemetryMessage, 1), done: make(chan struct{})} h.mu.Lock() h.subscribers[sub] = struct{}{} h.mu.Unlock() return sub, func() { h.mu.Lock() - if _, ok := h.subscribers[sub]; ok { - delete(h.subscribers, sub) - close(sub.ch) - } + delete(h.subscribers, sub) h.mu.Unlock() + sub.once.Do(func() { close(sub.done) }) } } @@ -46,31 +52,56 @@ func (h *TelemetryHub) PublishMeasurement(snapshot *offpkg.MeasurementSnapshot) if h == nil || snapshot == nil { return } - msg := TelemetryMessage{ - Type: "measurement", - TS: snapshot.Timestamp, - Seq: snapshot.Sequence, - Data: snapshot, + select { + case h.ingress <- snapshot: + default: + select { + case <-h.ingress: + default: + } + select { + case h.ingress <- snapshot: + default: + } } +} - h.mu.Lock() - subs := make([]*telemetrySubscriber, 0, len(h.subscribers)) - for sub := range h.subscribers { - subs = append(subs, sub) - } - h.mu.Unlock() +func (h *TelemetryHub) run() { + for snapshot := range h.ingress { + if snapshot == nil { + continue + } + msg := TelemetryMessage{ + Type: "measurement", + TS: snapshot.Timestamp, + Seq: snapshot.Sequence, + Data: snapshot, + } - for _, sub := range subs { - select { - case sub.ch <- msg: - default: + h.mu.Lock() + subs := make([]*telemetrySubscriber, 0, len(h.subscribers)) + for sub := range h.subscribers { + subs = append(subs, sub) + } + h.mu.Unlock() + + for _, sub := range subs { select { - case <-sub.ch: + case <-sub.done: + continue default: } select { case sub.ch <- msg: default: + select { + case <-sub.ch: + default: + } + select { + case sub.ch <- msg: + default: + } } } }