Pārlūkot izejas kodu

fix: refine measurement semantics and stabilize live UI updates

main
Jan pirms 4 nedēļām
vecāks
revīzija
ddb8a75702
3 mainītis faili ar 150 papildinājumiem un 21 dzēšanām
  1. +32
    -17
      internal/control/control.go
  2. +110
    -0
      internal/control/control_test.go
  3. +8
    -4
      internal/control/ui.html

+ 32
- 17
internal/control/control.go Parādīt failu

@@ -117,6 +117,16 @@ var audioStreamAllowedMediaTypes = []string{

var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault) // bytes allowed per /audio/stream request; tests may override.

func anyToString(v any) string {
if v == nil {
return ""
}
if s, ok := v.(string); ok {
return s
}
return ""
}

func isJSONContentType(r *http.Request) bool {
ct := strings.TrimSpace(r.Header.Get("Content-Type"))
if ct == "" {
@@ -373,23 +383,28 @@ func (s *Server) handleMeasurements(w http.ResponseWriter, _ *http.Request) {
result := map[string]any{"noData": true, "stale": true}
if tx != nil {
if stats := tx.TXStats(); stats != nil {
if measurement, ok := stats["measurement"]; ok && measurement != nil {
result = map[string]any{"noData": false, "stale": false, "measurement": measurement}
if state, ok := stats["state"]; ok {
result["state"] = state
}
if applied, ok := stats["appliedFrequencyMHz"]; ok {
result["appliedFrequencyMHz"] = applied
}
if queue, ok := stats["queue"]; ok {
result["queue"] = queue
}
if runtimeIndicator, ok := stats["runtimeIndicator"]; ok {
result["runtimeIndicator"] = runtimeIndicator
}
if runtimeAlert, ok := stats["runtimeAlert"]; ok {
result["runtimeAlert"] = runtimeAlert
}
if state, ok := stats["state"]; ok {
result["state"] = state
}
if applied, ok := stats["appliedFrequencyMHz"]; ok {
result["appliedFrequencyMHz"] = applied
}
if queue, ok := stats["queue"]; ok {
result["queue"] = queue
}
if runtimeIndicator, ok := stats["runtimeIndicator"]; ok {
result["runtimeIndicator"] = runtimeIndicator
}
if runtimeAlert, ok := stats["runtimeAlert"]; ok {
result["runtimeAlert"] = runtimeAlert
}
if measurement, ok := stats["measurement"].(*offpkg.MeasurementSnapshot); ok && measurement != nil {
state := strings.ToLower(strings.TrimSpace(anyToString(stats["state"])))
fresh := !measurement.Timestamp.IsZero() && time.Since(measurement.Timestamp) <= 2*time.Second
runningish := state == "running" || state == "degraded" || state == "muted" || state == "faulted" || state == "arming" || state == "prebuffering"
result["measurement"] = measurement
result["noData"] = false
result["stale"] = !(fresh && runningish)
}
}
}


+ 110
- 0
internal/control/control_test.go Parādīt failu

@@ -14,6 +14,7 @@ import (

cfgpkg "github.com/jan/fm-rds-tx/internal/config"
"github.com/jan/fm-rds-tx/internal/ingest"
offpkg "github.com/jan/fm-rds-tx/internal/offline"
"github.com/jan/fm-rds-tx/internal/output"
)

@@ -1018,3 +1019,112 @@ func (f *fakeTXController) TXStats() map[string]any {
}
func (f *fakeTXController) UpdateConfig(_ LivePatch) error { return f.updateErr }
func (f *fakeTXController) ResetFault() error { return f.resetErr }

func TestMeasurementsNoDataWhenMissing(t *testing.T) {
srv := NewServer(cfgpkg.Default())
srv.SetTXController(&fakeTXController{stats: map[string]any{"state": "idle"}})
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/measurements", nil))
if rec.Code != http.StatusOK {
t.Fatalf("status: %d", rec.Code)
}
var body map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if body["noData"] != true {
t.Fatalf("expected noData=true, got %v", body["noData"])
}
if body["stale"] != true {
t.Fatalf("expected stale=true, got %v", body["stale"])
}
}

func TestMeasurementsRunningFreshSnapshotIsNotStale(t *testing.T) {
srv := NewServer(cfgpkg.Default())
srv.SetTXController(&fakeTXController{stats: map[string]any{
"state": "running",
"measurement": &offpkg.MeasurementSnapshot{Timestamp: time.Now(), Sequence: 7},
}})
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/measurements", nil))
if rec.Code != http.StatusOK {
t.Fatalf("status: %d", rec.Code)
}
var body map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if body["noData"] != false {
t.Fatalf("expected noData=false, got %v", body["noData"])
}
if body["stale"] != false {
t.Fatalf("expected stale=false, got %v", body["stale"])
}
}

func TestMeasurementsIdleSnapshotIsStale(t *testing.T) {
srv := NewServer(cfgpkg.Default())
srv.SetTXController(&fakeTXController{stats: map[string]any{
"state": "idle",
"measurement": &offpkg.MeasurementSnapshot{Timestamp: time.Now(), Sequence: 9},
}})
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/measurements", nil))
if rec.Code != http.StatusOK {
t.Fatalf("status: %d", rec.Code)
}
var body map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if body["noData"] != false {
t.Fatalf("expected noData=false, got %v", body["noData"])
}
if body["stale"] != true {
t.Fatalf("expected stale=true, got %v", body["stale"])
}
}

func TestMeasurementsOldSnapshotIsStale(t *testing.T) {
srv := NewServer(cfgpkg.Default())
srv.SetTXController(&fakeTXController{stats: map[string]any{
"state": "running",
"measurement": &offpkg.MeasurementSnapshot{Timestamp: time.Now().Add(-3 * time.Second), Sequence: 11},
}})
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/measurements", nil))
if rec.Code != http.StatusOK {
t.Fatalf("status: %d", rec.Code)
}
var body map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if body["stale"] != true {
t.Fatalf("expected stale=true, got %v", body["stale"])
}
}

func TestTelemetryUnsubscribeDuringPublishDoesNotPanic(t *testing.T) {
hub := NewTelemetryHub()
sub, unsubscribe := hub.Subscribe()
done := make(chan struct{})
go func() {
defer close(done)
unsubscribe()
}()
for i := 0; i < 100; i++ {
hub.PublishMeasurement(&offpkg.MeasurementSnapshot{Timestamp: time.Now(), Sequence: uint64(i + 1)})
}
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("unsubscribe did not complete")
}
select {
case <-sub.done:
case <-time.After(2 * time.Second):
t.Fatal("subscriber done not closed")
}
}

+ 8
- 4
internal/control/ui.html Parādīt failu

@@ -913,7 +913,7 @@ let toastTimer=null;

const S={
server:{config:null,runtime:null,measurements:null,configOk:false,runtimeOk:false,lastConfigAt:0,lastRuntimeAt:0,lastMeasurementsAt:0},
telemetry:{ws:null,wsConnected:false,wsRetryTimer:null,snapshotPollingActive:true},
telemetry:{ws:null,wsConnected:false,wsRetryTimer:null,snapshotPollingActive:true,source:'snapshot',lastWsMessageAt:null,fallbackReason:null,reconnectDelayMs:1500},
lastRTState:'',draft:{},errors:{},dirty:new Set(),
fieldErrors:{},
flowSelected:null,flowHover:null,flowAnchor:null,
@@ -1014,8 +1014,10 @@ async function api(path,opts){const r=await fetch(path,opts);const t=await r.tex
function setConn(ok,label){const led=$('led-conn'),lbl=$('conn-label');led.className='led '+(ok?S.pending>0?'on-amber':'on-green':'on-red');lbl.textContent=ok?S.pending>0?'busy':label||'connected':label||'offline';}
async function loadConfig({silent=false}={}){try{const cfg=await api('/config');S.server.config=cfg;S.server.configOk=true;S.server.lastConfigAt=Date.now();syncIngDraft();syncCfgFromServer();syncFreqPresetIdx(cfg.fm?.frequencyMHz);setConn(true);render();if(!silent)log('Config synchronized','info');return cfg;}catch(e){S.server.configOk=false;if(!S.server.runtimeOk)setConn(false);render();if(!silent)log('Config load failed: '+e.message,'err');throw e;}}
async function loadRuntime({silent=true}={}){try{const rt=await api('/runtime');S.server.runtime=rt;S.server.runtimeOk=true;S.server.lastRuntimeAt=Date.now();const synced=syncTransitions(rt.engine);notifyTransition(rt.engine,!synced);pushHistory(rt);setConn(true);render();return rt;}catch(e){S.server.runtimeOk=false;if(!S.server.configOk)setConn(false);render();if(!silent)log('Runtime load failed: '+e.message,'err');throw e;}}
async function loadMeasurements({silent=true}={}){if(!S.telemetry.snapshotPollingActive)return S.server.measurements;try{const ms=await api('/measurements');S.server.measurements=ms;S.server.lastMeasurementsAt=Date.now();render();return ms;}catch(e){if(!silent)log('Measurements load failed: '+e.message,'err');throw e;}}
function connectTelemetryWS(){try{if(S.telemetry.ws){try{S.telemetry.ws.close();}catch{}}const proto=location.protocol==='https:'?'wss':'ws';const ws=new WebSocket(`${proto}://${location.host}/ws/telemetry`);S.telemetry.ws=ws;ws.onopen=()=>{S.telemetry.wsConnected=true;S.telemetry.snapshotPollingActive=false;render();log('Telemetry WS connected','ok');};ws.onmessage=(ev)=>{try{const msg=JSON.parse(ev.data);if(msg?.type==='measurement'&&msg.data){S.server.measurements={noData:false,stale:false,measurement:msg.data};S.server.lastMeasurementsAt=Date.now();render();}}catch(e){console.warn('telemetry ws parse',e);}};ws.onclose=()=>{S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;render();if(S.telemetry.ws===ws)S.telemetry.ws=null;if(S.telemetry.wsRetryTimer)clearTimeout(S.telemetry.wsRetryTimer);S.telemetry.wsRetryTimer=setTimeout(()=>connectTelemetryWS(),1500);};ws.onerror=()=>{try{ws.close();}catch{}};}catch(e){S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;}}
async function loadMeasurements({silent=true}={}){if(!S.telemetry.snapshotPollingActive)return S.server.measurements;try{const ms=await api('/measurements');S.server.measurements=ms;S.server.lastMeasurementsAt=Date.now();S.telemetry.source='snapshot';render();return ms;}catch(e){if(!silent)log('Measurements load failed: '+e.message,'err');throw e;}}
function scheduleLiveRender(){if(S.liveRenderScheduled)return;S.liveRenderScheduled=true;requestAnimationFrame(()=>{S.liveRenderScheduled=false;renderLiveOnly();});}
function mergeLiveMeasurement(msg){const prev=S.server.measurements||{};S.server.measurements={...prev,measurement:msg.data};S.server.lastMeasurementsAt=Date.now();S.telemetry.lastWsMessageAt=Date.now();S.telemetry.source='ws';}
function connectTelemetryWS(){try{if(S.telemetry.ws){try{S.telemetry.ws.close();}catch{}}const proto=location.protocol==='https:'?'wss':'ws';const ws=new WebSocket(`${proto}://${location.host}/ws/telemetry`);S.telemetry.ws=ws;ws.onopen=()=>{S.telemetry.wsConnected=true;S.telemetry.snapshotPollingActive=false;S.telemetry.fallbackReason=null;S.telemetry.source='ws';S.telemetry.reconnectDelayMs=1500;render();log('Telemetry WS connected','ok');};ws.onmessage=(ev)=>{try{const msg=JSON.parse(ev.data);if(msg?.type==='measurement'&&msg.data){mergeLiveMeasurement(msg);scheduleLiveRender();}}catch(e){console.warn('telemetry ws parse',e);}};ws.onclose=()=>{S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;S.telemetry.fallbackReason='ws-disconnected';S.telemetry.source='snapshot';render();if(S.telemetry.ws===ws)S.telemetry.ws=null;if(S.telemetry.wsRetryTimer)clearTimeout(S.telemetry.wsRetryTimer);const delay=S.telemetry.reconnectDelayMs||1500;S.telemetry.wsRetryTimer=setTimeout(()=>connectTelemetryWS(),delay);S.telemetry.reconnectDelayMs=Math.min(10000,Math.round(delay*1.7));};ws.onerror=()=>{try{ws.close();}catch{}};}catch(e){S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;S.telemetry.fallbackReason='ws-error';S.telemetry.source='snapshot';}}
function syncCfgFromServer(){Object.keys(CFG).forEach(k=>{if(S.cfgDraft[k]===undefined)S.cfgDraft[k]=cfgSrvVal(k);});Object.keys(S.cfgDirty).forEach(s=>{S.cfgDirty[s]=Object.keys(CFG).filter(k=>CFG[k].sec===s).some(k=>S.cfgDraft[k]!==undefined&&!cfgEq(k,S.cfgDraft[k],cfgSrvVal(k)));});}

// ── History ────────────────────────────────────────────────────────────────
@@ -1048,10 +1050,11 @@ function renderFlow(){const chain=$('flow-chain');if(!chain)return;const data=fl
const sourceSummary=(()=>{const kind=String(S.server.runtime?.ingest?.active?.kind||S.server.config?.ingest?.kind||'none');const endpoint=String(S.server.runtime?.ingest?.active?.origin?.endpoint||'');try{return joinParts([kind,new URL(endpoint).host]);}catch{return joinParts([kind,S.server.runtime?.ingest?.active?.origin?.streamName||'']);}})();
const ingestState=(S.server.runtime?.ingest?.runtime?.state||S.server.runtime?.ingest?.source?.state||'--');
const runtimeAge=Number(S.server.runtime?.engine?.runtimeStateDurationSeconds);
setText('flow-top-applied',isFinite(Number(S.server.runtime?.engine?.appliedFrequencyMHz))?`${Number(S.server.runtime.engine.appliedFrequencyMHz).toFixed(1)} MHz`:(isFinite(Number(S.server.measurements?.appliedFrequencyMHz))?`${Number(S.server.measurements.appliedFrequencyMHz).toFixed(1)} MHz`:'--'));setText('flow-top-target',isFinite(Number(S.server.config?.fm?.frequencyMHz))?`${Number(S.server.config.fm.frequencyMHz).toFixed(1)} MHz`:'--');setText('flow-top-source',sourceSummary||'--');setText('flow-top-alert',issue?issue.text:(S.server.measurements?.runtimeAlert||'None'));setText('flow-bottom-queue',String(S.server.runtime?.engine?.queue?.health||S.server.measurements?.queue?.health||'--').toUpperCase());setText('flow-bottom-ingest',String(ingestState||'--').toUpperCase());setText('flow-bottom-age',isFinite(runtimeAge)?fmtTime(runtimeAge):'--');setText('flow-bottom-update',ageStr(Math.max(S.server.lastConfigAt||0,S.server.lastRuntimeAt||0,S.server.lastMeasurementsAt||0)));renderFlowPopover();
setText('flow-top-applied',isFinite(Number(S.server.runtime?.engine?.appliedFrequencyMHz))?`${Number(S.server.runtime.engine.appliedFrequencyMHz).toFixed(1)} MHz`:(isFinite(Number(S.server.measurements?.appliedFrequencyMHz))?`${Number(S.server.measurements.appliedFrequencyMHz).toFixed(1)} MHz`:'--'));setText('flow-top-target',isFinite(Number(S.server.config?.fm?.frequencyMHz))?`${Number(S.server.config.fm.frequencyMHz).toFixed(1)} MHz`:'--');setText('flow-top-source',sourceSummary||'--');setText('flow-top-alert',issue?issue.text:(S.server.measurements?.runtimeAlert||'None'));setText('flow-bottom-queue',String(S.server.runtime?.engine?.queue?.health||S.server.measurements?.queue?.health||'--').toUpperCase());setText('flow-bottom-ingest',String(ingestState||'--').toUpperCase());setText('flow-bottom-age',isFinite(runtimeAge)?fmtTime(runtimeAge):'--');setText('flow-bottom-update',ageStr(Math.max(S.server.lastConfigAt||0,S.server.lastRuntimeAt||0,S.server.lastMeasurementsAt||0)));if(!S.flowSelected||!flowPopoverHasActiveEditor())renderFlowPopover();
}
function showFlowTooltip(key,anchor){const tip=$('flow-tooltip');if(!tip||S.flowSelected===key)return;const d=flowNodeData()[key];if(!d)return;tip.innerHTML=`<div class="flow-tooltip-title">${FLOW_NODES.find(n=>n.key===key)?.label||key}</div><div class="flow-tooltip-status">${String(d.state||'idle').toUpperCase()}</div><div class="flow-tooltip-lines">${(d.lines||[]).map(line=>`<div>${line}</div>`).join('')}</div>`;const r=anchor.getBoundingClientRect();tip.style.left=`${Math.min(window.innerWidth-300,Math.max(12,r.left + window.scrollX))}px`;tip.style.top=`${r.bottom + window.scrollY + 8}px`;tip.classList.add('show');}
function hideFlowTooltip(){const tip=$('flow-tooltip');if(tip)tip.classList.remove('show');}
function flowPopoverHasActiveEditor(){const pop=$('flow-popover');if(!pop||!pop.classList.contains('show'))return false;const ae=document.activeElement;if(!ae)return false;return pop.contains(ae)&&((ae.tagName||'').toLowerCase()==='input'||(ae.tagName||'').toLowerCase()==='select'||(ae.tagName||'').toLowerCase()==='textarea'||ae.isContentEditable);}
function openFlowPopover(key,anchor){S.flowSelected=S.flowSelected===key?null:key;S.flowAnchor=S.flowSelected?anchor:null;hideFlowTooltip();render();}
function closeFlowPopover(){S.flowSelected=null;S.flowAnchor=null;render();}
function flowJump(tab){document.querySelectorAll('.tab-btn[data-tab]').forEach(b=>b.classList.toggle('active',b.dataset.tab===tab));document.querySelectorAll('.tab-panel[data-tab-panel]').forEach(p=>p.classList.toggle('active',p.dataset.tabPanel===tab));closeFlowPopover();}
@@ -1155,6 +1158,7 @@ async function saveIngest(){if(S.ingestSaving)return;if(!S.ingestDirty){toast('N

// ── Render ─────────────────────────────────────────────────────────────────
function render(){try{_render();}catch(e){console.error('[render]',e);}}
function renderLiveOnly(){try{_render();}catch(e){console.error('[renderLiveOnly]',e);}}

function _render(){
const cfg=S.server.config||{},rt=S.server.runtime||{},eng=rt.engine||{},drv=rt.driver||{},aud=rt.audioStream||null,ing=rt.ingest||{},ingRt=ing.runtime||{},measWrap=S.server.measurements||{},meas=measWrap.measurement||eng.measurement||{};


Notiek ielāde…
Atcelt
Saglabāt