|
- package aoip
-
- import (
- "context"
- "fmt"
- "io"
- "sync"
- "sync/atomic"
- "time"
-
- "aoiprxkit"
- "github.com/jan/fm-rds-tx/internal/ingest"
- )
-
- type ReceiverClient interface {
- Start(ctx context.Context) error
- Stop() error
- Stats() aoiprxkit.Stats
- }
-
- type ReceiverFactory func(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error)
-
- type Option func(*Source)
-
- func WithReceiverFactory(factory ReceiverFactory) Option {
- return func(s *Source) {
- if factory != nil {
- s.factory = factory
- }
- }
- }
-
- func WithDetail(detail string) Option {
- return func(s *Source) {
- s.detail = detail
- }
- }
-
- type Source struct {
- id string
- cfg aoiprxkit.Config
-
- factory ReceiverFactory
- detail string
-
- chunks chan ingest.PCMChunk
- errs chan error
-
- cancel context.CancelFunc
- wg sync.WaitGroup
-
- mu sync.Mutex
- rx ReceiverClient
- started atomic.Bool
- closeOnce sync.Once
-
- state atomic.Value // string
- connected atomic.Bool
- chunksIn atomic.Uint64
- samplesIn atomic.Uint64
- overflows atomic.Uint64
- discontinuities atomic.Uint64
- transportLoss atomic.Uint64
- reorders atomic.Uint64
- lastChunkAtUnix atomic.Int64
- lastError atomic.Value // string
- nextSeq atomic.Uint64
-
- seqMu sync.Mutex
- lastFrame uint16
- lastHasVal bool
- }
-
- func New(id string, cfg aoiprxkit.Config, opts ...Option) *Source {
- if id == "" {
- id = "aes67-main"
- }
- if cfg.MulticastGroup == "" {
- cfg = aoiprxkit.DefaultConfig()
- }
- s := &Source{
- id: id,
- cfg: cfg,
- factory: newReceiverAdapter,
- chunks: make(chan ingest.PCMChunk, 64),
- errs: make(chan error, 8),
- }
- for _, opt := range opts {
- if opt != nil {
- opt(s)
- }
- }
- s.state.Store("idle")
- s.lastError.Store("")
- return s
- }
-
- func (s *Source) Descriptor() ingest.SourceDescriptor {
- detail := s.detail
- if detail == "" {
- detail = fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port)
- }
- return ingest.SourceDescriptor{
- ID: s.id,
- Kind: "aes67",
- Family: "aoip",
- Transport: "rtp",
- Codec: "l24",
- Channels: s.cfg.Channels,
- SampleRateHz: s.cfg.SampleRateHz,
- Detail: detail,
- }
- }
-
- func (s *Source) Start(ctx context.Context) error {
- if !s.started.CompareAndSwap(false, true) {
- return nil
- }
-
- rx, err := s.factory(s.cfg, s.handleFrame)
- if err != nil {
- s.started.Store(false)
- s.connected.Store(false)
- s.state.Store("failed")
- s.setError(err)
- return err
- }
-
- runCtx, cancel := context.WithCancel(ctx)
- s.cancel = cancel
- s.mu.Lock()
- s.rx = rx
- s.mu.Unlock()
- s.lastError.Store("")
- s.connected.Store(false)
- s.state.Store("connecting")
-
- if err := rx.Start(runCtx); err != nil {
- s.started.Store(false)
- s.connected.Store(false)
- s.state.Store("failed")
- s.setError(err)
- return err
- }
- s.connected.Store(true)
- s.state.Store("running")
-
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
- <-runCtx.Done()
- _ = s.stopReceiver()
- s.connected.Store(false)
- s.closeChannels()
- }()
- return nil
- }
-
- func (s *Source) Stop() error {
- if !s.started.CompareAndSwap(true, false) {
- return nil
- }
- if s.cancel != nil {
- s.cancel()
- }
- if err := s.stopReceiver(); err != nil {
- s.setError(err)
- s.state.Store("failed")
- }
- s.wg.Wait()
- s.connected.Store(false)
- state, _ := s.state.Load().(string)
- if state != "failed" {
- s.state.Store("stopped")
- }
- return nil
- }
-
- func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
- func (s *Source) Errors() <-chan error { return s.errs }
-
- func (s *Source) Stats() ingest.SourceStats {
- state, _ := s.state.Load().(string)
- last := s.lastChunkAtUnix.Load()
- errStr, _ := s.lastError.Load().(string)
- var lastChunkAt time.Time
- if last > 0 {
- lastChunkAt = time.Unix(0, last)
- }
- var rxStats aoiprxkit.Stats
- s.mu.Lock()
- rx := s.rx
- s.mu.Unlock()
- if rx != nil {
- rxStats = rx.Stats()
- }
- transportLoss := s.transportLoss.Load()
- if rxStats.PacketsGapLoss > transportLoss {
- transportLoss = rxStats.PacketsGapLoss
- }
- reorders := s.reorders.Load()
- if rxStats.JitterReorders > reorders {
- reorders = rxStats.JitterReorders
- }
- return ingest.SourceStats{
- State: state,
- Connected: s.connected.Load(),
- LastChunkAt: lastChunkAt,
- ChunksIn: s.chunksIn.Load(),
- SamplesIn: s.samplesIn.Load(),
- Overflows: s.overflows.Load(),
- Underruns: rxStats.PacketsLateDrop,
- Discontinuities: s.discontinuities.Load() + rxStats.PacketsLateDrop,
- TransportLoss: transportLoss,
- Reorders: reorders,
- JitterDepth: s.cfg.JitterDepthPackets,
- LastError: errStr,
- }
- }
-
- func (s *Source) handleFrame(frame aoiprxkit.PCMFrame) {
- if !s.started.Load() {
- return
- }
-
- discontinuity := false
- s.seqMu.Lock()
- if s.lastHasVal {
- expected := s.lastFrame + 1
- if frame.SequenceNumber != expected {
- discontinuity = true
- delta := int16(frame.SequenceNumber - expected)
- if delta > 0 {
- s.transportLoss.Add(uint64(delta))
- } else {
- s.reorders.Add(1)
- }
- }
- }
- s.lastFrame = frame.SequenceNumber
- s.lastHasVal = true
- s.seqMu.Unlock()
-
- chunk := ingest.PCMChunk{
- Samples: append([]int32(nil), frame.Samples...),
- Channels: frame.Channels,
- SampleRateHz: frame.SampleRateHz,
- Sequence: s.nextSeq.Add(1) - 1,
- Timestamp: frame.ReceivedAt,
- SourceID: s.id,
- Discontinuity: discontinuity,
- }
-
- s.chunksIn.Add(1)
- s.samplesIn.Add(uint64(len(chunk.Samples)))
- s.lastChunkAtUnix.Store(time.Now().UnixNano())
- if discontinuity {
- s.discontinuities.Add(1)
- }
-
- select {
- case s.chunks <- chunk:
- default:
- s.overflows.Add(1)
- s.discontinuities.Add(1)
- s.setError(io.ErrShortBuffer)
- s.emitError(fmt.Errorf("aes67 chunk buffer overflow"))
- }
- }
-
- func (s *Source) stopReceiver() error {
- s.mu.Lock()
- rx := s.rx
- s.rx = nil
- s.mu.Unlock()
- if rx == nil {
- return nil
- }
- return rx.Stop()
- }
-
- func (s *Source) closeChannels() {
- s.closeOnce.Do(func() {
- close(s.chunks)
- close(s.errs)
- })
- }
-
- func (s *Source) setError(err error) {
- if err == nil {
- return
- }
- s.lastError.Store(err.Error())
- s.emitError(err)
- }
-
- func (s *Source) emitError(err error) {
- if err == nil {
- return
- }
- select {
- case s.errs <- err:
- default:
- }
- }
-
- type receiverAdapter struct {
- *aoiprxkit.Receiver
- }
-
- func newReceiverAdapter(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error) {
- rx, err := aoiprxkit.NewReceiver(cfg, onFrame)
- if err != nil {
- return nil, err
- }
- return &receiverAdapter{Receiver: rx}, nil
- }
|