Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

1775 строки
53KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "sdr-wideband-suite/internal/classifier"
  17. "sdr-wideband-suite/internal/demod"
  18. "sdr-wideband-suite/internal/detector"
  19. "sdr-wideband-suite/internal/dsp"
  20. "sdr-wideband-suite/internal/logging"
  21. )
  22. // ---------------------------------------------------------------------------
  23. // streamSession — one open demod session for one signal
  24. // ---------------------------------------------------------------------------
// streamSession is the per-signal state of one open demod session: signal
// metadata, optional WAV recording state, DSP state that persists between
// snippets, reusable scratch buffers and the attached live-listen subscribers.
type streamSession struct {
	signalID int64
	centerHz float64
	bwHz float64
	snrDb float64 // highest SNR seen so far (only raised by processFeed)
	peakDb float64 // highest peak level seen so far (only raised by processFeed)
	class *classifier.Classification
	startTime time.Time
	lastFeed time.Time // time of the last snippet fed to this session
	playbackMode string
	stereoState string
	lastAudioTs time.Time // last time audio was produced while subscribers were attached
	// Debug audio dump window; armed when a listener attaches and
	// audioDumpEnabled is set.
	debugDumpStart time.Time
	debugDumpUntil time.Time
	debugDumpBase string
	demodDump []float32
	finalDump []float32
	// Trailing samples of the previous frame, kept for the boundary
	// transient (click) diagnostics.
	lastAudioL float32
	lastAudioR float32
	prevAudioL float64 // second-to-last L sample for boundary transient detection
	lastAudioSet bool
	lastDecIQ complex64
	prevDecIQ complex64
	lastDecIQSet bool
	lastDemodL float32
	prevDemodL float64
	lastDemodSet bool
	// listenOnly sessions have no WAV file and no disk I/O.
	// They exist solely to feed audio to live-listen subscribers.
	listenOnly bool
	// Recording state (nil/zero for listen-only sessions)
	dir string
	wavFile *os.File
	wavBuf *bufio.Writer
	wavSamples int64
	segmentIdx int
	sampleRate int // actual output audio sample rate (always streamAudioRate)
	channels int
	demodName string
	// --- Persistent DSP state for click-free streaming ---
	// Overlap-save: tail of previous extracted IQ snippet.
	// Currently unused for live demod after removing the extra discriminator
	// overlap prepend, but kept in DSP snapshot state for compatibility.
	overlapIQ []complex64
	// De-emphasis IIR state (persists across frames)
	deemphL float64
	deemphR float64
	// Stereo lock state for live WFM streaming
	stereoEnabled bool
	stereoOnCount int
	stereoOffCount int
	// Pilot-locked stereo PLL state (19kHz pilot)
	pilotPhase float64
	pilotFreq float64
	pilotAlpha float64
	pilotBeta float64
	pilotErrAvg float64
	pilotI float64
	pilotQ float64
	pilotLPAlpha float64
	// Polyphase resampler (replaces integer-decimate hack)
	monoResampler *dsp.Resampler
	monoResamplerRate int
	stereoResampler *dsp.StereoResampler
	stereoResamplerRate int
	// AQ-4: Stateful FIR filters for click-free stereo decode
	stereoFilterRate int
	stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
	stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
	stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
	stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
	stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
	pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
	pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
	// Stateful pre-demod anti-alias FIR (eliminates cold-start transients
	// and avoids per-frame FIR recomputation)
	preDemodFIR *dsp.StatefulFIRComplex
	preDemodDecim int // cached decimation factor
	preDemodRate int // cached snipRate this FIR was built for
	preDemodCutoff float64 // cached cutoff
	preDemodDecimPhase int // stateful decimation phase (index offset into next frame)
	// AQ-2: De-emphasis config (µs, 0 = disabled)
	deemphasisUs float64
	// Scratch buffers — reused across frames to avoid GC pressure.
	// Grown as needed, never shrunk.
	scratchIQ []complex64 // for pre-demod FIR output + decimate input
	scratchAudio []float32 // for stereo decode intermediates
	scratchPCM []byte // for PCM encoding
	// live-listen subscribers
	audioSubs []audioSub
}
// audioSub is one live-listen subscriber attached to a session.
type audioSub struct {
	id int64 // subscriber ID handed out by Streamer.SubscribeAudio
	ch chan []byte // delivery channel; closed when the subscriber is detached
}
// RuntimeSignalInfo is a snapshot of a session's current demodulator name,
// playback mode, stereo state, channel count and sample rate, as reported by
// Streamer.RuntimeInfoBySignalID.
type RuntimeSignalInfo struct {
	DemodName string
	PlaybackMode string
	StereoState string
	Channels int
	SampleRate int
}
// AudioInfo describes the audio format of a live-listen subscription.
// Sent to the WebSocket client as the first message.
// It is also what SubscribeAudio returns to the caller.
type AudioInfo struct {
	SampleRate int `json:"sample_rate"`
	Channels int `json:"channels"`
	Format string `json:"format"` // always "s16le"
	DemodName string `json:"demod"`
	PlaybackMode string `json:"playback_mode,omitempty"`
	StereoState string `json:"stereo_state,omitempty"`
}
// Streaming audio constants.
const (
	// streamAudioRate is the output audio sample rate in Hz.
	streamAudioRate = 48000
	resamplerTaps = 32 // taps per polyphase arm — good quality
)
  141. var debugDumpDelay = func() time.Duration {
  142. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DELAY_SECONDS"))
  143. if raw == "" {
  144. return 5 * time.Second
  145. }
  146. v, err := strconv.Atoi(raw)
  147. if err != nil || v < 0 {
  148. return 5 * time.Second
  149. }
  150. return time.Duration(v) * time.Second
  151. }()
  152. var debugDumpDuration = func() time.Duration {
  153. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DURATION_SECONDS"))
  154. if raw == "" {
  155. return 15 * time.Second
  156. }
  157. v, err := strconv.Atoi(raw)
  158. if err != nil || v <= 0 {
  159. return 15 * time.Second
  160. }
  161. return time.Duration(v) * time.Second
  162. }()
  163. var audioDumpEnabled = func() bool {
  164. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_AUDIO_DUMP_ENABLED"))
  165. if raw == "" {
  166. return false
  167. }
  168. v, err := strconv.ParseBool(raw)
  169. if err != nil {
  170. return false
  171. }
  172. return v
  173. }()
  174. var decHeadTrimSamples = func() int {
  175. raw := strings.TrimSpace(os.Getenv("SDR_DEC_HEAD_TRIM"))
  176. if raw == "" {
  177. return 0
  178. }
  179. v, err := strconv.Atoi(raw)
  180. if err != nil || v < 0 {
  181. return 0
  182. }
  183. return v
  184. }()
  185. // ---------------------------------------------------------------------------
  186. // Streamer — manages all active streaming sessions
  187. // ---------------------------------------------------------------------------
// streamFeedItem pairs one detected signal with its pre-extracted IQ snippet.
type streamFeedItem struct {
	signal detector.Signal
	snippet []complex64 // IQ samples for this signal
	snipRate int // sample rate of snippet; items with snipRate <= 0 are skipped
}
// streamFeedMsg is one batch of feed items handed to the worker goroutine
// through Streamer.feedCh.
type streamFeedMsg struct {
	traceID uint64 // caller-supplied ID echoed in trace/gap log lines
	items []streamFeedItem
}
// Streamer owns all active streaming sessions, keyed by signal ID, together
// with the worker goroutine that processes fed snippets and the bookkeeping
// for live-listen subscribers.
type Streamer struct {
	mu sync.Mutex
	sessions map[int64]*streamSession
	policy Policy
	centerHz float64
	nextSub int64 // last subscriber ID handed out
	feedCh chan streamFeedMsg // FeedSnippets -> worker queue
	done chan struct{} // closed by the worker once feedCh has been drained
	droppedFeed uint64 // number of feed messages dropped because feedCh was full
	droppedPCM uint64
	lastFeedTS time.Time // time of the previous FeedSnippets call (gap logging)
	lastProcTS time.Time // time of the previous processFeed call (gap logging)
	// pendingListens are subscribers waiting for a matching session.
	pendingListens map[int64]*pendingListen
}
// pendingListen is a live-listen request that has not yet been attached to a
// session.
type pendingListen struct {
	freq float64 // requested frequency; matched against session centre within 200 kHz
	bw float64
	mode string // requested demod mode as supplied by the caller (normalised on use)
	ch chan []byte // subscriber delivery channel
}
  218. func newStreamer(policy Policy, centerHz float64) *Streamer {
  219. st := &Streamer{
  220. sessions: make(map[int64]*streamSession),
  221. policy: policy,
  222. centerHz: centerHz,
  223. feedCh: make(chan streamFeedMsg, 2),
  224. done: make(chan struct{}),
  225. pendingListens: make(map[int64]*pendingListen),
  226. }
  227. go st.worker()
  228. return st
  229. }
  230. func (st *Streamer) worker() {
  231. for msg := range st.feedCh {
  232. st.processFeed(msg)
  233. }
  234. close(st.done)
  235. }
  236. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  237. st.mu.Lock()
  238. defer st.mu.Unlock()
  239. wasEnabled := st.policy.Enabled
  240. st.policy = policy
  241. st.centerHz = centerHz
  242. // If recording was just disabled, close recording sessions
  243. // but keep listen-only sessions alive.
  244. if wasEnabled && !policy.Enabled {
  245. for id, sess := range st.sessions {
  246. if sess.listenOnly {
  247. continue
  248. }
  249. if len(sess.audioSubs) > 0 {
  250. // Convert to listen-only: close WAV but keep session
  251. convertToListenOnly(sess)
  252. } else {
  253. closeSession(sess, &st.policy)
  254. delete(st.sessions, id)
  255. }
  256. }
  257. }
  258. }
  259. // HasListeners returns true if any sessions have audio subscribers
  260. // or there are pending listen requests. Used by the DSP loop to
  261. // decide whether to feed snippets even when recording is disabled.
  262. func (st *Streamer) HasListeners() bool {
  263. st.mu.Lock()
  264. defer st.mu.Unlock()
  265. return st.hasListenersLocked()
  266. }
  267. func (st *Streamer) hasListenersLocked() bool {
  268. if len(st.pendingListens) > 0 {
  269. return true
  270. }
  271. for _, sess := range st.sessions {
  272. if len(sess.audioSubs) > 0 {
  273. return true
  274. }
  275. }
  276. return false
  277. }
  278. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  279. // Feeds are accepted if:
  280. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  281. // - Any live-listen subscribers exist (listen-only mode)
  282. //
  283. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  284. // data, so items can be passed directly without another copy.
  285. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  286. st.mu.Lock()
  287. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  288. hasListeners := st.hasListenersLocked()
  289. pending := len(st.pendingListens)
  290. debugLiveAudio := st.policy.DebugLiveAudio
  291. now := time.Now()
  292. if !st.lastFeedTS.IsZero() {
  293. gap := now.Sub(st.lastFeedTS)
  294. if gap > 150*time.Millisecond {
  295. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  296. }
  297. }
  298. st.lastFeedTS = now
  299. st.mu.Unlock()
  300. if debugLiveAudio {
  301. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  302. }
  303. if (!recEnabled && !hasListeners) || len(items) == 0 {
  304. return
  305. }
  306. select {
  307. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}:
  308. default:
  309. st.droppedFeed++
  310. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  311. }
  312. }
  313. // processFeed runs in the worker goroutine.
  314. func (st *Streamer) processFeed(msg streamFeedMsg) {
  315. st.mu.Lock()
  316. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  317. hasListeners := st.hasListenersLocked()
  318. now := time.Now()
  319. if !st.lastProcTS.IsZero() {
  320. gap := now.Sub(st.lastProcTS)
  321. if gap > 150*time.Millisecond {
  322. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  323. }
  324. }
  325. st.lastProcTS = now
  326. defer st.mu.Unlock()
  327. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  328. if !recEnabled && !hasListeners {
  329. return
  330. }
  331. seen := make(map[int64]bool, len(msg.items))
  332. for i := range msg.items {
  333. item := &msg.items[i]
  334. sig := &item.signal
  335. seen[sig.ID] = true
  336. if sig.ID == 0 || sig.Class == nil {
  337. continue
  338. }
  339. if len(item.snippet) == 0 || item.snipRate <= 0 {
  340. continue
  341. }
  342. // Decide whether this signal needs a session
  343. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  344. needsListen := st.signalHasListenerLocked(sig)
  345. className := "<nil>"
  346. demodName := ""
  347. if sig.Class != nil {
  348. className = string(sig.Class.ModType)
  349. demodName, _ = resolveDemod(sig)
  350. }
  351. if st.policy.DebugLiveAudio {
  352. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  353. }
  354. if !needsRecording && !needsListen {
  355. continue
  356. }
  357. sess, exists := st.sessions[sig.ID]
  358. requestedMode := ""
  359. for _, pl := range st.pendingListens {
  360. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  361. if m := normalizeRequestedMode(pl.mode); m != "" {
  362. requestedMode = m
  363. break
  364. }
  365. }
  366. }
  367. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  368. for _, sub := range sess.audioSubs {
  369. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  370. }
  371. delete(st.sessions, sig.ID)
  372. sess = nil
  373. exists = false
  374. }
  375. if !exists {
  376. if needsRecording {
  377. s, err := st.openRecordingSession(sig, now)
  378. if err != nil {
  379. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  380. sig.ID, sig.CenterHz/1e6, err)
  381. continue
  382. }
  383. st.sessions[sig.ID] = s
  384. sess = s
  385. } else {
  386. s := st.openListenSession(sig, now)
  387. st.sessions[sig.ID] = s
  388. sess = s
  389. }
  390. // Attach any pending listeners
  391. st.attachPendingListeners(sess)
  392. }
  393. // Update metadata
  394. sess.lastFeed = now
  395. sess.centerHz = sig.CenterHz
  396. sess.bwHz = sig.BWHz
  397. if sig.SNRDb > sess.snrDb {
  398. sess.snrDb = sig.SNRDb
  399. }
  400. if sig.PeakDb > sess.peakDb {
  401. sess.peakDb = sig.PeakDb
  402. }
  403. if sig.Class != nil {
  404. sess.class = sig.Class
  405. }
  406. // Demod with persistent state
  407. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  408. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  409. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  410. if len(audio) == 0 {
  411. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  412. }
  413. if len(audio) > 0 {
  414. if sess.wavSamples == 0 && audioRate > 0 {
  415. sess.sampleRate = audioRate
  416. }
  417. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  418. pcmLen := len(audio) * 2
  419. pcm := sess.growPCM(pcmLen)
  420. for k, s := range audio {
  421. v := int16(clip(s * 32767))
  422. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  423. }
  424. if !sess.listenOnly && sess.wavBuf != nil {
  425. n, err := sess.wavBuf.Write(pcm)
  426. if err != nil {
  427. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  428. } else {
  429. sess.wavSamples += int64(n / 2)
  430. }
  431. }
  432. // Gap logging for live-audio sessions + transient click detector
  433. if len(sess.audioSubs) > 0 {
  434. if !sess.lastAudioTs.IsZero() {
  435. gap := time.Since(sess.lastAudioTs)
  436. if gap > 150*time.Millisecond {
  437. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  438. }
  439. }
  440. // Transient click detector: finds short impulses (1-3 samples)
  441. // that deviate sharply from the local signal trend.
  442. // A click looks like: ...smooth... SPIKE ...smooth...
  443. // Normal FM audio has large deltas too, but they follow
  444. // a continuous curve. A click has high |d2/dt2| (acceleration).
  445. //
  446. // Method: second-derivative detector. For each sample triplet
  447. // (a, b, c), compute |2b - a - c| which is the discrete
  448. // second derivative magnitude. High values = transient spike.
  449. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  450. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  451. stride := sess.channels
  452. if stride < 1 {
  453. stride = 1
  454. }
  455. nFrames := len(audio) / stride
  456. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  457. if sess.lastAudioSet && nFrames >= 1 {
  458. // second derivative across boundary: |2*last - prevLast - first|
  459. first := float64(audio[0])
  460. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  461. if d2 > 0.15 {
  462. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  463. }
  464. }
  465. // Intra-frame transient scan (L channel only for performance)
  466. nClicks := 0
  467. maxD2 := float64(0)
  468. maxD2Pos := 0
  469. for k := 1; k < nFrames-1; k++ {
  470. a := float64(audio[(k-1)*stride])
  471. b := float64(audio[k*stride])
  472. c := float64(audio[(k+1)*stride])
  473. d2 := math.Abs(2*b - a - c)
  474. if d2 > maxD2 {
  475. maxD2 = d2
  476. maxD2Pos = k
  477. }
  478. if d2 > 0.15 {
  479. nClicks++
  480. }
  481. }
  482. if nClicks > 0 {
  483. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  484. }
  485. // Store last two samples for next frame's boundary check
  486. if nFrames >= 2 {
  487. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  488. sess.lastAudioL = audio[(nFrames-1)*stride]
  489. if stride > 1 {
  490. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  491. }
  492. } else if nFrames == 1 {
  493. sess.prevAudioL = float64(sess.lastAudioL)
  494. sess.lastAudioL = audio[0]
  495. if stride > 1 && len(audio) >= 2 {
  496. sess.lastAudioR = audio[1]
  497. }
  498. }
  499. sess.lastAudioSet = true
  500. }
  501. sess.lastAudioTs = time.Now()
  502. }
  503. st.fanoutPCM(sess, pcm, pcmLen)
  504. }
  505. // Segment split (recording sessions only)
  506. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  507. segIdx := sess.segmentIdx + 1
  508. oldSubs := sess.audioSubs
  509. oldState := sess.captureDSPState()
  510. sess.audioSubs = nil
  511. closeSession(sess, &st.policy)
  512. s, err := st.openRecordingSession(sig, now)
  513. if err != nil {
  514. delete(st.sessions, sig.ID)
  515. continue
  516. }
  517. s.segmentIdx = segIdx
  518. s.audioSubs = oldSubs
  519. s.restoreDSPState(oldState)
  520. st.sessions[sig.ID] = s
  521. }
  522. }
  523. // Close sessions for disappeared signals (with grace period)
  524. for id, sess := range st.sessions {
  525. if seen[id] {
  526. continue
  527. }
  528. gracePeriod := 3 * time.Second
  529. if sess.listenOnly {
  530. gracePeriod = 5 * time.Second
  531. }
  532. if now.Sub(sess.lastFeed) > gracePeriod {
  533. for _, sub := range sess.audioSubs {
  534. close(sub.ch)
  535. }
  536. sess.audioSubs = nil
  537. if !sess.listenOnly {
  538. closeSession(sess, &st.policy)
  539. }
  540. delete(st.sessions, id)
  541. }
  542. }
  543. }
  544. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  545. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  546. if st.policy.DebugLiveAudio {
  547. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  548. }
  549. return true
  550. }
  551. for subID, pl := range st.pendingListens {
  552. delta := math.Abs(sig.CenterHz - pl.freq)
  553. if delta < 200000 {
  554. if st.policy.DebugLiveAudio {
  555. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  556. }
  557. return true
  558. }
  559. }
  560. return false
  561. }
  562. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  563. for subID, pl := range st.pendingListens {
  564. requestedMode := normalizeRequestedMode(pl.mode)
  565. if requestedMode != "" && sess.demodName != requestedMode {
  566. continue
  567. }
  568. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  569. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  570. delete(st.pendingListens, subID)
  571. // Send updated audio_info now that we know the real session params.
  572. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  573. infoJSON, _ := json.Marshal(sess.audioInfo())
  574. tagged := make([]byte, 1+len(infoJSON))
  575. tagged[0] = 0x00 // tag: audio_info
  576. copy(tagged[1:], infoJSON)
  577. select {
  578. case pl.ch <- tagged:
  579. default:
  580. }
  581. if audioDumpEnabled {
  582. now := time.Now()
  583. sess.debugDumpStart = now.Add(debugDumpDelay)
  584. sess.debugDumpUntil = sess.debugDumpStart.Add(debugDumpDuration)
  585. sess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", sess.signalID, now.Format("20060102-150405")))
  586. sess.demodDump = nil
  587. sess.finalDump = nil
  588. }
  589. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  590. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  591. if audioDumpEnabled {
  592. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", sess.signalID, sess.debugDumpStart.Format(time.RFC3339), sess.debugDumpUntil.Format(time.RFC3339))
  593. }
  594. }
  595. }
  596. }
  597. // CloseAll finalises all sessions and stops the worker goroutine.
  598. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  599. st.mu.Lock()
  600. defer st.mu.Unlock()
  601. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  602. for _, sess := range st.sessions {
  603. out[sess.signalID] = RuntimeSignalInfo{
  604. DemodName: sess.demodName,
  605. PlaybackMode: sess.playbackMode,
  606. StereoState: sess.stereoState,
  607. Channels: sess.channels,
  608. SampleRate: sess.sampleRate,
  609. }
  610. }
  611. return out
  612. }
  613. func (st *Streamer) CloseAll() {
  614. close(st.feedCh)
  615. <-st.done
  616. st.mu.Lock()
  617. defer st.mu.Unlock()
  618. for id, sess := range st.sessions {
  619. for _, sub := range sess.audioSubs {
  620. close(sub.ch)
  621. }
  622. sess.audioSubs = nil
  623. if !sess.listenOnly {
  624. closeSession(sess, &st.policy)
  625. }
  626. delete(st.sessions, id)
  627. }
  628. for _, pl := range st.pendingListens {
  629. close(pl.ch)
  630. }
  631. st.pendingListens = nil
  632. }
  633. // ActiveSessions returns the number of open streaming sessions.
  634. func (st *Streamer) ActiveSessions() int {
  635. st.mu.Lock()
  636. defer st.mu.Unlock()
  637. return len(st.sessions)
  638. }
  639. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  640. //
  641. // LL-2: Returns AudioInfo with correct channels and sample rate.
  642. // LL-3: Returns error only on hard failures (nil streamer etc).
  643. //
  644. // If a matching session exists, attaches immediately. Otherwise, the
  645. // subscriber is held as "pending" and will be attached when a matching
  646. // signal appears in the next DSP frame.
  647. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  648. ch := make(chan []byte, 64)
  649. st.mu.Lock()
  650. defer st.mu.Unlock()
  651. st.nextSub++
  652. subID := st.nextSub
  653. requestedMode := normalizeRequestedMode(mode)
  654. // Try to find a matching session
  655. var bestSess *streamSession
  656. bestDist := math.MaxFloat64
  657. for _, sess := range st.sessions {
  658. if requestedMode != "" && sess.demodName != requestedMode {
  659. continue
  660. }
  661. d := math.Abs(sess.centerHz - freq)
  662. if d < bestDist {
  663. bestDist = d
  664. bestSess = sess
  665. }
  666. }
  667. if bestSess != nil && bestDist < 200000 {
  668. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  669. if audioDumpEnabled {
  670. now := time.Now()
  671. bestSess.debugDumpStart = now.Add(debugDumpDelay)
  672. bestSess.debugDumpUntil = bestSess.debugDumpStart.Add(debugDumpDuration)
  673. bestSess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", bestSess.signalID, now.Format("20060102-150405")))
  674. bestSess.demodDump = nil
  675. bestSess.finalDump = nil
  676. }
  677. info := bestSess.audioInfo()
  678. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  679. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  680. if audioDumpEnabled {
  681. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", bestSess.signalID, bestSess.debugDumpStart.Format(time.RFC3339), bestSess.debugDumpUntil.Format(time.RFC3339))
  682. }
  683. return subID, ch, info, nil
  684. }
  685. // No matching session yet — add as pending listener
  686. st.pendingListens[subID] = &pendingListen{
  687. freq: freq,
  688. bw: bw,
  689. mode: mode,
  690. ch: ch,
  691. }
  692. info := defaultAudioInfoForMode(mode)
  693. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  694. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  695. return subID, ch, info, nil
  696. }
  697. // UnsubscribeAudio removes a live-listen subscriber.
  698. func (st *Streamer) UnsubscribeAudio(subID int64) {
  699. st.mu.Lock()
  700. defer st.mu.Unlock()
  701. if pl, ok := st.pendingListens[subID]; ok {
  702. close(pl.ch)
  703. delete(st.pendingListens, subID)
  704. return
  705. }
  706. for _, sess := range st.sessions {
  707. for i, sub := range sess.audioSubs {
  708. if sub.id == subID {
  709. close(sub.ch)
  710. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  711. return
  712. }
  713. }
  714. }
  715. }
  716. // ---------------------------------------------------------------------------
  717. // Session: stateful extraction + demod
  718. // ---------------------------------------------------------------------------
  719. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  720. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  721. // output with zero transient artifacts.
  722. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  723. if len(snippet) == 0 || snipRate <= 0 {
  724. return nil, 0
  725. }
  726. isWFMStereo := sess.demodName == "WFM_STEREO"
  727. isWFM := sess.demodName == "WFM" || isWFMStereo
  728. demodName := sess.demodName
  729. if isWFMStereo {
  730. demodName = "WFM"
  731. }
  732. d := demod.Get(demodName)
  733. if d == nil {
  734. d = demod.Get("NFM")
  735. }
  736. if d == nil {
  737. return nil, 0
  738. }
  739. // The extra 1-sample discriminator overlap prepend was removed after it was
  740. // shown to shift the downstream decimation phase and create heavy click
  741. // artifacts in steady-state streaming/recording. The upstream extraction path
  742. // and the stateful FIR/decimation stages already provide continuity.
  743. fullSnip := snippet
  744. overlapApplied := false
  745. prevTailValid := false
  746. if logging.EnabledCategory("prefir") && len(fullSnip) > 0 {
  747. probeN := 64
  748. if len(fullSnip) < probeN {
  749. probeN = len(fullSnip)
  750. }
  751. minPreMag := math.MaxFloat64
  752. minPreIdx := 0
  753. maxPreStep := 0.0
  754. maxPreStepIdx := 0
  755. for i := 0; i < probeN; i++ {
  756. v := fullSnip[i]
  757. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  758. if mag < minPreMag {
  759. minPreMag = mag
  760. minPreIdx = i
  761. }
  762. if i > 0 {
  763. p := fullSnip[i-1]
  764. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  765. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  766. step := math.Abs(math.Atan2(num, den))
  767. if step > maxPreStep {
  768. maxPreStep = step
  769. maxPreStepIdx = i - 1
  770. }
  771. }
  772. }
  773. logging.Debug("prefir", "pre_fir_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "snip_len", len(fullSnip))
  774. if minPreMag < 0.18 {
  775. logging.Warn("prefir", "pre_fir_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx)
  776. }
  777. if maxPreStep > 1.5 {
  778. logging.Warn("prefir", "pre_fir_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "min_mag", minPreMag, "min_idx", minPreIdx)
  779. }
  780. }
  781. // --- Stateful anti-alias FIR + decimation to demod rate ---
  782. demodRate := d.OutputSampleRate()
  783. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  784. if decim1 < 1 {
  785. decim1 = 1
  786. }
  787. // WFM override: force decim1=2 (256kHz) instead of round(512k/192k)=3 (170kHz).
  788. // At decim1=3, Nyquist is 85kHz which clips FM broadcast ±75kHz deviation.
  789. // At decim1=2, Nyquist is 128kHz → full FM deviation + stereo pilot + guard band.
  790. // Bonus: 256000→48000 resampler ratio is L=3/M=16 (96 taps, 1kB) instead of
  791. // the pathological L=24000/M=85333 (768k taps, 6MB) from 170666→48000.
  792. if isWFM && decim1 > 2 && snipRate/2 >= 200000 {
  793. decim1 = 2
  794. }
  795. actualDemodRate := snipRate / decim1
  796. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  797. var dec []complex64
  798. if decim1 > 1 {
  799. // FIR cutoff: for WFM, use 90kHz (above ±75kHz FM deviation + guard).
  800. // For NFM/other: use standard Nyquist*0.8 cutoff.
  801. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  802. if isWFM {
  803. cutoff = 90000
  804. }
  805. // Lazy-init or reinit stateful FIR if parameters changed
  806. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  807. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  808. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  809. sess.preDemodRate = snipRate
  810. sess.preDemodCutoff = cutoff
  811. sess.preDemodDecim = decim1
  812. sess.preDemodDecimPhase = 0
  813. }
  814. decimPhaseBefore := sess.preDemodDecimPhase
  815. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  816. dec = dsp.DecimateStateful(filtered, decim1, &sess.preDemodDecimPhase)
  817. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase)
  818. } else {
  819. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0)
  820. dec = fullSnip
  821. }
  822. if decHeadTrimSamples > 0 && decHeadTrimSamples < len(dec) {
  823. logging.Warn("boundary", "dec_head_trim_applied", "signal", sess.signalID, "trim", decHeadTrimSamples, "before_len", len(dec))
  824. dec = dec[decHeadTrimSamples:]
  825. }
  826. if logging.EnabledCategory("boundary") && len(dec) > 0 {
  827. first := dec[0]
  828. if sess.lastDecIQSet {
  829. d2Re := math.Abs(2*float64(real(sess.lastDecIQ)) - float64(real(sess.prevDecIQ)) - float64(real(first)))
  830. d2Im := math.Abs(2*float64(imag(sess.lastDecIQ)) - float64(imag(sess.prevDecIQ)) - float64(imag(first)))
  831. d2Mag := math.Hypot(d2Re, d2Im)
  832. if d2Mag > 0.15 {
  833. logging.Warn("boundary", "dec_iq_boundary", "signal", sess.signalID, "d2", d2Mag)
  834. }
  835. }
  836. headN := 16
  837. if len(dec) < headN {
  838. headN = len(dec)
  839. }
  840. tailN := 16
  841. if len(dec) < tailN {
  842. tailN = len(dec)
  843. }
  844. var headSum, tailSum, minMag, maxMag float64
  845. minMag = math.MaxFloat64
  846. for i, v := range dec {
  847. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  848. if mag < minMag {
  849. minMag = mag
  850. }
  851. if mag > maxMag {
  852. maxMag = mag
  853. }
  854. if i < headN {
  855. headSum += mag
  856. }
  857. }
  858. for i := len(dec) - tailN; i < len(dec); i++ {
  859. if i >= 0 {
  860. v := dec[i]
  861. tailSum += math.Hypot(float64(real(v)), float64(imag(v)))
  862. }
  863. }
  864. headAvg := 0.0
  865. if headN > 0 {
  866. headAvg = headSum / float64(headN)
  867. }
  868. tailAvg := 0.0
  869. if tailN > 0 {
  870. tailAvg = tailSum / float64(tailN)
  871. }
  872. logging.Debug("boundary", "dec_iq_meter", "signal", sess.signalID, "len", len(dec), "head_avg", headAvg, "tail_avg", tailAvg, "min_mag", minMag, "max_mag", maxMag)
  873. if tailAvg > 0 {
  874. ratio := headAvg / tailAvg
  875. if ratio < 0.75 || ratio > 1.25 {
  876. logging.Warn("boundary", "dec_iq_head_tail_skew", "signal", sess.signalID, "head_avg", headAvg, "tail_avg", tailAvg, "ratio", ratio)
  877. }
  878. }
  879. probeN := 64
  880. if len(dec) < probeN {
  881. probeN = len(dec)
  882. }
  883. minHeadMag := math.MaxFloat64
  884. minHeadIdx := 0
  885. maxHeadStep := 0.0
  886. maxHeadStepIdx := 0
  887. for i := 0; i < probeN; i++ {
  888. v := dec[i]
  889. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  890. if mag < minHeadMag {
  891. minHeadMag = mag
  892. minHeadIdx = i
  893. }
  894. if i > 0 {
  895. p := dec[i-1]
  896. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  897. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  898. step := math.Abs(math.Atan2(num, den))
  899. if step > maxHeadStep {
  900. maxHeadStep = step
  901. maxHeadStepIdx = i - 1
  902. }
  903. }
  904. }
  905. logging.Debug("boundary", "dec_iq_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  906. if minHeadMag < 0.18 {
  907. logging.Warn("boundary", "dec_iq_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  908. }
  909. if maxHeadStep > 1.5 {
  910. logging.Warn("boundary", "dec_iq_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx, "min_mag", minHeadMag, "min_idx", minHeadIdx)
  911. }
  912. if len(dec) >= 2 {
  913. sess.prevDecIQ = dec[len(dec)-2]
  914. sess.lastDecIQ = dec[len(dec)-1]
  915. } else {
  916. sess.prevDecIQ = sess.lastDecIQ
  917. sess.lastDecIQ = dec[0]
  918. }
  919. sess.lastDecIQSet = true
  920. }
  921. // --- FM/AM/etc Demod ---
  922. audio := d.Demod(dec, actualDemodRate)
  923. if len(audio) == 0 {
  924. return nil, 0
  925. }
  926. if logging.EnabledCategory("boundary") {
  927. stride := d.Channels()
  928. if stride < 1 {
  929. stride = 1
  930. }
  931. nFrames := len(audio) / stride
  932. if nFrames > 0 {
  933. first := float64(audio[0])
  934. if sess.lastDemodSet {
  935. d2 := math.Abs(2*float64(sess.lastDemodL) - sess.prevDemodL - first)
  936. if d2 > 0.15 {
  937. logging.Warn("boundary", "demod_boundary", "signal", sess.signalID, "d2", d2)
  938. }
  939. }
  940. if nFrames >= 2 {
  941. sess.prevDemodL = float64(audio[(nFrames-2)*stride])
  942. sess.lastDemodL = audio[(nFrames-1)*stride]
  943. } else {
  944. sess.prevDemodL = float64(sess.lastDemodL)
  945. sess.lastDemodL = audio[0]
  946. }
  947. sess.lastDemodSet = true
  948. }
  949. }
  950. logging.Debug("boundary", "audio_path", "signal", sess.signalID, "demod", demodName, "actual_rate", actualDemodRate, "audio_len", len(audio), "channels", d.Channels(), "overlap_applied", overlapApplied, "prev_tail_valid", prevTailValid)
  951. shouldDump := !sess.debugDumpStart.IsZero() && !sess.debugDumpUntil.IsZero()
  952. if shouldDump {
  953. now := time.Now()
  954. shouldDump = !now.Before(sess.debugDumpStart) && now.Before(sess.debugDumpUntil)
  955. }
  956. if shouldDump {
  957. sess.demodDump = append(sess.demodDump, audio...)
  958. }
  959. // --- Stateful stereo decode with conservative lock/hysteresis ---
  960. channels := 1
  961. if isWFMStereo {
  962. sess.playbackMode = "WFM_STEREO"
  963. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  964. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  965. if locked {
  966. sess.stereoOnCount++
  967. sess.stereoOffCount = 0
  968. if sess.stereoOnCount >= 4 {
  969. sess.stereoEnabled = true
  970. }
  971. } else {
  972. sess.stereoOnCount = 0
  973. sess.stereoOffCount++
  974. if sess.stereoOffCount >= 10 {
  975. sess.stereoEnabled = false
  976. }
  977. }
  978. prevPlayback := sess.playbackMode
  979. prevStereo := sess.stereoState
  980. if sess.stereoEnabled && len(stereoAudio) > 0 {
  981. sess.stereoState = "locked"
  982. audio = stereoAudio
  983. } else {
  984. sess.stereoState = "mono-fallback"
  985. dual := make([]float32, len(audio)*2)
  986. for i, s := range audio {
  987. dual[i*2] = s
  988. dual[i*2+1] = s
  989. }
  990. audio = dual
  991. }
  992. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  993. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  994. }
  995. }
  996. // --- Polyphase resample to exact 48kHz ---
  997. if actualDemodRate != streamAudioRate {
  998. if channels > 1 {
  999. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  1000. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  1001. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1002. sess.stereoResamplerRate = actualDemodRate
  1003. }
  1004. audio = sess.stereoResampler.Process(audio)
  1005. } else {
  1006. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  1007. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  1008. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1009. sess.monoResamplerRate = actualDemodRate
  1010. }
  1011. audio = sess.monoResampler.Process(audio)
  1012. }
  1013. }
  1014. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  1015. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  1016. tau := sess.deemphasisUs * 1e-6
  1017. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  1018. if channels > 1 {
  1019. nFrames := len(audio) / channels
  1020. yL, yR := sess.deemphL, sess.deemphR
  1021. for i := 0; i < nFrames; i++ {
  1022. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  1023. audio[i*2] = float32(yL)
  1024. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  1025. audio[i*2+1] = float32(yR)
  1026. }
  1027. sess.deemphL, sess.deemphR = yL, yR
  1028. } else {
  1029. y := sess.deemphL
  1030. for i := range audio {
  1031. y = alpha*y + (1-alpha)*float64(audio[i])
  1032. audio[i] = float32(y)
  1033. }
  1034. sess.deemphL = y
  1035. }
  1036. }
  1037. if isWFM {
  1038. for i := range audio {
  1039. audio[i] *= 0.35
  1040. }
  1041. }
  1042. if shouldDump {
  1043. sess.finalDump = append(sess.finalDump, audio...)
  1044. } else if !sess.debugDumpUntil.IsZero() && time.Now().After(sess.debugDumpUntil) && sess.debugDumpBase != "" {
  1045. _ = os.MkdirAll(filepath.Dir(sess.debugDumpBase), 0o755)
  1046. if len(sess.demodDump) > 0 {
  1047. _ = writeWAVFile(sess.debugDumpBase+"-demod.wav", sess.demodDump, actualDemodRate, d.Channels())
  1048. }
  1049. if len(sess.finalDump) > 0 {
  1050. _ = writeWAVFile(sess.debugDumpBase+"-final.wav", sess.finalDump, streamAudioRate, channels)
  1051. }
  1052. logging.Warn("boundary", "debug_audio_dump_window", "signal", sess.signalID, "base", sess.debugDumpBase)
  1053. sess.debugDumpBase = ""
  1054. sess.demodDump = nil
  1055. sess.finalDump = nil
  1056. sess.debugDumpStart = time.Time{}
  1057. sess.debugDumpUntil = time.Time{}
  1058. }
  1059. return audio, streamAudioRate
  1060. }
  1061. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  1062. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  1063. // loopBW is in Hz, sampleRate in samples/sec.
  1064. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  1065. if sampleRate <= 0 || loopBW <= 0 {
  1066. return 0, 0
  1067. }
  1068. bl := loopBW / float64(sampleRate)
  1069. theta := bl / (damping + 0.25/damping)
  1070. d := 1 + 2*damping*theta + theta*theta
  1071. alpha := (4 * damping * theta) / d
  1072. beta := (4 * theta * theta) / d
  1073. return alpha, beta
  1074. }
  1075. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  1076. // Uses persistent FIR filter state across frames for click-free stereo.
  1077. // Reuses session scratch buffers to minimize allocations.
  1078. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  1079. if len(mono) == 0 || sampleRate <= 0 {
  1080. return nil, false
  1081. }
  1082. n := len(mono)
  1083. // Rebuild rate-dependent stereo filters when sampleRate changes
  1084. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  1085. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  1086. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  1087. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  1088. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  1089. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  1090. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  1091. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  1092. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  1093. sess.stereoFilterRate = sampleRate
  1094. // Initialize PLL for 19kHz pilot tracking.
  1095. sess.pilotPhase = 0
  1096. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  1097. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  1098. sess.pilotErrAvg = 0
  1099. sess.pilotI = 0
  1100. sess.pilotQ = 0
  1101. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  1102. }
  1103. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  1104. scratch := sess.growAudio(n * 5)
  1105. lpr := scratch[:n]
  1106. bpfLR := scratch[n : 2*n]
  1107. lr := scratch[2*n : 3*n]
  1108. work1 := scratch[3*n : 4*n]
  1109. work2 := scratch[4*n : 5*n]
  1110. sess.stereoLPF.ProcessInto(mono, lpr)
  1111. // 23-53kHz bandpass for L-R DSB-SC.
  1112. sess.stereoBPHi.ProcessInto(mono, work1)
  1113. sess.stereoBPLo.ProcessInto(mono, work2)
  1114. for i := 0; i < n; i++ {
  1115. bpfLR[i] = work1[i] - work2[i]
  1116. }
  1117. // 19kHz pilot bandpass for PLL.
  1118. sess.pilotLPFHi.ProcessInto(mono, work1)
  1119. sess.pilotLPFLo.ProcessInto(mono, work2)
  1120. for i := 0; i < n; i++ {
  1121. work1[i] = work1[i] - work2[i]
  1122. }
  1123. pilot := work1
  1124. phase := sess.pilotPhase
  1125. freq := sess.pilotFreq
  1126. alpha := sess.pilotAlpha
  1127. beta := sess.pilotBeta
  1128. iState := sess.pilotI
  1129. qState := sess.pilotQ
  1130. lpAlpha := sess.pilotLPAlpha
  1131. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  1132. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  1133. var pilotPower float64
  1134. var totalPower float64
  1135. var errSum float64
  1136. for i := 0; i < n; i++ {
  1137. p := float64(pilot[i])
  1138. sinP, cosP := math.Sincos(phase)
  1139. iMix := p * cosP
  1140. qMix := p * -sinP
  1141. iState += lpAlpha * (iMix - iState)
  1142. qState += lpAlpha * (qMix - qState)
  1143. err := math.Atan2(qState, iState)
  1144. freq += beta * err
  1145. if freq < minFreq {
  1146. freq = minFreq
  1147. } else if freq > maxFreq {
  1148. freq = maxFreq
  1149. }
  1150. phase += freq + alpha*err
  1151. if phase > 2*math.Pi {
  1152. phase -= 2 * math.Pi
  1153. } else if phase < 0 {
  1154. phase += 2 * math.Pi
  1155. }
  1156. totalPower += float64(mono[i]) * float64(mono[i])
  1157. pilotPower += p * p
  1158. errSum += math.Abs(err)
  1159. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  1160. }
  1161. sess.pilotPhase = phase
  1162. sess.pilotFreq = freq
  1163. sess.pilotI = iState
  1164. sess.pilotQ = qState
  1165. blockErr := errSum / float64(n)
  1166. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  1167. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  1168. pilotRatio := 0.0
  1169. if totalPower > 0 {
  1170. pilotRatio = pilotPower / totalPower
  1171. }
  1172. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  1173. // Lock heuristics: pilot power fraction and PLL phase error stability.
  1174. // Pilot power is a small but stable fraction of composite energy; require
  1175. // a modest floor plus PLL settling to avoid flapping in noise.
  1176. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  1177. out := make([]float32, n*2)
  1178. for i := 0; i < n; i++ {
  1179. out[i*2] = 0.5 * (lpr[i] + lr[i])
  1180. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  1181. }
  1182. return out, locked
  1183. }
// dspStateSnapshot captures persistent DSP state for segment splits.
//
// It mirrors the DSP-related fields of streamSession one to one. The snapshot
// is shallow: the overlapIQ slice and all filter/resampler pointers refer to
// the same objects as the session they were captured from.
type dspStateSnapshot struct {
	overlapIQ []complex64 // copy of sess.overlapIQ (slice header only)

	// De-emphasis filter state; deemphL doubles as the mono state.
	deemphL float64
	deemphR float64

	// 19 kHz pilot PLL state used by stereoDecodeStateful.
	pilotPhase   float64 // oscillator phase in radians
	pilotFreq    float64 // oscillator frequency in radians per sample
	pilotAlpha   float64 // proportional loop gain
	pilotBeta    float64 // integral loop gain
	pilotErrAvg  float64 // smoothed mean absolute phase error
	pilotI       float64 // low-passed in-phase mixer product
	pilotQ       float64 // low-passed quadrature mixer product
	pilotLPAlpha float64 // smoothing coefficient for pilotI/pilotQ

	// Audio resamplers and the input rate each one was built for.
	monoResampler       *dsp.Resampler
	monoResamplerRate   int
	stereoResampler     *dsp.StereoResampler
	stereoResamplerRate int

	// Stereo decoder filters and the sample rate they were built for.
	stereoLPF        *dsp.StatefulFIRReal // 15 kHz low-pass for L+R
	stereoFilterRate int
	stereoBPHi       *dsp.StatefulFIRReal // 53 kHz low-pass (upper edge of the L-R band)
	stereoBPLo       *dsp.StatefulFIRReal // 23 kHz low-pass (lower edge of the L-R band)
	stereoLRLPF      *dsp.StatefulFIRReal // 15 kHz low-pass applied to demodulated L-R
	stereoAALPF      *dsp.StatefulFIRReal // NOTE(review): not used in the visible code; presumably an anti-alias filter — confirm
	pilotLPFHi       *dsp.StatefulFIRReal // 21 kHz low-pass (upper edge of the pilot band)
	pilotLPFLo       *dsp.StatefulFIRReal // 17 kHz low-pass (lower edge of the pilot band)

	// Pre-demodulation low-pass/decimation stage and its parameters.
	preDemodFIR        *dsp.StatefulFIRComplex
	preDemodDecim      int
	preDemodRate       int
	preDemodCutoff     float64
	preDemodDecimPhase int
}
  1215. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  1216. return dspStateSnapshot{
  1217. overlapIQ: sess.overlapIQ,
  1218. deemphL: sess.deemphL,
  1219. deemphR: sess.deemphR,
  1220. pilotPhase: sess.pilotPhase,
  1221. pilotFreq: sess.pilotFreq,
  1222. pilotAlpha: sess.pilotAlpha,
  1223. pilotBeta: sess.pilotBeta,
  1224. pilotErrAvg: sess.pilotErrAvg,
  1225. pilotI: sess.pilotI,
  1226. pilotQ: sess.pilotQ,
  1227. pilotLPAlpha: sess.pilotLPAlpha,
  1228. monoResampler: sess.monoResampler,
  1229. monoResamplerRate: sess.monoResamplerRate,
  1230. stereoResampler: sess.stereoResampler,
  1231. stereoResamplerRate: sess.stereoResamplerRate,
  1232. stereoLPF: sess.stereoLPF,
  1233. stereoFilterRate: sess.stereoFilterRate,
  1234. stereoBPHi: sess.stereoBPHi,
  1235. stereoBPLo: sess.stereoBPLo,
  1236. stereoLRLPF: sess.stereoLRLPF,
  1237. stereoAALPF: sess.stereoAALPF,
  1238. pilotLPFHi: sess.pilotLPFHi,
  1239. pilotLPFLo: sess.pilotLPFLo,
  1240. preDemodFIR: sess.preDemodFIR,
  1241. preDemodDecim: sess.preDemodDecim,
  1242. preDemodRate: sess.preDemodRate,
  1243. preDemodCutoff: sess.preDemodCutoff,
  1244. preDemodDecimPhase: sess.preDemodDecimPhase,
  1245. }
  1246. }
  1247. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  1248. sess.overlapIQ = s.overlapIQ
  1249. sess.deemphL = s.deemphL
  1250. sess.deemphR = s.deemphR
  1251. sess.pilotPhase = s.pilotPhase
  1252. sess.pilotFreq = s.pilotFreq
  1253. sess.pilotAlpha = s.pilotAlpha
  1254. sess.pilotBeta = s.pilotBeta
  1255. sess.pilotErrAvg = s.pilotErrAvg
  1256. sess.pilotI = s.pilotI
  1257. sess.pilotQ = s.pilotQ
  1258. sess.pilotLPAlpha = s.pilotLPAlpha
  1259. sess.monoResampler = s.monoResampler
  1260. sess.monoResamplerRate = s.monoResamplerRate
  1261. sess.stereoResampler = s.stereoResampler
  1262. sess.stereoResamplerRate = s.stereoResamplerRate
  1263. sess.stereoLPF = s.stereoLPF
  1264. sess.stereoFilterRate = s.stereoFilterRate
  1265. sess.stereoBPHi = s.stereoBPHi
  1266. sess.stereoBPLo = s.stereoBPLo
  1267. sess.stereoLRLPF = s.stereoLRLPF
  1268. sess.stereoAALPF = s.stereoAALPF
  1269. sess.pilotLPFHi = s.pilotLPFHi
  1270. sess.pilotLPFLo = s.pilotLPFLo
  1271. sess.preDemodFIR = s.preDemodFIR
  1272. sess.preDemodDecim = s.preDemodDecim
  1273. sess.preDemodRate = s.preDemodRate
  1274. sess.preDemodCutoff = s.preDemodCutoff
  1275. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1276. }
  1277. // ---------------------------------------------------------------------------
  1278. // Session management helpers
  1279. // ---------------------------------------------------------------------------
  1280. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1281. outputDir := st.policy.OutputDir
  1282. if outputDir == "" {
  1283. outputDir = "data/recordings"
  1284. }
  1285. demodName, channels := resolveDemod(sig)
  1286. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1287. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1288. dir := filepath.Join(outputDir, dirName)
  1289. if err := os.MkdirAll(dir, 0o755); err != nil {
  1290. return nil, err
  1291. }
  1292. wavPath := filepath.Join(dir, "audio.wav")
  1293. f, err := os.Create(wavPath)
  1294. if err != nil {
  1295. return nil, err
  1296. }
  1297. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1298. f.Close()
  1299. return nil, err
  1300. }
  1301. playbackMode, stereoState := initialPlaybackState(demodName)
  1302. sess := &streamSession{
  1303. signalID: sig.ID,
  1304. centerHz: sig.CenterHz,
  1305. bwHz: sig.BWHz,
  1306. snrDb: sig.SNRDb,
  1307. peakDb: sig.PeakDb,
  1308. class: sig.Class,
  1309. startTime: now,
  1310. lastFeed: now,
  1311. dir: dir,
  1312. wavFile: f,
  1313. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1314. sampleRate: streamAudioRate,
  1315. channels: channels,
  1316. demodName: demodName,
  1317. playbackMode: playbackMode,
  1318. stereoState: stereoState,
  1319. deemphasisUs: st.policy.DeemphasisUs,
  1320. }
  1321. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1322. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1323. return sess, nil
  1324. }
  1325. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1326. demodName, channels := resolveDemod(sig)
  1327. for _, pl := range st.pendingListens {
  1328. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1329. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1330. demodName = requested
  1331. if demodName == "WFM_STEREO" {
  1332. channels = 2
  1333. } else if d := demod.Get(demodName); d != nil {
  1334. channels = d.Channels()
  1335. } else {
  1336. channels = 1
  1337. }
  1338. break
  1339. }
  1340. }
  1341. }
  1342. playbackMode, stereoState := initialPlaybackState(demodName)
  1343. sess := &streamSession{
  1344. signalID: sig.ID,
  1345. centerHz: sig.CenterHz,
  1346. bwHz: sig.BWHz,
  1347. snrDb: sig.SNRDb,
  1348. peakDb: sig.PeakDb,
  1349. class: sig.Class,
  1350. startTime: now,
  1351. lastFeed: now,
  1352. listenOnly: true,
  1353. sampleRate: streamAudioRate,
  1354. channels: channels,
  1355. demodName: demodName,
  1356. playbackMode: playbackMode,
  1357. stereoState: stereoState,
  1358. deemphasisUs: st.policy.DeemphasisUs,
  1359. }
  1360. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1361. sig.ID, sig.CenterHz/1e6, demodName)
  1362. return sess
  1363. }
  1364. func resolveDemod(sig *detector.Signal) (string, int) {
  1365. demodName := "NFM"
  1366. if sig.Class != nil {
  1367. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1368. demodName = n
  1369. }
  1370. }
  1371. channels := 1
  1372. if demodName == "WFM_STEREO" {
  1373. channels = 2
  1374. } else if d := demod.Get(demodName); d != nil {
  1375. channels = d.Channels()
  1376. }
  1377. return demodName, channels
  1378. }
  1379. func initialPlaybackState(demodName string) (string, string) {
  1380. playbackMode := demodName
  1381. stereoState := "mono"
  1382. if demodName == "WFM_STEREO" {
  1383. stereoState = "searching"
  1384. }
  1385. return playbackMode, stereoState
  1386. }
  1387. func (sess *streamSession) audioInfo() AudioInfo {
  1388. return AudioInfo{
  1389. SampleRate: sess.sampleRate,
  1390. Channels: sess.channels,
  1391. Format: "s16le",
  1392. DemodName: sess.demodName,
  1393. PlaybackMode: sess.playbackMode,
  1394. StereoState: sess.stereoState,
  1395. }
  1396. }
  1397. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1398. infoJSON, _ := json.Marshal(info)
  1399. tagged := make([]byte, 1+len(infoJSON))
  1400. tagged[0] = 0x00 // tag: audio_info
  1401. copy(tagged[1:], infoJSON)
  1402. for _, sub := range subs {
  1403. select {
  1404. case sub.ch <- tagged:
  1405. default:
  1406. }
  1407. }
  1408. }
  1409. func defaultAudioInfoForMode(mode string) AudioInfo {
  1410. demodName := "NFM"
  1411. if requested := normalizeRequestedMode(mode); requested != "" {
  1412. demodName = requested
  1413. }
  1414. channels := 1
  1415. if demodName == "WFM_STEREO" {
  1416. channels = 2
  1417. } else if d := demod.Get(demodName); d != nil {
  1418. channels = d.Channels()
  1419. }
  1420. playbackMode, stereoState := initialPlaybackState(demodName)
  1421. return AudioInfo{
  1422. SampleRate: streamAudioRate,
  1423. Channels: channels,
  1424. Format: "s16le",
  1425. DemodName: demodName,
  1426. PlaybackMode: playbackMode,
  1427. StereoState: stereoState,
  1428. }
  1429. }
  1430. func normalizeRequestedMode(mode string) string {
  1431. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1432. case "", "AUTO":
  1433. return ""
  1434. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1435. return strings.ToUpper(strings.TrimSpace(mode))
  1436. default:
  1437. return ""
  1438. }
  1439. }
  1440. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1441. func (sess *streamSession) growIQ(n int) []complex64 {
  1442. if cap(sess.scratchIQ) >= n {
  1443. return sess.scratchIQ[:n]
  1444. }
  1445. sess.scratchIQ = make([]complex64, n, n*5/4)
  1446. return sess.scratchIQ
  1447. }
  1448. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1449. func (sess *streamSession) growAudio(n int) []float32 {
  1450. if cap(sess.scratchAudio) >= n {
  1451. return sess.scratchAudio[:n]
  1452. }
  1453. sess.scratchAudio = make([]float32, n, n*5/4)
  1454. return sess.scratchAudio
  1455. }
  1456. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1457. func (sess *streamSession) growPCM(n int) []byte {
  1458. if cap(sess.scratchPCM) >= n {
  1459. return sess.scratchPCM[:n]
  1460. }
  1461. sess.scratchPCM = make([]byte, n, n*5/4)
  1462. return sess.scratchPCM
  1463. }
  1464. func convertToListenOnly(sess *streamSession) {
  1465. if sess.wavBuf != nil {
  1466. _ = sess.wavBuf.Flush()
  1467. }
  1468. if sess.wavFile != nil {
  1469. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1470. sess.wavFile.Close()
  1471. }
  1472. sess.wavFile = nil
  1473. sess.wavBuf = nil
  1474. sess.listenOnly = true
  1475. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1476. }
  1477. func closeSession(sess *streamSession, policy *Policy) {
  1478. if sess.listenOnly {
  1479. return
  1480. }
  1481. if sess.wavBuf != nil {
  1482. _ = sess.wavBuf.Flush()
  1483. }
  1484. if sess.wavFile != nil {
  1485. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1486. sess.wavFile.Close()
  1487. sess.wavFile = nil
  1488. sess.wavBuf = nil
  1489. }
  1490. dur := sess.lastFeed.Sub(sess.startTime)
  1491. files := map[string]any{
  1492. "audio": "audio.wav",
  1493. "audio_sample_rate": sess.sampleRate,
  1494. "audio_channels": sess.channels,
  1495. "audio_demod": sess.demodName,
  1496. "recording_mode": "streaming",
  1497. }
  1498. meta := Meta{
  1499. EventID: sess.signalID,
  1500. Start: sess.startTime,
  1501. End: sess.lastFeed,
  1502. CenterHz: sess.centerHz,
  1503. BandwidthHz: sess.bwHz,
  1504. SampleRate: sess.sampleRate,
  1505. SNRDb: sess.snrDb,
  1506. PeakDb: sess.peakDb,
  1507. Class: sess.class,
  1508. DurationMs: dur.Milliseconds(),
  1509. Files: files,
  1510. }
  1511. b, err := json.MarshalIndent(meta, "", " ")
  1512. if err == nil {
  1513. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1514. }
  1515. if policy != nil {
  1516. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1517. }
  1518. }
  1519. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1520. if len(sess.audioSubs) == 0 {
  1521. return
  1522. }
  1523. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1524. tagged := make([]byte, 1+pcmLen)
  1525. tagged[0] = 0x01
  1526. copy(tagged[1:], pcm[:pcmLen])
  1527. alive := sess.audioSubs[:0]
  1528. for _, sub := range sess.audioSubs {
  1529. select {
  1530. case sub.ch <- tagged:
  1531. default:
  1532. st.droppedPCM++
  1533. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1534. }
  1535. alive = append(alive, sub)
  1536. }
  1537. sess.audioSubs = alive
  1538. }
  1539. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1540. if len(st.policy.ClassFilter) == 0 {
  1541. return true
  1542. }
  1543. if cls == nil {
  1544. return false
  1545. }
  1546. for _, f := range st.policy.ClassFilter {
  1547. if strings.EqualFold(f, string(cls.ModType)) {
  1548. return true
  1549. }
  1550. }
  1551. return false
  1552. }
// ErrNoSession is returned when no matching signal session exists, i.e.
// there is neither an active nor a pending session for the requested
// frequency. It is a sentinel value; compare against it with errors.Is.
var ErrNoSession = errors.New("no active or pending session for this frequency")
  1555. // ---------------------------------------------------------------------------
  1556. // WAV header helpers
  1557. // ---------------------------------------------------------------------------
  1558. func writeWAVFile(path string, audio []float32, sampleRate int, channels int) error {
  1559. f, err := os.Create(path)
  1560. if err != nil {
  1561. return err
  1562. }
  1563. defer f.Close()
  1564. return writeWAVTo(f, audio, sampleRate, channels)
  1565. }
  1566. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1567. if channels <= 0 {
  1568. channels = 1
  1569. }
  1570. hdr := make([]byte, 44)
  1571. copy(hdr[0:4], "RIFF")
  1572. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1573. copy(hdr[8:12], "WAVE")
  1574. copy(hdr[12:16], "fmt ")
  1575. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1576. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1577. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1578. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1579. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1580. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1581. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1582. copy(hdr[36:40], "data")
  1583. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1584. _, err := f.Write(hdr)
  1585. return err
  1586. }
  1587. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1588. dataSize := uint32(totalSamples * 2)
  1589. var buf [4]byte
  1590. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1591. if _, err := f.Seek(4, 0); err != nil {
  1592. return
  1593. }
  1594. _, _ = f.Write(buf[:])
  1595. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1596. if _, err := f.Seek(24, 0); err != nil {
  1597. return
  1598. }
  1599. _, _ = f.Write(buf[:])
  1600. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1601. if _, err := f.Seek(28, 0); err != nil {
  1602. return
  1603. }
  1604. _, _ = f.Write(buf[:])
  1605. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1606. if _, err := f.Seek(40, 0); err != nil {
  1607. return
  1608. }
  1609. _, _ = f.Write(buf[:])
  1610. }
  1611. // ResetStreams forces all active streaming sessions to discard their FIR states and decimation phases.
  1612. // This is used when the upstream DSP drops samples, creating a hard break in phase continuity.
  1613. func (st *Streamer) ResetStreams() {
  1614. st.mu.Lock()
  1615. defer st.mu.Unlock()
  1616. for _, sess := range st.sessions {
  1617. sess.preDemodFIR = nil
  1618. sess.preDemodDecimPhase = 0
  1619. sess.stereoResampler = nil
  1620. sess.monoResampler = nil
  1621. }
  1622. }