123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- #pragma once
- #include <library/cpp/threading/future/future.h>
- #include <util/generic/cast.h>
- #include <util/generic/fwd.h>
- #include <util/generic/noncopyable.h>
- #include <util/generic/ptr.h>
- #include <util/generic/singleton.h>
- #include <util/generic/ymath.h>
- #include <functional>
- namespace NPar {
- struct ILocallyExecutable : virtual public TThrRefBase {
- // Must be implemented by the end user to define job that will be processed by one of
- // executor threads.
- //
- // @param id Job parameter, typically an index pointing somewhere in array, or just
- // some dummy value, e.g. `0`.
- virtual void LocalExec(int id) = 0;
- };
- // Alternative and simpler way of describing a job for executor. Function argument has the
- // same meaning as `id` in `ILocallyExecutable::LocalExec`.
- //
- using TLocallyExecutableFunction = std::function<void(int)>;
- class ILocalExecutor: public TNonCopyable {
- public:
- ILocalExecutor() = default;
- virtual ~ILocalExecutor() = default;
- enum EFlags : int {
- HIGH_PRIORITY = 0,
- MED_PRIORITY = 1,
- LOW_PRIORITY = 2,
- PRIORITY_MASK = 3,
- WAIT_COMPLETE = 4
- };
- // Add task for further execution.
- //
- // @param exec Task description.
- // @param id Task argument.
- // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
- // and `WAIT_COMPLETE`.
- virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0;
- // Add tasks range for further execution.
- //
- // @param exec Task description.
- // @param firstId, lastId Task arguments [firstId, lastId)
- // @param flags Same as for `Exec`.
- virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0;
- // 0-based ILocalExecutor worker thread identification
- virtual int GetWorkerThreadId() const noexcept = 0;
- virtual int GetThreadCount() const noexcept = 0;
- // Describes a range of tasks with parameters from integer range [FirstId, LastId).
- //
- class TExecRangeParams {
- public:
- template <typename TFirst, typename TLast>
- TExecRangeParams(TFirst firstId, TLast lastId)
- : FirstId(SafeIntegerCast<int>(firstId))
- , LastId(SafeIntegerCast<int>(lastId))
- {
- Y_ASSERT(LastId >= FirstId);
- SetBlockSize(1);
- }
- // Partition tasks into `blockCount` blocks of approximately equal size, each of which
- // will be executed as a separate bigger task.
- //
- template <typename TBlockCount>
- TExecRangeParams& SetBlockCount(TBlockCount blockCount) {
- Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId);
- BlockSize = FirstId == LastId ? 0 : CeilDiv(LastId - FirstId, SafeIntegerCast<int>(blockCount));
- BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
- BlockEqualToThreads = false;
- return *this;
- }
- // Partition tasks into blocks of approximately `blockSize` size, each of which will
- // be executed as a separate bigger task.
- //
- template <typename TBlockSize>
- TExecRangeParams& SetBlockSize(TBlockSize blockSize) {
- Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId);
- BlockSize = SafeIntegerCast<int>(blockSize);
- BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
- BlockEqualToThreads = false;
- return *this;
- }
- // Partition tasks into thread count blocks of approximately equal size, each of which
- // will be executed as a separate bigger task.
- //
- TExecRangeParams& SetBlockCountToThreadCount() {
- BlockEqualToThreads = true;
- return *this;
- }
- int GetBlockCount() const {
- Y_ASSERT(!BlockEqualToThreads);
- return BlockCount;
- }
- int GetBlockSize() const {
- Y_ASSERT(!BlockEqualToThreads);
- return BlockSize;
- }
- bool GetBlockEqualToThreads() {
- return BlockEqualToThreads;
- }
- const int FirstId = 0;
- const int LastId = 0;
- private:
- int BlockSize;
- int BlockCount;
- bool BlockEqualToThreads;
- };
- // `Exec` and `ExecRange` versions that accept functions.
- //
- void Exec(TLocallyExecutableFunction exec, int id, int flags);
- void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
- // Version of `ExecRange` that throws exception from task with minimal id if at least one of
- // task threw an exception.
- //
- void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
- // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if
- // it fails.
- //
- TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
- template <typename TBody>
- static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {
- return [=](int blockId) {
- const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
- const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
- for (int i = blockFirstId; i < blockLastId; ++i) {
- body(i);
- }
- };
- }
- template <typename TBody>
- inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) {
- if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {
- return;
- }
- if (params.GetBlockEqualToThreads()) {
- params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag
- }
- ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
- }
- template <typename TBody>
- inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) {
- if (firstId >= lastId) {
- return;
- }
- const int threadCount = Max(GetThreadCount(), 1);
- const int batchSize = batchSizeOrZeroForAutoBatchSize
- ? batchSizeOrZeroForAutoBatchSize
- : (lastId - firstId + threadCount - 1) / threadCount;
- const int batchCount = (lastId - firstId + batchSize - 1) / batchSize;
- const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount;
- auto states = ExecRangeWithFutures(
- [=](int threadId) {
- for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) {
- int batchId = batchIdPerThread * threadCount + threadId;
- int begin = firstId + batchId * batchSize;
- int end = Min(begin + batchSize, lastId);
- for (int i = begin; i < end; ++i) {
- body(i);
- }
- }
- },
- 0, threadCount, flags);
- for (auto& state: states) {
- state.GetValueSync(); // Re-throw exception if any.
- }
- }
- template <typename TBody>
- static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) {
- if (lastId == firstId) {
- return true;
- }
- if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) {
- body(firstId);
- return true;
- }
- return false;
- }
- };
- // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles.
- //
- // Examples:
- // Execute one task with medium priority and wait for it completion.
- // ```
- // LocalExecutor().Run(4);
- // TEvent event;
- // LocalExecutor().Exec([](int) {
- // SomeFunc();
- // event.Signal();
- // }, 0, TLocalExecutor::MED_PRIORITY);
- //
- // SomeOtherCode();
- // event.WaitI();
- // ```
- //
- // Execute range of tasks with medium priority.
- // ```
- // LocalExecutor().Run(4);
- // LocalExecutor().ExecRange([](int id) {
- // SomeFunc(id);
- // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);
- // ```
- //
- class TLocalExecutor final: public ILocalExecutor {
- public:
- using EFlags = ILocalExecutor::EFlags;
- // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads`
- // to add threads to underlying thread pool.
- //
- TLocalExecutor();
- ~TLocalExecutor();
- int GetQueueSize() const noexcept;
- int GetMPQueueSize() const noexcept;
- int GetLPQueueSize() const noexcept;
- void ClearLPQueue();
- // 0-based TLocalExecutor worker thread identification
- int GetWorkerThreadId() const noexcept override;
- int GetThreadCount() const noexcept override;
- // **Add** threads to underlying thread pool.
- //
- // @param threadCount Number of threads to add.
- void RunAdditionalThreads(int threadCount);
- // Add task for further execution.
- //
- // @param exec Task description.
- // @param id Task argument.
- // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
- // and `WAIT_COMPLETE`.
- void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override;
- // Add tasks range for further execution.
- //
- // @param exec Task description.
- // @param firstId, lastId Task arguments [firstId, lastId)
- // @param flags Same as for `Exec`.
- void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override;
- using ILocalExecutor::Exec;
- using ILocalExecutor::ExecRange;
- private:
- class TImpl;
- THolder<TImpl> Impl_;
- };
- static inline TLocalExecutor& LocalExecutor() {
- return *Singleton<TLocalExecutor>();
- }
- template <typename TBody>
- inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) {
- ILocalExecutor::TExecRangeParams params(from, to);
- params.SetBlockCountToThreadCount();
- executor.ExecRange(std::forward<TBody>(body), params, TLocalExecutor::WAIT_COMPLETE);
- }
- template <typename TBody>
- inline void ParallelFor(ui32 from, ui32 to, TBody&& body) {
- ParallelFor(LocalExecutor(), from, to, std::forward<TBody>(body));
- }
- template <typename TBody>
- inline void AsyncParallelFor(ui32 from, ui32 to, TBody&& body) {
- ILocalExecutor::TExecRangeParams params(from, to);
- params.SetBlockCountToThreadCount();
- LocalExecutor().ExecRange(std::forward<TBody>(body), params, 0);
- }
- }
|