Просмотр исходного кода

Merge branch 'bugfix/ingest-followup' into audio-ingest-rework

main
Jan 1 месяц назад
Родитель
Сommit
7caa46691f
8 измененных файлов: 154 добавлений и 53 удалений
  1. +1
    -1
      internal/app/engine.go
  2. +13
    -10
      internal/control/server.go
  3. +39
    -3
      internal/ingest/adapters/icecast/icy.go
  4. +1
    -0
      internal/ingest/adapters/stdinpcm/source.go
  5. +14
    -8
      internal/ingest/factory/factory.go
  6. +19
    -4
      internal/offline/generator.go
  7. +14
    -11
      internal/output/frame_queue.go
  8. +53
    -16
      internal/rds/encoder.go

+ 1
- 1
internal/app/engine.go Просмотреть файл

@@ -194,7 +194,7 @@ func (e *Engine) SetStreamSource(src *audio.StreamSource) {
}
resampler := audio.NewStreamResampler(src, compositeRate)
e.generator.SetExternalSource(resampler)
log.Printf("engine: live audio stream — %d Hz → %.0f Hz (buffer %d frames)",
log.Printf("engine: live audio stream wired initial %d Hz → %.0f Hz composite (buffer %d frames); actual decoded rate auto-corrects on first chunk",
src.SampleRate, compositeRate, src.Stats().Capacity)
}



+ 13
- 10
internal/control/server.go Просмотреть файл

@@ -8,20 +8,23 @@ import (
)

const (
defaultReadTimeout = 5 * time.Second
defaultWriteTimeout = 10 * time.Second
defaultIdleTimeout = 60 * time.Second
defaultMaxHeaderBytes = 1 << 20 // 1 MiB
defaultReadHeaderTimeout = 5 * time.Second
defaultIdleTimeout = 60 * time.Second
defaultMaxHeaderBytes = 1 << 20 // 1 MiB
)

// NewHTTPServer returns a configured HTTP server for the control plane.
//
// WriteTimeout is intentionally not set: /audio/stream accepts long-lived
// POST bodies (continuous PCM push) that would be cut off by a global write
// deadline. Individual endpoints are protected by MaxBytesReader limits.
// ReadHeaderTimeout guards against slow-header attacks.
func NewHTTPServer(cfg config.Config, handler http.Handler) *http.Server {
return &http.Server{
Addr: cfg.Control.ListenAddress,
Handler: handler,
ReadTimeout: defaultReadTimeout,
WriteTimeout: defaultWriteTimeout,
IdleTimeout: defaultIdleTimeout,
MaxHeaderBytes: defaultMaxHeaderBytes,
Addr: cfg.Control.ListenAddress,
Handler: handler,
ReadHeaderTimeout: defaultReadHeaderTimeout,
IdleTimeout: defaultIdleTimeout,
MaxHeaderBytes: defaultMaxHeaderBytes,
}
}

+ 39
- 3
internal/ingest/adapters/icecast/icy.go Просмотреть файл

@@ -77,18 +77,31 @@ func (r *icyReader) readMetadataBlock() error {
return nil
}

// parseICYMetadata parses the ICY inline metadata block.
//
// ICY metadata is a semicolon-delimited key=value format where values are
// single-quoted strings. A naive strings.Split(raw, ";") breaks when the
// StreamTitle itself contains semicolons (e.g. "Artist - Title; Live Edit").
// This parser is quote-aware: it only splits on semicolons that appear
// outside of single-quoted value strings.
func parseICYMetadata(block []byte) icyMetadata {
raw := strings.TrimRight(string(bytes.Trim(block, "\x00")), "\x00")
meta := icyMetadata{}
for _, field := range strings.Split(raw, ";") {

fields := splitICYFields(raw)
for _, field := range fields {
field = strings.TrimSpace(field)
if !strings.HasPrefix(field, "StreamTitle=") {
continue
}
v := strings.TrimPrefix(field, "StreamTitle=")
v = strings.TrimSpace(v)
if len(v) >= 2 && ((v[0] == '\'' && v[len(v)-1] == '\'') || (v[0] == '"' && v[len(v)-1] == '"')) {
v = v[1 : len(v)-1]
// Strip enclosing single or double quotes.
if len(v) >= 2 {
if (v[0] == '\'' && v[len(v)-1] == '\'') ||
(v[0] == '"' && v[len(v)-1] == '"') {
v = v[1 : len(v)-1]
}
}
meta.StreamTitle = v
break
@@ -96,6 +109,29 @@ func parseICYMetadata(block []byte) icyMetadata {
return meta
}

// splitICYFields splits an ICY metadata string on semicolons that appear
// outside of single-quoted value strings. Semicolons inside quotes (e.g.
// StreamTitle='Artist - Song; Live';) are preserved as part of the value.
func splitICYFields(s string) []string {
var fields []string
inQuote := false
start := 0
for i := 0; i < len(s); i++ {
c := s[i]
if c == '\'' {
inQuote = !inQuote
}
if c == ';' && !inQuote {
fields = append(fields, s[start:i])
start = i + 1
}
}
if start < len(s) {
fields = append(fields, s[start:])
}
return fields
}

func parseICYMetaInt(raw string) (int, error) {
raw = strings.TrimSpace(raw)
if raw == "" {


+ 1
- 0
internal/ingest/adapters/stdinpcm/source.go Просмотреть файл

@@ -119,6 +119,7 @@ func (s *Source) Stats() ingest.SourceStats {

func (s *Source) readLoop(ctx context.Context) {
defer s.wg.Done()
defer close(s.errs)
defer close(s.chunks)

frameBytes := s.channels * 2


+ 14
- 8
internal/ingest/factory/factory.go Просмотреть файл

@@ -42,7 +42,7 @@ type AES67DiscoverRequest struct {

type AES67DiscoverFunc func(ctx context.Context, req AES67DiscoverRequest) (aoiprxkit.SAPAnnouncement, error)

func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, error) {
func BuildSource(ctx context.Context, cfg config.Config, deps Deps) (ingest.Source, AudioIngress, error) {
switch normalizeIngestKind(cfg.Ingest.Kind) {
case "", "none":
return nil, nil, nil
@@ -83,7 +83,7 @@ func BuildSource(cfg config.Config, deps Deps) (ingest.Source, AudioIngress, err
src := srt.New("srt-main", srtCfg, opts...)
return src, nil, nil
case "aes67", "aoip", "aoip-rtp":
aoipCfg, detail, origin, err := buildAES67Config(cfg, deps)
aoipCfg, detail, origin, err := buildAES67Config(ctx, cfg, deps)
if err != nil {
return nil, nil, err
}
@@ -115,7 +115,10 @@ func SampleRateForKind(cfg config.Config) int {
return cfg.Ingest.HTTPRaw.SampleRateHz
}
case "icecast":
return 44100
// 48000 Hz is the most common rate for modern Icecast streams.
// The ingest runtime will auto-correct to the actual decoded rate
// after the first PCM chunk arrives (see runtime.go handleChunk).
return 48000
case "srt":
if cfg.Ingest.SRT.SampleRateHz > 0 {
return cfg.Ingest.SRT.SampleRateHz
@@ -125,14 +128,17 @@ func SampleRateForKind(cfg config.Config) int {
return cfg.Ingest.AES67.SampleRateHz
}
}
return 44100
// Default to 48000 Hz: the correct rate for professional sources
// (SRT, AES67) and modern streams. The ingest runtime corrects this
// dynamically from the first decoded chunk for compressed sources.
return 48000
}

func normalizeIngestKind(kind string) string {
return strings.ToLower(strings.TrimSpace(kind))
}

func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, *ingest.SourceOrigin, error) {
func buildAES67Config(ctx context.Context, cfg config.Config, deps Deps) (aoiprxkit.Config, string, *ingest.SourceOrigin, error) {
base := aoiprxkit.DefaultConfig()
ing := cfg.Ingest.AES67
if strings.TrimSpace(ing.InterfaceName) != "" {
@@ -160,7 +166,7 @@ func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, *
base.ReadBufferBytes = ing.ReadBufferBytes
}

sdpText, discoveredStreamName, origin, err := resolveAES67SDP(ing, deps)
sdpText, discoveredStreamName, origin, err := resolveAES67SDP(ctx, ing, deps)
if err != nil {
return aoiprxkit.Config{}, "", nil, err
}
@@ -205,7 +211,7 @@ func buildAES67Config(cfg config.Config, deps Deps) (aoiprxkit.Config, string, *
return base, "", origin, nil
}

func resolveAES67SDP(ing config.IngestAES67Config, deps Deps) (string, string, *ingest.SourceOrigin, error) {
func resolveAES67SDP(ctx context.Context, ing config.IngestAES67Config, deps Deps) (string, string, *ingest.SourceOrigin, error) {
sdpText := strings.TrimSpace(ing.SDP)
if sdpText == "" && strings.TrimSpace(ing.SDPPath) != "" {
sdpPath := filepath.Clean(ing.SDPPath)
@@ -246,7 +252,7 @@ func resolveAES67SDP(ing config.IngestAES67Config, deps Deps) (string, string, *
if discover == nil {
discover = discoverAES67ViaSAP
}
announcement, err := discover(context.Background(), req)
announcement, err := discover(ctx, req)
if err != nil {
return "", "", nil, fmt.Errorf("discover ingest.aes67 stream %q via SAP: %w", req.StreamName, err)
}


+ 19
- 4
internal/offline/generator.go Просмотреть файл

@@ -120,8 +120,15 @@ func NewGenerator(cfg cfgpkg.Config) *Generator {

// SetExternalSource sets a live audio source (e.g. StreamResampler) that
// takes priority over WAV/tone sources. Must be called before the first
// GenerateFrame() call (i.e. before init).
// GenerateFrame() call; calling it after init() has no effect because
// g.source is already wired to the old source.
func (g *Generator) SetExternalSource(src frameSource) {
if g.initialized {
// init() already called sourceFor() and wired g.source. Updating
// g.externalSource here would have no effect on the live DSP chain.
// This is a programming error — log loudly rather than silently break.
panic("generator: SetExternalSource called after GenerateFrame; call it before the engine starts")
}
g.externalSource = src
}

@@ -189,12 +196,14 @@ func (g *Generator) init() {
g.mpxNotch19, g.mpxNotch57 = dsp.NewCompositeProtection(g.sampleRate)
// BS.412 MPX power limiter (EU/CH requirement for licensed FM)
if g.cfg.FM.BS412Enabled {
chunkSec := 0.05 // 50ms chunks (matches engine default)
// chunkSec is not known at init time (Engine.chunkDuration may differ).
// Pass 0 here; GenerateFrame computes the actual chunk duration from
// the real sample count and updates BS.412 accordingly.
g.bs412 = dsp.NewBS412Limiter(
g.cfg.FM.BS412ThresholdDBr,
g.cfg.FM.PilotLevel,
g.cfg.FM.RDSInjection,
chunkSec,
0,
)
}
if g.cfg.FM.FMModulationEnabled {
@@ -360,8 +369,14 @@ func (g *Generator) GenerateFrame(duration time.Duration) *output.CompositeFrame
}
}

// BS.412: feed this chunk's average audio power for next chunk's gain calculation
// BS.412: feed this chunk's actual duration and average audio power for
// the next chunk's gain calculation. Using the real sample count avoids
// the error that occurred when chunkSec was hardcoded to 0.05 — any
// SetChunkDuration() call from the engine would silently miscalibrate
// the ITU-R BS.412 power measurement window.
if g.bs412 != nil && samples > 0 {
chunkSec := float64(samples) / g.sampleRate
g.bs412.UpdateChunkDuration(chunkSec)
g.bs412.ProcessChunk(bs412PowerAccum / float64(samples))
}



+ 14
- 11
internal/output/frame_queue.go Просмотреть файл

@@ -80,22 +80,19 @@ func (q *FrameQueue) Capacity() int {
}

// FillLevel reports the current occupancy as a fraction of capacity.
// Uses len(ch) directly for accuracy: updateDepth() is called after the
// channel operation, so q.depth can lag by one frame transiently.
func (q *FrameQueue) FillLevel() float64 {
q.mu.Lock()
depth := q.depth
q.mu.Unlock()
if q.capacity == 0 {
return 0
}
return float64(depth) / float64(q.capacity)
return float64(len(q.ch)) / float64(q.capacity)
}

// Depth returns the current number of frames in the queue.
// Uses len(ch) directly for accuracy (see FillLevel).
func (q *FrameQueue) Depth() int {
q.mu.Lock()
depth := q.depth
q.mu.Unlock()
return depth
return len(q.ch)
}

// Stats returns a snapshot of the queue metrics.
@@ -104,7 +101,7 @@ func (q *FrameQueue) Stats() QueueStats {
fill := q.fillLevelLocked()
stats := QueueStats{
Capacity: q.capacity,
Depth: q.depth,
Depth: len(q.ch),
FillLevel: fill,
Health: queueHealthFromFill(fill),
HighWaterMark: q.highWaterMark,
@@ -128,11 +125,15 @@ func (q *FrameQueue) Push(ctx context.Context, frame *CompositeFrame) error {
return ErrFrameQueueClosed
}

// BUG-05 fix: increment depth BEFORE the channel send so that Stats()
// never reports fill=0 while a frame is in the channel awaiting receive.
// On context cancellation, undo the increment.
q.updateDepth(+1)
select {
case q.ch <- frame:
q.updateDepth(+1)
return nil
case <-ctx.Done():
q.updateDepth(-1)
q.recordPushTimeout()
return ctx.Err()
}
@@ -211,7 +212,9 @@ func (q *FrameQueue) fillLevelLocked() float64 {
if q.capacity == 0 {
return 0
}
return float64(q.depth) / float64(q.capacity)
// Use len(ch) rather than q.depth: depth is updated after the channel
// operation, so it can be off by one during the Push/Pop window.
return float64(len(q.ch)) / float64(q.capacity)
}

func (q *FrameQueue) recordPushTimeout() {


+ 53
- 16
internal/rds/encoder.go Просмотреть файл

@@ -94,8 +94,17 @@ type Encoder struct {

// Live-updatable text — written by control API, read at group boundaries.
// Zero-contention: atomic swap, checked once per RDS group (~88ms at 228kHz).
livePS atomic.Value // string
liveRT atomic.Value // string
// pendingText.set distinguishes "no pending update" from "update to empty string"
// so that PS/RT can be explicitly cleared via UpdateText.
livePS atomic.Value // pendingText
liveRT atomic.Value // pendingText
}

// pendingText carries a pending text update for PS or RT.
// set=false means no update is pending; set=true means apply val (even if empty).
type pendingText struct {
val string
set bool
}

func NewEncoder(cfg RDSConfig) (*Encoder, error) {
@@ -163,16 +172,29 @@ func (e *Encoder) Reset() {

// UpdateText hot-swaps PS and/or RT. Thread-safe — called from HTTP handlers,
// applied at the next RDS group boundary by the DSP goroutine.
// Pass empty string to leave a field unchanged.
//
// Pass empty string to leave a field unchanged. To explicitly clear a field
// (set PS to 8 spaces, or RT to empty), use ClearPS/ClearRT instead.
func (e *Encoder) UpdateText(ps, rt string) {
if ps != "" {
e.livePS.Store(normalizePS(ps))
e.livePS.Store(pendingText{val: normalizePS(ps), set: true})
}
if rt != "" {
e.liveRT.Store(normalizeRT(rt))
e.liveRT.Store(pendingText{val: normalizeRT(rt), set: true})
}
}

// ClearPS resets the Program Service name to 8 spaces at the next group boundary.
func (e *Encoder) ClearPS() {
e.livePS.Store(pendingText{val: normalizePS(""), set: true})
}

// ClearRT resets RadioText to an empty string at the next group boundary.
// Per RDS spec, an empty RT causes receivers to clear their display.
func (e *Encoder) ClearRT() {
e.liveRT.Store(pendingText{val: "", set: true})
}

// NextSample returns the next RDS subcarrier sample at the configured rate.
// Uses the internal free-running 57 kHz carrier. Prefer NextSampleWithCarrier
// for phase-locked operation in a stereo MPX chain.
@@ -192,15 +214,15 @@ func (e *Encoder) NextSampleWithCarrier(carrier float64) float64 {
// Apply live text updates at group boundaries (~88ms at 228kHz).
// Atomics are consumed (cleared) after reading to prevent
// re-applying the same text every group and toggling A/B flag.
if ps, ok := e.livePS.Load().(string); ok && ps != "" {
e.scheduler.cfg.PS = ps
e.livePS.Store("") // consumed
if pt, ok := e.livePS.Load().(pendingText); ok && pt.set {
e.scheduler.cfg.PS = pt.val
e.livePS.Store(pendingText{}) // consumed
}
if rt, ok := e.liveRT.Load().(string); ok && rt != "" {
e.scheduler.cfg.RT = rt
if pt, ok := e.liveRT.Load().(pendingText); ok && pt.set {
e.scheduler.cfg.RT = pt.val
e.scheduler.rtIdx = 0 // restart RT transmission for new text
e.scheduler.rtABFlag = !e.scheduler.rtABFlag // toggle A/B per RDS spec
e.liveRT.Store("") // consumed
e.liveRT.Store(pendingText{}) // consumed
}
e.getRDSGroup()
e.bitPos = 0
@@ -240,12 +262,27 @@ func (e *Encoder) Generate(n int) []float64 {
out := make([]float64, n); for i := range out { out[i] = e.NextSample() }; return out
}
func (e *Encoder) Symbol() float64 {
if e.bitPos >= bitsPerGroup { return -1 }
sym := 1.0; if e.bitBuffer[e.bitPos] == 0 { sym = -1.0 }
// Populate the bit buffer on first call (bitPos starts at bitsPerGroup
// after NewEncoder/Reset, so the guard below would return -1 immediately
// without this bootstrap step).
if e.bitPos >= bitsPerGroup {
e.getRDSGroup()
e.bitPos = 0
}
sym := 1.0
if e.bitBuffer[e.bitPos] == 0 {
sym = -1.0
}
e.sampleCount++
if e.sampleCount >= e.spb { e.sampleCount = 0; e.bitPos++
if e.bitPos >= bitsPerGroup { e.getRDSGroup(); e.bitPos = 0 }
}; return sym
if e.sampleCount >= e.spb {
e.sampleCount = 0
e.bitPos++
if e.bitPos >= bitsPerGroup {
e.getRDSGroup()
e.bitPos = 0
}
}
return sym
}

func (e *Encoder) getRDSGroup() {


Загрузка…
Отмена
Сохранить