batching_queue_test.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package util_test
  2. import (
  3. "github.com/stretchr/testify/require"
  4. "heckel.io/ntfy/util"
  5. "math/rand"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
  10. func TestBatchingQueue_InfTimeout(t *testing.T) {
  11. q := util.NewBatchingQueue[int](25, 1*time.Hour)
  12. batches, total := make([][]int, 0), 0
  13. var mu sync.Mutex
  14. go func() {
  15. for batch := range q.Dequeue() {
  16. mu.Lock()
  17. batches = append(batches, batch)
  18. total += len(batch)
  19. mu.Unlock()
  20. }
  21. }()
  22. for i := 0; i < 101; i++ {
  23. go q.Enqueue(i)
  24. }
  25. time.Sleep(time.Second)
  26. mu.Lock()
  27. require.Equal(t, 100, total) // One is missing, stuck in the last batch!
  28. require.Equal(t, 4, len(batches))
  29. mu.Unlock()
  30. }
  31. func TestBatchingQueue_WithTimeout(t *testing.T) {
  32. q := util.NewBatchingQueue[int](25, 100*time.Millisecond)
  33. batches, total := make([][]int, 0), 0
  34. var mu sync.Mutex
  35. go func() {
  36. for batch := range q.Dequeue() {
  37. mu.Lock()
  38. batches = append(batches, batch)
  39. total += len(batch)
  40. mu.Unlock()
  41. }
  42. }()
  43. for i := 0; i < 101; i++ {
  44. go func(i int) {
  45. time.Sleep(time.Duration(rand.Intn(700)) * time.Millisecond)
  46. q.Enqueue(i)
  47. }(i)
  48. }
  49. time.Sleep(time.Second)
  50. mu.Lock()
  51. require.Equal(t, 101, total)
  52. require.True(t, len(batches) > 4) // 101/25
  53. require.True(t, len(batches) < 21)
  54. mu.Unlock()
  55. }