|
- #include "remote_client_connection.h"
- #include "mb_lwtrace.h"
- #include "network.h"
- #include "remote_client_session.h"
- #include <library/cpp/messagebus/actor/executor.h>
- #include <library/cpp/messagebus/actor/temp_tls_vector.h>
- #include <util/generic/cast.h>
- #include <util/thread/singleton.h>
- LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
- using namespace NActor;
- using namespace NBus;
- using namespace NBus::NPrivate;
- TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr)
- : TRemoteConnection(session.Get(), id, addr)
- , ClientHandler(GetSession()->ClientHandler)
- {
- Y_ABORT_UNLESS(addr.GetPort() > 0, "must connect to non-zero port");
- ScheduleWrite();
- }
- TRemoteClientSession* TRemoteClientConnection::GetSession() {
- return CheckedCast<TRemoteClientSession*>(Session.Get());
- }
- TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) {
- return AckMessages.Pop(id);
- }
- SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) {
- SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0);
- Y_ABORT_UNLESS(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText());
- TSocketHolder s(handle);
- SetNonBlock(s, true);
- SetNoDelay(s, Config.TcpNoDelay);
- SetSockOptTcpCork(s, Config.TcpCork);
- SetCloseOnExec(s, true);
- SetKeepAlive(s, true);
- if (Config.SocketRecvBufferSize != 0) {
- SetInputBuffer(s, Config.SocketRecvBufferSize);
- }
- if (Config.SocketSendBufferSize != 0) {
- SetOutputBuffer(s, Config.SocketSendBufferSize);
- }
- if (Config.SocketToS >= 0) {
- SetSocketToS(s, &addr, Config.SocketToS);
- }
- return s.Release();
- }
- void TRemoteClientConnection::TryConnect() {
- if (AtomicGet(WriterData.Down)) {
- return;
- }
- Y_ABORT_UNLESS(!WriterData.Status.Connected);
- TInstant now = TInstant::Now();
- if (!WriterData.Channel) {
- if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {
- DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
- return;
- }
- LastConnectAttempt = now;
- TSocket connectSocket(CreateSocket(PeerAddr));
- WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie));
- }
- if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) {
- // TryConnect is called from Writer::Act, which is called in cycle
- // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.
- return;
- }
- ++WriterData.Status.ConnectSyscalls;
- int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len());
- int err = ret ? LastSystemError() : 0;
- if (!ret || (ret && err == EISCONN)) {
- WriterData.Status.ConnectTime = now;
- ++WriterData.SocketVersion;
- WriterData.Channel->DisableWrite();
- WriterData.Status.Connected = true;
- AtomicSet(ReturnConnectFailedImmediately, false);
- WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket()));
- TSocket readSocket = WriterData.Channel->GetSocketPtr();
- ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
- FireClientConnectionEvent(TClientConnectionEvent::CONNECTED);
- ScheduleWrite();
- } else {
- if (WouldBlock() || err == EALREADY) {
- WriterData.Channel->EnableWrite();
- } else {
- WriterData.DropChannel();
- WriterData.Status.MyAddr = TNetAddr();
- WriterData.Status.Connected = false;
- WriterData.Status.ConnectError = err;
- DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
- }
- }
- }
- void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) {
- Y_UNUSED(socket);
- Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie);
- if (cookie == ReadCookie) {
- ScheduleRead();
- } else {
- ScheduleWrite();
- }
- }
- void TRemoteClientConnection::WriterFillStatus() {
- TRemoteConnection::WriterFillStatus();
- WriterData.Status.AckMessagesSize = AckMessages.Size();
- }
- void TRemoteClientConnection::BeforeTryWrite() {
- ProcessReplyQueue();
- TimeoutMessages();
- }
- namespace NBus {
- namespace NPrivate {
- class TInvokeOnReply: public IWorkItem {
- private:
- TRemoteClientSession* RemoteClientSession;
- TNonDestroyingHolder<TBusMessage> Request;
- TBusMessagePtrAndHeader Response;
- public:
- TInvokeOnReply(TRemoteClientSession* session,
- TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response)
- : RemoteClientSession(session)
- , Request(request)
- {
- Response.Swap(response);
- }
- void DoWork() override {
- THolder<TInvokeOnReply> holder(this);
- RemoteClientSession->ReleaseInFlightAndCallOnReply(Request.Release(), Response);
- // TODO: TRemoteClientSessionSemaphore should be enough
- RemoteClientSession->JobCount.Decrement();
- }
- };
- }
- }
- void TRemoteClientConnection::ProcessReplyQueue() {
- if (AtomicGet(WriterData.Down)) {
- return;
- }
- bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool;
- TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp;
- TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp;
- ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector());
- if (executeInWorkerPool) {
- workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size());
- }
- for (auto& resp : *replyQueueTemp.GetVector()) {
- TBusMessage* req = PopAck(resp.Header.Id);
- if (!req) {
- WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN);
- continue;
- }
- if (executeInWorkerPool) {
- workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp));
- } else {
- GetSession()->ReleaseInFlightAndCallOnReply(req, resp);
- }
- }
- if (executeInWorkerPool) {
- Session->JobCount.Add(workQueueTemp.GetVector()->size());
- Session->Queue->EnqueueWork(*workQueueTemp.GetVector());
- }
- }
- void TRemoteClientConnection::TimeoutMessages() {
- if (!TimeToTimeoutMessages.FetchTask()) {
- return;
- }
- TMessagesPtrs timedOutMessages;
- TInstant sendDeadline;
- TInstant ackDeadline;
- if (IsReturnConnectFailedImmediately()) {
- sendDeadline = TInstant::Max();
- ackDeadline = TInstant::Max();
- } else {
- TInstant now = TInstant::Now();
- sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout);
- ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout);
- }
- {
- TMessagesPtrs temp;
- WriterData.SendQueue.Timeout(sendDeadline, &temp);
- timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
- }
- // Ignores message that is being written currently (that is stored
- // in WriteMessage). It is not a big problem, because after written
- // to the network, message will be placed to the AckMessages queue,
- // and timed out on the next iteration of this procedure.
- {
- TMessagesPtrs temp;
- AckMessages.Timeout(ackDeadline, &temp);
- timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
- }
- ResetOneWayFlag(timedOutMessages);
- GetSession()->ReleaseInFlight(timedOutMessages);
- WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT);
- }
- void TRemoteClientConnection::ScheduleTimeoutMessages() {
- TimeToTimeoutMessages.AddTask();
- ScheduleWrite();
- }
- void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char>) {
- LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), "");
- ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1;
- // TODO: close connection
- Y_ABORT("unknown message");
- }
- void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
- Y_ASSERT(result.empty());
- TRemoteConnection::ClearOutgoingQueue(result, reconnect);
- AckMessages.Clear(&result);
- ResetOneWayFlag(result);
- GetSession()->ReleaseInFlight(result);
- }
- void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
- for (auto& message : messages) {
- bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL;
- if (oneWay) {
- message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
- TBusMessage* ackMsg = this->PopAck(message.Header.Id);
- if (!ackMsg) {
- // TODO: expired?
- }
- if (ackMsg != message.MessagePtr.Get()) {
- // TODO: non-unique id?
- }
- GetSession()->ReleaseInFlight({message.MessagePtr.Get()});
- ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release());
- } else {
- ClientHandler->OnMessageSent(message.MessagePtr.Get());
- AckMessages.Push(message);
- }
- }
- }
- EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) {
- return SendMessageImpl(req, wait, false);
- }
- EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) {
- return SendMessageImpl(req, wait, true);
- }
- EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) {
- msg->CheckClean();
- if (Session->IsDown()) {
- return MESSAGE_SHUTDOWN;
- }
- if (wait) {
- Y_ABORT_UNLESS(!Session->Queue->GetExecutor()->IsInExecutorThread());
- GetSession()->ClientRemoteInFlight.Wait();
- } else {
- if (!GetSession()->ClientRemoteInFlight.TryWait()) {
- return MESSAGE_BUSY;
- }
- }
- GetSession()->AcquireInFlight({msg});
- EMessageStatus ret = MESSAGE_OK;
- if (oneWay) {
- msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL;
- }
- msg->GetHeader()->SendTime = Now();
- if (IsReturnConnectFailedImmediately()) {
- ret = MESSAGE_CONNECT_FAILED;
- goto clean;
- }
- Send(msg);
- return MESSAGE_OK;
- clean:
- msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
- GetSession()->ReleaseInFlight({msg});
- return ret;
- }
- void TRemoteClientConnection::OpenConnection() {
- // TODO
- }
|