Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

678 строки
18KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "mime"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jan/fm-rds-tx/internal/audio"
  14. "github.com/jan/fm-rds-tx/internal/config"
  15. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  16. "github.com/jan/fm-rds-tx/internal/ingest"
  17. "github.com/jan/fm-rds-tx/internal/platform"
  18. )
  19. //go:embed ui.html
  20. var uiHTML []byte
  21. // TXController is an optional interface the Server uses to start/stop TX
  22. // and apply live config changes.
  23. type TXController interface {
  24. StartTX() error
  25. StopTX() error
  26. TXStats() map[string]any
  27. UpdateConfig(patch LivePatch) error
  28. ResetFault() error
  29. }
  // LivePatch carries the subset of ConfigPatch fields that are handed to the
  // engine via TXController.UpdateConfig. Every field is a pointer: a nil
  // pointer means "leave this setting unchanged".
  type LivePatch struct {
  	FrequencyMHz   *float64
  	OutputDrive    *float64
  	StereoEnabled  *bool
  	PilotLevel     *float64
  	RDSInjection   *float64
  	RDSEnabled     *bool
  	LimiterEnabled *bool
  	LimiterCeiling *float64
  	PS             *string
  	RadioText      *string
  }
  44. type Server struct {
  45. mu sync.RWMutex
  46. cfg config.Config
  47. tx TXController
  48. drv platform.SoapyDriver // optional, for runtime stats
  49. streamSrc *audio.StreamSource // optional, for live audio ring stats
  50. audioIngress AudioIngress // optional, for /audio/stream
  51. ingestRt IngestRuntime // optional, for /runtime ingest stats
  52. saveConfig func(config.Config) error
  53. hardReload func()
  54. audit auditCounters
  55. }
  // AudioIngress is the sink that /audio/stream pushes request-body chunks
  // into. The handler sums the returned int across chunks and reports the
  // total as "frames"; a non-nil error aborts the request with 503.
  type AudioIngress interface {
  	WritePCM16(data []byte) (int, error)
  }
  59. type IngestRuntime interface {
  60. Stats() ingest.Stats
  61. }
  // auditEvent names a category of rejected request that the Server counts.
  type auditEvent string

  // Audit categories. The string values match the keys reported by
  // auditSnapshot.
  const (
  	auditMethodNotAllowed     auditEvent = "methodNotAllowed"
  	auditUnsupportedMediaType auditEvent = "unsupportedMediaType"
  	auditBodyTooLarge         auditEvent = "bodyTooLarge"
  	auditUnexpectedBody       auditEvent = "unexpectedBody"
  )
  // auditCounters holds one counter per auditEvent. The fields are read and
  // written with sync/atomic by recordAudit and auditSnapshot.
  // NOTE(review): plain uint64 fields rely on 64-bit alignment for atomic
  // access on 32-bit platforms — confirm target platforms or consider
  // atomic.Uint64.
  type auditCounters struct {
  	methodNotAllowed     uint64
  	unsupportedMediaType uint64
  	bodyTooLarge         uint64
  	unexpectedBody       uint64
  }
  // Request limits and fixed response messages used by the handlers.
  const (
  	// maxConfigBodyBytes caps JSON request bodies at 64 KiB.
  	maxConfigBodyBytes = 64 * 1024
  	// configContentTypeHeader is the media type required for JSON endpoints.
  	configContentTypeHeader = "application/json"
  	// noBodyErrMsg is the response text for body-less endpoints that
  	// received a body.
  	noBodyErrMsg = "request must not include a body"
  	// audioStreamContentTypeError is the 415 response text of /audio/stream.
  	audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16"
  	// audioStreamBodyLimitDefault is the default per-request cap of
  	// /audio/stream: 512 MiB.
  	audioStreamBodyLimitDefault = 512 * 1024 * 1024
  )

  // audioStreamAllowedMediaTypes lists the media types accepted by
  // /audio/stream; entries are compared case-insensitively.
  var audioStreamAllowedMediaTypes = []string{
  	"application/octet-stream",
  	"audio/l16",
  }

  // audioStreamBodyLimit is the number of bytes allowed per /audio/stream
  // request; tests may override.
  var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault)
  87. func isJSONContentType(r *http.Request) bool {
  88. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  89. if ct == "" {
  90. return false
  91. }
  92. mediaType, _, err := mime.ParseMediaType(ct)
  93. if err != nil {
  94. return false
  95. }
  96. return strings.EqualFold(mediaType, configContentTypeHeader)
  97. }
  // ConfigPatch is the JSON body accepted by POST /config. Every field is
  // optional; a field that is absent from the request decodes to nil and
  // leaves the corresponding setting untouched.
  type ConfigPatch struct {
  	FrequencyMHz      *float64 `json:"frequencyMHz,omitempty"`
  	OutputDrive       *float64 `json:"outputDrive,omitempty"`
  	StereoEnabled     *bool    `json:"stereoEnabled,omitempty"`
  	PilotLevel        *float64 `json:"pilotLevel,omitempty"`
  	RDSInjection      *float64 `json:"rdsInjection,omitempty"`
  	RDSEnabled        *bool    `json:"rdsEnabled,omitempty"`
  	ToneLeftHz        *float64 `json:"toneLeftHz,omitempty"`
  	ToneRightHz       *float64 `json:"toneRightHz,omitempty"`
  	ToneAmplitude     *float64 `json:"toneAmplitude,omitempty"`
  	PS                *string  `json:"ps,omitempty"`
  	RadioText         *string  `json:"radioText,omitempty"`
  	PreEmphasisTauUS  *float64 `json:"preEmphasisTauUS,omitempty"`
  	LimiterEnabled    *bool    `json:"limiterEnabled,omitempty"`
  	LimiterCeiling    *float64 `json:"limiterCeiling,omitempty"`
  	AudioGain         *float64 `json:"audioGain,omitempty"`
  	PI                *string  `json:"pi,omitempty"`
  	PTY               *int     `json:"pty,omitempty"`
  	BS412Enabled      *bool    `json:"bs412Enabled,omitempty"`
  	BS412ThresholdDBr *float64 `json:"bs412ThresholdDBr,omitempty"`
  	MpxGain           *float64 `json:"mpxGain,omitempty"`
  }
  120. type IngestSaveRequest struct {
  121. Ingest config.IngestConfig `json:"ingest"`
  122. }
  123. func NewServer(cfg config.Config) *Server {
  124. return &Server{cfg: cfg}
  125. }
  // hasRequestBody reports whether the request announces a body: either a
  // positive Content-Length or a "chunked" transfer encoding (matched
  // case-insensitively).
  func hasRequestBody(r *http.Request) bool {
  	if r.ContentLength > 0 {
  		return true
  	}
  	chunked := false
  	for i := 0; i < len(r.TransferEncoding) && !chunked; i++ {
  		chunked = strings.EqualFold(r.TransferEncoding[i], "chunked")
  	}
  	return chunked
  }
  137. func (s *Server) rejectBody(w http.ResponseWriter, r *http.Request) bool {
  138. if !hasRequestBody(r) {
  139. return true
  140. }
  141. s.recordAudit(auditUnexpectedBody)
  142. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  143. return false
  144. }
  145. func (s *Server) recordAudit(evt auditEvent) {
  146. switch evt {
  147. case auditMethodNotAllowed:
  148. atomic.AddUint64(&s.audit.methodNotAllowed, 1)
  149. case auditUnsupportedMediaType:
  150. atomic.AddUint64(&s.audit.unsupportedMediaType, 1)
  151. case auditBodyTooLarge:
  152. atomic.AddUint64(&s.audit.bodyTooLarge, 1)
  153. case auditUnexpectedBody:
  154. atomic.AddUint64(&s.audit.unexpectedBody, 1)
  155. }
  156. }
  157. func (s *Server) auditSnapshot() map[string]uint64 {
  158. return map[string]uint64{
  159. "methodNotAllowed": atomic.LoadUint64(&s.audit.methodNotAllowed),
  160. "unsupportedMediaType": atomic.LoadUint64(&s.audit.unsupportedMediaType),
  161. "bodyTooLarge": atomic.LoadUint64(&s.audit.bodyTooLarge),
  162. "unexpectedBody": atomic.LoadUint64(&s.audit.unexpectedBody),
  163. }
  164. }
  165. func isAudioStreamContentType(r *http.Request) bool {
  166. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  167. if ct == "" {
  168. return false
  169. }
  170. mediaType, _, err := mime.ParseMediaType(ct)
  171. if err != nil {
  172. return false
  173. }
  174. for _, allowed := range audioStreamAllowedMediaTypes {
  175. if strings.EqualFold(mediaType, allowed) {
  176. return true
  177. }
  178. }
  179. return false
  180. }
  181. func (s *Server) SetTXController(tx TXController) {
  182. s.mu.Lock()
  183. s.tx = tx
  184. s.mu.Unlock()
  185. }
  186. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  187. s.mu.Lock()
  188. s.drv = drv
  189. s.mu.Unlock()
  190. }
  191. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  192. s.mu.Lock()
  193. s.streamSrc = src
  194. s.mu.Unlock()
  195. }
  196. func (s *Server) SetAudioIngress(ingress AudioIngress) {
  197. s.mu.Lock()
  198. s.audioIngress = ingress
  199. s.mu.Unlock()
  200. }
  201. func (s *Server) SetIngestRuntime(rt IngestRuntime) {
  202. s.mu.Lock()
  203. s.ingestRt = rt
  204. s.mu.Unlock()
  205. }
  206. func (s *Server) SetConfigSaver(save func(config.Config) error) {
  207. s.mu.Lock()
  208. s.saveConfig = save
  209. s.mu.Unlock()
  210. }
  211. func (s *Server) SetHardReload(fn func()) {
  212. s.mu.Lock()
  213. s.hardReload = fn
  214. s.mu.Unlock()
  215. }
  216. func (s *Server) Handler() http.Handler {
  217. mux := http.NewServeMux()
  218. mux.HandleFunc("/", s.handleUI)
  219. mux.HandleFunc("/healthz", s.handleHealth)
  220. mux.HandleFunc("/status", s.handleStatus)
  221. mux.HandleFunc("/dry-run", s.handleDryRun)
  222. mux.HandleFunc("/config", s.handleConfig)
  223. mux.HandleFunc("/config/ingest/save", s.handleIngestSave)
  224. mux.HandleFunc("/runtime", s.handleRuntime)
  225. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  226. mux.HandleFunc("/tx/start", s.handleTXStart)
  227. mux.HandleFunc("/tx/stop", s.handleTXStop)
  228. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  229. return mux
  230. }
  231. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  232. w.Header().Set("Content-Type", "application/json")
  233. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  234. }
  235. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  236. if r.URL.Path != "/" {
  237. http.NotFound(w, r)
  238. return
  239. }
  240. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  241. w.Header().Set("Cache-Control", "no-cache")
  242. w.Write(uiHTML)
  243. }
  244. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  245. s.mu.RLock()
  246. cfg := s.cfg
  247. tx := s.tx
  248. s.mu.RUnlock()
  249. status := map[string]any{
  250. "service": "fm-rds-tx",
  251. "backend": cfg.Backend.Kind,
  252. "frequencyMHz": cfg.FM.FrequencyMHz,
  253. "stereoEnabled": cfg.FM.StereoEnabled,
  254. "rdsEnabled": cfg.RDS.Enabled,
  255. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  256. "limiterEnabled": cfg.FM.LimiterEnabled,
  257. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  258. }
  259. if tx != nil {
  260. if stats := tx.TXStats(); stats != nil {
  261. if ri, ok := stats["runtimeIndicator"]; ok {
  262. status["runtimeIndicator"] = ri
  263. }
  264. if alert, ok := stats["runtimeAlert"]; ok {
  265. status["runtimeAlert"] = alert
  266. }
  267. if queue, ok := stats["queue"]; ok {
  268. status["queue"] = queue
  269. }
  270. if runtimeState, ok := stats["state"]; ok {
  271. status["runtimeState"] = runtimeState
  272. }
  273. }
  274. }
  275. w.Header().Set("Content-Type", "application/json")
  276. _ = json.NewEncoder(w).Encode(status)
  277. }
  278. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  279. s.mu.RLock()
  280. drv := s.drv
  281. tx := s.tx
  282. stream := s.streamSrc
  283. ingestRt := s.ingestRt
  284. s.mu.RUnlock()
  285. result := map[string]any{}
  286. if drv != nil {
  287. result["driver"] = drv.Stats()
  288. }
  289. if tx != nil {
  290. if stats := tx.TXStats(); stats != nil {
  291. result["engine"] = stats
  292. }
  293. }
  294. if stream != nil {
  295. result["audioStream"] = stream.Stats()
  296. }
  297. if ingestRt != nil {
  298. result["ingest"] = ingestRt.Stats()
  299. }
  300. result["controlAudit"] = s.auditSnapshot()
  301. w.Header().Set("Content-Type", "application/json")
  302. _ = json.NewEncoder(w).Encode(result)
  303. }
  304. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  305. if r.Method != http.MethodPost {
  306. s.recordAudit(auditMethodNotAllowed)
  307. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  308. return
  309. }
  310. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  311. return
  312. }
  313. s.mu.RLock()
  314. tx := s.tx
  315. s.mu.RUnlock()
  316. if tx == nil {
  317. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  318. return
  319. }
  320. if err := tx.ResetFault(); err != nil {
  321. http.Error(w, err.Error(), http.StatusConflict)
  322. return
  323. }
  324. w.Header().Set("Content-Type", "application/json")
  325. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  326. }
  327. // handleAudioStream accepts raw S16LE PCM via HTTP POST and pushes
  328. // it into the configured ingest http-raw source. Use with:
  329. //
  330. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  331. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  332. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  333. if r.Method != http.MethodPost {
  334. s.recordAudit(auditMethodNotAllowed)
  335. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  336. return
  337. }
  338. if !isAudioStreamContentType(r) {
  339. s.recordAudit(auditUnsupportedMediaType)
  340. http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType)
  341. return
  342. }
  343. s.mu.RLock()
  344. ingress := s.audioIngress
  345. s.mu.RUnlock()
  346. if ingress == nil {
  347. http.Error(w, "audio ingest not configured (use --audio-http with ingest runtime)", http.StatusServiceUnavailable)
  348. return
  349. }
  350. // BUG-10 fix: /audio/stream is a long-lived streaming endpoint.
  351. // The global HTTP server ReadTimeout (5s) and WriteTimeout (10s) would
  352. // kill connections mid-stream. Disable them per-request via ResponseController
  353. // (requires Go 1.20+, confirmed Go 1.22).
  354. rc := http.NewResponseController(w)
  355. _ = rc.SetReadDeadline(time.Time{})
  356. _ = rc.SetWriteDeadline(time.Time{})
  357. r.Body = http.MaxBytesReader(w, r.Body, audioStreamBodyLimit)
  358. // Read body in chunks and push to ring buffer
  359. buf := make([]byte, 32768)
  360. totalFrames := 0
  361. for {
  362. n, err := r.Body.Read(buf)
  363. if n > 0 {
  364. written, writeErr := ingress.WritePCM16(buf[:n])
  365. totalFrames += written
  366. if writeErr != nil {
  367. http.Error(w, writeErr.Error(), http.StatusServiceUnavailable)
  368. return
  369. }
  370. }
  371. if err != nil {
  372. if err == io.EOF {
  373. break
  374. }
  375. var maxErr *http.MaxBytesError
  376. if errors.As(err, &maxErr) {
  377. s.recordAudit(auditBodyTooLarge)
  378. http.Error(w, maxErr.Error(), http.StatusRequestEntityTooLarge)
  379. return
  380. }
  381. http.Error(w, err.Error(), http.StatusInternalServerError)
  382. return
  383. }
  384. }
  385. w.Header().Set("Content-Type", "application/json")
  386. _ = json.NewEncoder(w).Encode(map[string]any{
  387. "ok": true,
  388. "frames": totalFrames,
  389. })
  390. }
  391. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  392. if r.Method != http.MethodPost {
  393. s.recordAudit(auditMethodNotAllowed)
  394. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  395. return
  396. }
  397. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  398. return
  399. }
  400. s.mu.RLock()
  401. tx := s.tx
  402. s.mu.RUnlock()
  403. if tx == nil {
  404. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  405. return
  406. }
  407. if err := tx.StartTX(); err != nil {
  408. http.Error(w, err.Error(), http.StatusConflict)
  409. return
  410. }
  411. w.Header().Set("Content-Type", "application/json")
  412. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  413. }
  414. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  415. if r.Method != http.MethodPost {
  416. s.recordAudit(auditMethodNotAllowed)
  417. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  418. return
  419. }
  420. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  421. return
  422. }
  423. s.mu.RLock()
  424. tx := s.tx
  425. s.mu.RUnlock()
  426. if tx == nil {
  427. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  428. return
  429. }
  430. if err := tx.StopTX(); err != nil {
  431. http.Error(w, err.Error(), http.StatusInternalServerError)
  432. return
  433. }
  434. w.Header().Set("Content-Type", "application/json")
  435. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stopped"})
  436. }
  437. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  438. s.mu.RLock()
  439. cfg := s.cfg
  440. s.mu.RUnlock()
  441. w.Header().Set("Content-Type", "application/json")
  442. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  443. }
  444. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  445. switch r.Method {
  446. case http.MethodGet:
  447. s.mu.RLock()
  448. cfg := s.cfg
  449. s.mu.RUnlock()
  450. w.Header().Set("Content-Type", "application/json")
  451. _ = json.NewEncoder(w).Encode(cfg)
  452. case http.MethodPost:
  453. if !isJSONContentType(r) {
  454. s.recordAudit(auditUnsupportedMediaType)
  455. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  456. return
  457. }
  458. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  459. var patch ConfigPatch
  460. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  461. statusCode := http.StatusBadRequest
  462. if strings.Contains(err.Error(), "http: request body too large") {
  463. statusCode = http.StatusRequestEntityTooLarge
  464. s.recordAudit(auditBodyTooLarge)
  465. }
  466. http.Error(w, err.Error(), statusCode)
  467. return
  468. }
  469. // Update the server's config snapshot (for GET /config and /status)
  470. s.mu.Lock()
  471. next := s.cfg
  472. if patch.FrequencyMHz != nil {
  473. next.FM.FrequencyMHz = *patch.FrequencyMHz
  474. }
  475. if patch.OutputDrive != nil {
  476. next.FM.OutputDrive = *patch.OutputDrive
  477. }
  478. if patch.ToneLeftHz != nil {
  479. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  480. }
  481. if patch.ToneRightHz != nil {
  482. next.Audio.ToneRightHz = *patch.ToneRightHz
  483. }
  484. if patch.ToneAmplitude != nil {
  485. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  486. }
  487. if patch.AudioGain != nil {
  488. next.Audio.Gain = *patch.AudioGain
  489. }
  490. if patch.PS != nil {
  491. next.RDS.PS = *patch.PS
  492. }
  493. if patch.RadioText != nil {
  494. next.RDS.RadioText = *patch.RadioText
  495. }
  496. if patch.PI != nil {
  497. next.RDS.PI = *patch.PI
  498. }
  499. if patch.PTY != nil {
  500. next.RDS.PTY = *patch.PTY
  501. }
  502. if patch.PreEmphasisTauUS != nil {
  503. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  504. }
  505. if patch.StereoEnabled != nil {
  506. next.FM.StereoEnabled = *patch.StereoEnabled
  507. }
  508. if patch.LimiterEnabled != nil {
  509. next.FM.LimiterEnabled = *patch.LimiterEnabled
  510. }
  511. if patch.LimiterCeiling != nil {
  512. next.FM.LimiterCeiling = *patch.LimiterCeiling
  513. }
  514. if patch.RDSEnabled != nil {
  515. next.RDS.Enabled = *patch.RDSEnabled
  516. }
  517. if patch.PilotLevel != nil {
  518. next.FM.PilotLevel = *patch.PilotLevel
  519. }
  520. if patch.RDSInjection != nil {
  521. next.FM.RDSInjection = *patch.RDSInjection
  522. }
  523. if patch.BS412Enabled != nil {
  524. next.FM.BS412Enabled = *patch.BS412Enabled
  525. }
  526. if patch.BS412ThresholdDBr != nil {
  527. next.FM.BS412ThresholdDBr = *patch.BS412ThresholdDBr
  528. }
  529. if patch.MpxGain != nil {
  530. next.FM.MpxGain = *patch.MpxGain
  531. }
  532. if err := next.Validate(); err != nil {
  533. s.mu.Unlock()
  534. http.Error(w, err.Error(), http.StatusBadRequest)
  535. return
  536. }
  537. lp := LivePatch{
  538. FrequencyMHz: patch.FrequencyMHz,
  539. OutputDrive: patch.OutputDrive,
  540. StereoEnabled: patch.StereoEnabled,
  541. PilotLevel: patch.PilotLevel,
  542. RDSInjection: patch.RDSInjection,
  543. RDSEnabled: patch.RDSEnabled,
  544. LimiterEnabled: patch.LimiterEnabled,
  545. LimiterCeiling: patch.LimiterCeiling,
  546. PS: patch.PS,
  547. RadioText: patch.RadioText,
  548. }
  549. tx := s.tx
  550. if tx != nil {
  551. if err := tx.UpdateConfig(lp); err != nil {
  552. s.mu.Unlock()
  553. http.Error(w, err.Error(), http.StatusBadRequest)
  554. return
  555. }
  556. }
  557. s.cfg = next
  558. live := tx != nil
  559. s.mu.Unlock()
  560. w.Header().Set("Content-Type", "application/json")
  561. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  562. default:
  563. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  564. }
  565. }
  566. func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) {
  567. if r.Method != http.MethodPost {
  568. s.recordAudit(auditMethodNotAllowed)
  569. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  570. return
  571. }
  572. if !isJSONContentType(r) {
  573. s.recordAudit(auditUnsupportedMediaType)
  574. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  575. return
  576. }
  577. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  578. var req IngestSaveRequest
  579. if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
  580. statusCode := http.StatusBadRequest
  581. if strings.Contains(err.Error(), "http: request body too large") {
  582. statusCode = http.StatusRequestEntityTooLarge
  583. s.recordAudit(auditBodyTooLarge)
  584. }
  585. http.Error(w, err.Error(), statusCode)
  586. return
  587. }
  588. s.mu.Lock()
  589. next := s.cfg
  590. next.Ingest = req.Ingest
  591. if err := next.Validate(); err != nil {
  592. s.mu.Unlock()
  593. http.Error(w, err.Error(), http.StatusBadRequest)
  594. return
  595. }
  596. save := s.saveConfig
  597. reload := s.hardReload
  598. if save == nil {
  599. s.mu.Unlock()
  600. http.Error(w, "config save is not configured (start with --config <path>)", http.StatusServiceUnavailable)
  601. return
  602. }
  603. if err := save(next); err != nil {
  604. s.mu.Unlock()
  605. http.Error(w, err.Error(), http.StatusInternalServerError)
  606. return
  607. }
  608. s.cfg = next
  609. s.mu.Unlock()
  610. w.Header().Set("Content-Type", "application/json")
  611. reloadScheduled := reload != nil
  612. _ = json.NewEncoder(w).Encode(map[string]any{
  613. "ok": true,
  614. "saved": true,
  615. "reloadScheduled": reloadScheduled,
  616. })
  617. if reloadScheduled {
  618. go func(fn func()) {
  619. time.Sleep(250 * time.Millisecond)
  620. fn()
  621. }(reload)
  622. }
  623. }