123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- #include "mutex.h"
- #include "guard.h"
- #include "condvar.h"
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/system/atomic.h>
- #include <util/system/atomic_ops.h>
- #include <util/thread/pool.h>
- class TCondVarTest: public TTestBase {
- UNIT_TEST_SUITE(TCondVarTest);
- UNIT_TEST(TestBasics)
- UNIT_TEST(TestSyncronize)
- UNIT_TEST_SUITE_END();
- struct TSharedData {
- TSharedData()
- : stopWaiting(false)
- , in(0)
- , out(0)
- , waited(0)
- , failed(false)
- {
- }
- TMutex mutex;
- TCondVar condVar1;
- TCondVar condVar2;
- TAtomic stopWaiting;
- TAtomic in;
- TAtomic out;
- TAtomic waited;
- bool failed;
- };
- class TThreadTask: public IObjectInQueue {
- public:
- using PFunc = void (TThreadTask::*)(void);
- TThreadTask(PFunc func, size_t id, size_t totalIds, TSharedData& data)
- : Func_(func)
- , Id_(id)
- , TotalIds_(totalIds)
- , Data_(data)
- {
- }
- void Process(void*) override {
- THolder<TThreadTask> This(this);
- (this->*Func_)();
- }
- #define FAIL_ASSERT(cond) \
- if (!(cond)) { \
- Data_.failed = true; \
- }
- void RunBasics() {
- Y_ASSERT(TotalIds_ == 3);
- if (Id_ < 2) {
- TGuard<TMutex> guard(Data_.mutex);
- while (!AtomicGet(Data_.stopWaiting)) {
- bool res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1));
- FAIL_ASSERT(res == true);
- }
- } else {
- usleep(100000);
- AtomicSet(Data_.stopWaiting, true);
- TGuard<TMutex> guard(Data_.mutex);
- Data_.condVar1.Signal();
- Data_.condVar1.Signal();
- }
- }
- void RunBasicsWithPredicate() {
- Y_ASSERT(TotalIds_ == 3);
- if (Id_ < 2) {
- TGuard<TMutex> guard(Data_.mutex);
- const auto res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1), [&] {
- return AtomicGet(Data_.stopWaiting);
- });
- FAIL_ASSERT(res == true);
- } else {
- usleep(100000);
- AtomicSet(Data_.stopWaiting, true);
- TGuard<TMutex> guard(Data_.mutex);
- Data_.condVar1.Signal();
- Data_.condVar1.Signal();
- }
- }
- void RunSyncronize() {
- for (size_t i = 0; i < 10; ++i) {
- TGuard<TMutex> guard(Data_.mutex);
- AtomicIncrement(Data_.in);
- if (AtomicGet(Data_.in) == TotalIds_) {
- AtomicSet(Data_.out, 0);
- Data_.condVar1.BroadCast();
- } else {
- AtomicIncrement(Data_.waited);
- while (AtomicGet(Data_.in) < TotalIds_) {
- bool res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1));
- FAIL_ASSERT(res == true);
- }
- }
- AtomicIncrement(Data_.out);
- if (AtomicGet(Data_.out) == TotalIds_) {
- AtomicSet(Data_.in, 0);
- Data_.condVar2.BroadCast();
- } else {
- while (AtomicGet(Data_.out) < TotalIds_) {
- bool res = Data_.condVar2.WaitT(Data_.mutex, TDuration::Seconds(1));
- FAIL_ASSERT(res == true);
- }
- }
- }
- FAIL_ASSERT(AtomicGet(Data_.waited) == (TotalIds_ - 1) * 10);
- }
- void RunSyncronizeWithPredicate() {
- for (size_t i = 0; i < 10; ++i) {
- TGuard<TMutex> guard(Data_.mutex);
- AtomicIncrement(Data_.in);
- if (AtomicGet(Data_.in) == TotalIds_) {
- AtomicSet(Data_.out, 0);
- Data_.condVar1.BroadCast();
- } else {
- AtomicIncrement(Data_.waited);
- const auto res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1), [&] {
- return AtomicGet(Data_.in) >= TotalIds_;
- });
- FAIL_ASSERT(res == true);
- }
- AtomicIncrement(Data_.out);
- if (AtomicGet(Data_.out) == TotalIds_) {
- AtomicSet(Data_.in, 0);
- Data_.condVar2.BroadCast();
- } else {
- const auto res = Data_.condVar2.WaitT(Data_.mutex, TDuration::Seconds(1), [&] {
- return AtomicGet(Data_.out) >= TotalIds_;
- });
- FAIL_ASSERT(res == true);
- }
- }
- FAIL_ASSERT(Data_.waited == (TotalIds_ - 1) * 10);
- }
- #undef FAIL_ASSERT
- private:
- PFunc Func_;
- size_t Id_;
- TAtomicBase TotalIds_;
- TSharedData& Data_;
- };
- private:
- #define RUN_CYCLE(what, count) \
- Q_.Start(count); \
- for (size_t i = 0; i < count; ++i) { \
- UNIT_ASSERT(Q_.Add(new TThreadTask(&TThreadTask::what, i, count, Data_))); \
- } \
- Q_.Stop(); \
- bool b = Data_.failed; \
- Data_.failed = false; \
- UNIT_ASSERT(!b);
- inline void TestBasics() {
- RUN_CYCLE(RunBasics, 3);
- }
- inline void TestBasicsWithPredicate() {
- RUN_CYCLE(RunBasicsWithPredicate, 3);
- }
- inline void TestSyncronize() {
- RUN_CYCLE(RunSyncronize, 6);
- }
- inline void TestSyncronizeWithPredicate() {
- RUN_CYCLE(RunSyncronizeWithPredicate, 6);
- }
- #undef RUN_CYCLE
- TSharedData Data_;
- TThreadPool Q_;
- };
- UNIT_TEST_SUITE_REGISTRATION(TCondVarTest);
|