Wideband autonomous SDR analysis engine forked from sdr-visual-suite
25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

1426 satır
40KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. lastAudioTs time.Time
  36. lastAudioL float32
  37. lastAudioR float32
  38. lastAudioSet bool
  39. // listenOnly sessions have no WAV file and no disk I/O.
  40. // They exist solely to feed audio to live-listen subscribers.
  41. listenOnly bool
  42. // Recording state (nil/zero for listen-only sessions)
  43. dir string
  44. wavFile *os.File
  45. wavBuf *bufio.Writer
  46. wavSamples int64
  47. segmentIdx int
  48. sampleRate int // actual output audio sample rate (always streamAudioRate)
  49. channels int
  50. demodName string
  51. // --- Persistent DSP state for click-free streaming ---
  52. // Overlap-save: tail of previous extracted IQ snippet.
  53. overlapIQ []complex64
  54. // De-emphasis IIR state (persists across frames)
  55. deemphL float64
  56. deemphR float64
  57. // Stereo lock state for live WFM streaming
  58. stereoEnabled bool
  59. stereoOnCount int
  60. stereoOffCount int
  61. // Pilot-locked stereo PLL state (19kHz pilot)
  62. pilotPhase float64
  63. pilotFreq float64
  64. pilotAlpha float64
  65. pilotBeta float64
  66. pilotErrAvg float64
  67. pilotI float64
  68. pilotQ float64
  69. pilotLPAlpha float64
  70. // Polyphase resampler (replaces integer-decimate hack)
  71. monoResampler *dsp.Resampler
  72. monoResamplerRate int
  73. stereoResampler *dsp.StereoResampler
  74. stereoResamplerRate int
  75. // AQ-4: Stateful FIR filters for click-free stereo decode
  76. stereoFilterRate int
  77. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  78. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  79. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  80. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  81. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  82. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  83. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  84. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  85. // and avoids per-frame FIR recomputation)
  86. preDemodFIR *dsp.StatefulFIRComplex
  87. preDemodDecim int // cached decimation factor
  88. preDemodRate int // cached snipRate this FIR was built for
  89. preDemodCutoff float64 // cached cutoff
  90. // AQ-2: De-emphasis config (µs, 0 = disabled)
  91. deemphasisUs float64
  92. // Scratch buffers — reused across frames to avoid GC pressure.
  93. // Grown as needed, never shrunk.
  94. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  95. scratchAudio []float32 // for stereo decode intermediates
  96. scratchPCM []byte // for PCM encoding
  97. // live-listen subscribers
  98. audioSubs []audioSub
  99. }
  // audioSub is one live-listen subscriber: its subscription id and the
  // channel on which byte blocks are delivered to it.
  type audioSub struct {
  	id int64
  	ch chan []byte
  }
  // RuntimeSignalInfo is a snapshot of the demodulator, playback mode,
  // stereo state and audio format of one streaming session.
  type RuntimeSignalInfo struct {
  	DemodName, PlaybackMode, StereoState string
  	Channels, SampleRate                 int
  }
  // AudioInfo is the audio format description of a live-listen subscription.
  // It is the first message sent to the WebSocket client.
  type AudioInfo struct {
  	SampleRate int `json:"sample_rate"`
  	Channels   int `json:"channels"`
  	// Format is always "s16le".
  	Format       string `json:"format"`
  	DemodName    string `json:"demod"`
  	PlaybackMode string `json:"playback_mode,omitempty"`
  	StereoState  string `json:"stereo_state,omitempty"`
  }
  // streamAudioRate is the sample rate, in Hz, of all audio produced by the
  // streamer.
  const streamAudioRate = 48000

  // resamplerTaps is the number of taps per polyphase arm — good quality.
  const resamplerTaps = 32
  125. // ---------------------------------------------------------------------------
  126. // Streamer — manages all active streaming sessions
  127. // ---------------------------------------------------------------------------
  128. type streamFeedItem struct {
  129. signal detector.Signal
  130. snippet []complex64
  131. snipRate int
  132. }
  133. type streamFeedMsg struct {
  134. items []streamFeedItem
  135. }
  136. type Streamer struct {
  137. mu sync.Mutex
  138. sessions map[int64]*streamSession
  139. policy Policy
  140. centerHz float64
  141. nextSub int64
  142. feedCh chan streamFeedMsg
  143. done chan struct{}
  144. droppedFeed uint64
  145. droppedPCM uint64
  146. lastFeedTS time.Time
  147. lastProcTS time.Time
  148. // pendingListens are subscribers waiting for a matching session.
  149. pendingListens map[int64]*pendingListen
  150. }
  // pendingListen is a live-listen request that has not been attached to a
  // session yet: the requested frequency, bandwidth and mode, and the
  // channel that will carry the subscriber's data.
  type pendingListen struct {
  	freq, bw float64
  	mode     string
  	ch       chan []byte
  }
  157. func newStreamer(policy Policy, centerHz float64) *Streamer {
  158. st := &Streamer{
  159. sessions: make(map[int64]*streamSession),
  160. policy: policy,
  161. centerHz: centerHz,
  162. feedCh: make(chan streamFeedMsg, 2),
  163. done: make(chan struct{}),
  164. pendingListens: make(map[int64]*pendingListen),
  165. }
  166. go st.worker()
  167. return st
  168. }
  169. func (st *Streamer) worker() {
  170. for msg := range st.feedCh {
  171. st.processFeed(msg)
  172. }
  173. close(st.done)
  174. }
  175. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  176. st.mu.Lock()
  177. defer st.mu.Unlock()
  178. wasEnabled := st.policy.Enabled
  179. st.policy = policy
  180. st.centerHz = centerHz
  181. // If recording was just disabled, close recording sessions
  182. // but keep listen-only sessions alive.
  183. if wasEnabled && !policy.Enabled {
  184. for id, sess := range st.sessions {
  185. if sess.listenOnly {
  186. continue
  187. }
  188. if len(sess.audioSubs) > 0 {
  189. // Convert to listen-only: close WAV but keep session
  190. convertToListenOnly(sess)
  191. } else {
  192. closeSession(sess, &st.policy)
  193. delete(st.sessions, id)
  194. }
  195. }
  196. }
  197. }
  198. // HasListeners returns true if any sessions have audio subscribers
  199. // or there are pending listen requests. Used by the DSP loop to
  200. // decide whether to feed snippets even when recording is disabled.
  201. func (st *Streamer) HasListeners() bool {
  202. st.mu.Lock()
  203. defer st.mu.Unlock()
  204. return st.hasListenersLocked()
  205. }
  206. func (st *Streamer) hasListenersLocked() bool {
  207. if len(st.pendingListens) > 0 {
  208. return true
  209. }
  210. for _, sess := range st.sessions {
  211. if len(sess.audioSubs) > 0 {
  212. return true
  213. }
  214. }
  215. return false
  216. }
  217. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  218. // Feeds are accepted if:
  219. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  220. // - Any live-listen subscribers exist (listen-only mode)
  221. //
  222. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  223. // data, so items can be passed directly without another copy.
  224. func (st *Streamer) FeedSnippets(items []streamFeedItem) {
  225. st.mu.Lock()
  226. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  227. hasListeners := st.hasListenersLocked()
  228. pending := len(st.pendingListens)
  229. debugLiveAudio := st.policy.DebugLiveAudio
  230. now := time.Now()
  231. if !st.lastFeedTS.IsZero() {
  232. gap := now.Sub(st.lastFeedTS)
  233. if gap > 150*time.Millisecond {
  234. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  235. }
  236. }
  237. st.lastFeedTS = now
  238. st.mu.Unlock()
  239. if debugLiveAudio {
  240. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  241. }
  242. if (!recEnabled && !hasListeners) || len(items) == 0 {
  243. return
  244. }
  245. select {
  246. case st.feedCh <- streamFeedMsg{items: items}:
  247. default:
  248. st.droppedFeed++
  249. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  250. }
  251. }
  252. // processFeed runs in the worker goroutine.
  253. func (st *Streamer) processFeed(msg streamFeedMsg) {
  254. st.mu.Lock()
  255. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  256. hasListeners := st.hasListenersLocked()
  257. now := time.Now()
  258. if !st.lastProcTS.IsZero() {
  259. gap := now.Sub(st.lastProcTS)
  260. if gap > 150*time.Millisecond {
  261. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds())
  262. }
  263. }
  264. st.lastProcTS = now
  265. defer st.mu.Unlock()
  266. if !recEnabled && !hasListeners {
  267. return
  268. }
  269. seen := make(map[int64]bool, len(msg.items))
  270. for i := range msg.items {
  271. item := &msg.items[i]
  272. sig := &item.signal
  273. seen[sig.ID] = true
  274. if sig.ID == 0 || sig.Class == nil {
  275. continue
  276. }
  277. if len(item.snippet) == 0 || item.snipRate <= 0 {
  278. continue
  279. }
  280. // Decide whether this signal needs a session
  281. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  282. needsListen := st.signalHasListenerLocked(sig)
  283. className := "<nil>"
  284. demodName := ""
  285. if sig.Class != nil {
  286. className = string(sig.Class.ModType)
  287. demodName, _ = resolveDemod(sig)
  288. }
  289. if st.policy.DebugLiveAudio {
  290. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  291. }
  292. if !needsRecording && !needsListen {
  293. continue
  294. }
  295. sess, exists := st.sessions[sig.ID]
  296. requestedMode := ""
  297. for _, pl := range st.pendingListens {
  298. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  299. if m := normalizeRequestedMode(pl.mode); m != "" {
  300. requestedMode = m
  301. break
  302. }
  303. }
  304. }
  305. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  306. for _, sub := range sess.audioSubs {
  307. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  308. }
  309. delete(st.sessions, sig.ID)
  310. sess = nil
  311. exists = false
  312. }
  313. if !exists {
  314. if needsRecording {
  315. s, err := st.openRecordingSession(sig, now)
  316. if err != nil {
  317. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  318. sig.ID, sig.CenterHz/1e6, err)
  319. continue
  320. }
  321. st.sessions[sig.ID] = s
  322. sess = s
  323. } else {
  324. s := st.openListenSession(sig, now)
  325. st.sessions[sig.ID] = s
  326. sess = s
  327. }
  328. // Attach any pending listeners
  329. st.attachPendingListeners(sess)
  330. }
  331. // Update metadata
  332. sess.lastFeed = now
  333. sess.centerHz = sig.CenterHz
  334. sess.bwHz = sig.BWHz
  335. if sig.SNRDb > sess.snrDb {
  336. sess.snrDb = sig.SNRDb
  337. }
  338. if sig.PeakDb > sess.peakDb {
  339. sess.peakDb = sig.PeakDb
  340. }
  341. if sig.Class != nil {
  342. sess.class = sig.Class
  343. }
  344. // Demod with persistent state
  345. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  346. if len(audio) > 0 {
  347. if sess.wavSamples == 0 && audioRate > 0 {
  348. sess.sampleRate = audioRate
  349. }
  350. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  351. pcmLen := len(audio) * 2
  352. pcm := sess.growPCM(pcmLen)
  353. for k, s := range audio {
  354. v := int16(clip(s * 32767))
  355. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  356. }
  357. if !sess.listenOnly && sess.wavBuf != nil {
  358. n, err := sess.wavBuf.Write(pcm)
  359. if err != nil {
  360. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  361. } else {
  362. sess.wavSamples += int64(n / 2)
  363. }
  364. }
  365. // Gap logging for live-audio sessions + boundary delta check
  366. if len(sess.audioSubs) > 0 {
  367. if !sess.lastAudioTs.IsZero() {
  368. gap := time.Since(sess.lastAudioTs)
  369. if gap > 150*time.Millisecond {
  370. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  371. }
  372. }
  373. // boundary delta (compare previous last sample with current first sample)
  374. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  375. if sess.lastAudioSet {
  376. if sess.channels > 1 && len(audio) >= 2 {
  377. dL := float64(audio[0] - sess.lastAudioL)
  378. dR := float64(audio[1] - sess.lastAudioR)
  379. if dL < 0 { dL = -dL }
  380. if dR < 0 { dR = -dR }
  381. if dL > 0.2 || dR > 0.2 {
  382. logging.Warn("boundary", "audio_step", "signal", sess.signalID, "dL", dL, "dR", dR)
  383. }
  384. } else {
  385. d := float64(audio[0] - sess.lastAudioL)
  386. if d < 0 { d = -d }
  387. if d > 0.2 {
  388. logging.Warn("boundary", "audio_step", "signal", sess.signalID, "dL", d)
  389. }
  390. }
  391. }
  392. // store last sample
  393. if sess.channels > 1 {
  394. lastIdx := (len(audio)-2)
  395. if lastIdx < 0 { lastIdx = 0 }
  396. sess.lastAudioL = audio[lastIdx]
  397. sess.lastAudioR = audio[lastIdx+1]
  398. } else {
  399. sess.lastAudioL = audio[len(audio)-1]
  400. sess.lastAudioR = 0
  401. }
  402. sess.lastAudioSet = true
  403. }
  404. sess.lastAudioTs = time.Now()
  405. }
  406. st.fanoutPCM(sess, pcm, pcmLen)
  407. }
  408. // Segment split (recording sessions only)
  409. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  410. segIdx := sess.segmentIdx + 1
  411. oldSubs := sess.audioSubs
  412. oldState := sess.captureDSPState()
  413. sess.audioSubs = nil
  414. closeSession(sess, &st.policy)
  415. s, err := st.openRecordingSession(sig, now)
  416. if err != nil {
  417. delete(st.sessions, sig.ID)
  418. continue
  419. }
  420. s.segmentIdx = segIdx
  421. s.audioSubs = oldSubs
  422. s.restoreDSPState(oldState)
  423. st.sessions[sig.ID] = s
  424. }
  425. }
  426. // Close sessions for disappeared signals (with grace period)
  427. for id, sess := range st.sessions {
  428. if seen[id] {
  429. continue
  430. }
  431. gracePeriod := 3 * time.Second
  432. if sess.listenOnly {
  433. gracePeriod = 5 * time.Second
  434. }
  435. if now.Sub(sess.lastFeed) > gracePeriod {
  436. for _, sub := range sess.audioSubs {
  437. close(sub.ch)
  438. }
  439. sess.audioSubs = nil
  440. if !sess.listenOnly {
  441. closeSession(sess, &st.policy)
  442. }
  443. delete(st.sessions, id)
  444. }
  445. }
  446. }
  447. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  448. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  449. if st.policy.DebugLiveAudio {
  450. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  451. }
  452. return true
  453. }
  454. for subID, pl := range st.pendingListens {
  455. delta := math.Abs(sig.CenterHz - pl.freq)
  456. if delta < 200000 {
  457. if st.policy.DebugLiveAudio {
  458. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  459. }
  460. return true
  461. }
  462. }
  463. return false
  464. }
  465. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  466. for subID, pl := range st.pendingListens {
  467. requestedMode := normalizeRequestedMode(pl.mode)
  468. if requestedMode != "" && sess.demodName != requestedMode {
  469. continue
  470. }
  471. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  472. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  473. delete(st.pendingListens, subID)
  474. // Send updated audio_info now that we know the real session params.
  475. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  476. infoJSON, _ := json.Marshal(sess.audioInfo())
  477. tagged := make([]byte, 1+len(infoJSON))
  478. tagged[0] = 0x00 // tag: audio_info
  479. copy(tagged[1:], infoJSON)
  480. select {
  481. case pl.ch <- tagged:
  482. default:
  483. }
  484. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  485. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  486. }
  487. }
  488. }
  489. // CloseAll finalises all sessions and stops the worker goroutine.
  490. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  491. st.mu.Lock()
  492. defer st.mu.Unlock()
  493. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  494. for _, sess := range st.sessions {
  495. out[sess.signalID] = RuntimeSignalInfo{
  496. DemodName: sess.demodName,
  497. PlaybackMode: sess.playbackMode,
  498. StereoState: sess.stereoState,
  499. Channels: sess.channels,
  500. SampleRate: sess.sampleRate,
  501. }
  502. }
  503. return out
  504. }
  505. func (st *Streamer) CloseAll() {
  506. close(st.feedCh)
  507. <-st.done
  508. st.mu.Lock()
  509. defer st.mu.Unlock()
  510. for id, sess := range st.sessions {
  511. for _, sub := range sess.audioSubs {
  512. close(sub.ch)
  513. }
  514. sess.audioSubs = nil
  515. if !sess.listenOnly {
  516. closeSession(sess, &st.policy)
  517. }
  518. delete(st.sessions, id)
  519. }
  520. for _, pl := range st.pendingListens {
  521. close(pl.ch)
  522. }
  523. st.pendingListens = nil
  524. }
  525. // ActiveSessions returns the number of open streaming sessions.
  526. func (st *Streamer) ActiveSessions() int {
  527. st.mu.Lock()
  528. defer st.mu.Unlock()
  529. return len(st.sessions)
  530. }
  531. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  532. //
  533. // LL-2: Returns AudioInfo with correct channels and sample rate.
  534. // LL-3: Returns error only on hard failures (nil streamer etc).
  535. //
  536. // If a matching session exists, attaches immediately. Otherwise, the
  537. // subscriber is held as "pending" and will be attached when a matching
  538. // signal appears in the next DSP frame.
  539. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  540. ch := make(chan []byte, 64)
  541. st.mu.Lock()
  542. defer st.mu.Unlock()
  543. st.nextSub++
  544. subID := st.nextSub
  545. requestedMode := normalizeRequestedMode(mode)
  546. // Try to find a matching session
  547. var bestSess *streamSession
  548. bestDist := math.MaxFloat64
  549. for _, sess := range st.sessions {
  550. if requestedMode != "" && sess.demodName != requestedMode {
  551. continue
  552. }
  553. d := math.Abs(sess.centerHz - freq)
  554. if d < bestDist {
  555. bestDist = d
  556. bestSess = sess
  557. }
  558. }
  559. if bestSess != nil && bestDist < 200000 {
  560. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  561. info := bestSess.audioInfo()
  562. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  563. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  564. return subID, ch, info, nil
  565. }
  566. // No matching session yet — add as pending listener
  567. st.pendingListens[subID] = &pendingListen{
  568. freq: freq,
  569. bw: bw,
  570. mode: mode,
  571. ch: ch,
  572. }
  573. info := defaultAudioInfoForMode(mode)
  574. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  575. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  576. return subID, ch, info, nil
  577. }
  578. // UnsubscribeAudio removes a live-listen subscriber.
  579. func (st *Streamer) UnsubscribeAudio(subID int64) {
  580. st.mu.Lock()
  581. defer st.mu.Unlock()
  582. if pl, ok := st.pendingListens[subID]; ok {
  583. close(pl.ch)
  584. delete(st.pendingListens, subID)
  585. return
  586. }
  587. for _, sess := range st.sessions {
  588. for i, sub := range sess.audioSubs {
  589. if sub.id == subID {
  590. close(sub.ch)
  591. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  592. return
  593. }
  594. }
  595. }
  596. }
  597. // ---------------------------------------------------------------------------
  598. // Session: stateful extraction + demod
  599. // ---------------------------------------------------------------------------
  600. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  601. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  602. // output with zero transient artifacts.
  603. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  604. if len(snippet) == 0 || snipRate <= 0 {
  605. return nil, 0
  606. }
  607. isWFMStereo := sess.demodName == "WFM_STEREO"
  608. isWFM := sess.demodName == "WFM" || isWFMStereo
  609. demodName := sess.demodName
  610. if isWFMStereo {
  611. demodName = "WFM"
  612. }
  613. d := demod.Get(demodName)
  614. if d == nil {
  615. d = demod.Get("NFM")
  616. }
  617. if d == nil {
  618. return nil, 0
  619. }
  620. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  621. // The FM discriminator needs iq[i-1] to compute the first output.
  622. // All FIR filtering is now stateful, so no additional overlap is needed.
  623. var fullSnip []complex64
  624. trimSamples := 0
  625. _ = trimSamples
  626. if len(sess.overlapIQ) == 1 {
  627. fullSnip = make([]complex64, 1+len(snippet))
  628. fullSnip[0] = sess.overlapIQ[0]
  629. copy(fullSnip[1:], snippet)
  630. trimSamples = 1
  631. logging.Debug("discrim", "overlap_applied", "signal", sess.signalID, "snip", len(snippet))
  632. } else {
  633. fullSnip = snippet
  634. }
  635. // Save last sample for next frame's FM discriminator
  636. if len(snippet) > 0 {
  637. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  638. }
  639. // --- Stateful anti-alias FIR + decimation to demod rate ---
  640. demodRate := d.OutputSampleRate()
  641. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  642. if decim1 < 1 {
  643. decim1 = 1
  644. }
  645. actualDemodRate := snipRate / decim1
  646. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  647. var dec []complex64
  648. if decim1 > 1 {
  649. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  650. // Lazy-init or reinit stateful FIR if parameters changed
  651. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  652. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  653. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  654. sess.preDemodRate = snipRate
  655. sess.preDemodCutoff = cutoff
  656. sess.preDemodDecim = decim1
  657. }
  658. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  659. dec = dsp.Decimate(filtered, decim1)
  660. } else {
  661. dec = fullSnip
  662. }
  663. // --- FM Demod ---
  664. audio := d.Demod(dec, actualDemodRate)
  665. if len(audio) == 0 {
  666. return nil, 0
  667. }
  668. // --- Trim the 1-sample FM discriminator overlap ---
  669. // TEMP: skip audio trim to test if per-block trimming causes ticks
  670. // --- Stateful stereo decode with conservative lock/hysteresis ---
  671. channels := 1
  672. if isWFMStereo {
  673. sess.playbackMode = "WFM_STEREO"
  674. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  675. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  676. if locked {
  677. sess.stereoOnCount++
  678. sess.stereoOffCount = 0
  679. if sess.stereoOnCount >= 4 {
  680. sess.stereoEnabled = true
  681. }
  682. } else {
  683. sess.stereoOnCount = 0
  684. sess.stereoOffCount++
  685. if sess.stereoOffCount >= 10 {
  686. sess.stereoEnabled = false
  687. }
  688. }
  689. prevPlayback := sess.playbackMode
  690. prevStereo := sess.stereoState
  691. if sess.stereoEnabled && len(stereoAudio) > 0 {
  692. sess.stereoState = "locked"
  693. audio = stereoAudio
  694. } else {
  695. sess.stereoState = "mono-fallback"
  696. dual := make([]float32, len(audio)*2)
  697. for i, s := range audio {
  698. dual[i*2] = s
  699. dual[i*2+1] = s
  700. }
  701. audio = dual
  702. }
  703. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  704. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  705. }
  706. }
  707. // --- Polyphase resample to exact 48kHz ---
  708. if actualDemodRate != streamAudioRate {
  709. if channels > 1 {
  710. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  711. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  712. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  713. sess.stereoResamplerRate = actualDemodRate
  714. }
  715. audio = sess.stereoResampler.Process(audio)
  716. } else {
  717. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  718. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  719. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  720. sess.monoResamplerRate = actualDemodRate
  721. }
  722. audio = sess.monoResampler.Process(audio)
  723. }
  724. }
  725. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  726. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  727. tau := sess.deemphasisUs * 1e-6
  728. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  729. if channels > 1 {
  730. nFrames := len(audio) / channels
  731. yL, yR := sess.deemphL, sess.deemphR
  732. for i := 0; i < nFrames; i++ {
  733. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  734. audio[i*2] = float32(yL)
  735. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  736. audio[i*2+1] = float32(yR)
  737. }
  738. sess.deemphL, sess.deemphR = yL, yR
  739. } else {
  740. y := sess.deemphL
  741. for i := range audio {
  742. y = alpha*y + (1-alpha)*float64(audio[i])
  743. audio[i] = float32(y)
  744. }
  745. sess.deemphL = y
  746. }
  747. }
  748. if isWFM {
  749. for i := range audio {
  750. audio[i] *= 0.35
  751. }
  752. }
  753. return audio, streamAudioRate
  754. }
  // pllCoefficients derives the proportional (alpha) and integral (beta)
  // loop gains of a Type-II PLL from a loop bandwidth and damping factor.
  // loopBW is in Hz and sampleRate in samples/sec. Both gains are zero when
  // loopBW or sampleRate is not positive.
  func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  	if loopBW <= 0 || sampleRate <= 0 {
  		return 0, 0
  	}
  	// Loop bandwidth normalised to the sample rate.
  	normBW := loopBW / float64(sampleRate)
  	theta := normBW / (damping + 0.25/damping)
  	denom := 1 + 2*damping*theta + theta*theta
  	return (4 * damping * theta) / denom, (4 * theta * theta) / denom
  }
  769. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  770. // Uses persistent FIR filter state across frames for click-free stereo.
  771. // Reuses session scratch buffers to minimize allocations.
  772. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  773. if len(mono) == 0 || sampleRate <= 0 {
  774. return nil, false
  775. }
  776. n := len(mono)
  777. // Rebuild rate-dependent stereo filters when sampleRate changes
  778. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  779. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  780. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  781. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  782. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  783. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  784. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  785. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  786. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  787. sess.stereoFilterRate = sampleRate
  788. // Initialize PLL for 19kHz pilot tracking.
  789. sess.pilotPhase = 0
  790. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  791. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  792. sess.pilotErrAvg = 0
  793. sess.pilotI = 0
  794. sess.pilotQ = 0
  795. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  796. }
  797. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  798. scratch := sess.growAudio(n * 5)
  799. lpr := scratch[:n]
  800. bpfLR := scratch[n : 2*n]
  801. lr := scratch[2*n : 3*n]
  802. work1 := scratch[3*n : 4*n]
  803. work2 := scratch[4*n : 5*n]
  804. sess.stereoLPF.ProcessInto(mono, lpr)
  805. // 23-53kHz bandpass for L-R DSB-SC.
  806. sess.stereoBPHi.ProcessInto(mono, work1)
  807. sess.stereoBPLo.ProcessInto(mono, work2)
  808. for i := 0; i < n; i++ {
  809. bpfLR[i] = work1[i] - work2[i]
  810. }
  811. // 19kHz pilot bandpass for PLL.
  812. sess.pilotLPFHi.ProcessInto(mono, work1)
  813. sess.pilotLPFLo.ProcessInto(mono, work2)
  814. for i := 0; i < n; i++ {
  815. work1[i] = work1[i] - work2[i]
  816. }
  817. pilot := work1
  818. phase := sess.pilotPhase
  819. freq := sess.pilotFreq
  820. alpha := sess.pilotAlpha
  821. beta := sess.pilotBeta
  822. iState := sess.pilotI
  823. qState := sess.pilotQ
  824. lpAlpha := sess.pilotLPAlpha
  825. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  826. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  827. var pilotPower float64
  828. var totalPower float64
  829. var errSum float64
  830. for i := 0; i < n; i++ {
  831. p := float64(pilot[i])
  832. sinP, cosP := math.Sincos(phase)
  833. iMix := p * cosP
  834. qMix := p * -sinP
  835. iState += lpAlpha * (iMix - iState)
  836. qState += lpAlpha * (qMix - qState)
  837. err := math.Atan2(qState, iState)
  838. freq += beta * err
  839. if freq < minFreq {
  840. freq = minFreq
  841. } else if freq > maxFreq {
  842. freq = maxFreq
  843. }
  844. phase += freq + alpha*err
  845. if phase > 2*math.Pi {
  846. phase -= 2 * math.Pi
  847. } else if phase < 0 {
  848. phase += 2 * math.Pi
  849. }
  850. totalPower += float64(mono[i]) * float64(mono[i])
  851. pilotPower += p * p
  852. errSum += math.Abs(err)
  853. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  854. }
  855. sess.pilotPhase = phase
  856. sess.pilotFreq = freq
  857. sess.pilotI = iState
  858. sess.pilotQ = qState
  859. blockErr := errSum / float64(n)
  860. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  861. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  862. pilotRatio := 0.0
  863. if totalPower > 0 {
  864. pilotRatio = pilotPower / totalPower
  865. }
  866. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  867. // Lock heuristics: pilot power fraction and PLL phase error stability.
  868. // Pilot power is a small but stable fraction of composite energy; require
  869. // a modest floor plus PLL settling to avoid flapping in noise.
  870. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  871. out := make([]float32, n*2)
  872. for i := 0; i < n; i++ {
  873. out[i*2] = 0.5 * (lpr[i] + lr[i])
  874. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  875. }
  876. return out, locked
  877. }
  878. // dspStateSnapshot captures persistent DSP state for segment splits.
  879. type dspStateSnapshot struct {
  880. overlapIQ []complex64
  881. deemphL float64
  882. deemphR float64
  883. pilotPhase float64
  884. pilotFreq float64
  885. pilotAlpha float64
  886. pilotBeta float64
  887. pilotErrAvg float64
  888. pilotI float64
  889. pilotQ float64
  890. pilotLPAlpha float64
  891. monoResampler *dsp.Resampler
  892. monoResamplerRate int
  893. stereoResampler *dsp.StereoResampler
  894. stereoResamplerRate int
  895. stereoLPF *dsp.StatefulFIRReal
  896. stereoFilterRate int
  897. stereoBPHi *dsp.StatefulFIRReal
  898. stereoBPLo *dsp.StatefulFIRReal
  899. stereoLRLPF *dsp.StatefulFIRReal
  900. stereoAALPF *dsp.StatefulFIRReal
  901. pilotLPFHi *dsp.StatefulFIRReal
  902. pilotLPFLo *dsp.StatefulFIRReal
  903. preDemodFIR *dsp.StatefulFIRComplex
  904. preDemodDecim int
  905. preDemodRate int
  906. preDemodCutoff float64
  907. }
  908. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  909. return dspStateSnapshot{
  910. overlapIQ: sess.overlapIQ,
  911. deemphL: sess.deemphL,
  912. deemphR: sess.deemphR,
  913. pilotPhase: sess.pilotPhase,
  914. pilotFreq: sess.pilotFreq,
  915. pilotAlpha: sess.pilotAlpha,
  916. pilotBeta: sess.pilotBeta,
  917. pilotErrAvg: sess.pilotErrAvg,
  918. pilotI: sess.pilotI,
  919. pilotQ: sess.pilotQ,
  920. pilotLPAlpha: sess.pilotLPAlpha,
  921. monoResampler: sess.monoResampler,
  922. monoResamplerRate: sess.monoResamplerRate,
  923. stereoResampler: sess.stereoResampler,
  924. stereoResamplerRate: sess.stereoResamplerRate,
  925. stereoLPF: sess.stereoLPF,
  926. stereoFilterRate: sess.stereoFilterRate,
  927. stereoBPHi: sess.stereoBPHi,
  928. stereoBPLo: sess.stereoBPLo,
  929. stereoLRLPF: sess.stereoLRLPF,
  930. stereoAALPF: sess.stereoAALPF,
  931. pilotLPFHi: sess.pilotLPFHi,
  932. pilotLPFLo: sess.pilotLPFLo,
  933. preDemodFIR: sess.preDemodFIR,
  934. preDemodDecim: sess.preDemodDecim,
  935. preDemodRate: sess.preDemodRate,
  936. preDemodCutoff: sess.preDemodCutoff,
  937. }
  938. }
  939. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  940. sess.overlapIQ = s.overlapIQ
  941. sess.deemphL = s.deemphL
  942. sess.deemphR = s.deemphR
  943. sess.pilotPhase = s.pilotPhase
  944. sess.pilotFreq = s.pilotFreq
  945. sess.pilotAlpha = s.pilotAlpha
  946. sess.pilotBeta = s.pilotBeta
  947. sess.pilotErrAvg = s.pilotErrAvg
  948. sess.pilotI = s.pilotI
  949. sess.pilotQ = s.pilotQ
  950. sess.pilotLPAlpha = s.pilotLPAlpha
  951. sess.monoResampler = s.monoResampler
  952. sess.monoResamplerRate = s.monoResamplerRate
  953. sess.stereoResampler = s.stereoResampler
  954. sess.stereoResamplerRate = s.stereoResamplerRate
  955. sess.stereoLPF = s.stereoLPF
  956. sess.stereoFilterRate = s.stereoFilterRate
  957. sess.stereoBPHi = s.stereoBPHi
  958. sess.stereoBPLo = s.stereoBPLo
  959. sess.stereoLRLPF = s.stereoLRLPF
  960. sess.stereoAALPF = s.stereoAALPF
  961. sess.pilotLPFHi = s.pilotLPFHi
  962. sess.pilotLPFLo = s.pilotLPFLo
  963. sess.preDemodFIR = s.preDemodFIR
  964. sess.preDemodDecim = s.preDemodDecim
  965. sess.preDemodRate = s.preDemodRate
  966. sess.preDemodCutoff = s.preDemodCutoff
  967. }
  968. // ---------------------------------------------------------------------------
  969. // Session management helpers
  970. // ---------------------------------------------------------------------------
  971. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  972. outputDir := st.policy.OutputDir
  973. if outputDir == "" {
  974. outputDir = "data/recordings"
  975. }
  976. demodName, channels := resolveDemod(sig)
  977. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  978. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  979. dir := filepath.Join(outputDir, dirName)
  980. if err := os.MkdirAll(dir, 0o755); err != nil {
  981. return nil, err
  982. }
  983. wavPath := filepath.Join(dir, "audio.wav")
  984. f, err := os.Create(wavPath)
  985. if err != nil {
  986. return nil, err
  987. }
  988. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  989. f.Close()
  990. return nil, err
  991. }
  992. playbackMode, stereoState := initialPlaybackState(demodName)
  993. sess := &streamSession{
  994. signalID: sig.ID,
  995. centerHz: sig.CenterHz,
  996. bwHz: sig.BWHz,
  997. snrDb: sig.SNRDb,
  998. peakDb: sig.PeakDb,
  999. class: sig.Class,
  1000. startTime: now,
  1001. lastFeed: now,
  1002. dir: dir,
  1003. wavFile: f,
  1004. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1005. sampleRate: streamAudioRate,
  1006. channels: channels,
  1007. demodName: demodName,
  1008. playbackMode: playbackMode,
  1009. stereoState: stereoState,
  1010. deemphasisUs: st.policy.DeemphasisUs,
  1011. }
  1012. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1013. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1014. return sess, nil
  1015. }
  1016. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1017. demodName, channels := resolveDemod(sig)
  1018. for _, pl := range st.pendingListens {
  1019. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1020. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1021. demodName = requested
  1022. if demodName == "WFM_STEREO" {
  1023. channels = 2
  1024. } else if d := demod.Get(demodName); d != nil {
  1025. channels = d.Channels()
  1026. } else {
  1027. channels = 1
  1028. }
  1029. break
  1030. }
  1031. }
  1032. }
  1033. playbackMode, stereoState := initialPlaybackState(demodName)
  1034. sess := &streamSession{
  1035. signalID: sig.ID,
  1036. centerHz: sig.CenterHz,
  1037. bwHz: sig.BWHz,
  1038. snrDb: sig.SNRDb,
  1039. peakDb: sig.PeakDb,
  1040. class: sig.Class,
  1041. startTime: now,
  1042. lastFeed: now,
  1043. listenOnly: true,
  1044. sampleRate: streamAudioRate,
  1045. channels: channels,
  1046. demodName: demodName,
  1047. playbackMode: playbackMode,
  1048. stereoState: stereoState,
  1049. deemphasisUs: st.policy.DeemphasisUs,
  1050. }
  1051. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1052. sig.ID, sig.CenterHz/1e6, demodName)
  1053. return sess
  1054. }
  1055. func resolveDemod(sig *detector.Signal) (string, int) {
  1056. demodName := "NFM"
  1057. if sig.Class != nil {
  1058. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1059. demodName = n
  1060. }
  1061. }
  1062. channels := 1
  1063. if demodName == "WFM_STEREO" {
  1064. channels = 2
  1065. } else if d := demod.Get(demodName); d != nil {
  1066. channels = d.Channels()
  1067. }
  1068. return demodName, channels
  1069. }
  // initialPlaybackState returns the starting (playbackMode, stereoState) pair
  // for a demodulator. The playback mode is the demodulator name itself; the
  // stereo state is "searching" for "WFM_STEREO" and "mono" for everything else.
  func initialPlaybackState(demodName string) (string, string) {
  	if demodName == "WFM_STEREO" {
  		return demodName, "searching"
  	}
  	return demodName, "mono"
  }
  1078. func (sess *streamSession) audioInfo() AudioInfo {
  1079. return AudioInfo{
  1080. SampleRate: sess.sampleRate,
  1081. Channels: sess.channels,
  1082. Format: "s16le",
  1083. DemodName: sess.demodName,
  1084. PlaybackMode: sess.playbackMode,
  1085. StereoState: sess.stereoState,
  1086. }
  1087. }
  1088. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1089. infoJSON, _ := json.Marshal(info)
  1090. tagged := make([]byte, 1+len(infoJSON))
  1091. tagged[0] = 0x00 // tag: audio_info
  1092. copy(tagged[1:], infoJSON)
  1093. for _, sub := range subs {
  1094. select {
  1095. case sub.ch <- tagged:
  1096. default:
  1097. }
  1098. }
  1099. }
  1100. func defaultAudioInfoForMode(mode string) AudioInfo {
  1101. demodName := "NFM"
  1102. if requested := normalizeRequestedMode(mode); requested != "" {
  1103. demodName = requested
  1104. }
  1105. channels := 1
  1106. if demodName == "WFM_STEREO" {
  1107. channels = 2
  1108. } else if d := demod.Get(demodName); d != nil {
  1109. channels = d.Channels()
  1110. }
  1111. playbackMode, stereoState := initialPlaybackState(demodName)
  1112. return AudioInfo{
  1113. SampleRate: streamAudioRate,
  1114. Channels: channels,
  1115. Format: "s16le",
  1116. DemodName: demodName,
  1117. PlaybackMode: playbackMode,
  1118. StereoState: stereoState,
  1119. }
  1120. }
  // normalizeRequestedMode canonicalises a user-supplied demodulation mode.
  // Surrounding whitespace is trimmed and the name is upper-cased; the result
  // is returned only when it is one of the supported modes. Empty input,
  // "AUTO" and anything unrecognised all yield "".
  func normalizeRequestedMode(mode string) string {
  	canon := strings.ToUpper(strings.TrimSpace(mode))
  	switch canon {
  	case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  		return canon
  	}
  	return ""
  }
  1131. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1132. func (sess *streamSession) growIQ(n int) []complex64 {
  1133. if cap(sess.scratchIQ) >= n {
  1134. return sess.scratchIQ[:n]
  1135. }
  1136. sess.scratchIQ = make([]complex64, n, n*5/4)
  1137. return sess.scratchIQ
  1138. }
  1139. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1140. func (sess *streamSession) growAudio(n int) []float32 {
  1141. if cap(sess.scratchAudio) >= n {
  1142. return sess.scratchAudio[:n]
  1143. }
  1144. sess.scratchAudio = make([]float32, n, n*5/4)
  1145. return sess.scratchAudio
  1146. }
  1147. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1148. func (sess *streamSession) growPCM(n int) []byte {
  1149. if cap(sess.scratchPCM) >= n {
  1150. return sess.scratchPCM[:n]
  1151. }
  1152. sess.scratchPCM = make([]byte, n, n*5/4)
  1153. return sess.scratchPCM
  1154. }
  1155. func convertToListenOnly(sess *streamSession) {
  1156. if sess.wavBuf != nil {
  1157. _ = sess.wavBuf.Flush()
  1158. }
  1159. if sess.wavFile != nil {
  1160. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1161. sess.wavFile.Close()
  1162. }
  1163. sess.wavFile = nil
  1164. sess.wavBuf = nil
  1165. sess.listenOnly = true
  1166. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1167. }
  1168. func closeSession(sess *streamSession, policy *Policy) {
  1169. if sess.listenOnly {
  1170. return
  1171. }
  1172. if sess.wavBuf != nil {
  1173. _ = sess.wavBuf.Flush()
  1174. }
  1175. if sess.wavFile != nil {
  1176. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1177. sess.wavFile.Close()
  1178. sess.wavFile = nil
  1179. sess.wavBuf = nil
  1180. }
  1181. dur := sess.lastFeed.Sub(sess.startTime)
  1182. files := map[string]any{
  1183. "audio": "audio.wav",
  1184. "audio_sample_rate": sess.sampleRate,
  1185. "audio_channels": sess.channels,
  1186. "audio_demod": sess.demodName,
  1187. "recording_mode": "streaming",
  1188. }
  1189. meta := Meta{
  1190. EventID: sess.signalID,
  1191. Start: sess.startTime,
  1192. End: sess.lastFeed,
  1193. CenterHz: sess.centerHz,
  1194. BandwidthHz: sess.bwHz,
  1195. SampleRate: sess.sampleRate,
  1196. SNRDb: sess.snrDb,
  1197. PeakDb: sess.peakDb,
  1198. Class: sess.class,
  1199. DurationMs: dur.Milliseconds(),
  1200. Files: files,
  1201. }
  1202. b, err := json.MarshalIndent(meta, "", " ")
  1203. if err == nil {
  1204. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1205. }
  1206. if policy != nil {
  1207. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1208. }
  1209. }
  1210. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1211. if len(sess.audioSubs) == 0 {
  1212. return
  1213. }
  1214. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1215. tagged := make([]byte, 1+pcmLen)
  1216. tagged[0] = 0x01
  1217. copy(tagged[1:], pcm[:pcmLen])
  1218. alive := sess.audioSubs[:0]
  1219. for _, sub := range sess.audioSubs {
  1220. select {
  1221. case sub.ch <- tagged:
  1222. default:
  1223. st.droppedPCM++
  1224. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1225. }
  1226. alive = append(alive, sub)
  1227. }
  1228. sess.audioSubs = alive
  1229. }
  1230. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1231. if len(st.policy.ClassFilter) == 0 {
  1232. return true
  1233. }
  1234. if cls == nil {
  1235. return false
  1236. }
  1237. for _, f := range st.policy.ClassFilter {
  1238. if strings.EqualFold(f, string(cls.ModType)) {
  1239. return true
  1240. }
  1241. }
  1242. return false
  1243. }
  var (
  	// ErrNoSession is the sentinel error reported when no active or pending
  	// session matches the requested frequency. Compare with errors.Is.
  	ErrNoSession = errors.New("no active or pending session for this frequency")
  )
  1246. // ---------------------------------------------------------------------------
  1247. // WAV header helpers
  1248. // ---------------------------------------------------------------------------
  // writeStreamWAVHeader writes a 44-byte canonical PCM WAV header (16-bit
  // samples) at f's current offset. A channel count of zero or less is
  // treated as mono. The RIFF size is written as 36 and the data size as 0;
  // both are placeholders to be patched once the amount of audio is known.
  // It returns the error from the underlying write, if any.
  func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  	if channels <= 0 {
  		channels = 1
  	}

  	const bytesPerSample = 2
  	blockAlign := channels * bytesPerSample
  	le := binary.LittleEndian

  	var hdr [44]byte

  	// RIFF chunk descriptor.
  	copy(hdr[0:4], "RIFF")
  	le.PutUint32(hdr[4:8], 36) // placeholder: header only, no data yet
  	copy(hdr[8:12], "WAVE")

  	// "fmt " sub-chunk.
  	copy(hdr[12:16], "fmt ")
  	le.PutUint32(hdr[16:20], 16) // sub-chunk size
  	le.PutUint16(hdr[20:22], 1)  // audio format 1
  	le.PutUint16(hdr[22:24], uint16(channels))
  	le.PutUint32(hdr[24:28], uint32(sampleRate))
  	le.PutUint32(hdr[28:32], uint32(sampleRate*blockAlign)) // byte rate
  	le.PutUint16(hdr[32:34], uint16(blockAlign))
  	le.PutUint16(hdr[34:36], 8*bytesPerSample) // bits per sample

  	// "data" sub-chunk; bytes 40-43 (data size) stay zero as a placeholder.
  	copy(hdr[36:40], "data")

  	_, err := f.Write(hdr[:])
  	return err
  }
  // fixStreamWAVHeader patches the size-dependent fields of a 44-byte WAV
  // header already present at the start of f:
  //
  //	offset  4: RIFF chunk size (36 + data size)
  //	offset 24: sample rate
  //	offset 28: byte rate (sampleRate * channels * 2)
  //	offset 40: data chunk size
  //
  // The data size is totalSamples * 2 bytes, truncated to 32 bits.
  // NOTE(review): this presumably means totalSamples counts individual 16-bit
  // samples across all channels rather than frames; confirm against the caller.
  //
  // A channel count of zero or less is treated as mono, matching
  // writeStreamWAVHeader, so the byte rate stays consistent with the block
  // alignment written when the file was created.
  //
  // Patching is best-effort: it stops at the first failed seek and ignores
  // write errors, because it runs while a session is being torn down and
  // there is no caller to report to.
  func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  	if channels <= 0 {
  		channels = 1
  	}
  	dataSize := uint32(totalSamples * 2)

  	patches := [...]struct {
  		offset int64
  		value  uint32
  	}{
  		{4, 36 + dataSize},
  		{24, uint32(sampleRate)},
  		{28, uint32(sampleRate * channels * 2)},
  		{40, dataSize},
  	}

  	var buf [4]byte
  	for _, p := range patches {
  		// whence 0 = relative to the start of the file.
  		if _, err := f.Seek(p.offset, 0); err != nil {
  			return
  		}
  		binary.LittleEndian.PutUint32(buf[:], p.value)
  		_, _ = f.Write(buf[:]) // best-effort, see doc comment
  	}
  }