123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- #include "equeue.h"
- #include <library/cpp/threading/equeue/fast/equeue.h>
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/system/event.h>
- #include <util/datetime/base.h>
- #include <util/generic/vector.h>
- Y_UNIT_TEST_SUITE(TElasticQueueTest) {
// Queue size limit handed to Queue->Start() by the test environment.
constexpr size_t MaxQueueSize = 20;
// Thread count handed to Queue->Start(); also the number of sender threads
// spawned by the concurrent test.
constexpr size_t ThreadCount = 10;
- template <typename T>
- THolder<T> MakeQueue();
- template <>
- THolder<TElasticQueue> MakeQueue() {
- return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>());
- }
- template <>
- THolder<TFastElasticQueue> MakeQueue() {
- return MakeHolder<TFastElasticQueue>();
- }
- template <typename T>
- struct TEnv {
- static inline THolder<T> Queue;
- struct TQueueSetup {
- TQueueSetup() {
- Queue.Reset(MakeQueue<T>());
- Queue->Start(ThreadCount, MaxQueueSize);
- }
- ~TQueueSetup() {
- Queue->Stop();
- }
- };
- };
- struct TCounters {
- void Reset() {
- Processed = Scheduled = Discarded = Total = 0;
- }
- TAtomic Processed;
- TAtomic Scheduled;
- TAtomic Discarded;
- TAtomic Total;
- };
- static TCounters Counters;
- //fill test -- fill queue with "endless" jobs
- TSystemEvent WaitEvent;
- template <typename T>
- void FillTest() {
- Counters.Reset();
- struct TWaitJob: public IObjectInQueue {
- void Process(void*) override {
- WaitEvent.Wait();
- AtomicIncrement(Counters.Processed);
- }
- } job;
- struct TLocalSetup: TEnv<T>::TQueueSetup {
- TLocalSetup() {
- WaitEvent.Reset();
- }
- ~TLocalSetup() {
- WaitEvent.Signal();
- }
- };
- size_t enqueued = 0;
- {
- TLocalSetup setup;
- while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
- ++enqueued;
- }
- UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
- UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount());
- }
- UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
- UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
- UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
- }
- Y_UNIT_TEST(FillTest) {
- FillTest<TElasticQueue>();
- }
- Y_UNIT_TEST(FillTestFast) {
- FillTest<TFastElasticQueue>();
- }
- //concurrent test -- send many jobs from different threads
- struct TJob: public IObjectInQueue {
- void Process(void*) override {
- AtomicIncrement(Counters.Processed);
- }
- };
- static TJob Job;
- template <typename T>
- static bool TryAdd() {
- AtomicIncrement(Counters.Total);
- if (TEnv<T>::Queue->Add(&Job)) {
- AtomicIncrement(Counters.Scheduled);
- return true;
- } else {
- AtomicIncrement(Counters.Discarded);
- return false;
- }
- }
- const size_t N = 100000;
- static size_t TryCounter;
- template <typename T>
- void ConcurrentTest() {
- Counters.Reset();
- TryCounter = 0;
- struct TSender: public IThreadFactory::IThreadAble {
- void DoExecute() override {
- while ((size_t)AtomicIncrement(TryCounter) <= N) {
- if (!TryAdd<T>()) {
- Sleep(TDuration::MicroSeconds(50));
- }
- }
- }
- } sender;
- {
- typename TEnv<T>::TQueueSetup setup;
- TVector< TAutoPtr<IThreadFactory::IThread> > senders;
- for (size_t i = 0; i < ThreadCount; ++i) {
- senders.push_back(::SystemThreadFactory()->Run(&sender));
- }
- for (size_t i = 0; i < senders.size(); ++i) {
- senders[i]->Join();
- }
- }
- UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N);
- UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
- UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
- }
- Y_UNIT_TEST(ConcurrentTest) {
- ConcurrentTest<TElasticQueue>();
- }
- Y_UNIT_TEST(ConcurrentTestFast) {
- ConcurrentTest<TFastElasticQueue>();
- }
- }
|