pool.h 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. #pragma once
  2. #include "fwd.h"
  3. #include "factory.h"
  4. #include <util/system/yassert.h>
  5. #include <util/system/defaults.h>
  6. #include <util/generic/yexception.h>
  7. #include <util/generic/ptr.h>
  8. #include <util/generic/noncopyable.h>
  9. #include <functional>
  10. class TDuration;
  /**
   * A unit of work that can be handed to a thread pool queue.
   */
  struct IObjectInQueue {
      virtual ~IObjectInQueue() = default;

      /**
       * User-provided job body; this is what gets executed by the pool.
       *
       * @param threadSpecificResource nullptr unless
       *        IThreadPool::CreateThreadSpecificResource is overridden, in which
       *        case the value returned by that override is passed here.
       */
      virtual void Process(void* threadSpecificResource) = 0;
  };
  24. /**
  25. * Mighty class to add 'Pool' method to derived classes.
  26. * Useful only for creators of new queue classes.
  27. */
  28. class TThreadFactoryHolder {
  29. public:
  30. TThreadFactoryHolder() noexcept;
  31. inline TThreadFactoryHolder(IThreadFactory* pool) noexcept
  32. : Pool_(pool)
  33. {
  34. }
  35. inline ~TThreadFactoryHolder() = default;
  36. inline IThreadFactory* Pool() const noexcept {
  37. return Pool_;
  38. }
  39. private:
  40. IThreadFactory* Pool_;
  41. };
  42. class TThreadPoolException: public yexception {
  43. };
  44. template <class T>
  45. class TThrFuncObj: public IObjectInQueue {
  46. public:
  47. TThrFuncObj(const T& func)
  48. : Func(func)
  49. {
  50. }
  51. TThrFuncObj(T&& func)
  52. : Func(std::move(func))
  53. {
  54. }
  55. void Process(void*) override {
  56. THolder<TThrFuncObj> self(this);
  57. Func();
  58. }
  59. private:
  60. T Func;
  61. };
  62. template <class T>
  63. IObjectInQueue* MakeThrFuncObj(T&& func) {
  64. return new TThrFuncObj<std::remove_cv_t<std::remove_reference_t<T>>>(std::forward<T>(func));
  65. }
  66. struct TThreadPoolParams {
  67. bool Catching_ = true;
  68. bool Blocking_ = false;
  69. IThreadFactory* Factory_ = SystemThreadFactory();
  70. TString ThreadName_;
  71. bool EnumerateThreads_ = false;
  72. using TSelf = TThreadPoolParams;
  73. TThreadPoolParams() {
  74. }
  75. TThreadPoolParams(IThreadFactory* factory)
  76. : Factory_(factory)
  77. {
  78. }
  79. TThreadPoolParams(const TString& name) {
  80. SetThreadName(name);
  81. }
  82. TThreadPoolParams(const char* name) {
  83. SetThreadName(name);
  84. }
  85. TSelf& SetCatching(bool val) {
  86. Catching_ = val;
  87. return *this;
  88. }
  89. TSelf& SetBlocking(bool val) {
  90. Blocking_ = val;
  91. return *this;
  92. }
  93. TSelf& SetFactory(IThreadFactory* factory) {
  94. Factory_ = factory;
  95. return *this;
  96. }
  97. TSelf& SetThreadName(const TString& name) {
  98. ThreadName_ = name;
  99. EnumerateThreads_ = false;
  100. return *this;
  101. }
  102. TSelf& SetThreadNamePrefix(const TString& prefix) {
  103. ThreadName_ = prefix;
  104. EnumerateThreads_ = true;
  105. return *this;
  106. }
  107. };
  108. /**
  109. * A queue processed simultaneously by several threads
  110. */
  111. class IThreadPool: public IThreadFactory, public TNonCopyable {
  112. public:
  113. using TParams = TThreadPoolParams;
  114. ~IThreadPool() override = default;
  115. /**
  116. * Safe versions of Add*() functions. Behave exactly like as non-safe
  117. * version of Add*(), but use exceptions instead returning false
  118. */
  119. void SafeAdd(IObjectInQueue* obj);
  120. template <class T>
  121. void SafeAddFunc(T&& func) {
  122. Y_ENSURE_EX(AddFunc(std::forward<T>(func)), TThreadPoolException() << TStringBuf("can not add function to queue"));
  123. }
  124. void SafeAddAndOwn(THolder<IObjectInQueue> obj);
  125. /**
  126. * Add object to queue, run obj->Proccess in other threads.
  127. * Obj is not deleted after execution
  128. * @return true of obj is successfully added to queue
  129. * @return false if queue is full or shutting down
  130. */
  131. virtual bool Add(IObjectInQueue* obj) Y_WARN_UNUSED_RESULT = 0;
  132. template <class T>
  133. Y_WARN_UNUSED_RESULT bool AddFunc(T&& func) {
  134. THolder<IObjectInQueue> wrapper(MakeThrFuncObj(std::forward<T>(func)));
  135. bool added = Add(wrapper.Get());
  136. if (added) {
  137. Y_UNUSED(wrapper.Release());
  138. }
  139. return added;
  140. }
  141. bool AddAndOwn(THolder<IObjectInQueue> obj) Y_WARN_UNUSED_RESULT;
  142. virtual void Start(size_t threadCount, size_t queueSizeLimit = 0) = 0;
  143. /** Wait for completion of all scheduled objects, and then exit */
  144. virtual void Stop() noexcept = 0;
  145. /** Number of tasks currently in queue */
  146. virtual size_t Size() const noexcept = 0;
  147. public:
  148. /**
  149. * RAII wrapper for Create/DestroyThreadSpecificResource.
  150. * Useful only for implementers of new IThreadPool queues.
  151. */
  152. class TTsr {
  153. public:
  154. inline TTsr(IThreadPool* q)
  155. : Q_(q)
  156. , Data_(Q_->CreateThreadSpecificResource())
  157. {
  158. }
  159. inline ~TTsr() {
  160. try {
  161. Q_->DestroyThreadSpecificResource(Data_);
  162. } catch (...) {
  163. // ¯\_(ツ)_/¯
  164. }
  165. }
  166. inline operator void*() noexcept {
  167. return Data_;
  168. }
  169. private:
  170. IThreadPool* Q_;
  171. void* Data_;
  172. };
  173. /**
  174. * CreateThreadSpecificResource and DestroyThreadSpecificResource
  175. * called from internals of (TAdaptiveThreadPool, TThreadPool, ...) implementation,
  176. * not by user of IThreadPool interface.
  177. * Created resource is passed to IObjectInQueue::Proccess function.
  178. */
  179. virtual void* CreateThreadSpecificResource() {
  180. return nullptr;
  181. }
  182. virtual void DestroyThreadSpecificResource(void* resource) {
  183. if (resource != nullptr) {
  184. Y_ASSERT(resource == nullptr);
  185. }
  186. }
  187. private:
  188. IThread* DoCreate() override;
  189. };
  190. /**
  191. * Single-threaded implementation of IThreadPool, process tasks in same thread when
  192. * added.
  193. * Can be used to remove multithreading.
  194. */
  195. class TFakeThreadPool: public IThreadPool {
  196. public:
  197. bool Add(IObjectInQueue* pObj) override Y_WARN_UNUSED_RESULT {
  198. TTsr tsr(this);
  199. pObj->Process(tsr);
  200. return true;
  201. }
  202. void Start(size_t, size_t = 0) override {
  203. }
  204. void Stop() noexcept override {
  205. }
  206. size_t Size() const noexcept override {
  207. return 0;
  208. }
  209. };
  210. class TThreadPoolBase: public IThreadPool, public TThreadFactoryHolder {
  211. public:
  212. TThreadPoolBase(const TParams& params);
  213. protected:
  214. TParams Params;
  215. };
  216. /** queue processed by fixed size thread pool */
  217. class TThreadPool: public TThreadPoolBase {
  218. public:
  219. TThreadPool(const TParams& params = {});
  220. ~TThreadPool() override;
  221. bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT;
  222. /**
  223. * @param queueSizeLimit means "unlimited" when = 0
  224. * @param threadCount means "single thread" when = 0
  225. */
  226. void Start(size_t threadCount, size_t queueSizeLimit = 0) override;
  227. void Stop() noexcept override;
  228. size_t Size() const noexcept override;
  229. size_t GetThreadCountExpected() const noexcept;
  230. size_t GetThreadCountReal() const noexcept;
  231. size_t GetMaxQueueSize() const noexcept;
  232. private:
  233. class TImpl;
  234. THolder<TImpl> Impl_;
  235. };
  236. /**
  237. * Always create new thread for new task, when all existing threads are busy.
  238. * Maybe dangerous, number of threads is not limited.
  239. */
  240. class TAdaptiveThreadPool: public TThreadPoolBase {
  241. public:
  242. TAdaptiveThreadPool(const TParams& params = {});
  243. ~TAdaptiveThreadPool() override;
  244. /**
  245. * If working thread waits task too long (more then interval parameter),
  246. * then the thread would be killed. Default value - infinity, all created threads
  247. * waits for new task forever, before Stop.
  248. */
  249. void SetMaxIdleTime(TDuration interval);
  250. bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT;
  251. /** @param thrnum, @param maxque are ignored */
  252. void Start(size_t thrnum = 0, size_t maxque = 0) override;
  253. void Stop() noexcept override;
  254. size_t Size() const noexcept override;
  255. private:
  256. class TImpl;
  257. THolder<TImpl> Impl_;
  258. };
  259. /** Behave like TThreadPool or TAdaptiveThreadPool, choosen by thrnum parameter of Start() */
  260. class TSimpleThreadPool: public TThreadPoolBase {
  261. public:
  262. TSimpleThreadPool(const TParams& params = {});
  263. ~TSimpleThreadPool() override;
  264. bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT;
  265. /**
  266. * @parameter thrnum. If thrnum is 0, use TAdaptiveThreadPool with small
  267. * SetMaxIdleTime interval parameter. if thrnum is not 0, use non-blocking TThreadPool
  268. */
  269. void Start(size_t thrnum, size_t maxque = 0) override;
  270. void Stop() noexcept override;
  271. size_t Size() const noexcept override;
  272. private:
  273. THolder<IThreadPool> Slave_;
  274. };
  /**
   * Helper that overrides the virtual Create/DestroyThreadSpecificResource
   * functions of IThreadPool and implements them by calling the functions
   * with the same names on a TSlave pointer.
   *
   * The slave is stored as a plain pointer and is never deleted here.
   */
  template <class TQueueType, class TSlave>
  class TThreadPoolBinder: public TQueueType {
  public:
      /** Default-constructs the queue and binds it to @p slave. */
      TThreadPoolBinder(TSlave* slave)
          : Slave_(slave)
      {
      }

      /** Forwards @p args to the TQueueType constructor, then binds @p slave. */
      template <class... Args>
      TThreadPoolBinder(TSlave* slave, Args&&... args)
          : TQueueType(std::forward<Args>(args)...)
          , Slave_(slave)
      {
      }

      /** Same as the pointer overload, taking the slave by reference. */
      TThreadPoolBinder(TSlave& slave)
          : Slave_(&slave)
      {
      }

      /**
       * Stops the queue before the base class destructor runs.
       * NOTE(review): presumably so that worker threads still reach the
       * overrides below while shutting down -- confirm.
       */
      ~TThreadPoolBinder() override {
          try {
              this->Stop();
          } catch (...) {
              // Never let an exception escape the destructor.
          }
      }

      void* CreateThreadSpecificResource() override {
          return Slave_->CreateThreadSpecificResource();
      }

      void DestroyThreadSpecificResource(void* resource) override {
          Slave_->DestroyThreadSpecificResource(resource);
      }

  private:
      TSlave* Slave_;
  };
  313. inline void Delete(THolder<IThreadPool> q) {
  314. if (q.Get()) {
  315. q->Stop();
  316. }
  317. }
  318. /**
  319. * Creates and starts TThreadPool if threadsCount > 1, or TFakeThreadPool otherwise
  320. * You could specify blocking and catching modes for TThreadPool only
  321. */
  322. THolder<IThreadPool> CreateThreadPool(size_t threadCount, size_t queueSizeLimit = 0, const IThreadPool::TParams& params = {});