message.h 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. #pragma once
  2. #include "base.h"
  3. #include "local_flags.h"
  4. #include "message_status.h"
  5. #include "netaddr.h"
  6. #include "socket_addr.h"
  7. #include <util/generic/array_ref.h>
  8. #include <util/generic/noncopyable.h>
  9. #include <util/generic/ptr.h>
  10. #include <util/generic/string.h>
  11. #include <util/system/defaults.h>
  12. #include <util/system/type_name.h>
  13. #include <util/system/yassert.h>
  14. #include <optional>
  15. #include <typeinfo>
  16. namespace NBus {
  17. ///////////////////////////////////////////////////////////////////
  18. /// \brief Structure to preserve identity from message to reply
  19. struct TBusIdentity : TNonCopyable {
  20. friend class TBusMessage;
  21. friend class NPrivate::TRemoteServerSession;
  22. friend struct NPrivate::TClientRequestImpl;
  23. friend class TOnMessageContext;
  24. // TODO: make private
  25. TBusKey MessageId;
  26. private:
  27. ui32 Size;
  28. TIntrusivePtr<NPrivate::TRemoteServerConnection> Connection;
  29. ui16 Flags;
  30. ui32 LocalFlags;
  31. TInstant RecvTime;
  32. #ifndef NDEBUG
  33. std::optional<TString> MessageType;
  34. #endif
  35. private:
  36. // TODO: drop
  37. TNetAddr GetNetAddr() const;
  38. public:
  39. void Pack(char* dest);
  40. void Unpack(const char* src);
  41. bool IsInWork() const {
  42. return LocalFlags & NPrivate::MESSAGE_IN_WORK;
  43. }
  44. // for internal use only
  45. void BeginWork() {
  46. SetInWork(true);
  47. }
  48. // for internal use only
  49. void EndWork() {
  50. SetInWork(false);
  51. }
  52. TBusIdentity();
  53. ~TBusIdentity();
  54. void Swap(TBusIdentity& that) {
  55. DoSwap(MessageId, that.MessageId);
  56. DoSwap(Size, that.Size);
  57. DoSwap(Connection, that.Connection);
  58. DoSwap(Flags, that.Flags);
  59. DoSwap(LocalFlags, that.LocalFlags);
  60. DoSwap(RecvTime, that.RecvTime);
  61. #ifndef NDEBUG
  62. DoSwap(MessageType, that.MessageType);
  63. #endif
  64. }
  65. TString ToString() const;
  66. private:
  67. void SetInWork(bool inWork) {
  68. if (LocalFlags == 0 && inWork) {
  69. LocalFlags = NPrivate::MESSAGE_IN_WORK;
  70. } else if (LocalFlags == NPrivate::MESSAGE_IN_WORK && !inWork) {
  71. LocalFlags = 0;
  72. } else {
  73. Y_ABORT("impossible combination of flag and parameter: %s %d",
  74. inWork ? "true" : "false", unsigned(LocalFlags));
  75. }
  76. }
  77. void SetMessageType(const std::type_info& messageTypeInfo) {
  78. #ifndef NDEBUG
  79. Y_ABORT_UNLESS(!MessageType, "state check");
  80. MessageType = TypeName(messageTypeInfo);
  81. #else
  82. Y_UNUSED(messageTypeInfo);
  83. #endif
  84. }
  85. };
  86. static const size_t BUS_IDENTITY_PACKED_SIZE = sizeof(TBusIdentity);
  87. ///////////////////////////////////////////////////////////////
  88. /// \brief Message flags in TBusHeader.Flags
  89. enum EMessageFlags {
  90. MESSAGE_COMPRESS_INTERNAL = 0x8000, ///< message is compressed
  91. MESSAGE_COMPRESS_RESPONSE = 0x4000, ///< message prefers compressed response
  92. MESSAGE_VERSION_INTERNAL = 0x00F0, ///< these bits are used as version
  93. };
  94. //////////////////////////////////////////////////////////
  95. /// \brief Message header present in all message send and received
  96. /// This header is send into the wire.
  97. /// \todo fix for low/high end, 32/64bit some day
  98. #pragma pack(1)
  99. struct TBusHeader {
  100. friend class TBusMessage;
  101. TBusKey Id = 0; ///< unique message ID
  102. ui32 Size = 0; ///< total size of the message
  103. TBusInstant SendTime = 0; ///< time the message was sent
  104. ui16 FlagsInternal = 0; ///< TRACE is one of the flags
  105. ui16 Type = 0; ///< to be used by TBusProtocol
  106. int GetVersionInternal() {
  107. return (FlagsInternal & MESSAGE_VERSION_INTERNAL) >> 4;
  108. }
  109. void SetVersionInternal(unsigned ver = YBUS_VERSION) {
  110. FlagsInternal |= (ver << 4);
  111. }
  112. public:
  113. TBusHeader() {
  114. }
  115. TBusHeader(TArrayRef<const char> data) {
  116. ReadHeader(data);
  117. }
  118. private:
  119. /// function for serialization/deserialization of the header
  120. /// returns number of bytes written/read
  121. int ReadHeader(TArrayRef<const char> data);
  122. void GenerateId();
  123. };
  124. #pragma pack()
  125. #define TBUSMAX_MESSAGE 26 * 1024 * 1024 + sizeof(NBus::TBusHeader) ///< is't it enough?
  126. #define TBUSMIN_MESSAGE sizeof(NBus::TBusHeader) ///< can't be less then header
  127. inline bool IsVersionNegotiation(const NBus::TBusHeader& header) {
  128. return header.Id == 0 && header.Size == sizeof(TBusHeader);
  129. }
  130. //////////////////////////////////////////////////////////
  131. /// \brief Base class for all messages passed in the system
  132. enum ECreateUninitialized {
  133. MESSAGE_CREATE_UNINITIALIZED,
  134. };
  135. class TBusMessage
  136. : protected TBusHeader,
  137. public TRefCounted<TBusMessage, TAtomicCounter, TDelete>,
  138. private TNonCopyable {
  139. friend class TLocalSession;
  140. friend struct ::NBus::NPrivate::TBusSessionImpl;
  141. friend class ::NBus::NPrivate::TRemoteServerSession;
  142. friend class ::NBus::NPrivate::TRemoteClientSession;
  143. friend class ::NBus::NPrivate::TRemoteConnection;
  144. friend class ::NBus::NPrivate::TRemoteClientConnection;
  145. friend class ::NBus::NPrivate::TRemoteServerConnection;
  146. friend struct ::NBus::NPrivate::TBusMessagePtrAndHeader;
  147. private:
  148. ui32 LocalFlags;
  149. /// connection identity for reply set by PushMessage()
  150. NPrivate::TBusSocketAddr ReplyTo;
  151. // server-side response only, hack
  152. ui32 RequestSize;
  153. TInstant RecvTime;
  154. public:
  155. /// constructor to create messages on sending end
  156. TBusMessage(ui16 type, int approxsize = sizeof(TBusHeader));
  157. /// constructor with serialzed data to examine the header
  158. TBusMessage(ECreateUninitialized);
  159. // slow, for diagnostics only
  160. virtual TString Describe() const;
  161. // must be called if this message object needs to be reused
  162. void Reset();
  163. void CheckClean() const;
  164. void SetCompressed(bool);
  165. void SetCompressedResponse(bool);
  166. private:
  167. bool IsCompressed() const {
  168. return FlagsInternal & MESSAGE_COMPRESS_INTERNAL;
  169. }
  170. bool IsCompressedResponse() const {
  171. return FlagsInternal & MESSAGE_COMPRESS_RESPONSE;
  172. }
  173. public:
  174. /// can have private data to destroy
  175. virtual ~TBusMessage();
  176. /// returns header of the message
  177. TBusHeader* GetHeader() {
  178. return this;
  179. }
  180. const TBusHeader* GetHeader() const {
  181. return this;
  182. }
  183. /// helper to return type for protocol object to unpack object
  184. static ui16 GetType(TArrayRef<const char> data) {
  185. return TBusHeader(data).Type;
  186. }
  187. /// returns payload data
  188. static TArrayRef<const char> GetPayload(TArrayRef<const char> data) {
  189. return data.Slice(sizeof(TBusHeader));
  190. }
  191. private:
  192. void DoReset();
  193. /// serialize message identity to be used to construct reply message
  194. void GetIdentity(TBusIdentity& ident) const;
  195. /// set message identity from serialized form
  196. void SetIdentity(const TBusIdentity& ident);
  197. public:
  198. TNetAddr GetReplyTo() const {
  199. return ReplyTo.ToNetAddr();
  200. }
  201. /// store of application specific data, never serialized into wire
  202. void* Data;
  203. };
  204. class TBusMessageAutoPtr: public TAutoPtr<TBusMessage> {
  205. public:
  206. TBusMessageAutoPtr() {
  207. }
  208. TBusMessageAutoPtr(TBusMessage* message)
  209. : TAutoPtr<TBusMessage>(message)
  210. {
  211. }
  212. template <typename T1>
  213. TBusMessageAutoPtr(const TAutoPtr<T1>& that)
  214. : TAutoPtr<TBusMessage>(that.Release())
  215. {
  216. }
  217. };
  218. }