test_runtime.h 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716
  1. #pragma once
  2. #include <library/cpp/actors/core/actor.h>
  3. #include <library/cpp/actors/core/actorsystem.h>
  4. #include <library/cpp/actors/core/log.h>
  5. #include <library/cpp/actors/core/events.h>
  6. #include <library/cpp/actors/core/executor_thread.h>
  7. #include <library/cpp/actors/core/mailbox.h>
  8. #include <library/cpp/actors/util/should_continue.h>
  9. #include <library/cpp/actors/interconnect/poller_tcp.h>
  10. #include <library/cpp/actors/interconnect/mock/ic_mock.h>
  11. #include <library/cpp/random_provider/random_provider.h>
  12. #include <library/cpp/time_provider/time_provider.h>
  13. #include <library/cpp/testing/unittest/tests_data.h>
  14. #include <util/datetime/base.h>
  15. #include <util/folder/tempdir.h>
  16. #include <util/generic/deque.h>
  17. #include <util/generic/hash.h>
  18. #include <util/generic/noncopyable.h>
  19. #include <util/generic/ptr.h>
  20. #include <util/generic/queue.h>
  21. #include <util/generic/set.h>
  22. #include <util/generic/vector.h>
  23. #include <util/system/defaults.h>
  24. #include <util/system/mutex.h>
  25. #include <util/system/condvar.h>
  26. #include <util/system/thread.h>
  27. #include <util/system/sanitizers.h>
  28. #include <util/system/valgrind.h>
  29. #include <utility>
  30. #include <functional>
  31. const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer(
  32. NValgrind::PlainOrUnderValgrind(TDuration::Seconds(60), TDuration::Seconds(120)),
  33. TDuration::Seconds(120)
  34. );
  35. namespace NActors {
  36. struct THeSingleSystemEnv { };
  37. struct TEventMailboxId {
  38. TEventMailboxId()
  39. : NodeId(0)
  40. , Hint(0)
  41. {
  42. }
  43. TEventMailboxId(ui32 nodeId, ui32 hint)
  44. : NodeId(nodeId)
  45. , Hint(hint)
  46. {
  47. }
  48. bool operator<(const TEventMailboxId& other) const {
  49. return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint);
  50. }
  51. bool operator==(const TEventMailboxId& other) const {
  52. return (NodeId == other.NodeId) && (Hint == other.Hint);
  53. }
  54. struct THash {
  55. ui64 operator()(const TEventMailboxId& mboxId) const noexcept {
  56. return mboxId.NodeId * 31ULL + mboxId.Hint;
  57. }
  58. };
  59. ui32 NodeId;
  60. ui32 Hint;
  61. };
  62. struct TDispatchOptions {
  63. struct TFinalEventCondition {
  64. std::function<bool(IEventHandle& ev)> EventCheck;
  65. ui32 RequiredCount;
  66. TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1)
  67. : EventCheck([eventType](IEventHandle& ev) -> bool { return ev.GetTypeRewrite() == eventType; })
  68. , RequiredCount(requiredCount)
  69. {
  70. }
  71. TFinalEventCondition(std::function<bool(IEventHandle& ev)> eventCheck, ui32 requiredCount = 1)
  72. : EventCheck(eventCheck)
  73. , RequiredCount(requiredCount)
  74. {
  75. }
  76. };
  77. TVector<TFinalEventCondition> FinalEvents;
  78. TVector<TEventMailboxId> NonEmptyMailboxes;
  79. TVector<TEventMailboxId> OnlyMailboxes;
  80. std::function<bool()> CustomFinalCondition;
  81. bool Quiet = false;
  82. };
  83. struct TScheduledEventQueueItem {
  84. TInstant Deadline;
  85. TAutoPtr<IEventHandle> Event;
  86. TAutoPtr<TSchedulerCookieHolder> Cookie;
  87. ui64 UniqueId;
  88. TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie)
  89. : Deadline(deadline)
  90. , Event(event)
  91. , Cookie(new TSchedulerCookieHolder(cookie))
  92. , UniqueId(++NextUniqueId)
  93. {}
  94. bool operator<(const TScheduledEventQueueItem& other) const {
  95. if (Deadline < other.Deadline)
  96. return true;
  97. if (Deadline > other.Deadline)
  98. return false;
  99. return UniqueId < other.UniqueId;
  100. }
  101. static ui64 NextUniqueId;
  102. };
  103. typedef TDeque<TAutoPtr<IEventHandle>> TEventsList;
  104. typedef TSet<TScheduledEventQueueItem> TScheduledEventsList;
  105. class TEventMailBox : public TThrRefBase {
  106. public:
  107. TEventMailBox()
  108. : InactiveUntil(TInstant::MicroSeconds(0))
  109. #ifdef DEBUG_ORDER_EVENTS
  110. , ExpectedReceive(0)
  111. , NextToSend(0)
  112. #endif
  113. {
  114. }
  115. void Send(TAutoPtr<IEventHandle> ev);
  116. bool IsEmpty() const;
  117. TAutoPtr<IEventHandle> Pop();
  118. void Capture(TEventsList& evList);
  119. void PushFront(TAutoPtr<IEventHandle>& ev);
  120. void PushFront(TEventsList& evList);
  121. void CaptureScheduled(TScheduledEventsList& evList);
  122. void PushScheduled(TScheduledEventsList& evList);
  123. bool IsActive(const TInstant& currentTime) const;
  124. void Freeze(const TInstant& deadline);
  125. TInstant GetInactiveUntil() const;
  126. void Schedule(const TScheduledEventQueueItem& item);
  127. bool IsScheduledEmpty() const;
  128. TInstant GetFirstScheduleDeadline() const;
  129. ui64 GetSentEventCount() const;
  130. private:
  131. TScheduledEventsList Scheduled;
  132. TInstant InactiveUntil;
  133. TEventsList Sent;
  134. #ifdef DEBUG_ORDER_EVENTS
  135. TMap<IEventHandle*, ui64> TrackSent;
  136. ui64 ExpectedReceive;
  137. ui64 NextToSend;
  138. #endif
  139. };
  140. typedef THashMap<TEventMailboxId, TIntrusivePtr<TEventMailBox>, TEventMailboxId::THash> TEventMailBoxList;
  141. class TEmptyEventQueueException : public yexception {
  142. public:
  143. TEmptyEventQueueException() {
  144. Append("Event queue is still empty.");
  145. }
  146. };
  147. class TSchedulingLimitReachedException : public yexception {
  148. public:
  149. TSchedulingLimitReachedException(ui64 limit) {
  150. TStringStream str;
  151. str << "TestActorRuntime Processed over " << limit << " events.";
  152. Append(str.Str());
  153. }
  154. };
  155. class TTestActorRuntimeBase: public TNonCopyable {
  156. public:
  157. class TEdgeActor;
  158. class TSchedulerThreadStub;
  159. class TExecutorPoolStub;
  160. class TTimeProvider;
  161. enum class EEventAction {
  162. PROCESS,
  163. DROP,
  164. RESCHEDULE
  165. };
  166. typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver;
  167. typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector;
  168. typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter;
  169. typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter;
  170. typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver;
  171. TTestActorRuntimeBase(THeSingleSystemEnv);
  172. TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads);
  173. TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount);
  174. TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false);
  175. virtual ~TTestActorRuntimeBase();
  176. bool IsRealThreads() const;
  177. static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
  178. static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
  179. static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
  180. static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
  181. static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline);
  182. static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId);
  183. TEventObserver SetObserverFunc(TEventObserver observerFunc);
  184. TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc);
  185. TEventFilter SetEventFilter(TEventFilter filterFunc);
  186. TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc);
  187. TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc);
  188. static bool IsVerbose();
  189. static void SetVerbose(bool verbose);
  190. TDuration SetDispatchTimeout(TDuration timeout);
  191. void SetDispatchedEventsLimit(ui64 limit) {
  192. DispatchedEventsLimit = limit;
  193. }
  194. TDuration SetReschedulingDelay(TDuration delay);
  195. void SetLogBackend(const TAutoPtr<TLogBackend> logBackend);
  196. void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority);
  197. TIntrusivePtr<ITimeProvider> GetTimeProvider();
  198. TInstant GetCurrentTime() const;
  199. void UpdateCurrentTime(TInstant newTime);
  200. void AdvanceCurrentTime(TDuration duration);
  201. void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0);
  202. virtual void Initialize();
  203. ui32 GetNodeId(ui32 index = 0) const;
  204. ui32 GetNodeCount() const;
  205. ui64 AllocateLocalId();
  206. ui32 InterconnectPoolId() const;
  207. TString GetTempDir();
  208. TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0,
  209. TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0,
  210. const TActorId& parentid = TActorId());
  211. TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
  212. const TActorId& parentid = TActorId());
  213. TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0);
  214. TActorId AllocateEdgeActor(ui32 nodeIndex = 0);
  215. TEventsList CaptureEvents();
  216. TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId);
  217. TScheduledEventsList CaptureScheduledEvents();
  218. void PushFront(TAutoPtr<IEventHandle>& ev);
  219. void PushEventsFront(TEventsList& events);
  220. void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events);
  221. // doesn't dispatch events for edge actors
  222. bool DispatchEvents(const TDispatchOptions& options = TDispatchOptions());
  223. bool DispatchEvents(const TDispatchOptions& options, TDuration simTimeout);
  224. bool DispatchEvents(const TDispatchOptions& options, TInstant simDeadline);
  225. void Send(IEventHandle* ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false);
  226. void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0);
  227. void ClearCounters();
  228. ui64 GetCounter(ui32 evType) const;
  229. TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0);
  230. void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max());
  231. TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo);
  232. void BlockOutputForActor(const TActorId& actorId);
  233. IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
  234. void EnableScheduleForActor(const TActorId& actorId, bool allow = true);
  235. bool IsScheduleForActorEnabled(const TActorId& actorId) const;
  236. TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
  237. void SetupMonitoring();
  238. template<typename T>
  239. void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) {
  240. Y_VERIFY(!IsInitialized);
  241. for (const auto& pair : Nodes) {
  242. pair.second->LogSettings->Append(minVal, maxVal, func);
  243. }
  244. }
  245. TIntrusivePtr<NLog::TSettings> GetLogSettings(ui32 nodeIdx)
  246. {
  247. return Nodes[FirstNodeId + nodeIdx]->LogSettings;
  248. }
  249. TActorSystem* SingleSys() const;
  250. TActorSystem* GetAnyNodeActorSystem();
  251. TActorSystem* GetActorSystem(ui32 nodeId);
  252. template <typename TEvent>
  253. TEvent* GrabEdgeEventIf(TAutoPtr<IEventHandle>& handle, std::function<bool(const TEvent&)> predicate, TDuration simTimeout = TDuration::Max()) {
  254. handle.Destroy();
  255. const ui32 eventType = TEvent::EventType;
  256. WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
  257. Y_UNUSED(runtime);
  258. if (event->GetTypeRewrite() != eventType)
  259. return false;
  260. TEvent* typedEvent = reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(event)->Get();
  261. if (predicate(*typedEvent)) {
  262. handle = event;
  263. return true;
  264. }
  265. return false;
  266. }, {}, simTimeout);
  267. if (simTimeout == TDuration::Max())
  268. Y_VERIFY(handle);
  269. if (handle) {
  270. return reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(handle)->Get();
  271. } else {
  272. return nullptr;
  273. }
  274. }
  275. template<class TEvent>
  276. typename TEvent::TPtr GrabEdgeEventIf(
  277. const TSet<TActorId>& edgeFilter,
  278. const std::function<bool(const typename TEvent::TPtr&)>& predicate,
  279. TDuration simTimeout = TDuration::Max())
  280. {
  281. typename TEvent::TPtr handle;
  282. const ui32 eventType = TEvent::EventType;
  283. WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
  284. Y_UNUSED(runtime);
  285. if (event->GetTypeRewrite() != eventType)
  286. return false;
  287. typename TEvent::TPtr* typedEvent = reinterpret_cast<typename TEvent::TPtr*>(&event);
  288. if (predicate(*typedEvent)) {
  289. handle = *typedEvent;
  290. return true;
  291. }
  292. return false;
  293. }, edgeFilter, simTimeout);
  294. if (simTimeout == TDuration::Max())
  295. Y_VERIFY(handle);
  296. return handle;
  297. }
  298. template<class TEvent>
  299. typename TEvent::TPtr GrabEdgeEventIf(
  300. const TActorId& edgeActor,
  301. const std::function<bool(const typename TEvent::TPtr&)>& predicate,
  302. TDuration simTimeout = TDuration::Max())
  303. {
  304. TSet<TActorId> edgeFilter{edgeActor};
  305. return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout);
  306. }
  307. template <typename TEvent>
  308. TEvent* GrabEdgeEvent(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
  309. std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; };
  310. return GrabEdgeEventIf(handle, truth, simTimeout);
  311. }
  312. template <typename TEvent>
  313. THolder<TEvent> GrabEdgeEvent(TDuration simTimeout = TDuration::Max()) {
  314. TAutoPtr<IEventHandle> handle;
  315. std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; };
  316. GrabEdgeEventIf(handle, truth, simTimeout);
  317. return THolder(handle ? handle->Release<TEvent>().Release() : nullptr);
  318. }
  319. template<class TEvent>
  320. typename TEvent::TPtr GrabEdgeEvent(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) {
  321. return GrabEdgeEventIf<TEvent>(edgeFilter, [](const typename TEvent::TPtr&) { return true; }, simTimeout);
  322. }
  323. template<class TEvent>
  324. typename TEvent::TPtr GrabEdgeEvent(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) {
  325. TSet<TActorId> edgeFilter{edgeActor};
  326. return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout);
  327. }
  328. // replace with std::variant<>
  329. template <typename... TEvents>
  330. std::tuple<TEvents*...> GrabEdgeEvents(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
  331. handle.Destroy();
  332. auto eventTypes = { TEvents::EventType... };
  333. WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
  334. if (std::find(std::begin(eventTypes), std::end(eventTypes), event->GetTypeRewrite()) == std::end(eventTypes))
  335. return false;
  336. handle = event;
  337. return true;
  338. }, {}, simTimeout);
  339. if (simTimeout == TDuration::Max())
  340. Y_VERIFY(handle);
  341. if (handle) {
  342. return std::make_tuple(handle->Type == TEvents::EventType
  343. ? reinterpret_cast<TAutoPtr<TEventHandle<TEvents>>&>(handle)->Get()
  344. : static_cast<TEvents*>(nullptr)...);
  345. }
  346. return {};
  347. }
  348. template <typename TEvent>
  349. TEvent* GrabEdgeEventRethrow(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
  350. try {
  351. return GrabEdgeEvent<TEvent>(handle, simTimeout);
  352. } catch (...) {
  353. ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage();
  354. }
  355. }
  356. template<class TEvent>
  357. typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) {
  358. try {
  359. return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout);
  360. } catch (...) {
  361. ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage();
  362. }
  363. }
  364. template<class TEvent>
  365. typename TEvent::TPtr GrabEdgeEventRethrow(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) {
  366. try {
  367. return GrabEdgeEvent<TEvent>(edgeActor, simTimeout);
  368. } catch (...) {
  369. ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage();
  370. }
  371. }
  372. template <typename... TEvents>
  373. static TString TypeNames() {
  374. static TString names[] = { TypeName<TEvents>()... };
  375. TString result;
  376. for (const TString& s : names) {
  377. if (result.empty()) {
  378. result += '<';
  379. } else {
  380. result += ',';
  381. }
  382. result += s;
  383. }
  384. if (!result.empty()) {
  385. result += '>';
  386. }
  387. return result;
  388. }
  389. template <typename... TEvents>
  390. std::tuple<TEvents*...> GrabEdgeEventsRethrow(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
  391. try {
  392. return GrabEdgeEvents<TEvents...>(handle, simTimeout);
  393. } catch (...) {
  394. ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeNames<TEvents...>() << ": " << CurrentExceptionMessage();
  395. }
  396. }
  397. void ResetScheduledCount() {
  398. ScheduledCount = 0;
  399. }
  400. void SetScheduledLimit(ui64 limit) {
  401. ScheduledLimit = limit;
  402. }
  403. void SetDispatcherRandomSeed(TInstant time, ui64 iteration);
  404. TString GetActorName(const TActorId& actorId) const;
  405. const TVector<ui64>& GetTxAllocatorTabletIds() const { return TxAllocatorTabletIds; }
  406. void SetTxAllocatorTabletIds(const TVector<ui64>& ids) { TxAllocatorTabletIds = ids; }
  407. void SetUseRealInterconnect() {
  408. UseRealInterconnect = true;
  409. }
  410. protected:
  411. struct TNodeDataBase;
  412. TNodeDataBase* GetRawNode(ui32 node) const {
  413. return Nodes.at(FirstNodeId + node).Get();
  414. }
  415. static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId);
  416. virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) {
  417. Y_UNUSED(counters);
  418. Y_UNUSED(component);
  419. // do nothing, just return the existing counters
  420. return counters;
  421. }
  422. THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node);
  423. THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node);
  424. virtual void InitActorSystemSetup(TActorSystemSetup& setup) {
  425. Y_UNUSED(setup);
  426. }
  427. private:
  428. IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const;
  429. void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem);
  430. TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint);
  431. void ClearMailbox(ui32 nodeId, ui32 hint);
  432. void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId);
  433. void UpdateFinalEventsStatsForEachContext(IEventHandle& ev);
  434. bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline);
  435. private:
  436. ui64 ScheduledCount;
  437. ui64 ScheduledLimit;
  438. THolder<TTempDir> TmpDir;
  439. const TThread::TId MainThreadId;
  440. protected:
  441. bool UseRealInterconnect = false;
  442. TInterconnectMock InterconnectMock;
  443. bool IsInitialized = false;
  444. bool SingleSysEnv = false;
  445. const TString ClusterUUID;
  446. const ui32 FirstNodeId;
  447. const ui32 NodeCount;
  448. const ui32 DataCenterCount;
  449. const bool UseRealThreads;
  450. ui64 LocalId;
  451. TMutex Mutex;
  452. TCondVar MailboxesHasEvents;
  453. TEventMailBoxList Mailboxes;
  454. TMap<ui32, ui64> EvCounters;
  455. ui64 DispatchCyclesCount;
  456. ui64 DispatchedEventsCount;
  457. ui64 DispatchedEventsLimit = 2'500'000;
  458. TActorId CurrentRecipient;
  459. ui64 DispatcherRandomSeed;
  460. TIntrusivePtr<IRandomProvider> DispatcherRandomProvider;
  461. TAutoPtr<TLogBackend> LogBackend;
  462. bool NeedMonitoring;
  463. TIntrusivePtr<IRandomProvider> RandomProvider;
  464. TIntrusivePtr<ITimeProvider> TimeProvider;
  465. protected:
  466. struct TNodeDataBase: public TThrRefBase {
  467. TNodeDataBase();
  468. void Stop();
  469. virtual ~TNodeDataBase();
  470. virtual ui64 GetLoggerPoolId() const {
  471. return 0;
  472. }
  473. template <typename T = void>
  474. T* GetAppData() {
  475. return static_cast<T*>(AppData0.get());
  476. }
  477. template <typename T = void>
  478. const T* GetAppData() const {
  479. return static_cast<T*>(AppData0.get());
  480. }
  481. TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
  482. TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
  483. TIntrusivePtr<NInterconnect::TPollerThreads> Poller;
  484. volatile ui64* ActorSystemTimestamp;
  485. volatile ui64* ActorSystemMonotonic;
  486. TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices;
  487. TMap<TActorId, IActor*> LocalServicesActors;
  488. TMap<IActor*, TActorId> ActorToActorId;
  489. THolder<TMailboxTable> MailboxTable;
  490. std::shared_ptr<void> AppData0;
  491. THolder<TActorSystem> ActorSystem;
  492. THolder<IExecutorPool> SchedulerPool;
  493. TVector<IExecutorPool*> ExecutorPools;
  494. THolder<TExecutorThread> ExecutorThread;
  495. };
  496. struct INodeFactory {
  497. virtual ~INodeFactory() = default;
  498. virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0;
  499. };
  500. struct TDefaultNodeFactory final: INodeFactory {
  501. virtual TIntrusivePtr<TNodeDataBase> CreateNode() override {
  502. return new TNodeDataBase();
  503. }
  504. };
  505. INodeFactory& GetNodeFactory() {
  506. return *NodeFactory;
  507. }
  508. virtual TNodeDataBase* GetNodeById(size_t idx) {
  509. return Nodes[idx].Get();
  510. }
  511. void InitNodes();
  512. void CleanupNodes();
  513. virtual void InitNodeImpl(TNodeDataBase*, size_t);
  514. static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev);
  515. protected:
  516. THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory};
  517. private:
  518. void InitNode(TNodeDataBase* node, size_t idx);
  519. struct TDispatchContext {
  520. const TDispatchOptions* Options;
  521. TDispatchContext* PrevContext;
  522. TMap<const TDispatchOptions::TFinalEventCondition*, ui32> FinalEventFrequency;
  523. TSet<TEventMailboxId> FoundNonEmptyMailboxes;
  524. bool FinalEventFound = false;
  525. };
  526. TProgramShouldContinue ShouldContinue;
  527. TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes;
  528. ui64 CurrentTimestamp;
  529. TSet<TActorId> EdgeActors;
  530. THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox;
  531. TDuration DispatchTimeout;
  532. TDuration ReschedulingDelay;
  533. TEventObserver ObserverFunc;
  534. TScheduledEventsSelector ScheduledEventsSelectorFunc;
  535. TEventFilter EventFilterFunc;
  536. TScheduledEventFilter ScheduledEventFilterFunc;
  537. TRegistrationObserver RegistrationObserver;
  538. TSet<TActorId> BlockedOutput;
  539. TSet<TActorId> ScheduleWhiteList;
  540. THashMap<TActorId, TActorId> ScheduleWhiteListParent;
  541. THashMap<TActorId, TString> ActorNames;
  542. TDispatchContext* CurrentDispatchContext;
  543. TVector<ui64> TxAllocatorTabletIds;
  544. static ui32 NextNodeId;
  545. };
  546. template <typename TEvent>
  547. TEvent* FindEvent(TEventsList& events) {
  548. for (auto& event : events) {
  549. if (event && event->GetTypeRewrite() == TEvent::EventType) {
  550. return static_cast<TEvent*>(event->GetBase());
  551. }
  552. }
  553. return nullptr;
  554. }
  555. template <typename TEvent>
  556. TEvent* FindEvent(TEventsList& events, const std::function<bool(const TEvent&)>& predicate) {
  557. for (auto& event : events) {
  558. if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*static_cast<TEvent*>(event->GetBase()))) {
  559. return static_cast<TEvent*>(event->GetBase());
  560. }
  561. }
  562. return nullptr;
  563. }
  564. template <typename TEvent>
  565. TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev) {
  566. ev.Destroy();
  567. for (auto& event : events) {
  568. if (event && event->GetTypeRewrite() == TEvent::EventType) {
  569. ev = event;
  570. return static_cast<TEvent*>(ev->GetBase());
  571. }
  572. }
  573. return nullptr;
  574. }
  575. template <typename TEvent>
  576. TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev,
  577. const std::function<bool(const typename TEvent::TPtr&)>& predicate) {
  578. ev.Destroy();
  579. for (auto& event : events) {
  580. if (event && event->GetTypeRewrite() == TEvent::EventType) {
  581. if (predicate(reinterpret_cast<const typename TEvent::TPtr&>(event))) {
  582. ev = event;
  583. return static_cast<TEvent*>(ev->GetBase());
  584. }
  585. }
  586. }
  587. return nullptr;
  588. }
  589. class IStrandingDecoratorFactory {
  590. public:
  591. virtual ~IStrandingDecoratorFactory() {}
  592. virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0;
  593. };
  594. struct IReplyChecker {
  595. virtual ~IReplyChecker() {}
  596. virtual void OnRequest(IEventHandle *request) = 0;
  597. virtual bool IsWaitingForMoreResponses(IEventHandle *response) = 0;
  598. };
  599. struct TNoneReplyChecker : IReplyChecker {
  600. void OnRequest(IEventHandle*) override {
  601. }
  602. bool IsWaitingForMoreResponses(IEventHandle*) override {
  603. return false;
  604. }
  605. };
  606. using TReplyCheckerCreator = std::function<THolder<IReplyChecker>(void)>;
  607. inline THolder<IReplyChecker> CreateNoneReplyChecker() {
  608. return MakeHolder<TNoneReplyChecker>();
  609. }
  610. TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
  611. TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker);
  612. extern ui64 DefaultRandomSeed;
  613. }