Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You can't select more than 25 topics. Topics must start with a letter or number, can include hyphens (-), and can be up to 35 characters long.

967 lines
20KB

  1. package telemetry
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "path/filepath"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. )
// Config controls what the telemetry collector records, how much history it
// keeps in memory, and whether/how records are persisted to disk as JSONL.
type Config struct {
	Enabled bool `json:"enabled"`                 // master switch; when false nothing is recorded
	HeavyEnabled bool `json:"heavy_enabled"`      // gates expensive sampling (see ShouldSampleHeavy)
	HeavySampleEvery int `json:"heavy_sample_every"` // keep roughly 1 of every N heavy samples
	MetricSampleEvery int `json:"metric_sample_every"` // store roughly 1 of every N non-counter metric points
	MetricHistoryMax int `json:"metric_history_max"` // cap on in-memory metric history length
	EventHistoryMax int `json:"event_history_max"`   // cap on in-memory event history length
	Retention time.Duration `json:"retention"`    // drop in-memory records older than this
	PersistEnabled bool `json:"persist_enabled"`  // when true, also append records to JSONL files
	PersistDir string `json:"persist_dir"`        // directory for persisted telemetry files
	RotateMB int `json:"rotate_mb"`               // rotate the JSONL file when it would exceed this size
	KeepFiles int `json:"keep_files"`             // number of rotated files to retain on disk
}
  29. func DefaultConfig() Config {
  30. return Config{
  31. Enabled: true,
  32. HeavyEnabled: false,
  33. HeavySampleEvery: 12,
  34. MetricSampleEvery: 2,
  35. MetricHistoryMax: 12_000,
  36. EventHistoryMax: 4_000,
  37. Retention: 15 * time.Minute,
  38. PersistEnabled: false,
  39. PersistDir: "debug/telemetry",
  40. RotateMB: 16,
  41. KeepFiles: 8,
  42. }
  43. }
// Tags is a small set of string key/value labels attached to metrics and events.
type Tags map[string]string

// MetricPoint is one sampled metric observation kept in history and
// optionally persisted to disk.
type MetricPoint struct {
	Timestamp time.Time `json:"ts"`
	Name string `json:"name"`
	Type string `json:"type"` // "counter", "gauge", or "distribution"
	Value float64 `json:"value"`
	Tags Tags `json:"tags,omitempty"`
}

// Event is a discrete, levelled occurrence with an ID that increases
// monotonically for the lifetime of a Collector.
type Event struct {
	ID uint64 `json:"id"`
	Timestamp time.Time `json:"ts"`
	Name string `json:"name"`
	Level string `json:"level"` // lowercased; defaults to "info" when empty
	Message string `json:"message,omitempty"`
	Tags Tags `json:"tags,omitempty"`
	Fields map[string]any `json:"fields,omitempty"`
}

// SeriesValue is the snapshot form of a single counter or gauge series.
type SeriesValue struct {
	Name string `json:"name"`
	Value float64 `json:"value"`
	Tags Tags `json:"tags,omitempty"`
}

// DistValue is the snapshot summary of one distribution series.
type DistValue struct {
	Name string `json:"name"`
	Count int64 `json:"count"`
	Min float64 `json:"min"`
	Max float64 `json:"max"`
	Mean float64 `json:"mean"`
	Last float64 `json:"last"`
	P95 float64 `json:"p95"` // estimated from a ring buffer of recent samples
	Tags Tags `json:"tags,omitempty"`
}

// LiveSnapshot is a point-in-time view of the collector's state, as
// produced by Collector.LiveSnapshot.
type LiveSnapshot struct {
	Now time.Time `json:"now"`
	StartedAt time.Time `json:"started_at"`
	UptimeMs int64 `json:"uptime_ms"`
	Config Config `json:"config"`
	Counters []SeriesValue `json:"counters"`
	Gauges []SeriesValue `json:"gauges"`
	Distributions []DistValue `json:"distributions"`
	RecentEvents []Event `json:"recent_events"`
	Status map[string]any `json:"status,omitempty"`
}

// Query filters metric/event history lookups. Zero-value fields are
// ignored; all set fields must match.
type Query struct {
	From time.Time // inclusive lower bound on timestamp (ignored when zero)
	To time.Time   // inclusive upper bound on timestamp (ignored when zero)
	Limit int      // max results returned, keeping the newest; normalized to 500 when out of range
	Name string    // exact name match
	NamePrefix string // name prefix match
	Level string   // event level match (case-insensitive; events only)
	Tags Tags      // every key/value must match the record's tags
	IncludePersisted bool // when true, also scan persisted JSONL files
}
// collectorMetric is the in-memory state of one counter or gauge series,
// keyed by name plus sorted tags.
type collectorMetric struct {
	name string
	tags Tags
	value float64
}

// distMetric accumulates one distribution series: running count/sum/min/
// max/last plus a fixed-size ring buffer of recent samples used to
// estimate the p95.
type distMetric struct {
	name string
	tags Tags
	count int64
	sum float64
	min float64
	max float64
	last float64
	samples []float64 // ring buffer of recent observations
	next int          // next write index into samples
	full bool         // set once the ring has wrapped at least once
}

// persistedEnvelope is the on-disk JSONL record. Exactly one of Metric or
// Event is non-nil, discriminated by Kind ("metric" or "event").
type persistedEnvelope struct {
	Kind string `json:"kind"`
	Metric *MetricPoint `json:"metric,omitempty"`
	Event *Event `json:"event,omitempty"`
}
// Collector aggregates counters, gauges, distributions, and events in
// memory and optionally streams them to rotated JSONL files. All methods
// are safe for concurrent use; most are also safe on a nil receiver.
type Collector struct {
	mu sync.RWMutex // guards all fields below; the *Seq fields are additionally updated atomically
	cfg Config
	startedAt time.Time
	counterSeq uint64 // atomic; drives metric-history sampling in recordMetric
	heavySeq uint64   // atomic; drives ShouldSampleHeavy sampling
	eventSeq uint64   // atomic; source of monotonically increasing event IDs
	counters map[string]*collectorMetric
	gauges map[string]*collectorMetric
	dists map[string]*distMetric
	metricsHistory []MetricPoint // sampled points, oldest first
	events []Event               // recorded events, oldest first
	status map[string]any        // free-form status fields set via SetStatus
	writer *jsonlWriter          // non-nil only while persistence is enabled
}
  134. func New(cfg Config) (*Collector, error) {
  135. cfg = sanitizeConfig(cfg)
  136. c := &Collector{
  137. cfg: cfg,
  138. startedAt: time.Now().UTC(),
  139. counters: map[string]*collectorMetric{},
  140. gauges: map[string]*collectorMetric{},
  141. dists: map[string]*distMetric{},
  142. metricsHistory: make([]MetricPoint, 0, cfg.MetricHistoryMax),
  143. events: make([]Event, 0, cfg.EventHistoryMax),
  144. status: map[string]any{},
  145. }
  146. if cfg.PersistEnabled {
  147. writer, err := newJSONLWriter(cfg)
  148. if err != nil {
  149. return nil, err
  150. }
  151. c.writer = writer
  152. }
  153. return c, nil
  154. }
  155. func (c *Collector) Close() error {
  156. if c == nil {
  157. return nil
  158. }
  159. c.mu.Lock()
  160. writer := c.writer
  161. c.writer = nil
  162. c.mu.Unlock()
  163. if writer != nil {
  164. return writer.Close()
  165. }
  166. return nil
  167. }
  168. func (c *Collector) Configure(cfg Config) error {
  169. if c == nil {
  170. return nil
  171. }
  172. cfg = sanitizeConfig(cfg)
  173. var writer *jsonlWriter
  174. var err error
  175. if cfg.PersistEnabled {
  176. writer, err = newJSONLWriter(cfg)
  177. if err != nil {
  178. return err
  179. }
  180. }
  181. c.mu.Lock()
  182. old := c.writer
  183. c.cfg = cfg
  184. c.writer = writer
  185. c.trimLocked(time.Now().UTC())
  186. c.mu.Unlock()
  187. if old != nil {
  188. _ = old.Close()
  189. }
  190. return nil
  191. }
  192. func (c *Collector) Config() Config {
  193. c.mu.RLock()
  194. defer c.mu.RUnlock()
  195. return c.cfg
  196. }
  197. func (c *Collector) Enabled() bool {
  198. if c == nil {
  199. return false
  200. }
  201. c.mu.RLock()
  202. defer c.mu.RUnlock()
  203. return c.cfg.Enabled
  204. }
  205. func (c *Collector) ShouldSampleHeavy() bool {
  206. if c == nil {
  207. return false
  208. }
  209. c.mu.RLock()
  210. cfg := c.cfg
  211. c.mu.RUnlock()
  212. if !cfg.Enabled || !cfg.HeavyEnabled {
  213. return false
  214. }
  215. n := cfg.HeavySampleEvery
  216. if n <= 1 {
  217. return true
  218. }
  219. seq := atomic.AddUint64(&c.heavySeq, 1)
  220. return seq%uint64(n) == 0
  221. }
  222. func (c *Collector) SetStatus(key string, value any) {
  223. if c == nil {
  224. return
  225. }
  226. c.mu.Lock()
  227. c.status[key] = value
  228. c.mu.Unlock()
  229. }
// IncCounter adds delta to the counter series identified by name+tags.
func (c *Collector) IncCounter(name string, delta float64, tags Tags) {
	c.recordMetric("counter", name, delta, tags, true)
}

// SetGauge sets the gauge series identified by name+tags to value.
func (c *Collector) SetGauge(name string, value float64, tags Tags) {
	c.recordMetric("gauge", name, value, tags, false)
}

// Observe records one sample into the distribution identified by name+tags.
func (c *Collector) Observe(name string, value float64, tags Tags) {
	c.recordMetric("distribution", name, value, tags, false)
}
// Event records a named event with a level (lowercased; "info" when
// empty), message, tags, and free-form fields. It is a no-op on a nil or
// disabled collector. The in-memory history is trimmed under the lock,
// while the optional disk write happens outside it.
func (c *Collector) Event(name string, level string, message string, tags Tags, fields map[string]any) {
	if c == nil {
		return
	}
	now := time.Now().UTC()
	c.mu.Lock()
	if !c.cfg.Enabled {
		c.mu.Unlock()
		return
	}
	ev := Event{
		ID: atomic.AddUint64(&c.eventSeq, 1),
		Timestamp: now,
		Name: name,
		Level: strings.TrimSpace(strings.ToLower(level)),
		Message: message,
		Tags: cloneTags(tags),       // defensive copy; caller may reuse the map
		Fields: cloneFields(fields), // shallow copy of the fields map
	}
	if ev.Level == "" {
		ev.Level = "info"
	}
	c.events = append(c.events, ev)
	c.trimLocked(now)
	writer := c.writer // snapshot so the disk write happens outside the lock
	c.mu.Unlock()
	if writer != nil {
		// Best-effort persistence; a failed write never affects the caller.
		_ = writer.Write(persistedEnvelope{Kind: "event", Event: &ev})
	}
}
// recordMetric is the shared implementation behind IncCounter, SetGauge,
// and Observe. It updates the live series for the given kind, then decides
// whether to also store a MetricPoint in history (and on disk): counters
// and force-stored names are always kept, other kinds are downsampled to
// roughly 1 in MetricSampleEvery. No-op on a nil/disabled collector or a
// blank name.
func (c *Collector) recordMetric(kind string, name string, value float64, tags Tags, add bool) {
	if c == nil || strings.TrimSpace(name) == "" {
		return
	}
	now := time.Now().UTC()
	c.mu.Lock()
	if !c.cfg.Enabled {
		c.mu.Unlock()
		return
	}
	// Series identity is name plus sorted tags.
	key := metricKey(name, tags)
	switch kind {
	case "counter":
		m := c.counters[key]
		if m == nil {
			m = &collectorMetric{name: name, tags: cloneTags(tags)}
			c.counters[key] = m
		}
		if add {
			m.value += value
		} else {
			m.value = value
		}
	case "gauge":
		m := c.gauges[key]
		if m == nil {
			m = &collectorMetric{name: name, tags: cloneTags(tags)}
			c.gauges[key] = m
		}
		m.value = value
	case "distribution":
		d := c.dists[key]
		if d == nil {
			d = &distMetric{
				name: name,
				tags: cloneTags(tags),
				min: value,
				max: value,
				samples: make([]float64, 64), // ring buffer feeding the p95 estimate
			}
			c.dists[key] = d
		}
		d.count++
		d.sum += value
		d.last = value
		if d.count == 1 || value < d.min {
			d.min = value
		}
		if d.count == 1 || value > d.max {
			d.max = value
		}
		if len(d.samples) > 0 {
			// Overwrite the oldest slot; wrap and mark full at the end.
			d.samples[d.next] = value
			d.next++
			if d.next >= len(d.samples) {
				d.next = 0
				d.full = true
			}
		}
	}
	sampleN := c.cfg.MetricSampleEvery
	// One shared atomic sequence drives downsampling for all metric kinds.
	seq := atomic.AddUint64(&c.counterSeq, 1)
	// These boundary metrics are always stored regardless of sampling.
	forceStore := strings.HasPrefix(name, "iq.extract.raw.boundary.") || strings.HasPrefix(name, "iq.extract.trimmed.boundary.")
	shouldStore := forceStore || sampleN <= 1 || seq%uint64(sampleN) == 0 || kind == "counter"
	var mp MetricPoint
	if shouldStore {
		mp = MetricPoint{
			Timestamp: now,
			Name: name,
			Type: kind,
			Value: value,
			Tags: cloneTags(tags),
		}
		c.metricsHistory = append(c.metricsHistory, mp)
	}
	c.trimLocked(now)
	writer := c.writer // snapshot so the disk write happens outside the lock
	c.mu.Unlock()
	if writer != nil && shouldStore {
		// Best-effort persistence; write failures are deliberately ignored.
		_ = writer.Write(persistedEnvelope{Kind: "metric", Metric: &mp})
	}
}
  351. func (c *Collector) LiveSnapshot() LiveSnapshot {
  352. now := time.Now().UTC()
  353. c.mu.RLock()
  354. cfg := c.cfg
  355. out := LiveSnapshot{
  356. Now: now,
  357. StartedAt: c.startedAt,
  358. UptimeMs: now.Sub(c.startedAt).Milliseconds(),
  359. Config: cfg,
  360. Counters: make([]SeriesValue, 0, len(c.counters)),
  361. Gauges: make([]SeriesValue, 0, len(c.gauges)),
  362. Distributions: make([]DistValue, 0, len(c.dists)),
  363. RecentEvents: make([]Event, 0, min(40, len(c.events))),
  364. Status: cloneFields(c.status),
  365. }
  366. for _, m := range c.counters {
  367. out.Counters = append(out.Counters, SeriesValue{Name: m.name, Value: m.value, Tags: cloneTags(m.tags)})
  368. }
  369. for _, m := range c.gauges {
  370. out.Gauges = append(out.Gauges, SeriesValue{Name: m.name, Value: m.value, Tags: cloneTags(m.tags)})
  371. }
  372. for _, d := range c.dists {
  373. mean := 0.0
  374. if d.count > 0 {
  375. mean = d.sum / float64(d.count)
  376. }
  377. out.Distributions = append(out.Distributions, DistValue{
  378. Name: d.name,
  379. Count: d.count,
  380. Min: d.min,
  381. Max: d.max,
  382. Mean: mean,
  383. Last: d.last,
  384. P95: p95FromDist(d),
  385. Tags: cloneTags(d.tags),
  386. })
  387. }
  388. start := len(c.events) - cap(out.RecentEvents)
  389. if start < 0 {
  390. start = 0
  391. }
  392. for _, ev := range c.events[start:] {
  393. out.RecentEvents = append(out.RecentEvents, copyEvent(ev))
  394. }
  395. c.mu.RUnlock()
  396. sort.Slice(out.Counters, func(i, j int) bool { return out.Counters[i].Name < out.Counters[j].Name })
  397. sort.Slice(out.Gauges, func(i, j int) bool { return out.Gauges[i].Name < out.Gauges[j].Name })
  398. sort.Slice(out.Distributions, func(i, j int) bool { return out.Distributions[i].Name < out.Distributions[j].Name })
  399. return out
  400. }
  401. func (c *Collector) QueryMetrics(q Query) ([]MetricPoint, error) {
  402. if c == nil {
  403. return nil, nil
  404. }
  405. q = normalizeQuery(q)
  406. c.mu.RLock()
  407. items := make([]MetricPoint, 0, len(c.metricsHistory))
  408. for _, m := range c.metricsHistory {
  409. if metricMatch(m, q) {
  410. items = append(items, copyMetric(m))
  411. }
  412. }
  413. cfg := c.cfg
  414. c.mu.RUnlock()
  415. if q.IncludePersisted && cfg.PersistEnabled {
  416. persisted, err := readPersistedMetrics(cfg, q)
  417. if err != nil && !errors.Is(err, os.ErrNotExist) {
  418. return nil, err
  419. }
  420. items = append(items, persisted...)
  421. }
  422. sort.Slice(items, func(i, j int) bool {
  423. return items[i].Timestamp.Before(items[j].Timestamp)
  424. })
  425. if q.Limit > 0 && len(items) > q.Limit {
  426. items = items[len(items)-q.Limit:]
  427. }
  428. return items, nil
  429. }
  430. func (c *Collector) QueryEvents(q Query) ([]Event, error) {
  431. if c == nil {
  432. return nil, nil
  433. }
  434. q = normalizeQuery(q)
  435. c.mu.RLock()
  436. items := make([]Event, 0, len(c.events))
  437. for _, ev := range c.events {
  438. if eventMatch(ev, q) {
  439. items = append(items, copyEvent(ev))
  440. }
  441. }
  442. cfg := c.cfg
  443. c.mu.RUnlock()
  444. if q.IncludePersisted && cfg.PersistEnabled {
  445. persisted, err := readPersistedEvents(cfg, q)
  446. if err != nil && !errors.Is(err, os.ErrNotExist) {
  447. return nil, err
  448. }
  449. items = append(items, persisted...)
  450. }
  451. sort.Slice(items, func(i, j int) bool {
  452. return items[i].Timestamp.Before(items[j].Timestamp)
  453. })
  454. if q.Limit > 0 && len(items) > q.Limit {
  455. items = items[len(items)-q.Limit:]
  456. }
  457. return items, nil
  458. }
// trimLocked enforces the history length caps and the retention window.
// The caller must hold c.mu for writing. Trimmed slices are reallocated
// (not re-sliced) so the old backing arrays become garbage-collectable.
func (c *Collector) trimLocked(now time.Time) {
	if c.cfg.MetricHistoryMax > 0 && len(c.metricsHistory) > c.cfg.MetricHistoryMax {
		c.metricsHistory = append([]MetricPoint(nil), c.metricsHistory[len(c.metricsHistory)-c.cfg.MetricHistoryMax:]...)
	}
	if c.cfg.EventHistoryMax > 0 && len(c.events) > c.cfg.EventHistoryMax {
		c.events = append([]Event(nil), c.events[len(c.events)-c.cfg.EventHistoryMax:]...)
	}
	ret := c.cfg.Retention
	if ret <= 0 {
		return
	}
	cut := now.Add(-ret)
	// Histories are appended in (near) chronological order, so drop the
	// leading prefix that is older than the cutoff.
	mStart := 0
	for mStart < len(c.metricsHistory) && c.metricsHistory[mStart].Timestamp.Before(cut) {
		mStart++
	}
	if mStart > 0 {
		c.metricsHistory = append([]MetricPoint(nil), c.metricsHistory[mStart:]...)
	}
	eStart := 0
	for eStart < len(c.events) && c.events[eStart].Timestamp.Before(cut) {
		eStart++
	}
	if eStart > 0 {
		c.events = append([]Event(nil), c.events[eStart:]...)
	}
}
  486. func sanitizeConfig(cfg Config) Config {
  487. def := DefaultConfig()
  488. if cfg.HeavySampleEvery <= 0 {
  489. cfg.HeavySampleEvery = def.HeavySampleEvery
  490. }
  491. if cfg.MetricSampleEvery <= 0 {
  492. cfg.MetricSampleEvery = def.MetricSampleEvery
  493. }
  494. if cfg.MetricHistoryMax <= 0 {
  495. cfg.MetricHistoryMax = def.MetricHistoryMax
  496. }
  497. if cfg.EventHistoryMax <= 0 {
  498. cfg.EventHistoryMax = def.EventHistoryMax
  499. }
  500. if cfg.Retention <= 0 {
  501. cfg.Retention = def.Retention
  502. }
  503. if strings.TrimSpace(cfg.PersistDir) == "" {
  504. cfg.PersistDir = def.PersistDir
  505. }
  506. if cfg.RotateMB <= 0 {
  507. cfg.RotateMB = def.RotateMB
  508. }
  509. if cfg.KeepFiles <= 0 {
  510. cfg.KeepFiles = def.KeepFiles
  511. }
  512. return cfg
  513. }
  514. func normalizeQuery(q Query) Query {
  515. if q.Limit <= 0 || q.Limit > 5000 {
  516. q.Limit = 500
  517. }
  518. if q.Tags == nil {
  519. q.Tags = Tags{}
  520. }
  521. return q
  522. }
  523. func metricMatch(m MetricPoint, q Query) bool {
  524. if !q.From.IsZero() && m.Timestamp.Before(q.From) {
  525. return false
  526. }
  527. if !q.To.IsZero() && m.Timestamp.After(q.To) {
  528. return false
  529. }
  530. if q.Name != "" && m.Name != q.Name {
  531. return false
  532. }
  533. if q.NamePrefix != "" && !strings.HasPrefix(m.Name, q.NamePrefix) {
  534. return false
  535. }
  536. for k, v := range q.Tags {
  537. if m.Tags[k] != v {
  538. return false
  539. }
  540. }
  541. return true
  542. }
  543. func eventMatch(ev Event, q Query) bool {
  544. if !q.From.IsZero() && ev.Timestamp.Before(q.From) {
  545. return false
  546. }
  547. if !q.To.IsZero() && ev.Timestamp.After(q.To) {
  548. return false
  549. }
  550. if q.Name != "" && ev.Name != q.Name {
  551. return false
  552. }
  553. if q.NamePrefix != "" && !strings.HasPrefix(ev.Name, q.NamePrefix) {
  554. return false
  555. }
  556. if q.Level != "" && !strings.EqualFold(q.Level, ev.Level) {
  557. return false
  558. }
  559. for k, v := range q.Tags {
  560. if ev.Tags[k] != v {
  561. return false
  562. }
  563. }
  564. return true
  565. }
  566. func metricKey(name string, tags Tags) string {
  567. if len(tags) == 0 {
  568. return name
  569. }
  570. keys := make([]string, 0, len(tags))
  571. for k := range tags {
  572. keys = append(keys, k)
  573. }
  574. sort.Strings(keys)
  575. var b strings.Builder
  576. b.Grow(len(name) + len(keys)*16)
  577. b.WriteString(name)
  578. for _, k := range keys {
  579. b.WriteString("|")
  580. b.WriteString(k)
  581. b.WriteString("=")
  582. b.WriteString(tags[k])
  583. }
  584. return b.String()
  585. }
  586. func cloneTags(tags Tags) Tags {
  587. if len(tags) == 0 {
  588. return nil
  589. }
  590. out := make(Tags, len(tags))
  591. for k, v := range tags {
  592. out[k] = v
  593. }
  594. return out
  595. }
  596. func cloneFields(fields map[string]any) map[string]any {
  597. if len(fields) == 0 {
  598. return nil
  599. }
  600. out := make(map[string]any, len(fields))
  601. for k, v := range fields {
  602. out[k] = v
  603. }
  604. return out
  605. }
  606. func copyMetric(m MetricPoint) MetricPoint {
  607. return MetricPoint{
  608. Timestamp: m.Timestamp,
  609. Name: m.Name,
  610. Type: m.Type,
  611. Value: m.Value,
  612. Tags: cloneTags(m.Tags),
  613. }
  614. }
  615. func copyEvent(ev Event) Event {
  616. return Event{
  617. ID: ev.ID,
  618. Timestamp: ev.Timestamp,
  619. Name: ev.Name,
  620. Level: ev.Level,
  621. Message: ev.Message,
  622. Tags: cloneTags(ev.Tags),
  623. Fields: cloneFields(ev.Fields),
  624. }
  625. }
  626. func p95FromDist(d *distMetric) float64 {
  627. if d == nil || d.count == 0 {
  628. return 0
  629. }
  630. n := d.next
  631. if d.full {
  632. n = len(d.samples)
  633. }
  634. if n <= 0 {
  635. return d.last
  636. }
  637. buf := make([]float64, n)
  638. copy(buf, d.samples[:n])
  639. sort.Float64s(buf)
  640. idx := int(float64(n-1) * 0.95)
  641. if idx < 0 {
  642. idx = 0
  643. }
  644. if idx >= n {
  645. idx = n - 1
  646. }
  647. return buf[idx]
  648. }
// jsonlWriter appends persistedEnvelope records as JSON lines to
// size-rotated files under dir. All methods are serialized by mu.
type jsonlWriter struct {
	cfg Config
	mu sync.Mutex
	dir string
	f *os.File          // current open segment; nil after Close
	w *bufio.Writer     // buffered writer over f; nil after Close
	currentPath string  // path of the current segment
	currentSize int64   // bytes written to the current segment so far
	seq int64           // per-writer rotation counter used in filenames
}
  659. func newJSONLWriter(cfg Config) (*jsonlWriter, error) {
  660. dir := filepath.Clean(cfg.PersistDir)
  661. if err := os.MkdirAll(dir, 0o755); err != nil {
  662. return nil, err
  663. }
  664. w := &jsonlWriter{cfg: cfg, dir: dir}
  665. if err := w.rotateLocked(); err != nil {
  666. return nil, err
  667. }
  668. return w, nil
  669. }
// Write marshals v as a single JSON line, rotating first when the line
// would push the current segment past RotateMB. A closed writer is a
// silent no-op. The buffer is flushed after every line so records are
// visible to readers immediately.
func (w *jsonlWriter) Write(v persistedEnvelope) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.f == nil || w.w == nil {
		return nil
	}
	line, err := json.Marshal(v)
	if err != nil {
		return err
	}
	line = append(line, '\n')
	if w.currentSize+int64(len(line)) > int64(w.cfg.RotateMB)*1024*1024 {
		if err := w.rotateLocked(); err != nil {
			return err
		}
	}
	// Count bytes actually accepted, even on a partial write.
	n, err := w.w.Write(line)
	w.currentSize += int64(n)
	if err != nil {
		return err
	}
	return w.w.Flush()
}
  693. func (w *jsonlWriter) Close() error {
  694. w.mu.Lock()
  695. defer w.mu.Unlock()
  696. if w.w != nil {
  697. _ = w.w.Flush()
  698. }
  699. if w.f != nil {
  700. err := w.f.Close()
  701. w.f = nil
  702. w.w = nil
  703. return err
  704. }
  705. return nil
  706. }
  707. func (w *jsonlWriter) rotateLocked() error {
  708. if w.w != nil {
  709. _ = w.w.Flush()
  710. }
  711. if w.f != nil {
  712. _ = w.f.Close()
  713. }
  714. w.seq++
  715. name := fmt.Sprintf("telemetry-%s-%04d.jsonl", time.Now().UTC().Format("20060102-150405"), w.seq)
  716. path := filepath.Join(w.dir, name)
  717. f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  718. if err != nil {
  719. return err
  720. }
  721. info, _ := f.Stat()
  722. size := int64(0)
  723. if info != nil {
  724. size = info.Size()
  725. }
  726. w.f = f
  727. w.w = bufio.NewWriterSize(f, 64*1024)
  728. w.currentPath = path
  729. w.currentSize = size
  730. _ = pruneFiles(w.dir, w.cfg.KeepFiles)
  731. return nil
  732. }
  733. func pruneFiles(dir string, keep int) error {
  734. if keep <= 0 {
  735. return nil
  736. }
  737. ents, err := os.ReadDir(dir)
  738. if err != nil {
  739. return err
  740. }
  741. files := make([]string, 0, len(ents))
  742. for _, ent := range ents {
  743. if ent.IsDir() {
  744. continue
  745. }
  746. name := ent.Name()
  747. if !strings.HasPrefix(name, "telemetry-") || !strings.HasSuffix(name, ".jsonl") {
  748. continue
  749. }
  750. files = append(files, filepath.Join(dir, name))
  751. }
  752. if len(files) <= keep {
  753. return nil
  754. }
  755. sort.Strings(files)
  756. for _, path := range files[:len(files)-keep] {
  757. _ = os.Remove(path)
  758. }
  759. return nil
  760. }
  761. func readPersistedMetrics(cfg Config, q Query) ([]MetricPoint, error) {
  762. files, err := listPersistedFiles(cfg.PersistDir)
  763. if err != nil {
  764. return nil, err
  765. }
  766. out := make([]MetricPoint, 0, 256)
  767. for _, path := range files {
  768. points, err := parsePersistedFile(path, q)
  769. if err != nil {
  770. continue
  771. }
  772. for _, p := range points.metrics {
  773. if metricMatch(p, q) {
  774. out = append(out, p)
  775. }
  776. }
  777. }
  778. return out, nil
  779. }
  780. func readPersistedEvents(cfg Config, q Query) ([]Event, error) {
  781. files, err := listPersistedFiles(cfg.PersistDir)
  782. if err != nil {
  783. return nil, err
  784. }
  785. out := make([]Event, 0, 128)
  786. for _, path := range files {
  787. points, err := parsePersistedFile(path, q)
  788. if err != nil {
  789. continue
  790. }
  791. for _, ev := range points.events {
  792. if eventMatch(ev, q) {
  793. out = append(out, ev)
  794. }
  795. }
  796. }
  797. return out, nil
  798. }
// parsedFile holds the decoded contents of one persisted JSONL file.
type parsedFile struct {
	metrics []MetricPoint
	events []Event
}

// parsePersistedFile reads one telemetry JSONL file, decoding each line as
// a persistedEnvelope. Empty and malformed lines (e.g. truncated by a
// crash) are skipped. While scanning, the slices are kept bounded near
// 2*q.Limit entries — retaining the most recent — so a huge file cannot
// exhaust memory.
func parsePersistedFile(path string, q Query) (parsedFile, error) {
	f, err := os.Open(path)
	if err != nil {
		return parsedFile{}, err
	}
	defer f.Close()
	out := parsedFile{
		metrics: make([]MetricPoint, 0, 64),
		events: make([]Event, 0, 32),
	}
	s := bufio.NewScanner(f)
	// Allow lines up to 1 MiB (default scanner token limit is smaller).
	s.Buffer(make([]byte, 0, 32*1024), 1024*1024)
	for s.Scan() {
		line := s.Bytes()
		if len(line) == 0 {
			continue
		}
		var env persistedEnvelope
		if err := json.Unmarshal(line, &env); err != nil {
			continue
		}
		if env.Metric != nil {
			out.metrics = append(out.metrics, *env.Metric)
		} else if env.Event != nil {
			out.events = append(out.events, *env.Event)
		}
		if q.Limit > 0 && len(out.metrics)+len(out.events) > q.Limit*2 {
			// keep bounded while scanning
			if len(out.metrics) > q.Limit {
				out.metrics = out.metrics[len(out.metrics)-q.Limit:]
			}
			if len(out.events) > q.Limit {
				out.events = out.events[len(out.events)-q.Limit:]
			}
		}
	}
	return out, s.Err()
}
  841. func listPersistedFiles(dir string) ([]string, error) {
  842. ents, err := os.ReadDir(dir)
  843. if err != nil {
  844. return nil, err
  845. }
  846. files := make([]string, 0, len(ents))
  847. for _, ent := range ents {
  848. if ent.IsDir() {
  849. continue
  850. }
  851. name := ent.Name()
  852. if strings.HasPrefix(name, "telemetry-") && strings.HasSuffix(name, ".jsonl") {
  853. files = append(files, filepath.Join(dir, name))
  854. }
  855. }
  856. sort.Strings(files)
  857. return files, nil
  858. }
  859. func ParseTimeQuery(raw string) (time.Time, error) {
  860. raw = strings.TrimSpace(raw)
  861. if raw == "" {
  862. return time.Time{}, nil
  863. }
  864. if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
  865. if ms > 1e12 {
  866. return time.UnixMilli(ms).UTC(), nil
  867. }
  868. return time.Unix(ms, 0).UTC(), nil
  869. }
  870. if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
  871. return t.UTC(), nil
  872. }
  873. if t, err := time.Parse(time.RFC3339, raw); err == nil {
  874. return t.UTC(), nil
  875. }
  876. return time.Time{}, errors.New("invalid time query")
  877. }
  878. func TagsWith(base Tags, key string, value any) Tags {
  879. out := cloneTags(base)
  880. if out == nil {
  881. out = Tags{}
  882. }
  883. out[key] = fmt.Sprint(value)
  884. return out
  885. }
  886. func TagsFromPairs(kv ...string) Tags {
  887. if len(kv) < 2 {
  888. return nil
  889. }
  890. out := Tags{}
  891. for i := 0; i+1 < len(kv); i += 2 {
  892. k := strings.TrimSpace(kv[i])
  893. if k == "" {
  894. continue
  895. }
  896. out[k] = kv[i+1]
  897. }
  898. if len(out) == 0 {
  899. return nil
  900. }
  901. return out
  902. }
  903. func min(a int, b int) int {
  904. if a < b {
  905. return a
  906. }
  907. return b
  908. }