#include "remote_client_connection.h" #include "mb_lwtrace.h" #include "network.h" #include "remote_client_session.h" #include #include #include #include LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) using namespace NActor; using namespace NBus; using namespace NBus::NPrivate; TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr) : TRemoteConnection(session.Get(), id, addr) , ClientHandler(GetSession()->ClientHandler) { Y_ABORT_UNLESS(addr.GetPort() > 0, "must connect to non-zero port"); ScheduleWrite(); } TRemoteClientSession* TRemoteClientConnection::GetSession() { return CheckedCast(Session.Get()); } TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) { return AckMessages.Pop(id); } SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) { SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0); Y_ABORT_UNLESS(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText()); TSocketHolder s(handle); SetNonBlock(s, true); SetNoDelay(s, Config.TcpNoDelay); SetSockOptTcpCork(s, Config.TcpCork); SetCloseOnExec(s, true); SetKeepAlive(s, true); if (Config.SocketRecvBufferSize != 0) { SetInputBuffer(s, Config.SocketRecvBufferSize); } if (Config.SocketSendBufferSize != 0) { SetOutputBuffer(s, Config.SocketSendBufferSize); } if (Config.SocketToS >= 0) { SetSocketToS(s, &addr, Config.SocketToS); } return s.Release(); } void TRemoteClientConnection::TryConnect() { if (AtomicGet(WriterData.Down)) { return; } Y_ABORT_UNLESS(!WriterData.Status.Connected); TInstant now = TInstant::Now(); if (!WriterData.Channel) { if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) { DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); return; } LastConnectAttempt = now; TSocket connectSocket(CreateSocket(PeerAddr)); WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie)); } if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) { // TryConnect is called from Writer::Act, which is called in cycle // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects. return; } ++WriterData.Status.ConnectSyscalls; int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len()); int err = ret ? LastSystemError() : 0; if (!ret || (ret && err == EISCONN)) { WriterData.Status.ConnectTime = now; ++WriterData.SocketVersion; WriterData.Channel->DisableWrite(); WriterData.Status.Connected = true; AtomicSet(ReturnConnectFailedImmediately, false); WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket())); TSocket readSocket = WriterData.Channel->GetSocketPtr(); ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion)); FireClientConnectionEvent(TClientConnectionEvent::CONNECTED); ScheduleWrite(); } else { if (WouldBlock() || err == EALREADY) { WriterData.Channel->EnableWrite(); } else { WriterData.DropChannel(); WriterData.Status.MyAddr = TNetAddr(); WriterData.Status.Connected = false; WriterData.Status.ConnectError = err; DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); } } } void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) { Y_UNUSED(socket); Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie); if (cookie == ReadCookie) { ScheduleRead(); } else { ScheduleWrite(); } } void TRemoteClientConnection::WriterFillStatus() { TRemoteConnection::WriterFillStatus(); WriterData.Status.AckMessagesSize = AckMessages.Size(); } void TRemoteClientConnection::BeforeTryWrite() { ProcessReplyQueue(); TimeoutMessages(); } namespace NBus { namespace NPrivate { class TInvokeOnReply: public IWorkItem { private: TRemoteClientSession* RemoteClientSession; TNonDestroyingHolder Request; TBusMessagePtrAndHeader Response; public: TInvokeOnReply(TRemoteClientSession* session, TNonDestroyingAutoPtr request, TBusMessagePtrAndHeader& response) : RemoteClientSession(session) , Request(request) { Response.Swap(response); } void DoWork() override { THolder holder(this); RemoteClientSession->ReleaseInFlightAndCallOnReply(Request.Release(), Response); // TODO: TRemoteClientSessionSemaphore should be enough RemoteClientSession->JobCount.Decrement(); } }; } } void TRemoteClientConnection::ProcessReplyQueue() { if (AtomicGet(WriterData.Down)) { return; } bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool; TTempTlsVector replyQueueTemp; TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp; ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector()); if (executeInWorkerPool) { workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size()); } for (auto& resp : *replyQueueTemp.GetVector()) { TBusMessage* req = PopAck(resp.Header.Id); if (!req) { WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN); continue; } if (executeInWorkerPool) { workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp)); } else { GetSession()->ReleaseInFlightAndCallOnReply(req, resp); } } if (executeInWorkerPool) { Session->JobCount.Add(workQueueTemp.GetVector()->size()); Session->Queue->EnqueueWork(*workQueueTemp.GetVector()); } } void TRemoteClientConnection::TimeoutMessages() { if (!TimeToTimeoutMessages.FetchTask()) { return; } TMessagesPtrs timedOutMessages; TInstant sendDeadline; TInstant ackDeadline; if (IsReturnConnectFailedImmediately()) { sendDeadline = TInstant::Max(); ackDeadline = TInstant::Max(); } else { TInstant now = TInstant::Now(); sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout); ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout); } { TMessagesPtrs temp; WriterData.SendQueue.Timeout(sendDeadline, &temp); timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); } // Ignores message that is being written currently (that is stored // in WriteMessage). It is not a big problem, because after written // to the network, message will be placed to the AckMessages queue, // and timed out on the next iteration of this procedure. { TMessagesPtrs temp; AckMessages.Timeout(ackDeadline, &temp); timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); } ResetOneWayFlag(timedOutMessages); GetSession()->ReleaseInFlight(timedOutMessages); WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT); } void TRemoteClientConnection::ScheduleTimeoutMessages() { TimeToTimeoutMessages.AddTask(); ScheduleWrite(); } void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef) { LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), ""); ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1; // TODO: close connection Y_ABORT("unknown message"); } void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { Y_ASSERT(result.empty()); TRemoteConnection::ClearOutgoingQueue(result, reconnect); AckMessages.Clear(&result); ResetOneWayFlag(result); GetSession()->ReleaseInFlight(result); } void TRemoteClientConnection::MessageSent(TArrayRef messages) { for (auto& message : messages) { bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL; if (oneWay) { message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; TBusMessage* ackMsg = this->PopAck(message.Header.Id); if (!ackMsg) { // TODO: expired? } if (ackMsg != message.MessagePtr.Get()) { // TODO: non-unique id? } GetSession()->ReleaseInFlight({message.MessagePtr.Get()}); ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release()); } else { ClientHandler->OnMessageSent(message.MessagePtr.Get()); AckMessages.Push(message); } } } EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) { return SendMessageImpl(req, wait, false); } EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) { return SendMessageImpl(req, wait, true); } EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) { msg->CheckClean(); if (Session->IsDown()) { return MESSAGE_SHUTDOWN; } if (wait) { Y_ABORT_UNLESS(!Session->Queue->GetExecutor()->IsInExecutorThread()); GetSession()->ClientRemoteInFlight.Wait(); } else { if (!GetSession()->ClientRemoteInFlight.TryWait()) { return MESSAGE_BUSY; } } GetSession()->AcquireInFlight({msg}); EMessageStatus ret = MESSAGE_OK; if (oneWay) { msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL; } msg->GetHeader()->SendTime = Now(); if (IsReturnConnectFailedImmediately()) { ret = MESSAGE_CONNECT_FAILED; goto clean; } Send(msg); return MESSAGE_OK; clean: msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; GetSession()->ReleaseInFlight({msg}); return ret; } void TRemoteClientConnection::OpenConnection() { // TODO }