remote_connection.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #pragma once
  2. #include "async_result.h"
  3. #include "defs.h"
  4. #include "event_loop.h"
  5. #include "left_right_buffer.h"
  6. #include "lfqueue_batch.h"
  7. #include "message_ptr_and_header.h"
  8. #include "nondestroying_holder.h"
  9. #include "remote_connection_status.h"
  10. #include "scheduler_actor.h"
  11. #include "socket_addr.h"
  12. #include "storage.h"
  13. #include "vector_swaps.h"
  14. #include "ybus.h"
  15. #include "misc/granup.h"
  16. #include "misc/tokenquota.h"
  17. #include <library/cpp/messagebus/actor/actor.h>
  18. #include <library/cpp/messagebus/actor/executor.h>
  19. #include <library/cpp/messagebus/actor/queue_for_actor.h>
  20. #include <library/cpp/messagebus/actor/queue_in_actor.h>
  21. #include <library/cpp/messagebus/scheduler/scheduler.h>
  22. #include <library/cpp/deprecated/atomic/atomic.h>
  23. #include <util/system/event.h>
  24. #include <util/thread/lfstack.h>
  25. namespace NBus {
  26. namespace NPrivate {
  27. class TRemoteConnection;
  28. typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
  29. typedef TIntrusivePtr<TBusSessionImpl> TRemoteSessionPtr;
  30. static void* const WriteCookie = (void*)1;
  31. static void* const ReadCookie = (void*)2;
  32. enum {
  33. WAKE_QUOTA_MSG = 0x01,
  34. WAKE_QUOTA_BYTES = 0x02
  35. };
  36. struct TWriterTag {};
  37. struct TReaderTag {};
  38. struct TReconnectTag {};
  39. struct TWakeReaderTag {};
  40. struct TWriterToReaderSocketMessage {
  41. TSocket Socket;
  42. ui32 SocketVersion;
  43. TWriterToReaderSocketMessage(TSocket socket, ui32 socketVersion)
  44. : Socket(socket)
  45. , SocketVersion(socketVersion)
  46. {
  47. }
  48. };
  49. class TRemoteConnection
  50. : public NEventLoop::IEventHandler,
  51. public ::NActor::TActor<TRemoteConnection, TWriterTag>,
  52. public ::NActor::TActor<TRemoteConnection, TReaderTag>,
  53. private ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>,
  54. private ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>,
  55. private ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>,
  56. public TScheduleActor<TRemoteConnection, TWriterTag> {
  57. friend struct TBusSessionImpl;
  58. friend class TRemoteClientSession;
  59. friend class TRemoteServerSession;
  60. friend class ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>;
  61. friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>;
  62. friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>;
  63. protected:
  64. ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>* ReaderGetSocketQueue() {
  65. return this;
  66. }
  67. ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>* WriterGetReconnectQueue() {
  68. return this;
  69. }
  70. ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>* WriterGetWakeQueue() {
  71. return this;
  72. }
  73. protected:
  74. TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr);
  75. ~TRemoteConnection() override;
  76. virtual void ClearOutgoingQueue(TMessagesPtrs&, bool reconnect /* or shutdown */);
  77. public:
  78. void Send(TNonDestroyingAutoPtr<TBusMessage> msg);
  79. void Shutdown(EMessageStatus status);
  80. inline const TNetAddr& GetAddr() const noexcept;
  81. private:
  82. friend class TScheduleConnect;
  83. friend class TWorkIO;
  84. protected:
  85. static size_t MessageSize(TArrayRef<TBusMessagePtrAndHeader>);
  86. bool QuotaAcquire(size_t msg, size_t bytes);
  87. void QuotaConsume(size_t msg, size_t bytes);
  88. void QuotaReturnSelf(size_t items, size_t bytes);
  89. bool QuotaReturnValues(size_t items, size_t bytes);
  90. bool ReaderProcessBuffer();
  91. bool ReaderFillBuffer();
  92. void ReaderFlushMessages();
  93. void ReadQuotaWakeup();
  94. ui32 WriteWakeFlags() const;
  95. virtual bool NeedInterruptRead() {
  96. return false;
  97. }
  98. public:
  99. virtual void TryConnect();
  100. void ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage);
  101. void ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion);
  102. void ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags);
  103. void Act(TReaderTag);
  104. inline void WriterBeforeWriteErrorMessage(TBusMessage*, EMessageStatus);
  105. void ClearBeforeSendQueue(EMessageStatus reasonForQueues);
  106. void ClearReplyQueue(EMessageStatus reasonForQueues);
  107. inline void ProcessBeforeSendQueueMessage(TBusMessage*, TInstant now);
  108. void ProcessBeforeSendQueue(TInstant now);
  109. void WriterProcessStatusDown();
  110. void ReaderProcessStatusDown();
  111. void ProcessWriterDown();
  112. void DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues);
  113. const TRemoteConnectionWriterStatus& WriterGetStatus();
  114. virtual void WriterFillStatus();
  115. void WriterFillInFlight();
  116. virtual void BeforeTryWrite();
  117. void Act(TWriterTag);
  118. void ScheduleRead();
  119. void ScheduleWrite();
  120. void ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer);
  121. void ScheduleShutdown(EMessageStatus status);
  122. void WriterFlushBuffer();
  123. void WriterFillBuffer();
  124. void ReaderSendStatus(TInstant now, bool force = false);
  125. const TRemoteConnectionReaderStatus& ReaderFillStatus();
  126. void WriterRotateCounters();
  127. void WriterSendStatus(TInstant now, bool force = false);
  128. void WriterSendStatusIfNecessary(TInstant now);
  129. void QuotaReturnAside(size_t items, size_t bytes);
  130. virtual void ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) = 0;
  131. bool MessageRead(TArrayRef<const char> dataRef, TInstant now);
  132. virtual void MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) = 0;
  133. void CallSerialize(TBusMessage* msg, TBuffer& buffer) const;
  134. void SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const;
  135. TBusMessage* DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const;
  136. void ResetOneWayFlag(TArrayRef<TBusMessage*>);
  137. inline ::NActor::TActor<TRemoteConnection, TWriterTag>* GetWriterActor() {
  138. return this;
  139. }
  140. inline ::NActor::TActor<TRemoteConnection, TReaderTag>* GetReaderActor() {
  141. return this;
  142. }
  143. inline TScheduleActor<TRemoteConnection, TWriterTag>* GetWriterSchedulerActor() {
  144. return this;
  145. }
  146. void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status);
  147. // takes ownership of ms
  148. void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status);
  149. void FireClientConnectionEvent(TClientConnectionEvent::EType);
  150. size_t GetInFlight();
  151. size_t GetConnectSyscallsNumForTest();
  152. bool IsReturnConnectFailedImmediately() {
  153. return (bool)AtomicGet(ReturnConnectFailedImmediately);
  154. }
  155. bool IsAlive() const;
  156. TRemoteSessionPtr Session;
  157. TBusProtocol* const Proto;
  158. TBusSessionConfig const Config;
  159. bool RemovedFromSession;
  160. const ui64 ConnectionId;
  161. const TNetAddr PeerAddr;
  162. const TBusSocketAddr PeerAddrSocketAddr;
  163. const TInstant CreatedTime;
  164. TInstant LastConnectAttempt;
  165. TAtomic ReturnConnectFailedImmediately;
  166. protected:
  167. ::NActor::TQueueForActor<TBusMessage*> BeforeSendQueue;
  168. TLockFreeStack<TBusHeader> WrongVersionRequests;
  169. struct TWriterData {
  170. TAtomic Down;
  171. NEventLoop::TChannelPtr Channel;
  172. ui32 SocketVersion;
  173. TRemoteConnectionWriterStatus Status;
  174. TInstant StatusLastSendTime;
  175. TLocalTasks TimeToRotateCounters;
  176. TAtomic InFlight;
  177. TTimedMessages SendQueue;
  178. ui32 AwakeFlags;
  179. EWriterState State;
  180. TLeftRightBuffer Buffer;
  181. TInstant CorkUntil;
  182. TSystemEvent ShutdownComplete;
  183. void SetChannel(NEventLoop::TChannelPtr channel);
  184. void DropChannel();
  185. TWriterData();
  186. ~TWriterData();
  187. };
  188. struct TReaderData {
  189. TAtomic Down;
  190. NEventLoop::TChannelPtr Channel;
  191. ui32 SocketVersion;
  192. TRemoteConnectionReaderStatus Status;
  193. TInstant StatusLastSendTime;
  194. TBuffer Buffer;
  195. size_t Offset; /* offset in read buffer */
  196. size_t MoreBytes; /* more bytes required from socket */
  197. TVectorSwaps<TBusMessagePtrAndHeader> ReadMessages;
  198. TSystemEvent ShutdownComplete;
  199. bool BufferMore() const noexcept {
  200. return MoreBytes > 0;
  201. }
  202. bool HasBytesInBuf(size_t bytes) noexcept;
  203. void SetChannel(NEventLoop::TChannelPtr channel);
  204. void DropChannel();
  205. TReaderData();
  206. ~TReaderData();
  207. };
  208. // owned by session status actor
  209. struct TGranStatus {
  210. TGranStatus(TDuration gran)
  211. : Writer(gran)
  212. , Reader(gran)
  213. {
  214. }
  215. TGranUp<TRemoteConnectionWriterStatus> Writer;
  216. TGranUp<TRemoteConnectionReaderStatus> Reader;
  217. };
  218. TWriterData WriterData;
  219. TReaderData ReaderData;
  220. TGranStatus GranStatus;
  221. TTokenQuota QuotaMsg;
  222. TTokenQuota QuotaBytes;
  223. size_t MaxBufferSize;
  224. // client connection only
  225. TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue;
  226. EMessageStatus ShutdownReason;
  227. };
  228. inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept {
  229. return PeerAddr;
  230. }
  231. typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
  232. }
  233. }