package aoip import ( "context" "fmt" "io" "sync" "sync/atomic" "time" "aoiprxkit" "github.com/jan/fm-rds-tx/internal/ingest" ) type ReceiverClient interface { Start(ctx context.Context) error Stop() error Stats() aoiprxkit.Stats } type ReceiverFactory func(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error) type Option func(*Source) func WithReceiverFactory(factory ReceiverFactory) Option { return func(s *Source) { if factory != nil { s.factory = factory } } } func WithDetail(detail string) Option { return func(s *Source) { s.detail = detail } } func WithOrigin(origin ingest.SourceOrigin) Option { return func(s *Source) { clone := origin s.origin = &clone } } type Source struct { id string cfg aoiprxkit.Config factory ReceiverFactory detail string origin *ingest.SourceOrigin chunks chan ingest.PCMChunk errs chan error cancel context.CancelFunc wg sync.WaitGroup mu sync.Mutex rx ReceiverClient started atomic.Bool closeOnce sync.Once state atomic.Value // string connected atomic.Bool chunksIn atomic.Uint64 samplesIn atomic.Uint64 overflows atomic.Uint64 discontinuities atomic.Uint64 transportLoss atomic.Uint64 reorders atomic.Uint64 lastChunkAtUnix atomic.Int64 lastError atomic.Value // string nextSeq atomic.Uint64 seqMu sync.Mutex lastFrame uint16 lastHasVal bool } func New(id string, cfg aoiprxkit.Config, opts ...Option) *Source { if id == "" { id = "aes67-main" } if cfg.MulticastGroup == "" { cfg = aoiprxkit.DefaultConfig() } s := &Source{ id: id, cfg: cfg, factory: newReceiverAdapter, chunks: make(chan ingest.PCMChunk, 64), errs: make(chan error, 8), } for _, opt := range opts { if opt != nil { opt(s) } } s.state.Store("idle") s.lastError.Store("") return s } func (s *Source) Descriptor() ingest.SourceDescriptor { detail := s.detail if detail == "" { detail = fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port) } origin := s.origin if origin == nil { origin = &ingest.SourceOrigin{ Kind: "manual", } } if origin.Endpoint == "" { copyOrigin := *origin copyOrigin.Endpoint = fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port) origin = ©Origin } return ingest.SourceDescriptor{ ID: s.id, Kind: "aes67", Family: "aoip", Transport: "rtp", Codec: "l24", Channels: s.cfg.Channels, SampleRateHz: s.cfg.SampleRateHz, Detail: detail, Origin: origin, } } func (s *Source) Start(ctx context.Context) error { if !s.started.CompareAndSwap(false, true) { return nil } rx, err := s.factory(s.cfg, s.handleFrame) if err != nil { s.started.Store(false) s.connected.Store(false) s.state.Store("failed") s.setError(err) return err } runCtx, cancel := context.WithCancel(ctx) s.cancel = cancel s.mu.Lock() s.rx = rx s.mu.Unlock() s.lastError.Store("") s.connected.Store(false) s.state.Store("connecting") if err := rx.Start(runCtx); err != nil { s.started.Store(false) s.connected.Store(false) s.state.Store("failed") s.setError(err) return err } s.connected.Store(true) s.state.Store("running") s.wg.Add(1) go func() { defer s.wg.Done() <-runCtx.Done() _ = s.stopReceiver() s.connected.Store(false) s.closeChannels() }() return nil } func (s *Source) Stop() error { if !s.started.CompareAndSwap(true, false) { return nil } if s.cancel != nil { s.cancel() } if err := s.stopReceiver(); err != nil { s.setError(err) s.state.Store("failed") } s.wg.Wait() s.connected.Store(false) state, _ := s.state.Load().(string) if state != "failed" { s.state.Store("stopped") } return nil } func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks } func (s *Source) Errors() <-chan error { return s.errs } func (s *Source) Stats() ingest.SourceStats { state, _ := s.state.Load().(string) last := s.lastChunkAtUnix.Load() errStr, _ := s.lastError.Load().(string) var lastChunkAt time.Time if last > 0 { lastChunkAt = time.Unix(0, last) } var rxStats aoiprxkit.Stats s.mu.Lock() rx := s.rx s.mu.Unlock() if rx != nil { rxStats = rx.Stats() } transportLoss := s.transportLoss.Load() if rxStats.PacketsGapLoss > transportLoss { transportLoss = rxStats.PacketsGapLoss } reorders := s.reorders.Load() if rxStats.JitterReorders > reorders { reorders = rxStats.JitterReorders } return ingest.SourceStats{ State: state, Connected: s.connected.Load(), LastChunkAt: lastChunkAt, ChunksIn: s.chunksIn.Load(), SamplesIn: s.samplesIn.Load(), Overflows: s.overflows.Load(), Underruns: rxStats.PacketsLateDrop, Discontinuities: s.discontinuities.Load() + rxStats.PacketsLateDrop, TransportLoss: transportLoss, Reorders: reorders, JitterDepth: s.cfg.JitterDepthPackets, LastError: errStr, } } func (s *Source) handleFrame(frame aoiprxkit.PCMFrame) { if !s.started.Load() { return } discontinuity := false s.seqMu.Lock() if s.lastHasVal { expected := s.lastFrame + 1 if frame.SequenceNumber != expected { discontinuity = true delta := int16(frame.SequenceNumber - expected) if delta > 0 { s.transportLoss.Add(uint64(delta)) } else { s.reorders.Add(1) } } } s.lastFrame = frame.SequenceNumber s.lastHasVal = true s.seqMu.Unlock() chunk := ingest.PCMChunk{ Samples: append([]int32(nil), frame.Samples...), Channels: frame.Channels, SampleRateHz: frame.SampleRateHz, Sequence: s.nextSeq.Add(1) - 1, Timestamp: frame.ReceivedAt, SourceID: s.id, Discontinuity: discontinuity, } s.chunksIn.Add(1) s.samplesIn.Add(uint64(len(chunk.Samples))) s.lastChunkAtUnix.Store(time.Now().UnixNano()) if discontinuity { s.discontinuities.Add(1) } select { case s.chunks <- chunk: default: s.overflows.Add(1) s.discontinuities.Add(1) s.setError(io.ErrShortBuffer) s.emitError(fmt.Errorf("aes67 chunk buffer overflow")) } } func (s *Source) stopReceiver() error { s.mu.Lock() rx := s.rx s.rx = nil s.mu.Unlock() if rx == nil { return nil } return rx.Stop() } func (s *Source) closeChannels() { s.closeOnce.Do(func() { close(s.chunks) close(s.errs) }) } func (s *Source) setError(err error) { if err == nil { return } s.lastError.Store(err.Error()) s.emitError(err) } func (s *Source) emitError(err error) { if err == nil { return } select { case s.errs <- err: default: } } type receiverAdapter struct { *aoiprxkit.Receiver } func newReceiverAdapter(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error) { rx, err := aoiprxkit.NewReceiver(cfg, onFrame) if err != nil { return nil, err } return &receiverAdapter{Receiver: rx}, nil }