Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

1434 lines
41KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. lastAudioTs time.Time
  36. lastAudioL float32
  37. lastAudioR float32
  38. lastAudioSet bool
  39. // listenOnly sessions have no WAV file and no disk I/O.
  40. // They exist solely to feed audio to live-listen subscribers.
  41. listenOnly bool
  42. // Recording state (nil/zero for listen-only sessions)
  43. dir string
  44. wavFile *os.File
  45. wavBuf *bufio.Writer
  46. wavSamples int64
  47. segmentIdx int
  48. sampleRate int // actual output audio sample rate (always streamAudioRate)
  49. channels int
  50. demodName string
  51. // --- Persistent DSP state for click-free streaming ---
  52. // Overlap-save: tail of previous extracted IQ snippet.
  53. overlapIQ []complex64
  54. // De-emphasis IIR state (persists across frames)
  55. deemphL float64
  56. deemphR float64
  57. // Stereo lock state for live WFM streaming
  58. stereoEnabled bool
  59. stereoOnCount int
  60. stereoOffCount int
  61. // Pilot-locked stereo PLL state (19kHz pilot)
  62. pilotPhase float64
  63. pilotFreq float64
  64. pilotAlpha float64
  65. pilotBeta float64
  66. pilotErrAvg float64
  67. pilotI float64
  68. pilotQ float64
  69. pilotLPAlpha float64
  70. // Polyphase resampler (replaces integer-decimate hack)
  71. monoResampler *dsp.Resampler
  72. monoResamplerRate int
  73. stereoResampler *dsp.StereoResampler
  74. stereoResamplerRate int
  75. // AQ-4: Stateful FIR filters for click-free stereo decode
  76. stereoFilterRate int
  77. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  78. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  79. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  80. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  81. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  82. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  83. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  84. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  85. // and avoids per-frame FIR recomputation)
  86. preDemodFIR *dsp.StatefulFIRComplex
  87. preDemodDecim int // cached decimation factor
  88. preDemodRate int // cached snipRate this FIR was built for
  89. preDemodCutoff float64 // cached cutoff
  90. // AQ-2: De-emphasis config (µs, 0 = disabled)
  91. deemphasisUs float64
  92. // Scratch buffers — reused across frames to avoid GC pressure.
  93. // Grown as needed, never shrunk.
  94. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  95. scratchAudio []float32 // for stereo decode intermediates
  96. scratchPCM []byte // for PCM encoding
  97. // live-listen subscribers
  98. audioSubs []audioSub
  99. }
  // audioSub is one live-listen subscriber: its subscription ID and the
  // channel on which it receives audio (and tagged info) messages.
  type audioSub struct {
  	id int64
  	ch chan []byte
  }
  // RuntimeSignalInfo is a snapshot of the runtime audio parameters of one
  // streaming session, as returned by Streamer.RuntimeInfoBySignalID.
  type RuntimeSignalInfo struct {
  	DemodName    string
  	PlaybackMode string
  	StereoState  string
  	Channels     int
  	SampleRate   int
  }
  // AudioInfo is the audio format descriptor of a live-listen subscription.
  // It is the first message sent to the WebSocket client.
  type AudioInfo struct {
  	SampleRate   int    `json:"sample_rate"`
  	Channels     int    `json:"channels"`
  	Format       string `json:"format"` // always "s16le"
  	DemodName    string `json:"demod"`
  	PlaybackMode string `json:"playback_mode,omitempty"`
  	StereoState  string `json:"stereo_state,omitempty"`
  }
  // streamAudioRate is the sample rate, in Hz, of all audio leaving the
  // streamer.
  const streamAudioRate = 48000

  // resamplerTaps is the number of taps per polyphase arm used when the
  // resamplers are constructed.
  const resamplerTaps = 32
  125. // ---------------------------------------------------------------------------
  126. // Streamer — manages all active streaming sessions
  127. // ---------------------------------------------------------------------------
  128. type streamFeedItem struct {
  129. signal detector.Signal
  130. snippet []complex64
  131. snipRate int
  132. }
  133. type streamFeedMsg struct {
  134. traceID uint64
  135. items []streamFeedItem
  136. }
  137. type Streamer struct {
  138. mu sync.Mutex
  139. sessions map[int64]*streamSession
  140. policy Policy
  141. centerHz float64
  142. nextSub int64
  143. feedCh chan streamFeedMsg
  144. done chan struct{}
  145. droppedFeed uint64
  146. droppedPCM uint64
  147. lastFeedTS time.Time
  148. lastProcTS time.Time
  149. // pendingListens are subscribers waiting for a matching session.
  150. pendingListens map[int64]*pendingListen
  151. }
  // pendingListen is a live-listen request that has not been attached to a
  // session yet: the requested frequency, bandwidth and mode, plus the
  // channel the subscriber is reading from.
  type pendingListen struct {
  	freq float64
  	bw   float64
  	mode string
  	ch   chan []byte
  }
  158. func newStreamer(policy Policy, centerHz float64) *Streamer {
  159. st := &Streamer{
  160. sessions: make(map[int64]*streamSession),
  161. policy: policy,
  162. centerHz: centerHz,
  163. feedCh: make(chan streamFeedMsg, 2),
  164. done: make(chan struct{}),
  165. pendingListens: make(map[int64]*pendingListen),
  166. }
  167. go st.worker()
  168. return st
  169. }
  170. func (st *Streamer) worker() {
  171. for msg := range st.feedCh {
  172. st.processFeed(msg)
  173. }
  174. close(st.done)
  175. }
  176. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  177. st.mu.Lock()
  178. defer st.mu.Unlock()
  179. wasEnabled := st.policy.Enabled
  180. st.policy = policy
  181. st.centerHz = centerHz
  182. // If recording was just disabled, close recording sessions
  183. // but keep listen-only sessions alive.
  184. if wasEnabled && !policy.Enabled {
  185. for id, sess := range st.sessions {
  186. if sess.listenOnly {
  187. continue
  188. }
  189. if len(sess.audioSubs) > 0 {
  190. // Convert to listen-only: close WAV but keep session
  191. convertToListenOnly(sess)
  192. } else {
  193. closeSession(sess, &st.policy)
  194. delete(st.sessions, id)
  195. }
  196. }
  197. }
  198. }
  199. // HasListeners returns true if any sessions have audio subscribers
  200. // or there are pending listen requests. Used by the DSP loop to
  201. // decide whether to feed snippets even when recording is disabled.
  202. func (st *Streamer) HasListeners() bool {
  203. st.mu.Lock()
  204. defer st.mu.Unlock()
  205. return st.hasListenersLocked()
  206. }
  207. func (st *Streamer) hasListenersLocked() bool {
  208. if len(st.pendingListens) > 0 {
  209. return true
  210. }
  211. for _, sess := range st.sessions {
  212. if len(sess.audioSubs) > 0 {
  213. return true
  214. }
  215. }
  216. return false
  217. }
  218. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  219. // Feeds are accepted if:
  220. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  221. // - Any live-listen subscribers exist (listen-only mode)
  222. //
  223. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  224. // data, so items can be passed directly without another copy.
  225. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  226. st.mu.Lock()
  227. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  228. hasListeners := st.hasListenersLocked()
  229. pending := len(st.pendingListens)
  230. debugLiveAudio := st.policy.DebugLiveAudio
  231. now := time.Now()
  232. if !st.lastFeedTS.IsZero() {
  233. gap := now.Sub(st.lastFeedTS)
  234. if gap > 150*time.Millisecond {
  235. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  236. }
  237. }
  238. st.lastFeedTS = now
  239. st.mu.Unlock()
  240. if debugLiveAudio {
  241. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  242. }
  243. if (!recEnabled && !hasListeners) || len(items) == 0 {
  244. return
  245. }
  246. select {
  247. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}:
  248. default:
  249. st.droppedFeed++
  250. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  251. }
  252. }
  253. // processFeed runs in the worker goroutine.
  254. func (st *Streamer) processFeed(msg streamFeedMsg) {
  255. st.mu.Lock()
  256. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  257. hasListeners := st.hasListenersLocked()
  258. now := time.Now()
  259. if !st.lastProcTS.IsZero() {
  260. gap := now.Sub(st.lastProcTS)
  261. if gap > 150*time.Millisecond {
  262. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  263. }
  264. }
  265. st.lastProcTS = now
  266. defer st.mu.Unlock()
  267. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  268. if !recEnabled && !hasListeners {
  269. return
  270. }
  271. seen := make(map[int64]bool, len(msg.items))
  272. for i := range msg.items {
  273. item := &msg.items[i]
  274. sig := &item.signal
  275. seen[sig.ID] = true
  276. if sig.ID == 0 || sig.Class == nil {
  277. continue
  278. }
  279. if len(item.snippet) == 0 || item.snipRate <= 0 {
  280. continue
  281. }
  282. // Decide whether this signal needs a session
  283. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  284. needsListen := st.signalHasListenerLocked(sig)
  285. className := "<nil>"
  286. demodName := ""
  287. if sig.Class != nil {
  288. className = string(sig.Class.ModType)
  289. demodName, _ = resolveDemod(sig)
  290. }
  291. if st.policy.DebugLiveAudio {
  292. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  293. }
  294. if !needsRecording && !needsListen {
  295. continue
  296. }
  297. sess, exists := st.sessions[sig.ID]
  298. requestedMode := ""
  299. for _, pl := range st.pendingListens {
  300. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  301. if m := normalizeRequestedMode(pl.mode); m != "" {
  302. requestedMode = m
  303. break
  304. }
  305. }
  306. }
  307. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  308. for _, sub := range sess.audioSubs {
  309. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  310. }
  311. delete(st.sessions, sig.ID)
  312. sess = nil
  313. exists = false
  314. }
  315. if !exists {
  316. if needsRecording {
  317. s, err := st.openRecordingSession(sig, now)
  318. if err != nil {
  319. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  320. sig.ID, sig.CenterHz/1e6, err)
  321. continue
  322. }
  323. st.sessions[sig.ID] = s
  324. sess = s
  325. } else {
  326. s := st.openListenSession(sig, now)
  327. st.sessions[sig.ID] = s
  328. sess = s
  329. }
  330. // Attach any pending listeners
  331. st.attachPendingListeners(sess)
  332. }
  333. // Update metadata
  334. sess.lastFeed = now
  335. sess.centerHz = sig.CenterHz
  336. sess.bwHz = sig.BWHz
  337. if sig.SNRDb > sess.snrDb {
  338. sess.snrDb = sig.SNRDb
  339. }
  340. if sig.PeakDb > sess.peakDb {
  341. sess.peakDb = sig.PeakDb
  342. }
  343. if sig.Class != nil {
  344. sess.class = sig.Class
  345. }
  346. // Demod with persistent state
  347. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  348. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  349. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  350. if len(audio) == 0 {
  351. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  352. }
  353. if len(audio) > 0 {
  354. if sess.wavSamples == 0 && audioRate > 0 {
  355. sess.sampleRate = audioRate
  356. }
  357. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  358. pcmLen := len(audio) * 2
  359. pcm := sess.growPCM(pcmLen)
  360. for k, s := range audio {
  361. v := int16(clip(s * 32767))
  362. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  363. }
  364. if !sess.listenOnly && sess.wavBuf != nil {
  365. n, err := sess.wavBuf.Write(pcm)
  366. if err != nil {
  367. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  368. } else {
  369. sess.wavSamples += int64(n / 2)
  370. }
  371. }
  372. // Gap logging for live-audio sessions + boundary delta check
  373. if len(sess.audioSubs) > 0 {
  374. if !sess.lastAudioTs.IsZero() {
  375. gap := time.Since(sess.lastAudioTs)
  376. if gap > 150*time.Millisecond {
  377. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  378. }
  379. }
  380. // boundary delta (compare previous last sample with current first sample)
  381. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  382. if sess.lastAudioSet {
  383. if sess.channels > 1 && len(audio) >= 2 {
  384. dL := float64(audio[0] - sess.lastAudioL)
  385. dR := float64(audio[1] - sess.lastAudioR)
  386. if dL < 0 { dL = -dL }
  387. if dR < 0 { dR = -dR }
  388. if dL > 0.2 || dR > 0.2 {
  389. logging.Warn("boundary", "audio_step", "signal", sess.signalID, "dL", dL, "dR", dR)
  390. }
  391. } else {
  392. d := float64(audio[0] - sess.lastAudioL)
  393. if d < 0 { d = -d }
  394. if d > 0.2 {
  395. logging.Warn("boundary", "audio_step", "signal", sess.signalID, "dL", d)
  396. }
  397. }
  398. }
  399. // store last sample
  400. if sess.channels > 1 {
  401. lastIdx := (len(audio)-2)
  402. if lastIdx < 0 { lastIdx = 0 }
  403. sess.lastAudioL = audio[lastIdx]
  404. sess.lastAudioR = audio[lastIdx+1]
  405. } else {
  406. sess.lastAudioL = audio[len(audio)-1]
  407. sess.lastAudioR = 0
  408. }
  409. sess.lastAudioSet = true
  410. }
  411. sess.lastAudioTs = time.Now()
  412. }
  413. st.fanoutPCM(sess, pcm, pcmLen)
  414. }
  415. // Segment split (recording sessions only)
  416. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  417. segIdx := sess.segmentIdx + 1
  418. oldSubs := sess.audioSubs
  419. oldState := sess.captureDSPState()
  420. sess.audioSubs = nil
  421. closeSession(sess, &st.policy)
  422. s, err := st.openRecordingSession(sig, now)
  423. if err != nil {
  424. delete(st.sessions, sig.ID)
  425. continue
  426. }
  427. s.segmentIdx = segIdx
  428. s.audioSubs = oldSubs
  429. s.restoreDSPState(oldState)
  430. st.sessions[sig.ID] = s
  431. }
  432. }
  433. // Close sessions for disappeared signals (with grace period)
  434. for id, sess := range st.sessions {
  435. if seen[id] {
  436. continue
  437. }
  438. gracePeriod := 3 * time.Second
  439. if sess.listenOnly {
  440. gracePeriod = 5 * time.Second
  441. }
  442. if now.Sub(sess.lastFeed) > gracePeriod {
  443. for _, sub := range sess.audioSubs {
  444. close(sub.ch)
  445. }
  446. sess.audioSubs = nil
  447. if !sess.listenOnly {
  448. closeSession(sess, &st.policy)
  449. }
  450. delete(st.sessions, id)
  451. }
  452. }
  453. }
  454. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  455. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  456. if st.policy.DebugLiveAudio {
  457. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  458. }
  459. return true
  460. }
  461. for subID, pl := range st.pendingListens {
  462. delta := math.Abs(sig.CenterHz - pl.freq)
  463. if delta < 200000 {
  464. if st.policy.DebugLiveAudio {
  465. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  466. }
  467. return true
  468. }
  469. }
  470. return false
  471. }
  472. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  473. for subID, pl := range st.pendingListens {
  474. requestedMode := normalizeRequestedMode(pl.mode)
  475. if requestedMode != "" && sess.demodName != requestedMode {
  476. continue
  477. }
  478. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  479. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  480. delete(st.pendingListens, subID)
  481. // Send updated audio_info now that we know the real session params.
  482. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  483. infoJSON, _ := json.Marshal(sess.audioInfo())
  484. tagged := make([]byte, 1+len(infoJSON))
  485. tagged[0] = 0x00 // tag: audio_info
  486. copy(tagged[1:], infoJSON)
  487. select {
  488. case pl.ch <- tagged:
  489. default:
  490. }
  491. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  492. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  493. }
  494. }
  495. }
  496. // CloseAll finalises all sessions and stops the worker goroutine.
  497. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  498. st.mu.Lock()
  499. defer st.mu.Unlock()
  500. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  501. for _, sess := range st.sessions {
  502. out[sess.signalID] = RuntimeSignalInfo{
  503. DemodName: sess.demodName,
  504. PlaybackMode: sess.playbackMode,
  505. StereoState: sess.stereoState,
  506. Channels: sess.channels,
  507. SampleRate: sess.sampleRate,
  508. }
  509. }
  510. return out
  511. }
  512. func (st *Streamer) CloseAll() {
  513. close(st.feedCh)
  514. <-st.done
  515. st.mu.Lock()
  516. defer st.mu.Unlock()
  517. for id, sess := range st.sessions {
  518. for _, sub := range sess.audioSubs {
  519. close(sub.ch)
  520. }
  521. sess.audioSubs = nil
  522. if !sess.listenOnly {
  523. closeSession(sess, &st.policy)
  524. }
  525. delete(st.sessions, id)
  526. }
  527. for _, pl := range st.pendingListens {
  528. close(pl.ch)
  529. }
  530. st.pendingListens = nil
  531. }
  532. // ActiveSessions returns the number of open streaming sessions.
  533. func (st *Streamer) ActiveSessions() int {
  534. st.mu.Lock()
  535. defer st.mu.Unlock()
  536. return len(st.sessions)
  537. }
  538. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  539. //
  540. // LL-2: Returns AudioInfo with correct channels and sample rate.
  541. // LL-3: Returns error only on hard failures (nil streamer etc).
  542. //
  543. // If a matching session exists, attaches immediately. Otherwise, the
  544. // subscriber is held as "pending" and will be attached when a matching
  545. // signal appears in the next DSP frame.
  546. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  547. ch := make(chan []byte, 64)
  548. st.mu.Lock()
  549. defer st.mu.Unlock()
  550. st.nextSub++
  551. subID := st.nextSub
  552. requestedMode := normalizeRequestedMode(mode)
  553. // Try to find a matching session
  554. var bestSess *streamSession
  555. bestDist := math.MaxFloat64
  556. for _, sess := range st.sessions {
  557. if requestedMode != "" && sess.demodName != requestedMode {
  558. continue
  559. }
  560. d := math.Abs(sess.centerHz - freq)
  561. if d < bestDist {
  562. bestDist = d
  563. bestSess = sess
  564. }
  565. }
  566. if bestSess != nil && bestDist < 200000 {
  567. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  568. info := bestSess.audioInfo()
  569. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  570. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  571. return subID, ch, info, nil
  572. }
  573. // No matching session yet — add as pending listener
  574. st.pendingListens[subID] = &pendingListen{
  575. freq: freq,
  576. bw: bw,
  577. mode: mode,
  578. ch: ch,
  579. }
  580. info := defaultAudioInfoForMode(mode)
  581. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  582. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  583. return subID, ch, info, nil
  584. }
  585. // UnsubscribeAudio removes a live-listen subscriber.
  586. func (st *Streamer) UnsubscribeAudio(subID int64) {
  587. st.mu.Lock()
  588. defer st.mu.Unlock()
  589. if pl, ok := st.pendingListens[subID]; ok {
  590. close(pl.ch)
  591. delete(st.pendingListens, subID)
  592. return
  593. }
  594. for _, sess := range st.sessions {
  595. for i, sub := range sess.audioSubs {
  596. if sub.id == subID {
  597. close(sub.ch)
  598. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  599. return
  600. }
  601. }
  602. }
  603. }
  604. // ---------------------------------------------------------------------------
  605. // Session: stateful extraction + demod
  606. // ---------------------------------------------------------------------------
  607. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  608. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  609. // output with zero transient artifacts.
  610. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  611. if len(snippet) == 0 || snipRate <= 0 {
  612. return nil, 0
  613. }
  614. isWFMStereo := sess.demodName == "WFM_STEREO"
  615. isWFM := sess.demodName == "WFM" || isWFMStereo
  616. demodName := sess.demodName
  617. if isWFMStereo {
  618. demodName = "WFM"
  619. }
  620. d := demod.Get(demodName)
  621. if d == nil {
  622. d = demod.Get("NFM")
  623. }
  624. if d == nil {
  625. return nil, 0
  626. }
  627. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  628. // The FM discriminator needs iq[i-1] to compute the first output.
  629. // All FIR filtering is now stateful, so no additional overlap is needed.
  630. var fullSnip []complex64
  631. trimSamples := 0
  632. _ = trimSamples
  633. if len(sess.overlapIQ) == 1 {
  634. fullSnip = make([]complex64, 1+len(snippet))
  635. fullSnip[0] = sess.overlapIQ[0]
  636. copy(fullSnip[1:], snippet)
  637. trimSamples = 1
  638. logging.Debug("discrim", "overlap_applied", "signal", sess.signalID, "snip", len(snippet))
  639. } else {
  640. fullSnip = snippet
  641. }
  642. // Save last sample for next frame's FM discriminator
  643. if len(snippet) > 0 {
  644. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  645. }
  646. // --- Stateful anti-alias FIR + decimation to demod rate ---
  647. demodRate := d.OutputSampleRate()
  648. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  649. if decim1 < 1 {
  650. decim1 = 1
  651. }
  652. actualDemodRate := snipRate / decim1
  653. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  654. var dec []complex64
  655. if decim1 > 1 {
  656. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  657. // Lazy-init or reinit stateful FIR if parameters changed
  658. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  659. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  660. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  661. sess.preDemodRate = snipRate
  662. sess.preDemodCutoff = cutoff
  663. sess.preDemodDecim = decim1
  664. }
  665. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  666. dec = dsp.Decimate(filtered, decim1)
  667. } else {
  668. dec = fullSnip
  669. }
  670. // --- FM Demod ---
  671. audio := d.Demod(dec, actualDemodRate)
  672. if len(audio) == 0 {
  673. return nil, 0
  674. }
  675. // --- Trim the 1-sample FM discriminator overlap ---
  676. // TEMP: skip audio trim to test if per-block trimming causes ticks
  677. // --- Stateful stereo decode with conservative lock/hysteresis ---
  678. channels := 1
  679. if isWFMStereo {
  680. sess.playbackMode = "WFM_STEREO"
  681. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  682. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  683. if locked {
  684. sess.stereoOnCount++
  685. sess.stereoOffCount = 0
  686. if sess.stereoOnCount >= 4 {
  687. sess.stereoEnabled = true
  688. }
  689. } else {
  690. sess.stereoOnCount = 0
  691. sess.stereoOffCount++
  692. if sess.stereoOffCount >= 10 {
  693. sess.stereoEnabled = false
  694. }
  695. }
  696. prevPlayback := sess.playbackMode
  697. prevStereo := sess.stereoState
  698. if sess.stereoEnabled && len(stereoAudio) > 0 {
  699. sess.stereoState = "locked"
  700. audio = stereoAudio
  701. } else {
  702. sess.stereoState = "mono-fallback"
  703. dual := make([]float32, len(audio)*2)
  704. for i, s := range audio {
  705. dual[i*2] = s
  706. dual[i*2+1] = s
  707. }
  708. audio = dual
  709. }
  710. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  711. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  712. }
  713. }
  714. // --- Polyphase resample to exact 48kHz ---
  715. if actualDemodRate != streamAudioRate {
  716. if channels > 1 {
  717. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  718. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  719. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  720. sess.stereoResamplerRate = actualDemodRate
  721. }
  722. audio = sess.stereoResampler.Process(audio)
  723. } else {
  724. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  725. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  726. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  727. sess.monoResamplerRate = actualDemodRate
  728. }
  729. audio = sess.monoResampler.Process(audio)
  730. }
  731. }
  732. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  733. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  734. tau := sess.deemphasisUs * 1e-6
  735. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  736. if channels > 1 {
  737. nFrames := len(audio) / channels
  738. yL, yR := sess.deemphL, sess.deemphR
  739. for i := 0; i < nFrames; i++ {
  740. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  741. audio[i*2] = float32(yL)
  742. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  743. audio[i*2+1] = float32(yR)
  744. }
  745. sess.deemphL, sess.deemphR = yL, yR
  746. } else {
  747. y := sess.deemphL
  748. for i := range audio {
  749. y = alpha*y + (1-alpha)*float64(audio[i])
  750. audio[i] = float32(y)
  751. }
  752. sess.deemphL = y
  753. }
  754. }
  755. if isWFM {
  756. for i := range audio {
  757. audio[i] *= 0.35
  758. }
  759. }
  760. return audio, streamAudioRate
  761. }
  // pllCoefficients derives the proportional (alpha) and integral (beta)
  // loop gains of a Type-II PLL from a loop bandwidth and damping factor.
  // loopBW is in Hz and sampleRate in samples per second. Both gains are 0
  // when sampleRate or loopBW is not positive.
  func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  	if loopBW <= 0 || sampleRate <= 0 {
  		return 0, 0
  	}
  	// Loop bandwidth normalised to the sample rate.
  	normBW := loopBW / float64(sampleRate)
  	theta := normBW / (damping + 0.25/damping)
  	denom := 1 + 2*damping*theta + theta*theta

  	propGain := (4 * damping * theta) / denom
  	intGain := (4 * theta * theta) / denom
  	return propGain, intGain
  }
  776. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  777. // Uses persistent FIR filter state across frames for click-free stereo.
  778. // Reuses session scratch buffers to minimize allocations.
  779. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  780. if len(mono) == 0 || sampleRate <= 0 {
  781. return nil, false
  782. }
  783. n := len(mono)
  784. // Rebuild rate-dependent stereo filters when sampleRate changes
  785. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  786. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  787. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  788. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  789. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  790. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  791. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  792. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  793. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  794. sess.stereoFilterRate = sampleRate
  795. // Initialize PLL for 19kHz pilot tracking.
  796. sess.pilotPhase = 0
  797. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  798. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  799. sess.pilotErrAvg = 0
  800. sess.pilotI = 0
  801. sess.pilotQ = 0
  802. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  803. }
  804. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  805. scratch := sess.growAudio(n * 5)
  806. lpr := scratch[:n]
  807. bpfLR := scratch[n : 2*n]
  808. lr := scratch[2*n : 3*n]
  809. work1 := scratch[3*n : 4*n]
  810. work2 := scratch[4*n : 5*n]
  811. sess.stereoLPF.ProcessInto(mono, lpr)
  812. // 23-53kHz bandpass for L-R DSB-SC.
  813. sess.stereoBPHi.ProcessInto(mono, work1)
  814. sess.stereoBPLo.ProcessInto(mono, work2)
  815. for i := 0; i < n; i++ {
  816. bpfLR[i] = work1[i] - work2[i]
  817. }
  818. // 19kHz pilot bandpass for PLL.
  819. sess.pilotLPFHi.ProcessInto(mono, work1)
  820. sess.pilotLPFLo.ProcessInto(mono, work2)
  821. for i := 0; i < n; i++ {
  822. work1[i] = work1[i] - work2[i]
  823. }
  824. pilot := work1
  825. phase := sess.pilotPhase
  826. freq := sess.pilotFreq
  827. alpha := sess.pilotAlpha
  828. beta := sess.pilotBeta
  829. iState := sess.pilotI
  830. qState := sess.pilotQ
  831. lpAlpha := sess.pilotLPAlpha
  832. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  833. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  834. var pilotPower float64
  835. var totalPower float64
  836. var errSum float64
  837. for i := 0; i < n; i++ {
  838. p := float64(pilot[i])
  839. sinP, cosP := math.Sincos(phase)
  840. iMix := p * cosP
  841. qMix := p * -sinP
  842. iState += lpAlpha * (iMix - iState)
  843. qState += lpAlpha * (qMix - qState)
  844. err := math.Atan2(qState, iState)
  845. freq += beta * err
  846. if freq < minFreq {
  847. freq = minFreq
  848. } else if freq > maxFreq {
  849. freq = maxFreq
  850. }
  851. phase += freq + alpha*err
  852. if phase > 2*math.Pi {
  853. phase -= 2 * math.Pi
  854. } else if phase < 0 {
  855. phase += 2 * math.Pi
  856. }
  857. totalPower += float64(mono[i]) * float64(mono[i])
  858. pilotPower += p * p
  859. errSum += math.Abs(err)
  860. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  861. }
  862. sess.pilotPhase = phase
  863. sess.pilotFreq = freq
  864. sess.pilotI = iState
  865. sess.pilotQ = qState
  866. blockErr := errSum / float64(n)
  867. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  868. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  869. pilotRatio := 0.0
  870. if totalPower > 0 {
  871. pilotRatio = pilotPower / totalPower
  872. }
  873. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  874. // Lock heuristics: pilot power fraction and PLL phase error stability.
  875. // Pilot power is a small but stable fraction of composite energy; require
  876. // a modest floor plus PLL settling to avoid flapping in noise.
  877. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  878. out := make([]float32, n*2)
  879. for i := 0; i < n; i++ {
  880. out[i*2] = 0.5 * (lpr[i] + lr[i])
  881. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  882. }
  883. return out, locked
  884. }
  885. // dspStateSnapshot captures persistent DSP state for segment splits.
  886. type dspStateSnapshot struct {
  887. overlapIQ []complex64
  888. deemphL float64
  889. deemphR float64
  890. pilotPhase float64
  891. pilotFreq float64
  892. pilotAlpha float64
  893. pilotBeta float64
  894. pilotErrAvg float64
  895. pilotI float64
  896. pilotQ float64
  897. pilotLPAlpha float64
  898. monoResampler *dsp.Resampler
  899. monoResamplerRate int
  900. stereoResampler *dsp.StereoResampler
  901. stereoResamplerRate int
  902. stereoLPF *dsp.StatefulFIRReal
  903. stereoFilterRate int
  904. stereoBPHi *dsp.StatefulFIRReal
  905. stereoBPLo *dsp.StatefulFIRReal
  906. stereoLRLPF *dsp.StatefulFIRReal
  907. stereoAALPF *dsp.StatefulFIRReal
  908. pilotLPFHi *dsp.StatefulFIRReal
  909. pilotLPFLo *dsp.StatefulFIRReal
  910. preDemodFIR *dsp.StatefulFIRComplex
  911. preDemodDecim int
  912. preDemodRate int
  913. preDemodCutoff float64
  914. }
  915. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  916. return dspStateSnapshot{
  917. overlapIQ: sess.overlapIQ,
  918. deemphL: sess.deemphL,
  919. deemphR: sess.deemphR,
  920. pilotPhase: sess.pilotPhase,
  921. pilotFreq: sess.pilotFreq,
  922. pilotAlpha: sess.pilotAlpha,
  923. pilotBeta: sess.pilotBeta,
  924. pilotErrAvg: sess.pilotErrAvg,
  925. pilotI: sess.pilotI,
  926. pilotQ: sess.pilotQ,
  927. pilotLPAlpha: sess.pilotLPAlpha,
  928. monoResampler: sess.monoResampler,
  929. monoResamplerRate: sess.monoResamplerRate,
  930. stereoResampler: sess.stereoResampler,
  931. stereoResamplerRate: sess.stereoResamplerRate,
  932. stereoLPF: sess.stereoLPF,
  933. stereoFilterRate: sess.stereoFilterRate,
  934. stereoBPHi: sess.stereoBPHi,
  935. stereoBPLo: sess.stereoBPLo,
  936. stereoLRLPF: sess.stereoLRLPF,
  937. stereoAALPF: sess.stereoAALPF,
  938. pilotLPFHi: sess.pilotLPFHi,
  939. pilotLPFLo: sess.pilotLPFLo,
  940. preDemodFIR: sess.preDemodFIR,
  941. preDemodDecim: sess.preDemodDecim,
  942. preDemodRate: sess.preDemodRate,
  943. preDemodCutoff: sess.preDemodCutoff,
  944. }
  945. }
  946. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  947. sess.overlapIQ = s.overlapIQ
  948. sess.deemphL = s.deemphL
  949. sess.deemphR = s.deemphR
  950. sess.pilotPhase = s.pilotPhase
  951. sess.pilotFreq = s.pilotFreq
  952. sess.pilotAlpha = s.pilotAlpha
  953. sess.pilotBeta = s.pilotBeta
  954. sess.pilotErrAvg = s.pilotErrAvg
  955. sess.pilotI = s.pilotI
  956. sess.pilotQ = s.pilotQ
  957. sess.pilotLPAlpha = s.pilotLPAlpha
  958. sess.monoResampler = s.monoResampler
  959. sess.monoResamplerRate = s.monoResamplerRate
  960. sess.stereoResampler = s.stereoResampler
  961. sess.stereoResamplerRate = s.stereoResamplerRate
  962. sess.stereoLPF = s.stereoLPF
  963. sess.stereoFilterRate = s.stereoFilterRate
  964. sess.stereoBPHi = s.stereoBPHi
  965. sess.stereoBPLo = s.stereoBPLo
  966. sess.stereoLRLPF = s.stereoLRLPF
  967. sess.stereoAALPF = s.stereoAALPF
  968. sess.pilotLPFHi = s.pilotLPFHi
  969. sess.pilotLPFLo = s.pilotLPFLo
  970. sess.preDemodFIR = s.preDemodFIR
  971. sess.preDemodDecim = s.preDemodDecim
  972. sess.preDemodRate = s.preDemodRate
  973. sess.preDemodCutoff = s.preDemodCutoff
  974. }
  975. // ---------------------------------------------------------------------------
  976. // Session management helpers
  977. // ---------------------------------------------------------------------------
  978. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  979. outputDir := st.policy.OutputDir
  980. if outputDir == "" {
  981. outputDir = "data/recordings"
  982. }
  983. demodName, channels := resolveDemod(sig)
  984. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  985. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  986. dir := filepath.Join(outputDir, dirName)
  987. if err := os.MkdirAll(dir, 0o755); err != nil {
  988. return nil, err
  989. }
  990. wavPath := filepath.Join(dir, "audio.wav")
  991. f, err := os.Create(wavPath)
  992. if err != nil {
  993. return nil, err
  994. }
  995. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  996. f.Close()
  997. return nil, err
  998. }
  999. playbackMode, stereoState := initialPlaybackState(demodName)
  1000. sess := &streamSession{
  1001. signalID: sig.ID,
  1002. centerHz: sig.CenterHz,
  1003. bwHz: sig.BWHz,
  1004. snrDb: sig.SNRDb,
  1005. peakDb: sig.PeakDb,
  1006. class: sig.Class,
  1007. startTime: now,
  1008. lastFeed: now,
  1009. dir: dir,
  1010. wavFile: f,
  1011. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1012. sampleRate: streamAudioRate,
  1013. channels: channels,
  1014. demodName: demodName,
  1015. playbackMode: playbackMode,
  1016. stereoState: stereoState,
  1017. deemphasisUs: st.policy.DeemphasisUs,
  1018. }
  1019. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1020. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1021. return sess, nil
  1022. }
  1023. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1024. demodName, channels := resolveDemod(sig)
  1025. for _, pl := range st.pendingListens {
  1026. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1027. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1028. demodName = requested
  1029. if demodName == "WFM_STEREO" {
  1030. channels = 2
  1031. } else if d := demod.Get(demodName); d != nil {
  1032. channels = d.Channels()
  1033. } else {
  1034. channels = 1
  1035. }
  1036. break
  1037. }
  1038. }
  1039. }
  1040. playbackMode, stereoState := initialPlaybackState(demodName)
  1041. sess := &streamSession{
  1042. signalID: sig.ID,
  1043. centerHz: sig.CenterHz,
  1044. bwHz: sig.BWHz,
  1045. snrDb: sig.SNRDb,
  1046. peakDb: sig.PeakDb,
  1047. class: sig.Class,
  1048. startTime: now,
  1049. lastFeed: now,
  1050. listenOnly: true,
  1051. sampleRate: streamAudioRate,
  1052. channels: channels,
  1053. demodName: demodName,
  1054. playbackMode: playbackMode,
  1055. stereoState: stereoState,
  1056. deemphasisUs: st.policy.DeemphasisUs,
  1057. }
  1058. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1059. sig.ID, sig.CenterHz/1e6, demodName)
  1060. return sess
  1061. }
  1062. func resolveDemod(sig *detector.Signal) (string, int) {
  1063. demodName := "NFM"
  1064. if sig.Class != nil {
  1065. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1066. demodName = n
  1067. }
  1068. }
  1069. channels := 1
  1070. if demodName == "WFM_STEREO" {
  1071. channels = 2
  1072. } else if d := demod.Get(demodName); d != nil {
  1073. channels = d.Channels()
  1074. }
  1075. return demodName, channels
  1076. }
  1077. func initialPlaybackState(demodName string) (string, string) {
  1078. playbackMode := demodName
  1079. stereoState := "mono"
  1080. if demodName == "WFM_STEREO" {
  1081. stereoState = "searching"
  1082. }
  1083. return playbackMode, stereoState
  1084. }
  1085. func (sess *streamSession) audioInfo() AudioInfo {
  1086. return AudioInfo{
  1087. SampleRate: sess.sampleRate,
  1088. Channels: sess.channels,
  1089. Format: "s16le",
  1090. DemodName: sess.demodName,
  1091. PlaybackMode: sess.playbackMode,
  1092. StereoState: sess.stereoState,
  1093. }
  1094. }
  1095. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1096. infoJSON, _ := json.Marshal(info)
  1097. tagged := make([]byte, 1+len(infoJSON))
  1098. tagged[0] = 0x00 // tag: audio_info
  1099. copy(tagged[1:], infoJSON)
  1100. for _, sub := range subs {
  1101. select {
  1102. case sub.ch <- tagged:
  1103. default:
  1104. }
  1105. }
  1106. }
  1107. func defaultAudioInfoForMode(mode string) AudioInfo {
  1108. demodName := "NFM"
  1109. if requested := normalizeRequestedMode(mode); requested != "" {
  1110. demodName = requested
  1111. }
  1112. channels := 1
  1113. if demodName == "WFM_STEREO" {
  1114. channels = 2
  1115. } else if d := demod.Get(demodName); d != nil {
  1116. channels = d.Channels()
  1117. }
  1118. playbackMode, stereoState := initialPlaybackState(demodName)
  1119. return AudioInfo{
  1120. SampleRate: streamAudioRate,
  1121. Channels: channels,
  1122. Format: "s16le",
  1123. DemodName: demodName,
  1124. PlaybackMode: playbackMode,
  1125. StereoState: stereoState,
  1126. }
  1127. }
  1128. func normalizeRequestedMode(mode string) string {
  1129. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1130. case "", "AUTO":
  1131. return ""
  1132. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1133. return strings.ToUpper(strings.TrimSpace(mode))
  1134. default:
  1135. return ""
  1136. }
  1137. }
  1138. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1139. func (sess *streamSession) growIQ(n int) []complex64 {
  1140. if cap(sess.scratchIQ) >= n {
  1141. return sess.scratchIQ[:n]
  1142. }
  1143. sess.scratchIQ = make([]complex64, n, n*5/4)
  1144. return sess.scratchIQ
  1145. }
  1146. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1147. func (sess *streamSession) growAudio(n int) []float32 {
  1148. if cap(sess.scratchAudio) >= n {
  1149. return sess.scratchAudio[:n]
  1150. }
  1151. sess.scratchAudio = make([]float32, n, n*5/4)
  1152. return sess.scratchAudio
  1153. }
  1154. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1155. func (sess *streamSession) growPCM(n int) []byte {
  1156. if cap(sess.scratchPCM) >= n {
  1157. return sess.scratchPCM[:n]
  1158. }
  1159. sess.scratchPCM = make([]byte, n, n*5/4)
  1160. return sess.scratchPCM
  1161. }
  1162. func convertToListenOnly(sess *streamSession) {
  1163. if sess.wavBuf != nil {
  1164. _ = sess.wavBuf.Flush()
  1165. }
  1166. if sess.wavFile != nil {
  1167. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1168. sess.wavFile.Close()
  1169. }
  1170. sess.wavFile = nil
  1171. sess.wavBuf = nil
  1172. sess.listenOnly = true
  1173. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1174. }
  1175. func closeSession(sess *streamSession, policy *Policy) {
  1176. if sess.listenOnly {
  1177. return
  1178. }
  1179. if sess.wavBuf != nil {
  1180. _ = sess.wavBuf.Flush()
  1181. }
  1182. if sess.wavFile != nil {
  1183. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1184. sess.wavFile.Close()
  1185. sess.wavFile = nil
  1186. sess.wavBuf = nil
  1187. }
  1188. dur := sess.lastFeed.Sub(sess.startTime)
  1189. files := map[string]any{
  1190. "audio": "audio.wav",
  1191. "audio_sample_rate": sess.sampleRate,
  1192. "audio_channels": sess.channels,
  1193. "audio_demod": sess.demodName,
  1194. "recording_mode": "streaming",
  1195. }
  1196. meta := Meta{
  1197. EventID: sess.signalID,
  1198. Start: sess.startTime,
  1199. End: sess.lastFeed,
  1200. CenterHz: sess.centerHz,
  1201. BandwidthHz: sess.bwHz,
  1202. SampleRate: sess.sampleRate,
  1203. SNRDb: sess.snrDb,
  1204. PeakDb: sess.peakDb,
  1205. Class: sess.class,
  1206. DurationMs: dur.Milliseconds(),
  1207. Files: files,
  1208. }
  1209. b, err := json.MarshalIndent(meta, "", " ")
  1210. if err == nil {
  1211. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1212. }
  1213. if policy != nil {
  1214. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1215. }
  1216. }
  1217. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1218. if len(sess.audioSubs) == 0 {
  1219. return
  1220. }
  1221. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1222. tagged := make([]byte, 1+pcmLen)
  1223. tagged[0] = 0x01
  1224. copy(tagged[1:], pcm[:pcmLen])
  1225. alive := sess.audioSubs[:0]
  1226. for _, sub := range sess.audioSubs {
  1227. select {
  1228. case sub.ch <- tagged:
  1229. default:
  1230. st.droppedPCM++
  1231. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1232. }
  1233. alive = append(alive, sub)
  1234. }
  1235. sess.audioSubs = alive
  1236. }
  1237. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1238. if len(st.policy.ClassFilter) == 0 {
  1239. return true
  1240. }
  1241. if cls == nil {
  1242. return false
  1243. }
  1244. for _, f := range st.policy.ClassFilter {
  1245. if strings.EqualFold(f, string(cls.ModType)) {
  1246. return true
  1247. }
  1248. }
  1249. return false
  1250. }
  1251. // ErrNoSession is returned when no matching signal session exists.
  1252. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1253. // ---------------------------------------------------------------------------
  1254. // WAV header helpers
  1255. // ---------------------------------------------------------------------------
  1256. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1257. if channels <= 0 {
  1258. channels = 1
  1259. }
  1260. hdr := make([]byte, 44)
  1261. copy(hdr[0:4], "RIFF")
  1262. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1263. copy(hdr[8:12], "WAVE")
  1264. copy(hdr[12:16], "fmt ")
  1265. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1266. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1267. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1268. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1269. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1270. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1271. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1272. copy(hdr[36:40], "data")
  1273. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1274. _, err := f.Write(hdr)
  1275. return err
  1276. }
  1277. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1278. dataSize := uint32(totalSamples * 2)
  1279. var buf [4]byte
  1280. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1281. if _, err := f.Seek(4, 0); err != nil {
  1282. return
  1283. }
  1284. _, _ = f.Write(buf[:])
  1285. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1286. if _, err := f.Seek(24, 0); err != nil {
  1287. return
  1288. }
  1289. _, _ = f.Write(buf[:])
  1290. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1291. if _, err := f.Seek(28, 0); err != nil {
  1292. return
  1293. }
  1294. _, _ = f.Write(buf[:])
  1295. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1296. if _, err := f.Seek(40, 0); err != nil {
  1297. return
  1298. }
  1299. _, _ = f.Write(buf[:])
  1300. }