inflight_message_tracker_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package sub_coordinator
  2. import (
  3. "github.com/stretchr/testify/assert"
  4. "testing"
  5. )
  6. func TestRingBuffer(t *testing.T) {
  7. // Initialize a RingBuffer with capacity 5
  8. rb := NewRingBuffer(5)
  9. // Add timestamps to the buffer
  10. timestamps := []int64{100, 200, 300, 400, 500}
  11. for _, ts := range timestamps {
  12. rb.EnflightTimestamp(ts)
  13. }
  14. // Test Add method and buffer size
  15. expectedSize := 5
  16. if rb.size != expectedSize {
  17. t.Errorf("Expected buffer size %d, got %d", expectedSize, rb.size)
  18. }
  19. assert.Equal(t, int64(0), rb.OldestAckedTimestamp())
  20. assert.Equal(t, int64(500), rb.Latest())
  21. rb.AckTimestamp(200)
  22. assert.Equal(t, int64(0), rb.OldestAckedTimestamp())
  23. rb.AckTimestamp(100)
  24. assert.Equal(t, int64(200), rb.OldestAckedTimestamp())
  25. rb.EnflightTimestamp(int64(600))
  26. rb.EnflightTimestamp(int64(700))
  27. rb.AckTimestamp(500)
  28. assert.Equal(t, int64(200), rb.OldestAckedTimestamp())
  29. rb.AckTimestamp(400)
  30. assert.Equal(t, int64(200), rb.OldestAckedTimestamp())
  31. rb.AckTimestamp(300)
  32. assert.Equal(t, int64(500), rb.OldestAckedTimestamp())
  33. assert.Equal(t, int64(700), rb.Latest())
  34. }
  35. func TestInflightMessageTracker(t *testing.T) {
  36. // Initialize an InflightMessageTracker with capacity 5
  37. tracker := NewInflightMessageTracker(5)
  38. // Add inflight messages
  39. key := []byte("1")
  40. timestamp := int64(1)
  41. tracker.EnflightMessage(key, timestamp)
  42. // Test IsMessageAcknowledged method
  43. isOld := tracker.IsMessageAcknowledged(key, timestamp-10)
  44. if !isOld {
  45. t.Error("Expected message to be old")
  46. }
  47. // Test AcknowledgeMessage method
  48. acked := tracker.AcknowledgeMessage(key, timestamp)
  49. if !acked {
  50. t.Error("Expected message to be acked")
  51. }
  52. if _, exists := tracker.messages[string(key)]; exists {
  53. t.Error("Expected message to be deleted after ack")
  54. }
  55. if tracker.timestamps.size != 0 {
  56. t.Error("Expected buffer size to be 0 after ack")
  57. }
  58. assert.Equal(t, timestamp, tracker.GetOldestAckedTimestamp())
  59. }
  60. func TestInflightMessageTracker2(t *testing.T) {
  61. // Initialize an InflightMessageTracker with initial capacity 1
  62. tracker := NewInflightMessageTracker(1)
  63. tracker.EnflightMessage([]byte("1"), int64(1))
  64. tracker.EnflightMessage([]byte("2"), int64(2))
  65. tracker.EnflightMessage([]byte("3"), int64(3))
  66. tracker.EnflightMessage([]byte("4"), int64(4))
  67. tracker.EnflightMessage([]byte("5"), int64(5))
  68. assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
  69. assert.Equal(t, int64(1), tracker.GetOldestAckedTimestamp())
  70. // Test IsMessageAcknowledged method
  71. isAcked := tracker.IsMessageAcknowledged([]byte("2"), int64(2))
  72. if isAcked {
  73. t.Error("Expected message to be not acked")
  74. }
  75. // Test AcknowledgeMessage method
  76. assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
  77. assert.Equal(t, int64(2), tracker.GetOldestAckedTimestamp())
  78. }
  79. func TestInflightMessageTracker3(t *testing.T) {
  80. // Initialize an InflightMessageTracker with initial capacity 1
  81. tracker := NewInflightMessageTracker(1)
  82. tracker.EnflightMessage([]byte("1"), int64(1))
  83. tracker.EnflightMessage([]byte("2"), int64(2))
  84. tracker.EnflightMessage([]byte("3"), int64(3))
  85. assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
  86. tracker.EnflightMessage([]byte("4"), int64(4))
  87. tracker.EnflightMessage([]byte("5"), int64(5))
  88. assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
  89. assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3)))
  90. tracker.EnflightMessage([]byte("6"), int64(6))
  91. tracker.EnflightMessage([]byte("7"), int64(7))
  92. assert.True(t, tracker.AcknowledgeMessage([]byte("4"), int64(4)))
  93. assert.True(t, tracker.AcknowledgeMessage([]byte("5"), int64(5)))
  94. assert.True(t, tracker.AcknowledgeMessage([]byte("6"), int64(6)))
  95. assert.Equal(t, int64(6), tracker.GetOldestAckedTimestamp())
  96. assert.True(t, tracker.AcknowledgeMessage([]byte("7"), int64(7)))
  97. assert.Equal(t, int64(7), tracker.GetOldestAckedTimestamp())
  98. }
  99. func TestInflightMessageTracker4(t *testing.T) {
  100. // Initialize an InflightMessageTracker with initial capacity 1
  101. tracker := NewInflightMessageTracker(1)
  102. tracker.EnflightMessage([]byte("1"), int64(1))
  103. tracker.EnflightMessage([]byte("2"), int64(2))
  104. assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
  105. assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
  106. tracker.EnflightMessage([]byte("3"), int64(3))
  107. assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3)))
  108. assert.Equal(t, int64(3), tracker.GetOldestAckedTimestamp())
  109. }