#pragma once /* http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue Simple semi-wait-free queue. Many producers - one consumer. Tracking of allocated memory is not required. No CAS. Only atomic swap (exchange) operations. WARNING: a sleeping producer can stop progress for consumer. WARNING: there is no wait¬ify mechanic for consumer, consumer receives nullptr if queue was empty. WARNING: the algorithm itself is lock-free but producers and consumer could be blocked by memory allocator Reference design: rtmapreduce/libs/threading/lfqueue.h */ #include #include #include #include "tune.h" namespace NThreading { namespace NHTSwapPrivate { template struct TNode : public TTuneup::TNodeBase, public TTuneup::template TNodeLayout, T> { TNode(const T& item) { this->Next = nullptr; this->Item = item; } TNode(T&& item) { this->Next = nullptr; this->Item = std::move(item); } }; struct TDefaultTuneup { struct TNodeBase: private TNonCopyable { }; template struct TNodeLayout { TNode* Next; T Item; }; template struct TQueueLayout { TNode* Head; TNode* Tail; }; }; template class THTSwapQueueImpl : protected TTuneup::template TQueueLayout> { protected: using TTunedNode = TNode; public: using TItem = T; THTSwapQueueImpl() { this->Head = new TTunedNode(T()); this->Tail = this->Head; } ~THTSwapQueueImpl() { TTunedNode* node = this->Head; while (node != nullptr) { TTunedNode* next = node->Next; delete node; node = next; } } template void Push(TT&& item) { Enqueue(new TTunedNode(std::forward(item))); } T Peek() { TTunedNode* next = AtomicGet(this->Head->Next); if (next == nullptr) { return T(); } return next->Item; } void Enqueue(TTunedNode* node) { // our goal is to avoid expensive CAS here, // but now consumer will be blocked until new tail linked. // fortunately 'window of inconsistency' is extremely small. TTunedNode* prev = AtomicSwap(&this->Tail, node); AtomicSet(prev->Next, node); } T Pop() { TTunedNode* next = AtomicGet(this->Head->Next); if (next == nullptr) { return nullptr; } auto item = std::move(next->Item); std::swap(this->Head, next); // no need atomic here delete next; return item; } bool IsEmpty() const { TTunedNode* next = AtomicGet(this->Head->Next); return (next == nullptr); } }; } DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase); DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout); DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout); template class THTSwapQueue : public NHTSwapPrivate::THTSwapQueueImpl> { }; }