equeue.h 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. #pragma once
  2. #include <util/thread/pool.h>
  3. #include <util/datetime/base.h>
  4. #include <util/thread/lfqueue.h>
  5. #include <util/system/thread.h>
  6. #include <util/generic/bitops.h>
  7. #include <util/generic/vector.h>
  8. #include <util/generic/scope.h>
  9. #include <util/stream/str.h>
  10. #include <library/cpp/threading/bounded_queue/bounded_queue.h>
  11. #include <library/cpp/yt/threading/event_count.h>
  12. class TFastElasticQueue
  13. : public TThreadPoolBase
  14. , private IThreadFactory::IThreadAble
  15. {
  16. public:
  17. explicit TFastElasticQueue(const TParams& params = {})
  18. : TThreadPoolBase(params)
  19. {
  20. Y_ENSURE(!params.Blocking_);
  21. }
  22. ~TFastElasticQueue() {
  23. Stop();
  24. }
  25. void Start(size_t threadCount, size_t maxQueueSize) override {
  26. Y_ENSURE(Threads_.empty());
  27. Y_ENSURE(maxQueueSize > 0);
  28. Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
  29. MaxQueueSize_ = maxQueueSize;
  30. try {
  31. for (size_t i = 0; i < threadCount; ++i) {
  32. Threads_.push_back(Pool()->Run(this));
  33. }
  34. } catch (...) {
  35. Stop();
  36. throw;
  37. }
  38. Stopped_ = false;
  39. }
  40. size_t ObjectCount() const {
  41. //GuardCount_ can be temporary incremented above real object count in queue
  42. return Min(GuardCount_.load(), MaxQueueSize_);
  43. }
  44. bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
  45. if (Stopped_ || !obj) {
  46. return false;
  47. }
  48. if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
  49. GuardCount_.fetch_sub(1);
  50. return false;
  51. }
  52. QueueSize_.fetch_add(1);
  53. if (!Queue_->Enqueue(obj)) {
  54. //Simultaneous Dequeue calls can return not in exact fifo order of items,
  55. //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
  56. //the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
  57. GuardCount_.fetch_sub(1);
  58. QueueSize_.fetch_sub(1);
  59. return false;
  60. }
  61. Event_.NotifyOne();
  62. return true;
  63. }
  64. size_t Size() const noexcept override {
  65. return QueueSize_.load();
  66. }
  67. void Stop() noexcept override {
  68. Stopped_ = true;
  69. for (size_t i = 0; i < Threads_.size(); ++i) {
  70. while (!Queue_->Enqueue(nullptr)) {
  71. Sleep(TDuration::MilliSeconds(1));
  72. }
  73. Event_.NotifyOne();
  74. }
  75. while (!Threads_.empty()) {
  76. Threads_.back()->Join();
  77. Threads_.pop_back();
  78. }
  79. Queue_.Reset();
  80. }
  81. void DoExecute() override {
  82. TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
  83. while (true) {
  84. IObjectInQueue* job = nullptr;
  85. Event_.Await([&]() {
  86. return Queue_->Dequeue(job);
  87. });
  88. if (!job) {
  89. break;
  90. }
  91. QueueSize_.fetch_sub(1);
  92. Y_DEFER {
  93. GuardCount_.fetch_sub(1);
  94. };
  95. if (Params.Catching_) {
  96. try {
  97. try {
  98. job->Process(nullptr);
  99. } catch (...) {
  100. Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
  101. }
  102. } catch (...) {
  103. ;
  104. }
  105. } else {
  106. job->Process(nullptr);
  107. }
  108. }
  109. }
  110. private:
  111. std::atomic<bool> Stopped_ = false;
  112. size_t MaxQueueSize_ = 0;
  113. alignas(64) std::atomic<size_t> GuardCount_ = 0;
  114. alignas(64) std::atomic<size_t> QueueSize_ = 0;
  115. TVector<THolder<IThreadFactory::IThread>> Threads_;
  116. THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
  117. NYT::NThreading::TEventCount Event_;
  118. };