|
- package output
-
- import (
- "context"
- "errors"
- "sync"
- )
-
// ErrFrameQueueClosed is the sentinel error reported by FrameQueue operations
// that run against a queue which has already been closed. Callers should match
// it with errors.Is.
var ErrFrameQueueClosed = errors.New("frame queue closed")
-
- // QueueStats exposes the runtime state of a frame queue.
- type QueueStats struct {
- Capacity int `json:"capacity"`
- Depth int `json:"depth"`
- FillLevel float64 `json:"fillLevel"`
- Health QueueHealth `json:"health"`
- HighWaterMark int `json:"highWaterMark"`
- LowWaterMark int `json:"lowWaterMark"`
- PushTimeouts uint64 `json:"pushTimeouts"`
- PopTimeouts uint64 `json:"popTimeouts"`
- DroppedFrames uint64 `json:"droppedFrames"`
- RepeatedFrames uint64 `json:"repeatedFrames"`
- MutedFrames uint64 `json:"mutedFrames"`
- }
-
// QueueHealth is a coarse label describing how full a frame queue is.
type QueueHealth string

// Health labels, ordered from the emptiest queue state to the fullest.
const (
	QueueHealthCritical QueueHealth = "critical"
	QueueHealthLow      QueueHealth = "low"
	QueueHealthNormal   QueueHealth = "normal"
)

// Fill-level boundaries used to classify queue health. A fill level at or
// below the critical threshold is critical; otherwise a fill level at or below
// the low threshold is low; anything above is normal.
const (
	queueHealthCriticalThreshold = 0.2
	queueHealthLowThreshold      = 0.5
)
-
- // FrameQueue is a bounded ring that holds CompositeFrame instances between the
- // generator and the writer. Push blocks when the queue is full until space
- // becomes available or the provided context is cancelled. Pop blocks when the
- // queue is empty until a new frame arrives or the context is cancelled.
- type FrameQueue struct {
- capacity int
- ch chan *CompositeFrame
-
- mu sync.Mutex
- depth int
- highWaterMark int
- lowWaterMark int
- pushTimeouts uint64
- popTimeouts uint64
- dropped uint64
- repeated uint64
- muted uint64
- closed bool
-
- closeOnce sync.Once
- }
-
- // NewFrameQueue builds a bounded queue that holds up to capacity frames.
- func NewFrameQueue(capacity int) *FrameQueue {
- if capacity <= 0 {
- capacity = 1
- }
- fq := &FrameQueue{
- capacity: capacity,
- ch: make(chan *CompositeFrame, capacity),
- lowWaterMark: capacity,
- }
- fq.trackDepth(0)
- return fq
- }
-
- // Capacity returns the fixed frame capacity of the queue.
- func (q *FrameQueue) Capacity() int {
- return q.capacity
- }
-
- // FillLevel reports the current occupancy as a fraction of capacity.
- // Uses len(ch) directly for accuracy: updateDepth() is called after the
- // channel operation, so q.depth can lag by one frame transiently.
- func (q *FrameQueue) FillLevel() float64 {
- if q.capacity == 0 {
- return 0
- }
- return float64(len(q.ch)) / float64(q.capacity)
- }
-
- // Depth returns the current number of frames in the queue.
- // Uses len(ch) directly for accuracy (see FillLevel).
- func (q *FrameQueue) Depth() int {
- return len(q.ch)
- }
-
- // Stats returns a snapshot of the queue metrics.
- func (q *FrameQueue) Stats() QueueStats {
- q.mu.Lock()
- fill := q.fillLevelLocked()
- stats := QueueStats{
- Capacity: q.capacity,
- Depth: len(q.ch),
- FillLevel: fill,
- Health: queueHealthFromFill(fill),
- HighWaterMark: q.highWaterMark,
- LowWaterMark: q.lowWaterMark,
- PushTimeouts: q.pushTimeouts,
- PopTimeouts: q.popTimeouts,
- DroppedFrames: q.dropped,
- RepeatedFrames: q.repeated,
- MutedFrames: q.muted,
- }
- q.mu.Unlock()
- return stats
- }
-
- // Push enqueues a frame, blocking until space is available or ctx is done.
- func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error {
- if frame == nil {
- return errors.New("frame required")
- }
- if q.isClosed() {
- return ErrFrameQueueClosed
- }
-
- // BUG-05 fix: increment depth BEFORE the channel send so that Stats()
- // never reports fill=0 while a frame is in the channel awaiting receive.
- // On context cancellation, undo the increment.
- q.updateDepth(+1)
- select {
- case q.ch <- frame:
- return nil
- case <-ctx.Done():
- q.updateDepth(-1)
- q.recordPushTimeout()
- return ctx.Err()
- }
- }
-
- // Pop removes a frame, blocking until one is available or ctx signals done.
- func (q *FrameQueue) Pop(ctx context.Context) (*CompositeFrame, error) {
- select {
- case frame, ok := <-q.ch:
- if !ok {
- return nil, ErrFrameQueueClosed
- }
- q.updateDepth(-1)
- return frame, nil
- case <-ctx.Done():
- q.recordPopTimeout()
- return nil, ctx.Err()
- }
- }
-
- // Close marks the queue as closed and wakes up blocked callers.
- func (q *FrameQueue) Close() {
- q.closeOnce.Do(func() {
- q.mu.Lock()
- q.closed = true
- q.mu.Unlock()
- close(q.ch)
- })
- }
-
- // RecordDrop increments the drop counter for instrumentation.
- func (q *FrameQueue) RecordDrop() {
- q.mu.Lock()
- q.dropped++
- q.mu.Unlock()
- }
-
- // RecordRepeat increments the repeat counter for instrumentation.
- func (q *FrameQueue) RecordRepeat() {
- q.mu.Lock()
- q.repeated++
- q.mu.Unlock()
- }
-
- // RecordMute increments the mute counter for instrumentation.
- func (q *FrameQueue) RecordMute() {
- q.mu.Lock()
- q.muted++
- q.mu.Unlock()
- }
-
- func (q *FrameQueue) isClosed() bool {
- q.mu.Lock()
- closed := q.closed
- q.mu.Unlock()
- return closed
- }
-
- func (q *FrameQueue) updateDepth(delta int) {
- q.mu.Lock()
- q.depth += delta
- q.trackDepth(q.depth)
- q.mu.Unlock()
- }
-
- func (q *FrameQueue) trackDepth(depth int) {
- if depth > q.highWaterMark {
- q.highWaterMark = depth
- }
- if depth < q.lowWaterMark {
- q.lowWaterMark = depth
- }
- }
-
- func (q *FrameQueue) fillLevelLocked() float64 {
- if q.capacity == 0 {
- return 0
- }
- // Use len(ch) rather than q.depth: depth is updated after the channel
- // operation, so it can be off by one during the Push/Pop window.
- return float64(len(q.ch)) / float64(q.capacity)
- }
-
- func (q *FrameQueue) recordPushTimeout() {
- q.mu.Lock()
- q.pushTimeouts++
- q.mu.Unlock()
- }
-
- func (q *FrameQueue) recordPopTimeout() {
- q.mu.Lock()
- q.popTimeouts++
- q.mu.Unlock()
- }
-
- func queueHealthFromFill(fill float64) QueueHealth {
- switch {
- case fill <= queueHealthCriticalThreshold:
- return QueueHealthCritical
- case fill <= queueHealthLowThreshold:
- return QueueHealthLow
- default:
- return QueueHealthNormal
- }
- }
|