lfqueue_ut.cpp 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. #include <library/cpp/threading/future/future.h>
  2. #include <library/cpp/testing/unittest/registar.h>
  3. #include <util/generic/algorithm.h>
  4. #include <util/generic/vector.h>
  5. #include <util/generic/ptr.h>
  6. #include <util/thread/pool.h>
  7. #include "lfqueue.h"
  // Test payload that makes copies and moves distinguishable.
  //
  // * Copying (construction or assignment) transfers the value and stores the source marker
  //   increased by CopyMarkerShift, so a copied object never carries the original marker.
  // * Moving transfers the value and the marker unchanged and resets the source marker to 0.
  class TMoveTest {
  public:
      TMoveTest(int marker = 0, int value = 0)
          : Marker_(marker)
          , Value_(value)
      {
      }

      // Copy: same value, marker shifted by CopyMarkerShift.
      TMoveTest(const TMoveTest& other)
          : Marker_(other.Marker_ + CopyMarkerShift)
          , Value_(other.Value_)
      {
      }

      // Move: takes value and marker as is; the source marker becomes 0.
      // noexcept so that standard containers prefer the move over the copy.
      TMoveTest(TMoveTest&& other) noexcept
          : Marker_(other.Marker_)
          , Value_(other.Value_)
      {
          other.Marker_ = 0;
      }

      // Copy assignment: same contract as the copy constructor.
      // Note: self-assignment also shifts the marker, exactly as any other copy does.
      TMoveTest& operator=(const TMoveTest& other) {
          Value_ = other.Value_;
          Marker_ = other.Marker_ + CopyMarkerShift;
          return *this;
      }

      // Move assignment: same contract as the move constructor.
      TMoveTest& operator=(TMoveTest&& other) noexcept {
          // Without this guard a self-move would wipe the object's own marker.
          if (this != &other) {
              Value_ = other.Value_;
              Marker_ = other.Marker_;
              other.Marker_ = 0;
          }
          return *this;
      }

      int Marker() const {
          return Marker_;
      }

      int Value() const {
          return Value_;
      }

  private:
      // Amount added to the marker every time the object is copied.
      static constexpr int CopyMarkerShift = 1024;

      int Marker_ = 0;
      int Value_ = 0;
  };
  42. class TOperationsChecker {
  43. public:
  44. TOperationsChecker() {
  45. ++DefaultCtor_;
  46. }
  47. TOperationsChecker(TOperationsChecker&&) {
  48. ++MoveCtor_;
  49. }
  50. TOperationsChecker(const TOperationsChecker&) {
  51. ++CopyCtor_;
  52. }
  53. TOperationsChecker& operator=(TOperationsChecker&&) {
  54. ++MoveAssign_;
  55. return *this;
  56. }
  57. TOperationsChecker& operator=(const TOperationsChecker&) {
  58. ++CopyAssign_;
  59. return *this;
  60. }
  61. static void Check(int defaultCtor, int moveCtor, int copyCtor, int moveAssign, int copyAssign) {
  62. UNIT_ASSERT_VALUES_EQUAL(defaultCtor, DefaultCtor_);
  63. UNIT_ASSERT_VALUES_EQUAL(moveCtor, MoveCtor_);
  64. UNIT_ASSERT_VALUES_EQUAL(copyCtor, CopyCtor_);
  65. UNIT_ASSERT_VALUES_EQUAL(moveAssign, MoveAssign_);
  66. UNIT_ASSERT_VALUES_EQUAL(copyAssign, CopyAssign_);
  67. Clear();
  68. }
  69. private:
  70. static void Clear() {
  71. DefaultCtor_ = MoveCtor_ = CopyCtor_ = MoveAssign_ = CopyAssign_ = 0;
  72. }
  73. static int DefaultCtor_;
  74. static int MoveCtor_;
  75. static int CopyCtor_;
  76. static int MoveAssign_;
  77. static int CopyAssign_;
  78. };
  79. int TOperationsChecker::DefaultCtor_ = 0;
  80. int TOperationsChecker::MoveCtor_ = 0;
  81. int TOperationsChecker::CopyCtor_ = 0;
  82. int TOperationsChecker::MoveAssign_ = 0;
  83. int TOperationsChecker::CopyAssign_ = 0;
  84. Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
  85. Y_UNIT_TEST(TestMoveEnqueue) {
  86. TMoveTest value(0xFF, 0xAA);
  87. TMoveTest tmp;
  88. TLockFreeQueue<TMoveTest> queue;
  89. queue.Enqueue(value);
  90. UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0xFF);
  91. UNIT_ASSERT(queue.Dequeue(&tmp));
  92. UNIT_ASSERT_VALUES_UNEQUAL(tmp.Marker(), 0xFF);
  93. UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);
  94. queue.Enqueue(std::move(value));
  95. UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0);
  96. UNIT_ASSERT(queue.Dequeue(&tmp));
  97. UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);
  98. }
  99. Y_UNIT_TEST(TestSimpleEnqueueDequeue) {
  100. TLockFreeQueue<int> queue;
  101. int i = -1;
  102. UNIT_ASSERT(!queue.Dequeue(&i));
  103. UNIT_ASSERT_VALUES_EQUAL(i, -1);
  104. queue.Enqueue(10);
  105. queue.Enqueue(11);
  106. queue.Enqueue(12);
  107. UNIT_ASSERT(queue.Dequeue(&i));
  108. UNIT_ASSERT_VALUES_EQUAL(10, i);
  109. UNIT_ASSERT(queue.Dequeue(&i));
  110. UNIT_ASSERT_VALUES_EQUAL(11, i);
  111. queue.Enqueue(13);
  112. UNIT_ASSERT(queue.Dequeue(&i));
  113. UNIT_ASSERT_VALUES_EQUAL(12, i);
  114. UNIT_ASSERT(queue.Dequeue(&i));
  115. UNIT_ASSERT_VALUES_EQUAL(13, i);
  116. UNIT_ASSERT(!queue.Dequeue(&i));
  117. const int tmp = 100;
  118. queue.Enqueue(tmp);
  119. UNIT_ASSERT(queue.Dequeue(&i));
  120. UNIT_ASSERT_VALUES_EQUAL(i, tmp);
  121. }
  122. Y_UNIT_TEST(TestSimpleEnqueueAllDequeue) {
  123. TLockFreeQueue<int> queue;
  124. int i = -1;
  125. UNIT_ASSERT(!queue.Dequeue(&i));
  126. UNIT_ASSERT_VALUES_EQUAL(i, -1);
  127. TVector<int> v;
  128. v.push_back(20);
  129. v.push_back(21);
  130. queue.EnqueueAll(v);
  131. v.clear();
  132. v.push_back(22);
  133. v.push_back(23);
  134. v.push_back(24);
  135. queue.EnqueueAll(v);
  136. v.clear();
  137. queue.EnqueueAll(v);
  138. v.clear();
  139. v.push_back(25);
  140. queue.EnqueueAll(v);
  141. for (int j = 20; j <= 25; ++j) {
  142. UNIT_ASSERT(queue.Dequeue(&i));
  143. UNIT_ASSERT_VALUES_EQUAL(j, i);
  144. }
  145. UNIT_ASSERT(!queue.Dequeue(&i));
  146. }
  147. void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
  148. size_t threadsNum = 4;
  149. size_t enqueuesPerThread = 10'000;
  150. TThreadPool p;
  151. p.Start(threadsNum, 0);
  152. TVector<NThreading::TFuture<void>> futures;
  153. for (size_t i = 0; i < threadsNum; ++i) {
  154. NThreading::TPromise<void> promise = NThreading::NewPromise();
  155. futures.emplace_back(promise.GetFuture());
  156. p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
  157. for (size_t i = 0; i != enqueuesPerThread; ++i) {
  158. queue.Enqueue(i);
  159. }
  160. promise.SetValue();
  161. });
  162. }
  163. std::atomic<size_t> elementsLeft = threadsNum * enqueuesPerThread;
  164. ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
  165. TVector<TVector<int>> dataBuckets(numOfConsumers);
  166. for (size_t i = 0; i < numOfConsumers; ++i) {
  167. NThreading::TPromise<void> promise = NThreading::NewPromise();
  168. futures.emplace_back(promise.GetFuture());
  169. p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
  170. TVector<int> vec;
  171. while (static_cast<i64>(elementsLeft.load()) > 0) {
  172. for (size_t i = 0; i != 100; ++i) {
  173. vec.clear();
  174. queue.DequeueAll(&vec);
  175. elementsLeft -= vec.size();
  176. consumerData->insert(consumerData->end(), vec.begin(), vec.end());
  177. }
  178. }
  179. promise.SetValue();
  180. });
  181. }
  182. NThreading::WaitExceptionOrAll(futures).GetValueSync();
  183. p.Stop();
  184. TVector<int> left;
  185. queue.DequeueAll(&left);
  186. UNIT_ASSERT(left.empty());
  187. TVector<int> data;
  188. for (auto& dataBucket : dataBuckets) {
  189. data.insert(data.end(), dataBucket.begin(), dataBucket.end());
  190. }
  191. UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
  192. size_t threadIdx = 0;
  193. size_t cntValue = 0;
  194. Sort(data.begin(), data.end());
  195. for (size_t i = 0; i != data.size(); ++i) {
  196. UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
  197. ++threadIdx;
  198. if (threadIdx == threadsNum) {
  199. ++cntValue;
  200. threadIdx = 0;
  201. }
  202. }
  203. }
  204. Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
  205. TLockFreeQueue<int> queue;
  206. DequeueAllRunner(queue, true);
  207. }
  208. Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
  209. TLockFreeQueue<int> queue;
  210. DequeueAllRunner(queue, false);
  211. }
  212. Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
  213. TLockFreeQueue<int> queue;
  214. TVector<int> vec;
  215. queue.DequeueAll(&vec);
  216. UNIT_ASSERT(vec.empty());
  217. }
  218. Y_UNIT_TEST(TestDequeueAllQueueOrder) {
  219. TLockFreeQueue<int> queue;
  220. queue.Enqueue(1);
  221. queue.Enqueue(2);
  222. queue.Enqueue(3);
  223. TVector<int> v;
  224. queue.DequeueAll(&v);
  225. UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
  226. UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
  227. UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
  228. UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
  229. }
  230. Y_UNIT_TEST(CleanInDestructor) {
  231. TSimpleSharedPtr<bool> p(new bool);
  232. UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
  233. {
  234. TLockFreeQueue<TSimpleSharedPtr<bool>> stack;
  235. stack.Enqueue(p);
  236. stack.Enqueue(p);
  237. UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount());
  238. }
  239. UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount());
  240. }
  241. Y_UNIT_TEST(CheckOperationsCount) {
  242. TOperationsChecker o;
  243. o.Check(1, 0, 0, 0, 0);
  244. TLockFreeQueue<TOperationsChecker> queue;
  245. o.Check(0, 0, 0, 0, 0);
  246. queue.Enqueue(std::move(o));
  247. o.Check(0, 1, 0, 0, 0);
  248. queue.Enqueue(o);
  249. o.Check(0, 0, 1, 0, 0);
  250. queue.Dequeue(&o);
  251. o.Check(0, 0, 2, 1, 0);
  252. }
  253. }