remote_client_connection.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. #include "remote_client_connection.h"
  2. #include "mb_lwtrace.h"
  3. #include "network.h"
  4. #include "remote_client_session.h"
  5. #include <library/cpp/messagebus/actor/executor.h>
  6. #include <library/cpp/messagebus/actor/temp_tls_vector.h>
  7. #include <util/generic/cast.h>
  8. #include <util/thread/singleton.h>
  9. LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
  10. using namespace NActor;
  11. using namespace NBus;
  12. using namespace NBus::NPrivate;
  13. TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr)
  14. : TRemoteConnection(session.Get(), id, addr)
  15. , ClientHandler(GetSession()->ClientHandler)
  16. {
  17. Y_ABORT_UNLESS(addr.GetPort() > 0, "must connect to non-zero port");
  18. ScheduleWrite();
  19. }
  20. TRemoteClientSession* TRemoteClientConnection::GetSession() {
  21. return CheckedCast<TRemoteClientSession*>(Session.Get());
  22. }
  23. TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) {
  24. return AckMessages.Pop(id);
  25. }
  26. SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) {
  27. SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0);
  28. Y_ABORT_UNLESS(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText());
  29. TSocketHolder s(handle);
  30. SetNonBlock(s, true);
  31. SetNoDelay(s, Config.TcpNoDelay);
  32. SetSockOptTcpCork(s, Config.TcpCork);
  33. SetCloseOnExec(s, true);
  34. SetKeepAlive(s, true);
  35. if (Config.SocketRecvBufferSize != 0) {
  36. SetInputBuffer(s, Config.SocketRecvBufferSize);
  37. }
  38. if (Config.SocketSendBufferSize != 0) {
  39. SetOutputBuffer(s, Config.SocketSendBufferSize);
  40. }
  41. if (Config.SocketToS >= 0) {
  42. SetSocketToS(s, &addr, Config.SocketToS);
  43. }
  44. return s.Release();
  45. }
  46. void TRemoteClientConnection::TryConnect() {
  47. if (AtomicGet(WriterData.Down)) {
  48. return;
  49. }
  50. Y_ABORT_UNLESS(!WriterData.Status.Connected);
  51. TInstant now = TInstant::Now();
  52. if (!WriterData.Channel) {
  53. if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {
  54. DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
  55. return;
  56. }
  57. LastConnectAttempt = now;
  58. TSocket connectSocket(CreateSocket(PeerAddr));
  59. WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie));
  60. }
  61. if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) {
  62. // TryConnect is called from Writer::Act, which is called in cycle
  63. // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.
  64. return;
  65. }
  66. ++WriterData.Status.ConnectSyscalls;
  67. int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len());
  68. int err = ret ? LastSystemError() : 0;
  69. if (!ret || (ret && err == EISCONN)) {
  70. WriterData.Status.ConnectTime = now;
  71. ++WriterData.SocketVersion;
  72. WriterData.Channel->DisableWrite();
  73. WriterData.Status.Connected = true;
  74. AtomicSet(ReturnConnectFailedImmediately, false);
  75. WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket()));
  76. TSocket readSocket = WriterData.Channel->GetSocketPtr();
  77. ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
  78. FireClientConnectionEvent(TClientConnectionEvent::CONNECTED);
  79. ScheduleWrite();
  80. } else {
  81. if (WouldBlock() || err == EALREADY) {
  82. WriterData.Channel->EnableWrite();
  83. } else {
  84. WriterData.DropChannel();
  85. WriterData.Status.MyAddr = TNetAddr();
  86. WriterData.Status.Connected = false;
  87. WriterData.Status.ConnectError = err;
  88. DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
  89. }
  90. }
  91. }
  92. void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) {
  93. Y_UNUSED(socket);
  94. Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie);
  95. if (cookie == ReadCookie) {
  96. ScheduleRead();
  97. } else {
  98. ScheduleWrite();
  99. }
  100. }
  101. void TRemoteClientConnection::WriterFillStatus() {
  102. TRemoteConnection::WriterFillStatus();
  103. WriterData.Status.AckMessagesSize = AckMessages.Size();
  104. }
  105. void TRemoteClientConnection::BeforeTryWrite() {
  106. ProcessReplyQueue();
  107. TimeoutMessages();
  108. }
  109. namespace NBus {
  110. namespace NPrivate {
  111. class TInvokeOnReply: public IWorkItem {
  112. private:
  113. TRemoteClientSession* RemoteClientSession;
  114. TNonDestroyingHolder<TBusMessage> Request;
  115. TBusMessagePtrAndHeader Response;
  116. public:
  117. TInvokeOnReply(TRemoteClientSession* session,
  118. TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response)
  119. : RemoteClientSession(session)
  120. , Request(request)
  121. {
  122. Response.Swap(response);
  123. }
  124. void DoWork() override {
  125. THolder<TInvokeOnReply> holder(this);
  126. RemoteClientSession->ReleaseInFlightAndCallOnReply(Request.Release(), Response);
  127. // TODO: TRemoteClientSessionSemaphore should be enough
  128. RemoteClientSession->JobCount.Decrement();
  129. }
  130. };
  131. }
  132. }
  133. void TRemoteClientConnection::ProcessReplyQueue() {
  134. if (AtomicGet(WriterData.Down)) {
  135. return;
  136. }
  137. bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool;
  138. TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp;
  139. TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp;
  140. ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector());
  141. if (executeInWorkerPool) {
  142. workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size());
  143. }
  144. for (auto& resp : *replyQueueTemp.GetVector()) {
  145. TBusMessage* req = PopAck(resp.Header.Id);
  146. if (!req) {
  147. WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN);
  148. continue;
  149. }
  150. if (executeInWorkerPool) {
  151. workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp));
  152. } else {
  153. GetSession()->ReleaseInFlightAndCallOnReply(req, resp);
  154. }
  155. }
  156. if (executeInWorkerPool) {
  157. Session->JobCount.Add(workQueueTemp.GetVector()->size());
  158. Session->Queue->EnqueueWork(*workQueueTemp.GetVector());
  159. }
  160. }
  161. void TRemoteClientConnection::TimeoutMessages() {
  162. if (!TimeToTimeoutMessages.FetchTask()) {
  163. return;
  164. }
  165. TMessagesPtrs timedOutMessages;
  166. TInstant sendDeadline;
  167. TInstant ackDeadline;
  168. if (IsReturnConnectFailedImmediately()) {
  169. sendDeadline = TInstant::Max();
  170. ackDeadline = TInstant::Max();
  171. } else {
  172. TInstant now = TInstant::Now();
  173. sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout);
  174. ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout);
  175. }
  176. {
  177. TMessagesPtrs temp;
  178. WriterData.SendQueue.Timeout(sendDeadline, &temp);
  179. timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
  180. }
  181. // Ignores message that is being written currently (that is stored
  182. // in WriteMessage). It is not a big problem, because after written
  183. // to the network, message will be placed to the AckMessages queue,
  184. // and timed out on the next iteration of this procedure.
  185. {
  186. TMessagesPtrs temp;
  187. AckMessages.Timeout(ackDeadline, &temp);
  188. timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
  189. }
  190. ResetOneWayFlag(timedOutMessages);
  191. GetSession()->ReleaseInFlight(timedOutMessages);
  192. WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT);
  193. }
  194. void TRemoteClientConnection::ScheduleTimeoutMessages() {
  195. TimeToTimeoutMessages.AddTask();
  196. ScheduleWrite();
  197. }
  198. void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char>) {
  199. LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), "");
  200. ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1;
  201. // TODO: close connection
  202. Y_ABORT("unknown message");
  203. }
  204. void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
  205. Y_ASSERT(result.empty());
  206. TRemoteConnection::ClearOutgoingQueue(result, reconnect);
  207. AckMessages.Clear(&result);
  208. ResetOneWayFlag(result);
  209. GetSession()->ReleaseInFlight(result);
  210. }
  211. void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
  212. for (auto& message : messages) {
  213. bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL;
  214. if (oneWay) {
  215. message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
  216. TBusMessage* ackMsg = this->PopAck(message.Header.Id);
  217. if (!ackMsg) {
  218. // TODO: expired?
  219. }
  220. if (ackMsg != message.MessagePtr.Get()) {
  221. // TODO: non-unique id?
  222. }
  223. GetSession()->ReleaseInFlight({message.MessagePtr.Get()});
  224. ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release());
  225. } else {
  226. ClientHandler->OnMessageSent(message.MessagePtr.Get());
  227. AckMessages.Push(message);
  228. }
  229. }
  230. }
  231. EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) {
  232. return SendMessageImpl(req, wait, false);
  233. }
  234. EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) {
  235. return SendMessageImpl(req, wait, true);
  236. }
  237. EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) {
  238. msg->CheckClean();
  239. if (Session->IsDown()) {
  240. return MESSAGE_SHUTDOWN;
  241. }
  242. if (wait) {
  243. Y_ABORT_UNLESS(!Session->Queue->GetExecutor()->IsInExecutorThread());
  244. GetSession()->ClientRemoteInFlight.Wait();
  245. } else {
  246. if (!GetSession()->ClientRemoteInFlight.TryWait()) {
  247. return MESSAGE_BUSY;
  248. }
  249. }
  250. GetSession()->AcquireInFlight({msg});
  251. EMessageStatus ret = MESSAGE_OK;
  252. if (oneWay) {
  253. msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL;
  254. }
  255. msg->GetHeader()->SendTime = Now();
  256. if (IsReturnConnectFailedImmediately()) {
  257. ret = MESSAGE_CONNECT_FAILED;
  258. goto clean;
  259. }
  260. Send(msg);
  261. return MESSAGE_OK;
  262. clean:
  263. msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
  264. GetSession()->ReleaseInFlight({msg});
  265. return ret;
  266. }
  267. void TRemoteClientConnection::OpenConnection() {
  268. // TODO
  269. }