Explorar el Código

Merge branch 'bugfix/icecast-write-blocked' into audio-ingest-rework

main
Jan hace 1 mes
padre
commit
bd301909ad
Se han modificado 8 ficheros con 681 adiciones y 66 borrados
  1. +1
    -0
      cmd/fmrtx/main.go
  2. +18
    -5
      internal/app/engine.go
  3. +88
    -38
      internal/audio/stream.go
  4. +3
    -4
      internal/ingest/adapters/icecast/source.go
  5. +51
    -0
      internal/ingest/adapters/icecast/source_test.go
  6. +311
    -12
      internal/ingest/runtime.go
  7. +201
    -1
      internal/ingest/runtime_test.go
  8. +8
    -6
      internal/ingest/stats.go

+ 1
- 0
cmd/fmrtx/main.go Ver fichero

@@ -193,6 +193,7 @@ func runTXMode(cfg cfgpkg.Config, configPath string, driver platform.SoapyDriver
log.Fatalf("ingest source: %v", err) log.Fatalf("ingest source: %v", err)
} }
runtimeOpts := []ingest.RuntimeOption{} runtimeOpts := []ingest.RuntimeOption{}
runtimeOpts = append(runtimeOpts, ingest.WithPrebufferMs(cfg.Ingest.PrebufferMs))
if cfg.Ingest.Icecast.RadioText.Enabled { if cfg.Ingest.Icecast.RadioText.Enabled {
relay := icecast.NewRadioTextRelay( relay := icecast.NewRadioTextRelay(
icecast.RadioTextOptions{ icecast.RadioTextOptions{


+ 18
- 5
internal/app/engine.go Ver fichero

@@ -113,13 +113,14 @@ type RuntimeTransition struct {
} }


const ( const (
lateBufferIndicatorWindow = 5 * time.Second
writeLateTolerance = 1 * time.Millisecond
lateBufferIndicatorWindow = 2 * time.Second
writeLateTolerance = 10 * time.Millisecond
queueCriticalStreakThreshold = 3 queueCriticalStreakThreshold = 3
queueMutedStreakThreshold = queueCriticalStreakThreshold * 2 queueMutedStreakThreshold = queueCriticalStreakThreshold * 2
queueMutedRecoveryThreshold = queueCriticalStreakThreshold queueMutedRecoveryThreshold = queueCriticalStreakThreshold
queueFaultedStreakThreshold = queueCriticalStreakThreshold queueFaultedStreakThreshold = queueCriticalStreakThreshold
faultRepeatWindow = 1 * time.Second faultRepeatWindow = 1 * time.Second
lateBufferStreakThreshold = 3 // consecutive late writes required before alerting
faultHistoryCapacity = 8 faultHistoryCapacity = 8
runtimeTransitionHistoryCapacity = 8 runtimeTransitionHistoryCapacity = 8
) )
@@ -150,6 +151,7 @@ type Engine struct {
underruns atomic.Uint64 underruns atomic.Uint64
lateBuffers atomic.Uint64 lateBuffers atomic.Uint64
lateBufferAlertAt atomic.Uint64 lateBufferAlertAt atomic.Uint64
lateBufferStreak atomic.Uint64 // consecutive late writes; reset on clean write
criticalStreak atomic.Uint64 criticalStreak atomic.Uint64
mutedRecoveryStreak atomic.Uint64 mutedRecoveryStreak atomic.Uint64
mutedFaultStreak atomic.Uint64 mutedFaultStreak atomic.Uint64
@@ -604,12 +606,23 @@ func (e *Engine) writerLoop(ctx context.Context) {


lateOver := writeDur - e.chunkDuration lateOver := writeDur - e.chunkDuration
if lateOver > writeLateTolerance { if lateOver > writeLateTolerance {
streak := e.lateBufferStreak.Add(1)
late := e.lateBuffers.Add(1) late := e.lateBuffers.Add(1)
e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
// Only arm the alert window once the streak threshold is reached.
// Isolated OS-scheduling or USB jitter spikes (single late writes)
// are normal on a loaded system and must not trigger degraded state.
// This mirrors the queue-health streak logic.
if streak >= lateBufferStreakThreshold {
e.lateBufferAlertAt.Store(uint64(time.Now().UnixNano()))
}
if late <= 5 || late%20 == 0 { if late <= 5 || late%20 == 0 {
log.Printf("TX LATE: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s",
writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
log.Printf("TX LATE [streak=%d]: write=%s budget=%s over=%s tolerance=%s queueResidence=%s pipeline=%s",
streak, writeDur, e.chunkDuration, lateOver, writeLateTolerance, queueResidence, pipelineLatency)
} }
} else {
// Clean write — reset the consecutive streak so isolated spikes
// never accumulate toward the threshold.
e.lateBufferStreak.Store(0)
} }


if err != nil { if err != nil {


+ 88
- 38
internal/audio/stream.go Ver fichero

@@ -12,19 +12,31 @@ import (
// goroutine reads them via NextFrame(). Returns silence on underrun. // goroutine reads them via NextFrame(). Returns silence on underrun.
// //
// Zero allocations in steady state. No mutex in the read or write path. // Zero allocations in steady state. No mutex in the read or write path.
//
// SampleRate is the nominal input sample rate. It may be updated at runtime
// via SetSampleRate once the actual decoded rate is known (e.g. when the first
// PCM chunk arrives from a compressed stream). Reads and writes to the sample
// rate are atomic so they are safe across goroutines.
type StreamSource struct { type StreamSource struct {
ring []Frame
size int
mask int // size-1, for fast modulo (size must be power of 2)
ring []Frame
size int
mask int // size-1, for fast modulo (size must be power of 2)

// SampleRate is kept as a plain int for backward compatibility with code
// that reads it before any goroutine races are possible (construction,
// logging). All hot-path code uses the atomic below.
SampleRate int SampleRate int


sampleRateAtomic atomic.Int32

writePos atomic.Int64 writePos atomic.Int64
readPos atomic.Int64 readPos atomic.Int64


Underruns atomic.Uint64 Underruns atomic.Uint64
Overflows atomic.Uint64 Overflows atomic.Uint64
Written atomic.Uint64 Written atomic.Uint64
highWatermark atomic.Int64

highWatermark atomic.Int64
underrunStreak atomic.Uint64 underrunStreak atomic.Uint64
maxUnderrunStreak atomic.Uint64 maxUnderrunStreak atomic.Uint64
} }
@@ -37,12 +49,29 @@ func NewStreamSource(capacity, sampleRate int) *StreamSource {
for size < capacity { for size < capacity {
size <<= 1 size <<= 1
} }
return &StreamSource{
s := &StreamSource{
ring: make([]Frame, size), ring: make([]Frame, size),
size: size, size: size,
mask: size - 1, mask: size - 1,
SampleRate: sampleRate, SampleRate: sampleRate,
} }
s.sampleRateAtomic.Store(int32(sampleRate))
return s
}

// SetSampleRate updates the sample rate atomically. Safe to call from any
// goroutine, including while the DSP goroutine is consuming frames via
// StreamResampler. The change takes effect on the very next NextFrame() call.
// Also updates the public SampleRate field for non-concurrent readers.
func (s *StreamSource) SetSampleRate(hz int) {
s.SampleRate = hz
s.sampleRateAtomic.Store(int32(hz))
}

// GetSampleRate returns the current sample rate via atomic load. Use this
// in hot paths / cross-goroutine reads instead of .SampleRate directly.
func (s *StreamSource) GetSampleRate() int {
return int(s.sampleRateAtomic.Load())
} }


// WriteFrame pushes a single frame into the ring buffer. // WriteFrame pushes a single frame into the ring buffer.
@@ -124,40 +153,41 @@ func (s *StreamSource) Stats() StreamStats {
currentStreak := int(s.underrunStreak.Load()) currentStreak := int(s.underrunStreak.Load())
maxStreak := int(s.maxUnderrunStreak.Load()) maxStreak := int(s.maxUnderrunStreak.Load())
return StreamStats{ return StreamStats{
Available: available,
Capacity: s.size,
Buffered: buffered,
BufferedDurationSeconds: s.bufferedDurationSeconds(available),
HighWatermark: highWatermark,
Available: available,
Capacity: s.size,
Buffered: buffered,
BufferedDurationSeconds: s.bufferedDurationSeconds(available),
HighWatermark: highWatermark,
HighWatermarkDurationSeconds: s.bufferedDurationSeconds(highWatermark), HighWatermarkDurationSeconds: s.bufferedDurationSeconds(highWatermark),
Written: s.Written.Load(),
Underruns: s.Underruns.Load(),
Overflows: s.Overflows.Load(),
UnderrunStreak: currentStreak,
MaxUnderrunStreak: maxStreak,
Written: s.Written.Load(),
Underruns: s.Underruns.Load(),
Overflows: s.Overflows.Load(),
UnderrunStreak: currentStreak,
MaxUnderrunStreak: maxStreak,
} }
} }


// StreamStats exposes runtime telemetry for the stream buffer. // StreamStats exposes runtime telemetry for the stream buffer.
type StreamStats struct { type StreamStats struct {
Available int `json:"available"`
Capacity int `json:"capacity"`
Buffered float64 `json:"buffered"`
BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"`
HighWatermark int `json:"highWatermark"`
Available int `json:"available"`
Capacity int `json:"capacity"`
Buffered float64 `json:"buffered"`
BufferedDurationSeconds float64 `json:"bufferedDurationSeconds"`
HighWatermark int `json:"highWatermark"`
HighWatermarkDurationSeconds float64 `json:"highWatermarkDurationSeconds"` HighWatermarkDurationSeconds float64 `json:"highWatermarkDurationSeconds"`
Written uint64 `json:"written"`
Underruns uint64 `json:"underruns"`
Overflows uint64 `json:"overflows"`
UnderrunStreak int `json:"underrunStreak"`
MaxUnderrunStreak int `json:"maxUnderrunStreak"`
Written uint64 `json:"written"`
Underruns uint64 `json:"underruns"`
Overflows uint64 `json:"overflows"`
UnderrunStreak int `json:"underrunStreak"`
MaxUnderrunStreak int `json:"maxUnderrunStreak"`
} }


func (s *StreamSource) bufferedDurationSeconds(available int) float64 { func (s *StreamSource) bufferedDurationSeconds(available int) float64 {
if s.SampleRate <= 0 {
rate := s.GetSampleRate()
if rate <= 0 {
return 0 return 0
} }
return float64(available) / float64(s.SampleRate)
return float64(available) / float64(rate)
} }


func (s *StreamSource) updateHighWatermark() { func (s *StreamSource) updateHighWatermark() {
@@ -195,33 +225,53 @@ func (s *StreamSource) resetUnderrunStreak() {
// StreamResampler wraps a StreamSource and rate-converts from the stream's // StreamResampler wraps a StreamSource and rate-converts from the stream's
// native sample rate to the target output rate using linear interpolation. // native sample rate to the target output rate using linear interpolation.
// Consumes input frames on demand — no buffering beyond the ring buffer. // Consumes input frames on demand — no buffering beyond the ring buffer.
//
// The input rate is read atomically from src on every NextFrame() call so
// that a SetSampleRate() from the ingest goroutine takes effect immediately,
// without any additional synchronisation. The pos accumulator is not reset
// on a rate change: this may produce a single glitch-free transient at the
// moment the rate is corrected, which is far preferable to playing the whole
// stream at the wrong pitch.
type StreamResampler struct { type StreamResampler struct {
src *StreamSource
ratio float64 // inputRate / outputRate (< 1 when upsampling)
pos float64
prev Frame
curr Frame
src *StreamSource
outputRate float64 // target composite rate, fixed for the lifetime of the resampler
pos float64
prev Frame
curr Frame
} }


// NewStreamResampler creates a streaming resampler. // NewStreamResampler creates a streaming resampler.
// outputRate is the fixed DSP composite rate. The input rate is taken from
// src.GetSampleRate() dynamically, so it will automatically track any
// subsequent SetSampleRate() call.
func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler { func NewStreamResampler(src *StreamSource, outputRate float64) *StreamResampler {
if src == nil || outputRate <= 0 || src.SampleRate <= 0 {
return &StreamResampler{src: src, ratio: 1.0}
if src == nil || outputRate <= 0 {
return &StreamResampler{src: src, outputRate: outputRate}
} }
return &StreamResampler{ return &StreamResampler{
src: src,
ratio: float64(src.SampleRate) / outputRate,
src: src,
outputRate: outputRate,
} }
} }


// NextFrame returns the next interpolated frame at the output rate. // NextFrame returns the next interpolated frame at the output rate.
// Implements the frameSource interface. // Implements the frameSource interface.
// The input/output ratio is recomputed on every call from the atomic sample
// rate so that runtime rate corrections via SetSampleRate are race-free.
func (r *StreamResampler) NextFrame() Frame { func (r *StreamResampler) NextFrame() Frame {
if r.src == nil { if r.src == nil {
return NewFrame(0, 0) return NewFrame(0, 0)
} }


// Consume input samples as the fractional position advances
// Compute ratio atomically so we see any SetSampleRate update immediately.
ratio := 1.0
if r.outputRate > 0 {
if inputRate := r.src.GetSampleRate(); inputRate > 0 {
ratio = float64(inputRate) / r.outputRate
}
}

// Consume input samples as the fractional position advances.
for r.pos >= 1.0 { for r.pos >= 1.0 {
r.prev = r.curr r.prev = r.curr
r.curr = r.src.ReadFrame() r.curr = r.src.ReadFrame()
@@ -231,7 +281,7 @@ func (r *StreamResampler) NextFrame() Frame {
frac := r.pos frac := r.pos
l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac l := float64(r.prev.L)*(1-frac) + float64(r.curr.L)*frac
ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac ri := float64(r.prev.R)*(1-frac) + float64(r.curr.R)*frac
r.pos += r.ratio
r.pos += ratio
return NewFrame(Sample(l), Sample(ri)) return NewFrame(Sample(l), Sample(ri))
} }




+ 3
- 4
internal/ingest/adapters/icecast/source.go Ver fichero

@@ -75,7 +75,9 @@ func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Op
id = "icecast-main" id = "icecast-main"
} }
if client == nil { if client == nil {
client = &http.Client{Timeout: 20 * time.Second}
// Streaming responses are long-lived; a global client timeout would
// terminate the body read after a fixed duration.
client = &http.Client{}
} }
s := &Source{ s := &Source{
id: id, id: id,
@@ -202,9 +204,6 @@ func (s *Source) loop(ctx context.Context) {
if err == nil { if err == nil {
err = errStreamEnded err = errStreamEnded
} }
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
s.connected.Store(false) s.connected.Store(false)
s.lastError.Store(err.Error()) s.lastError.Store(err.Error())
select { select {


+ 51
- 0
internal/ingest/adapters/icecast/source_test.go Ver fichero

@@ -511,6 +511,57 @@ func TestSourceClearsLastErrorAfterSuccessfulReconnect(t *testing.T) {
} }
} }


func TestNewWithoutClientUsesStreamingSafeHTTPClient(t *testing.T) {
src := New("ice-test", "http://example", nil, ReconnectConfig{})
if src.client == nil {
t.Fatal("expected default http client")
}
if src.client.Timeout != 0 {
t.Fatalf("client timeout=%v want 0 for streaming", src.client.Timeout)
}
}

func TestSourceReconnectsAfterDeadlineExceededError(t *testing.T) {
var requests atomic.Int64
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
requests.Add(1)
w.Header().Set("Content-Type", "audio/mpeg")
_, _ = w.Write([]byte("test-stream"))
}))
defer srv.Close()

dec := &scriptedLoopDecoder{
actions: []decodeAction{
{err: context.DeadlineExceeded}, // first attempt fails transiently
{blockUntilStop: true}, // second attempt recovers and stays running
},
}
reg := decoder.NewRegistry()
reg.Register("mp3", func() decoder.Decoder { return dec })
reg.Register("ffmpeg", func() decoder.Decoder { return &testDecoder{name: "ffmpeg"} })

src := New("ice-test", srv.URL, srv.Client(), ReconnectConfig{
Enabled: true,
InitialBackoffMs: 1,
MaxBackoffMs: 1,
}, WithDecoderRegistry(reg), WithDecoderPreference("auto"))

if err := src.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer src.Stop()

waitForCondition(t, func() bool { return dec.callCount() >= 2 }, "second decode call after deadline exceeded")

stats := src.Stats()
if stats.Reconnects < 1 {
t.Fatalf("reconnects=%d want >=1", stats.Reconnects)
}
if got := requests.Load(); got < 2 {
t.Fatalf("requests=%d want >=2", got)
}
}

func waitForCondition(t *testing.T, cond func() bool, label string) { func waitForCondition(t *testing.T, cond func() bool, label string) {
t.Helper() t.Helper()
deadline := time.Now().Add(2 * time.Second) deadline := time.Now().Add(2 * time.Second)


+ 311
- 12
internal/ingest/runtime.go Ver fichero

@@ -10,15 +10,24 @@ import (
) )


type Runtime struct { type Runtime struct {
sink *audio.StreamSource
source Source
started atomic.Bool
onTitle func(string)
sink *audio.StreamSource
source Source
started atomic.Bool
onTitle func(string)
prebuffer time.Duration


ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup


work *frameBuffer
workSampleRate int
prebufferFrames int
gateOpen bool
seenChunk bool
lastDrainAt time.Time
drainAllowance float64

mu sync.RWMutex mu sync.RWMutex
active SourceDescriptor active SourceDescriptor
stats RuntimeStats stats RuntimeStats
@@ -32,10 +41,40 @@ func WithStreamTitleHandler(handler func(string)) RuntimeOption {
} }
} }


func WithPrebuffer(d time.Duration) RuntimeOption {
return func(r *Runtime) {
if d < 0 {
d = 0
}
r.prebuffer = d
}
}

func WithPrebufferMs(ms int) RuntimeOption {
return func(r *Runtime) {
if ms < 0 {
ms = 0
}
r.prebuffer = time.Duration(ms) * time.Millisecond
}
}

func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime { func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime {
sampleRate := 44100
capacity := 1024
if sink != nil {
if sink.SampleRate > 0 {
sampleRate = sink.SampleRate
}
if sinkCap := sink.Stats().Capacity; sinkCap > 0 {
capacity = sinkCap * 2
}
}
r := &Runtime{ r := &Runtime{
sink: sink,
source: src,
sink: sink,
source: src,
work: newFrameBuffer(capacity),
workSampleRate: sampleRate,
stats: RuntimeStats{ stats: RuntimeStats{
State: "idle", State: "idle",
}, },
@@ -45,6 +84,17 @@ func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Ru
opt(r) opt(r)
} }
} }
if r.workSampleRate > 0 && r.prebuffer > 0 {
r.prebufferFrames = int(r.prebuffer.Seconds() * float64(r.workSampleRate))
}
minCapacity := 256
if r.prebufferFrames > 0 && minCapacity < r.prebufferFrames*2 {
minCapacity = r.prebufferFrames * 2
}
if r.work == nil || r.work.capacity() < minCapacity {
r.work = newFrameBuffer(minCapacity)
}
r.updateBufferedStatsLocked()
return r return r
} }


@@ -69,6 +119,14 @@ func (r *Runtime) Start(ctx context.Context) error {
r.mu.Lock() r.mu.Lock()
r.active = r.source.Descriptor() r.active = r.source.Descriptor()
r.stats.State = "starting" r.stats.State = "starting"
r.stats.Prebuffering = false
r.stats.WriteBlocked = false
r.gateOpen = false
r.seenChunk = false
r.lastDrainAt = time.Now()
r.drainAllowance = 0
r.work.reset()
r.updateBufferedStatsLocked()
r.mu.Unlock() r.mu.Unlock()
if err := r.source.Start(r.ctx); err != nil { if err := r.source.Start(r.ctx); err != nil {
r.started.Store(false) r.started.Store(false)
@@ -102,12 +160,11 @@ func (r *Runtime) Stop() error {


func (r *Runtime) run() { func (r *Runtime) run() {
defer r.wg.Done() defer r.wg.Done()
r.mu.Lock()
r.stats.State = "running"
r.mu.Unlock()


ch := r.source.Chunks() ch := r.source.Chunks()
errCh := r.source.Errors() errCh := r.source.Errors()
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
var titleCh <-chan string var titleCh <-chan string
if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil { if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil {
titleCh = src.StreamTitleUpdates() titleCh = src.StreamTitleUpdates()
@@ -126,15 +183,19 @@ func (r *Runtime) run() {
} }
r.mu.Lock() r.mu.Lock()
r.stats.State = "degraded" r.stats.State = "degraded"
r.stats.Prebuffering = false
r.mu.Unlock() r.mu.Unlock()
case chunk, ok := <-ch: case chunk, ok := <-ch:
if !ok { if !ok {
r.mu.Lock() r.mu.Lock()
r.stats.State = "stopped" r.stats.State = "stopped"
r.stats.Prebuffering = false
r.mu.Unlock() r.mu.Unlock()
return return
} }
r.handleChunk(chunk) r.handleChunk(chunk)
case <-ticker.C:
r.drainWorkingBuffer()
case title, ok := <-titleCh: case title, ok := <-titleCh:
if !ok { if !ok {
titleCh = nil titleCh = nil
@@ -146,6 +207,32 @@ func (r *Runtime) run() {
} }


func (r *Runtime) handleChunk(chunk PCMChunk) { func (r *Runtime) handleChunk(chunk PCMChunk) {
r.mu.Lock()
r.seenChunk = true

// Propagate the actual decoded sample rate to the sink and pacer the
// first time (or whenever) it differs from our working rate. This fixes
// the two-part rate-mismatch bug that appears when a native decoder
// (e.g. go-mp3) decodes a 48000 Hz stream while the StreamSource and
// StreamResampler were initialised assuming 44100 Hz:
//
// 1. The pacer (pacedDrainLimitLocked) was draining at the wrong rate,
// causing the work buffer to overflow → glitches.
// 2. The StreamResampler ratio (inputRate/outputRate) was computed from
// the stale sink.SampleRate, so every frame was played at the wrong
// pitch → audio too slow (44100/48000 ≈ 91.9 % speed).
//
// SetSampleRate writes atomically, so the StreamResampler's NextFrame()
// picks up the corrected ratio without any additional locking.
if chunk.SampleRateHz > 0 && chunk.SampleRateHz != r.workSampleRate {
r.workSampleRate = chunk.SampleRateHz
if r.sink != nil {
r.sink.SetSampleRate(chunk.SampleRateHz)
}
}

r.mu.Unlock()

frames, err := ChunkToFrames(chunk) frames, err := ChunkToFrames(chunk)
if err != nil { if err != nil {
r.mu.Lock() r.mu.Lock()
@@ -156,16 +243,172 @@ func (r *Runtime) handleChunk(chunk PCMChunk) {
} }
dropped := uint64(0) dropped := uint64(0)
for _, frame := range frames { for _, frame := range frames {
if !r.sink.WriteFrame(frame) {
if !r.work.push(frame) {
dropped++ dropped++
} }
} }
r.mu.Lock() r.mu.Lock()
r.stats.State = "running"
if chunk.SampleRateHz > 0 {
r.active.SampleRateHz = chunk.SampleRateHz
}
if chunk.Channels > 0 {
r.active.Channels = chunk.Channels
}
r.stats.LastChunkAt = time.Now() r.stats.LastChunkAt = time.Now()
r.stats.DroppedFrames += dropped r.stats.DroppedFrames += dropped
r.stats.WriteBlocked = dropped > 0
if dropped > 0 {
r.stats.State = "degraded"
}
r.updateBufferedStatsLocked()
r.mu.Unlock() r.mu.Unlock()
r.drainWorkingBuffer()
}

func (r *Runtime) drainWorkingBuffer() {
r.mu.Lock()
defer r.mu.Unlock()
now := time.Now()
if r.sink == nil {
r.resetDrainPacerLocked(now)
r.updateBufferedStatsLocked()
return
}
bufferedFrames := r.work.available()
if !r.gateOpen {
switch {
case bufferedFrames == 0:
if r.stats.State == "degraded" {
// Keep degraded visible until fresh audio recovers runtime.
} else if !r.seenChunk {
r.stats.State = "starting"
} else if r.stats.State != "degraded" {
r.stats.State = "running"
}
r.stats.Prebuffering = false
r.stats.WriteBlocked = false
r.resetDrainPacerLocked(now)
r.updateBufferedStatsLocked()
return
case r.prebufferFrames > 0 && bufferedFrames < r.prebufferFrames:
r.stats.State = "prebuffering"
r.stats.Prebuffering = true
r.stats.WriteBlocked = false
r.resetDrainPacerLocked(now)
r.updateBufferedStatsLocked()
return
default:
r.gateOpen = true
r.resetDrainPacerLocked(now)
}
}
writeBlocked := false
limit := r.pacedDrainLimitLocked(now, bufferedFrames)
written := 0
for written < limit && r.work.available() > 0 {
frame, ok := r.work.peek()
if !ok {
break
}
if !r.sink.WriteFrame(frame) {
writeBlocked = true
break
}
r.work.pop()
written++
}
if written > 0 {
r.drainAllowance -= float64(written)
if r.drainAllowance < 0 {
r.drainAllowance = 0
}
}
if r.work.available() == 0 && r.prebufferFrames > 0 {
// Re-arm the gate after dry-out to rebuild margin before resuming.
r.gateOpen = false
r.resetDrainPacerLocked(now)
}
r.stats.Prebuffering = false
r.stats.WriteBlocked = writeBlocked
if writeBlocked {
r.stats.State = "degraded"
} else {
r.stats.State = "running"
}
r.updateBufferedStatsLocked()
}

func (r *Runtime) pacedDrainLimitLocked(now time.Time, bufferedFrames int) int {
if bufferedFrames <= 0 {
return 0
}
// Use workSampleRate which is kept in sync with sink.SampleRate via
// handleChunk. This ensures the pacer drains at the actual decoded rate
// rather than the initial (potentially wrong) configured rate.
rate := r.workSampleRate
if r.sink != nil && r.sink.GetSampleRate() > 0 {
rate = r.sink.GetSampleRate()
}
if rate <= 0 {
return bufferedFrames
}
if !r.lastDrainAt.IsZero() {
elapsed := now.Sub(r.lastDrainAt)
if elapsed > 0 {
r.drainAllowance += elapsed.Seconds() * float64(rate)
}
}
r.lastDrainAt = now
maxAllowance := maxInt(1, rate/5) // cap accumulated credit at 200 ms
if r.drainAllowance > float64(maxAllowance) {
r.drainAllowance = float64(maxAllowance)
}
limit := int(r.drainAllowance)
if limit <= 0 {
return 0
}
maxBurst := maxInt(1, rate/50) // max 20 ms worth of frames per drain call
if limit > maxBurst {
limit = maxBurst
}
sinkStats := r.sink.Stats()
headroom := sinkStats.Capacity - sinkStats.Available
if headroom < 0 {
headroom = 0
}
if limit > headroom {
limit = headroom
}
if limit > bufferedFrames {
limit = bufferedFrames
}
return limit
}

func (r *Runtime) resetDrainPacerLocked(now time.Time) {
r.lastDrainAt = now
r.drainAllowance = 0
}

func maxInt(a, b int) int {
if a > b {
return a
}
return b
}

func (r *Runtime) updateBufferedStatsLocked() {
available := r.work.available()
capacity := r.work.capacity()
buffered := 0.0
if capacity > 0 {
buffered = float64(available) / float64(capacity)
}
bufferedSeconds := 0.0
if r.workSampleRate > 0 {
bufferedSeconds = float64(available) / float64(r.workSampleRate)
}
r.stats.Buffered = buffered
r.stats.BufferedSeconds = bufferedSeconds
} }


func (r *Runtime) Stats() Stats { func (r *Runtime) Stats() Stats {
@@ -178,9 +421,65 @@ func (r *Runtime) Stats() Stats {
if r.source != nil { if r.source != nil {
sourceStats = r.source.Stats() sourceStats = r.source.Stats()
} }
if sourceStats.BufferedSeconds < runtimeStats.BufferedSeconds {
sourceStats.BufferedSeconds = runtimeStats.BufferedSeconds
}
return Stats{ return Stats{
Active: active, Active: active,
Source: sourceStats, Source: sourceStats,
Runtime: runtimeStats, Runtime: runtimeStats,
} }
} }

type frameBuffer struct {
frames []audio.Frame
head int
len int
}

func newFrameBuffer(capacity int) *frameBuffer {
if capacity < 1 {
capacity = 1
}
return &frameBuffer{frames: make([]audio.Frame, capacity)}
}

func (b *frameBuffer) capacity() int {
return len(b.frames)
}

func (b *frameBuffer) available() int {
return b.len
}

func (b *frameBuffer) reset() {
b.head = 0
b.len = 0
}

func (b *frameBuffer) push(frame audio.Frame) bool {
if b.len >= len(b.frames) {
return false
}
idx := (b.head + b.len) % len(b.frames)
b.frames[idx] = frame
b.len++
return true
}

func (b *frameBuffer) peek() (audio.Frame, bool) {
if b.len == 0 {
return audio.Frame{}, false
}
return b.frames[b.head], true
}

func (b *frameBuffer) pop() (audio.Frame, bool) {
if b.len == 0 {
return audio.Frame{}, false
}
frame := b.frames[b.head]
b.head = (b.head + 1) % len(b.frames)
b.len--
return frame, true
}

+ 201
- 1
internal/ingest/runtime_test.go Ver fichero

@@ -147,7 +147,6 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T)
t.Fatalf("start: %v", err) t.Fatalf("start: %v", err)
} }
defer rt.Stop() defer rt.Stop()
waitForRuntimeState(t, rt, "running")


stats := rt.Stats() stats := rt.Stats()
if stats.Active.ID != "icecast-primary" { if stats.Active.ID != "icecast-primary" {
@@ -164,6 +163,187 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T)
} }
} }


func TestRuntimePrebufferGateAppliesBeforeSinkWrites(t *testing.T) {
sink := audio.NewStreamSource(512, 1000)
src := newFakeSource()
rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond))
if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 1000,
Samples: stereoSamples(80, 100),
}

time.Sleep(30 * time.Millisecond)
if sink.Available() != 0 {
t.Fatalf("sink available=%d want 0 while prebuffering", sink.Available())
}
stats := rt.Stats()
if stats.Runtime.State != "prebuffering" || !stats.Runtime.Prebuffering {
t.Fatalf("runtime state=%q prebuffering=%t", stats.Runtime.State, stats.Runtime.Prebuffering)
}
if stats.Runtime.BufferedSeconds <= 0 {
t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds)
}

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 1000,
Samples: stereoSamples(40, 120),
}
waitForSinkFrames(t, sink, 1)
waitForRuntimeState(t, rt, "running")
if got := rt.Stats().Runtime.Prebuffering; got {
t.Fatalf("runtime prebuffering=%t want false", got)
}
}

func TestRuntimeWriteBlockedRetainsWorkingBuffer(t *testing.T) {
sink := audio.NewStreamSource(1, 1000)
src := newFakeSource()
rt := NewRuntime(sink, src)
if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 1000,
Samples: stereoSamples(4, 200),
}
waitForSinkFrames(t, sink, 1)
waitForRuntimeState(t, rt, "running")
stats := rt.Stats()
if stats.Runtime.WriteBlocked {
t.Fatalf("runtime writeBlocked=%t want false", stats.Runtime.WriteBlocked)
}
if stats.Runtime.BufferedSeconds <= 0 {
t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds)
}
if stats.Runtime.DroppedFrames != 0 {
t.Fatalf("runtime droppedFrames=%d want 0", stats.Runtime.DroppedFrames)
}
if got := sink.Stats().Overflows; got != 0 {
t.Fatalf("sink overflows=%d want 0", got)
}
}

func TestRuntimeDrainWorkingBufferIsBurstBounded(t *testing.T) {
sink := audio.NewStreamSource(64, 1000)
rt := NewRuntime(sink, nil)

rt.gateOpen = true
for i := 0; i < 40; i++ {
if !rt.work.push(audio.NewFrame(0.1, -0.1)) {
t.Fatalf("failed to seed work frame %d", i)
}
}
rt.lastDrainAt = time.Now().Add(-time.Second)

rt.drainWorkingBuffer()

if got := sink.Available(); got != 20 {
t.Fatalf("sink available=%d want 20 (20ms burst at 1kHz)", got)
}
if got := rt.work.available(); got != 20 {
t.Fatalf("work available=%d want 20", got)
}
if got := rt.Stats().Runtime.WriteBlocked; got {
t.Fatalf("runtime writeBlocked=%t want false", got)
}
}

func TestRuntimeDrainWorkingBufferHonorsSinkHeadroom(t *testing.T) {
sink := audio.NewStreamSource(64, 1000)
rt := NewRuntime(sink, nil)

for i := 0; i < 63; i++ {
if !sink.WriteFrame(audio.NewFrame(0.2, -0.2)) {
t.Fatalf("failed to seed sink frame %d", i)
}
}
rt.gateOpen = true
for i := 0; i < 8; i++ {
if !rt.work.push(audio.NewFrame(0.3, -0.3)) {
t.Fatalf("failed to seed work frame %d", i)
}
}
rt.lastDrainAt = time.Now().Add(-time.Second)

rt.drainWorkingBuffer()

if got := sink.Available(); got != 64 {
t.Fatalf("sink available=%d want 64", got)
}
if got := rt.work.available(); got != 7 {
t.Fatalf("work available=%d want 7", got)
}
if got := sink.Stats().Overflows; got != 0 {
t.Fatalf("sink overflows=%d want 0", got)
}
if got := rt.Stats().Runtime.WriteBlocked; got {
t.Fatalf("runtime writeBlocked=%t want false", got)
}
}

func TestRuntimeStatsSourceBufferedSecondsIncludesWorkingBuffer(t *testing.T) {
sink := audio.NewStreamSource(32, 1000)
src := newFakeSource()
src.stats = SourceStats{State: "running", Connected: true, BufferedSeconds: 0}
rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond))
if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 1000,
Samples: stereoSamples(50, 300),
}
time.Sleep(20 * time.Millisecond)
stats := rt.Stats()
if stats.Source.BufferedSeconds <= 0 {
t.Fatalf("source bufferedSeconds=%f want > 0", stats.Source.BufferedSeconds)
}
}

func TestRuntimeUpdatesActiveDescriptorFromChunkMetadata(t *testing.T) {
sink := audio.NewStreamSource(128, 44100)
src := newFakeSource()
src.desc = SourceDescriptor{
ID: "icecast-primary",
Kind: "icecast",
Channels: 0,
SampleRateHz: 0,
}
rt := NewRuntime(sink, src)
if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 48000,
Samples: []int32{100 << 16, -100 << 16},
}

waitForRuntimeState(t, rt, "running")
stats := rt.Stats()
if stats.Active.SampleRateHz != 48000 {
t.Fatalf("active sampleRateHz=%d want 48000", stats.Active.SampleRateHz)
}
if stats.Active.Channels != 2 {
t.Fatalf("active channels=%d want 2", stats.Active.Channels)
}
}

func TestRuntimeForwardsStreamTitleUpdatesToHandler(t *testing.T) { func TestRuntimeForwardsStreamTitleUpdatesToHandler(t *testing.T) {
sink := audio.NewStreamSource(128, 44100) sink := audio.NewStreamSource(128, 44100)
src := newFakeSource() src := newFakeSource()
@@ -199,3 +379,23 @@ func waitForRuntimeState(t *testing.T, rt *Runtime, want string) {
} }
t.Fatalf("timeout waiting for runtime state %q; last=%q", want, rt.Stats().Runtime.State) t.Fatalf("timeout waiting for runtime state %q; last=%q", want, rt.Stats().Runtime.State)
} }

func waitForSinkFrames(t *testing.T, sink *audio.StreamSource, minFrames int) {
t.Helper()
deadline := time.Now().Add(1 * time.Second)
for time.Now().Before(deadline) {
if sink.Available() >= minFrames {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("timeout waiting for sink frames: have=%d want>=%d", sink.Available(), minFrames)
}

func stereoSamples(frames int, v int32) []int32 {
out := make([]int32, 0, frames*2)
for i := 0; i < frames; i++ {
out = append(out, v<<16, -v<<16)
}
return out
}

+ 8
- 6
internal/ingest/stats.go Ver fichero

@@ -24,12 +24,14 @@ type SourceStats struct {
} }


type RuntimeStats struct { type RuntimeStats struct {
State string `json:"state"`
Prebuffering bool `json:"prebuffering"`
LastChunkAt time.Time `json:"lastChunkAt,omitempty"`
DroppedFrames uint64 `json:"droppedFrames"`
ConvertErrors uint64 `json:"convertErrors"`
WriteBlocked bool `json:"writeBlocked"`
State string `json:"state"`
Prebuffering bool `json:"prebuffering"`
Buffered float64 `json:"buffered"`
BufferedSeconds float64 `json:"bufferedSeconds"`
LastChunkAt time.Time `json:"lastChunkAt,omitempty"`
DroppedFrames uint64 `json:"droppedFrames"`
ConvertErrors uint64 `json:"convertErrors"`
WriteBlocked bool `json:"writeBlocked"`
} }


type Stats struct { type Stats struct {


Cargando…
Cancelar
Guardar