diff --git a/internal/control/control.go b/internal/control/control.go index 2fd125f..63b1918 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -117,6 +117,16 @@ var audioStreamAllowedMediaTypes = []string{ var audioStreamBodyLimit = int64(audioStreamBodyLimitDefault) // bytes allowed per /audio/stream request; tests may override. +func anyToString(v any) string { + if v == nil { + return "" + } + if s, ok := v.(string); ok { + return s + } + return "" +} + func isJSONContentType(r *http.Request) bool { ct := strings.TrimSpace(r.Header.Get("Content-Type")) if ct == "" { @@ -373,23 +383,28 @@ func (s *Server) handleMeasurements(w http.ResponseWriter, _ *http.Request) { result := map[string]any{"noData": true, "stale": true} if tx != nil { if stats := tx.TXStats(); stats != nil { - if measurement, ok := stats["measurement"]; ok && measurement != nil { - result = map[string]any{"noData": false, "stale": false, "measurement": measurement} - if state, ok := stats["state"]; ok { - result["state"] = state - } - if applied, ok := stats["appliedFrequencyMHz"]; ok { - result["appliedFrequencyMHz"] = applied - } - if queue, ok := stats["queue"]; ok { - result["queue"] = queue - } - if runtimeIndicator, ok := stats["runtimeIndicator"]; ok { - result["runtimeIndicator"] = runtimeIndicator - } - if runtimeAlert, ok := stats["runtimeAlert"]; ok { - result["runtimeAlert"] = runtimeAlert - } + if state, ok := stats["state"]; ok { + result["state"] = state + } + if applied, ok := stats["appliedFrequencyMHz"]; ok { + result["appliedFrequencyMHz"] = applied + } + if queue, ok := stats["queue"]; ok { + result["queue"] = queue + } + if runtimeIndicator, ok := stats["runtimeIndicator"]; ok { + result["runtimeIndicator"] = runtimeIndicator + } + if runtimeAlert, ok := stats["runtimeAlert"]; ok { + result["runtimeAlert"] = runtimeAlert + } + if measurement, ok := stats["measurement"].(*offpkg.MeasurementSnapshot); ok && measurement != nil { + state := strings.ToLower(strings.TrimSpace(anyToString(stats["state"]))) + fresh := !measurement.Timestamp.IsZero() && time.Since(measurement.Timestamp) <= 2*time.Second + runningish := state == "running" || state == "degraded" || state == "muted" || state == "faulted" || state == "arming" || state == "prebuffering" + result["measurement"] = measurement + result["noData"] = false + result["stale"] = !(fresh && runningish) } } } diff --git a/internal/control/control_test.go b/internal/control/control_test.go index b0492e5..c9c8bf3 100644 --- a/internal/control/control_test.go +++ b/internal/control/control_test.go @@ -14,6 +14,7 @@ import ( cfgpkg "github.com/jan/fm-rds-tx/internal/config" "github.com/jan/fm-rds-tx/internal/ingest" + offpkg "github.com/jan/fm-rds-tx/internal/offline" "github.com/jan/fm-rds-tx/internal/output" ) @@ -1018,3 +1019,112 @@ func (f *fakeTXController) TXStats() map[string]any { } func (f *fakeTXController) UpdateConfig(_ LivePatch) error { return f.updateErr } func (f *fakeTXController) ResetFault() error { return f.resetErr } + +func TestMeasurementsNoDataWhenMissing(t *testing.T) { + srv := NewServer(cfgpkg.Default()) + srv.SetTXController(&fakeTXController{stats: map[string]any{"state": "idle"}}) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/measurements", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("status: %d", rec.Code) + } + var body map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if body["noData"] != true { + t.Fatalf("expected noData=true, got %v", body["noData"]) + } + if body["stale"] != true { + t.Fatalf("expected stale=true, got %v", body["stale"]) + } +} + +func TestMeasurementsRunningFreshSnapshotIsNotStale(t *testing.T) { + srv := NewServer(cfgpkg.Default()) + srv.SetTXController(&fakeTXController{stats: map[string]any{ + "state": "running", + "measurement": &offpkg.MeasurementSnapshot{Timestamp: time.Now(), Sequence: 7}, + }}) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/measurements", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("status: %d", rec.Code) + } + var body map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if body["noData"] != false { + t.Fatalf("expected noData=false, got %v", body["noData"]) + } + if body["stale"] != false { + t.Fatalf("expected stale=false, got %v", body["stale"]) + } +} + +func TestMeasurementsIdleSnapshotIsStale(t *testing.T) { + srv := NewServer(cfgpkg.Default()) + srv.SetTXController(&fakeTXController{stats: map[string]any{ + "state": "idle", + "measurement": &offpkg.MeasurementSnapshot{Timestamp: time.Now(), Sequence: 9}, + }}) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/measurements", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("status: %d", rec.Code) + } + var body map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if body["noData"] != false { + t.Fatalf("expected noData=false, got %v", body["noData"]) + } + if body["stale"] != true { + t.Fatalf("expected stale=true, got %v", body["stale"]) + } +} + +func TestMeasurementsOldSnapshotIsStale(t *testing.T) { + srv := NewServer(cfgpkg.Default()) + srv.SetTXController(&fakeTXController{stats: map[string]any{ + "state": "running", + "measurement": &offpkg.MeasurementSnapshot{Timestamp: time.Now().Add(-3 * time.Second), Sequence: 11}, + }}) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/measurements", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("status: %d", rec.Code) + } + var body map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if body["stale"] != true { + t.Fatalf("expected stale=true, got %v", body["stale"]) + } +} + +func TestTelemetryUnsubscribeDuringPublishDoesNotPanic(t *testing.T) { + hub := NewTelemetryHub() + sub, unsubscribe := hub.Subscribe() + done := make(chan struct{}) + go func() { + defer close(done) + unsubscribe() + }() + for i := 0; i < 100; i++ { + hub.PublishMeasurement(&offpkg.MeasurementSnapshot{Timestamp: time.Now(), Sequence: uint64(i + 1)}) + } + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("unsubscribe did not complete") + } + select { + case <-sub.done: + case <-time.After(2 * time.Second): + t.Fatal("subscriber done not closed") + } +} diff --git a/internal/control/ui.html b/internal/control/ui.html index b43be36..a4fd755 100644 --- a/internal/control/ui.html +++ b/internal/control/ui.html @@ -913,7 +913,7 @@ let toastTimer=null; const S={ server:{config:null,runtime:null,measurements:null,configOk:false,runtimeOk:false,lastConfigAt:0,lastRuntimeAt:0,lastMeasurementsAt:0}, - telemetry:{ws:null,wsConnected:false,wsRetryTimer:null,snapshotPollingActive:true}, + telemetry:{ws:null,wsConnected:false,wsRetryTimer:null,snapshotPollingActive:true,source:'snapshot',lastWsMessageAt:null,fallbackReason:null,reconnectDelayMs:1500}, lastRTState:'',draft:{},errors:{},dirty:new Set(), fieldErrors:{}, flowSelected:null,flowHover:null,flowAnchor:null, @@ -1014,8 +1014,10 @@ async function api(path,opts){const r=await fetch(path,opts);const t=await r.tex function setConn(ok,label){const led=$('led-conn'),lbl=$('conn-label');led.className='led '+(ok?S.pending>0?'on-amber':'on-green':'on-red');lbl.textContent=ok?S.pending>0?'busy':label||'connected':label||'offline';} async function loadConfig({silent=false}={}){try{const cfg=await api('/config');S.server.config=cfg;S.server.configOk=true;S.server.lastConfigAt=Date.now();syncIngDraft();syncCfgFromServer();syncFreqPresetIdx(cfg.fm?.frequencyMHz);setConn(true);render();if(!silent)log('Config synchronized','info');return cfg;}catch(e){S.server.configOk=false;if(!S.server.runtimeOk)setConn(false);render();if(!silent)log('Config load failed: '+e.message,'err');throw e;}} async function loadRuntime({silent=true}={}){try{const rt=await api('/runtime');S.server.runtime=rt;S.server.runtimeOk=true;S.server.lastRuntimeAt=Date.now();const synced=syncTransitions(rt.engine);notifyTransition(rt.engine,!synced);pushHistory(rt);setConn(true);render();return rt;}catch(e){S.server.runtimeOk=false;if(!S.server.configOk)setConn(false);render();if(!silent)log('Runtime load failed: '+e.message,'err');throw e;}} -async function loadMeasurements({silent=true}={}){if(!S.telemetry.snapshotPollingActive)return S.server.measurements;try{const ms=await api('/measurements');S.server.measurements=ms;S.server.lastMeasurementsAt=Date.now();render();return ms;}catch(e){if(!silent)log('Measurements load failed: '+e.message,'err');throw e;}} -function connectTelemetryWS(){try{if(S.telemetry.ws){try{S.telemetry.ws.close();}catch{}}const proto=location.protocol==='https:'?'wss':'ws';const ws=new WebSocket(`${proto}://${location.host}/ws/telemetry`);S.telemetry.ws=ws;ws.onopen=()=>{S.telemetry.wsConnected=true;S.telemetry.snapshotPollingActive=false;render();log('Telemetry WS connected','ok');};ws.onmessage=(ev)=>{try{const msg=JSON.parse(ev.data);if(msg?.type==='measurement'&&msg.data){S.server.measurements={noData:false,stale:false,measurement:msg.data};S.server.lastMeasurementsAt=Date.now();render();}}catch(e){console.warn('telemetry ws parse',e);}};ws.onclose=()=>{S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;render();if(S.telemetry.ws===ws)S.telemetry.ws=null;if(S.telemetry.wsRetryTimer)clearTimeout(S.telemetry.wsRetryTimer);S.telemetry.wsRetryTimer=setTimeout(()=>connectTelemetryWS(),1500);};ws.onerror=()=>{try{ws.close();}catch{}};}catch(e){S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;}} +async function loadMeasurements({silent=true}={}){if(!S.telemetry.snapshotPollingActive)return S.server.measurements;try{const ms=await api('/measurements');S.server.measurements=ms;S.server.lastMeasurementsAt=Date.now();S.telemetry.source='snapshot';render();return ms;}catch(e){if(!silent)log('Measurements load failed: '+e.message,'err');throw e;}} +function scheduleLiveRender(){if(S.liveRenderScheduled)return;S.liveRenderScheduled=true;requestAnimationFrame(()=>{S.liveRenderScheduled=false;renderLiveOnly();});} +function mergeLiveMeasurement(msg){const prev=S.server.measurements||{};S.server.measurements={...prev,measurement:msg.data};S.server.lastMeasurementsAt=Date.now();S.telemetry.lastWsMessageAt=Date.now();S.telemetry.source='ws';} +function connectTelemetryWS(){try{if(S.telemetry.ws){try{S.telemetry.ws.close();}catch{}}const proto=location.protocol==='https:'?'wss':'ws';const ws=new WebSocket(`${proto}://${location.host}/ws/telemetry`);S.telemetry.ws=ws;ws.onopen=()=>{S.telemetry.wsConnected=true;S.telemetry.snapshotPollingActive=false;S.telemetry.fallbackReason=null;S.telemetry.source='ws';S.telemetry.reconnectDelayMs=1500;render();log('Telemetry WS connected','ok');};ws.onmessage=(ev)=>{try{const msg=JSON.parse(ev.data);if(msg?.type==='measurement'&&msg.data){mergeLiveMeasurement(msg);scheduleLiveRender();}}catch(e){console.warn('telemetry ws parse',e);}};ws.onclose=()=>{S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;S.telemetry.fallbackReason='ws-disconnected';S.telemetry.source='snapshot';render();if(S.telemetry.ws===ws)S.telemetry.ws=null;if(S.telemetry.wsRetryTimer)clearTimeout(S.telemetry.wsRetryTimer);const delay=S.telemetry.reconnectDelayMs||1500;S.telemetry.wsRetryTimer=setTimeout(()=>connectTelemetryWS(),delay);S.telemetry.reconnectDelayMs=Math.min(10000,Math.round(delay*1.7));};ws.onerror=()=>{try{ws.close();}catch{}};}catch(e){S.telemetry.wsConnected=false;S.telemetry.snapshotPollingActive=true;S.telemetry.fallbackReason='ws-error';S.telemetry.source='snapshot';}} function syncCfgFromServer(){Object.keys(CFG).forEach(k=>{if(S.cfgDraft[k]===undefined)S.cfgDraft[k]=cfgSrvVal(k);});Object.keys(S.cfgDirty).forEach(s=>{S.cfgDirty[s]=Object.keys(CFG).filter(k=>CFG[k].sec===s).some(k=>S.cfgDraft[k]!==undefined&&!cfgEq(k,S.cfgDraft[k],cfgSrvVal(k)));});} // ── History ──────────────────────────────────────────────────────────────── @@ -1048,10 +1050,11 @@ function renderFlow(){const chain=$('flow-chain');if(!chain)return;const data=fl const sourceSummary=(()=>{const kind=String(S.server.runtime?.ingest?.active?.kind||S.server.config?.ingest?.kind||'none');const endpoint=String(S.server.runtime?.ingest?.active?.origin?.endpoint||'');try{return joinParts([kind,new URL(endpoint).host]);}catch{return joinParts([kind,S.server.runtime?.ingest?.active?.origin?.streamName||'']);}})(); const ingestState=(S.server.runtime?.ingest?.runtime?.state||S.server.runtime?.ingest?.source?.state||'--'); const runtimeAge=Number(S.server.runtime?.engine?.runtimeStateDurationSeconds); - setText('flow-top-applied',isFinite(Number(S.server.runtime?.engine?.appliedFrequencyMHz))?`${Number(S.server.runtime.engine.appliedFrequencyMHz).toFixed(1)} MHz`:(isFinite(Number(S.server.measurements?.appliedFrequencyMHz))?`${Number(S.server.measurements.appliedFrequencyMHz).toFixed(1)} MHz`:'--'));setText('flow-top-target',isFinite(Number(S.server.config?.fm?.frequencyMHz))?`${Number(S.server.config.fm.frequencyMHz).toFixed(1)} MHz`:'--');setText('flow-top-source',sourceSummary||'--');setText('flow-top-alert',issue?issue.text:(S.server.measurements?.runtimeAlert||'None'));setText('flow-bottom-queue',String(S.server.runtime?.engine?.queue?.health||S.server.measurements?.queue?.health||'--').toUpperCase());setText('flow-bottom-ingest',String(ingestState||'--').toUpperCase());setText('flow-bottom-age',isFinite(runtimeAge)?fmtTime(runtimeAge):'--');setText('flow-bottom-update',ageStr(Math.max(S.server.lastConfigAt||0,S.server.lastRuntimeAt||0,S.server.lastMeasurementsAt||0)));renderFlowPopover(); + setText('flow-top-applied',isFinite(Number(S.server.runtime?.engine?.appliedFrequencyMHz))?`${Number(S.server.runtime.engine.appliedFrequencyMHz).toFixed(1)} MHz`:(isFinite(Number(S.server.measurements?.appliedFrequencyMHz))?`${Number(S.server.measurements.appliedFrequencyMHz).toFixed(1)} MHz`:'--'));setText('flow-top-target',isFinite(Number(S.server.config?.fm?.frequencyMHz))?`${Number(S.server.config.fm.frequencyMHz).toFixed(1)} MHz`:'--');setText('flow-top-source',sourceSummary||'--');setText('flow-top-alert',issue?issue.text:(S.server.measurements?.runtimeAlert||'None'));setText('flow-bottom-queue',String(S.server.runtime?.engine?.queue?.health||S.server.measurements?.queue?.health||'--').toUpperCase());setText('flow-bottom-ingest',String(ingestState||'--').toUpperCase());setText('flow-bottom-age',isFinite(runtimeAge)?fmtTime(runtimeAge):'--');setText('flow-bottom-update',ageStr(Math.max(S.server.lastConfigAt||0,S.server.lastRuntimeAt||0,S.server.lastMeasurementsAt||0)));if(!S.flowSelected||!flowPopoverHasActiveEditor())renderFlowPopover(); } function showFlowTooltip(key,anchor){const tip=$('flow-tooltip');if(!tip||S.flowSelected===key)return;const d=flowNodeData()[key];if(!d)return;tip.innerHTML=`