Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

384 строки
10KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "io"
  6. "net/http"
  7. "sync"
  8. "github.com/jan/fm-rds-tx/internal/audio"
  9. "github.com/jan/fm-rds-tx/internal/config"
  10. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  11. "github.com/jan/fm-rds-tx/internal/platform"
  12. )
  13. //go:embed ui.html
  14. var uiHTML []byte
  15. // TXController is an optional interface the Server uses to start/stop TX
  16. // and apply live config changes.
  17. type TXController interface {
  18. StartTX() error
  19. StopTX() error
  20. TXStats() map[string]any
  21. UpdateConfig(patch LivePatch) error
  22. ResetFault() error
  23. }
  24. // LivePatch mirrors the patchable fields from ConfigPatch for the engine.
  25. // nil = no change.
  26. type LivePatch struct {
  27. FrequencyMHz *float64
  28. OutputDrive *float64
  29. StereoEnabled *bool
  30. PilotLevel *float64
  31. RDSInjection *float64
  32. RDSEnabled *bool
  33. LimiterEnabled *bool
  34. LimiterCeiling *float64
  35. PS *string
  36. RadioText *string
  37. }
  38. type Server struct {
  39. mu sync.RWMutex
  40. cfg config.Config
  41. tx TXController
  42. drv platform.SoapyDriver // optional, for runtime stats
  43. streamSrc *audio.StreamSource // optional, for live audio ingest
  44. }
  45. type ConfigPatch struct {
  46. FrequencyMHz *float64 `json:"frequencyMHz,omitempty"`
  47. OutputDrive *float64 `json:"outputDrive,omitempty"`
  48. StereoEnabled *bool `json:"stereoEnabled,omitempty"`
  49. PilotLevel *float64 `json:"pilotLevel,omitempty"`
  50. RDSInjection *float64 `json:"rdsInjection,omitempty"`
  51. RDSEnabled *bool `json:"rdsEnabled,omitempty"`
  52. ToneLeftHz *float64 `json:"toneLeftHz,omitempty"`
  53. ToneRightHz *float64 `json:"toneRightHz,omitempty"`
  54. ToneAmplitude *float64 `json:"toneAmplitude,omitempty"`
  55. PS *string `json:"ps,omitempty"`
  56. RadioText *string `json:"radioText,omitempty"`
  57. PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
  58. LimiterEnabled *bool `json:"limiterEnabled,omitempty"`
  59. LimiterCeiling *float64 `json:"limiterCeiling,omitempty"`
  60. }
  61. func NewServer(cfg config.Config) *Server {
  62. return &Server{cfg: cfg}
  63. }
  64. func (s *Server) SetTXController(tx TXController) {
  65. s.mu.Lock()
  66. s.tx = tx
  67. s.mu.Unlock()
  68. }
  69. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  70. s.mu.Lock()
  71. s.drv = drv
  72. s.mu.Unlock()
  73. }
  74. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  75. s.mu.Lock()
  76. s.streamSrc = src
  77. s.mu.Unlock()
  78. }
  79. func (s *Server) Handler() http.Handler {
  80. mux := http.NewServeMux()
  81. mux.HandleFunc("/", s.handleUI)
  82. mux.HandleFunc("/healthz", s.handleHealth)
  83. mux.HandleFunc("/status", s.handleStatus)
  84. mux.HandleFunc("/dry-run", s.handleDryRun)
  85. mux.HandleFunc("/config", s.handleConfig)
  86. mux.HandleFunc("/runtime", s.handleRuntime)
  87. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  88. mux.HandleFunc("/tx/start", s.handleTXStart)
  89. mux.HandleFunc("/tx/stop", s.handleTXStop)
  90. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  91. return mux
  92. }
  93. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  94. w.Header().Set("Content-Type", "application/json")
  95. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  96. }
  97. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  98. if r.URL.Path != "/" {
  99. http.NotFound(w, r)
  100. return
  101. }
  102. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  103. w.Header().Set("Cache-Control", "no-cache")
  104. w.Write(uiHTML)
  105. }
  106. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  107. s.mu.RLock()
  108. cfg := s.cfg
  109. tx := s.tx
  110. s.mu.RUnlock()
  111. status := map[string]any{
  112. "service": "fm-rds-tx",
  113. "backend": cfg.Backend.Kind,
  114. "frequencyMHz": cfg.FM.FrequencyMHz,
  115. "stereoEnabled": cfg.FM.StereoEnabled,
  116. "rdsEnabled": cfg.RDS.Enabled,
  117. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  118. "limiterEnabled": cfg.FM.LimiterEnabled,
  119. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  120. }
  121. if tx != nil {
  122. if stats := tx.TXStats(); stats != nil {
  123. if ri, ok := stats["runtimeIndicator"]; ok {
  124. status["runtimeIndicator"] = ri
  125. }
  126. if alert, ok := stats["runtimeAlert"]; ok {
  127. status["runtimeAlert"] = alert
  128. }
  129. if queue, ok := stats["queue"]; ok {
  130. status["queue"] = queue
  131. }
  132. if runtimeState, ok := stats["state"]; ok {
  133. status["runtimeState"] = runtimeState
  134. }
  135. }
  136. }
  137. w.Header().Set("Content-Type", "application/json")
  138. _ = json.NewEncoder(w).Encode(status)
  139. }
  140. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  141. s.mu.RLock()
  142. drv := s.drv
  143. tx := s.tx
  144. stream := s.streamSrc
  145. s.mu.RUnlock()
  146. result := map[string]any{}
  147. if drv != nil {
  148. result["driver"] = drv.Stats()
  149. }
  150. if tx != nil {
  151. result["engine"] = tx.TXStats()
  152. }
  153. if stream != nil {
  154. result["audioStream"] = stream.Stats()
  155. }
  156. w.Header().Set("Content-Type", "application/json")
  157. _ = json.NewEncoder(w).Encode(result)
  158. }
  159. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  160. if r.Method != http.MethodPost {
  161. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  162. return
  163. }
  164. s.mu.RLock()
  165. tx := s.tx
  166. s.mu.RUnlock()
  167. if tx == nil {
  168. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  169. return
  170. }
  171. if err := tx.ResetFault(); err != nil {
  172. http.Error(w, err.Error(), http.StatusConflict)
  173. return
  174. }
  175. w.Header().Set("Content-Type", "application/json")
  176. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  177. }
  178. // handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
  179. // it into the live audio ring buffer. Use with:
  180. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  181. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  182. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  183. if r.Method != http.MethodPost {
  184. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  185. return
  186. }
  187. s.mu.RLock()
  188. stream := s.streamSrc
  189. s.mu.RUnlock()
  190. if stream == nil {
  191. http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
  192. return
  193. }
  194. // Read body in chunks and push to ring buffer
  195. buf := make([]byte, 32768)
  196. totalFrames := 0
  197. for {
  198. n, err := r.Body.Read(buf)
  199. if n > 0 {
  200. totalFrames += stream.WritePCM(buf[:n])
  201. }
  202. if err != nil {
  203. if err == io.EOF {
  204. break
  205. }
  206. http.Error(w, err.Error(), http.StatusInternalServerError)
  207. return
  208. }
  209. }
  210. w.Header().Set("Content-Type", "application/json")
  211. _ = json.NewEncoder(w).Encode(map[string]any{
  212. "ok": true,
  213. "frames": totalFrames,
  214. "stats": stream.Stats(),
  215. })
  216. }
  217. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  218. if r.Method != http.MethodPost {
  219. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  220. return
  221. }
  222. s.mu.RLock()
  223. tx := s.tx
  224. s.mu.RUnlock()
  225. if tx == nil {
  226. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  227. return
  228. }
  229. if err := tx.StartTX(); err != nil {
  230. http.Error(w, err.Error(), http.StatusConflict)
  231. return
  232. }
  233. w.Header().Set("Content-Type", "application/json")
  234. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  235. }
  236. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  237. if r.Method != http.MethodPost {
  238. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  239. return
  240. }
  241. s.mu.RLock()
  242. tx := s.tx
  243. s.mu.RUnlock()
  244. if tx == nil {
  245. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  246. return
  247. }
  248. if err := tx.StopTX(); err != nil {
  249. http.Error(w, err.Error(), http.StatusInternalServerError)
  250. return
  251. }
  252. w.Header().Set("Content-Type", "application/json")
  253. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
  254. }
  255. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  256. s.mu.RLock()
  257. cfg := s.cfg
  258. s.mu.RUnlock()
  259. w.Header().Set("Content-Type", "application/json")
  260. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  261. }
  262. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  263. switch r.Method {
  264. case http.MethodGet:
  265. s.mu.RLock()
  266. cfg := s.cfg
  267. s.mu.RUnlock()
  268. w.Header().Set("Content-Type", "application/json")
  269. _ = json.NewEncoder(w).Encode(cfg)
  270. case http.MethodPost:
  271. var patch ConfigPatch
  272. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  273. http.Error(w, err.Error(), http.StatusBadRequest)
  274. return
  275. }
  276. // Update the server's config snapshot (for GET /config and /status)
  277. s.mu.Lock()
  278. next := s.cfg
  279. if patch.FrequencyMHz != nil {
  280. next.FM.FrequencyMHz = *patch.FrequencyMHz
  281. }
  282. if patch.OutputDrive != nil {
  283. next.FM.OutputDrive = *patch.OutputDrive
  284. }
  285. if patch.ToneLeftHz != nil {
  286. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  287. }
  288. if patch.ToneRightHz != nil {
  289. next.Audio.ToneRightHz = *patch.ToneRightHz
  290. }
  291. if patch.ToneAmplitude != nil {
  292. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  293. }
  294. if patch.PS != nil {
  295. next.RDS.PS = *patch.PS
  296. }
  297. if patch.RadioText != nil {
  298. next.RDS.RadioText = *patch.RadioText
  299. }
  300. if patch.PreEmphasisTauUS != nil {
  301. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  302. }
  303. if patch.StereoEnabled != nil {
  304. next.FM.StereoEnabled = *patch.StereoEnabled
  305. }
  306. if patch.LimiterEnabled != nil {
  307. next.FM.LimiterEnabled = *patch.LimiterEnabled
  308. }
  309. if patch.LimiterCeiling != nil {
  310. next.FM.LimiterCeiling = *patch.LimiterCeiling
  311. }
  312. if patch.RDSEnabled != nil {
  313. next.RDS.Enabled = *patch.RDSEnabled
  314. }
  315. if patch.PilotLevel != nil {
  316. next.FM.PilotLevel = *patch.PilotLevel
  317. }
  318. if patch.RDSInjection != nil {
  319. next.FM.RDSInjection = *patch.RDSInjection
  320. }
  321. if err := next.Validate(); err != nil {
  322. s.mu.Unlock()
  323. http.Error(w, err.Error(), http.StatusBadRequest)
  324. return
  325. }
  326. lp := LivePatch{
  327. FrequencyMHz: patch.FrequencyMHz,
  328. OutputDrive: patch.OutputDrive,
  329. StereoEnabled: patch.StereoEnabled,
  330. PilotLevel: patch.PilotLevel,
  331. RDSInjection: patch.RDSInjection,
  332. RDSEnabled: patch.RDSEnabled,
  333. LimiterEnabled: patch.LimiterEnabled,
  334. LimiterCeiling: patch.LimiterCeiling,
  335. PS: patch.PS,
  336. RadioText: patch.RadioText,
  337. }
  338. tx := s.tx
  339. if tx != nil {
  340. if err := tx.UpdateConfig(lp); err != nil {
  341. s.mu.Unlock()
  342. http.Error(w, err.Error(), http.StatusBadRequest)
  343. return
  344. }
  345. }
  346. s.cfg = next
  347. live := tx != nil
  348. s.mu.Unlock()
  349. w.Header().Set("Content-Type", "application/json")
  350. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  351. default:
  352. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  353. }
  354. }