lfstack.h

#pragma once

#include <util/generic/noncopyable.h>

#include <atomic>
#include <cstddef>
#include <utility>

//////////////////////////////
// lock free lifo stack
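//
// TLockFreeStack is a Treiber-style LIFO stack for multiple concurrent producers
// and consumers: pushes and pops are CAS loops on Head, and popped nodes are not
// freed immediately but go through a deferred free list (FreePtr/DequeueCount
// below), so that a concurrent Dequeue() does not read freed memory and the
// naive delete-on-pop ABA problem is avoided.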
template <class T>
class TLockFreeStack: TNonCopyable {
    struct TNode {
        T Value;
        std::atomic<TNode*> Next;

        TNode() = default;

        template <class U>
        explicit TNode(U&& val)
            : Value(std::forward<U>(val))
            , Next(nullptr)
        {
        }
    };

    std::atomic<TNode*> Head = nullptr;
    std::atomic<TNode*> FreePtr = nullptr;
    std::atomic<size_t> DequeueCount = 0;
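
    // Memory reclamation protocol:
    //  - Head is the top of the live stack;
    //  - DequeueCount counts threads currently inside Dequeue()/DequeueAll();
    //  - FreePtr holds nodes already unlinked from Head but not yet deleted,
    //    because another dequeuer may still be reading them.
    // After a successful pop, the node (or detached list) is deleted at once only
    // if the decrement of DequeueCount shows the caller was the last active
    // dequeuer; otherwise it is pushed onto FreePtr, and TryToFreeMemory() drains
    // that list later, once the caller is the only dequeuer left.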
    void TryToFreeMemory() {
        TNode* current = FreePtr.load(std::memory_order_acquire);
        if (!current) {
            return;
        }
        if (DequeueCount.load() == 1) {
            // node current is in free list, we are the last thread so try to cleanup
            if (FreePtr.compare_exchange_strong(current, nullptr)) {
                EraseList(current);
            }
        }
    }

    void EraseList(TNode* p) {
        while (p) {
            TNode* next = p->Next;
            delete p;
            p = next;
        }
    }

    void EnqueueImpl(TNode* head, TNode* tail) {
        auto headValue = Head.load(std::memory_order_acquire);
        for (;;) {
            tail->Next.store(headValue, std::memory_order_release);
            // NB. See https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
            // The weak forms (1-2) of the functions are allowed to fail spuriously, that is,
            // act as if *this != expected even if they are equal.
            // When a compare-and-exchange is in a loop, the weak version will yield better
            // performance on some platforms.
            if (Head.compare_exchange_weak(headValue, head)) {
                break;
            }
        }
    }

    template <class U>
    void EnqueueImpl(U&& u) {
        TNode* node = new TNode(std::forward<U>(u));
        EnqueueImpl(node, node);
    }

public:
    TLockFreeStack() = default;

    ~TLockFreeStack() {
        EraseList(Head.load());
        EraseList(FreePtr.load());
    }

    void Enqueue(const T& t) {
        EnqueueImpl(t);
    }

    void Enqueue(T&& t) {
        EnqueueImpl(std::move(t));
    }

    template <typename TCollection>
    void EnqueueAll(const TCollection& data) {
        EnqueueAll(data.begin(), data.end());
    }
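
    // The iterator overload below links all new nodes into a private chain first
    // (the last element of the range ends up on top) and then publishes the whole
    // chain with a single EnqueueImpl() call, i.e. one successful CAS on Head per
    // batch instead of one per element.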
    template <typename TIter>
    void EnqueueAll(TIter dataBegin, TIter dataEnd) {
        if (dataBegin == dataEnd) {
            return;
        }
        TIter i = dataBegin;
        TNode* node = new TNode(*i);
        TNode* tail = node;

        for (++i; i != dataEnd; ++i) {
            TNode* nextNode = node;
            node = new TNode(*i);
            node->Next.store(nextNode, std::memory_order_release);
        }
        EnqueueImpl(node, tail);
    }
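
    // Dequeue() unlinks the top node with a CAS loop and moves its value out; the
    // node itself is deleted immediately only if this thread was the last active
    // dequeuer, otherwise it is pushed onto the FreePtr list to be reclaimed later
    // (see the reclamation notes above).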
    bool Dequeue(T* res) {
        ++DequeueCount;
        for (TNode* current = Head.load(std::memory_order_acquire); current;) {
            if (Head.compare_exchange_weak(current, current->Next.load(std::memory_order_acquire))) {
                *res = std::move(current->Value);
                // delete current; // ABA problem
                // even more complex node deletion
                TryToFreeMemory();
                if (--DequeueCount == 0) {
                    // no other Dequeue()s, can safely reclaim memory
                    delete current;
                } else {
                    // Dequeue()s in progress, put node to free list
                    for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
                        current->Next.store(freePtr, std::memory_order_release);
                        if (FreePtr.compare_exchange_weak(freePtr, current)) {
                            break;
                        }
                    }
                }
                return true;
            }
        }
        TryToFreeMemory();
        --DequeueCount;
        return false;
    }
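
    // DequeueAll() detaches the whole stack with one CAS of Head to nullptr, moves
    // the values out in LIFO order, and disposes of the detached node list under
    // the same DequeueCount/FreePtr scheme as Dequeue().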
    // add all elements to *res
    // elements are returned in order of dequeue (top to bottom; see example in unittest)
    template <typename TCollection>
    void DequeueAll(TCollection* res) {
        ++DequeueCount;
        for (TNode* current = Head.load(std::memory_order_acquire); current;) {
            if (Head.compare_exchange_weak(current, nullptr)) {
                for (TNode* x = current; x;) {
                    res->push_back(std::move(x->Value));
                    x = x->Next;
                }
                // EraseList(current); // ABA problem
                // even more complex node deletion
                TryToFreeMemory();
                if (--DequeueCount == 0) {
                    // no other Dequeue()s, can safely reclaim memory
                    EraseList(current);
                } else {
                    // Dequeue()s in progress, add nodes list to free list
                    TNode* currentLast = current;
                    while (currentLast->Next) {
                        currentLast = currentLast->Next;
                    }
                    for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
                        currentLast->Next.store(freePtr, std::memory_order_release);
                        if (FreePtr.compare_exchange_weak(freePtr, current)) {
                            break;
                        }
                    }
                }
                return;
            }
        }
        TryToFreeMemory();
        --DequeueCount;
    }
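
    // The *SingleConsumer variants assume only one thread ever pops: without
    // concurrent dequeuers no other thread can hold a pointer to a popped node,
    // so nodes are deleted immediately instead of going through FreePtr.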
    bool DequeueSingleConsumer(T* res) {
        for (TNode* current = Head.load(std::memory_order_acquire); current;) {
            if (Head.compare_exchange_weak(current, current->Next)) {
                *res = std::move(current->Value);
                delete current; // with single consumer thread ABA does not happen
                return true;
            }
        }
        return false;
    }

    // add all elements to *res
    // elements are returned in order of dequeue (top to bottom; see example in unittest)
    template <typename TCollection>
    void DequeueAllSingleConsumer(TCollection* res) {
        for (TNode* head = Head.load(std::memory_order_acquire); head;) {
            if (Head.compare_exchange_weak(head, nullptr)) {
                for (TNode* x = head; x;) {
                    res->push_back(std::move(x->Value));
                    x = x->Next;
                }
                EraseList(head); // with single consumer thread ABA does not happen
                return;
            }
        }
    }

    bool IsEmpty() {
        return Head.load() == nullptr; // without lock, so result is approximate
    }
};
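
// Usage sketch (illustrative only, not part of the original file). Any container
// with begin()/end() works for EnqueueAll(), and any with push_back() for
// DequeueAll(); std::vector is used here just as an example:
//
//     TLockFreeStack<int> stack;
//     stack.Enqueue(1);
//     stack.Enqueue(2);
//
//     int value;
//     while (stack.Dequeue(&value)) {
//         // values come back LIFO: 2, then 1
//     }
//
//     std::vector<int> batch{3, 4, 5};
//     stack.EnqueueAll(batch);    // one CAS publishes the whole batch
//     std::vector<int> drained;
//     stack.DequeueAll(&drained); // drained == {5, 4, 3}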