Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

1036 wiersze
28KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-visual-suite/internal/classifier"
  16. "sdr-visual-suite/internal/demod"
  17. "sdr-visual-suite/internal/detector"
  18. "sdr-visual-suite/internal/dsp"
  19. )
  20. // ---------------------------------------------------------------------------
  21. // streamSession — one open demod session for one signal
  22. // ---------------------------------------------------------------------------
  23. type streamSession struct {
  24. signalID int64
  25. centerHz float64
  26. bwHz float64
  27. snrDb float64
  28. peakDb float64
  29. class *classifier.Classification
  30. startTime time.Time
  31. lastFeed time.Time
  32. // listenOnly sessions have no WAV file and no disk I/O.
  33. // They exist solely to feed audio to live-listen subscribers.
  34. listenOnly bool
  35. // Recording state (nil/zero for listen-only sessions)
  36. dir string
  37. wavFile *os.File
  38. wavBuf *bufio.Writer
  39. wavSamples int64
  40. segmentIdx int
  41. sampleRate int // actual output audio sample rate (always streamAudioRate)
  42. channels int
  43. demodName string
  44. // --- Persistent DSP state for click-free streaming ---
  45. // Overlap-save: tail of previous extracted IQ snippet.
  46. overlapIQ []complex64
  47. // De-emphasis IIR state (persists across frames)
  48. deemphL float64
  49. deemphR float64
  50. // Stereo decode: phase-continuous 38kHz oscillator
  51. stereoPhase float64
  52. // Polyphase resampler (replaces integer-decimate hack)
  53. monoResampler *dsp.Resampler
  54. stereoResampler *dsp.StereoResampler
  55. // AQ-4: Stateful FIR filters for click-free stereo decode
  56. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  57. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  58. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  59. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  60. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  61. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  62. // and avoids per-frame FIR recomputation)
  63. preDemodFIR *dsp.StatefulFIRComplex
  64. preDemodDecim int // cached decimation factor
  65. preDemodRate int // cached snipRate this FIR was built for
  66. preDemodCutoff float64 // cached cutoff
  67. // AQ-2: De-emphasis config (µs, 0 = disabled)
  68. deemphasisUs float64
  69. // Scratch buffers — reused across frames to avoid GC pressure.
  70. // Grown as needed, never shrunk.
  71. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  72. scratchAudio []float32 // for stereo decode intermediates
  73. scratchPCM []byte // for PCM encoding
  74. // live-listen subscribers
  75. audioSubs []audioSub
  76. }
  77. type audioSub struct {
  78. id int64
  79. ch chan []byte
  80. }
  81. // AudioInfo describes the audio format of a live-listen subscription.
  82. // Sent to the WebSocket client as the first message.
  83. type AudioInfo struct {
  84. SampleRate int `json:"sample_rate"`
  85. Channels int `json:"channels"`
  86. Format string `json:"format"` // always "s16le"
  87. DemodName string `json:"demod"`
  88. }
  89. const (
  90. streamAudioRate = 48000
  91. resamplerTaps = 32 // taps per polyphase arm — good quality
  92. )
  93. // ---------------------------------------------------------------------------
  94. // Streamer — manages all active streaming sessions
  95. // ---------------------------------------------------------------------------
  96. type streamFeedItem struct {
  97. signal detector.Signal
  98. snippet []complex64
  99. snipRate int
  100. }
  101. type streamFeedMsg struct {
  102. items []streamFeedItem
  103. }
  104. type Streamer struct {
  105. mu sync.Mutex
  106. sessions map[int64]*streamSession
  107. policy Policy
  108. centerHz float64
  109. nextSub int64
  110. feedCh chan streamFeedMsg
  111. done chan struct{}
  112. // pendingListens are subscribers waiting for a matching session.
  113. pendingListens map[int64]*pendingListen
  114. }
  115. type pendingListen struct {
  116. freq float64
  117. bw float64
  118. mode string
  119. ch chan []byte
  120. }
  121. func newStreamer(policy Policy, centerHz float64) *Streamer {
  122. st := &Streamer{
  123. sessions: make(map[int64]*streamSession),
  124. policy: policy,
  125. centerHz: centerHz,
  126. feedCh: make(chan streamFeedMsg, 2),
  127. done: make(chan struct{}),
  128. pendingListens: make(map[int64]*pendingListen),
  129. }
  130. go st.worker()
  131. return st
  132. }
  133. func (st *Streamer) worker() {
  134. for msg := range st.feedCh {
  135. st.processFeed(msg)
  136. }
  137. close(st.done)
  138. }
  139. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  140. st.mu.Lock()
  141. defer st.mu.Unlock()
  142. wasEnabled := st.policy.Enabled
  143. st.policy = policy
  144. st.centerHz = centerHz
  145. // If recording was just disabled, close recording sessions
  146. // but keep listen-only sessions alive.
  147. if wasEnabled && !policy.Enabled {
  148. for id, sess := range st.sessions {
  149. if sess.listenOnly {
  150. continue
  151. }
  152. if len(sess.audioSubs) > 0 {
  153. // Convert to listen-only: close WAV but keep session
  154. convertToListenOnly(sess)
  155. } else {
  156. closeSession(sess, &st.policy)
  157. delete(st.sessions, id)
  158. }
  159. }
  160. }
  161. }
  162. // HasListeners returns true if any sessions have audio subscribers
  163. // or there are pending listen requests. Used by the DSP loop to
  164. // decide whether to feed snippets even when recording is disabled.
  165. func (st *Streamer) HasListeners() bool {
  166. st.mu.Lock()
  167. defer st.mu.Unlock()
  168. return st.hasListenersLocked()
  169. }
  170. func (st *Streamer) hasListenersLocked() bool {
  171. if len(st.pendingListens) > 0 {
  172. return true
  173. }
  174. for _, sess := range st.sessions {
  175. if len(sess.audioSubs) > 0 {
  176. return true
  177. }
  178. }
  179. return false
  180. }
  181. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  182. // Feeds are accepted if:
  183. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  184. // - Any live-listen subscribers exist (listen-only mode)
  185. //
  186. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  187. // data, so items can be passed directly without another copy.
  188. func (st *Streamer) FeedSnippets(items []streamFeedItem) {
  189. st.mu.Lock()
  190. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  191. hasListeners := st.hasListenersLocked()
  192. st.mu.Unlock()
  193. if (!recEnabled && !hasListeners) || len(items) == 0 {
  194. return
  195. }
  196. select {
  197. case st.feedCh <- streamFeedMsg{items: items}:
  198. default:
  199. }
  200. }
  201. // processFeed runs in the worker goroutine.
  202. func (st *Streamer) processFeed(msg streamFeedMsg) {
  203. st.mu.Lock()
  204. defer st.mu.Unlock()
  205. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  206. hasListeners := st.hasListenersLocked()
  207. if !recEnabled && !hasListeners {
  208. return
  209. }
  210. now := time.Now()
  211. seen := make(map[int64]bool, len(msg.items))
  212. for i := range msg.items {
  213. item := &msg.items[i]
  214. sig := &item.signal
  215. seen[sig.ID] = true
  216. if sig.ID == 0 || sig.Class == nil {
  217. continue
  218. }
  219. if len(item.snippet) == 0 || item.snipRate <= 0 {
  220. continue
  221. }
  222. // Decide whether this signal needs a session
  223. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  224. needsListen := st.signalHasListenerLocked(sig)
  225. if !needsRecording && !needsListen {
  226. continue
  227. }
  228. sess, exists := st.sessions[sig.ID]
  229. if !exists {
  230. if needsRecording {
  231. s, err := st.openRecordingSession(sig, now)
  232. if err != nil {
  233. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  234. sig.ID, sig.CenterHz/1e6, err)
  235. continue
  236. }
  237. st.sessions[sig.ID] = s
  238. sess = s
  239. } else {
  240. s := st.openListenSession(sig, now)
  241. st.sessions[sig.ID] = s
  242. sess = s
  243. }
  244. // Attach any pending listeners
  245. st.attachPendingListeners(sess)
  246. }
  247. // Update metadata
  248. sess.lastFeed = now
  249. sess.centerHz = sig.CenterHz
  250. sess.bwHz = sig.BWHz
  251. if sig.SNRDb > sess.snrDb {
  252. sess.snrDb = sig.SNRDb
  253. }
  254. if sig.PeakDb > sess.peakDb {
  255. sess.peakDb = sig.PeakDb
  256. }
  257. if sig.Class != nil {
  258. sess.class = sig.Class
  259. }
  260. // Demod with persistent state
  261. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  262. if len(audio) > 0 {
  263. if sess.wavSamples == 0 && audioRate > 0 {
  264. sess.sampleRate = audioRate
  265. }
  266. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  267. pcmLen := len(audio) * 2
  268. pcm := sess.growPCM(pcmLen)
  269. for k, s := range audio {
  270. v := int16(clip(s * 32767))
  271. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  272. }
  273. if !sess.listenOnly && sess.wavBuf != nil {
  274. n, err := sess.wavBuf.Write(pcm)
  275. if err != nil {
  276. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  277. } else {
  278. sess.wavSamples += int64(n / 2)
  279. }
  280. }
  281. st.fanoutPCM(sess, pcm, pcmLen)
  282. }
  283. // Segment split (recording sessions only)
  284. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  285. segIdx := sess.segmentIdx + 1
  286. oldSubs := sess.audioSubs
  287. oldState := sess.captureDSPState()
  288. sess.audioSubs = nil
  289. closeSession(sess, &st.policy)
  290. s, err := st.openRecordingSession(sig, now)
  291. if err != nil {
  292. delete(st.sessions, sig.ID)
  293. continue
  294. }
  295. s.segmentIdx = segIdx
  296. s.audioSubs = oldSubs
  297. s.restoreDSPState(oldState)
  298. st.sessions[sig.ID] = s
  299. }
  300. }
  301. // Close sessions for disappeared signals (with grace period)
  302. for id, sess := range st.sessions {
  303. if seen[id] {
  304. continue
  305. }
  306. gracePeriod := 3 * time.Second
  307. if sess.listenOnly {
  308. gracePeriod = 5 * time.Second
  309. }
  310. if now.Sub(sess.lastFeed) > gracePeriod {
  311. for _, sub := range sess.audioSubs {
  312. close(sub.ch)
  313. }
  314. sess.audioSubs = nil
  315. if !sess.listenOnly {
  316. closeSession(sess, &st.policy)
  317. }
  318. delete(st.sessions, id)
  319. }
  320. }
  321. }
  322. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  323. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  324. return true
  325. }
  326. for _, pl := range st.pendingListens {
  327. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  328. return true
  329. }
  330. }
  331. return false
  332. }
  333. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  334. for subID, pl := range st.pendingListens {
  335. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  336. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  337. delete(st.pendingListens, subID)
  338. // Send updated audio_info now that we know the real session params.
  339. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  340. infoJSON, _ := json.Marshal(AudioInfo{
  341. SampleRate: sess.sampleRate,
  342. Channels: sess.channels,
  343. Format: "s16le",
  344. DemodName: sess.demodName,
  345. })
  346. tagged := make([]byte, 1+len(infoJSON))
  347. tagged[0] = 0x00 // tag: audio_info
  348. copy(tagged[1:], infoJSON)
  349. select {
  350. case pl.ch <- tagged:
  351. default:
  352. }
  353. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  354. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  355. }
  356. }
  357. }
  358. // CloseAll finalises all sessions and stops the worker goroutine.
  359. func (st *Streamer) CloseAll() {
  360. close(st.feedCh)
  361. <-st.done
  362. st.mu.Lock()
  363. defer st.mu.Unlock()
  364. for id, sess := range st.sessions {
  365. for _, sub := range sess.audioSubs {
  366. close(sub.ch)
  367. }
  368. sess.audioSubs = nil
  369. if !sess.listenOnly {
  370. closeSession(sess, &st.policy)
  371. }
  372. delete(st.sessions, id)
  373. }
  374. for _, pl := range st.pendingListens {
  375. close(pl.ch)
  376. }
  377. st.pendingListens = nil
  378. }
  379. // ActiveSessions returns the number of open streaming sessions.
  380. func (st *Streamer) ActiveSessions() int {
  381. st.mu.Lock()
  382. defer st.mu.Unlock()
  383. return len(st.sessions)
  384. }
  385. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  386. //
  387. // LL-2: Returns AudioInfo with correct channels and sample rate.
  388. // LL-3: Returns error only on hard failures (nil streamer etc).
  389. //
  390. // If a matching session exists, attaches immediately. Otherwise, the
  391. // subscriber is held as "pending" and will be attached when a matching
  392. // signal appears in the next DSP frame.
  393. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  394. ch := make(chan []byte, 64)
  395. st.mu.Lock()
  396. defer st.mu.Unlock()
  397. st.nextSub++
  398. subID := st.nextSub
  399. // Try to find a matching session
  400. var bestSess *streamSession
  401. bestDist := math.MaxFloat64
  402. for _, sess := range st.sessions {
  403. d := math.Abs(sess.centerHz - freq)
  404. if d < bestDist {
  405. bestDist = d
  406. bestSess = sess
  407. }
  408. }
  409. if bestSess != nil && bestDist < 200000 {
  410. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  411. info := AudioInfo{
  412. SampleRate: bestSess.sampleRate,
  413. Channels: bestSess.channels,
  414. Format: "s16le",
  415. DemodName: bestSess.demodName,
  416. }
  417. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  418. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  419. return subID, ch, info, nil
  420. }
  421. // No matching session yet — add as pending listener
  422. st.pendingListens[subID] = &pendingListen{
  423. freq: freq,
  424. bw: bw,
  425. mode: mode,
  426. ch: ch,
  427. }
  428. info := AudioInfo{
  429. SampleRate: streamAudioRate,
  430. Channels: 1,
  431. Format: "s16le",
  432. DemodName: "NFM",
  433. }
  434. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  435. return subID, ch, info, nil
  436. }
  437. // UnsubscribeAudio removes a live-listen subscriber.
  438. func (st *Streamer) UnsubscribeAudio(subID int64) {
  439. st.mu.Lock()
  440. defer st.mu.Unlock()
  441. if pl, ok := st.pendingListens[subID]; ok {
  442. close(pl.ch)
  443. delete(st.pendingListens, subID)
  444. return
  445. }
  446. for _, sess := range st.sessions {
  447. for i, sub := range sess.audioSubs {
  448. if sub.id == subID {
  449. close(sub.ch)
  450. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  451. return
  452. }
  453. }
  454. }
  455. }
  456. // ---------------------------------------------------------------------------
  457. // Session: stateful extraction + demod
  458. // ---------------------------------------------------------------------------
  459. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  460. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  461. // output with zero transient artifacts.
  462. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  463. if len(snippet) == 0 || snipRate <= 0 {
  464. return nil, 0
  465. }
  466. isWFMStereo := sess.demodName == "WFM_STEREO"
  467. isWFM := sess.demodName == "WFM" || isWFMStereo
  468. demodName := sess.demodName
  469. if isWFMStereo {
  470. demodName = "WFM"
  471. }
  472. d := demod.Get(demodName)
  473. if d == nil {
  474. d = demod.Get("NFM")
  475. }
  476. if d == nil {
  477. return nil, 0
  478. }
  479. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  480. // The FM discriminator needs iq[i-1] to compute the first output.
  481. // All FIR filtering is now stateful, so no additional overlap is needed.
  482. var fullSnip []complex64
  483. trimSamples := 0
  484. if len(sess.overlapIQ) == 1 {
  485. fullSnip = make([]complex64, 1+len(snippet))
  486. fullSnip[0] = sess.overlapIQ[0]
  487. copy(fullSnip[1:], snippet)
  488. trimSamples = 1
  489. } else {
  490. fullSnip = snippet
  491. }
  492. // Save last sample for next frame's FM discriminator
  493. if len(snippet) > 0 {
  494. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  495. }
  496. // --- Stateful anti-alias FIR + decimation to demod rate ---
  497. demodRate := d.OutputSampleRate()
  498. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  499. if decim1 < 1 {
  500. decim1 = 1
  501. }
  502. actualDemodRate := snipRate / decim1
  503. var dec []complex64
  504. if decim1 > 1 {
  505. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  506. // Lazy-init or reinit stateful FIR if parameters changed
  507. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  508. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  509. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  510. sess.preDemodRate = snipRate
  511. sess.preDemodCutoff = cutoff
  512. sess.preDemodDecim = decim1
  513. }
  514. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  515. dec = dsp.Decimate(filtered, decim1)
  516. } else {
  517. dec = fullSnip
  518. }
  519. // --- FM Demod ---
  520. audio := d.Demod(dec, actualDemodRate)
  521. if len(audio) == 0 {
  522. return nil, 0
  523. }
  524. // --- Trim the 1-sample FM discriminator overlap ---
  525. if trimSamples > 0 {
  526. audioTrim := trimSamples / decim1
  527. if audioTrim < 1 {
  528. audioTrim = 1 // at minimum trim 1 audio sample
  529. }
  530. if audioTrim > 0 && audioTrim < len(audio) {
  531. audio = audio[audioTrim:]
  532. }
  533. }
  534. // --- Stateful stereo decode ---
  535. channels := 1
  536. if isWFMStereo {
  537. channels = 2
  538. audio = sess.stereoDecodeStateful(audio, actualDemodRate)
  539. }
  540. // --- Polyphase resample to exact 48kHz ---
  541. if actualDemodRate != streamAudioRate {
  542. if channels > 1 {
  543. if sess.stereoResampler == nil {
  544. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  545. }
  546. audio = sess.stereoResampler.Process(audio)
  547. } else {
  548. if sess.monoResampler == nil {
  549. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  550. }
  551. audio = sess.monoResampler.Process(audio)
  552. }
  553. }
  554. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  555. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  556. tau := sess.deemphasisUs * 1e-6
  557. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  558. if channels > 1 {
  559. nFrames := len(audio) / channels
  560. yL, yR := sess.deemphL, sess.deemphR
  561. for i := 0; i < nFrames; i++ {
  562. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  563. audio[i*2] = float32(yL)
  564. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  565. audio[i*2+1] = float32(yR)
  566. }
  567. sess.deemphL, sess.deemphR = yL, yR
  568. } else {
  569. y := sess.deemphL
  570. for i := range audio {
  571. y = alpha*y + (1-alpha)*float64(audio[i])
  572. audio[i] = float32(y)
  573. }
  574. sess.deemphL = y
  575. }
  576. }
  577. return audio, streamAudioRate
  578. }
  579. // stereoDecodeStateful: phase-continuous 38kHz oscillator for L-R extraction.
  580. // AQ-4: Uses persistent FIR filter state across frames for click-free stereo.
  581. // Reuses session scratch buffers to minimize allocations.
  582. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) []float32 {
  583. if len(mono) == 0 || sampleRate <= 0 {
  584. return nil
  585. }
  586. n := len(mono)
  587. // Lazy-init stateful filters on first call
  588. if sess.stereoLPF == nil {
  589. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  590. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  591. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  592. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  593. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  594. }
  595. // Reuse scratch for intermediates: need 4*n float32 for bpf, lr, hi, lo
  596. // plus 2*n for output. We'll use scratchAudio for bpf+lr (2*n) and
  597. // allocate hi/lo from the stateful FIR ProcessInto.
  598. scratch := sess.growAudio(n * 4)
  599. bpf := scratch[:n]
  600. lr := scratch[n : 2*n]
  601. hiBuf := scratch[2*n : 3*n]
  602. loBuf := scratch[3*n : 4*n]
  603. lpr := sess.stereoLPF.Process(mono) // allocates — but could use ProcessInto too
  604. sess.stereoBPHi.ProcessInto(mono, hiBuf)
  605. sess.stereoBPLo.ProcessInto(mono, loBuf)
  606. for i := 0; i < n; i++ {
  607. bpf[i] = hiBuf[i] - loBuf[i]
  608. }
  609. phase := sess.stereoPhase
  610. inc := 2 * math.Pi * 38000 / float64(sampleRate)
  611. for i := range bpf {
  612. phase += inc
  613. lr[i] = bpf[i] * float32(2*math.Cos(phase))
  614. }
  615. sess.stereoPhase = math.Mod(phase, 2*math.Pi)
  616. lr = sess.stereoLRLPF.Process(lr)
  617. out := make([]float32, n*2)
  618. for i := 0; i < n; i++ {
  619. out[i*2] = 0.5 * (lpr[i] + lr[i])
  620. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  621. }
  622. return out
  623. }
  624. // dspStateSnapshot captures persistent DSP state for segment splits.
  625. type dspStateSnapshot struct {
  626. overlapIQ []complex64
  627. deemphL float64
  628. deemphR float64
  629. stereoPhase float64
  630. monoResampler *dsp.Resampler
  631. stereoResampler *dsp.StereoResampler
  632. stereoLPF *dsp.StatefulFIRReal
  633. stereoBPHi *dsp.StatefulFIRReal
  634. stereoBPLo *dsp.StatefulFIRReal
  635. stereoLRLPF *dsp.StatefulFIRReal
  636. stereoAALPF *dsp.StatefulFIRReal
  637. preDemodFIR *dsp.StatefulFIRComplex
  638. preDemodDecim int
  639. preDemodRate int
  640. preDemodCutoff float64
  641. }
  642. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  643. return dspStateSnapshot{
  644. overlapIQ: sess.overlapIQ,
  645. deemphL: sess.deemphL,
  646. deemphR: sess.deemphR,
  647. stereoPhase: sess.stereoPhase,
  648. monoResampler: sess.monoResampler,
  649. stereoResampler: sess.stereoResampler,
  650. stereoLPF: sess.stereoLPF,
  651. stereoBPHi: sess.stereoBPHi,
  652. stereoBPLo: sess.stereoBPLo,
  653. stereoLRLPF: sess.stereoLRLPF,
  654. stereoAALPF: sess.stereoAALPF,
  655. preDemodFIR: sess.preDemodFIR,
  656. preDemodDecim: sess.preDemodDecim,
  657. preDemodRate: sess.preDemodRate,
  658. preDemodCutoff: sess.preDemodCutoff,
  659. }
  660. }
  661. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  662. sess.overlapIQ = s.overlapIQ
  663. sess.deemphL = s.deemphL
  664. sess.deemphR = s.deemphR
  665. sess.stereoPhase = s.stereoPhase
  666. sess.monoResampler = s.monoResampler
  667. sess.stereoResampler = s.stereoResampler
  668. sess.stereoLPF = s.stereoLPF
  669. sess.stereoBPHi = s.stereoBPHi
  670. sess.stereoBPLo = s.stereoBPLo
  671. sess.stereoLRLPF = s.stereoLRLPF
  672. sess.stereoAALPF = s.stereoAALPF
  673. sess.preDemodFIR = s.preDemodFIR
  674. sess.preDemodDecim = s.preDemodDecim
  675. sess.preDemodRate = s.preDemodRate
  676. sess.preDemodCutoff = s.preDemodCutoff
  677. }
  678. // ---------------------------------------------------------------------------
  679. // Session management helpers
  680. // ---------------------------------------------------------------------------
  681. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  682. outputDir := st.policy.OutputDir
  683. if outputDir == "" {
  684. outputDir = "data/recordings"
  685. }
  686. demodName, channels := resolveDemod(sig)
  687. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  688. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  689. dir := filepath.Join(outputDir, dirName)
  690. if err := os.MkdirAll(dir, 0o755); err != nil {
  691. return nil, err
  692. }
  693. wavPath := filepath.Join(dir, "audio.wav")
  694. f, err := os.Create(wavPath)
  695. if err != nil {
  696. return nil, err
  697. }
  698. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  699. f.Close()
  700. return nil, err
  701. }
  702. sess := &streamSession{
  703. signalID: sig.ID,
  704. centerHz: sig.CenterHz,
  705. bwHz: sig.BWHz,
  706. snrDb: sig.SNRDb,
  707. peakDb: sig.PeakDb,
  708. class: sig.Class,
  709. startTime: now,
  710. lastFeed: now,
  711. dir: dir,
  712. wavFile: f,
  713. wavBuf: bufio.NewWriterSize(f, 64*1024),
  714. sampleRate: streamAudioRate,
  715. channels: channels,
  716. demodName: demodName,
  717. deemphasisUs: st.policy.DeemphasisUs,
  718. }
  719. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  720. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  721. return sess, nil
  722. }
  723. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  724. demodName, channels := resolveDemod(sig)
  725. sess := &streamSession{
  726. signalID: sig.ID,
  727. centerHz: sig.CenterHz,
  728. bwHz: sig.BWHz,
  729. snrDb: sig.SNRDb,
  730. peakDb: sig.PeakDb,
  731. class: sig.Class,
  732. startTime: now,
  733. lastFeed: now,
  734. listenOnly: true,
  735. sampleRate: streamAudioRate,
  736. channels: channels,
  737. demodName: demodName,
  738. deemphasisUs: st.policy.DeemphasisUs,
  739. }
  740. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  741. sig.ID, sig.CenterHz/1e6, demodName)
  742. return sess
  743. }
  744. func resolveDemod(sig *detector.Signal) (string, int) {
  745. demodName := "NFM"
  746. if sig.Class != nil {
  747. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  748. demodName = n
  749. }
  750. }
  751. channels := 1
  752. if demodName == "WFM_STEREO" {
  753. channels = 2
  754. } else if d := demod.Get(demodName); d != nil {
  755. channels = d.Channels()
  756. }
  757. return demodName, channels
  758. }
  759. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  760. func (sess *streamSession) growIQ(n int) []complex64 {
  761. if cap(sess.scratchIQ) >= n {
  762. return sess.scratchIQ[:n]
  763. }
  764. sess.scratchIQ = make([]complex64, n, n*5/4)
  765. return sess.scratchIQ
  766. }
  767. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  768. func (sess *streamSession) growAudio(n int) []float32 {
  769. if cap(sess.scratchAudio) >= n {
  770. return sess.scratchAudio[:n]
  771. }
  772. sess.scratchAudio = make([]float32, n, n*5/4)
  773. return sess.scratchAudio
  774. }
  775. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  776. func (sess *streamSession) growPCM(n int) []byte {
  777. if cap(sess.scratchPCM) >= n {
  778. return sess.scratchPCM[:n]
  779. }
  780. sess.scratchPCM = make([]byte, n, n*5/4)
  781. return sess.scratchPCM
  782. }
  783. func convertToListenOnly(sess *streamSession) {
  784. if sess.wavBuf != nil {
  785. _ = sess.wavBuf.Flush()
  786. }
  787. if sess.wavFile != nil {
  788. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  789. sess.wavFile.Close()
  790. }
  791. sess.wavFile = nil
  792. sess.wavBuf = nil
  793. sess.listenOnly = true
  794. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  795. }
  796. func closeSession(sess *streamSession, policy *Policy) {
  797. if sess.listenOnly {
  798. return
  799. }
  800. if sess.wavBuf != nil {
  801. _ = sess.wavBuf.Flush()
  802. }
  803. if sess.wavFile != nil {
  804. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  805. sess.wavFile.Close()
  806. sess.wavFile = nil
  807. sess.wavBuf = nil
  808. }
  809. dur := sess.lastFeed.Sub(sess.startTime)
  810. files := map[string]any{
  811. "audio": "audio.wav",
  812. "audio_sample_rate": sess.sampleRate,
  813. "audio_channels": sess.channels,
  814. "audio_demod": sess.demodName,
  815. "recording_mode": "streaming",
  816. }
  817. meta := Meta{
  818. EventID: sess.signalID,
  819. Start: sess.startTime,
  820. End: sess.lastFeed,
  821. CenterHz: sess.centerHz,
  822. BandwidthHz: sess.bwHz,
  823. SampleRate: sess.sampleRate,
  824. SNRDb: sess.snrDb,
  825. PeakDb: sess.peakDb,
  826. Class: sess.class,
  827. DurationMs: dur.Milliseconds(),
  828. Files: files,
  829. }
  830. b, err := json.MarshalIndent(meta, "", " ")
  831. if err == nil {
  832. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  833. }
  834. if policy != nil {
  835. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  836. }
  837. }
  838. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  839. if len(sess.audioSubs) == 0 {
  840. return
  841. }
  842. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  843. tagged := make([]byte, 1+pcmLen)
  844. tagged[0] = 0x01
  845. copy(tagged[1:], pcm[:pcmLen])
  846. alive := sess.audioSubs[:0]
  847. for _, sub := range sess.audioSubs {
  848. select {
  849. case sub.ch <- tagged:
  850. default:
  851. }
  852. alive = append(alive, sub)
  853. }
  854. sess.audioSubs = alive
  855. }
  856. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  857. if len(st.policy.ClassFilter) == 0 {
  858. return true
  859. }
  860. if cls == nil {
  861. return false
  862. }
  863. for _, f := range st.policy.ClassFilter {
  864. if strings.EqualFold(f, string(cls.ModType)) {
  865. return true
  866. }
  867. }
  868. return false
  869. }
  870. // ErrNoSession is returned when no matching signal session exists.
  871. var ErrNoSession = errors.New("no active or pending session for this frequency")
  872. // ---------------------------------------------------------------------------
  873. // WAV header helpers
  874. // ---------------------------------------------------------------------------
  875. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  876. if channels <= 0 {
  877. channels = 1
  878. }
  879. hdr := make([]byte, 44)
  880. copy(hdr[0:4], "RIFF")
  881. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  882. copy(hdr[8:12], "WAVE")
  883. copy(hdr[12:16], "fmt ")
  884. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  885. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  886. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  887. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  888. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  889. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  890. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  891. copy(hdr[36:40], "data")
  892. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  893. _, err := f.Write(hdr)
  894. return err
  895. }
  896. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  897. dataSize := uint32(totalSamples * 2)
  898. var buf [4]byte
  899. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  900. if _, err := f.Seek(4, 0); err != nil {
  901. return
  902. }
  903. _, _ = f.Write(buf[:])
  904. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  905. if _, err := f.Seek(24, 0); err != nil {
  906. return
  907. }
  908. _, _ = f.Write(buf[:])
  909. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  910. if _, err := f.Seek(28, 0); err != nil {
  911. return
  912. }
  913. _, _ = f.Write(buf[:])
  914. binary.LittleEndian.PutUint32(buf[:], dataSize)
  915. if _, err := f.Seek(40, 0); err != nil {
  916. return
  917. }
  918. _, _ = f.Write(buf[:])
  919. }