actor.h 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530
  1. #pragma once
  2. #include "event.h"
  3. #include "monotonic.h"
  4. #include <util/system/tls.h>
  5. #include <library/cpp/actors/util/local_process_key.h>
  6. namespace NActors {
  7. class TActorSystem;
  8. class TMailboxTable;
  9. struct TMailboxHeader;
  10. class TExecutorThread;
  11. class IActor;
  12. class ISchedulerCookie;
  13. namespace NLog {
  14. struct TSettings;
  15. }
  16. struct TActorContext;
  17. struct TActivationContext {
  18. public:
  19. TMailboxHeader& Mailbox;
  20. TExecutorThread& ExecutorThread;
  21. const NHPTimer::STime EventStart;
  22. protected:
  23. explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart)
  24. : Mailbox(mailbox)
  25. , ExecutorThread(executorThread)
  26. , EventStart(eventStart)
  27. {
  28. }
  29. public:
  30. static bool Send(TAutoPtr<IEventHandle> ev);
  31. /**
  32. * Schedule one-shot event that will be send at given time point in the future.
  33. *
  34. * @param deadline the wallclock time point in future when event must be send
  35. * @param ev the event to send
  36. * @param cookie cookie that will be piggybacked with event
  37. */
  38. static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
  39. /**
  40. * Schedule one-shot event that will be send at given time point in the future.
  41. *
  42. * @param deadline the monotonic time point in future when event must be send
  43. * @param ev the event to send
  44. * @param cookie cookie that will be piggybacked with event
  45. */
  46. static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
  47. /**
  48. * Schedule one-shot event that will be send after given delay.
  49. *
  50. * @param delta the time from now to delay event sending
  51. * @param ev the event to send
  52. * @param cookie cookie that will be piggybacked with event
  53. */
  54. static void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
  55. static TInstant Now();
  56. static TMonotonic Monotonic();
  57. NLog::TSettings* LoggerSettings() const;
  58. // register new actor in ActorSystem on new fresh mailbox.
  59. static TActorId Register(IActor* actor, TActorId parentId = TActorId(), TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>());
  60. // Register new actor in ActorSystem on same _mailbox_ as current actor.
  61. // There is one thread per mailbox to execute actor, which mean
  62. // no _cpu core scalability_ for such actors.
  63. // This method of registration can be usefull if multiple actors share
  64. // some memory.
  65. static TActorId RegisterWithSameMailbox(IActor* actor, TActorId parentId);
  66. static const TActorContext& AsActorContext();
  67. static TActorContext ActorContextFor(TActorId id);
  68. static TActorId InterconnectProxy(ui32 nodeid);
  69. static TActorSystem* ActorSystem();
  70. static i64 GetCurrentEventTicks();
  71. static double GetCurrentEventTicksAsSeconds();
  72. };
  73. struct TActorContext: public TActivationContext {
  74. const TActorId SelfID;
  75. explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
  76. : TActivationContext(mailbox, executorThread, eventStart)
  77. , SelfID(selfID)
  78. {
  79. }
  80. bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  81. template <typename TEvent>
  82. bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
  83. return Send(recipient, static_cast<IEventBase*>(ev.Release()), flags, cookie, std::move(traceId));
  84. }
  85. bool Send(TAutoPtr<IEventHandle> ev) const;
  86. TInstant Now() const;
  87. TMonotonic Monotonic() const;
  88. /**
  89. * Schedule one-shot event that will be send at given time point in the future.
  90. *
  91. * @param deadline the wallclock time point in future when event must be send
  92. * @param ev the event to send
  93. * @param cookie cookie that will be piggybacked with event
  94. */
  95. void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  96. /**
  97. * Schedule one-shot event that will be send at given time point in the future.
  98. *
  99. * @param deadline the monotonic time point in future when event must be send
  100. * @param ev the event to send
  101. * @param cookie cookie that will be piggybacked with event
  102. */
  103. void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  104. /**
  105. * Schedule one-shot event that will be send after given delay.
  106. *
  107. * @param delta the time from now to delay event sending
  108. * @param ev the event to send
  109. * @param cookie cookie that will be piggybacked with event
  110. */
  111. void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  112. TActorContext MakeFor(const TActorId& otherId) const {
  113. return TActorContext(Mailbox, ExecutorThread, EventStart, otherId);
  114. }
  115. // register new actor in ActorSystem on new fresh mailbox.
  116. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const;
  117. // Register new actor in ActorSystem on same _mailbox_ as current actor.
  118. // There is one thread per mailbox to execute actor, which mean
  119. // no _cpu core scalability_ for such actors.
  120. // This method of registration can be usefull if multiple actors share
  121. // some memory.
  122. TActorId RegisterWithSameMailbox(IActor* actor) const;
  123. std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
  124. };
  125. extern Y_POD_THREAD(TActivationContext*) TlsActivationContext;
  126. struct TActorIdentity: public TActorId {
  127. explicit TActorIdentity(TActorId actorId)
  128. : TActorId(actorId)
  129. {
  130. }
  131. void operator=(TActorId actorId) {
  132. *this = TActorIdentity(actorId);
  133. }
  134. bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
  135. void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  136. void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  137. void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
  138. };
  139. class IActor;
  140. class IActorOps : TNonCopyable {
  141. public:
  142. virtual void Describe(IOutputStream&) const noexcept = 0;
  143. virtual bool Send(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0;
  144. /**
  145. * Schedule one-shot event that will be send at given time point in the future.
  146. *
  147. * @param deadline the wallclock time point in future when event must be send
  148. * @param ev the event to send
  149. * @param cookie cookie that will be piggybacked with event
  150. */
  151. virtual void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
  152. /**
  153. * Schedule one-shot event that will be send at given time point in the future.
  154. *
  155. * @param deadline the monotonic time point in future when event must be send
  156. * @param ev the event to send
  157. * @param cookie cookie that will be piggybacked with event
  158. */
  159. virtual void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
  160. /**
  161. * Schedule one-shot event that will be send after given delay.
  162. *
  163. * @param delta the time from now to delay event sending
  164. * @param ev the event to send
  165. * @param cookie cookie that will be piggybacked with event
  166. */
  167. virtual void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
  168. virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0;
  169. virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0;
  170. };
  171. class TDecorator;
  172. class IActor : protected IActorOps {
  173. public:
  174. typedef void (IActor::*TReceiveFunc)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
  175. private:
  176. TReceiveFunc StateFunc;
  177. TActorIdentity SelfActorId;
  178. i64 ElapsedTicks;
  179. ui64 HandledEvents;
  180. friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
  181. friend class TDecorator;
  182. public:
  183. /// @sa services.proto NKikimrServices::TActivity::EType
  184. enum EActorActivity {
  185. OTHER = 0,
  186. ACTOR_SYSTEM = 1,
  187. ACTORLIB_COMMON = 2,
  188. ACTORLIB_STATS = 3,
  189. LOG_ACTOR = 4,
  190. INTERCONNECT_PROXY_TCP = 12,
  191. INTERCONNECT_SESSION_TCP = 13,
  192. INTERCONNECT_COMMON = 171,
  193. SELF_PING_ACTOR = 207,
  194. TEST_ACTOR_RUNTIME = 283,
  195. INTERCONNECT_HANDSHAKE = 284,
  196. INTERCONNECT_POLLER = 285,
  197. INTERCONNECT_SESSION_KILLER = 286,
  198. ACTOR_SYSTEM_SCHEDULER_ACTOR = 312,
  199. ACTOR_FUTURE_CALLBACK = 337,
  200. INTERCONNECT_MONACTOR = 362,
  201. INTERCONNECT_LOAD_ACTOR = 376,
  202. INTERCONNECT_LOAD_RESPONDER = 377,
  203. NAMESERVICE = 450,
  204. DNS_RESOLVER = 481,
  205. INTERCONNECT_PROXY_WRAPPER = 546,
  206. };
  207. using EActivityType = EActorActivity;
  208. ui32 ActivityType;
  209. protected:
  210. IActor(TReceiveFunc stateFunc, ui32 activityType = OTHER)
  211. : StateFunc(stateFunc)
  212. , SelfActorId(TActorId())
  213. , ElapsedTicks(0)
  214. , HandledEvents(0)
  215. , ActivityType(activityType)
  216. {
  217. }
  218. public:
  219. virtual ~IActor() {
  220. } // must not be called for registered actors, see Die method instead
  221. protected:
  222. virtual void Die(const TActorContext& ctx); // would unregister actor so call exactly once and only from inside of message processing
  223. virtual void PassAway();
  224. public:
  225. template <typename T>
  226. void Become(T stateFunc) {
  227. StateFunc = static_cast<TReceiveFunc>(stateFunc);
  228. }
  229. template <typename T, typename... TArgs>
  230. void Become(T stateFunc, const TActorContext& ctx, TArgs&&... args) {
  231. StateFunc = static_cast<TReceiveFunc>(stateFunc);
  232. ctx.Schedule(std::forward<TArgs>(args)...);
  233. }
  234. template <typename T, typename... TArgs>
  235. void Become(T stateFunc, TArgs&&... args) {
  236. StateFunc = static_cast<TReceiveFunc>(stateFunc);
  237. Schedule(std::forward<TArgs>(args)...);
  238. }
  239. protected:
  240. void SetActivityType(ui32 activityType) {
  241. ActivityType = activityType;
  242. }
  243. public:
  244. TReceiveFunc CurrentStateFunc() const {
  245. return StateFunc;
  246. }
  247. // NOTE: exceptions must not escape state function but if an exception hasn't be caught
  248. // by the actor then we want to crash an see the stack
  249. void Receive(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
  250. (this->*StateFunc)(ev, ctx);
  251. HandledEvents++;
  252. }
  253. // must be called to wrap any call trasitions from one actor to another
  254. template<typename TActor, typename TMethod, typename... TArgs>
  255. static decltype((std::declval<TActor>().*std::declval<TMethod>())(std::declval<TArgs>()...))
  256. InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) {
  257. struct TRecurseContext : TActorContext {
  258. TActivationContext *Prev;
  259. TRecurseContext(const TActorId& actorId)
  260. : TActorContext(TActivationContext::ActorContextFor(actorId))
  261. , Prev(TlsActivationContext)
  262. {
  263. TlsActivationContext = this;
  264. }
  265. ~TRecurseContext() {
  266. TlsActivationContext = Prev;
  267. }
  268. } context(actor.SelfId());
  269. return (actor.*method)(std::forward<TArgs>(args)...);
  270. }
  271. virtual void Registered(TActorSystem* sys, const TActorId& owner);
  272. virtual TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) {
  273. Y_UNUSED(self);
  274. Y_UNUSED(parentId);
  275. return TAutoPtr<IEventHandle>();
  276. }
  277. i64 GetElapsedTicks() const {
  278. return ElapsedTicks;
  279. }
  280. double GetElapsedTicksAsSeconds() const;
  281. void AddElapsedTicks(i64 ticks) {
  282. ElapsedTicks += ticks;
  283. }
  284. auto GetActivityType() const {
  285. return ActivityType;
  286. }
  287. ui64 GetHandledEvents() const {
  288. return HandledEvents;
  289. }
  290. TActorIdentity SelfId() const {
  291. return SelfActorId;
  292. }
  293. protected:
  294. void Describe(IOutputStream&) const noexcept override;
  295. bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
  296. template <typename TEvent>
  297. bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
  298. return Send(recipient, static_cast<IEventBase*>(ev.Release()), flags, cookie, std::move(traceId));
  299. }
  300. template <class TEvent, class ... TEventArgs>
  301. bool Send(TActorId recipient, TEventArgs&& ... args) const {
  302. return Send(recipient, MakeHolder<TEvent>(std::forward<TEventArgs>(args)...));
  303. }
  304. void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
  305. void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
  306. void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
  307. // register new actor in ActorSystem on new fresh mailbox.
  308. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final;
  309. // Register new actor in ActorSystem on same _mailbox_ as current actor.
  310. // There is one thread per mailbox to execute actor, which mean
  311. // no _cpu core scalability_ for such actors.
  312. // This method of registration can be usefull if multiple actors share
  313. // some memory.
  314. TActorId RegisterWithSameMailbox(IActor* actor) const noexcept final;
  315. std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
  316. private:
  317. void ChangeSelfId(TActorId actorId) {
  318. SelfActorId = actorId;
  319. }
  320. };
  321. struct TActorActivityTag {};
  322. inline size_t GetActivityTypeCount() {
  323. return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetCount();
  324. }
  325. inline TStringBuf GetActivityTypeName(size_t index) {
  326. return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index);
  327. }
  328. template <typename TDerived>
  329. class TActor: public IActor {
  330. private:
  331. template <typename T, typename = const char*>
  332. struct HasActorName: std::false_type { };
  333. template <typename T>
  334. struct HasActorName<T, decltype((void)T::ActorName, (const char*)nullptr)>: std::true_type { };
  335. static ui32 GetActivityTypeIndex() {
  336. if constexpr(HasActorName<TDerived>::value) {
  337. return TLocalProcessKey<TActorActivityTag, TDerived::ActorName>::GetIndex();
  338. } else {
  339. using TActorActivity = decltype(((TDerived*)nullptr)->ActorActivityType());
  340. // if constexpr(std::is_enum<TActorActivity>::value) {
  341. return TEnumProcessKey<TActorActivityTag, TActorActivity>::GetIndex(
  342. TDerived::ActorActivityType());
  343. //} else {
  344. // for int, ui32, ...
  345. // return TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(
  346. // static_cast<IActor::EActorActivity>(TDerived::ActorActivityType()));
  347. //}
  348. }
  349. }
  350. protected:
  351. //* Comment this function to find unmarked activities
  352. static constexpr IActor::EActivityType ActorActivityType() {
  353. return EActorActivity::OTHER;
  354. } //*/
  355. // static constexpr char ActorName[] = "UNNAMED";
  356. TActor(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx), ui32 activityType = GetActivityTypeIndex())
  357. : IActor(static_cast<TReceiveFunc>(func), activityType)
  358. { }
  359. public:
  360. typedef TDerived TThis;
  361. };
  // Parameter list of a state function: the event handle `ev` and the actor context `ctx`.
  #define STFUNC_SIG TAutoPtr< ::NActors::IEventHandle>&ev, const ::NActors::TActorContext &ctx
  // Parameter list consisting of the event handle `ev` only.
  #define STATEFN_SIG TAutoPtr<::NActors::IEventHandle>& ev
  // Declares a state function named funcName with parameters `ev` and `ctx`.
  #define STFUNC(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ctx)
  // Declares a state function with the same signature as STFUNC, but the
  // context parameter is left unnamed, so only `ev` is usable in the body.
  #define STATEFN(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& )
  // Defines a state function NAME that switches on ev->GetTypeRewrite();
  // HANDLERS supplies the case labels. An event type matched by none of them
  // reaches the default branch, which trips Y_VERIFY_DEBUG with the function
  // name and the type printed in hex.
  // NOTE(review): Y_VERIFY_DEBUG is presumably active in debug builds only - confirm.
  #define STRICT_STFUNC(NAME, HANDLERS) \
  void NAME(STFUNC_SIG) { \
  Y_UNUSED(ctx); \
  switch (const ui32 etype = ev->GetTypeRewrite()) { \
  HANDLERS \
  default: \
  Y_VERIFY_DEBUG(false, "%s: unexpected message type 0x%08" PRIx32, __func__, etype); \
  } \
  }
  375. inline const TActorContext& TActivationContext::AsActorContext() {
  376. TActivationContext* tls = TlsActivationContext;
  377. return *static_cast<TActorContext*>(tls);
  378. }
  379. inline TActorContext TActivationContext::ActorContextFor(TActorId id) {
  380. auto& tls = *TlsActivationContext;
  381. return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id);
  382. }
  383. class TDecorator : public IActor {
  384. protected:
  385. THolder<IActor> Actor;
  386. public:
  387. TDecorator(THolder<IActor>&& actor)
  388. : IActor(static_cast<TReceiveFunc>(&TDecorator::State), actor->GetActivityType())
  389. , Actor(std::move(actor))
  390. {
  391. }
  392. void Registered(TActorSystem* sys, const TActorId& owner) override {
  393. Actor->ChangeSelfId(SelfId());
  394. Actor->Registered(sys, owner);
  395. }
  396. virtual bool DoBeforeReceiving(TAutoPtr<IEventHandle>& /*ev*/, const TActorContext& /*ctx*/) {
  397. return true;
  398. }
  399. virtual void DoAfterReceiving(const TActorContext& /*ctx*/)
  400. {
  401. }
  402. STFUNC(State) {
  403. if (DoBeforeReceiving(ev, ctx)) {
  404. Actor->Receive(ev, ctx);
  405. DoAfterReceiving(ctx);
  406. }
  407. }
  408. };
  409. // TTestDecorator doesn't work with the real actor system
  410. struct TTestDecorator : public TDecorator {
  411. TTestDecorator(THolder<IActor>&& actor)
  412. : TDecorator(std::move(actor))
  413. {
  414. }
  415. virtual ~TTestDecorator() = default;
  416. // This method must be called in the test actor system
  417. bool BeforeSending(TAutoPtr<IEventHandle>& ev)
  418. {
  419. bool send = true;
  420. TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(Actor.Get());
  421. if (decorator) {
  422. send = decorator->BeforeSending(ev);
  423. }
  424. return send && ev && DoBeforeSending(ev);
  425. }
  426. virtual bool DoBeforeSending(TAutoPtr<IEventHandle>& /*ev*/) {
  427. return true;
  428. }
  429. };
  430. }
  431. template <>
  432. inline void Out<NActors::TActorIdentity>(IOutputStream& o, const NActors::TActorIdentity& x) {
  433. return x.Out(o);
  434. }
  435. template <>
  436. struct THash<NActors::TActorIdentity> {
  437. inline ui64 operator()(const NActors::TActorIdentity& x) const {
  438. return x.Hash();
  439. }
  440. };