coroutine_ut.cpp 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050
  1. #include "impl.h"
  2. #include "condvar.h"
  3. #include "network.h"
  4. #include <library/cpp/deprecated/atomic/atomic.h>
  5. #include <library/cpp/testing/unittest/registar.h>
  6. #include <util/string/cast.h>
  7. #include <util/system/pipe.h>
  8. #include <util/system/env.h>
  9. #include <util/system/info.h>
  10. #include <util/system/thread.h>
  11. #include <util/generic/xrange.h>
  12. #include <util/generic/serialized_enum.h>
  13. // TODO (velavokr): BALANCER-1345 add more tests on pollers
  14. class TCoroTest: public TTestBase {
  15. UNIT_TEST_SUITE(TCoroTest);
  16. UNIT_TEST(TestSimpleX1);
  17. UNIT_TEST(TestSimpleX1MultiThread);
  18. UNIT_TEST(TestSimpleX2);
  19. UNIT_TEST(TestSimpleX3);
  20. UNIT_TEST(TestMemFun);
  21. UNIT_TEST(TestMutex);
  22. UNIT_TEST(TestCondVar);
  23. UNIT_TEST(TestJoinDefault);
  24. UNIT_TEST(TestJoinEpoll);
  25. UNIT_TEST(TestJoinKqueue);
  26. UNIT_TEST(TestJoinPoll);
  27. UNIT_TEST(TestJoinSelect);
  28. UNIT_TEST(TestException);
  29. UNIT_TEST(TestJoinCancelExitRaceBug);
  30. UNIT_TEST(TestWaitWakeLivelockBug);
  31. // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
  32. // UNIT_TEST(TestFastPathWakeDefault)
  33. // UNIT_TEST(TestFastPathWakeEpoll)
  34. UNIT_TEST(TestFastPathWakeKqueue)
  35. UNIT_TEST(TestFastPathWakePoll)
  36. UNIT_TEST(TestFastPathWakeSelect)
  37. UNIT_TEST(TestLegacyCancelYieldRaceBug)
  38. UNIT_TEST(TestJoinRescheduleBug);
  39. UNIT_TEST(TestEventQueue)
  40. UNIT_TEST(TestNestedExecutor)
  41. UNIT_TEST(TestComputeCoroutineYield)
  42. UNIT_TEST(TestPollEngines);
  43. UNIT_TEST(TestUserEvent);
  44. UNIT_TEST(TestPause);
  45. UNIT_TEST(TestOverrideTime);
  46. UNIT_TEST(TestCancelWithException);
  47. UNIT_TEST_SUITE_END();
  48. public:
  49. void TestException();
  50. void TestSimpleX1();
  51. void TestSimpleX1MultiThread();
  52. void TestSimpleX2();
  53. void TestSimpleX3();
  54. void TestMemFun();
  55. void TestMutex();
  56. void TestCondVar();
  57. void TestJoinDefault();
  58. void TestJoinEpoll();
  59. void TestJoinKqueue();
  60. void TestJoinPoll();
  61. void TestJoinSelect();
  62. void TestJoinCancelExitRaceBug();
  63. void TestWaitWakeLivelockBug();
  64. void TestFastPathWakeDefault();
  65. void TestFastPathWakeEpoll();
  66. void TestFastPathWakeKqueue();
  67. void TestFastPathWakePoll();
  68. void TestFastPathWakeSelect();
  69. void TestLegacyCancelYieldRaceBug();
  70. void TestJoinRescheduleBug();
  71. void TestEventQueue();
  72. void TestNestedExecutor();
  73. void TestComputeCoroutineYield();
  74. void TestPollEngines();
  75. void TestUserEvent();
  76. void TestPause();
  77. void TestOverrideTime();
  78. void TestCancelWithException();
  79. };
  80. void TCoroTest::TestException() {
  81. TContExecutor e(1000000);
  82. bool f2run = false;
  83. auto f1 = [&f2run](TCont* c) {
  84. struct TCtx {
  85. ~TCtx() {
  86. Y_ABORT_UNLESS(!*F2);
  87. C->Yield();
  88. }
  89. TCont* C;
  90. bool* F2;
  91. };
  92. try {
  93. TCtx ctx = {c, &f2run};
  94. throw 1;
  95. } catch (...) {
  96. }
  97. };
  98. bool unc = true;
  99. auto f2 = [&unc, &f2run](TCont*) {
  100. f2run = true;
  101. unc = std::uncaught_exception();
  102. // check segfault
  103. try {
  104. throw 2;
  105. } catch (int) {
  106. }
  107. };
  108. e.Create(f1, "f1");
  109. e.Create(f2, "f2");
  110. e.Execute();
  111. UNIT_ASSERT(!unc);
  112. }
  113. static int i0;
  114. static void CoRun(TCont* c, void* /*run*/) {
  115. while (i0 < 100000) {
  116. ++i0;
  117. UNIT_ASSERT(RunningCont() == c);
  118. c->Yield();
  119. UNIT_ASSERT(RunningCont() == c);
  120. }
  121. }
  122. static void CoMain(TCont* c, void* /*arg*/) {
  123. for (volatile size_t i2 = 0; i2 < 10; ++i2) {
  124. UNIT_ASSERT(RunningCont() == c);
  125. c->Executor()->Create(CoRun, nullptr, "run");
  126. UNIT_ASSERT(RunningCont() == c);
  127. }
  128. }
  129. void TCoroTest::TestSimpleX1() {
  130. i0 = 0;
  131. TContExecutor e(32000);
  132. UNIT_ASSERT(RunningCont() == nullptr);
  133. e.Execute(CoMain);
  134. UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
  135. UNIT_ASSERT(RunningCont() == nullptr);
  136. }
  137. void TCoroTest::TestSimpleX1MultiThread() {
  138. TVector<THolder<TThread>> threads;
  139. const size_t nThreads = 0;
  140. TAtomic c = 0;
  141. for (size_t i = 0; i < nThreads; ++i) {
  142. threads.push_back(MakeHolder<TThread>([&]() {
  143. TestSimpleX1();
  144. AtomicIncrement(c);
  145. }));
  146. }
  147. for (auto& t : threads) {
  148. t->Start();
  149. }
  150. for (auto& t: threads) {
  151. t->Join();
  152. }
  153. UNIT_ASSERT_EQUAL(c, nThreads);
  154. }
  155. struct TTestObject {
  156. int i = 0;
  157. int j = 0;
  158. public:
  159. void RunTask1(TCont*) {
  160. i = 1;
  161. }
  162. void RunTask2(TCont*) {
  163. j = 2;
  164. }
  165. };
  166. void TCoroTest::TestMemFun() {
  167. i0 = 0;
  168. TContExecutor e(32000);
  169. TTestObject obj;
  170. e.Create<TTestObject, &TTestObject::RunTask1>(&obj, "test1");
  171. e.Execute<TTestObject, &TTestObject::RunTask2>(&obj);
  172. UNIT_ASSERT_EQUAL(obj.i, 1);
  173. UNIT_ASSERT_EQUAL(obj.j, 2);
  174. }
  175. void TCoroTest::TestSimpleX2() {
  176. {
  177. i0 = 0;
  178. {
  179. TContExecutor e(32000);
  180. e.Execute(CoMain);
  181. }
  182. UNIT_ASSERT_EQUAL(i0, 100000);
  183. }
  184. {
  185. i0 = 0;
  186. {
  187. TContExecutor e(32000);
  188. e.Execute(CoMain);
  189. }
  190. UNIT_ASSERT_EQUAL(i0, 100000);
  191. }
  192. }
  193. struct TRunner {
  194. inline TRunner()
  195. : Runs(0)
  196. {
  197. }
  198. inline void operator()(TCont* c) {
  199. ++Runs;
  200. c->Yield();
  201. }
  202. size_t Runs;
  203. };
  204. void TCoroTest::TestSimpleX3() {
  205. TContExecutor e(32000);
  206. TRunner runner;
  207. for (volatile size_t i3 = 0; i3 < 1000; ++i3) {
  208. e.Create(runner, "runner");
  209. }
  210. e.Execute();
  211. UNIT_ASSERT_EQUAL(runner.Runs, 1000);
  212. }
  213. static TString res;
  214. static TContMutex mutex;
  215. static void CoMutex(TCont* c, void* /*run*/) {
  216. {
  217. mutex.LockI(c);
  218. c->Yield();
  219. res += c->Name();
  220. mutex.UnLock();
  221. }
  222. c->Yield();
  223. {
  224. mutex.LockI(c);
  225. c->Yield();
  226. res += c->Name();
  227. mutex.UnLock();
  228. }
  229. }
  230. static void CoMutexTest(TCont* c, void* /*run*/) {
  231. c->Executor()->Create(CoMutex, nullptr, "1");
  232. c->Executor()->Create(CoMutex, nullptr, "2");
  233. }
  234. void TCoroTest::TestMutex() {
  235. TContExecutor e(32000);
  236. e.Execute(CoMutexTest);
  237. UNIT_ASSERT_EQUAL(res, "1212");
  238. res.clear();
  239. }
  240. static TContMutex m1;
  241. static TContCondVar c1;
  242. static void CoCondVar(TCont* c, void* /*run*/) {
  243. for (size_t i4 = 0; i4 < 3; ++i4) {
  244. UNIT_ASSERT_EQUAL(m1.LockI(c), 0);
  245. UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0);
  246. res += c->Name();
  247. m1.UnLock();
  248. }
  249. }
  250. static void CoCondVarTest(TCont* c, void* /*run*/) {
  251. c->Executor()->Create(CoCondVar, nullptr, "1");
  252. c->Yield();
  253. c->Executor()->Create(CoCondVar, nullptr, "2");
  254. c->Yield();
  255. c->Executor()->Create(CoCondVar, nullptr, "3");
  256. c->Yield();
  257. c->Executor()->Create(CoCondVar, nullptr, "4");
  258. c->Yield();
  259. c->Executor()->Create(CoCondVar, nullptr, "5");
  260. c->Yield();
  261. c->Executor()->Create(CoCondVar, nullptr, "6");
  262. c->Yield();
  263. for (size_t i5 = 0; i5 < 3; ++i5) {
  264. res += ToString((size_t)i5) + "^";
  265. c1.BroadCast();
  266. c->Yield();
  267. }
  268. }
  269. void TCoroTest::TestCondVar() {
  270. TContExecutor e(32000);
  271. e.Execute(CoCondVarTest);
  272. UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456");
  273. res.clear();
  274. }
  275. namespace NCoroTestJoin {
  276. struct TSleepCont {
  277. const TInstant Deadline;
  278. int Result;
  279. inline void operator()(TCont* c) {
  280. Result = c->SleepD(Deadline);
  281. }
  282. };
  283. struct TReadCont {
  284. const TInstant Deadline;
  285. const SOCKET Sock;
  286. int Result;
  287. inline void operator()(TCont* c) {
  288. char buf = 0;
  289. Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
  290. }
  291. };
  292. struct TJoinCont {
  293. const TInstant Deadline;
  294. TCont* const Cont;
  295. bool Result;
  296. inline void operator()(TCont* c) {
  297. Result = c->Join(Cont, Deadline);
  298. }
  299. };
  300. void DoTestJoin(EContPoller pollerType) {
  301. auto poller = IPollerFace::Construct(pollerType);
  302. if (!poller) {
  303. return;
  304. }
  305. TContExecutor e(32000, std::move(poller));
  306. TPipe in, out;
  307. TPipe::Pipe(in, out);
  308. SetNonBlock(in.GetHandle());
  309. {
  310. TSleepCont sc = {TInstant::Max(), 0};
  311. TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
  312. e.Execute(jc);
  313. UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
  314. UNIT_ASSERT_EQUAL(jc.Result, false);
  315. }
  316. {
  317. TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
  318. TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
  319. e.Execute(jc);
  320. UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
  321. UNIT_ASSERT_EQUAL(jc.Result, true);
  322. }
  323. {
  324. TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
  325. TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
  326. e.Execute(jc);
  327. UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
  328. UNIT_ASSERT_EQUAL(jc.Result, false);
  329. }
  330. {
  331. TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
  332. TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
  333. e.Execute(jc);
  334. UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
  335. UNIT_ASSERT_EQUAL(jc.Result, false);
  336. }
  337. {
  338. TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
  339. TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
  340. e.Execute(jc);
  341. UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
  342. UNIT_ASSERT_EQUAL(jc.Result, true);
  343. }
  344. {
  345. TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
  346. TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
  347. e.Execute(jc);
  348. UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
  349. UNIT_ASSERT_EQUAL(jc.Result, false);
  350. }
  351. }
  352. }
  353. void TCoroTest::TestJoinDefault() {
  354. NCoroTestJoin::DoTestJoin(EContPoller::Default);
  355. }
  356. void TCoroTest::TestJoinEpoll() {
  357. NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
  358. }
  359. void TCoroTest::TestJoinKqueue() {
  360. NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
  361. }
  362. void TCoroTest::TestJoinPoll() {
  363. NCoroTestJoin::DoTestJoin(EContPoller::Poll);
  364. }
  365. void TCoroTest::TestJoinSelect() {
  366. NCoroTestJoin::DoTestJoin(EContPoller::Select);
  367. }
  368. namespace NCoroJoinCancelExitRaceBug {
  369. struct TState {
  370. TCont* Sub = nullptr;
  371. };
  372. static void DoAux(TCont*, void* argPtr) noexcept {
  373. TState& state = *(TState*)(argPtr);
  374. // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
  375. state.Sub->Cancel();
  376. }
  377. static void DoSub2(TCont*, void*) noexcept {
  378. // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
  379. // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
  380. }
  381. static void DoSub(TCont* cont, void* argPtr) noexcept {
  382. TState& state = *(TState*)(argPtr);
  383. state.Sub = cont;
  384. // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
  385. auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
  386. // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
  387. // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
  388. // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
  389. cont->Join(sub2);
  390. state.Sub = nullptr;
  391. }
  392. static void DoMain(TCont* cont) noexcept {
  393. TState state;
  394. // 01.{Ready:[]} > {Ready:[Sub]}
  395. auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
  396. // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
  397. cont->Executor()->Create(DoAux, &state, "Aux");
  398. // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
  399. cont->Join(sub);
  400. }
  401. }
  402. void TCoroTest::TestJoinCancelExitRaceBug() {
  403. TContExecutor exec(20000);
  404. exec.SetFailOnError(true);
  405. exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
  406. }
  407. namespace NCoroWaitWakeLivelockBug {
  408. struct TState;
  409. struct TSubState {
  410. TSubState(TState& parent, ui32 self)
  411. : Parent(parent)
  412. , Name(TStringBuilder() << "Sub" << self)
  413. , Self(self)
  414. {
  415. UNIT_ASSERT(self < 2);
  416. }
  417. TSubState& OtherState();
  418. TState& Parent;
  419. TTimerEvent* Event = nullptr;
  420. TCont* Cont = nullptr;
  421. TString Name;
  422. ui32 Self = -1;
  423. };
  424. struct TState {
  425. TState()
  426. : Subs{{*this, 0}, {*this, 1}}
  427. {}
  428. TSubState Subs[2];
  429. bool Stop = false;
  430. };
  431. TSubState& TSubState::OtherState() {
  432. return Parent.Subs[1 - Self];
  433. }
  434. static void DoStop(TCont* cont, void* argPtr) {
  435. TState& state = *(TState*)(argPtr);
  436. TTimerEvent event(cont, TInstant::Now());
  437. ExecuteEvent(&event);
  438. state.Stop = true;
  439. for (auto& sub: state.Subs) {
  440. if (sub.Event) {
  441. sub.Event->Wake(EWAKEDUP);
  442. }
  443. }
  444. }
  445. static void DoSub(TCont* cont, void* argPtr) {
  446. TSubState& state = *(TSubState*)(argPtr);
  447. while (!state.Parent.Stop) {
  448. TTimerEvent event(cont, TInstant::Max());
  449. if (state.OtherState().Event) {
  450. state.OtherState().Event->Wake(EWAKEDUP);
  451. }
  452. state.Event = &event;
  453. ExecuteEvent(&event);
  454. state.Event = nullptr;
  455. }
  456. state.Cont = nullptr;
  457. }
  458. static void DoMain(TCont* cont) noexcept {
  459. TState state;
  460. for (auto& subState : state.Subs) {
  461. subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
  462. }
  463. cont->Join(
  464. cont->Executor()->Create(DoStop, &state, "Stop")
  465. );
  466. for (auto& subState : state.Subs) {
  467. if (subState.Cont) {
  468. cont->Join(subState.Cont);
  469. }
  470. }
  471. }
  472. }
  473. void TCoroTest::TestWaitWakeLivelockBug() {
  474. TContExecutor exec(20000);
  475. exec.SetFailOnError(true);
  476. exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
  477. }
  478. namespace NCoroTestFastPathWake {
  479. struct TState;
  480. struct TSubState {
  481. TSubState(TState& parent, ui32 self)
  482. : Parent(parent)
  483. , Name(TStringBuilder() << "Sub" << self)
  484. {}
  485. TState& Parent;
  486. TInstant Finish;
  487. TTimerEvent* Event = nullptr;
  488. TCont* Cont = nullptr;
  489. TString Name;
  490. };
  491. struct TState {
  492. TState()
  493. : Subs{{*this, 0}, {*this, 1}}
  494. {
  495. TPipe::Pipe(In, Out);
  496. SetNonBlock(In.GetHandle());
  497. }
  498. TSubState Subs[2];
  499. TPipe In, Out;
  500. bool IoSleepRunning = false;
  501. };
  502. static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
  503. try {
  504. TState& state = *(TState*) (argPtr);
  505. state.IoSleepRunning = true;
  506. TTempBuf tmp;
  507. // Wait for the event from io
  508. auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
  509. UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
  510. state.IoSleepRunning = false;
  511. } catch (const NUnitTest::TAssertException& ex) {
  512. Cerr << ex.AsStrBuf() << Endl;
  513. ex.BackTrace()->PrintTo(Cerr);
  514. throw;
  515. } catch (...) {
  516. Cerr << CurrentExceptionMessage() << Endl;
  517. throw;
  518. }
  519. }
  520. static void DoSub(TCont* cont, void* argPtr) noexcept {
  521. TSubState& state = *(TSubState*)(argPtr);
  522. TTimerEvent event(cont, TInstant::Max());
  523. state.Event = &event;
  524. ExecuteEvent(&event);
  525. state.Event = nullptr;
  526. state.Cont = nullptr;
  527. state.Finish = TInstant::Now();
  528. }
  529. static void DoMain(TCont* cont) noexcept {
  530. try {
  531. TState state;
  532. TInstant start = TInstant::Now();
  533. // This guy sleeps on io
  534. auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
  535. // These guys are to be woken up right away
  536. for (auto& subState : state.Subs) {
  537. subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
  538. }
  539. // Give way
  540. cont->Yield();
  541. // Check everyone has started, wake those to be woken
  542. UNIT_ASSERT(state.IoSleepRunning);
  543. for (auto& subState : state.Subs) {
  544. UNIT_ASSERT(subState.Event);
  545. subState.Event->Wake(EWAKEDUP);
  546. }
  547. // Give way again
  548. cont->Yield();
  549. // Check the woken guys have finished and quite soon
  550. for (auto& subState : state.Subs) {
  551. UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
  552. UNIT_ASSERT(!subState.Cont);
  553. }
  554. // Wake the io guy and finish
  555. state.Out.Close();
  556. if (state.IoSleepRunning) {
  557. cont->Join(sleeper);
  558. }
  559. // Check everything has ended sooner than the timeout
  560. UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
  561. } catch (const NUnitTest::TAssertException& ex) {
  562. Cerr << ex.AsStrBuf() << Endl;
  563. ex.BackTrace()->PrintTo(Cerr);
  564. throw;
  565. } catch (...) {
  566. Cerr << CurrentExceptionMessage() << Endl;
  567. throw;
  568. }
  569. }
  570. static void DoTestFastPathWake(EContPoller pollerType) {
  571. if (auto poller = IPollerFace::Construct(pollerType)) {
  572. TContExecutor exec(20000, std::move(poller));
  573. exec.SetFailOnError(true);
  574. exec.Execute(NCoroTestFastPathWake::DoMain);
  575. }
  576. }
  577. }
  578. void TCoroTest::TestFastPathWakeDefault() {
  579. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
  580. }
  581. void TCoroTest::TestFastPathWakeEpoll() {
  582. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
  583. }
  584. void TCoroTest::TestFastPathWakeKqueue() {
  585. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
  586. }
  587. void TCoroTest::TestFastPathWakePoll() {
  588. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
  589. }
  590. void TCoroTest::TestFastPathWakeSelect() {
  591. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
  592. }
  593. namespace NCoroTestLegacyCancelYieldRaceBug {
  594. enum class EState {
  595. Idle, Running, Finished,
  596. };
  597. struct TState {
  598. EState SubState = EState::Idle;
  599. };
  600. static void DoSub(TCont* cont, void* argPtr) {
  601. TState& state = *(TState*)argPtr;
  602. state.SubState = EState::Running;
  603. cont->Yield();
  604. cont->Yield();
  605. state.SubState = EState::Finished;
  606. }
  607. static void DoMain(TCont* cont, void* argPtr) {
  608. TState& state = *(TState*)argPtr;
  609. TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
  610. sub->Cancel();
  611. cont->Yield();
  612. UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
  613. }
  614. }
  615. void TCoroTest::TestLegacyCancelYieldRaceBug() {
  616. NCoroTestLegacyCancelYieldRaceBug::TState state;
  617. TContExecutor exec(20000);
  618. exec.SetFailOnError(true);
  619. exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
  620. }
  621. namespace NCoroTestJoinRescheduleBug {
  622. enum class EState {
  623. Idle, Running, Finished,
  624. };
  625. struct TState {
  626. TCont* volatile SubA = nullptr;
  627. volatile EState SubAState = EState::Idle;
  628. volatile EState SubBState = EState::Idle;
  629. volatile EState SubCState = EState::Idle;
  630. };
  631. static void DoSubC(TCont* cont, void* argPtr) {
  632. TState& state = *(TState*)argPtr;
  633. state.SubCState = EState::Running;
  634. while (state.SubBState != EState::Running) {
  635. cont->Yield();
  636. }
  637. while (cont->SleepD(TInstant::Max()) != ECANCELED) {
  638. }
  639. state.SubCState = EState::Finished;
  640. }
  641. static void DoSubB(TCont* cont, void* argPtr) {
  642. TState& state = *(TState*)argPtr;
  643. state.SubBState = EState::Running;
  644. while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
  645. cont->Yield();
  646. }
  647. for (auto i : xrange(100)) {
  648. Y_UNUSED(i);
  649. if (!state.SubA) {
  650. break;
  651. }
  652. state.SubA->ReSchedule();
  653. cont->Yield();
  654. }
  655. state.SubBState = EState::Finished;
  656. }
  657. static void DoSubA(TCont* cont, void* argPtr) {
  658. TState& state = *(TState*)argPtr;
  659. state.SubAState = EState::Running;
  660. TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
  661. while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
  662. cont->Yield();
  663. }
  664. cont->Join(subC);
  665. UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
  666. state.SubA = nullptr;
  667. state.SubAState = EState::Finished;
  668. }
  669. static void DoMain(TCont* cont, void* argPtr) {
  670. TState& state = *(TState*)argPtr;
  671. TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
  672. state.SubA = subA;
  673. cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
  674. if (state.SubA) {
  675. subA->Cancel();
  676. cont->Join(subA);
  677. }
  678. }
  679. }
  680. void TCoroTest::TestJoinRescheduleBug() {
  681. using namespace NCoroTestJoinRescheduleBug;
  682. TState state;
  683. {
  684. TContExecutor exec(20000);
  685. exec.Execute(DoMain, &state);
  686. }
  687. UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
  688. UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
  689. UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
  690. }
  691. void TCoroTest::TestEventQueue() {
  692. NCoro::TEventWaitQueue queue;
  693. UNIT_ASSERT(queue.Empty());
  694. UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
  695. TContExecutor exec(32000);
  696. exec.Execute([](TCont* cont, void* arg) {
  697. NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
  698. TTimerEvent ev(cont, TInstant::Max());
  699. TTimerEvent ev2(cont, TInstant::Seconds(12345));
  700. q->Register(&ev);
  701. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
  702. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
  703. q->Register(&ev2);
  704. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
  705. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
  706. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
  707. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
  708. }, &queue);
  709. }
  710. void TCoroTest::TestNestedExecutor() {
  711. #ifndef _win_
  712. //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr
  713. TContExecutor exec(32000);
  714. UNIT_ASSERT(!RunningCont());
  715. exec.Execute([](TCont* cont, void*) {
  716. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
  717. TContExecutor exec2(32000);
  718. exec2.Execute([](TCont* cont2, void*) {
  719. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
  720. TContExecutor exec3(32000);
  721. exec3.Execute([](TCont* cont3, void*) {
  722. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3);
  723. });
  724. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
  725. });
  726. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
  727. });
  728. UNIT_ASSERT(!RunningCont());
  729. #endif
  730. }
  731. void TCoroTest::TestComputeCoroutineYield() {
  732. //if we have busy (e.g., on cpu) coroutine, when it yields, io must flow
  733. TContExecutor exec(32000);
  734. exec.SetFailOnError(true);
  735. TPipe in, out;
  736. TPipe::Pipe(in, out);
  737. SetNonBlock(in.GetHandle());
  738. size_t lastRead = 42;
  739. auto compute = [&](TCont* cont) {
  740. for (size_t i = 0; i < 10; ++i) {
  741. write(out.GetHandle(), &i, sizeof i);
  742. Sleep(TDuration::MilliSeconds(10));
  743. cont->Yield();
  744. UNIT_ASSERT(lastRead == i);
  745. }
  746. };
  747. auto io = [&](TCont* cont) {
  748. for (size_t i = 0; i < 10; ++i) {
  749. NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead);
  750. }
  751. };
  752. exec.Create(compute, "compute");
  753. exec.Create(io, "io");
  754. exec.Execute();
  755. }
  756. void TCoroTest::TestPollEngines() {
  757. bool defaultChecked = false;
  758. for (auto engine : GetEnumAllValues<EContPoller>()) {
  759. auto poller = IPollerFace::Construct(engine);
  760. if (!poller) {
  761. continue;
  762. }
  763. TContExecutor exec(32000, IPollerFace::Construct(engine));
  764. if (engine == EContPoller::Default) {
  765. defaultChecked = true;
  766. #if defined(HAVE_EPOLL_POLLER)
  767. UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Epoll);
  768. #elif defined(HAVE_KQUEUE_POLLER)
  769. UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Kqueue);
  770. #else
  771. UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Select);
  772. #endif
  773. } else {
  774. UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine);
  775. }
  776. }
  777. UNIT_ASSERT(defaultChecked);
  778. }
  779. void TCoroTest::TestPause() {
  780. TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()};
  781. int i = 0;
  782. executor.CreateOwned([&](TCont*) {
  783. i++;
  784. executor.Pause();
  785. i++;
  786. }, "coro");
  787. UNIT_ASSERT_EQUAL(i, 0);
  788. executor.Execute();
  789. UNIT_ASSERT_EQUAL(i, 1);
  790. executor.Execute();
  791. UNIT_ASSERT_EQUAL(i, 2);
  792. }
  793. void TCoroTest::TestUserEvent() {
  794. TContExecutor exec(32000);
  795. struct TUserEvent : public IUserEvent {
  796. bool Called = false;
  797. void Execute() override {
  798. Called = true;
  799. }
  800. } event;
  801. auto f = [&](TCont* cont) {
  802. UNIT_ASSERT(!event.Called);
  803. exec.ScheduleUserEvent(&event);
  804. UNIT_ASSERT(!event.Called);
  805. cont->Yield();
  806. UNIT_ASSERT(event.Called);
  807. };
  808. exec.Execute(f);
  809. UNIT_ASSERT(event.Called);
  810. }
  811. void TCoroTest::TestOverrideTime() {
  812. class TTime: public NCoro::ITime {
  813. public:
  814. TInstant Now() override {
  815. return Current;
  816. }
  817. TInstant Current = TInstant::Zero();
  818. };
  819. TTime time;
  820. TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing(), &time};
  821. executor.CreateOwned([&](TCont* cont) {
  822. UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Zero());
  823. time.Current = TInstant::Seconds(1);
  824. cont->SleepD(TInstant::Seconds(1));
  825. UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Seconds(1));
  826. }, "coro");
  827. executor.Execute();
  828. }
  829. void TCoroTest::TestCancelWithException() {
  830. TContExecutor exec(32000);
  831. TString excText = "test exception";
  832. THolder<std::exception> excep = MakeHolder<yexception>(yexception() << excText);
  833. std::exception* excPtr = excep.Get();
  834. exec.CreateOwned([&](TCont* cont){
  835. TCont *cont1 = cont->Executor()->CreateOwned([&](TCont* c) {
  836. int result = c->SleepD(TDuration::MilliSeconds(200).ToDeadLine());
  837. UNIT_ASSERT_EQUAL(result, ECANCELED);
  838. UNIT_ASSERT_EQUAL(c->Cancelled(), true);
  839. THolder<std::exception> exc = c->TakeException();
  840. UNIT_ASSERT_EQUAL(exc.Get(), excPtr);
  841. UNIT_ASSERT_EQUAL(exc->what(), excText);
  842. UNIT_ASSERT(dynamic_cast<yexception*>(exc.Get()) != nullptr);
  843. }, "cancelExc");
  844. cont1->Cancel(std::move(excep));
  845. TCont* cont2 = cont->Executor()->CreateOwned([&](TCont* c) {
  846. int result = c->SleepD(TDuration::MilliSeconds(200).ToDeadLine());
  847. UNIT_ASSERT_EQUAL(result, ECANCELED);
  848. UNIT_ASSERT_EQUAL(c->Cancelled(), true);
  849. THolder<std::exception> exc = c->TakeException();
  850. UNIT_ASSERT_EQUAL(exc.Get(), nullptr);
  851. }, "cancelTwice");
  852. cont2->Cancel();
  853. THolder<std::exception> e = MakeHolder<yexception>(yexception() << "another exception");
  854. cont2->Cancel(std::move(e));
  855. }, "coro");
  856. exec.Execute();
  857. }
  858. UNIT_TEST_SUITE_REGISTRATION(TCoroTest);