Wideband autonomous SDR analysis engine forked from sdr-visual-suite
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

2076 行
67KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "sort"
  15. "sync"
  16. "time"
  17. "sdr-wideband-suite/internal/classifier"
  18. "sdr-wideband-suite/internal/demod"
  19. "sdr-wideband-suite/internal/detector"
  20. "sdr-wideband-suite/internal/dsp"
  21. "sdr-wideband-suite/internal/logging"
  22. "sdr-wideband-suite/internal/telemetry"
  23. )
  24. // ---------------------------------------------------------------------------
  25. // streamSession — one open demod session for one signal
  26. // ---------------------------------------------------------------------------
  27. type streamSession struct {
  28. sessionID string
  29. signalID int64
  30. centerHz float64
  31. bwHz float64
  32. snrDb float64
  33. peakDb float64
  34. class *classifier.Classification
  35. startTime time.Time
  36. lastFeed time.Time
  37. playbackMode string
  38. stereoState string
  39. lastAudioTs time.Time
  40. debugDumpStart time.Time
  41. debugDumpUntil time.Time
  42. debugDumpBase string
  43. demodDump []float32
  44. finalDump []float32
  45. lastAudioL float32
  46. lastAudioR float32
  47. prevAudioL float64 // second-to-last L sample for boundary transient detection
  48. lastAudioSet bool
  49. lastDecIQ complex64
  50. prevDecIQ complex64
  51. lastDecIQSet bool
  52. lastExtractIQ complex64
  53. prevExtractIQ complex64
  54. lastExtractIQSet bool
  55. // FM discriminator cross-block bridging: carry the last IQ sample so the
  56. // discriminator can compute the phase step across block boundaries.
  57. lastDiscrimIQ complex64
  58. lastDiscrimIQSet bool
  59. lastDemodL float32
  60. prevDemodL float64
  61. lastDemodSet bool
  62. snippetSeq uint64
  63. // listenOnly sessions have no WAV file and no disk I/O.
  64. // They exist solely to feed audio to live-listen subscribers.
  65. listenOnly bool
  66. // Recording state (nil/zero for listen-only sessions)
  67. dir string
  68. wavFile *os.File
  69. wavBuf *bufio.Writer
  70. wavSamples int64
  71. segmentIdx int
  72. sampleRate int // actual output audio sample rate (always streamAudioRate)
  73. channels int
  74. demodName string
  75. // --- Persistent DSP state for click-free streaming ---
  76. // Overlap-save: tail of previous extracted IQ snippet.
  77. // Currently unused for live demod after removing the extra discriminator
  78. // overlap prepend, but kept in DSP snapshot state for compatibility.
  79. overlapIQ []complex64
  80. // De-emphasis IIR state (persists across frames)
  81. deemphL float64
  82. deemphR float64
  83. // Stereo lock state for live WFM streaming
  84. stereoEnabled bool
  85. stereoOnCount int
  86. stereoOffCount int
  87. // Pilot-locked stereo PLL state (19kHz pilot)
  88. pilotPhase float64
  89. pilotFreq float64
  90. pilotAlpha float64
  91. pilotBeta float64
  92. pilotErrAvg float64
  93. pilotI float64
  94. pilotQ float64
  95. pilotLPAlpha float64
  96. // Polyphase resampler (replaces integer-decimate hack)
  97. monoResampler *dsp.Resampler
  98. monoResamplerRate int
  99. stereoResampler *dsp.StereoResampler
  100. stereoResamplerRate int
  101. // AQ-4: Stateful FIR filters for click-free stereo decode
  102. stereoFilterRate int
  103. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  104. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  105. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  106. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  107. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  108. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  109. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  110. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  111. // and avoids per-frame FIR recomputation)
  112. preDemodFIR *dsp.StatefulFIRComplex
  113. preDemodDecimator *dsp.StatefulDecimatingFIRComplex
  114. preDemodDecim int // cached decimation factor
  115. preDemodRate int // cached snipRate this FIR was built for
  116. preDemodCutoff float64 // cached cutoff
  117. preDemodDecimPhase int // retained for backward compatibility in snapshots/debug
  118. // AQ-2: De-emphasis config (µs, 0 = disabled)
  119. deemphasisUs float64
  120. // Scratch buffers — reused across frames to avoid GC pressure.
  121. // Grown as needed, never shrunk.
  122. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  123. scratchAudio []float32 // for stereo decode intermediates
  124. scratchPCM []byte // for PCM encoding
  125. // live-listen subscribers
  126. audioSubs []audioSub
  127. }
  128. type audioSub struct {
  129. id int64
  130. ch chan []byte
  131. }
  132. type RuntimeSignalInfo struct {
  133. DemodName string
  134. PlaybackMode string
  135. StereoState string
  136. Channels int
  137. SampleRate int
  138. }
  139. // AudioInfo describes the audio format of a live-listen subscription.
  140. // Sent to the WebSocket client as the first message.
  141. type AudioInfo struct {
  142. SampleRate int `json:"sample_rate"`
  143. Channels int `json:"channels"`
  144. Format string `json:"format"` // always "s16le"
  145. DemodName string `json:"demod"`
  146. PlaybackMode string `json:"playback_mode,omitempty"`
  147. StereoState string `json:"stereo_state,omitempty"`
  148. }
  149. const (
  150. streamAudioRate = 48000
  151. resamplerTaps = 32 // taps per polyphase arm — good quality
  152. )
  153. var debugDumpDelay = func() time.Duration {
  154. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DELAY_SECONDS"))
  155. if raw == "" {
  156. return 5 * time.Second
  157. }
  158. v, err := strconv.Atoi(raw)
  159. if err != nil || v < 0 {
  160. return 5 * time.Second
  161. }
  162. return time.Duration(v) * time.Second
  163. }()
  164. var debugDumpDuration = func() time.Duration {
  165. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_DUMP_DURATION_SECONDS"))
  166. if raw == "" {
  167. return 15 * time.Second
  168. }
  169. v, err := strconv.Atoi(raw)
  170. if err != nil || v <= 0 {
  171. return 15 * time.Second
  172. }
  173. return time.Duration(v) * time.Second
  174. }()
  175. var audioDumpEnabled = func() bool {
  176. raw := strings.TrimSpace(os.Getenv("SDR_DEBUG_AUDIO_DUMP_ENABLED"))
  177. if raw == "" {
  178. return false
  179. }
  180. v, err := strconv.ParseBool(raw)
  181. if err != nil {
  182. return false
  183. }
  184. return v
  185. }()
  186. var decHeadTrimSamples = func() int {
  187. raw := strings.TrimSpace(os.Getenv("SDR_DEC_HEAD_TRIM"))
  188. if raw == "" {
  189. return 0
  190. }
  191. v, err := strconv.Atoi(raw)
  192. if err != nil || v < 0 {
  193. return 0
  194. }
  195. return v
  196. }()
  197. // ---------------------------------------------------------------------------
  198. // Streamer — manages all active streaming sessions
  199. // ---------------------------------------------------------------------------
  200. type streamFeedItem struct {
  201. signal detector.Signal
  202. snippet []complex64
  203. snipRate int
  204. }
  205. type streamFeedMsg struct {
  206. traceID uint64
  207. items []streamFeedItem
  208. enqueuedAt time.Time
  209. }
  210. type Streamer struct {
  211. mu sync.Mutex
  212. sessions map[int64]*streamSession
  213. policy Policy
  214. centerHz float64
  215. nextSub int64
  216. feedCh chan streamFeedMsg
  217. done chan struct{}
  218. droppedFeed uint64
  219. droppedPCM uint64
  220. lastFeedTS time.Time
  221. lastProcTS time.Time
  222. // pendingListens are subscribers waiting for a matching session.
  223. pendingListens map[int64]*pendingListen
  224. telemetry *telemetry.Collector
  225. }
  226. type pendingListen struct {
  227. freq float64
  228. bw float64
  229. mode string
  230. ch chan []byte
  231. }
  232. func newStreamer(policy Policy, centerHz float64, coll *telemetry.Collector) *Streamer {
  233. st := &Streamer{
  234. sessions: make(map[int64]*streamSession),
  235. policy: policy,
  236. centerHz: centerHz,
  237. feedCh: make(chan streamFeedMsg, 2),
  238. done: make(chan struct{}),
  239. pendingListens: make(map[int64]*pendingListen),
  240. telemetry: coll,
  241. }
  242. go st.worker()
  243. return st
  244. }
  245. func (st *Streamer) worker() {
  246. for msg := range st.feedCh {
  247. st.processFeed(msg)
  248. }
  249. close(st.done)
  250. }
  251. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  252. st.mu.Lock()
  253. defer st.mu.Unlock()
  254. wasEnabled := st.policy.Enabled
  255. st.policy = policy
  256. st.centerHz = centerHz
  257. // If recording was just disabled, close recording sessions
  258. // but keep listen-only sessions alive.
  259. if wasEnabled && !policy.Enabled {
  260. for id, sess := range st.sessions {
  261. if sess.listenOnly {
  262. continue
  263. }
  264. if len(sess.audioSubs) > 0 {
  265. // Convert to listen-only: close WAV but keep session
  266. convertToListenOnly(sess)
  267. } else {
  268. closeSession(sess, &st.policy)
  269. delete(st.sessions, id)
  270. }
  271. }
  272. }
  273. }
  274. // HasListeners returns true if any sessions have audio subscribers
  275. // or there are pending listen requests. Used by the DSP loop to
  276. // decide whether to feed snippets even when recording is disabled.
  277. func (st *Streamer) HasListeners() bool {
  278. st.mu.Lock()
  279. defer st.mu.Unlock()
  280. return st.hasListenersLocked()
  281. }
  282. func (st *Streamer) hasListenersLocked() bool {
  283. if len(st.pendingListens) > 0 {
  284. return true
  285. }
  286. for _, sess := range st.sessions {
  287. if len(sess.audioSubs) > 0 {
  288. return true
  289. }
  290. }
  291. return false
  292. }
  293. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  294. // Feeds are accepted if:
  295. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  296. // - Any live-listen subscribers exist (listen-only mode)
  297. //
  298. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  299. // data, so items can be passed directly without another copy.
  300. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  301. st.mu.Lock()
  302. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  303. hasListeners := st.hasListenersLocked()
  304. pending := len(st.pendingListens)
  305. debugLiveAudio := st.policy.DebugLiveAudio
  306. now := time.Now()
  307. if !st.lastFeedTS.IsZero() {
  308. gap := now.Sub(st.lastFeedTS)
  309. if gap > 150*time.Millisecond {
  310. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  311. }
  312. }
  313. st.lastFeedTS = now
  314. st.mu.Unlock()
  315. if debugLiveAudio {
  316. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  317. }
  318. if (!recEnabled && !hasListeners) || len(items) == 0 {
  319. return
  320. }
  321. if st.telemetry != nil {
  322. st.telemetry.SetGauge("streamer.feed.queue_len", float64(len(st.feedCh)), nil)
  323. st.telemetry.SetGauge("streamer.pending_listeners", float64(pending), nil)
  324. st.telemetry.Observe("streamer.feed.batch_size", float64(len(items)), nil)
  325. }
  326. select {
  327. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items, enqueuedAt: time.Now()}:
  328. default:
  329. st.droppedFeed++
  330. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  331. if st.telemetry != nil {
  332. st.telemetry.IncCounter("streamer.feed.drop", 1, nil)
  333. st.telemetry.Event("stream_feed_drop", "warn", "feed queue full", nil, map[string]any{
  334. "trace_id": traceID,
  335. "queue_len": len(st.feedCh),
  336. })
  337. }
  338. }
  339. }
  340. // processFeed runs in the worker goroutine.
  341. func (st *Streamer) processFeed(msg streamFeedMsg) {
  342. procStart := time.Now()
  343. lockStart := time.Now()
  344. st.mu.Lock()
  345. lockWait := time.Since(lockStart)
  346. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  347. hasListeners := st.hasListenersLocked()
  348. now := time.Now()
  349. if !st.lastProcTS.IsZero() {
  350. gap := now.Sub(st.lastProcTS)
  351. if gap > 150*time.Millisecond {
  352. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  353. if st.telemetry != nil {
  354. st.telemetry.IncCounter("streamer.process.gap.count", 1, nil)
  355. st.telemetry.Observe("streamer.process.gap_ms", float64(gap.Milliseconds()), nil)
  356. }
  357. }
  358. }
  359. st.lastProcTS = now
  360. defer st.mu.Unlock()
  361. defer func() {
  362. if st.telemetry != nil {
  363. st.telemetry.Observe("streamer.process.total_ms", float64(time.Since(procStart).Microseconds())/1000.0, nil)
  364. st.telemetry.Observe("streamer.lock_wait_ms", float64(lockWait.Microseconds())/1000.0, telemetry.TagsFromPairs("lock", "process"))
  365. }
  366. }()
  367. if st.telemetry != nil {
  368. st.telemetry.Observe("streamer.feed.enqueue_delay_ms", float64(now.Sub(msg.enqueuedAt).Microseconds())/1000.0, nil)
  369. st.telemetry.SetGauge("streamer.sessions.active", float64(len(st.sessions)), nil)
  370. }
  371. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  372. if !recEnabled && !hasListeners {
  373. return
  374. }
  375. seen := make(map[int64]bool, len(msg.items))
  376. for i := range msg.items {
  377. item := &msg.items[i]
  378. sig := &item.signal
  379. seen[sig.ID] = true
  380. if sig.ID == 0 || sig.Class == nil {
  381. continue
  382. }
  383. if len(item.snippet) == 0 || item.snipRate <= 0 {
  384. continue
  385. }
  386. // Decide whether this signal needs a session
  387. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  388. needsListen := st.signalHasListenerLocked(sig)
  389. className := "<nil>"
  390. demodName := ""
  391. if sig.Class != nil {
  392. className = string(sig.Class.ModType)
  393. demodName, _ = resolveDemod(sig)
  394. }
  395. if st.policy.DebugLiveAudio {
  396. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  397. }
  398. if !needsRecording && !needsListen {
  399. continue
  400. }
  401. sess, exists := st.sessions[sig.ID]
  402. requestedMode := ""
  403. for _, pl := range st.pendingListens {
  404. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  405. if m := normalizeRequestedMode(pl.mode); m != "" {
  406. requestedMode = m
  407. break
  408. }
  409. }
  410. }
  411. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  412. for _, sub := range sess.audioSubs {
  413. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  414. }
  415. delete(st.sessions, sig.ID)
  416. sess = nil
  417. exists = false
  418. }
  419. if !exists {
  420. if needsRecording {
  421. s, err := st.openRecordingSession(sig, now)
  422. if err != nil {
  423. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  424. sig.ID, sig.CenterHz/1e6, err)
  425. if st.telemetry != nil {
  426. st.telemetry.IncCounter("streamer.session.open_error", 1, telemetry.TagsFromPairs("kind", "recording"))
  427. }
  428. continue
  429. }
  430. st.sessions[sig.ID] = s
  431. sess = s
  432. } else {
  433. s := st.openListenSession(sig, now)
  434. st.sessions[sig.ID] = s
  435. sess = s
  436. }
  437. // Attach any pending listeners
  438. st.attachPendingListeners(sess)
  439. if st.telemetry != nil {
  440. st.telemetry.IncCounter("streamer.session.open", 1, telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", fmt.Sprintf("%d", sig.ID)))
  441. st.telemetry.Event("session_open", "info", "stream session opened", telemetry.TagsFromPairs("session_id", sess.sessionID, "signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{
  442. "listen_only": sess.listenOnly,
  443. "demod": sess.demodName,
  444. })
  445. }
  446. }
  447. // Update metadata
  448. sess.lastFeed = now
  449. sess.centerHz = sig.CenterHz
  450. sess.bwHz = sig.BWHz
  451. if sig.SNRDb > sess.snrDb {
  452. sess.snrDb = sig.SNRDb
  453. }
  454. if sig.PeakDb > sess.peakDb {
  455. sess.peakDb = sig.PeakDb
  456. }
  457. if sig.Class != nil {
  458. sess.class = sig.Class
  459. }
  460. // Demod with persistent state
  461. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  462. audioStart := time.Now()
  463. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate, st.telemetry)
  464. if st.telemetry != nil {
  465. st.telemetry.Observe("streamer.process_snippet_ms", float64(time.Since(audioStart).Microseconds())/1000.0, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  466. }
  467. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  468. if len(audio) == 0 {
  469. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  470. if st.telemetry != nil {
  471. st.telemetry.IncCounter("streamer.audio.empty", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  472. }
  473. }
  474. if len(audio) > 0 {
  475. if sess.wavSamples == 0 && audioRate > 0 {
  476. sess.sampleRate = audioRate
  477. }
  478. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  479. pcmLen := len(audio) * 2
  480. pcm := sess.growPCM(pcmLen)
  481. for k, s := range audio {
  482. v := int16(clip(s * 32767))
  483. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  484. }
  485. if !sess.listenOnly && sess.wavBuf != nil {
  486. n, err := sess.wavBuf.Write(pcm)
  487. if err != nil {
  488. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  489. } else {
  490. sess.wavSamples += int64(n / 2)
  491. }
  492. }
  493. // Gap logging for live-audio sessions + transient click detector
  494. if len(sess.audioSubs) > 0 {
  495. if !sess.lastAudioTs.IsZero() {
  496. gap := time.Since(sess.lastAudioTs)
  497. if gap > 150*time.Millisecond {
  498. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  499. if st.telemetry != nil {
  500. st.telemetry.IncCounter("streamer.audio.gap.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  501. st.telemetry.Observe("streamer.audio.gap_ms", float64(gap.Milliseconds()), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  502. }
  503. }
  504. }
  505. // Transient click detector: finds short impulses (1-3 samples)
  506. // that deviate sharply from the local signal trend.
  507. // A click looks like: ...smooth... SPIKE ...smooth...
  508. // Normal FM audio has large deltas too, but they follow
  509. // a continuous curve. A click has high |d2/dt2| (acceleration).
  510. //
  511. // Method: second-derivative detector. For each sample triplet
  512. // (a, b, c), compute |2b - a - c| which is the discrete
  513. // second derivative magnitude. High values = transient spike.
  514. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  515. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  516. stride := sess.channels
  517. if stride < 1 {
  518. stride = 1
  519. }
  520. nFrames := len(audio) / stride
  521. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  522. if sess.lastAudioSet && nFrames >= 1 {
  523. // second derivative across boundary: |2*last - prevLast - first|
  524. first := float64(audio[0])
  525. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  526. if d2 > 0.15 {
  527. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  528. if st.telemetry != nil {
  529. st.telemetry.IncCounter("audio.boundary_click.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  530. st.telemetry.Observe("audio.boundary_click.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  531. }
  532. }
  533. }
  534. // Intra-frame transient scan (L channel only for performance)
  535. nClicks := 0
  536. maxD2 := float64(0)
  537. maxD2Pos := 0
  538. for k := 1; k < nFrames-1; k++ {
  539. a := float64(audio[(k-1)*stride])
  540. b := float64(audio[k*stride])
  541. c := float64(audio[(k+1)*stride])
  542. d2 := math.Abs(2*b - a - c)
  543. if d2 > maxD2 {
  544. maxD2 = d2
  545. maxD2Pos = k
  546. }
  547. if d2 > 0.15 {
  548. nClicks++
  549. }
  550. }
  551. if nClicks > 0 {
  552. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  553. if st.telemetry != nil {
  554. st.telemetry.IncCounter("audio.intra_click.count", float64(nClicks), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  555. st.telemetry.Observe("audio.intra_click.max_d2", maxD2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  556. }
  557. }
  558. // Store last two samples for next frame's boundary check
  559. if nFrames >= 2 {
  560. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  561. sess.lastAudioL = audio[(nFrames-1)*stride]
  562. if stride > 1 {
  563. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  564. }
  565. } else if nFrames == 1 {
  566. sess.prevAudioL = float64(sess.lastAudioL)
  567. sess.lastAudioL = audio[0]
  568. if stride > 1 && len(audio) >= 2 {
  569. sess.lastAudioR = audio[1]
  570. }
  571. }
  572. sess.lastAudioSet = true
  573. }
  574. sess.lastAudioTs = time.Now()
  575. }
  576. st.fanoutPCM(sess, pcm, pcmLen)
  577. }
  578. // Segment split (recording sessions only)
  579. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  580. segIdx := sess.segmentIdx + 1
  581. oldSubs := sess.audioSubs
  582. oldState := sess.captureDSPState()
  583. sess.audioSubs = nil
  584. closeSession(sess, &st.policy)
  585. s, err := st.openRecordingSession(sig, now)
  586. if err != nil {
  587. delete(st.sessions, sig.ID)
  588. continue
  589. }
  590. s.segmentIdx = segIdx
  591. s.audioSubs = oldSubs
  592. s.restoreDSPState(oldState)
  593. st.sessions[sig.ID] = s
  594. if st.telemetry != nil {
  595. st.telemetry.IncCounter("streamer.session.reopen", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID)))
  596. st.telemetry.Event("session_reopen", "info", "stream session rotated by max duration", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sig.ID)), map[string]any{
  597. "old_session": sess.sessionID,
  598. "new_session": s.sessionID,
  599. })
  600. }
  601. }
  602. }
  603. // Close sessions for disappeared signals (with grace period)
  604. for id, sess := range st.sessions {
  605. if seen[id] {
  606. continue
  607. }
  608. gracePeriod := 3 * time.Second
  609. if sess.listenOnly {
  610. gracePeriod = 5 * time.Second
  611. }
  612. if now.Sub(sess.lastFeed) > gracePeriod {
  613. for _, sub := range sess.audioSubs {
  614. close(sub.ch)
  615. }
  616. sess.audioSubs = nil
  617. if !sess.listenOnly {
  618. closeSession(sess, &st.policy)
  619. }
  620. if st.telemetry != nil {
  621. st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID))
  622. st.telemetry.Event("session_close", "info", "stream session closed", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID), map[string]any{
  623. "reason": "signal_missing",
  624. "listen_only": sess.listenOnly,
  625. })
  626. }
  627. delete(st.sessions, id)
  628. }
  629. }
  630. }
  631. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  632. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  633. if st.policy.DebugLiveAudio {
  634. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  635. }
  636. return true
  637. }
  638. for subID, pl := range st.pendingListens {
  639. delta := math.Abs(sig.CenterHz - pl.freq)
  640. if delta < 200000 {
  641. if st.policy.DebugLiveAudio {
  642. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  643. }
  644. return true
  645. }
  646. }
  647. return false
  648. }
  649. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  650. for subID, pl := range st.pendingListens {
  651. requestedMode := normalizeRequestedMode(pl.mode)
  652. if requestedMode != "" && sess.demodName != requestedMode {
  653. continue
  654. }
  655. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  656. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  657. delete(st.pendingListens, subID)
  658. // Send updated audio_info now that we know the real session params.
  659. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  660. infoJSON, _ := json.Marshal(sess.audioInfo())
  661. tagged := make([]byte, 1+len(infoJSON))
  662. tagged[0] = 0x00 // tag: audio_info
  663. copy(tagged[1:], infoJSON)
  664. select {
  665. case pl.ch <- tagged:
  666. default:
  667. }
  668. if audioDumpEnabled {
  669. now := time.Now()
  670. sess.debugDumpStart = now.Add(debugDumpDelay)
  671. sess.debugDumpUntil = sess.debugDumpStart.Add(debugDumpDuration)
  672. sess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", sess.signalID, now.Format("20060102-150405")))
  673. sess.demodDump = nil
  674. sess.finalDump = nil
  675. }
  676. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  677. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  678. if audioDumpEnabled {
  679. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", sess.signalID, sess.debugDumpStart.Format(time.RFC3339), sess.debugDumpUntil.Format(time.RFC3339))
  680. }
  681. }
  682. }
  683. }
  684. // CloseAll finalises all sessions and stops the worker goroutine.
  685. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  686. st.mu.Lock()
  687. defer st.mu.Unlock()
  688. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  689. for _, sess := range st.sessions {
  690. out[sess.signalID] = RuntimeSignalInfo{
  691. DemodName: sess.demodName,
  692. PlaybackMode: sess.playbackMode,
  693. StereoState: sess.stereoState,
  694. Channels: sess.channels,
  695. SampleRate: sess.sampleRate,
  696. }
  697. }
  698. return out
  699. }
  700. func (st *Streamer) CloseAll() {
  701. close(st.feedCh)
  702. <-st.done
  703. st.mu.Lock()
  704. defer st.mu.Unlock()
  705. for id, sess := range st.sessions {
  706. for _, sub := range sess.audioSubs {
  707. close(sub.ch)
  708. }
  709. sess.audioSubs = nil
  710. if !sess.listenOnly {
  711. closeSession(sess, &st.policy)
  712. }
  713. if st.telemetry != nil {
  714. st.telemetry.IncCounter("streamer.session.close", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", id), "session_id", sess.sessionID))
  715. }
  716. delete(st.sessions, id)
  717. }
  718. for _, pl := range st.pendingListens {
  719. close(pl.ch)
  720. }
  721. st.pendingListens = nil
  722. if st.telemetry != nil {
  723. st.telemetry.Event("streamer_close_all", "info", "all stream sessions closed", nil, nil)
  724. }
  725. }
  726. // ActiveSessions returns the number of open streaming sessions.
  727. func (st *Streamer) ActiveSessions() int {
  728. st.mu.Lock()
  729. defer st.mu.Unlock()
  730. return len(st.sessions)
  731. }
  732. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  733. //
  734. // LL-2: Returns AudioInfo with correct channels and sample rate.
  735. // LL-3: Returns error only on hard failures (nil streamer etc).
  736. //
  737. // If a matching session exists, attaches immediately. Otherwise, the
  738. // subscriber is held as "pending" and will be attached when a matching
  739. // signal appears in the next DSP frame.
  740. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  741. ch := make(chan []byte, 64)
  742. st.mu.Lock()
  743. defer st.mu.Unlock()
  744. st.nextSub++
  745. subID := st.nextSub
  746. requestedMode := normalizeRequestedMode(mode)
  747. // Try to find a matching session
  748. var bestSess *streamSession
  749. bestDist := math.MaxFloat64
  750. for _, sess := range st.sessions {
  751. if requestedMode != "" && sess.demodName != requestedMode {
  752. continue
  753. }
  754. d := math.Abs(sess.centerHz - freq)
  755. if d < bestDist {
  756. bestDist = d
  757. bestSess = sess
  758. }
  759. }
  760. if bestSess != nil && bestDist < 200000 {
  761. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  762. if audioDumpEnabled {
  763. now := time.Now()
  764. bestSess.debugDumpStart = now.Add(debugDumpDelay)
  765. bestSess.debugDumpUntil = bestSess.debugDumpStart.Add(debugDumpDuration)
  766. bestSess.debugDumpBase = filepath.Join("debug", fmt.Sprintf("signal-%d-window-%s", bestSess.signalID, now.Format("20060102-150405")))
  767. bestSess.demodDump = nil
  768. bestSess.finalDump = nil
  769. }
  770. info := bestSess.audioInfo()
  771. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  772. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  773. if audioDumpEnabled {
  774. log.Printf("STREAM: debug dump armed signal=%d start=%s until=%s", bestSess.signalID, bestSess.debugDumpStart.Format(time.RFC3339), bestSess.debugDumpUntil.Format(time.RFC3339))
  775. }
  776. if st.telemetry != nil {
  777. st.telemetry.IncCounter("streamer.listener.attach", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", bestSess.signalID), "session_id", bestSess.sessionID))
  778. }
  779. return subID, ch, info, nil
  780. }
  781. // No matching session yet — add as pending listener
  782. st.pendingListens[subID] = &pendingListen{
  783. freq: freq,
  784. bw: bw,
  785. mode: mode,
  786. ch: ch,
  787. }
  788. info := defaultAudioInfoForMode(mode)
  789. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  790. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  791. if st.telemetry != nil {
  792. st.telemetry.IncCounter("streamer.listener.pending", 1, nil)
  793. st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil)
  794. }
  795. return subID, ch, info, nil
  796. }
  797. // UnsubscribeAudio removes a live-listen subscriber.
  798. func (st *Streamer) UnsubscribeAudio(subID int64) {
  799. st.mu.Lock()
  800. defer st.mu.Unlock()
  801. if pl, ok := st.pendingListens[subID]; ok {
  802. close(pl.ch)
  803. delete(st.pendingListens, subID)
  804. if st.telemetry != nil {
  805. st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "pending"))
  806. st.telemetry.SetGauge("streamer.pending_listeners", float64(len(st.pendingListens)), nil)
  807. }
  808. return
  809. }
  810. for _, sess := range st.sessions {
  811. for i, sub := range sess.audioSubs {
  812. if sub.id == subID {
  813. close(sub.ch)
  814. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  815. if st.telemetry != nil {
  816. st.telemetry.IncCounter("streamer.listener.unsubscribe", 1, telemetry.TagsFromPairs("kind", "active", "session_id", sess.sessionID))
  817. }
  818. return
  819. }
  820. }
  821. }
  822. }
  823. // ---------------------------------------------------------------------------
  824. // Session: stateful extraction + demod
  825. // ---------------------------------------------------------------------------
// NOTE: processSnippet (defined further below, after the IQ head-probe
// helpers) takes a pre-extracted IQ snippet and demodulates it with
// persistent state. It uses a stateful FIR + polyphase resampler for exact
// 48kHz output with zero transient artifacts.

  829. type iqHeadProbeStats struct {
  830. meanMag float64
  831. minMag float64
  832. maxStep float64
  833. p95Step float64
  834. lowMag int
  835. }
  836. func probeIQHeadStats(iq []complex64, probeLen int) iqHeadProbeStats {
  837. if probeLen <= 0 || len(iq) == 0 {
  838. return iqHeadProbeStats{}
  839. }
  840. if len(iq) < probeLen {
  841. probeLen = len(iq)
  842. }
  843. stats := iqHeadProbeStats{minMag: math.MaxFloat64}
  844. steps := make([]float64, 0, probeLen)
  845. var sum float64
  846. for i := 0; i < probeLen; i++ {
  847. v := iq[i]
  848. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  849. sum += mag
  850. if mag < stats.minMag {
  851. stats.minMag = mag
  852. }
  853. if mag < 0.02 {
  854. stats.lowMag++
  855. }
  856. if i > 0 {
  857. p := iq[i-1]
  858. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  859. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  860. step := math.Abs(math.Atan2(num, den))
  861. steps = append(steps, step)
  862. if step > stats.maxStep {
  863. stats.maxStep = step
  864. }
  865. }
  866. }
  867. stats.meanMag = sum / float64(probeLen)
  868. if len(steps) > 0 {
  869. sorted := append([]float64(nil), steps...)
  870. sort.Float64s(sorted)
  871. idx := int(math.Round(0.95 * float64(len(sorted)-1)))
  872. if idx < 0 {
  873. idx = 0
  874. }
  875. if idx >= len(sorted) {
  876. idx = len(sorted) - 1
  877. }
  878. stats.p95Step = sorted[idx]
  879. }
  880. if stats.minMag == math.MaxFloat64 {
  881. stats.minMag = 0
  882. }
  883. return stats
  884. }
  885. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int, coll *telemetry.Collector) ([]float32, int) {
  886. if len(snippet) == 0 || snipRate <= 0 {
  887. return nil, 0
  888. }
  889. baseTags := telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID)
  890. if coll != nil {
  891. coll.SetGauge("iq.stage.snippet.length", float64(len(snippet)), baseTags)
  892. stats := probeIQHeadStats(snippet, 64)
  893. coll.Observe("iq.snippet.head_mean_mag", stats.meanMag, baseTags)
  894. coll.Observe("iq.snippet.head_min_mag", stats.minMag, baseTags)
  895. coll.Observe("iq.snippet.head_max_step", stats.maxStep, baseTags)
  896. coll.Observe("iq.snippet.head_p95_step", stats.p95Step, baseTags)
  897. coll.SetGauge("iq.snippet.head_low_magnitude_count", float64(stats.lowMag), baseTags)
  898. if sess.lastExtractIQSet {
  899. prevMag := math.Hypot(float64(real(sess.lastExtractIQ)), float64(imag(sess.lastExtractIQ)))
  900. currMag := math.Hypot(float64(real(snippet[0])), float64(imag(snippet[0])))
  901. deltaMag := math.Abs(currMag - prevMag)
  902. num := float64(real(sess.lastExtractIQ))*float64(imag(snippet[0])) - float64(imag(sess.lastExtractIQ))*float64(real(snippet[0]))
  903. den := float64(real(sess.lastExtractIQ))*float64(real(snippet[0])) + float64(imag(sess.lastExtractIQ))*float64(imag(snippet[0]))
  904. deltaPhase := math.Abs(math.Atan2(num, den))
  905. d2 := float64(real(snippet[0]-sess.lastExtractIQ))*float64(real(snippet[0]-sess.lastExtractIQ)) + float64(imag(snippet[0]-sess.lastExtractIQ))*float64(imag(snippet[0]-sess.lastExtractIQ))
  906. coll.Observe("iq.extract.output.boundary.delta_mag", deltaMag, baseTags)
  907. coll.Observe("iq.extract.output.boundary.delta_phase", deltaPhase, baseTags)
  908. coll.Observe("iq.extract.output.boundary.d2", d2, baseTags)
  909. coll.Observe("iq.extract.output.boundary.discontinuity_score", deltaMag+deltaPhase, baseTags)
  910. }
  911. }
  912. if len(snippet) > 0 {
  913. sess.prevExtractIQ = sess.lastExtractIQ
  914. sess.lastExtractIQ = snippet[len(snippet)-1]
  915. sess.lastExtractIQSet = true
  916. }
  917. isWFMStereo := sess.demodName == "WFM_STEREO"
  918. isWFM := sess.demodName == "WFM" || isWFMStereo
  919. demodName := sess.demodName
  920. if isWFMStereo {
  921. demodName = "WFM"
  922. }
  923. d := demod.Get(demodName)
  924. if d == nil {
  925. d = demod.Get("NFM")
  926. }
  927. if d == nil {
  928. return nil, 0
  929. }
  930. // The extra 1-sample discriminator overlap prepend was removed after it was
  931. // shown to shift the downstream decimation phase and create heavy click
  932. // artifacts in steady-state streaming/recording. The upstream extraction path
  933. // and the stateful FIR/decimation stages already provide continuity.
  934. fullSnip := snippet
  935. overlapApplied := false
  936. prevTailValid := false
  937. if logging.EnabledCategory("prefir") && len(fullSnip) > 0 {
  938. probeN := 64
  939. if len(fullSnip) < probeN {
  940. probeN = len(fullSnip)
  941. }
  942. minPreMag := math.MaxFloat64
  943. minPreIdx := 0
  944. maxPreStep := 0.0
  945. maxPreStepIdx := 0
  946. for i := 0; i < probeN; i++ {
  947. v := fullSnip[i]
  948. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  949. if mag < minPreMag {
  950. minPreMag = mag
  951. minPreIdx = i
  952. }
  953. if i > 0 {
  954. p := fullSnip[i-1]
  955. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  956. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  957. step := math.Abs(math.Atan2(num, den))
  958. if step > maxPreStep {
  959. maxPreStep = step
  960. maxPreStepIdx = i - 1
  961. }
  962. }
  963. }
  964. logging.Debug("prefir", "pre_fir_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "snip_len", len(fullSnip))
  965. if minPreMag < 0.18 {
  966. logging.Warn("prefir", "pre_fir_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minPreMag, "min_idx", minPreIdx, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx)
  967. }
  968. if maxPreStep > 1.5 {
  969. logging.Warn("prefir", "pre_fir_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxPreStep, "max_step_idx", maxPreStepIdx, "min_mag", minPreMag, "min_idx", minPreIdx)
  970. }
  971. }
  972. // --- Stateful anti-alias FIR + decimation to demod rate ---
  973. demodRate := d.OutputSampleRate()
  974. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  975. if decim1 < 1 {
  976. decim1 = 1
  977. }
  978. // WFM override: force decim1=2 (256kHz) instead of round(512k/192k)=3 (170kHz).
  979. // At decim1=3, Nyquist is 85kHz which clips FM broadcast ±75kHz deviation.
  980. // At decim1=2, Nyquist is 128kHz → full FM deviation + stereo pilot + guard band.
  981. // Bonus: 256000→48000 resampler ratio is L=3/M=16 (96 taps, 1kB) instead of
  982. // the pathological L=24000/M=85333 (768k taps, 6MB) from 170666→48000.
  983. if isWFM && decim1 > 2 && snipRate/2 >= 200000 {
  984. decim1 = 2
  985. }
  986. actualDemodRate := snipRate / decim1
  987. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  988. var dec []complex64
  989. if decim1 > 1 {
  990. // FIR cutoff: for WFM, use 90kHz (above ±75kHz FM deviation + guard).
  991. // For NFM/other: use standard Nyquist*0.8 cutoff.
  992. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  993. if isWFM {
  994. cutoff = 90000
  995. }
  996. // Lazy-init or reinit stateful FIR if parameters changed
  997. if sess.preDemodDecimator == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff || sess.preDemodDecim != decim1 {
  998. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  999. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  1000. sess.preDemodDecimator = dsp.NewStatefulDecimatingFIRComplex(taps, decim1)
  1001. sess.preDemodRate = snipRate
  1002. sess.preDemodCutoff = cutoff
  1003. sess.preDemodDecim = decim1
  1004. sess.preDemodDecimPhase = 0
  1005. if coll != nil {
  1006. coll.IncCounter("dsp.pre_demod.init", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1007. coll.Event("prefir_reinit", "info", "pre-demod decimator reinitialized", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1008. "snip_rate": snipRate,
  1009. "cutoff_hz": cutoff,
  1010. "decim": decim1,
  1011. })
  1012. }
  1013. }
  1014. decimPhaseBefore := sess.preDemodDecimPhase
  1015. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  1016. dec = sess.preDemodDecimator.Process(fullSnip)
  1017. sess.preDemodDecimPhase = sess.preDemodDecimator.Phase()
  1018. if coll != nil {
  1019. coll.Observe("dsp.pre_demod.decimation_factor", float64(decim1), baseTags)
  1020. coll.SetGauge("iq.stage.pre_demod.length", float64(len(dec)), baseTags)
  1021. decStats := probeIQHeadStats(dec, 64)
  1022. coll.Observe("iq.pre_demod.head_mean_mag", decStats.meanMag, baseTags)
  1023. coll.Observe("iq.pre_demod.head_min_mag", decStats.minMag, baseTags)
  1024. coll.Observe("iq.pre_demod.head_max_step", decStats.maxStep, baseTags)
  1025. coll.Observe("iq.pre_demod.head_p95_step", decStats.p95Step, baseTags)
  1026. coll.SetGauge("iq.pre_demod.head_low_magnitude_count", float64(decStats.lowMag), baseTags)
  1027. }
  1028. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase)
  1029. } else {
  1030. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0)
  1031. dec = fullSnip
  1032. }
  1033. if decHeadTrimSamples > 0 && decHeadTrimSamples < len(dec) {
  1034. logging.Warn("boundary", "dec_head_trim_applied", "signal", sess.signalID, "trim", decHeadTrimSamples, "before_len", len(dec))
  1035. dec = dec[decHeadTrimSamples:]
  1036. if coll != nil {
  1037. coll.IncCounter("dsp.pre_demod.head_trim", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1038. }
  1039. }
  1040. if logging.EnabledCategory("boundary") && len(dec) > 0 {
  1041. first := dec[0]
  1042. if sess.lastDecIQSet {
  1043. d2Re := math.Abs(2*float64(real(sess.lastDecIQ)) - float64(real(sess.prevDecIQ)) - float64(real(first)))
  1044. d2Im := math.Abs(2*float64(imag(sess.lastDecIQ)) - float64(imag(sess.prevDecIQ)) - float64(imag(first)))
  1045. d2Mag := math.Hypot(d2Re, d2Im)
  1046. if d2Mag > 0.15 {
  1047. logging.Warn("boundary", "dec_iq_boundary", "signal", sess.signalID, "d2", d2Mag)
  1048. if coll != nil {
  1049. coll.IncCounter("iq.dec.boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1050. coll.Observe("iq.dec.boundary.d2", d2Mag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1051. }
  1052. }
  1053. }
  1054. headN := 16
  1055. if len(dec) < headN {
  1056. headN = len(dec)
  1057. }
  1058. tailN := 16
  1059. if len(dec) < tailN {
  1060. tailN = len(dec)
  1061. }
  1062. var headSum, tailSum, minMag, maxMag float64
  1063. minMag = math.MaxFloat64
  1064. for i, v := range dec {
  1065. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  1066. if mag < minMag {
  1067. minMag = mag
  1068. }
  1069. if mag > maxMag {
  1070. maxMag = mag
  1071. }
  1072. if i < headN {
  1073. headSum += mag
  1074. }
  1075. }
  1076. for i := len(dec) - tailN; i < len(dec); i++ {
  1077. if i >= 0 {
  1078. v := dec[i]
  1079. tailSum += math.Hypot(float64(real(v)), float64(imag(v)))
  1080. }
  1081. }
  1082. headAvg := 0.0
  1083. if headN > 0 {
  1084. headAvg = headSum / float64(headN)
  1085. }
  1086. tailAvg := 0.0
  1087. if tailN > 0 {
  1088. tailAvg = tailSum / float64(tailN)
  1089. }
  1090. logging.Debug("boundary", "dec_iq_meter", "signal", sess.signalID, "len", len(dec), "head_avg", headAvg, "tail_avg", tailAvg, "min_mag", minMag, "max_mag", maxMag)
  1091. if tailAvg > 0 {
  1092. ratio := headAvg / tailAvg
  1093. if ratio < 0.75 || ratio > 1.25 {
  1094. logging.Warn("boundary", "dec_iq_head_tail_skew", "signal", sess.signalID, "head_avg", headAvg, "tail_avg", tailAvg, "ratio", ratio)
  1095. }
  1096. if coll != nil {
  1097. coll.Observe("iq.dec.head_tail_ratio", ratio, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1098. }
  1099. }
  1100. probeN := 64
  1101. if len(dec) < probeN {
  1102. probeN = len(dec)
  1103. }
  1104. minHeadMag := math.MaxFloat64
  1105. minHeadIdx := 0
  1106. maxHeadStep := 0.0
  1107. maxHeadStepIdx := 0
  1108. for i := 0; i < probeN; i++ {
  1109. v := dec[i]
  1110. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  1111. if mag < minHeadMag {
  1112. minHeadMag = mag
  1113. minHeadIdx = i
  1114. }
  1115. if i > 0 {
  1116. p := dec[i-1]
  1117. num := float64(real(p))*float64(imag(v)) - float64(imag(p))*float64(real(v))
  1118. den := float64(real(p))*float64(real(v)) + float64(imag(p))*float64(imag(v))
  1119. step := math.Abs(math.Atan2(num, den))
  1120. if step > maxHeadStep {
  1121. maxHeadStep = step
  1122. maxHeadStepIdx = i - 1
  1123. }
  1124. }
  1125. }
  1126. logging.Debug("boundary", "dec_iq_head_probe", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  1127. if minHeadMag < 0.18 {
  1128. logging.Warn("boundary", "dec_iq_head_dip", "signal", sess.signalID, "probe_len", probeN, "min_mag", minHeadMag, "min_idx", minHeadIdx, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx)
  1129. }
  1130. if maxHeadStep > 1.5 {
  1131. logging.Warn("boundary", "dec_iq_head_step", "signal", sess.signalID, "probe_len", probeN, "max_step", maxHeadStep, "max_step_idx", maxHeadStepIdx, "min_mag", minHeadMag, "min_idx", minHeadIdx)
  1132. }
  1133. if coll != nil {
  1134. coll.Observe("iq.dec.magnitude.min", minMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1135. coll.Observe("iq.dec.magnitude.max", maxMag, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1136. coll.Observe("iq.dec.phase_step.max", maxHeadStep, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1137. }
  1138. if len(dec) >= 2 {
  1139. sess.prevDecIQ = dec[len(dec)-2]
  1140. sess.lastDecIQ = dec[len(dec)-1]
  1141. } else {
  1142. sess.prevDecIQ = sess.lastDecIQ
  1143. sess.lastDecIQ = dec[0]
  1144. }
  1145. sess.lastDecIQSet = true
  1146. }
  1147. // --- FM/AM/etc Demod ---
  1148. // For FM demod (NFM/WFM): bridge the block boundary by prepending the
  1149. // previous block's last IQ sample. Without this, the discriminator loses
  1150. // the cross-boundary phase step (1 audio sample missing per block) and
  1151. // any phase discontinuity at the seam becomes an unsmoothed audio transient.
  1152. var audio []float32
  1153. isFMDemod := demodName == "NFM" || demodName == "WFM"
  1154. if isFMDemod && sess.lastDiscrimIQSet && len(dec) > 0 {
  1155. bridged := make([]complex64, len(dec)+1)
  1156. bridged[0] = sess.lastDiscrimIQ
  1157. copy(bridged[1:], dec)
  1158. audio = d.Demod(bridged, actualDemodRate)
  1159. // bridged produced len(dec) audio samples (= len(bridged)-1)
  1160. // which is exactly the correct count for the new data
  1161. } else {
  1162. audio = d.Demod(dec, actualDemodRate)
  1163. }
  1164. if len(dec) > 0 {
  1165. sess.lastDiscrimIQ = dec[len(dec)-1]
  1166. sess.lastDiscrimIQSet = true
  1167. }
  1168. if len(audio) == 0 {
  1169. return nil, 0
  1170. }
  1171. if coll != nil {
  1172. coll.SetGauge("audio.stage.demod.length", float64(len(audio)), baseTags)
  1173. probe := 64
  1174. if len(audio) < probe {
  1175. probe = len(audio)
  1176. }
  1177. if probe > 0 {
  1178. var headAbs, tailAbs float64
  1179. for i := 0; i < probe; i++ {
  1180. headAbs += math.Abs(float64(audio[i]))
  1181. tailAbs += math.Abs(float64(audio[len(audio)-probe+i]))
  1182. }
  1183. coll.Observe("audio.demod.head_mean_abs", headAbs/float64(probe), baseTags)
  1184. coll.Observe("audio.demod.tail_mean_abs", tailAbs/float64(probe), baseTags)
  1185. coll.Observe("audio.demod.edge_delta_abs", math.Abs(float64(audio[0])-float64(audio[len(audio)-1])), baseTags)
  1186. }
  1187. }
  1188. if logging.EnabledCategory("boundary") {
  1189. stride := d.Channels()
  1190. if stride < 1 {
  1191. stride = 1
  1192. }
  1193. nFrames := len(audio) / stride
  1194. if nFrames > 0 {
  1195. first := float64(audio[0])
  1196. if sess.lastDemodSet {
  1197. d2 := math.Abs(2*float64(sess.lastDemodL) - sess.prevDemodL - first)
  1198. if d2 > 0.15 {
  1199. logging.Warn("boundary", "demod_boundary", "signal", sess.signalID, "d2", d2)
  1200. if coll != nil {
  1201. coll.IncCounter("audio.demod_boundary.count", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1202. coll.Observe("audio.demod_boundary.d2", d2, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID)))
  1203. }
  1204. }
  1205. }
  1206. if nFrames >= 2 {
  1207. sess.prevDemodL = float64(audio[(nFrames-2)*stride])
  1208. sess.lastDemodL = audio[(nFrames-1)*stride]
  1209. } else {
  1210. sess.prevDemodL = float64(sess.lastDemodL)
  1211. sess.lastDemodL = audio[0]
  1212. }
  1213. sess.lastDemodSet = true
  1214. }
  1215. }
  1216. logging.Debug("boundary", "audio_path", "signal", sess.signalID, "demod", demodName, "actual_rate", actualDemodRate, "audio_len", len(audio), "channels", d.Channels(), "overlap_applied", overlapApplied, "prev_tail_valid", prevTailValid)
  1217. shouldDump := !sess.debugDumpStart.IsZero() && !sess.debugDumpUntil.IsZero()
  1218. if shouldDump {
  1219. now := time.Now()
  1220. shouldDump = !now.Before(sess.debugDumpStart) && now.Before(sess.debugDumpUntil)
  1221. }
  1222. if shouldDump {
  1223. sess.demodDump = append(sess.demodDump, audio...)
  1224. }
  1225. // --- Stateful stereo decode with conservative lock/hysteresis ---
  1226. channels := 1
  1227. if isWFMStereo {
  1228. sess.playbackMode = "WFM_STEREO"
  1229. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  1230. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  1231. if locked {
  1232. sess.stereoOnCount++
  1233. sess.stereoOffCount = 0
  1234. if sess.stereoOnCount >= 4 {
  1235. sess.stereoEnabled = true
  1236. }
  1237. } else {
  1238. sess.stereoOnCount = 0
  1239. sess.stereoOffCount++
  1240. if sess.stereoOffCount >= 10 {
  1241. sess.stereoEnabled = false
  1242. }
  1243. }
  1244. prevPlayback := sess.playbackMode
  1245. prevStereo := sess.stereoState
  1246. if sess.stereoEnabled && len(stereoAudio) > 0 {
  1247. sess.stereoState = "locked"
  1248. audio = stereoAudio
  1249. } else {
  1250. sess.stereoState = "mono-fallback"
  1251. dual := make([]float32, len(audio)*2)
  1252. for i, s := range audio {
  1253. dual[i*2] = s
  1254. dual[i*2+1] = s
  1255. }
  1256. audio = dual
  1257. }
  1258. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  1259. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  1260. }
  1261. }
  1262. // --- Polyphase resample to exact 48kHz ---
  1263. if actualDemodRate != streamAudioRate {
  1264. if channels > 1 {
  1265. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  1266. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  1267. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1268. sess.stereoResamplerRate = actualDemodRate
  1269. if coll != nil {
  1270. coll.Event("resampler_reset", "info", "stereo resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1271. "mode": "stereo",
  1272. "rate": actualDemodRate,
  1273. })
  1274. }
  1275. }
  1276. audio = sess.stereoResampler.Process(audio)
  1277. } else {
  1278. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  1279. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  1280. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  1281. sess.monoResamplerRate = actualDemodRate
  1282. if coll != nil {
  1283. coll.Event("resampler_reset", "info", "mono resampler reset", telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID), map[string]any{
  1284. "mode": "mono",
  1285. "rate": actualDemodRate,
  1286. })
  1287. }
  1288. }
  1289. audio = sess.monoResampler.Process(audio)
  1290. }
  1291. }
  1292. if coll != nil {
  1293. coll.SetGauge("audio.stage.output.length", float64(len(audio)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1294. }
  1295. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  1296. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  1297. tau := sess.deemphasisUs * 1e-6
  1298. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  1299. if channels > 1 {
  1300. nFrames := len(audio) / channels
  1301. yL, yR := sess.deemphL, sess.deemphR
  1302. for i := 0; i < nFrames; i++ {
  1303. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  1304. audio[i*2] = float32(yL)
  1305. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  1306. audio[i*2+1] = float32(yR)
  1307. }
  1308. sess.deemphL, sess.deemphR = yL, yR
  1309. } else {
  1310. y := sess.deemphL
  1311. for i := range audio {
  1312. y = alpha*y + (1-alpha)*float64(audio[i])
  1313. audio[i] = float32(y)
  1314. }
  1315. sess.deemphL = y
  1316. }
  1317. }
  1318. if isWFM {
  1319. for i := range audio {
  1320. audio[i] *= 0.35
  1321. }
  1322. }
  1323. if shouldDump {
  1324. sess.finalDump = append(sess.finalDump, audio...)
  1325. } else if !sess.debugDumpUntil.IsZero() && time.Now().After(sess.debugDumpUntil) && sess.debugDumpBase != "" {
  1326. _ = os.MkdirAll(filepath.Dir(sess.debugDumpBase), 0o755)
  1327. if len(sess.demodDump) > 0 {
  1328. _ = writeWAVFile(sess.debugDumpBase+"-demod.wav", sess.demodDump, actualDemodRate, d.Channels())
  1329. }
  1330. if len(sess.finalDump) > 0 {
  1331. _ = writeWAVFile(sess.debugDumpBase+"-final.wav", sess.finalDump, streamAudioRate, channels)
  1332. }
  1333. logging.Warn("boundary", "debug_audio_dump_window", "signal", sess.signalID, "base", sess.debugDumpBase)
  1334. sess.debugDumpBase = ""
  1335. sess.demodDump = nil
  1336. sess.finalDump = nil
  1337. sess.debugDumpStart = time.Time{}
  1338. sess.debugDumpUntil = time.Time{}
  1339. }
  1340. return audio, streamAudioRate
  1341. }
  1342. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  1343. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  1344. // loopBW is in Hz, sampleRate in samples/sec.
  1345. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  1346. if sampleRate <= 0 || loopBW <= 0 {
  1347. return 0, 0
  1348. }
  1349. bl := loopBW / float64(sampleRate)
  1350. theta := bl / (damping + 0.25/damping)
  1351. d := 1 + 2*damping*theta + theta*theta
  1352. alpha := (4 * damping * theta) / d
  1353. beta := (4 * theta * theta) / d
  1354. return alpha, beta
  1355. }
  1356. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  1357. // Uses persistent FIR filter state across frames for click-free stereo.
  1358. // Reuses session scratch buffers to minimize allocations.
  1359. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  1360. if len(mono) == 0 || sampleRate <= 0 {
  1361. return nil, false
  1362. }
  1363. n := len(mono)
  1364. // Rebuild rate-dependent stereo filters when sampleRate changes
  1365. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  1366. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  1367. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  1368. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  1369. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  1370. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  1371. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  1372. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  1373. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  1374. sess.stereoFilterRate = sampleRate
  1375. // Initialize PLL for 19kHz pilot tracking.
  1376. sess.pilotPhase = 0
  1377. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  1378. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  1379. sess.pilotErrAvg = 0
  1380. sess.pilotI = 0
  1381. sess.pilotQ = 0
  1382. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  1383. }
  1384. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  1385. scratch := sess.growAudio(n * 5)
  1386. lpr := scratch[:n]
  1387. bpfLR := scratch[n : 2*n]
  1388. lr := scratch[2*n : 3*n]
  1389. work1 := scratch[3*n : 4*n]
  1390. work2 := scratch[4*n : 5*n]
  1391. sess.stereoLPF.ProcessInto(mono, lpr)
  1392. // 23-53kHz bandpass for L-R DSB-SC.
  1393. sess.stereoBPHi.ProcessInto(mono, work1)
  1394. sess.stereoBPLo.ProcessInto(mono, work2)
  1395. for i := 0; i < n; i++ {
  1396. bpfLR[i] = work1[i] - work2[i]
  1397. }
  1398. // 19kHz pilot bandpass for PLL.
  1399. sess.pilotLPFHi.ProcessInto(mono, work1)
  1400. sess.pilotLPFLo.ProcessInto(mono, work2)
  1401. for i := 0; i < n; i++ {
  1402. work1[i] = work1[i] - work2[i]
  1403. }
  1404. pilot := work1
  1405. phase := sess.pilotPhase
  1406. freq := sess.pilotFreq
  1407. alpha := sess.pilotAlpha
  1408. beta := sess.pilotBeta
  1409. iState := sess.pilotI
  1410. qState := sess.pilotQ
  1411. lpAlpha := sess.pilotLPAlpha
  1412. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  1413. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  1414. var pilotPower float64
  1415. var totalPower float64
  1416. var errSum float64
  1417. for i := 0; i < n; i++ {
  1418. p := float64(pilot[i])
  1419. sinP, cosP := math.Sincos(phase)
  1420. iMix := p * cosP
  1421. qMix := p * -sinP
  1422. iState += lpAlpha * (iMix - iState)
  1423. qState += lpAlpha * (qMix - qState)
  1424. err := math.Atan2(qState, iState)
  1425. freq += beta * err
  1426. if freq < minFreq {
  1427. freq = minFreq
  1428. } else if freq > maxFreq {
  1429. freq = maxFreq
  1430. }
  1431. phase += freq + alpha*err
  1432. if phase > 2*math.Pi {
  1433. phase -= 2 * math.Pi
  1434. } else if phase < 0 {
  1435. phase += 2 * math.Pi
  1436. }
  1437. totalPower += float64(mono[i]) * float64(mono[i])
  1438. pilotPower += p * p
  1439. errSum += math.Abs(err)
  1440. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  1441. }
  1442. sess.pilotPhase = phase
  1443. sess.pilotFreq = freq
  1444. sess.pilotI = iState
  1445. sess.pilotQ = qState
  1446. blockErr := errSum / float64(n)
  1447. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  1448. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  1449. pilotRatio := 0.0
  1450. if totalPower > 0 {
  1451. pilotRatio = pilotPower / totalPower
  1452. }
  1453. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  1454. // Lock heuristics: pilot power fraction and PLL phase error stability.
  1455. // Pilot power is a small but stable fraction of composite energy; require
  1456. // a modest floor plus PLL settling to avoid flapping in noise.
  1457. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  1458. out := make([]float32, n*2)
  1459. for i := 0; i < n; i++ {
  1460. out[i*2] = 0.5 * (lpr[i] + lr[i])
  1461. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  1462. }
  1463. return out, locked
  1464. }
  1465. // dspStateSnapshot captures persistent DSP state for segment splits.
  1466. type dspStateSnapshot struct {
  1467. overlapIQ []complex64
  1468. deemphL float64
  1469. deemphR float64
  1470. pilotPhase float64
  1471. pilotFreq float64
  1472. pilotAlpha float64
  1473. pilotBeta float64
  1474. pilotErrAvg float64
  1475. pilotI float64
  1476. pilotQ float64
  1477. pilotLPAlpha float64
  1478. monoResampler *dsp.Resampler
  1479. monoResamplerRate int
  1480. stereoResampler *dsp.StereoResampler
  1481. stereoResamplerRate int
  1482. stereoLPF *dsp.StatefulFIRReal
  1483. stereoFilterRate int
  1484. stereoBPHi *dsp.StatefulFIRReal
  1485. stereoBPLo *dsp.StatefulFIRReal
  1486. stereoLRLPF *dsp.StatefulFIRReal
  1487. stereoAALPF *dsp.StatefulFIRReal
  1488. pilotLPFHi *dsp.StatefulFIRReal
  1489. pilotLPFLo *dsp.StatefulFIRReal
  1490. preDemodFIR *dsp.StatefulFIRComplex
  1491. preDemodDecimator *dsp.StatefulDecimatingFIRComplex
  1492. preDemodDecim int
  1493. preDemodRate int
  1494. preDemodCutoff float64
  1495. preDemodDecimPhase int
  1496. }
  1497. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  1498. return dspStateSnapshot{
  1499. overlapIQ: sess.overlapIQ,
  1500. deemphL: sess.deemphL,
  1501. deemphR: sess.deemphR,
  1502. pilotPhase: sess.pilotPhase,
  1503. pilotFreq: sess.pilotFreq,
  1504. pilotAlpha: sess.pilotAlpha,
  1505. pilotBeta: sess.pilotBeta,
  1506. pilotErrAvg: sess.pilotErrAvg,
  1507. pilotI: sess.pilotI,
  1508. pilotQ: sess.pilotQ,
  1509. pilotLPAlpha: sess.pilotLPAlpha,
  1510. monoResampler: sess.monoResampler,
  1511. monoResamplerRate: sess.monoResamplerRate,
  1512. stereoResampler: sess.stereoResampler,
  1513. stereoResamplerRate: sess.stereoResamplerRate,
  1514. stereoLPF: sess.stereoLPF,
  1515. stereoFilterRate: sess.stereoFilterRate,
  1516. stereoBPHi: sess.stereoBPHi,
  1517. stereoBPLo: sess.stereoBPLo,
  1518. stereoLRLPF: sess.stereoLRLPF,
  1519. stereoAALPF: sess.stereoAALPF,
  1520. pilotLPFHi: sess.pilotLPFHi,
  1521. pilotLPFLo: sess.pilotLPFLo,
  1522. preDemodFIR: sess.preDemodFIR,
  1523. preDemodDecimator: sess.preDemodDecimator,
  1524. preDemodDecim: sess.preDemodDecim,
  1525. preDemodRate: sess.preDemodRate,
  1526. preDemodCutoff: sess.preDemodCutoff,
  1527. preDemodDecimPhase: sess.preDemodDecimPhase,
  1528. }
  1529. }
  1530. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  1531. sess.overlapIQ = s.overlapIQ
  1532. sess.deemphL = s.deemphL
  1533. sess.deemphR = s.deemphR
  1534. sess.pilotPhase = s.pilotPhase
  1535. sess.pilotFreq = s.pilotFreq
  1536. sess.pilotAlpha = s.pilotAlpha
  1537. sess.pilotBeta = s.pilotBeta
  1538. sess.pilotErrAvg = s.pilotErrAvg
  1539. sess.pilotI = s.pilotI
  1540. sess.pilotQ = s.pilotQ
  1541. sess.pilotLPAlpha = s.pilotLPAlpha
  1542. sess.monoResampler = s.monoResampler
  1543. sess.monoResamplerRate = s.monoResamplerRate
  1544. sess.stereoResampler = s.stereoResampler
  1545. sess.stereoResamplerRate = s.stereoResamplerRate
  1546. sess.stereoLPF = s.stereoLPF
  1547. sess.stereoFilterRate = s.stereoFilterRate
  1548. sess.stereoBPHi = s.stereoBPHi
  1549. sess.stereoBPLo = s.stereoBPLo
  1550. sess.stereoLRLPF = s.stereoLRLPF
  1551. sess.stereoAALPF = s.stereoAALPF
  1552. sess.pilotLPFHi = s.pilotLPFHi
  1553. sess.pilotLPFLo = s.pilotLPFLo
  1554. sess.preDemodFIR = s.preDemodFIR
  1555. sess.preDemodDecimator = s.preDemodDecimator
  1556. sess.preDemodDecim = s.preDemodDecim
  1557. sess.preDemodRate = s.preDemodRate
  1558. sess.preDemodCutoff = s.preDemodCutoff
  1559. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1560. }
  1561. // ---------------------------------------------------------------------------
  1562. // Session management helpers
  1563. // ---------------------------------------------------------------------------
  1564. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1565. outputDir := st.policy.OutputDir
  1566. if outputDir == "" {
  1567. outputDir = "data/recordings"
  1568. }
  1569. demodName, channels := resolveDemod(sig)
  1570. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1571. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1572. dir := filepath.Join(outputDir, dirName)
  1573. if err := os.MkdirAll(dir, 0o755); err != nil {
  1574. return nil, err
  1575. }
  1576. wavPath := filepath.Join(dir, "audio.wav")
  1577. f, err := os.Create(wavPath)
  1578. if err != nil {
  1579. return nil, err
  1580. }
  1581. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1582. f.Close()
  1583. return nil, err
  1584. }
  1585. playbackMode, stereoState := initialPlaybackState(demodName)
  1586. sess := &streamSession{
  1587. sessionID: fmt.Sprintf("%d-%d-r", sig.ID, now.UnixMilli()),
  1588. signalID: sig.ID,
  1589. centerHz: sig.CenterHz,
  1590. bwHz: sig.BWHz,
  1591. snrDb: sig.SNRDb,
  1592. peakDb: sig.PeakDb,
  1593. class: sig.Class,
  1594. startTime: now,
  1595. lastFeed: now,
  1596. dir: dir,
  1597. wavFile: f,
  1598. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1599. sampleRate: streamAudioRate,
  1600. channels: channels,
  1601. demodName: demodName,
  1602. playbackMode: playbackMode,
  1603. stereoState: stereoState,
  1604. deemphasisUs: st.policy.DeemphasisUs,
  1605. }
  1606. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1607. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1608. return sess, nil
  1609. }
  1610. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1611. demodName, channels := resolveDemod(sig)
  1612. for _, pl := range st.pendingListens {
  1613. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1614. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1615. demodName = requested
  1616. if demodName == "WFM_STEREO" {
  1617. channels = 2
  1618. } else if d := demod.Get(demodName); d != nil {
  1619. channels = d.Channels()
  1620. } else {
  1621. channels = 1
  1622. }
  1623. break
  1624. }
  1625. }
  1626. }
  1627. playbackMode, stereoState := initialPlaybackState(demodName)
  1628. sess := &streamSession{
  1629. sessionID: fmt.Sprintf("%d-%d-l", sig.ID, now.UnixMilli()),
  1630. signalID: sig.ID,
  1631. centerHz: sig.CenterHz,
  1632. bwHz: sig.BWHz,
  1633. snrDb: sig.SNRDb,
  1634. peakDb: sig.PeakDb,
  1635. class: sig.Class,
  1636. startTime: now,
  1637. lastFeed: now,
  1638. listenOnly: true,
  1639. sampleRate: streamAudioRate,
  1640. channels: channels,
  1641. demodName: demodName,
  1642. playbackMode: playbackMode,
  1643. stereoState: stereoState,
  1644. deemphasisUs: st.policy.DeemphasisUs,
  1645. }
  1646. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1647. sig.ID, sig.CenterHz/1e6, demodName)
  1648. return sess
  1649. }
  1650. func resolveDemod(sig *detector.Signal) (string, int) {
  1651. demodName := "NFM"
  1652. if sig.Class != nil {
  1653. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1654. demodName = n
  1655. }
  1656. }
  1657. channels := 1
  1658. if demodName == "WFM_STEREO" {
  1659. channels = 2
  1660. } else if d := demod.Get(demodName); d != nil {
  1661. channels = d.Channels()
  1662. }
  1663. return demodName, channels
  1664. }
  1665. func initialPlaybackState(demodName string) (string, string) {
  1666. playbackMode := demodName
  1667. stereoState := "mono"
  1668. if demodName == "WFM_STEREO" {
  1669. stereoState = "searching"
  1670. }
  1671. return playbackMode, stereoState
  1672. }
  1673. func (sess *streamSession) audioInfo() AudioInfo {
  1674. return AudioInfo{
  1675. SampleRate: sess.sampleRate,
  1676. Channels: sess.channels,
  1677. Format: "s16le",
  1678. DemodName: sess.demodName,
  1679. PlaybackMode: sess.playbackMode,
  1680. StereoState: sess.stereoState,
  1681. }
  1682. }
  1683. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1684. infoJSON, _ := json.Marshal(info)
  1685. tagged := make([]byte, 1+len(infoJSON))
  1686. tagged[0] = 0x00 // tag: audio_info
  1687. copy(tagged[1:], infoJSON)
  1688. for _, sub := range subs {
  1689. select {
  1690. case sub.ch <- tagged:
  1691. default:
  1692. }
  1693. }
  1694. }
  1695. func defaultAudioInfoForMode(mode string) AudioInfo {
  1696. demodName := "NFM"
  1697. if requested := normalizeRequestedMode(mode); requested != "" {
  1698. demodName = requested
  1699. }
  1700. channels := 1
  1701. if demodName == "WFM_STEREO" {
  1702. channels = 2
  1703. } else if d := demod.Get(demodName); d != nil {
  1704. channels = d.Channels()
  1705. }
  1706. playbackMode, stereoState := initialPlaybackState(demodName)
  1707. return AudioInfo{
  1708. SampleRate: streamAudioRate,
  1709. Channels: channels,
  1710. Format: "s16le",
  1711. DemodName: demodName,
  1712. PlaybackMode: playbackMode,
  1713. StereoState: stereoState,
  1714. }
  1715. }
  1716. func normalizeRequestedMode(mode string) string {
  1717. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1718. case "", "AUTO":
  1719. return ""
  1720. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1721. return strings.ToUpper(strings.TrimSpace(mode))
  1722. default:
  1723. return ""
  1724. }
  1725. }
  1726. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1727. func (sess *streamSession) growIQ(n int) []complex64 {
  1728. if cap(sess.scratchIQ) >= n {
  1729. return sess.scratchIQ[:n]
  1730. }
  1731. sess.scratchIQ = make([]complex64, n, n*5/4)
  1732. return sess.scratchIQ
  1733. }
  1734. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1735. func (sess *streamSession) growAudio(n int) []float32 {
  1736. if cap(sess.scratchAudio) >= n {
  1737. return sess.scratchAudio[:n]
  1738. }
  1739. sess.scratchAudio = make([]float32, n, n*5/4)
  1740. return sess.scratchAudio
  1741. }
  1742. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1743. func (sess *streamSession) growPCM(n int) []byte {
  1744. if cap(sess.scratchPCM) >= n {
  1745. return sess.scratchPCM[:n]
  1746. }
  1747. sess.scratchPCM = make([]byte, n, n*5/4)
  1748. return sess.scratchPCM
  1749. }
  1750. func convertToListenOnly(sess *streamSession) {
  1751. if sess.wavBuf != nil {
  1752. _ = sess.wavBuf.Flush()
  1753. }
  1754. if sess.wavFile != nil {
  1755. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1756. sess.wavFile.Close()
  1757. }
  1758. sess.wavFile = nil
  1759. sess.wavBuf = nil
  1760. sess.listenOnly = true
  1761. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1762. }
  1763. func closeSession(sess *streamSession, policy *Policy) {
  1764. if sess.listenOnly {
  1765. return
  1766. }
  1767. if sess.wavBuf != nil {
  1768. _ = sess.wavBuf.Flush()
  1769. }
  1770. if sess.wavFile != nil {
  1771. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1772. sess.wavFile.Close()
  1773. sess.wavFile = nil
  1774. sess.wavBuf = nil
  1775. }
  1776. dur := sess.lastFeed.Sub(sess.startTime)
  1777. files := map[string]any{
  1778. "audio": "audio.wav",
  1779. "audio_sample_rate": sess.sampleRate,
  1780. "audio_channels": sess.channels,
  1781. "audio_demod": sess.demodName,
  1782. "recording_mode": "streaming",
  1783. }
  1784. meta := Meta{
  1785. EventID: sess.signalID,
  1786. Start: sess.startTime,
  1787. End: sess.lastFeed,
  1788. CenterHz: sess.centerHz,
  1789. BandwidthHz: sess.bwHz,
  1790. SampleRate: sess.sampleRate,
  1791. SNRDb: sess.snrDb,
  1792. PeakDb: sess.peakDb,
  1793. Class: sess.class,
  1794. DurationMs: dur.Milliseconds(),
  1795. Files: files,
  1796. }
  1797. b, err := json.MarshalIndent(meta, "", " ")
  1798. if err == nil {
  1799. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1800. }
  1801. if policy != nil {
  1802. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1803. }
  1804. }
  1805. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1806. if len(sess.audioSubs) == 0 {
  1807. return
  1808. }
  1809. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1810. tagged := make([]byte, 1+pcmLen)
  1811. tagged[0] = 0x01
  1812. copy(tagged[1:], pcm[:pcmLen])
  1813. alive := sess.audioSubs[:0]
  1814. for _, sub := range sess.audioSubs {
  1815. select {
  1816. case sub.ch <- tagged:
  1817. default:
  1818. st.droppedPCM++
  1819. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1820. if st.telemetry != nil {
  1821. st.telemetry.IncCounter("streamer.pcm.drop", 1, telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1822. }
  1823. }
  1824. alive = append(alive, sub)
  1825. }
  1826. sess.audioSubs = alive
  1827. if st.telemetry != nil {
  1828. st.telemetry.SetGauge("streamer.subscribers.count", float64(len(alive)), telemetry.TagsFromPairs("signal_id", fmt.Sprintf("%d", sess.signalID), "session_id", sess.sessionID))
  1829. }
  1830. }
  1831. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1832. if len(st.policy.ClassFilter) == 0 {
  1833. return true
  1834. }
  1835. if cls == nil {
  1836. return false
  1837. }
  1838. for _, f := range st.policy.ClassFilter {
  1839. if strings.EqualFold(f, string(cls.ModType)) {
  1840. return true
  1841. }
  1842. }
  1843. return false
  1844. }
// ErrNoSession is returned when no matching signal session exists, i.e.
// there is neither an active nor a pending session for the requested
// frequency. It is a sentinel value; test for it with errors.Is.
var ErrNoSession = errors.New("no active or pending session for this frequency")
  1847. // ---------------------------------------------------------------------------
  1848. // WAV header helpers
  1849. // ---------------------------------------------------------------------------
  1850. func writeWAVFile(path string, audio []float32, sampleRate int, channels int) error {
  1851. f, err := os.Create(path)
  1852. if err != nil {
  1853. return err
  1854. }
  1855. defer f.Close()
  1856. return writeWAVTo(f, audio, sampleRate, channels)
  1857. }
  1858. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1859. if channels <= 0 {
  1860. channels = 1
  1861. }
  1862. hdr := make([]byte, 44)
  1863. copy(hdr[0:4], "RIFF")
  1864. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1865. copy(hdr[8:12], "WAVE")
  1866. copy(hdr[12:16], "fmt ")
  1867. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1868. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1869. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1870. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1871. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1872. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1873. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1874. copy(hdr[36:40], "data")
  1875. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1876. _, err := f.Write(hdr)
  1877. return err
  1878. }
  1879. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1880. dataSize := uint32(totalSamples * 2)
  1881. var buf [4]byte
  1882. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1883. if _, err := f.Seek(4, 0); err != nil {
  1884. return
  1885. }
  1886. _, _ = f.Write(buf[:])
  1887. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1888. if _, err := f.Seek(24, 0); err != nil {
  1889. return
  1890. }
  1891. _, _ = f.Write(buf[:])
  1892. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1893. if _, err := f.Seek(28, 0); err != nil {
  1894. return
  1895. }
  1896. _, _ = f.Write(buf[:])
  1897. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1898. if _, err := f.Seek(40, 0); err != nil {
  1899. return
  1900. }
  1901. _, _ = f.Write(buf[:])
  1902. }
  1903. // ResetStreams forces all active streaming sessions to discard their FIR states and decimation phases.
  1904. // This is used when the upstream DSP drops samples, creating a hard break in phase continuity.
  1905. func (st *Streamer) ResetStreams() {
  1906. st.mu.Lock()
  1907. defer st.mu.Unlock()
  1908. if st.telemetry != nil {
  1909. st.telemetry.IncCounter("streamer.reset.count", 1, nil)
  1910. st.telemetry.Event("stream_reset", "warn", "stream DSP state reset", nil, map[string]any{"sessions": len(st.sessions)})
  1911. }
  1912. for _, sess := range st.sessions {
  1913. sess.preDemodFIR = nil
  1914. sess.preDemodDecimator = nil
  1915. sess.preDemodDecimPhase = 0
  1916. sess.stereoResampler = nil
  1917. sess.monoResampler = nil
  1918. }
  1919. }