From 180c0197fd2edc423d3ed87857605989afa2e193 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 17:38:02 +0200 Subject: [PATCH] ingest: relay icecast stream titles into rds radiotext --- cmd/fmrtx/main.go | 29 ++++- internal/config/config.go | 20 +++- internal/config/config_test.go | 8 ++ internal/ingest/adapters/icecast/icy.go | 109 ++++++++++++++++++ internal/ingest/adapters/icecast/icy_test.go | 77 +++++++++++++ internal/ingest/adapters/icecast/radiotext.go | 106 +++++++++++++++++ .../ingest/adapters/icecast/radiotext_test.go | 65 +++++++++++ internal/ingest/adapters/icecast/source.go | 38 +++++- .../ingest/adapters/icecast/source_test.go | 52 +++++++++ internal/ingest/runtime.go | 29 ++++- internal/ingest/runtime_test.go | 31 ++++- internal/ingest/source.go | 6 + internal/ingest/stats.go | 4 + 13 files changed, 566 insertions(+), 8 deletions(-) create mode 100644 internal/ingest/adapters/icecast/icy.go create mode 100644 internal/ingest/adapters/icecast/icy_test.go create mode 100644 internal/ingest/adapters/icecast/radiotext.go create mode 100644 internal/ingest/adapters/icecast/radiotext_test.go diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index 080a89c..6617414 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -17,6 +17,7 @@ import ( ctrlpkg "github.com/jan/fm-rds-tx/internal/control" drypkg "github.com/jan/fm-rds-tx/internal/dryrun" "github.com/jan/fm-rds-tx/internal/ingest" + "github.com/jan/fm-rds-tx/internal/ingest/adapters/icecast" ingestfactory "github.com/jan/fm-rds-tx/internal/ingest/factory" "github.com/jan/fm-rds-tx/internal/platform" "github.com/jan/fm-rds-tx/internal/platform/plutosdr" @@ -190,7 +191,33 @@ func runTXMode(cfg cfgpkg.Config, driver platform.SoapyDriver, autoStart bool, a if err != nil { log.Fatalf("ingest source: %v", err) } - ingestRuntime = ingest.NewRuntime(streamSrc, source) + runtimeOpts := []ingest.RuntimeOption{} + if cfg.Ingest.Icecast.RadioText.Enabled { + relay := icecast.NewRadioTextRelay( + icecast.RadioTextOptions{ + Enabled: true, + Prefix: cfg.Ingest.Icecast.RadioText.Prefix, + MaxLen: cfg.Ingest.Icecast.RadioText.MaxLen, + OnlyOnChange: cfg.Ingest.Icecast.RadioText.OnlyOnChange, + }, + cfg.RDS.RadioText, + func(rt string) error { + return engine.UpdateConfig(apppkg.LiveConfigUpdate{RadioText: &rt}) + }, + ) + runtimeOpts = append(runtimeOpts, ingest.WithStreamTitleHandler(func(streamTitle string) { + if err := relay.HandleStreamTitle(streamTitle); err != nil { + log.Printf("ingest: failed to forward StreamTitle to RDS RadioText: %v", err) + } + })) + log.Printf( + "ingest: ICY StreamTitle->RDS enabled (maxLen=%d onlyOnChange=%t prefix=%q)", + cfg.Ingest.Icecast.RadioText.MaxLen, + cfg.Ingest.Icecast.RadioText.OnlyOnChange, + cfg.Ingest.Icecast.RadioText.Prefix, + ) + } + ingestRuntime = ingest.NewRuntime(streamSrc, source, runtimeOpts...) if err := ingestRuntime.Start(ctx); err != nil { log.Fatalf("ingest start: %v", err) } diff --git a/internal/config/config.go b/internal/config/config.go index 2dff082..7a8b56b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -92,8 +92,16 @@ type IngestPCMConfig struct { } type IngestIcecastConfig struct { - URL string `json:"url"` - Decoder string `json:"decoder"` + URL string `json:"url"` + Decoder string `json:"decoder"` + RadioText IngestIcecastRadioTextConfig `json:"radioText"` +} + +type IngestIcecastRadioTextConfig struct { + Enabled bool `json:"enabled"` + Prefix string `json:"prefix"` + MaxLen int `json:"maxLen"` + OnlyOnChange bool `json:"onlyOnChange"` } func Default() Config { @@ -138,6 +146,11 @@ func Default() Config { }, Icecast: IngestIcecastConfig{ Decoder: "auto", + RadioText: IngestIcecastRadioTextConfig{ + Enabled: false, + MaxLen: 64, + OnlyOnChange: true, + }, }, }, } @@ -265,6 +278,9 @@ func (c Config) Validate() error { default: return fmt.Errorf("ingest.icecast.decoder unsupported: %s", c.Ingest.Icecast.Decoder) } + if c.Ingest.Icecast.RadioText.MaxLen < 0 || c.Ingest.Icecast.RadioText.MaxLen > 64 { + return fmt.Errorf("ingest.icecast.radioText.maxLen out of range (0-64)") + } // Fail-loud PI validation if c.RDS.Enabled { if _, err := ParsePI(c.RDS.PI); err != nil { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 031fbcb..affdbb7 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -168,6 +168,14 @@ func TestValidateAcceptsIcecastDecoderFallbackAlias(t *testing.T) { } } +func TestValidateRejectsIcecastRadioTextMaxLenOutOfRange(t *testing.T) { + cfg := Default() + cfg.Ingest.Icecast.RadioText.MaxLen = 65 + if err := cfg.Validate(); err == nil { + t.Fatal("expected maxLen error") + } +} + func TestValidateRejectsReconnectWithMissingBackoff(t *testing.T) { cfg := Default() cfg.Ingest.Reconnect.Enabled = true diff --git a/internal/ingest/adapters/icecast/icy.go b/internal/ingest/adapters/icecast/icy.go new file mode 100644 index 0000000..5a69d43 --- /dev/null +++ b/internal/ingest/adapters/icecast/icy.go @@ -0,0 +1,109 @@ +package icecast + +import ( + "bytes" + "fmt" + "io" + "strconv" + "strings" +) + +type icyMetadata struct { + StreamTitle string +} + +type icyReader struct { + r io.Reader + metaInt int + audioLeft int + onMetadata func(icyMetadata) +} + +func newICYReader(r io.Reader, metaInt int, onMetadata func(icyMetadata)) io.Reader { + if r == nil || metaInt <= 0 { + return r + } + return &icyReader{ + r: r, + metaInt: metaInt, + audioLeft: metaInt, + onMetadata: onMetadata, + } +} + +func (r *icyReader) Read(p []byte) (int, error) { + if len(p) == 0 { + return 0, nil + } + for { + if r.audioLeft == 0 { + if err := r.readMetadataBlock(); err != nil { + return 0, err + } + r.audioLeft = r.metaInt + continue + } + want := len(p) + if want > r.audioLeft { + want = r.audioLeft + } + n, err := r.r.Read(p[:want]) + if n > 0 { + r.audioLeft -= n + return n, nil + } + if err != nil { + return 0, err + } + } +} + +func (r *icyReader) readMetadataBlock() error { + var lenBuf [1]byte + if _, err := io.ReadFull(r.r, lenBuf[:]); err != nil { + return err + } + blockLen := int(lenBuf[0]) * 16 + if blockLen == 0 { + return nil + } + block := make([]byte, blockLen) + if _, err := io.ReadFull(r.r, block); err != nil { + return err + } + if r.onMetadata != nil { + r.onMetadata(parseICYMetadata(block)) + } + return nil +} + +func parseICYMetadata(block []byte) icyMetadata { + raw := strings.TrimRight(string(bytes.Trim(block, "\x00")), "\x00") + meta := icyMetadata{} + for _, field := range strings.Split(raw, ";") { + field = strings.TrimSpace(field) + if !strings.HasPrefix(field, "StreamTitle=") { + continue + } + v := strings.TrimPrefix(field, "StreamTitle=") + v = strings.TrimSpace(v) + if len(v) >= 2 && ((v[0] == '\'' && v[len(v)-1] == '\'') || (v[0] == '"' && v[len(v)-1] == '"')) { + v = v[1 : len(v)-1] + } + meta.StreamTitle = v + break + } + return meta +} + +func parseICYMetaInt(raw string) (int, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0, nil + } + n, err := strconv.Atoi(raw) + if err != nil || n < 0 { + return 0, fmt.Errorf("invalid icy-metaint: %q", raw) + } + return n, nil +} diff --git a/internal/ingest/adapters/icecast/icy_test.go b/internal/ingest/adapters/icecast/icy_test.go new file mode 100644 index 0000000..63a0798 --- /dev/null +++ b/internal/ingest/adapters/icecast/icy_test.go @@ -0,0 +1,77 @@ +package icecast + +import ( + "bytes" + "io" + "testing" +) + +func TestParseICYMetadataExtractsStreamTitle(t *testing.T) { + meta := parseICYMetadata([]byte("StreamTitle='Artist - Track';StreamUrl='';")) + if meta.StreamTitle != "Artist - Track" { + t.Fatalf("streamTitle=%q want %q", meta.StreamTitle, "Artist - Track") + } +} + +func TestICYReaderStripsMetadataAndEmitsTitle(t *testing.T) { + block := buildICYMetadataBlock("StreamTitle='Unit Test';") + wire := append([]byte("ABCD"), byte(len(block)/16)) + wire = append(wire, block...) + wire = append(wire, []byte("EFGH")...) + + var got icyMetadata + r := newICYReader(bytes.NewReader(wire), 4, func(meta icyMetadata) { + got = meta + }) + + audio, err := io.ReadAll(r) + if err != nil { + t.Fatalf("read: %v", err) + } + if string(audio) != "ABCDEFGH" { + t.Fatalf("audio=%q want %q", string(audio), "ABCDEFGH") + } + if got.StreamTitle != "Unit Test" { + t.Fatalf("streamTitle=%q want %q", got.StreamTitle, "Unit Test") + } +} + +func TestParseICYMetaInt(t *testing.T) { + tests := []struct { + name string + in string + want int + wantErr bool + }{ + {name: "empty", in: "", want: 0}, + {name: "valid", in: "16000", want: 16000}, + {name: "invalid", in: "x", wantErr: true}, + {name: "negative", in: "-1", wantErr: true}, + } + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + got, err := parseICYMetaInt(tc.in) + if tc.wantErr { + if err == nil { + t.Fatalf("expected error for %q", tc.in) + } + return + } + if err != nil { + t.Fatalf("parse: %v", err) + } + if got != tc.want { + t.Fatalf("got=%d want %d", got, tc.want) + } + }) + } +} + +func buildICYMetadataBlock(raw string) []byte { + b := []byte(raw) + if rem := len(b) % 16; rem != 0 { + b = append(b, bytes.Repeat([]byte{0x00}, 16-rem)...) + } + return b +} diff --git a/internal/ingest/adapters/icecast/radiotext.go b/internal/ingest/adapters/icecast/radiotext.go new file mode 100644 index 0000000..ea1aee8 --- /dev/null +++ b/internal/ingest/adapters/icecast/radiotext.go @@ -0,0 +1,106 @@ +package icecast + +import ( + "strings" + "sync" +) + +type RadioTextOptions struct { + Enabled bool + Prefix string + MaxLen int + OnlyOnChange bool +} + +func mapStreamTitleToRadioText(streamTitle string, opts RadioTextOptions) string { + if !opts.Enabled { + return "" + } + maxLen := opts.MaxLen + if maxLen <= 0 || maxLen > 64 { + maxLen = 64 + } + title := sanitizeASCII(streamTitle) + if title == "" { + return "" + } + prefixRaw := opts.Prefix + prefixHadTrailingSpace := strings.TrimRight(prefixRaw, " \t\r\n") != prefixRaw + prefix := sanitizeASCII(opts.Prefix) + if prefix != "" && prefixHadTrailingSpace { + prefix += " " + } + rt := title + if prefix != "" { + rt = prefix + title + } + if len(rt) > maxLen { + rt = strings.TrimSpace(rt[:maxLen]) + } + return rt +} + +func sanitizeASCII(raw string) string { + raw = strings.TrimSpace(raw) + if raw == "" { + return "" + } + var b strings.Builder + b.Grow(len(raw)) + prevSpace := true + for _, r := range raw { + switch r { + case '\n', '\r', '\t': + r = ' ' + } + if r < 0x20 || r == 0x7f || r > 0x7e { + continue + } + if r == ' ' { + if prevSpace { + continue + } + prevSpace = true + b.WriteByte(' ') + continue + } + prevSpace = false + b.WriteByte(byte(r)) + } + return strings.TrimSpace(b.String()) +} + +type RadioTextRelay struct { + opts RadioTextOptions + apply func(string) error + mu sync.Mutex + lastRT string +} + +func NewRadioTextRelay(opts RadioTextOptions, initialRT string, apply func(string) error) *RadioTextRelay { + return &RadioTextRelay{ + opts: opts, + apply: apply, + lastRT: sanitizeASCII(initialRT), + } +} + +func (r *RadioTextRelay) HandleStreamTitle(streamTitle string) error { + if r == nil || r.apply == nil { + return nil + } + next := mapStreamTitleToRadioText(streamTitle, r.opts) + if next == "" { + return nil + } + r.mu.Lock() + skip := r.opts.OnlyOnChange && next == r.lastRT + if !skip { + r.lastRT = next + } + r.mu.Unlock() + if skip { + return nil + } + return r.apply(next) +} diff --git a/internal/ingest/adapters/icecast/radiotext_test.go b/internal/ingest/adapters/icecast/radiotext_test.go new file mode 100644 index 0000000..b62df6f --- /dev/null +++ b/internal/ingest/adapters/icecast/radiotext_test.go @@ -0,0 +1,65 @@ +package icecast + +import "testing" + +func TestMapStreamTitleToRadioTextSanitizeAndTruncate(t *testing.T) { + got := mapStreamTitleToRadioText(" Artist\t-\nSong \u2603 ", RadioTextOptions{ + Enabled: true, + Prefix: "Now: ", + MaxLen: 13, + }) + if got != "Now: Artist -" { + t.Fatalf("mapped=%q want %q", got, "Now: Artist -") + } +} + +func TestMapStreamTitleToRadioTextDisabledReturnsEmpty(t *testing.T) { + got := mapStreamTitleToRadioText("Artist - Song", RadioTextOptions{Enabled: false}) + if got != "" { + t.Fatalf("mapped=%q want empty", got) + } +} + +func TestRadioTextRelayOnlyOnChange(t *testing.T) { + calls := 0 + last := "" + relay := NewRadioTextRelay(RadioTextOptions{ + Enabled: true, + OnlyOnChange: true, + }, "", func(rt string) error { + calls++ + last = rt + return nil + }) + + if err := relay.HandleStreamTitle("Artist - Song"); err != nil { + t.Fatalf("first handle: %v", err) + } + if err := relay.HandleStreamTitle("Artist - Song"); err != nil { + t.Fatalf("second handle: %v", err) + } + if calls != 1 { + t.Fatalf("calls=%d want 1", calls) + } + if last != "Artist - Song" { + t.Fatalf("last=%q want %q", last, "Artist - Song") + } +} + +func TestRadioTextRelayInitialSuppressesSameUpdate(t *testing.T) { + calls := 0 + relay := NewRadioTextRelay(RadioTextOptions{ + Enabled: true, + OnlyOnChange: true, + }, "Station default", func(string) error { + calls++ + return nil + }) + + if err := relay.HandleStreamTitle("Station default"); err != nil { + t.Fatalf("handle: %v", err) + } + if calls != 0 { + t.Fatalf("calls=%d want 0", calls) + } +} diff --git a/internal/ingest/adapters/icecast/source.go b/internal/ingest/adapters/icecast/source.go index 1da52c9..028722f 100644 --- a/internal/ingest/adapters/icecast/source.go +++ b/internal/ingest/adapters/icecast/source.go @@ -32,6 +32,7 @@ type Source struct { chunks chan ingest.PCMChunk errs chan error + title chan string cancel context.CancelFunc wg sync.WaitGroup @@ -43,7 +44,11 @@ type Source struct { reconnects atomic.Uint64 discontinuities atomic.Uint64 lastChunkAtUnix atomic.Int64 + lastMetaAtUnix atomic.Int64 + metadataUpdates atomic.Uint64 + icyMetaInt atomic.Int64 lastError atomic.Value // string + streamTitle atomic.Value // string } var errStreamEnded = errors.New("icecast stream ended") @@ -78,6 +83,7 @@ func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Op reconn: reconn, chunks: make(chan ingest.PCMChunk, 64), errs: make(chan error, 8), + title: make(chan string, 16), decReg: defaultRegistry(), decoderPreference: "auto", } @@ -88,6 +94,7 @@ func New(id, url string, client *http.Client, reconn ReconnectConfig, opts ...Op } s.decoderPreference = normalizeDecoderPreference(s.decoderPreference) s.state.Store("idle") + s.streamTitle.Store("") return s } @@ -135,19 +142,32 @@ func (s *Source) Stop() error { func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks } func (s *Source) Errors() <-chan error { return s.errs } +func (s *Source) StreamTitleUpdates() <-chan string { + return s.title +} func (s *Source) Stats() ingest.SourceStats { state, _ := s.state.Load().(string) last := s.lastChunkAtUnix.Load() + lastMeta := s.lastMetaAtUnix.Load() errStr, _ := s.lastError.Load().(string) + streamTitle, _ := s.streamTitle.Load().(string) var lastChunkAt time.Time + var lastMetaAt time.Time if last > 0 { lastChunkAt = time.Unix(0, last) } + if lastMeta > 0 { + lastMetaAt = time.Unix(0, lastMeta) + } return ingest.SourceStats{ State: state, Connected: s.connected.Load(), LastChunkAt: lastChunkAt, + LastMetaAt: lastMetaAt, + StreamTitle: streamTitle, + MetadataUpdates: s.metadataUpdates.Load(), + IcyMetaInt: int(s.icyMetaInt.Load()), ChunksIn: s.chunksIn.Load(), SamplesIn: s.samplesIn.Load(), Reconnects: s.reconnects.Load(), @@ -160,6 +180,7 @@ func (s *Source) loop(ctx context.Context) { defer s.wg.Done() defer close(s.chunks) defer close(s.errs) + defer close(s.title) attempt := 0 for { select { @@ -206,7 +227,7 @@ func (s *Source) connectAndRun(ctx context.Context) error { if err != nil { return err } - req.Header.Set("Icy-MetaData", "0") + req.Header.Set("Icy-MetaData", "1") resp, err := s.client.Do(req) if err != nil { return fmt.Errorf("icecast connect: %w", err) @@ -218,8 +239,11 @@ func (s *Source) connectAndRun(ctx context.Context) error { s.connected.Store(true) s.state.Store("buffering") s.lastError.Store("") + icyMetaInt, _ := parseICYMetaInt(resp.Header.Get("icy-metaint")) + s.icyMetaInt.Store(int64(icyMetaInt)) + stream := newICYReader(resp.Body, icyMetaInt, s.onMetadata) s.state.Store("running") - return s.decodeWithPreference(ctx, resp.Body, decoder.StreamMeta{ + return s.decodeWithPreference(ctx, stream, decoder.StreamMeta{ ContentType: resp.Header.Get("Content-Type"), SourceID: s.id, SampleRateHz: 44100, @@ -227,6 +251,16 @@ func (s *Source) connectAndRun(ctx context.Context) error { }) } +func (s *Source) onMetadata(meta icyMetadata) { + s.streamTitle.Store(meta.StreamTitle) + s.metadataUpdates.Add(1) + s.lastMetaAtUnix.Store(time.Now().UnixNano()) + select { + case s.title <- meta.StreamTitle: + default: + } +} + func (s *Source) emitChunk(chunk ingest.PCMChunk) error { select { case s.chunks <- chunk: diff --git a/internal/ingest/adapters/icecast/source_test.go b/internal/ingest/adapters/icecast/source_test.go index 568110c..162ea89 100644 --- a/internal/ingest/adapters/icecast/source_test.go +++ b/internal/ingest/adapters/icecast/source_test.go @@ -311,6 +311,58 @@ func TestWithDecoderPreferenceFallbackAliasNormalizesToFFmpeg(t *testing.T) { } } +func TestConnectAndRunRequestsICYAndPublishesStreamTitle(t *testing.T) { + const ( + audioPrefix = "ABCD" + audioSuffix = "EFGH" + title = "Artist - Track" + ) + var reqIcyHeader atomic.Value + reqIcyHeader.Store("") + + metadata := buildICYMetadataBlock("StreamTitle='" + title + "';") + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + reqIcyHeader.Store(r.Header.Get("Icy-Metadata")) + w.Header().Set("Content-Type", "audio/mpeg") + w.Header().Set("icy-metaint", "4") + _, _ = w.Write([]byte(audioPrefix)) + _, _ = w.Write([]byte{byte(len(metadata) / 16)}) + _, _ = w.Write(metadata) + _, _ = w.Write([]byte(audioSuffix)) + })) + defer srv.Close() + + native := &captureStreamDecoder{name: "mp3"} + reg := decoder.NewRegistry() + reg.Register("mp3", func() decoder.Decoder { return native }) + reg.Register("ffmpeg", func() decoder.Decoder { return &testDecoder{name: "ffmpeg"} }) + + src := New("ice-test", srv.URL, srv.Client(), ReconnectConfig{}, + WithDecoderRegistry(reg), + WithDecoderPreference("auto"), + ) + + if err := src.connectAndRun(context.Background()); err != nil { + t.Fatalf("connectAndRun: %v", err) + } + if got := reqIcyHeader.Load().(string); got != "1" { + t.Fatalf("Icy-Metadata header=%q want 1", got) + } + if got := string(native.payload); got != audioPrefix+audioSuffix { + t.Fatalf("decoded payload=%q want %q", got, audioPrefix+audioSuffix) + } + stats := src.Stats() + if stats.StreamTitle != title { + t.Fatalf("streamTitle=%q want %q", stats.StreamTitle, title) + } + if stats.MetadataUpdates < 1 { + t.Fatalf("metadataUpdates=%d want >=1", stats.MetadataUpdates) + } + if stats.IcyMetaInt != 4 { + t.Fatalf("icyMetaInt=%d want 4", stats.IcyMetaInt) + } +} + type scriptedLoopDecoder struct { mu sync.Mutex actions []decodeAction diff --git a/internal/ingest/runtime.go b/internal/ingest/runtime.go index fa3ad64..6b9e1ef 100644 --- a/internal/ingest/runtime.go +++ b/internal/ingest/runtime.go @@ -13,6 +13,7 @@ type Runtime struct { sink *audio.StreamSource source Source started atomic.Bool + onTitle func(string) ctx context.Context cancel context.CancelFunc @@ -23,14 +24,28 @@ type Runtime struct { stats RuntimeStats } -func NewRuntime(sink *audio.StreamSource, src Source) *Runtime { - return &Runtime{ +type RuntimeOption func(*Runtime) + +func WithStreamTitleHandler(handler func(string)) RuntimeOption { + return func(r *Runtime) { + r.onTitle = handler + } +} + +func NewRuntime(sink *audio.StreamSource, src Source, opts ...RuntimeOption) *Runtime { + r := &Runtime{ sink: sink, source: src, stats: RuntimeStats{ State: "idle", }, } + for _, opt := range opts { + if opt != nil { + opt(r) + } + } + return r } func (r *Runtime) Start(ctx context.Context) error { @@ -93,6 +108,10 @@ func (r *Runtime) run() { ch := r.source.Chunks() errCh := r.source.Errors() + var titleCh <-chan string + if src, ok := r.source.(StreamTitleSource); ok && r.onTitle != nil { + titleCh = src.StreamTitleUpdates() + } for { select { case <-r.ctx.Done(): @@ -116,6 +135,12 @@ func (r *Runtime) run() { return } r.handleChunk(chunk) + case title, ok := <-titleCh: + if !ok { + titleCh = nil + continue + } + r.onTitle(title) } } } diff --git a/internal/ingest/runtime_test.go b/internal/ingest/runtime_test.go index ee82678..48cfcb3 100644 --- a/internal/ingest/runtime_test.go +++ b/internal/ingest/runtime_test.go @@ -14,6 +14,7 @@ type fakeSource struct { desc SourceDescriptor chunks chan PCMChunk errs chan error + title chan string stats SourceStats once sync.Once } @@ -23,6 +24,7 @@ func newFakeSource() *fakeSource { desc: SourceDescriptor{ID: "fake", Kind: "stdin-pcm"}, chunks: make(chan PCMChunk, 4), errs: make(chan error, 1), + title: make(chan string, 4), stats: SourceStats{State: "running", Connected: true}, } } @@ -32,7 +34,10 @@ func (s *fakeSource) Start(context.Context) error { return nil } func (s *fakeSource) Stop() error { s.once.Do(func() { close(s.chunks) }); return nil } func (s *fakeSource) Chunks() <-chan PCMChunk { return s.chunks } func (s *fakeSource) Errors() <-chan error { return s.errs } -func (s *fakeSource) Stats() SourceStats { return s.stats } +func (s *fakeSource) StreamTitleUpdates() <-chan string { + return s.title +} +func (s *fakeSource) Stats() SourceStats { return s.stats } func TestRuntimeWritesFramesToStreamSink(t *testing.T) { sink := audio.NewStreamSource(128, 44100) @@ -159,6 +164,30 @@ func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) } } +func TestRuntimeForwardsStreamTitleUpdatesToHandler(t *testing.T) { + sink := audio.NewStreamSource(128, 44100) + src := newFakeSource() + got := make(chan string, 1) + rt := NewRuntime(sink, src, WithStreamTitleHandler(func(title string) { + got <- title + })) + + if err := rt.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + defer rt.Stop() + + src.title <- "Artist - Song" + select { + case title := <-got: + if title != "Artist - Song" { + t.Fatalf("title=%q want %q", title, "Artist - Song") + } + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for forwarded title") + } +} + func waitForRuntimeState(t *testing.T, rt *Runtime, want string) { t.Helper() deadline := time.Now().Add(1 * time.Second) diff --git a/internal/ingest/source.go b/internal/ingest/source.go index d851ed3..d4bca57 100644 --- a/internal/ingest/source.go +++ b/internal/ingest/source.go @@ -10,3 +10,9 @@ type Source interface { Errors() <-chan error Stats() SourceStats } + +// StreamTitleSource is an optional extension for sources that expose +// title/metadata updates (for example ICY StreamTitle). +type StreamTitleSource interface { + StreamTitleUpdates() <-chan string +} diff --git a/internal/ingest/stats.go b/internal/ingest/stats.go index fb135c0..55f44a4 100644 --- a/internal/ingest/stats.go +++ b/internal/ingest/stats.go @@ -6,6 +6,10 @@ type SourceStats struct { State string `json:"state"` Connected bool `json:"connected"` LastChunkAt time.Time `json:"lastChunkAt,omitempty"` + LastMetaAt time.Time `json:"lastMetaAt,omitempty"` + StreamTitle string `json:"streamTitle,omitempty"` + MetadataUpdates uint64 `json:"metadataUpdates,omitempty"` + IcyMetaInt int `json:"icyMetaInt,omitempty"` ChunksIn uint64 `json:"chunksIn"` SamplesIn uint64 `json:"samplesIn"` BufferedSeconds float64 `json:"bufferedSeconds"`