|
- package stdinpcm
-
- import (
- "context"
- "encoding/binary"
- "fmt"
- "io"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/jan/fm-rds-tx/internal/ingest"
- )
-
- type Source struct {
- id string
- reader io.Reader
- sampleRate int
- channels int
- chunkFrames int
-
- chunks chan ingest.PCMChunk
- errs chan error
-
- cancel context.CancelFunc
- wg sync.WaitGroup
-
- state atomic.Value // string
- chunksIn atomic.Uint64
- samplesIn atomic.Uint64
- discontinuities atomic.Uint64
- lastChunkAtUnix atomic.Int64
- lastError atomic.Value // string
- }
-
- func New(id string, reader io.Reader, sampleRate, channels, chunkFrames int) *Source {
- if id == "" {
- id = "stdin"
- }
- if sampleRate <= 0 {
- sampleRate = 44100
- }
- if channels <= 0 {
- channels = 2
- }
- if chunkFrames <= 0 {
- chunkFrames = 1024
- }
-
- s := &Source{
- id: id,
- reader: reader,
- sampleRate: sampleRate,
- channels: channels,
- chunkFrames: chunkFrames,
- chunks: make(chan ingest.PCMChunk, 8),
- errs: make(chan error, 4),
- }
- s.state.Store("idle")
- return s
- }
-
- func (s *Source) Descriptor() ingest.SourceDescriptor {
- return ingest.SourceDescriptor{
- ID: s.id,
- Kind: "stdin-pcm",
- Family: "raw",
- Transport: "stdin",
- Codec: "pcm_s16le",
- Channels: s.channels,
- SampleRateHz: s.sampleRate,
- Detail: "S16LE interleaved PCM via stdin",
- }
- }
-
- func (s *Source) Start(ctx context.Context) error {
- if s.reader == nil {
- return fmt.Errorf("stdin source reader is nil")
- }
- runCtx, cancel := context.WithCancel(ctx)
- s.cancel = cancel
- s.state.Store("running")
-
- s.wg.Add(1)
- go s.readLoop(runCtx)
- return nil
- }
-
- func (s *Source) Stop() error {
- if s.cancel != nil {
- s.cancel()
- }
- s.wg.Wait()
- s.state.Store("stopped")
- return nil
- }
-
- func (s *Source) Chunks() <-chan ingest.PCMChunk { return s.chunks }
- func (s *Source) Errors() <-chan error { return s.errs }
-
- func (s *Source) Stats() ingest.SourceStats {
- state, _ := s.state.Load().(string)
- last := s.lastChunkAtUnix.Load()
- errStr, _ := s.lastError.Load().(string)
- var lastChunkAt time.Time
- if last > 0 {
- lastChunkAt = time.Unix(0, last)
- }
- return ingest.SourceStats{
- State: state,
- Connected: state == "running",
- LastChunkAt: lastChunkAt,
- ChunksIn: s.chunksIn.Load(),
- SamplesIn: s.samplesIn.Load(),
- Discontinuities: s.discontinuities.Load(),
- LastError: errStr,
- }
- }
-
- func (s *Source) readLoop(ctx context.Context) {
- defer s.wg.Done()
- defer close(s.chunks)
-
- frameBytes := s.channels * 2
- buf := make([]byte, s.chunkFrames*frameBytes)
- seq := uint64(0)
-
- for {
- select {
- case <-ctx.Done():
- return
- default:
- }
-
- n, err := io.ReadAtLeast(s.reader, buf, frameBytes)
- if err != nil {
- if err == io.EOF || err == io.ErrUnexpectedEOF {
- if n > 0 {
- s.emitChunk(buf[:n], seq)
- }
- s.state.Store("stopped")
- return
- }
- wrapped := fmt.Errorf("stdin read: %w", err)
- s.lastError.Store(wrapped.Error())
- s.state.Store("failed")
- select {
- case s.errs <- wrapped:
- default:
- }
- return
- }
- s.emitChunk(buf[:n], seq)
- seq++
- }
- }
-
- func (s *Source) emitChunk(data []byte, seq uint64) {
- samples := make([]int32, 0, len(data)/2)
- for i := 0; i+1 < len(data); i += 2 {
- v := int16(binary.LittleEndian.Uint16(data[i : i+2]))
- samples = append(samples, int32(v)<<16)
- }
- chunk := ingest.PCMChunk{
- Samples: samples,
- Channels: s.channels,
- SampleRateHz: s.sampleRate,
- Sequence: seq,
- Timestamp: time.Now(),
- SourceID: s.id,
- }
- s.chunksIn.Add(1)
- s.samplesIn.Add(uint64(len(samples)))
- s.lastChunkAtUnix.Store(time.Now().UnixNano())
- select {
- case s.chunks <- chunk:
- default:
- s.discontinuities.Add(1)
- }
- }
|