event.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. #pragma once
  2. #include "defs.h"
  3. #include "actorid.h"
  4. #include "callstack.h"
  5. #include "event_load.h"
  6. #include <library/cpp/actors/wilson/wilson_trace.h>
  7. #include <util/system/hp_timer.h>
  8. #include <util/generic/maybe.h>
  9. namespace NActors {
  10. class TChunkSerializer;
  11. class ISerializerToStream {
  12. public:
  13. virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
  14. };
  15. class IEventBase
  16. : TNonCopyable,
  17. public ISerializerToStream {
  18. public:
  19. // actual typing is performed by IEventHandle
  20. virtual ~IEventBase() {
  21. }
  22. virtual TString ToStringHeader() const = 0;
  23. virtual TString ToString() const {
  24. return ToStringHeader();
  25. }
  26. virtual ui32 CalculateSerializedSize() const {
  27. return 0;
  28. }
  29. virtual ui32 Type() const = 0;
  30. virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
  31. virtual bool IsSerializable() const = 0;
  32. virtual bool IsExtendedFormat() const {
  33. return false;
  34. }
  35. virtual ui32 CalculateSerializedSizeCached() const {
  36. return CalculateSerializedSize();
  37. }
  38. };
  39. // fat handle
  40. class IEventHandle : TNonCopyable {
  41. struct TOnNondelivery {
  42. TActorId Recipient;
  43. TOnNondelivery(const TActorId& recipient)
  44. : Recipient(recipient)
  45. {
  46. }
  47. };
  48. public:
  49. template <typename TEv>
  50. inline TEv* CastAsLocal() const noexcept {
  51. auto fits = GetTypeRewrite() == TEv::EventType;
  52. return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
  53. }
  54. template <typename TEventType>
  55. TEventType* Get() {
  56. if (Type != TEventType::EventType)
  57. Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);
  58. if (!Event) {
  59. Event.Reset(TEventType::Load(Buffer.Get()));
  60. }
  61. if (Event) {
  62. return static_cast<TEventType*>(Event.Get());
  63. }
  64. Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data());
  65. }
  66. template <typename T>
  67. TAutoPtr<T> Release() {
  68. TAutoPtr<T> x = Get<T>();
  69. Y_UNUSED(Event.Release());
  70. Buffer.Reset();
  71. return x;
  72. }
  73. enum EFlags {
  74. FlagTrackDelivery = 1 << 0,
  75. FlagForwardOnNondelivery = 1 << 1,
  76. FlagSubscribeOnSession = 1 << 2,
  77. FlagUseSubChannel = 1 << 3,
  78. FlagGenerateUnsureUndelivered = 1 << 4,
  79. FlagExtendedFormat = 1 << 5,
  80. };
  81. const ui32 Type;
  82. const ui32 Flags;
  83. const TActorId Recipient;
  84. const TActorId Sender;
  85. const ui64 Cookie;
  86. const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect
  87. // if set, used by ActorSystem/Interconnect to report tracepoints
  88. NWilson::TTraceId TraceId;
  89. // filled if feeded by interconnect session
  90. const TActorId InterconnectSession;
  91. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  92. ::NHPTimer::STime SendTime;
  93. #endif
  94. static const size_t ChannelBits = 12;
  95. static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
  96. #ifdef USE_ACTOR_CALLSTACK
  97. TCallstack Callstack;
  98. #endif
  99. ui16 GetChannel() const noexcept {
  100. return Flags >> ChannelShift;
  101. }
  102. ui64 GetSubChannel() const noexcept {
  103. return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL;
  104. }
  105. static ui32 MakeFlags(ui32 channel, ui32 flags) {
  106. Y_VERIFY(channel < (1 << ChannelBits));
  107. Y_VERIFY(flags < (1 << ChannelShift));
  108. return (flags | (channel << ChannelShift));
  109. }
  110. private:
  111. THolder<IEventBase> Event;
  112. TIntrusivePtr<TEventSerializedData> Buffer;
  113. TActorId RewriteRecipient;
  114. ui32 RewriteType;
  115. THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events
  116. public:
  117. void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
  118. RewriteRecipient = recipientRewrite;
  119. RewriteType = typeRewrite;
  120. }
  121. void DropRewrite() {
  122. RewriteRecipient = Recipient;
  123. RewriteType = Type;
  124. }
  125. const TActorId& GetRecipientRewrite() const {
  126. return RewriteRecipient;
  127. }
  128. ui32 GetTypeRewrite() const {
  129. return RewriteType;
  130. }
  131. TActorId GetForwardOnNondeliveryRecipient() const {
  132. return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
  133. }
  134. IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0,
  135. const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
  136. : Type(ev->Type())
  137. , Flags(flags)
  138. , Recipient(recipient)
  139. , Sender(sender)
  140. , Cookie(cookie)
  141. , TraceId(std::move(traceId))
  142. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  143. , SendTime(0)
  144. #endif
  145. , Event(ev)
  146. , RewriteRecipient(Recipient)
  147. , RewriteType(Type)
  148. {
  149. if (forwardOnNondelivery)
  150. OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
  151. }
  152. IEventHandle(ui32 type,
  153. ui32 flags,
  154. const TActorId& recipient,
  155. const TActorId& sender,
  156. TIntrusivePtr<TEventSerializedData> buffer,
  157. ui64 cookie,
  158. const TActorId* forwardOnNondelivery = nullptr,
  159. NWilson::TTraceId traceId = {})
  160. : Type(type)
  161. , Flags(flags)
  162. , Recipient(recipient)
  163. , Sender(sender)
  164. , Cookie(cookie)
  165. , TraceId(std::move(traceId))
  166. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  167. , SendTime(0)
  168. #endif
  169. , Buffer(std::move(buffer))
  170. , RewriteRecipient(Recipient)
  171. , RewriteType(Type)
  172. {
  173. if (forwardOnNondelivery)
  174. OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
  175. }
  176. // Special ctor for events from interconnect.
  177. IEventHandle(const TActorId& session,
  178. ui32 type,
  179. ui32 flags,
  180. const TActorId& recipient,
  181. const TActorId& sender,
  182. TIntrusivePtr<TEventSerializedData> buffer,
  183. ui64 cookie,
  184. TScopeId originScopeId,
  185. NWilson::TTraceId traceId) noexcept
  186. : Type(type)
  187. , Flags(flags)
  188. , Recipient(recipient)
  189. , Sender(sender)
  190. , Cookie(cookie)
  191. , OriginScopeId(originScopeId)
  192. , TraceId(std::move(traceId))
  193. , InterconnectSession(session)
  194. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  195. , SendTime(0)
  196. #endif
  197. , Buffer(std::move(buffer))
  198. , RewriteRecipient(Recipient)
  199. , RewriteType(Type)
  200. {
  201. }
  202. TIntrusivePtr<TEventSerializedData> GetChainBuffer();
  203. TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer();
  204. ui32 GetSize() const {
  205. if (Buffer) {
  206. return Buffer->GetSize();
  207. } else if (Event) {
  208. return Event->CalculateSerializedSize();
  209. } else {
  210. return 0;
  211. }
  212. }
  213. bool HasBuffer() const {
  214. return bool(Buffer);
  215. }
  216. bool HasEvent() const {
  217. return bool(Event);
  218. }
  219. IEventBase* GetBase() {
  220. if (!Event) {
  221. if (!Buffer)
  222. return nullptr;
  223. else
  224. ythrow TWithBackTrace<yexception>() << "don't know how to load the event from buffer";
  225. }
  226. return Event.Get();
  227. }
  228. TAutoPtr<IEventBase> ReleaseBase() {
  229. TAutoPtr<IEventBase> x = GetBase();
  230. Y_UNUSED(Event.Release());
  231. Buffer.Reset();
  232. return x;
  233. }
  234. TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
  235. if (Event)
  236. return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
  237. else
  238. return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId));
  239. }
  240. TAutoPtr<IEventHandle> ForwardOnNondelivery(ui32 reason, bool unsure = false);
  241. };
  242. template <typename TEventType>
  243. class TEventHandle: public IEventHandle {
  244. TEventHandle(); // we never made instance of TEventHandle
  245. public:
  246. TEventType* Get() {
  247. return IEventHandle::Get<TEventType>();
  248. }
  249. TAutoPtr<TEventType> Release() {
  250. return IEventHandle::Release<TEventType>();
  251. }
  252. };
  253. static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)");
  254. template <typename TEventType, ui32 EventType0>
  255. class TEventBase: public IEventBase {
  256. public:
  257. static constexpr ui32 EventType = EventType0;
  258. ui32 Type() const override {
  259. return EventType0;
  260. }
  261. // still abstract
  262. typedef TEventHandle<TEventType> THandle;
  263. typedef TAutoPtr<THandle> TPtr;
  264. };
  265. #define DEFINE_SIMPLE_LOCAL_EVENT(eventType, header) \
  266. TString ToStringHeader() const override { \
  267. return TString(header); \
  268. } \
  269. bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \
  270. Y_FAIL("Local event " #eventType " is not serializable"); \
  271. } \
  272. static IEventBase* Load(NActors::TEventSerializedData*) { \
  273. Y_FAIL("Local event " #eventType " has no load method"); \
  274. } \
  275. bool IsSerializable() const override { \
  276. return false; \
  277. }
  278. #define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header) \
  279. TString ToStringHeader() const override { \
  280. return TString(header); \
  281. } \
  282. bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \
  283. return true; \
  284. } \
  285. static IEventBase* Load(NActors::TEventSerializedData*) { \
  286. return new eventType(); \
  287. } \
  288. bool IsSerializable() const override { \
  289. return true; \
  290. }
  291. }