blocking_queue.h 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. #pragma once
  2. #include <util/generic/deque.h>
  3. #include <util/generic/maybe.h>
  4. #include <util/generic/yexception.h>
  5. #include <util/system/condvar.h>
  6. #include <util/system/guard.h>
  7. #include <util/system/mutex.h>
  8. #include <utility>
  9. namespace NThreading {
  10. ///
  11. /// TBlockingQueue is a queue of elements of limited or unlimited size.
  12. /// Queue provides Push and Pop operations that block if operation can't be executed
  13. /// (queue is empty or maximum size is reached).
  14. ///
  15. /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false.
  16. ///
  17. /// All operations are thread safe.
  18. ///
  19. ///
  20. /// Example of usage:
  21. /// TBlockingQueue<int> queue;
  22. ///
  23. /// ...
  24. ///
  25. /// // thread 1
  26. /// queue.Push(42);
  27. /// queue.Push(100500);
  28. ///
  29. /// ...
  30. ///
  31. /// // thread 2
  32. /// while (TMaybe<int> number = queue.Pop()) {
  33. /// ProcessNumber(number.GetRef());
  34. /// }
  35. template <class TElement>
  36. class TBlockingQueue {
  37. public:
  38. ///
  39. /// Creates blocking queue with given maxSize
  40. /// if maxSize == 0 then queue is unlimited
  41. TBlockingQueue(size_t maxSize)
  42. : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize)
  43. , Stopped(false)
  44. {
  45. }
  46. ///
  47. /// Blocks until queue has some elements or queue is stopped or deadline is reached.
  48. /// Returns `Nothing` if queue is stopped or deadline is reached.
  49. /// Returns element otherwise.
  50. TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) {
  51. TGuard<TMutex> g(Lock);
  52. const auto canPop = [this]() { return CanPop(); };
  53. if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
  54. return Nothing();
  55. }
  56. if (Stopped && Queue.empty()) {
  57. return Nothing();
  58. }
  59. TElement e = std::move(Queue.front());
  60. Queue.pop_front();
  61. CanPushCV.Signal();
  62. return std::move(e);
  63. }
  64. TMaybe<TElement> Pop(TDuration duration) {
  65. return Pop(TInstant::Now() + duration);
  66. }
  67. ///
  68. /// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
  69. /// Returns false exception if queue is stopped and push failed or deadline is reached.
  70. /// Pushes element to queue and returns true otherwise.
  71. bool Push(const TElement& e, TInstant deadline = TInstant::Max()) {
  72. return PushRef(e, deadline);
  73. }
  74. bool Push(TElement&& e, TInstant deadline = TInstant::Max()) {
  75. return PushRef(std::move(e), deadline);
  76. }
  77. bool Push(const TElement& e, TDuration duration) {
  78. return Push(e, TInstant::Now() + duration);
  79. }
  80. bool Push(TElement&& e, TDuration duration) {
  81. return Push(std::move(e), TInstant::Now() + duration);
  82. }
  83. ///
  84. /// Stops the queue, all blocked operations will be aborted.
  85. void Stop() {
  86. TGuard<TMutex> g(Lock);
  87. Stopped = true;
  88. CanPopCV.BroadCast();
  89. CanPushCV.BroadCast();
  90. }
  91. ///
  92. /// Checks whether queue is empty.
  93. bool Empty() const {
  94. TGuard<TMutex> g(Lock);
  95. return Queue.empty();
  96. }
  97. ///
  98. /// Returns size of the queue.
  99. size_t Size() const {
  100. TGuard<TMutex> g(Lock);
  101. return Queue.size();
  102. }
  103. ///
  104. /// Checks whether queue is stopped.
  105. bool IsStopped() const {
  106. TGuard<TMutex> g(Lock);
  107. return Stopped;
  108. }
  109. private:
  110. bool CanPush() const {
  111. return Queue.size() < MaxSize || Stopped;
  112. }
  113. bool CanPop() const {
  114. return !Queue.empty() || Stopped;
  115. }
  116. template <typename Ref>
  117. bool PushRef(Ref e, TInstant deadline) {
  118. TGuard<TMutex> g(Lock);
  119. const auto canPush = [this]() { return CanPush(); };
  120. if (!CanPushCV.WaitD(Lock, deadline, canPush)) {
  121. return false;
  122. }
  123. if (Stopped) {
  124. return false;
  125. }
  126. Queue.push_back(std::forward<TElement>(e));
  127. CanPopCV.Signal();
  128. return true;
  129. }
  130. private:
  131. TMutex Lock;
  132. TCondVar CanPopCV;
  133. TCondVar CanPushCV;
  134. TDeque<TElement> Queue;
  135. size_t MaxSize;
  136. bool Stopped;
  137. };
  138. }