|
- package app
-
- import (
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-
- cfgpkg "github.com/jan/fm-rds-tx/internal/config"
- offpkg "github.com/jan/fm-rds-tx/internal/offline"
- "github.com/jan/fm-rds-tx/internal/platform"
- )
-
// EngineState enumerates the lifecycle phases of the TX engine.
type EngineState int

const (
	// EngineIdle is the initial state and the state Stop returns to.
	EngineIdle EngineState = iota
	// EngineRunning is set by Start once the driver has been started.
	EngineRunning
	// EngineStopping is set by Stop while the engine is shutting down.
	EngineStopping
)

// String returns the lowercase name of the state, or "unknown" for any
// value that is not one of the declared constants.
func (s EngineState) String() string {
	// Indexed by the constant values so names stay tied to the constants.
	labels := [...]string{
		EngineIdle:     "idle",
		EngineRunning:  "running",
		EngineStopping: "stopping",
	}
	if s < 0 || int(s) >= len(labels) {
		return "unknown"
	}
	return labels[s]
}
-
// EngineStats is the JSON-serializable telemetry snapshot returned by
// Engine.Stats.
type EngineStats struct {
	// State is the textual form of the engine's EngineState.
	State string `json:"state"`
	// ChunksProduced counts chunks successfully written to the driver.
	ChunksProduced uint64 `json:"chunksProduced"`
	// TotalSamples is the sum of the counts returned by successful writes.
	TotalSamples uint64 `json:"totalSamples"`
	// Underruns counts driver writes that failed while the engine was not
	// shutting down.
	Underruns uint64 `json:"underruns"`
	// LastError is the most recent write error message; it is omitted
	// from the JSON output when empty.
	LastError string `json:"lastError,omitempty"`
	// UptimeSeconds is the time since Start, reported only while running.
	UptimeSeconds float64 `json:"uptimeSeconds"`
}
-
- // Engine is the continuous TX loop that produces chunks of composite/IQ
- // samples and feeds them to a backend driver.
- type Engine struct {
- cfg cfgpkg.Config
- driver platform.SoapyDriver
- generator *offpkg.Generator
- chunkDuration time.Duration
-
- mu sync.Mutex
- state EngineState
- cancel context.CancelFunc
- startedAt time.Time
-
- chunksProduced atomic.Uint64
- totalSamples atomic.Uint64
- underruns atomic.Uint64
- lastError atomic.Value // string
- }
-
- // NewEngine creates a TX engine. Default chunk duration is 50ms.
- func NewEngine(cfg cfgpkg.Config, driver platform.SoapyDriver) *Engine {
- return &Engine{
- cfg: cfg,
- driver: driver,
- generator: offpkg.NewGenerator(cfg),
- chunkDuration: 50 * time.Millisecond,
- state: EngineIdle,
- }
- }
-
- // SetChunkDuration changes the generation chunk size. Must be called before Start.
- func (e *Engine) SetChunkDuration(d time.Duration) {
- e.chunkDuration = d
- }
-
- // Start begins continuous transmission. TX is NOT started automatically.
- func (e *Engine) Start(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineIdle {
- e.mu.Unlock()
- return fmt.Errorf("engine already in state %s", e.state)
- }
-
- if err := e.driver.Start(ctx); err != nil {
- e.mu.Unlock()
- return fmt.Errorf("driver start: %w", err)
- }
-
- runCtx, cancel := context.WithCancel(ctx)
- e.cancel = cancel
- e.state = EngineRunning
- e.startedAt = time.Now()
- e.mu.Unlock()
-
- go e.run(runCtx)
- return nil
- }
-
- // Stop gracefully stops the TX engine.
- func (e *Engine) Stop(ctx context.Context) error {
- e.mu.Lock()
- if e.state != EngineRunning {
- e.mu.Unlock()
- return nil
- }
- e.state = EngineStopping
- e.cancel()
- e.mu.Unlock()
-
- // Give the run loop time to drain
- time.Sleep(e.chunkDuration * 2)
-
- if err := e.driver.Flush(ctx); err != nil {
- return err
- }
- if err := e.driver.Stop(ctx); err != nil {
- return err
- }
-
- e.mu.Lock()
- e.state = EngineIdle
- e.mu.Unlock()
- return nil
- }
-
- // Stats returns current engine telemetry.
- func (e *Engine) Stats() EngineStats {
- e.mu.Lock()
- state := e.state
- startedAt := e.startedAt
- e.mu.Unlock()
-
- var uptime float64
- if state == EngineRunning {
- uptime = time.Since(startedAt).Seconds()
- }
-
- errVal, _ := e.lastError.Load().(string)
-
- return EngineStats{
- State: state.String(),
- ChunksProduced: e.chunksProduced.Load(),
- TotalSamples: e.totalSamples.Load(),
- Underruns: e.underruns.Load(),
- LastError: errVal,
- UptimeSeconds: uptime,
- }
- }
-
- func (e *Engine) run(ctx context.Context) {
- ticker := time.NewTicker(e.chunkDuration)
- defer ticker.Stop()
-
- for {
- select {
- case <-ctx.Done():
- return
- case <-ticker.C:
- frame := e.generator.GenerateFrame(e.chunkDuration)
- n, err := e.driver.Write(ctx, frame)
- if err != nil {
- if ctx.Err() != nil {
- return // clean shutdown
- }
- e.lastError.Store(err.Error())
- e.underruns.Add(1)
- continue
- }
- e.chunksProduced.Add(1)
- e.totalSamples.Add(uint64(n))
- }
- }
- }
|