package ingest

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/jan/fm-rds-tx/internal/audio"
)

// Runtime pulls PCM chunks from a Source, converts them to frames, stages
// them in an internal ring buffer and writes them to an audio.StreamSource
// sink at a paced rate. Construct it with NewRuntime, then call Start and
// Stop.
type Runtime struct {
	// Endpoints and callbacks. started is flipped with compare-and-swap so
	// that repeated Start or Stop calls are no-ops.
	sink    *audio.StreamSource
	source  Source
	started atomic.Bool
	onTitle func(string)

	// prebuffer is the configured prebuffer duration; NewRuntime converts it
	// to prebufferFrames.
	prebuffer time.Duration

	// Lifetime of the run goroutine; ctx and cancel are set by Start.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	// Staging buffer and drain-pacing state. gateOpen is false while the
	// buffer is (re)filling to prebufferFrames; drainAllowance is the number
	// of frames the pacer currently permits writing to the sink.
	work            *frameBuffer
	workSampleRate  int
	prebufferFrames int
	gateOpen        bool
	seenChunk       bool
	lastDrainAt     time.Time
	drainAllowance  float64

	// mu guards active and stats, which Stats reads under a read lock.
	mu     sync.RWMutex
	active SourceDescriptor
	stats  RuntimeStats
}

// RuntimeOption customizes a Runtime while NewRuntime is building it.
type RuntimeOption func(*Runtime)

// WithStreamTitleHandler sets the callback that run invokes with every title
// received from a source implementing StreamTitleSource.
func WithStreamTitleHandler(handler func(string)) RuntimeOption {
	return func(r *Runtime) {
		r.onTitle = handler
	}
}

// WithPrebuffer sets the prebuffer duration. Negative values are treated as
// zero.
func WithPrebuffer(d time.Duration) RuntimeOption {
	return func(r *Runtime) {
		if d < 0 {
			d = 0
		}
		r.prebuffer = d
	}
}

// WithPrebufferMs sets the prebuffer duration in milliseconds. Negative
// values are treated as zero.
func WithPrebufferMs(ms int) RuntimeOption {
	return func(r *Runtime) {
		if ms < 0 {
			ms = 0
		}
		r.prebuffer = time.Duration(ms) * time.Millisecond
	}
}

// NewRuntime builds a Runtime that feeds sink from src.
//
// The working sample rate defaults to 44100 and the staging buffer to 1024
// frames; a non-nil sink overrides them with its SampleRate and twice its
// reported capacity when those are positive. After the options run, the
// staging buffer is replaced if it is nil or holds fewer than
// max(256, 2*prebufferFrames) frames. The initial state is "idle".
func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime {
	sampleRate := 44100
	capacity := 1024
	if sink != nil {
		if sink.SampleRate > 0 {
			sampleRate = sink.SampleRate
		}
		if sinkCap := sink.Stats().Capacity; sinkCap > 0 {
			capacity = sinkCap * 2
		}
	}

	r := &Runtime{
		sink:           sink,
		source:         src,
		work:           newFrameBuffer(capacity),
		workSampleRate: sampleRate,
		stats:          RuntimeStats{State: "idle"},
	}
	for _, opt := range opts {
		if opt != nil {
			opt(r)
		}
	}

	if r.workSampleRate > 0 && r.prebuffer > 0 {
		r.prebufferFrames = int(r.prebuffer.Seconds() * float64(r.workSampleRate))
	}

	// The staging buffer must hold at least twice the prebuffer target.
	minCapacity := 256
	if r.prebufferFrames > 0 && minCapacity < r.prebufferFrames*2 {
		minCapacity = r.prebufferFrames * 2
	}
	if r.work == nil || r.work.capacity() < minCapacity {
		r.work = newFrameBuffer(minCapacity)
	}

	r.updateBufferedStatsLocked()
	return r
}

// Start starts the source and launches the goroutine that moves audio into
// the sink.
//
// It returns nil without starting anything when the sink is nil (state
// becomes "failed"), when the source is nil (state becomes "idle"), or when
// the runtime is already started. If the source fails to start, the state
// becomes "failed", the derived context is cancelled and the source's error
// is returned.
func (r *Runtime) Start(ctx context.Context) error {
	if r.sink == nil {
		r.mu.Lock()
		r.stats.State = "failed"
		r.mu.Unlock()
		return nil
	}
	if r.source == nil {
		r.mu.Lock()
		r.stats.State = "idle"
		r.mu.Unlock()
		return nil
	}
	if !r.started.CompareAndSwap(false, true) {
		return nil
	}

	runCtx, cancel := context.WithCancel(ctx)
	r.ctx, r.cancel = runCtx, cancel

	r.mu.Lock()
	r.active = r.source.Descriptor()
	r.stats.State = "starting"
	r.stats.Prebuffering = false
	r.stats.WriteBlocked = false
	r.gateOpen = false
	r.seenChunk = false
	r.lastDrainAt = time.Now()
	r.drainAllowance = 0
	r.work.reset()
	r.updateBufferedStatsLocked()
	r.mu.Unlock()

	if err := r.source.Start(runCtx); err != nil {
		// Release the derived context; nothing will ever call Stop for this
		// attempt because started is reset below.
		cancel()
		r.started.Store(false)
		r.mu.Lock()
		r.stats.State = "failed"
		r.mu.Unlock()
		return err
	}

	r.wg.Add(1)
	go r.run()
	return nil
}

// Stop cancels the runtime context, stops the source, waits for the run
// goroutine to exit and sets the state to "stopped". It is a no-op when the
// runtime is not started and always returns nil.
func (r *Runtime) Stop() error {
	if !r.started.CompareAndSwap(true, false) {
		return nil
	}
	if r.cancel != nil {
		r.cancel()
	}
	if r.source != nil {
		// Best effort: the source's stop error is deliberately dropped so
		// that shutdown always runs to completion.
		_ = r.source.Stop()
	}
	r.wg.Wait()

	r.mu.Lock()
	r.stats.State = "stopped"
	r.mu.Unlock()
	return nil
}

// run is the runtime's goroutine body. It multiplexes context cancellation,
// source errors, incoming chunks, a 10 ms drain ticker and stream-title
// updates until the context is cancelled or the chunk channel is closed.
func (r *Runtime) run() {
	defer r.wg.Done()

	ch := r.source.Chunks()
	errCh := r.source.Errors()

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	// titleCh stays nil (and so never fires) unless the source offers titles
	// and a handler was configured.
	var titleCh <-chan string
	if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil {
		titleCh = src.StreamTitleUpdates()
	}

	for {
		select {
		case <-r.ctx.Done():
			return

		case err, ok := <-errCh:
			if !ok {
				// Closed: disable this case instead of spinning on it.
				errCh = nil
				continue
			}
			if err == nil {
				continue
			}
			r.mu.Lock()
			r.stats.State = "degraded"
			r.stats.Prebuffering = false
			r.mu.Unlock()

		case chunk, ok := <-ch:
			if !ok {
				r.mu.Lock()
				r.stats.State = "stopped"
				r.stats.Prebuffering = false
				r.mu.Unlock()
				return
			}
			r.handleChunk(chunk)

		case <-ticker.C:
			r.drainWorkingBuffer()

		case title, ok := <-titleCh:
			if !ok {
				titleCh = nil
				continue
			}
			r.onTitle(title)
		}
	}
}

// handleChunk converts chunk to frames, appends them to the staging buffer
// and triggers a drain. Frames that do not fit are counted in DroppedFrames
// and mark the runtime "degraded"; a conversion failure increments
// ConvertErrors, marks the runtime "degraded" and discards the chunk.
func (r *Runtime) handleChunk(chunk PCMChunk) {
	r.mu.Lock()
	r.seenChunk = true
	r.mu.Unlock()

	frames, err := ChunkToFrames(chunk)
	if err != nil {
		r.mu.Lock()
		r.stats.ConvertErrors++
		r.stats.State = "degraded"
		r.mu.Unlock()
		return
	}

	dropped := uint64(0)
	for _, frame := range frames {
		if !r.work.push(frame) {
			dropped++
		}
	}

	r.mu.Lock()
	if chunk.SampleRateHz > 0 {
		r.active.SampleRateHz = chunk.SampleRateHz
	}
	if chunk.Channels > 0 {
		r.active.Channels = chunk.Channels
	}
	r.stats.LastChunkAt = time.Now()
	r.stats.DroppedFrames += dropped
	if dropped > 0 {
		r.stats.State = "degraded"
	}
	r.updateBufferedStatsLocked()
	r.mu.Unlock()

	r.drainWorkingBuffer()
}

// drainWorkingBuffer moves frames from the staging buffer to the sink.
//
// While the gate is closed nothing is written: an empty buffer reports
// "starting" or "running" (leaving "degraded" untouched), and a buffer below
// the prebuffer target reports "prebuffering". Once the gate opens, up to
// pacedDrainLimitLocked frames are written per call; a refused write stops
// the loop and marks the runtime "degraded" with WriteBlocked set.
func (r *Runtime) drainWorkingBuffer() {
	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()
	if r.sink == nil {
		r.resetDrainPacerLocked(now)
		r.updateBufferedStatsLocked()
		return
	}

	bufferedFrames := r.work.available()
	if !r.gateOpen {
		switch {
		case bufferedFrames == 0:
			// Keep degraded visible until fresh audio recovers runtime.
			if r.stats.State != "degraded" {
				if r.seenChunk {
					r.stats.State = "running"
				} else {
					r.stats.State = "starting"
				}
			}
			r.stats.Prebuffering = false
			r.stats.WriteBlocked = false
			r.resetDrainPacerLocked(now)
			r.updateBufferedStatsLocked()
			return

		case r.prebufferFrames > 0 && bufferedFrames < r.prebufferFrames:
			r.stats.State = "prebuffering"
			r.stats.Prebuffering = true
			r.stats.WriteBlocked = false
			r.resetDrainPacerLocked(now)
			r.updateBufferedStatsLocked()
			return

		default:
			// Enough audio is staged: open the gate and start pacing from now
			// so time spent waiting does not count as drain credit.
			r.gateOpen = true
			r.resetDrainPacerLocked(now)
		}
	}

	writeBlocked := false
	limit := r.pacedDrainLimitLocked(now, bufferedFrames)
	written := 0
	for written < limit && r.work.available() > 0 {
		// Peek first so a frame the sink refuses stays queued.
		frame, ok := r.work.peek()
		if !ok {
			break
		}
		if !r.sink.WriteFrame(frame) {
			writeBlocked = true
			break
		}
		r.work.pop()
		written++
	}

	if written > 0 {
		r.drainAllowance -= float64(written)
		if r.drainAllowance < 0 {
			r.drainAllowance = 0
		}
	}

	if r.work.available() == 0 && r.prebufferFrames > 0 {
		// Re-arm the gate after dry-out to rebuild margin before resuming.
		r.gateOpen = false
		r.resetDrainPacerLocked(now)
	}

	r.stats.Prebuffering = false
	r.stats.WriteBlocked = writeBlocked
	if writeBlocked {
		r.stats.State = "degraded"
	} else {
		r.stats.State = "running"
	}
	r.updateBufferedStatsLocked()
}

// pacedDrainLimitLocked returns how many frames may be written to the sink
// right now. The caller must hold r.mu.
//
// Credit accrues at the sink's sample rate (falling back to the working
// sample rate) for the time elapsed since the previous call. The result is
// the whole part of that credit, clamped by the per-call burst limit, the
// sink's headroom and bufferedFrames. With no positive rate available the
// pacer is bypassed and bufferedFrames is returned.
func (r *Runtime) pacedDrainLimitLocked(now time.Time, bufferedFrames int) int {
	if bufferedFrames <= 0 {
		return 0
	}

	rate := r.workSampleRate
	if r.sink != nil && r.sink.SampleRate > 0 {
		rate = r.sink.SampleRate
	}
	if rate <= 0 {
		return bufferedFrames
	}

	if !r.lastDrainAt.IsZero() {
		elapsed := now.Sub(r.lastDrainAt)
		if elapsed > 0 {
			r.drainAllowance += elapsed.Seconds() * float64(rate)
		}
	}
	r.lastDrainAt = now

	// Cap accumulated credit at 200 ms worth of frames.
	maxAllowance := maxInt(1, rate/5)
	if r.drainAllowance > float64(maxAllowance) {
		r.drainAllowance = float64(maxAllowance)
	}

	limit := int(r.drainAllowance)
	if limit <= 0 {
		return 0
	}

	// At most 20 ms worth of frames per drain call.
	maxBurst := maxInt(1, rate/50)
	if limit > maxBurst {
		limit = maxBurst
	}

	// Respect the sink's headroom, computed as Capacity minus Available.
	// Guarded so a nil sink is handled the same way as in the rate lookup
	// above instead of being dereferenced.
	if r.sink != nil {
		sinkStats := r.sink.Stats()
		headroom := sinkStats.Capacity - sinkStats.Available
		if headroom < 0 {
			headroom = 0
		}
		if limit > headroom {
			limit = headroom
		}
	}

	if limit > bufferedFrames {
		limit = bufferedFrames
	}
	return limit
}

// resetDrainPacerLocked discards accumulated drain credit and restarts the
// pacing clock at now. The caller must hold r.mu.
func (r *Runtime) resetDrainPacerLocked(now time.Time) {
	r.lastDrainAt = now
	r.drainAllowance = 0
}

// maxInt returns the larger of a and b.
func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// updateBufferedStatsLocked refreshes stats.Buffered (staging-buffer fill as
// a fraction of its capacity) and stats.BufferedSeconds (buffered frames
// divided by the working sample rate). The caller must hold r.mu, except
// during construction.
func (r *Runtime) updateBufferedStatsLocked() {
	available := r.work.available()
	capacity := r.work.capacity()

	buffered := 0.0
	if capacity > 0 {
		buffered = float64(available) / float64(capacity)
	}

	bufferedSeconds := 0.0
	if r.workSampleRate > 0 {
		bufferedSeconds = float64(available) / float64(r.workSampleRate)
	}

	r.stats.Buffered = buffered
	r.stats.BufferedSeconds = bufferedSeconds
}

// Stats returns a snapshot of the active descriptor, the source's stats and
// the runtime's stats. The source's BufferedSeconds is raised to the
// runtime's value when the runtime reports more.
func (r *Runtime) Stats() Stats {
	r.mu.RLock()
	runtimeStats := r.stats
	active := r.active
	r.mu.RUnlock()

	sourceStats := SourceStats{}
	if r.source != nil {
		sourceStats = r.source.Stats()
	}
	if sourceStats.BufferedSeconds < runtimeStats.BufferedSeconds {
		sourceStats.BufferedSeconds = runtimeStats.BufferedSeconds
	}

	return Stats{
		Active:  active,
		Source:  sourceStats,
		Runtime: runtimeStats,
	}
}

// frameBuffer is a fixed-capacity FIFO ring of audio frames. It does no
// locking of its own.
type frameBuffer struct {
	frames []audio.Frame
	head   int // index of the oldest frame
	len    int // number of frames currently stored
}

// newFrameBuffer returns an empty buffer holding up to capacity frames.
// Capacities below 1 are raised to 1.
func newFrameBuffer(capacity int) *frameBuffer {
	if capacity < 1 {
		capacity = 1
	}
	return &frameBuffer{frames: make([]audio.Frame, capacity)}
}

// capacity returns the maximum number of frames the buffer can hold.
func (b *frameBuffer) capacity() int {
	return len(b.frames)
}

// available returns the number of frames currently stored.
func (b *frameBuffer) available() int {
	return b.len
}

// reset empties the buffer without reallocating its storage.
func (b *frameBuffer) reset() {
	b.head = 0
	b.len = 0
}

// push appends frame and reports whether it fit; a full buffer rejects the
// frame and is left unchanged.
func (b *frameBuffer) push(frame audio.Frame) bool {
	if b.len >= len(b.frames) {
		return false
	}
	idx := (b.head + b.len) % len(b.frames)
	b.frames[idx] = frame
	b.len++
	return true
}

// peek returns the oldest frame without removing it. ok is false when the
// buffer is empty.
func (b *frameBuffer) peek() (audio.Frame, bool) {
	if b.len == 0 {
		return audio.Frame{}, false
	}
	return b.frames[b.head], true
}

// pop removes and returns the oldest frame. ok is false when the buffer is
// empty.
func (b *frameBuffer) pop() (audio.Frame, bool) {
	if b.len == 0 {
		return audio.Frame{}, false
	}
	frame := b.frames[b.head]
	b.head = (b.head + 1) % len(b.frames)
	b.len--
	return frame, true
}