From 97c6e9b6a0c49cfa128a2cdeac2f3f09145e81ff Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 22:06:32 +0200 Subject: [PATCH 1/4] ingest: fix source defaults and discovery context handling --- internal/control/server.go | 23 +++++++++++++---------- internal/ingest/factory/factory.go | 22 ++++++++++++++-------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/internal/control/server.go b/internal/control/server.go index 9fcd5cd..fe538b1 100644 --- a/internal/control/server.go +++ b/internal/control/server.go @@ -8,20 +8,23 @@ import ( ) const ( - defaultReadTimeout = 5 * time.Second - defaultWriteTimeout = 10 * time.Second - defaultIdleTimeout = 60 * time.Second - defaultMaxHeaderBytes = 1 << 20 // 1 MiB + defaultReadHeaderTimeout = 5 * time.Second + defaultIdleTimeout = 60 * time.Second + defaultMaxHeaderBytes = 1 << 20 // 1 MiB ) // NewHTTPServer returns a configured HTTP server for the control plane. +// +// WriteTimeout is intentionally not set: /audio/stream accepts long-lived +// POST bodies (continuous PCM push) that would be cut off by a global write +// deadline. Individual endpoints are protected by MaxBytesReader limits. +// ReadHeaderTimeout guards against slow-header attacks. func NewHTTPServer(cfg config.Config, handler http.Handler) *http.Server { return &http.Server{ - Addr: cfg.Control.ListenAddress, - Handler: handler, - ReadTimeout: defaultReadTimeout, - WriteTimeout: defaultWriteTimeout, - IdleTimeout: defaultIdleTimeout, - MaxHeaderBytes: defaultMaxHeaderBytes, + Addr: cfg.Control.ListenAddress, + Handler: handler, + ReadHeaderTimeout: defaultReadHeaderTimeout, + IdleTimeout: defaultIdleTimeout, + MaxHeaderBytes: defaultMaxHeaderBytes, } } diff --git a/internal/ingest/factory/factory.go b/internal/ingest/factory/factory.go index 5f8696c..223e9db 100644 --- a/internal/ingest/factory/factory.go +++ b/internal/ingest/factory/factory.go @@ -42,7 +42,7 @@ type AES67DiscoverRequest struct { type AES67DiscoverFunc func(ctx context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error) -func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, error) { +func BuildSource(ctx context.Context, cfg config.Config, deps Deps) (ingest.Source, AudioIngress, error) { switch normalizeIngestKind(cfg.Ingest.Kind) { case "", "none": return nil, nil, nil @@ -83,7 +83,7 @@ func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, err src := srt.New("srt-main", srtCfg, opts...) return src, nil, nil case "aes67", "aoip", "aoip-rtp": - aoipCfg, detail, origin, err := buildAES67Config(cfg, deps) + aoipCfg, detail, origin, err := buildAES67Config(ctx, cfg, deps) if err != nil { return nil, nil, err } @@ -115,7 +115,10 @@ func SampleRateForKind(cfg config.Config) int { return cfg.Ingest.HTTPRaw.SampleRateHz } case "icecast": - return 44100 + // 48000 Hz is the most common rate for modern Icecast streams. + // The ingest runtime will auto-correct to the actual decoded rate + // after the first PCM chunk arrives (see runtime.go handleChunk). + return 48000 case "srt": if cfg.Ingest.SRT.SampleRateHz > 0 { return cfg.Ingest.SRT.SampleRateHz @@ -125,14 +128,17 @@ func SampleRateForKind(cfg config.Config) int { return cfg.Ingest.AES67.SampleRateHz } } - return 44100 + // Default to 48000 Hz: the correct rate for professional sources + // (SRT, AES67) and modern streams. The ingest runtime corrects this + // dynamically from the first decoded chunk for compressed sources. + return 48000 } func normalizeIngestKind(kind string) string { return strings.ToLower(strings.TrimSpace(kind)) } -func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, *ingest.SourceOrigin, error) { +func buildAES67Config(ctx context.Context, cfg config.Config, deps Deps) (aoiprxkit.Config, string, *ingest.SourceOrigin, error) { base := aoiprxkit.DefaultConfig() ing := cfg.Ingest.AES67 if strings.TrimSpace(ing.InterfaceName) != "" { @@ -160,7 +166,7 @@ func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, * base.ReadBufferBytes = ing.ReadBufferBytes } - sdpText, discoveredStreamName, origin, err := resolveAES67SDP(ing, deps) + sdpText, discoveredStreamName, origin, err := resolveAES67SDP(ctx, ing, deps) if err != nil { return aoiprxkit.Config{}, "", nil, err } @@ -205,7 +211,7 @@ func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, * return base, "", origin, nil } -func resolveAES67SDP(ing config.IngestAES67Config, deps Deps) (string, string, *ingest.SourceOrigin, error) { +func resolveAES67SDP(ctx context.Context, ing config.IngestAES67Config, deps Deps) (string, string, *ingest.SourceOrigin, error) { sdpText := strings.TrimSpace(ing.SDP) if sdpText == "" && strings.TrimSpace(ing.SDPPath) != "" { sdpPath := filepath.Clean(ing.SDPPath) @@ -246,7 +252,7 @@ func resolveAES67SDP(ing config.IngestAES67Config, deps Deps) (string, string, * if discover == nil { discover = discoverAES67ViaSAP } - announcement, err := discover(context.Background(), req) + announcement, err := discover(ctx, req) if err != nil { return "", "", nil, fmt.Errorf("discover ingest.aes67 stream %q via SAP: %w", req.StreamName, err) } From 14bdbbabeae851beb270f0edba84749d44e80099 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 22:06:49 +0200 Subject: [PATCH 2/4] ingest: harden adapter metadata and shutdown handling --- internal/ingest/adapters/icecast/icy.go | 42 +++++++++++++++++++-- internal/ingest/adapters/stdinpcm/source.go | 1 + 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/internal/ingest/adapters/icecast/icy.go b/internal/ingest/adapters/icecast/icy.go index 5a69d43..138db16 100644 --- a/internal/ingest/adapters/icecast/icy.go +++ b/internal/ingest/adapters/icecast/icy.go @@ -77,18 +77,31 @@ func (r *icyReader) readMetadataBlock() error { return nil } +// parseICYMetadata parses the ICY inline metadata block. +// +// ICY metadata is a semicolon-delimited key=value format where values are +// single-quoted strings. A naive strings.Split(raw, ";") breaks when the +// StreamTitle itself contains semicolons (e.g. "Artist - Title; Live Edit"). +// This parser is quote-aware: it only splits on semicolons that appear +// outside of single-quoted value strings. func parseICYMetadata(block []byte) icyMetadata { raw := strings.TrimRight(string(bytes.Trim(block, "\x00")), "\x00") meta := icyMetadata{} - for _, field := range strings.Split(raw, ";") { + + fields := splitICYFields(raw) + for _, field := range fields { field = strings.TrimSpace(field) if !strings.HasPrefix(field, "StreamTitle=") { continue } v := strings.TrimPrefix(field, "StreamTitle=") v = strings.TrimSpace(v) - if len(v) >= 2 && ((v[0] == '\'' && v[len(v)-1] == '\'') || (v[0] == '"' && v[len(v)-1] == '"')) { - v = v[1 : len(v)-1] + // Strip enclosing single or double quotes. + if len(v) >= 2 { + if (v[0] == '\'' && v[len(v)-1] == '\'') || + (v[0] == '"' && v[len(v)-1] == '"') { + v = v[1 : len(v)-1] + } } meta.StreamTitle = v break @@ -96,6 +109,29 @@ func parseICYMetadata(block []byte) icyMetadata { return meta } +// splitICYFields splits an ICY metadata string on semicolons that appear +// outside of single-quoted value strings. Semicolons inside quotes (e.g. +// StreamTitle='Artist - Song; Live';) are preserved as part of the value. +func splitICYFields(s string) []string { + var fields []string + inQuote := false + start := 0 + for i := 0; i < len(s); i++ { + c := s[i] + if c == '\'' { + inQuote = !inQuote + } + if c == ';' && !inQuote { + fields = append(fields, s[start:i]) + start = i + 1 + } + } + if start < len(s) { + fields = append(fields, s[start:]) + } + return fields +} + func parseICYMetaInt(raw string) (int, error) { raw = strings.TrimSpace(raw) if raw == "" { diff --git a/internal/ingest/adapters/stdinpcm/source.go b/internal/ingest/adapters/stdinpcm/source.go index 5785928..104b66b 100644 --- a/internal/ingest/adapters/stdinpcm/source.go +++ b/internal/ingest/adapters/stdinpcm/source.go @@ -119,6 +119,7 @@ func (s *Source) Stats() ingest.SourceStats { func (s *Source) readLoop(ctx context.Context) { defer s.wg.Done() + defer close(s.errs) defer close(s.chunks) frameBytes := s.channels * 2 From 1f49bdd14442252e03bd95def03a1d3fa6c99823 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 22:07:04 +0200 Subject: [PATCH 3/4] runtime: tighten queue, generator, and late-write semantics --- internal/app/engine.go | 2 +- internal/offline/generator.go | 23 +++++++++++++++++++---- internal/output/frame_queue.go | 25 ++++++++++++++----------- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/internal/app/engine.go b/internal/app/engine.go index c25537b..b4a7707 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -194,7 +194,7 @@ func (e *Engine) SetStreamSource(src *audio.StreamSource) { } resampler := audio.NewStreamResampler(src, compositeRate) e.generator.SetExternalSource(resampler) - log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)", + log.Printf("engine: live audio stream wired — initial %d Hz → %.0f Hz composite (buffer %d frames); actual decoded rate auto-corrects on first chunk", src.SampleRate, compositeRate, src.Stats().Capacity) } diff --git a/internal/offline/generator.go b/internal/offline/generator.go index dd6afde..6598087 100644 --- a/internal/offline/generator.go +++ b/internal/offline/generator.go @@ -120,8 +120,15 @@ func NewGenerator(cfg cfgpkg.Config) *Generator { // SetExternalSource sets a live audio source (e.g. StreamResampler) that // takes priority over WAV/tone sources. Must be called before the first -// GenerateFrame() call (i.e. before init). +// GenerateFrame() call; calling it after init() has no effect because +// g.source is already wired to the old source. func (g *Generator) SetExternalSource(src frameSource) { + if g.initialized { + // init() already called sourceFor() and wired g.source. Updating + // g.externalSource here would have no effect on the live DSP chain. + // This is a programming error — log loudly rather than silently break. + panic("generator: SetExternalSource called after GenerateFrame; call it before the engine starts") + } g.externalSource = src } @@ -189,12 +196,14 @@ func (g *Generator) init() { g.mpxNotch19, g.mpxNotch57 = dsp.NewCompositeProtection(g.sampleRate) // BS.412 MPX power limiter (EU/CH requirement for licensed FM) if g.cfg.FM.BS412Enabled { - chunkSec := 0.05 // 50ms chunks (matches engine default) + // chunkSec is not known at init time (Engine.chunkDuration may differ). + // Pass 0 here; GenerateFrame computes the actual chunk duration from + // the real sample count and updates BS.412 accordingly. g.bs412 = dsp.NewBS412Limiter( g.cfg.FM.BS412ThresholdDBr, g.cfg.FM.PilotLevel, g.cfg.FM.RDSInjection, - chunkSec, + 0, ) } if g.cfg.FM.FMModulationEnabled { @@ -360,8 +369,14 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame } } - // BS.412: feed this chunk's average audio power for next chunk's gain calculation + // BS.412: feed this chunk's actual duration and average audio power for + // the next chunk's gain calculation. Using the real sample count avoids + // the error that occurred when chunkSec was hardcoded to 0.05 — any + // SetChunkDuration() call from the engine would silently miscalibrate + // the ITU-R BS.412 power measurement window. if g.bs412 != nil && samples > 0 { + chunkSec := float64(samples) / g.sampleRate + g.bs412.UpdateChunkDuration(chunkSec) g.bs412.ProcessChunk(bs412PowerAccum / float64(samples)) } diff --git a/internal/output/frame_queue.go b/internal/output/frame_queue.go index e3db114..0443eec 100644 --- a/internal/output/frame_queue.go +++ b/internal/output/frame_queue.go @@ -80,22 +80,19 @@ func (q *FrameQueue) Capacity() int { } // FillLevel reports the current occupancy as a fraction of capacity. +// Uses len(ch) directly for accuracy: updateDepth() is called after the +// channel operation, so q.depth can lag by one frame transiently. func (q *FrameQueue) FillLevel() float64 { - q.mu.Lock() - depth := q.depth - q.mu.Unlock() if q.capacity == 0 { return 0 } - return float64(depth) / float64(q.capacity) + return float64(len(q.ch)) / float64(q.capacity) } // Depth returns the current number of frames in the queue. +// Uses len(ch) directly for accuracy (see FillLevel). func (q *FrameQueue) Depth() int { - q.mu.Lock() - depth := q.depth - q.mu.Unlock() - return depth + return len(q.ch) } // Stats returns a snapshot of the queue metrics. @@ -104,7 +101,7 @@ func (q *FrameQueue) Stats() QueueStats { fill := q.fillLevelLocked() stats := QueueStats{ Capacity: q.capacity, - Depth: q.depth, + Depth: len(q.ch), FillLevel: fill, Health: queueHealthFromFill(fill), HighWaterMark: q.highWaterMark, @@ -128,11 +125,15 @@ func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error { return ErrFrameQueueClosed } + // BUG-05 fix: increment depth BEFORE the channel send so that Stats() + // never reports fill=0 while a frame is in the channel awaiting receive. + // On context cancellation, undo the increment. + q.updateDepth(+1) select { case q.ch <- frame: - q.updateDepth(+1) return nil case <-ctx.Done(): + q.updateDepth(-1) q.recordPushTimeout() return ctx.Err() } @@ -211,7 +212,9 @@ func (q *FrameQueue) fillLevelLocked() float64 { if q.capacity == 0 { return 0 } - return float64(q.depth) / float64(q.capacity) + // Use len(ch) rather than q.depth: depth is updated after the channel + // operation, so it can be off by one during the Push/Pop window. + return float64(len(q.ch)) / float64(q.capacity) } func (q *FrameQueue) recordPushTimeout() { From b0964e71dc84d36a6e43e38ce15174a2600ebdb3 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 7 Apr 2026 22:07:19 +0200 Subject: [PATCH 4/4] rds: support explicit text clearing and symbol bootstrap --- internal/rds/encoder.go | 69 +++++++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 16 deletions(-) diff --git a/internal/rds/encoder.go b/internal/rds/encoder.go index fbe7b93..266e572 100644 --- a/internal/rds/encoder.go +++ b/internal/rds/encoder.go @@ -94,8 +94,17 @@ type Encoder struct { // Live-updatable text — written by control API, read at group boundaries. // Zero-contention: atomic swap, checked once per RDS group (~88ms at 228kHz). - livePS atomic.Value // string - liveRT atomic.Value // string + // pendingText.set distinguishes "no pending update" from "update to empty string" + // so that PS/RT can be explicitly cleared via UpdateText. + livePS atomic.Value // pendingText + liveRT atomic.Value // pendingText +} + +// pendingText carries a pending text update for PS or RT. +// set=false means no update is pending; set=true means apply val (even if empty). +type pendingText struct { + val string + set bool } func NewEncoder(cfg RDSConfig) (*Encoder, error) { @@ -163,16 +172,29 @@ func (e *Encoder) Reset() { // UpdateText hot-swaps PS and/or RT. Thread-safe — called from HTTP handlers, // applied at the next RDS group boundary by the DSP goroutine. -// Pass empty string to leave a field unchanged. +// +// Pass empty string to leave a field unchanged. To explicitly clear a field +// (set PS to 8 spaces, or RT to empty), use ClearPS/ClearRT instead. func (e *Encoder) UpdateText(ps, rt string) { if ps != "" { - e.livePS.Store(normalizePS(ps)) + e.livePS.Store(pendingText{val: normalizePS(ps), set: true}) } if rt != "" { - e.liveRT.Store(normalizeRT(rt)) + e.liveRT.Store(pendingText{val: normalizeRT(rt), set: true}) } } +// ClearPS resets the Program Service name to 8 spaces at the next group boundary. +func (e *Encoder) ClearPS() { + e.livePS.Store(pendingText{val: normalizePS(""), set: true}) +} + +// ClearRT resets RadioText to an empty string at the next group boundary. +// Per RDS spec, an empty RT causes receivers to clear their display. +func (e *Encoder) ClearRT() { + e.liveRT.Store(pendingText{val: "", set: true}) +} + // NextSample returns the next RDS subcarrier sample at the configured rate. // Uses the internal free-running 57 kHz carrier. Prefer NextSampleWithCarrier // for phase-locked operation in a stereo MPX chain. @@ -192,15 +214,15 @@ func (e *Encoder) NextSampleWithCarrier(carrier float64) float64 { // Apply live text updates at group boundaries (~88ms at 228kHz). // Atomics are consumed (cleared) after reading to prevent // re-applying the same text every group and toggling A/B flag. - if ps, ok := e.livePS.Load().(string); ok && ps != "" { - e.scheduler.cfg.PS = ps - e.livePS.Store("") // consumed + if pt, ok := e.livePS.Load().(pendingText); ok && pt.set { + e.scheduler.cfg.PS = pt.val + e.livePS.Store(pendingText{}) // consumed } - if rt, ok := e.liveRT.Load().(string); ok && rt != "" { - e.scheduler.cfg.RT = rt + if pt, ok := e.liveRT.Load().(pendingText); ok && pt.set { + e.scheduler.cfg.RT = pt.val e.scheduler.rtIdx = 0 // restart RT transmission for new text e.scheduler.rtABFlag = !e.scheduler.rtABFlag // toggle A/B per RDS spec - e.liveRT.Store("") // consumed + e.liveRT.Store(pendingText{}) // consumed } e.getRDSGroup() e.bitPos = 0 @@ -240,12 +262,27 @@ func (e *Encoder) Generate(n int) []float64 { out := make([]float64, n); for i := range out { out[i] = e.NextSample() }; return out } func (e *Encoder) Symbol() float64 { - if e.bitPos >= bitsPerGroup { return -1 } - sym := 1.0; if e.bitBuffer[e.bitPos] == 0 { sym = -1.0 } + // Populate the bit buffer on first call (bitPos starts at bitsPerGroup + // after NewEncoder/Reset, so the guard below would return -1 immediately + // without this bootstrap step). + if e.bitPos >= bitsPerGroup { + e.getRDSGroup() + e.bitPos = 0 + } + sym := 1.0 + if e.bitBuffer[e.bitPos] == 0 { + sym = -1.0 + } e.sampleCount++ - if e.sampleCount >= e.spb { e.sampleCount = 0; e.bitPos++ - if e.bitPos >= bitsPerGroup { e.getRDSGroup(); e.bitPos = 0 } - }; return sym + if e.sampleCount >= e.spb { + e.sampleCount = 0 + e.bitPos++ + if e.bitPos >= bitsPerGroup { + e.getRDSGroup() + e.bitPos = 0 + } + } + return sym } func (e *Encoder) getRDSGroup() {