#pragma once

#include
#include
#include
#include
#include
#include
#include
#include
#include

class TFastElasticQueue
    : public TThreadPoolBase
    , private IThreadFactory::IThreadAble
{
public:
    explicit TFastElasticQueue(const TParams& params = {})
        : TThreadPoolBase(params)
    {
        Y_ENSURE(!params.Blocking_);
    }

    ~TFastElasticQueue() {
        Stop();
    }

    void Start(size_t threadCount, size_t maxQueueSize) override {
        Y_ENSURE(Threads_.empty());
        Y_ENSURE(maxQueueSize > 0);

        // The extra threadCount slots are reserved for the nullptr stop markers enqueued by Stop().
        Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount)));
        MaxQueueSize_ = maxQueueSize;

        try {
            for (size_t i = 0; i < threadCount; ++i) {
                Threads_.push_back(Pool()->Run(this));
            }
        } catch (...) {
            Stop();
            throw;
        }

        Stopped_ = false;
    }

    size_t ObjectCount() const {
        // GuardCount_ can be temporarily incremented above the real number of objects in the queue, so clamp it.
        return Min(GuardCount_.load(), MaxQueueSize_);
    }

    bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
        if (Stopped_ || !obj) {
            return false;
        }

        if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
            GuardCount_.fetch_sub(1);
            return false;
        }

        QueueSize_.fetch_add(1);

        if (!Queue_->Enqueue(obj)) {
            // Simultaneous Dequeue calls may complete out of exact FIFO order,
            // so GuardCount_ can be below MaxQueueSize_ while Enqueue still fails:
            // the oldest enqueued item has not actually been dequeued yet,
            // and the ring buffer cannot advance past it.
            GuardCount_.fetch_sub(1);
            QueueSize_.fetch_sub(1);
            return false;
        }

        Event_.NotifyOne();

        return true;
    }

    size_t Size() const noexcept override {
        return QueueSize_.load();
    }

    void Stop() noexcept override {
        Stopped_ = true;

        // Enqueue one nullptr stop marker per worker thread and wake a worker for each.
        for (size_t i = 0; i < Threads_.size(); ++i) {
            while (!Queue_->Enqueue(nullptr)) {
                Sleep(TDuration::MilliSeconds(1));
            }

            Event_.NotifyOne();
        }

        while (!Threads_.empty()) {
            Threads_.back()->Join();
            Threads_.pop_back();
        }

        Queue_.Reset();
    }

    void DoExecute() override {
        TThread::SetCurrentThreadName(Params.ThreadName_.c_str());

        while (true) {
            IObjectInQueue* job = nullptr;

            Event_.Await([&]() {
                return Queue_->Dequeue(job);
            });

            // A nullptr job is the stop marker enqueued by Stop().
            if (!job) {
                break;
            }

            QueueSize_.fetch_sub(1);

            Y_DEFER {
                GuardCount_.fetch_sub(1);
            };

            if (Params.Catching_) {
                try {
                    try {
                        job->Process(nullptr);
                    } catch (...) {
                        Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
                    }
                } catch (...) {
                    ;
                }
            } else {
                job->Process(nullptr);
            }
        }
    }

private:
    std::atomic<bool> Stopped_ = false;
    size_t MaxQueueSize_ = 0;

    alignas(64) std::atomic<size_t> GuardCount_ = 0;
    alignas(64) std::atomic<size_t> QueueSize_ = 0;

    TVector<THolder<IThreadFactory::IThread>> Threads_;

    THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
    NYT::NThreading::TEventCount Event_;
};