|
- package srt
-
- import (
- "context"
- "fmt"
- "io"
- "net/url"
- "strings"
- "sync"
- "sync/atomic"
- "time"
-
- "aoiprxkit"
- "github.com/jan/fm-rds-tx/internal/ingest"
- )
-
- type Option func(*Source)
-
- func WithConnOpener(opener aoiprxkit.SRTConnOpener) Option {
- return func(s *Source) {
- if opener != nil {
- s.opener = opener
- }
- }
- }
-
- type Source struct {
- id string
- cfg aoiprxkit.SRTConfig
-
- opener aoiprxkit.SRTConnOpener
-
- chunks chan ingest.PCMChunk
- errs chan error
-
- cancel context.CancelFunc
- wg sync.WaitGroup
-
- mu sync.Mutex
- rx *aoiprxkit.SRTReceiver
- started atomic.Bool
- closeOnce sync.Once
-
- state atomic.Value // string
- connected atomic.Bool
- chunksIn atomic.Uint64
- samplesIn atomic.Uint64
- overflows atomic.Uint64
- discontinuities atomic.Uint64
- transportLoss atomic.Uint64
- reorders atomic.Uint64
- lastChunkAtUnix atomic.Int64
- lastError atomic.Value // string
- nextSeq atomic.Uint64
-
- seqMu sync.Mutex
- lastFrame uint16
- lastHasVal bool
- }
-
- func New(id string, cfg aoiprxkit.SRTConfig, opts ...Option) *Source {
- if id == "" {
- id = "srt-main"
- }
- if cfg.Mode == "" {
- cfg.Mode = "listener"
- }
- if cfg.SampleRateHz <= 0 {
- cfg.SampleRateHz = 48000
- }
- if cfg.Channels <= 0 {
- cfg.Channels = 2
- }
-
- s := &Source{
- id: id,
- cfg: cfg,
- chunks: make(chan ingest.PCMChunk, 64),
- errs: make(chan error, 8),
- }
- for _, opt := range opts {
- if opt != nil {
- opt(s)
- }
- }
- s.state.Store("idle")
- s.lastError.Store("")
- return s
- }
-
- func (s *Source) Descriptor() ingest.SourceDescriptor {
- return ingest.SourceDescriptor{
- ID: s.id,
- Kind: "srt",
- Family: "aoip",
- Transport: "srt",
- Codec: "pcm_s32le",
- Channels: s.cfg.Channels,
- SampleRateHz: s.cfg.SampleRateHz,
- Detail: s.cfg.URL,
- Origin: &ingest.SourceOrigin{
- Kind: "url",
- Endpoint: redactURL(s.cfg.URL),
- Mode: strings.TrimSpace(s.cfg.Mode),
- },
- }
- }
-
- func (s *Source) Start(ctx context.Context) error {
- if !s.started.CompareAndSwap(false, true) {
- return nil
- }
-
- var (
- rx *aoiprxkit.SRTReceiver
- err error
- )
- if s.opener != nil {
- rx, err = aoiprxkit.NewSRTReceiverWithOpener(s.cfg, s.opener, s.handleFrame)
- } else {
- rx, err = aoiprxkit.NewSRTReceiver(s.cfg, s.handleFrame)
- }
- if err != nil {
- s.started.Store(false)
- s.connected.Store(false)
- s.state.Store("failed")
- s.setError(err)
- return err
- }
-
- runCtx, cancel := context.WithCancel(ctx)
- s.cancel = cancel
- s.mu.Lock()
- s.rx = rx
- s.mu.Unlock()
- s.lastError.Store("")
- s.connected.Store(false)
- s.state.Store("connecting")
-
- if err := rx.Start(runCtx); err != nil {
- s.started.Store(false)
- s.connected.Store(false)
- s.state.Store("failed")
- s.setError(err)
- return err
- }
- s.connected.Store(true)
- s.state.Store("running")
-
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
- <-runCtx.Done()
- _ = s.stopReceiver()
- s.connected.Store(false)
- s.closeChannels()
- }()
- return nil
- }
-
- func (s *Source) Stop() error {
- if !s.started.CompareAndSwap(true, false) {
- return nil
- }
- if s.cancel != nil {
- s.cancel()
- }
- if err := s.stopReceiver(); err != nil {
- s.setError(err)
- s.state.Store("failed")
- }
- s.wg.Wait()
- s.connected.Store(false)
- state, _ := s.state.Load().(string)
- if state != "failed" {
- s.state.Store("stopped")
- }
- return nil
- }
-
- func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
- func (s *Source) Errors() <-chan error { return s.errs }
-
- func (s *Source) Stats() ingest.SourceStats {
- state, _ := s.state.Load().(string)
- last := s.lastChunkAtUnix.Load()
- errStr, _ := s.lastError.Load().(string)
- var lastChunkAt time.Time
- if last > 0 {
- lastChunkAt = time.Unix(0, last)
- }
- return ingest.SourceStats{
- State: state,
- Connected: s.connected.Load(),
- LastChunkAt: lastChunkAt,
- ChunksIn: s.chunksIn.Load(),
- SamplesIn: s.samplesIn.Load(),
- Overflows: s.overflows.Load(),
- Discontinuities: s.discontinuities.Load(),
- TransportLoss: s.transportLoss.Load(),
- Reorders: s.reorders.Load(),
- LastError: errStr,
- }
- }
-
- func (s *Source) handleFrame(frame aoiprxkit.PCMFrame) {
- if !s.started.Load() {
- return
- }
-
- discontinuity := false
- s.seqMu.Lock()
- if s.lastHasVal {
- expected := s.lastFrame + 1
- if frame.SequenceNumber != expected {
- discontinuity = true
- delta := int16(frame.SequenceNumber - expected)
- if delta > 0 {
- s.transportLoss.Add(uint64(delta))
- } else {
- s.reorders.Add(1)
- }
- }
- }
- s.lastFrame = frame.SequenceNumber
- s.lastHasVal = true
- s.seqMu.Unlock()
-
- chunk := ingest.PCMChunk{
- Samples: append([]int32(nil), frame.Samples...),
- Channels: frame.Channels,
- SampleRateHz: frame.SampleRateHz,
- Sequence: s.nextSeq.Add(1) - 1,
- Timestamp: frame.ReceivedAt,
- SourceID: s.id,
- Discontinuity: discontinuity,
- }
-
- s.chunksIn.Add(1)
- s.samplesIn.Add(uint64(len(chunk.Samples)))
- s.lastChunkAtUnix.Store(time.Now().UnixNano())
- if discontinuity {
- s.discontinuities.Add(1)
- }
-
- select {
- case s.chunks <- chunk:
- default:
- s.overflows.Add(1)
- s.discontinuities.Add(1)
- s.setError(io.ErrShortBuffer)
- s.emitError(fmt.Errorf("srt chunk buffer overflow"))
- }
- }
-
- func (s *Source) stopReceiver() error {
- s.mu.Lock()
- rx := s.rx
- s.rx = nil
- s.mu.Unlock()
- if rx == nil {
- return nil
- }
- return rx.Stop()
- }
-
- func (s *Source) closeChannels() {
- s.closeOnce.Do(func() {
- close(s.chunks)
- close(s.errs)
- })
- }
-
- func (s *Source) setError(err error) {
- if err == nil {
- return
- }
- s.lastError.Store(err.Error())
- s.emitError(err)
- }
-
- func (s *Source) emitError(err error) {
- if err == nil {
- return
- }
- select {
- case s.errs <- err:
- default:
- }
- }
-
- func redactURL(raw string) string {
- trimmed := strings.TrimSpace(raw)
- if trimmed == "" {
- return ""
- }
- u, err := url.Parse(trimmed)
- if err != nil || u.Host == "" {
- return trimmed
- }
- u.User = nil
- u.RawQuery = ""
- u.Fragment = ""
- return u.String()
- }
|