Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

1155 строки
32KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. )
  20. // ---------------------------------------------------------------------------
  21. // streamSession — one open demod session for one signal
  22. // ---------------------------------------------------------------------------
  23. type streamSession struct {
  24. signalID int64
  25. centerHz float64
  26. bwHz float64
  27. snrDb float64
  28. peakDb float64
  29. class *classifier.Classification
  30. startTime time.Time
  31. lastFeed time.Time
  32. // listenOnly sessions have no WAV file and no disk I/O.
  33. // They exist solely to feed audio to live-listen subscribers.
  34. listenOnly bool
  35. // Recording state (nil/zero for listen-only sessions)
  36. dir string
  37. wavFile *os.File
  38. wavBuf *bufio.Writer
  39. wavSamples int64
  40. segmentIdx int
  41. sampleRate int // actual output audio sample rate (always streamAudioRate)
  42. channels int
  43. demodName string
  44. // --- Persistent DSP state for click-free streaming ---
  45. // Overlap-save: tail of previous extracted IQ snippet.
  46. overlapIQ []complex64
  47. // De-emphasis IIR state (persists across frames)
  48. deemphL float64
  49. deemphR float64
  50. // Stereo decode: phase-continuous 38kHz oscillator
  51. stereoPhase float64
  52. // Stereo lock state for live WFM streaming
  53. stereoEnabled bool
  54. stereoOnCount int
  55. stereoOffCount int
  56. // Polyphase resampler (replaces integer-decimate hack)
  57. monoResampler *dsp.Resampler
  58. monoResamplerRate int
  59. stereoResampler *dsp.StereoResampler
  60. stereoResamplerRate int
  61. // AQ-4: Stateful FIR filters for click-free stereo decode
  62. stereoFilterRate int
  63. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  64. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  65. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  66. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  67. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  68. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  69. // and avoids per-frame FIR recomputation)
  70. preDemodFIR *dsp.StatefulFIRComplex
  71. preDemodDecim int // cached decimation factor
  72. preDemodRate int // cached snipRate this FIR was built for
  73. preDemodCutoff float64 // cached cutoff
  74. // AQ-2: De-emphasis config (µs, 0 = disabled)
  75. deemphasisUs float64
  76. // Scratch buffers — reused across frames to avoid GC pressure.
  77. // Grown as needed, never shrunk.
  78. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  79. scratchAudio []float32 // for stereo decode intermediates
  80. scratchPCM []byte // for PCM encoding
  81. // live-listen subscribers
  82. audioSubs []audioSub
  83. }
  84. type audioSub struct {
  85. id int64
  86. ch chan []byte
  87. }
  88. // AudioInfo describes the audio format of a live-listen subscription.
  89. // Sent to the WebSocket client as the first message.
  90. type AudioInfo struct {
  91. SampleRate int `json:"sample_rate"`
  92. Channels int `json:"channels"`
  93. Format string `json:"format"` // always "s16le"
  94. DemodName string `json:"demod"`
  95. }
  96. const (
  97. streamAudioRate = 48000
  98. resamplerTaps = 32 // taps per polyphase arm — good quality
  99. )
  100. // ---------------------------------------------------------------------------
  101. // Streamer — manages all active streaming sessions
  102. // ---------------------------------------------------------------------------
  103. type streamFeedItem struct {
  104. signal detector.Signal
  105. snippet []complex64
  106. snipRate int
  107. }
  108. type streamFeedMsg struct {
  109. items []streamFeedItem
  110. }
  111. type Streamer struct {
  112. mu sync.Mutex
  113. sessions map[int64]*streamSession
  114. policy Policy
  115. centerHz float64
  116. nextSub int64
  117. feedCh chan streamFeedMsg
  118. done chan struct{}
  119. // pendingListens are subscribers waiting for a matching session.
  120. pendingListens map[int64]*pendingListen
  121. }
  122. type pendingListen struct {
  123. freq float64
  124. bw float64
  125. mode string
  126. ch chan []byte
  127. }
  128. func newStreamer(policy Policy, centerHz float64) *Streamer {
  129. st := &Streamer{
  130. sessions: make(map[int64]*streamSession),
  131. policy: policy,
  132. centerHz: centerHz,
  133. feedCh: make(chan streamFeedMsg, 2),
  134. done: make(chan struct{}),
  135. pendingListens: make(map[int64]*pendingListen),
  136. }
  137. go st.worker()
  138. return st
  139. }
  140. func (st *Streamer) worker() {
  141. for msg := range st.feedCh {
  142. st.processFeed(msg)
  143. }
  144. close(st.done)
  145. }
  146. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  147. st.mu.Lock()
  148. defer st.mu.Unlock()
  149. wasEnabled := st.policy.Enabled
  150. st.policy = policy
  151. st.centerHz = centerHz
  152. // If recording was just disabled, close recording sessions
  153. // but keep listen-only sessions alive.
  154. if wasEnabled && !policy.Enabled {
  155. for id, sess := range st.sessions {
  156. if sess.listenOnly {
  157. continue
  158. }
  159. if len(sess.audioSubs) > 0 {
  160. // Convert to listen-only: close WAV but keep session
  161. convertToListenOnly(sess)
  162. } else {
  163. closeSession(sess, &st.policy)
  164. delete(st.sessions, id)
  165. }
  166. }
  167. }
  168. }
  169. // HasListeners returns true if any sessions have audio subscribers
  170. // or there are pending listen requests. Used by the DSP loop to
  171. // decide whether to feed snippets even when recording is disabled.
  172. func (st *Streamer) HasListeners() bool {
  173. st.mu.Lock()
  174. defer st.mu.Unlock()
  175. return st.hasListenersLocked()
  176. }
  177. func (st *Streamer) hasListenersLocked() bool {
  178. if len(st.pendingListens) > 0 {
  179. return true
  180. }
  181. for _, sess := range st.sessions {
  182. if len(sess.audioSubs) > 0 {
  183. return true
  184. }
  185. }
  186. return false
  187. }
  188. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  189. // Feeds are accepted if:
  190. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  191. // - Any live-listen subscribers exist (listen-only mode)
  192. //
  193. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  194. // data, so items can be passed directly without another copy.
  195. func (st *Streamer) FeedSnippets(items []streamFeedItem) {
  196. st.mu.Lock()
  197. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  198. hasListeners := st.hasListenersLocked()
  199. pending := len(st.pendingListens)
  200. st.mu.Unlock()
  201. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  202. if (!recEnabled && !hasListeners) || len(items) == 0 {
  203. return
  204. }
  205. select {
  206. case st.feedCh <- streamFeedMsg{items: items}:
  207. default:
  208. }
  209. }
  210. // processFeed runs in the worker goroutine.
  211. func (st *Streamer) processFeed(msg streamFeedMsg) {
  212. st.mu.Lock()
  213. defer st.mu.Unlock()
  214. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  215. hasListeners := st.hasListenersLocked()
  216. if !recEnabled && !hasListeners {
  217. return
  218. }
  219. now := time.Now()
  220. seen := make(map[int64]bool, len(msg.items))
  221. for i := range msg.items {
  222. item := &msg.items[i]
  223. sig := &item.signal
  224. seen[sig.ID] = true
  225. if sig.ID == 0 || sig.Class == nil {
  226. continue
  227. }
  228. if len(item.snippet) == 0 || item.snipRate <= 0 {
  229. continue
  230. }
  231. // Decide whether this signal needs a session
  232. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  233. needsListen := st.signalHasListenerLocked(sig)
  234. className := "<nil>"
  235. demodName := ""
  236. if sig.Class != nil {
  237. className = string(sig.Class.ModType)
  238. demodName, _ = resolveDemod(sig)
  239. }
  240. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  241. if !needsRecording && !needsListen {
  242. continue
  243. }
  244. sess, exists := st.sessions[sig.ID]
  245. requestedMode := ""
  246. for _, pl := range st.pendingListens {
  247. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  248. if m := normalizeRequestedMode(pl.mode); m != "" {
  249. requestedMode = m
  250. break
  251. }
  252. }
  253. }
  254. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  255. for _, sub := range sess.audioSubs {
  256. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  257. }
  258. delete(st.sessions, sig.ID)
  259. sess = nil
  260. exists = false
  261. }
  262. if !exists {
  263. if needsRecording {
  264. s, err := st.openRecordingSession(sig, now)
  265. if err != nil {
  266. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  267. sig.ID, sig.CenterHz/1e6, err)
  268. continue
  269. }
  270. st.sessions[sig.ID] = s
  271. sess = s
  272. } else {
  273. s := st.openListenSession(sig, now)
  274. st.sessions[sig.ID] = s
  275. sess = s
  276. }
  277. // Attach any pending listeners
  278. st.attachPendingListeners(sess)
  279. }
  280. // Update metadata
  281. sess.lastFeed = now
  282. sess.centerHz = sig.CenterHz
  283. sess.bwHz = sig.BWHz
  284. if sig.SNRDb > sess.snrDb {
  285. sess.snrDb = sig.SNRDb
  286. }
  287. if sig.PeakDb > sess.peakDb {
  288. sess.peakDb = sig.PeakDb
  289. }
  290. if sig.Class != nil {
  291. sess.class = sig.Class
  292. }
  293. // Demod with persistent state
  294. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  295. if len(audio) > 0 {
  296. if sess.wavSamples == 0 && audioRate > 0 {
  297. sess.sampleRate = audioRate
  298. }
  299. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  300. pcmLen := len(audio) * 2
  301. pcm := sess.growPCM(pcmLen)
  302. for k, s := range audio {
  303. v := int16(clip(s * 32767))
  304. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  305. }
  306. if !sess.listenOnly && sess.wavBuf != nil {
  307. n, err := sess.wavBuf.Write(pcm)
  308. if err != nil {
  309. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  310. } else {
  311. sess.wavSamples += int64(n / 2)
  312. }
  313. }
  314. st.fanoutPCM(sess, pcm, pcmLen)
  315. }
  316. // Segment split (recording sessions only)
  317. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  318. segIdx := sess.segmentIdx + 1
  319. oldSubs := sess.audioSubs
  320. oldState := sess.captureDSPState()
  321. sess.audioSubs = nil
  322. closeSession(sess, &st.policy)
  323. s, err := st.openRecordingSession(sig, now)
  324. if err != nil {
  325. delete(st.sessions, sig.ID)
  326. continue
  327. }
  328. s.segmentIdx = segIdx
  329. s.audioSubs = oldSubs
  330. s.restoreDSPState(oldState)
  331. st.sessions[sig.ID] = s
  332. }
  333. }
  334. // Close sessions for disappeared signals (with grace period)
  335. for id, sess := range st.sessions {
  336. if seen[id] {
  337. continue
  338. }
  339. gracePeriod := 3 * time.Second
  340. if sess.listenOnly {
  341. gracePeriod = 5 * time.Second
  342. }
  343. if now.Sub(sess.lastFeed) > gracePeriod {
  344. for _, sub := range sess.audioSubs {
  345. close(sub.ch)
  346. }
  347. sess.audioSubs = nil
  348. if !sess.listenOnly {
  349. closeSession(sess, &st.policy)
  350. }
  351. delete(st.sessions, id)
  352. }
  353. }
  354. }
  355. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  356. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  357. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  358. return true
  359. }
  360. for subID, pl := range st.pendingListens {
  361. delta := math.Abs(sig.CenterHz - pl.freq)
  362. if delta < 200000 {
  363. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  364. return true
  365. }
  366. }
  367. return false
  368. }
  369. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  370. for subID, pl := range st.pendingListens {
  371. requestedMode := normalizeRequestedMode(pl.mode)
  372. if requestedMode != "" && sess.demodName != requestedMode {
  373. continue
  374. }
  375. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  376. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  377. delete(st.pendingListens, subID)
  378. // Send updated audio_info now that we know the real session params.
  379. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  380. infoJSON, _ := json.Marshal(AudioInfo{
  381. SampleRate: sess.sampleRate,
  382. Channels: sess.channels,
  383. Format: "s16le",
  384. DemodName: sess.demodName,
  385. })
  386. tagged := make([]byte, 1+len(infoJSON))
  387. tagged[0] = 0x00 // tag: audio_info
  388. copy(tagged[1:], infoJSON)
  389. select {
  390. case pl.ch <- tagged:
  391. default:
  392. }
  393. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  394. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  395. }
  396. }
  397. }
  398. // CloseAll finalises all sessions and stops the worker goroutine.
  399. func (st *Streamer) CloseAll() {
  400. close(st.feedCh)
  401. <-st.done
  402. st.mu.Lock()
  403. defer st.mu.Unlock()
  404. for id, sess := range st.sessions {
  405. for _, sub := range sess.audioSubs {
  406. close(sub.ch)
  407. }
  408. sess.audioSubs = nil
  409. if !sess.listenOnly {
  410. closeSession(sess, &st.policy)
  411. }
  412. delete(st.sessions, id)
  413. }
  414. for _, pl := range st.pendingListens {
  415. close(pl.ch)
  416. }
  417. st.pendingListens = nil
  418. }
  419. // ActiveSessions returns the number of open streaming sessions.
  420. func (st *Streamer) ActiveSessions() int {
  421. st.mu.Lock()
  422. defer st.mu.Unlock()
  423. return len(st.sessions)
  424. }
  425. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  426. //
  427. // LL-2: Returns AudioInfo with correct channels and sample rate.
  428. // LL-3: Returns error only on hard failures (nil streamer etc).
  429. //
  430. // If a matching session exists, attaches immediately. Otherwise, the
  431. // subscriber is held as "pending" and will be attached when a matching
  432. // signal appears in the next DSP frame.
  433. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  434. ch := make(chan []byte, 64)
  435. st.mu.Lock()
  436. defer st.mu.Unlock()
  437. st.nextSub++
  438. subID := st.nextSub
  439. requestedMode := normalizeRequestedMode(mode)
  440. // Try to find a matching session
  441. var bestSess *streamSession
  442. bestDist := math.MaxFloat64
  443. for _, sess := range st.sessions {
  444. if requestedMode != "" && sess.demodName != requestedMode {
  445. continue
  446. }
  447. d := math.Abs(sess.centerHz - freq)
  448. if d < bestDist {
  449. bestDist = d
  450. bestSess = sess
  451. }
  452. }
  453. if bestSess != nil && bestDist < 200000 {
  454. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  455. info := AudioInfo{
  456. SampleRate: bestSess.sampleRate,
  457. Channels: bestSess.channels,
  458. Format: "s16le",
  459. DemodName: bestSess.demodName,
  460. }
  461. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  462. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  463. return subID, ch, info, nil
  464. }
  465. // No matching session yet — add as pending listener
  466. st.pendingListens[subID] = &pendingListen{
  467. freq: freq,
  468. bw: bw,
  469. mode: mode,
  470. ch: ch,
  471. }
  472. info := AudioInfo{
  473. SampleRate: streamAudioRate,
  474. Channels: 1,
  475. Format: "s16le",
  476. DemodName: "NFM",
  477. }
  478. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  479. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  480. return subID, ch, info, nil
  481. }
  482. // UnsubscribeAudio removes a live-listen subscriber.
  483. func (st *Streamer) UnsubscribeAudio(subID int64) {
  484. st.mu.Lock()
  485. defer st.mu.Unlock()
  486. if pl, ok := st.pendingListens[subID]; ok {
  487. close(pl.ch)
  488. delete(st.pendingListens, subID)
  489. return
  490. }
  491. for _, sess := range st.sessions {
  492. for i, sub := range sess.audioSubs {
  493. if sub.id == subID {
  494. close(sub.ch)
  495. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  496. return
  497. }
  498. }
  499. }
  500. }
  501. // ---------------------------------------------------------------------------
  502. // Session: stateful extraction + demod
  503. // ---------------------------------------------------------------------------
  504. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  505. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  506. // output with zero transient artifacts.
  507. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  508. if len(snippet) == 0 || snipRate <= 0 {
  509. return nil, 0
  510. }
  511. isWFMStereo := sess.demodName == "WFM_STEREO"
  512. isWFM := sess.demodName == "WFM" || isWFMStereo
  513. demodName := sess.demodName
  514. if isWFMStereo {
  515. demodName = "WFM"
  516. }
  517. d := demod.Get(demodName)
  518. if d == nil {
  519. d = demod.Get("NFM")
  520. }
  521. if d == nil {
  522. return nil, 0
  523. }
  524. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  525. // The FM discriminator needs iq[i-1] to compute the first output.
  526. // All FIR filtering is now stateful, so no additional overlap is needed.
  527. var fullSnip []complex64
  528. trimSamples := 0
  529. if len(sess.overlapIQ) == 1 {
  530. fullSnip = make([]complex64, 1+len(snippet))
  531. fullSnip[0] = sess.overlapIQ[0]
  532. copy(fullSnip[1:], snippet)
  533. trimSamples = 1
  534. } else {
  535. fullSnip = snippet
  536. }
  537. // Save last sample for next frame's FM discriminator
  538. if len(snippet) > 0 {
  539. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  540. }
  541. // --- Stateful anti-alias FIR + decimation to demod rate ---
  542. demodRate := d.OutputSampleRate()
  543. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  544. if decim1 < 1 {
  545. decim1 = 1
  546. }
  547. actualDemodRate := snipRate / decim1
  548. var dec []complex64
  549. if decim1 > 1 {
  550. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  551. // Lazy-init or reinit stateful FIR if parameters changed
  552. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  553. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  554. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  555. sess.preDemodRate = snipRate
  556. sess.preDemodCutoff = cutoff
  557. sess.preDemodDecim = decim1
  558. }
  559. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  560. dec = dsp.Decimate(filtered, decim1)
  561. } else {
  562. dec = fullSnip
  563. }
  564. // --- FM Demod ---
  565. audio := d.Demod(dec, actualDemodRate)
  566. if len(audio) == 0 {
  567. return nil, 0
  568. }
  569. // --- Trim the 1-sample FM discriminator overlap ---
  570. if trimSamples > 0 {
  571. audioTrim := trimSamples / decim1
  572. if audioTrim < 1 {
  573. audioTrim = 1 // at minimum trim 1 audio sample
  574. }
  575. if audioTrim > 0 && audioTrim < len(audio) {
  576. audio = audio[audioTrim:]
  577. }
  578. }
  579. // --- Stateful stereo decode with conservative lock/hysteresis ---
  580. channels := 1
  581. if isWFMStereo {
  582. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  583. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  584. if locked {
  585. sess.stereoOnCount++
  586. sess.stereoOffCount = 0
  587. if sess.stereoOnCount >= 4 {
  588. sess.stereoEnabled = true
  589. }
  590. } else {
  591. sess.stereoOnCount = 0
  592. sess.stereoOffCount++
  593. if sess.stereoOffCount >= 10 {
  594. sess.stereoEnabled = false
  595. }
  596. }
  597. if sess.stereoEnabled && len(stereoAudio) > 0 {
  598. audio = stereoAudio
  599. } else {
  600. dual := make([]float32, len(audio)*2)
  601. for i, s := range audio {
  602. dual[i*2] = s
  603. dual[i*2+1] = s
  604. }
  605. audio = dual
  606. }
  607. }
  608. // --- Polyphase resample to exact 48kHz ---
  609. if actualDemodRate != streamAudioRate {
  610. if channels > 1 {
  611. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  612. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  613. sess.stereoResamplerRate = actualDemodRate
  614. }
  615. audio = sess.stereoResampler.Process(audio)
  616. } else {
  617. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  618. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  619. sess.monoResamplerRate = actualDemodRate
  620. }
  621. audio = sess.monoResampler.Process(audio)
  622. }
  623. }
  624. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  625. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  626. tau := sess.deemphasisUs * 1e-6
  627. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  628. if channels > 1 {
  629. nFrames := len(audio) / channels
  630. yL, yR := sess.deemphL, sess.deemphR
  631. for i := 0; i < nFrames; i++ {
  632. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  633. audio[i*2] = float32(yL)
  634. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  635. audio[i*2+1] = float32(yR)
  636. }
  637. sess.deemphL, sess.deemphR = yL, yR
  638. } else {
  639. y := sess.deemphL
  640. for i := range audio {
  641. y = alpha*y + (1-alpha)*float64(audio[i])
  642. audio[i] = float32(y)
  643. }
  644. sess.deemphL = y
  645. }
  646. }
  647. if isWFM {
  648. for i := range audio {
  649. audio[i] *= 0.35
  650. }
  651. }
  652. return audio, streamAudioRate
  653. }
  654. // stereoDecodeStateful: phase-continuous 38kHz oscillator for L-R extraction.
  655. // AQ-4: Uses persistent FIR filter state across frames for click-free stereo.
  656. // Reuses session scratch buffers to minimize allocations.
  657. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  658. if len(mono) == 0 || sampleRate <= 0 {
  659. return nil, false
  660. }
  661. n := len(mono)
  662. // Rebuild rate-dependent stereo filters when sampleRate changes
  663. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  664. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  665. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  666. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  667. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  668. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  669. sess.stereoFilterRate = sampleRate
  670. }
  671. // Reuse scratch for intermediates: need 4*n float32 for bpf, lr, hi, lo
  672. // plus 2*n for output. We'll use scratchAudio for bpf+lr (2*n) and
  673. // allocate hi/lo from the stateful FIR ProcessInto.
  674. scratch := sess.growAudio(n * 4)
  675. bpf := scratch[:n]
  676. lr := scratch[n : 2*n]
  677. hiBuf := scratch[2*n : 3*n]
  678. loBuf := scratch[3*n : 4*n]
  679. lpr := sess.stereoLPF.Process(mono) // allocates — but could use ProcessInto too
  680. sess.stereoBPHi.ProcessInto(mono, hiBuf)
  681. sess.stereoBPLo.ProcessInto(mono, loBuf)
  682. for i := 0; i < n; i++ {
  683. bpf[i] = hiBuf[i] - loBuf[i]
  684. }
  685. phase := sess.stereoPhase
  686. inc := 2 * math.Pi * 38000 / float64(sampleRate)
  687. var pilotPower float64
  688. var totalPower float64
  689. for i := range bpf {
  690. phase += inc
  691. v := bpf[i] * float32(2*math.Cos(phase))
  692. lr[i] = v
  693. pilotPower += math.Abs(float64(bpf[i]))
  694. totalPower += math.Abs(float64(mono[i]))
  695. }
  696. sess.stereoPhase = math.Mod(phase, 2*math.Pi)
  697. lr = sess.stereoLRLPF.Process(lr)
  698. locked := totalPower > 0 && (pilotPower/totalPower) > 0.12
  699. out := make([]float32, n*2)
  700. for i := 0; i < n; i++ {
  701. out[i*2] = 0.5 * (lpr[i] + lr[i])
  702. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  703. }
  704. return out, locked
  705. }
  706. // dspStateSnapshot captures persistent DSP state for segment splits.
  707. type dspStateSnapshot struct {
  708. overlapIQ []complex64
  709. deemphL float64
  710. deemphR float64
  711. stereoPhase float64
  712. monoResampler *dsp.Resampler
  713. monoResamplerRate int
  714. stereoResampler *dsp.StereoResampler
  715. stereoResamplerRate int
  716. stereoLPF *dsp.StatefulFIRReal
  717. stereoFilterRate int
  718. stereoBPHi *dsp.StatefulFIRReal
  719. stereoBPLo *dsp.StatefulFIRReal
  720. stereoLRLPF *dsp.StatefulFIRReal
  721. stereoAALPF *dsp.StatefulFIRReal
  722. preDemodFIR *dsp.StatefulFIRComplex
  723. preDemodDecim int
  724. preDemodRate int
  725. preDemodCutoff float64
  726. }
  727. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  728. return dspStateSnapshot{
  729. overlapIQ: sess.overlapIQ,
  730. deemphL: sess.deemphL,
  731. deemphR: sess.deemphR,
  732. stereoPhase: sess.stereoPhase,
  733. monoResampler: sess.monoResampler,
  734. monoResamplerRate: sess.monoResamplerRate,
  735. stereoResampler: sess.stereoResampler,
  736. stereoResamplerRate: sess.stereoResamplerRate,
  737. stereoLPF: sess.stereoLPF,
  738. stereoFilterRate: sess.stereoFilterRate,
  739. stereoBPHi: sess.stereoBPHi,
  740. stereoBPLo: sess.stereoBPLo,
  741. stereoLRLPF: sess.stereoLRLPF,
  742. stereoAALPF: sess.stereoAALPF,
  743. preDemodFIR: sess.preDemodFIR,
  744. preDemodDecim: sess.preDemodDecim,
  745. preDemodRate: sess.preDemodRate,
  746. preDemodCutoff: sess.preDemodCutoff,
  747. }
  748. }
  749. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  750. sess.overlapIQ = s.overlapIQ
  751. sess.deemphL = s.deemphL
  752. sess.deemphR = s.deemphR
  753. sess.stereoPhase = s.stereoPhase
  754. sess.monoResampler = s.monoResampler
  755. sess.monoResamplerRate = s.monoResamplerRate
  756. sess.stereoResampler = s.stereoResampler
  757. sess.stereoResamplerRate = s.stereoResamplerRate
  758. sess.stereoLPF = s.stereoLPF
  759. sess.stereoFilterRate = s.stereoFilterRate
  760. sess.stereoBPHi = s.stereoBPHi
  761. sess.stereoBPLo = s.stereoBPLo
  762. sess.stereoLRLPF = s.stereoLRLPF
  763. sess.stereoAALPF = s.stereoAALPF
  764. sess.preDemodFIR = s.preDemodFIR
  765. sess.preDemodDecim = s.preDemodDecim
  766. sess.preDemodRate = s.preDemodRate
  767. sess.preDemodCutoff = s.preDemodCutoff
  768. }
  769. // ---------------------------------------------------------------------------
  770. // Session management helpers
  771. // ---------------------------------------------------------------------------
  772. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  773. outputDir := st.policy.OutputDir
  774. if outputDir == "" {
  775. outputDir = "data/recordings"
  776. }
  777. demodName, channels := resolveDemod(sig)
  778. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  779. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  780. dir := filepath.Join(outputDir, dirName)
  781. if err := os.MkdirAll(dir, 0o755); err != nil {
  782. return nil, err
  783. }
  784. wavPath := filepath.Join(dir, "audio.wav")
  785. f, err := os.Create(wavPath)
  786. if err != nil {
  787. return nil, err
  788. }
  789. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  790. f.Close()
  791. return nil, err
  792. }
  793. sess := &streamSession{
  794. signalID: sig.ID,
  795. centerHz: sig.CenterHz,
  796. bwHz: sig.BWHz,
  797. snrDb: sig.SNRDb,
  798. peakDb: sig.PeakDb,
  799. class: sig.Class,
  800. startTime: now,
  801. lastFeed: now,
  802. dir: dir,
  803. wavFile: f,
  804. wavBuf: bufio.NewWriterSize(f, 64*1024),
  805. sampleRate: streamAudioRate,
  806. channels: channels,
  807. demodName: demodName,
  808. deemphasisUs: st.policy.DeemphasisUs,
  809. }
  810. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  811. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  812. return sess, nil
  813. }
  814. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  815. demodName, channels := resolveDemod(sig)
  816. for _, pl := range st.pendingListens {
  817. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  818. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  819. demodName = requested
  820. if demodName == "WFM_STEREO" {
  821. channels = 2
  822. } else if d := demod.Get(demodName); d != nil {
  823. channels = d.Channels()
  824. } else {
  825. channels = 1
  826. }
  827. break
  828. }
  829. }
  830. }
  831. sess := &streamSession{
  832. signalID: sig.ID,
  833. centerHz: sig.CenterHz,
  834. bwHz: sig.BWHz,
  835. snrDb: sig.SNRDb,
  836. peakDb: sig.PeakDb,
  837. class: sig.Class,
  838. startTime: now,
  839. lastFeed: now,
  840. listenOnly: true,
  841. sampleRate: streamAudioRate,
  842. channels: channels,
  843. demodName: demodName,
  844. deemphasisUs: st.policy.DeemphasisUs,
  845. }
  846. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  847. sig.ID, sig.CenterHz/1e6, demodName)
  848. return sess
  849. }
  850. func resolveDemod(sig *detector.Signal) (string, int) {
  851. demodName := "NFM"
  852. if sig.Class != nil {
  853. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  854. demodName = n
  855. }
  856. }
  857. channels := 1
  858. if demodName == "WFM_STEREO" {
  859. channels = 2
  860. } else if d := demod.Get(demodName); d != nil {
  861. channels = d.Channels()
  862. }
  863. return demodName, channels
  864. }
  865. func normalizeRequestedMode(mode string) string {
  866. switch strings.ToUpper(strings.TrimSpace(mode)) {
  867. case "", "AUTO":
  868. return ""
  869. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  870. return strings.ToUpper(strings.TrimSpace(mode))
  871. default:
  872. return ""
  873. }
  874. }
  875. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  876. func (sess *streamSession) growIQ(n int) []complex64 {
  877. if cap(sess.scratchIQ) >= n {
  878. return sess.scratchIQ[:n]
  879. }
  880. sess.scratchIQ = make([]complex64, n, n*5/4)
  881. return sess.scratchIQ
  882. }
  883. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  884. func (sess *streamSession) growAudio(n int) []float32 {
  885. if cap(sess.scratchAudio) >= n {
  886. return sess.scratchAudio[:n]
  887. }
  888. sess.scratchAudio = make([]float32, n, n*5/4)
  889. return sess.scratchAudio
  890. }
  891. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  892. func (sess *streamSession) growPCM(n int) []byte {
  893. if cap(sess.scratchPCM) >= n {
  894. return sess.scratchPCM[:n]
  895. }
  896. sess.scratchPCM = make([]byte, n, n*5/4)
  897. return sess.scratchPCM
  898. }
  899. func convertToListenOnly(sess *streamSession) {
  900. if sess.wavBuf != nil {
  901. _ = sess.wavBuf.Flush()
  902. }
  903. if sess.wavFile != nil {
  904. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  905. sess.wavFile.Close()
  906. }
  907. sess.wavFile = nil
  908. sess.wavBuf = nil
  909. sess.listenOnly = true
  910. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  911. }
  912. func closeSession(sess *streamSession, policy *Policy) {
  913. if sess.listenOnly {
  914. return
  915. }
  916. if sess.wavBuf != nil {
  917. _ = sess.wavBuf.Flush()
  918. }
  919. if sess.wavFile != nil {
  920. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  921. sess.wavFile.Close()
  922. sess.wavFile = nil
  923. sess.wavBuf = nil
  924. }
  925. dur := sess.lastFeed.Sub(sess.startTime)
  926. files := map[string]any{
  927. "audio": "audio.wav",
  928. "audio_sample_rate": sess.sampleRate,
  929. "audio_channels": sess.channels,
  930. "audio_demod": sess.demodName,
  931. "recording_mode": "streaming",
  932. }
  933. meta := Meta{
  934. EventID: sess.signalID,
  935. Start: sess.startTime,
  936. End: sess.lastFeed,
  937. CenterHz: sess.centerHz,
  938. BandwidthHz: sess.bwHz,
  939. SampleRate: sess.sampleRate,
  940. SNRDb: sess.snrDb,
  941. PeakDb: sess.peakDb,
  942. Class: sess.class,
  943. DurationMs: dur.Milliseconds(),
  944. Files: files,
  945. }
  946. b, err := json.MarshalIndent(meta, "", " ")
  947. if err == nil {
  948. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  949. }
  950. if policy != nil {
  951. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  952. }
  953. }
  954. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  955. if len(sess.audioSubs) == 0 {
  956. return
  957. }
  958. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  959. tagged := make([]byte, 1+pcmLen)
  960. tagged[0] = 0x01
  961. copy(tagged[1:], pcm[:pcmLen])
  962. alive := sess.audioSubs[:0]
  963. for _, sub := range sess.audioSubs {
  964. select {
  965. case sub.ch <- tagged:
  966. default:
  967. }
  968. alive = append(alive, sub)
  969. }
  970. sess.audioSubs = alive
  971. }
  972. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  973. if len(st.policy.ClassFilter) == 0 {
  974. return true
  975. }
  976. if cls == nil {
  977. return false
  978. }
  979. for _, f := range st.policy.ClassFilter {
  980. if strings.EqualFold(f, string(cls.ModType)) {
  981. return true
  982. }
  983. }
  984. return false
  985. }
// ErrNoSession is the sentinel error for a frequency that has neither an
// active nor a pending session. Compare against it with errors.Is.
var ErrNoSession = errors.New("no active or pending session for this frequency")
  988. // ---------------------------------------------------------------------------
  989. // WAV header helpers
  990. // ---------------------------------------------------------------------------
  991. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  992. if channels <= 0 {
  993. channels = 1
  994. }
  995. hdr := make([]byte, 44)
  996. copy(hdr[0:4], "RIFF")
  997. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  998. copy(hdr[8:12], "WAVE")
  999. copy(hdr[12:16], "fmt ")
  1000. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1001. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1002. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1003. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1004. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1005. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1006. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1007. copy(hdr[36:40], "data")
  1008. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1009. _, err := f.Write(hdr)
  1010. return err
  1011. }
  1012. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1013. dataSize := uint32(totalSamples * 2)
  1014. var buf [4]byte
  1015. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1016. if _, err := f.Seek(4, 0); err != nil {
  1017. return
  1018. }
  1019. _, _ = f.Write(buf[:])
  1020. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1021. if _, err := f.Seek(24, 0); err != nil {
  1022. return
  1023. }
  1024. _, _ = f.Write(buf[:])
  1025. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1026. if _, err := f.Seek(28, 0); err != nil {
  1027. return
  1028. }
  1029. _, _ = f.Write(buf[:])
  1030. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1031. if _, err := f.Seek(40, 0); err != nil {
  1032. return
  1033. }
  1034. _, _ = f.Write(buf[:])
  1035. }