123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package sub_coordinator
- import (
- "github.com/stretchr/testify/assert"
- "testing"
- )
- func TestRingBuffer(t *testing.T) {
- // Initialize a RingBuffer with capacity 5
- rb := NewRingBuffer(5)
- // Add timestamps to the buffer
- timestamps := []int64{100, 200, 300, 400, 500}
- for _, ts := range timestamps {
- rb.EnflightTimestamp(ts)
- }
- // Test Add method and buffer size
- expectedSize := 5
- if rb.size != expectedSize {
- t.Errorf("Expected buffer size %d, got %d", expectedSize, rb.size)
- }
- assert.Equal(t, int64(0), rb.OldestAckedTimestamp())
- assert.Equal(t, int64(500), rb.Latest())
- rb.AckTimestamp(200)
- assert.Equal(t, int64(0), rb.OldestAckedTimestamp())
- rb.AckTimestamp(100)
- assert.Equal(t, int64(200), rb.OldestAckedTimestamp())
- rb.EnflightTimestamp(int64(600))
- rb.EnflightTimestamp(int64(700))
- rb.AckTimestamp(500)
- assert.Equal(t, int64(200), rb.OldestAckedTimestamp())
- rb.AckTimestamp(400)
- assert.Equal(t, int64(200), rb.OldestAckedTimestamp())
- rb.AckTimestamp(300)
- assert.Equal(t, int64(500), rb.OldestAckedTimestamp())
- assert.Equal(t, int64(700), rb.Latest())
- }
- func TestInflightMessageTracker(t *testing.T) {
- // Initialize an InflightMessageTracker with capacity 5
- tracker := NewInflightMessageTracker(5)
- // Add inflight messages
- key := []byte("1")
- timestamp := int64(1)
- tracker.EnflightMessage(key, timestamp)
- // Test IsMessageAcknowledged method
- isOld := tracker.IsMessageAcknowledged(key, timestamp-10)
- if !isOld {
- t.Error("Expected message to be old")
- }
- // Test AcknowledgeMessage method
- acked := tracker.AcknowledgeMessage(key, timestamp)
- if !acked {
- t.Error("Expected message to be acked")
- }
- if _, exists := tracker.messages[string(key)]; exists {
- t.Error("Expected message to be deleted after ack")
- }
- if tracker.timestamps.size != 0 {
- t.Error("Expected buffer size to be 0 after ack")
- }
- assert.Equal(t, timestamp, tracker.GetOldestAckedTimestamp())
- }
- func TestInflightMessageTracker2(t *testing.T) {
- // Initialize an InflightMessageTracker with initial capacity 1
- tracker := NewInflightMessageTracker(1)
- tracker.EnflightMessage([]byte("1"), int64(1))
- tracker.EnflightMessage([]byte("2"), int64(2))
- tracker.EnflightMessage([]byte("3"), int64(3))
- tracker.EnflightMessage([]byte("4"), int64(4))
- tracker.EnflightMessage([]byte("5"), int64(5))
- assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
- assert.Equal(t, int64(1), tracker.GetOldestAckedTimestamp())
- // Test IsMessageAcknowledged method
- isAcked := tracker.IsMessageAcknowledged([]byte("2"), int64(2))
- if isAcked {
- t.Error("Expected message to be not acked")
- }
- // Test AcknowledgeMessage method
- assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
- assert.Equal(t, int64(2), tracker.GetOldestAckedTimestamp())
- }
- func TestInflightMessageTracker3(t *testing.T) {
- // Initialize an InflightMessageTracker with initial capacity 1
- tracker := NewInflightMessageTracker(1)
- tracker.EnflightMessage([]byte("1"), int64(1))
- tracker.EnflightMessage([]byte("2"), int64(2))
- tracker.EnflightMessage([]byte("3"), int64(3))
- assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
- tracker.EnflightMessage([]byte("4"), int64(4))
- tracker.EnflightMessage([]byte("5"), int64(5))
- assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
- assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3)))
- tracker.EnflightMessage([]byte("6"), int64(6))
- tracker.EnflightMessage([]byte("7"), int64(7))
- assert.True(t, tracker.AcknowledgeMessage([]byte("4"), int64(4)))
- assert.True(t, tracker.AcknowledgeMessage([]byte("5"), int64(5)))
- assert.True(t, tracker.AcknowledgeMessage([]byte("6"), int64(6)))
- assert.Equal(t, int64(6), tracker.GetOldestAckedTimestamp())
- assert.True(t, tracker.AcknowledgeMessage([]byte("7"), int64(7)))
- assert.Equal(t, int64(7), tracker.GetOldestAckedTimestamp())
- }
- func TestInflightMessageTracker4(t *testing.T) {
- // Initialize an InflightMessageTracker with initial capacity 1
- tracker := NewInflightMessageTracker(1)
- tracker.EnflightMessage([]byte("1"), int64(1))
- tracker.EnflightMessage([]byte("2"), int64(2))
- assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
- assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
- tracker.EnflightMessage([]byte("3"), int64(3))
- assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3)))
- assert.Equal(t, int64(3), tracker.GetOldestAckedTimestamp())
- }
|