buffered_queue_test.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package buffered_queue
  2. import (
  3. "sync"
  4. "testing"
  5. )
  6. func TestJobQueue(t *testing.T) {
  7. type Job[T any] struct {
  8. ID int
  9. Action string
  10. Data T
  11. }
  12. queue := NewBufferedQueue[Job[string]](2) // Chunk size of 5
  13. queue.Enqueue(Job[string]{ID: 1, Action: "task1", Data: "hello"})
  14. queue.Enqueue(Job[string]{ID: 2, Action: "task2", Data: "world"})
  15. if queue.Size() != 2 {
  16. t.Errorf("Expected queue size of 2, got %d", queue.Size())
  17. }
  18. queue.Enqueue(Job[string]{ID: 3, Action: "task3", Data: "3!"})
  19. queue.Enqueue(Job[string]{ID: 4, Action: "task4", Data: "4!"})
  20. queue.Enqueue(Job[string]{ID: 5, Action: "task5", Data: "5!"})
  21. if queue.Size() != 5 {
  22. t.Errorf("Expected queue size of 5, got %d", queue.Size())
  23. }
  24. println("enqueued 5 items")
  25. println("dequeue", 1)
  26. job, ok := queue.Dequeue()
  27. if !ok {
  28. t.Errorf("Expected dequeue to return true")
  29. }
  30. if job.ID != 1 {
  31. t.Errorf("Expected job ID of 1, got %d", job.ID)
  32. }
  33. println("dequeue", 2)
  34. job, ok = queue.Dequeue()
  35. if !ok {
  36. t.Errorf("Expected dequeue to return true")
  37. }
  38. println("enqueue", 6)
  39. queue.Enqueue(Job[string]{ID: 6, Action: "task6", Data: "6!"})
  40. println("enqueue", 7)
  41. queue.Enqueue(Job[string]{ID: 7, Action: "task7", Data: "7!"})
  42. for i := 0; i < 5; i++ {
  43. println("dequeue ...")
  44. job, ok = queue.Dequeue()
  45. if !ok {
  46. t.Errorf("Expected dequeue to return true")
  47. }
  48. println("dequeued", job.ID)
  49. }
  50. if queue.Size() != 0 {
  51. t.Errorf("Expected queue size of 0, got %d", queue.Size())
  52. }
  53. for i := 0; i < 5; i++ {
  54. println("enqueue", i+8)
  55. queue.Enqueue(Job[string]{ID: i + 8, Action: "task", Data: "data"})
  56. }
  57. for i := 0; i < 5; i++ {
  58. job, ok = queue.Dequeue()
  59. if !ok {
  60. t.Errorf("Expected dequeue to return true")
  61. }
  62. if job.ID != i+8 {
  63. t.Errorf("Expected job ID of %d, got %d", i, job.ID)
  64. }
  65. println("dequeued", job.ID)
  66. }
  67. }
  68. func TestJobQueueClose(t *testing.T) {
  69. type Job[T any] struct {
  70. ID int
  71. Action string
  72. Data T
  73. }
  74. queue := NewBufferedQueue[Job[string]](2)
  75. queue.Enqueue(Job[string]{ID: 1, Action: "task1", Data: "hello"})
  76. queue.Enqueue(Job[string]{ID: 2, Action: "task2", Data: "world"})
  77. wg := sync.WaitGroup{}
  78. wg.Add(1)
  79. go func() {
  80. defer wg.Done()
  81. for data, ok := queue.Dequeue(); ok; data, ok = queue.Dequeue() {
  82. println("dequeued", data.ID)
  83. }
  84. }()
  85. for i := 0; i < 5; i++ {
  86. queue.Enqueue(Job[string]{ID: i + 3, Action: "task", Data: "data"})
  87. }
  88. queue.CloseInput()
  89. wg.Wait()
  90. }
  91. func BenchmarkBufferedQueue(b *testing.B) {
  92. type Job[T any] struct {
  93. ID int
  94. Action string
  95. Data T
  96. }
  97. queue := NewBufferedQueue[Job[string]](1024)
  98. for i := 0; i < b.N; i++ {
  99. queue.Enqueue(Job[string]{ID: i, Action: "task", Data: "data"})
  100. }
  101. for i := 0; i < b.N; i++ {
  102. _, _ = queue.Dequeue()
  103. }
  104. }