blocking_queue_ut.cpp 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. #include "blocking_queue.h"
  2. #include <library/cpp/iterator/enumerate.h>
  3. #include <library/cpp/testing/unittest/registar.h>
  4. #include <util/string/builder.h>
  5. #include <util/system/thread.h>
  6. namespace {
  7. class TFunctionThread: public ISimpleThread {
  8. public:
  9. using TFunc = std::function<void()>;
  10. private:
  11. TFunc Func;
  12. public:
  13. TFunctionThread(const TFunc& func)
  14. : Func(func)
  15. {
  16. }
  17. void* ThreadProc() noexcept override {
  18. Func();
  19. return nullptr;
  20. }
  21. };
  22. }
  23. IOutputStream& operator<<(IOutputStream& o, const TMaybe<int>& val) {
  24. if (val) {
  25. o << "TMaybe<int>(" << val.GetRef() << ')';
  26. } else {
  27. o << "TMaybe<int>()";
  28. }
  29. return o;
  30. }
  31. Y_UNIT_TEST_SUITE(BlockingQueueTest) {
  32. Y_UNIT_TEST(SimplePushPopTest) {
  33. const size_t limit = 100;
  34. NThreading::TBlockingQueue<int> queue(100);
  35. for (int i = 0; i != limit; ++i) {
  36. queue.Push(i);
  37. }
  38. for (int i = 0; i != limit; ++i) {
  39. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
  40. }
  41. UNIT_ASSERT(queue.Empty());
  42. }
  43. Y_UNIT_TEST(SimplePushDrainTest) {
  44. const size_t limit = 100;
  45. NThreading::TBlockingQueue<int> queue(100);
  46. for (int i = 0; i != limit; ++i) {
  47. queue.Push(i);
  48. }
  49. auto res = queue.Drain();
  50. UNIT_ASSERT_VALUES_EQUAL(queue.Empty(), true);
  51. UNIT_ASSERT_VALUES_EQUAL(res.size(), limit);
  52. for (auto [i, elem] : Enumerate(res)) {
  53. UNIT_ASSERT_VALUES_EQUAL(elem, i);
  54. }
  55. }
  56. Y_UNIT_TEST(SimpleStopTest) {
  57. const size_t limit = 100;
  58. NThreading::TBlockingQueue<int> queue(100);
  59. for (int i = 0; i != limit; ++i) {
  60. queue.Push(i);
  61. }
  62. queue.Stop();
  63. bool ok = queue.Push(100500);
  64. UNIT_ASSERT_VALUES_EQUAL(ok, false);
  65. for (int i = 0; i != limit; ++i) {
  66. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
  67. }
  68. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
  69. UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true);
  70. }
  71. Y_UNIT_TEST(BigPushPop) {
  72. const int limit = 100000;
  73. NThreading::TBlockingQueue<int> queue(10);
  74. TFunctionThread pusher([&] {
  75. for (int i = 0; i != limit; ++i) {
  76. if (!queue.Push(i)) {
  77. break;
  78. }
  79. }
  80. });
  81. pusher.Start();
  82. try {
  83. for (int i = 0; i != limit; ++i) {
  84. size_t size = queue.Size();
  85. UNIT_ASSERT_C(size <= 10, (TStringBuilder() << "Size exceeds 10: " << size).data());
  86. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
  87. }
  88. } catch (...) {
  89. // gracefull shutdown of pusher thread if assertion fails
  90. queue.Stop();
  91. throw;
  92. }
  93. pusher.Join();
  94. }
  95. Y_UNIT_TEST(StopWhenMultiplePoppers) {
  96. NThreading::TBlockingQueue<int> queue(10);
  97. TFunctionThread popper1([&] {
  98. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
  99. });
  100. TFunctionThread popper2([&] {
  101. UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
  102. });
  103. TFunctionThread drainer([&] {
  104. UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true);
  105. });
  106. popper1.Start();
  107. popper2.Start();
  108. drainer.Start();
  109. queue.Stop();
  110. popper1.Join();
  111. popper2.Join();
  112. drainer.Join();
  113. }
  114. Y_UNIT_TEST(StopWhenMultiplePushers) {
  115. NThreading::TBlockingQueue<int> queue(1);
  116. queue.Push(1);
  117. TFunctionThread pusher1([&] {
  118. UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false);
  119. });
  120. TFunctionThread pusher2([&] {
  121. UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false);
  122. });
  123. pusher1.Start();
  124. pusher2.Start();
  125. queue.Stop();
  126. pusher1.Join();
  127. pusher2.Join();
  128. }
  129. Y_UNIT_TEST(WakeUpAllProducers) {
  130. NThreading::TBlockingQueue<int> queue(2);
  131. queue.Push(1);
  132. queue.Push(2);
  133. TFunctionThread pusher1([&] {
  134. UNIT_ASSERT_VALUES_EQUAL(queue.Push(3), true);
  135. });
  136. TFunctionThread pusher2([&] {
  137. UNIT_ASSERT_VALUES_EQUAL(queue.Push(4), true);
  138. });
  139. pusher1.Start();
  140. pusher2.Start();
  141. queue.Drain();
  142. pusher1.Join();
  143. pusher2.Join();
  144. }
  145. Y_UNIT_TEST(InterruptPopByDeadline) {
  146. NThreading::TBlockingQueue<int> queue1(10);
  147. NThreading::TBlockingQueue<int> queue2(10);
  148. const auto popper1DeadLine = TInstant::Now();
  149. const auto popper2DeadLine = TInstant::Now() + TDuration::Seconds(2);
  150. TFunctionThread popper1([&] {
  151. UNIT_ASSERT_VALUES_EQUAL(queue1.Pop(popper1DeadLine), TMaybe<int>());
  152. UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
  153. });
  154. TFunctionThread popper2([&] {
  155. UNIT_ASSERT_VALUES_EQUAL(queue2.Pop(popper2DeadLine), 2);
  156. UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
  157. });
  158. popper1.Start();
  159. popper2.Start();
  160. Sleep(TDuration::Seconds(1));
  161. queue1.Push(1);
  162. queue2.Push(2);
  163. Sleep(TDuration::Seconds(1));
  164. queue1.Stop();
  165. queue2.Stop();
  166. popper1.Join();
  167. popper2.Join();
  168. }
  169. Y_UNIT_TEST(InterruptDrainByDeadline) {
  170. NThreading::TBlockingQueue<int> queue1(10);
  171. NThreading::TBlockingQueue<int> queue2(10);
  172. const auto drainer1DeadLine = TInstant::Now();
  173. const auto drainer2DeadLine = TInstant::Now() + TDuration::Seconds(2);
  174. TFunctionThread drainer1([&] {
  175. UNIT_ASSERT_VALUES_EQUAL(queue1.Drain(drainer1DeadLine).empty(), true);
  176. UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
  177. });
  178. TFunctionThread drainer2([&] {
  179. auto res = queue2.Drain(drainer2DeadLine);
  180. UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
  181. UNIT_ASSERT_VALUES_EQUAL(res.front(), 2);
  182. UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
  183. });
  184. drainer1.Start();
  185. drainer2.Start();
  186. Sleep(TDuration::Seconds(1));
  187. queue1.Push(1);
  188. queue2.Push(2);
  189. Sleep(TDuration::Seconds(1));
  190. queue1.Stop();
  191. queue2.Stop();
  192. drainer1.Join();
  193. drainer2.Join();
  194. }
  195. Y_UNIT_TEST(InterruptPushByDeadline) {
  196. NThreading::TBlockingQueue<int> queue1(1);
  197. NThreading::TBlockingQueue<int> queue2(1);
  198. queue1.Push(0);
  199. queue2.Push(0);
  200. const auto pusher1DeadLine = TInstant::Now();
  201. const auto pusher2DeadLine = TInstant::Now() + TDuration::Seconds(2);
  202. TFunctionThread pusher1([&] {
  203. UNIT_ASSERT_VALUES_EQUAL(queue1.Push(1, pusher1DeadLine), false);
  204. UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
  205. });
  206. TFunctionThread pusher2([&] {
  207. UNIT_ASSERT_VALUES_EQUAL(queue2.Push(2, pusher2DeadLine), true);
  208. UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
  209. });
  210. pusher1.Start();
  211. pusher2.Start();
  212. Sleep(TDuration::Seconds(1));
  213. queue1.Pop();
  214. queue2.Pop();
  215. Sleep(TDuration::Seconds(1));
  216. queue1.Stop();
  217. queue2.Stop();
  218. pusher1.Join();
  219. pusher2.Join();
  220. }
  221. }