coroutine_ut.cpp 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007
  1. #include "impl.h"
  2. #include "condvar.h"
  3. #include "network.h"
  4. #include <library/cpp/testing/unittest/registar.h>
  5. #include <util/string/cast.h>
  6. #include <util/system/pipe.h>
  7. #include <util/system/env.h>
  8. #include <util/system/info.h>
  9. #include <util/system/thread.h>
  10. #include <util/generic/xrange.h>
  11. #include <util/generic/serialized_enum.h>
  12. // TODO (velavokr): BALANCER-1345 add more tests on pollers
  13. class TCoroTest: public TTestBase {
  14. UNIT_TEST_SUITE(TCoroTest);
  15. UNIT_TEST(TestSimpleX1);
  16. UNIT_TEST(TestSimpleX1MultiThread);
  17. UNIT_TEST(TestSimpleX2);
  18. UNIT_TEST(TestSimpleX3);
  19. UNIT_TEST(TestMemFun);
  20. UNIT_TEST(TestMutex);
  21. UNIT_TEST(TestCondVar);
  22. UNIT_TEST(TestJoinDefault);
  23. UNIT_TEST(TestJoinEpoll);
  24. UNIT_TEST(TestJoinKqueue);
  25. UNIT_TEST(TestJoinPoll);
  26. UNIT_TEST(TestJoinSelect);
  27. UNIT_TEST(TestException);
  28. UNIT_TEST(TestJoinCancelExitRaceBug);
  29. UNIT_TEST(TestWaitWakeLivelockBug);
  30. UNIT_TEST(TestFastPathWakeDefault)
  31. // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
  32. // UNIT_TEST(TestFastPathWakeEpoll)
  33. UNIT_TEST(TestFastPathWakeKqueue)
  34. UNIT_TEST(TestFastPathWakePoll)
  35. UNIT_TEST(TestFastPathWakeSelect)
  36. UNIT_TEST(TestLegacyCancelYieldRaceBug)
  37. UNIT_TEST(TestJoinRescheduleBug);
  38. UNIT_TEST(TestEventQueue)
  39. UNIT_TEST(TestNestedExecutor)
  40. UNIT_TEST(TestComputeCoroutineYield)
  41. UNIT_TEST(TestPollEngines);
  42. UNIT_TEST(TestUserEvent);
  43. UNIT_TEST(TestPause);
  44. UNIT_TEST(TestOverrideTime);
  45. UNIT_TEST_SUITE_END();
  46. public:
  47. void TestException();
  48. void TestSimpleX1();
  49. void TestSimpleX1MultiThread();
  50. void TestSimpleX2();
  51. void TestSimpleX3();
  52. void TestMemFun();
  53. void TestMutex();
  54. void TestCondVar();
  55. void TestJoinDefault();
  56. void TestJoinEpoll();
  57. void TestJoinKqueue();
  58. void TestJoinPoll();
  59. void TestJoinSelect();
  60. void TestJoinCancelExitRaceBug();
  61. void TestWaitWakeLivelockBug();
  62. void TestFastPathWakeDefault();
  63. void TestFastPathWakeEpoll();
  64. void TestFastPathWakeKqueue();
  65. void TestFastPathWakePoll();
  66. void TestFastPathWakeSelect();
  67. void TestLegacyCancelYieldRaceBug();
  68. void TestJoinRescheduleBug();
  69. void TestEventQueue();
  70. void TestNestedExecutor();
  71. void TestComputeCoroutineYield();
  72. void TestPollEngines();
  73. void TestUserEvent();
  74. void TestPause();
  75. void TestOverrideTime();
  76. };
  77. void TCoroTest::TestException() {
  78. TContExecutor e(1000000);
  79. bool f2run = false;
  80. auto f1 = [&f2run](TCont* c) {
  81. struct TCtx {
  82. ~TCtx() {
  83. Y_VERIFY(!*F2);
  84. C->Yield();
  85. }
  86. TCont* C;
  87. bool* F2;
  88. };
  89. try {
  90. TCtx ctx = {c, &f2run};
  91. throw 1;
  92. } catch (...) {
  93. }
  94. };
  95. bool unc = true;
  96. auto f2 = [&unc, &f2run](TCont*) {
  97. f2run = true;
  98. unc = std::uncaught_exception();
  99. // check segfault
  100. try {
  101. throw 2;
  102. } catch (int) {
  103. }
  104. };
  105. e.Create(f1, "f1");
  106. e.Create(f2, "f2");
  107. e.Execute();
  108. UNIT_ASSERT(!unc);
  109. }
  110. static int i0;
  111. static void CoRun(TCont* c, void* /*run*/) {
  112. while (i0 < 100000) {
  113. ++i0;
  114. UNIT_ASSERT(RunningCont() == c);
  115. c->Yield();
  116. UNIT_ASSERT(RunningCont() == c);
  117. }
  118. }
  119. static void CoMain(TCont* c, void* /*arg*/) {
  120. for (volatile size_t i2 = 0; i2 < 10; ++i2) {
  121. UNIT_ASSERT(RunningCont() == c);
  122. c->Executor()->Create(CoRun, nullptr, "run");
  123. UNIT_ASSERT(RunningCont() == c);
  124. }
  125. }
  126. void TCoroTest::TestSimpleX1() {
  127. i0 = 0;
  128. TContExecutor e(32000);
  129. UNIT_ASSERT(RunningCont() == nullptr);
  130. e.Execute(CoMain);
  131. UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
  132. UNIT_ASSERT(RunningCont() == nullptr);
  133. }
  134. void TCoroTest::TestSimpleX1MultiThread() {
  135. TVector<THolder<TThread>> threads;
  136. const size_t nThreads = 0;
  137. TAtomic c = 0;
  138. for (size_t i = 0; i < nThreads; ++i) {
  139. threads.push_back(MakeHolder<TThread>([&]() {
  140. TestSimpleX1();
  141. AtomicIncrement(c);
  142. }));
  143. }
  144. for (auto& t : threads) {
  145. t->Start();
  146. }
  147. for (auto& t: threads) {
  148. t->Join();
  149. }
  150. UNIT_ASSERT_EQUAL(c, nThreads);
  151. }
  152. struct TTestObject {
  153. int i = 0;
  154. int j = 0;
  155. public:
  156. void RunTask1(TCont*) {
  157. i = 1;
  158. }
  159. void RunTask2(TCont*) {
  160. j = 2;
  161. }
  162. };
  163. void TCoroTest::TestMemFun() {
  164. i0 = 0;
  165. TContExecutor e(32000);
  166. TTestObject obj;
  167. e.Create<TTestObject, &TTestObject::RunTask1>(&obj, "test1");
  168. e.Execute<TTestObject, &TTestObject::RunTask2>(&obj);
  169. UNIT_ASSERT_EQUAL(obj.i, 1);
  170. UNIT_ASSERT_EQUAL(obj.j, 2);
  171. }
  172. void TCoroTest::TestSimpleX2() {
  173. {
  174. i0 = 0;
  175. {
  176. TContExecutor e(32000);
  177. e.Execute(CoMain);
  178. }
  179. UNIT_ASSERT_EQUAL(i0, 100000);
  180. }
  181. {
  182. i0 = 0;
  183. {
  184. TContExecutor e(32000);
  185. e.Execute(CoMain);
  186. }
  187. UNIT_ASSERT_EQUAL(i0, 100000);
  188. }
  189. }
  190. struct TRunner {
  191. inline TRunner()
  192. : Runs(0)
  193. {
  194. }
  195. inline void operator()(TCont* c) {
  196. ++Runs;
  197. c->Yield();
  198. }
  199. size_t Runs;
  200. };
  201. void TCoroTest::TestSimpleX3() {
  202. TContExecutor e(32000);
  203. TRunner runner;
  204. for (volatile size_t i3 = 0; i3 < 1000; ++i3) {
  205. e.Create(runner, "runner");
  206. }
  207. e.Execute();
  208. UNIT_ASSERT_EQUAL(runner.Runs, 1000);
  209. }
  210. static TString res;
  211. static TContMutex mutex;
  212. static void CoMutex(TCont* c, void* /*run*/) {
  213. {
  214. mutex.LockI(c);
  215. c->Yield();
  216. res += c->Name();
  217. mutex.UnLock();
  218. }
  219. c->Yield();
  220. {
  221. mutex.LockI(c);
  222. c->Yield();
  223. res += c->Name();
  224. mutex.UnLock();
  225. }
  226. }
  227. static void CoMutexTest(TCont* c, void* /*run*/) {
  228. c->Executor()->Create(CoMutex, nullptr, "1");
  229. c->Executor()->Create(CoMutex, nullptr, "2");
  230. }
  231. void TCoroTest::TestMutex() {
  232. TContExecutor e(32000);
  233. e.Execute(CoMutexTest);
  234. UNIT_ASSERT_EQUAL(res, "1212");
  235. res.clear();
  236. }
  237. static TContMutex m1;
  238. static TContCondVar c1;
  239. static void CoCondVar(TCont* c, void* /*run*/) {
  240. for (size_t i4 = 0; i4 < 3; ++i4) {
  241. UNIT_ASSERT_EQUAL(m1.LockI(c), 0);
  242. UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0);
  243. res += c->Name();
  244. m1.UnLock();
  245. }
  246. }
  247. static void CoCondVarTest(TCont* c, void* /*run*/) {
  248. c->Executor()->Create(CoCondVar, nullptr, "1");
  249. c->Yield();
  250. c->Executor()->Create(CoCondVar, nullptr, "2");
  251. c->Yield();
  252. c->Executor()->Create(CoCondVar, nullptr, "3");
  253. c->Yield();
  254. c->Executor()->Create(CoCondVar, nullptr, "4");
  255. c->Yield();
  256. c->Executor()->Create(CoCondVar, nullptr, "5");
  257. c->Yield();
  258. c->Executor()->Create(CoCondVar, nullptr, "6");
  259. c->Yield();
  260. for (size_t i5 = 0; i5 < 3; ++i5) {
  261. res += ToString((size_t)i5) + "^";
  262. c1.BroadCast();
  263. c->Yield();
  264. }
  265. }
  266. void TCoroTest::TestCondVar() {
  267. TContExecutor e(32000);
  268. e.Execute(CoCondVarTest);
  269. UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456");
  270. res.clear();
  271. }
  272. namespace NCoroTestJoin {
  273. struct TSleepCont {
  274. const TInstant Deadline;
  275. int Result;
  276. inline void operator()(TCont* c) {
  277. Result = c->SleepD(Deadline);
  278. }
  279. };
  280. struct TReadCont {
  281. const TInstant Deadline;
  282. const SOCKET Sock;
  283. int Result;
  284. inline void operator()(TCont* c) {
  285. char buf = 0;
  286. Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
  287. }
  288. };
  289. struct TJoinCont {
  290. const TInstant Deadline;
  291. TCont* const Cont;
  292. bool Result;
  293. inline void operator()(TCont* c) {
  294. Result = c->Join(Cont, Deadline);
  295. }
  296. };
  297. void DoTestJoin(EContPoller pollerType) {
  298. auto poller = IPollerFace::Construct(pollerType);
  299. if (!poller) {
  300. return;
  301. }
  302. TContExecutor e(32000, std::move(poller));
  303. TPipe in, out;
  304. TPipe::Pipe(in, out);
  305. SetNonBlock(in.GetHandle());
  306. {
  307. TSleepCont sc = {TInstant::Max(), 0};
  308. TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
  309. e.Execute(jc);
  310. UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
  311. UNIT_ASSERT_EQUAL(jc.Result, false);
  312. }
  313. {
  314. TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
  315. TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
  316. e.Execute(jc);
  317. UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
  318. UNIT_ASSERT_EQUAL(jc.Result, true);
  319. }
  320. {
  321. TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
  322. TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
  323. e.Execute(jc);
  324. UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
  325. UNIT_ASSERT_EQUAL(jc.Result, false);
  326. }
  327. {
  328. TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
  329. TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
  330. e.Execute(jc);
  331. UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
  332. UNIT_ASSERT_EQUAL(jc.Result, false);
  333. }
  334. {
  335. TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
  336. TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
  337. e.Execute(jc);
  338. UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
  339. UNIT_ASSERT_EQUAL(jc.Result, true);
  340. }
  341. {
  342. TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
  343. TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
  344. e.Execute(jc);
  345. UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
  346. UNIT_ASSERT_EQUAL(jc.Result, false);
  347. }
  348. }
  349. }
  350. void TCoroTest::TestJoinDefault() {
  351. NCoroTestJoin::DoTestJoin(EContPoller::Default);
  352. }
  353. void TCoroTest::TestJoinEpoll() {
  354. NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
  355. }
  356. void TCoroTest::TestJoinKqueue() {
  357. NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
  358. }
  359. void TCoroTest::TestJoinPoll() {
  360. NCoroTestJoin::DoTestJoin(EContPoller::Poll);
  361. }
  362. void TCoroTest::TestJoinSelect() {
  363. NCoroTestJoin::DoTestJoin(EContPoller::Select);
  364. }
  365. namespace NCoroJoinCancelExitRaceBug {
  366. struct TState {
  367. TCont* Sub = nullptr;
  368. };
  369. static void DoAux(TCont*, void* argPtr) noexcept {
  370. TState& state = *(TState*)(argPtr);
  371. // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
  372. state.Sub->Cancel();
  373. }
  374. static void DoSub2(TCont*, void*) noexcept {
  375. // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
  376. // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
  377. }
  378. static void DoSub(TCont* cont, void* argPtr) noexcept {
  379. TState& state = *(TState*)(argPtr);
  380. state.Sub = cont;
  381. // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
  382. auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
  383. // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
  384. // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
  385. // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
  386. cont->Join(sub2);
  387. state.Sub = nullptr;
  388. }
  389. static void DoMain(TCont* cont) noexcept {
  390. TState state;
  391. // 01.{Ready:[]} > {Ready:[Sub]}
  392. auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
  393. // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
  394. cont->Executor()->Create(DoAux, &state, "Aux");
  395. // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
  396. cont->Join(sub);
  397. }
  398. }
  399. void TCoroTest::TestJoinCancelExitRaceBug() {
  400. TContExecutor exec(20000);
  401. exec.SetFailOnError(true);
  402. exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
  403. }
  404. namespace NCoroWaitWakeLivelockBug {
  405. struct TState;
  406. struct TSubState {
  407. TSubState(TState& parent, ui32 self)
  408. : Parent(parent)
  409. , Name(TStringBuilder() << "Sub" << self)
  410. , Self(self)
  411. {
  412. UNIT_ASSERT(self < 2);
  413. }
  414. TSubState& OtherState();
  415. TState& Parent;
  416. TTimerEvent* Event = nullptr;
  417. TCont* Cont = nullptr;
  418. TString Name;
  419. ui32 Self = -1;
  420. };
  421. struct TState {
  422. TState()
  423. : Subs{{*this, 0}, {*this, 1}}
  424. {}
  425. TSubState Subs[2];
  426. bool Stop = false;
  427. };
  428. TSubState& TSubState::OtherState() {
  429. return Parent.Subs[1 - Self];
  430. }
  431. static void DoStop(TCont* cont, void* argPtr) {
  432. TState& state = *(TState*)(argPtr);
  433. TTimerEvent event(cont, TInstant::Now());
  434. ExecuteEvent(&event);
  435. state.Stop = true;
  436. for (auto& sub: state.Subs) {
  437. if (sub.Event) {
  438. sub.Event->Wake(EWAKEDUP);
  439. }
  440. }
  441. }
  442. static void DoSub(TCont* cont, void* argPtr) {
  443. TSubState& state = *(TSubState*)(argPtr);
  444. while (!state.Parent.Stop) {
  445. TTimerEvent event(cont, TInstant::Max());
  446. if (state.OtherState().Event) {
  447. state.OtherState().Event->Wake(EWAKEDUP);
  448. }
  449. state.Event = &event;
  450. ExecuteEvent(&event);
  451. state.Event = nullptr;
  452. }
  453. state.Cont = nullptr;
  454. }
  455. static void DoMain(TCont* cont) noexcept {
  456. TState state;
  457. for (auto& subState : state.Subs) {
  458. subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
  459. }
  460. cont->Join(
  461. cont->Executor()->Create(DoStop, &state, "Stop")
  462. );
  463. for (auto& subState : state.Subs) {
  464. if (subState.Cont) {
  465. cont->Join(subState.Cont);
  466. }
  467. }
  468. }
  469. }
  470. void TCoroTest::TestWaitWakeLivelockBug() {
  471. TContExecutor exec(20000);
  472. exec.SetFailOnError(true);
  473. exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
  474. }
  475. namespace NCoroTestFastPathWake {
  476. struct TState;
  477. struct TSubState {
  478. TSubState(TState& parent, ui32 self)
  479. : Parent(parent)
  480. , Name(TStringBuilder() << "Sub" << self)
  481. {}
  482. TState& Parent;
  483. TInstant Finish;
  484. TTimerEvent* Event = nullptr;
  485. TCont* Cont = nullptr;
  486. TString Name;
  487. };
  488. struct TState {
  489. TState()
  490. : Subs{{*this, 0}, {*this, 1}}
  491. {
  492. TPipe::Pipe(In, Out);
  493. SetNonBlock(In.GetHandle());
  494. }
  495. TSubState Subs[2];
  496. TPipe In, Out;
  497. bool IoSleepRunning = false;
  498. };
  499. static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
  500. try {
  501. TState& state = *(TState*) (argPtr);
  502. state.IoSleepRunning = true;
  503. TTempBuf tmp;
  504. // Wait for the event from io
  505. auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
  506. UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
  507. state.IoSleepRunning = false;
  508. } catch (const NUnitTest::TAssertException& ex) {
  509. Cerr << ex.AsStrBuf() << Endl;
  510. ex.BackTrace()->PrintTo(Cerr);
  511. throw;
  512. } catch (...) {
  513. Cerr << CurrentExceptionMessage() << Endl;
  514. throw;
  515. }
  516. }
  517. static void DoSub(TCont* cont, void* argPtr) noexcept {
  518. TSubState& state = *(TSubState*)(argPtr);
  519. TTimerEvent event(cont, TInstant::Max());
  520. state.Event = &event;
  521. ExecuteEvent(&event);
  522. state.Event = nullptr;
  523. state.Cont = nullptr;
  524. state.Finish = TInstant::Now();
  525. }
  526. static void DoMain(TCont* cont) noexcept {
  527. try {
  528. TState state;
  529. TInstant start = TInstant::Now();
  530. // This guy sleeps on io
  531. auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
  532. // These guys are to be woken up right away
  533. for (auto& subState : state.Subs) {
  534. subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
  535. }
  536. // Give way
  537. cont->Yield();
  538. // Check everyone has started, wake those to be woken
  539. UNIT_ASSERT(state.IoSleepRunning);
  540. for (auto& subState : state.Subs) {
  541. UNIT_ASSERT(subState.Event);
  542. subState.Event->Wake(EWAKEDUP);
  543. }
  544. // Give way again
  545. cont->Yield();
  546. // Check the woken guys have finished and quite soon
  547. for (auto& subState : state.Subs) {
  548. UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
  549. UNIT_ASSERT(!subState.Cont);
  550. }
  551. // Wake the io guy and finish
  552. state.Out.Close();
  553. if (state.IoSleepRunning) {
  554. cont->Join(sleeper);
  555. }
  556. // Check everything has ended sooner than the timeout
  557. UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
  558. } catch (const NUnitTest::TAssertException& ex) {
  559. Cerr << ex.AsStrBuf() << Endl;
  560. ex.BackTrace()->PrintTo(Cerr);
  561. throw;
  562. } catch (...) {
  563. Cerr << CurrentExceptionMessage() << Endl;
  564. throw;
  565. }
  566. }
  567. static void DoTestFastPathWake(EContPoller pollerType) {
  568. if (auto poller = IPollerFace::Construct(pollerType)) {
  569. TContExecutor exec(20000, std::move(poller));
  570. exec.SetFailOnError(true);
  571. exec.Execute(NCoroTestFastPathWake::DoMain);
  572. }
  573. }
  574. }
  575. void TCoroTest::TestFastPathWakeDefault() {
  576. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
  577. }
  578. void TCoroTest::TestFastPathWakeEpoll() {
  579. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
  580. }
  581. void TCoroTest::TestFastPathWakeKqueue() {
  582. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
  583. }
  584. void TCoroTest::TestFastPathWakePoll() {
  585. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
  586. }
  587. void TCoroTest::TestFastPathWakeSelect() {
  588. NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
  589. }
  590. namespace NCoroTestLegacyCancelYieldRaceBug {
  591. enum class EState {
  592. Idle, Running, Finished,
  593. };
  594. struct TState {
  595. EState SubState = EState::Idle;
  596. };
  597. static void DoSub(TCont* cont, void* argPtr) {
  598. TState& state = *(TState*)argPtr;
  599. state.SubState = EState::Running;
  600. cont->Yield();
  601. cont->Yield();
  602. state.SubState = EState::Finished;
  603. }
  604. static void DoMain(TCont* cont, void* argPtr) {
  605. TState& state = *(TState*)argPtr;
  606. TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
  607. sub->Cancel();
  608. cont->Yield();
  609. UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
  610. }
  611. }
  612. void TCoroTest::TestLegacyCancelYieldRaceBug() {
  613. NCoroTestLegacyCancelYieldRaceBug::TState state;
  614. TContExecutor exec(20000);
  615. exec.SetFailOnError(true);
  616. exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
  617. }
  618. namespace NCoroTestJoinRescheduleBug {
  619. enum class EState {
  620. Idle, Running, Finished,
  621. };
  622. struct TState {
  623. TCont* volatile SubA = nullptr;
  624. volatile EState SubAState = EState::Idle;
  625. volatile EState SubBState = EState::Idle;
  626. volatile EState SubCState = EState::Idle;
  627. };
  628. static void DoSubC(TCont* cont, void* argPtr) {
  629. TState& state = *(TState*)argPtr;
  630. state.SubCState = EState::Running;
  631. while (state.SubBState != EState::Running) {
  632. cont->Yield();
  633. }
  634. while (cont->SleepD(TInstant::Max()) != ECANCELED) {
  635. }
  636. state.SubCState = EState::Finished;
  637. }
  638. static void DoSubB(TCont* cont, void* argPtr) {
  639. TState& state = *(TState*)argPtr;
  640. state.SubBState = EState::Running;
  641. while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
  642. cont->Yield();
  643. }
  644. for (auto i : xrange(100)) {
  645. Y_UNUSED(i);
  646. if (!state.SubA) {
  647. break;
  648. }
  649. state.SubA->ReSchedule();
  650. cont->Yield();
  651. }
  652. state.SubBState = EState::Finished;
  653. }
  654. static void DoSubA(TCont* cont, void* argPtr) {
  655. TState& state = *(TState*)argPtr;
  656. state.SubAState = EState::Running;
  657. TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
  658. while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
  659. cont->Yield();
  660. }
  661. cont->Join(subC);
  662. UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
  663. state.SubA = nullptr;
  664. state.SubAState = EState::Finished;
  665. }
  666. static void DoMain(TCont* cont, void* argPtr) {
  667. TState& state = *(TState*)argPtr;
  668. TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
  669. state.SubA = subA;
  670. cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
  671. if (state.SubA) {
  672. subA->Cancel();
  673. cont->Join(subA);
  674. }
  675. }
  676. }
  677. void TCoroTest::TestJoinRescheduleBug() {
  678. using namespace NCoroTestJoinRescheduleBug;
  679. TState state;
  680. {
  681. TContExecutor exec(20000);
  682. exec.Execute(DoMain, &state);
  683. }
  684. UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
  685. UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
  686. UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
  687. }
  688. void TCoroTest::TestEventQueue() {
  689. NCoro::TEventWaitQueue queue;
  690. UNIT_ASSERT(queue.Empty());
  691. UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
  692. TContExecutor exec(32000);
  693. exec.Execute([](TCont* cont, void* arg) {
  694. NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
  695. TTimerEvent ev(cont, TInstant::Max());
  696. TTimerEvent ev2(cont, TInstant::Seconds(12345));
  697. q->Register(&ev);
  698. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
  699. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
  700. q->Register(&ev2);
  701. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
  702. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
  703. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
  704. UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
  705. }, &queue);
  706. }
  707. void TCoroTest::TestNestedExecutor() {
  708. #ifndef _win_
  709. //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr
  710. TContExecutor exec(32000);
  711. UNIT_ASSERT(!RunningCont());
  712. exec.Execute([](TCont* cont, void*) {
  713. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
  714. TContExecutor exec2(32000);
  715. exec2.Execute([](TCont* cont2, void*) {
  716. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
  717. TContExecutor exec3(32000);
  718. exec3.Execute([](TCont* cont3, void*) {
  719. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3);
  720. });
  721. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
  722. });
  723. UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
  724. });
  725. UNIT_ASSERT(!RunningCont());
  726. #endif
  727. }
  728. void TCoroTest::TestComputeCoroutineYield() {
  729. //if we have busy (e.g., on cpu) coroutine, when it yields, io must flow
  730. TContExecutor exec(32000);
  731. exec.SetFailOnError(true);
  732. TPipe in, out;
  733. TPipe::Pipe(in, out);
  734. SetNonBlock(in.GetHandle());
  735. size_t lastRead = 42;
  736. auto compute = [&](TCont* cont) {
  737. for (size_t i = 0; i < 10; ++i) {
  738. write(out.GetHandle(), &i, sizeof i);
  739. Sleep(TDuration::MilliSeconds(10));
  740. cont->Yield();
  741. UNIT_ASSERT(lastRead == i);
  742. }
  743. };
  744. auto io = [&](TCont* cont) {
  745. for (size_t i = 0; i < 10; ++i) {
  746. NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead);
  747. }
  748. };
  749. exec.Create(compute, "compute");
  750. exec.Create(io, "io");
  751. exec.Execute();
  752. }
  753. void TCoroTest::TestPollEngines() {
  754. bool defaultChecked = false;
  755. for (auto engine : GetEnumAllValues<EContPoller>()) {
  756. auto poller = IPollerFace::Construct(engine);
  757. if (!poller) {
  758. continue;
  759. }
  760. TContExecutor exec(32000, IPollerFace::Construct(engine));
  761. if (engine == EContPoller::Default) {
  762. defaultChecked = true;
  763. UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined);
  764. } else {
  765. UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine);
  766. }
  767. }
  768. UNIT_ASSERT(defaultChecked);
  769. }
  770. void TCoroTest::TestPause() {
  771. TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()};
  772. int i = 0;
  773. executor.CreateOwned([&](TCont*) {
  774. i++;
  775. executor.Pause();
  776. i++;
  777. }, "coro");
  778. UNIT_ASSERT_EQUAL(i, 0);
  779. executor.Execute();
  780. UNIT_ASSERT_EQUAL(i, 1);
  781. executor.Execute();
  782. UNIT_ASSERT_EQUAL(i, 2);
  783. }
  784. void TCoroTest::TestUserEvent() {
  785. TContExecutor exec(32000);
  786. struct TUserEvent : public IUserEvent {
  787. bool Called = false;
  788. void Execute() override {
  789. Called = true;
  790. }
  791. } event;
  792. auto f = [&](TCont* cont) {
  793. UNIT_ASSERT(!event.Called);
  794. exec.ScheduleUserEvent(&event);
  795. UNIT_ASSERT(!event.Called);
  796. cont->Yield();
  797. UNIT_ASSERT(event.Called);
  798. };
  799. exec.Execute(f);
  800. UNIT_ASSERT(event.Called);
  801. }
  802. void TCoroTest::TestOverrideTime() {
  803. class TTime: public NCoro::ITime {
  804. public:
  805. TInstant Now() override {
  806. return Current;
  807. }
  808. TInstant Current = TInstant::Zero();
  809. };
  810. TTime time;
  811. TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing(), &time};
  812. executor.CreateOwned([&](TCont* cont) {
  813. UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Zero());
  814. time.Current = TInstant::Seconds(1);
  815. cont->SleepD(TInstant::Seconds(1));
  816. UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Seconds(1));
  817. }, "coro");
  818. executor.Execute();
  819. }
  820. UNIT_TEST_SUITE_REGISTRATION(TCoroTest);