blocking_queue_ut.cpp 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. #include "blocking_queue.h"
  2. #include <library/cpp/testing/unittest/registar.h>
  3. #include <util/string/builder.h>
  4. #include <util/system/thread.h>
  5. namespace {
  6. class TFunctionThread: public ISimpleThread {
  7. public:
  8. using TFunc = std::function<void()>;
  9. private:
  10. TFunc Func;
  11. public:
  12. TFunctionThread(const TFunc& func)
  13. : Func(func)
  14. {
  15. }
  16. void* ThreadProc() noexcept override {
  17. Func();
  18. return nullptr;
  19. }
  20. };
  21. }
  22. IOutputStream& operator<<(IOutputStream& o, const TMaybe<int>& val) {
  23. if (val) {
  24. o << "TMaybe<int>(" << val.GetRef() << ')';
  25. } else {
  26. o << "TMaybe<int>()";
  27. }
  28. return o;
  29. }
  30. Y_UNIT_TEST_SUITE(BlockingQueueTest) {
  31. Y_UNIT_TEST(SimplePushPopTest) {
  32. const size_t limit = 100;
  33. NThreading::TBlockingQueue<int> queue(100);
  34. for (int i = 0; i != limit; ++i) {
  35. queue.Push(i);
  36. }
  37. for (int i = 0; i != limit; ++i) {
  38. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
  39. }
  40. UNIT_ASSERT(queue.Empty());
  41. }
  42. Y_UNIT_TEST(SimpleStopTest) {
  43. const size_t limit = 100;
  44. NThreading::TBlockingQueue<int> queue(100);
  45. for (int i = 0; i != limit; ++i) {
  46. queue.Push(i);
  47. }
  48. queue.Stop();
  49. bool ok = queue.Push(100500);
  50. UNIT_ASSERT_VALUES_EQUAL(ok, false);
  51. for (int i = 0; i != limit; ++i) {
  52. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
  53. }
  54. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
  55. }
  56. Y_UNIT_TEST(BigPushPop) {
  57. const int limit = 100000;
  58. NThreading::TBlockingQueue<int> queue(10);
  59. TFunctionThread pusher([&] {
  60. for (int i = 0; i != limit; ++i) {
  61. if (!queue.Push(i)) {
  62. break;
  63. }
  64. }
  65. });
  66. pusher.Start();
  67. try {
  68. for (int i = 0; i != limit; ++i) {
  69. size_t size = queue.Size();
  70. UNIT_ASSERT_C(size <= 10, (TStringBuilder() << "Size exceeds 10: " << size).data());
  71. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
  72. }
  73. } catch (...) {
  74. // gracefull shutdown of pusher thread if assertion fails
  75. queue.Stop();
  76. throw;
  77. }
  78. pusher.Join();
  79. }
  80. Y_UNIT_TEST(StopWhenMultiplePoppers) {
  81. NThreading::TBlockingQueue<int> queue(10);
  82. TFunctionThread popper1([&] {
  83. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
  84. });
  85. TFunctionThread popper2([&] {
  86. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
  87. });
  88. popper1.Start();
  89. popper2.Start();
  90. queue.Stop();
  91. popper1.Join();
  92. popper2.Join();
  93. }
  94. Y_UNIT_TEST(StopWhenMultiplePushers) {
  95. NThreading::TBlockingQueue<int> queue(1);
  96. queue.Push(1);
  97. TFunctionThread pusher1([&] {
  98. UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false);
  99. });
  100. TFunctionThread pusher2([&] {
  101. UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false);
  102. });
  103. pusher1.Start();
  104. pusher2.Start();
  105. queue.Stop();
  106. pusher1.Join();
  107. pusher2.Join();
  108. }
  109. Y_UNIT_TEST(InterruptPopByDeadline) {
  110. NThreading::TBlockingQueue<int> queue1(10);
  111. NThreading::TBlockingQueue<int> queue2(10);
  112. const auto popper1DeadLine = TInstant::Now();
  113. const auto popper2DeadLine = TInstant::Now() + TDuration::Seconds(2);
  114. TFunctionThread popper1([&] {
  115. UNIT_ASSERT_VALUES_EQUAL(queue1.Pop(popper1DeadLine), TMaybe<int>());
  116. UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
  117. });
  118. TFunctionThread popper2([&] {
  119. UNIT_ASSERT_VALUES_EQUAL(queue2.Pop(popper2DeadLine), 2);
  120. UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
  121. });
  122. popper1.Start();
  123. popper2.Start();
  124. Sleep(TDuration::Seconds(1));
  125. queue1.Push(1);
  126. queue2.Push(2);
  127. Sleep(TDuration::Seconds(1));
  128. queue1.Stop();
  129. queue2.Stop();
  130. popper1.Join();
  131. popper2.Join();
  132. }
  133. Y_UNIT_TEST(InterruptPushByDeadline) {
  134. NThreading::TBlockingQueue<int> queue1(1);
  135. NThreading::TBlockingQueue<int> queue2(1);
  136. queue1.Push(0);
  137. queue2.Push(0);
  138. const auto pusher1DeadLine = TInstant::Now();
  139. const auto pusher2DeadLine = TInstant::Now() + TDuration::Seconds(2);
  140. TFunctionThread pusher1([&] {
  141. UNIT_ASSERT_VALUES_EQUAL(queue1.Push(1, pusher1DeadLine), false);
  142. UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
  143. });
  144. TFunctionThread pusher2([&] {
  145. UNIT_ASSERT_VALUES_EQUAL(queue2.Push(2, pusher2DeadLine), true);
  146. UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
  147. });
  148. pusher1.Start();
  149. pusher2.Start();
  150. Sleep(TDuration::Seconds(1));
  151. queue1.Pop();
  152. queue2.Pop();
  153. Sleep(TDuration::Seconds(1));
  154. queue1.Stop();
  155. queue2.Stop();
  156. pusher1.Join();
  157. pusher2.Join();
  158. }
  159. }