Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

442 Zeilen
12KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "io"
  6. "mime"
  7. "net/http"
  8. "strings"
  9. "sync"
  10. "github.com/jan/fm-rds-tx/internal/audio"
  11. "github.com/jan/fm-rds-tx/internal/config"
  12. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  13. "github.com/jan/fm-rds-tx/internal/platform"
  14. )
  15. //go:embed ui.html
  16. var uiHTML []byte // embedded contents of ui.html; handleUI writes it as the response for "/"
  17. // TXController is an optional interface the Server uses to start/stop TX
  18. // and apply live config changes. The Server works without one (s.tx may be nil).
  19. type TXController interface {
  20. StartTX() error // called by handleTXStart; a non-nil error is reported as 409 Conflict
  21. StopTX() error // called by handleTXStop; a non-nil error is reported as 500 Internal Server Error
  22. TXStats() map[string]any // read by handleStatus and handleRuntime; handleStatus tolerates a nil map
  23. UpdateConfig(patch LivePatch) error // called by handleConfig while the Server's write lock is held; an error yields 400 and the snapshot is not updated
  24. ResetFault() error // called by handleRuntimeFaultReset; a non-nil error is reported as 409 Conflict
  25. }
  26. // LivePatch carries the subset of ConfigPatch fields that handleConfig forwards to TXController.UpdateConfig.
  27. // A nil pointer means "no change". ConfigPatch's ToneLeftHz, ToneRightHz, ToneAmplitude and PreEmphasisTauUS have no counterpart here.
  28. type LivePatch struct {
  29. FrequencyMHz *float64
  30. OutputDrive *float64
  31. StereoEnabled *bool
  32. PilotLevel *float64
  33. RDSInjection *float64
  34. RDSEnabled *bool
  35. LimiterEnabled *bool
  36. LimiterCeiling *float64
  37. PS *string
  38. RadioText *string
  39. }
  // Server is the HTTP control surface. It keeps a config snapshot plus optional
  // references to the TX controller, the driver and the live audio stream source.
  40. type Server struct {
  41. mu sync.RWMutex // guards every field below; setters take the write lock, handlers copy fields under the read lock
  42. cfg config.Config // snapshot returned by /config, /status and /dry-run; replaced by a successful POST /config
  43. tx TXController // optional; nil until SetTXController is called
  44. drv platform.SoapyDriver // optional, for runtime stats
  45. streamSrc *audio.StreamSource // optional, for live audio ingest
  46. }
  // Limits and fixed strings shared by the request-validation helpers and handlers.
  47. const (
  48. maxConfigBodyBytes = 64 << 10 // 64 KiB; cap applied to POST /config bodies via http.MaxBytesReader
  49. configContentTypeHeader = "application/json" // media type (not a header name) that isJSONContentType requires
  50. noBodyErrMsg = "request must not include a body" // response text rejectBody sends with 400 Bad Request
  51. )
  52. func isJSONContentType(r *http.Request) bool {
  53. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  54. if ct == "" {
  55. return false
  56. }
  57. mediaType, _, err := mime.ParseMediaType(ct)
  58. if err != nil {
  59. return false
  60. }
  61. return strings.EqualFold(mediaType, configContentTypeHeader)
  62. }
  // ConfigPatch is the JSON document decoded from a POST /config body.
  // Every field is a pointer: handleConfig applies only the non-nil ones,
  // so a key that is absent from the JSON leaves the current value untouched.
  63. type ConfigPatch struct {
  64. FrequencyMHz *float64 `json:"frequencyMHz,omitempty"`
  65. OutputDrive *float64 `json:"outputDrive,omitempty"`
  66. StereoEnabled *bool `json:"stereoEnabled,omitempty"`
  67. PilotLevel *float64 `json:"pilotLevel,omitempty"`
  68. RDSInjection *float64 `json:"rdsInjection,omitempty"`
  69. RDSEnabled *bool `json:"rdsEnabled,omitempty"`
  70. ToneLeftHz *float64 `json:"toneLeftHz,omitempty"`
  71. ToneRightHz *float64 `json:"toneRightHz,omitempty"`
  72. ToneAmplitude *float64 `json:"toneAmplitude,omitempty"`
  73. PS *string `json:"ps,omitempty"`
  74. RadioText *string `json:"radioText,omitempty"`
  75. PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
  76. LimiterEnabled *bool `json:"limiterEnabled,omitempty"`
  77. LimiterCeiling *float64 `json:"limiterCeiling,omitempty"`
  78. }
  79. func NewServer(cfg config.Config) *Server {
  80. return &Server{cfg: cfg}
  81. }
  82. func hasRequestBody(r *http.Request) bool {
  83. if r.ContentLength > 0 {
  84. return true
  85. }
  86. for _, te := range r.TransferEncoding {
  87. if strings.EqualFold(te, "chunked") {
  88. return true
  89. }
  90. }
  91. return false
  92. }
  93. func rejectBody(w http.ResponseWriter, r *http.Request) bool {
  94. if !hasRequestBody(r) {
  95. return true
  96. }
  97. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  98. return false
  99. }
  100. func (s *Server) SetTXController(tx TXController) {
  101. s.mu.Lock()
  102. s.tx = tx
  103. s.mu.Unlock()
  104. }
  105. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  106. s.mu.Lock()
  107. s.drv = drv
  108. s.mu.Unlock()
  109. }
  110. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  111. s.mu.Lock()
  112. s.streamSrc = src
  113. s.mu.Unlock()
  114. }
  115. func (s *Server) Handler() http.Handler {
  116. mux := http.NewServeMux()
  117. mux.HandleFunc("/", s.handleUI)
  118. mux.HandleFunc("/healthz", s.handleHealth)
  119. mux.HandleFunc("/status", s.handleStatus)
  120. mux.HandleFunc("/dry-run", s.handleDryRun)
  121. mux.HandleFunc("/config", s.handleConfig)
  122. mux.HandleFunc("/runtime", s.handleRuntime)
  123. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  124. mux.HandleFunc("/tx/start", s.handleTXStart)
  125. mux.HandleFunc("/tx/stop", s.handleTXStop)
  126. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  127. return mux
  128. }
  129. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  130. w.Header().Set("Content-Type", "application/json")
  131. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  132. }
  133. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  134. if r.URL.Path != "/" {
  135. http.NotFound(w, r)
  136. return
  137. }
  138. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  139. w.Header().Set("Cache-Control", "no-cache")
  140. w.Write(uiHTML)
  141. }
  142. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  143. s.mu.RLock()
  144. cfg := s.cfg
  145. tx := s.tx
  146. s.mu.RUnlock()
  147. status := map[string]any{
  148. "service": "fm-rds-tx",
  149. "backend": cfg.Backend.Kind,
  150. "frequencyMHz": cfg.FM.FrequencyMHz,
  151. "stereoEnabled": cfg.FM.StereoEnabled,
  152. "rdsEnabled": cfg.RDS.Enabled,
  153. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  154. "limiterEnabled": cfg.FM.LimiterEnabled,
  155. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  156. }
  157. if tx != nil {
  158. if stats := tx.TXStats(); stats != nil {
  159. if ri, ok := stats["runtimeIndicator"]; ok {
  160. status["runtimeIndicator"] = ri
  161. }
  162. if alert, ok := stats["runtimeAlert"]; ok {
  163. status["runtimeAlert"] = alert
  164. }
  165. if queue, ok := stats["queue"]; ok {
  166. status["queue"] = queue
  167. }
  168. if runtimeState, ok := stats["state"]; ok {
  169. status["runtimeState"] = runtimeState
  170. }
  171. }
  172. }
  173. w.Header().Set("Content-Type", "application/json")
  174. _ = json.NewEncoder(w).Encode(status)
  175. }
  176. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  177. s.mu.RLock()
  178. drv := s.drv
  179. tx := s.tx
  180. stream := s.streamSrc
  181. s.mu.RUnlock()
  182. result := map[string]any{}
  183. if drv != nil {
  184. result["driver"] = drv.Stats()
  185. }
  186. if tx != nil {
  187. result["engine"] = tx.TXStats()
  188. }
  189. if stream != nil {
  190. result["audioStream"] = stream.Stats()
  191. }
  192. w.Header().Set("Content-Type", "application/json")
  193. _ = json.NewEncoder(w).Encode(result)
  194. }
  195. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  196. if r.Method != http.MethodPost {
  197. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  198. return
  199. }
  200. if !rejectBody(w, r) {
  201. return
  202. }
  203. s.mu.RLock()
  204. tx := s.tx
  205. s.mu.RUnlock()
  206. if tx == nil {
  207. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  208. return
  209. }
  210. if err := tx.ResetFault(); err != nil {
  211. http.Error(w, err.Error(), http.StatusConflict)
  212. return
  213. }
  214. w.Header().Set("Content-Type", "application/json")
  215. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  216. }
  217. // handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
  218. // it into the live audio ring buffer. Use with:
  219. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  220. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  221. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  222. if r.Method != http.MethodPost {
  223. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  224. return
  225. }
  226. s.mu.RLock()
  227. stream := s.streamSrc
  228. s.mu.RUnlock()
  229. if stream == nil {
  230. http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
  231. return
  232. }
  233. // Read body in chunks and push to ring buffer
  234. buf := make([]byte, 32768)
  235. totalFrames := 0
  236. for {
  237. n, err := r.Body.Read(buf)
  238. if n > 0 {
  239. totalFrames += stream.WritePCM(buf[:n])
  240. }
  241. if err != nil {
  242. if err == io.EOF {
  243. break
  244. }
  245. http.Error(w, err.Error(), http.StatusInternalServerError)
  246. return
  247. }
  248. }
  249. w.Header().Set("Content-Type", "application/json")
  250. _ = json.NewEncoder(w).Encode(map[string]any{
  251. "ok": true,
  252. "frames": totalFrames,
  253. "stats": stream.Stats(),
  254. })
  255. }
  256. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  257. if r.Method != http.MethodPost {
  258. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  259. return
  260. }
  261. if !rejectBody(w, r) {
  262. return
  263. }
  264. s.mu.RLock()
  265. tx := s.tx
  266. s.mu.RUnlock()
  267. if tx == nil {
  268. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  269. return
  270. }
  271. if err := tx.StartTX(); err != nil {
  272. http.Error(w, err.Error(), http.StatusConflict)
  273. return
  274. }
  275. w.Header().Set("Content-Type", "application/json")
  276. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  277. }
  278. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  279. if r.Method != http.MethodPost {
  280. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  281. return
  282. }
  283. if !rejectBody(w, r) {
  284. return
  285. }
  286. s.mu.RLock()
  287. tx := s.tx
  288. s.mu.RUnlock()
  289. if tx == nil {
  290. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  291. return
  292. }
  293. if err := tx.StopTX(); err != nil {
  294. http.Error(w, err.Error(), http.StatusInternalServerError)
  295. return
  296. }
  297. w.Header().Set("Content-Type", "application/json")
  298. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
  299. }
  300. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  301. s.mu.RLock()
  302. cfg := s.cfg
  303. s.mu.RUnlock()
  304. w.Header().Set("Content-Type", "application/json")
  305. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  306. }
  307. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  308. switch r.Method {
  309. case http.MethodGet:
  310. s.mu.RLock()
  311. cfg := s.cfg
  312. s.mu.RUnlock()
  313. w.Header().Set("Content-Type", "application/json")
  314. _ = json.NewEncoder(w).Encode(cfg)
  315. case http.MethodPost:
  316. if !isJSONContentType(r) {
  317. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  318. return
  319. }
  320. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  321. var patch ConfigPatch
  322. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  323. statusCode := http.StatusBadRequest
  324. if strings.Contains(err.Error(), "http: request body too large") {
  325. statusCode = http.StatusRequestEntityTooLarge
  326. }
  327. http.Error(w, err.Error(), statusCode)
  328. return
  329. }
  330. // Update the server's config snapshot (for GET /config and /status)
  331. s.mu.Lock()
  332. next := s.cfg
  333. if patch.FrequencyMHz != nil {
  334. next.FM.FrequencyMHz = *patch.FrequencyMHz
  335. }
  336. if patch.OutputDrive != nil {
  337. next.FM.OutputDrive = *patch.OutputDrive
  338. }
  339. if patch.ToneLeftHz != nil {
  340. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  341. }
  342. if patch.ToneRightHz != nil {
  343. next.Audio.ToneRightHz = *patch.ToneRightHz
  344. }
  345. if patch.ToneAmplitude != nil {
  346. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  347. }
  348. if patch.PS != nil {
  349. next.RDS.PS = *patch.PS
  350. }
  351. if patch.RadioText != nil {
  352. next.RDS.RadioText = *patch.RadioText
  353. }
  354. if patch.PreEmphasisTauUS != nil {
  355. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  356. }
  357. if patch.StereoEnabled != nil {
  358. next.FM.StereoEnabled = *patch.StereoEnabled
  359. }
  360. if patch.LimiterEnabled != nil {
  361. next.FM.LimiterEnabled = *patch.LimiterEnabled
  362. }
  363. if patch.LimiterCeiling != nil {
  364. next.FM.LimiterCeiling = *patch.LimiterCeiling
  365. }
  366. if patch.RDSEnabled != nil {
  367. next.RDS.Enabled = *patch.RDSEnabled
  368. }
  369. if patch.PilotLevel != nil {
  370. next.FM.PilotLevel = *patch.PilotLevel
  371. }
  372. if patch.RDSInjection != nil {
  373. next.FM.RDSInjection = *patch.RDSInjection
  374. }
  375. if err := next.Validate(); err != nil {
  376. s.mu.Unlock()
  377. http.Error(w, err.Error(), http.StatusBadRequest)
  378. return
  379. }
  380. lp := LivePatch{
  381. FrequencyMHz: patch.FrequencyMHz,
  382. OutputDrive: patch.OutputDrive,
  383. StereoEnabled: patch.StereoEnabled,
  384. PilotLevel: patch.PilotLevel,
  385. RDSInjection: patch.RDSInjection,
  386. RDSEnabled: patch.RDSEnabled,
  387. LimiterEnabled: patch.LimiterEnabled,
  388. LimiterCeiling: patch.LimiterCeiling,
  389. PS: patch.PS,
  390. RadioText: patch.RadioText,
  391. }
  392. tx := s.tx
  393. if tx != nil {
  394. if err := tx.UpdateConfig(lp); err != nil {
  395. s.mu.Unlock()
  396. http.Error(w, err.Error(), http.StatusBadRequest)
  397. return
  398. }
  399. }
  400. s.cfg = next
  401. live := tx != nil
  402. s.mu.Unlock()
  403. w.Header().Set("Content-Type", "application/json")
  404. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  405. default:
  406. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  407. }
  408. }