diff --git a/internal/config/config.go b/internal/config/config.go index d890d4c..2d469ca 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -114,18 +114,28 @@ type IngestSRTConfig struct { } type IngestAES67Config struct { - SDPPath string `json:"sdpPath"` - SDP string `json:"sdp"` - MulticastGroup string `json:"multicastGroup"` - Port int `json:"port"` - InterfaceName string `json:"interfaceName"` - PayloadType int `json:"payloadType"` - SampleRateHz int `json:"sampleRateHz"` - Channels int `json:"channels"` - Encoding string `json:"encoding"` - PacketTimeMs int `json:"packetTimeMs"` - JitterDepthPackets int `json:"jitterDepthPackets"` - ReadBufferBytes int `json:"readBufferBytes"` + SDPPath string `json:"sdpPath"` + SDP string `json:"sdp"` + Discovery IngestAES67DiscoveryConfig `json:"discovery"` + MulticastGroup string `json:"multicastGroup"` + Port int `json:"port"` + InterfaceName string `json:"interfaceName"` + PayloadType int `json:"payloadType"` + SampleRateHz int `json:"sampleRateHz"` + Channels int `json:"channels"` + Encoding string `json:"encoding"` + PacketTimeMs int `json:"packetTimeMs"` + JitterDepthPackets int `json:"jitterDepthPackets"` + ReadBufferBytes int `json:"readBufferBytes"` +} + +type IngestAES67DiscoveryConfig struct { + Enabled bool `json:"enabled"` + StreamName string `json:"streamName"` + TimeoutMs int `json:"timeoutMs"` + InterfaceName string `json:"interfaceName"` + SAPGroup string `json:"sapGroup"` + SAPPort int `json:"sapPort"` } func Default() Config { @@ -182,6 +192,9 @@ func Default() Config { Channels: 2, }, AES67: IngestAES67Config{ + Discovery: IngestAES67DiscoveryConfig{ + TimeoutMs: 3000, + }, PayloadType: 97, SampleRateHz: 48000, Channels: 2, @@ -318,17 +331,30 @@ func (c Config) Validate() error { if ingestKind == "aes67" || ingestKind == "aoip" || ingestKind == "aoip-rtp" { hasSDP := strings.TrimSpace(c.Ingest.AES67.SDP) != "" hasSDPPath := strings.TrimSpace(c.Ingest.AES67.SDPPath) != "" + discoveryEnabled := c.Ingest.AES67.Discovery.Enabled || strings.TrimSpace(c.Ingest.AES67.Discovery.StreamName) != "" if hasSDP && hasSDPPath { return fmt.Errorf("ingest.aes67.sdp and ingest.aes67.sdpPath are mutually exclusive") } if !hasSDP && !hasSDPPath { - if strings.TrimSpace(c.Ingest.AES67.MulticastGroup) == "" { + if strings.TrimSpace(c.Ingest.AES67.MulticastGroup) == "" && !discoveryEnabled { return fmt.Errorf("ingest.aes67.multicastGroup is required when ingest.kind=%s", ingestKind) } - if c.Ingest.AES67.Port <= 0 || c.Ingest.AES67.Port > 65535 { + if (c.Ingest.AES67.Port <= 0 || c.Ingest.AES67.Port > 65535) && !discoveryEnabled { return fmt.Errorf("ingest.aes67.port must be 1..65535") } } + if c.Ingest.AES67.Discovery.TimeoutMs < 0 { + return fmt.Errorf("ingest.aes67.discovery.timeoutMs must be >= 0") + } + if c.Ingest.AES67.Discovery.SAPPort < 0 || c.Ingest.AES67.Discovery.SAPPort > 65535 { + return fmt.Errorf("ingest.aes67.discovery.sapPort must be 0..65535") + } + if discoveryEnabled && strings.TrimSpace(c.Ingest.AES67.Discovery.StreamName) == "" { + return fmt.Errorf("ingest.aes67.discovery.streamName is required when discovery is enabled") + } + if discoveryEnabled && c.Ingest.AES67.Port > 65535 { + return fmt.Errorf("ingest.aes67.port must be 1..65535") + } if c.Ingest.AES67.PayloadType < 0 || c.Ingest.AES67.PayloadType > 127 { return fmt.Errorf("ingest.aes67.payloadType must be 0..127") } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 3376cc8..0c0cd5a 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -203,6 +203,41 @@ func TestValidateAcceptsAES67WithSDPOnly(t *testing.T) { } } +func TestValidateAcceptsAES67WithDiscoveryOnly(t *testing.T) { + cfg := Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "" + cfg.Ingest.AES67.Port = 0 + cfg.Ingest.AES67.Discovery.StreamName = "AES67-MAIN" + if err := cfg.Validate(); err != nil { + t.Fatalf("expected aes67 discovery config to validate: %v", err) + } +} + +func TestValidateRejectsAES67DiscoveryWithoutStreamName(t *testing.T) { + cfg := Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "" + cfg.Ingest.AES67.Port = 0 + cfg.Ingest.AES67.Discovery.Enabled = true + cfg.Ingest.AES67.Discovery.StreamName = "" + if err := cfg.Validate(); err == nil { + t.Fatal("expected discovery streamName validation error") + } +} + +func TestValidateRejectsAES67DiscoverySAPPortOutOfRange(t *testing.T) { + cfg := Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "" + cfg.Ingest.AES67.Port = 0 + cfg.Ingest.AES67.Discovery.StreamName = "AES67-MAIN" + cfg.Ingest.AES67.Discovery.SAPPort = 70000 + if err := cfg.Validate(); err == nil { + t.Fatal("expected discovery sapPort validation error") + } +} + func TestValidateRejectsUnsupportedIngestPCMShape(t *testing.T) { cfg := Default() cfg.Ingest.Stdin.SampleRateHz = 0 diff --git a/internal/ingest/adapters/aoip/source.go b/internal/ingest/adapters/aoip/source.go index 3d24346..4324461 100644 --- a/internal/ingest/adapters/aoip/source.go +++ b/internal/ingest/adapters/aoip/source.go @@ -30,11 +30,18 @@ func WithReceiverFactory(factory ReceiverFactory) Option { } } +func WithDetail(detail string) Option { + return func(s *Source) { + s.detail = detail + } +} + type Source struct { id string cfg aoiprxkit.Config factory ReceiverFactory + detail string chunks chan ingest.PCMChunk errs chan error @@ -89,6 +96,10 @@ func New(id string, cfg aoiprxkit.Config, opts ...Option) *Source { } func (s *Source) Descriptor() ingest.SourceDescriptor { + detail := s.detail + if detail == "" { + detail = fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port) + } return ingest.SourceDescriptor{ ID: s.id, Kind: "aes67", @@ -97,7 +108,7 @@ func (s *Source) Descriptor() ingest.SourceDescriptor { Codec: "l24", Channels: s.cfg.Channels, SampleRateHz: s.cfg.SampleRateHz, - Detail: fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port), + Detail: detail, } } diff --git a/internal/ingest/adapters/aoip/source_test.go b/internal/ingest/adapters/aoip/source_test.go index 434f6cf..068ac23 100644 --- a/internal/ingest/adapters/aoip/source_test.go +++ b/internal/ingest/adapters/aoip/source_test.go @@ -112,6 +112,20 @@ func TestSourceEmitsChunksAndMapsStats(t *testing.T) { } } +func TestSourceDescriptorSupportsDetailOverride(t *testing.T) { + src := New("aes67-test", aoiprxkit.Config{ + MulticastGroup: "239.10.20.30", + Port: 5004, + SampleRateHz: 48000, + Channels: 2, + }, WithDetail("rtp://239.10.20.30:5004 (SAP s=AES67-MAIN)")) + + desc := src.Descriptor() + if desc.Detail != "rtp://239.10.20.30:5004 (SAP s=AES67-MAIN)" { + t.Fatalf("detail=%q", desc.Detail) + } +} + func readChunk(t *testing.T, ch <-chan ingest.PCMChunk) ingest.PCMChunk { t.Helper() select { diff --git a/internal/ingest/factory/factory.go b/internal/ingest/factory/factory.go index 85fd34a..8ee79c8 100644 --- a/internal/ingest/factory/factory.go +++ b/internal/ingest/factory/factory.go @@ -1,6 +1,7 @@ package factory import ( + "context" "fmt" "io" "net/http" @@ -24,12 +25,23 @@ type Deps struct { HTTP *http.Client SRTOpener aoiprxkit.SRTConnOpener AES67ReceiverFactory aoip.ReceiverFactory + AES67Discover AES67DiscoverFunc } type AudioIngress interface { WritePCM16(data []byte) (int, error) } +type AES67DiscoverRequest struct { + StreamName string + Timeout time.Duration + InterfaceName string + SAPGroup string + SAPPort int +} + +type AES67DiscoverFunc func(ctx context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error) + func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, error) { switch normalizeIngestKind(cfg.Ingest.Kind) { case "", "none": @@ -71,7 +83,7 @@ func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, err src := srt.New("srt-main", srtCfg, opts...) return src, nil, nil case "aes67", "aoip", "aoip-rtp": - aoipCfg, err := buildAES67Config(cfg) + aoipCfg, detail, err := buildAES67Config(cfg, deps) if err != nil { return nil, nil, err } @@ -79,6 +91,9 @@ func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, err if deps.AES67ReceiverFactory != nil { opts = append(opts, aoip.WithReceiverFactory(deps.AES67ReceiverFactory)) } + if detail != "" { + opts = append(opts, aoip.WithDetail(detail)) + } src := aoip.New("aes67-main", aoipCfg, opts...) return src, nil, nil default: @@ -114,7 +129,7 @@ func normalizeIngestKind(kind string) string { return strings.ToLower(strings.TrimSpace(kind)) } -func buildAES67Config(cfg config.Config) (aoiprxkit.Config, error) { +func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, error) { base := aoiprxkit.DefaultConfig() ing := cfg.Ingest.AES67 if strings.TrimSpace(ing.InterfaceName) != "" { @@ -142,24 +157,25 @@ func buildAES67Config(cfg config.Config) (aoiprxkit.Config, error) { base.ReadBufferBytes = ing.ReadBufferBytes } - sdpText := strings.TrimSpace(ing.SDP) - if sdpText == "" && strings.TrimSpace(ing.SDPPath) != "" { - data, err := os.ReadFile(filepath.Clean(ing.SDPPath)) - if err != nil { - return aoiprxkit.Config{}, fmt.Errorf("read ingest.aes67.sdpPath: %w", err) - } - sdpText = string(data) + sdpText, discoveredStreamName, err := resolveAES67SDP(ing, deps) + if err != nil { + return aoiprxkit.Config{}, "", err } + if sdpText != "" { info, err := aoiprxkit.ParseMinimalSDP(sdpText) if err != nil { - return aoiprxkit.Config{}, fmt.Errorf("parse ingest.aes67 SDP: %w", err) + return aoiprxkit.Config{}, "", fmt.Errorf("parse ingest.aes67 SDP: %w", err) } parsed, err := aoiprxkit.ConfigFromSDP(base, info) if err != nil { - return aoiprxkit.Config{}, fmt.Errorf("map ingest.aes67 SDP: %w", err) + return aoiprxkit.Config{}, "", fmt.Errorf("map ingest.aes67 SDP: %w", err) + } + detail := "" + if discoveredStreamName != "" { + detail = fmt.Sprintf("rtp://%s:%d (SAP s=%s)", parsed.MulticastGroup, parsed.Port, discoveredStreamName) } - return parsed, nil + return parsed, detail, nil } if strings.TrimSpace(ing.MulticastGroup) != "" { base.MulticastGroup = strings.TrimSpace(ing.MulticastGroup) @@ -168,7 +184,77 @@ func buildAES67Config(cfg config.Config) (aoiprxkit.Config, error) { base.Port = ing.Port } if err := base.Validate(); err != nil { - return aoiprxkit.Config{}, err + return aoiprxkit.Config{}, "", err } - return base, nil + return base, "", nil +} + +func resolveAES67SDP(ing config.IngestAES67Config, deps Deps) (string, string, error) { + sdpText := strings.TrimSpace(ing.SDP) + if sdpText == "" && strings.TrimSpace(ing.SDPPath) != "" { + data, err := os.ReadFile(filepath.Clean(ing.SDPPath)) + if err != nil { + return "", "", fmt.Errorf("read ingest.aes67.sdpPath: %w", err) + } + sdpText = string(data) + } + if sdpText != "" { + return sdpText, "", nil + } + + discoveryEnabled := ing.Discovery.Enabled || strings.TrimSpace(ing.Discovery.StreamName) != "" + if !discoveryEnabled { + return "", "", nil + } + timeout := time.Duration(ing.Discovery.TimeoutMs) * time.Millisecond + if timeout <= 0 { + timeout = 3 * time.Second + } + req := AES67DiscoverRequest{ + StreamName: strings.TrimSpace(ing.Discovery.StreamName), + Timeout: timeout, + InterfaceName: strings.TrimSpace(ing.Discovery.InterfaceName), + SAPGroup: strings.TrimSpace(ing.Discovery.SAPGroup), + SAPPort: ing.Discovery.SAPPort, + } + discover := deps.AES67Discover + if discover == nil { + discover = discoverAES67ViaSAP + } + announcement, err := discover(context.Background(), req) + if err != nil { + return "", "", fmt.Errorf("discover ingest.aes67 stream %q via SAP: %w", req.StreamName, err) + } + if strings.TrimSpace(announcement.SDP) == "" { + return "", "", fmt.Errorf("discover ingest.aes67 stream %q via SAP: empty SDP payload", req.StreamName) + } + return announcement.SDP, req.StreamName, nil +} + +func discoverAES67ViaSAP(ctx context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error) { + if req.StreamName == "" { + return aoiprxkit.SAPAnnouncement{}, fmt.Errorf("stream name must not be empty") + } + listenerCfg := aoiprxkit.DefaultSAPListenerConfig() + if req.InterfaceName != "" { + listenerCfg.InterfaceName = req.InterfaceName + } + if req.SAPGroup != "" { + listenerCfg.Group = req.SAPGroup + } + if req.SAPPort > 0 { + listenerCfg.Port = req.SAPPort + } + sf, err := aoiprxkit.NewStreamFinder(listenerCfg) + if err != nil { + return aoiprxkit.SAPAnnouncement{}, err + } + if err := sf.Start(ctx); err != nil { + return aoiprxkit.SAPAnnouncement{}, err + } + defer sf.Stop() + + waitCtx, cancel := context.WithTimeout(ctx, req.Timeout) + defer cancel() + return sf.WaitForStreamName(waitCtx, req.StreamName) } diff --git a/internal/ingest/factory/factory_test.go b/internal/ingest/factory/factory_test.go index 2f43e13..a506d9f 100644 --- a/internal/ingest/factory/factory_test.go +++ b/internal/ingest/factory/factory_test.go @@ -2,7 +2,12 @@ package factory import ( "bytes" + "context" + "errors" "testing" + "time" + + "aoiprxkit" "github.com/jan/fm-rds-tx/internal/config" ) @@ -156,6 +161,56 @@ func TestBuildSourceAES67FromInlineSDP(t *testing.T) { } } +func TestBuildSourceAES67WithDiscovery(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "" + cfg.Ingest.AES67.Port = 0 + cfg.Ingest.AES67.Discovery.StreamName = "AES67-MAIN" + cfg.Ingest.AES67.Discovery.TimeoutMs = 1500 + + var gotReq AES67DiscoverRequest + src, _, err := BuildSource(cfg, Deps{ + AES67Discover: func(_ context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error) { + gotReq = req + return aoiprxkit.SAPAnnouncement{ + SDP: "v=0\r\ns=AES67-MAIN\r\nc=IN IP4 239.10.20.30\r\nm=audio 5004 RTP/AVP 97\r\na=rtpmap:97 L24/48000/2\r\na=ptime:1\r\n", + }, nil + }, + }) + if err != nil { + t.Fatalf("build source: %v", err) + } + if gotReq.StreamName != "AES67-MAIN" { + t.Fatalf("discovery streamName=%q want AES67-MAIN", gotReq.StreamName) + } + if gotReq.Timeout != 1500*time.Millisecond { + t.Fatalf("discovery timeout=%s want 1500ms", gotReq.Timeout) + } + desc := src.Descriptor() + if desc.Detail != "rtp://239.10.20.30:5004 (SAP s=AES67-MAIN)" { + t.Fatalf("descriptor detail=%q", desc.Detail) + } +} + +func TestBuildSourceAES67DiscoveryError(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "aes67" + cfg.Ingest.AES67.MulticastGroup = "" + cfg.Ingest.AES67.Port = 0 + cfg.Ingest.AES67.Discovery.StreamName = "AES67-MAIN" + + _, _, err := BuildSource(cfg, Deps{ + AES67Discover: func(_ context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error) { + _ = req + return aoiprxkit.SAPAnnouncement{}, errors.New("timeout") + }, + }) + if err == nil { + t.Fatalf("expected discovery error") + } +} + func TestBuildSourceUnsupportedKind(t *testing.T) { cfg := config.Default() cfg.Ingest.Kind = "nope"