Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

530 wiersze
14KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "mime"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "github.com/jan/fm-rds-tx/internal/audio"
  13. "github.com/jan/fm-rds-tx/internal/config"
  14. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  15. "github.com/jan/fm-rds-tx/internal/platform"
  16. )
// uiHTML holds the contents of ui.html, embedded into the binary at build
// time. handleUI writes it unmodified as the response body for "/".
//
//go:embed ui.html
var uiHTML []byte
// TXController is an optional interface the Server uses to start/stop TX
// and apply live config changes. It is installed with SetTXController; while
// none is installed the TX endpoints answer 503 Service Unavailable.
type TXController interface {
	// StartTX is called by POST /tx/start; an error is reported as 409 Conflict.
	StartTX() error
	// StopTX is called by POST /tx/stop; an error is reported as 500.
	StopTX() error
	// TXStats returns engine statistics exposed by /runtime and, for a few
	// selected keys, by /status. A nil map is tolerated by the callers.
	TXStats() map[string]any
	// UpdateConfig is called by POST /config while the Server's mutex is
	// held; an error rejects the patch with 400 Bad Request.
	UpdateConfig(patch LivePatch) error
	// ResetFault is called by POST /runtime/fault/reset; an error is
	// reported as 409 Conflict.
	ResetFault() error
}
// LivePatch mirrors the patchable fields from ConfigPatch for the engine.
// nil = no change.
//
// handleConfig fills it by copying the corresponding ConfigPatch pointers.
// ConfigPatch fields without a counterpart here (tone settings and
// pre-emphasis) are not forwarded to the TXController.
type LivePatch struct {
	FrequencyMHz   *float64
	OutputDrive    *float64
	StereoEnabled  *bool
	PilotLevel     *float64
	RDSInjection   *float64
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64
	PS             *string
	RadioText      *string
}
  42. type Server struct {
  43. mu sync.RWMutex
  44. cfg config.Config
  45. tx TXController
  46. drv platform.SoapyDriver // optional, for runtime stats
  47. streamSrc *audio.StreamSource // optional, for live audio ingest
  48. audit auditCounters
  49. }
// auditEvent names a category of rejected control request that recordAudit
// counts.
type auditEvent string

// The event values are spelled the same as the keys reported by
// auditSnapshot.
const (
	// auditMethodNotAllowed: a handler rejected the HTTP method.
	auditMethodNotAllowed auditEvent = "methodNotAllowed"
	// auditUnsupportedMediaType: the Content-Type check failed.
	auditUnsupportedMediaType auditEvent = "unsupportedMediaType"
	// auditBodyTooLarge: the body exceeded the http.MaxBytesReader limit.
	auditBodyTooLarge auditEvent = "bodyTooLarge"
	// auditUnexpectedBody: a body was sent to an endpoint that takes none.
	auditUnexpectedBody auditEvent = "unexpectedBody"
)
// auditCounters holds one counter per auditEvent. The fields are incremented
// with atomic.AddUint64 in recordAudit and read with atomic.LoadUint64 in
// auditSnapshot, so they must only be accessed through sync/atomic.
type auditCounters struct {
	methodNotAllowed     uint64
	unsupportedMediaType uint64
	bodyTooLarge         uint64
	unexpectedBody       uint64
}
const (
	// maxConfigBodyBytes caps the body size of a POST /config request.
	maxConfigBodyBytes = 64 << 10 // 64 KiB
	// configContentTypeHeader is the media type isJSONContentType requires.
	configContentTypeHeader = "application/json"
	// noBodyErrMsg is the response text rejectBody sends with 400 Bad Request.
	noBodyErrMsg = "request must not include a body"
	// audioStreamContentTypeError is the response text sent with 415 by
	// handleAudioStream.
	audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16"
	// audioStreamBodyLimitDefault is the initial value of audioStreamBodyLimit.
	audioStreamBodyLimitDefault = 512 << 20 // 512 MiB
)
// audioStreamAllowedMediaTypes lists the media types accepted by
// isAudioStreamContentType. The comparison there is case-insensitive.
var audioStreamAllowedMediaTypes = []string{
	"application/octet-stream",
	"audio/l16",
}
// audioStreamBodyLimit is the limit handleAudioStream passes to
// http.MaxBytesReader. It is a variable rather than a constant so that tests
// can lower it.
var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault) // bytes allowed per /audio/stream request; tests may override.
  75. func isJSONContentType(r *http.Request) bool {
  76. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  77. if ct == "" {
  78. return false
  79. }
  80. mediaType, _, err := mime.ParseMediaType(ct)
  81. if err != nil {
  82. return false
  83. }
  84. return strings.EqualFold(mediaType, configContentTypeHeader)
  85. }
// ConfigPatch is the JSON body accepted by POST /config. Every field is a
// pointer so that an absent key (nil) can be told apart from an explicit
// zero value; handleConfig only applies non-nil fields.
type ConfigPatch struct {
	FrequencyMHz     *float64 `json:"frequencyMHz,omitempty"`
	OutputDrive      *float64 `json:"outputDrive,omitempty"`
	StereoEnabled    *bool    `json:"stereoEnabled,omitempty"`
	PilotLevel       *float64 `json:"pilotLevel,omitempty"`
	RDSInjection     *float64 `json:"rdsInjection,omitempty"`
	RDSEnabled       *bool    `json:"rdsEnabled,omitempty"`
	ToneLeftHz       *float64 `json:"toneLeftHz,omitempty"`
	ToneRightHz      *float64 `json:"toneRightHz,omitempty"`
	ToneAmplitude    *float64 `json:"toneAmplitude,omitempty"`
	PS               *string  `json:"ps,omitempty"`
	RadioText        *string  `json:"radioText,omitempty"`
	PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
	LimiterEnabled   *bool    `json:"limiterEnabled,omitempty"`
	LimiterCeiling   *float64 `json:"limiterCeiling,omitempty"`
}
  102. func NewServer(cfg config.Config) *Server {
  103. return &Server{cfg: cfg}
  104. }
  105. func hasRequestBody(r *http.Request) bool {
  106. if r.ContentLength > 0 {
  107. return true
  108. }
  109. for _, te := range r.TransferEncoding {
  110. if strings.EqualFold(te, "chunked") {
  111. return true
  112. }
  113. }
  114. return false
  115. }
  116. func (s *Server) rejectBody(w http.ResponseWriter, r *http.Request) bool {
  117. if !hasRequestBody(r) {
  118. return true
  119. }
  120. s.recordAudit(auditUnexpectedBody)
  121. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  122. return false
  123. }
  124. func (s *Server) recordAudit(evt auditEvent) {
  125. switch evt {
  126. case auditMethodNotAllowed:
  127. atomic.AddUint64(&s.audit.methodNotAllowed, 1)
  128. case auditUnsupportedMediaType:
  129. atomic.AddUint64(&s.audit.unsupportedMediaType, 1)
  130. case auditBodyTooLarge:
  131. atomic.AddUint64(&s.audit.bodyTooLarge, 1)
  132. case auditUnexpectedBody:
  133. atomic.AddUint64(&s.audit.unexpectedBody, 1)
  134. }
  135. }
  136. func (s *Server) auditSnapshot() map[string]uint64 {
  137. return map[string]uint64{
  138. "methodNotAllowed": atomic.LoadUint64(&s.audit.methodNotAllowed),
  139. "unsupportedMediaType": atomic.LoadUint64(&s.audit.unsupportedMediaType),
  140. "bodyTooLarge": atomic.LoadUint64(&s.audit.bodyTooLarge),
  141. "unexpectedBody": atomic.LoadUint64(&s.audit.unexpectedBody),
  142. }
  143. }
  144. func isAudioStreamContentType(r *http.Request) bool {
  145. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  146. if ct == "" {
  147. return false
  148. }
  149. mediaType, _, err := mime.ParseMediaType(ct)
  150. if err != nil {
  151. return false
  152. }
  153. for _, allowed := range audioStreamAllowedMediaTypes {
  154. if strings.EqualFold(mediaType, allowed) {
  155. return true
  156. }
  157. }
  158. return false
  159. }
  160. func (s *Server) SetTXController(tx TXController) {
  161. s.mu.Lock()
  162. s.tx = tx
  163. s.mu.Unlock()
  164. }
  165. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  166. s.mu.Lock()
  167. s.drv = drv
  168. s.mu.Unlock()
  169. }
  170. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  171. s.mu.Lock()
  172. s.streamSrc = src
  173. s.mu.Unlock()
  174. }
  175. func (s *Server) Handler() http.Handler {
  176. mux := http.NewServeMux()
  177. mux.HandleFunc("/", s.handleUI)
  178. mux.HandleFunc("/healthz", s.handleHealth)
  179. mux.HandleFunc("/status", s.handleStatus)
  180. mux.HandleFunc("/dry-run", s.handleDryRun)
  181. mux.HandleFunc("/config", s.handleConfig)
  182. mux.HandleFunc("/runtime", s.handleRuntime)
  183. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  184. mux.HandleFunc("/tx/start", s.handleTXStart)
  185. mux.HandleFunc("/tx/stop", s.handleTXStop)
  186. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  187. return mux
  188. }
  189. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  190. w.Header().Set("Content-Type", "application/json")
  191. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  192. }
  193. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  194. if r.URL.Path != "/" {
  195. http.NotFound(w, r)
  196. return
  197. }
  198. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  199. w.Header().Set("Cache-Control", "no-cache")
  200. w.Write(uiHTML)
  201. }
  202. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  203. s.mu.RLock()
  204. cfg := s.cfg
  205. tx := s.tx
  206. s.mu.RUnlock()
  207. status := map[string]any{
  208. "service": "fm-rds-tx",
  209. "backend": cfg.Backend.Kind,
  210. "frequencyMHz": cfg.FM.FrequencyMHz,
  211. "stereoEnabled": cfg.FM.StereoEnabled,
  212. "rdsEnabled": cfg.RDS.Enabled,
  213. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  214. "limiterEnabled": cfg.FM.LimiterEnabled,
  215. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  216. }
  217. if tx != nil {
  218. if stats := tx.TXStats(); stats != nil {
  219. if ri, ok := stats["runtimeIndicator"]; ok {
  220. status["runtimeIndicator"] = ri
  221. }
  222. if alert, ok := stats["runtimeAlert"]; ok {
  223. status["runtimeAlert"] = alert
  224. }
  225. if queue, ok := stats["queue"]; ok {
  226. status["queue"] = queue
  227. }
  228. if runtimeState, ok := stats["state"]; ok {
  229. status["runtimeState"] = runtimeState
  230. }
  231. }
  232. }
  233. w.Header().Set("Content-Type", "application/json")
  234. _ = json.NewEncoder(w).Encode(status)
  235. }
  236. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  237. s.mu.RLock()
  238. drv := s.drv
  239. tx := s.tx
  240. stream := s.streamSrc
  241. s.mu.RUnlock()
  242. result := map[string]any{}
  243. if drv != nil {
  244. result["driver"] = drv.Stats()
  245. }
  246. if tx != nil {
  247. result["engine"] = tx.TXStats()
  248. }
  249. if stream != nil {
  250. result["audioStream"] = stream.Stats()
  251. }
  252. result["controlAudit"] = s.auditSnapshot()
  253. w.Header().Set("Content-Type", "application/json")
  254. _ = json.NewEncoder(w).Encode(result)
  255. }
  256. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  257. if r.Method != http.MethodPost {
  258. s.recordAudit(auditMethodNotAllowed)
  259. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  260. return
  261. }
  262. if !s.rejectBody(w, r) {
  263. return
  264. }
  265. s.mu.RLock()
  266. tx := s.tx
  267. s.mu.RUnlock()
  268. if tx == nil {
  269. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  270. return
  271. }
  272. if err := tx.ResetFault(); err != nil {
  273. http.Error(w, err.Error(), http.StatusConflict)
  274. return
  275. }
  276. w.Header().Set("Content-Type", "application/json")
  277. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  278. }
  279. // handleAudioStream accepts raw S16LE stereo PCM via HTTP POST and pushes
  280. // it into the live audio ring buffer. Use with:
  281. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  282. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  283. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  284. if r.Method != http.MethodPost {
  285. s.recordAudit(auditMethodNotAllowed)
  286. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  287. return
  288. }
  289. if !isAudioStreamContentType(r) {
  290. s.recordAudit(auditUnsupportedMediaType)
  291. http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType)
  292. return
  293. }
  294. s.mu.RLock()
  295. stream := s.streamSrc
  296. s.mu.RUnlock()
  297. if stream == nil {
  298. http.Error(w, "audio stream not configured (use --audio-stdin or --audio-http)", http.StatusServiceUnavailable)
  299. return
  300. }
  301. r.Body = http.MaxBytesReader(w, r.Body, audioStreamBodyLimit)
  302. // Read body in chunks and push to ring buffer
  303. buf := make([]byte, 32768)
  304. totalFrames := 0
  305. for {
  306. n, err := r.Body.Read(buf)
  307. if n > 0 {
  308. totalFrames += stream.WritePCM(buf[:n])
  309. }
  310. if err != nil {
  311. if err == io.EOF {
  312. break
  313. }
  314. var maxErr *http.MaxBytesError
  315. if errors.As(err, &maxErr) {
  316. s.recordAudit(auditBodyTooLarge)
  317. http.Error(w, maxErr.Error(), http.StatusRequestEntityTooLarge)
  318. return
  319. }
  320. http.Error(w, err.Error(), http.StatusInternalServerError)
  321. return
  322. }
  323. }
  324. w.Header().Set("Content-Type", "application/json")
  325. _ = json.NewEncoder(w).Encode(map[string]any{
  326. "ok": true,
  327. "frames": totalFrames,
  328. "stats": stream.Stats(),
  329. })
  330. }
  331. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  332. if r.Method != http.MethodPost {
  333. s.recordAudit(auditMethodNotAllowed)
  334. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  335. return
  336. }
  337. if !s.rejectBody(w, r) {
  338. return
  339. }
  340. s.mu.RLock()
  341. tx := s.tx
  342. s.mu.RUnlock()
  343. if tx == nil {
  344. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  345. return
  346. }
  347. if err := tx.StartTX(); err != nil {
  348. http.Error(w, err.Error(), http.StatusConflict)
  349. return
  350. }
  351. w.Header().Set("Content-Type", "application/json")
  352. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  353. }
  354. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  355. if r.Method != http.MethodPost {
  356. s.recordAudit(auditMethodNotAllowed)
  357. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  358. return
  359. }
  360. if !s.rejectBody(w, r) {
  361. return
  362. }
  363. s.mu.RLock()
  364. tx := s.tx
  365. s.mu.RUnlock()
  366. if tx == nil {
  367. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  368. return
  369. }
  370. if err := tx.StopTX(); err != nil {
  371. http.Error(w, err.Error(), http.StatusInternalServerError)
  372. return
  373. }
  374. w.Header().Set("Content-Type", "application/json")
  375. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
  376. }
  377. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  378. s.mu.RLock()
  379. cfg := s.cfg
  380. s.mu.RUnlock()
  381. w.Header().Set("Content-Type", "application/json")
  382. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  383. }
  384. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  385. switch r.Method {
  386. case http.MethodGet:
  387. s.mu.RLock()
  388. cfg := s.cfg
  389. s.mu.RUnlock()
  390. w.Header().Set("Content-Type", "application/json")
  391. _ = json.NewEncoder(w).Encode(cfg)
  392. case http.MethodPost:
  393. if !isJSONContentType(r) {
  394. s.recordAudit(auditUnsupportedMediaType)
  395. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  396. return
  397. }
  398. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  399. var patch ConfigPatch
  400. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  401. statusCode := http.StatusBadRequest
  402. if strings.Contains(err.Error(), "http: request body too large") {
  403. statusCode = http.StatusRequestEntityTooLarge
  404. s.recordAudit(auditBodyTooLarge)
  405. }
  406. http.Error(w, err.Error(), statusCode)
  407. return
  408. }
  409. // Update the server's config snapshot (for GET /config and /status)
  410. s.mu.Lock()
  411. next := s.cfg
  412. if patch.FrequencyMHz != nil {
  413. next.FM.FrequencyMHz = *patch.FrequencyMHz
  414. }
  415. if patch.OutputDrive != nil {
  416. next.FM.OutputDrive = *patch.OutputDrive
  417. }
  418. if patch.ToneLeftHz != nil {
  419. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  420. }
  421. if patch.ToneRightHz != nil {
  422. next.Audio.ToneRightHz = *patch.ToneRightHz
  423. }
  424. if patch.ToneAmplitude != nil {
  425. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  426. }
  427. if patch.PS != nil {
  428. next.RDS.PS = *patch.PS
  429. }
  430. if patch.RadioText != nil {
  431. next.RDS.RadioText = *patch.RadioText
  432. }
  433. if patch.PreEmphasisTauUS != nil {
  434. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  435. }
  436. if patch.StereoEnabled != nil {
  437. next.FM.StereoEnabled = *patch.StereoEnabled
  438. }
  439. if patch.LimiterEnabled != nil {
  440. next.FM.LimiterEnabled = *patch.LimiterEnabled
  441. }
  442. if patch.LimiterCeiling != nil {
  443. next.FM.LimiterCeiling = *patch.LimiterCeiling
  444. }
  445. if patch.RDSEnabled != nil {
  446. next.RDS.Enabled = *patch.RDSEnabled
  447. }
  448. if patch.PilotLevel != nil {
  449. next.FM.PilotLevel = *patch.PilotLevel
  450. }
  451. if patch.RDSInjection != nil {
  452. next.FM.RDSInjection = *patch.RDSInjection
  453. }
  454. if err := next.Validate(); err != nil {
  455. s.mu.Unlock()
  456. http.Error(w, err.Error(), http.StatusBadRequest)
  457. return
  458. }
  459. lp := LivePatch{
  460. FrequencyMHz: patch.FrequencyMHz,
  461. OutputDrive: patch.OutputDrive,
  462. StereoEnabled: patch.StereoEnabled,
  463. PilotLevel: patch.PilotLevel,
  464. RDSInjection: patch.RDSInjection,
  465. RDSEnabled: patch.RDSEnabled,
  466. LimiterEnabled: patch.LimiterEnabled,
  467. LimiterCeiling: patch.LimiterCeiling,
  468. PS: patch.PS,
  469. RadioText: patch.RadioText,
  470. }
  471. tx := s.tx
  472. if tx != nil {
  473. if err := tx.UpdateConfig(lp); err != nil {
  474. s.mu.Unlock()
  475. http.Error(w, err.Error(), http.StatusBadRequest)
  476. return
  477. }
  478. }
  479. s.cfg = next
  480. live := tx != nil
  481. s.mu.Unlock()
  482. w.Header().Set("Content-Type", "application/json")
  483. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  484. default:
  485. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  486. }
  487. }