lfstack.h 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
#pragma once
#include <util/generic/noncopyable.h>
#include <atomic>
#include <cstddef>
#include <iterator>
#include <memory>
#include <utility>
  6. //////////////////////////////
  7. // lock free lifo stack
  8. template <class T>
  9. class TLockFreeStack: TNonCopyable {
  10. struct TNode {
  11. T Value;
  12. std::atomic<TNode*> Next;
  13. TNode() = default;
  14. template <class U>
  15. explicit TNode(U&& val)
  16. : Value(std::forward<U>(val))
  17. , Next(nullptr)
  18. {
  19. }
  20. };
  21. std::atomic<TNode*> Head = nullptr;
  22. std::atomic<TNode*> FreePtr = nullptr;
  23. std::atomic<size_t> DequeueCount = 0;
  24. void TryToFreeMemory() {
  25. TNode* current = FreePtr.load(std::memory_order_acquire);
  26. if (!current)
  27. return;
  28. if (DequeueCount.load() == 1) {
  29. // node current is in free list, we are the last thread so try to cleanup
  30. if (FreePtr.compare_exchange_strong(current, nullptr))
  31. EraseList(current);
  32. }
  33. }
  34. void EraseList(TNode* p) {
  35. while (p) {
  36. TNode* next = p->Next;
  37. delete p;
  38. p = next;
  39. }
  40. }
  41. void EnqueueImpl(TNode* head, TNode* tail) {
  42. auto headValue = Head.load(std::memory_order_acquire);
  43. for (;;) {
  44. tail->Next.store(headValue, std::memory_order_release);
  45. // NB. See https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
  46. // The weak forms (1-2) of the functions are allowed to fail spuriously, that is,
  47. // act as if *this != expected even if they are equal.
  48. // When a compare-and-exchange is in a loop, the weak version will yield better
  49. // performance on some platforms.
  50. if (Head.compare_exchange_weak(headValue, head))
  51. break;
  52. }
  53. }
  54. template <class U>
  55. void EnqueueImpl(U&& u) {
  56. TNode* node = new TNode(std::forward<U>(u));
  57. EnqueueImpl(node, node);
  58. }
  59. public:
  60. TLockFreeStack() = default;
  61. ~TLockFreeStack() {
  62. EraseList(Head.load());
  63. EraseList(FreePtr.load());
  64. }
  65. void Enqueue(const T& t) {
  66. EnqueueImpl(t);
  67. }
  68. void Enqueue(T&& t) {
  69. EnqueueImpl(std::move(t));
  70. }
  71. template <typename TCollection>
  72. void EnqueueAll(const TCollection& data) {
  73. EnqueueAll(data.begin(), data.end());
  74. }
  75. template <typename TIter>
  76. void EnqueueAll(TIter dataBegin, TIter dataEnd) {
  77. if (dataBegin == dataEnd) {
  78. return;
  79. }
  80. TIter i = dataBegin;
  81. TNode* node = new TNode(*i);
  82. TNode* tail = node;
  83. for (++i; i != dataEnd; ++i) {
  84. TNode* nextNode = node;
  85. node = new TNode(*i);
  86. node->Next.store(nextNode, std::memory_order_release);
  87. }
  88. EnqueueImpl(node, tail);
  89. }
  90. bool Dequeue(T* res) {
  91. ++DequeueCount;
  92. for (TNode* current = Head.load(std::memory_order_acquire); current;) {
  93. if (Head.compare_exchange_weak(current, current->Next.load(std::memory_order_acquire))) {
  94. *res = std::move(current->Value);
  95. // delete current; // ABA problem
  96. // even more complex node deletion
  97. TryToFreeMemory();
  98. if (--DequeueCount == 0) {
  99. // no other Dequeue()s, can safely reclaim memory
  100. delete current;
  101. } else {
  102. // Dequeue()s in progress, put node to free list
  103. for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
  104. current->Next.store(freePtr, std::memory_order_release);
  105. if (FreePtr.compare_exchange_weak(freePtr, current))
  106. break;
  107. }
  108. }
  109. return true;
  110. }
  111. }
  112. TryToFreeMemory();
  113. --DequeueCount;
  114. return false;
  115. }
  116. // add all elements to *res
  117. // elements are returned in order of dequeue (top to bottom; see example in unittest)
  118. template <typename TCollection>
  119. void DequeueAll(TCollection* res) {
  120. ++DequeueCount;
  121. for (TNode* current = Head.load(std::memory_order_acquire); current;) {
  122. if (Head.compare_exchange_weak(current, nullptr)) {
  123. for (TNode* x = current; x;) {
  124. res->push_back(std::move(x->Value));
  125. x = x->Next;
  126. }
  127. // EraseList(current); // ABA problem
  128. // even more complex node deletion
  129. TryToFreeMemory();
  130. if (--DequeueCount == 0) {
  131. // no other Dequeue()s, can safely reclaim memory
  132. EraseList(current);
  133. } else {
  134. // Dequeue()s in progress, add nodes list to free list
  135. TNode* currentLast = current;
  136. while (currentLast->Next) {
  137. currentLast = currentLast->Next;
  138. }
  139. for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
  140. currentLast->Next.store(freePtr, std::memory_order_release);
  141. if (FreePtr.compare_exchange_weak(freePtr, current))
  142. break;
  143. }
  144. }
  145. return;
  146. }
  147. }
  148. TryToFreeMemory();
  149. --DequeueCount;
  150. }
  151. bool DequeueSingleConsumer(T* res) {
  152. for (TNode* current = Head.load(std::memory_order_acquire); current;) {
  153. if (Head.compare_exchange_weak(current, current->Next)) {
  154. *res = std::move(current->Value);
  155. delete current; // with single consumer thread ABA does not happen
  156. return true;
  157. }
  158. }
  159. return false;
  160. }
  161. // add all elements to *res
  162. // elements are returned in order of dequeue (top to bottom; see example in unittest)
  163. template <typename TCollection>
  164. void DequeueAllSingleConsumer(TCollection* res) {
  165. for (TNode* head = Head.load(std::memory_order_acquire); head;) {
  166. if (Head.compare_exchange_weak(head, nullptr)) {
  167. for (TNode* x = head; x;) {
  168. res->push_back(std::move(x->Value));
  169. x = x->Next;
  170. }
  171. EraseList(head); // with single consumer thread ABA does not happen
  172. return;
  173. }
  174. }
  175. }
  176. bool IsEmpty() {
  177. return Head.load() == nullptr; // without lock, so result is approximate
  178. }
  179. };