
#pragma once

#include "fwd.h"
#include "lfstack.h"

#include <util/generic/ptr.h>
#include <util/system/yassert.h>

#include <atomic>

struct TDefaultLFCounter {
    template <class T>
    void IncCount(const T& data) {
        (void)data;
    }
    template <class T>
    void DecCount(const T& data) {
        (void)data;
    }
};
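// A minimal sketch of a custom TCounter (illustrative, not part of this
// header; the name TItemCounter is hypothetical). The queue copies the
// counter between root nodes by plain assignment, so a trivially copyable
// field is enough:
//
//     struct TItemCounter {
//         long Count = 0;
//         template <class T>
//         void IncCount(const T&) {
//             ++Count;
//         }
//         template <class T>
//         void DecCount(const T&) {
//             --Count;
//         }
//     };
//
// TLockFreeQueue<T, TItemCounter>::GetCounter().Count then gives an
// approximate queue size.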
// @brief lockfree queue
// @tparam T - the queue element, should be movable
// @tparam TCounter - an observer class used to count the number of items in the queue.
//                    Be careful: IncCount and DecCount can be called on a moved-from object,
//                    and it is the TCounter class's responsibility to check the validity of
//                    the passed object.
template <class T, class TCounter>
class TLockFreeQueue: public TNonCopyable {
    struct TListNode {
        template <typename U>
        TListNode(U&& u, TListNode* next)
            : Next(next)
            , Data(std::forward<U>(u))
        {
        }

        template <typename U>
        explicit TListNode(U&& u)
            : Data(std::forward<U>(u))
        {
        }

        std::atomic<TListNode*> Next;
        T Data;
    };

    // using inheritance to be able to use 0 bytes for TCounter when we don't need one
    struct TRootNode: public TCounter {
        std::atomic<TListNode*> PushQueue = nullptr;
        std::atomic<TListNode*> PopQueue = nullptr;
        std::atomic<TListNode*> ToDelete = nullptr;
        std::atomic<TRootNode*> NextFree = nullptr;

        void CopyCounter(TRootNode* x) {
            *(TCounter*)this = *(TCounter*)x;
        }
    };

    static void EraseList(TListNode* n) {
        while (n) {
            TListNode* keepNext = n->Next.load(std::memory_order_acquire);
            delete n;
            n = keepNext;
        }
    }
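    // Each hot atomic is placed on its own cache line (alignas(64)) to avoid
    // false sharing between producers, consumers and the reclamation path.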
    alignas(64) std::atomic<TRootNode*> JobQueue;
    alignas(64) std::atomic<size_t> FreememCounter;
    alignas(64) std::atomic<size_t> FreeingTaskCounter;
    alignas(64) std::atomic<TRootNode*> FreePtr;
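    // Memory reclamation scheme: every operation brackets its access to
    // JobQueue with AsyncRef()/AsyncUnref(). Retired root nodes are pushed
    // onto the FreePtr list and are actually deleted only when the last
    // thread still inside an operation drains that list.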
    void TryToFreeAsyncMemory() {
        const auto keepCounter = FreeingTaskCounter.load();
        TRootNode* current = FreePtr.load(std::memory_order_acquire);
        if (current == nullptr)
            return;
        if (FreememCounter.load() == 1) {
            // we are the last thread, try to clean up
            // check whether another thread has already cleaned up
            if (keepCounter != FreeingTaskCounter.load()) {
                return;
            }
            if (FreePtr.compare_exchange_strong(current, nullptr)) {
                // free the list
                while (current) {
                    TRootNode* p = current->NextFree.load(std::memory_order_acquire);
                    EraseList(current->ToDelete.load(std::memory_order_acquire));
                    delete current;
                    current = p;
                }
                ++FreeingTaskCounter;
            }
        }
    }
    void AsyncRef() {
        ++FreememCounter;
    }

    void AsyncUnref() {
        TryToFreeAsyncMemory();
        --FreememCounter;
    }

    void AsyncDel(TRootNode* toDelete, TListNode* lst) {
        toDelete->ToDelete.store(lst, std::memory_order_release);
        for (auto freePtr = FreePtr.load();;) {
            toDelete->NextFree.store(freePtr, std::memory_order_release);
            if (FreePtr.compare_exchange_weak(freePtr, toDelete))
                break;
        }
    }

    void AsyncUnref(TRootNode* toDelete, TListNode* lst) {
        TryToFreeAsyncMemory();
        if (--FreememCounter == 0) {
            // no other operations in progress, can safely reclaim memory
            EraseList(lst);
            delete toDelete;
        } else {
            // Dequeue()s in progress, put node to free list
            AsyncDel(toDelete, lst);
        }
    }
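    // Freshly pushed nodes accumulate in PushQueue in LIFO order; Dequeue()
    // restores FIFO order by copying PushQueue in reverse. PrevFirst memoizes
    // the head of the previously inverted list, so a CAS retry only copies
    // nodes pushed since the last attempt.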
    struct TListInvertor {
        TListNode* Copy;
        TListNode* Tail;
        TListNode* PrevFirst;

        TListInvertor()
            : Copy(nullptr)
            , Tail(nullptr)
            , PrevFirst(nullptr)
        {
        }

        ~TListInvertor() {
            EraseList(Copy);
        }

        void CopyWasUsed() {
            Copy = nullptr;
            Tail = nullptr;
            PrevFirst = nullptr;
        }

        void DoCopy(TListNode* ptr) {
            TListNode* newFirst = ptr;
            TListNode* newCopy = nullptr;
            TListNode* newTail = nullptr;
            while (ptr) {
                if (ptr == PrevFirst) {
                    // short cut, we have copied this part already
                    Tail->Next.store(newCopy, std::memory_order_release);
                    newCopy = Copy;
                    Copy = nullptr; // do not destroy prev try
                    if (!newTail)
                        newTail = Tail; // tried to invert same list
                    break;
                }
                TListNode* newElem = new TListNode(ptr->Data, newCopy);
                newCopy = newElem;
                ptr = ptr->Next.load(std::memory_order_acquire);
                if (!newTail)
                    newTail = newElem;
            }
            EraseList(Copy); // copy was useless
            Copy = newCopy;
            PrevFirst = newFirst;
            Tail = newTail;
        }
    };
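    // Publishes the chain [head..tail] at the front of PushQueue with a single
    // CAS on JobQueue, rebuilding the links and the counter on every retry.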
    void EnqueueImpl(TListNode* head, TListNode* tail) {
        TRootNode* newRoot = new TRootNode;
        AsyncRef();
        newRoot->PushQueue.store(head, std::memory_order_release);
        for (TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);;) {
            tail->Next.store(curRoot->PushQueue.load(std::memory_order_acquire), std::memory_order_release);
            newRoot->PopQueue.store(curRoot->PopQueue.load(std::memory_order_acquire), std::memory_order_release);
            newRoot->CopyCounter(curRoot);
            for (TListNode* node = head;; node = node->Next.load(std::memory_order_acquire)) {
                newRoot->IncCount(node->Data);
                if (node == tail)
                    break;
            }
            if (JobQueue.compare_exchange_weak(curRoot, newRoot)) {
                AsyncUnref(curRoot, nullptr);
                break;
            }
        }
    }

    template <typename TCollection>
    static void FillCollection(TListNode* lst, TCollection* res) {
        while (lst) {
            res->emplace_back(std::move(lst->Data));
            lst = lst->Next.load(std::memory_order_acquire);
        }
    }
    /** Traverses a given list, simultaneously creating its inverted version.
     * After that, fills a collection with the inverted version and returns the
     * last visited node of lst.
     */
    template <typename TCollection>
    static TListNode* FillCollectionReverse(TListNode* lst, TCollection* res) {
        if (!lst) {
            return nullptr;
        }

        TListNode* newCopy = nullptr;
        TListNode* lastVisited = nullptr;
        do {
            TListNode* newElem = new TListNode(std::move(lst->Data), newCopy);
            newCopy = newElem;
            lastVisited = lst;
            lst = lst->Next.load(std::memory_order_acquire);
        } while (lst);

        FillCollection(newCopy, res);
        EraseList(newCopy);

        // returning lst here would always yield nullptr; the caller needs the
        // tail of the original list so it can chain it for deletion
        return lastVisited;
    }
public:
    TLockFreeQueue()
        : JobQueue(new TRootNode)
        , FreememCounter(0)
        , FreeingTaskCounter(0)
        , FreePtr(nullptr)
    {
    }

    ~TLockFreeQueue() {
        AsyncRef();
        AsyncUnref(); // should free FreeList
        EraseList(JobQueue.load(std::memory_order_relaxed)->PushQueue.load(std::memory_order_relaxed));
        EraseList(JobQueue.load(std::memory_order_relaxed)->PopQueue.load(std::memory_order_relaxed));
        delete JobQueue;
    }

    template <typename U>
    void Enqueue(U&& data) {
        TListNode* newNode = new TListNode(std::forward<U>(data));
        EnqueueImpl(newNode, newNode);
    }

    void Enqueue(T&& data) {
        TListNode* newNode = new TListNode(std::move(data));
        EnqueueImpl(newNode, newNode);
    }

    void Enqueue(const T& data) {
        TListNode* newNode = new TListNode(data);
        EnqueueImpl(newNode, newNode);
    }
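    // Enqueues a whole batch with a single root CAS; the chain is linked
    // back-to-front so the elements dequeue in iteration order.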
    template <typename TCollection>
    void EnqueueAll(const TCollection& data) {
        EnqueueAll(data.begin(), data.end());
    }

    template <typename TIter>
    void EnqueueAll(TIter dataBegin, TIter dataEnd) {
        if (dataBegin == dataEnd)
            return;

        TIter i = dataBegin;
        TListNode* node = new TListNode(*i);
        TListNode* tail = node;

        for (++i; i != dataEnd; ++i) {
            TListNode* nextNode = node;
            node = new TListNode(*i, nextNode);
        }
        EnqueueImpl(node, tail);
    }
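    // Pops a single element. Fast path: detach the head of PopQueue with a CAS
    // on the root. Slow path: PopQueue is empty, so invert PushQueue into a
    // fresh PopQueue and retry; the retired root and its old push list are
    // reclaimed asynchronously.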
    bool Dequeue(T* data) {
        TRootNode* newRoot = nullptr;
        TListInvertor listInvertor;
        AsyncRef();
        for (TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);;) {
            TListNode* tail = curRoot->PopQueue.load(std::memory_order_acquire);
            if (tail) {
                // has elems to pop
                if (!newRoot)
                    newRoot = new TRootNode;

                newRoot->PushQueue.store(curRoot->PushQueue.load(std::memory_order_acquire), std::memory_order_release);
                newRoot->PopQueue.store(tail->Next.load(std::memory_order_acquire), std::memory_order_release);
                newRoot->CopyCounter(curRoot);
                newRoot->DecCount(tail->Data);
                Y_ASSERT(curRoot->PopQueue.load() == tail);
                if (JobQueue.compare_exchange_weak(curRoot, newRoot)) {
                    *data = std::move(tail->Data);
                    tail->Next.store(nullptr, std::memory_order_release);
                    AsyncUnref(curRoot, tail);
                    return true;
                }
                continue;
            }
            if (curRoot->PushQueue.load(std::memory_order_acquire) == nullptr) {
                delete newRoot;
                AsyncUnref();
                return false; // no elems to pop
            }

            if (!newRoot)
                newRoot = new TRootNode;
            newRoot->PushQueue.store(nullptr, std::memory_order_release);
            listInvertor.DoCopy(curRoot->PushQueue.load(std::memory_order_acquire));
            newRoot->PopQueue.store(listInvertor.Copy, std::memory_order_release);
            newRoot->CopyCounter(curRoot);
            Y_ASSERT(curRoot->PopQueue.load() == nullptr);
            if (JobQueue.compare_exchange_weak(curRoot, newRoot)) {
                AsyncDel(curRoot, curRoot->PushQueue.load(std::memory_order_acquire));
                curRoot = newRoot;
                newRoot = nullptr;
                listInvertor.CopyWasUsed();
            } else {
                newRoot->PopQueue.store(nullptr, std::memory_order_release);
            }
        }
    }
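    // Atomically detaches both lists by installing an empty root, then emits
    // PopQueue followed by the reversed PushQueue, preserving FIFO order.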
    template <typename TCollection>
    void DequeueAll(TCollection* res) {
        AsyncRef();

        TRootNode* newRoot = new TRootNode;
        TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);
        do {
        } while (!JobQueue.compare_exchange_weak(curRoot, newRoot));

        FillCollection(curRoot->PopQueue, res);

        TListNode* toDeleteHead = curRoot->PushQueue;
        TListNode* toDeleteTail = FillCollectionReverse(curRoot->PushQueue, res);
        curRoot->PushQueue.store(nullptr, std::memory_order_release);

        if (toDeleteTail) {
            // chain the old pop list after the push list so both are reclaimed together
            toDeleteTail->Next.store(curRoot->PopQueue.load());
        } else {
            // the push list was empty, so the delete chain is just the pop list
            toDeleteHead = curRoot->PopQueue;
        }
        curRoot->PopQueue.store(nullptr, std::memory_order_release);

        AsyncUnref(curRoot, toDeleteHead);
    }
    bool IsEmpty() {
        AsyncRef();
        TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);
        bool res = curRoot->PushQueue.load(std::memory_order_acquire) == nullptr &&
                   curRoot->PopQueue.load(std::memory_order_acquire) == nullptr;
        AsyncUnref();
        return res;
    }

    TCounter GetCounter() {
        AsyncRef();
        TRootNode* curRoot = JobQueue.load(std::memory_order_acquire);
        TCounter res = *(TCounter*)curRoot;
        AsyncUnref();
        return res;
    }
};
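// Usage sketch (illustrative, assuming fwd.h declares TDefaultLFCounter as
// the default TCounter): any number of threads may call Enqueue() and
// Dequeue() concurrently.
//
//     TLockFreeQueue<int> queue;
//     queue.Enqueue(1);
//     queue.EnqueueAll(std::vector<int>{2, 3, 4});
//
//     int value;
//     while (queue.Dequeue(&value)) {
//         // values arrive in FIFO order: 1, 2, 3, 4
//     }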
template <class T, class TCounter>
class TAutoLockFreeQueue {
public:
    using TRef = THolder<T>;

    inline ~TAutoLockFreeQueue() {
        TRef tmp;
        while (Dequeue(&tmp)) {
        }
    }

    inline bool Dequeue(TRef* t) {
        T* res = nullptr;
        if (Queue.Dequeue(&res)) {
            t->Reset(res);
            return true;
        }
        return false;
    }

    inline void Enqueue(TRef& t) {
        Queue.Enqueue(t.Get());
        Y_UNUSED(t.Release());
    }

    inline void Enqueue(TRef&& t) {
        Queue.Enqueue(t.Get());
        Y_UNUSED(t.Release());
    }

    inline bool IsEmpty() {
        return Queue.IsEmpty();
    }

    inline TCounter GetCounter() {
        return Queue.GetCounter();
    }

private:
    TLockFreeQueue<T*, TCounter> Queue;
};
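// Usage sketch (illustrative): TAutoLockFreeQueue passes ownership of
// heap-allocated elements through the queue via THolder; the element type
// below is hypothetical.
//
//     TAutoLockFreeQueue<TString> queue;
//     queue.Enqueue(MakeHolder<TString>("job"));
//
//     THolder<TString> job;
//     if (queue.Dequeue(&job)) {
//         // ownership transferred: job now owns the dequeued string
//     }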