Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

1781 строка
53KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "sdr-wideband-suite/internal/classifier"
  17. "sdr-wideband-suite/internal/demod"
  18. "sdr-wideband-suite/internal/detector"
  19. "sdr-wideband-suite/internal/dsp"
  20. "sdr-wideband-suite/internal/logging"
  21. )
  22. // ---------------------------------------------------------------------------
  23. // streamSession — one open demod session for one signal
  24. // ---------------------------------------------------------------------------
  25. type streamSession struct {
  26. signalID int64
  27. centerHz float64
  28. bwHz float64
  29. snrDb float64
  30. peakDb float64
  31. class *classifier.Classification
  32. startTime time.Time
  33. lastFeed time.Time
  34. playbackMode string
  35. stereoState string
  36. lastAudioTs time.Time
  37. debugDumpStart time.Time
  38. debugDumpUntil time.Time
  39. debugDumpBase string
  40. demodDump []float32
  41. finalDump []float32
  42. lastAudioL float32
  43. lastAudioR float32
  44. prevAudioL float64 // second-to-last L sample for boundary transient detection
  45. lastAudioSet bool
  46. lastDecIQ complex64
  47. prevDecIQ complex64
  48. lastDecIQSet bool
  49. lastDemodL float32
  50. prevDemodL float64
  51. lastDemodSet bool
  52. // listenOnly sessions have no WAV file and no disk I/O.
  53. // They exist solely to feed audio to live-listen subscribers.
  54. listenOnly bool
  55. // Recording state (nil/zero for listen-only sessions)
  56. dir string
  57. wavFile *os.File
  58. wavBuf *bufio.Writer
  59. wavSamples int64
  60. segmentIdx int
  61. sampleRate int // actual output audio sample rate (always streamAudioRate)
  62. channels int
  63. demodName string
  64. // --- Persistent DSP state for click-free streaming ---
  65. // Overlap-save: tail of previous extracted IQ snippet.
  66. // Currently unused for live demod after removing the extra discriminator
  67. // overlap prepend, but kept in DSP snapshot state for compatibility.
  68. overlapIQ []complex64
  69. // De-emphasis IIR state (persists across frames)
  70. deemphL float64
  71. deemphR float64
  72. // Stereo lock state for live WFM streaming
  73. stereoEnabled bool
  74. stereoOnCount int
  75. stereoOffCount int
  76. // Pilot-locked stereo PLL state (19kHz pilot)
  77. pilotPhase float64
  78. pilotFreq float64
  79. pilotAlpha float64
  80. pilotBeta float64
  81. pilotErrAvg float64
  82. pilotI float64
  83. pilotQ float64
  84. pilotLPAlpha float64
  85. // Polyphase resampler (replaces integer-decimate hack)
  86. monoResampler *dsp.Resampler
  87. monoResamplerRate int
  88. stereoResampler *dsp.StereoResampler
  89. stereoResamplerRate int
  90. // AQ-4: Stateful FIR filters for click-free stereo decode
  91. stereoFilterRate int
  92. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  93. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  94. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  95. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  96. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  97. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  98. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  99. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  100. // and avoids per-frame FIR recomputation)
  101. preDemodFIR *dsp.StatefulFIRComplex
  102. preDemodDecimator *dsp.StatefulDecimatingFIRComplex
  103. preDemodDecim int // cached decimation factor
  104. preDemodRate int // cached snipRate this FIR was built for
  105. preDemodCutoff float64 // cached cutoff
  106. preDemodDecimPhase int // retained for backward compatibility in snapshots/debug
  107. // AQ-2: De-emphasis config (µs, 0 = disabled)
  108. deemphasisUs float64
  109. // Scratch buffers — reused across frames to avoid GC pressure.
  110. // Grown as needed, never shrunk.
  111. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  112. scratchAudio []float32 // for stereo decode intermediates
  113. scratchPCM []byte // for PCM encoding
  114. // live-listen subscribers
  115. audioSubs []audioSub
  116. }
  117. type audioSub struct {
  118. id int64
  119. ch chan []byte
  120. }
  121. type RuntimeSignalInfo struct {
  122. DemodName string
  123. PlaybackMode string
  124. StereoState string
  125. Channels int
  126. SampleRate int
  127. }
  128. // AudioInfo describes the audio format of a live-listen subscription.
  129. // Sent to the WebSocket client as the first message.
  130. type AudioInfo struct {
  131. SampleRate int `json:"sample_rate"`
  132. Channels int `json:"channels"`
  133. Format string `json:"format"` // always "s16le"
  134. DemodName string `json:"demod"`
  135. PlaybackMode string `json:"playback_mode,omitempty"`
  136. StereoState string `json:"stereo_state,omitempty"`
  137. }
  138. const (
  139. streamAudioRate = 48000
  140. resamplerTaps = 32 // taps per polyphase arm — good quality
  141. )
  142. var debugDumpDelay = func() time.Duration {
  143. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DELAY_SECONDS"))
  144. if raw == "" {
  145. return 5 * time.Second
  146. }
  147. v, err := strconv.Atoi(raw)
  148. if err != nil || v < 0 {
  149. return 5 * time.Second
  150. }
  151. return time.Duration(v) * time.Second
  152. }()
  153. var debugDumpDuration = func() time.Duration {
  154. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DURATION_SECONDS"))
  155. if raw == "" {
  156. return 15 * time.Second
  157. }
  158. v, err := strconv.Atoi(raw)
  159. if err != nil || v <= 0 {
  160. return 15 * time.Second
  161. }
  162. return time.Duration(v) * time.Second
  163. }()
  164. var audioDumpEnabled = func() bool {
  165. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_AUDIO_DUMP_ENABLED"))
  166. if raw == "" {
  167. return false
  168. }
  169. v, err := strconv.ParseBool(raw)
  170. if err != nil {
  171. return false
  172. }
  173. return v
  174. }()
  175. var decHeadTrimSamples = func() int {
  176. raw := strings.TrimSpace(os.Getenv("SDR_DEC_HEAD_TRIM"))
  177. if raw == "" {
  178. return 0
  179. }
  180. v, err := strconv.Atoi(raw)
  181. if err != nil || v < 0 {
  182. return 0
  183. }
  184. return v
  185. }()
  186. // ---------------------------------------------------------------------------
  187. // Streamer — manages all active streaming sessions
  188. // ---------------------------------------------------------------------------
  189. type streamFeedItem struct {
  190. signal detector.Signal
  191. snippet []complex64
  192. snipRate int
  193. }
  194. type streamFeedMsg struct {
  195. traceID uint64
  196. items []streamFeedItem
  197. }
  198. type Streamer struct {
  199. mu sync.Mutex
  200. sessions map[int64]*streamSession
  201. policy Policy
  202. centerHz float64
  203. nextSub int64
  204. feedCh chan streamFeedMsg
  205. done chan struct{}
  206. droppedFeed uint64
  207. droppedPCM uint64
  208. lastFeedTS time.Time
  209. lastProcTS time.Time
  210. // pendingListens are subscribers waiting for a matching session.
  211. pendingListens map[int64]*pendingListen
  212. }
  213. type pendingListen struct {
  214. freq float64
  215. bw float64
  216. mode string
  217. ch chan []byte
  218. }
  219. func newStreamer(policy Policy, centerHz float64) *Streamer {
  220. st := &Streamer{
  221. sessions: make(map[int64]*streamSession),
  222. policy: policy,
  223. centerHz: centerHz,
  224. feedCh: make(chan streamFeedMsg, 2),
  225. done: make(chan struct{}),
  226. pendingListens: make(map[int64]*pendingListen),
  227. }
  228. go st.worker()
  229. return st
  230. }
  231. func (st *Streamer) worker() {
  232. for msg := range st.feedCh {
  233. st.processFeed(msg)
  234. }
  235. close(st.done)
  236. }
  237. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  238. st.mu.Lock()
  239. defer st.mu.Unlock()
  240. wasEnabled := st.policy.Enabled
  241. st.policy = policy
  242. st.centerHz = centerHz
  243. // If recording was just disabled, close recording sessions
  244. // but keep listen-only sessions alive.
  245. if wasEnabled && !policy.Enabled {
  246. for id, sess := range st.sessions {
  247. if sess.listenOnly {
  248. continue
  249. }
  250. if len(sess.audioSubs) > 0 {
  251. // Convert to listen-only: close WAV but keep session
  252. convertToListenOnly(sess)
  253. } else {
  254. closeSession(sess, &st.policy)
  255. delete(st.sessions, id)
  256. }
  257. }
  258. }
  259. }
  260. // HasListeners returns true if any sessions have audio subscribers
  261. // or there are pending listen requests. Used by the DSP loop to
  262. // decide whether to feed snippets even when recording is disabled.
  263. func (st *Streamer) HasListeners() bool {
  264. st.mu.Lock()
  265. defer st.mu.Unlock()
  266. return st.hasListenersLocked()
  267. }
  268. func (st *Streamer) hasListenersLocked() bool {
  269. if len(st.pendingListens) > 0 {
  270. return true
  271. }
  272. for _, sess := range st.sessions {
  273. if len(sess.audioSubs) > 0 {
  274. return true
  275. }
  276. }
  277. return false
  278. }
  279. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  280. // Feeds are accepted if:
  281. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  282. // - Any live-listen subscribers exist (listen-only mode)
  283. //
  284. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  285. // data, so items can be passed directly without another copy.
  286. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  287. st.mu.Lock()
  288. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  289. hasListeners := st.hasListenersLocked()
  290. pending := len(st.pendingListens)
  291. debugLiveAudio := st.policy.DebugLiveAudio
  292. now := time.Now()
  293. if !st.lastFeedTS.IsZero() {
  294. gap := now.Sub(st.lastFeedTS)
  295. if gap > 150*time.Millisecond {
  296. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  297. }
  298. }
  299. st.lastFeedTS = now
  300. st.mu.Unlock()
  301. if debugLiveAudio {
  302. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  303. }
  304. if (!recEnabled && !hasListeners) || len(items) == 0 {
  305. return
  306. }
  307. select {
  308. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}:
  309. default:
  310. st.droppedFeed++
  311. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  312. }
  313. }
  314. // processFeed runs in the worker goroutine.
  315. func (st *Streamer) processFeed(msg streamFeedMsg) {
  316. st.mu.Lock()
  317. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  318. hasListeners := st.hasListenersLocked()
  319. now := time.Now()
  320. if !st.lastProcTS.IsZero() {
  321. gap := now.Sub(st.lastProcTS)
  322. if gap > 150*time.Millisecond {
  323. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  324. }
  325. }
  326. st.lastProcTS = now
  327. defer st.mu.Unlock()
  328. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  329. if !recEnabled && !hasListeners {
  330. return
  331. }
  332. seen := make(map[int64]bool, len(msg.items))
  333. for i := range msg.items {
  334. item := &msg.items[i]
  335. sig := &item.signal
  336. seen[sig.ID] = true
  337. if sig.ID == 0 || sig.Class == nil {
  338. continue
  339. }
  340. if len(item.snippet) == 0 || item.snipRate <= 0 {
  341. continue
  342. }
  343. // Decide whether this signal needs a session
  344. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  345. needsListen := st.signalHasListenerLocked(sig)
  346. className := "<nil>"
  347. demodName := ""
  348. if sig.Class != nil {
  349. className = string(sig.Class.ModType)
  350. demodName, _ = resolveDemod(sig)
  351. }
  352. if st.policy.DebugLiveAudio {
  353. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  354. }
  355. if !needsRecording && !needsListen {
  356. continue
  357. }
  358. sess, exists := st.sessions[sig.ID]
  359. requestedMode := ""
  360. for _, pl := range st.pendingListens {
  361. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  362. if m := normalizeRequestedMode(pl.mode); m != "" {
  363. requestedMode = m
  364. break
  365. }
  366. }
  367. }
  368. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  369. for _, sub := range sess.audioSubs {
  370. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  371. }
  372. delete(st.sessions, sig.ID)
  373. sess = nil
  374. exists = false
  375. }
  376. if !exists {
  377. if needsRecording {
  378. s, err := st.openRecordingSession(sig, now)
  379. if err != nil {
  380. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  381. sig.ID, sig.CenterHz/1e6, err)
  382. continue
  383. }
  384. st.sessions[sig.ID] = s
  385. sess = s
  386. } else {
  387. s := st.openListenSession(sig, now)
  388. st.sessions[sig.ID] = s
  389. sess = s
  390. }
  391. // Attach any pending listeners
  392. st.attachPendingListeners(sess)
  393. }
  394. // Update metadata
  395. sess.lastFeed = now
  396. sess.centerHz = sig.CenterHz
  397. sess.bwHz = sig.BWHz
  398. if sig.SNRDb > sess.snrDb {
  399. sess.snrDb = sig.SNRDb
  400. }
  401. if sig.PeakDb > sess.peakDb {
  402. sess.peakDb = sig.PeakDb
  403. }
  404. if sig.Class != nil {
  405. sess.class = sig.Class
  406. }
  407. // Demod with persistent state
  408. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  409. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  410. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  411. if len(audio) == 0 {
  412. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  413. }
  414. if len(audio) > 0 {
  415. if sess.wavSamples == 0 && audioRate > 0 {
  416. sess.sampleRate = audioRate
  417. }
  418. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  419. pcmLen := len(audio) * 2
  420. pcm := sess.growPCM(pcmLen)
  421. for k, s := range audio {
  422. v := int16(clip(s * 32767))
  423. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  424. }
  425. if !sess.listenOnly && sess.wavBuf != nil {
  426. n, err := sess.wavBuf.Write(pcm)
  427. if err != nil {
  428. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  429. } else {
  430. sess.wavSamples += int64(n / 2)
  431. }
  432. }
  433. // Gap logging for live-audio sessions + transient click detector
  434. if len(sess.audioSubs) > 0 {
  435. if !sess.lastAudioTs.IsZero() {
  436. gap := time.Since(sess.lastAudioTs)
  437. if gap > 150*time.Millisecond {
  438. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  439. }
  440. }
  441. // Transient click detector: finds short impulses (1-3 samples)
  442. // that deviate sharply from the local signal trend.
  443. // A click looks like: ...smooth... SPIKE ...smooth...
  444. // Normal FM audio has large deltas too, but they follow
  445. // a continuous curve. A click has high |d2/dt2| (acceleration).
  446. //
  447. // Method: second-derivative detector. For each sample triplet
  448. // (a, b, c), compute |2b - a - c| which is the discrete
  449. // second derivative magnitude. High values = transient spike.
  450. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  451. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  452. stride := sess.channels
  453. if stride < 1 {
  454. stride = 1
  455. }
  456. nFrames := len(audio) / stride
  457. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  458. if sess.lastAudioSet && nFrames >= 1 {
  459. // second derivative across boundary: |2*last - prevLast - first|
  460. first := float64(audio[0])
  461. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  462. if d2 > 0.15 {
  463. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  464. }
  465. }
  466. // Intra-frame transient scan (L channel only for performance)
  467. nClicks := 0
  468. maxD2 := float64(0)
  469. maxD2Pos := 0
  470. for k := 1; k < nFrames-1; k++ {
  471. a := float64(audio[(k-1)*stride])
  472. b := float64(audio[k*stride])
  473. c := float64(audio[(k+1)*stride])
  474. d2 := math.Abs(2*b - a - c)
  475. if d2 > maxD2 {
  476. maxD2 = d2
  477. maxD2Pos = k
  478. }
  479. if d2 > 0.15 {
  480. nClicks++
  481. }
  482. }
  483. if nClicks > 0 {
  484. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  485. }
  486. // Store last two samples for next frame's boundary check
  487. if nFrames >= 2 {
  488. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  489. sess.lastAudioL = audio[(nFrames-1)*stride]
  490. if stride > 1 {
  491. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  492. }
  493. } else if nFrames == 1 {
  494. sess.prevAudioL = float64(sess.lastAudioL)
  495. sess.lastAudioL = audio[0]
  496. if stride > 1 && len(audio) >= 2 {
  497. sess.lastAudioR = audio[1]
  498. }
  499. }
  500. sess.lastAudioSet = true
  501. }
  502. sess.lastAudioTs = time.Now()
  503. }
  504. st.fanoutPCM(sess, pcm, pcmLen)
  505. }
  506. // Segment split (recording sessions only)
  507. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  508. segIdx := sess.segmentIdx + 1
  509. oldSubs := sess.audioSubs
  510. oldState := sess.captureDSPState()
  511. sess.audioSubs = nil
  512. closeSession(sess, &st.policy)
  513. s, err := st.openRecordingSession(sig, now)
  514. if err != nil {
  515. delete(st.sessions, sig.ID)
  516. continue
  517. }
  518. s.segmentIdx = segIdx
  519. s.audioSubs = oldSubs
  520. s.restoreDSPState(oldState)
  521. st.sessions[sig.ID] = s
  522. }
  523. }
  524. // Close sessions for disappeared signals (with grace period)
  525. for id, sess := range st.sessions {
  526. if seen[id] {
  527. continue
  528. }
  529. gracePeriod := 3 * time.Second
  530. if sess.listenOnly {
  531. gracePeriod = 5 * time.Second
  532. }
  533. if now.Sub(sess.lastFeed) > gracePeriod {
  534. for _, sub := range sess.audioSubs {
  535. close(sub.ch)
  536. }
  537. sess.audioSubs = nil
  538. if !sess.listenOnly {
  539. closeSession(sess, &st.policy)
  540. }
  541. delete(st.sessions, id)
  542. }
  543. }
  544. }
  545. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  546. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  547. if st.policy.DebugLiveAudio {
  548. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  549. }
  550. return true
  551. }
  552. for subID, pl := range st.pendingListens {
  553. delta := math.Abs(sig.CenterHz - pl.freq)
  554. if delta < 200000 {
  555. if st.policy.DebugLiveAudio {
  556. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  557. }
  558. return true
  559. }
  560. }
  561. return false
  562. }
  563. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  564. for subID, pl := range st.pendingListens {
  565. requestedMode := normalizeRequestedMode(pl.mode)
  566. if requestedMode != "" && sess.demodName != requestedMode {
  567. continue
  568. }
  569. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  570. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  571. delete(st.pendingListens, subID)
  572. // Send updated audio_info now that we know the real session params.
  573. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  574. infoJSON, _ := json.Marshal(sess.audioInfo())
  575. tagged := make([]byte, 1+len(infoJSON))
  576. tagged[0] = 0x00 // tag: audio_info
  577. copy(tagged[1:], infoJSON)
  578. select {
  579. case pl.ch <- tagged:
  580. default:
  581. }
  582. if audioDumpEnabled {
  583. now := time.Now()
  584. sess.debugDumpStart = now.Add(debugDumpDelay)
  585. sess.debugDumpUntil = sess.debugDumpStart.Add(debugDumpDuration)
  586. sess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", sess.signalID, now.Format("20060102-150405")))
  587. sess.demodDump = nil
  588. sess.finalDump = nil
  589. }
  590. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  591. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  592. if audioDumpEnabled {
  593. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", sess.signalID, sess.debugDumpStart.Format(time.RFC3339), sess.debugDumpUntil.Format(time.RFC3339))
  594. }
  595. }
  596. }
  597. }
  598. // CloseAll finalises all sessions and stops the worker goroutine.
  599. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  600. st.mu.Lock()
  601. defer st.mu.Unlock()
  602. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  603. for _, sess := range st.sessions {
  604. out[sess.signalID] = RuntimeSignalInfo{
  605. DemodName: sess.demodName,
  606. PlaybackMode: sess.playbackMode,
  607. StereoState: sess.stereoState,
  608. Channels: sess.channels,
  609. SampleRate: sess.sampleRate,
  610. }
  611. }
  612. return out
  613. }
  614. func (st *Streamer) CloseAll() {
  615. close(st.feedCh)
  616. <-st.done
  617. st.mu.Lock()
  618. defer st.mu.Unlock()
  619. for id, sess := range st.sessions {
  620. for _, sub := range sess.audioSubs {
  621. close(sub.ch)
  622. }
  623. sess.audioSubs = nil
  624. if !sess.listenOnly {
  625. closeSession(sess, &st.policy)
  626. }
  627. delete(st.sessions, id)
  628. }
  629. for _, pl := range st.pendingListens {
  630. close(pl.ch)
  631. }
  632. st.pendingListens = nil
  633. }
  634. // ActiveSessions returns the number of open streaming sessions.
  635. func (st *Streamer) ActiveSessions() int {
  636. st.mu.Lock()
  637. defer st.mu.Unlock()
  638. return len(st.sessions)
  639. }
  640. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  641. //
  642. // LL-2: Returns AudioInfo with correct channels and sample rate.
  643. // LL-3: Returns error only on hard failures (nil streamer etc).
  644. //
  645. // If a matching session exists, attaches immediately. Otherwise, the
  646. // subscriber is held as "pending" and will be attached when a matching
  647. // signal appears in the next DSP frame.
  648. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  649. ch := make(chan []byte, 64)
  650. st.mu.Lock()
  651. defer st.mu.Unlock()
  652. st.nextSub++
  653. subID := st.nextSub
  654. requestedMode := normalizeRequestedMode(mode)
  655. // Try to find a matching session
  656. var bestSess *streamSession
  657. bestDist := math.MaxFloat64
  658. for _, sess := range st.sessions {
  659. if requestedMode != "" && sess.demodName != requestedMode {
  660. continue
  661. }
  662. d := math.Abs(sess.centerHz - freq)
  663. if d < bestDist {
  664. bestDist = d
  665. bestSess = sess
  666. }
  667. }
  668. if bestSess != nil && bestDist < 200000 {
  669. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  670. if audioDumpEnabled {
  671. now := time.Now()
  672. bestSess.debugDumpStart = now.Add(debugDumpDelay)
  673. bestSess.debugDumpUntil = bestSess.debugDumpStart.Add(debugDumpDuration)
  674. bestSess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", bestSess.signalID, now.Format("20060102-150405")))
  675. bestSess.demodDump = nil
  676. bestSess.finalDump = nil
  677. }
  678. info := bestSess.audioInfo()
  679. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  680. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  681. if audioDumpEnabled {
  682. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", bestSess.signalID, bestSess.debugDumpStart.Format(time.RFC3339), bestSess.debugDumpUntil.Format(time.RFC3339))
  683. }
  684. return subID, ch, info, nil
  685. }
  686. // No matching session yet — add as pending listener
  687. st.pendingListens[subID] = &pendingListen{
  688. freq: freq,
  689. bw: bw,
  690. mode: mode,
  691. ch: ch,
  692. }
  693. info := defaultAudioInfoForMode(mode)
  694. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  695. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  696. return subID, ch, info, nil
  697. }
  698. // UnsubscribeAudio removes a live-listen subscriber.
  699. func (st *Streamer) UnsubscribeAudio(subID int64) {
  700. st.mu.Lock()
  701. defer st.mu.Unlock()
  702. if pl, ok := st.pendingListens[subID]; ok {
  703. close(pl.ch)
  704. delete(st.pendingListens, subID)
  705. return
  706. }
  707. for _, sess := range st.sessions {
  708. for i, sub := range sess.audioSubs {
  709. if sub.id == subID {
  710. close(sub.ch)
  711. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  712. return
  713. }
  714. }
  715. }
  716. }
  717. // ---------------------------------------------------------------------------
  718. // Session: stateful extraction + demod
  719. // ---------------------------------------------------------------------------
  720. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  721. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  722. // output with zero transient artifacts.
  723. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  724. if len(snippet) == 0 || snipRate <= 0 {
  725. return nil, 0
  726. }
  727. isWFMStereo := sess.demodName == "WFM_STEREO"
  728. isWFM := sess.demodName == "WFM" || isWFMStereo
  729. demodName := sess.demodName
  730. if isWFMStereo {
  731. demodName = "WFM"
  732. }
  733. d := demod.Get(demodName)
  734. if d == nil {
  735. d = demod.Get("NFM")
  736. }
  737. if d == nil {
  738. return nil, 0
  739. }
  740. // The extra 1-sample discriminator overlap prepend was removed after it was
  741. // shown to shift the downstream decimation phase and create heavy click
  742. // artifacts in steady-state streaming/recording. The upstream extraction path
  743. // and the stateful FIR/decimation stages already provide continuity.
  744. fullSnip := snippet
  745. overlapApplied := false
  746. prevTailValid := false
  747. if logging.EnabledCategory("prefir") && len(fullSnip) > 0 {
  748. probeN := 64
  749. if len(fullSnip) < probeN {
  750. probeN = len(fullSnip)
  751. }
  752. minPreMag := math.MaxFloat64
  753. minPreIdx := 0
  754. maxPreStep := 0.0
  755. maxPreStepIdx := 0
  756. for i := 0; i < probeN; i++ {
  757. v := fullSnip[i]
  758. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  759. if mag < minPreMag {
  760. minPreMag = mag
  761. minPreIdx = i
  762. }
  763. if i > 0 {
  764. p := fullSnip[i-1]
  765. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  766. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  767. step := math.Abs(math.Atan2(num, den))
  768. if step > maxPreStep {
  769. maxPreStep = step
  770. maxPreStepIdx = i - 1
  771. }
  772. }
  773. }
  774. logging.Debug("prefir", "pre_fir_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "snip_len", len(fullSnip))
  775. if minPreMag < 0.18 {
  776. logging.Warn("prefir", "pre_fir_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx)
  777. }
  778. if maxPreStep > 1.5 {
  779. logging.Warn("prefir", "pre_fir_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "min_mag", minPreMag, "min_idx", minPreIdx)
  780. }
  781. }
  782. // --- Stateful anti-alias FIR + decimation to demod rate ---
  783. demodRate := d.OutputSampleRate()
  784. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  785. if decim1 < 1 {
  786. decim1 = 1
  787. }
  788. // WFM override: force decim1=2 (256kHz) instead of round(512k/192k)=3 (170kHz).
  789. // At decim1=3, Nyquist is 85kHz which clips FM broadcast ±75kHz deviation.
  790. // At decim1=2, Nyquist is 128kHz → full FM deviation + stereo pilot + guard band.
  791. // Bonus: 256000→48000 resampler ratio is L=3/M=16 (96 taps, 1kB) instead of
  792. // the pathological L=24000/M=85333 (768k taps, 6MB) from 170666→48000.
  793. if isWFM && decim1 > 2 && snipRate/2 >= 200000 {
  794. decim1 = 2
  795. }
  796. actualDemodRate := snipRate / decim1
  797. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  798. var dec []complex64
  799. if decim1 > 1 {
  800. // FIR cutoff: for WFM, use 90kHz (above ±75kHz FM deviation + guard).
  801. // For NFM/other: use standard Nyquist*0.8 cutoff.
  802. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  803. if isWFM {
  804. cutoff = 90000
  805. }
  806. // Lazy-init or reinit stateful FIR if parameters changed
  807. if sess.preDemodDecimator == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff || sess.preDemodDecim != decim1 {
  808. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  809. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  810. sess.preDemodDecimator = dsp.NewStatefulDecimatingFIRComplex(taps, decim1)
  811. sess.preDemodRate = snipRate
  812. sess.preDemodCutoff = cutoff
  813. sess.preDemodDecim = decim1
  814. sess.preDemodDecimPhase = 0
  815. }
  816. decimPhaseBefore := sess.preDemodDecimPhase
  817. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  818. dec = sess.preDemodDecimator.Process(fullSnip)
  819. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase)
  820. } else {
  821. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0)
  822. dec = fullSnip
  823. }
  824. if decHeadTrimSamples > 0 && decHeadTrimSamples < len(dec) {
  825. logging.Warn("boundary", "dec_head_trim_applied", "signal", sess.signalID, "trim", decHeadTrimSamples, "before_len", len(dec))
  826. dec = dec[decHeadTrimSamples:]
  827. }
  828. if logging.EnabledCategory("boundary") && len(dec) > 0 {
  829. first := dec[0]
  830. if sess.lastDecIQSet {
  831. d2Re := math.Abs(2*float64(real(sess.lastDecIQ)) - float64(real(sess.prevDecIQ)) - float64(real(first)))
  832. d2Im := math.Abs(2*float64(imag(sess.lastDecIQ)) - float64(imag(sess.prevDecIQ)) - float64(imag(first)))
  833. d2Mag := math.Hypot(d2Re, d2Im)
  834. if d2Mag > 0.15 {
  835. logging.Warn("boundary", "dec_iq_boundary", "signal", sess.signalID, "d2", d2Mag)
  836. }
  837. }
  838. headN := 16
  839. if len(dec) < headN {
  840. headN = len(dec)
  841. }
  842. tailN := 16
  843. if len(dec) < tailN {
  844. tailN = len(dec)
  845. }
  846. var headSum, tailSum, minMag, maxMag float64
  847. minMag = math.MaxFloat64
  848. for i, v := range dec {
  849. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  850. if mag < minMag {
  851. minMag = mag
  852. }
  853. if mag > maxMag {
  854. maxMag = mag
  855. }
  856. if i < headN {
  857. headSum += mag
  858. }
  859. }
  860. for i := len(dec) - tailN; i < len(dec); i++ {
  861. if i >= 0 {
  862. v := dec[i]
  863. tailSum += math.Hypot(float64(real(v)), float64(imag(v)))
  864. }
  865. }
  866. headAvg := 0.0
  867. if headN > 0 {
  868. headAvg = headSum / float64(headN)
  869. }
  870. tailAvg := 0.0
  871. if tailN > 0 {
  872. tailAvg = tailSum / float64(tailN)
  873. }
  874. logging.Debug("boundary", "dec_iq_meter", "signal", sess.signalID, "len", len(dec), "head_avg", headAvg, "tail_avg", tailAvg, "min_mag", minMag, "max_mag", maxMag)
  875. if tailAvg > 0 {
  876. ratio := headAvg / tailAvg
  877. if ratio < 0.75 || ratio > 1.25 {
  878. logging.Warn("boundary", "dec_iq_head_tail_skew", "signal", sess.signalID, "head_avg", headAvg, "tail_avg", tailAvg, "ratio", ratio)
  879. }
  880. }
  881. probeN := 64
  882. if len(dec) < probeN {
  883. probeN = len(dec)
  884. }
  885. minHeadMag := math.MaxFloat64
  886. minHeadIdx := 0
  887. maxHeadStep := 0.0
  888. maxHeadStepIdx := 0
  889. for i := 0; i < probeN; i++ {
  890. v := dec[i]
  891. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  892. if mag < minHeadMag {
  893. minHeadMag = mag
  894. minHeadIdx = i
  895. }
  896. if i > 0 {
  897. p := dec[i-1]
  898. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  899. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  900. step := math.Abs(math.Atan2(num, den))
  901. if step > maxHeadStep {
  902. maxHeadStep = step
  903. maxHeadStepIdx = i - 1
  904. }
  905. }
  906. }
  907. logging.Debug("boundary", "dec_iq_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  908. if minHeadMag < 0.18 {
  909. logging.Warn("boundary", "dec_iq_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  910. }
  911. if maxHeadStep > 1.5 {
  912. logging.Warn("boundary", "dec_iq_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx, "min_mag", minHeadMag, "min_idx", minHeadIdx)
  913. }
  914. if len(dec) >= 2 {
  915. sess.prevDecIQ = dec[len(dec)-2]
  916. sess.lastDecIQ = dec[len(dec)-1]
  917. } else {
  918. sess.prevDecIQ = sess.lastDecIQ
  919. sess.lastDecIQ = dec[0]
  920. }
  921. sess.lastDecIQSet = true
  922. }
  923. // --- FM/AM/etc Demod ---
  924. audio := d.Demod(dec, actualDemodRate)
  925. if len(audio) == 0 {
  926. return nil, 0
  927. }
  928. if logging.EnabledCategory("boundary") {
  929. stride := d.Channels()
  930. if stride < 1 {
  931. stride = 1
  932. }
  933. nFrames := len(audio) / stride
  934. if nFrames > 0 {
  935. first := float64(audio[0])
  936. if sess.lastDemodSet {
  937. d2 := math.Abs(2*float64(sess.lastDemodL) - sess.prevDemodL - first)
  938. if d2 > 0.15 {
  939. logging.Warn("boundary", "demod_boundary", "signal", sess.signalID, "d2", d2)
  940. }
  941. }
  942. if nFrames >= 2 {
  943. sess.prevDemodL = float64(audio[(nFrames-2)*stride])
  944. sess.lastDemodL = audio[(nFrames-1)*stride]
  945. } else {
  946. sess.prevDemodL = float64(sess.lastDemodL)
  947. sess.lastDemodL = audio[0]
  948. }
  949. sess.lastDemodSet = true
  950. }
  951. }
  952. logging.Debug("boundary", "audio_path", "signal", sess.signalID, "demod", demodName, "actual_rate", actualDemodRate, "audio_len", len(audio), "channels", d.Channels(), "overlap_applied", overlapApplied, "prev_tail_valid", prevTailValid)
  953. shouldDump := !sess.debugDumpStart.IsZero() && !sess.debugDumpUntil.IsZero()
  954. if shouldDump {
  955. now := time.Now()
  956. shouldDump = !now.Before(sess.debugDumpStart) && now.Before(sess.debugDumpUntil)
  957. }
  958. if shouldDump {
  959. sess.demodDump = append(sess.demodDump, audio...)
  960. }
  961. // --- Stateful stereo decode with conservative lock/hysteresis ---
  962. channels := 1
  963. if isWFMStereo {
  964. sess.playbackMode = "WFM_STEREO"
  965. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  966. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  967. if locked {
  968. sess.stereoOnCount++
  969. sess.stereoOffCount = 0
  970. if sess.stereoOnCount >= 4 {
  971. sess.stereoEnabled = true
  972. }
  973. } else {
  974. sess.stereoOnCount = 0
  975. sess.stereoOffCount++
  976. if sess.stereoOffCount >= 10 {
  977. sess.stereoEnabled = false
  978. }
  979. }
  980. prevPlayback := sess.playbackMode
  981. prevStereo := sess.stereoState
  982. if sess.stereoEnabled && len(stereoAudio) > 0 {
  983. sess.stereoState = "locked"
  984. audio = stereoAudio
  985. } else {
  986. sess.stereoState = "mono-fallback"
  987. dual := make([]float32, len(audio)*2)
  988. for i, s := range audio {
  989. dual[i*2] = s
  990. dual[i*2+1] = s
  991. }
  992. audio = dual
  993. }
  994. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  995. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  996. }
  997. }
  998. // --- Polyphase resample to exact 48kHz ---
  999. if actualDemodRate != streamAudioRate {
  1000. if channels > 1 {
  1001. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  1002. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  1003. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1004. sess.stereoResamplerRate = actualDemodRate
  1005. }
  1006. audio = sess.stereoResampler.Process(audio)
  1007. } else {
  1008. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  1009. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  1010. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1011. sess.monoResamplerRate = actualDemodRate
  1012. }
  1013. audio = sess.monoResampler.Process(audio)
  1014. }
  1015. }
  1016. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  1017. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  1018. tau := sess.deemphasisUs * 1e-6
  1019. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  1020. if channels > 1 {
  1021. nFrames := len(audio) / channels
  1022. yL, yR := sess.deemphL, sess.deemphR
  1023. for i := 0; i < nFrames; i++ {
  1024. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  1025. audio[i*2] = float32(yL)
  1026. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  1027. audio[i*2+1] = float32(yR)
  1028. }
  1029. sess.deemphL, sess.deemphR = yL, yR
  1030. } else {
  1031. y := sess.deemphL
  1032. for i := range audio {
  1033. y = alpha*y + (1-alpha)*float64(audio[i])
  1034. audio[i] = float32(y)
  1035. }
  1036. sess.deemphL = y
  1037. }
  1038. }
  1039. if isWFM {
  1040. for i := range audio {
  1041. audio[i] *= 0.35
  1042. }
  1043. }
  1044. if shouldDump {
  1045. sess.finalDump = append(sess.finalDump, audio...)
  1046. } else if !sess.debugDumpUntil.IsZero() && time.Now().After(sess.debugDumpUntil) && sess.debugDumpBase != "" {
  1047. _ = os.MkdirAll(filepath.Dir(sess.debugDumpBase), 0o755)
  1048. if len(sess.demodDump) > 0 {
  1049. _ = writeWAVFile(sess.debugDumpBase+"-demod.wav", sess.demodDump, actualDemodRate, d.Channels())
  1050. }
  1051. if len(sess.finalDump) > 0 {
  1052. _ = writeWAVFile(sess.debugDumpBase+"-final.wav", sess.finalDump, streamAudioRate, channels)
  1053. }
  1054. logging.Warn("boundary", "debug_audio_dump_window", "signal", sess.signalID, "base", sess.debugDumpBase)
  1055. sess.debugDumpBase = ""
  1056. sess.demodDump = nil
  1057. sess.finalDump = nil
  1058. sess.debugDumpStart = time.Time{}
  1059. sess.debugDumpUntil = time.Time{}
  1060. }
  1061. return audio, streamAudioRate
  1062. }
  // pllCoefficients derives the loop-filter gains of a Type-II PLL from a
  // loop bandwidth and a damping factor.
  //
  // loopBW is expressed in Hz and sampleRate in samples per second. The first
  // result is the proportional gain (alpha) and the second the integral gain
  // (beta). Both are zero when loopBW or sampleRate is not positive.
  func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  	if loopBW <= 0 || sampleRate <= 0 {
  		return 0, 0
  	}
  	// Loop bandwidth normalised to the sample rate.
  	normBW := loopBW / float64(sampleRate)
  	theta := normBW / (damping + 0.25/damping)
  	denom := 1 + 2*damping*theta + theta*theta
  	propGain := (4 * damping * theta) / denom
  	integGain := (4 * theta * theta) / denom
  	return propGain, integGain
  }
  // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  // Uses persistent FIR filter state across frames for click-free stereo.
  // Reuses session scratch buffers to minimize allocations.
  //
  // mono is the demodulated composite signal sampled at sampleRate. The result
  // is a newly allocated interleaved slice of 2*len(mono) samples, where each
  // pair is 0.5*(lpr+lr) followed by 0.5*(lpr-lr), plus a flag reporting
  // whether this block passed the pilot-lock heuristics. The interleaved audio
  // is produced whether or not the block is considered locked; the caller
  // decides what to do with it. Returns (nil, false) for empty input or a
  // non-positive sampleRate.
  func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  	if len(mono) == 0 || sampleRate <= 0 {
  		return nil, false
  	}
  	n := len(mono)
  	// Rebuild rate-dependent stereo filters when sampleRate changes
  	// (or on first use). This also resets the pilot PLL state below.
  	if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  		lp := dsp.LowpassFIR(15000, sampleRate, 101)
  		sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  		sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  		sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  		sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  		// Narrow pilot bandpass via LPF(21k)-LPF(17k).
  		sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  		sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  		sess.stereoFilterRate = sampleRate
  		// Initialize PLL for 19kHz pilot tracking.
  		sess.pilotPhase = 0
  		// Nominal phase increment per sample, in radians.
  		sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  		sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  		sess.pilotErrAvg = 0
  		sess.pilotI = 0
  		sess.pilotQ = 0
  		// One-pole smoothing coefficient for the I/Q phase detector (200 Hz corner).
  		sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  	}
  	// Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  	// NOTE(review): assumes mono does not alias sess.scratchAudio — confirm at the call site.
  	scratch := sess.growAudio(n * 5)
  	lpr := scratch[:n]
  	bpfLR := scratch[n : 2*n]
  	lr := scratch[2*n : 3*n]
  	work1 := scratch[3*n : 4*n]
  	work2 := scratch[4*n : 5*n]
  	// 15 kHz low-pass of the composite (the term combined with lr in the output).
  	sess.stereoLPF.ProcessInto(mono, lpr)
  	// 23-53kHz bandpass for L-R DSB-SC.
  	sess.stereoBPHi.ProcessInto(mono, work1)
  	sess.stereoBPLo.ProcessInto(mono, work2)
  	for i := 0; i < n; i++ {
  		bpfLR[i] = work1[i] - work2[i]
  	}
  	// 19kHz pilot bandpass for PLL.
  	sess.pilotLPFHi.ProcessInto(mono, work1)
  	sess.pilotLPFLo.ProcessInto(mono, work2)
  	for i := 0; i < n; i++ {
  		work1[i] = work1[i] - work2[i]
  	}
  	pilot := work1
  	// Work on local copies of the PLL state; they are written back after the loop.
  	phase := sess.pilotPhase
  	freq := sess.pilotFreq
  	alpha := sess.pilotAlpha
  	beta := sess.pilotBeta
  	iState := sess.pilotI
  	qState := sess.pilotQ
  	lpAlpha := sess.pilotLPAlpha
  	// The tracked frequency is confined to 17-21 kHz (as radians per sample).
  	minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  	maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  	var pilotPower float64
  	var totalPower float64
  	var errSum float64
  	for i := 0; i < n; i++ {
  		p := float64(pilot[i])
  		sinP, cosP := math.Sincos(phase)
  		// Mix the pilot against the local oscillator and smooth the I/Q products.
  		iMix := p * cosP
  		qMix := p * -sinP
  		iState += lpAlpha * (iMix - iState)
  		qState += lpAlpha * (qMix - qState)
  		// Phase error taken as the angle of the smoothed I/Q pair.
  		err := math.Atan2(qState, iState)
  		// Integral path adjusts the frequency; proportional path nudges the phase.
  		freq += beta * err
  		if freq < minFreq {
  			freq = minFreq
  		} else if freq > maxFreq {
  			freq = maxFreq
  		}
  		phase += freq + alpha*err
  		// Keep the phase accumulator within one turn.
  		if phase > 2*math.Pi {
  			phase -= 2 * math.Pi
  		} else if phase < 0 {
  			phase += 2 * math.Pi
  		}
  		totalPower += float64(mono[i]) * float64(mono[i])
  		pilotPower += p * p
  		errSum += math.Abs(err)
  		// Multiply the band-passed L-R signal by a carrier at twice the pilot phase.
  		lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  	}
  	sess.pilotPhase = phase
  	sess.pilotFreq = freq
  	sess.pilotI = iState
  	sess.pilotQ = qState
  	// Mean absolute phase error of this block, folded into a running average.
  	blockErr := errSum / float64(n)
  	sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  	// NOTE(review): input and output are the same slice here; assumes
  	// StatefulFIRReal.ProcessInto supports in-place use — confirm in dsp.
  	lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  	pilotRatio := 0.0
  	if totalPower > 0 {
  		pilotRatio = pilotPower / totalPower
  	}
  	freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  	// Lock heuristics: pilot power fraction and PLL phase error stability.
  	// Pilot power is a small but stable fraction of composite energy; require
  	// a modest floor plus PLL settling to avoid flapping in noise.
  	locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  	// The output is allocated per call because the scratch buffer is reused
  	// on the next invocation.
  	out := make([]float32, n*2)
  	for i := 0; i < n; i++ {
  		out[i*2] = 0.5 * (lpr[i] + lr[i])
  		out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  	}
  	return out, locked
  }
  1186. // dspStateSnapshot captures persistent DSP state for segment splits.
  1187. type dspStateSnapshot struct {
  1188. overlapIQ []complex64
  1189. deemphL float64
  1190. deemphR float64
  1191. pilotPhase float64
  1192. pilotFreq float64
  1193. pilotAlpha float64
  1194. pilotBeta float64
  1195. pilotErrAvg float64
  1196. pilotI float64
  1197. pilotQ float64
  1198. pilotLPAlpha float64
  1199. monoResampler *dsp.Resampler
  1200. monoResamplerRate int
  1201. stereoResampler *dsp.StereoResampler
  1202. stereoResamplerRate int
  1203. stereoLPF *dsp.StatefulFIRReal
  1204. stereoFilterRate int
  1205. stereoBPHi *dsp.StatefulFIRReal
  1206. stereoBPLo *dsp.StatefulFIRReal
  1207. stereoLRLPF *dsp.StatefulFIRReal
  1208. stereoAALPF *dsp.StatefulFIRReal
  1209. pilotLPFHi *dsp.StatefulFIRReal
  1210. pilotLPFLo *dsp.StatefulFIRReal
  1211. preDemodFIR *dsp.StatefulFIRComplex
  1212. preDemodDecimator *dsp.StatefulDecimatingFIRComplex
  1213. preDemodDecim int
  1214. preDemodRate int
  1215. preDemodCutoff float64
  1216. preDemodDecimPhase int
  1217. }
  1218. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  1219. return dspStateSnapshot{
  1220. overlapIQ: sess.overlapIQ,
  1221. deemphL: sess.deemphL,
  1222. deemphR: sess.deemphR,
  1223. pilotPhase: sess.pilotPhase,
  1224. pilotFreq: sess.pilotFreq,
  1225. pilotAlpha: sess.pilotAlpha,
  1226. pilotBeta: sess.pilotBeta,
  1227. pilotErrAvg: sess.pilotErrAvg,
  1228. pilotI: sess.pilotI,
  1229. pilotQ: sess.pilotQ,
  1230. pilotLPAlpha: sess.pilotLPAlpha,
  1231. monoResampler: sess.monoResampler,
  1232. monoResamplerRate: sess.monoResamplerRate,
  1233. stereoResampler: sess.stereoResampler,
  1234. stereoResamplerRate: sess.stereoResamplerRate,
  1235. stereoLPF: sess.stereoLPF,
  1236. stereoFilterRate: sess.stereoFilterRate,
  1237. stereoBPHi: sess.stereoBPHi,
  1238. stereoBPLo: sess.stereoBPLo,
  1239. stereoLRLPF: sess.stereoLRLPF,
  1240. stereoAALPF: sess.stereoAALPF,
  1241. pilotLPFHi: sess.pilotLPFHi,
  1242. pilotLPFLo: sess.pilotLPFLo,
  1243. preDemodFIR: sess.preDemodFIR,
  1244. preDemodDecimator: sess.preDemodDecimator,
  1245. preDemodDecim: sess.preDemodDecim,
  1246. preDemodRate: sess.preDemodRate,
  1247. preDemodCutoff: sess.preDemodCutoff,
  1248. preDemodDecimPhase: sess.preDemodDecimPhase,
  1249. }
  1250. }
  1251. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  1252. sess.overlapIQ = s.overlapIQ
  1253. sess.deemphL = s.deemphL
  1254. sess.deemphR = s.deemphR
  1255. sess.pilotPhase = s.pilotPhase
  1256. sess.pilotFreq = s.pilotFreq
  1257. sess.pilotAlpha = s.pilotAlpha
  1258. sess.pilotBeta = s.pilotBeta
  1259. sess.pilotErrAvg = s.pilotErrAvg
  1260. sess.pilotI = s.pilotI
  1261. sess.pilotQ = s.pilotQ
  1262. sess.pilotLPAlpha = s.pilotLPAlpha
  1263. sess.monoResampler = s.monoResampler
  1264. sess.monoResamplerRate = s.monoResamplerRate
  1265. sess.stereoResampler = s.stereoResampler
  1266. sess.stereoResamplerRate = s.stereoResamplerRate
  1267. sess.stereoLPF = s.stereoLPF
  1268. sess.stereoFilterRate = s.stereoFilterRate
  1269. sess.stereoBPHi = s.stereoBPHi
  1270. sess.stereoBPLo = s.stereoBPLo
  1271. sess.stereoLRLPF = s.stereoLRLPF
  1272. sess.stereoAALPF = s.stereoAALPF
  1273. sess.pilotLPFHi = s.pilotLPFHi
  1274. sess.pilotLPFLo = s.pilotLPFLo
  1275. sess.preDemodFIR = s.preDemodFIR
  1276. sess.preDemodDecimator = s.preDemodDecimator
  1277. sess.preDemodDecim = s.preDemodDecim
  1278. sess.preDemodRate = s.preDemodRate
  1279. sess.preDemodCutoff = s.preDemodCutoff
  1280. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1281. }
  1282. // ---------------------------------------------------------------------------
  1283. // Session management helpers
  1284. // ---------------------------------------------------------------------------
  1285. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1286. outputDir := st.policy.OutputDir
  1287. if outputDir == "" {
  1288. outputDir = "data/recordings"
  1289. }
  1290. demodName, channels := resolveDemod(sig)
  1291. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1292. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1293. dir := filepath.Join(outputDir, dirName)
  1294. if err := os.MkdirAll(dir, 0o755); err != nil {
  1295. return nil, err
  1296. }
  1297. wavPath := filepath.Join(dir, "audio.wav")
  1298. f, err := os.Create(wavPath)
  1299. if err != nil {
  1300. return nil, err
  1301. }
  1302. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1303. f.Close()
  1304. return nil, err
  1305. }
  1306. playbackMode, stereoState := initialPlaybackState(demodName)
  1307. sess := &streamSession{
  1308. signalID: sig.ID,
  1309. centerHz: sig.CenterHz,
  1310. bwHz: sig.BWHz,
  1311. snrDb: sig.SNRDb,
  1312. peakDb: sig.PeakDb,
  1313. class: sig.Class,
  1314. startTime: now,
  1315. lastFeed: now,
  1316. dir: dir,
  1317. wavFile: f,
  1318. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1319. sampleRate: streamAudioRate,
  1320. channels: channels,
  1321. demodName: demodName,
  1322. playbackMode: playbackMode,
  1323. stereoState: stereoState,
  1324. deemphasisUs: st.policy.DeemphasisUs,
  1325. }
  1326. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1327. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1328. return sess, nil
  1329. }
  1330. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1331. demodName, channels := resolveDemod(sig)
  1332. for _, pl := range st.pendingListens {
  1333. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1334. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1335. demodName = requested
  1336. if demodName == "WFM_STEREO" {
  1337. channels = 2
  1338. } else if d := demod.Get(demodName); d != nil {
  1339. channels = d.Channels()
  1340. } else {
  1341. channels = 1
  1342. }
  1343. break
  1344. }
  1345. }
  1346. }
  1347. playbackMode, stereoState := initialPlaybackState(demodName)
  1348. sess := &streamSession{
  1349. signalID: sig.ID,
  1350. centerHz: sig.CenterHz,
  1351. bwHz: sig.BWHz,
  1352. snrDb: sig.SNRDb,
  1353. peakDb: sig.PeakDb,
  1354. class: sig.Class,
  1355. startTime: now,
  1356. lastFeed: now,
  1357. listenOnly: true,
  1358. sampleRate: streamAudioRate,
  1359. channels: channels,
  1360. demodName: demodName,
  1361. playbackMode: playbackMode,
  1362. stereoState: stereoState,
  1363. deemphasisUs: st.policy.DeemphasisUs,
  1364. }
  1365. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1366. sig.ID, sig.CenterHz/1e6, demodName)
  1367. return sess
  1368. }
  1369. func resolveDemod(sig *detector.Signal) (string, int) {
  1370. demodName := "NFM"
  1371. if sig.Class != nil {
  1372. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1373. demodName = n
  1374. }
  1375. }
  1376. channels := 1
  1377. if demodName == "WFM_STEREO" {
  1378. channels = 2
  1379. } else if d := demod.Get(demodName); d != nil {
  1380. channels = d.Channels()
  1381. }
  1382. return demodName, channels
  1383. }
  // initialPlaybackState returns the playback mode and stereo state a session
  // starts with. The playback mode is the demodulator name itself; the stereo
  // state is "searching" for "WFM_STEREO" and "mono" for everything else.
  func initialPlaybackState(demodName string) (string, string) {
  	if demodName == "WFM_STEREO" {
  		return demodName, "searching"
  	}
  	return demodName, "mono"
  }
  1392. func (sess *streamSession) audioInfo() AudioInfo {
  1393. return AudioInfo{
  1394. SampleRate: sess.sampleRate,
  1395. Channels: sess.channels,
  1396. Format: "s16le",
  1397. DemodName: sess.demodName,
  1398. PlaybackMode: sess.playbackMode,
  1399. StereoState: sess.stereoState,
  1400. }
  1401. }
  1402. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1403. infoJSON, _ := json.Marshal(info)
  1404. tagged := make([]byte, 1+len(infoJSON))
  1405. tagged[0] = 0x00 // tag: audio_info
  1406. copy(tagged[1:], infoJSON)
  1407. for _, sub := range subs {
  1408. select {
  1409. case sub.ch <- tagged:
  1410. default:
  1411. }
  1412. }
  1413. }
  1414. func defaultAudioInfoForMode(mode string) AudioInfo {
  1415. demodName := "NFM"
  1416. if requested := normalizeRequestedMode(mode); requested != "" {
  1417. demodName = requested
  1418. }
  1419. channels := 1
  1420. if demodName == "WFM_STEREO" {
  1421. channels = 2
  1422. } else if d := demod.Get(demodName); d != nil {
  1423. channels = d.Channels()
  1424. }
  1425. playbackMode, stereoState := initialPlaybackState(demodName)
  1426. return AudioInfo{
  1427. SampleRate: streamAudioRate,
  1428. Channels: channels,
  1429. Format: "s16le",
  1430. DemodName: demodName,
  1431. PlaybackMode: playbackMode,
  1432. StereoState: stereoState,
  1433. }
  1434. }
  // normalizeRequestedMode canonicalises a user-supplied demodulation mode.
  // Surrounding whitespace is trimmed and the name upper-cased; the result is
  // returned only if it is one of WFM, WFM_STEREO, NFM, AM, USB, LSB or CW.
  // Empty input, "AUTO" and anything unrecognised yield "".
  func normalizeRequestedMode(mode string) string {
  	canon := strings.ToUpper(strings.TrimSpace(mode))
  	switch canon {
  	case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  		return canon
  	}
  	return ""
  }
  1445. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1446. func (sess *streamSession) growIQ(n int) []complex64 {
  1447. if cap(sess.scratchIQ) >= n {
  1448. return sess.scratchIQ[:n]
  1449. }
  1450. sess.scratchIQ = make([]complex64, n, n*5/4)
  1451. return sess.scratchIQ
  1452. }
  1453. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1454. func (sess *streamSession) growAudio(n int) []float32 {
  1455. if cap(sess.scratchAudio) >= n {
  1456. return sess.scratchAudio[:n]
  1457. }
  1458. sess.scratchAudio = make([]float32, n, n*5/4)
  1459. return sess.scratchAudio
  1460. }
  1461. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1462. func (sess *streamSession) growPCM(n int) []byte {
  1463. if cap(sess.scratchPCM) >= n {
  1464. return sess.scratchPCM[:n]
  1465. }
  1466. sess.scratchPCM = make([]byte, n, n*5/4)
  1467. return sess.scratchPCM
  1468. }
  1469. func convertToListenOnly(sess *streamSession) {
  1470. if sess.wavBuf != nil {
  1471. _ = sess.wavBuf.Flush()
  1472. }
  1473. if sess.wavFile != nil {
  1474. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1475. sess.wavFile.Close()
  1476. }
  1477. sess.wavFile = nil
  1478. sess.wavBuf = nil
  1479. sess.listenOnly = true
  1480. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1481. }
  1482. func closeSession(sess *streamSession, policy *Policy) {
  1483. if sess.listenOnly {
  1484. return
  1485. }
  1486. if sess.wavBuf != nil {
  1487. _ = sess.wavBuf.Flush()
  1488. }
  1489. if sess.wavFile != nil {
  1490. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1491. sess.wavFile.Close()
  1492. sess.wavFile = nil
  1493. sess.wavBuf = nil
  1494. }
  1495. dur := sess.lastFeed.Sub(sess.startTime)
  1496. files := map[string]any{
  1497. "audio": "audio.wav",
  1498. "audio_sample_rate": sess.sampleRate,
  1499. "audio_channels": sess.channels,
  1500. "audio_demod": sess.demodName,
  1501. "recording_mode": "streaming",
  1502. }
  1503. meta := Meta{
  1504. EventID: sess.signalID,
  1505. Start: sess.startTime,
  1506. End: sess.lastFeed,
  1507. CenterHz: sess.centerHz,
  1508. BandwidthHz: sess.bwHz,
  1509. SampleRate: sess.sampleRate,
  1510. SNRDb: sess.snrDb,
  1511. PeakDb: sess.peakDb,
  1512. Class: sess.class,
  1513. DurationMs: dur.Milliseconds(),
  1514. Files: files,
  1515. }
  1516. b, err := json.MarshalIndent(meta, "", " ")
  1517. if err == nil {
  1518. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1519. }
  1520. if policy != nil {
  1521. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1522. }
  1523. }
  1524. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1525. if len(sess.audioSubs) == 0 {
  1526. return
  1527. }
  1528. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1529. tagged := make([]byte, 1+pcmLen)
  1530. tagged[0] = 0x01
  1531. copy(tagged[1:], pcm[:pcmLen])
  1532. alive := sess.audioSubs[:0]
  1533. for _, sub := range sess.audioSubs {
  1534. select {
  1535. case sub.ch <- tagged:
  1536. default:
  1537. st.droppedPCM++
  1538. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1539. }
  1540. alive = append(alive, sub)
  1541. }
  1542. sess.audioSubs = alive
  1543. }
  1544. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1545. if len(st.policy.ClassFilter) == 0 {
  1546. return true
  1547. }
  1548. if cls == nil {
  1549. return false
  1550. }
  1551. for _, f := range st.policy.ClassFilter {
  1552. if strings.EqualFold(f, string(cls.ModType)) {
  1553. return true
  1554. }
  1555. }
  1556. return false
  1557. }
  // ErrNoSession is returned when no matching signal session exists.
  // It is a sentinel value; compare against it with errors.Is.
  var ErrNoSession = errors.New("no active or pending session for this frequency")
  1560. // ---------------------------------------------------------------------------
  1561. // WAV header helpers
  1562. // ---------------------------------------------------------------------------
  1563. func writeWAVFile(path string, audio []float32, sampleRate int, channels int) error {
  1564. f, err := os.Create(path)
  1565. if err != nil {
  1566. return err
  1567. }
  1568. defer f.Close()
  1569. return writeWAVTo(f, audio, sampleRate, channels)
  1570. }
  // writeStreamWAVHeader writes a 44-byte 16-bit PCM WAV header to f at the
  // current file position. The size fields are placeholders (RIFF size 36,
  // data size 0) for a stream whose final length is not yet known. A channel
  // count of zero or less is treated as 1. It returns the error from the
  // write, if any.
  func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  	if channels < 1 {
  		channels = 1
  	}
  	le := binary.LittleEndian

  	var hdr [44]byte
  	// RIFF chunk descriptor.
  	copy(hdr[0:], "RIFF")
  	le.PutUint32(hdr[4:], 36)
  	copy(hdr[8:], "WAVE")
  	// "fmt " sub-chunk: 16 bytes, format tag 1.
  	copy(hdr[12:], "fmt ")
  	le.PutUint32(hdr[16:], 16)
  	le.PutUint16(hdr[20:], 1)
  	le.PutUint16(hdr[22:], uint16(channels))
  	le.PutUint32(hdr[24:], uint32(sampleRate))
  	le.PutUint32(hdr[28:], uint32(sampleRate*channels*2)) // byte rate
  	le.PutUint16(hdr[32:], uint16(channels*2))            // block align
  	le.PutUint16(hdr[34:], 16)                            // bits per sample
  	// "data" sub-chunk, length not yet known.
  	copy(hdr[36:], "data")
  	le.PutUint32(hdr[40:], 0)

  	_, err := f.Write(hdr[:])
  	return err
  }
  // fixStreamWAVHeader patches a 44-byte streaming WAV header in f once the
  // amount of audio written is known.
  //
  // totalSamples is the number of 16-bit samples written (two bytes each).
  // The RIFF chunk size (offset 4), sample rate (offset 24), byte rate
  // (offset 28) and data chunk size (offset 40) are rewritten; the channel
  // count and block-align fields are left as they are.
  //
  // RIFF stores sizes as uint32. A negative totalSamples is treated as zero,
  // and a recording too large to represent is clamped to the largest
  // sample-aligned data size whose RIFF size still fits, instead of wrapping
  // around to a small bogus length.
  //
  // The function is best-effort: it stops at the first failed seek and does
  // not report errors. It leaves the file offset wherever the last write ended.
  func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  	// Largest even data size such that 36+dataSize still fits in a uint32.
  	const maxDataBytes = int64(math.MaxUint32-36) &^ 1

  	var dataBytes int64
  	switch {
  	case totalSamples <= 0:
  		dataBytes = 0
  	case totalSamples > maxDataBytes/2:
  		dataBytes = maxDataBytes
  	default:
  		dataBytes = totalSamples * 2
  	}
  	dataSize := uint32(dataBytes)

  	patches := [...]struct {
  		offset int64
  		value  uint32
  	}{
  		{4, 36 + dataSize},                       // RIFF chunk size
  		{24, uint32(sampleRate)},                 // sample rate
  		{28, uint32(sampleRate * channels * 2)},  // byte rate
  		{40, dataSize},                           // data chunk size
  	}

  	var buf [4]byte
  	for _, p := range patches {
  		// whence 0 = relative to the start of the file.
  		if _, err := f.Seek(p.offset, 0); err != nil {
  			return
  		}
  		binary.LittleEndian.PutUint32(buf[:], p.value)
  		// Write errors are ignored: there is no error return to carry them.
  		_, _ = f.Write(buf[:])
  	}
  }
  1616. // ResetStreams forces all active streaming sessions to discard their FIR states and decimation phases.
  1617. // This is used when the upstream DSP drops samples, creating a hard break in phase continuity.
  1618. func (st *Streamer) ResetStreams() {
  1619. st.mu.Lock()
  1620. defer st.mu.Unlock()
  1621. for _, sess := range st.sessions {
  1622. sess.preDemodFIR = nil
  1623. sess.preDemodDecimator = nil
  1624. sess.preDemodDecimPhase = 0
  1625. sess.stereoResampler = nil
  1626. sess.monoResampler = nil
  1627. }
  1628. }