123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/system/thread.h>
- #include "ut_helpers.h"
// Untyped message handle; the tests below only ever compare such pointers,
// they never dereference them.
using TMsgLink = void*;
- template <typename TQueueType>
- class TQueueTestProcs: public TTestBase {
- private:
- UNIT_TEST_SUITE_DEMANGLE(TQueueTestProcs<TQueueType>);
- UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
- UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
- UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
- /*
- UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
- */
- UNIT_TEST_SUITE_END();
- public:
- void Push1M_Pop1M() {
- TQueueType queue;
- TMsgLink msg = &queue;
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- for (int i = 0; i < 1000000; ++i) {
- queue.Push((char*)msg + i);
- }
- for (int i = 0; i < 1000000; ++i) {
- auto popped = queue.Pop();
- UNIT_ASSERT_EQUAL((char*)msg + i, popped);
- }
- pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
- void Threads2_Push1M_Threads1_Pop2M() {
- TQueueType queue;
- class TPusherThread: public ISimpleThread {
- public:
- TPusherThread(TQueueType& theQueue, char* start)
- : Queue(theQueue)
- , Arg(start)
- {
- }
- TQueueType& Queue;
- char* Arg;
- void* ThreadProc() override {
- for (int i = 0; i < 1000000; ++i) {
- Queue.Push(Arg + i);
- }
- return nullptr;
- }
- };
- TPusherThread pusher1(queue, (char*)&queue);
- TPusherThread pusher2(queue, (char*)&queue + 2000000);
- pusher1.Start();
- pusher2.Start();
- for (int i = 0; i < 2000000; ++i) {
- while (queue.Pop() == nullptr) {
- SpinLockPause();
- }
- }
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
- void Threads4_Push1M_Threads1_Pop4M() {
- TQueueType queue;
- class TPusherThread: public ISimpleThread {
- public:
- TPusherThread(TQueueType& theQueue, char* start)
- : Queue(theQueue)
- , Arg(start)
- {
- }
- TQueueType& Queue;
- char* Arg;
- void* ThreadProc() override {
- for (int i = 0; i < 1000000; ++i) {
- Queue.Push(Arg + i);
- }
- return nullptr;
- }
- };
- TPusherThread pusher1(queue, (char*)&queue);
- TPusherThread pusher2(queue, (char*)&queue + 2000000);
- TPusherThread pusher3(queue, (char*)&queue + 4000000);
- TPusherThread pusher4(queue, (char*)&queue + 6000000);
- pusher1.Start();
- pusher2.Start();
- pusher3.Start();
- pusher4.Start();
- for (int i = 0; i < 4000000; ++i) {
- while (queue.Pop() == nullptr) {
- SpinLockPause();
- }
- }
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
- template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>
- void ManyRndPush100K_ManyQueues() {
- TQueueType queue[NUMBER_OF_QUEUES];
- class TPusherThread: public ISimpleThread {
- public:
- TPusherThread(TQueueType* queues, char* start)
- : Queues(queues)
- , Arg(start)
- {
- }
- TQueueType* Queues;
- char* Arg;
- void* ThreadProc() override {
- ui64 counters[NUMBER_OF_QUEUES];
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- counters[i] = 0;
- }
- for (int i = 0; i < 100000; ++i) {
- size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;
- int cookie = counters[rnd]++;
- Queues[rnd].Push(Arg + cookie);
- }
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- Queues[i].Push((void*)2ULL);
- }
- return nullptr;
- }
- };
- class TPopperThread: public ISimpleThread {
- public:
- TPopperThread(TQueueType* theQueue, char* base)
- : Queue(theQueue)
- , Base(base)
- {
- }
- TQueueType* Queue;
- char* Base;
- void* ThreadProc() override {
- ui64 counters[NUMBER_OF_PUSHERS];
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- counters[i] = 0;
- }
- for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {
- auto msg = Queue->Pop();
- if (msg == nullptr) {
- SpinLockPause();
- continue;
- }
- if (msg == (void*)2ULL) {
- ++fin;
- continue;
- }
- ui64 shift = (char*)msg - Base;
- auto pusherNum = shift / 200000000ULL;
- auto msgNum = shift % 200000000ULL;
- UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);
- ++counters[pusherNum];
- }
- auto pmsg = Queue->Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- return nullptr;
- }
- };
- TVector<TAutoPtr<TPopperThread>> poppers;
- TVector<TAutoPtr<TPusherThread>> pushers;
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));
- poppers.back()->Start();
- }
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- pushers.emplace_back(
- new TPusherThread(queue, (char*)&queue + 200000000ULL * i));
- pushers.back()->Start();
- }
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- poppers[i]->Join();
- }
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- pushers[i]->Join();
- }
- }
- void Threads8_RndPush100K_Threads8_Queues() {
- ManyRndPush100K_ManyQueues<8, 8>();
- }
- /*
- void Threads24_RndPush100K_Threads24_Queues() {
- ManyRndPush100K_ManyQueues<24, 24>();
- }
- void Threads24_RndPush100K_Threads8_Queues() {
- ManyRndPush100K_ManyQueues<24, 8>();
- }
- void Threads24_RndPush100K_Threads4_Queues() {
- ManyRndPush100K_ManyQueues<24, 4>();
- }
- */
- };
// Hands the TQueueTestProcs suite template to REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES.
// NOTE(review): the macro is not defined in this file; presumably it comes from
// ut_helpers.h and instantiates the suite once per ordered queue type — confirm there.
- REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs);
|