123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- #pragma once
- #include "defs.h"
- #include "actorid.h"
- #include "callstack.h"
- #include "event_load.h"
- #include <library/cpp/actors/wilson/wilson_trace.h>
- #include <util/system/hp_timer.h>
- #include <util/generic/maybe.h>
- namespace NActors {
- class TChunkSerializer;
- class IActor;
- class ISerializerToStream {
- public:
- virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
- };
- class IEventBase
- : TNonCopyable,
- public ISerializerToStream {
- protected:
- // for compatibility with virtual actors
- virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandle> /*eventPtr*/) {
- Y_DEBUG_ABORT_UNLESS(false);
- return false;
- }
- public:
- // actual typing is performed by IEventHandle
- virtual ~IEventBase() {
- }
- bool Execute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) {
- return DoExecute(actor, std::move(eventPtr));
- }
- virtual TString ToStringHeader() const = 0;
- virtual TString ToString() const {
- return ToStringHeader();
- }
- virtual ui32 CalculateSerializedSize() const {
- return 0;
- }
- virtual ui32 Type() const = 0;
- virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
- virtual bool IsSerializable() const = 0;
- virtual ui32 CalculateSerializedSizeCached() const {
- return CalculateSerializedSize();
- }
- virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; }
- };
- // fat handle
- class IEventHandle : TNonCopyable {
- struct TOnNondelivery {
- TActorId Recipient;
- TOnNondelivery(const TActorId& recipient)
- : Recipient(recipient)
- {
- }
- };
- public:
- template <typename TEv>
- inline TEv* CastAsLocal() const noexcept {
- auto fits = GetTypeRewrite() == TEv::EventType;
- return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
- }
- template <typename TEventType>
- TEventType* Get() {
- if (Type != TEventType::EventType)
- Y_ABORT("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);
- if (!Event) {
- static TEventSerializedData empty;
- Event.Reset(TEventType::Load(Buffer ? Buffer.Get() : &empty));
- }
- if (Event) {
- return static_cast<TEventType*>(Event.Get());
- }
- Y_ABORT("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data());
- }
- template <typename T>
- TAutoPtr<T> Release() {
- TAutoPtr<T> x = Get<T>();
- Y_UNUSED(Event.Release());
- Buffer.Reset();
- return x;
- }
- enum EFlags: ui32 {
- FlagTrackDelivery = 1 << 0,
- FlagForwardOnNondelivery = 1 << 1,
- FlagSubscribeOnSession = 1 << 2,
- FlagUseSubChannel = 1 << 3,
- FlagGenerateUnsureUndelivered = 1 << 4,
- FlagExtendedFormat = 1 << 5,
- };
- using TEventFlags = ui32;
- const ui32 Type;
- const TEventFlags Flags;
- const TActorId Recipient;
- TActorId Sender;
- const ui64 Cookie;
- const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect
- // if set, used by ActorSystem/Interconnect to report tracepoints
- NWilson::TTraceId TraceId;
- // filled if feeded by interconnect session
- const TActorId InterconnectSession;
- #ifdef ACTORSLIB_COLLECT_EXEC_STATS
- ::NHPTimer::STime SendTime;
- #endif
- static const size_t ChannelBits = 12;
- static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
- #ifdef USE_ACTOR_CALLSTACK
- TCallstack Callstack;
- #endif
- ui16 GetChannel() const noexcept {
- return Flags >> ChannelShift;
- }
- ui64 GetSubChannel() const noexcept {
- return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL;
- }
- static ui32 MakeFlags(ui32 channel, TEventFlags flags) {
- Y_ABORT_UNLESS(channel < (1 << ChannelBits));
- Y_ABORT_UNLESS(flags < (1 << ChannelShift));
- return (flags | (channel << ChannelShift));
- }
- private:
- THolder<IEventBase> Event;
- TIntrusivePtr<TEventSerializedData> Buffer;
- TActorId RewriteRecipient;
- ui32 RewriteType;
- THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events
- public:
- void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
- RewriteRecipient = recipientRewrite;
- RewriteType = typeRewrite;
- }
- void DropRewrite() {
- RewriteRecipient = Recipient;
- RewriteType = Type;
- }
- const TActorId& GetRecipientRewrite() const {
- return RewriteRecipient;
- }
- ui32 GetTypeRewrite() const {
- return RewriteType;
- }
- TActorId GetForwardOnNondeliveryRecipient() const {
- return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
- }
- IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0,
- const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
- : Type(ev->Type())
- , Flags(flags)
- , Recipient(recipient)
- , Sender(sender)
- , Cookie(cookie)
- , TraceId(std::move(traceId))
- #ifdef ACTORSLIB_COLLECT_EXEC_STATS
- , SendTime(0)
- #endif
- , Event(ev)
- , RewriteRecipient(Recipient)
- , RewriteType(Type)
- {
- if (forwardOnNondelivery)
- OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
- }
- IEventHandle(ui32 type,
- TEventFlags flags,
- const TActorId& recipient,
- const TActorId& sender,
- TIntrusivePtr<TEventSerializedData> buffer,
- ui64 cookie,
- const TActorId* forwardOnNondelivery = nullptr,
- NWilson::TTraceId traceId = {})
- : Type(type)
- , Flags(flags)
- , Recipient(recipient)
- , Sender(sender)
- , Cookie(cookie)
- , TraceId(std::move(traceId))
- #ifdef ACTORSLIB_COLLECT_EXEC_STATS
- , SendTime(0)
- #endif
- , Buffer(std::move(buffer))
- , RewriteRecipient(Recipient)
- , RewriteType(Type)
- {
- if (forwardOnNondelivery)
- OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
- }
- // Special ctor for events from interconnect.
- IEventHandle(const TActorId& session,
- ui32 type,
- TEventFlags flags,
- const TActorId& recipient,
- const TActorId& sender,
- TIntrusivePtr<TEventSerializedData> buffer,
- ui64 cookie,
- TScopeId originScopeId,
- NWilson::TTraceId traceId) noexcept
- : Type(type)
- , Flags(flags)
- , Recipient(recipient)
- , Sender(sender)
- , Cookie(cookie)
- , OriginScopeId(originScopeId)
- , TraceId(std::move(traceId))
- , InterconnectSession(session)
- #ifdef ACTORSLIB_COLLECT_EXEC_STATS
- , SendTime(0)
- #endif
- , Buffer(std::move(buffer))
- , RewriteRecipient(Recipient)
- , RewriteType(Type)
- {
- }
- TIntrusivePtr<TEventSerializedData> GetChainBuffer();
- TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer();
- ui32 GetSize() const {
- if (Buffer) {
- return Buffer->GetSize();
- } else if (Event) {
- return Event->CalculateSerializedSize();
- } else {
- return 0;
- }
- }
- bool HasBuffer() const {
- return bool(Buffer);
- }
- bool HasEvent() const {
- return bool(Event);
- }
- IEventBase* GetBase() {
- if (!Event) {
- if (!Buffer)
- return nullptr;
- else
- ythrow TWithBackTrace<yexception>() << "don't know how to load the event from buffer";
- }
- return Event.Get();
- }
- TAutoPtr<IEventBase> ReleaseBase() {
- TAutoPtr<IEventBase> x = GetBase();
- Y_UNUSED(Event.Release());
- Buffer.Reset();
- return x;
- }
- TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
- if (Event)
- return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
- else
- return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId));
- }
- TString GetTypeName() const;
- TString ToString() const;
- [[nodiscard]] static std::unique_ptr<IEventHandle> Forward(std::unique_ptr<IEventHandle>&& ev, TActorId recipient);
- [[nodiscard]] static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandle>&& ev, ui32 reason, bool unsure = false);
- [[nodiscard]] static TAutoPtr<IEventHandle> Forward(TAutoPtr<IEventHandle>&& ev, TActorId recipient) {
- return Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release();
- }
- [[nodiscard]] static THolder<IEventHandle> Forward(THolder<IEventHandle>&& ev, TActorId recipient) {
- return THolder(Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release());
- }
- [[nodiscard]] static TAutoPtr<IEventHandle> ForwardOnNondelivery(TAutoPtr<IEventHandle>&& ev, ui32 reason, bool unsure = false) {
- return ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release();
- }
- [[nodiscard]] static THolder<IEventHandle> ForwardOnNondelivery(THolder<IEventHandle>&& ev, ui32 reason, bool unsure = false) {
- return THolder(ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release());
- }
- template<typename T>
- static TAutoPtr<T> Release(TAutoPtr<IEventHandle>& ev) {
- return ev->Release<T>();
- }
- template<typename T>
- static TAutoPtr<T> Release(THolder<IEventHandle>& ev) {
- return ev->Release<T>();
- }
- template <typename TEv>
- inline TEv* StaticCastAsLocal() const noexcept { // blind cast
- return static_cast<TEv*>(Event.Get());
- }
- };
- template <typename TEventType>
- class TEventHandle: public IEventHandle {
- TEventHandle(); // we never made instance of TEventHandle
- public:
- TEventType* Get() {
- return IEventHandle::Get<TEventType>();
- }
- TAutoPtr<TEventType> Release() {
- return IEventHandle::Release<TEventType>();
- }
- };
- static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)");
- template <typename TEventType, ui32 EventType0>
- class TEventBase: public IEventBase {
- public:
- static constexpr ui32 EventType = EventType0;
- ui32 Type() const override {
- return EventType0;
- }
- // still abstract
- typedef TEventHandle<TEventType> THandle;
- typedef TAutoPtr<THandle> TPtr;
- };
// Generates the boilerplate members of an event that never leaves the process.
// Use inside the event class body:
//   eventType - the event class name (only stringized into the abort messages);
//   header    - value used to construct the TString returned by ToStringHeader().
// The generated event reports IsSerializable() == false; both
// SerializeToArcadiaStream() and Load() abort when called.
#define DEFINE_SIMPLE_LOCAL_EVENT(eventType, header)                              \
    static IEventBase* Load(NActors::TEventSerializedData*) {                     \
        Y_ABORT("Local event " #eventType " has no load method");                 \
    }                                                                             \
    bool IsSerializable() const override {                                        \
        return false;                                                             \
    }                                                                             \
    bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override {    \
        Y_ABORT("Local event " #eventType " is not serializable");                \
    }                                                                             \
    TString ToStringHeader() const override {                                     \
        return TString(header);                                                   \
    }
// Generates the boilerplate members of a payload-free event that may be
// serialized. Use inside the event class body:
//   eventType - the event class name; Load() returns `new eventType()`;
//   header    - value used to construct the TString returned by ToStringHeader().
// The generated event reports IsSerializable() == true and its
// SerializeToArcadiaStream() writes nothing and returns true.
#define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header)                           \
    static IEventBase* Load(NActors::TEventSerializedData*) {                     \
        return new eventType();                                                   \
    }                                                                             \
    bool IsSerializable() const override {                                        \
        return true;                                                              \
    }                                                                             \
    bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override {    \
        return true;                                                              \
    }                                                                             \
    TString ToStringHeader() const override {                                     \
        return TString(header);                                                   \
    }
- }
|