poller.cpp 8.6 KB


  1. #include "poller.h"
  2. #include "sockmap.h"
  3. #include <util/memory/smallobj.h>
  4. #include <util/generic/intrlist.h>
  5. #include <util/generic/singleton.h>
  6. #include <util/system/env.h>
  7. #include <util/string/cast.h>
  8. namespace {
  9. using TChange = IPollerFace::TChange;
  10. using TEvent = IPollerFace::TEvent;
  11. using TEvents = IPollerFace::TEvents;
  12. template <class T>
  13. class TUnsafeBuf {
  14. public:
  15. TUnsafeBuf() noexcept
  16. : L_(0)
  17. {
  18. }
  19. T* operator~() const noexcept {
  20. return B_.Get();
  21. }
  22. size_t operator+() const noexcept {
  23. return L_;
  24. }
  25. void Reserve(size_t len) {
  26. len = FastClp2(len);
  27. if (len > L_) {
  28. B_.Reset(new T[len]);
  29. L_ = len;
  30. }
  31. }
  32. private:
  33. TArrayHolder<T> B_;
  34. size_t L_;
  35. };
  36. template <class T>
  37. class TVirtualize: public IPollerFace {
  38. public:
  39. TVirtualize(EContPoller pollerEngine)
  40. : PollerEngine_(pollerEngine)
  41. {
  42. }
  43. void Set(const TChange& c) override {
  44. P_.Set(c);
  45. }
  46. void Wait(TEvents& events, TInstant deadLine) override {
  47. P_.Wait(events, deadLine);
  48. }
  49. EContPoller PollEngine() const override {
  50. return PollerEngine_;
  51. }
  52. private:
  53. T P_;
  54. const EContPoller PollerEngine_;
  55. };
  56. template <class T>
  57. class TPoller {
  58. using TInternalEvent = typename T::TEvent;
  59. public:
  60. TPoller() {
  61. E_.Reserve(1);
  62. }
  63. void Set(const TChange& c) {
  64. P_.Set(c.Data, c.Fd, c.Flags);
  65. }
  66. void Reserve(size_t size) {
  67. E_.Reserve(size);
  68. }
  69. void Wait(TEvents& events, TInstant deadLine) {
  70. const size_t ret = P_.WaitD(~E_, +E_, deadLine);
  71. events.reserve(ret);
  72. for (size_t i = 0; i < ret; ++i) {
  73. const TInternalEvent* ie = ~E_ + i;
  74. const TEvent e = {
  75. T::ExtractEvent(ie),
  76. T::ExtractStatus(ie),
  77. (ui16)T::ExtractFilter(ie),
  78. };
  79. events.push_back(e);
  80. }
  81. E_.Reserve(ret + 1);
  82. }
  83. private:
  84. T P_;
  85. TUnsafeBuf<TInternalEvent> E_;
  86. };
  87. template <class T>
  88. class TIndexedArray {
  89. struct TVal:
  90. public T,
  91. public TIntrusiveListItem<TVal>,
  92. public TObjectFromPool<TVal>
  93. {
  94. // NOTE Constructor must be user-defined (and not =default) here
  95. // because TVal objects are created in the UB-capable placement
  96. // TObjectFromPool::new operator that stores data in a memory
  97. // allocated for the object. Without user defined constructor
  98. // zero-initialization takes place in TVal() expression and the
  99. // data is overwritten.
  100. TVal() {
  101. }
  102. };
  103. typedef TIntrusiveList<TVal> TListType;
  104. public:
  105. typedef typename TListType::TIterator TIterator;
  106. typedef typename TListType::TConstIterator TConstIterator;
  107. TIndexedArray()
  108. : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
  109. {
  110. }
  111. TIterator Begin() noexcept {
  112. return I_.Begin();
  113. }
  114. TIterator End() noexcept {
  115. return I_.End();
  116. }
  117. TConstIterator Begin() const noexcept {
  118. return I_.Begin();
  119. }
  120. TConstIterator End() const noexcept {
  121. return I_.End();
  122. }
  123. T& operator[](size_t i) {
  124. return *Get(i);
  125. }
  126. T* Get(size_t i) {
  127. TValRef& v = V_.Get(i);
  128. if (Y_UNLIKELY(!v)) {
  129. v.Reset(new (&P_) TVal());
  130. I_.PushFront(v.Get());
  131. }
  132. Y_PREFETCH_WRITE(v.Get(), 1);
  133. return v.Get();
  134. }
  135. void Erase(size_t i) noexcept {
  136. V_.Get(i).Destroy();
  137. }
  138. size_t Size() const noexcept {
  139. return I_.Size();
  140. }
  141. private:
  142. using TValRef = THolder<TVal>;
  143. typename TVal::TPool P_;
  144. TSocketMap<TValRef> V_;
  145. TListType I_;
  146. };
  147. inline short PollFlags(ui16 flags) noexcept {
  148. short ret = 0;
  149. if (flags & CONT_POLL_READ) {
  150. ret |= POLLIN;
  151. }
  152. if (flags & CONT_POLL_WRITE) {
  153. ret |= POLLOUT;
  154. }
  155. #if defined(_linux_)
  156. if (flags & CONT_POLL_RDHUP) {
  157. ret |= POLLRDHUP;
  158. }
  159. #endif
  160. return ret;
  161. }
  162. class TPollPoller {
  163. public:
  164. size_t Size() const noexcept {
  165. return S_.Size();
  166. }
  167. template <class T>
  168. void Build(T& t) const {
  169. for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
  170. t.Set(*it);
  171. }
  172. t.Reserve(Size());
  173. }
  174. void Set(const TChange& c) {
  175. if (c.Flags) {
  176. S_[c.Fd] = c;
  177. } else {
  178. S_.Erase(c.Fd);
  179. }
  180. }
  181. void Wait(TEvents& events, TInstant deadLine) {
  182. T_.clear();
  183. T_.reserve(Size());
  184. for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
  185. const pollfd pfd = {
  186. it->Fd,
  187. PollFlags(it->Flags),
  188. 0,
  189. };
  190. T_.push_back(pfd);
  191. }
  192. const ssize_t ret = PollD(T_.data(), (nfds_t) T_.size(), deadLine);
  193. if (ret <= 0) {
  194. return;
  195. }
  196. events.reserve(T_.size());
  197. for (size_t i = 0; i < T_.size(); ++i) {
  198. const pollfd& pfd = T_[i];
  199. const short ev = pfd.revents;
  200. if (!ev) {
  201. continue;
  202. }
  203. int status = 0;
  204. ui16 filter = 0;
  205. // We are perfectly fine with an EOF while reading a pipe or a unix socket
  206. if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) {
  207. filter |= CONT_POLL_READ;
  208. }
  209. if (ev & POLLOUT) {
  210. filter |= CONT_POLL_WRITE;
  211. }
  212. #if defined(_linux_)
  213. if (ev & POLLRDHUP) {
  214. filter |= CONT_POLL_RDHUP;
  215. }
  216. #endif
  217. if (ev & POLLERR) {
  218. status = EIO;
  219. } else if (ev & POLLHUP && pfd.events & POLLOUT) {
  220. // Only write operations may cause EPIPE
  221. status = EPIPE;
  222. } else if (ev & POLLNVAL) {
  223. status = EINVAL;
  224. }
  225. if (status) {
  226. filter = CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP;
  227. }
  228. const TEvent res = {
  229. S_[pfd.fd].Data,
  230. status,
  231. filter,
  232. };
  233. events.push_back(res);
  234. }
  235. }
  236. private:
  237. typedef TIndexedArray<TChange> TFds;
  238. TFds S_;
  239. typedef TVector<pollfd> TPollVec;
  240. TPollVec T_;
  241. };
  242. struct TUserPoller: public TString {
  243. TUserPoller()
  244. : TString(GetEnv("USER_POLLER"))
  245. {
  246. }
  247. };
  248. }
  249. THolder<IPollerFace> IPollerFace::Default() {
  250. return Construct(*SingletonWithPriority<TUserPoller, 0>());
  251. }
  252. THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) {
  253. return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default);
  254. }
  255. THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
  256. if (poller == EContPoller::Default) {
  257. #if defined (HAVE_EPOLL_POLLER)
  258. poller = EContPoller::Epoll;
  259. #elif defined(HAVE_KQUEUE_POLLER)
  260. poller = EContPoller::Kqueue;
  261. #else
  262. poller = EContPoller::Select;
  263. #endif
  264. }
  265. switch (poller) {
  266. case EContPoller::Select:
  267. return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller);
  268. case EContPoller::Poll:
  269. return MakeHolder<TVirtualize<TPollPoller>>(poller);
  270. case EContPoller::Epoll:
  271. #if defined(HAVE_EPOLL_POLLER)
  272. return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller);
  273. #else
  274. return nullptr;
  275. #endif
  276. case EContPoller::Kqueue:
  277. #if defined(HAVE_KQUEUE_POLLER)
  278. return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller);
  279. #else
  280. return nullptr;
  281. #endif
  282. default:
  283. Y_ABORT("bad poller type");
  284. }
  285. }