Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

262 lines
6.4KB

  1. package output
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. )
  7. // ErrFrameQueueClosed is returned when a queue operation is attempted after the queue
  8. // has been closed.
  9. var ErrFrameQueueClosed = errors.New("frame queue closed")
  10. // QueueStats exposes the runtime state of a frame queue.
  11. type QueueStats struct {
  12. Capacity int `json:"capacity"`
  13. Depth int `json:"depth"`
  14. FillLevel float64 `json:"fillLevel"`
  15. Health QueueHealth `json:"health"`
  16. HighWaterMark int `json:"highWaterMark"`
  17. LowWaterMark int `json:"lowWaterMark"`
  18. PushTimeouts uint64 `json:"pushTimeouts"`
  19. PopTimeouts uint64 `json:"popTimeouts"`
  20. DroppedFrames uint64 `json:"droppedFrames"`
  21. RepeatedFrames uint64 `json:"repeatedFrames"`
  22. MutedFrames uint64 `json:"mutedFrames"`
  23. }
  24. type QueueHealth string
  25. const (
  26. QueueHealthCritical QueueHealth = "critical"
  27. QueueHealthLow QueueHealth = "low"
  28. QueueHealthNormal QueueHealth = "normal"
  29. )
  30. const (
  31. queueHealthCriticalThreshold = 0.2
  32. queueHealthLowThreshold = 0.5
  33. )
  34. // FrameQueue is a bounded ring that holds CompositeFrame instances between the
  35. // generator and the writer. Push blocks when the queue is full until space
  36. // becomes available or the provided context is cancelled. Pop blocks when the
  37. // queue is empty until a new frame arrives or the context is cancelled.
  38. type FrameQueue struct {
  39. capacity int
  40. ch chan *CompositeFrame
  41. // closeCh is closed by Close() and used in Push/Pop selects so that
  42. // a concurrent Close() can never race with a channel send.
  43. closeCh chan struct{}
  44. mu sync.Mutex
  45. depth int
  46. highWaterMark int
  47. lowWaterMark int
  48. pushTimeouts uint64
  49. popTimeouts uint64
  50. dropped uint64
  51. repeated uint64
  52. muted uint64
  53. closed bool
  54. closeOnce sync.Once
  55. }
  56. // NewFrameQueue builds a bounded queue that holds up to capacity frames.
  57. func NewFrameQueue(capacity int) *FrameQueue {
  58. if capacity <= 0 {
  59. capacity = 1
  60. }
  61. fq := &FrameQueue{
  62. capacity: capacity,
  63. ch: make(chan *CompositeFrame, capacity),
  64. closeCh: make(chan struct{}),
  65. lowWaterMark: capacity,
  66. }
  67. fq.trackDepth(0)
  68. return fq
  69. }
  70. // Capacity returns the fixed frame capacity of the queue.
  71. func (q *FrameQueue) Capacity() int {
  72. return q.capacity
  73. }
  74. // FillLevel reports the current occupancy as a fraction of capacity.
  75. // Uses len(ch) directly for accuracy: updateDepth() is called after the
  76. // channel operation, so q.depth can lag by one frame transiently.
  77. func (q *FrameQueue) FillLevel() float64 {
  78. if q.capacity == 0 {
  79. return 0
  80. }
  81. return float64(len(q.ch)) / float64(q.capacity)
  82. }
  83. // Depth returns the current number of frames in the queue.
  84. // Uses len(ch) directly for accuracy (see FillLevel).
  85. func (q *FrameQueue) Depth() int {
  86. return len(q.ch)
  87. }
  88. // Stats returns a snapshot of the queue metrics.
  89. func (q *FrameQueue) Stats() QueueStats {
  90. q.mu.Lock()
  91. fill := q.fillLevelLocked()
  92. stats := QueueStats{
  93. Capacity: q.capacity,
  94. Depth: len(q.ch),
  95. FillLevel: fill,
  96. Health: queueHealthFromFill(fill),
  97. HighWaterMark: q.highWaterMark,
  98. LowWaterMark: q.lowWaterMark,
  99. PushTimeouts: q.pushTimeouts,
  100. PopTimeouts: q.popTimeouts,
  101. DroppedFrames: q.dropped,
  102. RepeatedFrames: q.repeated,
  103. MutedFrames: q.muted,
  104. }
  105. q.mu.Unlock()
  106. return stats
  107. }
  108. // Push enqueues a frame, blocking until space is available or ctx is done.
  109. func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error {
  110. if frame == nil {
  111. return errors.New("frame required")
  112. }
  113. // BUG-A fix: use closeCh in the select — no TOCTOU gap.
  114. // BUG-3 fix: updateDepth(+1) and trackDepth (highWaterMark) only on
  115. // successful send. Pre-incrementing before the select caused the
  116. // high-watermark to count frames that were never actually queued
  117. // (cancelled or closed path), potentially reporting capacity+1.
  118. select {
  119. case q.ch <- frame:
  120. q.updateDepth(+1)
  121. return nil
  122. case <-q.closeCh:
  123. return ErrFrameQueueClosed
  124. case <-ctx.Done():
  125. q.recordPushTimeout()
  126. return ctx.Err()
  127. }
  128. }
  129. // Pop removes a frame, blocking until one is available or ctx signals done.
  130. func (q *FrameQueue) Pop(ctx context.Context) (*CompositeFrame, error) {
  131. select {
  132. case frame, ok := <-q.ch:
  133. if !ok {
  134. return nil, ErrFrameQueueClosed
  135. }
  136. q.updateDepth(-1)
  137. return frame, nil
  138. case <-ctx.Done():
  139. q.recordPopTimeout()
  140. return nil, ctx.Err()
  141. }
  142. }
  143. // Drain removes and discards all frames currently in the queue.
  144. // Call before restarting a stopped engine to avoid replaying stale frames.
  145. func (q *FrameQueue) Drain() {
  146. for {
  147. select {
  148. case <-q.ch:
  149. q.updateDepth(-1)
  150. default:
  151. return
  152. }
  153. }
  154. }
  155. // Close marks the queue as closed and wakes up blocked callers.
  156. func (q *FrameQueue) Close() {
  157. q.closeOnce.Do(func() {
  158. q.mu.Lock()
  159. q.closed = true
  160. q.mu.Unlock()
  161. // Close closeCh first so blocked Push() calls unblock safely
  162. // before close(ch) removes the destination.
  163. close(q.closeCh)
  164. close(q.ch)
  165. })
  166. }
  167. // RecordDrop increments the drop counter for instrumentation.
  168. func (q *FrameQueue) RecordDrop() {
  169. q.mu.Lock()
  170. q.dropped++
  171. q.mu.Unlock()
  172. }
  173. // RecordRepeat increments the repeat counter for instrumentation.
  174. func (q *FrameQueue) RecordRepeat() {
  175. q.mu.Lock()
  176. q.repeated++
  177. q.mu.Unlock()
  178. }
  179. // RecordMute increments the mute counter for instrumentation.
  180. func (q *FrameQueue) RecordMute() {
  181. q.mu.Lock()
  182. q.muted++
  183. q.mu.Unlock()
  184. }
  185. func (q *FrameQueue) isClosed() bool {
  186. q.mu.Lock()
  187. closed := q.closed
  188. q.mu.Unlock()
  189. return closed
  190. }
  191. func (q *FrameQueue) updateDepth(delta int) {
  192. q.mu.Lock()
  193. q.depth += delta
  194. q.trackDepth(q.depth)
  195. q.mu.Unlock()
  196. }
  197. func (q *FrameQueue) trackDepth(depth int) {
  198. if depth > q.highWaterMark {
  199. q.highWaterMark = depth
  200. }
  201. if depth < q.lowWaterMark {
  202. q.lowWaterMark = depth
  203. }
  204. }
  205. func (q *FrameQueue) fillLevelLocked() float64 {
  206. if q.capacity == 0 {
  207. return 0
  208. }
  209. // Use len(ch) rather than q.depth: depth is updated after the channel
  210. // operation, so it can be off by one during the Push/Pop window.
  211. return float64(len(q.ch)) / float64(q.capacity)
  212. }
  213. func (q *FrameQueue) recordPushTimeout() {
  214. q.mu.Lock()
  215. q.pushTimeouts++
  216. q.mu.Unlock()
  217. }
  218. func (q *FrameQueue) recordPopTimeout() {
  219. q.mu.Lock()
  220. q.popTimeouts++
  221. q.mu.Unlock()
  222. }
  223. func queueHealthFromFill(fill float64) QueueHealth {
  224. switch {
  225. case fill <= queueHealthCriticalThreshold:
  226. return QueueHealthCritical
  227. case fill <= queueHealthLowThreshold:
  228. return QueueHealthLow
  229. default:
  230. return QueueHealthNormal
  231. }
  232. }