#ifndef EVENT_COUNT_INL_H_ #error "Direct inclusion of this file is not allowed, include event_count.h" // For the sake of sane code completion. #include "event_count.h" #endif #undef EVENT_COUNT_INL_H_ #include #include "futex.h" #include namespace NYT::NThreading { //////////////////////////////////////////////////////////////////////////////// inline void TEventCount::NotifyOne() { NotifyMany(1); } inline void TEventCount::NotifyAll() { NotifyMany(std::numeric_limits::max()); } inline void TEventCount::NotifyMany(int count) { // The order is important: Epoch is incremented before Waiters is checked. // prepareWait() increments Waiters before checking Epoch, so it is // impossible to miss a wakeup. #ifndef _linux_ TGuard guard(Mutex_); #endif ui64 prev = Value_.fetch_add(AddEpoch, std::memory_order::acq_rel); if (Y_UNLIKELY((prev & WaiterMask) != 0)) { #ifdef _linux_ FutexWake( reinterpret_cast(&Value_) + 1, // assume little-endian architecture count); #else if (count == 1) { ConditionVariable_.Signal(); } else { ConditionVariable_.BroadCast(); } #endif } } inline TEventCount::TCookie TEventCount::PrepareWait() { ui64 value = Value_.load(std::memory_order::acquire); return TCookie(static_cast(value >> EpochShift)); } inline void TEventCount::CancelWait() { } inline bool TEventCount::Wait(TCookie cookie, TInstant deadline) { Value_.fetch_add(AddWaiter, std::memory_order::acq_rel); bool result = true; #ifdef _linux_ while ((Value_.load(std::memory_order::acquire) >> EpochShift) == cookie.Epoch_) { auto timeout = deadline - TInstant::Now(); auto futexResult = FutexWait( reinterpret_cast(&Value_) + 1, // assume little-endian architecture cookie.Epoch_, timeout); if (futexResult != 0 && errno == ETIMEDOUT) { result = false; break; } } #else TGuard guard(Mutex_); if ((Value_.load(std::memory_order::acquire) >> EpochShift) == cookie.Epoch_) { result = ConditionVariable_.WaitD(Mutex_, deadline); } #endif ui64 prev = Value_.fetch_add(SubWaiter, std::memory_order::seq_cst); YT_ASSERT((prev & WaiterMask) != 0); return result; } inline bool TEventCount::Wait(TCookie cookie, TDuration timeout) { return Wait(cookie, timeout.ToDeadLine()); } template bool TEventCount::Await(TCondition&& condition, TInstant deadline) { if (condition()) { // Fast path. return true; } // condition() is the only thing that may throw, everything else is // noexcept, so we can hoist the try/catch block outside of the loop try { for (;;) { auto cookie = PrepareWait(); if (condition()) { CancelWait(); break; } if (!Wait(cookie, deadline)) { return false; } } } catch (...) { CancelWait(); throw; } return true; } template bool TEventCount::Await(TCondition&& condition, TDuration timeout) { return Await(std::forward(condition), timeout.ToDeadLine()); } //////////////////////////////////////////////////////////////////////////////// inline void TEvent::NotifyOne() { Set_.store(true, std::memory_order::release); EventCount_.NotifyOne(); } inline void TEvent::NotifyAll() { Set_.store(true, std::memory_order::release); EventCount_.NotifyAll(); } inline bool TEvent::Test() const { return Set_.load(std::memory_order::acquire); } inline bool TEvent::Wait(TInstant deadline) { return EventCount_.Await( [&] { return Set_.load(std::memory_order::acquire); }, deadline); } inline bool TEvent::Wait(TDuration timeout) { return Wait(timeout.ToDeadLine()); } //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NThreading