123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- #pragma once
- #include <util/generic/noncopyable.h>
- #include <util/system/atomic.h>
- //////////////////////////////
- // lock free lifo stack
- template <class T>
- class TLockFreeStack: TNonCopyable {
- struct TNode {
- T Value;
- TNode* Next;
- TNode() = default;
- template <class U>
- explicit TNode(U&& val)
- : Value(std::forward<U>(val))
- , Next(nullptr)
- {
- }
- };
- TNode* Head;
- TNode* FreePtr;
- TAtomic DequeueCount;
- void TryToFreeMemory() {
- TNode* current = AtomicGet(FreePtr);
- if (!current)
- return;
- if (AtomicAdd(DequeueCount, 0) == 1) {
- // node current is in free list, we are the last thread so try to cleanup
- if (AtomicCas(&FreePtr, (TNode*)nullptr, current))
- EraseList(current);
- }
- }
- void EraseList(TNode* volatile p) {
- while (p) {
- TNode* next = p->Next;
- delete p;
- p = next;
- }
- }
- void EnqueueImpl(TNode* volatile head, TNode* volatile tail) {
- for (;;) {
- tail->Next = AtomicGet(Head);
- if (AtomicCas(&Head, head, tail->Next))
- break;
- }
- }
- template <class U>
- void EnqueueImpl(U&& u) {
- TNode* volatile node = new TNode(std::forward<U>(u));
- EnqueueImpl(node, node);
- }
- public:
- TLockFreeStack()
- : Head(nullptr)
- , FreePtr(nullptr)
- , DequeueCount(0)
- {
- }
- ~TLockFreeStack() {
- EraseList(Head);
- EraseList(FreePtr);
- }
- void Enqueue(const T& t) {
- EnqueueImpl(t);
- }
- void Enqueue(T&& t) {
- EnqueueImpl(std::move(t));
- }
- template <typename TCollection>
- void EnqueueAll(const TCollection& data) {
- EnqueueAll(data.begin(), data.end());
- }
- template <typename TIter>
- void EnqueueAll(TIter dataBegin, TIter dataEnd) {
- if (dataBegin == dataEnd) {
- return;
- }
- TIter i = dataBegin;
- TNode* volatile node = new TNode(*i);
- TNode* volatile tail = node;
- for (++i; i != dataEnd; ++i) {
- TNode* nextNode = node;
- node = new TNode(*i);
- node->Next = nextNode;
- }
- EnqueueImpl(node, tail);
- }
- bool Dequeue(T* res) {
- AtomicAdd(DequeueCount, 1);
- for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
- if (AtomicCas(&Head, AtomicGet(current->Next), current)) {
- *res = std::move(current->Value);
- // delete current; // ABA problem
- // even more complex node deletion
- TryToFreeMemory();
- if (AtomicAdd(DequeueCount, -1) == 0) {
- // no other Dequeue()s, can safely reclaim memory
- delete current;
- } else {
- // Dequeue()s in progress, put node to free list
- for (;;) {
- AtomicSet(current->Next, AtomicGet(FreePtr));
- if (AtomicCas(&FreePtr, current, current->Next))
- break;
- }
- }
- return true;
- }
- }
- TryToFreeMemory();
- AtomicAdd(DequeueCount, -1);
- return false;
- }
- // add all elements to *res
- // elements are returned in order of dequeue (top to bottom; see example in unittest)
- template <typename TCollection>
- void DequeueAll(TCollection* res) {
- AtomicAdd(DequeueCount, 1);
- for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
- if (AtomicCas(&Head, (TNode*)nullptr, current)) {
- for (TNode* x = current; x;) {
- res->push_back(std::move(x->Value));
- x = x->Next;
- }
- // EraseList(current); // ABA problem
- // even more complex node deletion
- TryToFreeMemory();
- if (AtomicAdd(DequeueCount, -1) == 0) {
- // no other Dequeue()s, can safely reclaim memory
- EraseList(current);
- } else {
- // Dequeue()s in progress, add nodes list to free list
- TNode* currentLast = current;
- while (currentLast->Next) {
- currentLast = currentLast->Next;
- }
- for (;;) {
- AtomicSet(currentLast->Next, AtomicGet(FreePtr));
- if (AtomicCas(&FreePtr, current, currentLast->Next))
- break;
- }
- }
- return;
- }
- }
- TryToFreeMemory();
- AtomicAdd(DequeueCount, -1);
- }
- bool DequeueSingleConsumer(T* res) {
- for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
- if (AtomicCas(&Head, current->Next, current)) {
- *res = std::move(current->Value);
- delete current; // with single consumer thread ABA does not happen
- return true;
- }
- }
- return false;
- }
- // add all elements to *res
- // elements are returned in order of dequeue (top to bottom; see example in unittest)
- template <typename TCollection>
- void DequeueAllSingleConsumer(TCollection* res) {
- for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
- if (AtomicCas(&Head, (TNode*)nullptr, current)) {
- for (TNode* x = current; x;) {
- res->push_back(std::move(x->Value));
- x = x->Next;
- }
- EraseList(current); // with single consumer thread ABA does not happen
- return;
- }
- }
- }
- bool IsEmpty() {
- AtomicAdd(DequeueCount, 0); // mem barrier
- return AtomicGet(Head) == nullptr; // without lock, so result is approximate
- }
- };
|