Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

392 Zeilen
11KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "io"
  6. "net/http"
  7. "strings"
  8. "sync"
  9. "github.com/jan/fm-rds-tx/internal/audio"
  10. "github.com/jan/fm-rds-tx/internal/config"
  11. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  12. "github.com/jan/fm-rds-tx/internal/platform"
  13. )
// uiHTML holds the bytes of ui.html, embedded into the binary at build time.
// handleUI writes it verbatim as the response for "/".
//
//go:embed ui.html
var uiHTML []byte
// TXController is an optional interface the Server uses to start/stop TX
// and apply live config changes.
type TXController interface {
	// StartTX is invoked by POST /tx/start. A non-nil error is reported to
	// the client as 409 Conflict.
	StartTX() error
	// StopTX is invoked by POST /tx/stop. A non-nil error is reported to
	// the client as 500 Internal Server Error.
	StopTX() error
	// TXStats returns runtime statistics. /runtime exposes the whole map
	// under "engine"; /status forwards only the "runtimeIndicator",
	// "runtimeAlert", "queue" and "state" keys. A nil map is tolerated.
	TXStats() map[string]any
	// UpdateConfig is invoked by POST /config after the patched config has
	// passed validation. A non-nil error is reported as 400 Bad Request and
	// the server's config snapshot is left unchanged.
	UpdateConfig(patch LivePatch) error
	// ResetFault is invoked by POST /runtime/fault/reset. A non-nil error
	// is reported to the client as 409 Conflict.
	ResetFault() error
}
// LivePatch mirrors the patchable fields from ConfigPatch for the engine.
// nil = no change.
//
// handleConfig builds it from the decoded ConfigPatch and passes it to
// TXController.UpdateConfig. The tone and pre-emphasis fields of ConfigPatch
// have no counterpart here, so they only reach the server's config snapshot.
type LivePatch struct {
	FrequencyMHz   *float64
	OutputDrive    *float64
	StereoEnabled  *bool
	PilotLevel     *float64
	RDSInjection   *float64
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64
	PS             *string
	RadioText      *string
}
// Server is the HTTP control surface: it keeps a snapshot of the
// configuration and optional handles to the TX controller, the driver and
// the live audio stream source. The handlers in this file read and write
// those fields only while holding mu.
type Server struct {
	mu        sync.RWMutex        // guards every field below
	cfg       config.Config       // snapshot served by /config, /status and /dry-run
	tx        TXController        // optional; nil until SetTXController is called
	drv       platform.SoapyDriver // optional, for runtime stats
	streamSrc *audio.StreamSource // optional, for live audio ingest
}
// maxConfigBodyBytes caps the size of a POST /config request body; it is the
// limit handed to http.MaxBytesReader in handleConfig.
const maxConfigBodyBytes = 64 << 10 // 64 KiB
// ConfigPatch is the JSON document decoded from a POST /config body.
// Every field is a pointer: handleConfig applies a field only when it is
// non-nil, so a key that is absent from the JSON leaves the corresponding
// setting unchanged.
type ConfigPatch struct {
	FrequencyMHz     *float64 `json:"frequencyMHz,omitempty"`
	OutputDrive      *float64 `json:"outputDrive,omitempty"`
	StereoEnabled    *bool    `json:"stereoEnabled,omitempty"`
	PilotLevel       *float64 `json:"pilotLevel,omitempty"`
	RDSInjection     *float64 `json:"rdsInjection,omitempty"`
	RDSEnabled       *bool    `json:"rdsEnabled,omitempty"`
	ToneLeftHz       *float64 `json:"toneLeftHz,omitempty"`
	ToneRightHz      *float64 `json:"toneRightHz,omitempty"`
	ToneAmplitude    *float64 `json:"toneAmplitude,omitempty"`
	PS               *string  `json:"ps,omitempty"`
	RadioText        *string  `json:"radioText,omitempty"`
	PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
	LimiterEnabled   *bool    `json:"limiterEnabled,omitempty"`
	LimiterCeiling   *float64 `json:"limiterCeiling,omitempty"`
}
  63. func NewServer(cfg config.Config) *Server {
  64. return &Server{cfg: cfg}
  65. }
  66. func (s *Server) SetTXController(tx TXController) {
  67. s.mu.Lock()
  68. s.tx = tx
  69. s.mu.Unlock()
  70. }
  71. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  72. s.mu.Lock()
  73. s.drv = drv
  74. s.mu.Unlock()
  75. }
  76. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  77. s.mu.Lock()
  78. s.streamSrc = src
  79. s.mu.Unlock()
  80. }
  81. func (s *Server) Handler() http.Handler {
  82. mux := http.NewServeMux()
  83. mux.HandleFunc("/", s.handleUI)
  84. mux.HandleFunc("/healthz", s.handleHealth)
  85. mux.HandleFunc("/status", s.handleStatus)
  86. mux.HandleFunc("/dry-run", s.handleDryRun)
  87. mux.HandleFunc("/config", s.handleConfig)
  88. mux.HandleFunc("/runtime", s.handleRuntime)
  89. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  90. mux.HandleFunc("/tx/start", s.handleTXStart)
  91. mux.HandleFunc("/tx/stop", s.handleTXStop)
  92. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  93. return mux
  94. }
  95. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  96. w.Header().Set("Content-Type", "application/json")
  97. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  98. }
  99. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  100. if r.URL.Path != "/" {
  101. http.NotFound(w, r)
  102. return
  103. }
  104. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  105. w.Header().Set("Cache-Control", "no-cache")
  106. w.Write(uiHTML)
  107. }
  108. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  109. s.mu.RLock()
  110. cfg := s.cfg
  111. tx := s.tx
  112. s.mu.RUnlock()
  113. status := map[string]any{
  114. "service": "fm-rds-tx",
  115. "backend": cfg.Backend.Kind,
  116. "frequencyMHz": cfg.FM.FrequencyMHz,
  117. "stereoEnabled": cfg.FM.StereoEnabled,
  118. "rdsEnabled": cfg.RDS.Enabled,
  119. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  120. "limiterEnabled": cfg.FM.LimiterEnabled,
  121. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  122. }
  123. if tx != nil {
  124. if stats := tx.TXStats(); stats != nil {
  125. if ri, ok := stats["runtimeIndicator"]; ok {
  126. status["runtimeIndicator"] = ri
  127. }
  128. if alert, ok := stats["runtimeAlert"]; ok {
  129. status["runtimeAlert"] = alert
  130. }
  131. if queue, ok := stats["queue"]; ok {
  132. status["queue"] = queue
  133. }
  134. if runtimeState, ok := stats["state"]; ok {
  135. status["runtimeState"] = runtimeState
  136. }
  137. }
  138. }
  139. w.Header().Set("Content-Type", "application/json")
  140. _ = json.NewEncoder(w).Encode(status)
  141. }
  142. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  143. s.mu.RLock()
  144. drv := s.drv
  145. tx := s.tx
  146. stream := s.streamSrc
  147. s.mu.RUnlock()
  148. result := map[string]any{}
  149. if drv != nil {
  150. result["driver"] = drv.Stats()
  151. }
  152. if tx != nil {
  153. result["engine"] = tx.TXStats()
  154. }
  155. if stream != nil {
  156. result["audioStream"] = stream.Stats()
  157. }
  158. w.Header().Set("Content-Type", "application/json")
  159. _ = json.NewEncoder(w).Encode(result)
  160. }
  161. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  162. if r.Method != http.MethodPost {
  163. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  164. return
  165. }
  166. s.mu.RLock()
  167. tx := s.tx
  168. s.mu.RUnlock()
  169. if tx == nil {
  170. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  171. return
  172. }
  173. if err := tx.ResetFault(); err != nil {
  174. http.Error(w, err.Error(), http.StatusConflict)
  175. return
  176. }
  177. w.Header().Set("Content-Type", "application/json")
  178. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  179. }
  180. // handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
  181. // it into the live audio ring buffer. Use with:
  182. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  183. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  184. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  185. if r.Method != http.MethodPost {
  186. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  187. return
  188. }
  189. s.mu.RLock()
  190. stream := s.streamSrc
  191. s.mu.RUnlock()
  192. if stream == nil {
  193. http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
  194. return
  195. }
  196. // Read body in chunks and push to ring buffer
  197. buf := make([]byte, 32768)
  198. totalFrames := 0
  199. for {
  200. n, err := r.Body.Read(buf)
  201. if n > 0 {
  202. totalFrames += stream.WritePCM(buf[:n])
  203. }
  204. if err != nil {
  205. if err == io.EOF {
  206. break
  207. }
  208. http.Error(w, err.Error(), http.StatusInternalServerError)
  209. return
  210. }
  211. }
  212. w.Header().Set("Content-Type", "application/json")
  213. _ = json.NewEncoder(w).Encode(map[string]any{
  214. "ok": true,
  215. "frames": totalFrames,
  216. "stats": stream.Stats(),
  217. })
  218. }
  219. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  220. if r.Method != http.MethodPost {
  221. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  222. return
  223. }
  224. s.mu.RLock()
  225. tx := s.tx
  226. s.mu.RUnlock()
  227. if tx == nil {
  228. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  229. return
  230. }
  231. if err := tx.StartTX(); err != nil {
  232. http.Error(w, err.Error(), http.StatusConflict)
  233. return
  234. }
  235. w.Header().Set("Content-Type", "application/json")
  236. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  237. }
  238. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  239. if r.Method != http.MethodPost {
  240. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  241. return
  242. }
  243. s.mu.RLock()
  244. tx := s.tx
  245. s.mu.RUnlock()
  246. if tx == nil {
  247. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  248. return
  249. }
  250. if err := tx.StopTX(); err != nil {
  251. http.Error(w, err.Error(), http.StatusInternalServerError)
  252. return
  253. }
  254. w.Header().Set("Content-Type", "application/json")
  255. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
  256. }
  257. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  258. s.mu.RLock()
  259. cfg := s.cfg
  260. s.mu.RUnlock()
  261. w.Header().Set("Content-Type", "application/json")
  262. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  263. }
  264. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  265. switch r.Method {
  266. case http.MethodGet:
  267. s.mu.RLock()
  268. cfg := s.cfg
  269. s.mu.RUnlock()
  270. w.Header().Set("Content-Type", "application/json")
  271. _ = json.NewEncoder(w).Encode(cfg)
  272. case http.MethodPost:
  273. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  274. var patch ConfigPatch
  275. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  276. statusCode := http.StatusBadRequest
  277. if strings.Contains(err.Error(), "http: request body too large") {
  278. statusCode = http.StatusRequestEntityTooLarge
  279. }
  280. http.Error(w, err.Error(), statusCode)
  281. return
  282. }
  283. // Update the server's config snapshot (for GET /config and /status)
  284. s.mu.Lock()
  285. next := s.cfg
  286. if patch.FrequencyMHz != nil {
  287. next.FM.FrequencyMHz = *patch.FrequencyMHz
  288. }
  289. if patch.OutputDrive != nil {
  290. next.FM.OutputDrive = *patch.OutputDrive
  291. }
  292. if patch.ToneLeftHz != nil {
  293. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  294. }
  295. if patch.ToneRightHz != nil {
  296. next.Audio.ToneRightHz = *patch.ToneRightHz
  297. }
  298. if patch.ToneAmplitude != nil {
  299. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  300. }
  301. if patch.PS != nil {
  302. next.RDS.PS = *patch.PS
  303. }
  304. if patch.RadioText != nil {
  305. next.RDS.RadioText = *patch.RadioText
  306. }
  307. if patch.PreEmphasisTauUS != nil {
  308. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  309. }
  310. if patch.StereoEnabled != nil {
  311. next.FM.StereoEnabled = *patch.StereoEnabled
  312. }
  313. if patch.LimiterEnabled != nil {
  314. next.FM.LimiterEnabled = *patch.LimiterEnabled
  315. }
  316. if patch.LimiterCeiling != nil {
  317. next.FM.LimiterCeiling = *patch.LimiterCeiling
  318. }
  319. if patch.RDSEnabled != nil {
  320. next.RDS.Enabled = *patch.RDSEnabled
  321. }
  322. if patch.PilotLevel != nil {
  323. next.FM.PilotLevel = *patch.PilotLevel
  324. }
  325. if patch.RDSInjection != nil {
  326. next.FM.RDSInjection = *patch.RDSInjection
  327. }
  328. if err := next.Validate(); err != nil {
  329. s.mu.Unlock()
  330. http.Error(w, err.Error(), http.StatusBadRequest)
  331. return
  332. }
  333. lp := LivePatch{
  334. FrequencyMHz: patch.FrequencyMHz,
  335. OutputDrive: patch.OutputDrive,
  336. StereoEnabled: patch.StereoEnabled,
  337. PilotLevel: patch.PilotLevel,
  338. RDSInjection: patch.RDSInjection,
  339. RDSEnabled: patch.RDSEnabled,
  340. LimiterEnabled: patch.LimiterEnabled,
  341. LimiterCeiling: patch.LimiterCeiling,
  342. PS: patch.PS,
  343. RadioText: patch.RadioText,
  344. }
  345. tx := s.tx
  346. if tx != nil {
  347. if err := tx.UpdateConfig(lp); err != nil {
  348. s.mu.Unlock()
  349. http.Error(w, err.Error(), http.StatusBadRequest)
  350. return
  351. }
  352. }
  353. s.cfg = next
  354. live := tx != nil
  355. s.mu.Unlock()
  356. w.Header().Set("Content-Type", "application/json")
  357. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  358. default:
  359. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  360. }
  361. }