#pragma once

#include <util/generic/noncopyable.h> // TNonCopyable
#include <util/generic/ptr.h>         // TAutoPtr
#include <util/generic/typetraits.h>  // TTypeTraits
#include <util/generic/utility.h>     // Max
#include <util/system/atomic.h>       // TAtomic, AtomicGet, AtomicSet, AtomicIncrement
#include <util/system/compiler.h>     // Y_UNUSED
#include <util/system/datetime.h>     // GetCycleCount
#include <util/system/guard.h>        // with_lock
#include <util/system/spinlock.h>     // TAdaptiveLock, SpinLockPause, AtomicTryAndTryLock, AtomicUnlock
#include <util/system/types.h>        // ui64
#include <util/system/yassert.h>      // Y_ASSERT

#include <new>
#include <type_traits>
#include <utility>

namespace NThreading {
    ////////////////////////////////////////////////////////////////////////////////
    // Platform helpers

#if !defined(PLATFORM_CACHE_LINE)
#define PLATFORM_CACHE_LINE 64
#endif

#if !defined(PLATFORM_PAGE_SIZE)
#define PLATFORM_PAGE_SIZE (4 * 1024)
#endif

    template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
    struct TPadded: public T {
        char Pad[PadSize - sizeof(T) % PadSize];

        TPadded() {
            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
            Y_UNUSED(Pad);
        }

        template <typename... Args>
        TPadded(Args&&... args)
            : T(std::forward<Args>(args)...)
        {
            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
            Y_UNUSED(Pad);
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Type helpers

    namespace NImpl {
        template <typename T>
        struct TPodTypeHelper {
            template <typename TT>
            static void Write(T* ptr, TT&& value) {
                *ptr = value;
            }

            static T Read(T* ptr) {
                return *ptr;
            }

            static void Destroy(T* ptr) {
                Y_UNUSED(ptr);
            }
        };

        template <typename T>
        struct TNonPodTypeHelper {
            template <typename TT>
            static void Write(T* ptr, TT&& value) {
                new (ptr) T(std::forward<TT>(value));
            }

            static T Read(T* ptr) {
                return std::move(*ptr);
            }

            static void Destroy(T* ptr) {
                (void)ptr; /* Make MSVC happy. */
                ptr->~T();
            }
        };

        template <typename T>
        using TTypeHelper = std::conditional_t<
            TTypeTraits<T>::IsPod,
            TPodTypeHelper<T>,
            TNonPodTypeHelper<T>>;
    }

    ////////////////////////////////////////////////////////////////////////////////
    // One producer/one consumer chunked queue.

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TOneOneQueue: private TNonCopyable {
        using TTypeHelper = NImpl::TTypeHelper<T>;

        struct TChunk;

        struct TChunkHeader {
            size_t Count = 0;
            TChunk* Next = nullptr;
        };

        struct TChunk: public TChunkHeader {
            static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);

            char Entries[MaxCount * sizeof(T)];

            TChunk() {
                Y_UNUSED(Entries); // uninitialized
            }

            ~TChunk() {
                for (size_t i = 0; i < this->Count; ++i) {
                    TTypeHelper::Destroy(GetPtr(i));
                }
            }

            T* GetPtr(size_t i) {
                return (T*)Entries + i;
            }
        };

        struct TWriterState {
            TChunk* Chunk = nullptr;
        };

        struct TReaderState {
            TChunk* Chunk = nullptr;
            size_t Count = 0;
        };

    private:
        TPadded<TWriterState> Writer;
        TPadded<TReaderState> Reader;

    public:
        using TItem = T;

        TOneOneQueue() {
            Writer.Chunk = Reader.Chunk = new TChunk();
        }

        ~TOneOneQueue() {
            DeleteChunks(Reader.Chunk);
        }

        template <typename TT>
        void Enqueue(TT&& value) {
            T* ptr = PrepareWrite();
            Y_ASSERT(ptr);
            TTypeHelper::Write(ptr, std::forward<TT>(value));
            CompleteWrite();
        }

        bool Dequeue(T& value) {
            if (T* ptr = PrepareRead()) {
                value = TTypeHelper::Read(ptr);
                CompleteRead();
                return true;
            }
            return false;
        }

        bool IsEmpty() {
            return !PrepareRead();
        }

    protected:
        T* PrepareWrite() {
            TChunk* chunk = Writer.Chunk;
            Y_ASSERT(chunk && !chunk->Next);

            if (chunk->Count != TChunk::MaxCount) {
                return chunk->GetPtr(chunk->Count);
            }

            chunk = new TChunk();
            AtomicSet(Writer.Chunk->Next, chunk);
            Writer.Chunk = chunk;
            return chunk->GetPtr(0);
        }

        void CompleteWrite() {
            AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
        }

        T* PrepareRead() {
            TChunk* chunk = Reader.Chunk;
            Y_ASSERT(chunk);

            for (;;) {
                size_t writerCount = AtomicGet(chunk->Count);
                if (Reader.Count != writerCount) {
                    return chunk->GetPtr(Reader.Count);
                }

                if (writerCount != TChunk::MaxCount) {
                    return nullptr;
                }

                chunk = AtomicGet(chunk->Next);
                if (!chunk) {
                    return nullptr;
                }

                delete Reader.Chunk;
                Reader.Chunk = chunk;
                Reader.Count = 0;
            }
        }

        void CompleteRead() {
            ++Reader.Count;
        }

    private:
        static void DeleteChunks(TChunk* chunk) {
            while (chunk) {
                TChunk* next = chunk->Next;
                delete chunk;
                chunk = next;
            }
        }
    };
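    ////////////////////////////////////////////////////////////////////////////////
    // Usage sketch (illustrative only; ExampleOneOneQueueUsage is a hypothetical
    // helper, not part of the queue API). Shows the single-producer/single-consumer
    // contract: exactly one thread may call Enqueue and exactly one thread may
    // call Dequeue, with no external locking on either side.

    inline void ExampleOneOneQueueUsage() {
        TOneOneQueue<int> queue;

        // Producer side: items are appended to the current chunk; a new chunk
        // is linked in once MaxCount entries have been written.
        for (int i = 0; i < 3; ++i) {
            queue.Enqueue(i);
        }

        // Consumer side: items come back in FIFO order (0, 1, 2).
        int value = 0;
        while (queue.Dequeue(value)) {
            // process value
        }
        Y_ASSERT(queue.IsEmpty());
    }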
    ////////////////////////////////////////////////////////////////////////////////
    // Multiple producers/single consumer partitioned queue.
    // Provides FIFO guarantees for each producer.

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TManyOneQueue: private TNonCopyable {
        using TTypeHelper = NImpl::TTypeHelper<T>;

        struct TEntry {
            T Value;
            ui64 Tag;
        };

        struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
            TAtomic WriteLock = 0;

            using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
            using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;

            using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
            using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
        };

    private:
        union {
            TAtomic WriteTag = 0;
            char Pad[PLATFORM_CACHE_LINE];
        };

        TQueueType Queues[Concurrency];

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            ui64 tag = NextTag();
            while (!TryEnqueue(std::forward<TT>(value), tag)) {
                SpinLockPause();
            }
        }

        bool Dequeue(T& value) {
            size_t index = 0;
            if (TEntry* entry = PrepareRead(index)) {
                value = TTypeHelper::Read(&entry->Value);
                Queues[index].CompleteRead();
                return true;
            }
            return false;
        }

        bool IsEmpty() {
            for (size_t i = 0; i < Concurrency; ++i) {
                if (!Queues[i].IsEmpty()) {
                    return false;
                }
            }
            return true;
        }

    private:
        ui64 NextTag() {
            // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
            // return GetCycleCount();
            return AtomicIncrement(WriteTag);
        }

        template <typename TT>
        bool TryEnqueue(TT&& value, ui64 tag) {
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[i];
                if (AtomicTryAndTryLock(&queue.WriteLock)) {
                    TEntry* entry = queue.PrepareWrite();
                    Y_ASSERT(entry);

                    TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
                    entry->Tag = tag;

                    queue.CompleteWrite();
                    AtomicUnlock(&queue.WriteLock);
                    return true;
                }
            }
            return false;
        }

        TEntry* PrepareRead(size_t& index) {
            TEntry* entry = nullptr;
            ui64 tag = Max<ui64>();

            for (size_t i = 0; i < Concurrency; ++i) {
                TEntry* e = Queues[i].PrepareRead();
                if (e && e->Tag < tag) {
                    index = i;
                    entry = e;
                    tag = e->Tag;
                }
            }

            if (entry) {
                // need a second pass to catch updates within the already scanned range
                size_t candidate = index;
                for (size_t i = 0; i < candidate; ++i) {
                    TEntry* e = Queues[i].PrepareRead();
                    if (e && e->Tag < tag) {
                        index = i;
                        entry = e;
                        tag = e->Tag;
                    }
                }
            }

            return entry;
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Concurrent many-many queue with strong FIFO guarantees.
    // Writers will not block readers (and vice versa), but will block each other.

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TManyManyQueue: private TNonCopyable {
    private:
        TPadded<TAdaptiveLock> WriteLock;
        TPadded<TAdaptiveLock> ReadLock;

        TOneOneQueue<T, ChunkSize> Queue;

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            with_lock (WriteLock) {
                Queue.Enqueue(std::forward<TT>(value));
            }
        }

        bool Dequeue(T& value) {
            with_lock (ReadLock) {
                return Queue.Dequeue(value);
            }
        }

        bool IsEmpty() {
            with_lock (ReadLock) {
                return Queue.IsEmpty();
            }
        }
    };
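    ////////////////////////////////////////////////////////////////////////////////
    // Usage sketch (illustrative only; ExampleManyOneQueueUsage is a hypothetical
    // helper, not part of the queue API). Any number of threads may call Enqueue
    // concurrently; exactly one thread may call Dequeue. The monotonically
    // increasing tags let the consumer pick the oldest entry across partitions,
    // which is what preserves per-producer FIFO order. TManyManyQueue has the
    // same interface but also allows concurrent consumers.

    inline void ExampleManyOneQueueUsage() {
        TManyOneQueue<int> queue; // 4 partitions by default

        // In real code each producer would be a separate thread; Enqueue spins
        // until it manages to lock one of the partitions for writing.
        queue.Enqueue(1);
        queue.Enqueue(2);

        // Single consumer: Dequeue scans the partitions and takes the entry
        // with the smallest tag, so 1 is returned before 2.
        int value = 0;
        while (queue.Dequeue(value)) {
            // process value
        }
    }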
    ////////////////////////////////////////////////////////////////////////////////
    // Multiple producers/single consumer partitioned queue.
    // Because of random partitioning, reordering is possible: FIFO is not guaranteed!

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TRelaxedManyOneQueue: private TNonCopyable {
        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
            TAtomic WriteLock = 0;
        };

    private:
        union {
            size_t ReadPos = 0;
            char Pad[PLATFORM_CACHE_LINE];
        };

        TQueueType Queues[Concurrency];

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            while (!TryEnqueue(std::forward<TT>(value))) {
                SpinLockPause();
            }
        }

        bool Dequeue(T& value) {
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[ReadPos++ % Concurrency];
                if (queue.Dequeue(value)) {
                    return true;
                }
            }
            return false;
        }

        bool IsEmpty() {
            for (size_t i = 0; i < Concurrency; ++i) {
                if (!Queues[i].IsEmpty()) {
                    return false;
                }
            }
            return true;
        }

    private:
        template <typename TT>
        bool TryEnqueue(TT&& value) {
            size_t writePos = GetCycleCount();
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[writePos++ % Concurrency];
                if (AtomicTryAndTryLock(&queue.WriteLock)) {
                    queue.Enqueue(std::forward<TT>(value));
                    AtomicUnlock(&queue.WriteLock);
                    return true;
                }
            }
            return false;
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Concurrent many-many partitioned queue.
    // Because of random partitioning, reordering is possible: FIFO is not guaranteed!

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TRelaxedManyManyQueue: private TNonCopyable {
        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
            union {
                TAtomic WriteLock = 0;
                char Pad1[PLATFORM_CACHE_LINE];
            };
            union {
                TAtomic ReadLock = 0;
                char Pad2[PLATFORM_CACHE_LINE];
            };
        };

    private:
        TQueueType Queues[Concurrency];

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            while (!TryEnqueue(std::forward<TT>(value))) {
                SpinLockPause();
            }
        }

        bool Dequeue(T& value) {
            size_t readPos = GetCycleCount();
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[readPos++ % Concurrency];
                if (AtomicTryAndTryLock(&queue.ReadLock)) {
                    bool dequeued = queue.Dequeue(value);
                    AtomicUnlock(&queue.ReadLock);
                    if (dequeued) {
                        return true;
                    }
                }
            }
            return false;
        }

        bool IsEmpty() {
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[i];
                if (AtomicTryAndTryLock(&queue.ReadLock)) {
                    bool empty = queue.IsEmpty();
                    AtomicUnlock(&queue.ReadLock);
                    if (!empty) {
                        return false;
                    }
                }
            }
            return true;
        }

    private:
        template <typename TT>
        bool TryEnqueue(TT&& value) {
            size_t writePos = GetCycleCount();
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[writePos++ % Concurrency];
                if (AtomicTryAndTryLock(&queue.WriteLock)) {
                    queue.Enqueue(std::forward<TT>(value));
                    AtomicUnlock(&queue.WriteLock);
                    return true;
                }
            }
            return false;
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Simple wrapper to deal with TAutoPtrs

    template <typename T, typename TImpl>
    class TAutoQueueBase: private TNonCopyable {
    private:
        TImpl Impl;

    public:
        using TItem = TAutoPtr<T>;

        ~TAutoQueueBase() {
            TAutoPtr<T> value;
            while (Dequeue(value)) {
                // do nothing
            }
        }

        void Enqueue(TAutoPtr<T> value) {
            Impl.Enqueue(value.Get());
            Y_UNUSED(value.Release());
        }

        bool Dequeue(TAutoPtr<T>& value) {
            T* ptr = nullptr;
            if (Impl.Dequeue(ptr)) {
                value.Reset(ptr);
                return true;
            }
            return false;
        }

        bool IsEmpty() {
            return Impl.IsEmpty();
        }
    };

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize>>;

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
}
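
////////////////////////////////////////////////////////////////////////////////
// Usage sketch (illustrative only; ExampleAutoQueueUsage is a hypothetical
// helper, not part of the queue API). The TAuto* wrappers pass raw pointers
// through the underlying queue and re-wrap them in TAutoPtr on the consumer
// side; anything still queued when the wrapper is destroyed is drained and
// deleted by ~TAutoQueueBase.

inline void ExampleAutoQueueUsage() {
    NThreading::TAutoManyOneQueue<int> queue;

    // Ownership moves into the queue: Enqueue releases the TAutoPtr after
    // storing the raw pointer.
    queue.Enqueue(TAutoPtr<int>(new int(42)));

    TAutoPtr<int> item;
    if (queue.Dequeue(item)) {
        // item now owns the int; it is freed when item goes out of scope.
    }
} // any items never dequeued are deleted here, when the queue is destroyed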