From 43cb4ad747fb9a6f4174f803d98cc13265919e22 Mon Sep 17 00:00:00 2001 From: Jan Svabenik Date: Sun, 5 Apr 2026 15:16:50 +0200 Subject: [PATCH] feat: introduce writer worker --- docs/pro-runtime-hardening-workboard.md | 20 +++++--- internal/app/engine.go | 62 +++++++++++++++++++------ internal/dsp/fmupsample.go | 3 +- internal/dsp/iqresample.go | 1 + internal/dsp/upsample.go | 1 + internal/output/backend.go | 1 + 6 files changed, 67 insertions(+), 21 deletions(-) diff --git a/docs/pro-runtime-hardening-workboard.md b/docs/pro-runtime-hardening-workboard.md index d8aee65..a1cd7d3 100644 --- a/docs/pro-runtime-hardening-workboard.md +++ b/docs/pro-runtime-hardening-workboard.md @@ -209,16 +209,22 @@ Generator/Upsampler und Hardwarewriter werden als getrennte Stufen mit kleinem, - Queue-Close erwartet, dass Generator/Writer vor dem Schließen stoppen, sonst droht Panik beim Schreiben. ### WS-01-T2 — Writer-Worker einführen -- **Status:** TODO -- **Owner:** offen +- **Status:** VERIFIED +- **Owner:** Lead Coderaffe - **Code-Orte:** - - `internal/app/engine.go` - - `internal/platform/*` + - `internal/app/engine.go` (run loop, `writerLoop`, `cloneFrame`, Stats) + - `internal/dsp/*` (FMUpsampler / Resampler copy `GeneratedAt` für Cycle-Metriken) - **Ziel:** - Nur noch ein dedizierter Worker besitzt `driver.Write()`. + Generator/Upsampler liefern Frames in die FrameQueue, `driver.Write()` läuft nur noch im dedizierten Writer. - **Akzeptanzpunkte:** - - Write-Latenz pro Frame messbar - - Timinginteraktionen klar isoliert + - `writerLoop()` ist die einzige Stelle mit `driver.Write()` und zieht aus der Queue. + - FrameQueue ist ein echter Puffer (Generator klont Frames, Writer poppt) und `EngineStats.Queue` zeigt den Füllstand. + - Write- und Cycle-Latenzen plus `LateBuffers` bleiben in `EngineStats` sichtbar (`MaxWriteMs`, `LateBuffers`, `MaxCycleMs`). +- **Nachweis:** + - `go test ./...` (Engine + Queue + DSP) läuft erfolgreich. + - `EngineStats` berichtet weiterhin über Queue-/Writer-Metriken. +- **Restrisiken:** + - Frame-Klonierung pro Chunk erhöht Heap-Pressure; spätere Workstreams sollten Pooling / Zero-Copy prüfen. ### WS-01-T3 — Supervisor-Schicht einführen - **Status:** TODO diff --git a/internal/app/engine.go b/internal/app/engine.go index 6904ec9..f1920d1 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -356,7 +356,10 @@ func (e *Engine) Stats() EngineStats { } func (e *Engine) run(ctx context.Context) { + e.wg.Add(1) + go e.writerLoop(ctx) defer e.wg.Done() + for { if ctx.Err() != nil { return @@ -373,13 +376,27 @@ func (e *Engine) run(ctx context.Context) { t0 := time.Now() frame := e.generator.GenerateFrame(e.chunkDuration) + frame.GeneratedAt = t0 t1 := time.Now() if e.upsampler != nil { frame = e.upsampler.Process(frame) + frame.GeneratedAt = t0 } t2 := time.Now() - if err := e.frameQueue.Push(ctx, frame); err != nil { + genDur := t1.Sub(t0) + upDur := t2.Sub(t1) + updateMaxDuration(&e.maxGenerateNs, genDur) + updateMaxDuration(&e.maxUpsampleNs, upDur) + + enqueued := cloneFrame(frame) + if enqueued == nil { + e.lastError.Store("engine: frame clone failed") + e.underruns.Add(1) + continue + } + + if err := e.frameQueue.Push(ctx, enqueued); err != nil { if ctx.Err() != nil { return } @@ -395,8 +412,13 @@ func (e *Engine) run(ctx context.Context) { } continue } + } +} - popFrame, err := e.frameQueue.Pop(ctx) +func (e *Engine) writerLoop(ctx context.Context) { + defer e.wg.Done() + for { + frame, err := e.frameQueue.Pop(ctx) if err != nil { if ctx.Err() != nil { return @@ -409,25 +431,23 @@ func (e *Engine) run(ctx context.Context) { continue } - t3 := time.Now() - n, err := e.driver.Write(ctx, popFrame) - t4 := time.Now() + writeStart := time.Now() + n, err := e.driver.Write(ctx, frame) + writeDur := time.Since(writeStart) - genDur := t1.Sub(t0) - upDur := t2.Sub(t1) - writeDur := t4.Sub(t3) - cycleDur := t4.Sub(t0) + cycleDur := writeDur + if !frame.GeneratedAt.IsZero() { + cycleDur = time.Since(frame.GeneratedAt) + } - updateMaxDuration(&e.maxGenerateNs, genDur) - updateMaxDuration(&e.maxUpsampleNs, upDur) updateMaxDuration(&e.maxWriteNs, writeDur) updateMaxDuration(&e.maxCycleNs, cycleDur) if cycleDur > e.chunkDuration { late := e.lateBuffers.Add(1) if late <= 5 || late%20 == 0 { - log.Printf("TX LATE: cycle=%s budget=%s gen=%s up=%s write=%s over=%s", - cycleDur, e.chunkDuration, genDur, upDur, writeDur, cycleDur-e.chunkDuration) + log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s", + cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration) } } @@ -444,7 +464,23 @@ func (e *Engine) run(ctx context.Context) { } continue } + e.chunksProduced.Add(1) e.totalSamples.Add(uint64(n)) } } + +func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame { + if src == nil { + return nil + } + samples := make([]output.IQSample, len(src.Samples)) + copy(samples, src.Samples) + return &output.CompositeFrame{ + Samples: samples, + SampleRateHz: src.SampleRateHz, + Timestamp: src.Timestamp, + GeneratedAt: src.GeneratedAt, + Sequence: src.Sequence, + } +} diff --git a/internal/dsp/fmupsample.go b/internal/dsp/fmupsample.go index c4bcf6c..998d556 100644 --- a/internal/dsp/fmupsample.go +++ b/internal/dsp/fmupsample.go @@ -147,7 +147,7 @@ func (u *FMUpsampler) Process(frame *output.CompositeFrame) *output.CompositeFra pos := u.srcPos n := 0 for pos < float64(srcLen) && n < maxOut { - vi := int(pos) // virtual index (integer part) + vi := int(pos) // virtual index (integer part) frac := pos - float64(vi) pA := phaseAt(vi) @@ -171,6 +171,7 @@ func (u *FMUpsampler) Process(frame *output.CompositeFrame) *output.CompositeFra u.outFrame.SampleRateHz = u.dstRate u.outFrame.Timestamp = frame.Timestamp u.outFrame.Sequence = frame.Sequence + u.outFrame.GeneratedAt = frame.GeneratedAt return &u.outFrame } diff --git a/internal/dsp/iqresample.go b/internal/dsp/iqresample.go index 4d1c044..a3d565e 100644 --- a/internal/dsp/iqresample.go +++ b/internal/dsp/iqresample.go @@ -54,6 +54,7 @@ func ResampleIQ(frame *output.CompositeFrame, targetRateHz float64) *output.Comp Samples: dst, SampleRateHz: targetRateHz, Timestamp: frame.Timestamp, + GeneratedAt: frame.GeneratedAt, Sequence: frame.Sequence, } } diff --git a/internal/dsp/upsample.go b/internal/dsp/upsample.go index b45b1ea..9f81549 100644 --- a/internal/dsp/upsample.go +++ b/internal/dsp/upsample.go @@ -76,6 +76,7 @@ func (u *FMPhaseUpsampler) Process(frame *output.CompositeFrame) *output.Composi Samples: dst, SampleRateHz: u.dstRate, Timestamp: frame.Timestamp, + GeneratedAt: frame.GeneratedAt, Sequence: frame.Sequence, } } diff --git a/internal/output/backend.go b/internal/output/backend.go index 38e5b82..bbc0171 100644 --- a/internal/output/backend.go +++ b/internal/output/backend.go @@ -19,6 +19,7 @@ type CompositeFrame struct { Samples []IQSample SampleRateHz float64 Timestamp time.Time + GeneratedAt time.Time Sequence uint64 }