async_queue.h 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. #pragma once
  2. #include <library/cpp/threading/future/future.h>
  3. #include <library/cpp/threading/future/async.h>
  4. #include <util/thread/pool.h>
  5. #include <util/generic/ptr.h>
  6. #include <util/generic/function.h>
  7. #include <util/system/guard.h>
  8. #include <util/system/rwlock.h>
  9. #include <exception>
  10. namespace NYql {
  11. class TAsyncQueue: public TThrRefBase {
  12. public:
  13. using TPtr = TIntrusivePtr<TAsyncQueue>;
  14. static TPtr Make(size_t numThreads, const TString& poolName);
  15. void Stop() {
  16. auto guard = TWriteGuard(Lock_);
  17. if (MtpQueue_) {
  18. MtpQueue_->Stop();
  19. MtpQueue_.Destroy();
  20. }
  21. }
  22. template <typename TCallable>
  23. [[nodiscard]]
  24. ::NThreading::TFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>> Async(TCallable&& func) {
  25. {
  26. auto guard = TReadGuard(Lock_);
  27. if (MtpQueue_) {
  28. return ::NThreading::Async(std::move(func), *MtpQueue_);
  29. }
  30. }
  31. return ::NThreading::MakeErrorFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>>(std::make_exception_ptr(yexception() << "Thread pool is already stopped"));
  32. }
  33. private:
  34. TAsyncQueue(size_t numThreads, const TString& poolName);
  35. private:
  36. TRWMutex Lock_;
  37. THolder<IThreadPool> MtpQueue_;
  38. };
  39. } // NYql