Wideband autonomous SDR analysis engine forked from sdr-visual-suite
No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

1370 líneas
38KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. // listenOnly sessions have no WAV file and no disk I/O.
  36. // They exist solely to feed audio to live-listen subscribers.
  37. listenOnly bool
  38. // Recording state (nil/zero for listen-only sessions)
  39. dir string
  40. wavFile *os.File
  41. wavBuf *bufio.Writer
  42. wavSamples int64
  43. segmentIdx int
  44. sampleRate int // actual output audio sample rate (always streamAudioRate)
  45. channels int
  46. demodName string
  47. // --- Persistent DSP state for click-free streaming ---
  48. // Overlap-save: tail of previous extracted IQ snippet.
  49. overlapIQ []complex64
  50. // De-emphasis IIR state (persists across frames)
  51. deemphL float64
  52. deemphR float64
  53. // Stereo lock state for live WFM streaming
  54. stereoEnabled bool
  55. stereoOnCount int
  56. stereoOffCount int
  57. // Pilot-locked stereo PLL state (19kHz pilot)
  58. pilotPhase float64
  59. pilotFreq float64
  60. pilotAlpha float64
  61. pilotBeta float64
  62. pilotErrAvg float64
  63. pilotI float64
  64. pilotQ float64
  65. pilotLPAlpha float64
  66. // Polyphase resampler (replaces integer-decimate hack)
  67. monoResampler *dsp.Resampler
  68. monoResamplerRate int
  69. stereoResampler *dsp.StereoResampler
  70. stereoResamplerRate int
  71. // AQ-4: Stateful FIR filters for click-free stereo decode
  72. stereoFilterRate int
  73. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  74. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  75. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  76. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  77. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  78. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  79. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  80. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  81. // and avoids per-frame FIR recomputation)
  82. preDemodFIR *dsp.StatefulFIRComplex
  83. preDemodDecim int // cached decimation factor
  84. preDemodRate int // cached snipRate this FIR was built for
  85. preDemodCutoff float64 // cached cutoff
  86. // AQ-2: De-emphasis config (µs, 0 = disabled)
  87. deemphasisUs float64
  88. // Scratch buffers — reused across frames to avoid GC pressure.
  89. // Grown as needed, never shrunk.
  90. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  91. scratchAudio []float32 // for stereo decode intermediates
  92. scratchPCM []byte // for PCM encoding
  93. // live-listen subscribers
  94. audioSubs []audioSub
  95. }
  96. type audioSub struct {
  97. id int64
  98. ch chan []byte
  99. }
  100. type RuntimeSignalInfo struct {
  101. DemodName string
  102. PlaybackMode string
  103. StereoState string
  104. Channels int
  105. SampleRate int
  106. }
  107. // AudioInfo describes the audio format of a live-listen subscription.
  108. // Sent to the WebSocket client as the first message.
  109. type AudioInfo struct {
  110. SampleRate int `json:"sample_rate"`
  111. Channels int `json:"channels"`
  112. Format string `json:"format"` // always "s16le"
  113. DemodName string `json:"demod"`
  114. PlaybackMode string `json:"playback_mode,omitempty"`
  115. StereoState string `json:"stereo_state,omitempty"`
  116. }
  117. const (
  118. streamAudioRate = 48000
  119. resamplerTaps = 32 // taps per polyphase arm — good quality
  120. )
  121. // ---------------------------------------------------------------------------
  122. // Streamer — manages all active streaming sessions
  123. // ---------------------------------------------------------------------------
  124. type streamFeedItem struct {
  125. signal detector.Signal
  126. snippet []complex64
  127. snipRate int
  128. }
  129. type streamFeedMsg struct {
  130. items []streamFeedItem
  131. }
  132. type Streamer struct {
  133. mu sync.Mutex
  134. sessions map[int64]*streamSession
  135. policy Policy
  136. centerHz float64
  137. nextSub int64
  138. feedCh chan streamFeedMsg
  139. done chan struct{}
  140. droppedFeed uint64
  141. droppedPCM uint64
  142. // pendingListens are subscribers waiting for a matching session.
  143. pendingListens map[int64]*pendingListen
  144. }
  145. type pendingListen struct {
  146. freq float64
  147. bw float64
  148. mode string
  149. ch chan []byte
  150. }
  151. func newStreamer(policy Policy, centerHz float64) *Streamer {
  152. st := &Streamer{
  153. sessions: make(map[int64]*streamSession),
  154. policy: policy,
  155. centerHz: centerHz,
  156. feedCh: make(chan streamFeedMsg, 2),
  157. done: make(chan struct{}),
  158. pendingListens: make(map[int64]*pendingListen),
  159. }
  160. go st.worker()
  161. return st
  162. }
  163. func (st *Streamer) worker() {
  164. for msg := range st.feedCh {
  165. st.processFeed(msg)
  166. }
  167. close(st.done)
  168. }
  169. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  170. st.mu.Lock()
  171. defer st.mu.Unlock()
  172. wasEnabled := st.policy.Enabled
  173. st.policy = policy
  174. st.centerHz = centerHz
  175. // If recording was just disabled, close recording sessions
  176. // but keep listen-only sessions alive.
  177. if wasEnabled && !policy.Enabled {
  178. for id, sess := range st.sessions {
  179. if sess.listenOnly {
  180. continue
  181. }
  182. if len(sess.audioSubs) > 0 {
  183. // Convert to listen-only: close WAV but keep session
  184. convertToListenOnly(sess)
  185. } else {
  186. closeSession(sess, &st.policy)
  187. delete(st.sessions, id)
  188. }
  189. }
  190. }
  191. }
  192. // HasListeners returns true if any sessions have audio subscribers
  193. // or there are pending listen requests. Used by the DSP loop to
  194. // decide whether to feed snippets even when recording is disabled.
  195. func (st *Streamer) HasListeners() bool {
  196. st.mu.Lock()
  197. defer st.mu.Unlock()
  198. return st.hasListenersLocked()
  199. }
  200. func (st *Streamer) hasListenersLocked() bool {
  201. if len(st.pendingListens) > 0 {
  202. return true
  203. }
  204. for _, sess := range st.sessions {
  205. if len(sess.audioSubs) > 0 {
  206. return true
  207. }
  208. }
  209. return false
  210. }
  211. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  212. // Feeds are accepted if:
  213. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  214. // - Any live-listen subscribers exist (listen-only mode)
  215. //
  216. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  217. // data, so items can be passed directly without another copy.
  218. func (st *Streamer) FeedSnippets(items []streamFeedItem) {
  219. st.mu.Lock()
  220. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  221. hasListeners := st.hasListenersLocked()
  222. pending := len(st.pendingListens)
  223. debugLiveAudio := st.policy.DebugLiveAudio
  224. st.mu.Unlock()
  225. if debugLiveAudio {
  226. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  227. }
  228. if (!recEnabled && !hasListeners) || len(items) == 0 {
  229. return
  230. }
  231. select {
  232. case st.feedCh <- streamFeedMsg{items: items}:
  233. default:
  234. st.droppedFeed++
  235. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  236. }
  237. }
  238. // processFeed runs in the worker goroutine.
  239. func (st *Streamer) processFeed(msg streamFeedMsg) {
  240. st.mu.Lock()
  241. defer st.mu.Unlock()
  242. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  243. hasListeners := st.hasListenersLocked()
  244. if !recEnabled && !hasListeners {
  245. return
  246. }
  247. now := time.Now()
  248. seen := make(map[int64]bool, len(msg.items))
  249. for i := range msg.items {
  250. item := &msg.items[i]
  251. sig := &item.signal
  252. seen[sig.ID] = true
  253. if sig.ID == 0 || sig.Class == nil {
  254. continue
  255. }
  256. if len(item.snippet) == 0 || item.snipRate <= 0 {
  257. continue
  258. }
  259. // Decide whether this signal needs a session
  260. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  261. needsListen := st.signalHasListenerLocked(sig)
  262. className := "<nil>"
  263. demodName := ""
  264. if sig.Class != nil {
  265. className = string(sig.Class.ModType)
  266. demodName, _ = resolveDemod(sig)
  267. }
  268. if st.policy.DebugLiveAudio {
  269. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  270. }
  271. if !needsRecording && !needsListen {
  272. continue
  273. }
  274. sess, exists := st.sessions[sig.ID]
  275. requestedMode := ""
  276. for _, pl := range st.pendingListens {
  277. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  278. if m := normalizeRequestedMode(pl.mode); m != "" {
  279. requestedMode = m
  280. break
  281. }
  282. }
  283. }
  284. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  285. for _, sub := range sess.audioSubs {
  286. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  287. }
  288. delete(st.sessions, sig.ID)
  289. sess = nil
  290. exists = false
  291. }
  292. if !exists {
  293. if needsRecording {
  294. s, err := st.openRecordingSession(sig, now)
  295. if err != nil {
  296. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  297. sig.ID, sig.CenterHz/1e6, err)
  298. continue
  299. }
  300. st.sessions[sig.ID] = s
  301. sess = s
  302. } else {
  303. s := st.openListenSession(sig, now)
  304. st.sessions[sig.ID] = s
  305. sess = s
  306. }
  307. // Attach any pending listeners
  308. st.attachPendingListeners(sess)
  309. }
  310. // Update metadata
  311. sess.lastFeed = now
  312. sess.centerHz = sig.CenterHz
  313. sess.bwHz = sig.BWHz
  314. if sig.SNRDb > sess.snrDb {
  315. sess.snrDb = sig.SNRDb
  316. }
  317. if sig.PeakDb > sess.peakDb {
  318. sess.peakDb = sig.PeakDb
  319. }
  320. if sig.Class != nil {
  321. sess.class = sig.Class
  322. }
  323. // Demod with persistent state
  324. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  325. if len(audio) > 0 {
  326. if sess.wavSamples == 0 && audioRate > 0 {
  327. sess.sampleRate = audioRate
  328. }
  329. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  330. pcmLen := len(audio) * 2
  331. pcm := sess.growPCM(pcmLen)
  332. for k, s := range audio {
  333. v := int16(clip(s * 32767))
  334. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  335. }
  336. if !sess.listenOnly && sess.wavBuf != nil {
  337. n, err := sess.wavBuf.Write(pcm)
  338. if err != nil {
  339. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  340. } else {
  341. sess.wavSamples += int64(n / 2)
  342. }
  343. }
  344. st.fanoutPCM(sess, pcm, pcmLen)
  345. }
  346. // Segment split (recording sessions only)
  347. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  348. segIdx := sess.segmentIdx + 1
  349. oldSubs := sess.audioSubs
  350. oldState := sess.captureDSPState()
  351. sess.audioSubs = nil
  352. closeSession(sess, &st.policy)
  353. s, err := st.openRecordingSession(sig, now)
  354. if err != nil {
  355. delete(st.sessions, sig.ID)
  356. continue
  357. }
  358. s.segmentIdx = segIdx
  359. s.audioSubs = oldSubs
  360. s.restoreDSPState(oldState)
  361. st.sessions[sig.ID] = s
  362. }
  363. }
  364. // Close sessions for disappeared signals (with grace period)
  365. for id, sess := range st.sessions {
  366. if seen[id] {
  367. continue
  368. }
  369. gracePeriod := 3 * time.Second
  370. if sess.listenOnly {
  371. gracePeriod = 5 * time.Second
  372. }
  373. if now.Sub(sess.lastFeed) > gracePeriod {
  374. for _, sub := range sess.audioSubs {
  375. close(sub.ch)
  376. }
  377. sess.audioSubs = nil
  378. if !sess.listenOnly {
  379. closeSession(sess, &st.policy)
  380. }
  381. delete(st.sessions, id)
  382. }
  383. }
  384. }
  385. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  386. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  387. if st.policy.DebugLiveAudio {
  388. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  389. }
  390. return true
  391. }
  392. for subID, pl := range st.pendingListens {
  393. delta := math.Abs(sig.CenterHz - pl.freq)
  394. if delta < 200000 {
  395. if st.policy.DebugLiveAudio {
  396. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  397. }
  398. return true
  399. }
  400. }
  401. return false
  402. }
  403. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  404. for subID, pl := range st.pendingListens {
  405. requestedMode := normalizeRequestedMode(pl.mode)
  406. if requestedMode != "" && sess.demodName != requestedMode {
  407. continue
  408. }
  409. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  410. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  411. delete(st.pendingListens, subID)
  412. // Send updated audio_info now that we know the real session params.
  413. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  414. infoJSON, _ := json.Marshal(sess.audioInfo())
  415. tagged := make([]byte, 1+len(infoJSON))
  416. tagged[0] = 0x00 // tag: audio_info
  417. copy(tagged[1:], infoJSON)
  418. select {
  419. case pl.ch <- tagged:
  420. default:
  421. }
  422. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  423. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  424. }
  425. }
  426. }
  427. // CloseAll finalises all sessions and stops the worker goroutine.
  428. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  429. st.mu.Lock()
  430. defer st.mu.Unlock()
  431. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  432. for _, sess := range st.sessions {
  433. out[sess.signalID] = RuntimeSignalInfo{
  434. DemodName: sess.demodName,
  435. PlaybackMode: sess.playbackMode,
  436. StereoState: sess.stereoState,
  437. Channels: sess.channels,
  438. SampleRate: sess.sampleRate,
  439. }
  440. }
  441. return out
  442. }
  443. func (st *Streamer) CloseAll() {
  444. close(st.feedCh)
  445. <-st.done
  446. st.mu.Lock()
  447. defer st.mu.Unlock()
  448. for id, sess := range st.sessions {
  449. for _, sub := range sess.audioSubs {
  450. close(sub.ch)
  451. }
  452. sess.audioSubs = nil
  453. if !sess.listenOnly {
  454. closeSession(sess, &st.policy)
  455. }
  456. delete(st.sessions, id)
  457. }
  458. for _, pl := range st.pendingListens {
  459. close(pl.ch)
  460. }
  461. st.pendingListens = nil
  462. }
  463. // ActiveSessions returns the number of open streaming sessions.
  464. func (st *Streamer) ActiveSessions() int {
  465. st.mu.Lock()
  466. defer st.mu.Unlock()
  467. return len(st.sessions)
  468. }
  469. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  470. //
  471. // LL-2: Returns AudioInfo with correct channels and sample rate.
  472. // LL-3: Returns error only on hard failures (nil streamer etc).
  473. //
  474. // If a matching session exists, attaches immediately. Otherwise, the
  475. // subscriber is held as "pending" and will be attached when a matching
  476. // signal appears in the next DSP frame.
  477. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  478. ch := make(chan []byte, 64)
  479. st.mu.Lock()
  480. defer st.mu.Unlock()
  481. st.nextSub++
  482. subID := st.nextSub
  483. requestedMode := normalizeRequestedMode(mode)
  484. // Try to find a matching session
  485. var bestSess *streamSession
  486. bestDist := math.MaxFloat64
  487. for _, sess := range st.sessions {
  488. if requestedMode != "" && sess.demodName != requestedMode {
  489. continue
  490. }
  491. d := math.Abs(sess.centerHz - freq)
  492. if d < bestDist {
  493. bestDist = d
  494. bestSess = sess
  495. }
  496. }
  497. if bestSess != nil && bestDist < 200000 {
  498. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  499. info := bestSess.audioInfo()
  500. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  501. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  502. return subID, ch, info, nil
  503. }
  504. // No matching session yet — add as pending listener
  505. st.pendingListens[subID] = &pendingListen{
  506. freq: freq,
  507. bw: bw,
  508. mode: mode,
  509. ch: ch,
  510. }
  511. info := defaultAudioInfoForMode(mode)
  512. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  513. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  514. return subID, ch, info, nil
  515. }
  516. // UnsubscribeAudio removes a live-listen subscriber.
  517. func (st *Streamer) UnsubscribeAudio(subID int64) {
  518. st.mu.Lock()
  519. defer st.mu.Unlock()
  520. if pl, ok := st.pendingListens[subID]; ok {
  521. close(pl.ch)
  522. delete(st.pendingListens, subID)
  523. return
  524. }
  525. for _, sess := range st.sessions {
  526. for i, sub := range sess.audioSubs {
  527. if sub.id == subID {
  528. close(sub.ch)
  529. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  530. return
  531. }
  532. }
  533. }
  534. }
  535. // ---------------------------------------------------------------------------
  536. // Session: stateful extraction + demod
  537. // ---------------------------------------------------------------------------
  538. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  539. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  540. // output with zero transient artifacts.
  541. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  542. if len(snippet) == 0 || snipRate <= 0 {
  543. return nil, 0
  544. }
  545. isWFMStereo := sess.demodName == "WFM_STEREO"
  546. isWFM := sess.demodName == "WFM" || isWFMStereo
  547. demodName := sess.demodName
  548. if isWFMStereo {
  549. demodName = "WFM"
  550. }
  551. d := demod.Get(demodName)
  552. if d == nil {
  553. d = demod.Get("NFM")
  554. }
  555. if d == nil {
  556. return nil, 0
  557. }
  558. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  559. // The FM discriminator needs iq[i-1] to compute the first output.
  560. // All FIR filtering is now stateful, so no additional overlap is needed.
  561. var fullSnip []complex64
  562. trimSamples := 0
  563. if len(sess.overlapIQ) == 1 {
  564. fullSnip = make([]complex64, 1+len(snippet))
  565. fullSnip[0] = sess.overlapIQ[0]
  566. copy(fullSnip[1:], snippet)
  567. trimSamples = 1
  568. } else {
  569. fullSnip = snippet
  570. }
  571. // Save last sample for next frame's FM discriminator
  572. if len(snippet) > 0 {
  573. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  574. }
  575. // --- Stateful anti-alias FIR + decimation to demod rate ---
  576. demodRate := d.OutputSampleRate()
  577. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  578. if decim1 < 1 {
  579. decim1 = 1
  580. }
  581. actualDemodRate := snipRate / decim1
  582. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  583. var dec []complex64
  584. if decim1 > 1 {
  585. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  586. // Lazy-init or reinit stateful FIR if parameters changed
  587. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  588. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  589. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  590. sess.preDemodRate = snipRate
  591. sess.preDemodCutoff = cutoff
  592. sess.preDemodDecim = decim1
  593. }
  594. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  595. dec = dsp.Decimate(filtered, decim1)
  596. } else {
  597. dec = fullSnip
  598. }
  599. // --- FM Demod ---
  600. audio := d.Demod(dec, actualDemodRate)
  601. if len(audio) == 0 {
  602. return nil, 0
  603. }
  604. // --- Trim the 1-sample FM discriminator overlap ---
  605. if trimSamples > 0 {
  606. audioTrim := trimSamples / decim1
  607. if audioTrim < 1 {
  608. audioTrim = 1 // at minimum trim 1 audio sample
  609. }
  610. if audioTrim > 0 && audioTrim < len(audio) {
  611. audio = audio[audioTrim:]
  612. }
  613. }
  614. // --- Stateful stereo decode with conservative lock/hysteresis ---
  615. channels := 1
  616. if isWFMStereo {
  617. sess.playbackMode = "WFM_STEREO"
  618. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  619. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  620. if locked {
  621. sess.stereoOnCount++
  622. sess.stereoOffCount = 0
  623. if sess.stereoOnCount >= 4 {
  624. sess.stereoEnabled = true
  625. }
  626. } else {
  627. sess.stereoOnCount = 0
  628. sess.stereoOffCount++
  629. if sess.stereoOffCount >= 10 {
  630. sess.stereoEnabled = false
  631. }
  632. }
  633. prevPlayback := sess.playbackMode
  634. prevStereo := sess.stereoState
  635. if sess.stereoEnabled && len(stereoAudio) > 0 {
  636. sess.stereoState = "locked"
  637. audio = stereoAudio
  638. } else {
  639. sess.stereoState = "mono-fallback"
  640. dual := make([]float32, len(audio)*2)
  641. for i, s := range audio {
  642. dual[i*2] = s
  643. dual[i*2+1] = s
  644. }
  645. audio = dual
  646. }
  647. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  648. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  649. }
  650. }
  651. // --- Polyphase resample to exact 48kHz ---
  652. if actualDemodRate != streamAudioRate {
  653. if channels > 1 {
  654. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  655. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  656. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  657. sess.stereoResamplerRate = actualDemodRate
  658. }
  659. audio = sess.stereoResampler.Process(audio)
  660. } else {
  661. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  662. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  663. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  664. sess.monoResamplerRate = actualDemodRate
  665. }
  666. audio = sess.monoResampler.Process(audio)
  667. }
  668. }
  669. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  670. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  671. tau := sess.deemphasisUs * 1e-6
  672. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  673. if channels > 1 {
  674. nFrames := len(audio) / channels
  675. yL, yR := sess.deemphL, sess.deemphR
  676. for i := 0; i < nFrames; i++ {
  677. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  678. audio[i*2] = float32(yL)
  679. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  680. audio[i*2+1] = float32(yR)
  681. }
  682. sess.deemphL, sess.deemphR = yL, yR
  683. } else {
  684. y := sess.deemphL
  685. for i := range audio {
  686. y = alpha*y + (1-alpha)*float64(audio[i])
  687. audio[i] = float32(y)
  688. }
  689. sess.deemphL = y
  690. }
  691. }
  692. if isWFM {
  693. for i := range audio {
  694. audio[i] *= 0.35
  695. }
  696. }
  697. return audio, streamAudioRate
  698. }
  699. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  700. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  701. // loopBW is in Hz, sampleRate in samples/sec.
  702. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  703. if sampleRate <= 0 || loopBW <= 0 {
  704. return 0, 0
  705. }
  706. bl := loopBW / float64(sampleRate)
  707. theta := bl / (damping + 0.25/damping)
  708. d := 1 + 2*damping*theta + theta*theta
  709. alpha := (4 * damping * theta) / d
  710. beta := (4 * theta * theta) / d
  711. return alpha, beta
  712. }
  713. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  714. // Uses persistent FIR filter state across frames for click-free stereo.
  715. // Reuses session scratch buffers to minimize allocations.
  716. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  717. if len(mono) == 0 || sampleRate <= 0 {
  718. return nil, false
  719. }
  720. n := len(mono)
  721. // Rebuild rate-dependent stereo filters when sampleRate changes
  722. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  723. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  724. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  725. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  726. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  727. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  728. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  729. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  730. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  731. sess.stereoFilterRate = sampleRate
  732. // Initialize PLL for 19kHz pilot tracking.
  733. sess.pilotPhase = 0
  734. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  735. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  736. sess.pilotErrAvg = 0
  737. sess.pilotI = 0
  738. sess.pilotQ = 0
  739. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  740. }
  741. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  742. scratch := sess.growAudio(n * 5)
  743. lpr := scratch[:n]
  744. bpfLR := scratch[n : 2*n]
  745. lr := scratch[2*n : 3*n]
  746. work1 := scratch[3*n : 4*n]
  747. work2 := scratch[4*n : 5*n]
  748. sess.stereoLPF.ProcessInto(mono, lpr)
  749. // 23-53kHz bandpass for L-R DSB-SC.
  750. sess.stereoBPHi.ProcessInto(mono, work1)
  751. sess.stereoBPLo.ProcessInto(mono, work2)
  752. for i := 0; i < n; i++ {
  753. bpfLR[i] = work1[i] - work2[i]
  754. }
  755. // 19kHz pilot bandpass for PLL.
  756. sess.pilotLPFHi.ProcessInto(mono, work1)
  757. sess.pilotLPFLo.ProcessInto(mono, work2)
  758. for i := 0; i < n; i++ {
  759. work1[i] = work1[i] - work2[i]
  760. }
  761. pilot := work1
  762. phase := sess.pilotPhase
  763. freq := sess.pilotFreq
  764. alpha := sess.pilotAlpha
  765. beta := sess.pilotBeta
  766. iState := sess.pilotI
  767. qState := sess.pilotQ
  768. lpAlpha := sess.pilotLPAlpha
  769. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  770. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  771. var pilotPower float64
  772. var totalPower float64
  773. var errSum float64
  774. for i := 0; i < n; i++ {
  775. p := float64(pilot[i])
  776. sinP, cosP := math.Sincos(phase)
  777. iMix := p * cosP
  778. qMix := p * -sinP
  779. iState += lpAlpha * (iMix - iState)
  780. qState += lpAlpha * (qMix - qState)
  781. err := math.Atan2(qState, iState)
  782. freq += beta * err
  783. if freq < minFreq {
  784. freq = minFreq
  785. } else if freq > maxFreq {
  786. freq = maxFreq
  787. }
  788. phase += freq + alpha*err
  789. if phase > 2*math.Pi {
  790. phase -= 2 * math.Pi
  791. } else if phase < 0 {
  792. phase += 2 * math.Pi
  793. }
  794. totalPower += float64(mono[i]) * float64(mono[i])
  795. pilotPower += p * p
  796. errSum += math.Abs(err)
  797. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  798. }
  799. sess.pilotPhase = phase
  800. sess.pilotFreq = freq
  801. sess.pilotI = iState
  802. sess.pilotQ = qState
  803. blockErr := errSum / float64(n)
  804. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  805. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  806. pilotRatio := 0.0
  807. if totalPower > 0 {
  808. pilotRatio = pilotPower / totalPower
  809. }
  810. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  811. // Lock heuristics: pilot power fraction and PLL phase error stability.
  812. // Pilot power is a small but stable fraction of composite energy; require
  813. // a modest floor plus PLL settling to avoid flapping in noise.
  814. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  815. out := make([]float32, n*2)
  816. for i := 0; i < n; i++ {
  817. out[i*2] = 0.5 * (lpr[i] + lr[i])
  818. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  819. }
  820. return out, locked
  821. }
  822. // dspStateSnapshot captures persistent DSP state for segment splits.
  823. type dspStateSnapshot struct {
  824. overlapIQ []complex64
  825. deemphL float64
  826. deemphR float64
  827. pilotPhase float64
  828. pilotFreq float64
  829. pilotAlpha float64
  830. pilotBeta float64
  831. pilotErrAvg float64
  832. pilotI float64
  833. pilotQ float64
  834. pilotLPAlpha float64
  835. monoResampler *dsp.Resampler
  836. monoResamplerRate int
  837. stereoResampler *dsp.StereoResampler
  838. stereoResamplerRate int
  839. stereoLPF *dsp.StatefulFIRReal
  840. stereoFilterRate int
  841. stereoBPHi *dsp.StatefulFIRReal
  842. stereoBPLo *dsp.StatefulFIRReal
  843. stereoLRLPF *dsp.StatefulFIRReal
  844. stereoAALPF *dsp.StatefulFIRReal
  845. pilotLPFHi *dsp.StatefulFIRReal
  846. pilotLPFLo *dsp.StatefulFIRReal
  847. preDemodFIR *dsp.StatefulFIRComplex
  848. preDemodDecim int
  849. preDemodRate int
  850. preDemodCutoff float64
  851. }
  852. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  853. return dspStateSnapshot{
  854. overlapIQ: sess.overlapIQ,
  855. deemphL: sess.deemphL,
  856. deemphR: sess.deemphR,
  857. pilotPhase: sess.pilotPhase,
  858. pilotFreq: sess.pilotFreq,
  859. pilotAlpha: sess.pilotAlpha,
  860. pilotBeta: sess.pilotBeta,
  861. pilotErrAvg: sess.pilotErrAvg,
  862. pilotI: sess.pilotI,
  863. pilotQ: sess.pilotQ,
  864. pilotLPAlpha: sess.pilotLPAlpha,
  865. monoResampler: sess.monoResampler,
  866. monoResamplerRate: sess.monoResamplerRate,
  867. stereoResampler: sess.stereoResampler,
  868. stereoResamplerRate: sess.stereoResamplerRate,
  869. stereoLPF: sess.stereoLPF,
  870. stereoFilterRate: sess.stereoFilterRate,
  871. stereoBPHi: sess.stereoBPHi,
  872. stereoBPLo: sess.stereoBPLo,
  873. stereoLRLPF: sess.stereoLRLPF,
  874. stereoAALPF: sess.stereoAALPF,
  875. pilotLPFHi: sess.pilotLPFHi,
  876. pilotLPFLo: sess.pilotLPFLo,
  877. preDemodFIR: sess.preDemodFIR,
  878. preDemodDecim: sess.preDemodDecim,
  879. preDemodRate: sess.preDemodRate,
  880. preDemodCutoff: sess.preDemodCutoff,
  881. }
  882. }
  883. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  884. sess.overlapIQ = s.overlapIQ
  885. sess.deemphL = s.deemphL
  886. sess.deemphR = s.deemphR
  887. sess.pilotPhase = s.pilotPhase
  888. sess.pilotFreq = s.pilotFreq
  889. sess.pilotAlpha = s.pilotAlpha
  890. sess.pilotBeta = s.pilotBeta
  891. sess.pilotErrAvg = s.pilotErrAvg
  892. sess.pilotI = s.pilotI
  893. sess.pilotQ = s.pilotQ
  894. sess.pilotLPAlpha = s.pilotLPAlpha
  895. sess.monoResampler = s.monoResampler
  896. sess.monoResamplerRate = s.monoResamplerRate
  897. sess.stereoResampler = s.stereoResampler
  898. sess.stereoResamplerRate = s.stereoResamplerRate
  899. sess.stereoLPF = s.stereoLPF
  900. sess.stereoFilterRate = s.stereoFilterRate
  901. sess.stereoBPHi = s.stereoBPHi
  902. sess.stereoBPLo = s.stereoBPLo
  903. sess.stereoLRLPF = s.stereoLRLPF
  904. sess.stereoAALPF = s.stereoAALPF
  905. sess.pilotLPFHi = s.pilotLPFHi
  906. sess.pilotLPFLo = s.pilotLPFLo
  907. sess.preDemodFIR = s.preDemodFIR
  908. sess.preDemodDecim = s.preDemodDecim
  909. sess.preDemodRate = s.preDemodRate
  910. sess.preDemodCutoff = s.preDemodCutoff
  911. }
  912. // ---------------------------------------------------------------------------
  913. // Session management helpers
  914. // ---------------------------------------------------------------------------
  915. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  916. outputDir := st.policy.OutputDir
  917. if outputDir == "" {
  918. outputDir = "data/recordings"
  919. }
  920. demodName, channels := resolveDemod(sig)
  921. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  922. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  923. dir := filepath.Join(outputDir, dirName)
  924. if err := os.MkdirAll(dir, 0o755); err != nil {
  925. return nil, err
  926. }
  927. wavPath := filepath.Join(dir, "audio.wav")
  928. f, err := os.Create(wavPath)
  929. if err != nil {
  930. return nil, err
  931. }
  932. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  933. f.Close()
  934. return nil, err
  935. }
  936. playbackMode, stereoState := initialPlaybackState(demodName)
  937. sess := &streamSession{
  938. signalID: sig.ID,
  939. centerHz: sig.CenterHz,
  940. bwHz: sig.BWHz,
  941. snrDb: sig.SNRDb,
  942. peakDb: sig.PeakDb,
  943. class: sig.Class,
  944. startTime: now,
  945. lastFeed: now,
  946. dir: dir,
  947. wavFile: f,
  948. wavBuf: bufio.NewWriterSize(f, 64*1024),
  949. sampleRate: streamAudioRate,
  950. channels: channels,
  951. demodName: demodName,
  952. playbackMode: playbackMode,
  953. stereoState: stereoState,
  954. deemphasisUs: st.policy.DeemphasisUs,
  955. }
  956. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  957. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  958. return sess, nil
  959. }
  960. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  961. demodName, channels := resolveDemod(sig)
  962. for _, pl := range st.pendingListens {
  963. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  964. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  965. demodName = requested
  966. if demodName == "WFM_STEREO" {
  967. channels = 2
  968. } else if d := demod.Get(demodName); d != nil {
  969. channels = d.Channels()
  970. } else {
  971. channels = 1
  972. }
  973. break
  974. }
  975. }
  976. }
  977. playbackMode, stereoState := initialPlaybackState(demodName)
  978. sess := &streamSession{
  979. signalID: sig.ID,
  980. centerHz: sig.CenterHz,
  981. bwHz: sig.BWHz,
  982. snrDb: sig.SNRDb,
  983. peakDb: sig.PeakDb,
  984. class: sig.Class,
  985. startTime: now,
  986. lastFeed: now,
  987. listenOnly: true,
  988. sampleRate: streamAudioRate,
  989. channels: channels,
  990. demodName: demodName,
  991. playbackMode: playbackMode,
  992. stereoState: stereoState,
  993. deemphasisUs: st.policy.DeemphasisUs,
  994. }
  995. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  996. sig.ID, sig.CenterHz/1e6, demodName)
  997. return sess
  998. }
  999. func resolveDemod(sig *detector.Signal) (string, int) {
  1000. demodName := "NFM"
  1001. if sig.Class != nil {
  1002. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1003. demodName = n
  1004. }
  1005. }
  1006. channels := 1
  1007. if demodName == "WFM_STEREO" {
  1008. channels = 2
  1009. } else if d := demod.Get(demodName); d != nil {
  1010. channels = d.Channels()
  1011. }
  1012. return demodName, channels
  1013. }
  1014. func initialPlaybackState(demodName string) (string, string) {
  1015. playbackMode := demodName
  1016. stereoState := "mono"
  1017. if demodName == "WFM_STEREO" {
  1018. stereoState = "searching"
  1019. }
  1020. return playbackMode, stereoState
  1021. }
  1022. func (sess *streamSession) audioInfo() AudioInfo {
  1023. return AudioInfo{
  1024. SampleRate: sess.sampleRate,
  1025. Channels: sess.channels,
  1026. Format: "s16le",
  1027. DemodName: sess.demodName,
  1028. PlaybackMode: sess.playbackMode,
  1029. StereoState: sess.stereoState,
  1030. }
  1031. }
  1032. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1033. infoJSON, _ := json.Marshal(info)
  1034. tagged := make([]byte, 1+len(infoJSON))
  1035. tagged[0] = 0x00 // tag: audio_info
  1036. copy(tagged[1:], infoJSON)
  1037. for _, sub := range subs {
  1038. select {
  1039. case sub.ch <- tagged:
  1040. default:
  1041. }
  1042. }
  1043. }
  1044. func defaultAudioInfoForMode(mode string) AudioInfo {
  1045. demodName := "NFM"
  1046. if requested := normalizeRequestedMode(mode); requested != "" {
  1047. demodName = requested
  1048. }
  1049. channels := 1
  1050. if demodName == "WFM_STEREO" {
  1051. channels = 2
  1052. } else if d := demod.Get(demodName); d != nil {
  1053. channels = d.Channels()
  1054. }
  1055. playbackMode, stereoState := initialPlaybackState(demodName)
  1056. return AudioInfo{
  1057. SampleRate: streamAudioRate,
  1058. Channels: channels,
  1059. Format: "s16le",
  1060. DemodName: demodName,
  1061. PlaybackMode: playbackMode,
  1062. StereoState: stereoState,
  1063. }
  1064. }
  1065. func normalizeRequestedMode(mode string) string {
  1066. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1067. case "", "AUTO":
  1068. return ""
  1069. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1070. return strings.ToUpper(strings.TrimSpace(mode))
  1071. default:
  1072. return ""
  1073. }
  1074. }
  1075. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1076. func (sess *streamSession) growIQ(n int) []complex64 {
  1077. if cap(sess.scratchIQ) >= n {
  1078. return sess.scratchIQ[:n]
  1079. }
  1080. sess.scratchIQ = make([]complex64, n, n*5/4)
  1081. return sess.scratchIQ
  1082. }
  1083. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1084. func (sess *streamSession) growAudio(n int) []float32 {
  1085. if cap(sess.scratchAudio) >= n {
  1086. return sess.scratchAudio[:n]
  1087. }
  1088. sess.scratchAudio = make([]float32, n, n*5/4)
  1089. return sess.scratchAudio
  1090. }
  1091. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1092. func (sess *streamSession) growPCM(n int) []byte {
  1093. if cap(sess.scratchPCM) >= n {
  1094. return sess.scratchPCM[:n]
  1095. }
  1096. sess.scratchPCM = make([]byte, n, n*5/4)
  1097. return sess.scratchPCM
  1098. }
  1099. func convertToListenOnly(sess *streamSession) {
  1100. if sess.wavBuf != nil {
  1101. _ = sess.wavBuf.Flush()
  1102. }
  1103. if sess.wavFile != nil {
  1104. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1105. sess.wavFile.Close()
  1106. }
  1107. sess.wavFile = nil
  1108. sess.wavBuf = nil
  1109. sess.listenOnly = true
  1110. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1111. }
  1112. func closeSession(sess *streamSession, policy *Policy) {
  1113. if sess.listenOnly {
  1114. return
  1115. }
  1116. if sess.wavBuf != nil {
  1117. _ = sess.wavBuf.Flush()
  1118. }
  1119. if sess.wavFile != nil {
  1120. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1121. sess.wavFile.Close()
  1122. sess.wavFile = nil
  1123. sess.wavBuf = nil
  1124. }
  1125. dur := sess.lastFeed.Sub(sess.startTime)
  1126. files := map[string]any{
  1127. "audio": "audio.wav",
  1128. "audio_sample_rate": sess.sampleRate,
  1129. "audio_channels": sess.channels,
  1130. "audio_demod": sess.demodName,
  1131. "recording_mode": "streaming",
  1132. }
  1133. meta := Meta{
  1134. EventID: sess.signalID,
  1135. Start: sess.startTime,
  1136. End: sess.lastFeed,
  1137. CenterHz: sess.centerHz,
  1138. BandwidthHz: sess.bwHz,
  1139. SampleRate: sess.sampleRate,
  1140. SNRDb: sess.snrDb,
  1141. PeakDb: sess.peakDb,
  1142. Class: sess.class,
  1143. DurationMs: dur.Milliseconds(),
  1144. Files: files,
  1145. }
  1146. b, err := json.MarshalIndent(meta, "", " ")
  1147. if err == nil {
  1148. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1149. }
  1150. if policy != nil {
  1151. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1152. }
  1153. }
  1154. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1155. if len(sess.audioSubs) == 0 {
  1156. return
  1157. }
  1158. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1159. tagged := make([]byte, 1+pcmLen)
  1160. tagged[0] = 0x01
  1161. copy(tagged[1:], pcm[:pcmLen])
  1162. alive := sess.audioSubs[:0]
  1163. for _, sub := range sess.audioSubs {
  1164. select {
  1165. case sub.ch <- tagged:
  1166. default:
  1167. st.droppedPCM++
  1168. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1169. }
  1170. alive = append(alive, sub)
  1171. }
  1172. sess.audioSubs = alive
  1173. }
  1174. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1175. if len(st.policy.ClassFilter) == 0 {
  1176. return true
  1177. }
  1178. if cls == nil {
  1179. return false
  1180. }
  1181. for _, f := range st.policy.ClassFilter {
  1182. if strings.EqualFold(f, string(cls.ModType)) {
  1183. return true
  1184. }
  1185. }
  1186. return false
  1187. }
  1188. // ErrNoSession is returned when no matching signal session exists.
  1189. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1190. // ---------------------------------------------------------------------------
  1191. // WAV header helpers
  1192. // ---------------------------------------------------------------------------
  1193. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1194. if channels <= 0 {
  1195. channels = 1
  1196. }
  1197. hdr := make([]byte, 44)
  1198. copy(hdr[0:4], "RIFF")
  1199. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1200. copy(hdr[8:12], "WAVE")
  1201. copy(hdr[12:16], "fmt ")
  1202. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1203. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1204. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1205. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1206. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1207. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1208. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1209. copy(hdr[36:40], "data")
  1210. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1211. _, err := f.Write(hdr)
  1212. return err
  1213. }
  1214. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1215. dataSize := uint32(totalSamples * 2)
  1216. var buf [4]byte
  1217. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1218. if _, err := f.Seek(4, 0); err != nil {
  1219. return
  1220. }
  1221. _, _ = f.Write(buf[:])
  1222. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1223. if _, err := f.Seek(24, 0); err != nil {
  1224. return
  1225. }
  1226. _, _ = f.Write(buf[:])
  1227. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1228. if _, err := f.Seek(28, 0); err != nil {
  1229. return
  1230. }
  1231. _, _ = f.Write(buf[:])
  1232. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1233. if _, err := f.Seek(40, 0); err != nil {
  1234. return
  1235. }
  1236. _, _ = f.Write(buf[:])
  1237. }