diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index a25a9b8..9e7a5fa 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -238,14 +238,22 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a type txBridge struct{ engine *apppkg.Engine } -func (b *txBridge) StartTX() error { return b.engine.Start(context.Background()) } -func (b *txBridge) StopTX() error { return b.engine.Stop(context.Background()) } +func (b *txBridge) StartTX() error { return b.engine.Start(context.Background()) } +func (b *txBridge) StopTX() error { return b.engine.Stop(context.Background()) } func (b *txBridge) TXStats() map[string]any { s := b.engine.Stats() return map[string]any{ - "state": s.State, "chunksProduced": s.ChunksProduced, - "totalSamples": s.TotalSamples, "underruns": s.Underruns, - "lastError": s.LastError, "uptimeSeconds": s.UptimeSeconds, + "state": s.State, + "chunksProduced": s.ChunksProduced, + "totalSamples": s.TotalSamples, + "underruns": s.Underruns, + "lateBuffers": s.LateBuffers, + "lastError": s.LastError, + "uptimeSeconds": s.UptimeSeconds, + "maxCycleMs": s.MaxCycleMs, + "maxGenerateMs": s.MaxGenerateMs, + "maxUpsampleMs": s.MaxUpsampleMs, + "maxWriteMs": s.MaxWriteMs, } } func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error { diff --git a/internal/app/engine.go b/internal/app/engine.go index 3e84b34..8928be0 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -36,13 +36,35 @@ func (s EngineState) String() string { } } +func updateMaxDuration(dst *atomic.Uint64, d time.Duration) { + v := uint64(d) + for { + cur := dst.Load() + if v <= cur { + return + } + if dst.CompareAndSwap(cur, v) { + return + } + } +} + +func durationMs(ns uint64) float64 { + return float64(ns) / float64(time.Millisecond) +} + type EngineStats struct { State string `json:"state"` ChunksProduced uint64 `json:"chunksProduced"` TotalSamples uint64 `json:"totalSamples"` Underruns uint64 `json:"underruns"` + LateBuffers uint64 `json:"lateBuffers,omitempty"` LastError string `json:"lastError,omitempty"` UptimeSeconds float64 `json:"uptimeSeconds"` + MaxCycleMs float64 `json:"maxCycleMs,omitempty"` + MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"` + MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"` + MaxWriteMs float64 `json:"maxWriteMs,omitempty"` } // Engine is the continuous TX loop. It generates composite IQ in chunks, @@ -67,6 +89,11 @@ type Engine struct { chunksProduced atomic.Uint64 totalSamples atomic.Uint64 underruns atomic.Uint64 + lateBuffers atomic.Uint64 + maxCycleNs atomic.Uint64 + maxGenerateNs atomic.Uint64 + maxUpsampleNs atomic.Uint64 + maxWriteNs atomic.Uint64 lastError atomic.Value // string // Live config: pending frequency change, applied between chunks @@ -205,8 +232,12 @@ func (e *Engine) UpdateConfig(u LiveConfigUpdate) error { if u.PS != nil || u.RadioText != nil { if enc := e.generator.RDSEncoder(); enc != nil { ps, rt := "", "" - if u.PS != nil { ps = *u.PS } - if u.RadioText != nil { rt = *u.RadioText } + if u.PS != nil { + ps = *u.PS + } + if u.RadioText != nil { + rt = *u.RadioText + } enc.UpdateText(ps, rt) } } @@ -216,13 +247,27 @@ func (e *Engine) UpdateConfig(u LiveConfigUpdate) error { current := e.generator.CurrentLiveParams() next := current // copy - if u.OutputDrive != nil { next.OutputDrive = *u.OutputDrive } - if u.StereoEnabled != nil { next.StereoEnabled = *u.StereoEnabled } - if u.PilotLevel != nil { next.PilotLevel = *u.PilotLevel } - if u.RDSInjection != nil { next.RDSInjection = *u.RDSInjection } - if u.RDSEnabled != nil { next.RDSEnabled = *u.RDSEnabled } - if u.LimiterEnabled != nil { next.LimiterEnabled = *u.LimiterEnabled } - if u.LimiterCeiling != nil { next.LimiterCeiling = *u.LimiterCeiling } + if u.OutputDrive != nil { + next.OutputDrive = *u.OutputDrive + } + if u.StereoEnabled != nil { + next.StereoEnabled = *u.StereoEnabled + } + if u.PilotLevel != nil { + next.PilotLevel = *u.PilotLevel + } + if u.RDSInjection != nil { + next.RDSInjection = *u.RDSInjection + } + if u.RDSEnabled != nil { + next.RDSEnabled = *u.RDSEnabled + } + if u.LimiterEnabled != nil { + next.LimiterEnabled = *u.LimiterEnabled + } + if u.LimiterCeiling != nil { + next.LimiterCeiling = *u.LimiterCeiling + } e.generator.UpdateLive(next) return nil @@ -294,8 +339,13 @@ func (e *Engine) Stats() EngineStats { ChunksProduced: e.chunksProduced.Load(), TotalSamples: e.totalSamples.Load(), Underruns: e.underruns.Load(), + LateBuffers: e.lateBuffers.Load(), LastError: errVal, UptimeSeconds: uptime, + MaxCycleMs: durationMs(e.maxCycleNs.Load()), + MaxGenerateMs: durationMs(e.maxGenerateNs.Load()), + MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()), + MaxWriteMs: durationMs(e.maxWriteNs.Load()), } } @@ -315,13 +365,38 @@ func (e *Engine) run(ctx context.Context) { } } + t0 := time.Now() frame := e.generator.GenerateFrame(e.chunkDuration) + t1 := time.Now() if e.upsampler != nil { frame = e.upsampler.Process(frame) } + t2 := time.Now() n, err := e.driver.Write(ctx, frame) + t3 := time.Now() + + genDur := t1.Sub(t0) + upDur := t2.Sub(t1) + writeDur := t3.Sub(t2) + cycleDur := t3.Sub(t0) + + updateMaxDuration(&e.maxGenerateNs, genDur) + updateMaxDuration(&e.maxUpsampleNs, upDur) + updateMaxDuration(&e.maxWriteNs, writeDur) + updateMaxDuration(&e.maxCycleNs, cycleDur) + + if cycleDur > e.chunkDuration { + late := e.lateBuffers.Add(1) + if late <= 5 || late%20 == 0 { + log.Printf("TX LATE: cycle=%s budget=%s gen=%s up=%s write=%s over=%s", + cycleDur, e.chunkDuration, genDur, upDur, writeDur, cycleDur-e.chunkDuration) + } + } + if err != nil { - if ctx.Err() != nil { return } + if ctx.Err() != nil { + return + } e.lastError.Store(err.Error()) e.underruns.Add(1) // Back off to avoid pegging CPU on persistent errors diff --git a/internal/platform/plutosdr/pluto_linux.go b/internal/platform/plutosdr/pluto_linux.go index 206c0a1..55c2471 100644 --- a/internal/platform/plutosdr/pluto_linux.go +++ b/internal/platform/plutosdr/pluto_linux.go @@ -23,17 +23,34 @@ import ( "github.com/jan/fm-rds-tx/internal/platform" ) +func updateMaxDuration(dst *atomic.Uint64, d time.Duration) { + v := uint64(d) + for { + cur := dst.Load() + if v <= cur { + return + } + if dst.CompareAndSwap(cur, v) { + return + } + } +} + +func durationMs(ns uint64) float64 { + return float64(ns) / float64(time.Millisecond) +} + type PlutoDriver struct { mu sync.Mutex cfg platform.SoapyConfig - ctx *C.struct_iio_context - txDev *C.struct_iio_device - phyDev *C.struct_iio_device - chanI *C.struct_iio_channel - chanQ *C.struct_iio_channel - chanLO *C.struct_iio_channel - buf *C.struct_iio_buffer + ctx *C.struct_iio_context + txDev *C.struct_iio_device + phyDev *C.struct_iio_device + chanI *C.struct_iio_channel + chanQ *C.struct_iio_channel + chanLO *C.struct_iio_channel + buf *C.struct_iio_buffer bufSize int started bool @@ -41,6 +58,12 @@ type PlutoDriver struct { framesWritten atomic.Uint64 samplesWritten atomic.Uint64 underruns atomic.Uint64 + slowWrites atomic.Uint64 + slowFills atomic.Uint64 + slowPushes atomic.Uint64 + maxWriteNs atomic.Uint64 + maxFillNs atomic.Uint64 + maxPushNs atomic.Uint64 lastError string lastErrorAt string layoutLogged bool @@ -238,6 +261,7 @@ func (d *PlutoDriver) Write(_ context.Context, frame *output.CompositeFrame) (in } } + fillStart := time.Now() for i := 0; i < chunk; i++ { s := frame.Samples[written+i] *(*int16)(unsafe.Pointer(ptrI)) = int16(s.I * 32767) @@ -245,8 +269,47 @@ func (d *PlutoDriver) Write(_ context.Context, frame *output.CompositeFrame) (in ptrI += step ptrQ += step } + fillDur := time.Since(fillStart) + pushStart := time.Now() pushed := int(C.iio_buffer_push(buf)) + pushDur := time.Since(pushStart) + + chunkBudget := time.Duration(0) + if d.cfg.SampleRateHz > 0 { + chunkBudget = time.Duration(float64(chunk) / d.cfg.SampleRateHz * float64(time.Second)) + } + writeDur := fillDur + pushDur + + updateMaxDuration(&d.maxFillNs, fillDur) + updateMaxDuration(&d.maxPushNs, pushDur) + updateMaxDuration(&d.maxWriteNs, writeDur) + + slow := false + if chunkBudget > 0 { + if fillDur > chunkBudget/4 { + d.slowFills.Add(1) + slow = true + } + if pushDur > chunkBudget/2 { + d.slowPushes.Add(1) + slow = true + } + if writeDur > chunkBudget { + d.slowWrites.Add(1) + slow = true + } + } + if slow { + sw := d.slowWrites.Load() + sf := d.slowFills.Load() + sp := d.slowPushes.Load() + if sw <= 5 || sf <= 5 || sp <= 5 || sw%20 == 0 || sf%20 == 0 || sp%20 == 0 { + log.Printf("pluto-linux slow write: chunk=%d budget=%s fill=%s push=%s total=%s", + chunk, chunkBudget, fillDur, pushDur, writeDur) + } + } + if pushed < 0 { d.mu.Lock() d.lastError = fmt.Sprintf("buffer_push: %d", pushed) @@ -299,9 +362,15 @@ func (d *PlutoDriver) Stats() platform.RuntimeStats { FramesWritten: d.framesWritten.Load(), SamplesWritten: d.samplesWritten.Load(), Underruns: d.underruns.Load(), + SlowWrites: d.slowWrites.Load(), + SlowFills: d.slowFills.Load(), + SlowPushes: d.slowPushes.Load(), LastError: d.lastError, LastErrorAt: d.lastErrorAt, EffectiveRate: d.cfg.SampleRateHz, + MaxWriteMs: durationMs(d.maxWriteNs.Load()), + MaxFillMs: durationMs(d.maxFillNs.Load()), + MaxPushMs: durationMs(d.maxPushNs.Load()), } } diff --git a/internal/platform/soapy.go b/internal/platform/soapy.go index 8fe4845..df32635 100644 --- a/internal/platform/soapy.go +++ b/internal/platform/soapy.go @@ -33,9 +33,15 @@ type RuntimeStats struct { SamplesWritten uint64 `json:"samplesWritten"` Underruns uint64 `json:"underruns"` Overruns uint64 `json:"overruns"` + SlowWrites uint64 `json:"slowWrites,omitempty"` + SlowFills uint64 `json:"slowFills,omitempty"` + SlowPushes uint64 `json:"slowPushes,omitempty"` LastError string `json:"lastError,omitempty"` LastErrorAt string `json:"lastErrorAt,omitempty"` EffectiveRate float64 `json:"effectiveSampleRateHz"` + MaxWriteMs float64 `json:"maxWriteMs,omitempty"` + MaxFillMs float64 `json:"maxFillMs,omitempty"` + MaxPushMs float64 `json:"maxPushMs,omitempty"` } // ----------------------------------------------------------------------- @@ -133,14 +139,14 @@ func (sb *SoapyBackend) Driver() SoapyDriver { // ----------------------------------------------------------------------- type SimulatedDriver struct { - mu sync.Mutex - fallback output.Backend - cfg SoapyConfig - started bool - framesWritten atomic.Uint64 + mu sync.Mutex + fallback output.Backend + cfg SoapyConfig + started bool + framesWritten atomic.Uint64 samplesWritten atomic.Uint64 - lastError string - lastErrorAt string + lastError string + lastErrorAt string } func NewSimulatedDriver(writer output.Backend) *SimulatedDriver {