inflight_message_tracker.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package sub_coordinator
  2. import (
  3. "sort"
  4. "sync"
  5. )
  6. type InflightMessageTracker struct {
  7. messages map[string]int64
  8. mu sync.Mutex
  9. timestamps *RingBuffer
  10. }
  11. func NewInflightMessageTracker(capacity int) *InflightMessageTracker {
  12. return &InflightMessageTracker{
  13. messages: make(map[string]int64),
  14. timestamps: NewRingBuffer(capacity),
  15. }
  16. }
  17. // EnflightMessage tracks the message with the key and timestamp.
  18. // These messages are sent to the consumer group instances and waiting for ack.
  19. func (imt *InflightMessageTracker) EnflightMessage(key []byte, tsNs int64) {
  20. // fmt.Printf("EnflightMessage(%s,%d)\n", string(key), tsNs)
  21. imt.mu.Lock()
  22. defer imt.mu.Unlock()
  23. imt.messages[string(key)] = tsNs
  24. imt.timestamps.EnflightTimestamp(tsNs)
  25. }
  26. // IsMessageAcknowledged returns true if the message has been acknowledged.
  27. // If the message is older than the oldest inflight messages, returns false.
  28. // returns false if the message is inflight.
  29. // Otherwise, returns false if the message is old and can be ignored.
  30. func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64) bool {
  31. imt.mu.Lock()
  32. defer imt.mu.Unlock()
  33. if tsNs <= imt.timestamps.OldestAckedTimestamp() {
  34. return true
  35. }
  36. if tsNs > imt.timestamps.Latest() {
  37. return false
  38. }
  39. if _, found := imt.messages[string(key)]; found {
  40. return false
  41. }
  42. return true
  43. }
  44. // AcknowledgeMessage acknowledges the message with the key and timestamp.
  45. func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool {
  46. // fmt.Printf("AcknowledgeMessage(%s,%d)\n", string(key), tsNs)
  47. imt.mu.Lock()
  48. defer imt.mu.Unlock()
  49. timestamp, exists := imt.messages[string(key)]
  50. if !exists || timestamp != tsNs {
  51. return false
  52. }
  53. delete(imt.messages, string(key))
  54. // Remove the specific timestamp from the ring buffer.
  55. imt.timestamps.AckTimestamp(tsNs)
  56. return true
  57. }
  58. func (imt *InflightMessageTracker) GetOldestAckedTimestamp() int64 {
  59. return imt.timestamps.OldestAckedTimestamp()
  60. }
  61. // IsInflight returns true if the message with the key is inflight.
  62. func (imt *InflightMessageTracker) IsInflight(key []byte) bool {
  63. imt.mu.Lock()
  64. defer imt.mu.Unlock()
  65. _, found := imt.messages[string(key)]
  66. return found
  67. }
  68. type TimestampStatus struct {
  69. Timestamp int64
  70. Acked bool
  71. }
  72. // RingBuffer represents a circular buffer to hold timestamps.
  73. type RingBuffer struct {
  74. buffer []*TimestampStatus
  75. head int
  76. size int
  77. maxTimestamp int64
  78. maxAllAckedTs int64
  79. }
  80. // NewRingBuffer creates a new RingBuffer of the given capacity.
  81. func NewRingBuffer(capacity int) *RingBuffer {
  82. return &RingBuffer{
  83. buffer: newBuffer(capacity),
  84. }
  85. }
  86. func newBuffer(capacity int) []*TimestampStatus {
  87. buffer := make([]*TimestampStatus, capacity)
  88. for i := range buffer {
  89. buffer[i] = &TimestampStatus{}
  90. }
  91. return buffer
  92. }
  93. // EnflightTimestamp adds a new timestamp to the ring buffer.
  94. func (rb *RingBuffer) EnflightTimestamp(timestamp int64) {
  95. if rb.size < len(rb.buffer) {
  96. rb.size++
  97. } else {
  98. newBuf := newBuffer(2 * len(rb.buffer))
  99. for i := 0; i < rb.size; i++ {
  100. newBuf[i] = rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)]
  101. }
  102. rb.buffer = newBuf
  103. rb.head = rb.size
  104. rb.size++
  105. }
  106. head := rb.buffer[rb.head]
  107. head.Timestamp = timestamp
  108. head.Acked = false
  109. rb.head = (rb.head + 1) % len(rb.buffer)
  110. if timestamp > rb.maxTimestamp {
  111. rb.maxTimestamp = timestamp
  112. }
  113. }
  114. // AckTimestamp removes the specified timestamp from the ring buffer.
  115. func (rb *RingBuffer) AckTimestamp(timestamp int64) {
  116. // Perform binary search
  117. index := sort.Search(rb.size, func(i int) bool {
  118. return rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)].Timestamp >= timestamp
  119. })
  120. actualIndex := (rb.head + len(rb.buffer) - rb.size + index) % len(rb.buffer)
  121. rb.buffer[actualIndex].Acked = true
  122. // Remove all the continuously acknowledged timestamps from the buffer
  123. startPos := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer)
  124. for i := 0; i < len(rb.buffer) && rb.buffer[(startPos+i)%len(rb.buffer)].Acked; i++ {
  125. t := rb.buffer[(startPos+i)%len(rb.buffer)]
  126. if rb.maxAllAckedTs < t.Timestamp {
  127. rb.size--
  128. rb.maxAllAckedTs = t.Timestamp
  129. }
  130. }
  131. }
  132. // OldestAckedTimestamp returns the oldest that is already acked timestamp in the ring buffer.
  133. func (rb *RingBuffer) OldestAckedTimestamp() int64 {
  134. return rb.maxAllAckedTs
  135. }
  136. // Latest returns the most recently known timestamp in the ring buffer.
  137. func (rb *RingBuffer) Latest() int64 {
  138. return rb.maxTimestamp
  139. }