Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

1326 lignes
37KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. )
  20. // ---------------------------------------------------------------------------
  21. // streamSession — one open demod session for one signal
  22. // ---------------------------------------------------------------------------
  23. type streamSession struct {
  24. signalID int64
  25. centerHz float64
  26. bwHz float64
  27. snrDb float64
  28. peakDb float64
  29. class *classifier.Classification
  30. startTime time.Time
  31. lastFeed time.Time
  32. playbackMode string
  33. stereoState string
  34. // listenOnly sessions have no WAV file and no disk I/O.
  35. // They exist solely to feed audio to live-listen subscribers.
  36. listenOnly bool
  37. // Recording state (nil/zero for listen-only sessions)
  38. dir string
  39. wavFile *os.File
  40. wavBuf *bufio.Writer
  41. wavSamples int64
  42. segmentIdx int
  43. sampleRate int // actual output audio sample rate (always streamAudioRate)
  44. channels int
  45. demodName string
  46. // --- Persistent DSP state for click-free streaming ---
  47. // Overlap-save: tail of previous extracted IQ snippet.
  48. overlapIQ []complex64
  49. // De-emphasis IIR state (persists across frames)
  50. deemphL float64
  51. deemphR float64
  52. // Stereo lock state for live WFM streaming
  53. stereoEnabled bool
  54. stereoOnCount int
  55. stereoOffCount int
  56. // Pilot-locked stereo PLL state (19kHz pilot)
  57. pilotPhase float64
  58. pilotFreq float64
  59. pilotAlpha float64
  60. pilotBeta float64
  61. pilotErrAvg float64
  62. pilotI float64
  63. pilotQ float64
  64. pilotLPAlpha float64
  65. // Polyphase resampler (replaces integer-decimate hack)
  66. monoResampler *dsp.Resampler
  67. monoResamplerRate int
  68. stereoResampler *dsp.StereoResampler
  69. stereoResamplerRate int
  70. // AQ-4: Stateful FIR filters for click-free stereo decode
  71. stereoFilterRate int
  72. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  73. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  74. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  75. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  76. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  77. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  78. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  79. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  80. // and avoids per-frame FIR recomputation)
  81. preDemodFIR *dsp.StatefulFIRComplex
  82. preDemodDecim int // cached decimation factor
  83. preDemodRate int // cached snipRate this FIR was built for
  84. preDemodCutoff float64 // cached cutoff
  85. // AQ-2: De-emphasis config (µs, 0 = disabled)
  86. deemphasisUs float64
  87. // Scratch buffers — reused across frames to avoid GC pressure.
  88. // Grown as needed, never shrunk.
  89. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  90. scratchAudio []float32 // for stereo decode intermediates
  91. scratchPCM []byte // for PCM encoding
  92. // live-listen subscribers
  93. audioSubs []audioSub
  94. }
  95. type audioSub struct {
  96. id int64
  97. ch chan []byte
  98. }
  99. // AudioInfo describes the audio format of a live-listen subscription.
  100. // Sent to the WebSocket client as the first message.
  101. type AudioInfo struct {
  102. SampleRate int `json:"sample_rate"`
  103. Channels int `json:"channels"`
  104. Format string `json:"format"` // always "s16le"
  105. DemodName string `json:"demod"`
  106. PlaybackMode string `json:"playback_mode,omitempty"`
  107. StereoState string `json:"stereo_state,omitempty"`
  108. }
  109. const (
  110. streamAudioRate = 48000
  111. resamplerTaps = 32 // taps per polyphase arm — good quality
  112. )
  113. // ---------------------------------------------------------------------------
  114. // Streamer — manages all active streaming sessions
  115. // ---------------------------------------------------------------------------
  116. type streamFeedItem struct {
  117. signal detector.Signal
  118. snippet []complex64
  119. snipRate int
  120. }
  121. type streamFeedMsg struct {
  122. items []streamFeedItem
  123. }
  124. type Streamer struct {
  125. mu sync.Mutex
  126. sessions map[int64]*streamSession
  127. policy Policy
  128. centerHz float64
  129. nextSub int64
  130. feedCh chan streamFeedMsg
  131. done chan struct{}
  132. // pendingListens are subscribers waiting for a matching session.
  133. pendingListens map[int64]*pendingListen
  134. }
  135. type pendingListen struct {
  136. freq float64
  137. bw float64
  138. mode string
  139. ch chan []byte
  140. }
  141. func newStreamer(policy Policy, centerHz float64) *Streamer {
  142. st := &Streamer{
  143. sessions: make(map[int64]*streamSession),
  144. policy: policy,
  145. centerHz: centerHz,
  146. feedCh: make(chan streamFeedMsg, 2),
  147. done: make(chan struct{}),
  148. pendingListens: make(map[int64]*pendingListen),
  149. }
  150. go st.worker()
  151. return st
  152. }
  153. func (st *Streamer) worker() {
  154. for msg := range st.feedCh {
  155. st.processFeed(msg)
  156. }
  157. close(st.done)
  158. }
  159. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  160. st.mu.Lock()
  161. defer st.mu.Unlock()
  162. wasEnabled := st.policy.Enabled
  163. st.policy = policy
  164. st.centerHz = centerHz
  165. // If recording was just disabled, close recording sessions
  166. // but keep listen-only sessions alive.
  167. if wasEnabled && !policy.Enabled {
  168. for id, sess := range st.sessions {
  169. if sess.listenOnly {
  170. continue
  171. }
  172. if len(sess.audioSubs) > 0 {
  173. // Convert to listen-only: close WAV but keep session
  174. convertToListenOnly(sess)
  175. } else {
  176. closeSession(sess, &st.policy)
  177. delete(st.sessions, id)
  178. }
  179. }
  180. }
  181. }
  182. // HasListeners returns true if any sessions have audio subscribers
  183. // or there are pending listen requests. Used by the DSP loop to
  184. // decide whether to feed snippets even when recording is disabled.
  185. func (st *Streamer) HasListeners() bool {
  186. st.mu.Lock()
  187. defer st.mu.Unlock()
  188. return st.hasListenersLocked()
  189. }
  190. func (st *Streamer) hasListenersLocked() bool {
  191. if len(st.pendingListens) > 0 {
  192. return true
  193. }
  194. for _, sess := range st.sessions {
  195. if len(sess.audioSubs) > 0 {
  196. return true
  197. }
  198. }
  199. return false
  200. }
  201. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  202. // Feeds are accepted if:
  203. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  204. // - Any live-listen subscribers exist (listen-only mode)
  205. //
  206. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  207. // data, so items can be passed directly without another copy.
  208. func (st *Streamer) FeedSnippets(items []streamFeedItem) {
  209. st.mu.Lock()
  210. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  211. hasListeners := st.hasListenersLocked()
  212. pending := len(st.pendingListens)
  213. st.mu.Unlock()
  214. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  215. if (!recEnabled && !hasListeners) || len(items) == 0 {
  216. return
  217. }
  218. select {
  219. case st.feedCh <- streamFeedMsg{items: items}:
  220. default:
  221. }
  222. }
  223. // processFeed runs in the worker goroutine.
  224. func (st *Streamer) processFeed(msg streamFeedMsg) {
  225. st.mu.Lock()
  226. defer st.mu.Unlock()
  227. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  228. hasListeners := st.hasListenersLocked()
  229. if !recEnabled && !hasListeners {
  230. return
  231. }
  232. now := time.Now()
  233. seen := make(map[int64]bool, len(msg.items))
  234. for i := range msg.items {
  235. item := &msg.items[i]
  236. sig := &item.signal
  237. seen[sig.ID] = true
  238. if sig.ID == 0 || sig.Class == nil {
  239. continue
  240. }
  241. if len(item.snippet) == 0 || item.snipRate <= 0 {
  242. continue
  243. }
  244. // Decide whether this signal needs a session
  245. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  246. needsListen := st.signalHasListenerLocked(sig)
  247. className := "<nil>"
  248. demodName := ""
  249. if sig.Class != nil {
  250. className = string(sig.Class.ModType)
  251. demodName, _ = resolveDemod(sig)
  252. }
  253. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  254. if !needsRecording && !needsListen {
  255. continue
  256. }
  257. sess, exists := st.sessions[sig.ID]
  258. requestedMode := ""
  259. for _, pl := range st.pendingListens {
  260. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  261. if m := normalizeRequestedMode(pl.mode); m != "" {
  262. requestedMode = m
  263. break
  264. }
  265. }
  266. }
  267. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  268. for _, sub := range sess.audioSubs {
  269. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  270. }
  271. delete(st.sessions, sig.ID)
  272. sess = nil
  273. exists = false
  274. }
  275. if !exists {
  276. if needsRecording {
  277. s, err := st.openRecordingSession(sig, now)
  278. if err != nil {
  279. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  280. sig.ID, sig.CenterHz/1e6, err)
  281. continue
  282. }
  283. st.sessions[sig.ID] = s
  284. sess = s
  285. } else {
  286. s := st.openListenSession(sig, now)
  287. st.sessions[sig.ID] = s
  288. sess = s
  289. }
  290. // Attach any pending listeners
  291. st.attachPendingListeners(sess)
  292. }
  293. // Update metadata
  294. sess.lastFeed = now
  295. sess.centerHz = sig.CenterHz
  296. sess.bwHz = sig.BWHz
  297. if sig.SNRDb > sess.snrDb {
  298. sess.snrDb = sig.SNRDb
  299. }
  300. if sig.PeakDb > sess.peakDb {
  301. sess.peakDb = sig.PeakDb
  302. }
  303. if sig.Class != nil {
  304. sess.class = sig.Class
  305. }
  306. // Demod with persistent state
  307. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  308. if len(audio) > 0 {
  309. if sess.wavSamples == 0 && audioRate > 0 {
  310. sess.sampleRate = audioRate
  311. }
  312. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  313. pcmLen := len(audio) * 2
  314. pcm := sess.growPCM(pcmLen)
  315. for k, s := range audio {
  316. v := int16(clip(s * 32767))
  317. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  318. }
  319. if !sess.listenOnly && sess.wavBuf != nil {
  320. n, err := sess.wavBuf.Write(pcm)
  321. if err != nil {
  322. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  323. } else {
  324. sess.wavSamples += int64(n / 2)
  325. }
  326. }
  327. st.fanoutPCM(sess, pcm, pcmLen)
  328. }
  329. // Segment split (recording sessions only)
  330. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  331. segIdx := sess.segmentIdx + 1
  332. oldSubs := sess.audioSubs
  333. oldState := sess.captureDSPState()
  334. sess.audioSubs = nil
  335. closeSession(sess, &st.policy)
  336. s, err := st.openRecordingSession(sig, now)
  337. if err != nil {
  338. delete(st.sessions, sig.ID)
  339. continue
  340. }
  341. s.segmentIdx = segIdx
  342. s.audioSubs = oldSubs
  343. s.restoreDSPState(oldState)
  344. st.sessions[sig.ID] = s
  345. }
  346. }
  347. // Close sessions for disappeared signals (with grace period)
  348. for id, sess := range st.sessions {
  349. if seen[id] {
  350. continue
  351. }
  352. gracePeriod := 3 * time.Second
  353. if sess.listenOnly {
  354. gracePeriod = 5 * time.Second
  355. }
  356. if now.Sub(sess.lastFeed) > gracePeriod {
  357. for _, sub := range sess.audioSubs {
  358. close(sub.ch)
  359. }
  360. sess.audioSubs = nil
  361. if !sess.listenOnly {
  362. closeSession(sess, &st.policy)
  363. }
  364. delete(st.sessions, id)
  365. }
  366. }
  367. }
  368. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  369. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  370. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  371. return true
  372. }
  373. for subID, pl := range st.pendingListens {
  374. delta := math.Abs(sig.CenterHz - pl.freq)
  375. if delta < 200000 {
  376. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  377. return true
  378. }
  379. }
  380. return false
  381. }
  382. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  383. for subID, pl := range st.pendingListens {
  384. requestedMode := normalizeRequestedMode(pl.mode)
  385. if requestedMode != "" && sess.demodName != requestedMode {
  386. continue
  387. }
  388. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  389. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  390. delete(st.pendingListens, subID)
  391. // Send updated audio_info now that we know the real session params.
  392. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  393. infoJSON, _ := json.Marshal(sess.audioInfo())
  394. tagged := make([]byte, 1+len(infoJSON))
  395. tagged[0] = 0x00 // tag: audio_info
  396. copy(tagged[1:], infoJSON)
  397. select {
  398. case pl.ch <- tagged:
  399. default:
  400. }
  401. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  402. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  403. }
  404. }
  405. }
  406. // CloseAll finalises all sessions and stops the worker goroutine.
  407. func (st *Streamer) CloseAll() {
  408. close(st.feedCh)
  409. <-st.done
  410. st.mu.Lock()
  411. defer st.mu.Unlock()
  412. for id, sess := range st.sessions {
  413. for _, sub := range sess.audioSubs {
  414. close(sub.ch)
  415. }
  416. sess.audioSubs = nil
  417. if !sess.listenOnly {
  418. closeSession(sess, &st.policy)
  419. }
  420. delete(st.sessions, id)
  421. }
  422. for _, pl := range st.pendingListens {
  423. close(pl.ch)
  424. }
  425. st.pendingListens = nil
  426. }
  427. // ActiveSessions returns the number of open streaming sessions.
  428. func (st *Streamer) ActiveSessions() int {
  429. st.mu.Lock()
  430. defer st.mu.Unlock()
  431. return len(st.sessions)
  432. }
  433. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  434. //
  435. // LL-2: Returns AudioInfo with correct channels and sample rate.
  436. // LL-3: Returns error only on hard failures (nil streamer etc).
  437. //
  438. // If a matching session exists, attaches immediately. Otherwise, the
  439. // subscriber is held as "pending" and will be attached when a matching
  440. // signal appears in the next DSP frame.
  441. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  442. ch := make(chan []byte, 64)
  443. st.mu.Lock()
  444. defer st.mu.Unlock()
  445. st.nextSub++
  446. subID := st.nextSub
  447. requestedMode := normalizeRequestedMode(mode)
  448. // Try to find a matching session
  449. var bestSess *streamSession
  450. bestDist := math.MaxFloat64
  451. for _, sess := range st.sessions {
  452. if requestedMode != "" && sess.demodName != requestedMode {
  453. continue
  454. }
  455. d := math.Abs(sess.centerHz - freq)
  456. if d < bestDist {
  457. bestDist = d
  458. bestSess = sess
  459. }
  460. }
  461. if bestSess != nil && bestDist < 200000 {
  462. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  463. info := bestSess.audioInfo()
  464. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  465. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  466. return subID, ch, info, nil
  467. }
  468. // No matching session yet — add as pending listener
  469. st.pendingListens[subID] = &pendingListen{
  470. freq: freq,
  471. bw: bw,
  472. mode: mode,
  473. ch: ch,
  474. }
  475. info := defaultAudioInfoForMode(mode)
  476. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  477. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  478. return subID, ch, info, nil
  479. }
  480. // UnsubscribeAudio removes a live-listen subscriber.
  481. func (st *Streamer) UnsubscribeAudio(subID int64) {
  482. st.mu.Lock()
  483. defer st.mu.Unlock()
  484. if pl, ok := st.pendingListens[subID]; ok {
  485. close(pl.ch)
  486. delete(st.pendingListens, subID)
  487. return
  488. }
  489. for _, sess := range st.sessions {
  490. for i, sub := range sess.audioSubs {
  491. if sub.id == subID {
  492. close(sub.ch)
  493. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  494. return
  495. }
  496. }
  497. }
  498. }
  499. // ---------------------------------------------------------------------------
  500. // Session: stateful extraction + demod
  501. // ---------------------------------------------------------------------------
  502. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  503. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  504. // output with zero transient artifacts.
  505. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  506. if len(snippet) == 0 || snipRate <= 0 {
  507. return nil, 0
  508. }
  509. isWFMStereo := sess.demodName == "WFM_STEREO"
  510. isWFM := sess.demodName == "WFM" || isWFMStereo
  511. demodName := sess.demodName
  512. if isWFMStereo {
  513. demodName = "WFM"
  514. }
  515. d := demod.Get(demodName)
  516. if d == nil {
  517. d = demod.Get("NFM")
  518. }
  519. if d == nil {
  520. return nil, 0
  521. }
  522. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  523. // The FM discriminator needs iq[i-1] to compute the first output.
  524. // All FIR filtering is now stateful, so no additional overlap is needed.
  525. var fullSnip []complex64
  526. trimSamples := 0
  527. if len(sess.overlapIQ) == 1 {
  528. fullSnip = make([]complex64, 1+len(snippet))
  529. fullSnip[0] = sess.overlapIQ[0]
  530. copy(fullSnip[1:], snippet)
  531. trimSamples = 1
  532. } else {
  533. fullSnip = snippet
  534. }
  535. // Save last sample for next frame's FM discriminator
  536. if len(snippet) > 0 {
  537. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  538. }
  539. // --- Stateful anti-alias FIR + decimation to demod rate ---
  540. demodRate := d.OutputSampleRate()
  541. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  542. if decim1 < 1 {
  543. decim1 = 1
  544. }
  545. actualDemodRate := snipRate / decim1
  546. var dec []complex64
  547. if decim1 > 1 {
  548. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  549. // Lazy-init or reinit stateful FIR if parameters changed
  550. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  551. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  552. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  553. sess.preDemodRate = snipRate
  554. sess.preDemodCutoff = cutoff
  555. sess.preDemodDecim = decim1
  556. }
  557. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  558. dec = dsp.Decimate(filtered, decim1)
  559. } else {
  560. dec = fullSnip
  561. }
  562. // --- FM Demod ---
  563. audio := d.Demod(dec, actualDemodRate)
  564. if len(audio) == 0 {
  565. return nil, 0
  566. }
  567. // --- Trim the 1-sample FM discriminator overlap ---
  568. if trimSamples > 0 {
  569. audioTrim := trimSamples / decim1
  570. if audioTrim < 1 {
  571. audioTrim = 1 // at minimum trim 1 audio sample
  572. }
  573. if audioTrim > 0 && audioTrim < len(audio) {
  574. audio = audio[audioTrim:]
  575. }
  576. }
  577. // --- Stateful stereo decode with conservative lock/hysteresis ---
  578. channels := 1
  579. if isWFMStereo {
  580. sess.playbackMode = "WFM_STEREO"
  581. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  582. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  583. if locked {
  584. sess.stereoOnCount++
  585. sess.stereoOffCount = 0
  586. if sess.stereoOnCount >= 4 {
  587. sess.stereoEnabled = true
  588. }
  589. } else {
  590. sess.stereoOnCount = 0
  591. sess.stereoOffCount++
  592. if sess.stereoOffCount >= 10 {
  593. sess.stereoEnabled = false
  594. }
  595. }
  596. prevPlayback := sess.playbackMode
  597. prevStereo := sess.stereoState
  598. if sess.stereoEnabled && len(stereoAudio) > 0 {
  599. sess.stereoState = "locked"
  600. audio = stereoAudio
  601. } else {
  602. sess.stereoState = "mono-fallback"
  603. dual := make([]float32, len(audio)*2)
  604. for i, s := range audio {
  605. dual[i*2] = s
  606. dual[i*2+1] = s
  607. }
  608. audio = dual
  609. }
  610. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  611. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  612. }
  613. }
  614. // --- Polyphase resample to exact 48kHz ---
  615. if actualDemodRate != streamAudioRate {
  616. if channels > 1 {
  617. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  618. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  619. sess.stereoResamplerRate = actualDemodRate
  620. }
  621. audio = sess.stereoResampler.Process(audio)
  622. } else {
  623. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  624. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  625. sess.monoResamplerRate = actualDemodRate
  626. }
  627. audio = sess.monoResampler.Process(audio)
  628. }
  629. }
  630. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  631. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  632. tau := sess.deemphasisUs * 1e-6
  633. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  634. if channels > 1 {
  635. nFrames := len(audio) / channels
  636. yL, yR := sess.deemphL, sess.deemphR
  637. for i := 0; i < nFrames; i++ {
  638. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  639. audio[i*2] = float32(yL)
  640. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  641. audio[i*2+1] = float32(yR)
  642. }
  643. sess.deemphL, sess.deemphR = yL, yR
  644. } else {
  645. y := sess.deemphL
  646. for i := range audio {
  647. y = alpha*y + (1-alpha)*float64(audio[i])
  648. audio[i] = float32(y)
  649. }
  650. sess.deemphL = y
  651. }
  652. }
  653. if isWFM {
  654. for i := range audio {
  655. audio[i] *= 0.35
  656. }
  657. }
  658. return audio, streamAudioRate
  659. }
  660. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  661. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  662. // loopBW is in Hz, sampleRate in samples/sec.
  663. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  664. if sampleRate <= 0 || loopBW <= 0 {
  665. return 0, 0
  666. }
  667. bl := loopBW / float64(sampleRate)
  668. theta := bl / (damping + 0.25/damping)
  669. d := 1 + 2*damping*theta + theta*theta
  670. alpha := (4 * damping * theta) / d
  671. beta := (4 * theta * theta) / d
  672. return alpha, beta
  673. }
  674. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  675. // Uses persistent FIR filter state across frames for click-free stereo.
  676. // Reuses session scratch buffers to minimize allocations.
  677. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  678. if len(mono) == 0 || sampleRate <= 0 {
  679. return nil, false
  680. }
  681. n := len(mono)
  682. // Rebuild rate-dependent stereo filters when sampleRate changes
  683. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  684. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  685. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  686. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  687. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  688. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  689. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  690. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  691. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  692. sess.stereoFilterRate = sampleRate
  693. // Initialize PLL for 19kHz pilot tracking.
  694. sess.pilotPhase = 0
  695. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  696. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  697. sess.pilotErrAvg = 0
  698. sess.pilotI = 0
  699. sess.pilotQ = 0
  700. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  701. }
  702. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  703. scratch := sess.growAudio(n * 5)
  704. lpr := scratch[:n]
  705. bpfLR := scratch[n : 2*n]
  706. lr := scratch[2*n : 3*n]
  707. work1 := scratch[3*n : 4*n]
  708. work2 := scratch[4*n : 5*n]
  709. sess.stereoLPF.ProcessInto(mono, lpr)
  710. // 23-53kHz bandpass for L-R DSB-SC.
  711. sess.stereoBPHi.ProcessInto(mono, work1)
  712. sess.stereoBPLo.ProcessInto(mono, work2)
  713. for i := 0; i < n; i++ {
  714. bpfLR[i] = work1[i] - work2[i]
  715. }
  716. // 19kHz pilot bandpass for PLL.
  717. sess.pilotLPFHi.ProcessInto(mono, work1)
  718. sess.pilotLPFLo.ProcessInto(mono, work2)
  719. for i := 0; i < n; i++ {
  720. work1[i] = work1[i] - work2[i]
  721. }
  722. pilot := work1
  723. phase := sess.pilotPhase
  724. freq := sess.pilotFreq
  725. alpha := sess.pilotAlpha
  726. beta := sess.pilotBeta
  727. iState := sess.pilotI
  728. qState := sess.pilotQ
  729. lpAlpha := sess.pilotLPAlpha
  730. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  731. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  732. var pilotPower float64
  733. var totalPower float64
  734. var errSum float64
  735. for i := 0; i < n; i++ {
  736. p := float64(pilot[i])
  737. sinP, cosP := math.Sincos(phase)
  738. iMix := p * cosP
  739. qMix := p * -sinP
  740. iState += lpAlpha * (iMix - iState)
  741. qState += lpAlpha * (qMix - qState)
  742. err := math.Atan2(qState, iState)
  743. freq += beta * err
  744. if freq < minFreq {
  745. freq = minFreq
  746. } else if freq > maxFreq {
  747. freq = maxFreq
  748. }
  749. phase += freq + alpha*err
  750. if phase > 2*math.Pi {
  751. phase -= 2 * math.Pi
  752. } else if phase < 0 {
  753. phase += 2 * math.Pi
  754. }
  755. totalPower += float64(mono[i]) * float64(mono[i])
  756. pilotPower += p * p
  757. errSum += math.Abs(err)
  758. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  759. }
  760. sess.pilotPhase = phase
  761. sess.pilotFreq = freq
  762. sess.pilotI = iState
  763. sess.pilotQ = qState
  764. blockErr := errSum / float64(n)
  765. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  766. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  767. pilotRatio := 0.0
  768. if totalPower > 0 {
  769. pilotRatio = pilotPower / totalPower
  770. }
  771. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  772. // Lock heuristics: pilot power fraction and PLL phase error stability.
  773. // Pilot power is a small but stable fraction of composite energy; require
  774. // a modest floor plus PLL settling to avoid flapping in noise.
  775. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  776. out := make([]float32, n*2)
  777. for i := 0; i < n; i++ {
  778. out[i*2] = 0.5 * (lpr[i] + lr[i])
  779. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  780. }
  781. return out, locked
  782. }
  783. // dspStateSnapshot captures persistent DSP state for segment splits.
  784. type dspStateSnapshot struct {
  785. overlapIQ []complex64
  786. deemphL float64
  787. deemphR float64
  788. pilotPhase float64
  789. pilotFreq float64
  790. pilotAlpha float64
  791. pilotBeta float64
  792. pilotErrAvg float64
  793. pilotI float64
  794. pilotQ float64
  795. pilotLPAlpha float64
  796. monoResampler *dsp.Resampler
  797. monoResamplerRate int
  798. stereoResampler *dsp.StereoResampler
  799. stereoResamplerRate int
  800. stereoLPF *dsp.StatefulFIRReal
  801. stereoFilterRate int
  802. stereoBPHi *dsp.StatefulFIRReal
  803. stereoBPLo *dsp.StatefulFIRReal
  804. stereoLRLPF *dsp.StatefulFIRReal
  805. stereoAALPF *dsp.StatefulFIRReal
  806. pilotLPFHi *dsp.StatefulFIRReal
  807. pilotLPFLo *dsp.StatefulFIRReal
  808. preDemodFIR *dsp.StatefulFIRComplex
  809. preDemodDecim int
  810. preDemodRate int
  811. preDemodCutoff float64
  812. }
  813. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  814. return dspStateSnapshot{
  815. overlapIQ: sess.overlapIQ,
  816. deemphL: sess.deemphL,
  817. deemphR: sess.deemphR,
  818. pilotPhase: sess.pilotPhase,
  819. pilotFreq: sess.pilotFreq,
  820. pilotAlpha: sess.pilotAlpha,
  821. pilotBeta: sess.pilotBeta,
  822. pilotErrAvg: sess.pilotErrAvg,
  823. pilotI: sess.pilotI,
  824. pilotQ: sess.pilotQ,
  825. pilotLPAlpha: sess.pilotLPAlpha,
  826. monoResampler: sess.monoResampler,
  827. monoResamplerRate: sess.monoResamplerRate,
  828. stereoResampler: sess.stereoResampler,
  829. stereoResamplerRate: sess.stereoResamplerRate,
  830. stereoLPF: sess.stereoLPF,
  831. stereoFilterRate: sess.stereoFilterRate,
  832. stereoBPHi: sess.stereoBPHi,
  833. stereoBPLo: sess.stereoBPLo,
  834. stereoLRLPF: sess.stereoLRLPF,
  835. stereoAALPF: sess.stereoAALPF,
  836. pilotLPFHi: sess.pilotLPFHi,
  837. pilotLPFLo: sess.pilotLPFLo,
  838. preDemodFIR: sess.preDemodFIR,
  839. preDemodDecim: sess.preDemodDecim,
  840. preDemodRate: sess.preDemodRate,
  841. preDemodCutoff: sess.preDemodCutoff,
  842. }
  843. }
  844. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  845. sess.overlapIQ = s.overlapIQ
  846. sess.deemphL = s.deemphL
  847. sess.deemphR = s.deemphR
  848. sess.pilotPhase = s.pilotPhase
  849. sess.pilotFreq = s.pilotFreq
  850. sess.pilotAlpha = s.pilotAlpha
  851. sess.pilotBeta = s.pilotBeta
  852. sess.pilotErrAvg = s.pilotErrAvg
  853. sess.pilotI = s.pilotI
  854. sess.pilotQ = s.pilotQ
  855. sess.pilotLPAlpha = s.pilotLPAlpha
  856. sess.monoResampler = s.monoResampler
  857. sess.monoResamplerRate = s.monoResamplerRate
  858. sess.stereoResampler = s.stereoResampler
  859. sess.stereoResamplerRate = s.stereoResamplerRate
  860. sess.stereoLPF = s.stereoLPF
  861. sess.stereoFilterRate = s.stereoFilterRate
  862. sess.stereoBPHi = s.stereoBPHi
  863. sess.stereoBPLo = s.stereoBPLo
  864. sess.stereoLRLPF = s.stereoLRLPF
  865. sess.stereoAALPF = s.stereoAALPF
  866. sess.pilotLPFHi = s.pilotLPFHi
  867. sess.pilotLPFLo = s.pilotLPFLo
  868. sess.preDemodFIR = s.preDemodFIR
  869. sess.preDemodDecim = s.preDemodDecim
  870. sess.preDemodRate = s.preDemodRate
  871. sess.preDemodCutoff = s.preDemodCutoff
  872. }
  873. // ---------------------------------------------------------------------------
  874. // Session management helpers
  875. // ---------------------------------------------------------------------------
  876. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  877. outputDir := st.policy.OutputDir
  878. if outputDir == "" {
  879. outputDir = "data/recordings"
  880. }
  881. demodName, channels := resolveDemod(sig)
  882. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  883. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  884. dir := filepath.Join(outputDir, dirName)
  885. if err := os.MkdirAll(dir, 0o755); err != nil {
  886. return nil, err
  887. }
  888. wavPath := filepath.Join(dir, "audio.wav")
  889. f, err := os.Create(wavPath)
  890. if err != nil {
  891. return nil, err
  892. }
  893. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  894. f.Close()
  895. return nil, err
  896. }
  897. playbackMode, stereoState := initialPlaybackState(demodName)
  898. sess := &streamSession{
  899. signalID: sig.ID,
  900. centerHz: sig.CenterHz,
  901. bwHz: sig.BWHz,
  902. snrDb: sig.SNRDb,
  903. peakDb: sig.PeakDb,
  904. class: sig.Class,
  905. startTime: now,
  906. lastFeed: now,
  907. dir: dir,
  908. wavFile: f,
  909. wavBuf: bufio.NewWriterSize(f, 64*1024),
  910. sampleRate: streamAudioRate,
  911. channels: channels,
  912. demodName: demodName,
  913. playbackMode: playbackMode,
  914. stereoState: stereoState,
  915. deemphasisUs: st.policy.DeemphasisUs,
  916. }
  917. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  918. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  919. return sess, nil
  920. }
  921. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  922. demodName, channels := resolveDemod(sig)
  923. for _, pl := range st.pendingListens {
  924. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  925. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  926. demodName = requested
  927. if demodName == "WFM_STEREO" {
  928. channels = 2
  929. } else if d := demod.Get(demodName); d != nil {
  930. channels = d.Channels()
  931. } else {
  932. channels = 1
  933. }
  934. break
  935. }
  936. }
  937. }
  938. playbackMode, stereoState := initialPlaybackState(demodName)
  939. sess := &streamSession{
  940. signalID: sig.ID,
  941. centerHz: sig.CenterHz,
  942. bwHz: sig.BWHz,
  943. snrDb: sig.SNRDb,
  944. peakDb: sig.PeakDb,
  945. class: sig.Class,
  946. startTime: now,
  947. lastFeed: now,
  948. listenOnly: true,
  949. sampleRate: streamAudioRate,
  950. channels: channels,
  951. demodName: demodName,
  952. playbackMode: playbackMode,
  953. stereoState: stereoState,
  954. deemphasisUs: st.policy.DeemphasisUs,
  955. }
  956. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  957. sig.ID, sig.CenterHz/1e6, demodName)
  958. return sess
  959. }
  960. func resolveDemod(sig *detector.Signal) (string, int) {
  961. demodName := "NFM"
  962. if sig.Class != nil {
  963. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  964. demodName = n
  965. }
  966. }
  967. channels := 1
  968. if demodName == "WFM_STEREO" {
  969. channels = 2
  970. } else if d := demod.Get(demodName); d != nil {
  971. channels = d.Channels()
  972. }
  973. return demodName, channels
  974. }
  975. func initialPlaybackState(demodName string) (string, string) {
  976. playbackMode := demodName
  977. stereoState := "mono"
  978. if demodName == "WFM_STEREO" {
  979. stereoState = "searching"
  980. }
  981. return playbackMode, stereoState
  982. }
  983. func (sess *streamSession) audioInfo() AudioInfo {
  984. return AudioInfo{
  985. SampleRate: sess.sampleRate,
  986. Channels: sess.channels,
  987. Format: "s16le",
  988. DemodName: sess.demodName,
  989. PlaybackMode: sess.playbackMode,
  990. StereoState: sess.stereoState,
  991. }
  992. }
  993. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  994. infoJSON, _ := json.Marshal(info)
  995. tagged := make([]byte, 1+len(infoJSON))
  996. tagged[0] = 0x00 // tag: audio_info
  997. copy(tagged[1:], infoJSON)
  998. for _, sub := range subs {
  999. select {
  1000. case sub.ch <- tagged:
  1001. default:
  1002. }
  1003. }
  1004. }
  1005. func defaultAudioInfoForMode(mode string) AudioInfo {
  1006. demodName := "NFM"
  1007. if requested := normalizeRequestedMode(mode); requested != "" {
  1008. demodName = requested
  1009. }
  1010. channels := 1
  1011. if demodName == "WFM_STEREO" {
  1012. channels = 2
  1013. } else if d := demod.Get(demodName); d != nil {
  1014. channels = d.Channels()
  1015. }
  1016. playbackMode, stereoState := initialPlaybackState(demodName)
  1017. return AudioInfo{
  1018. SampleRate: streamAudioRate,
  1019. Channels: channels,
  1020. Format: "s16le",
  1021. DemodName: demodName,
  1022. PlaybackMode: playbackMode,
  1023. StereoState: stereoState,
  1024. }
  1025. }
  1026. func normalizeRequestedMode(mode string) string {
  1027. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1028. case "", "AUTO":
  1029. return ""
  1030. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1031. return strings.ToUpper(strings.TrimSpace(mode))
  1032. default:
  1033. return ""
  1034. }
  1035. }
  1036. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1037. func (sess *streamSession) growIQ(n int) []complex64 {
  1038. if cap(sess.scratchIQ) >= n {
  1039. return sess.scratchIQ[:n]
  1040. }
  1041. sess.scratchIQ = make([]complex64, n, n*5/4)
  1042. return sess.scratchIQ
  1043. }
  1044. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1045. func (sess *streamSession) growAudio(n int) []float32 {
  1046. if cap(sess.scratchAudio) >= n {
  1047. return sess.scratchAudio[:n]
  1048. }
  1049. sess.scratchAudio = make([]float32, n, n*5/4)
  1050. return sess.scratchAudio
  1051. }
  1052. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1053. func (sess *streamSession) growPCM(n int) []byte {
  1054. if cap(sess.scratchPCM) >= n {
  1055. return sess.scratchPCM[:n]
  1056. }
  1057. sess.scratchPCM = make([]byte, n, n*5/4)
  1058. return sess.scratchPCM
  1059. }
  1060. func convertToListenOnly(sess *streamSession) {
  1061. if sess.wavBuf != nil {
  1062. _ = sess.wavBuf.Flush()
  1063. }
  1064. if sess.wavFile != nil {
  1065. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1066. sess.wavFile.Close()
  1067. }
  1068. sess.wavFile = nil
  1069. sess.wavBuf = nil
  1070. sess.listenOnly = true
  1071. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1072. }
  1073. func closeSession(sess *streamSession, policy *Policy) {
  1074. if sess.listenOnly {
  1075. return
  1076. }
  1077. if sess.wavBuf != nil {
  1078. _ = sess.wavBuf.Flush()
  1079. }
  1080. if sess.wavFile != nil {
  1081. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1082. sess.wavFile.Close()
  1083. sess.wavFile = nil
  1084. sess.wavBuf = nil
  1085. }
  1086. dur := sess.lastFeed.Sub(sess.startTime)
  1087. files := map[string]any{
  1088. "audio": "audio.wav",
  1089. "audio_sample_rate": sess.sampleRate,
  1090. "audio_channels": sess.channels,
  1091. "audio_demod": sess.demodName,
  1092. "recording_mode": "streaming",
  1093. }
  1094. meta := Meta{
  1095. EventID: sess.signalID,
  1096. Start: sess.startTime,
  1097. End: sess.lastFeed,
  1098. CenterHz: sess.centerHz,
  1099. BandwidthHz: sess.bwHz,
  1100. SampleRate: sess.sampleRate,
  1101. SNRDb: sess.snrDb,
  1102. PeakDb: sess.peakDb,
  1103. Class: sess.class,
  1104. DurationMs: dur.Milliseconds(),
  1105. Files: files,
  1106. }
  1107. b, err := json.MarshalIndent(meta, "", " ")
  1108. if err == nil {
  1109. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1110. }
  1111. if policy != nil {
  1112. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1113. }
  1114. }
  1115. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1116. if len(sess.audioSubs) == 0 {
  1117. return
  1118. }
  1119. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1120. tagged := make([]byte, 1+pcmLen)
  1121. tagged[0] = 0x01
  1122. copy(tagged[1:], pcm[:pcmLen])
  1123. alive := sess.audioSubs[:0]
  1124. for _, sub := range sess.audioSubs {
  1125. select {
  1126. case sub.ch <- tagged:
  1127. default:
  1128. }
  1129. alive = append(alive, sub)
  1130. }
  1131. sess.audioSubs = alive
  1132. }
  1133. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1134. if len(st.policy.ClassFilter) == 0 {
  1135. return true
  1136. }
  1137. if cls == nil {
  1138. return false
  1139. }
  1140. for _, f := range st.policy.ClassFilter {
  1141. if strings.EqualFold(f, string(cls.ModType)) {
  1142. return true
  1143. }
  1144. }
  1145. return false
  1146. }
  1147. // ErrNoSession is returned when no matching signal session exists.
  1148. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1149. // ---------------------------------------------------------------------------
  1150. // WAV header helpers
  1151. // ---------------------------------------------------------------------------
  1152. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1153. if channels <= 0 {
  1154. channels = 1
  1155. }
  1156. hdr := make([]byte, 44)
  1157. copy(hdr[0:4], "RIFF")
  1158. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1159. copy(hdr[8:12], "WAVE")
  1160. copy(hdr[12:16], "fmt ")
  1161. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1162. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1163. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1164. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1165. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1166. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1167. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1168. copy(hdr[36:40], "data")
  1169. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1170. _, err := f.Write(hdr)
  1171. return err
  1172. }
  1173. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1174. dataSize := uint32(totalSamples * 2)
  1175. var buf [4]byte
  1176. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1177. if _, err := f.Seek(4, 0); err != nil {
  1178. return
  1179. }
  1180. _, _ = f.Write(buf[:])
  1181. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1182. if _, err := f.Seek(24, 0); err != nil {
  1183. return
  1184. }
  1185. _, _ = f.Write(buf[:])
  1186. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1187. if _, err := f.Seek(28, 0); err != nil {
  1188. return
  1189. }
  1190. _, _ = f.Write(buf[:])
  1191. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1192. if _, err := f.Seek(40, 0); err != nil {
  1193. return
  1194. }
  1195. _, _ = f.Write(buf[:])
  1196. }