pollerimpl.h 17 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706
  1. #pragma once
  2. #include "socket.h"
  3. #include <util/system/error.h>
  4. #include <util/system/mutex.h>
  5. #include <util/system/defaults.h>
  6. #include <util/generic/ylimits.h>
  7. #include <util/generic/utility.h>
  8. #include <util/generic/vector.h>
  9. #include <util/generic/yexception.h>
  10. #include <util/datetime/base.h>
  11. #if defined(_freebsd_) || defined(_darwin_)
  12. #define HAVE_KQUEUE_POLLER
  13. #endif
  14. #if (defined(_linux_) && !defined(_bionic_)) || (__ANDROID_API__ >= 21)
  15. #define HAVE_EPOLL_POLLER
  16. #endif
  17. //now we always have it
  18. #define HAVE_SELECT_POLLER
  19. #if defined(HAVE_KQUEUE_POLLER)
  20. #include <sys/event.h>
  21. #endif
  22. #if defined(HAVE_EPOLL_POLLER)
  23. #include <sys/epoll.h>
  24. #endif
  // Bit flags passed as the `what` argument of the pollers below.
  // Individual flags are combined with bitwise OR.
  enum EContPoll {
      CONT_POLL_READ = 1 << 0,
      CONT_POLL_WRITE = 1 << 1,
      CONT_POLL_RDHUP = 1 << 2,
      // Disable after first event
      CONT_POLL_ONE_SHOT = 1 << 3,
      // Modify already added event
      CONT_POLL_MODIFY = 1 << 4,
      // Notify only about new events
      CONT_POLL_EDGE_TRIGGERED = 1 << 5,
      // Backlog is empty (seen end of request, EAGAIN or truncated read)
      CONT_POLL_BACKLOG_EMPTY = 1 << 6,
  };
  34. static inline bool IsSocket(SOCKET fd) noexcept {
  35. int val = 0;
  36. socklen_t len = sizeof(val);
  37. if (getsockopt(fd, SOL_SOCKET, SO_TYPE, (char*)&val, &len) == 0) {
  38. return true;
  39. }
  40. return LastSystemError() != ENOTSOCK;
  41. }
  // Converts a timeout in microseconds to milliseconds.
  //
  // Zero stays zero; every other value is divided by 1000 and then raised to
  // at least 1.
  static inline int MicroToMilli(int timeout) noexcept {
      if (timeout == 0) {
          return 0;
      }

      /*
       * 1. API of epoll syscall allows to specify timeout with millisecond
       *    accuracy only
       * 2. It is quite complicated to guarantee time resolution of blocking
       *    syscall less than kernel 1/HZ
       *
       * Without this rounding we just waste cpu time and do a lot of
       * fast epoll_wait(..., 0) syscalls.
       */
      const int millis = timeout / 1000;

      return millis < 1 ? 1 : millis;
  }
  57. struct TWithoutLocking {
  58. using TMyMutex = TFakeMutex;
  59. };
  60. #if defined(HAVE_KQUEUE_POLLER)
  // Calls kevent() and transparently repeats the call while it fails with EINTR.
  // Arguments are forwarded unchanged; the result of the last call is returned.
  static inline int Kevent(int kq, struct kevent* changelist, int nchanges,
                           struct kevent* eventlist, int nevents, const struct timespec* timeout) noexcept {
      for (;;) {
          const int rc = kevent(kq, changelist, nchanges, eventlist, nevents, timeout);

          if (rc != -1 || errno != EINTR) {
              return rc;
          }
      }
  }
  // Poller backend built on kqueue().
  //
  // TLockPolicy is not referenced by this class; the template parameter only
  // keeps the signature parallel to the other poller backends.
  template <class TLockPolicy>
  class TKqueuePoller {
  public:
      typedef struct ::kevent TEvent;

      // Creates the kqueue descriptor. Throws TSystemError if kqueue() fails.
      inline TKqueuePoller()
          : Fd_(kqueue())
      {
          if (Fd_ == -1) {
              ythrow TSystemError() << "kqueue failed";
          }
      }

      inline ~TKqueuePoller() {
          close(Fd_);
      }

      // Returns the underlying kqueue descriptor.
      inline int Fd() const noexcept {
          return Fd_;
      }

      // Registers fd with the queue and attaches `data` as user data.
      //
      // Both the read and the write filter are always submitted; each one is
      // enabled or disabled depending on CONT_POLL_READ / CONT_POLL_WRITE in
      // `what`. CONT_POLL_ONE_SHOT maps to EV_ONESHOT and
      // CONT_POLL_EDGE_TRIGGERED maps to EV_CLEAR.
      // Throws TSystemError if kevent() fails.
      inline void SetImpl(void* data, int fd, int what) {
          TEvent e[2];
          int flags = EV_ADD;

          if (what & CONT_POLL_EDGE_TRIGGERED) {
              if (what & CONT_POLL_BACKLOG_EMPTY) {
                  // When backlog is empty, edge-triggered does not need restart.
                  return;
              }

              flags |= EV_CLEAR;
          }

          if (what & CONT_POLL_ONE_SHOT) {
              flags |= EV_ONESHOT;
          }

          Zero(e);

          EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
          EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data);

          if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) {
              ythrow TSystemError() << "kevent add failed";
          }
      }

      // Unregisters both filters of fd. A filter that is not registered
      // (ENOENT) is silently accepted; any other failure trips Y_VERIFY.
      inline void Remove(int fd) noexcept {
          // Each filter is deleted by its own kevent() call. With an empty
          // event list kevent() stops at the first change that fails, so if
          // both deletions shared one call, a missing read filter (ENOENT)
          // would leave the write filter registered.
          RemoveFilter(fd, EVFILT_READ);
          RemoveFilter(fd, EVFILT_WRITE);
      }

      // Waits for up to `len` events. `timeout` is in microseconds.
      // Returns the number of events stored into `events`.
      inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
          struct timespec ts;

          ts.tv_sec = timeout / 1000000;
          ts.tv_nsec = (timeout % 1000000) * 1000;

          const int ret = Kevent(Fd_, nullptr, 0, events, len, &ts);

          Y_VERIFY(ret >= 0, "kevent failed: %s", LastSystemErrorText());

          return (size_t)ret;
      }

      // Returns the user data pointer that was passed to SetImpl().
      static inline void* ExtractEvent(const TEvent* event) noexcept {
          return event->udata;
      }

      // Returns EIO when the event carries EV_ERROR, otherwise the event's fflags.
      static inline int ExtractStatus(const TEvent* event) noexcept {
          if (event->flags & EV_ERROR) {
              return EIO;
          }

          return event->fflags;
      }

      // Translates the kqueue filter of the event into CONT_POLL_* flags.
      static inline int ExtractFilterImpl(const TEvent* event) noexcept {
          if (event->filter == EVFILT_READ) {
              return CONT_POLL_READ;
          }

          if (event->filter == EVFILT_WRITE) {
              return CONT_POLL_WRITE;
          }

          if (event->flags & EV_EOF) {
              return CONT_POLL_READ | CONT_POLL_WRITE;
          }

          return 0;
      }

  private:
      // Deletes a single filter of fd; ENOENT is tolerated.
      inline void RemoveFilter(int fd, short filter) noexcept {
          TEvent e;

          Zero(e);
          EV_SET(&e, fd, filter, EV_DELETE, 0, 0, 0);

          Y_VERIFY(!(Kevent(Fd_, &e, 1, nullptr, 0, nullptr) == -1 && errno != ENOENT), "kevent remove failed: %s", LastSystemErrorText());
      }

  private:
      int Fd_;
  };
  145. #endif
  146. #if defined(HAVE_EPOLL_POLLER)
  // Calls epoll_wait() and repeats the call while it fails with EINTR.
  //
  // `timeout` is in milliseconds and is capped at 35 minutes before being
  // handed to epoll_wait().
  // NOTE(review): the reason for the 35 minute cap is not stated in this file.
  static inline int ContEpollWait(int epfd, struct epoll_event* events, int maxevents, int timeout) noexcept {
      const int maxTimeoutMs = 35 * 60 * 1000;
      const int boundedTimeout = timeout < maxTimeoutMs ? timeout : maxTimeoutMs;

      for (;;) {
          const int rc = epoll_wait(epfd, events, maxevents, boundedTimeout);

          if (rc != -1 || errno != EINTR) {
              return rc;
          }
      }
  }
  154. template <class TLockPolicy>
  155. class TEpollPoller {
  156. public:
  157. typedef struct ::epoll_event TEvent;
  158. inline TEpollPoller(bool closeOnExec = false)
  159. : Fd_(epoll_create1(closeOnExec ? EPOLL_CLOEXEC : 0))
  160. {
  161. if (Fd_ == -1) {
  162. ythrow TSystemError() << "epoll_create failed";
  163. }
  164. }
  165. inline ~TEpollPoller() {
  166. close(Fd_);
  167. }
  168. inline int Fd() const noexcept {
  169. return Fd_;
  170. }
  171. inline void SetImpl(void* data, int fd, int what) {
  172. TEvent e;
  173. Zero(e);
  174. if (what & CONT_POLL_EDGE_TRIGGERED) {
  175. if (what & CONT_POLL_BACKLOG_EMPTY) {
  176. // When backlog is empty, edge-triggered does not need restart.
  177. return;
  178. }
  179. e.events |= EPOLLET;
  180. }
  181. if (what & CONT_POLL_ONE_SHOT) {
  182. e.events |= EPOLLONESHOT;
  183. }
  184. if (what & CONT_POLL_READ) {
  185. e.events |= EPOLLIN;
  186. }
  187. if (what & CONT_POLL_WRITE) {
  188. e.events |= EPOLLOUT;
  189. }
  190. if (what & CONT_POLL_RDHUP) {
  191. e.events |= EPOLLRDHUP;
  192. }
  193. e.data.ptr = data;
  194. if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) {
  195. if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) {
  196. ythrow TSystemError() << "epoll add failed";
  197. }
  198. }
  199. }
  200. inline void Remove(int fd) noexcept {
  201. TEvent e;
  202. Zero(e);
  203. epoll_ctl(Fd_, EPOLL_CTL_DEL, fd, &e);
  204. }
  205. inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
  206. const int ret = ContEpollWait(Fd_, events, len, MicroToMilli(timeout));
  207. Y_VERIFY(ret >= 0, "epoll wait error: %s", LastSystemErrorText());
  208. return (size_t)ret;
  209. }
  210. static inline void* ExtractEvent(const TEvent* event) noexcept {
  211. return event->data.ptr;
  212. }
  213. static inline int ExtractStatus(const TEvent* event) noexcept {
  214. if (event->events & (EPOLLERR | EPOLLHUP)) {
  215. return EIO;
  216. }
  217. return 0;
  218. }
  219. static inline int ExtractFilterImpl(const TEvent* event) noexcept {
  220. int ret = 0;
  221. if (event->events & EPOLLIN) {
  222. ret |= CONT_POLL_READ;
  223. }
  224. if (event->events & EPOLLOUT) {
  225. ret |= CONT_POLL_WRITE;
  226. }
  227. if (event->events & EPOLLRDHUP) {
  228. ret |= CONT_POLL_RDHUP;
  229. }
  230. return ret;
  231. }
  232. private:
  233. int Fd_;
  234. };
  235. #endif
  236. #if defined(HAVE_SELECT_POLLER)
  237. #include <util/memory/tempbuf.h>
  238. #include <util/generic/hash.h>
  239. #include "pair.h"
  // Calls select() and repeats the call while it fails with EINTR.
  // Arguments are forwarded unchanged; the result of the last call is returned.
  static inline int ContSelect(int n, fd_set* r, fd_set* w, fd_set* e, struct timeval* t) noexcept {
      for (;;) {
          const int rc = select(n, r, w, e, t);

          if (rc != -1 || errno != EINTR) {
              return rc;
          }
      }
  }
  247. struct TSelectPollerNoTemplate {
  248. struct THandle {
  249. void* Data_;
  250. int Filter_;
  251. inline THandle()
  252. : Data_(nullptr)
  253. , Filter_(0)
  254. {
  255. }
  256. inline void* Data() const noexcept {
  257. return Data_;
  258. }
  259. inline void Set(void* d, int s) noexcept {
  260. Data_ = d;
  261. Filter_ = s;
  262. }
  263. inline void Clear(int c) noexcept {
  264. Filter_ &= ~c;
  265. }
  266. inline int Filter() const noexcept {
  267. return Filter_;
  268. }
  269. };
  270. class TFds: public THashMap<SOCKET, THandle> {
  271. public:
  272. inline void Set(SOCKET fd, void* data, int filter) {
  273. (*this)[fd].Set(data, filter);
  274. }
  275. inline void Remove(SOCKET fd) {
  276. erase(fd);
  277. }
  278. inline SOCKET Build(fd_set* r, fd_set* w, fd_set* e) const noexcept {
  279. SOCKET ret = 0;
  280. for (const auto& it : *this) {
  281. const SOCKET fd = it.first;
  282. const THandle& handle = it.second;
  283. FD_SET(fd, e);
  284. if (handle.Filter() & CONT_POLL_READ) {
  285. FD_SET(fd, r);
  286. }
  287. if (handle.Filter() & CONT_POLL_WRITE) {
  288. FD_SET(fd, w);
  289. }
  290. if (fd > ret) {
  291. ret = fd;
  292. }
  293. }
  294. return ret;
  295. }
  296. };
  297. struct TEvent: public THandle {
  298. inline int Status() const noexcept {
  299. return -Min(Filter(), 0);
  300. }
  301. inline void Error(void* d, int err) noexcept {
  302. Set(d, -err);
  303. }
  304. inline void Success(void* d, int what) noexcept {
  305. Set(d, what);
  306. }
  307. };
  308. };
  309. template <class TLockPolicy>
  310. class TSelectPoller: public TSelectPollerNoTemplate {
  311. using TMyMutex = typename TLockPolicy::TMyMutex;
  312. public:
  313. inline TSelectPoller()
  314. : Begin_(nullptr)
  315. , End_(nullptr)
  316. {
  317. SocketPair(Signal_);
  318. SetNonBlock(WaitSock());
  319. SetNonBlock(SigSock());
  320. }
  321. inline ~TSelectPoller() {
  322. closesocket(Signal_[0]);
  323. closesocket(Signal_[1]);
  324. }
  325. inline void SetImpl(void* data, SOCKET fd, int what) {
  326. with_lock (CommandLock_) {
  327. Commands_.push_back(TCommand(fd, what, data));
  328. }
  329. Signal();
  330. }
  331. inline void Remove(SOCKET fd) noexcept {
  332. with_lock (CommandLock_) {
  333. Commands_.push_back(TCommand(fd, 0));
  334. }
  335. Signal();
  336. }
  337. inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
  338. auto guard = Guard(Lock_);
  339. do {
  340. if (Begin_ != End_) {
  341. const size_t ret = Min<size_t>(End_ - Begin_, len);
  342. memcpy(events, Begin_, sizeof(*events) * ret);
  343. Begin_ += ret;
  344. return ret;
  345. }
  346. if (len >= EventNumberHint()) {
  347. return WaitBase(events, len, timeout);
  348. }
  349. Begin_ = SavedEvents();
  350. End_ = Begin_ + WaitBase(Begin_, EventNumberHint(), timeout);
  351. } while (Begin_ != End_);
  352. return 0;
  353. }
  354. inline TEvent* SavedEvents() {
  355. if (!SavedEvents_) {
  356. SavedEvents_.Reset(new TEvent[EventNumberHint()]);
  357. }
  358. return SavedEvents_.Get();
  359. }
  360. inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept {
  361. with_lock (CommandLock_) {
  362. for (auto command = Commands_.begin(); command != Commands_.end(); ++command) {
  363. if (command->Filter_ != 0) {
  364. Fds_.Set(command->Fd_, command->Cookie_, command->Filter_);
  365. } else {
  366. Fds_.Remove(command->Fd_);
  367. }
  368. }
  369. Commands_.clear();
  370. }
  371. TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET));
  372. fd_set* in = (fd_set*)tmpBuf.Data();
  373. fd_set* out = &in[1];
  374. fd_set* errFds = &in[2];
  375. SOCKET* keysToDeleteBegin = (SOCKET*)&in[3];
  376. SOCKET* keysToDeleteEnd = keysToDeleteBegin;
  377. #if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347
  378. memset(in, 0, sizeof(*in));
  379. memset(out, 0, sizeof(*out));
  380. memset(errFds, 0, sizeof(*errFds));
  381. #endif
  382. FD_ZERO(in);
  383. FD_ZERO(out);
  384. FD_ZERO(errFds);
  385. FD_SET(WaitSock(), in);
  386. const SOCKET maxFdNum = Max(Fds_.Build(in, out, errFds), WaitSock());
  387. struct timeval tout;
  388. tout.tv_sec = timeout / 1000000;
  389. tout.tv_usec = timeout % 1000000;
  390. int ret = ContSelect(int(maxFdNum + 1), in, out, errFds, &tout);
  391. if (ret > 0 && FD_ISSET(WaitSock(), in)) {
  392. --ret;
  393. TryWait();
  394. }
  395. Y_VERIFY(ret >= 0 && (size_t)ret <= len, "select error: %s", LastSystemErrorText());
  396. TEvent* eventsStart = events;
  397. for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) {
  398. const SOCKET fd = it->first;
  399. THandle& handle = it->second;
  400. if (FD_ISSET(fd, errFds)) {
  401. (events++)->Error(handle.Data(), EIO);
  402. if (handle.Filter() & CONT_POLL_ONE_SHOT) {
  403. *keysToDeleteEnd = fd;
  404. ++keysToDeleteEnd;
  405. }
  406. } else {
  407. int what = 0;
  408. if (FD_ISSET(fd, in)) {
  409. what |= CONT_POLL_READ;
  410. }
  411. if (FD_ISSET(fd, out)) {
  412. what |= CONT_POLL_WRITE;
  413. }
  414. if (what) {
  415. (events++)->Success(handle.Data(), what);
  416. if (handle.Filter() & CONT_POLL_ONE_SHOT) {
  417. *keysToDeleteEnd = fd;
  418. ++keysToDeleteEnd;
  419. }
  420. if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) {
  421. // Emulate edge-triggered for level-triggered select().
  422. // User must restart waiting this event when needed.
  423. handle.Clear(what);
  424. }
  425. }
  426. }
  427. }
  428. while (keysToDeleteBegin != keysToDeleteEnd) {
  429. Fds_.erase(*keysToDeleteBegin);
  430. ++keysToDeleteBegin;
  431. }
  432. return events - eventsStart;
  433. }
  434. inline size_t EventNumberHint() const noexcept {
  435. return sizeof(fd_set) * 8 * 2;
  436. }
  437. static inline void* ExtractEvent(const TEvent* event) noexcept {
  438. return event->Data();
  439. }
  440. static inline int ExtractStatus(const TEvent* event) noexcept {
  441. return event->Status();
  442. }
  443. static inline int ExtractFilterImpl(const TEvent* event) noexcept {
  444. return event->Filter();
  445. }
  446. private:
  447. inline void Signal() noexcept {
  448. char ch = 13;
  449. send(SigSock(), &ch, 1, 0);
  450. }
  451. inline void TryWait() {
  452. char ch[32];
  453. while (recv(WaitSock(), ch, sizeof(ch), 0) > 0) {
  454. Y_ASSERT(ch[0] == 13);
  455. }
  456. }
  457. inline SOCKET WaitSock() const noexcept {
  458. return Signal_[1];
  459. }
  460. inline SOCKET SigSock() const noexcept {
  461. return Signal_[0];
  462. }
  463. private:
  464. struct TCommand {
  465. SOCKET Fd_;
  466. int Filter_; // 0 to remove
  467. void* Cookie_;
  468. TCommand(SOCKET fd, int filter, void* cookie)
  469. : Fd_(fd)
  470. , Filter_(filter)
  471. , Cookie_(cookie)
  472. {
  473. }
  474. TCommand(SOCKET fd, int filter)
  475. : Fd_(fd)
  476. , Filter_(filter)
  477. {
  478. }
  479. };
  480. TFds Fds_;
  481. TMyMutex Lock_;
  482. TArrayHolder<TEvent> SavedEvents_;
  483. TEvent* Begin_;
  484. TEvent* End_;
  485. TMyMutex CommandLock_;
  486. TVector<TCommand> Commands_;
  487. SOCKET Signal_[2];
  488. };
  489. #endif
  490. static inline TDuration PollStep(const TInstant& deadLine, const TInstant& now) noexcept {
  491. if (deadLine < now) {
  492. return TDuration::Zero();
  493. }
  494. return Min(deadLine - now, TDuration::Seconds(1000));
  495. }
  496. template <class TBase>
  497. class TGenericPoller: public TBase {
  498. public:
  499. using TBase::TBase;
  500. using TEvent = typename TBase::TEvent;
  501. inline void Set(void* data, SOCKET fd, int what) {
  502. if (what) {
  503. this->SetImpl(data, fd, what);
  504. } else {
  505. this->Remove(fd);
  506. }
  507. }
  508. static inline int ExtractFilter(const TEvent* event) noexcept {
  509. if (TBase::ExtractStatus(event)) {
  510. return CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP;
  511. }
  512. return TBase::ExtractFilterImpl(event);
  513. }
  514. inline size_t WaitD(TEvent* events, size_t len, TInstant deadLine, TInstant now = TInstant::Now()) noexcept {
  515. if (!len) {
  516. return 0;
  517. }
  518. size_t ret;
  519. do {
  520. ret = this->Wait(events, len, (int)PollStep(deadLine, now).MicroSeconds());
  521. } while (!ret && ((now = TInstant::Now()) < deadLine));
  522. return ret;
  523. }
  524. };
  525. #if defined(HAVE_KQUEUE_POLLER)
  526. #define TPollerImplBase TKqueuePoller
  527. #elif defined(HAVE_EPOLL_POLLER)
  528. #define TPollerImplBase TEpollPoller
  529. #elif defined(HAVE_SELECT_POLLER)
  530. #define TPollerImplBase TSelectPoller
  531. #else
  532. #error "unsupported platform"
  533. #endif
  534. template <class TLockPolicy>
  535. using TPollerImpl = TGenericPoller<TPollerImplBase<TLockPolicy>>;
  536. #undef TPollerImplBase