Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

680 rindas
19KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "mime"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jan/fm-rds-tx/internal/audio"
  14. "github.com/jan/fm-rds-tx/internal/config"
  15. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  16. "github.com/jan/fm-rds-tx/internal/ingest"
  17. "github.com/jan/fm-rds-tx/internal/platform"
  18. )
  19. //go:embed ui.html
  20. var uiHTML []byte
  21. // TXController is an optional interface the Server uses to start/stop TX
  22. // and apply live config changes.
  23. type TXController interface {
  24. StartTX() error
  25. StopTX() error
  26. TXStats() map[string]any
  27. UpdateConfig(patch LivePatch) error
  28. ResetFault() error
  29. }
  // LivePatch is the subset of ConfigPatch that is handed to the engine through
  // TXController.UpdateConfig. Every field is a pointer: nil means "leave this
  // setting unchanged".
  type LivePatch struct {
  	// Carrier and output.
  	FrequencyMHz *float64
  	OutputDrive  *float64

  	// Stereo and RDS multiplex settings.
  	StereoEnabled *bool
  	PilotLevel    *float64
  	RDSInjection  *float64
  	RDSEnabled    *bool

  	// Limiter.
  	LimiterEnabled *bool
  	LimiterCeiling *float64

  	// RDS text fields.
  	PS        *string
  	RadioText *string
  }
  44. type Server struct {
  45. mu sync.RWMutex
  46. cfg config.Config
  47. tx TXController
  48. drv platform.SoapyDriver // optional, for runtime stats
  49. streamSrc *audio.StreamSource // optional, for live audio ring stats
  50. audioIngress AudioIngress // optional, for /audio/stream
  51. ingestRt IngestRuntime // optional, for /runtime ingest stats
  52. saveConfig func(config.Config) error
  53. hardReload func()
  54. audit auditCounters
  55. }
  // AudioIngress is the sink that handleAudioStream feeds with the raw request
  // body, one chunk at a time.
  type AudioIngress interface {
  	// WritePCM16 consumes one chunk of PCM bytes. The handler sums the
  	// returned counts and reports the total as "frames"; a non-nil error
  	// aborts the upload.
  	WritePCM16(data []byte) (int, error)
  }
  59. type IngestRuntime interface {
  60. Stats() ingest.Stats
  61. }
  // auditEvent names a class of rejected request that recordAudit counts.
  type auditEvent string

  // Known audit events. Each one has a matching counter in auditCounters.
  const (
  	// auditMethodNotAllowed: the HTTP method is not accepted by the endpoint.
  	auditMethodNotAllowed auditEvent = "methodNotAllowed"
  	// auditUnsupportedMediaType: the Content-Type header was rejected.
  	auditUnsupportedMediaType auditEvent = "unsupportedMediaType"
  	// auditBodyTooLarge: the request body exceeded the endpoint's limit.
  	auditBodyTooLarge auditEvent = "bodyTooLarge"
  	// auditUnexpectedBody: a body was sent to an endpoint that takes none.
  	auditUnexpectedBody auditEvent = "unexpectedBody"
  )
  // auditCounters tallies rejected requests, one counter per auditEvent.
  // recordAudit and auditSnapshot access the fields through sync/atomic
  // functions rather than under the Server mutex.
  type auditCounters struct {
  	methodNotAllowed     uint64 // requests refused with 405
  	unsupportedMediaType uint64 // requests refused with 415
  	bodyTooLarge         uint64 // requests refused with 413
  	unexpectedBody       uint64 // body sent to a body-less endpoint
  }
  // Request limits, accepted content types and canned error messages used by
  // the handlers in this file.
  const (
  	// maxConfigBodyBytes caps JSON request bodies at 64 KiB.
  	maxConfigBodyBytes = 64 * 1024
  	// configContentTypeHeader is the media type required for JSON endpoints.
  	configContentTypeHeader = "application/json"
  	// noBodyErrMsg is sent when a body-less endpoint receives a body.
  	noBodyErrMsg = "request must not include a body"
  	// audioStreamContentTypeError is sent when /audio/stream gets a
  	// Content-Type outside audioStreamAllowedMediaTypes.
  	audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16"
  	// audioStreamBodyLimitDefault is the default /audio/stream body cap (512 MiB).
  	audioStreamBodyLimitDefault = 512 * 1024 * 1024
  )

  var (
  	// audioStreamAllowedMediaTypes lists the media types accepted by
  	// /audio/stream; matching is case-insensitive.
  	audioStreamAllowedMediaTypes = []string{
  		"application/octet-stream",
  		"audio/l16",
  	}

  	// audioStreamBodyLimit is the number of bytes allowed per /audio/stream
  	// request; tests may override it.
  	audioStreamBodyLimit int64 = audioStreamBodyLimitDefault
  )
  87. func isJSONContentType(r *http.Request) bool {
  88. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  89. if ct == "" {
  90. return false
  91. }
  92. mediaType, _, err := mime.ParseMediaType(ct)
  93. if err != nil {
  94. return false
  95. }
  96. return strings.EqualFold(mediaType, configContentTypeHeader)
  97. }
  // ConfigPatch is the JSON body accepted by POST /config. Every field is an
  // optional pointer: a field that is absent from the JSON stays nil and the
  // corresponding setting is left untouched.
  type ConfigPatch struct {
  	// FM carrier, stereo and RDS multiplex settings.
  	FrequencyMHz  *float64 `json:"frequencyMHz,omitempty"`
  	OutputDrive   *float64 `json:"outputDrive,omitempty"`
  	StereoEnabled *bool    `json:"stereoEnabled,omitempty"`
  	PilotLevel    *float64 `json:"pilotLevel,omitempty"`
  	RDSInjection  *float64 `json:"rdsInjection,omitempty"`
  	RDSEnabled    *bool    `json:"rdsEnabled,omitempty"`

  	// Test-tone settings.
  	ToneLeftHz    *float64 `json:"toneLeftHz,omitempty"`
  	ToneRightHz   *float64 `json:"toneRightHz,omitempty"`
  	ToneAmplitude *float64 `json:"toneAmplitude,omitempty"`

  	// RDS text fields.
  	PS        *string `json:"ps,omitempty"`
  	RadioText *string `json:"radioText,omitempty"`

  	// Audio processing.
  	PreEmphasisTauUS *float64 `json:"preEmphasisTauUS,omitempty"`
  	LimiterEnabled   *bool    `json:"limiterEnabled,omitempty"`
  	LimiterCeiling   *float64 `json:"limiterCeiling,omitempty"`
  	AudioGain        *float64 `json:"audioGain,omitempty"`

  	// RDS programme identification and type.
  	PI  *string `json:"pi,omitempty"`
  	PTY *int    `json:"pty,omitempty"`

  	// BS.412 and multiplex gain settings.
  	BS412Enabled      *bool    `json:"bs412Enabled,omitempty"`
  	BS412ThresholdDBr *float64 `json:"bs412ThresholdDBr,omitempty"`
  	MpxGain           *float64 `json:"mpxGain,omitempty"`
  }
  120. type IngestSaveRequest struct {
  121. Ingest config.IngestConfig `json:"ingest"`
  122. }
  123. func NewServer(cfg config.Config) *Server {
  124. return &Server{cfg: cfg}
  125. }
  // hasRequestBody reports whether the request declares a body: either a
  // positive Content-Length or a "chunked" transfer encoding (compared
  // case-insensitively).
  func hasRequestBody(r *http.Request) bool {
  	chunked := false
  	for i := range r.TransferEncoding {
  		if strings.EqualFold(r.TransferEncoding[i], "chunked") {
  			chunked = true
  			break
  		}
  	}
  	return r.ContentLength > 0 || chunked
  }
  137. func (s *Server) rejectBody(w http.ResponseWriter, r *http.Request) bool {
  138. // Returns true when the request has an unexpected body and the error response
  139. // has already been written — callers should return immediately in that case.
  140. // Returns false when there is no body (happy path — request should proceed).
  141. if !hasRequestBody(r) {
  142. return false
  143. }
  144. s.recordAudit(auditUnexpectedBody)
  145. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  146. return true
  147. }
  148. func (s *Server) recordAudit(evt auditEvent) {
  149. switch evt {
  150. case auditMethodNotAllowed:
  151. atomic.AddUint64(&s.audit.methodNotAllowed, 1)
  152. case auditUnsupportedMediaType:
  153. atomic.AddUint64(&s.audit.unsupportedMediaType, 1)
  154. case auditBodyTooLarge:
  155. atomic.AddUint64(&s.audit.bodyTooLarge, 1)
  156. case auditUnexpectedBody:
  157. atomic.AddUint64(&s.audit.unexpectedBody, 1)
  158. }
  159. }
  160. func (s *Server) auditSnapshot() map[string]uint64 {
  161. return map[string]uint64{
  162. "methodNotAllowed": atomic.LoadUint64(&s.audit.methodNotAllowed),
  163. "unsupportedMediaType": atomic.LoadUint64(&s.audit.unsupportedMediaType),
  164. "bodyTooLarge": atomic.LoadUint64(&s.audit.bodyTooLarge),
  165. "unexpectedBody": atomic.LoadUint64(&s.audit.unexpectedBody),
  166. }
  167. }
  168. func isAudioStreamContentType(r *http.Request) bool {
  169. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  170. if ct == "" {
  171. return false
  172. }
  173. mediaType, _, err := mime.ParseMediaType(ct)
  174. if err != nil {
  175. return false
  176. }
  177. for _, allowed := range audioStreamAllowedMediaTypes {
  178. if strings.EqualFold(mediaType, allowed) {
  179. return true
  180. }
  181. }
  182. return false
  183. }
  184. func (s *Server) SetTXController(tx TXController) {
  185. s.mu.Lock()
  186. s.tx = tx
  187. s.mu.Unlock()
  188. }
  189. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  190. s.mu.Lock()
  191. s.drv = drv
  192. s.mu.Unlock()
  193. }
  194. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  195. s.mu.Lock()
  196. s.streamSrc = src
  197. s.mu.Unlock()
  198. }
  199. func (s *Server) SetAudioIngress(ingress AudioIngress) {
  200. s.mu.Lock()
  201. s.audioIngress = ingress
  202. s.mu.Unlock()
  203. }
  204. func (s *Server) SetIngestRuntime(rt IngestRuntime) {
  205. s.mu.Lock()
  206. s.ingestRt = rt
  207. s.mu.Unlock()
  208. }
  209. func (s *Server) SetConfigSaver(save func(config.Config) error) {
  210. s.mu.Lock()
  211. s.saveConfig = save
  212. s.mu.Unlock()
  213. }
  214. func (s *Server) SetHardReload(fn func()) {
  215. s.mu.Lock()
  216. s.hardReload = fn
  217. s.mu.Unlock()
  218. }
  219. func (s *Server) Handler() http.Handler {
  220. mux := http.NewServeMux()
  221. mux.HandleFunc("/", s.handleUI)
  222. mux.HandleFunc("/healthz", s.handleHealth)
  223. mux.HandleFunc("/status", s.handleStatus)
  224. mux.HandleFunc("/dry-run", s.handleDryRun)
  225. mux.HandleFunc("/config", s.handleConfig)
  226. mux.HandleFunc("/config/ingest/save", s.handleIngestSave)
  227. mux.HandleFunc("/runtime", s.handleRuntime)
  228. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  229. mux.HandleFunc("/tx/start", s.handleTXStart)
  230. mux.HandleFunc("/tx/stop", s.handleTXStop)
  231. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  232. return mux
  233. }
  234. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  235. w.Header().Set("Content-Type", "application/json")
  236. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  237. }
  238. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  239. if r.URL.Path != "/" {
  240. http.NotFound(w, r)
  241. return
  242. }
  243. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  244. w.Header().Set("Cache-Control", "no-cache")
  245. w.Write(uiHTML)
  246. }
  247. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  248. s.mu.RLock()
  249. cfg := s.cfg
  250. tx := s.tx
  251. s.mu.RUnlock()
  252. status := map[string]any{
  253. "service": "fm-rds-tx",
  254. "backend": cfg.Backend.Kind,
  255. "frequencyMHz": cfg.FM.FrequencyMHz,
  256. "stereoEnabled": cfg.FM.StereoEnabled,
  257. "rdsEnabled": cfg.RDS.Enabled,
  258. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  259. "limiterEnabled": cfg.FM.LimiterEnabled,
  260. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  261. }
  262. if tx != nil {
  263. if stats := tx.TXStats(); stats != nil {
  264. if ri, ok := stats["runtimeIndicator"]; ok {
  265. status["runtimeIndicator"] = ri
  266. }
  267. if alert, ok := stats["runtimeAlert"]; ok {
  268. status["runtimeAlert"] = alert
  269. }
  270. if queue, ok := stats["queue"]; ok {
  271. status["queue"] = queue
  272. }
  273. if runtimeState, ok := stats["state"]; ok {
  274. status["runtimeState"] = runtimeState
  275. }
  276. }
  277. }
  278. w.Header().Set("Content-Type", "application/json")
  279. _ = json.NewEncoder(w).Encode(status)
  280. }
  281. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  282. s.mu.RLock()
  283. drv := s.drv
  284. tx := s.tx
  285. stream := s.streamSrc
  286. ingestRt := s.ingestRt
  287. s.mu.RUnlock()
  288. result := map[string]any{}
  289. if drv != nil {
  290. result["driver"] = drv.Stats()
  291. }
  292. if tx != nil {
  293. if stats := tx.TXStats(); stats != nil {
  294. result["engine"] = stats
  295. }
  296. }
  297. if stream != nil {
  298. result["audioStream"] = stream.Stats()
  299. }
  300. if ingestRt != nil {
  301. result["ingest"] = ingestRt.Stats()
  302. }
  303. result["controlAudit"] = s.auditSnapshot()
  304. w.Header().Set("Content-Type", "application/json")
  305. _ = json.NewEncoder(w).Encode(result)
  306. }
  307. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  308. if r.Method != http.MethodPost {
  309. s.recordAudit(auditMethodNotAllowed)
  310. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  311. return
  312. }
  313. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  314. return
  315. }
  316. s.mu.RLock()
  317. tx := s.tx
  318. s.mu.RUnlock()
  319. if tx == nil {
  320. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  321. return
  322. }
  323. if err := tx.ResetFault(); err != nil {
  324. http.Error(w, err.Error(), http.StatusConflict)
  325. return
  326. }
  327. w.Header().Set("Content-Type", "application/json")
  328. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  329. }
  330. // handleAudioStream accepts raw S16LE PCM via HTTP POST and pushes
  331. // it into the configured ingest http-raw source. Use with:
  332. //
  333. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  334. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  335. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  336. if r.Method != http.MethodPost {
  337. s.recordAudit(auditMethodNotAllowed)
  338. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  339. return
  340. }
  341. if !isAudioStreamContentType(r) {
  342. s.recordAudit(auditUnsupportedMediaType)
  343. http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType)
  344. return
  345. }
  346. s.mu.RLock()
  347. ingress := s.audioIngress
  348. s.mu.RUnlock()
  349. if ingress == nil {
  350. http.Error(w, "audio ingest not configured (use --audio-http with ingest runtime)", http.StatusServiceUnavailable)
  351. return
  352. }
  353. // BUG-10 fix: /audio/stream is a long-lived streaming endpoint.
  354. // The global HTTP server ReadTimeout (5s) and WriteTimeout (10s) would
  355. // kill connections mid-stream. Disable them per-request via ResponseController
  356. // (requires Go 1.20+, confirmed Go 1.22).
  357. rc := http.NewResponseController(w)
  358. _ = rc.SetReadDeadline(time.Time{})
  359. _ = rc.SetWriteDeadline(time.Time{})
  360. r.Body = http.MaxBytesReader(w, r.Body, audioStreamBodyLimit)
  361. // Read body in chunks and push to ring buffer
  362. buf := make([]byte, 32768)
  363. totalFrames := 0
  364. for {
  365. n, err := r.Body.Read(buf)
  366. if n > 0 {
  367. written, writeErr := ingress.WritePCM16(buf[:n])
  368. totalFrames += written
  369. if writeErr != nil {
  370. http.Error(w, writeErr.Error(), http.StatusServiceUnavailable)
  371. return
  372. }
  373. }
  374. if err != nil {
  375. if err == io.EOF {
  376. break
  377. }
  378. var maxErr *http.MaxBytesError
  379. if errors.As(err, &maxErr) {
  380. s.recordAudit(auditBodyTooLarge)
  381. http.Error(w, maxErr.Error(), http.StatusRequestEntityTooLarge)
  382. return
  383. }
  384. http.Error(w, err.Error(), http.StatusInternalServerError)
  385. return
  386. }
  387. }
  388. w.Header().Set("Content-Type", "application/json")
  389. _ = json.NewEncoder(w).Encode(map[string]any{
  390. "ok": true,
  391. "frames": totalFrames,
  392. })
  393. }
  394. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  395. if r.Method != http.MethodPost {
  396. s.recordAudit(auditMethodNotAllowed)
  397. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  398. return
  399. }
  400. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  401. return
  402. }
  403. s.mu.RLock()
  404. tx := s.tx
  405. s.mu.RUnlock()
  406. if tx == nil {
  407. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  408. return
  409. }
  410. if err := tx.StartTX(); err != nil {
  411. http.Error(w, err.Error(), http.StatusConflict)
  412. return
  413. }
  414. w.Header().Set("Content-Type", "application/json")
  415. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  416. }
  417. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  418. if r.Method != http.MethodPost {
  419. s.recordAudit(auditMethodNotAllowed)
  420. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  421. return
  422. }
  423. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  424. return
  425. }
  426. s.mu.RLock()
  427. tx := s.tx
  428. s.mu.RUnlock()
  429. if tx == nil {
  430. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  431. return
  432. }
  433. go func() {
  434. _ = tx.StopTX()
  435. }()
  436. w.Header().Set("Content-Type", "application/json")
  437. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stop-requested"})
  438. }
  439. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  440. s.mu.RLock()
  441. cfg := s.cfg
  442. s.mu.RUnlock()
  443. w.Header().Set("Content-Type", "application/json")
  444. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  445. }
  446. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  447. switch r.Method {
  448. case http.MethodGet:
  449. s.mu.RLock()
  450. cfg := s.cfg
  451. s.mu.RUnlock()
  452. w.Header().Set("Content-Type", "application/json")
  453. _ = json.NewEncoder(w).Encode(cfg)
  454. case http.MethodPost:
  455. if !isJSONContentType(r) {
  456. s.recordAudit(auditUnsupportedMediaType)
  457. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  458. return
  459. }
  460. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  461. var patch ConfigPatch
  462. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  463. statusCode := http.StatusBadRequest
  464. if strings.Contains(err.Error(), "http: request body too large") {
  465. statusCode = http.StatusRequestEntityTooLarge
  466. s.recordAudit(auditBodyTooLarge)
  467. }
  468. http.Error(w, err.Error(), statusCode)
  469. return
  470. }
  471. // Update the server's config snapshot (for GET /config and /status)
  472. s.mu.Lock()
  473. next := s.cfg
  474. if patch.FrequencyMHz != nil {
  475. next.FM.FrequencyMHz = *patch.FrequencyMHz
  476. }
  477. if patch.OutputDrive != nil {
  478. next.FM.OutputDrive = *patch.OutputDrive
  479. }
  480. if patch.ToneLeftHz != nil {
  481. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  482. }
  483. if patch.ToneRightHz != nil {
  484. next.Audio.ToneRightHz = *patch.ToneRightHz
  485. }
  486. if patch.ToneAmplitude != nil {
  487. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  488. }
  489. if patch.AudioGain != nil {
  490. next.Audio.Gain = *patch.AudioGain
  491. }
  492. if patch.PS != nil {
  493. next.RDS.PS = *patch.PS
  494. }
  495. if patch.RadioText != nil {
  496. next.RDS.RadioText = *patch.RadioText
  497. }
  498. if patch.PI != nil {
  499. next.RDS.PI = *patch.PI
  500. }
  501. if patch.PTY != nil {
  502. next.RDS.PTY = *patch.PTY
  503. }
  504. if patch.PreEmphasisTauUS != nil {
  505. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  506. }
  507. if patch.StereoEnabled != nil {
  508. next.FM.StereoEnabled = *patch.StereoEnabled
  509. }
  510. if patch.LimiterEnabled != nil {
  511. next.FM.LimiterEnabled = *patch.LimiterEnabled
  512. }
  513. if patch.LimiterCeiling != nil {
  514. next.FM.LimiterCeiling = *patch.LimiterCeiling
  515. }
  516. if patch.RDSEnabled != nil {
  517. next.RDS.Enabled = *patch.RDSEnabled
  518. }
  519. if patch.PilotLevel != nil {
  520. next.FM.PilotLevel = *patch.PilotLevel
  521. }
  522. if patch.RDSInjection != nil {
  523. next.FM.RDSInjection = *patch.RDSInjection
  524. }
  525. if patch.BS412Enabled != nil {
  526. next.FM.BS412Enabled = *patch.BS412Enabled
  527. }
  528. if patch.BS412ThresholdDBr != nil {
  529. next.FM.BS412ThresholdDBr = *patch.BS412ThresholdDBr
  530. }
  531. if patch.MpxGain != nil {
  532. next.FM.MpxGain = *patch.MpxGain
  533. }
  534. if err := next.Validate(); err != nil {
  535. s.mu.Unlock()
  536. http.Error(w, err.Error(), http.StatusBadRequest)
  537. return
  538. }
  539. lp := LivePatch{
  540. FrequencyMHz: patch.FrequencyMHz,
  541. OutputDrive: patch.OutputDrive,
  542. StereoEnabled: patch.StereoEnabled,
  543. PilotLevel: patch.PilotLevel,
  544. RDSInjection: patch.RDSInjection,
  545. RDSEnabled: patch.RDSEnabled,
  546. LimiterEnabled: patch.LimiterEnabled,
  547. LimiterCeiling: patch.LimiterCeiling,
  548. PS: patch.PS,
  549. RadioText: patch.RadioText,
  550. }
  551. tx := s.tx
  552. if tx != nil {
  553. if err := tx.UpdateConfig(lp); err != nil {
  554. s.mu.Unlock()
  555. http.Error(w, err.Error(), http.StatusBadRequest)
  556. return
  557. }
  558. }
  559. s.cfg = next
  560. live := tx != nil
  561. s.mu.Unlock()
  562. w.Header().Set("Content-Type", "application/json")
  563. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  564. default:
  565. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  566. }
  567. }
  568. func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) {
  569. if r.Method != http.MethodPost {
  570. s.recordAudit(auditMethodNotAllowed)
  571. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  572. return
  573. }
  574. if !isJSONContentType(r) {
  575. s.recordAudit(auditUnsupportedMediaType)
  576. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  577. return
  578. }
  579. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  580. var req IngestSaveRequest
  581. if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
  582. statusCode := http.StatusBadRequest
  583. if strings.Contains(err.Error(), "http: request body too large") {
  584. statusCode = http.StatusRequestEntityTooLarge
  585. s.recordAudit(auditBodyTooLarge)
  586. }
  587. http.Error(w, err.Error(), statusCode)
  588. return
  589. }
  590. s.mu.Lock()
  591. next := s.cfg
  592. next.Ingest = req.Ingest
  593. if err := next.Validate(); err != nil {
  594. s.mu.Unlock()
  595. http.Error(w, err.Error(), http.StatusBadRequest)
  596. return
  597. }
  598. save := s.saveConfig
  599. reload := s.hardReload
  600. if save == nil {
  601. s.mu.Unlock()
  602. http.Error(w, "config save is not configured (start with --config <path>)", http.StatusServiceUnavailable)
  603. return
  604. }
  605. if err := save(next); err != nil {
  606. s.mu.Unlock()
  607. http.Error(w, err.Error(), http.StatusInternalServerError)
  608. return
  609. }
  610. s.cfg = next
  611. s.mu.Unlock()
  612. w.Header().Set("Content-Type", "application/json")
  613. reloadScheduled := reload != nil
  614. _ = json.NewEncoder(w).Encode(map[string]any{
  615. "ok": true,
  616. "saved": true,
  617. "reloadScheduled": reloadScheduled,
  618. })
  619. if reloadScheduled {
  620. go func(fn func()) {
  621. time.Sleep(250 * time.Millisecond)
  622. fn()
  623. }(reload)
  624. }
  625. }