// pool.cpp
#include <atomic>
#include <cstdio>

#include <util/system/defaults.h>

#if defined(_unix_)
    #include <pthread.h>
#endif

#include <util/generic/vector.h>
#include <util/generic/intrlist.h>
#include <util/generic/yexception.h>
#include <util/generic/ylimits.h>
#include <util/generic/singleton.h>
#include <util/generic/fastqueue.h>

#include <util/stream/output.h>
#include <util/string/builder.h>

#include <util/system/event.h>
#include <util/system/mutex.h>
#include <util/system/atomic.h>
#include <util/system/condvar.h>
#include <util/system/thread.h>

#include <util/datetime/base.h>

#include "factory.h"
#include "pool.h"
  22. namespace {
  23. class TThreadNamer {
  24. public:
  25. TThreadNamer(const IThreadPool::TParams& params)
  26. : ThreadName(params.ThreadName_)
  27. , EnumerateThreads(params.EnumerateThreads_)
  28. {
  29. }
  30. explicit operator bool() const {
  31. return !ThreadName.empty();
  32. }
  33. void SetCurrentThreadName() {
  34. if (EnumerateThreads) {
  35. Set(TStringBuilder() << ThreadName << (Index++));
  36. } else {
  37. Set(ThreadName);
  38. }
  39. }
  40. private:
  41. void Set(const TString& name) {
  42. TThread::SetCurrentThreadName(name.c_str());
  43. }
  44. private:
  45. TString ThreadName;
  46. bool EnumerateThreads = false;
  47. std::atomic<ui64> Index{0};
  48. };
  49. }
  50. TThreadFactoryHolder::TThreadFactoryHolder() noexcept
  51. : Pool_(SystemThreadFactory())
  52. {
  53. }
  54. class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactory::IThreadAble {
  55. using TTsr = IThreadPool::TTsr;
  56. using TJobQueue = TFastQueue<IObjectInQueue*>;
  57. using TThreadRef = THolder<IThreadFactory::IThread>;
  58. public:
  59. inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params)
  60. : Parent_(parent)
  61. , Blocking(params.Blocking_)
  62. , Catching(params.Catching_)
  63. , Namer(params)
  64. , ShouldTerminate(1)
  65. , MaxQueueSize(0)
  66. , ThreadCountExpected(0)
  67. , ThreadCountReal(0)
  68. , Forked(false)
  69. {
  70. TAtforkQueueRestarter::Get().RegisterObject(this);
  71. Start(thrnum, maxqueue);
  72. }
  73. inline ~TImpl() override {
  74. try {
  75. Stop();
  76. } catch (...) {
  77. // ¯\_(ツ)_/¯
  78. }
  79. TAtforkQueueRestarter::Get().UnregisterObject(this);
  80. Y_ASSERT(Tharr.empty());
  81. }
  82. inline bool Add(IObjectInQueue* obj) {
  83. if (AtomicGet(ShouldTerminate)) {
  84. return false;
  85. }
  86. if (Tharr.empty()) {
  87. TTsr tsr(Parent_);
  88. obj->Process(tsr);
  89. return true;
  90. }
  91. with_lock (QueueMutex) {
  92. while (MaxQueueSize > 0 && Queue.Size() >= MaxQueueSize && !AtomicGet(ShouldTerminate)) {
  93. if (!Blocking) {
  94. return false;
  95. }
  96. QueuePopCond.Wait(QueueMutex);
  97. }
  98. if (AtomicGet(ShouldTerminate)) {
  99. return false;
  100. }
  101. Queue.Push(obj);
  102. }
  103. QueuePushCond.Signal();
  104. return true;
  105. }
  106. inline size_t Size() const noexcept {
  107. auto guard = Guard(QueueMutex);
  108. return Queue.Size();
  109. }
  110. inline size_t GetMaxQueueSize() const noexcept {
  111. return MaxQueueSize;
  112. }
  113. inline size_t GetThreadCountExpected() const noexcept {
  114. return ThreadCountExpected;
  115. }
  116. inline size_t GetThreadCountReal() const noexcept {
  117. return ThreadCountReal;
  118. }
  119. inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") {
  120. Forked = true;
  121. }
  122. inline bool NeedRestart() const noexcept {
  123. return Forked;
  124. }
  125. private:
  126. inline void Start(size_t num, size_t maxque) {
  127. AtomicSet(ShouldTerminate, 0);
  128. MaxQueueSize = maxque;
  129. ThreadCountExpected = num;
  130. try {
  131. for (size_t i = 0; i < num; ++i) {
  132. Tharr.push_back(Parent_->Pool()->Run(this));
  133. ++ThreadCountReal;
  134. }
  135. } catch (...) {
  136. Stop();
  137. throw;
  138. }
  139. }
  140. inline void Stop() {
  141. AtomicSet(ShouldTerminate, 1);
  142. with_lock (QueueMutex) {
  143. QueuePopCond.BroadCast();
  144. }
  145. if (!NeedRestart()) {
  146. WaitForComplete();
  147. }
  148. Tharr.clear();
  149. ThreadCountExpected = 0;
  150. MaxQueueSize = 0;
  151. }
  152. inline void WaitForComplete() noexcept {
  153. with_lock (StopMutex) {
  154. while (ThreadCountReal) {
  155. with_lock (QueueMutex) {
  156. QueuePushCond.Signal();
  157. }
  158. StopCond.Wait(StopMutex);
  159. }
  160. }
  161. }
  162. void DoExecute() override {
  163. THolder<TTsr> tsr(new TTsr(Parent_));
  164. if (Namer) {
  165. Namer.SetCurrentThreadName();
  166. }
  167. while (true) {
  168. IObjectInQueue* job = nullptr;
  169. with_lock (QueueMutex) {
  170. while (Queue.Empty() && !AtomicGet(ShouldTerminate)) {
  171. QueuePushCond.Wait(QueueMutex);
  172. }
  173. if (AtomicGet(ShouldTerminate) && Queue.Empty()) {
  174. tsr.Destroy();
  175. break;
  176. }
  177. job = Queue.Pop();
  178. }
  179. QueuePopCond.Signal();
  180. if (Catching) {
  181. try {
  182. try {
  183. job->Process(*tsr);
  184. } catch (...) {
  185. Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
  186. }
  187. } catch (...) {
  188. // ¯\_(ツ)_/¯
  189. }
  190. } else {
  191. job->Process(*tsr);
  192. }
  193. }
  194. FinishOneThread();
  195. }
  196. inline void FinishOneThread() noexcept {
  197. auto guard = Guard(StopMutex);
  198. --ThreadCountReal;
  199. StopCond.Signal();
  200. }
  201. private:
  202. TThreadPool* Parent_;
  203. const bool Blocking;
  204. const bool Catching;
  205. TThreadNamer Namer;
  206. mutable TMutex QueueMutex;
  207. mutable TMutex StopMutex;
  208. TCondVar QueuePushCond;
  209. TCondVar QueuePopCond;
  210. TCondVar StopCond;
  211. TJobQueue Queue;
  212. TVector<TThreadRef> Tharr;
  213. TAtomic ShouldTerminate;
  214. size_t MaxQueueSize;
  215. size_t ThreadCountExpected;
  216. size_t ThreadCountReal;
  217. bool Forked;
  218. class TAtforkQueueRestarter {
  219. public:
  220. static TAtforkQueueRestarter& Get() {
  221. return *SingletonWithPriority<TAtforkQueueRestarter, 256>();
  222. }
  223. inline void RegisterObject(TImpl* obj) {
  224. auto guard = Guard(ActionMutex);
  225. RegisteredObjects.PushBack(obj);
  226. }
  227. inline void UnregisterObject(TImpl* obj) {
  228. auto guard = Guard(ActionMutex);
  229. obj->Unlink();
  230. }
  231. private:
  232. void ChildAction() {
  233. with_lock (ActionMutex) {
  234. for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) {
  235. it->AtforkAction();
  236. }
  237. }
  238. }
  239. static void ProcessChildAction() {
  240. Get().ChildAction();
  241. }
  242. TIntrusiveList<TImpl> RegisteredObjects;
  243. TMutex ActionMutex;
  244. public:
  245. inline TAtforkQueueRestarter() {
  246. #if defined(_bionic_)
  247. //no pthread_atfork on android libc
  248. #elif defined(_unix_)
  249. pthread_atfork(nullptr, nullptr, ProcessChildAction);
  250. #endif
  251. }
  252. };
  253. };
  254. TThreadPool::~TThreadPool() = default;
  255. size_t TThreadPool::Size() const noexcept {
  256. if (!Impl_.Get()) {
  257. return 0;
  258. }
  259. return Impl_->Size();
  260. }
  261. size_t TThreadPool::GetThreadCountExpected() const noexcept {
  262. if (!Impl_.Get()) {
  263. return 0;
  264. }
  265. return Impl_->GetThreadCountExpected();
  266. }
  267. size_t TThreadPool::GetThreadCountReal() const noexcept {
  268. if (!Impl_.Get()) {
  269. return 0;
  270. }
  271. return Impl_->GetThreadCountReal();
  272. }
  273. size_t TThreadPool::GetMaxQueueSize() const noexcept {
  274. if (!Impl_.Get()) {
  275. return 0;
  276. }
  277. return Impl_->GetMaxQueueSize();
  278. }
  279. bool TThreadPool::Add(IObjectInQueue* obj) {
  280. Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
  281. if (Impl_->NeedRestart()) {
  282. Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize());
  283. }
  284. return Impl_->Add(obj);
  285. }
  286. void TThreadPool::Start(size_t thrnum, size_t maxque) {
  287. Impl_.Reset(new TImpl(this, thrnum, maxque, Params));
  288. }
  289. void TThreadPool::Stop() noexcept {
  290. Impl_.Destroy();
  291. }
  292. static TAtomic mtp_queue_counter = 0;
  293. class TAdaptiveThreadPool::TImpl {
  294. public:
  295. class TThread: public IThreadFactory::IThreadAble {
  296. public:
  297. inline TThread(TImpl* parent)
  298. : Impl_(parent)
  299. , Thread_(Impl_->Parent_->Pool()->Run(this))
  300. {
  301. }
  302. inline ~TThread() override {
  303. Impl_->DecThreadCount();
  304. }
  305. private:
  306. void DoExecute() noexcept override {
  307. THolder<TThread> This(this);
  308. if (Impl_->Namer) {
  309. Impl_->Namer.SetCurrentThreadName();
  310. }
  311. {
  312. TTsr tsr(Impl_->Parent_);
  313. IObjectInQueue* obj;
  314. while ((obj = Impl_->WaitForJob()) != nullptr) {
  315. if (Impl_->Catching) {
  316. try {
  317. try {
  318. obj->Process(tsr);
  319. } catch (...) {
  320. Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl;
  321. }
  322. } catch (...) {
  323. // ¯\_(ツ)_/¯
  324. }
  325. } else {
  326. obj->Process(tsr);
  327. }
  328. }
  329. }
  330. }
  331. private:
  332. TImpl* Impl_;
  333. THolder<IThreadFactory::IThread> Thread_;
  334. };
  335. inline TImpl(TAdaptiveThreadPool* parent, const TParams& params)
  336. : Parent_(parent)
  337. , Catching(params.Catching_)
  338. , Namer(params)
  339. , ThrCount_(0)
  340. , AllDone_(false)
  341. , Obj_(nullptr)
  342. , Free_(0)
  343. , IdleTime_(TDuration::Max())
  344. {
  345. sprintf(Name_, "[mtp queue %ld]", (long)AtomicAdd(mtp_queue_counter, 1));
  346. }
  347. inline ~TImpl() {
  348. Stop();
  349. }
  350. inline void SetMaxIdleTime(TDuration idleTime) {
  351. IdleTime_ = idleTime;
  352. }
  353. inline const char* Name() const noexcept {
  354. return Name_;
  355. }
  356. inline void Add(IObjectInQueue* obj) {
  357. with_lock (Mutex_) {
  358. while (Obj_ != nullptr) {
  359. CondFree_.Wait(Mutex_);
  360. }
  361. if (Free_ == 0) {
  362. AddThreadNoLock();
  363. }
  364. Obj_ = obj;
  365. Y_ENSURE_EX(!AllDone_, TThreadPoolException() << TStringBuf("adding to a stopped queue"));
  366. }
  367. CondReady_.Signal();
  368. }
  369. inline void AddThreads(size_t n) {
  370. with_lock (Mutex_) {
  371. while (n) {
  372. AddThreadNoLock();
  373. --n;
  374. }
  375. }
  376. }
  377. inline size_t Size() const noexcept {
  378. return (size_t)ThrCount_;
  379. }
  380. private:
  381. inline void IncThreadCount() noexcept {
  382. AtomicAdd(ThrCount_, 1);
  383. }
  384. inline void DecThreadCount() noexcept {
  385. AtomicAdd(ThrCount_, -1);
  386. }
  387. inline void AddThreadNoLock() {
  388. IncThreadCount();
  389. try {
  390. new TThread(this);
  391. } catch (...) {
  392. DecThreadCount();
  393. throw;
  394. }
  395. }
  396. inline void Stop() noexcept {
  397. Mutex_.Acquire();
  398. AllDone_ = true;
  399. while (AtomicGet(ThrCount_)) {
  400. Mutex_.Release();
  401. CondReady_.Signal();
  402. Mutex_.Acquire();
  403. }
  404. Mutex_.Release();
  405. }
  406. inline IObjectInQueue* WaitForJob() noexcept {
  407. Mutex_.Acquire();
  408. ++Free_;
  409. while (!Obj_ && !AllDone_) {
  410. if (!CondReady_.WaitT(Mutex_, IdleTime_)) {
  411. break;
  412. }
  413. }
  414. IObjectInQueue* ret = Obj_;
  415. Obj_ = nullptr;
  416. --Free_;
  417. Mutex_.Release();
  418. CondFree_.Signal();
  419. return ret;
  420. }
  421. private:
  422. TAdaptiveThreadPool* Parent_;
  423. const bool Catching;
  424. TThreadNamer Namer;
  425. TAtomic ThrCount_;
  426. TMutex Mutex_;
  427. TCondVar CondReady_;
  428. TCondVar CondFree_;
  429. bool AllDone_;
  430. IObjectInQueue* Obj_;
  431. size_t Free_;
  432. char Name_[64];
  433. TDuration IdleTime_;
  434. };
  435. TThreadPoolBase::TThreadPoolBase(const TParams& params)
  436. : TThreadFactoryHolder(params.Factory_)
  437. , Params(params)
  438. {
  439. }
  440. #define DEFINE_THREAD_POOL_CTORS(type) \
  441. type::type(const TParams& params) \
  442. : TThreadPoolBase(params) \
  443. { \
  444. }
  445. DEFINE_THREAD_POOL_CTORS(TThreadPool)
  446. DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool)
  447. DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool)
  448. TAdaptiveThreadPool::~TAdaptiveThreadPool() = default;
  449. bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) {
  450. Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
  451. Impl_->Add(obj);
  452. return true;
  453. }
  454. void TAdaptiveThreadPool::Start(size_t, size_t) {
  455. Impl_.Reset(new TImpl(this, Params));
  456. }
  457. void TAdaptiveThreadPool::Stop() noexcept {
  458. Impl_.Destroy();
  459. }
  460. size_t TAdaptiveThreadPool::Size() const noexcept {
  461. if (Impl_.Get()) {
  462. return Impl_->Size();
  463. }
  464. return 0;
  465. }
  466. void TAdaptiveThreadPool::SetMaxIdleTime(TDuration interval) {
  467. Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
  468. Impl_->SetMaxIdleTime(interval);
  469. }
  470. TSimpleThreadPool::~TSimpleThreadPool() {
  471. try {
  472. Stop();
  473. } catch (...) {
  474. // ¯\_(ツ)_/¯
  475. }
  476. }
  477. bool TSimpleThreadPool::Add(IObjectInQueue* obj) {
  478. Y_ENSURE_EX(Slave_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
  479. return Slave_->Add(obj);
  480. }
  481. void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) {
  482. THolder<IThreadPool> tmp;
  483. TAdaptiveThreadPool* adaptive(nullptr);
  484. if (thrnum) {
  485. tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params));
  486. } else {
  487. adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params);
  488. tmp.Reset(adaptive);
  489. }
  490. tmp->Start(thrnum, maxque);
  491. if (adaptive) {
  492. adaptive->SetMaxIdleTime(TDuration::Seconds(100));
  493. }
  494. Slave_.Swap(tmp);
  495. }
  496. void TSimpleThreadPool::Stop() noexcept {
  497. Slave_.Destroy();
  498. }
  499. size_t TSimpleThreadPool::Size() const noexcept {
  500. if (Slave_.Get()) {
  501. return Slave_->Size();
  502. }
  503. return 0;
  504. }
  505. namespace {
  506. class TOwnedObjectInQueue: public IObjectInQueue {
  507. private:
  508. THolder<IObjectInQueue> Owned;
  509. public:
  510. TOwnedObjectInQueue(THolder<IObjectInQueue> owned)
  511. : Owned(std::move(owned))
  512. {
  513. }
  514. void Process(void* data) override {
  515. THolder<TOwnedObjectInQueue> self(this);
  516. Owned->Process(data);
  517. }
  518. };
  519. }
  520. void IThreadPool::SafeAdd(IObjectInQueue* obj) {
  521. Y_ENSURE_EX(Add(obj), TThreadPoolException() << TStringBuf("can not add object to queue"));
  522. }
  523. void IThreadPool::SafeAddAndOwn(THolder<IObjectInQueue> obj) {
  524. Y_ENSURE_EX(AddAndOwn(std::move(obj)), TThreadPoolException() << TStringBuf("can not add to queue and own"));
  525. }
  526. bool IThreadPool::AddAndOwn(THolder<IObjectInQueue> obj) {
  527. auto owner = MakeHolder<TOwnedObjectInQueue>(std::move(obj));
  528. bool added = Add(owner.Get());
  529. if (added) {
  530. Y_UNUSED(owner.Release());
  531. }
  532. return added;
  533. }
  534. using IThread = IThreadFactory::IThread;
  535. using IThreadAble = IThreadFactory::IThreadAble;
  536. namespace {
  537. class TPoolThread: public IThread {
  538. class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> {
  539. public:
  540. inline TThreadImpl(IThreadAble* func)
  541. : Func_(func)
  542. {
  543. }
  544. ~TThreadImpl() override = default;
  545. inline void WaitForStart() noexcept {
  546. StartEvent_.Wait();
  547. }
  548. inline void WaitForComplete() noexcept {
  549. CompleteEvent_.Wait();
  550. }
  551. private:
  552. void Process(void* /*tsr*/) override {
  553. TThreadImplRef This(this);
  554. {
  555. StartEvent_.Signal();
  556. try {
  557. Func_->Execute();
  558. } catch (...) {
  559. // ¯\_(ツ)_/¯
  560. }
  561. CompleteEvent_.Signal();
  562. }
  563. }
  564. private:
  565. IThreadAble* Func_;
  566. TSystemEvent CompleteEvent_;
  567. TSystemEvent StartEvent_;
  568. };
  569. using TThreadImplRef = TIntrusivePtr<TThreadImpl>;
  570. public:
  571. inline TPoolThread(IThreadPool* parent)
  572. : Parent_(parent)
  573. {
  574. }
  575. ~TPoolThread() override {
  576. if (Impl_) {
  577. Impl_->WaitForStart();
  578. }
  579. }
  580. private:
  581. void DoRun(IThreadAble* func) override {
  582. TThreadImplRef impl(new TThreadImpl(func));
  583. Parent_->SafeAdd(impl.Get());
  584. Impl_.Swap(impl);
  585. }
  586. void DoJoin() noexcept override {
  587. if (Impl_) {
  588. Impl_->WaitForComplete();
  589. Impl_ = nullptr;
  590. }
  591. }
  592. private:
  593. IThreadPool* Parent_;
  594. TThreadImplRef Impl_;
  595. };
  596. }
  597. IThread* IThreadPool::DoCreate() {
  598. return new TPoolThread(this);
  599. }
  600. THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) {
  601. THolder<IThreadPool> queue;
  602. if (threadsCount > 1) {
  603. queue.Reset(new TThreadPool(params));
  604. } else {
  605. queue.Reset(new TFakeThreadPool());
  606. }
  607. queue->Start(threadsCount, queueSizeLimit);
  608. return queue;
  609. }