package fallback import ( "context" "encoding/binary" "errors" "fmt" "io" "os/exec" "strings" "sync" "time" "github.com/jan/fm-rds-tx/internal/ingest" "github.com/jan/fm-rds-tx/internal/ingest/decoder" ) type FFmpegDecoder struct{} func NewFFmpeg() *FFmpegDecoder { return &FFmpegDecoder{} } func (d *FFmpegDecoder) Name() string { return "ffmpeg-fallback" } func (d *FFmpegDecoder) DecodeStream(ctx context.Context, r io.Reader, meta decoder.StreamMeta, emit func(ingest.PCMChunk) error) error { if r == nil { return fmt.Errorf("%w: ffmpeg decoder stream reader is nil", decoder.ErrUnsupported) } if emit == nil { return fmt.Errorf("%w: ffmpeg decoder emit callback is nil", decoder.ErrUnsupported) } sampleRate := meta.SampleRateHz if sampleRate <= 0 { sampleRate = 44100 } channels := meta.Channels if channels <= 0 { channels = 2 } cmd := exec.CommandContext(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "pipe:0", "-f", "s16le", "-acodec", "pcm_s16le", "-ac", fmt.Sprintf("%d", channels), "-ar", fmt.Sprintf("%d", sampleRate), "pipe:1", ) stdin, err := cmd.StdinPipe() if err != nil { return fmt.Errorf("ffmpeg stdin pipe: %w", err) } stdout, err := cmd.StdoutPipe() if err != nil { return fmt.Errorf("ffmpeg stdout pipe: %w", err) } stderr, err := cmd.StderrPipe() if err != nil { return fmt.Errorf("ffmpeg stderr pipe: %w", err) } if err := cmd.Start(); err != nil { if errorsIsNotFound(err) { return fmt.Errorf("%w: ffmpeg executable not found in PATH", decoder.ErrUnsupported) } return fmt.Errorf("ffmpeg start: %w", err) } errCh := make(chan error, 2) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() _, copyErr := io.Copy(stdin, r) _ = stdin.Close() if copyErr != nil && ctx.Err() == nil { errCh <- fmt.Errorf("ffmpeg stdin copy: %w", copyErr) } }() stderrData, _ := io.ReadAll(stderr) readErr := d.readPCM(ctx, stdout, sampleRate, channels, meta.SourceID, emit) waitErr := cmd.Wait() wg.Wait() close(errCh) for e := range errCh { if e != nil { return e } } if readErr != nil { return readErr } if waitErr != nil && ctx.Err() == nil { msg := strings.TrimSpace(string(stderrData)) if msg != "" { return fmt.Errorf("ffmpeg decode: %w (%s)", waitErr, msg) } return fmt.Errorf("ffmpeg decode: %w", waitErr) } return nil } func (d *FFmpegDecoder) readPCM(ctx context.Context, r io.Reader, sampleRate, channels int, sourceID string, emit func(ingest.PCMChunk) error) error { const chunkFrames = 1024 frameBytes := channels * 2 buf := make([]byte, chunkFrames*frameBytes) seq := uint64(0) for { select { case <-ctx.Done(): return nil default: } n, err := io.ReadAtLeast(r, buf, frameBytes) if err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF { if n > 0 { if emitErr := emitPCM(buf[:n], seq, sampleRate, channels, sourceID, emit); emitErr != nil { return emitErr } } return nil } return fmt.Errorf("ffmpeg read pcm: %w", err) } if emitErr := emitPCM(buf[:n], seq, sampleRate, channels, sourceID, emit); emitErr != nil { return emitErr } seq++ } } func emitPCM(data []byte, seq uint64, sampleRate, channels int, sourceID string, emit func(ingest.PCMChunk) error) error { samples := make([]int32, 0, len(data)/2) for i := 0; i+1 < len(data); i += 2 { v := int16(binary.LittleEndian.Uint16(data[i : i+2])) samples = append(samples, int32(v)<<16) } return emit(ingest.PCMChunk{ Samples: samples, Channels: channels, SampleRateHz: sampleRate, Sequence: seq, Timestamp: time.Now(), SourceID: sourceID, }) } func errorsIsNotFound(err error) bool { var execErr *exec.Error return err != nil && (errors.As(err, &execErr) || strings.Contains(strings.ToLower(err.Error()), "executable file not found")) }