notification_handle.cpp 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #include "notification_handle.h"
  2. #include <library/cpp/yt/system/handle_eintr.h>
  3. #include <library/cpp/yt/assert/assert.h>
  4. #ifdef _linux_
  5. #include <unistd.h>
  6. #include <sys/eventfd.h>
  7. #endif
  8. #ifdef _darwin_
  9. #include <fcntl.h>
  10. #include <unistd.h>
  11. #endif
  12. #ifdef _win_
  13. #include <util/network/socket.h>
  14. #endif
  15. namespace NYT::NThreading {
  16. ////////////////////////////////////////////////////////////////////////////////
  17. TNotificationHandle::TNotificationHandle(bool blocking)
  18. {
  19. #ifdef _linux_
  20. EventFD_ = HandleEintr(
  21. eventfd,
  22. 0,
  23. EFD_CLOEXEC | (blocking ? 0 : EFD_NONBLOCK));
  24. YT_VERIFY(EventFD_ >= 0);
  25. #elif defined(_win_)
  26. TPipeHandle::Pipe(Reader_, Writer_, EOpenModeFlag::CloseOnExec);
  27. if (!blocking) {
  28. SetNonBlock(Reader_);
  29. }
  30. #else
  31. #ifdef _darwin_
  32. YT_VERIFY(HandleEintr(pipe, PipeFDs_) == 0);
  33. #else
  34. YT_VERIFY(HandleEintr(pipe2, PipeFDs_, O_CLOEXEC) == 0);
  35. #endif
  36. if (!blocking) {
  37. YT_VERIFY(fcntl(PipeFDs_[0], F_SETFL, O_NONBLOCK) == 0);
  38. }
  39. #endif
  40. }
  41. TNotificationHandle::~TNotificationHandle()
  42. {
  43. #ifdef _linux_
  44. YT_VERIFY(HandleEintr(close, EventFD_) == 0);
  45. #elif !defined(_win_)
  46. YT_VERIFY(HandleEintr(close, PipeFDs_[0]) == 0);
  47. YT_VERIFY(HandleEintr(close, PipeFDs_[1]) == 0);
  48. #endif
  49. }
  50. void TNotificationHandle::Raise()
  51. {
  52. #ifdef _linux_
  53. uint64_t one = 1;
  54. YT_VERIFY(HandleEintr(write, EventFD_, &one, sizeof(one)) == sizeof(one));
  55. #elif defined(_win_)
  56. char c = 'x';
  57. YT_VERIFY(Writer_.Write(&c, sizeof(char)) == sizeof(char));
  58. #else
  59. char c = 'x';
  60. YT_VERIFY(HandleEintr(write, PipeFDs_[1], &c, sizeof(char)) == sizeof(char));
  61. #endif
  62. }
  63. void TNotificationHandle::Clear()
  64. {
  65. #ifdef _linux_
  66. uint64_t count = 0;
  67. auto ret = HandleEintr(read, EventFD_, &count, sizeof(count));
  68. // For edge-triggered one could clear multiple events, others get nothing.
  69. YT_VERIFY(ret == sizeof(count) || (ret < 0 && errno == EAGAIN));
  70. #elif defined(_win_)
  71. while (true) {
  72. char c;
  73. auto ret = Reader_.Read(&c, sizeof(c));
  74. YT_VERIFY(ret == sizeof(c) || (ret == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK));
  75. if (ret == SOCKET_ERROR) {
  76. break;
  77. }
  78. }
  79. #else
  80. while (true) {
  81. char c;
  82. auto ret = HandleEintr(read, PipeFDs_[0], &c, sizeof(c));
  83. YT_VERIFY(ret == sizeof(c) || (ret < 0 && errno == EAGAIN));
  84. if (ret < 0) {
  85. break;
  86. }
  87. }
  88. #endif
  89. }
  90. int TNotificationHandle::GetFD() const
  91. {
  92. #ifdef _linux_
  93. return EventFD_;
  94. #elif defined(_win_)
  95. return Reader_;
  96. #else
  97. return PipeFDs_[0];
  98. #endif
  99. }
  100. ////////////////////////////////////////////////////////////////////////////////
  101. } // namespace NYT::NThreading