pollerimpl.h 18 KB


  1. #pragma once
  2. #include "socket.h"
  3. #include <util/system/error.h>
  4. #include <util/system/mutex.h>
  5. #include <util/system/defaults.h>
  6. #include <util/generic/ylimits.h>
  7. #include <util/generic/utility.h>
  8. #include <util/generic/vector.h>
  9. #include <util/generic/yexception.h>
  10. #include <util/datetime/base.h>
  11. #if defined(_freebsd_) || defined(_darwin_)
  12. #define HAVE_KQUEUE_POLLER
  13. #endif
  14. #if (defined(_linux_) && !defined(_bionic_)) || (__ANDROID_API__ >= 21)
  15. #define HAVE_EPOLL_POLLER
  16. #endif
  17. //now we always have it
  18. #define HAVE_SELECT_POLLER
  19. #if defined(HAVE_KQUEUE_POLLER)
  20. #include <sys/event.h>
  21. #endif
  22. #if defined(HAVE_EPOLL_POLLER)
  23. #include <sys/epoll.h>
  24. #endif
  // Event-interest and delivery-mode flags accepted by the pollers; combine them with bitwise OR.
  enum EContPoll {
      CONT_POLL_READ = 1 << 0,           // Wait for readability
      CONT_POLL_WRITE = 1 << 1,          // Wait for writability
      CONT_POLL_RDHUP = 1 << 2,          // Translated to EPOLLRDHUP by the epoll backend
      CONT_POLL_ONE_SHOT = 1 << 3,       // Disable after first event
      CONT_POLL_MODIFY = 1 << 4,         // Modify already added event
      CONT_POLL_EDGE_TRIGGERED = 1 << 5, // Notify only about new events
      CONT_POLL_BACKLOG_EMPTY = 1 << 6,  // Backlog is empty (seen end of request, EAGAIN or truncated read)
  };
  34. static inline bool IsSocket(SOCKET fd) noexcept {
  35. int val = 0;
  36. socklen_t len = sizeof(val);
  37. if (getsockopt(fd, SOL_SOCKET, SO_TYPE, (char*)&val, &len) == 0) {
  38. return true;
  39. }
  40. return LastSystemError() != ENOTSOCK;
  41. }
  // Converts a timeout in microseconds to milliseconds.
  // Zero stays zero; every other value is divided by 1000 and raised to at least 1.
  static inline int MicroToMilli(int timeout) noexcept {
      if (timeout == 0) {
          return 0;
      }

      /*
       * 1. The epoll syscall API accepts the timeout with millisecond
       *    accuracy only.
       * 2. It is quite complicated to guarantee a time resolution of a blocking
       *    syscall finer than the kernel's 1/HZ.
       *
       * Without rounding up we would just waste cpu time on a lot of
       * fast epoll_wait(..., 0) syscalls.
       */
      const int millis = timeout / 1000;
      return millis < 1 ? 1 : millis;
  }
  57. struct TWithoutLocking {
  58. using TMyMutex = TFakeMutex;
  59. };
  60. #if defined(HAVE_KQUEUE_POLLER)
  // Thin kevent(2) wrapper: repeats the call for as long as it fails with EINTR
  // and returns the first result that is not such an interruption.
  static inline int Kevent(int kq, struct kevent* changelist, int nchanges,
                           struct kevent* eventlist, int nevents, const struct timespec* timeout) noexcept {
      for (;;) {
          const int rc = kevent(kq, changelist, nchanges, eventlist, nevents, timeout);

          if (rc != -1 || errno != EINTR) {
              return rc;
          }
      }
  }
  /**
   * Poller backend built on kqueue(2).
   *
   * Creates a kqueue descriptor in the constructor and closes it in the destructor.
   * TLockPolicy is not referenced by this backend.
   */
  template <class TLockPolicy>
  class TKqueuePoller {
  public:
      typedef struct ::kevent TEvent;

      // Throws TSystemError when the kqueue cannot be created.
      inline TKqueuePoller()
          : Fd_(kqueue())
      {
          if (Fd_ == -1) {
              ythrow TSystemError() << "kqueue failed";
          }
      }

      // The destructor closes Fd_, so a copy would close the same descriptor twice.
      TKqueuePoller(const TKqueuePoller&) = delete;
      TKqueuePoller& operator=(const TKqueuePoller&) = delete;

      inline ~TKqueuePoller() {
          close(Fd_);
      }

      // Underlying kqueue descriptor.
      inline int Fd() const noexcept {
          return Fd_;
      }

      /**
       * Registers (or updates) both the read and the write filter for fd.
       *
       * @param data  opaque pointer stored as udata and handed back by ExtractEvent()
       * @param fd    descriptor to watch
       * @param what  EContPoll flags; READ/WRITE choose which filter is enabled,
       *              ONE_SHOT adds EV_ONESHOT, EDGE_TRIGGERED adds EV_CLEAR
       * Throws TSystemError when kevent() rejects the change.
       */
      inline void SetImpl(void* data, int fd, int what) {
          TEvent e[2];
          int flags = EV_ADD;

          if (what & CONT_POLL_EDGE_TRIGGERED) {
              if (what & CONT_POLL_BACKLOG_EMPTY) {
                  // When backlog is empty, edge-triggered does not need restart.
                  return;
              }

              flags |= EV_CLEAR;
          }

          if (what & CONT_POLL_ONE_SHOT) {
              flags |= EV_ONESHOT;
          }

          Zero(e);

          // Both filters are always submitted; the one that was not requested is added disabled.
          EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
          EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data);

          if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) {
              ythrow TSystemError() << "kevent add failed";
          }
      }

      /**
       * Deletes the read and the write filter of fd.
       *
       * Each filter is deleted by its own kevent() call: with an empty eventlist the first
       * failing change makes kevent() return -1, so a missing read filter (ENOENT, e.g. after
       * a one-shot read event was delivered) must not prevent the write filter from being removed.
       * ENOENT is tolerated; any other failure aborts via Y_VERIFY.
       */
      inline void Remove(int fd) noexcept {
          RemoveFilter(fd, EVFILT_READ);
          RemoveFilter(fd, EVFILT_WRITE);
      }

      /**
       * Waits for events.
       *
       * @param events   output array
       * @param len      capacity of events
       * @param timeout  timeout in microseconds (split into seconds and nanoseconds below)
       * @return number of events stored; a kevent() failure aborts via Y_VERIFY
       */
      inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
          struct timespec ts;

          ts.tv_sec = timeout / 1000000;
          ts.tv_nsec = (timeout % 1000000) * 1000;

          const int ret = Kevent(Fd_, nullptr, 0, events, len, &ts);

          Y_VERIFY(ret >= 0, "kevent failed: %s", LastSystemErrorText());

          return (size_t)ret;
      }

      // Returns the opaque pointer that was passed to SetImpl().
      static inline void* ExtractEvent(const TEvent* event) noexcept {
          return event->udata;
      }

      // Returns EIO when the event carries EV_ERROR, otherwise the event's fflags.
      static inline int ExtractStatus(const TEvent* event) noexcept {
          if (event->flags & EV_ERROR) {
              return EIO;
          }

          return event->fflags;
      }

      // Maps the kqueue filter to EContPoll bits; EV_EOF is consulted only for other filters.
      static inline int ExtractFilterImpl(const TEvent* event) noexcept {
          if (event->filter == EVFILT_READ) {
              return CONT_POLL_READ;
          }

          if (event->filter == EVFILT_WRITE) {
              return CONT_POLL_WRITE;
          }

          if (event->flags & EV_EOF) {
              return CONT_POLL_READ | CONT_POLL_WRITE;
          }

          return 0;
      }

  private:
      // Deletes a single filter of fd, treating "no such filter" (ENOENT) as success.
      inline void RemoveFilter(int fd, short filter) noexcept {
          TEvent e;

          Zero(e);
          EV_SET(&e, fd, filter, EV_DELETE, 0, 0, 0);

          Y_VERIFY(!(Kevent(Fd_, &e, 1, nullptr, 0, nullptr) == -1 && errno != ENOENT), "kevent remove failed: %s", LastSystemErrorText());
      }

  private:
      int Fd_;
  };
  145. #endif
  146. #if defined(HAVE_EPOLL_POLLER)
  // epoll_wait(2) wrapper: caps the timeout (milliseconds) at 35 minutes per call and
  // repeats the call for as long as it fails with EINTR.
  static inline int ContEpollWait(int epfd, struct epoll_event* events, int maxevents, int timeout) noexcept {
      const int maxTimeoutMs = 35 * 60 * 1000;
      const int boundedTimeout = timeout < maxTimeoutMs ? timeout : maxTimeoutMs;

      for (;;) {
          const int rc = epoll_wait(epfd, events, maxevents, boundedTimeout);

          if (rc != -1 || errno != EINTR) {
              return rc;
          }
      }
  }
  154. template <class TLockPolicy>
  155. class TEpollPoller {
  156. public:
  157. typedef struct ::epoll_event TEvent;
  158. inline TEpollPoller(bool closeOnExec = false)
  159. : Fd_(epoll_create1(closeOnExec ? EPOLL_CLOEXEC : 0))
  160. {
  161. if (Fd_ == -1) {
  162. ythrow TSystemError() << "epoll_create failed";
  163. }
  164. }
  165. inline ~TEpollPoller() {
  166. close(Fd_);
  167. }
  168. inline int Fd() const noexcept {
  169. return Fd_;
  170. }
  171. inline void SetImpl(void* data, int fd, int what) {
  172. TEvent e;
  173. Zero(e);
  174. if (what & CONT_POLL_EDGE_TRIGGERED) {
  175. if (what & CONT_POLL_BACKLOG_EMPTY) {
  176. // When backlog is empty, edge-triggered does not need restart.
  177. return;
  178. }
  179. e.events |= EPOLLET;
  180. }
  181. if (what & CONT_POLL_ONE_SHOT) {
  182. e.events |= EPOLLONESHOT;
  183. }
  184. if (what & CONT_POLL_READ) {
  185. e.events |= EPOLLIN;
  186. }
  187. if (what & CONT_POLL_WRITE) {
  188. e.events |= EPOLLOUT;
  189. }
  190. if (what & CONT_POLL_RDHUP) {
  191. e.events |= EPOLLRDHUP;
  192. }
  193. e.data.ptr = data;
  194. if (what & CONT_POLL_MODIFY) {
  195. if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) {
  196. ythrow TSystemError() << "epoll modify failed (fd=" << fd << ", what=" << what << ")";
  197. }
  198. } else if (epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) {
  199. if (LastSystemError() != EEXIST) {
  200. ythrow TSystemError() << "epoll add failed (fd=" << fd << ", what=" << what << ")";
  201. }
  202. if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) {
  203. ythrow TSystemError() << "epoll modify failed (fd=" << fd << ", what=" << what << ")";
  204. }
  205. }
  206. }
  207. inline void Remove(int fd) noexcept {
  208. TEvent e;
  209. Zero(e);
  210. epoll_ctl(Fd_, EPOLL_CTL_DEL, fd, &e);
  211. }
  212. inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
  213. const int ret = ContEpollWait(Fd_, events, len, MicroToMilli(timeout));
  214. Y_VERIFY(ret >= 0, "epoll wait error: %s", LastSystemErrorText());
  215. return (size_t)ret;
  216. }
  217. static inline void* ExtractEvent(const TEvent* event) noexcept {
  218. return event->data.ptr;
  219. }
  220. static inline int ExtractStatus(const TEvent* event) noexcept {
  221. if (event->events & (EPOLLERR | EPOLLHUP)) {
  222. return EIO;
  223. }
  224. return 0;
  225. }
  226. static inline int ExtractFilterImpl(const TEvent* event) noexcept {
  227. int ret = 0;
  228. if (event->events & EPOLLIN) {
  229. ret |= CONT_POLL_READ;
  230. }
  231. if (event->events & EPOLLOUT) {
  232. ret |= CONT_POLL_WRITE;
  233. }
  234. if (event->events & EPOLLRDHUP) {
  235. ret |= CONT_POLL_RDHUP;
  236. }
  237. return ret;
  238. }
  239. private:
  240. int Fd_;
  241. };
  242. #endif
  243. #if defined(HAVE_SELECT_POLLER)
  244. #include <util/memory/tempbuf.h>
  245. #include <util/generic/hash.h>
  246. #include "pair.h"
  // select() wrapper: repeats the call for as long as it fails with EINTR and
  // returns the first result that is not such an interruption.
  static inline int ContSelect(int n, fd_set* r, fd_set* w, fd_set* e, struct timeval* t) noexcept {
      for (;;) {
          const int rc = select(n, r, w, e, t);

          if (rc != -1 || errno != EINTR) {
              return rc;
          }
      }
  }
  254. struct TSelectPollerNoTemplate {
  255. struct THandle {
  256. void* Data_;
  257. int Filter_;
  258. inline THandle()
  259. : Data_(nullptr)
  260. , Filter_(0)
  261. {
  262. }
  263. inline void* Data() const noexcept {
  264. return Data_;
  265. }
  266. inline void Set(void* d, int s) noexcept {
  267. Data_ = d;
  268. Filter_ = s;
  269. }
  270. inline void Clear(int c) noexcept {
  271. Filter_ &= ~c;
  272. }
  273. inline int Filter() const noexcept {
  274. return Filter_;
  275. }
  276. };
  277. class TFds: public THashMap<SOCKET, THandle> {
  278. public:
  279. inline void Set(SOCKET fd, void* data, int filter) {
  280. (*this)[fd].Set(data, filter);
  281. }
  282. inline void Remove(SOCKET fd) {
  283. erase(fd);
  284. }
  285. inline SOCKET Build(fd_set* r, fd_set* w, fd_set* e) const noexcept {
  286. SOCKET ret = 0;
  287. for (const auto& it : *this) {
  288. const SOCKET fd = it.first;
  289. const THandle& handle = it.second;
  290. FD_SET(fd, e);
  291. if (handle.Filter() & CONT_POLL_READ) {
  292. FD_SET(fd, r);
  293. }
  294. if (handle.Filter() & CONT_POLL_WRITE) {
  295. FD_SET(fd, w);
  296. }
  297. if (fd > ret) {
  298. ret = fd;
  299. }
  300. }
  301. return ret;
  302. }
  303. };
  304. struct TEvent: public THandle {
  305. inline int Status() const noexcept {
  306. return -Min(Filter(), 0);
  307. }
  308. inline void Error(void* d, int err) noexcept {
  309. Set(d, -err);
  310. }
  311. inline void Success(void* d, int what) noexcept {
  312. Set(d, what);
  313. }
  314. };
  315. };
  316. template <class TLockPolicy>
  317. class TSelectPoller: public TSelectPollerNoTemplate {
  318. using TMyMutex = typename TLockPolicy::TMyMutex;
  319. public:
  320. inline TSelectPoller()
  321. : Begin_(nullptr)
  322. , End_(nullptr)
  323. {
  324. SocketPair(Signal_);
  325. SetNonBlock(WaitSock());
  326. SetNonBlock(SigSock());
  327. }
  328. inline ~TSelectPoller() {
  329. closesocket(Signal_[0]);
  330. closesocket(Signal_[1]);
  331. }
  332. inline void SetImpl(void* data, SOCKET fd, int what) {
  333. with_lock (CommandLock_) {
  334. Commands_.push_back(TCommand(fd, what, data));
  335. }
  336. Signal();
  337. }
  338. inline void Remove(SOCKET fd) noexcept {
  339. with_lock (CommandLock_) {
  340. Commands_.push_back(TCommand(fd, 0));
  341. }
  342. Signal();
  343. }
  344. inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
  345. auto guard = Guard(Lock_);
  346. do {
  347. if (Begin_ != End_) {
  348. const size_t ret = Min<size_t>(End_ - Begin_, len);
  349. memcpy(events, Begin_, sizeof(*events) * ret);
  350. Begin_ += ret;
  351. return ret;
  352. }
  353. if (len >= EventNumberHint()) {
  354. return WaitBase(events, len, timeout);
  355. }
  356. Begin_ = SavedEvents();
  357. End_ = Begin_ + WaitBase(Begin_, EventNumberHint(), timeout);
  358. } while (Begin_ != End_);
  359. return 0;
  360. }
  361. inline TEvent* SavedEvents() {
  362. if (!SavedEvents_) {
  363. SavedEvents_.Reset(new TEvent[EventNumberHint()]);
  364. }
  365. return SavedEvents_.Get();
  366. }
  367. inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept {
  368. with_lock (CommandLock_) {
  369. for (auto command = Commands_.begin(); command != Commands_.end(); ++command) {
  370. if (command->Filter_ != 0) {
  371. Fds_.Set(command->Fd_, command->Cookie_, command->Filter_);
  372. } else {
  373. Fds_.Remove(command->Fd_);
  374. }
  375. }
  376. Commands_.clear();
  377. }
  378. TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET));
  379. fd_set* in = (fd_set*)tmpBuf.Data();
  380. fd_set* out = &in[1];
  381. fd_set* errFds = &in[2];
  382. SOCKET* keysToDeleteBegin = (SOCKET*)&in[3];
  383. SOCKET* keysToDeleteEnd = keysToDeleteBegin;
  384. #if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347
  385. memset(in, 0, sizeof(*in));
  386. memset(out, 0, sizeof(*out));
  387. memset(errFds, 0, sizeof(*errFds));
  388. #endif
  389. FD_ZERO(in);
  390. FD_ZERO(out);
  391. FD_ZERO(errFds);
  392. FD_SET(WaitSock(), in);
  393. const SOCKET maxFdNum = Max(Fds_.Build(in, out, errFds), WaitSock());
  394. struct timeval tout;
  395. tout.tv_sec = timeout / 1000000;
  396. tout.tv_usec = timeout % 1000000;
  397. int ret = ContSelect(int(maxFdNum + 1), in, out, errFds, &tout);
  398. if (ret > 0 && FD_ISSET(WaitSock(), in)) {
  399. --ret;
  400. TryWait();
  401. }
  402. Y_VERIFY(ret >= 0 && (size_t)ret <= len, "select error: %s", LastSystemErrorText());
  403. TEvent* eventsStart = events;
  404. for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) {
  405. const SOCKET fd = it->first;
  406. THandle& handle = it->second;
  407. if (FD_ISSET(fd, errFds)) {
  408. (events++)->Error(handle.Data(), EIO);
  409. if (handle.Filter() & CONT_POLL_ONE_SHOT) {
  410. *keysToDeleteEnd = fd;
  411. ++keysToDeleteEnd;
  412. }
  413. } else {
  414. int what = 0;
  415. if (FD_ISSET(fd, in)) {
  416. what |= CONT_POLL_READ;
  417. }
  418. if (FD_ISSET(fd, out)) {
  419. what |= CONT_POLL_WRITE;
  420. }
  421. if (what) {
  422. (events++)->Success(handle.Data(), what);
  423. if (handle.Filter() & CONT_POLL_ONE_SHOT) {
  424. *keysToDeleteEnd = fd;
  425. ++keysToDeleteEnd;
  426. }
  427. if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) {
  428. // Emulate edge-triggered for level-triggered select().
  429. // User must restart waiting this event when needed.
  430. handle.Clear(what);
  431. }
  432. }
  433. }
  434. }
  435. while (keysToDeleteBegin != keysToDeleteEnd) {
  436. Fds_.erase(*keysToDeleteBegin);
  437. ++keysToDeleteBegin;
  438. }
  439. return events - eventsStart;
  440. }
  441. inline size_t EventNumberHint() const noexcept {
  442. return sizeof(fd_set) * 8 * 2;
  443. }
  444. static inline void* ExtractEvent(const TEvent* event) noexcept {
  445. return event->Data();
  446. }
  447. static inline int ExtractStatus(const TEvent* event) noexcept {
  448. return event->Status();
  449. }
  450. static inline int ExtractFilterImpl(const TEvent* event) noexcept {
  451. return event->Filter();
  452. }
  453. private:
  454. inline void Signal() noexcept {
  455. char ch = 13;
  456. send(SigSock(), &ch, 1, 0);
  457. }
  458. inline void TryWait() {
  459. char ch[32];
  460. while (recv(WaitSock(), ch, sizeof(ch), 0) > 0) {
  461. Y_ASSERT(ch[0] == 13);
  462. }
  463. }
  464. inline SOCKET WaitSock() const noexcept {
  465. return Signal_[1];
  466. }
  467. inline SOCKET SigSock() const noexcept {
  468. return Signal_[0];
  469. }
  470. private:
  471. struct TCommand {
  472. SOCKET Fd_;
  473. int Filter_; // 0 to remove
  474. void* Cookie_;
  475. TCommand(SOCKET fd, int filter, void* cookie)
  476. : Fd_(fd)
  477. , Filter_(filter)
  478. , Cookie_(cookie)
  479. {
  480. }
  481. TCommand(SOCKET fd, int filter)
  482. : Fd_(fd)
  483. , Filter_(filter)
  484. , Cookie_(nullptr)
  485. {
  486. }
  487. };
  488. TFds Fds_;
  489. TMyMutex Lock_;
  490. TArrayHolder<TEvent> SavedEvents_;
  491. TEvent* Begin_;
  492. TEvent* End_;
  493. TMyMutex CommandLock_;
  494. TVector<TCommand> Commands_;
  495. SOCKET Signal_[2];
  496. };
  497. #endif
  498. static inline TDuration PollStep(const TInstant& deadLine, const TInstant& now) noexcept {
  499. if (deadLine < now) {
  500. return TDuration::Zero();
  501. }
  502. return Min(deadLine - now, TDuration::Seconds(1000));
  503. }
  504. template <class TBase>
  505. class TGenericPoller: public TBase {
  506. public:
  507. using TBase::TBase;
  508. using TEvent = typename TBase::TEvent;
  509. inline void Set(void* data, SOCKET fd, int what) {
  510. if (what) {
  511. this->SetImpl(data, fd, what);
  512. } else {
  513. this->Remove(fd);
  514. }
  515. }
  516. static inline int ExtractFilter(const TEvent* event) noexcept {
  517. if (TBase::ExtractStatus(event)) {
  518. return CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP;
  519. }
  520. return TBase::ExtractFilterImpl(event);
  521. }
  522. inline size_t WaitD(TEvent* events, size_t len, TInstant deadLine, TInstant now = TInstant::Now()) noexcept {
  523. if (!len) {
  524. return 0;
  525. }
  526. size_t ret;
  527. do {
  528. ret = this->Wait(events, len, (int)PollStep(deadLine, now).MicroSeconds());
  529. } while (!ret && ((now = TInstant::Now()) < deadLine));
  530. return ret;
  531. }
  532. };
  533. #if defined(HAVE_KQUEUE_POLLER)
  534. #define TPollerImplBase TKqueuePoller
  535. #elif defined(HAVE_EPOLL_POLLER)
  536. #define TPollerImplBase TEpollPoller
  537. #elif defined(HAVE_SELECT_POLLER)
  538. #define TPollerImplBase TSelectPoller
  539. #else
  540. #error "unsupported platform"
  541. #endif
  542. template <class TLockPolicy>
  543. using TPollerImpl = TGenericPoller<TPollerImplBase<TLockPolicy>>;
  544. #undef TPollerImplBase