123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/system/thread.h>
- #include <algorithm>
- #include <util/generic/vector.h>
- #include <util/random/fast.h>
- #include "ut_helpers.h"
- template <typename TQueueType>
- class TTestUnorderedQueue: public TTestBase {
- private:
- using TLink = TIntrusiveLink;
- UNIT_TEST_SUITE_DEMANGLE(TTestUnorderedQueue<TQueueType>);
- UNIT_TEST(Push1M_Pop1M_Unordered)
- UNIT_TEST_SUITE_END();
- public:
- void Push1M_Pop1M_Unordered() {
- constexpr int REPEAT = 1000000;
- TQueueType queue;
- TLink msg[REPEAT];
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- for (int i = 0; i < REPEAT; ++i) {
- queue.Push(&msg[i]);
- }
- TVector<TLink*> popped;
- popped.reserve(REPEAT);
- for (int i = 0; i < REPEAT; ++i) {
- popped.push_back((TLink*)queue.Pop());
- }
- pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- std::sort(popped.begin(), popped.end());
- for (int i = 0; i < REPEAT; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]);
- }
- }
- };
- template <typename TQueueType>
- class TTestWeakQueue: public TTestBase {
- private:
- UNIT_TEST_SUITE_DEMANGLE(TTestWeakQueue<TQueueType>);
- UNIT_TEST(Threads8_Rnd_Exchange)
- UNIT_TEST_SUITE_END();
- public:
- template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000>
- void ManyThreadsRndExchange() {
- TQueueType queues[COUNT];
- class TWorker: public ISimpleThread {
- public:
- TWorker(
- TQueueType* queues_,
- ui16 mine,
- TAtomic* pushDone)
- : Queues(queues_)
- , MineQueue(mine)
- , PushDone(pushDone)
- {
- }
- TQueueType* Queues;
- ui16 MineQueue;
- TVector<uintptr_t> Received;
- TAtomic* PushDone;
- void* ThreadProc() override {
- TReallyFastRng32 rng(GetCycleCount());
- Received.reserve(MSG_COUNT * 2);
- for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) {
- for (;;) {
- auto msg = Queues[MineQueue].Pop();
- if (msg == nullptr) {
- break;
- }
- Received.push_back((uintptr_t)msg);
- }
- ui16 rnd = rng.GenRand64() % COUNT;
- ui64 msg = ((ui64)MineQueue << 32) + loop;
- while (!Queues[rnd].Push((void*)msg)) {
- }
- }
- AtomicIncrement(*PushDone);
- for (;;) {
- bool isItLast = AtomicGet(*PushDone) == COUNT;
- auto msg = Queues[MineQueue].Pop();
- if (msg != nullptr) {
- Received.push_back((uintptr_t)msg);
- } else {
- if (isItLast) {
- break;
- }
- SpinLockPause();
- }
- }
- for (ui64 last = 0;;) {
- auto msg = Queues[MineQueue].UnsafeScanningPop(&last);
- if (msg == nullptr) {
- break;
- }
- Received.push_back((uintptr_t)msg);
- }
- return nullptr;
- }
- };
- TVector<TAutoPtr<TWorker>> workers;
- TAtomic pushDone = 0;
- for (ui32 i = 0; i < COUNT; ++i) {
- workers.emplace_back(new TWorker(&queues[0], i, &pushDone));
- workers.back()->Start();
- }
- TVector<uintptr_t> all;
- for (ui32 i = 0; i < COUNT; ++i) {
- workers[i]->Join();
- all.insert(all.begin(),
- workers[i]->Received.begin(), workers[i]->Received.end());
- }
- std::sort(all.begin(), all.end());
- auto iter = all.begin();
- for (ui32 i = 0; i < COUNT; ++i) {
- for (ui32 k = 1; k <= MSG_COUNT; ++k) {
- UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter);
- ++iter;
- }
- }
- }
- void Threads8_Rnd_Exchange() {
- ManyThreadsRndExchange<8>();
- }
- };
- REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue);
- UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>);
|