123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- #ifndef EVENT_COUNT_INL_H_
- #error "Direct inclusion of this file is not allowed, include event_count.h"
- // For the sake of sane code completion.
- #include "event_count.h"
- #endif
- #undef EVENT_COUNT_INL_H_
- #include <library/cpp/yt/assert/assert.h>
- #include "futex.h"
- #include <errno.h>
- namespace NYT::NThreading {
- ////////////////////////////////////////////////////////////////////////////////
- inline void TEventCount::NotifyOne()
- {
- NotifyMany(1);
- }
- inline void TEventCount::NotifyAll()
- {
- NotifyMany(std::numeric_limits<int>::max());
- }
- inline void TEventCount::NotifyMany(int count)
- {
- // The order is important: Epoch is incremented before Waiters is checked.
- // prepareWait() increments Waiters before checking Epoch, so it is
- // impossible to miss a wakeup.
- #ifndef _linux_
- TGuard<TMutex> guard(Mutex_);
- #endif
- ui64 prev = Value_.fetch_add(AddEpoch, std::memory_order::acq_rel);
- if (Y_UNLIKELY((prev & WaiterMask) != 0)) {
- #ifdef _linux_
- FutexWake(
- reinterpret_cast<int*>(&Value_) + 1, // assume little-endian architecture
- count);
- #else
- if (count == 1) {
- ConditionVariable_.Signal();
- } else {
- ConditionVariable_.BroadCast();
- }
- #endif
- }
- }
- inline TEventCount::TCookie TEventCount::PrepareWait()
- {
- ui64 value = Value_.load(std::memory_order::acquire);
- return TCookie(static_cast<ui32>(value >> EpochShift));
- }
- inline void TEventCount::CancelWait()
- { }
- inline bool TEventCount::Wait(TCookie cookie, TInstant deadline)
- {
- Value_.fetch_add(AddWaiter, std::memory_order::acq_rel);
- bool result = true;
- #ifdef _linux_
- while ((Value_.load(std::memory_order::acquire) >> EpochShift) == cookie.Epoch_) {
- auto timeout = deadline - TInstant::Now();
- auto futexResult = FutexWait(
- reinterpret_cast<int*>(&Value_) + 1, // assume little-endian architecture
- cookie.Epoch_,
- timeout);
- if (futexResult != 0 && errno == ETIMEDOUT) {
- result = false;
- break;
- }
- }
- #else
- TGuard<TMutex> guard(Mutex_);
- if ((Value_.load(std::memory_order::acquire) >> EpochShift) == cookie.Epoch_) {
- result = ConditionVariable_.WaitD(Mutex_, deadline);
- }
- #endif
- ui64 prev = Value_.fetch_add(SubWaiter, std::memory_order::seq_cst);
- YT_ASSERT((prev & WaiterMask) != 0);
- return result;
- }
- inline bool TEventCount::Wait(TCookie cookie, TDuration timeout)
- {
- return Wait(cookie, timeout.ToDeadLine());
- }
- template <class TCondition>
- bool TEventCount::Await(TCondition&& condition, TInstant deadline)
- {
- if (condition()) {
- // Fast path.
- return true;
- }
- // condition() is the only thing that may throw, everything else is
- // noexcept, so we can hoist the try/catch block outside of the loop
- try {
- for (;;) {
- auto cookie = PrepareWait();
- if (condition()) {
- CancelWait();
- break;
- }
- if (!Wait(cookie, deadline)) {
- return false;
- }
- }
- } catch (...) {
- CancelWait();
- throw;
- }
- return true;
- }
- template <class TCondition>
- bool TEventCount::Await(TCondition&& condition, TDuration timeout)
- {
- return Await(std::forward<TCondition>(condition), timeout.ToDeadLine());
- }
- ////////////////////////////////////////////////////////////////////////////////
- inline void TEvent::NotifyOne()
- {
- Set_.store(true, std::memory_order::release);
- EventCount_.NotifyOne();
- }
- inline void TEvent::NotifyAll()
- {
- Set_.store(true, std::memory_order::release);
- EventCount_.NotifyAll();
- }
- inline bool TEvent::Test() const
- {
- return Set_.load(std::memory_order::acquire);
- }
- inline bool TEvent::Wait(TInstant deadline)
- {
- return EventCount_.Await(
- [&] {
- return Set_.load(std::memory_order::acquire);
- },
- deadline);
- }
- inline bool TEvent::Wait(TDuration timeout)
- {
- return Wait(timeout.ToDeadLine());
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT::NThreading
|