Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

2220 lines
72KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "sort"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "sdr-wideband-suite/internal/classifier"
  18. "sdr-wideband-suite/internal/demod"
  19. "sdr-wideband-suite/internal/detector"
  20. "sdr-wideband-suite/internal/dsp"
  21. "sdr-wideband-suite/internal/logging"
  22. "sdr-wideband-suite/internal/telemetry"
  23. )
  24. // ---------------------------------------------------------------------------
  25. // streamSession — one open demod session for one signal
  26. // ---------------------------------------------------------------------------
  27. type streamSession struct {
  28. sessionID string
  29. signalID int64
  30. centerHz float64
  31. bwHz float64
  32. snrDb float64
  33. peakDb float64
  34. class *classifier.Classification
  35. startTime time.Time
  36. lastFeed time.Time
  37. playbackMode string
  38. stereoState string
  39. lastAudioTs time.Time
  40. debugDumpStart time.Time
  41. debugDumpUntil time.Time
  42. debugDumpBase string
  43. demodDump []float32
  44. finalDump []float32
  45. lastAudioL float32
  46. lastAudioR float32
  47. prevAudioL float64 // second-to-last L sample for boundary transient detection
  48. lastAudioSet bool
  49. lastDecIQ complex64
  50. prevDecIQ complex64
  51. lastDecIQSet bool
  52. lastExtractIQ complex64
  53. prevExtractIQ complex64
  54. lastExtractIQSet bool
  55. // FM discriminator cross-block bridging: carry the last IQ sample so the
  56. // discriminator can compute the phase step across block boundaries.
  57. lastDiscrimIQ complex64
  58. lastDiscrimIQSet bool
  59. lastDemodL float32
  60. prevDemodL float64
  61. lastDemodSet bool
  62. snippetSeq uint64
  63. // listenOnly sessions have no WAV file and no disk I/O.
  64. // They exist solely to feed audio to live-listen subscribers.
  65. listenOnly bool
  66. // Recording state (nil/zero for listen-only sessions)
  67. dir string
  68. wavFile *os.File
  69. wavBuf *bufio.Writer
  70. wavSamples int64
  71. segmentIdx int
  72. sampleRate int // actual output audio sample rate (always streamAudioRate)
  73. channels int
  74. demodName string
  75. // --- Persistent DSP state for click-free streaming ---
  76. // Overlap-save: tail of previous extracted IQ snippet.
  77. // Currently unused for live demod after removing the extra discriminator
  78. // overlap prepend, but kept in DSP snapshot state for compatibility.
  79. overlapIQ []complex64
  80. // De-emphasis IIR state (persists across frames)
  81. deemphL float64
  82. deemphR float64
  83. // Stereo lock state for live WFM streaming
  84. stereoEnabled bool
  85. stereoOnCount int
  86. stereoOffCount int
  87. // Pilot-locked stereo PLL state (19kHz pilot)
  88. pilotPhase float64
  89. pilotFreq float64
  90. pilotAlpha float64
  91. pilotBeta float64
  92. pilotErrAvg float64
  93. pilotI float64
  94. pilotQ float64
  95. pilotLPAlpha float64
  96. // Polyphase resampler (replaces integer-decimate hack)
  97. monoResampler *dsp.Resampler
  98. monoResamplerRate int
  99. stereoResampler *dsp.StereoResampler
  100. stereoResamplerRate int
  101. // AQ-4: Stateful FIR filters for click-free stereo decode
  102. stereoFilterRate int
  103. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  104. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  105. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  106. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  107. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  108. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  109. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  110. // WFM 15kHz audio LPF — removes pilot (19kHz), L-R subcarrier (23-53kHz),
  111. // and RDS (57kHz) from the FM discriminator output before resampling.
  112. // Without this, the pilot leaks into the audio as a 19kHz tone (+55dB above
  113. // noise floor) and L-R subcarrier energy causes audible click-like artifacts.
  114. wfmAudioLPF *dsp.StatefulFIRReal
  115. wfmAudioLPFRate int
  116. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  117. // and avoids per-frame FIR recomputation)
  118. preDemodFIR *dsp.StatefulFIRComplex
  119. preDemodDecimator *dsp.StatefulDecimatingFIRComplex
  120. preDemodDecim int // cached decimation factor
  121. preDemodRate int // cached snipRate this FIR was built for
  122. preDemodCutoff float64 // cached cutoff
  123. preDemodDecimPhase int // retained for backward compatibility in snapshots/debug
  124. // AQ-2: De-emphasis config (µs, 0 = disabled)
  125. deemphasisUs float64
  126. // Scratch buffers — reused across frames to avoid GC pressure.
  127. // Grown as needed, never shrunk.
  128. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  129. scratchAudio []float32 // for stereo decode intermediates
  130. scratchPCM []byte // for PCM encoding
  131. // live-listen subscribers
  132. audioSubs []audioSub
  133. }
  134. type audioSub struct {
  135. id int64
  136. ch chan []byte
  137. }
  138. type RuntimeSignalInfo struct {
  139. DemodName string
  140. PlaybackMode string
  141. StereoState string
  142. Channels int
  143. SampleRate int
  144. }
  145. // AudioInfo describes the audio format of a live-listen subscription.
  146. // Sent to the WebSocket client as the first message.
  147. type AudioInfo struct {
  148. SampleRate int `json:"sample_rate"`
  149. Channels int `json:"channels"`
  150. Format string `json:"format"` // always "s16le"
  151. DemodName string `json:"demod"`
  152. PlaybackMode string `json:"playback_mode,omitempty"`
  153. StereoState string `json:"stereo_state,omitempty"`
  154. }
  155. const (
  156. streamAudioRate = 48000
  157. resamplerTaps = 32 // taps per polyphase arm — good quality
  158. )
  159. var debugDumpDelay = func() time.Duration {
  160. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DELAY_SECONDS"))
  161. if raw == "" {
  162. return 5 * time.Second
  163. }
  164. v, err := strconv.Atoi(raw)
  165. if err != nil || v < 0 {
  166. return 5 * time.Second
  167. }
  168. return time.Duration(v) * time.Second
  169. }()
  170. var debugDumpDuration = func() time.Duration {
  171. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DURATION_SECONDS"))
  172. if raw == "" {
  173. return 15 * time.Second
  174. }
  175. v, err := strconv.Atoi(raw)
  176. if err != nil || v <= 0 {
  177. return 15 * time.Second
  178. }
  179. return time.Duration(v) * time.Second
  180. }()
  181. var audioDumpEnabled = func() bool {
  182. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_AUDIO_DUMP_ENABLED"))
  183. if raw == "" {
  184. return false
  185. }
  186. v, err := strconv.ParseBool(raw)
  187. if err != nil {
  188. return false
  189. }
  190. return v
  191. }()
  192. var decHeadTrimSamples = func() int {
  193. raw := strings.TrimSpace(os.Getenv("SDR_DEC_HEAD_TRIM"))
  194. if raw == "" {
  195. return 0
  196. }
  197. v, err := strconv.Atoi(raw)
  198. if err != nil || v < 0 {
  199. return 0
  200. }
  201. return v
  202. }()
  203. // ---------------------------------------------------------------------------
  204. // Streamer — manages all active streaming sessions
  205. // ---------------------------------------------------------------------------
  206. type streamFeedItem struct {
  207. signal detector.Signal
  208. snippet []complex64
  209. snipRate int
  210. }
  211. type streamFeedMsg struct {
  212. traceID uint64
  213. items []streamFeedItem
  214. enqueuedAt time.Time
  215. }
  216. type Streamer struct {
  217. mu sync.Mutex
  218. sessions map[int64]*streamSession
  219. policy Policy
  220. centerHz float64
  221. nextSub int64
  222. feedCh chan streamFeedMsg
  223. done chan struct{}
  224. droppedFeed uint64
  225. droppedPCM uint64
  226. lastFeedTS time.Time
  227. lastProcTS time.Time
  228. // pendingListens are subscribers waiting for a matching session.
  229. pendingListens map[int64]*pendingListen
  230. telemetry *telemetry.Collector
  231. debugSummary *audioStutterDebugLogger
  232. summaryStop chan struct{}
  233. summaryWG sync.WaitGroup
  234. // Stream summary counters (cheap to maintain, sampled every ~5s)
  235. producedPCMFrames uint64
  236. processLoopCount uint64
  237. processLoopSumMs float64
  238. processLoopMaxMs float64
  239. }
  240. type pendingListen struct {
  241. freq float64
  242. bw float64
  243. mode string
  244. ch chan []byte
  245. }
  246. func newStreamer(policy Policy, centerHz float64, coll *telemetry.Collector) *Streamer {
  247. st := &Streamer{
  248. sessions: make(map[int64]*streamSession),
  249. policy: policy,
  250. centerHz: centerHz,
  251. feedCh: make(chan streamFeedMsg, 2),
  252. done: make(chan struct{}),
  253. pendingListens: make(map[int64]*pendingListen),
  254. telemetry: coll,
  255. debugSummary: newAudioStutterDebugLogger(),
  256. summaryStop: make(chan struct{}),
  257. }
  258. go st.worker()
  259. st.summaryWG.Add(1)
  260. go st.summaryWorker()
  261. return st
  262. }
  263. func (st *Streamer) worker() {
  264. for msg := range st.feedCh {
  265. st.processFeed(msg)
  266. }
  267. close(st.done)
  268. }
  269. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  270. st.mu.Lock()
  271. defer st.mu.Unlock()
  272. wasEnabled := st.policy.Enabled
  273. st.policy = policy
  274. st.centerHz = centerHz
  275. // If recording was just disabled, close recording sessions
  276. // but keep listen-only sessions alive.
  277. if wasEnabled && !policy.Enabled {
  278. for id, sess := range st.sessions {
  279. if sess.listenOnly {
  280. continue
  281. }
  282. if len(sess.audioSubs) > 0 {
  283. // Convert to listen-only: close WAV but keep session
  284. convertToListenOnly(sess)
  285. } else {
  286. closeSession(sess, &st.policy)
  287. delete(st.sessions, id)
  288. }
  289. }
  290. }
  291. }
  292. // HasListeners returns true if any sessions have audio subscribers
  293. // or there are pending listen requests. Used by the DSP loop to
  294. // decide whether to feed snippets even when recording is disabled.
  295. func (st *Streamer) HasListeners() bool {
  296. st.mu.Lock()
  297. defer st.mu.Unlock()
  298. return st.hasListenersLocked()
  299. }
  300. func (st *Streamer) hasListenersLocked() bool {
  301. if len(st.pendingListens) > 0 {
  302. return true
  303. }
  304. for _, sess := range st.sessions {
  305. if len(sess.audioSubs) > 0 {
  306. return true
  307. }
  308. }
  309. return false
  310. }
  311. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  312. // Feeds are accepted if:
  313. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  314. // - Any live-listen subscribers exist (listen-only mode)
  315. //
  316. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  317. // data, so items can be passed directly without another copy.
  318. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  319. st.mu.Lock()
  320. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  321. hasListeners := st.hasListenersLocked()
  322. pending := len(st.pendingListens)
  323. debugLiveAudio := st.policy.DebugLiveAudio
  324. now := time.Now()
  325. if !st.lastFeedTS.IsZero() {
  326. gap := now.Sub(st.lastFeedTS)
  327. if gap > 150*time.Millisecond {
  328. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  329. }
  330. }
  331. st.lastFeedTS = now
  332. st.mu.Unlock()
  333. if debugLiveAudio {
  334. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  335. }
  336. if (!recEnabled && !hasListeners) || len(items) == 0 {
  337. return
  338. }
  339. if st.telemetry != nil {
  340. st.telemetry.SetGauge("streamer.feed.queue_len", float64(len(st.feedCh)), nil)
  341. st.telemetry.SetGauge("streamer.pending_listeners", float64(pending), nil)
  342. st.telemetry.Observe("streamer.feed.batch_size", float64(len(items)), nil)
  343. }
  344. select {
  345. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items, enqueuedAt: time.Now()}:
  346. default:
  347. st.droppedFeed++
  348. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  349. if st.telemetry != nil {
  350. st.telemetry.IncCounter("streamer.feed.drop", 1, nil)
  351. st.telemetry.Event("stream_feed_drop", "warn", "feed queue full", nil, map[string]any{
  352. "trace_id": traceID,
  353. "queue_len": len(st.feedCh),
  354. })
  355. }
  356. }
  357. }
  358. // processFeed runs in the worker goroutine.
  359. func (st *Streamer) processFeed(msg streamFeedMsg) {
  360. procStart := time.Now()
  361. lockStart := time.Now()
  362. st.mu.Lock()
  363. lockWait := time.Since(lockStart)
  364. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  365. hasListeners := st.hasListenersLocked()
  366. now := time.Now()
  367. if !st.lastProcTS.IsZero() {
  368. gap := now.Sub(st.lastProcTS)
  369. if gap > 150*time.Millisecond {
  370. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  371. if st.telemetry != nil {
  372. st.telemetry.IncCounter("streamer.process.gap.count", 1, nil)
  373. st.telemetry.Observe("streamer.process.gap_ms", float64(gap.Milliseconds()), nil)
  374. }
  375. }
  376. }
  377. st.lastProcTS = now
  378. defer st.mu.Unlock()
  379. defer func() {
  380. procMs := float64(time.Since(procStart).Microseconds()) / 1000.0
  381. st.processLoopCount++
  382. st.processLoopSumMs += procMs
  383. if procMs > st.processLoopMaxMs {
  384. st.processLoopMaxMs = procMs
  385. }
  386. if st.telemetry != nil {
  387. st.telemetry.Observe("streamer.process.total_ms", procMs, nil)
  388. st.telemetry.Observe("streamer.lock_wait_ms", float64(lockWait.Microseconds())/1000.0, telemetry.TagsFromPairs("lock", "process"))
  389. }
  390. }()
  391. if st.telemetry != nil {
  392. st.telemetry.Observe("streamer.feed.enqueue_delay_ms", float64(now.Sub(msg.enqueuedAt).Microseconds())/1000.0, nil)
  393. st.telemetry.SetGauge("streamer.sessions.active", float64(len(st.sessions)), nil)
  394. }
  395. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  396. if !recEnabled && !hasListeners {
  397. return
  398. }
  399. seen := make(map[int64]bool, len(msg.items))
  400. for i := range msg.items {
  401. item := &msg.items[i]
  402. sig := &item.signal
  403. seen[sig.ID] = true
  404. if sig.ID == 0 || sig.Class == nil {
  405. continue
  406. }
  407. if len(item.snippet) == 0 || item.snipRate <= 0 {
  408. continue
  409. }
  410. // Decide whether this signal needs a session
  411. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  412. needsListen := st.signalHasListenerLocked(sig)
  413. className := "<nil>"
  414. demodName := ""
  415. if sig.Class != nil {
  416. className = string(sig.Class.ModType)
  417. demodName, _ = resolveDemod(sig)
  418. }
  419. if st.policy.DebugLiveAudio {
  420. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  421. }
  422. if !needsRecording && !needsListen {
  423. continue
  424. }
  425. sess, exists := st.sessions[sig.ID]
  426. requestedMode := ""
  427. for _, pl := range st.pendingListens {
  428. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  429. if m := normalizeRequestedMode(pl.mode); m != "" {
  430. requestedMode = m
  431. break
  432. }
  433. }
  434. }
  435. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  436. for _, sub := range sess.audioSubs {
  437. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  438. }
  439. delete(st.sessions, sig.ID)
  440. sess = nil
  441. exists = false
  442. }
  443. if !exists {
  444. if needsRecording {
  445. s, err := st.openRecordingSession(sig, now)
  446. if err != nil {
  447. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  448. sig.ID, sig.CenterHz/1e6, err)
  449. if st.telemetry != nil {
  450. st.telemetry.IncCounter("streamer.session.open_error", 1, telemetry.TagsFromPairs("kind", "recording"))
  451. }
  452. continue
  453. }
  454. st.sessions[sig.ID] = s
  455. sess = s
  456. } else {
  457. s := st.openListenSession(sig, now)
  458. st.sessions[sig.ID] = s
  459. sess = s
  460. }
  461. // Attach any pending listeners
  462. st.attachPendingListeners(sess)
  463. if st.telemetry != nil {
  464. st.telemetry.IncCounter("streamer.session.open", 1, telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", fmt.Sprintf("%d", sig.ID)))
  465. st.telemetry.Event("session_open", "info", "stream session opened", telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{
  466. "listen_only": sess.listenOnly,
  467. "demod": sess.demodName,
  468. })
  469. }
  470. }
  471. // Update metadata
  472. sess.lastFeed = now
  473. sess.centerHz = sig.CenterHz
  474. sess.bwHz = sig.BWHz
  475. if sig.SNRDb > sess.snrDb {
  476. sess.snrDb = sig.SNRDb
  477. }
  478. if sig.PeakDb > sess.peakDb {
  479. sess.peakDb = sig.PeakDb
  480. }
  481. if sig.Class != nil {
  482. sess.class = sig.Class
  483. }
  484. // Demod with persistent state
  485. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  486. audioStart := time.Now()
  487. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate, st.telemetry)
  488. if st.telemetry != nil {
  489. st.telemetry.Observe("streamer.process_snippet_ms", float64(time.Since(audioStart).Microseconds())/1000.0, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  490. }
  491. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  492. if len(audio) == 0 {
  493. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  494. if st.telemetry != nil {
  495. st.telemetry.IncCounter("streamer.audio.empty", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  496. }
  497. }
  498. if len(audio) > 0 {
  499. ch := sess.channels
  500. if ch <= 0 {
  501. ch = 1
  502. }
  503. st.producedPCMFrames += uint64(len(audio) / ch)
  504. if sess.wavSamples == 0 && audioRate > 0 {
  505. sess.sampleRate = audioRate
  506. }
  507. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  508. pcmLen := len(audio) * 2
  509. pcm := sess.growPCM(pcmLen)
  510. for k, s := range audio {
  511. v := int16(clip(s * 32767))
  512. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  513. }
  514. if !sess.listenOnly && sess.wavBuf != nil {
  515. n, err := sess.wavBuf.Write(pcm)
  516. if err != nil {
  517. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  518. } else {
  519. sess.wavSamples += int64(n / 2)
  520. }
  521. }
  522. // Gap logging for live-audio sessions + transient click detector
  523. if len(sess.audioSubs) > 0 {
  524. if !sess.lastAudioTs.IsZero() {
  525. gap := time.Since(sess.lastAudioTs)
  526. if gap > 150*time.Millisecond {
  527. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  528. if st.telemetry != nil {
  529. st.telemetry.IncCounter("streamer.audio.gap.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  530. st.telemetry.Observe("streamer.audio.gap_ms", float64(gap.Milliseconds()), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  531. }
  532. }
  533. }
  534. // Transient click detector: finds short impulses (1-3 samples)
  535. // that deviate sharply from the local signal trend.
  536. // A click looks like: ...smooth... SPIKE ...smooth...
  537. // Normal FM audio has large deltas too, but they follow
  538. // a continuous curve. A click has high |d2/dt2| (acceleration).
  539. //
  540. // Method: second-derivative detector. For each sample triplet
  541. // (a, b, c), compute |2b - a - c| which is the discrete
  542. // second derivative magnitude. High values = transient spike.
  543. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  544. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  545. stride := sess.channels
  546. if stride < 1 {
  547. stride = 1
  548. }
  549. nFrames := len(audio) / stride
  550. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  551. if sess.lastAudioSet && nFrames >= 1 {
  552. // second derivative across boundary: |2*last - prevLast - first|
  553. first := float64(audio[0])
  554. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  555. if d2 > 0.15 {
  556. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  557. if st.telemetry != nil {
  558. st.telemetry.IncCounter("audio.boundary_click.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  559. st.telemetry.Observe("audio.boundary_click.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  560. }
  561. }
  562. }
  563. // Intra-frame transient scan (L channel only for performance)
  564. nClicks := 0
  565. maxD2 := float64(0)
  566. maxD2Pos := 0
  567. for k := 1; k < nFrames-1; k++ {
  568. a := float64(audio[(k-1)*stride])
  569. b := float64(audio[k*stride])
  570. c := float64(audio[(k+1)*stride])
  571. d2 := math.Abs(2*b - a - c)
  572. if d2 > maxD2 {
  573. maxD2 = d2
  574. maxD2Pos = k
  575. }
  576. if d2 > 0.15 {
  577. nClicks++
  578. }
  579. }
  580. if nClicks > 0 {
  581. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  582. if st.telemetry != nil {
  583. st.telemetry.IncCounter("audio.intra_click.count", float64(nClicks), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  584. st.telemetry.Observe("audio.intra_click.max_d2", maxD2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  585. }
  586. }
  587. // Store last two samples for next frame's boundary check
  588. if nFrames >= 2 {
  589. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  590. sess.lastAudioL = audio[(nFrames-1)*stride]
  591. if stride > 1 {
  592. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  593. }
  594. } else if nFrames == 1 {
  595. sess.prevAudioL = float64(sess.lastAudioL)
  596. sess.lastAudioL = audio[0]
  597. if stride > 1 && len(audio) >= 2 {
  598. sess.lastAudioR = audio[1]
  599. }
  600. }
  601. sess.lastAudioSet = true
  602. }
  603. sess.lastAudioTs = time.Now()
  604. }
  605. st.fanoutPCM(sess, pcm, pcmLen)
  606. }
  607. // Segment split (recording sessions only)
  608. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  609. segIdx := sess.segmentIdx + 1
  610. oldSubs := sess.audioSubs
  611. oldState := sess.captureDSPState()
  612. sess.audioSubs = nil
  613. closeSession(sess, &st.policy)
  614. s, err := st.openRecordingSession(sig, now)
  615. if err != nil {
  616. delete(st.sessions, sig.ID)
  617. continue
  618. }
  619. s.segmentIdx = segIdx
  620. s.audioSubs = oldSubs
  621. s.restoreDSPState(oldState)
  622. st.sessions[sig.ID] = s
  623. if st.telemetry != nil {
  624. st.telemetry.IncCounter("streamer.session.reopen", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID)))
  625. st.telemetry.Event("session_reopen", "info", "stream session rotated by max duration", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{
  626. "old_session": sess.sessionID,
  627. "new_session": s.sessionID,
  628. })
  629. }
  630. }
  631. }
  632. // Close sessions for disappeared signals (with grace period)
  633. for id, sess := range st.sessions {
  634. if seen[id] {
  635. continue
  636. }
  637. gracePeriod := 3 * time.Second
  638. if sess.listenOnly {
  639. gracePeriod = 5 * time.Second
  640. }
  641. if now.Sub(sess.lastFeed) > gracePeriod {
  642. for _, sub := range sess.audioSubs {
  643. close(sub.ch)
  644. }
  645. sess.audioSubs = nil
  646. if !sess.listenOnly {
  647. closeSession(sess, &st.policy)
  648. }
  649. if st.telemetry != nil {
  650. st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID))
  651. st.telemetry.Event("session_close", "info", "stream session closed", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID), map[string]any{
  652. "reason": "signal_missing",
  653. "listen_only": sess.listenOnly,
  654. })
  655. }
  656. delete(st.sessions, id)
  657. }
  658. }
  659. }
  660. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  661. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  662. if st.policy.DebugLiveAudio {
  663. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  664. }
  665. return true
  666. }
  667. for subID, pl := range st.pendingListens {
  668. delta := math.Abs(sig.CenterHz - pl.freq)
  669. if delta < 200000 {
  670. if st.policy.DebugLiveAudio {
  671. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  672. }
  673. return true
  674. }
  675. }
  676. return false
  677. }
  678. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  679. for subID, pl := range st.pendingListens {
  680. requestedMode := normalizeRequestedMode(pl.mode)
  681. if requestedMode != "" && sess.demodName != requestedMode {
  682. continue
  683. }
  684. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  685. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  686. delete(st.pendingListens, subID)
  687. // Send updated audio_info now that we know the real session params.
  688. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  689. infoJSON, _ := json.Marshal(sess.audioInfo())
  690. tagged := make([]byte, 1+len(infoJSON))
  691. tagged[0] = 0x00 // tag: audio_info
  692. copy(tagged[1:], infoJSON)
  693. select {
  694. case pl.ch <- tagged:
  695. default:
  696. }
  697. if audioDumpEnabled {
  698. now := time.Now()
  699. sess.debugDumpStart = now.Add(debugDumpDelay)
  700. sess.debugDumpUntil = sess.debugDumpStart.Add(debugDumpDuration)
  701. sess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", sess.signalID, now.Format("20060102-150405")))
  702. sess.demodDump = nil
  703. sess.finalDump = nil
  704. }
  705. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  706. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  707. if audioDumpEnabled {
  708. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", sess.signalID, sess.debugDumpStart.Format(time.RFC3339), sess.debugDumpUntil.Format(time.RFC3339))
  709. }
  710. }
  711. }
  712. }
  713. // CloseAll finalises all sessions and stops the worker goroutine.
  714. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  715. st.mu.Lock()
  716. defer st.mu.Unlock()
  717. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  718. for _, sess := range st.sessions {
  719. out[sess.signalID] = RuntimeSignalInfo{
  720. DemodName: sess.demodName,
  721. PlaybackMode: sess.playbackMode,
  722. StereoState: sess.stereoState,
  723. Channels: sess.channels,
  724. SampleRate: sess.sampleRate,
  725. }
  726. }
  727. return out
  728. }
  729. func (st *Streamer) CloseAll() {
  730. close(st.summaryStop)
  731. st.summaryWG.Wait()
  732. close(st.feedCh)
  733. <-st.done
  734. st.mu.Lock()
  735. defer st.mu.Unlock()
  736. for id, sess := range st.sessions {
  737. for _, sub := range sess.audioSubs {
  738. close(sub.ch)
  739. }
  740. sess.audioSubs = nil
  741. if !sess.listenOnly {
  742. closeSession(sess, &st.policy)
  743. }
  744. if st.telemetry != nil {
  745. st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID))
  746. }
  747. delete(st.sessions, id)
  748. }
  749. for _, pl := range st.pendingListens {
  750. close(pl.ch)
  751. }
  752. st.pendingListens = nil
  753. if st.telemetry != nil {
  754. st.telemetry.Event("streamer_close_all", "info", "all stream sessions closed", nil, nil)
  755. }
  756. if st.debugSummary != nil {
  757. st.debugSummary.Close()
  758. }
  759. }
  760. // ActiveSessions returns the number of open streaming sessions.
  761. func (st *Streamer) ActiveSessions() int {
  762. st.mu.Lock()
  763. defer st.mu.Unlock()
  764. return len(st.sessions)
  765. }
  766. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  767. //
  768. // LL-2: Returns AudioInfo with correct channels and sample rate.
  769. // LL-3: Returns error only on hard failures (nil streamer etc).
  770. //
  771. // If a matching session exists, attaches immediately. Otherwise, the
  772. // subscriber is held as "pending" and will be attached when a matching
  773. // signal appears in the next DSP frame.
  774. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  775. ch := make(chan []byte, 64)
  776. st.mu.Lock()
  777. defer st.mu.Unlock()
  778. st.nextSub++
  779. subID := st.nextSub
  780. requestedMode := normalizeRequestedMode(mode)
  781. // Try to find a matching session
  782. var bestSess *streamSession
  783. bestDist := math.MaxFloat64
  784. for _, sess := range st.sessions {
  785. if requestedMode != "" && sess.demodName != requestedMode {
  786. continue
  787. }
  788. d := math.Abs(sess.centerHz - freq)
  789. if d < bestDist {
  790. bestDist = d
  791. bestSess = sess
  792. }
  793. }
  794. if bestSess != nil && bestDist < 200000 {
  795. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  796. if audioDumpEnabled {
  797. now := time.Now()
  798. bestSess.debugDumpStart = now.Add(debugDumpDelay)
  799. bestSess.debugDumpUntil = bestSess.debugDumpStart.Add(debugDumpDuration)
  800. bestSess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", bestSess.signalID, now.Format("20060102-150405")))
  801. bestSess.demodDump = nil
  802. bestSess.finalDump = nil
  803. }
  804. info := bestSess.audioInfo()
  805. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  806. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  807. if audioDumpEnabled {
  808. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", bestSess.signalID, bestSess.debugDumpStart.Format(time.RFC3339), bestSess.debugDumpUntil.Format(time.RFC3339))
  809. }
  810. if st.telemetry != nil {
  811. st.telemetry.IncCounter("streamer.listener.attach", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", bestSess.signalID), "session_id", bestSess.sessionID))
  812. }
  813. return subID, ch, info, nil
  814. }
  815. // No matching session yet — add as pending listener
  816. st.pendingListens[subID] = &pendingListen{
  817. freq: freq,
  818. bw: bw,
  819. mode: mode,
  820. ch: ch,
  821. }
  822. info := defaultAudioInfoForMode(mode)
  823. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  824. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  825. if st.telemetry != nil {
  826. st.telemetry.IncCounter("streamer.listener.pending", 1, nil)
  827. st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil)
  828. }
  829. return subID, ch, info, nil
  830. }
  831. // UnsubscribeAudio removes a live-listen subscriber.
  832. func (st *Streamer) UnsubscribeAudio(subID int64) {
  833. st.mu.Lock()
  834. defer st.mu.Unlock()
  835. if pl, ok := st.pendingListens[subID]; ok {
  836. close(pl.ch)
  837. delete(st.pendingListens, subID)
  838. if st.telemetry != nil {
  839. st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "pending"))
  840. st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil)
  841. }
  842. return
  843. }
  844. for _, sess := range st.sessions {
  845. for i, sub := range sess.audioSubs {
  846. if sub.id == subID {
  847. close(sub.ch)
  848. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  849. if st.telemetry != nil {
  850. st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "active", "session_id", sess.sessionID))
  851. }
  852. return
  853. }
  854. }
  855. }
  856. }
  857. // ---------------------------------------------------------------------------
  858. // Session: stateful extraction + demod
  859. // ---------------------------------------------------------------------------
  860. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  861. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  862. // output with zero transient artifacts.
  863. type iqHeadProbeStats struct {
  864. meanMag float64
  865. minMag float64
  866. maxStep float64
  867. p95Step float64
  868. lowMag int
  869. }
  870. func probeIQHeadStats(iq []complex64, probeLen int) iqHeadProbeStats {
  871. if probeLen <= 0 || len(iq) == 0 {
  872. return iqHeadProbeStats{}
  873. }
  874. if len(iq) < probeLen {
  875. probeLen = len(iq)
  876. }
  877. stats := iqHeadProbeStats{minMag: math.MaxFloat64}
  878. steps := make([]float64, 0, probeLen)
  879. var sum float64
  880. for i := 0; i < probeLen; i++ {
  881. v := iq[i]
  882. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  883. sum += mag
  884. if mag < stats.minMag {
  885. stats.minMag = mag
  886. }
  887. if mag < 0.02 {
  888. stats.lowMag++
  889. }
  890. if i > 0 {
  891. p := iq[i-1]
  892. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  893. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  894. step := math.Abs(math.Atan2(num, den))
  895. steps = append(steps, step)
  896. if step > stats.maxStep {
  897. stats.maxStep = step
  898. }
  899. }
  900. }
  901. stats.meanMag = sum / float64(probeLen)
  902. if len(steps) > 0 {
  903. sorted := append([]float64(nil), steps...)
  904. sort.Float64s(sorted)
  905. idx := int(math.Round(0.95 * float64(len(sorted)-1)))
  906. if idx < 0 {
  907. idx = 0
  908. }
  909. if idx >= len(sorted) {
  910. idx = len(sorted) - 1
  911. }
  912. stats.p95Step = sorted[idx]
  913. }
  914. if stats.minMag == math.MaxFloat64 {
  915. stats.minMag = 0
  916. }
  917. return stats
  918. }
  919. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int, coll *telemetry.Collector) ([]float32, int) {
  920. if len(snippet) == 0 || snipRate <= 0 {
  921. return nil, 0
  922. }
  923. baseTags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)
  924. if coll != nil {
  925. coll.SetGauge("iq.stage.snippet.length", float64(len(snippet)), baseTags)
  926. stats := probeIQHeadStats(snippet, 64)
  927. coll.Observe("iq.snippet.head_mean_mag", stats.meanMag, baseTags)
  928. coll.Observe("iq.snippet.head_min_mag", stats.minMag, baseTags)
  929. coll.Observe("iq.snippet.head_max_step", stats.maxStep, baseTags)
  930. coll.Observe("iq.snippet.head_p95_step", stats.p95Step, baseTags)
  931. coll.SetGauge("iq.snippet.head_low_magnitude_count", float64(stats.lowMag), baseTags)
  932. if sess.lastExtractIQSet {
  933. prevMag := math.Hypot(float64(real(sess.lastExtractIQ)), float64(imag(sess.lastExtractIQ)))
  934. currMag := math.Hypot(float64(real(snippet[0])), float64(imag(snippet[0])))
  935. deltaMag := math.Abs(currMag - prevMag)
  936. num := float64(real(sess.lastExtractIQ))*float64(imag(snippet[0])) - float64(imag(sess.lastExtractIQ))*float64(real(snippet[0]))
  937. den := float64(real(sess.lastExtractIQ))*float64(real(snippet[0])) + float64(imag(sess.lastExtractIQ))*float64(imag(snippet[0]))
  938. deltaPhase := math.Abs(math.Atan2(num, den))
  939. d2 := float64(real(snippet[0]-sess.lastExtractIQ))*float64(real(snippet[0]-sess.lastExtractIQ)) + float64(imag(snippet[0]-sess.lastExtractIQ))*float64(imag(snippet[0]-sess.lastExtractIQ))
  940. coll.Observe("iq.extract.output.boundary.delta_mag", deltaMag, baseTags)
  941. coll.Observe("iq.extract.output.boundary.delta_phase", deltaPhase, baseTags)
  942. coll.Observe("iq.extract.output.boundary.d2", d2, baseTags)
  943. coll.Observe("iq.extract.output.boundary.discontinuity_score", deltaMag+deltaPhase, baseTags)
  944. }
  945. }
  946. if len(snippet) > 0 {
  947. sess.prevExtractIQ = sess.lastExtractIQ
  948. sess.lastExtractIQ = snippet[len(snippet)-1]
  949. sess.lastExtractIQSet = true
  950. }
  951. isWFMStereo := sess.demodName == "WFM_STEREO"
  952. isWFM := sess.demodName == "WFM" || isWFMStereo
  953. demodName := sess.demodName
  954. if isWFMStereo {
  955. demodName = "WFM"
  956. }
  957. d := demod.Get(demodName)
  958. if d == nil {
  959. d = demod.Get("NFM")
  960. }
  961. if d == nil {
  962. return nil, 0
  963. }
  964. // The extra 1-sample discriminator overlap prepend was removed after it was
  965. // shown to shift the downstream decimation phase and create heavy click
  966. // artifacts in steady-state streaming/recording. The upstream extraction path
  967. // and the stateful FIR/decimation stages already provide continuity.
  968. fullSnip := snippet
  969. overlapApplied := false
  970. prevTailValid := false
  971. if logging.EnabledCategory("prefir") && len(fullSnip) > 0 {
  972. probeN := 64
  973. if len(fullSnip) < probeN {
  974. probeN = len(fullSnip)
  975. }
  976. minPreMag := math.MaxFloat64
  977. minPreIdx := 0
  978. maxPreStep := 0.0
  979. maxPreStepIdx := 0
  980. for i := 0; i < probeN; i++ {
  981. v := fullSnip[i]
  982. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  983. if mag < minPreMag {
  984. minPreMag = mag
  985. minPreIdx = i
  986. }
  987. if i > 0 {
  988. p := fullSnip[i-1]
  989. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  990. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  991. step := math.Abs(math.Atan2(num, den))
  992. if step > maxPreStep {
  993. maxPreStep = step
  994. maxPreStepIdx = i - 1
  995. }
  996. }
  997. }
  998. logging.Debug("prefir", "pre_fir_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "snip_len", len(fullSnip))
  999. if minPreMag < 0.18 {
  1000. logging.Warn("prefir", "pre_fir_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx)
  1001. }
  1002. if maxPreStep > 1.5 {
  1003. logging.Warn("prefir", "pre_fir_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "min_mag", minPreMag, "min_idx", minPreIdx)
  1004. }
  1005. }
  1006. // --- Stateful anti-alias FIR + decimation to demod rate ---
  1007. demodRate := d.OutputSampleRate()
  1008. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  1009. if decim1 < 1 {
  1010. decim1 = 1
  1011. }
  1012. // WFM override: force decim1=2 (256kHz) instead of round(512k/192k)=3 (170kHz).
  1013. // At decim1=3, Nyquist is 85kHz which clips FM broadcast ±75kHz deviation.
  1014. // At decim1=2, Nyquist is 128kHz → full FM deviation + stereo pilot + guard band.
  1015. // Bonus: 256000→48000 resampler ratio is L=3/M=16 (96 taps, 1kB) instead of
  1016. // the pathological L=24000/M=85333 (768k taps, 6MB) from 170666→48000.
  1017. if isWFM && decim1 > 2 && snipRate/2 >= 200000 {
  1018. decim1 = 2
  1019. }
  1020. actualDemodRate := snipRate / decim1
  1021. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  1022. var dec []complex64
  1023. if decim1 > 1 {
  1024. // FIR cutoff: for WFM, use 90kHz (above ±75kHz FM deviation + guard).
  1025. // For NFM/other: use standard Nyquist*0.8 cutoff.
  1026. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  1027. if isWFM {
  1028. cutoff = 90000
  1029. }
  1030. // Lazy-init or reinit stateful FIR if parameters changed
  1031. if sess.preDemodDecimator == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff || sess.preDemodDecim != decim1 {
  1032. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  1033. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  1034. sess.preDemodDecimator = dsp.NewStatefulDecimatingFIRComplex(taps, decim1)
  1035. sess.preDemodRate = snipRate
  1036. sess.preDemodCutoff = cutoff
  1037. sess.preDemodDecim = decim1
  1038. sess.preDemodDecimPhase = 0
  1039. if coll != nil {
  1040. coll.IncCounter("dsp.pre_demod.init", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1041. coll.Event("prefir_reinit", "info", "pre-demod decimator reinitialized", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1042. "snip_rate": snipRate,
  1043. "cutoff_hz": cutoff,
  1044. "decim": decim1,
  1045. })
  1046. }
  1047. }
  1048. decimPhaseBefore := sess.preDemodDecimPhase
  1049. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  1050. dec = sess.preDemodDecimator.Process(fullSnip)
  1051. sess.preDemodDecimPhase = sess.preDemodDecimator.Phase()
  1052. if coll != nil {
  1053. coll.Observe("dsp.pre_demod.decimation_factor", float64(decim1), baseTags)
  1054. coll.SetGauge("iq.stage.pre_demod.length", float64(len(dec)), baseTags)
  1055. decStats := probeIQHeadStats(dec, 64)
  1056. coll.Observe("iq.pre_demod.head_mean_mag", decStats.meanMag, baseTags)
  1057. coll.Observe("iq.pre_demod.head_min_mag", decStats.minMag, baseTags)
  1058. coll.Observe("iq.pre_demod.head_max_step", decStats.maxStep, baseTags)
  1059. coll.Observe("iq.pre_demod.head_p95_step", decStats.p95Step, baseTags)
  1060. coll.SetGauge("iq.pre_demod.head_low_magnitude_count", float64(decStats.lowMag), baseTags)
  1061. }
  1062. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase)
  1063. } else {
  1064. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0)
  1065. dec = fullSnip
  1066. }
  1067. if decHeadTrimSamples > 0 && decHeadTrimSamples < len(dec) {
  1068. logging.Warn("boundary", "dec_head_trim_applied", "signal", sess.signalID, "trim", decHeadTrimSamples, "before_len", len(dec))
  1069. dec = dec[decHeadTrimSamples:]
  1070. if coll != nil {
  1071. coll.IncCounter("dsp.pre_demod.head_trim", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1072. }
  1073. }
  1074. if logging.EnabledCategory("boundary") && len(dec) > 0 {
  1075. first := dec[0]
  1076. if sess.lastDecIQSet {
  1077. d2Re := math.Abs(2*float64(real(sess.lastDecIQ)) - float64(real(sess.prevDecIQ)) - float64(real(first)))
  1078. d2Im := math.Abs(2*float64(imag(sess.lastDecIQ)) - float64(imag(sess.prevDecIQ)) - float64(imag(first)))
  1079. d2Mag := math.Hypot(d2Re, d2Im)
  1080. if d2Mag > 0.15 {
  1081. logging.Warn("boundary", "dec_iq_boundary", "signal", sess.signalID, "d2", d2Mag)
  1082. if coll != nil {
  1083. coll.IncCounter("iq.dec.boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1084. coll.Observe("iq.dec.boundary.d2", d2Mag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1085. }
  1086. }
  1087. }
  1088. headN := 16
  1089. if len(dec) < headN {
  1090. headN = len(dec)
  1091. }
  1092. tailN := 16
  1093. if len(dec) < tailN {
  1094. tailN = len(dec)
  1095. }
  1096. var headSum, tailSum, minMag, maxMag float64
  1097. minMag = math.MaxFloat64
  1098. for i, v := range dec {
  1099. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  1100. if mag < minMag {
  1101. minMag = mag
  1102. }
  1103. if mag > maxMag {
  1104. maxMag = mag
  1105. }
  1106. if i < headN {
  1107. headSum += mag
  1108. }
  1109. }
  1110. for i := len(dec) - tailN; i < len(dec); i++ {
  1111. if i >= 0 {
  1112. v := dec[i]
  1113. tailSum += math.Hypot(float64(real(v)), float64(imag(v)))
  1114. }
  1115. }
  1116. headAvg := 0.0
  1117. if headN > 0 {
  1118. headAvg = headSum / float64(headN)
  1119. }
  1120. tailAvg := 0.0
  1121. if tailN > 0 {
  1122. tailAvg = tailSum / float64(tailN)
  1123. }
  1124. logging.Debug("boundary", "dec_iq_meter", "signal", sess.signalID, "len", len(dec), "head_avg", headAvg, "tail_avg", tailAvg, "min_mag", minMag, "max_mag", maxMag)
  1125. if tailAvg > 0 {
  1126. ratio := headAvg / tailAvg
  1127. if ratio < 0.75 || ratio > 1.25 {
  1128. logging.Warn("boundary", "dec_iq_head_tail_skew", "signal", sess.signalID, "head_avg", headAvg, "tail_avg", tailAvg, "ratio", ratio)
  1129. }
  1130. if coll != nil {
  1131. coll.Observe("iq.dec.head_tail_ratio", ratio, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1132. }
  1133. }
  1134. probeN := 64
  1135. if len(dec) < probeN {
  1136. probeN = len(dec)
  1137. }
  1138. minHeadMag := math.MaxFloat64
  1139. minHeadIdx := 0
  1140. maxHeadStep := 0.0
  1141. maxHeadStepIdx := 0
  1142. for i := 0; i < probeN; i++ {
  1143. v := dec[i]
  1144. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  1145. if mag < minHeadMag {
  1146. minHeadMag = mag
  1147. minHeadIdx = i
  1148. }
  1149. if i > 0 {
  1150. p := dec[i-1]
  1151. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  1152. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  1153. step := math.Abs(math.Atan2(num, den))
  1154. if step > maxHeadStep {
  1155. maxHeadStep = step
  1156. maxHeadStepIdx = i - 1
  1157. }
  1158. }
  1159. }
  1160. logging.Debug("boundary", "dec_iq_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  1161. if minHeadMag < 0.18 {
  1162. logging.Warn("boundary", "dec_iq_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  1163. }
  1164. if maxHeadStep > 1.5 {
  1165. logging.Warn("boundary", "dec_iq_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx, "min_mag", minHeadMag, "min_idx", minHeadIdx)
  1166. }
  1167. if coll != nil {
  1168. coll.Observe("iq.dec.magnitude.min", minMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1169. coll.Observe("iq.dec.magnitude.max", maxMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1170. coll.Observe("iq.dec.phase_step.max", maxHeadStep, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1171. }
  1172. if len(dec) >= 2 {
  1173. sess.prevDecIQ = dec[len(dec)-2]
  1174. sess.lastDecIQ = dec[len(dec)-1]
  1175. } else {
  1176. sess.prevDecIQ = sess.lastDecIQ
  1177. sess.lastDecIQ = dec[0]
  1178. }
  1179. sess.lastDecIQSet = true
  1180. }
  1181. // --- FM/AM/etc Demod ---
  1182. // For FM demod (NFM/WFM): bridge the block boundary by prepending the
  1183. // previous block's last IQ sample. Without this, the discriminator loses
  1184. // the cross-boundary phase step (1 audio sample missing per block) and
  1185. // any phase discontinuity at the seam becomes an unsmoothed audio transient.
  1186. var audio []float32
  1187. isFMDemod := demodName == "NFM" || demodName == "WFM"
  1188. if isFMDemod && sess.lastDiscrimIQSet && len(dec) > 0 {
  1189. bridged := make([]complex64, len(dec)+1)
  1190. bridged[0] = sess.lastDiscrimIQ
  1191. copy(bridged[1:], dec)
  1192. audio = d.Demod(bridged, actualDemodRate)
  1193. // bridged produced len(dec) audio samples (= len(bridged)-1)
  1194. // which is exactly the correct count for the new data
  1195. } else {
  1196. audio = d.Demod(dec, actualDemodRate)
  1197. }
  1198. if len(dec) > 0 {
  1199. sess.lastDiscrimIQ = dec[len(dec)-1]
  1200. sess.lastDiscrimIQSet = true
  1201. }
  1202. if len(audio) == 0 {
  1203. return nil, 0
  1204. }
  1205. if coll != nil {
  1206. coll.SetGauge("audio.stage.demod.length", float64(len(audio)), baseTags)
  1207. probe := 64
  1208. if len(audio) < probe {
  1209. probe = len(audio)
  1210. }
  1211. if probe > 0 {
  1212. var headAbs, tailAbs float64
  1213. for i := 0; i < probe; i++ {
  1214. headAbs += math.Abs(float64(audio[i]))
  1215. tailAbs += math.Abs(float64(audio[len(audio)-probe+i]))
  1216. }
  1217. coll.Observe("audio.demod.head_mean_abs", headAbs/float64(probe), baseTags)
  1218. coll.Observe("audio.demod.tail_mean_abs", tailAbs/float64(probe), baseTags)
  1219. coll.Observe("audio.demod.edge_delta_abs", math.Abs(float64(audio[0])-float64(audio[len(audio)-1])), baseTags)
  1220. }
  1221. }
  1222. if logging.EnabledCategory("boundary") {
  1223. stride := d.Channels()
  1224. if stride < 1 {
  1225. stride = 1
  1226. }
  1227. nFrames := len(audio) / stride
  1228. if nFrames > 0 {
  1229. first := float64(audio[0])
  1230. if sess.lastDemodSet {
  1231. d2 := math.Abs(2*float64(sess.lastDemodL) - sess.prevDemodL - first)
  1232. if d2 > 0.15 {
  1233. logging.Warn("boundary", "demod_boundary", "signal", sess.signalID, "d2", d2)
  1234. if coll != nil {
  1235. coll.IncCounter("audio.demod_boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1236. coll.Observe("audio.demod_boundary.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1237. }
  1238. }
  1239. }
  1240. if nFrames >= 2 {
  1241. sess.prevDemodL = float64(audio[(nFrames-2)*stride])
  1242. sess.lastDemodL = audio[(nFrames-1)*stride]
  1243. } else {
  1244. sess.prevDemodL = float64(sess.lastDemodL)
  1245. sess.lastDemodL = audio[0]
  1246. }
  1247. sess.lastDemodSet = true
  1248. }
  1249. }
  1250. logging.Debug("boundary", "audio_path", "signal", sess.signalID, "demod", demodName, "actual_rate", actualDemodRate, "audio_len", len(audio), "channels", d.Channels(), "overlap_applied", overlapApplied, "prev_tail_valid", prevTailValid)
  1251. shouldDump := !sess.debugDumpStart.IsZero() && !sess.debugDumpUntil.IsZero()
  1252. if shouldDump {
  1253. now := time.Now()
  1254. shouldDump = !now.Before(sess.debugDumpStart) && now.Before(sess.debugDumpUntil)
  1255. }
  1256. if shouldDump {
  1257. sess.demodDump = append(sess.demodDump, audio...)
  1258. }
  1259. // --- Stateful stereo decode with conservative lock/hysteresis ---
  1260. channels := 1
  1261. if isWFMStereo {
  1262. sess.playbackMode = "WFM_STEREO"
  1263. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  1264. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  1265. if locked {
  1266. sess.stereoOnCount++
  1267. sess.stereoOffCount = 0
  1268. if sess.stereoOnCount >= 4 {
  1269. sess.stereoEnabled = true
  1270. }
  1271. } else {
  1272. sess.stereoOnCount = 0
  1273. sess.stereoOffCount++
  1274. if sess.stereoOffCount >= 10 {
  1275. sess.stereoEnabled = false
  1276. }
  1277. }
  1278. prevPlayback := sess.playbackMode
  1279. prevStereo := sess.stereoState
  1280. if sess.stereoEnabled && len(stereoAudio) > 0 {
  1281. sess.stereoState = "locked"
  1282. audio = stereoAudio
  1283. } else {
  1284. sess.stereoState = "mono-fallback"
  1285. // Apply 15kHz LPF before output: the raw discriminator contains
  1286. // the 19kHz pilot (+55dB), L-R subcarrier (23-53kHz), and RDS (57kHz).
  1287. // Without filtering, the pilot leaks into audio and subcarrier
  1288. // energy produces audible click-like artifacts.
  1289. audio = sess.wfmAudioFilter(audio, actualDemodRate)
  1290. dual := make([]float32, len(audio)*2)
  1291. for i, s := range audio {
  1292. dual[i*2] = s
  1293. dual[i*2+1] = s
  1294. }
  1295. audio = dual
  1296. }
  1297. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  1298. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  1299. }
  1300. } else if isWFM {
  1301. // Plain WFM (not stereo): also needs 15kHz LPF on discriminator output
  1302. audio = sess.wfmAudioFilter(audio, actualDemodRate)
  1303. }
  1304. // --- Polyphase resample to exact 48kHz ---
  1305. if actualDemodRate != streamAudioRate {
  1306. if channels > 1 {
  1307. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  1308. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  1309. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1310. sess.stereoResamplerRate = actualDemodRate
  1311. if coll != nil {
  1312. coll.Event("resampler_reset", "info", "stereo resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1313. "mode": "stereo",
  1314. "rate": actualDemodRate,
  1315. })
  1316. }
  1317. }
  1318. audio = sess.stereoResampler.Process(audio)
  1319. } else {
  1320. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  1321. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  1322. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1323. sess.monoResamplerRate = actualDemodRate
  1324. if coll != nil {
  1325. coll.Event("resampler_reset", "info", "mono resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1326. "mode": "mono",
  1327. "rate": actualDemodRate,
  1328. })
  1329. }
  1330. }
  1331. audio = sess.monoResampler.Process(audio)
  1332. }
  1333. }
  1334. if coll != nil {
  1335. coll.SetGauge("audio.stage.output.length", float64(len(audio)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1336. }
  1337. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  1338. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  1339. tau := sess.deemphasisUs * 1e-6
  1340. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  1341. if channels > 1 {
  1342. nFrames := len(audio) / channels
  1343. yL, yR := sess.deemphL, sess.deemphR
  1344. for i := 0; i < nFrames; i++ {
  1345. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  1346. audio[i*2] = float32(yL)
  1347. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  1348. audio[i*2+1] = float32(yR)
  1349. }
  1350. sess.deemphL, sess.deemphR = yL, yR
  1351. } else {
  1352. y := sess.deemphL
  1353. for i := range audio {
  1354. y = alpha*y + (1-alpha)*float64(audio[i])
  1355. audio[i] = float32(y)
  1356. }
  1357. sess.deemphL = y
  1358. }
  1359. }
  1360. if isWFM {
  1361. for i := range audio {
  1362. audio[i] *= 0.35
  1363. }
  1364. }
  1365. if shouldDump {
  1366. sess.finalDump = append(sess.finalDump, audio...)
  1367. } else if !sess.debugDumpUntil.IsZero() && time.Now().After(sess.debugDumpUntil) && sess.debugDumpBase != "" {
  1368. _ = os.MkdirAll(filepath.Dir(sess.debugDumpBase), 0o755)
  1369. if len(sess.demodDump) > 0 {
  1370. _ = writeWAVFile(sess.debugDumpBase+"-demod.wav", sess.demodDump, actualDemodRate, d.Channels())
  1371. }
  1372. if len(sess.finalDump) > 0 {
  1373. _ = writeWAVFile(sess.debugDumpBase+"-final.wav", sess.finalDump, streamAudioRate, channels)
  1374. }
  1375. logging.Warn("boundary", "debug_audio_dump_window", "signal", sess.signalID, "base", sess.debugDumpBase)
  1376. sess.debugDumpBase = ""
  1377. sess.demodDump = nil
  1378. sess.finalDump = nil
  1379. sess.debugDumpStart = time.Time{}
  1380. sess.debugDumpUntil = time.Time{}
  1381. }
  1382. return audio, streamAudioRate
  1383. }
  1384. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  1385. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  1386. // loopBW is in Hz, sampleRate in samples/sec.
  1387. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  1388. if sampleRate <= 0 || loopBW <= 0 {
  1389. return 0, 0
  1390. }
  1391. bl := loopBW / float64(sampleRate)
  1392. theta := bl / (damping + 0.25/damping)
  1393. d := 1 + 2*damping*theta + theta*theta
  1394. alpha := (4 * damping * theta) / d
  1395. beta := (4 * theta * theta) / d
  1396. return alpha, beta
  1397. }
  1398. // wfmAudioFilter applies a stateful 15kHz lowpass to WFM discriminator output.
  1399. // Removes the 19kHz stereo pilot, L-R DSB-SC subcarrier (23-53kHz), and RDS (57kHz)
  1400. // that would otherwise leak into the audio output as clicks and tonal artifacts.
  1401. func (sess *streamSession) wfmAudioFilter(audio []float32, sampleRate int) []float32 {
  1402. if len(audio) == 0 || sampleRate <= 0 {
  1403. return audio
  1404. }
  1405. if sess.wfmAudioLPF == nil || sess.wfmAudioLPFRate != sampleRate {
  1406. sess.wfmAudioLPF = dsp.NewStatefulFIRReal(dsp.LowpassFIR(15000, sampleRate, 101))
  1407. sess.wfmAudioLPFRate = sampleRate
  1408. }
  1409. return sess.wfmAudioLPF.Process(audio)
  1410. }
  1411. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  1412. // Uses persistent FIR filter state across frames for click-free stereo.
  1413. // Reuses session scratch buffers to minimize allocations.
  1414. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  1415. if len(mono) == 0 || sampleRate <= 0 {
  1416. return nil, false
  1417. }
  1418. n := len(mono)
  1419. // Rebuild rate-dependent stereo filters when sampleRate changes
  1420. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  1421. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  1422. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  1423. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  1424. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  1425. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  1426. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  1427. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  1428. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  1429. sess.stereoFilterRate = sampleRate
  1430. // Initialize PLL for 19kHz pilot tracking.
  1431. sess.pilotPhase = 0
  1432. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  1433. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  1434. sess.pilotErrAvg = 0
  1435. sess.pilotI = 0
  1436. sess.pilotQ = 0
  1437. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  1438. }
  1439. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  1440. scratch := sess.growAudio(n * 5)
  1441. lpr := scratch[:n]
  1442. bpfLR := scratch[n : 2*n]
  1443. lr := scratch[2*n : 3*n]
  1444. work1 := scratch[3*n : 4*n]
  1445. work2 := scratch[4*n : 5*n]
  1446. sess.stereoLPF.ProcessInto(mono, lpr)
  1447. // 23-53kHz bandpass for L-R DSB-SC.
  1448. sess.stereoBPHi.ProcessInto(mono, work1)
  1449. sess.stereoBPLo.ProcessInto(mono, work2)
  1450. for i := 0; i < n; i++ {
  1451. bpfLR[i] = work1[i] - work2[i]
  1452. }
  1453. // 19kHz pilot bandpass for PLL.
  1454. sess.pilotLPFHi.ProcessInto(mono, work1)
  1455. sess.pilotLPFLo.ProcessInto(mono, work2)
  1456. for i := 0; i < n; i++ {
  1457. work1[i] = work1[i] - work2[i]
  1458. }
  1459. pilot := work1
  1460. phase := sess.pilotPhase
  1461. freq := sess.pilotFreq
  1462. alpha := sess.pilotAlpha
  1463. beta := sess.pilotBeta
  1464. iState := sess.pilotI
  1465. qState := sess.pilotQ
  1466. lpAlpha := sess.pilotLPAlpha
  1467. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  1468. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  1469. var pilotPower float64
  1470. var totalPower float64
  1471. var errSum float64
  1472. for i := 0; i < n; i++ {
  1473. p := float64(pilot[i])
  1474. sinP, cosP := math.Sincos(phase)
  1475. iMix := p * cosP
  1476. qMix := p * -sinP
  1477. iState += lpAlpha * (iMix - iState)
  1478. qState += lpAlpha * (qMix - qState)
  1479. err := math.Atan2(qState, iState)
  1480. freq += beta * err
  1481. if freq < minFreq {
  1482. freq = minFreq
  1483. } else if freq > maxFreq {
  1484. freq = maxFreq
  1485. }
  1486. phase += freq + alpha*err
  1487. if phase > 2*math.Pi {
  1488. phase -= 2 * math.Pi
  1489. } else if phase < 0 {
  1490. phase += 2 * math.Pi
  1491. }
  1492. totalPower += float64(mono[i]) * float64(mono[i])
  1493. pilotPower += p * p
  1494. errSum += math.Abs(err)
  1495. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  1496. }
  1497. sess.pilotPhase = phase
  1498. sess.pilotFreq = freq
  1499. sess.pilotI = iState
  1500. sess.pilotQ = qState
  1501. blockErr := errSum / float64(n)
  1502. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  1503. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  1504. pilotRatio := 0.0
  1505. if totalPower > 0 {
  1506. pilotRatio = pilotPower / totalPower
  1507. }
  1508. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  1509. // Lock heuristics: pilot power fraction and PLL phase error stability.
  1510. // Pilot power is a small but stable fraction of composite energy; require
  1511. // a modest floor plus PLL settling to avoid flapping in noise.
  1512. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  1513. out := make([]float32, n*2)
  1514. for i := 0; i < n; i++ {
  1515. out[i*2] = 0.5 * (lpr[i] + lr[i])
  1516. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  1517. }
  1518. return out, locked
  1519. }
  1520. // dspStateSnapshot captures persistent DSP state for segment splits.
  1521. type dspStateSnapshot struct {
  1522. overlapIQ []complex64
  1523. deemphL float64
  1524. deemphR float64
  1525. pilotPhase float64
  1526. pilotFreq float64
  1527. pilotAlpha float64
  1528. pilotBeta float64
  1529. pilotErrAvg float64
  1530. pilotI float64
  1531. pilotQ float64
  1532. pilotLPAlpha float64
  1533. monoResampler *dsp.Resampler
  1534. monoResamplerRate int
  1535. stereoResampler *dsp.StereoResampler
  1536. stereoResamplerRate int
  1537. stereoLPF *dsp.StatefulFIRReal
  1538. stereoFilterRate int
  1539. stereoBPHi *dsp.StatefulFIRReal
  1540. stereoBPLo *dsp.StatefulFIRReal
  1541. stereoLRLPF *dsp.StatefulFIRReal
  1542. stereoAALPF *dsp.StatefulFIRReal
  1543. pilotLPFHi *dsp.StatefulFIRReal
  1544. pilotLPFLo *dsp.StatefulFIRReal
  1545. preDemodFIR *dsp.StatefulFIRComplex
  1546. preDemodDecimator *dsp.StatefulDecimatingFIRComplex
  1547. preDemodDecim int
  1548. preDemodRate int
  1549. preDemodCutoff float64
  1550. preDemodDecimPhase int
  1551. wfmAudioLPF *dsp.StatefulFIRReal
  1552. wfmAudioLPFRate int
  1553. }
  1554. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  1555. return dspStateSnapshot{
  1556. overlapIQ: sess.overlapIQ,
  1557. deemphL: sess.deemphL,
  1558. deemphR: sess.deemphR,
  1559. pilotPhase: sess.pilotPhase,
  1560. pilotFreq: sess.pilotFreq,
  1561. pilotAlpha: sess.pilotAlpha,
  1562. pilotBeta: sess.pilotBeta,
  1563. pilotErrAvg: sess.pilotErrAvg,
  1564. pilotI: sess.pilotI,
  1565. pilotQ: sess.pilotQ,
  1566. pilotLPAlpha: sess.pilotLPAlpha,
  1567. monoResampler: sess.monoResampler,
  1568. monoResamplerRate: sess.monoResamplerRate,
  1569. stereoResampler: sess.stereoResampler,
  1570. stereoResamplerRate: sess.stereoResamplerRate,
  1571. stereoLPF: sess.stereoLPF,
  1572. stereoFilterRate: sess.stereoFilterRate,
  1573. stereoBPHi: sess.stereoBPHi,
  1574. stereoBPLo: sess.stereoBPLo,
  1575. stereoLRLPF: sess.stereoLRLPF,
  1576. stereoAALPF: sess.stereoAALPF,
  1577. pilotLPFHi: sess.pilotLPFHi,
  1578. pilotLPFLo: sess.pilotLPFLo,
  1579. preDemodFIR: sess.preDemodFIR,
  1580. preDemodDecimator: sess.preDemodDecimator,
  1581. preDemodDecim: sess.preDemodDecim,
  1582. preDemodRate: sess.preDemodRate,
  1583. preDemodCutoff: sess.preDemodCutoff,
  1584. preDemodDecimPhase: sess.preDemodDecimPhase,
  1585. wfmAudioLPF: sess.wfmAudioLPF,
  1586. wfmAudioLPFRate: sess.wfmAudioLPFRate,
  1587. }
  1588. }
  1589. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  1590. sess.overlapIQ = s.overlapIQ
  1591. sess.deemphL = s.deemphL
  1592. sess.deemphR = s.deemphR
  1593. sess.pilotPhase = s.pilotPhase
  1594. sess.pilotFreq = s.pilotFreq
  1595. sess.pilotAlpha = s.pilotAlpha
  1596. sess.pilotBeta = s.pilotBeta
  1597. sess.pilotErrAvg = s.pilotErrAvg
  1598. sess.pilotI = s.pilotI
  1599. sess.pilotQ = s.pilotQ
  1600. sess.pilotLPAlpha = s.pilotLPAlpha
  1601. sess.monoResampler = s.monoResampler
  1602. sess.monoResamplerRate = s.monoResamplerRate
  1603. sess.stereoResampler = s.stereoResampler
  1604. sess.stereoResamplerRate = s.stereoResamplerRate
  1605. sess.stereoLPF = s.stereoLPF
  1606. sess.stereoFilterRate = s.stereoFilterRate
  1607. sess.stereoBPHi = s.stereoBPHi
  1608. sess.stereoBPLo = s.stereoBPLo
  1609. sess.stereoLRLPF = s.stereoLRLPF
  1610. sess.stereoAALPF = s.stereoAALPF
  1611. sess.pilotLPFHi = s.pilotLPFHi
  1612. sess.pilotLPFLo = s.pilotLPFLo
  1613. sess.preDemodFIR = s.preDemodFIR
  1614. sess.preDemodDecimator = s.preDemodDecimator
  1615. sess.preDemodDecim = s.preDemodDecim
  1616. sess.preDemodRate = s.preDemodRate
  1617. sess.preDemodCutoff = s.preDemodCutoff
  1618. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1619. sess.wfmAudioLPF = s.wfmAudioLPF
  1620. sess.wfmAudioLPFRate = s.wfmAudioLPFRate
  1621. }
  1622. // ---------------------------------------------------------------------------
  1623. // Session management helpers
  1624. // ---------------------------------------------------------------------------
  1625. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1626. outputDir := st.policy.OutputDir
  1627. if outputDir == "" {
  1628. outputDir = "data/recordings"
  1629. }
  1630. demodName, channels := resolveDemod(sig)
  1631. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1632. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1633. dir := filepath.Join(outputDir, dirName)
  1634. if err := os.MkdirAll(dir, 0o755); err != nil {
  1635. return nil, err
  1636. }
  1637. wavPath := filepath.Join(dir, "audio.wav")
  1638. f, err := os.Create(wavPath)
  1639. if err != nil {
  1640. return nil, err
  1641. }
  1642. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1643. f.Close()
  1644. return nil, err
  1645. }
  1646. playbackMode, stereoState := initialPlaybackState(demodName)
  1647. sess := &streamSession{
  1648. sessionID: fmt.Sprintf("%d-%d-r", sig.ID, now.UnixMilli()),
  1649. signalID: sig.ID,
  1650. centerHz: sig.CenterHz,
  1651. bwHz: sig.BWHz,
  1652. snrDb: sig.SNRDb,
  1653. peakDb: sig.PeakDb,
  1654. class: sig.Class,
  1655. startTime: now,
  1656. lastFeed: now,
  1657. dir: dir,
  1658. wavFile: f,
  1659. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1660. sampleRate: streamAudioRate,
  1661. channels: channels,
  1662. demodName: demodName,
  1663. playbackMode: playbackMode,
  1664. stereoState: stereoState,
  1665. deemphasisUs: st.policy.DeemphasisUs,
  1666. }
  1667. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1668. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1669. return sess, nil
  1670. }
  1671. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1672. demodName, channels := resolveDemod(sig)
  1673. for _, pl := range st.pendingListens {
  1674. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1675. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1676. demodName = requested
  1677. if demodName == "WFM_STEREO" {
  1678. channels = 2
  1679. } else if d := demod.Get(demodName); d != nil {
  1680. channels = d.Channels()
  1681. } else {
  1682. channels = 1
  1683. }
  1684. break
  1685. }
  1686. }
  1687. }
  1688. playbackMode, stereoState := initialPlaybackState(demodName)
  1689. sess := &streamSession{
  1690. sessionID: fmt.Sprintf("%d-%d-l", sig.ID, now.UnixMilli()),
  1691. signalID: sig.ID,
  1692. centerHz: sig.CenterHz,
  1693. bwHz: sig.BWHz,
  1694. snrDb: sig.SNRDb,
  1695. peakDb: sig.PeakDb,
  1696. class: sig.Class,
  1697. startTime: now,
  1698. lastFeed: now,
  1699. listenOnly: true,
  1700. sampleRate: streamAudioRate,
  1701. channels: channels,
  1702. demodName: demodName,
  1703. playbackMode: playbackMode,
  1704. stereoState: stereoState,
  1705. deemphasisUs: st.policy.DeemphasisUs,
  1706. }
  1707. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1708. sig.ID, sig.CenterHz/1e6, demodName)
  1709. return sess
  1710. }
  1711. func resolveDemod(sig *detector.Signal) (string, int) {
  1712. demodName := "NFM"
  1713. if sig.Class != nil {
  1714. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1715. demodName = n
  1716. }
  1717. }
  1718. channels := 1
  1719. if demodName == "WFM_STEREO" {
  1720. channels = 2
  1721. } else if d := demod.Get(demodName); d != nil {
  1722. channels = d.Channels()
  1723. }
  1724. return demodName, channels
  1725. }
  1726. func initialPlaybackState(demodName string) (string, string) {
  1727. playbackMode := demodName
  1728. stereoState := "mono"
  1729. if demodName == "WFM_STEREO" {
  1730. stereoState = "searching"
  1731. }
  1732. return playbackMode, stereoState
  1733. }
  1734. func (sess *streamSession) audioInfo() AudioInfo {
  1735. return AudioInfo{
  1736. SampleRate: sess.sampleRate,
  1737. Channels: sess.channels,
  1738. Format: "s16le",
  1739. DemodName: sess.demodName,
  1740. PlaybackMode: sess.playbackMode,
  1741. StereoState: sess.stereoState,
  1742. }
  1743. }
  1744. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1745. infoJSON, _ := json.Marshal(info)
  1746. tagged := make([]byte, 1+len(infoJSON))
  1747. tagged[0] = 0x00 // tag: audio_info
  1748. copy(tagged[1:], infoJSON)
  1749. for _, sub := range subs {
  1750. select {
  1751. case sub.ch <- tagged:
  1752. default:
  1753. }
  1754. }
  1755. }
  1756. func defaultAudioInfoForMode(mode string) AudioInfo {
  1757. demodName := "NFM"
  1758. if requested := normalizeRequestedMode(mode); requested != "" {
  1759. demodName = requested
  1760. }
  1761. channels := 1
  1762. if demodName == "WFM_STEREO" {
  1763. channels = 2
  1764. } else if d := demod.Get(demodName); d != nil {
  1765. channels = d.Channels()
  1766. }
  1767. playbackMode, stereoState := initialPlaybackState(demodName)
  1768. return AudioInfo{
  1769. SampleRate: streamAudioRate,
  1770. Channels: channels,
  1771. Format: "s16le",
  1772. DemodName: demodName,
  1773. PlaybackMode: playbackMode,
  1774. StereoState: stereoState,
  1775. }
  1776. }
  1777. func normalizeRequestedMode(mode string) string {
  1778. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1779. case "", "AUTO":
  1780. return ""
  1781. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1782. return strings.ToUpper(strings.TrimSpace(mode))
  1783. default:
  1784. return ""
  1785. }
  1786. }
  1787. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1788. func (sess *streamSession) growIQ(n int) []complex64 {
  1789. if cap(sess.scratchIQ) >= n {
  1790. return sess.scratchIQ[:n]
  1791. }
  1792. sess.scratchIQ = make([]complex64, n, n*5/4)
  1793. return sess.scratchIQ
  1794. }
  1795. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1796. func (sess *streamSession) growAudio(n int) []float32 {
  1797. if cap(sess.scratchAudio) >= n {
  1798. return sess.scratchAudio[:n]
  1799. }
  1800. sess.scratchAudio = make([]float32, n, n*5/4)
  1801. return sess.scratchAudio
  1802. }
  1803. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1804. func (sess *streamSession) growPCM(n int) []byte {
  1805. if cap(sess.scratchPCM) >= n {
  1806. return sess.scratchPCM[:n]
  1807. }
  1808. sess.scratchPCM = make([]byte, n, n*5/4)
  1809. return sess.scratchPCM
  1810. }
  1811. func convertToListenOnly(sess *streamSession) {
  1812. if sess.wavBuf != nil {
  1813. _ = sess.wavBuf.Flush()
  1814. }
  1815. if sess.wavFile != nil {
  1816. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1817. sess.wavFile.Close()
  1818. }
  1819. sess.wavFile = nil
  1820. sess.wavBuf = nil
  1821. sess.listenOnly = true
  1822. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1823. }
  1824. func closeSession(sess *streamSession, policy *Policy) {
  1825. if sess.listenOnly {
  1826. return
  1827. }
  1828. if sess.wavBuf != nil {
  1829. _ = sess.wavBuf.Flush()
  1830. }
  1831. if sess.wavFile != nil {
  1832. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1833. sess.wavFile.Close()
  1834. sess.wavFile = nil
  1835. sess.wavBuf = nil
  1836. }
  1837. dur := sess.lastFeed.Sub(sess.startTime)
  1838. files := map[string]any{
  1839. "audio": "audio.wav",
  1840. "audio_sample_rate": sess.sampleRate,
  1841. "audio_channels": sess.channels,
  1842. "audio_demod": sess.demodName,
  1843. "recording_mode": "streaming",
  1844. }
  1845. meta := Meta{
  1846. EventID: sess.signalID,
  1847. Start: sess.startTime,
  1848. End: sess.lastFeed,
  1849. CenterHz: sess.centerHz,
  1850. BandwidthHz: sess.bwHz,
  1851. SampleRate: sess.sampleRate,
  1852. SNRDb: sess.snrDb,
  1853. PeakDb: sess.peakDb,
  1854. Class: sess.class,
  1855. DurationMs: dur.Milliseconds(),
  1856. Files: files,
  1857. }
  1858. b, err := json.MarshalIndent(meta, "", " ")
  1859. if err == nil {
  1860. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1861. }
  1862. if policy != nil {
  1863. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1864. }
  1865. }
  1866. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1867. if len(sess.audioSubs) == 0 {
  1868. return
  1869. }
  1870. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1871. tagged := make([]byte, 1+pcmLen)
  1872. tagged[0] = 0x01
  1873. copy(tagged[1:], pcm[:pcmLen])
  1874. alive := sess.audioSubs[:0]
  1875. for _, sub := range sess.audioSubs {
  1876. select {
  1877. case sub.ch <- tagged:
  1878. alive = append(alive, sub)
  1879. default:
  1880. st.droppedPCM++
  1881. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1882. if st.telemetry != nil {
  1883. st.telemetry.IncCounter("streamer.pcm.drop", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1884. }
  1885. }
  1886. }
  1887. sess.audioSubs = alive
  1888. if st.telemetry != nil {
  1889. st.telemetry.SetGauge("streamer.subscribers.count", float64(len(alive)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1890. }
  1891. }
  1892. func (st *Streamer) summaryWorker() {
  1893. defer st.summaryWG.Done()
  1894. ticker := time.NewTicker(5 * time.Second)
  1895. defer ticker.Stop()
  1896. for {
  1897. select {
  1898. case <-ticker.C:
  1899. st.writePeriodicSummary()
  1900. case <-st.summaryStop:
  1901. return
  1902. }
  1903. }
  1904. }
  1905. func (st *Streamer) writePeriodicSummary() {
  1906. st.mu.Lock()
  1907. activeSubscribers := 0
  1908. for _, sess := range st.sessions {
  1909. activeSubscribers += len(sess.audioSubs)
  1910. }
  1911. processAvg := 0.0
  1912. if st.processLoopCount > 0 {
  1913. processAvg = st.processLoopSumMs / float64(st.processLoopCount)
  1914. }
  1915. summary := map[string]any{
  1916. "ts": time.Now().UTC().Format(time.RFC3339Nano),
  1917. "feed_drop_total": st.droppedFeed,
  1918. "pcm_drop_total": st.droppedPCM,
  1919. "active_subscribers": activeSubscribers,
  1920. "pending_listeners": len(st.pendingListens),
  1921. "active_sessions": len(st.sessions),
  1922. "produced_pcm_frames": st.producedPCMFrames,
  1923. "process_loop_ms_avg": processAvg,
  1924. "process_loop_ms_max": st.processLoopMaxMs,
  1925. "feed_queue_len": len(st.feedCh),
  1926. "feed_queue_cap": cap(st.feedCh),
  1927. "feed_queue_fill_ratio": safeRatio(float64(len(st.feedCh)), float64(cap(st.feedCh))),
  1928. "backpressure_hint": st.backpressureHintLocked(),
  1929. }
  1930. st.processLoopCount = 0
  1931. st.processLoopSumMs = 0
  1932. st.processLoopMaxMs = 0
  1933. st.mu.Unlock()
  1934. if st.debugSummary != nil {
  1935. _ = st.debugSummary.WriteServerSummary(summary)
  1936. }
  1937. }
  1938. func (st *Streamer) backpressureHintLocked() string {
  1939. queueLen := len(st.feedCh)
  1940. queueCap := cap(st.feedCh)
  1941. if queueCap > 0 && float64(queueLen)/float64(queueCap) >= 0.8 {
  1942. return "feed_queue_high"
  1943. }
  1944. if st.droppedFeed > 0 || st.droppedPCM > 0 {
  1945. return "drops_seen"
  1946. }
  1947. if len(st.pendingListens) > 0 {
  1948. return "pending_listeners"
  1949. }
  1950. return "ok"
  1951. }
  1952. func safeRatio(a float64, b float64) float64 {
  1953. if b <= 0 {
  1954. return 0
  1955. }
  1956. return a / b
  1957. }
  1958. func (st *Streamer) AppendBrowserAudioSummary(v any) error {
  1959. if st == nil || st.debugSummary == nil {
  1960. return nil
  1961. }
  1962. return st.debugSummary.WriteBrowserSummary(v)
  1963. }
  1964. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1965. if len(st.policy.ClassFilter) == 0 {
  1966. return true
  1967. }
  1968. if cls == nil {
  1969. return false
  1970. }
  1971. for _, f := range st.policy.ClassFilter {
  1972. if strings.EqualFold(f, string(cls.ModType)) {
  1973. return true
  1974. }
  1975. }
  1976. return false
  1977. }
  1978. // ErrNoSession is returned when no matching signal session exists.
  1979. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1980. // ---------------------------------------------------------------------------
  1981. // WAV header helpers
  1982. // ---------------------------------------------------------------------------
  1983. func writeWAVFile(path string, audio []float32, sampleRate int, channels int) error {
  1984. f, err := os.Create(path)
  1985. if err != nil {
  1986. return err
  1987. }
  1988. defer f.Close()
  1989. return writeWAVTo(f, audio, sampleRate, channels)
  1990. }
  1991. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1992. if channels <= 0 {
  1993. channels = 1
  1994. }
  1995. hdr := make([]byte, 44)
  1996. copy(hdr[0:4], "RIFF")
  1997. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1998. copy(hdr[8:12], "WAVE")
  1999. copy(hdr[12:16], "fmt ")
  2000. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  2001. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  2002. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  2003. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  2004. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  2005. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  2006. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  2007. copy(hdr[36:40], "data")
  2008. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  2009. _, err := f.Write(hdr)
  2010. return err
  2011. }
  2012. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  2013. dataSize := uint32(totalSamples * 2)
  2014. var buf [4]byte
  2015. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  2016. if _, err := f.Seek(4, 0); err != nil {
  2017. return
  2018. }
  2019. _, _ = f.Write(buf[:])
  2020. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  2021. if _, err := f.Seek(24, 0); err != nil {
  2022. return
  2023. }
  2024. _, _ = f.Write(buf[:])
  2025. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  2026. if _, err := f.Seek(28, 0); err != nil {
  2027. return
  2028. }
  2029. _, _ = f.Write(buf[:])
  2030. binary.LittleEndian.PutUint32(buf[:], dataSize)
  2031. if _, err := f.Seek(40, 0); err != nil {
  2032. return
  2033. }
  2034. _, _ = f.Write(buf[:])
  2035. }
  2036. // ResetStreams forces all active streaming sessions to discard their FIR states and decimation phases.
  2037. // This is used when the upstream DSP drops samples, creating a hard break in phase continuity.
  2038. func (st *Streamer) ResetStreams() {
  2039. st.mu.Lock()
  2040. defer st.mu.Unlock()
  2041. if st.telemetry != nil {
  2042. st.telemetry.IncCounter("streamer.reset.count", 1, nil)
  2043. st.telemetry.Event("stream_reset", "warn", "stream DSP state reset", nil, map[string]any{"sessions": len(st.sessions)})
  2044. }
  2045. for _, sess := range st.sessions {
  2046. sess.preDemodFIR = nil
  2047. sess.preDemodDecimator = nil
  2048. sess.preDemodDecimPhase = 0
  2049. sess.stereoResampler = nil
  2050. sess.monoResampler = nil
  2051. sess.wfmAudioLPF = nil
  2052. }
  2053. }