package control

import (
	"sync"
	"time"

	offpkg "github.com/jan/fm-rds-tx/internal/offline"
)

// TelemetryMessage is the envelope handed to telemetry subscribers.
//
// Type identifies the payload kind (the hub in this file only emits
// "measurement"), TS and Seq are copied from the snapshot's Timestamp and
// Sequence fields, and Data points at the snapshot itself. The same snapshot
// pointer is shared by every subscriber, so receivers should treat it as
// read-only.
type TelemetryMessage struct {
	Type string                      `json:"type"`
	TS   time.Time                   `json:"ts"`
	Seq  uint64                      `json:"seq"`
	Data *offpkg.MeasurementSnapshot `json:"data,omitempty"`
}

// telemetrySubscriber is a single consumer registered with a TelemetryHub.
//
// ch is a one-slot mailbox that holds the most recent undelivered message;
// done is closed (exactly once, guarded by once) when the subscriber cancels.
type telemetrySubscriber struct {
	ch   chan TelemetryMessage
	done chan struct{}
	once sync.Once
}

// finish closes done. It is safe to call any number of times.
func (s *telemetrySubscriber) finish() {
	s.once.Do(func() { close(s.done) })
}

// offer delivers msg to the subscriber without ever blocking the caller.
// A cancelled subscriber is skipped. If the mailbox is full, the stale
// message is discarded and the send is retried once; if a concurrent party
// refills the slot in between, msg is dropped.
func (s *telemetrySubscriber) offer(msg TelemetryMessage) {
	select {
	case <-s.done:
		return
	default:
	}

	select {
	case s.ch <- msg:
		return
	default:
	}

	// Mailbox full: evict the older message so the newest one wins.
	select {
	case <-s.ch:
	default:
	}
	select {
	case s.ch <- msg:
	default:
	}
}

// TelemetryHub fans measurement snapshots out to any number of subscribers.
//
// Both the ingress queue and every subscriber mailbox hold a single element
// and are written with non-blocking sends, so slow consumers never stall the
// publisher; they simply miss intermediate snapshots. Construct a hub with
// NewTelemetryHub: the zero value has no ingress channel and no dispatcher.
type TelemetryHub struct {
	mu          sync.Mutex // guards subscribers
	subscribers map[*telemetrySubscriber]struct{}
	ingress     chan *offpkg.MeasurementSnapshot
}

// NewTelemetryHub returns a hub with its dispatcher goroutine already
// running.
//
// NOTE(review): nothing in this file closes ingress, so the dispatcher
// goroutine lives for as long as the process does. Confirm that is intended
// before creating hubs dynamically (for example, one per test).
func NewTelemetryHub() *TelemetryHub {
	h := &TelemetryHub{
		subscribers: make(map[*telemetrySubscriber]struct{}),
		ingress:     make(chan *offpkg.MeasurementSnapshot, 1),
	}
	go h.run()
	return h
}

// Subscribe registers a new subscriber and returns it together with a cancel
// function. Calling cancel removes the subscriber from the hub and closes
// its done channel; cancel is idempotent.
//
// Subscribe is safe on a nil hub, mirroring PublishMeasurement: the returned
// subscriber is simply never fed, and cancel still closes its done channel.
func (h *TelemetryHub) Subscribe() (*telemetrySubscriber, func()) {
	sub := &telemetrySubscriber{
		ch:   make(chan TelemetryMessage, 1),
		done: make(chan struct{}),
	}
	if h == nil {
		// No hub to register with; hand back a detached subscriber instead
		// of panicking on the nil receiver.
		return sub, sub.finish
	}

	h.mu.Lock()
	h.subscribers[sub] = struct{}{}
	h.mu.Unlock()

	cancel := func() {
		h.mu.Lock()
		delete(h.subscribers, sub)
		h.mu.Unlock()
		sub.finish()
	}
	return sub, cancel
}

// PublishMeasurement queues snapshot for delivery to all subscribers.
//
// It never blocks. When a snapshot is already pending, the pending one is
// discarded in favour of the new one. A nil hub or nil snapshot is a no-op.
func (h *TelemetryHub) PublishMeasurement(snapshot *offpkg.MeasurementSnapshot) {
	if h == nil || snapshot == nil {
		return
	}

	select {
	case h.ingress <- snapshot:
		return
	default:
	}

	// Queue full: drop the pending snapshot, then try once more. Either step
	// may lose a race with the dispatcher or another publisher, in which
	// case this snapshot is dropped rather than blocking the caller.
	select {
	case <-h.ingress:
	default:
	}
	select {
	case h.ingress <- snapshot:
	default:
	}
}

// run is the dispatcher loop started by NewTelemetryHub. It wraps each
// snapshot read from ingress in a TelemetryMessage and offers it to every
// subscriber registered at that moment.
func (h *TelemetryHub) run() {
	for snapshot := range h.ingress {
		if snapshot == nil {
			continue
		}

		msg := TelemetryMessage{
			Type: "measurement",
			TS:   snapshot.Timestamp,
			Seq:  snapshot.Sequence,
			Data: snapshot,
		}

		// Copy the subscriber set under the lock and deliver outside it, so
		// Subscribe and cancel are never held up by delivery.
		h.mu.Lock()
		targets := make([]*telemetrySubscriber, 0, len(h.subscribers))
		for sub := range h.subscribers {
			targets = append(targets, sub)
		}
		h.mu.Unlock()

		for _, sub := range targets {
			sub.offer(msg)
		}
	}
}