diff --git a/internal/config/config.go b/internal/config/config.go index eba5ecb..d890d4c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -78,6 +78,7 @@ type IngestConfig struct { HTTPRaw IngestPCMConfig `json:"httpRaw"` Icecast IngestIcecastConfig `json:"icecast"` SRT IngestSRTConfig `json:"srt"` + AES67 IngestAES67Config `json:"aes67"` } type IngestReconnectConfig struct { @@ -112,6 +113,21 @@ type IngestSRTConfig struct { Channels int `json:"channels"` } +type IngestAES67Config struct { + SDPPath string `json:"sdpPath"` + SDP string `json:"sdp"` + MulticastGroup string `json:"multicastGroup"` + Port int `json:"port"` + InterfaceName string `json:"interfaceName"` + PayloadType int `json:"payloadType"` + SampleRateHz int `json:"sampleRateHz"` + Channels int `json:"channels"` + Encoding string `json:"encoding"` + PacketTimeMs int `json:"packetTimeMs"` + JitterDepthPackets int `json:"jitterDepthPackets"` + ReadBufferBytes int `json:"readBufferBytes"` +} + func Default() Config { return Config{ Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0.4}, @@ -165,6 +181,15 @@ func Default() Config { SampleRateHz: 48000, Channels: 2, }, + AES67: IngestAES67Config{ + PayloadType: 97, + SampleRateHz: 48000, + Channels: 2, + Encoding: "L24", + PacketTimeMs: 1, + JitterDepthPackets: 8, + ReadBufferBytes: 1 << 20, + }, }, } } @@ -253,7 +278,7 @@ func (c Config) Validate() error { } ingestKind := strings.ToLower(strings.TrimSpace(c.Ingest.Kind)) switch ingestKind { - case "none", "stdin", "stdin-pcm", "http-raw", "icecast", "srt": + case "none", "stdin", "stdin-pcm", "http-raw", "icecast", "srt", "aes67", "aoip", "aoip-rtp": default: return fmt.Errorf("ingest.kind unsupported: %s", c.Ingest.Kind) } @@ -290,6 +315,42 @@ func (c Config) Validate() error { if ingestKind == "srt" && strings.TrimSpace(c.Ingest.SRT.URL) == "" { return fmt.Errorf("ingest.srt.url is required when ingest.kind=srt") } + if ingestKind == "aes67" || ingestKind == "aoip" || ingestKind == "aoip-rtp" { + hasSDP := strings.TrimSpace(c.Ingest.AES67.SDP) != "" + hasSDPPath := strings.TrimSpace(c.Ingest.AES67.SDPPath) != "" + if hasSDP && hasSDPPath { + return fmt.Errorf("ingest.aes67.sdp and ingest.aes67.sdpPath are mutually exclusive") + } + if !hasSDP && !hasSDPPath { + if strings.TrimSpace(c.Ingest.AES67.MulticastGroup) == "" { + return fmt.Errorf("ingest.aes67.multicastGroup is required when ingest.kind=%s", ingestKind) + } + if c.Ingest.AES67.Port <= 0 || c.Ingest.AES67.Port > 65535 { + return fmt.Errorf("ingest.aes67.port must be 1..65535") + } + } + if c.Ingest.AES67.PayloadType < 0 || c.Ingest.AES67.PayloadType > 127 { + return fmt.Errorf("ingest.aes67.payloadType must be 0..127") + } + if c.Ingest.AES67.SampleRateHz <= 0 { + return fmt.Errorf("ingest.aes67.sampleRateHz must be > 0") + } + if c.Ingest.AES67.Channels != 1 && c.Ingest.AES67.Channels != 2 { + return fmt.Errorf("ingest.aes67.channels must be 1 or 2") + } + if strings.ToUpper(strings.TrimSpace(c.Ingest.AES67.Encoding)) != "L24" { + return fmt.Errorf("ingest.aes67.encoding must be L24") + } + if c.Ingest.AES67.PacketTimeMs <= 0 { + return fmt.Errorf("ingest.aes67.packetTimeMs must be > 0") + } + if c.Ingest.AES67.JitterDepthPackets < 1 { + return fmt.Errorf("ingest.aes67.jitterDepthPackets must be >= 1") + } + if c.Ingest.AES67.ReadBufferBytes < 0 { + return fmt.Errorf("ingest.aes67.readBufferBytes must be >= 0") + } + } switch strings.ToLower(strings.TrimSpace(c.Ingest.SRT.Mode)) { case "", "listener", "caller", "rendezvous": default: diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 6002e54..3376cc8 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -165,6 +165,44 @@ func TestValidateRejectsInvalidSRTConfig(t *testing.T) { } } +func TestValidateRejectsInvalidAES67Config(t *testing.T) { + cfg := Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "" + if err := cfg.Validate(); err == nil { + t.Fatal("expected aes67 multicast group error") + } + + cfg = Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "239.10.20.30" + cfg.Ingest.AES67.Port = 5004 + cfg.Ingest.AES67.Encoding = "L16" + if err := cfg.Validate(); err == nil { + t.Fatal("expected aes67 encoding error") + } + + cfg = Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "239.10.20.30" + cfg.Ingest.AES67.Port = 5004 + cfg.Ingest.AES67.SDP = "v=0" + cfg.Ingest.AES67.SDPPath = "stream.sdp" + if err := cfg.Validate(); err == nil { + t.Fatal("expected mutually exclusive sdp/sdpPath error") + } +} + +func TestValidateAcceptsAES67WithSDPOnly(t *testing.T) { + cfg := Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "" + cfg.Ingest.AES67.SDP = "v=0\r\ns=demo\r\nc=IN IP4 239.10.20.30\r\nm=audio 5004 RTP/AVP 97\r\na=rtpmap:97 L24/48000/2\r\n" + if err := cfg.Validate(); err != nil { + t.Fatalf("expected aes67 with SDP to validate: %v", err) + } +} + func TestValidateRejectsUnsupportedIngestPCMShape(t *testing.T) { cfg := Default() cfg.Ingest.Stdin.SampleRateHz = 0 diff --git a/internal/ingest/adapters/aoip/source.go b/internal/ingest/adapters/aoip/source.go new file mode 100644 index 0000000..3d24346 --- /dev/null +++ b/internal/ingest/adapters/aoip/source.go @@ -0,0 +1,306 @@ +package aoip + +import ( + "context" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "aoiprxkit" + "github.com/jan/fm-rds-tx/internal/ingest" +) + +type ReceiverClient interface { + Start(ctx context.Context) error + Stop() error + Stats() aoiprxkit.Stats +} + +type ReceiverFactory func(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error) + +type Option func(*Source) + +func WithReceiverFactory(factory ReceiverFactory) Option { + return func(s *Source) { + if factory != nil { + s.factory = factory + } + } +} + +type Source struct { + id string + cfg aoiprxkit.Config + + factory ReceiverFactory + + chunks chan ingest.PCMChunk + errs chan error + + cancel context.CancelFunc + wg sync.WaitGroup + + mu sync.Mutex + rx ReceiverClient + started atomic.Bool + closeOnce sync.Once + + state atomic.Value // string + connected atomic.Bool + chunksIn atomic.Uint64 + samplesIn atomic.Uint64 + overflows atomic.Uint64 + discontinuities atomic.Uint64 + transportLoss atomic.Uint64 + reorders atomic.Uint64 + lastChunkAtUnix atomic.Int64 + lastError atomic.Value // string + nextSeq atomic.Uint64 + + seqMu sync.Mutex + lastFrame uint16 + lastHasVal bool +} + +func New(id string, cfg aoiprxkit.Config, opts ...Option) *Source { + if id == "" { + id = "aes67-main" + } + if cfg.MulticastGroup == "" { + cfg = aoiprxkit.DefaultConfig() + } + s := &Source{ + id: id, + cfg: cfg, + factory: newReceiverAdapter, + chunks: make(chan ingest.PCMChunk, 64), + errs: make(chan error, 8), + } + for _, opt := range opts { + if opt != nil { + opt(s) + } + } + s.state.Store("idle") + s.lastError.Store("") + return s +} + +func (s *Source) Descriptor() ingest.SourceDescriptor { + return ingest.SourceDescriptor{ + ID: s.id, + Kind: "aes67", + Family: "aoip", + Transport: "rtp", + Codec: "l24", + Channels: s.cfg.Channels, + SampleRateHz: s.cfg.SampleRateHz, + Detail: fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port), + } +} + +func (s *Source) Start(ctx context.Context) error { + if !s.started.CompareAndSwap(false, true) { + return nil + } + + rx, err := s.factory(s.cfg, s.handleFrame) + if err != nil { + s.started.Store(false) + s.connected.Store(false) + s.state.Store("failed") + s.setError(err) + return err + } + + runCtx, cancel := context.WithCancel(ctx) + s.cancel = cancel + s.mu.Lock() + s.rx = rx + s.mu.Unlock() + s.lastError.Store("") + s.connected.Store(false) + s.state.Store("connecting") + + if err := rx.Start(runCtx); err != nil { + s.started.Store(false) + s.connected.Store(false) + s.state.Store("failed") + s.setError(err) + return err + } + s.connected.Store(true) + s.state.Store("running") + + s.wg.Add(1) + go func() { + defer s.wg.Done() + <-runCtx.Done() + _ = s.stopReceiver() + s.connected.Store(false) + s.closeChannels() + }() + return nil +} + +func (s *Source) Stop() error { + if !s.started.CompareAndSwap(true, false) { + return nil + } + if s.cancel != nil { + s.cancel() + } + if err := s.stopReceiver(); err != nil { + s.setError(err) + s.state.Store("failed") + } + s.wg.Wait() + s.connected.Store(false) + state, _ := s.state.Load().(string) + if state != "failed" { + s.state.Store("stopped") + } + return nil +} + +func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks } +func (s *Source) Errors() <-chan error { return s.errs } + +func (s *Source) Stats() ingest.SourceStats { + state, _ := s.state.Load().(string) + last := s.lastChunkAtUnix.Load() + errStr, _ := s.lastError.Load().(string) + var lastChunkAt time.Time + if last > 0 { + lastChunkAt = time.Unix(0, last) + } + var rxStats aoiprxkit.Stats + s.mu.Lock() + rx := s.rx + s.mu.Unlock() + if rx != nil { + rxStats = rx.Stats() + } + transportLoss := s.transportLoss.Load() + if rxStats.PacketsGapLoss > transportLoss { + transportLoss = rxStats.PacketsGapLoss + } + reorders := s.reorders.Load() + if rxStats.JitterReorders > reorders { + reorders = rxStats.JitterReorders + } + return ingest.SourceStats{ + State: state, + Connected: s.connected.Load(), + LastChunkAt: lastChunkAt, + ChunksIn: s.chunksIn.Load(), + SamplesIn: s.samplesIn.Load(), + Overflows: s.overflows.Load(), + Underruns: rxStats.PacketsLateDrop, + Discontinuities: s.discontinuities.Load() + rxStats.PacketsLateDrop, + TransportLoss: transportLoss, + Reorders: reorders, + JitterDepth: s.cfg.JitterDepthPackets, + LastError: errStr, + } +} + +func (s *Source) handleFrame(frame aoiprxkit.PCMFrame) { + if !s.started.Load() { + return + } + + discontinuity := false + s.seqMu.Lock() + if s.lastHasVal { + expected := s.lastFrame + 1 + if frame.SequenceNumber != expected { + discontinuity = true + delta := int16(frame.SequenceNumber - expected) + if delta > 0 { + s.transportLoss.Add(uint64(delta)) + } else { + s.reorders.Add(1) + } + } + } + s.lastFrame = frame.SequenceNumber + s.lastHasVal = true + s.seqMu.Unlock() + + chunk := ingest.PCMChunk{ + Samples: append([]int32(nil), frame.Samples...), + Channels: frame.Channels, + SampleRateHz: frame.SampleRateHz, + Sequence: s.nextSeq.Add(1) - 1, + Timestamp: frame.ReceivedAt, + SourceID: s.id, + Discontinuity: discontinuity, + } + + s.chunksIn.Add(1) + s.samplesIn.Add(uint64(len(chunk.Samples))) + s.lastChunkAtUnix.Store(time.Now().UnixNano()) + if discontinuity { + s.discontinuities.Add(1) + } + + select { + case s.chunks <- chunk: + default: + s.overflows.Add(1) + s.discontinuities.Add(1) + s.setError(io.ErrShortBuffer) + s.emitError(fmt.Errorf("aes67 chunk buffer overflow")) + } +} + +func (s *Source) stopReceiver() error { + s.mu.Lock() + rx := s.rx + s.rx = nil + s.mu.Unlock() + if rx == nil { + return nil + } + return rx.Stop() +} + +func (s *Source) closeChannels() { + s.closeOnce.Do(func() { + close(s.chunks) + close(s.errs) + }) +} + +func (s *Source) setError(err error) { + if err == nil { + return + } + s.lastError.Store(err.Error()) + s.emitError(err) +} + +func (s *Source) emitError(err error) { + if err == nil { + return + } + select { + case s.errs <- err: + default: + } +} + +type receiverAdapter struct { + *aoiprxkit.Receiver +} + +func newReceiverAdapter(cfg aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error) { + rx, err := aoiprxkit.NewReceiver(cfg, onFrame) + if err != nil { + return nil, err + } + return &receiverAdapter{Receiver: rx}, nil +} diff --git a/internal/ingest/adapters/aoip/source_test.go b/internal/ingest/adapters/aoip/source_test.go new file mode 100644 index 0000000..434f6cf --- /dev/null +++ b/internal/ingest/adapters/aoip/source_test.go @@ -0,0 +1,127 @@ +package aoip + +import ( + "context" + "testing" + "time" + + "aoiprxkit" + "github.com/jan/fm-rds-tx/internal/ingest" +) + +type stubReceiver struct { + onStart func() + onStop func() + stats aoiprxkit.Stats +} + +func (r *stubReceiver) Start(context.Context) error { + if r.onStart != nil { + r.onStart() + } + return nil +} + +func (r *stubReceiver) Stop() error { + if r.onStop != nil { + r.onStop() + } + return nil +} + +func (r *stubReceiver) Stats() aoiprxkit.Stats { + return r.stats +} + +func TestSourceEmitsChunksAndMapsStats(t *testing.T) { + var handler aoiprxkit.FrameHandler + rx := &stubReceiver{ + stats: aoiprxkit.Stats{ + PacketsGapLoss: 1, + PacketsLateDrop: 2, + JitterReorders: 1, + }, + } + src := New("aes67-test", aoiprxkit.Config{ + MulticastGroup: "239.10.20.30", + Port: 5004, + PayloadType: 97, + SampleRateHz: 48000, + Channels: 2, + Encoding: "L24", + PacketTime: time.Millisecond, + JitterDepthPackets: 6, + }, WithReceiverFactory(func(_ aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (ReceiverClient, error) { + handler = onFrame + return rx, nil + })) + + if err := src.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer src.Stop() + + handler(aoiprxkit.PCMFrame{ + SequenceNumber: 100, + SampleRateHz: 48000, + Channels: 2, + Samples: []int32{1, -1, 2, -2}, + ReceivedAt: time.Now(), + }) + handler(aoiprxkit.PCMFrame{ + SequenceNumber: 102, + SampleRateHz: 48000, + Channels: 2, + Samples: []int32{3, -3, 4, -4}, + ReceivedAt: time.Now(), + }) + + chunk1 := readChunk(t, src.Chunks()) + if chunk1.Discontinuity { + t.Fatalf("first chunk should not be discontinuity") + } + chunk2 := readChunk(t, src.Chunks()) + if !chunk2.Discontinuity { + t.Fatalf("second chunk should be discontinuity on sequence gap") + } + + stats := src.Stats() + if stats.State != "running" { + t.Fatalf("state=%q want running", stats.State) + } + if !stats.Connected { + t.Fatalf("connected=false want true") + } + if stats.ChunksIn != 2 { + t.Fatalf("chunksIn=%d want 2", stats.ChunksIn) + } + if stats.SamplesIn != 8 { + t.Fatalf("samplesIn=%d want 8", stats.SamplesIn) + } + if stats.TransportLoss != 1 { + t.Fatalf("transportLoss=%d want 1", stats.TransportLoss) + } + if stats.Reorders != 1 { + t.Fatalf("reorders=%d want 1", stats.Reorders) + } + if stats.Underruns != 2 { + t.Fatalf("underruns=%d want 2", stats.Underruns) + } + if stats.JitterDepth != 6 { + t.Fatalf("jitterDepth=%d want 6", stats.JitterDepth) + } +} + +func readChunk(t *testing.T, ch <-chan ingest.PCMChunk) ingest.PCMChunk { + t.Helper() + select { + case chunk, ok := <-ch: + if !ok { + t.Fatal("chunk channel closed") + } + return chunk + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for chunk") + return ingest.PCMChunk{} + } +} diff --git a/internal/ingest/factory/factory.go b/internal/ingest/factory/factory.go index 62146fa..85fd34a 100644 --- a/internal/ingest/factory/factory.go +++ b/internal/ingest/factory/factory.go @@ -5,11 +5,14 @@ import ( "io" "net/http" "os" + "path/filepath" "strings" + "time" "aoiprxkit" "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/adapters/aoip" "github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw" "github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast" "github.com/jan/fm-rds-tx/internal/ingest/adapters/srt" @@ -17,9 +20,10 @@ import ( ) type Deps struct { - Stdin io.Reader - HTTP *http.Client - SRTOpener aoiprxkit.SRTConnOpener + Stdin io.Reader + HTTP *http.Client + SRTOpener aoiprxkit.SRTConnOpener + AES67ReceiverFactory aoip.ReceiverFactory } type AudioIngress interface { @@ -66,6 +70,17 @@ func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, err } src := srt.New("srt-main", srtCfg, opts...) return src, nil, nil + case "aes67", "aoip", "aoip-rtp": + aoipCfg, err := buildAES67Config(cfg) + if err != nil { + return nil, nil, err + } + opts := []aoip.Option{} + if deps.AES67ReceiverFactory != nil { + opts = append(opts, aoip.WithReceiverFactory(deps.AES67ReceiverFactory)) + } + src := aoip.New("aes67-main", aoipCfg, opts...) + return src, nil, nil default: return nil, nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind) } @@ -87,6 +102,10 @@ func SampleRateForKind(cfg config.Config) int { if cfg.Ingest.SRT.SampleRateHz > 0 { return cfg.Ingest.SRT.SampleRateHz } + case "aes67", "aoip", "aoip-rtp": + if cfg.Ingest.AES67.SampleRateHz > 0 { + return cfg.Ingest.AES67.SampleRateHz + } } return 44100 } @@ -94,3 +113,62 @@ func SampleRateForKind(cfg config.Config) int { func normalizeIngestKind(kind string) string { return strings.ToLower(strings.TrimSpace(kind)) } + +func buildAES67Config(cfg config.Config) (aoiprxkit.Config, error) { + base := aoiprxkit.DefaultConfig() + ing := cfg.Ingest.AES67 + if strings.TrimSpace(ing.InterfaceName) != "" { + base.InterfaceName = strings.TrimSpace(ing.InterfaceName) + } + if ing.PayloadType >= 0 { + base.PayloadType = uint8(ing.PayloadType) + } + if ing.SampleRateHz > 0 { + base.SampleRateHz = ing.SampleRateHz + } + if ing.Channels > 0 { + base.Channels = ing.Channels + } + if strings.TrimSpace(ing.Encoding) != "" { + base.Encoding = strings.ToUpper(strings.TrimSpace(ing.Encoding)) + } + if ing.PacketTimeMs > 0 { + base.PacketTime = time.Duration(ing.PacketTimeMs) * time.Millisecond + } + if ing.JitterDepthPackets > 0 { + base.JitterDepthPackets = ing.JitterDepthPackets + } + if ing.ReadBufferBytes > 0 { + base.ReadBufferBytes = ing.ReadBufferBytes + } + + sdpText := strings.TrimSpace(ing.SDP) + if sdpText == "" && strings.TrimSpace(ing.SDPPath) != "" { + data, err := os.ReadFile(filepath.Clean(ing.SDPPath)) + if err != nil { + return aoiprxkit.Config{}, fmt.Errorf("read ingest.aes67.sdpPath: %w", err) + } + sdpText = string(data) + } + if sdpText != "" { + info, err := aoiprxkit.ParseMinimalSDP(sdpText) + if err != nil { + return aoiprxkit.Config{}, fmt.Errorf("parse ingest.aes67 SDP: %w", err) + } + parsed, err := aoiprxkit.ConfigFromSDP(base, info) + if err != nil { + return aoiprxkit.Config{}, fmt.Errorf("map ingest.aes67 SDP: %w", err) + } + return parsed, nil + } + if strings.TrimSpace(ing.MulticastGroup) != "" { + base.MulticastGroup = strings.TrimSpace(ing.MulticastGroup) + } + if ing.Port > 0 { + base.Port = ing.Port + } + if err := base.Validate(); err != nil { + return aoiprxkit.Config{}, err + } + return base, nil +} diff --git a/internal/ingest/factory/factory_test.go b/internal/ingest/factory/factory_test.go index 5b147cd..2f43e13 100644 --- a/internal/ingest/factory/factory_test.go +++ b/internal/ingest/factory/factory_test.go @@ -110,6 +110,52 @@ func TestBuildSourceSRT(t *testing.T) { } } +func TestBuildSourceAES67(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "239.69.10.20" + cfg.Ingest.AES67.Port = 5008 + cfg.Ingest.AES67.PayloadType = 98 + cfg.Ingest.AES67.SampleRateHz = 48000 + cfg.Ingest.AES67.Channels = 2 + cfg.Ingest.AES67.Encoding = "L24" + cfg.Ingest.AES67.PacketTimeMs = 1 + cfg.Ingest.AES67.JitterDepthPackets = 6 + + src, ingress, err := BuildSource(cfg, Deps{}) + if err != nil { + t.Fatalf("build source: %v", err) + } + if src == nil { + t.Fatalf("expected source") + } + if ingress != nil { + t.Fatalf("expected no ingress for aes67") + } + if got := src.Descriptor().Kind; got != "aes67" { + t.Fatalf("source kind=%s", got) + } +} + +func TestBuildSourceAES67FromInlineSDP(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "" + cfg.Ingest.AES67.SDP = "v=0\r\ns=demo\r\nc=IN IP4 239.10.20.30\r\nm=audio 5004 RTP/AVP 97\r\na=rtpmap:97 L24/48000/2\r\na=ptime:1\r\n" + + src, _, err := BuildSource(cfg, Deps{}) + if err != nil { + t.Fatalf("build source: %v", err) + } + desc := src.Descriptor() + if desc.Transport != "rtp" { + t.Fatalf("transport=%q want rtp", desc.Transport) + } + if desc.SampleRateHz != 48000 || desc.Channels != 2 { + t.Fatalf("shape=%d/%d", desc.SampleRateHz, desc.Channels) + } +} + func TestBuildSourceUnsupportedKind(t *testing.T) { cfg := config.Default() cfg.Ingest.Kind = "nope" @@ -143,4 +189,10 @@ func TestSampleRateForKind(t *testing.T) { if got := SampleRateForKind(cfg); got != 48000 { t.Fatalf("srt sample rate=%d", got) } + + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.SampleRateHz = 32000 + if got := SampleRateForKind(cfg); got != 32000 { + t.Fatalf("aes67 sample rate=%d", got) + } } diff --git a/internal/ingest/factory/ingest_smoke_test.go b/internal/ingest/factory/ingest_smoke_test.go index e9aed73..b67bb69 100644 --- a/internal/ingest/factory/ingest_smoke_test.go +++ b/internal/ingest/factory/ingest_smoke_test.go @@ -11,12 +11,29 @@ import ( "github.com/jan/fm-rds-tx/internal/audio" "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/ingest" + aoipad "github.com/jan/fm-rds-tx/internal/ingest/adapters/aoip" ) type streamReadCloser struct{ io.Reader } func (r streamReadCloser) Close() error { return nil } +type stubAES67Receiver struct { + onStart func() +} + +func (r *stubAES67Receiver) Start(context.Context) error { + if r.onStart != nil { + r.onStart() + } + return nil +} + +func (r *stubAES67Receiver) Stop() error { return nil } +func (r *stubAES67Receiver) Stats() aoiprxkit.Stats { + return aoiprxkit.Stats{} +} + func TestHTTPRawFactoryToRuntimeSmoke(t *testing.T) { cfg := config.Default() cfg.Ingest.Kind = "http-raw" @@ -120,6 +137,64 @@ func TestSRTFactoryToRuntimeSmoke(t *testing.T) { } } +func TestAES67FactoryToRuntimeSmoke(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "239.10.20.30" + cfg.Ingest.AES67.Port = 5004 + cfg.Ingest.AES67.SampleRateHz = 48000 + cfg.Ingest.AES67.Channels = 2 + cfg.Ingest.AES67.Encoding = "L24" + cfg.Ingest.AES67.PacketTimeMs = 1 + + var frameHandler aoiprxkit.FrameHandler + src, ingress, err := BuildSource(cfg, Deps{ + AES67ReceiverFactory: func(_ aoiprxkit.Config, onFrame aoiprxkit.FrameHandler) (aoipad.ReceiverClient, error) { + frameHandler = onFrame + return &stubAES67Receiver{ + onStart: func() { + frameHandler(aoiprxkit.PCMFrame{ + SequenceNumber: 1, + SampleRateHz: 48000, + Channels: 2, + Samples: []int32{7, -7, 9, -9}, + ReceivedAt: time.Now(), + }) + }, + }, nil + }, + }) + if err != nil { + t.Fatalf("build source: %v", err) + } + if src == nil { + t.Fatalf("expected source for kind=aes67") + } + if ingress != nil { + t.Fatalf("expected no ingress for kind=aes67") + } + + sink := audio.NewStreamSource(128, cfg.Ingest.AES67.SampleRateHz) + rt := ingest.NewRuntime(sink, src) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("runtime start: %v", err) + } + defer rt.Stop() + + waitForSinkFrames(t, sink, 2) + + stats := rt.Stats() + if stats.Active.Kind != "aes67" { + t.Fatalf("active kind=%q want aes67", stats.Active.Kind) + } + if stats.Source.ChunksIn != 1 { + t.Fatalf("source chunksIn=%d want 1", stats.Source.ChunksIn) + } + if stats.Source.SamplesIn != 4 { + t.Fatalf("source samplesIn=%d want 4", stats.Source.SamplesIn) + } +} + func waitForSinkFrames(t *testing.T, sink *audio.StreamSource, minFrames int) { t.Helper() deadline := time.Now().Add(1 * time.Second)