message.cpp 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. #include "remote_server_connection.h"
  2. #include "ybus.h"
  3. #include <util/random/random.h>
  4. #include <util/string/printf.h>
  5. #include <library/cpp/deprecated/atomic/atomic.h>
  6. #include <string.h>
  7. using namespace NBus;
  8. namespace NBus {
  9. using namespace NBus::NPrivate;
  10. TBusIdentity::TBusIdentity()
  11. : MessageId(0)
  12. , Size(0)
  13. , Flags(0)
  14. , LocalFlags(0)
  15. {
  16. }
  17. TBusIdentity::~TBusIdentity() {
  18. // TODO: print local flags
  19. #ifndef NDEBUG
  20. Y_ABORT_UNLESS(LocalFlags == 0, "local flags must be zero at this point; message type is %s",
  21. MessageType.value_or("unknown").c_str());
  22. #else
  23. Y_ABORT_UNLESS(LocalFlags == 0, "local flags must be zero at this point");
  24. #endif
  25. }
  26. TNetAddr TBusIdentity::GetNetAddr() const {
  27. if (!!Connection) {
  28. return Connection->GetAddr();
  29. } else {
  30. Y_ABORT();
  31. }
  32. }
  33. void TBusIdentity::Pack(char* dest) {
  34. memcpy(dest, this, sizeof(TBusIdentity));
  35. LocalFlags = 0;
  36. // prevent decref
  37. new (&Connection) TIntrusivePtr<TRemoteServerConnection>;
  38. }
  39. void TBusIdentity::Unpack(const char* src) {
  40. Y_ABORT_UNLESS(LocalFlags == 0);
  41. Y_ABORT_UNLESS(!Connection);
  42. memcpy(this, src, sizeof(TBusIdentity));
  43. }
  44. void TBusHeader::GenerateId() {
  45. for (;;) {
  46. Id = RandomNumber<TBusKey>();
  47. // Skip reserved ids
  48. if (IsBusKeyValid(Id))
  49. return;
  50. }
  51. }
  52. TBusMessage::TBusMessage(ui16 type, int approxsize)
  53. //: TCtr("BusMessage")
  54. : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
  55. , LocalFlags(0)
  56. , RequestSize(0)
  57. , Data(nullptr)
  58. {
  59. Y_UNUSED(approxsize);
  60. GetHeader()->Type = type;
  61. DoReset();
  62. }
  63. TBusMessage::TBusMessage(ECreateUninitialized)
  64. //: TCtr("BusMessage")
  65. : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
  66. , LocalFlags(0)
  67. , Data(nullptr)
  68. {
  69. }
  70. TString TBusMessage::Describe() const {
  71. return Sprintf("object type: %s, message type: %d", TypeName(*this).data(), int(GetHeader()->Type));
  72. }
  73. TBusMessage::~TBusMessage() {
  74. #ifndef NDEBUG
  75. Y_ABORT_UNLESS(GetHeader()->Id != YBUS_KEYINVALID, "must not be invalid key, message type: %d, ", int(Type));
  76. GetHeader()->Id = YBUS_KEYINVALID;
  77. Data = (void*)17;
  78. CheckClean();
  79. #endif
  80. }
  81. void TBusMessage::DoReset() {
  82. GetHeader()->SendTime = 0;
  83. GetHeader()->Size = 0;
  84. GetHeader()->FlagsInternal = 0;
  85. GetHeader()->GenerateId();
  86. GetHeader()->SetVersionInternal();
  87. }
  88. void TBusMessage::Reset() {
  89. CheckClean();
  90. DoReset();
  91. }
  92. void TBusMessage::CheckClean() const {
  93. if (Y_UNLIKELY(LocalFlags != 0)) {
  94. TString describe = Describe();
  95. TString localFlags = LocalFlagSetToString(LocalFlags);
  96. Y_ABORT("message local flags must be zero, got: %s, message: %s", localFlags.data(), describe.data());
  97. }
  98. }
  99. ///////////////////////////////////////////////////////
  100. /// \brief Unpacks header from network order
  101. /// \todo ntoh instead of memcpy
  102. int TBusHeader::ReadHeader(TArrayRef<const char> data) {
  103. Y_ASSERT(data.size() >= sizeof(TBusHeader));
  104. memcpy(this, data.data(), sizeof(TBusHeader));
  105. return sizeof(TBusHeader);
  106. }
  107. ///////////////////////////////////////////////////////
  108. /// \brief Packs header to network order
  109. //////////////////////////////////////////////////////////
  110. /// \brief serialize message identity to be used to construct reply message
  111. /// function stores messageid, flags and connection reply address into the buffer
  112. /// that can later be used to construct a reply to the message
  113. void TBusMessage::GetIdentity(TBusIdentity& data) const {
  114. data.MessageId = GetHeader()->Id;
  115. data.Size = GetHeader()->Size;
  116. data.Flags = GetHeader()->FlagsInternal;
  117. //data.LocalFlags = LocalFlags;
  118. }
  119. ////////////////////////////////////////////////////////////
  120. /// \brief set message identity from serialized form
  121. /// function restores messageid, flags and connection reply address from the buffer
  122. /// into the reply message
  123. void TBusMessage::SetIdentity(const TBusIdentity& data) {
  124. // TODO: wrong assertion: YBUS_KEYMIN is 0
  125. Y_ASSERT(data.MessageId != 0);
  126. bool compressed = IsCompressed();
  127. GetHeader()->Id = data.MessageId;
  128. GetHeader()->FlagsInternal = data.Flags;
  129. LocalFlags = data.LocalFlags & ~MESSAGE_IN_WORK;
  130. ReplyTo = data.Connection->PeerAddrSocketAddr;
  131. SetCompressed(compressed || IsCompressedResponse());
  132. }
  133. void TBusMessage::SetCompressed(bool v) {
  134. if (v) {
  135. GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_INTERNAL;
  136. } else {
  137. GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_INTERNAL);
  138. }
  139. }
  140. void TBusMessage::SetCompressedResponse(bool v) {
  141. if (v) {
  142. GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_RESPONSE;
  143. } else {
  144. GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_RESPONSE);
  145. }
  146. }
  147. TString TBusIdentity::ToString() const {
  148. TStringStream ss;
  149. ss << "msg-id=" << MessageId
  150. << " size=" << Size;
  151. if (!!Connection) {
  152. ss << " conn=" << Connection->GetAddr();
  153. }
  154. ss
  155. << " flags=" << Flags
  156. << " local-flags=" << LocalFlags
  157. #ifndef NDEBUG
  158. << " msg-type= " << MessageType.value_or("unknown").c_str()
  159. #endif
  160. ;
  161. return ss.Str();
  162. }
  163. }
  164. template <>
  165. void Out<TBusIdentity>(IOutputStream& os, TTypeTraits<TBusIdentity>::TFuncParam ident) {
  166. os << ident.ToString();
  167. }