#include #include #include #include #include #include "ut_helpers.h" template class TTestUnorderedQueue: public TTestBase { private: using TLink = TIntrusiveLink; UNIT_TEST_SUITE_DEMANGLE(TTestUnorderedQueue); UNIT_TEST(Push1M_Pop1M_Unordered) UNIT_TEST_SUITE_END(); public: void Push1M_Pop1M_Unordered() { constexpr int REPEAT = 1000000; TQueueType queue; TLink msg[REPEAT]; auto pmsg = queue.Pop(); UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); for (int i = 0; i < REPEAT; ++i) { queue.Push(&msg[i]); } TVector popped; popped.reserve(REPEAT); for (int i = 0; i < REPEAT; ++i) { popped.push_back((TLink*)queue.Pop()); } pmsg = queue.Pop(); UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); std::sort(popped.begin(), popped.end()); for (int i = 0; i < REPEAT; ++i) { UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]); } } }; template class TTestWeakQueue: public TTestBase { private: UNIT_TEST_SUITE_DEMANGLE(TTestWeakQueue); UNIT_TEST(Threads8_Rnd_Exchange) UNIT_TEST_SUITE_END(); public: template void ManyThreadsRndExchange() { TQueueType queues[COUNT]; class TWorker: public ISimpleThread { public: TWorker( TQueueType* queues_, ui16 mine, TAtomic* pushDone) : Queues(queues_) , MineQueue(mine) , PushDone(pushDone) { } TQueueType* Queues; ui16 MineQueue; TVector Received; TAtomic* PushDone; void* ThreadProc() override { TReallyFastRng32 rng(GetCycleCount()); Received.reserve(MSG_COUNT * 2); for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) { for (;;) { auto msg = Queues[MineQueue].Pop(); if (msg == nullptr) { break; } Received.push_back((uintptr_t)msg); } ui16 rnd = rng.GenRand64() % COUNT; ui64 msg = ((ui64)MineQueue << 32) + loop; while (!Queues[rnd].Push((void*)msg)) { } } AtomicIncrement(*PushDone); for (;;) { bool isItLast = AtomicGet(*PushDone) == COUNT; auto msg = Queues[MineQueue].Pop(); if (msg != nullptr) { Received.push_back((uintptr_t)msg); } else { if (isItLast) { break; } SpinLockPause(); } } for (ui64 last = 0;;) { auto msg = Queues[MineQueue].UnsafeScanningPop(&last); if (msg == nullptr) { break; } Received.push_back((uintptr_t)msg); } return nullptr; } }; TVector> workers; TAtomic pushDone = 0; for (ui32 i = 0; i < COUNT; ++i) { workers.emplace_back(new TWorker(&queues[0], i, &pushDone)); workers.back()->Start(); } TVector all; for (ui32 i = 0; i < COUNT; ++i) { workers[i]->Join(); all.insert(all.begin(), workers[i]->Received.begin(), workers[i]->Received.end()); } std::sort(all.begin(), all.end()); auto iter = all.begin(); for (ui32 i = 0; i < COUNT; ++i) { for (ui32 k = 1; k <= MSG_COUNT; ++k) { UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter); ++iter; } } } void Threads8_Rnd_Exchange() { ManyThreadsRndExchange<8>(); } }; REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue); UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue);