Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

109 lines
2.0KB

  1. package control
  2. import (
  3. "sync"
  4. "time"
  5. offpkg "github.com/jan/fm-rds-tx/internal/offline"
  6. )
  7. type TelemetryMessage struct {
  8. Type string `json:"type"`
  9. TS time.Time `json:"ts"`
  10. Seq uint64 `json:"seq"`
  11. Data *offpkg.MeasurementSnapshot `json:"data,omitempty"`
  12. }
  13. type telemetrySubscriber struct {
  14. ch chan TelemetryMessage
  15. done chan struct{}
  16. once sync.Once
  17. }
  18. type TelemetryHub struct {
  19. mu sync.Mutex
  20. subscribers map[*telemetrySubscriber]struct{}
  21. ingress chan *offpkg.MeasurementSnapshot
  22. }
  23. func NewTelemetryHub() *TelemetryHub {
  24. h := &TelemetryHub{
  25. subscribers: make(map[*telemetrySubscriber]struct{}),
  26. ingress: make(chan *offpkg.MeasurementSnapshot, 1),
  27. }
  28. go h.run()
  29. return h
  30. }
  31. func (h *TelemetryHub) Subscribe() (*telemetrySubscriber, func()) {
  32. sub := &telemetrySubscriber{ch: make(chan TelemetryMessage, 1), done: make(chan struct{})}
  33. h.mu.Lock()
  34. h.subscribers[sub] = struct{}{}
  35. h.mu.Unlock()
  36. return sub, func() {
  37. h.mu.Lock()
  38. delete(h.subscribers, sub)
  39. h.mu.Unlock()
  40. sub.once.Do(func() { close(sub.done) })
  41. }
  42. }
  43. func (h *TelemetryHub) PublishMeasurement(snapshot *offpkg.MeasurementSnapshot) {
  44. if h == nil || snapshot == nil {
  45. return
  46. }
  47. select {
  48. case h.ingress <- snapshot:
  49. default:
  50. select {
  51. case <-h.ingress:
  52. default:
  53. }
  54. select {
  55. case h.ingress <- snapshot:
  56. default:
  57. }
  58. }
  59. }
  60. func (h *TelemetryHub) run() {
  61. for snapshot := range h.ingress {
  62. if snapshot == nil {
  63. continue
  64. }
  65. msg := TelemetryMessage{
  66. Type: "measurement",
  67. TS: snapshot.Timestamp,
  68. Seq: snapshot.Sequence,
  69. Data: snapshot,
  70. }
  71. h.mu.Lock()
  72. subs := make([]*telemetrySubscriber, 0, len(h.subscribers))
  73. for sub := range h.subscribers {
  74. subs = append(subs, sub)
  75. }
  76. h.mu.Unlock()
  77. for _, sub := range subs {
  78. select {
  79. case <-sub.done:
  80. continue
  81. default:
  82. }
  83. select {
  84. case sub.ch <- msg:
  85. default:
  86. select {
  87. case <-sub.ch:
  88. default:
  89. }
  90. select {
  91. case sub.ch <- msg:
  92. default:
  93. }
  94. }
  95. }
  96. }
  97. }