Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

480 строки
13KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "mime"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "github.com/jan/fm-rds-tx/internal/audio"
  12. "github.com/jan/fm-rds-tx/internal/config"
  13. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  14. "github.com/jan/fm-rds-tx/internal/platform"
  15. )
// uiHTML holds the contents of ui.html, embedded into the binary at build
// time. handleUI writes these bytes as the response body for "/".
//
//go:embed ui.html
var uiHTML []byte
// TXController is an optional interface the Server uses to start/stop TX
// and apply live config changes. While no controller is set, the TX and
// fault-reset endpoints answer 503 Service Unavailable.
type TXController interface {
	// StartTX is called by POST /tx/start; a non-nil error is reported to
	// the client as 409 Conflict.
	StartTX() error
	// StopTX is called by POST /tx/stop; a non-nil error is reported to the
	// client as 500 Internal Server Error.
	StopTX() error
	// TXStats returns engine statistics. /runtime exposes the whole map
	// under "engine"; /status copies the "runtimeIndicator", "runtimeAlert",
	// "queue" and "state" entries when present. /status tolerates a nil map.
	TXStats() map[string]any
	// UpdateConfig is called by POST /config with the validated patch; a
	// non-nil error is reported as 400 Bad Request and the server's config
	// snapshot is left unchanged.
	UpdateConfig(patch LivePatch) error
	// ResetFault is called by POST /runtime/fault/reset; a non-nil error is
	// reported to the client as 409 Conflict.
	ResetFault() error
}
// LivePatch mirrors the patchable fields from ConfigPatch for the engine.
// nil = no change.
//
// handleConfig copies the pointers straight from the decoded ConfigPatch.
// The tone and pre-emphasis fields of ConfigPatch have no counterpart here,
// so those changes only reach the server's stored config snapshot.
type LivePatch struct {
	FrequencyMHz   *float64 // corresponds to config FM.FrequencyMHz
	OutputDrive    *float64 // corresponds to config FM.OutputDrive
	StereoEnabled  *bool    // corresponds to config FM.StereoEnabled
	PilotLevel     *float64 // corresponds to config FM.PilotLevel
	RDSInjection   *float64 // corresponds to config FM.RDSInjection
	RDSEnabled     *bool    // corresponds to config RDS.Enabled
	LimiterEnabled *bool    // corresponds to config FM.LimiterEnabled
	LimiterCeiling *float64 // corresponds to config FM.LimiterCeiling
	PS             *string  // corresponds to config RDS.PS
	RadioText      *string  // corresponds to config RDS.RadioText
}
// Server is the HTTP control endpoint. It keeps a config snapshot plus
// optional references to the TX controller, the driver and the live audio
// stream source. Handlers and setters access every field below mu while
// holding mu.
type Server struct {
	mu        sync.RWMutex
	cfg       config.Config        // snapshot served by /config, /status and /dry-run
	tx        TXController         // optional; nil until SetTXController is called
	drv       platform.SoapyDriver // optional, for runtime stats
	streamSrc *audio.StreamSource  // optional, for live audio ingest
}
const (
	// maxConfigBodyBytes caps the size of a POST /config request body.
	maxConfigBodyBytes = 64 << 10 // 64 KiB
	// configContentTypeHeader is the media type isJSONContentType requires.
	configContentTypeHeader = "application/json"
	// noBodyErrMsg is the 400 response text rejectBody sends when a request
	// unexpectedly carries a body.
	noBodyErrMsg = "request must not include a body"
	// audioStreamContentTypeError is the 415 response text of /audio/stream.
	audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16"
	// audioStreamBodyLimitDefault is the initial value of audioStreamBodyLimit.
	audioStreamBodyLimitDefault = 512 << 20 // 512 MiB
)
// audioStreamAllowedMediaTypes lists the media types accepted by
// /audio/stream. isAudioStreamContentType compares against them
// case-insensitively, which is why "audio/L16" is stored in lower case.
var audioStreamAllowedMediaTypes = []string{
	"application/octet-stream",
	"audio/l16",
}

// audioStreamBodyLimit is the number of bytes allowed per /audio/stream
// request; tests may override.
var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault)
  60. func isJSONContentType(r *http.Request) bool {
  61. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  62. if ct == "" {
  63. return false
  64. }
  65. mediaType, _, err := mime.ParseMediaType(ct)
  66. if err != nil {
  67. return false
  68. }
  69. return strings.EqualFold(mediaType, configContentTypeHeader)
  70. }
// ConfigPatch is the JSON document accepted by POST /config. Every field is
// a pointer so that a key absent from the request (nil) leaves the
// corresponding setting unchanged. handleConfig applies the non-nil fields
// to a copy of the config and validates the result before storing it.
type ConfigPatch struct {
	FrequencyMHz     *float64 `json:"frequencyMHz,omitempty"`
	OutputDrive      *float64 `json:"outputDrive,omitempty"`
	StereoEnabled    *bool    `json:"stereoEnabled,omitempty"`
	PilotLevel       *float64 `json:"pilotLevel,omitempty"`
	RDSInjection     *float64 `json:"rdsInjection,omitempty"`
	RDSEnabled       *bool    `json:"rdsEnabled,omitempty"`
	ToneLeftHz       *float64 `json:"toneLeftHz,omitempty"`
	ToneRightHz      *float64 `json:"toneRightHz,omitempty"`
	ToneAmplitude    *float64 `json:"toneAmplitude,omitempty"`
	PS               *string  `json:"ps,omitempty"`
	RadioText        *string  `json:"radioText,omitempty"`
	PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
	LimiterEnabled   *bool    `json:"limiterEnabled,omitempty"`
	LimiterCeiling   *float64 `json:"limiterCeiling,omitempty"`
}
  87. func NewServer(cfg config.Config) *Server {
  88. return &Server{cfg: cfg}
  89. }
  90. func hasRequestBody(r *http.Request) bool {
  91. if r.ContentLength > 0 {
  92. return true
  93. }
  94. for _, te := range r.TransferEncoding {
  95. if strings.EqualFold(te, "chunked") {
  96. return true
  97. }
  98. }
  99. return false
  100. }
  101. func rejectBody(w http.ResponseWriter, r *http.Request) bool {
  102. if !hasRequestBody(r) {
  103. return true
  104. }
  105. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  106. return false
  107. }
  108. func isAudioStreamContentType(r *http.Request) bool {
  109. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  110. if ct == "" {
  111. return false
  112. }
  113. mediaType, _, err := mime.ParseMediaType(ct)
  114. if err != nil {
  115. return false
  116. }
  117. for _, allowed := range audioStreamAllowedMediaTypes {
  118. if strings.EqualFold(mediaType, allowed) {
  119. return true
  120. }
  121. }
  122. return false
  123. }
  124. func (s *Server) SetTXController(tx TXController) {
  125. s.mu.Lock()
  126. s.tx = tx
  127. s.mu.Unlock()
  128. }
  129. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  130. s.mu.Lock()
  131. s.drv = drv
  132. s.mu.Unlock()
  133. }
  134. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  135. s.mu.Lock()
  136. s.streamSrc = src
  137. s.mu.Unlock()
  138. }
  139. func (s *Server) Handler() http.Handler {
  140. mux := http.NewServeMux()
  141. mux.HandleFunc("/", s.handleUI)
  142. mux.HandleFunc("/healthz", s.handleHealth)
  143. mux.HandleFunc("/status", s.handleStatus)
  144. mux.HandleFunc("/dry-run", s.handleDryRun)
  145. mux.HandleFunc("/config", s.handleConfig)
  146. mux.HandleFunc("/runtime", s.handleRuntime)
  147. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  148. mux.HandleFunc("/tx/start", s.handleTXStart)
  149. mux.HandleFunc("/tx/stop", s.handleTXStop)
  150. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  151. return mux
  152. }
  153. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  154. w.Header().Set("Content-Type", "application/json")
  155. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  156. }
  157. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  158. if r.URL.Path != "/" {
  159. http.NotFound(w, r)
  160. return
  161. }
  162. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  163. w.Header().Set("Cache-Control", "no-cache")
  164. w.Write(uiHTML)
  165. }
  166. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  167. s.mu.RLock()
  168. cfg := s.cfg
  169. tx := s.tx
  170. s.mu.RUnlock()
  171. status := map[string]any{
  172. "service": "fm-rds-tx",
  173. "backend": cfg.Backend.Kind,
  174. "frequencyMHz": cfg.FM.FrequencyMHz,
  175. "stereoEnabled": cfg.FM.StereoEnabled,
  176. "rdsEnabled": cfg.RDS.Enabled,
  177. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  178. "limiterEnabled": cfg.FM.LimiterEnabled,
  179. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  180. }
  181. if tx != nil {
  182. if stats := tx.TXStats(); stats != nil {
  183. if ri, ok := stats["runtimeIndicator"]; ok {
  184. status["runtimeIndicator"] = ri
  185. }
  186. if alert, ok := stats["runtimeAlert"]; ok {
  187. status["runtimeAlert"] = alert
  188. }
  189. if queue, ok := stats["queue"]; ok {
  190. status["queue"] = queue
  191. }
  192. if runtimeState, ok := stats["state"]; ok {
  193. status["runtimeState"] = runtimeState
  194. }
  195. }
  196. }
  197. w.Header().Set("Content-Type", "application/json")
  198. _ = json.NewEncoder(w).Encode(status)
  199. }
  200. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  201. s.mu.RLock()
  202. drv := s.drv
  203. tx := s.tx
  204. stream := s.streamSrc
  205. s.mu.RUnlock()
  206. result := map[string]any{}
  207. if drv != nil {
  208. result["driver"] = drv.Stats()
  209. }
  210. if tx != nil {
  211. result["engine"] = tx.TXStats()
  212. }
  213. if stream != nil {
  214. result["audioStream"] = stream.Stats()
  215. }
  216. w.Header().Set("Content-Type", "application/json")
  217. _ = json.NewEncoder(w).Encode(result)
  218. }
  219. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  220. if r.Method != http.MethodPost {
  221. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  222. return
  223. }
  224. if !rejectBody(w, r) {
  225. return
  226. }
  227. s.mu.RLock()
  228. tx := s.tx
  229. s.mu.RUnlock()
  230. if tx == nil {
  231. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  232. return
  233. }
  234. if err := tx.ResetFault(); err != nil {
  235. http.Error(w, err.Error(), http.StatusConflict)
  236. return
  237. }
  238. w.Header().Set("Content-Type", "application/json")
  239. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  240. }
  241. // handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
  242. // it into the live audio ring buffer. Use with:
  243. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  244. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  245. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  246. if r.Method != http.MethodPost {
  247. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  248. return
  249. }
  250. if !isAudioStreamContentType(r) {
  251. http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType)
  252. return
  253. }
  254. s.mu.RLock()
  255. stream := s.streamSrc
  256. s.mu.RUnlock()
  257. if stream == nil {
  258. http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
  259. return
  260. }
  261. r.Body = http.MaxBytesReader(w, r.Body, audioStreamBodyLimit)
  262. // Read body in chunks and push to ring buffer
  263. buf := make([]byte, 32768)
  264. totalFrames := 0
  265. for {
  266. n, err := r.Body.Read(buf)
  267. if n > 0 {
  268. totalFrames += stream.WritePCM(buf[:n])
  269. }
  270. if err != nil {
  271. if err == io.EOF {
  272. break
  273. }
  274. var maxErr *http.MaxBytesError
  275. if errors.As(err, &maxErr) {
  276. http.Error(w, maxErr.Error(), http.StatusRequestEntityTooLarge)
  277. return
  278. }
  279. http.Error(w, err.Error(), http.StatusInternalServerError)
  280. return
  281. }
  282. }
  283. w.Header().Set("Content-Type", "application/json")
  284. _ = json.NewEncoder(w).Encode(map[string]any{
  285. "ok": true,
  286. "frames": totalFrames,
  287. "stats": stream.Stats(),
  288. })
  289. }
  290. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  291. if r.Method != http.MethodPost {
  292. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  293. return
  294. }
  295. if !rejectBody(w, r) {
  296. return
  297. }
  298. s.mu.RLock()
  299. tx := s.tx
  300. s.mu.RUnlock()
  301. if tx == nil {
  302. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  303. return
  304. }
  305. if err := tx.StartTX(); err != nil {
  306. http.Error(w, err.Error(), http.StatusConflict)
  307. return
  308. }
  309. w.Header().Set("Content-Type", "application/json")
  310. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  311. }
  312. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  313. if r.Method != http.MethodPost {
  314. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  315. return
  316. }
  317. if !rejectBody(w, r) {
  318. return
  319. }
  320. s.mu.RLock()
  321. tx := s.tx
  322. s.mu.RUnlock()
  323. if tx == nil {
  324. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  325. return
  326. }
  327. if err := tx.StopTX(); err != nil {
  328. http.Error(w, err.Error(), http.StatusInternalServerError)
  329. return
  330. }
  331. w.Header().Set("Content-Type", "application/json")
  332. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
  333. }
  334. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  335. s.mu.RLock()
  336. cfg := s.cfg
  337. s.mu.RUnlock()
  338. w.Header().Set("Content-Type", "application/json")
  339. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  340. }
  341. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  342. switch r.Method {
  343. case http.MethodGet:
  344. s.mu.RLock()
  345. cfg := s.cfg
  346. s.mu.RUnlock()
  347. w.Header().Set("Content-Type", "application/json")
  348. _ = json.NewEncoder(w).Encode(cfg)
  349. case http.MethodPost:
  350. if !isJSONContentType(r) {
  351. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  352. return
  353. }
  354. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  355. var patch ConfigPatch
  356. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  357. statusCode := http.StatusBadRequest
  358. if strings.Contains(err.Error(), "http: request body too large") {
  359. statusCode = http.StatusRequestEntityTooLarge
  360. }
  361. http.Error(w, err.Error(), statusCode)
  362. return
  363. }
  364. // Update the server's config snapshot (for GET /config and /status)
  365. s.mu.Lock()
  366. next := s.cfg
  367. if patch.FrequencyMHz != nil {
  368. next.FM.FrequencyMHz = *patch.FrequencyMHz
  369. }
  370. if patch.OutputDrive != nil {
  371. next.FM.OutputDrive = *patch.OutputDrive
  372. }
  373. if patch.ToneLeftHz != nil {
  374. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  375. }
  376. if patch.ToneRightHz != nil {
  377. next.Audio.ToneRightHz = *patch.ToneRightHz
  378. }
  379. if patch.ToneAmplitude != nil {
  380. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  381. }
  382. if patch.PS != nil {
  383. next.RDS.PS = *patch.PS
  384. }
  385. if patch.RadioText != nil {
  386. next.RDS.RadioText = *patch.RadioText
  387. }
  388. if patch.PreEmphasisTauUS != nil {
  389. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  390. }
  391. if patch.StereoEnabled != nil {
  392. next.FM.StereoEnabled = *patch.StereoEnabled
  393. }
  394. if patch.LimiterEnabled != nil {
  395. next.FM.LimiterEnabled = *patch.LimiterEnabled
  396. }
  397. if patch.LimiterCeiling != nil {
  398. next.FM.LimiterCeiling = *patch.LimiterCeiling
  399. }
  400. if patch.RDSEnabled != nil {
  401. next.RDS.Enabled = *patch.RDSEnabled
  402. }
  403. if patch.PilotLevel != nil {
  404. next.FM.PilotLevel = *patch.PilotLevel
  405. }
  406. if patch.RDSInjection != nil {
  407. next.FM.RDSInjection = *patch.RDSInjection
  408. }
  409. if err := next.Validate(); err != nil {
  410. s.mu.Unlock()
  411. http.Error(w, err.Error(), http.StatusBadRequest)
  412. return
  413. }
  414. lp := LivePatch{
  415. FrequencyMHz: patch.FrequencyMHz,
  416. OutputDrive: patch.OutputDrive,
  417. StereoEnabled: patch.StereoEnabled,
  418. PilotLevel: patch.PilotLevel,
  419. RDSInjection: patch.RDSInjection,
  420. RDSEnabled: patch.RDSEnabled,
  421. LimiterEnabled: patch.LimiterEnabled,
  422. LimiterCeiling: patch.LimiterCeiling,
  423. PS: patch.PS,
  424. RadioText: patch.RadioText,
  425. }
  426. tx := s.tx
  427. if tx != nil {
  428. if err := tx.UpdateConfig(lp); err != nil {
  429. s.mu.Unlock()
  430. http.Error(w, err.Error(), http.StatusBadRequest)
  431. return
  432. }
  433. }
  434. s.cfg = next
  435. live := tx != nil
  436. s.mu.Unlock()
  437. w.Header().Set("Content-Type", "application/json")
  438. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  439. default:
  440. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  441. }
  442. }