notification_handle.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. #include "notification_handle.h"
  2. #include <library/cpp/yt/exception/exception.h>
  3. #include <library/cpp/yt/system/handle_eintr.h>
  4. #include <library/cpp/yt/assert/assert.h>
  5. #ifdef _linux_
  6. #include <unistd.h>
  7. #include <sys/eventfd.h>
  8. #endif
  9. #ifdef _darwin_
  10. #include <fcntl.h>
  11. #include <unistd.h>
  12. #endif
  13. #ifdef _win_
  14. #include <util/network/socket.h>
  15. #endif
  16. namespace NYT::NThreading {
  17. ////////////////////////////////////////////////////////////////////////////////
  18. TNotificationHandle::TNotificationHandle(bool blocking)
  19. {
  20. #ifdef _linux_
  21. EventFD_ = HandleEintr(
  22. eventfd,
  23. 0,
  24. EFD_CLOEXEC | (blocking ? 0 : EFD_NONBLOCK));
  25. if (EventFD_ < 0) {
  26. throw TSimpleException("Error creating notification handle");
  27. }
  28. #elif defined(_win_)
  29. TPipeHandle::Pipe(Reader_, Writer_, EOpenModeFlag::CloseOnExec);
  30. if (!blocking) {
  31. SetNonBlock(Reader_);
  32. }
  33. #else
  34. #ifdef _darwin_
  35. YT_VERIFY(HandleEintr(pipe, PipeFDs_) == 0);
  36. #else
  37. YT_VERIFY(HandleEintr(pipe2, PipeFDs_, O_CLOEXEC) == 0);
  38. #endif
  39. if (!blocking) {
  40. YT_VERIFY(fcntl(PipeFDs_[0], F_SETFL, O_NONBLOCK) == 0);
  41. }
  42. #endif
  43. }
  44. TNotificationHandle::~TNotificationHandle()
  45. {
  46. #ifdef _linux_
  47. YT_VERIFY(HandleEintr(close, EventFD_) == 0);
  48. #elif !defined(_win_)
  49. YT_VERIFY(HandleEintr(close, PipeFDs_[0]) == 0);
  50. YT_VERIFY(HandleEintr(close, PipeFDs_[1]) == 0);
  51. #endif
  52. }
  53. void TNotificationHandle::Raise()
  54. {
  55. #ifdef _linux_
  56. uint64_t one = 1;
  57. YT_VERIFY(HandleEintr(write, EventFD_, &one, sizeof(one)) == sizeof(one));
  58. #elif defined(_win_)
  59. char c = 'x';
  60. YT_VERIFY(Writer_.Write(&c, sizeof(char)) == sizeof(char));
  61. #else
  62. char c = 'x';
  63. YT_VERIFY(HandleEintr(write, PipeFDs_[1], &c, sizeof(char)) == sizeof(char));
  64. #endif
  65. }
  66. void TNotificationHandle::Clear()
  67. {
  68. #ifdef _linux_
  69. uint64_t count = 0;
  70. auto ret = HandleEintr(read, EventFD_, &count, sizeof(count));
  71. // For edge-triggered one could clear multiple events, others get nothing.
  72. YT_VERIFY(ret == sizeof(count) || (ret < 0 && errno == EAGAIN));
  73. #elif defined(_win_)
  74. while (true) {
  75. char c;
  76. auto ret = Reader_.Read(&c, sizeof(c));
  77. YT_VERIFY(ret == sizeof(c) || (ret == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK));
  78. if (ret == SOCKET_ERROR) {
  79. break;
  80. }
  81. }
  82. #else
  83. while (true) {
  84. char c;
  85. auto ret = HandleEintr(read, PipeFDs_[0], &c, sizeof(c));
  86. YT_VERIFY(ret == sizeof(c) || (ret < 0 && errno == EAGAIN));
  87. if (ret < 0) {
  88. break;
  89. }
  90. }
  91. #endif
  92. }
  93. int TNotificationHandle::GetFD() const
  94. {
  95. #ifdef _linux_
  96. return EventFD_;
  97. #elif defined(_win_)
  98. return Reader_;
  99. #else
  100. return PipeFDs_[0];
  101. #endif
  102. }
  103. ////////////////////////////////////////////////////////////////////////////////
  104. } // namespace NYT::NThreading