package output import ( "context" "errors" "sync" ) // ErrFrameQueueClosed is returned when a queue operation is attempted after the queue // has been closed. var ErrFrameQueueClosed = errors.New("frame queue closed") // QueueStats exposes the runtime state of a frame queue. type QueueStats struct { Capacity int `json:"capacity"` Depth int `json:"depth"` FillLevel float64 `json:"fillLevel"` Health QueueHealth `json:"health"` HighWaterMark int `json:"highWaterMark"` LowWaterMark int `json:"lowWaterMark"` PushTimeouts uint64 `json:"pushTimeouts"` PopTimeouts uint64 `json:"popTimeouts"` DroppedFrames uint64 `json:"droppedFrames"` RepeatedFrames uint64 `json:"repeatedFrames"` MutedFrames uint64 `json:"mutedFrames"` } type QueueHealth string const ( QueueHealthCritical QueueHealth = "critical" QueueHealthLow QueueHealth = "low" QueueHealthNormal QueueHealth = "normal" ) const ( queueHealthCriticalThreshold = 0.2 queueHealthLowThreshold = 0.5 ) // FrameQueue is a bounded ring that holds CompositeFrame instances between the // generator and the writer. Push blocks when the queue is full until space // becomes available or the provided context is cancelled. Pop blocks when the // queue is empty until a new frame arrives or the context is cancelled. type FrameQueue struct { capacity int ch chan *CompositeFrame // closeCh is closed by Close() and used in Push/Pop selects so that // a concurrent Close() can never race with a channel send. closeCh chan struct{} mu sync.Mutex depth int highWaterMark int lowWaterMark int pushTimeouts uint64 popTimeouts uint64 dropped uint64 repeated uint64 muted uint64 closed bool closeOnce sync.Once } // NewFrameQueue builds a bounded queue that holds up to capacity frames. func NewFrameQueue(capacity int) *FrameQueue { if capacity <= 0 { capacity = 1 } fq := &FrameQueue{ capacity: capacity, ch: make(chan *CompositeFrame, capacity), closeCh: make(chan struct{}), lowWaterMark: capacity, } fq.trackDepth(0) return fq } // Capacity returns the fixed frame capacity of the queue. func (q *FrameQueue) Capacity() int { return q.capacity } // FillLevel reports the current occupancy as a fraction of capacity. // Uses len(ch) directly for accuracy: updateDepth() is called after the // channel operation, so q.depth can lag by one frame transiently. func (q *FrameQueue) FillLevel() float64 { if q.capacity == 0 { return 0 } return float64(len(q.ch)) / float64(q.capacity) } // Depth returns the current number of frames in the queue. // Uses len(ch) directly for accuracy (see FillLevel). func (q *FrameQueue) Depth() int { return len(q.ch) } // Stats returns a snapshot of the queue metrics. func (q *FrameQueue) Stats() QueueStats { q.mu.Lock() fill := q.fillLevelLocked() stats := QueueStats{ Capacity: q.capacity, Depth: len(q.ch), FillLevel: fill, Health: queueHealthFromFill(fill), HighWaterMark: q.highWaterMark, LowWaterMark: q.lowWaterMark, PushTimeouts: q.pushTimeouts, PopTimeouts: q.popTimeouts, DroppedFrames: q.dropped, RepeatedFrames: q.repeated, MutedFrames: q.muted, } q.mu.Unlock() return stats } // Push enqueues a frame, blocking until space is available or ctx is done. func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error { if frame == nil { return errors.New("frame required") } // BUG-A fix: use closeCh in the select — no TOCTOU gap. // BUG-3 fix: updateDepth(+1) and trackDepth (highWaterMark) only on // successful send. Pre-incrementing before the select caused the // high-watermark to count frames that were never actually queued // (cancelled or closed path), potentially reporting capacity+1. select { case q.ch <- frame: q.updateDepth(+1) return nil case <-q.closeCh: return ErrFrameQueueClosed case <-ctx.Done(): q.recordPushTimeout() return ctx.Err() } } // Pop removes a frame, blocking until one is available or ctx signals done. func (q *FrameQueue) Pop(ctx context.Context) (*CompositeFrame, error) { select { case frame, ok := <-q.ch: if !ok { return nil, ErrFrameQueueClosed } q.updateDepth(-1) return frame, nil case <-ctx.Done(): q.recordPopTimeout() return nil, ctx.Err() } } // Drain removes and discards all frames currently in the queue. // Call before restarting a stopped engine to avoid replaying stale frames. func (q *FrameQueue) Drain() { for { select { case <-q.ch: q.updateDepth(-1) default: return } } } // Close marks the queue as closed and wakes up blocked callers. func (q *FrameQueue) Close() { q.closeOnce.Do(func() { q.mu.Lock() q.closed = true q.mu.Unlock() // Close closeCh first so blocked Push() calls unblock safely // before close(ch) removes the destination. close(q.closeCh) close(q.ch) }) } // RecordDrop increments the drop counter for instrumentation. func (q *FrameQueue) RecordDrop() { q.mu.Lock() q.dropped++ q.mu.Unlock() } // RecordRepeat increments the repeat counter for instrumentation. func (q *FrameQueue) RecordRepeat() { q.mu.Lock() q.repeated++ q.mu.Unlock() } // RecordMute increments the mute counter for instrumentation. func (q *FrameQueue) RecordMute() { q.mu.Lock() q.muted++ q.mu.Unlock() } func (q *FrameQueue) isClosed() bool { q.mu.Lock() closed := q.closed q.mu.Unlock() return closed } func (q *FrameQueue) updateDepth(delta int) { q.mu.Lock() q.depth += delta q.trackDepth(q.depth) q.mu.Unlock() } func (q *FrameQueue) trackDepth(depth int) { if depth > q.highWaterMark { q.highWaterMark = depth } if depth < q.lowWaterMark { q.lowWaterMark = depth } } func (q *FrameQueue) fillLevelLocked() float64 { if q.capacity == 0 { return 0 } // Use len(ch) rather than q.depth: depth is updated after the channel // operation, so it can be off by one during the Push/Pop window. return float64(len(q.ch)) / float64(q.capacity) } func (q *FrameQueue) recordPushTimeout() { q.mu.Lock() q.pushTimeouts++ q.mu.Unlock() } func (q *FrameQueue) recordPopTimeout() { q.mu.Lock() q.popTimeouts++ q.mu.Unlock() } func queueHealthFromFill(fill float64) QueueHealth { switch { case fill <= queueHealthCriticalThreshold: return QueueHealthCritical case fill <= queueHealthLowThreshold: return QueueHealthLow default: return QueueHealthNormal } }