test_runtime.cpp 75 KB


  1. #include "test_runtime.h"
  2. #include <library/cpp/actors/core/actor_bootstrapped.h>
  3. #include <library/cpp/actors/core/callstack.h>
  4. #include <library/cpp/actors/core/executor_pool_basic.h>
  5. #include <library/cpp/actors/core/executor_pool_io.h>
  6. #include <library/cpp/actors/core/log.h>
  7. #include <library/cpp/actors/core/scheduler_basic.h>
  8. #include <library/cpp/actors/util/datetime.h>
  9. #include <library/cpp/actors/protos/services_common.pb.h>
  10. #include <library/cpp/random_provider/random_provider.h>
  11. #include <library/cpp/actors/interconnect/interconnect.h>
  12. #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
  13. #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
  14. #include <util/generic/maybe.h>
  15. #include <util/generic/bt_exception.h>
  16. #include <util/random/mersenne.h>
  17. #include <util/string/printf.h>
  18. #include <typeinfo>
  // Global tracing switch for the test runtime; read by IsVerbose() and written by SetVerbose().
  bool VERBOSE{false};
  // When true, PrintEvent prints ev->ToString() instead of ev->GetTypeName().
  const bool PRINT_EVENT_BODY{false};
  21. namespace {
  22. TString MakeClusterId() {
  23. pid_t pid = getpid();
  24. TStringBuilder uuid;
  25. uuid << "Cluster for process with id: " << pid;
  26. return uuid;
  27. }
  28. }
  29. namespace NActors {
  30. ui64 TScheduledEventQueueItem::NextUniqueId = 0;
  31. void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) {
  32. Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite())
  33. << ", from " << ev->Sender.LocalId();
  34. TString name = runtime->GetActorName(ev->Sender);
  35. if (!name.empty())
  36. Cerr << " \"" << name << "\"";
  37. Cerr << ", to " << ev->GetRecipientRewrite().LocalId();
  38. name = runtime->GetActorName(ev->GetRecipientRewrite());
  39. if (!name.empty())
  40. Cerr << " \"" << name << "\"";
  41. Cerr << ", ";
  42. if (ev->HasEvent())
  43. Cerr << " : " << (PRINT_EVENT_BODY ? ev->ToString() : ev->GetTypeName());
  44. else if (ev->HasBuffer())
  45. Cerr << " : BUFFER";
  46. else
  47. Cerr << " : EMPTY";
  48. Cerr << "\n";
  49. }
  50. TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() {
  51. ActorSystemTimestamp = nullptr;
  52. ActorSystemMonotonic = nullptr;
  53. }
  54. void TTestActorRuntimeBase::TNodeDataBase::Stop() {
  55. if (Poller)
  56. Poller->Stop();
  57. if (MailboxTable) {
  58. for (ui32 round = 0; !MailboxTable->Cleanup(); ++round)
  59. Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub");
  60. }
  61. if (ActorSystem)
  62. ActorSystem->Stop();
  63. ActorSystem.Destroy();
  64. Poller.Reset();
  65. }
  66. TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() {
  67. Stop();
  68. }
  69. class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
  70. public:
  71. static constexpr EActivityType ActorActivityType() {
  72. return EActivityType::TEST_ACTOR_RUNTIME;
  73. }
  74. TEdgeActor(TTestActorRuntimeBase* runtime)
  75. : TActor(&TEdgeActor::StateFunc)
  76. , Runtime(runtime)
  77. {
  78. }
  79. STFUNC(StateFunc) {
  80. TGuard<TMutex> guard(Runtime->Mutex);
  81. bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
  82. if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
  83. verbose = false;
  84. }
  85. if (verbose) {
  86. Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
  87. PrintEvent(ev, Runtime);
  88. }
  89. if (!Runtime->EventFilterFunc(*Runtime, ev)) {
  90. ui32 nodeId = ev->GetRecipientRewrite().NodeId();
  91. Y_VERIFY(nodeId != 0);
  92. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  93. Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
  94. Runtime->MailboxesHasEvents.Signal();
  95. if (verbose)
  96. Cerr << "Event was added to sent queue\n";
  97. }
  98. else {
  99. if (verbose)
  100. Cerr << "Event was dropped\n";
  101. }
  102. }
  103. private:
  104. TTestActorRuntimeBase* Runtime;
  105. };
  106. void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
  107. IEventHandle* ptr = ev.Get();
  108. Y_VERIFY(ptr);
  109. #ifdef DEBUG_ORDER_EVENTS
  110. ui64 counter = NextToSend++;
  111. TrackSent[ptr] = counter;
  112. #endif
  113. Sent.push_back(ev);
  114. }
  115. TAutoPtr<IEventHandle> TEventMailBox::Pop() {
  116. TAutoPtr<IEventHandle> result = Sent.front();
  117. Sent.pop_front();
  118. #ifdef DEBUG_ORDER_EVENTS
  119. auto it = TrackSent.find(result.Get());
  120. if (it != TrackSent.end()) {
  121. Y_VERIFY(ExpectedReceive == it->second);
  122. TrackSent.erase(result.Get());
  123. ++ExpectedReceive;
  124. }
  125. #endif
  126. return result;
  127. }
  128. bool TEventMailBox::IsEmpty() const {
  129. return Sent.empty();
  130. }
  131. void TEventMailBox::Capture(TEventsList& evList) {
  132. evList.insert(evList.end(), Sent.begin(), Sent.end());
  133. Sent.clear();
  134. }
  135. void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) {
  136. Sent.push_front(ev);
  137. }
  138. void TEventMailBox::PushFront(TEventsList& evList) {
  139. for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) {
  140. if (*rit) {
  141. Sent.push_front(*rit);
  142. }
  143. }
  144. }
  145. void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) {
  146. for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) {
  147. evList.insert(*it);
  148. }
  149. Scheduled.clear();
  150. }
  151. void TEventMailBox::PushScheduled(TScheduledEventsList& evList) {
  152. for (auto it = evList.begin(); it != evList.end(); ++it) {
  153. if (it->Event) {
  154. Scheduled.insert(*it);
  155. }
  156. }
  157. evList.clear();
  158. }
  159. bool TEventMailBox::IsActive(const TInstant& currentTime) const {
  160. return currentTime >= InactiveUntil;
  161. }
  162. void TEventMailBox::Freeze(const TInstant& deadline) {
  163. if (deadline > InactiveUntil)
  164. InactiveUntil = deadline;
  165. }
  166. TInstant TEventMailBox::GetInactiveUntil() const {
  167. return InactiveUntil;
  168. }
  169. void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) {
  170. Scheduled.insert(item);
  171. }
  172. bool TEventMailBox::IsScheduledEmpty() const {
  173. return Scheduled.empty();
  174. }
  175. TInstant TEventMailBox::GetFirstScheduleDeadline() const {
  176. return Scheduled.begin()->Deadline;
  177. }
  178. ui64 TEventMailBox::GetSentEventCount() const {
  179. return Sent.size();
  180. }
  181. class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
  182. public:
  183. TTimeProvider(TTestActorRuntimeBase& runtime)
  184. : Runtime(runtime)
  185. {
  186. }
  187. TInstant Now() override {
  188. return Runtime.GetCurrentTime();
  189. }
  190. private:
  191. TTestActorRuntimeBase& Runtime;
  192. };
  193. class TTestActorRuntimeBase::TMonotonicTimeProvider : public IMonotonicTimeProvider {
  194. public:
  195. TMonotonicTimeProvider(TTestActorRuntimeBase& runtime)
  196. : Runtime(runtime)
  197. { }
  198. TMonotonic Now() override {
  199. return Runtime.GetCurrentMonotonicTime();
  200. }
  201. private:
  202. TTestActorRuntimeBase& Runtime;
  203. };
  204. class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
  205. public:
  206. TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
  207. : Runtime(runtime)
  208. , Node(node)
  209. {
  210. Y_UNUSED(Runtime);
  211. }
  212. void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override {
  213. Y_UNUSED(actorSystem);
  214. Node->ActorSystemTimestamp = currentTimestamp;
  215. Node->ActorSystemMonotonic = currentMonotonic;
  216. }
  217. void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override {
  218. Y_UNUSED(readers);
  219. Y_UNUSED(scheduleReadersCount);
  220. }
  221. void Start() override {
  222. }
  223. void PrepareStop() override {
  224. }
  225. void Stop() override {
  226. }
  227. private:
  228. TTestActorRuntimeBase* Runtime;
  229. TTestActorRuntimeBase::TNodeDataBase* Node;
  230. };
  231. class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool {
  232. public:
  233. TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId)
  234. : IExecutorPool(poolId)
  235. , Runtime(runtime)
  236. , NodeIndex(nodeIndex)
  237. , Node(node)
  238. {
  239. }
  240. TTestActorRuntimeBase* GetRuntime() {
  241. return Runtime;
  242. }
  243. // for threads
  244. ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override {
  245. Y_UNUSED(wctx);
  246. Y_UNUSED(revolvingCounter);
  247. Y_FAIL();
  248. }
  249. void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override {
  250. Y_UNUSED(workerId);
  251. Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter);
  252. }
  253. TMailboxHeader *ResolveMailbox(ui32 hint) override {
  254. return Node->MailboxTable->Get(hint);
  255. }
  256. void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
  257. DoSchedule(deadline, ev, cookie, workerId);
  258. }
  259. void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
  260. DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId);
  261. }
  262. void Schedule(TDuration delay, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
  263. TInstant deadline = Runtime->GetTimeProvider()->Now() + delay;
  264. DoSchedule(deadline, ev, cookie, workerId);
  265. }
  266. void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) {
  267. Y_UNUSED(workerId);
  268. TGuard<TMutex> guard(Runtime->Mutex);
  269. bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
  270. if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
  271. verbose = false;
  272. }
  273. if (verbose) {
  274. Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
  275. PrintEvent(ev, Runtime);
  276. }
  277. auto now = Runtime->GetTimeProvider()->Now();
  278. if (deadline < now) {
  279. deadline = now; // avoid going backwards in time
  280. }
  281. TDuration delay = (deadline - now);
  282. if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
  283. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  284. Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
  285. Runtime->MailboxesHasEvents.Signal();
  286. if (verbose)
  287. Cerr << "Event was added to scheduled queue\n";
  288. } else {
  289. if (cookie) {
  290. cookie->Detach();
  291. }
  292. if (verbose) {
  293. Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
  294. }
  295. }
  296. }
  297. // for actorsystem
  298. bool SpecificSend(TAutoPtr<IEventHandle>& ev) override {
  299. return Send(ev);
  300. }
  301. bool Send(TAutoPtr<IEventHandle>& ev) override {
  302. TGuard<TMutex> guard(Runtime->Mutex);
  303. bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
  304. if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
  305. verbose = false;
  306. }
  307. if (verbose) {
  308. Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
  309. PrintEvent(ev, Runtime);
  310. }
  311. if (!Runtime->EventFilterFunc(*Runtime, ev)) {
  312. ui32 nodeId = ev->GetRecipientRewrite().NodeId();
  313. Y_VERIFY(nodeId != 0);
  314. TNodeDataBase* node = Runtime->Nodes[nodeId].Get();
  315. if (!AllowSendFrom(node, ev)) {
  316. return true;
  317. }
  318. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  319. if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
  320. const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
  321. TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
  322. if (ev->GetRecipientRewrite() == logger) {
  323. TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
  324. IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
  325. if (recipientActor) {
  326. TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite());
  327. TActivationContext *prevTlsActivationContext = TlsActivationContext;
  328. TlsActivationContext = &ctx;
  329. recipientActor->Receive(ev);
  330. TlsActivationContext = prevTlsActivationContext;
  331. // we expect the logger to never die in tests
  332. }
  333. }
  334. } else {
  335. Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
  336. Runtime->MailboxesHasEvents.Signal();
  337. }
  338. if (verbose)
  339. Cerr << "Event was added to sent queue\n";
  340. } else {
  341. if (verbose)
  342. Cerr << "Event was dropped\n";
  343. }
  344. return true;
  345. }
  346. void ScheduleActivation(ui32 activation) override {
  347. Y_UNUSED(activation);
  348. }
  349. void SpecificScheduleActivation(ui32 activation) override {
  350. Y_UNUSED(activation);
  351. }
  352. void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override {
  353. Y_UNUSED(activation);
  354. Y_UNUSED(revolvingCounter);
  355. }
  356. TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter,
  357. const TActorId& parentId) override {
  358. return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
  359. }
  360. TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override {
  361. return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId);
  362. }
  363. // lifecycle stuff
  364. void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override {
  365. Y_UNUSED(actorSystem);
  366. Y_UNUSED(scheduleReaders);
  367. Y_UNUSED(scheduleSz);
  368. }
  369. void Start() override {
  370. }
  371. void PrepareStop() override {
  372. }
  373. void Shutdown() override {
  374. }
  375. bool Cleanup() override {
  376. return true;
  377. }
  378. // generic
  379. TAffinity* Affinity() const override {
  380. Y_FAIL();
  381. }
  382. private:
  383. TTestActorRuntimeBase* const Runtime;
  384. const ui32 NodeIndex;
  385. TTestActorRuntimeBase::TNodeDataBase* const Node;
  386. };
  387. IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) {
  388. return new TExecutorPoolStub{runtime, nodeIndex, node, poolId};
  389. }
  390. ui32 TTestActorRuntimeBase::NextNodeId = 1;
  391. TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv)
  392. : TTestActorRuntimeBase(1, 1, false)
  393. {
  394. SingleSysEnv = true;
  395. }
  396. TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
  397. : ScheduledCount(0)
  398. , ScheduledLimit(100000)
  399. , MainThreadId(TThread::CurrentThreadId())
  400. , ClusterUUID(MakeClusterId())
  401. , FirstNodeId(NextNodeId)
  402. , NodeCount(nodeCount)
  403. , DataCenterCount(dataCenterCount)
  404. , UseRealThreads(useRealThreads)
  405. , LocalId(0)
  406. , DispatchCyclesCount(0)
  407. , DispatchedEventsCount(0)
  408. , NeedMonitoring(false)
  409. , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
  410. , TimeProvider(new TTimeProvider(*this))
  411. , MonotonicTimeProvider(new TMonotonicTimeProvider(*this))
  412. , ShouldContinue()
  413. , CurrentTimestamp(0)
  414. , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT)
  415. , ReschedulingDelay(TDuration::MicroSeconds(0))
  416. , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc)
  417. , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
  418. , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc)
  419. , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc)
  420. , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
  421. , CurrentDispatchContext(nullptr)
  422. {
  423. SetDispatcherRandomSeed(TInstant::Now(), 0);
  424. EnableActorCallstack();
  425. }
  426. void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
  427. const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger");
  428. node->LogSettings = new NActors::NLog::TSettings(loggerActorId, NActorsServices::LOGGER,
  429. NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0);
  430. node->LogSettings->SetAllowDrop(false);
  431. node->LogSettings->SetThrottleDelay(TDuration::Zero());
  432. node->DynamicCounters = new NMonitoring::TDynamicCounters;
  433. InitNodeImpl(node, nodeIndex);
  434. }
  435. void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) {
  436. node->LogSettings->Append(
  437. NActorsServices::EServiceCommon_MIN,
  438. NActorsServices::EServiceCommon_MAX,
  439. NActorsServices::EServiceCommon_Name
  440. );
  441. if (!UseRealThreads) {
  442. node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
  443. node->MailboxTable.Reset(new TMailboxTable());
  444. node->ActorSystem = MakeActorSystem(nodeIndex, node);
  445. node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
  446. } else {
  447. node->ActorSystem = MakeActorSystem(nodeIndex, node);
  448. }
  449. node->ActorSystem->Start();
  450. }
  451. bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) {
  452. ui64 senderLocalId = ev->Sender.LocalId();
  453. ui64 senderMailboxHint = ev->Sender.Hint();
  454. TMailboxHeader* senderMailbox = node->MailboxTable->Get(senderMailboxHint);
  455. if (senderMailbox) {
  456. IActor* senderActor = senderMailbox->FindActor(senderLocalId);
  457. TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(senderActor);
  458. return !decorator || decorator->BeforeSending(ev);
  459. }
  460. return true;
  461. }
  462. TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount)
  463. : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) {
  464. }
  465. TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads)
  466. : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) {
  467. }
  468. TTestActorRuntimeBase::~TTestActorRuntimeBase() {
  469. CleanupNodes();
  470. Cerr.Flush();
  471. Cerr.Flush();
  472. Clog.Flush();
  473. DisableActorCallstack();
  474. }
  475. void TTestActorRuntimeBase::CleanupNodes() {
  476. Nodes.clear();
  477. }
  478. bool TTestActorRuntimeBase::IsRealThreads() const {
  479. return UseRealThreads;
  480. }
  481. TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
  482. Y_UNUSED(runtime);
  483. Y_UNUSED(event);
  484. return EEventAction::PROCESS;
  485. }
  486. void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
  487. Y_UNUSED(runtime);
  488. Y_UNUSED(queue);
  489. scheduledEvents.clear();
  490. }
  491. bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
  492. Y_UNUSED(runtime);
  493. Y_UNUSED(event);
  494. return false;
  495. }
  496. bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) {
  497. Y_UNUSED(runtime);
  498. Y_UNUSED(delay);
  499. Y_UNUSED(event);
  500. Y_UNUSED(deadline);
  501. return true;
  502. }
  503. void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
  504. if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
  505. runtime.ScheduleWhiteList.insert(actorId);
  506. runtime.ScheduleWhiteListParent[actorId] = parentId;
  507. }
  508. }
  509. class TScheduledTreeItem {
  510. public:
  511. TString Name;
  512. ui64 Count;
  513. TVector<TScheduledTreeItem> Children;
  514. TScheduledTreeItem(const TString& name)
  515. : Name(name)
  516. , Count(0)
  517. {}
  518. TScheduledTreeItem* GetItem(const TString& name) {
  519. TScheduledTreeItem* item = nullptr;
  520. for (TScheduledTreeItem& i : Children) {
  521. if (i.Name == name) {
  522. item = &i;
  523. break;
  524. }
  525. }
  526. if (item != nullptr)
  527. return item;
  528. Children.emplace_back(name);
  529. return &Children.back();
  530. }
  531. void RecursiveSort() {
  532. Sort(Children, [](const TScheduledTreeItem& a, const TScheduledTreeItem& b) -> bool { return a.Count > b.Count; });
  533. for (TScheduledTreeItem& item : Children) {
  534. item.RecursiveSort();
  535. }
  536. }
  537. void Print(IOutputStream& stream, const TString& prefix) {
  538. for (auto it = Children.begin(); it != Children.end(); ++it) {
  539. bool lastChild = (std::next(it) == Children.end());
  540. TString connectionPrefix = lastChild ? "└─ " : "├─ ";
  541. TString subChildPrefix = lastChild ? " " : "│ ";
  542. stream << prefix << connectionPrefix << it->Name << " (" << it->Count << ")\n";
  543. it->Print(stream, prefix + subChildPrefix);
  544. }
  545. }
  546. void Print(IOutputStream& stream) {
  547. stream << Name << " (" << Count << ")\n";
  548. Print(stream, TString());
  549. }
  550. };
  551. void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
  552. if (scheduledEvents.empty())
  553. return;
  554. TInstant time = scheduledEvents.begin()->Deadline;
  555. while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
  556. // static THashMap<std::pair<TActorId, TString>, ui64> eventTypes;
  557. auto& item = *scheduledEvents.begin();
  558. TString name = item.Event->GetTypeName();
  559. // eventTypes[std::make_pair(item.Event->Recipient, name)]++;
  560. runtime.ScheduledCount++;
  561. if (runtime.ScheduledCount > runtime.ScheduledLimit) {
  562. // TScheduledTreeItem root("Root");
  563. // TVector<TString> path;
  564. // for (const auto& pr : eventTypes) {
  565. // path.clear();
  566. // path.push_back(runtime.GetActorName(pr.first.first));
  567. // for (auto it = runtime.ScheduleWhiteListParent.find(pr.first.first); it != runtime.ScheduleWhiteListParent.end(); it = runtime.ScheduleWhiteListParent.find(it->second)) {
  568. // path.insert(path.begin(), runtime.GetActorName(it->second));
  569. // }
  570. // path.push_back("<" + pr.first.second + ">"); // event name;
  571. // ui64 count = pr.second;
  572. // TScheduledTreeItem* item = &root;
  573. // item->Count += count;
  574. // for (TString name : path) {
  575. // item = item->GetItem(name);
  576. // item->Count += count;
  577. // }
  578. // }
  579. // root.RecursiveSort();
  580. // root.Print(Cerr);
  581. ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
  582. }
  583. if (item.Cookie->Get()) {
  584. if (item.Cookie->Detach()) {
  585. queue.push_back(item.Event);
  586. }
  587. } else {
  588. queue.push_back(item.Event);
  589. }
  590. scheduledEvents.erase(scheduledEvents.begin());
  591. }
  592. runtime.UpdateCurrentTime(time);
  593. }
  594. TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) {
  595. TGuard<TMutex> guard(Mutex);
  596. auto result = ObserverFunc;
  597. ObserverFunc = observerFunc;
  598. return result;
  599. }
  600. TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) {
  601. TGuard<TMutex> guard(Mutex);
  602. auto result = ScheduledEventsSelectorFunc;
  603. ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc;
  604. return result;
  605. }
  606. TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) {
  607. TGuard<TMutex> guard(Mutex);
  608. auto result = EventFilterFunc;
  609. EventFilterFunc = filterFunc;
  610. return result;
  611. }
  612. TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) {
  613. TGuard<TMutex> guard(Mutex);
  614. auto result = ScheduledEventFilterFunc;
  615. ScheduledEventFilterFunc = filterFunc;
  616. return result;
  617. }
  618. TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) {
  619. TGuard<TMutex> guard(Mutex);
  620. auto result = RegistrationObserver;
  621. RegistrationObserver = observerFunc;
  622. return result;
  623. }
  624. bool TTestActorRuntimeBase::IsVerbose() {
  625. return VERBOSE;
  626. }
  627. void TTestActorRuntimeBase::SetVerbose(bool verbose) {
  628. VERBOSE = verbose;
  629. }
  630. void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, TActorSetupCmd cmd, ui32 nodeIndex) {
  631. Y_VERIFY(!IsInitialized);
  632. Y_VERIFY(nodeIndex < NodeCount);
  633. auto node = Nodes[nodeIndex + FirstNodeId];
  634. if (!node) {
  635. node = GetNodeFactory().CreateNode();
  636. Nodes[nodeIndex + FirstNodeId] = node;
  637. }
  638. node->LocalServicesActors[actorId] = cmd.Actor.get();
  639. node->LocalServices.push_back(std::make_pair(actorId, TTestActorSetupCmd(std::move(cmd))));
  640. }
  641. void TTestActorRuntimeBase::InitNodes() {
  642. NextNodeId += NodeCount;
  643. Y_VERIFY(NodeCount > 0);
  644. for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
  645. auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
  646. TNodeDataBase* node = nodeIt->second.Get();
  647. InitNode(node, nodeIndex);
  648. }
  649. }
  650. void TTestActorRuntimeBase::Initialize() {
  651. InitNodes();
  652. IsInitialized = true;
  653. }
  // Empty function; it performs no work.
  void SetupCrossDC()
  {
  }
  656. TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) {
  657. TGuard<TMutex> guard(Mutex);
  658. TDuration oldTimeout = DispatchTimeout;
  659. DispatchTimeout = timeout;
  660. return oldTimeout;
  661. }
  662. TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) {
  663. TGuard<TMutex> guard(Mutex);
  664. TDuration oldDelay = ReschedulingDelay;
  665. ReschedulingDelay = delay;
  666. return oldDelay;
  667. }
  668. void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
  669. Y_VERIFY(!IsInitialized);
  670. TGuard<TMutex> guard(Mutex);
  671. LogBackend = logBackend;
  672. }
  673. void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
  674. TGuard<TMutex> guard(Mutex);
  675. for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
  676. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  677. TString explanation;
  678. auto status = node->LogSettings->SetLevel(priority, component, explanation);
  679. if (status) {
  680. Y_FAIL("SetLogPriority failed: %s", explanation.c_str());
  681. }
  682. }
  683. }
  684. TInstant TTestActorRuntimeBase::GetCurrentTime() const {
  685. TGuard<TMutex> guard(Mutex);
  686. Y_VERIFY(!UseRealThreads);
  687. return TInstant::MicroSeconds(CurrentTimestamp);
  688. }
  689. TMonotonic TTestActorRuntimeBase::GetCurrentMonotonicTime() const {
  690. TGuard<TMutex> guard(Mutex);
  691. Y_VERIFY(!UseRealThreads);
  692. return TMonotonic::MicroSeconds(CurrentTimestamp);
  693. }
  694. void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
  695. static int counter = 0;
  696. ++counter;
  697. if (VERBOSE) {
  698. Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n";
  699. }
  700. TGuard<TMutex> guard(Mutex);
  701. Y_VERIFY(!UseRealThreads);
  702. if (newTime.MicroSeconds() > CurrentTimestamp) {
  703. CurrentTimestamp = newTime.MicroSeconds();
  704. for (auto& kv : Nodes) {
  705. AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
  706. AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
  707. }
  708. }
  709. }
  710. void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) {
  711. UpdateCurrentTime(GetCurrentTime() + duration);
  712. }
  713. TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() {
  714. Y_VERIFY(!UseRealThreads);
  715. return TimeProvider;
  716. }
  717. TIntrusivePtr<IMonotonicTimeProvider> TTestActorRuntimeBase::GetMonotonicTimeProvider() {
  718. Y_VERIFY(!UseRealThreads);
  719. return MonotonicTimeProvider;
  720. }
  721. ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
  722. Y_VERIFY(index < NodeCount);
  723. return FirstNodeId + index;
  724. }
  725. ui32 TTestActorRuntimeBase::GetNodeCount() const {
  726. return NodeCount;
  727. }
  728. ui64 TTestActorRuntimeBase::AllocateLocalId() {
  729. TGuard<TMutex> guard(Mutex);
  730. ui64 nextId = ++LocalId;
  731. if (VERBOSE) {
  732. Cerr << "Allocated id: " << nextId << "\n";
  733. }
  734. return nextId;
  735. }
  736. ui32 TTestActorRuntimeBase::InterconnectPoolId() const {
  737. if (UseRealThreads && NSan::TSanIsOn()) {
  738. // Interconnect coroutines may move across threads
  739. // Use a special single-threaded pool to avoid that
  740. return 4;
  741. }
  742. return 0;
  743. }
  744. TString TTestActorRuntimeBase::GetTempDir() {
  745. if (!TmpDir)
  746. TmpDir.Reset(new TTempDir());
  747. return (*TmpDir)();
  748. }
  // Registers `actor` on node `nodeIndex` and returns its new actor id.
  // With real threads the call is forwarded to the node's executor pool
  // `poolId`. Otherwise the runtime itself allocates and locks a mailbox of
  // type `mailboxType`, attaches the actor under a freshly allocated local id,
  // records the actor's type name, notifies the registration observer, runs
  // DoActorInit and finally unlocks the mailbox.
  // When `parentId` is empty, CurrentRecipient is used as the parent.
  749. TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType,
  750. ui64 revolvingCounter, const TActorId& parentId) {
  751. Y_VERIFY(nodeIndex < NodeCount);
  752. TGuard<TMutex> guard(Mutex);
  753. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  // Real threads: the node's real executor pool performs the registration.
  754. if (UseRealThreads) {
  755. Y_VERIFY(poolId < node->ExecutorPools.size());
  756. return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
  757. }
  758. // first step - find good enough mailbox
  759. ui32 hint = 0;
  760. TMailboxHeader *mailbox = nullptr;
  761. {
  // Keep allocating until a mailbox can be locked. A mailbox that failed to
  // lock is remembered in hintBackoff and reclaimed one step later, i.e. only
  // after the next allocation attempt has been made.
  // NOTE(review): ReclaimMailbox is also called while hintBackoff is still 0 -
  // presumably hint 0 is tolerated there; confirm against TMailboxTable.
  762. ui32 hintBackoff = 0;
  763. while (hint == 0) {
  764. hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter);
  765. mailbox = node->MailboxTable->Get(hint);
  766. if (!mailbox->LockFromFree()) {
  767. node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
  768. hintBackoff = hint;
  769. hint = 0;
  770. }
  771. }
  // Reclaim the last mailbox that failed to lock (if any).
  772. node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
  773. }
  774. const ui64 localActorId = AllocateLocalId();
  775. if (VERBOSE) {
  776. Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n";
  777. }
  778. // ok, got mailbox
  779. mailbox->AttachActor(localActorId, actor);
  780. // do init
  781. const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
  782. ActorNames[actorId] = TypeName(*actor);
  783. RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
  784. DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
  // Unlock the mailbox through its concrete type; each case casts the header
  // to the mailbox class matching `mailboxType`. Pool 0 of the node is passed
  // regardless of `poolId`.
  785. switch (mailboxType) {
  786. case TMailboxType::Simple:
  787. UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  788. break;
  789. case TMailboxType::Revolving:
  790. UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  791. break;
  792. case TMailboxType::HTSwap:
  793. UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  794. break;
  795. case TMailboxType::ReadAsFilled:
  796. UnlockFromExecution((TMailboxTable::TReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  797. break;
  798. case TMailboxType::TinyReadAsFilled:
  799. UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
  800. break;
  801. default:
  802. Y_FAIL("Unsupported mailbox type");
  803. }
  804. return actorId;
  805. }
  806. TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
  807. const TActorId& parentId) {
  808. Y_VERIFY(nodeIndex < NodeCount);
  809. TGuard<TMutex> guard(Mutex);
  810. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  811. if (UseRealThreads) {
  812. Y_VERIFY(poolId < node->ExecutorPools.size());
  813. return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
  814. }
  815. const ui64 localActorId = AllocateLocalId();
  816. if (VERBOSE) {
  817. Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << "\n";
  818. }
  819. mailbox->AttachActor(localActorId, actor);
  820. const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
  821. ActorNames[actorId] = TypeName(*actor);
  822. RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
  823. DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
  824. return actorId;
  825. }
  826. TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
  827. TGuard<TMutex> guard(Mutex);
  828. Y_VERIFY(nodeIndex < NodeCount);
  829. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  830. if (!UseRealThreads) {
  831. IActor* actor = FindActor(actorId, node);
  832. node->LocalServicesActors[serviceId] = actor;
  833. node->ActorToActorId[actor] = actorId;
  834. }
  835. return node->ActorSystem->RegisterLocalService(serviceId, actorId);
  836. }
  837. TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) {
  838. TGuard<TMutex> guard(Mutex);
  839. Y_VERIFY(nodeIndex < NodeCount);
  840. TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex);
  841. EdgeActors.insert(edgeActor);
  842. EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor;
  843. return edgeActor;
  844. }
  845. TEventsList TTestActorRuntimeBase::CaptureEvents() {
  846. TGuard<TMutex> guard(Mutex);
  847. TEventsList result;
  848. for (auto& mbox : Mailboxes) {
  849. mbox.second->Capture(result);
  850. }
  851. return result;
  852. }
  853. TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) {
  854. TGuard<TMutex> guard(Mutex);
  855. Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
  856. TEventsList result;
  857. GetMailbox(nodeId, hint).Capture(result);
  858. return result;
  859. }
  860. void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) {
  861. TGuard<TMutex> guard(Mutex);
  862. ui32 nodeId = ev->GetRecipientRewrite().NodeId();
  863. Y_VERIFY(nodeId != 0);
  864. GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
  865. }
  866. void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) {
  867. TGuard<TMutex> guard(Mutex);
  868. for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
  869. if (*rit) {
  870. auto& ev = *rit;
  871. ui32 nodeId = ev->GetRecipientRewrite().NodeId();
  872. Y_VERIFY(nodeId != 0);
  873. GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
  874. }
  875. }
  876. events.clear();
  877. }
  878. void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) {
  879. TGuard<TMutex> guard(Mutex);
  880. Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
  881. TEventsList result;
  882. GetMailbox(nodeId, hint).PushFront(events);
  883. events.clear();
  884. }
  885. TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() {
  886. TGuard<TMutex> guard(Mutex);
  887. TScheduledEventsList result;
  888. for (auto& mbox : Mailboxes) {
  889. mbox.second->CaptureScheduled(result);
  890. }
  891. return result;
  892. }
  893. bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) {
  894. return DispatchEvents(options, TInstant::Max());
  895. }
  896. bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) {
  897. return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout);
  898. }
  899. bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
  900. TGuard<TMutex> guard(Mutex);
  901. return DispatchEventsInternal(options, simDeadline);
  902. }
  903. // Mutex must be locked by caller!
  904. bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
  905. TDispatchContext localContext;
  906. localContext.Options = &options;
  907. localContext.PrevContext = nullptr;
  908. bool verbose = !options.Quiet && VERBOSE;
  909. struct TDispatchContextSetter {
  910. TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext)
  911. : Runtime(runtime)
  912. {
  913. lastContext.PrevContext = Runtime.CurrentDispatchContext;
  914. Runtime.CurrentDispatchContext = &lastContext;
  915. }
  916. ~TDispatchContextSetter() {
  917. Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext;
  918. }
  919. TTestActorRuntimeBase& Runtime;
  920. } DispatchContextSetter(*this, localContext);
  921. TInstant dispatchTime = TInstant::MicroSeconds(0);
  922. TInstant deadline = dispatchTime + DispatchTimeout;
  923. const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10);
  924. TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
  925. if (verbose) {
  926. Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n";
  927. }
  928. struct TTempEdgeEventsCaptor {
  929. TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime)
  930. : Runtime(runtime)
  931. , HasEvents(false)
  932. {
  933. for (auto edgeActor : Runtime.EdgeActors) {
  934. TEventsList events;
  935. Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events);
  936. auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
  937. auto storeIt = Store.find(mboxId);
  938. Y_VERIFY(storeIt == Store.end());
  939. storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first;
  940. storeIt->second->PushFront(events);
  941. if (!events.empty())
  942. HasEvents = true;
  943. }
  944. }
  945. ~TTempEdgeEventsCaptor() {
  946. for (auto edgeActor : Runtime.EdgeActors) {
  947. auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
  948. auto storeIt = Store.find(mboxId);
  949. if (storeIt == Store.end()) {
  950. continue;
  951. }
  952. TEventsList events;
  953. storeIt->second->Capture(events);
  954. Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events);
  955. }
  956. }
  957. TTestActorRuntimeBase& Runtime;
  958. TEventMailBoxList Store;
  959. bool HasEvents;
  960. };
  961. TEventMailBoxList restrictedMailboxes;
  962. const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty();
  963. for (auto mailboxId : options.OnlyMailboxes) {
  964. auto it = Mailboxes.find(mailboxId);
  965. if (it == Mailboxes.end()) {
  966. it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first;
  967. }
  968. restrictedMailboxes.insert(std::make_pair(mailboxId, it->second));
  969. }
  970. TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor;
  971. if (!restrictedMailboxes) {
  972. tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this));
  973. }
  974. TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
  975. while (!currentMailboxes.empty()) {
  976. bool hasProgress = true;
  977. while (hasProgress) {
  978. ++DispatchCyclesCount;
  979. hasProgress = false;
  980. ui64 eventsToDispatch = 0;
  981. for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
  982. if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  983. eventsToDispatch += mboxIt->second->GetSentEventCount();
  984. }
  985. }
  986. ui32 eventsDispatched = 0;
  987. //TODO: count events before each cycle, break after dispatching that much events
  988. bool isEmpty = false;
  989. while (!isEmpty && eventsDispatched < eventsToDispatch) {
  990. ui64 mailboxCount = currentMailboxes.size();
  991. ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
  992. auto startWithMboxIt = currentMailboxes.begin();
  993. for (ui64 i = 0; i < startWith; ++i) {
  994. ++startWithMboxIt;
  995. }
  996. auto endWithMboxIt = startWithMboxIt;
  997. isEmpty = true;
  998. auto mboxIt = startWithMboxIt;
  999. TDeque<TEventMailboxId> suspectedBoxes;
  1000. while (true) {
  1001. auto& mbox = *mboxIt;
  1002. bool isIgnored = true;
  1003. if (!mbox.second->IsEmpty()) {
  1004. HandleNonEmptyMailboxesForEachContext(mbox.first);
  1005. if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  1006. bool isEdgeMailbox = false;
  1007. if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) {
  1008. isEdgeMailbox = true;
  1009. TEventsList events;
  1010. mbox.second->Capture(events);
  1011. for (auto& ev : events) {
  1012. TInverseGuard<TMutex> inverseGuard(Mutex);
  1013. ObserverFunc(*this, ev);
  1014. }
  1015. mbox.second->PushFront(events);
  1016. }
  1017. if (!isEdgeMailbox) {
  1018. isEmpty = false;
  1019. isIgnored = false;
  1020. ++eventsDispatched;
  1021. ++DispatchedEventsCount;
  1022. if (DispatchedEventsCount > DispatchedEventsLimit) {
  1023. ythrow TWithBackTrace<yexception>() << "Dispatched "
  1024. << DispatchedEventsLimit << " events, limit reached.";
  1025. }
  1026. auto ev = mbox.second->Pop();
  1027. if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) {
  1028. //UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10));
  1029. if (verbose) {
  1030. Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
  1031. PrintEvent(ev, this);
  1032. }
  1033. }
  1034. hasProgress = true;
  1035. EEventAction action;
  1036. {
  1037. TInverseGuard<TMutex> inverseGuard(Mutex);
  1038. action = ObserverFunc(*this, ev);
  1039. }
  1040. switch (action) {
  1041. case EEventAction::PROCESS:
  1042. UpdateFinalEventsStatsForEachContext(*ev);
  1043. SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
  1044. break;
  1045. case EEventAction::DROP:
  1046. // do nothing
  1047. break;
  1048. case EEventAction::RESCHEDULE: {
  1049. TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
  1050. mbox.second->Freeze(deadline);
  1051. mbox.second->PushFront(ev);
  1052. break;
  1053. }
  1054. default:
  1055. Y_FAIL("Unknown action");
  1056. }
  1057. }
  1058. }
  1059. }
  1060. Y_VERIFY(mboxIt != currentMailboxes.end());
  1061. if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
  1062. mboxIt->second->IsEmpty() &&
  1063. mboxIt->second->IsScheduledEmpty() &&
  1064. mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  1065. suspectedBoxes.push_back(mboxIt->first);
  1066. }
  1067. ++mboxIt;
  1068. if (mboxIt == currentMailboxes.end()) {
  1069. mboxIt = currentMailboxes.begin();
  1070. }
  1071. Y_VERIFY(endWithMboxIt != currentMailboxes.end());
  1072. if (mboxIt == endWithMboxIt) {
  1073. break;
  1074. }
  1075. }
  1076. for (auto id : suspectedBoxes) {
  1077. auto it = currentMailboxes.find(id);
  1078. if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
  1079. it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  1080. currentMailboxes.erase(it);
  1081. }
  1082. }
  1083. }
  1084. }
  1085. if (localContext.FinalEventFound) {
  1086. return true;
  1087. }
  1088. if (!localContext.FoundNonEmptyMailboxes.empty())
  1089. return true;
  1090. if (options.CustomFinalCondition && options.CustomFinalCondition())
  1091. return true;
  1092. if (options.FinalEvents.empty()) {
  1093. for (auto& mbox : currentMailboxes) {
  1094. if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
  1095. continue;
  1096. if (!mbox.second->IsEmpty()) {
  1097. if (verbose) {
  1098. Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
  1099. }
  1100. return true;
  1101. }
  1102. }
  1103. }
  1104. if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) {
  1105. return false;
  1106. }
  1107. if (dispatchTime >= deadline) {
  1108. if (verbose) {
  1109. Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
  1110. }
  1111. ythrow TWithBackTrace<TEmptyEventQueueException>();
  1112. }
  1113. if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
  1114. inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
  1115. bool isEmpty = true;
  1116. TMaybe<TInstant> nearestMailboxDeadline;
  1117. TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes;
  1118. TMaybe<TInstant> nextScheduleDeadline;
  1119. for (auto& mbox : currentMailboxes) {
  1120. if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  1121. if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) {
  1122. nearestMailboxDeadline = mbox.second->GetInactiveUntil();
  1123. }
  1124. continue;
  1125. }
  1126. if (mbox.second->IsScheduledEmpty())
  1127. continue;
  1128. auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline();
  1129. if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) {
  1130. nextScheduleMboxes.clear();
  1131. nextScheduleMboxes.emplace_back(mbox.second);
  1132. nextScheduleDeadline = firstScheduleDeadline;
  1133. } else if (firstScheduleDeadline == *nextScheduleDeadline) {
  1134. nextScheduleMboxes.emplace_back(mbox.second);
  1135. }
  1136. }
  1137. for (const auto& nextScheduleMbox : nextScheduleMboxes) {
  1138. TEventsList selectedEvents;
  1139. TScheduledEventsList capturedScheduledEvents;
  1140. nextScheduleMbox->CaptureScheduled(capturedScheduledEvents);
  1141. ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents);
  1142. nextScheduleMbox->PushScheduled(capturedScheduledEvents);
  1143. for (auto& event : selectedEvents) {
  1144. if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) {
  1145. Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
  1146. PrintEvent(event, this);
  1147. }
  1148. nextScheduleMbox->Send(event);
  1149. isEmpty = false;
  1150. }
  1151. }
  1152. if (!isEmpty) {
  1153. if (verbose) {
  1154. Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
  1155. }
  1156. deadline = dispatchTime + DispatchTimeout;
  1157. continue;
  1158. }
  1159. if (nearestMailboxDeadline.Defined()) {
  1160. if (verbose) {
  1161. Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n";
  1162. }
  1163. UpdateCurrentTime(*nearestMailboxDeadline.Get());
  1164. continue;
  1165. }
  1166. }
  1167. TDuration waitDelay = TDuration::MilliSeconds(10);
  1168. dispatchTime += waitDelay;
  1169. MailboxesHasEvents.WaitT(Mutex, waitDelay);
  1170. }
  1171. return false;
  1172. }
  1173. void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) {
  1174. TDispatchContext* context = CurrentDispatchContext;
  1175. while (context) {
  1176. const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes;
  1177. if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) {
  1178. context->FoundNonEmptyMailboxes.insert(mboxId);
  1179. }
  1180. context = context->PrevContext;
  1181. }
  1182. }
  1183. void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) {
  1184. TDispatchContext* context = CurrentDispatchContext;
  1185. while (context) {
  1186. for (const auto& finalEvent : context->Options->FinalEvents) {
  1187. if (finalEvent.EventCheck(ev)) {
  1188. auto& freq = context->FinalEventFrequency[&finalEvent];
  1189. if (++freq >= finalEvent.RequiredCount) {
  1190. context->FinalEventFound = true;
  1191. }
  1192. }
  1193. }
  1194. context = context->PrevContext;
  1195. }
  1196. }
  1197. void TTestActorRuntimeBase::Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventBase> ev, ui32 senderNodeIndex, bool viaActorSystem) {
  1198. Send(new IEventHandle(recipient, sender, ev.Release()), senderNodeIndex, viaActorSystem);
  1199. }
  1200. void TTestActorRuntimeBase::Send(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex, bool viaActorSystem) {
  1201. TGuard<TMutex> guard(Mutex);
  1202. Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
  1203. senderNodeIndex, NodeCount);
  1204. SendInternal(ev, senderNodeIndex, viaActorSystem);
  1205. }
  1206. void TTestActorRuntimeBase::SendAsync(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex) {
  1207. Send(ev, senderNodeIndex, true);
  1208. }
  1209. void TTestActorRuntimeBase::Schedule(TAutoPtr<IEventHandle> ev, const TDuration& duration, ui32 nodeIndex) {
  1210. TGuard<TMutex> guard(Mutex);
  1211. Y_VERIFY(nodeIndex < NodeCount);
  1212. ui32 nodeId = FirstNodeId + nodeIndex;
  1213. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  1214. TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration;
  1215. GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr));
  1216. if (VERBOSE)
  1217. Cerr << "Event was added to scheduled queue\n";
  1218. }
  1219. void TTestActorRuntimeBase::ClearCounters() {
  1220. TGuard<TMutex> guard(Mutex);
  1221. EvCounters.clear();
  1222. }
  1223. ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const {
  1224. TGuard<TMutex> guard(Mutex);
  1225. auto it = EvCounters.find(evType);
  1226. if (it == EvCounters.end())
  1227. return 0;
  1228. return it->second;
  1229. }
  1230. TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
  1231. TGuard<TMutex> guard(Mutex);
  1232. Y_VERIFY(nodeIndex < NodeCount);
  1233. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
  1234. return node->ActorSystem->LookupLocalService(serviceId);
  1235. }
  1236. void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) {
  1237. TGuard<TMutex> guard(Mutex);
  1238. ui32 dispatchCount = 0;
  1239. if (!edgeFilter.empty()) {
  1240. for (auto edgeActor : edgeFilter) {
  1241. Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data());
  1242. }
  1243. }
  1244. const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter;
  1245. TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout;
  1246. for (;;) {
  1247. for (auto edgeActor : edgeActors) {
  1248. TEventsList events;
  1249. auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint());
  1250. bool foundEvent = false;
  1251. mbox.Capture(events);
  1252. for (auto& ev : events) {
  1253. if (filter(*this, ev)) {
  1254. foundEvent = true;
  1255. break;
  1256. }
  1257. }
  1258. mbox.PushFront(events);
  1259. if (foundEvent)
  1260. return;
  1261. }
  1262. ++dispatchCount;
  1263. {
  1264. if (!DispatchEventsInternal(TDispatchOptions(), deadline)) {
  1265. return; // Timed out; event was not found
  1266. }
  1267. }
  1268. Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop");
  1269. }
  1270. }
  1271. TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
  1272. TGuard<TMutex> guard(Mutex);
  1273. Y_VERIFY(nodeIndexFrom < NodeCount);
  1274. Y_VERIFY(nodeIndexTo < NodeCount);
  1275. Y_VERIFY(nodeIndexFrom != nodeIndexTo);
  1276. TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get();
  1277. return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
  1278. }
  1279. void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) {
  1280. TGuard<TMutex> guard(Mutex);
  1281. BlockedOutput.insert(actorId);
  1282. }
  1283. void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
  1284. ui64 days = (time.Hours() / 24);
  1285. DispatcherRandomSeed = (days << 32) ^ iteration;
  1286. DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
  1287. }
  1288. IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
  1289. TGuard<TMutex> guard(Mutex);
  1290. if (nodeIndex == Max<ui32>()) {
  1291. Y_VERIFY(actorId.NodeId());
  1292. nodeIndex = actorId.NodeId() - FirstNodeId;
  1293. }
  1294. Y_VERIFY(nodeIndex < NodeCount);
  1295. auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
  1296. Y_VERIFY(nodeIt != Nodes.end());
  1297. TNodeDataBase* node = nodeIt->second.Get();
  1298. return FindActor(actorId, node);
  1299. }
  1300. void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
  1301. TGuard<TMutex> guard(Mutex);
  1302. if (allow) {
  1303. if (VERBOSE) {
  1304. Cerr << "Actor " << actorId << " added to schedule whitelist";
  1305. }
  1306. ScheduleWhiteList.insert(actorId);
  1307. } else {
  1308. if (VERBOSE) {
  1309. Cerr << "Actor " << actorId << " removed from schedule whitelist";
  1310. }
  1311. ScheduleWhiteList.erase(actorId);
  1312. }
  1313. }
  1314. bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const {
  1315. TGuard<TMutex> guard(Mutex);
  1316. return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
  1317. }
  1318. TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) {
  1319. TGuard<TMutex> guard(Mutex);
  1320. Y_VERIFY(nodeIndex < NodeCount);
  1321. ui32 nodeId = FirstNodeId + nodeIndex;
  1322. TNodeDataBase* node = Nodes[nodeId].Get();
  1323. return node->DynamicCounters;
  1324. }
  1325. void TTestActorRuntimeBase::SetupMonitoring() {
  1326. NeedMonitoring = true;
  1327. }
  1328. void TTestActorRuntimeBase::SendInternal(TAutoPtr<IEventHandle> ev, ui32 nodeIndex, bool viaActorSystem) {
  1329. Y_VERIFY(nodeIndex < NodeCount);
  1330. ui32 nodeId = FirstNodeId + nodeIndex;
  1331. TNodeDataBase* node = Nodes[nodeId].Get();
  1332. ui32 targetNode = ev->GetRecipientRewrite().NodeId();
  1333. ui32 targetNodeIndex;
  1334. if (targetNode == 0) {
  1335. targetNodeIndex = nodeIndex;
  1336. } else {
  1337. targetNodeIndex = targetNode - FirstNodeId;
  1338. Y_VERIFY(targetNodeIndex < NodeCount);
  1339. }
  1340. if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) {
  1341. node->ActorSystem->Send(ev);
  1342. return;
  1343. }
  1344. Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex));
  1345. if (!AllowSendFrom(node, ev)) {
  1346. return;
  1347. }
  1348. ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
  1349. TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
  1350. if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
  1351. mbox.PushFront(ev);
  1352. return;
  1353. }
  1354. ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId();
  1355. if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) {
  1356. Cerr << "Send event, ";
  1357. PrintEvent(ev, this);
  1358. }
  1359. EvCounters[ev->GetTypeRewrite()]++;
  1360. TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
  1361. IActor* recipientActor = mailbox->FindActor(recipientLocalId);
  1362. if (recipientActor) {
  1363. // Save actorId by value in order to prevent ctx from being invalidated during another Send call.
  1364. TActorId actorId = ev->GetRecipientRewrite();
  1365. node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
  1366. TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
  1367. TActivationContext *prevTlsActivationContext = TlsActivationContext;
  1368. TlsActivationContext = &ctx;
  1369. CurrentRecipient = actorId;
  1370. {
  1371. TInverseGuard<TMutex> inverseGuard(Mutex);
  1372. #ifdef USE_ACTOR_CALLSTACK
  1373. TCallstack::GetTlsCallstack() = ev->Callstack;
  1374. TCallstack::GetTlsCallstack().SetLinesToSkip();
  1375. #endif
  1376. recipientActor->Receive(ev);
  1377. node->ExecutorThread->DropUnregistered();
  1378. }
  1379. CurrentRecipient = TActorId();
  1380. TlsActivationContext = prevTlsActivationContext;
  1381. } else {
  1382. if (VERBOSE) {
  1383. Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n";
  1384. }
  1385. auto fw = IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::ReasonActorUnknown);
  1386. node->ActorSystem->Send(fw);
  1387. }
  1388. }
  1389. IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const {
  1390. ui32 mailboxHint = actorId.Hint();
  1391. ui64 localId = actorId.LocalId();
  1392. TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
  1393. IActor* actor = mailbox->FindActor(localId);
  1394. return actor;
  1395. }
  1396. THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) {
  1397. THolder<TActorSystemSetup> setup(new TActorSystemSetup);
  1398. setup->NodeId = FirstNodeId + nodeIndex;
  1399. if (UseRealThreads) {
  1400. setup->ExecutorsCount = 5;
  1401. setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]);
  1402. setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
  1403. setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
  1404. setup->Executors[2].Reset(new TIOExecutorPool(2, 1));
  1405. setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
  1406. setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20));
  1407. setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100)));
  1408. } else {
  1409. setup->ExecutorsCount = 1;
  1410. setup->Scheduler.Reset(new TSchedulerThreadStub(this, node));
  1411. setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
  1412. setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
  1413. }
  1414. InitActorSystemSetup(*setup);
  1415. return setup;
  1416. }
  1417. THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) {
  1418. auto setup = MakeActorSystemSetup(nodeIndex, node);
  1419. node->ExecutorPools.resize(setup->ExecutorsCount);
  1420. for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
  1421. node->ExecutorPools[i] = setup->Executors[i].Get();
  1422. }
  1423. const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");
  1424. for (const auto& cmd : node->LocalServices) {
  1425. setup->LocalServices.emplace_back(cmd.first, TActorSetupCmd(cmd.second.Actor, cmd.second.MailboxType, cmd.second.PoolId));
  1426. }
  1427. setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
  1428. const TActorId nameserviceId = GetNameserviceActorId();
  1429. TIntrusivePtr<TInterconnectProxyCommon> common;
  1430. common.Reset(new TInterconnectProxyCommon);
  1431. common->NameserviceId = nameserviceId;
  1432. common->MonCounters = interconnectCounters;
  1433. common->TechnicalSelfHostName = "::1";
  1434. if (!UseRealThreads) {
  1435. common->Settings.DeadPeer = TDuration::Max();
  1436. common->Settings.CloseOnIdle = TDuration::Max();
  1437. common->Settings.PingPeriod = TDuration::Max();
  1438. common->Settings.ForceConfirmPeriod = TDuration::Max();
  1439. common->Settings.Handshake = TDuration::Max();
  1440. }
  1441. common->ClusterUUID = ClusterUUID;
  1442. common->AcceptUUID = {ClusterUUID};
  1443. if (ICCommonSetupper) {
  1444. ICCommonSetupper(nodeIndex, common);
  1445. }
  1446. for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) {
  1447. if (proxyNodeIndex == nodeIndex)
  1448. continue;
  1449. const ui32 peerNodeId = FirstNodeId + proxyNodeIndex;
  1450. IActor *proxyActor = UseRealInterconnect
  1451. ? new TInterconnectProxyTCP(peerNodeId, common)
  1452. : InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common);
  1453. setup->Interconnect.ProxyActors[peerNodeId] = {proxyActor, TMailboxType::ReadAsFilled, InterconnectPoolId()};
  1454. }
  1455. setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock);
  1456. if (UseRealInterconnect) {
  1457. setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(),
  1458. NActors::TMailboxType::Simple, InterconnectPoolId()));
  1459. }
  1460. if (!SingleSysEnv) { // Single system env should do this self
  1461. TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend();
  1462. NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings,
  1463. logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
  1464. NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId());
  1465. std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, std::move(loggerActorCmd));
  1466. setup->LocalServices.push_back(std::move(loggerActorPair));
  1467. }
  1468. return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
  1469. }
  1470. TActorSystem* TTestActorRuntimeBase::SingleSys() const {
  1471. Y_VERIFY(Nodes.size() == 1, "Works only for single system env");
  1472. return Nodes.begin()->second->ActorSystem.Get();
  1473. }
  1474. TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() {
  1475. for (auto& x : Nodes) {
  1476. return x.second->ActorSystem.Get();
  1477. }
  1478. Y_FAIL("Don't use this method.");
  1479. }
  1480. TActorSystem* TTestActorRuntimeBase::GetActorSystem(ui32 nodeId) {
  1481. auto it = Nodes.find(GetNodeId(nodeId));
  1482. Y_VERIFY(it != Nodes.end());
  1483. return it->second->ActorSystem.Get();
  1484. }
  1485. TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) {
  1486. TGuard<TMutex> guard(Mutex);
  1487. auto mboxId = TEventMailboxId(nodeId, hint);
  1488. auto it = Mailboxes.find(mboxId);
  1489. if (it == Mailboxes.end()) {
  1490. it = Mailboxes.insert(std::make_pair(mboxId, new TEventMailBox())).first;
  1491. }
  1492. return *it->second;
  1493. }
  1494. void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) {
  1495. TGuard<TMutex> guard(Mutex);
  1496. auto mboxId = TEventMailboxId(nodeId, hint);
  1497. Mailboxes.erase(mboxId);
  1498. }
  1499. TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const {
  1500. auto it = ActorNames.find(actorId);
  1501. if (it != ActorNames.end())
  1502. return it->second;
  1503. return actorId.ToString();
  1504. }
  1505. struct TStrandingActorDecoratorContext : public TThrRefBase {
  1506. TStrandingActorDecoratorContext()
  1507. : Queue(new TQueueType)
  1508. {
  1509. }
  1510. typedef TOneOneQueueInplace<IEventHandle*, 32> TQueueType;
  1511. TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
  1512. };
  1513. class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
  1514. public:
  1515. class TReplyActor : public TActor<TReplyActor> {
  1516. public:
  1517. static constexpr EActivityType ActorActivityType() {
  1518. return EActivityType::TEST_ACTOR_RUNTIME;
  1519. }
  1520. TReplyActor(TStrandingActorDecorator* owner)
  1521. : TActor(&TReplyActor::StateFunc)
  1522. , Owner(owner)
  1523. {
  1524. }
  1525. STFUNC(StateFunc);
  1526. private:
  1527. TStrandingActorDecorator* const Owner;
  1528. };
  1529. static constexpr EActivityType ActorActivityType() {
  1530. return EActivityType::TEST_ACTOR_RUNTIME;
  1531. }
  1532. TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
  1533. TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
  1534. TReplyCheckerCreator createReplyChecker)
  1535. : Delegatee(delegatee)
  1536. , IsSync(isSync)
  1537. , AdditionalActors(additionalActors)
  1538. , Context(context)
  1539. , HasReply(false)
  1540. , Runtime(runtime)
  1541. , ReplyChecker(createReplyChecker())
  1542. {
  1543. if (IsSync) {
  1544. Y_VERIFY(!runtime->IsRealThreads());
  1545. }
  1546. }
  1547. void Bootstrap(const TActorContext& ctx) {
  1548. Become(&TStrandingActorDecorator::StateFunc);
  1549. ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this));
  1550. DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
  1551. for (const auto& actor : AdditionalActors) {
  1552. DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));
  1553. }
  1554. DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
  1555. DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
  1556. DelegateeOptions.Quiet = true;
  1557. }
  1558. STFUNC(StateFunc) {
  1559. bool wasEmpty = !Context->Queue->Head();
  1560. Context->Queue->Push(ev.Release());
  1561. if (wasEmpty) {
  1562. SendHead(ActorContext());
  1563. }
  1564. }
  1565. STFUNC(Reply) {
  1566. Y_VERIFY(!HasReply);
  1567. IEventHandle *requestEv = Context->Queue->Head();
  1568. TActorId originalSender = requestEv->Sender;
  1569. HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get());
  1570. if (HasReply) {
  1571. delete Context->Queue->Pop();
  1572. }
  1573. auto ctx(ActorContext());
  1574. ctx.ExecutorThread.Send(IEventHandle::Forward(ev, originalSender));
  1575. if (!IsSync && Context->Queue->Head()) {
  1576. SendHead(ctx);
  1577. }
  1578. }
  1579. private:
  1580. void SendHead(const TActorContext& ctx) {
  1581. if (!IsSync) {
  1582. ctx.ExecutorThread.Send(GetForwardedEvent().Release());
  1583. } else {
  1584. while (Context->Queue->Head()) {
  1585. HasReply = false;
  1586. ctx.ExecutorThread.Send(GetForwardedEvent().Release());
  1587. int count = 100;
  1588. while (!HasReply && count > 0) {
  1589. try {
  1590. Runtime->DispatchEvents(DelegateeOptions);
  1591. } catch (TEmptyEventQueueException&) {
  1592. count--;
  1593. Cerr << "No reply" << Endl;
  1594. }
  1595. }
  1596. Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000));
  1597. }
  1598. }
  1599. }
  1600. TAutoPtr<IEventHandle> GetForwardedEvent() {
  1601. IEventHandle* ev = Context->Queue->Head();
  1602. ReplyChecker->OnRequest(ev);
  1603. TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
  1604. ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
  1605. : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);
  1606. return forwardedEv;
  1607. }
  1608. private:
  1609. const TActorId Delegatee;
  1610. const bool IsSync;
  1611. const TVector<TActorId> AdditionalActors;
  1612. TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
  1613. TActorId ReplyId;
  1614. bool HasReply;
  1615. TDispatchOptions DelegateeOptions;
  1616. TTestActorRuntimeBase* Runtime;
  1617. THolder<IReplyChecker> ReplyChecker;
  1618. };
  1619. void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
  1620. Owner->Reply(ev);
  1621. }
  1622. class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
  1623. public:
  1624. TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
  1625. TReplyCheckerCreator createReplyChecker)
  1626. : Context(new TStrandingActorDecoratorContext())
  1627. , Runtime(runtime)
  1628. , CreateReplyChecker(createReplyChecker)
  1629. {
  1630. }
  1631. IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
  1632. return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime,
  1633. CreateReplyChecker);
  1634. }
  1635. private:
  1636. TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
  1637. TTestActorRuntimeBase* Runtime;
  1638. TReplyCheckerCreator CreateReplyChecker;
  1639. };
  1640. TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
  1641. TReplyCheckerCreator createReplyChecker) {
  1642. return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker));
  1643. }
  1644. ui64 DefaultRandomSeed = 9999;
  1645. }