// cont_poller.h
  #pragma once

  #include "poller.h"
  #include "sockmap.h"

  #include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>

  #include <util/datetime/base.h>
  #include <util/memory/pool.h>
  #include <util/memory/smallobj.h>
  #include <util/network/init.h>

  #include <cerrno>
  #include <functional>
  10. class TCont;
  11. class TContExecutor;
  12. class TFdEvent;
  13. namespace NCoro {
  14. class IPollEvent;
  // Ordering predicate for objects that expose DeadLine().
  //
  // An object with an earlier deadline is ordered first. Objects with equal
  // deadlines are ordered by their addresses, so two distinct objects never
  // compare equivalent.
  struct TContPollEventCompare {
      // Returns true when `l` must be placed before `r`.
      //
      // T only needs a const DeadLine() whose result supports `<` and `==`.
      template <class T>
      static inline bool Compare(const T& l, const T& r) noexcept {
          const auto lhs = l.DeadLine();
          const auto rhs = r.DeadLine();

          if (lhs < rhs) {
              return true;
          }

          if (!(lhs == rhs)) {
              return false;
          }

          // The built-in `<` on pointers to unrelated objects gives an
          // unspecified result; std::less is guaranteed to be a strict total
          // order over pointers, which is what an ordered container needs.
          return std::less<const T*>()(&l, &r);
      }
  };
  21. class TContPollEvent : public TRbTreeItem<TContPollEvent, TContPollEventCompare> {
  22. public:
  23. TContPollEvent(TCont* cont, TInstant deadLine) noexcept
  24. : Cont_(cont)
  25. , DeadLine_(deadLine)
  26. {}
  27. static bool Compare(const TContPollEvent& l, const TContPollEvent& r) noexcept {
  28. return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
  29. }
  30. int Status() const noexcept {
  31. return Status_;
  32. }
  33. void SetStatus(int status) noexcept {
  34. Status_ = status;
  35. }
  36. TCont* Cont() noexcept {
  37. return Cont_;
  38. }
  39. TInstant DeadLine() const noexcept {
  40. return DeadLine_;
  41. }
  42. void Wake(int status) noexcept {
  43. SetStatus(status);
  44. Wake();
  45. }
  46. private:
  47. void Wake() noexcept;
  48. private:
  49. TCont* Cont_;
  50. TInstant DeadLine_;
  51. int Status_ = EINPROGRESS;
  52. };
  53. class IPollEvent: public TIntrusiveListItem<IPollEvent> {
  54. public:
  55. IPollEvent(SOCKET fd, ui16 what) noexcept
  56. : Fd_(fd)
  57. , What_(what)
  58. {}
  59. virtual ~IPollEvent() {}
  60. SOCKET Fd() const noexcept {
  61. return Fd_;
  62. }
  63. int What() const noexcept {
  64. return What_;
  65. }
  66. virtual void OnPollEvent(int status) noexcept = 0;
  67. private:
  68. SOCKET Fd_;
  69. ui16 What_;
  70. };
  71. template <class T>
  72. class TBigArray {
  73. struct TValue: public T, public TObjectFromPool<TValue> {
  74. TValue() {}
  75. };
  76. public:
  77. TBigArray()
  78. : Pool_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
  79. {}
  80. T* Get(size_t index) {
  81. TRef& ret = Lst_.Get(index);
  82. if (!ret) {
  83. ret = TRef(new (&Pool_) TValue());
  84. }
  85. return ret.Get();
  86. }
  87. private:
  88. using TRef = THolder<TValue>;
  89. typename TValue::TPool Pool_;
  90. TSocketMap<TRef> Lst_;
  91. };
  92. using TPollEventList = TIntrusiveList<IPollEvent>;
  93. class TContPoller {
  94. public:
  95. using TEvent = IPollerFace::TEvent;
  96. using TEvents = IPollerFace::TEvents;
  97. TContPoller()
  98. : P_(IPollerFace::Default())
  99. {
  100. }
  101. explicit TContPoller(THolder<IPollerFace> poller)
  102. : P_(std::move(poller))
  103. {}
  104. void Schedule(IPollEvent* event) {
  105. auto* lst = Lists_.Get(event->Fd());
  106. const ui16 oldFlags = Flags(*lst);
  107. lst->PushFront(event);
  108. ui16 newFlags = Flags(*lst);
  109. if (newFlags != oldFlags) {
  110. if (oldFlags) {
  111. newFlags |= CONT_POLL_MODIFY;
  112. }
  113. P_->Set(lst, event->Fd(), newFlags);
  114. }
  115. }
  116. void Remove(IPollEvent* event) noexcept {
  117. auto* lst = Lists_.Get(event->Fd());
  118. const ui16 oldFlags = Flags(*lst);
  119. event->Unlink();
  120. ui16 newFlags = Flags(*lst);
  121. if (newFlags != oldFlags) {
  122. if (newFlags) {
  123. newFlags |= CONT_POLL_MODIFY;
  124. }
  125. P_->Set(lst, event->Fd(), newFlags);
  126. }
  127. }
  128. void Wait(TEvents& events, TInstant deadLine) {
  129. events.clear();
  130. P_->Wait(events, deadLine);
  131. }
  132. EContPoller PollEngine() const {
  133. return P_->PollEngine();
  134. }
  135. private:
  136. static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept {
  137. ui16 ret = 0;
  138. for (auto&& item : lst) {
  139. ret |= item.What();
  140. }
  141. return ret;
  142. }
  143. private:
  144. TBigArray<TPollEventList> Lists_;
  145. THolder<IPollerFace> P_;
  146. };
  147. class TEventWaitQueue {
  148. using TIoWait = TRbTree<NCoro::TContPollEvent, NCoro::TContPollEventCompare>;
  149. public:
  150. void Register(NCoro::TContPollEvent* event);
  151. bool Empty() const noexcept {
  152. return IoWait_.Empty();
  153. }
  154. void Abort() noexcept;
  155. TInstant WakeTimedout(TInstant now) noexcept;
  156. private:
  157. TIoWait IoWait_;
  158. };
  159. }
  160. class TFdEvent final:
  161. public NCoro::TContPollEvent,
  162. public NCoro::IPollEvent
  163. {
  164. public:
  165. TFdEvent(TCont* cont, SOCKET fd, ui16 what, TInstant deadLine) noexcept
  166. : TContPollEvent(cont, deadLine)
  167. , IPollEvent(fd, what)
  168. {}
  169. ~TFdEvent() {
  170. RemoveFromIOWait();
  171. }
  172. void RemoveFromIOWait() noexcept;
  173. void OnPollEvent(int status) noexcept override {
  174. Wake(status);
  175. }
  176. };
  177. class TTimerEvent: public NCoro::TContPollEvent {
  178. public:
  179. TTimerEvent(TCont* cont, TInstant deadLine) noexcept
  180. : TContPollEvent(cont, deadLine)
  181. {}
  182. };
  183. int ExecuteEvent(TFdEvent* event) noexcept;
  184. int ExecuteEvent(TTimerEvent* event) noexcept;