blocking_queue.h 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #pragma once
  2. #include <util/generic/deque.h>
  3. #include <util/generic/maybe.h>
  4. #include <util/generic/yexception.h>
  5. #include <util/system/condvar.h>
  6. #include <util/system/guard.h>
  7. #include <util/system/mutex.h>
  8. #include <utility>
  9. namespace NThreading {
  10. ///
  11. /// TBlockingQueue is a queue of elements of limited or unlimited size.
  12. /// Queue provides Push and Pop operations that block if operation can't be executed
  13. /// (queue is empty or maximum size is reached).
  14. ///
  15. /// Queue can be stopped, in that case all blocked operations will return `Nothing` / false.
  16. ///
  17. /// All operations are thread safe.
  18. ///
  19. ///
  20. /// Example of usage:
  21. /// TBlockingQueue<int> queue(0); // maxSize == 0 means the queue is unlimited
  22. ///
  23. /// ...
  24. ///
  25. /// // thread 1
  26. /// queue.Push(42);
  27. /// queue.Push(100500);
  28. ///
  29. /// ...
  30. ///
  31. /// // thread 2
  32. /// while (TMaybe<int> number = queue.Pop()) {
  33. /// ProcessNumber(number.GetRef());
  34. /// }
  35. template <class TElement>
  36. class TBlockingQueue {
  37. public:
  38. ///
  39. /// Creates blocking queue with given maxSize
  40. /// if maxSize == 0 then queue is unlimited
  41. TBlockingQueue(size_t maxSize)
  42. : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize)
  43. , Stopped(false)
  44. {
  45. }
  46. ///
  47. /// Blocks until queue has some elements or queue is stopped or deadline is reached.
  48. /// Returns `Nothing` if queue is stopped or deadline is reached.
  49. /// Returns element otherwise.
  50. TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) {
  51. TGuard<TMutex> g(Lock);
  52. const auto canPop = [this]() { return CanPop(); };
  53. if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
  54. return Nothing();
  55. }
  56. if (Stopped && Queue.empty()) {
  57. return Nothing();
  58. }
  59. TElement e = std::move(Queue.front());
  60. Queue.pop_front();
  61. CanPushCV.Signal();
  62. return std::move(e);
  63. }
  64. TMaybe<TElement> Pop(TDuration duration) {
  65. return Pop(TInstant::Now() + duration);
  66. }
  67. ///
  68. /// Blocks until queue has some elements or queue is stopped or deadline is reached.
  69. /// Returns empty internal deque if queue is stopped or deadline is reached.
  70. /// Returns iternal deque element otherwise.
  71. TDeque<TElement> Drain(TInstant deadline = TInstant::Max()) {
  72. TGuard<TMutex> g(Lock);
  73. const auto canPop = [this]() { return CanPop(); };
  74. if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
  75. return {};
  76. }
  77. TDeque<TElement> result;
  78. std::swap(result, Queue);
  79. CanPushCV.BroadCast();
  80. return result;
  81. }
  82. TDeque<TElement> Drain(TDuration duration) {
  83. return Drain(TInstant::Now() + duration);
  84. }
  85. ///
  86. /// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
  87. /// Returns false exception if queue is stopped and push failed or deadline is reached.
  88. /// Pushes element to queue and returns true otherwise.
  89. bool Push(const TElement& e, TInstant deadline = TInstant::Max()) {
  90. return PushRef(e, deadline);
  91. }
  92. bool Push(TElement&& e, TInstant deadline = TInstant::Max()) {
  93. return PushRef(std::move(e), deadline);
  94. }
  95. bool Push(const TElement& e, TDuration duration) {
  96. return Push(e, TInstant::Now() + duration);
  97. }
  98. bool Push(TElement&& e, TDuration duration) {
  99. return Push(std::move(e), TInstant::Now() + duration);
  100. }
  101. ///
  102. /// Stops the queue, all blocked operations will be aborted.
  103. void Stop() {
  104. TGuard<TMutex> g(Lock);
  105. Stopped = true;
  106. CanPopCV.BroadCast();
  107. CanPushCV.BroadCast();
  108. }
  109. ///
  110. /// Checks whether queue is empty.
  111. bool Empty() const {
  112. TGuard<TMutex> g(Lock);
  113. return Queue.empty();
  114. }
  115. ///
  116. /// Returns size of the queue.
  117. size_t Size() const {
  118. TGuard<TMutex> g(Lock);
  119. return Queue.size();
  120. }
  121. ///
  122. /// Checks whether queue is stopped.
  123. bool IsStopped() const {
  124. TGuard<TMutex> g(Lock);
  125. return Stopped;
  126. }
  127. private:
  128. bool CanPush() const {
  129. return Queue.size() < MaxSize || Stopped;
  130. }
  131. bool CanPop() const {
  132. return !Queue.empty() || Stopped;
  133. }
  134. template <typename Ref>
  135. bool PushRef(Ref e, TInstant deadline) {
  136. TGuard<TMutex> g(Lock);
  137. const auto canPush = [this]() { return CanPush(); };
  138. if (!CanPushCV.WaitD(Lock, deadline, canPush)) {
  139. return false;
  140. }
  141. if (Stopped) {
  142. return false;
  143. }
  144. Queue.push_back(std::forward<TElement>(e));
  145. CanPopCV.Signal();
  146. return true;
  147. }
  148. private:
  149. TMutex Lock;
  150. TCondVar CanPopCV;
  151. TCondVar CanPushCV;
  152. TDeque<TElement> Queue;
  153. size_t MaxSize;
  154. bool Stopped;
  155. };
  156. }