Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
25'ten fazla konu seçemezsiniz. Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

295 satır
8.7KB

  1. package factory
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "time"
  11. "aoiprxkit"
  12. "github.com/jan/fm-rds-tx/internal/config"
  13. "github.com/jan/fm-rds-tx/internal/ingest"
  14. "github.com/jan/fm-rds-tx/internal/ingest/adapters/aoip"
  15. "github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw"
  16. "github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast"
  17. "github.com/jan/fm-rds-tx/internal/ingest/adapters/srt"
  18. "github.com/jan/fm-rds-tx/internal/ingest/adapters/stdinpcm"
  19. )
  20. type Deps struct {
  21. Stdin io.Reader
  22. HTTP *http.Client
  23. SRTOpener aoiprxkit.SRTConnOpener
  24. AES67ReceiverFactory aoip.ReceiverFactory
  25. AES67Discover AES67DiscoverFunc
  26. }
  27. type AudioIngress interface {
  28. WritePCM16(data []byte) (int, error)
  29. }
  30. type AES67DiscoverRequest struct {
  31. StreamName string
  32. Timeout time.Duration
  33. InterfaceName string
  34. SAPGroup string
  35. SAPPort int
  36. }
  37. type AES67DiscoverFunc func(ctx context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error)
  38. func BuildSource(ctx context.Context, cfg config.Config, deps Deps) (ingest.Source, AudioIngress, error) {
  39. switch normalizeIngestKind(cfg.Ingest.Kind) {
  40. case "", "none":
  41. return nil, nil, nil
  42. case "stdin", "stdin-pcm":
  43. reader := deps.Stdin
  44. if reader == nil {
  45. reader = os.Stdin
  46. }
  47. src := stdinpcm.New("stdin-main", reader, cfg.Ingest.Stdin.SampleRateHz, cfg.Ingest.Stdin.Channels, 1024)
  48. return src, nil, nil
  49. case "http-raw":
  50. src := httpraw.New("http-raw-main", cfg.Ingest.HTTPRaw.SampleRateHz, cfg.Ingest.HTTPRaw.Channels)
  51. return src, src, nil
  52. case "icecast":
  53. src := icecast.New(
  54. "icecast-main",
  55. cfg.Ingest.Icecast.URL,
  56. deps.HTTP,
  57. icecast.ReconnectConfig{
  58. Enabled: cfg.Ingest.Reconnect.Enabled,
  59. InitialBackoffMs: cfg.Ingest.Reconnect.InitialBackoffMs,
  60. MaxBackoffMs: cfg.Ingest.Reconnect.MaxBackoffMs,
  61. },
  62. icecast.WithDecoderPreference(cfg.Ingest.Icecast.Decoder),
  63. )
  64. return src, nil, nil
  65. case "srt":
  66. srtCfg := aoiprxkit.SRTConfig{
  67. URL: cfg.Ingest.SRT.URL,
  68. Mode: cfg.Ingest.SRT.Mode,
  69. SampleRateHz: cfg.Ingest.SRT.SampleRateHz,
  70. Channels: cfg.Ingest.SRT.Channels,
  71. }
  72. opts := []srt.Option{}
  73. if deps.SRTOpener != nil {
  74. opts = append(opts, srt.WithConnOpener(deps.SRTOpener))
  75. }
  76. src := srt.New("srt-main", srtCfg, opts...)
  77. return src, nil, nil
  78. case "aes67", "aoip", "aoip-rtp":
  79. aoipCfg, detail, origin, err := buildAES67Config(ctx, cfg, deps)
  80. if err != nil {
  81. return nil, nil, err
  82. }
  83. opts := []aoip.Option{}
  84. if deps.AES67ReceiverFactory != nil {
  85. opts = append(opts, aoip.WithReceiverFactory(deps.AES67ReceiverFactory))
  86. }
  87. if detail != "" {
  88. opts = append(opts, aoip.WithDetail(detail))
  89. }
  90. if origin != nil {
  91. opts = append(opts, aoip.WithOrigin(*origin))
  92. }
  93. src := aoip.New("aes67-main", aoipCfg, opts...)
  94. return src, nil, nil
  95. default:
  96. return nil, nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind)
  97. }
  98. }
  99. func SampleRateForKind(cfg config.Config) int {
  100. switch normalizeIngestKind(cfg.Ingest.Kind) {
  101. case "stdin", "stdin-pcm":
  102. if cfg.Ingest.Stdin.SampleRateHz > 0 {
  103. return cfg.Ingest.Stdin.SampleRateHz
  104. }
  105. case "http-raw":
  106. if cfg.Ingest.HTTPRaw.SampleRateHz > 0 {
  107. return cfg.Ingest.HTTPRaw.SampleRateHz
  108. }
  109. case "icecast":
  110. // 48000 Hz is the most common rate for modern Icecast streams.
  111. // The ingest runtime will auto-correct to the actual decoded rate
  112. // after the first PCM chunk arrives (see runtime.go handleChunk).
  113. return 48000
  114. case "srt":
  115. if cfg.Ingest.SRT.SampleRateHz > 0 {
  116. return cfg.Ingest.SRT.SampleRateHz
  117. }
  118. case "aes67", "aoip", "aoip-rtp":
  119. if cfg.Ingest.AES67.SampleRateHz > 0 {
  120. return cfg.Ingest.AES67.SampleRateHz
  121. }
  122. }
  123. // Default to 48000 Hz: the correct rate for professional sources
  124. // (SRT, AES67) and modern streams. The ingest runtime corrects this
  125. // dynamically from the first decoded chunk for compressed sources.
  126. return 48000
  127. }
  128. func normalizeIngestKind(kind string) string {
  129. return strings.ToLower(strings.TrimSpace(kind))
  130. }
  131. func buildAES67Config(ctx context.Context, cfg config.Config, deps Deps) (aoiprxkit.Config, string, *ingest.SourceOrigin, error) {
  132. base := aoiprxkit.DefaultConfig()
  133. ing := cfg.Ingest.AES67
  134. if strings.TrimSpace(ing.InterfaceName) != "" {
  135. base.InterfaceName = strings.TrimSpace(ing.InterfaceName)
  136. }
  137. if ing.PayloadType >= 0 {
  138. base.PayloadType = uint8(ing.PayloadType)
  139. }
  140. if ing.SampleRateHz > 0 {
  141. base.SampleRateHz = ing.SampleRateHz
  142. }
  143. if ing.Channels > 0 {
  144. base.Channels = ing.Channels
  145. }
  146. if strings.TrimSpace(ing.Encoding) != "" {
  147. base.Encoding = strings.ToUpper(strings.TrimSpace(ing.Encoding))
  148. }
  149. if ing.PacketTimeMs > 0 {
  150. base.PacketTime = time.Duration(ing.PacketTimeMs) * time.Millisecond
  151. }
  152. if ing.JitterDepthPackets > 0 {
  153. base.JitterDepthPackets = ing.JitterDepthPackets
  154. }
  155. if ing.ReadBufferBytes > 0 {
  156. base.ReadBufferBytes = ing.ReadBufferBytes
  157. }
  158. sdpText, discoveredStreamName, origin, err := resolveAES67SDP(ctx, ing, deps)
  159. if err != nil {
  160. return aoiprxkit.Config{}, "", nil, err
  161. }
  162. if sdpText != "" {
  163. info, err := aoiprxkit.ParseMinimalSDP(sdpText)
  164. if err != nil {
  165. return aoiprxkit.Config{}, "", nil, fmt.Errorf("parse ingest.aes67 SDP: %w", err)
  166. }
  167. parsed, err := aoiprxkit.ConfigFromSDP(base, info)
  168. if err != nil {
  169. return aoiprxkit.Config{}, "", nil, fmt.Errorf("map ingest.aes67 SDP: %w", err)
  170. }
  171. detail := ""
  172. endpoint := fmt.Sprintf("rtp://%s:%d", parsed.MulticastGroup, parsed.Port)
  173. if discoveredStreamName != "" {
  174. detail = fmt.Sprintf("rtp://%s:%d (SAP s=%s)", parsed.MulticastGroup, parsed.Port, discoveredStreamName)
  175. }
  176. if origin == nil {
  177. origin = &ingest.SourceOrigin{}
  178. }
  179. if origin.Endpoint == "" {
  180. origin.Endpoint = endpoint
  181. }
  182. return parsed, detail, origin, nil
  183. }
  184. if strings.TrimSpace(ing.MulticastGroup) != "" {
  185. base.MulticastGroup = strings.TrimSpace(ing.MulticastGroup)
  186. }
  187. if ing.Port > 0 {
  188. base.Port = ing.Port
  189. }
  190. if err := base.Validate(); err != nil {
  191. return aoiprxkit.Config{}, "", nil, err
  192. }
  193. if origin == nil {
  194. origin = &ingest.SourceOrigin{Kind: "manual"}
  195. }
  196. if origin.Endpoint == "" {
  197. origin.Endpoint = fmt.Sprintf("rtp://%s:%d", base.MulticastGroup, base.Port)
  198. }
  199. return base, "", origin, nil
  200. }
  201. func resolveAES67SDP(ctx context.Context, ing config.IngestAES67Config, deps Deps) (string, string, *ingest.SourceOrigin, error) {
  202. sdpText := strings.TrimSpace(ing.SDP)
  203. if sdpText == "" && strings.TrimSpace(ing.SDPPath) != "" {
  204. sdpPath := filepath.Clean(ing.SDPPath)
  205. data, err := os.ReadFile(sdpPath)
  206. if err != nil {
  207. return "", "", nil, fmt.Errorf("read ingest.aes67.sdpPath: %w", err)
  208. }
  209. sdpText = string(data)
  210. return sdpText, "", &ingest.SourceOrigin{
  211. Kind: "sdp-file",
  212. SDPPath: sdpPath,
  213. }, nil
  214. }
  215. if sdpText != "" {
  216. return sdpText, "", &ingest.SourceOrigin{
  217. Kind: "sdp-inline",
  218. }, nil
  219. }
  220. discoveryEnabled := ing.Discovery.Enabled || strings.TrimSpace(ing.Discovery.StreamName) != ""
  221. if !discoveryEnabled {
  222. return "", "", &ingest.SourceOrigin{
  223. Kind: "manual",
  224. }, nil
  225. }
  226. timeout := time.Duration(ing.Discovery.TimeoutMs) * time.Millisecond
  227. if timeout <= 0 {
  228. timeout = 3 * time.Second
  229. }
  230. req := AES67DiscoverRequest{
  231. StreamName: strings.TrimSpace(ing.Discovery.StreamName),
  232. Timeout: timeout,
  233. InterfaceName: strings.TrimSpace(ing.Discovery.InterfaceName),
  234. SAPGroup: strings.TrimSpace(ing.Discovery.SAPGroup),
  235. SAPPort: ing.Discovery.SAPPort,
  236. }
  237. discover := deps.AES67Discover
  238. if discover == nil {
  239. discover = discoverAES67ViaSAP
  240. }
  241. announcement, err := discover(ctx, req)
  242. if err != nil {
  243. return "", "", nil, fmt.Errorf("discover ingest.aes67 stream %q via SAP: %w", req.StreamName, err)
  244. }
  245. if strings.TrimSpace(announcement.SDP) == "" {
  246. return "", "", nil, fmt.Errorf("discover ingest.aes67 stream %q via SAP: empty SDP payload", req.StreamName)
  247. }
  248. return announcement.SDP, req.StreamName, &ingest.SourceOrigin{
  249. Kind: "sap-discovery",
  250. StreamName: req.StreamName,
  251. }, nil
  252. }
  253. func discoverAES67ViaSAP(ctx context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error) {
  254. if req.StreamName == "" {
  255. return aoiprxkit.SAPAnnouncement{}, fmt.Errorf("stream name must not be empty")
  256. }
  257. listenerCfg := aoiprxkit.DefaultSAPListenerConfig()
  258. if req.InterfaceName != "" {
  259. listenerCfg.InterfaceName = req.InterfaceName
  260. }
  261. if req.SAPGroup != "" {
  262. listenerCfg.Group = req.SAPGroup
  263. }
  264. if req.SAPPort > 0 {
  265. listenerCfg.Port = req.SAPPort
  266. }
  267. sf, err := aoiprxkit.NewStreamFinder(listenerCfg)
  268. if err != nil {
  269. return aoiprxkit.SAPAnnouncement{}, err
  270. }
  271. if err := sf.Start(ctx); err != nil {
  272. return aoiprxkit.SAPAnnouncement{}, err
  273. }
  274. defer sf.Stop()
  275. waitCtx, cancel := context.WithTimeout(ctx, req.Timeout)
  276. defer cancel()
  277. return sf.WaitForStreamName(waitCtx, req.StreamName)
  278. }