package aoiprxkit import ( "context" "fmt" "sync" "time" ) // StreamFinder keeps a live in-memory view of SAP/SDP announcements // and can wait for sessions by their SDP "s=" session name. type StreamFinder struct { listener *SAPListener mu sync.Mutex sessions map[string]SAPAnnouncement waiters map[string][]chan SAPAnnouncement } func NewStreamFinder(cfg SAPListenerConfig) (*StreamFinder, error) { sf := &StreamFinder{ sessions: make(map[string]SAPAnnouncement), waiters: make(map[string][]chan SAPAnnouncement), } listener, err := NewSAPListener(cfg, sf.handleAnnouncement) if err != nil { return nil, err } sf.listener = listener return sf, nil } func (s *StreamFinder) Start(ctx context.Context) error { return s.listener.Start(ctx) } func (s *StreamFinder) Stop() error { return s.listener.Stop() } func (s *StreamFinder) handleAnnouncement(a SAPAnnouncement) { name := a.ParsedSDP.SessionName if name == "" { return } s.mu.Lock() defer s.mu.Unlock() if a.Delete { delete(s.sessions, name) return } s.sessions[name] = a if waiters := s.waiters[name]; len(waiters) > 0 { delete(s.waiters, name) for _, ch := range waiters { select { case ch <- a: default: } close(ch) } } } func (s *StreamFinder) FindByStreamName(name string) (SAPAnnouncement, bool) { s.mu.Lock() defer s.mu.Unlock() a, ok := s.sessions[name] return a, ok } func (s *StreamFinder) WaitForStreamName(ctx context.Context, name string) (SAPAnnouncement, error) { if name == "" { return SAPAnnouncement{}, fmt.Errorf("stream name must not be empty") } s.mu.Lock() if a, ok := s.sessions[name]; ok { s.mu.Unlock() return a, nil } ch := make(chan SAPAnnouncement, 1) s.waiters[name] = append(s.waiters[name], ch) s.mu.Unlock() select { case <-ctx.Done(): s.mu.Lock() waiters := s.waiters[name] kept := waiters[:0] for _, w := range waiters { if w != ch { kept = append(kept, w) } } if len(kept) == 0 { delete(s.waiters, name) } else { s.waiters[name] = kept } s.mu.Unlock() return SAPAnnouncement{}, ctx.Err() case a := <-ch: return a, nil } } func (s *StreamFinder) WaitConfigByStreamName(ctx context.Context, base Config, name string) (Config, SAPAnnouncement, error) { a, err := s.WaitForStreamName(ctx, name) if err != nil { return Config{}, SAPAnnouncement{}, err } cfg, err := ConfigFromSDP(base, a.ParsedSDP) if err != nil { return Config{}, SAPAnnouncement{}, err } return cfg, a, nil } func (s *StreamFinder) Snapshot() []SAPAnnouncement { s.mu.Lock() defer s.mu.Unlock() out := make([]SAPAnnouncement, 0, len(s.sessions)) for _, v := range s.sessions { out = append(out, v) } return out } func (s *StreamFinder) WaitForStreamNameTimeout(name string, timeout time.Duration) (SAPAnnouncement, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return s.WaitForStreamName(ctx, name) }