ソースを参照

fix: harden telemetry hub lifecycle and publish boundary

main
Jan 1ヶ月前
コミット
6bbf095174
2個のファイルの変更65行の追加29行の削除
  1. +8
    -3
      internal/control/control.go
  2. +57
    -26
      internal/control/telemetry.go

+ 8
- 3
internal/control/control.go ファイルの表示

@@ -423,10 +423,15 @@ func (s *Server) handleTelemetryWS(ws *websocket.Conn) {
}
}

for msg := range sub.ch {
_ = ws.SetWriteDeadline(time.Now().Add(2 * time.Second))
if err := websocket.JSON.Send(ws, msg); err != nil {
for {
select {
case <-sub.done:
return
case msg := <-sub.ch:
_ = ws.SetWriteDeadline(time.Now().Add(2 * time.Second))
if err := websocket.JSON.Send(ws, msg); err != nil {
return
}
}
}
}


+ 57
- 26
internal/control/telemetry.go ファイルの表示

@@ -8,37 +8,43 @@ import (
)

type TelemetryMessage struct {
Type string `json:"type"`
TS time.Time `json:"ts"`
Seq uint64 `json:"seq"`
Type string `json:"type"`
TS time.Time `json:"ts"`
Seq uint64 `json:"seq"`
Data *offpkg.MeasurementSnapshot `json:"data,omitempty"`
}

type telemetrySubscriber struct {
ch chan TelemetryMessage
ch chan TelemetryMessage
done chan struct{}
once sync.Once
}

type TelemetryHub struct {
mu sync.Mutex
subscribers map[*telemetrySubscriber]struct{}
ingress chan *offpkg.MeasurementSnapshot
}

func NewTelemetryHub() *TelemetryHub {
return &TelemetryHub{subscribers: make(map[*telemetrySubscriber]struct{})}
h := &TelemetryHub{
subscribers: make(map[*telemetrySubscriber]struct{}),
ingress: make(chan *offpkg.MeasurementSnapshot, 1),
}
go h.run()
return h
}

func (h *TelemetryHub) Subscribe() (*telemetrySubscriber, func()) {
sub := &telemetrySubscriber{ch: make(chan TelemetryMessage, 1)}
sub := &telemetrySubscriber{ch: make(chan TelemetryMessage, 1), done: make(chan struct{})}
h.mu.Lock()
h.subscribers[sub] = struct{}{}
h.mu.Unlock()
return sub, func() {
h.mu.Lock()
if _, ok := h.subscribers[sub]; ok {
delete(h.subscribers, sub)
close(sub.ch)
}
delete(h.subscribers, sub)
h.mu.Unlock()
sub.once.Do(func() { close(sub.done) })
}
}

@@ -46,31 +52,56 @@ func (h *TelemetryHub) PublishMeasurement(snapshot *offpkg.MeasurementSnapshot)
if h == nil || snapshot == nil {
return
}
msg := TelemetryMessage{
Type: "measurement",
TS: snapshot.Timestamp,
Seq: snapshot.Sequence,
Data: snapshot,
select {
case h.ingress <- snapshot:
default:
select {
case <-h.ingress:
default:
}
select {
case h.ingress <- snapshot:
default:
}
}
}

h.mu.Lock()
subs := make([]*telemetrySubscriber, 0, len(h.subscribers))
for sub := range h.subscribers {
subs = append(subs, sub)
}
h.mu.Unlock()
func (h *TelemetryHub) run() {
for snapshot := range h.ingress {
if snapshot == nil {
continue
}
msg := TelemetryMessage{
Type: "measurement",
TS: snapshot.Timestamp,
Seq: snapshot.Sequence,
Data: snapshot,
}

for _, sub := range subs {
select {
case sub.ch <- msg:
default:
h.mu.Lock()
subs := make([]*telemetrySubscriber, 0, len(h.subscribers))
for sub := range h.subscribers {
subs = append(subs, sub)
}
h.mu.Unlock()

for _, sub := range subs {
select {
case <-sub.ch:
case <-sub.done:
continue
default:
}
select {
case sub.ch <- msg:
default:
select {
case <-sub.ch:
default:
}
select {
case sub.ch <- msg:
default:
}
}
}
}


読み込み中…
キャンセル
保存