12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007 |
- #include "impl.h"
- #include "condvar.h"
- #include "network.h"
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/string/cast.h>
- #include <util/system/pipe.h>
- #include <util/system/env.h>
- #include <util/system/info.h>
- #include <util/system/thread.h>
- #include <util/generic/xrange.h>
- #include <util/generic/serialized_enum.h>
- // TODO (velavokr): BALANCER-1345 add more tests on pollers
- class TCoroTest: public TTestBase {
- UNIT_TEST_SUITE(TCoroTest);
- UNIT_TEST(TestSimpleX1);
- UNIT_TEST(TestSimpleX1MultiThread);
- UNIT_TEST(TestSimpleX2);
- UNIT_TEST(TestSimpleX3);
- UNIT_TEST(TestMemFun);
- UNIT_TEST(TestMutex);
- UNIT_TEST(TestCondVar);
- UNIT_TEST(TestJoinDefault);
- UNIT_TEST(TestJoinEpoll);
- UNIT_TEST(TestJoinKqueue);
- UNIT_TEST(TestJoinPoll);
- UNIT_TEST(TestJoinSelect);
- UNIT_TEST(TestException);
- UNIT_TEST(TestJoinCancelExitRaceBug);
- UNIT_TEST(TestWaitWakeLivelockBug);
- UNIT_TEST(TestFastPathWakeDefault)
- // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
- // UNIT_TEST(TestFastPathWakeEpoll)
- UNIT_TEST(TestFastPathWakeKqueue)
- UNIT_TEST(TestFastPathWakePoll)
- UNIT_TEST(TestFastPathWakeSelect)
- UNIT_TEST(TestLegacyCancelYieldRaceBug)
- UNIT_TEST(TestJoinRescheduleBug);
- UNIT_TEST(TestEventQueue)
- UNIT_TEST(TestNestedExecutor)
- UNIT_TEST(TestComputeCoroutineYield)
- UNIT_TEST(TestPollEngines);
- UNIT_TEST(TestUserEvent);
- UNIT_TEST(TestPause);
- UNIT_TEST(TestOverrideTime);
- UNIT_TEST_SUITE_END();
- public:
- void TestException();
- void TestSimpleX1();
- void TestSimpleX1MultiThread();
- void TestSimpleX2();
- void TestSimpleX3();
- void TestMemFun();
- void TestMutex();
- void TestCondVar();
- void TestJoinDefault();
- void TestJoinEpoll();
- void TestJoinKqueue();
- void TestJoinPoll();
- void TestJoinSelect();
- void TestJoinCancelExitRaceBug();
- void TestWaitWakeLivelockBug();
- void TestFastPathWakeDefault();
- void TestFastPathWakeEpoll();
- void TestFastPathWakeKqueue();
- void TestFastPathWakePoll();
- void TestFastPathWakeSelect();
- void TestLegacyCancelYieldRaceBug();
- void TestJoinRescheduleBug();
- void TestEventQueue();
- void TestNestedExecutor();
- void TestComputeCoroutineYield();
- void TestPollEngines();
- void TestUserEvent();
- void TestPause();
- void TestOverrideTime();
- };
- void TCoroTest::TestException() {
- TContExecutor e(1000000);
- bool f2run = false;
- auto f1 = [&f2run](TCont* c) {
- struct TCtx {
- ~TCtx() {
- Y_VERIFY(!*F2);
- C->Yield();
- }
- TCont* C;
- bool* F2;
- };
- try {
- TCtx ctx = {c, &f2run};
- throw 1;
- } catch (...) {
- }
- };
- bool unc = true;
- auto f2 = [&unc, &f2run](TCont*) {
- f2run = true;
- unc = std::uncaught_exception();
- // check segfault
- try {
- throw 2;
- } catch (int) {
- }
- };
- e.Create(f1, "f1");
- e.Create(f2, "f2");
- e.Execute();
- UNIT_ASSERT(!unc);
- }
- static int i0;
- static void CoRun(TCont* c, void* /*run*/) {
- while (i0 < 100000) {
- ++i0;
- UNIT_ASSERT(RunningCont() == c);
- c->Yield();
- UNIT_ASSERT(RunningCont() == c);
- }
- }
- static void CoMain(TCont* c, void* /*arg*/) {
- for (volatile size_t i2 = 0; i2 < 10; ++i2) {
- UNIT_ASSERT(RunningCont() == c);
- c->Executor()->Create(CoRun, nullptr, "run");
- UNIT_ASSERT(RunningCont() == c);
- }
- }
- void TCoroTest::TestSimpleX1() {
- i0 = 0;
- TContExecutor e(32000);
- UNIT_ASSERT(RunningCont() == nullptr);
- e.Execute(CoMain);
- UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
- UNIT_ASSERT(RunningCont() == nullptr);
- }
- void TCoroTest::TestSimpleX1MultiThread() {
- TVector<THolder<TThread>> threads;
- const size_t nThreads = 0;
- TAtomic c = 0;
- for (size_t i = 0; i < nThreads; ++i) {
- threads.push_back(MakeHolder<TThread>([&]() {
- TestSimpleX1();
- AtomicIncrement(c);
- }));
- }
- for (auto& t : threads) {
- t->Start();
- }
- for (auto& t: threads) {
- t->Join();
- }
- UNIT_ASSERT_EQUAL(c, nThreads);
- }
- struct TTestObject {
- int i = 0;
- int j = 0;
- public:
- void RunTask1(TCont*) {
- i = 1;
- }
- void RunTask2(TCont*) {
- j = 2;
- }
- };
- void TCoroTest::TestMemFun() {
- i0 = 0;
- TContExecutor e(32000);
- TTestObject obj;
- e.Create<TTestObject, &TTestObject::RunTask1>(&obj, "test1");
- e.Execute<TTestObject, &TTestObject::RunTask2>(&obj);
- UNIT_ASSERT_EQUAL(obj.i, 1);
- UNIT_ASSERT_EQUAL(obj.j, 2);
- }
- void TCoroTest::TestSimpleX2() {
- {
- i0 = 0;
- {
- TContExecutor e(32000);
- e.Execute(CoMain);
- }
- UNIT_ASSERT_EQUAL(i0, 100000);
- }
- {
- i0 = 0;
- {
- TContExecutor e(32000);
- e.Execute(CoMain);
- }
- UNIT_ASSERT_EQUAL(i0, 100000);
- }
- }
- struct TRunner {
- inline TRunner()
- : Runs(0)
- {
- }
- inline void operator()(TCont* c) {
- ++Runs;
- c->Yield();
- }
- size_t Runs;
- };
- void TCoroTest::TestSimpleX3() {
- TContExecutor e(32000);
- TRunner runner;
- for (volatile size_t i3 = 0; i3 < 1000; ++i3) {
- e.Create(runner, "runner");
- }
- e.Execute();
- UNIT_ASSERT_EQUAL(runner.Runs, 1000);
- }
- static TString res;
- static TContMutex mutex;
- static void CoMutex(TCont* c, void* /*run*/) {
- {
- mutex.LockI(c);
- c->Yield();
- res += c->Name();
- mutex.UnLock();
- }
- c->Yield();
- {
- mutex.LockI(c);
- c->Yield();
- res += c->Name();
- mutex.UnLock();
- }
- }
- static void CoMutexTest(TCont* c, void* /*run*/) {
- c->Executor()->Create(CoMutex, nullptr, "1");
- c->Executor()->Create(CoMutex, nullptr, "2");
- }
- void TCoroTest::TestMutex() {
- TContExecutor e(32000);
- e.Execute(CoMutexTest);
- UNIT_ASSERT_EQUAL(res, "1212");
- res.clear();
- }
- static TContMutex m1;
- static TContCondVar c1;
- static void CoCondVar(TCont* c, void* /*run*/) {
- for (size_t i4 = 0; i4 < 3; ++i4) {
- UNIT_ASSERT_EQUAL(m1.LockI(c), 0);
- UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0);
- res += c->Name();
- m1.UnLock();
- }
- }
- static void CoCondVarTest(TCont* c, void* /*run*/) {
- c->Executor()->Create(CoCondVar, nullptr, "1");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "2");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "3");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "4");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "5");
- c->Yield();
- c->Executor()->Create(CoCondVar, nullptr, "6");
- c->Yield();
- for (size_t i5 = 0; i5 < 3; ++i5) {
- res += ToString((size_t)i5) + "^";
- c1.BroadCast();
- c->Yield();
- }
- }
- void TCoroTest::TestCondVar() {
- TContExecutor e(32000);
- e.Execute(CoCondVarTest);
- UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456");
- res.clear();
- }
- namespace NCoroTestJoin {
- struct TSleepCont {
- const TInstant Deadline;
- int Result;
- inline void operator()(TCont* c) {
- Result = c->SleepD(Deadline);
- }
- };
- struct TReadCont {
- const TInstant Deadline;
- const SOCKET Sock;
- int Result;
- inline void operator()(TCont* c) {
- char buf = 0;
- Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
- }
- };
- struct TJoinCont {
- const TInstant Deadline;
- TCont* const Cont;
- bool Result;
- inline void operator()(TCont* c) {
- Result = c->Join(Cont, Deadline);
- }
- };
- void DoTestJoin(EContPoller pollerType) {
- auto poller = IPollerFace::Construct(pollerType);
- if (!poller) {
- return;
- }
- TContExecutor e(32000, std::move(poller));
- TPipe in, out;
- TPipe::Pipe(in, out);
- SetNonBlock(in.GetHandle());
- {
- TSleepCont sc = {TInstant::Max(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- {
- TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
- UNIT_ASSERT_EQUAL(jc.Result, true);
- }
- {
- TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- {
- TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- {
- TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
- UNIT_ASSERT_EQUAL(jc.Result, true);
- }
- {
- TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
- e.Execute(jc);
- UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- }
- }
- void TCoroTest::TestJoinDefault() {
- NCoroTestJoin::DoTestJoin(EContPoller::Default);
- }
- void TCoroTest::TestJoinEpoll() {
- NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
- }
- void TCoroTest::TestJoinKqueue() {
- NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
- }
- void TCoroTest::TestJoinPoll() {
- NCoroTestJoin::DoTestJoin(EContPoller::Poll);
- }
- void TCoroTest::TestJoinSelect() {
- NCoroTestJoin::DoTestJoin(EContPoller::Select);
- }
- namespace NCoroJoinCancelExitRaceBug {
- struct TState {
- TCont* Sub = nullptr;
- };
- static void DoAux(TCont*, void* argPtr) noexcept {
- TState& state = *(TState*)(argPtr);
- // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
- state.Sub->Cancel();
- }
- static void DoSub2(TCont*, void*) noexcept {
- // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
- // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
- }
- static void DoSub(TCont* cont, void* argPtr) noexcept {
- TState& state = *(TState*)(argPtr);
- state.Sub = cont;
- // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
- auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
- // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
- // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
- // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
- cont->Join(sub2);
- state.Sub = nullptr;
- }
- static void DoMain(TCont* cont) noexcept {
- TState state;
- // 01.{Ready:[]} > {Ready:[Sub]}
- auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
- // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
- cont->Executor()->Create(DoAux, &state, "Aux");
- // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
- cont->Join(sub);
- }
- }
- void TCoroTest::TestJoinCancelExitRaceBug() {
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
- }
- namespace NCoroWaitWakeLivelockBug {
- struct TState;
- struct TSubState {
- TSubState(TState& parent, ui32 self)
- : Parent(parent)
- , Name(TStringBuilder() << "Sub" << self)
- , Self(self)
- {
- UNIT_ASSERT(self < 2);
- }
- TSubState& OtherState();
- TState& Parent;
- TTimerEvent* Event = nullptr;
- TCont* Cont = nullptr;
- TString Name;
- ui32 Self = -1;
- };
- struct TState {
- TState()
- : Subs{{*this, 0}, {*this, 1}}
- {}
- TSubState Subs[2];
- bool Stop = false;
- };
- TSubState& TSubState::OtherState() {
- return Parent.Subs[1 - Self];
- }
- static void DoStop(TCont* cont, void* argPtr) {
- TState& state = *(TState*)(argPtr);
- TTimerEvent event(cont, TInstant::Now());
- ExecuteEvent(&event);
- state.Stop = true;
- for (auto& sub: state.Subs) {
- if (sub.Event) {
- sub.Event->Wake(EWAKEDUP);
- }
- }
- }
- static void DoSub(TCont* cont, void* argPtr) {
- TSubState& state = *(TSubState*)(argPtr);
- while (!state.Parent.Stop) {
- TTimerEvent event(cont, TInstant::Max());
- if (state.OtherState().Event) {
- state.OtherState().Event->Wake(EWAKEDUP);
- }
- state.Event = &event;
- ExecuteEvent(&event);
- state.Event = nullptr;
- }
- state.Cont = nullptr;
- }
- static void DoMain(TCont* cont) noexcept {
- TState state;
- for (auto& subState : state.Subs) {
- subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
- }
- cont->Join(
- cont->Executor()->Create(DoStop, &state, "Stop")
- );
- for (auto& subState : state.Subs) {
- if (subState.Cont) {
- cont->Join(subState.Cont);
- }
- }
- }
- }
- void TCoroTest::TestWaitWakeLivelockBug() {
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
- }
- namespace NCoroTestFastPathWake {
- struct TState;
- struct TSubState {
- TSubState(TState& parent, ui32 self)
- : Parent(parent)
- , Name(TStringBuilder() << "Sub" << self)
- {}
- TState& Parent;
- TInstant Finish;
- TTimerEvent* Event = nullptr;
- TCont* Cont = nullptr;
- TString Name;
- };
- struct TState {
- TState()
- : Subs{{*this, 0}, {*this, 1}}
- {
- TPipe::Pipe(In, Out);
- SetNonBlock(In.GetHandle());
- }
- TSubState Subs[2];
- TPipe In, Out;
- bool IoSleepRunning = false;
- };
- static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
- try {
- TState& state = *(TState*) (argPtr);
- state.IoSleepRunning = true;
- TTempBuf tmp;
- // Wait for the event from io
- auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
- UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
- state.IoSleepRunning = false;
- } catch (const NUnitTest::TAssertException& ex) {
- Cerr << ex.AsStrBuf() << Endl;
- ex.BackTrace()->PrintTo(Cerr);
- throw;
- } catch (...) {
- Cerr << CurrentExceptionMessage() << Endl;
- throw;
- }
- }
- static void DoSub(TCont* cont, void* argPtr) noexcept {
- TSubState& state = *(TSubState*)(argPtr);
- TTimerEvent event(cont, TInstant::Max());
- state.Event = &event;
- ExecuteEvent(&event);
- state.Event = nullptr;
- state.Cont = nullptr;
- state.Finish = TInstant::Now();
- }
- static void DoMain(TCont* cont) noexcept {
- try {
- TState state;
- TInstant start = TInstant::Now();
- // This guy sleeps on io
- auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
- // These guys are to be woken up right away
- for (auto& subState : state.Subs) {
- subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
- }
- // Give way
- cont->Yield();
- // Check everyone has started, wake those to be woken
- UNIT_ASSERT(state.IoSleepRunning);
- for (auto& subState : state.Subs) {
- UNIT_ASSERT(subState.Event);
- subState.Event->Wake(EWAKEDUP);
- }
- // Give way again
- cont->Yield();
- // Check the woken guys have finished and quite soon
- for (auto& subState : state.Subs) {
- UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
- UNIT_ASSERT(!subState.Cont);
- }
- // Wake the io guy and finish
- state.Out.Close();
- if (state.IoSleepRunning) {
- cont->Join(sleeper);
- }
- // Check everything has ended sooner than the timeout
- UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
- } catch (const NUnitTest::TAssertException& ex) {
- Cerr << ex.AsStrBuf() << Endl;
- ex.BackTrace()->PrintTo(Cerr);
- throw;
- } catch (...) {
- Cerr << CurrentExceptionMessage() << Endl;
- throw;
- }
- }
- static void DoTestFastPathWake(EContPoller pollerType) {
- if (auto poller = IPollerFace::Construct(pollerType)) {
- TContExecutor exec(20000, std::move(poller));
- exec.SetFailOnError(true);
- exec.Execute(NCoroTestFastPathWake::DoMain);
- }
- }
- }
- void TCoroTest::TestFastPathWakeDefault() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
- }
- void TCoroTest::TestFastPathWakeEpoll() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
- }
- void TCoroTest::TestFastPathWakeKqueue() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
- }
- void TCoroTest::TestFastPathWakePoll() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
- }
- void TCoroTest::TestFastPathWakeSelect() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
- }
- namespace NCoroTestLegacyCancelYieldRaceBug {
- enum class EState {
- Idle, Running, Finished,
- };
- struct TState {
- EState SubState = EState::Idle;
- };
- static void DoSub(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubState = EState::Running;
- cont->Yield();
- cont->Yield();
- state.SubState = EState::Finished;
- }
- static void DoMain(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
- sub->Cancel();
- cont->Yield();
- UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
- }
- }
- void TCoroTest::TestLegacyCancelYieldRaceBug() {
- NCoroTestLegacyCancelYieldRaceBug::TState state;
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
- }
- namespace NCoroTestJoinRescheduleBug {
- enum class EState {
- Idle, Running, Finished,
- };
- struct TState {
- TCont* volatile SubA = nullptr;
- volatile EState SubAState = EState::Idle;
- volatile EState SubBState = EState::Idle;
- volatile EState SubCState = EState::Idle;
- };
- static void DoSubC(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubCState = EState::Running;
- while (state.SubBState != EState::Running) {
- cont->Yield();
- }
- while (cont->SleepD(TInstant::Max()) != ECANCELED) {
- }
- state.SubCState = EState::Finished;
- }
- static void DoSubB(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubBState = EState::Running;
- while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
- cont->Yield();
- }
- for (auto i : xrange(100)) {
- Y_UNUSED(i);
- if (!state.SubA) {
- break;
- }
- state.SubA->ReSchedule();
- cont->Yield();
- }
- state.SubBState = EState::Finished;
- }
- static void DoSubA(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubAState = EState::Running;
- TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
- while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
- cont->Yield();
- }
- cont->Join(subC);
- UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
- state.SubA = nullptr;
- state.SubAState = EState::Finished;
- }
- static void DoMain(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
- state.SubA = subA;
- cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
- if (state.SubA) {
- subA->Cancel();
- cont->Join(subA);
- }
- }
- }
- void TCoroTest::TestJoinRescheduleBug() {
- using namespace NCoroTestJoinRescheduleBug;
- TState state;
- {
- TContExecutor exec(20000);
- exec.Execute(DoMain, &state);
- }
- UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
- UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
- UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
- }
- void TCoroTest::TestEventQueue() {
- NCoro::TEventWaitQueue queue;
- UNIT_ASSERT(queue.Empty());
- UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
- TContExecutor exec(32000);
- exec.Execute([](TCont* cont, void* arg) {
- NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
- TTimerEvent ev(cont, TInstant::Max());
- TTimerEvent ev2(cont, TInstant::Seconds(12345));
- q->Register(&ev);
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
- q->Register(&ev2);
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
- }, &queue);
- }
- void TCoroTest::TestNestedExecutor() {
- #ifndef _win_
- //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr
- TContExecutor exec(32000);
- UNIT_ASSERT(!RunningCont());
- exec.Execute([](TCont* cont, void*) {
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
- TContExecutor exec2(32000);
- exec2.Execute([](TCont* cont2, void*) {
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
- TContExecutor exec3(32000);
- exec3.Execute([](TCont* cont3, void*) {
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3);
- });
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
- });
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
- });
- UNIT_ASSERT(!RunningCont());
- #endif
- }
- void TCoroTest::TestComputeCoroutineYield() {
- //if we have busy (e.g., on cpu) coroutine, when it yields, io must flow
- TContExecutor exec(32000);
- exec.SetFailOnError(true);
- TPipe in, out;
- TPipe::Pipe(in, out);
- SetNonBlock(in.GetHandle());
- size_t lastRead = 42;
- auto compute = [&](TCont* cont) {
- for (size_t i = 0; i < 10; ++i) {
- write(out.GetHandle(), &i, sizeof i);
- Sleep(TDuration::MilliSeconds(10));
- cont->Yield();
- UNIT_ASSERT(lastRead == i);
- }
- };
- auto io = [&](TCont* cont) {
- for (size_t i = 0; i < 10; ++i) {
- NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead);
- }
- };
- exec.Create(compute, "compute");
- exec.Create(io, "io");
- exec.Execute();
- }
- void TCoroTest::TestPollEngines() {
- bool defaultChecked = false;
- for (auto engine : GetEnumAllValues<EContPoller>()) {
- auto poller = IPollerFace::Construct(engine);
- if (!poller) {
- continue;
- }
- TContExecutor exec(32000, IPollerFace::Construct(engine));
- if (engine == EContPoller::Default) {
- defaultChecked = true;
- UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined);
- } else {
- UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine);
- }
- }
- UNIT_ASSERT(defaultChecked);
- }
- void TCoroTest::TestPause() {
- TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()};
- int i = 0;
- executor.CreateOwned([&](TCont*) {
- i++;
- executor.Pause();
- i++;
- }, "coro");
- UNIT_ASSERT_EQUAL(i, 0);
- executor.Execute();
- UNIT_ASSERT_EQUAL(i, 1);
- executor.Execute();
- UNIT_ASSERT_EQUAL(i, 2);
- }
- void TCoroTest::TestUserEvent() {
- TContExecutor exec(32000);
- struct TUserEvent : public IUserEvent {
- bool Called = false;
- void Execute() override {
- Called = true;
- }
- } event;
- auto f = [&](TCont* cont) {
- UNIT_ASSERT(!event.Called);
- exec.ScheduleUserEvent(&event);
- UNIT_ASSERT(!event.Called);
- cont->Yield();
- UNIT_ASSERT(event.Called);
- };
- exec.Execute(f);
- UNIT_ASSERT(event.Called);
- }
- void TCoroTest::TestOverrideTime() {
- class TTime: public NCoro::ITime {
- public:
- TInstant Now() override {
- return Current;
- }
- TInstant Current = TInstant::Zero();
- };
- TTime time;
- TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing(), &time};
- executor.CreateOwned([&](TCont* cont) {
- UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Zero());
- time.Current = TInstant::Seconds(1);
- cont->SleepD(TInstant::Seconds(1));
- UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Seconds(1));
- }, "coro");
- executor.Execute();
- }
- UNIT_TEST_SUITE_REGISTRATION(TCoroTest);
|