// test_runtime.h
#pragma once

#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/executor_thread.h>
#include <library/cpp/actors/core/mailbox.h>
#include <library/cpp/actors/core/monotonic_provider.h>
#include <library/cpp/actors/util/should_continue.h>
#include <library/cpp/actors/interconnect/poller_tcp.h>
#include <library/cpp/actors/interconnect/mock/ic_mock.h>
#include <library/cpp/random_provider/random_provider.h>
#include <library/cpp/time_provider/time_provider.h>
#include <library/cpp/testing/unittest/tests_data.h>

#include <util/datetime/base.h>
#include <util/folder/tempdir.h>
#include <util/generic/deque.h>
#include <util/generic/hash.h>
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
#include <util/generic/queue.h>
#include <util/generic/set.h>
#include <util/generic/vector.h>
#include <util/system/defaults.h>
#include <util/system/mutex.h>
#include <util/system/condvar.h>
#include <util/system/thread.h>
#include <util/system/sanitizers.h>
#include <util/system/valgrind.h>

#include <algorithm>
#include <functional>
#include <iterator>
#include <tuple>
#include <utility>
  32. const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer(
  33. NValgrind::PlainOrUnderValgrind(TDuration::Seconds(60), TDuration::Seconds(120)),
  34. TDuration::Seconds(120)
  35. );
  36. namespace NActors {
  // Empty tag type; TTestActorRuntimeBase has a constructor overload taking it.
  struct THeSingleSystemEnv {
  };
  38. struct TTestActorSetupCmd { // like TActorSetupCmd, but not owning the Actor
  39. TTestActorSetupCmd(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId)
  40. : MailboxType(mailboxType)
  41. , PoolId(poolId)
  42. , Actor(actor)
  43. {
  44. }
  45. TTestActorSetupCmd(TActorSetupCmd cmd)
  46. : MailboxType(cmd.MailboxType)
  47. , PoolId(cmd.PoolId)
  48. , Actor(cmd.Actor.release())
  49. {
  50. }
  51. TMailboxType::EType MailboxType;
  52. ui32 PoolId;
  53. IActor* Actor;
  54. };
  55. struct TEventMailboxId {
  56. TEventMailboxId()
  57. : NodeId(0)
  58. , Hint(0)
  59. {
  60. }
  61. TEventMailboxId(ui32 nodeId, ui32 hint)
  62. : NodeId(nodeId)
  63. , Hint(hint)
  64. {
  65. }
  66. bool operator<(const TEventMailboxId& other) const {
  67. return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint);
  68. }
  69. bool operator==(const TEventMailboxId& other) const {
  70. return (NodeId == other.NodeId) && (Hint == other.Hint);
  71. }
  72. struct THash {
  73. ui64 operator()(const TEventMailboxId& mboxId) const noexcept {
  74. return mboxId.NodeId * 31ULL + mboxId.Hint;
  75. }
  76. };
  77. ui32 NodeId;
  78. ui32 Hint;
  79. };
  80. struct TDispatchOptions {
  81. struct TFinalEventCondition {
  82. std::function<bool(IEventHandle& ev)> EventCheck;
  83. ui32 RequiredCount;
  84. TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1)
  85. : EventCheck([eventType](IEventHandle& ev) -> bool { return ev.GetTypeRewrite() == eventType; })
  86. , RequiredCount(requiredCount)
  87. {
  88. }
  89. TFinalEventCondition(std::function<bool(IEventHandle& ev)> eventCheck, ui32 requiredCount = 1)
  90. : EventCheck(eventCheck)
  91. , RequiredCount(requiredCount)
  92. {
  93. }
  94. };
  95. TVector<TFinalEventCondition> FinalEvents;
  96. TVector<TEventMailboxId> NonEmptyMailboxes;
  97. TVector<TEventMailboxId> OnlyMailboxes;
  98. std::function<bool()> CustomFinalCondition;
  99. bool Quiet = false;
  100. };
  101. struct TScheduledEventQueueItem {
  102. TInstant Deadline;
  103. TAutoPtr<IEventHandle> Event;
  104. TAutoPtr<TSchedulerCookieHolder> Cookie;
  105. ui64 UniqueId;
  106. TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie)
  107. : Deadline(deadline)
  108. , Event(event)
  109. , Cookie(new TSchedulerCookieHolder(cookie))
  110. , UniqueId(++NextUniqueId)
  111. {}
  112. bool operator<(const TScheduledEventQueueItem& other) const {
  113. if (Deadline < other.Deadline)
  114. return true;
  115. if (Deadline > other.Deadline)
  116. return false;
  117. return UniqueId < other.UniqueId;
  118. }
  119. static ui64 NextUniqueId;
  120. };
  121. typedef TDeque<TAutoPtr<IEventHandle>> TEventsList;
  122. typedef TSet<TScheduledEventQueueItem> TScheduledEventsList;
  123. class TEventMailBox : public TThrRefBase {
  124. public:
  125. TEventMailBox()
  126. : InactiveUntil(TInstant::MicroSeconds(0))
  127. #ifdef DEBUG_ORDER_EVENTS
  128. , ExpectedReceive(0)
  129. , NextToSend(0)
  130. #endif
  131. {
  132. }
  133. void Send(TAutoPtr<IEventHandle> ev);
  134. bool IsEmpty() const;
  135. TAutoPtr<IEventHandle> Pop();
  136. void Capture(TEventsList& evList);
  137. void PushFront(TAutoPtr<IEventHandle>& ev);
  138. void PushFront(TEventsList& evList);
  139. void CaptureScheduled(TScheduledEventsList& evList);
  140. void PushScheduled(TScheduledEventsList& evList);
  141. bool IsActive(const TInstant& currentTime) const;
  142. void Freeze(const TInstant& deadline);
  143. TInstant GetInactiveUntil() const;
  144. void Schedule(const TScheduledEventQueueItem& item);
  145. bool IsScheduledEmpty() const;
  146. TInstant GetFirstScheduleDeadline() const;
  147. ui64 GetSentEventCount() const;
  148. private:
  149. TScheduledEventsList Scheduled;
  150. TInstant InactiveUntil;
  151. TEventsList Sent;
  152. #ifdef DEBUG_ORDER_EVENTS
  153. TMap<IEventHandle*, ui64> TrackSent;
  154. ui64 ExpectedReceive;
  155. ui64 NextToSend;
  156. #endif
  157. };
  158. typedef THashMap<TEventMailboxId, TIntrusivePtr<TEventMailBox>, TEventMailboxId::THash> TEventMailBoxList;
  159. class TEmptyEventQueueException : public yexception {
  160. public:
  161. TEmptyEventQueueException() {
  162. Append("Event queue is still empty.");
  163. }
  164. };
  165. class TSchedulingLimitReachedException : public yexception {
  166. public:
  167. TSchedulingLimitReachedException(ui64 limit) {
  168. TStringStream str;
  169. str << "TestActorRuntime Processed over " << limit << " events.";
  170. Append(str.Str());
  171. }
  172. };
  173. class TTestActorRuntimeBase: public TNonCopyable {
  174. public:
  175. class TEdgeActor;
  176. class TSchedulerThreadStub;
  177. class TExecutorPoolStub;
  178. class TTimeProvider;
  179. class TMonotonicTimeProvider;
  180. enum class EEventAction {
  181. PROCESS,
  182. DROP,
  183. RESCHEDULE
  184. };
  185. typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver;
  186. typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector;
  187. typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter;
  188. typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter;
  189. typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver;
  190. TTestActorRuntimeBase(THeSingleSystemEnv);
  191. TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads);
  192. TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount);
  193. TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false);
  194. virtual ~TTestActorRuntimeBase();
  195. bool IsRealThreads() const;
  196. static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
  197. static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
  198. static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
  199. static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
  200. static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline);
  201. static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId);
  202. TEventObserver SetObserverFunc(TEventObserver observerFunc);
  203. TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc);
  204. TEventFilter SetEventFilter(TEventFilter filterFunc);
  205. TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc);
  206. TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc);
  207. static bool IsVerbose();
  208. static void SetVerbose(bool verbose);
  209. TDuration SetDispatchTimeout(TDuration timeout);
  210. void SetDispatchedEventsLimit(ui64 limit) {
  211. DispatchedEventsLimit = limit;
  212. }
  213. TDuration SetReschedulingDelay(TDuration delay);
  214. void SetLogBackend(const TAutoPtr<TLogBackend> logBackend);
  215. void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority);
  216. TIntrusivePtr<ITimeProvider> GetTimeProvider();
  217. TIntrusivePtr<IMonotonicTimeProvider> GetMonotonicTimeProvider();
  218. TInstant GetCurrentTime() const;
  219. TMonotonic GetCurrentMonotonicTime() const;
  220. void UpdateCurrentTime(TInstant newTime);
  221. void AdvanceCurrentTime(TDuration duration);
  222. void AddLocalService(const TActorId& actorId, TActorSetupCmd cmd, ui32 nodeIndex = 0);
  223. virtual void Initialize();
  224. ui32 GetNodeId(ui32 index = 0) const;
  225. ui32 GetNodeCount() const;
  226. ui64 AllocateLocalId();
  227. ui32 InterconnectPoolId() const;
  228. TString GetTempDir();
  229. TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0,
  230. TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0,
  231. const TActorId& parentid = TActorId());
  232. TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
  233. const TActorId& parentid = TActorId());
  234. TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0);
  235. TActorId AllocateEdgeActor(ui32 nodeIndex = 0);
  236. TEventsList CaptureEvents();
  237. TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId);
  238. TScheduledEventsList CaptureScheduledEvents();
  239. void PushFront(TAutoPtr<IEventHandle>& ev);
  240. void PushEventsFront(TEventsList& events);
  241. void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events);
  242. // doesn't dispatch events for edge actors
  243. bool DispatchEvents(const TDispatchOptions& options = TDispatchOptions());
  244. bool DispatchEvents(const TDispatchOptions& options, TDuration simTimeout);
  245. bool DispatchEvents(const TDispatchOptions& options, TInstant simDeadline);
  246. void Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventBase> ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false);
  247. void Send(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false);
  248. void SendAsync(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex = 0);
  249. void Schedule(TAutoPtr<IEventHandle> ev, const TDuration& duration, ui32 nodeIndex = 0);
  250. void ClearCounters();
  251. ui64 GetCounter(ui32 evType) const;
  252. TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0);
  253. void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max());
  254. TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo);
  255. void BlockOutputForActor(const TActorId& actorId);
  256. IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
  257. void EnableScheduleForActor(const TActorId& actorId, bool allow = true);
  258. bool IsScheduleForActorEnabled(const TActorId& actorId) const;
  259. TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
  260. void SetupMonitoring();
  261. template<typename T>
  262. void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) {
  263. Y_VERIFY(!IsInitialized);
  264. for (const auto& pair : Nodes) {
  265. pair.second->LogSettings->Append(minVal, maxVal, func);
  266. }
  267. }
  268. TIntrusivePtr<NLog::TSettings> GetLogSettings(ui32 nodeIdx)
  269. {
  270. return Nodes[FirstNodeId + nodeIdx]->LogSettings;
  271. }
  272. TActorSystem* SingleSys() const;
  273. TActorSystem* GetAnyNodeActorSystem();
  274. TActorSystem* GetActorSystem(ui32 nodeId);
  275. template <typename TEvent>
  276. TEvent* GrabEdgeEventIf(TAutoPtr<IEventHandle>& handle, std::function<bool(const TEvent&)> predicate, TDuration simTimeout = TDuration::Max()) {
  277. handle.Destroy();
  278. const ui32 eventType = TEvent::EventType;
  279. WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
  280. Y_UNUSED(runtime);
  281. if (event->GetTypeRewrite() != eventType)
  282. return false;
  283. TEvent* typedEvent = event->Get<TEvent>();
  284. if (predicate(*typedEvent)) {
  285. handle = event;
  286. return true;
  287. }
  288. return false;
  289. }, {}, simTimeout);
  290. if (simTimeout == TDuration::Max())
  291. Y_VERIFY(handle);
  292. if (handle) {
  293. return handle->Get<TEvent>();
  294. } else {
  295. return nullptr;
  296. }
  297. }
  298. template<class TEvent>
  299. typename TEvent::TPtr GrabEdgeEventIf(
  300. const TSet<TActorId>& edgeFilter,
  301. const std::function<bool(const typename TEvent::TPtr&)>& predicate,
  302. TDuration simTimeout = TDuration::Max())
  303. {
  304. typename TEvent::TPtr handle;
  305. const ui32 eventType = TEvent::EventType;
  306. WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
  307. Y_UNUSED(runtime);
  308. if (event->GetTypeRewrite() != eventType)
  309. return false;
  310. typename TEvent::TPtr* typedEvent = reinterpret_cast<typename TEvent::TPtr*>(&event);
  311. if (predicate(*typedEvent)) {
  312. handle = *typedEvent;
  313. return true;
  314. }
  315. return false;
  316. }, edgeFilter, simTimeout);
  317. if (simTimeout == TDuration::Max())
  318. Y_VERIFY(handle);
  319. return handle;
  320. }
  321. template<class TEvent>
  322. typename TEvent::TPtr GrabEdgeEventIf(
  323. const TActorId& edgeActor,
  324. const std::function<bool(const typename TEvent::TPtr&)>& predicate,
  325. TDuration simTimeout = TDuration::Max())
  326. {
  327. TSet<TActorId> edgeFilter{edgeActor};
  328. return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout);
  329. }
  330. template <typename TEvent>
  331. TEvent* GrabEdgeEvent(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
  332. std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; };
  333. return GrabEdgeEventIf(handle, truth, simTimeout);
  334. }
  335. template <typename TEvent>
  336. THolder<TEvent> GrabEdgeEvent(TDuration simTimeout = TDuration::Max()) {
  337. TAutoPtr<IEventHandle> handle;
  338. std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; };
  339. GrabEdgeEventIf(handle, truth, simTimeout);
  340. if (handle) {
  341. return THolder<TEvent>(handle->Release<TEvent>());
  342. }
  343. return {};
  344. }
  345. template<class TEvent>
  346. typename TEvent::TPtr GrabEdgeEvent(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) {
  347. return GrabEdgeEventIf<TEvent>(edgeFilter, [](const typename TEvent::TPtr&) { return true; }, simTimeout);
  348. }
  349. template<class TEvent>
  350. typename TEvent::TPtr GrabEdgeEvent(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) {
  351. TSet<TActorId> edgeFilter{edgeActor};
  352. return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout);
  353. }
  354. // replace with std::variant<>
  355. template <typename... TEvents>
  356. std::tuple<TEvents*...> GrabEdgeEvents(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
  357. handle.Destroy();
  358. auto eventTypes = { TEvents::EventType... };
  359. WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
  360. if (std::find(std::begin(eventTypes), std::end(eventTypes), event->GetTypeRewrite()) == std::end(eventTypes))
  361. return false;
  362. handle = event;
  363. return true;
  364. }, {}, simTimeout);
  365. if (simTimeout == TDuration::Max())
  366. Y_VERIFY(handle);
  367. if (handle) {
  368. return std::make_tuple(handle->Type == TEvents::EventType
  369. ? handle->Get<TEvents>()
  370. : static_cast<TEvents*>(nullptr)...);
  371. }
  372. return {};
  373. }
  374. template <typename TEvent>
  375. TEvent* GrabEdgeEventRethrow(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
  376. try {
  377. return GrabEdgeEvent<TEvent>(handle, simTimeout);
  378. } catch (...) {
  379. ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage();
  380. }
  381. }
  382. template<class TEvent>
  383. typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) {
  384. try {
  385. return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout);
  386. } catch (...) {
  387. ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage();
  388. }
  389. }
  390. template<class TEvent>
  391. typename TEvent::TPtr GrabEdgeEventRethrow(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) {
  392. try {
  393. return GrabEdgeEvent<TEvent>(edgeActor, simTimeout);
  394. } catch (...) {
  395. ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage();
  396. }
  397. }
  398. template <typename... TEvents>
  399. static TString TypeNames() {
  400. static TString names[] = { TypeName<TEvents>()... };
  401. TString result;
  402. for (const TString& s : names) {
  403. if (result.empty()) {
  404. result += '<';
  405. } else {
  406. result += ',';
  407. }
  408. result += s;
  409. }
  410. if (!result.empty()) {
  411. result += '>';
  412. }
  413. return result;
  414. }
  415. template <typename... TEvents>
  416. std::tuple<TEvents*...> GrabEdgeEventsRethrow(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
  417. try {
  418. return GrabEdgeEvents<TEvents...>(handle, simTimeout);
  419. } catch (...) {
  420. ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeNames<TEvents...>() << ": " << CurrentExceptionMessage();
  421. }
  422. }
  423. void ResetScheduledCount() {
  424. ScheduledCount = 0;
  425. }
  426. void SetScheduledLimit(ui64 limit) {
  427. ScheduledLimit = limit;
  428. }
  429. void SetDispatcherRandomSeed(TInstant time, ui64 iteration);
  430. TString GetActorName(const TActorId& actorId) const;
  431. const TVector<ui64>& GetTxAllocatorTabletIds() const { return TxAllocatorTabletIds; }
  432. void SetTxAllocatorTabletIds(const TVector<ui64>& ids) { TxAllocatorTabletIds = ids; }
  433. void SetUseRealInterconnect() {
  434. UseRealInterconnect = true;
  435. }
  436. void SetICCommonSetupper(std::function<void(ui32, TIntrusivePtr<TInterconnectProxyCommon>)>&& icCommonSetupper) {
  437. ICCommonSetupper = std::move(icCommonSetupper);
  438. }
  439. protected:
  440. struct TNodeDataBase;
  441. TNodeDataBase* GetRawNode(ui32 node) const {
  442. return Nodes.at(FirstNodeId + node).Get();
  443. }
  444. static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId);
  445. virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) {
  446. Y_UNUSED(counters);
  447. Y_UNUSED(component);
  448. // do nothing, just return the existing counters
  449. return counters;
  450. }
  451. THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node);
  452. THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node);
  453. virtual void InitActorSystemSetup(TActorSystemSetup& setup) {
  454. Y_UNUSED(setup);
  455. }
  456. private:
  457. IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const;
  458. void SendInternal(TAutoPtr<IEventHandle> ev, ui32 nodeIndex, bool viaActorSystem);
  459. TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint);
  460. void ClearMailbox(ui32 nodeId, ui32 hint);
  461. void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId);
  462. void UpdateFinalEventsStatsForEachContext(IEventHandle& ev);
  463. bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline);
  464. private:
  465. ui64 ScheduledCount;
  466. ui64 ScheduledLimit;
  467. THolder<TTempDir> TmpDir;
  468. const TThread::TId MainThreadId;
  469. protected:
  470. bool UseRealInterconnect = false;
  471. TInterconnectMock InterconnectMock;
  472. bool IsInitialized = false;
  473. bool SingleSysEnv = false;
  474. const TString ClusterUUID;
  475. const ui32 FirstNodeId;
  476. const ui32 NodeCount;
  477. const ui32 DataCenterCount;
  478. const bool UseRealThreads;
  479. std::function<void(ui32, TIntrusivePtr<TInterconnectProxyCommon>)> ICCommonSetupper;
  480. ui64 LocalId;
  481. TMutex Mutex;
  482. TCondVar MailboxesHasEvents;
  483. TEventMailBoxList Mailboxes;
  484. TMap<ui32, ui64> EvCounters;
  485. ui64 DispatchCyclesCount;
  486. ui64 DispatchedEventsCount;
  487. ui64 DispatchedEventsLimit = 2'500'000;
  488. TActorId CurrentRecipient;
  489. ui64 DispatcherRandomSeed;
  490. TIntrusivePtr<IRandomProvider> DispatcherRandomProvider;
  491. TAutoPtr<TLogBackend> LogBackend;
  492. bool NeedMonitoring;
  493. TIntrusivePtr<IRandomProvider> RandomProvider;
  494. TIntrusivePtr<ITimeProvider> TimeProvider;
  495. TIntrusivePtr<IMonotonicTimeProvider> MonotonicTimeProvider;
  496. protected:
  497. struct TNodeDataBase: public TThrRefBase {
  498. TNodeDataBase();
  499. void Stop();
  500. virtual ~TNodeDataBase();
  501. virtual ui64 GetLoggerPoolId() const {
  502. return 0;
  503. }
  504. template <typename T = void>
  505. T* GetAppData() {
  506. return static_cast<T*>(AppData0.get());
  507. }
  508. template <typename T = void>
  509. const T* GetAppData() const {
  510. return static_cast<T*>(AppData0.get());
  511. }
  512. TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
  513. TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
  514. TIntrusivePtr<NInterconnect::TPollerThreads> Poller;
  515. volatile ui64* ActorSystemTimestamp;
  516. volatile ui64* ActorSystemMonotonic;
  517. TVector<std::pair<TActorId, TTestActorSetupCmd>> LocalServices;
  518. TMap<TActorId, IActor*> LocalServicesActors;
  519. TMap<IActor*, TActorId> ActorToActorId;
  520. THolder<TMailboxTable> MailboxTable;
  521. std::shared_ptr<void> AppData0;
  522. THolder<TActorSystem> ActorSystem;
  523. THolder<IExecutorPool> SchedulerPool;
  524. TVector<IExecutorPool*> ExecutorPools;
  525. THolder<TExecutorThread> ExecutorThread;
  526. };
  527. struct INodeFactory {
  528. virtual ~INodeFactory() = default;
  529. virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0;
  530. };
  531. struct TDefaultNodeFactory final: INodeFactory {
  532. virtual TIntrusivePtr<TNodeDataBase> CreateNode() override {
  533. return new TNodeDataBase();
  534. }
  535. };
  536. INodeFactory& GetNodeFactory() {
  537. return *NodeFactory;
  538. }
  539. virtual TNodeDataBase* GetNodeById(size_t idx) {
  540. return Nodes[idx].Get();
  541. }
  542. void InitNodes();
  543. void CleanupNodes();
  544. virtual void InitNodeImpl(TNodeDataBase*, size_t);
  545. static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev);
  546. protected:
  547. THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory};
  548. private:
  549. void InitNode(TNodeDataBase* node, size_t idx);
  550. struct TDispatchContext {
  551. const TDispatchOptions* Options;
  552. TDispatchContext* PrevContext;
  553. TMap<const TDispatchOptions::TFinalEventCondition*, ui32> FinalEventFrequency;
  554. TSet<TEventMailboxId> FoundNonEmptyMailboxes;
  555. bool FinalEventFound = false;
  556. };
  557. TProgramShouldContinue ShouldContinue;
  558. TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes;
  559. ui64 CurrentTimestamp;
  560. TSet<TActorId> EdgeActors;
  561. THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox;
  562. TDuration DispatchTimeout;
  563. TDuration ReschedulingDelay;
  564. TEventObserver ObserverFunc;
  565. TScheduledEventsSelector ScheduledEventsSelectorFunc;
  566. TEventFilter EventFilterFunc;
  567. TScheduledEventFilter ScheduledEventFilterFunc;
  568. TRegistrationObserver RegistrationObserver;
  569. TSet<TActorId> BlockedOutput;
  570. TSet<TActorId> ScheduleWhiteList;
  571. THashMap<TActorId, TActorId> ScheduleWhiteListParent;
  572. THashMap<TActorId, TString> ActorNames;
  573. TDispatchContext* CurrentDispatchContext;
  574. TVector<ui64> TxAllocatorTabletIds;
  575. static ui32 NextNodeId;
  576. };
  577. template <typename TEvent>
  578. TEvent* FindEvent(TEventsList& events) {
  579. for (auto& event : events) {
  580. if (event && event->GetTypeRewrite() == TEvent::EventType) {
  581. return event->CastAsLocal<TEvent>();
  582. }
  583. }
  584. return nullptr;
  585. }
  586. template <typename TEvent>
  587. TEvent* FindEvent(TEventsList& events, const std::function<bool(const TEvent&)>& predicate) {
  588. for (auto& event : events) {
  589. if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*event->CastAsLocal<TEvent>())) {
  590. return event->CastAsLocal<TEvent>();
  591. }
  592. }
  593. return nullptr;
  594. }
  595. template <typename TEvent>
  596. TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev) {
  597. ev.Destroy();
  598. for (auto& event : events) {
  599. if (event && event->GetTypeRewrite() == TEvent::EventType) {
  600. ev = event;
  601. return ev->CastAsLocal<TEvent>();
  602. }
  603. }
  604. return nullptr;
  605. }
  606. template <typename TEvent>
  607. TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev,
  608. const std::function<bool(const typename TEvent::TPtr&)>& predicate) {
  609. ev.Destroy();
  610. for (auto& event : events) {
  611. if (event && event->GetTypeRewrite() == TEvent::EventType) {
  612. if (predicate(reinterpret_cast<const typename TEvent::TPtr&>(event))) {
  613. ev = event;
  614. return ev->CastAsLocal<TEvent>();
  615. }
  616. }
  617. }
  618. return nullptr;
  619. }
  620. class IStrandingDecoratorFactory {
  621. public:
  622. virtual ~IStrandingDecoratorFactory() {}
  623. virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0;
  624. };
  625. struct IReplyChecker {
  626. virtual ~IReplyChecker() {}
  627. virtual void OnRequest(IEventHandle *request) = 0;
  628. virtual bool IsWaitingForMoreResponses(IEventHandle *response) = 0;
  629. };
  630. struct TNoneReplyChecker : IReplyChecker {
  631. void OnRequest(IEventHandle*) override {
  632. }
  633. bool IsWaitingForMoreResponses(IEventHandle*) override {
  634. return false;
  635. }
  636. };
  637. using TReplyCheckerCreator = std::function<THolder<IReplyChecker>(void)>;
  638. inline THolder<IReplyChecker> CreateNoneReplyChecker() {
  639. return MakeHolder<TNoneReplyChecker>();
  640. }
  641. TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
  642. TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker);
  643. extern ui64 DefaultRandomSeed;
  644. }