actor.h 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951
  1. #pragma once
  2. #include "actorsystem.h"
  3. #include "event.h"
  4. #include "executor_thread.h"
  5. #include "monotonic.h"
  6. #include "thread_context.h"
  7. #include <library/cpp/actors/actor_type/indexes.h>
  8. #include <library/cpp/actors/util/local_process_key.h>
  9. #include <util/system/tls.h>
  10. #include <util/generic/noncopyable.h>
  11. namespace NActors {
  12. class TActorSystem;
  13. class TMailboxTable;
  14. struct TMailboxHeader;
  15. class TExecutorThread;
  16. class IActor;
  17. class ISchedulerCookie;
  18. class IExecutorPool;
  19. namespace NLog {
  20. struct TSettings;
  21. }
  22. struct TActorContext;
  23. struct TActivationContext;
  24. extern Y_POD_THREAD(TActivationContext*) TlsActivationContext;
  25. struct TActivationContext {
  26. public:
  27. TMailboxHeader& Mailbox;
  28. TExecutorThread& ExecutorThread;
  29. const NHPTimer::STime EventStart;
  30. protected:
  31. explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart)
  32. : Mailbox(mailbox)
  33. , ExecutorThread(executorThread)
  34. , EventStart(eventStart)
  35. {
  36. }
  37. public:
  38. template <ESendingType SendingType = ESendingType::Common>
  39. static bool Send(TAutoPtr<IEventHandle> ev);
  40. template <ESendingType SendingType = ESendingType::Common>
  41. static bool Send(std::unique_ptr<IEventHandle> &&ev);
  42. template <ESendingType SendingType = ESendingType::Common>
  43. static bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient);
  44. template <ESendingType SendingType = ESendingType::Common>
  45. static bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient);
  46. /**
  47. * Schedule one-shot event that will be send at given time point in the future.
  48. *
  49. * @param deadline the wallclock time point in future when event must be send
  50. * @param ev the event to send
  51. * @param cookie cookie that will be piggybacked with event
  52. */
  53. static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
  54. static void Schedule(TInstant deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) {
  55. return Schedule(deadline, TAutoPtr<IEventHandle>(ev.release()), cookie);
  56. }
  57. /**
  58. * Schedule one-shot event that will be send at given time point in the future.
  59. *
  60. * @param deadline the monotonic time point in future when event must be send
  61. * @param ev the event to send
  62. * @param cookie cookie that will be piggybacked with event
  63. */
  64. static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
  65. static void Schedule(TMonotonic deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) {
  66. return Schedule(deadline, TAutoPtr<IEventHandle>(ev.release()), cookie);
  67. }
  68. /**
  69. * Schedule one-shot event that will be send after given delay.
  70. *
  71. * @param delta the time from now to delay event sending
  72. * @param ev the event to send
  73. * @param cookie cookie that will be piggybacked with event
  74. */
  75. static void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
  76. static void Schedule(TDuration delta, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) {
  77. return Schedule(delta, TAutoPtr<IEventHandle>(ev.release()), cookie);
  78. }
  79. static TInstant Now();
  80. static TMonotonic Monotonic();
  81. NLog::TSettings* LoggerSettings() const;
  82. // register new actor in ActorSystem on new fresh mailbox.
  83. template <ESendingType SendingType = ESendingType::Common>
  84. static TActorId Register(IActor* actor, TActorId parentId = TActorId(), TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>());
  85. // Register new actor in ActorSystem on same _mailbox_ as current actor.
  86. // There is one thread per mailbox to execute actor, which mean
  87. // no _cpu core scalability_ for such actors.
  88. // This method of registration can be usefull if multiple actors share
  89. // some memory.
  90. static TActorId RegisterWithSameMailbox(IActor* actor, TActorId parentId);
  91. static const TActorContext& AsActorContext();
  92. static TActorContext ActorContextFor(TActorId id);
  93. static TActorId InterconnectProxy(ui32 nodeid);
  94. static TActorSystem* ActorSystem();
  95. static i64 GetCurrentEventTicks();
  96. static double GetCurrentEventTicksAsSeconds();
  97. };
  98. struct TActorContext: public TActivationContext {
  99. const TActorId SelfID;
  100. using TEventFlags = IEventHandle::TEventFlags;
  101. explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
  102. : TActivationContext(mailbox, executorThread, eventStart)
  103. , SelfID(selfID)
  104. {
  105. }
  106. template <ESendingType SendingType = ESendingType::Common>
  107. bool Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  108. template <ESendingType SendingType = ESendingType::Common>
  109. bool Send(const TActorId& recipient, THolder<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  110. return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId));
  111. }
  112. template <ESendingType SendingType = ESendingType::Common>
  113. bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  114. return Send<SendingType>(recipient, ev.release(), flags, cookie, std::move(traceId));
  115. }
  116. template <ESendingType SendingType = ESendingType::Common>
  117. bool Send(TAutoPtr<IEventHandle> ev) const;
  118. template <ESendingType SendingType = ESendingType::Common>
  119. bool Send(std::unique_ptr<IEventHandle> &&ev) const {
  120. return Send<SendingType>(TAutoPtr<IEventHandle>(ev.release()));
  121. }
  122. template <ESendingType SendingType = ESendingType::Common>
  123. bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const;
  124. template <ESendingType SendingType = ESendingType::Common>
  125. bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const;
  126. TInstant Now() const;
  127. TMonotonic Monotonic() const;
  128. /**
  129. * Schedule one-shot event that will be send at given time point in the future.
  130. *
  131. * @param deadline the wallclock time point in future when event must be send
  132. * @param ev the event to send
  133. * @param cookie cookie that will be piggybacked with event
  134. */
  135. void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  136. /**
  137. * Schedule one-shot event that will be send at given time point in the future.
  138. *
  139. * @param deadline the monotonic time point in future when event must be send
  140. * @param ev the event to send
  141. * @param cookie cookie that will be piggybacked with event
  142. */
  143. void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  144. /**
  145. * Schedule one-shot event that will be send after given delay.
  146. *
  147. * @param delta the time from now to delay event sending
  148. * @param ev the event to send
  149. * @param cookie cookie that will be piggybacked with event
  150. */
  151. void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  152. TActorContext MakeFor(const TActorId& otherId) const {
  153. return TActorContext(Mailbox, ExecutorThread, EventStart, otherId);
  154. }
  155. // register new actor in ActorSystem on new fresh mailbox.
  156. template <ESendingType SendingType = ESendingType::Common>
  157. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const;
  158. // Register new actor in ActorSystem on same _mailbox_ as current actor.
  159. // There is one thread per mailbox to execute actor, which mean
  160. // no _cpu core scalability_ for such actors.
  161. // This method of registration can be usefull if multiple actors share
  162. // some memory.
  163. TActorId RegisterWithSameMailbox(IActor* actor) const;
  164. std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
  165. };
  166. struct TActorIdentity: public TActorId {
  167. using TEventFlags = IEventHandle::TEventFlags;
  168. explicit TActorIdentity(TActorId actorId)
  169. : TActorId(actorId)
  170. {
  171. }
  172. void operator=(TActorId actorId) {
  173. *this = TActorIdentity(actorId);
  174. }
  175. template <ESendingType SendingType = ESendingType::Common>
  176. bool Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  177. bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  178. void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  179. void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  180. void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  181. };
  182. class IActor;
  183. class IActorOps : TNonCopyable {
  184. public:
  185. virtual void Describe(IOutputStream&) const noexcept = 0;
  186. virtual bool Send(const TActorId& recipient, IEventBase*, IEventHandle::TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0;
  187. /**
  188. * Schedule one-shot event that will be send at given time point in the future.
  189. *
  190. * @param deadline the wallclock time point in future when event must be send
  191. * @param ev the event to send
  192. * @param cookie cookie that will be piggybacked with event
  193. */
  194. virtual void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
  195. /**
  196. * Schedule one-shot event that will be send at given time point in the future.
  197. *
  198. * @param deadline the monotonic time point in future when event must be send
  199. * @param ev the event to send
  200. * @param cookie cookie that will be piggybacked with event
  201. */
  202. virtual void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
  203. /**
  204. * Schedule one-shot event that will be send after given delay.
  205. *
  206. * @param delta the time from now to delay event sending
  207. * @param ev the event to send
  208. * @param cookie cookie that will be piggybacked with event
  209. */
  210. virtual void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
  211. virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0;
  212. virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0;
  213. };
  214. class TDecorator;
  215. class TActorVirtualBehaviour {
  216. public:
  217. static void Receive(IActor* actor, std::unique_ptr<IEventHandle> ev);
  218. public:
  219. };
  220. class TActorCallbackBehaviour {
  221. private:
  222. using TBase = IActor;
  223. friend class TDecorator;
  224. public:
  225. using TReceiveFunc = void (IActor::*)(TAutoPtr<IEventHandle>& ev);
  226. private:
  227. TReceiveFunc StateFunc = nullptr;
  228. public:
  229. TActorCallbackBehaviour() = default;
  230. TActorCallbackBehaviour(TReceiveFunc stateFunc)
  231. : StateFunc(stateFunc) {
  232. }
  233. bool Initialized() const {
  234. return !!StateFunc;
  235. }
  236. // NOTE: exceptions must not escape state function but if an exception hasn't be caught
  237. // by the actor then we want to crash an see the stack
  238. void Receive(IActor* actor, TAutoPtr<IEventHandle>& ev);
  239. template <typename T>
  240. void Become(T stateFunc) {
  241. StateFunc = static_cast<TReceiveFunc>(stateFunc);
  242. }
  243. template <typename T, typename... TArgs>
  244. void Become(T stateFunc, const TActorContext& ctx, TArgs&&... args) {
  245. StateFunc = static_cast<TReceiveFunc>(stateFunc);
  246. ctx.Schedule(std::forward<TArgs>(args)...);
  247. }
  248. TReceiveFunc CurrentStateFunc() const {
  249. return StateFunc;
  250. }
  251. };
  252. template<bool>
  253. struct TActorUsageImpl {
  254. void OnEnqueueEvent(ui64 /*time*/) {} // called asynchronously when event is put in the mailbox
  255. void OnDequeueEvent() {} // called when processed by Executor
  256. double GetUsage(ui64 /*time*/) { return 0; } // called from collector thread
  257. void DoActorInit() {}
  258. };
  259. template<>
  260. struct TActorUsageImpl<true> {
  261. static constexpr int TimestampBits = 40;
  262. static constexpr int CountBits = 24;
  263. static constexpr ui64 TimestampMask = ((ui64)1 << TimestampBits) - 1;
  264. static constexpr ui64 CountMask = ((ui64)1 << CountBits) - 1;
  265. std::atomic_uint64_t QueueSizeAndTimestamp = 0;
  266. std::atomic_uint64_t UsedTime = 0; // how much time did we consume since last GetUsage() call
  267. ui64 LastUsageTimestamp = 0; // when GetUsage() was called the last time
  268. void OnEnqueueEvent(ui64 time);
  269. void OnDequeueEvent();
  270. double GetUsage(ui64 time);
  271. void DoActorInit() { LastUsageTimestamp = GetCycleCountFast(); }
  272. };
  273. class IActor
  274. : protected IActorOps
  275. , public TActorUsageImpl<ActorLibCollectUsageStats>
  276. {
  277. private:
  278. TActorIdentity SelfActorId;
  279. i64 ElapsedTicks;
  280. friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
  281. friend class TDecorator;
  282. private: // stuck actor monitoring
  283. TMonotonic LastReceiveTimestamp;
  284. size_t StuckIndex = Max<size_t>();
  285. friend class TExecutorPoolBaseMailboxed;
  286. friend class TExecutorThread;
  287. IActor(const ui32 activityType)
  288. : SelfActorId(TActorId())
  289. , ElapsedTicks(0)
  290. , ActivityType(activityType)
  291. , HandledEvents(0) {
  292. }
  293. protected:
  294. TActorCallbackBehaviour CImpl;
  295. public:
  296. using TEventFlags = IEventHandle::TEventFlags;
  297. using TReceiveFunc = TActorCallbackBehaviour::TReceiveFunc;
  298. /// @sa services.proto NKikimrServices::TActivity::EType
  299. using EActorActivity = EInternalActorType;
  300. using EActivityType = EActorActivity;
  301. ui32 ActivityType;
  302. protected:
  303. ui64 HandledEvents;
  304. template <typename EEnum = EActivityType, typename std::enable_if<std::is_enum<EEnum>::value, bool>::type v = true>
  305. IActor(const EEnum activityEnumType = EActivityType::OTHER)
  306. : IActor(TEnumProcessKey<TActorActivityTag, EEnum>::GetIndex(activityEnumType)) {
  307. }
  308. IActor(TActorCallbackBehaviour&& cImpl, const ui32 activityType)
  309. : SelfActorId(TActorId())
  310. , ElapsedTicks(0)
  311. , CImpl(std::move(cImpl))
  312. , ActivityType(activityType)
  313. , HandledEvents(0)
  314. {
  315. }
  316. template <typename EEnum = EActivityType, typename std::enable_if<std::is_enum<EEnum>::value, bool>::type v = true>
  317. IActor(TActorCallbackBehaviour&& cImpl, const EEnum activityEnumType = EActivityType::OTHER)
  318. : IActor(std::move(cImpl), TEnumProcessKey<TActorActivityTag, EEnum>::GetIndex(activityEnumType)) {
  319. }
  320. public:
  321. template <class TEventBase>
  322. class TEventSenderFromActor: ::TNonCopyable {
  323. private:
  324. TEventFlags Flags = 0;
  325. ui64 Cookie = 0;
  326. const TActorIdentity SenderId;
  327. NWilson::TTraceId TraceId = {};
  328. std::unique_ptr<TEventBase> Event;
  329. public:
  330. template <class... Types>
  331. TEventSenderFromActor(const IActor* owner, Types&&... args)
  332. : SenderId(owner->SelfId())
  333. , Event(new TEventBase(std::forward<Types>(args)...)) {
  334. }
  335. TEventSenderFromActor& SetFlags(const TEventFlags flags) {
  336. Flags = flags;
  337. return *this;
  338. }
  339. TEventSenderFromActor& SetCookie(const ui64 value) {
  340. Cookie = value;
  341. return *this;
  342. }
  343. TEventSenderFromActor& SetTraceId(NWilson::TTraceId&& value) {
  344. TraceId = std::move(value);
  345. return *this;
  346. }
  347. bool SendTo(const TActorId& recipient) {
  348. return SenderId.Send(recipient, Event.release(), Flags, Cookie, std::move(TraceId));
  349. }
  350. };
  351. template <class TEvent, class... Types>
  352. TEventSenderFromActor<TEvent> Sender(Types&&... args) const {
  353. return TEventSenderFromActor<TEvent>(this, std::forward<Types>(args)...);
  354. }
  355. virtual ~IActor() {
  356. } // must not be called for registered actors, see Die method instead
  357. protected:
  358. virtual void Die(const TActorContext& ctx); // would unregister actor so call exactly once and only from inside of message processing
  359. virtual void PassAway();
  360. protected:
  361. void SetActivityType(ui32 activityType) {
  362. ActivityType = activityType;
  363. }
  364. public:
  365. class TPassAwayGuard: TMoveOnly {
  366. private:
  367. IActor* Owner = nullptr;
  368. public:
  369. TPassAwayGuard(TPassAwayGuard&& item) {
  370. Owner = item.Owner;
  371. item.Owner = nullptr;
  372. }
  373. TPassAwayGuard(IActor* owner)
  374. : Owner(owner)
  375. {
  376. }
  377. ~TPassAwayGuard() {
  378. if (Owner) {
  379. Owner->PassAway();
  380. }
  381. }
  382. };
  383. TPassAwayGuard PassAwayGuard() {
  384. return TPassAwayGuard(this);
  385. }
  386. // must be called to wrap any call trasitions from one actor to another
  387. template<typename TActor, typename TMethod, typename... TArgs>
  388. static std::invoke_result_t<TMethod, TActor, TArgs...> InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) {
  389. struct TRecurseContext : TActorContext {
  390. TActivationContext* const Prev;
  391. TRecurseContext(const TActorId& actorId)
  392. : TActorContext(TActivationContext::ActorContextFor(actorId))
  393. , Prev(TlsActivationContext)
  394. {
  395. TlsActivationContext = this;
  396. }
  397. ~TRecurseContext() {
  398. Y_VERIFY(TlsActivationContext == this, "TlsActivationContext mismatch; probably InvokeOtherActor was invoked from a coroutine");
  399. TlsActivationContext = Prev;
  400. }
  401. } context(actor.SelfId());
  402. return std::invoke(std::forward<TMethod>(method), actor, std::forward<TArgs>(args)...);
  403. }
  404. virtual void Registered(TActorSystem* sys, const TActorId& owner);
  405. virtual TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) {
  406. Y_UNUSED(self);
  407. Y_UNUSED(parentId);
  408. return TAutoPtr<IEventHandle>();
  409. }
  410. i64 GetElapsedTicks() const {
  411. return ElapsedTicks;
  412. }
  413. double GetElapsedTicksAsSeconds() const;
  414. void AddElapsedTicks(i64 ticks) {
  415. ElapsedTicks += ticks;
  416. }
  417. ui32 GetActivityType() const {
  418. return ActivityType;
  419. }
  420. ui64 GetHandledEvents() const {
  421. return HandledEvents;
  422. }
  423. TActorIdentity SelfId() const {
  424. return SelfActorId;
  425. }
  426. void Receive(TAutoPtr<IEventHandle>& ev) {
  427. ++HandledEvents;
  428. LastReceiveTimestamp = TActivationContext::Monotonic();
  429. if (CImpl.Initialized()) {
  430. CImpl.Receive(this, ev);
  431. } else {
  432. TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandle>(ev.Release()));
  433. }
  434. }
  435. TActorContext ActorContext() const {
  436. return TActivationContext::ActorContextFor(SelfId());
  437. }
  438. protected:
  439. void SetEnoughCpu(bool isEnough) {
  440. if (TlsThreadContext) {
  441. TlsThreadContext->IsEnoughCpu = isEnough;
  442. }
  443. }
  444. void Describe(IOutputStream&) const noexcept override;
  445. bool Send(TAutoPtr<IEventHandle> ev) const noexcept;
  446. bool Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
  447. bool Send(const TActorId& recipient, THolder<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
  448. return Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
  449. }
  450. bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  451. return Send(recipient, ev.release(), flags, cookie, std::move(traceId));
  452. }
  453. template <class TEvent, class ... TEventArgs>
  454. bool Send(TActorId recipient, TEventArgs&& ... args) const {
  455. return Send(recipient, MakeHolder<TEvent>(std::forward<TEventArgs>(args)...));
  456. }
  457. template <ESendingType SendingType>
  458. bool Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  459. template <ESendingType SendingType>
  460. bool Send(const TActorId& recipient, THolder<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  461. return Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
  462. }
  463. template <ESendingType SendingType>
  464. bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  465. return Send(recipient, ev.release(), flags, cookie, std::move(traceId));
  466. }
  467. static bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) {
  468. return TActivationContext::Forward(ev, recipient);
  469. }
  470. static bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
  471. return TActivationContext::Forward(ev, recipient);
  472. }
  473. template <typename TEventHandle>
  474. static bool Forward(TAutoPtr<TEventHandle>& ev, const TActorId& recipient) {
  475. TAutoPtr<IEventHandle> evi(ev.Release());
  476. return TActivationContext::Forward(evi, recipient);
  477. }
  478. void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
  479. void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
  480. void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
  481. // register new actor in ActorSystem on new fresh mailbox.
  482. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final;
  483. template <ESendingType SendingType>
  484. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept;
  485. // Register new actor in ActorSystem on same _mailbox_ as current actor.
  486. // There is one thread per mailbox to execute actor, which mean
  487. // no _cpu core scalability_ for such actors.
  488. // This method of registration can be useful if multiple actors share
  489. // some memory.
  490. TActorId RegisterWithSameMailbox(IActor* actor) const noexcept final;
  491. std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
  492. private:
  493. void ChangeSelfId(TActorId actorId) {
  494. SelfActorId = actorId;
  495. }
  496. };
  497. inline size_t GetActivityTypeCount() {
  498. return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetCount();
  499. }
  500. inline TStringBuf GetActivityTypeName(size_t index) {
  501. return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index);
  502. }
  503. class IActorCallback: public IActor {
  504. protected:
  505. template <class TEnum = IActor::EActivityType>
  506. IActorCallback(TReceiveFunc stateFunc, const TEnum activityType = IActor::EActivityType::OTHER)
  507. : IActor(TActorCallbackBehaviour(stateFunc), activityType) {
  508. }
  509. IActorCallback(TReceiveFunc stateFunc, const ui32 activityType)
  510. : IActor(TActorCallbackBehaviour(stateFunc), activityType) {
  511. }
  512. public:
  513. template <typename T>
  514. void Become(T stateFunc) {
  515. CImpl.Become(stateFunc);
  516. }
  517. template <typename T, typename... TArgs>
  518. void Become(T stateFunc, const TActorContext& ctx, TArgs&&... args) {
  519. CImpl.Become(stateFunc, ctx, std::forward<TArgs>(args)...);
  520. }
  521. template <typename T, typename... TArgs>
  522. void Become(T stateFunc, TArgs&&... args) {
  523. CImpl.Become(stateFunc);
  524. Schedule(std::forward<TArgs>(args)...);
  525. }
  526. TReceiveFunc CurrentStateFunc() const {
  527. return CImpl.CurrentStateFunc();
  528. }
  529. };
  530. template <typename TDerived>
  531. class TActor: public IActorCallback {
  532. private:
  533. template <typename T, typename = const char*>
  534. struct HasActorName: std::false_type {};
  535. template <typename T>
  536. struct HasActorName<T, decltype((void)T::ActorName, (const char*)nullptr)>: std::true_type {};
  537. template <typename T, typename = const char*>
  538. struct HasActorActivityType: std::false_type {};
  539. template <typename T>
  540. struct HasActorActivityType<T, decltype((void)T::ActorActivityType, (const char*)nullptr)>: std::true_type {};
  541. static ui32 GetActivityTypeIndexImpl() {
  542. if constexpr(HasActorName<TDerived>::value) {
  543. return TLocalProcessKey<TActorActivityTag, TDerived::ActorName>::GetIndex();
  544. } else if constexpr (HasActorActivityType<TDerived>::value) {
  545. using TActorActivity = decltype(((TDerived*)nullptr)->ActorActivityType());
  546. static_assert(std::is_enum<TActorActivity>::value);
  547. return TEnumProcessKey<TActorActivityTag, TActorActivity>::GetIndex(TDerived::ActorActivityType());
  548. } else {
  549. // 200 characters is limit for solomon metric tag length
  550. return TLocalProcessExtKey<TActorActivityTag, TDerived, 200>::GetIndex();
  551. }
  552. }
  553. static ui32 GetActivityTypeIndex() {
  554. static const ui32 result = GetActivityTypeIndexImpl();
  555. return result;
  556. }
  557. protected:
  558. // static constexpr char ActorName[] = "UNNAMED";
  559. TActor(void (TDerived::* func)(TAutoPtr<IEventHandle>& ev))
  560. : IActorCallback(static_cast<TReceiveFunc>(func), GetActivityTypeIndex()) {
  561. }
  562. template <class TEnum = EActivityType>
  563. TActor(void (TDerived::* func)(TAutoPtr<IEventHandle>& ev), const TEnum activityEnumType = EActivityType::OTHER)
  564. : IActorCallback(static_cast<TReceiveFunc>(func), activityEnumType) {
  565. }
  566. TActor(void (TDerived::* func)(TAutoPtr<IEventHandle>& ev), const TString& actorName)
  567. : IActorCallback(static_cast<TReceiveFunc>(func), TLocalProcessKeyState<TActorActivityTag>::GetInstance().Register(actorName)) {
  568. }
  569. public:
  570. typedef TDerived TThis;
  571. };
  // Parameter list shared by all state functions.
  #define STFUNC_SIG TAutoPtr<::NActors::IEventHandle>& ev
  #define STATEFN_SIG STFUNC_SIG

  // Declares a state function with the given name.
  #define STFUNC(funcName) void funcName(STFUNC_SIG)
  #define STATEFN(funcName) void funcName(STATEFN_SIG)

  // Default branch used by the strict variants: a debug-only verification
  // failure that reports the unexpected event type.
  #define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_VERIFY_DEBUG(false, "%s: unexpected message type 0x%08" PRIx32, __func__, etype);

  // Switch over the rewritten event type; HANDLERS supplies the case labels.
  #define STFUNC_BODY(HANDLERS, UNHANDLED_MSG_HANDLER) \
      switch (const ui32 etype = ev->GetTypeRewrite()) { \
          HANDLERS \
      default: \
          UNHANDLED_MSG_HANDLER \
      }

  #define STRICT_STFUNC_BODY(HANDLERS) STFUNC_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER)

  // Complete strict state function.
  #define STRICT_STFUNC(NAME, HANDLERS) \
      STFUNC(NAME) { \
          STRICT_STFUNC_BODY(HANDLERS) \
      }

  // Complete strict state function whose body is wrapped in a try block
  // followed by the given exception handlers.
  #define STRICT_STFUNC_EXC(NAME, HANDLERS, EXCEPTION_HANDLERS) \
      STFUNC(NAME) { \
          try { \
              STRICT_STFUNC_BODY(HANDLERS) \
          } \
          EXCEPTION_HANDLERS \
      }
  595. inline const TActorContext& TActivationContext::AsActorContext() {
  596. TActivationContext* tls = TlsActivationContext;
  597. return *static_cast<TActorContext*>(tls);
  598. }
  599. inline TActorContext TActivationContext::ActorContextFor(TActorId id) {
  600. auto& tls = *TlsActivationContext;
  601. return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id);
  602. }
  603. class TDecorator : public IActorCallback {
  604. protected:
  605. THolder<IActor> Actor;
  606. public:
  607. TDecorator(THolder<IActor>&& actor)
  608. : IActorCallback(static_cast<TReceiveFunc>(&TDecorator::State), actor->GetActivityType())
  609. , Actor(std::move(actor))
  610. {
  611. }
  612. void Registered(TActorSystem* sys, const TActorId& owner) override {
  613. Actor->ChangeSelfId(SelfId());
  614. Actor->Registered(sys, owner);
  615. }
  616. virtual bool DoBeforeReceiving(TAutoPtr<IEventHandle>& /*ev*/, const TActorContext& /*ctx*/) {
  617. return true;
  618. }
  619. virtual void DoAfterReceiving(const TActorContext& /*ctx*/)
  620. {
  621. }
  622. STFUNC(State) {
  623. auto ctx(ActorContext());
  624. if (DoBeforeReceiving(ev, ctx)) {
  625. Actor->Receive(ev);
  626. DoAfterReceiving(ctx);
  627. }
  628. }
  629. };
  630. // TTestDecorator doesn't work with the real actor system
  631. struct TTestDecorator : public TDecorator {
  632. TTestDecorator(THolder<IActor>&& actor)
  633. : TDecorator(std::move(actor))
  634. {
  635. }
  636. virtual ~TTestDecorator() = default;
  637. // This method must be called in the test actor system
  638. bool BeforeSending(TAutoPtr<IEventHandle>& ev)
  639. {
  640. bool send = true;
  641. TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(Actor.Get());
  642. if (decorator) {
  643. send = decorator->BeforeSending(ev);
  644. }
  645. return send && ev && DoBeforeSending(ev);
  646. }
  647. virtual bool DoBeforeSending(TAutoPtr<IEventHandle>& /*ev*/) {
  648. return true;
  649. }
  650. };
  651. template <ESendingType SendingType>
  652. bool TExecutorThread::Send(TAutoPtr<IEventHandle> ev) {
  653. #ifdef USE_ACTOR_CALLSTACK
  654. do {
  655. (ev)->Callstack = TCallstack::GetTlsCallstack();
  656. (ev)->Callstack.Trace();
  657. } while (false)
  658. #endif
  659. Ctx.IncrementSentEvents();
  660. return ActorSystem->Send<SendingType>(ev);
  661. }
  662. template <ESendingType SendingType>
  663. TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId,
  664. TActorId parentId)
  665. {
  666. if (!parentId) {
  667. parentId = CurrentRecipient;
  668. }
  669. if (poolId == Max<ui32>()) {
  670. if constexpr (SendingType == ESendingType::Common) {
  671. return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
  672. } else if (!TlsThreadContext) {
  673. return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
  674. } else {
  675. ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
  676. TActorId id = Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
  677. TlsThreadContext->SendingType = previousType;
  678. return id;
  679. }
  680. } else {
  681. return ActorSystem->Register<SendingType>(actor, mailboxType, poolId, ++RevolvingWriteCounter, parentId);
  682. }
  683. }
  684. template <ESendingType SendingType>
  685. TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) {
  686. if (!parentId) {
  687. parentId = CurrentRecipient;
  688. }
  689. if constexpr (SendingType == ESendingType::Common) {
  690. return Ctx.Executor->Register(actor, mailbox, hint, parentId);
  691. } else if (!TlsActivationContext) {
  692. return Ctx.Executor->Register(actor, mailbox, hint, parentId);
  693. } else {
  694. ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
  695. TActorId id = Ctx.Executor->Register(actor, mailbox, hint, parentId);
  696. TlsThreadContext->SendingType = previousType;
  697. return id;
  698. }
  699. }
  700. template <ESendingType SendingType>
  701. bool TActivationContext::Send(TAutoPtr<IEventHandle> ev) {
  702. return TlsActivationContext->ExecutorThread.Send<SendingType>(ev);
  703. }
  704. template <ESendingType SendingType>
  705. bool TActivationContext::Send(std::unique_ptr<IEventHandle> &&ev) {
  706. return TlsActivationContext->ExecutorThread.Send<SendingType>(ev.release());
  707. }
  708. template <ESendingType SendingType>
  709. bool TActivationContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) {
  710. return Send(IEventHandle::Forward(ev, recipient));
  711. }
  712. template <ESendingType SendingType>
  713. bool TActivationContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
  714. return Send(IEventHandle::Forward(ev, recipient));
  715. }
  716. template <ESendingType SendingType>
  717. bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags, ui64 cookie, NWilson::TTraceId traceId) const {
  718. return Send<SendingType>(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
  719. }
  720. template <ESendingType SendingType>
  721. bool TActorContext::Send(TAutoPtr<IEventHandle> ev) const {
  722. return ExecutorThread.Send<SendingType>(ev);
  723. }
  724. template <ESendingType SendingType>
  725. bool TActorContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const {
  726. return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient));
  727. }
  728. template <ESendingType SendingType>
  729. bool TActorContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const {
  730. return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient));
  731. }
  732. template <ESendingType SendingType>
  733. TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) {
  734. return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, parentId);
  735. }
  736. template <ESendingType SendingType>
  737. TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const {
  738. return ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfID);
  739. }
  740. template <ESendingType SendingType>
  741. bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags, ui64 cookie, NWilson::TTraceId traceId) const {
  742. return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
  743. }
  744. template <ESendingType SendingType>
  745. bool IActor::Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags, ui64 cookie, NWilson::TTraceId traceId) const {
  746. return SelfActorId.Send<SendingType>(recipient, ev, flags, cookie, std::move(traceId));
  747. }
  748. template <ESendingType SendingType>
  749. TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
  750. Y_VERIFY(actor);
  751. return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfActorId);
  752. }
  753. template <ESendingType SendingType>
  754. TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool,
  755. ui64 revolvingCounter, const TActorId& parentId) {
  756. Y_VERIFY(actor);
  757. Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32,
  758. (ui32)executorPool, (ui32)ExecutorPoolCount);
  759. if constexpr (SendingType == ESendingType::Common) {
  760. return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
  761. } else if (!TlsThreadContext) {
  762. return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
  763. } else {
  764. ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
  765. TActorId id = CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
  766. TlsThreadContext->SendingType = previousType;
  767. return id;
  768. }
  769. }
  770. template <ESendingType SendingType>
  771. bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
  772. if constexpr (SendingType == ESendingType::Common) {
  773. return this->GenericSend< &IExecutorPool::Send>(ev);
  774. } else {
  775. return this->SpecificSend(ev, SendingType);
  776. }
  777. }
  778. }
  779. template <>
  780. inline void Out<NActors::TActorIdentity>(IOutputStream& o, const NActors::TActorIdentity& x) {
  781. return x.Out(o);
  782. }
  783. template <>
  784. struct THash<NActors::TActorIdentity> {
  785. inline ui64 operator()(const NActors::TActorIdentity& x) const {
  786. return x.Hash();
  787. }
  788. };
  789. template<> struct std::hash<NActors::TActorIdentity> : THash<NActors::TActorIdentity> {};