123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050 |
- #include "impl.h"
- #include "condvar.h"
- #include "network.h"
- #include <library/cpp/deprecated/atomic/atomic.h>
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/string/cast.h>
- #include <util/system/pipe.h>
- #include <util/system/env.h>
- #include <util/system/info.h>
- #include <util/system/thread.h>
- #include <util/generic/xrange.h>
- #include <util/generic/serialized_enum.h>
- // TODO (velavokr): BALANCER-1345 add more tests on pollers
- class TCoroTest: public TTestBase {
- UNIT_TEST_SUITE(TCoroTest);
- UNIT_TEST(TestSimpleX1);
- UNIT_TEST(TestSimpleX1MultiThread);
- UNIT_TEST(TestSimpleX2);
- UNIT_TEST(TestSimpleX3);
- UNIT_TEST(TestMemFun);
- UNIT_TEST(TestMutex);
- UNIT_TEST(TestCondVar);
- UNIT_TEST(TestJoinDefault);
- UNIT_TEST(TestJoinEpoll);
- UNIT_TEST(TestJoinKqueue);
- UNIT_TEST(TestJoinPoll);
- UNIT_TEST(TestJoinSelect);
- UNIT_TEST(TestException);
- UNIT_TEST(TestJoinCancelExitRaceBug);
- UNIT_TEST(TestWaitWakeLivelockBug);
- // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
- // UNIT_TEST(TestFastPathWakeDefault)
- // UNIT_TEST(TestFastPathWakeEpoll)
- UNIT_TEST(TestFastPathWakeKqueue)
- UNIT_TEST(TestFastPathWakePoll)
- UNIT_TEST(TestFastPathWakeSelect)
- UNIT_TEST(TestLegacyCancelYieldRaceBug)
- UNIT_TEST(TestJoinRescheduleBug);
- UNIT_TEST(TestEventQueue)
- UNIT_TEST(TestNestedExecutor)
- UNIT_TEST(TestComputeCoroutineYield)
- UNIT_TEST(TestPollEngines);
- UNIT_TEST(TestUserEvent);
- UNIT_TEST(TestPause);
- UNIT_TEST(TestOverrideTime);
- UNIT_TEST(TestCancelWithException);
- UNIT_TEST_SUITE_END();
- public:
- void TestException();
- void TestSimpleX1();
- void TestSimpleX1MultiThread();
- void TestSimpleX2();
- void TestSimpleX3();
- void TestMemFun();
- void TestMutex();
- void TestCondVar();
- void TestJoinDefault();
- void TestJoinEpoll();
- void TestJoinKqueue();
- void TestJoinPoll();
- void TestJoinSelect();
- void TestJoinCancelExitRaceBug();
- void TestWaitWakeLivelockBug();
- void TestFastPathWakeDefault();
- void TestFastPathWakeEpoll();
- void TestFastPathWakeKqueue();
- void TestFastPathWakePoll();
- void TestFastPathWakeSelect();
- void TestLegacyCancelYieldRaceBug();
- void TestJoinRescheduleBug();
- void TestEventQueue();
- void TestNestedExecutor();
- void TestComputeCoroutineYield();
- void TestPollEngines();
- void TestUserEvent();
- void TestPause();
- void TestOverrideTime();
- void TestCancelWithException();
- };
- void TCoroTest::TestException() {
- TContExecutor e(1000000);
- bool f2run = false;
- auto f1 = [&f2run](TCont* c) {
- struct TCtx {
- ~TCtx() {
- Y_ABORT_UNLESS(!*F2);
- C->Yield();
- }
- TCont* C;
- bool* F2;
- };
- try {
- TCtx ctx = {c, &f2run};
- throw 1;
- } catch (...) {
- }
- };
- bool unc = true;
- auto f2 = [&unc, &f2run](TCont*) {
- f2run = true;
- unc = std::uncaught_exception();
- // check segfault
- try {
- throw 2;
- } catch (int) {
- }
- };
- e.Create(f1, "f1");
- e.Create(f2, "f2");
- e.Execute();
- UNIT_ASSERT(!unc);
- }
- static int i0;
- static void CoRun(TCont* c, void* /*run*/) {
- while (i0 < 100000) {
- ++i0;
- UNIT_ASSERT(RunningCont() == c);
- c->Yield();
- UNIT_ASSERT(RunningCont() == c);
- }
- }
- static void CoMain(TCont* c, void* /*arg*/) {
- for (volatile size_t i2 = 0; i2 < 10; ++i2) {
- UNIT_ASSERT(RunningCont() == c);
- c->Executor()->Create(CoRun, nullptr, "run");
- UNIT_ASSERT(RunningCont() == c);
- }
- }
- void TCoroTest::TestSimpleX1() {
- i0 = 0;
- TContExecutor e(32000);
- UNIT_ASSERT(RunningCont() == nullptr);
- e.Execute(CoMain);
- UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
- UNIT_ASSERT(RunningCont() == nullptr);
- }
- void TCoroTest::TestSimpleX1MultiThread() {
- TVector<THolder<TThread>> threads;
- const size_t nThreads = 0;
- TAtomic c = 0;
- for (size_t i = 0; i < nThreads; ++i) {
- threads.push_back(MakeHolder<TThread>([&]() {
- TestSimpleX1();
- AtomicIncrement(c);
- }));
- }
- for (auto& t : threads) {
- t->Start();
- }
- for (auto& t: threads) {
- t->Join();
- }
- UNIT_ASSERT_EQUAL(c, nThreads);
- }
- struct TTestObject {
- int i = 0;
- int j = 0;
- public:
- void RunTask1(TCont*) {
- i = 1;
- }
- void RunTask2(TCont*) {
- j = 2;
- }
- };
- void TCoroTest::TestMemFun() {
- i0 = 0;
- TContExecutor e(32000);
- TTestObject obj;
- e.Create<TTestObject, &TTestObject::RunTask1>(&obj, "test1");
- e.Execute<TTestObject, &TTestObject::RunTask2>(&obj);
- UNIT_ASSERT_EQUAL(obj.i, 1);
- UNIT_ASSERT_EQUAL(obj.j, 2);
- }
- void TCoroTest::TestSimpleX2() {
- {
- i0 = 0;
- {
- TContExecutor e(32000);
- e.Execute(CoMain);
- }
- UNIT_ASSERT_EQUAL(i0, 100000);
- }
- {
- i0 = 0;
- {
- TContExecutor e(32000);
- e.Execute(CoMain);
- }
- UNIT_ASSERT_EQUAL(i0, 100000);
- }
- }
- struct TRunner {
- inline TRunner()
- : Runs(0)
- {
- }
- inline void operator()(TCont* c) {
- ++Runs;
- c->Yield();
- }
- size_t Runs;
- };
- void TCoroTest::TestSimpleX3() {
- TContExecutor e(32000);
- TRunner runner;
- for (volatile size_t i3 = 0; i3 < 1000; ++i3) {
- e.Create(runner, "runner");
- }
- e.Execute();
- UNIT_ASSERT_EQUAL(runner.Runs, 1000);
- }
- static TString res;
- static TContMutex mutex;
- static void CoMutex(TCont* c, void* /*run*/) {
- {
- mutex.LockI(c);
- c->Yield();
- res += c->Name();
- mutex.UnLock();
- }
- c->Yield();
- {
- mutex.LockI(c);
- c->Yield();
- res += c->Name();
- mutex.UnLock();
- }
- }
- static void CoMutexTest(TCont* c, void* /*run*/) {
- c->Executor()->Create(CoMutex, nullptr, "1");
- c->Executor()->Create(CoMutex, nullptr, "2");
- }
- void TCoroTest::TestMutex() {
- TContExecutor e(32000);
- e.Execute(CoMutexTest);
- UNIT_ASSERT_EQUAL(res, "1212");
- res.clear();
- }
- static TContMutex m1;
- static TContCondVar c1;
- static void CoCondVar(TCont* c, void* /*run*/) {
- for (size_t i4 = 0; i4 < 3; ++i4) {
- UNIT_ASSERT_EQUAL(m1.LockI(c), 0);
- UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0);
- res += c->Name();
- m1.UnLock();
- }
- }
- static void CoCondVarTest(TCont* c, void* /*run*/) {
- c->Executor()->Create(CoCondVar, nullptr, "1");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "2");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "3");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "4");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "5");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "6");
- c->Yield();
- for (size_t i5 = 0; i5 < 3; ++i5) {
- res += ToString((size_t)i5) + "^";
- c1.BroadCast();
- c->Yield();
- }
- }
- void TCoroTest::TestCondVar() {
- TContExecutor e(32000);
- e.Execute(CoCondVarTest);
- UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456");
- res.clear();
- }
- namespace NCoroTestJoin {
- struct TSleepCont {
- const TInstant Deadline;
- int Result;
- inline void operator()(TCont* c) {
- Result = c->SleepD(Deadline);
- }
- };
- struct TReadCont {
- const TInstant Deadline;
- const SOCKET Sock;
- int Result;
- inline void operator()(TCont* c) {
- char buf = 0;
- Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
- }
- };
- struct TJoinCont {
- const TInstant Deadline;
- TCont* const Cont;
- bool Result;
- inline void operator()(TCont* c) {
- Result = c->Join(Cont, Deadline);
- }
- };
- void DoTestJoin(EContPoller pollerType) {
- auto poller = IPollerFace::Construct(pollerType);
- if (!poller) {
- return;
- }
- TContExecutor e(32000, std::move(poller));
- TPipe in, out;
- TPipe::Pipe(in, out);
- SetNonBlock(in.GetHandle());
- {
- TSleepCont sc = {TInstant::Max(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- {
- TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
- UNIT_ASSERT_EQUAL(jc.Result, true);
- }
- {
- TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- {
- TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- {
- TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
- UNIT_ASSERT_EQUAL(jc.Result, true);
- }
- {
- TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- }
- }
- void TCoroTest::TestJoinDefault() {
- NCoroTestJoin::DoTestJoin(EContPoller::Default);
- }
- void TCoroTest::TestJoinEpoll() {
- NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
- }
- void TCoroTest::TestJoinKqueue() {
- NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
- }
- void TCoroTest::TestJoinPoll() {
- NCoroTestJoin::DoTestJoin(EContPoller::Poll);
- }
- void TCoroTest::TestJoinSelect() {
- NCoroTestJoin::DoTestJoin(EContPoller::Select);
- }
- namespace NCoroJoinCancelExitRaceBug {
- struct TState {
- TCont* Sub = nullptr;
- };
- static void DoAux(TCont*, void* argPtr) noexcept {
- TState& state = *(TState*)(argPtr);
- // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
- state.Sub->Cancel();
- }
- static void DoSub2(TCont*, void*) noexcept {
- // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
- // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
- }
- static void DoSub(TCont* cont, void* argPtr) noexcept {
- TState& state = *(TState*)(argPtr);
- state.Sub = cont;
- // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
- auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
- // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
- // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
- // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
- cont->Join(sub2);
- state.Sub = nullptr;
- }
- static void DoMain(TCont* cont) noexcept {
- TState state;
- // 01.{Ready:[]} > {Ready:[Sub]}
- auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
- // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
- cont->Executor()->Create(DoAux, &state, "Aux");
- // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
- cont->Join(sub);
- }
- }
- void TCoroTest::TestJoinCancelExitRaceBug() {
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
- }
- namespace NCoroWaitWakeLivelockBug {
- struct TState;
- struct TSubState {
- TSubState(TState& parent, ui32 self)
- : Parent(parent)
- , Name(TStringBuilder() << "Sub" << self)
- , Self(self)
- {
- UNIT_ASSERT(self < 2);
- }
- TSubState& OtherState();
- TState& Parent;
- TTimerEvent* Event = nullptr;
- TCont* Cont = nullptr;
- TString Name;
- ui32 Self = -1;
- };
- struct TState {
- TState()
- : Subs{{*this, 0}, {*this, 1}}
- {}
- TSubState Subs[2];
- bool Stop = false;
- };
- TSubState& TSubState::OtherState() {
- return Parent.Subs[1 - Self];
- }
- static void DoStop(TCont* cont, void* argPtr) {
- TState& state = *(TState*)(argPtr);
- TTimerEvent event(cont, TInstant::Now());
- ExecuteEvent(&event);
- state.Stop = true;
- for (auto& sub: state.Subs) {
- if (sub.Event) {
- sub.Event->Wake(EWAKEDUP);
- }
- }
- }
- static void DoSub(TCont* cont, void* argPtr) {
- TSubState& state = *(TSubState*)(argPtr);
- while (!state.Parent.Stop) {
- TTimerEvent event(cont, TInstant::Max());
- if (state.OtherState().Event) {
- state.OtherState().Event->Wake(EWAKEDUP);
- }
- state.Event = &event;
- ExecuteEvent(&event);
- state.Event = nullptr;
- }
- state.Cont = nullptr;
- }
- static void DoMain(TCont* cont) noexcept {
- TState state;
- for (auto& subState : state.Subs) {
- subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
- }
- cont->Join(
- cont->Executor()->Create(DoStop, &state, "Stop")
- );
- for (auto& subState : state.Subs) {
- if (subState.Cont) {
- cont->Join(subState.Cont);
- }
- }
- }
- }
- void TCoroTest::TestWaitWakeLivelockBug() {
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
- }
- namespace NCoroTestFastPathWake {
- struct TState;
- struct TSubState {
- TSubState(TState& parent, ui32 self)
- : Parent(parent)
- , Name(TStringBuilder() << "Sub" << self)
- {}
- TState& Parent;
- TInstant Finish;
- TTimerEvent* Event = nullptr;
- TCont* Cont = nullptr;
- TString Name;
- };
- struct TState {
- TState()
- : Subs{{*this, 0}, {*this, 1}}
- {
- TPipe::Pipe(In, Out);
- SetNonBlock(In.GetHandle());
- }
- TSubState Subs[2];
- TPipe In, Out;
- bool IoSleepRunning = false;
- };
- static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
- try {
- TState& state = *(TState*) (argPtr);
- state.IoSleepRunning = true;
- TTempBuf tmp;
- // Wait for the event from io
- auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
- UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
- state.IoSleepRunning = false;
- } catch (const NUnitTest::TAssertException& ex) {
- Cerr << ex.AsStrBuf() << Endl;
- ex.BackTrace()->PrintTo(Cerr);
- throw;
- } catch (...) {
- Cerr << CurrentExceptionMessage() << Endl;
- throw;
- }
- }
- static void DoSub(TCont* cont, void* argPtr) noexcept {
- TSubState& state = *(TSubState*)(argPtr);
- TTimerEvent event(cont, TInstant::Max());
- state.Event = &event;
- ExecuteEvent(&event);
- state.Event = nullptr;
- state.Cont = nullptr;
- state.Finish = TInstant::Now();
- }
- static void DoMain(TCont* cont) noexcept {
- try {
- TState state;
- TInstant start = TInstant::Now();
- // This guy sleeps on io
- auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
- // These guys are to be woken up right away
- for (auto& subState : state.Subs) {
- subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
- }
- // Give way
- cont->Yield();
- // Check everyone has started, wake those to be woken
- UNIT_ASSERT(state.IoSleepRunning);
- for (auto& subState : state.Subs) {
- UNIT_ASSERT(subState.Event);
- subState.Event->Wake(EWAKEDUP);
- }
- // Give way again
- cont->Yield();
- // Check the woken guys have finished and quite soon
- for (auto& subState : state.Subs) {
- UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
- UNIT_ASSERT(!subState.Cont);
- }
- // Wake the io guy and finish
- state.Out.Close();
- if (state.IoSleepRunning) {
- cont->Join(sleeper);
- }
- // Check everything has ended sooner than the timeout
- UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
- } catch (const NUnitTest::TAssertException& ex) {
- Cerr << ex.AsStrBuf() << Endl;
- ex.BackTrace()->PrintTo(Cerr);
- throw;
- } catch (...) {
- Cerr << CurrentExceptionMessage() << Endl;
- throw;
- }
- }
- static void DoTestFastPathWake(EContPoller pollerType) {
- if (auto poller = IPollerFace::Construct(pollerType)) {
- TContExecutor exec(20000, std::move(poller));
- exec.SetFailOnError(true);
- exec.Execute(NCoroTestFastPathWake::DoMain);
- }
- }
- }
- void TCoroTest::TestFastPathWakeDefault() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
- }
- void TCoroTest::TestFastPathWakeEpoll() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
- }
- void TCoroTest::TestFastPathWakeKqueue() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
- }
- void TCoroTest::TestFastPathWakePoll() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
- }
- void TCoroTest::TestFastPathWakeSelect() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
- }
- namespace NCoroTestLegacyCancelYieldRaceBug {
- enum class EState {
- Idle, Running, Finished,
- };
- struct TState {
- EState SubState = EState::Idle;
- };
- static void DoSub(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubState = EState::Running;
- cont->Yield();
- cont->Yield();
- state.SubState = EState::Finished;
- }
- static void DoMain(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
- sub->Cancel();
- cont->Yield();
- UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
- }
- }
- void TCoroTest::TestLegacyCancelYieldRaceBug() {
- NCoroTestLegacyCancelYieldRaceBug::TState state;
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
- }
- namespace NCoroTestJoinRescheduleBug {
- enum class EState {
- Idle, Running, Finished,
- };
- struct TState {
- TCont* volatile SubA = nullptr;
- volatile EState SubAState = EState::Idle;
- volatile EState SubBState = EState::Idle;
- volatile EState SubCState = EState::Idle;
- };
- static void DoSubC(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubCState = EState::Running;
- while (state.SubBState != EState::Running) {
- cont->Yield();
- }
- while (cont->SleepD(TInstant::Max()) != ECANCELED) {
- }
- state.SubCState = EState::Finished;
- }
- static void DoSubB(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubBState = EState::Running;
- while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
- cont->Yield();
- }
- for (auto i : xrange(100)) {
- Y_UNUSED(i);
- if (!state.SubA) {
- break;
- }
- state.SubA->ReSchedule();
- cont->Yield();
- }
- state.SubBState = EState::Finished;
- }
- static void DoSubA(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubAState = EState::Running;
- TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
- while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
- cont->Yield();
- }
- cont->Join(subC);
- UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
- state.SubA = nullptr;
- state.SubAState = EState::Finished;
- }
- static void DoMain(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
- state.SubA = subA;
- cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
- if (state.SubA) {
- subA->Cancel();
- cont->Join(subA);
- }
- }
- }
- void TCoroTest::TestJoinRescheduleBug() {
- using namespace NCoroTestJoinRescheduleBug;
- TState state;
- {
- TContExecutor exec(20000);
- exec.Execute(DoMain, &state);
- }
- UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
- UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
- UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
- }
- void TCoroTest::TestEventQueue() {
- NCoro::TEventWaitQueue queue;
- UNIT_ASSERT(queue.Empty());
- UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
- TContExecutor exec(32000);
- exec.Execute([](TCont* cont, void* arg) {
- NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
- TTimerEvent ev(cont, TInstant::Max());
- TTimerEvent ev2(cont, TInstant::Seconds(12345));
- q->Register(&ev);
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
- q->Register(&ev2);
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
- }, &queue);
- }
- void TCoroTest::TestNestedExecutor() {
- #ifndef _win_
- //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr
- TContExecutor exec(32000);
- UNIT_ASSERT(!RunningCont());
- exec.Execute([](TCont* cont, void*) {
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
- TContExecutor exec2(32000);
- exec2.Execute([](TCont* cont2, void*) {
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
- TContExecutor exec3(32000);
- exec3.Execute([](TCont* cont3, void*) {
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3);
- });
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
- });
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
- });
- UNIT_ASSERT(!RunningCont());
- #endif
- }
- void TCoroTest::TestComputeCoroutineYield() {
- //if we have busy (e.g., on cpu) coroutine, when it yields, io must flow
- TContExecutor exec(32000);
- exec.SetFailOnError(true);
- TPipe in, out;
- TPipe::Pipe(in, out);
- SetNonBlock(in.GetHandle());
- size_t lastRead = 42;
- auto compute = [&](TCont* cont) {
- for (size_t i = 0; i < 10; ++i) {
- write(out.GetHandle(), &i, sizeof i);
- Sleep(TDuration::MilliSeconds(10));
- cont->Yield();
- UNIT_ASSERT(lastRead == i);
- }
- };
- auto io = [&](TCont* cont) {
- for (size_t i = 0; i < 10; ++i) {
- NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead);
- }
- };
- exec.Create(compute, "compute");
- exec.Create(io, "io");
- exec.Execute();
- }
- void TCoroTest::TestPollEngines() {
- bool defaultChecked = false;
- for (auto engine : GetEnumAllValues<EContPoller>()) {
- auto poller = IPollerFace::Construct(engine);
- if (!poller) {
- continue;
- }
- TContExecutor exec(32000, IPollerFace::Construct(engine));
- if (engine == EContPoller::Default) {
- defaultChecked = true;
- #if defined(HAVE_EPOLL_POLLER)
- UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Epoll);
- #elif defined(HAVE_KQUEUE_POLLER)
- UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Kqueue);
- #else
- UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Select);
- #endif
- } else {
- UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine);
- }
- }
- UNIT_ASSERT(defaultChecked);
- }
- void TCoroTest::TestPause() {
- TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()};
- int i = 0;
- executor.CreateOwned([&](TCont*) {
- i++;
- executor.Pause();
- i++;
- }, "coro");
- UNIT_ASSERT_EQUAL(i, 0);
- executor.Execute();
- UNIT_ASSERT_EQUAL(i, 1);
- executor.Execute();
- UNIT_ASSERT_EQUAL(i, 2);
- }
- void TCoroTest::TestUserEvent() {
- TContExecutor exec(32000);
- struct TUserEvent : public IUserEvent {
- bool Called = false;
- void Execute() override {
- Called = true;
- }
- } event;
- auto f = [&](TCont* cont) {
- UNIT_ASSERT(!event.Called);
- exec.ScheduleUserEvent(&event);
- UNIT_ASSERT(!event.Called);
- cont->Yield();
- UNIT_ASSERT(event.Called);
- };
- exec.Execute(f);
- UNIT_ASSERT(event.Called);
- }
- void TCoroTest::TestOverrideTime() {
- class TTime: public NCoro::ITime {
- public:
- TInstant Now() override {
- return Current;
- }
- TInstant Current = TInstant::Zero();
- };
- TTime time;
- TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing(), &time};
- executor.CreateOwned([&](TCont* cont) {
- UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Zero());
- time.Current = TInstant::Seconds(1);
- cont->SleepD(TInstant::Seconds(1));
- UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Seconds(1));
- }, "coro");
- executor.Execute();
- }
- void TCoroTest::TestCancelWithException() {
- TContExecutor exec(32000);
- TString excText = "test exception";
- THolder<std::exception> excep = MakeHolder<yexception>(yexception() << excText);
- std::exception* excPtr = excep.Get();
- exec.CreateOwned([&](TCont* cont){
- TCont *cont1 = cont->Executor()->CreateOwned([&](TCont* c) {
- int result = c->SleepD(TDuration::MilliSeconds(200).ToDeadLine());
- UNIT_ASSERT_EQUAL(result, ECANCELED);
- UNIT_ASSERT_EQUAL(c->Cancelled(), true);
- THolder<std::exception> exc = c->TakeException();
- UNIT_ASSERT_EQUAL(exc.Get(), excPtr);
- UNIT_ASSERT_EQUAL(exc->what(), excText);
- UNIT_ASSERT(dynamic_cast<yexception*>(exc.Get()) != nullptr);
- }, "cancelExc");
- cont1->Cancel(std::move(excep));
- TCont* cont2 = cont->Executor()->CreateOwned([&](TCont* c) {
- int result = c->SleepD(TDuration::MilliSeconds(200).ToDeadLine());
- UNIT_ASSERT_EQUAL(result, ECANCELED);
- UNIT_ASSERT_EQUAL(c->Cancelled(), true);
- THolder<std::exception> exc = c->TakeException();
- UNIT_ASSERT_EQUAL(exc.Get(), nullptr);
- }, "cancelTwice");
- cont2->Cancel();
- THolder<std::exception> e = MakeHolder<yexception>(yexception() << "another exception");
- cont2->Cancel(std::move(e));
- }, "coro");
- exec.Execute();
- }
- UNIT_TEST_SUITE_REGISTRATION(TCoroTest);
|