Wideband autonomous SDR analysis engine forked from sdr-visual-suite
No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

966 líneas
20KB

  1. package telemetry
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "path/filepath"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. )
  16. type Config struct {
  17. Enabled bool `json:"enabled"`
  18. HeavyEnabled bool `json:"heavy_enabled"`
  19. HeavySampleEvery int `json:"heavy_sample_every"`
  20. MetricSampleEvery int `json:"metric_sample_every"`
  21. MetricHistoryMax int `json:"metric_history_max"`
  22. EventHistoryMax int `json:"event_history_max"`
  23. Retention time.Duration `json:"retention"`
  24. PersistEnabled bool `json:"persist_enabled"`
  25. PersistDir string `json:"persist_dir"`
  26. RotateMB int `json:"rotate_mb"`
  27. KeepFiles int `json:"keep_files"`
  28. }
  29. func DefaultConfig() Config {
  30. return Config{
  31. Enabled: true,
  32. HeavyEnabled: false,
  33. HeavySampleEvery: 12,
  34. MetricSampleEvery: 2,
  35. MetricHistoryMax: 12_000,
  36. EventHistoryMax: 4_000,
  37. Retention: 15 * time.Minute,
  38. PersistEnabled: false,
  39. PersistDir: "debug/telemetry",
  40. RotateMB: 16,
  41. KeepFiles: 8,
  42. }
  43. }
  44. type Tags map[string]string
  45. type MetricPoint struct {
  46. Timestamp time.Time `json:"ts"`
  47. Name string `json:"name"`
  48. Type string `json:"type"`
  49. Value float64 `json:"value"`
  50. Tags Tags `json:"tags,omitempty"`
  51. }
  52. type Event struct {
  53. ID uint64 `json:"id"`
  54. Timestamp time.Time `json:"ts"`
  55. Name string `json:"name"`
  56. Level string `json:"level"`
  57. Message string `json:"message,omitempty"`
  58. Tags Tags `json:"tags,omitempty"`
  59. Fields map[string]any `json:"fields,omitempty"`
  60. }
  61. type SeriesValue struct {
  62. Name string `json:"name"`
  63. Value float64 `json:"value"`
  64. Tags Tags `json:"tags,omitempty"`
  65. }
  66. type DistValue struct {
  67. Name string `json:"name"`
  68. Count int64 `json:"count"`
  69. Min float64 `json:"min"`
  70. Max float64 `json:"max"`
  71. Mean float64 `json:"mean"`
  72. Last float64 `json:"last"`
  73. P95 float64 `json:"p95"`
  74. Tags Tags `json:"tags,omitempty"`
  75. }
  76. type LiveSnapshot struct {
  77. Now time.Time `json:"now"`
  78. StartedAt time.Time `json:"started_at"`
  79. UptimeMs int64 `json:"uptime_ms"`
  80. Config Config `json:"config"`
  81. Counters []SeriesValue `json:"counters"`
  82. Gauges []SeriesValue `json:"gauges"`
  83. Distributions []DistValue `json:"distributions"`
  84. RecentEvents []Event `json:"recent_events"`
  85. Status map[string]any `json:"status,omitempty"`
  86. }
  87. type Query struct {
  88. From time.Time
  89. To time.Time
  90. Limit int
  91. Name string
  92. NamePrefix string
  93. Level string
  94. Tags Tags
  95. IncludePersisted bool
  96. }
  97. type collectorMetric struct {
  98. name string
  99. tags Tags
  100. value float64
  101. }
  102. type distMetric struct {
  103. name string
  104. tags Tags
  105. count int64
  106. sum float64
  107. min float64
  108. max float64
  109. last float64
  110. samples []float64
  111. next int
  112. full bool
  113. }
  114. type persistedEnvelope struct {
  115. Kind string `json:"kind"`
  116. Metric *MetricPoint `json:"metric,omitempty"`
  117. Event *Event `json:"event,omitempty"`
  118. }
  119. type Collector struct {
  120. mu sync.RWMutex
  121. cfg Config
  122. startedAt time.Time
  123. counterSeq uint64
  124. heavySeq uint64
  125. eventSeq uint64
  126. counters map[string]*collectorMetric
  127. gauges map[string]*collectorMetric
  128. dists map[string]*distMetric
  129. metricsHistory []MetricPoint
  130. events []Event
  131. status map[string]any
  132. writer *jsonlWriter
  133. }
  134. func New(cfg Config) (*Collector, error) {
  135. cfg = sanitizeConfig(cfg)
  136. c := &Collector{
  137. cfg: cfg,
  138. startedAt: time.Now().UTC(),
  139. counters: map[string]*collectorMetric{},
  140. gauges: map[string]*collectorMetric{},
  141. dists: map[string]*distMetric{},
  142. metricsHistory: make([]MetricPoint, 0, cfg.MetricHistoryMax),
  143. events: make([]Event, 0, cfg.EventHistoryMax),
  144. status: map[string]any{},
  145. }
  146. if cfg.PersistEnabled {
  147. writer, err := newJSONLWriter(cfg)
  148. if err != nil {
  149. return nil, err
  150. }
  151. c.writer = writer
  152. }
  153. return c, nil
  154. }
  155. func (c *Collector) Close() error {
  156. if c == nil {
  157. return nil
  158. }
  159. c.mu.Lock()
  160. writer := c.writer
  161. c.writer = nil
  162. c.mu.Unlock()
  163. if writer != nil {
  164. return writer.Close()
  165. }
  166. return nil
  167. }
  168. func (c *Collector) Configure(cfg Config) error {
  169. if c == nil {
  170. return nil
  171. }
  172. cfg = sanitizeConfig(cfg)
  173. var writer *jsonlWriter
  174. var err error
  175. if cfg.PersistEnabled {
  176. writer, err = newJSONLWriter(cfg)
  177. if err != nil {
  178. return err
  179. }
  180. }
  181. c.mu.Lock()
  182. old := c.writer
  183. c.cfg = cfg
  184. c.writer = writer
  185. c.trimLocked(time.Now().UTC())
  186. c.mu.Unlock()
  187. if old != nil {
  188. _ = old.Close()
  189. }
  190. return nil
  191. }
  192. func (c *Collector) Config() Config {
  193. c.mu.RLock()
  194. defer c.mu.RUnlock()
  195. return c.cfg
  196. }
  197. func (c *Collector) Enabled() bool {
  198. if c == nil {
  199. return false
  200. }
  201. c.mu.RLock()
  202. defer c.mu.RUnlock()
  203. return c.cfg.Enabled
  204. }
  205. func (c *Collector) ShouldSampleHeavy() bool {
  206. if c == nil {
  207. return false
  208. }
  209. c.mu.RLock()
  210. cfg := c.cfg
  211. c.mu.RUnlock()
  212. if !cfg.Enabled || !cfg.HeavyEnabled {
  213. return false
  214. }
  215. n := cfg.HeavySampleEvery
  216. if n <= 1 {
  217. return true
  218. }
  219. seq := atomic.AddUint64(&c.heavySeq, 1)
  220. return seq%uint64(n) == 0
  221. }
  222. func (c *Collector) SetStatus(key string, value any) {
  223. if c == nil {
  224. return
  225. }
  226. c.mu.Lock()
  227. c.status[key] = value
  228. c.mu.Unlock()
  229. }
  230. func (c *Collector) IncCounter(name string, delta float64, tags Tags) {
  231. c.recordMetric("counter", name, delta, tags, true)
  232. }
  233. func (c *Collector) SetGauge(name string, value float64, tags Tags) {
  234. c.recordMetric("gauge", name, value, tags, false)
  235. }
  236. func (c *Collector) Observe(name string, value float64, tags Tags) {
  237. c.recordMetric("distribution", name, value, tags, false)
  238. }
  239. func (c *Collector) Event(name string, level string, message string, tags Tags, fields map[string]any) {
  240. if c == nil {
  241. return
  242. }
  243. now := time.Now().UTC()
  244. c.mu.Lock()
  245. if !c.cfg.Enabled {
  246. c.mu.Unlock()
  247. return
  248. }
  249. ev := Event{
  250. ID: atomic.AddUint64(&c.eventSeq, 1),
  251. Timestamp: now,
  252. Name: name,
  253. Level: strings.TrimSpace(strings.ToLower(level)),
  254. Message: message,
  255. Tags: cloneTags(tags),
  256. Fields: cloneFields(fields),
  257. }
  258. if ev.Level == "" {
  259. ev.Level = "info"
  260. }
  261. c.events = append(c.events, ev)
  262. c.trimLocked(now)
  263. writer := c.writer
  264. c.mu.Unlock()
  265. if writer != nil {
  266. _ = writer.Write(persistedEnvelope{Kind: "event", Event: &ev})
  267. }
  268. }
  269. func (c *Collector) recordMetric(kind string, name string, value float64, tags Tags, add bool) {
  270. if c == nil || strings.TrimSpace(name) == "" {
  271. return
  272. }
  273. now := time.Now().UTC()
  274. c.mu.Lock()
  275. if !c.cfg.Enabled {
  276. c.mu.Unlock()
  277. return
  278. }
  279. key := metricKey(name, tags)
  280. switch kind {
  281. case "counter":
  282. m := c.counters[key]
  283. if m == nil {
  284. m = &collectorMetric{name: name, tags: cloneTags(tags)}
  285. c.counters[key] = m
  286. }
  287. if add {
  288. m.value += value
  289. } else {
  290. m.value = value
  291. }
  292. case "gauge":
  293. m := c.gauges[key]
  294. if m == nil {
  295. m = &collectorMetric{name: name, tags: cloneTags(tags)}
  296. c.gauges[key] = m
  297. }
  298. m.value = value
  299. case "distribution":
  300. d := c.dists[key]
  301. if d == nil {
  302. d = &distMetric{
  303. name: name,
  304. tags: cloneTags(tags),
  305. min: value,
  306. max: value,
  307. samples: make([]float64, 64),
  308. }
  309. c.dists[key] = d
  310. }
  311. d.count++
  312. d.sum += value
  313. d.last = value
  314. if d.count == 1 || value < d.min {
  315. d.min = value
  316. }
  317. if d.count == 1 || value > d.max {
  318. d.max = value
  319. }
  320. if len(d.samples) > 0 {
  321. d.samples[d.next] = value
  322. d.next++
  323. if d.next >= len(d.samples) {
  324. d.next = 0
  325. d.full = true
  326. }
  327. }
  328. }
  329. sampleN := c.cfg.MetricSampleEvery
  330. seq := atomic.AddUint64(&c.counterSeq, 1)
  331. shouldStore := sampleN <= 1 || seq%uint64(sampleN) == 0 || kind == "counter"
  332. var mp MetricPoint
  333. if shouldStore {
  334. mp = MetricPoint{
  335. Timestamp: now,
  336. Name: name,
  337. Type: kind,
  338. Value: value,
  339. Tags: cloneTags(tags),
  340. }
  341. c.metricsHistory = append(c.metricsHistory, mp)
  342. }
  343. c.trimLocked(now)
  344. writer := c.writer
  345. c.mu.Unlock()
  346. if writer != nil && shouldStore {
  347. _ = writer.Write(persistedEnvelope{Kind: "metric", Metric: &mp})
  348. }
  349. }
  350. func (c *Collector) LiveSnapshot() LiveSnapshot {
  351. now := time.Now().UTC()
  352. c.mu.RLock()
  353. cfg := c.cfg
  354. out := LiveSnapshot{
  355. Now: now,
  356. StartedAt: c.startedAt,
  357. UptimeMs: now.Sub(c.startedAt).Milliseconds(),
  358. Config: cfg,
  359. Counters: make([]SeriesValue, 0, len(c.counters)),
  360. Gauges: make([]SeriesValue, 0, len(c.gauges)),
  361. Distributions: make([]DistValue, 0, len(c.dists)),
  362. RecentEvents: make([]Event, 0, min(40, len(c.events))),
  363. Status: cloneFields(c.status),
  364. }
  365. for _, m := range c.counters {
  366. out.Counters = append(out.Counters, SeriesValue{Name: m.name, Value: m.value, Tags: cloneTags(m.tags)})
  367. }
  368. for _, m := range c.gauges {
  369. out.Gauges = append(out.Gauges, SeriesValue{Name: m.name, Value: m.value, Tags: cloneTags(m.tags)})
  370. }
  371. for _, d := range c.dists {
  372. mean := 0.0
  373. if d.count > 0 {
  374. mean = d.sum / float64(d.count)
  375. }
  376. out.Distributions = append(out.Distributions, DistValue{
  377. Name: d.name,
  378. Count: d.count,
  379. Min: d.min,
  380. Max: d.max,
  381. Mean: mean,
  382. Last: d.last,
  383. P95: p95FromDist(d),
  384. Tags: cloneTags(d.tags),
  385. })
  386. }
  387. start := len(c.events) - cap(out.RecentEvents)
  388. if start < 0 {
  389. start = 0
  390. }
  391. for _, ev := range c.events[start:] {
  392. out.RecentEvents = append(out.RecentEvents, copyEvent(ev))
  393. }
  394. c.mu.RUnlock()
  395. sort.Slice(out.Counters, func(i, j int) bool { return out.Counters[i].Name < out.Counters[j].Name })
  396. sort.Slice(out.Gauges, func(i, j int) bool { return out.Gauges[i].Name < out.Gauges[j].Name })
  397. sort.Slice(out.Distributions, func(i, j int) bool { return out.Distributions[i].Name < out.Distributions[j].Name })
  398. return out
  399. }
  400. func (c *Collector) QueryMetrics(q Query) ([]MetricPoint, error) {
  401. if c == nil {
  402. return nil, nil
  403. }
  404. q = normalizeQuery(q)
  405. c.mu.RLock()
  406. items := make([]MetricPoint, 0, len(c.metricsHistory))
  407. for _, m := range c.metricsHistory {
  408. if metricMatch(m, q) {
  409. items = append(items, copyMetric(m))
  410. }
  411. }
  412. cfg := c.cfg
  413. c.mu.RUnlock()
  414. if q.IncludePersisted && cfg.PersistEnabled {
  415. persisted, err := readPersistedMetrics(cfg, q)
  416. if err != nil && !errors.Is(err, os.ErrNotExist) {
  417. return nil, err
  418. }
  419. items = append(items, persisted...)
  420. }
  421. sort.Slice(items, func(i, j int) bool {
  422. return items[i].Timestamp.Before(items[j].Timestamp)
  423. })
  424. if q.Limit > 0 && len(items) > q.Limit {
  425. items = items[len(items)-q.Limit:]
  426. }
  427. return items, nil
  428. }
  429. func (c *Collector) QueryEvents(q Query) ([]Event, error) {
  430. if c == nil {
  431. return nil, nil
  432. }
  433. q = normalizeQuery(q)
  434. c.mu.RLock()
  435. items := make([]Event, 0, len(c.events))
  436. for _, ev := range c.events {
  437. if eventMatch(ev, q) {
  438. items = append(items, copyEvent(ev))
  439. }
  440. }
  441. cfg := c.cfg
  442. c.mu.RUnlock()
  443. if q.IncludePersisted && cfg.PersistEnabled {
  444. persisted, err := readPersistedEvents(cfg, q)
  445. if err != nil && !errors.Is(err, os.ErrNotExist) {
  446. return nil, err
  447. }
  448. items = append(items, persisted...)
  449. }
  450. sort.Slice(items, func(i, j int) bool {
  451. return items[i].Timestamp.Before(items[j].Timestamp)
  452. })
  453. if q.Limit > 0 && len(items) > q.Limit {
  454. items = items[len(items)-q.Limit:]
  455. }
  456. return items, nil
  457. }
  458. func (c *Collector) trimLocked(now time.Time) {
  459. if c.cfg.MetricHistoryMax > 0 && len(c.metricsHistory) > c.cfg.MetricHistoryMax {
  460. c.metricsHistory = append([]MetricPoint(nil), c.metricsHistory[len(c.metricsHistory)-c.cfg.MetricHistoryMax:]...)
  461. }
  462. if c.cfg.EventHistoryMax > 0 && len(c.events) > c.cfg.EventHistoryMax {
  463. c.events = append([]Event(nil), c.events[len(c.events)-c.cfg.EventHistoryMax:]...)
  464. }
  465. ret := c.cfg.Retention
  466. if ret <= 0 {
  467. return
  468. }
  469. cut := now.Add(-ret)
  470. mStart := 0
  471. for mStart < len(c.metricsHistory) && c.metricsHistory[mStart].Timestamp.Before(cut) {
  472. mStart++
  473. }
  474. if mStart > 0 {
  475. c.metricsHistory = append([]MetricPoint(nil), c.metricsHistory[mStart:]...)
  476. }
  477. eStart := 0
  478. for eStart < len(c.events) && c.events[eStart].Timestamp.Before(cut) {
  479. eStart++
  480. }
  481. if eStart > 0 {
  482. c.events = append([]Event(nil), c.events[eStart:]...)
  483. }
  484. }
  485. func sanitizeConfig(cfg Config) Config {
  486. def := DefaultConfig()
  487. if cfg.HeavySampleEvery <= 0 {
  488. cfg.HeavySampleEvery = def.HeavySampleEvery
  489. }
  490. if cfg.MetricSampleEvery <= 0 {
  491. cfg.MetricSampleEvery = def.MetricSampleEvery
  492. }
  493. if cfg.MetricHistoryMax <= 0 {
  494. cfg.MetricHistoryMax = def.MetricHistoryMax
  495. }
  496. if cfg.EventHistoryMax <= 0 {
  497. cfg.EventHistoryMax = def.EventHistoryMax
  498. }
  499. if cfg.Retention <= 0 {
  500. cfg.Retention = def.Retention
  501. }
  502. if strings.TrimSpace(cfg.PersistDir) == "" {
  503. cfg.PersistDir = def.PersistDir
  504. }
  505. if cfg.RotateMB <= 0 {
  506. cfg.RotateMB = def.RotateMB
  507. }
  508. if cfg.KeepFiles <= 0 {
  509. cfg.KeepFiles = def.KeepFiles
  510. }
  511. return cfg
  512. }
  513. func normalizeQuery(q Query) Query {
  514. if q.Limit <= 0 || q.Limit > 5000 {
  515. q.Limit = 500
  516. }
  517. if q.Tags == nil {
  518. q.Tags = Tags{}
  519. }
  520. return q
  521. }
  522. func metricMatch(m MetricPoint, q Query) bool {
  523. if !q.From.IsZero() && m.Timestamp.Before(q.From) {
  524. return false
  525. }
  526. if !q.To.IsZero() && m.Timestamp.After(q.To) {
  527. return false
  528. }
  529. if q.Name != "" && m.Name != q.Name {
  530. return false
  531. }
  532. if q.NamePrefix != "" && !strings.HasPrefix(m.Name, q.NamePrefix) {
  533. return false
  534. }
  535. for k, v := range q.Tags {
  536. if m.Tags[k] != v {
  537. return false
  538. }
  539. }
  540. return true
  541. }
  542. func eventMatch(ev Event, q Query) bool {
  543. if !q.From.IsZero() && ev.Timestamp.Before(q.From) {
  544. return false
  545. }
  546. if !q.To.IsZero() && ev.Timestamp.After(q.To) {
  547. return false
  548. }
  549. if q.Name != "" && ev.Name != q.Name {
  550. return false
  551. }
  552. if q.NamePrefix != "" && !strings.HasPrefix(ev.Name, q.NamePrefix) {
  553. return false
  554. }
  555. if q.Level != "" && !strings.EqualFold(q.Level, ev.Level) {
  556. return false
  557. }
  558. for k, v := range q.Tags {
  559. if ev.Tags[k] != v {
  560. return false
  561. }
  562. }
  563. return true
  564. }
  565. func metricKey(name string, tags Tags) string {
  566. if len(tags) == 0 {
  567. return name
  568. }
  569. keys := make([]string, 0, len(tags))
  570. for k := range tags {
  571. keys = append(keys, k)
  572. }
  573. sort.Strings(keys)
  574. var b strings.Builder
  575. b.Grow(len(name) + len(keys)*16)
  576. b.WriteString(name)
  577. for _, k := range keys {
  578. b.WriteString("|")
  579. b.WriteString(k)
  580. b.WriteString("=")
  581. b.WriteString(tags[k])
  582. }
  583. return b.String()
  584. }
  585. func cloneTags(tags Tags) Tags {
  586. if len(tags) == 0 {
  587. return nil
  588. }
  589. out := make(Tags, len(tags))
  590. for k, v := range tags {
  591. out[k] = v
  592. }
  593. return out
  594. }
  595. func cloneFields(fields map[string]any) map[string]any {
  596. if len(fields) == 0 {
  597. return nil
  598. }
  599. out := make(map[string]any, len(fields))
  600. for k, v := range fields {
  601. out[k] = v
  602. }
  603. return out
  604. }
  605. func copyMetric(m MetricPoint) MetricPoint {
  606. return MetricPoint{
  607. Timestamp: m.Timestamp,
  608. Name: m.Name,
  609. Type: m.Type,
  610. Value: m.Value,
  611. Tags: cloneTags(m.Tags),
  612. }
  613. }
  614. func copyEvent(ev Event) Event {
  615. return Event{
  616. ID: ev.ID,
  617. Timestamp: ev.Timestamp,
  618. Name: ev.Name,
  619. Level: ev.Level,
  620. Message: ev.Message,
  621. Tags: cloneTags(ev.Tags),
  622. Fields: cloneFields(ev.Fields),
  623. }
  624. }
  625. func p95FromDist(d *distMetric) float64 {
  626. if d == nil || d.count == 0 {
  627. return 0
  628. }
  629. n := d.next
  630. if d.full {
  631. n = len(d.samples)
  632. }
  633. if n <= 0 {
  634. return d.last
  635. }
  636. buf := make([]float64, n)
  637. copy(buf, d.samples[:n])
  638. sort.Float64s(buf)
  639. idx := int(float64(n-1) * 0.95)
  640. if idx < 0 {
  641. idx = 0
  642. }
  643. if idx >= n {
  644. idx = n - 1
  645. }
  646. return buf[idx]
  647. }
  648. type jsonlWriter struct {
  649. cfg Config
  650. mu sync.Mutex
  651. dir string
  652. f *os.File
  653. w *bufio.Writer
  654. currentPath string
  655. currentSize int64
  656. seq int64
  657. }
  658. func newJSONLWriter(cfg Config) (*jsonlWriter, error) {
  659. dir := filepath.Clean(cfg.PersistDir)
  660. if err := os.MkdirAll(dir, 0o755); err != nil {
  661. return nil, err
  662. }
  663. w := &jsonlWriter{cfg: cfg, dir: dir}
  664. if err := w.rotateLocked(); err != nil {
  665. return nil, err
  666. }
  667. return w, nil
  668. }
  669. func (w *jsonlWriter) Write(v persistedEnvelope) error {
  670. w.mu.Lock()
  671. defer w.mu.Unlock()
  672. if w.f == nil || w.w == nil {
  673. return nil
  674. }
  675. line, err := json.Marshal(v)
  676. if err != nil {
  677. return err
  678. }
  679. line = append(line, '\n')
  680. if w.currentSize+int64(len(line)) > int64(w.cfg.RotateMB)*1024*1024 {
  681. if err := w.rotateLocked(); err != nil {
  682. return err
  683. }
  684. }
  685. n, err := w.w.Write(line)
  686. w.currentSize += int64(n)
  687. if err != nil {
  688. return err
  689. }
  690. return w.w.Flush()
  691. }
  692. func (w *jsonlWriter) Close() error {
  693. w.mu.Lock()
  694. defer w.mu.Unlock()
  695. if w.w != nil {
  696. _ = w.w.Flush()
  697. }
  698. if w.f != nil {
  699. err := w.f.Close()
  700. w.f = nil
  701. w.w = nil
  702. return err
  703. }
  704. return nil
  705. }
  706. func (w *jsonlWriter) rotateLocked() error {
  707. if w.w != nil {
  708. _ = w.w.Flush()
  709. }
  710. if w.f != nil {
  711. _ = w.f.Close()
  712. }
  713. w.seq++
  714. name := fmt.Sprintf("telemetry-%s-%04d.jsonl", time.Now().UTC().Format("20060102-150405"), w.seq)
  715. path := filepath.Join(w.dir, name)
  716. f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  717. if err != nil {
  718. return err
  719. }
  720. info, _ := f.Stat()
  721. size := int64(0)
  722. if info != nil {
  723. size = info.Size()
  724. }
  725. w.f = f
  726. w.w = bufio.NewWriterSize(f, 64*1024)
  727. w.currentPath = path
  728. w.currentSize = size
  729. _ = pruneFiles(w.dir, w.cfg.KeepFiles)
  730. return nil
  731. }
  732. func pruneFiles(dir string, keep int) error {
  733. if keep <= 0 {
  734. return nil
  735. }
  736. ents, err := os.ReadDir(dir)
  737. if err != nil {
  738. return err
  739. }
  740. files := make([]string, 0, len(ents))
  741. for _, ent := range ents {
  742. if ent.IsDir() {
  743. continue
  744. }
  745. name := ent.Name()
  746. if !strings.HasPrefix(name, "telemetry-") || !strings.HasSuffix(name, ".jsonl") {
  747. continue
  748. }
  749. files = append(files, filepath.Join(dir, name))
  750. }
  751. if len(files) <= keep {
  752. return nil
  753. }
  754. sort.Strings(files)
  755. for _, path := range files[:len(files)-keep] {
  756. _ = os.Remove(path)
  757. }
  758. return nil
  759. }
  760. func readPersistedMetrics(cfg Config, q Query) ([]MetricPoint, error) {
  761. files, err := listPersistedFiles(cfg.PersistDir)
  762. if err != nil {
  763. return nil, err
  764. }
  765. out := make([]MetricPoint, 0, 256)
  766. for _, path := range files {
  767. points, err := parsePersistedFile(path, q)
  768. if err != nil {
  769. continue
  770. }
  771. for _, p := range points.metrics {
  772. if metricMatch(p, q) {
  773. out = append(out, p)
  774. }
  775. }
  776. }
  777. return out, nil
  778. }
  779. func readPersistedEvents(cfg Config, q Query) ([]Event, error) {
  780. files, err := listPersistedFiles(cfg.PersistDir)
  781. if err != nil {
  782. return nil, err
  783. }
  784. out := make([]Event, 0, 128)
  785. for _, path := range files {
  786. points, err := parsePersistedFile(path, q)
  787. if err != nil {
  788. continue
  789. }
  790. for _, ev := range points.events {
  791. if eventMatch(ev, q) {
  792. out = append(out, ev)
  793. }
  794. }
  795. }
  796. return out, nil
  797. }
  798. type parsedFile struct {
  799. metrics []MetricPoint
  800. events []Event
  801. }
  802. func parsePersistedFile(path string, q Query) (parsedFile, error) {
  803. f, err := os.Open(path)
  804. if err != nil {
  805. return parsedFile{}, err
  806. }
  807. defer f.Close()
  808. out := parsedFile{
  809. metrics: make([]MetricPoint, 0, 64),
  810. events: make([]Event, 0, 32),
  811. }
  812. s := bufio.NewScanner(f)
  813. s.Buffer(make([]byte, 0, 32*1024), 1024*1024)
  814. for s.Scan() {
  815. line := s.Bytes()
  816. if len(line) == 0 {
  817. continue
  818. }
  819. var env persistedEnvelope
  820. if err := json.Unmarshal(line, &env); err != nil {
  821. continue
  822. }
  823. if env.Metric != nil {
  824. out.metrics = append(out.metrics, *env.Metric)
  825. } else if env.Event != nil {
  826. out.events = append(out.events, *env.Event)
  827. }
  828. if q.Limit > 0 && len(out.metrics)+len(out.events) > q.Limit*2 {
  829. // keep bounded while scanning
  830. if len(out.metrics) > q.Limit {
  831. out.metrics = out.metrics[len(out.metrics)-q.Limit:]
  832. }
  833. if len(out.events) > q.Limit {
  834. out.events = out.events[len(out.events)-q.Limit:]
  835. }
  836. }
  837. }
  838. return out, s.Err()
  839. }
  840. func listPersistedFiles(dir string) ([]string, error) {
  841. ents, err := os.ReadDir(dir)
  842. if err != nil {
  843. return nil, err
  844. }
  845. files := make([]string, 0, len(ents))
  846. for _, ent := range ents {
  847. if ent.IsDir() {
  848. continue
  849. }
  850. name := ent.Name()
  851. if strings.HasPrefix(name, "telemetry-") && strings.HasSuffix(name, ".jsonl") {
  852. files = append(files, filepath.Join(dir, name))
  853. }
  854. }
  855. sort.Strings(files)
  856. return files, nil
  857. }
  858. func ParseTimeQuery(raw string) (time.Time, error) {
  859. raw = strings.TrimSpace(raw)
  860. if raw == "" {
  861. return time.Time{}, nil
  862. }
  863. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  864. if ms > 1e12 {
  865. return time.UnixMilli(ms).UTC(), nil
  866. }
  867. return time.Unix(ms, 0).UTC(), nil
  868. }
  869. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  870. return t.UTC(), nil
  871. }
  872. if t, err := time.Parse(time.RFC3339, raw); err == nil {
  873. return t.UTC(), nil
  874. }
  875. return time.Time{}, errors.New("invalid time query")
  876. }
  877. func TagsWith(base Tags, key string, value any) Tags {
  878. out := cloneTags(base)
  879. if out == nil {
  880. out = Tags{}
  881. }
  882. out[key] = fmt.Sprint(value)
  883. return out
  884. }
  885. func TagsFromPairs(kv ...string) Tags {
  886. if len(kv) < 2 {
  887. return nil
  888. }
  889. out := Tags{}
  890. for i := 0; i+1 < len(kv); i += 2 {
  891. k := strings.TrimSpace(kv[i])
  892. if k == "" {
  893. continue
  894. }
  895. out[k] = kv[i+1]
  896. }
  897. if len(out) == 0 {
  898. return nil
  899. }
  900. return out
  901. }
  902. func min(a int, b int) int {
  903. if a < b {
  904. return a
  905. }
  906. return b
  907. }