Przeglądaj źródła

ingest: add runtime working buffer and prebuffer gate

main
Jan 1 miesiąc temu
rodzic
commit
33b9640ef0
3 zmienionych plików z 321 dodań i 19 usunięć
  1. +205
    -12
      internal/ingest/runtime.go
  2. +108
    -1
      internal/ingest/runtime_test.go
  3. +8
    -6
      internal/ingest/stats.go

+ 205
- 12
internal/ingest/runtime.go Wyświetl plik

@@ -10,15 +10,22 @@ import (
)

type Runtime struct {
sink *audio.StreamSource
source Source
started atomic.Bool
onTitle func(string)
sink *audio.StreamSource
source Source
started atomic.Bool
onTitle func(string)
prebuffer time.Duration

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

work *frameBuffer
workSampleRate int
prebufferFrames int
gateOpen bool
seenChunk bool

mu sync.RWMutex
active SourceDescriptor
stats RuntimeStats
@@ -32,10 +39,40 @@ func WithStreamTitleHandler(handler func(string)) RuntimeOption {
}
}

func WithPrebuffer(d time.Duration) RuntimeOption {
return func(r *Runtime) {
if d < 0 {
d = 0
}
r.prebuffer = d
}
}

func WithPrebufferMs(ms int) RuntimeOption {
return func(r *Runtime) {
if ms < 0 {
ms = 0
}
r.prebuffer = time.Duration(ms) * time.Millisecond
}
}

func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime {
sampleRate := 44100
capacity := 1024
if sink != nil {
if sink.SampleRate > 0 {
sampleRate = sink.SampleRate
}
if sinkCap := sink.Stats().Capacity; sinkCap > 0 {
capacity = sinkCap * 2
}
}
r := &Runtime{
sink: sink,
source: src,
sink: sink,
source: src,
work: newFrameBuffer(capacity),
workSampleRate: sampleRate,
stats: RuntimeStats{
State: "idle",
},
@@ -45,6 +82,17 @@ func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Ru
opt(r)
}
}
if r.workSampleRate > 0 && r.prebuffer > 0 {
r.prebufferFrames = int(r.prebuffer.Seconds() * float64(r.workSampleRate))
}
minCapacity := 256
if r.prebufferFrames > 0 && minCapacity < r.prebufferFrames*2 {
minCapacity = r.prebufferFrames * 2
}
if r.work == nil || r.work.capacity() < minCapacity {
r.work = newFrameBuffer(minCapacity)
}
r.updateBufferedStatsLocked()
return r
}

@@ -69,6 +117,12 @@ func (r *Runtime) Start(ctx context.Context) error {
r.mu.Lock()
r.active = r.source.Descriptor()
r.stats.State = "starting"
r.stats.Prebuffering = false
r.stats.WriteBlocked = false
r.gateOpen = false
r.seenChunk = false
r.work.reset()
r.updateBufferedStatsLocked()
r.mu.Unlock()
if err := r.source.Start(r.ctx); err != nil {
r.started.Store(false)
@@ -102,12 +156,11 @@ func (r *Runtime) Stop() error {

func (r *Runtime) run() {
defer r.wg.Done()
r.mu.Lock()
r.stats.State = "running"
r.mu.Unlock()

ch := r.source.Chunks()
errCh := r.source.Errors()
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
var titleCh <-chan string
if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil {
titleCh = src.StreamTitleUpdates()
@@ -126,15 +179,19 @@ func (r *Runtime) run() {
}
r.mu.Lock()
r.stats.State = "degraded"
r.stats.Prebuffering = false
r.mu.Unlock()
case chunk, ok := <-ch:
if !ok {
r.mu.Lock()
r.stats.State = "stopped"
r.stats.Prebuffering = false
r.mu.Unlock()
return
}
r.handleChunk(chunk)
case <-ticker.C:
r.drainWorkingBuffer()
case title, ok := <-titleCh:
if !ok {
titleCh = nil
@@ -146,6 +203,10 @@ func (r *Runtime) run() {
}

func (r *Runtime) handleChunk(chunk PCMChunk) {
r.mu.Lock()
r.seenChunk = true
r.mu.Unlock()

frames, err := ChunkToFrames(chunk)
if err != nil {
r.mu.Lock()
@@ -156,7 +217,7 @@ func (r *Runtime) handleChunk(chunk PCMChunk) {
}
dropped := uint64(0)
for _, frame := range frames {
if !r.sink.WriteFrame(frame) {
if !r.work.push(frame) {
dropped++
}
}
@@ -167,11 +228,87 @@ func (r *Runtime) handleChunk(chunk PCMChunk) {
if chunk.Channels > 0 {
r.active.Channels = chunk.Channels
}
r.stats.State = "running"
r.stats.LastChunkAt = time.Now()
r.stats.DroppedFrames += dropped
r.stats.WriteBlocked = dropped > 0
if dropped > 0 {
r.stats.State = "degraded"
}
r.updateBufferedStatsLocked()
r.mu.Unlock()
r.drainWorkingBuffer()
}

func (r *Runtime) drainWorkingBuffer() {
r.mu.Lock()
defer r.mu.Unlock()
if r.sink == nil {
r.updateBufferedStatsLocked()
return
}
bufferedFrames := r.work.available()
if !r.gateOpen {
switch {
case bufferedFrames == 0:
if r.stats.State == "degraded" {
// Keep degraded visible until fresh audio recovers runtime.
} else if !r.seenChunk {
r.stats.State = "starting"
} else if r.stats.State != "degraded" {
r.stats.State = "running"
}
r.stats.Prebuffering = false
r.stats.WriteBlocked = false
r.updateBufferedStatsLocked()
return
case r.prebufferFrames > 0 && bufferedFrames < r.prebufferFrames:
r.stats.State = "prebuffering"
r.stats.Prebuffering = true
r.stats.WriteBlocked = false
r.updateBufferedStatsLocked()
return
default:
r.gateOpen = true
}
}
writeBlocked := false
for r.work.available() > 0 {
frame, ok := r.work.peek()
if !ok {
break
}
if !r.sink.WriteFrame(frame) {
writeBlocked = true
break
}
r.work.pop()
}
if r.work.available() == 0 && r.prebufferFrames > 0 {
// Re-arm the gate after dry-out to rebuild margin before resuming.
r.gateOpen = false
}
r.stats.Prebuffering = false
r.stats.WriteBlocked = writeBlocked
if writeBlocked {
r.stats.State = "degraded"
} else {
r.stats.State = "running"
}
r.updateBufferedStatsLocked()
}

func (r *Runtime) updateBufferedStatsLocked() {
available := r.work.available()
capacity := r.work.capacity()
buffered := 0.0
if capacity > 0 {
buffered = float64(available) / float64(capacity)
}
bufferedSeconds := 0.0
if r.workSampleRate > 0 {
bufferedSeconds = float64(available) / float64(r.workSampleRate)
}
r.stats.Buffered = buffered
r.stats.BufferedSeconds = bufferedSeconds
}

func (r *Runtime) Stats() Stats {
@@ -184,9 +321,65 @@ func (r *Runtime) Stats() Stats {
if r.source != nil {
sourceStats = r.source.Stats()
}
if sourceStats.BufferedSeconds < runtimeStats.BufferedSeconds {
sourceStats.BufferedSeconds = runtimeStats.BufferedSeconds
}
return Stats{
Active: active,
Source: sourceStats,
Runtime: runtimeStats,
}
}

type frameBuffer struct {
frames []audio.Frame
head int
len int
}

func newFrameBuffer(capacity int) *frameBuffer {
if capacity < 1 {
capacity = 1
}
return &frameBuffer{frames: make([]audio.Frame, capacity)}
}

func (b *frameBuffer) capacity() int {
return len(b.frames)
}

func (b *frameBuffer) available() int {
return b.len
}

func (b *frameBuffer) reset() {
b.head = 0
b.len = 0
}

func (b *frameBuffer) push(frame audio.Frame) bool {
if b.len >= len(b.frames) {
return false
}
idx := (b.head + b.len) % len(b.frames)
b.frames[idx] = frame
b.len++
return true
}

func (b *frameBuffer) peek() (audio.Frame, bool) {
if b.len == 0 {
return audio.Frame{}, false
}
return b.frames[b.head], true
}

func (b *frameBuffer) pop() (audio.Frame, bool) {
if b.len == 0 {
return audio.Frame{}, false
}
frame := b.frames[b.head]
b.head = (b.head + 1) % len(b.frames)
b.len--
return frame, true
}

+ 108
- 1
internal/ingest/runtime_test.go Wyświetl plik

@@ -147,7 +147,6 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T)
t.Fatalf("start: %v", err)
}
defer rt.Stop()
waitForRuntimeState(t, rt, "running")

stats := rt.Stats()
if stats.Active.ID != "icecast-primary" {
@@ -164,6 +163,94 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T)
}
}

func TestRuntimePrebufferGateAppliesBeforeSinkWrites(t *testing.T) {
sink := audio.NewStreamSource(512, 1000)
src := newFakeSource()
rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond))
if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 1000,
Samples: stereoSamples(80, 100),
}

time.Sleep(30 * time.Millisecond)
if sink.Available() != 0 {
t.Fatalf("sink available=%d want 0 while prebuffering", sink.Available())
}
stats := rt.Stats()
if stats.Runtime.State != "prebuffering" || !stats.Runtime.Prebuffering {
t.Fatalf("runtime state=%q prebuffering=%t", stats.Runtime.State, stats.Runtime.Prebuffering)
}
if stats.Runtime.BufferedSeconds <= 0 {
t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds)
}

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 1000,
Samples: stereoSamples(40, 120),
}
waitForSinkFrames(t, sink, 1)
waitForRuntimeState(t, rt, "running")
if got := rt.Stats().Runtime.Prebuffering; got {
t.Fatalf("runtime prebuffering=%t want false", got)
}
}

func TestRuntimeWriteBlockedRetainsWorkingBuffer(t *testing.T) {
sink := audio.NewStreamSource(1, 1000)
src := newFakeSource()
rt := NewRuntime(sink, src)
if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 1000,
Samples: stereoSamples(4, 200),
}
waitForRuntimeState(t, rt, "degraded")
stats := rt.Stats()
if !stats.Runtime.WriteBlocked {
t.Fatalf("runtime writeBlocked=%t want true", stats.Runtime.WriteBlocked)
}
if stats.Runtime.BufferedSeconds <= 0 {
t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds)
}
if stats.Runtime.DroppedFrames != 0 {
t.Fatalf("runtime droppedFrames=%d want 0", stats.Runtime.DroppedFrames)
}
}

func TestRuntimeStatsSourceBufferedSecondsIncludesWorkingBuffer(t *testing.T) {
sink := audio.NewStreamSource(32, 1000)
src := newFakeSource()
src.stats = SourceStats{State: "running", Connected: true, BufferedSeconds: 0}
rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond))
if err := rt.Start(context.Background()); err != nil {
t.Fatalf("start: %v", err)
}
defer rt.Stop()

src.chunks <- PCMChunk{
Channels: 2,
SampleRateHz: 1000,
Samples: stereoSamples(50, 300),
}
time.Sleep(20 * time.Millisecond)
stats := rt.Stats()
if stats.Source.BufferedSeconds <= 0 {
t.Fatalf("source bufferedSeconds=%f want > 0", stats.Source.BufferedSeconds)
}
}

func TestRuntimeUpdatesActiveDescriptorFromChunkMetadata(t *testing.T) {
sink := audio.NewStreamSource(128, 44100)
src := newFakeSource()
@@ -230,3 +317,23 @@ func waitForRuntimeState(t *testing.T, rt *Runtime, want string) {
}
t.Fatalf("timeout waiting for runtime state %q; last=%q", want, rt.Stats().Runtime.State)
}

func waitForSinkFrames(t *testing.T, sink *audio.StreamSource, minFrames int) {
t.Helper()
deadline := time.Now().Add(1 * time.Second)
for time.Now().Before(deadline) {
if sink.Available() >= minFrames {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("timeout waiting for sink frames: have=%d want>=%d", sink.Available(), minFrames)
}

func stereoSamples(frames int, v int32) []int32 {
out := make([]int32, 0, frames*2)
for i := 0; i < frames; i++ {
out = append(out, v<<16, -v<<16)
}
return out
}

+ 8
- 6
internal/ingest/stats.go Wyświetl plik

@@ -24,12 +24,14 @@ type SourceStats struct {
}

type RuntimeStats struct {
State string `json:"state"`
Prebuffering bool `json:"prebuffering"`
LastChunkAt time.Time `json:"lastChunkAt,omitempty"`
DroppedFrames uint64 `json:"droppedFrames"`
ConvertErrors uint64 `json:"convertErrors"`
WriteBlocked bool `json:"writeBlocked"`
State string `json:"state"`
Prebuffering bool `json:"prebuffering"`
Buffered float64 `json:"buffered"`
BufferedSeconds float64 `json:"bufferedSeconds"`
LastChunkAt time.Time `json:"lastChunkAt,omitempty"`
DroppedFrames uint64 `json:"droppedFrames"`
ConvertErrors uint64 `json:"convertErrors"`
WriteBlocked bool `json:"writeBlocked"`
}

type Stats struct {


Ładowanie…
Anuluj
Zapisz