Go-based FM stereo transmitter with RDS, Windows-first and cross-platform
25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

402 lines
11KB

  1. package ingest
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/jan/fm-rds-tx/internal/audio"
  9. )
  10. type fakeSource struct {
  11. desc SourceDescriptor
  12. chunks chan PCMChunk
  13. errs chan error
  14. title chan string
  15. stats SourceStats
  16. once sync.Once
  17. }
  18. func newFakeSource() *fakeSource {
  19. return &fakeSource{
  20. desc: SourceDescriptor{ID: "fake", Kind: "stdin-pcm"},
  21. chunks: make(chan PCMChunk, 4),
  22. errs: make(chan error, 1),
  23. title: make(chan string, 4),
  24. stats: SourceStats{State: "running", Connected: true},
  25. }
  26. }
  27. func (s *fakeSource) Descriptor() SourceDescriptor { return s.desc }
  28. func (s *fakeSource) Start(context.Context) error { return nil }
  29. func (s *fakeSource) Stop() error { s.once.Do(func() { close(s.chunks) }); return nil }
  30. func (s *fakeSource) Chunks() <-chan PCMChunk { return s.chunks }
  31. func (s *fakeSource) Errors() <-chan error { return s.errs }
  32. func (s *fakeSource) StreamTitleUpdates() <-chan string {
  33. return s.title
  34. }
  35. func (s *fakeSource) Stats() SourceStats { return s.stats }
  36. func TestRuntimeWritesFramesToStreamSink(t *testing.T) {
  37. sink := audio.NewStreamSource(128, 44100)
  38. src := newFakeSource()
  39. rt := NewRuntime(sink, src)
  40. if err := rt.Start(context.Background()); err != nil {
  41. t.Fatalf("start: %v", err)
  42. }
  43. defer rt.Stop()
  44. src.chunks <- PCMChunk{
  45. Channels: 2,
  46. SampleRateHz: 44100,
  47. Samples: []int32{1000 << 16, -1000 << 16},
  48. }
  49. deadline := time.Now().Add(1 * time.Second)
  50. for sink.Available() < 1 && time.Now().Before(deadline) {
  51. time.Sleep(10 * time.Millisecond)
  52. }
  53. if sink.Available() < 1 {
  54. t.Fatal("expected at least one frame in sink")
  55. }
  56. }
  57. func TestRuntimeRecoversToRunningAfterSourceError(t *testing.T) {
  58. sink := audio.NewStreamSource(128, 44100)
  59. src := newFakeSource()
  60. rt := NewRuntime(sink, src)
  61. if err := rt.Start(context.Background()); err != nil {
  62. t.Fatalf("start: %v", err)
  63. }
  64. defer rt.Stop()
  65. src.errs <- errors.New("decode transient failure")
  66. waitForRuntimeState(t, rt, "degraded")
  67. src.chunks <- PCMChunk{
  68. Channels: 2,
  69. SampleRateHz: 44100,
  70. Samples: []int32{500 << 16, -500 << 16},
  71. }
  72. waitForRuntimeState(t, rt, "running")
  73. }
  74. func TestRuntimeRecoversToRunningAfterConvertError(t *testing.T) {
  75. sink := audio.NewStreamSource(128, 44100)
  76. src := newFakeSource()
  77. rt := NewRuntime(sink, src)
  78. if err := rt.Start(context.Background()); err != nil {
  79. t.Fatalf("start: %v", err)
  80. }
  81. defer rt.Stop()
  82. // Invalid stereo chunk: odd sample count causes conversion error.
  83. src.chunks <- PCMChunk{
  84. Channels: 2,
  85. SampleRateHz: 44100,
  86. Samples: []int32{100 << 16},
  87. }
  88. waitForRuntimeState(t, rt, "degraded")
  89. if got := rt.Stats().Runtime.ConvertErrors; got != 1 {
  90. t.Fatalf("convertErrors=%d want 1", got)
  91. }
  92. src.chunks <- PCMChunk{
  93. Channels: 2,
  94. SampleRateHz: 44100,
  95. Samples: []int32{300 << 16, -300 << 16},
  96. }
  97. waitForRuntimeState(t, rt, "running")
  98. }
  99. func TestRuntimeWithMissingSourceStaysIdleAndReturnsZeroSourceStats(t *testing.T) {
  100. sink := audio.NewStreamSource(128, 44100)
  101. rt := NewRuntime(sink, nil)
  102. if err := rt.Start(context.Background()); err != nil {
  103. t.Fatalf("start: %v", err)
  104. }
  105. stats := rt.Stats()
  106. if stats.Runtime.State != "idle" {
  107. t.Fatalf("runtime state=%q want idle", stats.Runtime.State)
  108. }
  109. if stats.Active.ID != "" || stats.Active.Kind != "" {
  110. t.Fatalf("expected empty active descriptor, got %+v", stats.Active)
  111. }
  112. if stats.Source.State != "" {
  113. t.Fatalf("expected zero-value source stats, got state=%q", stats.Source.State)
  114. }
  115. }
  116. func TestRuntimeStatsExposeActiveDescriptorAndSourceReconnectState(t *testing.T) {
  117. sink := audio.NewStreamSource(128, 44100)
  118. src := newFakeSource()
  119. src.desc = SourceDescriptor{ID: "icecast-primary", Kind: "icecast"}
  120. src.stats = SourceStats{
  121. State: "reconnecting",
  122. Connected: false,
  123. Reconnects: 4,
  124. LastError: "stream ended",
  125. }
  126. rt := NewRuntime(sink, src)
  127. if err := rt.Start(context.Background()); err != nil {
  128. t.Fatalf("start: %v", err)
  129. }
  130. defer rt.Stop()
  131. stats := rt.Stats()
  132. if stats.Active.ID != "icecast-primary" {
  133. t.Fatalf("active id=%q want icecast-primary", stats.Active.ID)
  134. }
  135. if stats.Active.Kind != "icecast" {
  136. t.Fatalf("active kind=%q want icecast", stats.Active.Kind)
  137. }
  138. if stats.Source.Reconnects != 4 {
  139. t.Fatalf("source reconnects=%d want 4", stats.Source.Reconnects)
  140. }
  141. if stats.Source.LastError != "stream ended" {
  142. t.Fatalf("source lastError=%q want stream ended", stats.Source.LastError)
  143. }
  144. }
  145. func TestRuntimePrebufferGateAppliesBeforeSinkWrites(t *testing.T) {
  146. sink := audio.NewStreamSource(512, 1000)
  147. src := newFakeSource()
  148. rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond))
  149. if err := rt.Start(context.Background()); err != nil {
  150. t.Fatalf("start: %v", err)
  151. }
  152. defer rt.Stop()
  153. src.chunks <- PCMChunk{
  154. Channels: 2,
  155. SampleRateHz: 1000,
  156. Samples: stereoSamples(80, 100),
  157. }
  158. time.Sleep(30 * time.Millisecond)
  159. if sink.Available() != 0 {
  160. t.Fatalf("sink available=%d want 0 while prebuffering", sink.Available())
  161. }
  162. stats := rt.Stats()
  163. if stats.Runtime.State != "prebuffering" || !stats.Runtime.Prebuffering {
  164. t.Fatalf("runtime state=%q prebuffering=%t", stats.Runtime.State, stats.Runtime.Prebuffering)
  165. }
  166. if stats.Runtime.BufferedSeconds <= 0 {
  167. t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds)
  168. }
  169. src.chunks <- PCMChunk{
  170. Channels: 2,
  171. SampleRateHz: 1000,
  172. Samples: stereoSamples(40, 120),
  173. }
  174. waitForSinkFrames(t, sink, 1)
  175. waitForRuntimeState(t, rt, "running")
  176. if got := rt.Stats().Runtime.Prebuffering; got {
  177. t.Fatalf("runtime prebuffering=%t want false", got)
  178. }
  179. }
  180. func TestRuntimeWriteBlockedRetainsWorkingBuffer(t *testing.T) {
  181. sink := audio.NewStreamSource(1, 1000)
  182. src := newFakeSource()
  183. rt := NewRuntime(sink, src)
  184. if err := rt.Start(context.Background()); err != nil {
  185. t.Fatalf("start: %v", err)
  186. }
  187. defer rt.Stop()
  188. src.chunks <- PCMChunk{
  189. Channels: 2,
  190. SampleRateHz: 1000,
  191. Samples: stereoSamples(4, 200),
  192. }
  193. waitForSinkFrames(t, sink, 1)
  194. waitForRuntimeState(t, rt, "running")
  195. stats := rt.Stats()
  196. if stats.Runtime.WriteBlocked {
  197. t.Fatalf("runtime writeBlocked=%t want false", stats.Runtime.WriteBlocked)
  198. }
  199. if stats.Runtime.BufferedSeconds <= 0 {
  200. t.Fatalf("runtime bufferedSeconds=%f want > 0", stats.Runtime.BufferedSeconds)
  201. }
  202. if stats.Runtime.DroppedFrames != 0 {
  203. t.Fatalf("runtime droppedFrames=%d want 0", stats.Runtime.DroppedFrames)
  204. }
  205. if got := sink.Stats().Overflows; got != 0 {
  206. t.Fatalf("sink overflows=%d want 0", got)
  207. }
  208. }
  209. func TestRuntimeDrainWorkingBufferIsBurstBounded(t *testing.T) {
  210. sink := audio.NewStreamSource(64, 1000)
  211. rt := NewRuntime(sink, nil)
  212. rt.gateOpen = true
  213. for i := 0; i < 40; i++ {
  214. if !rt.work.push(audio.NewFrame(0.1, -0.1)) {
  215. t.Fatalf("failed to seed work frame %d", i)
  216. }
  217. }
  218. rt.lastDrainAt = time.Now().Add(-time.Second)
  219. rt.drainWorkingBuffer()
  220. if got := sink.Available(); got != 20 {
  221. t.Fatalf("sink available=%d want 20 (20ms burst at 1kHz)", got)
  222. }
  223. if got := rt.work.available(); got != 20 {
  224. t.Fatalf("work available=%d want 20", got)
  225. }
  226. if got := rt.Stats().Runtime.WriteBlocked; got {
  227. t.Fatalf("runtime writeBlocked=%t want false", got)
  228. }
  229. }
  230. func TestRuntimeDrainWorkingBufferHonorsSinkHeadroom(t *testing.T) {
  231. sink := audio.NewStreamSource(64, 1000)
  232. rt := NewRuntime(sink, nil)
  233. for i := 0; i < 63; i++ {
  234. if !sink.WriteFrame(audio.NewFrame(0.2, -0.2)) {
  235. t.Fatalf("failed to seed sink frame %d", i)
  236. }
  237. }
  238. rt.gateOpen = true
  239. for i := 0; i < 8; i++ {
  240. if !rt.work.push(audio.NewFrame(0.3, -0.3)) {
  241. t.Fatalf("failed to seed work frame %d", i)
  242. }
  243. }
  244. rt.lastDrainAt = time.Now().Add(-time.Second)
  245. rt.drainWorkingBuffer()
  246. if got := sink.Available(); got != 64 {
  247. t.Fatalf("sink available=%d want 64", got)
  248. }
  249. if got := rt.work.available(); got != 7 {
  250. t.Fatalf("work available=%d want 7", got)
  251. }
  252. if got := sink.Stats().Overflows; got != 0 {
  253. t.Fatalf("sink overflows=%d want 0", got)
  254. }
  255. if got := rt.Stats().Runtime.WriteBlocked; got {
  256. t.Fatalf("runtime writeBlocked=%t want false", got)
  257. }
  258. }
  259. func TestRuntimeStatsSourceBufferedSecondsIncludesWorkingBuffer(t *testing.T) {
  260. sink := audio.NewStreamSource(32, 1000)
  261. src := newFakeSource()
  262. src.stats = SourceStats{State: "running", Connected: true, BufferedSeconds: 0}
  263. rt := NewRuntime(sink, src, WithPrebuffer(100*time.Millisecond))
  264. if err := rt.Start(context.Background()); err != nil {
  265. t.Fatalf("start: %v", err)
  266. }
  267. defer rt.Stop()
  268. src.chunks <- PCMChunk{
  269. Channels: 2,
  270. SampleRateHz: 1000,
  271. Samples: stereoSamples(50, 300),
  272. }
  273. time.Sleep(20 * time.Millisecond)
  274. stats := rt.Stats()
  275. if stats.Source.BufferedSeconds <= 0 {
  276. t.Fatalf("source bufferedSeconds=%f want > 0", stats.Source.BufferedSeconds)
  277. }
  278. }
  279. func TestRuntimeUpdatesActiveDescriptorFromChunkMetadata(t *testing.T) {
  280. sink := audio.NewStreamSource(128, 44100)
  281. src := newFakeSource()
  282. src.desc = SourceDescriptor{
  283. ID: "icecast-primary",
  284. Kind: "icecast",
  285. Channels: 0,
  286. SampleRateHz: 0,
  287. }
  288. rt := NewRuntime(sink, src)
  289. if err := rt.Start(context.Background()); err != nil {
  290. t.Fatalf("start: %v", err)
  291. }
  292. defer rt.Stop()
  293. src.chunks <- PCMChunk{
  294. Channels: 2,
  295. SampleRateHz: 48000,
  296. Samples: []int32{100 << 16, -100 << 16},
  297. }
  298. waitForRuntimeState(t, rt, "running")
  299. stats := rt.Stats()
  300. if stats.Active.SampleRateHz != 48000 {
  301. t.Fatalf("active sampleRateHz=%d want 48000", stats.Active.SampleRateHz)
  302. }
  303. if stats.Active.Channels != 2 {
  304. t.Fatalf("active channels=%d want 2", stats.Active.Channels)
  305. }
  306. }
  307. func TestRuntimeForwardsStreamTitleUpdatesToHandler(t *testing.T) {
  308. sink := audio.NewStreamSource(128, 44100)
  309. src := newFakeSource()
  310. got := make(chan string, 1)
  311. rt := NewRuntime(sink, src, WithStreamTitleHandler(func(title string) {
  312. got <- title
  313. }))
  314. if err := rt.Start(context.Background()); err != nil {
  315. t.Fatalf("start: %v", err)
  316. }
  317. defer rt.Stop()
  318. src.title <- "Artist - Song"
  319. select {
  320. case title := <-got:
  321. if title != "Artist - Song" {
  322. t.Fatalf("title=%q want %q", title, "Artist - Song")
  323. }
  324. case <-time.After(1 * time.Second):
  325. t.Fatal("timed out waiting for forwarded title")
  326. }
  327. }
  328. func waitForRuntimeState(t *testing.T, rt *Runtime, want string) {
  329. t.Helper()
  330. deadline := time.Now().Add(1 * time.Second)
  331. for time.Now().Before(deadline) {
  332. if got := rt.Stats().Runtime.State; got == want {
  333. return
  334. }
  335. time.Sleep(10 * time.Millisecond)
  336. }
  337. t.Fatalf("timeout waiting for runtime state %q; last=%q", want, rt.Stats().Runtime.State)
  338. }
  339. func waitForSinkFrames(t *testing.T, sink *audio.StreamSource, minFrames int) {
  340. t.Helper()
  341. deadline := time.Now().Add(1 * time.Second)
  342. for time.Now().Before(deadline) {
  343. if sink.Available() >= minFrames {
  344. return
  345. }
  346. time.Sleep(10 * time.Millisecond)
  347. }
  348. t.Fatalf("timeout waiting for sink frames: have=%d want>=%d", sink.Available(), minFrames)
  349. }
  350. func stereoSamples(frames int, v int32) []int32 {
  351. out := make([]int32, 0, frames*2)
  352. for i := 0; i < frames; i++ {
  353. out = append(out, v<<16, -v<<16)
  354. }
  355. return out
  356. }