Wideband autonomous SDR analysis engine forked from sdr-visual-suite
Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

1489 Zeilen
44KB

  1. package recorder
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "log"
  9. "math"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "time"
  15. "sdr-wideband-suite/internal/classifier"
  16. "sdr-wideband-suite/internal/demod"
  17. "sdr-wideband-suite/internal/detector"
  18. "sdr-wideband-suite/internal/dsp"
  19. "sdr-wideband-suite/internal/logging"
  20. )
  21. // ---------------------------------------------------------------------------
  22. // streamSession — one open demod session for one signal
  23. // ---------------------------------------------------------------------------
  24. type streamSession struct {
  25. signalID int64
  26. centerHz float64
  27. bwHz float64
  28. snrDb float64
  29. peakDb float64
  30. class *classifier.Classification
  31. startTime time.Time
  32. lastFeed time.Time
  33. playbackMode string
  34. stereoState string
  35. lastAudioTs time.Time
  36. lastAudioL float32
  37. lastAudioR float32
  38. prevAudioL float64 // second-to-last L sample for boundary transient detection
  39. lastAudioSet bool
  40. // listenOnly sessions have no WAV file and no disk I/O.
  41. // They exist solely to feed audio to live-listen subscribers.
  42. listenOnly bool
  43. // Recording state (nil/zero for listen-only sessions)
  44. dir string
  45. wavFile *os.File
  46. wavBuf *bufio.Writer
  47. wavSamples int64
  48. segmentIdx int
  49. sampleRate int // actual output audio sample rate (always streamAudioRate)
  50. channels int
  51. demodName string
  52. // --- Persistent DSP state for click-free streaming ---
  53. // Overlap-save: tail of previous extracted IQ snippet.
  54. // Currently unused for live demod after removing the extra discriminator
  55. // overlap prepend, but kept in DSP snapshot state for compatibility.
  56. overlapIQ []complex64
  57. // De-emphasis IIR state (persists across frames)
  58. deemphL float64
  59. deemphR float64
  60. // Stereo lock state for live WFM streaming
  61. stereoEnabled bool
  62. stereoOnCount int
  63. stereoOffCount int
  64. // Pilot-locked stereo PLL state (19kHz pilot)
  65. pilotPhase float64
  66. pilotFreq float64
  67. pilotAlpha float64
  68. pilotBeta float64
  69. pilotErrAvg float64
  70. pilotI float64
  71. pilotQ float64
  72. pilotLPAlpha float64
  73. // Polyphase resampler (replaces integer-decimate hack)
  74. monoResampler *dsp.Resampler
  75. monoResamplerRate int
  76. stereoResampler *dsp.StereoResampler
  77. stereoResamplerRate int
  78. // AQ-4: Stateful FIR filters for click-free stereo decode
  79. stereoFilterRate int
  80. stereoLPF *dsp.StatefulFIRReal // 15kHz lowpass for L+R
  81. stereoBPHi *dsp.StatefulFIRReal // 53kHz LP for bandpass high
  82. stereoBPLo *dsp.StatefulFIRReal // 23kHz LP for bandpass low
  83. stereoLRLPF *dsp.StatefulFIRReal // 15kHz LP for demodulated L-R
  84. stereoAALPF *dsp.StatefulFIRReal // Anti-alias LP for pre-decim (mono path)
  85. pilotLPFHi *dsp.StatefulFIRReal // ~21kHz LP for pilot bandpass high
  86. pilotLPFLo *dsp.StatefulFIRReal // ~17kHz LP for pilot bandpass low
  87. // Stateful pre-demod anti-alias FIR (eliminates cold-start transients
  88. // and avoids per-frame FIR recomputation)
  89. preDemodFIR *dsp.StatefulFIRComplex
  90. preDemodDecim int // cached decimation factor
  91. preDemodRate int // cached snipRate this FIR was built for
  92. preDemodCutoff float64 // cached cutoff
  93. preDemodDecimPhase int // stateful decimation phase (index offset into next frame)
  94. // AQ-2: De-emphasis config (µs, 0 = disabled)
  95. deemphasisUs float64
  96. // Scratch buffers — reused across frames to avoid GC pressure.
  97. // Grown as needed, never shrunk.
  98. scratchIQ []complex64 // for pre-demod FIR output + decimate input
  99. scratchAudio []float32 // for stereo decode intermediates
  100. scratchPCM []byte // for PCM encoding
  101. // live-listen subscribers
  102. audioSubs []audioSub
  103. }
  104. type audioSub struct {
  105. id int64
  106. ch chan []byte
  107. }
  108. type RuntimeSignalInfo struct {
  109. DemodName string
  110. PlaybackMode string
  111. StereoState string
  112. Channels int
  113. SampleRate int
  114. }
  115. // AudioInfo describes the audio format of a live-listen subscription.
  116. // Sent to the WebSocket client as the first message.
  117. type AudioInfo struct {
  118. SampleRate int `json:"sample_rate"`
  119. Channels int `json:"channels"`
  120. Format string `json:"format"` // always "s16le"
  121. DemodName string `json:"demod"`
  122. PlaybackMode string `json:"playback_mode,omitempty"`
  123. StereoState string `json:"stereo_state,omitempty"`
  124. }
  125. const (
  126. streamAudioRate = 48000
  127. resamplerTaps = 32 // taps per polyphase arm — good quality
  128. )
  129. // ---------------------------------------------------------------------------
  130. // Streamer — manages all active streaming sessions
  131. // ---------------------------------------------------------------------------
  132. type streamFeedItem struct {
  133. signal detector.Signal
  134. snippet []complex64
  135. snipRate int
  136. }
  137. type streamFeedMsg struct {
  138. traceID uint64
  139. items []streamFeedItem
  140. }
  141. type Streamer struct {
  142. mu sync.Mutex
  143. sessions map[int64]*streamSession
  144. policy Policy
  145. centerHz float64
  146. nextSub int64
  147. feedCh chan streamFeedMsg
  148. done chan struct{}
  149. droppedFeed uint64
  150. droppedPCM uint64
  151. lastFeedTS time.Time
  152. lastProcTS time.Time
  153. // pendingListens are subscribers waiting for a matching session.
  154. pendingListens map[int64]*pendingListen
  155. }
  156. type pendingListen struct {
  157. freq float64
  158. bw float64
  159. mode string
  160. ch chan []byte
  161. }
  162. func newStreamer(policy Policy, centerHz float64) *Streamer {
  163. st := &Streamer{
  164. sessions: make(map[int64]*streamSession),
  165. policy: policy,
  166. centerHz: centerHz,
  167. feedCh: make(chan streamFeedMsg, 2),
  168. done: make(chan struct{}),
  169. pendingListens: make(map[int64]*pendingListen),
  170. }
  171. go st.worker()
  172. return st
  173. }
  174. func (st *Streamer) worker() {
  175. for msg := range st.feedCh {
  176. st.processFeed(msg)
  177. }
  178. close(st.done)
  179. }
  180. func (st *Streamer) updatePolicy(policy Policy, centerHz float64) {
  181. st.mu.Lock()
  182. defer st.mu.Unlock()
  183. wasEnabled := st.policy.Enabled
  184. st.policy = policy
  185. st.centerHz = centerHz
  186. // If recording was just disabled, close recording sessions
  187. // but keep listen-only sessions alive.
  188. if wasEnabled && !policy.Enabled {
  189. for id, sess := range st.sessions {
  190. if sess.listenOnly {
  191. continue
  192. }
  193. if len(sess.audioSubs) > 0 {
  194. // Convert to listen-only: close WAV but keep session
  195. convertToListenOnly(sess)
  196. } else {
  197. closeSession(sess, &st.policy)
  198. delete(st.sessions, id)
  199. }
  200. }
  201. }
  202. }
  203. // HasListeners returns true if any sessions have audio subscribers
  204. // or there are pending listen requests. Used by the DSP loop to
  205. // decide whether to feed snippets even when recording is disabled.
  206. func (st *Streamer) HasListeners() bool {
  207. st.mu.Lock()
  208. defer st.mu.Unlock()
  209. return st.hasListenersLocked()
  210. }
  211. func (st *Streamer) hasListenersLocked() bool {
  212. if len(st.pendingListens) > 0 {
  213. return true
  214. }
  215. for _, sess := range st.sessions {
  216. if len(sess.audioSubs) > 0 {
  217. return true
  218. }
  219. }
  220. return false
  221. }
  222. // FeedSnippets is called from the DSP loop with pre-extracted IQ snippets.
  223. // Feeds are accepted if:
  224. // - Recording is enabled (policy.Enabled && RecordAudio/RecordIQ), OR
  225. // - Any live-listen subscribers exist (listen-only mode)
  226. //
  227. // IMPORTANT: The caller (Manager.FeedSnippets) already copies the snippet
  228. // data, so items can be passed directly without another copy.
  229. func (st *Streamer) FeedSnippets(items []streamFeedItem, traceID uint64) {
  230. st.mu.Lock()
  231. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  232. hasListeners := st.hasListenersLocked()
  233. pending := len(st.pendingListens)
  234. debugLiveAudio := st.policy.DebugLiveAudio
  235. now := time.Now()
  236. if !st.lastFeedTS.IsZero() {
  237. gap := now.Sub(st.lastFeedTS)
  238. if gap > 150*time.Millisecond {
  239. logging.Warn("gap", "feed_gap", "gap_ms", gap.Milliseconds())
  240. }
  241. }
  242. st.lastFeedTS = now
  243. st.mu.Unlock()
  244. if debugLiveAudio {
  245. log.Printf("LIVEAUDIO STREAM: feedSnippets items=%d recEnabled=%v hasListeners=%v pending=%d", len(items), recEnabled, hasListeners, pending)
  246. }
  247. if (!recEnabled && !hasListeners) || len(items) == 0 {
  248. return
  249. }
  250. select {
  251. case st.feedCh <- streamFeedMsg{traceID: traceID, items: items}:
  252. default:
  253. st.droppedFeed++
  254. logging.Warn("drop", "feed_drop", "count", st.droppedFeed)
  255. }
  256. }
  257. // processFeed runs in the worker goroutine.
  258. func (st *Streamer) processFeed(msg streamFeedMsg) {
  259. st.mu.Lock()
  260. recEnabled := st.policy.Enabled && (st.policy.RecordAudio || st.policy.RecordIQ)
  261. hasListeners := st.hasListenersLocked()
  262. now := time.Now()
  263. if !st.lastProcTS.IsZero() {
  264. gap := now.Sub(st.lastProcTS)
  265. if gap > 150*time.Millisecond {
  266. logging.Warn("gap", "process_gap", "gap_ms", gap.Milliseconds(), "trace", msg.traceID)
  267. }
  268. }
  269. st.lastProcTS = now
  270. defer st.mu.Unlock()
  271. logging.Debug("trace", "process_feed", "trace", msg.traceID, "items", len(msg.items))
  272. if !recEnabled && !hasListeners {
  273. return
  274. }
  275. seen := make(map[int64]bool, len(msg.items))
  276. for i := range msg.items {
  277. item := &msg.items[i]
  278. sig := &item.signal
  279. seen[sig.ID] = true
  280. if sig.ID == 0 || sig.Class == nil {
  281. continue
  282. }
  283. if len(item.snippet) == 0 || item.snipRate <= 0 {
  284. continue
  285. }
  286. // Decide whether this signal needs a session
  287. needsRecording := recEnabled && sig.SNRDb >= st.policy.MinSNRDb && st.classAllowed(sig.Class)
  288. needsListen := st.signalHasListenerLocked(sig)
  289. className := "<nil>"
  290. demodName := ""
  291. if sig.Class != nil {
  292. className = string(sig.Class.ModType)
  293. demodName, _ = resolveDemod(sig)
  294. }
  295. if st.policy.DebugLiveAudio {
  296. log.Printf("LIVEAUDIO STREAM: signal id=%d center=%.3fMHz bw=%.0f snr=%.1f class=%s demod=%s needsRecord=%v needsListen=%v", sig.ID, sig.CenterHz/1e6, sig.BWHz, sig.SNRDb, className, demodName, needsRecording, needsListen)
  297. }
  298. if !needsRecording && !needsListen {
  299. continue
  300. }
  301. sess, exists := st.sessions[sig.ID]
  302. requestedMode := ""
  303. for _, pl := range st.pendingListens {
  304. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  305. if m := normalizeRequestedMode(pl.mode); m != "" {
  306. requestedMode = m
  307. break
  308. }
  309. }
  310. }
  311. if exists && sess.listenOnly && requestedMode != "" && sess.demodName != requestedMode {
  312. for _, sub := range sess.audioSubs {
  313. st.pendingListens[sub.id] = &pendingListen{freq: sig.CenterHz, bw: sig.BWHz, mode: requestedMode, ch: sub.ch}
  314. }
  315. delete(st.sessions, sig.ID)
  316. sess = nil
  317. exists = false
  318. }
  319. if !exists {
  320. if needsRecording {
  321. s, err := st.openRecordingSession(sig, now)
  322. if err != nil {
  323. log.Printf("STREAM: open failed signal=%d %.1fMHz: %v",
  324. sig.ID, sig.CenterHz/1e6, err)
  325. continue
  326. }
  327. st.sessions[sig.ID] = s
  328. sess = s
  329. } else {
  330. s := st.openListenSession(sig, now)
  331. st.sessions[sig.ID] = s
  332. sess = s
  333. }
  334. // Attach any pending listeners
  335. st.attachPendingListeners(sess)
  336. }
  337. // Update metadata
  338. sess.lastFeed = now
  339. sess.centerHz = sig.CenterHz
  340. sess.bwHz = sig.BWHz
  341. if sig.SNRDb > sess.snrDb {
  342. sess.snrDb = sig.SNRDb
  343. }
  344. if sig.PeakDb > sess.peakDb {
  345. sess.peakDb = sig.PeakDb
  346. }
  347. if sig.Class != nil {
  348. sess.class = sig.Class
  349. }
  350. // Demod with persistent state
  351. logging.Debug("trace", "demod_start", "trace", msg.traceID, "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  352. audio, audioRate := sess.processSnippet(item.snippet, item.snipRate)
  353. logging.Debug("trace", "demod_done", "trace", msg.traceID, "signal", sess.signalID, "audio_len", len(audio), "audio_rate", audioRate)
  354. if len(audio) == 0 {
  355. logging.Warn("gap", "audio_empty", "signal", sess.signalID, "snip_len", len(item.snippet), "snip_rate", item.snipRate)
  356. }
  357. if len(audio) > 0 {
  358. if sess.wavSamples == 0 && audioRate > 0 {
  359. sess.sampleRate = audioRate
  360. }
  361. // Encode PCM once into scratch buffer, reuse for both WAV and fanout
  362. pcmLen := len(audio) * 2
  363. pcm := sess.growPCM(pcmLen)
  364. for k, s := range audio {
  365. v := int16(clip(s * 32767))
  366. binary.LittleEndian.PutUint16(pcm[k*2:], uint16(v))
  367. }
  368. if !sess.listenOnly && sess.wavBuf != nil {
  369. n, err := sess.wavBuf.Write(pcm)
  370. if err != nil {
  371. log.Printf("STREAM: write error signal=%d: %v", sess.signalID, err)
  372. } else {
  373. sess.wavSamples += int64(n / 2)
  374. }
  375. }
  376. // Gap logging for live-audio sessions + transient click detector
  377. if len(sess.audioSubs) > 0 {
  378. if !sess.lastAudioTs.IsZero() {
  379. gap := time.Since(sess.lastAudioTs)
  380. if gap > 150*time.Millisecond {
  381. logging.Warn("gap", "audio_gap", "signal", sess.signalID, "gap_ms", gap.Milliseconds())
  382. }
  383. }
  384. // Transient click detector: finds short impulses (1-3 samples)
  385. // that deviate sharply from the local signal trend.
  386. // A click looks like: ...smooth... SPIKE ...smooth...
  387. // Normal FM audio has large deltas too, but they follow
  388. // a continuous curve. A click has high |d2/dt2| (acceleration).
  389. //
  390. // Method: second-derivative detector. For each sample triplet
  391. // (a, b, c), compute |2b - a - c| which is the discrete
  392. // second derivative magnitude. High values = transient spike.
  393. // Threshold: 0.15 (tuned to reject normal FM content <15kHz).
  394. if logging.EnabledCategory("boundary") && len(audio) > 0 {
  395. stride := sess.channels
  396. if stride < 1 {
  397. stride = 1
  398. }
  399. nFrames := len(audio) / stride
  400. // Boundary transient: use last 2 samples of prev frame + first sample of this frame
  401. if sess.lastAudioSet && nFrames >= 1 {
  402. // second derivative across boundary: |2*last - prevLast - first|
  403. first := float64(audio[0])
  404. d2 := math.Abs(2*float64(sess.lastAudioL) - sess.prevAudioL - first)
  405. if d2 > 0.15 {
  406. logging.Warn("boundary", "boundary_click", "signal", sess.signalID, "d2", d2)
  407. }
  408. }
  409. // Intra-frame transient scan (L channel only for performance)
  410. nClicks := 0
  411. maxD2 := float64(0)
  412. maxD2Pos := 0
  413. for k := 1; k < nFrames-1; k++ {
  414. a := float64(audio[(k-1)*stride])
  415. b := float64(audio[k*stride])
  416. c := float64(audio[(k+1)*stride])
  417. d2 := math.Abs(2*b - a - c)
  418. if d2 > maxD2 {
  419. maxD2 = d2
  420. maxD2Pos = k
  421. }
  422. if d2 > 0.15 {
  423. nClicks++
  424. }
  425. }
  426. if nClicks > 0 {
  427. logging.Warn("boundary", "intra_click", "signal", sess.signalID, "clicks", nClicks, "maxD2", maxD2, "pos", maxD2Pos, "len", nFrames)
  428. }
  429. // Store last two samples for next frame's boundary check
  430. if nFrames >= 2 {
  431. sess.prevAudioL = float64(audio[(nFrames-2)*stride])
  432. sess.lastAudioL = audio[(nFrames-1)*stride]
  433. if stride > 1 {
  434. sess.lastAudioR = audio[(nFrames-1)*stride+1]
  435. }
  436. } else if nFrames == 1 {
  437. sess.prevAudioL = float64(sess.lastAudioL)
  438. sess.lastAudioL = audio[0]
  439. if stride > 1 && len(audio) >= 2 {
  440. sess.lastAudioR = audio[1]
  441. }
  442. }
  443. sess.lastAudioSet = true
  444. }
  445. sess.lastAudioTs = time.Now()
  446. }
  447. st.fanoutPCM(sess, pcm, pcmLen)
  448. }
  449. // Segment split (recording sessions only)
  450. if !sess.listenOnly && st.policy.MaxDuration > 0 && now.Sub(sess.startTime) >= st.policy.MaxDuration {
  451. segIdx := sess.segmentIdx + 1
  452. oldSubs := sess.audioSubs
  453. oldState := sess.captureDSPState()
  454. sess.audioSubs = nil
  455. closeSession(sess, &st.policy)
  456. s, err := st.openRecordingSession(sig, now)
  457. if err != nil {
  458. delete(st.sessions, sig.ID)
  459. continue
  460. }
  461. s.segmentIdx = segIdx
  462. s.audioSubs = oldSubs
  463. s.restoreDSPState(oldState)
  464. st.sessions[sig.ID] = s
  465. }
  466. }
  467. // Close sessions for disappeared signals (with grace period)
  468. for id, sess := range st.sessions {
  469. if seen[id] {
  470. continue
  471. }
  472. gracePeriod := 3 * time.Second
  473. if sess.listenOnly {
  474. gracePeriod = 5 * time.Second
  475. }
  476. if now.Sub(sess.lastFeed) > gracePeriod {
  477. for _, sub := range sess.audioSubs {
  478. close(sub.ch)
  479. }
  480. sess.audioSubs = nil
  481. if !sess.listenOnly {
  482. closeSession(sess, &st.policy)
  483. }
  484. delete(st.sessions, id)
  485. }
  486. }
  487. }
  488. func (st *Streamer) signalHasListenerLocked(sig *detector.Signal) bool {
  489. if sess, ok := st.sessions[sig.ID]; ok && len(sess.audioSubs) > 0 {
  490. if st.policy.DebugLiveAudio {
  491. log.Printf("LIVEAUDIO MATCH: signal id=%d matched existing session listener center=%.3fMHz", sig.ID, sig.CenterHz/1e6)
  492. }
  493. return true
  494. }
  495. for subID, pl := range st.pendingListens {
  496. delta := math.Abs(sig.CenterHz - pl.freq)
  497. if delta < 200000 {
  498. if st.policy.DebugLiveAudio {
  499. log.Printf("LIVEAUDIO MATCH: signal id=%d matched pending subscriber=%d center=%.3fMHz req=%.3fMHz delta=%.0fHz", sig.ID, subID, sig.CenterHz/1e6, pl.freq/1e6, delta)
  500. }
  501. return true
  502. }
  503. }
  504. return false
  505. }
  506. func (st *Streamer) attachPendingListeners(sess *streamSession) {
  507. for subID, pl := range st.pendingListens {
  508. requestedMode := normalizeRequestedMode(pl.mode)
  509. if requestedMode != "" && sess.demodName != requestedMode {
  510. continue
  511. }
  512. if math.Abs(sess.centerHz-pl.freq) < 200000 {
  513. sess.audioSubs = append(sess.audioSubs, audioSub{id: subID, ch: pl.ch})
  514. delete(st.pendingListens, subID)
  515. // Send updated audio_info now that we know the real session params.
  516. // Prefix with 0x00 tag byte so ws/audio handler sends as TextMessage.
  517. infoJSON, _ := json.Marshal(sess.audioInfo())
  518. tagged := make([]byte, 1+len(infoJSON))
  519. tagged[0] = 0x00 // tag: audio_info
  520. copy(tagged[1:], infoJSON)
  521. select {
  522. case pl.ch <- tagged:
  523. default:
  524. }
  525. log.Printf("STREAM: attached pending listener %d to signal %d (%.1fMHz %s ch=%d)",
  526. subID, sess.signalID, sess.centerHz/1e6, sess.demodName, sess.channels)
  527. }
  528. }
  529. }
  530. // CloseAll finalises all sessions and stops the worker goroutine.
  531. func (st *Streamer) RuntimeInfoBySignalID() map[int64]RuntimeSignalInfo {
  532. st.mu.Lock()
  533. defer st.mu.Unlock()
  534. out := make(map[int64]RuntimeSignalInfo, len(st.sessions))
  535. for _, sess := range st.sessions {
  536. out[sess.signalID] = RuntimeSignalInfo{
  537. DemodName: sess.demodName,
  538. PlaybackMode: sess.playbackMode,
  539. StereoState: sess.stereoState,
  540. Channels: sess.channels,
  541. SampleRate: sess.sampleRate,
  542. }
  543. }
  544. return out
  545. }
  546. func (st *Streamer) CloseAll() {
  547. close(st.feedCh)
  548. <-st.done
  549. st.mu.Lock()
  550. defer st.mu.Unlock()
  551. for id, sess := range st.sessions {
  552. for _, sub := range sess.audioSubs {
  553. close(sub.ch)
  554. }
  555. sess.audioSubs = nil
  556. if !sess.listenOnly {
  557. closeSession(sess, &st.policy)
  558. }
  559. delete(st.sessions, id)
  560. }
  561. for _, pl := range st.pendingListens {
  562. close(pl.ch)
  563. }
  564. st.pendingListens = nil
  565. }
  566. // ActiveSessions returns the number of open streaming sessions.
  567. func (st *Streamer) ActiveSessions() int {
  568. st.mu.Lock()
  569. defer st.mu.Unlock()
  570. return len(st.sessions)
  571. }
  572. // SubscribeAudio registers a live-listen subscriber for a given frequency.
  573. //
  574. // LL-2: Returns AudioInfo with correct channels and sample rate.
  575. // LL-3: Returns error only on hard failures (nil streamer etc).
  576. //
  577. // If a matching session exists, attaches immediately. Otherwise, the
  578. // subscriber is held as "pending" and will be attached when a matching
  579. // signal appears in the next DSP frame.
  580. func (st *Streamer) SubscribeAudio(freq float64, bw float64, mode string) (int64, <-chan []byte, AudioInfo, error) {
  581. ch := make(chan []byte, 64)
  582. st.mu.Lock()
  583. defer st.mu.Unlock()
  584. st.nextSub++
  585. subID := st.nextSub
  586. requestedMode := normalizeRequestedMode(mode)
  587. // Try to find a matching session
  588. var bestSess *streamSession
  589. bestDist := math.MaxFloat64
  590. for _, sess := range st.sessions {
  591. if requestedMode != "" && sess.demodName != requestedMode {
  592. continue
  593. }
  594. d := math.Abs(sess.centerHz - freq)
  595. if d < bestDist {
  596. bestDist = d
  597. bestSess = sess
  598. }
  599. }
  600. if bestSess != nil && bestDist < 200000 {
  601. bestSess.audioSubs = append(bestSess.audioSubs, audioSub{id: subID, ch: ch})
  602. info := bestSess.audioInfo()
  603. log.Printf("STREAM: subscriber %d attached to signal %d (%.1fMHz %s)",
  604. subID, bestSess.signalID, bestSess.centerHz/1e6, bestSess.demodName)
  605. return subID, ch, info, nil
  606. }
  607. // No matching session yet — add as pending listener
  608. st.pendingListens[subID] = &pendingListen{
  609. freq: freq,
  610. bw: bw,
  611. mode: mode,
  612. ch: ch,
  613. }
  614. info := defaultAudioInfoForMode(mode)
  615. log.Printf("STREAM: subscriber %d pending (freq=%.1fMHz)", subID, freq/1e6)
  616. log.Printf("LIVEAUDIO MATCH: subscriber=%d pending req=%.3fMHz bw=%.0f mode=%s", subID, freq/1e6, bw, mode)
  617. return subID, ch, info, nil
  618. }
  619. // UnsubscribeAudio removes a live-listen subscriber.
  620. func (st *Streamer) UnsubscribeAudio(subID int64) {
  621. st.mu.Lock()
  622. defer st.mu.Unlock()
  623. if pl, ok := st.pendingListens[subID]; ok {
  624. close(pl.ch)
  625. delete(st.pendingListens, subID)
  626. return
  627. }
  628. for _, sess := range st.sessions {
  629. for i, sub := range sess.audioSubs {
  630. if sub.id == subID {
  631. close(sub.ch)
  632. sess.audioSubs = append(sess.audioSubs[:i], sess.audioSubs[i+1:]...)
  633. return
  634. }
  635. }
  636. }
  637. }
  638. // ---------------------------------------------------------------------------
  639. // Session: stateful extraction + demod
  640. // ---------------------------------------------------------------------------
  641. // processSnippet takes a pre-extracted IQ snippet and demodulates it with
  642. // persistent state. Uses stateful FIR + polyphase resampler for exact 48kHz
  643. // output with zero transient artifacts.
  644. func (sess *streamSession) processSnippet(snippet []complex64, snipRate int) ([]float32, int) {
  645. if len(snippet) == 0 || snipRate <= 0 {
  646. return nil, 0
  647. }
  648. isWFMStereo := sess.demodName == "WFM_STEREO"
  649. isWFM := sess.demodName == "WFM" || isWFMStereo
  650. demodName := sess.demodName
  651. if isWFMStereo {
  652. demodName = "WFM"
  653. }
  654. d := demod.Get(demodName)
  655. if d == nil {
  656. d = demod.Get("NFM")
  657. }
  658. if d == nil {
  659. return nil, 0
  660. }
  661. // The extra 1-sample discriminator overlap prepend was removed after it was
  662. // shown to shift the downstream decimation phase and create heavy click
  663. // artifacts in steady-state streaming/recording. The upstream extraction path
  664. // and the stateful FIR/decimation stages already provide continuity.
  665. fullSnip := snippet
  666. overlapApplied := false
  667. prevTailValid := false
  668. // --- Stateful anti-alias FIR + decimation to demod rate ---
  669. demodRate := d.OutputSampleRate()
  670. decim1 := int(math.Round(float64(snipRate) / float64(demodRate)))
  671. if decim1 < 1 {
  672. decim1 = 1
  673. }
  674. // WFM override: force decim1=2 (256kHz) instead of round(512k/192k)=3 (170kHz).
  675. // At decim1=3, Nyquist is 85kHz which clips FM broadcast ±75kHz deviation.
  676. // At decim1=2, Nyquist is 128kHz → full FM deviation + stereo pilot + guard band.
  677. // Bonus: 256000→48000 resampler ratio is L=3/M=16 (96 taps, 1kB) instead of
  678. // the pathological L=24000/M=85333 (768k taps, 6MB) from 170666→48000.
  679. if isWFM && decim1 > 2 && snipRate/2 >= 200000 {
  680. decim1 = 2
  681. }
  682. actualDemodRate := snipRate / decim1
  683. logging.Debug("demod", "rates", "snipRate", snipRate, "decim1", decim1, "actual", actualDemodRate)
  684. var dec []complex64
  685. if decim1 > 1 {
  686. // FIR cutoff: for WFM, use 90kHz (above ±75kHz FM deviation + guard).
  687. // For NFM/other: use standard Nyquist*0.8 cutoff.
  688. cutoff := float64(actualDemodRate) / 2.0 * 0.8
  689. if isWFM {
  690. cutoff = 90000
  691. }
  692. // Lazy-init or reinit stateful FIR if parameters changed
  693. if sess.preDemodFIR == nil || sess.preDemodRate != snipRate || sess.preDemodCutoff != cutoff {
  694. taps := dsp.LowpassFIR(cutoff, snipRate, 101)
  695. sess.preDemodFIR = dsp.NewStatefulFIRComplex(taps)
  696. sess.preDemodRate = snipRate
  697. sess.preDemodCutoff = cutoff
  698. sess.preDemodDecim = decim1
  699. sess.preDemodDecimPhase = 0
  700. }
  701. decimPhaseBefore := sess.preDemodDecimPhase
  702. filtered := sess.preDemodFIR.ProcessInto(fullSnip, sess.growIQ(len(fullSnip)))
  703. dec = dsp.DecimateStateful(filtered, decim1, &sess.preDemodDecimPhase)
  704. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(filtered), "dec_len", len(dec), "decim1", decim1, "phase_before", decimPhaseBefore, "phase_after", sess.preDemodDecimPhase)
  705. } else {
  706. logging.Debug("boundary", "snippet_path", "signal", sess.signalID, "overlap_applied", overlapApplied, "snip_len", len(snippet), "full_len", len(fullSnip), "filtered_len", len(fullSnip), "dec_len", len(fullSnip), "decim1", decim1, "phase_before", 0, "phase_after", 0)
  707. dec = fullSnip
  708. }
  709. // --- FM/AM/etc Demod ---
  710. audio := d.Demod(dec, actualDemodRate)
  711. if len(audio) == 0 {
  712. return nil, 0
  713. }
  714. logging.Debug("boundary", "audio_path", "signal", sess.signalID, "demod", demodName, "actual_rate", actualDemodRate, "audio_len", len(audio), "channels", d.Channels(), "overlap_applied", overlapApplied, "prev_tail_valid", prevTailValid)
  715. // --- Stateful stereo decode with conservative lock/hysteresis ---
  716. channels := 1
  717. if isWFMStereo {
  718. sess.playbackMode = "WFM_STEREO"
  719. channels = 2 // keep transport format stable for live WFM_STEREO sessions
  720. stereoAudio, locked := sess.stereoDecodeStateful(audio, actualDemodRate)
  721. if locked {
  722. sess.stereoOnCount++
  723. sess.stereoOffCount = 0
  724. if sess.stereoOnCount >= 4 {
  725. sess.stereoEnabled = true
  726. }
  727. } else {
  728. sess.stereoOnCount = 0
  729. sess.stereoOffCount++
  730. if sess.stereoOffCount >= 10 {
  731. sess.stereoEnabled = false
  732. }
  733. }
  734. prevPlayback := sess.playbackMode
  735. prevStereo := sess.stereoState
  736. if sess.stereoEnabled && len(stereoAudio) > 0 {
  737. sess.stereoState = "locked"
  738. audio = stereoAudio
  739. } else {
  740. sess.stereoState = "mono-fallback"
  741. dual := make([]float32, len(audio)*2)
  742. for i, s := range audio {
  743. dual[i*2] = s
  744. dual[i*2+1] = s
  745. }
  746. audio = dual
  747. }
  748. if (prevPlayback != sess.playbackMode || prevStereo != sess.stereoState) && len(sess.audioSubs) > 0 {
  749. sendAudioInfo(sess.audioSubs, sess.audioInfo())
  750. }
  751. }
  752. // --- Polyphase resample to exact 48kHz ---
  753. if actualDemodRate != streamAudioRate {
  754. if channels > 1 {
  755. if sess.stereoResampler == nil || sess.stereoResamplerRate != actualDemodRate {
  756. logging.Info("resample", "reset", "mode", "stereo", "rate", actualDemodRate)
  757. sess.stereoResampler = dsp.NewStereoResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  758. sess.stereoResamplerRate = actualDemodRate
  759. }
  760. audio = sess.stereoResampler.Process(audio)
  761. } else {
  762. if sess.monoResampler == nil || sess.monoResamplerRate != actualDemodRate {
  763. logging.Info("resample", "reset", "mode", "mono", "rate", actualDemodRate)
  764. sess.monoResampler = dsp.NewResampler(actualDemodRate, streamAudioRate, resamplerTaps)
  765. sess.monoResamplerRate = actualDemodRate
  766. }
  767. audio = sess.monoResampler.Process(audio)
  768. }
  769. }
  770. // --- De-emphasis (configurable: 50µs Europe, 75µs US/Japan, 0=disabled) ---
  771. if isWFM && sess.deemphasisUs > 0 && streamAudioRate > 0 {
  772. tau := sess.deemphasisUs * 1e-6
  773. alpha := math.Exp(-1.0 / (float64(streamAudioRate) * tau))
  774. if channels > 1 {
  775. nFrames := len(audio) / channels
  776. yL, yR := sess.deemphL, sess.deemphR
  777. for i := 0; i < nFrames; i++ {
  778. yL = alpha*yL + (1-alpha)*float64(audio[i*2])
  779. audio[i*2] = float32(yL)
  780. yR = alpha*yR + (1-alpha)*float64(audio[i*2+1])
  781. audio[i*2+1] = float32(yR)
  782. }
  783. sess.deemphL, sess.deemphR = yL, yR
  784. } else {
  785. y := sess.deemphL
  786. for i := range audio {
  787. y = alpha*y + (1-alpha)*float64(audio[i])
  788. audio[i] = float32(y)
  789. }
  790. sess.deemphL = y
  791. }
  792. }
  793. if isWFM {
  794. for i := range audio {
  795. audio[i] *= 0.35
  796. }
  797. }
  798. return audio, streamAudioRate
  799. }
  800. // pllCoefficients returns the proportional (alpha) and integral (beta) gains
  801. // for a Type-II PLL using the specified loop bandwidth and damping factor.
  802. // loopBW is in Hz, sampleRate in samples/sec.
  803. func pllCoefficients(loopBW, damping float64, sampleRate int) (float64, float64) {
  804. if sampleRate <= 0 || loopBW <= 0 {
  805. return 0, 0
  806. }
  807. bl := loopBW / float64(sampleRate)
  808. theta := bl / (damping + 0.25/damping)
  809. d := 1 + 2*damping*theta + theta*theta
  810. alpha := (4 * damping * theta) / d
  811. beta := (4 * theta * theta) / d
  812. return alpha, beta
  813. }
  814. // stereoDecodeStateful: pilot-locked 38kHz oscillator for L-R extraction.
  815. // Uses persistent FIR filter state across frames for click-free stereo.
  816. // Reuses session scratch buffers to minimize allocations.
  817. func (sess *streamSession) stereoDecodeStateful(mono []float32, sampleRate int) ([]float32, bool) {
  818. if len(mono) == 0 || sampleRate <= 0 {
  819. return nil, false
  820. }
  821. n := len(mono)
  822. // Rebuild rate-dependent stereo filters when sampleRate changes
  823. if sess.stereoLPF == nil || sess.stereoFilterRate != sampleRate {
  824. lp := dsp.LowpassFIR(15000, sampleRate, 101)
  825. sess.stereoLPF = dsp.NewStatefulFIRReal(lp)
  826. sess.stereoBPHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(53000, sampleRate, 101))
  827. sess.stereoBPLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(23000, sampleRate, 101))
  828. sess.stereoLRLPF = dsp.NewStatefulFIRReal(lp)
  829. // Narrow pilot bandpass via LPF(21k)-LPF(17k).
  830. sess.pilotLPFHi = dsp.NewStatefulFIRReal(dsp.LowpassFIR(21000, sampleRate, 101))
  831. sess.pilotLPFLo = dsp.NewStatefulFIRReal(dsp.LowpassFIR(17000, sampleRate, 101))
  832. sess.stereoFilterRate = sampleRate
  833. // Initialize PLL for 19kHz pilot tracking.
  834. sess.pilotPhase = 0
  835. sess.pilotFreq = 2 * math.Pi * 19000 / float64(sampleRate)
  836. sess.pilotAlpha, sess.pilotBeta = pllCoefficients(50, 0.707, sampleRate)
  837. sess.pilotErrAvg = 0
  838. sess.pilotI = 0
  839. sess.pilotQ = 0
  840. sess.pilotLPAlpha = 1 - math.Exp(-2*math.Pi*200/float64(sampleRate))
  841. }
  842. // Reuse scratch for intermediates: lpr, bpfLR, lr, work1, work2.
  843. scratch := sess.growAudio(n * 5)
  844. lpr := scratch[:n]
  845. bpfLR := scratch[n : 2*n]
  846. lr := scratch[2*n : 3*n]
  847. work1 := scratch[3*n : 4*n]
  848. work2 := scratch[4*n : 5*n]
  849. sess.stereoLPF.ProcessInto(mono, lpr)
  850. // 23-53kHz bandpass for L-R DSB-SC.
  851. sess.stereoBPHi.ProcessInto(mono, work1)
  852. sess.stereoBPLo.ProcessInto(mono, work2)
  853. for i := 0; i < n; i++ {
  854. bpfLR[i] = work1[i] - work2[i]
  855. }
  856. // 19kHz pilot bandpass for PLL.
  857. sess.pilotLPFHi.ProcessInto(mono, work1)
  858. sess.pilotLPFLo.ProcessInto(mono, work2)
  859. for i := 0; i < n; i++ {
  860. work1[i] = work1[i] - work2[i]
  861. }
  862. pilot := work1
  863. phase := sess.pilotPhase
  864. freq := sess.pilotFreq
  865. alpha := sess.pilotAlpha
  866. beta := sess.pilotBeta
  867. iState := sess.pilotI
  868. qState := sess.pilotQ
  869. lpAlpha := sess.pilotLPAlpha
  870. minFreq := 2 * math.Pi * 17000 / float64(sampleRate)
  871. maxFreq := 2 * math.Pi * 21000 / float64(sampleRate)
  872. var pilotPower float64
  873. var totalPower float64
  874. var errSum float64
  875. for i := 0; i < n; i++ {
  876. p := float64(pilot[i])
  877. sinP, cosP := math.Sincos(phase)
  878. iMix := p * cosP
  879. qMix := p * -sinP
  880. iState += lpAlpha * (iMix - iState)
  881. qState += lpAlpha * (qMix - qState)
  882. err := math.Atan2(qState, iState)
  883. freq += beta * err
  884. if freq < minFreq {
  885. freq = minFreq
  886. } else if freq > maxFreq {
  887. freq = maxFreq
  888. }
  889. phase += freq + alpha*err
  890. if phase > 2*math.Pi {
  891. phase -= 2 * math.Pi
  892. } else if phase < 0 {
  893. phase += 2 * math.Pi
  894. }
  895. totalPower += float64(mono[i]) * float64(mono[i])
  896. pilotPower += p * p
  897. errSum += math.Abs(err)
  898. lr[i] = bpfLR[i] * float32(2*math.Sin(2*phase))
  899. }
  900. sess.pilotPhase = phase
  901. sess.pilotFreq = freq
  902. sess.pilotI = iState
  903. sess.pilotQ = qState
  904. blockErr := errSum / float64(n)
  905. sess.pilotErrAvg = 0.9*sess.pilotErrAvg + 0.1*blockErr
  906. lr = sess.stereoLRLPF.ProcessInto(lr, lr)
  907. pilotRatio := 0.0
  908. if totalPower > 0 {
  909. pilotRatio = pilotPower / totalPower
  910. }
  911. freqHz := sess.pilotFreq * float64(sampleRate) / (2 * math.Pi)
  912. // Lock heuristics: pilot power fraction and PLL phase error stability.
  913. // Pilot power is a small but stable fraction of composite energy; require
  914. // a modest floor plus PLL settling to avoid flapping in noise.
  915. locked := pilotRatio > 0.003 && math.Abs(freqHz-19000) < 250 && sess.pilotErrAvg < 0.35
  916. out := make([]float32, n*2)
  917. for i := 0; i < n; i++ {
  918. out[i*2] = 0.5 * (lpr[i] + lr[i])
  919. out[i*2+1] = 0.5 * (lpr[i] - lr[i])
  920. }
  921. return out, locked
  922. }
  923. // dspStateSnapshot captures persistent DSP state for segment splits.
  924. type dspStateSnapshot struct {
  925. overlapIQ []complex64
  926. deemphL float64
  927. deemphR float64
  928. pilotPhase float64
  929. pilotFreq float64
  930. pilotAlpha float64
  931. pilotBeta float64
  932. pilotErrAvg float64
  933. pilotI float64
  934. pilotQ float64
  935. pilotLPAlpha float64
  936. monoResampler *dsp.Resampler
  937. monoResamplerRate int
  938. stereoResampler *dsp.StereoResampler
  939. stereoResamplerRate int
  940. stereoLPF *dsp.StatefulFIRReal
  941. stereoFilterRate int
  942. stereoBPHi *dsp.StatefulFIRReal
  943. stereoBPLo *dsp.StatefulFIRReal
  944. stereoLRLPF *dsp.StatefulFIRReal
  945. stereoAALPF *dsp.StatefulFIRReal
  946. pilotLPFHi *dsp.StatefulFIRReal
  947. pilotLPFLo *dsp.StatefulFIRReal
  948. preDemodFIR *dsp.StatefulFIRComplex
  949. preDemodDecim int
  950. preDemodRate int
  951. preDemodCutoff float64
  952. preDemodDecimPhase int
  953. }
  954. func (sess *streamSession) captureDSPState() dspStateSnapshot {
  955. return dspStateSnapshot{
  956. overlapIQ: sess.overlapIQ,
  957. deemphL: sess.deemphL,
  958. deemphR: sess.deemphR,
  959. pilotPhase: sess.pilotPhase,
  960. pilotFreq: sess.pilotFreq,
  961. pilotAlpha: sess.pilotAlpha,
  962. pilotBeta: sess.pilotBeta,
  963. pilotErrAvg: sess.pilotErrAvg,
  964. pilotI: sess.pilotI,
  965. pilotQ: sess.pilotQ,
  966. pilotLPAlpha: sess.pilotLPAlpha,
  967. monoResampler: sess.monoResampler,
  968. monoResamplerRate: sess.monoResamplerRate,
  969. stereoResampler: sess.stereoResampler,
  970. stereoResamplerRate: sess.stereoResamplerRate,
  971. stereoLPF: sess.stereoLPF,
  972. stereoFilterRate: sess.stereoFilterRate,
  973. stereoBPHi: sess.stereoBPHi,
  974. stereoBPLo: sess.stereoBPLo,
  975. stereoLRLPF: sess.stereoLRLPF,
  976. stereoAALPF: sess.stereoAALPF,
  977. pilotLPFHi: sess.pilotLPFHi,
  978. pilotLPFLo: sess.pilotLPFLo,
  979. preDemodFIR: sess.preDemodFIR,
  980. preDemodDecim: sess.preDemodDecim,
  981. preDemodRate: sess.preDemodRate,
  982. preDemodCutoff: sess.preDemodCutoff,
  983. preDemodDecimPhase: sess.preDemodDecimPhase,
  984. }
  985. }
  986. func (sess *streamSession) restoreDSPState(s dspStateSnapshot) {
  987. sess.overlapIQ = s.overlapIQ
  988. sess.deemphL = s.deemphL
  989. sess.deemphR = s.deemphR
  990. sess.pilotPhase = s.pilotPhase
  991. sess.pilotFreq = s.pilotFreq
  992. sess.pilotAlpha = s.pilotAlpha
  993. sess.pilotBeta = s.pilotBeta
  994. sess.pilotErrAvg = s.pilotErrAvg
  995. sess.pilotI = s.pilotI
  996. sess.pilotQ = s.pilotQ
  997. sess.pilotLPAlpha = s.pilotLPAlpha
  998. sess.monoResampler = s.monoResampler
  999. sess.monoResamplerRate = s.monoResamplerRate
  1000. sess.stereoResampler = s.stereoResampler
  1001. sess.stereoResamplerRate = s.stereoResamplerRate
  1002. sess.stereoLPF = s.stereoLPF
  1003. sess.stereoFilterRate = s.stereoFilterRate
  1004. sess.stereoBPHi = s.stereoBPHi
  1005. sess.stereoBPLo = s.stereoBPLo
  1006. sess.stereoLRLPF = s.stereoLRLPF
  1007. sess.stereoAALPF = s.stereoAALPF
  1008. sess.pilotLPFHi = s.pilotLPFHi
  1009. sess.pilotLPFLo = s.pilotLPFLo
  1010. sess.preDemodFIR = s.preDemodFIR
  1011. sess.preDemodDecim = s.preDemodDecim
  1012. sess.preDemodRate = s.preDemodRate
  1013. sess.preDemodCutoff = s.preDemodCutoff
  1014. sess.preDemodDecimPhase = s.preDemodDecimPhase
  1015. }
  1016. // ---------------------------------------------------------------------------
  1017. // Session management helpers
  1018. // ---------------------------------------------------------------------------
  1019. func (st *Streamer) openRecordingSession(sig *detector.Signal, now time.Time) (*streamSession, error) {
  1020. outputDir := st.policy.OutputDir
  1021. if outputDir == "" {
  1022. outputDir = "data/recordings"
  1023. }
  1024. demodName, channels := resolveDemod(sig)
  1025. dirName := fmt.Sprintf("%s_%.0fHz_stream%d",
  1026. now.Format("2006-01-02T15-04-05"), sig.CenterHz, sig.ID)
  1027. dir := filepath.Join(outputDir, dirName)
  1028. if err := os.MkdirAll(dir, 0o755); err != nil {
  1029. return nil, err
  1030. }
  1031. wavPath := filepath.Join(dir, "audio.wav")
  1032. f, err := os.Create(wavPath)
  1033. if err != nil {
  1034. return nil, err
  1035. }
  1036. if err := writeStreamWAVHeader(f, streamAudioRate, channels); err != nil {
  1037. f.Close()
  1038. return nil, err
  1039. }
  1040. playbackMode, stereoState := initialPlaybackState(demodName)
  1041. sess := &streamSession{
  1042. signalID: sig.ID,
  1043. centerHz: sig.CenterHz,
  1044. bwHz: sig.BWHz,
  1045. snrDb: sig.SNRDb,
  1046. peakDb: sig.PeakDb,
  1047. class: sig.Class,
  1048. startTime: now,
  1049. lastFeed: now,
  1050. dir: dir,
  1051. wavFile: f,
  1052. wavBuf: bufio.NewWriterSize(f, 64*1024),
  1053. sampleRate: streamAudioRate,
  1054. channels: channels,
  1055. demodName: demodName,
  1056. playbackMode: playbackMode,
  1057. stereoState: stereoState,
  1058. deemphasisUs: st.policy.DeemphasisUs,
  1059. }
  1060. log.Printf("STREAM: opened recording signal=%d %.1fMHz %s dir=%s",
  1061. sig.ID, sig.CenterHz/1e6, demodName, dirName)
  1062. return sess, nil
  1063. }
  1064. func (st *Streamer) openListenSession(sig *detector.Signal, now time.Time) *streamSession {
  1065. demodName, channels := resolveDemod(sig)
  1066. for _, pl := range st.pendingListens {
  1067. if math.Abs(sig.CenterHz-pl.freq) < 200000 {
  1068. if requested := normalizeRequestedMode(pl.mode); requested != "" {
  1069. demodName = requested
  1070. if demodName == "WFM_STEREO" {
  1071. channels = 2
  1072. } else if d := demod.Get(demodName); d != nil {
  1073. channels = d.Channels()
  1074. } else {
  1075. channels = 1
  1076. }
  1077. break
  1078. }
  1079. }
  1080. }
  1081. playbackMode, stereoState := initialPlaybackState(demodName)
  1082. sess := &streamSession{
  1083. signalID: sig.ID,
  1084. centerHz: sig.CenterHz,
  1085. bwHz: sig.BWHz,
  1086. snrDb: sig.SNRDb,
  1087. peakDb: sig.PeakDb,
  1088. class: sig.Class,
  1089. startTime: now,
  1090. lastFeed: now,
  1091. listenOnly: true,
  1092. sampleRate: streamAudioRate,
  1093. channels: channels,
  1094. demodName: demodName,
  1095. playbackMode: playbackMode,
  1096. stereoState: stereoState,
  1097. deemphasisUs: st.policy.DeemphasisUs,
  1098. }
  1099. log.Printf("STREAM: opened listen-only signal=%d %.1fMHz %s",
  1100. sig.ID, sig.CenterHz/1e6, demodName)
  1101. return sess
  1102. }
  1103. func resolveDemod(sig *detector.Signal) (string, int) {
  1104. demodName := "NFM"
  1105. if sig.Class != nil {
  1106. if n := mapClassToDemod(sig.Class.ModType); n != "" {
  1107. demodName = n
  1108. }
  1109. }
  1110. channels := 1
  1111. if demodName == "WFM_STEREO" {
  1112. channels = 2
  1113. } else if d := demod.Get(demodName); d != nil {
  1114. channels = d.Channels()
  1115. }
  1116. return demodName, channels
  1117. }
  1118. func initialPlaybackState(demodName string) (string, string) {
  1119. playbackMode := demodName
  1120. stereoState := "mono"
  1121. if demodName == "WFM_STEREO" {
  1122. stereoState = "searching"
  1123. }
  1124. return playbackMode, stereoState
  1125. }
  1126. func (sess *streamSession) audioInfo() AudioInfo {
  1127. return AudioInfo{
  1128. SampleRate: sess.sampleRate,
  1129. Channels: sess.channels,
  1130. Format: "s16le",
  1131. DemodName: sess.demodName,
  1132. PlaybackMode: sess.playbackMode,
  1133. StereoState: sess.stereoState,
  1134. }
  1135. }
  1136. func sendAudioInfo(subs []audioSub, info AudioInfo) {
  1137. infoJSON, _ := json.Marshal(info)
  1138. tagged := make([]byte, 1+len(infoJSON))
  1139. tagged[0] = 0x00 // tag: audio_info
  1140. copy(tagged[1:], infoJSON)
  1141. for _, sub := range subs {
  1142. select {
  1143. case sub.ch <- tagged:
  1144. default:
  1145. }
  1146. }
  1147. }
  1148. func defaultAudioInfoForMode(mode string) AudioInfo {
  1149. demodName := "NFM"
  1150. if requested := normalizeRequestedMode(mode); requested != "" {
  1151. demodName = requested
  1152. }
  1153. channels := 1
  1154. if demodName == "WFM_STEREO" {
  1155. channels = 2
  1156. } else if d := demod.Get(demodName); d != nil {
  1157. channels = d.Channels()
  1158. }
  1159. playbackMode, stereoState := initialPlaybackState(demodName)
  1160. return AudioInfo{
  1161. SampleRate: streamAudioRate,
  1162. Channels: channels,
  1163. Format: "s16le",
  1164. DemodName: demodName,
  1165. PlaybackMode: playbackMode,
  1166. StereoState: stereoState,
  1167. }
  1168. }
  1169. func normalizeRequestedMode(mode string) string {
  1170. switch strings.ToUpper(strings.TrimSpace(mode)) {
  1171. case "", "AUTO":
  1172. return ""
  1173. case "WFM", "WFM_STEREO", "NFM", "AM", "USB", "LSB", "CW":
  1174. return strings.ToUpper(strings.TrimSpace(mode))
  1175. default:
  1176. return ""
  1177. }
  1178. }
  1179. // growIQ returns a complex64 slice of at least n elements, reusing sess.scratchIQ.
  1180. func (sess *streamSession) growIQ(n int) []complex64 {
  1181. if cap(sess.scratchIQ) >= n {
  1182. return sess.scratchIQ[:n]
  1183. }
  1184. sess.scratchIQ = make([]complex64, n, n*5/4)
  1185. return sess.scratchIQ
  1186. }
  1187. // growAudio returns a float32 slice of at least n elements, reusing sess.scratchAudio.
  1188. func (sess *streamSession) growAudio(n int) []float32 {
  1189. if cap(sess.scratchAudio) >= n {
  1190. return sess.scratchAudio[:n]
  1191. }
  1192. sess.scratchAudio = make([]float32, n, n*5/4)
  1193. return sess.scratchAudio
  1194. }
  1195. // growPCM returns a byte slice of at least n bytes, reusing sess.scratchPCM.
  1196. func (sess *streamSession) growPCM(n int) []byte {
  1197. if cap(sess.scratchPCM) >= n {
  1198. return sess.scratchPCM[:n]
  1199. }
  1200. sess.scratchPCM = make([]byte, n, n*5/4)
  1201. return sess.scratchPCM
  1202. }
  1203. func convertToListenOnly(sess *streamSession) {
  1204. if sess.wavBuf != nil {
  1205. _ = sess.wavBuf.Flush()
  1206. }
  1207. if sess.wavFile != nil {
  1208. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1209. sess.wavFile.Close()
  1210. }
  1211. sess.wavFile = nil
  1212. sess.wavBuf = nil
  1213. sess.listenOnly = true
  1214. log.Printf("STREAM: converted signal=%d to listen-only", sess.signalID)
  1215. }
  1216. func closeSession(sess *streamSession, policy *Policy) {
  1217. if sess.listenOnly {
  1218. return
  1219. }
  1220. if sess.wavBuf != nil {
  1221. _ = sess.wavBuf.Flush()
  1222. }
  1223. if sess.wavFile != nil {
  1224. fixStreamWAVHeader(sess.wavFile, sess.wavSamples, sess.sampleRate, sess.channels)
  1225. sess.wavFile.Close()
  1226. sess.wavFile = nil
  1227. sess.wavBuf = nil
  1228. }
  1229. dur := sess.lastFeed.Sub(sess.startTime)
  1230. files := map[string]any{
  1231. "audio": "audio.wav",
  1232. "audio_sample_rate": sess.sampleRate,
  1233. "audio_channels": sess.channels,
  1234. "audio_demod": sess.demodName,
  1235. "recording_mode": "streaming",
  1236. }
  1237. meta := Meta{
  1238. EventID: sess.signalID,
  1239. Start: sess.startTime,
  1240. End: sess.lastFeed,
  1241. CenterHz: sess.centerHz,
  1242. BandwidthHz: sess.bwHz,
  1243. SampleRate: sess.sampleRate,
  1244. SNRDb: sess.snrDb,
  1245. PeakDb: sess.peakDb,
  1246. Class: sess.class,
  1247. DurationMs: dur.Milliseconds(),
  1248. Files: files,
  1249. }
  1250. b, err := json.MarshalIndent(meta, "", " ")
  1251. if err == nil {
  1252. _ = os.WriteFile(filepath.Join(sess.dir, "meta.json"), b, 0o644)
  1253. }
  1254. if policy != nil {
  1255. enforceQuota(policy.OutputDir, policy.MaxDiskMB)
  1256. }
  1257. }
  1258. func (st *Streamer) fanoutPCM(sess *streamSession, pcm []byte, pcmLen int) {
  1259. if len(sess.audioSubs) == 0 {
  1260. return
  1261. }
  1262. // Tag + copy for all subscribers: 0x01 prefix = PCM audio
  1263. tagged := make([]byte, 1+pcmLen)
  1264. tagged[0] = 0x01
  1265. copy(tagged[1:], pcm[:pcmLen])
  1266. alive := sess.audioSubs[:0]
  1267. for _, sub := range sess.audioSubs {
  1268. select {
  1269. case sub.ch <- tagged:
  1270. default:
  1271. st.droppedPCM++
  1272. logging.Warn("drop", "pcm_drop", "count", st.droppedPCM)
  1273. }
  1274. alive = append(alive, sub)
  1275. }
  1276. sess.audioSubs = alive
  1277. }
  1278. func (st *Streamer) classAllowed(cls *classifier.Classification) bool {
  1279. if len(st.policy.ClassFilter) == 0 {
  1280. return true
  1281. }
  1282. if cls == nil {
  1283. return false
  1284. }
  1285. for _, f := range st.policy.ClassFilter {
  1286. if strings.EqualFold(f, string(cls.ModType)) {
  1287. return true
  1288. }
  1289. }
  1290. return false
  1291. }
  1292. // ErrNoSession is returned when no matching signal session exists.
  1293. var ErrNoSession = errors.New("no active or pending session for this frequency")
  1294. // ---------------------------------------------------------------------------
  1295. // WAV header helpers
  1296. // ---------------------------------------------------------------------------
  1297. func writeStreamWAVHeader(f *os.File, sampleRate int, channels int) error {
  1298. if channels <= 0 {
  1299. channels = 1
  1300. }
  1301. hdr := make([]byte, 44)
  1302. copy(hdr[0:4], "RIFF")
  1303. binary.LittleEndian.PutUint32(hdr[4:8], 36)
  1304. copy(hdr[8:12], "WAVE")
  1305. copy(hdr[12:16], "fmt ")
  1306. binary.LittleEndian.PutUint32(hdr[16:20], 16)
  1307. binary.LittleEndian.PutUint16(hdr[20:22], 1)
  1308. binary.LittleEndian.PutUint16(hdr[22:24], uint16(channels))
  1309. binary.LittleEndian.PutUint32(hdr[24:28], uint32(sampleRate))
  1310. binary.LittleEndian.PutUint32(hdr[28:32], uint32(sampleRate*channels*2))
  1311. binary.LittleEndian.PutUint16(hdr[32:34], uint16(channels*2))
  1312. binary.LittleEndian.PutUint16(hdr[34:36], 16)
  1313. copy(hdr[36:40], "data")
  1314. binary.LittleEndian.PutUint32(hdr[40:44], 0)
  1315. _, err := f.Write(hdr)
  1316. return err
  1317. }
  1318. func fixStreamWAVHeader(f *os.File, totalSamples int64, sampleRate int, channels int) {
  1319. dataSize := uint32(totalSamples * 2)
  1320. var buf [4]byte
  1321. binary.LittleEndian.PutUint32(buf[:], 36+dataSize)
  1322. if _, err := f.Seek(4, 0); err != nil {
  1323. return
  1324. }
  1325. _, _ = f.Write(buf[:])
  1326. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate))
  1327. if _, err := f.Seek(24, 0); err != nil {
  1328. return
  1329. }
  1330. _, _ = f.Write(buf[:])
  1331. binary.LittleEndian.PutUint32(buf[:], uint32(sampleRate*channels*2))
  1332. if _, err := f.Seek(28, 0); err != nil {
  1333. return
  1334. }
  1335. _, _ = f.Write(buf[:])
  1336. binary.LittleEndian.PutUint32(buf[:], dataSize)
  1337. if _, err := f.Seek(40, 0); err != nil {
  1338. return
  1339. }
  1340. _, _ = f.Write(buf[:])
  1341. }
  1342. // ResetStreams forces all active streaming sessions to discard their FIR states and decimation phases.
  1343. // This is used when the upstream DSP drops samples, creating a hard break in phase continuity.
  1344. func (st *Streamer) ResetStreams() {
  1345. st.mu.Lock()
  1346. defer st.mu.Unlock()
  1347. for _, sess := range st.sessions {
  1348. sess.preDemodFIR = nil
  1349. sess.preDemodDecimPhase = 0
  1350. sess.stereoResampler = nil
  1351. sess.monoResampler = nil
  1352. }
  1353. }