queue_ut.cpp 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include <util/system/thread.h>
  3. #include "ut_helpers.h"
  4. typedef void* TMsgLink;
  5. template <typename TQueueType>
  6. class TQueueTestProcs: public TTestBase {
  7. private:
  8. UNIT_TEST_SUITE_DEMANGLE(TQueueTestProcs<TQueueType>);
  9. UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
  10. UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
  11. UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
  12. /*
  13. UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
  14. UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
  15. UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
  16. */
  17. UNIT_TEST_SUITE_END();
  18. public:
  19. void Push1M_Pop1M() {
  20. TQueueType queue;
  21. TMsgLink msg = &queue;
  22. auto pmsg = queue.Pop();
  23. UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
  24. for (int i = 0; i < 1000000; ++i) {
  25. queue.Push((char*)msg + i);
  26. }
  27. for (int i = 0; i < 1000000; ++i) {
  28. auto popped = queue.Pop();
  29. UNIT_ASSERT_EQUAL((char*)msg + i, popped);
  30. }
  31. pmsg = queue.Pop();
  32. UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
  33. }
  34. void Threads2_Push1M_Threads1_Pop2M() {
  35. TQueueType queue;
  36. class TPusherThread: public ISimpleThread {
  37. public:
  38. TPusherThread(TQueueType& theQueue, char* start)
  39. : Queue(theQueue)
  40. , Arg(start)
  41. {
  42. }
  43. TQueueType& Queue;
  44. char* Arg;
  45. void* ThreadProc() override {
  46. for (int i = 0; i < 1000000; ++i) {
  47. Queue.Push(Arg + i);
  48. }
  49. return nullptr;
  50. }
  51. };
  52. TPusherThread pusher1(queue, (char*)&queue);
  53. TPusherThread pusher2(queue, (char*)&queue + 2000000);
  54. pusher1.Start();
  55. pusher2.Start();
  56. for (int i = 0; i < 2000000; ++i) {
  57. while (queue.Pop() == nullptr) {
  58. SpinLockPause();
  59. }
  60. }
  61. auto pmsg = queue.Pop();
  62. UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
  63. }
  64. void Threads4_Push1M_Threads1_Pop4M() {
  65. TQueueType queue;
  66. class TPusherThread: public ISimpleThread {
  67. public:
  68. TPusherThread(TQueueType& theQueue, char* start)
  69. : Queue(theQueue)
  70. , Arg(start)
  71. {
  72. }
  73. TQueueType& Queue;
  74. char* Arg;
  75. void* ThreadProc() override {
  76. for (int i = 0; i < 1000000; ++i) {
  77. Queue.Push(Arg + i);
  78. }
  79. return nullptr;
  80. }
  81. };
  82. TPusherThread pusher1(queue, (char*)&queue);
  83. TPusherThread pusher2(queue, (char*)&queue + 2000000);
  84. TPusherThread pusher3(queue, (char*)&queue + 4000000);
  85. TPusherThread pusher4(queue, (char*)&queue + 6000000);
  86. pusher1.Start();
  87. pusher2.Start();
  88. pusher3.Start();
  89. pusher4.Start();
  90. for (int i = 0; i < 4000000; ++i) {
  91. while (queue.Pop() == nullptr) {
  92. SpinLockPause();
  93. }
  94. }
  95. auto pmsg = queue.Pop();
  96. UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
  97. }
  98. template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>
  99. void ManyRndPush100K_ManyQueues() {
  100. TQueueType queue[NUMBER_OF_QUEUES];
  101. class TPusherThread: public ISimpleThread {
  102. public:
  103. TPusherThread(TQueueType* queues, char* start)
  104. : Queues(queues)
  105. , Arg(start)
  106. {
  107. }
  108. TQueueType* Queues;
  109. char* Arg;
  110. void* ThreadProc() override {
  111. ui64 counters[NUMBER_OF_QUEUES];
  112. for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
  113. counters[i] = 0;
  114. }
  115. for (int i = 0; i < 100000; ++i) {
  116. size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;
  117. int cookie = counters[rnd]++;
  118. Queues[rnd].Push(Arg + cookie);
  119. }
  120. for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
  121. Queues[i].Push((void*)2ULL);
  122. }
  123. return nullptr;
  124. }
  125. };
  126. class TPopperThread: public ISimpleThread {
  127. public:
  128. TPopperThread(TQueueType* theQueue, char* base)
  129. : Queue(theQueue)
  130. , Base(base)
  131. {
  132. }
  133. TQueueType* Queue;
  134. char* Base;
  135. void* ThreadProc() override {
  136. ui64 counters[NUMBER_OF_PUSHERS];
  137. for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
  138. counters[i] = 0;
  139. }
  140. for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {
  141. auto msg = Queue->Pop();
  142. if (msg == nullptr) {
  143. SpinLockPause();
  144. continue;
  145. }
  146. if (msg == (void*)2ULL) {
  147. ++fin;
  148. continue;
  149. }
  150. ui64 shift = (char*)msg - Base;
  151. auto pusherNum = shift / 200000000ULL;
  152. auto msgNum = shift % 200000000ULL;
  153. UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);
  154. ++counters[pusherNum];
  155. }
  156. auto pmsg = Queue->Pop();
  157. UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
  158. return nullptr;
  159. }
  160. };
  161. TVector<TAutoPtr<TPopperThread>> poppers;
  162. TVector<TAutoPtr<TPusherThread>> pushers;
  163. for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
  164. poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));
  165. poppers.back()->Start();
  166. }
  167. for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
  168. pushers.emplace_back(
  169. new TPusherThread(queue, (char*)&queue + 200000000ULL * i));
  170. pushers.back()->Start();
  171. }
  172. for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
  173. poppers[i]->Join();
  174. }
  175. for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
  176. pushers[i]->Join();
  177. }
  178. }
  179. void Threads8_RndPush100K_Threads8_Queues() {
  180. ManyRndPush100K_ManyQueues<8, 8>();
  181. }
  182. /*
  183. void Threads24_RndPush100K_Threads24_Queues() {
  184. ManyRndPush100K_ManyQueues<24, 24>();
  185. }
  186. void Threads24_RndPush100K_Threads8_Queues() {
  187. ManyRndPush100K_ManyQueues<24, 8>();
  188. }
  189. void Threads24_RndPush100K_Threads4_Queues() {
  190. ManyRndPush100K_ManyQueues<24, 4>();
  191. }
  192. */
  193. };
  194. REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs);