From aa26330147eea71955f115facc939085f7f155ab Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 19:53:10 +0200 Subject: [PATCH] ingest: expose source origin in runtime details --- internal/control/control_test.go | 23 ++++++- internal/ingest/adapters/aoip/source.go | 20 ++++++ internal/ingest/adapters/aoip/source_test.go | 15 ++++- internal/ingest/adapters/icecast/source.go | 20 ++++++ .../ingest/adapters/icecast/source_test.go | 14 +++++ internal/ingest/adapters/srt/source.go | 22 +++++++ internal/ingest/adapters/srt/source_test.go | 14 +++++ internal/ingest/factory/factory.go | 62 ++++++++++++++----- internal/ingest/factory/factory_test.go | 18 ++++++ internal/ingest/types.go | 27 +++++--- 10 files changed, 208 insertions(+), 27 deletions(-) diff --git a/internal/control/control_test.go b/internal/control/control_test.go index a59dcbf..11fa3a8 100644 --- a/internal/control/control_test.go +++ b/internal/control/control_test.go @@ -221,7 +221,14 @@ func TestRuntimeIncludesDetailedIngestSourceAndRuntimeStats(t *testing.T) { srv := NewServer(cfgpkg.Default()) srv.SetIngestRuntime(&fakeIngestRuntime{ stats: ingest.Stats{ - Active: ingest.SourceDescriptor{ID: "icecast-main", Kind: "icecast"}, + Active: ingest.SourceDescriptor{ + ID: "icecast-main", + Kind: "icecast", + Origin: &ingest.SourceOrigin{ + Kind: "url", + Endpoint: "http://example.org/live", + }, + }, Source: ingest.SourceStats{ State: "reconnecting", Connected: false, @@ -261,6 +268,20 @@ func TestRuntimeIncludesDetailedIngestSourceAndRuntimeStats(t *testing.T) { if source["lastError"] != "dial tcp timeout" { t.Fatalf("source lastError mismatch: got %v", source["lastError"]) } + active, ok := ingestPayload["active"].(map[string]any) + if !ok { + t.Fatalf("expected ingest.active map, got %T", ingestPayload["active"]) + } + origin, ok := active["origin"].(map[string]any) + if !ok { + t.Fatalf("expected ingest.active.origin map, got %T", active["origin"]) + } + if origin["kind"] != "url" { + t.Fatalf("origin kind mismatch: got %v", origin["kind"]) + } + if origin["endpoint"] != "http://example.org/live" { + t.Fatalf("origin endpoint mismatch: got %v", origin["endpoint"]) + } runtimePayload, ok := ingestPayload["runtime"].(map[string]any) if !ok { t.Fatalf("expected ingest.runtime map, got %T", ingestPayload["runtime"]) diff --git a/internal/ingest/adapters/aoip/source.go b/internal/ingest/adapters/aoip/source.go index 4324461..24e30f1 100644 --- a/internal/ingest/adapters/aoip/source.go +++ b/internal/ingest/adapters/aoip/source.go @@ -36,12 +36,20 @@ func WithDetail(detail string) Option { } } +func WithOrigin(origin ingest.SourceOrigin) Option { + return func(s *Source) { + clone := origin + s.origin = &clone + } +} + type Source struct { id string cfg aoiprxkit.Config factory ReceiverFactory detail string + origin *ingest.SourceOrigin chunks chan ingest.PCMChunk errs chan error @@ -100,6 +108,17 @@ func (s *Source) Descriptor() ingest.SourceDescriptor { if detail == "" { detail = fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port) } + origin := s.origin + if origin == nil { + origin = &ingest.SourceOrigin{ + Kind: "manual", + } + } + if origin.Endpoint == "" { + copyOrigin := *origin + copyOrigin.Endpoint = fmt.Sprintf("rtp://%s:%d", s.cfg.MulticastGroup, s.cfg.Port) + origin = ©Origin + } return ingest.SourceDescriptor{ ID: s.id, Kind: "aes67", @@ -109,6 +128,7 @@ func (s *Source) Descriptor() ingest.SourceDescriptor { Channels: s.cfg.Channels, SampleRateHz: s.cfg.SampleRateHz, Detail: detail, + Origin: origin, } } diff --git a/internal/ingest/adapters/aoip/source_test.go b/internal/ingest/adapters/aoip/source_test.go index 068ac23..b3462ae 100644 --- a/internal/ingest/adapters/aoip/source_test.go +++ b/internal/ingest/adapters/aoip/source_test.go @@ -118,12 +118,25 @@ func TestSourceDescriptorSupportsDetailOverride(t *testing.T) { Port: 5004, SampleRateHz: 48000, Channels: 2, - }, WithDetail("rtp://239.10.20.30:5004 (SAP s=AES67-MAIN)")) + }, WithDetail("rtp://239.10.20.30:5004 (SAP s=AES67-MAIN)"), WithOrigin(ingest.SourceOrigin{ + Kind: "sap-discovery", + StreamName: "AES67-MAIN", + Endpoint: "rtp://239.10.20.30:5004", + })) desc := src.Descriptor() if desc.Detail != "rtp://239.10.20.30:5004 (SAP s=AES67-MAIN)" { t.Fatalf("detail=%q", desc.Detail) } + if desc.Origin == nil { + t.Fatalf("expected descriptor origin") + } + if desc.Origin.Kind != "sap-discovery" { + t.Fatalf("origin kind=%q", desc.Origin.Kind) + } + if desc.Origin.StreamName != "AES67-MAIN" { + t.Fatalf("origin streamName=%q", desc.Origin.StreamName) + } } func readChunk(t *testing.T, ch <-chan ingest.PCMChunk) ingest.PCMChunk { diff --git a/internal/ingest/adapters/icecast/source.go b/internal/ingest/adapters/icecast/source.go index 028722f..784891d 100644 --- a/internal/ingest/adapters/icecast/source.go +++ b/internal/ingest/adapters/icecast/source.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "net/url" "strings" "sync" "sync/atomic" @@ -115,6 +116,10 @@ func (s *Source) Descriptor() ingest.SourceDescriptor { Transport: "http", Codec: s.decoderPreference, Detail: s.url, + Origin: &ingest.SourceOrigin{ + Kind: "url", + Endpoint: redactURL(s.url), + }, } } @@ -345,3 +350,18 @@ func normalizeDecoderPreference(pref string) string { return strings.ToLower(strings.TrimSpace(pref)) } } + +func redactURL(raw string) string { + trimmed := strings.TrimSpace(raw) + if trimmed == "" { + return "" + } + u, err := url.Parse(trimmed) + if err != nil || u.Host == "" { + return trimmed + } + u.User = nil + u.RawQuery = "" + u.Fragment = "" + return u.String() +} diff --git a/internal/ingest/adapters/icecast/source_test.go b/internal/ingest/adapters/icecast/source_test.go index 162ea89..9984269 100644 --- a/internal/ingest/adapters/icecast/source_test.go +++ b/internal/ingest/adapters/icecast/source_test.go @@ -311,6 +311,20 @@ func TestWithDecoderPreferenceFallbackAliasNormalizesToFFmpeg(t *testing.T) { } } +func TestDescriptorOriginRedactsCredentialsAndQuery(t *testing.T) { + src := New("ice-test", "http://user:secret@example.org:8000/live.mp3?token=abc", nil, ReconnectConfig{}) + desc := src.Descriptor() + if desc.Origin == nil { + t.Fatalf("expected descriptor origin") + } + if desc.Origin.Kind != "url" { + t.Fatalf("origin kind=%q want url", desc.Origin.Kind) + } + if desc.Origin.Endpoint != "http://example.org:8000/live.mp3" { + t.Fatalf("origin endpoint=%q", desc.Origin.Endpoint) + } +} + func TestConnectAndRunRequestsICYAndPublishesStreamTitle(t *testing.T) { const ( audioPrefix = "ABCD" diff --git a/internal/ingest/adapters/srt/source.go b/internal/ingest/adapters/srt/source.go index 327e1ae..af3685d 100644 --- a/internal/ingest/adapters/srt/source.go +++ b/internal/ingest/adapters/srt/source.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "io" + "net/url" + "strings" "sync" "sync/atomic" "time" @@ -96,6 +98,11 @@ func (s *Source) Descriptor() ingest.SourceDescriptor { Channels: s.cfg.Channels, SampleRateHz: s.cfg.SampleRateHz, Detail: s.cfg.URL, + Origin: &ingest.SourceOrigin{ + Kind: "url", + Endpoint: redactURL(s.cfg.URL), + Mode: strings.TrimSpace(s.cfg.Mode), + }, } } @@ -281,3 +288,18 @@ func (s *Source) emitError(err error) { default: } } + +func redactURL(raw string) string { + trimmed := strings.TrimSpace(raw) + if trimmed == "" { + return "" + } + u, err := url.Parse(trimmed) + if err != nil || u.Host == "" { + return trimmed + } + u.User = nil + u.RawQuery = "" + u.Fragment = "" + return u.String() +} diff --git a/internal/ingest/adapters/srt/source_test.go b/internal/ingest/adapters/srt/source_test.go index a4527b1..5393cd4 100644 --- a/internal/ingest/adapters/srt/source_test.go +++ b/internal/ingest/adapters/srt/source_test.go @@ -35,6 +35,20 @@ func TestSourceEmitsChunksFromSRTFrames(t *testing.T) { return readCloser{Reader: bytes.NewReader(stream.Bytes())}, nil })) + desc := src.Descriptor() + if desc.Origin == nil { + t.Fatalf("expected descriptor origin") + } + if desc.Origin.Kind != "url" { + t.Fatalf("origin kind=%q want url", desc.Origin.Kind) + } + if desc.Origin.Endpoint != "srt://127.0.0.1:9000" { + t.Fatalf("origin endpoint=%q", desc.Origin.Endpoint) + } + if desc.Origin.Mode != "listener" { + t.Fatalf("origin mode=%q want listener", desc.Origin.Mode) + } + if err := src.Start(context.Background()); err != nil { t.Fatalf("start: %v", err) } diff --git a/internal/ingest/factory/factory.go b/internal/ingest/factory/factory.go index 8ee79c8..5f8696c 100644 --- a/internal/ingest/factory/factory.go +++ b/internal/ingest/factory/factory.go @@ -83,7 +83,7 @@ func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, err src := srt.New("srt-main", srtCfg, opts...) return src, nil, nil case "aes67", "aoip", "aoip-rtp": - aoipCfg, detail, err := buildAES67Config(cfg, deps) + aoipCfg, detail, origin, err := buildAES67Config(cfg, deps) if err != nil { return nil, nil, err } @@ -94,6 +94,9 @@ func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, err if detail != "" { opts = append(opts, aoip.WithDetail(detail)) } + if origin != nil { + opts = append(opts, aoip.WithOrigin(*origin)) + } src := aoip.New("aes67-main", aoipCfg, opts...) return src, nil, nil default: @@ -129,7 +132,7 @@ func normalizeIngestKind(kind string) string { return strings.ToLower(strings.TrimSpace(kind)) } -func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, error) { +func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, *ingest.SourceOrigin, error) { base := aoiprxkit.DefaultConfig() ing := cfg.Ingest.AES67 if strings.TrimSpace(ing.InterfaceName) != "" { @@ -157,25 +160,32 @@ func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, e base.ReadBufferBytes = ing.ReadBufferBytes } - sdpText, discoveredStreamName, err := resolveAES67SDP(ing, deps) + sdpText, discoveredStreamName, origin, err := resolveAES67SDP(ing, deps) if err != nil { - return aoiprxkit.Config{}, "", err + return aoiprxkit.Config{}, "", nil, err } if sdpText != "" { info, err := aoiprxkit.ParseMinimalSDP(sdpText) if err != nil { - return aoiprxkit.Config{}, "", fmt.Errorf("parse ingest.aes67 SDP: %w", err) + return aoiprxkit.Config{}, "", nil, fmt.Errorf("parse ingest.aes67 SDP: %w", err) } parsed, err := aoiprxkit.ConfigFromSDP(base, info) if err != nil { - return aoiprxkit.Config{}, "", fmt.Errorf("map ingest.aes67 SDP: %w", err) + return aoiprxkit.Config{}, "", nil, fmt.Errorf("map ingest.aes67 SDP: %w", err) } detail := "" + endpoint := fmt.Sprintf("rtp://%s:%d", parsed.MulticastGroup, parsed.Port) if discoveredStreamName != "" { detail = fmt.Sprintf("rtp://%s:%d (SAP s=%s)", parsed.MulticastGroup, parsed.Port, discoveredStreamName) } - return parsed, detail, nil + if origin == nil { + origin = &ingest.SourceOrigin{} + } + if origin.Endpoint == "" { + origin.Endpoint = endpoint + } + return parsed, detail, origin, nil } if strings.TrimSpace(ing.MulticastGroup) != "" { base.MulticastGroup = strings.TrimSpace(ing.MulticastGroup) @@ -184,27 +194,42 @@ func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, e base.Port = ing.Port } if err := base.Validate(); err != nil { - return aoiprxkit.Config{}, "", err + return aoiprxkit.Config{}, "", nil, err + } + if origin == nil { + origin = &ingest.SourceOrigin{Kind: "manual"} + } + if origin.Endpoint == "" { + origin.Endpoint = fmt.Sprintf("rtp://%s:%d", base.MulticastGroup, base.Port) } - return base, "", nil + return base, "", origin, nil } -func resolveAES67SDP(ing config.IngestAES67Config, deps Deps) (string, string, error) { +func resolveAES67SDP(ing config.IngestAES67Config, deps Deps) (string, string, *ingest.SourceOrigin, error) { sdpText := strings.TrimSpace(ing.SDP) if sdpText == "" && strings.TrimSpace(ing.SDPPath) != "" { - data, err := os.ReadFile(filepath.Clean(ing.SDPPath)) + sdpPath := filepath.Clean(ing.SDPPath) + data, err := os.ReadFile(sdpPath) if err != nil { - return "", "", fmt.Errorf("read ingest.aes67.sdpPath: %w", err) + return "", "", nil, fmt.Errorf("read ingest.aes67.sdpPath: %w", err) } sdpText = string(data) + return sdpText, "", &ingest.SourceOrigin{ + Kind: "sdp-file", + SDPPath: sdpPath, + }, nil } if sdpText != "" { - return sdpText, "", nil + return sdpText, "", &ingest.SourceOrigin{ + Kind: "sdp-inline", + }, nil } discoveryEnabled := ing.Discovery.Enabled || strings.TrimSpace(ing.Discovery.StreamName) != "" if !discoveryEnabled { - return "", "", nil + return "", "", &ingest.SourceOrigin{ + Kind: "manual", + }, nil } timeout := time.Duration(ing.Discovery.TimeoutMs) * time.Millisecond if timeout <= 0 { @@ -223,12 +248,15 @@ func resolveAES67SDP(ing config.IngestAES67Config, deps Deps) (string, string, e } announcement, err := discover(context.Background(), req) if err != nil { - return "", "", fmt.Errorf("discover ingest.aes67 stream %q via SAP: %w", req.StreamName, err) + return "", "", nil, fmt.Errorf("discover ingest.aes67 stream %q via SAP: %w", req.StreamName, err) } if strings.TrimSpace(announcement.SDP) == "" { - return "", "", fmt.Errorf("discover ingest.aes67 stream %q via SAP: empty SDP payload", req.StreamName) + return "", "", nil, fmt.Errorf("discover ingest.aes67 stream %q via SAP: empty SDP payload", req.StreamName) } - return announcement.SDP, req.StreamName, nil + return announcement.SDP, req.StreamName, &ingest.SourceOrigin{ + Kind: "sap-discovery", + StreamName: req.StreamName, + }, nil } func discoverAES67ViaSAP(ctx context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error) { diff --git a/internal/ingest/factory/factory_test.go b/internal/ingest/factory/factory_test.go index a506d9f..4b703dd 100644 --- a/internal/ingest/factory/factory_test.go +++ b/internal/ingest/factory/factory_test.go @@ -90,6 +90,9 @@ func TestBuildSourceIcecastUsesDecoderPreference(t *testing.T) { if got := src.Descriptor().Codec; got != "ffmpeg" { t.Fatalf("codec=%s want ffmpeg", got) } + if got := src.Descriptor().Origin; got == nil || got.Kind != "url" { + t.Fatalf("expected icecast origin kind url, got %+v", got) + } } func TestBuildSourceSRT(t *testing.T) { @@ -113,6 +116,9 @@ func TestBuildSourceSRT(t *testing.T) { if got := src.Descriptor().Kind; got != "srt" { t.Fatalf("source kind=%s", got) } + if got := src.Descriptor().Origin; got == nil || got.Kind != "url" || got.Mode != "listener" { + t.Fatalf("expected srt origin url/listener, got %+v", got) + } } func TestBuildSourceAES67(t *testing.T) { @@ -159,6 +165,12 @@ func TestBuildSourceAES67FromInlineSDP(t *testing.T) { if desc.SampleRateHz != 48000 || desc.Channels != 2 { t.Fatalf("shape=%d/%d", desc.SampleRateHz, desc.Channels) } + if desc.Origin == nil || desc.Origin.Kind != "sdp-inline" { + t.Fatalf("origin=%+v want sdp-inline", desc.Origin) + } + if desc.Origin.Endpoint != "rtp://239.10.20.30:5004" { + t.Fatalf("origin endpoint=%q", desc.Origin.Endpoint) + } } func TestBuildSourceAES67WithDiscovery(t *testing.T) { @@ -191,6 +203,12 @@ func TestBuildSourceAES67WithDiscovery(t *testing.T) { if desc.Detail != "rtp://239.10.20.30:5004 (SAP s=AES67-MAIN)" { t.Fatalf("descriptor detail=%q", desc.Detail) } + if desc.Origin == nil || desc.Origin.Kind != "sap-discovery" { + t.Fatalf("origin=%+v want sap-discovery", desc.Origin) + } + if desc.Origin.StreamName != "AES67-MAIN" { + t.Fatalf("origin streamName=%q", desc.Origin.StreamName) + } } func TestBuildSourceAES67DiscoveryError(t *testing.T) { diff --git a/internal/ingest/types.go b/internal/ingest/types.go index bae4801..92d95eb 100644 --- a/internal/ingest/types.go +++ b/internal/ingest/types.go @@ -15,12 +15,23 @@ type PCMChunk struct { } type SourceDescriptor struct { - ID string `json:"id"` - Kind string `json:"kind"` - Family string `json:"family"` - Transport string `json:"transport"` - Codec string `json:"codec"` - Channels int `json:"channels"` - SampleRateHz int `json:"sampleRateHz"` - Detail string `json:"detail,omitempty"` + ID string `json:"id"` + Kind string `json:"kind"` + Family string `json:"family"` + Transport string `json:"transport"` + Codec string `json:"codec"` + Channels int `json:"channels"` + SampleRateHz int `json:"sampleRateHz"` + Detail string `json:"detail,omitempty"` + Origin *SourceOrigin `json:"origin,omitempty"` +} + +// SourceOrigin describes where an ingest source definition came from and +// which endpoint it resolved to, so control/runtime can show provenance. +type SourceOrigin struct { + Kind string `json:"kind,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + Mode string `json:"mode,omitempty"` + StreamName string `json:"streamName,omitempty"` + SDPPath string `json:"sdpPath,omitempty"` }