package ingest

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/jan/fm-rds-tx/internal/audio"
)

// Runtime forwards PCM chunks from a Source into an audio.StreamSource sink
// and keeps track of the state of that pipeline.
type Runtime struct {
	// sink receives the frames produced from each chunk via WriteFrame.
	sink *audio.StreamSource
	// source supplies chunks, errors and, optionally, stream titles.
	source Source

	// started is flipped by Start and Stop so that repeated calls are no-ops.
	started atomic.Bool
	// onTitle, when set, is called for every stream title update.
	onTitle func(string)

	// ctx and cancel belong to the current run; both are set by Start.
	ctx    context.Context
	cancel context.CancelFunc
	// wg tracks the run goroutine so Stop can wait for it.
	wg sync.WaitGroup

	// mu guards active and stats.
	mu     sync.RWMutex
	active SourceDescriptor
	stats  RuntimeStats
}

// RuntimeOption customises a Runtime while NewRuntime builds it.
type RuntimeOption func(*Runtime)

// WithStreamTitleHandler registers handler as the callback invoked for stream
// title updates emitted by a source that implements StreamTitleSource.
func WithStreamTitleHandler(handler func(string)) RuntimeOption {
	return func(r *Runtime) {
		r.onTitle = handler
	}
}

// NewRuntime returns a Runtime in the "idle" state that will feed sink from
// src. Nil options are skipped.
func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime {
	r := &Runtime{
		sink:   sink,
		source: src,
		stats: RuntimeStats{
			State: "idle",
		},
	}
	for _, opt := range opts {
		if opt != nil {
			opt(r)
		}
	}
	return r
}

// Start starts the source and launches the goroutine that forwards its chunks
// to the sink.
//
// It returns nil without starting anything when the sink is nil (state becomes
// "failed"), when the source is nil (state becomes "idle"), or when the
// runtime is already started. If the source fails to start, the state becomes
// "failed", the context derived for this run is cancelled, and the source's
// error is returned.
func (r *Runtime) Start(ctx context.Context) error {
	if r.sink == nil {
		r.setState("failed")
		return nil
	}
	if r.source == nil {
		r.setState("idle")
		return nil
	}
	if !r.started.CompareAndSwap(false, true) {
		return nil
	}

	runCtx, cancel := context.WithCancel(ctx)
	r.ctx, r.cancel = runCtx, cancel

	r.mu.Lock()
	r.active = r.source.Descriptor()
	r.stats.State = "starting"
	r.mu.Unlock()

	if err := r.source.Start(runCtx); err != nil {
		// Release the derived context: Stop will not do it once started is
		// reset, and the next Start replaces r.cancel.
		cancel()
		r.started.Store(false)
		r.setState("failed")
		return err
	}

	r.wg.Add(1)
	go r.run()
	return nil
}

// Stop cancels the run context, stops the source, waits for the forwarding
// goroutine to exit and sets the state to "stopped". It is a no-op when the
// runtime is not started.
//
// The error returned by the source's Stop is discarded; Stop always returns
// nil.
func (r *Runtime) Stop() error {
	if !r.started.CompareAndSwap(true, false) {
		return nil
	}
	if r.cancel != nil {
		r.cancel()
	}
	if r.source != nil {
		_ = r.source.Stop()
	}
	r.wg.Wait()
	r.setState("stopped")
	return nil
}

// setState records state as the current runtime state under the write lock.
func (r *Runtime) setState(state string) {
	r.mu.Lock()
	r.stats.State = state
	r.mu.Unlock()
}

// run is the forwarding loop started by Start. It marks the runtime "running"
// and then serves the source's channels until the run context is cancelled or
// the chunk channel is closed.
func (r *Runtime) run() {
	defer r.wg.Done()

	r.setState("running")

	// Read the context once so the loop does not touch r.ctx again.
	done := r.ctx.Done()
	ch := r.source.Chunks()
	errCh := r.source.Errors()

	// titleCh stays nil (and therefore never selected) unless the source
	// publishes titles and a handler was registered.
	var titleCh <-chan string
	if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil {
		titleCh = src.StreamTitleUpdates()
	}

	for {
		select {
		case <-done:
			return

		case err, ok := <-errCh:
			if !ok {
				// A closed channel is always ready; drop it from the select.
				errCh = nil
				continue
			}
			if err == nil {
				continue
			}
			r.setState("degraded")

		case chunk, ok := <-ch:
			if !ok {
				r.setState("stopped")
				return
			}
			r.handleChunk(chunk)

		case title, ok := <-titleCh:
			if !ok {
				titleCh = nil
				continue
			}
			r.onTitle(title)
		}
	}
}

// handleChunk converts chunk into frames, writes them to the sink and updates
// the runtime stats. A conversion failure increments ConvertErrors, marks the
// runtime "degraded" and writes nothing.
func (r *Runtime) handleChunk(chunk PCMChunk) {
	frames, err := ChunkToFrames(chunk)
	if err != nil {
		r.mu.Lock()
		r.stats.ConvertErrors++
		r.stats.State = "degraded"
		r.mu.Unlock()
		return
	}

	// Frames the sink refuses are counted as dropped, not retried.
	var dropped uint64
	for _, frame := range frames {
		if !r.sink.WriteFrame(frame) {
			dropped++
		}
	}

	r.mu.Lock()
	// Only positive values from the chunk override the active descriptor.
	if chunk.SampleRateHz > 0 {
		r.active.SampleRateHz = chunk.SampleRateHz
	}
	if chunk.Channels > 0 {
		r.active.Channels = chunk.Channels
	}
	r.stats.State = "running"
	r.stats.LastChunkAt = time.Now()
	r.stats.DroppedFrames += dropped
	// WriteBlocked reflects only the chunk that was just processed.
	r.stats.WriteBlocked = dropped > 0
	r.mu.Unlock()
}

// Stats returns the active descriptor and runtime stats, copied under the
// read lock, together with the source's own stats. The source stats are
// fetched after the lock is released and are zero when there is no source.
func (r *Runtime) Stats() Stats {
	r.mu.RLock()
	runtimeStats := r.stats
	active := r.active
	r.mu.RUnlock()

	sourceStats := SourceStats{}
	if r.source != nil {
		sourceStats = r.source.Stats()
	}
	return Stats{
		Active:  active,
		Source:  sourceStats,
		Runtime: runtimeStats,
	}
}