Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

1486 řádky
43KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. lastAudioTs time.Time
  36. lastAudioL float32
  37. lastAudioR float32
  38. prevAudioL float64 // second-to-last L sample for boundary transient detection
  39. lastAudioSet bool
  40. // listenOnly sessions have no WAV file and no disk I/O.
  41. // They exist solely to feed audio to live-listen subscribers.
  42. listenOnly bool
  43. // Recording state (nil/zero for listen-only sessions)
  44. dir string
  45. wavFile *os.File
  46. wavBuf *bufio.Writer
  47. wavSamples int64
  48. segmentIdx int
  49. sampleRate int // actual output audio sample rate (always streamAudioRate)
  50. channels int
  51. demodName string
  52. // --- Persistent DSP state for click-free streaming ---
  53. // Overlap-save: tail of previous extracted IQ snippet.
  54. overlapIQ []complex64
  55. // De-emphasis IIR state (persists across frames)
  56. deemphL float64
  57. deemphR float64
  58. // Stereo lock state for live WFM streaming
  59. stereoEnabled bool
  60. stereoOnCount int
  61. stereoOffCount int
  62. // Pilot-locked stereo PLL state (19kHz pilot)
  63. pilotPhase float64
  64. pilotFreq float64
  65. pilotAlpha float64
  66. pilotBeta float64
  67. pilotErrAvg float64
  68. pilotI float64
  69. pilotQ float64
  70. pilotLPAlpha float64
  71. // Polyphase resampler (replaces integer-decimate hack)
  72. monoResampler *dsp.Resampler
  73. monoResamplerRate int
  74. stereoResampler *dsp.StereoResampler
  75. stereoResamplerRate int
  76. // AQ-4: Stateful FIR filters for click-free stereo decode
  77. stereoFilterRate int
  78. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  79. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  80. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  81. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  82. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  83. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  84. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  85. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  86. // and avoids per-frame FIR recomputation)
  87. preDemodFIR *dsp.StatefulFIRComplex
  88. preDemodDecim int // cached decimation factor
  89. preDemodRate int // cached snipRate this FIR was built for
  90. preDemodCutoff float64 // cached cutoff
  91. preDemodDecimPhase int // stateful decimation phase (index offset into next frame)
  92. // AQ-2: De-emphasis config (µs, 0 = disabled)
  93. deemphasisUs float64
  94. // Scratch buffers — reused across frames to avoid GC pressure.
  95. // Grown as needed, never shrunk.
  96. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  97. scratchAudio []float32 // for stereo decode intermediates
  98. scratchPCM []byte // for PCM encoding
  99. // live-listen subscribers
  100. audioSubs []audioSub
  101. }
  102. type audioSub struct {
  103. id int64
  104. ch chan []byte
  105. }
  106. type RuntimeSignalInfo struct {
  107. DemodName string
  108. PlaybackMode string
  109. StereoState string
  110. Channels int
  111. SampleRate int
  112. }
  113. // AudioInfo describes the audio format of a live-listen subscription.
  114. // Sent to the WebSocket client as the first message.
  115. type AudioInfo struct {
  116. SampleRate int `json:"sample_rate"`
  117. Channels int `json:"channels"`
  118. Format string `json:"format"` // always "s16le"
  119. DemodName string `json:"demod"`
  120. PlaybackMode string `json:"playback_mode,omitempty"`
  121. StereoState string `json:"stereo_state,omitempty"`
  122. }
  123. const (
  124. streamAudioRate = 48000
  125. resamplerTaps = 32 // taps per polyphase arm — good quality
  126. )
  127. // ---------------------------------------------------------------------------
  128. // Streamer — manages all active streaming sessions
  129. // ---------------------------------------------------------------------------
  130. type streamFeedItem struct {
  131. signal detector.Signal
  132. snippet []complex64
  133. snipRate int
  134. }
  135. type streamFeedMsg struct {
  136. traceID uint64
  137. items []streamFeedItem
  138. }
  139. type Streamer struct {
  140. mu sync.Mutex
  141. sessions map[int64]*streamSession
  142. policy Policy
  143. centerHz float64
  144. nextSub int64
  145. feedCh chan streamFeedMsg
  146. done chan struct{}
  147. droppedFeed uint64
  148. droppedPCM uint64
  149. lastFeedTS time.Time
  150. lastProcTS time.Time
  151. // pendingListens are subscribers waiting for a matching session.
  152. pendingListens map[int64]*pendingListen
  153. }
  154. type pendingListen struct {
  155. freq float64
  156. bw float64
  157. mode string
  158. ch chan []byte
  159. }
  160. func newStreamer(policy Policy, centerHz float64) *Streamer {
  161. st := &Streamer{
  162. sessions: make(map[int64]*streamSession),
  163. policy: policy,
  164. centerHz: centerHz,
  165. feedCh: make(chan streamFeedMsg, 2),
  166. done: make(chan struct{}),
  167. pendingListens: make(map[int64]*pendingListen),
  168. }
  169. go st.worker()
  170. return st
  171. }
  172. func (st *Streamer) worker() {
  173. for msg := range st.feedCh {
  174. st.processFeed(msg)
  175. }
  176. close(st.done)
  177. }
  178. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  179. st.mu.Lock()
  180. defer st.mu.Unlock()
  181. wasEnabled := st.policy.Enabled
  182. st.policy = policy
  183. st.centerHz = centerHz
  184. // If recording was just disabled, close recording sessions
  185. // but keep listen-only sessions alive.
  186. if wasEnabled && !policy.Enabled {
  187. for id, sess := range st.sessions {
  188. if sess.listenOnly {
  189. continue
  190. }
  191. if len(sess.audioSubs) > 0 {
  192. // Convert to listen-only: close WAV but keep session
  193. convertToListenOnly(sess)
  194. } else {
  195. closeSession(sess, &st.policy)
  196. delete(st.sessions, id)
  197. }
  198. }
  199. }
  200. }
  201. // HasListeners returns true if any sessions have audio subscribers
  202. // or there are pending listen requests. Used by the DSP loop to
  203. // decide whether to feed snippets even when recording is disabled.
  204. func (st *Streamer) HasListeners() bool {
  205. st.mu.Lock()
  206. defer st.mu.Unlock()
  207. return st.hasListenersLocked()
  208. }
  209. func (st *Streamer) hasListenersLocked() bool {
  210. if len(st.pendingListens) > 0 {
  211. return true
  212. }
  213. for _, sess := range st.sessions {
  214. if len(sess.audioSubs) > 0 {
  215. return true
  216. }
  217. }
  218. return false
  219. }
  220. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  221. // Feeds are accepted if:
  222. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  223. // - Any live-listen subscribers exist (listen-only mode)
  224. //
  225. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  226. // data, so items can be passed directly without another copy.
  227. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  228. st.mu.Lock()
  229. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  230. hasListeners := st.hasListenersLocked()
  231. pending := len(st.pendingListens)
  232. debugLiveAudio := st.policy.DebugLiveAudio
  233. now := time.Now()
  234. if !st.lastFeedTS.IsZero() {
  235. gap := now.Sub(st.lastFeedTS)
  236. if gap > 150*time.Millisecond {
  237. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  238. }
  239. }
  240. st.lastFeedTS = now
  241. st.mu.Unlock()
  242. if debugLiveAudio {
  243. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  244. }
  245. if (!recEnabled && !hasListeners) || len(items) == 0 {
  246. return
  247. }
  248. select {
  249. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}:
  250. default:
  251. st.droppedFeed++
  252. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  253. }
  254. }
  255. // processFeed runs in the worker goroutine.
  256. func (st *Streamer) processFeed(msg streamFeedMsg) {
  257. st.mu.Lock()
  258. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  259. hasListeners := st.hasListenersLocked()
  260. now := time.Now()
  261. if !st.lastProcTS.IsZero() {
  262. gap := now.Sub(st.lastProcTS)
  263. if gap > 150*time.Millisecond {
  264. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  265. }
  266. }
  267. st.lastProcTS = now
  268. defer st.mu.Unlock()
  269. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  270. if !recEnabled && !hasListeners {
  271. return
  272. }
  273. seen := make(map[int64]bool, len(msg.items))
  274. for i := range msg.items {
  275. item := &msg.items[i]
  276. sig := &item.signal
  277. seen[sig.ID] = true
  278. if sig.ID == 0 || sig.Class == nil {
  279. continue
  280. }
  281. if len(item.snippet) == 0 || item.snipRate <= 0 {
  282. continue
  283. }
  284. // Decide whether this signal needs a session
  285. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  286. needsListen := st.signalHasListenerLocked(sig)
  287. className := "<nil>"
  288. demodName := ""
  289. if sig.Class != nil {
  290. className = string(sig.Class.ModType)
  291. demodName, _ = resolveDemod(sig)
  292. }
  293. if st.policy.DebugLiveAudio {
  294. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  295. }
  296. if !needsRecording && !needsListen {
  297. continue
  298. }
  299. sess, exists := st.sessions[sig.ID]
  300. requestedMode := ""
  301. for _, pl := range st.pendingListens {
  302. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  303. if m := normalizeRequestedMode(pl.mode); m != "" {
  304. requestedMode = m
  305. break
  306. }
  307. }
  308. }
  309. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  310. for _, sub := range sess.audioSubs {
  311. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  312. }
  313. delete(st.sessions, sig.ID)
  314. sess = nil
  315. exists = false
  316. }
  317. if !exists {
  318. if needsRecording {
  319. s, err := st.openRecordingSession(sig, now)
  320. if err != nil {
  321. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  322. sig.ID, sig.CenterHz/1e6, err)
  323. continue
  324. }
  325. st.sessions[sig.ID] = s
  326. sess = s
  327. } else {
  328. s := st.openListenSession(sig, now)
  329. st.sessions[sig.ID] = s
  330. sess = s
  331. }
  332. // Attach any pending listeners
  333. st.attachPendingListeners(sess)
  334. }
  335. // Update metadata
  336. sess.lastFeed = now
  337. sess.centerHz = sig.CenterHz
  338. sess.bwHz = sig.BWHz
  339. if sig.SNRDb > sess.snrDb {
  340. sess.snrDb = sig.SNRDb
  341. }
  342. if sig.PeakDb > sess.peakDb {
  343. sess.peakDb = sig.PeakDb
  344. }
  345. if sig.Class != nil {
  346. sess.class = sig.Class
  347. }
  348. // Demod with persistent state
  349. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  350. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  351. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  352. if len(audio) == 0 {
  353. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  354. }
  355. if len(audio) > 0 {
  356. if sess.wavSamples == 0 && audioRate > 0 {
  357. sess.sampleRate = audioRate
  358. }
  359. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  360. pcmLen := len(audio) * 2
  361. pcm := sess.growPCM(pcmLen)
  362. for k, s := range audio {
  363. v := int16(clip(s * 32767))
  364. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  365. }
  366. if !sess.listenOnly && sess.wavBuf != nil {
  367. n, err := sess.wavBuf.Write(pcm)
  368. if err != nil {
  369. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  370. } else {
  371. sess.wavSamples += int64(n / 2)
  372. }
  373. }
  374. // Gap logging for live-audio sessions + transient click detector
  375. if len(sess.audioSubs) > 0 {
  376. if !sess.lastAudioTs.IsZero() {
  377. gap := time.Since(sess.lastAudioTs)
  378. if gap > 150*time.Millisecond {
  379. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  380. }
  381. }
  382. // Transient click detector: finds short impulses (1-3 samples)
  383. // that deviate sharply from the local signal trend.
  384. // A click looks like: ...smooth... SPIKE ...smooth...
  385. // Normal FM audio has large deltas too, but they follow
  386. // a continuous curve. A click has high |d2/dt2| (acceleration).
  387. //
  388. // Method: second-derivative detector. For each sample triplet
  389. // (a, b, c), compute |2b - a - c| which is the discrete
  390. // second derivative magnitude. High values = transient spike.
  391. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  392. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  393. stride := sess.channels
  394. if stride < 1 {
  395. stride = 1
  396. }
  397. nFrames := len(audio) / stride
  398. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  399. if sess.lastAudioSet && nFrames >= 1 {
  400. // second derivative across boundary: |2*last - prevLast - first|
  401. first := float64(audio[0])
  402. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  403. if d2 > 0.15 {
  404. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  405. }
  406. }
  407. // Intra-frame transient scan (L channel only for performance)
  408. nClicks := 0
  409. maxD2 := float64(0)
  410. maxD2Pos := 0
  411. for k := 1; k < nFrames-1; k++ {
  412. a := float64(audio[(k-1)*stride])
  413. b := float64(audio[k*stride])
  414. c := float64(audio[(k+1)*stride])
  415. d2 := math.Abs(2*b - a - c)
  416. if d2 > maxD2 {
  417. maxD2 = d2
  418. maxD2Pos = k
  419. }
  420. if d2 > 0.15 {
  421. nClicks++
  422. }
  423. }
  424. if nClicks > 0 {
  425. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  426. }
  427. // Store last two samples for next frame's boundary check
  428. if nFrames >= 2 {
  429. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  430. sess.lastAudioL = audio[(nFrames-1)*stride]
  431. if stride > 1 {
  432. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  433. }
  434. } else if nFrames == 1 {
  435. sess.prevAudioL = float64(sess.lastAudioL)
  436. sess.lastAudioL = audio[0]
  437. if stride > 1 && len(audio) >= 2 {
  438. sess.lastAudioR = audio[1]
  439. }
  440. }
  441. sess.lastAudioSet = true
  442. }
  443. sess.lastAudioTs = time.Now()
  444. }
  445. st.fanoutPCM(sess, pcm, pcmLen)
  446. }
  447. // Segment split (recording sessions only)
  448. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  449. segIdx := sess.segmentIdx + 1
  450. oldSubs := sess.audioSubs
  451. oldState := sess.captureDSPState()
  452. sess.audioSubs = nil
  453. closeSession(sess, &st.policy)
  454. s, err := st.openRecordingSession(sig, now)
  455. if err != nil {
  456. delete(st.sessions, sig.ID)
  457. continue
  458. }
  459. s.segmentIdx = segIdx
  460. s.audioSubs = oldSubs
  461. s.restoreDSPState(oldState)
  462. st.sessions[sig.ID] = s
  463. }
  464. }
  465. // Close sessions for disappeared signals (with grace period)
  466. for id, sess := range st.sessions {
  467. if seen[id] {
  468. continue
  469. }
  470. gracePeriod := 3 * time.Second
  471. if sess.listenOnly {
  472. gracePeriod = 5 * time.Second
  473. }
  474. if now.Sub(sess.lastFeed) > gracePeriod {
  475. for _, sub := range sess.audioSubs {
  476. close(sub.ch)
  477. }
  478. sess.audioSubs = nil
  479. if !sess.listenOnly {
  480. closeSession(sess, &st.policy)
  481. }
  482. delete(st.sessions, id)
  483. }
  484. }
  485. }
  486. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  487. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  488. if st.policy.DebugLiveAudio {
  489. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  490. }
  491. return true
  492. }
  493. for subID, pl := range st.pendingListens {
  494. delta := math.Abs(sig.CenterHz - pl.freq)
  495. if delta < 200000 {
  496. if st.policy.DebugLiveAudio {
  497. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  498. }
  499. return true
  500. }
  501. }
  502. return false
  503. }
  504. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  505. for subID, pl := range st.pendingListens {
  506. requestedMode := normalizeRequestedMode(pl.mode)
  507. if requestedMode != "" && sess.demodName != requestedMode {
  508. continue
  509. }
  510. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  511. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  512. delete(st.pendingListens, subID)
  513. // Send updated audio_info now that we know the real session params.
  514. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  515. infoJSON, _ := json.Marshal(sess.audioInfo())
  516. tagged := make([]byte, 1+len(infoJSON))
  517. tagged[0] = 0x00 // tag: audio_info
  518. copy(tagged[1:], infoJSON)
  519. select {
  520. case pl.ch <- tagged:
  521. default:
  522. }
  523. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  524. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  525. }
  526. }
  527. }
  528. // CloseAll finalises all sessions and stops the worker goroutine.
  529. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  530. st.mu.Lock()
  531. defer st.mu.Unlock()
  532. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  533. for _, sess := range st.sessions {
  534. out[sess.signalID] = RuntimeSignalInfo{
  535. DemodName: sess.demodName,
  536. PlaybackMode: sess.playbackMode,
  537. StereoState: sess.stereoState,
  538. Channels: sess.channels,
  539. SampleRate: sess.sampleRate,
  540. }
  541. }
  542. return out
  543. }
  544. func (st *Streamer) CloseAll() {
  545. close(st.feedCh)
  546. <-st.done
  547. st.mu.Lock()
  548. defer st.mu.Unlock()
  549. for id, sess := range st.sessions {
  550. for _, sub := range sess.audioSubs {
  551. close(sub.ch)
  552. }
  553. sess.audioSubs = nil
  554. if !sess.listenOnly {
  555. closeSession(sess, &st.policy)
  556. }
  557. delete(st.sessions, id)
  558. }
  559. for _, pl := range st.pendingListens {
  560. close(pl.ch)
  561. }
  562. st.pendingListens = nil
  563. }
  564. // ActiveSessions returns the number of open streaming sessions.
  565. func (st *Streamer) ActiveSessions() int {
  566. st.mu.Lock()
  567. defer st.mu.Unlock()
  568. return len(st.sessions)
  569. }
  570. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  571. //
  572. // LL-2: Returns AudioInfo with correct channels and sample rate.
  573. // LL-3: Returns error only on hard failures (nil streamer etc).
  574. //
  575. // If a matching session exists, attaches immediately. Otherwise, the
  576. // subscriber is held as "pending" and will be attached when a matching
  577. // signal appears in the next DSP frame.
  578. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  579. ch := make(chan []byte, 64)
  580. st.mu.Lock()
  581. defer st.mu.Unlock()
  582. st.nextSub++
  583. subID := st.nextSub
  584. requestedMode := normalizeRequestedMode(mode)
  585. // Try to find a matching session
  586. var bestSess *streamSession
  587. bestDist := math.MaxFloat64
  588. for _, sess := range st.sessions {
  589. if requestedMode != "" && sess.demodName != requestedMode {
  590. continue
  591. }
  592. d := math.Abs(sess.centerHz - freq)
  593. if d < bestDist {
  594. bestDist = d
  595. bestSess = sess
  596. }
  597. }
  598. if bestSess != nil && bestDist < 200000 {
  599. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  600. info := bestSess.audioInfo()
  601. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  602. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  603. return subID, ch, info, nil
  604. }
  605. // No matching session yet — add as pending listener
  606. st.pendingListens[subID] = &pendingListen{
  607. freq: freq,
  608. bw: bw,
  609. mode: mode,
  610. ch: ch,
  611. }
  612. info := defaultAudioInfoForMode(mode)
  613. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  614. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  615. return subID, ch, info, nil
  616. }
  617. // UnsubscribeAudio removes a live-listen subscriber.
  618. func (st *Streamer) UnsubscribeAudio(subID int64) {
  619. st.mu.Lock()
  620. defer st.mu.Unlock()
  621. if pl, ok := st.pendingListens[subID]; ok {
  622. close(pl.ch)
  623. delete(st.pendingListens, subID)
  624. return
  625. }
  626. for _, sess := range st.sessions {
  627. for i, sub := range sess.audioSubs {
  628. if sub.id == subID {
  629. close(sub.ch)
  630. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  631. return
  632. }
  633. }
  634. }
  635. }
  636. // ---------------------------------------------------------------------------
  637. // Session: stateful extraction + demod
  638. // ---------------------------------------------------------------------------
  639. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  640. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  641. // output with zero transient artifacts.
  642. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  643. if len(snippet) == 0 || snipRate <= 0 {
  644. return nil, 0
  645. }
  646. isWFMStereo := sess.demodName == "WFM_STEREO"
  647. isWFM := sess.demodName == "WFM" || isWFMStereo
  648. demodName := sess.demodName
  649. if isWFMStereo {
  650. demodName = "WFM"
  651. }
  652. d := demod.Get(demodName)
  653. if d == nil {
  654. d = demod.Get("NFM")
  655. }
  656. if d == nil {
  657. return nil, 0
  658. }
  659. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  660. // The FM discriminator needs iq[i-1] to compute the first output.
  661. // All FIR filtering is now stateful, so no additional overlap is needed.
  662. var fullSnip []complex64
  663. trimSamples := 0
  664. _ = trimSamples
  665. if len(sess.overlapIQ) == 1 {
  666. fullSnip = make([]complex64, 1+len(snippet))
  667. fullSnip[0] = sess.overlapIQ[0]
  668. copy(fullSnip[1:], snippet)
  669. trimSamples = 1
  670. logging.Debug("discrim", "overlap_applied", "signal", sess.signalID, "snip", len(snippet))
  671. } else {
  672. fullSnip = snippet
  673. }
  674. // Save last sample for next frame's FM discriminator
  675. if len(snippet) > 0 {
  676. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  677. }
  678. // --- Stateful anti-alias FIR + decimation to demod rate ---
  679. demodRate := d.OutputSampleRate()
  680. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  681. if decim1 < 1 {
  682. decim1 = 1
  683. }
  684. // WFM override: force decim1=2 (256kHz) instead of round(512k/192k)=3 (170kHz).
  685. // At decim1=3, Nyquist is 85kHz which clips FM broadcast ±75kHz deviation.
  686. // At decim1=2, Nyquist is 128kHz → full FM deviation + stereo pilot + guard band.
  687. // Bonus: 256000→48000 resampler ratio is L=3/M=16 (96 taps, 1kB) instead of
  688. // the pathological L=24000/M=85333 (768k taps, 6MB) from 170666→48000.
  689. if isWFM && decim1 > 2 && snipRate/2 >= 200000 {
  690. decim1 = 2
  691. }
  692. actualDemodRate := snipRate / decim1
  693. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  694. var dec []complex64
  695. if decim1 > 1 {
  696. // FIR cutoff: for WFM, use 90kHz (above ±75kHz FM deviation + guard).
  697. // For NFM/other: use standard Nyquist*0.8 cutoff.
  698. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  699. if isWFM {
  700. cutoff = 90000
  701. }
  702. // Lazy-init or reinit stateful FIR if parameters changed
  703. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  704. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  705. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  706. sess.preDemodRate = snipRate
  707. sess.preDemodCutoff = cutoff
  708. sess.preDemodDecim = decim1
  709. sess.preDemodDecimPhase = 0
  710. }
  711. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  712. dec = dsp.DecimateStateful(filtered, decim1, &sess.preDemodDecimPhase)
  713. } else {
  714. dec = fullSnip
  715. }
  716. // --- FM Demod ---
  717. audio := d.Demod(dec, actualDemodRate)
  718. if len(audio) == 0 {
  719. return nil, 0
  720. }
  721. // --- Trim the 1-sample FM discriminator overlap ---
  722. // TEMP: skip audio trim to test if per-block trimming causes ticks
  723. // --- Stateful stereo decode with conservative lock/hysteresis ---
  724. channels := 1
  725. if isWFMStereo {
  726. sess.playbackMode = "WFM_STEREO"
  727. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  728. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  729. if locked {
  730. sess.stereoOnCount++
  731. sess.stereoOffCount = 0
  732. if sess.stereoOnCount >= 4 {
  733. sess.stereoEnabled = true
  734. }
  735. } else {
  736. sess.stereoOnCount = 0
  737. sess.stereoOffCount++
  738. if sess.stereoOffCount >= 10 {
  739. sess.stereoEnabled = false
  740. }
  741. }
  742. prevPlayback := sess.playbackMode
  743. prevStereo := sess.stereoState
  744. if sess.stereoEnabled && len(stereoAudio) > 0 {
  745. sess.stereoState = "locked"
  746. audio = stereoAudio
  747. } else {
  748. sess.stereoState = "mono-fallback"
  749. dual := make([]float32, len(audio)*2)
  750. for i, s := range audio {
  751. dual[i*2] = s
  752. dual[i*2+1] = s
  753. }
  754. audio = dual
  755. }
  756. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  757. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  758. }
  759. }
  760. // --- Polyphase resample to exact 48kHz ---
  761. if actualDemodRate != streamAudioRate {
  762. if channels > 1 {
  763. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  764. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  765. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  766. sess.stereoResamplerRate = actualDemodRate
  767. }
  768. audio = sess.stereoResampler.Process(audio)
  769. } else {
  770. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  771. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  772. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  773. sess.monoResamplerRate = actualDemodRate
  774. }
  775. audio = sess.monoResampler.Process(audio)
  776. }
  777. }
  778. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  779. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  780. tau := sess.deemphasisUs * 1e-6
  781. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  782. if channels > 1 {
  783. nFrames := len(audio) / channels
  784. yL, yR := sess.deemphL, sess.deemphR
  785. for i := 0; i < nFrames; i++ {
  786. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  787. audio[i*2] = float32(yL)
  788. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  789. audio[i*2+1] = float32(yR)
  790. }
  791. sess.deemphL, sess.deemphR = yL, yR
  792. } else {
  793. y := sess.deemphL
  794. for i := range audio {
  795. y = alpha*y + (1-alpha)*float64(audio[i])
  796. audio[i] = float32(y)
  797. }
  798. sess.deemphL = y
  799. }
  800. }
  801. if isWFM {
  802. for i := range audio {
  803. audio[i] *= 0.35
  804. }
  805. }
  806. return audio, streamAudioRate
  807. }
  808. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  809. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  810. // loopBW is in Hz, sampleRate in samples/sec.
  811. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  812. if sampleRate <= 0 || loopBW <= 0 {
  813. return 0, 0
  814. }
  815. bl := loopBW / float64(sampleRate)
  816. theta := bl / (damping + 0.25/damping)
  817. d := 1 + 2*damping*theta + theta*theta
  818. alpha := (4 * damping * theta) / d
  819. beta := (4 * theta * theta) / d
  820. return alpha, beta
  821. }
  822. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  823. // Uses persistent FIR filter state across frames for click-free stereo.
  824. // Reuses session scratch buffers to minimize allocations.
  825. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  826. if len(mono) == 0 || sampleRate <= 0 {
  827. return nil, false
  828. }
  829. n := len(mono)
  830. // Rebuild rate-dependent stereo filters when sampleRate changes
  831. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  832. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  833. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  834. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  835. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  836. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  837. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  838. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  839. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  840. sess.stereoFilterRate = sampleRate
  841. // Initialize PLL for 19kHz pilot tracking.
  842. sess.pilotPhase = 0
  843. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  844. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  845. sess.pilotErrAvg = 0
  846. sess.pilotI = 0
  847. sess.pilotQ = 0
  848. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  849. }
  850. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  851. scratch := sess.growAudio(n * 5)
  852. lpr := scratch[:n]
  853. bpfLR := scratch[n : 2*n]
  854. lr := scratch[2*n : 3*n]
  855. work1 := scratch[3*n : 4*n]
  856. work2 := scratch[4*n : 5*n]
  857. sess.stereoLPF.ProcessInto(mono, lpr)
  858. // 23-53kHz bandpass for L-R DSB-SC.
  859. sess.stereoBPHi.ProcessInto(mono, work1)
  860. sess.stereoBPLo.ProcessInto(mono, work2)
  861. for i := 0; i < n; i++ {
  862. bpfLR[i] = work1[i] - work2[i]
  863. }
  864. // 19kHz pilot bandpass for PLL.
  865. sess.pilotLPFHi.ProcessInto(mono, work1)
  866. sess.pilotLPFLo.ProcessInto(mono, work2)
  867. for i := 0; i < n; i++ {
  868. work1[i] = work1[i] - work2[i]
  869. }
  870. pilot := work1
  871. phase := sess.pilotPhase
  872. freq := sess.pilotFreq
  873. alpha := sess.pilotAlpha
  874. beta := sess.pilotBeta
  875. iState := sess.pilotI
  876. qState := sess.pilotQ
  877. lpAlpha := sess.pilotLPAlpha
  878. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  879. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  880. var pilotPower float64
  881. var totalPower float64
  882. var errSum float64
  883. for i := 0; i < n; i++ {
  884. p := float64(pilot[i])
  885. sinP, cosP := math.Sincos(phase)
  886. iMix := p * cosP
  887. qMix := p * -sinP
  888. iState += lpAlpha * (iMix - iState)
  889. qState += lpAlpha * (qMix - qState)
  890. err := math.Atan2(qState, iState)
  891. freq += beta * err
  892. if freq < minFreq {
  893. freq = minFreq
  894. } else if freq > maxFreq {
  895. freq = maxFreq
  896. }
  897. phase += freq + alpha*err
  898. if phase > 2*math.Pi {
  899. phase -= 2 * math.Pi
  900. } else if phase < 0 {
  901. phase += 2 * math.Pi
  902. }
  903. totalPower += float64(mono[i]) * float64(mono[i])
  904. pilotPower += p * p
  905. errSum += math.Abs(err)
  906. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  907. }
  908. sess.pilotPhase = phase
  909. sess.pilotFreq = freq
  910. sess.pilotI = iState
  911. sess.pilotQ = qState
  912. blockErr := errSum / float64(n)
  913. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  914. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  915. pilotRatio := 0.0
  916. if totalPower > 0 {
  917. pilotRatio = pilotPower / totalPower
  918. }
  919. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  920. // Lock heuristics: pilot power fraction and PLL phase error stability.
  921. // Pilot power is a small but stable fraction of composite energy; require
  922. // a modest floor plus PLL settling to avoid flapping in noise.
  923. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  924. out := make([]float32, n*2)
  925. for i := 0; i < n; i++ {
  926. out[i*2] = 0.5 * (lpr[i] + lr[i])
  927. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  928. }
  929. return out, locked
  930. }
  931. // dspStateSnapshot captures persistent DSP state for segment splits.
  932. type dspStateSnapshot struct {
  933. overlapIQ []complex64
  934. deemphL float64
  935. deemphR float64
  936. pilotPhase float64
  937. pilotFreq float64
  938. pilotAlpha float64
  939. pilotBeta float64
  940. pilotErrAvg float64
  941. pilotI float64
  942. pilotQ float64
  943. pilotLPAlpha float64
  944. monoResampler *dsp.Resampler
  945. monoResamplerRate int
  946. stereoResampler *dsp.StereoResampler
  947. stereoResamplerRate int
  948. stereoLPF *dsp.StatefulFIRReal
  949. stereoFilterRate int
  950. stereoBPHi *dsp.StatefulFIRReal
  951. stereoBPLo *dsp.StatefulFIRReal
  952. stereoLRLPF *dsp.StatefulFIRReal
  953. stereoAALPF *dsp.StatefulFIRReal
  954. pilotLPFHi *dsp.StatefulFIRReal
  955. pilotLPFLo *dsp.StatefulFIRReal
  956. preDemodFIR *dsp.StatefulFIRComplex
  957. preDemodDecim int
  958. preDemodRate int
  959. preDemodCutoff float64
  960. preDemodDecimPhase int
  961. }
  962. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  963. return dspStateSnapshot{
  964. overlapIQ: sess.overlapIQ,
  965. deemphL: sess.deemphL,
  966. deemphR: sess.deemphR,
  967. pilotPhase: sess.pilotPhase,
  968. pilotFreq: sess.pilotFreq,
  969. pilotAlpha: sess.pilotAlpha,
  970. pilotBeta: sess.pilotBeta,
  971. pilotErrAvg: sess.pilotErrAvg,
  972. pilotI: sess.pilotI,
  973. pilotQ: sess.pilotQ,
  974. pilotLPAlpha: sess.pilotLPAlpha,
  975. monoResampler: sess.monoResampler,
  976. monoResamplerRate: sess.monoResamplerRate,
  977. stereoResampler: sess.stereoResampler,
  978. stereoResamplerRate: sess.stereoResamplerRate,
  979. stereoLPF: sess.stereoLPF,
  980. stereoFilterRate: sess.stereoFilterRate,
  981. stereoBPHi: sess.stereoBPHi,
  982. stereoBPLo: sess.stereoBPLo,
  983. stereoLRLPF: sess.stereoLRLPF,
  984. stereoAALPF: sess.stereoAALPF,
  985. pilotLPFHi: sess.pilotLPFHi,
  986. pilotLPFLo: sess.pilotLPFLo,
  987. preDemodFIR: sess.preDemodFIR,
  988. preDemodDecim: sess.preDemodDecim,
  989. preDemodRate: sess.preDemodRate,
  990. preDemodCutoff: sess.preDemodCutoff,
  991. preDemodDecimPhase: sess.preDemodDecimPhase,
  992. }
  993. }
  994. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  995. sess.overlapIQ = s.overlapIQ
  996. sess.deemphL = s.deemphL
  997. sess.deemphR = s.deemphR
  998. sess.pilotPhase = s.pilotPhase
  999. sess.pilotFreq = s.pilotFreq
  1000. sess.pilotAlpha = s.pilotAlpha
  1001. sess.pilotBeta = s.pilotBeta
  1002. sess.pilotErrAvg = s.pilotErrAvg
  1003. sess.pilotI = s.pilotI
  1004. sess.pilotQ = s.pilotQ
  1005. sess.pilotLPAlpha = s.pilotLPAlpha
  1006. sess.monoResampler = s.monoResampler
  1007. sess.monoResamplerRate = s.monoResamplerRate
  1008. sess.stereoResampler = s.stereoResampler
  1009. sess.stereoResamplerRate = s.stereoResamplerRate
  1010. sess.stereoLPF = s.stereoLPF
  1011. sess.stereoFilterRate = s.stereoFilterRate
  1012. sess.stereoBPHi = s.stereoBPHi
  1013. sess.stereoBPLo = s.stereoBPLo
  1014. sess.stereoLRLPF = s.stereoLRLPF
  1015. sess.stereoAALPF = s.stereoAALPF
  1016. sess.pilotLPFHi = s.pilotLPFHi
  1017. sess.pilotLPFLo = s.pilotLPFLo
  1018. sess.preDemodFIR = s.preDemodFIR
  1019. sess.preDemodDecim = s.preDemodDecim
  1020. sess.preDemodRate = s.preDemodRate
  1021. sess.preDemodCutoff = s.preDemodCutoff
  1022. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1023. }
  1024. // ---------------------------------------------------------------------------
  1025. // Session management helpers
  1026. // ---------------------------------------------------------------------------
  1027. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1028. outputDir := st.policy.OutputDir
  1029. if outputDir == "" {
  1030. outputDir = "data/recordings"
  1031. }
  1032. demodName, channels := resolveDemod(sig)
  1033. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1034. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1035. dir := filepath.Join(outputDir, dirName)
  1036. if err := os.MkdirAll(dir, 0o755); err != nil {
  1037. return nil, err
  1038. }
  1039. wavPath := filepath.Join(dir, "audio.wav")
  1040. f, err := os.Create(wavPath)
  1041. if err != nil {
  1042. return nil, err
  1043. }
  1044. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1045. f.Close()
  1046. return nil, err
  1047. }
  1048. playbackMode, stereoState := initialPlaybackState(demodName)
  1049. sess := &streamSession{
  1050. signalID: sig.ID,
  1051. centerHz: sig.CenterHz,
  1052. bwHz: sig.BWHz,
  1053. snrDb: sig.SNRDb,
  1054. peakDb: sig.PeakDb,
  1055. class: sig.Class,
  1056. startTime: now,
  1057. lastFeed: now,
  1058. dir: dir,
  1059. wavFile: f,
  1060. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1061. sampleRate: streamAudioRate,
  1062. channels: channels,
  1063. demodName: demodName,
  1064. playbackMode: playbackMode,
  1065. stereoState: stereoState,
  1066. deemphasisUs: st.policy.DeemphasisUs,
  1067. }
  1068. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1069. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1070. return sess, nil
  1071. }
  1072. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1073. demodName, channels := resolveDemod(sig)
  1074. for _, pl := range st.pendingListens {
  1075. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1076. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1077. demodName = requested
  1078. if demodName == "WFM_STEREO" {
  1079. channels = 2
  1080. } else if d := demod.Get(demodName); d != nil {
  1081. channels = d.Channels()
  1082. } else {
  1083. channels = 1
  1084. }
  1085. break
  1086. }
  1087. }
  1088. }
  1089. playbackMode, stereoState := initialPlaybackState(demodName)
  1090. sess := &streamSession{
  1091. signalID: sig.ID,
  1092. centerHz: sig.CenterHz,
  1093. bwHz: sig.BWHz,
  1094. snrDb: sig.SNRDb,
  1095. peakDb: sig.PeakDb,
  1096. class: sig.Class,
  1097. startTime: now,
  1098. lastFeed: now,
  1099. listenOnly: true,
  1100. sampleRate: streamAudioRate,
  1101. channels: channels,
  1102. demodName: demodName,
  1103. playbackMode: playbackMode,
  1104. stereoState: stereoState,
  1105. deemphasisUs: st.policy.DeemphasisUs,
  1106. }
  1107. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1108. sig.ID, sig.CenterHz/1e6, demodName)
  1109. return sess
  1110. }
  1111. func resolveDemod(sig *detector.Signal) (string, int) {
  1112. demodName := "NFM"
  1113. if sig.Class != nil {
  1114. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1115. demodName = n
  1116. }
  1117. }
  1118. channels := 1
  1119. if demodName == "WFM_STEREO" {
  1120. channels = 2
  1121. } else if d := demod.Get(demodName); d != nil {
  1122. channels = d.Channels()
  1123. }
  1124. return demodName, channels
  1125. }
  1126. func initialPlaybackState(demodName string) (string, string) {
  1127. playbackMode := demodName
  1128. stereoState := "mono"
  1129. if demodName == "WFM_STEREO" {
  1130. stereoState = "searching"
  1131. }
  1132. return playbackMode, stereoState
  1133. }
  1134. func (sess *streamSession) audioInfo() AudioInfo {
  1135. return AudioInfo{
  1136. SampleRate: sess.sampleRate,
  1137. Channels: sess.channels,
  1138. Format: "s16le",
  1139. DemodName: sess.demodName,
  1140. PlaybackMode: sess.playbackMode,
  1141. StereoState: sess.stereoState,
  1142. }
  1143. }
  1144. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1145. infoJSON, _ := json.Marshal(info)
  1146. tagged := make([]byte, 1+len(infoJSON))
  1147. tagged[0] = 0x00 // tag: audio_info
  1148. copy(tagged[1:], infoJSON)
  1149. for _, sub := range subs {
  1150. select {
  1151. case sub.ch <- tagged:
  1152. default:
  1153. }
  1154. }
  1155. }
  1156. func defaultAudioInfoForMode(mode string) AudioInfo {
  1157. demodName := "NFM"
  1158. if requested := normalizeRequestedMode(mode); requested != "" {
  1159. demodName = requested
  1160. }
  1161. channels := 1
  1162. if demodName == "WFM_STEREO" {
  1163. channels = 2
  1164. } else if d := demod.Get(demodName); d != nil {
  1165. channels = d.Channels()
  1166. }
  1167. playbackMode, stereoState := initialPlaybackState(demodName)
  1168. return AudioInfo{
  1169. SampleRate: streamAudioRate,
  1170. Channels: channels,
  1171. Format: "s16le",
  1172. DemodName: demodName,
  1173. PlaybackMode: playbackMode,
  1174. StereoState: stereoState,
  1175. }
  1176. }
  1177. func normalizeRequestedMode(mode string) string {
  1178. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1179. case "", "AUTO":
  1180. return ""
  1181. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1182. return strings.ToUpper(strings.TrimSpace(mode))
  1183. default:
  1184. return ""
  1185. }
  1186. }
  1187. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1188. func (sess *streamSession) growIQ(n int) []complex64 {
  1189. if cap(sess.scratchIQ) >= n {
  1190. return sess.scratchIQ[:n]
  1191. }
  1192. sess.scratchIQ = make([]complex64, n, n*5/4)
  1193. return sess.scratchIQ
  1194. }
  1195. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1196. func (sess *streamSession) growAudio(n int) []float32 {
  1197. if cap(sess.scratchAudio) >= n {
  1198. return sess.scratchAudio[:n]
  1199. }
  1200. sess.scratchAudio = make([]float32, n, n*5/4)
  1201. return sess.scratchAudio
  1202. }
  1203. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1204. func (sess *streamSession) growPCM(n int) []byte {
  1205. if cap(sess.scratchPCM) >= n {
  1206. return sess.scratchPCM[:n]
  1207. }
  1208. sess.scratchPCM = make([]byte, n, n*5/4)
  1209. return sess.scratchPCM
  1210. }
  1211. func convertToListenOnly(sess *streamSession) {
  1212. if sess.wavBuf != nil {
  1213. _ = sess.wavBuf.Flush()
  1214. }
  1215. if sess.wavFile != nil {
  1216. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1217. sess.wavFile.Close()
  1218. }
  1219. sess.wavFile = nil
  1220. sess.wavBuf = nil
  1221. sess.listenOnly = true
  1222. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1223. }
  1224. func closeSession(sess *streamSession, policy *Policy) {
  1225. if sess.listenOnly {
  1226. return
  1227. }
  1228. if sess.wavBuf != nil {
  1229. _ = sess.wavBuf.Flush()
  1230. }
  1231. if sess.wavFile != nil {
  1232. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1233. sess.wavFile.Close()
  1234. sess.wavFile = nil
  1235. sess.wavBuf = nil
  1236. }
  1237. dur := sess.lastFeed.Sub(sess.startTime)
  1238. files := map[string]any{
  1239. "audio": "audio.wav",
  1240. "audio_sample_rate": sess.sampleRate,
  1241. "audio_channels": sess.channels,
  1242. "audio_demod": sess.demodName,
  1243. "recording_mode": "streaming",
  1244. }
  1245. meta := Meta{
  1246. EventID: sess.signalID,
  1247. Start: sess.startTime,
  1248. End: sess.lastFeed,
  1249. CenterHz: sess.centerHz,
  1250. BandwidthHz: sess.bwHz,
  1251. SampleRate: sess.sampleRate,
  1252. SNRDb: sess.snrDb,
  1253. PeakDb: sess.peakDb,
  1254. Class: sess.class,
  1255. DurationMs: dur.Milliseconds(),
  1256. Files: files,
  1257. }
  1258. b, err := json.MarshalIndent(meta, "", " ")
  1259. if err == nil {
  1260. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1261. }
  1262. if policy != nil {
  1263. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1264. }
  1265. }
  1266. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1267. if len(sess.audioSubs) == 0 {
  1268. return
  1269. }
  1270. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1271. tagged := make([]byte, 1+pcmLen)
  1272. tagged[0] = 0x01
  1273. copy(tagged[1:], pcm[:pcmLen])
  1274. alive := sess.audioSubs[:0]
  1275. for _, sub := range sess.audioSubs {
  1276. select {
  1277. case sub.ch <- tagged:
  1278. default:
  1279. st.droppedPCM++
  1280. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1281. }
  1282. alive = append(alive, sub)
  1283. }
  1284. sess.audioSubs = alive
  1285. }
  1286. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1287. if len(st.policy.ClassFilter) == 0 {
  1288. return true
  1289. }
  1290. if cls == nil {
  1291. return false
  1292. }
  1293. for _, f := range st.policy.ClassFilter {
  1294. if strings.EqualFold(f, string(cls.ModType)) {
  1295. return true
  1296. }
  1297. }
  1298. return false
  1299. }
  1300. // ErrNoSession is returned when no matching signal session exists.
  1301. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1302. // ---------------------------------------------------------------------------
  1303. // WAV header helpers
  1304. // ---------------------------------------------------------------------------
  1305. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1306. if channels <= 0 {
  1307. channels = 1
  1308. }
  1309. hdr := make([]byte, 44)
  1310. copy(hdr[0:4], "RIFF")
  1311. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1312. copy(hdr[8:12], "WAVE")
  1313. copy(hdr[12:16], "fmt ")
  1314. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1315. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1316. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1317. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1318. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1319. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1320. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1321. copy(hdr[36:40], "data")
  1322. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1323. _, err := f.Write(hdr)
  1324. return err
  1325. }
  1326. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1327. dataSize := uint32(totalSamples * 2)
  1328. var buf [4]byte
  1329. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1330. if _, err := f.Seek(4, 0); err != nil {
  1331. return
  1332. }
  1333. _, _ = f.Write(buf[:])
  1334. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1335. if _, err := f.Seek(24, 0); err != nil {
  1336. return
  1337. }
  1338. _, _ = f.Write(buf[:])
  1339. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1340. if _, err := f.Seek(28, 0); err != nil {
  1341. return
  1342. }
  1343. _, _ = f.Write(buf[:])
  1344. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1345. if _, err := f.Seek(40, 0); err != nil {
  1346. return
  1347. }
  1348. _, _ = f.Write(buf[:])
  1349. }