diff --git a/aoiprxkit/README.md b/aoiprxkit/README.md new file mode 100644 index 0000000..1616809 --- /dev/null +++ b/aoiprxkit/README.md @@ -0,0 +1,90 @@ +# aoiprxkit + +Standalone Go module for adding professional AoIP receive capabilities step by step. + +This package covers the roadmap up to **Phase 4** with a **Go-native target architecture**: + +1. **AES67 RX-lite** +2. **static SDP loading + optional SAP listener** +3. **stream discovery by SAP/SDP session name** +4. **live browser metering over HTTP/WebSocket** +5. **NMOS IS-04 / IS-05 client scaffolding** +6. **SRT WAN ingest via native transport adapter + framed PCM profile** + +## Included components + +### Core RTP / AES67-lite receiver +- IPv4 multicast RTP join +- static config or config derived from SDP +- `L24` decoding +- small jitter / reorder buffer +- PCM frame callback +- runtime counters + +### SDP support +- minimal parser for: + - `c=` + - `m=audio` + - `a=rtpmap` + - `a=ptime` +- conversion helper from parsed SDP to receiver config + +### SAP listener +- optional listener for SAP announcements +- default SAP group/port support +- `application/sdp` extraction +- callback with parsed session details + +### NMOS scaffolding +- lightweight Query API client +- lightweight Connection API client +- helpers for receiver-side staged activation payloads + +### SRT WAN bridge (reworked) +- no `ffmpeg.exe` dependency in the default package path +- generic stream receiver for framed PCM +- SRT receiver abstraction with injectable transport opener +- default build ships a clear stub for the transport layer +- intended production path: wire a **pure-Go SRT transport** (for example a `gosrt` opener) in the target repo + +## Framed WAN audio profile + +The package now assumes a deliberately narrow WAN ingest profile: + +- transport: SRT +- payload framing: custom framed stream defined in `stream_proto.go` +- codec today: PCM `S32LE` +- codec reserved for later: Opus + +This keeps the stack deterministic and avoids generic container / demux complexity. + +## Deliberate non-goals +- no full AES67 compliance claim +- no PTP discipline +- no full SAP session cache +- no bundled gosrt implementation in this zip +- no ST 2110-30 sender/receiver implementation +- no NMOS Node/Registry server implementation + +## Why it is built like this +The goal is not to overbuild a broadcast plant in one step. +The goal is to provide a **repo-addable module** that gives a realistic progression: + +- start with known multicast audio +- add discovery +- add control-plane interoperability +- add WAN ingest without external EXE dependencies as the default design target + +## Suggested integration order + +1. integrate the core receiver into your existing audio input abstraction +2. allow config-by-SDP +3. enable optional SAP auto-discovery +4. add NMOS registry/query support +5. wire a native SRT opener in your target repo + +## Added in this build + +- `StreamFinder` for exact matching by SDP `s=` session name +- `LiveMeter` for per-channel RMS / Peak / Latest values +- `MeterServer` with `/`, `/healthz`, `/api/meter` and `/ws/live` diff --git a/aoiprxkit/cmd/demo/main.go b/aoiprxkit/cmd/demo/main.go new file mode 100644 index 0000000..ccb9cd6 --- /dev/null +++ b/aoiprxkit/cmd/demo/main.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os/signal" + "syscall" + "time" + + "aoiprxkit" +) + +func main() { + mode := flag.String("mode", "rtp", "rtp|sap") + group := flag.String("group", "239.69.0.1", "IPv4 multicast group") + port := flag.Int("port", 5004, "UDP port") + iface := flag.String("iface", "", "network interface name") + pt := flag.Int("pt", 97, "expected RTP payload type") + rate := flag.Int("rate", 48000, "sample rate") + ch := flag.Int("ch", 2, "channels") + flag.Parse() + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + switch *mode { + case "sap": + listener, err := aoiprxkit.NewSAPListener(aoiprxkit.SAPListenerConfig{ + Group: aoiprxkit.DefaultSAPGroup, + Port: aoiprxkit.DefaultSAPPort, + InterfaceName: *iface, + ReadBuffer: 1 << 20, + }, func(a aoiprxkit.SAPAnnouncement) { + fmt.Printf("SAP session: name=%q group=%s port=%d pt=%d encoding=%s rate=%d ch=%d delete=%v\n", + a.ParsedSDP.SessionName, + a.ParsedSDP.MulticastGroup, + a.ParsedSDP.Port, + a.ParsedSDP.PayloadType, + a.ParsedSDP.Encoding, + a.ParsedSDP.SampleRateHz, + a.ParsedSDP.Channels, + a.Delete, + ) + }) + if err != nil { + log.Fatal(err) + } + if err := listener.Start(ctx); err != nil { + log.Fatal(err) + } + defer listener.Stop() + <-ctx.Done() + + default: + cfg := aoiprxkit.DefaultConfig() + cfg.MulticastGroup = *group + cfg.Port = *port + cfg.InterfaceName = *iface + cfg.PayloadType = uint8(*pt) + cfg.SampleRateHz = *rate + cfg.Channels = *ch + + var packets uint64 + rx, err := aoiprxkit.NewReceiver(cfg, func(frame aoiprxkit.PCMFrame) { + packets++ + if packets%100 == 0 { + fmt.Printf("delivered packet seq=%d ts=%d samples=%d source=%s\n", frame.SequenceNumber, frame.Timestamp, len(frame.Samples), frame.Source) + } + }) + if err != nil { + log.Fatal(err) + } + if err := rx.Start(ctx); err != nil { + log.Fatal(err) + } + defer rx.Stop() + + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + fmt.Println("stopping") + return + case <-ticker.C: + fmt.Printf("stats: %+v\n", rx.Stats()) + } + } + } +} diff --git a/aoiprxkit/config.go b/aoiprxkit/config.go new file mode 100644 index 0000000..9d44640 --- /dev/null +++ b/aoiprxkit/config.go @@ -0,0 +1,66 @@ +package aoiprxkit + +import ( + "errors" + "fmt" + "net" + "time" +) + +// Config defines a pragmatic RX-only subset for statically configured AES67-style RTP audio. +// It is intentionally narrower than full AES67. +type Config struct { + MulticastGroup string + Port int + InterfaceName string + PayloadType uint8 + SampleRateHz int + Channels int + Encoding string + PacketTime time.Duration + JitterDepthPackets int + ReadBufferBytes int +} + +func DefaultConfig() Config { + return Config{ + MulticastGroup: "239.69.0.1", + Port: 5004, + PayloadType: 97, + SampleRateHz: 48000, + Channels: 2, + Encoding: "L24", + PacketTime: time.Millisecond, + JitterDepthPackets: 8, + ReadBufferBytes: 1 << 20, + } +} + +func (c Config) Validate() error { + if ip := net.ParseIP(c.MulticastGroup); ip == nil || ip.To4() == nil { + return fmt.Errorf("invalid IPv4 multicast group: %q", c.MulticastGroup) + } + ip := net.ParseIP(c.MulticastGroup).To4() + if ip[0] < 224 || ip[0] > 239 { + return fmt.Errorf("multicast group must be IPv4 multicast: %q", c.MulticastGroup) + } + if c.Port < 1 || c.Port > 65535 { + return errors.New("port must be 1..65535") + } + if c.SampleRateHz <= 0 { + return errors.New("sample rate must be > 0") + } + if c.Channels < 1 || c.Channels > 2 { + return errors.New("channels must be 1 or 2") + } + if c.Encoding != "L24" { + return fmt.Errorf("unsupported encoding %q: only L24 is currently supported", c.Encoding) + } + if c.PacketTime <= 0 { + return errors.New("packet time must be > 0") + } + if c.JitterDepthPackets < 1 { + return errors.New("jitter depth must be >= 1") + } + return nil +} diff --git a/aoiprxkit/docs/INTEGRATION.md b/aoiprxkit/docs/INTEGRATION.md new file mode 100644 index 0000000..6c69be8 --- /dev/null +++ b/aoiprxkit/docs/INTEGRATION.md @@ -0,0 +1,39 @@ +# Integration notes + +## Existing FM / DSP project +The intended integration pattern is: + +- your application decides which input mode is active +- this module delivers decoded PCM frames +- your application writes those samples into its existing audio ring buffer or live source abstraction + +## Recommended first integration +Use only: + +- `Config` +- `NewReceiver` +- `Start` +- `Stop` + +and a callback like: + +```go +rx, _ := aoiprxkit.NewReceiver(cfg, func(frame aoiprxkit.PCMFrame) { + audioInput.PushInt32(frame.Samples, frame.SampleRateHz, frame.Channels) +}) +``` + +## SRT integration pattern +The WAN side is now split into two layers: + +1. `SRTReceiver` / `StreamReceiver` +2. a transport opener that returns an `io.ReadCloser` + +That means your target repo can later add a native `gosrt` opener without changing the PCM handling path. + +## Later additions +- derive config from SDP +- attach a SAP listener to discover sessions +- query NMOS registry for streams/receivers +- activate receiver transport with IS-05 +- use a native SRT opener for WAN delivery into the same audio input path diff --git a/aoiprxkit/docs/PROTOCOLS.md b/aoiprxkit/docs/PROTOCOLS.md new file mode 100644 index 0000000..f6acd08 --- /dev/null +++ b/aoiprxkit/docs/PROTOCOLS.md @@ -0,0 +1,26 @@ +# Protocol matrix + +## LAN +### RTP multicast + SDP +Good first step for known sources. + +### SAP +Useful for lightweight multicast session discovery. + +### NMOS IS-04 / IS-05 +Adds discovery, registry and connection management. +Recommended when integrating into professional IP media environments. + +## WAN +### SRT +Current Phase-4 target. +This package now expects a narrow framed-PCM profile over SRT instead of a generic FFmpeg sidecar path. + +### RIST +Not implemented here. +Reasonable future Phase-5 candidate for broadcast-heavy WAN environments. + +## Later / optional +### ST 2110-30 +Not implemented here. +Reasonable future path once AES67 + NMOS + WAN ingest are stable. diff --git a/aoiprxkit/go.mod b/aoiprxkit/go.mod new file mode 100644 index 0000000..79b749d --- /dev/null +++ b/aoiprxkit/go.mod @@ -0,0 +1,3 @@ +module aoiprxkit + +go 1.22 diff --git a/aoiprxkit/jitter.go b/aoiprxkit/jitter.go new file mode 100644 index 0000000..3b24d5c --- /dev/null +++ b/aoiprxkit/jitter.go @@ -0,0 +1,58 @@ +package aoiprxkit + +type jitterBuffer struct { + started bool + expected uint16 + maxDepth int + packets map[uint16]RTPPacket +} + +func newJitterBuffer(maxDepth int) *jitterBuffer { + return &jitterBuffer{maxDepth: maxDepth, packets: make(map[uint16]RTPPacket)} +} + +func (j *jitterBuffer) push(pkt RTPPacket) (ready []RTPPacket, lateDrop bool, gapLoss uint64, reorder bool) { + if !j.started { + j.started = true + j.expected = pkt.SequenceNumber + } + if seqDistance(pkt.SequenceNumber, j.expected) < 0 { + return nil, true, 0, false + } + if _, exists := j.packets[pkt.SequenceNumber]; !exists { + j.packets[pkt.SequenceNumber] = pkt + if pkt.SequenceNumber != j.expected { + reorder = true + } + } + for { + pkt, ok := j.packets[j.expected] + if !ok { + break + } + ready = append(ready, pkt) + delete(j.packets, j.expected) + j.expected++ + } + for len(j.packets) > j.maxDepth { + if _, ok := j.packets[j.expected]; ok { + break + } + j.expected++ + gapLoss++ + for { + pkt, ok := j.packets[j.expected] + if !ok { + break + } + ready = append(ready, pkt) + delete(j.packets, j.expected) + j.expected++ + } + } + return ready, false, gapLoss, reorder +} + +func seqDistance(a, b uint16) int { + return int(int16(a - b)) +} diff --git a/aoiprxkit/jitter_test.go b/aoiprxkit/jitter_test.go new file mode 100644 index 0000000..6788791 --- /dev/null +++ b/aoiprxkit/jitter_test.go @@ -0,0 +1,34 @@ +package aoiprxkit + +import "testing" + +func TestJitterBufferReordersAndReleases(t *testing.T) { + jb := newJitterBuffer(8) + p100 := RTPPacket{SequenceNumber: 100} + p102 := RTPPacket{SequenceNumber: 102} + p101 := RTPPacket{SequenceNumber: 101} + + ready, late, gap, reorder := jb.push(p100) + if late || gap != 0 || reorder { + t.Fatalf("unexpected state on first push") + } + if len(ready) != 1 || ready[0].SequenceNumber != 100 { + t.Fatalf("unexpected ready on first push: %+v", ready) + } + + ready, late, gap, reorder = jb.push(p102) + if late || gap != 0 || !reorder { + t.Fatalf("expected reorder on out-of-order push") + } + if len(ready) != 0 { + t.Fatalf("unexpected ready before missing packet arrives: %+v", ready) + } + + ready, late, gap, reorder = jb.push(p101) + if late || gap != 0 { + t.Fatalf("unexpected late/gap after completing sequence") + } + if len(ready) != 2 || ready[0].SequenceNumber != 101 || ready[1].SequenceNumber != 102 { + t.Fatalf("unexpected ready after sequence repair: %+v", ready) + } +} diff --git a/aoiprxkit/meter.go b/aoiprxkit/meter.go new file mode 100644 index 0000000..21e64fe --- /dev/null +++ b/aoiprxkit/meter.go @@ -0,0 +1,127 @@ +package aoiprxkit + +import ( + "math" + "sync" + "time" +) + +type ChannelMeter struct { + RMS float64 `json:"rms"` + Peak float64 `json:"peak"` + Latest float64 `json:"latest"` +} + +type MeterSnapshot struct { + UpdatedAt string `json:"updatedAt"` + Source string `json:"source"` + SampleRateHz int `json:"sampleRateHz"` + Channels int `json:"channels"` + Meters []ChannelMeter `json:"meters"` +} + +// LiveMeter consumes PCM frames and publishes simple per-channel level data. +type LiveMeter struct { + mu sync.RWMutex + latest MeterSnapshot + subs map[chan MeterSnapshot]struct{} +} + +func NewLiveMeter() *LiveMeter { + return &LiveMeter{subs: make(map[chan MeterSnapshot]struct{})} +} + +func (m *LiveMeter) Consume(frame PCMFrame) { + if frame.Channels <= 0 || len(frame.Samples) == 0 { + return + } + meters := make([]ChannelMeter, frame.Channels) + fullScale := detectFullScale(frame.Samples) + sums := make([]float64, frame.Channels) + counts := make([]int, frame.Channels) + + for i, sample := range frame.Samples { + ch := i % frame.Channels + norm := float64(sample) / fullScale + abs := math.Abs(norm) + if abs > meters[ch].Peak { + meters[ch].Peak = abs + } + meters[ch].Latest = norm + sums[ch] += norm * norm + counts[ch]++ + } + for ch := range meters { + if counts[ch] > 0 { + meters[ch].RMS = math.Sqrt(sums[ch] / float64(counts[ch])) + } + } + + snap := MeterSnapshot{ + UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano), + Source: frame.Source, + SampleRateHz: frame.SampleRateHz, + Channels: frame.Channels, + Meters: meters, + } + + m.mu.Lock() + m.latest = snap + subs := make([]chan MeterSnapshot, 0, len(m.subs)) + for ch := range m.subs { + subs = append(subs, ch) + } + m.mu.Unlock() + + for _, ch := range subs { + select { + case ch <- snap: + default: + } + } +} + +func detectFullScale(samples []int32) float64 { + var maxAbs int64 + for _, s := range samples { + v := int64(s) + if v < 0 { + v = -v + } + if v > maxAbs { + maxAbs = v + } + } + if maxAbs <= 8388608 { + return 8388608.0 + } + return 2147483648.0 +} + +func (m *LiveMeter) Snapshot() MeterSnapshot { + m.mu.RLock() + defer m.mu.RUnlock() + return m.latest +} + +func (m *LiveMeter) Subscribe() (<-chan MeterSnapshot, func()) { + ch := make(chan MeterSnapshot, 8) + m.mu.Lock() + m.subs[ch] = struct{}{} + latest := m.latest + m.mu.Unlock() + + if latest.UpdatedAt != "" { + ch <- latest + } + + unsubscribe := func() { + m.mu.Lock() + if _, ok := m.subs[ch]; ok { + delete(m.subs, ch) + close(ch) + } + m.mu.Unlock() + } + return ch, unsubscribe +} diff --git a/aoiprxkit/meter_server.go b/aoiprxkit/meter_server.go new file mode 100644 index 0000000..8326e1f --- /dev/null +++ b/aoiprxkit/meter_server.go @@ -0,0 +1,205 @@ +package aoiprxkit + +import ( + "bufio" + "context" + "crypto/sha1" + "encoding/base64" + "encoding/json" + "io" + "net" + "net/http" + "strings" + "time" +) + +type MeterServer struct { + meter *LiveMeter + srv *http.Server +} + +func NewMeterServer(listenAddress string, meter *LiveMeter) *MeterServer { + if meter == nil { + meter = NewLiveMeter() + } + ms := &MeterServer{meter: meter} + mux := http.NewServeMux() + mux.HandleFunc("/", ms.handleIndex) + mux.HandleFunc("/healthz", ms.handleHealth) + mux.HandleFunc("/api/meter", ms.handleSnapshot) + mux.HandleFunc("/ws/live", ms.handleWS) + ms.srv = &http.Server{ + Addr: listenAddress, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 60 * time.Second, + } + return ms +} + +func (m *MeterServer) Meter() *LiveMeter { return m.meter } + +func (m *MeterServer) Start() error { + go func() { + _ = m.srv.ListenAndServe() + }() + return nil +} + +func (m *MeterServer) Shutdown(ctx context.Context) error { + return m.srv.Shutdown(ctx) +} + +func (m *MeterServer) handleHealth(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) +} + +func (m *MeterServer) handleSnapshot(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(m.meter.Snapshot()) +} + +func (m *MeterServer) handleIndex(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + _, _ = io.WriteString(w, meterIndexHTML) +} + +func (m *MeterServer) handleWS(w http.ResponseWriter, r *http.Request) { + if !headerContainsToken(r.Header, "Connection", "Upgrade") || !strings.EqualFold(r.Header.Get("Upgrade"), "websocket") { + http.Error(w, "upgrade required", http.StatusUpgradeRequired) + return + } + key := strings.TrimSpace(r.Header.Get("Sec-WebSocket-Key")) + if key == "" { + http.Error(w, "missing Sec-WebSocket-Key", http.StatusBadRequest) + return + } + hj, ok := w.(http.Hijacker) + if !ok { + http.Error(w, "hijacking not supported", http.StatusInternalServerError) + return + } + conn, rw, err := hj.Hijack() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + accept := computeWebSocketAccept(key) + _, _ = rw.WriteString("HTTP/1.1 101 Switching Protocols\r\n") + _, _ = rw.WriteString("Upgrade: websocket\r\n") + _, _ = rw.WriteString("Connection: Upgrade\r\n") + _, _ = rw.WriteString("Sec-WebSocket-Accept: " + accept + "\r\n\r\n") + _ = rw.Flush() + + ch, unsubscribe := m.meter.Subscribe() + defer unsubscribe() + defer conn.Close() + + _ = conn.SetDeadline(time.Time{}) + for snap := range ch { + payload, err := json.Marshal(snap) + if err != nil { + return + } + if err := writeWebSocketTextFrame(conn, payload); err != nil { + return + } + } +} + +func headerContainsToken(h http.Header, key, token string) bool { + for _, v := range h.Values(key) { + parts := strings.Split(v, ",") + for _, part := range parts { + if strings.EqualFold(strings.TrimSpace(part), token) { + return true + } + } + } + return false +} + +func computeWebSocketAccept(key string) string { + const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + sum := sha1.Sum([]byte(key + magic)) + return base64.StdEncoding.EncodeToString(sum[:]) +} + +func writeWebSocketTextFrame(conn net.Conn, payload []byte) error { + bw := bufio.NewWriter(conn) + header := []byte{0x81} + switch { + case len(payload) < 126: + header = append(header, byte(len(payload))) + case len(payload) <= 65535: + header = append(header, 126, byte(len(payload)>>8), byte(len(payload))) + default: + header = append(header, 127, + byte(uint64(len(payload))>>56), byte(uint64(len(payload))>>48), byte(uint64(len(payload))>>40), byte(uint64(len(payload))>>32), + byte(uint64(len(payload))>>24), byte(uint64(len(payload))>>16), byte(uint64(len(payload))>>8), byte(uint64(len(payload))), + ) + } + if _, err := bw.Write(header); err != nil { + return err + } + if _, err := bw.Write(payload); err != nil { + return err + } + return bw.Flush() +} + +const meterIndexHTML = ` + + + + + aoiprxkit meter + + + +

aoiprxkit live meter

+
waiting for frames…
+
+ + +` diff --git a/aoiprxkit/nmos/is05.go b/aoiprxkit/nmos/is05.go new file mode 100644 index 0000000..2de600e --- /dev/null +++ b/aoiprxkit/nmos/is05.go @@ -0,0 +1,62 @@ +package nmos + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" +) + +type ConnectionClient struct { + BaseURL string + HTTPClient *http.Client +} + +func NewConnectionClient(baseURL string) *ConnectionClient { + return &ConnectionClient{ + BaseURL: strings.TrimRight(baseURL, "/"), + HTTPClient: &http.Client{ + Timeout: 10 * time.Second, + }, + } +} + +func (c *ConnectionClient) StageReceiver(ctx context.Context, receiverID string, reqBody StagedReceiverRequest) error { + body, err := json.Marshal(reqBody) + if err != nil { + return err + } + url := fmt.Sprintf("%s/x-nmos/connection/v1.1/receivers/%s/staged", c.BaseURL, receiverID) + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := c.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("NMOS IS-05 stage receiver returned %s", resp.Status) + } + return nil +} + +func BuildRTPReceiverStagedRequest(senderID *string, sdp string) StagedReceiverRequest { + transportFile := map[string]string{ + "data": sdp, + "type": "application/sdp", + } + return StagedReceiverRequest{ + MasterEnable: true, + Activation: Activation{ + Mode: "activate_immediate", + }, + SenderID: senderID, + TransportFile: transportFile, + } +} diff --git a/aoiprxkit/nmos/models.go b/aoiprxkit/nmos/models.go new file mode 100644 index 0000000..6c9adde --- /dev/null +++ b/aoiprxkit/nmos/models.go @@ -0,0 +1,39 @@ +package nmos + +type Resource struct { + ID string `json:"id"` + Label string `json:"label,omitempty"` +} + +type Sender struct { + ID string `json:"id"` + Label string `json:"label,omitempty"` + Description string `json:"description,omitempty"` + Transport string `json:"transport,omitempty"` + DeviceID string `json:"device_id,omitempty"` + ManifestURL string `json:"manifest_href,omitempty"` + Subscription any `json:"subscription,omitempty"` + InterfaceBinds []string `json:"interface_bindings,omitempty"` +} + +type Receiver struct { + ID string `json:"id"` + Label string `json:"label,omitempty"` + Description string `json:"description,omitempty"` + DeviceID string `json:"device_id,omitempty"` + Transport string `json:"transport,omitempty"` + Format string `json:"format,omitempty"` +} + +type Activation struct { + Mode string `json:"mode"` + RequestedTime string `json:"requested_time,omitempty"` +} + +type StagedReceiverRequest struct { + MasterEnable bool `json:"master_enable"` + Activation Activation `json:"activation"` + SenderID *string `json:"sender_id,omitempty"` + TransportFile map[string]string `json:"transport_file,omitempty"` + TransportParams []map[string]any `json:"transport_params,omitempty"` +} diff --git a/aoiprxkit/nmos/query.go b/aoiprxkit/nmos/query.go new file mode 100644 index 0000000..6c6a8e9 --- /dev/null +++ b/aoiprxkit/nmos/query.go @@ -0,0 +1,56 @@ +package nmos + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" +) + +type QueryClient struct { + BaseURL string + HTTPClient *http.Client +} + +func NewQueryClient(baseURL string) *QueryClient { + return &QueryClient{ + BaseURL: strings.TrimRight(baseURL, "/"), + HTTPClient: &http.Client{ + Timeout: 10 * time.Second, + }, + } +} + +func (c *QueryClient) GetSenders(ctx context.Context) ([]Sender, error) { + var out []Sender + if err := c.getJSON(ctx, "/x-nmos/query/v1.3/senders", &out); err != nil { + return nil, err + } + return out, nil +} + +func (c *QueryClient) GetReceivers(ctx context.Context) ([]Receiver, error) { + var out []Receiver + if err := c.getJSON(ctx, "/x-nmos/query/v1.3/receivers", &out); err != nil { + return nil, err + } + return out, nil +} + +func (c *QueryClient) getJSON(ctx context.Context, path string, target any) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.BaseURL+path, nil) + if err != nil { + return err + } + resp, err := c.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("NMOS query %s returned %s", path, resp.Status) + } + return json.NewDecoder(resp.Body).Decode(target) +} diff --git a/aoiprxkit/pcm.go b/aoiprxkit/pcm.go new file mode 100644 index 0000000..cf8ccda --- /dev/null +++ b/aoiprxkit/pcm.go @@ -0,0 +1,50 @@ +package aoiprxkit + +import ( + "encoding/binary" + "fmt" +) + +// DecodeL24BE decodes signed 24-bit big-endian PCM into int32 samples sign-extended to 32 bits. +func DecodeL24BE(payload []byte, channels int) ([]int32, error) { + if channels < 1 { + return nil, fmt.Errorf("invalid channels: %d", channels) + } + if len(payload)%3 != 0 { + return nil, fmt.Errorf("payload length %d is not divisible by 3", len(payload)) + } + totalSamples := len(payload) / 3 + if totalSamples%channels != 0 { + return nil, fmt.Errorf("payload sample count %d is not divisible by channels %d", totalSamples, channels) + } + out := make([]int32, totalSamples) + j := 0 + for i := 0; i < len(payload); i += 3 { + v := int32(payload[i])<<16 | int32(payload[i+1])<<8 | int32(payload[i+2]) + if v&0x800000 != 0 { + v |= ^int32(0xFFFFFF) + } + out[j] = v + j++ + } + return out, nil +} + +// DecodeS32LE decodes signed 32-bit little-endian PCM into int32 samples. +func DecodeS32LE(payload []byte, channels int) ([]int32, error) { + if channels < 1 { + return nil, fmt.Errorf("invalid channels: %d", channels) + } + if len(payload)%4 != 0 { + return nil, fmt.Errorf("payload length %d is not divisible by 4", len(payload)) + } + totalSamples := len(payload) / 4 + if totalSamples%channels != 0 { + return nil, fmt.Errorf("payload sample count %d is not divisible by channels %d", totalSamples, channels) + } + out := make([]int32, totalSamples) + for i := 0; i < totalSamples; i++ { + out[i] = int32(binary.LittleEndian.Uint32(payload[i*4 : i*4+4])) + } + return out, nil +} diff --git a/aoiprxkit/pcm_test.go b/aoiprxkit/pcm_test.go new file mode 100644 index 0000000..431b99c --- /dev/null +++ b/aoiprxkit/pcm_test.go @@ -0,0 +1,39 @@ +package aoiprxkit + +import "testing" + +func TestDecodeL24BE(t *testing.T) { + payload := []byte{ + 0x7f, 0xff, 0xff, + 0x80, 0x00, 0x00, + 0x00, 0x00, 0x01, + 0xff, 0xff, 0xff, + } + got, err := DecodeL24BE(payload, 2) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + want := []int32{8388607, -8388608, 1, -1} + if len(got) != len(want) { + t.Fatalf("len mismatch: got=%d want=%d", len(got), len(want)) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("sample %d mismatch: got=%d want=%d", i, got[i], want[i]) + } + } +} + +func TestDecodeS32LE(t *testing.T) { + payload := []byte{ + 0x01, 0x00, 0x00, 0x00, + 0xff, 0xff, 0xff, 0xff, + } + got, err := DecodeS32LE(payload, 1) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if len(got) != 2 || got[0] != 1 || got[1] != -1 { + t.Fatalf("unexpected samples: %+v", got) + } +} diff --git a/aoiprxkit/receiver.go b/aoiprxkit/receiver.go new file mode 100644 index 0000000..7ea6a52 --- /dev/null +++ b/aoiprxkit/receiver.go @@ -0,0 +1,194 @@ +package aoiprxkit + +import ( + "context" + "fmt" + "net" + "sync" + "time" +) + +type PCMFrame struct { + SequenceNumber uint16 + Timestamp uint32 + SampleRateHz int + Channels int + Samples []int32 // interleaved + ReceivedAt time.Time + Source string +} + +type FrameHandler func(frame PCMFrame) + +type Receiver struct { + cfg Config + onFrame FrameHandler + + mu sync.Mutex + conn *net.UDPConn + cancel context.CancelFunc + done chan struct{} + doneOnce sync.Once + stats statsAtomic +} + +func NewReceiver(cfg Config, onFrame FrameHandler) (*Receiver, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + if onFrame == nil { + return nil, fmt.Errorf("onFrame must not be nil") + } + return &Receiver{ + cfg: cfg, + onFrame: onFrame, + done: make(chan struct{}), + }, nil +} + +func (r *Receiver) Start(ctx context.Context) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.conn != nil { + return fmt.Errorf("receiver already started") + } + + group := net.ParseIP(r.cfg.MulticastGroup) + ifi, err := resolveMulticastInterface(r.cfg.InterfaceName) + if err != nil { + return err + } + + addr := &net.UDPAddr{IP: group, Port: r.cfg.Port} + conn, err := net.ListenMulticastUDP("udp4", ifi, addr) + if err != nil { + return fmt.Errorf("listen multicast UDP: %w", err) + } + if r.cfg.ReadBufferBytes > 0 { + _ = conn.SetReadBuffer(r.cfg.ReadBufferBytes) + } + + cctx, cancel := context.WithCancel(ctx) + r.conn = conn + r.cancel = cancel + r.done = make(chan struct{}) + r.doneOnce = sync.Once{} + go r.loop(cctx) + return nil +} + +func (r *Receiver) Stop() error { + r.mu.Lock() + if r.conn == nil { + r.mu.Unlock() + return nil + } + conn := r.conn + cancel := r.cancel + done := r.done + r.conn = nil + r.cancel = nil + r.mu.Unlock() + + if cancel != nil { + cancel() + } + _ = conn.Close() + <-done + return nil +} + +func (r *Receiver) Stats() Stats { + return r.stats.snapshot() +} + +func (r *Receiver) loop(ctx context.Context) { + defer r.doneOnce.Do(func() { close(r.done) }) + + jb := newJitterBuffer(r.cfg.JitterDepthPackets) + buf := make([]byte, 64*1024) + + for { + select { + case <-ctx.Done(): + return + default: + } + + _ = r.conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + n, _, err := r.conn.ReadFromUDP(buf) + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Timeout() { + continue + } + return + } + + r.stats.packetsReceived.Add(1) + if n < 12 { + r.stats.packetsShort.Add(1) + continue + } + + pkt, err := ParseRTPPacket(buf[:n]) + if err != nil { + r.stats.packetsShort.Add(1) + continue + } + r.stats.packetsParsed.Add(1) + + if pkt.PayloadType != r.cfg.PayloadType { + r.stats.packetsWrongPT.Add(1) + continue + } + + ready, lateDrop, gapLoss, reorder := jb.push(pkt) + if lateDrop { + r.stats.packetsLateDrop.Add(1) + continue + } + if gapLoss > 0 { + r.stats.packetsGapLoss.Add(gapLoss) + } + if reorder { + r.stats.jitterReorders.Add(1) + } + + for _, rp := range ready { + samples, err := DecodeL24BE(rp.Payload, r.cfg.Channels) + if err != nil { + r.stats.decodeErrors.Add(1) + continue + } + frame := PCMFrame{ + SequenceNumber: rp.SequenceNumber, + Timestamp: rp.Timestamp, + SampleRateHz: r.cfg.SampleRateHz, + Channels: r.cfg.Channels, + Samples: samples, + ReceivedAt: time.Now(), + Source: fmt.Sprintf("rtp://%s:%d", r.cfg.MulticastGroup, r.cfg.Port), + } + r.onFrame(frame) + r.stats.packetsDelivered.Add(1) + r.stats.samplesDelivered.Add(uint64(len(samples))) + if r.cfg.Channels > 0 { + r.stats.framesDelivered.Add(uint64(len(samples) / r.cfg.Channels)) + } + r.stats.lastSequence.Store(uint32(rp.SequenceNumber)) + r.stats.sequenceValid.Store(1) + } + } +} + +func resolveMulticastInterface(name string) (*net.Interface, error) { + if name == "" { + return nil, nil + } + ifi, err := net.InterfaceByName(name) + if err != nil { + return nil, fmt.Errorf("resolve interface %q: %w", name, err) + } + return ifi, nil +} diff --git a/aoiprxkit/rtp.go b/aoiprxkit/rtp.go new file mode 100644 index 0000000..b2dc104 --- /dev/null +++ b/aoiprxkit/rtp.go @@ -0,0 +1,68 @@ +package aoiprxkit + +import ( + "encoding/binary" + "errors" +) + +type RTPPacket struct { + Version uint8 + Padding bool + Extension bool + CSRCCount uint8 + Marker bool + PayloadType uint8 + SequenceNumber uint16 + Timestamp uint32 + SSRC uint32 + Payload []byte +} + +func ParseRTPPacket(buf []byte) (RTPPacket, error) { + if len(buf) < 12 { + return RTPPacket{}, errors.New("RTP packet too short") + } + b0 := buf[0] + b1 := buf[1] + p := RTPPacket{ + Version: b0 >> 6, + Padding: (b0 & 0x20) != 0, + Extension: (b0 & 0x10) != 0, + CSRCCount: b0 & 0x0F, + Marker: (b1 & 0x80) != 0, + PayloadType: b1 & 0x7F, + SequenceNumber: binary.BigEndian.Uint16(buf[2:4]), + Timestamp: binary.BigEndian.Uint32(buf[4:8]), + SSRC: binary.BigEndian.Uint32(buf[8:12]), + } + if p.Version != 2 { + return RTPPacket{}, errors.New("unsupported RTP version") + } + headerLen := 12 + int(p.CSRCCount)*4 + if len(buf) < headerLen { + return RTPPacket{}, errors.New("RTP packet too short for CSRC list") + } + if p.Extension { + if len(buf) < headerLen+4 { + return RTPPacket{}, errors.New("RTP packet too short for extension") + } + extLenWords := int(binary.BigEndian.Uint16(buf[headerLen+2 : headerLen+4])) + headerLen += 4 + extLenWords*4 + if len(buf) < headerLen { + return RTPPacket{}, errors.New("RTP packet too short for full extension") + } + } + payload := buf[headerLen:] + if p.Padding { + if len(payload) == 0 { + return RTPPacket{}, errors.New("RTP packet has invalid padding") + } + padLen := int(payload[len(payload)-1]) + if padLen <= 0 || padLen > len(payload) { + return RTPPacket{}, errors.New("RTP packet has invalid pad length") + } + payload = payload[:len(payload)-padLen] + } + p.Payload = payload + return p, nil +} diff --git a/aoiprxkit/rtp_test.go b/aoiprxkit/rtp_test.go new file mode 100644 index 0000000..9d12778 --- /dev/null +++ b/aoiprxkit/rtp_test.go @@ -0,0 +1,22 @@ +package aoiprxkit + +import "testing" + +func TestParseRTPPacket(t *testing.T) { + buf := []byte{ + 0x80, 0x61, 0x12, 0x34, + 0x00, 0x00, 0x00, 0x05, + 0x11, 0x22, 0x33, 0x44, + 0x01, 0x02, 0x03, + } + p, err := ParseRTPPacket(buf) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if p.Version != 2 || p.PayloadType != 97 || p.SequenceNumber != 0x1234 || p.Timestamp != 5 || p.SSRC != 0x11223344 { + t.Fatalf("unexpected packet: %+v", p) + } + if len(p.Payload) != 3 || p.Payload[0] != 1 || p.Payload[2] != 3 { + t.Fatalf("unexpected payload: %v", p.Payload) + } +} diff --git a/aoiprxkit/sap.go b/aoiprxkit/sap.go new file mode 100644 index 0000000..2a533fb --- /dev/null +++ b/aoiprxkit/sap.go @@ -0,0 +1,115 @@ +package aoiprxkit + +import ( + "encoding/binary" + "fmt" + "net" +) + +const ( + DefaultSAPGroup = "224.2.127.254" + DefaultSAPPort = 9875 +) + +type SAPPacket struct { + Version uint8 + AddressTypeIPv6 bool + IsDelete bool + Encrypted bool + Compressed bool + AuthLenWords uint8 + MessageIDHash uint16 + OriginSource net.IP + PayloadType string + Payload []byte +} + +type SAPAnnouncement struct { + ReceivedAt string `json:"receivedAt"` + SourceAddr string `json:"sourceAddr"` + MessageID uint16 `json:"messageIdHash"` + Delete bool `json:"delete"` + PayloadType string `json:"payloadType"` + SDP string `json:"sdp"` + ParsedSDP SDPInfo `json:"parsedSdp"` +} + +func ParseSAPPacket(buf []byte) (SAPPacket, error) { + if len(buf) < 8 { + return SAPPacket{}, fmt.Errorf("SAP packet too short") + } + + b0 := buf[0] + version := b0 >> 5 + if version != 1 { + return SAPPacket{}, fmt.Errorf("unsupported SAP version %d", version) + } + + addrTypeIPv6 := (b0 & 0x10) != 0 + isDelete := (b0 & 0x04) != 0 + encrypted := (b0 & 0x02) != 0 + compressed := (b0 & 0x01) != 0 + authLenWords := buf[1] + msgID := binary.BigEndian.Uint16(buf[2:4]) + + hdrLen := 4 + var origin net.IP + if addrTypeIPv6 { + if len(buf) < hdrLen+16 { + return SAPPacket{}, fmt.Errorf("SAP packet too short for IPv6 source") + } + origin = net.IP(buf[hdrLen : hdrLen+16]) + hdrLen += 16 + } else { + if len(buf) < hdrLen+4 { + return SAPPacket{}, fmt.Errorf("SAP packet too short for IPv4 source") + } + origin = net.IP(buf[hdrLen : hdrLen+4]) + hdrLen += 4 + } + + authBytes := int(authLenWords) * 4 + if len(buf) < hdrLen+authBytes { + return SAPPacket{}, fmt.Errorf("SAP packet too short for auth section") + } + hdrLen += authBytes + + if encrypted || compressed { + return SAPPacket{}, fmt.Errorf("encrypted/compressed SAP payloads are not supported") + } + + payloadType := "application/sdp" + payloadStart := hdrLen + + if len(buf) > payloadStart && !(len(buf)-payloadStart >= 4 && string(buf[payloadStart:payloadStart+4]) == "v=0\n" || len(buf)-payloadStart >= 5 && string(buf[payloadStart:payloadStart+5]) == "v=0\r\n") { + nul := -1 + for i := payloadStart; i < len(buf); i++ { + if buf[i] == 0 { + nul = i + break + } + } + if nul == -1 { + return SAPPacket{}, fmt.Errorf("SAP payload type missing NUL terminator") + } + payloadType = string(buf[payloadStart:nul]) + payloadStart = nul + 1 + } + + if payloadStart > len(buf) { + return SAPPacket{}, fmt.Errorf("invalid SAP payload start") + } + + return SAPPacket{ + Version: version, + AddressTypeIPv6: addrTypeIPv6, + IsDelete: isDelete, + Encrypted: encrypted, + Compressed: compressed, + AuthLenWords: authLenWords, + MessageIDHash: msgID, + OriginSource: origin, + PayloadType: payloadType, + Payload: append([]byte(nil), buf[payloadStart:]...), + }, nil +} diff --git a/aoiprxkit/sap_listener.go b/aoiprxkit/sap_listener.go new file mode 100644 index 0000000..6afe96a --- /dev/null +++ b/aoiprxkit/sap_listener.go @@ -0,0 +1,150 @@ +package aoiprxkit + +import ( + "context" + "fmt" + "net" + "sync" + "time" +) + +type SAPListenerConfig struct { + Group string + Port int + InterfaceName string + ReadBuffer int +} + +func DefaultSAPListenerConfig() SAPListenerConfig { + return SAPListenerConfig{ + Group: DefaultSAPGroup, + Port: DefaultSAPPort, + ReadBuffer: 1 << 20, + } +} + +type SAPHandler func(announcement SAPAnnouncement) + +type SAPListener struct { + cfg SAPListenerConfig + onPacket SAPHandler + + mu sync.Mutex + conn *net.UDPConn + cancel context.CancelFunc + done chan struct{} + doneOnce sync.Once +} + +func NewSAPListener(cfg SAPListenerConfig, onPacket SAPHandler) (*SAPListener, error) { + if cfg.Group == "" { + cfg.Group = DefaultSAPGroup + } + if cfg.Port == 0 { + cfg.Port = DefaultSAPPort + } + if onPacket == nil { + return nil, fmt.Errorf("onPacket must not be nil") + } + if net.ParseIP(cfg.Group) == nil { + return nil, fmt.Errorf("invalid SAP group: %q", cfg.Group) + } + return &SAPListener{ + cfg: cfg, + onPacket: onPacket, + done: make(chan struct{}), + }, nil +} + +func (l *SAPListener) Start(ctx context.Context) error { + l.mu.Lock() + defer l.mu.Unlock() + if l.conn != nil { + return fmt.Errorf("SAP listener already started") + } + + ifi, err := resolveMulticastInterface(l.cfg.InterfaceName) + if err != nil { + return err + } + group := net.ParseIP(l.cfg.Group) + addr := &net.UDPAddr{IP: group, Port: l.cfg.Port} + conn, err := net.ListenMulticastUDP("udp4", ifi, addr) + if err != nil { + return fmt.Errorf("listen SAP multicast UDP: %w", err) + } + if l.cfg.ReadBuffer > 0 { + _ = conn.SetReadBuffer(l.cfg.ReadBuffer) + } + + cctx, cancel := context.WithCancel(ctx) + l.conn = conn + l.cancel = cancel + l.done = make(chan struct{}) + l.doneOnce = sync.Once{} + go l.loop(cctx) + return nil +} + +func (l *SAPListener) Stop() error { + l.mu.Lock() + if l.conn == nil { + l.mu.Unlock() + return nil + } + conn := l.conn + cancel := l.cancel + done := l.done + l.conn = nil + l.cancel = nil + l.mu.Unlock() + + if cancel != nil { + cancel() + } + _ = conn.Close() + <-done + return nil +} + +func (l *SAPListener) loop(ctx context.Context) { + defer l.doneOnce.Do(func() { close(l.done) }) + + buf := make([]byte, 64*1024) + for { + select { + case <-ctx.Done(): + return + default: + } + _ = l.conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + n, src, err := l.conn.ReadFromUDP(buf) + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Timeout() { + continue + } + return + } + pkt, err := ParseSAPPacket(buf[:n]) + if err != nil { + continue + } + if pkt.PayloadType != "application/sdp" { + continue + } + sdp := string(pkt.Payload) + info, err := ParseMinimalSDP(sdp) + if err != nil { + continue + } + l.onPacket(SAPAnnouncement{ + ReceivedAt: time.Now().UTC().Format(time.RFC3339Nano), + SourceAddr: src.String(), + MessageID: pkt.MessageIDHash, + Delete: pkt.IsDelete, + PayloadType: pkt.PayloadType, + SDP: sdp, + ParsedSDP: info, + }) + } +} diff --git a/aoiprxkit/sap_test.go b/aoiprxkit/sap_test.go new file mode 100644 index 0000000..534ab26 --- /dev/null +++ b/aoiprxkit/sap_test.go @@ -0,0 +1,28 @@ +package aoiprxkit + +import "testing" + +func TestParseSAPPacket(t *testing.T) { + payload := []byte("application/sdp\x00v=0\n" + + "o=- 1 1 IN IP4 192.168.1.10\n" + + "s=Test\n" + + "c=IN IP4 239.69.0.1/32\n" + + "t=0 0\n" + + "m=audio 5004 RTP/AVP 97\n" + + "a=rtpmap:97 L24/48000/2\n") + pkt := []byte{ + 0x20, // V=1, IPv4, announce, no enc/compress + 0x00, // auth len + 0x12, 0x34, + 192, 168, 1, 50, + } + pkt = append(pkt, payload...) + + got, err := ParseSAPPacket(pkt) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if got.Version != 1 || got.MessageIDHash != 0x1234 || got.PayloadType != "application/sdp" || got.OriginSource.String() != "192.168.1.50" { + t.Fatalf("unexpected SAP packet: %+v", got) + } +} diff --git a/aoiprxkit/sdp.go b/aoiprxkit/sdp.go new file mode 100644 index 0000000..95f83f9 --- /dev/null +++ b/aoiprxkit/sdp.go @@ -0,0 +1,116 @@ +package aoiprxkit + +import ( + "fmt" + "net" + "strconv" + "strings" + "time" +) + +type SDPInfo struct { + SessionName string + Origin string + MulticastGroup string + Port int + PayloadType uint8 + Encoding string + SampleRateHz int + Channels int + PacketTimeMS int +} + +// ParseMinimalSDP extracts the multicast address, port and one rtpmap line. +// It is deliberately small and not a full SDP parser. +func ParseMinimalSDP(s string) (SDPInfo, error) { + var out SDPInfo + lines := strings.Split(strings.ReplaceAll(s, "\r\n", "\n"), "\n") + for _, raw := range lines { + line := strings.TrimSpace(raw) + switch { + case strings.HasPrefix(line, "s="): + out.SessionName = strings.TrimPrefix(line, "s=") + + case strings.HasPrefix(line, "o="): + out.Origin = strings.TrimPrefix(line, "o=") + + case strings.HasPrefix(line, "c=IN IP4 "): + rest := strings.TrimPrefix(line, "c=IN IP4 ") + host := strings.Split(rest, "/")[0] + if net.ParseIP(host) == nil { + return out, fmt.Errorf("invalid multicast host in c=: %q", host) + } + out.MulticastGroup = host + + case strings.HasPrefix(line, "m=audio "): + fields := strings.Fields(line) + if len(fields) < 4 { + return out, fmt.Errorf("invalid m=audio line") + } + port, err := strconv.Atoi(fields[1]) + if err != nil { + return out, fmt.Errorf("invalid audio port: %w", err) + } + pt, err := strconv.Atoi(fields[3]) + if err != nil { + return out, fmt.Errorf("invalid payload type: %w", err) + } + out.Port = port + out.PayloadType = uint8(pt) + + case strings.HasPrefix(line, "a=rtpmap:"): + rest := strings.TrimPrefix(line, "a=rtpmap:") + parts := strings.Fields(rest) + if len(parts) != 2 { + return out, fmt.Errorf("invalid rtpmap line") + } + pt, err := strconv.Atoi(parts[0]) + if err != nil { + return out, fmt.Errorf("invalid rtpmap payload type: %w", err) + } + codecParts := strings.Split(parts[1], "/") + if len(codecParts) < 2 { + return out, fmt.Errorf("invalid rtpmap codec tuple") + } + sr, err := strconv.Atoi(codecParts[1]) + if err != nil { + return out, fmt.Errorf("invalid rtpmap sample rate: %w", err) + } + ch := 1 + if len(codecParts) >= 3 { + ch, err = strconv.Atoi(codecParts[2]) + if err != nil { + return out, fmt.Errorf("invalid rtpmap channel count: %w", err) + } + } + out.PayloadType = uint8(pt) + out.Encoding = codecParts[0] + out.SampleRateHz = sr + out.Channels = ch + + case strings.HasPrefix(line, "a=ptime:"): + ms, err := strconv.Atoi(strings.TrimPrefix(line, "a=ptime:")) + if err == nil { + out.PacketTimeMS = ms + } + } + } + if out.MulticastGroup == "" || out.Port == 0 || out.Encoding == "" || out.SampleRateHz == 0 { + return out, fmt.Errorf("incomplete SDP: %+v", out) + } + return out, nil +} + +func ConfigFromSDP(base Config, info SDPInfo) (Config, error) { + cfg := base + cfg.MulticastGroup = info.MulticastGroup + cfg.Port = info.Port + cfg.PayloadType = info.PayloadType + cfg.SampleRateHz = info.SampleRateHz + cfg.Channels = info.Channels + cfg.Encoding = info.Encoding + if info.PacketTimeMS > 0 { + cfg.PacketTime = time.Duration(info.PacketTimeMS) * time.Millisecond + } + return cfg, cfg.Validate() +} diff --git a/aoiprxkit/sdp_test.go b/aoiprxkit/sdp_test.go new file mode 100644 index 0000000..52379bf --- /dev/null +++ b/aoiprxkit/sdp_test.go @@ -0,0 +1,22 @@ +package aoiprxkit + +import "testing" + +func TestParseMinimalSDP(t *testing.T) { + sdp := `v=0 +o=- 1 1 IN IP4 192.168.1.10 +s=Test +c=IN IP4 239.69.0.1/32 +t=0 0 +m=audio 5004 RTP/AVP 97 +a=rtpmap:97 L24/48000/2 +a=ptime:1 +` + got, err := ParseMinimalSDP(sdp) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if got.MulticastGroup != "239.69.0.1" || got.Port != 5004 || got.PayloadType != 97 || got.Encoding != "L24" || got.SampleRateHz != 48000 || got.Channels != 2 || got.PacketTimeMS != 1 { + t.Fatalf("unexpected parsed SDP: %+v", got) + } +} diff --git a/aoiprxkit/srt.go b/aoiprxkit/srt.go new file mode 100644 index 0000000..eec9c52 --- /dev/null +++ b/aoiprxkit/srt.go @@ -0,0 +1,70 @@ +package aoiprxkit + +import ( + "context" + "fmt" + "io" +) + +type SRTConfig struct { + URL string + Mode string + SampleRateHz int + Channels int + SourceLabel string +} + +func DefaultSRTConfig() SRTConfig { + return SRTConfig{ + SampleRateHz: 48000, + Channels: 2, + Mode: "listener", + } +} + +func (c SRTConfig) Validate() error { + if c.URL == "" { + return fmt.Errorf("SRT URL must not be empty") + } + if c.SampleRateHz <= 0 { + return fmt.Errorf("SampleRateHz must be > 0") + } + if c.Channels < 1 || c.Channels > 2 { + return fmt.Errorf("Channels must be 1 or 2") + } + return nil +} + +type SRTConnOpener func(ctx context.Context, cfg SRTConfig) (io.ReadCloser, error) + +type SRTReceiver struct { + cfg SRTConfig + streamRx *StreamReceiver +} + +func NewSRTReceiver(cfg SRTConfig, onFrame FrameHandler) (*SRTReceiver, error) { + return NewSRTReceiverWithOpener(cfg, defaultSRTConnOpener, onFrame) +} + +func NewSRTReceiverWithOpener(cfg SRTConfig, opener SRTConnOpener, onFrame FrameHandler) (*SRTReceiver, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + if opener == nil { + return nil, fmt.Errorf("SRT opener must not be nil") + } + src := cfg.SourceLabel + if src == "" { + src = cfg.URL + } + streamRx, err := NewStreamReceiver(StreamReceiverConfig{SourceLabel: src}, func(ctx context.Context) (io.ReadCloser, error) { + return opener(ctx, cfg) + }, onFrame) + if err != nil { + return nil, err + } + return &SRTReceiver{cfg: cfg, streamRx: streamRx}, nil +} + +func (r *SRTReceiver) Start(ctx context.Context) error { return r.streamRx.Start(ctx) } +func (r *SRTReceiver) Stop() error { return r.streamRx.Stop() } diff --git a/aoiprxkit/srt_gosrt.go.example b/aoiprxkit/srt_gosrt.go.example new file mode 100644 index 0000000..c40a1b8 --- /dev/null +++ b/aoiprxkit/srt_gosrt.go.example @@ -0,0 +1,13 @@ +// Example only. Rename to srt_gosrt.go in your target repo and wire it to github.com/datarhei/gosrt once that dependency is available. +//go:build gosrt + +package aoiprxkit + +// This file is intentionally left as a non-compiling example placeholder in the package zip. +// Reason: the current environment cannot fetch external Go modules, and the exact gosrt API +// should be verified against the version you vendor or pin in your target repository. +// +// Expected job of the real implementation: +// - parse cfg.URL +// - open a gosrt listener/caller depending on cfg.Mode +// - return an io.ReadCloser that yields framed PCM packets defined by stream_proto.go diff --git a/aoiprxkit/srt_profile.md b/aoiprxkit/srt_profile.md new file mode 100644 index 0000000..0a5dd5e --- /dev/null +++ b/aoiprxkit/srt_profile.md @@ -0,0 +1,13 @@ +# SRT framed-PCM profile + +This module now assumes a deliberately narrow WAN profile: + +- transport: SRT +- payload framing: custom framed stream defined in `stream_proto.go` +- codec today: PCM S32LE +- codec reserved for later: Opus + +Rationale: +- keep the Go stack small and deterministic +- avoid generic container/demux complexity +- make WAN ingest compatible with the same `PCMFrame` callback used by RTP/AES67-lite diff --git a/aoiprxkit/srt_stub.go b/aoiprxkit/srt_stub.go new file mode 100644 index 0000000..4fffbec --- /dev/null +++ b/aoiprxkit/srt_stub.go @@ -0,0 +1,15 @@ +//go:build !gosrt + +package aoiprxkit + +import ( + "context" + "fmt" + "io" +) + +func defaultSRTConnOpener(ctx context.Context, cfg SRTConfig) (io.ReadCloser, error) { + _ = ctx + _ = cfg + return nil, fmt.Errorf("native SRT transport is not linked in this build: provide a custom opener or add a gosrt-backed opener in your target repo") +} diff --git a/aoiprxkit/srt_test.go b/aoiprxkit/srt_test.go new file mode 100644 index 0000000..3adda79 --- /dev/null +++ b/aoiprxkit/srt_test.go @@ -0,0 +1,58 @@ +package aoiprxkit + +import ( + "bytes" + "context" + "io" + "testing" + "time" +) + +type readCloser struct{ io.Reader } + +func (r readCloser) Close() error { return nil } + +func TestSRTReceiverWithCustomOpener(t *testing.T) { + var stream bytes.Buffer + samples := []int32{1, 2, 3, 4} + if err := WritePCM32Packet(&stream, 2, 48000, 2, 1, 480, samples); err != nil { + t.Fatalf("unexpected write error: %v", err) + } + + got := make(chan PCMFrame, 1) + rx, err := NewSRTReceiverWithOpener(SRTConfig{ + URL: "srt://example:9000?mode=listener", + SampleRateHz: 48000, + Channels: 2, + }, func(ctx context.Context, cfg SRTConfig) (io.ReadCloser, error) { + _ = ctx + _ = cfg + return readCloser{Reader: bytes.NewReader(stream.Bytes())}, nil + }, func(frame PCMFrame) { + select { + case got <- frame: + default: + } + }) + if err != nil { + t.Fatalf("unexpected constructor error: %v", err) + } + if err := rx.Start(context.Background()); err != nil { + t.Fatalf("unexpected start error: %v", err) + } + defer rx.Stop() + + select { + case frame := <-got: + if len(frame.Samples) != len(samples) { + t.Fatalf("unexpected sample len: %d", len(frame.Samples)) + } + for i := range samples { + if frame.Samples[i] != samples[i] { + t.Fatalf("sample %d mismatch: got=%d want=%d", i, frame.Samples[i], samples[i]) + } + } + case <-time.After(500 * time.Millisecond): + t.Fatalf("timeout waiting for frame") + } +} diff --git a/aoiprxkit/stats.go b/aoiprxkit/stats.go new file mode 100644 index 0000000..4e0ff45 --- /dev/null +++ b/aoiprxkit/stats.go @@ -0,0 +1,53 @@ +package aoiprxkit + +import "sync/atomic" + +type Stats struct { + PacketsReceived uint64 `json:"packetsReceived"` + PacketsParsed uint64 `json:"packetsParsed"` + PacketsDelivered uint64 `json:"packetsDelivered"` + PacketsLateDrop uint64 `json:"packetsLateDrop"` + PacketsGapLoss uint64 `json:"packetsGapLoss"` + PacketsWrongPT uint64 `json:"packetsWrongPayloadType"` + PacketsShort uint64 `json:"packetsTooShort"` + JitterReorders uint64 `json:"jitterReorders"` + DecodeErrors uint64 `json:"decodeErrors"` + SamplesDelivered uint64 `json:"samplesDelivered"` + FramesDelivered uint64 `json:"framesDelivered"` + LastSequence uint32 `json:"lastSequence"` + SequenceValid uint32 `json:"sequenceValid"` +} + +type statsAtomic struct { + packetsReceived atomic.Uint64 + packetsParsed atomic.Uint64 + packetsDelivered atomic.Uint64 + packetsLateDrop atomic.Uint64 + packetsGapLoss atomic.Uint64 + packetsWrongPT atomic.Uint64 + packetsShort atomic.Uint64 + jitterReorders atomic.Uint64 + decodeErrors atomic.Uint64 + samplesDelivered atomic.Uint64 + framesDelivered atomic.Uint64 + lastSequence atomic.Uint32 + sequenceValid atomic.Uint32 +} + +func (s *statsAtomic) snapshot() Stats { + return Stats{ + PacketsReceived: s.packetsReceived.Load(), + PacketsParsed: s.packetsParsed.Load(), + PacketsDelivered: s.packetsDelivered.Load(), + PacketsLateDrop: s.packetsLateDrop.Load(), + PacketsGapLoss: s.packetsGapLoss.Load(), + PacketsWrongPT: s.packetsWrongPT.Load(), + PacketsShort: s.packetsShort.Load(), + JitterReorders: s.jitterReorders.Load(), + DecodeErrors: s.decodeErrors.Load(), + SamplesDelivered: s.samplesDelivered.Load(), + FramesDelivered: s.framesDelivered.Load(), + LastSequence: s.lastSequence.Load(), + SequenceValid: s.sequenceValid.Load(), + } +} diff --git a/aoiprxkit/stream_finder.go b/aoiprxkit/stream_finder.go new file mode 100644 index 0000000..af2f614 --- /dev/null +++ b/aoiprxkit/stream_finder.go @@ -0,0 +1,137 @@ +package aoiprxkit + +import ( + "context" + "fmt" + "sync" + "time" +) + +// StreamFinder keeps a live in-memory view of SAP/SDP announcements +// and can wait for sessions by their SDP "s=" session name. +type StreamFinder struct { + listener *SAPListener + + mu sync.Mutex + sessions map[string]SAPAnnouncement + waiters map[string][]chan SAPAnnouncement +} + +func NewStreamFinder(cfg SAPListenerConfig) (*StreamFinder, error) { + sf := &StreamFinder{ + sessions: make(map[string]SAPAnnouncement), + waiters: make(map[string][]chan SAPAnnouncement), + } + listener, err := NewSAPListener(cfg, sf.handleAnnouncement) + if err != nil { + return nil, err + } + sf.listener = listener + return sf, nil +} + +func (s *StreamFinder) Start(ctx context.Context) error { + return s.listener.Start(ctx) +} + +func (s *StreamFinder) Stop() error { + return s.listener.Stop() +} + +func (s *StreamFinder) handleAnnouncement(a SAPAnnouncement) { + name := a.ParsedSDP.SessionName + if name == "" { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + if a.Delete { + delete(s.sessions, name) + return + } + + s.sessions[name] = a + if waiters := s.waiters[name]; len(waiters) > 0 { + delete(s.waiters, name) + for _, ch := range waiters { + select { + case ch <- a: + default: + } + close(ch) + } + } +} + +func (s *StreamFinder) FindByStreamName(name string) (SAPAnnouncement, bool) { + s.mu.Lock() + defer s.mu.Unlock() + a, ok := s.sessions[name] + return a, ok +} + +func (s *StreamFinder) WaitForStreamName(ctx context.Context, name string) (SAPAnnouncement, error) { + if name == "" { + return SAPAnnouncement{}, fmt.Errorf("stream name must not be empty") + } + + s.mu.Lock() + if a, ok := s.sessions[name]; ok { + s.mu.Unlock() + return a, nil + } + ch := make(chan SAPAnnouncement, 1) + s.waiters[name] = append(s.waiters[name], ch) + s.mu.Unlock() + + select { + case <-ctx.Done(): + s.mu.Lock() + waiters := s.waiters[name] + kept := waiters[:0] + for _, w := range waiters { + if w != ch { + kept = append(kept, w) + } + } + if len(kept) == 0 { + delete(s.waiters, name) + } else { + s.waiters[name] = kept + } + s.mu.Unlock() + return SAPAnnouncement{}, ctx.Err() + case a := <-ch: + return a, nil + } +} + +func (s *StreamFinder) WaitConfigByStreamName(ctx context.Context, base Config, name string) (Config, SAPAnnouncement, error) { + a, err := s.WaitForStreamName(ctx, name) + if err != nil { + return Config{}, SAPAnnouncement{}, err + } + cfg, err := ConfigFromSDP(base, a.ParsedSDP) + if err != nil { + return Config{}, SAPAnnouncement{}, err + } + return cfg, a, nil +} + +func (s *StreamFinder) Snapshot() []SAPAnnouncement { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]SAPAnnouncement, 0, len(s.sessions)) + for _, v := range s.sessions { + out = append(out, v) + } + return out +} + +func (s *StreamFinder) WaitForStreamNameTimeout(name string, timeout time.Duration) (SAPAnnouncement, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return s.WaitForStreamName(ctx, name) +} diff --git a/aoiprxkit/stream_proto.go b/aoiprxkit/stream_proto.go new file mode 100644 index 0000000..271323e --- /dev/null +++ b/aoiprxkit/stream_proto.go @@ -0,0 +1,81 @@ +package aoiprxkit + +import ( + "encoding/binary" + "fmt" + "io" +) + +const ( + streamMagic = "ARX1" + + StreamCodecPCM_S32LE uint8 = 1 + StreamCodecOpus uint8 = 2 // reserved for later phases +) + +type StreamHeader struct { + Codec uint8 + Channels uint8 + SampleRateHz uint32 + FrameSamples uint32 + Sequence uint32 + Timestamp uint64 + PayloadBytes uint32 +} + +func ReadStreamHeader(r io.Reader) (StreamHeader, error) { + var raw [26]byte + if _, err := io.ReadFull(r, raw[:]); err != nil { + return StreamHeader{}, err + } + if string(raw[0:4]) != streamMagic { + return StreamHeader{}, fmt.Errorf("invalid stream magic %q", string(raw[0:4])) + } + h := StreamHeader{ + Codec: raw[4], + Channels: raw[5], + SampleRateHz: binary.BigEndian.Uint32(raw[6:10]), + FrameSamples: binary.BigEndian.Uint32(raw[10:14]), + Sequence: binary.BigEndian.Uint32(raw[14:18]), + Timestamp: binary.BigEndian.Uint64(raw[18:26]), + } + var payloadLenRaw [4]byte + if _, err := io.ReadFull(r, payloadLenRaw[:]); err != nil { + return StreamHeader{}, err + } + h.PayloadBytes = binary.BigEndian.Uint32(payloadLenRaw[:]) + return h, nil +} + +func WritePCM32Packet(w io.Writer, channels int, sampleRateHz int, frameSamples int, sequence uint32, timestamp uint64, samples []int32) error { + if channels < 1 || channels > 2 { + return fmt.Errorf("channels must be 1 or 2") + } + if frameSamples < 0 { + return fmt.Errorf("frameSamples must be >= 0") + } + if len(samples) != frameSamples*channels { + return fmt.Errorf("sample length mismatch: got=%d want=%d", len(samples), frameSamples*channels) + } + + payloadBytes := len(samples) * 4 + var hdr [30]byte + copy(hdr[0:4], []byte(streamMagic)) + hdr[4] = StreamCodecPCM_S32LE + hdr[5] = byte(channels) + binary.BigEndian.PutUint32(hdr[6:10], uint32(sampleRateHz)) + binary.BigEndian.PutUint32(hdr[10:14], uint32(frameSamples)) + binary.BigEndian.PutUint32(hdr[14:18], sequence) + binary.BigEndian.PutUint64(hdr[18:26], timestamp) + binary.BigEndian.PutUint32(hdr[26:30], uint32(payloadBytes)) + if _, err := w.Write(hdr[:]); err != nil { + return err + } + + payload := make([]byte, payloadBytes) + for i, s := range samples { + binary.LittleEndian.PutUint32(payload[i*4:i*4+4], uint32(s)) + } + _, err := w.Write(payload) + return err +} diff --git a/aoiprxkit/stream_proto_test.go b/aoiprxkit/stream_proto_test.go new file mode 100644 index 0000000..6a1a8ef --- /dev/null +++ b/aoiprxkit/stream_proto_test.go @@ -0,0 +1,34 @@ +package aoiprxkit + +import ( + "bytes" + "testing" +) + +func TestWriteAndReadPCM32Packet(t *testing.T) { + var buf bytes.Buffer + samples := []int32{1, -1, 10, -10} + if err := WritePCM32Packet(&buf, 2, 48000, 2, 7, 1234, samples); err != nil { + t.Fatalf("unexpected write error: %v", err) + } + hdr, err := ReadStreamHeader(&buf) + if err != nil { + t.Fatalf("unexpected read header error: %v", err) + } + if hdr.Codec != StreamCodecPCM_S32LE || hdr.Channels != 2 || hdr.SampleRateHz != 48000 || hdr.FrameSamples != 2 || hdr.Sequence != 7 || hdr.Timestamp != 1234 || hdr.PayloadBytes != 16 { + t.Fatalf("unexpected header: %+v", hdr) + } + payload := make([]byte, hdr.PayloadBytes) + if _, err := buf.Read(payload); err != nil { + t.Fatalf("unexpected payload read error: %v", err) + } + got, err := DecodeS32LE(payload, 2) + if err != nil { + t.Fatalf("unexpected decode error: %v", err) + } + for i := range samples { + if got[i] != samples[i] { + t.Fatalf("sample %d mismatch: got=%d want=%d", i, got[i], samples[i]) + } + } +} diff --git a/aoiprxkit/stream_receiver.go b/aoiprxkit/stream_receiver.go new file mode 100644 index 0000000..f9b5257 --- /dev/null +++ b/aoiprxkit/stream_receiver.go @@ -0,0 +1,114 @@ +package aoiprxkit + +import ( + "context" + "fmt" + "io" + "sync" + "time" +) + +type StreamReceiverConfig struct { + SourceLabel string +} + +type StreamReceiver struct { + cfg StreamReceiverConfig + opener func(context.Context) (io.ReadCloser, error) + onFrame FrameHandler + + mu sync.Mutex + rc io.ReadCloser + cancel context.CancelFunc + done chan struct{} +} + +func NewStreamReceiver(cfg StreamReceiverConfig, opener func(context.Context) (io.ReadCloser, error), onFrame FrameHandler) (*StreamReceiver, error) { + if opener == nil { + return nil, fmt.Errorf("opener must not be nil") + } + if onFrame == nil { + return nil, fmt.Errorf("onFrame must not be nil") + } + return &StreamReceiver{cfg: cfg, opener: opener, onFrame: onFrame, done: make(chan struct{})}, nil +} + +func (r *StreamReceiver) Start(ctx context.Context) error { + r.mu.Lock() + defer r.mu.Unlock() + if r.rc != nil { + return fmt.Errorf("stream receiver already started") + } + cctx, cancel := context.WithCancel(ctx) + rc, err := r.opener(cctx) + if err != nil { + cancel() + return err + } + r.rc = rc + r.cancel = cancel + r.done = make(chan struct{}) + go r.loop(cctx, rc) + return nil +} + +func (r *StreamReceiver) Stop() error { + r.mu.Lock() + rc := r.rc + cancel := r.cancel + done := r.done + r.rc = nil + r.cancel = nil + r.mu.Unlock() + + if cancel != nil { + cancel() + } + if rc != nil { + _ = rc.Close() + } + if done != nil { + <-done + } + return nil +} + +func (r *StreamReceiver) loop(ctx context.Context, rc io.ReadCloser) { + defer close(r.done) + for { + select { + case <-ctx.Done(): + return + default: + } + hdr, err := ReadStreamHeader(rc) + if err != nil { + return + } + payload := make([]byte, hdr.PayloadBytes) + if _, err := io.ReadFull(rc, payload); err != nil { + return + } + switch hdr.Codec { + case StreamCodecPCM_S32LE: + samples, err := DecodeS32LE(payload, int(hdr.Channels)) + if err != nil { + continue + } + r.onFrame(PCMFrame{ + SequenceNumber: uint16(hdr.Sequence & 0xffff), + Timestamp: uint32(hdr.Timestamp & 0xffffffff), + SampleRateHz: int(hdr.SampleRateHz), + Channels: int(hdr.Channels), + Samples: samples, + ReceivedAt: time.Now(), + Source: r.cfg.SourceLabel, + }) + case StreamCodecOpus: + // Reserved for later phase. Not decoded in this module revision. + continue + default: + continue + } + } +} diff --git a/aoiprxkit/stream_receiver_test.go b/aoiprxkit/stream_receiver_test.go new file mode 100644 index 0000000..f7fa087 --- /dev/null +++ b/aoiprxkit/stream_receiver_test.go @@ -0,0 +1,56 @@ +package aoiprxkit + +import ( + "bytes" + "context" + "io" + "testing" + "time" +) + +type nopCloser struct{ io.Reader } + +func (n nopCloser) Close() error { return nil } + +func TestStreamReceiverPCM(t *testing.T) { + var buf bytes.Buffer + samples := []int32{100, -100, 200, -200} + if err := WritePCM32Packet(&buf, 2, 48000, 2, 55, 999, samples); err != nil { + t.Fatalf("unexpected write error: %v", err) + } + + got := make(chan PCMFrame, 1) + rx, err := NewStreamReceiver(StreamReceiverConfig{SourceLabel: "test-source"}, func(ctx context.Context) (io.ReadCloser, error) { + _ = ctx + return nopCloser{Reader: bytes.NewReader(buf.Bytes())}, nil + }, func(frame PCMFrame) { + select { + case got <- frame: + default: + } + }) + if err != nil { + t.Fatalf("unexpected constructor error: %v", err) + } + if err := rx.Start(context.Background()); err != nil { + t.Fatalf("unexpected start error: %v", err) + } + defer rx.Stop() + + select { + case frame := <-got: + if frame.SampleRateHz != 48000 || frame.Channels != 2 || frame.Source != "test-source" { + t.Fatalf("unexpected frame meta: %+v", frame) + } + if len(frame.Samples) != len(samples) { + t.Fatalf("unexpected sample len: %d", len(frame.Samples)) + } + for i := range samples { + if frame.Samples[i] != samples[i] { + t.Fatalf("sample %d mismatch: got=%d want=%d", i, frame.Samples[i], samples[i]) + } + } + case <-time.After(500 * time.Millisecond): + t.Fatalf("timeout waiting for frame") + } +} diff --git a/go.mod b/go.mod index d553bb4..3f45ac5 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,12 @@ go 1.22 require github.com/jan/fm-rds-tx/internal v0.0.0 require ( + aoiprxkit v0.0.0 // indirect github.com/hajimehoshi/go-mp3 v0.3.4 // indirect github.com/jfreymuth/oggvorbis v1.0.5 // indirect github.com/jfreymuth/vorbis v1.0.2 // indirect ) replace github.com/jan/fm-rds-tx/internal => ./internal + +replace aoiprxkit => ./aoiprxkit diff --git a/internal/config/config.go b/internal/config/config.go index 7a8b56b..eba5ecb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -77,6 +77,7 @@ type IngestConfig struct { Stdin IngestPCMConfig `json:"stdin"` HTTPRaw IngestPCMConfig `json:"httpRaw"` Icecast IngestIcecastConfig `json:"icecast"` + SRT IngestSRTConfig `json:"srt"` } type IngestReconnectConfig struct { @@ -104,6 +105,13 @@ type IngestIcecastRadioTextConfig struct { OnlyOnChange bool `json:"onlyOnChange"` } +type IngestSRTConfig struct { + URL string `json:"url"` + Mode string `json:"mode"` + SampleRateHz int `json:"sampleRateHz"` + Channels int `json:"channels"` +} + func Default() Config { return Config{ Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0.4}, @@ -152,6 +160,11 @@ func Default() Config { OnlyOnChange: true, }, }, + SRT: IngestSRTConfig{ + Mode: "listener", + SampleRateHz: 48000, + Channels: 2, + }, }, } } @@ -238,8 +251,9 @@ func (c Config) Validate() error { if c.Ingest.Kind == "" { c.Ingest.Kind = "none" } - switch strings.ToLower(strings.TrimSpace(c.Ingest.Kind)) { - case "none", "stdin", "stdin-pcm", "http-raw", "icecast": + ingestKind := strings.ToLower(strings.TrimSpace(c.Ingest.Kind)) + switch ingestKind { + case "none", "stdin", "stdin-pcm", "http-raw", "icecast", "srt": default: return fmt.Errorf("ingest.kind unsupported: %s", c.Ingest.Kind) } @@ -270,9 +284,23 @@ func (c Config) Validate() error { if strings.ToLower(strings.TrimSpace(c.Ingest.Stdin.Format)) != "s16le" || strings.ToLower(strings.TrimSpace(c.Ingest.HTTPRaw.Format)) != "s16le" { return fmt.Errorf("ingest pcm format must be s16le") } - if c.Ingest.Kind == "icecast" && strings.TrimSpace(c.Ingest.Icecast.URL) == "" { + if ingestKind == "icecast" && strings.TrimSpace(c.Ingest.Icecast.URL) == "" { return fmt.Errorf("ingest.icecast.url is required when ingest.kind=icecast") } + if ingestKind == "srt" && strings.TrimSpace(c.Ingest.SRT.URL) == "" { + return fmt.Errorf("ingest.srt.url is required when ingest.kind=srt") + } + switch strings.ToLower(strings.TrimSpace(c.Ingest.SRT.Mode)) { + case "", "listener", "caller", "rendezvous": + default: + return fmt.Errorf("ingest.srt.mode unsupported: %s", c.Ingest.SRT.Mode) + } + if c.Ingest.SRT.SampleRateHz <= 0 { + return fmt.Errorf("ingest.srt.sampleRateHz must be > 0") + } + if c.Ingest.SRT.Channels != 1 && c.Ingest.SRT.Channels != 2 { + return fmt.Errorf("ingest.srt.channels must be 1 or 2") + } switch strings.ToLower(strings.TrimSpace(c.Ingest.Icecast.Decoder)) { case "", "auto", "native", "ffmpeg", "fallback": default: diff --git a/internal/config/config_test.go b/internal/config/config_test.go index affdbb7..6002e54 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -132,6 +132,39 @@ func TestValidateRejectsUnsupportedIngestKind(t *testing.T) { } } +func TestValidateRejectsInvalidSRTConfig(t *testing.T) { + cfg := Default() + cfg.Ingest.Kind = "srt" + cfg.Ingest.SRT.URL = "" + if err := cfg.Validate(); err == nil { + t.Fatal("expected srt url error") + } + + cfg = Default() + cfg.Ingest.Kind = "srt" + cfg.Ingest.SRT.URL = "srt://127.0.0.1:9000" + cfg.Ingest.SRT.Mode = "invalid" + if err := cfg.Validate(); err == nil { + t.Fatal("expected srt mode error") + } + + cfg = Default() + cfg.Ingest.Kind = "srt" + cfg.Ingest.SRT.URL = "srt://127.0.0.1:9000" + cfg.Ingest.SRT.SampleRateHz = 0 + if err := cfg.Validate(); err == nil { + t.Fatal("expected srt sample rate error") + } + + cfg = Default() + cfg.Ingest.Kind = "srt" + cfg.Ingest.SRT.URL = "srt://127.0.0.1:9000" + cfg.Ingest.SRT.Channels = 3 + if err := cfg.Validate(); err == nil { + t.Fatal("expected srt channels error") + } +} + func TestValidateRejectsUnsupportedIngestPCMShape(t *testing.T) { cfg := Default() cfg.Ingest.Stdin.SampleRateHz = 0 diff --git a/internal/go.mod b/internal/go.mod index 89df427..350d8f2 100644 --- a/internal/go.mod +++ b/internal/go.mod @@ -1,10 +1,13 @@ module github.com/jan/fm-rds-tx/internal -go 1.21 +go 1.22 require ( + aoiprxkit v0.0.0 github.com/hajimehoshi/go-mp3 v0.3.4 github.com/jfreymuth/oggvorbis v1.0.5 ) require github.com/jfreymuth/vorbis v1.0.2 // indirect + +replace aoiprxkit => ../aoiprxkit diff --git a/internal/ingest/adapters/srt/source.go b/internal/ingest/adapters/srt/source.go new file mode 100644 index 0000000..327e1ae --- /dev/null +++ b/internal/ingest/adapters/srt/source.go @@ -0,0 +1,283 @@ +package srt + +import ( + "context" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "aoiprxkit" + "github.com/jan/fm-rds-tx/internal/ingest" +) + +type Option func(*Source) + +func WithConnOpener(opener aoiprxkit.SRTConnOpener) Option { + return func(s *Source) { + if opener != nil { + s.opener = opener + } + } +} + +type Source struct { + id string + cfg aoiprxkit.SRTConfig + + opener aoiprxkit.SRTConnOpener + + chunks chan ingest.PCMChunk + errs chan error + + cancel context.CancelFunc + wg sync.WaitGroup + + mu sync.Mutex + rx *aoiprxkit.SRTReceiver + started atomic.Bool + closeOnce sync.Once + + state atomic.Value // string + connected atomic.Bool + chunksIn atomic.Uint64 + samplesIn atomic.Uint64 + overflows atomic.Uint64 + discontinuities atomic.Uint64 + transportLoss atomic.Uint64 + reorders atomic.Uint64 + lastChunkAtUnix atomic.Int64 + lastError atomic.Value // string + nextSeq atomic.Uint64 + + seqMu sync.Mutex + lastFrame uint16 + lastHasVal bool +} + +func New(id string, cfg aoiprxkit.SRTConfig, opts ...Option) *Source { + if id == "" { + id = "srt-main" + } + if cfg.Mode == "" { + cfg.Mode = "listener" + } + if cfg.SampleRateHz <= 0 { + cfg.SampleRateHz = 48000 + } + if cfg.Channels <= 0 { + cfg.Channels = 2 + } + + s := &Source{ + id: id, + cfg: cfg, + chunks: make(chan ingest.PCMChunk, 64), + errs: make(chan error, 8), + } + for _, opt := range opts { + if opt != nil { + opt(s) + } + } + s.state.Store("idle") + s.lastError.Store("") + return s +} + +func (s *Source) Descriptor() ingest.SourceDescriptor { + return ingest.SourceDescriptor{ + ID: s.id, + Kind: "srt", + Family: "aoip", + Transport: "srt", + Codec: "pcm_s32le", + Channels: s.cfg.Channels, + SampleRateHz: s.cfg.SampleRateHz, + Detail: s.cfg.URL, + } +} + +func (s *Source) Start(ctx context.Context) error { + if !s.started.CompareAndSwap(false, true) { + return nil + } + + var ( + rx *aoiprxkit.SRTReceiver + err error + ) + if s.opener != nil { + rx, err = aoiprxkit.NewSRTReceiverWithOpener(s.cfg, s.opener, s.handleFrame) + } else { + rx, err = aoiprxkit.NewSRTReceiver(s.cfg, s.handleFrame) + } + if err != nil { + s.started.Store(false) + s.connected.Store(false) + s.state.Store("failed") + s.setError(err) + return err + } + + runCtx, cancel := context.WithCancel(ctx) + s.cancel = cancel + s.mu.Lock() + s.rx = rx + s.mu.Unlock() + s.lastError.Store("") + s.connected.Store(false) + s.state.Store("connecting") + + if err := rx.Start(runCtx); err != nil { + s.started.Store(false) + s.connected.Store(false) + s.state.Store("failed") + s.setError(err) + return err + } + s.connected.Store(true) + s.state.Store("running") + + s.wg.Add(1) + go func() { + defer s.wg.Done() + <-runCtx.Done() + _ = s.stopReceiver() + s.connected.Store(false) + s.closeChannels() + }() + return nil +} + +func (s *Source) Stop() error { + if !s.started.CompareAndSwap(true, false) { + return nil + } + if s.cancel != nil { + s.cancel() + } + if err := s.stopReceiver(); err != nil { + s.setError(err) + s.state.Store("failed") + } + s.wg.Wait() + s.connected.Store(false) + state, _ := s.state.Load().(string) + if state != "failed" { + s.state.Store("stopped") + } + return nil +} + +func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks } +func (s *Source) Errors() <-chan error { return s.errs } + +func (s *Source) Stats() ingest.SourceStats { + state, _ := s.state.Load().(string) + last := s.lastChunkAtUnix.Load() + errStr, _ := s.lastError.Load().(string) + var lastChunkAt time.Time + if last > 0 { + lastChunkAt = time.Unix(0, last) + } + return ingest.SourceStats{ + State: state, + Connected: s.connected.Load(), + LastChunkAt: lastChunkAt, + ChunksIn: s.chunksIn.Load(), + SamplesIn: s.samplesIn.Load(), + Overflows: s.overflows.Load(), + Discontinuities: s.discontinuities.Load(), + TransportLoss: s.transportLoss.Load(), + Reorders: s.reorders.Load(), + LastError: errStr, + } +} + +func (s *Source) handleFrame(frame aoiprxkit.PCMFrame) { + if !s.started.Load() { + return + } + + discontinuity := false + s.seqMu.Lock() + if s.lastHasVal { + expected := s.lastFrame + 1 + if frame.SequenceNumber != expected { + discontinuity = true + delta := int16(frame.SequenceNumber - expected) + if delta > 0 { + s.transportLoss.Add(uint64(delta)) + } else { + s.reorders.Add(1) + } + } + } + s.lastFrame = frame.SequenceNumber + s.lastHasVal = true + s.seqMu.Unlock() + + chunk := ingest.PCMChunk{ + Samples: append([]int32(nil), frame.Samples...), + Channels: frame.Channels, + SampleRateHz: frame.SampleRateHz, + Sequence: s.nextSeq.Add(1) - 1, + Timestamp: frame.ReceivedAt, + SourceID: s.id, + Discontinuity: discontinuity, + } + + s.chunksIn.Add(1) + s.samplesIn.Add(uint64(len(chunk.Samples))) + s.lastChunkAtUnix.Store(time.Now().UnixNano()) + if discontinuity { + s.discontinuities.Add(1) + } + + select { + case s.chunks <- chunk: + default: + s.overflows.Add(1) + s.discontinuities.Add(1) + s.setError(io.ErrShortBuffer) + s.emitError(fmt.Errorf("srt chunk buffer overflow")) + } +} + +func (s *Source) stopReceiver() error { + s.mu.Lock() + rx := s.rx + s.rx = nil + s.mu.Unlock() + if rx == nil { + return nil + } + return rx.Stop() +} + +func (s *Source) closeChannels() { + s.closeOnce.Do(func() { + close(s.chunks) + close(s.errs) + }) +} + +func (s *Source) setError(err error) { + if err == nil { + return + } + s.lastError.Store(err.Error()) + s.emitError(err) +} + +func (s *Source) emitError(err error) { + if err == nil { + return + } + select { + case s.errs <- err: + default: + } +} diff --git a/internal/ingest/adapters/srt/source_test.go b/internal/ingest/adapters/srt/source_test.go new file mode 100644 index 0000000..a4527b1 --- /dev/null +++ b/internal/ingest/adapters/srt/source_test.go @@ -0,0 +1,109 @@ +package srt + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + "aoiprxkit" + "github.com/jan/fm-rds-tx/internal/ingest" +) + +type readCloser struct{ io.Reader } + +func (r readCloser) Close() error { return nil } + +func TestSourceEmitsChunksFromSRTFrames(t *testing.T) { + var stream bytes.Buffer + if err := aoiprxkit.WritePCM32Packet(&stream, 2, 48000, 2, 10, 100, []int32{1, 2, 3, 4}); err != nil { + t.Fatalf("write packet 1: %v", err) + } + if err := aoiprxkit.WritePCM32Packet(&stream, 2, 48000, 2, 12, 200, []int32{5, 6, 7, 8}); err != nil { + t.Fatalf("write packet 2: %v", err) + } + + src := New("srt-test", aoiprxkit.SRTConfig{ + URL: "srt://127.0.0.1:9000?mode=listener", + Mode: "listener", + SampleRateHz: 48000, + Channels: 2, + }, WithConnOpener(func(ctx context.Context, cfg aoiprxkit.SRTConfig) (io.ReadCloser, error) { + _ = ctx + _ = cfg + return readCloser{Reader: bytes.NewReader(stream.Bytes())}, nil + })) + + if err := src.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer src.Stop() + + chunk1 := readChunk(t, src.Chunks()) + if chunk1.SourceID != "srt-test" { + t.Fatalf("source id=%q want srt-test", chunk1.SourceID) + } + if chunk1.Channels != 2 || chunk1.SampleRateHz != 48000 { + t.Fatalf("shape=%d/%d", chunk1.Channels, chunk1.SampleRateHz) + } + if chunk1.Discontinuity { + t.Fatalf("first chunk should not be discontinuity") + } + assertSamples(t, chunk1.Samples, []int32{1, 2, 3, 4}) + + chunk2 := readChunk(t, src.Chunks()) + if !chunk2.Discontinuity { + t.Fatalf("second chunk should be marked discontinuity on seq gap") + } + assertSamples(t, chunk2.Samples, []int32{5, 6, 7, 8}) + + stats := src.Stats() + if stats.State != "running" { + t.Fatalf("state=%q want running", stats.State) + } + if !stats.Connected { + t.Fatalf("connected=false want true") + } + if stats.ChunksIn != 2 { + t.Fatalf("chunksIn=%d want 2", stats.ChunksIn) + } + if stats.SamplesIn != 8 { + t.Fatalf("samplesIn=%d want 8", stats.SamplesIn) + } + if stats.TransportLoss != 1 { + t.Fatalf("transportLoss=%d want 1", stats.TransportLoss) + } + if stats.Discontinuities < 1 { + t.Fatalf("discontinuities=%d want >=1", stats.Discontinuities) + } + if stats.LastChunkAt.IsZero() { + t.Fatalf("lastChunkAt should be set") + } +} + +func readChunk(t *testing.T, ch <-chan ingest.PCMChunk) ingest.PCMChunk { + t.Helper() + select { + case chunk, ok := <-ch: + if !ok { + t.Fatal("chunk channel closed") + } + return chunk + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for chunk") + return ingest.PCMChunk{} + } +} + +func assertSamples(t *testing.T, got, want []int32) { + t.Helper() + if len(got) != len(want) { + t.Fatalf("sample len=%d want %d", len(got), len(want)) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("sample[%d]=%d want %d", i, got[i], want[i]) + } + } +} diff --git a/internal/ingest/factory/factory.go b/internal/ingest/factory/factory.go index 5a46905..62146fa 100644 --- a/internal/ingest/factory/factory.go +++ b/internal/ingest/factory/factory.go @@ -7,16 +7,19 @@ import ( "os" "strings" + "aoiprxkit" "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/adapters/httpraw" "github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast" + "github.com/jan/fm-rds-tx/internal/ingest/adapters/srt" "github.com/jan/fm-rds-tx/internal/ingest/adapters/stdinpcm" ) type Deps struct { - Stdin io.Reader - HTTP *http.Client + Stdin io.Reader + HTTP *http.Client + SRTOpener aoiprxkit.SRTConnOpener } type AudioIngress interface { @@ -50,6 +53,19 @@ func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, err icecast.WithDecoderPreference(cfg.Ingest.Icecast.Decoder), ) return src, nil, nil + case "srt": + srtCfg := aoiprxkit.SRTConfig{ + URL: cfg.Ingest.SRT.URL, + Mode: cfg.Ingest.SRT.Mode, + SampleRateHz: cfg.Ingest.SRT.SampleRateHz, + Channels: cfg.Ingest.SRT.Channels, + } + opts := []srt.Option{} + if deps.SRTOpener != nil { + opts = append(opts, srt.WithConnOpener(deps.SRTOpener)) + } + src := srt.New("srt-main", srtCfg, opts...) + return src, nil, nil default: return nil, nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind) } @@ -67,6 +83,10 @@ func SampleRateForKind(cfg config.Config) int { } case "icecast": return 44100 + case "srt": + if cfg.Ingest.SRT.SampleRateHz > 0 { + return cfg.Ingest.SRT.SampleRateHz + } } return 44100 } diff --git a/internal/ingest/factory/factory_test.go b/internal/ingest/factory/factory_test.go index f75a5e1..5b147cd 100644 --- a/internal/ingest/factory/factory_test.go +++ b/internal/ingest/factory/factory_test.go @@ -87,6 +87,29 @@ func TestBuildSourceIcecastUsesDecoderPreference(t *testing.T) { } } +func TestBuildSourceSRT(t *testing.T) { + cfg := config.Default() + cfg.Ingest.Kind = "srt" + cfg.Ingest.SRT.URL = "srt://127.0.0.1:9000?mode=listener" + cfg.Ingest.SRT.Mode = "listener" + cfg.Ingest.SRT.SampleRateHz = 48000 + cfg.Ingest.SRT.Channels = 2 + + src, ingress, err := BuildSource(cfg, Deps{}) + if err != nil { + t.Fatalf("build source: %v", err) + } + if src == nil { + t.Fatalf("expected source") + } + if ingress != nil { + t.Fatalf("expected no ingress for srt") + } + if got := src.Descriptor().Kind; got != "srt" { + t.Fatalf("source kind=%s", got) + } +} + func TestBuildSourceUnsupportedKind(t *testing.T) { cfg := config.Default() cfg.Ingest.Kind = "nope" @@ -114,4 +137,10 @@ func TestSampleRateForKind(t *testing.T) { if got := SampleRateForKind(cfg); got != 44100 { t.Fatalf("icecast sample rate=%d", got) } + + cfg.Ingest.Kind = "srt" + cfg.Ingest.SRT.SampleRateHz = 48000 + if got := SampleRateForKind(cfg); got != 48000 { + t.Fatalf("srt sample rate=%d", got) + } } diff --git a/internal/ingest/factory/ingest_smoke_test.go b/internal/ingest/factory/ingest_smoke_test.go index 1cecd3d..e9aed73 100644 --- a/internal/ingest/factory/ingest_smoke_test.go +++ b/internal/ingest/factory/ingest_smoke_test.go @@ -1,15 +1,22 @@ package factory import ( + "bytes" "context" + "io" "testing" "time" + "aoiprxkit" "github.com/jan/fm-rds-tx/internal/audio" "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/ingest" ) +type streamReadCloser struct{ io.Reader } + +func (r streamReadCloser) Close() error { return nil } + func TestHTTPRawFactoryToRuntimeSmoke(t *testing.T) { cfg := config.Default() cfg.Ingest.Kind = "http-raw" @@ -63,6 +70,56 @@ func TestHTTPRawFactoryToRuntimeSmoke(t *testing.T) { } } +func TestSRTFactoryToRuntimeSmoke(t *testing.T) { + var stream bytes.Buffer + if err := aoiprxkit.WritePCM32Packet(&stream, 2, 48000, 2, 1, 480, []int32{11, -11, 22, -22}); err != nil { + t.Fatalf("write packet: %v", err) + } + + cfg := config.Default() + cfg.Ingest.Kind = "srt" + cfg.Ingest.SRT.URL = "srt://127.0.0.1:9000?mode=listener" + cfg.Ingest.SRT.SampleRateHz = 48000 + cfg.Ingest.SRT.Channels = 2 + + src, ingress, err := BuildSource(cfg, Deps{ + SRTOpener: func(ctx context.Context, srtCfg aoiprxkit.SRTConfig) (io.ReadCloser, error) { + _ = ctx + _ = srtCfg + return streamReadCloser{Reader: bytes.NewReader(stream.Bytes())}, nil + }, + }) + if err != nil { + t.Fatalf("build source: %v", err) + } + if src == nil { + t.Fatalf("expected source for kind=srt") + } + if ingress != nil { + t.Fatalf("expected no ingress for kind=srt") + } + + sink := audio.NewStreamSource(128, cfg.Ingest.SRT.SampleRateHz) + rt := ingest.NewRuntime(sink, src) + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("runtime start: %v", err) + } + defer rt.Stop() + + waitForSinkFrames(t, sink, 2) + + stats := rt.Stats() + if stats.Active.Kind != "srt" { + t.Fatalf("active kind=%q want srt", stats.Active.Kind) + } + if stats.Source.ChunksIn != 1 { + t.Fatalf("source chunksIn=%d want 1", stats.Source.ChunksIn) + } + if stats.Source.SamplesIn != 4 { + t.Fatalf("source samplesIn=%d want 4", stats.Source.SamplesIn) + } +} + func waitForSinkFrames(t *testing.T, sink *audio.StreamSource, minFrames int) { t.Helper() deadline := time.Now().Add(1 * time.Second)