Wideband autonomous SDR analysis engine forked from sdr-visual-suite
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1383 lines
39KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. lastAudioTs time.Time
  36. // listenOnly sessions have no WAV file and no disk I/O.
  37. // They exist solely to feed audio to live-listen subscribers.
  38. listenOnly bool
  39. // Recording state (nil/zero for listen-only sessions)
  40. dir string
  41. wavFile *os.File
  42. wavBuf *bufio.Writer
  43. wavSamples int64
  44. segmentIdx int
  45. sampleRate int // actual output audio sample rate (always streamAudioRate)
  46. channels int
  47. demodName string
  48. // --- Persistent DSP state for click-free streaming ---
  49. // Overlap-save: tail of previous extracted IQ snippet.
  50. overlapIQ []complex64
  51. // De-emphasis IIR state (persists across frames)
  52. deemphL float64
  53. deemphR float64
  54. // Stereo lock state for live WFM streaming
  55. stereoEnabled bool
  56. stereoOnCount int
  57. stereoOffCount int
  58. // Pilot-locked stereo PLL state (19kHz pilot)
  59. pilotPhase float64
  60. pilotFreq float64
  61. pilotAlpha float64
  62. pilotBeta float64
  63. pilotErrAvg float64
  64. pilotI float64
  65. pilotQ float64
  66. pilotLPAlpha float64
  67. // Polyphase resampler (replaces integer-decimate hack)
  68. monoResampler *dsp.Resampler
  69. monoResamplerRate int
  70. stereoResampler *dsp.StereoResampler
  71. stereoResamplerRate int
  72. // AQ-4: Stateful FIR filters for click-free stereo decode
  73. stereoFilterRate int
  74. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  75. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  76. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  77. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  78. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  79. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  80. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  81. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  82. // and avoids per-frame FIR recomputation)
  83. preDemodFIR *dsp.StatefulFIRComplex
  84. preDemodDecim int // cached decimation factor
  85. preDemodRate int // cached snipRate this FIR was built for
  86. preDemodCutoff float64 // cached cutoff
  87. // AQ-2: De-emphasis config (µs, 0 = disabled)
  88. deemphasisUs float64
  89. // Scratch buffers — reused across frames to avoid GC pressure.
  90. // Grown as needed, never shrunk.
  91. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  92. scratchAudio []float32 // for stereo decode intermediates
  93. scratchPCM []byte // for PCM encoding
  94. // live-listen subscribers
  95. audioSubs []audioSub
  96. }
  // audioSub is a single live-listen subscriber attached to a session.
  type audioSub struct {
  	// id is the subscriber id used to find the entry again on unsubscribe.
  	id int64
  	// ch is the channel the subscriber's byte messages are delivered on.
  	ch chan []byte
  }
  // RuntimeSignalInfo is a snapshot of the demod and output parameters of one
  // active session, as reported by Streamer.RuntimeInfoBySignalID.
  type RuntimeSignalInfo struct {
  	DemodName, PlaybackMode, StereoState string
  	Channels, SampleRate int
  }
  // AudioInfo describes the audio format of a live-listen subscription.
  // It is sent to the WebSocket client as the first message.
  type AudioInfo struct {
  	// SampleRate is the audio sample rate.
  	SampleRate int `json:"sample_rate"`
  	// Channels is the number of audio channels.
  	Channels int `json:"channels"`
  	// Format names the sample encoding; always "s16le".
  	Format string `json:"format"`
  	// DemodName is the demodulator in use.
  	DemodName string `json:"demod"`
  	// PlaybackMode is omitted from the JSON when empty.
  	PlaybackMode string `json:"playback_mode,omitempty"`
  	// StereoState is omitted from the JSON when empty.
  	StereoState string `json:"stereo_state,omitempty"`
  }
  // Output audio format and resampler quality settings.
  const (
  	// streamAudioRate is the sample rate of all audio leaving a session.
  	streamAudioRate = 48000
  	// resamplerTaps is the number of taps per polyphase arm — good quality.
  	resamplerTaps = 32
  )
  122. // ---------------------------------------------------------------------------
  123. // Streamer — manages all active streaming sessions
  124. // ---------------------------------------------------------------------------
  125. type streamFeedItem struct {
  126. signal detector.Signal
  127. snippet []complex64
  128. snipRate int
  129. }
  130. type streamFeedMsg struct {
  131. items []streamFeedItem
  132. }
  133. type Streamer struct {
  134. mu sync.Mutex
  135. sessions map[int64]*streamSession
  136. policy Policy
  137. centerHz float64
  138. nextSub int64
  139. feedCh chan streamFeedMsg
  140. done chan struct{}
  141. droppedFeed uint64
  142. droppedPCM uint64
  143. // pendingListens are subscribers waiting for a matching session.
  144. pendingListens map[int64]*pendingListen
  145. }
  // pendingListen is a live-listen request that has not been attached to a
  // session yet.
  type pendingListen struct {
  	// freq and bw are the requested centre frequency and bandwidth.
  	freq, bw float64
  	// mode is the requested demod mode as supplied by the subscriber.
  	mode string
  	// ch is the subscriber's delivery channel.
  	ch chan []byte
  }
  152. func newStreamer(policy Policy, centerHz float64) *Streamer {
  153. st := &Streamer{
  154. sessions: make(map[int64]*streamSession),
  155. policy: policy,
  156. centerHz: centerHz,
  157. feedCh: make(chan streamFeedMsg, 2),
  158. done: make(chan struct{}),
  159. pendingListens: make(map[int64]*pendingListen),
  160. }
  161. go st.worker()
  162. return st
  163. }
  164. func (st *Streamer) worker() {
  165. for msg := range st.feedCh {
  166. st.processFeed(msg)
  167. }
  168. close(st.done)
  169. }
  170. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  171. st.mu.Lock()
  172. defer st.mu.Unlock()
  173. wasEnabled := st.policy.Enabled
  174. st.policy = policy
  175. st.centerHz = centerHz
  176. // If recording was just disabled, close recording sessions
  177. // but keep listen-only sessions alive.
  178. if wasEnabled && !policy.Enabled {
  179. for id, sess := range st.sessions {
  180. if sess.listenOnly {
  181. continue
  182. }
  183. if len(sess.audioSubs) > 0 {
  184. // Convert to listen-only: close WAV but keep session
  185. convertToListenOnly(sess)
  186. } else {
  187. closeSession(sess, &st.policy)
  188. delete(st.sessions, id)
  189. }
  190. }
  191. }
  192. }
  193. // HasListeners returns true if any sessions have audio subscribers
  194. // or there are pending listen requests. Used by the DSP loop to
  195. // decide whether to feed snippets even when recording is disabled.
  196. func (st *Streamer) HasListeners() bool {
  197. st.mu.Lock()
  198. defer st.mu.Unlock()
  199. return st.hasListenersLocked()
  200. }
  201. func (st *Streamer) hasListenersLocked() bool {
  202. if len(st.pendingListens) > 0 {
  203. return true
  204. }
  205. for _, sess := range st.sessions {
  206. if len(sess.audioSubs) > 0 {
  207. return true
  208. }
  209. }
  210. return false
  211. }
  212. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  213. // Feeds are accepted if:
  214. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  215. // - Any live-listen subscribers exist (listen-only mode)
  216. //
  217. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  218. // data, so items can be passed directly without another copy.
  219. func (st *Streamer) FeedSnippets(items []streamFeedItem) {
  220. st.mu.Lock()
  221. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  222. hasListeners := st.hasListenersLocked()
  223. pending := len(st.pendingListens)
  224. debugLiveAudio := st.policy.DebugLiveAudio
  225. st.mu.Unlock()
  226. if debugLiveAudio {
  227. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  228. }
  229. if (!recEnabled && !hasListeners) || len(items) == 0 {
  230. return
  231. }
  232. select {
  233. case st.feedCh <- streamFeedMsg{items: items}:
  234. default:
  235. st.droppedFeed++
  236. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  237. }
  238. }
  239. // processFeed runs in the worker goroutine.
  240. func (st *Streamer) processFeed(msg streamFeedMsg) {
  241. st.mu.Lock()
  242. defer st.mu.Unlock()
  243. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  244. hasListeners := st.hasListenersLocked()
  245. if !recEnabled && !hasListeners {
  246. return
  247. }
  248. now := time.Now()
  249. seen := make(map[int64]bool, len(msg.items))
  250. for i := range msg.items {
  251. item := &msg.items[i]
  252. sig := &item.signal
  253. seen[sig.ID] = true
  254. if sig.ID == 0 || sig.Class == nil {
  255. continue
  256. }
  257. if len(item.snippet) == 0 || item.snipRate <= 0 {
  258. continue
  259. }
  260. // Decide whether this signal needs a session
  261. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  262. needsListen := st.signalHasListenerLocked(sig)
  263. className := "<nil>"
  264. demodName := ""
  265. if sig.Class != nil {
  266. className = string(sig.Class.ModType)
  267. demodName, _ = resolveDemod(sig)
  268. }
  269. if st.policy.DebugLiveAudio {
  270. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  271. }
  272. if !needsRecording && !needsListen {
  273. continue
  274. }
  275. sess, exists := st.sessions[sig.ID]
  276. requestedMode := ""
  277. for _, pl := range st.pendingListens {
  278. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  279. if m := normalizeRequestedMode(pl.mode); m != "" {
  280. requestedMode = m
  281. break
  282. }
  283. }
  284. }
  285. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  286. for _, sub := range sess.audioSubs {
  287. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  288. }
  289. delete(st.sessions, sig.ID)
  290. sess = nil
  291. exists = false
  292. }
  293. if !exists {
  294. if needsRecording {
  295. s, err := st.openRecordingSession(sig, now)
  296. if err != nil {
  297. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  298. sig.ID, sig.CenterHz/1e6, err)
  299. continue
  300. }
  301. st.sessions[sig.ID] = s
  302. sess = s
  303. } else {
  304. s := st.openListenSession(sig, now)
  305. st.sessions[sig.ID] = s
  306. sess = s
  307. }
  308. // Attach any pending listeners
  309. st.attachPendingListeners(sess)
  310. }
  311. // Update metadata
  312. sess.lastFeed = now
  313. sess.centerHz = sig.CenterHz
  314. sess.bwHz = sig.BWHz
  315. if sig.SNRDb > sess.snrDb {
  316. sess.snrDb = sig.SNRDb
  317. }
  318. if sig.PeakDb > sess.peakDb {
  319. sess.peakDb = sig.PeakDb
  320. }
  321. if sig.Class != nil {
  322. sess.class = sig.Class
  323. }
  324. // Demod with persistent state
  325. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  326. if len(audio) > 0 {
  327. if sess.wavSamples == 0 && audioRate > 0 {
  328. sess.sampleRate = audioRate
  329. }
  330. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  331. pcmLen := len(audio) * 2
  332. pcm := sess.growPCM(pcmLen)
  333. for k, s := range audio {
  334. v := int16(clip(s * 32767))
  335. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  336. }
  337. if !sess.listenOnly && sess.wavBuf != nil {
  338. n, err := sess.wavBuf.Write(pcm)
  339. if err != nil {
  340. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  341. } else {
  342. sess.wavSamples += int64(n / 2)
  343. }
  344. }
  345. // Gap logging for live-audio sessions
  346. if len(sess.audioSubs) > 0 {
  347. if !sess.lastAudioTs.IsZero() {
  348. gap := time.Since(sess.lastAudioTs)
  349. if gap > 150*time.Millisecond {
  350. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  351. }
  352. }
  353. sess.lastAudioTs = time.Now()
  354. }
  355. st.fanoutPCM(sess, pcm, pcmLen)
  356. }
  357. // Segment split (recording sessions only)
  358. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  359. segIdx := sess.segmentIdx + 1
  360. oldSubs := sess.audioSubs
  361. oldState := sess.captureDSPState()
  362. sess.audioSubs = nil
  363. closeSession(sess, &st.policy)
  364. s, err := st.openRecordingSession(sig, now)
  365. if err != nil {
  366. delete(st.sessions, sig.ID)
  367. continue
  368. }
  369. s.segmentIdx = segIdx
  370. s.audioSubs = oldSubs
  371. s.restoreDSPState(oldState)
  372. st.sessions[sig.ID] = s
  373. }
  374. }
  375. // Close sessions for disappeared signals (with grace period)
  376. for id, sess := range st.sessions {
  377. if seen[id] {
  378. continue
  379. }
  380. gracePeriod := 3 * time.Second
  381. if sess.listenOnly {
  382. gracePeriod = 5 * time.Second
  383. }
  384. if now.Sub(sess.lastFeed) > gracePeriod {
  385. for _, sub := range sess.audioSubs {
  386. close(sub.ch)
  387. }
  388. sess.audioSubs = nil
  389. if !sess.listenOnly {
  390. closeSession(sess, &st.policy)
  391. }
  392. delete(st.sessions, id)
  393. }
  394. }
  395. }
  396. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  397. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  398. if st.policy.DebugLiveAudio {
  399. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  400. }
  401. return true
  402. }
  403. for subID, pl := range st.pendingListens {
  404. delta := math.Abs(sig.CenterHz - pl.freq)
  405. if delta < 200000 {
  406. if st.policy.DebugLiveAudio {
  407. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  408. }
  409. return true
  410. }
  411. }
  412. return false
  413. }
  414. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  415. for subID, pl := range st.pendingListens {
  416. requestedMode := normalizeRequestedMode(pl.mode)
  417. if requestedMode != "" && sess.demodName != requestedMode {
  418. continue
  419. }
  420. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  421. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  422. delete(st.pendingListens, subID)
  423. // Send updated audio_info now that we know the real session params.
  424. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  425. infoJSON, _ := json.Marshal(sess.audioInfo())
  426. tagged := make([]byte, 1+len(infoJSON))
  427. tagged[0] = 0x00 // tag: audio_info
  428. copy(tagged[1:], infoJSON)
  429. select {
  430. case pl.ch <- tagged:
  431. default:
  432. }
  433. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  434. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  435. }
  436. }
  437. }
  438. // CloseAll finalises all sessions and stops the worker goroutine.
  439. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  440. st.mu.Lock()
  441. defer st.mu.Unlock()
  442. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  443. for _, sess := range st.sessions {
  444. out[sess.signalID] = RuntimeSignalInfo{
  445. DemodName: sess.demodName,
  446. PlaybackMode: sess.playbackMode,
  447. StereoState: sess.stereoState,
  448. Channels: sess.channels,
  449. SampleRate: sess.sampleRate,
  450. }
  451. }
  452. return out
  453. }
  454. func (st *Streamer) CloseAll() {
  455. close(st.feedCh)
  456. <-st.done
  457. st.mu.Lock()
  458. defer st.mu.Unlock()
  459. for id, sess := range st.sessions {
  460. for _, sub := range sess.audioSubs {
  461. close(sub.ch)
  462. }
  463. sess.audioSubs = nil
  464. if !sess.listenOnly {
  465. closeSession(sess, &st.policy)
  466. }
  467. delete(st.sessions, id)
  468. }
  469. for _, pl := range st.pendingListens {
  470. close(pl.ch)
  471. }
  472. st.pendingListens = nil
  473. }
  474. // ActiveSessions returns the number of open streaming sessions.
  475. func (st *Streamer) ActiveSessions() int {
  476. st.mu.Lock()
  477. defer st.mu.Unlock()
  478. return len(st.sessions)
  479. }
  480. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  481. //
  482. // LL-2: Returns AudioInfo with correct channels and sample rate.
  483. // LL-3: Returns error only on hard failures (nil streamer etc).
  484. //
  485. // If a matching session exists, attaches immediately. Otherwise, the
  486. // subscriber is held as "pending" and will be attached when a matching
  487. // signal appears in the next DSP frame.
  488. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  489. ch := make(chan []byte, 64)
  490. st.mu.Lock()
  491. defer st.mu.Unlock()
  492. st.nextSub++
  493. subID := st.nextSub
  494. requestedMode := normalizeRequestedMode(mode)
  495. // Try to find a matching session
  496. var bestSess *streamSession
  497. bestDist := math.MaxFloat64
  498. for _, sess := range st.sessions {
  499. if requestedMode != "" && sess.demodName != requestedMode {
  500. continue
  501. }
  502. d := math.Abs(sess.centerHz - freq)
  503. if d < bestDist {
  504. bestDist = d
  505. bestSess = sess
  506. }
  507. }
  508. if bestSess != nil && bestDist < 200000 {
  509. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  510. info := bestSess.audioInfo()
  511. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  512. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  513. return subID, ch, info, nil
  514. }
  515. // No matching session yet — add as pending listener
  516. st.pendingListens[subID] = &pendingListen{
  517. freq: freq,
  518. bw: bw,
  519. mode: mode,
  520. ch: ch,
  521. }
  522. info := defaultAudioInfoForMode(mode)
  523. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  524. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  525. return subID, ch, info, nil
  526. }
  527. // UnsubscribeAudio removes a live-listen subscriber.
  528. func (st *Streamer) UnsubscribeAudio(subID int64) {
  529. st.mu.Lock()
  530. defer st.mu.Unlock()
  531. if pl, ok := st.pendingListens[subID]; ok {
  532. close(pl.ch)
  533. delete(st.pendingListens, subID)
  534. return
  535. }
  536. for _, sess := range st.sessions {
  537. for i, sub := range sess.audioSubs {
  538. if sub.id == subID {
  539. close(sub.ch)
  540. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  541. return
  542. }
  543. }
  544. }
  545. }
  546. // ---------------------------------------------------------------------------
  547. // Session: stateful extraction + demod
  548. // ---------------------------------------------------------------------------
  549. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  550. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  551. // output with zero transient artifacts.
  552. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  553. if len(snippet) == 0 || snipRate <= 0 {
  554. return nil, 0
  555. }
  556. isWFMStereo := sess.demodName == "WFM_STEREO"
  557. isWFM := sess.demodName == "WFM" || isWFMStereo
  558. demodName := sess.demodName
  559. if isWFMStereo {
  560. demodName = "WFM"
  561. }
  562. d := demod.Get(demodName)
  563. if d == nil {
  564. d = demod.Get("NFM")
  565. }
  566. if d == nil {
  567. return nil, 0
  568. }
  569. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  570. // The FM discriminator needs iq[i-1] to compute the first output.
  571. // All FIR filtering is now stateful, so no additional overlap is needed.
  572. var fullSnip []complex64
  573. trimSamples := 0
  574. if len(sess.overlapIQ) == 1 {
  575. fullSnip = make([]complex64, 1+len(snippet))
  576. fullSnip[0] = sess.overlapIQ[0]
  577. copy(fullSnip[1:], snippet)
  578. trimSamples = 1
  579. logging.Debug("discrim", "overlap_applied", "signal", sess.signalID, "snip", len(snippet))
  580. } else {
  581. fullSnip = snippet
  582. }
  583. // Save last sample for next frame's FM discriminator
  584. if len(snippet) > 0 {
  585. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  586. }
  587. // --- Stateful anti-alias FIR + decimation to demod rate ---
  588. demodRate := d.OutputSampleRate()
  589. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  590. if decim1 < 1 {
  591. decim1 = 1
  592. }
  593. actualDemodRate := snipRate / decim1
  594. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  595. var dec []complex64
  596. if decim1 > 1 {
  597. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  598. // Lazy-init or reinit stateful FIR if parameters changed
  599. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  600. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  601. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  602. sess.preDemodRate = snipRate
  603. sess.preDemodCutoff = cutoff
  604. sess.preDemodDecim = decim1
  605. }
  606. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  607. dec = dsp.Decimate(filtered, decim1)
  608. } else {
  609. dec = fullSnip
  610. }
  611. // --- FM Demod ---
  612. audio := d.Demod(dec, actualDemodRate)
  613. if len(audio) == 0 {
  614. return nil, 0
  615. }
  616. // --- Trim the 1-sample FM discriminator overlap ---
  617. if trimSamples > 0 {
  618. audioTrim := trimSamples / decim1
  619. if audioTrim < 1 {
  620. audioTrim = 1 // at minimum trim 1 audio sample
  621. }
  622. if audioTrim > 0 && audioTrim < len(audio) {
  623. logging.Debug("discrim", "audio_trim", "signal", sess.signalID, "trim", audioTrim, "decim1", decim1, "audio_len", len(audio))
  624. audio = audio[audioTrim:]
  625. }
  626. }
  627. // --- Stateful stereo decode with conservative lock/hysteresis ---
  628. channels := 1
  629. if isWFMStereo {
  630. sess.playbackMode = "WFM_STEREO"
  631. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  632. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  633. if locked {
  634. sess.stereoOnCount++
  635. sess.stereoOffCount = 0
  636. if sess.stereoOnCount >= 4 {
  637. sess.stereoEnabled = true
  638. }
  639. } else {
  640. sess.stereoOnCount = 0
  641. sess.stereoOffCount++
  642. if sess.stereoOffCount >= 10 {
  643. sess.stereoEnabled = false
  644. }
  645. }
  646. prevPlayback := sess.playbackMode
  647. prevStereo := sess.stereoState
  648. if sess.stereoEnabled && len(stereoAudio) > 0 {
  649. sess.stereoState = "locked"
  650. audio = stereoAudio
  651. } else {
  652. sess.stereoState = "mono-fallback"
  653. dual := make([]float32, len(audio)*2)
  654. for i, s := range audio {
  655. dual[i*2] = s
  656. dual[i*2+1] = s
  657. }
  658. audio = dual
  659. }
  660. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  661. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  662. }
  663. }
  664. // --- Polyphase resample to exact 48kHz ---
  665. if actualDemodRate != streamAudioRate {
  666. if channels > 1 {
  667. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  668. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  669. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  670. sess.stereoResamplerRate = actualDemodRate
  671. }
  672. audio = sess.stereoResampler.Process(audio)
  673. } else {
  674. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  675. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  676. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  677. sess.monoResamplerRate = actualDemodRate
  678. }
  679. audio = sess.monoResampler.Process(audio)
  680. }
  681. }
  682. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  683. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  684. tau := sess.deemphasisUs * 1e-6
  685. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  686. if channels > 1 {
  687. nFrames := len(audio) / channels
  688. yL, yR := sess.deemphL, sess.deemphR
  689. for i := 0; i < nFrames; i++ {
  690. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  691. audio[i*2] = float32(yL)
  692. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  693. audio[i*2+1] = float32(yR)
  694. }
  695. sess.deemphL, sess.deemphR = yL, yR
  696. } else {
  697. y := sess.deemphL
  698. for i := range audio {
  699. y = alpha*y + (1-alpha)*float64(audio[i])
  700. audio[i] = float32(y)
  701. }
  702. sess.deemphL = y
  703. }
  704. }
  705. if isWFM {
  706. for i := range audio {
  707. audio[i] *= 0.35
  708. }
  709. }
  710. return audio, streamAudioRate
  711. }
  // pllCoefficients computes the proportional (alpha) and integral (beta)
  // gains of a Type-II PLL from its loop bandwidth and damping factor.
  // loopBW is in Hz and sampleRate in samples/sec. A non-positive loopBW or
  // sampleRate yields (0, 0).
  func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  	if loopBW <= 0 || sampleRate <= 0 {
  		return 0, 0
  	}
  	// Loop bandwidth normalised to the sample rate.
  	normBW := loopBW / float64(sampleRate)
  	theta := normBW / (damping + 0.25/damping)
  	denom := 1 + 2*damping*theta + theta*theta
  	return (4 * damping * theta) / denom, (4 * theta * theta) / denom
  }
  726. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  727. // Uses persistent FIR filter state across frames for click-free stereo.
  728. // Reuses session scratch buffers to minimize allocations.
  729. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  730. if len(mono) == 0 || sampleRate <= 0 {
  731. return nil, false
  732. }
  733. n := len(mono)
  734. // Rebuild rate-dependent stereo filters when sampleRate changes
  735. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  736. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  737. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  738. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  739. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  740. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  741. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  742. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  743. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  744. sess.stereoFilterRate = sampleRate
  745. // Initialize PLL for 19kHz pilot tracking.
  746. sess.pilotPhase = 0
  747. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  748. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  749. sess.pilotErrAvg = 0
  750. sess.pilotI = 0
  751. sess.pilotQ = 0
  752. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  753. }
  754. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  755. scratch := sess.growAudio(n * 5)
  756. lpr := scratch[:n]
  757. bpfLR := scratch[n : 2*n]
  758. lr := scratch[2*n : 3*n]
  759. work1 := scratch[3*n : 4*n]
  760. work2 := scratch[4*n : 5*n]
  761. sess.stereoLPF.ProcessInto(mono, lpr)
  762. // 23-53kHz bandpass for L-R DSB-SC.
  763. sess.stereoBPHi.ProcessInto(mono, work1)
  764. sess.stereoBPLo.ProcessInto(mono, work2)
  765. for i := 0; i < n; i++ {
  766. bpfLR[i] = work1[i] - work2[i]
  767. }
  768. // 19kHz pilot bandpass for PLL.
  769. sess.pilotLPFHi.ProcessInto(mono, work1)
  770. sess.pilotLPFLo.ProcessInto(mono, work2)
  771. for i := 0; i < n; i++ {
  772. work1[i] = work1[i] - work2[i]
  773. }
  774. pilot := work1
  775. phase := sess.pilotPhase
  776. freq := sess.pilotFreq
  777. alpha := sess.pilotAlpha
  778. beta := sess.pilotBeta
  779. iState := sess.pilotI
  780. qState := sess.pilotQ
  781. lpAlpha := sess.pilotLPAlpha
  782. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  783. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  784. var pilotPower float64
  785. var totalPower float64
  786. var errSum float64
  787. for i := 0; i < n; i++ {
  788. p := float64(pilot[i])
  789. sinP, cosP := math.Sincos(phase)
  790. iMix := p * cosP
  791. qMix := p * -sinP
  792. iState += lpAlpha * (iMix - iState)
  793. qState += lpAlpha * (qMix - qState)
  794. err := math.Atan2(qState, iState)
  795. freq += beta * err
  796. if freq < minFreq {
  797. freq = minFreq
  798. } else if freq > maxFreq {
  799. freq = maxFreq
  800. }
  801. phase += freq + alpha*err
  802. if phase > 2*math.Pi {
  803. phase -= 2 * math.Pi
  804. } else if phase < 0 {
  805. phase += 2 * math.Pi
  806. }
  807. totalPower += float64(mono[i]) * float64(mono[i])
  808. pilotPower += p * p
  809. errSum += math.Abs(err)
  810. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  811. }
  812. sess.pilotPhase = phase
  813. sess.pilotFreq = freq
  814. sess.pilotI = iState
  815. sess.pilotQ = qState
  816. blockErr := errSum / float64(n)
  817. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  818. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  819. pilotRatio := 0.0
  820. if totalPower > 0 {
  821. pilotRatio = pilotPower / totalPower
  822. }
  823. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  824. // Lock heuristics: pilot power fraction and PLL phase error stability.
  825. // Pilot power is a small but stable fraction of composite energy; require
  826. // a modest floor plus PLL settling to avoid flapping in noise.
  827. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  828. out := make([]float32, n*2)
  829. for i := 0; i < n; i++ {
  830. out[i*2] = 0.5 * (lpr[i] + lr[i])
  831. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  832. }
  833. return out, locked
  834. }
  835. // dspStateSnapshot captures persistent DSP state for segment splits.
  836. type dspStateSnapshot struct {
  837. overlapIQ []complex64
  838. deemphL float64
  839. deemphR float64
  840. pilotPhase float64
  841. pilotFreq float64
  842. pilotAlpha float64
  843. pilotBeta float64
  844. pilotErrAvg float64
  845. pilotI float64
  846. pilotQ float64
  847. pilotLPAlpha float64
  848. monoResampler *dsp.Resampler
  849. monoResamplerRate int
  850. stereoResampler *dsp.StereoResampler
  851. stereoResamplerRate int
  852. stereoLPF *dsp.StatefulFIRReal
  853. stereoFilterRate int
  854. stereoBPHi *dsp.StatefulFIRReal
  855. stereoBPLo *dsp.StatefulFIRReal
  856. stereoLRLPF *dsp.StatefulFIRReal
  857. stereoAALPF *dsp.StatefulFIRReal
  858. pilotLPFHi *dsp.StatefulFIRReal
  859. pilotLPFLo *dsp.StatefulFIRReal
  860. preDemodFIR *dsp.StatefulFIRComplex
  861. preDemodDecim int
  862. preDemodRate int
  863. preDemodCutoff float64
  864. }
  865. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  866. return dspStateSnapshot{
  867. overlapIQ: sess.overlapIQ,
  868. deemphL: sess.deemphL,
  869. deemphR: sess.deemphR,
  870. pilotPhase: sess.pilotPhase,
  871. pilotFreq: sess.pilotFreq,
  872. pilotAlpha: sess.pilotAlpha,
  873. pilotBeta: sess.pilotBeta,
  874. pilotErrAvg: sess.pilotErrAvg,
  875. pilotI: sess.pilotI,
  876. pilotQ: sess.pilotQ,
  877. pilotLPAlpha: sess.pilotLPAlpha,
  878. monoResampler: sess.monoResampler,
  879. monoResamplerRate: sess.monoResamplerRate,
  880. stereoResampler: sess.stereoResampler,
  881. stereoResamplerRate: sess.stereoResamplerRate,
  882. stereoLPF: sess.stereoLPF,
  883. stereoFilterRate: sess.stereoFilterRate,
  884. stereoBPHi: sess.stereoBPHi,
  885. stereoBPLo: sess.stereoBPLo,
  886. stereoLRLPF: sess.stereoLRLPF,
  887. stereoAALPF: sess.stereoAALPF,
  888. pilotLPFHi: sess.pilotLPFHi,
  889. pilotLPFLo: sess.pilotLPFLo,
  890. preDemodFIR: sess.preDemodFIR,
  891. preDemodDecim: sess.preDemodDecim,
  892. preDemodRate: sess.preDemodRate,
  893. preDemodCutoff: sess.preDemodCutoff,
  894. }
  895. }
  896. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  897. sess.overlapIQ = s.overlapIQ
  898. sess.deemphL = s.deemphL
  899. sess.deemphR = s.deemphR
  900. sess.pilotPhase = s.pilotPhase
  901. sess.pilotFreq = s.pilotFreq
  902. sess.pilotAlpha = s.pilotAlpha
  903. sess.pilotBeta = s.pilotBeta
  904. sess.pilotErrAvg = s.pilotErrAvg
  905. sess.pilotI = s.pilotI
  906. sess.pilotQ = s.pilotQ
  907. sess.pilotLPAlpha = s.pilotLPAlpha
  908. sess.monoResampler = s.monoResampler
  909. sess.monoResamplerRate = s.monoResamplerRate
  910. sess.stereoResampler = s.stereoResampler
  911. sess.stereoResamplerRate = s.stereoResamplerRate
  912. sess.stereoLPF = s.stereoLPF
  913. sess.stereoFilterRate = s.stereoFilterRate
  914. sess.stereoBPHi = s.stereoBPHi
  915. sess.stereoBPLo = s.stereoBPLo
  916. sess.stereoLRLPF = s.stereoLRLPF
  917. sess.stereoAALPF = s.stereoAALPF
  918. sess.pilotLPFHi = s.pilotLPFHi
  919. sess.pilotLPFLo = s.pilotLPFLo
  920. sess.preDemodFIR = s.preDemodFIR
  921. sess.preDemodDecim = s.preDemodDecim
  922. sess.preDemodRate = s.preDemodRate
  923. sess.preDemodCutoff = s.preDemodCutoff
  924. }
  925. // ---------------------------------------------------------------------------
  926. // Session management helpers
  927. // ---------------------------------------------------------------------------
  928. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  929. outputDir := st.policy.OutputDir
  930. if outputDir == "" {
  931. outputDir = "data/recordings"
  932. }
  933. demodName, channels := resolveDemod(sig)
  934. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  935. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  936. dir := filepath.Join(outputDir, dirName)
  937. if err := os.MkdirAll(dir, 0o755); err != nil {
  938. return nil, err
  939. }
  940. wavPath := filepath.Join(dir, "audio.wav")
  941. f, err := os.Create(wavPath)
  942. if err != nil {
  943. return nil, err
  944. }
  945. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  946. f.Close()
  947. return nil, err
  948. }
  949. playbackMode, stereoState := initialPlaybackState(demodName)
  950. sess := &streamSession{
  951. signalID: sig.ID,
  952. centerHz: sig.CenterHz,
  953. bwHz: sig.BWHz,
  954. snrDb: sig.SNRDb,
  955. peakDb: sig.PeakDb,
  956. class: sig.Class,
  957. startTime: now,
  958. lastFeed: now,
  959. dir: dir,
  960. wavFile: f,
  961. wavBuf: bufio.NewWriterSize(f, 64*1024),
  962. sampleRate: streamAudioRate,
  963. channels: channels,
  964. demodName: demodName,
  965. playbackMode: playbackMode,
  966. stereoState: stereoState,
  967. deemphasisUs: st.policy.DeemphasisUs,
  968. }
  969. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  970. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  971. return sess, nil
  972. }
  973. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  974. demodName, channels := resolveDemod(sig)
  975. for _, pl := range st.pendingListens {
  976. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  977. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  978. demodName = requested
  979. if demodName == "WFM_STEREO" {
  980. channels = 2
  981. } else if d := demod.Get(demodName); d != nil {
  982. channels = d.Channels()
  983. } else {
  984. channels = 1
  985. }
  986. break
  987. }
  988. }
  989. }
  990. playbackMode, stereoState := initialPlaybackState(demodName)
  991. sess := &streamSession{
  992. signalID: sig.ID,
  993. centerHz: sig.CenterHz,
  994. bwHz: sig.BWHz,
  995. snrDb: sig.SNRDb,
  996. peakDb: sig.PeakDb,
  997. class: sig.Class,
  998. startTime: now,
  999. lastFeed: now,
  1000. listenOnly: true,
  1001. sampleRate: streamAudioRate,
  1002. channels: channels,
  1003. demodName: demodName,
  1004. playbackMode: playbackMode,
  1005. stereoState: stereoState,
  1006. deemphasisUs: st.policy.DeemphasisUs,
  1007. }
  1008. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1009. sig.ID, sig.CenterHz/1e6, demodName)
  1010. return sess
  1011. }
  1012. func resolveDemod(sig *detector.Signal) (string, int) {
  1013. demodName := "NFM"
  1014. if sig.Class != nil {
  1015. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1016. demodName = n
  1017. }
  1018. }
  1019. channels := 1
  1020. if demodName == "WFM_STEREO" {
  1021. channels = 2
  1022. } else if d := demod.Get(demodName); d != nil {
  1023. channels = d.Channels()
  1024. }
  1025. return demodName, channels
  1026. }
  1027. func initialPlaybackState(demodName string) (string, string) {
  1028. playbackMode := demodName
  1029. stereoState := "mono"
  1030. if demodName == "WFM_STEREO" {
  1031. stereoState = "searching"
  1032. }
  1033. return playbackMode, stereoState
  1034. }
  1035. func (sess *streamSession) audioInfo() AudioInfo {
  1036. return AudioInfo{
  1037. SampleRate: sess.sampleRate,
  1038. Channels: sess.channels,
  1039. Format: "s16le",
  1040. DemodName: sess.demodName,
  1041. PlaybackMode: sess.playbackMode,
  1042. StereoState: sess.stereoState,
  1043. }
  1044. }
  1045. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1046. infoJSON, _ := json.Marshal(info)
  1047. tagged := make([]byte, 1+len(infoJSON))
  1048. tagged[0] = 0x00 // tag: audio_info
  1049. copy(tagged[1:], infoJSON)
  1050. for _, sub := range subs {
  1051. select {
  1052. case sub.ch <- tagged:
  1053. default:
  1054. }
  1055. }
  1056. }
  1057. func defaultAudioInfoForMode(mode string) AudioInfo {
  1058. demodName := "NFM"
  1059. if requested := normalizeRequestedMode(mode); requested != "" {
  1060. demodName = requested
  1061. }
  1062. channels := 1
  1063. if demodName == "WFM_STEREO" {
  1064. channels = 2
  1065. } else if d := demod.Get(demodName); d != nil {
  1066. channels = d.Channels()
  1067. }
  1068. playbackMode, stereoState := initialPlaybackState(demodName)
  1069. return AudioInfo{
  1070. SampleRate: streamAudioRate,
  1071. Channels: channels,
  1072. Format: "s16le",
  1073. DemodName: demodName,
  1074. PlaybackMode: playbackMode,
  1075. StereoState: stereoState,
  1076. }
  1077. }
  1078. func normalizeRequestedMode(mode string) string {
  1079. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1080. case "", "AUTO":
  1081. return ""
  1082. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1083. return strings.ToUpper(strings.TrimSpace(mode))
  1084. default:
  1085. return ""
  1086. }
  1087. }
  1088. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1089. func (sess *streamSession) growIQ(n int) []complex64 {
  1090. if cap(sess.scratchIQ) >= n {
  1091. return sess.scratchIQ[:n]
  1092. }
  1093. sess.scratchIQ = make([]complex64, n, n*5/4)
  1094. return sess.scratchIQ
  1095. }
  1096. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1097. func (sess *streamSession) growAudio(n int) []float32 {
  1098. if cap(sess.scratchAudio) >= n {
  1099. return sess.scratchAudio[:n]
  1100. }
  1101. sess.scratchAudio = make([]float32, n, n*5/4)
  1102. return sess.scratchAudio
  1103. }
  1104. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1105. func (sess *streamSession) growPCM(n int) []byte {
  1106. if cap(sess.scratchPCM) >= n {
  1107. return sess.scratchPCM[:n]
  1108. }
  1109. sess.scratchPCM = make([]byte, n, n*5/4)
  1110. return sess.scratchPCM
  1111. }
  1112. func convertToListenOnly(sess *streamSession) {
  1113. if sess.wavBuf != nil {
  1114. _ = sess.wavBuf.Flush()
  1115. }
  1116. if sess.wavFile != nil {
  1117. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1118. sess.wavFile.Close()
  1119. }
  1120. sess.wavFile = nil
  1121. sess.wavBuf = nil
  1122. sess.listenOnly = true
  1123. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1124. }
  1125. func closeSession(sess *streamSession, policy *Policy) {
  1126. if sess.listenOnly {
  1127. return
  1128. }
  1129. if sess.wavBuf != nil {
  1130. _ = sess.wavBuf.Flush()
  1131. }
  1132. if sess.wavFile != nil {
  1133. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1134. sess.wavFile.Close()
  1135. sess.wavFile = nil
  1136. sess.wavBuf = nil
  1137. }
  1138. dur := sess.lastFeed.Sub(sess.startTime)
  1139. files := map[string]any{
  1140. "audio": "audio.wav",
  1141. "audio_sample_rate": sess.sampleRate,
  1142. "audio_channels": sess.channels,
  1143. "audio_demod": sess.demodName,
  1144. "recording_mode": "streaming",
  1145. }
  1146. meta := Meta{
  1147. EventID: sess.signalID,
  1148. Start: sess.startTime,
  1149. End: sess.lastFeed,
  1150. CenterHz: sess.centerHz,
  1151. BandwidthHz: sess.bwHz,
  1152. SampleRate: sess.sampleRate,
  1153. SNRDb: sess.snrDb,
  1154. PeakDb: sess.peakDb,
  1155. Class: sess.class,
  1156. DurationMs: dur.Milliseconds(),
  1157. Files: files,
  1158. }
  1159. b, err := json.MarshalIndent(meta, "", " ")
  1160. if err == nil {
  1161. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1162. }
  1163. if policy != nil {
  1164. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1165. }
  1166. }
  1167. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1168. if len(sess.audioSubs) == 0 {
  1169. return
  1170. }
  1171. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1172. tagged := make([]byte, 1+pcmLen)
  1173. tagged[0] = 0x01
  1174. copy(tagged[1:], pcm[:pcmLen])
  1175. alive := sess.audioSubs[:0]
  1176. for _, sub := range sess.audioSubs {
  1177. select {
  1178. case sub.ch <- tagged:
  1179. default:
  1180. st.droppedPCM++
  1181. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1182. }
  1183. alive = append(alive, sub)
  1184. }
  1185. sess.audioSubs = alive
  1186. }
  1187. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1188. if len(st.policy.ClassFilter) == 0 {
  1189. return true
  1190. }
  1191. if cls == nil {
  1192. return false
  1193. }
  1194. for _, f := range st.policy.ClassFilter {
  1195. if strings.EqualFold(f, string(cls.ModType)) {
  1196. return true
  1197. }
  1198. }
  1199. return false
  1200. }
// ErrNoSession is returned when no matching signal session exists, i.e. there
// is neither an active nor a pending session for the requested frequency.
var ErrNoSession = errors.New("no active or pending session for this frequency")
  1203. // ---------------------------------------------------------------------------
  1204. // WAV header helpers
  1205. // ---------------------------------------------------------------------------
  1206. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1207. if channels <= 0 {
  1208. channels = 1
  1209. }
  1210. hdr := make([]byte, 44)
  1211. copy(hdr[0:4], "RIFF")
  1212. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1213. copy(hdr[8:12], "WAVE")
  1214. copy(hdr[12:16], "fmt ")
  1215. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1216. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1217. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1218. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1219. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1220. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1221. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1222. copy(hdr[36:40], "data")
  1223. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1224. _, err := f.Write(hdr)
  1225. return err
  1226. }
  1227. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1228. dataSize := uint32(totalSamples * 2)
  1229. var buf [4]byte
  1230. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1231. if _, err := f.Seek(4, 0); err != nil {
  1232. return
  1233. }
  1234. _, _ = f.Write(buf[:])
  1235. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1236. if _, err := f.Seek(24, 0); err != nil {
  1237. return
  1238. }
  1239. _, _ = f.Write(buf[:])
  1240. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1241. if _, err := f.Seek(28, 0); err != nil {
  1242. return
  1243. }
  1244. _, _ = f.Write(buf[:])
  1245. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1246. if _, err := f.Seek(40, 0); err != nil {
  1247. return
  1248. }
  1249. _, _ = f.Write(buf[:])
  1250. }