Browse Source

Merge branch 'sbc-linux'

tags/v0.9.0
Jan Svabenik 1 month ago
parent
commit
5415ae480e
7 changed files with 188 additions and 30 deletions
  1. +13
    -5
      cmd/fmrtx/main.go
  2. BIN
      docs/ourdev_680886RNY3D8.pdf
  3. BIN
      docs/rd040912.pdf
  4. +85
    -10
      internal/app/engine.go
  5. +76
    -7
      internal/platform/plutosdr/pluto_linux.go
  6. +13
    -7
      internal/platform/soapy.go
  7. +1
    -1
      stream_tx.bat

+ 13
- 5
cmd/fmrtx/main.go View File

@@ -238,14 +238,22 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a


type txBridge struct{ engine *apppkg.Engine } type txBridge struct{ engine *apppkg.Engine }


func (b *txBridge) StartTX() error { return b.engine.Start(context.Background()) }
func (b *txBridge) StopTX() error { return b.engine.Stop(context.Background()) }
func (b *txBridge) StartTX() error { return b.engine.Start(context.Background()) }
func (b *txBridge) StopTX() error { return b.engine.Stop(context.Background()) }
func (b *txBridge) TXStats() map[string]any { func (b *txBridge) TXStats() map[string]any {
s := b.engine.Stats() s := b.engine.Stats()
return map[string]any{ return map[string]any{
"state": s.State, "chunksProduced": s.ChunksProduced,
"totalSamples": s.TotalSamples, "underruns": s.Underruns,
"lastError": s.LastError, "uptimeSeconds": s.UptimeSeconds,
"state": s.State,
"chunksProduced": s.ChunksProduced,
"totalSamples": s.TotalSamples,
"underruns": s.Underruns,
"lateBuffers": s.LateBuffers,
"lastError": s.LastError,
"uptimeSeconds": s.UptimeSeconds,
"maxCycleMs": s.MaxCycleMs,
"maxGenerateMs": s.MaxGenerateMs,
"maxUpsampleMs": s.MaxUpsampleMs,
"maxWriteMs": s.MaxWriteMs,
} }
} }
func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error { func (b *txBridge) UpdateConfig(lp ctrlpkg.LivePatch) error {


BIN
docs/ourdev_680886RNY3D8.pdf View File


BIN
docs/rd040912.pdf View File


+ 85
- 10
internal/app/engine.go View File

@@ -36,13 +36,35 @@ func (s EngineState) String() string {
} }
} }


func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
v := uint64(d)
for {
cur := dst.Load()
if v <= cur {
return
}
if dst.CompareAndSwap(cur, v) {
return
}
}
}

func durationMs(ns uint64) float64 {
return float64(ns) / float64(time.Millisecond)
}

type EngineStats struct { type EngineStats struct {
State string `json:"state"` State string `json:"state"`
ChunksProduced uint64 `json:"chunksProduced"` ChunksProduced uint64 `json:"chunksProduced"`
TotalSamples uint64 `json:"totalSamples"` TotalSamples uint64 `json:"totalSamples"`
Underruns uint64 `json:"underruns"` Underruns uint64 `json:"underruns"`
LateBuffers uint64 `json:"lateBuffers,omitempty"`
LastError string `json:"lastError,omitempty"` LastError string `json:"lastError,omitempty"`
UptimeSeconds float64 `json:"uptimeSeconds"` UptimeSeconds float64 `json:"uptimeSeconds"`
MaxCycleMs float64 `json:"maxCycleMs,omitempty"`
MaxGenerateMs float64 `json:"maxGenerateMs,omitempty"`
MaxUpsampleMs float64 `json:"maxUpsampleMs,omitempty"`
MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
} }


// Engine is the continuous TX loop. It generates composite IQ in chunks, // Engine is the continuous TX loop. It generates composite IQ in chunks,
@@ -67,6 +89,11 @@ type Engine struct {
chunksProduced atomic.Uint64 chunksProduced atomic.Uint64
totalSamples atomic.Uint64 totalSamples atomic.Uint64
underruns atomic.Uint64 underruns atomic.Uint64
lateBuffers atomic.Uint64
maxCycleNs atomic.Uint64
maxGenerateNs atomic.Uint64
maxUpsampleNs atomic.Uint64
maxWriteNs atomic.Uint64
lastError atomic.Value // string lastError atomic.Value // string


// Live config: pending frequency change, applied between chunks // Live config: pending frequency change, applied between chunks
@@ -205,8 +232,12 @@ func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
if u.PS != nil || u.RadioText != nil { if u.PS != nil || u.RadioText != nil {
if enc := e.generator.RDSEncoder(); enc != nil { if enc := e.generator.RDSEncoder(); enc != nil {
ps, rt := "", "" ps, rt := "", ""
if u.PS != nil { ps = *u.PS }
if u.RadioText != nil { rt = *u.RadioText }
if u.PS != nil {
ps = *u.PS
}
if u.RadioText != nil {
rt = *u.RadioText
}
enc.UpdateText(ps, rt) enc.UpdateText(ps, rt)
} }
} }
@@ -216,13 +247,27 @@ func (e *Engine) UpdateConfig(u LiveConfigUpdate) error {
current := e.generator.CurrentLiveParams() current := e.generator.CurrentLiveParams()
next := current // copy next := current // copy


if u.OutputDrive != nil { next.OutputDrive = *u.OutputDrive }
if u.StereoEnabled != nil { next.StereoEnabled = *u.StereoEnabled }
if u.PilotLevel != nil { next.PilotLevel = *u.PilotLevel }
if u.RDSInjection != nil { next.RDSInjection = *u.RDSInjection }
if u.RDSEnabled != nil { next.RDSEnabled = *u.RDSEnabled }
if u.LimiterEnabled != nil { next.LimiterEnabled = *u.LimiterEnabled }
if u.LimiterCeiling != nil { next.LimiterCeiling = *u.LimiterCeiling }
if u.OutputDrive != nil {
next.OutputDrive = *u.OutputDrive
}
if u.StereoEnabled != nil {
next.StereoEnabled = *u.StereoEnabled
}
if u.PilotLevel != nil {
next.PilotLevel = *u.PilotLevel
}
if u.RDSInjection != nil {
next.RDSInjection = *u.RDSInjection
}
if u.RDSEnabled != nil {
next.RDSEnabled = *u.RDSEnabled
}
if u.LimiterEnabled != nil {
next.LimiterEnabled = *u.LimiterEnabled
}
if u.LimiterCeiling != nil {
next.LimiterCeiling = *u.LimiterCeiling
}


e.generator.UpdateLive(next) e.generator.UpdateLive(next)
return nil return nil
@@ -294,8 +339,13 @@ func (e *Engine) Stats() EngineStats {
ChunksProduced: e.chunksProduced.Load(), ChunksProduced: e.chunksProduced.Load(),
TotalSamples: e.totalSamples.Load(), TotalSamples: e.totalSamples.Load(),
Underruns: e.underruns.Load(), Underruns: e.underruns.Load(),
LateBuffers: e.lateBuffers.Load(),
LastError: errVal, LastError: errVal,
UptimeSeconds: uptime, UptimeSeconds: uptime,
MaxCycleMs: durationMs(e.maxCycleNs.Load()),
MaxGenerateMs: durationMs(e.maxGenerateNs.Load()),
MaxUpsampleMs: durationMs(e.maxUpsampleNs.Load()),
MaxWriteMs: durationMs(e.maxWriteNs.Load()),
} }
} }


@@ -315,13 +365,38 @@ func (e *Engine) run(ctx context.Context) {
} }
} }


t0 := time.Now()
frame := e.generator.GenerateFrame(e.chunkDuration) frame := e.generator.GenerateFrame(e.chunkDuration)
t1 := time.Now()
if e.upsampler != nil { if e.upsampler != nil {
frame = e.upsampler.Process(frame) frame = e.upsampler.Process(frame)
} }
t2 := time.Now()
n, err := e.driver.Write(ctx, frame) n, err := e.driver.Write(ctx, frame)
t3 := time.Now()

genDur := t1.Sub(t0)
upDur := t2.Sub(t1)
writeDur := t3.Sub(t2)
cycleDur := t3.Sub(t0)

updateMaxDuration(&e.maxGenerateNs, genDur)
updateMaxDuration(&e.maxUpsampleNs, upDur)
updateMaxDuration(&e.maxWriteNs, writeDur)
updateMaxDuration(&e.maxCycleNs, cycleDur)

if cycleDur > e.chunkDuration {
late := e.lateBuffers.Add(1)
if late <= 5 || late%20 == 0 {
log.Printf("TX LATE: cycle=%s budget=%s gen=%s up=%s write=%s over=%s",
cycleDur, e.chunkDuration, genDur, upDur, writeDur, cycleDur-e.chunkDuration)
}
}

if err != nil { if err != nil {
if ctx.Err() != nil { return }
if ctx.Err() != nil {
return
}
e.lastError.Store(err.Error()) e.lastError.Store(err.Error())
e.underruns.Add(1) e.underruns.Add(1)
// Back off to avoid pegging CPU on persistent errors // Back off to avoid pegging CPU on persistent errors


+ 76
- 7
internal/platform/plutosdr/pluto_linux.go View File

@@ -23,17 +23,34 @@ import (
"github.com/jan/fm-rds-tx/internal/platform" "github.com/jan/fm-rds-tx/internal/platform"
) )


func updateMaxDuration(dst *atomic.Uint64, d time.Duration) {
v := uint64(d)
for {
cur := dst.Load()
if v <= cur {
return
}
if dst.CompareAndSwap(cur, v) {
return
}
}
}

func durationMs(ns uint64) float64 {
return float64(ns) / float64(time.Millisecond)
}

type PlutoDriver struct { type PlutoDriver struct {
mu sync.Mutex mu sync.Mutex
cfg platform.SoapyConfig cfg platform.SoapyConfig


ctx *C.struct_iio_context
txDev *C.struct_iio_device
phyDev *C.struct_iio_device
chanI *C.struct_iio_channel
chanQ *C.struct_iio_channel
chanLO *C.struct_iio_channel
buf *C.struct_iio_buffer
ctx *C.struct_iio_context
txDev *C.struct_iio_device
phyDev *C.struct_iio_device
chanI *C.struct_iio_channel
chanQ *C.struct_iio_channel
chanLO *C.struct_iio_channel
buf *C.struct_iio_buffer
bufSize int bufSize int


started bool started bool
@@ -41,6 +58,12 @@ type PlutoDriver struct {
framesWritten atomic.Uint64 framesWritten atomic.Uint64
samplesWritten atomic.Uint64 samplesWritten atomic.Uint64
underruns atomic.Uint64 underruns atomic.Uint64
slowWrites atomic.Uint64
slowFills atomic.Uint64
slowPushes atomic.Uint64
maxWriteNs atomic.Uint64
maxFillNs atomic.Uint64
maxPushNs atomic.Uint64
lastError string lastError string
lastErrorAt string lastErrorAt string
layoutLogged bool layoutLogged bool
@@ -238,6 +261,7 @@ func (d *PlutoDriver) Write(_ context.Context, frame *output.CompositeFrame) (in
} }
} }


fillStart := time.Now()
for i := 0; i < chunk; i++ { for i := 0; i < chunk; i++ {
s := frame.Samples[written+i] s := frame.Samples[written+i]
*(*int16)(unsafe.Pointer(ptrI)) = int16(s.I * 32767) *(*int16)(unsafe.Pointer(ptrI)) = int16(s.I * 32767)
@@ -245,8 +269,47 @@ func (d *PlutoDriver) Write(_ context.Context, frame *output.CompositeFrame) (in
ptrI += step ptrI += step
ptrQ += step ptrQ += step
} }
fillDur := time.Since(fillStart)


pushStart := time.Now()
pushed := int(C.iio_buffer_push(buf)) pushed := int(C.iio_buffer_push(buf))
pushDur := time.Since(pushStart)

chunkBudget := time.Duration(0)
if d.cfg.SampleRateHz > 0 {
chunkBudget = time.Duration(float64(chunk) / d.cfg.SampleRateHz * float64(time.Second))
}
writeDur := fillDur + pushDur

updateMaxDuration(&d.maxFillNs, fillDur)
updateMaxDuration(&d.maxPushNs, pushDur)
updateMaxDuration(&d.maxWriteNs, writeDur)

slow := false
if chunkBudget > 0 {
if fillDur > chunkBudget/4 {
d.slowFills.Add(1)
slow = true
}
if pushDur > chunkBudget/2 {
d.slowPushes.Add(1)
slow = true
}
if writeDur > chunkBudget {
d.slowWrites.Add(1)
slow = true
}
}
if slow {
sw := d.slowWrites.Load()
sf := d.slowFills.Load()
sp := d.slowPushes.Load()
if sw <= 5 || sf <= 5 || sp <= 5 || sw%20 == 0 || sf%20 == 0 || sp%20 == 0 {
log.Printf("pluto-linux slow write: chunk=%d budget=%s fill=%s push=%s total=%s",
chunk, chunkBudget, fillDur, pushDur, writeDur)
}
}

if pushed < 0 { if pushed < 0 {
d.mu.Lock() d.mu.Lock()
d.lastError = fmt.Sprintf("buffer_push: %d", pushed) d.lastError = fmt.Sprintf("buffer_push: %d", pushed)
@@ -299,9 +362,15 @@ func (d *PlutoDriver) Stats() platform.RuntimeStats {
FramesWritten: d.framesWritten.Load(), FramesWritten: d.framesWritten.Load(),
SamplesWritten: d.samplesWritten.Load(), SamplesWritten: d.samplesWritten.Load(),
Underruns: d.underruns.Load(), Underruns: d.underruns.Load(),
SlowWrites: d.slowWrites.Load(),
SlowFills: d.slowFills.Load(),
SlowPushes: d.slowPushes.Load(),
LastError: d.lastError, LastError: d.lastError,
LastErrorAt: d.lastErrorAt, LastErrorAt: d.lastErrorAt,
EffectiveRate: d.cfg.SampleRateHz, EffectiveRate: d.cfg.SampleRateHz,
MaxWriteMs: durationMs(d.maxWriteNs.Load()),
MaxFillMs: durationMs(d.maxFillNs.Load()),
MaxPushMs: durationMs(d.maxPushNs.Load()),
} }
} }




+ 13
- 7
internal/platform/soapy.go View File

@@ -33,9 +33,15 @@ type RuntimeStats struct {
SamplesWritten uint64 `json:"samplesWritten"` SamplesWritten uint64 `json:"samplesWritten"`
Underruns uint64 `json:"underruns"` Underruns uint64 `json:"underruns"`
Overruns uint64 `json:"overruns"` Overruns uint64 `json:"overruns"`
SlowWrites uint64 `json:"slowWrites,omitempty"`
SlowFills uint64 `json:"slowFills,omitempty"`
SlowPushes uint64 `json:"slowPushes,omitempty"`
LastError string `json:"lastError,omitempty"` LastError string `json:"lastError,omitempty"`
LastErrorAt string `json:"lastErrorAt,omitempty"` LastErrorAt string `json:"lastErrorAt,omitempty"`
EffectiveRate float64 `json:"effectiveSampleRateHz"` EffectiveRate float64 `json:"effectiveSampleRateHz"`
MaxWriteMs float64 `json:"maxWriteMs,omitempty"`
MaxFillMs float64 `json:"maxFillMs,omitempty"`
MaxPushMs float64 `json:"maxPushMs,omitempty"`
} }


// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
@@ -133,14 +139,14 @@ func (sb *SoapyBackend) Driver() SoapyDriver {
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------


type SimulatedDriver struct { type SimulatedDriver struct {
mu sync.Mutex
fallback output.Backend
cfg SoapyConfig
started bool
framesWritten atomic.Uint64
mu sync.Mutex
fallback output.Backend
cfg SoapyConfig
started bool
framesWritten atomic.Uint64
samplesWritten atomic.Uint64 samplesWritten atomic.Uint64
lastError string
lastErrorAt string
lastError string
lastErrorAt string
} }


func NewSimulatedDriver(writer output.Backend) *SimulatedDriver { func NewSimulatedDriver(writer output.Backend) *SimulatedDriver {


+ 1
- 1
stream_tx.bat View File

@@ -1,2 +1,2 @@
@echo off @echo off
ffmpeg -i "http://stream.srg-ssr.ch/m/drs3/mp3_128" -f s16le -ar 44100 -ac 2 - | fmrtx.exe --tx --tx-auto-start --audio-stdin --config docs/config.plutosdr.json
ffmpeg -i "http://svabi.ch:8443/stream" -f s16le -ar 44100 -ac 2 - | fmrtx.exe --tx --tx-auto-start --audio-stdin --config docs/config.plutosdr.json

Loading…
Cancel
Save