#pragma once #include "socket.h" #include #include #include #include #include #include #include #include #if defined(_freebsd_) || defined(_darwin_) #define HAVE_KQUEUE_POLLER #endif #if (defined(_linux_) && !defined(_bionic_)) || (__ANDROID_API__ >= 21) #define HAVE_EPOLL_POLLER #endif // now we always have it #define HAVE_SELECT_POLLER #if defined(HAVE_KQUEUE_POLLER) #include #endif #if defined(HAVE_EPOLL_POLLER) #include #endif enum EContPoll { CONT_POLL_READ = 1, CONT_POLL_WRITE = 2, CONT_POLL_RDHUP = 4, CONT_POLL_ONE_SHOT = 8, // Disable after first event CONT_POLL_MODIFY = 16, // Modify already added event CONT_POLL_EDGE_TRIGGERED = 32, // Notify only about new events CONT_POLL_BACKLOG_EMPTY = 64, // Backlog is empty (seen end of request, EAGAIN or truncated read) }; static inline bool IsSocket(SOCKET fd) noexcept { int val = 0; socklen_t len = sizeof(val); if (getsockopt(fd, SOL_SOCKET, SO_TYPE, (char*)&val, &len) == 0) { return true; } return LastSystemError() != ENOTSOCK; } static inline int MicroToMilli(int timeout) noexcept { if (timeout) { /* * 1. API of epoll syscall allows to specify timeout with millisecond * accuracy only * 2. It is quite complicated to guarantee time resolution of blocking * syscall less than kernel 1/HZ * * Without this rounding we just waste cpu time and do a lot of * fast epoll_wait(..., 0) syscalls. */ return Max(timeout / 1000, 1); } return 0; } struct TWithoutLocking { using TMyMutex = TFakeMutex; }; #if defined(HAVE_KQUEUE_POLLER) static inline int Kevent(int kq, struct kevent* changelist, int nchanges, struct kevent* eventlist, int nevents, const struct timespec* timeout) noexcept { int ret; do { ret = kevent(kq, changelist, nchanges, eventlist, nevents, timeout); } while (ret == -1 && errno == EINTR); return ret; } template class TKqueuePoller { public: typedef struct ::kevent TEvent; inline TKqueuePoller() : Fd_(kqueue()) { if (Fd_ == -1) { ythrow TSystemError() << "kqueue failed"; } } inline ~TKqueuePoller() { close(Fd_); } inline int Fd() const noexcept { return Fd_; } inline void SetImpl(void* data, int fd, int what) { TEvent e[2]; int flags = EV_ADD; if (what & CONT_POLL_EDGE_TRIGGERED) { if (what & CONT_POLL_BACKLOG_EMPTY) { // When backlog is empty, edge-triggered does not need restart. return; } flags |= EV_CLEAR; } if (what & CONT_POLL_ONE_SHOT) { flags |= EV_ONESHOT; } Zero(e); EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data); EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data); if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) { ythrow TSystemError() << "kevent add failed"; } } inline void Remove(int fd) noexcept { TEvent e[2]; Zero(e); EV_SET(e + 0, fd, EVFILT_READ, EV_DELETE, 0, 0, 0); EV_SET(e + 1, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0); Y_ABORT_UNLESS(!(Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1 && errno != ENOENT), "kevent remove failed: %s", LastSystemErrorText()); } inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept { struct timespec ts; ts.tv_sec = timeout / 1000000; ts.tv_nsec = (timeout % 1000000) * 1000; const int ret = Kevent(Fd_, nullptr, 0, events, len, &ts); Y_ABORT_UNLESS(ret >= 0, "kevent failed: %s", LastSystemErrorText()); return (size_t)ret; } static inline void* ExtractEvent(const TEvent* event) noexcept { return event->udata; } static inline int ExtractStatus(const TEvent* event) noexcept { if (event->flags & EV_ERROR) { return EIO; } return event->fflags; } static inline int ExtractFilterImpl(const TEvent* event) noexcept { if (event->filter == EVFILT_READ) { return CONT_POLL_READ; } if (event->filter == EVFILT_WRITE) { return CONT_POLL_WRITE; } if (event->flags & EV_EOF) { return CONT_POLL_READ | CONT_POLL_WRITE; } return 0; } private: int Fd_; }; #endif #if defined(HAVE_EPOLL_POLLER) static inline int ContEpollWait(int epfd, struct epoll_event* events, int maxevents, int timeout) noexcept { int ret; do { ret = epoll_wait(epfd, events, maxevents, Min(timeout, 35 * 60 * 1000)); } while (ret == -1 && errno == EINTR); return ret; } template class TEpollPoller { public: typedef struct ::epoll_event TEvent; inline TEpollPoller(bool closeOnExec = false) : Fd_(epoll_create1(closeOnExec ? EPOLL_CLOEXEC : 0)) { if (Fd_ == -1) { ythrow TSystemError() << "epoll_create failed"; } } inline ~TEpollPoller() { close(Fd_); } inline int Fd() const noexcept { return Fd_; } inline void SetImpl(void* data, int fd, int what) { TEvent e; Zero(e); if (what & CONT_POLL_EDGE_TRIGGERED) { if (what & CONT_POLL_BACKLOG_EMPTY) { // When backlog is empty, edge-triggered does not need restart. return; } e.events |= EPOLLET; } if (what & CONT_POLL_ONE_SHOT) { e.events |= EPOLLONESHOT; } if (what & CONT_POLL_READ) { e.events |= EPOLLIN; } if (what & CONT_POLL_WRITE) { e.events |= EPOLLOUT; } if (what & CONT_POLL_RDHUP) { e.events |= EPOLLRDHUP; } e.data.ptr = data; if (what & CONT_POLL_MODIFY) { if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) { ythrow TSystemError() << "epoll modify failed (fd=" << fd << ", what=" << what << ")"; } } else if (epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) { if (LastSystemError() != EEXIST) { ythrow TSystemError() << "epoll add failed (fd=" << fd << ", what=" << what << ")"; } if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) { ythrow TSystemError() << "epoll modify failed (fd=" << fd << ", what=" << what << ")"; } } } inline void Remove(int fd) noexcept { TEvent e; Zero(e); epoll_ctl(Fd_, EPOLL_CTL_DEL, fd, &e); } inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept { const int ret = ContEpollWait(Fd_, events, len, MicroToMilli(timeout)); Y_ABORT_UNLESS(ret >= 0, "epoll wait error: %s", LastSystemErrorText()); return (size_t)ret; } static inline void* ExtractEvent(const TEvent* event) noexcept { return event->data.ptr; } static inline int ExtractStatus(const TEvent* event) noexcept { if (event->events & (EPOLLERR | EPOLLHUP)) { return EIO; } return 0; } static inline int ExtractFilterImpl(const TEvent* event) noexcept { int ret = 0; if (event->events & EPOLLIN) { ret |= CONT_POLL_READ; } if (event->events & EPOLLOUT) { ret |= CONT_POLL_WRITE; } if (event->events & EPOLLRDHUP) { ret |= CONT_POLL_RDHUP; } return ret; } private: int Fd_; }; #endif #if defined(HAVE_SELECT_POLLER) #include #include #include "pair.h" static inline int ContSelect(int n, fd_set* r, fd_set* w, fd_set* e, struct timeval* t) noexcept { int ret; do { ret = select(n, r, w, e, t); } while (ret == -1 && errno == EINTR); return ret; } struct TSelectPollerNoTemplate { struct THandle { void* Data_; int Filter_; inline THandle() : Data_(nullptr) , Filter_(0) { } inline void* Data() const noexcept { return Data_; } inline void Set(void* d, int s) noexcept { Data_ = d; Filter_ = s; } inline void Clear(int c) noexcept { Filter_ &= ~c; } inline int Filter() const noexcept { return Filter_; } }; class TFds: public THashMap { public: inline void Set(SOCKET fd, void* data, int filter) { (*this)[fd].Set(data, filter); } inline void Remove(SOCKET fd) { erase(fd); } inline SOCKET Build(fd_set* r, fd_set* w, fd_set* e) const noexcept { SOCKET ret = 0; for (const auto& it : *this) { const SOCKET fd = it.first; const THandle& handle = it.second; FD_SET(fd, e); if (handle.Filter() & CONT_POLL_READ) { FD_SET(fd, r); } if (handle.Filter() & CONT_POLL_WRITE) { FD_SET(fd, w); } if (fd > ret) { ret = fd; } } return ret; } }; struct TEvent: public THandle { inline int Status() const noexcept { return -Min(Filter(), 0); } inline void Error(void* d, int err) noexcept { Set(d, -err); } inline void Success(void* d, int what) noexcept { Set(d, what); } }; }; template class TSelectPoller: public TSelectPollerNoTemplate { using TMyMutex = typename TLockPolicy::TMyMutex; public: inline TSelectPoller() : Begin_(nullptr) , End_(nullptr) { SocketPair(Signal_); SetNonBlock(WaitSock()); SetNonBlock(SigSock()); } inline ~TSelectPoller() { closesocket(Signal_[0]); closesocket(Signal_[1]); } inline void SetImpl(void* data, SOCKET fd, int what) { with_lock (CommandLock_) { Commands_.push_back(TCommand(fd, what, data)); } Signal(); } inline void Remove(SOCKET fd) noexcept { with_lock (CommandLock_) { Commands_.push_back(TCommand(fd, 0)); } Signal(); } inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept { auto guard = Guard(Lock_); do { if (Begin_ != End_) { const size_t ret = Min(End_ - Begin_, len); memcpy(events, Begin_, sizeof(*events) * ret); Begin_ += ret; return ret; } if (len >= EventNumberHint()) { return WaitBase(events, len, timeout); } Begin_ = SavedEvents(); End_ = Begin_ + WaitBase(Begin_, EventNumberHint(), timeout); } while (Begin_ != End_); return 0; } inline TEvent* SavedEvents() { if (!SavedEvents_) { SavedEvents_.Reset(new TEvent[EventNumberHint()]); } return SavedEvents_.Get(); } inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept { with_lock (CommandLock_) { for (auto command = Commands_.begin(); command != Commands_.end(); ++command) { if (command->Filter_ != 0) { Fds_.Set(command->Fd_, command->Cookie_, command->Filter_); } else { Fds_.Remove(command->Fd_); } } Commands_.clear(); } TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET)); fd_set* in = (fd_set*)tmpBuf.Data(); fd_set* out = &in[1]; fd_set* errFds = &in[2]; SOCKET* keysToDeleteBegin = (SOCKET*)&in[3]; SOCKET* keysToDeleteEnd = keysToDeleteBegin; #if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347 memset(in, 0, sizeof(*in)); memset(out, 0, sizeof(*out)); memset(errFds, 0, sizeof(*errFds)); #endif FD_ZERO(in); FD_ZERO(out); FD_ZERO(errFds); FD_SET(WaitSock(), in); const SOCKET maxFdNum = Max(Fds_.Build(in, out, errFds), WaitSock()); struct timeval tout; tout.tv_sec = timeout / 1000000; tout.tv_usec = timeout % 1000000; int ret = ContSelect(int(maxFdNum + 1), in, out, errFds, &tout); if (ret > 0 && FD_ISSET(WaitSock(), in)) { --ret; TryWait(); } Y_ABORT_UNLESS(ret >= 0 && (size_t)ret <= len, "select error: %s", LastSystemErrorText()); TEvent* eventsStart = events; for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) { const SOCKET fd = it->first; THandle& handle = it->second; if (FD_ISSET(fd, errFds)) { (events++)->Error(handle.Data(), EIO); if (handle.Filter() & CONT_POLL_ONE_SHOT) { *keysToDeleteEnd = fd; ++keysToDeleteEnd; } } else { int what = 0; if (FD_ISSET(fd, in)) { what |= CONT_POLL_READ; } if (FD_ISSET(fd, out)) { what |= CONT_POLL_WRITE; } if (what) { (events++)->Success(handle.Data(), what); if (handle.Filter() & CONT_POLL_ONE_SHOT) { *keysToDeleteEnd = fd; ++keysToDeleteEnd; } if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) { // Emulate edge-triggered for level-triggered select(). // User must restart waiting this event when needed. handle.Clear(what); } } } } while (keysToDeleteBegin != keysToDeleteEnd) { Fds_.erase(*keysToDeleteBegin); ++keysToDeleteBegin; } return events - eventsStart; } inline size_t EventNumberHint() const noexcept { return sizeof(fd_set) * 8 * 2; } static inline void* ExtractEvent(const TEvent* event) noexcept { return event->Data(); } static inline int ExtractStatus(const TEvent* event) noexcept { return event->Status(); } static inline int ExtractFilterImpl(const TEvent* event) noexcept { return event->Filter(); } private: inline void Signal() noexcept { char ch = 13; send(SigSock(), &ch, 1, 0); } inline void TryWait() { char ch[32]; while (recv(WaitSock(), ch, sizeof(ch), 0) > 0) { Y_ASSERT(ch[0] == 13); } } inline SOCKET WaitSock() const noexcept { return Signal_[1]; } inline SOCKET SigSock() const noexcept { return Signal_[0]; } private: struct TCommand { SOCKET Fd_; int Filter_; // 0 to remove void* Cookie_; TCommand(SOCKET fd, int filter, void* cookie) : Fd_(fd) , Filter_(filter) , Cookie_(cookie) { } TCommand(SOCKET fd, int filter) : Fd_(fd) , Filter_(filter) , Cookie_(nullptr) { } }; TFds Fds_; TMyMutex Lock_; TArrayHolder SavedEvents_; TEvent* Begin_; TEvent* End_; TMyMutex CommandLock_; TVector Commands_; SOCKET Signal_[2]; }; #endif static inline TDuration PollStep(const TInstant& deadLine, const TInstant& now) noexcept { if (deadLine < now) { return TDuration::Zero(); } return Min(deadLine - now, TDuration::Seconds(1000)); } template class TGenericPoller: public TBase { public: using TBase::TBase; using TEvent = typename TBase::TEvent; inline void Set(void* data, SOCKET fd, int what) { if (what) { this->SetImpl(data, fd, what); } else { this->Remove(fd); } } static inline int ExtractFilter(const TEvent* event) noexcept { if (TBase::ExtractStatus(event)) { return CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP; } return TBase::ExtractFilterImpl(event); } inline size_t WaitD(TEvent* events, size_t len, TInstant deadLine, TInstant now = TInstant::Now()) noexcept { if (!len) { return 0; } size_t ret; do { ret = this->Wait(events, len, (int)PollStep(deadLine, now).MicroSeconds()); } while (!ret && ((now = TInstant::Now()) < deadLine)); return ret; } }; #if defined(HAVE_KQUEUE_POLLER) #define TPollerImplBase TKqueuePoller #elif defined(HAVE_EPOLL_POLLER) #define TPollerImplBase TEpollPoller #elif defined(HAVE_SELECT_POLLER) #define TPollerImplBase TSelectPoller #else #error "unsupported platform" #endif template using TPollerImpl = TGenericPoller>; #undef TPollerImplBase