Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

469 строки
12KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "io"
  6. "mime"
  7. "net/http"
  8. "strings"
  9. "sync"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. "github.com/jan/fm-rds-tx/internal/config"
  12. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  13. "github.com/jan/fm-rds-tx/internal/platform"
  14. )
  15. //go:embed ui.html
  16. var uiHTML []byte
  17. // TXController is an optional interface the Server uses to start/stop TX
  18. // and apply live config changes.
  19. type TXController interface {
  20. StartTX() error
  21. StopTX() error
  22. TXStats() map[string]any
  23. UpdateConfig(patch LivePatch) error
  24. ResetFault() error
  25. }
  // LivePatch carries the subset of ConfigPatch fields that is forwarded to
  // the engine through TXController.UpdateConfig. Every field is a pointer;
  // a nil pointer means "leave this setting unchanged".
  type LivePatch struct {
  	// FM carrier and composite levels.
  	FrequencyMHz  *float64
  	OutputDrive   *float64
  	StereoEnabled *bool
  	PilotLevel    *float64
  	RDSInjection  *float64
  	RDSEnabled    *bool

  	// Limiter settings.
  	LimiterEnabled *bool
  	LimiterCeiling *float64

  	// RDS text fields.
  	PS        *string
  	RadioText *string
  }
  40. type Server struct {
  41. mu sync.RWMutex
  42. cfg config.Config
  43. tx TXController
  44. drv platform.SoapyDriver // optional, for runtime stats
  45. streamSrc *audio.StreamSource // optional, for live audio ingest
  46. }
  // Limits and fixed strings shared by the HTTP handlers.
  const (
  	// maxConfigBodyBytes caps a POST /config body at 64 KiB.
  	maxConfigBodyBytes = 64 * 1024
  	// configContentTypeHeader is the media type required for POST /config.
  	configContentTypeHeader = "application/json"
  	// noBodyErrMsg is sent when a body-less endpoint receives a body.
  	noBodyErrMsg = "request must not include a body"
  	// audioStreamContentTypeError is sent when /audio/stream gets an
  	// unsupported Content-Type.
  	audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16"
  )
  // audioStreamAllowedMediaTypes lists the media types accepted by
  // /audio/stream; isAudioStreamContentType compares against them
  // case-insensitively.
  var audioStreamAllowedMediaTypes = []string{"application/octet-stream", "audio/l16"}
  57. func isJSONContentType(r *http.Request) bool {
  58. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  59. if ct == "" {
  60. return false
  61. }
  62. mediaType, _, err := mime.ParseMediaType(ct)
  63. if err != nil {
  64. return false
  65. }
  66. return strings.EqualFold(mediaType, configContentTypeHeader)
  67. }
  // ConfigPatch is the JSON body accepted by POST /config. Every field is an
  // optional pointer: a field that is absent from the JSON stays nil and the
  // corresponding setting is left untouched.
  type ConfigPatch struct {
  	// FM and RDS levels; also forwarded to the engine via LivePatch.
  	FrequencyMHz  *float64 `json:"frequencyMHz,omitempty"`
  	OutputDrive   *float64 `json:"outputDrive,omitempty"`
  	StereoEnabled *bool    `json:"stereoEnabled,omitempty"`
  	PilotLevel    *float64 `json:"pilotLevel,omitempty"`
  	RDSInjection  *float64 `json:"rdsInjection,omitempty"`
  	RDSEnabled    *bool    `json:"rdsEnabled,omitempty"`

  	// Test-tone settings; applied to the config snapshot only.
  	ToneLeftHz    *float64 `json:"toneLeftHz,omitempty"`
  	ToneRightHz   *float64 `json:"toneRightHz,omitempty"`
  	ToneAmplitude *float64 `json:"toneAmplitude,omitempty"`

  	// RDS text; also forwarded to the engine via LivePatch.
  	PS        *string `json:"ps,omitempty"`
  	RadioText *string `json:"radioText,omitempty"`

  	// PreEmphasisTauUS is applied to the config snapshot only.
  	PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`

  	// Limiter settings; also forwarded to the engine via LivePatch.
  	LimiterEnabled *bool    `json:"limiterEnabled,omitempty"`
  	LimiterCeiling *float64 `json:"limiterCeiling,omitempty"`
  }
  84. func NewServer(cfg config.Config) *Server {
  85. return &Server{cfg: cfg}
  86. }
  // hasRequestBody reports whether r carries a request body.
  //
  // A body is present when any of the following holds:
  //   - Content-Length is positive;
  //   - the transfer encoding list contains "chunked" (case-insensitive);
  //   - the length is unknown (net/http reports this as ContentLength == -1)
  //     and Body is neither nil nor http.NoBody.
  //
  // The last case covers bodies that arrive without a declared length and
  // without chunked framing, which the first two checks alone would miss.
  func hasRequestBody(r *http.Request) bool {
  	if r.ContentLength > 0 {
  		return true
  	}
  	for _, enc := range r.TransferEncoding {
  		if strings.EqualFold(enc, "chunked") {
  			return true
  		}
  	}
  	return r.ContentLength < 0 && r.Body != nil && r.Body != http.NoBody
  }
  98. func rejectBody(w http.ResponseWriter, r *http.Request) bool {
  99. if !hasRequestBody(r) {
  100. return true
  101. }
  102. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  103. return false
  104. }
  105. func isAudioStreamContentType(r *http.Request) bool {
  106. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  107. if ct == "" {
  108. return false
  109. }
  110. mediaType, _, err := mime.ParseMediaType(ct)
  111. if err != nil {
  112. return false
  113. }
  114. for _, allowed := range audioStreamAllowedMediaTypes {
  115. if strings.EqualFold(mediaType, allowed) {
  116. return true
  117. }
  118. }
  119. return false
  120. }
  121. func (s *Server) SetTXController(tx TXController) {
  122. s.mu.Lock()
  123. s.tx = tx
  124. s.mu.Unlock()
  125. }
  126. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  127. s.mu.Lock()
  128. s.drv = drv
  129. s.mu.Unlock()
  130. }
  131. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  132. s.mu.Lock()
  133. s.streamSrc = src
  134. s.mu.Unlock()
  135. }
  136. func (s *Server) Handler() http.Handler {
  137. mux := http.NewServeMux()
  138. mux.HandleFunc("/", s.handleUI)
  139. mux.HandleFunc("/healthz", s.handleHealth)
  140. mux.HandleFunc("/status", s.handleStatus)
  141. mux.HandleFunc("/dry-run", s.handleDryRun)
  142. mux.HandleFunc("/config", s.handleConfig)
  143. mux.HandleFunc("/runtime", s.handleRuntime)
  144. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  145. mux.HandleFunc("/tx/start", s.handleTXStart)
  146. mux.HandleFunc("/tx/stop", s.handleTXStop)
  147. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  148. return mux
  149. }
  150. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  151. w.Header().Set("Content-Type", "application/json")
  152. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  153. }
  154. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  155. if r.URL.Path != "/" {
  156. http.NotFound(w, r)
  157. return
  158. }
  159. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  160. w.Header().Set("Cache-Control", "no-cache")
  161. w.Write(uiHTML)
  162. }
  163. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  164. s.mu.RLock()
  165. cfg := s.cfg
  166. tx := s.tx
  167. s.mu.RUnlock()
  168. status := map[string]any{
  169. "service": "fm-rds-tx",
  170. "backend": cfg.Backend.Kind,
  171. "frequencyMHz": cfg.FM.FrequencyMHz,
  172. "stereoEnabled": cfg.FM.StereoEnabled,
  173. "rdsEnabled": cfg.RDS.Enabled,
  174. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  175. "limiterEnabled": cfg.FM.LimiterEnabled,
  176. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  177. }
  178. if tx != nil {
  179. if stats := tx.TXStats(); stats != nil {
  180. if ri, ok := stats["runtimeIndicator"]; ok {
  181. status["runtimeIndicator"] = ri
  182. }
  183. if alert, ok := stats["runtimeAlert"]; ok {
  184. status["runtimeAlert"] = alert
  185. }
  186. if queue, ok := stats["queue"]; ok {
  187. status["queue"] = queue
  188. }
  189. if runtimeState, ok := stats["state"]; ok {
  190. status["runtimeState"] = runtimeState
  191. }
  192. }
  193. }
  194. w.Header().Set("Content-Type", "application/json")
  195. _ = json.NewEncoder(w).Encode(status)
  196. }
  197. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  198. s.mu.RLock()
  199. drv := s.drv
  200. tx := s.tx
  201. stream := s.streamSrc
  202. s.mu.RUnlock()
  203. result := map[string]any{}
  204. if drv != nil {
  205. result["driver"] = drv.Stats()
  206. }
  207. if tx != nil {
  208. result["engine"] = tx.TXStats()
  209. }
  210. if stream != nil {
  211. result["audioStream"] = stream.Stats()
  212. }
  213. w.Header().Set("Content-Type", "application/json")
  214. _ = json.NewEncoder(w).Encode(result)
  215. }
  216. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  217. if r.Method != http.MethodPost {
  218. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  219. return
  220. }
  221. if !rejectBody(w, r) {
  222. return
  223. }
  224. s.mu.RLock()
  225. tx := s.tx
  226. s.mu.RUnlock()
  227. if tx == nil {
  228. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  229. return
  230. }
  231. if err := tx.ResetFault(); err != nil {
  232. http.Error(w, err.Error(), http.StatusConflict)
  233. return
  234. }
  235. w.Header().Set("Content-Type", "application/json")
  236. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  237. }
  238. // handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
  239. // it into the live audio ring buffer. Use with:
  240. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  241. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  242. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  243. if r.Method != http.MethodPost {
  244. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  245. return
  246. }
  247. if !isAudioStreamContentType(r) {
  248. http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType)
  249. return
  250. }
  251. s.mu.RLock()
  252. stream := s.streamSrc
  253. s.mu.RUnlock()
  254. if stream == nil {
  255. http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
  256. return
  257. }
  258. // Read body in chunks and push to ring buffer
  259. buf := make([]byte, 32768)
  260. totalFrames := 0
  261. for {
  262. n, err := r.Body.Read(buf)
  263. if n > 0 {
  264. totalFrames += stream.WritePCM(buf[:n])
  265. }
  266. if err != nil {
  267. if err == io.EOF {
  268. break
  269. }
  270. http.Error(w, err.Error(), http.StatusInternalServerError)
  271. return
  272. }
  273. }
  274. w.Header().Set("Content-Type", "application/json")
  275. _ = json.NewEncoder(w).Encode(map[string]any{
  276. "ok": true,
  277. "frames": totalFrames,
  278. "stats": stream.Stats(),
  279. })
  280. }
  281. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  282. if r.Method != http.MethodPost {
  283. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  284. return
  285. }
  286. if !rejectBody(w, r) {
  287. return
  288. }
  289. s.mu.RLock()
  290. tx := s.tx
  291. s.mu.RUnlock()
  292. if tx == nil {
  293. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  294. return
  295. }
  296. if err := tx.StartTX(); err != nil {
  297. http.Error(w, err.Error(), http.StatusConflict)
  298. return
  299. }
  300. w.Header().Set("Content-Type", "application/json")
  301. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  302. }
  303. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  304. if r.Method != http.MethodPost {
  305. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  306. return
  307. }
  308. if !rejectBody(w, r) {
  309. return
  310. }
  311. s.mu.RLock()
  312. tx := s.tx
  313. s.mu.RUnlock()
  314. if tx == nil {
  315. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  316. return
  317. }
  318. if err := tx.StopTX(); err != nil {
  319. http.Error(w, err.Error(), http.StatusInternalServerError)
  320. return
  321. }
  322. w.Header().Set("Content-Type", "application/json")
  323. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
  324. }
  325. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  326. s.mu.RLock()
  327. cfg := s.cfg
  328. s.mu.RUnlock()
  329. w.Header().Set("Content-Type", "application/json")
  330. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  331. }
  332. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  333. switch r.Method {
  334. case http.MethodGet:
  335. s.mu.RLock()
  336. cfg := s.cfg
  337. s.mu.RUnlock()
  338. w.Header().Set("Content-Type", "application/json")
  339. _ = json.NewEncoder(w).Encode(cfg)
  340. case http.MethodPost:
  341. if !isJSONContentType(r) {
  342. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  343. return
  344. }
  345. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  346. var patch ConfigPatch
  347. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  348. statusCode := http.StatusBadRequest
  349. if strings.Contains(err.Error(), "http: request body too large") {
  350. statusCode = http.StatusRequestEntityTooLarge
  351. }
  352. http.Error(w, err.Error(), statusCode)
  353. return
  354. }
  355. // Update the server's config snapshot (for GET /config and /status)
  356. s.mu.Lock()
  357. next := s.cfg
  358. if patch.FrequencyMHz != nil {
  359. next.FM.FrequencyMHz = *patch.FrequencyMHz
  360. }
  361. if patch.OutputDrive != nil {
  362. next.FM.OutputDrive = *patch.OutputDrive
  363. }
  364. if patch.ToneLeftHz != nil {
  365. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  366. }
  367. if patch.ToneRightHz != nil {
  368. next.Audio.ToneRightHz = *patch.ToneRightHz
  369. }
  370. if patch.ToneAmplitude != nil {
  371. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  372. }
  373. if patch.PS != nil {
  374. next.RDS.PS = *patch.PS
  375. }
  376. if patch.RadioText != nil {
  377. next.RDS.RadioText = *patch.RadioText
  378. }
  379. if patch.PreEmphasisTauUS != nil {
  380. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  381. }
  382. if patch.StereoEnabled != nil {
  383. next.FM.StereoEnabled = *patch.StereoEnabled
  384. }
  385. if patch.LimiterEnabled != nil {
  386. next.FM.LimiterEnabled = *patch.LimiterEnabled
  387. }
  388. if patch.LimiterCeiling != nil {
  389. next.FM.LimiterCeiling = *patch.LimiterCeiling
  390. }
  391. if patch.RDSEnabled != nil {
  392. next.RDS.Enabled = *patch.RDSEnabled
  393. }
  394. if patch.PilotLevel != nil {
  395. next.FM.PilotLevel = *patch.PilotLevel
  396. }
  397. if patch.RDSInjection != nil {
  398. next.FM.RDSInjection = *patch.RDSInjection
  399. }
  400. if err := next.Validate(); err != nil {
  401. s.mu.Unlock()
  402. http.Error(w, err.Error(), http.StatusBadRequest)
  403. return
  404. }
  405. lp := LivePatch{
  406. FrequencyMHz: patch.FrequencyMHz,
  407. OutputDrive: patch.OutputDrive,
  408. StereoEnabled: patch.StereoEnabled,
  409. PilotLevel: patch.PilotLevel,
  410. RDSInjection: patch.RDSInjection,
  411. RDSEnabled: patch.RDSEnabled,
  412. LimiterEnabled: patch.LimiterEnabled,
  413. LimiterCeiling: patch.LimiterCeiling,
  414. PS: patch.PS,
  415. RadioText: patch.RadioText,
  416. }
  417. tx := s.tx
  418. if tx != nil {
  419. if err := tx.UpdateConfig(lp); err != nil {
  420. s.mu.Unlock()
  421. http.Error(w, err.Error(), http.StatusBadRequest)
  422. return
  423. }
  424. }
  425. s.cfg = next
  426. live := tx != nil
  427. s.mu.Unlock()
  428. w.Header().Set("Content-Type", "application/json")
  429. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  430. default:
  431. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  432. }
  433. }