lfqueue.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. #pragma once
  2. #include "fwd.h"
  3. #include <util/generic/ptr.h>
  4. #include <util/system/atomic.h>
  5. #include <util/system/yassert.h>
  6. #include "lfstack.h"
  7. struct TDefaultLFCounter {
  8. template <class T>
  9. void IncCount(const T& data) {
  10. (void)data;
  11. }
  12. template <class T>
  13. void DecCount(const T& data) {
  14. (void)data;
  15. }
  16. };
// @brief lockfree queue
// @tparam T - the queue element, should be movable
// @tparam TCounter - an observer class used to count the number of items in the queue
// be careful: IncCount and DecCount can be called on a moved-from object and
// it is the TCounter class's responsibility to check the validity of the passed object
  22. template <class T, class TCounter>
  23. class TLockFreeQueue: public TNonCopyable {
  24. struct TListNode {
  25. template <typename U>
  26. TListNode(U&& u, TListNode* next)
  27. : Next(next)
  28. , Data(std::forward<U>(u))
  29. {
  30. }
  31. template <typename U>
  32. explicit TListNode(U&& u)
  33. : Data(std::forward<U>(u))
  34. {
  35. }
  36. TListNode* volatile Next;
  37. T Data;
  38. };
  39. // using inheritance to be able to use 0 bytes for TCounter when we don't need one
  40. struct TRootNode: public TCounter {
  41. TListNode* volatile PushQueue;
  42. TListNode* volatile PopQueue;
  43. TListNode* volatile ToDelete;
  44. TRootNode* volatile NextFree;
  45. TRootNode()
  46. : PushQueue(nullptr)
  47. , PopQueue(nullptr)
  48. , ToDelete(nullptr)
  49. , NextFree(nullptr)
  50. {
  51. }
  52. void CopyCounter(TRootNode* x) {
  53. *(TCounter*)this = *(TCounter*)x;
  54. }
  55. };
  56. static void EraseList(TListNode* n) {
  57. while (n) {
  58. TListNode* keepNext = AtomicGet(n->Next);
  59. delete n;
  60. n = keepNext;
  61. }
  62. }
  63. alignas(64) TRootNode* volatile JobQueue;
  64. alignas(64) volatile TAtomic FreememCounter;
  65. alignas(64) volatile TAtomic FreeingTaskCounter;
  66. alignas(64) TRootNode* volatile FreePtr;
  67. void TryToFreeAsyncMemory() {
  68. TAtomic keepCounter = AtomicAdd(FreeingTaskCounter, 0);
  69. TRootNode* current = AtomicGet(FreePtr);
  70. if (current == nullptr)
  71. return;
  72. if (AtomicAdd(FreememCounter, 0) == 1) {
  73. // we are the last thread, try to cleanup
  74. // check if another thread have cleaned up
  75. if (keepCounter != AtomicAdd(FreeingTaskCounter, 0)) {
  76. return;
  77. }
  78. if (AtomicCas(&FreePtr, (TRootNode*)nullptr, current)) {
  79. // free list
  80. while (current) {
  81. TRootNode* p = AtomicGet(current->NextFree);
  82. EraseList(AtomicGet(current->ToDelete));
  83. delete current;
  84. current = p;
  85. }
  86. AtomicAdd(FreeingTaskCounter, 1);
  87. }
  88. }
  89. }
  90. void AsyncRef() {
  91. AtomicAdd(FreememCounter, 1);
  92. }
  93. void AsyncUnref() {
  94. TryToFreeAsyncMemory();
  95. AtomicAdd(FreememCounter, -1);
  96. }
  97. void AsyncDel(TRootNode* toDelete, TListNode* lst) {
  98. AtomicSet(toDelete->ToDelete, lst);
  99. for (;;) {
  100. AtomicSet(toDelete->NextFree, AtomicGet(FreePtr));
  101. if (AtomicCas(&FreePtr, toDelete, AtomicGet(toDelete->NextFree)))
  102. break;
  103. }
  104. }
  105. void AsyncUnref(TRootNode* toDelete, TListNode* lst) {
  106. TryToFreeAsyncMemory();
  107. if (AtomicAdd(FreememCounter, -1) == 0) {
  108. // no other operations in progress, can safely reclaim memory
  109. EraseList(lst);
  110. delete toDelete;
  111. } else {
  112. // Dequeue()s in progress, put node to free list
  113. AsyncDel(toDelete, lst);
  114. }
  115. }
  116. struct TListInvertor {
  117. TListNode* Copy;
  118. TListNode* Tail;
  119. TListNode* PrevFirst;
  120. TListInvertor()
  121. : Copy(nullptr)
  122. , Tail(nullptr)
  123. , PrevFirst(nullptr)
  124. {
  125. }
  126. ~TListInvertor() {
  127. EraseList(Copy);
  128. }
  129. void CopyWasUsed() {
  130. Copy = nullptr;
  131. Tail = nullptr;
  132. PrevFirst = nullptr;
  133. }
  134. void DoCopy(TListNode* ptr) {
  135. TListNode* newFirst = ptr;
  136. TListNode* newCopy = nullptr;
  137. TListNode* newTail = nullptr;
  138. while (ptr) {
  139. if (ptr == PrevFirst) {
  140. // short cut, we have copied this part already
  141. AtomicSet(Tail->Next, newCopy);
  142. newCopy = Copy;
  143. Copy = nullptr; // do not destroy prev try
  144. if (!newTail)
  145. newTail = Tail; // tried to invert same list
  146. break;
  147. }
  148. TListNode* newElem = new TListNode(ptr->Data, newCopy);
  149. newCopy = newElem;
  150. ptr = AtomicGet(ptr->Next);
  151. if (!newTail)
  152. newTail = newElem;
  153. }
  154. EraseList(Copy); // copy was useless
  155. Copy = newCopy;
  156. PrevFirst = newFirst;
  157. Tail = newTail;
  158. }
  159. };
  160. void EnqueueImpl(TListNode* head, TListNode* tail) {
  161. TRootNode* newRoot = new TRootNode;
  162. AsyncRef();
  163. AtomicSet(newRoot->PushQueue, head);
  164. for (;;) {
  165. TRootNode* curRoot = AtomicGet(JobQueue);
  166. AtomicSet(tail->Next, AtomicGet(curRoot->PushQueue));
  167. AtomicSet(newRoot->PopQueue, AtomicGet(curRoot->PopQueue));
  168. newRoot->CopyCounter(curRoot);
  169. for (TListNode* node = head;; node = AtomicGet(node->Next)) {
  170. newRoot->IncCount(node->Data);
  171. if (node == tail)
  172. break;
  173. }
  174. if (AtomicCas(&JobQueue, newRoot, curRoot)) {
  175. AsyncUnref(curRoot, nullptr);
  176. break;
  177. }
  178. }
  179. }
  180. template <typename TCollection>
  181. static void FillCollection(TListNode* lst, TCollection* res) {
  182. while (lst) {
  183. res->emplace_back(std::move(lst->Data));
  184. lst = AtomicGet(lst->Next);
  185. }
  186. }
  187. /** Traverses a given list simultaneously creating its inversed version.
  188. * After that, fills a collection with a reversed version and returns the last visited lst's node.
  189. */
  190. template <typename TCollection>
  191. static TListNode* FillCollectionReverse(TListNode* lst, TCollection* res) {
  192. if (!lst) {
  193. return nullptr;
  194. }
  195. TListNode* newCopy = nullptr;
  196. do {
  197. TListNode* newElem = new TListNode(std::move(lst->Data), newCopy);
  198. newCopy = newElem;
  199. lst = AtomicGet(lst->Next);
  200. } while (lst);
  201. FillCollection(newCopy, res);
  202. EraseList(newCopy);
  203. return lst;
  204. }
  205. public:
  206. TLockFreeQueue()
  207. : JobQueue(new TRootNode)
  208. , FreememCounter(0)
  209. , FreeingTaskCounter(0)
  210. , FreePtr(nullptr)
  211. {
  212. }
  213. ~TLockFreeQueue() {
  214. AsyncRef();
  215. AsyncUnref(); // should free FreeList
  216. EraseList(JobQueue->PushQueue);
  217. EraseList(JobQueue->PopQueue);
  218. delete JobQueue;
  219. }
  220. template <typename U>
  221. void Enqueue(U&& data) {
  222. TListNode* newNode = new TListNode(std::forward<U>(data));
  223. EnqueueImpl(newNode, newNode);
  224. }
  225. void Enqueue(T&& data) {
  226. TListNode* newNode = new TListNode(std::move(data));
  227. EnqueueImpl(newNode, newNode);
  228. }
  229. void Enqueue(const T& data) {
  230. TListNode* newNode = new TListNode(data);
  231. EnqueueImpl(newNode, newNode);
  232. }
  233. template <typename TCollection>
  234. void EnqueueAll(const TCollection& data) {
  235. EnqueueAll(data.begin(), data.end());
  236. }
  237. template <typename TIter>
  238. void EnqueueAll(TIter dataBegin, TIter dataEnd) {
  239. if (dataBegin == dataEnd)
  240. return;
  241. TIter i = dataBegin;
  242. TListNode* volatile node = new TListNode(*i);
  243. TListNode* volatile tail = node;
  244. for (++i; i != dataEnd; ++i) {
  245. TListNode* nextNode = node;
  246. node = new TListNode(*i, nextNode);
  247. }
  248. EnqueueImpl(node, tail);
  249. }
  250. bool Dequeue(T* data) {
  251. TRootNode* newRoot = nullptr;
  252. TListInvertor listInvertor;
  253. AsyncRef();
  254. for (;;) {
  255. TRootNode* curRoot = AtomicGet(JobQueue);
  256. TListNode* tail = AtomicGet(curRoot->PopQueue);
  257. if (tail) {
  258. // has elems to pop
  259. if (!newRoot)
  260. newRoot = new TRootNode;
  261. AtomicSet(newRoot->PushQueue, AtomicGet(curRoot->PushQueue));
  262. AtomicSet(newRoot->PopQueue, AtomicGet(tail->Next));
  263. newRoot->CopyCounter(curRoot);
  264. newRoot->DecCount(tail->Data);
  265. Y_ASSERT(AtomicGet(curRoot->PopQueue) == tail);
  266. if (AtomicCas(&JobQueue, newRoot, curRoot)) {
  267. *data = std::move(tail->Data);
  268. AtomicSet(tail->Next, nullptr);
  269. AsyncUnref(curRoot, tail);
  270. return true;
  271. }
  272. continue;
  273. }
  274. if (AtomicGet(curRoot->PushQueue) == nullptr) {
  275. delete newRoot;
  276. AsyncUnref();
  277. return false; // no elems to pop
  278. }
  279. if (!newRoot)
  280. newRoot = new TRootNode;
  281. AtomicSet(newRoot->PushQueue, nullptr);
  282. listInvertor.DoCopy(AtomicGet(curRoot->PushQueue));
  283. AtomicSet(newRoot->PopQueue, listInvertor.Copy);
  284. newRoot->CopyCounter(curRoot);
  285. Y_ASSERT(AtomicGet(curRoot->PopQueue) == nullptr);
  286. if (AtomicCas(&JobQueue, newRoot, curRoot)) {
  287. newRoot = nullptr;
  288. listInvertor.CopyWasUsed();
  289. AsyncDel(curRoot, AtomicGet(curRoot->PushQueue));
  290. } else {
  291. AtomicSet(newRoot->PopQueue, nullptr);
  292. }
  293. }
  294. }
  295. template <typename TCollection>
  296. void DequeueAll(TCollection* res) {
  297. AsyncRef();
  298. TRootNode* newRoot = new TRootNode;
  299. TRootNode* curRoot;
  300. do {
  301. curRoot = AtomicGet(JobQueue);
  302. } while (!AtomicCas(&JobQueue, newRoot, curRoot));
  303. FillCollection(curRoot->PopQueue, res);
  304. TListNode* toDeleteHead = curRoot->PushQueue;
  305. TListNode* toDeleteTail = FillCollectionReverse(curRoot->PushQueue, res);
  306. AtomicSet(curRoot->PushQueue, nullptr);
  307. if (toDeleteTail) {
  308. toDeleteTail->Next = curRoot->PopQueue;
  309. } else {
  310. toDeleteTail = curRoot->PopQueue;
  311. }
  312. AtomicSet(curRoot->PopQueue, nullptr);
  313. AsyncUnref(curRoot, toDeleteHead);
  314. }
  315. bool IsEmpty() {
  316. AsyncRef();
  317. TRootNode* curRoot = AtomicGet(JobQueue);
  318. bool res = AtomicGet(curRoot->PushQueue) == nullptr && AtomicGet(curRoot->PopQueue) == nullptr;
  319. AsyncUnref();
  320. return res;
  321. }
  322. TCounter GetCounter() {
  323. AsyncRef();
  324. TRootNode* curRoot = AtomicGet(JobQueue);
  325. TCounter res = *(TCounter*)curRoot;
  326. AsyncUnref();
  327. return res;
  328. }
  329. };
  330. template <class T, class TCounter>
  331. class TAutoLockFreeQueue {
  332. public:
  333. using TRef = THolder<T>;
  334. inline ~TAutoLockFreeQueue() {
  335. TRef tmp;
  336. while (Dequeue(&tmp)) {
  337. }
  338. }
  339. inline bool Dequeue(TRef* t) {
  340. T* res = nullptr;
  341. if (Queue.Dequeue(&res)) {
  342. t->Reset(res);
  343. return true;
  344. }
  345. return false;
  346. }
  347. inline void Enqueue(TRef& t) {
  348. Queue.Enqueue(t.Get());
  349. Y_UNUSED(t.Release());
  350. }
  351. inline void Enqueue(TRef&& t) {
  352. Queue.Enqueue(t.Get());
  353. Y_UNUSED(t.Release());
  354. }
  355. inline bool IsEmpty() {
  356. return Queue.IsEmpty();
  357. }
  358. inline TCounter GetCounter() {
  359. return Queue.GetCounter();
  360. }
  361. private:
  362. TLockFreeQueue<T*, TCounter> Queue;
  363. };