Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

1355 wiersze
38KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. )
  20. // ---------------------------------------------------------------------------
  21. // streamSession — one open demod session for one signal
  22. // ---------------------------------------------------------------------------
  23. type streamSession struct {
  24. signalID int64
  25. centerHz float64
  26. bwHz float64
  27. snrDb float64
  28. peakDb float64
  29. class *classifier.Classification
  30. startTime time.Time
  31. lastFeed time.Time
  32. playbackMode string
  33. stereoState string
  34. // listenOnly sessions have no WAV file and no disk I/O.
  35. // They exist solely to feed audio to live-listen subscribers.
  36. listenOnly bool
  37. // Recording state (nil/zero for listen-only sessions)
  38. dir string
  39. wavFile *os.File
  40. wavBuf *bufio.Writer
  41. wavSamples int64
  42. segmentIdx int
  43. sampleRate int // actual output audio sample rate (always streamAudioRate)
  44. channels int
  45. demodName string
  46. // --- Persistent DSP state for click-free streaming ---
  47. // Overlap-save: tail of previous extracted IQ snippet.
  48. overlapIQ []complex64
  49. // De-emphasis IIR state (persists across frames)
  50. deemphL float64
  51. deemphR float64
  52. // Stereo lock state for live WFM streaming
  53. stereoEnabled bool
  54. stereoOnCount int
  55. stereoOffCount int
  56. // Pilot-locked stereo PLL state (19kHz pilot)
  57. pilotPhase float64
  58. pilotFreq float64
  59. pilotAlpha float64
  60. pilotBeta float64
  61. pilotErrAvg float64
  62. pilotI float64
  63. pilotQ float64
  64. pilotLPAlpha float64
  65. // Polyphase resampler (replaces integer-decimate hack)
  66. monoResampler *dsp.Resampler
  67. monoResamplerRate int
  68. stereoResampler *dsp.StereoResampler
  69. stereoResamplerRate int
  70. // AQ-4: Stateful FIR filters for click-free stereo decode
  71. stereoFilterRate int
  72. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  73. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  74. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  75. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  76. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  77. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  78. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  79. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  80. // and avoids per-frame FIR recomputation)
  81. preDemodFIR *dsp.StatefulFIRComplex
  82. preDemodDecim int // cached decimation factor
  83. preDemodRate int // cached snipRate this FIR was built for
  84. preDemodCutoff float64 // cached cutoff
  85. // AQ-2: De-emphasis config (µs, 0 = disabled)
  86. deemphasisUs float64
  87. // Scratch buffers — reused across frames to avoid GC pressure.
  88. // Grown as needed, never shrunk.
  89. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  90. scratchAudio []float32 // for stereo decode intermediates
  91. scratchPCM []byte // for PCM encoding
  92. // live-listen subscribers
  93. audioSubs []audioSub
  94. }
  95. type audioSub struct {
  96. id int64
  97. ch chan []byte
  98. }
  99. type RuntimeSignalInfo struct {
  100. DemodName string
  101. PlaybackMode string
  102. StereoState string
  103. Channels int
  104. SampleRate int
  105. }
  106. // AudioInfo describes the audio format of a live-listen subscription.
  107. // Sent to the WebSocket client as the first message.
  108. type AudioInfo struct {
  109. SampleRate int `json:"sample_rate"`
  110. Channels int `json:"channels"`
  111. Format string `json:"format"` // always "s16le"
  112. DemodName string `json:"demod"`
  113. PlaybackMode string `json:"playback_mode,omitempty"`
  114. StereoState string `json:"stereo_state,omitempty"`
  115. }
  116. const (
  117. streamAudioRate = 48000
  118. resamplerTaps = 32 // taps per polyphase arm — good quality
  119. )
  120. // ---------------------------------------------------------------------------
  121. // Streamer — manages all active streaming sessions
  122. // ---------------------------------------------------------------------------
  123. type streamFeedItem struct {
  124. signal detector.Signal
  125. snippet []complex64
  126. snipRate int
  127. }
  128. type streamFeedMsg struct {
  129. items []streamFeedItem
  130. }
  131. type Streamer struct {
  132. mu sync.Mutex
  133. sessions map[int64]*streamSession
  134. policy Policy
  135. centerHz float64
  136. nextSub int64
  137. feedCh chan streamFeedMsg
  138. done chan struct{}
  139. // pendingListens are subscribers waiting for a matching session.
  140. pendingListens map[int64]*pendingListen
  141. }
  142. type pendingListen struct {
  143. freq float64
  144. bw float64
  145. mode string
  146. ch chan []byte
  147. }
  148. func newStreamer(policy Policy, centerHz float64) *Streamer {
  149. st := &Streamer{
  150. sessions: make(map[int64]*streamSession),
  151. policy: policy,
  152. centerHz: centerHz,
  153. feedCh: make(chan streamFeedMsg, 2),
  154. done: make(chan struct{}),
  155. pendingListens: make(map[int64]*pendingListen),
  156. }
  157. go st.worker()
  158. return st
  159. }
  160. func (st *Streamer) worker() {
  161. for msg := range st.feedCh {
  162. st.processFeed(msg)
  163. }
  164. close(st.done)
  165. }
  166. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  167. st.mu.Lock()
  168. defer st.mu.Unlock()
  169. wasEnabled := st.policy.Enabled
  170. st.policy = policy
  171. st.centerHz = centerHz
  172. // If recording was just disabled, close recording sessions
  173. // but keep listen-only sessions alive.
  174. if wasEnabled && !policy.Enabled {
  175. for id, sess := range st.sessions {
  176. if sess.listenOnly {
  177. continue
  178. }
  179. if len(sess.audioSubs) > 0 {
  180. // Convert to listen-only: close WAV but keep session
  181. convertToListenOnly(sess)
  182. } else {
  183. closeSession(sess, &st.policy)
  184. delete(st.sessions, id)
  185. }
  186. }
  187. }
  188. }
  189. // HasListeners returns true if any sessions have audio subscribers
  190. // or there are pending listen requests. Used by the DSP loop to
  191. // decide whether to feed snippets even when recording is disabled.
  192. func (st *Streamer) HasListeners() bool {
  193. st.mu.Lock()
  194. defer st.mu.Unlock()
  195. return st.hasListenersLocked()
  196. }
  197. func (st *Streamer) hasListenersLocked() bool {
  198. if len(st.pendingListens) > 0 {
  199. return true
  200. }
  201. for _, sess := range st.sessions {
  202. if len(sess.audioSubs) > 0 {
  203. return true
  204. }
  205. }
  206. return false
  207. }
  208. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  209. // Feeds are accepted if:
  210. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  211. // - Any live-listen subscribers exist (listen-only mode)
  212. //
  213. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  214. // data, so items can be passed directly without another copy.
  215. func (st *Streamer) FeedSnippets(items []streamFeedItem) {
  216. st.mu.Lock()
  217. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  218. hasListeners := st.hasListenersLocked()
  219. pending := len(st.pendingListens)
  220. debugLiveAudio := st.policy.DebugLiveAudio
  221. st.mu.Unlock()
  222. if debugLiveAudio {
  223. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  224. }
  225. if (!recEnabled && !hasListeners) || len(items) == 0 {
  226. return
  227. }
  228. select {
  229. case st.feedCh <- streamFeedMsg{items: items}:
  230. default:
  231. }
  232. }
  233. // processFeed runs in the worker goroutine.
  234. func (st *Streamer) processFeed(msg streamFeedMsg) {
  235. st.mu.Lock()
  236. defer st.mu.Unlock()
  237. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  238. hasListeners := st.hasListenersLocked()
  239. if !recEnabled && !hasListeners {
  240. return
  241. }
  242. now := time.Now()
  243. seen := make(map[int64]bool, len(msg.items))
  244. for i := range msg.items {
  245. item := &msg.items[i]
  246. sig := &item.signal
  247. seen[sig.ID] = true
  248. if sig.ID == 0 || sig.Class == nil {
  249. continue
  250. }
  251. if len(item.snippet) == 0 || item.snipRate <= 0 {
  252. continue
  253. }
  254. // Decide whether this signal needs a session
  255. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  256. needsListen := st.signalHasListenerLocked(sig)
  257. className := "<nil>"
  258. demodName := ""
  259. if sig.Class != nil {
  260. className = string(sig.Class.ModType)
  261. demodName, _ = resolveDemod(sig)
  262. }
  263. if st.policy.DebugLiveAudio {
  264. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  265. }
  266. if !needsRecording && !needsListen {
  267. continue
  268. }
  269. sess, exists := st.sessions[sig.ID]
  270. requestedMode := ""
  271. for _, pl := range st.pendingListens {
  272. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  273. if m := normalizeRequestedMode(pl.mode); m != "" {
  274. requestedMode = m
  275. break
  276. }
  277. }
  278. }
  279. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  280. for _, sub := range sess.audioSubs {
  281. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  282. }
  283. delete(st.sessions, sig.ID)
  284. sess = nil
  285. exists = false
  286. }
  287. if !exists {
  288. if needsRecording {
  289. s, err := st.openRecordingSession(sig, now)
  290. if err != nil {
  291. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  292. sig.ID, sig.CenterHz/1e6, err)
  293. continue
  294. }
  295. st.sessions[sig.ID] = s
  296. sess = s
  297. } else {
  298. s := st.openListenSession(sig, now)
  299. st.sessions[sig.ID] = s
  300. sess = s
  301. }
  302. // Attach any pending listeners
  303. st.attachPendingListeners(sess)
  304. }
  305. // Update metadata
  306. sess.lastFeed = now
  307. sess.centerHz = sig.CenterHz
  308. sess.bwHz = sig.BWHz
  309. if sig.SNRDb > sess.snrDb {
  310. sess.snrDb = sig.SNRDb
  311. }
  312. if sig.PeakDb > sess.peakDb {
  313. sess.peakDb = sig.PeakDb
  314. }
  315. if sig.Class != nil {
  316. sess.class = sig.Class
  317. }
  318. // Demod with persistent state
  319. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  320. if len(audio) > 0 {
  321. if sess.wavSamples == 0 && audioRate > 0 {
  322. sess.sampleRate = audioRate
  323. }
  324. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  325. pcmLen := len(audio) * 2
  326. pcm := sess.growPCM(pcmLen)
  327. for k, s := range audio {
  328. v := int16(clip(s * 32767))
  329. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  330. }
  331. if !sess.listenOnly && sess.wavBuf != nil {
  332. n, err := sess.wavBuf.Write(pcm)
  333. if err != nil {
  334. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  335. } else {
  336. sess.wavSamples += int64(n / 2)
  337. }
  338. }
  339. st.fanoutPCM(sess, pcm, pcmLen)
  340. }
  341. // Segment split (recording sessions only)
  342. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  343. segIdx := sess.segmentIdx + 1
  344. oldSubs := sess.audioSubs
  345. oldState := sess.captureDSPState()
  346. sess.audioSubs = nil
  347. closeSession(sess, &st.policy)
  348. s, err := st.openRecordingSession(sig, now)
  349. if err != nil {
  350. delete(st.sessions, sig.ID)
  351. continue
  352. }
  353. s.segmentIdx = segIdx
  354. s.audioSubs = oldSubs
  355. s.restoreDSPState(oldState)
  356. st.sessions[sig.ID] = s
  357. }
  358. }
  359. // Close sessions for disappeared signals (with grace period)
  360. for id, sess := range st.sessions {
  361. if seen[id] {
  362. continue
  363. }
  364. gracePeriod := 3 * time.Second
  365. if sess.listenOnly {
  366. gracePeriod = 5 * time.Second
  367. }
  368. if now.Sub(sess.lastFeed) > gracePeriod {
  369. for _, sub := range sess.audioSubs {
  370. close(sub.ch)
  371. }
  372. sess.audioSubs = nil
  373. if !sess.listenOnly {
  374. closeSession(sess, &st.policy)
  375. }
  376. delete(st.sessions, id)
  377. }
  378. }
  379. }
  380. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  381. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  382. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  383. return true
  384. }
  385. for subID, pl := range st.pendingListens {
  386. delta := math.Abs(sig.CenterHz - pl.freq)
  387. if delta < 200000 {
  388. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  389. return true
  390. }
  391. }
  392. return false
  393. }
  394. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  395. for subID, pl := range st.pendingListens {
  396. requestedMode := normalizeRequestedMode(pl.mode)
  397. if requestedMode != "" && sess.demodName != requestedMode {
  398. continue
  399. }
  400. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  401. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  402. delete(st.pendingListens, subID)
  403. // Send updated audio_info now that we know the real session params.
  404. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  405. infoJSON, _ := json.Marshal(sess.audioInfo())
  406. tagged := make([]byte, 1+len(infoJSON))
  407. tagged[0] = 0x00 // tag: audio_info
  408. copy(tagged[1:], infoJSON)
  409. select {
  410. case pl.ch <- tagged:
  411. default:
  412. }
  413. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  414. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  415. }
  416. }
  417. }
  418. // CloseAll finalises all sessions and stops the worker goroutine.
  419. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  420. st.mu.Lock()
  421. defer st.mu.Unlock()
  422. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  423. for _, sess := range st.sessions {
  424. out[sess.signalID] = RuntimeSignalInfo{
  425. DemodName: sess.demodName,
  426. PlaybackMode: sess.playbackMode,
  427. StereoState: sess.stereoState,
  428. Channels: sess.channels,
  429. SampleRate: sess.sampleRate,
  430. }
  431. }
  432. return out
  433. }
  434. func (st *Streamer) CloseAll() {
  435. close(st.feedCh)
  436. <-st.done
  437. st.mu.Lock()
  438. defer st.mu.Unlock()
  439. for id, sess := range st.sessions {
  440. for _, sub := range sess.audioSubs {
  441. close(sub.ch)
  442. }
  443. sess.audioSubs = nil
  444. if !sess.listenOnly {
  445. closeSession(sess, &st.policy)
  446. }
  447. delete(st.sessions, id)
  448. }
  449. for _, pl := range st.pendingListens {
  450. close(pl.ch)
  451. }
  452. st.pendingListens = nil
  453. }
  454. // ActiveSessions returns the number of open streaming sessions.
  455. func (st *Streamer) ActiveSessions() int {
  456. st.mu.Lock()
  457. defer st.mu.Unlock()
  458. return len(st.sessions)
  459. }
  460. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  461. //
  462. // LL-2: Returns AudioInfo with correct channels and sample rate.
  463. // LL-3: Returns error only on hard failures (nil streamer etc).
  464. //
  465. // If a matching session exists, attaches immediately. Otherwise, the
  466. // subscriber is held as "pending" and will be attached when a matching
  467. // signal appears in the next DSP frame.
  468. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  469. ch := make(chan []byte, 64)
  470. st.mu.Lock()
  471. defer st.mu.Unlock()
  472. st.nextSub++
  473. subID := st.nextSub
  474. requestedMode := normalizeRequestedMode(mode)
  475. // Try to find a matching session
  476. var bestSess *streamSession
  477. bestDist := math.MaxFloat64
  478. for _, sess := range st.sessions {
  479. if requestedMode != "" && sess.demodName != requestedMode {
  480. continue
  481. }
  482. d := math.Abs(sess.centerHz - freq)
  483. if d < bestDist {
  484. bestDist = d
  485. bestSess = sess
  486. }
  487. }
  488. if bestSess != nil && bestDist < 200000 {
  489. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  490. info := bestSess.audioInfo()
  491. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  492. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  493. return subID, ch, info, nil
  494. }
  495. // No matching session yet — add as pending listener
  496. st.pendingListens[subID] = &pendingListen{
  497. freq: freq,
  498. bw: bw,
  499. mode: mode,
  500. ch: ch,
  501. }
  502. info := defaultAudioInfoForMode(mode)
  503. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  504. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  505. return subID, ch, info, nil
  506. }
  507. // UnsubscribeAudio removes a live-listen subscriber.
  508. func (st *Streamer) UnsubscribeAudio(subID int64) {
  509. st.mu.Lock()
  510. defer st.mu.Unlock()
  511. if pl, ok := st.pendingListens[subID]; ok {
  512. close(pl.ch)
  513. delete(st.pendingListens, subID)
  514. return
  515. }
  516. for _, sess := range st.sessions {
  517. for i, sub := range sess.audioSubs {
  518. if sub.id == subID {
  519. close(sub.ch)
  520. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  521. return
  522. }
  523. }
  524. }
  525. }
  526. // ---------------------------------------------------------------------------
  527. // Session: stateful extraction + demod
  528. // ---------------------------------------------------------------------------
  529. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  530. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  531. // output with zero transient artifacts.
  532. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  533. if len(snippet) == 0 || snipRate <= 0 {
  534. return nil, 0
  535. }
  536. isWFMStereo := sess.demodName == "WFM_STEREO"
  537. isWFM := sess.demodName == "WFM" || isWFMStereo
  538. demodName := sess.demodName
  539. if isWFMStereo {
  540. demodName = "WFM"
  541. }
  542. d := demod.Get(demodName)
  543. if d == nil {
  544. d = demod.Get("NFM")
  545. }
  546. if d == nil {
  547. return nil, 0
  548. }
  549. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  550. // The FM discriminator needs iq[i-1] to compute the first output.
  551. // All FIR filtering is now stateful, so no additional overlap is needed.
  552. var fullSnip []complex64
  553. trimSamples := 0
  554. if len(sess.overlapIQ) == 1 {
  555. fullSnip = make([]complex64, 1+len(snippet))
  556. fullSnip[0] = sess.overlapIQ[0]
  557. copy(fullSnip[1:], snippet)
  558. trimSamples = 1
  559. } else {
  560. fullSnip = snippet
  561. }
  562. // Save last sample for next frame's FM discriminator
  563. if len(snippet) > 0 {
  564. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  565. }
  566. // --- Stateful anti-alias FIR + decimation to demod rate ---
  567. demodRate := d.OutputSampleRate()
  568. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  569. if decim1 < 1 {
  570. decim1 = 1
  571. }
  572. actualDemodRate := snipRate / decim1
  573. var dec []complex64
  574. if decim1 > 1 {
  575. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  576. // Lazy-init or reinit stateful FIR if parameters changed
  577. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  578. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  579. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  580. sess.preDemodRate = snipRate
  581. sess.preDemodCutoff = cutoff
  582. sess.preDemodDecim = decim1
  583. }
  584. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  585. dec = dsp.Decimate(filtered, decim1)
  586. } else {
  587. dec = fullSnip
  588. }
  589. // --- FM Demod ---
  590. audio := d.Demod(dec, actualDemodRate)
  591. if len(audio) == 0 {
  592. return nil, 0
  593. }
  594. // --- Trim the 1-sample FM discriminator overlap ---
  595. if trimSamples > 0 {
  596. audioTrim := trimSamples / decim1
  597. if audioTrim < 1 {
  598. audioTrim = 1 // at minimum trim 1 audio sample
  599. }
  600. if audioTrim > 0 && audioTrim < len(audio) {
  601. audio = audio[audioTrim:]
  602. }
  603. }
  604. // --- Stateful stereo decode with conservative lock/hysteresis ---
  605. channels := 1
  606. if isWFMStereo {
  607. sess.playbackMode = "WFM_STEREO"
  608. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  609. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  610. if locked {
  611. sess.stereoOnCount++
  612. sess.stereoOffCount = 0
  613. if sess.stereoOnCount >= 4 {
  614. sess.stereoEnabled = true
  615. }
  616. } else {
  617. sess.stereoOnCount = 0
  618. sess.stereoOffCount++
  619. if sess.stereoOffCount >= 10 {
  620. sess.stereoEnabled = false
  621. }
  622. }
  623. prevPlayback := sess.playbackMode
  624. prevStereo := sess.stereoState
  625. if sess.stereoEnabled && len(stereoAudio) > 0 {
  626. sess.stereoState = "locked"
  627. audio = stereoAudio
  628. } else {
  629. sess.stereoState = "mono-fallback"
  630. dual := make([]float32, len(audio)*2)
  631. for i, s := range audio {
  632. dual[i*2] = s
  633. dual[i*2+1] = s
  634. }
  635. audio = dual
  636. }
  637. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  638. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  639. }
  640. }
  641. // --- Polyphase resample to exact 48kHz ---
  642. if actualDemodRate != streamAudioRate {
  643. if channels > 1 {
  644. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  645. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  646. sess.stereoResamplerRate = actualDemodRate
  647. }
  648. audio = sess.stereoResampler.Process(audio)
  649. } else {
  650. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  651. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  652. sess.monoResamplerRate = actualDemodRate
  653. }
  654. audio = sess.monoResampler.Process(audio)
  655. }
  656. }
  657. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  658. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  659. tau := sess.deemphasisUs * 1e-6
  660. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  661. if channels > 1 {
  662. nFrames := len(audio) / channels
  663. yL, yR := sess.deemphL, sess.deemphR
  664. for i := 0; i < nFrames; i++ {
  665. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  666. audio[i*2] = float32(yL)
  667. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  668. audio[i*2+1] = float32(yR)
  669. }
  670. sess.deemphL, sess.deemphR = yL, yR
  671. } else {
  672. y := sess.deemphL
  673. for i := range audio {
  674. y = alpha*y + (1-alpha)*float64(audio[i])
  675. audio[i] = float32(y)
  676. }
  677. sess.deemphL = y
  678. }
  679. }
  680. if isWFM {
  681. for i := range audio {
  682. audio[i] *= 0.35
  683. }
  684. }
  685. return audio, streamAudioRate
  686. }
  687. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  688. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  689. // loopBW is in Hz, sampleRate in samples/sec.
  690. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  691. if sampleRate <= 0 || loopBW <= 0 {
  692. return 0, 0
  693. }
  694. bl := loopBW / float64(sampleRate)
  695. theta := bl / (damping + 0.25/damping)
  696. d := 1 + 2*damping*theta + theta*theta
  697. alpha := (4 * damping * theta) / d
  698. beta := (4 * theta * theta) / d
  699. return alpha, beta
  700. }
  701. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  702. // Uses persistent FIR filter state across frames for click-free stereo.
  703. // Reuses session scratch buffers to minimize allocations.
  704. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  705. if len(mono) == 0 || sampleRate <= 0 {
  706. return nil, false
  707. }
  708. n := len(mono)
  709. // Rebuild rate-dependent stereo filters when sampleRate changes
  710. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  711. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  712. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  713. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  714. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  715. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  716. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  717. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  718. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  719. sess.stereoFilterRate = sampleRate
  720. // Initialize PLL for 19kHz pilot tracking.
  721. sess.pilotPhase = 0
  722. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  723. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  724. sess.pilotErrAvg = 0
  725. sess.pilotI = 0
  726. sess.pilotQ = 0
  727. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  728. }
  729. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  730. scratch := sess.growAudio(n * 5)
  731. lpr := scratch[:n]
  732. bpfLR := scratch[n : 2*n]
  733. lr := scratch[2*n : 3*n]
  734. work1 := scratch[3*n : 4*n]
  735. work2 := scratch[4*n : 5*n]
  736. sess.stereoLPF.ProcessInto(mono, lpr)
  737. // 23-53kHz bandpass for L-R DSB-SC.
  738. sess.stereoBPHi.ProcessInto(mono, work1)
  739. sess.stereoBPLo.ProcessInto(mono, work2)
  740. for i := 0; i < n; i++ {
  741. bpfLR[i] = work1[i] - work2[i]
  742. }
  743. // 19kHz pilot bandpass for PLL.
  744. sess.pilotLPFHi.ProcessInto(mono, work1)
  745. sess.pilotLPFLo.ProcessInto(mono, work2)
  746. for i := 0; i < n; i++ {
  747. work1[i] = work1[i] - work2[i]
  748. }
  749. pilot := work1
  750. phase := sess.pilotPhase
  751. freq := sess.pilotFreq
  752. alpha := sess.pilotAlpha
  753. beta := sess.pilotBeta
  754. iState := sess.pilotI
  755. qState := sess.pilotQ
  756. lpAlpha := sess.pilotLPAlpha
  757. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  758. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  759. var pilotPower float64
  760. var totalPower float64
  761. var errSum float64
  762. for i := 0; i < n; i++ {
  763. p := float64(pilot[i])
  764. sinP, cosP := math.Sincos(phase)
  765. iMix := p * cosP
  766. qMix := p * -sinP
  767. iState += lpAlpha * (iMix - iState)
  768. qState += lpAlpha * (qMix - qState)
  769. err := math.Atan2(qState, iState)
  770. freq += beta * err
  771. if freq < minFreq {
  772. freq = minFreq
  773. } else if freq > maxFreq {
  774. freq = maxFreq
  775. }
  776. phase += freq + alpha*err
  777. if phase > 2*math.Pi {
  778. phase -= 2 * math.Pi
  779. } else if phase < 0 {
  780. phase += 2 * math.Pi
  781. }
  782. totalPower += float64(mono[i]) * float64(mono[i])
  783. pilotPower += p * p
  784. errSum += math.Abs(err)
  785. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  786. }
  787. sess.pilotPhase = phase
  788. sess.pilotFreq = freq
  789. sess.pilotI = iState
  790. sess.pilotQ = qState
  791. blockErr := errSum / float64(n)
  792. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  793. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  794. pilotRatio := 0.0
  795. if totalPower > 0 {
  796. pilotRatio = pilotPower / totalPower
  797. }
  798. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  799. // Lock heuristics: pilot power fraction and PLL phase error stability.
  800. // Pilot power is a small but stable fraction of composite energy; require
  801. // a modest floor plus PLL settling to avoid flapping in noise.
  802. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  803. out := make([]float32, n*2)
  804. for i := 0; i < n; i++ {
  805. out[i*2] = 0.5 * (lpr[i] + lr[i])
  806. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  807. }
  808. return out, locked
  809. }
  810. // dspStateSnapshot captures persistent DSP state for segment splits.
  811. type dspStateSnapshot struct {
  812. overlapIQ []complex64
  813. deemphL float64
  814. deemphR float64
  815. pilotPhase float64
  816. pilotFreq float64
  817. pilotAlpha float64
  818. pilotBeta float64
  819. pilotErrAvg float64
  820. pilotI float64
  821. pilotQ float64
  822. pilotLPAlpha float64
  823. monoResampler *dsp.Resampler
  824. monoResamplerRate int
  825. stereoResampler *dsp.StereoResampler
  826. stereoResamplerRate int
  827. stereoLPF *dsp.StatefulFIRReal
  828. stereoFilterRate int
  829. stereoBPHi *dsp.StatefulFIRReal
  830. stereoBPLo *dsp.StatefulFIRReal
  831. stereoLRLPF *dsp.StatefulFIRReal
  832. stereoAALPF *dsp.StatefulFIRReal
  833. pilotLPFHi *dsp.StatefulFIRReal
  834. pilotLPFLo *dsp.StatefulFIRReal
  835. preDemodFIR *dsp.StatefulFIRComplex
  836. preDemodDecim int
  837. preDemodRate int
  838. preDemodCutoff float64
  839. }
  840. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  841. return dspStateSnapshot{
  842. overlapIQ: sess.overlapIQ,
  843. deemphL: sess.deemphL,
  844. deemphR: sess.deemphR,
  845. pilotPhase: sess.pilotPhase,
  846. pilotFreq: sess.pilotFreq,
  847. pilotAlpha: sess.pilotAlpha,
  848. pilotBeta: sess.pilotBeta,
  849. pilotErrAvg: sess.pilotErrAvg,
  850. pilotI: sess.pilotI,
  851. pilotQ: sess.pilotQ,
  852. pilotLPAlpha: sess.pilotLPAlpha,
  853. monoResampler: sess.monoResampler,
  854. monoResamplerRate: sess.monoResamplerRate,
  855. stereoResampler: sess.stereoResampler,
  856. stereoResamplerRate: sess.stereoResamplerRate,
  857. stereoLPF: sess.stereoLPF,
  858. stereoFilterRate: sess.stereoFilterRate,
  859. stereoBPHi: sess.stereoBPHi,
  860. stereoBPLo: sess.stereoBPLo,
  861. stereoLRLPF: sess.stereoLRLPF,
  862. stereoAALPF: sess.stereoAALPF,
  863. pilotLPFHi: sess.pilotLPFHi,
  864. pilotLPFLo: sess.pilotLPFLo,
  865. preDemodFIR: sess.preDemodFIR,
  866. preDemodDecim: sess.preDemodDecim,
  867. preDemodRate: sess.preDemodRate,
  868. preDemodCutoff: sess.preDemodCutoff,
  869. }
  870. }
  871. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  872. sess.overlapIQ = s.overlapIQ
  873. sess.deemphL = s.deemphL
  874. sess.deemphR = s.deemphR
  875. sess.pilotPhase = s.pilotPhase
  876. sess.pilotFreq = s.pilotFreq
  877. sess.pilotAlpha = s.pilotAlpha
  878. sess.pilotBeta = s.pilotBeta
  879. sess.pilotErrAvg = s.pilotErrAvg
  880. sess.pilotI = s.pilotI
  881. sess.pilotQ = s.pilotQ
  882. sess.pilotLPAlpha = s.pilotLPAlpha
  883. sess.monoResampler = s.monoResampler
  884. sess.monoResamplerRate = s.monoResamplerRate
  885. sess.stereoResampler = s.stereoResampler
  886. sess.stereoResamplerRate = s.stereoResamplerRate
  887. sess.stereoLPF = s.stereoLPF
  888. sess.stereoFilterRate = s.stereoFilterRate
  889. sess.stereoBPHi = s.stereoBPHi
  890. sess.stereoBPLo = s.stereoBPLo
  891. sess.stereoLRLPF = s.stereoLRLPF
  892. sess.stereoAALPF = s.stereoAALPF
  893. sess.pilotLPFHi = s.pilotLPFHi
  894. sess.pilotLPFLo = s.pilotLPFLo
  895. sess.preDemodFIR = s.preDemodFIR
  896. sess.preDemodDecim = s.preDemodDecim
  897. sess.preDemodRate = s.preDemodRate
  898. sess.preDemodCutoff = s.preDemodCutoff
  899. }
  900. // ---------------------------------------------------------------------------
  901. // Session management helpers
  902. // ---------------------------------------------------------------------------
  903. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  904. outputDir := st.policy.OutputDir
  905. if outputDir == "" {
  906. outputDir = "data/recordings"
  907. }
  908. demodName, channels := resolveDemod(sig)
  909. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  910. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  911. dir := filepath.Join(outputDir, dirName)
  912. if err := os.MkdirAll(dir, 0o755); err != nil {
  913. return nil, err
  914. }
  915. wavPath := filepath.Join(dir, "audio.wav")
  916. f, err := os.Create(wavPath)
  917. if err != nil {
  918. return nil, err
  919. }
  920. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  921. f.Close()
  922. return nil, err
  923. }
  924. playbackMode, stereoState := initialPlaybackState(demodName)
  925. sess := &streamSession{
  926. signalID: sig.ID,
  927. centerHz: sig.CenterHz,
  928. bwHz: sig.BWHz,
  929. snrDb: sig.SNRDb,
  930. peakDb: sig.PeakDb,
  931. class: sig.Class,
  932. startTime: now,
  933. lastFeed: now,
  934. dir: dir,
  935. wavFile: f,
  936. wavBuf: bufio.NewWriterSize(f, 64*1024),
  937. sampleRate: streamAudioRate,
  938. channels: channels,
  939. demodName: demodName,
  940. playbackMode: playbackMode,
  941. stereoState: stereoState,
  942. deemphasisUs: st.policy.DeemphasisUs,
  943. }
  944. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  945. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  946. return sess, nil
  947. }
  948. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  949. demodName, channels := resolveDemod(sig)
  950. for _, pl := range st.pendingListens {
  951. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  952. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  953. demodName = requested
  954. if demodName == "WFM_STEREO" {
  955. channels = 2
  956. } else if d := demod.Get(demodName); d != nil {
  957. channels = d.Channels()
  958. } else {
  959. channels = 1
  960. }
  961. break
  962. }
  963. }
  964. }
  965. playbackMode, stereoState := initialPlaybackState(demodName)
  966. sess := &streamSession{
  967. signalID: sig.ID,
  968. centerHz: sig.CenterHz,
  969. bwHz: sig.BWHz,
  970. snrDb: sig.SNRDb,
  971. peakDb: sig.PeakDb,
  972. class: sig.Class,
  973. startTime: now,
  974. lastFeed: now,
  975. listenOnly: true,
  976. sampleRate: streamAudioRate,
  977. channels: channels,
  978. demodName: demodName,
  979. playbackMode: playbackMode,
  980. stereoState: stereoState,
  981. deemphasisUs: st.policy.DeemphasisUs,
  982. }
  983. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  984. sig.ID, sig.CenterHz/1e6, demodName)
  985. return sess
  986. }
  987. func resolveDemod(sig *detector.Signal) (string, int) {
  988. demodName := "NFM"
  989. if sig.Class != nil {
  990. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  991. demodName = n
  992. }
  993. }
  994. channels := 1
  995. if demodName == "WFM_STEREO" {
  996. channels = 2
  997. } else if d := demod.Get(demodName); d != nil {
  998. channels = d.Channels()
  999. }
  1000. return demodName, channels
  1001. }
  1002. func initialPlaybackState(demodName string) (string, string) {
  1003. playbackMode := demodName
  1004. stereoState := "mono"
  1005. if demodName == "WFM_STEREO" {
  1006. stereoState = "searching"
  1007. }
  1008. return playbackMode, stereoState
  1009. }
  1010. func (sess *streamSession) audioInfo() AudioInfo {
  1011. return AudioInfo{
  1012. SampleRate: sess.sampleRate,
  1013. Channels: sess.channels,
  1014. Format: "s16le",
  1015. DemodName: sess.demodName,
  1016. PlaybackMode: sess.playbackMode,
  1017. StereoState: sess.stereoState,
  1018. }
  1019. }
  1020. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1021. infoJSON, _ := json.Marshal(info)
  1022. tagged := make([]byte, 1+len(infoJSON))
  1023. tagged[0] = 0x00 // tag: audio_info
  1024. copy(tagged[1:], infoJSON)
  1025. for _, sub := range subs {
  1026. select {
  1027. case sub.ch <- tagged:
  1028. default:
  1029. }
  1030. }
  1031. }
  1032. func defaultAudioInfoForMode(mode string) AudioInfo {
  1033. demodName := "NFM"
  1034. if requested := normalizeRequestedMode(mode); requested != "" {
  1035. demodName = requested
  1036. }
  1037. channels := 1
  1038. if demodName == "WFM_STEREO" {
  1039. channels = 2
  1040. } else if d := demod.Get(demodName); d != nil {
  1041. channels = d.Channels()
  1042. }
  1043. playbackMode, stereoState := initialPlaybackState(demodName)
  1044. return AudioInfo{
  1045. SampleRate: streamAudioRate,
  1046. Channels: channels,
  1047. Format: "s16le",
  1048. DemodName: demodName,
  1049. PlaybackMode: playbackMode,
  1050. StereoState: stereoState,
  1051. }
  1052. }
  1053. func normalizeRequestedMode(mode string) string {
  1054. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1055. case "", "AUTO":
  1056. return ""
  1057. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1058. return strings.ToUpper(strings.TrimSpace(mode))
  1059. default:
  1060. return ""
  1061. }
  1062. }
  1063. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1064. func (sess *streamSession) growIQ(n int) []complex64 {
  1065. if cap(sess.scratchIQ) >= n {
  1066. return sess.scratchIQ[:n]
  1067. }
  1068. sess.scratchIQ = make([]complex64, n, n*5/4)
  1069. return sess.scratchIQ
  1070. }
  1071. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1072. func (sess *streamSession) growAudio(n int) []float32 {
  1073. if cap(sess.scratchAudio) >= n {
  1074. return sess.scratchAudio[:n]
  1075. }
  1076. sess.scratchAudio = make([]float32, n, n*5/4)
  1077. return sess.scratchAudio
  1078. }
  1079. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1080. func (sess *streamSession) growPCM(n int) []byte {
  1081. if cap(sess.scratchPCM) >= n {
  1082. return sess.scratchPCM[:n]
  1083. }
  1084. sess.scratchPCM = make([]byte, n, n*5/4)
  1085. return sess.scratchPCM
  1086. }
  1087. func convertToListenOnly(sess *streamSession) {
  1088. if sess.wavBuf != nil {
  1089. _ = sess.wavBuf.Flush()
  1090. }
  1091. if sess.wavFile != nil {
  1092. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1093. sess.wavFile.Close()
  1094. }
  1095. sess.wavFile = nil
  1096. sess.wavBuf = nil
  1097. sess.listenOnly = true
  1098. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1099. }
  1100. func closeSession(sess *streamSession, policy *Policy) {
  1101. if sess.listenOnly {
  1102. return
  1103. }
  1104. if sess.wavBuf != nil {
  1105. _ = sess.wavBuf.Flush()
  1106. }
  1107. if sess.wavFile != nil {
  1108. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1109. sess.wavFile.Close()
  1110. sess.wavFile = nil
  1111. sess.wavBuf = nil
  1112. }
  1113. dur := sess.lastFeed.Sub(sess.startTime)
  1114. files := map[string]any{
  1115. "audio": "audio.wav",
  1116. "audio_sample_rate": sess.sampleRate,
  1117. "audio_channels": sess.channels,
  1118. "audio_demod": sess.demodName,
  1119. "recording_mode": "streaming",
  1120. }
  1121. meta := Meta{
  1122. EventID: sess.signalID,
  1123. Start: sess.startTime,
  1124. End: sess.lastFeed,
  1125. CenterHz: sess.centerHz,
  1126. BandwidthHz: sess.bwHz,
  1127. SampleRate: sess.sampleRate,
  1128. SNRDb: sess.snrDb,
  1129. PeakDb: sess.peakDb,
  1130. Class: sess.class,
  1131. DurationMs: dur.Milliseconds(),
  1132. Files: files,
  1133. }
  1134. b, err := json.MarshalIndent(meta, "", " ")
  1135. if err == nil {
  1136. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1137. }
  1138. if policy != nil {
  1139. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1140. }
  1141. }
  1142. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1143. if len(sess.audioSubs) == 0 {
  1144. return
  1145. }
  1146. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1147. tagged := make([]byte, 1+pcmLen)
  1148. tagged[0] = 0x01
  1149. copy(tagged[1:], pcm[:pcmLen])
  1150. alive := sess.audioSubs[:0]
  1151. for _, sub := range sess.audioSubs {
  1152. select {
  1153. case sub.ch <- tagged:
  1154. default:
  1155. }
  1156. alive = append(alive, sub)
  1157. }
  1158. sess.audioSubs = alive
  1159. }
  1160. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1161. if len(st.policy.ClassFilter) == 0 {
  1162. return true
  1163. }
  1164. if cls == nil {
  1165. return false
  1166. }
  1167. for _, f := range st.policy.ClassFilter {
  1168. if strings.EqualFold(f, string(cls.ModType)) {
  1169. return true
  1170. }
  1171. }
  1172. return false
  1173. }
  1174. // ErrNoSession is returned when no matching signal session exists.
  1175. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1176. // ---------------------------------------------------------------------------
  1177. // WAV header helpers
  1178. // ---------------------------------------------------------------------------
  1179. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1180. if channels <= 0 {
  1181. channels = 1
  1182. }
  1183. hdr := make([]byte, 44)
  1184. copy(hdr[0:4], "RIFF")
  1185. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1186. copy(hdr[8:12], "WAVE")
  1187. copy(hdr[12:16], "fmt ")
  1188. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1189. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1190. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1191. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1192. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1193. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1194. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1195. copy(hdr[36:40], "data")
  1196. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1197. _, err := f.Write(hdr)
  1198. return err
  1199. }
  1200. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1201. dataSize := uint32(totalSamples * 2)
  1202. var buf [4]byte
  1203. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1204. if _, err := f.Seek(4, 0); err != nil {
  1205. return
  1206. }
  1207. _, _ = f.Write(buf[:])
  1208. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1209. if _, err := f.Seek(24, 0); err != nil {
  1210. return
  1211. }
  1212. _, _ = f.Write(buf[:])
  1213. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1214. if _, err := f.Seek(28, 0); err != nil {
  1215. return
  1216. }
  1217. _, _ = f.Write(buf[:])
  1218. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1219. if _, err := f.Seek(40, 0); err != nil {
  1220. return
  1221. }
  1222. _, _ = f.Write(buf[:])
  1223. }