123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- #pragma once
- #include "base.h"
- #include "local_flags.h"
- #include "message_status.h"
- #include "netaddr.h"
- #include "socket_addr.h"
- #include <util/generic/array_ref.h>
- #include <util/generic/noncopyable.h>
- #include <util/generic/ptr.h>
- #include <util/generic/string.h>
- #include <util/system/defaults.h>
- #include <util/system/type_name.h>
- #include <util/system/yassert.h>
- #include <optional>
- #include <typeinfo>
- namespace NBus {
- ///////////////////////////////////////////////////////////////////
- /// \brief Structure to preserve identity from message to reply
- struct TBusIdentity : TNonCopyable {
- friend class TBusMessage;
- friend class NPrivate::TRemoteServerSession;
- friend struct NPrivate::TClientRequestImpl;
- friend class TOnMessageContext;
- // TODO: make private
- TBusKey MessageId;
- private:
- ui32 Size;
- TIntrusivePtr<NPrivate::TRemoteServerConnection> Connection;
- ui16 Flags;
- ui32 LocalFlags;
- TInstant RecvTime;
- #ifndef NDEBUG
- std::optional<TString> MessageType;
- #endif
- private:
- // TODO: drop
- TNetAddr GetNetAddr() const;
- public:
- void Pack(char* dest);
- void Unpack(const char* src);
- bool IsInWork() const {
- return LocalFlags & NPrivate::MESSAGE_IN_WORK;
- }
- // for internal use only
- void BeginWork() {
- SetInWork(true);
- }
- // for internal use only
- void EndWork() {
- SetInWork(false);
- }
- TBusIdentity();
- ~TBusIdentity();
- void Swap(TBusIdentity& that) {
- DoSwap(MessageId, that.MessageId);
- DoSwap(Size, that.Size);
- DoSwap(Connection, that.Connection);
- DoSwap(Flags, that.Flags);
- DoSwap(LocalFlags, that.LocalFlags);
- DoSwap(RecvTime, that.RecvTime);
- #ifndef NDEBUG
- DoSwap(MessageType, that.MessageType);
- #endif
- }
- TString ToString() const;
- private:
- void SetInWork(bool inWork) {
- if (LocalFlags == 0 && inWork) {
- LocalFlags = NPrivate::MESSAGE_IN_WORK;
- } else if (LocalFlags == NPrivate::MESSAGE_IN_WORK && !inWork) {
- LocalFlags = 0;
- } else {
- Y_ABORT("impossible combination of flag and parameter: %s %d",
- inWork ? "true" : "false", unsigned(LocalFlags));
- }
- }
- void SetMessageType(const std::type_info& messageTypeInfo) {
- #ifndef NDEBUG
- Y_ABORT_UNLESS(!MessageType, "state check");
- MessageType = TypeName(messageTypeInfo);
- #else
- Y_UNUSED(messageTypeInfo);
- #endif
- }
- };
- static const size_t BUS_IDENTITY_PACKED_SIZE = sizeof(TBusIdentity);
- ///////////////////////////////////////////////////////////////
- /// \brief Message flags in TBusHeader.Flags
- enum EMessageFlags {
- MESSAGE_COMPRESS_INTERNAL = 0x8000, ///< message is compressed
- MESSAGE_COMPRESS_RESPONSE = 0x4000, ///< message prefers compressed response
- MESSAGE_VERSION_INTERNAL = 0x00F0, ///< these bits are used as version
- };
- //////////////////////////////////////////////////////////
- /// \brief Message header present in all message send and received
- /// This header is send into the wire.
- /// \todo fix for low/high end, 32/64bit some day
- #pragma pack(1)
- struct TBusHeader {
- friend class TBusMessage;
- TBusKey Id = 0; ///< unique message ID
- ui32 Size = 0; ///< total size of the message
- TBusInstant SendTime = 0; ///< time the message was sent
- ui16 FlagsInternal = 0; ///< TRACE is one of the flags
- ui16 Type = 0; ///< to be used by TBusProtocol
- int GetVersionInternal() {
- return (FlagsInternal & MESSAGE_VERSION_INTERNAL) >> 4;
- }
- void SetVersionInternal(unsigned ver = YBUS_VERSION) {
- FlagsInternal |= (ver << 4);
- }
- public:
- TBusHeader() {
- }
- TBusHeader(TArrayRef<const char> data) {
- ReadHeader(data);
- }
- private:
- /// function for serialization/deserialization of the header
- /// returns number of bytes written/read
- int ReadHeader(TArrayRef<const char> data);
- void GenerateId();
- };
- #pragma pack()
- #define TBUSMAX_MESSAGE 26 * 1024 * 1024 + sizeof(NBus::TBusHeader) ///< is't it enough?
- #define TBUSMIN_MESSAGE sizeof(NBus::TBusHeader) ///< can't be less then header
- inline bool IsVersionNegotiation(const NBus::TBusHeader& header) {
- return header.Id == 0 && header.Size == sizeof(TBusHeader);
- }
- //////////////////////////////////////////////////////////
- /// \brief Base class for all messages passed in the system
- enum ECreateUninitialized {
- MESSAGE_CREATE_UNINITIALIZED,
- };
- class TBusMessage
- : protected TBusHeader,
- public TRefCounted<TBusMessage, TAtomicCounter, TDelete>,
- private TNonCopyable {
- friend class TLocalSession;
- friend struct ::NBus::NPrivate::TBusSessionImpl;
- friend class ::NBus::NPrivate::TRemoteServerSession;
- friend class ::NBus::NPrivate::TRemoteClientSession;
- friend class ::NBus::NPrivate::TRemoteConnection;
- friend class ::NBus::NPrivate::TRemoteClientConnection;
- friend class ::NBus::NPrivate::TRemoteServerConnection;
- friend struct ::NBus::NPrivate::TBusMessagePtrAndHeader;
- private:
- ui32 LocalFlags;
- /// connection identity for reply set by PushMessage()
- NPrivate::TBusSocketAddr ReplyTo;
- // server-side response only, hack
- ui32 RequestSize;
- TInstant RecvTime;
- public:
- /// constructor to create messages on sending end
- TBusMessage(ui16 type, int approxsize = sizeof(TBusHeader));
- /// constructor with serialzed data to examine the header
- TBusMessage(ECreateUninitialized);
- // slow, for diagnostics only
- virtual TString Describe() const;
- // must be called if this message object needs to be reused
- void Reset();
- void CheckClean() const;
- void SetCompressed(bool);
- void SetCompressedResponse(bool);
- private:
- bool IsCompressed() const {
- return FlagsInternal & MESSAGE_COMPRESS_INTERNAL;
- }
- bool IsCompressedResponse() const {
- return FlagsInternal & MESSAGE_COMPRESS_RESPONSE;
- }
- public:
- /// can have private data to destroy
- virtual ~TBusMessage();
- /// returns header of the message
- TBusHeader* GetHeader() {
- return this;
- }
- const TBusHeader* GetHeader() const {
- return this;
- }
- /// helper to return type for protocol object to unpack object
- static ui16 GetType(TArrayRef<const char> data) {
- return TBusHeader(data).Type;
- }
- /// returns payload data
- static TArrayRef<const char> GetPayload(TArrayRef<const char> data) {
- return data.Slice(sizeof(TBusHeader));
- }
- private:
- void DoReset();
- /// serialize message identity to be used to construct reply message
- void GetIdentity(TBusIdentity& ident) const;
- /// set message identity from serialized form
- void SetIdentity(const TBusIdentity& ident);
- public:
- TNetAddr GetReplyTo() const {
- return ReplyTo.ToNetAddr();
- }
- /// store of application specific data, never serialized into wire
- void* Data;
- };
- class TBusMessageAutoPtr: public TAutoPtr<TBusMessage> {
- public:
- TBusMessageAutoPtr() {
- }
- TBusMessageAutoPtr(TBusMessage* message)
- : TAutoPtr<TBusMessage>(message)
- {
- }
- template <typename T1>
- TBusMessageAutoPtr(const TAutoPtr<T1>& that)
- : TAutoPtr<TBusMessage>(that.Release())
- {
- }
- };
- }
|