123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- #pragma once
- #include <util/thread/pool.h>
- #include <util/datetime/base.h>
- #include <util/thread/lfqueue.h>
- #include <util/system/thread.h>
- #include <util/generic/vector.h>
- #include <util/generic/scope.h>
- #include <util/stream/str.h>
- #include <library/cpp/threading/bounded_queue/bounded_queue.h>
- #include <library/cpp/yt/threading/event_count.h>
- class TFastElasticQueue
- : public TThreadPoolBase
- , private IThreadFactory::IThreadAble
- {
- public:
- explicit TFastElasticQueue(const TParams& params = {})
- : TThreadPoolBase(params)
- {
- Y_ENSURE(!params.Blocking_);
- }
- ~TFastElasticQueue() {
- Stop();
- }
- void Start(size_t threadCount, size_t maxQueueSize) override {
- Y_ENSURE(Threads_.empty());
- Y_ENSURE(maxQueueSize > 0);
- Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
- MaxQueueSize_ = maxQueueSize;
- try {
- for (size_t i = 0; i < threadCount; ++i) {
- Threads_.push_back(Pool()->Run(this));
- }
- } catch (...) {
- Stop();
- throw;
- }
- Stopped_ = false;
- }
- size_t ObjectCount() const {
- //GuardCount_ can be temporary incremented above real object count in queue
- return Min(GuardCount_.load(), MaxQueueSize_);
- }
- bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
- if (Stopped_ || !obj) {
- return false;
- }
- if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
- GuardCount_.fetch_sub(1);
- return false;
- }
- QueueSize_.fetch_add(1);
- if (!Queue_->Enqueue(obj)) {
- //Simultaneous Dequeue calls can return not in exact fifo order of items,
- //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
- //the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
- GuardCount_.fetch_sub(1);
- QueueSize_.fetch_sub(1);
- return false;
- }
- Event_.NotifyOne();
- return true;
- }
- size_t Size() const noexcept override {
- return QueueSize_.load();
- }
- void Stop() noexcept override {
- Stopped_ = true;
- for (size_t i = 0; i < Threads_.size(); ++i) {
- while (!Queue_->Enqueue(nullptr)) {
- Sleep(TDuration::MilliSeconds(1));
- }
- Event_.NotifyOne();
- }
- while (!Threads_.empty()) {
- Threads_.back()->Join();
- Threads_.pop_back();
- }
- Queue_.Reset();
- }
- void DoExecute() override {
- TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
- while (true) {
- IObjectInQueue* job = nullptr;
- Event_.Await([&]() {
- return Queue_->Dequeue(job);
- });
- if (!job) {
- break;
- }
- QueueSize_.fetch_sub(1);
- Y_DEFER {
- GuardCount_.fetch_sub(1);
- };
- if (Params.Catching_) {
- try {
- try {
- job->Process(nullptr);
- } catch (...) {
- Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
- }
- } catch (...) {
- ;
- }
- } else {
- job->Process(nullptr);
- }
- }
- }
- private:
- std::atomic<bool> Stopped_ = false;
- size_t MaxQueueSize_ = 0;
- alignas(64) std::atomic<size_t> GuardCount_ = 0;
- alignas(64) std::atomic<size_t> QueueSize_ = 0;
- TVector<THolder<IThreadFactory::IThread>> Threads_;
- THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
- NYT::NThreading::TEventCount Event_;
- };
|