buffered_queue.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package buffered_queue
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. // ItemChunkNode represents a node in the linked list of job chunks
  7. type ItemChunkNode[T any] struct {
  8. items []T
  9. headIndex int
  10. tailIndex int
  11. next *ItemChunkNode[T]
  12. nodeId int
  13. }
  14. // BufferedQueue implements a buffered queue using a linked list of job chunks
  15. type BufferedQueue[T any] struct {
  16. chunkSize int // Maximum number of items per chunk
  17. head *ItemChunkNode[T]
  18. tail *ItemChunkNode[T]
  19. last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
  20. count int // Total number of items in the queue
  21. mutex sync.Mutex
  22. nodeCounter int
  23. waitCond *sync.Cond
  24. isClosed bool
  25. }
  26. // NewBufferedQueue creates a new buffered queue with the specified chunk size
  27. func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
  28. // Create an empty chunk to initialize head and tail
  29. chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
  30. bq := &BufferedQueue[T]{
  31. chunkSize: chunkSize,
  32. head: chunk,
  33. tail: chunk,
  34. last: chunk,
  35. count: 0,
  36. mutex: sync.Mutex{},
  37. }
  38. bq.waitCond = sync.NewCond(&bq.mutex)
  39. return bq
  40. }
  41. // Enqueue adds a job to the queue
  42. func (q *BufferedQueue[T]) Enqueue(job T) error {
  43. if q.isClosed {
  44. return fmt.Errorf("queue is closed")
  45. }
  46. q.mutex.Lock()
  47. defer q.mutex.Unlock()
  48. // If the tail chunk is full, create a new chunk (reusing empty chunks if available)
  49. if q.tail.tailIndex == q.chunkSize {
  50. if q.tail == q.last {
  51. // Create a new chunk
  52. q.nodeCounter++
  53. newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
  54. q.tail.next = newChunk
  55. q.tail = newChunk
  56. q.last = newChunk
  57. } else {
  58. // Reuse an empty chunk
  59. q.tail = q.tail.next
  60. q.tail.headIndex = 0
  61. q.tail.tailIndex = 0
  62. // println("tail moved to chunk", q.tail.nodeId)
  63. }
  64. }
  65. // Add the job to the tail chunk
  66. q.tail.items[q.tail.tailIndex] = job
  67. q.tail.tailIndex++
  68. q.count++
  69. if q.count == 1 {
  70. q.waitCond.Signal()
  71. }
  72. return nil
  73. }
  74. // Dequeue removes and returns a job from the queue
  75. func (q *BufferedQueue[T]) Dequeue() (T, bool) {
  76. q.mutex.Lock()
  77. defer q.mutex.Unlock()
  78. for q.count <= 0 && !q.isClosed {
  79. q.waitCond.Wait()
  80. }
  81. if q.count <= 0 && q.isClosed {
  82. var a T
  83. return a, false
  84. }
  85. q.maybeAdjustHeadIndex()
  86. job := q.head.items[q.head.headIndex]
  87. q.head.headIndex++
  88. q.count--
  89. return job, true
  90. }
  91. func (q *BufferedQueue[T]) maybeAdjustHeadIndex() {
  92. if q.head.headIndex == q.chunkSize {
  93. q.last.next = q.head
  94. q.head = q.head.next
  95. q.last = q.last.next
  96. q.last.next = nil
  97. //println("reusing chunk", q.last.nodeId)
  98. //fmt.Printf("head: %+v\n", q.head)
  99. //fmt.Printf("tail: %+v\n", q.tail)
  100. //fmt.Printf("last: %+v\n", q.last)
  101. //fmt.Printf("count: %d\n", q.count)
  102. //for p := q.head; p != nil ; p = p.next {
  103. // fmt.Printf("Node: %+v\n", p)
  104. //}
  105. }
  106. }
  107. func (q *BufferedQueue[T]) PeekHead() (T, bool) {
  108. q.mutex.Lock()
  109. defer q.mutex.Unlock()
  110. if q.count <= 0 {
  111. var a T
  112. return a, false
  113. }
  114. q.maybeAdjustHeadIndex()
  115. job := q.head.items[q.head.headIndex]
  116. return job, true
  117. }
  118. // Size returns the number of items in the queue
  119. func (q *BufferedQueue[T]) Size() int {
  120. q.mutex.Lock()
  121. defer q.mutex.Unlock()
  122. return q.count
  123. }
  124. // IsEmpty returns true if the queue is empty
  125. func (q *BufferedQueue[T]) IsEmpty() bool {
  126. return q.Size() == 0
  127. }
  128. func (q *BufferedQueue[T]) CloseInput() {
  129. q.mutex.Lock()
  130. defer q.mutex.Unlock()
  131. q.isClosed = true
  132. q.waitCond.Broadcast()
  133. }