// executor.h (1.8 KB)
  1. #pragma once
  2. #include "asio.h"
  3. #include <library/cpp/deprecated/atomic/atomic.h>
  4. #include <util/thread/factory.h>
  5. #include <util/system/thread.h>
  6. namespace NAsio {
  7. class TIOServiceExecutor: public IThreadFactory::IThreadAble {
  8. public:
  9. TIOServiceExecutor()
  10. : Work_(new TIOService::TWork(Srv_))
  11. {
  12. T_ = SystemThreadFactory()->Run(this);
  13. }
  14. ~TIOServiceExecutor() override {
  15. SyncShutdown();
  16. }
  17. void DoExecute() override {
  18. TThread::SetCurrentThreadName("NehAsioExecutor");
  19. Srv_.Run();
  20. }
  21. inline TIOService& GetIOService() noexcept {
  22. return Srv_;
  23. }
  24. void SyncShutdown() {
  25. if (Work_) {
  26. Work_.Destroy();
  27. Srv_.Abort(); //cancel all async operations, break Run() execution
  28. T_->Join();
  29. }
  30. }
  31. private:
  32. TIOService Srv_;
  33. TAutoPtr<TIOService::TWork> Work_;
  34. typedef TAutoPtr<IThreadFactory::IThread> IThreadRef;
  35. IThreadRef T_;
  36. };
  37. class TExecutorsPool {
  38. public:
  39. TExecutorsPool(size_t executors)
  40. : C_(0)
  41. {
  42. for (size_t i = 0; i < executors; ++i) {
  43. E_.push_back(new TIOServiceExecutor());
  44. }
  45. }
  46. inline size_t Size() const noexcept {
  47. return E_.size();
  48. }
  49. inline TIOServiceExecutor& GetExecutor() noexcept {
  50. TAtomicBase next = AtomicIncrement(C_);
  51. return *E_[next % E_.size()];
  52. }
  53. void SyncShutdown() {
  54. for (size_t i = 0; i < E_.size(); ++i) {
  55. E_[i]->SyncShutdown();
  56. }
  57. }
  58. private:
  59. TAtomic C_;
  60. TVector<TAutoPtr<TIOServiceExecutor>> E_;
  61. };
  62. }