Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

359 строки
9.6KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "io"
  6. "net/http"
  7. "sync"
  8. "github.com/jan/fm-rds-tx/internal/audio"
  9. "github.com/jan/fm-rds-tx/internal/config"
  10. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  11. "github.com/jan/fm-rds-tx/internal/platform"
  12. )
  13. //go:embed ui.html
  14. var uiHTML []byte
  15. // TXController is an optional interface the Server uses to start/stop TX
  16. // and apply live config changes.
  17. type TXController interface {
  18. StartTX() error
  19. StopTX() error
  20. TXStats() map[string]any
  21. UpdateConfig(patch LivePatch) error
  22. }
  23. // LivePatch mirrors the patchable fields from ConfigPatch for the engine.
  24. // nil = no change.
  25. type LivePatch struct {
  26. FrequencyMHz *float64
  27. OutputDrive *float64
  28. StereoEnabled *bool
  29. PilotLevel *float64
  30. RDSInjection *float64
  31. RDSEnabled *bool
  32. LimiterEnabled *bool
  33. LimiterCeiling *float64
  34. PS *string
  35. RadioText *string
  36. }
  37. type Server struct {
  38. mu sync.RWMutex
  39. cfg config.Config
  40. tx TXController
  41. drv platform.SoapyDriver // optional, for runtime stats
  42. streamSrc *audio.StreamSource // optional, for live audio ingest
  43. }
  44. type ConfigPatch struct {
  45. FrequencyMHz *float64 `json:"frequencyMHz,omitempty"`
  46. OutputDrive *float64 `json:"outputDrive,omitempty"`
  47. StereoEnabled *bool `json:"stereoEnabled,omitempty"`
  48. PilotLevel *float64 `json:"pilotLevel,omitempty"`
  49. RDSInjection *float64 `json:"rdsInjection,omitempty"`
  50. RDSEnabled *bool `json:"rdsEnabled,omitempty"`
  51. ToneLeftHz *float64 `json:"toneLeftHz,omitempty"`
  52. ToneRightHz *float64 `json:"toneRightHz,omitempty"`
  53. ToneAmplitude *float64 `json:"toneAmplitude,omitempty"`
  54. PS *string `json:"ps,omitempty"`
  55. RadioText *string `json:"radioText,omitempty"`
  56. PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
  57. LimiterEnabled *bool `json:"limiterEnabled,omitempty"`
  58. LimiterCeiling *float64 `json:"limiterCeiling,omitempty"`
  59. }
  60. func NewServer(cfg config.Config) *Server {
  61. return &Server{cfg: cfg}
  62. }
  63. func (s *Server) SetTXController(tx TXController) {
  64. s.mu.Lock()
  65. s.tx = tx
  66. s.mu.Unlock()
  67. }
  68. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  69. s.mu.Lock()
  70. s.drv = drv
  71. s.mu.Unlock()
  72. }
  73. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  74. s.mu.Lock()
  75. s.streamSrc = src
  76. s.mu.Unlock()
  77. }
  78. func (s *Server) Handler() http.Handler {
  79. mux := http.NewServeMux()
  80. mux.HandleFunc("/", s.handleUI)
  81. mux.HandleFunc("/healthz", s.handleHealth)
  82. mux.HandleFunc("/status", s.handleStatus)
  83. mux.HandleFunc("/dry-run", s.handleDryRun)
  84. mux.HandleFunc("/config", s.handleConfig)
  85. mux.HandleFunc("/runtime", s.handleRuntime)
  86. mux.HandleFunc("/tx/start", s.handleTXStart)
  87. mux.HandleFunc("/tx/stop", s.handleTXStop)
  88. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  89. return mux
  90. }
  91. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  92. w.Header().Set("Content-Type", "application/json")
  93. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  94. }
  95. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  96. if r.URL.Path != "/" {
  97. http.NotFound(w, r)
  98. return
  99. }
  100. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  101. w.Header().Set("Cache-Control", "no-cache")
  102. w.Write(uiHTML)
  103. }
  104. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  105. s.mu.RLock()
  106. cfg := s.cfg
  107. tx := s.tx
  108. s.mu.RUnlock()
  109. status := map[string]any{
  110. "service": "fm-rds-tx",
  111. "backend": cfg.Backend.Kind,
  112. "frequencyMHz": cfg.FM.FrequencyMHz,
  113. "stereoEnabled": cfg.FM.StereoEnabled,
  114. "rdsEnabled": cfg.RDS.Enabled,
  115. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  116. "limiterEnabled": cfg.FM.LimiterEnabled,
  117. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  118. }
  119. if tx != nil {
  120. if stats := tx.TXStats(); stats != nil {
  121. if ri, ok := stats["runtimeIndicator"]; ok {
  122. status["runtimeIndicator"] = ri
  123. }
  124. if alert, ok := stats["runtimeAlert"]; ok {
  125. status["runtimeAlert"] = alert
  126. }
  127. if queue, ok := stats["queue"]; ok {
  128. status["queue"] = queue
  129. }
  130. }
  131. }
  132. w.Header().Set("Content-Type", "application/json")
  133. _ = json.NewEncoder(w).Encode(status)
  134. }
  135. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  136. s.mu.RLock()
  137. drv := s.drv
  138. tx := s.tx
  139. stream := s.streamSrc
  140. s.mu.RUnlock()
  141. result := map[string]any{}
  142. if drv != nil {
  143. result["driver"] = drv.Stats()
  144. }
  145. if tx != nil {
  146. result["engine"] = tx.TXStats()
  147. }
  148. if stream != nil {
  149. result["audioStream"] = stream.Stats()
  150. }
  151. w.Header().Set("Content-Type", "application/json")
  152. _ = json.NewEncoder(w).Encode(result)
  153. }
  154. // handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
  155. // it into the live audio ring buffer. Use with:
  156. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  157. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  158. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  159. if r.Method != http.MethodPost {
  160. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  161. return
  162. }
  163. s.mu.RLock()
  164. stream := s.streamSrc
  165. s.mu.RUnlock()
  166. if stream == nil {
  167. http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
  168. return
  169. }
  170. // Read body in chunks and push to ring buffer
  171. buf := make([]byte, 32768)
  172. totalFrames := 0
  173. for {
  174. n, err := r.Body.Read(buf)
  175. if n > 0 {
  176. totalFrames += stream.WritePCM(buf[:n])
  177. }
  178. if err != nil {
  179. if err == io.EOF {
  180. break
  181. }
  182. http.Error(w, err.Error(), http.StatusInternalServerError)
  183. return
  184. }
  185. }
  186. w.Header().Set("Content-Type", "application/json")
  187. _ = json.NewEncoder(w).Encode(map[string]any{
  188. "ok": true,
  189. "frames": totalFrames,
  190. "stats": stream.Stats(),
  191. })
  192. }
  193. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  194. if r.Method != http.MethodPost {
  195. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  196. return
  197. }
  198. s.mu.RLock()
  199. tx := s.tx
  200. s.mu.RUnlock()
  201. if tx == nil {
  202. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  203. return
  204. }
  205. if err := tx.StartTX(); err != nil {
  206. http.Error(w, err.Error(), http.StatusConflict)
  207. return
  208. }
  209. w.Header().Set("Content-Type", "application/json")
  210. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  211. }
  212. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  213. if r.Method != http.MethodPost {
  214. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  215. return
  216. }
  217. s.mu.RLock()
  218. tx := s.tx
  219. s.mu.RUnlock()
  220. if tx == nil {
  221. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  222. return
  223. }
  224. if err := tx.StopTX(); err != nil {
  225. http.Error(w, err.Error(), http.StatusInternalServerError)
  226. return
  227. }
  228. w.Header().Set("Content-Type", "application/json")
  229. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
  230. }
  231. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  232. s.mu.RLock()
  233. cfg := s.cfg
  234. s.mu.RUnlock()
  235. w.Header().Set("Content-Type", "application/json")
  236. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  237. }
  238. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  239. switch r.Method {
  240. case http.MethodGet:
  241. s.mu.RLock()
  242. cfg := s.cfg
  243. s.mu.RUnlock()
  244. w.Header().Set("Content-Type", "application/json")
  245. _ = json.NewEncoder(w).Encode(cfg)
  246. case http.MethodPost:
  247. var patch ConfigPatch
  248. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  249. http.Error(w, err.Error(), http.StatusBadRequest)
  250. return
  251. }
  252. // Update the server's config snapshot (for GET /config and /status)
  253. s.mu.Lock()
  254. next := s.cfg
  255. if patch.FrequencyMHz != nil {
  256. next.FM.FrequencyMHz = *patch.FrequencyMHz
  257. }
  258. if patch.OutputDrive != nil {
  259. next.FM.OutputDrive = *patch.OutputDrive
  260. }
  261. if patch.ToneLeftHz != nil {
  262. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  263. }
  264. if patch.ToneRightHz != nil {
  265. next.Audio.ToneRightHz = *patch.ToneRightHz
  266. }
  267. if patch.ToneAmplitude != nil {
  268. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  269. }
  270. if patch.PS != nil {
  271. next.RDS.PS = *patch.PS
  272. }
  273. if patch.RadioText != nil {
  274. next.RDS.RadioText = *patch.RadioText
  275. }
  276. if patch.PreEmphasisTauUS != nil {
  277. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  278. }
  279. if patch.StereoEnabled != nil {
  280. next.FM.StereoEnabled = *patch.StereoEnabled
  281. }
  282. if patch.LimiterEnabled != nil {
  283. next.FM.LimiterEnabled = *patch.LimiterEnabled
  284. }
  285. if patch.LimiterCeiling != nil {
  286. next.FM.LimiterCeiling = *patch.LimiterCeiling
  287. }
  288. if patch.RDSEnabled != nil {
  289. next.RDS.Enabled = *patch.RDSEnabled
  290. }
  291. if patch.PilotLevel != nil {
  292. next.FM.PilotLevel = *patch.PilotLevel
  293. }
  294. if patch.RDSInjection != nil {
  295. next.FM.RDSInjection = *patch.RDSInjection
  296. }
  297. if err := next.Validate(); err != nil {
  298. s.mu.Unlock()
  299. http.Error(w, err.Error(), http.StatusBadRequest)
  300. return
  301. }
  302. lp := LivePatch{
  303. FrequencyMHz: patch.FrequencyMHz,
  304. OutputDrive: patch.OutputDrive,
  305. StereoEnabled: patch.StereoEnabled,
  306. PilotLevel: patch.PilotLevel,
  307. RDSInjection: patch.RDSInjection,
  308. RDSEnabled: patch.RDSEnabled,
  309. LimiterEnabled: patch.LimiterEnabled,
  310. LimiterCeiling: patch.LimiterCeiling,
  311. PS: patch.PS,
  312. RadioText: patch.RadioText,
  313. }
  314. tx := s.tx
  315. if tx != nil {
  316. if err := tx.UpdateConfig(lp); err != nil {
  317. s.mu.Unlock()
  318. http.Error(w, err.Error(), http.StatusBadRequest)
  319. return
  320. }
  321. }
  322. s.cfg = next
  323. live := tx != nil
  324. s.mu.Unlock()
  325. w.Header().Set("Content-Type", "application/json")
  326. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  327. default:
  328. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  329. }
  330. }