123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- #pragma once
- #include <util/generic/noncopyable.h>
- #include <atomic>
- #include <cstddef>
- #include <utility>
- //////////////////////////////
- // lock free lifo stack
- template <class T>
- class TLockFreeStack: TNonCopyable {
- struct TNode {
- T Value;
- std::atomic<TNode*> Next;
- TNode() = default;
- template <class U>
- explicit TNode(U&& val)
- : Value(std::forward<U>(val))
- , Next(nullptr)
- {
- }
- };
- std::atomic<TNode*> Head = nullptr;
- std::atomic<TNode*> FreePtr = nullptr;
- std::atomic<size_t> DequeueCount = 0;
- void TryToFreeMemory() {
- TNode* current = FreePtr.load(std::memory_order_acquire);
- if (!current) {
- return;
- }
- if (DequeueCount.load() == 1) {
- // node current is in free list, we are the last thread so try to cleanup
- if (FreePtr.compare_exchange_strong(current, nullptr)) {
- EraseList(current);
- }
- }
- }
- void EraseList(TNode* p) {
- while (p) {
- TNode* next = p->Next;
- delete p;
- p = next;
- }
- }
- void EnqueueImpl(TNode* head, TNode* tail) {
- auto headValue = Head.load(std::memory_order_acquire);
- for (;;) {
- tail->Next.store(headValue, std::memory_order_release);
- // NB. See https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
- // The weak forms (1-2) of the functions are allowed to fail spuriously, that is,
- // act as if *this != expected even if they are equal.
- // When a compare-and-exchange is in a loop, the weak version will yield better
- // performance on some platforms.
- if (Head.compare_exchange_weak(headValue, head)) {
- break;
- }
- }
- }
- template <class U>
- void EnqueueImpl(U&& u) {
- TNode* node = new TNode(std::forward<U>(u));
- EnqueueImpl(node, node);
- }
- public:
- TLockFreeStack() = default;
- ~TLockFreeStack() {
- EraseList(Head.load());
- EraseList(FreePtr.load());
- }
- void Enqueue(const T& t) {
- EnqueueImpl(t);
- }
- void Enqueue(T&& t) {
- EnqueueImpl(std::move(t));
- }
- template <typename TCollection>
- void EnqueueAll(const TCollection& data) {
- EnqueueAll(data.begin(), data.end());
- }
- template <typename TIter>
- void EnqueueAll(TIter dataBegin, TIter dataEnd) {
- if (dataBegin == dataEnd) {
- return;
- }
- TIter i = dataBegin;
- TNode* node = new TNode(*i);
- TNode* tail = node;
- for (++i; i != dataEnd; ++i) {
- TNode* nextNode = node;
- node = new TNode(*i);
- node->Next.store(nextNode, std::memory_order_release);
- }
- EnqueueImpl(node, tail);
- }
- bool Dequeue(T* res) {
- ++DequeueCount;
- for (TNode* current = Head.load(std::memory_order_acquire); current;) {
- if (Head.compare_exchange_weak(current, current->Next.load(std::memory_order_acquire))) {
- *res = std::move(current->Value);
- // delete current; // ABA problem
- // even more complex node deletion
- TryToFreeMemory();
- if (--DequeueCount == 0) {
- // no other Dequeue()s, can safely reclaim memory
- delete current;
- } else {
- // Dequeue()s in progress, put node to free list
- for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
- current->Next.store(freePtr, std::memory_order_release);
- if (FreePtr.compare_exchange_weak(freePtr, current)) {
- break;
- }
- }
- }
- return true;
- }
- }
- TryToFreeMemory();
- --DequeueCount;
- return false;
- }
- // add all elements to *res
- // elements are returned in order of dequeue (top to bottom; see example in unittest)
- template <typename TCollection>
- void DequeueAll(TCollection* res) {
- ++DequeueCount;
- for (TNode* current = Head.load(std::memory_order_acquire); current;) {
- if (Head.compare_exchange_weak(current, nullptr)) {
- for (TNode* x = current; x;) {
- res->push_back(std::move(x->Value));
- x = x->Next;
- }
- // EraseList(current); // ABA problem
- // even more complex node deletion
- TryToFreeMemory();
- if (--DequeueCount == 0) {
- // no other Dequeue()s, can safely reclaim memory
- EraseList(current);
- } else {
- // Dequeue()s in progress, add nodes list to free list
- TNode* currentLast = current;
- while (currentLast->Next) {
- currentLast = currentLast->Next;
- }
- for (TNode* freePtr = FreePtr.load(std::memory_order_acquire);;) {
- currentLast->Next.store(freePtr, std::memory_order_release);
- if (FreePtr.compare_exchange_weak(freePtr, current)) {
- break;
- }
- }
- }
- return;
- }
- }
- TryToFreeMemory();
- --DequeueCount;
- }
- bool DequeueSingleConsumer(T* res) {
- for (TNode* current = Head.load(std::memory_order_acquire); current;) {
- if (Head.compare_exchange_weak(current, current->Next)) {
- *res = std::move(current->Value);
- delete current; // with single consumer thread ABA does not happen
- return true;
- }
- }
- return false;
- }
- // add all elements to *res
- // elements are returned in order of dequeue (top to bottom; see example in unittest)
- template <typename TCollection>
- void DequeueAllSingleConsumer(TCollection* res) {
- for (TNode* head = Head.load(std::memory_order_acquire); head;) {
- if (Head.compare_exchange_weak(head, nullptr)) {
- for (TNode* x = head; x;) {
- res->push_back(std::move(x->Value));
- x = x->Next;
- }
- EraseList(head); // with single consumer thread ABA does not happen
- return;
- }
- }
- }
- bool IsEmpty() {
- return Head.load() == nullptr; // without lock, so result is approximate
- }
- };
|