Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1586 lines
46KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. lastAudioTs time.Time
  36. lastAudioL float32
  37. lastAudioR float32
  38. prevAudioL float64 // second-to-last L sample for boundary transient detection
  39. lastAudioSet bool
  40. lastDecIQ complex64
  41. prevDecIQ complex64
  42. lastDecIQSet bool
  43. lastDemodL float32
  44. prevDemodL float64
  45. lastDemodSet bool
  46. // listenOnly sessions have no WAV file and no disk I/O.
  47. // They exist solely to feed audio to live-listen subscribers.
  48. listenOnly bool
  49. // Recording state (nil/zero for listen-only sessions)
  50. dir string
  51. wavFile *os.File
  52. wavBuf *bufio.Writer
  53. wavSamples int64
  54. segmentIdx int
  55. sampleRate int // actual output audio sample rate (always streamAudioRate)
  56. channels int
  57. demodName string
  58. // --- Persistent DSP state for click-free streaming ---
  59. // Overlap-save: tail of previous extracted IQ snippet.
  60. // Currently unused for live demod after removing the extra discriminator
  61. // overlap prepend, but kept in DSP snapshot state for compatibility.
  62. overlapIQ []complex64
  63. // De-emphasis IIR state (persists across frames)
  64. deemphL float64
  65. deemphR float64
  66. // Stereo lock state for live WFM streaming
  67. stereoEnabled bool
  68. stereoOnCount int
  69. stereoOffCount int
  70. // Pilot-locked stereo PLL state (19kHz pilot)
  71. pilotPhase float64
  72. pilotFreq float64
  73. pilotAlpha float64
  74. pilotBeta float64
  75. pilotErrAvg float64
  76. pilotI float64
  77. pilotQ float64
  78. pilotLPAlpha float64
  79. // Polyphase resampler (replaces integer-decimate hack)
  80. monoResampler *dsp.Resampler
  81. monoResamplerRate int
  82. stereoResampler *dsp.StereoResampler
  83. stereoResamplerRate int
  84. // AQ-4: Stateful FIR filters for click-free stereo decode
  85. stereoFilterRate int
  86. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  87. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  88. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  89. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  90. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  91. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  92. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  93. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  94. // and avoids per-frame FIR recomputation)
  95. preDemodFIR *dsp.StatefulFIRComplex
  96. preDemodDecim int // cached decimation factor
  97. preDemodRate int // cached snipRate this FIR was built for
  98. preDemodCutoff float64 // cached cutoff
  99. preDemodDecimPhase int // stateful decimation phase (index offset into next frame)
  100. // AQ-2: De-emphasis config (µs, 0 = disabled)
  101. deemphasisUs float64
  102. // Scratch buffers — reused across frames to avoid GC pressure.
  103. // Grown as needed, never shrunk.
  104. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  105. scratchAudio []float32 // for stereo decode intermediates
  106. scratchPCM []byte // for PCM encoding
  107. // live-listen subscribers
  108. audioSubs []audioSub
  109. }
  110. type audioSub struct {
  111. id int64
  112. ch chan []byte
  113. }
  114. type RuntimeSignalInfo struct {
  115. DemodName string
  116. PlaybackMode string
  117. StereoState string
  118. Channels int
  119. SampleRate int
  120. }
  121. // AudioInfo describes the audio format of a live-listen subscription.
  122. // Sent to the WebSocket client as the first message.
  123. type AudioInfo struct {
  124. SampleRate int `json:"sample_rate"`
  125. Channels int `json:"channels"`
  126. Format string `json:"format"` // always "s16le"
  127. DemodName string `json:"demod"`
  128. PlaybackMode string `json:"playback_mode,omitempty"`
  129. StereoState string `json:"stereo_state,omitempty"`
  130. }
  131. const (
  132. streamAudioRate = 48000
  133. resamplerTaps = 32 // taps per polyphase arm — good quality
  134. )
  135. // ---------------------------------------------------------------------------
  136. // Streamer — manages all active streaming sessions
  137. // ---------------------------------------------------------------------------
  138. type streamFeedItem struct {
  139. signal detector.Signal
  140. snippet []complex64
  141. snipRate int
  142. }
  143. type streamFeedMsg struct {
  144. traceID uint64
  145. items []streamFeedItem
  146. }
  147. type Streamer struct {
  148. mu sync.Mutex
  149. sessions map[int64]*streamSession
  150. policy Policy
  151. centerHz float64
  152. nextSub int64
  153. feedCh chan streamFeedMsg
  154. done chan struct{}
  155. droppedFeed uint64
  156. droppedPCM uint64
  157. lastFeedTS time.Time
  158. lastProcTS time.Time
  159. // pendingListens are subscribers waiting for a matching session.
  160. pendingListens map[int64]*pendingListen
  161. }
  162. type pendingListen struct {
  163. freq float64
  164. bw float64
  165. mode string
  166. ch chan []byte
  167. }
  168. func newStreamer(policy Policy, centerHz float64) *Streamer {
  169. st := &Streamer{
  170. sessions: make(map[int64]*streamSession),
  171. policy: policy,
  172. centerHz: centerHz,
  173. feedCh: make(chan streamFeedMsg, 2),
  174. done: make(chan struct{}),
  175. pendingListens: make(map[int64]*pendingListen),
  176. }
  177. go st.worker()
  178. return st
  179. }
  180. func (st *Streamer) worker() {
  181. for msg := range st.feedCh {
  182. st.processFeed(msg)
  183. }
  184. close(st.done)
  185. }
  186. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  187. st.mu.Lock()
  188. defer st.mu.Unlock()
  189. wasEnabled := st.policy.Enabled
  190. st.policy = policy
  191. st.centerHz = centerHz
  192. // If recording was just disabled, close recording sessions
  193. // but keep listen-only sessions alive.
  194. if wasEnabled && !policy.Enabled {
  195. for id, sess := range st.sessions {
  196. if sess.listenOnly {
  197. continue
  198. }
  199. if len(sess.audioSubs) > 0 {
  200. // Convert to listen-only: close WAV but keep session
  201. convertToListenOnly(sess)
  202. } else {
  203. closeSession(sess, &st.policy)
  204. delete(st.sessions, id)
  205. }
  206. }
  207. }
  208. }
  209. // HasListeners returns true if any sessions have audio subscribers
  210. // or there are pending listen requests. Used by the DSP loop to
  211. // decide whether to feed snippets even when recording is disabled.
  212. func (st *Streamer) HasListeners() bool {
  213. st.mu.Lock()
  214. defer st.mu.Unlock()
  215. return st.hasListenersLocked()
  216. }
  217. func (st *Streamer) hasListenersLocked() bool {
  218. if len(st.pendingListens) > 0 {
  219. return true
  220. }
  221. for _, sess := range st.sessions {
  222. if len(sess.audioSubs) > 0 {
  223. return true
  224. }
  225. }
  226. return false
  227. }
  228. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  229. // Feeds are accepted if:
  230. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  231. // - Any live-listen subscribers exist (listen-only mode)
  232. //
  233. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  234. // data, so items can be passed directly without another copy.
  235. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  236. st.mu.Lock()
  237. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  238. hasListeners := st.hasListenersLocked()
  239. pending := len(st.pendingListens)
  240. debugLiveAudio := st.policy.DebugLiveAudio
  241. now := time.Now()
  242. if !st.lastFeedTS.IsZero() {
  243. gap := now.Sub(st.lastFeedTS)
  244. if gap > 150*time.Millisecond {
  245. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  246. }
  247. }
  248. st.lastFeedTS = now
  249. st.mu.Unlock()
  250. if debugLiveAudio {
  251. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  252. }
  253. if (!recEnabled && !hasListeners) || len(items) == 0 {
  254. return
  255. }
  256. select {
  257. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}:
  258. default:
  259. st.droppedFeed++
  260. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  261. }
  262. }
  263. // processFeed runs in the worker goroutine.
  264. func (st *Streamer) processFeed(msg streamFeedMsg) {
  265. st.mu.Lock()
  266. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  267. hasListeners := st.hasListenersLocked()
  268. now := time.Now()
  269. if !st.lastProcTS.IsZero() {
  270. gap := now.Sub(st.lastProcTS)
  271. if gap > 150*time.Millisecond {
  272. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  273. }
  274. }
  275. st.lastProcTS = now
  276. defer st.mu.Unlock()
  277. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  278. if !recEnabled && !hasListeners {
  279. return
  280. }
  281. seen := make(map[int64]bool, len(msg.items))
  282. for i := range msg.items {
  283. item := &msg.items[i]
  284. sig := &item.signal
  285. seen[sig.ID] = true
  286. if sig.ID == 0 || sig.Class == nil {
  287. continue
  288. }
  289. if len(item.snippet) == 0 || item.snipRate <= 0 {
  290. continue
  291. }
  292. // Decide whether this signal needs a session
  293. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  294. needsListen := st.signalHasListenerLocked(sig)
  295. className := "<nil>"
  296. demodName := ""
  297. if sig.Class != nil {
  298. className = string(sig.Class.ModType)
  299. demodName, _ = resolveDemod(sig)
  300. }
  301. if st.policy.DebugLiveAudio {
  302. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  303. }
  304. if !needsRecording && !needsListen {
  305. continue
  306. }
  307. sess, exists := st.sessions[sig.ID]
  308. requestedMode := ""
  309. for _, pl := range st.pendingListens {
  310. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  311. if m := normalizeRequestedMode(pl.mode); m != "" {
  312. requestedMode = m
  313. break
  314. }
  315. }
  316. }
  317. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  318. for _, sub := range sess.audioSubs {
  319. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  320. }
  321. delete(st.sessions, sig.ID)
  322. sess = nil
  323. exists = false
  324. }
  325. if !exists {
  326. if needsRecording {
  327. s, err := st.openRecordingSession(sig, now)
  328. if err != nil {
  329. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  330. sig.ID, sig.CenterHz/1e6, err)
  331. continue
  332. }
  333. st.sessions[sig.ID] = s
  334. sess = s
  335. } else {
  336. s := st.openListenSession(sig, now)
  337. st.sessions[sig.ID] = s
  338. sess = s
  339. }
  340. // Attach any pending listeners
  341. st.attachPendingListeners(sess)
  342. }
  343. // Update metadata
  344. sess.lastFeed = now
  345. sess.centerHz = sig.CenterHz
  346. sess.bwHz = sig.BWHz
  347. if sig.SNRDb > sess.snrDb {
  348. sess.snrDb = sig.SNRDb
  349. }
  350. if sig.PeakDb > sess.peakDb {
  351. sess.peakDb = sig.PeakDb
  352. }
  353. if sig.Class != nil {
  354. sess.class = sig.Class
  355. }
  356. // Demod with persistent state
  357. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  358. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  359. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  360. if len(audio) == 0 {
  361. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  362. }
  363. if len(audio) > 0 {
  364. if sess.wavSamples == 0 && audioRate > 0 {
  365. sess.sampleRate = audioRate
  366. }
  367. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  368. pcmLen := len(audio) * 2
  369. pcm := sess.growPCM(pcmLen)
  370. for k, s := range audio {
  371. v := int16(clip(s * 32767))
  372. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  373. }
  374. if !sess.listenOnly && sess.wavBuf != nil {
  375. n, err := sess.wavBuf.Write(pcm)
  376. if err != nil {
  377. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  378. } else {
  379. sess.wavSamples += int64(n / 2)
  380. }
  381. }
  382. // Gap logging for live-audio sessions + transient click detector
  383. if len(sess.audioSubs) > 0 {
  384. if !sess.lastAudioTs.IsZero() {
  385. gap := time.Since(sess.lastAudioTs)
  386. if gap > 150*time.Millisecond {
  387. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  388. }
  389. }
  390. // Transient click detector: finds short impulses (1-3 samples)
  391. // that deviate sharply from the local signal trend.
  392. // A click looks like: ...smooth... SPIKE ...smooth...
  393. // Normal FM audio has large deltas too, but they follow
  394. // a continuous curve. A click has high |d2/dt2| (acceleration).
  395. //
  396. // Method: second-derivative detector. For each sample triplet
  397. // (a, b, c), compute |2b - a - c| which is the discrete
  398. // second derivative magnitude. High values = transient spike.
  399. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  400. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  401. stride := sess.channels
  402. if stride < 1 {
  403. stride = 1
  404. }
  405. nFrames := len(audio) / stride
  406. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  407. if sess.lastAudioSet && nFrames >= 1 {
  408. // second derivative across boundary: |2*last - prevLast - first|
  409. first := float64(audio[0])
  410. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  411. if d2 > 0.15 {
  412. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  413. }
  414. }
  415. // Intra-frame transient scan (L channel only for performance)
  416. nClicks := 0
  417. maxD2 := float64(0)
  418. maxD2Pos := 0
  419. for k := 1; k < nFrames-1; k++ {
  420. a := float64(audio[(k-1)*stride])
  421. b := float64(audio[k*stride])
  422. c := float64(audio[(k+1)*stride])
  423. d2 := math.Abs(2*b - a - c)
  424. if d2 > maxD2 {
  425. maxD2 = d2
  426. maxD2Pos = k
  427. }
  428. if d2 > 0.15 {
  429. nClicks++
  430. }
  431. }
  432. if nClicks > 0 {
  433. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  434. }
  435. // Store last two samples for next frame's boundary check
  436. if nFrames >= 2 {
  437. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  438. sess.lastAudioL = audio[(nFrames-1)*stride]
  439. if stride > 1 {
  440. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  441. }
  442. } else if nFrames == 1 {
  443. sess.prevAudioL = float64(sess.lastAudioL)
  444. sess.lastAudioL = audio[0]
  445. if stride > 1 && len(audio) >= 2 {
  446. sess.lastAudioR = audio[1]
  447. }
  448. }
  449. sess.lastAudioSet = true
  450. }
  451. sess.lastAudioTs = time.Now()
  452. }
  453. st.fanoutPCM(sess, pcm, pcmLen)
  454. }
  455. // Segment split (recording sessions only)
  456. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  457. segIdx := sess.segmentIdx + 1
  458. oldSubs := sess.audioSubs
  459. oldState := sess.captureDSPState()
  460. sess.audioSubs = nil
  461. closeSession(sess, &st.policy)
  462. s, err := st.openRecordingSession(sig, now)
  463. if err != nil {
  464. delete(st.sessions, sig.ID)
  465. continue
  466. }
  467. s.segmentIdx = segIdx
  468. s.audioSubs = oldSubs
  469. s.restoreDSPState(oldState)
  470. st.sessions[sig.ID] = s
  471. }
  472. }
  473. // Close sessions for disappeared signals (with grace period)
  474. for id, sess := range st.sessions {
  475. if seen[id] {
  476. continue
  477. }
  478. gracePeriod := 3 * time.Second
  479. if sess.listenOnly {
  480. gracePeriod = 5 * time.Second
  481. }
  482. if now.Sub(sess.lastFeed) > gracePeriod {
  483. for _, sub := range sess.audioSubs {
  484. close(sub.ch)
  485. }
  486. sess.audioSubs = nil
  487. if !sess.listenOnly {
  488. closeSession(sess, &st.policy)
  489. }
  490. delete(st.sessions, id)
  491. }
  492. }
  493. }
  494. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  495. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  496. if st.policy.DebugLiveAudio {
  497. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  498. }
  499. return true
  500. }
  501. for subID, pl := range st.pendingListens {
  502. delta := math.Abs(sig.CenterHz - pl.freq)
  503. if delta < 200000 {
  504. if st.policy.DebugLiveAudio {
  505. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  506. }
  507. return true
  508. }
  509. }
  510. return false
  511. }
  512. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  513. for subID, pl := range st.pendingListens {
  514. requestedMode := normalizeRequestedMode(pl.mode)
  515. if requestedMode != "" && sess.demodName != requestedMode {
  516. continue
  517. }
  518. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  519. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  520. delete(st.pendingListens, subID)
  521. // Send updated audio_info now that we know the real session params.
  522. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  523. infoJSON, _ := json.Marshal(sess.audioInfo())
  524. tagged := make([]byte, 1+len(infoJSON))
  525. tagged[0] = 0x00 // tag: audio_info
  526. copy(tagged[1:], infoJSON)
  527. select {
  528. case pl.ch <- tagged:
  529. default:
  530. }
  531. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  532. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  533. }
  534. }
  535. }
  536. // CloseAll finalises all sessions and stops the worker goroutine.
  537. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  538. st.mu.Lock()
  539. defer st.mu.Unlock()
  540. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  541. for _, sess := range st.sessions {
  542. out[sess.signalID] = RuntimeSignalInfo{
  543. DemodName: sess.demodName,
  544. PlaybackMode: sess.playbackMode,
  545. StereoState: sess.stereoState,
  546. Channels: sess.channels,
  547. SampleRate: sess.sampleRate,
  548. }
  549. }
  550. return out
  551. }
  552. func (st *Streamer) CloseAll() {
  553. close(st.feedCh)
  554. <-st.done
  555. st.mu.Lock()
  556. defer st.mu.Unlock()
  557. for id, sess := range st.sessions {
  558. for _, sub := range sess.audioSubs {
  559. close(sub.ch)
  560. }
  561. sess.audioSubs = nil
  562. if !sess.listenOnly {
  563. closeSession(sess, &st.policy)
  564. }
  565. delete(st.sessions, id)
  566. }
  567. for _, pl := range st.pendingListens {
  568. close(pl.ch)
  569. }
  570. st.pendingListens = nil
  571. }
  572. // ActiveSessions returns the number of open streaming sessions.
  573. func (st *Streamer) ActiveSessions() int {
  574. st.mu.Lock()
  575. defer st.mu.Unlock()
  576. return len(st.sessions)
  577. }
  578. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  579. //
  580. // LL-2: Returns AudioInfo with correct channels and sample rate.
  581. // LL-3: Returns error only on hard failures (nil streamer etc).
  582. //
  583. // If a matching session exists, attaches immediately. Otherwise, the
  584. // subscriber is held as "pending" and will be attached when a matching
  585. // signal appears in the next DSP frame.
  586. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  587. ch := make(chan []byte, 64)
  588. st.mu.Lock()
  589. defer st.mu.Unlock()
  590. st.nextSub++
  591. subID := st.nextSub
  592. requestedMode := normalizeRequestedMode(mode)
  593. // Try to find a matching session
  594. var bestSess *streamSession
  595. bestDist := math.MaxFloat64
  596. for _, sess := range st.sessions {
  597. if requestedMode != "" && sess.demodName != requestedMode {
  598. continue
  599. }
  600. d := math.Abs(sess.centerHz - freq)
  601. if d < bestDist {
  602. bestDist = d
  603. bestSess = sess
  604. }
  605. }
  606. if bestSess != nil && bestDist < 200000 {
  607. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  608. info := bestSess.audioInfo()
  609. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  610. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  611. return subID, ch, info, nil
  612. }
  613. // No matching session yet — add as pending listener
  614. st.pendingListens[subID] = &pendingListen{
  615. freq: freq,
  616. bw: bw,
  617. mode: mode,
  618. ch: ch,
  619. }
  620. info := defaultAudioInfoForMode(mode)
  621. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  622. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  623. return subID, ch, info, nil
  624. }
  625. // UnsubscribeAudio removes a live-listen subscriber.
  626. func (st *Streamer) UnsubscribeAudio(subID int64) {
  627. st.mu.Lock()
  628. defer st.mu.Unlock()
  629. if pl, ok := st.pendingListens[subID]; ok {
  630. close(pl.ch)
  631. delete(st.pendingListens, subID)
  632. return
  633. }
  634. for _, sess := range st.sessions {
  635. for i, sub := range sess.audioSubs {
  636. if sub.id == subID {
  637. close(sub.ch)
  638. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  639. return
  640. }
  641. }
  642. }
  643. }
  644. // ---------------------------------------------------------------------------
  645. // Session: stateful extraction + demod
  646. // ---------------------------------------------------------------------------
  647. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  648. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  649. // output with zero transient artifacts.
  650. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  651. if len(snippet) == 0 || snipRate <= 0 {
  652. return nil, 0
  653. }
  654. isWFMStereo := sess.demodName == "WFM_STEREO"
  655. isWFM := sess.demodName == "WFM" || isWFMStereo
  656. demodName := sess.demodName
  657. if isWFMStereo {
  658. demodName = "WFM"
  659. }
  660. d := demod.Get(demodName)
  661. if d == nil {
  662. d = demod.Get("NFM")
  663. }
  664. if d == nil {
  665. return nil, 0
  666. }
  667. // The extra 1-sample discriminator overlap prepend was removed after it was
  668. // shown to shift the downstream decimation phase and create heavy click
  669. // artifacts in steady-state streaming/recording. The upstream extraction path
  670. // and the stateful FIR/decimation stages already provide continuity.
  671. fullSnip := snippet
  672. overlapApplied := false
  673. prevTailValid := false
  674. // --- Stateful anti-alias FIR + decimation to demod rate ---
  675. demodRate := d.OutputSampleRate()
  676. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  677. if decim1 < 1 {
  678. decim1 = 1
  679. }
  680. // WFM override: force decim1=2 (256kHz) instead of round(512k/192k)=3 (170kHz).
  681. // At decim1=3, Nyquist is 85kHz which clips FM broadcast ±75kHz deviation.
  682. // At decim1=2, Nyquist is 128kHz → full FM deviation + stereo pilot + guard band.
  683. // Bonus: 256000→48000 resampler ratio is L=3/M=16 (96 taps, 1kB) instead of
  684. // the pathological L=24000/M=85333 (768k taps, 6MB) from 170666→48000.
  685. if isWFM && decim1 > 2 && snipRate/2 >= 200000 {
  686. decim1 = 2
  687. }
  688. actualDemodRate := snipRate / decim1
  689. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  690. var dec []complex64
  691. if decim1 > 1 {
  692. // FIR cutoff: for WFM, use 90kHz (above ±75kHz FM deviation + guard).
  693. // For NFM/other: use standard Nyquist*0.8 cutoff.
  694. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  695. if isWFM {
  696. cutoff = 90000
  697. }
  698. // Lazy-init or reinit stateful FIR if parameters changed
  699. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  700. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  701. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  702. sess.preDemodRate = snipRate
  703. sess.preDemodCutoff = cutoff
  704. sess.preDemodDecim = decim1
  705. sess.preDemodDecimPhase = 0
  706. }
  707. decimPhaseBefore := sess.preDemodDecimPhase
  708. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  709. dec = dsp.DecimateStateful(filtered, decim1, &sess.preDemodDecimPhase)
  710. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase)
  711. } else {
  712. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0)
  713. dec = fullSnip
  714. }
  715. if logging.EnabledCategory("boundary") && len(dec) > 0 {
  716. first := dec[0]
  717. if sess.lastDecIQSet {
  718. d2Re := math.Abs(2*float64(real(sess.lastDecIQ)) - float64(real(sess.prevDecIQ)) - float64(real(first)))
  719. d2Im := math.Abs(2*float64(imag(sess.lastDecIQ)) - float64(imag(sess.prevDecIQ)) - float64(imag(first)))
  720. d2Mag := math.Hypot(d2Re, d2Im)
  721. if d2Mag > 0.15 {
  722. logging.Warn("boundary", "dec_iq_boundary", "signal", sess.signalID, "d2", d2Mag)
  723. }
  724. }
  725. headN := 16
  726. if len(dec) < headN {
  727. headN = len(dec)
  728. }
  729. tailN := 16
  730. if len(dec) < tailN {
  731. tailN = len(dec)
  732. }
  733. var headSum, tailSum, minMag, maxMag float64
  734. minMag = math.MaxFloat64
  735. for i, v := range dec {
  736. mag := math.Hypot(float64(real(v)), float64(imag(v)))
  737. if mag < minMag {
  738. minMag = mag
  739. }
  740. if mag > maxMag {
  741. maxMag = mag
  742. }
  743. if i < headN {
  744. headSum += mag
  745. }
  746. }
  747. for i := len(dec) - tailN; i < len(dec); i++ {
  748. if i >= 0 {
  749. v := dec[i]
  750. tailSum += math.Hypot(float64(real(v)), float64(imag(v)))
  751. }
  752. }
  753. headAvg := 0.0
  754. if headN > 0 {
  755. headAvg = headSum / float64(headN)
  756. }
  757. tailAvg := 0.0
  758. if tailN > 0 {
  759. tailAvg = tailSum / float64(tailN)
  760. }
  761. logging.Debug("boundary", "dec_iq_meter", "signal", sess.signalID, "len", len(dec), "head_avg", headAvg, "tail_avg", tailAvg, "min_mag", minMag, "max_mag", maxMag)
  762. if tailAvg > 0 {
  763. ratio := headAvg / tailAvg
  764. if ratio < 0.75 || ratio > 1.25 {
  765. logging.Warn("boundary", "dec_iq_head_tail_skew", "signal", sess.signalID, "head_avg", headAvg, "tail_avg", tailAvg, "ratio", ratio)
  766. }
  767. }
  768. if len(dec) >= 2 {
  769. sess.prevDecIQ = dec[len(dec)-2]
  770. sess.lastDecIQ = dec[len(dec)-1]
  771. } else {
  772. sess.prevDecIQ = sess.lastDecIQ
  773. sess.lastDecIQ = dec[0]
  774. }
  775. sess.lastDecIQSet = true
  776. }
  777. // --- FM/AM/etc Demod ---
  778. audio := d.Demod(dec, actualDemodRate)
  779. if len(audio) == 0 {
  780. return nil, 0
  781. }
  782. if logging.EnabledCategory("boundary") {
  783. stride := d.Channels()
  784. if stride < 1 {
  785. stride = 1
  786. }
  787. nFrames := len(audio) / stride
  788. if nFrames > 0 {
  789. first := float64(audio[0])
  790. if sess.lastDemodSet {
  791. d2 := math.Abs(2*float64(sess.lastDemodL) - sess.prevDemodL - first)
  792. if d2 > 0.15 {
  793. logging.Warn("boundary", "demod_boundary", "signal", sess.signalID, "d2", d2)
  794. }
  795. }
  796. if nFrames >= 2 {
  797. sess.prevDemodL = float64(audio[(nFrames-2)*stride])
  798. sess.lastDemodL = audio[(nFrames-1)*stride]
  799. } else {
  800. sess.prevDemodL = float64(sess.lastDemodL)
  801. sess.lastDemodL = audio[0]
  802. }
  803. sess.lastDemodSet = true
  804. }
  805. }
  806. logging.Debug("boundary", "audio_path", "signal", sess.signalID, "demod", demodName, "actual_rate", actualDemodRate, "audio_len", len(audio), "channels", d.Channels(), "overlap_applied", overlapApplied, "prev_tail_valid", prevTailValid)
  807. // --- Stateful stereo decode with conservative lock/hysteresis ---
  808. channels := 1
  809. if isWFMStereo {
  810. sess.playbackMode = "WFM_STEREO"
  811. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  812. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  813. if locked {
  814. sess.stereoOnCount++
  815. sess.stereoOffCount = 0
  816. if sess.stereoOnCount >= 4 {
  817. sess.stereoEnabled = true
  818. }
  819. } else {
  820. sess.stereoOnCount = 0
  821. sess.stereoOffCount++
  822. if sess.stereoOffCount >= 10 {
  823. sess.stereoEnabled = false
  824. }
  825. }
  826. prevPlayback := sess.playbackMode
  827. prevStereo := sess.stereoState
  828. if sess.stereoEnabled && len(stereoAudio) > 0 {
  829. sess.stereoState = "locked"
  830. audio = stereoAudio
  831. } else {
  832. sess.stereoState = "mono-fallback"
  833. dual := make([]float32, len(audio)*2)
  834. for i, s := range audio {
  835. dual[i*2] = s
  836. dual[i*2+1] = s
  837. }
  838. audio = dual
  839. }
  840. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  841. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  842. }
  843. }
  844. // --- Polyphase resample to exact 48kHz ---
  845. if actualDemodRate != streamAudioRate {
  846. if channels > 1 {
  847. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  848. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  849. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  850. sess.stereoResamplerRate = actualDemodRate
  851. }
  852. audio = sess.stereoResampler.Process(audio)
  853. } else {
  854. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  855. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  856. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  857. sess.monoResamplerRate = actualDemodRate
  858. }
  859. audio = sess.monoResampler.Process(audio)
  860. }
  861. }
  862. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  863. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  864. tau := sess.deemphasisUs * 1e-6
  865. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  866. if channels > 1 {
  867. nFrames := len(audio) / channels
  868. yL, yR := sess.deemphL, sess.deemphR
  869. for i := 0; i < nFrames; i++ {
  870. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  871. audio[i*2] = float32(yL)
  872. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  873. audio[i*2+1] = float32(yR)
  874. }
  875. sess.deemphL, sess.deemphR = yL, yR
  876. } else {
  877. y := sess.deemphL
  878. for i := range audio {
  879. y = alpha*y + (1-alpha)*float64(audio[i])
  880. audio[i] = float32(y)
  881. }
  882. sess.deemphL = y
  883. }
  884. }
  885. if isWFM {
  886. for i := range audio {
  887. audio[i] *= 0.35
  888. }
  889. }
  890. return audio, streamAudioRate
  891. }
  892. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  893. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  894. // loopBW is in Hz, sampleRate in samples/sec.
  895. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  896. if sampleRate <= 0 || loopBW <= 0 {
  897. return 0, 0
  898. }
  899. bl := loopBW / float64(sampleRate)
  900. theta := bl / (damping + 0.25/damping)
  901. d := 1 + 2*damping*theta + theta*theta
  902. alpha := (4 * damping * theta) / d
  903. beta := (4 * theta * theta) / d
  904. return alpha, beta
  905. }
  906. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  907. // Uses persistent FIR filter state across frames for click-free stereo.
  908. // Reuses session scratch buffers to minimize allocations.
  909. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  910. if len(mono) == 0 || sampleRate <= 0 {
  911. return nil, false
  912. }
  913. n := len(mono)
  914. // Rebuild rate-dependent stereo filters when sampleRate changes
  915. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  916. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  917. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  918. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  919. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  920. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  921. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  922. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  923. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  924. sess.stereoFilterRate = sampleRate
  925. // Initialize PLL for 19kHz pilot tracking.
  926. sess.pilotPhase = 0
  927. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  928. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  929. sess.pilotErrAvg = 0
  930. sess.pilotI = 0
  931. sess.pilotQ = 0
  932. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  933. }
  934. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  935. scratch := sess.growAudio(n * 5)
  936. lpr := scratch[:n]
  937. bpfLR := scratch[n : 2*n]
  938. lr := scratch[2*n : 3*n]
  939. work1 := scratch[3*n : 4*n]
  940. work2 := scratch[4*n : 5*n]
  941. sess.stereoLPF.ProcessInto(mono, lpr)
  942. // 23-53kHz bandpass for L-R DSB-SC.
  943. sess.stereoBPHi.ProcessInto(mono, work1)
  944. sess.stereoBPLo.ProcessInto(mono, work2)
  945. for i := 0; i < n; i++ {
  946. bpfLR[i] = work1[i] - work2[i]
  947. }
  948. // 19kHz pilot bandpass for PLL.
  949. sess.pilotLPFHi.ProcessInto(mono, work1)
  950. sess.pilotLPFLo.ProcessInto(mono, work2)
  951. for i := 0; i < n; i++ {
  952. work1[i] = work1[i] - work2[i]
  953. }
  954. pilot := work1
  955. phase := sess.pilotPhase
  956. freq := sess.pilotFreq
  957. alpha := sess.pilotAlpha
  958. beta := sess.pilotBeta
  959. iState := sess.pilotI
  960. qState := sess.pilotQ
  961. lpAlpha := sess.pilotLPAlpha
  962. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  963. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  964. var pilotPower float64
  965. var totalPower float64
  966. var errSum float64
  967. for i := 0; i < n; i++ {
  968. p := float64(pilot[i])
  969. sinP, cosP := math.Sincos(phase)
  970. iMix := p * cosP
  971. qMix := p * -sinP
  972. iState += lpAlpha * (iMix - iState)
  973. qState += lpAlpha * (qMix - qState)
  974. err := math.Atan2(qState, iState)
  975. freq += beta * err
  976. if freq < minFreq {
  977. freq = minFreq
  978. } else if freq > maxFreq {
  979. freq = maxFreq
  980. }
  981. phase += freq + alpha*err
  982. if phase > 2*math.Pi {
  983. phase -= 2 * math.Pi
  984. } else if phase < 0 {
  985. phase += 2 * math.Pi
  986. }
  987. totalPower += float64(mono[i]) * float64(mono[i])
  988. pilotPower += p * p
  989. errSum += math.Abs(err)
  990. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  991. }
  992. sess.pilotPhase = phase
  993. sess.pilotFreq = freq
  994. sess.pilotI = iState
  995. sess.pilotQ = qState
  996. blockErr := errSum / float64(n)
  997. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  998. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  999. pilotRatio := 0.0
  1000. if totalPower > 0 {
  1001. pilotRatio = pilotPower / totalPower
  1002. }
  1003. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  1004. // Lock heuristics: pilot power fraction and PLL phase error stability.
  1005. // Pilot power is a small but stable fraction of composite energy; require
  1006. // a modest floor plus PLL settling to avoid flapping in noise.
  1007. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  1008. out := make([]float32, n*2)
  1009. for i := 0; i < n; i++ {
  1010. out[i*2] = 0.5 * (lpr[i] + lr[i])
  1011. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  1012. }
  1013. return out, locked
  1014. }
  1015. // dspStateSnapshot captures persistent DSP state for segment splits.
  1016. type dspStateSnapshot struct {
  1017. overlapIQ []complex64
  1018. deemphL float64
  1019. deemphR float64
  1020. pilotPhase float64
  1021. pilotFreq float64
  1022. pilotAlpha float64
  1023. pilotBeta float64
  1024. pilotErrAvg float64
  1025. pilotI float64
  1026. pilotQ float64
  1027. pilotLPAlpha float64
  1028. monoResampler *dsp.Resampler
  1029. monoResamplerRate int
  1030. stereoResampler *dsp.StereoResampler
  1031. stereoResamplerRate int
  1032. stereoLPF *dsp.StatefulFIRReal
  1033. stereoFilterRate int
  1034. stereoBPHi *dsp.StatefulFIRReal
  1035. stereoBPLo *dsp.StatefulFIRReal
  1036. stereoLRLPF *dsp.StatefulFIRReal
  1037. stereoAALPF *dsp.StatefulFIRReal
  1038. pilotLPFHi *dsp.StatefulFIRReal
  1039. pilotLPFLo *dsp.StatefulFIRReal
  1040. preDemodFIR *dsp.StatefulFIRComplex
  1041. preDemodDecim int
  1042. preDemodRate int
  1043. preDemodCutoff float64
  1044. preDemodDecimPhase int
  1045. }
  1046. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  1047. return dspStateSnapshot{
  1048. overlapIQ: sess.overlapIQ,
  1049. deemphL: sess.deemphL,
  1050. deemphR: sess.deemphR,
  1051. pilotPhase: sess.pilotPhase,
  1052. pilotFreq: sess.pilotFreq,
  1053. pilotAlpha: sess.pilotAlpha,
  1054. pilotBeta: sess.pilotBeta,
  1055. pilotErrAvg: sess.pilotErrAvg,
  1056. pilotI: sess.pilotI,
  1057. pilotQ: sess.pilotQ,
  1058. pilotLPAlpha: sess.pilotLPAlpha,
  1059. monoResampler: sess.monoResampler,
  1060. monoResamplerRate: sess.monoResamplerRate,
  1061. stereoResampler: sess.stereoResampler,
  1062. stereoResamplerRate: sess.stereoResamplerRate,
  1063. stereoLPF: sess.stereoLPF,
  1064. stereoFilterRate: sess.stereoFilterRate,
  1065. stereoBPHi: sess.stereoBPHi,
  1066. stereoBPLo: sess.stereoBPLo,
  1067. stereoLRLPF: sess.stereoLRLPF,
  1068. stereoAALPF: sess.stereoAALPF,
  1069. pilotLPFHi: sess.pilotLPFHi,
  1070. pilotLPFLo: sess.pilotLPFLo,
  1071. preDemodFIR: sess.preDemodFIR,
  1072. preDemodDecim: sess.preDemodDecim,
  1073. preDemodRate: sess.preDemodRate,
  1074. preDemodCutoff: sess.preDemodCutoff,
  1075. preDemodDecimPhase: sess.preDemodDecimPhase,
  1076. }
  1077. }
  1078. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  1079. sess.overlapIQ = s.overlapIQ
  1080. sess.deemphL = s.deemphL
  1081. sess.deemphR = s.deemphR
  1082. sess.pilotPhase = s.pilotPhase
  1083. sess.pilotFreq = s.pilotFreq
  1084. sess.pilotAlpha = s.pilotAlpha
  1085. sess.pilotBeta = s.pilotBeta
  1086. sess.pilotErrAvg = s.pilotErrAvg
  1087. sess.pilotI = s.pilotI
  1088. sess.pilotQ = s.pilotQ
  1089. sess.pilotLPAlpha = s.pilotLPAlpha
  1090. sess.monoResampler = s.monoResampler
  1091. sess.monoResamplerRate = s.monoResamplerRate
  1092. sess.stereoResampler = s.stereoResampler
  1093. sess.stereoResamplerRate = s.stereoResamplerRate
  1094. sess.stereoLPF = s.stereoLPF
  1095. sess.stereoFilterRate = s.stereoFilterRate
  1096. sess.stereoBPHi = s.stereoBPHi
  1097. sess.stereoBPLo = s.stereoBPLo
  1098. sess.stereoLRLPF = s.stereoLRLPF
  1099. sess.stereoAALPF = s.stereoAALPF
  1100. sess.pilotLPFHi = s.pilotLPFHi
  1101. sess.pilotLPFLo = s.pilotLPFLo
  1102. sess.preDemodFIR = s.preDemodFIR
  1103. sess.preDemodDecim = s.preDemodDecim
  1104. sess.preDemodRate = s.preDemodRate
  1105. sess.preDemodCutoff = s.preDemodCutoff
  1106. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1107. }
  1108. // ---------------------------------------------------------------------------
  1109. // Session management helpers
  1110. // ---------------------------------------------------------------------------
  1111. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1112. outputDir := st.policy.OutputDir
  1113. if outputDir == "" {
  1114. outputDir = "data/recordings"
  1115. }
  1116. demodName, channels := resolveDemod(sig)
  1117. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1118. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1119. dir := filepath.Join(outputDir, dirName)
  1120. if err := os.MkdirAll(dir, 0o755); err != nil {
  1121. return nil, err
  1122. }
  1123. wavPath := filepath.Join(dir, "audio.wav")
  1124. f, err := os.Create(wavPath)
  1125. if err != nil {
  1126. return nil, err
  1127. }
  1128. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1129. f.Close()
  1130. return nil, err
  1131. }
  1132. playbackMode, stereoState := initialPlaybackState(demodName)
  1133. sess := &streamSession{
  1134. signalID: sig.ID,
  1135. centerHz: sig.CenterHz,
  1136. bwHz: sig.BWHz,
  1137. snrDb: sig.SNRDb,
  1138. peakDb: sig.PeakDb,
  1139. class: sig.Class,
  1140. startTime: now,
  1141. lastFeed: now,
  1142. dir: dir,
  1143. wavFile: f,
  1144. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1145. sampleRate: streamAudioRate,
  1146. channels: channels,
  1147. demodName: demodName,
  1148. playbackMode: playbackMode,
  1149. stereoState: stereoState,
  1150. deemphasisUs: st.policy.DeemphasisUs,
  1151. }
  1152. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1153. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1154. return sess, nil
  1155. }
  1156. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1157. demodName, channels := resolveDemod(sig)
  1158. for _, pl := range st.pendingListens {
  1159. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1160. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1161. demodName = requested
  1162. if demodName == "WFM_STEREO" {
  1163. channels = 2
  1164. } else if d := demod.Get(demodName); d != nil {
  1165. channels = d.Channels()
  1166. } else {
  1167. channels = 1
  1168. }
  1169. break
  1170. }
  1171. }
  1172. }
  1173. playbackMode, stereoState := initialPlaybackState(demodName)
  1174. sess := &streamSession{
  1175. signalID: sig.ID,
  1176. centerHz: sig.CenterHz,
  1177. bwHz: sig.BWHz,
  1178. snrDb: sig.SNRDb,
  1179. peakDb: sig.PeakDb,
  1180. class: sig.Class,
  1181. startTime: now,
  1182. lastFeed: now,
  1183. listenOnly: true,
  1184. sampleRate: streamAudioRate,
  1185. channels: channels,
  1186. demodName: demodName,
  1187. playbackMode: playbackMode,
  1188. stereoState: stereoState,
  1189. deemphasisUs: st.policy.DeemphasisUs,
  1190. }
  1191. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1192. sig.ID, sig.CenterHz/1e6, demodName)
  1193. return sess
  1194. }
  1195. func resolveDemod(sig *detector.Signal) (string, int) {
  1196. demodName := "NFM"
  1197. if sig.Class != nil {
  1198. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1199. demodName = n
  1200. }
  1201. }
  1202. channels := 1
  1203. if demodName == "WFM_STEREO" {
  1204. channels = 2
  1205. } else if d := demod.Get(demodName); d != nil {
  1206. channels = d.Channels()
  1207. }
  1208. return demodName, channels
  1209. }
  1210. func initialPlaybackState(demodName string) (string, string) {
  1211. playbackMode := demodName
  1212. stereoState := "mono"
  1213. if demodName == "WFM_STEREO" {
  1214. stereoState = "searching"
  1215. }
  1216. return playbackMode, stereoState
  1217. }
  1218. func (sess *streamSession) audioInfo() AudioInfo {
  1219. return AudioInfo{
  1220. SampleRate: sess.sampleRate,
  1221. Channels: sess.channels,
  1222. Format: "s16le",
  1223. DemodName: sess.demodName,
  1224. PlaybackMode: sess.playbackMode,
  1225. StereoState: sess.stereoState,
  1226. }
  1227. }
  1228. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1229. infoJSON, _ := json.Marshal(info)
  1230. tagged := make([]byte, 1+len(infoJSON))
  1231. tagged[0] = 0x00 // tag: audio_info
  1232. copy(tagged[1:], infoJSON)
  1233. for _, sub := range subs {
  1234. select {
  1235. case sub.ch <- tagged:
  1236. default:
  1237. }
  1238. }
  1239. }
  1240. func defaultAudioInfoForMode(mode string) AudioInfo {
  1241. demodName := "NFM"
  1242. if requested := normalizeRequestedMode(mode); requested != "" {
  1243. demodName = requested
  1244. }
  1245. channels := 1
  1246. if demodName == "WFM_STEREO" {
  1247. channels = 2
  1248. } else if d := demod.Get(demodName); d != nil {
  1249. channels = d.Channels()
  1250. }
  1251. playbackMode, stereoState := initialPlaybackState(demodName)
  1252. return AudioInfo{
  1253. SampleRate: streamAudioRate,
  1254. Channels: channels,
  1255. Format: "s16le",
  1256. DemodName: demodName,
  1257. PlaybackMode: playbackMode,
  1258. StereoState: stereoState,
  1259. }
  1260. }
  1261. func normalizeRequestedMode(mode string) string {
  1262. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1263. case "", "AUTO":
  1264. return ""
  1265. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1266. return strings.ToUpper(strings.TrimSpace(mode))
  1267. default:
  1268. return ""
  1269. }
  1270. }
  1271. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1272. func (sess *streamSession) growIQ(n int) []complex64 {
  1273. if cap(sess.scratchIQ) >= n {
  1274. return sess.scratchIQ[:n]
  1275. }
  1276. sess.scratchIQ = make([]complex64, n, n*5/4)
  1277. return sess.scratchIQ
  1278. }
  1279. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1280. func (sess *streamSession) growAudio(n int) []float32 {
  1281. if cap(sess.scratchAudio) >= n {
  1282. return sess.scratchAudio[:n]
  1283. }
  1284. sess.scratchAudio = make([]float32, n, n*5/4)
  1285. return sess.scratchAudio
  1286. }
  1287. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1288. func (sess *streamSession) growPCM(n int) []byte {
  1289. if cap(sess.scratchPCM) >= n {
  1290. return sess.scratchPCM[:n]
  1291. }
  1292. sess.scratchPCM = make([]byte, n, n*5/4)
  1293. return sess.scratchPCM
  1294. }
  1295. func convertToListenOnly(sess *streamSession) {
  1296. if sess.wavBuf != nil {
  1297. _ = sess.wavBuf.Flush()
  1298. }
  1299. if sess.wavFile != nil {
  1300. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1301. sess.wavFile.Close()
  1302. }
  1303. sess.wavFile = nil
  1304. sess.wavBuf = nil
  1305. sess.listenOnly = true
  1306. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1307. }
  1308. func closeSession(sess *streamSession, policy *Policy) {
  1309. if sess.listenOnly {
  1310. return
  1311. }
  1312. if sess.wavBuf != nil {
  1313. _ = sess.wavBuf.Flush()
  1314. }
  1315. if sess.wavFile != nil {
  1316. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1317. sess.wavFile.Close()
  1318. sess.wavFile = nil
  1319. sess.wavBuf = nil
  1320. }
  1321. dur := sess.lastFeed.Sub(sess.startTime)
  1322. files := map[string]any{
  1323. "audio": "audio.wav",
  1324. "audio_sample_rate": sess.sampleRate,
  1325. "audio_channels": sess.channels,
  1326. "audio_demod": sess.demodName,
  1327. "recording_mode": "streaming",
  1328. }
  1329. meta := Meta{
  1330. EventID: sess.signalID,
  1331. Start: sess.startTime,
  1332. End: sess.lastFeed,
  1333. CenterHz: sess.centerHz,
  1334. BandwidthHz: sess.bwHz,
  1335. SampleRate: sess.sampleRate,
  1336. SNRDb: sess.snrDb,
  1337. PeakDb: sess.peakDb,
  1338. Class: sess.class,
  1339. DurationMs: dur.Milliseconds(),
  1340. Files: files,
  1341. }
  1342. b, err := json.MarshalIndent(meta, "", " ")
  1343. if err == nil {
  1344. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1345. }
  1346. if policy != nil {
  1347. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1348. }
  1349. }
  1350. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1351. if len(sess.audioSubs) == 0 {
  1352. return
  1353. }
  1354. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1355. tagged := make([]byte, 1+pcmLen)
  1356. tagged[0] = 0x01
  1357. copy(tagged[1:], pcm[:pcmLen])
  1358. alive := sess.audioSubs[:0]
  1359. for _, sub := range sess.audioSubs {
  1360. select {
  1361. case sub.ch <- tagged:
  1362. default:
  1363. st.droppedPCM++
  1364. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1365. }
  1366. alive = append(alive, sub)
  1367. }
  1368. sess.audioSubs = alive
  1369. }
  1370. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1371. if len(st.policy.ClassFilter) == 0 {
  1372. return true
  1373. }
  1374. if cls == nil {
  1375. return false
  1376. }
  1377. for _, f := range st.policy.ClassFilter {
  1378. if strings.EqualFold(f, string(cls.ModType)) {
  1379. return true
  1380. }
  1381. }
  1382. return false
  1383. }
// ErrNoSession is the sentinel error returned when no active or pending
// signal session matches the requested frequency. Compare against it with
// errors.Is.
var ErrNoSession = errors.New("no active or pending session for this frequency")
  1386. // ---------------------------------------------------------------------------
  1387. // WAV header helpers
  1388. // ---------------------------------------------------------------------------
  1389. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1390. if channels <= 0 {
  1391. channels = 1
  1392. }
  1393. hdr := make([]byte, 44)
  1394. copy(hdr[0:4], "RIFF")
  1395. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1396. copy(hdr[8:12], "WAVE")
  1397. copy(hdr[12:16], "fmt ")
  1398. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1399. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1400. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1401. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1402. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1403. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1404. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1405. copy(hdr[36:40], "data")
  1406. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1407. _, err := f.Write(hdr)
  1408. return err
  1409. }
  1410. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1411. dataSize := uint32(totalSamples * 2)
  1412. var buf [4]byte
  1413. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1414. if _, err := f.Seek(4, 0); err != nil {
  1415. return
  1416. }
  1417. _, _ = f.Write(buf[:])
  1418. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1419. if _, err := f.Seek(24, 0); err != nil {
  1420. return
  1421. }
  1422. _, _ = f.Write(buf[:])
  1423. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1424. if _, err := f.Seek(28, 0); err != nil {
  1425. return
  1426. }
  1427. _, _ = f.Write(buf[:])
  1428. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1429. if _, err := f.Seek(40, 0); err != nil {
  1430. return
  1431. }
  1432. _, _ = f.Write(buf[:])
  1433. }
  1434. // ResetStreams forces all active streaming sessions to discard their FIR states and decimation phases.
  1435. // This is used when the upstream DSP drops samples, creating a hard break in phase continuity.
  1436. func (st *Streamer) ResetStreams() {
  1437. st.mu.Lock()
  1438. defer st.mu.Unlock()
  1439. for _, sess := range st.sessions {
  1440. sess.preDemodFIR = nil
  1441. sess.preDemodDecimPhase = 0
  1442. sess.stereoResampler = nil
  1443. sess.monoResampler = nil
  1444. }
  1445. }