diff --git a/internal/app/engine.go b/internal/app/engine.go index a269cd8..34fbaaf 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -81,6 +81,8 @@ type EngineStats struct { MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"` MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"` MaxWriteMs float64 `json:"maxWriteMs,omitempty"` + MaxQueueResidenceMs float64 `json:"maxQueueResidenceMs,omitempty"` + MaxPipelineLatencyMs float64 `json:"maxPipelineLatencyMs,omitempty"` Queue output.QueueStats `json:"queue"` RuntimeIndicator RuntimeIndicator `json:"runtimeIndicator"` RuntimeAlert string `json:"runtimeAlert,omitempty"` @@ -152,6 +154,8 @@ type Engine struct { maxGenerateNs atomic.Uint64 maxUpsampleNs atomic.Uint64 maxWriteNs atomic.Uint64 + maxQueueResidenceNs atomic.Uint64 + maxPipelineNs atomic.Uint64 lastError atomic.Value // string lastFault atomic.Value // *FaultEvent faultHistoryMu sync.Mutex @@ -429,6 +433,8 @@ func (e *Engine) Stats() EngineStats { MaxGenerateMs: durationMs(e.maxGenerateNs.Load()), MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()), MaxWriteMs: durationMs(e.maxWriteNs.Load()), + MaxQueueResidenceMs: durationMs(e.maxQueueResidenceNs.Load()), + MaxPipelineLatencyMs: durationMs(e.maxPipelineNs.Load()), Queue: queue, RuntimeIndicator: ri, RuntimeAlert: runtimeAlert(queue.Health, hasRecentLateBuffers), @@ -515,6 +521,7 @@ func (e *Engine) run(ctx context.Context) { updateMaxDuration(&e.maxUpsampleNs, upDur) enqueued := cloneFrame(frame) + enqueued.EnqueuedAt = time.Now() if enqueued == nil { e.lastError.Store("engine: frame clone failed") e.underruns.Add(1) @@ -558,26 +565,35 @@ func (e *Engine) writerLoop(ctx context.Context) { continue } + frame.DequeuedAt = time.Now() + queueResidence := time.Duration(0) + if !frame.EnqueuedAt.IsZero() { + queueResidence = frame.DequeuedAt.Sub(frame.EnqueuedAt) + } + writeStart := time.Now() + frame.WriteStartedAt = writeStart n, err := e.driver.Write(ctx, frame) writeDur := time.Since(writeStart) - cycleDur := writeDur + pipelineLatency := writeDur if !frame.GeneratedAt.IsZero() { - cycleDur = time.Since(frame.GeneratedAt) + pipelineLatency = time.Since(frame.GeneratedAt) } updateMaxDuration(&e.maxWriteNs, writeDur) - updateMaxDuration(&e.maxCycleNs, cycleDur) + updateMaxDuration(&e.maxQueueResidenceNs, queueResidence) + updateMaxDuration(&e.maxPipelineNs, pipelineLatency) + updateMaxDuration(&e.maxCycleNs, writeDur) queueStats := e.frameQueue.Stats() e.evaluateRuntimeState(queueStats, e.hasRecentLateBuffers()) - if cycleDur > e.chunkDuration { + if writeDur > e.chunkDuration { late := e.lateBuffers.Add(1) e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano())) if late <= 5 || late%20 == 0 { - log.Printf("TX LATE: cycle=%s budget=%s write=%s over=%s", - cycleDur, e.chunkDuration, writeDur, cycleDur-e.chunkDuration) + log.Printf("TX LATE: write=%s budget=%s over=%s queueResidence=%s pipeline=%s", + writeDur, e.chunkDuration, writeDur-e.chunkDuration, queueResidence, pipelineLatency) } } @@ -607,11 +623,14 @@ func cloneFrame(src *output.CompositeFrame) *output.CompositeFrame { samples := make([]output.IQSample, len(src.Samples)) copy(samples, src.Samples) return &output.CompositeFrame{ - Samples: samples, - SampleRateHz: src.SampleRateHz, - Timestamp: src.Timestamp, - GeneratedAt: src.GeneratedAt, - Sequence: src.Sequence, + Samples: samples, + SampleRateHz: src.SampleRateHz, + Timestamp: src.Timestamp, + GeneratedAt: src.GeneratedAt, + EnqueuedAt: src.EnqueuedAt, + DequeuedAt: src.DequeuedAt, + WriteStartedAt: src.WriteStartedAt, + Sequence: src.Sequence, } } diff --git a/internal/output/backend.go b/internal/output/backend.go index bbc0171..94fb3c8 100644 --- a/internal/output/backend.go +++ b/internal/output/backend.go @@ -16,11 +16,14 @@ type IQSample struct { // CompositeFrame carries a block of MPX/IQ samples along with timing metadata. type CompositeFrame struct { - Samples []IQSample - SampleRateHz float64 - Timestamp time.Time - GeneratedAt time.Time - Sequence uint64 + Samples []IQSample + SampleRateHz float64 + Timestamp time.Time + GeneratedAt time.Time + EnqueuedAt time.Time + DequeuedAt time.Time + WriteStartedAt time.Time + Sequence uint64 } // BackendConfig describes the properties for a backend instance.