Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

1409 lignes
40KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. lastAudioTs time.Time
  36. lastAudioL float32
  37. lastAudioR float32
  38. lastAudioSet bool
  39. // listenOnly sessions have no WAV file and no disk I/O.
  40. // They exist solely to feed audio to live-listen subscribers.
  41. listenOnly bool
  42. // Recording state (nil/zero for listen-only sessions)
  43. dir string
  44. wavFile *os.File
  45. wavBuf *bufio.Writer
  46. wavSamples int64
  47. segmentIdx int
  48. sampleRate int // actual output audio sample rate (always streamAudioRate)
  49. channels int
  50. demodName string
  51. // --- Persistent DSP state for click-free streaming ---
  52. // Overlap-save: tail of previous extracted IQ snippet.
  53. overlapIQ []complex64
  54. // De-emphasis IIR state (persists across frames)
  55. deemphL float64
  56. deemphR float64
  57. // Stereo lock state for live WFM streaming
  58. stereoEnabled bool
  59. stereoOnCount int
  60. stereoOffCount int
  61. // Pilot-locked stereo PLL state (19kHz pilot)
  62. pilotPhase float64
  63. pilotFreq float64
  64. pilotAlpha float64
  65. pilotBeta float64
  66. pilotErrAvg float64
  67. pilotI float64
  68. pilotQ float64
  69. pilotLPAlpha float64
  70. // Polyphase resampler (replaces integer-decimate hack)
  71. monoResampler *dsp.Resampler
  72. monoResamplerRate int
  73. stereoResampler *dsp.StereoResampler
  74. stereoResamplerRate int
  75. // AQ-4: Stateful FIR filters for click-free stereo decode
  76. stereoFilterRate int
  77. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  78. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  79. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  80. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  81. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  82. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  83. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  84. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  85. // and avoids per-frame FIR recomputation)
  86. preDemodFIR *dsp.StatefulFIRComplex
  87. preDemodDecim int // cached decimation factor
  88. preDemodRate int // cached snipRate this FIR was built for
  89. preDemodCutoff float64 // cached cutoff
  90. // AQ-2: De-emphasis config (µs, 0 = disabled)
  91. deemphasisUs float64
  92. // Scratch buffers — reused across frames to avoid GC pressure.
  93. // Grown as needed, never shrunk.
  94. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  95. scratchAudio []float32 // for stereo decode intermediates
  96. scratchPCM []byte // for PCM encoding
  97. // live-listen subscribers
  98. audioSubs []audioSub
  99. }
  100. type audioSub struct {
  101. id int64
  102. ch chan []byte
  103. }
  104. type RuntimeSignalInfo struct {
  105. DemodName string
  106. PlaybackMode string
  107. StereoState string
  108. Channels int
  109. SampleRate int
  110. }
  111. // AudioInfo describes the audio format of a live-listen subscription.
  112. // Sent to the WebSocket client as the first message.
  113. type AudioInfo struct {
  114. SampleRate int `json:"sample_rate"`
  115. Channels int `json:"channels"`
  116. Format string `json:"format"` // always "s16le"
  117. DemodName string `json:"demod"`
  118. PlaybackMode string `json:"playback_mode,omitempty"`
  119. StereoState string `json:"stereo_state,omitempty"`
  120. }
  121. const (
  122. streamAudioRate = 48000
  123. resamplerTaps = 32 // taps per polyphase arm — good quality
  124. )
  125. // ---------------------------------------------------------------------------
  126. // Streamer — manages all active streaming sessions
  127. // ---------------------------------------------------------------------------
  128. type streamFeedItem struct {
  129. signal detector.Signal
  130. snippet []complex64
  131. snipRate int
  132. }
  133. type streamFeedMsg struct {
  134. items []streamFeedItem
  135. }
  136. type Streamer struct {
  137. mu sync.Mutex
  138. sessions map[int64]*streamSession
  139. policy Policy
  140. centerHz float64
  141. nextSub int64
  142. feedCh chan streamFeedMsg
  143. done chan struct{}
  144. droppedFeed uint64
  145. droppedPCM uint64
  146. // pendingListens are subscribers waiting for a matching session.
  147. pendingListens map[int64]*pendingListen
  148. }
  149. type pendingListen struct {
  150. freq float64
  151. bw float64
  152. mode string
  153. ch chan []byte
  154. }
  155. func newStreamer(policy Policy, centerHz float64) *Streamer {
  156. st := &Streamer{
  157. sessions: make(map[int64]*streamSession),
  158. policy: policy,
  159. centerHz: centerHz,
  160. feedCh: make(chan streamFeedMsg, 2),
  161. done: make(chan struct{}),
  162. pendingListens: make(map[int64]*pendingListen),
  163. }
  164. go st.worker()
  165. return st
  166. }
  167. func (st *Streamer) worker() {
  168. for msg := range st.feedCh {
  169. st.processFeed(msg)
  170. }
  171. close(st.done)
  172. }
  173. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  174. st.mu.Lock()
  175. defer st.mu.Unlock()
  176. wasEnabled := st.policy.Enabled
  177. st.policy = policy
  178. st.centerHz = centerHz
  179. // If recording was just disabled, close recording sessions
  180. // but keep listen-only sessions alive.
  181. if wasEnabled && !policy.Enabled {
  182. for id, sess := range st.sessions {
  183. if sess.listenOnly {
  184. continue
  185. }
  186. if len(sess.audioSubs) > 0 {
  187. // Convert to listen-only: close WAV but keep session
  188. convertToListenOnly(sess)
  189. } else {
  190. closeSession(sess, &st.policy)
  191. delete(st.sessions, id)
  192. }
  193. }
  194. }
  195. }
  196. // HasListeners returns true if any sessions have audio subscribers
  197. // or there are pending listen requests. Used by the DSP loop to
  198. // decide whether to feed snippets even when recording is disabled.
  199. func (st *Streamer) HasListeners() bool {
  200. st.mu.Lock()
  201. defer st.mu.Unlock()
  202. return st.hasListenersLocked()
  203. }
  204. func (st *Streamer) hasListenersLocked() bool {
  205. if len(st.pendingListens) > 0 {
  206. return true
  207. }
  208. for _, sess := range st.sessions {
  209. if len(sess.audioSubs) > 0 {
  210. return true
  211. }
  212. }
  213. return false
  214. }
  215. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  216. // Feeds are accepted if:
  217. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  218. // - Any live-listen subscribers exist (listen-only mode)
  219. //
  220. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  221. // data, so items can be passed directly without another copy.
  222. func (st *Streamer) FeedSnippets(items []streamFeedItem) {
  223. st.mu.Lock()
  224. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  225. hasListeners := st.hasListenersLocked()
  226. pending := len(st.pendingListens)
  227. debugLiveAudio := st.policy.DebugLiveAudio
  228. st.mu.Unlock()
  229. if debugLiveAudio {
  230. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  231. }
  232. if (!recEnabled && !hasListeners) || len(items) == 0 {
  233. return
  234. }
  235. select {
  236. case st.feedCh <- streamFeedMsg{items: items}:
  237. default:
  238. st.droppedFeed++
  239. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  240. }
  241. }
  242. // processFeed runs in the worker goroutine.
  243. func (st *Streamer) processFeed(msg streamFeedMsg) {
  244. st.mu.Lock()
  245. defer st.mu.Unlock()
  246. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  247. hasListeners := st.hasListenersLocked()
  248. if !recEnabled && !hasListeners {
  249. return
  250. }
  251. now := time.Now()
  252. seen := make(map[int64]bool, len(msg.items))
  253. for i := range msg.items {
  254. item := &msg.items[i]
  255. sig := &item.signal
  256. seen[sig.ID] = true
  257. if sig.ID == 0 || sig.Class == nil {
  258. continue
  259. }
  260. if len(item.snippet) == 0 || item.snipRate <= 0 {
  261. continue
  262. }
  263. // Decide whether this signal needs a session
  264. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  265. needsListen := st.signalHasListenerLocked(sig)
  266. className := "<nil>"
  267. demodName := ""
  268. if sig.Class != nil {
  269. className = string(sig.Class.ModType)
  270. demodName, _ = resolveDemod(sig)
  271. }
  272. if st.policy.DebugLiveAudio {
  273. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  274. }
  275. if !needsRecording && !needsListen {
  276. continue
  277. }
  278. sess, exists := st.sessions[sig.ID]
  279. requestedMode := ""
  280. for _, pl := range st.pendingListens {
  281. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  282. if m := normalizeRequestedMode(pl.mode); m != "" {
  283. requestedMode = m
  284. break
  285. }
  286. }
  287. }
  288. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  289. for _, sub := range sess.audioSubs {
  290. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  291. }
  292. delete(st.sessions, sig.ID)
  293. sess = nil
  294. exists = false
  295. }
  296. if !exists {
  297. if needsRecording {
  298. s, err := st.openRecordingSession(sig, now)
  299. if err != nil {
  300. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  301. sig.ID, sig.CenterHz/1e6, err)
  302. continue
  303. }
  304. st.sessions[sig.ID] = s
  305. sess = s
  306. } else {
  307. s := st.openListenSession(sig, now)
  308. st.sessions[sig.ID] = s
  309. sess = s
  310. }
  311. // Attach any pending listeners
  312. st.attachPendingListeners(sess)
  313. }
  314. // Update metadata
  315. sess.lastFeed = now
  316. sess.centerHz = sig.CenterHz
  317. sess.bwHz = sig.BWHz
  318. if sig.SNRDb > sess.snrDb {
  319. sess.snrDb = sig.SNRDb
  320. }
  321. if sig.PeakDb > sess.peakDb {
  322. sess.peakDb = sig.PeakDb
  323. }
  324. if sig.Class != nil {
  325. sess.class = sig.Class
  326. }
  327. // Demod with persistent state
  328. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  329. if len(audio) > 0 {
  330. if sess.wavSamples == 0 && audioRate > 0 {
  331. sess.sampleRate = audioRate
  332. }
  333. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  334. pcmLen := len(audio) * 2
  335. pcm := sess.growPCM(pcmLen)
  336. for k, s := range audio {
  337. v := int16(clip(s * 32767))
  338. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  339. }
  340. if !sess.listenOnly && sess.wavBuf != nil {
  341. n, err := sess.wavBuf.Write(pcm)
  342. if err != nil {
  343. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  344. } else {
  345. sess.wavSamples += int64(n / 2)
  346. }
  347. }
  348. // Gap logging for live-audio sessions + boundary delta check
  349. if len(sess.audioSubs) > 0 {
  350. if !sess.lastAudioTs.IsZero() {
  351. gap := time.Since(sess.lastAudioTs)
  352. if gap > 150*time.Millisecond {
  353. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  354. }
  355. }
  356. // boundary delta (compare previous last sample with current first sample)
  357. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  358. if sess.lastAudioSet {
  359. if sess.channels > 1 && len(audio) >= 2 {
  360. dL := float64(audio[0] - sess.lastAudioL)
  361. dR := float64(audio[1] - sess.lastAudioR)
  362. if dL < 0 { dL = -dL }
  363. if dR < 0 { dR = -dR }
  364. if dL > 0.2 || dR > 0.2 {
  365. logging.Warn("boundary", "audio_step", "signal", sess.signalID, "dL", dL, "dR", dR)
  366. }
  367. } else {
  368. d := float64(audio[0] - sess.lastAudioL)
  369. if d < 0 { d = -d }
  370. if d > 0.2 {
  371. logging.Warn("boundary", "audio_step", "signal", sess.signalID, "dL", d)
  372. }
  373. }
  374. }
  375. // store last sample
  376. if sess.channels > 1 {
  377. lastIdx := (len(audio)-2)
  378. if lastIdx < 0 { lastIdx = 0 }
  379. sess.lastAudioL = audio[lastIdx]
  380. sess.lastAudioR = audio[lastIdx+1]
  381. } else {
  382. sess.lastAudioL = audio[len(audio)-1]
  383. sess.lastAudioR = 0
  384. }
  385. sess.lastAudioSet = true
  386. }
  387. sess.lastAudioTs = time.Now()
  388. }
  389. st.fanoutPCM(sess, pcm, pcmLen)
  390. }
  391. // Segment split (recording sessions only)
  392. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  393. segIdx := sess.segmentIdx + 1
  394. oldSubs := sess.audioSubs
  395. oldState := sess.captureDSPState()
  396. sess.audioSubs = nil
  397. closeSession(sess, &st.policy)
  398. s, err := st.openRecordingSession(sig, now)
  399. if err != nil {
  400. delete(st.sessions, sig.ID)
  401. continue
  402. }
  403. s.segmentIdx = segIdx
  404. s.audioSubs = oldSubs
  405. s.restoreDSPState(oldState)
  406. st.sessions[sig.ID] = s
  407. }
  408. }
  409. // Close sessions for disappeared signals (with grace period)
  410. for id, sess := range st.sessions {
  411. if seen[id] {
  412. continue
  413. }
  414. gracePeriod := 3 * time.Second
  415. if sess.listenOnly {
  416. gracePeriod = 5 * time.Second
  417. }
  418. if now.Sub(sess.lastFeed) > gracePeriod {
  419. for _, sub := range sess.audioSubs {
  420. close(sub.ch)
  421. }
  422. sess.audioSubs = nil
  423. if !sess.listenOnly {
  424. closeSession(sess, &st.policy)
  425. }
  426. delete(st.sessions, id)
  427. }
  428. }
  429. }
  430. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  431. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  432. if st.policy.DebugLiveAudio {
  433. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  434. }
  435. return true
  436. }
  437. for subID, pl := range st.pendingListens {
  438. delta := math.Abs(sig.CenterHz - pl.freq)
  439. if delta < 200000 {
  440. if st.policy.DebugLiveAudio {
  441. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  442. }
  443. return true
  444. }
  445. }
  446. return false
  447. }
  448. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  449. for subID, pl := range st.pendingListens {
  450. requestedMode := normalizeRequestedMode(pl.mode)
  451. if requestedMode != "" && sess.demodName != requestedMode {
  452. continue
  453. }
  454. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  455. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  456. delete(st.pendingListens, subID)
  457. // Send updated audio_info now that we know the real session params.
  458. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  459. infoJSON, _ := json.Marshal(sess.audioInfo())
  460. tagged := make([]byte, 1+len(infoJSON))
  461. tagged[0] = 0x00 // tag: audio_info
  462. copy(tagged[1:], infoJSON)
  463. select {
  464. case pl.ch <- tagged:
  465. default:
  466. }
  467. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  468. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  469. }
  470. }
  471. }
  472. // CloseAll finalises all sessions and stops the worker goroutine.
  473. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  474. st.mu.Lock()
  475. defer st.mu.Unlock()
  476. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  477. for _, sess := range st.sessions {
  478. out[sess.signalID] = RuntimeSignalInfo{
  479. DemodName: sess.demodName,
  480. PlaybackMode: sess.playbackMode,
  481. StereoState: sess.stereoState,
  482. Channels: sess.channels,
  483. SampleRate: sess.sampleRate,
  484. }
  485. }
  486. return out
  487. }
  488. func (st *Streamer) CloseAll() {
  489. close(st.feedCh)
  490. <-st.done
  491. st.mu.Lock()
  492. defer st.mu.Unlock()
  493. for id, sess := range st.sessions {
  494. for _, sub := range sess.audioSubs {
  495. close(sub.ch)
  496. }
  497. sess.audioSubs = nil
  498. if !sess.listenOnly {
  499. closeSession(sess, &st.policy)
  500. }
  501. delete(st.sessions, id)
  502. }
  503. for _, pl := range st.pendingListens {
  504. close(pl.ch)
  505. }
  506. st.pendingListens = nil
  507. }
  508. // ActiveSessions returns the number of open streaming sessions.
  509. func (st *Streamer) ActiveSessions() int {
  510. st.mu.Lock()
  511. defer st.mu.Unlock()
  512. return len(st.sessions)
  513. }
  514. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  515. //
  516. // LL-2: Returns AudioInfo with correct channels and sample rate.
  517. // LL-3: Returns error only on hard failures (nil streamer etc).
  518. //
  519. // If a matching session exists, attaches immediately. Otherwise, the
  520. // subscriber is held as "pending" and will be attached when a matching
  521. // signal appears in the next DSP frame.
  522. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  523. ch := make(chan []byte, 64)
  524. st.mu.Lock()
  525. defer st.mu.Unlock()
  526. st.nextSub++
  527. subID := st.nextSub
  528. requestedMode := normalizeRequestedMode(mode)
  529. // Try to find a matching session
  530. var bestSess *streamSession
  531. bestDist := math.MaxFloat64
  532. for _, sess := range st.sessions {
  533. if requestedMode != "" && sess.demodName != requestedMode {
  534. continue
  535. }
  536. d := math.Abs(sess.centerHz - freq)
  537. if d < bestDist {
  538. bestDist = d
  539. bestSess = sess
  540. }
  541. }
  542. if bestSess != nil && bestDist < 200000 {
  543. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  544. info := bestSess.audioInfo()
  545. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  546. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  547. return subID, ch, info, nil
  548. }
  549. // No matching session yet — add as pending listener
  550. st.pendingListens[subID] = &pendingListen{
  551. freq: freq,
  552. bw: bw,
  553. mode: mode,
  554. ch: ch,
  555. }
  556. info := defaultAudioInfoForMode(mode)
  557. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  558. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  559. return subID, ch, info, nil
  560. }
  561. // UnsubscribeAudio removes a live-listen subscriber.
  562. func (st *Streamer) UnsubscribeAudio(subID int64) {
  563. st.mu.Lock()
  564. defer st.mu.Unlock()
  565. if pl, ok := st.pendingListens[subID]; ok {
  566. close(pl.ch)
  567. delete(st.pendingListens, subID)
  568. return
  569. }
  570. for _, sess := range st.sessions {
  571. for i, sub := range sess.audioSubs {
  572. if sub.id == subID {
  573. close(sub.ch)
  574. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  575. return
  576. }
  577. }
  578. }
  579. }
  580. // ---------------------------------------------------------------------------
  581. // Session: stateful extraction + demod
  582. // ---------------------------------------------------------------------------
  583. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  584. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  585. // output with zero transient artifacts.
  586. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  587. if len(snippet) == 0 || snipRate <= 0 {
  588. return nil, 0
  589. }
  590. isWFMStereo := sess.demodName == "WFM_STEREO"
  591. isWFM := sess.demodName == "WFM" || isWFMStereo
  592. demodName := sess.demodName
  593. if isWFMStereo {
  594. demodName = "WFM"
  595. }
  596. d := demod.Get(demodName)
  597. if d == nil {
  598. d = demod.Get("NFM")
  599. }
  600. if d == nil {
  601. return nil, 0
  602. }
  603. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  604. // The FM discriminator needs iq[i-1] to compute the first output.
  605. // All FIR filtering is now stateful, so no additional overlap is needed.
  606. var fullSnip []complex64
  607. trimSamples := 0
  608. _ = trimSamples
  609. if len(sess.overlapIQ) == 1 {
  610. fullSnip = make([]complex64, 1+len(snippet))
  611. fullSnip[0] = sess.overlapIQ[0]
  612. copy(fullSnip[1:], snippet)
  613. trimSamples = 1
  614. logging.Debug("discrim", "overlap_applied", "signal", sess.signalID, "snip", len(snippet))
  615. } else {
  616. fullSnip = snippet
  617. }
  618. // Save last sample for next frame's FM discriminator
  619. if len(snippet) > 0 {
  620. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  621. }
  622. // --- Stateful anti-alias FIR + decimation to demod rate ---
  623. demodRate := d.OutputSampleRate()
  624. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  625. if decim1 < 1 {
  626. decim1 = 1
  627. }
  628. actualDemodRate := snipRate / decim1
  629. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  630. var dec []complex64
  631. if decim1 > 1 {
  632. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  633. // Lazy-init or reinit stateful FIR if parameters changed
  634. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  635. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  636. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  637. sess.preDemodRate = snipRate
  638. sess.preDemodCutoff = cutoff
  639. sess.preDemodDecim = decim1
  640. }
  641. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  642. dec = dsp.Decimate(filtered, decim1)
  643. } else {
  644. dec = fullSnip
  645. }
  646. // --- FM Demod ---
  647. audio := d.Demod(dec, actualDemodRate)
  648. if len(audio) == 0 {
  649. return nil, 0
  650. }
  651. // --- Trim the 1-sample FM discriminator overlap ---
  652. // TEMP: skip audio trim to test if per-block trimming causes ticks
  653. // --- Stateful stereo decode with conservative lock/hysteresis ---
  654. channels := 1
  655. if isWFMStereo {
  656. sess.playbackMode = "WFM_STEREO"
  657. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  658. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  659. if locked {
  660. sess.stereoOnCount++
  661. sess.stereoOffCount = 0
  662. if sess.stereoOnCount >= 4 {
  663. sess.stereoEnabled = true
  664. }
  665. } else {
  666. sess.stereoOnCount = 0
  667. sess.stereoOffCount++
  668. if sess.stereoOffCount >= 10 {
  669. sess.stereoEnabled = false
  670. }
  671. }
  672. prevPlayback := sess.playbackMode
  673. prevStereo := sess.stereoState
  674. if sess.stereoEnabled && len(stereoAudio) > 0 {
  675. sess.stereoState = "locked"
  676. audio = stereoAudio
  677. } else {
  678. sess.stereoState = "mono-fallback"
  679. dual := make([]float32, len(audio)*2)
  680. for i, s := range audio {
  681. dual[i*2] = s
  682. dual[i*2+1] = s
  683. }
  684. audio = dual
  685. }
  686. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  687. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  688. }
  689. }
  690. // --- Polyphase resample to exact 48kHz ---
  691. if actualDemodRate != streamAudioRate {
  692. if channels > 1 {
  693. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  694. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  695. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  696. sess.stereoResamplerRate = actualDemodRate
  697. }
  698. audio = sess.stereoResampler.Process(audio)
  699. } else {
  700. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  701. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  702. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  703. sess.monoResamplerRate = actualDemodRate
  704. }
  705. audio = sess.monoResampler.Process(audio)
  706. }
  707. }
  708. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  709. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  710. tau := sess.deemphasisUs * 1e-6
  711. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  712. if channels > 1 {
  713. nFrames := len(audio) / channels
  714. yL, yR := sess.deemphL, sess.deemphR
  715. for i := 0; i < nFrames; i++ {
  716. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  717. audio[i*2] = float32(yL)
  718. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  719. audio[i*2+1] = float32(yR)
  720. }
  721. sess.deemphL, sess.deemphR = yL, yR
  722. } else {
  723. y := sess.deemphL
  724. for i := range audio {
  725. y = alpha*y + (1-alpha)*float64(audio[i])
  726. audio[i] = float32(y)
  727. }
  728. sess.deemphL = y
  729. }
  730. }
  731. if isWFM {
  732. for i := range audio {
  733. audio[i] *= 0.35
  734. }
  735. }
  736. return audio, streamAudioRate
  737. }
  738. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  739. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  740. // loopBW is in Hz, sampleRate in samples/sec.
  741. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  742. if sampleRate <= 0 || loopBW <= 0 {
  743. return 0, 0
  744. }
  745. bl := loopBW / float64(sampleRate)
  746. theta := bl / (damping + 0.25/damping)
  747. d := 1 + 2*damping*theta + theta*theta
  748. alpha := (4 * damping * theta) / d
  749. beta := (4 * theta * theta) / d
  750. return alpha, beta
  751. }
  752. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  753. // Uses persistent FIR filter state across frames for click-free stereo.
  754. // Reuses session scratch buffers to minimize allocations.
  755. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  756. if len(mono) == 0 || sampleRate <= 0 {
  757. return nil, false
  758. }
  759. n := len(mono)
  760. // Rebuild rate-dependent stereo filters when sampleRate changes
  761. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  762. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  763. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  764. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  765. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  766. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  767. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  768. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  769. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  770. sess.stereoFilterRate = sampleRate
  771. // Initialize PLL for 19kHz pilot tracking.
  772. sess.pilotPhase = 0
  773. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  774. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  775. sess.pilotErrAvg = 0
  776. sess.pilotI = 0
  777. sess.pilotQ = 0
  778. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  779. }
  780. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  781. scratch := sess.growAudio(n * 5)
  782. lpr := scratch[:n]
  783. bpfLR := scratch[n : 2*n]
  784. lr := scratch[2*n : 3*n]
  785. work1 := scratch[3*n : 4*n]
  786. work2 := scratch[4*n : 5*n]
  787. sess.stereoLPF.ProcessInto(mono, lpr)
  788. // 23-53kHz bandpass for L-R DSB-SC.
  789. sess.stereoBPHi.ProcessInto(mono, work1)
  790. sess.stereoBPLo.ProcessInto(mono, work2)
  791. for i := 0; i < n; i++ {
  792. bpfLR[i] = work1[i] - work2[i]
  793. }
  794. // 19kHz pilot bandpass for PLL.
  795. sess.pilotLPFHi.ProcessInto(mono, work1)
  796. sess.pilotLPFLo.ProcessInto(mono, work2)
  797. for i := 0; i < n; i++ {
  798. work1[i] = work1[i] - work2[i]
  799. }
  800. pilot := work1
  801. phase := sess.pilotPhase
  802. freq := sess.pilotFreq
  803. alpha := sess.pilotAlpha
  804. beta := sess.pilotBeta
  805. iState := sess.pilotI
  806. qState := sess.pilotQ
  807. lpAlpha := sess.pilotLPAlpha
  808. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  809. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  810. var pilotPower float64
  811. var totalPower float64
  812. var errSum float64
  813. for i := 0; i < n; i++ {
  814. p := float64(pilot[i])
  815. sinP, cosP := math.Sincos(phase)
  816. iMix := p * cosP
  817. qMix := p * -sinP
  818. iState += lpAlpha * (iMix - iState)
  819. qState += lpAlpha * (qMix - qState)
  820. err := math.Atan2(qState, iState)
  821. freq += beta * err
  822. if freq < minFreq {
  823. freq = minFreq
  824. } else if freq > maxFreq {
  825. freq = maxFreq
  826. }
  827. phase += freq + alpha*err
  828. if phase > 2*math.Pi {
  829. phase -= 2 * math.Pi
  830. } else if phase < 0 {
  831. phase += 2 * math.Pi
  832. }
  833. totalPower += float64(mono[i]) * float64(mono[i])
  834. pilotPower += p * p
  835. errSum += math.Abs(err)
  836. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  837. }
  838. sess.pilotPhase = phase
  839. sess.pilotFreq = freq
  840. sess.pilotI = iState
  841. sess.pilotQ = qState
  842. blockErr := errSum / float64(n)
  843. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  844. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  845. pilotRatio := 0.0
  846. if totalPower > 0 {
  847. pilotRatio = pilotPower / totalPower
  848. }
  849. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  850. // Lock heuristics: pilot power fraction and PLL phase error stability.
  851. // Pilot power is a small but stable fraction of composite energy; require
  852. // a modest floor plus PLL settling to avoid flapping in noise.
  853. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  854. out := make([]float32, n*2)
  855. for i := 0; i < n; i++ {
  856. out[i*2] = 0.5 * (lpr[i] + lr[i])
  857. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  858. }
  859. return out, locked
  860. }
  861. // dspStateSnapshot captures persistent DSP state for segment splits.
  862. type dspStateSnapshot struct {
  863. overlapIQ []complex64
  864. deemphL float64
  865. deemphR float64
  866. pilotPhase float64
  867. pilotFreq float64
  868. pilotAlpha float64
  869. pilotBeta float64
  870. pilotErrAvg float64
  871. pilotI float64
  872. pilotQ float64
  873. pilotLPAlpha float64
  874. monoResampler *dsp.Resampler
  875. monoResamplerRate int
  876. stereoResampler *dsp.StereoResampler
  877. stereoResamplerRate int
  878. stereoLPF *dsp.StatefulFIRReal
  879. stereoFilterRate int
  880. stereoBPHi *dsp.StatefulFIRReal
  881. stereoBPLo *dsp.StatefulFIRReal
  882. stereoLRLPF *dsp.StatefulFIRReal
  883. stereoAALPF *dsp.StatefulFIRReal
  884. pilotLPFHi *dsp.StatefulFIRReal
  885. pilotLPFLo *dsp.StatefulFIRReal
  886. preDemodFIR *dsp.StatefulFIRComplex
  887. preDemodDecim int
  888. preDemodRate int
  889. preDemodCutoff float64
  890. }
  891. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  892. return dspStateSnapshot{
  893. overlapIQ: sess.overlapIQ,
  894. deemphL: sess.deemphL,
  895. deemphR: sess.deemphR,
  896. pilotPhase: sess.pilotPhase,
  897. pilotFreq: sess.pilotFreq,
  898. pilotAlpha: sess.pilotAlpha,
  899. pilotBeta: sess.pilotBeta,
  900. pilotErrAvg: sess.pilotErrAvg,
  901. pilotI: sess.pilotI,
  902. pilotQ: sess.pilotQ,
  903. pilotLPAlpha: sess.pilotLPAlpha,
  904. monoResampler: sess.monoResampler,
  905. monoResamplerRate: sess.monoResamplerRate,
  906. stereoResampler: sess.stereoResampler,
  907. stereoResamplerRate: sess.stereoResamplerRate,
  908. stereoLPF: sess.stereoLPF,
  909. stereoFilterRate: sess.stereoFilterRate,
  910. stereoBPHi: sess.stereoBPHi,
  911. stereoBPLo: sess.stereoBPLo,
  912. stereoLRLPF: sess.stereoLRLPF,
  913. stereoAALPF: sess.stereoAALPF,
  914. pilotLPFHi: sess.pilotLPFHi,
  915. pilotLPFLo: sess.pilotLPFLo,
  916. preDemodFIR: sess.preDemodFIR,
  917. preDemodDecim: sess.preDemodDecim,
  918. preDemodRate: sess.preDemodRate,
  919. preDemodCutoff: sess.preDemodCutoff,
  920. }
  921. }
  922. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  923. sess.overlapIQ = s.overlapIQ
  924. sess.deemphL = s.deemphL
  925. sess.deemphR = s.deemphR
  926. sess.pilotPhase = s.pilotPhase
  927. sess.pilotFreq = s.pilotFreq
  928. sess.pilotAlpha = s.pilotAlpha
  929. sess.pilotBeta = s.pilotBeta
  930. sess.pilotErrAvg = s.pilotErrAvg
  931. sess.pilotI = s.pilotI
  932. sess.pilotQ = s.pilotQ
  933. sess.pilotLPAlpha = s.pilotLPAlpha
  934. sess.monoResampler = s.monoResampler
  935. sess.monoResamplerRate = s.monoResamplerRate
  936. sess.stereoResampler = s.stereoResampler
  937. sess.stereoResamplerRate = s.stereoResamplerRate
  938. sess.stereoLPF = s.stereoLPF
  939. sess.stereoFilterRate = s.stereoFilterRate
  940. sess.stereoBPHi = s.stereoBPHi
  941. sess.stereoBPLo = s.stereoBPLo
  942. sess.stereoLRLPF = s.stereoLRLPF
  943. sess.stereoAALPF = s.stereoAALPF
  944. sess.pilotLPFHi = s.pilotLPFHi
  945. sess.pilotLPFLo = s.pilotLPFLo
  946. sess.preDemodFIR = s.preDemodFIR
  947. sess.preDemodDecim = s.preDemodDecim
  948. sess.preDemodRate = s.preDemodRate
  949. sess.preDemodCutoff = s.preDemodCutoff
  950. }
  951. // ---------------------------------------------------------------------------
  952. // Session management helpers
  953. // ---------------------------------------------------------------------------
  954. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  955. outputDir := st.policy.OutputDir
  956. if outputDir == "" {
  957. outputDir = "data/recordings"
  958. }
  959. demodName, channels := resolveDemod(sig)
  960. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  961. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  962. dir := filepath.Join(outputDir, dirName)
  963. if err := os.MkdirAll(dir, 0o755); err != nil {
  964. return nil, err
  965. }
  966. wavPath := filepath.Join(dir, "audio.wav")
  967. f, err := os.Create(wavPath)
  968. if err != nil {
  969. return nil, err
  970. }
  971. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  972. f.Close()
  973. return nil, err
  974. }
  975. playbackMode, stereoState := initialPlaybackState(demodName)
  976. sess := &streamSession{
  977. signalID: sig.ID,
  978. centerHz: sig.CenterHz,
  979. bwHz: sig.BWHz,
  980. snrDb: sig.SNRDb,
  981. peakDb: sig.PeakDb,
  982. class: sig.Class,
  983. startTime: now,
  984. lastFeed: now,
  985. dir: dir,
  986. wavFile: f,
  987. wavBuf: bufio.NewWriterSize(f, 64*1024),
  988. sampleRate: streamAudioRate,
  989. channels: channels,
  990. demodName: demodName,
  991. playbackMode: playbackMode,
  992. stereoState: stereoState,
  993. deemphasisUs: st.policy.DeemphasisUs,
  994. }
  995. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  996. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  997. return sess, nil
  998. }
  999. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1000. demodName, channels := resolveDemod(sig)
  1001. for _, pl := range st.pendingListens {
  1002. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1003. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1004. demodName = requested
  1005. if demodName == "WFM_STEREO" {
  1006. channels = 2
  1007. } else if d := demod.Get(demodName); d != nil {
  1008. channels = d.Channels()
  1009. } else {
  1010. channels = 1
  1011. }
  1012. break
  1013. }
  1014. }
  1015. }
  1016. playbackMode, stereoState := initialPlaybackState(demodName)
  1017. sess := &streamSession{
  1018. signalID: sig.ID,
  1019. centerHz: sig.CenterHz,
  1020. bwHz: sig.BWHz,
  1021. snrDb: sig.SNRDb,
  1022. peakDb: sig.PeakDb,
  1023. class: sig.Class,
  1024. startTime: now,
  1025. lastFeed: now,
  1026. listenOnly: true,
  1027. sampleRate: streamAudioRate,
  1028. channels: channels,
  1029. demodName: demodName,
  1030. playbackMode: playbackMode,
  1031. stereoState: stereoState,
  1032. deemphasisUs: st.policy.DeemphasisUs,
  1033. }
  1034. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1035. sig.ID, sig.CenterHz/1e6, demodName)
  1036. return sess
  1037. }
  1038. func resolveDemod(sig *detector.Signal) (string, int) {
  1039. demodName := "NFM"
  1040. if sig.Class != nil {
  1041. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1042. demodName = n
  1043. }
  1044. }
  1045. channels := 1
  1046. if demodName == "WFM_STEREO" {
  1047. channels = 2
  1048. } else if d := demod.Get(demodName); d != nil {
  1049. channels = d.Channels()
  1050. }
  1051. return demodName, channels
  1052. }
  1053. func initialPlaybackState(demodName string) (string, string) {
  1054. playbackMode := demodName
  1055. stereoState := "mono"
  1056. if demodName == "WFM_STEREO" {
  1057. stereoState = "searching"
  1058. }
  1059. return playbackMode, stereoState
  1060. }
  1061. func (sess *streamSession) audioInfo() AudioInfo {
  1062. return AudioInfo{
  1063. SampleRate: sess.sampleRate,
  1064. Channels: sess.channels,
  1065. Format: "s16le",
  1066. DemodName: sess.demodName,
  1067. PlaybackMode: sess.playbackMode,
  1068. StereoState: sess.stereoState,
  1069. }
  1070. }
  1071. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1072. infoJSON, _ := json.Marshal(info)
  1073. tagged := make([]byte, 1+len(infoJSON))
  1074. tagged[0] = 0x00 // tag: audio_info
  1075. copy(tagged[1:], infoJSON)
  1076. for _, sub := range subs {
  1077. select {
  1078. case sub.ch <- tagged:
  1079. default:
  1080. }
  1081. }
  1082. }
  1083. func defaultAudioInfoForMode(mode string) AudioInfo {
  1084. demodName := "NFM"
  1085. if requested := normalizeRequestedMode(mode); requested != "" {
  1086. demodName = requested
  1087. }
  1088. channels := 1
  1089. if demodName == "WFM_STEREO" {
  1090. channels = 2
  1091. } else if d := demod.Get(demodName); d != nil {
  1092. channels = d.Channels()
  1093. }
  1094. playbackMode, stereoState := initialPlaybackState(demodName)
  1095. return AudioInfo{
  1096. SampleRate: streamAudioRate,
  1097. Channels: channels,
  1098. Format: "s16le",
  1099. DemodName: demodName,
  1100. PlaybackMode: playbackMode,
  1101. StereoState: stereoState,
  1102. }
  1103. }
  1104. func normalizeRequestedMode(mode string) string {
  1105. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1106. case "", "AUTO":
  1107. return ""
  1108. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1109. return strings.ToUpper(strings.TrimSpace(mode))
  1110. default:
  1111. return ""
  1112. }
  1113. }
  1114. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1115. func (sess *streamSession) growIQ(n int) []complex64 {
  1116. if cap(sess.scratchIQ) >= n {
  1117. return sess.scratchIQ[:n]
  1118. }
  1119. sess.scratchIQ = make([]complex64, n, n*5/4)
  1120. return sess.scratchIQ
  1121. }
  1122. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1123. func (sess *streamSession) growAudio(n int) []float32 {
  1124. if cap(sess.scratchAudio) >= n {
  1125. return sess.scratchAudio[:n]
  1126. }
  1127. sess.scratchAudio = make([]float32, n, n*5/4)
  1128. return sess.scratchAudio
  1129. }
  1130. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1131. func (sess *streamSession) growPCM(n int) []byte {
  1132. if cap(sess.scratchPCM) >= n {
  1133. return sess.scratchPCM[:n]
  1134. }
  1135. sess.scratchPCM = make([]byte, n, n*5/4)
  1136. return sess.scratchPCM
  1137. }
  1138. func convertToListenOnly(sess *streamSession) {
  1139. if sess.wavBuf != nil {
  1140. _ = sess.wavBuf.Flush()
  1141. }
  1142. if sess.wavFile != nil {
  1143. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1144. sess.wavFile.Close()
  1145. }
  1146. sess.wavFile = nil
  1147. sess.wavBuf = nil
  1148. sess.listenOnly = true
  1149. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1150. }
  1151. func closeSession(sess *streamSession, policy *Policy) {
  1152. if sess.listenOnly {
  1153. return
  1154. }
  1155. if sess.wavBuf != nil {
  1156. _ = sess.wavBuf.Flush()
  1157. }
  1158. if sess.wavFile != nil {
  1159. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1160. sess.wavFile.Close()
  1161. sess.wavFile = nil
  1162. sess.wavBuf = nil
  1163. }
  1164. dur := sess.lastFeed.Sub(sess.startTime)
  1165. files := map[string]any{
  1166. "audio": "audio.wav",
  1167. "audio_sample_rate": sess.sampleRate,
  1168. "audio_channels": sess.channels,
  1169. "audio_demod": sess.demodName,
  1170. "recording_mode": "streaming",
  1171. }
  1172. meta := Meta{
  1173. EventID: sess.signalID,
  1174. Start: sess.startTime,
  1175. End: sess.lastFeed,
  1176. CenterHz: sess.centerHz,
  1177. BandwidthHz: sess.bwHz,
  1178. SampleRate: sess.sampleRate,
  1179. SNRDb: sess.snrDb,
  1180. PeakDb: sess.peakDb,
  1181. Class: sess.class,
  1182. DurationMs: dur.Milliseconds(),
  1183. Files: files,
  1184. }
  1185. b, err := json.MarshalIndent(meta, "", " ")
  1186. if err == nil {
  1187. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1188. }
  1189. if policy != nil {
  1190. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1191. }
  1192. }
  1193. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1194. if len(sess.audioSubs) == 0 {
  1195. return
  1196. }
  1197. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1198. tagged := make([]byte, 1+pcmLen)
  1199. tagged[0] = 0x01
  1200. copy(tagged[1:], pcm[:pcmLen])
  1201. alive := sess.audioSubs[:0]
  1202. for _, sub := range sess.audioSubs {
  1203. select {
  1204. case sub.ch <- tagged:
  1205. default:
  1206. st.droppedPCM++
  1207. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1208. }
  1209. alive = append(alive, sub)
  1210. }
  1211. sess.audioSubs = alive
  1212. }
  1213. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1214. if len(st.policy.ClassFilter) == 0 {
  1215. return true
  1216. }
  1217. if cls == nil {
  1218. return false
  1219. }
  1220. for _, f := range st.policy.ClassFilter {
  1221. if strings.EqualFold(f, string(cls.ModType)) {
  1222. return true
  1223. }
  1224. }
  1225. return false
  1226. }
  1227. // ErrNoSession is returned when no matching signal session exists.
  1228. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1229. // ---------------------------------------------------------------------------
  1230. // WAV header helpers
  1231. // ---------------------------------------------------------------------------
  1232. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1233. if channels <= 0 {
  1234. channels = 1
  1235. }
  1236. hdr := make([]byte, 44)
  1237. copy(hdr[0:4], "RIFF")
  1238. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1239. copy(hdr[8:12], "WAVE")
  1240. copy(hdr[12:16], "fmt ")
  1241. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1242. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1243. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1244. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1245. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1246. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1247. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1248. copy(hdr[36:40], "data")
  1249. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1250. _, err := f.Write(hdr)
  1251. return err
  1252. }
  1253. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1254. dataSize := uint32(totalSamples * 2)
  1255. var buf [4]byte
  1256. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1257. if _, err := f.Seek(4, 0); err != nil {
  1258. return
  1259. }
  1260. _, _ = f.Write(buf[:])
  1261. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1262. if _, err := f.Seek(24, 0); err != nil {
  1263. return
  1264. }
  1265. _, _ = f.Write(buf[:])
  1266. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1267. if _, err := f.Seek(28, 0); err != nil {
  1268. return
  1269. }
  1270. _, _ = f.Write(buf[:])
  1271. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1272. if _, err := f.Seek(40, 0); err != nil {
  1273. return
  1274. }
  1275. _, _ = f.Write(buf[:])
  1276. }