local_executor.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #pragma once
  2. #include <library/cpp/threading/future/future.h>
  3. #include <util/generic/cast.h>
  4. #include <util/generic/fwd.h>
  5. #include <util/generic/noncopyable.h>
  6. #include <util/generic/ptr.h>
  7. #include <util/generic/singleton.h>
  8. #include <util/generic/ymath.h>
  9. #include <functional>
  10. namespace NPar {
  11. struct ILocallyExecutable : virtual public TThrRefBase {
  12. // Must be implemented by the end user to define job that will be processed by one of
  13. // executor threads.
  14. //
  15. // @param id Job parameter, typically an index pointing somewhere in array, or just
  16. // some dummy value, e.g. `0`.
  17. virtual void LocalExec(int id) = 0;
  18. };
  // Lightweight alternative to `ILocallyExecutable`: a job is any callable taking a single
  // `int`. That argument plays the same role as `id` in `ILocallyExecutable::LocalExec`.
  //
  using TLocallyExecutableFunction = std::function<void(int id)>;
  23. class ILocalExecutor: public TNonCopyable {
  24. public:
  25. ILocalExecutor() = default;
  26. virtual ~ILocalExecutor() = default;
  27. enum EFlags : int {
  28. HIGH_PRIORITY = 0,
  29. MED_PRIORITY = 1,
  30. LOW_PRIORITY = 2,
  31. PRIORITY_MASK = 3,
  32. WAIT_COMPLETE = 4
  33. };
  34. // Add task for further execution.
  35. //
  36. // @param exec Task description.
  37. // @param id Task argument.
  38. // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
  39. // and `WAIT_COMPLETE`.
  40. virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0;
  41. // Add tasks range for further execution.
  42. //
  43. // @param exec Task description.
  44. // @param firstId, lastId Task arguments [firstId, lastId)
  45. // @param flags Same as for `Exec`.
  46. virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0;
  47. // 0-based ILocalExecutor worker thread identification
  48. virtual int GetWorkerThreadId() const noexcept = 0;
  49. virtual int GetThreadCount() const noexcept = 0;
  50. // Describes a range of tasks with parameters from integer range [FirstId, LastId).
  51. //
  52. class TExecRangeParams {
  53. public:
  54. template <typename TFirst, typename TLast>
  55. TExecRangeParams(TFirst firstId, TLast lastId)
  56. : FirstId(SafeIntegerCast<int>(firstId))
  57. , LastId(SafeIntegerCast<int>(lastId))
  58. {
  59. Y_ASSERT(LastId >= FirstId);
  60. SetBlockSize(1);
  61. }
  62. // Partition tasks into `blockCount` blocks of approximately equal size, each of which
  63. // will be executed as a separate bigger task.
  64. //
  65. template <typename TBlockCount>
  66. TExecRangeParams& SetBlockCount(TBlockCount blockCount) {
  67. Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId);
  68. BlockSize = FirstId == LastId ? 0 : CeilDiv(LastId - FirstId, SafeIntegerCast<int>(blockCount));
  69. BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
  70. BlockEqualToThreads = false;
  71. return *this;
  72. }
  73. // Partition tasks into blocks of approximately `blockSize` size, each of which will
  74. // be executed as a separate bigger task.
  75. //
  76. template <typename TBlockSize>
  77. TExecRangeParams& SetBlockSize(TBlockSize blockSize) {
  78. Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId);
  79. BlockSize = SafeIntegerCast<int>(blockSize);
  80. BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
  81. BlockEqualToThreads = false;
  82. return *this;
  83. }
  84. // Partition tasks into thread count blocks of approximately equal size, each of which
  85. // will be executed as a separate bigger task.
  86. //
  87. TExecRangeParams& SetBlockCountToThreadCount() {
  88. BlockEqualToThreads = true;
  89. return *this;
  90. }
  91. int GetBlockCount() const {
  92. Y_ASSERT(!BlockEqualToThreads);
  93. return BlockCount;
  94. }
  95. int GetBlockSize() const {
  96. Y_ASSERT(!BlockEqualToThreads);
  97. return BlockSize;
  98. }
  99. bool GetBlockEqualToThreads() {
  100. return BlockEqualToThreads;
  101. }
  102. const int FirstId = 0;
  103. const int LastId = 0;
  104. private:
  105. int BlockSize;
  106. int BlockCount;
  107. bool BlockEqualToThreads;
  108. };
  109. // `Exec` and `ExecRange` versions that accept functions.
  110. //
  111. void Exec(TLocallyExecutableFunction exec, int id, int flags);
  112. void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
  113. // Version of `ExecRange` that throws exception from task with minimal id if at least one of
  114. // task threw an exception.
  115. //
  116. void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
  117. // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if
  118. // it fails.
  119. //
  120. TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
  121. template <typename TBody>
  122. static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {
  123. return [=](int blockId) {
  124. const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
  125. const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
  126. for (int i = blockFirstId; i < blockLastId; ++i) {
  127. body(i);
  128. }
  129. };
  130. }
  131. template <typename TBody>
  132. inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) {
  133. if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {
  134. return;
  135. }
  136. if (params.GetBlockEqualToThreads()) {
  137. params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag
  138. }
  139. ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
  140. }
  141. template <typename TBody>
  142. inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) {
  143. if (firstId >= lastId) {
  144. return;
  145. }
  146. const int threadCount = Max(GetThreadCount(), 1);
  147. const int batchSize = batchSizeOrZeroForAutoBatchSize
  148. ? batchSizeOrZeroForAutoBatchSize
  149. : (lastId - firstId + threadCount - 1) / threadCount;
  150. const int batchCount = (lastId - firstId + batchSize - 1) / batchSize;
  151. const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount;
  152. auto states = ExecRangeWithFutures(
  153. [=](int threadId) {
  154. for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) {
  155. int batchId = batchIdPerThread * threadCount + threadId;
  156. int begin = firstId + batchId * batchSize;
  157. int end = Min(begin + batchSize, lastId);
  158. for (int i = begin; i < end; ++i) {
  159. body(i);
  160. }
  161. }
  162. },
  163. 0, threadCount, flags);
  164. for (auto& state: states) {
  165. state.GetValueSync(); // Re-throw exception if any.
  166. }
  167. }
  168. template <typename TBody>
  169. static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) {
  170. if (lastId == firstId) {
  171. return true;
  172. }
  173. if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) {
  174. body(firstId);
  175. return true;
  176. }
  177. return false;
  178. }
  179. };
  180. // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles.
  181. //
  182. // Examples:
  183. // Execute one task with medium priority and wait for it completion.
  184. // ```
  185. // LocalExecutor().Run(4);
  186. // TEvent event;
  187. // LocalExecutor().Exec([](int) {
  188. // SomeFunc();
  189. // event.Signal();
  190. // }, 0, TLocalExecutor::MED_PRIORITY);
  191. //
  192. // SomeOtherCode();
  193. // event.WaitI();
  194. // ```
  195. //
  196. // Execute range of tasks with medium priority.
  197. // ```
  198. // LocalExecutor().Run(4);
  199. // LocalExecutor().ExecRange([](int id) {
  200. // SomeFunc(id);
  201. // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);
  202. // ```
  203. //
  204. class TLocalExecutor final: public ILocalExecutor {
  205. public:
  206. using EFlags = ILocalExecutor::EFlags;
  207. // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads`
  208. // to add threads to underlying thread pool.
  209. //
  210. TLocalExecutor();
  211. ~TLocalExecutor();
  212. int GetQueueSize() const noexcept;
  213. int GetMPQueueSize() const noexcept;
  214. int GetLPQueueSize() const noexcept;
  215. void ClearLPQueue();
  216. // 0-based TLocalExecutor worker thread identification
  217. int GetWorkerThreadId() const noexcept override;
  218. int GetThreadCount() const noexcept override;
  219. // **Add** threads to underlying thread pool.
  220. //
  221. // @param threadCount Number of threads to add.
  222. void RunAdditionalThreads(int threadCount);
  223. // Add task for further execution.
  224. //
  225. // @param exec Task description.
  226. // @param id Task argument.
  227. // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
  228. // and `WAIT_COMPLETE`.
  229. void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override;
  230. // Add tasks range for further execution.
  231. //
  232. // @param exec Task description.
  233. // @param firstId, lastId Task arguments [firstId, lastId)
  234. // @param flags Same as for `Exec`.
  235. void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override;
  236. using ILocalExecutor::Exec;
  237. using ILocalExecutor::ExecRange;
  238. private:
  239. class TImpl;
  240. THolder<TImpl> Impl_;
  241. };
  242. static inline TLocalExecutor& LocalExecutor() {
  243. return *Singleton<TLocalExecutor>();
  244. }
  245. template <typename TBody>
  246. inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) {
  247. ILocalExecutor::TExecRangeParams params(from, to);
  248. params.SetBlockCountToThreadCount();
  249. executor.ExecRange(std::forward<TBody>(body), params, TLocalExecutor::WAIT_COMPLETE);
  250. }
  251. template <typename TBody>
  252. inline void ParallelFor(ui32 from, ui32 to, TBody&& body) {
  253. ParallelFor(LocalExecutor(), from, to, std::forward<TBody>(body));
  254. }
  255. template <typename TBody>
  256. inline void AsyncParallelFor(ui32 from, ui32 to, TBody&& body) {
  257. ILocalExecutor::TExecRangeParams params(from, to);
  258. params.SetBlockCountToThreadCount();
  259. LocalExecutor().ExecRange(std::forward<TBody>(body), params, 0);
  260. }
  261. }