count_down_latch.cpp 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. #include "count_down_latch.h"
  2. #include "futex.h"
  3. #include <library/cpp/yt/threading/futex.h>
  4. #include <library/cpp/yt/assert/assert.h>
  5. #include <cerrno>
  6. namespace NYT::NThreading {
  7. ////////////////////////////////////////////////////////////////////////////////
  8. TCountDownLatch::TCountDownLatch(int count)
  9. : Count_(count)
  10. { }
  11. void TCountDownLatch::CountDown()
  12. {
  13. #ifndef _linux_
  14. TGuard<TMutex> guard(Mutex_);
  15. #endif
  16. auto previous = Count_.fetch_sub(1, std::memory_order::release);
  17. if (previous == 1) {
  18. #ifdef _linux_
  19. int rv = NThreading::FutexWake(
  20. reinterpret_cast<int*>(&Count_),
  21. std::numeric_limits<int>::max());
  22. YT_VERIFY(rv >= 0);
  23. #else
  24. ConditionVariable_.BroadCast();
  25. #endif
  26. }
  27. }
  28. void TCountDownLatch::Wait() const
  29. {
  30. while (true) {
  31. #ifndef _linux_
  32. TGuard<TMutex> guard(Mutex_);
  33. #endif
  34. auto count = Count_.load(std::memory_order::acquire);
  35. if (count == 0) {
  36. return;
  37. }
  38. #ifdef _linux_
  39. int rv = NThreading::FutexWait(
  40. const_cast<int*>(reinterpret_cast<const int*>(&Count_)),
  41. count);
  42. YT_VERIFY(rv >= 0 || errno == EWOULDBLOCK || errno == EINTR);
  43. #else
  44. ConditionVariable_.WaitI(Mutex_);
  45. #endif
  46. }
  47. }
  48. bool TCountDownLatch::TryWait() const
  49. {
  50. return Count_.load(std::memory_order::acquire) == 0;
  51. }
  52. int TCountDownLatch::GetCount() const
  53. {
  54. return Count_.load(std::memory_order::relaxed);
  55. }
  56. ////////////////////////////////////////////////////////////////////////////////
  57. } // namespace NYT::NThreading