Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1487 lines
43KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. lastAudioTs time.Time
  36. lastAudioL float32
  37. lastAudioR float32
  38. prevAudioL float64 // second-to-last L sample for boundary transient detection
  39. lastAudioSet bool
  40. // listenOnly sessions have no WAV file and no disk I/O.
  41. // They exist solely to feed audio to live-listen subscribers.
  42. listenOnly bool
  43. // Recording state (nil/zero for listen-only sessions)
  44. dir string
  45. wavFile *os.File
  46. wavBuf *bufio.Writer
  47. wavSamples int64
  48. segmentIdx int
  49. sampleRate int // actual output audio sample rate (always streamAudioRate)
  50. channels int
  51. demodName string
  52. // --- Persistent DSP state for click-free streaming ---
  53. // Overlap-save: tail of previous extracted IQ snippet.
  54. overlapIQ []complex64
  55. // De-emphasis IIR state (persists across frames)
  56. deemphL float64
  57. deemphR float64
  58. // Stereo lock state for live WFM streaming
  59. stereoEnabled bool
  60. stereoOnCount int
  61. stereoOffCount int
  62. // Pilot-locked stereo PLL state (19kHz pilot)
  63. pilotPhase float64
  64. pilotFreq float64
  65. pilotAlpha float64
  66. pilotBeta float64
  67. pilotErrAvg float64
  68. pilotI float64
  69. pilotQ float64
  70. pilotLPAlpha float64
  71. // Polyphase resampler (replaces integer-decimate hack)
  72. monoResampler *dsp.Resampler
  73. monoResamplerRate int
  74. stereoResampler *dsp.StereoResampler
  75. stereoResamplerRate int
  76. // AQ-4: Stateful FIR filters for click-free stereo decode
  77. stereoFilterRate int
  78. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  79. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  80. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  81. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  82. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  83. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  84. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  85. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  86. // and avoids per-frame FIR recomputation)
  87. preDemodFIR *dsp.StatefulFIRComplex
  88. preDemodDecim int // cached decimation factor
  89. preDemodRate int // cached snipRate this FIR was built for
  90. preDemodCutoff float64 // cached cutoff
  91. preDemodDecimPhase int // stateful decimation phase (index offset into next frame)
  92. // AQ-2: De-emphasis config (µs, 0 = disabled)
  93. deemphasisUs float64
  94. // Scratch buffers — reused across frames to avoid GC pressure.
  95. // Grown as needed, never shrunk.
  96. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  97. scratchAudio []float32 // for stereo decode intermediates
  98. scratchPCM []byte // for PCM encoding
  99. // live-listen subscribers
  100. audioSubs []audioSub
  101. }
  102. type audioSub struct {
  103. id int64
  104. ch chan []byte
  105. }
  106. type RuntimeSignalInfo struct {
  107. DemodName string
  108. PlaybackMode string
  109. StereoState string
  110. Channels int
  111. SampleRate int
  112. }
  113. // AudioInfo describes the audio format of a live-listen subscription.
  114. // Sent to the WebSocket client as the first message.
  115. type AudioInfo struct {
  116. SampleRate int `json:"sample_rate"`
  117. Channels int `json:"channels"`
  118. Format string `json:"format"` // always "s16le"
  119. DemodName string `json:"demod"`
  120. PlaybackMode string `json:"playback_mode,omitempty"`
  121. StereoState string `json:"stereo_state,omitempty"`
  122. }
  123. const (
  124. streamAudioRate = 48000
  125. resamplerTaps = 32 // taps per polyphase arm — good quality
  126. )
  127. // ---------------------------------------------------------------------------
  128. // Streamer — manages all active streaming sessions
  129. // ---------------------------------------------------------------------------
  130. type streamFeedItem struct {
  131. signal detector.Signal
  132. snippet []complex64
  133. snipRate int
  134. }
  135. type streamFeedMsg struct {
  136. traceID uint64
  137. items []streamFeedItem
  138. }
  139. type Streamer struct {
  140. mu sync.Mutex
  141. sessions map[int64]*streamSession
  142. policy Policy
  143. centerHz float64
  144. nextSub int64
  145. feedCh chan streamFeedMsg
  146. done chan struct{}
  147. droppedFeed uint64
  148. droppedPCM uint64
  149. lastFeedTS time.Time
  150. lastProcTS time.Time
  151. // pendingListens are subscribers waiting for a matching session.
  152. pendingListens map[int64]*pendingListen
  153. }
  154. type pendingListen struct {
  155. freq float64
  156. bw float64
  157. mode string
  158. ch chan []byte
  159. }
  160. func newStreamer(policy Policy, centerHz float64) *Streamer {
  161. st := &Streamer{
  162. sessions: make(map[int64]*streamSession),
  163. policy: policy,
  164. centerHz: centerHz,
  165. feedCh: make(chan streamFeedMsg, 2),
  166. done: make(chan struct{}),
  167. pendingListens: make(map[int64]*pendingListen),
  168. }
  169. go st.worker()
  170. return st
  171. }
  172. func (st *Streamer) worker() {
  173. for msg := range st.feedCh {
  174. st.processFeed(msg)
  175. }
  176. close(st.done)
  177. }
  178. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  179. st.mu.Lock()
  180. defer st.mu.Unlock()
  181. wasEnabled := st.policy.Enabled
  182. st.policy = policy
  183. st.centerHz = centerHz
  184. // If recording was just disabled, close recording sessions
  185. // but keep listen-only sessions alive.
  186. if wasEnabled && !policy.Enabled {
  187. for id, sess := range st.sessions {
  188. if sess.listenOnly {
  189. continue
  190. }
  191. if len(sess.audioSubs) > 0 {
  192. // Convert to listen-only: close WAV but keep session
  193. convertToListenOnly(sess)
  194. } else {
  195. closeSession(sess, &st.policy)
  196. delete(st.sessions, id)
  197. }
  198. }
  199. }
  200. }
  201. // HasListeners returns true if any sessions have audio subscribers
  202. // or there are pending listen requests. Used by the DSP loop to
  203. // decide whether to feed snippets even when recording is disabled.
  204. func (st *Streamer) HasListeners() bool {
  205. st.mu.Lock()
  206. defer st.mu.Unlock()
  207. return st.hasListenersLocked()
  208. }
  209. func (st *Streamer) hasListenersLocked() bool {
  210. if len(st.pendingListens) > 0 {
  211. return true
  212. }
  213. for _, sess := range st.sessions {
  214. if len(sess.audioSubs) > 0 {
  215. return true
  216. }
  217. }
  218. return false
  219. }
  220. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  221. // Feeds are accepted if:
  222. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  223. // - Any live-listen subscribers exist (listen-only mode)
  224. //
  225. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  226. // data, so items can be passed directly without another copy.
  227. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  228. st.mu.Lock()
  229. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  230. hasListeners := st.hasListenersLocked()
  231. pending := len(st.pendingListens)
  232. debugLiveAudio := st.policy.DebugLiveAudio
  233. now := time.Now()
  234. if !st.lastFeedTS.IsZero() {
  235. gap := now.Sub(st.lastFeedTS)
  236. if gap > 150*time.Millisecond {
  237. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  238. }
  239. }
  240. st.lastFeedTS = now
  241. st.mu.Unlock()
  242. if debugLiveAudio {
  243. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  244. }
  245. if (!recEnabled && !hasListeners) || len(items) == 0 {
  246. return
  247. }
  248. select {
  249. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}:
  250. default:
  251. st.droppedFeed++
  252. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  253. }
  254. }
  255. // processFeed runs in the worker goroutine.
  256. func (st *Streamer) processFeed(msg streamFeedMsg) {
  257. st.mu.Lock()
  258. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  259. hasListeners := st.hasListenersLocked()
  260. now := time.Now()
  261. if !st.lastProcTS.IsZero() {
  262. gap := now.Sub(st.lastProcTS)
  263. if gap > 150*time.Millisecond {
  264. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  265. }
  266. }
  267. st.lastProcTS = now
  268. defer st.mu.Unlock()
  269. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  270. if !recEnabled && !hasListeners {
  271. return
  272. }
  273. seen := make(map[int64]bool, len(msg.items))
  274. for i := range msg.items {
  275. item := &msg.items[i]
  276. sig := &item.signal
  277. seen[sig.ID] = true
  278. if sig.ID == 0 || sig.Class == nil {
  279. continue
  280. }
  281. if len(item.snippet) == 0 || item.snipRate <= 0 {
  282. continue
  283. }
  284. // Decide whether this signal needs a session
  285. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  286. needsListen := st.signalHasListenerLocked(sig)
  287. className := "<nil>"
  288. demodName := ""
  289. if sig.Class != nil {
  290. className = string(sig.Class.ModType)
  291. demodName, _ = resolveDemod(sig)
  292. }
  293. if st.policy.DebugLiveAudio {
  294. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  295. }
  296. if !needsRecording && !needsListen {
  297. continue
  298. }
  299. sess, exists := st.sessions[sig.ID]
  300. requestedMode := ""
  301. for _, pl := range st.pendingListens {
  302. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  303. if m := normalizeRequestedMode(pl.mode); m != "" {
  304. requestedMode = m
  305. break
  306. }
  307. }
  308. }
  309. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  310. for _, sub := range sess.audioSubs {
  311. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  312. }
  313. delete(st.sessions, sig.ID)
  314. sess = nil
  315. exists = false
  316. }
  317. if !exists {
  318. if needsRecording {
  319. s, err := st.openRecordingSession(sig, now)
  320. if err != nil {
  321. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  322. sig.ID, sig.CenterHz/1e6, err)
  323. continue
  324. }
  325. st.sessions[sig.ID] = s
  326. sess = s
  327. } else {
  328. s := st.openListenSession(sig, now)
  329. st.sessions[sig.ID] = s
  330. sess = s
  331. }
  332. // Attach any pending listeners
  333. st.attachPendingListeners(sess)
  334. }
  335. // Update metadata
  336. sess.lastFeed = now
  337. sess.centerHz = sig.CenterHz
  338. sess.bwHz = sig.BWHz
  339. if sig.SNRDb > sess.snrDb {
  340. sess.snrDb = sig.SNRDb
  341. }
  342. if sig.PeakDb > sess.peakDb {
  343. sess.peakDb = sig.PeakDb
  344. }
  345. if sig.Class != nil {
  346. sess.class = sig.Class
  347. }
  348. // Demod with persistent state
  349. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  350. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  351. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  352. if len(audio) == 0 {
  353. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  354. }
  355. if len(audio) > 0 {
  356. if sess.wavSamples == 0 && audioRate > 0 {
  357. sess.sampleRate = audioRate
  358. }
  359. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  360. pcmLen := len(audio) * 2
  361. pcm := sess.growPCM(pcmLen)
  362. for k, s := range audio {
  363. v := int16(clip(s * 32767))
  364. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  365. }
  366. if !sess.listenOnly && sess.wavBuf != nil {
  367. n, err := sess.wavBuf.Write(pcm)
  368. if err != nil {
  369. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  370. } else {
  371. sess.wavSamples += int64(n / 2)
  372. }
  373. }
  374. // Gap logging for live-audio sessions + transient click detector
  375. if len(sess.audioSubs) > 0 {
  376. if !sess.lastAudioTs.IsZero() {
  377. gap := time.Since(sess.lastAudioTs)
  378. if gap > 150*time.Millisecond {
  379. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  380. }
  381. }
  382. // Transient click detector: finds short impulses (1-3 samples)
  383. // that deviate sharply from the local signal trend.
  384. // A click looks like: ...smooth... SPIKE ...smooth...
  385. // Normal FM audio has large deltas too, but they follow
  386. // a continuous curve. A click has high |d2/dt2| (acceleration).
  387. //
  388. // Method: second-derivative detector. For each sample triplet
  389. // (a, b, c), compute |2b - a - c| which is the discrete
  390. // second derivative magnitude. High values = transient spike.
  391. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  392. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  393. stride := sess.channels
  394. if stride < 1 {
  395. stride = 1
  396. }
  397. nFrames := len(audio) / stride
  398. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  399. if sess.lastAudioSet && nFrames >= 1 {
  400. // second derivative across boundary: |2*last - prevLast - first|
  401. first := float64(audio[0])
  402. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  403. if d2 > 0.15 {
  404. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  405. }
  406. }
  407. // Intra-frame transient scan (L channel only for performance)
  408. nClicks := 0
  409. maxD2 := float64(0)
  410. maxD2Pos := 0
  411. for k := 1; k < nFrames-1; k++ {
  412. a := float64(audio[(k-1)*stride])
  413. b := float64(audio[k*stride])
  414. c := float64(audio[(k+1)*stride])
  415. d2 := math.Abs(2*b - a - c)
  416. if d2 > maxD2 {
  417. maxD2 = d2
  418. maxD2Pos = k
  419. }
  420. if d2 > 0.15 {
  421. nClicks++
  422. }
  423. }
  424. if nClicks > 0 {
  425. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  426. }
  427. // Store last two samples for next frame's boundary check
  428. if nFrames >= 2 {
  429. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  430. sess.lastAudioL = audio[(nFrames-1)*stride]
  431. if stride > 1 {
  432. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  433. }
  434. } else if nFrames == 1 {
  435. sess.prevAudioL = float64(sess.lastAudioL)
  436. sess.lastAudioL = audio[0]
  437. if stride > 1 && len(audio) >= 2 {
  438. sess.lastAudioR = audio[1]
  439. }
  440. }
  441. sess.lastAudioSet = true
  442. }
  443. sess.lastAudioTs = time.Now()
  444. }
  445. st.fanoutPCM(sess, pcm, pcmLen)
  446. }
  447. // Segment split (recording sessions only)
  448. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  449. segIdx := sess.segmentIdx + 1
  450. oldSubs := sess.audioSubs
  451. oldState := sess.captureDSPState()
  452. sess.audioSubs = nil
  453. closeSession(sess, &st.policy)
  454. s, err := st.openRecordingSession(sig, now)
  455. if err != nil {
  456. delete(st.sessions, sig.ID)
  457. continue
  458. }
  459. s.segmentIdx = segIdx
  460. s.audioSubs = oldSubs
  461. s.restoreDSPState(oldState)
  462. st.sessions[sig.ID] = s
  463. }
  464. }
  465. // Close sessions for disappeared signals (with grace period)
  466. for id, sess := range st.sessions {
  467. if seen[id] {
  468. continue
  469. }
  470. gracePeriod := 3 * time.Second
  471. if sess.listenOnly {
  472. gracePeriod = 5 * time.Second
  473. }
  474. if now.Sub(sess.lastFeed) > gracePeriod {
  475. for _, sub := range sess.audioSubs {
  476. close(sub.ch)
  477. }
  478. sess.audioSubs = nil
  479. if !sess.listenOnly {
  480. closeSession(sess, &st.policy)
  481. }
  482. delete(st.sessions, id)
  483. }
  484. }
  485. }
  486. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  487. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  488. if st.policy.DebugLiveAudio {
  489. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  490. }
  491. return true
  492. }
  493. for subID, pl := range st.pendingListens {
  494. delta := math.Abs(sig.CenterHz - pl.freq)
  495. if delta < 200000 {
  496. if st.policy.DebugLiveAudio {
  497. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  498. }
  499. return true
  500. }
  501. }
  502. return false
  503. }
  504. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  505. for subID, pl := range st.pendingListens {
  506. requestedMode := normalizeRequestedMode(pl.mode)
  507. if requestedMode != "" && sess.demodName != requestedMode {
  508. continue
  509. }
  510. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  511. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  512. delete(st.pendingListens, subID)
  513. // Send updated audio_info now that we know the real session params.
  514. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  515. infoJSON, _ := json.Marshal(sess.audioInfo())
  516. tagged := make([]byte, 1+len(infoJSON))
  517. tagged[0] = 0x00 // tag: audio_info
  518. copy(tagged[1:], infoJSON)
  519. select {
  520. case pl.ch <- tagged:
  521. default:
  522. }
  523. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  524. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  525. }
  526. }
  527. }
  528. // CloseAll finalises all sessions and stops the worker goroutine.
  529. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  530. st.mu.Lock()
  531. defer st.mu.Unlock()
  532. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  533. for _, sess := range st.sessions {
  534. out[sess.signalID] = RuntimeSignalInfo{
  535. DemodName: sess.demodName,
  536. PlaybackMode: sess.playbackMode,
  537. StereoState: sess.stereoState,
  538. Channels: sess.channels,
  539. SampleRate: sess.sampleRate,
  540. }
  541. }
  542. return out
  543. }
  544. func (st *Streamer) CloseAll() {
  545. close(st.feedCh)
  546. <-st.done
  547. st.mu.Lock()
  548. defer st.mu.Unlock()
  549. for id, sess := range st.sessions {
  550. for _, sub := range sess.audioSubs {
  551. close(sub.ch)
  552. }
  553. sess.audioSubs = nil
  554. if !sess.listenOnly {
  555. closeSession(sess, &st.policy)
  556. }
  557. delete(st.sessions, id)
  558. }
  559. for _, pl := range st.pendingListens {
  560. close(pl.ch)
  561. }
  562. st.pendingListens = nil
  563. }
  564. // ActiveSessions returns the number of open streaming sessions.
  565. func (st *Streamer) ActiveSessions() int {
  566. st.mu.Lock()
  567. defer st.mu.Unlock()
  568. return len(st.sessions)
  569. }
  570. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  571. //
  572. // LL-2: Returns AudioInfo with correct channels and sample rate.
  573. // LL-3: Returns error only on hard failures (nil streamer etc).
  574. //
  575. // If a matching session exists, attaches immediately. Otherwise, the
  576. // subscriber is held as "pending" and will be attached when a matching
  577. // signal appears in the next DSP frame.
  578. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  579. ch := make(chan []byte, 64)
  580. st.mu.Lock()
  581. defer st.mu.Unlock()
  582. st.nextSub++
  583. subID := st.nextSub
  584. requestedMode := normalizeRequestedMode(mode)
  585. // Try to find a matching session
  586. var bestSess *streamSession
  587. bestDist := math.MaxFloat64
  588. for _, sess := range st.sessions {
  589. if requestedMode != "" && sess.demodName != requestedMode {
  590. continue
  591. }
  592. d := math.Abs(sess.centerHz - freq)
  593. if d < bestDist {
  594. bestDist = d
  595. bestSess = sess
  596. }
  597. }
  598. if bestSess != nil && bestDist < 200000 {
  599. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  600. info := bestSess.audioInfo()
  601. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  602. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  603. return subID, ch, info, nil
  604. }
  605. // No matching session yet — add as pending listener
  606. st.pendingListens[subID] = &pendingListen{
  607. freq: freq,
  608. bw: bw,
  609. mode: mode,
  610. ch: ch,
  611. }
  612. info := defaultAudioInfoForMode(mode)
  613. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  614. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  615. return subID, ch, info, nil
  616. }
  617. // UnsubscribeAudio removes a live-listen subscriber.
  618. func (st *Streamer) UnsubscribeAudio(subID int64) {
  619. st.mu.Lock()
  620. defer st.mu.Unlock()
  621. if pl, ok := st.pendingListens[subID]; ok {
  622. close(pl.ch)
  623. delete(st.pendingListens, subID)
  624. return
  625. }
  626. for _, sess := range st.sessions {
  627. for i, sub := range sess.audioSubs {
  628. if sub.id == subID {
  629. close(sub.ch)
  630. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  631. return
  632. }
  633. }
  634. }
  635. }
  636. // ---------------------------------------------------------------------------
  637. // Session: stateful extraction + demod
  638. // ---------------------------------------------------------------------------
  639. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  640. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  641. // output with zero transient artifacts.
  642. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  643. if len(snippet) == 0 || snipRate <= 0 {
  644. return nil, 0
  645. }
  646. isWFMStereo := sess.demodName == "WFM_STEREO"
  647. isWFM := sess.demodName == "WFM" || isWFMStereo
  648. demodName := sess.demodName
  649. if isWFMStereo {
  650. demodName = "WFM"
  651. }
  652. d := demod.Get(demodName)
  653. if d == nil {
  654. d = demod.Get("NFM")
  655. }
  656. if d == nil {
  657. return nil, 0
  658. }
  659. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  660. // The FM discriminator needs iq[i-1] to compute the first output.
  661. // All FIR filtering is now stateful, so no additional overlap is needed.
  662. var fullSnip []complex64
  663. trimSamples := 0
  664. _ = trimSamples
  665. if len(sess.overlapIQ) == 1 {
  666. fullSnip = make([]complex64, 1+len(snippet))
  667. fullSnip[0] = sess.overlapIQ[0]
  668. copy(fullSnip[1:], snippet)
  669. trimSamples = 1
  670. logging.Debug("discrim", "overlap_applied", "signal", sess.signalID, "snip", len(snippet))
  671. } else {
  672. fullSnip = snippet
  673. }
  674. // Save last sample for next frame's FM discriminator
  675. if len(snippet) > 0 {
  676. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  677. }
  678. // --- Stateful anti-alias FIR + decimation to demod rate ---
  679. demodRate := d.OutputSampleRate()
  680. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  681. if decim1 < 1 {
  682. decim1 = 1
  683. }
  684. actualDemodRate := snipRate / decim1
  685. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  686. var dec []complex64
  687. if decim1 > 1 {
  688. // For WFM: skip the pre-demod anti-alias FIR entirely.
  689. // FM broadcast has ±75kHz deviation. The old cutoff at
  690. // actualDemodRate/2*0.8 = 68kHz was BELOW the FM deviation,
  691. // clipping the modulation and causing amplitude dips that
  692. // the FM discriminator turned into audible clicks (4-5/sec).
  693. // The extraction stage already bandlimited to ±125kHz (BW/2 * bwMult),
  694. // which is sufficient anti-alias protection for the 512k→170k decimation.
  695. //
  696. // For NFM/other: use a cutoff that preserves the full signal bandwidth.
  697. if isWFM {
  698. // WFM: decimate without FIR — extraction filter is sufficient
  699. dec = dsp.DecimateStateful(fullSnip, decim1, &sess.preDemodDecimPhase)
  700. } else {
  701. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  702. // Lazy-init or reinit stateful FIR if parameters changed
  703. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  704. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  705. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  706. sess.preDemodRate = snipRate
  707. sess.preDemodCutoff = cutoff
  708. sess.preDemodDecim = decim1
  709. sess.preDemodDecimPhase = 0
  710. }
  711. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  712. dec = dsp.DecimateStateful(filtered, decim1, &sess.preDemodDecimPhase)
  713. }
  714. } else {
  715. dec = fullSnip
  716. }
  717. // --- FM Demod ---
  718. audio := d.Demod(dec, actualDemodRate)
  719. if len(audio) == 0 {
  720. return nil, 0
  721. }
  722. // --- Trim the 1-sample FM discriminator overlap ---
  723. // TEMP: skip audio trim to test if per-block trimming causes ticks
  724. // --- Stateful stereo decode with conservative lock/hysteresis ---
  725. channels := 1
  726. if isWFMStereo {
  727. sess.playbackMode = "WFM_STEREO"
  728. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  729. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  730. if locked {
  731. sess.stereoOnCount++
  732. sess.stereoOffCount = 0
  733. if sess.stereoOnCount >= 4 {
  734. sess.stereoEnabled = true
  735. }
  736. } else {
  737. sess.stereoOnCount = 0
  738. sess.stereoOffCount++
  739. if sess.stereoOffCount >= 10 {
  740. sess.stereoEnabled = false
  741. }
  742. }
  743. prevPlayback := sess.playbackMode
  744. prevStereo := sess.stereoState
  745. if sess.stereoEnabled && len(stereoAudio) > 0 {
  746. sess.stereoState = "locked"
  747. audio = stereoAudio
  748. } else {
  749. sess.stereoState = "mono-fallback"
  750. dual := make([]float32, len(audio)*2)
  751. for i, s := range audio {
  752. dual[i*2] = s
  753. dual[i*2+1] = s
  754. }
  755. audio = dual
  756. }
  757. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  758. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  759. }
  760. }
  761. // --- Polyphase resample to exact 48kHz ---
  762. if actualDemodRate != streamAudioRate {
  763. if channels > 1 {
  764. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  765. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  766. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  767. sess.stereoResamplerRate = actualDemodRate
  768. }
  769. audio = sess.stereoResampler.Process(audio)
  770. } else {
  771. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  772. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  773. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  774. sess.monoResamplerRate = actualDemodRate
  775. }
  776. audio = sess.monoResampler.Process(audio)
  777. }
  778. }
  779. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  780. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  781. tau := sess.deemphasisUs * 1e-6
  782. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  783. if channels > 1 {
  784. nFrames := len(audio) / channels
  785. yL, yR := sess.deemphL, sess.deemphR
  786. for i := 0; i < nFrames; i++ {
  787. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  788. audio[i*2] = float32(yL)
  789. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  790. audio[i*2+1] = float32(yR)
  791. }
  792. sess.deemphL, sess.deemphR = yL, yR
  793. } else {
  794. y := sess.deemphL
  795. for i := range audio {
  796. y = alpha*y + (1-alpha)*float64(audio[i])
  797. audio[i] = float32(y)
  798. }
  799. sess.deemphL = y
  800. }
  801. }
  802. if isWFM {
  803. for i := range audio {
  804. audio[i] *= 0.35
  805. }
  806. }
  807. return audio, streamAudioRate
  808. }
  809. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  810. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  811. // loopBW is in Hz, sampleRate in samples/sec.
  812. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  813. if sampleRate <= 0 || loopBW <= 0 {
  814. return 0, 0
  815. }
  816. bl := loopBW / float64(sampleRate)
  817. theta := bl / (damping + 0.25/damping)
  818. d := 1 + 2*damping*theta + theta*theta
  819. alpha := (4 * damping * theta) / d
  820. beta := (4 * theta * theta) / d
  821. return alpha, beta
  822. }
  823. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  824. // Uses persistent FIR filter state across frames for click-free stereo.
  825. // Reuses session scratch buffers to minimize allocations.
  826. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  827. if len(mono) == 0 || sampleRate <= 0 {
  828. return nil, false
  829. }
  830. n := len(mono)
  831. // Rebuild rate-dependent stereo filters when sampleRate changes
  832. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  833. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  834. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  835. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  836. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  837. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  838. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  839. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  840. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  841. sess.stereoFilterRate = sampleRate
  842. // Initialize PLL for 19kHz pilot tracking.
  843. sess.pilotPhase = 0
  844. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  845. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  846. sess.pilotErrAvg = 0
  847. sess.pilotI = 0
  848. sess.pilotQ = 0
  849. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  850. }
  851. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  852. scratch := sess.growAudio(n * 5)
  853. lpr := scratch[:n]
  854. bpfLR := scratch[n : 2*n]
  855. lr := scratch[2*n : 3*n]
  856. work1 := scratch[3*n : 4*n]
  857. work2 := scratch[4*n : 5*n]
  858. sess.stereoLPF.ProcessInto(mono, lpr)
  859. // 23-53kHz bandpass for L-R DSB-SC.
  860. sess.stereoBPHi.ProcessInto(mono, work1)
  861. sess.stereoBPLo.ProcessInto(mono, work2)
  862. for i := 0; i < n; i++ {
  863. bpfLR[i] = work1[i] - work2[i]
  864. }
  865. // 19kHz pilot bandpass for PLL.
  866. sess.pilotLPFHi.ProcessInto(mono, work1)
  867. sess.pilotLPFLo.ProcessInto(mono, work2)
  868. for i := 0; i < n; i++ {
  869. work1[i] = work1[i] - work2[i]
  870. }
  871. pilot := work1
  872. phase := sess.pilotPhase
  873. freq := sess.pilotFreq
  874. alpha := sess.pilotAlpha
  875. beta := sess.pilotBeta
  876. iState := sess.pilotI
  877. qState := sess.pilotQ
  878. lpAlpha := sess.pilotLPAlpha
  879. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  880. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  881. var pilotPower float64
  882. var totalPower float64
  883. var errSum float64
  884. for i := 0; i < n; i++ {
  885. p := float64(pilot[i])
  886. sinP, cosP := math.Sincos(phase)
  887. iMix := p * cosP
  888. qMix := p * -sinP
  889. iState += lpAlpha * (iMix - iState)
  890. qState += lpAlpha * (qMix - qState)
  891. err := math.Atan2(qState, iState)
  892. freq += beta * err
  893. if freq < minFreq {
  894. freq = minFreq
  895. } else if freq > maxFreq {
  896. freq = maxFreq
  897. }
  898. phase += freq + alpha*err
  899. if phase > 2*math.Pi {
  900. phase -= 2 * math.Pi
  901. } else if phase < 0 {
  902. phase += 2 * math.Pi
  903. }
  904. totalPower += float64(mono[i]) * float64(mono[i])
  905. pilotPower += p * p
  906. errSum += math.Abs(err)
  907. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  908. }
  909. sess.pilotPhase = phase
  910. sess.pilotFreq = freq
  911. sess.pilotI = iState
  912. sess.pilotQ = qState
  913. blockErr := errSum / float64(n)
  914. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  915. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  916. pilotRatio := 0.0
  917. if totalPower > 0 {
  918. pilotRatio = pilotPower / totalPower
  919. }
  920. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  921. // Lock heuristics: pilot power fraction and PLL phase error stability.
  922. // Pilot power is a small but stable fraction of composite energy; require
  923. // a modest floor plus PLL settling to avoid flapping in noise.
  924. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  925. out := make([]float32, n*2)
  926. for i := 0; i < n; i++ {
  927. out[i*2] = 0.5 * (lpr[i] + lr[i])
  928. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  929. }
  930. return out, locked
  931. }
  932. // dspStateSnapshot captures persistent DSP state for segment splits.
  933. type dspStateSnapshot struct {
  934. overlapIQ []complex64
  935. deemphL float64
  936. deemphR float64
  937. pilotPhase float64
  938. pilotFreq float64
  939. pilotAlpha float64
  940. pilotBeta float64
  941. pilotErrAvg float64
  942. pilotI float64
  943. pilotQ float64
  944. pilotLPAlpha float64
  945. monoResampler *dsp.Resampler
  946. monoResamplerRate int
  947. stereoResampler *dsp.StereoResampler
  948. stereoResamplerRate int
  949. stereoLPF *dsp.StatefulFIRReal
  950. stereoFilterRate int
  951. stereoBPHi *dsp.StatefulFIRReal
  952. stereoBPLo *dsp.StatefulFIRReal
  953. stereoLRLPF *dsp.StatefulFIRReal
  954. stereoAALPF *dsp.StatefulFIRReal
  955. pilotLPFHi *dsp.StatefulFIRReal
  956. pilotLPFLo *dsp.StatefulFIRReal
  957. preDemodFIR *dsp.StatefulFIRComplex
  958. preDemodDecim int
  959. preDemodRate int
  960. preDemodCutoff float64
  961. preDemodDecimPhase int
  962. }
  963. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  964. return dspStateSnapshot{
  965. overlapIQ: sess.overlapIQ,
  966. deemphL: sess.deemphL,
  967. deemphR: sess.deemphR,
  968. pilotPhase: sess.pilotPhase,
  969. pilotFreq: sess.pilotFreq,
  970. pilotAlpha: sess.pilotAlpha,
  971. pilotBeta: sess.pilotBeta,
  972. pilotErrAvg: sess.pilotErrAvg,
  973. pilotI: sess.pilotI,
  974. pilotQ: sess.pilotQ,
  975. pilotLPAlpha: sess.pilotLPAlpha,
  976. monoResampler: sess.monoResampler,
  977. monoResamplerRate: sess.monoResamplerRate,
  978. stereoResampler: sess.stereoResampler,
  979. stereoResamplerRate: sess.stereoResamplerRate,
  980. stereoLPF: sess.stereoLPF,
  981. stereoFilterRate: sess.stereoFilterRate,
  982. stereoBPHi: sess.stereoBPHi,
  983. stereoBPLo: sess.stereoBPLo,
  984. stereoLRLPF: sess.stereoLRLPF,
  985. stereoAALPF: sess.stereoAALPF,
  986. pilotLPFHi: sess.pilotLPFHi,
  987. pilotLPFLo: sess.pilotLPFLo,
  988. preDemodFIR: sess.preDemodFIR,
  989. preDemodDecim: sess.preDemodDecim,
  990. preDemodRate: sess.preDemodRate,
  991. preDemodCutoff: sess.preDemodCutoff,
  992. preDemodDecimPhase: sess.preDemodDecimPhase,
  993. }
  994. }
  995. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  996. sess.overlapIQ = s.overlapIQ
  997. sess.deemphL = s.deemphL
  998. sess.deemphR = s.deemphR
  999. sess.pilotPhase = s.pilotPhase
  1000. sess.pilotFreq = s.pilotFreq
  1001. sess.pilotAlpha = s.pilotAlpha
  1002. sess.pilotBeta = s.pilotBeta
  1003. sess.pilotErrAvg = s.pilotErrAvg
  1004. sess.pilotI = s.pilotI
  1005. sess.pilotQ = s.pilotQ
  1006. sess.pilotLPAlpha = s.pilotLPAlpha
  1007. sess.monoResampler = s.monoResampler
  1008. sess.monoResamplerRate = s.monoResamplerRate
  1009. sess.stereoResampler = s.stereoResampler
  1010. sess.stereoResamplerRate = s.stereoResamplerRate
  1011. sess.stereoLPF = s.stereoLPF
  1012. sess.stereoFilterRate = s.stereoFilterRate
  1013. sess.stereoBPHi = s.stereoBPHi
  1014. sess.stereoBPLo = s.stereoBPLo
  1015. sess.stereoLRLPF = s.stereoLRLPF
  1016. sess.stereoAALPF = s.stereoAALPF
  1017. sess.pilotLPFHi = s.pilotLPFHi
  1018. sess.pilotLPFLo = s.pilotLPFLo
  1019. sess.preDemodFIR = s.preDemodFIR
  1020. sess.preDemodDecim = s.preDemodDecim
  1021. sess.preDemodRate = s.preDemodRate
  1022. sess.preDemodCutoff = s.preDemodCutoff
  1023. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1024. }
  1025. // ---------------------------------------------------------------------------
  1026. // Session management helpers
  1027. // ---------------------------------------------------------------------------
  1028. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1029. outputDir := st.policy.OutputDir
  1030. if outputDir == "" {
  1031. outputDir = "data/recordings"
  1032. }
  1033. demodName, channels := resolveDemod(sig)
  1034. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1035. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1036. dir := filepath.Join(outputDir, dirName)
  1037. if err := os.MkdirAll(dir, 0o755); err != nil {
  1038. return nil, err
  1039. }
  1040. wavPath := filepath.Join(dir, "audio.wav")
  1041. f, err := os.Create(wavPath)
  1042. if err != nil {
  1043. return nil, err
  1044. }
  1045. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1046. f.Close()
  1047. return nil, err
  1048. }
  1049. playbackMode, stereoState := initialPlaybackState(demodName)
  1050. sess := &streamSession{
  1051. signalID: sig.ID,
  1052. centerHz: sig.CenterHz,
  1053. bwHz: sig.BWHz,
  1054. snrDb: sig.SNRDb,
  1055. peakDb: sig.PeakDb,
  1056. class: sig.Class,
  1057. startTime: now,
  1058. lastFeed: now,
  1059. dir: dir,
  1060. wavFile: f,
  1061. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1062. sampleRate: streamAudioRate,
  1063. channels: channels,
  1064. demodName: demodName,
  1065. playbackMode: playbackMode,
  1066. stereoState: stereoState,
  1067. deemphasisUs: st.policy.DeemphasisUs,
  1068. }
  1069. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1070. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1071. return sess, nil
  1072. }
  1073. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1074. demodName, channels := resolveDemod(sig)
  1075. for _, pl := range st.pendingListens {
  1076. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1077. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1078. demodName = requested
  1079. if demodName == "WFM_STEREO" {
  1080. channels = 2
  1081. } else if d := demod.Get(demodName); d != nil {
  1082. channels = d.Channels()
  1083. } else {
  1084. channels = 1
  1085. }
  1086. break
  1087. }
  1088. }
  1089. }
  1090. playbackMode, stereoState := initialPlaybackState(demodName)
  1091. sess := &streamSession{
  1092. signalID: sig.ID,
  1093. centerHz: sig.CenterHz,
  1094. bwHz: sig.BWHz,
  1095. snrDb: sig.SNRDb,
  1096. peakDb: sig.PeakDb,
  1097. class: sig.Class,
  1098. startTime: now,
  1099. lastFeed: now,
  1100. listenOnly: true,
  1101. sampleRate: streamAudioRate,
  1102. channels: channels,
  1103. demodName: demodName,
  1104. playbackMode: playbackMode,
  1105. stereoState: stereoState,
  1106. deemphasisUs: st.policy.DeemphasisUs,
  1107. }
  1108. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1109. sig.ID, sig.CenterHz/1e6, demodName)
  1110. return sess
  1111. }
  1112. func resolveDemod(sig *detector.Signal) (string, int) {
  1113. demodName := "NFM"
  1114. if sig.Class != nil {
  1115. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1116. demodName = n
  1117. }
  1118. }
  1119. channels := 1
  1120. if demodName == "WFM_STEREO" {
  1121. channels = 2
  1122. } else if d := demod.Get(demodName); d != nil {
  1123. channels = d.Channels()
  1124. }
  1125. return demodName, channels
  1126. }
  1127. func initialPlaybackState(demodName string) (string, string) {
  1128. playbackMode := demodName
  1129. stereoState := "mono"
  1130. if demodName == "WFM_STEREO" {
  1131. stereoState = "searching"
  1132. }
  1133. return playbackMode, stereoState
  1134. }
  1135. func (sess *streamSession) audioInfo() AudioInfo {
  1136. return AudioInfo{
  1137. SampleRate: sess.sampleRate,
  1138. Channels: sess.channels,
  1139. Format: "s16le",
  1140. DemodName: sess.demodName,
  1141. PlaybackMode: sess.playbackMode,
  1142. StereoState: sess.stereoState,
  1143. }
  1144. }
  1145. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1146. infoJSON, _ := json.Marshal(info)
  1147. tagged := make([]byte, 1+len(infoJSON))
  1148. tagged[0] = 0x00 // tag: audio_info
  1149. copy(tagged[1:], infoJSON)
  1150. for _, sub := range subs {
  1151. select {
  1152. case sub.ch <- tagged:
  1153. default:
  1154. }
  1155. }
  1156. }
  1157. func defaultAudioInfoForMode(mode string) AudioInfo {
  1158. demodName := "NFM"
  1159. if requested := normalizeRequestedMode(mode); requested != "" {
  1160. demodName = requested
  1161. }
  1162. channels := 1
  1163. if demodName == "WFM_STEREO" {
  1164. channels = 2
  1165. } else if d := demod.Get(demodName); d != nil {
  1166. channels = d.Channels()
  1167. }
  1168. playbackMode, stereoState := initialPlaybackState(demodName)
  1169. return AudioInfo{
  1170. SampleRate: streamAudioRate,
  1171. Channels: channels,
  1172. Format: "s16le",
  1173. DemodName: demodName,
  1174. PlaybackMode: playbackMode,
  1175. StereoState: stereoState,
  1176. }
  1177. }
  1178. func normalizeRequestedMode(mode string) string {
  1179. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1180. case "", "AUTO":
  1181. return ""
  1182. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1183. return strings.ToUpper(strings.TrimSpace(mode))
  1184. default:
  1185. return ""
  1186. }
  1187. }
  1188. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1189. func (sess *streamSession) growIQ(n int) []complex64 {
  1190. if cap(sess.scratchIQ) >= n {
  1191. return sess.scratchIQ[:n]
  1192. }
  1193. sess.scratchIQ = make([]complex64, n, n*5/4)
  1194. return sess.scratchIQ
  1195. }
  1196. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1197. func (sess *streamSession) growAudio(n int) []float32 {
  1198. if cap(sess.scratchAudio) >= n {
  1199. return sess.scratchAudio[:n]
  1200. }
  1201. sess.scratchAudio = make([]float32, n, n*5/4)
  1202. return sess.scratchAudio
  1203. }
  1204. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1205. func (sess *streamSession) growPCM(n int) []byte {
  1206. if cap(sess.scratchPCM) >= n {
  1207. return sess.scratchPCM[:n]
  1208. }
  1209. sess.scratchPCM = make([]byte, n, n*5/4)
  1210. return sess.scratchPCM
  1211. }
  1212. func convertToListenOnly(sess *streamSession) {
  1213. if sess.wavBuf != nil {
  1214. _ = sess.wavBuf.Flush()
  1215. }
  1216. if sess.wavFile != nil {
  1217. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1218. sess.wavFile.Close()
  1219. }
  1220. sess.wavFile = nil
  1221. sess.wavBuf = nil
  1222. sess.listenOnly = true
  1223. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1224. }
  1225. func closeSession(sess *streamSession, policy *Policy) {
  1226. if sess.listenOnly {
  1227. return
  1228. }
  1229. if sess.wavBuf != nil {
  1230. _ = sess.wavBuf.Flush()
  1231. }
  1232. if sess.wavFile != nil {
  1233. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1234. sess.wavFile.Close()
  1235. sess.wavFile = nil
  1236. sess.wavBuf = nil
  1237. }
  1238. dur := sess.lastFeed.Sub(sess.startTime)
  1239. files := map[string]any{
  1240. "audio": "audio.wav",
  1241. "audio_sample_rate": sess.sampleRate,
  1242. "audio_channels": sess.channels,
  1243. "audio_demod": sess.demodName,
  1244. "recording_mode": "streaming",
  1245. }
  1246. meta := Meta{
  1247. EventID: sess.signalID,
  1248. Start: sess.startTime,
  1249. End: sess.lastFeed,
  1250. CenterHz: sess.centerHz,
  1251. BandwidthHz: sess.bwHz,
  1252. SampleRate: sess.sampleRate,
  1253. SNRDb: sess.snrDb,
  1254. PeakDb: sess.peakDb,
  1255. Class: sess.class,
  1256. DurationMs: dur.Milliseconds(),
  1257. Files: files,
  1258. }
  1259. b, err := json.MarshalIndent(meta, "", " ")
  1260. if err == nil {
  1261. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1262. }
  1263. if policy != nil {
  1264. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1265. }
  1266. }
  1267. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1268. if len(sess.audioSubs) == 0 {
  1269. return
  1270. }
  1271. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1272. tagged := make([]byte, 1+pcmLen)
  1273. tagged[0] = 0x01
  1274. copy(tagged[1:], pcm[:pcmLen])
  1275. alive := sess.audioSubs[:0]
  1276. for _, sub := range sess.audioSubs {
  1277. select {
  1278. case sub.ch <- tagged:
  1279. default:
  1280. st.droppedPCM++
  1281. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1282. }
  1283. alive = append(alive, sub)
  1284. }
  1285. sess.audioSubs = alive
  1286. }
  1287. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1288. if len(st.policy.ClassFilter) == 0 {
  1289. return true
  1290. }
  1291. if cls == nil {
  1292. return false
  1293. }
  1294. for _, f := range st.policy.ClassFilter {
  1295. if strings.EqualFold(f, string(cls.ModType)) {
  1296. return true
  1297. }
  1298. }
  1299. return false
  1300. }
  1301. // ErrNoSession is returned when no matching signal session exists.
  1302. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1303. // ---------------------------------------------------------------------------
  1304. // WAV header helpers
  1305. // ---------------------------------------------------------------------------
  1306. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1307. if channels <= 0 {
  1308. channels = 1
  1309. }
  1310. hdr := make([]byte, 44)
  1311. copy(hdr[0:4], "RIFF")
  1312. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1313. copy(hdr[8:12], "WAVE")
  1314. copy(hdr[12:16], "fmt ")
  1315. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1316. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1317. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1318. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1319. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1320. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1321. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1322. copy(hdr[36:40], "data")
  1323. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1324. _, err := f.Write(hdr)
  1325. return err
  1326. }
  1327. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1328. dataSize := uint32(totalSamples * 2)
  1329. var buf [4]byte
  1330. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1331. if _, err := f.Seek(4, 0); err != nil {
  1332. return
  1333. }
  1334. _, _ = f.Write(buf[:])
  1335. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1336. if _, err := f.Seek(24, 0); err != nil {
  1337. return
  1338. }
  1339. _, _ = f.Write(buf[:])
  1340. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1341. if _, err := f.Seek(28, 0); err != nil {
  1342. return
  1343. }
  1344. _, _ = f.Write(buf[:])
  1345. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1346. if _, err := f.Seek(40, 0); err != nil {
  1347. return
  1348. }
  1349. _, _ = f.Write(buf[:])
  1350. }