#pragma once

#include <util/generic/noncopyable.h>
#include <util/system/atomic.h>

#include <utility> // std::forward, std::move

//////////////////////////////
// Lock-free LIFO stack (Treiber stack) with deferred node reclamation.
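//
// Nodes popped by Dequeue()/DequeueAll() are not deleted immediately: a
// concurrent dequeuer may still be reading them (the classic ABA /
// use-after-free hazard of a lock-free stack). Instead, DequeueCount tracks
// how many threads are inside a dequeue call; nodes are parked on the FreePtr
// list and reclaimed only once the last active dequeuer leaves.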
template <class T>
class TLockFreeStack: TNonCopyable {
    struct TNode {
        T Value;
        TNode* Next;

        TNode() = default;

        template <class U>
        explicit TNode(U&& val)
            : Value(std::forward<U>(val))
            , Next(nullptr)
        {
        }
    };
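
    // Head is the current top of the stack. FreePtr heads the list of popped
    // nodes whose deletion has been deferred. DequeueCount is the number of
    // threads currently inside a dequeue operation.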
    TNode* Head;
    TNode* FreePtr;
    TAtomic DequeueCount;
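
    // Try to delete the deferred free list. This is safe only when the calling
    // thread is the sole active dequeuer (DequeueCount == 1), since no other
    // thread can still hold a pointer into the list.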
    void TryToFreeMemory() {
        TNode* current = AtomicGet(FreePtr);
        if (!current)
            return;
        if (AtomicAdd(DequeueCount, 0) == 1) {
            // `current` is on the free list and we are the only active dequeuer,
            // so try to detach the whole list and delete it
            if (AtomicCas(&FreePtr, (TNode*)nullptr, current))
                EraseList(current);
        }
    }
    void EraseList(TNode* volatile p) {
        while (p) {
            TNode* next = p->Next;
            delete p;
            p = next;
        }
    }
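    // Pushes the pre-linked chain [head .. tail] in one step: point tail->Next
    // at the current Head, then try to swing Head to the new head with a CAS.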
    void EnqueueImpl(TNode* volatile head, TNode* volatile tail) {
        for (;;) {
            tail->Next = AtomicGet(Head);
            if (AtomicCas(&Head, head, tail->Next))
                break;
        }
    }
    template <class U>
    void EnqueueImpl(U&& u) {
        TNode* volatile node = new TNode(std::forward<U>(u));
        EnqueueImpl(node, node);
    }

public:
    TLockFreeStack()
        : Head(nullptr)
        , FreePtr(nullptr)
        , DequeueCount(0)
    {
    }
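    // NOTE: the destructor assumes exclusive access; no other thread may be
    // using the stack while it runs.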
    ~TLockFreeStack() {
        EraseList(Head);
        EraseList(FreePtr);
    }
    void Enqueue(const T& t) {
        EnqueueImpl(t);
    }
    void Enqueue(T&& t) {
        EnqueueImpl(std::move(t));
    }
    template <typename TCollection>
    void EnqueueAll(const TCollection& data) {
        EnqueueAll(data.begin(), data.end());
    }
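    // Links [dataBegin, dataEnd) into a chain off to the side and publishes it
    // with a single CAS. The chain is built in reverse, so the last element of
    // the range ends up on top, just as if the elements were pushed one by one.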
    template <typename TIter>
    void EnqueueAll(TIter dataBegin, TIter dataEnd) {
        if (dataBegin == dataEnd) {
            return;
        }
        TIter i = dataBegin;
        TNode* volatile node = new TNode(*i);
        TNode* volatile tail = node;
        for (++i; i != dataEnd; ++i) {
            TNode* nextNode = node;
            node = new TNode(*i);
            node->Next = nextNode;
        }
        EnqueueImpl(node, tail);
    }
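    // Pops the top element into *res; returns false if the stack is empty.
    // DequeueCount stays elevated for the whole operation so that no other
    // thread reclaims a node this thread may still be reading.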
    bool Dequeue(T* res) {
        AtomicAdd(DequeueCount, 1);
        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
            if (AtomicCas(&Head, AtomicGet(current->Next), current)) {
                *res = std::move(current->Value);
                // delete current; // would reintroduce the ABA problem
                // deletion has to be deferred instead
                TryToFreeMemory();
                if (AtomicAdd(DequeueCount, -1) == 0) {
                    // no other Dequeue()s in flight, safe to reclaim memory
                    delete current;
                } else {
                    // Dequeue()s in progress, push the node onto the free list
                    for (;;) {
                        AtomicSet(current->Next, AtomicGet(FreePtr));
                        if (AtomicCas(&FreePtr, current, current->Next))
                            break;
                    }
                }
                return true;
            }
        }
        TryToFreeMemory();
        AtomicAdd(DequeueCount, -1);
        return false;
    }
    // Moves all elements into *res, in dequeue order (top to bottom; see the
    // example in the unittest).
    template <typename TCollection>
    void DequeueAll(TCollection* res) {
        AtomicAdd(DequeueCount, 1);
        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
            if (AtomicCas(&Head, (TNode*)nullptr, current)) {
                for (TNode* x = current; x;) {
                    res->push_back(std::move(x->Value));
                    x = x->Next;
                }
                // EraseList(current); // would reintroduce the ABA problem
                // deletion has to be deferred instead
                TryToFreeMemory();
                if (AtomicAdd(DequeueCount, -1) == 0) {
                    // no other Dequeue()s in flight, safe to reclaim memory
                    EraseList(current);
                } else {
                    // Dequeue()s in progress, splice the whole chain onto the free list
                    TNode* currentLast = current;
                    while (currentLast->Next) {
                        currentLast = currentLast->Next;
                    }
                    for (;;) {
                        AtomicSet(currentLast->Next, AtomicGet(FreePtr));
                        if (AtomicCas(&FreePtr, current, currentLast->Next))
                            break;
                    }
                }
                return;
            }
        }
        TryToFreeMemory();
        AtomicAdd(DequeueCount, -1);
    }
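    // Single-consumer variants: the caller guarantees that at most one thread
    // ever dequeues. Popped nodes can then be deleted immediately, because ABA
    // would require another dequeuer to free and recycle the node under us.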
    bool DequeueSingleConsumer(T* res) {
        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
            if (AtomicCas(&Head, current->Next, current)) {
                *res = std::move(current->Value);
                delete current; // with a single consumer thread, ABA cannot happen
                return true;
            }
        }
        return false;
    }
    // Moves all elements into *res, in dequeue order (top to bottom; see the
    // example in the unittest).
    template <typename TCollection>
    void DequeueAllSingleConsumer(TCollection* res) {
        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
            if (AtomicCas(&Head, (TNode*)nullptr, current)) {
                for (TNode* x = current; x;) {
                    res->push_back(std::move(x->Value));
                    x = x->Next;
                }
                EraseList(current); // with a single consumer thread, ABA cannot happen
                return;
            }
        }
    }
    bool IsEmpty() {
        AtomicAdd(DequeueCount, 0); // acts as a full memory barrier
        return AtomicGet(Head) == nullptr; // no lock is taken, so the result is only approximate
    }
};
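
// A minimal usage sketch (illustrative, not part of the original header;
// assumes any container with push_back(), e.g. std::vector, for DequeueAll):
//
//     TLockFreeStack<int> stack;
//     stack.Enqueue(1);
//     stack.Enqueue(2); // 2 is now on top (LIFO)
//
//     int value;
//     if (stack.Dequeue(&value)) {
//         // value == 2
//     }
//
//     std::vector<int> rest;
//     stack.DequeueAll(&rest); // rest == {1}, top-to-bottom order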