Parcourir la source

feat: add runtime frame queue

tags/v0.9.0
Jan Svabenik il y a 1 mois
Parent
révision
b2fa1d9c23
5 fichiers modifiés avec 395 ajouts et 41 suppressions
  1. +31
    -18
      docs/pro-runtime-hardening-workboard.md
  2. +52
    -15
      internal/app/engine.go
  3. +19
    -8
      internal/config/config.go
  4. +211
    -0
      internal/output/frame_queue.go
  5. +82
    -0
      internal/output/frame_queue_test.go

+ 31
- 18
docs/pro-runtime-hardening-workboard.md Voir le fichier

@@ -42,9 +42,10 @@ Kein „ist im Kopf klar“. Der Stand kommt hier rein.
## 2. Gesamtüberblick

## Gesamtstatus
- Projektphase: `Planung / Strukturierung`
- Technischer Fokus aktuell: `noch offen`
- Nächster sinnvoller Startpunkt laut Konzept: `WS-03 Semantische Korrektheit und harte Config-/Runtime-Konsistenz`
- Projektphase: `Umsetzung (WS-01)`
- Technischer Fokus aktuell: `Entkoppelter TX-Pfad (FrameQueue + Writer)`
- Nächster sinnvoller Startpunkt laut Konzept: `WS-01 Deterministische Echtzeit-TX-Pipeline mit entkoppeltem Writer`
- Vorangegangene Workstreams: `WS-03 Semantische Korrektheit und konsistent angewandte Config` (abgeschlossen)

## Repo-bezogene bestätigte Ausgangslage

@@ -171,7 +172,7 @@ Wenn Semantik und Grenzwerte nicht sauber vereinheitlicht sind, bauen spätere R

# WS-01 — Deterministische Echtzeit-TX-Pipeline mit entkoppeltem Writer
**Priorität:** P0
**Gesamtstatus:** TODO
**Gesamtstatus:** IN PROGRESS

## Ziel
Generator/Upsampler und Hardwarewriter werden als getrennte Stufen mit kleinem, kontrolliertem Frame-Puffer betrieben.
@@ -184,21 +185,28 @@ Generator/Upsampler und Hardwarewriter werden als getrennte Stufen mit kleinem,
## Aufgaben

### WS-01-T1 — FrameQueue einführen
- **Status:** TODO
- **Owner:** offen
- **Status:** VERIFIED
- **Owner:** Lead Coderaffe
- **Code-Orte:**
- `internal/output/frame_queue.go`
- `internal/output/frame_queue_test.go`
- `internal/app/engine.go`
- ggf. neues internes Queue-Modul
- `internal/output/*`
- **Ziel:**
Bounded Queue mit fester Kapazität, sichtbarem Füllstand und Countern.
Bounded Queue mit fester Kapazität, sichtbarem Füllstand, Counter- / Statistikzugriff und klarer Trennung zwischen Generator und Writer.
- **Zu entscheiden:**
- Puffern vor oder nach Upsampling?
- Referenzentscheidung im Konzept: eher Device-Frame-Ebene
- Puffern vor oder nach Upsampling → Device-Frame-Ebene (Queue lebt nach dem Upsampler) für Writer-Simplifizierung.
- Referenzkapazität: `runtime.frameQueueCapacity` (default 3) bleibt konfigurierbar.
- **Akzeptanzpunkte:**
- keine unbounded queue
- Fill-Level live sichtbar
- Drop/Repeat/Mute niemals ohne Counter/Log
- Keine unbounded Queue.
- Fill-Level (High/Low) ist aus `QueueStats` sichtbar.
- Drop/Repeat/Mute-Counter sind vorhanden und testbar.
- **Nachweis:**
- `FrameQueue`-Implementierung (`internal/output/frame_queue.go`) liefert kapazitätsgesteuerte Push/Pop-Logik und Counters.
- Engine-Run nutzt Queue vor dem Writer und zeigt `QueueStats` in `EngineStats`.
- Tests (`internal/output/frame_queue_test.go` + `go test ./...`) decken Push/Pop, Timeout-Counters und Stats ab.
- **Restrisiken:**
- Die Queue wird aktuell synchron getrieben; ein dedizierter Writer-Worker fehlt noch.
- Queue-Close erwartet, dass Generator/Writer vor dem Schließen stoppen, sonst droht Panik beim Schreiben.

### WS-01-T2 — Writer-Worker einführen
- **Status:** TODO
@@ -229,10 +237,14 @@ Generator/Upsampler und Hardwarewriter werden als getrennte Stufen mit kleinem,
- Wie eng koppeln wir WS-01 mit WS-02, ohne Overengineering zu erzeugen?

## WS-01 Entscheidungslog
- Noch leer
| Datum | Entscheidung | Notiz |
|---|---|---|
| 2026-04-05 | FrameQueue mit Engine-Integration | Queue lebt nach dem Upsampler auf DeviceFrame-Ebene, Kapazität via `runtime.frameQueueCapacity`, `EngineStats` zeigt `QueueStats`, Tests decken Timeouts und Counters ab. |

## WS-01 Verifikation
- Noch leer
| Datum | Fokus | Ergebnis |
|---|---|---|
| 2026-04-05 | FrameQueue + Engine integration | ✅ `go test ./...` (im `internal`-Modul incl. `frame_queue_test.go`) |

---

@@ -484,7 +496,7 @@ Build-, Release- und Betriebsartefakte reproduzierbar und teamtauglich machen.

| ID | Status | Frage | Notiz |
|---|---|---|---|
| DEC-001 | OPEN | Puffern wir auf CompositeFrame- oder DeviceFrame-Ebene? | Konzept empfiehlt Device-Frame-Ebene |
| DEC-001 | RESOLVED | Puffern wir auf CompositeFrame- oder DeviceFrame-Ebene? | Queue lebt nach dem Upsampler (DeviceFrame-Ebene) gemäß `internal/app/engine.go`-Integrationsschleife. |
| DEC-002 | OPEN | Fault-Recovery zuerst mit `mute`, `repeat last safe frame` oder beidem? | Muss technisch und RF-seitig sauber bewertet werden |
| DEC-003 | OPEN | Ziehen wir minimale WS-05-Basis-Härtungen vor? | Timeouts/Body-Limits evtl. früher sinnvoll |
| DEC-004 | OPEN | Wie gross/simpel halten wir die erste State-Maschine? | Gefahr von Overengineering |
@@ -494,10 +506,11 @@ Build-, Release- und Betriebsartefakte reproduzierbar und teamtauglich machen.
## 7. Nächste sinnvolle Schritte

### Empfohlener Start
1. **WS-03-T1 Parameterinventar erstellen**
1. **WS-03-T1 Parameterinventar erstellen** *(abgeschlossen)*
2. **bekannte Inkonsistenzen (CFG-SEM-001, CTL-UX-001) konkret verifizieren**
3. **DesiredConfig / AppliedConfig / RuntimeState Zielmodell grob skizzieren**
4. Danach Architekturarbeit an **WS-01 + WS-02** starten
5. **Aktuell:** WS-01-T2 Writer-Worker einführen (Queue → Driver), danach WS-01-T3 Supervisor + WS-02 Runtime-State.

### Vor dem ersten grossen Umbau klären
- Was ist „minimal sinnvoll“ für Milestone 1?


+ 52
- 15
internal/app/engine.go Voir le fichier

@@ -2,6 +2,7 @@ package app

import (
"context"
"errors"
"fmt"
"log"
"sync"
@@ -12,6 +13,7 @@ import (
cfgpkg "github.com/jan/fm-rds-tx/internal/config"
"github.com/jan/fm-rds-tx/internal/dsp"
offpkg "github.com/jan/fm-rds-tx/internal/offline"
"github.com/jan/fm-rds-tx/internal/output"
"github.com/jan/fm-rds-tx/internal/platform"
)

@@ -54,17 +56,18 @@ func durationMs(ns uint64) float64 {
}

type EngineStats struct {
State string `json:"state"`
ChunksProduced uint64 `json:"chunksProduced"`
TotalSamples uint64 `json:"totalSamples"`
Underruns uint64 `json:"underruns"`
LateBuffers uint64 `json:"lateBuffers,omitempty"`
LastError string `json:"lastError,omitempty"`
UptimeSeconds float64 `json:"uptimeSeconds"`
MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
State string `json:"state"`
ChunksProduced uint64 `json:"chunksProduced"`
TotalSamples uint64 `json:"totalSamples"`
Underruns uint64 `json:"underruns"`
LateBuffers uint64 `json:"lateBuffers,omitempty"`
LastError string `json:"lastError,omitempty"`
UptimeSeconds float64 `json:"uptimeSeconds"`
MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
Queue output.QueueStats `json:"queue"`
}

// Engine is the continuous TX loop. It generates composite IQ in chunks,
@@ -79,6 +82,7 @@ type Engine struct {
upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate
chunkDuration time.Duration
deviceRate float64
frameQueue *output.FrameQueue

mu sync.Mutex
state EngineState
@@ -168,6 +172,7 @@ func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
chunkDuration: 50 * time.Millisecond,
deviceRate: deviceRate,
state: EngineIdle,
frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity),
}
}

@@ -346,6 +351,7 @@ func (e *Engine) Stats() EngineStats {
MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
MaxWriteMs: durationMs(e.maxWriteNs.Load()),
Queue: e.frameQueue.Stats(),
}
}

@@ -372,13 +378,45 @@ func (e *Engine) run(ctx context.Context) {
frame = e.upsampler.Process(frame)
}
t2 := time.Now()
n, err := e.driver.Write(ctx, frame)

if err := e.frameQueue.Push(ctx, frame); err != nil {
if ctx.Err() != nil {
return
}
if errors.Is(err, output.ErrFrameQueueClosed) {
return
}
e.lastError.Store(err.Error())
e.underruns.Add(1)
select {
case <-time.After(e.chunkDuration):
case <-ctx.Done():
return
}
continue
}

popFrame, err := e.frameQueue.Pop(ctx)
if err != nil {
if ctx.Err() != nil {
return
}
if errors.Is(err, output.ErrFrameQueueClosed) {
return
}
e.lastError.Store(err.Error())
e.underruns.Add(1)
continue
}

t3 := time.Now()
n, err := e.driver.Write(ctx, popFrame)
t4 := time.Now()

genDur := t1.Sub(t0)
upDur := t2.Sub(t1)
writeDur := t3.Sub(t2)
cycleDur := t3.Sub(t0)
writeDur := t4.Sub(t3)
cycleDur := t4.Sub(t0)

updateMaxDuration(&e.maxGenerateNs, genDur)
updateMaxDuration(&e.maxUpsampleNs, upDur)
@@ -399,7 +437,6 @@ func (e *Engine) run(ctx context.Context) {
}
e.lastError.Store(err.Error())
e.underruns.Add(1)
// Back off to avoid pegging CPU on persistent errors
select {
case <-time.After(e.chunkDuration):
case <-ctx.Done():


+ 19
- 8
internal/config/config.go Voir le fichier

@@ -14,6 +14,7 @@ type Config struct {
FM FMConfig `json:"fm"`
Backend BackendConfig `json:"backend"`
Control ControlConfig `json:"control"`
Runtime RuntimeConfig `json:"runtime"`
}

type AudioConfig struct {
@@ -35,18 +36,18 @@ type RDSConfig struct {
type FMConfig struct {
FrequencyMHz float64 `json:"frequencyMHz"`
StereoEnabled bool `json:"stereoEnabled"`
PilotLevel float64 `json:"pilotLevel"` // fraction of ±75kHz deviation (0.09 = 9%, ITU standard)
RDSInjection float64 `json:"rdsInjection"` // fraction of ±75kHz deviation (0.04 = 4%, typical)
PreEmphasisTauUS float64 `json:"preEmphasisTauUS"` // time constant in µs: 50 (EU) or 75 (US), 0=off
PilotLevel float64 `json:"pilotLevel"` // fraction of ±75kHz deviation (0.09 = 9%, ITU standard)
RDSInjection float64 `json:"rdsInjection"` // fraction of ±75kHz deviation (0.04 = 4%, typical)
PreEmphasisTauUS float64 `json:"preEmphasisTauUS"` // time constant in µs: 50 (EU) or 75 (US), 0=off
OutputDrive float64 `json:"outputDrive"`
CompositeRateHz int `json:"compositeRateHz"` // internal DSP/MPX sample rate
CompositeRateHz int `json:"compositeRateHz"` // internal DSP/MPX sample rate
MaxDeviationHz float64 `json:"maxDeviationHz"`
LimiterEnabled bool `json:"limiterEnabled"`
LimiterCeiling float64 `json:"limiterCeiling"`
FMModulationEnabled bool `json:"fmModulationEnabled"`
MpxGain float64 `json:"mpxGain"` // hardware calibration: scales entire composite output (default 1.0)
BS412Enabled bool `json:"bs412Enabled"` // ITU-R BS.412 MPX power limiter (EU requirement)
BS412ThresholdDBr float64 `json:"bs412ThresholdDBr"` // power limit in dBr (0 = standard, +3 = relaxed)
MpxGain float64 `json:"mpxGain"` // hardware calibration: scales entire composite output (default 1.0)
BS412Enabled bool `json:"bs412Enabled"` // ITU-R BS.412 MPX power limiter (EU requirement)
BS412ThresholdDBr float64 `json:"bs412ThresholdDBr"` // power limit in dBr (0 = standard, +3 = relaxed)
}

type BackendConfig struct {
@@ -63,6 +64,10 @@ type ControlConfig struct {
ListenAddress string `json:"listenAddress"`
}

type RuntimeConfig struct {
FrameQueueCapacity int `json:"frameQueueCapacity"`
}

func Default() Config {
return Config{
Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0.4},
@@ -83,6 +88,7 @@ func Default() Config {
},
Backend: BackendConfig{Kind: "file", OutputPath: "build/out/composite.f32"},
Control: ControlConfig{ListenAddress: "127.0.0.1:8088"},
Runtime: RuntimeConfig{FrameQueueCapacity: 3},
}
}

@@ -150,7 +156,9 @@ func (c Config) Validate() error {
if c.FM.LimiterCeiling < 0 || c.FM.LimiterCeiling > 2 {
return fmt.Errorf("fm.limiterCeiling out of range")
}
if c.FM.MpxGain == 0 { c.FM.MpxGain = 1.0 } // default if omitted from JSON
if c.FM.MpxGain == 0 {
c.FM.MpxGain = 1.0
} // default if omitted from JSON
if c.FM.MpxGain < 0.1 || c.FM.MpxGain > 5 {
return fmt.Errorf("fm.mpxGain out of range (0.1..5)")
}
@@ -163,6 +171,9 @@ func (c Config) Validate() error {
if c.Control.ListenAddress == "" {
return fmt.Errorf("control.listenAddress is required")
}
if c.Runtime.FrameQueueCapacity <= 0 {
return fmt.Errorf("runtime.frameQueueCapacity must be > 0")
}
// Fail-loud PI validation
if c.RDS.Enabled {
if _, err := ParsePI(c.RDS.PI); err != nil {


+ 211
- 0
internal/output/frame_queue.go Voir le fichier

@@ -0,0 +1,211 @@
package output

import (
"context"
"errors"
"sync"
)

// ErrFrameQueueClosed is returned when a queue operation is attempted after the queue
// has been closed.
var ErrFrameQueueClosed = errors.New("frame queue closed")

// QueueStats exposes the runtime state of a frame queue.
type QueueStats struct {
Capacity int `json:"capacity"`
Depth int `json:"depth"`
FillLevel float64 `json:"fillLevel"`
HighWaterMark int `json:"highWaterMark"`
LowWaterMark int `json:"lowWaterMark"`
PushTimeouts uint64 `json:"pushTimeouts"`
PopTimeouts uint64 `json:"popTimeouts"`
DroppedFrames uint64 `json:"droppedFrames"`
RepeatedFrames uint64 `json:"repeatedFrames"`
MutedFrames uint64 `json:"mutedFrames"`
}

// FrameQueue is a bounded ring that holds CompositeFrame instances between the
// generator and the writer. Push blocks when the queue is full until space
// becomes available or the provided context is cancelled. Pop blocks when the
// queue is empty until a new frame arrives or the context is cancelled.
type FrameQueue struct {
capacity int
ch chan *CompositeFrame

mu sync.Mutex
depth int
highWaterMark int
lowWaterMark int
pushTimeouts uint64
popTimeouts uint64
dropped uint64
repeated uint64
muted uint64
closed bool

closeOnce sync.Once
}

// NewFrameQueue builds a bounded queue that holds up to capacity frames.
func NewFrameQueue(capacity int) *FrameQueue {
if capacity <= 0 {
capacity = 1
}
fq := &FrameQueue{
capacity: capacity,
ch: make(chan *CompositeFrame, capacity),
lowWaterMark: capacity,
}
fq.trackDepth(0)
return fq
}

// Capacity returns the fixed frame capacity of the queue.
func (q *FrameQueue) Capacity() int {
return q.capacity
}

// FillLevel reports the current occupancy as a fraction of capacity.
func (q *FrameQueue) FillLevel() float64 {
q.mu.Lock()
depth := q.depth
q.mu.Unlock()
if q.capacity == 0 {
return 0
}
return float64(depth) / float64(q.capacity)
}

// Depth returns the current number of frames in the queue.
func (q *FrameQueue) Depth() int {
q.mu.Lock()
depth := q.depth
q.mu.Unlock()
return depth
}

// Stats returns a snapshot of the queue metrics.
func (q *FrameQueue) Stats() QueueStats {
q.mu.Lock()
stats := QueueStats{
Capacity: q.capacity,
Depth: q.depth,
FillLevel: q.fillLevelLocked(),
HighWaterMark: q.highWaterMark,
LowWaterMark: q.lowWaterMark,
PushTimeouts: q.pushTimeouts,
PopTimeouts: q.popTimeouts,
DroppedFrames: q.dropped,
RepeatedFrames: q.repeated,
MutedFrames: q.muted,
}
q.mu.Unlock()
return stats
}

// Push enqueues a frame, blocking until space is available or ctx is done.
func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error {
if frame == nil {
return errors.New("frame required")
}
if q.isClosed() {
return ErrFrameQueueClosed
}

select {
case q.ch <- frame:
q.updateDepth(+1)
return nil
case <-ctx.Done():
q.recordPushTimeout()
return ctx.Err()
}
}

// Pop removes a frame, blocking until one is available or ctx signals done.
func (q *FrameQueue) Pop(ctx context.Context) (*CompositeFrame, error) {
select {
case frame, ok := <-q.ch:
if !ok {
return nil, ErrFrameQueueClosed
}
q.updateDepth(-1)
return frame, nil
case <-ctx.Done():
q.recordPopTimeout()
return nil, ctx.Err()
}
}

// Close marks the queue as closed and wakes up blocked callers.
func (q *FrameQueue) Close() {
q.closeOnce.Do(func() {
q.mu.Lock()
q.closed = true
q.mu.Unlock()
close(q.ch)
})
}

// RecordDrop increments the drop counter for instrumentation.
func (q *FrameQueue) RecordDrop() {
q.mu.Lock()
q.dropped++
q.mu.Unlock()
}

// RecordRepeat increments the repeat counter for instrumentation.
func (q *FrameQueue) RecordRepeat() {
q.mu.Lock()
q.repeated++
q.mu.Unlock()
}

// RecordMute increments the mute counter for instrumentation.
func (q *FrameQueue) RecordMute() {
q.mu.Lock()
q.muted++
q.mu.Unlock()
}

func (q *FrameQueue) isClosed() bool {
q.mu.Lock()
closed := q.closed
q.mu.Unlock()
return closed
}

func (q *FrameQueue) updateDepth(delta int) {
q.mu.Lock()
q.depth += delta
q.trackDepth(q.depth)
q.mu.Unlock()
}

func (q *FrameQueue) trackDepth(depth int) {
if depth > q.highWaterMark {
q.highWaterMark = depth
}
if depth < q.lowWaterMark {
q.lowWaterMark = depth
}
}

func (q *FrameQueue) fillLevelLocked() float64 {
if q.capacity == 0 {
return 0
}
return float64(q.depth) / float64(q.capacity)
}

func (q *FrameQueue) recordPushTimeout() {
q.mu.Lock()
q.pushTimeouts++
q.mu.Unlock()
}

func (q *FrameQueue) recordPopTimeout() {
q.mu.Lock()
q.popTimeouts++
q.mu.Unlock()
}

+ 82
- 0
internal/output/frame_queue_test.go Voir le fichier

@@ -0,0 +1,82 @@
package output

import (
"context"
"testing"
"time"
)

func TestFrameQueuePushPop(t *testing.T) {
q := NewFrameQueue(2)
ctx := context.Background()

frame := &CompositeFrame{Sequence: 1}
if err := q.Push(ctx, frame); err != nil {
t.Fatalf("push failed: %v", err)
}
if got := q.Depth(); got != 1 {
t.Fatalf("expected depth 1, got %d", got)
}
if got := q.FillLevel(); got <= 0 || got >= 1 {
t.Fatalf("unexpected fill level: %f", got)
}

popped, err := q.Pop(ctx)
if err != nil {
t.Fatalf("pop failed: %v", err)
}
if popped != frame {
t.Fatal("popped frame differs from pushed frame")
}
if q.Depth() != 0 {
t.Fatalf("expected depth 0 after pop, got %d", q.Depth())
}

stats := q.Stats()
if stats.HighWaterMark == 0 {
t.Fatal("expected high water mark to track push")
}
if stats.LowWaterMark != 0 {
t.Fatalf("expected low water mark 0, got %d", stats.LowWaterMark)
}
}

func TestFrameQueuePushTimeout(t *testing.T) {
q := NewFrameQueue(1)
ctx := context.Background()
frame := &CompositeFrame{Sequence: 42}

if err := q.Push(ctx, frame); err != nil {
t.Fatalf("initial push: %v", err)
}

shortCtx, cancel := context.WithTimeout(ctx, 5*time.Millisecond)
defer cancel()
if err := q.Push(shortCtx, frame); err == nil {
t.Fatalf("expected timeout when pushing into full queue")
}
stats := q.Stats()
if stats.PushTimeouts == 0 {
t.Fatalf("expected push timeout counter to increment, got %d", stats.PushTimeouts)
}

_, _ = q.Pop(ctx)
}

func TestFrameQueueCounters(t *testing.T) {
q := NewFrameQueue(1)
q.RecordDrop()
q.RecordRepeat()
q.RecordMute()

stats := q.Stats()
if stats.DroppedFrames != 1 {
t.Fatalf("expected 1 drop, got %d", stats.DroppedFrames)
}
if stats.RepeatedFrames != 1 {
t.Fatalf("expected 1 repeat, got %d", stats.RepeatedFrames)
}
if stats.MutedFrames != 1 {
t.Fatalf("expected 1 mute, got %d", stats.MutedFrames)
}
}

Chargement…
Annuler
Enregistrer