lfqueue_ut.cpp 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. #include <library/cpp/threading/future/future.h>
  2. #include <library/cpp/testing/unittest/registar.h>
  3. #include <util/generic/algorithm.h>
  4. #include <util/generic/vector.h>
  5. #include <util/generic/ptr.h>
  6. #include <util/system/atomic.h>
  7. #include <util/thread/pool.h>
  8. #include "lfqueue.h"
  9. class TMoveTest {
  10. public:
  11. TMoveTest(int marker = 0, int value = 0)
  12. : Marker_(marker)
  13. , Value_(value)
  14. {
  15. }
  16. TMoveTest(const TMoveTest& other) {
  17. *this = other;
  18. }
  19. TMoveTest(TMoveTest&& other) {
  20. *this = std::move(other);
  21. }
  22. TMoveTest& operator=(const TMoveTest& other) {
  23. Value_ = other.Value_;
  24. Marker_ = other.Marker_ + 1024;
  25. return *this;
  26. }
  27. TMoveTest& operator=(TMoveTest&& other) {
  28. Value_ = other.Value_;
  29. Marker_ = other.Marker_;
  30. other.Marker_ = 0;
  31. return *this;
  32. }
  33. int Marker() const {
  34. return Marker_;
  35. }
  36. int Value() const {
  37. return Value_;
  38. }
  39. private:
  40. int Marker_ = 0;
  41. int Value_ = 0;
  42. };
  43. class TOperationsChecker {
  44. public:
  45. TOperationsChecker() {
  46. ++DefaultCtor_;
  47. }
  48. TOperationsChecker(TOperationsChecker&&) {
  49. ++MoveCtor_;
  50. }
  51. TOperationsChecker(const TOperationsChecker&) {
  52. ++CopyCtor_;
  53. }
  54. TOperationsChecker& operator=(TOperationsChecker&&) {
  55. ++MoveAssign_;
  56. return *this;
  57. }
  58. TOperationsChecker& operator=(const TOperationsChecker&) {
  59. ++CopyAssign_;
  60. return *this;
  61. }
  62. static void Check(int defaultCtor, int moveCtor, int copyCtor, int moveAssign, int copyAssign) {
  63. UNIT_ASSERT_VALUES_EQUAL(defaultCtor, DefaultCtor_);
  64. UNIT_ASSERT_VALUES_EQUAL(moveCtor, MoveCtor_);
  65. UNIT_ASSERT_VALUES_EQUAL(copyCtor, CopyCtor_);
  66. UNIT_ASSERT_VALUES_EQUAL(moveAssign, MoveAssign_);
  67. UNIT_ASSERT_VALUES_EQUAL(copyAssign, CopyAssign_);
  68. Clear();
  69. }
  70. private:
  71. static void Clear() {
  72. DefaultCtor_ = MoveCtor_ = CopyCtor_ = MoveAssign_ = CopyAssign_ = 0;
  73. }
  74. static int DefaultCtor_;
  75. static int MoveCtor_;
  76. static int CopyCtor_;
  77. static int MoveAssign_;
  78. static int CopyAssign_;
  79. };
  80. int TOperationsChecker::DefaultCtor_ = 0;
  81. int TOperationsChecker::MoveCtor_ = 0;
  82. int TOperationsChecker::CopyCtor_ = 0;
  83. int TOperationsChecker::MoveAssign_ = 0;
  84. int TOperationsChecker::CopyAssign_ = 0;
  85. Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
  86. Y_UNIT_TEST(TestMoveEnqueue) {
  87. TMoveTest value(0xFF, 0xAA);
  88. TMoveTest tmp;
  89. TLockFreeQueue<TMoveTest> queue;
  90. queue.Enqueue(value);
  91. UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0xFF);
  92. UNIT_ASSERT(queue.Dequeue(&tmp));
  93. UNIT_ASSERT_VALUES_UNEQUAL(tmp.Marker(), 0xFF);
  94. UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);
  95. queue.Enqueue(std::move(value));
  96. UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0);
  97. UNIT_ASSERT(queue.Dequeue(&tmp));
  98. UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);
  99. }
  100. Y_UNIT_TEST(TestSimpleEnqueueDequeue) {
  101. TLockFreeQueue<int> queue;
  102. int i = -1;
  103. UNIT_ASSERT(!queue.Dequeue(&i));
  104. UNIT_ASSERT_VALUES_EQUAL(i, -1);
  105. queue.Enqueue(10);
  106. queue.Enqueue(11);
  107. queue.Enqueue(12);
  108. UNIT_ASSERT(queue.Dequeue(&i));
  109. UNIT_ASSERT_VALUES_EQUAL(10, i);
  110. UNIT_ASSERT(queue.Dequeue(&i));
  111. UNIT_ASSERT_VALUES_EQUAL(11, i);
  112. queue.Enqueue(13);
  113. UNIT_ASSERT(queue.Dequeue(&i));
  114. UNIT_ASSERT_VALUES_EQUAL(12, i);
  115. UNIT_ASSERT(queue.Dequeue(&i));
  116. UNIT_ASSERT_VALUES_EQUAL(13, i);
  117. UNIT_ASSERT(!queue.Dequeue(&i));
  118. const int tmp = 100;
  119. queue.Enqueue(tmp);
  120. UNIT_ASSERT(queue.Dequeue(&i));
  121. UNIT_ASSERT_VALUES_EQUAL(i, tmp);
  122. }
  123. Y_UNIT_TEST(TestSimpleEnqueueAllDequeue) {
  124. TLockFreeQueue<int> queue;
  125. int i = -1;
  126. UNIT_ASSERT(!queue.Dequeue(&i));
  127. UNIT_ASSERT_VALUES_EQUAL(i, -1);
  128. TVector<int> v;
  129. v.push_back(20);
  130. v.push_back(21);
  131. queue.EnqueueAll(v);
  132. v.clear();
  133. v.push_back(22);
  134. v.push_back(23);
  135. v.push_back(24);
  136. queue.EnqueueAll(v);
  137. v.clear();
  138. queue.EnqueueAll(v);
  139. v.clear();
  140. v.push_back(25);
  141. queue.EnqueueAll(v);
  142. for (int j = 20; j <= 25; ++j) {
  143. UNIT_ASSERT(queue.Dequeue(&i));
  144. UNIT_ASSERT_VALUES_EQUAL(j, i);
  145. }
  146. UNIT_ASSERT(!queue.Dequeue(&i));
  147. }
  148. void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
  149. size_t threadsNum = 4;
  150. size_t enqueuesPerThread = 10'000;
  151. TThreadPool p;
  152. p.Start(threadsNum, 0);
  153. TVector<NThreading::TFuture<void>> futures;
  154. for (size_t i = 0; i < threadsNum; ++i) {
  155. NThreading::TPromise<void> promise = NThreading::NewPromise();
  156. futures.emplace_back(promise.GetFuture());
  157. p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
  158. for (size_t i = 0; i != enqueuesPerThread; ++i) {
  159. queue.Enqueue(i);
  160. }
  161. promise.SetValue();
  162. });
  163. }
  164. TAtomic elementsLeft;
  165. AtomicSet(elementsLeft, threadsNum * enqueuesPerThread);
  166. ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
  167. TVector<TVector<int>> dataBuckets(numOfConsumers);
  168. for (size_t i = 0; i < numOfConsumers; ++i) {
  169. NThreading::TPromise<void> promise = NThreading::NewPromise();
  170. futures.emplace_back(promise.GetFuture());
  171. p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
  172. TVector<int> vec;
  173. while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) {
  174. for (size_t i = 0; i != 100; ++i) {
  175. vec.clear();
  176. queue.DequeueAll(&vec);
  177. AtomicSub(elementsLeft, vec.size());
  178. consumerData->insert(consumerData->end(), vec.begin(), vec.end());
  179. }
  180. }
  181. promise.SetValue();
  182. });
  183. }
  184. NThreading::WaitExceptionOrAll(futures).GetValueSync();
  185. p.Stop();
  186. TVector<int> left;
  187. queue.DequeueAll(&left);
  188. UNIT_ASSERT(left.empty());
  189. TVector<int> data;
  190. for (auto& dataBucket : dataBuckets) {
  191. data.insert(data.end(), dataBucket.begin(), dataBucket.end());
  192. }
  193. UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
  194. size_t threadIdx = 0;
  195. size_t cntValue = 0;
  196. Sort(data.begin(), data.end());
  197. for (size_t i = 0; i != data.size(); ++i) {
  198. UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
  199. ++threadIdx;
  200. if (threadIdx == threadsNum) {
  201. ++cntValue;
  202. threadIdx = 0;
  203. }
  204. }
  205. }
  206. Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
  207. TLockFreeQueue<int> queue;
  208. DequeueAllRunner(queue, true);
  209. }
  210. Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
  211. TLockFreeQueue<int> queue;
  212. DequeueAllRunner(queue, false);
  213. }
  214. Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
  215. TLockFreeQueue<int> queue;
  216. TVector<int> vec;
  217. queue.DequeueAll(&vec);
  218. UNIT_ASSERT(vec.empty());
  219. }
  220. Y_UNIT_TEST(TestDequeueAllQueueOrder) {
  221. TLockFreeQueue<int> queue;
  222. queue.Enqueue(1);
  223. queue.Enqueue(2);
  224. queue.Enqueue(3);
  225. TVector<int> v;
  226. queue.DequeueAll(&v);
  227. UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
  228. UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
  229. UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
  230. UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
  231. }
  232. Y_UNIT_TEST(CleanInDestructor) {
  233. TSimpleSharedPtr<bool> p(new bool);
  234. UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
  235. {
  236. TLockFreeQueue<TSimpleSharedPtr<bool>> stack;
  237. stack.Enqueue(p);
  238. stack.Enqueue(p);
  239. UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount());
  240. }
  241. UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount());
  242. }
  243. Y_UNIT_TEST(CheckOperationsCount) {
  244. TOperationsChecker o;
  245. o.Check(1, 0, 0, 0, 0);
  246. TLockFreeQueue<TOperationsChecker> queue;
  247. o.Check(0, 0, 0, 0, 0);
  248. queue.Enqueue(std::move(o));
  249. o.Check(0, 1, 0, 0, 0);
  250. queue.Enqueue(o);
  251. o.Check(0, 0, 1, 0, 0);
  252. queue.Dequeue(&o);
  253. o.Check(0, 0, 2, 1, 0);
  254. }
  255. }