poller_actor_darwin.h 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #pragma once
  2. #include <sys/event.h>
  3. namespace NActors {
  4. class TKqueueThread : public TPollerThreadBase<TKqueueThread> {
  5. // KQueue file descriptor
  6. int KqDescriptor;
  7. void SafeKevent(const struct kevent* ev, int size) {
  8. int rc;
  9. do {
  10. rc = kevent(KqDescriptor, ev, size, nullptr, 0, nullptr);
  11. } while (rc == -1 && errno == EINTR);
  12. Y_VERIFY(rc != -1, "kevent() failed with %s", strerror(errno));
  13. }
  14. public:
  15. TKqueueThread(TActorSystem *actorSystem)
  16. : TPollerThreadBase(actorSystem)
  17. {
  18. // create kqueue
  19. KqDescriptor = kqueue();
  20. Y_VERIFY(KqDescriptor != -1, "kqueue() failed with %s", strerror(errno));
  21. // set close-on-exit flag
  22. {
  23. int flags = fcntl(KqDescriptor, F_GETFD);
  24. Y_VERIFY(flags >= 0, "fcntl(F_GETFD) failed with %s", strerror(errno));
  25. int rc = fcntl(KqDescriptor, F_SETFD, flags | FD_CLOEXEC);
  26. Y_VERIFY(rc != -1, "fcntl(F_SETFD, +FD_CLOEXEC) failed with %s", strerror(errno));
  27. }
  28. // register pipe's read end in poller
  29. struct kevent ev;
  30. EV_SET(&ev, (int)ReadEnd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, nullptr);
  31. SafeKevent(&ev, 1);
  32. ISimpleThread::Start(); // start poller thread
  33. }
  34. ~TKqueueThread() {
  35. Stop();
  36. close(KqDescriptor);
  37. }
  38. void ProcessEventsInLoop() {
  39. std::array<struct kevent, 256> events;
  40. int numReady = kevent(KqDescriptor, nullptr, 0, events.data(), events.size(), nullptr);
  41. if (numReady == -1) {
  42. if (errno == EINTR) {
  43. return;
  44. } else {
  45. Y_FAIL("kevent() failed with %s", strerror(errno));
  46. }
  47. }
  48. for (int i = 0; i < numReady; ++i) {
  49. const struct kevent& ev = events[i];
  50. if (ev.udata) {
  51. TSocketRecord *it = static_cast<TSocketRecord*>(ev.udata);
  52. const bool error = ev.flags & (EV_EOF | EV_ERROR);
  53. const bool read = error || ev.filter == EVFILT_READ;
  54. const bool write = error || ev.filter == EVFILT_WRITE;
  55. Notify(it, read, write);
  56. }
  57. }
  58. }
  59. void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
  60. struct kevent ev[2];
  61. const int fd = socket->GetDescriptor();
  62. EV_SET(&ev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
  63. EV_SET(&ev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
  64. SafeKevent(ev, 2);
  65. }
  66. void RegisterSocket(const TIntrusivePtr<TSocketRecord>& record) {
  67. int flags = EV_ADD | EV_CLEAR | EV_ENABLE;
  68. struct kevent ev[2];
  69. const int fd = record->Socket->GetDescriptor();
  70. EV_SET(&ev[0], fd, EVFILT_READ, flags, 0, 0, record.Get());
  71. EV_SET(&ev[1], fd, EVFILT_WRITE, flags, 0, 0, record.Get());
  72. SafeKevent(ev, 2);
  73. }
  74. void Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/)
  75. {} // no special processing here as we use kqueue in edge-triggered mode
  76. };
  77. using TPollerThread = TKqueueThread;
  78. }