pool.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. #include <atomic>
  2. #include <util/system/defaults.h>
  3. #if defined(_unix_)
  4. #include <pthread.h>
  5. #endif
  6. #include <util/generic/vector.h>
  7. #include <util/generic/intrlist.h>
  8. #include <util/generic/yexception.h>
  9. #include <util/generic/ylimits.h>
  10. #include <util/generic/singleton.h>
  11. #include <util/generic/fastqueue.h>
  12. #include <util/stream/output.h>
  13. #include <util/string/builder.h>
  14. #include <util/system/event.h>
  15. #include <util/system/mutex.h>
  16. #include <util/system/condvar.h>
  17. #include <util/system/thread.h>
  18. #include <util/datetime/base.h>
  19. #include "factory.h"
  20. #include "pool.h"
  21. namespace {
  22. class TThreadNamer {
  23. public:
  24. TThreadNamer(const IThreadPool::TParams& params)
  25. : ThreadName(params.ThreadName_)
  26. , EnumerateThreads(params.EnumerateThreads_)
  27. {
  28. }
  29. explicit operator bool() const {
  30. return !ThreadName.empty();
  31. }
  32. void SetCurrentThreadName() {
  33. if (EnumerateThreads) {
  34. Set(TStringBuilder() << ThreadName << (Index++));
  35. } else {
  36. Set(ThreadName);
  37. }
  38. }
  39. private:
  40. void Set(const TString& name) {
  41. TThread::SetCurrentThreadName(name.c_str());
  42. }
  43. private:
  44. TString ThreadName;
  45. bool EnumerateThreads = false;
  46. std::atomic<ui64> Index{0};
  47. };
  48. }
  49. TThreadFactoryHolder::TThreadFactoryHolder() noexcept
  50. : Pool_(SystemThreadFactory())
  51. {
  52. }
  53. class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactory::IThreadAble {
  54. using TTsr = IThreadPool::TTsr;
  55. using TJobQueue = TFastQueue<IObjectInQueue*>;
  56. using TThreadRef = THolder<IThreadFactory::IThread>;
  57. public:
  58. inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params)
  59. : Parent_(parent)
  60. , Blocking(params.Blocking_)
  61. , Catching(params.Catching_)
  62. , Namer(params)
  63. , ShouldTerminate(true)
  64. , MaxQueueSize(0)
  65. , ThreadCountExpected(0)
  66. , ThreadCountReal(0)
  67. , Forked(false)
  68. {
  69. TAtforkQueueRestarter::Get().RegisterObject(this);
  70. Start(thrnum, maxqueue);
  71. }
  72. inline ~TImpl() override {
  73. try {
  74. Stop();
  75. } catch (...) {
  76. // ¯\_(ツ)_/¯
  77. }
  78. TAtforkQueueRestarter::Get().UnregisterObject(this);
  79. Y_ASSERT(Tharr.empty());
  80. }
  81. inline bool Add(IObjectInQueue* obj) {
  82. if (ShouldTerminate.load()) {
  83. return false;
  84. }
  85. if (Tharr.empty()) {
  86. TTsr tsr(Parent_);
  87. obj->Process(tsr);
  88. return true;
  89. }
  90. with_lock (QueueMutex) {
  91. while (MaxQueueSize > 0 && Queue.Size() >= MaxQueueSize && !ShouldTerminate.load()) {
  92. if (!Blocking) {
  93. return false;
  94. }
  95. QueuePopCond.Wait(QueueMutex);
  96. }
  97. if (ShouldTerminate.load()) {
  98. return false;
  99. }
  100. Queue.Push(obj);
  101. }
  102. QueuePushCond.Signal();
  103. return true;
  104. }
  105. inline size_t Size() const noexcept {
  106. auto guard = Guard(QueueMutex);
  107. return Queue.Size();
  108. }
  109. inline size_t GetMaxQueueSize() const noexcept {
  110. return MaxQueueSize;
  111. }
  112. inline size_t GetThreadCountExpected() const noexcept {
  113. return ThreadCountExpected;
  114. }
  115. inline size_t GetThreadCountReal() const noexcept {
  116. return ThreadCountReal;
  117. }
  118. inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") {
  119. Forked = true;
  120. }
  121. inline bool NeedRestart() const noexcept {
  122. return Forked;
  123. }
  124. private:
  125. inline void Start(size_t num, size_t maxque) {
  126. ShouldTerminate.store(false);
  127. MaxQueueSize = maxque;
  128. ThreadCountExpected = num;
  129. try {
  130. for (size_t i = 0; i < num; ++i) {
  131. Tharr.push_back(Parent_->Pool()->Run(this));
  132. ++ThreadCountReal;
  133. }
  134. } catch (...) {
  135. Stop();
  136. throw;
  137. }
  138. }
  139. inline void Stop() {
  140. ShouldTerminate.store(true);
  141. with_lock (QueueMutex) {
  142. QueuePopCond.BroadCast();
  143. }
  144. if (!NeedRestart()) {
  145. WaitForComplete();
  146. }
  147. Tharr.clear();
  148. ThreadCountExpected = 0;
  149. MaxQueueSize = 0;
  150. }
  151. inline void WaitForComplete() noexcept {
  152. with_lock (StopMutex) {
  153. while (ThreadCountReal) {
  154. with_lock (QueueMutex) {
  155. QueuePushCond.Signal();
  156. }
  157. StopCond.Wait(StopMutex);
  158. }
  159. }
  160. }
  161. void DoExecute() override {
  162. THolder<TTsr> tsr(new TTsr(Parent_));
  163. if (Namer) {
  164. Namer.SetCurrentThreadName();
  165. }
  166. while (true) {
  167. IObjectInQueue* job = nullptr;
  168. with_lock (QueueMutex) {
  169. while (Queue.Empty() && !ShouldTerminate.load()) {
  170. QueuePushCond.Wait(QueueMutex);
  171. }
  172. if (ShouldTerminate.load() && Queue.Empty()) {
  173. tsr.Destroy();
  174. break;
  175. }
  176. job = Queue.Pop();
  177. }
  178. QueuePopCond.Signal();
  179. if (Catching) {
  180. try {
  181. try {
  182. job->Process(*tsr);
  183. } catch (...) {
  184. Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
  185. }
  186. } catch (...) {
  187. // ¯\_(ツ)_/¯
  188. }
  189. } else {
  190. job->Process(*tsr);
  191. }
  192. }
  193. FinishOneThread();
  194. }
  195. inline void FinishOneThread() noexcept {
  196. auto guard = Guard(StopMutex);
  197. --ThreadCountReal;
  198. StopCond.Signal();
  199. }
  200. private:
  201. TThreadPool* Parent_;
  202. const bool Blocking;
  203. const bool Catching;
  204. TThreadNamer Namer;
  205. mutable TMutex QueueMutex;
  206. mutable TMutex StopMutex;
  207. TCondVar QueuePushCond;
  208. TCondVar QueuePopCond;
  209. TCondVar StopCond;
  210. TJobQueue Queue;
  211. TVector<TThreadRef> Tharr;
  212. std::atomic<bool> ShouldTerminate;
  213. size_t MaxQueueSize;
  214. size_t ThreadCountExpected;
  215. size_t ThreadCountReal;
  216. bool Forked;
  217. class TAtforkQueueRestarter {
  218. public:
  219. static TAtforkQueueRestarter& Get() {
  220. return *SingletonWithPriority<TAtforkQueueRestarter, 256>();
  221. }
  222. inline void RegisterObject(TImpl* obj) {
  223. auto guard = Guard(ActionMutex);
  224. RegisteredObjects.PushBack(obj);
  225. }
  226. inline void UnregisterObject(TImpl* obj) {
  227. auto guard = Guard(ActionMutex);
  228. obj->Unlink();
  229. }
  230. private:
  231. void ChildAction() {
  232. TTryGuard guard{ActionMutex};
  233. // If you get an error here, it means you've used fork(2) in multi-threaded environment and probably created thread pools often.
  234. // Don't use fork(2) in multi-threaded programs, don't create thread pools often.
  235. // The mutex is locked after fork iff the fork(2) call was concurrent with RegisterObject / UnregisterObject in another thread.
  236. Y_ABORT_UNLESS(guard.WasAcquired(), "Failed to acquire ActionMutex after fork");
  237. for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) {
  238. it->AtforkAction();
  239. }
  240. }
  241. static void ProcessChildAction() {
  242. Get().ChildAction();
  243. }
  244. TIntrusiveList<TImpl> RegisteredObjects;
  245. TMutex ActionMutex;
  246. public:
  247. inline TAtforkQueueRestarter() {
  248. #if defined(_bionic_)
  249. //no pthread_atfork on android libc
  250. #elif defined(_unix_)
  251. pthread_atfork(nullptr, nullptr, ProcessChildAction);
  252. #endif
  253. }
  254. };
  255. };
  256. TThreadPool::~TThreadPool() = default;
  257. size_t TThreadPool::Size() const noexcept {
  258. if (!Impl_.Get()) {
  259. return 0;
  260. }
  261. return Impl_->Size();
  262. }
  263. size_t TThreadPool::GetThreadCountExpected() const noexcept {
  264. if (!Impl_.Get()) {
  265. return 0;
  266. }
  267. return Impl_->GetThreadCountExpected();
  268. }
  269. size_t TThreadPool::GetThreadCountReal() const noexcept {
  270. if (!Impl_.Get()) {
  271. return 0;
  272. }
  273. return Impl_->GetThreadCountReal();
  274. }
  275. size_t TThreadPool::GetMaxQueueSize() const noexcept {
  276. if (!Impl_.Get()) {
  277. return 0;
  278. }
  279. return Impl_->GetMaxQueueSize();
  280. }
  281. bool TThreadPool::Add(IObjectInQueue* obj) {
  282. Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
  283. if (Impl_->NeedRestart()) {
  284. Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize());
  285. }
  286. return Impl_->Add(obj);
  287. }
  288. void TThreadPool::Start(size_t thrnum, size_t maxque) {
  289. Impl_.Reset(new TImpl(this, thrnum, maxque, Params));
  290. }
  291. void TThreadPool::Stop() noexcept {
  292. Impl_.Destroy();
  293. }
  294. static std::atomic<long> MtpQueueCounter = 0;
  295. class TAdaptiveThreadPool::TImpl {
  296. public:
  297. class TThread: public IThreadFactory::IThreadAble {
  298. public:
  299. inline TThread(TImpl* parent)
  300. : Impl_(parent)
  301. , Thread_(Impl_->Parent_->Pool()->Run(this))
  302. {
  303. }
  304. inline ~TThread() override {
  305. Impl_->DecThreadCount();
  306. }
  307. private:
  308. void DoExecute() noexcept override {
  309. THolder<TThread> This(this);
  310. if (Impl_->Namer) {
  311. Impl_->Namer.SetCurrentThreadName();
  312. }
  313. {
  314. TTsr tsr(Impl_->Parent_);
  315. IObjectInQueue* obj;
  316. while ((obj = Impl_->WaitForJob()) != nullptr) {
  317. if (Impl_->Catching) {
  318. try {
  319. try {
  320. obj->Process(tsr);
  321. } catch (...) {
  322. Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl;
  323. }
  324. } catch (...) {
  325. // ¯\_(ツ)_/¯
  326. }
  327. } else {
  328. obj->Process(tsr);
  329. }
  330. }
  331. }
  332. }
  333. private:
  334. TImpl* Impl_;
  335. THolder<IThreadFactory::IThread> Thread_;
  336. };
  337. inline TImpl(TAdaptiveThreadPool* parent, const TParams& params)
  338. : Parent_(parent)
  339. , Catching(params.Catching_)
  340. , Namer(params)
  341. , ThrCount_(0)
  342. , AllDone_(false)
  343. , Obj_(nullptr)
  344. , Free_(0)
  345. , IdleTime_(TDuration::Max())
  346. {
  347. snprintf(Name_, sizeof(Name_), "[mtp queue %ld]", ++MtpQueueCounter);
  348. }
  349. inline ~TImpl() {
  350. Stop();
  351. }
  352. inline void SetMaxIdleTime(TDuration idleTime) {
  353. IdleTime_ = idleTime;
  354. }
  355. inline const char* Name() const noexcept {
  356. return Name_;
  357. }
  358. inline void Add(IObjectInQueue* obj) {
  359. with_lock (Mutex_) {
  360. while (Obj_ != nullptr) {
  361. CondFree_.Wait(Mutex_);
  362. }
  363. if (Free_ == 0) {
  364. AddThreadNoLock();
  365. }
  366. Obj_ = obj;
  367. Y_ENSURE_EX(!AllDone_, TThreadPoolException() << TStringBuf("adding to a stopped queue"));
  368. }
  369. CondReady_.Signal();
  370. }
  371. inline void AddThreads(size_t n) {
  372. with_lock (Mutex_) {
  373. while (n) {
  374. AddThreadNoLock();
  375. --n;
  376. }
  377. }
  378. }
  379. inline size_t Size() const noexcept {
  380. return ThrCount_.load();
  381. }
  382. private:
  383. inline void IncThreadCount() noexcept {
  384. ++ThrCount_;
  385. }
  386. inline void DecThreadCount() noexcept {
  387. --ThrCount_;
  388. }
  389. inline void AddThreadNoLock() {
  390. IncThreadCount();
  391. try {
  392. new TThread(this);
  393. } catch (...) {
  394. DecThreadCount();
  395. throw;
  396. }
  397. }
  398. inline void Stop() noexcept {
  399. Mutex_.Acquire();
  400. AllDone_ = true;
  401. while (ThrCount_.load()) {
  402. Mutex_.Release();
  403. CondReady_.Signal();
  404. Mutex_.Acquire();
  405. }
  406. Mutex_.Release();
  407. }
  408. inline IObjectInQueue* WaitForJob() noexcept {
  409. Mutex_.Acquire();
  410. ++Free_;
  411. while (!Obj_ && !AllDone_) {
  412. if (!CondReady_.WaitT(Mutex_, IdleTime_)) {
  413. break;
  414. }
  415. }
  416. IObjectInQueue* ret = Obj_;
  417. Obj_ = nullptr;
  418. --Free_;
  419. Mutex_.Release();
  420. CondFree_.Signal();
  421. return ret;
  422. }
  423. private:
  424. TAdaptiveThreadPool* Parent_;
  425. const bool Catching;
  426. TThreadNamer Namer;
  427. std::atomic<size_t> ThrCount_;
  428. TMutex Mutex_;
  429. TCondVar CondReady_;
  430. TCondVar CondFree_;
  431. bool AllDone_;
  432. IObjectInQueue* Obj_;
  433. size_t Free_;
  434. char Name_[64];
  435. TDuration IdleTime_;
  436. };
  437. TThreadPoolBase::TThreadPoolBase(const TParams& params)
  438. : TThreadFactoryHolder(params.Factory_)
  439. , Params(params)
  440. {
  441. }
  442. #define DEFINE_THREAD_POOL_CTORS(type) \
  443. type::type(const TParams& params) \
  444. : TThreadPoolBase(params) \
  445. { \
  446. }
  447. DEFINE_THREAD_POOL_CTORS(TThreadPool)
  448. DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool)
  449. DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool)
  450. TAdaptiveThreadPool::~TAdaptiveThreadPool() = default;
  451. bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) {
  452. Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
  453. Impl_->Add(obj);
  454. return true;
  455. }
  456. void TAdaptiveThreadPool::Start(size_t, size_t) {
  457. Impl_.Reset(new TImpl(this, Params));
  458. }
  459. void TAdaptiveThreadPool::Stop() noexcept {
  460. Impl_.Destroy();
  461. }
  462. size_t TAdaptiveThreadPool::Size() const noexcept {
  463. if (Impl_.Get()) {
  464. return Impl_->Size();
  465. }
  466. return 0;
  467. }
  468. void TAdaptiveThreadPool::SetMaxIdleTime(TDuration interval) {
  469. Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
  470. Impl_->SetMaxIdleTime(interval);
  471. }
  472. TSimpleThreadPool::~TSimpleThreadPool() {
  473. try {
  474. Stop();
  475. } catch (...) {
  476. // ¯\_(ツ)_/¯
  477. }
  478. }
  479. bool TSimpleThreadPool::Add(IObjectInQueue* obj) {
  480. Y_ENSURE_EX(Slave_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
  481. return Slave_->Add(obj);
  482. }
  483. void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) {
  484. THolder<IThreadPool> tmp;
  485. TAdaptiveThreadPool* adaptive(nullptr);
  486. if (thrnum) {
  487. tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params));
  488. } else {
  489. adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params);
  490. tmp.Reset(adaptive);
  491. }
  492. tmp->Start(thrnum, maxque);
  493. if (adaptive) {
  494. adaptive->SetMaxIdleTime(TDuration::Seconds(100));
  495. }
  496. Slave_.Swap(tmp);
  497. }
  498. void TSimpleThreadPool::Stop() noexcept {
  499. Slave_.Destroy();
  500. }
  501. size_t TSimpleThreadPool::Size() const noexcept {
  502. if (Slave_.Get()) {
  503. return Slave_->Size();
  504. }
  505. return 0;
  506. }
  507. namespace {
  508. class TOwnedObjectInQueue: public IObjectInQueue {
  509. private:
  510. THolder<IObjectInQueue> Owned;
  511. public:
  512. TOwnedObjectInQueue(THolder<IObjectInQueue> owned)
  513. : Owned(std::move(owned))
  514. {
  515. }
  516. void Process(void* data) override {
  517. THolder<TOwnedObjectInQueue> self(this);
  518. Owned->Process(data);
  519. }
  520. };
  521. }
  522. void IThreadPool::SafeAdd(IObjectInQueue* obj) {
  523. Y_ENSURE_EX(Add(obj), TThreadPoolException() << TStringBuf("can not add object to queue"));
  524. }
  525. void IThreadPool::SafeAddAndOwn(THolder<IObjectInQueue> obj) {
  526. Y_ENSURE_EX(AddAndOwn(std::move(obj)), TThreadPoolException() << TStringBuf("can not add to queue and own"));
  527. }
  528. bool IThreadPool::AddAndOwn(THolder<IObjectInQueue> obj) {
  529. auto owner = MakeHolder<TOwnedObjectInQueue>(std::move(obj));
  530. bool added = Add(owner.Get());
  531. if (added) {
  532. Y_UNUSED(owner.Release());
  533. }
  534. return added;
  535. }
  536. using IThread = IThreadFactory::IThread;
  537. using IThreadAble = IThreadFactory::IThreadAble;
  538. namespace {
  539. class TPoolThread: public IThread {
  540. class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> {
  541. public:
  542. inline TThreadImpl(IThreadAble* func)
  543. : Func_(func)
  544. {
  545. }
  546. ~TThreadImpl() override = default;
  547. inline void WaitForStart() noexcept {
  548. StartEvent_.Wait();
  549. }
  550. inline void WaitForComplete() noexcept {
  551. CompleteEvent_.Wait();
  552. }
  553. private:
  554. void Process(void* /*tsr*/) override {
  555. TThreadImplRef This(this);
  556. {
  557. StartEvent_.Signal();
  558. try {
  559. Func_->Execute();
  560. } catch (...) {
  561. // ¯\_(ツ)_/¯
  562. }
  563. CompleteEvent_.Signal();
  564. }
  565. }
  566. private:
  567. IThreadAble* Func_;
  568. TSystemEvent CompleteEvent_;
  569. TSystemEvent StartEvent_;
  570. };
  571. using TThreadImplRef = TIntrusivePtr<TThreadImpl>;
  572. public:
  573. inline TPoolThread(IThreadPool* parent)
  574. : Parent_(parent)
  575. {
  576. }
  577. ~TPoolThread() override {
  578. if (Impl_) {
  579. Impl_->WaitForStart();
  580. }
  581. }
  582. private:
  583. void DoRun(IThreadAble* func) override {
  584. TThreadImplRef impl(new TThreadImpl(func));
  585. Parent_->SafeAdd(impl.Get());
  586. Impl_.Swap(impl);
  587. }
  588. void DoJoin() noexcept override {
  589. if (Impl_) {
  590. Impl_->WaitForComplete();
  591. Impl_ = nullptr;
  592. }
  593. }
  594. private:
  595. IThreadPool* Parent_;
  596. TThreadImplRef Impl_;
  597. };
  598. }
  599. IThread* IThreadPool::DoCreate() {
  600. return new TPoolThread(this);
  601. }
  602. THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) {
  603. THolder<IThreadPool> queue;
  604. if (threadsCount > 1) {
  605. queue.Reset(new TThreadPool(params));
  606. } else {
  607. queue.Reset(new TFakeThreadPool());
  608. }
  609. queue->Start(threadsCount, queueSizeLimit);
  610. return queue;
  611. }