Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

239 wiersze
5.3KB

  1. package output
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. )
  7. // ErrFrameQueueClosed is returned when a queue operation is attempted after the queue
  8. // has been closed.
  9. var ErrFrameQueueClosed = errors.New("frame queue closed")
  10. // QueueStats exposes the runtime state of a frame queue.
  11. type QueueStats struct {
  12. Capacity int `json:"capacity"`
  13. Depth int `json:"depth"`
  14. FillLevel float64 `json:"fillLevel"`
  15. Health QueueHealth `json:"health"`
  16. HighWaterMark int `json:"highWaterMark"`
  17. LowWaterMark int `json:"lowWaterMark"`
  18. PushTimeouts uint64 `json:"pushTimeouts"`
  19. PopTimeouts uint64 `json:"popTimeouts"`
  20. DroppedFrames uint64 `json:"droppedFrames"`
  21. RepeatedFrames uint64 `json:"repeatedFrames"`
  22. MutedFrames uint64 `json:"mutedFrames"`
  23. }
  24. type QueueHealth string
  25. const (
  26. QueueHealthCritical QueueHealth = "critical"
  27. QueueHealthLow QueueHealth = "low"
  28. QueueHealthNormal QueueHealth = "normal"
  29. )
  30. const (
  31. queueHealthCriticalThreshold = 0.2
  32. queueHealthLowThreshold = 0.5
  33. )
  34. // FrameQueue is a bounded ring that holds CompositeFrame instances between the
  35. // generator and the writer. Push blocks when the queue is full until space
  36. // becomes available or the provided context is cancelled. Pop blocks when the
  37. // queue is empty until a new frame arrives or the context is cancelled.
  38. type FrameQueue struct {
  39. capacity int
  40. ch chan *CompositeFrame
  41. mu sync.Mutex
  42. depth int
  43. highWaterMark int
  44. lowWaterMark int
  45. pushTimeouts uint64
  46. popTimeouts uint64
  47. dropped uint64
  48. repeated uint64
  49. muted uint64
  50. closed bool
  51. closeOnce sync.Once
  52. }
  53. // NewFrameQueue builds a bounded queue that holds up to capacity frames.
  54. func NewFrameQueue(capacity int) *FrameQueue {
  55. if capacity <= 0 {
  56. capacity = 1
  57. }
  58. fq := &FrameQueue{
  59. capacity: capacity,
  60. ch: make(chan *CompositeFrame, capacity),
  61. lowWaterMark: capacity,
  62. }
  63. fq.trackDepth(0)
  64. return fq
  65. }
  66. // Capacity returns the fixed frame capacity of the queue.
  67. func (q *FrameQueue) Capacity() int {
  68. return q.capacity
  69. }
  70. // FillLevel reports the current occupancy as a fraction of capacity.
  71. func (q *FrameQueue) FillLevel() float64 {
  72. q.mu.Lock()
  73. depth := q.depth
  74. q.mu.Unlock()
  75. if q.capacity == 0 {
  76. return 0
  77. }
  78. return float64(depth) / float64(q.capacity)
  79. }
  80. // Depth returns the current number of frames in the queue.
  81. func (q *FrameQueue) Depth() int {
  82. q.mu.Lock()
  83. depth := q.depth
  84. q.mu.Unlock()
  85. return depth
  86. }
  87. // Stats returns a snapshot of the queue metrics.
  88. func (q *FrameQueue) Stats() QueueStats {
  89. q.mu.Lock()
  90. fill := q.fillLevelLocked()
  91. stats := QueueStats{
  92. Capacity: q.capacity,
  93. Depth: q.depth,
  94. FillLevel: fill,
  95. Health: queueHealthFromFill(fill),
  96. HighWaterMark: q.highWaterMark,
  97. LowWaterMark: q.lowWaterMark,
  98. PushTimeouts: q.pushTimeouts,
  99. PopTimeouts: q.popTimeouts,
  100. DroppedFrames: q.dropped,
  101. RepeatedFrames: q.repeated,
  102. MutedFrames: q.muted,
  103. }
  104. q.mu.Unlock()
  105. return stats
  106. }
  107. // Push enqueues a frame, blocking until space is available or ctx is done.
  108. func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error {
  109. if frame == nil {
  110. return errors.New("frame required")
  111. }
  112. if q.isClosed() {
  113. return ErrFrameQueueClosed
  114. }
  115. select {
  116. case q.ch <- frame:
  117. q.updateDepth(+1)
  118. return nil
  119. case <-ctx.Done():
  120. q.recordPushTimeout()
  121. return ctx.Err()
  122. }
  123. }
  124. // Pop removes a frame, blocking until one is available or ctx signals done.
  125. func (q *FrameQueue) Pop(ctx context.Context) (*CompositeFrame, error) {
  126. select {
  127. case frame, ok := <-q.ch:
  128. if !ok {
  129. return nil, ErrFrameQueueClosed
  130. }
  131. q.updateDepth(-1)
  132. return frame, nil
  133. case <-ctx.Done():
  134. q.recordPopTimeout()
  135. return nil, ctx.Err()
  136. }
  137. }
  138. // Close marks the queue as closed and wakes up blocked callers.
  139. func (q *FrameQueue) Close() {
  140. q.closeOnce.Do(func() {
  141. q.mu.Lock()
  142. q.closed = true
  143. q.mu.Unlock()
  144. close(q.ch)
  145. })
  146. }
  147. // RecordDrop increments the drop counter for instrumentation.
  148. func (q *FrameQueue) RecordDrop() {
  149. q.mu.Lock()
  150. q.dropped++
  151. q.mu.Unlock()
  152. }
  153. // RecordRepeat increments the repeat counter for instrumentation.
  154. func (q *FrameQueue) RecordRepeat() {
  155. q.mu.Lock()
  156. q.repeated++
  157. q.mu.Unlock()
  158. }
  159. // RecordMute increments the mute counter for instrumentation.
  160. func (q *FrameQueue) RecordMute() {
  161. q.mu.Lock()
  162. q.muted++
  163. q.mu.Unlock()
  164. }
  165. func (q *FrameQueue) isClosed() bool {
  166. q.mu.Lock()
  167. closed := q.closed
  168. q.mu.Unlock()
  169. return closed
  170. }
  171. func (q *FrameQueue) updateDepth(delta int) {
  172. q.mu.Lock()
  173. q.depth += delta
  174. q.trackDepth(q.depth)
  175. q.mu.Unlock()
  176. }
  177. func (q *FrameQueue) trackDepth(depth int) {
  178. if depth > q.highWaterMark {
  179. q.highWaterMark = depth
  180. }
  181. if depth < q.lowWaterMark {
  182. q.lowWaterMark = depth
  183. }
  184. }
  185. func (q *FrameQueue) fillLevelLocked() float64 {
  186. if q.capacity == 0 {
  187. return 0
  188. }
  189. return float64(q.depth) / float64(q.capacity)
  190. }
  191. func (q *FrameQueue) recordPushTimeout() {
  192. q.mu.Lock()
  193. q.pushTimeouts++
  194. q.mu.Unlock()
  195. }
  196. func (q *FrameQueue) recordPopTimeout() {
  197. q.mu.Lock()
  198. q.popTimeouts++
  199. q.mu.Unlock()
  200. }
  201. func queueHealthFromFill(fill float64) QueueHealth {
  202. switch {
  203. case fill <= queueHealthCriticalThreshold:
  204. return QueueHealthCritical
  205. case fill <= queueHealthLowThreshold:
  206. return QueueHealthLow
  207. default:
  208. return QueueHealthNormal
  209. }
  210. }