event_count.h 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. #pragma once
  2. #ifndef _linux_
  3. #include <util/system/mutex.h>
  4. #include <util/system/condvar.h>
  5. #endif
  6. #include <limits>
  7. #include <atomic>
  8. namespace NYT::NThreading {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. //! Event count: a condition variable for lock-free algorithms.
  11. /*!
  12. * This is an adapted version from Facebook's Folly. See
  13. * https://raw.github.com/facebook/folly/master/folly/experimental/EventCount.h
  14. * http://www.1024cores.net/home/lock-free-algorithms/eventcounts
  15. * for details.
  16. *
  17. * Event counts allow you to convert a non-blocking lock-free / wait-free
  18. * algorithm into a blocking one, by isolating the blocking logic. You call
  19. * PrepareWait() before checking your condition and then either CancelWait()
  20. * or Wait() depending on whether the condition was true. When another
  21. * thread makes the condition true, it must call NotifyOne() / NotifyAll() just
  22. * like a regular condition variable.
  23. *
  24. * Let "<" denote the happens-before relationship.
  25. * Consider 2 threads (T1 and T2) and 3 events:
  26. * - E1: T1 returns from PrepareWait
  27. * - E2: T1 calls Wait (obviously E1 < E2, intra-thread)
  28. * - E3: T2 calls NotifyAll
  29. *
  30. * If E1 < E3, then E2's Wait() will complete (and T1 will either wake up,
  31. * or not block at all)
  32. *
  33. * This means that you can use an EventCount in the following manner:
  34. *
  35. * Waiter:
  36. * if (!condition()) { // Handle fast path first.
  37. * for (;;) {
  38. * auto cookie = ec.PrepareWait();
  39. * if (condition()) {
  40. * ec.CancelWait();
  41. * break;
  42. * } else {
  43. * ec.Wait(cookie);
  44. * }
  45. * }
  46. * }
  47. *
  48. * (This pattern is encapsulated in Await())
  49. *
  50. * Poster:
  51. * ... make condition true...
  52. * ec.NotifyAll();
  53. *
  54. * Note that, just like with regular condition variables, the waiter needs to
  55. * be tolerant of spurious wakeups and needs to recheck the condition after
  56. * being woken up. Also, as there is no mutual exclusion implied, "checking"
  57. * the condition likely means attempting an operation on an underlying
  58. * data structure (push into a lock-free queue, etc) and returning true on
  59. * success and false on failure.
  60. */
  61. class TEventCount final
  62. {
  63. public:
  64. TEventCount() = default;
  65. TEventCount(const TEventCount&) = delete;
  66. TEventCount(TEventCount&&) = delete;
  67. class TCookie
  68. {
  69. public:
  70. explicit TCookie(ui32 epoch)
  71. : Epoch_(epoch)
  72. { }
  73. private:
  74. friend class TEventCount;
  75. ui32 Epoch_;
  76. };
  77. void NotifyOne();
  78. void NotifyAll();
  79. void NotifyMany(int count);
  80. TCookie PrepareWait();
  81. void CancelWait();
  82. bool Wait(TCookie cookie, TInstant deadline = TInstant::Max());
  83. bool Wait(TCookie cookie, TDuration timeout);
  84. //! Wait for |condition()| to become |true|.
  85. //! Will clean up appropriately if |condition()| throws, and then rethrow.
  86. template <class TCondition>
  87. bool Await(TCondition&& condition, TInstant deadline = TInstant::Max());
  88. template <class TCondition>
  89. bool Await(TCondition&& condition, TDuration timeout);
  90. private:
  91. //! Lower 32 bits: number of waiters.
  92. //! Upper 32 bits: epoch
  93. std::atomic<ui64> Value_ = 0;
  94. static constexpr ui64 AddWaiter = static_cast<ui64>(1);
  95. static constexpr ui64 SubWaiter = static_cast<ui64>(-1);
  96. static constexpr ui64 EpochShift = 32;
  97. static constexpr ui64 AddEpoch = static_cast<ui64>(1) << EpochShift;
  98. static constexpr ui64 WaiterMask = AddEpoch - 1;
  99. #ifndef _linux_
  100. TCondVar ConditionVariable_;
  101. TMutex Mutex_;
  102. #endif
  103. };
  104. ////////////////////////////////////////////////////////////////////////////////
  105. //! A single-shot non-resettable event implemented on top of TEventCount.
  106. class TEvent final
  107. {
  108. public:
  109. TEvent() = default;
  110. TEvent(const TEvent&) = delete;
  111. TEvent(TEvent&&) = delete;
  112. void NotifyOne();
  113. void NotifyAll();
  114. bool Test() const;
  115. bool Wait(TInstant deadline = TInstant::Max());
  116. bool Wait(TDuration timeout);
  117. private:
  118. std::atomic<bool> Set_ = false;
  119. TEventCount EventCount_;
  120. };
  121. ////////////////////////////////////////////////////////////////////////////////
  122. } // namespace NYT::NThreading
  123. #define EVENT_COUNT_INL_H_
  124. #include "event_count-inl.h"
  125. #undef EVENT_COUNT_INL_H_