Wideband autonomous SDR analysis engine forked from sdr-visual-suite
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2112 lines
69KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "sort"
  15. "sync"
  16. "time"
  17. "sdr-wideband-suite/internal/classifier"
  18. "sdr-wideband-suite/internal/demod"
  19. "sdr-wideband-suite/internal/detector"
  20. "sdr-wideband-suite/internal/dsp"
  21. "sdr-wideband-suite/internal/logging"
  22. "sdr-wideband-suite/internal/telemetry"
  23. )
// ---------------------------------------------------------------------------
// streamSession — one open demod session for one signal
// ---------------------------------------------------------------------------

// streamSession holds everything the streamer keeps per signal: the latest
// signal metadata, optional WAV recording handles, DSP state that is carried
// from one snippet to the next, and the attached live-listen subscribers.
type streamSession struct {
	// Identity and signal metadata. processFeed refreshes centerHz/bwHz on
	// every feed and keeps the highest snrDb/peakDb seen so far.
	sessionID string
	signalID int64
	centerHz float64
	bwHz float64
	snrDb float64
	peakDb float64
	class *classifier.Classification
	startTime time.Time // compared against policy.MaxDuration for segment rotation
	lastFeed time.Time // time of the most recent feed; drives the close grace period
	// playbackMode and stereoState are reported via RuntimeSignalInfo.
	playbackMode string
	stereoState string
	lastAudioTs time.Time // last time audio was produced while subscribers were attached (gap logging)
	// Debug dump window; armed when a listener attaches and audioDumpEnabled is set.
	debugDumpStart time.Time
	debugDumpUntil time.Time
	debugDumpBase string
	demodDump []float32
	finalDump []float32
	// Tail samples of the previous audio frame, used by the boundary click
	// detector in processFeed.
	lastAudioL float32
	lastAudioR float32
	prevAudioL float64 // second-to-last L sample for boundary transient detection
	lastAudioSet bool
	lastDecIQ complex64
	prevDecIQ complex64
	lastDecIQSet bool
	lastExtractIQ complex64
	prevExtractIQ complex64
	lastExtractIQSet bool
	// FM discriminator cross-block bridging: carry the last IQ sample so the
	// discriminator can compute the phase step across block boundaries.
	lastDiscrimIQ complex64
	lastDiscrimIQSet bool
	lastDemodL float32
	prevDemodL float64
	lastDemodSet bool
	snippetSeq uint64
	// listenOnly sessions have no WAV file and no disk I/O.
	// They exist solely to feed audio to live-listen subscribers.
	listenOnly bool
	// Recording state (nil/zero for listen-only sessions)
	dir string
	wavFile *os.File
	wavBuf *bufio.Writer
	wavSamples int64 // incremented by bytes-written/2 after each successful WAV write
	segmentIdx int // incremented each time the session is rotated by MaxDuration
	sampleRate int // actual output audio sample rate (always streamAudioRate)
	channels int
	demodName string
	// --- Persistent DSP state for click-free streaming ---
	// Overlap-save: tail of previous extracted IQ snippet.
	// Currently unused for live demod after removing the extra discriminator
	// overlap prepend, but kept in DSP snapshot state for compatibility.
	overlapIQ []complex64
	// De-emphasis IIR state (persists across frames)
	deemphL float64
	deemphR float64
	// Stereo lock state for live WFM streaming
	stereoEnabled bool
	stereoOnCount int
	stereoOffCount int
	// Pilot-locked stereo PLL state (19kHz pilot)
	pilotPhase float64
	pilotFreq float64
	pilotAlpha float64
	pilotBeta float64
	pilotErrAvg float64
	pilotI float64
	pilotQ float64
	pilotLPAlpha float64
	// Polyphase resampler (replaces integer-decimate hack)
	monoResampler *dsp.Resampler
	monoResamplerRate int
	stereoResampler *dsp.StereoResampler
	stereoResamplerRate int
	// AQ-4: Stateful FIR filters for click-free stereo decode
	stereoFilterRate int
	stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
	stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
	stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
	stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
	stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
	pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
	pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
	// WFM 15kHz audio LPF — removes pilot (19kHz), L-R subcarrier (23-53kHz),
	// and RDS (57kHz) from the FM discriminator output before resampling.
	// Without this, the pilot leaks into the audio as a 19kHz tone (+55dB above
	// noise floor) and L-R subcarrier energy causes audible click-like artifacts.
	wfmAudioLPF *dsp.StatefulFIRReal
	wfmAudioLPFRate int
	// Stateful pre-demod anti-alias FIR (eliminates cold-start transients
	// and avoids per-frame FIR recomputation)
	preDemodFIR *dsp.StatefulFIRComplex
	preDemodDecimator *dsp.StatefulDecimatingFIRComplex
	preDemodDecim int // cached decimation factor
	preDemodRate int // cached snipRate this FIR was built for
	preDemodCutoff float64 // cached cutoff
	preDemodDecimPhase int // retained for backward compatibility in snapshots/debug
	// AQ-2: De-emphasis config (µs, 0 = disabled)
	deemphasisUs float64
	// Scratch buffers — reused across frames to avoid GC pressure.
	// Grown as needed, never shrunk.
	scratchIQ []complex64 // for pre-demod FIR output + decimate input
	scratchAudio []float32 // for stereo decode intermediates
	scratchPCM []byte // for PCM encoding
	// live-listen subscribers
	audioSubs []audioSub
}
// audioSub is one live-listen subscriber attached to a session.
type audioSub struct {
	id int64 // subscriber ID allocated from Streamer.nextSub in SubscribeAudio
	ch chan []byte // delivery channel; closed by the streamer when the subscription ends
}
// RuntimeSignalInfo is a snapshot of a session's current playback
// parameters, as returned by Streamer.RuntimeInfoBySignalID.
type RuntimeSignalInfo struct {
	DemodName string
	PlaybackMode string
	StereoState string
	Channels int
	SampleRate int
}
// AudioInfo describes the audio format of a live-listen subscription.
// Sent to the WebSocket client as the first message. It is also re-sent
// (JSON-encoded, prefixed with a 0x00 tag byte) when a pending listener is
// attached to a session.
type AudioInfo struct {
	SampleRate int `json:"sample_rate"`
	Channels int `json:"channels"`
	Format string `json:"format"` // always "s16le"
	DemodName string `json:"demod"`
	PlaybackMode string `json:"playback_mode,omitempty"`
	StereoState string `json:"stereo_state,omitempty"`
}
// Fixed audio output parameters of the streamer.
const (
	// streamAudioRate is the output audio sample rate of every session.
	streamAudioRate = 48000
	resamplerTaps = 32 // taps per polyphase arm — good quality
)
  159. var debugDumpDelay = func() time.Duration {
  160. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DELAY_SECONDS"))
  161. if raw == "" {
  162. return 5 * time.Second
  163. }
  164. v, err := strconv.Atoi(raw)
  165. if err != nil || v < 0 {
  166. return 5 * time.Second
  167. }
  168. return time.Duration(v) * time.Second
  169. }()
  170. var debugDumpDuration = func() time.Duration {
  171. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DURATION_SECONDS"))
  172. if raw == "" {
  173. return 15 * time.Second
  174. }
  175. v, err := strconv.Atoi(raw)
  176. if err != nil || v <= 0 {
  177. return 15 * time.Second
  178. }
  179. return time.Duration(v) * time.Second
  180. }()
  181. var audioDumpEnabled = func() bool {
  182. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_AUDIO_DUMP_ENABLED"))
  183. if raw == "" {
  184. return false
  185. }
  186. v, err := strconv.ParseBool(raw)
  187. if err != nil {
  188. return false
  189. }
  190. return v
  191. }()
  192. var decHeadTrimSamples = func() int {
  193. raw := strings.TrimSpace(os.Getenv("SDR_DEC_HEAD_TRIM"))
  194. if raw == "" {
  195. return 0
  196. }
  197. v, err := strconv.Atoi(raw)
  198. if err != nil || v < 0 {
  199. return 0
  200. }
  201. return v
  202. }()
// ---------------------------------------------------------------------------
// Streamer — manages all active streaming sessions
// ---------------------------------------------------------------------------

// streamFeedItem pairs one detected signal with its extracted IQ snippet.
type streamFeedItem struct {
	signal detector.Signal
	snippet []complex64 // IQ samples; items with an empty snippet are skipped by processFeed
	snipRate int // sample rate of snippet; items with snipRate <= 0 are skipped by processFeed
}
// streamFeedMsg is one batch of feed items queued on Streamer.feedCh for the
// worker goroutine.
type streamFeedMsg struct {
	traceID uint64 // caller-supplied ID included in trace logs and telemetry events
	items []streamFeedItem
	enqueuedAt time.Time // set in FeedSnippets; used to measure queueing delay
}
// Streamer owns all streaming sessions, keyed by signal ID, together with
// the live-listen subscribers still waiting for a matching signal. Feeds are
// queued on feedCh and processed by a single worker goroutine.
type Streamer struct {
	mu sync.Mutex // held by the methods below while they touch sessions, policy and pendingListens
	sessions map[int64]*streamSession
	policy Policy
	centerHz float64
	nextSub int64 // last subscriber ID handed out by SubscribeAudio
	feedCh chan streamFeedMsg // created with capacity 2; FeedSnippets drops a batch when it is full
	done chan struct{} // closed by worker once feedCh has been closed and drained
	droppedFeed uint64 // number of feed batches dropped because feedCh was full
	droppedPCM uint64 // NOTE(review): not updated in the visible code — presumably maintained by fanoutPCM
	lastFeedTS time.Time // previous FeedSnippets call, for feed-gap warnings
	lastProcTS time.Time // previous processFeed call, for process-gap warnings
	// pendingListens are subscribers waiting for a matching session.
	pendingListens map[int64]*pendingListen
	telemetry *telemetry.Collector // optional; every use is guarded by a nil check
}
// pendingListen is a live-listen request that has not yet been attached to a
// session. It is matched against signals whose center frequency lies within
// 200 kHz of freq.
type pendingListen struct {
	freq float64 // requested center frequency
	bw float64 // requested bandwidth
	mode string // requested demod mode; passed through normalizeRequestedMode before comparison
	ch chan []byte // the subscriber's delivery channel
}
  238. func newStreamer(policy Policy, centerHz float64, coll *telemetry.Collector) *Streamer {
  239. st := &Streamer{
  240. sessions: make(map[int64]*streamSession),
  241. policy: policy,
  242. centerHz: centerHz,
  243. feedCh: make(chan streamFeedMsg, 2),
  244. done: make(chan struct{}),
  245. pendingListens: make(map[int64]*pendingListen),
  246. telemetry: coll,
  247. }
  248. go st.worker()
  249. return st
  250. }
  251. func (st *Streamer) worker() {
  252. for msg := range st.feedCh {
  253. st.processFeed(msg)
  254. }
  255. close(st.done)
  256. }
  257. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  258. st.mu.Lock()
  259. defer st.mu.Unlock()
  260. wasEnabled := st.policy.Enabled
  261. st.policy = policy
  262. st.centerHz = centerHz
  263. // If recording was just disabled, close recording sessions
  264. // but keep listen-only sessions alive.
  265. if wasEnabled && !policy.Enabled {
  266. for id, sess := range st.sessions {
  267. if sess.listenOnly {
  268. continue
  269. }
  270. if len(sess.audioSubs) > 0 {
  271. // Convert to listen-only: close WAV but keep session
  272. convertToListenOnly(sess)
  273. } else {
  274. closeSession(sess, &st.policy)
  275. delete(st.sessions, id)
  276. }
  277. }
  278. }
  279. }
  280. // HasListeners returns true if any sessions have audio subscribers
  281. // or there are pending listen requests. Used by the DSP loop to
  282. // decide whether to feed snippets even when recording is disabled.
  283. func (st *Streamer) HasListeners() bool {
  284. st.mu.Lock()
  285. defer st.mu.Unlock()
  286. return st.hasListenersLocked()
  287. }
  288. func (st *Streamer) hasListenersLocked() bool {
  289. if len(st.pendingListens) > 0 {
  290. return true
  291. }
  292. for _, sess := range st.sessions {
  293. if len(sess.audioSubs) > 0 {
  294. return true
  295. }
  296. }
  297. return false
  298. }
  299. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  300. // Feeds are accepted if:
  301. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  302. // - Any live-listen subscribers exist (listen-only mode)
  303. //
  304. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  305. // data, so items can be passed directly without another copy.
  306. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  307. st.mu.Lock()
  308. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  309. hasListeners := st.hasListenersLocked()
  310. pending := len(st.pendingListens)
  311. debugLiveAudio := st.policy.DebugLiveAudio
  312. now := time.Now()
  313. if !st.lastFeedTS.IsZero() {
  314. gap := now.Sub(st.lastFeedTS)
  315. if gap > 150*time.Millisecond {
  316. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  317. }
  318. }
  319. st.lastFeedTS = now
  320. st.mu.Unlock()
  321. if debugLiveAudio {
  322. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  323. }
  324. if (!recEnabled && !hasListeners) || len(items) == 0 {
  325. return
  326. }
  327. if st.telemetry != nil {
  328. st.telemetry.SetGauge("streamer.feed.queue_len", float64(len(st.feedCh)), nil)
  329. st.telemetry.SetGauge("streamer.pending_listeners", float64(pending), nil)
  330. st.telemetry.Observe("streamer.feed.batch_size", float64(len(items)), nil)
  331. }
  332. select {
  333. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items, enqueuedAt: time.Now()}:
  334. default:
  335. st.droppedFeed++
  336. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  337. if st.telemetry != nil {
  338. st.telemetry.IncCounter("streamer.feed.drop", 1, nil)
  339. st.telemetry.Event("stream_feed_drop", "warn", "feed queue full", nil, map[string]any{
  340. "trace_id": traceID,
  341. "queue_len": len(st.feedCh),
  342. })
  343. }
  344. }
  345. }
  346. // processFeed runs in the worker goroutine.
  347. func (st *Streamer) processFeed(msg streamFeedMsg) {
  348. procStart := time.Now()
  349. lockStart := time.Now()
  350. st.mu.Lock()
  351. lockWait := time.Since(lockStart)
  352. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  353. hasListeners := st.hasListenersLocked()
  354. now := time.Now()
  355. if !st.lastProcTS.IsZero() {
  356. gap := now.Sub(st.lastProcTS)
  357. if gap > 150*time.Millisecond {
  358. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  359. if st.telemetry != nil {
  360. st.telemetry.IncCounter("streamer.process.gap.count", 1, nil)
  361. st.telemetry.Observe("streamer.process.gap_ms", float64(gap.Milliseconds()), nil)
  362. }
  363. }
  364. }
  365. st.lastProcTS = now
  366. defer st.mu.Unlock()
  367. defer func() {
  368. if st.telemetry != nil {
  369. st.telemetry.Observe("streamer.process.total_ms", float64(time.Since(procStart).Microseconds())/1000.0, nil)
  370. st.telemetry.Observe("streamer.lock_wait_ms", float64(lockWait.Microseconds())/1000.0, telemetry.TagsFromPairs("lock", "process"))
  371. }
  372. }()
  373. if st.telemetry != nil {
  374. st.telemetry.Observe("streamer.feed.enqueue_delay_ms", float64(now.Sub(msg.enqueuedAt).Microseconds())/1000.0, nil)
  375. st.telemetry.SetGauge("streamer.sessions.active", float64(len(st.sessions)), nil)
  376. }
  377. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  378. if !recEnabled && !hasListeners {
  379. return
  380. }
  381. seen := make(map[int64]bool, len(msg.items))
  382. for i := range msg.items {
  383. item := &msg.items[i]
  384. sig := &item.signal
  385. seen[sig.ID] = true
  386. if sig.ID == 0 || sig.Class == nil {
  387. continue
  388. }
  389. if len(item.snippet) == 0 || item.snipRate <= 0 {
  390. continue
  391. }
  392. // Decide whether this signal needs a session
  393. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  394. needsListen := st.signalHasListenerLocked(sig)
  395. className := "<nil>"
  396. demodName := ""
  397. if sig.Class != nil {
  398. className = string(sig.Class.ModType)
  399. demodName, _ = resolveDemod(sig)
  400. }
  401. if st.policy.DebugLiveAudio {
  402. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  403. }
  404. if !needsRecording && !needsListen {
  405. continue
  406. }
  407. sess, exists := st.sessions[sig.ID]
  408. requestedMode := ""
  409. for _, pl := range st.pendingListens {
  410. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  411. if m := normalizeRequestedMode(pl.mode); m != "" {
  412. requestedMode = m
  413. break
  414. }
  415. }
  416. }
  417. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  418. for _, sub := range sess.audioSubs {
  419. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  420. }
  421. delete(st.sessions, sig.ID)
  422. sess = nil
  423. exists = false
  424. }
  425. if !exists {
  426. if needsRecording {
  427. s, err := st.openRecordingSession(sig, now)
  428. if err != nil {
  429. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  430. sig.ID, sig.CenterHz/1e6, err)
  431. if st.telemetry != nil {
  432. st.telemetry.IncCounter("streamer.session.open_error", 1, telemetry.TagsFromPairs("kind", "recording"))
  433. }
  434. continue
  435. }
  436. st.sessions[sig.ID] = s
  437. sess = s
  438. } else {
  439. s := st.openListenSession(sig, now)
  440. st.sessions[sig.ID] = s
  441. sess = s
  442. }
  443. // Attach any pending listeners
  444. st.attachPendingListeners(sess)
  445. if st.telemetry != nil {
  446. st.telemetry.IncCounter("streamer.session.open", 1, telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", fmt.Sprintf("%d", sig.ID)))
  447. st.telemetry.Event("session_open", "info", "stream session opened", telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{
  448. "listen_only": sess.listenOnly,
  449. "demod": sess.demodName,
  450. })
  451. }
  452. }
  453. // Update metadata
  454. sess.lastFeed = now
  455. sess.centerHz = sig.CenterHz
  456. sess.bwHz = sig.BWHz
  457. if sig.SNRDb > sess.snrDb {
  458. sess.snrDb = sig.SNRDb
  459. }
  460. if sig.PeakDb > sess.peakDb {
  461. sess.peakDb = sig.PeakDb
  462. }
  463. if sig.Class != nil {
  464. sess.class = sig.Class
  465. }
  466. // Demod with persistent state
  467. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  468. audioStart := time.Now()
  469. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate, st.telemetry)
  470. if st.telemetry != nil {
  471. st.telemetry.Observe("streamer.process_snippet_ms", float64(time.Since(audioStart).Microseconds())/1000.0, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  472. }
  473. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  474. if len(audio) == 0 {
  475. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  476. if st.telemetry != nil {
  477. st.telemetry.IncCounter("streamer.audio.empty", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  478. }
  479. }
  480. if len(audio) > 0 {
  481. if sess.wavSamples == 0 && audioRate > 0 {
  482. sess.sampleRate = audioRate
  483. }
  484. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  485. pcmLen := len(audio) * 2
  486. pcm := sess.growPCM(pcmLen)
  487. for k, s := range audio {
  488. v := int16(clip(s * 32767))
  489. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  490. }
  491. if !sess.listenOnly && sess.wavBuf != nil {
  492. n, err := sess.wavBuf.Write(pcm)
  493. if err != nil {
  494. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  495. } else {
  496. sess.wavSamples += int64(n / 2)
  497. }
  498. }
  499. // Gap logging for live-audio sessions + transient click detector
  500. if len(sess.audioSubs) > 0 {
  501. if !sess.lastAudioTs.IsZero() {
  502. gap := time.Since(sess.lastAudioTs)
  503. if gap > 150*time.Millisecond {
  504. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  505. if st.telemetry != nil {
  506. st.telemetry.IncCounter("streamer.audio.gap.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  507. st.telemetry.Observe("streamer.audio.gap_ms", float64(gap.Milliseconds()), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  508. }
  509. }
  510. }
  511. // Transient click detector: finds short impulses (1-3 samples)
  512. // that deviate sharply from the local signal trend.
  513. // A click looks like: ...smooth... SPIKE ...smooth...
  514. // Normal FM audio has large deltas too, but they follow
  515. // a continuous curve. A click has high |d2/dt2| (acceleration).
  516. //
  517. // Method: second-derivative detector. For each sample triplet
  518. // (a, b, c), compute |2b - a - c| which is the discrete
  519. // second derivative magnitude. High values = transient spike.
  520. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  521. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  522. stride := sess.channels
  523. if stride < 1 {
  524. stride = 1
  525. }
  526. nFrames := len(audio) / stride
  527. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  528. if sess.lastAudioSet && nFrames >= 1 {
  529. // second derivative across boundary: |2*last - prevLast - first|
  530. first := float64(audio[0])
  531. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  532. if d2 > 0.15 {
  533. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  534. if st.telemetry != nil {
  535. st.telemetry.IncCounter("audio.boundary_click.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  536. st.telemetry.Observe("audio.boundary_click.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  537. }
  538. }
  539. }
  540. // Intra-frame transient scan (L channel only for performance)
  541. nClicks := 0
  542. maxD2 := float64(0)
  543. maxD2Pos := 0
  544. for k := 1; k < nFrames-1; k++ {
  545. a := float64(audio[(k-1)*stride])
  546. b := float64(audio[k*stride])
  547. c := float64(audio[(k+1)*stride])
  548. d2 := math.Abs(2*b - a - c)
  549. if d2 > maxD2 {
  550. maxD2 = d2
  551. maxD2Pos = k
  552. }
  553. if d2 > 0.15 {
  554. nClicks++
  555. }
  556. }
  557. if nClicks > 0 {
  558. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  559. if st.telemetry != nil {
  560. st.telemetry.IncCounter("audio.intra_click.count", float64(nClicks), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  561. st.telemetry.Observe("audio.intra_click.max_d2", maxD2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  562. }
  563. }
  564. // Store last two samples for next frame's boundary check
  565. if nFrames >= 2 {
  566. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  567. sess.lastAudioL = audio[(nFrames-1)*stride]
  568. if stride > 1 {
  569. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  570. }
  571. } else if nFrames == 1 {
  572. sess.prevAudioL = float64(sess.lastAudioL)
  573. sess.lastAudioL = audio[0]
  574. if stride > 1 && len(audio) >= 2 {
  575. sess.lastAudioR = audio[1]
  576. }
  577. }
  578. sess.lastAudioSet = true
  579. }
  580. sess.lastAudioTs = time.Now()
  581. }
  582. st.fanoutPCM(sess, pcm, pcmLen)
  583. }
  584. // Segment split (recording sessions only)
  585. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  586. segIdx := sess.segmentIdx + 1
  587. oldSubs := sess.audioSubs
  588. oldState := sess.captureDSPState()
  589. sess.audioSubs = nil
  590. closeSession(sess, &st.policy)
  591. s, err := st.openRecordingSession(sig, now)
  592. if err != nil {
  593. delete(st.sessions, sig.ID)
  594. continue
  595. }
  596. s.segmentIdx = segIdx
  597. s.audioSubs = oldSubs
  598. s.restoreDSPState(oldState)
  599. st.sessions[sig.ID] = s
  600. if st.telemetry != nil {
  601. st.telemetry.IncCounter("streamer.session.reopen", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID)))
  602. st.telemetry.Event("session_reopen", "info", "stream session rotated by max duration", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{
  603. "old_session": sess.sessionID,
  604. "new_session": s.sessionID,
  605. })
  606. }
  607. }
  608. }
  609. // Close sessions for disappeared signals (with grace period)
  610. for id, sess := range st.sessions {
  611. if seen[id] {
  612. continue
  613. }
  614. gracePeriod := 3 * time.Second
  615. if sess.listenOnly {
  616. gracePeriod = 5 * time.Second
  617. }
  618. if now.Sub(sess.lastFeed) > gracePeriod {
  619. for _, sub := range sess.audioSubs {
  620. close(sub.ch)
  621. }
  622. sess.audioSubs = nil
  623. if !sess.listenOnly {
  624. closeSession(sess, &st.policy)
  625. }
  626. if st.telemetry != nil {
  627. st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID))
  628. st.telemetry.Event("session_close", "info", "stream session closed", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID), map[string]any{
  629. "reason": "signal_missing",
  630. "listen_only": sess.listenOnly,
  631. })
  632. }
  633. delete(st.sessions, id)
  634. }
  635. }
  636. }
  637. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  638. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  639. if st.policy.DebugLiveAudio {
  640. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  641. }
  642. return true
  643. }
  644. for subID, pl := range st.pendingListens {
  645. delta := math.Abs(sig.CenterHz - pl.freq)
  646. if delta < 200000 {
  647. if st.policy.DebugLiveAudio {
  648. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  649. }
  650. return true
  651. }
  652. }
  653. return false
  654. }
  655. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  656. for subID, pl := range st.pendingListens {
  657. requestedMode := normalizeRequestedMode(pl.mode)
  658. if requestedMode != "" && sess.demodName != requestedMode {
  659. continue
  660. }
  661. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  662. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  663. delete(st.pendingListens, subID)
  664. // Send updated audio_info now that we know the real session params.
  665. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  666. infoJSON, _ := json.Marshal(sess.audioInfo())
  667. tagged := make([]byte, 1+len(infoJSON))
  668. tagged[0] = 0x00 // tag: audio_info
  669. copy(tagged[1:], infoJSON)
  670. select {
  671. case pl.ch <- tagged:
  672. default:
  673. }
  674. if audioDumpEnabled {
  675. now := time.Now()
  676. sess.debugDumpStart = now.Add(debugDumpDelay)
  677. sess.debugDumpUntil = sess.debugDumpStart.Add(debugDumpDuration)
  678. sess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", sess.signalID, now.Format("20060102-150405")))
  679. sess.demodDump = nil
  680. sess.finalDump = nil
  681. }
  682. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  683. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  684. if audioDumpEnabled {
  685. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", sess.signalID, sess.debugDumpStart.Format(time.RFC3339), sess.debugDumpUntil.Format(time.RFC3339))
  686. }
  687. }
  688. }
  689. }
  690. // CloseAll finalises all sessions and stops the worker goroutine.
  691. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  692. st.mu.Lock()
  693. defer st.mu.Unlock()
  694. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  695. for _, sess := range st.sessions {
  696. out[sess.signalID] = RuntimeSignalInfo{
  697. DemodName: sess.demodName,
  698. PlaybackMode: sess.playbackMode,
  699. StereoState: sess.stereoState,
  700. Channels: sess.channels,
  701. SampleRate: sess.sampleRate,
  702. }
  703. }
  704. return out
  705. }
  706. func (st *Streamer) CloseAll() {
  707. close(st.feedCh)
  708. <-st.done
  709. st.mu.Lock()
  710. defer st.mu.Unlock()
  711. for id, sess := range st.sessions {
  712. for _, sub := range sess.audioSubs {
  713. close(sub.ch)
  714. }
  715. sess.audioSubs = nil
  716. if !sess.listenOnly {
  717. closeSession(sess, &st.policy)
  718. }
  719. if st.telemetry != nil {
  720. st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID))
  721. }
  722. delete(st.sessions, id)
  723. }
  724. for _, pl := range st.pendingListens {
  725. close(pl.ch)
  726. }
  727. st.pendingListens = nil
  728. if st.telemetry != nil {
  729. st.telemetry.Event("streamer_close_all", "info", "all stream sessions closed", nil, nil)
  730. }
  731. }
  732. // ActiveSessions returns the number of open streaming sessions.
  733. func (st *Streamer) ActiveSessions() int {
  734. st.mu.Lock()
  735. defer st.mu.Unlock()
  736. return len(st.sessions)
  737. }
  738. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  739. //
  740. // LL-2: Returns AudioInfo with correct channels and sample rate.
  741. // LL-3: Returns error only on hard failures (nil streamer etc).
  742. //
  743. // If a matching session exists, attaches immediately. Otherwise, the
  744. // subscriber is held as "pending" and will be attached when a matching
  745. // signal appears in the next DSP frame.
  746. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  747. ch := make(chan []byte, 64)
  748. st.mu.Lock()
  749. defer st.mu.Unlock()
  750. st.nextSub++
  751. subID := st.nextSub
  752. requestedMode := normalizeRequestedMode(mode)
  753. // Try to find a matching session
  754. var bestSess *streamSession
  755. bestDist := math.MaxFloat64
  756. for _, sess := range st.sessions {
  757. if requestedMode != "" && sess.demodName != requestedMode {
  758. continue
  759. }
  760. d := math.Abs(sess.centerHz - freq)
  761. if d < bestDist {
  762. bestDist = d
  763. bestSess = sess
  764. }
  765. }
  766. if bestSess != nil && bestDist < 200000 {
  767. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  768. if audioDumpEnabled {
  769. now := time.Now()
  770. bestSess.debugDumpStart = now.Add(debugDumpDelay)
  771. bestSess.debugDumpUntil = bestSess.debugDumpStart.Add(debugDumpDuration)
  772. bestSess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", bestSess.signalID, now.Format("20060102-150405")))
  773. bestSess.demodDump = nil
  774. bestSess.finalDump = nil
  775. }
  776. info := bestSess.audioInfo()
  777. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  778. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  779. if audioDumpEnabled {
  780. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", bestSess.signalID, bestSess.debugDumpStart.Format(time.RFC3339), bestSess.debugDumpUntil.Format(time.RFC3339))
  781. }
  782. if st.telemetry != nil {
  783. st.telemetry.IncCounter("streamer.listener.attach", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", bestSess.signalID), "session_id", bestSess.sessionID))
  784. }
  785. return subID, ch, info, nil
  786. }
  787. // No matching session yet — add as pending listener
  788. st.pendingListens[subID] = &pendingListen{
  789. freq: freq,
  790. bw: bw,
  791. mode: mode,
  792. ch: ch,
  793. }
  794. info := defaultAudioInfoForMode(mode)
  795. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  796. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  797. if st.telemetry != nil {
  798. st.telemetry.IncCounter("streamer.listener.pending", 1, nil)
  799. st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil)
  800. }
  801. return subID, ch, info, nil
  802. }
  803. // UnsubscribeAudio removes a live-listen subscriber.
  804. func (st *Streamer) UnsubscribeAudio(subID int64) {
  805. st.mu.Lock()
  806. defer st.mu.Unlock()
  807. if pl, ok := st.pendingListens[subID]; ok {
  808. close(pl.ch)
  809. delete(st.pendingListens, subID)
  810. if st.telemetry != nil {
  811. st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "pending"))
  812. st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil)
  813. }
  814. return
  815. }
  816. for _, sess := range st.sessions {
  817. for i, sub := range sess.audioSubs {
  818. if sub.id == subID {
  819. close(sub.ch)
  820. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  821. if st.telemetry != nil {
  822. st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "active", "session_id", sess.sessionID))
  823. }
  824. return
  825. }
  826. }
  827. }
  828. }
  829. // ---------------------------------------------------------------------------
  830. // Session: stateful extraction + demod
  831. // ---------------------------------------------------------------------------
  // processSnippet (defined further below) takes a pre-extracted IQ snippet and
  // demodulates it with persistent state. Uses stateful FIR + polyphase
  // resampler for exact 48kHz output with zero transient artifacts.

  // iqHeadProbeStats summarises the leading samples of an IQ block as measured
  // by probeIQHeadStats.
  type iqHeadProbeStats struct {
  	// Mean and minimum sample magnitude over the probed samples.
  	meanMag, minMag float64
  	// Largest and 95th-percentile absolute phase step (radians) between
  	// consecutive probed samples.
  	maxStep, p95Step float64
  	// Number of probed samples whose magnitude is below 0.02.
  	lowMag int
  }

  // probeIQHeadStats measures magnitude and phase-step statistics over the
  // first probeLen samples of iq (or all of iq when it is shorter).
  //
  // A non-positive probeLen or an empty iq yields the zero value. The phase
  // step between neighbours is |atan2(cross, dot)| of the two samples; the
  // 95th percentile is picked by rounding 0.95*(count-1) to the nearest index
  // of the sorted steps.
  func probeIQHeadStats(iq []complex64, probeLen int) iqHeadProbeStats {
  	if probeLen <= 0 || len(iq) == 0 {
  		return iqHeadProbeStats{}
  	}
  	head := iq
  	if probeLen < len(head) {
  		head = head[:probeLen]
  	}

  	out := iqHeadProbeStats{minMag: math.MaxFloat64}
  	phaseSteps := make([]float64, 0, len(head))
  	magTotal := 0.0
  	for k, cur := range head {
  		m := math.Hypot(float64(real(cur)), float64(imag(cur)))
  		magTotal += m
  		if m < out.minMag {
  			out.minMag = m
  		}
  		if m < 0.02 {
  			out.lowMag++
  		}
  		if k == 0 {
  			continue
  		}
  		prev := head[k-1]
  		cross := float64(real(prev))*float64(imag(cur)) - float64(imag(prev))*float64(real(cur))
  		dot := float64(real(prev))*float64(real(cur)) + float64(imag(prev))*float64(imag(cur))
  		s := math.Abs(math.Atan2(cross, dot))
  		phaseSteps = append(phaseSteps, s)
  		if s > out.maxStep {
  			out.maxStep = s
  		}
  	}
  	out.meanMag = magTotal / float64(len(head))

  	if count := len(phaseSteps); count > 0 {
  		// phaseSteps is local to this call, so it can be sorted in place.
  		sort.Float64s(phaseSteps)
  		rank := int(math.Round(0.95 * float64(count-1)))
  		switch {
  		case rank < 0:
  			rank = 0
  		case rank >= count:
  			rank = count - 1
  		}
  		out.p95Step = phaseSteps[rank]
  	}

  	// minMag keeps its sentinel only if no magnitude compared below it.
  	if out.minMag == math.MaxFloat64 {
  		out.minMag = 0
  	}
  	return out
  }
  891. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int, coll *telemetry.Collector) ([]float32, int) {
  892. if len(snippet) == 0 || snipRate <= 0 {
  893. return nil, 0
  894. }
  895. baseTags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)
  896. if coll != nil {
  897. coll.SetGauge("iq.stage.snippet.length", float64(len(snippet)), baseTags)
  898. stats := probeIQHeadStats(snippet, 64)
  899. coll.Observe("iq.snippet.head_mean_mag", stats.meanMag, baseTags)
  900. coll.Observe("iq.snippet.head_min_mag", stats.minMag, baseTags)
  901. coll.Observe("iq.snippet.head_max_step", stats.maxStep, baseTags)
  902. coll.Observe("iq.snippet.head_p95_step", stats.p95Step, baseTags)
  903. coll.SetGauge("iq.snippet.head_low_magnitude_count", float64(stats.lowMag), baseTags)
  904. if sess.lastExtractIQSet {
  905. prevMag := math.Hypot(float64(real(sess.lastExtractIQ)), float64(imag(sess.lastExtractIQ)))
  906. currMag := math.Hypot(float64(real(snippet[0])), float64(imag(snippet[0])))
  907. deltaMag := math.Abs(currMag - prevMag)
  908. num := float64(real(sess.lastExtractIQ))*float64(imag(snippet[0])) - float64(imag(sess.lastExtractIQ))*float64(real(snippet[0]))
  909. den := float64(real(sess.lastExtractIQ))*float64(real(snippet[0])) + float64(imag(sess.lastExtractIQ))*float64(imag(snippet[0]))
  910. deltaPhase := math.Abs(math.Atan2(num, den))
  911. d2 := float64(real(snippet[0]-sess.lastExtractIQ))*float64(real(snippet[0]-sess.lastExtractIQ)) + float64(imag(snippet[0]-sess.lastExtractIQ))*float64(imag(snippet[0]-sess.lastExtractIQ))
  912. coll.Observe("iq.extract.output.boundary.delta_mag", deltaMag, baseTags)
  913. coll.Observe("iq.extract.output.boundary.delta_phase", deltaPhase, baseTags)
  914. coll.Observe("iq.extract.output.boundary.d2", d2, baseTags)
  915. coll.Observe("iq.extract.output.boundary.discontinuity_score", deltaMag+deltaPhase, baseTags)
  916. }
  917. }
  918. if len(snippet) > 0 {
  919. sess.prevExtractIQ = sess.lastExtractIQ
  920. sess.lastExtractIQ = snippet[len(snippet)-1]
  921. sess.lastExtractIQSet = true
  922. }
  923. isWFMStereo := sess.demodName == "WFM_STEREO"
  924. isWFM := sess.demodName == "WFM" || isWFMStereo
  925. demodName := sess.demodName
  926. if isWFMStereo {
  927. demodName = "WFM"
  928. }
  929. d := demod.Get(demodName)
  930. if d == nil {
  931. d = demod.Get("NFM")
  932. }
  933. if d == nil {
  934. return nil, 0
  935. }
  936. // The extra 1-sample discriminator overlap prepend was removed after it was
  937. // shown to shift the downstream decimation phase and create heavy click
  938. // artifacts in steady-state streaming/recording. The upstream extraction path
  939. // and the stateful FIR/decimation stages already provide continuity.
  940. fullSnip := snippet
  941. overlapApplied := false
  942. prevTailValid := false
  943. if logging.EnabledCategory("prefir") && len(fullSnip) > 0 {
  944. probeN := 64
  945. if len(fullSnip) < probeN {
  946. probeN = len(fullSnip)
  947. }
  948. minPreMag := math.MaxFloat64
  949. minPreIdx := 0
  950. maxPreStep := 0.0
  951. maxPreStepIdx := 0
  952. for i := 0; i < probeN; i++ {
  953. v := fullSnip[i]
  954. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  955. if mag < minPreMag {
  956. minPreMag = mag
  957. minPreIdx = i
  958. }
  959. if i > 0 {
  960. p := fullSnip[i-1]
  961. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  962. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  963. step := math.Abs(math.Atan2(num, den))
  964. if step > maxPreStep {
  965. maxPreStep = step
  966. maxPreStepIdx = i - 1
  967. }
  968. }
  969. }
  970. logging.Debug("prefir", "pre_fir_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "snip_len", len(fullSnip))
  971. if minPreMag < 0.18 {
  972. logging.Warn("prefir", "pre_fir_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx)
  973. }
  974. if maxPreStep > 1.5 {
  975. logging.Warn("prefir", "pre_fir_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "min_mag", minPreMag, "min_idx", minPreIdx)
  976. }
  977. }
  978. // --- Stateful anti-alias FIR + decimation to demod rate ---
  979. demodRate := d.OutputSampleRate()
  980. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  981. if decim1 < 1 {
  982. decim1 = 1
  983. }
  984. // WFM override: force decim1=2 (256kHz) instead of round(512k/192k)=3 (170kHz).
  985. // At decim1=3, Nyquist is 85kHz which clips FM broadcast ±75kHz deviation.
  986. // At decim1=2, Nyquist is 128kHz → full FM deviation + stereo pilot + guard band.
  987. // Bonus: 256000→48000 resampler ratio is L=3/M=16 (96 taps, 1kB) instead of
  988. // the pathological L=24000/M=85333 (768k taps, 6MB) from 170666→48000.
  989. if isWFM && decim1 > 2 && snipRate/2 >= 200000 {
  990. decim1 = 2
  991. }
  992. actualDemodRate := snipRate / decim1
  993. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  994. var dec []complex64
  995. if decim1 > 1 {
  996. // FIR cutoff: for WFM, use 90kHz (above ±75kHz FM deviation + guard).
  997. // For NFM/other: use standard Nyquist*0.8 cutoff.
  998. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  999. if isWFM {
  1000. cutoff = 90000
  1001. }
  1002. // Lazy-init or reinit stateful FIR if parameters changed
  1003. if sess.preDemodDecimator == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff || sess.preDemodDecim != decim1 {
  1004. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  1005. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  1006. sess.preDemodDecimator = dsp.NewStatefulDecimatingFIRComplex(taps, decim1)
  1007. sess.preDemodRate = snipRate
  1008. sess.preDemodCutoff = cutoff
  1009. sess.preDemodDecim = decim1
  1010. sess.preDemodDecimPhase = 0
  1011. if coll != nil {
  1012. coll.IncCounter("dsp.pre_demod.init", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1013. coll.Event("prefir_reinit", "info", "pre-demod decimator reinitialized", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1014. "snip_rate": snipRate,
  1015. "cutoff_hz": cutoff,
  1016. "decim": decim1,
  1017. })
  1018. }
  1019. }
  1020. decimPhaseBefore := sess.preDemodDecimPhase
  1021. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  1022. dec = sess.preDemodDecimator.Process(fullSnip)
  1023. sess.preDemodDecimPhase = sess.preDemodDecimator.Phase()
  1024. if coll != nil {
  1025. coll.Observe("dsp.pre_demod.decimation_factor", float64(decim1), baseTags)
  1026. coll.SetGauge("iq.stage.pre_demod.length", float64(len(dec)), baseTags)
  1027. decStats := probeIQHeadStats(dec, 64)
  1028. coll.Observe("iq.pre_demod.head_mean_mag", decStats.meanMag, baseTags)
  1029. coll.Observe("iq.pre_demod.head_min_mag", decStats.minMag, baseTags)
  1030. coll.Observe("iq.pre_demod.head_max_step", decStats.maxStep, baseTags)
  1031. coll.Observe("iq.pre_demod.head_p95_step", decStats.p95Step, baseTags)
  1032. coll.SetGauge("iq.pre_demod.head_low_magnitude_count", float64(decStats.lowMag), baseTags)
  1033. }
  1034. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase)
  1035. } else {
  1036. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0)
  1037. dec = fullSnip
  1038. }
  1039. if decHeadTrimSamples > 0 && decHeadTrimSamples < len(dec) {
  1040. logging.Warn("boundary", "dec_head_trim_applied", "signal", sess.signalID, "trim", decHeadTrimSamples, "before_len", len(dec))
  1041. dec = dec[decHeadTrimSamples:]
  1042. if coll != nil {
  1043. coll.IncCounter("dsp.pre_demod.head_trim", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1044. }
  1045. }
  1046. if logging.EnabledCategory("boundary") && len(dec) > 0 {
  1047. first := dec[0]
  1048. if sess.lastDecIQSet {
  1049. d2Re := math.Abs(2*float64(real(sess.lastDecIQ)) - float64(real(sess.prevDecIQ)) - float64(real(first)))
  1050. d2Im := math.Abs(2*float64(imag(sess.lastDecIQ)) - float64(imag(sess.prevDecIQ)) - float64(imag(first)))
  1051. d2Mag := math.Hypot(d2Re, d2Im)
  1052. if d2Mag > 0.15 {
  1053. logging.Warn("boundary", "dec_iq_boundary", "signal", sess.signalID, "d2", d2Mag)
  1054. if coll != nil {
  1055. coll.IncCounter("iq.dec.boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1056. coll.Observe("iq.dec.boundary.d2", d2Mag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1057. }
  1058. }
  1059. }
  1060. headN := 16
  1061. if len(dec) < headN {
  1062. headN = len(dec)
  1063. }
  1064. tailN := 16
  1065. if len(dec) < tailN {
  1066. tailN = len(dec)
  1067. }
  1068. var headSum, tailSum, minMag, maxMag float64
  1069. minMag = math.MaxFloat64
  1070. for i, v := range dec {
  1071. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  1072. if mag < minMag {
  1073. minMag = mag
  1074. }
  1075. if mag > maxMag {
  1076. maxMag = mag
  1077. }
  1078. if i < headN {
  1079. headSum += mag
  1080. }
  1081. }
  1082. for i := len(dec) - tailN; i < len(dec); i++ {
  1083. if i >= 0 {
  1084. v := dec[i]
  1085. tailSum += math.Hypot(float64(real(v)), float64(imag(v)))
  1086. }
  1087. }
  1088. headAvg := 0.0
  1089. if headN > 0 {
  1090. headAvg = headSum / float64(headN)
  1091. }
  1092. tailAvg := 0.0
  1093. if tailN > 0 {
  1094. tailAvg = tailSum / float64(tailN)
  1095. }
  1096. logging.Debug("boundary", "dec_iq_meter", "signal", sess.signalID, "len", len(dec), "head_avg", headAvg, "tail_avg", tailAvg, "min_mag", minMag, "max_mag", maxMag)
  1097. if tailAvg > 0 {
  1098. ratio := headAvg / tailAvg
  1099. if ratio < 0.75 || ratio > 1.25 {
  1100. logging.Warn("boundary", "dec_iq_head_tail_skew", "signal", sess.signalID, "head_avg", headAvg, "tail_avg", tailAvg, "ratio", ratio)
  1101. }
  1102. if coll != nil {
  1103. coll.Observe("iq.dec.head_tail_ratio", ratio, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1104. }
  1105. }
  1106. probeN := 64
  1107. if len(dec) < probeN {
  1108. probeN = len(dec)
  1109. }
  1110. minHeadMag := math.MaxFloat64
  1111. minHeadIdx := 0
  1112. maxHeadStep := 0.0
  1113. maxHeadStepIdx := 0
  1114. for i := 0; i < probeN; i++ {
  1115. v := dec[i]
  1116. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  1117. if mag < minHeadMag {
  1118. minHeadMag = mag
  1119. minHeadIdx = i
  1120. }
  1121. if i > 0 {
  1122. p := dec[i-1]
  1123. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  1124. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  1125. step := math.Abs(math.Atan2(num, den))
  1126. if step > maxHeadStep {
  1127. maxHeadStep = step
  1128. maxHeadStepIdx = i - 1
  1129. }
  1130. }
  1131. }
  1132. logging.Debug("boundary", "dec_iq_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  1133. if minHeadMag < 0.18 {
  1134. logging.Warn("boundary", "dec_iq_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  1135. }
  1136. if maxHeadStep > 1.5 {
  1137. logging.Warn("boundary", "dec_iq_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx, "min_mag", minHeadMag, "min_idx", minHeadIdx)
  1138. }
  1139. if coll != nil {
  1140. coll.Observe("iq.dec.magnitude.min", minMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1141. coll.Observe("iq.dec.magnitude.max", maxMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1142. coll.Observe("iq.dec.phase_step.max", maxHeadStep, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1143. }
  1144. if len(dec) >= 2 {
  1145. sess.prevDecIQ = dec[len(dec)-2]
  1146. sess.lastDecIQ = dec[len(dec)-1]
  1147. } else {
  1148. sess.prevDecIQ = sess.lastDecIQ
  1149. sess.lastDecIQ = dec[0]
  1150. }
  1151. sess.lastDecIQSet = true
  1152. }
  1153. // --- FM/AM/etc Demod ---
  1154. // For FM demod (NFM/WFM): bridge the block boundary by prepending the
  1155. // previous block's last IQ sample. Without this, the discriminator loses
  1156. // the cross-boundary phase step (1 audio sample missing per block) and
  1157. // any phase discontinuity at the seam becomes an unsmoothed audio transient.
  1158. var audio []float32
  1159. isFMDemod := demodName == "NFM" || demodName == "WFM"
  1160. if isFMDemod && sess.lastDiscrimIQSet && len(dec) > 0 {
  1161. bridged := make([]complex64, len(dec)+1)
  1162. bridged[0] = sess.lastDiscrimIQ
  1163. copy(bridged[1:], dec)
  1164. audio = d.Demod(bridged, actualDemodRate)
  1165. // bridged produced len(dec) audio samples (= len(bridged)-1)
  1166. // which is exactly the correct count for the new data
  1167. } else {
  1168. audio = d.Demod(dec, actualDemodRate)
  1169. }
  1170. if len(dec) > 0 {
  1171. sess.lastDiscrimIQ = dec[len(dec)-1]
  1172. sess.lastDiscrimIQSet = true
  1173. }
  1174. if len(audio) == 0 {
  1175. return nil, 0
  1176. }
  1177. if coll != nil {
  1178. coll.SetGauge("audio.stage.demod.length", float64(len(audio)), baseTags)
  1179. probe := 64
  1180. if len(audio) < probe {
  1181. probe = len(audio)
  1182. }
  1183. if probe > 0 {
  1184. var headAbs, tailAbs float64
  1185. for i := 0; i < probe; i++ {
  1186. headAbs += math.Abs(float64(audio[i]))
  1187. tailAbs += math.Abs(float64(audio[len(audio)-probe+i]))
  1188. }
  1189. coll.Observe("audio.demod.head_mean_abs", headAbs/float64(probe), baseTags)
  1190. coll.Observe("audio.demod.tail_mean_abs", tailAbs/float64(probe), baseTags)
  1191. coll.Observe("audio.demod.edge_delta_abs", math.Abs(float64(audio[0])-float64(audio[len(audio)-1])), baseTags)
  1192. }
  1193. }
  1194. if logging.EnabledCategory("boundary") {
  1195. stride := d.Channels()
  1196. if stride < 1 {
  1197. stride = 1
  1198. }
  1199. nFrames := len(audio) / stride
  1200. if nFrames > 0 {
  1201. first := float64(audio[0])
  1202. if sess.lastDemodSet {
  1203. d2 := math.Abs(2*float64(sess.lastDemodL) - sess.prevDemodL - first)
  1204. if d2 > 0.15 {
  1205. logging.Warn("boundary", "demod_boundary", "signal", sess.signalID, "d2", d2)
  1206. if coll != nil {
  1207. coll.IncCounter("audio.demod_boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1208. coll.Observe("audio.demod_boundary.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1209. }
  1210. }
  1211. }
  1212. if nFrames >= 2 {
  1213. sess.prevDemodL = float64(audio[(nFrames-2)*stride])
  1214. sess.lastDemodL = audio[(nFrames-1)*stride]
  1215. } else {
  1216. sess.prevDemodL = float64(sess.lastDemodL)
  1217. sess.lastDemodL = audio[0]
  1218. }
  1219. sess.lastDemodSet = true
  1220. }
  1221. }
  1222. logging.Debug("boundary", "audio_path", "signal", sess.signalID, "demod", demodName, "actual_rate", actualDemodRate, "audio_len", len(audio), "channels", d.Channels(), "overlap_applied", overlapApplied, "prev_tail_valid", prevTailValid)
  1223. shouldDump := !sess.debugDumpStart.IsZero() && !sess.debugDumpUntil.IsZero()
  1224. if shouldDump {
  1225. now := time.Now()
  1226. shouldDump = !now.Before(sess.debugDumpStart) && now.Before(sess.debugDumpUntil)
  1227. }
  1228. if shouldDump {
  1229. sess.demodDump = append(sess.demodDump, audio...)
  1230. }
  1231. // --- Stateful stereo decode with conservative lock/hysteresis ---
  1232. channels := 1
  1233. if isWFMStereo {
  1234. sess.playbackMode = "WFM_STEREO"
  1235. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  1236. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  1237. if locked {
  1238. sess.stereoOnCount++
  1239. sess.stereoOffCount = 0
  1240. if sess.stereoOnCount >= 4 {
  1241. sess.stereoEnabled = true
  1242. }
  1243. } else {
  1244. sess.stereoOnCount = 0
  1245. sess.stereoOffCount++
  1246. if sess.stereoOffCount >= 10 {
  1247. sess.stereoEnabled = false
  1248. }
  1249. }
  1250. prevPlayback := sess.playbackMode
  1251. prevStereo := sess.stereoState
  1252. if sess.stereoEnabled && len(stereoAudio) > 0 {
  1253. sess.stereoState = "locked"
  1254. audio = stereoAudio
  1255. } else {
  1256. sess.stereoState = "mono-fallback"
  1257. // Apply 15kHz LPF before output: the raw discriminator contains
  1258. // the 19kHz pilot (+55dB), L-R subcarrier (23-53kHz), and RDS (57kHz).
  1259. // Without filtering, the pilot leaks into audio and subcarrier
  1260. // energy produces audible click-like artifacts.
  1261. audio = sess.wfmAudioFilter(audio, actualDemodRate)
  1262. dual := make([]float32, len(audio)*2)
  1263. for i, s := range audio {
  1264. dual[i*2] = s
  1265. dual[i*2+1] = s
  1266. }
  1267. audio = dual
  1268. }
  1269. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  1270. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  1271. }
  1272. } else if isWFM {
  1273. // Plain WFM (not stereo): also needs 15kHz LPF on discriminator output
  1274. audio = sess.wfmAudioFilter(audio, actualDemodRate)
  1275. }
  1276. // --- Polyphase resample to exact 48kHz ---
  1277. if actualDemodRate != streamAudioRate {
  1278. if channels > 1 {
  1279. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  1280. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  1281. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1282. sess.stereoResamplerRate = actualDemodRate
  1283. if coll != nil {
  1284. coll.Event("resampler_reset", "info", "stereo resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1285. "mode": "stereo",
  1286. "rate": actualDemodRate,
  1287. })
  1288. }
  1289. }
  1290. audio = sess.stereoResampler.Process(audio)
  1291. } else {
  1292. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  1293. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  1294. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1295. sess.monoResamplerRate = actualDemodRate
  1296. if coll != nil {
  1297. coll.Event("resampler_reset", "info", "mono resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1298. "mode": "mono",
  1299. "rate": actualDemodRate,
  1300. })
  1301. }
  1302. }
  1303. audio = sess.monoResampler.Process(audio)
  1304. }
  1305. }
  1306. if coll != nil {
  1307. coll.SetGauge("audio.stage.output.length", float64(len(audio)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1308. }
  1309. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  1310. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  1311. tau := sess.deemphasisUs * 1e-6
  1312. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  1313. if channels > 1 {
  1314. nFrames := len(audio) / channels
  1315. yL, yR := sess.deemphL, sess.deemphR
  1316. for i := 0; i < nFrames; i++ {
  1317. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  1318. audio[i*2] = float32(yL)
  1319. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  1320. audio[i*2+1] = float32(yR)
  1321. }
  1322. sess.deemphL, sess.deemphR = yL, yR
  1323. } else {
  1324. y := sess.deemphL
  1325. for i := range audio {
  1326. y = alpha*y + (1-alpha)*float64(audio[i])
  1327. audio[i] = float32(y)
  1328. }
  1329. sess.deemphL = y
  1330. }
  1331. }
  1332. if isWFM {
  1333. for i := range audio {
  1334. audio[i] *= 0.35
  1335. }
  1336. }
  1337. if shouldDump {
  1338. sess.finalDump = append(sess.finalDump, audio...)
  1339. } else if !sess.debugDumpUntil.IsZero() && time.Now().After(sess.debugDumpUntil) && sess.debugDumpBase != "" {
  1340. _ = os.MkdirAll(filepath.Dir(sess.debugDumpBase), 0o755)
  1341. if len(sess.demodDump) > 0 {
  1342. _ = writeWAVFile(sess.debugDumpBase+"-demod.wav", sess.demodDump, actualDemodRate, d.Channels())
  1343. }
  1344. if len(sess.finalDump) > 0 {
  1345. _ = writeWAVFile(sess.debugDumpBase+"-final.wav", sess.finalDump, streamAudioRate, channels)
  1346. }
  1347. logging.Warn("boundary", "debug_audio_dump_window", "signal", sess.signalID, "base", sess.debugDumpBase)
  1348. sess.debugDumpBase = ""
  1349. sess.demodDump = nil
  1350. sess.finalDump = nil
  1351. sess.debugDumpStart = time.Time{}
  1352. sess.debugDumpUntil = time.Time{}
  1353. }
  1354. return audio, streamAudioRate
  1355. }
  // pllCoefficients derives the proportional (first result) and integral
  // (second result) gains of a Type-II PLL from its loop bandwidth and
  // damping factor.
  //
  // loopBW is in Hz and sampleRate in samples/sec. Both gains are zero when
  // loopBW or sampleRate is not positive.
  func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  	if loopBW <= 0 || sampleRate <= 0 {
  		return 0, 0
  	}
  	// Loop bandwidth as a fraction of the sample rate.
  	normBW := loopBW / float64(sampleRate)
  	theta := normBW / (damping + 0.25/damping)
  	denom := 1 + 2*damping*theta + theta*theta
  	return (4 * damping * theta) / denom, (4 * theta * theta) / denom
  }
  1370. // wfmAudioFilter applies a stateful 15kHz lowpass to WFM discriminator output.
  1371. // Removes the 19kHz stereo pilot, L-R DSB-SC subcarrier (23-53kHz), and RDS (57kHz)
  1372. // that would otherwise leak into the audio output as clicks and tonal artifacts.
  1373. func (sess *streamSession) wfmAudioFilter(audio []float32, sampleRate int) []float32 {
  1374. if len(audio) == 0 || sampleRate <= 0 {
  1375. return audio
  1376. }
  1377. if sess.wfmAudioLPF == nil || sess.wfmAudioLPFRate != sampleRate {
  1378. sess.wfmAudioLPF = dsp.NewStatefulFIRReal(dsp.LowpassFIR(15000, sampleRate, 101))
  1379. sess.wfmAudioLPFRate = sampleRate
  1380. }
  1381. return sess.wfmAudioLPF.Process(audio)
  1382. }
  1383. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  1384. // Uses persistent FIR filter state across frames for click-free stereo.
  1385. // Reuses session scratch buffers to minimize allocations.
  1386. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  1387. if len(mono) == 0 || sampleRate <= 0 {
  1388. return nil, false
  1389. }
  1390. n := len(mono)
  1391. // Rebuild rate-dependent stereo filters when sampleRate changes
  1392. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  1393. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  1394. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  1395. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  1396. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  1397. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  1398. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  1399. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  1400. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  1401. sess.stereoFilterRate = sampleRate
  1402. // Initialize PLL for 19kHz pilot tracking.
  1403. sess.pilotPhase = 0
  1404. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  1405. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  1406. sess.pilotErrAvg = 0
  1407. sess.pilotI = 0
  1408. sess.pilotQ = 0
  1409. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  1410. }
  1411. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  1412. scratch := sess.growAudio(n * 5)
  1413. lpr := scratch[:n]
  1414. bpfLR := scratch[n : 2*n]
  1415. lr := scratch[2*n : 3*n]
  1416. work1 := scratch[3*n : 4*n]
  1417. work2 := scratch[4*n : 5*n]
  1418. sess.stereoLPF.ProcessInto(mono, lpr)
  1419. // 23-53kHz bandpass for L-R DSB-SC.
  1420. sess.stereoBPHi.ProcessInto(mono, work1)
  1421. sess.stereoBPLo.ProcessInto(mono, work2)
  1422. for i := 0; i < n; i++ {
  1423. bpfLR[i] = work1[i] - work2[i]
  1424. }
  1425. // 19kHz pilot bandpass for PLL.
  1426. sess.pilotLPFHi.ProcessInto(mono, work1)
  1427. sess.pilotLPFLo.ProcessInto(mono, work2)
  1428. for i := 0; i < n; i++ {
  1429. work1[i] = work1[i] - work2[i]
  1430. }
  1431. pilot := work1
  1432. phase := sess.pilotPhase
  1433. freq := sess.pilotFreq
  1434. alpha := sess.pilotAlpha
  1435. beta := sess.pilotBeta
  1436. iState := sess.pilotI
  1437. qState := sess.pilotQ
  1438. lpAlpha := sess.pilotLPAlpha
  1439. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  1440. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  1441. var pilotPower float64
  1442. var totalPower float64
  1443. var errSum float64
  1444. for i := 0; i < n; i++ {
  1445. p := float64(pilot[i])
  1446. sinP, cosP := math.Sincos(phase)
  1447. iMix := p * cosP
  1448. qMix := p * -sinP
  1449. iState += lpAlpha * (iMix - iState)
  1450. qState += lpAlpha * (qMix - qState)
  1451. err := math.Atan2(qState, iState)
  1452. freq += beta * err
  1453. if freq < minFreq {
  1454. freq = minFreq
  1455. } else if freq > maxFreq {
  1456. freq = maxFreq
  1457. }
  1458. phase += freq + alpha*err
  1459. if phase > 2*math.Pi {
  1460. phase -= 2 * math.Pi
  1461. } else if phase < 0 {
  1462. phase += 2 * math.Pi
  1463. }
  1464. totalPower += float64(mono[i]) * float64(mono[i])
  1465. pilotPower += p * p
  1466. errSum += math.Abs(err)
  1467. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  1468. }
  1469. sess.pilotPhase = phase
  1470. sess.pilotFreq = freq
  1471. sess.pilotI = iState
  1472. sess.pilotQ = qState
  1473. blockErr := errSum / float64(n)
  1474. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  1475. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  1476. pilotRatio := 0.0
  1477. if totalPower > 0 {
  1478. pilotRatio = pilotPower / totalPower
  1479. }
  1480. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  1481. // Lock heuristics: pilot power fraction and PLL phase error stability.
  1482. // Pilot power is a small but stable fraction of composite energy; require
  1483. // a modest floor plus PLL settling to avoid flapping in noise.
  1484. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  1485. out := make([]float32, n*2)
  1486. for i := 0; i < n; i++ {
  1487. out[i*2] = 0.5 * (lpr[i] + lr[i])
  1488. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  1489. }
  1490. return out, locked
  1491. }
  1492. // dspStateSnapshot captures persistent DSP state for segment splits.
  1493. type dspStateSnapshot struct {
  1494. overlapIQ []complex64
  1495. deemphL float64
  1496. deemphR float64
  1497. pilotPhase float64
  1498. pilotFreq float64
  1499. pilotAlpha float64
  1500. pilotBeta float64
  1501. pilotErrAvg float64
  1502. pilotI float64
  1503. pilotQ float64
  1504. pilotLPAlpha float64
  1505. monoResampler *dsp.Resampler
  1506. monoResamplerRate int
  1507. stereoResampler *dsp.StereoResampler
  1508. stereoResamplerRate int
  1509. stereoLPF *dsp.StatefulFIRReal
  1510. stereoFilterRate int
  1511. stereoBPHi *dsp.StatefulFIRReal
  1512. stereoBPLo *dsp.StatefulFIRReal
  1513. stereoLRLPF *dsp.StatefulFIRReal
  1514. stereoAALPF *dsp.StatefulFIRReal
  1515. pilotLPFHi *dsp.StatefulFIRReal
  1516. pilotLPFLo *dsp.StatefulFIRReal
  1517. preDemodFIR *dsp.StatefulFIRComplex
  1518. preDemodDecimator *dsp.StatefulDecimatingFIRComplex
  1519. preDemodDecim int
  1520. preDemodRate int
  1521. preDemodCutoff float64
  1522. preDemodDecimPhase int
  1523. wfmAudioLPF *dsp.StatefulFIRReal
  1524. wfmAudioLPFRate int
  1525. }
  1526. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  1527. return dspStateSnapshot{
  1528. overlapIQ: sess.overlapIQ,
  1529. deemphL: sess.deemphL,
  1530. deemphR: sess.deemphR,
  1531. pilotPhase: sess.pilotPhase,
  1532. pilotFreq: sess.pilotFreq,
  1533. pilotAlpha: sess.pilotAlpha,
  1534. pilotBeta: sess.pilotBeta,
  1535. pilotErrAvg: sess.pilotErrAvg,
  1536. pilotI: sess.pilotI,
  1537. pilotQ: sess.pilotQ,
  1538. pilotLPAlpha: sess.pilotLPAlpha,
  1539. monoResampler: sess.monoResampler,
  1540. monoResamplerRate: sess.monoResamplerRate,
  1541. stereoResampler: sess.stereoResampler,
  1542. stereoResamplerRate: sess.stereoResamplerRate,
  1543. stereoLPF: sess.stereoLPF,
  1544. stereoFilterRate: sess.stereoFilterRate,
  1545. stereoBPHi: sess.stereoBPHi,
  1546. stereoBPLo: sess.stereoBPLo,
  1547. stereoLRLPF: sess.stereoLRLPF,
  1548. stereoAALPF: sess.stereoAALPF,
  1549. pilotLPFHi: sess.pilotLPFHi,
  1550. pilotLPFLo: sess.pilotLPFLo,
  1551. preDemodFIR: sess.preDemodFIR,
  1552. preDemodDecimator: sess.preDemodDecimator,
  1553. preDemodDecim: sess.preDemodDecim,
  1554. preDemodRate: sess.preDemodRate,
  1555. preDemodCutoff: sess.preDemodCutoff,
  1556. preDemodDecimPhase: sess.preDemodDecimPhase,
  1557. wfmAudioLPF: sess.wfmAudioLPF,
  1558. wfmAudioLPFRate: sess.wfmAudioLPFRate,
  1559. }
  1560. }
  1561. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  1562. sess.overlapIQ = s.overlapIQ
  1563. sess.deemphL = s.deemphL
  1564. sess.deemphR = s.deemphR
  1565. sess.pilotPhase = s.pilotPhase
  1566. sess.pilotFreq = s.pilotFreq
  1567. sess.pilotAlpha = s.pilotAlpha
  1568. sess.pilotBeta = s.pilotBeta
  1569. sess.pilotErrAvg = s.pilotErrAvg
  1570. sess.pilotI = s.pilotI
  1571. sess.pilotQ = s.pilotQ
  1572. sess.pilotLPAlpha = s.pilotLPAlpha
  1573. sess.monoResampler = s.monoResampler
  1574. sess.monoResamplerRate = s.monoResamplerRate
  1575. sess.stereoResampler = s.stereoResampler
  1576. sess.stereoResamplerRate = s.stereoResamplerRate
  1577. sess.stereoLPF = s.stereoLPF
  1578. sess.stereoFilterRate = s.stereoFilterRate
  1579. sess.stereoBPHi = s.stereoBPHi
  1580. sess.stereoBPLo = s.stereoBPLo
  1581. sess.stereoLRLPF = s.stereoLRLPF
  1582. sess.stereoAALPF = s.stereoAALPF
  1583. sess.pilotLPFHi = s.pilotLPFHi
  1584. sess.pilotLPFLo = s.pilotLPFLo
  1585. sess.preDemodFIR = s.preDemodFIR
  1586. sess.preDemodDecimator = s.preDemodDecimator
  1587. sess.preDemodDecim = s.preDemodDecim
  1588. sess.preDemodRate = s.preDemodRate
  1589. sess.preDemodCutoff = s.preDemodCutoff
  1590. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1591. sess.wfmAudioLPF = s.wfmAudioLPF
  1592. sess.wfmAudioLPFRate = s.wfmAudioLPFRate
  1593. }
  1594. // ---------------------------------------------------------------------------
  1595. // Session management helpers
  1596. // ---------------------------------------------------------------------------
  1597. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1598. outputDir := st.policy.OutputDir
  1599. if outputDir == "" {
  1600. outputDir = "data/recordings"
  1601. }
  1602. demodName, channels := resolveDemod(sig)
  1603. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1604. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1605. dir := filepath.Join(outputDir, dirName)
  1606. if err := os.MkdirAll(dir, 0o755); err != nil {
  1607. return nil, err
  1608. }
  1609. wavPath := filepath.Join(dir, "audio.wav")
  1610. f, err := os.Create(wavPath)
  1611. if err != nil {
  1612. return nil, err
  1613. }
  1614. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1615. f.Close()
  1616. return nil, err
  1617. }
  1618. playbackMode, stereoState := initialPlaybackState(demodName)
  1619. sess := &streamSession{
  1620. sessionID: fmt.Sprintf("%d-%d-r", sig.ID, now.UnixMilli()),
  1621. signalID: sig.ID,
  1622. centerHz: sig.CenterHz,
  1623. bwHz: sig.BWHz,
  1624. snrDb: sig.SNRDb,
  1625. peakDb: sig.PeakDb,
  1626. class: sig.Class,
  1627. startTime: now,
  1628. lastFeed: now,
  1629. dir: dir,
  1630. wavFile: f,
  1631. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1632. sampleRate: streamAudioRate,
  1633. channels: channels,
  1634. demodName: demodName,
  1635. playbackMode: playbackMode,
  1636. stereoState: stereoState,
  1637. deemphasisUs: st.policy.DeemphasisUs,
  1638. }
  1639. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1640. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1641. return sess, nil
  1642. }
  1643. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1644. demodName, channels := resolveDemod(sig)
  1645. for _, pl := range st.pendingListens {
  1646. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1647. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1648. demodName = requested
  1649. if demodName == "WFM_STEREO" {
  1650. channels = 2
  1651. } else if d := demod.Get(demodName); d != nil {
  1652. channels = d.Channels()
  1653. } else {
  1654. channels = 1
  1655. }
  1656. break
  1657. }
  1658. }
  1659. }
  1660. playbackMode, stereoState := initialPlaybackState(demodName)
  1661. sess := &streamSession{
  1662. sessionID: fmt.Sprintf("%d-%d-l", sig.ID, now.UnixMilli()),
  1663. signalID: sig.ID,
  1664. centerHz: sig.CenterHz,
  1665. bwHz: sig.BWHz,
  1666. snrDb: sig.SNRDb,
  1667. peakDb: sig.PeakDb,
  1668. class: sig.Class,
  1669. startTime: now,
  1670. lastFeed: now,
  1671. listenOnly: true,
  1672. sampleRate: streamAudioRate,
  1673. channels: channels,
  1674. demodName: demodName,
  1675. playbackMode: playbackMode,
  1676. stereoState: stereoState,
  1677. deemphasisUs: st.policy.DeemphasisUs,
  1678. }
  1679. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1680. sig.ID, sig.CenterHz/1e6, demodName)
  1681. return sess
  1682. }
  1683. func resolveDemod(sig *detector.Signal) (string, int) {
  1684. demodName := "NFM"
  1685. if sig.Class != nil {
  1686. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1687. demodName = n
  1688. }
  1689. }
  1690. channels := 1
  1691. if demodName == "WFM_STEREO" {
  1692. channels = 2
  1693. } else if d := demod.Get(demodName); d != nil {
  1694. channels = d.Channels()
  1695. }
  1696. return demodName, channels
  1697. }
  // initialPlaybackState returns the playback mode and stereo state a session
  // starts with. The playback mode is the demodulator name itself; the stereo
  // state is "searching" for "WFM_STEREO" and "mono" for everything else.
  func initialPlaybackState(demodName string) (string, string) {
  	if demodName == "WFM_STEREO" {
  		return demodName, "searching"
  	}
  	return demodName, "mono"
  }
  1706. func (sess *streamSession) audioInfo() AudioInfo {
  1707. return AudioInfo{
  1708. SampleRate: sess.sampleRate,
  1709. Channels: sess.channels,
  1710. Format: "s16le",
  1711. DemodName: sess.demodName,
  1712. PlaybackMode: sess.playbackMode,
  1713. StereoState: sess.stereoState,
  1714. }
  1715. }
  1716. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1717. infoJSON, _ := json.Marshal(info)
  1718. tagged := make([]byte, 1+len(infoJSON))
  1719. tagged[0] = 0x00 // tag: audio_info
  1720. copy(tagged[1:], infoJSON)
  1721. for _, sub := range subs {
  1722. select {
  1723. case sub.ch <- tagged:
  1724. default:
  1725. }
  1726. }
  1727. }
  1728. func defaultAudioInfoForMode(mode string) AudioInfo {
  1729. demodName := "NFM"
  1730. if requested := normalizeRequestedMode(mode); requested != "" {
  1731. demodName = requested
  1732. }
  1733. channels := 1
  1734. if demodName == "WFM_STEREO" {
  1735. channels = 2
  1736. } else if d := demod.Get(demodName); d != nil {
  1737. channels = d.Channels()
  1738. }
  1739. playbackMode, stereoState := initialPlaybackState(demodName)
  1740. return AudioInfo{
  1741. SampleRate: streamAudioRate,
  1742. Channels: channels,
  1743. Format: "s16le",
  1744. DemodName: demodName,
  1745. PlaybackMode: playbackMode,
  1746. StereoState: stereoState,
  1747. }
  1748. }
  // normalizeRequestedMode canonicalises a user-supplied demodulation mode.
  //
  // Surrounding whitespace is trimmed and the text upper-cased. The result is
  // returned only if it is one of WFM, WFM_STEREO, NFM, AM, USB, LSB or CW.
  // Everything else, including "" and "AUTO", yields "" meaning "no explicit
  // request".
  func normalizeRequestedMode(mode string) string {
  	canonical := strings.ToUpper(strings.TrimSpace(mode))
  	switch canonical {
  	case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  		return canonical
  	}
  	return ""
  }
  1759. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1760. func (sess *streamSession) growIQ(n int) []complex64 {
  1761. if cap(sess.scratchIQ) >= n {
  1762. return sess.scratchIQ[:n]
  1763. }
  1764. sess.scratchIQ = make([]complex64, n, n*5/4)
  1765. return sess.scratchIQ
  1766. }
  1767. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1768. func (sess *streamSession) growAudio(n int) []float32 {
  1769. if cap(sess.scratchAudio) >= n {
  1770. return sess.scratchAudio[:n]
  1771. }
  1772. sess.scratchAudio = make([]float32, n, n*5/4)
  1773. return sess.scratchAudio
  1774. }
  1775. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1776. func (sess *streamSession) growPCM(n int) []byte {
  1777. if cap(sess.scratchPCM) >= n {
  1778. return sess.scratchPCM[:n]
  1779. }
  1780. sess.scratchPCM = make([]byte, n, n*5/4)
  1781. return sess.scratchPCM
  1782. }
  1783. func convertToListenOnly(sess *streamSession) {
  1784. if sess.wavBuf != nil {
  1785. _ = sess.wavBuf.Flush()
  1786. }
  1787. if sess.wavFile != nil {
  1788. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1789. sess.wavFile.Close()
  1790. }
  1791. sess.wavFile = nil
  1792. sess.wavBuf = nil
  1793. sess.listenOnly = true
  1794. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1795. }
  1796. func closeSession(sess *streamSession, policy *Policy) {
  1797. if sess.listenOnly {
  1798. return
  1799. }
  1800. if sess.wavBuf != nil {
  1801. _ = sess.wavBuf.Flush()
  1802. }
  1803. if sess.wavFile != nil {
  1804. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1805. sess.wavFile.Close()
  1806. sess.wavFile = nil
  1807. sess.wavBuf = nil
  1808. }
  1809. dur := sess.lastFeed.Sub(sess.startTime)
  1810. files := map[string]any{
  1811. "audio": "audio.wav",
  1812. "audio_sample_rate": sess.sampleRate,
  1813. "audio_channels": sess.channels,
  1814. "audio_demod": sess.demodName,
  1815. "recording_mode": "streaming",
  1816. }
  1817. meta := Meta{
  1818. EventID: sess.signalID,
  1819. Start: sess.startTime,
  1820. End: sess.lastFeed,
  1821. CenterHz: sess.centerHz,
  1822. BandwidthHz: sess.bwHz,
  1823. SampleRate: sess.sampleRate,
  1824. SNRDb: sess.snrDb,
  1825. PeakDb: sess.peakDb,
  1826. Class: sess.class,
  1827. DurationMs: dur.Milliseconds(),
  1828. Files: files,
  1829. }
  1830. b, err := json.MarshalIndent(meta, "", " ")
  1831. if err == nil {
  1832. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1833. }
  1834. if policy != nil {
  1835. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1836. }
  1837. }
  1838. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1839. if len(sess.audioSubs) == 0 {
  1840. return
  1841. }
  1842. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1843. tagged := make([]byte, 1+pcmLen)
  1844. tagged[0] = 0x01
  1845. copy(tagged[1:], pcm[:pcmLen])
  1846. alive := sess.audioSubs[:0]
  1847. for _, sub := range sess.audioSubs {
  1848. select {
  1849. case sub.ch <- tagged:
  1850. default:
  1851. st.droppedPCM++
  1852. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1853. if st.telemetry != nil {
  1854. st.telemetry.IncCounter("streamer.pcm.drop", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1855. }
  1856. }
  1857. alive = append(alive, sub)
  1858. }
  1859. sess.audioSubs = alive
  1860. if st.telemetry != nil {
  1861. st.telemetry.SetGauge("streamer.subscribers.count", float64(len(alive)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1862. }
  1863. }
  1864. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1865. if len(st.policy.ClassFilter) == 0 {
  1866. return true
  1867. }
  1868. if cls == nil {
  1869. return false
  1870. }
  1871. for _, f := range st.policy.ClassFilter {
  1872. if strings.EqualFold(f, string(cls.ModType)) {
  1873. return true
  1874. }
  1875. }
  1876. return false
  1877. }
  1878. // ErrNoSession reports that no active or pending session matches the requested frequency.
  1879. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1880. // ---------------------------------------------------------------------------
  1881. // WAV header helpers
  1882. // ---------------------------------------------------------------------------
  1883. func writeWAVFile(path string, audio []float32, sampleRate int, channels int) error {
  1884. f, err := os.Create(path)
  1885. if err != nil {
  1886. return err
  1887. }
  1888. defer f.Close()
  1889. return writeWAVTo(f, audio, sampleRate, channels)
  1890. }
  // writeStreamWAVHeader writes a 44-byte PCM WAV header to f at the current
  // file position, for a stream whose length is not yet known.
  //
  // The format is 16-bit PCM at sampleRate with the given channel count; a
  // non-positive count is treated as 1. The RIFF size is written as 36 and
  // the data size as 0, i.e. placeholders for an empty data chunk.
  func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  	if channels <= 0 {
  		channels = 1
  	}

  	le := binary.LittleEndian
  	var hdr [44]byte

  	// RIFF container.
  	copy(hdr[0:4], "RIFF")
  	le.PutUint32(hdr[4:8], 36)
  	copy(hdr[8:12], "WAVE")

  	// "fmt " chunk: 16-byte body, format tag 1 (PCM).
  	copy(hdr[12:16], "fmt ")
  	le.PutUint32(hdr[16:20], 16)
  	le.PutUint16(hdr[20:22], 1)
  	le.PutUint16(hdr[22:24], uint16(channels))
  	le.PutUint32(hdr[24:28], uint32(sampleRate))
  	le.PutUint32(hdr[28:32], uint32(sampleRate*channels*2)) // bytes per second
  	le.PutUint16(hdr[32:34], uint16(channels*2))            // bytes per frame
  	le.PutUint16(hdr[34:36], 16)                            // bits per sample

  	// "data" chunk with a zero-length placeholder.
  	copy(hdr[36:40], "data")
  	le.PutUint32(hdr[40:44], 0)

  	_, err := f.Write(hdr[:])
  	return err
  }
  // fixStreamWAVHeader patches the size and rate fields of a streaming WAV
  // header once the final length is known.
  //
  // totalSamples is the number of 16-bit sample values written (two bytes
  // each). The function rewrites the RIFF size (offset 4), sample rate (24),
  // byte rate (28) and data size (40).
  //
  // Differences from a plain seek-and-write:
  //   - A non-positive channels value is treated as 1, matching
  //     writeStreamWAVHeader, so the byte rate is never overwritten with 0.
  //   - The data size is clamped to what a 32-bit RIFF header can express
  //     instead of silently wrapping on very long recordings. A negative
  //     totalSamples is treated as 0.
  //   - Fields are written with WriteAt, so the file's current offset is left
  //     untouched.
  //   - The first failing write is logged and aborts the fix-up.
  func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  	if channels <= 0 {
  		channels = 1
  	}

  	const (
  		bytesPerSample = 2
  		riffOverhead   = 36 // header bytes counted in the RIFF size field
  		// Largest even data size for which riffOverhead+size fits in uint32.
  		maxDataBytes = (0xFFFFFFFF - riffOverhead) &^ 1
  	)

  	var dataSize uint32
  	switch {
  	case totalSamples <= 0:
  		dataSize = 0
  	case totalSamples > maxDataBytes/bytesPerSample:
  		dataSize = maxDataBytes
  	default:
  		dataSize = uint32(totalSamples * bytesPerSample)
  	}

  	patches := [...]struct {
  		off int64
  		val uint32
  	}{
  		{4, riffOverhead + dataSize},
  		{24, uint32(sampleRate)},
  		{28, uint32(sampleRate * channels * bytesPerSample)},
  		{40, dataSize},
  	}

  	var buf [4]byte
  	for _, p := range patches {
  		binary.LittleEndian.PutUint32(buf[:], p.val)
  		if _, err := f.WriteAt(buf[:], p.off); err != nil {
  			log.Printf("STREAM: WAV header fix-up failed at offset %d: %v", p.off, err)
  			return
  		}
  	}
  }
  1936. // ResetStreams forces all active streaming sessions to discard their FIR states and decimation phases.
  1937. // This is used when the upstream DSP drops samples, creating a hard break in phase continuity.
  1938. func (st *Streamer) ResetStreams() {
  1939. st.mu.Lock()
  1940. defer st.mu.Unlock()
  1941. if st.telemetry != nil {
  1942. st.telemetry.IncCounter("streamer.reset.count", 1, nil)
  1943. st.telemetry.Event("stream_reset", "warn", "stream DSP state reset", nil, map[string]any{"sessions": len(st.sessions)})
  1944. }
  1945. for _, sess := range st.sessions {
  1946. sess.preDemodFIR = nil
  1947. sess.preDemodDecimator = nil
  1948. sess.preDemodDecimPhase = 0
  1949. sess.stereoResampler = nil
  1950. sess.monoResampler = nil
  1951. sess.wfmAudioLPF = nil
  1952. }
  1953. }