package aoiprxkit

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"
)

// StreamReceiverConfig holds the static settings of a StreamReceiver.
type StreamReceiverConfig struct {
	// SourceLabel is copied into the Source field of every emitted PCMFrame.
	SourceLabel string
}

// StreamReceiver reads framed audio from a stream obtained through an opener
// function and hands each decoded PCM frame to a FrameHandler. Start and Stop
// may be called from different goroutines; the mutable fields below are
// guarded by mu.
type StreamReceiver struct {
	cfg     StreamReceiverConfig
	opener  func(context.Context) (io.ReadCloser, error)
	onFrame FrameHandler

	mu     sync.Mutex
	rc     io.ReadCloser      // non-nil between a successful Start and the next Stop
	cancel context.CancelFunc // cancels the context handed to opener and loop
	done   chan struct{}      // closed when the most recent read loop exits; nil before the first Start
}

// NewStreamReceiver builds a receiver that is not yet running. It returns an
// error when opener or onFrame is nil.
func NewStreamReceiver(cfg StreamReceiverConfig, opener func(context.Context) (io.ReadCloser, error), onFrame FrameHandler) (*StreamReceiver, error) {
	if opener == nil {
		return nil, fmt.Errorf("opener must not be nil")
	}
	if onFrame == nil {
		return nil, fmt.Errorf("onFrame must not be nil")
	}
	// done is deliberately left nil: no loop exists yet, so a Stop issued
	// before Start must have nothing to wait for.
	return &StreamReceiver{cfg: cfg, opener: opener, onFrame: onFrame}, nil
}

// Start opens the stream with a context derived from ctx and launches the
// read loop in a new goroutine. It returns an error if the receiver is
// already started, if the opener fails (the opener's error is returned
// unchanged), or if the opener yields a nil stream without an error.
//
// A receiver whose loop ended on its own (for example on a read error) still
// counts as started until Stop is called.
func (r *StreamReceiver) Start(ctx context.Context) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.rc != nil {
		return fmt.Errorf("stream receiver already started")
	}

	cctx, cancel := context.WithCancel(ctx)
	rc, err := r.opener(cctx)
	if err != nil {
		cancel()
		return err
	}
	if rc == nil {
		// Guard against a misbehaving opener; the loop would otherwise
		// dereference a nil reader in its own goroutine.
		cancel()
		return fmt.Errorf("opener returned nil stream")
	}

	// Each run owns its own done channel. The goroutine closes the local
	// variable rather than r.done, so a restart that replaces r.done can
	// never cause an older loop to close the newer run's channel.
	done := make(chan struct{})
	r.rc = rc
	r.cancel = cancel
	r.done = done

	go func() {
		defer close(done)
		r.loop(cctx, rc)
	}()
	return nil
}

// Stop cancels the receiver's context, closes the stream and waits for the
// read loop to exit. It is safe to call on a receiver that was never started
// and to call more than once; in both cases there is nothing left to shut
// down and it returns promptly. Stop always returns nil.
func (r *StreamReceiver) Stop() error {
	r.mu.Lock()
	rc := r.rc
	cancel := r.cancel
	done := r.done
	r.rc = nil
	r.cancel = nil
	r.mu.Unlock()

	if cancel != nil {
		cancel()
	}
	if rc != nil {
		// Closing is what unblocks a read in progress. The error is ignored
		// on purpose: the stream is being torn down and Stop is best-effort.
		_ = rc.Close()
	}
	if done != nil {
		<-done
	}
	return nil
}

// loop reads frames from rc until ctx is cancelled or a header or payload
// read fails. The context is only checked between frames; a read that is
// already blocked is interrupted by Stop closing rc.
//
// PCM S32LE frames are decoded and passed to onFrame. Frames that fail to
// decode, Opus frames and frames with an unknown codec are skipped.
func (r *StreamReceiver) loop(ctx context.Context, rc io.ReadCloser) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		hdr, err := ReadStreamHeader(rc)
		if err != nil {
			return
		}

		// NOTE(review): the payload size comes straight from the stream
		// header with no upper bound applied here — confirm that
		// ReadStreamHeader validates it.
		payload := make([]byte, hdr.PayloadBytes)
		if _, err := io.ReadFull(rc, payload); err != nil {
			return
		}

		switch hdr.Codec {
		case StreamCodecPCM_S32LE:
			samples, err := DecodeS32LE(payload, int(hdr.Channels))
			if err != nil {
				// The payload was fully consumed, so the stream is still
				// aligned on a frame boundary; drop this frame and go on.
				continue
			}
			r.onFrame(PCMFrame{
				SequenceNumber: uint16(hdr.Sequence & 0xffff),
				Timestamp:      uint32(hdr.Timestamp & 0xffffffff),
				SampleRateHz:   int(hdr.SampleRateHz),
				Channels:       int(hdr.Channels),
				Samples:        samples,
				ReceivedAt:     time.Now(),
				Source:         r.cfg.SourceLabel,
			})
		case StreamCodecOpus:
			// Reserved for later phase. Not decoded in this module revision.
			continue
		default:
			continue
		}
	}
}