equeue.h 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. #pragma once
  2. #include <util/thread/pool.h>
  3. #include <util/datetime/base.h>
  4. #include <util/thread/lfqueue.h>
  5. #include <util/system/thread.h>
  6. #include <util/generic/vector.h>
  7. #include <util/generic/scope.h>
  8. #include <util/stream/str.h>
  9. #include <library/cpp/threading/bounded_queue/bounded_queue.h>
  10. #include <library/cpp/yt/threading/event_count.h>
  11. class TFastElasticQueue
  12. : public TThreadPoolBase
  13. , private IThreadFactory::IThreadAble
  14. {
  15. public:
  16. explicit TFastElasticQueue(const TParams& params = {})
  17. : TThreadPoolBase(params)
  18. {
  19. Y_ENSURE(!params.Blocking_);
  20. }
  21. ~TFastElasticQueue() {
  22. Stop();
  23. }
  24. void Start(size_t threadCount, size_t maxQueueSize) override {
  25. Y_ENSURE(Threads_.empty());
  26. Y_ENSURE(maxQueueSize > 0);
  27. Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
  28. MaxQueueSize_ = maxQueueSize;
  29. try {
  30. for (size_t i = 0; i < threadCount; ++i) {
  31. Threads_.push_back(Pool()->Run(this));
  32. }
  33. } catch (...) {
  34. Stop();
  35. throw;
  36. }
  37. Stopped_ = false;
  38. }
  39. size_t ObjectCount() const {
  40. //GuardCount_ can be temporary incremented above real object count in queue
  41. return Min(GuardCount_.load(), MaxQueueSize_);
  42. }
  43. bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
  44. if (Stopped_ || !obj) {
  45. return false;
  46. }
  47. if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
  48. GuardCount_.fetch_sub(1);
  49. return false;
  50. }
  51. QueueSize_.fetch_add(1);
  52. if (!Queue_->Enqueue(obj)) {
  53. //Simultaneous Dequeue calls can return not in exact fifo order of items,
  54. //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
  55. //the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
  56. GuardCount_.fetch_sub(1);
  57. QueueSize_.fetch_sub(1);
  58. return false;
  59. }
  60. Event_.NotifyOne();
  61. return true;
  62. }
  63. size_t Size() const noexcept override {
  64. return QueueSize_.load();
  65. }
  66. void Stop() noexcept override {
  67. Stopped_ = true;
  68. for (size_t i = 0; i < Threads_.size(); ++i) {
  69. while (!Queue_->Enqueue(nullptr)) {
  70. Sleep(TDuration::MilliSeconds(1));
  71. }
  72. Event_.NotifyOne();
  73. }
  74. while (!Threads_.empty()) {
  75. Threads_.back()->Join();
  76. Threads_.pop_back();
  77. }
  78. Queue_.Reset();
  79. }
  80. void DoExecute() override {
  81. TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
  82. while (true) {
  83. IObjectInQueue* job = nullptr;
  84. Event_.Await([&]() {
  85. return Queue_->Dequeue(job);
  86. });
  87. if (!job) {
  88. break;
  89. }
  90. QueueSize_.fetch_sub(1);
  91. Y_DEFER {
  92. GuardCount_.fetch_sub(1);
  93. };
  94. if (Params.Catching_) {
  95. try {
  96. try {
  97. job->Process(nullptr);
  98. } catch (...) {
  99. Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
  100. }
  101. } catch (...) {
  102. ;
  103. }
  104. } else {
  105. job->Process(nullptr);
  106. }
  107. }
  108. }
  109. private:
  110. std::atomic<bool> Stopped_ = false;
  111. size_t MaxQueueSize_ = 0;
  112. alignas(64) std::atomic<size_t> GuardCount_ = 0;
  113. alignas(64) std::atomic<size_t> QueueSize_ = 0;
  114. TVector<THolder<IThreadFactory::IThread>> Threads_;
  115. THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
  116. NYT::NThreading::TEventCount Event_;
  117. };