event_count-inl.h 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. #ifndef EVENT_COUNT_INL_H_
  2. #error "Direct inclusion of this file is not allowed, include event_count.h"
  3. // For the sake of sane code completion.
  4. #include "event_count.h"
  5. #endif
  6. #undef EVENT_COUNT_INL_H_
  7. #include <library/cpp/yt/assert/assert.h>
  8. #include "futex.h"
  9. #include <errno.h>
  10. namespace NYT::NThreading {
  11. ////////////////////////////////////////////////////////////////////////////////
  12. inline void TEventCount::NotifyOne()
  13. {
  14. NotifyMany(1);
  15. }
  16. inline void TEventCount::NotifyAll()
  17. {
  18. NotifyMany(std::numeric_limits<int>::max());
  19. }
  20. inline void TEventCount::NotifyMany(int count)
  21. {
  22. // The order is important: Epoch is incremented before Waiters is checked.
  23. // prepareWait() increments Waiters before checking Epoch, so it is
  24. // impossible to miss a wakeup.
  25. #ifndef _linux_
  26. TGuard<TMutex> guard(Mutex_);
  27. #endif
  28. ui64 prev = Value_.fetch_add(AddEpoch, std::memory_order::acq_rel);
  29. if (Y_UNLIKELY((prev & WaiterMask) != 0)) {
  30. #ifdef _linux_
  31. FutexWake(
  32. reinterpret_cast<int*>(&Value_) + 1, // assume little-endian architecture
  33. count);
  34. #else
  35. if (count == 1) {
  36. ConditionVariable_.Signal();
  37. } else {
  38. ConditionVariable_.BroadCast();
  39. }
  40. #endif
  41. }
  42. }
  43. inline TEventCount::TCookie TEventCount::PrepareWait()
  44. {
  45. ui64 value = Value_.load(std::memory_order::acquire);
  46. return TCookie(static_cast<ui32>(value >> EpochShift));
  47. }
  48. inline void TEventCount::CancelWait()
  49. { }
  50. inline bool TEventCount::Wait(TCookie cookie, TInstant deadline)
  51. {
  52. Value_.fetch_add(AddWaiter, std::memory_order::acq_rel);
  53. bool result = true;
  54. #ifdef _linux_
  55. while ((Value_.load(std::memory_order::acquire) >> EpochShift) == cookie.Epoch_) {
  56. auto timeout = deadline - TInstant::Now();
  57. auto futexResult = FutexWait(
  58. reinterpret_cast<int*>(&Value_) + 1, // assume little-endian architecture
  59. cookie.Epoch_,
  60. timeout);
  61. if (futexResult != 0 && errno == ETIMEDOUT) {
  62. result = false;
  63. break;
  64. }
  65. }
  66. #else
  67. TGuard<TMutex> guard(Mutex_);
  68. if ((Value_.load(std::memory_order::acquire) >> EpochShift) == cookie.Epoch_) {
  69. result = ConditionVariable_.WaitD(Mutex_, deadline);
  70. }
  71. #endif
  72. ui64 prev = Value_.fetch_add(SubWaiter, std::memory_order::seq_cst);
  73. YT_ASSERT((prev & WaiterMask) != 0);
  74. return result;
  75. }
  76. inline bool TEventCount::Wait(TCookie cookie, TDuration timeout)
  77. {
  78. return Wait(cookie, timeout.ToDeadLine());
  79. }
  80. template <class TCondition>
  81. bool TEventCount::Await(TCondition&& condition, TInstant deadline)
  82. {
  83. if (condition()) {
  84. // Fast path.
  85. return true;
  86. }
  87. // condition() is the only thing that may throw, everything else is
  88. // noexcept, so we can hoist the try/catch block outside of the loop
  89. try {
  90. for (;;) {
  91. auto cookie = PrepareWait();
  92. if (condition()) {
  93. CancelWait();
  94. break;
  95. }
  96. if (!Wait(cookie, deadline)) {
  97. return false;
  98. }
  99. }
  100. } catch (...) {
  101. CancelWait();
  102. throw;
  103. }
  104. return true;
  105. }
  106. template <class TCondition>
  107. bool TEventCount::Await(TCondition&& condition, TDuration timeout)
  108. {
  109. return Await(std::forward<TCondition>(condition), timeout.ToDeadLine());
  110. }
  111. ////////////////////////////////////////////////////////////////////////////////
  112. inline void TEvent::NotifyOne()
  113. {
  114. Set_.store(true, std::memory_order::release);
  115. EventCount_.NotifyOne();
  116. }
  117. inline void TEvent::NotifyAll()
  118. {
  119. Set_.store(true, std::memory_order::release);
  120. EventCount_.NotifyAll();
  121. }
  122. inline bool TEvent::Test() const
  123. {
  124. return Set_.load(std::memory_order::acquire);
  125. }
  126. inline bool TEvent::Wait(TInstant deadline)
  127. {
  128. return EventCount_.Await(
  129. [&] {
  130. return Set_.load(std::memory_order::acquire);
  131. },
  132. deadline);
  133. }
  134. inline bool TEvent::Wait(TDuration timeout)
  135. {
  136. return Wait(timeout.ToDeadLine());
  137. }
  138. ////////////////////////////////////////////////////////////////////////////////
  139. } // namespace NYT::NThreading