diff --git a/internal/config/config.go b/internal/config/config.go index 2eaa227..37ca0ae 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,7 @@ type Config struct { Backend BackendConfig `json:"backend"` Control ControlConfig `json:"control"` Runtime RuntimeConfig `json:"runtime"` + Ingest IngestConfig `json:"ingest"` } type AudioConfig struct { @@ -68,6 +69,33 @@ type RuntimeConfig struct { FrameQueueCapacity int `json:"frameQueueCapacity"` } +type IngestConfig struct { + Kind string `json:"kind"` + PrebufferMs int `json:"prebufferMs"` + StallTimeoutMs int `json:"stallTimeoutMs"` + Reconnect IngestReconnectConfig `json:"reconnect"` + Stdin IngestPCMConfig `json:"stdin"` + HTTPRaw IngestPCMConfig `json:"httpRaw"` + Icecast IngestIcecastConfig `json:"icecast"` +} + +type IngestReconnectConfig struct { + Enabled bool `json:"enabled"` + InitialBackoffMs int `json:"initialBackoffMs"` + MaxBackoffMs int `json:"maxBackoffMs"` +} + +type IngestPCMConfig struct { + SampleRateHz int `json:"sampleRateHz"` + Channels int `json:"channels"` + Format string `json:"format"` +} + +type IngestIcecastConfig struct { + URL string `json:"url"` + Decoder string `json:"decoder"` +} + func Default() Config { return Config{ Audio: AudioConfig{Gain: 1.0, ToneLeftHz: 1000, ToneRightHz: 1600, ToneAmplitude: 0.4}, @@ -89,6 +117,29 @@ func Default() Config { Backend: BackendConfig{Kind: "file", OutputPath: "build/out/composite.f32"}, Control: ControlConfig{ListenAddress: "127.0.0.1:8088"}, Runtime: RuntimeConfig{FrameQueueCapacity: 3}, + Ingest: IngestConfig{ + Kind: "none", + PrebufferMs: 1500, + StallTimeoutMs: 3000, + Reconnect: IngestReconnectConfig{ + Enabled: true, + InitialBackoffMs: 1000, + MaxBackoffMs: 15000, + }, + Stdin: IngestPCMConfig{ + SampleRateHz: 44100, + Channels: 2, + Format: "s16le", + }, + HTTPRaw: IngestPCMConfig{ + SampleRateHz: 44100, + Channels: 2, + Format: "s16le", + }, + Icecast: IngestIcecastConfig{ + Decoder: "native", + }, + }, } } @@ -174,6 +225,27 @@ func (c Config) Validate() error { if c.Runtime.FrameQueueCapacity <= 0 { return fmt.Errorf("runtime.frameQueueCapacity must be > 0") } + if c.Ingest.Kind == "" { + c.Ingest.Kind = "none" + } + if c.Ingest.PrebufferMs < 0 { + return fmt.Errorf("ingest.prebufferMs must be >= 0") + } + if c.Ingest.StallTimeoutMs < 0 { + return fmt.Errorf("ingest.stallTimeoutMs must be >= 0") + } + if c.Ingest.Reconnect.InitialBackoffMs < 0 || c.Ingest.Reconnect.MaxBackoffMs < 0 { + return fmt.Errorf("ingest.reconnect backoff must be >= 0") + } + if c.Ingest.Reconnect.MaxBackoffMs > 0 && c.Ingest.Reconnect.InitialBackoffMs > c.Ingest.Reconnect.MaxBackoffMs { + return fmt.Errorf("ingest.reconnect.initialBackoffMs must be <= maxBackoffMs") + } + if c.Ingest.Stdin.SampleRateHz < 0 || c.Ingest.HTTPRaw.SampleRateHz < 0 { + return fmt.Errorf("ingest pcm sampleRateHz must be >= 0") + } + if c.Ingest.Stdin.Channels < 0 || c.Ingest.HTTPRaw.Channels < 0 { + return fmt.Errorf("ingest pcm channels must be >= 0") + } // Fail-loud PI validation if c.RDS.Enabled { if _, err := ParsePI(c.RDS.PI); err != nil { diff --git a/internal/ingest/convert.go b/internal/ingest/convert.go new file mode 100644 index 0000000..3dbd1b3 --- /dev/null +++ b/internal/ingest/convert.go @@ -0,0 +1,45 @@ +package ingest + +import ( + "fmt" + "math" + + "github.com/jan/fm-rds-tx/internal/audio" +) + +const int32AbsMax = 2147483648.0 + +func ChunkToFrames(chunk PCMChunk) ([]audio.Frame, error) { + if chunk.Channels != 1 && chunk.Channels != 2 { + return nil, fmt.Errorf("unsupported channel count: %d", chunk.Channels) + } + if chunk.Channels <= 0 { + return nil, fmt.Errorf("invalid channel count: %d", chunk.Channels) + } + if len(chunk.Samples)%chunk.Channels != 0 { + return nil, fmt.Errorf("invalid interleaved sample count: %d for channels=%d", len(chunk.Samples), chunk.Channels) + } + + frames := make([]audio.Frame, len(chunk.Samples)/chunk.Channels) + switch chunk.Channels { + case 1: + for i := range frames { + s := normalizePCM(chunk.Samples[i]) + frames[i] = audio.NewFrame(s, s) + } + case 2: + for i := range frames { + off := i * 2 + l := normalizePCM(chunk.Samples[off]) + r := normalizePCM(chunk.Samples[off+1]) + frames[i] = audio.NewFrame(l, r) + } + } + return frames, nil +} + +func normalizePCM(v int32) audio.Sample { + norm := float64(v) / int32AbsMax + norm = math.Max(float64(audio.SampleMin), math.Min(float64(audio.SampleMax), norm)) + return audio.Sample(norm) +} diff --git a/internal/ingest/convert_test.go b/internal/ingest/convert_test.go new file mode 100644 index 0000000..086325f --- /dev/null +++ b/internal/ingest/convert_test.go @@ -0,0 +1,55 @@ +package ingest + +import "testing" + +func TestChunkToFramesMonoDuplicate(t *testing.T) { + frames, err := ChunkToFrames(PCMChunk{ + Channels: 1, + Samples: []int32{2147483647, -2147483648}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(frames) != 2 { + t.Fatalf("expected 2 frames, got %d", len(frames)) + } + if frames[0].L != frames[0].R { + t.Fatalf("expected mono duplication, got L=%v R=%v", frames[0].L, frames[0].R) + } + if frames[1].L != frames[1].R { + t.Fatalf("expected mono duplication, got L=%v R=%v", frames[1].L, frames[1].R) + } +} + +func TestChunkToFramesStereoPassThrough(t *testing.T) { + frames, err := ChunkToFrames(PCMChunk{ + Channels: 2, + Samples: []int32{100, 200, -300, -400}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(frames) != 2 { + t.Fatalf("expected 2 frames, got %d", len(frames)) + } + if !(frames[0].L < frames[0].R) { + t.Fatalf("expected left < right for first frame, got %v >= %v", frames[0].L, frames[0].R) + } + if !(frames[1].L > frames[1].R) { + t.Fatalf("expected left > right for second frame, got %v <= %v", frames[1].L, frames[1].R) + } +} + +func TestChunkToFramesRejectsUnsupportedChannels(t *testing.T) { + _, err := ChunkToFrames(PCMChunk{Channels: 3, Samples: []int32{1, 2, 3}}) + if err == nil { + t.Fatal("expected error for unsupported channels") + } +} + +func TestChunkToFramesRejectsInvalidInterleaving(t *testing.T) { + _, err := ChunkToFrames(PCMChunk{Channels: 2, Samples: []int32{1, 2, 3}}) + if err == nil { + t.Fatal("expected error for invalid interleaving") + } +} diff --git a/internal/ingest/factory.go b/internal/ingest/factory.go new file mode 100644 index 0000000..d3a65e7 --- /dev/null +++ b/internal/ingest/factory.go @@ -0,0 +1,23 @@ +package ingest + +import ( + "fmt" + "io" + "net/http" + + "github.com/jan/fm-rds-tx/internal/config" +) + +type FactoryDeps struct { + Stdin io.Reader + HTTP *http.Client +} + +func BuildSource(cfg config.Config, deps FactoryDeps) (Source, error) { + switch cfg.Ingest.Kind { + case "", "none": + return nil, nil + default: + return nil, fmt.Errorf("unsupported ingest kind: %s", cfg.Ingest.Kind) + } +} diff --git a/internal/ingest/runtime.go b/internal/ingest/runtime.go new file mode 100644 index 0000000..27c7db5 --- /dev/null +++ b/internal/ingest/runtime.go @@ -0,0 +1,147 @@ +package ingest + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/jan/fm-rds-tx/internal/audio" +) + +type Runtime struct { + sink *audio.StreamSource + source Source + started atomic.Bool + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + mu sync.RWMutex + active SourceDescriptor + stats RuntimeStats +} + +func NewRuntime(sink *audio.StreamSource, src Source) *Runtime { + return &Runtime{ + sink: sink, + source: src, + stats: RuntimeStats{ + State: "idle", + }, + } +} + +func (r *Runtime) Start(ctx context.Context) error { + if r.source == nil { + r.mu.Lock() + r.stats.State = "idle" + r.mu.Unlock() + return nil + } + if !r.started.CompareAndSwap(false, true) { + return nil + } + + r.ctx, r.cancel = context.WithCancel(ctx) + r.mu.Lock() + r.active = r.source.Descriptor() + r.stats.State = "starting" + r.mu.Unlock() + if err := r.source.Start(r.ctx); err != nil { + r.started.Store(false) + r.mu.Lock() + r.stats.State = "failed" + r.mu.Unlock() + return err + } + + r.wg.Add(1) + go r.run() + return nil +} + +func (r *Runtime) Stop() error { + if !r.started.CompareAndSwap(true, false) { + return nil + } + if r.cancel != nil { + r.cancel() + } + if r.source != nil { + _ = r.source.Stop() + } + r.wg.Wait() + r.mu.Lock() + r.stats.State = "stopped" + r.mu.Unlock() + return nil +} + +func (r *Runtime) run() { + defer r.wg.Done() + r.mu.Lock() + r.stats.State = "running" + r.mu.Unlock() + + ch := r.source.Chunks() + errCh := r.source.Errors() + for { + select { + case <-r.ctx.Done(): + return + case err := <-errCh: + if err == nil { + continue + } + r.mu.Lock() + r.stats.State = "degraded" + r.mu.Unlock() + case chunk, ok := <-ch: + if !ok { + return + } + r.handleChunk(chunk) + } + } +} + +func (r *Runtime) handleChunk(chunk PCMChunk) { + frames, err := ChunkToFrames(chunk) + if err != nil { + r.mu.Lock() + r.stats.ConvertErrors++ + r.stats.State = "degraded" + r.mu.Unlock() + return + } + dropped := uint64(0) + for _, frame := range frames { + if !r.sink.WriteFrame(frame) { + dropped++ + } + } + r.mu.Lock() + r.stats.LastChunkAt = time.Now() + r.stats.DroppedFrames += dropped + r.stats.WriteBlocked = dropped > 0 + r.mu.Unlock() +} + +func (r *Runtime) Stats() Stats { + r.mu.RLock() + runtimeStats := r.stats + active := r.active + r.mu.RUnlock() + + sourceStats := SourceStats{} + if r.source != nil { + sourceStats = r.source.Stats() + } + return Stats{ + Active: active, + Source: sourceStats, + Runtime: runtimeStats, + } +} diff --git a/internal/ingest/source.go b/internal/ingest/source.go new file mode 100644 index 0000000..d851ed3 --- /dev/null +++ b/internal/ingest/source.go @@ -0,0 +1,12 @@ +package ingest + +import "context" + +type Source interface { + Descriptor() SourceDescriptor + Start(ctx context.Context) error + Stop() error + Chunks() <-chan PCMChunk + Errors() <-chan error + Stats() SourceStats +} diff --git a/internal/ingest/stats.go b/internal/ingest/stats.go new file mode 100644 index 0000000..fb135c0 --- /dev/null +++ b/internal/ingest/stats.go @@ -0,0 +1,35 @@ +package ingest + +import "time" + +type SourceStats struct { + State string `json:"state"` + Connected bool `json:"connected"` + LastChunkAt time.Time `json:"lastChunkAt,omitempty"` + ChunksIn uint64 `json:"chunksIn"` + SamplesIn uint64 `json:"samplesIn"` + BufferedSeconds float64 `json:"bufferedSeconds"` + Overflows uint64 `json:"overflows"` + Underruns uint64 `json:"underruns"` + Reconnects uint64 `json:"reconnects"` + Discontinuities uint64 `json:"discontinuities"` + TransportLoss uint64 `json:"transportLoss"` + Reorders uint64 `json:"reorders"` + JitterDepth int `json:"jitterDepth"` + LastError string `json:"lastError,omitempty"` +} + +type RuntimeStats struct { + State string `json:"state"` + Prebuffering bool `json:"prebuffering"` + LastChunkAt time.Time `json:"lastChunkAt,omitempty"` + DroppedFrames uint64 `json:"droppedFrames"` + ConvertErrors uint64 `json:"convertErrors"` + WriteBlocked bool `json:"writeBlocked"` +} + +type Stats struct { + Active SourceDescriptor `json:"active"` + Source SourceStats `json:"source"` + Runtime RuntimeStats `json:"runtime"` +} diff --git a/internal/ingest/types.go b/internal/ingest/types.go new file mode 100644 index 0000000..bae4801 --- /dev/null +++ b/internal/ingest/types.go @@ -0,0 +1,26 @@ +package ingest + +import "time" + +// PCMChunk is the ingest-internal normalized PCM unit before TX conversion. +// Samples are interleaved per channel. +type PCMChunk struct { + Samples []int32 + Channels int + SampleRateHz int + Sequence uint64 + Timestamp time.Time + SourceID string + Discontinuity bool +} + +type SourceDescriptor struct { + ID string `json:"id"` + Kind string `json:"kind"` + Family string `json:"family"` + Transport string `json:"transport"` + Codec string `json:"codec"` + Channels int `json:"channels"` + SampleRateHz int `json:"sampleRateHz"` + Detail string `json:"detail,omitempty"` +}