|
- package control
-
- import (
- "sync"
- "time"
-
- offpkg "github.com/jan/fm-rds-tx/internal/offline"
- )
-
- type TelemetryMessage struct {
- Type string `json:"type"`
- TS time.Time `json:"ts"`
- Seq uint64 `json:"seq"`
- Data *offpkg.MeasurementSnapshot `json:"data,omitempty"`
- }
-
- type telemetrySubscriber struct {
- ch chan TelemetryMessage
- done chan struct{}
- once sync.Once
- }
-
- type TelemetryHub struct {
- mu sync.Mutex
- subscribers map[*telemetrySubscriber]struct{}
- ingress chan *offpkg.MeasurementSnapshot
- }
-
- func NewTelemetryHub() *TelemetryHub {
- h := &TelemetryHub{
- subscribers: make(map[*telemetrySubscriber]struct{}),
- ingress: make(chan *offpkg.MeasurementSnapshot, 1),
- }
- go h.run()
- return h
- }
-
- func (h *TelemetryHub) Subscribe() (*telemetrySubscriber, func()) {
- sub := &telemetrySubscriber{ch: make(chan TelemetryMessage, 1), done: make(chan struct{})}
- h.mu.Lock()
- h.subscribers[sub] = struct{}{}
- h.mu.Unlock()
- return sub, func() {
- h.mu.Lock()
- delete(h.subscribers, sub)
- h.mu.Unlock()
- sub.once.Do(func() { close(sub.done) })
- }
- }
-
- func (h *TelemetryHub) PublishMeasurement(snapshot *offpkg.MeasurementSnapshot) {
- if h == nil || snapshot == nil {
- return
- }
- select {
- case h.ingress <- snapshot:
- default:
- select {
- case <-h.ingress:
- default:
- }
- select {
- case h.ingress <- snapshot:
- default:
- }
- }
- }
-
- func (h *TelemetryHub) run() {
- for snapshot := range h.ingress {
- if snapshot == nil {
- continue
- }
- msg := TelemetryMessage{
- Type: "measurement",
- TS: snapshot.Timestamp,
- Seq: snapshot.Sequence,
- Data: snapshot,
- }
-
- h.mu.Lock()
- subs := make([]*telemetrySubscriber, 0, len(h.subscribers))
- for sub := range h.subscribers {
- subs = append(subs, sub)
- }
- h.mu.Unlock()
-
- for _, sub := range subs {
- select {
- case <-sub.done:
- continue
- default:
- }
- select {
- case sub.ch <- msg:
- default:
- select {
- case <-sub.ch:
- default:
- }
- select {
- case sub.ch <- msg:
- default:
- }
- }
- }
- }
- }
|