Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

1473 rindas
42KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. lastAudioTs time.Time
  36. lastAudioL float32
  37. lastAudioR float32
  38. prevAudioL float64 // second-to-last L sample for boundary transient detection
  39. lastAudioSet bool
  40. // listenOnly sessions have no WAV file and no disk I/O.
  41. // They exist solely to feed audio to live-listen subscribers.
  42. listenOnly bool
  43. // Recording state (nil/zero for listen-only sessions)
  44. dir string
  45. wavFile *os.File
  46. wavBuf *bufio.Writer
  47. wavSamples int64
  48. segmentIdx int
  49. sampleRate int // actual output audio sample rate (always streamAudioRate)
  50. channels int
  51. demodName string
  52. // --- Persistent DSP state for click-free streaming ---
  53. // Overlap-save: tail of previous extracted IQ snippet.
  54. overlapIQ []complex64
  55. // De-emphasis IIR state (persists across frames)
  56. deemphL float64
  57. deemphR float64
  58. // Stereo lock state for live WFM streaming
  59. stereoEnabled bool
  60. stereoOnCount int
  61. stereoOffCount int
  62. // Pilot-locked stereo PLL state (19kHz pilot)
  63. pilotPhase float64
  64. pilotFreq float64
  65. pilotAlpha float64
  66. pilotBeta float64
  67. pilotErrAvg float64
  68. pilotI float64
  69. pilotQ float64
  70. pilotLPAlpha float64
  71. // Polyphase resampler (replaces integer-decimate hack)
  72. monoResampler *dsp.Resampler
  73. monoResamplerRate int
  74. stereoResampler *dsp.StereoResampler
  75. stereoResamplerRate int
  76. // AQ-4: Stateful FIR filters for click-free stereo decode
  77. stereoFilterRate int
  78. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  79. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  80. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  81. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  82. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  83. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  84. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  85. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  86. // and avoids per-frame FIR recomputation)
  87. preDemodFIR *dsp.StatefulFIRComplex
  88. preDemodDecim int // cached decimation factor
  89. preDemodRate int // cached snipRate this FIR was built for
  90. preDemodCutoff float64 // cached cutoff
  91. preDemodDecimPhase int // stateful decimation phase (index offset into next frame)
  92. // AQ-2: De-emphasis config (µs, 0 = disabled)
  93. deemphasisUs float64
  94. // Scratch buffers — reused across frames to avoid GC pressure.
  95. // Grown as needed, never shrunk.
  96. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  97. scratchAudio []float32 // for stereo decode intermediates
  98. scratchPCM []byte // for PCM encoding
  99. // live-listen subscribers
  100. audioSubs []audioSub
  101. }
  102. type audioSub struct {
  103. id int64
  104. ch chan []byte
  105. }
  106. type RuntimeSignalInfo struct {
  107. DemodName string
  108. PlaybackMode string
  109. StereoState string
  110. Channels int
  111. SampleRate int
  112. }
  113. // AudioInfo describes the audio format of a live-listen subscription.
  114. // Sent to the WebSocket client as the first message.
  115. type AudioInfo struct {
  116. SampleRate int `json:"sample_rate"`
  117. Channels int `json:"channels"`
  118. Format string `json:"format"` // always "s16le"
  119. DemodName string `json:"demod"`
  120. PlaybackMode string `json:"playback_mode,omitempty"`
  121. StereoState string `json:"stereo_state,omitempty"`
  122. }
  123. const (
  124. streamAudioRate = 48000
  125. resamplerTaps = 32 // taps per polyphase arm — good quality
  126. )
  127. // ---------------------------------------------------------------------------
  128. // Streamer — manages all active streaming sessions
  129. // ---------------------------------------------------------------------------
  130. type streamFeedItem struct {
  131. signal detector.Signal
  132. snippet []complex64
  133. snipRate int
  134. }
  135. type streamFeedMsg struct {
  136. traceID uint64
  137. items []streamFeedItem
  138. }
  139. type Streamer struct {
  140. mu sync.Mutex
  141. sessions map[int64]*streamSession
  142. policy Policy
  143. centerHz float64
  144. nextSub int64
  145. feedCh chan streamFeedMsg
  146. done chan struct{}
  147. droppedFeed uint64
  148. droppedPCM uint64
  149. lastFeedTS time.Time
  150. lastProcTS time.Time
  151. // pendingListens are subscribers waiting for a matching session.
  152. pendingListens map[int64]*pendingListen
  153. }
  154. type pendingListen struct {
  155. freq float64
  156. bw float64
  157. mode string
  158. ch chan []byte
  159. }
  160. func newStreamer(policy Policy, centerHz float64) *Streamer {
  161. st := &Streamer{
  162. sessions: make(map[int64]*streamSession),
  163. policy: policy,
  164. centerHz: centerHz,
  165. feedCh: make(chan streamFeedMsg, 2),
  166. done: make(chan struct{}),
  167. pendingListens: make(map[int64]*pendingListen),
  168. }
  169. go st.worker()
  170. return st
  171. }
  172. func (st *Streamer) worker() {
  173. for msg := range st.feedCh {
  174. st.processFeed(msg)
  175. }
  176. close(st.done)
  177. }
  178. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  179. st.mu.Lock()
  180. defer st.mu.Unlock()
  181. wasEnabled := st.policy.Enabled
  182. st.policy = policy
  183. st.centerHz = centerHz
  184. // If recording was just disabled, close recording sessions
  185. // but keep listen-only sessions alive.
  186. if wasEnabled && !policy.Enabled {
  187. for id, sess := range st.sessions {
  188. if sess.listenOnly {
  189. continue
  190. }
  191. if len(sess.audioSubs) > 0 {
  192. // Convert to listen-only: close WAV but keep session
  193. convertToListenOnly(sess)
  194. } else {
  195. closeSession(sess, &st.policy)
  196. delete(st.sessions, id)
  197. }
  198. }
  199. }
  200. }
  201. // HasListeners returns true if any sessions have audio subscribers
  202. // or there are pending listen requests. Used by the DSP loop to
  203. // decide whether to feed snippets even when recording is disabled.
  204. func (st *Streamer) HasListeners() bool {
  205. st.mu.Lock()
  206. defer st.mu.Unlock()
  207. return st.hasListenersLocked()
  208. }
  209. func (st *Streamer) hasListenersLocked() bool {
  210. if len(st.pendingListens) > 0 {
  211. return true
  212. }
  213. for _, sess := range st.sessions {
  214. if len(sess.audioSubs) > 0 {
  215. return true
  216. }
  217. }
  218. return false
  219. }
  220. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  221. // Feeds are accepted if:
  222. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  223. // - Any live-listen subscribers exist (listen-only mode)
  224. //
  225. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  226. // data, so items can be passed directly without another copy.
  227. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  228. st.mu.Lock()
  229. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  230. hasListeners := st.hasListenersLocked()
  231. pending := len(st.pendingListens)
  232. debugLiveAudio := st.policy.DebugLiveAudio
  233. now := time.Now()
  234. if !st.lastFeedTS.IsZero() {
  235. gap := now.Sub(st.lastFeedTS)
  236. if gap > 150*time.Millisecond {
  237. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  238. }
  239. }
  240. st.lastFeedTS = now
  241. st.mu.Unlock()
  242. if debugLiveAudio {
  243. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  244. }
  245. if (!recEnabled && !hasListeners) || len(items) == 0 {
  246. return
  247. }
  248. select {
  249. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}:
  250. default:
  251. st.droppedFeed++
  252. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  253. }
  254. }
  255. // processFeed runs in the worker goroutine.
  256. func (st *Streamer) processFeed(msg streamFeedMsg) {
  257. st.mu.Lock()
  258. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  259. hasListeners := st.hasListenersLocked()
  260. now := time.Now()
  261. if !st.lastProcTS.IsZero() {
  262. gap := now.Sub(st.lastProcTS)
  263. if gap > 150*time.Millisecond {
  264. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  265. }
  266. }
  267. st.lastProcTS = now
  268. defer st.mu.Unlock()
  269. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  270. if !recEnabled && !hasListeners {
  271. return
  272. }
  273. seen := make(map[int64]bool, len(msg.items))
  274. for i := range msg.items {
  275. item := &msg.items[i]
  276. sig := &item.signal
  277. seen[sig.ID] = true
  278. if sig.ID == 0 || sig.Class == nil {
  279. continue
  280. }
  281. if len(item.snippet) == 0 || item.snipRate <= 0 {
  282. continue
  283. }
  284. // Decide whether this signal needs a session
  285. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  286. needsListen := st.signalHasListenerLocked(sig)
  287. className := "<nil>"
  288. demodName := ""
  289. if sig.Class != nil {
  290. className = string(sig.Class.ModType)
  291. demodName, _ = resolveDemod(sig)
  292. }
  293. if st.policy.DebugLiveAudio {
  294. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  295. }
  296. if !needsRecording && !needsListen {
  297. continue
  298. }
  299. sess, exists := st.sessions[sig.ID]
  300. requestedMode := ""
  301. for _, pl := range st.pendingListens {
  302. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  303. if m := normalizeRequestedMode(pl.mode); m != "" {
  304. requestedMode = m
  305. break
  306. }
  307. }
  308. }
  309. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  310. for _, sub := range sess.audioSubs {
  311. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  312. }
  313. delete(st.sessions, sig.ID)
  314. sess = nil
  315. exists = false
  316. }
  317. if !exists {
  318. if needsRecording {
  319. s, err := st.openRecordingSession(sig, now)
  320. if err != nil {
  321. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  322. sig.ID, sig.CenterHz/1e6, err)
  323. continue
  324. }
  325. st.sessions[sig.ID] = s
  326. sess = s
  327. } else {
  328. s := st.openListenSession(sig, now)
  329. st.sessions[sig.ID] = s
  330. sess = s
  331. }
  332. // Attach any pending listeners
  333. st.attachPendingListeners(sess)
  334. }
  335. // Update metadata
  336. sess.lastFeed = now
  337. sess.centerHz = sig.CenterHz
  338. sess.bwHz = sig.BWHz
  339. if sig.SNRDb > sess.snrDb {
  340. sess.snrDb = sig.SNRDb
  341. }
  342. if sig.PeakDb > sess.peakDb {
  343. sess.peakDb = sig.PeakDb
  344. }
  345. if sig.Class != nil {
  346. sess.class = sig.Class
  347. }
  348. // Demod with persistent state
  349. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  350. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  351. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  352. if len(audio) == 0 {
  353. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  354. }
  355. if len(audio) > 0 {
  356. if sess.wavSamples == 0 && audioRate > 0 {
  357. sess.sampleRate = audioRate
  358. }
  359. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  360. pcmLen := len(audio) * 2
  361. pcm := sess.growPCM(pcmLen)
  362. for k, s := range audio {
  363. v := int16(clip(s * 32767))
  364. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  365. }
  366. if !sess.listenOnly && sess.wavBuf != nil {
  367. n, err := sess.wavBuf.Write(pcm)
  368. if err != nil {
  369. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  370. } else {
  371. sess.wavSamples += int64(n / 2)
  372. }
  373. }
  374. // Gap logging for live-audio sessions + transient click detector
  375. if len(sess.audioSubs) > 0 {
  376. if !sess.lastAudioTs.IsZero() {
  377. gap := time.Since(sess.lastAudioTs)
  378. if gap > 150*time.Millisecond {
  379. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  380. }
  381. }
  382. // Transient click detector: finds short impulses (1-3 samples)
  383. // that deviate sharply from the local signal trend.
  384. // A click looks like: ...smooth... SPIKE ...smooth...
  385. // Normal FM audio has large deltas too, but they follow
  386. // a continuous curve. A click has high |d2/dt2| (acceleration).
  387. //
  388. // Method: second-derivative detector. For each sample triplet
  389. // (a, b, c), compute |2b - a - c| which is the discrete
  390. // second derivative magnitude. High values = transient spike.
  391. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  392. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  393. stride := sess.channels
  394. if stride < 1 {
  395. stride = 1
  396. }
  397. nFrames := len(audio) / stride
  398. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  399. if sess.lastAudioSet && nFrames >= 1 {
  400. // second derivative across boundary: |2*last - prevLast - first|
  401. first := float64(audio[0])
  402. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  403. if d2 > 0.15 {
  404. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  405. }
  406. }
  407. // Intra-frame transient scan (L channel only for performance)
  408. nClicks := 0
  409. maxD2 := float64(0)
  410. maxD2Pos := 0
  411. for k := 1; k < nFrames-1; k++ {
  412. a := float64(audio[(k-1)*stride])
  413. b := float64(audio[k*stride])
  414. c := float64(audio[(k+1)*stride])
  415. d2 := math.Abs(2*b - a - c)
  416. if d2 > maxD2 {
  417. maxD2 = d2
  418. maxD2Pos = k
  419. }
  420. if d2 > 0.15 {
  421. nClicks++
  422. }
  423. }
  424. if nClicks > 0 {
  425. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  426. }
  427. // Store last two samples for next frame's boundary check
  428. if nFrames >= 2 {
  429. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  430. sess.lastAudioL = audio[(nFrames-1)*stride]
  431. if stride > 1 {
  432. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  433. }
  434. } else if nFrames == 1 {
  435. sess.prevAudioL = float64(sess.lastAudioL)
  436. sess.lastAudioL = audio[0]
  437. if stride > 1 && len(audio) >= 2 {
  438. sess.lastAudioR = audio[1]
  439. }
  440. }
  441. sess.lastAudioSet = true
  442. }
  443. sess.lastAudioTs = time.Now()
  444. }
  445. st.fanoutPCM(sess, pcm, pcmLen)
  446. }
  447. // Segment split (recording sessions only)
  448. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  449. segIdx := sess.segmentIdx + 1
  450. oldSubs := sess.audioSubs
  451. oldState := sess.captureDSPState()
  452. sess.audioSubs = nil
  453. closeSession(sess, &st.policy)
  454. s, err := st.openRecordingSession(sig, now)
  455. if err != nil {
  456. delete(st.sessions, sig.ID)
  457. continue
  458. }
  459. s.segmentIdx = segIdx
  460. s.audioSubs = oldSubs
  461. s.restoreDSPState(oldState)
  462. st.sessions[sig.ID] = s
  463. }
  464. }
  465. // Close sessions for disappeared signals (with grace period)
  466. for id, sess := range st.sessions {
  467. if seen[id] {
  468. continue
  469. }
  470. gracePeriod := 3 * time.Second
  471. if sess.listenOnly {
  472. gracePeriod = 5 * time.Second
  473. }
  474. if now.Sub(sess.lastFeed) > gracePeriod {
  475. for _, sub := range sess.audioSubs {
  476. close(sub.ch)
  477. }
  478. sess.audioSubs = nil
  479. if !sess.listenOnly {
  480. closeSession(sess, &st.policy)
  481. }
  482. delete(st.sessions, id)
  483. }
  484. }
  485. }
  486. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  487. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  488. if st.policy.DebugLiveAudio {
  489. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  490. }
  491. return true
  492. }
  493. for subID, pl := range st.pendingListens {
  494. delta := math.Abs(sig.CenterHz - pl.freq)
  495. if delta < 200000 {
  496. if st.policy.DebugLiveAudio {
  497. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  498. }
  499. return true
  500. }
  501. }
  502. return false
  503. }
  504. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  505. for subID, pl := range st.pendingListens {
  506. requestedMode := normalizeRequestedMode(pl.mode)
  507. if requestedMode != "" && sess.demodName != requestedMode {
  508. continue
  509. }
  510. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  511. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  512. delete(st.pendingListens, subID)
  513. // Send updated audio_info now that we know the real session params.
  514. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  515. infoJSON, _ := json.Marshal(sess.audioInfo())
  516. tagged := make([]byte, 1+len(infoJSON))
  517. tagged[0] = 0x00 // tag: audio_info
  518. copy(tagged[1:], infoJSON)
  519. select {
  520. case pl.ch <- tagged:
  521. default:
  522. }
  523. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  524. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  525. }
  526. }
  527. }
  528. // CloseAll finalises all sessions and stops the worker goroutine.
  529. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  530. st.mu.Lock()
  531. defer st.mu.Unlock()
  532. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  533. for _, sess := range st.sessions {
  534. out[sess.signalID] = RuntimeSignalInfo{
  535. DemodName: sess.demodName,
  536. PlaybackMode: sess.playbackMode,
  537. StereoState: sess.stereoState,
  538. Channels: sess.channels,
  539. SampleRate: sess.sampleRate,
  540. }
  541. }
  542. return out
  543. }
  544. func (st *Streamer) CloseAll() {
  545. close(st.feedCh)
  546. <-st.done
  547. st.mu.Lock()
  548. defer st.mu.Unlock()
  549. for id, sess := range st.sessions {
  550. for _, sub := range sess.audioSubs {
  551. close(sub.ch)
  552. }
  553. sess.audioSubs = nil
  554. if !sess.listenOnly {
  555. closeSession(sess, &st.policy)
  556. }
  557. delete(st.sessions, id)
  558. }
  559. for _, pl := range st.pendingListens {
  560. close(pl.ch)
  561. }
  562. st.pendingListens = nil
  563. }
  564. // ActiveSessions returns the number of open streaming sessions.
  565. func (st *Streamer) ActiveSessions() int {
  566. st.mu.Lock()
  567. defer st.mu.Unlock()
  568. return len(st.sessions)
  569. }
  570. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  571. //
  572. // LL-2: Returns AudioInfo with correct channels and sample rate.
  573. // LL-3: Returns error only on hard failures (nil streamer etc).
  574. //
  575. // If a matching session exists, attaches immediately. Otherwise, the
  576. // subscriber is held as "pending" and will be attached when a matching
  577. // signal appears in the next DSP frame.
  578. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  579. ch := make(chan []byte, 64)
  580. st.mu.Lock()
  581. defer st.mu.Unlock()
  582. st.nextSub++
  583. subID := st.nextSub
  584. requestedMode := normalizeRequestedMode(mode)
  585. // Try to find a matching session
  586. var bestSess *streamSession
  587. bestDist := math.MaxFloat64
  588. for _, sess := range st.sessions {
  589. if requestedMode != "" && sess.demodName != requestedMode {
  590. continue
  591. }
  592. d := math.Abs(sess.centerHz - freq)
  593. if d < bestDist {
  594. bestDist = d
  595. bestSess = sess
  596. }
  597. }
  598. if bestSess != nil && bestDist < 200000 {
  599. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  600. info := bestSess.audioInfo()
  601. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  602. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  603. return subID, ch, info, nil
  604. }
  605. // No matching session yet — add as pending listener
  606. st.pendingListens[subID] = &pendingListen{
  607. freq: freq,
  608. bw: bw,
  609. mode: mode,
  610. ch: ch,
  611. }
  612. info := defaultAudioInfoForMode(mode)
  613. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  614. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  615. return subID, ch, info, nil
  616. }
  617. // UnsubscribeAudio removes a live-listen subscriber.
  618. func (st *Streamer) UnsubscribeAudio(subID int64) {
  619. st.mu.Lock()
  620. defer st.mu.Unlock()
  621. if pl, ok := st.pendingListens[subID]; ok {
  622. close(pl.ch)
  623. delete(st.pendingListens, subID)
  624. return
  625. }
  626. for _, sess := range st.sessions {
  627. for i, sub := range sess.audioSubs {
  628. if sub.id == subID {
  629. close(sub.ch)
  630. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  631. return
  632. }
  633. }
  634. }
  635. }
  636. // ---------------------------------------------------------------------------
  637. // Session: stateful extraction + demod
  638. // ---------------------------------------------------------------------------
  639. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  640. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  641. // output with zero transient artifacts.
  642. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  643. if len(snippet) == 0 || snipRate <= 0 {
  644. return nil, 0
  645. }
  646. isWFMStereo := sess.demodName == "WFM_STEREO"
  647. isWFM := sess.demodName == "WFM" || isWFMStereo
  648. demodName := sess.demodName
  649. if isWFMStereo {
  650. demodName = "WFM"
  651. }
  652. d := demod.Get(demodName)
  653. if d == nil {
  654. d = demod.Get("NFM")
  655. }
  656. if d == nil {
  657. return nil, 0
  658. }
  659. // --- FM discriminator overlap: prepend 1 sample from previous frame ---
  660. // The FM discriminator needs iq[i-1] to compute the first output.
  661. // All FIR filtering is now stateful, so no additional overlap is needed.
  662. var fullSnip []complex64
  663. trimSamples := 0
  664. _ = trimSamples
  665. if len(sess.overlapIQ) == 1 {
  666. fullSnip = make([]complex64, 1+len(snippet))
  667. fullSnip[0] = sess.overlapIQ[0]
  668. copy(fullSnip[1:], snippet)
  669. trimSamples = 1
  670. logging.Debug("discrim", "overlap_applied", "signal", sess.signalID, "snip", len(snippet))
  671. } else {
  672. fullSnip = snippet
  673. }
  674. // Save last sample for next frame's FM discriminator
  675. if len(snippet) > 0 {
  676. sess.overlapIQ = []complex64{snippet[len(snippet)-1]}
  677. }
  678. // --- Stateful anti-alias FIR + decimation to demod rate ---
  679. demodRate := d.OutputSampleRate()
  680. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  681. if decim1 < 1 {
  682. decim1 = 1
  683. }
  684. actualDemodRate := snipRate / decim1
  685. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  686. var dec []complex64
  687. if decim1 > 1 {
  688. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  689. // Lazy-init or reinit stateful FIR if parameters changed
  690. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  691. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  692. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  693. sess.preDemodRate = snipRate
  694. sess.preDemodCutoff = cutoff
  695. sess.preDemodDecim = decim1
  696. sess.preDemodDecimPhase = 0 // reset decimation phase on FIR reinit
  697. }
  698. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  699. dec = dsp.DecimateStateful(filtered, decim1, &sess.preDemodDecimPhase)
  700. } else {
  701. dec = fullSnip
  702. }
  703. // --- FM Demod ---
  704. audio := d.Demod(dec, actualDemodRate)
  705. if len(audio) == 0 {
  706. return nil, 0
  707. }
  708. // --- Trim the 1-sample FM discriminator overlap ---
  709. // TEMP: skip audio trim to test if per-block trimming causes ticks
  710. // --- Stateful stereo decode with conservative lock/hysteresis ---
  711. channels := 1
  712. if isWFMStereo {
  713. sess.playbackMode = "WFM_STEREO"
  714. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  715. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  716. if locked {
  717. sess.stereoOnCount++
  718. sess.stereoOffCount = 0
  719. if sess.stereoOnCount >= 4 {
  720. sess.stereoEnabled = true
  721. }
  722. } else {
  723. sess.stereoOnCount = 0
  724. sess.stereoOffCount++
  725. if sess.stereoOffCount >= 10 {
  726. sess.stereoEnabled = false
  727. }
  728. }
  729. prevPlayback := sess.playbackMode
  730. prevStereo := sess.stereoState
  731. if sess.stereoEnabled && len(stereoAudio) > 0 {
  732. sess.stereoState = "locked"
  733. audio = stereoAudio
  734. } else {
  735. sess.stereoState = "mono-fallback"
  736. dual := make([]float32, len(audio)*2)
  737. for i, s := range audio {
  738. dual[i*2] = s
  739. dual[i*2+1] = s
  740. }
  741. audio = dual
  742. }
  743. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  744. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  745. }
  746. }
  747. // --- Polyphase resample to exact 48kHz ---
  748. if actualDemodRate != streamAudioRate {
  749. if channels > 1 {
  750. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  751. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  752. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  753. sess.stereoResamplerRate = actualDemodRate
  754. }
  755. audio = sess.stereoResampler.Process(audio)
  756. } else {
  757. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  758. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  759. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  760. sess.monoResamplerRate = actualDemodRate
  761. }
  762. audio = sess.monoResampler.Process(audio)
  763. }
  764. }
  765. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  766. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  767. tau := sess.deemphasisUs * 1e-6
  768. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  769. if channels > 1 {
  770. nFrames := len(audio) / channels
  771. yL, yR := sess.deemphL, sess.deemphR
  772. for i := 0; i < nFrames; i++ {
  773. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  774. audio[i*2] = float32(yL)
  775. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  776. audio[i*2+1] = float32(yR)
  777. }
  778. sess.deemphL, sess.deemphR = yL, yR
  779. } else {
  780. y := sess.deemphL
  781. for i := range audio {
  782. y = alpha*y + (1-alpha)*float64(audio[i])
  783. audio[i] = float32(y)
  784. }
  785. sess.deemphL = y
  786. }
  787. }
  788. if isWFM {
  789. for i := range audio {
  790. audio[i] *= 0.35
  791. }
  792. }
  793. return audio, streamAudioRate
  794. }
  795. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  796. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  797. // loopBW is in Hz, sampleRate in samples/sec.
  798. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  799. if sampleRate <= 0 || loopBW <= 0 {
  800. return 0, 0
  801. }
  802. bl := loopBW / float64(sampleRate)
  803. theta := bl / (damping + 0.25/damping)
  804. d := 1 + 2*damping*theta + theta*theta
  805. alpha := (4 * damping * theta) / d
  806. beta := (4 * theta * theta) / d
  807. return alpha, beta
  808. }
  809. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  810. // Uses persistent FIR filter state across frames for click-free stereo.
  811. // Reuses session scratch buffers to minimize allocations.
  812. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  813. if len(mono) == 0 || sampleRate <= 0 {
  814. return nil, false
  815. }
  816. n := len(mono)
  817. // Rebuild rate-dependent stereo filters when sampleRate changes
  818. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  819. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  820. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  821. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  822. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  823. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  824. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  825. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  826. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  827. sess.stereoFilterRate = sampleRate
  828. // Initialize PLL for 19kHz pilot tracking.
  829. sess.pilotPhase = 0
  830. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  831. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  832. sess.pilotErrAvg = 0
  833. sess.pilotI = 0
  834. sess.pilotQ = 0
  835. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  836. }
  837. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  838. scratch := sess.growAudio(n * 5)
  839. lpr := scratch[:n]
  840. bpfLR := scratch[n : 2*n]
  841. lr := scratch[2*n : 3*n]
  842. work1 := scratch[3*n : 4*n]
  843. work2 := scratch[4*n : 5*n]
  844. sess.stereoLPF.ProcessInto(mono, lpr)
  845. // 23-53kHz bandpass for L-R DSB-SC.
  846. sess.stereoBPHi.ProcessInto(mono, work1)
  847. sess.stereoBPLo.ProcessInto(mono, work2)
  848. for i := 0; i < n; i++ {
  849. bpfLR[i] = work1[i] - work2[i]
  850. }
  851. // 19kHz pilot bandpass for PLL.
  852. sess.pilotLPFHi.ProcessInto(mono, work1)
  853. sess.pilotLPFLo.ProcessInto(mono, work2)
  854. for i := 0; i < n; i++ {
  855. work1[i] = work1[i] - work2[i]
  856. }
  857. pilot := work1
  858. phase := sess.pilotPhase
  859. freq := sess.pilotFreq
  860. alpha := sess.pilotAlpha
  861. beta := sess.pilotBeta
  862. iState := sess.pilotI
  863. qState := sess.pilotQ
  864. lpAlpha := sess.pilotLPAlpha
  865. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  866. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  867. var pilotPower float64
  868. var totalPower float64
  869. var errSum float64
  870. for i := 0; i < n; i++ {
  871. p := float64(pilot[i])
  872. sinP, cosP := math.Sincos(phase)
  873. iMix := p * cosP
  874. qMix := p * -sinP
  875. iState += lpAlpha * (iMix - iState)
  876. qState += lpAlpha * (qMix - qState)
  877. err := math.Atan2(qState, iState)
  878. freq += beta * err
  879. if freq < minFreq {
  880. freq = minFreq
  881. } else if freq > maxFreq {
  882. freq = maxFreq
  883. }
  884. phase += freq + alpha*err
  885. if phase > 2*math.Pi {
  886. phase -= 2 * math.Pi
  887. } else if phase < 0 {
  888. phase += 2 * math.Pi
  889. }
  890. totalPower += float64(mono[i]) * float64(mono[i])
  891. pilotPower += p * p
  892. errSum += math.Abs(err)
  893. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  894. }
  895. sess.pilotPhase = phase
  896. sess.pilotFreq = freq
  897. sess.pilotI = iState
  898. sess.pilotQ = qState
  899. blockErr := errSum / float64(n)
  900. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  901. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  902. pilotRatio := 0.0
  903. if totalPower > 0 {
  904. pilotRatio = pilotPower / totalPower
  905. }
  906. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  907. // Lock heuristics: pilot power fraction and PLL phase error stability.
  908. // Pilot power is a small but stable fraction of composite energy; require
  909. // a modest floor plus PLL settling to avoid flapping in noise.
  910. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  911. out := make([]float32, n*2)
  912. for i := 0; i < n; i++ {
  913. out[i*2] = 0.5 * (lpr[i] + lr[i])
  914. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  915. }
  916. return out, locked
  917. }
  918. // dspStateSnapshot captures persistent DSP state for segment splits.
  919. type dspStateSnapshot struct {
  920. overlapIQ []complex64
  921. deemphL float64
  922. deemphR float64
  923. pilotPhase float64
  924. pilotFreq float64
  925. pilotAlpha float64
  926. pilotBeta float64
  927. pilotErrAvg float64
  928. pilotI float64
  929. pilotQ float64
  930. pilotLPAlpha float64
  931. monoResampler *dsp.Resampler
  932. monoResamplerRate int
  933. stereoResampler *dsp.StereoResampler
  934. stereoResamplerRate int
  935. stereoLPF *dsp.StatefulFIRReal
  936. stereoFilterRate int
  937. stereoBPHi *dsp.StatefulFIRReal
  938. stereoBPLo *dsp.StatefulFIRReal
  939. stereoLRLPF *dsp.StatefulFIRReal
  940. stereoAALPF *dsp.StatefulFIRReal
  941. pilotLPFHi *dsp.StatefulFIRReal
  942. pilotLPFLo *dsp.StatefulFIRReal
  943. preDemodFIR *dsp.StatefulFIRComplex
  944. preDemodDecim int
  945. preDemodRate int
  946. preDemodCutoff float64
  947. preDemodDecimPhase int
  948. }
  949. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  950. return dspStateSnapshot{
  951. overlapIQ: sess.overlapIQ,
  952. deemphL: sess.deemphL,
  953. deemphR: sess.deemphR,
  954. pilotPhase: sess.pilotPhase,
  955. pilotFreq: sess.pilotFreq,
  956. pilotAlpha: sess.pilotAlpha,
  957. pilotBeta: sess.pilotBeta,
  958. pilotErrAvg: sess.pilotErrAvg,
  959. pilotI: sess.pilotI,
  960. pilotQ: sess.pilotQ,
  961. pilotLPAlpha: sess.pilotLPAlpha,
  962. monoResampler: sess.monoResampler,
  963. monoResamplerRate: sess.monoResamplerRate,
  964. stereoResampler: sess.stereoResampler,
  965. stereoResamplerRate: sess.stereoResamplerRate,
  966. stereoLPF: sess.stereoLPF,
  967. stereoFilterRate: sess.stereoFilterRate,
  968. stereoBPHi: sess.stereoBPHi,
  969. stereoBPLo: sess.stereoBPLo,
  970. stereoLRLPF: sess.stereoLRLPF,
  971. stereoAALPF: sess.stereoAALPF,
  972. pilotLPFHi: sess.pilotLPFHi,
  973. pilotLPFLo: sess.pilotLPFLo,
  974. preDemodFIR: sess.preDemodFIR,
  975. preDemodDecim: sess.preDemodDecim,
  976. preDemodRate: sess.preDemodRate,
  977. preDemodCutoff: sess.preDemodCutoff,
  978. preDemodDecimPhase: sess.preDemodDecimPhase,
  979. }
  980. }
  981. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  982. sess.overlapIQ = s.overlapIQ
  983. sess.deemphL = s.deemphL
  984. sess.deemphR = s.deemphR
  985. sess.pilotPhase = s.pilotPhase
  986. sess.pilotFreq = s.pilotFreq
  987. sess.pilotAlpha = s.pilotAlpha
  988. sess.pilotBeta = s.pilotBeta
  989. sess.pilotErrAvg = s.pilotErrAvg
  990. sess.pilotI = s.pilotI
  991. sess.pilotQ = s.pilotQ
  992. sess.pilotLPAlpha = s.pilotLPAlpha
  993. sess.monoResampler = s.monoResampler
  994. sess.monoResamplerRate = s.monoResamplerRate
  995. sess.stereoResampler = s.stereoResampler
  996. sess.stereoResamplerRate = s.stereoResamplerRate
  997. sess.stereoLPF = s.stereoLPF
  998. sess.stereoFilterRate = s.stereoFilterRate
  999. sess.stereoBPHi = s.stereoBPHi
  1000. sess.stereoBPLo = s.stereoBPLo
  1001. sess.stereoLRLPF = s.stereoLRLPF
  1002. sess.stereoAALPF = s.stereoAALPF
  1003. sess.pilotLPFHi = s.pilotLPFHi
  1004. sess.pilotLPFLo = s.pilotLPFLo
  1005. sess.preDemodFIR = s.preDemodFIR
  1006. sess.preDemodDecim = s.preDemodDecim
  1007. sess.preDemodRate = s.preDemodRate
  1008. sess.preDemodCutoff = s.preDemodCutoff
  1009. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1010. }
  1011. // ---------------------------------------------------------------------------
  1012. // Session management helpers
  1013. // ---------------------------------------------------------------------------
  1014. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1015. outputDir := st.policy.OutputDir
  1016. if outputDir == "" {
  1017. outputDir = "data/recordings"
  1018. }
  1019. demodName, channels := resolveDemod(sig)
  1020. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1021. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1022. dir := filepath.Join(outputDir, dirName)
  1023. if err := os.MkdirAll(dir, 0o755); err != nil {
  1024. return nil, err
  1025. }
  1026. wavPath := filepath.Join(dir, "audio.wav")
  1027. f, err := os.Create(wavPath)
  1028. if err != nil {
  1029. return nil, err
  1030. }
  1031. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1032. f.Close()
  1033. return nil, err
  1034. }
  1035. playbackMode, stereoState := initialPlaybackState(demodName)
  1036. sess := &streamSession{
  1037. signalID: sig.ID,
  1038. centerHz: sig.CenterHz,
  1039. bwHz: sig.BWHz,
  1040. snrDb: sig.SNRDb,
  1041. peakDb: sig.PeakDb,
  1042. class: sig.Class,
  1043. startTime: now,
  1044. lastFeed: now,
  1045. dir: dir,
  1046. wavFile: f,
  1047. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1048. sampleRate: streamAudioRate,
  1049. channels: channels,
  1050. demodName: demodName,
  1051. playbackMode: playbackMode,
  1052. stereoState: stereoState,
  1053. deemphasisUs: st.policy.DeemphasisUs,
  1054. }
  1055. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1056. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1057. return sess, nil
  1058. }
  1059. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1060. demodName, channels := resolveDemod(sig)
  1061. for _, pl := range st.pendingListens {
  1062. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1063. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1064. demodName = requested
  1065. if demodName == "WFM_STEREO" {
  1066. channels = 2
  1067. } else if d := demod.Get(demodName); d != nil {
  1068. channels = d.Channels()
  1069. } else {
  1070. channels = 1
  1071. }
  1072. break
  1073. }
  1074. }
  1075. }
  1076. playbackMode, stereoState := initialPlaybackState(demodName)
  1077. sess := &streamSession{
  1078. signalID: sig.ID,
  1079. centerHz: sig.CenterHz,
  1080. bwHz: sig.BWHz,
  1081. snrDb: sig.SNRDb,
  1082. peakDb: sig.PeakDb,
  1083. class: sig.Class,
  1084. startTime: now,
  1085. lastFeed: now,
  1086. listenOnly: true,
  1087. sampleRate: streamAudioRate,
  1088. channels: channels,
  1089. demodName: demodName,
  1090. playbackMode: playbackMode,
  1091. stereoState: stereoState,
  1092. deemphasisUs: st.policy.DeemphasisUs,
  1093. }
  1094. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1095. sig.ID, sig.CenterHz/1e6, demodName)
  1096. return sess
  1097. }
  1098. func resolveDemod(sig *detector.Signal) (string, int) {
  1099. demodName := "NFM"
  1100. if sig.Class != nil {
  1101. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1102. demodName = n
  1103. }
  1104. }
  1105. channels := 1
  1106. if demodName == "WFM_STEREO" {
  1107. channels = 2
  1108. } else if d := demod.Get(demodName); d != nil {
  1109. channels = d.Channels()
  1110. }
  1111. return demodName, channels
  1112. }
  1113. func initialPlaybackState(demodName string) (string, string) {
  1114. playbackMode := demodName
  1115. stereoState := "mono"
  1116. if demodName == "WFM_STEREO" {
  1117. stereoState = "searching"
  1118. }
  1119. return playbackMode, stereoState
  1120. }
  1121. func (sess *streamSession) audioInfo() AudioInfo {
  1122. return AudioInfo{
  1123. SampleRate: sess.sampleRate,
  1124. Channels: sess.channels,
  1125. Format: "s16le",
  1126. DemodName: sess.demodName,
  1127. PlaybackMode: sess.playbackMode,
  1128. StereoState: sess.stereoState,
  1129. }
  1130. }
  1131. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1132. infoJSON, _ := json.Marshal(info)
  1133. tagged := make([]byte, 1+len(infoJSON))
  1134. tagged[0] = 0x00 // tag: audio_info
  1135. copy(tagged[1:], infoJSON)
  1136. for _, sub := range subs {
  1137. select {
  1138. case sub.ch <- tagged:
  1139. default:
  1140. }
  1141. }
  1142. }
  1143. func defaultAudioInfoForMode(mode string) AudioInfo {
  1144. demodName := "NFM"
  1145. if requested := normalizeRequestedMode(mode); requested != "" {
  1146. demodName = requested
  1147. }
  1148. channels := 1
  1149. if demodName == "WFM_STEREO" {
  1150. channels = 2
  1151. } else if d := demod.Get(demodName); d != nil {
  1152. channels = d.Channels()
  1153. }
  1154. playbackMode, stereoState := initialPlaybackState(demodName)
  1155. return AudioInfo{
  1156. SampleRate: streamAudioRate,
  1157. Channels: channels,
  1158. Format: "s16le",
  1159. DemodName: demodName,
  1160. PlaybackMode: playbackMode,
  1161. StereoState: stereoState,
  1162. }
  1163. }
  1164. func normalizeRequestedMode(mode string) string {
  1165. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1166. case "", "AUTO":
  1167. return ""
  1168. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1169. return strings.ToUpper(strings.TrimSpace(mode))
  1170. default:
  1171. return ""
  1172. }
  1173. }
  1174. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1175. func (sess *streamSession) growIQ(n int) []complex64 {
  1176. if cap(sess.scratchIQ) >= n {
  1177. return sess.scratchIQ[:n]
  1178. }
  1179. sess.scratchIQ = make([]complex64, n, n*5/4)
  1180. return sess.scratchIQ
  1181. }
  1182. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1183. func (sess *streamSession) growAudio(n int) []float32 {
  1184. if cap(sess.scratchAudio) >= n {
  1185. return sess.scratchAudio[:n]
  1186. }
  1187. sess.scratchAudio = make([]float32, n, n*5/4)
  1188. return sess.scratchAudio
  1189. }
  1190. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1191. func (sess *streamSession) growPCM(n int) []byte {
  1192. if cap(sess.scratchPCM) >= n {
  1193. return sess.scratchPCM[:n]
  1194. }
  1195. sess.scratchPCM = make([]byte, n, n*5/4)
  1196. return sess.scratchPCM
  1197. }
  1198. func convertToListenOnly(sess *streamSession) {
  1199. if sess.wavBuf != nil {
  1200. _ = sess.wavBuf.Flush()
  1201. }
  1202. if sess.wavFile != nil {
  1203. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1204. sess.wavFile.Close()
  1205. }
  1206. sess.wavFile = nil
  1207. sess.wavBuf = nil
  1208. sess.listenOnly = true
  1209. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1210. }
  1211. func closeSession(sess *streamSession, policy *Policy) {
  1212. if sess.listenOnly {
  1213. return
  1214. }
  1215. if sess.wavBuf != nil {
  1216. _ = sess.wavBuf.Flush()
  1217. }
  1218. if sess.wavFile != nil {
  1219. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1220. sess.wavFile.Close()
  1221. sess.wavFile = nil
  1222. sess.wavBuf = nil
  1223. }
  1224. dur := sess.lastFeed.Sub(sess.startTime)
  1225. files := map[string]any{
  1226. "audio": "audio.wav",
  1227. "audio_sample_rate": sess.sampleRate,
  1228. "audio_channels": sess.channels,
  1229. "audio_demod": sess.demodName,
  1230. "recording_mode": "streaming",
  1231. }
  1232. meta := Meta{
  1233. EventID: sess.signalID,
  1234. Start: sess.startTime,
  1235. End: sess.lastFeed,
  1236. CenterHz: sess.centerHz,
  1237. BandwidthHz: sess.bwHz,
  1238. SampleRate: sess.sampleRate,
  1239. SNRDb: sess.snrDb,
  1240. PeakDb: sess.peakDb,
  1241. Class: sess.class,
  1242. DurationMs: dur.Milliseconds(),
  1243. Files: files,
  1244. }
  1245. b, err := json.MarshalIndent(meta, "", " ")
  1246. if err == nil {
  1247. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1248. }
  1249. if policy != nil {
  1250. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1251. }
  1252. }
  1253. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1254. if len(sess.audioSubs) == 0 {
  1255. return
  1256. }
  1257. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1258. tagged := make([]byte, 1+pcmLen)
  1259. tagged[0] = 0x01
  1260. copy(tagged[1:], pcm[:pcmLen])
  1261. alive := sess.audioSubs[:0]
  1262. for _, sub := range sess.audioSubs {
  1263. select {
  1264. case sub.ch <- tagged:
  1265. default:
  1266. st.droppedPCM++
  1267. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1268. }
  1269. alive = append(alive, sub)
  1270. }
  1271. sess.audioSubs = alive
  1272. }
  1273. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1274. if len(st.policy.ClassFilter) == 0 {
  1275. return true
  1276. }
  1277. if cls == nil {
  1278. return false
  1279. }
  1280. for _, f := range st.policy.ClassFilter {
  1281. if strings.EqualFold(f, string(cls.ModType)) {
  1282. return true
  1283. }
  1284. }
  1285. return false
  1286. }
  1287. // ErrNoSession is returned when no matching signal session exists.
  1288. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1289. // ---------------------------------------------------------------------------
  1290. // WAV header helpers
  1291. // ---------------------------------------------------------------------------
  1292. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1293. if channels <= 0 {
  1294. channels = 1
  1295. }
  1296. hdr := make([]byte, 44)
  1297. copy(hdr[0:4], "RIFF")
  1298. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1299. copy(hdr[8:12], "WAVE")
  1300. copy(hdr[12:16], "fmt ")
  1301. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1302. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1303. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1304. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1305. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1306. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1307. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1308. copy(hdr[36:40], "data")
  1309. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1310. _, err := f.Write(hdr)
  1311. return err
  1312. }
  1313. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1314. dataSize := uint32(totalSamples * 2)
  1315. var buf [4]byte
  1316. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1317. if _, err := f.Seek(4, 0); err != nil {
  1318. return
  1319. }
  1320. _, _ = f.Write(buf[:])
  1321. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1322. if _, err := f.Seek(24, 0); err != nil {
  1323. return
  1324. }
  1325. _, _ = f.Write(buf[:])
  1326. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1327. if _, err := f.Seek(28, 0); err != nil {
  1328. return
  1329. }
  1330. _, _ = f.Write(buf[:])
  1331. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1332. if _, err := f.Seek(40, 0); err != nil {
  1333. return
  1334. }
  1335. _, _ = f.Write(buf[:])
  1336. }