Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

703 行
20KB

  1. package control
  2. import (
  3. _ "embed"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "mime"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jan/fm-rds-tx/internal/audio"
  14. "github.com/jan/fm-rds-tx/internal/config"
  15. drypkg "github.com/jan/fm-rds-tx/internal/dryrun"
  16. "github.com/jan/fm-rds-tx/internal/ingest"
  17. "github.com/jan/fm-rds-tx/internal/platform"
  18. )
// uiHTML holds the contents of ui.html, embedded into the binary at build
// time. handleUI writes it verbatim as the response body for "/".
//
//go:embed ui.html
var uiHTML []byte
// TXController is an optional interface the Server uses to start/stop TX
// and apply live config changes.
//
// The Server treats a nil controller as "not available": the TX and fault
// reset endpoints answer 503, and /status and /runtime omit engine data.
type TXController interface {
	// StartTX is invoked by POST /tx/start; a non-nil error is reported to the
	// client as 409 Conflict.
	StartTX() error
	// StopTX is invoked by POST /tx/stop from a background goroutine; its error
	// is discarded by the handler.
	StopTX() error
	// TXStats returns engine statistics. /runtime exposes the whole map under
	// "engine"; /status forwards only the "runtimeIndicator", "runtimeAlert",
	// "queue" and "state" keys when present. A nil map is tolerated.
	TXStats() map[string]any
	// UpdateConfig receives the live-patchable subset of a POST /config
	// request; a non-nil error is reported to the client as 400 Bad Request.
	UpdateConfig(patch LivePatch) error
	// ResetFault is invoked by POST /runtime/fault/reset; a non-nil error is
	// reported to the client as 409 Conflict.
	ResetFault() error
}
// LivePatch mirrors the patchable fields from ConfigPatch for the engine.
// nil = no change.
//
// handleConfig fills it by copying the same-named pointers out of the decoded
// ConfigPatch, so each non-nil field points at the value the client sent.
type LivePatch struct {
	FrequencyMHz   *float64
	OutputDrive    *float64
	StereoEnabled  *bool
	PilotLevel     *float64
	RDSInjection   *float64
	RDSEnabled     *bool
	LimiterEnabled *bool
	LimiterCeiling *float64
	PS             *string
	RadioText      *string
	ToneLeftHz     *float64
	ToneRightHz    *float64
	ToneAmplitude  *float64
	AudioGain      *float64
}
// Server is the HTTP control surface. It keeps a snapshot of the configuration
// plus a set of optional collaborators that are installed through the Set*
// methods and read by the request handlers.
type Server struct {
	mu           sync.RWMutex            // guards every field below except audit
	cfg          config.Config           // snapshot served by /config, /status and /dry-run
	tx           TXController            // optional; nil makes the TX endpoints answer 503
	drv          platform.SoapyDriver    // optional, for runtime stats
	streamSrc    *audio.StreamSource     // optional, for live audio ring stats
	audioIngress AudioIngress            // optional, for /audio/stream
	ingestRt     IngestRuntime           // optional, for /runtime ingest stats
	saveConfig   func(config.Config) error // optional; required by /config/ingest/save
	hardReload   func()                  // optional; called ~250ms after a successful ingest save
	audit        auditCounters           // rejected-request counters, accessed via sync/atomic only
}
// AudioIngress is the sink that /audio/stream pushes request-body chunks into.
type AudioIngress interface {
	// WritePCM16 receives one chunk of the raw request body. handleAudioStream
	// sums the returned int across chunks and reports it as "frames"; a non-nil
	// error aborts the stream with 503 Service Unavailable.
	WritePCM16(data []byte) (int, error)
}
// IngestRuntime exposes ingest statistics to the control server.
type IngestRuntime interface {
	// Stats returns the value published under the "ingest" key of /runtime.
	Stats() ingest.Stats
}
// auditEvent names a category of rejected request that recordAudit counts.
type auditEvent string

// Audit event categories. The string values match the keys that
// auditSnapshot publishes under "controlAudit" in /runtime.
const (
	auditMethodNotAllowed     auditEvent = "methodNotAllowed"     // request used a disallowed HTTP method (405)
	auditUnsupportedMediaType auditEvent = "unsupportedMediaType" // Content-Type was missing or not accepted (415)
	auditBodyTooLarge         auditEvent = "bodyTooLarge"         // body exceeded the endpoint's byte limit (413)
	auditUnexpectedBody       auditEvent = "unexpectedBody"       // body sent to an endpoint that takes none (400)
)
// auditCounters holds one running total per auditEvent. The fields are only
// ever touched through sync/atomic (see recordAudit and auditSnapshot), so
// they are not protected by Server.mu.
type auditCounters struct {
	methodNotAllowed     uint64
	unsupportedMediaType uint64
	bodyTooLarge         uint64
	unexpectedBody       uint64
}
// Request limits and fixed response texts shared by the handlers.
const (
	// maxConfigBodyBytes caps the JSON body of /config and /config/ingest/save.
	maxConfigBodyBytes = 64 << 10 // 64 KiB
	// configContentTypeHeader is the media type required for JSON endpoints.
	configContentTypeHeader = "application/json"
	// noBodyErrMsg is the 400 response text sent by rejectBody.
	noBodyErrMsg = "request must not include a body"
	// audioStreamContentTypeError is the 415 response text of /audio/stream.
	audioStreamContentTypeError = "Content-Type must be application/octet-stream or audio/L16"
	// audioStreamBodyLimitDefault is the initial value of audioStreamBodyLimit.
	audioStreamBodyLimitDefault = 512 << 20 // 512 MiB
)
// audioStreamAllowedMediaTypes lists the media types /audio/stream accepts.
// isAudioStreamContentType compares case-insensitively, so the casing used
// here is not significant.
var audioStreamAllowedMediaTypes = []string{
	"application/octet-stream",
	"audio/l16",
}

// audioStreamBodyLimit is passed to http.MaxBytesReader for every
// /audio/stream request.
var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault) // bytes allowed per /audio/stream request; tests may override.
  91. func isJSONContentType(r *http.Request) bool {
  92. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  93. if ct == "" {
  94. return false
  95. }
  96. mediaType, _, err := mime.ParseMediaType(ct)
  97. if err != nil {
  98. return false
  99. }
  100. return strings.EqualFold(mediaType, configContentTypeHeader)
  101. }
// ConfigPatch is the JSON body accepted by POST /config. Every field is a
// pointer so that an absent key (nil) means "leave unchanged".
//
// handleConfig copies each non-nil field into the server's config snapshot.
// Only the fields that also exist on LivePatch are forwarded to the running
// engine; PreEmphasisTauUS, PI, PTY, BS412Enabled, BS412ThresholdDBr and
// MpxGain update the snapshot only.
type ConfigPatch struct {
	FrequencyMHz      *float64 `json:"frequencyMHz,omitempty"`
	OutputDrive       *float64 `json:"outputDrive,omitempty"`
	StereoEnabled     *bool    `json:"stereoEnabled,omitempty"`
	PilotLevel        *float64 `json:"pilotLevel,omitempty"`
	RDSInjection      *float64 `json:"rdsInjection,omitempty"`
	RDSEnabled        *bool    `json:"rdsEnabled,omitempty"`
	ToneLeftHz        *float64 `json:"toneLeftHz,omitempty"`
	ToneRightHz       *float64 `json:"toneRightHz,omitempty"`
	ToneAmplitude     *float64 `json:"toneAmplitude,omitempty"`
	PS                *string  `json:"ps,omitempty"`
	RadioText         *string  `json:"radioText,omitempty"`
	PreEmphasisTauUS  *float64 `json:"preEmphasisTauUS,omitempty"`
	LimiterEnabled    *bool    `json:"limiterEnabled,omitempty"`
	LimiterCeiling    *float64 `json:"limiterCeiling,omitempty"`
	AudioGain         *float64 `json:"audioGain,omitempty"`
	PI                *string  `json:"pi,omitempty"`
	PTY               *int     `json:"pty,omitempty"`
	BS412Enabled      *bool    `json:"bs412Enabled,omitempty"`
	BS412ThresholdDBr *float64 `json:"bs412ThresholdDBr,omitempty"`
	MpxGain           *float64 `json:"mpxGain,omitempty"`
}
// IngestSaveRequest is the JSON body accepted by POST /config/ingest/save.
// Its Ingest value replaces the Ingest section of the config snapshot as a
// whole before validation and saving.
type IngestSaveRequest struct {
	Ingest config.IngestConfig `json:"ingest"`
}
  127. func NewServer(cfg config.Config) *Server {
  128. return &Server{cfg: cfg}
  129. }
  130. func hasRequestBody(r *http.Request) bool {
  131. if r.ContentLength > 0 {
  132. return true
  133. }
  134. for _, te := range r.TransferEncoding {
  135. if strings.EqualFold(te, "chunked") {
  136. return true
  137. }
  138. }
  139. return false
  140. }
  141. func (s *Server) rejectBody(w http.ResponseWriter, r *http.Request) bool {
  142. // Returns true when the request has an unexpected body and the error response
  143. // has already been written — callers should return immediately in that case.
  144. // Returns false when there is no body (happy path — request should proceed).
  145. if !hasRequestBody(r) {
  146. return false
  147. }
  148. s.recordAudit(auditUnexpectedBody)
  149. http.Error(w, noBodyErrMsg, http.StatusBadRequest)
  150. return true
  151. }
  152. func (s *Server) recordAudit(evt auditEvent) {
  153. switch evt {
  154. case auditMethodNotAllowed:
  155. atomic.AddUint64(&s.audit.methodNotAllowed, 1)
  156. case auditUnsupportedMediaType:
  157. atomic.AddUint64(&s.audit.unsupportedMediaType, 1)
  158. case auditBodyTooLarge:
  159. atomic.AddUint64(&s.audit.bodyTooLarge, 1)
  160. case auditUnexpectedBody:
  161. atomic.AddUint64(&s.audit.unexpectedBody, 1)
  162. }
  163. }
  164. func (s *Server) auditSnapshot() map[string]uint64 {
  165. return map[string]uint64{
  166. "methodNotAllowed": atomic.LoadUint64(&s.audit.methodNotAllowed),
  167. "unsupportedMediaType": atomic.LoadUint64(&s.audit.unsupportedMediaType),
  168. "bodyTooLarge": atomic.LoadUint64(&s.audit.bodyTooLarge),
  169. "unexpectedBody": atomic.LoadUint64(&s.audit.unexpectedBody),
  170. }
  171. }
  172. func isAudioStreamContentType(r *http.Request) bool {
  173. ct := strings.TrimSpace(r.Header.Get("Content-Type"))
  174. if ct == "" {
  175. return false
  176. }
  177. mediaType, _, err := mime.ParseMediaType(ct)
  178. if err != nil {
  179. return false
  180. }
  181. for _, allowed := range audioStreamAllowedMediaTypes {
  182. if strings.EqualFold(mediaType, allowed) {
  183. return true
  184. }
  185. }
  186. return false
  187. }
  188. func (s *Server) SetTXController(tx TXController) {
  189. s.mu.Lock()
  190. s.tx = tx
  191. s.mu.Unlock()
  192. }
  193. func (s *Server) SetDriver(drv platform.SoapyDriver) {
  194. s.mu.Lock()
  195. s.drv = drv
  196. s.mu.Unlock()
  197. }
  198. func (s *Server) SetStreamSource(src *audio.StreamSource) {
  199. s.mu.Lock()
  200. s.streamSrc = src
  201. s.mu.Unlock()
  202. }
  203. func (s *Server) SetAudioIngress(ingress AudioIngress) {
  204. s.mu.Lock()
  205. s.audioIngress = ingress
  206. s.mu.Unlock()
  207. }
  208. func (s *Server) SetIngestRuntime(rt IngestRuntime) {
  209. s.mu.Lock()
  210. s.ingestRt = rt
  211. s.mu.Unlock()
  212. }
  213. func (s *Server) SetConfigSaver(save func(config.Config) error) {
  214. s.mu.Lock()
  215. s.saveConfig = save
  216. s.mu.Unlock()
  217. }
  218. func (s *Server) SetHardReload(fn func()) {
  219. s.mu.Lock()
  220. s.hardReload = fn
  221. s.mu.Unlock()
  222. }
  223. func (s *Server) Handler() http.Handler {
  224. mux := http.NewServeMux()
  225. mux.HandleFunc("/", s.handleUI)
  226. mux.HandleFunc("/healthz", s.handleHealth)
  227. mux.HandleFunc("/status", s.handleStatus)
  228. mux.HandleFunc("/dry-run", s.handleDryRun)
  229. mux.HandleFunc("/config", s.handleConfig)
  230. mux.HandleFunc("/config/ingest/save", s.handleIngestSave)
  231. mux.HandleFunc("/runtime", s.handleRuntime)
  232. mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset)
  233. mux.HandleFunc("/tx/start", s.handleTXStart)
  234. mux.HandleFunc("/tx/stop", s.handleTXStop)
  235. mux.HandleFunc("/audio/stream", s.handleAudioStream)
  236. return mux
  237. }
  238. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
  239. w.Header().Set("Content-Type", "application/json")
  240. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  241. }
  242. func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
  243. if r.URL.Path != "/" {
  244. http.NotFound(w, r)
  245. return
  246. }
  247. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  248. w.Header().Set("Cache-Control", "no-cache")
  249. w.Write(uiHTML)
  250. }
  251. func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) {
  252. s.mu.RLock()
  253. cfg := s.cfg
  254. tx := s.tx
  255. s.mu.RUnlock()
  256. status := map[string]any{
  257. "service": "fm-rds-tx",
  258. "backend": cfg.Backend.Kind,
  259. "frequencyMHz": cfg.FM.FrequencyMHz,
  260. "stereoEnabled": cfg.FM.StereoEnabled,
  261. "rdsEnabled": cfg.RDS.Enabled,
  262. "preEmphasisTauUS": cfg.FM.PreEmphasisTauUS,
  263. "limiterEnabled": cfg.FM.LimiterEnabled,
  264. "fmModulationEnabled": cfg.FM.FMModulationEnabled,
  265. }
  266. if tx != nil {
  267. if stats := tx.TXStats(); stats != nil {
  268. if ri, ok := stats["runtimeIndicator"]; ok {
  269. status["runtimeIndicator"] = ri
  270. }
  271. if alert, ok := stats["runtimeAlert"]; ok {
  272. status["runtimeAlert"] = alert
  273. }
  274. if queue, ok := stats["queue"]; ok {
  275. status["queue"] = queue
  276. }
  277. if runtimeState, ok := stats["state"]; ok {
  278. status["runtimeState"] = runtimeState
  279. }
  280. }
  281. }
  282. w.Header().Set("Content-Type", "application/json")
  283. _ = json.NewEncoder(w).Encode(status)
  284. }
  285. func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) {
  286. s.mu.RLock()
  287. drv := s.drv
  288. tx := s.tx
  289. stream := s.streamSrc
  290. ingestRt := s.ingestRt
  291. s.mu.RUnlock()
  292. result := map[string]any{}
  293. if drv != nil {
  294. result["driver"] = drv.Stats()
  295. }
  296. if tx != nil {
  297. if stats := tx.TXStats(); stats != nil {
  298. result["engine"] = stats
  299. }
  300. }
  301. if stream != nil {
  302. result["audioStream"] = stream.Stats()
  303. }
  304. if ingestRt != nil {
  305. result["ingest"] = ingestRt.Stats()
  306. }
  307. result["controlAudit"] = s.auditSnapshot()
  308. w.Header().Set("Content-Type", "application/json")
  309. _ = json.NewEncoder(w).Encode(result)
  310. }
  311. func (s *Server) handleRuntimeFaultReset(w http.ResponseWriter, r *http.Request) {
  312. if r.Method != http.MethodPost {
  313. s.recordAudit(auditMethodNotAllowed)
  314. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  315. return
  316. }
  317. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  318. return
  319. }
  320. s.mu.RLock()
  321. tx := s.tx
  322. s.mu.RUnlock()
  323. if tx == nil {
  324. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  325. return
  326. }
  327. if err := tx.ResetFault(); err != nil {
  328. http.Error(w, err.Error(), http.StatusConflict)
  329. return
  330. }
  331. w.Header().Set("Content-Type", "application/json")
  332. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
  333. }
  334. // handleAudioStream accepts raw S16LE PCM via HTTP POST and pushes
  335. // it into the configured ingest http-raw source. Use with:
  336. //
  337. // curl -X POST --data-binary @- http://host:8088/audio/stream < audio.raw
  338. // ffmpeg ... -f s16le -ar 44100 -ac 2 - | curl -X POST --data-binary @- http://host:8088/audio/stream
  339. func (s *Server) handleAudioStream(w http.ResponseWriter, r *http.Request) {
  340. if r.Method != http.MethodPost {
  341. s.recordAudit(auditMethodNotAllowed)
  342. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  343. return
  344. }
  345. if !isAudioStreamContentType(r) {
  346. s.recordAudit(auditUnsupportedMediaType)
  347. http.Error(w, audioStreamContentTypeError, http.StatusUnsupportedMediaType)
  348. return
  349. }
  350. s.mu.RLock()
  351. ingress := s.audioIngress
  352. s.mu.RUnlock()
  353. if ingress == nil {
  354. http.Error(w, "audio ingest not configured (use --audio-http with ingest runtime)", http.StatusServiceUnavailable)
  355. return
  356. }
  357. // BUG-10 fix: /audio/stream is a long-lived streaming endpoint.
  358. // The global HTTP server ReadTimeout (5s) and WriteTimeout (10s) would
  359. // kill connections mid-stream. Disable them per-request via ResponseController
  360. // (requires Go 1.20+, confirmed Go 1.22).
  361. rc := http.NewResponseController(w)
  362. _ = rc.SetReadDeadline(time.Time{})
  363. _ = rc.SetWriteDeadline(time.Time{})
  364. r.Body = http.MaxBytesReader(w, r.Body, audioStreamBodyLimit)
  365. // Read body in chunks and push to ring buffer
  366. buf := make([]byte, 32768)
  367. totalFrames := 0
  368. for {
  369. n, err := r.Body.Read(buf)
  370. if n > 0 {
  371. written, writeErr := ingress.WritePCM16(buf[:n])
  372. totalFrames += written
  373. if writeErr != nil {
  374. http.Error(w, writeErr.Error(), http.StatusServiceUnavailable)
  375. return
  376. }
  377. }
  378. if err != nil {
  379. if err == io.EOF {
  380. break
  381. }
  382. var maxErr *http.MaxBytesError
  383. if errors.As(err, &maxErr) {
  384. s.recordAudit(auditBodyTooLarge)
  385. http.Error(w, maxErr.Error(), http.StatusRequestEntityTooLarge)
  386. return
  387. }
  388. http.Error(w, err.Error(), http.StatusInternalServerError)
  389. return
  390. }
  391. }
  392. w.Header().Set("Content-Type", "application/json")
  393. _ = json.NewEncoder(w).Encode(map[string]any{
  394. "ok": true,
  395. "frames": totalFrames,
  396. })
  397. }
  398. func (s *Server) handleTXStart(w http.ResponseWriter, r *http.Request) {
  399. if r.Method != http.MethodPost {
  400. s.recordAudit(auditMethodNotAllowed)
  401. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  402. return
  403. }
  404. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  405. return
  406. }
  407. s.mu.RLock()
  408. tx := s.tx
  409. s.mu.RUnlock()
  410. if tx == nil {
  411. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  412. return
  413. }
  414. if err := tx.StartTX(); err != nil {
  415. http.Error(w, err.Error(), http.StatusConflict)
  416. return
  417. }
  418. w.Header().Set("Content-Type", "application/json")
  419. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "started"})
  420. }
  421. func (s *Server) handleTXStop(w http.ResponseWriter, r *http.Request) {
  422. if r.Method != http.MethodPost {
  423. s.recordAudit(auditMethodNotAllowed)
  424. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  425. return
  426. }
  427. if s.rejectBody(w, r) { // BUG-01 fix: rejectBody returns true when rejected
  428. return
  429. }
  430. s.mu.RLock()
  431. tx := s.tx
  432. s.mu.RUnlock()
  433. if tx == nil {
  434. http.Error(w, "tx controller not available", http.StatusServiceUnavailable)
  435. return
  436. }
  437. go func() {
  438. _ = tx.StopTX()
  439. }()
  440. w.Header().Set("Content-Type", "application/json")
  441. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "action": "stop-requested"})
  442. }
  443. func (s *Server) handleDryRun(w http.ResponseWriter, _ *http.Request) {
  444. s.mu.RLock()
  445. cfg := s.cfg
  446. s.mu.RUnlock()
  447. w.Header().Set("Content-Type", "application/json")
  448. _ = json.NewEncoder(w).Encode(drypkg.Generate(cfg))
  449. }
  450. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
  451. switch r.Method {
  452. case http.MethodGet:
  453. s.mu.RLock()
  454. cfg := s.cfg
  455. s.mu.RUnlock()
  456. w.Header().Set("Content-Type", "application/json")
  457. _ = json.NewEncoder(w).Encode(cfg)
  458. case http.MethodPost:
  459. if !isJSONContentType(r) {
  460. s.recordAudit(auditUnsupportedMediaType)
  461. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  462. return
  463. }
  464. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  465. var patch ConfigPatch
  466. if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
  467. statusCode := http.StatusBadRequest
  468. if strings.Contains(err.Error(), "http: request body too large") {
  469. statusCode = http.StatusRequestEntityTooLarge
  470. s.recordAudit(auditBodyTooLarge)
  471. }
  472. http.Error(w, err.Error(), statusCode)
  473. return
  474. }
  475. // Update the server's config snapshot (for GET /config and /status)
  476. s.mu.Lock()
  477. next := s.cfg
  478. if patch.FrequencyMHz != nil {
  479. next.FM.FrequencyMHz = *patch.FrequencyMHz
  480. }
  481. if patch.OutputDrive != nil {
  482. next.FM.OutputDrive = *patch.OutputDrive
  483. }
  484. if patch.ToneLeftHz != nil {
  485. next.Audio.ToneLeftHz = *patch.ToneLeftHz
  486. }
  487. if patch.ToneRightHz != nil {
  488. next.Audio.ToneRightHz = *patch.ToneRightHz
  489. }
  490. if patch.ToneAmplitude != nil {
  491. next.Audio.ToneAmplitude = *patch.ToneAmplitude
  492. }
  493. if patch.AudioGain != nil {
  494. next.Audio.Gain = *patch.AudioGain
  495. }
  496. if patch.PS != nil {
  497. next.RDS.PS = *patch.PS
  498. }
  499. if patch.RadioText != nil {
  500. next.RDS.RadioText = *patch.RadioText
  501. }
  502. if patch.PI != nil {
  503. next.RDS.PI = *patch.PI
  504. }
  505. if patch.PTY != nil {
  506. next.RDS.PTY = *patch.PTY
  507. }
  508. if patch.PreEmphasisTauUS != nil {
  509. next.FM.PreEmphasisTauUS = *patch.PreEmphasisTauUS
  510. }
  511. if patch.StereoEnabled != nil {
  512. next.FM.StereoEnabled = *patch.StereoEnabled
  513. }
  514. if patch.LimiterEnabled != nil {
  515. next.FM.LimiterEnabled = *patch.LimiterEnabled
  516. }
  517. if patch.LimiterCeiling != nil {
  518. next.FM.LimiterCeiling = *patch.LimiterCeiling
  519. }
  520. if patch.RDSEnabled != nil {
  521. next.RDS.Enabled = *patch.RDSEnabled
  522. }
  523. if patch.PilotLevel != nil {
  524. next.FM.PilotLevel = *patch.PilotLevel
  525. }
  526. if patch.RDSInjection != nil {
  527. next.FM.RDSInjection = *patch.RDSInjection
  528. }
  529. if patch.BS412Enabled != nil {
  530. next.FM.BS412Enabled = *patch.BS412Enabled
  531. }
  532. if patch.BS412ThresholdDBr != nil {
  533. next.FM.BS412ThresholdDBr = *patch.BS412ThresholdDBr
  534. }
  535. if patch.MpxGain != nil {
  536. next.FM.MpxGain = *patch.MpxGain
  537. }
  538. if err := next.Validate(); err != nil {
  539. s.mu.Unlock()
  540. http.Error(w, err.Error(), http.StatusBadRequest)
  541. return
  542. }
  543. lp := LivePatch{
  544. FrequencyMHz: patch.FrequencyMHz,
  545. OutputDrive: patch.OutputDrive,
  546. StereoEnabled: patch.StereoEnabled,
  547. PilotLevel: patch.PilotLevel,
  548. RDSInjection: patch.RDSInjection,
  549. RDSEnabled: patch.RDSEnabled,
  550. LimiterEnabled: patch.LimiterEnabled,
  551. LimiterCeiling: patch.LimiterCeiling,
  552. PS: patch.PS,
  553. RadioText: patch.RadioText,
  554. ToneLeftHz: patch.ToneLeftHz,
  555. ToneRightHz: patch.ToneRightHz,
  556. ToneAmplitude: patch.ToneAmplitude,
  557. AudioGain: patch.AudioGain,
  558. }
  559. // NEU-02 fix: determine whether any live-patchable fields are present,
  560. // then release the lock before calling UpdateConfig to avoid holding
  561. // s.mu across a potentially blocking engine call.
  562. tx := s.tx
  563. hasLiveFields := patch.FrequencyMHz != nil || patch.OutputDrive != nil ||
  564. patch.StereoEnabled != nil || patch.PilotLevel != nil ||
  565. patch.RDSInjection != nil || patch.RDSEnabled != nil ||
  566. patch.LimiterEnabled != nil || patch.LimiterCeiling != nil ||
  567. patch.PS != nil || patch.RadioText != nil ||
  568. patch.ToneLeftHz != nil || patch.ToneRightHz != nil ||
  569. patch.ToneAmplitude != nil || patch.AudioGain != nil
  570. s.cfg = next
  571. s.mu.Unlock()
  572. // Apply live fields to running engine outside the lock.
  573. var updateErr error
  574. if tx != nil && hasLiveFields {
  575. if err := tx.UpdateConfig(lp); err != nil {
  576. updateErr = err
  577. }
  578. }
  579. if updateErr != nil {
  580. http.Error(w, updateErr.Error(), http.StatusBadRequest)
  581. return
  582. }
  583. // NEU-03 fix: report live=true only when live-patchable fields were applied.
  584. live := tx != nil && hasLiveFields
  585. w.Header().Set("Content-Type", "application/json")
  586. _ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "live": live})
  587. default:
  588. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  589. }
  590. }
  591. func (s *Server) handleIngestSave(w http.ResponseWriter, r *http.Request) {
  592. if r.Method != http.MethodPost {
  593. s.recordAudit(auditMethodNotAllowed)
  594. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  595. return
  596. }
  597. if !isJSONContentType(r) {
  598. s.recordAudit(auditUnsupportedMediaType)
  599. http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
  600. return
  601. }
  602. r.Body = http.MaxBytesReader(w, r.Body, maxConfigBodyBytes)
  603. var req IngestSaveRequest
  604. if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
  605. statusCode := http.StatusBadRequest
  606. if strings.Contains(err.Error(), "http: request body too large") {
  607. statusCode = http.StatusRequestEntityTooLarge
  608. s.recordAudit(auditBodyTooLarge)
  609. }
  610. http.Error(w, err.Error(), statusCode)
  611. return
  612. }
  613. s.mu.Lock()
  614. next := s.cfg
  615. next.Ingest = req.Ingest
  616. if err := next.Validate(); err != nil {
  617. s.mu.Unlock()
  618. http.Error(w, err.Error(), http.StatusBadRequest)
  619. return
  620. }
  621. save := s.saveConfig
  622. reload := s.hardReload
  623. if save == nil {
  624. s.mu.Unlock()
  625. http.Error(w, "config save is not configured (start with --config <path>)", http.StatusServiceUnavailable)
  626. return
  627. }
  628. if err := save(next); err != nil {
  629. s.mu.Unlock()
  630. http.Error(w, err.Error(), http.StatusInternalServerError)
  631. return
  632. }
  633. s.cfg = next
  634. s.mu.Unlock()
  635. w.Header().Set("Content-Type", "application/json")
  636. reloadScheduled := reload != nil
  637. _ = json.NewEncoder(w).Encode(map[string]any{
  638. "ok": true,
  639. "saved": true,
  640. "reloadScheduled": reloadScheduled,
  641. })
  642. if reloadScheduled {
  643. go func(fn func()) {
  644. time.Sleep(250 * time.Millisecond)
  645. fn()
  646. }(reload)
  647. }
  648. }