event.h 13 KB


  1. #pragma once
  2. #include "defs.h"
  3. #include "actorid.h"
  4. #include "callstack.h"
  5. #include "event_load.h"
  6. #include <library/cpp/actors/wilson/wilson_trace.h>
  7. #include <util/system/hp_timer.h>
  8. #include <util/generic/maybe.h>
  9. namespace NActors {
  10. class TChunkSerializer;
  11. class IActor;
  12. class ISerializerToStream {
  13. public:
  14. virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
  15. };
  16. class IEventBase
  17. : TNonCopyable,
  18. public ISerializerToStream {
  19. protected:
  20. // for compatibility with virtual actors
  21. virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandle> /*eventPtr*/) {
  22. Y_DEBUG_ABORT_UNLESS(false);
  23. return false;
  24. }
  25. public:
  26. // actual typing is performed by IEventHandle
  27. virtual ~IEventBase() {
  28. }
  29. bool Execute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) {
  30. return DoExecute(actor, std::move(eventPtr));
  31. }
  32. virtual TString ToStringHeader() const = 0;
  33. virtual TString ToString() const {
  34. return ToStringHeader();
  35. }
  36. virtual ui32 CalculateSerializedSize() const {
  37. return 0;
  38. }
  39. virtual ui32 Type() const = 0;
  40. virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
  41. virtual bool IsSerializable() const = 0;
  42. virtual ui32 CalculateSerializedSizeCached() const {
  43. return CalculateSerializedSize();
  44. }
  45. virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; }
  46. };
  47. // fat handle
  48. class IEventHandle : TNonCopyable {
  49. struct TOnNondelivery {
  50. TActorId Recipient;
  51. TOnNondelivery(const TActorId& recipient)
  52. : Recipient(recipient)
  53. {
  54. }
  55. };
  56. public:
  57. template <typename TEv>
  58. inline TEv* CastAsLocal() const noexcept {
  59. auto fits = GetTypeRewrite() == TEv::EventType;
  60. return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
  61. }
  62. template <typename TEventType>
  63. TEventType* Get() {
  64. if (Type != TEventType::EventType)
  65. Y_ABORT("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);
  66. if (!Event) {
  67. static TEventSerializedData empty;
  68. Event.Reset(TEventType::Load(Buffer ? Buffer.Get() : &empty));
  69. }
  70. if (Event) {
  71. return static_cast<TEventType*>(Event.Get());
  72. }
  73. Y_ABORT("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data());
  74. }
  75. template <typename T>
  76. TAutoPtr<T> Release() {
  77. TAutoPtr<T> x = Get<T>();
  78. Y_UNUSED(Event.Release());
  79. Buffer.Reset();
  80. return x;
  81. }
  82. enum EFlags: ui32 {
  83. FlagTrackDelivery = 1 << 0,
  84. FlagForwardOnNondelivery = 1 << 1,
  85. FlagSubscribeOnSession = 1 << 2,
  86. FlagUseSubChannel = 1 << 3,
  87. FlagGenerateUnsureUndelivered = 1 << 4,
  88. FlagExtendedFormat = 1 << 5,
  89. };
  90. using TEventFlags = ui32;
  91. const ui32 Type;
  92. const TEventFlags Flags;
  93. const TActorId Recipient;
  94. TActorId Sender;
  95. const ui64 Cookie;
  96. const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect
  97. // if set, used by ActorSystem/Interconnect to report tracepoints
  98. NWilson::TTraceId TraceId;
  99. // filled if feeded by interconnect session
  100. const TActorId InterconnectSession;
  101. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  102. ::NHPTimer::STime SendTime;
  103. #endif
  104. static const size_t ChannelBits = 12;
  105. static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
  106. #ifdef USE_ACTOR_CALLSTACK
  107. TCallstack Callstack;
  108. #endif
  109. ui16 GetChannel() const noexcept {
  110. return Flags >> ChannelShift;
  111. }
  112. ui64 GetSubChannel() const noexcept {
  113. return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL;
  114. }
  115. static ui32 MakeFlags(ui32 channel, TEventFlags flags) {
  116. Y_ABORT_UNLESS(channel < (1 << ChannelBits));
  117. Y_ABORT_UNLESS(flags < (1 << ChannelShift));
  118. return (flags | (channel << ChannelShift));
  119. }
  120. private:
  121. THolder<IEventBase> Event;
  122. TIntrusivePtr<TEventSerializedData> Buffer;
  123. TActorId RewriteRecipient;
  124. ui32 RewriteType;
  125. THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events
  126. public:
  127. void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
  128. RewriteRecipient = recipientRewrite;
  129. RewriteType = typeRewrite;
  130. }
  131. void DropRewrite() {
  132. RewriteRecipient = Recipient;
  133. RewriteType = Type;
  134. }
  135. const TActorId& GetRecipientRewrite() const {
  136. return RewriteRecipient;
  137. }
  138. ui32 GetTypeRewrite() const {
  139. return RewriteType;
  140. }
  141. TActorId GetForwardOnNondeliveryRecipient() const {
  142. return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
  143. }
  144. IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0,
  145. const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
  146. : Type(ev->Type())
  147. , Flags(flags)
  148. , Recipient(recipient)
  149. , Sender(sender)
  150. , Cookie(cookie)
  151. , TraceId(std::move(traceId))
  152. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  153. , SendTime(0)
  154. #endif
  155. , Event(ev)
  156. , RewriteRecipient(Recipient)
  157. , RewriteType(Type)
  158. {
  159. if (forwardOnNondelivery)
  160. OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
  161. }
  162. IEventHandle(ui32 type,
  163. TEventFlags flags,
  164. const TActorId& recipient,
  165. const TActorId& sender,
  166. TIntrusivePtr<TEventSerializedData> buffer,
  167. ui64 cookie,
  168. const TActorId* forwardOnNondelivery = nullptr,
  169. NWilson::TTraceId traceId = {})
  170. : Type(type)
  171. , Flags(flags)
  172. , Recipient(recipient)
  173. , Sender(sender)
  174. , Cookie(cookie)
  175. , TraceId(std::move(traceId))
  176. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  177. , SendTime(0)
  178. #endif
  179. , Buffer(std::move(buffer))
  180. , RewriteRecipient(Recipient)
  181. , RewriteType(Type)
  182. {
  183. if (forwardOnNondelivery)
  184. OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
  185. }
  186. // Special ctor for events from interconnect.
  187. IEventHandle(const TActorId& session,
  188. ui32 type,
  189. TEventFlags flags,
  190. const TActorId& recipient,
  191. const TActorId& sender,
  192. TIntrusivePtr<TEventSerializedData> buffer,
  193. ui64 cookie,
  194. TScopeId originScopeId,
  195. NWilson::TTraceId traceId) noexcept
  196. : Type(type)
  197. , Flags(flags)
  198. , Recipient(recipient)
  199. , Sender(sender)
  200. , Cookie(cookie)
  201. , OriginScopeId(originScopeId)
  202. , TraceId(std::move(traceId))
  203. , InterconnectSession(session)
  204. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  205. , SendTime(0)
  206. #endif
  207. , Buffer(std::move(buffer))
  208. , RewriteRecipient(Recipient)
  209. , RewriteType(Type)
  210. {
  211. }
  212. TIntrusivePtr<TEventSerializedData> GetChainBuffer();
  213. TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer();
  214. ui32 GetSize() const {
  215. if (Buffer) {
  216. return Buffer->GetSize();
  217. } else if (Event) {
  218. return Event->CalculateSerializedSize();
  219. } else {
  220. return 0;
  221. }
  222. }
  223. bool HasBuffer() const {
  224. return bool(Buffer);
  225. }
  226. bool HasEvent() const {
  227. return bool(Event);
  228. }
  229. IEventBase* GetBase() {
  230. if (!Event) {
  231. if (!Buffer)
  232. return nullptr;
  233. else
  234. ythrow TWithBackTrace<yexception>() << "don't know how to load the event from buffer";
  235. }
  236. return Event.Get();
  237. }
  238. TAutoPtr<IEventBase> ReleaseBase() {
  239. TAutoPtr<IEventBase> x = GetBase();
  240. Y_UNUSED(Event.Release());
  241. Buffer.Reset();
  242. return x;
  243. }
  244. TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
  245. if (Event)
  246. return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
  247. else
  248. return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId));
  249. }
  250. TString GetTypeName() const;
  251. TString ToString() const;
  252. [[nodiscard]] static std::unique_ptr<IEventHandle> Forward(std::unique_ptr<IEventHandle>&& ev, TActorId recipient);
  253. [[nodiscard]] static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandle>&& ev, ui32 reason, bool unsure = false);
  254. [[nodiscard]] static TAutoPtr<IEventHandle> Forward(TAutoPtr<IEventHandle>&& ev, TActorId recipient) {
  255. return Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release();
  256. }
  257. [[nodiscard]] static THolder<IEventHandle> Forward(THolder<IEventHandle>&& ev, TActorId recipient) {
  258. return THolder(Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release());
  259. }
  260. [[nodiscard]] static TAutoPtr<IEventHandle> ForwardOnNondelivery(TAutoPtr<IEventHandle>&& ev, ui32 reason, bool unsure = false) {
  261. return ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release();
  262. }
  263. [[nodiscard]] static THolder<IEventHandle> ForwardOnNondelivery(THolder<IEventHandle>&& ev, ui32 reason, bool unsure = false) {
  264. return THolder(ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release());
  265. }
  266. template<typename T>
  267. static TAutoPtr<T> Release(TAutoPtr<IEventHandle>& ev) {
  268. return ev->Release<T>();
  269. }
  270. template<typename T>
  271. static TAutoPtr<T> Release(THolder<IEventHandle>& ev) {
  272. return ev->Release<T>();
  273. }
  274. template <typename TEv>
  275. inline TEv* StaticCastAsLocal() const noexcept { // blind cast
  276. return static_cast<TEv*>(Event.Get());
  277. }
  278. };
  279. template <typename TEventType>
  280. class TEventHandle: public IEventHandle {
  281. TEventHandle(); // we never made instance of TEventHandle
  282. public:
  283. TEventType* Get() {
  284. return IEventHandle::Get<TEventType>();
  285. }
  286. TAutoPtr<TEventType> Release() {
  287. return IEventHandle::Release<TEventType>();
  288. }
  289. };
  290. static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)");
  291. template <typename TEventType, ui32 EventType0>
  292. class TEventBase: public IEventBase {
  293. public:
  294. static constexpr ui32 EventType = EventType0;
  295. ui32 Type() const override {
  296. return EventType0;
  297. }
  298. // still abstract
  299. typedef TEventHandle<TEventType> THandle;
  300. typedef TAutoPtr<THandle> TPtr;
  301. };
  // Expands, inside an event class body, to the IEventBase boilerplate of a
  // non-serializable event:
  //   - ToStringHeader() returns TString(header);
  //   - SerializeToArcadiaStream() and Load() abort with a message naming eventType;
  //   - IsSerializable() returns false.
  // Every library type is spelled with its NActors:: qualification so the macro
  // also works where the unqualified base-class name is not visible, e.g. in a
  // class template with a dependent base declared outside namespace NActors.
  #define DEFINE_SIMPLE_LOCAL_EVENT(eventType, header)                              \
      TString ToStringHeader() const override {                                     \
          return TString(header);                                                   \
      }                                                                             \
      bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override {    \
          Y_ABORT("Local event " #eventType " is not serializable");                \
      }                                                                             \
      static NActors::IEventBase* Load(NActors::TEventSerializedData*) {            \
          Y_ABORT("Local event " #eventType " has no load method");                 \
      }                                                                             \
      bool IsSerializable() const override {                                        \
          return false;                                                             \
      }
  // Expands, inside an event class body, to the IEventBase boilerplate of a
  // serializable event that carries no payload:
  //   - ToStringHeader() returns TString(header);
  //   - SerializeToArcadiaStream() writes nothing and returns true;
  //   - Load() ignores the buffer and returns a default-constructed eventType;
  //   - IsSerializable() returns true.
  // Every library type is spelled with its NActors:: qualification so the macro
  // also works where the unqualified base-class name is not visible, e.g. in a
  // class template with a dependent base declared outside namespace NActors.
  #define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header)                           \
      TString ToStringHeader() const override {                                     \
          return TString(header);                                                   \
      }                                                                             \
      bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override {    \
          return true;                                                              \
      }                                                                             \
      static NActors::IEventBase* Load(NActors::TEventSerializedData*) {            \
          return new eventType();                                                   \
      }                                                                             \
      bool IsSerializable() const override {                                        \
          return true;                                                              \
      }
  328. }