actor.h 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999
  1. #pragma once
  2. #include "actorsystem.h"
  3. #include "event.h"
  4. #include "executor_thread.h"
  5. #include "monotonic.h"
  6. #include "thread_context.h"
  7. #include <library/cpp/actors/actor_type/indexes.h>
  8. #include <library/cpp/actors/util/local_process_key.h>
  9. #include <util/system/tls.h>
  10. #include <util/generic/noncopyable.h>
  11. namespace NActors {
  // Forward declarations of types that this header only refers to by
  // pointer, reference or name.
  class TActorSystem;
  class TMailboxTable;
  struct TMailboxHeader;
  class TExecutorThread;
  class IActor;
  class ISchedulerCookie;
  class IExecutorPool;

  namespace NLog {
      struct TSettings;
  } // namespace NLog

  struct TActorContext;
  struct TActivationContext;
  24. class TActivationContextHolder {
  25. static thread_local TActivationContext *Value;
  26. public:
  27. [[gnu::noinline]] operator bool() const;
  28. [[gnu::noinline]] operator TActivationContext*() const;
  29. [[gnu::noinline]] TActivationContext *operator ->();
  30. [[gnu::noinline]] TActivationContext& operator *();
  31. [[gnu::noinline]] TActivationContextHolder& operator=(TActivationContext *context);
  32. };
  33. extern TActivationContextHolder TlsActivationContext;
  34. struct TActivationContext {
  35. public:
  36. TMailboxHeader& Mailbox;
  37. TExecutorThread& ExecutorThread;
  38. const NHPTimer::STime EventStart;
  39. protected:
  40. explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart)
  41. : Mailbox(mailbox)
  42. , ExecutorThread(executorThread)
  43. , EventStart(eventStart)
  44. {
  45. }
  46. public:
  47. template <ESendingType SendingType = ESendingType::Common>
  48. static bool Send(TAutoPtr<IEventHandle> ev);
  49. template <ESendingType SendingType = ESendingType::Common>
  50. static bool Send(std::unique_ptr<IEventHandle> &&ev);
  51. template <ESendingType SendingType = ESendingType::Common>
  52. static bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient);
  53. template <ESendingType SendingType = ESendingType::Common>
  54. static bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient);
  55. /**
  56. * Schedule one-shot event that will be send at given time point in the future.
  57. *
  58. * @param deadline the wallclock time point in future when event must be send
  59. * @param ev the event to send
  60. * @param cookie cookie that will be piggybacked with event
  61. */
  62. static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
  63. static void Schedule(TInstant deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) {
  64. return Schedule(deadline, TAutoPtr<IEventHandle>(ev.release()), cookie);
  65. }
  66. /**
  67. * Schedule one-shot event that will be send at given time point in the future.
  68. *
  69. * @param deadline the monotonic time point in future when event must be send
  70. * @param ev the event to send
  71. * @param cookie cookie that will be piggybacked with event
  72. */
  73. static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
  74. static void Schedule(TMonotonic deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) {
  75. return Schedule(deadline, TAutoPtr<IEventHandle>(ev.release()), cookie);
  76. }
  77. /**
  78. * Schedule one-shot event that will be send after given delay.
  79. *
  80. * @param delta the time from now to delay event sending
  81. * @param ev the event to send
  82. * @param cookie cookie that will be piggybacked with event
  83. */
  84. static void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
  85. static void Schedule(TDuration delta, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) {
  86. return Schedule(delta, TAutoPtr<IEventHandle>(ev.release()), cookie);
  87. }
  88. static TInstant Now();
  89. static TMonotonic Monotonic();
  90. NLog::TSettings* LoggerSettings() const;
  91. // register new actor in ActorSystem on new fresh mailbox.
  92. template <ESendingType SendingType = ESendingType::Common>
  93. static TActorId Register(IActor* actor, TActorId parentId = TActorId(), TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>());
  94. // Register new actor in ActorSystem on same _mailbox_ as current actor.
  95. // There is one thread per mailbox to execute actor, which mean
  96. // no _cpu core scalability_ for such actors.
  97. // This method of registration can be usefull if multiple actors share
  98. // some memory.
  99. static TActorId RegisterWithSameMailbox(IActor* actor, TActorId parentId);
  100. static const TActorContext& AsActorContext();
  101. static TActorContext ActorContextFor(TActorId id);
  102. static TActorId InterconnectProxy(ui32 nodeid);
  103. static TActorSystem* ActorSystem();
  104. static i64 GetCurrentEventTicks();
  105. static double GetCurrentEventTicksAsSeconds();
  106. };
  107. struct TActorContext: public TActivationContext {
  108. const TActorId SelfID;
  109. using TEventFlags = IEventHandle::TEventFlags;
  110. explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
  111. : TActivationContext(mailbox, executorThread, eventStart)
  112. , SelfID(selfID)
  113. {
  114. }
  115. template <ESendingType SendingType = ESendingType::Common>
  116. bool Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  117. template <ESendingType SendingType = ESendingType::Common>
  118. bool Send(const TActorId& recipient, THolder<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  119. return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId));
  120. }
  121. template <ESendingType SendingType = ESendingType::Common>
  122. bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  123. return Send<SendingType>(recipient, ev.release(), flags, cookie, std::move(traceId));
  124. }
  125. template <ESendingType SendingType = ESendingType::Common>
  126. bool Send(TAutoPtr<IEventHandle> ev) const;
  127. template <ESendingType SendingType = ESendingType::Common>
  128. bool Send(std::unique_ptr<IEventHandle> &&ev) const {
  129. return Send<SendingType>(TAutoPtr<IEventHandle>(ev.release()));
  130. }
  131. template <ESendingType SendingType = ESendingType::Common>
  132. bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const;
  133. template <ESendingType SendingType = ESendingType::Common>
  134. bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const;
  135. TInstant Now() const;
  136. TMonotonic Monotonic() const;
  137. /**
  138. * Schedule one-shot event that will be send at given time point in the future.
  139. *
  140. * @param deadline the wallclock time point in future when event must be send
  141. * @param ev the event to send
  142. * @param cookie cookie that will be piggybacked with event
  143. */
  144. void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  145. /**
  146. * Schedule one-shot event that will be send at given time point in the future.
  147. *
  148. * @param deadline the monotonic time point in future when event must be send
  149. * @param ev the event to send
  150. * @param cookie cookie that will be piggybacked with event
  151. */
  152. void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  153. /**
  154. * Schedule one-shot event that will be send after given delay.
  155. *
  156. * @param delta the time from now to delay event sending
  157. * @param ev the event to send
  158. * @param cookie cookie that will be piggybacked with event
  159. */
  160. void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  161. TActorContext MakeFor(const TActorId& otherId) const {
  162. return TActorContext(Mailbox, ExecutorThread, EventStart, otherId);
  163. }
  164. // register new actor in ActorSystem on new fresh mailbox.
  165. template <ESendingType SendingType = ESendingType::Common>
  166. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const;
  167. // Register new actor in ActorSystem on same _mailbox_ as current actor.
  168. // There is one thread per mailbox to execute actor, which mean
  169. // no _cpu core scalability_ for such actors.
  170. // This method of registration can be usefull if multiple actors share
  171. // some memory.
  172. TActorId RegisterWithSameMailbox(IActor* actor) const;
  173. std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
  174. };
  175. struct TActorIdentity: public TActorId {
  176. using TEventFlags = IEventHandle::TEventFlags;
  177. explicit TActorIdentity(TActorId actorId)
  178. : TActorId(actorId)
  179. {
  180. }
  181. void operator=(TActorId actorId) {
  182. *this = TActorIdentity(actorId);
  183. }
  184. template <ESendingType SendingType = ESendingType::Common>
  185. bool Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  186. bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  187. void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  188. void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  189. void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  190. };
  191. class IActor;
  192. class IActorOps : TNonCopyable {
  193. public:
  194. virtual void Describe(IOutputStream&) const noexcept = 0;
  195. virtual bool Send(const TActorId& recipient, IEventBase*, IEventHandle::TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0;
  196. /**
  197. * Schedule one-shot event that will be send at given time point in the future.
  198. *
  199. * @param deadline the wallclock time point in future when event must be send
  200. * @param ev the event to send
  201. * @param cookie cookie that will be piggybacked with event
  202. */
  203. virtual void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
  204. /**
  205. * Schedule one-shot event that will be send at given time point in the future.
  206. *
  207. * @param deadline the monotonic time point in future when event must be send
  208. * @param ev the event to send
  209. * @param cookie cookie that will be piggybacked with event
  210. */
  211. virtual void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
  212. /**
  213. * Schedule one-shot event that will be send after given delay.
  214. *
  215. * @param delta the time from now to delay event sending
  216. * @param ev the event to send
  217. * @param cookie cookie that will be piggybacked with event
  218. */
  219. virtual void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
  220. virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0;
  221. virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0;
  222. };
  223. class TDecorator;
  224. class TActorVirtualBehaviour {
  225. public:
  226. static void Receive(IActor* actor, std::unique_ptr<IEventHandle> ev);
  227. public:
  228. };
  229. class TActorCallbackBehaviour {
  230. private:
  231. using TBase = IActor;
  232. friend class TDecorator;
  233. public:
  234. using TReceiveFunc = void (IActor::*)(TAutoPtr<IEventHandle>& ev);
  235. private:
  236. TReceiveFunc StateFunc = nullptr;
  237. public:
  238. TActorCallbackBehaviour() = default;
  239. TActorCallbackBehaviour(TReceiveFunc stateFunc)
  240. : StateFunc(stateFunc) {
  241. }
  242. bool Initialized() const {
  243. return !!StateFunc;
  244. }
  245. // NOTE: exceptions must not escape state function but if an exception hasn't be caught
  246. // by the actor then we want to crash an see the stack
  247. void Receive(IActor* actor, TAutoPtr<IEventHandle>& ev);
  248. template <typename T>
  249. void Become(T stateFunc) {
  250. StateFunc = static_cast<TReceiveFunc>(stateFunc);
  251. }
  252. template <typename T, typename... TArgs>
  253. void Become(T stateFunc, const TActorContext& ctx, TArgs&&... args) {
  254. StateFunc = static_cast<TReceiveFunc>(stateFunc);
  255. ctx.Schedule(std::forward<TArgs>(args)...);
  256. }
  257. TReceiveFunc CurrentStateFunc() const {
  258. return StateFunc;
  259. }
  260. };
  261. template<bool>
  262. struct TActorUsageImpl {
  263. void OnEnqueueEvent(ui64 /*time*/) {} // called asynchronously when event is put in the mailbox
  264. void OnDequeueEvent() {} // called when processed by Executor
  265. double GetUsage(ui64 /*time*/) { return 0; } // called from collector thread
  266. void DoActorInit() {}
  267. };
  268. template<>
  269. struct TActorUsageImpl<true> {
  270. static constexpr int TimestampBits = 40;
  271. static constexpr int CountBits = 24;
  272. static constexpr ui64 TimestampMask = ((ui64)1 << TimestampBits) - 1;
  273. static constexpr ui64 CountMask = ((ui64)1 << CountBits) - 1;
  274. std::atomic_uint64_t QueueSizeAndTimestamp = 0;
  275. std::atomic_uint64_t UsedTime = 0; // how much time did we consume since last GetUsage() call
  276. ui64 LastUsageTimestamp = 0; // when GetUsage() was called the last time
  277. void OnEnqueueEvent(ui64 time);
  278. void OnDequeueEvent();
  279. double GetUsage(ui64 time);
  280. void DoActorInit() { LastUsageTimestamp = GetCycleCountFast(); }
  281. };
  282. class IActor
  283. : protected IActorOps
  284. , public TActorUsageImpl<ActorLibCollectUsageStats>
  285. {
  286. private:
  287. TActorIdentity SelfActorId;
  288. i64 ElapsedTicks;
  289. friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
  290. friend class TDecorator;
  291. private: // stuck actor monitoring
  292. TMonotonic LastReceiveTimestamp;
  293. size_t StuckIndex = Max<size_t>();
  294. friend class TExecutorPoolBaseMailboxed;
  295. friend class TExecutorThread;
  296. IActor(const ui32 activityType)
  297. : SelfActorId(TActorId())
  298. , ElapsedTicks(0)
  299. , ActivityType(activityType)
  300. , HandledEvents(0) {
  301. }
  302. protected:
  303. TActorCallbackBehaviour CImpl;
  304. public:
  305. using TEventFlags = IEventHandle::TEventFlags;
  306. using TReceiveFunc = TActorCallbackBehaviour::TReceiveFunc;
  307. /// @sa services.proto NKikimrServices::TActivity::EType
  308. using EActorActivity = EInternalActorType;
  309. using EActivityType = EActorActivity;
  310. ui32 ActivityType;
  311. protected:
  312. ui64 HandledEvents;
  313. template <typename EEnum = EActivityType, typename std::enable_if<std::is_enum<EEnum>::value, bool>::type v = true>
  314. IActor(const EEnum activityEnumType = EActivityType::OTHER)
  315. : IActor(TEnumProcessKey<TActorActivityTag, EEnum>::GetIndex(activityEnumType)) {
  316. }
  317. IActor(TActorCallbackBehaviour&& cImpl, const ui32 activityType)
  318. : SelfActorId(TActorId())
  319. , ElapsedTicks(0)
  320. , CImpl(std::move(cImpl))
  321. , ActivityType(activityType)
  322. , HandledEvents(0)
  323. {
  324. }
  325. template <typename EEnum = EActivityType, typename std::enable_if<std::is_enum<EEnum>::value, bool>::type v = true>
  326. IActor(TActorCallbackBehaviour&& cImpl, const EEnum activityEnumType = EActivityType::OTHER)
  327. : IActor(std::move(cImpl), TEnumProcessKey<TActorActivityTag, EEnum>::GetIndex(activityEnumType)) {
  328. }
  329. public:
  330. template <class TEventBase>
  331. class TEventSenderFromActor: ::TNonCopyable {
  332. private:
  333. TEventFlags Flags = 0;
  334. ui64 Cookie = 0;
  335. const TActorIdentity SenderId;
  336. NWilson::TTraceId TraceId = {};
  337. std::unique_ptr<TEventBase> Event;
  338. public:
  339. template <class... Types>
  340. TEventSenderFromActor(const IActor* owner, Types&&... args)
  341. : SenderId(owner->SelfId())
  342. , Event(new TEventBase(std::forward<Types>(args)...)) {
  343. }
  344. TEventSenderFromActor& SetFlags(const TEventFlags flags) {
  345. Flags = flags;
  346. return *this;
  347. }
  348. TEventSenderFromActor& SetCookie(const ui64 value) {
  349. Cookie = value;
  350. return *this;
  351. }
  352. TEventSenderFromActor& SetTraceId(NWilson::TTraceId&& value) {
  353. TraceId = std::move(value);
  354. return *this;
  355. }
  356. bool SendTo(const TActorId& recipient) {
  357. return SenderId.Send(recipient, Event.release(), Flags, Cookie, std::move(TraceId));
  358. }
  359. };
  360. template <class TEvent, class... Types>
  361. TEventSenderFromActor<TEvent> Sender(Types&&... args) const {
  362. return TEventSenderFromActor<TEvent>(this, std::forward<Types>(args)...);
  363. }
  364. virtual ~IActor() {
  365. } // must not be called for registered actors, see Die method instead
  366. protected:
  367. virtual void Die(const TActorContext& ctx); // would unregister actor so call exactly once and only from inside of message processing
  368. virtual void PassAway();
  369. protected:
  370. void SetActivityType(ui32 activityType) {
  371. ActivityType = activityType;
  372. }
  373. public:
  374. class TPassAwayGuard: TMoveOnly {
  375. private:
  376. IActor* Owner = nullptr;
  377. public:
  378. TPassAwayGuard(TPassAwayGuard&& item) {
  379. Owner = item.Owner;
  380. item.Owner = nullptr;
  381. }
  382. TPassAwayGuard(IActor* owner)
  383. : Owner(owner)
  384. {
  385. }
  386. ~TPassAwayGuard() {
  387. if (Owner) {
  388. Owner->PassAway();
  389. }
  390. }
  391. };
  392. TPassAwayGuard PassAwayGuard() {
  393. return TPassAwayGuard(this);
  394. }
  395. // must be called to wrap any call trasitions from one actor to another
  396. template<typename TActor, typename TMethod, typename... TArgs>
  397. static std::invoke_result_t<TMethod, TActor, TArgs...> InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) {
  398. struct TRecurseContext : TActorContext {
  399. TActivationContext* const Prev;
  400. TRecurseContext(const TActorId& actorId)
  401. : TActorContext(TActivationContext::ActorContextFor(actorId))
  402. , Prev(TlsActivationContext)
  403. {
  404. TlsActivationContext = this;
  405. }
  406. ~TRecurseContext() {
  407. Y_ABORT_UNLESS(TlsActivationContext == this, "TlsActivationContext mismatch; probably InvokeOtherActor was invoked from a coroutine");
  408. TlsActivationContext = Prev;
  409. }
  410. } context(actor.SelfId());
  411. return std::invoke(std::forward<TMethod>(method), actor, std::forward<TArgs>(args)...);
  412. }
  413. virtual void Registered(TActorSystem* sys, const TActorId& owner);
  414. virtual TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) {
  415. Y_UNUSED(self);
  416. Y_UNUSED(parentId);
  417. return TAutoPtr<IEventHandle>();
  418. }
  419. i64 GetElapsedTicks() const {
  420. return ElapsedTicks;
  421. }
  422. double GetElapsedTicksAsSeconds() const;
  423. void AddElapsedTicks(i64 ticks) {
  424. ElapsedTicks += ticks;
  425. }
  426. ui32 GetActivityType() const {
  427. return ActivityType;
  428. }
  429. ui64 GetHandledEvents() const {
  430. return HandledEvents;
  431. }
  432. TActorIdentity SelfId() const {
  433. return SelfActorId;
  434. }
  435. void Receive(TAutoPtr<IEventHandle>& ev) {
  436. ++HandledEvents;
  437. LastReceiveTimestamp = TActivationContext::Monotonic();
  438. if (CImpl.Initialized()) {
  439. CImpl.Receive(this, ev);
  440. } else {
  441. TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandle>(ev.Release()));
  442. }
  443. }
  444. TActorContext ActorContext() const {
  445. return TActivationContext::ActorContextFor(SelfId());
  446. }
  447. protected:
  448. void SetEnoughCpu(bool isEnough) {
  449. if (TlsThreadContext) {
  450. TlsThreadContext->IsEnoughCpu = isEnough;
  451. }
  452. }
  453. void Describe(IOutputStream&) const noexcept override;
  454. bool Send(TAutoPtr<IEventHandle> ev) const noexcept;
  455. bool Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
  456. bool Send(const TActorId& recipient, THolder<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
  457. return Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
  458. }
  459. bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  460. return Send(recipient, ev.release(), flags, cookie, std::move(traceId));
  461. }
  462. template <class TEvent, class ... TEventArgs>
  463. bool Send(TActorId recipient, TEventArgs&& ... args) const {
  464. return Send(recipient, MakeHolder<TEvent>(std::forward<TEventArgs>(args)...));
  465. }
  466. template <ESendingType SendingType>
  467. bool Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  468. template <ESendingType SendingType>
  469. bool Send(const TActorId& recipient, THolder<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  470. return Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
  471. }
  472. template <ESendingType SendingType>
  473. bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, TEventFlags flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  474. return Send(recipient, ev.release(), flags, cookie, std::move(traceId));
  475. }
  476. static bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) {
  477. return TActivationContext::Forward(ev, recipient);
  478. }
  479. static bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
  480. return TActivationContext::Forward(ev, recipient);
  481. }
  482. template <typename TEventHandle>
  483. static bool Forward(TAutoPtr<TEventHandle>& ev, const TActorId& recipient) {
  484. TAutoPtr<IEventHandle> evi(ev.Release());
  485. return TActivationContext::Forward(evi, recipient);
  486. }
  487. void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
  488. void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
  489. void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
  490. // register new actor in ActorSystem on new fresh mailbox.
  491. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final;
  492. template <ESendingType SendingType>
  493. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept;
  494. // Register new actor in ActorSystem on same _mailbox_ as current actor.
  495. // There is one thread per mailbox to execute actor, which mean
  496. // no _cpu core scalability_ for such actors.
  497. // This method of registration can be useful if multiple actors share
  498. // some memory.
  499. TActorId RegisterWithSameMailbox(IActor* actor) const noexcept final;
  500. std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
  501. private:
  502. void ChangeSelfId(TActorId actorId) {
  503. SelfActorId = actorId;
  504. }
  505. };
  506. inline size_t GetActivityTypeCount() {
  507. return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetCount();
  508. }
  509. inline TStringBuf GetActivityTypeName(size_t index) {
  510. return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index);
  511. }
  512. class IActorCallback: public IActor {
  513. protected:
  514. template <class TEnum = IActor::EActivityType>
  515. IActorCallback(TReceiveFunc stateFunc, const TEnum activityType = IActor::EActivityType::OTHER)
  516. : IActor(TActorCallbackBehaviour(stateFunc), activityType) {
  517. }
  518. IActorCallback(TReceiveFunc stateFunc, const ui32 activityType)
  519. : IActor(TActorCallbackBehaviour(stateFunc), activityType) {
  520. }
  521. public:
  522. template <typename T>
  523. void Become(T stateFunc) {
  524. CImpl.Become(stateFunc);
  525. }
  526. template <typename T, typename... TArgs>
  527. void Become(T stateFunc, const TActorContext& ctx, TArgs&&... args) {
  528. CImpl.Become(stateFunc, ctx, std::forward<TArgs>(args)...);
  529. }
  530. template <typename T, typename... TArgs>
  531. void Become(T stateFunc, TArgs&&... args) {
  532. CImpl.Become(stateFunc);
  533. Schedule(std::forward<TArgs>(args)...);
  534. }
  535. TReceiveFunc CurrentStateFunc() const {
  536. return CImpl.CurrentStateFunc();
  537. }
  538. };
  539. template <typename TDerived>
  540. class TActor: public IActorCallback {
  541. private:
  542. using TDerivedReceiveFunc = void (TDerived::*)(TAutoPtr<IEventHandle>& ev);
  543. template <typename T, typename = const char*>
  544. struct HasActorName: std::false_type {};
  545. template <typename T>
  546. struct HasActorName<T, decltype((void)T::ActorName, (const char*)nullptr)>: std::true_type {};
  547. template <typename T, typename = const char*>
  548. struct HasActorActivityType: std::false_type {};
  549. template <typename T>
  550. struct HasActorActivityType<T, decltype((void)T::ActorActivityType, (const char*)nullptr)>: std::true_type {};
  551. static ui32 GetActivityTypeIndexImpl() {
  552. if constexpr(HasActorName<TDerived>::value) {
  553. return TLocalProcessKey<TActorActivityTag, TDerived::ActorName>::GetIndex();
  554. } else if constexpr (HasActorActivityType<TDerived>::value) {
  555. using TActorActivity = decltype(((TDerived*)nullptr)->ActorActivityType());
  556. static_assert(std::is_enum<TActorActivity>::value);
  557. return TEnumProcessKey<TActorActivityTag, TActorActivity>::GetIndex(TDerived::ActorActivityType());
  558. } else {
  559. // 200 characters is limit for solomon metric tag length
  560. return TLocalProcessExtKey<TActorActivityTag, TDerived, 200>::GetIndex();
  561. }
  562. }
  563. static ui32 GetActivityTypeIndex() {
  564. static const ui32 result = GetActivityTypeIndexImpl();
  565. return result;
  566. }
  567. protected:
  568. // static constexpr char ActorName[] = "UNNAMED";
  569. TActor(TDerivedReceiveFunc func)
  570. : IActorCallback(static_cast<TReceiveFunc>(func), GetActivityTypeIndex()) {
  571. }
  572. template <class TEnum = EActivityType>
  573. TActor(TDerivedReceiveFunc func, const TEnum activityEnumType = EActivityType::OTHER)
  574. : IActorCallback(static_cast<TReceiveFunc>(func), activityEnumType) {
  575. }
  576. TActor(TDerivedReceiveFunc func, const TString& actorName)
  577. : IActorCallback(static_cast<TReceiveFunc>(func), TLocalProcessKeyState<TActorActivityTag>::GetInstance().Register(actorName)) {
  578. }
  579. public:
  580. typedef TDerived TThis;
  581. // UnsafeBecome methods don't verify the bindings of the stateFunc to the TDerived
  582. template <typename T>
  583. void UnsafeBecome(T stateFunc) {
  584. this->IActorCallback::Become(stateFunc);
  585. }
  586. template <typename T, typename... TArgs>
  587. void UnsafeBecome(T stateFunc, const TActorContext& ctx, TArgs&&... args) {
  588. this->IActorCallback::Become(stateFunc, ctx, std::forward<TArgs>(args)...);
  589. }
  590. template <typename T, typename... TArgs>
  591. void UnsafeBecome(T stateFunc, TArgs&&... args) {
  592. this->IActorCallback::Become(stateFunc, std::forward<TArgs>(args)...);
  593. }
  594. template <typename T>
  595. void Become(T stateFunc) {
  596. // TODO(kruall): have to uncomment asserts after end of sync contrib/ydb
  597. // static_assert(std::is_convertible_v<T, TDerivedReceiveFunc>);
  598. this->IActorCallback::Become(stateFunc);
  599. }
  600. template <typename T, typename... TArgs>
  601. void Become(T stateFunc, const TActorContext& ctx, TArgs&&... args) {
  602. // static_assert(std::is_convertible_v<T, TDerivedReceiveFunc>);
  603. this->IActorCallback::Become(stateFunc, ctx, std::forward<TArgs>(args)...);
  604. }
  605. template <typename T, typename... TArgs>
  606. void Become(T stateFunc, TArgs&&... args) {
  607. // static_assert(std::is_convertible_v<T, TDerivedReceiveFunc>);
  608. this->IActorCallback::Become(stateFunc, std::forward<TArgs>(args)...);
  609. }
  610. };
  // Parameter list of a state function (two spellings of the same signature).
  #define STFUNC_SIG TAutoPtr<::NActors::IEventHandle>& ev
  #define STATEFN_SIG TAutoPtr<::NActors::IEventHandle>& ev

  // Declarator of a state function named funcName (two spellings).
  #define STFUNC(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev)
  #define STATEFN(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev)

  // Default-case handler used by the STRICT_* macros: a debug-only abort that
  // reports the enclosing function name and the event type. It relies on the
  // `etype` variable introduced by STFUNC_BODY.
  #define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_DEBUG_ABORT_UNLESS(false, "%s: unexpected message type 0x%08" PRIx32, __func__, etype);

  // Switch over ev->GetTypeRewrite(): HANDLERS supplies the case labels,
  // UNHANDLED_MSG_HANDLER supplies the body of the default label.
  #define STFUNC_BODY(HANDLERS, UNHANDLED_MSG_HANDLER)       \
      switch (const ui32 etype = ev->GetTypeRewrite()) {     \
          HANDLERS                                           \
          default:                                           \
              UNHANDLED_MSG_HANDLER                          \
      }

  #define STRICT_STFUNC_BODY(HANDLERS) STFUNC_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER)

  // Defines a complete state function NAME with the strict default handler.
  #define STRICT_STFUNC(NAME, HANDLERS)      \
      void NAME(STFUNC_SIG) {                \
          STRICT_STFUNC_BODY(HANDLERS)       \
      }

  // Same as STRICT_STFUNC, with the switch wrapped in a try block followed by
  // the caller-supplied EXCEPTION_HANDLERS (catch clauses).
  #define STRICT_STFUNC_EXC(NAME, HANDLERS, EXCEPTION_HANDLERS)      \
      void NAME(STFUNC_SIG) {                                        \
          try {                                                      \
              STRICT_STFUNC_BODY(HANDLERS)                           \
          }                                                          \
          EXCEPTION_HANDLERS                                         \
      }
  634. inline const TActorContext& TActivationContext::AsActorContext() {
  635. TActivationContext* tls = TlsActivationContext;
  636. return *static_cast<TActorContext*>(tls);
  637. }
  638. inline TActorContext TActivationContext::ActorContextFor(TActorId id) {
  639. auto& tls = *TlsActivationContext;
  640. return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id);
  641. }
  642. class TDecorator : public IActorCallback {
  643. protected:
  644. THolder<IActor> Actor;
  645. public:
  646. TDecorator(THolder<IActor>&& actor)
  647. : IActorCallback(static_cast<TReceiveFunc>(&TDecorator::State), actor->GetActivityType())
  648. , Actor(std::move(actor))
  649. {
  650. }
  651. void Registered(TActorSystem* sys, const TActorId& owner) override {
  652. Actor->ChangeSelfId(SelfId());
  653. Actor->Registered(sys, owner);
  654. }
  655. virtual bool DoBeforeReceiving(TAutoPtr<IEventHandle>& /*ev*/, const TActorContext& /*ctx*/) {
  656. return true;
  657. }
  658. virtual void DoAfterReceiving(const TActorContext& /*ctx*/)
  659. {
  660. }
  661. STFUNC(State) {
  662. auto ctx(ActorContext());
  663. if (DoBeforeReceiving(ev, ctx)) {
  664. Actor->Receive(ev);
  665. DoAfterReceiving(ctx);
  666. }
  667. }
  668. };
  669. // TTestDecorator doesn't work with the real actor system
  670. struct TTestDecorator : public TDecorator {
  671. TTestDecorator(THolder<IActor>&& actor)
  672. : TDecorator(std::move(actor))
  673. {
  674. }
  675. virtual ~TTestDecorator() = default;
  676. // This method must be called in the test actor system
  677. bool BeforeSending(TAutoPtr<IEventHandle>& ev)
  678. {
  679. bool send = true;
  680. TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(Actor.Get());
  681. if (decorator) {
  682. send = decorator->BeforeSending(ev);
  683. }
  684. return send && ev && DoBeforeSending(ev);
  685. }
  686. virtual bool DoBeforeSending(TAutoPtr<IEventHandle>& /*ev*/) {
  687. return true;
  688. }
  689. };
  690. template <ESendingType SendingType>
  691. bool TExecutorThread::Send(TAutoPtr<IEventHandle> ev) {
  692. #ifdef USE_ACTOR_CALLSTACK
  693. do {
  694. (ev)->Callstack = TCallstack::GetTlsCallstack();
  695. (ev)->Callstack.Trace();
  696. } while (false)
  697. #endif
  698. Ctx.IncrementSentEvents();
  699. return ActorSystem->Send<SendingType>(ev);
  700. }
  701. template <ESendingType SendingType>
  702. TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId,
  703. TActorId parentId)
  704. {
  705. if (!parentId) {
  706. parentId = CurrentRecipient;
  707. }
  708. if (poolId == Max<ui32>()) {
  709. if constexpr (SendingType == ESendingType::Common) {
  710. return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
  711. } else if (!TlsThreadContext) {
  712. return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
  713. } else {
  714. ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
  715. TActorId id = Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
  716. TlsThreadContext->SendingType = previousType;
  717. return id;
  718. }
  719. } else {
  720. return ActorSystem->Register<SendingType>(actor, mailboxType, poolId, ++RevolvingWriteCounter, parentId);
  721. }
  722. }
  723. template <ESendingType SendingType>
  724. TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) {
  725. if (!parentId) {
  726. parentId = CurrentRecipient;
  727. }
  728. if constexpr (SendingType == ESendingType::Common) {
  729. return Ctx.Executor->Register(actor, mailbox, hint, parentId);
  730. } else if (!TlsActivationContext) {
  731. return Ctx.Executor->Register(actor, mailbox, hint, parentId);
  732. } else {
  733. ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
  734. TActorId id = Ctx.Executor->Register(actor, mailbox, hint, parentId);
  735. TlsThreadContext->SendingType = previousType;
  736. return id;
  737. }
  738. }
  739. template <ESendingType SendingType>
  740. bool TActivationContext::Send(TAutoPtr<IEventHandle> ev) {
  741. return TlsActivationContext->ExecutorThread.Send<SendingType>(ev);
  742. }
  743. template <ESendingType SendingType>
  744. bool TActivationContext::Send(std::unique_ptr<IEventHandle> &&ev) {
  745. return TlsActivationContext->ExecutorThread.Send<SendingType>(ev.release());
  746. }
  747. template <ESendingType SendingType>
  748. bool TActivationContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) {
  749. return Send(IEventHandle::Forward(ev, recipient));
  750. }
  751. template <ESendingType SendingType>
  752. bool TActivationContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
  753. return Send(IEventHandle::Forward(ev, recipient));
  754. }
  755. template <ESendingType SendingType>
  756. bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags, ui64 cookie, NWilson::TTraceId traceId) const {
  757. return Send<SendingType>(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
  758. }
  759. template <ESendingType SendingType>
  760. bool TActorContext::Send(TAutoPtr<IEventHandle> ev) const {
  761. return ExecutorThread.Send<SendingType>(ev);
  762. }
  763. template <ESendingType SendingType>
  764. bool TActorContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const {
  765. return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient));
  766. }
  767. template <ESendingType SendingType>
  768. bool TActorContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const {
  769. return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient));
  770. }
  771. template <ESendingType SendingType>
  772. TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) {
  773. return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, parentId);
  774. }
  775. template <ESendingType SendingType>
  776. TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const {
  777. return ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfID);
  778. }
  779. template <ESendingType SendingType>
  780. bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags, ui64 cookie, NWilson::TTraceId traceId) const {
  781. return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
  782. }
  783. template <ESendingType SendingType>
  784. bool IActor::Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags, ui64 cookie, NWilson::TTraceId traceId) const {
  785. return SelfActorId.Send<SendingType>(recipient, ev, flags, cookie, std::move(traceId));
  786. }
  787. template <ESendingType SendingType>
  788. TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
  789. Y_ABORT_UNLESS(actor);
  790. return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfActorId);
  791. }
  792. template <ESendingType SendingType>
  793. TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool,
  794. ui64 revolvingCounter, const TActorId& parentId) {
  795. Y_ABORT_UNLESS(actor);
  796. Y_ABORT_UNLESS(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32,
  797. (ui32)executorPool, (ui32)ExecutorPoolCount);
  798. if constexpr (SendingType == ESendingType::Common) {
  799. return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
  800. } else if (!TlsThreadContext) {
  801. return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
  802. } else {
  803. ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
  804. TActorId id = CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
  805. TlsThreadContext->SendingType = previousType;
  806. return id;
  807. }
  808. }
  809. template <ESendingType SendingType>
  810. bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
  811. if constexpr (SendingType == ESendingType::Common) {
  812. return this->GenericSend< &IExecutorPool::Send>(ev);
  813. } else {
  814. return this->SpecificSend(ev, SendingType);
  815. }
  816. }
  817. }
  818. template <>
  819. inline void Out<NActors::TActorIdentity>(IOutputStream& o, const NActors::TActorIdentity& x) {
  820. return x.Out(o);
  821. }
  822. template <>
  823. struct THash<NActors::TActorIdentity> {
  824. inline ui64 operator()(const NActors::TActorIdentity& x) const {
  825. return x.Hash();
  826. }
  827. };
  828. template<> struct std::hash<NActors::TActorIdentity> : THash<NActors::TActorIdentity> {};