123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775 |
- #include <atomic>
- #include <util/system/defaults.h>
- #if defined(_unix_)
- #include <pthread.h>
- #endif
- #include <util/generic/vector.h>
- #include <util/generic/intrlist.h>
- #include <util/generic/yexception.h>
- #include <util/generic/ylimits.h>
- #include <util/generic/singleton.h>
- #include <util/generic/fastqueue.h>
- #include <util/stream/output.h>
- #include <util/string/builder.h>
- #include <util/system/event.h>
- #include <util/system/mutex.h>
- #include <util/system/condvar.h>
- #include <util/system/thread.h>
- #include <util/datetime/base.h>
- #include "factory.h"
- #include "pool.h"
- namespace {
- class TThreadNamer {
- public:
- TThreadNamer(const IThreadPool::TParams& params)
- : ThreadName(params.ThreadName_)
- , EnumerateThreads(params.EnumerateThreads_)
- {
- }
- explicit operator bool() const {
- return !ThreadName.empty();
- }
- void SetCurrentThreadName() {
- if (EnumerateThreads) {
- Set(TStringBuilder() << ThreadName << (Index++));
- } else {
- Set(ThreadName);
- }
- }
- private:
- void Set(const TString& name) {
- TThread::SetCurrentThreadName(name.c_str());
- }
- private:
- TString ThreadName;
- bool EnumerateThreads = false;
- std::atomic<ui64> Index{0};
- };
- }
- TThreadFactoryHolder::TThreadFactoryHolder() noexcept
- : Pool_(SystemThreadFactory())
- {
- }
- class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactory::IThreadAble {
- using TTsr = IThreadPool::TTsr;
- using TJobQueue = TFastQueue<IObjectInQueue*>;
- using TThreadRef = THolder<IThreadFactory::IThread>;
- public:
- inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params)
- : Parent_(parent)
- , Blocking(params.Blocking_)
- , Catching(params.Catching_)
- , Namer(params)
- , ShouldTerminate(true)
- , MaxQueueSize(0)
- , ThreadCountExpected(0)
- , ThreadCountReal(0)
- , Forked(false)
- {
- TAtforkQueueRestarter::Get().RegisterObject(this);
- Start(thrnum, maxqueue);
- }
- inline ~TImpl() override {
- try {
- Stop();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
- TAtforkQueueRestarter::Get().UnregisterObject(this);
- Y_ASSERT(Tharr.empty());
- }
- inline bool Add(IObjectInQueue* obj) {
- if (ShouldTerminate.load()) {
- return false;
- }
- if (Tharr.empty()) {
- TTsr tsr(Parent_);
- obj->Process(tsr);
- return true;
- }
- with_lock (QueueMutex) {
- while (MaxQueueSize > 0 && Queue.Size() >= MaxQueueSize && !ShouldTerminate.load()) {
- if (!Blocking) {
- return false;
- }
- QueuePopCond.Wait(QueueMutex);
- }
- if (ShouldTerminate.load()) {
- return false;
- }
- Queue.Push(obj);
- }
- QueuePushCond.Signal();
- return true;
- }
- inline size_t Size() const noexcept {
- auto guard = Guard(QueueMutex);
- return Queue.Size();
- }
- inline size_t GetMaxQueueSize() const noexcept {
- return MaxQueueSize;
- }
- inline size_t GetThreadCountExpected() const noexcept {
- return ThreadCountExpected;
- }
- inline size_t GetThreadCountReal() const noexcept {
- return ThreadCountReal;
- }
- inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") {
- Forked = true;
- }
- inline bool NeedRestart() const noexcept {
- return Forked;
- }
- private:
- inline void Start(size_t num, size_t maxque) {
- ShouldTerminate.store(false);
- MaxQueueSize = maxque;
- ThreadCountExpected = num;
- try {
- for (size_t i = 0; i < num; ++i) {
- Tharr.push_back(Parent_->Pool()->Run(this));
- ++ThreadCountReal;
- }
- } catch (...) {
- Stop();
- throw;
- }
- }
- inline void Stop() {
- ShouldTerminate.store(true);
- with_lock (QueueMutex) {
- QueuePopCond.BroadCast();
- }
- if (!NeedRestart()) {
- WaitForComplete();
- }
- Tharr.clear();
- ThreadCountExpected = 0;
- MaxQueueSize = 0;
- }
- inline void WaitForComplete() noexcept {
- with_lock (StopMutex) {
- while (ThreadCountReal) {
- with_lock (QueueMutex) {
- QueuePushCond.Signal();
- }
- StopCond.Wait(StopMutex);
- }
- }
- }
- void DoExecute() override {
- THolder<TTsr> tsr(new TTsr(Parent_));
- if (Namer) {
- Namer.SetCurrentThreadName();
- }
- while (true) {
- IObjectInQueue* job = nullptr;
- with_lock (QueueMutex) {
- while (Queue.Empty() && !ShouldTerminate.load()) {
- QueuePushCond.Wait(QueueMutex);
- }
- if (ShouldTerminate.load() && Queue.Empty()) {
- tsr.Destroy();
- break;
- }
- job = Queue.Pop();
- }
- QueuePopCond.Signal();
- if (Catching) {
- try {
- try {
- job->Process(*tsr);
- } catch (...) {
- Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
- }
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
- } else {
- job->Process(*tsr);
- }
- }
- FinishOneThread();
- }
- inline void FinishOneThread() noexcept {
- auto guard = Guard(StopMutex);
- --ThreadCountReal;
- StopCond.Signal();
- }
- private:
- TThreadPool* Parent_;
- const bool Blocking;
- const bool Catching;
- TThreadNamer Namer;
- mutable TMutex QueueMutex;
- mutable TMutex StopMutex;
- TCondVar QueuePushCond;
- TCondVar QueuePopCond;
- TCondVar StopCond;
- TJobQueue Queue;
- TVector<TThreadRef> Tharr;
- std::atomic<bool> ShouldTerminate;
- size_t MaxQueueSize;
- size_t ThreadCountExpected;
- size_t ThreadCountReal;
- bool Forked;
- class TAtforkQueueRestarter {
- public:
- static TAtforkQueueRestarter& Get() {
- return *SingletonWithPriority<TAtforkQueueRestarter, 256>();
- }
- inline void RegisterObject(TImpl* obj) {
- auto guard = Guard(ActionMutex);
- RegisteredObjects.PushBack(obj);
- }
- inline void UnregisterObject(TImpl* obj) {
- auto guard = Guard(ActionMutex);
- obj->Unlink();
- }
- private:
- void ChildAction() {
- TTryGuard guard{ActionMutex};
- // If you get an error here, it means you've used fork(2) in multi-threaded environment and probably created thread pools often.
- // Don't use fork(2) in multi-threaded programs, don't create thread pools often.
- // The mutex is locked after fork iff the fork(2) call was concurrent with RegisterObject / UnregisterObject in another thread.
- Y_ABORT_UNLESS(guard.WasAcquired(), "Failed to acquire ActionMutex after fork");
- for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) {
- it->AtforkAction();
- }
- }
- static void ProcessChildAction() {
- Get().ChildAction();
- }
- TIntrusiveList<TImpl> RegisteredObjects;
- TMutex ActionMutex;
- public:
- inline TAtforkQueueRestarter() {
- #if defined(_bionic_)
- //no pthread_atfork on android libc
- #elif defined(_unix_)
- pthread_atfork(nullptr, nullptr, ProcessChildAction);
- #endif
- }
- };
- };
- TThreadPool::~TThreadPool() = default;
- size_t TThreadPool::Size() const noexcept {
- if (!Impl_.Get()) {
- return 0;
- }
- return Impl_->Size();
- }
- size_t TThreadPool::GetThreadCountExpected() const noexcept {
- if (!Impl_.Get()) {
- return 0;
- }
- return Impl_->GetThreadCountExpected();
- }
- size_t TThreadPool::GetThreadCountReal() const noexcept {
- if (!Impl_.Get()) {
- return 0;
- }
- return Impl_->GetThreadCountReal();
- }
- size_t TThreadPool::GetMaxQueueSize() const noexcept {
- if (!Impl_.Get()) {
- return 0;
- }
- return Impl_->GetMaxQueueSize();
- }
- bool TThreadPool::Add(IObjectInQueue* obj) {
- Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
- if (Impl_->NeedRestart()) {
- Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize());
- }
- return Impl_->Add(obj);
- }
- void TThreadPool::Start(size_t thrnum, size_t maxque) {
- Impl_.Reset(new TImpl(this, thrnum, maxque, Params));
- }
- void TThreadPool::Stop() noexcept {
- Impl_.Destroy();
- }
- static std::atomic<long> MtpQueueCounter = 0;
- class TAdaptiveThreadPool::TImpl {
- public:
- class TThread: public IThreadFactory::IThreadAble {
- public:
- inline TThread(TImpl* parent)
- : Impl_(parent)
- , Thread_(Impl_->Parent_->Pool()->Run(this))
- {
- }
- inline ~TThread() override {
- Impl_->DecThreadCount();
- }
- private:
- void DoExecute() noexcept override {
- THolder<TThread> This(this);
- if (Impl_->Namer) {
- Impl_->Namer.SetCurrentThreadName();
- }
- {
- TTsr tsr(Impl_->Parent_);
- IObjectInQueue* obj;
- while ((obj = Impl_->WaitForJob()) != nullptr) {
- if (Impl_->Catching) {
- try {
- try {
- obj->Process(tsr);
- } catch (...) {
- Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl;
- }
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
- } else {
- obj->Process(tsr);
- }
- }
- }
- }
- private:
- TImpl* Impl_;
- THolder<IThreadFactory::IThread> Thread_;
- };
- inline TImpl(TAdaptiveThreadPool* parent, const TParams& params)
- : Parent_(parent)
- , Catching(params.Catching_)
- , Namer(params)
- , ThrCount_(0)
- , AllDone_(false)
- , Obj_(nullptr)
- , Free_(0)
- , IdleTime_(TDuration::Max())
- {
- sprintf(Name_, "[mtp queue %ld]", ++MtpQueueCounter);
- }
- inline ~TImpl() {
- Stop();
- }
- inline void SetMaxIdleTime(TDuration idleTime) {
- IdleTime_ = idleTime;
- }
- inline const char* Name() const noexcept {
- return Name_;
- }
- inline void Add(IObjectInQueue* obj) {
- with_lock (Mutex_) {
- while (Obj_ != nullptr) {
- CondFree_.Wait(Mutex_);
- }
- if (Free_ == 0) {
- AddThreadNoLock();
- }
- Obj_ = obj;
- Y_ENSURE_EX(!AllDone_, TThreadPoolException() << TStringBuf("adding to a stopped queue"));
- }
- CondReady_.Signal();
- }
- inline void AddThreads(size_t n) {
- with_lock (Mutex_) {
- while (n) {
- AddThreadNoLock();
- --n;
- }
- }
- }
- inline size_t Size() const noexcept {
- return ThrCount_.load();
- }
- private:
- inline void IncThreadCount() noexcept {
- ++ThrCount_;
- }
- inline void DecThreadCount() noexcept {
- --ThrCount_;
- }
- inline void AddThreadNoLock() {
- IncThreadCount();
- try {
- new TThread(this);
- } catch (...) {
- DecThreadCount();
- throw;
- }
- }
- inline void Stop() noexcept {
- Mutex_.Acquire();
- AllDone_ = true;
- while (ThrCount_.load()) {
- Mutex_.Release();
- CondReady_.Signal();
- Mutex_.Acquire();
- }
- Mutex_.Release();
- }
- inline IObjectInQueue* WaitForJob() noexcept {
- Mutex_.Acquire();
- ++Free_;
- while (!Obj_ && !AllDone_) {
- if (!CondReady_.WaitT(Mutex_, IdleTime_)) {
- break;
- }
- }
- IObjectInQueue* ret = Obj_;
- Obj_ = nullptr;
- --Free_;
- Mutex_.Release();
- CondFree_.Signal();
- return ret;
- }
- private:
- TAdaptiveThreadPool* Parent_;
- const bool Catching;
- TThreadNamer Namer;
- std::atomic<size_t> ThrCount_;
- TMutex Mutex_;
- TCondVar CondReady_;
- TCondVar CondFree_;
- bool AllDone_;
- IObjectInQueue* Obj_;
- size_t Free_;
- char Name_[64];
- TDuration IdleTime_;
- };
- TThreadPoolBase::TThreadPoolBase(const TParams& params)
- : TThreadFactoryHolder(params.Factory_)
- , Params(params)
- {
- }
- #define DEFINE_THREAD_POOL_CTORS(type) \
- type::type(const TParams& params) \
- : TThreadPoolBase(params) \
- { \
- }
- DEFINE_THREAD_POOL_CTORS(TThreadPool)
- DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool)
- DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool)
- TAdaptiveThreadPool::~TAdaptiveThreadPool() = default;
- bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) {
- Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
- Impl_->Add(obj);
- return true;
- }
- void TAdaptiveThreadPool::Start(size_t, size_t) {
- Impl_.Reset(new TImpl(this, Params));
- }
- void TAdaptiveThreadPool::Stop() noexcept {
- Impl_.Destroy();
- }
- size_t TAdaptiveThreadPool::Size() const noexcept {
- if (Impl_.Get()) {
- return Impl_->Size();
- }
- return 0;
- }
- void TAdaptiveThreadPool::SetMaxIdleTime(TDuration interval) {
- Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
- Impl_->SetMaxIdleTime(interval);
- }
- TSimpleThreadPool::~TSimpleThreadPool() {
- try {
- Stop();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
- }
- bool TSimpleThreadPool::Add(IObjectInQueue* obj) {
- Y_ENSURE_EX(Slave_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
- return Slave_->Add(obj);
- }
- void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) {
- THolder<IThreadPool> tmp;
- TAdaptiveThreadPool* adaptive(nullptr);
- if (thrnum) {
- tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params));
- } else {
- adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params);
- tmp.Reset(adaptive);
- }
- tmp->Start(thrnum, maxque);
- if (adaptive) {
- adaptive->SetMaxIdleTime(TDuration::Seconds(100));
- }
- Slave_.Swap(tmp);
- }
- void TSimpleThreadPool::Stop() noexcept {
- Slave_.Destroy();
- }
- size_t TSimpleThreadPool::Size() const noexcept {
- if (Slave_.Get()) {
- return Slave_->Size();
- }
- return 0;
- }
- namespace {
- class TOwnedObjectInQueue: public IObjectInQueue {
- private:
- THolder<IObjectInQueue> Owned;
- public:
- TOwnedObjectInQueue(THolder<IObjectInQueue> owned)
- : Owned(std::move(owned))
- {
- }
- void Process(void* data) override {
- THolder<TOwnedObjectInQueue> self(this);
- Owned->Process(data);
- }
- };
- }
- void IThreadPool::SafeAdd(IObjectInQueue* obj) {
- Y_ENSURE_EX(Add(obj), TThreadPoolException() << TStringBuf("can not add object to queue"));
- }
- void IThreadPool::SafeAddAndOwn(THolder<IObjectInQueue> obj) {
- Y_ENSURE_EX(AddAndOwn(std::move(obj)), TThreadPoolException() << TStringBuf("can not add to queue and own"));
- }
- bool IThreadPool::AddAndOwn(THolder<IObjectInQueue> obj) {
- auto owner = MakeHolder<TOwnedObjectInQueue>(std::move(obj));
- bool added = Add(owner.Get());
- if (added) {
- Y_UNUSED(owner.Release());
- }
- return added;
- }
- using IThread = IThreadFactory::IThread;
- using IThreadAble = IThreadFactory::IThreadAble;
- namespace {
- class TPoolThread: public IThread {
- class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> {
- public:
- inline TThreadImpl(IThreadAble* func)
- : Func_(func)
- {
- }
- ~TThreadImpl() override = default;
- inline void WaitForStart() noexcept {
- StartEvent_.Wait();
- }
- inline void WaitForComplete() noexcept {
- CompleteEvent_.Wait();
- }
- private:
- void Process(void* /*tsr*/) override {
- TThreadImplRef This(this);
- {
- StartEvent_.Signal();
- try {
- Func_->Execute();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
- CompleteEvent_.Signal();
- }
- }
- private:
- IThreadAble* Func_;
- TSystemEvent CompleteEvent_;
- TSystemEvent StartEvent_;
- };
- using TThreadImplRef = TIntrusivePtr<TThreadImpl>;
- public:
- inline TPoolThread(IThreadPool* parent)
- : Parent_(parent)
- {
- }
- ~TPoolThread() override {
- if (Impl_) {
- Impl_->WaitForStart();
- }
- }
- private:
- void DoRun(IThreadAble* func) override {
- TThreadImplRef impl(new TThreadImpl(func));
- Parent_->SafeAdd(impl.Get());
- Impl_.Swap(impl);
- }
- void DoJoin() noexcept override {
- if (Impl_) {
- Impl_->WaitForComplete();
- Impl_ = nullptr;
- }
- }
- private:
- IThreadPool* Parent_;
- TThreadImplRef Impl_;
- };
- }
- IThread* IThreadPool::DoCreate() {
- return new TPoolThread(this);
- }
- THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) {
- THolder<IThreadPool> queue;
- if (threadsCount > 1) {
- queue.Reset(new TThreadPool(params));
- } else {
- queue.Reset(new TFakeThreadPool());
- }
- queue->Start(threadsCount, queueSizeLimit);
- return queue;
- }
|