condvar.cpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #include "event.h"
  2. #include "mutex.h"
  3. #include "yassert.h"
  4. #include "condvar.h"
  5. #include "datetime.h"
  6. #include "spinlock.h"
  7. #include <util/generic/ylimits.h>
  8. #include <util/generic/intrlist.h>
  9. #include <util/generic/yexception.h>
  10. #if defined(_unix_)
  11. #include <sys/time.h>
  12. #include <pthread.h>
  13. #include <cerrno>
  14. #endif
  15. namespace {
  16. class TCondVarImpl {
  17. using TLock = TAdaptiveLock;
  18. struct TWaitEvent: public TIntrusiveListItem<TWaitEvent>, public TSystemEvent {
  19. };
  20. using TWaitEvents = TIntrusiveList<TWaitEvent>;
  21. public:
  22. inline ~TCondVarImpl() {
  23. Y_ASSERT(Events_.Empty());
  24. }
  25. inline void Signal() noexcept {
  26. with_lock (Lock_) {
  27. if (!Events_.Empty()) {
  28. Events_.PopFront()->Signal();
  29. }
  30. }
  31. }
  32. inline void BroadCast() noexcept {
  33. with_lock (Lock_) {
  34. // TODO
  35. while (!Events_.Empty()) {
  36. Events_.PopFront()->Signal();
  37. }
  38. }
  39. }
  40. inline bool WaitD(TMutex& m, TInstant deadLine) noexcept {
  41. TWaitEvent event;
  42. with_lock (Lock_) {
  43. Events_.PushBack(&event);
  44. }
  45. m.Release();
  46. const bool signalled = event.WaitD(deadLine);
  47. m.Acquire();
  48. with_lock (Lock_) {
  49. event.Unlink();
  50. }
  51. return signalled;
  52. }
  53. private:
  54. TWaitEvents Events_;
  55. TLock Lock_;
  56. };
  57. } // namespace
  58. #if defined(_win_)
  59. class TCondVar::TImpl: public TCondVarImpl {
  60. };
  61. #else
  62. class TCondVar::TImpl {
  63. public:
  64. inline TImpl() {
  65. if (pthread_cond_init(&Cond_, nullptr)) {
  66. ythrow yexception() << "can not create condvar(" << LastSystemErrorText() << ")";
  67. }
  68. }
  69. inline ~TImpl() {
  70. int ret = pthread_cond_destroy(&Cond_);
  71. Y_ABORT_UNLESS(ret == 0, "pthread_cond_destroy failed: %s", LastSystemErrorText(ret));
  72. }
  73. inline void Signal() noexcept {
  74. int ret = pthread_cond_signal(&Cond_);
  75. Y_ABORT_UNLESS(ret == 0, "pthread_cond_signal failed: %s", LastSystemErrorText(ret));
  76. }
  77. inline bool WaitD(TMutex& lock, TInstant deadLine) noexcept {
  78. if (deadLine == TInstant::Max()) {
  79. int ret = pthread_cond_wait(&Cond_, (pthread_mutex_t*)lock.Handle());
  80. Y_ABORT_UNLESS(ret == 0, "pthread_cond_wait failed: %s", LastSystemErrorText(ret));
  81. return true;
  82. } else {
  83. struct timespec spec;
  84. Zero(spec);
  85. spec.tv_sec = deadLine.Seconds();
  86. spec.tv_nsec = deadLine.NanoSecondsOfSecond();
  87. int ret = pthread_cond_timedwait(&Cond_, (pthread_mutex_t*)lock.Handle(), &spec);
  88. Y_ABORT_UNLESS(ret == 0 || ret == ETIMEDOUT, "pthread_cond_timedwait failed: %s", LastSystemErrorText(ret));
  89. return ret == 0;
  90. }
  91. }
  92. inline void BroadCast() noexcept {
  93. int ret = pthread_cond_broadcast(&Cond_);
  94. Y_ABORT_UNLESS(ret == 0, "pthread_cond_broadcast failed: %s", LastSystemErrorText(ret));
  95. }
  96. private:
  97. pthread_cond_t Cond_;
  98. };
  99. #endif
  100. TCondVar::TCondVar()
  101. : Impl_(new TImpl)
  102. {
  103. }
  104. TCondVar::~TCondVar() = default;
  105. void TCondVar::BroadCast() noexcept {
  106. Impl_->BroadCast();
  107. }
  108. void TCondVar::Signal() noexcept {
  109. Impl_->Signal();
  110. }
  111. bool TCondVar::WaitD(TMutex& mutex, TInstant deadLine) noexcept {
  112. return Impl_->WaitD(mutex, deadLine);
  113. }