condvar.cpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. #include "event.h"
  2. #include "mutex.h"
  3. #include "yassert.h"
  4. #include "condvar.h"
  5. #include "datetime.h"
  6. #include "spinlock.h"
  7. #include <util/generic/ylimits.h>
  8. #include <util/generic/intrlist.h>
  9. #include <util/generic/yexception.h>
  10. #include <cstdlib>
  11. #if defined(_unix_)
  12. #include <sys/time.h>
  13. #include <pthread.h>
  14. #include <cerrno>
  15. #endif
  16. namespace {
  17. class TCondVarImpl {
  18. using TLock = TAdaptiveLock;
  19. struct TWaitEvent: public TIntrusiveListItem<TWaitEvent>, public TSystemEvent {
  20. };
  21. using TWaitEvents = TIntrusiveList<TWaitEvent>;
  22. public:
  23. inline ~TCondVarImpl() {
  24. Y_ASSERT(Events_.Empty());
  25. }
  26. inline void Signal() noexcept {
  27. with_lock (Lock_) {
  28. if (!Events_.Empty()) {
  29. Events_.PopFront()->Signal();
  30. }
  31. }
  32. }
  33. inline void BroadCast() noexcept {
  34. with_lock (Lock_) {
  35. //TODO
  36. while (!Events_.Empty()) {
  37. Events_.PopFront()->Signal();
  38. }
  39. }
  40. }
  41. inline bool WaitD(TMutex& m, TInstant deadLine) noexcept {
  42. TWaitEvent event;
  43. with_lock (Lock_) {
  44. Events_.PushBack(&event);
  45. }
  46. m.Release();
  47. const bool signalled = event.WaitD(deadLine);
  48. m.Acquire();
  49. with_lock (Lock_) {
  50. event.Unlink();
  51. }
  52. return signalled;
  53. }
  54. private:
  55. TWaitEvents Events_;
  56. TLock Lock_;
  57. };
  58. }
  59. #if defined(_win_)
  60. class TCondVar::TImpl: public TCondVarImpl {
  61. };
  62. #else
  63. class TCondVar::TImpl {
  64. public:
  65. inline TImpl() {
  66. if (pthread_cond_init(&Cond_, nullptr)) {
  67. ythrow yexception() << "can not create condvar(" << LastSystemErrorText() << ")";
  68. }
  69. }
  70. inline ~TImpl() {
  71. int ret = pthread_cond_destroy(&Cond_);
  72. Y_VERIFY(ret == 0, "pthread_cond_destroy failed: %s", LastSystemErrorText(ret));
  73. }
  74. inline void Signal() noexcept {
  75. int ret = pthread_cond_signal(&Cond_);
  76. Y_VERIFY(ret == 0, "pthread_cond_signal failed: %s", LastSystemErrorText(ret));
  77. }
  78. inline bool WaitD(TMutex& lock, TInstant deadLine) noexcept {
  79. if (deadLine == TInstant::Max()) {
  80. int ret = pthread_cond_wait(&Cond_, (pthread_mutex_t*)lock.Handle());
  81. Y_VERIFY(ret == 0, "pthread_cond_wait failed: %s", LastSystemErrorText(ret));
  82. return true;
  83. } else {
  84. struct timespec spec;
  85. Zero(spec);
  86. spec.tv_sec = deadLine.Seconds();
  87. spec.tv_nsec = deadLine.NanoSecondsOfSecond();
  88. int ret = pthread_cond_timedwait(&Cond_, (pthread_mutex_t*)lock.Handle(), &spec);
  89. Y_VERIFY(ret == 0 || ret == ETIMEDOUT, "pthread_cond_timedwait failed: %s", LastSystemErrorText(ret));
  90. return ret == 0;
  91. }
  92. }
  93. inline void BroadCast() noexcept {
  94. int ret = pthread_cond_broadcast(&Cond_);
  95. Y_VERIFY(ret == 0, "pthread_cond_broadcast failed: %s", LastSystemErrorText(ret));
  96. }
  97. private:
  98. pthread_cond_t Cond_;
  99. };
  100. #endif
  101. TCondVar::TCondVar()
  102. : Impl_(new TImpl)
  103. {
  104. }
  105. TCondVar::~TCondVar() = default;
  106. void TCondVar::BroadCast() noexcept {
  107. Impl_->BroadCast();
  108. }
  109. void TCondVar::Signal() noexcept {
  110. Impl_->Signal();
  111. }
  112. bool TCondVar::WaitD(TMutex& mutex, TInstant deadLine) noexcept {
  113. return Impl_->WaitD(mutex, deadLine);
  114. }