#include "lfstack.h" #include #include #include #include #include Y_UNIT_TEST_SUITE(TLockFreeStackTests) { class TCountDownLatch { private: std::atomic Current_; TSystemEvent EventObject_; public: TCountDownLatch(unsigned initial) : Current_(initial) { } void CountDown() { if (--Current_ == 0) { EventObject_.Signal(); } } void Await() { EventObject_.Wait(); } bool Await(TDuration timeout) { return EventObject_.WaitT(timeout); } }; template struct TDequeueAllTester { size_t EnqueueThreads; size_t DequeueThreads; size_t EnqueuesPerThread; std::atomic LeftToDequeue; TCountDownLatch StartLatch; TLockFreeStack Stack; TDequeueAllTester() : EnqueueThreads(4) , DequeueThreads(SingleConsumer ? 1 : 3) , EnqueuesPerThread(100000) , LeftToDequeue(EnqueueThreads * EnqueuesPerThread) , StartLatch(EnqueueThreads + DequeueThreads) { } void Enqueuer() { StartLatch.CountDown(); StartLatch.Await(); for (size_t i = 0; i < EnqueuesPerThread; ++i) { Stack.Enqueue(i); } } void DequeuerAll() { StartLatch.CountDown(); StartLatch.Await(); TVector temp; while (LeftToDequeue.load() > 0) { size_t dequeued = 0; for (size_t i = 0; i < 100; ++i) { temp.clear(); if (SingleConsumer) { Stack.DequeueAllSingleConsumer(&temp); } else { Stack.DequeueAll(&temp); } dequeued += temp.size(); } LeftToDequeue -= dequeued; } } void Run() { TVector>> futures; for (size_t i = 0; i < EnqueueThreads; ++i) { futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester::Enqueuer, this))); } for (size_t i = 0; i < DequeueThreads; ++i) { futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester::DequeuerAll, this))); } // effectively join futures.clear(); UNIT_ASSERT_VALUES_EQUAL(0, int(LeftToDequeue.load())); TVector left; Stack.DequeueAll(&left); UNIT_ASSERT(left.empty()); } }; Y_UNIT_TEST(TestDequeueAll) { TDequeueAllTester().Run(); } Y_UNIT_TEST(TestDequeueAllSingleConsumer) { TDequeueAllTester().Run(); } Y_UNIT_TEST(TestDequeueAllEmptyStack) { TLockFreeStack stack; TVector r; stack.DequeueAll(&r); UNIT_ASSERT(r.empty()); } Y_UNIT_TEST(TestDequeueAllReturnsInReverseOrder) { TLockFreeStack stack; stack.Enqueue(17); stack.Enqueue(19); stack.Enqueue(23); TVector r; stack.DequeueAll(&r); UNIT_ASSERT_VALUES_EQUAL(size_t(3), r.size()); UNIT_ASSERT_VALUES_EQUAL(23, r.at(0)); UNIT_ASSERT_VALUES_EQUAL(19, r.at(1)); UNIT_ASSERT_VALUES_EQUAL(17, r.at(2)); } Y_UNIT_TEST(TestEnqueueAll) { TLockFreeStack stack; TVector v; TVector expected; stack.EnqueueAll(v); // add empty v.push_back(2); v.push_back(3); v.push_back(5); expected.insert(expected.end(), v.begin(), v.end()); stack.EnqueueAll(v); v.clear(); stack.EnqueueAll(v); // add empty v.push_back(7); v.push_back(11); v.push_back(13); v.push_back(17); expected.insert(expected.end(), v.begin(), v.end()); stack.EnqueueAll(v); TVector actual; stack.DequeueAll(&actual); UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size()); for (size_t i = 0; i < actual.size(); ++i) { UNIT_ASSERT_VALUES_EQUAL(expected.at(expected.size() - i - 1), actual.at(i)); } } Y_UNIT_TEST(CleanInDestructor) { TSimpleSharedPtr p(new bool); UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount()); { TLockFreeStack> stack; stack.Enqueue(p); stack.Enqueue(p); UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount()); } UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount()); } Y_UNIT_TEST(NoCopyTest) { static unsigned copied = 0; struct TCopyCount { TCopyCount(int) { } TCopyCount(const TCopyCount&) { ++copied; } TCopyCount(TCopyCount&&) { } TCopyCount& operator=(const TCopyCount&) { ++copied; return *this; } TCopyCount& operator=(TCopyCount&&) { return *this; } }; TLockFreeStack stack; stack.Enqueue(TCopyCount(1)); TCopyCount val(0); stack.Dequeue(&val); UNIT_ASSERT_VALUES_EQUAL(0, copied); } Y_UNIT_TEST(MoveOnlyTest) { TLockFreeStack> stack; stack.Enqueue(MakeHolder(true)); THolder val; stack.Dequeue(&val); UNIT_ASSERT(val); UNIT_ASSERT_VALUES_EQUAL(true, *val); } template struct TMultiThreadTester { using ThisType = TMultiThreadTester; size_t Threads; size_t OperationsPerThread; TCountDownLatch StartLatch; TLockFreeStack Stack; TMultiThreadTester() : Threads(10) , OperationsPerThread(100000) , StartLatch(Threads) { } void Worker() { StartLatch.CountDown(); StartLatch.Await(); TVector unused; for (size_t i = 0; i < OperationsPerThread; ++i) { switch (GetCycleCount() % 4) { case 0: { TTest::Enqueue(Stack, i); break; } case 1: { TTest::Dequeue(Stack); break; } case 2: { TTest::EnqueueAll(Stack); break; } case 3: { TTest::DequeueAll(Stack); break; } } } } void Run() { TDeque> futures; for (size_t i = 0; i < Threads; ++i) { futures.emplace_back(std::bind(&ThisType::Worker, this)); } futures.clear(); TTest::DequeueAll(Stack); } }; struct TFreeListTest { using ValueType = int; static void Enqueue(TLockFreeStack& stack, size_t i) { stack.Enqueue(static_cast(i)); } static void Dequeue(TLockFreeStack& stack) { int value; stack.Dequeue(&value); } static void EnqueueAll(TLockFreeStack& stack) { TVector values(5); stack.EnqueueAll(values); } static void DequeueAll(TLockFreeStack& stack) { TVector value; stack.DequeueAll(&value); } }; // Test for catching thread sanitizer problems Y_UNIT_TEST(TestFreeList) { TMultiThreadTester().Run(); } struct TMoveTest { using ValueType = THolder; static void Enqueue(TLockFreeStack& stack, size_t i) { stack.Enqueue(MakeHolder(static_cast(i))); } static void Dequeue(TLockFreeStack& stack) { ValueType value; if (stack.Dequeue(&value)) { UNIT_ASSERT(value); } } static void EnqueueAll(TLockFreeStack& stack) { // there is no enqueAll with moving signature in LockFreeStack Enqueue(stack, 0); } static void DequeueAll(TLockFreeStack& stack) { TVector values; stack.DequeueAll(&values); for (auto& v : values) { UNIT_ASSERT(v); } } }; // Test for catching thread sanitizer problems Y_UNIT_TEST(TesMultiThreadMove) { TMultiThreadTester().Run(); } } // Y_UNIT_TEST_SUITE(TLockFreeStackTests)