From 47764ed655a5bcf4932899543bde3dce5012ddc9 Mon Sep 17 00:00:00 2001 From: Jan Date: Mon, 13 Apr 2026 11:57:16 +0200 Subject: [PATCH] feat: add WS-1 metering telemetry and calmer meter holds --- cmd/fmrtx/main.go | 3 + docs/metering-ws1-implementation-plan.md | 402 +++++++++++++++ docs/metering_telemetry_autobahn_v3.md | 606 +++++++++++++++++++++++ go.mod | 3 +- go.sum | 2 + internal/app/engine.go | 21 + internal/control/control.go | 45 +- internal/control/telemetry.go | 77 +++ internal/control/ui.html | 29 +- 9 files changed, 1173 insertions(+), 15 deletions(-) create mode 100644 docs/metering-ws1-implementation-plan.md create mode 100644 docs/metering_telemetry_autobahn_v3.md create mode 100644 internal/control/telemetry.go diff --git a/cmd/fmrtx/main.go b/cmd/fmrtx/main.go index b241a6a..a80b5a8 100644 --- a/cmd/fmrtx/main.go +++ b/cmd/fmrtx/main.go @@ -243,6 +243,9 @@ func runTXMode(cfg cfgpkg.Config, configPath string, driver platform.SoapyDriver configureControlPlanePersistence(srv, configPath, cancel) srv.SetDriver(driver) srv.SetTXController(&txBridge{engine: engine}) + if hub := srv.TelemetryHub(); hub != nil { + engine.SetMeasurementPublisher(hub.PublishMeasurement) + } if streamSrc != nil { srv.SetStreamSource(streamSrc) } diff --git a/docs/metering-ws1-implementation-plan.md b/docs/metering-ws1-implementation-plan.md new file mode 100644 index 0000000..6c62723 --- /dev/null +++ b/docs/metering-ws1-implementation-plan.md @@ -0,0 +1,402 @@ +# Metering WS-1 Implementation Plan + +Status: Draft +Scope: Minimal WebSocket live-telemetry path for metering in `fm-rds-tx` + +## 1. Goal + +Add the smallest practical live metering transport on top of the existing snapshot system. + +The target is: +- keep `GET /measurements` +- add `GET /ws/telemetry` +- stream only `measurement` +- use the same underlying measurement truth as `/measurements` +- keep the realtime path safe +- make the browser prefer WS, but fall back to snapshot polling + +This is WS-1, not the final telemetry system. + +--- + +## 2. Existing Pieces We Already Have + +The current codebase already provides the most important semantic building blocks: + +- `internal/offline/generator.go` + - produces `MeasurementSnapshot` + - stores `latestMeasurement` +- `internal/app/engine.go` + - carries measurement in `EngineStats` +- `internal/control/control.go` + - exposes `GET /measurements` +- `internal/control/ui.html` + - already consumes `/measurements` + - already contains browser-side meter smoothing / peak-hold style logic + +That means WS-1 does not need a new meter model. +It only needs a new live transport path. + +--- + +## 3. Hard Rules + +## 3.1 Single source of truth + +There must be only one latest measurement truth: +- generator/engine produces the snapshot +- `/measurements` exposes it +- WebSocket streams it + +WebSocket must not introduce a second, UI-specific measurement calculation path. + +## 3.2 Realtime safety + +The producer path must never block on telemetry. + +Allowed on the realtime side: +- reading latest measurement pointer +- comparing sequence numbers +- one non-blocking publish step + +Forbidden on the realtime side: +- JSON encoding +- HTTP work +- WebSocket writes +- blocking channels +- slow shared locks + +## 3.3 WS-1 stays small + +WS-1 should include only: +- one endpoint: `GET /ws/telemetry` +- one message class: `measurement` +- one small broadcaster/hub +- one bounded queue per client +- one drop policy: drop old, keep newest +- browser snapshot bootstrap + WS preference + +No topics, bundles, runtime-event multiplexing, or speculative protocol machinery. + +--- + +## 4. Proposed Files + +## New file +### `internal/control/telemetry.go` + +Contains: +- `TelemetryMessage` +- `TelemetryHub` +- subscriber management +- non-blocking publish logic + +## Modified files +### `internal/control/control.go` +- add telemetry hub to `Server` +- add `GET /ws/telemetry` +- add WS handler + +### `internal/app/engine.go` +- add optional measurement publisher hook +- publish new measurement snapshots when sequence advances + +### `cmd/fmrtx/main.go` +- wire engine publisher to control telemetry hub + +### `internal/control/ui.html` +- add `connectTelemetryWS()` +- bootstrap from `/measurements` +- prefer WS while connected +- fall back to polling when disconnected + +--- + +## 5. Message Shape + +WS-1 should use a minimal typed envelope. + +Example: + +```json +{ + "type": "measurement", + "ts": "2026-04-13T07:00:53.842Z", + "seq": 128, + "data": { + "timestamp": "2026-04-13T07:00:53.842Z", + "sampleRateHz": 228000, + "chunkSamples": 11400, + "chunkDurationMs": 50, + "sequence": 128, + "flags": { + "stereoEnabled": true, + "stereoMode": "DSB" + }, + "lrPreEncodePostWatermark": { + "lRms": 0.27, + "rRms": 0.27, + "lPeakAbs": 0.51, + "rPeakAbs": 0.51 + }, + "compositeFinalPreIq": { + "peakAbs": 0.63, + "pilotInjectionEquivalentPercent": 9.0 + } + } +} +``` + +Rule: +- `data` should be semantically identical to `/measurements.measurement` +- envelope adds only: + - `type` + - `ts` + - `seq` + +--- + +## 6. TelemetryHub Design + +## 6.1 Minimal responsibilities + +The hub should: +- accept published measurement snapshots +- fan them out to connected clients +- use bounded per-client queues +- never block the publisher + +## 6.2 Minimal internal shape + +A small subscriber model is enough for WS-1. + +Conceptually: + +```go +type TelemetryMessage struct { + Type string `json:"type"` + TS time.Time `json:"ts"` + Seq uint64 `json:"seq"` + Data interface{} `json:"data"` +} + +type telemetrySubscriber struct { + ch chan TelemetryMessage +} + +type TelemetryHub struct { + mu sync.Mutex + subscribers map[*telemetrySubscriber]struct{} +} +``` + +## 6.3 Publish policy + +Per client: +- channel size should be tiny, e.g. `1` or `2` +- if the channel is full: + - discard older unsent frame + - keep newest + +In short: +- latest state wins +- producer never blocks + +--- + +## 7. Preferred Publish Path + +The realtime path should not write WebSocket frames directly. + +Preferred path: +- generator finalizes latest measurement snapshot +- engine notices a new sequence +- engine calls a small measurement publisher hook +- hub receives the snapshot non-blockingly +- WS clients receive the transport copy later + +That keeps transport outside the hot path. + +--- + +## 8. Engine Integration + +## 8.1 Publisher hook + +Add a small hook to `Engine`, conceptually like: + +```go +SetMeasurementPublisher(func(*offpkg.MeasurementSnapshot)) +``` + +The engine should publish only when: +- a new snapshot exists +- the sequence advanced + +## 8.2 Why the engine is a good handoff point + +The generator is still the source of truth. + +But the engine is a good place to bridge from: +- chunk production +- to control-plane telemetry transport + +because it already sits between signal generation and external control/runtime reporting. + +--- + +## 9. Initial Snapshot on Subscribe + +On WebSocket connect, the server should: +1. upgrade the connection +2. subscribe the client +3. immediately send the latest known measurement snapshot if one exists +4. continue streaming subsequent updates + +This avoids the UI showing an empty state until the next natural update arrives. + +--- + +## 10. WebSocket Handler Behavior + +## Endpoint +- `GET /ws/telemetry` + +## Handler rules +- upgrade connection +- create subscriber +- send latest known snapshot immediately if available +- loop over subscriber channel +- write typed messages with write deadlines +- on write failure or broken client: + - close connection + - unsubscribe + +Additional safety rule: +- a persistently slow or broken client may be dropped rather than buffered around + +--- + +## 11. UI Behavior + +The browser flow should be: + +1. load snapshot from `GET /measurements` +2. render immediately from snapshot +3. connect to WebSocket +4. if WS is healthy, prefer streamed updates +5. if WS drops, keep last known state and resume snapshot polling + +## UI state additions +Conceptually: + +```js +S.telemetry = { + ws: null, + wsConnected: false, + wsRetryTimer: null, + snapshotPollingActive: true +} +``` + +--- + +## 12. Polling Strategy with WS + +Current UI behavior polls `/measurements` aggressively. + +WS-1 should change that to: + +### While WS is healthy +- no continuous `/measurements` polling +- keep `/runtime` and `/config` polling as needed + +### While WS is disconnected +- resume `/measurements` polling +- lower fallback rate is acceptable + +This preserves `/measurements` without turning it into a redundant live-stream duplicate. + +--- + +## 13. Observability + +WS-1 logging should stay minimal. + +Useful: +- client connected +- client disconnected +- client dropped due to write failure/backpressure + +Not useful: +- logging every telemetry frame + +--- + +## 14. Risks + +### A. Too much logic in the producer path +Avoid. + +### B. Slow client causing sticky WS writes +Use deadlines and disconnect. + +### C. Drift between `/measurements` and WS +Prevent by using the same underlying snapshot source. + +### D. Overengineering WS-1 +Avoid topics, bundles, and speculative protocol work. + +--- + +## 15. Implementation Sequence + +### Step 1 +Create `internal/control/telemetry.go` with: +- `TelemetryMessage` +- `TelemetryHub` +- subscribe/unsubscribe/publish + +### Step 2 +Add telemetry hub ownership to `Server` in `internal/control/control.go`. + +### Step 3 +Add `GET /ws/telemetry` handler in control. + +### Step 4 +Add measurement publisher hook to `Engine`. + +### Step 5 +Wire engine publisher to telemetry hub in `cmd/fmrtx/main.go`. + +### Step 6 +Update `internal/control/ui.html`: +- snapshot bootstrap +- WS connect +- WS preference +- fallback polling + +### Step 7 +Test behavior manually: +- no-data case +- connect while TX idle +- connect while TX running +- disconnect/reconnect +- fallback back to polling +- slow/broken client handling + +--- + +## 16. Recommendation + +Implement WS-1 in the smallest possible form. + +That means: +- keep `/measurements` +- add one WS endpoint +- stream one message type +- keep one truth +- protect the realtime path +- let freshness win over completeness + +That gives `fm-rds-tx` a real live-metering transport backbone without turning the first step into a giant telemetry framework. diff --git a/docs/metering_telemetry_autobahn_v3.md b/docs/metering_telemetry_autobahn_v3.md new file mode 100644 index 0000000..226cc42 --- /dev/null +++ b/docs/metering_telemetry_autobahn_v3.md @@ -0,0 +1,606 @@ +# Metering Telemetry Autobahn Concept + +Status: Draft +Version: 3.1 +Scope: `fm-rds-tx` runtime telemetry transport for live metering, browser UI, future composite spectrum, and snapshot fallback APIs + +## 1. Summary + +The current `GET /measurements` endpoint is already a useful and well-shaped snapshot API for composite/MPX metering. It should stay. + +If the UI is expected to evolve toward: +- smoother live audio meters +- smoother MPX meters +- higher refresh rates +- future composite spectrum/analyzer views +- multiple concurrent telemetry consumers + +then snapshot polling alone is no longer the ideal transport. + +This document proposes a **live telemetry transport layer** — the “metering autobahn” — built around: +- a small telemetry broadcaster/hub inside the control plane +- WebSocket delivery for low-latency/high-rate live data +- a deliberately tiny WS-1 scope +- continued support for `GET /measurements` as a stable snapshot fallback +- explicit protection of the ingest / DSP / TX realtime path + +The key design rules are: +- **streaming is added, not substituted** +- **`/measurements` remains a first-class snapshot endpoint** +- **metering must never be allowed to interfere with ingest / DSP / TX timing** + +--- + +## 2. Core Principle + +### `/measurements` stays + +The existing snapshot endpoint must **not** be removed. + +It remains valuable for: +- debugging +- curl/manual inspection +- API consumers that only want snapshots +- low-complexity integrations +- fallback behavior when WebSocket transport is unavailable + +So the intended model is: +- **WebSocket for live streaming** +- **`GET /measurements` for stable snapshot access** + +Not one replacing the other. + +--- + +## 3. Goals + +### Primary goals +- Provide a transport suitable for higher-rate live metering. +- Support future spectrum/analyzer-style UI features. +- Keep structured measurement semantics separate from transport concerns. +- Avoid forcing the browser to poll snapshots at increasingly high rates. +- Preserve `/measurements` as a stable snapshot API. +- Protect ingest / DSP / TX timing from telemetry, transport, browser, and control-plane behavior. + +### Secondary goals +- Support multiple telemetry consumers. +- Handle slow clients safely. +- Avoid overloading the hot DSP path. +- Make future spectrum support possible without transport redesign. +- Ensure telemetry degrades by dropping metering data rather than slowing the realtime path. + +--- + +## 4. Non-Goals + +This concept is **not**: +- a replacement for `/runtime` +- a replacement for `/measurements` +- a raw-sample streaming design +- a browser-side FFT design +- a long-term telemetry database +- an excuse to build a huge generalized pub/sub system in WS-1 +- a design where browser/UI/control-plane demand can push back into ingest / DSP / TX + +--- + +## 5. Single Source of Truth + +This is the most important semantic rule. + +There must be exactly **one latest measurement snapshot truth** inside the runtime/control system. + +That means: +- the generator/engine path produces the latest measurement snapshot +- `GET /measurements` exposes that snapshot +- WebSocket streams updates derived from that same snapshot source + +WebSocket must **not** introduce a separate meter-calculation path. + +Otherwise the system risks a future mismatch like: +- polling UI shows one value +- streaming UI shows another value +- both appear plausible +- nobody trusts either anymore + +So the rule is: +- **same measurement source** +- **different delivery mechanisms** + +Additional WS rule: +- **the `measurement.data` payload sent over WebSocket should be semantically identical to the `measurement` object returned by `GET /measurements`** +- transport envelope fields such as `type`, `ts`, and `seq` may differ, but the underlying measurement meaning must not drift + +--- + +## 6. Realtime Safety Rule + +This is the most important operational rule. + +Ingest / DSP / TX timing owns the system. + +Metering is valuable, but it is **not** allowed to compete with realtime work for correctness. If the system must choose between: +- keeping ingest / DSP / TX on time +- or delivering every metering update + +then metering loses. + +The rule is: +- **realtime first** +- **metering is best-effort** +- **dropped telemetry is acceptable** +- **timing interference is not acceptable** + +This means metering transport must be designed so that: +- slow clients cannot block producers +- control-plane activity cannot block producers +- JSON / HTTP / WebSocket work cannot occur on the realtime path +- telemetry backlog cannot cause unbounded memory growth +- the realtime path never waits for telemetry consumers + +In short: +- **if anything must be sacrificed under load, sacrifice telemetry freshness/completeness, never ingest / DSP / TX timing** + +This rule also applies to future spectrum support: +- spectrum is also best-effort +- future spectrum work must never degrade ingest / DSP / TX timing + +--- + +## 7. One-Way Data Flow Rule + +The data-flow direction must be explicit. + +Allowed direction: +- **realtime path → published measurement snapshot → control-plane broadcaster → clients** + +Forbidden direction: +- **client demand → control plane → realtime path “give me data now”** + +This means: +- the realtime path produces telemetry only when it naturally completes work +- the control plane reads what the realtime side has already published +- browser refresh rate must not cause extra DSP work +- WebSocket clients do not “request the current meter” from the realtime path + +The system should therefore behave as: +- one producer of measurement snapshots +- one non-RT transport layer that distributes already-produced snapshots +- zero transport-driven callbacks into the DSP hot path + +--- + +## 8. Architectural Layers + +## 8.1 Signal production layer + +The generator / engine already produces semantically meaningful measurement snapshots. + +That should remain the source of truth for metering data. + +Later, spectrum production can be added in a similarly structured way. + +Examples of produced data classes: +- measurement snapshots +- future spectrum frames +- optional future runtime event frames + +## 8.2 Realtime-safe publication boundary + +Between the realtime path and the control plane there must be a strict publication boundary. + +Responsibilities: +- accept already-computed chunk-local measurement results +- publish them in a way that never blocks the producer +- allow overwrite/drop behavior under load +- prevent transport concerns from leaking into ingest / DSP / TX + +This boundary is where realtime safety is enforced. + +## 8.3 Telemetry transport layer + +Introduce a small telemetry broadcaster/hub in the control plane. + +Responsibilities in WS-1: +- accept published measurement snapshots from the non-blocking publication boundary +- fan them out to connected WebSocket clients +- apply bounded-queue/backpressure policy +- isolate transport logic from DSP/runtime logic + +This transport layer should stay intentionally small at first. + +## 8.4 Client/UI layer + +The browser UI should consume: +- `GET /measurements` for initial/fallback snapshot state +- WebSocket for live updates when available + +Rendering logic such as: +- smoothing +- peak hold +- decay +- short local history + +should remain on the UI side. + +--- + +## 9. Why a Broadcaster/Hub Is Still Useful + +Even in a minimal WS-1 design, a broadcaster/hub is useful because it keeps transport logic out of: +- generator code +- engine code +- ad-hoc handler state + +It allows: +- one producer → many consumers +- bounded queues per client +- clean control-plane ownership of transport + +But for WS-1, this hub should be **small and boring**, not a grand infrastructure project. + +--- + +## 10. Hot-Path Constraints + +The realtime path must remain intentionally primitive. + +Allowed on the realtime side: +- chunk-local accumulation into predeclared counters/fields +- simple arithmetic such as abs/square/max/counter updates +- one finalize step per chunk +- one non-blocking publication step per chunk + +Forbidden on the realtime side: +- JSON encoding +- HTTP handling +- WebSocket writes +- logging in the hot path +- blocking channels +- contended locks shared with non-RT code +- dynamic queue growth +- per-sample heap allocation +- transport-driven callback logic + +The model is: +- compute meters while already processing audio/composite data +- finalize once per chunk +- publish once per chunk +- leave all transport/rendering concerns outside the realtime path + +--- + +## 11. Publication Strategy + +The publication boundary must be non-blocking. + +Acceptable implementation styles include: + +### Option A — Atomic latest snapshot +- realtime side writes the latest completed snapshot into a preallocated slot or latest-value holder +- readers fetch the latest available completed value +- no backlog is preserved +- freshness is prioritized completely over completeness + +### Option B — Tiny bounded SPSC-style queue/ring +- queue size intentionally tiny, typically `1` or `2` +- if full, older unsent snapshot is overwritten or discarded +- publisher never blocks +- reader sees the newest available completed value + +For WS-1, either approach is acceptable as long as these rules hold: +- bounded memory only +- no producer blocking +- latest state wins + +For WS-1, the preferred implementation bias is: +- **choose the simplest non-blocking latest-value publication model that satisfies the realtime safety rules** +- in practice this often means starting with an atomic/latest-snapshot publication model before introducing a more explicit tiny ring structure + +The most important rule is not the exact primitive. The most important rule is: +- **metering publication may drop or overwrite telemetry, but may not delay the producer** + +--- + +## 12. WS-1 Scope: Keep It Brutally Small + +This is a deliberate constraint. + +WS-1 should include only: +- one endpoint: `GET /ws/telemetry` +- one message class: `measurement` +- one small broadcaster/hub +- one bounded queue per client +- one drop policy: drop old, keep newest +- UI snapshot bootstrap + WS live updates + +WS-1 should **not** include: +- topic subscriptions +- bundle messages +- runtime-event multiplexing +- quality-level negotiation +- generalized telemetry protocol machinery +- speculative infrastructure for future categories + +The goal of WS-1 is simple: +- make meters smoother +- establish the live telemetry path +- do not overengineer + +--- + +## 13. Why WebSocket and Not Only SSE + +For WS-1, the traffic is fundamentally server → browser. + +That means **Server-Sent Events (SSE)** would also be a technically valid option and would be simpler in some respects. + +However, WebSocket is still preferred here because it better matches the likely next steps: +- future multiple telemetry classes +- future spectrum delivery +- possible future interactive or negotiated telemetry behavior + +So the decision is: +- **SSE would be sufficient for the narrowest first step** +- **WebSocket is preferred for forward compatibility** + +This is a strategic choice, not a claim that basic metering strictly requires WebSocket. + +--- + +## 14. Transport Model + +## 14.1 Existing snapshot endpoint +- `GET /measurements` + +Role: +- stable pull-based snapshot +- debugging +- fallback +- low-rate integrations + +This endpoint should continue returning the latest measurement snapshot in structured JSON form. + +## 14.2 New live endpoint +- proposed: `GET /ws/telemetry` + +Role: +- push-based live measurement updates +- suitable for smoother meter motion +- future-ready for later telemetry expansion + +On subscribe, the server should immediately send the latest known measurement snapshot if one exists, so the client becomes visually current without waiting for the next natural update. + +--- + +## 15. Message Classes + +For WS-1, the system should implement exactly one message class. + +## 15.1 `measurement` + +Carries the latest structured measurement snapshot. + +Preferred rule: +- the `data` payload should match the `GET /measurements` snapshot shape as closely as possible +- transport envelope fields such as `type` may wrap the same underlying snapshot semantics, but WS should not invent a subtly different meter schema + +Example: +```json +{ + "type": "measurement", + "ts": "2026-04-13T07:00:53.842Z", + "seq": 128, + "data": { + "sampleRateHz": 228000, + "chunkSamples": 11400, + "flags": { + "stereoEnabled": true, + "stereoMode": "DSB" + }, + "lrPreEncodePostWatermark": { + "lRms": 0.27, + "rRms": 0.27, + "lPeakAbs": 0.51, + "rPeakAbs": 0.51 + }, + "compositeFinalPreIq": { + "peakAbs": 0.63, + "pilotInjectionEquivalentPercent": 9.0 + } + } +} +``` + +### Not part of WS-1 yet +These are future classes, not current WS-1 deliverables: +- `spectrum` +- `runtime` +- bundles / multiplexed compound messages + +--- + +## 16. Update Rates + +### Measurement snapshots +- target: `10–20 Hz` +- enough for noticeably smoother meters than snapshot polling +- reasonable for WS-1 + +WS-1 does not need extreme rates yet. + +The goal is not “as fast as possible”, but: +- smoother than polling +- stable under load +- simple to reason about + +--- + +## 17. Backpressure Strategy + +This is mandatory. + +For metering, freshness matters more than completeness. + +Preferred policy per client: +- bounded queue/channel +- if full: + - discard older unsent frame(s) + - keep the newest available state + +In short: +- **latest state wins** + +This is especially important because browser-side WebSocket APIs do not give you a magical end-to-end backpressure solution. + +Additional server safety rule: +- if a client remains persistently too slow, broken, or backpressured, the server may close that client connection rather than growing complexity or buffering to accommodate it + +--- + +## 18. UI Consumption Model + +The frontend should have two distinct layers. + +### Transport layer +- connect WebSocket +- reconnect on disconnect +- parse `measurement` messages +- store latest live state +- fall back to `/measurements` when needed + +### Rendering layer +- meter smoothing +- peak hold +- decay +- short local history + +This keeps transport and presentation loosely coupled. + +--- + +## 19. Fallback Behavior + +Preferred UI behavior: +1. load snapshot from `GET /measurements` +2. render immediately from snapshot +3. connect WebSocket +4. if WS is healthy, prefer streamed updates +5. if WS drops, keep rendering last known state and resume snapshot fallback polling + +This keeps the UI both: +- responsive when live transport is available +- robust when it is not + +--- + +## 20. Future Direction (Explicitly Not WS-1) + +These are valid later expansions, but they should not enlarge the first implementation unnecessarily: +- `spectrum` message type +- composite spectrum producer +- runtime/event stream integration +- quality levels / adaptive throttling +- topic or subscription semantics +- bundled telemetry frames + +These can come later once WS-1 proves the transport path. + +--- + +## 21. Proposed Internal Shape + +The internal design only needs to support a small set of concepts for WS-1, such as: +- publish latest measurement snapshot +- subscribe client connection +- drop stale frames under backpressure + +That can still be implemented with a small internal abstraction such as: +- `PublishMeasurement(snapshot)` +- `Subscribe()` + +It does not need a giant generic telemetry framework yet. + +--- + +## 22. Phased Implementation Plan + +## Phase WS-1 — Measurement streaming only + +Deliverables: +- small telemetry broadcaster/hub in control plane +- `GET /ws/telemetry` +- only `measurement` messages +- bounded per-client queue +- drop-old / keep-newest policy +- browser UI loads snapshot first, then prefers WS live updates +- `/measurements` remains unchanged + +## Phase WS-2 — UI transport polish + +Deliverables: +- reconnect handling +- clean fallback behavior +- meter update cadence tuning + +## Phase WS-3 — Spectrum support + +Deliverables: +- server-side composite spectrum producer +- `spectrum` message type +- browser spectrum panel + +## Phase WS-4 — Advanced transport controls + +Deliverables: +- optional adaptive throttling +- optional multiple telemetry classes +- optional richer live transport design + +--- + +## 23. Open Questions + +### Q1 +Should WS-1 stream only `measurement`? + +Current preference: +- yes +- keep it single-purpose + +### Q2 +Should the UI keep polling `/measurements` while WS is healthy? + +Current preference: +- no continuous polling during healthy WS +- fallback polling only + +### Q3 +Should future spectrum run at the same cadence as measurements? + +Current preference: +- no +- spectrum should likely be slower + +### Q4 +When should a more generic telemetry protocol exist? + +Current preference: +- only after WS-1 proves useful +- do not front-load complexity + +--- + +## 24. Recommended Next Step + +Implement **Phase WS-1** in the smallest practical form. + +Concrete steps: +1. add a small telemetry broadcaster in the control layer +2. define one WS message type: `measurement` +3. add `GET /ws/telemetry` +4. publish the latest measurement snapshot into that broadcaster +5. make the browser UI bootstrap from `/measurements`, then prefer WS +6. keep `/measurements` untouched as the snapshot fallback API + +That gives the system the live metering transport backbone without turning WS-1 into a giant infrastructure project. diff --git a/go.mod b/go.mod index 3f45ac5..0f7a4ef 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/jan/fm-rds-tx -go 1.22 +go 1.25.0 require github.com/jan/fm-rds-tx/internal v0.0.0 @@ -9,6 +9,7 @@ require ( github.com/hajimehoshi/go-mp3 v0.3.4 // indirect github.com/jfreymuth/oggvorbis v1.0.5 // indirect github.com/jfreymuth/vorbis v1.0.2 // indirect + golang.org/x/net v0.53.0 // indirect ) replace github.com/jan/fm-rds-tx/internal => ./internal diff --git a/go.sum b/go.sum index a67c282..4105f8c 100644 --- a/go.sum +++ b/go.sum @@ -5,4 +5,6 @@ github.com/jfreymuth/oggvorbis v1.0.5 h1:u+Ck+R0eLSRhgq8WTmffYnrVtSztJcYrl588DM4 github.com/jfreymuth/oggvorbis v1.0.5/go.mod h1:1U4pqWmghcoVsCJJ4fRBKv9peUJMBHixthRlBeD6uII= github.com/jfreymuth/vorbis v1.0.2 h1:m1xH6+ZI4thH927pgKD8JOH4eaGRm18rEE9/0WKjvNE= github.com/jfreymuth/vorbis v1.0.2/go.mod h1:DoftRo4AznKnShRl1GxiTFCseHr4zR9BN3TWXyuzrqQ= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/app/engine.go b/internal/app/engine.go index f307099..c8c1459 100644 --- a/internal/app/engine.go +++ b/internal/app/engine.go @@ -186,6 +186,10 @@ type Engine struct { // Live audio stream (optional) streamSrc *audio.StreamSource + + measurementPublisherMu sync.RWMutex + measurementPublisher func(*offpkg.MeasurementSnapshot) + lastPublishedMeasSeq atomic.Uint64 } // SetStreamSource configures a live audio stream as the audio source. @@ -283,6 +287,12 @@ func (e *Engine) SetChunkDuration(d time.Duration) { e.chunkDuration = d } +func (e *Engine) SetMeasurementPublisher(fn func(*offpkg.MeasurementSnapshot)) { + e.measurementPublisherMu.Lock() + e.measurementPublisher = fn + e.measurementPublisherMu.Unlock() +} + // LiveConfigUpdate carries hot-reloadable parameters from the control API. // nil pointers mean "no change". Validated before applying. type LiveConfigUpdate struct { @@ -722,6 +732,17 @@ func (e *Engine) writerLoop(ctx context.Context) { e.chunksProduced.Add(1) e.totalSamples.Add(uint64(n)) + if m := e.generator.LatestMeasurement(); m != nil { + if m.Sequence != e.lastPublishedMeasSeq.Load() { + e.measurementPublisherMu.RLock() + pub := e.measurementPublisher + e.measurementPublisherMu.RUnlock() + if pub != nil { + pub(m) + } + e.lastPublishedMeasSeq.Store(m.Sequence) + } + } } } diff --git a/internal/control/control.go b/internal/control/control.go index b807c98..1765bb0 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -12,10 +12,13 @@ import ( "sync/atomic" "time" + "golang.org/x/net/websocket" + "github.com/jan/fm-rds-tx/internal/audio" "github.com/jan/fm-rds-tx/internal/config" drypkg "github.com/jan/fm-rds-tx/internal/dryrun" "github.com/jan/fm-rds-tx/internal/ingest" + offpkg "github.com/jan/fm-rds-tx/internal/offline" "github.com/jan/fm-rds-tx/internal/platform" ) @@ -72,6 +75,7 @@ type Server struct { // calling hardReload when handleIngestSave is hit multiple times quickly. reloadPending atomic.Bool audit auditCounters + telemetryHub *TelemetryHub } type AudioIngress interface { @@ -171,7 +175,7 @@ type IngestSaveRequest struct { } func NewServer(cfg config.Config) *Server { - return &Server{cfg: cfg} + return &Server{cfg: cfg, telemetryHub: NewTelemetryHub()} } func hasRequestBody(r *http.Request) bool { @@ -237,6 +241,10 @@ func isAudioStreamContentType(r *http.Request) bool { return false } +func (s *Server) TelemetryHub() *TelemetryHub { + return s.telemetryHub +} + func (s *Server) SetTXController(tx TXController) { s.mu.Lock() s.tx = tx @@ -290,6 +298,7 @@ func (s *Server) Handler() http.Handler { mux.HandleFunc("/config/ingest/save", s.handleIngestSave) mux.HandleFunc("/runtime", s.handleRuntime) mux.HandleFunc("/measurements", s.handleMeasurements) + mux.Handle("/ws/telemetry", websocket.Handler(s.handleTelemetryWS)) mux.HandleFunc("/runtime/fault/reset", s.handleRuntimeFaultReset) mux.HandleFunc("/tx/start", s.handleTXStart) mux.HandleFunc("/tx/stop", s.handleTXStop) @@ -388,6 +397,40 @@ func (s *Server) handleMeasurements(w http.ResponseWriter, _ *http.Request) { _ = json.NewEncoder(w).Encode(result) } +func (s *Server) handleTelemetryWS(ws *websocket.Conn) { + if s.telemetryHub == nil { + _ = ws.Close() + return + } + _ = ws.SetDeadline(time.Now().Add(30 * time.Second)) + sub, unsubscribe := s.telemetryHub.Subscribe() + defer unsubscribe() + defer ws.Close() + + s.mu.RLock() + tx := s.tx + s.mu.RUnlock() + if tx != nil { + if stats := tx.TXStats(); stats != nil { + if measurement, ok := stats["measurement"].(interface{ }); ok { + if m, ok := measurement.(*offpkg.MeasurementSnapshot); ok && m != nil { + _ = ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) + if err := websocket.JSON.Send(ws, TelemetryMessage{Type: "measurement", TS: m.Timestamp, Seq: m.Sequence, Data: m}); err != nil { + return + } + } + } + } + } + + for msg := range sub.ch { + _ = ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) + if err := websocket.JSON.Send(ws, msg); err != nil { + return + } + } +} + func (s *Server) handleRuntime(w http.ResponseWriter, _ *http.Request) { s.mu.RLock() drv := s.drv diff --git a/internal/control/telemetry.go b/internal/control/telemetry.go new file mode 100644 index 0000000..6a38fc0 --- /dev/null +++ b/internal/control/telemetry.go @@ -0,0 +1,77 @@ +package control + +import ( + "sync" + "time" + + offpkg "github.com/jan/fm-rds-tx/internal/offline" +) + +type TelemetryMessage struct { + Type string `json:"type"` + TS time.Time `json:"ts"` + Seq uint64 `json:"seq"` + Data *offpkg.MeasurementSnapshot `json:"data,omitempty"` +} + +type telemetrySubscriber struct { + ch chan TelemetryMessage +} + +type TelemetryHub struct { + mu sync.Mutex + subscribers map[*telemetrySubscriber]struct{} +} + +func NewTelemetryHub() *TelemetryHub { + return &TelemetryHub{subscribers: make(map[*telemetrySubscriber]struct{})} +} + +func (h *TelemetryHub) Subscribe() (*telemetrySubscriber, func()) { + sub := &telemetrySubscriber{ch: make(chan TelemetryMessage, 1)} + h.mu.Lock() + h.subscribers[sub] = struct{}{} + h.mu.Unlock() + return sub, func() { + h.mu.Lock() + if _, ok := h.subscribers[sub]; ok { + delete(h.subscribers, sub) + close(sub.ch) + } + h.mu.Unlock() + } +} + +func (h *TelemetryHub) PublishMeasurement(snapshot *offpkg.MeasurementSnapshot) { + if h == nil || snapshot == nil { + return + } + msg := TelemetryMessage{ + Type: "measurement", + TS: snapshot.Timestamp, + Seq: snapshot.Sequence, + Data: snapshot, + } + + h.mu.Lock() + subs := make([]*telemetrySubscriber, 0, len(h.subscribers)) + for sub := range h.subscribers { + subs = append(subs, sub) + } + h.mu.Unlock() + + for _, sub := range subs { + select { + case sub.ch <- msg: + default: + select { + case <-sub.ch: + default: + } + select { + case sub.ch <- msg: + default: + } + } + } +} diff --git a/internal/control/ui.html b/internal/control/ui.html index 4aa1c3f..9f4968a 100644 --- a/internal/control/ui.html +++ b/internal/control/ui.html @@ -911,6 +911,7 @@ let toastTimer=null; const S={ server:{config:null,runtime:null,measurements:null,configOk:false,runtimeOk:false,lastConfigAt:0,lastRuntimeAt:0,lastMeasurementsAt:0}, + telemetry:{ws:null,wsConnected:false,wsRetryTimer:null,snapshotPollingActive:true}, lastRTState:'',draft:{},errors:{},dirty:new Set(), fieldErrors:{}, flowSelected:null,flowHover:null,flowAnchor:null, @@ -920,7 +921,7 @@ const S={ pollersStarted:false,mobilePanelsApplied:false,freqPresetIndex:0, charts:{audio:[],underruns:[],tx:[],hw:[],qf:[]}, transitions:[], - meterState:{audioL:{rms:0,peak:0,hold:0},audioR:{rms:0,peak:0,hold:0},mpx:{rms:0,peak:0,hold:0}}, + meterState:{audioL:{rms:0,rmsHold:0,rmsHoldTimerMs:0,peak:0,hold:0,holdTimerMs:0,lastTs:0,textRms:0,textPeak:0,textRmsDisplay:0,textPeakDisplay:0,textRmsLatchMs:0,textPeakLatchMs:0},audioR:{rms:0,rmsHold:0,rmsHoldTimerMs:0,peak:0,hold:0,holdTimerMs:0,lastTs:0,textRms:0,textPeak:0,textRmsDisplay:0,textPeakDisplay:0,textRmsLatchMs:0,textPeakLatchMs:0},mpx:{rms:0,rmsHold:0,rmsHoldTimerMs:0,peak:0,hold:0,holdTimerMs:0,lastTs:0,textRms:0,textPeak:0,textRmsDisplay:0,textPeakDisplay:0,textRmsLatchMs:0,textPeakLatchMs:0}}, }; // ── Field definitions ────────────────────────────────────────────────────── @@ -1011,7 +1012,8 @@ async function api(path,opts){const r=await fetch(path,opts);const t=await r.tex function setConn(ok,label){const led=$('led-conn'),lbl=$('conn-label');led.className='led '+(ok?S.pending>0?'on-amber':'on-green':'on-red');lbl.textContent=ok?S.pending>0?'busy':label||'connected':label||'offline';} async function loadConfig({silent=false}={}){try{const cfg=await api('/config');S.server.config=cfg;S.server.configOk=true;S.server.lastConfigAt=Date.now();syncIngDraft();syncCfgFromServer();syncFreqPresetIdx(cfg.fm?.frequencyMHz);setConn(true);render();if(!silent)log('Config synchronized','info');return cfg;}catch(e){S.server.configOk=false;if(!S.server.runtimeOk)setConn(false);render();if(!silent)log('Config load failed: '+e.message,'err');throw e;}} async function loadRuntime({silent=true}={}){try{const rt=await api('/runtime');S.server.runtime=rt;S.server.runtimeOk=true;S.server.lastRuntimeAt=Date.now();const synced=syncTransitions(rt.engine);notifyTransition(rt.engine,!synced);pushHistory(rt);setConn(true);render();return rt;}catch(e){S.server.runtimeOk=false;if(!S.server.configOk)setConn(false);render();if(!silent)log('Runtime load failed: '+e.message,'err');throw e;}} -async function loadMeasurements({silent=true}={}){try{const ms=await api('/measurements');S.server.measurements=ms;S.server.lastMeasurementsAt=Date.now();render();return ms;}catch(e){if(!silent)log('Measurements load failed: '+e.message,'err');throw e;}} +async function loadMeasurements({silent=true}={}){if(!S.telemetry.snapshotPollingActive)return S.server.measurements;try{const ms=await api('/measurements');S.server.measurements=ms;S.server.lastMeasurementsAt=Date.now();render();return ms;}catch(e){if(!silent)log('Measurements load failed: '+e.message,'err');throw e;}} +function connectTelemetryWS(){try{if(S.telemetry.ws){try{S.telemetry.ws.close();}catch{}}const proto=location.protocol==='https:'?'wss':'ws';const ws=new WebSocket(`${proto}://${location.host}/ws/telemetry`);S.telemetry.ws=ws;ws.onopen=()=>{S.telemetry.wsConnected=true;S.telemetry.snapshotPollingActive=false;render();log('Telemetry WS connected','ok');};ws.onmessage=(ev)=>{try{const msg=JSON.parse(ev.data);if(msg?.type==='measurement'&&msg.data){S.server.measurements={noData:false,stale:false,measurement:msg.data};S.server.lastMeasurementsAt=Date.now();render();}}catch(e){console.warn('telemetry ws parse',e);}};ws.onclose=()=>{S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;render();if(S.telemetry.ws===ws)S.telemetry.ws=null;if(S.telemetry.wsRetryTimer)clearTimeout(S.telemetry.wsRetryTimer);S.telemetry.wsRetryTimer=setTimeout(()=>connectTelemetryWS(),1500);};ws.onerror=()=>{try{ws.close();}catch{}};}catch(e){S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;}} function syncCfgFromServer(){Object.keys(CFG).forEach(k=>{if(S.cfgDraft[k]===undefined)S.cfgDraft[k]=cfgSrvVal(k);});Object.keys(S.cfgDirty).forEach(s=>{S.cfgDirty[s]=Object.keys(CFG).filter(k=>CFG[k].sec===s).some(k=>S.cfgDraft[k]!==undefined&&!cfgEq(k,S.cfgDraft[k],cfgSrvVal(k)));});} // ── History ──────────────────────────────────────────────────────────────── @@ -1082,7 +1084,7 @@ function setHTML(id,h){const el=$(id);if(el&&el.innerHTML!==h)el.innerHTML=h;} function setCls(id,c){const el=$(id);if(el)el.className=c;} function setMeter(fid,tid,ratio,text,mode='good'){const f=$(fid);if(!f)return;f.style.width=Math.max(0,Math.min(100,Math.round((ratio??0)*100)))+'%';f.className='meter-fill'+(mode==='warn'?' warn':mode==='err'?' err':'');setText(tid,text);} function drawSpark(svgId,vals,mode='good',maxO=null){const svg=$(svgId);if(!svg)return;svg.setAttribute('class',`spark ${mode}`);const W=160,H=34,pts=vals.length?vals:[0,0],mx=maxO!=null?maxO:Math.max(...pts,1),step=pts.length<=1?W:W/(pts.length-1);const coords=pts.map((v,i)=>{const x=i*step,y=H-4-((v-0)/(mx||1))*(H-8);return[x,y];});const line=coords.map(([x,y],i)=>`${i===0?'M':'L'}${x.toFixed(2)},${y.toFixed(2)}`).join(' ');svg.innerHTML=``;} -function updateHoldMeter(state,targetRms,targetPeak,{attack=1,decay=0.18,holdDecay=0.04}={}){state.rms=targetRms>state.rms?targetRms:Math.max(targetRms,state.rms-decay);state.peak=targetPeak>state.peak?targetPeak:Math.max(targetPeak,state.peak-decay*1.2);state.hold=targetPeak>=state.hold?targetPeak:Math.max(state.peak,state.hold-holdDecay);return state;} +function updateHoldMeter(state,targetRms,targetPeak,{attackPerSec=6.0,releasePerSec=1.4,rmsHoldMs=1400,rmsHoldReleasePerSec=0.12,peakReleasePerSec=1.1,holdMs=900,holdReleasePerSec=0.35,textRmsAttackPerSec=2.2,textRmsReleasePerSec=0.22,textPeakAttackPerSec=3.0,textPeakReleasePerSec=0.16,textRmsLatchMs=220,textPeakLatchMs=220,textRmsStep=0.01,textPeakStep=0.01,textRmsHysteresis=0.015,textPeakHysteresis=0.015}={}){const now=performance.now();const dt=Math.max(0.001,Math.min(0.25,state.lastTs?((now-state.lastTs)/1000):0.05));state.lastTs=now;const approach=(cur,target,upRate,downRate)=>{if(target>cur)return Math.min(target,cur+upRate*dt);return Math.max(target,cur-downRate*dt);};const quantize=(v,step)=>step>0?Math.round(v/step)*step:v;state.rms=approach(state.rms,targetRms,attackPerSec,releasePerSec);if(state.rms>=state.rmsHold){state.rmsHold=state.rms;state.rmsHoldTimerMs=rmsHoldMs;}else if(state.rmsHoldTimerMs>0){state.rmsHoldTimerMs=Math.max(0,state.rmsHoldTimerMs-dt*1000);}else{state.rmsHold=Math.max(state.rms,state.rmsHold-rmsHoldReleasePerSec*dt);}state.peak=approach(state.peak,targetPeak,attackPerSec*1.4,peakReleasePerSec);if(targetPeak>=state.hold){state.hold=targetPeak;state.holdTimerMs=holdMs;}else if(state.holdTimerMs>0){state.holdTimerMs=Math.max(0,state.holdTimerMs-dt*1000);}else{state.hold=Math.max(state.peak,state.hold-holdReleasePerSec*dt);}state.textRms=approach(state.textRms,state.rmsHold,textRmsAttackPerSec,textRmsReleasePerSec);state.textPeak=approach(state.textPeak,state.hold,textPeakAttackPerSec,textPeakReleasePerSec);state.textRmsLatchMs=Math.max(0,(state.textRmsLatchMs||0)-dt*1000);state.textPeakLatchMs=Math.max(0,(state.textPeakLatchMs||0)-dt*1000);const nextTextRms=quantize(state.textRms,textRmsStep);const nextTextPeak=quantize(state.textPeak,textPeakStep);if(state.textRmsLatchMs<=0&&Math.abs(nextTextRms-(state.textRmsDisplay||0))>=textRmsHysteresis){state.textRmsDisplay=nextTextRms;state.textRmsLatchMs=textRmsLatchMs;}if(state.textPeakLatchMs<=0&&Math.abs(nextTextPeak-(state.textPeakDisplay||0))>=textPeakHysteresis){state.textPeakDisplay=nextTextPeak;state.textPeakLatchMs=textPeakLatchMs;}return state;} function renderHifiMeter(fillId,peakId,textId,value,hold,text,scale=1){const fill=$(fillId),peak=$(peakId);if(fill)fill.style.width=`${Math.max(0,Math.min(100,(value/scale)*100))}%`;if(peak)peak.style.left=`${Math.max(0,Math.min(100,(hold/scale)*100))}%`;setText(textId,text);} function syncSlider(sid,did,key,fmt2=v=>v==null?'--':Number(v).toFixed(2)){const sl=$(sid);if(!sl)return;const n=cfgEff(key);if(document.activeElement!==sl&&n!=null)sl.value=String(Number(n));setText(did,fmt2(n));} @@ -1185,14 +1187,14 @@ function _render(){ setMeter('meter-stream-fill','meter-stream-text',urN<=0?1:Math.max(0,1-Math.min(urN,10)/10),urN===0?'Clean':`${urN} underrun${urN===1?'':'s'}`,urN===0?'good':urN<3?'warn':'err'); const txR=txSt==='running'?1:S.txBusy?.55:.08; setMeter('meter-tx-fill','meter-tx-text',txR,txSt==='running'?'Live':S.txBusy?'Working':'Idle',txSt==='running'?'good':S.txBusy?'warn':'err'); - const audioL=updateHoldMeter(S.meterState.audioL,Number(meas.lrPreEncodePostWatermark?.lRms||0),Number(meas.lrPreEncodePostWatermark?.lPeakAbs||0)); - const audioR=updateHoldMeter(S.meterState.audioR,Number(meas.lrPreEncodePostWatermark?.rRms||0),Number(meas.lrPreEncodePostWatermark?.rPeakAbs||0)); - const mpx=updateHoldMeter(S.meterState.mpx,0,Number(meas.compositeFinalPreIq?.peakAbs||0),{decay:0.12,holdDecay:0.025}); - renderHifiMeter('audio-l-rms-fill',null,'audio-l-rms-text',audioL.rms,0,typeof meas.lrPreEncodePostWatermark?.lRms==='number'?Number(meas.lrPreEncodePostWatermark.lRms).toFixed(2):'--'); - renderHifiMeter('audio-r-rms-fill',null,'audio-r-rms-text',audioR.rms,0,typeof meas.lrPreEncodePostWatermark?.rRms==='number'?Number(meas.lrPreEncodePostWatermark.rRms).toFixed(2):'--'); - renderHifiMeter('audio-l-peak-fill','audio-l-peak-marker','audio-l-peak-text',audioL.peak,audioL.hold,typeof meas.lrPreEncodePostWatermark?.lPeakAbs==='number'?Number(meas.lrPreEncodePostWatermark.lPeakAbs).toFixed(2):'--'); - renderHifiMeter('audio-r-peak-fill','audio-r-peak-marker','audio-r-peak-text',audioR.peak,audioR.hold,typeof meas.lrPreEncodePostWatermark?.rPeakAbs==='number'?Number(meas.lrPreEncodePostWatermark.rPeakAbs).toFixed(2):'--'); - renderHifiMeter('mpx-peak-fill','mpx-peak-marker','mpx-peak-text',mpx.peak,mpx.hold,typeof meas.compositeFinalPreIq?.peakAbs==='number'?Number(meas.compositeFinalPreIq.peakAbs).toFixed(2):'--',1.1); + const audioL=updateHoldMeter(S.meterState.audioL,Number(meas.lrPreEncodePostWatermark?.lRms||0),Number(meas.lrPreEncodePostWatermark?.lPeakAbs||0),{attackPerSec:7.0,releasePerSec:0.6,rmsHoldMs:1700,rmsHoldReleasePerSec:0.10,peakReleasePerSec:0.45,holdMs:1700,holdReleasePerSec:0.14,textRmsAttackPerSec:1.0,textRmsReleasePerSec:0.06,textPeakAttackPerSec:1.6,textPeakReleasePerSec:0.07,textRmsLatchMs:260,textPeakLatchMs:260,textRmsStep:0.01,textPeakStep:0.01,textRmsHysteresis:0.01,textPeakHysteresis:0.01}); + const audioR=updateHoldMeter(S.meterState.audioR,Number(meas.lrPreEncodePostWatermark?.rRms||0),Number(meas.lrPreEncodePostWatermark?.rPeakAbs||0),{attackPerSec:7.0,releasePerSec:0.6,rmsHoldMs:1700,rmsHoldReleasePerSec:0.10,peakReleasePerSec:0.45,holdMs:1700,holdReleasePerSec:0.14,textRmsAttackPerSec:1.0,textRmsReleasePerSec:0.06,textPeakAttackPerSec:1.6,textPeakReleasePerSec:0.07,textRmsLatchMs:260,textPeakLatchMs:260,textRmsStep:0.01,textPeakStep:0.01,textRmsHysteresis:0.01,textPeakHysteresis:0.01}); + const mpx=updateHoldMeter(S.meterState.mpx,0,Number(meas.compositeFinalPreIq?.peakAbs||0),{attackPerSec:8.0,releasePerSec:0.4,peakReleasePerSec:0.375,holdMs:2200,holdReleasePerSec:0.09,textPeakAttackPerSec:1.4,textPeakReleasePerSec:0.06,textPeakLatchMs:320,textPeakStep:0.01,textPeakHysteresis:0.01}); + renderHifiMeter('audio-l-rms-fill',null,'audio-l-rms-text',audioL.rms,0,typeof meas.lrPreEncodePostWatermark?.lRms==='number'?audioL.textRmsDisplay.toFixed(2):'--'); + renderHifiMeter('audio-r-rms-fill',null,'audio-r-rms-text',audioR.rms,0,typeof meas.lrPreEncodePostWatermark?.rRms==='number'?audioR.textRmsDisplay.toFixed(2):'--'); + renderHifiMeter('audio-l-peak-fill','audio-l-peak-marker','audio-l-peak-text',audioL.peak,audioL.hold,typeof meas.lrPreEncodePostWatermark?.lPeakAbs==='number'?audioL.textPeakDisplay.toFixed(2):'--'); + renderHifiMeter('audio-r-peak-fill','audio-r-peak-marker','audio-r-peak-text',audioR.peak,audioR.hold,typeof meas.lrPreEncodePostWatermark?.rPeakAbs==='number'?audioR.textPeakDisplay.toFixed(2):'--'); + renderHifiMeter('mpx-peak-fill','mpx-peak-marker','mpx-peak-text',mpx.peak,mpx.hold,typeof meas.compositeFinalPreIq?.peakAbs==='number'?mpx.textPeakDisplay.toFixed(2):'--',1.1); drawSpark('spark-audio',S.charts.audio,'good',1); drawSpark('spark-underruns',S.charts.underruns,urN>0?'err':'warn'); drawSpark('spark-tx',S.charts.tx,txSt==='running'?'good':'warn',1); @@ -1432,14 +1434,15 @@ function bindAll(){ } async function manualRefresh(){beginReq();try{await Promise.allSettled([loadConfig({silent:true}),loadRuntime({silent:true}),loadMeasurements({silent:true})]);toast('UI data refreshed','info');log('Manual refresh','info');}finally{endReq();}} -function startPollers(){if(S.pollersStarted)return;S.pollersStarted=true;setInterval(()=>loadRuntime({silent:true}),RUNTIME_MS);setInterval(()=>loadConfig({silent:true}),CONFIG_MS);setInterval(()=>loadMeasurements({silent:true}),200);} +function startPollers(){if(S.pollersStarted)return;S.pollersStarted=true;setInterval(()=>loadRuntime({silent:true}),RUNTIME_MS);setInterval(()=>loadConfig({silent:true}),CONFIG_MS);setInterval(()=>loadMeasurements({silent:true}),500);} async function init(){ bindAll();render(); log('ferrite.fm control UI booting','info'); await Promise.allSettled([loadConfig({silent:false}),loadRuntime({silent:true}),loadMeasurements({silent:true})]); + connectTelemetryWS(); render();startPollers(); - log('Polling active: runtime 1s · config 8s · measurements 200ms','ok'); + log('Polling active: runtime 1s · config 8s · measurements fallback 500ms · telemetry WS preferred','ok'); log('UI ready','info'); } init();