123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568 |
- #pragma once
- #include <util/datetime/base.h>
- #include <util/generic/noncopyable.h>
- #include <util/generic/ptr.h>
- #include <util/generic/typetraits.h>
- #include <util/generic/vector.h>
- #include <util/generic/ylimits.h>
- #include <library/cpp/deprecated/atomic/atomic.h>
- #include <util/system/guard.h>
- #include <util/system/spinlock.h>
- #include <util/system/yassert.h>
- #include <type_traits>
- #include <utility>
- namespace NThreading {
- ////////////////////////////////////////////////////////////////////////////////
- // Platform helpers
- #if !defined(PLATFORM_CACHE_LINE)
- #define PLATFORM_CACHE_LINE 64
- #endif
- #if !defined(PLATFORM_PAGE_SIZE)
- #define PLATFORM_PAGE_SIZE 4 * 1024
- #endif
- template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
- struct TPadded: public T {
- char Pad[PadSize - sizeof(T) % PadSize];
- TPadded() {
- static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
- Y_UNUSED(Pad);
- }
- template<typename... Args>
- TPadded(Args&&... args)
- : T(std::forward<Args>(args)...)
- {
- static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
- Y_UNUSED(Pad);
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- // Type helpers
- namespace NImpl {
- template <typename T>
- struct TPodTypeHelper {
- template <typename TT>
- static void Write(T* ptr, TT&& value) {
- *ptr = value;
- }
- static T Read(T* ptr) {
- return *ptr;
- }
- static void Destroy(T* ptr) {
- Y_UNUSED(ptr);
- }
- };
- template <typename T>
- struct TNonPodTypeHelper {
- template <typename TT>
- static void Write(T* ptr, TT&& value) {
- new (ptr) T(std::forward<TT>(value));
- }
- static T Read(T* ptr) {
- return std::move(*ptr);
- }
- static void Destroy(T* ptr) {
- (void)ptr; /* Make MSVC happy. */
- ptr->~T();
- }
- };
- template <typename T>
- using TTypeHelper = std::conditional_t<
- TTypeTraits<T>::IsPod,
- TPodTypeHelper<T>,
- TNonPodTypeHelper<T>>;
- }
- ////////////////////////////////////////////////////////////////////////////////
- // One producer/one consumer chunked queue.
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TOneOneQueue: private TNonCopyable {
- using TTypeHelper = NImpl::TTypeHelper<T>;
- struct TChunk;
- struct TChunkHeader {
- size_t Count = 0;
- TChunk* Next = nullptr;
- };
- struct TChunk: public TChunkHeader {
- static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);
- char Entries[MaxCount * sizeof(T)];
- TChunk() {
- Y_UNUSED(Entries); // uninitialized
- }
- ~TChunk() {
- for (size_t i = 0; i < this->Count; ++i) {
- TTypeHelper::Destroy(GetPtr(i));
- }
- }
- T* GetPtr(size_t i) {
- return (T*)Entries + i;
- }
- };
- struct TWriterState {
- TChunk* Chunk = nullptr;
- };
- struct TReaderState {
- TChunk* Chunk = nullptr;
- size_t Count = 0;
- };
- private:
- TPadded<TWriterState> Writer;
- TPadded<TReaderState> Reader;
- public:
- using TItem = T;
- TOneOneQueue() {
- Writer.Chunk = Reader.Chunk = new TChunk();
- }
- ~TOneOneQueue() {
- DeleteChunks(Reader.Chunk);
- }
- template <typename TT>
- void Enqueue(TT&& value) {
- T* ptr = PrepareWrite();
- Y_ASSERT(ptr);
- TTypeHelper::Write(ptr, std::forward<TT>(value));
- CompleteWrite();
- }
- bool Dequeue(T& value) {
- if (T* ptr = PrepareRead()) {
- value = TTypeHelper::Read(ptr);
- CompleteRead();
- return true;
- }
- return false;
- }
- bool IsEmpty() {
- return !PrepareRead();
- }
- protected:
- T* PrepareWrite() {
- TChunk* chunk = Writer.Chunk;
- Y_ASSERT(chunk && !chunk->Next);
- if (chunk->Count != TChunk::MaxCount) {
- return chunk->GetPtr(chunk->Count);
- }
- chunk = new TChunk();
- AtomicSet(Writer.Chunk->Next, chunk);
- Writer.Chunk = chunk;
- return chunk->GetPtr(0);
- }
- void CompleteWrite() {
- AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
- }
- T* PrepareRead() {
- TChunk* chunk = Reader.Chunk;
- Y_ASSERT(chunk);
- for (;;) {
- size_t writerCount = AtomicGet(chunk->Count);
- if (Reader.Count != writerCount) {
- return chunk->GetPtr(Reader.Count);
- }
- if (writerCount != TChunk::MaxCount) {
- return nullptr;
- }
- chunk = AtomicGet(chunk->Next);
- if (!chunk) {
- return nullptr;
- }
- delete Reader.Chunk;
- Reader.Chunk = chunk;
- Reader.Count = 0;
- }
- }
- void CompleteRead() {
- ++Reader.Count;
- }
- private:
- static void DeleteChunks(TChunk* chunk) {
- while (chunk) {
- TChunk* next = chunk->Next;
- delete chunk;
- chunk = next;
- }
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- // Multiple producers/single consumer partitioned queue.
- // Provides FIFO guaranties for each producer.
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TManyOneQueue: private TNonCopyable {
- using TTypeHelper = NImpl::TTypeHelper<T>;
- struct TEntry {
- T Value;
- ui64 Tag;
- };
- struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
- TAtomic WriteLock = 0;
- using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
- using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
- using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
- using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
- };
- private:
- union {
- TAtomic WriteTag = 0;
- char Pad[PLATFORM_CACHE_LINE];
- };
- TQueueType Queues[Concurrency];
- public:
- using TItem = T;
- template <typename TT>
- void Enqueue(TT&& value) {
- ui64 tag = NextTag();
- while (!TryEnqueue(std::forward<TT>(value), tag)) {
- SpinLockPause();
- }
- }
- bool Dequeue(T& value) {
- size_t index = 0;
- if (TEntry* entry = PrepareRead(index)) {
- value = TTypeHelper::Read(&entry->Value);
- Queues[index].CompleteRead();
- return true;
- }
- return false;
- }
- bool IsEmpty() {
- for (size_t i = 0; i < Concurrency; ++i) {
- if (!Queues[i].IsEmpty()) {
- return false;
- }
- }
- return true;
- }
- private:
- ui64 NextTag() {
- // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
- // return GetCycleCount();
- return AtomicIncrement(WriteTag);
- }
- template <typename TT>
- bool TryEnqueue(TT&& value, ui64 tag) {
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[i];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
- TEntry* entry = queue.PrepareWrite();
- Y_ASSERT(entry);
- TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
- entry->Tag = tag;
- queue.CompleteWrite();
- AtomicUnlock(&queue.WriteLock);
- return true;
- }
- }
- return false;
- }
- TEntry* PrepareRead(size_t& index) {
- TEntry* entry = nullptr;
- ui64 tag = Max();
- for (size_t i = 0; i < Concurrency; ++i) {
- TEntry* e = Queues[i].PrepareRead();
- if (e && e->Tag < tag) {
- index = i;
- entry = e;
- tag = e->Tag;
- }
- }
- if (entry) {
- // need second pass to catch updates within already scanned range
- size_t candidate = index;
- for (size_t i = 0; i < candidate; ++i) {
- TEntry* e = Queues[i].PrepareRead();
- if (e && e->Tag < tag) {
- index = i;
- entry = e;
- tag = e->Tag;
- }
- }
- }
- return entry;
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- // Concurrent many-many queue with strong FIFO guaranties.
- // Writers will not block readers (and vice versa), but will block each other.
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
- class TManyManyQueue: private TNonCopyable {
- private:
- TPadded<TLock> WriteLock;
- TPadded<TLock> ReadLock;
- TOneOneQueue<T, ChunkSize> Queue;
- public:
- using TItem = T;
- template <typename TT>
- void Enqueue(TT&& value) {
- with_lock (WriteLock) {
- Queue.Enqueue(std::forward<TT>(value));
- }
- }
- bool Dequeue(T& value) {
- with_lock (ReadLock) {
- return Queue.Dequeue(value);
- }
- }
- bool IsEmpty() {
- with_lock (ReadLock) {
- return Queue.IsEmpty();
- }
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- // Multiple producers/single consumer partitioned queue.
- // Because of random partitioning reordering possible - FIFO not guaranteed!
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TRelaxedManyOneQueue: private TNonCopyable {
- struct TQueueType: public TOneOneQueue<T, ChunkSize> {
- TAtomic WriteLock = 0;
- };
- private:
- union {
- size_t ReadPos = 0;
- char Pad[PLATFORM_CACHE_LINE];
- };
- TQueueType Queues[Concurrency];
- public:
- using TItem = T;
- template <typename TT>
- void Enqueue(TT&& value) {
- while (!TryEnqueue(std::forward<TT>(value))) {
- SpinLockPause();
- }
- }
- bool Dequeue(T& value) {
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[ReadPos++ % Concurrency];
- if (queue.Dequeue(value)) {
- return true;
- }
- }
- return false;
- }
- bool IsEmpty() {
- for (size_t i = 0; i < Concurrency; ++i) {
- if (!Queues[i].IsEmpty()) {
- return false;
- }
- }
- return true;
- }
- private:
- template <typename TT>
- bool TryEnqueue(TT&& value) {
- size_t writePos = GetCycleCount();
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[writePos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
- queue.Enqueue(std::forward<TT>(value));
- AtomicUnlock(&queue.WriteLock);
- return true;
- }
- }
- return false;
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- // Concurrent many-many partitioned queue.
- // Because of random partitioning reordering possible - FIFO not guaranteed!
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TRelaxedManyManyQueue: private TNonCopyable {
- struct TQueueType: public TOneOneQueue<T, ChunkSize> {
- union {
- TAtomic WriteLock = 0;
- char Pad1[PLATFORM_CACHE_LINE];
- };
- union {
- TAtomic ReadLock = 0;
- char Pad2[PLATFORM_CACHE_LINE];
- };
- };
- private:
- TQueueType Queues[Concurrency];
- public:
- using TItem = T;
- template <typename TT>
- void Enqueue(TT&& value) {
- while (!TryEnqueue(std::forward<TT>(value))) {
- SpinLockPause();
- }
- }
- bool Dequeue(T& value) {
- size_t readPos = GetCycleCount();
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[readPos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.ReadLock)) {
- bool dequeued = queue.Dequeue(value);
- AtomicUnlock(&queue.ReadLock);
- if (dequeued) {
- return true;
- }
- }
- }
- return false;
- }
- bool IsEmpty() {
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[i];
- if (AtomicTryAndTryLock(&queue.ReadLock)) {
- bool empty = queue.IsEmpty();
- AtomicUnlock(&queue.ReadLock);
- if (!empty) {
- return false;
- }
- }
- }
- return true;
- }
- private:
- template <typename TT>
- bool TryEnqueue(TT&& value) {
- size_t writePos = GetCycleCount();
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[writePos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
- queue.Enqueue(std::forward<TT>(value));
- AtomicUnlock(&queue.WriteLock);
- return true;
- }
- }
- return false;
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- // Simple wrapper to deal with AutoPtrs
- template <typename T, typename TImpl>
- class TAutoQueueBase: private TNonCopyable {
- private:
- TImpl Impl;
- public:
- using TItem = TAutoPtr<T>;
- ~TAutoQueueBase() {
- TAutoPtr<T> value;
- while (Dequeue(value)) {
- // do nothing
- }
- }
- void Enqueue(TAutoPtr<T> value) {
- Impl.Enqueue(value.Get());
- Y_UNUSED(value.Release());
- }
- bool Dequeue(TAutoPtr<T>& value) {
- T* ptr = nullptr;
- if (Impl.Dequeue(ptr)) {
- value.Reset(ptr);
- return true;
- }
- return false;
- }
- bool IsEmpty() {
- return Impl.IsEmpty();
- }
- };
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
- using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
- }
|