equeue.cpp 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. #include "equeue.h"
  2. TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue)
  3. : SlaveQueue_(std::move(slaveQueue))
  4. {
  5. }
  6. size_t TElasticQueue::ObjectCount() const {
  7. return (size_t)AtomicGet(ObjectCount_);
  8. }
  9. bool TElasticQueue::TryIncCounter() {
  10. if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) {
  11. AtomicDecrement(GuardCount_);
  12. return false;
  13. }
  14. return true;
  15. }
  16. class TElasticQueue::TDecrementingWrapper: TNonCopyable, public IObjectInQueue {
  17. public:
  18. TDecrementingWrapper(IObjectInQueue* realObject, TElasticQueue* queue)
  19. : RealObject_(realObject)
  20. , Queue_(queue)
  21. {
  22. AtomicIncrement(Queue_->ObjectCount_);
  23. }
  24. ~TDecrementingWrapper() override {
  25. AtomicDecrement(Queue_->ObjectCount_);
  26. AtomicDecrement(Queue_->GuardCount_);
  27. }
  28. private:
  29. void Process(void *tsr) override {
  30. THolder<TDecrementingWrapper> self(this);
  31. RealObject_->Process(tsr);
  32. }
  33. private:
  34. IObjectInQueue* const RealObject_;
  35. TElasticQueue* const Queue_;
  36. };
  37. bool TElasticQueue::Add(IObjectInQueue* obj) {
  38. if (!TryIncCounter()) {
  39. return false;
  40. }
  41. THolder<TDecrementingWrapper> wrapper;
  42. try {
  43. wrapper.Reset(new TDecrementingWrapper(obj, this));
  44. } catch (...) {
  45. AtomicDecrement(GuardCount_);
  46. throw;
  47. }
  48. if (SlaveQueue_->Add(wrapper.Get())) {
  49. Y_UNUSED(wrapper.Release());
  50. return true;
  51. } else {
  52. return false;
  53. }
  54. }
  55. void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) {
  56. MaxQueueSize_ = maxQueueSize;
  57. SlaveQueue_->Start(threadCount, maxQueueSize);
  58. }
  59. void TElasticQueue::Stop() noexcept {
  60. return SlaveQueue_->Stop();
  61. }
  62. size_t TElasticQueue::Size() const noexcept {
  63. return SlaveQueue_->Size();
  64. }