Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

250 lines
6.2KB

  1. package output
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. )
  7. // ErrFrameQueueClosed is returned when a queue operation is attempted after the queue
  8. // has been closed.
  9. var ErrFrameQueueClosed = errors.New("frame queue closed")
  10. // QueueStats exposes the runtime state of a frame queue.
  11. type QueueStats struct {
  12. Capacity int `json:"capacity"`
  13. Depth int `json:"depth"`
  14. FillLevel float64 `json:"fillLevel"`
  15. Health QueueHealth `json:"health"`
  16. HighWaterMark int `json:"highWaterMark"`
  17. LowWaterMark int `json:"lowWaterMark"`
  18. PushTimeouts uint64 `json:"pushTimeouts"`
  19. PopTimeouts uint64 `json:"popTimeouts"`
  20. DroppedFrames uint64 `json:"droppedFrames"`
  21. RepeatedFrames uint64 `json:"repeatedFrames"`
  22. MutedFrames uint64 `json:"mutedFrames"`
  23. }
  24. type QueueHealth string
  25. const (
  26. QueueHealthCritical QueueHealth = "critical"
  27. QueueHealthLow QueueHealth = "low"
  28. QueueHealthNormal QueueHealth = "normal"
  29. )
  30. const (
  31. queueHealthCriticalThreshold = 0.2
  32. queueHealthLowThreshold = 0.5
  33. )
  34. // FrameQueue is a bounded ring that holds CompositeFrame instances between the
  35. // generator and the writer. Push blocks when the queue is full until space
  36. // becomes available or the provided context is cancelled. Pop blocks when the
  37. // queue is empty until a new frame arrives or the context is cancelled.
  38. type FrameQueue struct {
  39. capacity int
  40. ch chan *CompositeFrame
  41. // closeCh is closed by Close() and used in Push/Pop selects so that
  42. // a concurrent Close() can never race with a channel send.
  43. closeCh chan struct{}
  44. mu sync.Mutex
  45. depth int
  46. highWaterMark int
  47. lowWaterMark int
  48. pushTimeouts uint64
  49. popTimeouts uint64
  50. dropped uint64
  51. repeated uint64
  52. muted uint64
  53. closed bool
  54. closeOnce sync.Once
  55. }
  56. // NewFrameQueue builds a bounded queue that holds up to capacity frames.
  57. func NewFrameQueue(capacity int) *FrameQueue {
  58. if capacity <= 0 {
  59. capacity = 1
  60. }
  61. fq := &FrameQueue{
  62. capacity: capacity,
  63. ch: make(chan *CompositeFrame, capacity),
  64. closeCh: make(chan struct{}),
  65. lowWaterMark: capacity,
  66. }
  67. fq.trackDepth(0)
  68. return fq
  69. }
  70. // Capacity returns the fixed frame capacity of the queue.
  71. func (q *FrameQueue) Capacity() int {
  72. return q.capacity
  73. }
  74. // FillLevel reports the current occupancy as a fraction of capacity.
  75. // Uses len(ch) directly for accuracy: updateDepth() is called after the
  76. // channel operation, so q.depth can lag by one frame transiently.
  77. func (q *FrameQueue) FillLevel() float64 {
  78. if q.capacity == 0 {
  79. return 0
  80. }
  81. return float64(len(q.ch)) / float64(q.capacity)
  82. }
  83. // Depth returns the current number of frames in the queue.
  84. // Uses len(ch) directly for accuracy (see FillLevel).
  85. func (q *FrameQueue) Depth() int {
  86. return len(q.ch)
  87. }
  88. // Stats returns a snapshot of the queue metrics.
  89. func (q *FrameQueue) Stats() QueueStats {
  90. q.mu.Lock()
  91. fill := q.fillLevelLocked()
  92. stats := QueueStats{
  93. Capacity: q.capacity,
  94. Depth: len(q.ch),
  95. FillLevel: fill,
  96. Health: queueHealthFromFill(fill),
  97. HighWaterMark: q.highWaterMark,
  98. LowWaterMark: q.lowWaterMark,
  99. PushTimeouts: q.pushTimeouts,
  100. PopTimeouts: q.popTimeouts,
  101. DroppedFrames: q.dropped,
  102. RepeatedFrames: q.repeated,
  103. MutedFrames: q.muted,
  104. }
  105. q.mu.Unlock()
  106. return stats
  107. }
  108. // Push enqueues a frame, blocking until space is available or ctx is done.
  109. func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error {
  110. if frame == nil {
  111. return errors.New("frame required")
  112. }
  113. // BUG-A fix: use closeCh in the select so that a concurrent Close() can
  114. // never race with the send. The old isClosed() pre-check + separate
  115. // ch<- send had a TOCTOU gap that could panic with "send on closed channel".
  116. // BUG-05 fix: increment depth before the send; undo on cancel/close.
  117. q.updateDepth(+1)
  118. select {
  119. case q.ch <- frame:
  120. return nil
  121. case <-q.closeCh:
  122. q.updateDepth(-1)
  123. return ErrFrameQueueClosed
  124. case <-ctx.Done():
  125. q.updateDepth(-1)
  126. q.recordPushTimeout()
  127. return ctx.Err()
  128. }
  129. }
  130. // Pop removes a frame, blocking until one is available or ctx signals done.
  131. func (q *FrameQueue) Pop(ctx context.Context) (*CompositeFrame, error) {
  132. select {
  133. case frame, ok := <-q.ch:
  134. if !ok {
  135. return nil, ErrFrameQueueClosed
  136. }
  137. q.updateDepth(-1)
  138. return frame, nil
  139. case <-ctx.Done():
  140. q.recordPopTimeout()
  141. return nil, ctx.Err()
  142. }
  143. }
  144. // Close marks the queue as closed and wakes up blocked callers.
  145. func (q *FrameQueue) Close() {
  146. q.closeOnce.Do(func() {
  147. q.mu.Lock()
  148. q.closed = true
  149. q.mu.Unlock()
  150. // Close closeCh first so blocked Push() calls unblock safely
  151. // before close(ch) removes the destination.
  152. close(q.closeCh)
  153. close(q.ch)
  154. })
  155. }
  156. // RecordDrop increments the drop counter for instrumentation.
  157. func (q *FrameQueue) RecordDrop() {
  158. q.mu.Lock()
  159. q.dropped++
  160. q.mu.Unlock()
  161. }
  162. // RecordRepeat increments the repeat counter for instrumentation.
  163. func (q *FrameQueue) RecordRepeat() {
  164. q.mu.Lock()
  165. q.repeated++
  166. q.mu.Unlock()
  167. }
  168. // RecordMute increments the mute counter for instrumentation.
  169. func (q *FrameQueue) RecordMute() {
  170. q.mu.Lock()
  171. q.muted++
  172. q.mu.Unlock()
  173. }
  174. func (q *FrameQueue) isClosed() bool {
  175. q.mu.Lock()
  176. closed := q.closed
  177. q.mu.Unlock()
  178. return closed
  179. }
  180. func (q *FrameQueue) updateDepth(delta int) {
  181. q.mu.Lock()
  182. q.depth += delta
  183. q.trackDepth(q.depth)
  184. q.mu.Unlock()
  185. }
  186. func (q *FrameQueue) trackDepth(depth int) {
  187. if depth > q.highWaterMark {
  188. q.highWaterMark = depth
  189. }
  190. if depth < q.lowWaterMark {
  191. q.lowWaterMark = depth
  192. }
  193. }
  194. func (q *FrameQueue) fillLevelLocked() float64 {
  195. if q.capacity == 0 {
  196. return 0
  197. }
  198. // Use len(ch) rather than q.depth: depth is updated after the channel
  199. // operation, so it can be off by one during the Push/Pop window.
  200. return float64(len(q.ch)) / float64(q.capacity)
  201. }
  202. func (q *FrameQueue) recordPushTimeout() {
  203. q.mu.Lock()
  204. q.pushTimeouts++
  205. q.mu.Unlock()
  206. }
  207. func (q *FrameQueue) recordPopTimeout() {
  208. q.mu.Lock()
  209. q.popTimeouts++
  210. q.mu.Unlock()
  211. }
  212. func queueHealthFromFill(fill float64) QueueHealth {
  213. switch {
  214. case fill <= queueHealthCriticalThreshold:
  215. return QueueHealthCritical
  216. case fill <= queueHealthLowThreshold:
  217. return QueueHealthLow
  218. default:
  219. return QueueHealthNormal
  220. }
  221. }