package factory import ( "context" "fmt" "io" "net/http" "os" "path/filepath" "strings" "time" "aoiprxkit" "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/adapters/aoip" "github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw" "github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast" "github.com/jan/fm-rds-tx/internal/ingest/adapters/srt" "github.com/jan/fm-rds-tx/internal/ingest/adapters/stdinpcm" ) type Deps struct { Stdin io.Reader HTTP *http.Client SRTOpener aoiprxkit.SRTConnOpener AES67ReceiverFactory aoip.ReceiverFactory AES67Discover AES67DiscoverFunc } type AudioIngress interface { WritePCM16(data []byte) (int, error) } type AES67DiscoverRequest struct { StreamName string Timeout time.Duration InterfaceName string SAPGroup string SAPPort int } type AES67DiscoverFunc func(ctx context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error) func BuildSource(ctx context.Context, cfg config.Config, deps Deps) (ingest.Source, AudioIngress, error) { switch normalizeIngestKind(cfg.Ingest.Kind) { case "", "none": return nil, nil, nil case "stdin", "stdin-pcm": reader := deps.Stdin if reader == nil { reader = os.Stdin } src := stdinpcm.New("stdin-main", reader, cfg.Ingest.Stdin.SampleRateHz, cfg.Ingest.Stdin.Channels, 1024) return src, nil, nil case "http-raw": src := httpraw.New("http-raw-main", cfg.Ingest.HTTPRaw.SampleRateHz, cfg.Ingest.HTTPRaw.Channels) return src, src, nil case "icecast": src := icecast.New( "icecast-main", cfg.Ingest.Icecast.URL, deps.HTTP, icecast.ReconnectConfig{ Enabled: cfg.Ingest.Reconnect.Enabled, InitialBackoffMs: cfg.Ingest.Reconnect.InitialBackoffMs, MaxBackoffMs: cfg.Ingest.Reconnect.MaxBackoffMs, }, icecast.WithDecoderPreference(cfg.Ingest.Icecast.Decoder), ) return src, nil, nil case "srt": srtCfg := aoiprxkit.SRTConfig{ URL: cfg.Ingest.SRT.URL, Mode: cfg.Ingest.SRT.Mode, SampleRateHz: cfg.Ingest.SRT.SampleRateHz, Channels: cfg.Ingest.SRT.Channels, } opts := []srt.Option{} if deps.SRTOpener != nil { opts = append(opts, srt.WithConnOpener(deps.SRTOpener)) } src := srt.New("srt-main", srtCfg, opts...) return src, nil, nil case "aes67", "aoip", "aoip-rtp": aoipCfg, detail, origin, err := buildAES67Config(ctx, cfg, deps) if err != nil { return nil, nil, err } opts := []aoip.Option{} if deps.AES67ReceiverFactory != nil { opts = append(opts, aoip.WithReceiverFactory(deps.AES67ReceiverFactory)) } if detail != "" { opts = append(opts, aoip.WithDetail(detail)) } if origin != nil { opts = append(opts, aoip.WithOrigin(*origin)) } src := aoip.New("aes67-main", aoipCfg, opts...) return src, nil, nil default: return nil, nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind) } } func SampleRateForKind(cfg config.Config) int { switch normalizeIngestKind(cfg.Ingest.Kind) { case "stdin", "stdin-pcm": if cfg.Ingest.Stdin.SampleRateHz > 0 { return cfg.Ingest.Stdin.SampleRateHz } case "http-raw": if cfg.Ingest.HTTPRaw.SampleRateHz > 0 { return cfg.Ingest.HTTPRaw.SampleRateHz } case "icecast": // 48000 Hz is the most common rate for modern Icecast streams. // The ingest runtime will auto-correct to the actual decoded rate // after the first PCM chunk arrives (see runtime.go handleChunk). return 48000 case "srt": if cfg.Ingest.SRT.SampleRateHz > 0 { return cfg.Ingest.SRT.SampleRateHz } case "aes67", "aoip", "aoip-rtp": if cfg.Ingest.AES67.SampleRateHz > 0 { return cfg.Ingest.AES67.SampleRateHz } } // Default to 48000 Hz: the correct rate for professional sources // (SRT, AES67) and modern streams. The ingest runtime corrects this // dynamically from the first decoded chunk for compressed sources. return 48000 } func normalizeIngestKind(kind string) string { return strings.ToLower(strings.TrimSpace(kind)) } func buildAES67Config(ctx context.Context, cfg config.Config, deps Deps) (aoiprxkit.Config, string, *ingest.SourceOrigin, error) { base := aoiprxkit.DefaultConfig() ing := cfg.Ingest.AES67 if strings.TrimSpace(ing.InterfaceName) != "" { base.InterfaceName = strings.TrimSpace(ing.InterfaceName) } if ing.PayloadType >= 0 { base.PayloadType = uint8(ing.PayloadType) } if ing.SampleRateHz > 0 { base.SampleRateHz = ing.SampleRateHz } if ing.Channels > 0 { base.Channels = ing.Channels } if strings.TrimSpace(ing.Encoding) != "" { base.Encoding = strings.ToUpper(strings.TrimSpace(ing.Encoding)) } if ing.PacketTimeMs > 0 { base.PacketTime = time.Duration(ing.PacketTimeMs) * time.Millisecond } if ing.JitterDepthPackets > 0 { base.JitterDepthPackets = ing.JitterDepthPackets } if ing.ReadBufferBytes > 0 { base.ReadBufferBytes = ing.ReadBufferBytes } sdpText, discoveredStreamName, origin, err := resolveAES67SDP(ctx, ing, deps) if err != nil { return aoiprxkit.Config{}, "", nil, err } if sdpText != "" { info, err := aoiprxkit.ParseMinimalSDP(sdpText) if err != nil { return aoiprxkit.Config{}, "", nil, fmt.Errorf("parse ingest.aes67 SDP: %w", err) } parsed, err := aoiprxkit.ConfigFromSDP(base, info) if err != nil { return aoiprxkit.Config{}, "", nil, fmt.Errorf("map ingest.aes67 SDP: %w", err) } detail := "" endpoint := fmt.Sprintf("rtp://%s:%d", parsed.MulticastGroup, parsed.Port) if discoveredStreamName != "" { detail = fmt.Sprintf("rtp://%s:%d (SAP s=%s)", parsed.MulticastGroup, parsed.Port, discoveredStreamName) } if origin == nil { origin = &ingest.SourceOrigin{} } if origin.Endpoint == "" { origin.Endpoint = endpoint } return parsed, detail, origin, nil } if strings.TrimSpace(ing.MulticastGroup) != "" { base.MulticastGroup = strings.TrimSpace(ing.MulticastGroup) } if ing.Port > 0 { base.Port = ing.Port } if err := base.Validate(); err != nil { return aoiprxkit.Config{}, "", nil, err } if origin == nil { origin = &ingest.SourceOrigin{Kind: "manual"} } if origin.Endpoint == "" { origin.Endpoint = fmt.Sprintf("rtp://%s:%d", base.MulticastGroup, base.Port) } return base, "", origin, nil } func resolveAES67SDP(ctx context.Context, ing config.IngestAES67Config, deps Deps) (string, string, *ingest.SourceOrigin, error) { sdpText := strings.TrimSpace(ing.SDP) if sdpText == "" && strings.TrimSpace(ing.SDPPath) != "" { sdpPath := filepath.Clean(ing.SDPPath) data, err := os.ReadFile(sdpPath) if err != nil { return "", "", nil, fmt.Errorf("read ingest.aes67.sdpPath: %w", err) } sdpText = string(data) return sdpText, "", &ingest.SourceOrigin{ Kind: "sdp-file", SDPPath: sdpPath, }, nil } if sdpText != "" { return sdpText, "", &ingest.SourceOrigin{ Kind: "sdp-inline", }, nil } discoveryEnabled := ing.Discovery.Enabled || strings.TrimSpace(ing.Discovery.StreamName) != "" if !discoveryEnabled { return "", "", &ingest.SourceOrigin{ Kind: "manual", }, nil } timeout := time.Duration(ing.Discovery.TimeoutMs) * time.Millisecond if timeout <= 0 { timeout = 3 * time.Second } req := AES67DiscoverRequest{ StreamName: strings.TrimSpace(ing.Discovery.StreamName), Timeout: timeout, InterfaceName: strings.TrimSpace(ing.Discovery.InterfaceName), SAPGroup: strings.TrimSpace(ing.Discovery.SAPGroup), SAPPort: ing.Discovery.SAPPort, } discover := deps.AES67Discover if discover == nil { discover = discoverAES67ViaSAP } announcement, err := discover(ctx, req) if err != nil { return "", "", nil, fmt.Errorf("discover ingest.aes67 stream %q via SAP: %w", req.StreamName, err) } if strings.TrimSpace(announcement.SDP) == "" { return "", "", nil, fmt.Errorf("discover ingest.aes67 stream %q via SAP: empty SDP payload", req.StreamName) } return announcement.SDP, req.StreamName, &ingest.SourceOrigin{ Kind: "sap-discovery", StreamName: req.StreamName, }, nil } func discoverAES67ViaSAP(ctx context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error) { if req.StreamName == "" { return aoiprxkit.SAPAnnouncement{}, fmt.Errorf("stream name must not be empty") } listenerCfg := aoiprxkit.DefaultSAPListenerConfig() if req.InterfaceName != "" { listenerCfg.InterfaceName = req.InterfaceName } if req.SAPGroup != "" { listenerCfg.Group = req.SAPGroup } if req.SAPPort > 0 { listenerCfg.Port = req.SAPPort } sf, err := aoiprxkit.NewStreamFinder(listenerCfg) if err != nil { return aoiprxkit.SAPAnnouncement{}, err } if err := sf.Start(ctx); err != nil { return aoiprxkit.SAPAnnouncement{}, err } defer sf.Stop() waitCtx, cancel := context.WithTimeout(ctx, req.Timeout) defer cancel() return sf.WaitForStreamName(waitCtx, req.StreamName) }