Просмотр исходного кода

Add queue health indicator

tags/v0.9.0
Jan Svabenik 1 месяц назад
Родитель
Сommit
d62e8fae24
3 измененных файлов: 84 добавлений и 13 удалений
  1. +4
    -1
      docs/pro-runtime-hardening-workboard.md
  2. +39
    -12
      internal/output/frame_queue.go
  3. +41
    -0
      internal/output/frame_queue_test.go

+ 4
- 1
docs/pro-runtime-hardening-workboard.md Просмотреть файл

@@ -199,11 +199,12 @@ Generator/Upsampler und Hardwarewriter werden als getrennte Stufen mit kleinem,
- **Akzeptanzpunkte:**
- Keine unbounded Queue.
- Fill-Level (High/Low) ist aus `QueueStats` sichtbar.
- Queue-Health-Indikator (`queue.health`) liefert `critical`, `low` oder `normal` aus dem Fill-Level. EngineStats.`queue` zeigt den Status ebenfalls.
- Drop/Repeat/Mute-Counter sind vorhanden und testbar.
- **Nachweis:**
- `FrameQueue`-Implementierung (`internal/output/frame_queue.go`) liefert kapazitätsgesteuerte Push/Pop-Logik und Counters.
- Engine-Run nutzt Queue vor dem Writer und zeigt `QueueStats` in `EngineStats`.
- Tests (`internal/output/frame_queue_test.go` + `go test ./...`) decken Push/Pop, Timeout-Counters und Stats ab.
- Tests (`internal/output/frame_queue_test.go` + `go test ./...`) decken Push/Pop, Timeout-Counters, Stats und den neuen Queue-Health-Indikator ab.
- **Restrisiken:**
- Die Queue wird aktuell synchron getrieben; ein dedizierter Writer-Worker fehlt noch.
- Queue-Close erwartet, dass Generator/Writer vor dem Schließen stoppen, sonst droht Panik beim Schreiben.
@@ -246,11 +247,13 @@ Generator/Upsampler und Hardwarewriter werden als getrennte Stufen mit kleinem,
| Datum | Entscheidung | Notiz |
|---|---|---|
| 2026-04-05 | FrameQueue mit Engine-Integration | Queue lebt nach dem Upsampler auf DeviceFrame-Ebene, Kapazität via `runtime.frameQueueCapacity`, `EngineStats` zeigt `QueueStats`, Tests decken Timeouts und Counters ab. |
| 2026-04-05 | Queue-Health-Indikator | `QueueStats.Health` gibt `critical`/`low`/`normal` zurück und ist über `EngineStats.Queue` im Runtime-Endpunkt sichtbar. |

## WS-01 Verifikation
| Datum | Fokus | Ergebnis |
|---|---|---|
| 2026-04-05 | FrameQueue + Engine integration | ✅ `go test ./...` (im `internal`-Modul incl. `frame_queue_test.go`) |
| 2026-04-05 | Queue-Health-Indikator | go test ./... deckt `TestFrameQueueHealthIndicator` und `queue.health` ab. |

---



+ 39
- 12
internal/output/frame_queue.go Просмотреть файл

@@ -12,17 +12,31 @@ var ErrFrameQueueClosed = errors.New("frame queue closed")

// QueueStats exposes the runtime state of a frame queue.
type QueueStats struct {
Capacity int `json:"capacity"`
Depth int `json:"depth"`
FillLevel float64 `json:"fillLevel"`
HighWaterMark int `json:"highWaterMark"`
LowWaterMark int `json:"lowWaterMark"`
PushTimeouts uint64 `json:"pushTimeouts"`
PopTimeouts uint64 `json:"popTimeouts"`
DroppedFrames uint64 `json:"droppedFrames"`
RepeatedFrames uint64 `json:"repeatedFrames"`
MutedFrames uint64 `json:"mutedFrames"`
}
Capacity int `json:"capacity"`
Depth int `json:"depth"`
FillLevel float64 `json:"fillLevel"`
Health QueueHealth `json:"health"`
HighWaterMark int `json:"highWaterMark"`
LowWaterMark int `json:"lowWaterMark"`
PushTimeouts uint64 `json:"pushTimeouts"`
PopTimeouts uint64 `json:"popTimeouts"`
DroppedFrames uint64 `json:"droppedFrames"`
RepeatedFrames uint64 `json:"repeatedFrames"`
MutedFrames uint64 `json:"mutedFrames"`
}

type QueueHealth string

const (
QueueHealthCritical QueueHealth = "critical"
QueueHealthLow QueueHealth = "low"
QueueHealthNormal QueueHealth = "normal"
)

const (
queueHealthCriticalThreshold = 0.2
queueHealthLowThreshold = 0.5
)

// FrameQueue is a bounded ring that holds CompositeFrame instances between the
// generator and the writer. Push blocks when the queue is full until space
@@ -87,10 +101,12 @@ func (q *FrameQueue) Depth() int {
// Stats returns a snapshot of the queue metrics.
func (q *FrameQueue) Stats() QueueStats {
q.mu.Lock()
fill := q.fillLevelLocked()
stats := QueueStats{
Capacity: q.capacity,
Depth: q.depth,
FillLevel: q.fillLevelLocked(),
FillLevel: fill,
Health: queueHealthFromFill(fill),
HighWaterMark: q.highWaterMark,
LowWaterMark: q.lowWaterMark,
PushTimeouts: q.pushTimeouts,
@@ -209,3 +225,14 @@ func (q *FrameQueue) recordPopTimeout() {
q.popTimeouts++
q.mu.Unlock()
}

func queueHealthFromFill(fill float64) QueueHealth {
switch {
case fill <= queueHealthCriticalThreshold:
return QueueHealthCritical
case fill <= queueHealthLowThreshold:
return QueueHealthLow
default:
return QueueHealthNormal
}
}

+ 41
- 0
internal/output/frame_queue_test.go Просмотреть файл

@@ -80,3 +80,44 @@ func TestFrameQueueCounters(t *testing.T) {
t.Fatalf("expected 1 mute, got %d", stats.MutedFrames)
}
}

func TestFrameQueueHealthIndicator(t *testing.T) {
q := NewFrameQueue(4)
ctx := context.Background()

stats := q.Stats()
if stats.Health != QueueHealthCritical {
t.Fatalf("expected initial health critical, got %s", stats.Health)
}

push := func(seq int) {
frame := &CompositeFrame{Sequence: seq}
if err := q.Push(ctx, frame); err != nil {
t.Fatalf("push %d failed: %v", seq, err)
}
}

push(1)
stats = q.Stats()
if stats.Health != QueueHealthLow {
t.Fatalf("expected low after one frame, got %s", stats.Health)
}

push(2)
stats = q.Stats()
if stats.Health != QueueHealthLow {
t.Fatalf("expected low at 50%% fill, got %s", stats.Health)
}

push(3)
stats = q.Stats()
if stats.Health != QueueHealthNormal {
t.Fatalf("expected normal once queue has ~75%% fill, got %s", stats.Health)
}

for q.Depth() > 0 {
if _, err := q.Pop(ctx); err != nil {
t.Fatalf("cleanup pop failed: %v", err)
}
}
}

Загрузка…
Отмена
Сохранить