123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- #pragma once
- #include <util/generic/deque.h>
- #include <util/generic/maybe.h>
- #include <util/generic/yexception.h>
- #include <util/system/condvar.h>
- #include <util/system/guard.h>
- #include <util/system/mutex.h>
- #include <utility>
- namespace NThreading {
- ///
- /// TBlockingQueue is a queue of elements of limited or unlimited size.
- /// Queue provides Push and Pop operations that block if operation can't be executed
- /// (queue is empty or maximum size is reached).
- ///
- /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false.
- ///
- /// All operations are thread safe.
- ///
- ///
- /// Example of usage:
- /// TBlockingQueue<int> queue;
- ///
- /// ...
- ///
- /// // thread 1
- /// queue.Push(42);
- /// queue.Push(100500);
- ///
- /// ...
- ///
- /// // thread 2
- /// while (TMaybe<int> number = queue.Pop()) {
- /// ProcessNumber(number.GetRef());
- /// }
- template <class TElement>
- class TBlockingQueue {
- public:
- ///
- /// Creates blocking queue with given maxSize
- /// if maxSize == 0 then queue is unlimited
- TBlockingQueue(size_t maxSize)
- : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize)
- , Stopped(false)
- {
- }
- ///
- /// Blocks until queue has some elements or queue is stopped or deadline is reached.
- /// Returns `Nothing` if queue is stopped or deadline is reached.
- /// Returns element otherwise.
- TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) {
- TGuard<TMutex> g(Lock);
- const auto canPop = [this]() { return CanPop(); };
- if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
- return Nothing();
- }
- if (Stopped && Queue.empty()) {
- return Nothing();
- }
- TElement e = std::move(Queue.front());
- Queue.pop_front();
- CanPushCV.Signal();
- return std::move(e);
- }
- TMaybe<TElement> Pop(TDuration duration) {
- return Pop(TInstant::Now() + duration);
- }
- ///
- /// Blocks until queue has some elements or queue is stopped or deadline is reached.
- /// Returns empty internal deque if queue is stopped or deadline is reached.
- /// Returns iternal deque element otherwise.
- TDeque<TElement> Drain(TInstant deadline = TInstant::Max()) {
- TGuard<TMutex> g(Lock);
- const auto canPop = [this]() { return CanPop(); };
- if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
- return {};
- }
- TDeque<TElement> result;
- std::swap(result, Queue);
- CanPushCV.BroadCast();
- return result;
- }
- TDeque<TElement> Drain(TDuration duration) {
- return Drain(TInstant::Now() + duration);
- }
- ///
- /// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
- /// Returns false exception if queue is stopped and push failed or deadline is reached.
- /// Pushes element to queue and returns true otherwise.
- bool Push(const TElement& e, TInstant deadline = TInstant::Max()) {
- return PushRef(e, deadline);
- }
- bool Push(TElement&& e, TInstant deadline = TInstant::Max()) {
- return PushRef(std::move(e), deadline);
- }
- bool Push(const TElement& e, TDuration duration) {
- return Push(e, TInstant::Now() + duration);
- }
- bool Push(TElement&& e, TDuration duration) {
- return Push(std::move(e), TInstant::Now() + duration);
- }
- ///
- /// Stops the queue, all blocked operations will be aborted.
- void Stop() {
- TGuard<TMutex> g(Lock);
- Stopped = true;
- CanPopCV.BroadCast();
- CanPushCV.BroadCast();
- }
- ///
- /// Checks whether queue is empty.
- bool Empty() const {
- TGuard<TMutex> g(Lock);
- return Queue.empty();
- }
- ///
- /// Returns size of the queue.
- size_t Size() const {
- TGuard<TMutex> g(Lock);
- return Queue.size();
- }
- ///
- /// Checks whether queue is stopped.
- bool IsStopped() const {
- TGuard<TMutex> g(Lock);
- return Stopped;
- }
- private:
- bool CanPush() const {
- return Queue.size() < MaxSize || Stopped;
- }
- bool CanPop() const {
- return !Queue.empty() || Stopped;
- }
- template <typename Ref>
- bool PushRef(Ref e, TInstant deadline) {
- TGuard<TMutex> g(Lock);
- const auto canPush = [this]() { return CanPush(); };
- if (!CanPushCV.WaitD(Lock, deadline, canPush)) {
- return false;
- }
- if (Stopped) {
- return false;
- }
- Queue.push_back(std::forward<TElement>(e));
- CanPopCV.Signal();
- return true;
- }
- private:
- TMutex Lock;
- TCondVar CanPopCV;
- TCondVar CanPushCV;
- TDeque<TElement> Queue;
- size_t MaxSize;
- bool Stopped;
- };
- }
|