diff --git a/docs/pro-runtime-hardening-workboard.md b/docs/pro-runtime-hardening-workboard.md index 1a3866a..d8aee65 100644 --- a/docs/pro-runtime-hardening-workboard.md +++ b/docs/pro-runtime-hardening-workboard.md @@ -42,9 +42,10 @@ Kein „ist im Kopf klar“. Der Stand kommt hier rein. ## 2. Gesamtüberblick ## Gesamtstatus -- Projektphase: `Planung / Strukturierung` -- Technischer Fokus aktuell: `noch offen` -- Nächster sinnvoller Startpunkt laut Konzept: `WS-03 Semantische Korrektheit und harte Config-/Runtime-Konsistenz` +- Projektphase: `Umsetzung (WS-01)` +- Technischer Fokus aktuell: `Entkoppelter TX-Pfad (FrameQueue + Writer)` +- Nächster sinnvoller Startpunkt laut Konzept: `WS-01 Deterministische Echtzeit-TX-Pipeline mit entkoppeltem Writer` +- Vorangegangene Workstreams: `WS-03 Semantische Korrektheit und konsistent angewandte Config` (abgeschlossen) ## Repo-bezogene bestätigte Ausgangslage @@ -171,7 +172,7 @@ Wenn Semantik und Grenzwerte nicht sauber vereinheitlicht sind, bauen spätere R # WS-01 — Deterministische Echtzeit-TX-Pipeline mit entkoppeltem Writer **Priorität:** P0 -**Gesamtstatus:** TODO +**Gesamtstatus:** IN PROGRESS ## Ziel Generator/Upsampler und Hardwarewriter werden als getrennte Stufen mit kleinem, kontrolliertem Frame-Puffer betrieben. @@ -184,21 +185,28 @@ Generator/Upsampler und Hardwarewriter werden als getrennte Stufen mit kleinem, ## Aufgaben ### WS-01-T1 — FrameQueue einführen -- **Status:** TODO -- **Owner:** offen +- **Status:** VERIFIED +- **Owner:** Lead Coderaffe - **Code-Orte:** + - `internal/output/frame_queue.go` + - `internal/output/frame_queue_test.go` - `internal/app/engine.go` - - ggf. neues internes Queue-Modul - - `internal/output/*` - **Ziel:** - Bounded Queue mit fester Kapazität, sichtbarem Füllstand und Countern. + Bounded Queue mit fester Kapazität, sichtbarem Füllstand, Counter- / Statistikzugriff und klarer Trennung zwischen Generator und Writer. - **Zu entscheiden:** - - Puffern vor oder nach Upsampling? - - Referenzentscheidung im Konzept: eher Device-Frame-Ebene + - Puffern vor oder nach Upsampling → Device-Frame-Ebene (Queue lebt nach dem Upsampler) für Writer-Simplifizierung. + - Referenzkapazität: `runtime.frameQueueCapacity` (default 3) bleibt konfigurierbar. - **Akzeptanzpunkte:** - - keine unbounded queue - - Fill-Level live sichtbar - - Drop/Repeat/Mute niemals ohne Counter/Log + - Keine unbounded Queue. + - Fill-Level (High/Low) ist aus `QueueStats` sichtbar. + - Drop/Repeat/Mute-Counter sind vorhanden und testbar. +- **Nachweis:** + - `FrameQueue`-Implementierung (`internal/output/frame_queue.go`) liefert kapazitätsgesteuerte Push/Pop-Logik und Counters. + - Engine-Run nutzt Queue vor dem Writer und zeigt `QueueStats` in `EngineStats`. + - Tests (`internal/output/frame_queue_test.go` + `go test ./...`) decken Push/Pop, Timeout-Counters und Stats ab. +- **Restrisiken:** + - Die Queue wird aktuell synchron getrieben; ein dedizierter Writer-Worker fehlt noch. + - Queue-Close erwartet, dass Generator/Writer vor dem Schließen stoppen, sonst droht Panik beim Schreiben. ### WS-01-T2 — Writer-Worker einführen - **Status:** TODO @@ -229,10 +237,14 @@ Generator/Upsampler und Hardwarewriter werden als getrennte Stufen mit kleinem, - Wie eng koppeln wir WS-01 mit WS-02, ohne Overengineering zu erzeugen? ## WS-01 Entscheidungslog -- Noch leer +| Datum | Entscheidung | Notiz | +|---|---|---| +| 2026-04-05 | FrameQueue mit Engine-Integration | Queue lebt nach dem Upsampler auf DeviceFrame-Ebene, Kapazität via `runtime.frameQueueCapacity`, `EngineStats` zeigt `QueueStats`, Tests decken Timeouts und Counters ab. | ## WS-01 Verifikation -- Noch leer +| Datum | Fokus | Ergebnis | +|---|---|---| +| 2026-04-05 | FrameQueue + Engine integration | ✅ `go test ./...` (im `internal`-Modul incl. `frame_queue_test.go`) | --- @@ -484,7 +496,7 @@ Build-, Release- und Betriebsartefakte reproduzierbar und teamtauglich machen. | ID | Status | Frage | Notiz | |---|---|---|---| -| DEC-001 | OPEN | Puffern wir auf CompositeFrame- oder DeviceFrame-Ebene? | Konzept empfiehlt Device-Frame-Ebene | +| DEC-001 | RESOLVED | Puffern wir auf CompositeFrame- oder DeviceFrame-Ebene? | Queue lebt nach dem Upsampler (DeviceFrame-Ebene) gemäß `internal/app/engine.go`-Integrationsschleife. | | DEC-002 | OPEN | Fault-Recovery zuerst mit `mute`, `repeat last safe frame` oder beidem? | Muss technisch und RF-seitig sauber bewertet werden | | DEC-003 | OPEN | Ziehen wir minimale WS-05-Basis-Härtungen vor? | Timeouts/Body-Limits evtl. früher sinnvoll | | DEC-004 | OPEN | Wie gross/simpel halten wir die erste State-Maschine? | Gefahr von Overengineering | @@ -494,10 +506,11 @@ Build-, Release- und Betriebsartefakte reproduzierbar und teamtauglich machen. ## 7. Nächste sinnvolle Schritte ### Empfohlener Start -1. **WS-03-T1 Parameterinventar erstellen** +1. **WS-03-T1 Parameterinventar erstellen** *(abgeschlossen)* 2. **bekannte Inkonsistenzen (CFG-SEM-001, CTL-UX-001) konkret verifizieren** 3. **DesiredConfig / AppliedConfig / RuntimeState Zielmodell grob skizzieren** 4. Danach Architekturarbeit an **WS-01 + WS-02** starten +5. **Aktuell:** WS-01-T2 Writer-Worker einführen (Queue → Driver), danach WS-01-T3 Supervisor + WS-02 Runtime-State. ### Vor dem ersten grossen Umbau klären - Was ist „minimal sinnvoll“ für Milestone 1? diff --git a/internal/app/engine.go b/internal/app/engine.go index bd13134..6904ec9 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -2,6 +2,7 @@ package app import ( "context" + "errors" "fmt" "log" "sync" @@ -12,6 +13,7 @@ import ( cfgpkg "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/dsp" offpkg "github.com/jan/fm-rds-tx/internal/offline" + "github.com/jan/fm-rds-tx/internal/output" "github.com/jan/fm-rds-tx/internal/platform" ) @@ -54,17 +56,18 @@ func durationMs(ns uint64) float64 { } type EngineStats struct { - State string `json:"state"` - ChunksProduced uint64 `json:"chunksProduced"` - TotalSamples uint64 `json:"totalSamples"` - Underruns uint64 `json:"underruns"` - LateBuffers uint64 `json:"lateBuffers,omitempty"` - LastError string `json:"lastError,omitempty"` - UptimeSeconds float64 `json:"uptimeSeconds"` - MaxCycleMs float64 `json:"maxCycleMs,omitempty"` - MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"` - MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"` - MaxWriteMs float64 `json:"maxWriteMs,omitempty"` + State string `json:"state"` + ChunksProduced uint64 `json:"chunksProduced"` + TotalSamples uint64 `json:"totalSamples"` + Underruns uint64 `json:"underruns"` + LateBuffers uint64 `json:"lateBuffers,omitempty"` + LastError string `json:"lastError,omitempty"` + UptimeSeconds float64 `json:"uptimeSeconds"` + MaxCycleMs float64 `json:"maxCycleMs,omitempty"` + MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"` + MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"` + MaxWriteMs float64 `json:"maxWriteMs,omitempty"` + Queue output.QueueStats `json:"queue"` } // Engine is the continuous TX loop. It generates composite IQ in chunks, @@ -79,6 +82,7 @@ type Engine struct { upsampler *dsp.FMUpsampler // nil = same-rate, non-nil = split-rate chunkDuration time.Duration deviceRate float64 + frameQueue *output.FrameQueue mu sync.Mutex state EngineState @@ -168,6 +172,7 @@ func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine { chunkDuration: 50 * time.Millisecond, deviceRate: deviceRate, state: EngineIdle, + frameQueue: output.NewFrameQueue(cfg.Runtime.FrameQueueCapacity), } } @@ -346,6 +351,7 @@ func (e *Engine) Stats() EngineStats { MaxGenerateMs: durationMs(e.maxGenerateNs.Load()), MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()), MaxWriteMs: durationMs(e.maxWriteNs.Load()), + Queue: e.frameQueue.Stats(), } } @@ -372,13 +378,45 @@ func (e *Engine) run(ctx context.Context) { frame = e.upsampler.Process(frame) } t2 := time.Now() - n, err := e.driver.Write(ctx, frame) + + if err := e.frameQueue.Push(ctx, frame); err != nil { + if ctx.Err() != nil { + return + } + if errors.Is(err, output.ErrFrameQueueClosed) { + return + } + e.lastError.Store(err.Error()) + e.underruns.Add(1) + select { + case <-time.After(e.chunkDuration): + case <-ctx.Done(): + return + } + continue + } + + popFrame, err := e.frameQueue.Pop(ctx) + if err != nil { + if ctx.Err() != nil { + return + } + if errors.Is(err, output.ErrFrameQueueClosed) { + return + } + e.lastError.Store(err.Error()) + e.underruns.Add(1) + continue + } + t3 := time.Now() + n, err := e.driver.Write(ctx, popFrame) + t4 := time.Now() genDur := t1.Sub(t0) upDur := t2.Sub(t1) - writeDur := t3.Sub(t2) - cycleDur := t3.Sub(t0) + writeDur := t4.Sub(t3) + cycleDur := t4.Sub(t0) updateMaxDuration(&e.maxGenerateNs, genDur) updateMaxDuration(&e.maxUpsampleNs, upDur) @@ -399,7 +437,6 @@ func (e *Engine) run(ctx context.Context) { } e.lastError.Store(err.Error()) e.underruns.Add(1) - // Back off to avoid pegging CPU on persistent errors select { case <-time.After(e.chunkDuration): case <-ctx.Done(): diff --git a/internal/config/config.go b/internal/config/config.go index 7654d17..6c73382 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,6 +14,7 @@ type Config struct { FM FMConfig `json:"fm"` Backend BackendConfig `json:"backend"` Control ControlConfig `json:"control"` + Runtime RuntimeConfig `json:"runtime"` } type AudioConfig struct { @@ -35,18 +36,18 @@ type RDSConfig struct { type FMConfig struct { FrequencyMHz float64 `json:"frequencyMHz"` StereoEnabled bool `json:"stereoEnabled"` - PilotLevel float64 `json:"pilotLevel"` // fraction of ±75kHz deviation (0.09 = 9%, ITU standard) - RDSInjection float64 `json:"rdsInjection"` // fraction of ±75kHz deviation (0.04 = 4%, typical) - PreEmphasisTauUS float64 `json:"preEmphasisTauUS"` // time constant in µs: 50 (EU) or 75 (US), 0=off + PilotLevel float64 `json:"pilotLevel"` // fraction of ±75kHz deviation (0.09 = 9%, ITU standard) + RDSInjection float64 `json:"rdsInjection"` // fraction of ±75kHz deviation (0.04 = 4%, typical) + PreEmphasisTauUS float64 `json:"preEmphasisTauUS"` // time constant in µs: 50 (EU) or 75 (US), 0=off OutputDrive float64 `json:"outputDrive"` - CompositeRateHz int `json:"compositeRateHz"` // internal DSP/MPX sample rate + CompositeRateHz int `json:"compositeRateHz"` // internal DSP/MPX sample rate MaxDeviationHz float64 `json:"maxDeviationHz"` LimiterEnabled bool `json:"limiterEnabled"` LimiterCeiling float64 `json:"limiterCeiling"` FMModulationEnabled bool `json:"fmModulationEnabled"` - MpxGain float64 `json:"mpxGain"` // hardware calibration: scales entire composite output (default 1.0) - BS412Enabled bool `json:"bs412Enabled"` // ITU-R BS.412 MPX power limiter (EU requirement) - BS412ThresholdDBr float64 `json:"bs412ThresholdDBr"` // power limit in dBr (0 = standard, +3 = relaxed) + MpxGain float64 `json:"mpxGain"` // hardware calibration: scales entire composite output (default 1.0) + BS412Enabled bool `json:"bs412Enabled"` // ITU-R BS.412 MPX power limiter (EU requirement) + BS412ThresholdDBr float64 `json:"bs412ThresholdDBr"` // power limit in dBr (0 = standard, +3 = relaxed) } type BackendConfig struct { @@ -63,6 +64,10 @@ type ControlConfig struct { ListenAddress string `json:"listenAddress"` } +type RuntimeConfig struct { + FrameQueueCapacity int `json:"frameQueueCapacity"` +} + func Default() Config { return Config{ Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0.4}, @@ -83,6 +88,7 @@ func Default() Config { }, Backend: BackendConfig{Kind: "file", OutputPath: "build/out/composite.f32"}, Control: ControlConfig{ListenAddress: "127.0.0.1:8088"}, + Runtime: RuntimeConfig{FrameQueueCapacity: 3}, } } @@ -150,7 +156,9 @@ func (c Config) Validate() error { if c.FM.LimiterCeiling < 0 || c.FM.LimiterCeiling > 2 { return fmt.Errorf("fm.limiterCeiling out of range") } - if c.FM.MpxGain == 0 { c.FM.MpxGain = 1.0 } // default if omitted from JSON + if c.FM.MpxGain == 0 { + c.FM.MpxGain = 1.0 + } // default if omitted from JSON if c.FM.MpxGain < 0.1 || c.FM.MpxGain > 5 { return fmt.Errorf("fm.mpxGain out of range (0.1..5)") } @@ -163,6 +171,9 @@ func (c Config) Validate() error { if c.Control.ListenAddress == "" { return fmt.Errorf("control.listenAddress is required") } + if c.Runtime.FrameQueueCapacity <= 0 { + return fmt.Errorf("runtime.frameQueueCapacity must be > 0") + } // Fail-loud PI validation if c.RDS.Enabled { if _, err := ParsePI(c.RDS.PI); err != nil { diff --git a/internal/output/frame_queue.go b/internal/output/frame_queue.go new file mode 100644 index 0000000..22b6d38 --- /dev/null +++ b/internal/output/frame_queue.go @@ -0,0 +1,211 @@ +package output + +import ( + "context" + "errors" + "sync" +) + +// ErrFrameQueueClosed is returned when a queue operation is attempted after the queue +// has been closed. +var ErrFrameQueueClosed = errors.New("frame queue closed") + +// QueueStats exposes the runtime state of a frame queue. +type QueueStats struct { + Capacity int `json:"capacity"` + Depth int `json:"depth"` + FillLevel float64 `json:"fillLevel"` + HighWaterMark int `json:"highWaterMark"` + LowWaterMark int `json:"lowWaterMark"` + PushTimeouts uint64 `json:"pushTimeouts"` + PopTimeouts uint64 `json:"popTimeouts"` + DroppedFrames uint64 `json:"droppedFrames"` + RepeatedFrames uint64 `json:"repeatedFrames"` + MutedFrames uint64 `json:"mutedFrames"` +} + +// FrameQueue is a bounded ring that holds CompositeFrame instances between the +// generator and the writer. Push blocks when the queue is full until space +// becomes available or the provided context is cancelled. Pop blocks when the +// queue is empty until a new frame arrives or the context is cancelled. +type FrameQueue struct { + capacity int + ch chan *CompositeFrame + + mu sync.Mutex + depth int + highWaterMark int + lowWaterMark int + pushTimeouts uint64 + popTimeouts uint64 + dropped uint64 + repeated uint64 + muted uint64 + closed bool + + closeOnce sync.Once +} + +// NewFrameQueue builds a bounded queue that holds up to capacity frames. +func NewFrameQueue(capacity int) *FrameQueue { + if capacity <= 0 { + capacity = 1 + } + fq := &FrameQueue{ + capacity: capacity, + ch: make(chan *CompositeFrame, capacity), + lowWaterMark: capacity, + } + fq.trackDepth(0) + return fq +} + +// Capacity returns the fixed frame capacity of the queue. +func (q *FrameQueue) Capacity() int { + return q.capacity +} + +// FillLevel reports the current occupancy as a fraction of capacity. +func (q *FrameQueue) FillLevel() float64 { + q.mu.Lock() + depth := q.depth + q.mu.Unlock() + if q.capacity == 0 { + return 0 + } + return float64(depth) / float64(q.capacity) +} + +// Depth returns the current number of frames in the queue. +func (q *FrameQueue) Depth() int { + q.mu.Lock() + depth := q.depth + q.mu.Unlock() + return depth +} + +// Stats returns a snapshot of the queue metrics. +func (q *FrameQueue) Stats() QueueStats { + q.mu.Lock() + stats := QueueStats{ + Capacity: q.capacity, + Depth: q.depth, + FillLevel: q.fillLevelLocked(), + HighWaterMark: q.highWaterMark, + LowWaterMark: q.lowWaterMark, + PushTimeouts: q.pushTimeouts, + PopTimeouts: q.popTimeouts, + DroppedFrames: q.dropped, + RepeatedFrames: q.repeated, + MutedFrames: q.muted, + } + q.mu.Unlock() + return stats +} + +// Push enqueues a frame, blocking until space is available or ctx is done. +func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error { + if frame == nil { + return errors.New("frame required") + } + if q.isClosed() { + return ErrFrameQueueClosed + } + + select { + case q.ch <- frame: + q.updateDepth(+1) + return nil + case <-ctx.Done(): + q.recordPushTimeout() + return ctx.Err() + } +} + +// Pop removes a frame, blocking until one is available or ctx signals done. +func (q *FrameQueue) Pop(ctx context.Context) (*CompositeFrame, error) { + select { + case frame, ok := <-q.ch: + if !ok { + return nil, ErrFrameQueueClosed + } + q.updateDepth(-1) + return frame, nil + case <-ctx.Done(): + q.recordPopTimeout() + return nil, ctx.Err() + } +} + +// Close marks the queue as closed and wakes up blocked callers. +func (q *FrameQueue) Close() { + q.closeOnce.Do(func() { + q.mu.Lock() + q.closed = true + q.mu.Unlock() + close(q.ch) + }) +} + +// RecordDrop increments the drop counter for instrumentation. +func (q *FrameQueue) RecordDrop() { + q.mu.Lock() + q.dropped++ + q.mu.Unlock() +} + +// RecordRepeat increments the repeat counter for instrumentation. +func (q *FrameQueue) RecordRepeat() { + q.mu.Lock() + q.repeated++ + q.mu.Unlock() +} + +// RecordMute increments the mute counter for instrumentation. +func (q *FrameQueue) RecordMute() { + q.mu.Lock() + q.muted++ + q.mu.Unlock() +} + +func (q *FrameQueue) isClosed() bool { + q.mu.Lock() + closed := q.closed + q.mu.Unlock() + return closed +} + +func (q *FrameQueue) updateDepth(delta int) { + q.mu.Lock() + q.depth += delta + q.trackDepth(q.depth) + q.mu.Unlock() +} + +func (q *FrameQueue) trackDepth(depth int) { + if depth > q.highWaterMark { + q.highWaterMark = depth + } + if depth < q.lowWaterMark { + q.lowWaterMark = depth + } +} + +func (q *FrameQueue) fillLevelLocked() float64 { + if q.capacity == 0 { + return 0 + } + return float64(q.depth) / float64(q.capacity) +} + +func (q *FrameQueue) recordPushTimeout() { + q.mu.Lock() + q.pushTimeouts++ + q.mu.Unlock() +} + +func (q *FrameQueue) recordPopTimeout() { + q.mu.Lock() + q.popTimeouts++ + q.mu.Unlock() +} diff --git a/internal/output/frame_queue_test.go b/internal/output/frame_queue_test.go new file mode 100644 index 0000000..90f3460 --- /dev/null +++ b/internal/output/frame_queue_test.go @@ -0,0 +1,82 @@ +package output + +import ( + "context" + "testing" + "time" +) + +func TestFrameQueuePushPop(t *testing.T) { + q := NewFrameQueue(2) + ctx := context.Background() + + frame := &CompositeFrame{Sequence: 1} + if err := q.Push(ctx, frame); err != nil { + t.Fatalf("push failed: %v", err) + } + if got := q.Depth(); got != 1 { + t.Fatalf("expected depth 1, got %d", got) + } + if got := q.FillLevel(); got <= 0 || got >= 1 { + t.Fatalf("unexpected fill level: %f", got) + } + + popped, err := q.Pop(ctx) + if err != nil { + t.Fatalf("pop failed: %v", err) + } + if popped != frame { + t.Fatal("popped frame differs from pushed frame") + } + if q.Depth() != 0 { + t.Fatalf("expected depth 0 after pop, got %d", q.Depth()) + } + + stats := q.Stats() + if stats.HighWaterMark == 0 { + t.Fatal("expected high water mark to track push") + } + if stats.LowWaterMark != 0 { + t.Fatalf("expected low water mark 0, got %d", stats.LowWaterMark) + } +} + +func TestFrameQueuePushTimeout(t *testing.T) { + q := NewFrameQueue(1) + ctx := context.Background() + frame := &CompositeFrame{Sequence: 42} + + if err := q.Push(ctx, frame); err != nil { + t.Fatalf("initial push: %v", err) + } + + shortCtx, cancel := context.WithTimeout(ctx, 5*time.Millisecond) + defer cancel() + if err := q.Push(shortCtx, frame); err == nil { + t.Fatalf("expected timeout when pushing into full queue") + } + stats := q.Stats() + if stats.PushTimeouts == 0 { + t.Fatalf("expected push timeout counter to increment, got %d", stats.PushTimeouts) + } + + _, _ = q.Pop(ctx) +} + +func TestFrameQueueCounters(t *testing.T) { + q := NewFrameQueue(1) + q.RecordDrop() + q.RecordRepeat() + q.RecordMute() + + stats := q.Stats() + if stats.DroppedFrames != 1 { + t.Fatalf("expected 1 drop, got %d", stats.DroppedFrames) + } + if stats.RepeatedFrames != 1 { + t.Fatalf("expected 1 repeat, got %d", stats.RepeatedFrames) + } + if stats.MutedFrames != 1 { + t.Fatalf("expected 1 mute, got %d", stats.MutedFrames) + } +}