lfstack_ut.cpp 9.0 KB


  1. #include "lfstack.h"
  2. #include <library/cpp/testing/unittest/registar.h>
  3. #include <library/cpp/threading/future/legacy_future.h>
  4. #include <util/generic/deque.h>
  5. #include <util/system/event.h>
  6. #include <atomic>
  7. Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
  8. class TCountDownLatch {
  9. private:
  10. std::atomic<size_t> Current_;
  11. TSystemEvent EventObject_;
  12. public:
  13. TCountDownLatch(unsigned initial)
  14. : Current_(initial)
  15. {
  16. }
  17. void CountDown() {
  18. if (--Current_ == 0) {
  19. EventObject_.Signal();
  20. }
  21. }
  22. void Await() {
  23. EventObject_.Wait();
  24. }
  25. bool Await(TDuration timeout) {
  26. return EventObject_.WaitT(timeout);
  27. }
  28. };
  29. template <bool SingleConsumer>
  30. struct TDequeueAllTester {
  31. size_t EnqueueThreads;
  32. size_t DequeueThreads;
  33. size_t EnqueuesPerThread;
  34. std::atomic<size_t> LeftToDequeue;
  35. TCountDownLatch StartLatch;
  36. TLockFreeStack<int> Stack;
  37. TDequeueAllTester()
  38. : EnqueueThreads(4)
  39. , DequeueThreads(SingleConsumer ? 1 : 3)
  40. , EnqueuesPerThread(100000)
  41. , LeftToDequeue(EnqueueThreads * EnqueuesPerThread)
  42. , StartLatch(EnqueueThreads + DequeueThreads)
  43. {
  44. }
  45. void Enqueuer() {
  46. StartLatch.CountDown();
  47. StartLatch.Await();
  48. for (size_t i = 0; i < EnqueuesPerThread; ++i) {
  49. Stack.Enqueue(i);
  50. }
  51. }
  52. void DequeuerAll() {
  53. StartLatch.CountDown();
  54. StartLatch.Await();
  55. TVector<int> temp;
  56. while (LeftToDequeue.load() > 0) {
  57. size_t dequeued = 0;
  58. for (size_t i = 0; i < 100; ++i) {
  59. temp.clear();
  60. if (SingleConsumer) {
  61. Stack.DequeueAllSingleConsumer(&temp);
  62. } else {
  63. Stack.DequeueAll(&temp);
  64. }
  65. dequeued += temp.size();
  66. }
  67. LeftToDequeue -= dequeued;
  68. }
  69. }
  70. void Run() {
  71. TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<>>> futures;
  72. for (size_t i = 0; i < EnqueueThreads; ++i) {
  73. futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester<SingleConsumer>::Enqueuer, this)));
  74. }
  75. for (size_t i = 0; i < DequeueThreads; ++i) {
  76. futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester<SingleConsumer>::DequeuerAll, this)));
  77. }
  78. // effectively join
  79. futures.clear();
  80. UNIT_ASSERT_VALUES_EQUAL(0, int(LeftToDequeue.load()));
  81. TVector<int> left;
  82. Stack.DequeueAll(&left);
  83. UNIT_ASSERT(left.empty());
  84. }
  85. };
  86. Y_UNIT_TEST(TestDequeueAll) {
  87. TDequeueAllTester<false>().Run();
  88. }
  89. Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
  90. TDequeueAllTester<true>().Run();
  91. }
  92. Y_UNIT_TEST(TestDequeueAllEmptyStack) {
  93. TLockFreeStack<int> stack;
  94. TVector<int> r;
  95. stack.DequeueAll(&r);
  96. UNIT_ASSERT(r.empty());
  97. }
  98. Y_UNIT_TEST(TestDequeueAllReturnsInReverseOrder) {
  99. TLockFreeStack<int> stack;
  100. stack.Enqueue(17);
  101. stack.Enqueue(19);
  102. stack.Enqueue(23);
  103. TVector<int> r;
  104. stack.DequeueAll(&r);
  105. UNIT_ASSERT_VALUES_EQUAL(size_t(3), r.size());
  106. UNIT_ASSERT_VALUES_EQUAL(23, r.at(0));
  107. UNIT_ASSERT_VALUES_EQUAL(19, r.at(1));
  108. UNIT_ASSERT_VALUES_EQUAL(17, r.at(2));
  109. }
  110. Y_UNIT_TEST(TestEnqueueAll) {
  111. TLockFreeStack<int> stack;
  112. TVector<int> v;
  113. TVector<int> expected;
  114. stack.EnqueueAll(v); // add empty
  115. v.push_back(2);
  116. v.push_back(3);
  117. v.push_back(5);
  118. expected.insert(expected.end(), v.begin(), v.end());
  119. stack.EnqueueAll(v);
  120. v.clear();
  121. stack.EnqueueAll(v); // add empty
  122. v.push_back(7);
  123. v.push_back(11);
  124. v.push_back(13);
  125. v.push_back(17);
  126. expected.insert(expected.end(), v.begin(), v.end());
  127. stack.EnqueueAll(v);
  128. TVector<int> actual;
  129. stack.DequeueAll(&actual);
  130. UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size());
  131. for (size_t i = 0; i < actual.size(); ++i) {
  132. UNIT_ASSERT_VALUES_EQUAL(expected.at(expected.size() - i - 1), actual.at(i));
  133. }
  134. }
  135. Y_UNIT_TEST(CleanInDestructor) {
  136. TSimpleSharedPtr<bool> p(new bool);
  137. UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
  138. {
  139. TLockFreeStack<TSimpleSharedPtr<bool>> stack;
  140. stack.Enqueue(p);
  141. stack.Enqueue(p);
  142. UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount());
  143. }
  144. UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount());
  145. }
  146. Y_UNIT_TEST(NoCopyTest) {
  147. static unsigned copied = 0;
  148. struct TCopyCount {
  149. TCopyCount(int) {
  150. }
  151. TCopyCount(const TCopyCount&) {
  152. ++copied;
  153. }
  154. TCopyCount(TCopyCount&&) {
  155. }
  156. TCopyCount& operator=(const TCopyCount&) {
  157. ++copied;
  158. return *this;
  159. }
  160. TCopyCount& operator=(TCopyCount&&) {
  161. return *this;
  162. }
  163. };
  164. TLockFreeStack<TCopyCount> stack;
  165. stack.Enqueue(TCopyCount(1));
  166. TCopyCount val(0);
  167. stack.Dequeue(&val);
  168. UNIT_ASSERT_VALUES_EQUAL(0, copied);
  169. }
  170. Y_UNIT_TEST(MoveOnlyTest) {
  171. TLockFreeStack<THolder<bool>> stack;
  172. stack.Enqueue(MakeHolder<bool>(true));
  173. THolder<bool> val;
  174. stack.Dequeue(&val);
  175. UNIT_ASSERT(val);
  176. UNIT_ASSERT_VALUES_EQUAL(true, *val);
  177. }
  178. template <class TTest>
  179. struct TMultiThreadTester {
  180. using ThisType = TMultiThreadTester<TTest>;
  181. size_t Threads;
  182. size_t OperationsPerThread;
  183. TCountDownLatch StartLatch;
  184. TLockFreeStack<typename TTest::ValueType> Stack;
  185. TMultiThreadTester()
  186. : Threads(10)
  187. , OperationsPerThread(100000)
  188. , StartLatch(Threads)
  189. {
  190. }
  191. void Worker() {
  192. StartLatch.CountDown();
  193. StartLatch.Await();
  194. TVector<typename TTest::ValueType> unused;
  195. for (size_t i = 0; i < OperationsPerThread; ++i) {
  196. switch (GetCycleCount() % 4) {
  197. case 0: {
  198. TTest::Enqueue(Stack, i);
  199. break;
  200. }
  201. case 1: {
  202. TTest::Dequeue(Stack);
  203. break;
  204. }
  205. case 2: {
  206. TTest::EnqueueAll(Stack);
  207. break;
  208. }
  209. case 3: {
  210. TTest::DequeueAll(Stack);
  211. break;
  212. }
  213. }
  214. }
  215. }
  216. void Run() {
  217. TDeque<NThreading::TLegacyFuture<>> futures;
  218. for (size_t i = 0; i < Threads; ++i) {
  219. futures.emplace_back(std::bind(&ThisType::Worker, this));
  220. }
  221. futures.clear();
  222. TTest::DequeueAll(Stack);
  223. }
  224. };
  225. struct TFreeListTest {
  226. using ValueType = int;
  227. static void Enqueue(TLockFreeStack<int>& stack, size_t i) {
  228. stack.Enqueue(static_cast<int>(i));
  229. }
  230. static void Dequeue(TLockFreeStack<int>& stack) {
  231. int value;
  232. stack.Dequeue(&value);
  233. }
  234. static void EnqueueAll(TLockFreeStack<int>& stack) {
  235. TVector<int> values(5);
  236. stack.EnqueueAll(values);
  237. }
  238. static void DequeueAll(TLockFreeStack<int>& stack) {
  239. TVector<int> value;
  240. stack.DequeueAll(&value);
  241. }
  242. };
  243. // Test for catching thread sanitizer problems
  244. Y_UNIT_TEST(TestFreeList) {
  245. TMultiThreadTester<TFreeListTest>().Run();
  246. }
  247. struct TMoveTest {
  248. using ValueType = THolder<int>;
  249. static void Enqueue(TLockFreeStack<ValueType>& stack, size_t i) {
  250. stack.Enqueue(MakeHolder<int>(static_cast<int>(i)));
  251. }
  252. static void Dequeue(TLockFreeStack<ValueType>& stack) {
  253. ValueType value;
  254. if (stack.Dequeue(&value)) {
  255. UNIT_ASSERT(value);
  256. }
  257. }
  258. static void EnqueueAll(TLockFreeStack<ValueType>& stack) {
  259. // there is no enqueAll with moving signature in LockFreeStack
  260. Enqueue(stack, 0);
  261. }
  262. static void DequeueAll(TLockFreeStack<ValueType>& stack) {
  263. TVector<ValueType> values;
  264. stack.DequeueAll(&values);
  265. for (auto& v : values) {
  266. UNIT_ASSERT(v);
  267. }
  268. }
  269. };
  270. // Test for catching thread sanitizer problems
  271. Y_UNIT_TEST(TesMultiThreadMove) {
  272. TMultiThreadTester<TMoveTest>().Run();
  273. }
  274. }