Просмотр исходного кода

ingest: add phase1 runtime skeleton and conversion model

main
Jan 1 месяц назад
Родитель
Сommit
5cb364d742
8 измененных файлов: 415 добавлений и 0 удалений
  1. +72
    -0
      internal/config/config.go
  2. +45
    -0
      internal/ingest/convert.go
  3. +55
    -0
      internal/ingest/convert_test.go
  4. +23
    -0
      internal/ingest/factory.go
  5. +147
    -0
      internal/ingest/runtime.go
  6. +12
    -0
      internal/ingest/source.go
  7. +35
    -0
      internal/ingest/stats.go
  8. +26
    -0
      internal/ingest/types.go

+ 72
- 0
internal/config/config.go Просмотреть файл

@@ -15,6 +15,7 @@ type Config struct {
Backend BackendConfig `json:"backend"`
Control ControlConfig `json:"control"`
Runtime RuntimeConfig `json:"runtime"`
Ingest IngestConfig `json:"ingest"`
}

type AudioConfig struct {
@@ -68,6 +69,33 @@ type RuntimeConfig struct {
FrameQueueCapacity int `json:"frameQueueCapacity"`
}

type IngestConfig struct {
Kind string `json:"kind"`
PrebufferMs int `json:"prebufferMs"`
StallTimeoutMs int `json:"stallTimeoutMs"`
Reconnect IngestReconnectConfig `json:"reconnect"`
Stdin IngestPCMConfig `json:"stdin"`
HTTPRaw IngestPCMConfig `json:"httpRaw"`
Icecast IngestIcecastConfig `json:"icecast"`
}

type IngestReconnectConfig struct {
Enabled bool `json:"enabled"`
InitialBackoffMs int `json:"initialBackoffMs"`
MaxBackoffMs int `json:"maxBackoffMs"`
}

type IngestPCMConfig struct {
SampleRateHz int `json:"sampleRateHz"`
Channels int `json:"channels"`
Format string `json:"format"`
}

type IngestIcecastConfig struct {
URL string `json:"url"`
Decoder string `json:"decoder"`
}

func Default() Config {
return Config{
Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0.4},
@@ -89,6 +117,29 @@ func Default() Config {
Backend: BackendConfig{Kind: "file", OutputPath: "build/out/composite.f32"},
Control: ControlConfig{ListenAddress: "127.0.0.1:8088"},
Runtime: RuntimeConfig{FrameQueueCapacity: 3},
Ingest: IngestConfig{
Kind: "none",
PrebufferMs: 1500,
StallTimeoutMs: 3000,
Reconnect: IngestReconnectConfig{
Enabled: true,
InitialBackoffMs: 1000,
MaxBackoffMs: 15000,
},
Stdin: IngestPCMConfig{
SampleRateHz: 44100,
Channels: 2,
Format: "s16le",
},
HTTPRaw: IngestPCMConfig{
SampleRateHz: 44100,
Channels: 2,
Format: "s16le",
},
Icecast: IngestIcecastConfig{
Decoder: "native",
},
},
}
}

@@ -174,6 +225,27 @@ func (c Config) Validate() error {
if c.Runtime.FrameQueueCapacity <= 0 {
return fmt.Errorf("runtime.frameQueueCapacity must be > 0")
}
if c.Ingest.Kind == "" {
c.Ingest.Kind = "none"
}
if c.Ingest.PrebufferMs < 0 {
return fmt.Errorf("ingest.prebufferMs must be >= 0")
}
if c.Ingest.StallTimeoutMs < 0 {
return fmt.Errorf("ingest.stallTimeoutMs must be >= 0")
}
if c.Ingest.Reconnect.InitialBackoffMs < 0 || c.Ingest.Reconnect.MaxBackoffMs < 0 {
return fmt.Errorf("ingest.reconnect backoff must be >= 0")
}
if c.Ingest.Reconnect.MaxBackoffMs > 0 && c.Ingest.Reconnect.InitialBackoffMs > c.Ingest.Reconnect.MaxBackoffMs {
return fmt.Errorf("ingest.reconnect.initialBackoffMs must be <= maxBackoffMs")
}
if c.Ingest.Stdin.SampleRateHz < 0 || c.Ingest.HTTPRaw.SampleRateHz < 0 {
return fmt.Errorf("ingest pcm sampleRateHz must be >= 0")
}
if c.Ingest.Stdin.Channels < 0 || c.Ingest.HTTPRaw.Channels < 0 {
return fmt.Errorf("ingest pcm channels must be >= 0")
}
// Fail-loud PI validation
if c.RDS.Enabled {
if _, err := ParsePI(c.RDS.PI); err != nil {


+ 45
- 0
internal/ingest/convert.go Просмотреть файл

@@ -0,0 +1,45 @@
package ingest

import (
"fmt"
"math"

"github.com/jan/fm-rds-tx/internal/audio"
)

const int32AbsMax = 2147483648.0

func ChunkToFrames(chunk PCMChunk) ([]audio.Frame, error) {
if chunk.Channels != 1 && chunk.Channels != 2 {
return nil, fmt.Errorf("unsupported channel count: %d", chunk.Channels)
}
if chunk.Channels <= 0 {
return nil, fmt.Errorf("invalid channel count: %d", chunk.Channels)
}
if len(chunk.Samples)%chunk.Channels != 0 {
return nil, fmt.Errorf("invalid interleaved sample count: %d for channels=%d", len(chunk.Samples), chunk.Channels)
}

frames := make([]audio.Frame, len(chunk.Samples)/chunk.Channels)
switch chunk.Channels {
case 1:
for i := range frames {
s := normalizePCM(chunk.Samples[i])
frames[i] = audio.NewFrame(s, s)
}
case 2:
for i := range frames {
off := i * 2
l := normalizePCM(chunk.Samples[off])
r := normalizePCM(chunk.Samples[off+1])
frames[i] = audio.NewFrame(l, r)
}
}
return frames, nil
}

func normalizePCM(v int32) audio.Sample {
norm := float64(v) / int32AbsMax
norm = math.Max(float64(audio.SampleMin), math.Min(float64(audio.SampleMax), norm))
return audio.Sample(norm)
}

+ 55
- 0
internal/ingest/convert_test.go Просмотреть файл

@@ -0,0 +1,55 @@
package ingest

import "testing"

func TestChunkToFramesMonoDuplicate(t *testing.T) {
frames, err := ChunkToFrames(PCMChunk{
Channels: 1,
Samples: []int32{2147483647, -2147483648},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(frames) != 2 {
t.Fatalf("expected 2 frames, got %d", len(frames))
}
if frames[0].L != frames[0].R {
t.Fatalf("expected mono duplication, got L=%v R=%v", frames[0].L, frames[0].R)
}
if frames[1].L != frames[1].R {
t.Fatalf("expected mono duplication, got L=%v R=%v", frames[1].L, frames[1].R)
}
}

func TestChunkToFramesStereoPassThrough(t *testing.T) {
frames, err := ChunkToFrames(PCMChunk{
Channels: 2,
Samples: []int32{100, 200, -300, -400},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(frames) != 2 {
t.Fatalf("expected 2 frames, got %d", len(frames))
}
if !(frames[0].L < frames[0].R) {
t.Fatalf("expected left < right for first frame, got %v >= %v", frames[0].L, frames[0].R)
}
if !(frames[1].L > frames[1].R) {
t.Fatalf("expected left > right for second frame, got %v <= %v", frames[1].L, frames[1].R)
}
}

func TestChunkToFramesRejectsUnsupportedChannels(t *testing.T) {
_, err := ChunkToFrames(PCMChunk{Channels: 3, Samples: []int32{1, 2, 3}})
if err == nil {
t.Fatal("expected error for unsupported channels")
}
}

func TestChunkToFramesRejectsInvalidInterleaving(t *testing.T) {
_, err := ChunkToFrames(PCMChunk{Channels: 2, Samples: []int32{1, 2, 3}})
if err == nil {
t.Fatal("expected error for invalid interleaving")
}
}

+ 23
- 0
internal/ingest/factory.go Просмотреть файл

@@ -0,0 +1,23 @@
package ingest

import (
"fmt"
"io"
"net/http"

"github.com/jan/fm-rds-tx/internal/config"
)

type FactoryDeps struct {
Stdin io.Reader
HTTP *http.Client
}

func BuildSource(cfg config.Config, deps FactoryDeps) (Source, error) {
switch cfg.Ingest.Kind {
case "", "none":
return nil, nil
default:
return nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind)
}
}

+ 147
- 0
internal/ingest/runtime.go Просмотреть файл

@@ -0,0 +1,147 @@
package ingest

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/jan/fm-rds-tx/internal/audio"
)

type Runtime struct {
sink *audio.StreamSource
source Source
started atomic.Bool

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

mu sync.RWMutex
active SourceDescriptor
stats RuntimeStats
}

func NewRuntime(sink *audio.StreamSource, src Source) *Runtime {
return &Runtime{
sink: sink,
source: src,
stats: RuntimeStats{
State: "idle",
},
}
}

func (r *Runtime) Start(ctx context.Context) error {
if r.source == nil {
r.mu.Lock()
r.stats.State = "idle"
r.mu.Unlock()
return nil
}
if !r.started.CompareAndSwap(false, true) {
return nil
}

r.ctx, r.cancel = context.WithCancel(ctx)
r.mu.Lock()
r.active = r.source.Descriptor()
r.stats.State = "starting"
r.mu.Unlock()
if err := r.source.Start(r.ctx); err != nil {
r.started.Store(false)
r.mu.Lock()
r.stats.State = "failed"
r.mu.Unlock()
return err
}

r.wg.Add(1)
go r.run()
return nil
}

func (r *Runtime) Stop() error {
if !r.started.CompareAndSwap(true, false) {
return nil
}
if r.cancel != nil {
r.cancel()
}
if r.source != nil {
_ = r.source.Stop()
}
r.wg.Wait()
r.mu.Lock()
r.stats.State = "stopped"
r.mu.Unlock()
return nil
}

func (r *Runtime) run() {
defer r.wg.Done()
r.mu.Lock()
r.stats.State = "running"
r.mu.Unlock()

ch := r.source.Chunks()
errCh := r.source.Errors()
for {
select {
case <-r.ctx.Done():
return
case err := <-errCh:
if err == nil {
continue
}
r.mu.Lock()
r.stats.State = "degraded"
r.mu.Unlock()
case chunk, ok := <-ch:
if !ok {
return
}
r.handleChunk(chunk)
}
}
}

func (r *Runtime) handleChunk(chunk PCMChunk) {
frames, err := ChunkToFrames(chunk)
if err != nil {
r.mu.Lock()
r.stats.ConvertErrors++
r.stats.State = "degraded"
r.mu.Unlock()
return
}
dropped := uint64(0)
for _, frame := range frames {
if !r.sink.WriteFrame(frame) {
dropped++
}
}
r.mu.Lock()
r.stats.LastChunkAt = time.Now()
r.stats.DroppedFrames += dropped
r.stats.WriteBlocked = dropped > 0
r.mu.Unlock()
}

func (r *Runtime) Stats() Stats {
r.mu.RLock()
runtimeStats := r.stats
active := r.active
r.mu.RUnlock()

sourceStats := SourceStats{}
if r.source != nil {
sourceStats = r.source.Stats()
}
return Stats{
Active: active,
Source: sourceStats,
Runtime: runtimeStats,
}
}

+ 12
- 0
internal/ingest/source.go Просмотреть файл

@@ -0,0 +1,12 @@
package ingest

import "context"

type Source interface {
Descriptor() SourceDescriptor
Start(ctx context.Context) error
Stop() error
Chunks() <-chan PCMChunk
Errors() <-chan error
Stats() SourceStats
}

+ 35
- 0
internal/ingest/stats.go Просмотреть файл

@@ -0,0 +1,35 @@
package ingest

import "time"

type SourceStats struct {
State string `json:"state"`
Connected bool `json:"connected"`
LastChunkAt time.Time `json:"lastChunkAt,omitempty"`
ChunksIn uint64 `json:"chunksIn"`
SamplesIn uint64 `json:"samplesIn"`
BufferedSeconds float64 `json:"bufferedSeconds"`
Overflows uint64 `json:"overflows"`
Underruns uint64 `json:"underruns"`
Reconnects uint64 `json:"reconnects"`
Discontinuities uint64 `json:"discontinuities"`
TransportLoss uint64 `json:"transportLoss"`
Reorders uint64 `json:"reorders"`
JitterDepth int `json:"jitterDepth"`
LastError string `json:"lastError,omitempty"`
}

type RuntimeStats struct {
State string `json:"state"`
Prebuffering bool `json:"prebuffering"`
LastChunkAt time.Time `json:"lastChunkAt,omitempty"`
DroppedFrames uint64 `json:"droppedFrames"`
ConvertErrors uint64 `json:"convertErrors"`
WriteBlocked bool `json:"writeBlocked"`
}

type Stats struct {
Active SourceDescriptor `json:"active"`
Source SourceStats `json:"source"`
Runtime RuntimeStats `json:"runtime"`
}

+ 26
- 0
internal/ingest/types.go Просмотреть файл

@@ -0,0 +1,26 @@
package ingest

import "time"

// PCMChunk is the ingest-internal normalized PCM unit before TX conversion.
// Samples are interleaved per channel.
type PCMChunk struct {
Samples []int32
Channels int
SampleRateHz int
Sequence uint64
Timestamp time.Time
SourceID string
Discontinuity bool
}

type SourceDescriptor struct {
ID string `json:"id"`
Kind string `json:"kind"`
Family string `json:"family"`
Transport string `json:"transport"`
Codec string `json:"codec"`
Channels int `json:"channels"`
SampleRateHz int `json:"sampleRateHz"`
Detail string `json:"detail,omitempty"`
}

Загрузка…
Отмена
Сохранить