Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

1439 wiersze
41KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  // streamSession is the per-signal state of one open demod session. It bundles
  // the signal metadata, the optional WAV recording state, the DSP state that
  // must persist between frames, reusable scratch buffers and the list of
  // live-listen subscribers.
  type streamSession struct {
  	signalID int64
  	centerHz float64
  	bwHz float64
  	snrDb float64
  	peakDb float64
  	class *classifier.Classification
  	startTime time.Time
  	lastFeed time.Time
  	playbackMode string
  	stereoState string
  	// Timestamp and last sample(s) of the most recent audio block; used for
  	// gap logging and the block-boundary step check.
  	lastAudioTs time.Time
  	lastAudioL float32
  	lastAudioR float32
  	lastAudioSet bool
  	// listenOnly sessions have no WAV file and no disk I/O.
  	// They exist solely to feed audio to live-listen subscribers.
  	listenOnly bool
  	// Recording state (nil/zero for listen-only sessions)
  	dir string
  	wavFile *os.File
  	wavBuf *bufio.Writer
  	wavSamples int64
  	segmentIdx int
  	sampleRate int // actual output audio sample rate (always streamAudioRate)
  	channels int
  	demodName string
  	// --- Persistent DSP state for click-free streaming ---
  	// Overlap-save: tail of previous extracted IQ snippet.
  	overlapIQ []complex64
  	// De-emphasis IIR state (persists across frames)
  	deemphL float64
  	deemphR float64
  	// Stereo lock state for live WFM streaming
  	stereoEnabled bool
  	stereoOnCount int
  	stereoOffCount int
  	// Pilot-locked stereo PLL state (19kHz pilot)
  	pilotPhase float64
  	pilotFreq float64
  	pilotAlpha float64
  	pilotBeta float64
  	pilotErrAvg float64
  	pilotI float64
  	pilotQ float64
  	pilotLPAlpha float64
  	// Polyphase resampler (replaces integer-decimate hack)
  	monoResampler *dsp.Resampler
  	monoResamplerRate int
  	stereoResampler *dsp.StereoResampler
  	stereoResamplerRate int
  	// AQ-4: Stateful FIR filters for click-free stereo decode
  	stereoFilterRate int
  	stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  	stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  	stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  	stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  	stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  	pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  	pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  	// Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  	// and avoids per-frame FIR recomputation)
  	preDemodFIR *dsp.StatefulFIRComplex
  	preDemodDecim int // cached decimation factor
  	preDemodRate int // cached snipRate this FIR was built for
  	preDemodCutoff float64 // cached cutoff
  	preDemodDecimPhase int // stateful decimation phase (index offset into next frame)
  	// AQ-2: De-emphasis config (µs, 0 = disabled)
  	deemphasisUs float64
  	// Scratch buffers — reused across frames to avoid GC pressure.
  	// Grown as needed, never shrunk.
  	scratchIQ []complex64 // for pre-demod FIR output + decimate input
  	scratchAudio []float32 // for stereo decode intermediates
  	scratchPCM []byte // for PCM encoding
  	// live-listen subscribers
  	audioSubs []audioSub
  }
  // audioSub is one live-listen subscriber attached to a session.
  type audioSub struct {
  	id int64 // subscriber ID handed out by Streamer.SubscribeAudio
  	ch chan []byte // delivery channel for this subscriber (PCM blocks and tagged audio_info messages)
  }
  // RuntimeSignalInfo is a snapshot of the playback parameters of one active
  // session, as reported by Streamer.RuntimeInfoBySignalID.
  type RuntimeSignalInfo struct {
  	// Demodulator name, playback mode and stereo state of the session.
  	DemodName, PlaybackMode, StereoState string
  	// Channel count and sample rate of the session's audio output.
  	Channels, SampleRate int
  }
  // AudioInfo describes the audio format of a live-listen subscription.
  // Sent to the WebSocket client as the first message.
  //
  // The JSON field names below are the wire format; PlaybackMode and
  // StereoState are omitted from the encoding when empty.
  type AudioInfo struct {
  	SampleRate int `json:"sample_rate"`
  	Channels int `json:"channels"`
  	Format string `json:"format"` // always "s16le"
  	DemodName string `json:"demod"`
  	PlaybackMode string `json:"playback_mode,omitempty"`
  	StereoState string `json:"stereo_state,omitempty"`
  }
  // streamAudioRate is the sample rate (Hz) of the audio every session emits.
  const streamAudioRate = 48000

  // resamplerTaps is the number of taps per polyphase arm — good quality.
  const resamplerTaps = 32
  126. // ---------------------------------------------------------------------------
  127. // Streamer — manages all active streaming sessions
  128. // ---------------------------------------------------------------------------
  // streamFeedItem pairs one detected signal with the IQ snippet extracted
  // for it and the sample rate of that snippet.
  type streamFeedItem struct {
  	signal detector.Signal
  	snippet []complex64
  	snipRate int // sample rate of snippet; items with snipRate <= 0 are skipped by processFeed
  }
  // streamFeedMsg is one batch of feed items queued on Streamer.feedCh for
  // the worker goroutine.
  type streamFeedMsg struct {
  	traceID uint64 // carried into the trace/gap log lines for this batch
  	items []streamFeedItem
  }
  // Streamer manages all active streaming sessions. Feeds are queued on
  // feedCh and processed by a single worker goroutine; mu guards the session
  // map, the policy and the pending-listener map.
  type Streamer struct {
  	mu sync.Mutex
  	sessions map[int64]*streamSession // keyed by signal ID
  	policy Policy
  	centerHz float64
  	nextSub int64 // last subscriber ID handed out
  	feedCh chan streamFeedMsg // feed queue consumed by worker
  	done chan struct{} // closed by worker once feedCh has been drained
  	droppedFeed uint64 // number of feed batches dropped because feedCh was full
  	droppedPCM uint64
  	lastFeedTS time.Time // time of the previous FeedSnippets call (gap logging)
  	lastProcTS time.Time // time of the previous processFeed call (gap logging)
  	// pendingListens are subscribers waiting for a matching session.
  	pendingListens map[int64]*pendingListen
  }
  // pendingListen is a live-listen request that has not been attached to a
  // session yet: the requested frequency, bandwidth and mode plus the
  // subscriber's delivery channel.
  type pendingListen struct {
  	freq, bw float64
  	mode     string
  	ch       chan []byte
  }
  159. func newStreamer(policy Policy, centerHz float64) *Streamer {
  160. st := &Streamer{
  161. sessions: make(map[int64]*streamSession),
  162. policy: policy,
  163. centerHz: centerHz,
  164. feedCh: make(chan streamFeedMsg, 2),
  165. done: make(chan struct{}),
  166. pendingListens: make(map[int64]*pendingListen),
  167. }
  168. go st.worker()
  169. return st
  170. }
  171. func (st *Streamer) worker() {
  172. for msg := range st.feedCh {
  173. st.processFeed(msg)
  174. }
  175. close(st.done)
  176. }
  177. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  178. st.mu.Lock()
  179. defer st.mu.Unlock()
  180. wasEnabled := st.policy.Enabled
  181. st.policy = policy
  182. st.centerHz = centerHz
  183. // If recording was just disabled, close recording sessions
  184. // but keep listen-only sessions alive.
  185. if wasEnabled && !policy.Enabled {
  186. for id, sess := range st.sessions {
  187. if sess.listenOnly {
  188. continue
  189. }
  190. if len(sess.audioSubs) > 0 {
  191. // Convert to listen-only: close WAV but keep session
  192. convertToListenOnly(sess)
  193. } else {
  194. closeSession(sess, &st.policy)
  195. delete(st.sessions, id)
  196. }
  197. }
  198. }
  199. }
  200. // HasListeners returns true if any sessions have audio subscribers
  201. // or there are pending listen requests. Used by the DSP loop to
  202. // decide whether to feed snippets even when recording is disabled.
  203. func (st *Streamer) HasListeners() bool {
  204. st.mu.Lock()
  205. defer st.mu.Unlock()
  206. return st.hasListenersLocked()
  207. }
  208. func (st *Streamer) hasListenersLocked() bool {
  209. if len(st.pendingListens) > 0 {
  210. return true
  211. }
  212. for _, sess := range st.sessions {
  213. if len(sess.audioSubs) > 0 {
  214. return true
  215. }
  216. }
  217. return false
  218. }
  219. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  220. // Feeds are accepted if:
  221. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  222. // - Any live-listen subscribers exist (listen-only mode)
  223. //
  224. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  225. // data, so items can be passed directly without another copy.
  226. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  227. st.mu.Lock()
  228. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  229. hasListeners := st.hasListenersLocked()
  230. pending := len(st.pendingListens)
  231. debugLiveAudio := st.policy.DebugLiveAudio
  232. now := time.Now()
  233. if !st.lastFeedTS.IsZero() {
  234. gap := now.Sub(st.lastFeedTS)
  235. if gap > 150*time.Millisecond {
  236. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  237. }
  238. }
  239. st.lastFeedTS = now
  240. st.mu.Unlock()
  241. if debugLiveAudio {
  242. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  243. }
  244. if (!recEnabled && !hasListeners) || len(items) == 0 {
  245. return
  246. }
  247. select {
  248. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}:
  249. default:
  250. st.droppedFeed++
  251. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  252. }
  253. }
  254. // processFeed runs in the worker goroutine.
  255. func (st *Streamer) processFeed(msg streamFeedMsg) {
  256. st.mu.Lock()
  257. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  258. hasListeners := st.hasListenersLocked()
  259. now := time.Now()
  260. if !st.lastProcTS.IsZero() {
  261. gap := now.Sub(st.lastProcTS)
  262. if gap > 150*time.Millisecond {
  263. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  264. }
  265. }
  266. st.lastProcTS = now
  267. defer st.mu.Unlock()
  268. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  269. if !recEnabled && !hasListeners {
  270. return
  271. }
  272. seen := make(map[int64]bool, len(msg.items))
  273. for i := range msg.items {
  274. item := &msg.items[i]
  275. sig := &item.signal
  276. seen[sig.ID] = true
  277. if sig.ID == 0 || sig.Class == nil {
  278. continue
  279. }
  280. if len(item.snippet) == 0 || item.snipRate <= 0 {
  281. continue
  282. }
  283. // Decide whether this signal needs a session
  284. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  285. needsListen := st.signalHasListenerLocked(sig)
  286. className := "<nil>"
  287. demodName := ""
  288. if sig.Class != nil {
  289. className = string(sig.Class.ModType)
  290. demodName, _ = resolveDemod(sig)
  291. }
  292. if st.policy.DebugLiveAudio {
  293. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  294. }
  295. if !needsRecording && !needsListen {
  296. continue
  297. }
  298. sess, exists := st.sessions[sig.ID]
  299. requestedMode := ""
  300. for _, pl := range st.pendingListens {
  301. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  302. if m := normalizeRequestedMode(pl.mode); m != "" {
  303. requestedMode = m
  304. break
  305. }
  306. }
  307. }
  308. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  309. for _, sub := range sess.audioSubs {
  310. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  311. }
  312. delete(st.sessions, sig.ID)
  313. sess = nil
  314. exists = false
  315. }
  316. if !exists {
  317. if needsRecording {
  318. s, err := st.openRecordingSession(sig, now)
  319. if err != nil {
  320. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  321. sig.ID, sig.CenterHz/1e6, err)
  322. continue
  323. }
  324. st.sessions[sig.ID] = s
  325. sess = s
  326. } else {
  327. s := st.openListenSession(sig, now)
  328. st.sessions[sig.ID] = s
  329. sess = s
  330. }
  331. // Attach any pending listeners
  332. st.attachPendingListeners(sess)
  333. }
  334. // Update metadata
  335. sess.lastFeed = now
  336. sess.centerHz = sig.CenterHz
  337. sess.bwHz = sig.BWHz
  338. if sig.SNRDb > sess.snrDb {
  339. sess.snrDb = sig.SNRDb
  340. }
  341. if sig.PeakDb > sess.peakDb {
  342. sess.peakDb = sig.PeakDb
  343. }
  344. if sig.Class != nil {
  345. sess.class = sig.Class
  346. }
  347. // Demod with persistent state
  348. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  349. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  350. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  351. if len(audio) == 0 {
  352. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  353. }
  354. if len(audio) > 0 {
  355. if sess.wavSamples == 0 && audioRate > 0 {
  356. sess.sampleRate = audioRate
  357. }
  358. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  359. pcmLen := len(audio) * 2
  360. pcm := sess.growPCM(pcmLen)
  361. for k, s := range audio {
  362. v := int16(clip(s * 32767))
  363. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  364. }
  365. if !sess.listenOnly && sess.wavBuf != nil {
  366. n, err := sess.wavBuf.Write(pcm)
  367. if err != nil {
  368. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  369. } else {
  370. sess.wavSamples += int64(n / 2)
  371. }
  372. }
  373. // Gap logging for live-audio sessions + boundary delta check
  374. if len(sess.audioSubs) > 0 {
  375. if !sess.lastAudioTs.IsZero() {
  376. gap := time.Since(sess.lastAudioTs)
  377. if gap > 150*time.Millisecond {
  378. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  379. }
  380. }
  381. // boundary delta (compare previous last sample with current first sample)
  382. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  383. if sess.lastAudioSet {
  384. if sess.channels > 1 && len(audio) >= 2 {
  385. dL := float64(audio[0] - sess.lastAudioL)
  386. dR := float64(audio[1] - sess.lastAudioR)
  387. if dL < 0 { dL = -dL }
  388. if dR < 0 { dR = -dR }
  389. if dL > 0.2 || dR > 0.2 {
  390. logging.Warn("boundary", "audio_step", "signal", sess.signalID, "dL", dL, "dR", dR)
  391. }
  392. } else {
  393. d := float64(audio[0] - sess.lastAudioL)
  394. if d < 0 { d = -d }
  395. if d > 0.2 {
  396. logging.Warn("boundary", "audio_step", "signal", sess.signalID, "dL", d)
  397. }
  398. }
  399. }
  400. // store last sample
  401. if sess.channels > 1 {
  402. lastIdx := (len(audio)-2)
  403. if lastIdx < 0 { lastIdx = 0 }
  404. sess.lastAudioL = audio[lastIdx]
  405. sess.lastAudioR = audio[lastIdx+1]
  406. } else {
  407. sess.lastAudioL = audio[len(audio)-1]
  408. sess.lastAudioR = 0
  409. }
  410. sess.lastAudioSet = true
  411. }
  412. sess.lastAudioTs = time.Now()
  413. }
  414. st.fanoutPCM(sess, pcm, pcmLen)
  415. }
  416. // Segment split (recording sessions only)
  417. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  418. segIdx := sess.segmentIdx + 1
  419. oldSubs := sess.audioSubs
  420. oldState := sess.captureDSPState()
  421. sess.audioSubs = nil
  422. closeSession(sess, &st.policy)
  423. s, err := st.openRecordingSession(sig, now)
  424. if err != nil {
  425. delete(st.sessions, sig.ID)
  426. continue
  427. }
  428. s.segmentIdx = segIdx
  429. s.audioSubs = oldSubs
  430. s.restoreDSPState(oldState)
  431. st.sessions[sig.ID] = s
  432. }
  433. }
  434. // Close sessions for disappeared signals (with grace period)
  435. for id, sess := range st.sessions {
  436. if seen[id] {
  437. continue
  438. }
  439. gracePeriod := 3 * time.Second
  440. if sess.listenOnly {
  441. gracePeriod = 5 * time.Second
  442. }
  443. if now.Sub(sess.lastFeed) > gracePeriod {
  444. for _, sub := range sess.audioSubs {
  445. close(sub.ch)
  446. }
  447. sess.audioSubs = nil
  448. if !sess.listenOnly {
  449. closeSession(sess, &st.policy)
  450. }
  451. delete(st.sessions, id)
  452. }
  453. }
  454. }
  455. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  456. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  457. if st.policy.DebugLiveAudio {
  458. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  459. }
  460. return true
  461. }
  462. for subID, pl := range st.pendingListens {
  463. delta := math.Abs(sig.CenterHz - pl.freq)
  464. if delta < 200000 {
  465. if st.policy.DebugLiveAudio {
  466. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  467. }
  468. return true
  469. }
  470. }
  471. return false
  472. }
  473. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  474. for subID, pl := range st.pendingListens {
  475. requestedMode := normalizeRequestedMode(pl.mode)
  476. if requestedMode != "" && sess.demodName != requestedMode {
  477. continue
  478. }
  479. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  480. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  481. delete(st.pendingListens, subID)
  482. // Send updated audio_info now that we know the real session params.
  483. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  484. infoJSON, _ := json.Marshal(sess.audioInfo())
  485. tagged := make([]byte, 1+len(infoJSON))
  486. tagged[0] = 0x00 // tag: audio_info
  487. copy(tagged[1:], infoJSON)
  488. select {
  489. case pl.ch <- tagged:
  490. default:
  491. }
  492. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  493. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  494. }
  495. }
  496. }
  497. // CloseAll finalises all sessions and stops the worker goroutine.
  498. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  499. st.mu.Lock()
  500. defer st.mu.Unlock()
  501. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  502. for _, sess := range st.sessions {
  503. out[sess.signalID] = RuntimeSignalInfo{
  504. DemodName: sess.demodName,
  505. PlaybackMode: sess.playbackMode,
  506. StereoState: sess.stereoState,
  507. Channels: sess.channels,
  508. SampleRate: sess.sampleRate,
  509. }
  510. }
  511. return out
  512. }
  513. func (st *Streamer) CloseAll() {
  514. close(st.feedCh)
  515. <-st.done
  516. st.mu.Lock()
  517. defer st.mu.Unlock()
  518. for id, sess := range st.sessions {
  519. for _, sub := range sess.audioSubs {
  520. close(sub.ch)
  521. }
  522. sess.audioSubs = nil
  523. if !sess.listenOnly {
  524. closeSession(sess, &st.policy)
  525. }
  526. delete(st.sessions, id)
  527. }
  528. for _, pl := range st.pendingListens {
  529. close(pl.ch)
  530. }
  531. st.pendingListens = nil
  532. }
  533. // ActiveSessions returns the number of open streaming sessions.
  534. func (st *Streamer) ActiveSessions() int {
  535. st.mu.Lock()
  536. defer st.mu.Unlock()
  537. return len(st.sessions)
  538. }
  539. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  540. //
  541. // LL-2: Returns AudioInfo with correct channels and sample rate.
  542. // LL-3: Returns error only on hard failures (nil streamer etc).
  543. //
  544. // If a matching session exists, attaches immediately. Otherwise, the
  545. // subscriber is held as "pending" and will be attached when a matching
  546. // signal appears in the next DSP frame.
  547. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  548. ch := make(chan []byte, 64)
  549. st.mu.Lock()
  550. defer st.mu.Unlock()
  551. st.nextSub++
  552. subID := st.nextSub
  553. requestedMode := normalizeRequestedMode(mode)
  554. // Try to find a matching session
  555. var bestSess *streamSession
  556. bestDist := math.MaxFloat64
  557. for _, sess := range st.sessions {
  558. if requestedMode != "" && sess.demodName != requestedMode {
  559. continue
  560. }
  561. d := math.Abs(sess.centerHz - freq)
  562. if d < bestDist {
  563. bestDist = d
  564. bestSess = sess
  565. }
  566. }
  567. if bestSess != nil && bestDist < 200000 {
  568. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  569. info := bestSess.audioInfo()
  570. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  571. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  572. return subID, ch, info, nil
  573. }
  574. // No matching session yet — add as pending listener
  575. st.pendingListens[subID] = &pendingListen{
  576. freq: freq,
  577. bw: bw,
  578. mode: mode,
  579. ch: ch,
  580. }
  581. info := defaultAudioInfoForMode(mode)
  582. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  583. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  584. return subID, ch, info, nil
  585. }
  586. // UnsubscribeAudio removes a live-listen subscriber.
  587. func (st *Streamer) UnsubscribeAudio(subID int64) {
  588. st.mu.Lock()
  589. defer st.mu.Unlock()
  590. if pl, ok := st.pendingListens[subID]; ok {
  591. close(pl.ch)
  592. delete(st.pendingListens, subID)
  593. return
  594. }
  595. for _, sess := range st.sessions {
  596. for i, sub := range sess.audioSubs {
  597. if sub.id == subID {
  598. close(sub.ch)
  599. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  600. return
  601. }
  602. }
  603. }
  604. }
  605. // ---------------------------------------------------------------------------
  606. // Session: stateful extraction + demod
  607. // ---------------------------------------------------------------------------
  608. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  609. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  610. // output with zero transient artifacts.
  611. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  612. if len(snippet) == 0 || snipRate <= 0 {
  613. return nil, 0
  614. }
  615. isWFMStereo := sess.demodName == "WFM_STEREO"
  616. isWFM := sess.demodName == "WFM" || isWFMStereo
  617. demodName := sess.demodName
  618. if isWFMStereo {
  619. demodName = "WFM"
  620. }
  621. d := demod.Get(demodName)
  622. if d == nil {
  623. d = demod.Get("NFM")
  624. }
  625. if d == nil {
  626. return nil, 0
  627. }
  628. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  629. // The FM discriminator needs iq[i-1] to compute the first output.
  630. // All FIR filtering is now stateful, so no additional overlap is needed.
  631. var fullSnip []complex64
  632. trimSamples := 0
  633. _ = trimSamples
  634. if len(sess.overlapIQ) == 1 {
  635. fullSnip = make([]complex64, 1+len(snippet))
  636. fullSnip[0] = sess.overlapIQ[0]
  637. copy(fullSnip[1:], snippet)
  638. trimSamples = 1
  639. logging.Debug("discrim", "overlap_applied", "signal", sess.signalID, "snip", len(snippet))
  640. } else {
  641. fullSnip = snippet
  642. }
  643. // Save last sample for next frame's FM discriminator
  644. if len(snippet) > 0 {
  645. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  646. }
  647. // --- Stateful anti-alias FIR + decimation to demod rate ---
  648. demodRate := d.OutputSampleRate()
  649. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  650. if decim1 < 1 {
  651. decim1 = 1
  652. }
  653. actualDemodRate := snipRate / decim1
  654. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  655. var dec []complex64
  656. if decim1 > 1 {
  657. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  658. // Lazy-init or reinit stateful FIR if parameters changed
  659. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  660. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  661. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  662. sess.preDemodRate = snipRate
  663. sess.preDemodCutoff = cutoff
  664. sess.preDemodDecim = decim1
  665. sess.preDemodDecimPhase = 0 // reset decimation phase on FIR reinit
  666. }
  667. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  668. dec = dsp.DecimateStateful(filtered, decim1, &sess.preDemodDecimPhase)
  669. } else {
  670. dec = fullSnip
  671. }
  672. // --- FM Demod ---
  673. audio := d.Demod(dec, actualDemodRate)
  674. if len(audio) == 0 {
  675. return nil, 0
  676. }
  677. // --- Trim the 1-sample FM discriminator overlap ---
  678. // TEMP: skip audio trim to test if per-block trimming causes ticks
  679. // --- Stateful stereo decode with conservative lock/hysteresis ---
  680. channels := 1
  681. if isWFMStereo {
  682. sess.playbackMode = "WFM_STEREO"
  683. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  684. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  685. if locked {
  686. sess.stereoOnCount++
  687. sess.stereoOffCount = 0
  688. if sess.stereoOnCount >= 4 {
  689. sess.stereoEnabled = true
  690. }
  691. } else {
  692. sess.stereoOnCount = 0
  693. sess.stereoOffCount++
  694. if sess.stereoOffCount >= 10 {
  695. sess.stereoEnabled = false
  696. }
  697. }
  698. prevPlayback := sess.playbackMode
  699. prevStereo := sess.stereoState
  700. if sess.stereoEnabled && len(stereoAudio) > 0 {
  701. sess.stereoState = "locked"
  702. audio = stereoAudio
  703. } else {
  704. sess.stereoState = "mono-fallback"
  705. dual := make([]float32, len(audio)*2)
  706. for i, s := range audio {
  707. dual[i*2] = s
  708. dual[i*2+1] = s
  709. }
  710. audio = dual
  711. }
  712. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  713. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  714. }
  715. }
  716. // --- Polyphase resample to exact 48kHz ---
  717. if actualDemodRate != streamAudioRate {
  718. if channels > 1 {
  719. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  720. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  721. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  722. sess.stereoResamplerRate = actualDemodRate
  723. }
  724. audio = sess.stereoResampler.Process(audio)
  725. } else {
  726. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  727. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  728. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  729. sess.monoResamplerRate = actualDemodRate
  730. }
  731. audio = sess.monoResampler.Process(audio)
  732. }
  733. }
  734. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  735. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  736. tau := sess.deemphasisUs * 1e-6
  737. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  738. if channels > 1 {
  739. nFrames := len(audio) / channels
  740. yL, yR := sess.deemphL, sess.deemphR
  741. for i := 0; i < nFrames; i++ {
  742. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  743. audio[i*2] = float32(yL)
  744. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  745. audio[i*2+1] = float32(yR)
  746. }
  747. sess.deemphL, sess.deemphR = yL, yR
  748. } else {
  749. y := sess.deemphL
  750. for i := range audio {
  751. y = alpha*y + (1-alpha)*float64(audio[i])
  752. audio[i] = float32(y)
  753. }
  754. sess.deemphL = y
  755. }
  756. }
  757. if isWFM {
  758. for i := range audio {
  759. audio[i] *= 0.35
  760. }
  761. }
  762. return audio, streamAudioRate
  763. }
  // pllCoefficients computes the loop gains of a Type-II PLL from its loop
  // bandwidth and damping factor. loopBW is in Hz, sampleRate in samples/sec.
  // The first result is the proportional gain (alpha), the second the integral
  // gain (beta). Both are 0 when sampleRate or loopBW is not positive.
  func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  	if loopBW <= 0 || sampleRate <= 0 {
  		return 0, 0
  	}
  	// Loop bandwidth normalised to the sample rate.
  	normBW := loopBW / float64(sampleRate)
  	theta := normBW / (damping + 0.25/damping)
  	denom := 1 + 2*damping*theta + theta*theta

  	propGain := (4 * damping * theta) / denom
  	intGain := (4 * theta * theta) / denom
  	return propGain, intGain
  }
  // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  // Uses persistent FIR filter state across frames for click-free stereo.
  // Reuses session scratch buffers to minimize allocations.
  //
  // mono is the demodulated signal at sampleRate. The result is a freshly
  // allocated interleaved L/R slice of 2*len(mono) samples plus a flag telling
  // whether the pilot lock heuristics passed for this block. Empty input or a
  // non-positive sampleRate yields (nil, false).
  func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  	if len(mono) == 0 || sampleRate <= 0 {
  		return nil, false
  	}
  	n := len(mono)
  	// Rebuild rate-dependent stereo filters when sampleRate changes
  	if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  		lp := dsp.LowpassFIR(15000, sampleRate, 101)
  		sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  		sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  		sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  		sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  		// Narrow pilot bandpass via LPF(21k)-LPF(17k).
  		sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  		sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  		sess.stereoFilterRate = sampleRate
  		// Initialize PLL for 19kHz pilot tracking.
  		sess.pilotPhase = 0
  		sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  		sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  		sess.pilotErrAvg = 0
  		sess.pilotI = 0
  		sess.pilotQ = 0
  		// One-pole smoothing coefficient for the I/Q mixer outputs (200 Hz corner).
  		sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  	}
  	// Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  	scratch := sess.growAudio(n * 5)
  	lpr := scratch[:n]
  	bpfLR := scratch[n : 2*n]
  	lr := scratch[2*n : 3*n]
  	work1 := scratch[3*n : 4*n]
  	work2 := scratch[4*n : 5*n]
  	// L+R: 15kHz lowpass of the input.
  	sess.stereoLPF.ProcessInto(mono, lpr)
  	// 23-53kHz bandpass for L-R DSB-SC.
  	sess.stereoBPHi.ProcessInto(mono, work1)
  	sess.stereoBPLo.ProcessInto(mono, work2)
  	for i := 0; i < n; i++ {
  		bpfLR[i] = work1[i] - work2[i]
  	}
  	// 19kHz pilot bandpass for PLL.
  	sess.pilotLPFHi.ProcessInto(mono, work1)
  	sess.pilotLPFLo.ProcessInto(mono, work2)
  	for i := 0; i < n; i++ {
  		work1[i] = work1[i] - work2[i]
  	}
  	pilot := work1
  	// Work on local copies of the PLL state; written back after the loop.
  	phase := sess.pilotPhase
  	freq := sess.pilotFreq
  	alpha := sess.pilotAlpha
  	beta := sess.pilotBeta
  	iState := sess.pilotI
  	qState := sess.pilotQ
  	lpAlpha := sess.pilotLPAlpha
  	// The tracked frequency is clamped to 17-21 kHz (in radians per sample).
  	minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  	maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  	var pilotPower float64
  	var totalPower float64
  	var errSum float64
  	for i := 0; i < n; i++ {
  		p := float64(pilot[i])
  		// Mix the pilot with the local oscillator and smooth I/Q; the angle
  		// of the smoothed vector is the phase error.
  		sinP, cosP := math.Sincos(phase)
  		iMix := p * cosP
  		qMix := p * -sinP
  		iState += lpAlpha * (iMix - iState)
  		qState += lpAlpha * (qMix - qState)
  		err := math.Atan2(qState, iState)
  		// Integral path adjusts the frequency, proportional path the phase.
  		freq += beta * err
  		if freq < minFreq {
  			freq = minFreq
  		} else if freq > maxFreq {
  			freq = maxFreq
  		}
  		phase += freq + alpha*err
  		if phase > 2*math.Pi {
  			phase -= 2 * math.Pi
  		} else if phase < 0 {
  			phase += 2 * math.Pi
  		}
  		totalPower += float64(mono[i]) * float64(mono[i])
  		pilotPower += p * p
  		errSum += math.Abs(err)
  		// Demodulate L-R with a carrier at twice the tracked pilot phase.
  		lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  	}
  	sess.pilotPhase = phase
  	sess.pilotFreq = freq
  	sess.pilotI = iState
  	sess.pilotQ = qState
  	// Exponentially averaged mean absolute phase error across blocks.
  	blockErr := errSum / float64(n)
  	sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  	// 15kHz lowpass on the demodulated L-R, filtered in place.
  	lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  	pilotRatio := 0.0
  	if totalPower > 0 {
  		pilotRatio = pilotPower / totalPower
  	}
  	freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  	// Lock heuristics: pilot power fraction and PLL phase error stability.
  	// Pilot power is a small but stable fraction of composite energy; require
  	// a modest floor plus PLL settling to avoid flapping in noise.
  	locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  	// Matrix: L = (L+R + L-R)/2, R = (L+R - L-R)/2, interleaved.
  	out := make([]float32, n*2)
  	for i := 0; i < n; i++ {
  		out[i*2] = 0.5 * (lpr[i] + lr[i])
  		out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  	}
  	return out, locked
  }
// dspStateSnapshot captures persistent DSP state for segment splits.
//
// Every field corresponds to the streamSession field of the same name:
// captureDSPState fills the snapshot and restoreDSPState writes it back.
// Values are copied by plain assignment, so the slice and pointer fields
// still refer to the same backing buffer and filter/resampler objects as the
// session they were taken from.
type dspStateSnapshot struct {
	// Overlap-save: tail of the previous extracted IQ snippet.
	overlapIQ []complex64

	// De-emphasis IIR state, one value per audio channel.
	deemphL float64
	deemphR float64

	// 19 kHz stereo pilot PLL state.
	pilotPhase   float64 // phase accumulator, radians
	pilotFreq    float64 // per-sample phase increment, radians/sample
	pilotAlpha   float64 // loop gain applied to the phase correction
	pilotBeta    float64 // loop gain applied to the frequency correction
	pilotErrAvg  float64 // smoothed mean absolute phase error; input to the lock test
	pilotI       float64 // low-passed in-phase mixer output
	pilotQ       float64 // low-passed quadrature mixer output
	pilotLPAlpha float64 // smoothing coefficient used for pilotI/pilotQ

	// Resamplers for the mono and stereo paths, each with a cached rate.
	// NOTE(review): the rate is presumably the input rate the resampler was
	// built for; its use is outside this part of the file — confirm.
	monoResampler       *dsp.Resampler
	monoResamplerRate   int
	stereoResampler     *dsp.StereoResampler
	stereoResamplerRate int

	// Stateful FIR filters of the stereo decoder.
	stereoLPF        *dsp.StatefulFIRReal // low-pass producing the L+R signal
	stereoFilterRate int                  // sample rate the stereo/pilot filters were built for
	stereoBPHi       *dsp.StatefulFIRReal // Hi minus Lo output forms the L-R band-pass
	stereoBPLo       *dsp.StatefulFIRReal
	stereoLRLPF      *dsp.StatefulFIRReal // low-pass applied to the demodulated L-R signal
	// NOTE(review): stereoAALPF is not used in the visible code; by its name
	// it looks like an anti-alias low-pass — confirm.
	stereoAALPF *dsp.StatefulFIRReal
	pilotLPFHi  *dsp.StatefulFIRReal // 21 kHz low-pass; Hi minus Lo isolates the pilot
	pilotLPFLo  *dsp.StatefulFIRReal // 17 kHz low-pass

	// Pre-demodulation filter/decimator state.
	// NOTE(review): these fields are not exercised in the visible code; the
	// grouping is inferred from their names — confirm.
	preDemodFIR        *dsp.StatefulFIRComplex
	preDemodDecim      int
	preDemodRate       int
	preDemodCutoff     float64
	preDemodDecimPhase int
}
  918. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  919. return dspStateSnapshot{
  920. overlapIQ: sess.overlapIQ,
  921. deemphL: sess.deemphL,
  922. deemphR: sess.deemphR,
  923. pilotPhase: sess.pilotPhase,
  924. pilotFreq: sess.pilotFreq,
  925. pilotAlpha: sess.pilotAlpha,
  926. pilotBeta: sess.pilotBeta,
  927. pilotErrAvg: sess.pilotErrAvg,
  928. pilotI: sess.pilotI,
  929. pilotQ: sess.pilotQ,
  930. pilotLPAlpha: sess.pilotLPAlpha,
  931. monoResampler: sess.monoResampler,
  932. monoResamplerRate: sess.monoResamplerRate,
  933. stereoResampler: sess.stereoResampler,
  934. stereoResamplerRate: sess.stereoResamplerRate,
  935. stereoLPF: sess.stereoLPF,
  936. stereoFilterRate: sess.stereoFilterRate,
  937. stereoBPHi: sess.stereoBPHi,
  938. stereoBPLo: sess.stereoBPLo,
  939. stereoLRLPF: sess.stereoLRLPF,
  940. stereoAALPF: sess.stereoAALPF,
  941. pilotLPFHi: sess.pilotLPFHi,
  942. pilotLPFLo: sess.pilotLPFLo,
  943. preDemodFIR: sess.preDemodFIR,
  944. preDemodDecim: sess.preDemodDecim,
  945. preDemodRate: sess.preDemodRate,
  946. preDemodCutoff: sess.preDemodCutoff,
  947. preDemodDecimPhase: sess.preDemodDecimPhase,
  948. }
  949. }
  950. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  951. sess.overlapIQ = s.overlapIQ
  952. sess.deemphL = s.deemphL
  953. sess.deemphR = s.deemphR
  954. sess.pilotPhase = s.pilotPhase
  955. sess.pilotFreq = s.pilotFreq
  956. sess.pilotAlpha = s.pilotAlpha
  957. sess.pilotBeta = s.pilotBeta
  958. sess.pilotErrAvg = s.pilotErrAvg
  959. sess.pilotI = s.pilotI
  960. sess.pilotQ = s.pilotQ
  961. sess.pilotLPAlpha = s.pilotLPAlpha
  962. sess.monoResampler = s.monoResampler
  963. sess.monoResamplerRate = s.monoResamplerRate
  964. sess.stereoResampler = s.stereoResampler
  965. sess.stereoResamplerRate = s.stereoResamplerRate
  966. sess.stereoLPF = s.stereoLPF
  967. sess.stereoFilterRate = s.stereoFilterRate
  968. sess.stereoBPHi = s.stereoBPHi
  969. sess.stereoBPLo = s.stereoBPLo
  970. sess.stereoLRLPF = s.stereoLRLPF
  971. sess.stereoAALPF = s.stereoAALPF
  972. sess.pilotLPFHi = s.pilotLPFHi
  973. sess.pilotLPFLo = s.pilotLPFLo
  974. sess.preDemodFIR = s.preDemodFIR
  975. sess.preDemodDecim = s.preDemodDecim
  976. sess.preDemodRate = s.preDemodRate
  977. sess.preDemodCutoff = s.preDemodCutoff
  978. sess.preDemodDecimPhase = s.preDemodDecimPhase
  979. }
  980. // ---------------------------------------------------------------------------
  981. // Session management helpers
  982. // ---------------------------------------------------------------------------
  983. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  984. outputDir := st.policy.OutputDir
  985. if outputDir == "" {
  986. outputDir = "data/recordings"
  987. }
  988. demodName, channels := resolveDemod(sig)
  989. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  990. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  991. dir := filepath.Join(outputDir, dirName)
  992. if err := os.MkdirAll(dir, 0o755); err != nil {
  993. return nil, err
  994. }
  995. wavPath := filepath.Join(dir, "audio.wav")
  996. f, err := os.Create(wavPath)
  997. if err != nil {
  998. return nil, err
  999. }
  1000. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1001. f.Close()
  1002. return nil, err
  1003. }
  1004. playbackMode, stereoState := initialPlaybackState(demodName)
  1005. sess := &streamSession{
  1006. signalID: sig.ID,
  1007. centerHz: sig.CenterHz,
  1008. bwHz: sig.BWHz,
  1009. snrDb: sig.SNRDb,
  1010. peakDb: sig.PeakDb,
  1011. class: sig.Class,
  1012. startTime: now,
  1013. lastFeed: now,
  1014. dir: dir,
  1015. wavFile: f,
  1016. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1017. sampleRate: streamAudioRate,
  1018. channels: channels,
  1019. demodName: demodName,
  1020. playbackMode: playbackMode,
  1021. stereoState: stereoState,
  1022. deemphasisUs: st.policy.DeemphasisUs,
  1023. }
  1024. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1025. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1026. return sess, nil
  1027. }
  1028. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1029. demodName, channels := resolveDemod(sig)
  1030. for _, pl := range st.pendingListens {
  1031. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1032. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1033. demodName = requested
  1034. if demodName == "WFM_STEREO" {
  1035. channels = 2
  1036. } else if d := demod.Get(demodName); d != nil {
  1037. channels = d.Channels()
  1038. } else {
  1039. channels = 1
  1040. }
  1041. break
  1042. }
  1043. }
  1044. }
  1045. playbackMode, stereoState := initialPlaybackState(demodName)
  1046. sess := &streamSession{
  1047. signalID: sig.ID,
  1048. centerHz: sig.CenterHz,
  1049. bwHz: sig.BWHz,
  1050. snrDb: sig.SNRDb,
  1051. peakDb: sig.PeakDb,
  1052. class: sig.Class,
  1053. startTime: now,
  1054. lastFeed: now,
  1055. listenOnly: true,
  1056. sampleRate: streamAudioRate,
  1057. channels: channels,
  1058. demodName: demodName,
  1059. playbackMode: playbackMode,
  1060. stereoState: stereoState,
  1061. deemphasisUs: st.policy.DeemphasisUs,
  1062. }
  1063. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1064. sig.ID, sig.CenterHz/1e6, demodName)
  1065. return sess
  1066. }
  1067. func resolveDemod(sig *detector.Signal) (string, int) {
  1068. demodName := "NFM"
  1069. if sig.Class != nil {
  1070. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1071. demodName = n
  1072. }
  1073. }
  1074. channels := 1
  1075. if demodName == "WFM_STEREO" {
  1076. channels = 2
  1077. } else if d := demod.Get(demodName); d != nil {
  1078. channels = d.Channels()
  1079. }
  1080. return demodName, channels
  1081. }
  1082. func initialPlaybackState(demodName string) (string, string) {
  1083. playbackMode := demodName
  1084. stereoState := "mono"
  1085. if demodName == "WFM_STEREO" {
  1086. stereoState = "searching"
  1087. }
  1088. return playbackMode, stereoState
  1089. }
  1090. func (sess *streamSession) audioInfo() AudioInfo {
  1091. return AudioInfo{
  1092. SampleRate: sess.sampleRate,
  1093. Channels: sess.channels,
  1094. Format: "s16le",
  1095. DemodName: sess.demodName,
  1096. PlaybackMode: sess.playbackMode,
  1097. StereoState: sess.stereoState,
  1098. }
  1099. }
  1100. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1101. infoJSON, _ := json.Marshal(info)
  1102. tagged := make([]byte, 1+len(infoJSON))
  1103. tagged[0] = 0x00 // tag: audio_info
  1104. copy(tagged[1:], infoJSON)
  1105. for _, sub := range subs {
  1106. select {
  1107. case sub.ch <- tagged:
  1108. default:
  1109. }
  1110. }
  1111. }
  1112. func defaultAudioInfoForMode(mode string) AudioInfo {
  1113. demodName := "NFM"
  1114. if requested := normalizeRequestedMode(mode); requested != "" {
  1115. demodName = requested
  1116. }
  1117. channels := 1
  1118. if demodName == "WFM_STEREO" {
  1119. channels = 2
  1120. } else if d := demod.Get(demodName); d != nil {
  1121. channels = d.Channels()
  1122. }
  1123. playbackMode, stereoState := initialPlaybackState(demodName)
  1124. return AudioInfo{
  1125. SampleRate: streamAudioRate,
  1126. Channels: channels,
  1127. Format: "s16le",
  1128. DemodName: demodName,
  1129. PlaybackMode: playbackMode,
  1130. StereoState: stereoState,
  1131. }
  1132. }
  1133. func normalizeRequestedMode(mode string) string {
  1134. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1135. case "", "AUTO":
  1136. return ""
  1137. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1138. return strings.ToUpper(strings.TrimSpace(mode))
  1139. default:
  1140. return ""
  1141. }
  1142. }
  1143. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1144. func (sess *streamSession) growIQ(n int) []complex64 {
  1145. if cap(sess.scratchIQ) >= n {
  1146. return sess.scratchIQ[:n]
  1147. }
  1148. sess.scratchIQ = make([]complex64, n, n*5/4)
  1149. return sess.scratchIQ
  1150. }
  1151. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1152. func (sess *streamSession) growAudio(n int) []float32 {
  1153. if cap(sess.scratchAudio) >= n {
  1154. return sess.scratchAudio[:n]
  1155. }
  1156. sess.scratchAudio = make([]float32, n, n*5/4)
  1157. return sess.scratchAudio
  1158. }
  1159. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1160. func (sess *streamSession) growPCM(n int) []byte {
  1161. if cap(sess.scratchPCM) >= n {
  1162. return sess.scratchPCM[:n]
  1163. }
  1164. sess.scratchPCM = make([]byte, n, n*5/4)
  1165. return sess.scratchPCM
  1166. }
  1167. func convertToListenOnly(sess *streamSession) {
  1168. if sess.wavBuf != nil {
  1169. _ = sess.wavBuf.Flush()
  1170. }
  1171. if sess.wavFile != nil {
  1172. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1173. sess.wavFile.Close()
  1174. }
  1175. sess.wavFile = nil
  1176. sess.wavBuf = nil
  1177. sess.listenOnly = true
  1178. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1179. }
  1180. func closeSession(sess *streamSession, policy *Policy) {
  1181. if sess.listenOnly {
  1182. return
  1183. }
  1184. if sess.wavBuf != nil {
  1185. _ = sess.wavBuf.Flush()
  1186. }
  1187. if sess.wavFile != nil {
  1188. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1189. sess.wavFile.Close()
  1190. sess.wavFile = nil
  1191. sess.wavBuf = nil
  1192. }
  1193. dur := sess.lastFeed.Sub(sess.startTime)
  1194. files := map[string]any{
  1195. "audio": "audio.wav",
  1196. "audio_sample_rate": sess.sampleRate,
  1197. "audio_channels": sess.channels,
  1198. "audio_demod": sess.demodName,
  1199. "recording_mode": "streaming",
  1200. }
  1201. meta := Meta{
  1202. EventID: sess.signalID,
  1203. Start: sess.startTime,
  1204. End: sess.lastFeed,
  1205. CenterHz: sess.centerHz,
  1206. BandwidthHz: sess.bwHz,
  1207. SampleRate: sess.sampleRate,
  1208. SNRDb: sess.snrDb,
  1209. PeakDb: sess.peakDb,
  1210. Class: sess.class,
  1211. DurationMs: dur.Milliseconds(),
  1212. Files: files,
  1213. }
  1214. b, err := json.MarshalIndent(meta, "", " ")
  1215. if err == nil {
  1216. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1217. }
  1218. if policy != nil {
  1219. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1220. }
  1221. }
  1222. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1223. if len(sess.audioSubs) == 0 {
  1224. return
  1225. }
  1226. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1227. tagged := make([]byte, 1+pcmLen)
  1228. tagged[0] = 0x01
  1229. copy(tagged[1:], pcm[:pcmLen])
  1230. alive := sess.audioSubs[:0]
  1231. for _, sub := range sess.audioSubs {
  1232. select {
  1233. case sub.ch <- tagged:
  1234. default:
  1235. st.droppedPCM++
  1236. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1237. }
  1238. alive = append(alive, sub)
  1239. }
  1240. sess.audioSubs = alive
  1241. }
  1242. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1243. if len(st.policy.ClassFilter) == 0 {
  1244. return true
  1245. }
  1246. if cls == nil {
  1247. return false
  1248. }
  1249. for _, f := range st.policy.ClassFilter {
  1250. if strings.EqualFold(f, string(cls.ModType)) {
  1251. return true
  1252. }
  1253. }
  1254. return false
  1255. }
// ErrNoSession is returned when no matching signal session exists.
// It is a sentinel value: test for it with errors.Is instead of comparing
// the message text.
var ErrNoSession = errors.New("no active or pending session for this frequency")
  1258. // ---------------------------------------------------------------------------
  1259. // WAV header helpers
  1260. // ---------------------------------------------------------------------------
  1261. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1262. if channels <= 0 {
  1263. channels = 1
  1264. }
  1265. hdr := make([]byte, 44)
  1266. copy(hdr[0:4], "RIFF")
  1267. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1268. copy(hdr[8:12], "WAVE")
  1269. copy(hdr[12:16], "fmt ")
  1270. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1271. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1272. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1273. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1274. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1275. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1276. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1277. copy(hdr[36:40], "data")
  1278. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1279. _, err := f.Write(hdr)
  1280. return err
  1281. }
  1282. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1283. dataSize := uint32(totalSamples * 2)
  1284. var buf [4]byte
  1285. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1286. if _, err := f.Seek(4, 0); err != nil {
  1287. return
  1288. }
  1289. _, _ = f.Write(buf[:])
  1290. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1291. if _, err := f.Seek(24, 0); err != nil {
  1292. return
  1293. }
  1294. _, _ = f.Write(buf[:])
  1295. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1296. if _, err := f.Seek(28, 0); err != nil {
  1297. return
  1298. }
  1299. _, _ = f.Write(buf[:])
  1300. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1301. if _, err := f.Seek(40, 0); err != nil {
  1302. return
  1303. }
  1304. _, _ = f.Write(buf[:])
  1305. }