|
- package ingest
-
- import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/jan/fm-rds-tx/internal/audio"
- )
-
- type Runtime struct {
- sink *audio.StreamSource
- source Source
- started atomic.Bool
- onTitle func(string)
- prebuffer time.Duration
-
- ctx context.Context
- cancel context.CancelFunc
- wg sync.WaitGroup
-
- work *frameBuffer
- workSampleRate int
- prebufferFrames int
- gateOpen bool
- seenChunk bool
- lastDrainAt time.Time
- drainAllowance float64
-
- mu sync.RWMutex
- active SourceDescriptor
- stats RuntimeStats
- }
-
// RuntimeOption customises a Runtime while NewRuntime is constructing it.
// Options are applied in the order given; nil options are skipped.
type RuntimeOption func(*Runtime)
-
- func WithStreamTitleHandler(handler func(string)) RuntimeOption {
- return func(r *Runtime) {
- r.onTitle = handler
- }
- }
-
- func WithPrebuffer(d time.Duration) RuntimeOption {
- return func(r *Runtime) {
- if d < 0 {
- d = 0
- }
- r.prebuffer = d
- }
- }
-
- func WithPrebufferMs(ms int) RuntimeOption {
- return func(r *Runtime) {
- if ms < 0 {
- ms = 0
- }
- r.prebuffer = time.Duration(ms) * time.Millisecond
- }
- }
-
- func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime {
- sampleRate := 44100
- capacity := 1024
- if sink != nil {
- if sink.SampleRate > 0 {
- sampleRate = sink.SampleRate
- }
- if sinkCap := sink.Stats().Capacity; sinkCap > 0 {
- capacity = sinkCap * 2
- }
- }
- r := &Runtime{
- sink: sink,
- source: src,
- work: newFrameBuffer(capacity),
- workSampleRate: sampleRate,
- stats: RuntimeStats{
- State: "idle",
- },
- }
- for _, opt := range opts {
- if opt != nil {
- opt(r)
- }
- }
- if r.workSampleRate > 0 && r.prebuffer > 0 {
- r.prebufferFrames = int(r.prebuffer.Seconds() * float64(r.workSampleRate))
- }
- minCapacity := 256
- if r.prebufferFrames > 0 && minCapacity < r.prebufferFrames*2 {
- minCapacity = r.prebufferFrames * 2
- }
- if r.work == nil || r.work.capacity() < minCapacity {
- r.work = newFrameBuffer(minCapacity)
- }
- r.updateBufferedStatsLocked()
- return r
- }
-
- func (r *Runtime) Start(ctx context.Context) error {
- if r.sink == nil {
- r.mu.Lock()
- r.stats.State = "failed"
- r.mu.Unlock()
- return nil
- }
- if r.source == nil {
- r.mu.Lock()
- r.stats.State = "idle"
- r.mu.Unlock()
- return nil
- }
- if !r.started.CompareAndSwap(false, true) {
- return nil
- }
-
- r.ctx, r.cancel = context.WithCancel(ctx)
- r.mu.Lock()
- r.active = r.source.Descriptor()
- r.stats.State = "starting"
- r.stats.Prebuffering = false
- r.stats.WriteBlocked = false
- r.gateOpen = false
- r.seenChunk = false
- r.lastDrainAt = time.Now()
- r.drainAllowance = 0
- r.work.reset()
- r.updateBufferedStatsLocked()
- r.mu.Unlock()
- if err := r.source.Start(r.ctx); err != nil {
- r.started.Store(false)
- r.mu.Lock()
- r.stats.State = "failed"
- r.mu.Unlock()
- return err
- }
-
- r.wg.Add(1)
- go r.run()
- return nil
- }
-
- func (r *Runtime) Stop() error {
- if !r.started.CompareAndSwap(true, false) {
- return nil
- }
- if r.cancel != nil {
- r.cancel()
- }
- if r.source != nil {
- _ = r.source.Stop()
- }
- r.wg.Wait()
- r.mu.Lock()
- r.stats.State = "stopped"
- r.mu.Unlock()
- return nil
- }
-
- func (r *Runtime) run() {
- defer r.wg.Done()
-
- ch := r.source.Chunks()
- errCh := r.source.Errors()
- ticker := time.NewTicker(10 * time.Millisecond)
- defer ticker.Stop()
- var titleCh <-chan string
- if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil {
- titleCh = src.StreamTitleUpdates()
- }
- for {
- select {
- case <-r.ctx.Done():
- return
- case err, ok := <-errCh:
- if !ok {
- errCh = nil
- continue
- }
- if err == nil {
- continue
- }
- r.mu.Lock()
- r.stats.State = "degraded"
- r.stats.Prebuffering = false
- r.mu.Unlock()
- case chunk, ok := <-ch:
- if !ok {
- r.mu.Lock()
- r.stats.State = "stopped"
- r.stats.Prebuffering = false
- r.mu.Unlock()
- return
- }
- r.handleChunk(chunk)
- case <-ticker.C:
- r.drainWorkingBuffer()
- case title, ok := <-titleCh:
- if !ok {
- titleCh = nil
- continue
- }
- r.onTitle(title)
- }
- }
- }
-
- func (r *Runtime) handleChunk(chunk PCMChunk) {
- r.mu.Lock()
- r.seenChunk = true
-
- // Propagate the actual decoded sample rate to the sink and pacer the
- // first time (or whenever) it differs from our working rate. This fixes
- // the two-part rate-mismatch bug that appears when a native decoder
- // (e.g. go-mp3) decodes a 48000 Hz stream while the StreamSource and
- // StreamResampler were initialised assuming 44100 Hz:
- //
- // 1. The pacer (pacedDrainLimitLocked) was draining at the wrong rate,
- // causing the work buffer to overflow → glitches.
- // 2. The StreamResampler ratio (inputRate/outputRate) was computed from
- // the stale sink.SampleRate, so every frame was played at the wrong
- // pitch → audio too slow (44100/48000 ≈ 91.9 % speed).
- //
- // SetSampleRate writes atomically, so the StreamResampler's NextFrame()
- // picks up the corrected ratio without any additional locking.
- if chunk.SampleRateHz > 0 && chunk.SampleRateHz != r.workSampleRate {
- r.workSampleRate = chunk.SampleRateHz
- if r.sink != nil {
- r.sink.SetSampleRate(chunk.SampleRateHz)
- }
- }
-
- r.mu.Unlock()
-
- frames, err := ChunkToFrames(chunk)
- if err != nil {
- r.mu.Lock()
- r.stats.ConvertErrors++
- r.stats.State = "degraded"
- r.mu.Unlock()
- return
- }
- dropped := uint64(0)
- for _, frame := range frames {
- if !r.work.push(frame) {
- dropped++
- }
- }
- r.mu.Lock()
- if chunk.SampleRateHz > 0 {
- r.active.SampleRateHz = chunk.SampleRateHz
- }
- if chunk.Channels > 0 {
- r.active.Channels = chunk.Channels
- }
- r.stats.LastChunkAt = time.Now()
- r.stats.DroppedFrames += dropped
- if dropped > 0 {
- r.stats.State = "degraded"
- }
- r.updateBufferedStatsLocked()
- r.mu.Unlock()
- r.drainWorkingBuffer()
- }
-
- func (r *Runtime) drainWorkingBuffer() {
- r.mu.Lock()
- defer r.mu.Unlock()
- now := time.Now()
- if r.sink == nil {
- r.resetDrainPacerLocked(now)
- r.updateBufferedStatsLocked()
- return
- }
- bufferedFrames := r.work.available()
- if !r.gateOpen {
- switch {
- case bufferedFrames == 0:
- if r.stats.State == "degraded" {
- // Keep degraded visible until fresh audio recovers runtime.
- } else if !r.seenChunk {
- r.stats.State = "starting"
- } else if r.stats.State != "degraded" {
- r.stats.State = "running"
- }
- r.stats.Prebuffering = false
- r.stats.WriteBlocked = false
- r.resetDrainPacerLocked(now)
- r.updateBufferedStatsLocked()
- return
- case r.prebufferFrames > 0 && bufferedFrames < r.prebufferFrames:
- r.stats.State = "prebuffering"
- r.stats.Prebuffering = true
- r.stats.WriteBlocked = false
- r.resetDrainPacerLocked(now)
- r.updateBufferedStatsLocked()
- return
- default:
- r.gateOpen = true
- r.resetDrainPacerLocked(now)
- }
- }
- writeBlocked := false
- limit := r.pacedDrainLimitLocked(now, bufferedFrames)
- written := 0
- for written < limit && r.work.available() > 0 {
- frame, ok := r.work.peek()
- if !ok {
- break
- }
- if !r.sink.WriteFrame(frame) {
- writeBlocked = true
- break
- }
- r.work.pop()
- written++
- }
- if written > 0 {
- r.drainAllowance -= float64(written)
- if r.drainAllowance < 0 {
- r.drainAllowance = 0
- }
- }
- if r.work.available() == 0 && r.prebufferFrames > 0 {
- // Re-arm the gate after dry-out to rebuild margin before resuming.
- r.gateOpen = false
- r.resetDrainPacerLocked(now)
- }
- r.stats.Prebuffering = false
- r.stats.WriteBlocked = writeBlocked
- if writeBlocked {
- r.stats.State = "degraded"
- } else {
- r.stats.State = "running"
- }
- r.updateBufferedStatsLocked()
- }
-
- func (r *Runtime) pacedDrainLimitLocked(now time.Time, bufferedFrames int) int {
- if bufferedFrames <= 0 {
- return 0
- }
- // Use workSampleRate which is kept in sync with sink.SampleRate via
- // handleChunk. This ensures the pacer drains at the actual decoded rate
- // rather than the initial (potentially wrong) configured rate.
- rate := r.workSampleRate
- if r.sink != nil && r.sink.GetSampleRate() > 0 {
- rate = r.sink.GetSampleRate()
- }
- if rate <= 0 {
- return bufferedFrames
- }
- if !r.lastDrainAt.IsZero() {
- elapsed := now.Sub(r.lastDrainAt)
- if elapsed > 0 {
- r.drainAllowance += elapsed.Seconds() * float64(rate)
- }
- }
- r.lastDrainAt = now
- maxAllowance := maxInt(1, rate/5) // cap accumulated credit at 200 ms
- if r.drainAllowance > float64(maxAllowance) {
- r.drainAllowance = float64(maxAllowance)
- }
- limit := int(r.drainAllowance)
- if limit <= 0 {
- return 0
- }
- maxBurst := maxInt(1, rate/50) // max 20 ms worth of frames per drain call
- if limit > maxBurst {
- limit = maxBurst
- }
- sinkStats := r.sink.Stats()
- headroom := sinkStats.Capacity - sinkStats.Available
- if headroom < 0 {
- headroom = 0
- }
- if limit > headroom {
- limit = headroom
- }
- if limit > bufferedFrames {
- limit = bufferedFrames
- }
- return limit
- }
-
- func (r *Runtime) resetDrainPacerLocked(now time.Time) {
- r.lastDrainAt = now
- r.drainAllowance = 0
- }
-
// maxInt returns the larger of a and b.
func maxInt(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
-
- func (r *Runtime) updateBufferedStatsLocked() {
- available := r.work.available()
- capacity := r.work.capacity()
- buffered := 0.0
- if capacity > 0 {
- buffered = float64(available) / float64(capacity)
- }
- bufferedSeconds := 0.0
- if r.workSampleRate > 0 {
- bufferedSeconds = float64(available) / float64(r.workSampleRate)
- }
- r.stats.Buffered = buffered
- r.stats.BufferedSeconds = bufferedSeconds
- }
-
- func (r *Runtime) Stats() Stats {
- r.mu.RLock()
- runtimeStats := r.stats
- active := r.active
- r.mu.RUnlock()
-
- sourceStats := SourceStats{}
- if r.source != nil {
- sourceStats = r.source.Stats()
- }
- if sourceStats.BufferedSeconds < runtimeStats.BufferedSeconds {
- sourceStats.BufferedSeconds = runtimeStats.BufferedSeconds
- }
- return Stats{
- Active: active,
- Source: sourceStats,
- Runtime: runtimeStats,
- }
- }
-
- type frameBuffer struct {
- frames []audio.Frame
- head int
- len int
- }
-
- func newFrameBuffer(capacity int) *frameBuffer {
- if capacity < 1 {
- capacity = 1
- }
- return &frameBuffer{frames: make([]audio.Frame, capacity)}
- }
-
// capacity reports the maximum number of frames the buffer can hold.
func (b *frameBuffer) capacity() int {
	return len(b.frames)
}
-
// available reports how many frames are currently queued.
func (b *frameBuffer) available() int {
	return b.len
}
-
- func (b *frameBuffer) reset() {
- b.head = 0
- b.len = 0
- }
-
- func (b *frameBuffer) push(frame audio.Frame) bool {
- if b.len >= len(b.frames) {
- return false
- }
- idx := (b.head + b.len) % len(b.frames)
- b.frames[idx] = frame
- b.len++
- return true
- }
-
- func (b *frameBuffer) peek() (audio.Frame, bool) {
- if b.len == 0 {
- return audio.Frame{}, false
- }
- return b.frames[b.head], true
- }
-
- func (b *frameBuffer) pop() (audio.Frame, bool) {
- if b.len == 0 {
- return audio.Frame{}, false
- }
- frame := b.frames[b.head]
- b.head = (b.head + 1) % len(b.frames)
- b.len--
- return frame, true
- }
|