123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- #pragma once
- /*
- http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
- Simple semi-wait-free queue. Many producers - one consumer.
- Tracking of allocated memory is not required.
- No CAS. Only atomic swap (exchange) operations.
- WARNING: a sleeping producer can stop progress for consumer.
- WARNING: there is no wait¬ify mechanic for consumer,
- consumer receives nullptr if queue was empty.
- WARNING: the algorithm itself is lock-free
- but producers and consumer could be blocked by memory allocator
- Reference design: rtmapreduce/libs/threading/lfqueue.h
- */
- #include <util/generic/noncopyable.h>
- #include <util/system/types.h>
- #include <library/cpp/deprecated/atomic/atomic.h>
- #include "tune.h"
- namespace NThreading {
- namespace NHTSwapPrivate {
- template <typename T, typename TTuneup>
- struct TNode
- : public TTuneup::TNodeBase,
- public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> {
- TNode(const T& item) {
- this->Next = nullptr;
- this->Item = item;
- }
- TNode(T&& item) {
- this->Next = nullptr;
- this->Item = std::move(item);
- }
- };
- struct TDefaultTuneup {
- struct TNodeBase: private TNonCopyable {
- };
- template <typename TNode, typename T>
- struct TNodeLayout {
- TNode* Next;
- T Item;
- };
- template <typename TNode>
- struct TQueueLayout {
- TNode* Head;
- TNode* Tail;
- };
- };
- template <typename T, typename TTuneup>
- class THTSwapQueueImpl
- : protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> {
- protected:
- using TTunedNode = TNode<T, TTuneup>;
- public:
- using TItem = T;
- THTSwapQueueImpl() {
- this->Head = new TTunedNode(T());
- this->Tail = this->Head;
- }
- ~THTSwapQueueImpl() {
- TTunedNode* node = this->Head;
- while (node != nullptr) {
- TTunedNode* next = node->Next;
- delete node;
- node = next;
- }
- }
- template <typename TT>
- void Push(TT&& item) {
- Enqueue(new TTunedNode(std::forward<TT>(item)));
- }
- T Peek() {
- TTunedNode* next = AtomicGet(this->Head->Next);
- if (next == nullptr) {
- return T();
- }
- return next->Item;
- }
- void Enqueue(TTunedNode* node) {
- // our goal is to avoid expensive CAS here,
- // but now consumer will be blocked until new tail linked.
- // fortunately 'window of inconsistency' is extremely small.
- TTunedNode* prev = AtomicSwap(&this->Tail, node);
- AtomicSet(prev->Next, node);
- }
- T Pop() {
- TTunedNode* next = AtomicGet(this->Head->Next);
- if (next == nullptr) {
- return nullptr;
- }
- auto item = std::move(next->Item);
- std::swap(this->Head, next); // no need atomic here
- delete next;
- return item;
- }
- bool IsEmpty() const {
- TTunedNode* next = AtomicGet(this->Head->Next);
- return (next == nullptr);
- }
- };
- }
- DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase);
- DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);
- DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);
- template <typename T = void*, typename... TParams>
- class THTSwapQueue
- : public NHTSwapPrivate::THTSwapQueueImpl<T,
- TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> {
- };
- }
|