test_runtime.cpp 73 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902
  1. #include "test_runtime.h"
  2. #include <library/cpp/actors/core/actor_bootstrapped.h>
  3. #include <library/cpp/actors/core/callstack.h>
  4. #include <library/cpp/actors/core/executor_pool_basic.h>
  5. #include <library/cpp/actors/core/executor_pool_io.h>
  6. #include <library/cpp/actors/core/log.h>
  7. #include <library/cpp/actors/core/scheduler_basic.h>
  8. #include <library/cpp/actors/util/datetime.h>
  9. #include <library/cpp/actors/protos/services_common.pb.h>
  10. #include <library/cpp/random_provider/random_provider.h>
  11. #include <library/cpp/actors/interconnect/interconnect.h>
  12. #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
  13. #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
  14. #include <util/generic/maybe.h>
  15. #include <util/generic/bt_exception.h>
  16. #include <util/random/mersenne.h>
  17. #include <util/string/printf.h>
  18. #include <typeinfo>
// Global switch for event tracing to Cerr; exposed through
// TTestActorRuntimeBase::IsVerbose() / SetVerbose().
bool VERBOSE = false;
// When true, PrintEvent() prints events with ToString(); otherwise with ToStringHeader().
const bool PRINT_EVENT_BODY = false;
  21. namespace {
  22. TString MakeClusterId() {
  23. pid_t pid = getpid();
  24. TStringBuilder uuid;
  25. uuid << "Cluster for process with id: " << pid;
  26. return uuid;
  27. }
  28. }
  29. namespace NActors {
// Out-of-class definition of the static counter TScheduledEventQueueItem::NextUniqueId; starts at zero.
ui64 TScheduledEventQueueItem::NextUniqueId = 0;
  31. void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) {
  32. Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite())
  33. << ", from " << ev->Sender.LocalId();
  34. TString name = runtime->GetActorName(ev->Sender);
  35. if (!name.empty())
  36. Cerr << " \"" << name << "\"";
  37. Cerr << ", to " << ev->GetRecipientRewrite().LocalId();
  38. name = runtime->GetActorName(ev->GetRecipientRewrite());
  39. if (!name.empty())
  40. Cerr << " \"" << name << "\"";
  41. Cerr << ", ";
  42. if (ev->HasEvent())
  43. Cerr << " : " << (PRINT_EVENT_BODY ? ev->GetBase()->ToString() : ev->GetBase()->ToStringHeader());
  44. else if (ev->HasBuffer())
  45. Cerr << " : BUFFER";
  46. else
  47. Cerr << " : EMPTY";
  48. Cerr << "\n";
  49. }
  50. TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() {
  51. ActorSystemTimestamp = nullptr;
  52. ActorSystemMonotonic = nullptr;
  53. }
  54. void TTestActorRuntimeBase::TNodeDataBase::Stop() {
  55. if (Poller)
  56. Poller->Stop();
  57. if (MailboxTable) {
  58. for (ui32 round = 0; !MailboxTable->Cleanup(); ++round)
  59. Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub");
  60. }
  61. if (ActorSystem)
  62. ActorSystem->Stop();
  63. ActorSystem.Destroy();
  64. Poller.Reset();
  65. }
  66. TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() {
  67. Stop();
  68. }
  69. class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
  70. public:
  71. static constexpr EActivityType ActorActivityType() {
  72. return TEST_ACTOR_RUNTIME;
  73. }
  74. TEdgeActor(TTestActorRuntimeBase* runtime)
  75. : TActor(&TEdgeActor::StateFunc)
  76. , Runtime(runtime)
  77. {
  78. }
  79. STFUNC(StateFunc) {
  80. Y_UNUSED(ctx);
  81. TGuard<TMutex> guard(Runtime->Mutex);
  82. bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
  83. if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
  84. verbose = false;
  85. }
  86. if (verbose) {
  87. Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
  88. PrintEvent(ev, Runtime);
  89. }
  90. if (!Runtime->EventFilterFunc(*Runtime, ev)) {
  91. ui32 nodeId = ev->GetRecipientRewrite().NodeId();
  92. Y_VERIFY(nodeId != 0);
  93. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  94. Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
  95. Runtime->MailboxesHasEvents.Signal();
  96. if (verbose)
  97. Cerr << "Event was added to sent queue\n";
  98. }
  99. else {
  100. if (verbose)
  101. Cerr << "Event was dropped\n";
  102. }
  103. }
  104. private:
  105. TTestActorRuntimeBase* Runtime;
  106. };
  107. void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
  108. IEventHandle* ptr = ev.Get();
  109. Y_VERIFY(ptr);
  110. #ifdef DEBUG_ORDER_EVENTS
  111. ui64 counter = NextToSend++;
  112. TrackSent[ptr] = counter;
  113. #endif
  114. Sent.push_back(ev);
  115. }
  116. TAutoPtr<IEventHandle> TEventMailBox::Pop() {
  117. TAutoPtr<IEventHandle> result = Sent.front();
  118. Sent.pop_front();
  119. #ifdef DEBUG_ORDER_EVENTS
  120. auto it = TrackSent.find(result.Get());
  121. if (it != TrackSent.end()) {
  122. Y_VERIFY(ExpectedReceive == it->second);
  123. TrackSent.erase(result.Get());
  124. ++ExpectedReceive;
  125. }
  126. #endif
  127. return result;
  128. }
  129. bool TEventMailBox::IsEmpty() const {
  130. return Sent.empty();
  131. }
  132. void TEventMailBox::Capture(TEventsList& evList) {
  133. evList.insert(evList.end(), Sent.begin(), Sent.end());
  134. Sent.clear();
  135. }
  136. void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) {
  137. Sent.push_front(ev);
  138. }
  139. void TEventMailBox::PushFront(TEventsList& evList) {
  140. for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) {
  141. if (*rit) {
  142. Sent.push_front(*rit);
  143. }
  144. }
  145. }
  146. void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) {
  147. for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) {
  148. evList.insert(*it);
  149. }
  150. Scheduled.clear();
  151. }
  152. void TEventMailBox::PushScheduled(TScheduledEventsList& evList) {
  153. for (auto it = evList.begin(); it != evList.end(); ++it) {
  154. if (it->Event) {
  155. Scheduled.insert(*it);
  156. }
  157. }
  158. evList.clear();
  159. }
  160. bool TEventMailBox::IsActive(const TInstant& currentTime) const {
  161. return currentTime >= InactiveUntil;
  162. }
  163. void TEventMailBox::Freeze(const TInstant& deadline) {
  164. if (deadline > InactiveUntil)
  165. InactiveUntil = deadline;
  166. }
// Returns the instant before which IsActive() reports the mailbox as inactive.
TInstant TEventMailBox::GetInactiveUntil() const {
    return InactiveUntil;
}
// Adds a scheduled item to this mailbox's Scheduled set.
void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) {
    Scheduled.insert(item);
}
  173. bool TEventMailBox::IsScheduledEmpty() const {
  174. return Scheduled.empty();
  175. }
  176. TInstant TEventMailBox::GetFirstScheduleDeadline() const {
  177. return Scheduled.begin()->Deadline;
  178. }
  179. ui64 TEventMailBox::GetSentEventCount() const {
  180. return Sent.size();
  181. }
  182. class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
  183. public:
  184. TTimeProvider(TTestActorRuntimeBase& runtime)
  185. : Runtime(runtime)
  186. {
  187. }
  188. TInstant Now() override {
  189. return Runtime.GetCurrentTime();
  190. }
  191. private:
  192. TTestActorRuntimeBase& Runtime;
  193. };
  194. class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
  195. public:
  196. TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
  197. : Runtime(runtime)
  198. , Node(node)
  199. {
  200. Y_UNUSED(Runtime);
  201. }
  202. void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override {
  203. Y_UNUSED(actorSystem);
  204. Node->ActorSystemTimestamp = currentTimestamp;
  205. Node->ActorSystemMonotonic = currentMonotonic;
  206. }
  207. void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override {
  208. Y_UNUSED(readers);
  209. Y_UNUSED(scheduleReadersCount);
  210. }
  211. void Start() override {
  212. }
  213. void PrepareStop() override {
  214. }
  215. void Stop() override {
  216. }
  217. private:
  218. TTestActorRuntimeBase* Runtime;
  219. TTestActorRuntimeBase::TNodeDataBase* Node;
  220. };
  221. class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool {
  222. public:
  223. TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId)
  224. : IExecutorPool(poolId)
  225. , Runtime(runtime)
  226. , NodeIndex(nodeIndex)
  227. , Node(node)
  228. {
  229. }
  230. TTestActorRuntimeBase* GetRuntime() {
  231. return Runtime;
  232. }
  233. // for threads
  234. ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override {
  235. Y_UNUSED(wctx);
  236. Y_UNUSED(revolvingCounter);
  237. Y_FAIL();
  238. }
  239. void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override {
  240. Y_UNUSED(workerId);
  241. Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter);
  242. }
  243. void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
  244. DoSchedule(deadline, ev, cookie, workerId);
  245. }
  246. void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
  247. DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId);
  248. }
  249. void Schedule(TDuration delay, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
  250. TInstant deadline = Runtime->GetTimeProvider()->Now() + delay;
  251. DoSchedule(deadline, ev, cookie, workerId);
  252. }
  253. void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) {
  254. Y_UNUSED(workerId);
  255. TGuard<TMutex> guard(Runtime->Mutex);
  256. bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
  257. if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
  258. verbose = false;
  259. }
  260. if (verbose) {
  261. Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
  262. PrintEvent(ev, Runtime);
  263. }
  264. auto now = Runtime->GetTimeProvider()->Now();
  265. if (deadline < now) {
  266. deadline = now; // avoid going backwards in time
  267. }
  268. TDuration delay = (deadline - now);
  269. if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
  270. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  271. Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
  272. Runtime->MailboxesHasEvents.Signal();
  273. if (verbose)
  274. Cerr << "Event was added to scheduled queue\n";
  275. } else {
  276. if (cookie) {
  277. cookie->Detach();
  278. }
  279. if (verbose) {
  280. Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
  281. }
  282. }
  283. }
  284. // for actorsystem
  285. bool Send(TAutoPtr<IEventHandle>& ev) override {
  286. TGuard<TMutex> guard(Runtime->Mutex);
  287. bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
  288. if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
  289. verbose = false;
  290. }
  291. if (verbose) {
  292. Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
  293. PrintEvent(ev, Runtime);
  294. }
  295. if (!Runtime->EventFilterFunc(*Runtime, ev)) {
  296. ui32 nodeId = ev->GetRecipientRewrite().NodeId();
  297. Y_VERIFY(nodeId != 0);
  298. TNodeDataBase* node = Runtime->Nodes[nodeId].Get();
  299. if (!AllowSendFrom(node, ev)) {
  300. return true;
  301. }
  302. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  303. if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
  304. const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
  305. TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
  306. if (ev->GetRecipientRewrite() == logger) {
  307. TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
  308. IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
  309. if (recipientActor) {
  310. TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite());
  311. TActivationContext *prevTlsActivationContext = TlsActivationContext;
  312. TlsActivationContext = &ctx;
  313. recipientActor->Receive(ev, ctx);
  314. TlsActivationContext = prevTlsActivationContext;
  315. // we expect the logger to never die in tests
  316. }
  317. }
  318. } else {
  319. Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
  320. Runtime->MailboxesHasEvents.Signal();
  321. }
  322. if (verbose)
  323. Cerr << "Event was added to sent queue\n";
  324. } else {
  325. if (verbose)
  326. Cerr << "Event was dropped\n";
  327. }
  328. return true;
  329. }
  330. void ScheduleActivation(ui32 activation) override {
  331. Y_UNUSED(activation);
  332. }
  333. void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override {
  334. Y_UNUSED(activation);
  335. Y_UNUSED(revolvingCounter);
  336. }
  337. TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter,
  338. const TActorId& parentId) override {
  339. return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
  340. }
  341. TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override {
  342. return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId);
  343. }
  344. // lifecycle stuff
  345. void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override {
  346. Y_UNUSED(actorSystem);
  347. Y_UNUSED(scheduleReaders);
  348. Y_UNUSED(scheduleSz);
  349. }
  350. void Start() override {
  351. }
  352. void PrepareStop() override {
  353. }
  354. void Shutdown() override {
  355. }
  356. bool Cleanup() override {
  357. return true;
  358. }
  359. // generic
  360. TAffinity* Affinity() const override {
  361. Y_FAIL();
  362. }
  363. private:
  364. TTestActorRuntimeBase* const Runtime;
  365. const ui32 NodeIndex;
  366. TTestActorRuntimeBase::TNodeDataBase* const Node;
  367. };
  368. IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) {
  369. return new TExecutorPoolStub{runtime, nodeIndex, node, poolId};
  370. }
// First node id handed to the next runtime instance; the constructor copies it into
// FirstNodeId and InitNodes() advances it by NodeCount.
ui32 TTestActorRuntimeBase::NextNodeId = 1;
  372. TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv)
  373. : TTestActorRuntimeBase(1, 1, false)
  374. {
  375. SingleSysEnv = true;
  376. }
// Main constructor: records the node/data-center layout and threading mode and
// installs the default observer, selector, filter and registration callbacks.
// Node ids of this runtime start at the current value of the static NextNodeId.
// Nodes themselves are created later, in InitNodes().
TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
    : ScheduledCount(0)
    , ScheduledLimit(100000) // limit enforced by CollapsedTimeScheduledEventsSelector
    , MainThreadId(TThread::CurrentThreadId())
    , ClusterUUID(MakeClusterId())
    , FirstNodeId(NextNodeId)
    , NodeCount(nodeCount)
    , DataCenterCount(dataCenterCount)
    , UseRealThreads(useRealThreads)
    , LocalId(0)
    , DispatchCyclesCount(0)
    , DispatchedEventsCount(0)
    , NeedMonitoring(false)
    , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
    , TimeProvider(new TTimeProvider(*this)) // reports this runtime's simulated time
    , ShouldContinue()
    , CurrentTimestamp(0)
    , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT)
    , ReschedulingDelay(TDuration::MicroSeconds(0))
    , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc)
    , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
    , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc)
    // NopFilterFunc returns true for every event, which the executor pool stub treats as "drop".
    , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc)
    , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
    , CurrentDispatchContext(nullptr)
{
    SetDispatcherRandomSeed(TInstant::Now(), 0);
    // Paired with DisableActorCallstack() in the destructor.
    EnableActorCallstack();
}
  406. void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
  407. const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger");
  408. node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */,
  409. NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0);
  410. node->LogSettings->SetAllowDrop(false);
  411. node->LogSettings->SetThrottleDelay(TDuration::Zero());
  412. node->DynamicCounters = new NMonitoring::TDynamicCounters;
  413. InitNodeImpl(node, nodeIndex);
  414. }
  415. void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) {
  416. node->LogSettings->Append(
  417. NActorsServices::EServiceCommon_MIN,
  418. NActorsServices::EServiceCommon_MAX,
  419. NActorsServices::EServiceCommon_Name
  420. );
  421. if (!UseRealThreads) {
  422. node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
  423. node->MailboxTable.Reset(new TMailboxTable());
  424. node->ActorSystem = MakeActorSystem(nodeIndex, node);
  425. node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
  426. } else {
  427. node->ActorSystem = MakeActorSystem(nodeIndex, node);
  428. }
  429. node->ActorSystem->Start();
  430. }
  431. bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) {
  432. ui64 senderLocalId = ev->Sender.LocalId();
  433. ui64 senderMailboxHint = ev->Sender.Hint();
  434. TMailboxHeader* senderMailbox = node->MailboxTable->Get(senderMailboxHint);
  435. if (senderMailbox) {
  436. IActor* senderActor = senderMailbox->FindActor(senderLocalId);
  437. TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(senderActor);
  438. return !decorator || decorator->BeforeSending(ev);
  439. }
  440. return true;
  441. }
  442. TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount)
  443. : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) {
  444. }
  445. TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads)
  446. : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) {
  447. }
  448. TTestActorRuntimeBase::~TTestActorRuntimeBase() {
  449. CleanupNodes();
  450. Cerr.Flush();
  451. Cerr.Flush();
  452. Clog.Flush();
  453. DisableActorCallstack();
  454. }
// Drops every entry of the Nodes map (called from the destructor).
void TTestActorRuntimeBase::CleanupNodes() {
    Nodes.clear();
}
// Reports the useRealThreads flag passed at construction.
bool TTestActorRuntimeBase::IsRealThreads() const {
    return UseRealThreads;
}
  461. TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
  462. Y_UNUSED(runtime);
  463. Y_UNUSED(event);
  464. return EEventAction::PROCESS;
  465. }
  466. void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
  467. Y_UNUSED(runtime);
  468. Y_UNUSED(queue);
  469. scheduledEvents.clear();
  470. }
  471. bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
  472. Y_UNUSED(runtime);
  473. Y_UNUSED(event);
  474. return false;
  475. }
  476. bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) {
  477. Y_UNUSED(runtime);
  478. Y_UNUSED(delay);
  479. Y_UNUSED(event);
  480. Y_UNUSED(deadline);
  481. return true;
  482. }
  483. void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
  484. if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
  485. runtime.ScheduleWhiteList.insert(actorId);
  486. runtime.ScheduleWhiteListParent[actorId] = parentId;
  487. }
  488. }
  489. class TScheduledTreeItem {
  490. public:
  491. TString Name;
  492. ui64 Count;
  493. TVector<TScheduledTreeItem> Children;
  494. TScheduledTreeItem(const TString& name)
  495. : Name(name)
  496. , Count(0)
  497. {}
  498. TScheduledTreeItem* GetItem(const TString& name) {
  499. TScheduledTreeItem* item = nullptr;
  500. for (TScheduledTreeItem& i : Children) {
  501. if (i.Name == name) {
  502. item = &i;
  503. break;
  504. }
  505. }
  506. if (item != nullptr)
  507. return item;
  508. Children.emplace_back(name);
  509. return &Children.back();
  510. }
  511. void RecursiveSort() {
  512. Sort(Children, [](const TScheduledTreeItem& a, const TScheduledTreeItem& b) -> bool { return a.Count > b.Count; });
  513. for (TScheduledTreeItem& item : Children) {
  514. item.RecursiveSort();
  515. }
  516. }
  517. void Print(IOutputStream& stream, const TString& prefix) {
  518. for (auto it = Children.begin(); it != Children.end(); ++it) {
  519. bool lastChild = (std::next(it) == Children.end());
  520. TString connectionPrefix = lastChild ? "└─ " : "├─ ";
  521. TString subChildPrefix = lastChild ? " " : "│ ";
  522. stream << prefix << connectionPrefix << it->Name << " (" << it->Count << ")\n";
  523. it->Print(stream, prefix + subChildPrefix);
  524. }
  525. }
  526. void Print(IOutputStream& stream) {
  527. stream << Name << " (" << Count << ")\n";
  528. Print(stream, TString());
  529. }
  530. };
// Scheduled-events selector that "collapses" time: it takes every scheduled event
// sharing the earliest deadline, moves the deliverable ones into `queue`, removes
// them from `scheduledEvents`, and finally calls runtime.UpdateCurrentTime() with
// that deadline.
// Throws TSchedulingLimitReachedException once runtime.ScheduledCount exceeds
// runtime.ScheduledLimit; the counter is never reset in this function.
void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
    if (scheduledEvents.empty())
        return;
    TInstant time = scheduledEvents.begin()->Deadline;
    while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
        // Per-(recipient, event type) statistics. Function-local static, so it is shared by
        // all calls; within this function it is only read by the disabled diagnostic below.
        static THashMap<std::pair<TActorId, TString>, ui64> eventTypes;
        auto& item = *scheduledEvents.begin();
        // Use the event's type name when a decoded event is present, otherwise its hex type id.
        TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type);
        eventTypes[std::make_pair(item.Event->Recipient, name)]++;
        runtime.ScheduledCount++;
        if (runtime.ScheduledCount > runtime.ScheduledLimit) {
            // Disabled diagnostic: would print a tree of scheduled-event counts grouped by the
            // white-list parent chain of each recipient before throwing.
            // TScheduledTreeItem root("Root");
            // TVector<TString> path;
            // for (const auto& pr : eventTypes) {
            // path.clear();
            // path.push_back(runtime.GetActorName(pr.first.first));
            // for (auto it = runtime.ScheduleWhiteListParent.find(pr.first.first); it != runtime.ScheduleWhiteListParent.end(); it = runtime.ScheduleWhiteListParent.find(it->second)) {
            // path.insert(path.begin(), runtime.GetActorName(it->second));
            // }
            // path.push_back("<" + pr.first.second + ">"); // event name;
            // ui64 count = pr.second;
            // TScheduledTreeItem* item = &root;
            // item->Count += count;
            // for (TString name : path) {
            // item = item->GetItem(name);
            // item->Count += count;
            // }
            // }
            // root.RecursiveSort();
            // root.Print(Cerr);
            ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
        }
        // An event with a cookie attached is delivered only if Detach() succeeds;
        // events without a cookie are always delivered.
        if (item.Cookie->Get()) {
            if (item.Cookie->Detach()) {
                queue.push_back(item.Event);
            }
        } else {
            queue.push_back(item.Event);
        }
        scheduledEvents.erase(scheduledEvents.begin());
    }
    runtime.UpdateCurrentTime(time);
}
  574. TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) {
  575. TGuard<TMutex> guard(Mutex);
  576. auto result = ObserverFunc;
  577. ObserverFunc = observerFunc;
  578. return result;
  579. }
  580. TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) {
  581. TGuard<TMutex> guard(Mutex);
  582. auto result = ScheduledEventsSelectorFunc;
  583. ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc;
  584. return result;
  585. }
  586. TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) {
  587. TGuard<TMutex> guard(Mutex);
  588. auto result = EventFilterFunc;
  589. EventFilterFunc = filterFunc;
  590. return result;
  591. }
  592. TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) {
  593. TGuard<TMutex> guard(Mutex);
  594. auto result = ScheduledEventFilterFunc;
  595. ScheduledEventFilterFunc = filterFunc;
  596. return result;
  597. }
  598. TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) {
  599. TGuard<TMutex> guard(Mutex);
  600. auto result = RegistrationObserver;
  601. RegistrationObserver = observerFunc;
  602. return result;
  603. }
  604. bool TTestActorRuntimeBase::IsVerbose() {
  605. return VERBOSE;
  606. }
  607. void TTestActorRuntimeBase::SetVerbose(bool verbose) {
  608. VERBOSE = verbose;
  609. }
  610. void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) {
  611. Y_VERIFY(!IsInitialized);
  612. Y_VERIFY(nodeIndex < NodeCount);
  613. auto node = Nodes[nodeIndex + FirstNodeId];
  614. if (!node) {
  615. node = GetNodeFactory().CreateNode();
  616. Nodes[nodeIndex + FirstNodeId] = node;
  617. }
  618. node->LocalServicesActors[actorId] = cmd.Actor;
  619. node->LocalServices.push_back(std::make_pair(actorId, cmd));
  620. }
  621. void TTestActorRuntimeBase::InitNodes() {
  622. NextNodeId += NodeCount;
  623. Y_VERIFY(NodeCount > 0);
  624. for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
  625. auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
  626. TNodeDataBase* node = nodeIt->second.Get();
  627. InitNode(node, nodeIndex);
  628. }
  629. }
  630. void TTestActorRuntimeBase::Initialize() {
  631. InitNodes();
  632. IsInitialized = true;
  633. }
  // Intentionally empty placeholder: does nothing.
  void SetupCrossDC() {
      // nothing to set up
  }
  636. TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) {
  637. TGuard<TMutex> guard(Mutex);
  638. TDuration oldTimeout = DispatchTimeout;
  639. DispatchTimeout = timeout;
  640. return oldTimeout;
  641. }
  642. TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) {
  643. TGuard<TMutex> guard(Mutex);
  644. TDuration oldDelay = ReschedulingDelay;
  645. ReschedulingDelay = delay;
  646. return oldDelay;
  647. }
  648. void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
  649. Y_VERIFY(!IsInitialized);
  650. TGuard<TMutex> guard(Mutex);
  651. LogBackend = logBackend;
  652. }
  653. void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
  654. TGuard<TMutex> guard(Mutex);
  655. for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
  656. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  657. TString explanation;
  658. auto status = node->LogSettings->SetLevel(priority, component, explanation);
  659. if (status) {
  660. Y_FAIL("SetLogPriority failed: %s", explanation.c_str());
  661. }
  662. }
  663. }
  664. TInstant TTestActorRuntimeBase::GetCurrentTime() const {
  665. TGuard<TMutex> guard(Mutex);
  666. Y_VERIFY(!UseRealThreads);
  667. return TInstant::MicroSeconds(CurrentTimestamp);
  668. }
  669. void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
  670. static int counter = 0;
  671. ++counter;
  672. if (VERBOSE) {
  673. Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n";
  674. }
  675. TGuard<TMutex> guard(Mutex);
  676. Y_VERIFY(!UseRealThreads);
  677. if (newTime.MicroSeconds() > CurrentTimestamp) {
  678. CurrentTimestamp = newTime.MicroSeconds();
  679. for (auto& kv : Nodes) {
  680. AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
  681. AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
  682. }
  683. }
  684. }
  685. void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) {
  686. UpdateCurrentTime(GetCurrentTime() + duration);
  687. }
  688. TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() {
  689. Y_VERIFY(!UseRealThreads);
  690. return TimeProvider;
  691. }
  692. ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
  693. Y_VERIFY(index < NodeCount);
  694. return FirstNodeId + index;
  695. }
  696. ui32 TTestActorRuntimeBase::GetNodeCount() const {
  697. return NodeCount;
  698. }
  699. ui64 TTestActorRuntimeBase::AllocateLocalId() {
  700. TGuard<TMutex> guard(Mutex);
  701. ui64 nextId = ++LocalId;
  702. if (VERBOSE) {
  703. Cerr << "Allocated id: " << nextId << "\n";
  704. }
  705. return nextId;
  706. }
  707. ui32 TTestActorRuntimeBase::InterconnectPoolId() const {
  708. if (UseRealThreads && NSan::TSanIsOn()) {
  709. // Interconnect coroutines may move across threads
  710. // Use a special single-threaded pool to avoid that
  711. return 4;
  712. }
  713. return 0;
  714. }
  715. TString TTestActorRuntimeBase::GetTempDir() {
  716. if (!TmpDir)
  717. TmpDir.Reset(new TTempDir());
  718. return (*TmpDir)();
  719. }
  720. TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType,
  721. ui64 revolvingCounter, const TActorId& parentId) {
  722. Y_VERIFY(nodeIndex < NodeCount);
  723. TGuard<TMutex> guard(Mutex);
  724. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  725. if (UseRealThreads) {
  726. Y_VERIFY(poolId < node->ExecutorPools.size());
  727. return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
  728. }
  729. // first step - find good enough mailbox
  730. ui32 hint = 0;
  731. TMailboxHeader *mailbox = nullptr;
  732. {
  733. ui32 hintBackoff = 0;
  734. while (hint == 0) {
  735. hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter);
  736. mailbox = node->MailboxTable->Get(hint);
  737. if (!mailbox->LockFromFree()) {
  738. node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
  739. hintBackoff = hint;
  740. hint = 0;
  741. }
  742. }
  743. node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
  744. }
  745. const ui64 localActorId = AllocateLocalId();
  746. if (VERBOSE) {
  747. Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n";
  748. }
  749. // ok, got mailbox
  750. mailbox->AttachActor(localActorId, actor);
  751. // do init
  752. const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
  753. ActorNames[actorId] = TypeName(*actor);
  754. RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
  755. DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
  756. switch (mailboxType) {
  757. case TMailboxType::Simple:
  758. UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  759. break;
  760. case TMailboxType::Revolving:
  761. UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  762. break;
  763. case TMailboxType::HTSwap:
  764. UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  765. break;
  766. case TMailboxType::ReadAsFilled:
  767. UnlockFromExecution((TMailboxTable::TReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  768. break;
  769. case TMailboxType::TinyReadAsFilled:
  770. UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  771. break;
  772. default:
  773. Y_FAIL("Unsupported mailbox type");
  774. }
  775. return actorId;
  776. }
  777. TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
  778. const TActorId& parentId) {
  779. Y_VERIFY(nodeIndex < NodeCount);
  780. TGuard<TMutex> guard(Mutex);
  781. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  782. if (UseRealThreads) {
  783. Y_VERIFY(poolId < node->ExecutorPools.size());
  784. return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
  785. }
  786. const ui64 localActorId = AllocateLocalId();
  787. if (VERBOSE) {
  788. Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << "\n";
  789. }
  790. mailbox->AttachActor(localActorId, actor);
  791. const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
  792. ActorNames[actorId] = TypeName(*actor);
  793. RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
  794. DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
  795. return actorId;
  796. }
  797. TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
  798. TGuard<TMutex> guard(Mutex);
  799. Y_VERIFY(nodeIndex < NodeCount);
  800. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  801. if (!UseRealThreads) {
  802. IActor* actor = FindActor(actorId, node);
  803. node->LocalServicesActors[serviceId] = actor;
  804. node->ActorToActorId[actor] = actorId;
  805. }
  806. return node->ActorSystem->RegisterLocalService(serviceId, actorId);
  807. }
  808. TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) {
  809. TGuard<TMutex> guard(Mutex);
  810. Y_VERIFY(nodeIndex < NodeCount);
  811. TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex);
  812. EdgeActors.insert(edgeActor);
  813. EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor;
  814. return edgeActor;
  815. }
  816. TEventsList TTestActorRuntimeBase::CaptureEvents() {
  817. TGuard<TMutex> guard(Mutex);
  818. TEventsList result;
  819. for (auto& mbox : Mailboxes) {
  820. mbox.second->Capture(result);
  821. }
  822. return result;
  823. }
  824. TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) {
  825. TGuard<TMutex> guard(Mutex);
  826. Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
  827. TEventsList result;
  828. GetMailbox(nodeId, hint).Capture(result);
  829. return result;
  830. }
  831. void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) {
  832. TGuard<TMutex> guard(Mutex);
  833. ui32 nodeId = ev->GetRecipientRewrite().NodeId();
  834. Y_VERIFY(nodeId != 0);
  835. GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
  836. }
  837. void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) {
  838. TGuard<TMutex> guard(Mutex);
  839. for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
  840. if (*rit) {
  841. auto& ev = *rit;
  842. ui32 nodeId = ev->GetRecipientRewrite().NodeId();
  843. Y_VERIFY(nodeId != 0);
  844. GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
  845. }
  846. }
  847. events.clear();
  848. }
  849. void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) {
  850. TGuard<TMutex> guard(Mutex);
  851. Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
  852. TEventsList result;
  853. GetMailbox(nodeId, hint).PushFront(events);
  854. events.clear();
  855. }
  856. TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() {
  857. TGuard<TMutex> guard(Mutex);
  858. TScheduledEventsList result;
  859. for (auto& mbox : Mailboxes) {
  860. mbox.second->CaptureScheduled(result);
  861. }
  862. return result;
  863. }
  864. bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) {
  865. return DispatchEvents(options, TInstant::Max());
  866. }
  867. bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) {
  868. return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout);
  869. }
  870. bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
  871. TGuard<TMutex> guard(Mutex);
  872. return DispatchEventsInternal(options, simDeadline);
  873. }
  874. // Mutex must be locked by caller!
  875. bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
  876. TDispatchContext localContext;
  877. localContext.Options = &options;
  878. localContext.PrevContext = nullptr;
  879. bool verbose = !options.Quiet && VERBOSE;
  880. struct TDispatchContextSetter {
  881. TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext)
  882. : Runtime(runtime)
  883. {
  884. lastContext.PrevContext = Runtime.CurrentDispatchContext;
  885. Runtime.CurrentDispatchContext = &lastContext;
  886. }
  887. ~TDispatchContextSetter() {
  888. Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext;
  889. }
  890. TTestActorRuntimeBase& Runtime;
  891. } DispatchContextSetter(*this, localContext);
  892. TInstant dispatchTime = TInstant::MicroSeconds(0);
  893. TInstant deadline = dispatchTime + DispatchTimeout;
  894. const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10);
  895. TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
  896. if (verbose) {
  897. Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n";
  898. }
  899. struct TTempEdgeEventsCaptor {
  900. TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime)
  901. : Runtime(runtime)
  902. , HasEvents(false)
  903. {
  904. for (auto edgeActor : Runtime.EdgeActors) {
  905. TEventsList events;
  906. Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events);
  907. auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
  908. auto storeIt = Store.find(mboxId);
  909. Y_VERIFY(storeIt == Store.end());
  910. storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first;
  911. storeIt->second->PushFront(events);
  912. if (!events.empty())
  913. HasEvents = true;
  914. }
  915. }
  916. ~TTempEdgeEventsCaptor() {
  917. for (auto edgeActor : Runtime.EdgeActors) {
  918. auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
  919. auto storeIt = Store.find(mboxId);
  920. if (storeIt == Store.end()) {
  921. continue;
  922. }
  923. TEventsList events;
  924. storeIt->second->Capture(events);
  925. Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events);
  926. }
  927. }
  928. TTestActorRuntimeBase& Runtime;
  929. TEventMailBoxList Store;
  930. bool HasEvents;
  931. };
  932. TEventMailBoxList restrictedMailboxes;
  933. const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty();
  934. for (auto mailboxId : options.OnlyMailboxes) {
  935. auto it = Mailboxes.find(mailboxId);
  936. if (it == Mailboxes.end()) {
  937. it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first;
  938. }
  939. restrictedMailboxes.insert(std::make_pair(mailboxId, it->second));
  940. }
  941. TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor;
  942. if (!restrictedMailboxes) {
  943. tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this));
  944. }
  945. TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
  946. while (!currentMailboxes.empty()) {
  947. bool hasProgress = true;
  948. while (hasProgress) {
  949. ++DispatchCyclesCount;
  950. hasProgress = false;
  951. ui64 eventsToDispatch = 0;
  952. for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
  953. if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  954. eventsToDispatch += mboxIt->second->GetSentEventCount();
  955. }
  956. }
  957. ui32 eventsDispatched = 0;
  958. //TODO: count events before each cycle, break after dispatching that much events
  959. bool isEmpty = false;
  960. while (!isEmpty && eventsDispatched < eventsToDispatch) {
  961. ui64 mailboxCount = currentMailboxes.size();
  962. ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
  963. auto startWithMboxIt = currentMailboxes.begin();
  964. for (ui64 i = 0; i < startWith; ++i) {
  965. ++startWithMboxIt;
  966. }
  967. auto endWithMboxIt = startWithMboxIt;
  968. isEmpty = true;
  969. auto mboxIt = startWithMboxIt;
  970. TDeque<TEventMailboxId> suspectedBoxes;
  971. while (true) {
  972. auto& mbox = *mboxIt;
  973. bool isIgnored = true;
  974. if (!mbox.second->IsEmpty()) {
  975. HandleNonEmptyMailboxesForEachContext(mbox.first);
  976. if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  977. bool isEdgeMailbox = false;
  978. if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) {
  979. isEdgeMailbox = true;
  980. TEventsList events;
  981. mbox.second->Capture(events);
  982. for (auto& ev : events) {
  983. TInverseGuard<TMutex> inverseGuard(Mutex);
  984. ObserverFunc(*this, ev);
  985. }
  986. mbox.second->PushFront(events);
  987. }
  988. if (!isEdgeMailbox) {
  989. isEmpty = false;
  990. isIgnored = false;
  991. ++eventsDispatched;
  992. ++DispatchedEventsCount;
  993. if (DispatchedEventsCount > DispatchedEventsLimit) {
  994. ythrow TWithBackTrace<yexception>() << "Dispatched "
  995. << DispatchedEventsLimit << " events, limit reached.";
  996. }
  997. auto ev = mbox.second->Pop();
  998. if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) {
  999. //UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10));
  1000. if (verbose) {
  1001. Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
  1002. PrintEvent(ev, this);
  1003. }
  1004. }
  1005. hasProgress = true;
  1006. EEventAction action;
  1007. {
  1008. TInverseGuard<TMutex> inverseGuard(Mutex);
  1009. action = ObserverFunc(*this, ev);
  1010. }
  1011. switch (action) {
  1012. case EEventAction::PROCESS:
  1013. UpdateFinalEventsStatsForEachContext(*ev);
  1014. SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
  1015. break;
  1016. case EEventAction::DROP:
  1017. // do nothing
  1018. break;
  1019. case EEventAction::RESCHEDULE: {
  1020. TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
  1021. mbox.second->Freeze(deadline);
  1022. mbox.second->PushFront(ev);
  1023. break;
  1024. }
  1025. default:
  1026. Y_FAIL("Unknown action");
  1027. }
  1028. }
  1029. }
  1030. }
  1031. Y_VERIFY(mboxIt != currentMailboxes.end());
  1032. if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
  1033. mboxIt->second->IsEmpty() &&
  1034. mboxIt->second->IsScheduledEmpty() &&
  1035. mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  1036. suspectedBoxes.push_back(mboxIt->first);
  1037. }
  1038. ++mboxIt;
  1039. if (mboxIt == currentMailboxes.end()) {
  1040. mboxIt = currentMailboxes.begin();
  1041. }
  1042. Y_VERIFY(endWithMboxIt != currentMailboxes.end());
  1043. if (mboxIt == endWithMboxIt) {
  1044. break;
  1045. }
  1046. }
  1047. for (auto id : suspectedBoxes) {
  1048. auto it = currentMailboxes.find(id);
  1049. if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
  1050. it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  1051. currentMailboxes.erase(it);
  1052. }
  1053. }
  1054. }
  1055. }
  1056. if (localContext.FinalEventFound) {
  1057. return true;
  1058. }
  1059. if (!localContext.FoundNonEmptyMailboxes.empty())
  1060. return true;
  1061. if (options.CustomFinalCondition && options.CustomFinalCondition())
  1062. return true;
  1063. if (options.FinalEvents.empty()) {
  1064. for (auto& mbox : currentMailboxes) {
  1065. if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
  1066. continue;
  1067. if (!mbox.second->IsEmpty()) {
  1068. if (verbose) {
  1069. Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
  1070. }
  1071. return true;
  1072. }
  1073. }
  1074. }
  1075. if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) {
  1076. return false;
  1077. }
  1078. if (dispatchTime >= deadline) {
  1079. if (verbose) {
  1080. Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
  1081. }
  1082. ythrow TWithBackTrace<TEmptyEventQueueException>();
  1083. }
  1084. if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
  1085. inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
  1086. bool isEmpty = true;
  1087. TMaybe<TInstant> nearestMailboxDeadline;
  1088. TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes;
  1089. TMaybe<TInstant> nextScheduleDeadline;
  1090. for (auto& mbox : currentMailboxes) {
  1091. if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  1092. if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) {
  1093. nearestMailboxDeadline = mbox.second->GetInactiveUntil();
  1094. }
  1095. continue;
  1096. }
  1097. if (mbox.second->IsScheduledEmpty())
  1098. continue;
  1099. auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline();
  1100. if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) {
  1101. nextScheduleMboxes.clear();
  1102. nextScheduleMboxes.emplace_back(mbox.second);
  1103. nextScheduleDeadline = firstScheduleDeadline;
  1104. } else if (firstScheduleDeadline == *nextScheduleDeadline) {
  1105. nextScheduleMboxes.emplace_back(mbox.second);
  1106. }
  1107. }
  1108. for (const auto& nextScheduleMbox : nextScheduleMboxes) {
  1109. TEventsList selectedEvents;
  1110. TScheduledEventsList capturedScheduledEvents;
  1111. nextScheduleMbox->CaptureScheduled(capturedScheduledEvents);
  1112. ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents);
  1113. nextScheduleMbox->PushScheduled(capturedScheduledEvents);
  1114. for (auto& event : selectedEvents) {
  1115. if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) {
  1116. Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
  1117. PrintEvent(event, this);
  1118. }
  1119. nextScheduleMbox->Send(event);
  1120. isEmpty = false;
  1121. }
  1122. }
  1123. if (!isEmpty) {
  1124. if (verbose) {
  1125. Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
  1126. }
  1127. deadline = dispatchTime + DispatchTimeout;
  1128. continue;
  1129. }
  1130. if (nearestMailboxDeadline.Defined()) {
  1131. if (verbose) {
  1132. Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n";
  1133. }
  1134. UpdateCurrentTime(*nearestMailboxDeadline.Get());
  1135. continue;
  1136. }
  1137. }
  1138. TDuration waitDelay = TDuration::MilliSeconds(10);
  1139. dispatchTime += waitDelay;
  1140. MailboxesHasEvents.WaitT(Mutex, waitDelay);
  1141. }
  1142. return false;
  1143. }
  1144. void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) {
  1145. TDispatchContext* context = CurrentDispatchContext;
  1146. while (context) {
  1147. const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes;
  1148. if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) {
  1149. context->FoundNonEmptyMailboxes.insert(mboxId);
  1150. }
  1151. context = context->PrevContext;
  1152. }
  1153. }
  1154. void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) {
  1155. TDispatchContext* context = CurrentDispatchContext;
  1156. while (context) {
  1157. for (const auto& finalEvent : context->Options->FinalEvents) {
  1158. if (finalEvent.EventCheck(ev)) {
  1159. auto& freq = context->FinalEventFrequency[&finalEvent];
  1160. if (++freq >= finalEvent.RequiredCount) {
  1161. context->FinalEventFound = true;
  1162. }
  1163. }
  1164. }
  1165. context = context->PrevContext;
  1166. }
  1167. }
  1168. void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
  1169. TGuard<TMutex> guard(Mutex);
  1170. Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
  1171. senderNodeIndex, NodeCount);
  1172. SendInternal(ev, senderNodeIndex, viaActorSystem);
  1173. }
  1174. void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) {
  1175. TGuard<TMutex> guard(Mutex);
  1176. Y_VERIFY(nodeIndex < NodeCount);
  1177. ui32 nodeId = FirstNodeId + nodeIndex;
  1178. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  1179. TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration;
  1180. GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr));
  1181. if (VERBOSE)
  1182. Cerr << "Event was added to scheduled queue\n";
  1183. }
  1184. void TTestActorRuntimeBase::ClearCounters() {
  1185. TGuard<TMutex> guard(Mutex);
  1186. EvCounters.clear();
  1187. }
  1188. ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const {
  1189. TGuard<TMutex> guard(Mutex);
  1190. auto it = EvCounters.find(evType);
  1191. if (it == EvCounters.end())
  1192. return 0;
  1193. return it->second;
  1194. }
  1195. TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
  1196. TGuard<TMutex> guard(Mutex);
  1197. Y_VERIFY(nodeIndex < NodeCount);
  1198. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  1199. return node->ActorSystem->LookupLocalService(serviceId);
  1200. }
  1201. void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) {
  1202. TGuard<TMutex> guard(Mutex);
  1203. ui32 dispatchCount = 0;
  1204. if (!edgeFilter.empty()) {
  1205. for (auto edgeActor : edgeFilter) {
  1206. Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data());
  1207. }
  1208. }
  1209. const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter;
  1210. TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout;
  1211. for (;;) {
  1212. for (auto edgeActor : edgeActors) {
  1213. TEventsList events;
  1214. auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint());
  1215. bool foundEvent = false;
  1216. mbox.Capture(events);
  1217. for (auto& ev : events) {
  1218. if (filter(*this, ev)) {
  1219. foundEvent = true;
  1220. break;
  1221. }
  1222. }
  1223. mbox.PushFront(events);
  1224. if (foundEvent)
  1225. return;
  1226. }
  1227. ++dispatchCount;
  1228. {
  1229. if (!DispatchEventsInternal(TDispatchOptions(), deadline)) {
  1230. return; // Timed out; event was not found
  1231. }
  1232. }
  1233. Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop");
  1234. }
  1235. }
  1236. TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
  1237. TGuard<TMutex> guard(Mutex);
  1238. Y_VERIFY(nodeIndexFrom < NodeCount);
  1239. Y_VERIFY(nodeIndexTo < NodeCount);
  1240. Y_VERIFY(nodeIndexFrom != nodeIndexTo);
  1241. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get();
  1242. return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
  1243. }
  1244. void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) {
  1245. TGuard<TMutex> guard(Mutex);
  1246. BlockedOutput.insert(actorId);
  1247. }
  1248. void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
  1249. ui64 days = (time.Hours() / 24);
  1250. DispatcherRandomSeed = (days << 32) ^ iteration;
  1251. DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
  1252. }
  1253. IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
  1254. TGuard<TMutex> guard(Mutex);
  1255. if (nodeIndex == Max<ui32>()) {
  1256. Y_VERIFY(actorId.NodeId());
  1257. nodeIndex = actorId.NodeId() - FirstNodeId;
  1258. }
  1259. Y_VERIFY(nodeIndex < NodeCount);
  1260. auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
  1261. Y_VERIFY(nodeIt != Nodes.end());
  1262. TNodeDataBase* node = nodeIt->second.Get();
  1263. return FindActor(actorId, node);
  1264. }
  1265. void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
  1266. TGuard<TMutex> guard(Mutex);
  1267. if (allow) {
  1268. if (VERBOSE) {
  1269. Cerr << "Actor " << actorId << " added to schedule whitelist";
  1270. }
  1271. ScheduleWhiteList.insert(actorId);
  1272. } else {
  1273. if (VERBOSE) {
  1274. Cerr << "Actor " << actorId << " removed from schedule whitelist";
  1275. }
  1276. ScheduleWhiteList.erase(actorId);
  1277. }
  1278. }
  1279. bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const {
  1280. TGuard<TMutex> guard(Mutex);
  1281. return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
  1282. }
  1283. TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) {
  1284. TGuard<TMutex> guard(Mutex);
  1285. Y_VERIFY(nodeIndex < NodeCount);
  1286. ui32 nodeId = FirstNodeId + nodeIndex;
  1287. TNodeDataBase* node = Nodes[nodeId].Get();
  1288. return node->DynamicCounters;
  1289. }
  1290. void TTestActorRuntimeBase::SetupMonitoring() {
  1291. NeedMonitoring = true;
  1292. }
  1293. void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) {
  1294. Y_VERIFY(nodeIndex < NodeCount);
  1295. ui32 nodeId = FirstNodeId + nodeIndex;
  1296. TNodeDataBase* node = Nodes[nodeId].Get();
  1297. ui32 targetNode = ev->GetRecipientRewrite().NodeId();
  1298. ui32 targetNodeIndex;
  1299. if (targetNode == 0) {
  1300. targetNodeIndex = nodeIndex;
  1301. } else {
  1302. targetNodeIndex = targetNode - FirstNodeId;
  1303. Y_VERIFY(targetNodeIndex < NodeCount);
  1304. }
  1305. if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) {
  1306. node->ActorSystem->Send(ev);
  1307. return;
  1308. }
  1309. Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex));
  1310. TAutoPtr<IEventHandle> evHolder(ev);
  1311. if (!AllowSendFrom(node, evHolder)) {
  1312. return;
  1313. }
  1314. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  1315. TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
  1316. if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  1317. mbox.PushFront(evHolder);
  1318. return;
  1319. }
  1320. ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId();
  1321. if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) {
  1322. Cerr << "Send event, ";
  1323. PrintEvent(evHolder, this);
  1324. }
  1325. EvCounters[ev->GetTypeRewrite()]++;
  1326. TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
  1327. IActor* recipientActor = mailbox->FindActor(recipientLocalId);
  1328. if (recipientActor) {
  1329. // Save actorId by value in order to prevent ctx from being invalidated during another Send call.
  1330. TActorId actorId = ev->GetRecipientRewrite();
  1331. node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
  1332. TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
  1333. TActivationContext *prevTlsActivationContext = TlsActivationContext;
  1334. TlsActivationContext = &ctx;
  1335. CurrentRecipient = actorId;
  1336. {
  1337. TInverseGuard<TMutex> inverseGuard(Mutex);
  1338. #ifdef USE_ACTOR_CALLSTACK
  1339. TCallstack::GetTlsCallstack() = ev->Callstack;
  1340. TCallstack::GetTlsCallstack().SetLinesToSkip();
  1341. #endif
  1342. recipientActor->Receive(evHolder, ctx);
  1343. node->ExecutorThread->DropUnregistered();
  1344. }
  1345. CurrentRecipient = TActorId();
  1346. TlsActivationContext = prevTlsActivationContext;
  1347. } else {
  1348. if (VERBOSE) {
  1349. Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n";
  1350. }
  1351. auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
  1352. if (!!forwardedEv) {
  1353. node->ActorSystem->Send(forwardedEv);
  1354. }
  1355. }
  1356. }
  1357. IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const {
  1358. ui32 mailboxHint = actorId.Hint();
  1359. ui64 localId = actorId.LocalId();
  1360. TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
  1361. IActor* actor = mailbox->FindActor(localId);
  1362. return actor;
  1363. }
  1364. THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) {
  1365. THolder<TActorSystemSetup> setup(new TActorSystemSetup);
  1366. setup->NodeId = FirstNodeId + nodeIndex;
  1367. if (UseRealThreads) {
  1368. setup->ExecutorsCount = 5;
  1369. setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]);
  1370. setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
  1371. setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
  1372. setup->Executors[2].Reset(new TIOExecutorPool(2, 1));
  1373. setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
  1374. setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20));
  1375. setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100)));
  1376. } else {
  1377. setup->ExecutorsCount = 1;
  1378. setup->Scheduler.Reset(new TSchedulerThreadStub(this, node));
  1379. setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
  1380. setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
  1381. }
  1382. InitActorSystemSetup(*setup);
  1383. return setup;
  1384. }
  1385. THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) {
  1386. auto setup = MakeActorSystemSetup(nodeIndex, node);
  1387. node->ExecutorPools.resize(setup->ExecutorsCount);
  1388. for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
  1389. node->ExecutorPools[i] = setup->Executors[i].Get();
  1390. }
  1391. const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");
  1392. setup->LocalServices = node->LocalServices;
  1393. setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
  1394. const TActorId nameserviceId = GetNameserviceActorId();
  1395. TIntrusivePtr<TInterconnectProxyCommon> common;
  1396. common.Reset(new TInterconnectProxyCommon);
  1397. common->NameserviceId = nameserviceId;
  1398. common->MonCounters = interconnectCounters;
  1399. common->TechnicalSelfHostName = "::1";
  1400. if (!UseRealThreads) {
  1401. common->Settings.DeadPeer = TDuration::Max();
  1402. common->Settings.CloseOnIdle = TDuration::Max();
  1403. common->Settings.PingPeriod = TDuration::Max();
  1404. common->Settings.ForceConfirmPeriod = TDuration::Max();
  1405. common->Settings.Handshake = TDuration::Max();
  1406. }
  1407. common->ClusterUUID = ClusterUUID;
  1408. common->AcceptUUID = {ClusterUUID};
  1409. for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) {
  1410. if (proxyNodeIndex == nodeIndex)
  1411. continue;
  1412. const ui32 peerNodeId = FirstNodeId + proxyNodeIndex;
  1413. IActor *proxyActor = UseRealInterconnect
  1414. ? new TInterconnectProxyTCP(peerNodeId, common)
  1415. : InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common);
  1416. setup->Interconnect.ProxyActors[peerNodeId] = {proxyActor, TMailboxType::ReadAsFilled, InterconnectPoolId()};
  1417. }
  1418. setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock);
  1419. if (UseRealInterconnect) {
  1420. setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(),
  1421. NActors::TMailboxType::Simple, InterconnectPoolId()));
  1422. }
  1423. if (!SingleSysEnv) { // Single system env should do this self
  1424. TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend();
  1425. NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings,
  1426. logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
  1427. NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId());
  1428. std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd);
  1429. setup->LocalServices.push_back(loggerActorPair);
  1430. }
  1431. return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
  1432. }
  1433. TActorSystem* TTestActorRuntimeBase::SingleSys() const {
  1434. Y_VERIFY(Nodes.size() == 1, "Works only for single system env");
  1435. return Nodes.begin()->second->ActorSystem.Get();
  1436. }
  1437. TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() {
  1438. for (auto& x : Nodes) {
  1439. return x.second->ActorSystem.Get();
  1440. }
  1441. Y_FAIL("Don't use this method.");
  1442. }
  1443. TActorSystem* TTestActorRuntimeBase::GetActorSystem(ui32 nodeId) {
  1444. auto it = Nodes.find(GetNodeId(nodeId));
  1445. Y_VERIFY(it != Nodes.end());
  1446. return it->second->ActorSystem.Get();
  1447. }
  1448. TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) {
  1449. TGuard<TMutex> guard(Mutex);
  1450. auto mboxId = TEventMailboxId(nodeId, hint);
  1451. auto it = Mailboxes.find(mboxId);
  1452. if (it == Mailboxes.end()) {
  1453. it = Mailboxes.insert(std::make_pair(mboxId, new TEventMailBox())).first;
  1454. }
  1455. return *it->second;
  1456. }
  1457. void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) {
  1458. TGuard<TMutex> guard(Mutex);
  1459. auto mboxId = TEventMailboxId(nodeId, hint);
  1460. Mailboxes.erase(mboxId);
  1461. }
  1462. TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const {
  1463. auto it = ActorNames.find(actorId);
  1464. if (it != ActorNames.end())
  1465. return it->second;
  1466. return actorId.ToString();
  1467. }
  1468. struct TStrandingActorDecoratorContext : public TThrRefBase {
  1469. TStrandingActorDecoratorContext()
  1470. : Queue(new TQueueType)
  1471. {
  1472. }
  1473. typedef TOneOneQueueInplace<IEventHandle*, 32> TQueueType;
  1474. TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
  1475. };
  1476. class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
  1477. public:
  1478. class TReplyActor : public TActor<TReplyActor> {
  1479. public:
  1480. static constexpr EActivityType ActorActivityType() {
  1481. return TEST_ACTOR_RUNTIME;
  1482. }
  1483. TReplyActor(TStrandingActorDecorator* owner)
  1484. : TActor(&TReplyActor::StateFunc)
  1485. , Owner(owner)
  1486. {
  1487. }
  1488. STFUNC(StateFunc);
  1489. private:
  1490. TStrandingActorDecorator* const Owner;
  1491. };
  1492. static constexpr EActivityType ActorActivityType() {
  1493. return TEST_ACTOR_RUNTIME;
  1494. }
  1495. TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
  1496. TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
  1497. TReplyCheckerCreator createReplyChecker)
  1498. : Delegatee(delegatee)
  1499. , IsSync(isSync)
  1500. , AdditionalActors(additionalActors)
  1501. , Context(context)
  1502. , HasReply(false)
  1503. , Runtime(runtime)
  1504. , ReplyChecker(createReplyChecker())
  1505. {
  1506. if (IsSync) {
  1507. Y_VERIFY(!runtime->IsRealThreads());
  1508. }
  1509. }
  1510. void Bootstrap(const TActorContext& ctx) {
  1511. Become(&TStrandingActorDecorator::StateFunc);
  1512. ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this));
  1513. DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
  1514. for (const auto& actor : AdditionalActors) {
  1515. DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));
  1516. }
  1517. DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
  1518. DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
  1519. DelegateeOptions.Quiet = true;
  1520. }
  1521. STFUNC(StateFunc) {
  1522. bool wasEmpty = !Context->Queue->Head();
  1523. Context->Queue->Push(ev.Release());
  1524. if (wasEmpty) {
  1525. SendHead(ctx);
  1526. }
  1527. }
  1528. STFUNC(Reply) {
  1529. Y_VERIFY(!HasReply);
  1530. IEventHandle *requestEv = Context->Queue->Head();
  1531. TActorId originalSender = requestEv->Sender;
  1532. HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get());
  1533. if (HasReply) {
  1534. delete Context->Queue->Pop();
  1535. }
  1536. ctx.ExecutorThread.Send(ev->Forward(originalSender));
  1537. if (!IsSync && Context->Queue->Head()) {
  1538. SendHead(ctx);
  1539. }
  1540. }
  1541. private:
  1542. void SendHead(const TActorContext& ctx) {
  1543. if (!IsSync) {
  1544. ctx.ExecutorThread.Send(GetForwardedEvent().Release());
  1545. } else {
  1546. while (Context->Queue->Head()) {
  1547. HasReply = false;
  1548. ctx.ExecutorThread.Send(GetForwardedEvent().Release());
  1549. int count = 100;
  1550. while (!HasReply && count > 0) {
  1551. try {
  1552. Runtime->DispatchEvents(DelegateeOptions);
  1553. } catch (TEmptyEventQueueException&) {
  1554. count--;
  1555. Cerr << "No reply" << Endl;
  1556. }
  1557. }
  1558. Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000));
  1559. }
  1560. }
  1561. }
  1562. TAutoPtr<IEventHandle> GetForwardedEvent() {
  1563. IEventHandle* ev = Context->Queue->Head();
  1564. ReplyChecker->OnRequest(ev);
  1565. TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
  1566. ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
  1567. : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);
  1568. return forwardedEv;
  1569. }
  1570. private:
  1571. const TActorId Delegatee;
  1572. const bool IsSync;
  1573. const TVector<TActorId> AdditionalActors;
  1574. TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
  1575. TActorId ReplyId;
  1576. bool HasReply;
  1577. TDispatchOptions DelegateeOptions;
  1578. TTestActorRuntimeBase* Runtime;
  1579. THolder<IReplyChecker> ReplyChecker;
  1580. };
  1581. void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
  1582. Owner->Reply(ev, ctx);
  1583. }
  1584. class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
  1585. public:
  1586. TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
  1587. TReplyCheckerCreator createReplyChecker)
  1588. : Context(new TStrandingActorDecoratorContext())
  1589. , Runtime(runtime)
  1590. , CreateReplyChecker(createReplyChecker)
  1591. {
  1592. }
  1593. IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
  1594. return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime,
  1595. CreateReplyChecker);
  1596. }
  1597. private:
  1598. TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
  1599. TTestActorRuntimeBase* Runtime;
  1600. TReplyCheckerCreator CreateReplyChecker;
  1601. };
  1602. TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
  1603. TReplyCheckerCreator createReplyChecker) {
  1604. return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker));
  1605. }
  1606. ui64 DefaultRandomSeed = 9999;
  1607. }