Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

2052 строки
66KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "sort"
  15. "sync"
  16. "time"
  17. "sdr-wideband-suite/internal/classifier"
  18. "sdr-wideband-suite/internal/demod"
  19. "sdr-wideband-suite/internal/detector"
  20. "sdr-wideband-suite/internal/dsp"
  21. "sdr-wideband-suite/internal/logging"
  22. "sdr-wideband-suite/internal/telemetry"
  23. )
  24. // ---------------------------------------------------------------------------
  25. // streamSession — one open demod session for one signal
  26. // ---------------------------------------------------------------------------
  27. type streamSession struct {
  28. sessionID string
  29. signalID int64
  30. centerHz float64
  31. bwHz float64
  32. snrDb float64
  33. peakDb float64
  34. class *classifier.Classification
  35. startTime time.Time
  36. lastFeed time.Time
  37. playbackMode string
  38. stereoState string
  39. lastAudioTs time.Time
  40. debugDumpStart time.Time
  41. debugDumpUntil time.Time
  42. debugDumpBase string
  43. demodDump []float32
  44. finalDump []float32
  45. lastAudioL float32
  46. lastAudioR float32
  47. prevAudioL float64 // second-to-last L sample for boundary transient detection
  48. lastAudioSet bool
  49. lastDecIQ complex64
  50. prevDecIQ complex64
  51. lastDecIQSet bool
  52. lastExtractIQ complex64
  53. prevExtractIQ complex64
  54. lastExtractIQSet bool
  55. lastDemodL float32
  56. prevDemodL float64
  57. lastDemodSet bool
  58. snippetSeq uint64
  59. // listenOnly sessions have no WAV file and no disk I/O.
  60. // They exist solely to feed audio to live-listen subscribers.
  61. listenOnly bool
  62. // Recording state (nil/zero for listen-only sessions)
  63. dir string
  64. wavFile *os.File
  65. wavBuf *bufio.Writer
  66. wavSamples int64
  67. segmentIdx int
  68. sampleRate int // actual output audio sample rate (always streamAudioRate)
  69. channels int
  70. demodName string
  71. // --- Persistent DSP state for click-free streaming ---
  72. // Overlap-save: tail of previous extracted IQ snippet.
  73. // Currently unused for live demod after removing the extra discriminator
  74. // overlap prepend, but kept in DSP snapshot state for compatibility.
  75. overlapIQ []complex64
  76. // De-emphasis IIR state (persists across frames)
  77. deemphL float64
  78. deemphR float64
  79. // Stereo lock state for live WFM streaming
  80. stereoEnabled bool
  81. stereoOnCount int
  82. stereoOffCount int
  83. // Pilot-locked stereo PLL state (19kHz pilot)
  84. pilotPhase float64
  85. pilotFreq float64
  86. pilotAlpha float64
  87. pilotBeta float64
  88. pilotErrAvg float64
  89. pilotI float64
  90. pilotQ float64
  91. pilotLPAlpha float64
  92. // Polyphase resampler (replaces integer-decimate hack)
  93. monoResampler *dsp.Resampler
  94. monoResamplerRate int
  95. stereoResampler *dsp.StereoResampler
  96. stereoResamplerRate int
  97. // AQ-4: Stateful FIR filters for click-free stereo decode
  98. stereoFilterRate int
  99. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  100. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  101. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  102. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  103. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  104. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  105. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  106. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  107. // and avoids per-frame FIR recomputation)
  108. preDemodFIR *dsp.StatefulFIRComplex
  109. preDemodDecimator *dsp.StatefulDecimatingFIRComplex
  110. preDemodDecim int // cached decimation factor
  111. preDemodRate int // cached snipRate this FIR was built for
  112. preDemodCutoff float64 // cached cutoff
  113. preDemodDecimPhase int // retained for backward compatibility in snapshots/debug
  114. // AQ-2: De-emphasis config (µs, 0 = disabled)
  115. deemphasisUs float64
  116. // Scratch buffers — reused across frames to avoid GC pressure.
  117. // Grown as needed, never shrunk.
  118. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  119. scratchAudio []float32 // for stereo decode intermediates
  120. scratchPCM []byte // for PCM encoding
  121. // live-listen subscribers
  122. audioSubs []audioSub
  123. }
  124. type audioSub struct {
  125. id int64
  126. ch chan []byte
  127. }
  128. type RuntimeSignalInfo struct {
  129. DemodName string
  130. PlaybackMode string
  131. StereoState string
  132. Channels int
  133. SampleRate int
  134. }
  135. // AudioInfo describes the audio format of a live-listen subscription.
  136. // Sent to the WebSocket client as the first message.
  137. type AudioInfo struct {
  138. SampleRate int `json:"sample_rate"`
  139. Channels int `json:"channels"`
  140. Format string `json:"format"` // always "s16le"
  141. DemodName string `json:"demod"`
  142. PlaybackMode string `json:"playback_mode,omitempty"`
  143. StereoState string `json:"stereo_state,omitempty"`
  144. }
  145. const (
  146. streamAudioRate = 48000
  147. resamplerTaps = 32 // taps per polyphase arm — good quality
  148. )
  149. var debugDumpDelay = func() time.Duration {
  150. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DELAY_SECONDS"))
  151. if raw == "" {
  152. return 5 * time.Second
  153. }
  154. v, err := strconv.Atoi(raw)
  155. if err != nil || v < 0 {
  156. return 5 * time.Second
  157. }
  158. return time.Duration(v) * time.Second
  159. }()
  160. var debugDumpDuration = func() time.Duration {
  161. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DURATION_SECONDS"))
  162. if raw == "" {
  163. return 15 * time.Second
  164. }
  165. v, err := strconv.Atoi(raw)
  166. if err != nil || v <= 0 {
  167. return 15 * time.Second
  168. }
  169. return time.Duration(v) * time.Second
  170. }()
  171. var audioDumpEnabled = func() bool {
  172. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_AUDIO_DUMP_ENABLED"))
  173. if raw == "" {
  174. return false
  175. }
  176. v, err := strconv.ParseBool(raw)
  177. if err != nil {
  178. return false
  179. }
  180. return v
  181. }()
  182. var decHeadTrimSamples = func() int {
  183. raw := strings.TrimSpace(os.Getenv("SDR_DEC_HEAD_TRIM"))
  184. if raw == "" {
  185. return 0
  186. }
  187. v, err := strconv.Atoi(raw)
  188. if err != nil || v < 0 {
  189. return 0
  190. }
  191. return v
  192. }()
  193. // ---------------------------------------------------------------------------
  194. // Streamer — manages all active streaming sessions
  195. // ---------------------------------------------------------------------------
  196. type streamFeedItem struct {
  197. signal detector.Signal
  198. snippet []complex64
  199. snipRate int
  200. }
  201. type streamFeedMsg struct {
  202. traceID uint64
  203. items []streamFeedItem
  204. enqueuedAt time.Time
  205. }
  206. type Streamer struct {
  207. mu sync.Mutex
  208. sessions map[int64]*streamSession
  209. policy Policy
  210. centerHz float64
  211. nextSub int64
  212. feedCh chan streamFeedMsg
  213. done chan struct{}
  214. droppedFeed uint64
  215. droppedPCM uint64
  216. lastFeedTS time.Time
  217. lastProcTS time.Time
  218. // pendingListens are subscribers waiting for a matching session.
  219. pendingListens map[int64]*pendingListen
  220. telemetry *telemetry.Collector
  221. }
  222. type pendingListen struct {
  223. freq float64
  224. bw float64
  225. mode string
  226. ch chan []byte
  227. }
  228. func newStreamer(policy Policy, centerHz float64, coll *telemetry.Collector) *Streamer {
  229. st := &Streamer{
  230. sessions: make(map[int64]*streamSession),
  231. policy: policy,
  232. centerHz: centerHz,
  233. feedCh: make(chan streamFeedMsg, 2),
  234. done: make(chan struct{}),
  235. pendingListens: make(map[int64]*pendingListen),
  236. telemetry: coll,
  237. }
  238. go st.worker()
  239. return st
  240. }
  241. func (st *Streamer) worker() {
  242. for msg := range st.feedCh {
  243. st.processFeed(msg)
  244. }
  245. close(st.done)
  246. }
  247. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  248. st.mu.Lock()
  249. defer st.mu.Unlock()
  250. wasEnabled := st.policy.Enabled
  251. st.policy = policy
  252. st.centerHz = centerHz
  253. // If recording was just disabled, close recording sessions
  254. // but keep listen-only sessions alive.
  255. if wasEnabled && !policy.Enabled {
  256. for id, sess := range st.sessions {
  257. if sess.listenOnly {
  258. continue
  259. }
  260. if len(sess.audioSubs) > 0 {
  261. // Convert to listen-only: close WAV but keep session
  262. convertToListenOnly(sess)
  263. } else {
  264. closeSession(sess, &st.policy)
  265. delete(st.sessions, id)
  266. }
  267. }
  268. }
  269. }
  270. // HasListeners returns true if any sessions have audio subscribers
  271. // or there are pending listen requests. Used by the DSP loop to
  272. // decide whether to feed snippets even when recording is disabled.
  273. func (st *Streamer) HasListeners() bool {
  274. st.mu.Lock()
  275. defer st.mu.Unlock()
  276. return st.hasListenersLocked()
  277. }
  278. func (st *Streamer) hasListenersLocked() bool {
  279. if len(st.pendingListens) > 0 {
  280. return true
  281. }
  282. for _, sess := range st.sessions {
  283. if len(sess.audioSubs) > 0 {
  284. return true
  285. }
  286. }
  287. return false
  288. }
  289. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  290. // Feeds are accepted if:
  291. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  292. // - Any live-listen subscribers exist (listen-only mode)
  293. //
  294. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  295. // data, so items can be passed directly without another copy.
  296. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  297. st.mu.Lock()
  298. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  299. hasListeners := st.hasListenersLocked()
  300. pending := len(st.pendingListens)
  301. debugLiveAudio := st.policy.DebugLiveAudio
  302. now := time.Now()
  303. if !st.lastFeedTS.IsZero() {
  304. gap := now.Sub(st.lastFeedTS)
  305. if gap > 150*time.Millisecond {
  306. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  307. }
  308. }
  309. st.lastFeedTS = now
  310. st.mu.Unlock()
  311. if debugLiveAudio {
  312. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  313. }
  314. if (!recEnabled && !hasListeners) || len(items) == 0 {
  315. return
  316. }
  317. if st.telemetry != nil {
  318. st.telemetry.SetGauge("streamer.feed.queue_len", float64(len(st.feedCh)), nil)
  319. st.telemetry.SetGauge("streamer.pending_listeners", float64(pending), nil)
  320. st.telemetry.Observe("streamer.feed.batch_size", float64(len(items)), nil)
  321. }
  322. select {
  323. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items, enqueuedAt: time.Now()}:
  324. default:
  325. st.droppedFeed++
  326. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  327. if st.telemetry != nil {
  328. st.telemetry.IncCounter("streamer.feed.drop", 1, nil)
  329. st.telemetry.Event("stream_feed_drop", "warn", "feed queue full", nil, map[string]any{
  330. "trace_id": traceID,
  331. "queue_len": len(st.feedCh),
  332. })
  333. }
  334. }
  335. }
  336. // processFeed runs in the worker goroutine.
  337. func (st *Streamer) processFeed(msg streamFeedMsg) {
  338. procStart := time.Now()
  339. lockStart := time.Now()
  340. st.mu.Lock()
  341. lockWait := time.Since(lockStart)
  342. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  343. hasListeners := st.hasListenersLocked()
  344. now := time.Now()
  345. if !st.lastProcTS.IsZero() {
  346. gap := now.Sub(st.lastProcTS)
  347. if gap > 150*time.Millisecond {
  348. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  349. if st.telemetry != nil {
  350. st.telemetry.IncCounter("streamer.process.gap.count", 1, nil)
  351. st.telemetry.Observe("streamer.process.gap_ms", float64(gap.Milliseconds()), nil)
  352. }
  353. }
  354. }
  355. st.lastProcTS = now
  356. defer st.mu.Unlock()
  357. defer func() {
  358. if st.telemetry != nil {
  359. st.telemetry.Observe("streamer.process.total_ms", float64(time.Since(procStart).Microseconds())/1000.0, nil)
  360. st.telemetry.Observe("streamer.lock_wait_ms", float64(lockWait.Microseconds())/1000.0, telemetry.TagsFromPairs("lock", "process"))
  361. }
  362. }()
  363. if st.telemetry != nil {
  364. st.telemetry.Observe("streamer.feed.enqueue_delay_ms", float64(now.Sub(msg.enqueuedAt).Microseconds())/1000.0, nil)
  365. st.telemetry.SetGauge("streamer.sessions.active", float64(len(st.sessions)), nil)
  366. }
  367. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  368. if !recEnabled && !hasListeners {
  369. return
  370. }
  371. seen := make(map[int64]bool, len(msg.items))
  372. for i := range msg.items {
  373. item := &msg.items[i]
  374. sig := &item.signal
  375. seen[sig.ID] = true
  376. if sig.ID == 0 || sig.Class == nil {
  377. continue
  378. }
  379. if len(item.snippet) == 0 || item.snipRate <= 0 {
  380. continue
  381. }
  382. // Decide whether this signal needs a session
  383. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  384. needsListen := st.signalHasListenerLocked(sig)
  385. className := "<nil>"
  386. demodName := ""
  387. if sig.Class != nil {
  388. className = string(sig.Class.ModType)
  389. demodName, _ = resolveDemod(sig)
  390. }
  391. if st.policy.DebugLiveAudio {
  392. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  393. }
  394. if !needsRecording && !needsListen {
  395. continue
  396. }
  397. sess, exists := st.sessions[sig.ID]
  398. requestedMode := ""
  399. for _, pl := range st.pendingListens {
  400. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  401. if m := normalizeRequestedMode(pl.mode); m != "" {
  402. requestedMode = m
  403. break
  404. }
  405. }
  406. }
  407. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  408. for _, sub := range sess.audioSubs {
  409. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  410. }
  411. delete(st.sessions, sig.ID)
  412. sess = nil
  413. exists = false
  414. }
  415. if !exists {
  416. if needsRecording {
  417. s, err := st.openRecordingSession(sig, now)
  418. if err != nil {
  419. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  420. sig.ID, sig.CenterHz/1e6, err)
  421. if st.telemetry != nil {
  422. st.telemetry.IncCounter("streamer.session.open_error", 1, telemetry.TagsFromPairs("kind", "recording"))
  423. }
  424. continue
  425. }
  426. st.sessions[sig.ID] = s
  427. sess = s
  428. } else {
  429. s := st.openListenSession(sig, now)
  430. st.sessions[sig.ID] = s
  431. sess = s
  432. }
  433. // Attach any pending listeners
  434. st.attachPendingListeners(sess)
  435. if st.telemetry != nil {
  436. st.telemetry.IncCounter("streamer.session.open", 1, telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", fmt.Sprintf("%d", sig.ID)))
  437. st.telemetry.Event("session_open", "info", "stream session opened", telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{
  438. "listen_only": sess.listenOnly,
  439. "demod": sess.demodName,
  440. })
  441. }
  442. }
  443. // Update metadata
  444. sess.lastFeed = now
  445. sess.centerHz = sig.CenterHz
  446. sess.bwHz = sig.BWHz
  447. if sig.SNRDb > sess.snrDb {
  448. sess.snrDb = sig.SNRDb
  449. }
  450. if sig.PeakDb > sess.peakDb {
  451. sess.peakDb = sig.PeakDb
  452. }
  453. if sig.Class != nil {
  454. sess.class = sig.Class
  455. }
  456. // Demod with persistent state
  457. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  458. audioStart := time.Now()
  459. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate, st.telemetry)
  460. if st.telemetry != nil {
  461. st.telemetry.Observe("streamer.process_snippet_ms", float64(time.Since(audioStart).Microseconds())/1000.0, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  462. }
  463. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  464. if len(audio) == 0 {
  465. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  466. if st.telemetry != nil {
  467. st.telemetry.IncCounter("streamer.audio.empty", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  468. }
  469. }
  470. if len(audio) > 0 {
  471. if sess.wavSamples == 0 && audioRate > 0 {
  472. sess.sampleRate = audioRate
  473. }
  474. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  475. pcmLen := len(audio) * 2
  476. pcm := sess.growPCM(pcmLen)
  477. for k, s := range audio {
  478. v := int16(clip(s * 32767))
  479. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  480. }
  481. if !sess.listenOnly && sess.wavBuf != nil {
  482. n, err := sess.wavBuf.Write(pcm)
  483. if err != nil {
  484. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  485. } else {
  486. sess.wavSamples += int64(n / 2)
  487. }
  488. }
  489. // Gap logging for live-audio sessions + transient click detector
  490. if len(sess.audioSubs) > 0 {
  491. if !sess.lastAudioTs.IsZero() {
  492. gap := time.Since(sess.lastAudioTs)
  493. if gap > 150*time.Millisecond {
  494. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  495. if st.telemetry != nil {
  496. st.telemetry.IncCounter("streamer.audio.gap.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  497. st.telemetry.Observe("streamer.audio.gap_ms", float64(gap.Milliseconds()), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  498. }
  499. }
  500. }
  501. // Transient click detector: finds short impulses (1-3 samples)
  502. // that deviate sharply from the local signal trend.
  503. // A click looks like: ...smooth... SPIKE ...smooth...
  504. // Normal FM audio has large deltas too, but they follow
  505. // a continuous curve. A click has high |d2/dt2| (acceleration).
  506. //
  507. // Method: second-derivative detector. For each sample triplet
  508. // (a, b, c), compute |2b - a - c| which is the discrete
  509. // second derivative magnitude. High values = transient spike.
  510. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  511. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  512. stride := sess.channels
  513. if stride < 1 {
  514. stride = 1
  515. }
  516. nFrames := len(audio) / stride
  517. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  518. if sess.lastAudioSet && nFrames >= 1 {
  519. // second derivative across boundary: |2*last - prevLast - first|
  520. first := float64(audio[0])
  521. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  522. if d2 > 0.15 {
  523. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  524. if st.telemetry != nil {
  525. st.telemetry.IncCounter("audio.boundary_click.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  526. st.telemetry.Observe("audio.boundary_click.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  527. }
  528. }
  529. }
  530. // Intra-frame transient scan (L channel only for performance)
  531. nClicks := 0
  532. maxD2 := float64(0)
  533. maxD2Pos := 0
  534. for k := 1; k < nFrames-1; k++ {
  535. a := float64(audio[(k-1)*stride])
  536. b := float64(audio[k*stride])
  537. c := float64(audio[(k+1)*stride])
  538. d2 := math.Abs(2*b - a - c)
  539. if d2 > maxD2 {
  540. maxD2 = d2
  541. maxD2Pos = k
  542. }
  543. if d2 > 0.15 {
  544. nClicks++
  545. }
  546. }
  547. if nClicks > 0 {
  548. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  549. if st.telemetry != nil {
  550. st.telemetry.IncCounter("audio.intra_click.count", float64(nClicks), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  551. st.telemetry.Observe("audio.intra_click.max_d2", maxD2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  552. }
  553. }
  554. // Store last two samples for next frame's boundary check
  555. if nFrames >= 2 {
  556. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  557. sess.lastAudioL = audio[(nFrames-1)*stride]
  558. if stride > 1 {
  559. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  560. }
  561. } else if nFrames == 1 {
  562. sess.prevAudioL = float64(sess.lastAudioL)
  563. sess.lastAudioL = audio[0]
  564. if stride > 1 && len(audio) >= 2 {
  565. sess.lastAudioR = audio[1]
  566. }
  567. }
  568. sess.lastAudioSet = true
  569. }
  570. sess.lastAudioTs = time.Now()
  571. }
  572. st.fanoutPCM(sess, pcm, pcmLen)
  573. }
  574. // Segment split (recording sessions only)
  575. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  576. segIdx := sess.segmentIdx + 1
  577. oldSubs := sess.audioSubs
  578. oldState := sess.captureDSPState()
  579. sess.audioSubs = nil
  580. closeSession(sess, &st.policy)
  581. s, err := st.openRecordingSession(sig, now)
  582. if err != nil {
  583. delete(st.sessions, sig.ID)
  584. continue
  585. }
  586. s.segmentIdx = segIdx
  587. s.audioSubs = oldSubs
  588. s.restoreDSPState(oldState)
  589. st.sessions[sig.ID] = s
  590. if st.telemetry != nil {
  591. st.telemetry.IncCounter("streamer.session.reopen", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID)))
  592. st.telemetry.Event("session_reopen", "info", "stream session rotated by max duration", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{
  593. "old_session": sess.sessionID,
  594. "new_session": s.sessionID,
  595. })
  596. }
  597. }
  598. }
  599. // Close sessions for disappeared signals (with grace period)
  600. for id, sess := range st.sessions {
  601. if seen[id] {
  602. continue
  603. }
  604. gracePeriod := 3 * time.Second
  605. if sess.listenOnly {
  606. gracePeriod = 5 * time.Second
  607. }
  608. if now.Sub(sess.lastFeed) > gracePeriod {
  609. for _, sub := range sess.audioSubs {
  610. close(sub.ch)
  611. }
  612. sess.audioSubs = nil
  613. if !sess.listenOnly {
  614. closeSession(sess, &st.policy)
  615. }
  616. if st.telemetry != nil {
  617. st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID))
  618. st.telemetry.Event("session_close", "info", "stream session closed", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID), map[string]any{
  619. "reason": "signal_missing",
  620. "listen_only": sess.listenOnly,
  621. })
  622. }
  623. delete(st.sessions, id)
  624. }
  625. }
  626. }
  627. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  628. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  629. if st.policy.DebugLiveAudio {
  630. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  631. }
  632. return true
  633. }
  634. for subID, pl := range st.pendingListens {
  635. delta := math.Abs(sig.CenterHz - pl.freq)
  636. if delta < 200000 {
  637. if st.policy.DebugLiveAudio {
  638. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  639. }
  640. return true
  641. }
  642. }
  643. return false
  644. }
  645. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  646. for subID, pl := range st.pendingListens {
  647. requestedMode := normalizeRequestedMode(pl.mode)
  648. if requestedMode != "" && sess.demodName != requestedMode {
  649. continue
  650. }
  651. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  652. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  653. delete(st.pendingListens, subID)
  654. // Send updated audio_info now that we know the real session params.
  655. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  656. infoJSON, _ := json.Marshal(sess.audioInfo())
  657. tagged := make([]byte, 1+len(infoJSON))
  658. tagged[0] = 0x00 // tag: audio_info
  659. copy(tagged[1:], infoJSON)
  660. select {
  661. case pl.ch <- tagged:
  662. default:
  663. }
  664. if audioDumpEnabled {
  665. now := time.Now()
  666. sess.debugDumpStart = now.Add(debugDumpDelay)
  667. sess.debugDumpUntil = sess.debugDumpStart.Add(debugDumpDuration)
  668. sess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", sess.signalID, now.Format("20060102-150405")))
  669. sess.demodDump = nil
  670. sess.finalDump = nil
  671. }
  672. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  673. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  674. if audioDumpEnabled {
  675. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", sess.signalID, sess.debugDumpStart.Format(time.RFC3339), sess.debugDumpUntil.Format(time.RFC3339))
  676. }
  677. }
  678. }
  679. }
  680. // CloseAll finalises all sessions and stops the worker goroutine.
  681. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  682. st.mu.Lock()
  683. defer st.mu.Unlock()
  684. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  685. for _, sess := range st.sessions {
  686. out[sess.signalID] = RuntimeSignalInfo{
  687. DemodName: sess.demodName,
  688. PlaybackMode: sess.playbackMode,
  689. StereoState: sess.stereoState,
  690. Channels: sess.channels,
  691. SampleRate: sess.sampleRate,
  692. }
  693. }
  694. return out
  695. }
  696. func (st *Streamer) CloseAll() {
  697. close(st.feedCh)
  698. <-st.done
  699. st.mu.Lock()
  700. defer st.mu.Unlock()
  701. for id, sess := range st.sessions {
  702. for _, sub := range sess.audioSubs {
  703. close(sub.ch)
  704. }
  705. sess.audioSubs = nil
  706. if !sess.listenOnly {
  707. closeSession(sess, &st.policy)
  708. }
  709. if st.telemetry != nil {
  710. st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID))
  711. }
  712. delete(st.sessions, id)
  713. }
  714. for _, pl := range st.pendingListens {
  715. close(pl.ch)
  716. }
  717. st.pendingListens = nil
  718. if st.telemetry != nil {
  719. st.telemetry.Event("streamer_close_all", "info", "all stream sessions closed", nil, nil)
  720. }
  721. }
  722. // ActiveSessions returns the number of open streaming sessions.
  723. func (st *Streamer) ActiveSessions() int {
  724. st.mu.Lock()
  725. defer st.mu.Unlock()
  726. return len(st.sessions)
  727. }
  728. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  729. //
  730. // LL-2: Returns AudioInfo with correct channels and sample rate.
  731. // LL-3: Returns error only on hard failures (nil streamer etc).
  732. //
  733. // If a matching session exists, attaches immediately. Otherwise, the
  734. // subscriber is held as "pending" and will be attached when a matching
  735. // signal appears in the next DSP frame.
  736. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  737. ch := make(chan []byte, 64)
  738. st.mu.Lock()
  739. defer st.mu.Unlock()
  740. st.nextSub++
  741. subID := st.nextSub
  742. requestedMode := normalizeRequestedMode(mode)
  743. // Try to find a matching session
  744. var bestSess *streamSession
  745. bestDist := math.MaxFloat64
  746. for _, sess := range st.sessions {
  747. if requestedMode != "" && sess.demodName != requestedMode {
  748. continue
  749. }
  750. d := math.Abs(sess.centerHz - freq)
  751. if d < bestDist {
  752. bestDist = d
  753. bestSess = sess
  754. }
  755. }
  756. if bestSess != nil && bestDist < 200000 {
  757. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  758. if audioDumpEnabled {
  759. now := time.Now()
  760. bestSess.debugDumpStart = now.Add(debugDumpDelay)
  761. bestSess.debugDumpUntil = bestSess.debugDumpStart.Add(debugDumpDuration)
  762. bestSess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", bestSess.signalID, now.Format("20060102-150405")))
  763. bestSess.demodDump = nil
  764. bestSess.finalDump = nil
  765. }
  766. info := bestSess.audioInfo()
  767. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  768. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  769. if audioDumpEnabled {
  770. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", bestSess.signalID, bestSess.debugDumpStart.Format(time.RFC3339), bestSess.debugDumpUntil.Format(time.RFC3339))
  771. }
  772. if st.telemetry != nil {
  773. st.telemetry.IncCounter("streamer.listener.attach", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", bestSess.signalID), "session_id", bestSess.sessionID))
  774. }
  775. return subID, ch, info, nil
  776. }
  777. // No matching session yet — add as pending listener
  778. st.pendingListens[subID] = &pendingListen{
  779. freq: freq,
  780. bw: bw,
  781. mode: mode,
  782. ch: ch,
  783. }
  784. info := defaultAudioInfoForMode(mode)
  785. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  786. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  787. if st.telemetry != nil {
  788. st.telemetry.IncCounter("streamer.listener.pending", 1, nil)
  789. st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil)
  790. }
  791. return subID, ch, info, nil
  792. }
  793. // UnsubscribeAudio removes a live-listen subscriber.
  794. func (st *Streamer) UnsubscribeAudio(subID int64) {
  795. st.mu.Lock()
  796. defer st.mu.Unlock()
  797. if pl, ok := st.pendingListens[subID]; ok {
  798. close(pl.ch)
  799. delete(st.pendingListens, subID)
  800. if st.telemetry != nil {
  801. st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "pending"))
  802. st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil)
  803. }
  804. return
  805. }
  806. for _, sess := range st.sessions {
  807. for i, sub := range sess.audioSubs {
  808. if sub.id == subID {
  809. close(sub.ch)
  810. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  811. if st.telemetry != nil {
  812. st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "active", "session_id", sess.sessionID))
  813. }
  814. return
  815. }
  816. }
  817. }
  818. }
  819. // ---------------------------------------------------------------------------
  820. // Session: stateful extraction + demod
  821. // ---------------------------------------------------------------------------
  822. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  823. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  824. // output with zero transient artifacts.
  825. type iqHeadProbeStats struct {
  826. meanMag float64
  827. minMag float64
  828. maxStep float64
  829. p95Step float64
  830. lowMag int
  831. }
  832. func probeIQHeadStats(iq []complex64, probeLen int) iqHeadProbeStats {
  833. if probeLen <= 0 || len(iq) == 0 {
  834. return iqHeadProbeStats{}
  835. }
  836. if len(iq) < probeLen {
  837. probeLen = len(iq)
  838. }
  839. stats := iqHeadProbeStats{minMag: math.MaxFloat64}
  840. steps := make([]float64, 0, probeLen)
  841. var sum float64
  842. for i := 0; i < probeLen; i++ {
  843. v := iq[i]
  844. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  845. sum += mag
  846. if mag < stats.minMag {
  847. stats.minMag = mag
  848. }
  849. if mag < 0.02 {
  850. stats.lowMag++
  851. }
  852. if i > 0 {
  853. p := iq[i-1]
  854. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  855. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  856. step := math.Abs(math.Atan2(num, den))
  857. steps = append(steps, step)
  858. if step > stats.maxStep {
  859. stats.maxStep = step
  860. }
  861. }
  862. }
  863. stats.meanMag = sum / float64(probeLen)
  864. if len(steps) > 0 {
  865. sorted := append([]float64(nil), steps...)
  866. sort.Float64s(sorted)
  867. idx := int(math.Round(0.95 * float64(len(sorted)-1)))
  868. if idx < 0 {
  869. idx = 0
  870. }
  871. if idx >= len(sorted) {
  872. idx = len(sorted) - 1
  873. }
  874. stats.p95Step = sorted[idx]
  875. }
  876. if stats.minMag == math.MaxFloat64 {
  877. stats.minMag = 0
  878. }
  879. return stats
  880. }
  881. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int, coll *telemetry.Collector) ([]float32, int) {
  882. if len(snippet) == 0 || snipRate <= 0 {
  883. return nil, 0
  884. }
  885. baseTags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)
  886. if coll != nil {
  887. coll.SetGauge("iq.stage.snippet.length", float64(len(snippet)), baseTags)
  888. stats := probeIQHeadStats(snippet, 64)
  889. coll.Observe("iq.snippet.head_mean_mag", stats.meanMag, baseTags)
  890. coll.Observe("iq.snippet.head_min_mag", stats.minMag, baseTags)
  891. coll.Observe("iq.snippet.head_max_step", stats.maxStep, baseTags)
  892. coll.Observe("iq.snippet.head_p95_step", stats.p95Step, baseTags)
  893. coll.SetGauge("iq.snippet.head_low_magnitude_count", float64(stats.lowMag), baseTags)
  894. if sess.lastExtractIQSet {
  895. prevMag := math.Hypot(float64(real(sess.lastExtractIQ)), float64(imag(sess.lastExtractIQ)))
  896. currMag := math.Hypot(float64(real(snippet[0])), float64(imag(snippet[0])))
  897. deltaMag := math.Abs(currMag - prevMag)
  898. num := float64(real(sess.lastExtractIQ))*float64(imag(snippet[0])) - float64(imag(sess.lastExtractIQ))*float64(real(snippet[0]))
  899. den := float64(real(sess.lastExtractIQ))*float64(real(snippet[0])) + float64(imag(sess.lastExtractIQ))*float64(imag(snippet[0]))
  900. deltaPhase := math.Abs(math.Atan2(num, den))
  901. d2 := float64(real(snippet[0]-sess.lastExtractIQ))*float64(real(snippet[0]-sess.lastExtractIQ)) + float64(imag(snippet[0]-sess.lastExtractIQ))*float64(imag(snippet[0]-sess.lastExtractIQ))
  902. coll.Observe("iq.extract.output.boundary.delta_mag", deltaMag, baseTags)
  903. coll.Observe("iq.extract.output.boundary.delta_phase", deltaPhase, baseTags)
  904. coll.Observe("iq.extract.output.boundary.d2", d2, baseTags)
  905. coll.Observe("iq.extract.output.boundary.discontinuity_score", deltaMag+deltaPhase, baseTags)
  906. }
  907. }
  908. if len(snippet) > 0 {
  909. sess.prevExtractIQ = sess.lastExtractIQ
  910. sess.lastExtractIQ = snippet[len(snippet)-1]
  911. sess.lastExtractIQSet = true
  912. }
  913. isWFMStereo := sess.demodName == "WFM_STEREO"
  914. isWFM := sess.demodName == "WFM" || isWFMStereo
  915. demodName := sess.demodName
  916. if isWFMStereo {
  917. demodName = "WFM"
  918. }
  919. d := demod.Get(demodName)
  920. if d == nil {
  921. d = demod.Get("NFM")
  922. }
  923. if d == nil {
  924. return nil, 0
  925. }
  926. // The extra 1-sample discriminator overlap prepend was removed after it was
  927. // shown to shift the downstream decimation phase and create heavy click
  928. // artifacts in steady-state streaming/recording. The upstream extraction path
  929. // and the stateful FIR/decimation stages already provide continuity.
  930. fullSnip := snippet
  931. overlapApplied := false
  932. prevTailValid := false
  933. if logging.EnabledCategory("prefir") && len(fullSnip) > 0 {
  934. probeN := 64
  935. if len(fullSnip) < probeN {
  936. probeN = len(fullSnip)
  937. }
  938. minPreMag := math.MaxFloat64
  939. minPreIdx := 0
  940. maxPreStep := 0.0
  941. maxPreStepIdx := 0
  942. for i := 0; i < probeN; i++ {
  943. v := fullSnip[i]
  944. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  945. if mag < minPreMag {
  946. minPreMag = mag
  947. minPreIdx = i
  948. }
  949. if i > 0 {
  950. p := fullSnip[i-1]
  951. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  952. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  953. step := math.Abs(math.Atan2(num, den))
  954. if step > maxPreStep {
  955. maxPreStep = step
  956. maxPreStepIdx = i - 1
  957. }
  958. }
  959. }
  960. logging.Debug("prefir", "pre_fir_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "snip_len", len(fullSnip))
  961. if minPreMag < 0.18 {
  962. logging.Warn("prefir", "pre_fir_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx)
  963. }
  964. if maxPreStep > 1.5 {
  965. logging.Warn("prefir", "pre_fir_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "min_mag", minPreMag, "min_idx", minPreIdx)
  966. }
  967. }
  968. // --- Stateful anti-alias FIR + decimation to demod rate ---
  969. demodRate := d.OutputSampleRate()
  970. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  971. if decim1 < 1 {
  972. decim1 = 1
  973. }
  974. // WFM override: force decim1=2 (256kHz) instead of round(512k/192k)=3 (170kHz).
  975. // At decim1=3, Nyquist is 85kHz which clips FM broadcast ±75kHz deviation.
  976. // At decim1=2, Nyquist is 128kHz → full FM deviation + stereo pilot + guard band.
  977. // Bonus: 256000→48000 resampler ratio is L=3/M=16 (96 taps, 1kB) instead of
  978. // the pathological L=24000/M=85333 (768k taps, 6MB) from 170666→48000.
  979. if isWFM && decim1 > 2 && snipRate/2 >= 200000 {
  980. decim1 = 2
  981. }
  982. actualDemodRate := snipRate / decim1
  983. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  984. var dec []complex64
  985. if decim1 > 1 {
  986. // FIR cutoff: for WFM, use 90kHz (above ±75kHz FM deviation + guard).
  987. // For NFM/other: use standard Nyquist*0.8 cutoff.
  988. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  989. if isWFM {
  990. cutoff = 90000
  991. }
  992. // Lazy-init or reinit stateful FIR if parameters changed
  993. if sess.preDemodDecimator == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff || sess.preDemodDecim != decim1 {
  994. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  995. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  996. sess.preDemodDecimator = dsp.NewStatefulDecimatingFIRComplex(taps, decim1)
  997. sess.preDemodRate = snipRate
  998. sess.preDemodCutoff = cutoff
  999. sess.preDemodDecim = decim1
  1000. sess.preDemodDecimPhase = 0
  1001. if coll != nil {
  1002. coll.IncCounter("dsp.pre_demod.init", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1003. coll.Event("prefir_reinit", "info", "pre-demod decimator reinitialized", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1004. "snip_rate": snipRate,
  1005. "cutoff_hz": cutoff,
  1006. "decim": decim1,
  1007. })
  1008. }
  1009. }
  1010. decimPhaseBefore := sess.preDemodDecimPhase
  1011. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  1012. dec = sess.preDemodDecimator.Process(fullSnip)
  1013. sess.preDemodDecimPhase = sess.preDemodDecimator.Phase()
  1014. if coll != nil {
  1015. coll.Observe("dsp.pre_demod.decimation_factor", float64(decim1), baseTags)
  1016. coll.SetGauge("iq.stage.pre_demod.length", float64(len(dec)), baseTags)
  1017. decStats := probeIQHeadStats(dec, 64)
  1018. coll.Observe("iq.pre_demod.head_mean_mag", decStats.meanMag, baseTags)
  1019. coll.Observe("iq.pre_demod.head_min_mag", decStats.minMag, baseTags)
  1020. coll.Observe("iq.pre_demod.head_max_step", decStats.maxStep, baseTags)
  1021. coll.Observe("iq.pre_demod.head_p95_step", decStats.p95Step, baseTags)
  1022. coll.SetGauge("iq.pre_demod.head_low_magnitude_count", float64(decStats.lowMag), baseTags)
  1023. }
  1024. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase)
  1025. } else {
  1026. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0)
  1027. dec = fullSnip
  1028. }
  1029. if decHeadTrimSamples > 0 && decHeadTrimSamples < len(dec) {
  1030. logging.Warn("boundary", "dec_head_trim_applied", "signal", sess.signalID, "trim", decHeadTrimSamples, "before_len", len(dec))
  1031. dec = dec[decHeadTrimSamples:]
  1032. if coll != nil {
  1033. coll.IncCounter("dsp.pre_demod.head_trim", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1034. }
  1035. }
  1036. if logging.EnabledCategory("boundary") && len(dec) > 0 {
  1037. first := dec[0]
  1038. if sess.lastDecIQSet {
  1039. d2Re := math.Abs(2*float64(real(sess.lastDecIQ)) - float64(real(sess.prevDecIQ)) - float64(real(first)))
  1040. d2Im := math.Abs(2*float64(imag(sess.lastDecIQ)) - float64(imag(sess.prevDecIQ)) - float64(imag(first)))
  1041. d2Mag := math.Hypot(d2Re, d2Im)
  1042. if d2Mag > 0.15 {
  1043. logging.Warn("boundary", "dec_iq_boundary", "signal", sess.signalID, "d2", d2Mag)
  1044. if coll != nil {
  1045. coll.IncCounter("iq.dec.boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1046. coll.Observe("iq.dec.boundary.d2", d2Mag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1047. }
  1048. }
  1049. }
  1050. headN := 16
  1051. if len(dec) < headN {
  1052. headN = len(dec)
  1053. }
  1054. tailN := 16
  1055. if len(dec) < tailN {
  1056. tailN = len(dec)
  1057. }
  1058. var headSum, tailSum, minMag, maxMag float64
  1059. minMag = math.MaxFloat64
  1060. for i, v := range dec {
  1061. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  1062. if mag < minMag {
  1063. minMag = mag
  1064. }
  1065. if mag > maxMag {
  1066. maxMag = mag
  1067. }
  1068. if i < headN {
  1069. headSum += mag
  1070. }
  1071. }
  1072. for i := len(dec) - tailN; i < len(dec); i++ {
  1073. if i >= 0 {
  1074. v := dec[i]
  1075. tailSum += math.Hypot(float64(real(v)), float64(imag(v)))
  1076. }
  1077. }
  1078. headAvg := 0.0
  1079. if headN > 0 {
  1080. headAvg = headSum / float64(headN)
  1081. }
  1082. tailAvg := 0.0
  1083. if tailN > 0 {
  1084. tailAvg = tailSum / float64(tailN)
  1085. }
  1086. logging.Debug("boundary", "dec_iq_meter", "signal", sess.signalID, "len", len(dec), "head_avg", headAvg, "tail_avg", tailAvg, "min_mag", minMag, "max_mag", maxMag)
  1087. if tailAvg > 0 {
  1088. ratio := headAvg / tailAvg
  1089. if ratio < 0.75 || ratio > 1.25 {
  1090. logging.Warn("boundary", "dec_iq_head_tail_skew", "signal", sess.signalID, "head_avg", headAvg, "tail_avg", tailAvg, "ratio", ratio)
  1091. }
  1092. if coll != nil {
  1093. coll.Observe("iq.dec.head_tail_ratio", ratio, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1094. }
  1095. }
  1096. probeN := 64
  1097. if len(dec) < probeN {
  1098. probeN = len(dec)
  1099. }
  1100. minHeadMag := math.MaxFloat64
  1101. minHeadIdx := 0
  1102. maxHeadStep := 0.0
  1103. maxHeadStepIdx := 0
  1104. for i := 0; i < probeN; i++ {
  1105. v := dec[i]
  1106. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  1107. if mag < minHeadMag {
  1108. minHeadMag = mag
  1109. minHeadIdx = i
  1110. }
  1111. if i > 0 {
  1112. p := dec[i-1]
  1113. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  1114. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  1115. step := math.Abs(math.Atan2(num, den))
  1116. if step > maxHeadStep {
  1117. maxHeadStep = step
  1118. maxHeadStepIdx = i - 1
  1119. }
  1120. }
  1121. }
  1122. logging.Debug("boundary", "dec_iq_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  1123. if minHeadMag < 0.18 {
  1124. logging.Warn("boundary", "dec_iq_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  1125. }
  1126. if maxHeadStep > 1.5 {
  1127. logging.Warn("boundary", "dec_iq_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx, "min_mag", minHeadMag, "min_idx", minHeadIdx)
  1128. }
  1129. if coll != nil {
  1130. coll.Observe("iq.dec.magnitude.min", minMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1131. coll.Observe("iq.dec.magnitude.max", maxMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1132. coll.Observe("iq.dec.phase_step.max", maxHeadStep, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1133. }
  1134. if len(dec) >= 2 {
  1135. sess.prevDecIQ = dec[len(dec)-2]
  1136. sess.lastDecIQ = dec[len(dec)-1]
  1137. } else {
  1138. sess.prevDecIQ = sess.lastDecIQ
  1139. sess.lastDecIQ = dec[0]
  1140. }
  1141. sess.lastDecIQSet = true
  1142. }
  1143. // --- FM/AM/etc Demod ---
  1144. audio := d.Demod(dec, actualDemodRate)
  1145. if len(audio) == 0 {
  1146. return nil, 0
  1147. }
  1148. if coll != nil {
  1149. coll.SetGauge("audio.stage.demod.length", float64(len(audio)), baseTags)
  1150. probe := 64
  1151. if len(audio) < probe {
  1152. probe = len(audio)
  1153. }
  1154. if probe > 0 {
  1155. var headAbs, tailAbs float64
  1156. for i := 0; i < probe; i++ {
  1157. headAbs += math.Abs(float64(audio[i]))
  1158. tailAbs += math.Abs(float64(audio[len(audio)-probe+i]))
  1159. }
  1160. coll.Observe("audio.demod.head_mean_abs", headAbs/float64(probe), baseTags)
  1161. coll.Observe("audio.demod.tail_mean_abs", tailAbs/float64(probe), baseTags)
  1162. coll.Observe("audio.demod.edge_delta_abs", math.Abs(float64(audio[0])-float64(audio[len(audio)-1])), baseTags)
  1163. }
  1164. }
  1165. if logging.EnabledCategory("boundary") {
  1166. stride := d.Channels()
  1167. if stride < 1 {
  1168. stride = 1
  1169. }
  1170. nFrames := len(audio) / stride
  1171. if nFrames > 0 {
  1172. first := float64(audio[0])
  1173. if sess.lastDemodSet {
  1174. d2 := math.Abs(2*float64(sess.lastDemodL) - sess.prevDemodL - first)
  1175. if d2 > 0.15 {
  1176. logging.Warn("boundary", "demod_boundary", "signal", sess.signalID, "d2", d2)
  1177. if coll != nil {
  1178. coll.IncCounter("audio.demod_boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1179. coll.Observe("audio.demod_boundary.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1180. }
  1181. }
  1182. }
  1183. if nFrames >= 2 {
  1184. sess.prevDemodL = float64(audio[(nFrames-2)*stride])
  1185. sess.lastDemodL = audio[(nFrames-1)*stride]
  1186. } else {
  1187. sess.prevDemodL = float64(sess.lastDemodL)
  1188. sess.lastDemodL = audio[0]
  1189. }
  1190. sess.lastDemodSet = true
  1191. }
  1192. }
  1193. logging.Debug("boundary", "audio_path", "signal", sess.signalID, "demod", demodName, "actual_rate", actualDemodRate, "audio_len", len(audio), "channels", d.Channels(), "overlap_applied", overlapApplied, "prev_tail_valid", prevTailValid)
  1194. shouldDump := !sess.debugDumpStart.IsZero() && !sess.debugDumpUntil.IsZero()
  1195. if shouldDump {
  1196. now := time.Now()
  1197. shouldDump = !now.Before(sess.debugDumpStart) && now.Before(sess.debugDumpUntil)
  1198. }
  1199. if shouldDump {
  1200. sess.demodDump = append(sess.demodDump, audio...)
  1201. }
  1202. // --- Stateful stereo decode with conservative lock/hysteresis ---
  1203. channels := 1
  1204. if isWFMStereo {
  1205. sess.playbackMode = "WFM_STEREO"
  1206. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  1207. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  1208. if locked {
  1209. sess.stereoOnCount++
  1210. sess.stereoOffCount = 0
  1211. if sess.stereoOnCount >= 4 {
  1212. sess.stereoEnabled = true
  1213. }
  1214. } else {
  1215. sess.stereoOnCount = 0
  1216. sess.stereoOffCount++
  1217. if sess.stereoOffCount >= 10 {
  1218. sess.stereoEnabled = false
  1219. }
  1220. }
  1221. prevPlayback := sess.playbackMode
  1222. prevStereo := sess.stereoState
  1223. if sess.stereoEnabled && len(stereoAudio) > 0 {
  1224. sess.stereoState = "locked"
  1225. audio = stereoAudio
  1226. } else {
  1227. sess.stereoState = "mono-fallback"
  1228. dual := make([]float32, len(audio)*2)
  1229. for i, s := range audio {
  1230. dual[i*2] = s
  1231. dual[i*2+1] = s
  1232. }
  1233. audio = dual
  1234. }
  1235. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  1236. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  1237. }
  1238. }
  1239. // --- Polyphase resample to exact 48kHz ---
  1240. if actualDemodRate != streamAudioRate {
  1241. if channels > 1 {
  1242. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  1243. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  1244. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1245. sess.stereoResamplerRate = actualDemodRate
  1246. if coll != nil {
  1247. coll.Event("resampler_reset", "info", "stereo resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1248. "mode": "stereo",
  1249. "rate": actualDemodRate,
  1250. })
  1251. }
  1252. }
  1253. audio = sess.stereoResampler.Process(audio)
  1254. } else {
  1255. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  1256. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  1257. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1258. sess.monoResamplerRate = actualDemodRate
  1259. if coll != nil {
  1260. coll.Event("resampler_reset", "info", "mono resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1261. "mode": "mono",
  1262. "rate": actualDemodRate,
  1263. })
  1264. }
  1265. }
  1266. audio = sess.monoResampler.Process(audio)
  1267. }
  1268. }
  1269. if coll != nil {
  1270. coll.SetGauge("audio.stage.output.length", float64(len(audio)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1271. }
  1272. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  1273. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  1274. tau := sess.deemphasisUs * 1e-6
  1275. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  1276. if channels > 1 {
  1277. nFrames := len(audio) / channels
  1278. yL, yR := sess.deemphL, sess.deemphR
  1279. for i := 0; i < nFrames; i++ {
  1280. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  1281. audio[i*2] = float32(yL)
  1282. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  1283. audio[i*2+1] = float32(yR)
  1284. }
  1285. sess.deemphL, sess.deemphR = yL, yR
  1286. } else {
  1287. y := sess.deemphL
  1288. for i := range audio {
  1289. y = alpha*y + (1-alpha)*float64(audio[i])
  1290. audio[i] = float32(y)
  1291. }
  1292. sess.deemphL = y
  1293. }
  1294. }
  1295. if isWFM {
  1296. for i := range audio {
  1297. audio[i] *= 0.35
  1298. }
  1299. }
  1300. if shouldDump {
  1301. sess.finalDump = append(sess.finalDump, audio...)
  1302. } else if !sess.debugDumpUntil.IsZero() && time.Now().After(sess.debugDumpUntil) && sess.debugDumpBase != "" {
  1303. _ = os.MkdirAll(filepath.Dir(sess.debugDumpBase), 0o755)
  1304. if len(sess.demodDump) > 0 {
  1305. _ = writeWAVFile(sess.debugDumpBase+"-demod.wav", sess.demodDump, actualDemodRate, d.Channels())
  1306. }
  1307. if len(sess.finalDump) > 0 {
  1308. _ = writeWAVFile(sess.debugDumpBase+"-final.wav", sess.finalDump, streamAudioRate, channels)
  1309. }
  1310. logging.Warn("boundary", "debug_audio_dump_window", "signal", sess.signalID, "base", sess.debugDumpBase)
  1311. sess.debugDumpBase = ""
  1312. sess.demodDump = nil
  1313. sess.finalDump = nil
  1314. sess.debugDumpStart = time.Time{}
  1315. sess.debugDumpUntil = time.Time{}
  1316. }
  1317. return audio, streamAudioRate
  1318. }
  1319. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  1320. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  1321. // loopBW is in Hz, sampleRate in samples/sec.
  1322. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  1323. if sampleRate <= 0 || loopBW <= 0 {
  1324. return 0, 0
  1325. }
  1326. bl := loopBW / float64(sampleRate)
  1327. theta := bl / (damping + 0.25/damping)
  1328. d := 1 + 2*damping*theta + theta*theta
  1329. alpha := (4 * damping * theta) / d
  1330. beta := (4 * theta * theta) / d
  1331. return alpha, beta
  1332. }
  1333. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  1334. // Uses persistent FIR filter state across frames for click-free stereo.
  1335. // Reuses session scratch buffers to minimize allocations.
  1336. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  1337. if len(mono) == 0 || sampleRate <= 0 {
  1338. return nil, false
  1339. }
  1340. n := len(mono)
  1341. // Rebuild rate-dependent stereo filters when sampleRate changes
  1342. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  1343. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  1344. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  1345. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  1346. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  1347. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  1348. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  1349. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  1350. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  1351. sess.stereoFilterRate = sampleRate
  1352. // Initialize PLL for 19kHz pilot tracking.
  1353. sess.pilotPhase = 0
  1354. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  1355. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  1356. sess.pilotErrAvg = 0
  1357. sess.pilotI = 0
  1358. sess.pilotQ = 0
  1359. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  1360. }
  1361. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  1362. scratch := sess.growAudio(n * 5)
  1363. lpr := scratch[:n]
  1364. bpfLR := scratch[n : 2*n]
  1365. lr := scratch[2*n : 3*n]
  1366. work1 := scratch[3*n : 4*n]
  1367. work2 := scratch[4*n : 5*n]
  1368. sess.stereoLPF.ProcessInto(mono, lpr)
  1369. // 23-53kHz bandpass for L-R DSB-SC.
  1370. sess.stereoBPHi.ProcessInto(mono, work1)
  1371. sess.stereoBPLo.ProcessInto(mono, work2)
  1372. for i := 0; i < n; i++ {
  1373. bpfLR[i] = work1[i] - work2[i]
  1374. }
  1375. // 19kHz pilot bandpass for PLL.
  1376. sess.pilotLPFHi.ProcessInto(mono, work1)
  1377. sess.pilotLPFLo.ProcessInto(mono, work2)
  1378. for i := 0; i < n; i++ {
  1379. work1[i] = work1[i] - work2[i]
  1380. }
  1381. pilot := work1
  1382. phase := sess.pilotPhase
  1383. freq := sess.pilotFreq
  1384. alpha := sess.pilotAlpha
  1385. beta := sess.pilotBeta
  1386. iState := sess.pilotI
  1387. qState := sess.pilotQ
  1388. lpAlpha := sess.pilotLPAlpha
  1389. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  1390. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  1391. var pilotPower float64
  1392. var totalPower float64
  1393. var errSum float64
  1394. for i := 0; i < n; i++ {
  1395. p := float64(pilot[i])
  1396. sinP, cosP := math.Sincos(phase)
  1397. iMix := p * cosP
  1398. qMix := p * -sinP
  1399. iState += lpAlpha * (iMix - iState)
  1400. qState += lpAlpha * (qMix - qState)
  1401. err := math.Atan2(qState, iState)
  1402. freq += beta * err
  1403. if freq < minFreq {
  1404. freq = minFreq
  1405. } else if freq > maxFreq {
  1406. freq = maxFreq
  1407. }
  1408. phase += freq + alpha*err
  1409. if phase > 2*math.Pi {
  1410. phase -= 2 * math.Pi
  1411. } else if phase < 0 {
  1412. phase += 2 * math.Pi
  1413. }
  1414. totalPower += float64(mono[i]) * float64(mono[i])
  1415. pilotPower += p * p
  1416. errSum += math.Abs(err)
  1417. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  1418. }
  1419. sess.pilotPhase = phase
  1420. sess.pilotFreq = freq
  1421. sess.pilotI = iState
  1422. sess.pilotQ = qState
  1423. blockErr := errSum / float64(n)
  1424. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  1425. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  1426. pilotRatio := 0.0
  1427. if totalPower > 0 {
  1428. pilotRatio = pilotPower / totalPower
  1429. }
  1430. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  1431. // Lock heuristics: pilot power fraction and PLL phase error stability.
  1432. // Pilot power is a small but stable fraction of composite energy; require
  1433. // a modest floor plus PLL settling to avoid flapping in noise.
  1434. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  1435. out := make([]float32, n*2)
  1436. for i := 0; i < n; i++ {
  1437. out[i*2] = 0.5 * (lpr[i] + lr[i])
  1438. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  1439. }
  1440. return out, locked
  1441. }
  1442. // dspStateSnapshot captures persistent DSP state for segment splits.
  1443. type dspStateSnapshot struct {
  1444. overlapIQ []complex64
  1445. deemphL float64
  1446. deemphR float64
  1447. pilotPhase float64
  1448. pilotFreq float64
  1449. pilotAlpha float64
  1450. pilotBeta float64
  1451. pilotErrAvg float64
  1452. pilotI float64
  1453. pilotQ float64
  1454. pilotLPAlpha float64
  1455. monoResampler *dsp.Resampler
  1456. monoResamplerRate int
  1457. stereoResampler *dsp.StereoResampler
  1458. stereoResamplerRate int
  1459. stereoLPF *dsp.StatefulFIRReal
  1460. stereoFilterRate int
  1461. stereoBPHi *dsp.StatefulFIRReal
  1462. stereoBPLo *dsp.StatefulFIRReal
  1463. stereoLRLPF *dsp.StatefulFIRReal
  1464. stereoAALPF *dsp.StatefulFIRReal
  1465. pilotLPFHi *dsp.StatefulFIRReal
  1466. pilotLPFLo *dsp.StatefulFIRReal
  1467. preDemodFIR *dsp.StatefulFIRComplex
  1468. preDemodDecimator *dsp.StatefulDecimatingFIRComplex
  1469. preDemodDecim int
  1470. preDemodRate int
  1471. preDemodCutoff float64
  1472. preDemodDecimPhase int
  1473. }
  1474. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  1475. return dspStateSnapshot{
  1476. overlapIQ: sess.overlapIQ,
  1477. deemphL: sess.deemphL,
  1478. deemphR: sess.deemphR,
  1479. pilotPhase: sess.pilotPhase,
  1480. pilotFreq: sess.pilotFreq,
  1481. pilotAlpha: sess.pilotAlpha,
  1482. pilotBeta: sess.pilotBeta,
  1483. pilotErrAvg: sess.pilotErrAvg,
  1484. pilotI: sess.pilotI,
  1485. pilotQ: sess.pilotQ,
  1486. pilotLPAlpha: sess.pilotLPAlpha,
  1487. monoResampler: sess.monoResampler,
  1488. monoResamplerRate: sess.monoResamplerRate,
  1489. stereoResampler: sess.stereoResampler,
  1490. stereoResamplerRate: sess.stereoResamplerRate,
  1491. stereoLPF: sess.stereoLPF,
  1492. stereoFilterRate: sess.stereoFilterRate,
  1493. stereoBPHi: sess.stereoBPHi,
  1494. stereoBPLo: sess.stereoBPLo,
  1495. stereoLRLPF: sess.stereoLRLPF,
  1496. stereoAALPF: sess.stereoAALPF,
  1497. pilotLPFHi: sess.pilotLPFHi,
  1498. pilotLPFLo: sess.pilotLPFLo,
  1499. preDemodFIR: sess.preDemodFIR,
  1500. preDemodDecimator: sess.preDemodDecimator,
  1501. preDemodDecim: sess.preDemodDecim,
  1502. preDemodRate: sess.preDemodRate,
  1503. preDemodCutoff: sess.preDemodCutoff,
  1504. preDemodDecimPhase: sess.preDemodDecimPhase,
  1505. }
  1506. }
  1507. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  1508. sess.overlapIQ = s.overlapIQ
  1509. sess.deemphL = s.deemphL
  1510. sess.deemphR = s.deemphR
  1511. sess.pilotPhase = s.pilotPhase
  1512. sess.pilotFreq = s.pilotFreq
  1513. sess.pilotAlpha = s.pilotAlpha
  1514. sess.pilotBeta = s.pilotBeta
  1515. sess.pilotErrAvg = s.pilotErrAvg
  1516. sess.pilotI = s.pilotI
  1517. sess.pilotQ = s.pilotQ
  1518. sess.pilotLPAlpha = s.pilotLPAlpha
  1519. sess.monoResampler = s.monoResampler
  1520. sess.monoResamplerRate = s.monoResamplerRate
  1521. sess.stereoResampler = s.stereoResampler
  1522. sess.stereoResamplerRate = s.stereoResamplerRate
  1523. sess.stereoLPF = s.stereoLPF
  1524. sess.stereoFilterRate = s.stereoFilterRate
  1525. sess.stereoBPHi = s.stereoBPHi
  1526. sess.stereoBPLo = s.stereoBPLo
  1527. sess.stereoLRLPF = s.stereoLRLPF
  1528. sess.stereoAALPF = s.stereoAALPF
  1529. sess.pilotLPFHi = s.pilotLPFHi
  1530. sess.pilotLPFLo = s.pilotLPFLo
  1531. sess.preDemodFIR = s.preDemodFIR
  1532. sess.preDemodDecimator = s.preDemodDecimator
  1533. sess.preDemodDecim = s.preDemodDecim
  1534. sess.preDemodRate = s.preDemodRate
  1535. sess.preDemodCutoff = s.preDemodCutoff
  1536. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1537. }
  1538. // ---------------------------------------------------------------------------
  1539. // Session management helpers
  1540. // ---------------------------------------------------------------------------
  1541. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1542. outputDir := st.policy.OutputDir
  1543. if outputDir == "" {
  1544. outputDir = "data/recordings"
  1545. }
  1546. demodName, channels := resolveDemod(sig)
  1547. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1548. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1549. dir := filepath.Join(outputDir, dirName)
  1550. if err := os.MkdirAll(dir, 0o755); err != nil {
  1551. return nil, err
  1552. }
  1553. wavPath := filepath.Join(dir, "audio.wav")
  1554. f, err := os.Create(wavPath)
  1555. if err != nil {
  1556. return nil, err
  1557. }
  1558. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1559. f.Close()
  1560. return nil, err
  1561. }
  1562. playbackMode, stereoState := initialPlaybackState(demodName)
  1563. sess := &streamSession{
  1564. sessionID: fmt.Sprintf("%d-%d-r", sig.ID, now.UnixMilli()),
  1565. signalID: sig.ID,
  1566. centerHz: sig.CenterHz,
  1567. bwHz: sig.BWHz,
  1568. snrDb: sig.SNRDb,
  1569. peakDb: sig.PeakDb,
  1570. class: sig.Class,
  1571. startTime: now,
  1572. lastFeed: now,
  1573. dir: dir,
  1574. wavFile: f,
  1575. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1576. sampleRate: streamAudioRate,
  1577. channels: channels,
  1578. demodName: demodName,
  1579. playbackMode: playbackMode,
  1580. stereoState: stereoState,
  1581. deemphasisUs: st.policy.DeemphasisUs,
  1582. }
  1583. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1584. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1585. return sess, nil
  1586. }
  1587. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1588. demodName, channels := resolveDemod(sig)
  1589. for _, pl := range st.pendingListens {
  1590. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1591. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1592. demodName = requested
  1593. if demodName == "WFM_STEREO" {
  1594. channels = 2
  1595. } else if d := demod.Get(demodName); d != nil {
  1596. channels = d.Channels()
  1597. } else {
  1598. channels = 1
  1599. }
  1600. break
  1601. }
  1602. }
  1603. }
  1604. playbackMode, stereoState := initialPlaybackState(demodName)
  1605. sess := &streamSession{
  1606. sessionID: fmt.Sprintf("%d-%d-l", sig.ID, now.UnixMilli()),
  1607. signalID: sig.ID,
  1608. centerHz: sig.CenterHz,
  1609. bwHz: sig.BWHz,
  1610. snrDb: sig.SNRDb,
  1611. peakDb: sig.PeakDb,
  1612. class: sig.Class,
  1613. startTime: now,
  1614. lastFeed: now,
  1615. listenOnly: true,
  1616. sampleRate: streamAudioRate,
  1617. channels: channels,
  1618. demodName: demodName,
  1619. playbackMode: playbackMode,
  1620. stereoState: stereoState,
  1621. deemphasisUs: st.policy.DeemphasisUs,
  1622. }
  1623. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1624. sig.ID, sig.CenterHz/1e6, demodName)
  1625. return sess
  1626. }
  1627. func resolveDemod(sig *detector.Signal) (string, int) {
  1628. demodName := "NFM"
  1629. if sig.Class != nil {
  1630. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1631. demodName = n
  1632. }
  1633. }
  1634. channels := 1
  1635. if demodName == "WFM_STEREO" {
  1636. channels = 2
  1637. } else if d := demod.Get(demodName); d != nil {
  1638. channels = d.Channels()
  1639. }
  1640. return demodName, channels
  1641. }
  // initialPlaybackState returns the starting (playbackMode, stereoState) pair
  // for a demodulator. The playback mode is the demodulator name itself; the
  // stereo state is "searching" for "WFM_STEREO" and "mono" for everything else.
  func initialPlaybackState(demodName string) (string, string) {
  	if demodName == "WFM_STEREO" {
  		return demodName, "searching"
  	}
  	return demodName, "mono"
  }
  1650. func (sess *streamSession) audioInfo() AudioInfo {
  1651. return AudioInfo{
  1652. SampleRate: sess.sampleRate,
  1653. Channels: sess.channels,
  1654. Format: "s16le",
  1655. DemodName: sess.demodName,
  1656. PlaybackMode: sess.playbackMode,
  1657. StereoState: sess.stereoState,
  1658. }
  1659. }
  1660. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1661. infoJSON, _ := json.Marshal(info)
  1662. tagged := make([]byte, 1+len(infoJSON))
  1663. tagged[0] = 0x00 // tag: audio_info
  1664. copy(tagged[1:], infoJSON)
  1665. for _, sub := range subs {
  1666. select {
  1667. case sub.ch <- tagged:
  1668. default:
  1669. }
  1670. }
  1671. }
  1672. func defaultAudioInfoForMode(mode string) AudioInfo {
  1673. demodName := "NFM"
  1674. if requested := normalizeRequestedMode(mode); requested != "" {
  1675. demodName = requested
  1676. }
  1677. channels := 1
  1678. if demodName == "WFM_STEREO" {
  1679. channels = 2
  1680. } else if d := demod.Get(demodName); d != nil {
  1681. channels = d.Channels()
  1682. }
  1683. playbackMode, stereoState := initialPlaybackState(demodName)
  1684. return AudioInfo{
  1685. SampleRate: streamAudioRate,
  1686. Channels: channels,
  1687. Format: "s16le",
  1688. DemodName: demodName,
  1689. PlaybackMode: playbackMode,
  1690. StereoState: stereoState,
  1691. }
  1692. }
  // normalizeRequestedMode canonicalises a user-supplied mode string by
  // trimming surrounding whitespace and upper-casing it. It returns the
  // canonical name for the supported modes (WFM, WFM_STEREO, NFM, AM, USB,
  // LSB, CW) and "" for everything else, including the empty string and "AUTO".
  func normalizeRequestedMode(mode string) string {
  	canonical := strings.ToUpper(strings.TrimSpace(mode))
  	switch canonical {
  	case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  		return canonical
  	}
  	return ""
  }
  1703. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1704. func (sess *streamSession) growIQ(n int) []complex64 {
  1705. if cap(sess.scratchIQ) >= n {
  1706. return sess.scratchIQ[:n]
  1707. }
  1708. sess.scratchIQ = make([]complex64, n, n*5/4)
  1709. return sess.scratchIQ
  1710. }
  1711. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1712. func (sess *streamSession) growAudio(n int) []float32 {
  1713. if cap(sess.scratchAudio) >= n {
  1714. return sess.scratchAudio[:n]
  1715. }
  1716. sess.scratchAudio = make([]float32, n, n*5/4)
  1717. return sess.scratchAudio
  1718. }
  1719. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1720. func (sess *streamSession) growPCM(n int) []byte {
  1721. if cap(sess.scratchPCM) >= n {
  1722. return sess.scratchPCM[:n]
  1723. }
  1724. sess.scratchPCM = make([]byte, n, n*5/4)
  1725. return sess.scratchPCM
  1726. }
  1727. func convertToListenOnly(sess *streamSession) {
  1728. if sess.wavBuf != nil {
  1729. _ = sess.wavBuf.Flush()
  1730. }
  1731. if sess.wavFile != nil {
  1732. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1733. sess.wavFile.Close()
  1734. }
  1735. sess.wavFile = nil
  1736. sess.wavBuf = nil
  1737. sess.listenOnly = true
  1738. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1739. }
  1740. func closeSession(sess *streamSession, policy *Policy) {
  1741. if sess.listenOnly {
  1742. return
  1743. }
  1744. if sess.wavBuf != nil {
  1745. _ = sess.wavBuf.Flush()
  1746. }
  1747. if sess.wavFile != nil {
  1748. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1749. sess.wavFile.Close()
  1750. sess.wavFile = nil
  1751. sess.wavBuf = nil
  1752. }
  1753. dur := sess.lastFeed.Sub(sess.startTime)
  1754. files := map[string]any{
  1755. "audio": "audio.wav",
  1756. "audio_sample_rate": sess.sampleRate,
  1757. "audio_channels": sess.channels,
  1758. "audio_demod": sess.demodName,
  1759. "recording_mode": "streaming",
  1760. }
  1761. meta := Meta{
  1762. EventID: sess.signalID,
  1763. Start: sess.startTime,
  1764. End: sess.lastFeed,
  1765. CenterHz: sess.centerHz,
  1766. BandwidthHz: sess.bwHz,
  1767. SampleRate: sess.sampleRate,
  1768. SNRDb: sess.snrDb,
  1769. PeakDb: sess.peakDb,
  1770. Class: sess.class,
  1771. DurationMs: dur.Milliseconds(),
  1772. Files: files,
  1773. }
  1774. b, err := json.MarshalIndent(meta, "", " ")
  1775. if err == nil {
  1776. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1777. }
  1778. if policy != nil {
  1779. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1780. }
  1781. }
  1782. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1783. if len(sess.audioSubs) == 0 {
  1784. return
  1785. }
  1786. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1787. tagged := make([]byte, 1+pcmLen)
  1788. tagged[0] = 0x01
  1789. copy(tagged[1:], pcm[:pcmLen])
  1790. alive := sess.audioSubs[:0]
  1791. for _, sub := range sess.audioSubs {
  1792. select {
  1793. case sub.ch <- tagged:
  1794. default:
  1795. st.droppedPCM++
  1796. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1797. if st.telemetry != nil {
  1798. st.telemetry.IncCounter("streamer.pcm.drop", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1799. }
  1800. }
  1801. alive = append(alive, sub)
  1802. }
  1803. sess.audioSubs = alive
  1804. if st.telemetry != nil {
  1805. st.telemetry.SetGauge("streamer.subscribers.count", float64(len(alive)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1806. }
  1807. }
  1808. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1809. if len(st.policy.ClassFilter) == 0 {
  1810. return true
  1811. }
  1812. if cls == nil {
  1813. return false
  1814. }
  1815. for _, f := range st.policy.ClassFilter {
  1816. if strings.EqualFold(f, string(cls.ModType)) {
  1817. return true
  1818. }
  1819. }
  1820. return false
  1821. }
  // Sentinel errors for session lookup.
  var (
  	// ErrNoSession reports that no active or pending session matches the
  	// requested frequency. Compare with errors.Is.
  	ErrNoSession = errors.New("no active or pending session for this frequency")
  )
  1824. // ---------------------------------------------------------------------------
  1825. // WAV header helpers
  1826. // ---------------------------------------------------------------------------
  1827. func writeWAVFile(path string, audio []float32, sampleRate int, channels int) error {
  1828. f, err := os.Create(path)
  1829. if err != nil {
  1830. return err
  1831. }
  1832. defer f.Close()
  1833. return writeWAVTo(f, audio, sampleRate, channels)
  1834. }
  // writeStreamWAVHeader writes a 44-byte 16-bit PCM WAV header to f at the
  // current file position. The data size is written as 0 and the RIFF size as
  // 36, i.e. a header for an empty recording. A non-positive channel count is
  // treated as 1. It returns the error from the single Write call.
  func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  	if channels <= 0 {
  		channels = 1
  	}
  	le := binary.LittleEndian
  	var hdr [44]byte
  	// RIFF container.
  	copy(hdr[0:], "RIFF")
  	le.PutUint32(hdr[4:], 36) // RIFF size with zero bytes of audio
  	copy(hdr[8:], "WAVE")
  	// "fmt " chunk.
  	copy(hdr[12:], "fmt ")
  	le.PutUint32(hdr[16:], 16)                           // fmt chunk size
  	le.PutUint16(hdr[20:], 1)                            // audio format 1
  	le.PutUint16(hdr[22:], uint16(channels))             // channel count
  	le.PutUint32(hdr[24:], uint32(sampleRate))           // sample rate
  	le.PutUint32(hdr[28:], uint32(sampleRate*channels*2)) // byte rate
  	le.PutUint16(hdr[32:], uint16(channels*2))           // block align
  	le.PutUint16(hdr[34:], 16)                           // bits per sample
  	// "data" chunk with zero size.
  	copy(hdr[36:], "data")
  	le.PutUint32(hdr[40:], 0)
  	_, err := f.Write(hdr[:])
  	return err
  }
  // fixStreamWAVHeader patches the size-dependent fields of the 44-byte WAV
  // header at the start of f once the amount of written audio is known:
  // the RIFF size (offset 4), sample rate (24), byte rate (28) and data size
  // (40). The data size is totalSamples * 2 bytes.
  //
  // WAV size fields are 32-bit, so totalSamples is clamped to the largest
  // value for which neither the data size nor the RIFF size (36 + data size)
  // wraps around; negative counts are treated as 0. A non-positive channel
  // count is treated as 1, matching writeStreamWAVHeader.
  //
  // The function is best-effort and returns nothing: on the first seek or
  // write failure it logs the error and stops. On success the file offset is
  // left at 44, just past the header.
  func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  	if channels <= 0 {
  		channels = 1
  	}
  	const maxSamples = (math.MaxUint32 - 36) / 2
  	switch {
  	case totalSamples < 0:
  		totalSamples = 0
  	case totalSamples > maxSamples:
  		totalSamples = maxSamples
  	}
  	dataSize := uint32(totalSamples * 2)
  	patches := [...]struct {
  		offset int64
  		value  uint32
  	}{
  		{4, 36 + dataSize},
  		{24, uint32(sampleRate)},
  		{28, uint32(sampleRate * channels * 2)},
  		{40, dataSize},
  	}
  	var buf [4]byte
  	for _, p := range patches {
  		binary.LittleEndian.PutUint32(buf[:], p.value)
  		if _, err := f.Seek(p.offset, 0); err != nil { // whence 0: from start of file
  			log.Printf("STREAM: wav header fix: seek to %d failed: %v", p.offset, err)
  			return
  		}
  		if _, err := f.Write(buf[:]); err != nil {
  			log.Printf("STREAM: wav header fix: write at %d failed: %v", p.offset, err)
  			return
  		}
  	}
  }
  1880. // ResetStreams forces all active streaming sessions to discard their FIR states and decimation phases.
  1881. // This is used when the upstream DSP drops samples, creating a hard break in phase continuity.
  1882. func (st *Streamer) ResetStreams() {
  1883. st.mu.Lock()
  1884. defer st.mu.Unlock()
  1885. if st.telemetry != nil {
  1886. st.telemetry.IncCounter("streamer.reset.count", 1, nil)
  1887. st.telemetry.Event("stream_reset", "warn", "stream DSP state reset", nil, map[string]any{"sessions": len(st.sessions)})
  1888. }
  1889. for _, sess := range st.sessions {
  1890. sess.preDemodFIR = nil
  1891. sess.preDemodDecimator = nil
  1892. sess.preDemodDecimPhase = 0
  1893. sess.stereoResampler = nil
  1894. sess.monoResampler = nil
  1895. }
  1896. }