123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611 |
- #pragma once
- /*
- Completely wait-free queue, multiple producers - one consumer. Strict order.
- The queue algorithm is using concept of virtual infinite array.
- A producer takes a number from a counter and atomically increments the counter.
- The number taken is a number of a slot for the producer to put a new message
- into infinite array.
- Then producer constructs a virtual infinite array by bidirectional linked list
- of blocks. Each block contains several slots.
- There is a hint pointer which optimistically points to the last block
- of the list and never goes backward.
- Consumer exploits the property of the hint pointer always going forward
- to free old blocks eventually. Consumer periodically read the hint pointer
- and the counter and thus deduce producers which potentially holds the pointer
- to a block. Consumer can free the block if all that producers filled their
- slots and left the queue.
- No producer can stop the progress for other producers.
- Consumer can't stop the progress for producers.
- Consumer can skip not-yet-filled slots and read them later.
- Thus no producer can stop the progress for consumer.
- The algorithm is virtually strictly ordered because it skips slots only
- if it is really does not matter in which order the slots were produced and
- consumed.
- WARNING: there is no wait¬ify mechanic for consumer,
- consumer receives nullptr if queue was empty.
- WARNING: though the algorithm itself is completely wait-free
- but producers and consumer could be blocked by memory allocator
- WARNING: copy constructors of the queue are not thread-safe
- */
- #include <util/generic/deque.h>
- #include <util/generic/ptr.h>
- #include <library/cpp/deprecated/atomic/atomic.h>
- #include <util/system/spinlock.h>
- #include "tune.h"
- namespace NThreading {
- namespace NReadAsFilledPrivate {
- typedef void* TMsgLink;
- static constexpr ui32 DEFAULT_BUNCH_SIZE = 251;
- struct TEmpty {
- };
- struct TEmptyAux {
- TEmptyAux Retrieve() const {
- return TEmptyAux();
- }
- void Store(TEmptyAux&) {
- }
- static constexpr TEmptyAux Zero() {
- return TEmptyAux();
- }
- };
- template <typename TAux>
- struct TSlot {
- TMsgLink volatile Msg;
- TAux AuxiliaryData;
- inline void Store(TAux& aux) {
- AuxiliaryData.Store(aux);
- }
- inline TAux Retrieve() const {
- return AuxiliaryData.Retrieve();
- }
- static TSlot<TAux> NullElem() {
- return {nullptr, TAux::Zero()};
- }
- static TSlot<TAux> Pair(TMsgLink msg, TAux aux) {
- return {msg, std::move(aux)};
- }
- };
- template <>
- struct TSlot<TEmptyAux> {
- TMsgLink volatile Msg;
- inline void Store(TEmptyAux&) {
- }
- inline TEmptyAux Retrieve() const {
- return TEmptyAux();
- }
- static TSlot<TEmptyAux> NullElem() {
- return {nullptr};
- }
- static TSlot<TEmptyAux> Pair(TMsgLink msg, TEmptyAux) {
- return {msg};
- }
- };
- enum TPushResult {
- PUSH_RESULT_OK,
- PUSH_RESULT_BACKWARD,
- PUSH_RESULT_FORWARD,
- };
- template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE,
- typename TBase = TEmpty,
- typename TAux = TEmptyAux>
- struct TMsgBunch: public TBase {
- static constexpr size_t RELEASE_SIZE = BUNCH_SIZE * 2;
- ui64 FirstSlot;
- TSlot<TAux> LinkArray[BUNCH_SIZE];
- TMsgBunch* volatile NextBunch;
- TMsgBunch* volatile BackLink;
- ui64 volatile Token;
- TMsgBunch* volatile NextToken;
- /* this push can return PUSH_RESULT_BLOCKED */
- inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {
- if (Y_UNLIKELY(slot < FirstSlot)) {
- return PUSH_RESULT_BACKWARD;
- }
- if (Y_UNLIKELY(slot >= FirstSlot + BUNCH_SIZE)) {
- return PUSH_RESULT_FORWARD;
- }
- LinkArray[slot - FirstSlot].Store(auxiliary);
- AtomicSet(LinkArray[slot - FirstSlot].Msg, msg);
- return PUSH_RESULT_OK;
- }
- inline bool IsSlotHere(ui64 slot) {
- return slot < FirstSlot + BUNCH_SIZE;
- }
- inline TMsgLink GetSlot(ui64 slot) const {
- return AtomicGet(LinkArray[slot - FirstSlot].Msg);
- }
- inline TSlot<TAux> GetSlotAux(ui64 slot) const {
- auto msg = GetSlot(slot);
- auto aux = LinkArray[slot - FirstSlot].Retrieve();
- return TSlot<TAux>::Pair(msg, aux);
- }
- inline TMsgBunch* GetNextBunch() const {
- return AtomicGet(NextBunch);
- }
- inline bool SetNextBunch(TMsgBunch* ptr) {
- return AtomicCas(&NextBunch, ptr, nullptr);
- }
- inline TMsgBunch* GetBackLink() const {
- return AtomicGet(BackLink);
- }
- inline TMsgBunch* GetToken(ui64 slot) {
- return reinterpret_cast<TMsgBunch*>(
- LinkArray[slot - FirstSlot].Msg);
- }
- inline void IncrementToken() {
- AtomicIncrement(Token);
- }
- // the object could be destroyed after this method
- inline void DecrementToken() {
- if (Y_UNLIKELY(AtomicDecrement(Token) == RELEASE_SIZE)) {
- Release(this);
- AtomicGet(NextToken)->DecrementToken();
- // this could be invalid here
- }
- }
- // the object could be destroyed after this method
- inline void SetNextToken(TMsgBunch* next) {
- AtomicSet(NextToken, next);
- if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) {
- Release(this);
- next->DecrementToken();
- }
- // this could be invalid here
- }
- TMsgBunch(ui64 start, TMsgBunch* backLink) {
- AtomicSet(FirstSlot, start);
- memset(&LinkArray, 0, sizeof(LinkArray));
- AtomicSet(NextBunch, nullptr);
- AtomicSet(BackLink, backLink);
- AtomicSet(Token, 1);
- AtomicSet(NextToken, nullptr);
- }
- static void Release(TMsgBunch* block) {
- auto backLink = AtomicGet(block->BackLink);
- if (backLink == nullptr) {
- return;
- }
- AtomicSet(block->BackLink, nullptr);
- do {
- auto bbackLink = backLink->BackLink;
- delete backLink;
- backLink = bbackLink;
- } while (backLink != nullptr);
- }
- void Destroy() {
- for (auto tail = BackLink; tail != nullptr;) {
- auto next = tail->BackLink;
- delete tail;
- tail = next;
- }
- for (auto next = this; next != nullptr;) {
- auto nnext = next->NextBunch;
- delete next;
- next = nnext;
- }
- }
- };
- template <ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE,
- typename TBunchBase = NReadAsFilledPrivate::TEmpty,
- typename TAux = TEmptyAux>
- class TWriteBucket {
- public:
- using TUsingAux = TAux; // for TReadBucket binding
- using TBunch = TMsgBunch<BUNCH_SIZE, TBunchBase, TAux>;
- TWriteBucket(TBunch* bunch = new TBunch(0, nullptr)) {
- AtomicSet(LastBunch, bunch);
- AtomicSet(SlotCounter, 0);
- }
- TWriteBucket(TWriteBucket&& move)
- : LastBunch(move.LastBunch)
- , SlotCounter(move.SlotCounter)
- {
- move.LastBunch = nullptr;
- }
- ~TWriteBucket() {
- if (LastBunch != nullptr) {
- LastBunch->Destroy();
- }
- }
- inline void Push(TMsgLink msg, TAux aux) {
- ui64 pushSlot = AtomicGetAndIncrement(SlotCounter);
- TBunch* hintBunch = GetLastBunch();
- for (;;) {
- auto hint = hintBunch->Push(msg, pushSlot, aux);
- if (Y_LIKELY(hint == PUSH_RESULT_OK)) {
- return;
- }
- HandleHint(hintBunch, hint);
- }
- }
- protected:
- template <typename, template <typename, typename...> class>
- friend class TReadBucket;
- TBunch* volatile LastBunch; // Hint
- volatile ui64 SlotCounter;
- inline TBunch* GetLastBunch() const {
- return AtomicGet(LastBunch);
- }
- void HandleHint(TBunch*& hintBunch, TPushResult hint) {
- if (Y_UNLIKELY(hint == PUSH_RESULT_BACKWARD)) {
- hintBunch = hintBunch->GetBackLink();
- return;
- }
- // PUSH_RESULT_FORWARD
- auto nextBunch = hintBunch->GetNextBunch();
- if (nextBunch == nullptr) {
- auto first = hintBunch->FirstSlot + BUNCH_SIZE;
- nextBunch = new TBunch(first, hintBunch);
- if (Y_UNLIKELY(!hintBunch->SetNextBunch(nextBunch))) {
- delete nextBunch;
- nextBunch = hintBunch->GetNextBunch();
- }
- }
- // hintBunch could not be freed here so it cannot be reused
- // it's alright if this CAS was not succeeded,
- // it means that other thread did that recently
- AtomicCas(&LastBunch, nextBunch, hintBunch);
- hintBunch = nextBunch;
- }
- };
- template <typename TWBucket = TWriteBucket<>,
- template <typename, typename...> class TContainer = TDeque>
- class TReadBucket {
- public:
- using TAux = typename TWBucket::TUsingAux;
- using TBunch = typename TWBucket::TBunch;
- static constexpr int MAX_NUMBER_OF_TRIES_TO_READ = 5;
- TReadBucket(TWBucket* writer)
- : Writer(writer)
- , ReadBunch(writer->GetLastBunch())
- , LastKnownPushBunch(writer->GetLastBunch())
- {
- ReadBunch->DecrementToken(); // no previous token
- }
- TReadBucket(TReadBucket toCopy, TWBucket* writer)
- : TReadBucket(std::move(toCopy))
- {
- Writer = writer;
- }
- ui64 ReadyCount() const {
- return AtomicGet(Writer->SlotCounter) - ReadSlot;
- }
- TMsgLink Pop() {
- return PopAux().Msg;
- }
- TMsgLink Peek() {
- return PeekAux().Msg;
- }
- TSlot<TAux> PopAux() {
- for (;;) {
- if (Y_UNLIKELY(ReadNow.size() != 0)) {
- auto result = PopSkipped();
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- }
- if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
- if (Y_LIKELY(!RereadPushSlot())) {
- return TSlot<TAux>::NullElem();
- }
- continue;
- }
- if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
- if (Y_UNLIKELY(!SwitchToNextBunch())) {
- return TSlot<TAux>::NullElem();
- }
- }
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- ++ReadSlot;
- return result;
- }
- result = StubbornPop();
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- }
- }
- TSlot<TAux> PeekAux() {
- for (;;) {
- if (Y_UNLIKELY(ReadNow.size() != 0)) {
- auto result = PeekSkipped();
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- }
- if (Y_UNLIKELY(ReadSlot == LastKnownPushSlot)) {
- if (Y_LIKELY(!RereadPushSlot())) {
- return TSlot<TAux>::NullElem();
- }
- continue;
- }
- if (Y_UNLIKELY(!ReadBunch->IsSlotHere(ReadSlot))) {
- if (Y_UNLIKELY(!SwitchToNextBunch())) {
- return TSlot<TAux>::NullElem();
- }
- }
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- result = StubbornPeek();
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- }
- }
- private:
- TWBucket* Writer;
- TBunch* ReadBunch;
- ui64 ReadSlot = 0;
- TBunch* LastKnownPushBunch;
- ui64 LastKnownPushSlot = 0;
- struct TSkipItem {
- TBunch* Bunch;
- ui64 Slot;
- TBunch* Token;
- };
- TContainer<TSkipItem> ReadNow;
- TContainer<TSkipItem> ReadLater;
- void AddToReadLater() {
- ReadLater.push_back({ReadBunch, ReadSlot, LastKnownPushBunch});
- LastKnownPushBunch->IncrementToken();
- ++ReadSlot;
- }
- // MUST BE: ReadSlot == LastKnownPushSlot
- bool RereadPushSlot() {
- ReadNow = std::move(ReadLater);
- ReadLater.clear();
- auto oldSlot = LastKnownPushSlot;
- auto currentPushBunch = Writer->GetLastBunch();
- auto currentPushSlot = AtomicGet(Writer->SlotCounter);
- if (currentPushBunch != LastKnownPushBunch) {
- // LastKnownPushBunch could be invalid after this line
- LastKnownPushBunch->SetNextToken(currentPushBunch);
- }
- LastKnownPushBunch = currentPushBunch;
- LastKnownPushSlot = currentPushSlot;
- return oldSlot != LastKnownPushSlot;
- }
- bool SwitchToNextBunch() {
- for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
- auto next = ReadBunch->GetNextBunch();
- if (next != nullptr) {
- ReadBunch = next;
- return true;
- }
- SpinLockPause();
- }
- return false;
- }
- TSlot<TAux> StubbornPop() {
- for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- ++ReadSlot;
- return result;
- }
- SpinLockPause();
- }
- AddToReadLater();
- return TSlot<TAux>::NullElem();
- }
- TSlot<TAux> StubbornPeek() {
- for (int q = 0; q < MAX_NUMBER_OF_TRIES_TO_READ; ++q) {
- auto result = ReadBunch->GetSlotAux(ReadSlot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- SpinLockPause();
- }
- AddToReadLater();
- return TSlot<TAux>::NullElem();
- }
- TSlot<TAux> PopSkipped() {
- do {
- auto elem = ReadNow.front();
- ReadNow.pop_front();
- auto result = elem.Bunch->GetSlotAux(elem.Slot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- elem.Token->DecrementToken();
- return result;
- }
- ReadLater.emplace_back(elem);
- } while (ReadNow.size() > 0);
- return TSlot<TAux>::NullElem();
- }
- TSlot<TAux> PeekSkipped() {
- do {
- auto elem = ReadNow.front();
- auto result = elem.Bunch->GetSlotAux(elem.Slot);
- if (Y_LIKELY(result.Msg != nullptr)) {
- return result;
- }
- ReadNow.pop_front();
- ReadLater.emplace_back(elem);
- } while (ReadNow.size() > 0);
- return TSlot<TAux>::NullElem();
- }
- };
- struct TDefaultParams {
- static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE;
- using TBunchBase = TEmpty;
- template <typename TElem, typename... TRest>
- using TContainer = TDeque<TElem, TRest...>;
- static constexpr bool DeleteItems = true;
- };
- } //namespace NReadAsFilledPrivate
- DeclareTuneValueParam(TRaFQueueBunchSize, ui32, BUNCH_SIZE);
- DeclareTuneTypeParam(TRaFQueueBunchBase, TBunchBase);
- DeclareTuneContainer(TRaFQueueSkipContainer, TContainer);
- DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems);
- template <typename TItem = void, typename... TParams>
- class TReadAsFilledQueue {
- private:
- using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>;
- static constexpr ui32 BUNCH_SIZE = TTuned::BUNCH_SIZE;
- using TBunchBase = typename TTuned::TBunchBase;
- template <typename TElem, typename... TRest>
- using TContainer =
- typename TTuned::template TContainer<TElem, TRest...>;
- using TWriteBucket =
- NReadAsFilledPrivate::TWriteBucket<BUNCH_SIZE, TBunchBase>;
- using TReadBucket =
- NReadAsFilledPrivate::TReadBucket<TWriteBucket, TContainer>;
- public:
- TReadAsFilledQueue()
- : RBucket(&WBucket)
- {
- }
- ~TReadAsFilledQueue() {
- if (TTuned::DeleteItems) {
- for (;;) {
- auto msg = Pop();
- if (msg == nullptr) {
- break;
- }
- TDelete::Destroy(msg);
- }
- }
- }
- void Push(TItem* msg) {
- WBucket.Push((void*)msg, NReadAsFilledPrivate::TEmptyAux());
- }
- TItem* Pop() {
- return (TItem*)RBucket.Pop();
- }
- TItem* Peek() {
- return (TItem*)RBucket.Peek();
- }
- protected:
- TWriteBucket WBucket;
- TReadBucket RBucket;
- };
- }
|