// equeue_ut.cpp -- unit tests for TElasticQueue and TFastElasticQueue
  1. #include "equeue.h"
  2. #include <library/cpp/threading/equeue/fast/equeue.h>
  3. #include <library/cpp/testing/unittest/registar.h>
  4. #include <util/system/event.h>
  5. #include <util/datetime/base.h>
  6. #include <util/generic/vector.h>
  7. Y_UNIT_TEST_SUITE(TElasticQueueTest) {
  // Second argument to Queue->Start(); FillTest expects the queue to accept
  // exactly this many jobs before Add() starts refusing.
  constexpr size_t MaxQueueSize = 20;
  // First argument to Queue->Start(); ConcurrentTest also launches this many
  // sender threads.
  constexpr size_t ThreadCount = 10;
  10. template <typename T>
  11. THolder<T> MakeQueue();
  12. template <>
  13. THolder<TElasticQueue> MakeQueue() {
  14. return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>());
  15. }
  16. template <>
  17. THolder<TFastElasticQueue> MakeQueue() {
  18. return MakeHolder<TFastElasticQueue>();
  19. }
  20. template <typename T>
  21. struct TEnv {
  22. static inline THolder<T> Queue;
  23. struct TQueueSetup {
  24. TQueueSetup() {
  25. Queue.Reset(MakeQueue<T>());
  26. Queue->Start(ThreadCount, MaxQueueSize);
  27. }
  28. ~TQueueSetup() {
  29. Queue->Stop();
  30. }
  31. };
  32. };
  33. struct TCounters {
  34. void Reset() {
  35. Processed = Scheduled = Discarded = Total = 0;
  36. }
  37. TAtomic Processed;
  38. TAtomic Scheduled;
  39. TAtomic Discarded;
  40. TAtomic Total;
  41. };
  42. static TCounters Counters;
  43. //fill test -- fill queue with "endless" jobs
  44. TSystemEvent WaitEvent;
  45. template <typename T>
  46. void FillTest() {
  47. Counters.Reset();
  48. struct TWaitJob: public IObjectInQueue {
  49. void Process(void*) override {
  50. WaitEvent.Wait();
  51. AtomicIncrement(Counters.Processed);
  52. }
  53. } job;
  54. struct TLocalSetup: TEnv<T>::TQueueSetup {
  55. TLocalSetup() {
  56. WaitEvent.Reset();
  57. }
  58. ~TLocalSetup() {
  59. WaitEvent.Signal();
  60. }
  61. };
  62. size_t enqueued = 0;
  63. {
  64. TLocalSetup setup;
  65. while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
  66. ++enqueued;
  67. }
  68. UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
  69. UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount());
  70. }
  71. UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
  72. UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
  73. UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
  74. }
  75. Y_UNIT_TEST(FillTest) {
  76. FillTest<TElasticQueue>();
  77. }
  78. Y_UNIT_TEST(FillTestFast) {
  79. FillTest<TFastElasticQueue>();
  80. }
  81. //concurrent test -- send many jobs from different threads
  82. struct TJob: public IObjectInQueue {
  83. void Process(void*) override {
  84. AtomicIncrement(Counters.Processed);
  85. }
  86. };
  87. static TJob Job;
  88. template <typename T>
  89. static bool TryAdd() {
  90. AtomicIncrement(Counters.Total);
  91. if (TEnv<T>::Queue->Add(&Job)) {
  92. AtomicIncrement(Counters.Scheduled);
  93. return true;
  94. } else {
  95. AtomicIncrement(Counters.Discarded);
  96. return false;
  97. }
  98. }
  99. const size_t N = 100000;
  100. static size_t TryCounter;
  101. template <typename T>
  102. void ConcurrentTest() {
  103. Counters.Reset();
  104. TryCounter = 0;
  105. struct TSender: public IThreadFactory::IThreadAble {
  106. void DoExecute() override {
  107. while ((size_t)AtomicIncrement(TryCounter) <= N) {
  108. if (!TryAdd<T>()) {
  109. Sleep(TDuration::MicroSeconds(50));
  110. }
  111. }
  112. }
  113. } sender;
  114. {
  115. typename TEnv<T>::TQueueSetup setup;
  116. TVector< TAutoPtr<IThreadFactory::IThread> > senders;
  117. for (size_t i = 0; i < ThreadCount; ++i) {
  118. senders.push_back(::SystemThreadFactory()->Run(&sender));
  119. }
  120. for (size_t i = 0; i < senders.size(); ++i) {
  121. senders[i]->Join();
  122. }
  123. }
  124. UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N);
  125. UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
  126. UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
  127. }
  128. Y_UNIT_TEST(ConcurrentTest) {
  129. ConcurrentTest<TElasticQueue>();
  130. }
  131. Y_UNIT_TEST(ConcurrentTestFast) {
  132. ConcurrentTest<TFastElasticQueue>();
  133. }
  134. }