#include "remote_connection.h" #include "key_value_printer.h" #include "mb_lwtrace.h" #include "network.h" #include "remote_client_connection.h" #include "remote_client_session.h" #include "remote_server_session.h" #include "session_impl.h" #include #include #include #include LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) using namespace NActor; using namespace NBus; using namespace NBus::NPrivate; namespace NBus { namespace NPrivate { TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr) : TActor(session->Queue->WorkQueue.Get()) , TActor(session->Queue->WorkQueue.Get()) , TScheduleActor(&session->Queue->Scheduler) , Session(session) , Proto(session->Proto) , Config(session->Config) , RemovedFromSession(false) , ConnectionId(connectionId) , PeerAddr(addr) , PeerAddrSocketAddr(addr) , CreatedTime(TInstant::Now()) , ReturnConnectFailedImmediately(false) , GranStatus(Config.Secret.StatusFlushPeriod) , QuotaMsg(!Session->IsSource_, Config.PerConnectionMaxInFlight, 0) , QuotaBytes(!Session->IsSource_, Config.PerConnectionMaxInFlightBySize, 0) , MaxBufferSize(session->Config.MaxBufferSize) , ShutdownReason(MESSAGE_OK) { WriterData.Status.ConnectionId = connectionId; WriterData.Status.PeerAddr = PeerAddr; ReaderData.Status.ConnectionId = connectionId; const TInstant now = TInstant::Now(); WriterFillStatus(); GranStatus.Writer.Update(WriterData.Status, now, true); GranStatus.Reader.Update(ReaderData.Status, now, true); } TRemoteConnection::~TRemoteConnection() { Y_ABORT_UNLESS(ReplyQueue.IsEmpty()); } TRemoteConnection::TWriterData::TWriterData() : Down(0) , SocketVersion(0) , InFlight(0) , AwakeFlags(0) , State(WRITER_FILLING) { } TRemoteConnection::TWriterData::~TWriterData() { Y_ABORT_UNLESS(AtomicGet(Down)); Y_ABORT_UNLESS(SendQueue.Empty()); } bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { size_t left = Buffer.Size() - Offset; return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0; } void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) { Y_ABORT_UNLESS(!Channel, "must not have channel"); Y_ABORT_UNLESS(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty"); Y_ABORT_UNLESS(State == WRITER_FILLING, "state must be initial"); Channel = channel; } void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) { Y_ABORT_UNLESS(!Channel, "must not have channel"); Y_ABORT_UNLESS(Buffer.Empty(), "buffer must be empty"); Channel = channel; } void TRemoteConnection::TWriterData::DropChannel() { if (!!Channel) { Channel->Unregister(); Channel.Drop(); } Buffer.Reset(); State = WRITER_FILLING; } void TRemoteConnection::TReaderData::DropChannel() { // TODO: make Drop call Unregister if (!!Channel) { Channel->Unregister(); Channel.Drop(); } Buffer.Reset(); Offset = 0; } TRemoteConnection::TReaderData::TReaderData() : Down(0) , SocketVersion(0) , Offset(0) , MoreBytes(0) { } TRemoteConnection::TReaderData::~TReaderData() { Y_ABORT_UNLESS(AtomicGet(Down)); } void TRemoteConnection::Send(TNonDestroyingAutoPtr msg) { BeforeSendQueue.Enqueue(msg.Release()); AtomicIncrement(WriterData.InFlight); ScheduleWrite(); } void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { if (!reconnect) { // Do not clear send queue if reconnecting WriterData.SendQueue.Clear(&result); } } void TRemoteConnection::Shutdown(EMessageStatus status) { ScheduleShutdown(status); ReaderData.ShutdownComplete.WaitI(); WriterData.ShutdownComplete.WaitI(); } void TRemoteConnection::TryConnect() { Y_ABORT("TryConnect is client connection only operation"); } void TRemoteConnection::ScheduleRead() { GetReaderActor()->Schedule(); } void TRemoteConnection::ScheduleWrite() { GetWriterActor()->Schedule(); } void TRemoteConnection::WriterRotateCounters() { if (!WriterData.TimeToRotateCounters.FetchTask()) { return; } WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter; Reset(WriterData.Status.DurationCounter); } void TRemoteConnection::WriterSendStatus(TInstant now, bool force) { GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force); } void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) { GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force); } const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() { ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity(); ReaderData.Status.QuotaMsg = QuotaMsg.Tokens(); ReaderData.Status.QuotaBytes = QuotaBytes.Tokens(); return ReaderData.Status; } void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) { if (AtomicGet(ReaderData.Down)) { ReaderData.Status.Fd = INVALID_SOCKET; return; } ReaderData.DropChannel(); ReaderData.Status.Fd = readSocket.Socket; ReaderData.SocketVersion = readSocket.SocketVersion; if (readSocket.Socket != INVALID_SOCKET) { ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie)); ReaderData.Channel->EnableRead(); } } void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) { Y_ABORT_UNLESS(socketVersion <= WriterData.SocketVersion, "something weird"); if (WriterData.SocketVersion != socketVersion) { return; } Y_ABORT_UNLESS(WriterData.Status.Connected, "must be connected at this point"); Y_ABORT_UNLESS(!!WriterData.Channel, "must have channel at this point"); WriterData.Status.Connected = false; WriterData.DropChannel(); WriterData.Status.MyAddr = TNetAddr(); ++WriterData.SocketVersion; LastConnectAttempt = TInstant(); TMessagesPtrs cleared; ClearOutgoingQueue(cleared, true); WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED); FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion)); } void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) { WriterData.AwakeFlags |= awakeFlags; ReadQuotaWakeup(); } void TRemoteConnection::Act(TReaderTag) { TInstant now = TInstant::Now(); ReaderData.Status.Acts += 1; ReaderGetSocketQueue()->DequeueAllLikelyEmpty(); if (AtomicGet(ReaderData.Down)) { ReaderData.DropChannel(); ReaderProcessStatusDown(); ReaderData.ShutdownComplete.Signal(); } else if (!!ReaderData.Channel) { Y_ASSERT(ReaderData.ReadMessages.empty()); for (int i = 0;; ++i) { if (i == 100) { // perform other tasks GetReaderActor()->AddTaskFromActorLoop(); break; } if (NeedInterruptRead()) { ReaderData.Channel->EnableRead(); break; } if (!ReaderFillBuffer()) break; if (!ReaderProcessBuffer()) break; } ReaderFlushMessages(); } ReaderSendStatus(now); } bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) { ui32 wakeFlags = 0; if (!QuotaMsg.Acquire(msg)) wakeFlags |= WAKE_QUOTA_MSG; else if (!QuotaBytes.Acquire(bytes)) wakeFlags |= WAKE_QUOTA_BYTES; if (wakeFlags) { ReaderData.Status.QuotaExhausted++; WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags); } return wakeFlags == 0; } void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) { QuotaMsg.Consume(msg); QuotaBytes.Consume(bytes); } void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) { if (QuotaReturnValues(items, bytes)) ReadQuotaWakeup(); } void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) { if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down)) WriterGetWakeQueue()->EnqueueAndSchedule(0x0); } bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) { bool rMsg = QuotaMsg.Return(items); bool rBytes = QuotaBytes.Return(bytes); return rMsg || rBytes; } void TRemoteConnection::ReadQuotaWakeup() { const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags(); if (mask && mask == WriterData.AwakeFlags) { WriterData.Status.ReaderWakeups++; WriterData.AwakeFlags = 0; ScheduleRead(); } } ui32 TRemoteConnection::WriteWakeFlags() const { ui32 awakeFlags = 0; if (QuotaMsg.IsAboveWake()) awakeFlags |= WAKE_QUOTA_MSG; if (QuotaBytes.IsAboveWake()) awakeFlags |= WAKE_QUOTA_BYTES; return awakeFlags; } bool TRemoteConnection::ReaderProcessBuffer() { TInstant now = TInstant::Now(); for (;;) { if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) { break; } TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset)); if (header.Size < sizeof(TBusHeader)) { LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); return false; } if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) { LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); return false; } if (header.Size > Config.MaxMessageSize) { LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false); return false; } if (!ReaderData.HasBytesInBuf(header.Size)) { if (ReaderData.Offset == 0) { ReaderData.Buffer.Reserve(header.Size); } break; } if (!QuotaAcquire(1, header.Size)) return false; if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) { return false; } ReaderData.Offset += header.Size; } ReaderData.Buffer.ChopHead(ReaderData.Offset); ReaderData.Offset = 0; if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) { ReaderData.Status.Incremental.BufferDrops += 1; TBuffer temp; // probably should use another constant temp.Reserve(Config.DefaultBufferSize); temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size()); ReaderData.Buffer.Swap(temp); } return true; } bool TRemoteConnection::ReaderFillBuffer() { if (!ReaderData.BufferMore()) return true; if (ReaderData.Buffer.Avail() == 0) { if (ReaderData.Buffer.Size() == 0) { ReaderData.Buffer.Reserve(Config.DefaultBufferSize); } else { ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2); } } Y_ASSERT(ReaderData.Buffer.Avail() > 0); ssize_t bytes; { TWhatThreadDoesPushPop pp("recv syscall"); bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail())); } if (bytes < 0) { if (WouldBlock()) { ReaderData.Channel->EnableRead(); return false; } else { ReaderData.Channel->DisableRead(); ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); return false; } } if (bytes == 0) { ReaderData.Channel->DisableRead(); // TODO: incorrect: it is possible that only input is shutdown, and output is available ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); return false; } ReaderData.Status.Incremental.NetworkOps += 1; ReaderData.Buffer.Advance(bytes); ReaderData.MoreBytes = 0; return true; } void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) { BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason)); } void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) { TVectorSwaps replyQueueTemp; Y_ASSERT(replyQueueTemp.empty()); ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp); TVector messages; for (TVectorSwaps::reverse_iterator message = replyQueueTemp.rbegin(); message != replyQueueTemp.rend(); ++message) { messages.push_back(message->MessagePtr.Release()); } WriterErrorMessages(messages, reason); replyQueueTemp.clear(); } void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) { // legacy clients expect this field to be set if (!Session->IsSource_) { message->SendTime = now.MilliSeconds(); } WriterData.SendQueue.PushBack(message); } void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) { BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now)); } void TRemoteConnection::WriterFillInFlight() { // this is hack for TLoadBalancedProtocol WriterFillStatus(); AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight()); } const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() { WriterRotateCounters(); WriterFillStatus(); return WriterData.Status; } void TRemoteConnection::WriterFillStatus() { if (!!WriterData.Channel) { WriterData.Status.Fd = WriterData.Channel->GetSocket(); } else { WriterData.Status.Fd = INVALID_SOCKET; } WriterData.Status.BufferSize = WriterData.Buffer.Capacity(); WriterData.Status.SendQueueSize = WriterData.SendQueue.Size(); WriterData.Status.State = WriterData.State; } void TRemoteConnection::WriterProcessStatusDown() { Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental); Reset(WriterData.Status.Incremental); } void TRemoteConnection::ReaderProcessStatusDown() { Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental); Reset(ReaderData.Status.Incremental); } void TRemoteConnection::ProcessWriterDown() { if (!RemovedFromSession) { Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this); if (Session->IsSource_) { if (WriterData.Status.Connected) { FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); } } LWPROBE(Disconnected, ToString(PeerAddr)); RemovedFromSession = true; } WriterData.DropChannel(); DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN); WriterProcessStatusDown(); WriterData.ShutdownComplete.Signal(); } void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) { ClearReplyQueue(reasonForQueues); ClearBeforeSendQueue(reasonForQueues); WriterGetReconnectQueue()->Clear(); WriterGetWakeQueue()->Clear(); TMessagesPtrs cleared; ClearOutgoingQueue(cleared, false); if (!Session->IsSource_) { for (auto& i : cleared) { TBusMessagePtrAndHeader h(i); CheckedCast(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); // assignment back is weird i = h.MessagePtr.Release(); // and this part is not batch } } WriterErrorMessages(cleared, reason); } void TRemoteConnection::BeforeTryWrite() { } void TRemoteConnection::Act(TWriterTag) { TInstant now = TInstant::Now(); WriterData.Status.Acts += 1; if (Y_UNLIKELY(AtomicGet(WriterData.Down))) { // dump status must work even if WriterDown WriterSendStatus(now, true); ProcessWriterDown(); return; } ProcessBeforeSendQueue(now); BeforeTryWrite(); WriterFillInFlight(); WriterGetReconnectQueue()->DequeueAllLikelyEmpty(); if (!WriterData.Status.Connected) { TryConnect(); } else { for (int i = 0;; ++i) { if (i == 100) { // perform other tasks GetWriterActor()->AddTaskFromActorLoop(); break; } if (WriterData.State == WRITER_FILLING) { WriterFillBuffer(); if (WriterData.State == WRITER_FILLING) { WriterData.Channel->DisableWrite(); break; } Y_ASSERT(!WriterData.Buffer.Empty()); } if (WriterData.State == WRITER_FLUSHING) { WriterFlushBuffer(); if (WriterData.State == WRITER_FLUSHING) { break; } } } } WriterGetWakeQueue()->DequeueAllLikelyEmpty(); WriterSendStatus(now); } void TRemoteConnection::WriterFlushBuffer() { Y_ASSERT(WriterData.State == WRITER_FLUSHING); Y_ASSERT(!WriterData.Buffer.Empty()); WriterData.CorkUntil = TInstant::Zero(); while (!WriterData.Buffer.Empty()) { ssize_t bytes; { TWhatThreadDoesPushPop pp("send syscall"); bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size())); } if (bytes < 0) { if (WouldBlock()) { WriterData.Channel->EnableWrite(); return; } else { WriterData.Channel->DisableWrite(); ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, true); return; } } WriterData.Status.Incremental.NetworkOps += 1; WriterData.Buffer.LeftProceed(bytes); } WriterData.Buffer.Clear(); if (WriterData.Buffer.Capacity() > MaxBufferSize) { WriterData.Status.Incremental.BufferDrops += 1; WriterData.Buffer.Reset(); } WriterData.State = WRITER_FILLING; } void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) { if (Session->IsSource_) { WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); } else { ScheduleShutdown(status); } } void TRemoteConnection::ScheduleShutdown(EMessageStatus status) { ShutdownReason = status; AtomicSet(ReaderData.Down, 1); ScheduleRead(); AtomicSet(WriterData.Down, 1); ScheduleWrite(); } void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const { size_t posForAssertion = buffer.Size(); Proto->Serialize(msg, buffer); Y_ABORT_UNLESS(buffer.Size() >= posForAssertion, "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d", int(posForAssertion), int(buffer.Size())); } namespace { inline void WriteHeader(const TBusHeader& header, TBuffer& data) { data.Reserve(data.Size() + sizeof(TBusHeader)); /// \todo hton instead of memcpy memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader)); data.Advance(sizeof(TBusHeader)); } inline void WriteDummyHeader(TBuffer& data) { data.Resize(data.Size() + sizeof(TBusHeader)); } } void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const { size_t pos = data->Size(); size_t dataSize; bool compressionRequested = msg->IsCompressed(); if (compressionRequested) { TBuffer compdata; TBuffer plaindata; CallSerialize(msg, plaindata); dataSize = sizeof(TBusHeader) + plaindata.Size(); NCodecs::TCodecPtr c = Proto->GetTransportCodec(); c->Encode(TStringBuf{plaindata.data(), plaindata.size()}, compdata); if (compdata.Size() < plaindata.Size()) { plaindata.Clear(); msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size(); WriteHeader(*msg->GetHeader(), *data); data->Append(compdata.Data(), compdata.Size()); } else { compdata.Clear(); msg->SetCompressed(false); msg->GetHeader()->Size = sizeof(TBusHeader) + plaindata.Size(); WriteHeader(*msg->GetHeader(), *data); data->Append(plaindata.Data(), plaindata.Size()); } } else { WriteDummyHeader(*data); CallSerialize(msg, *data); dataSize = msg->GetHeader()->Size = data->Size() - pos; data->Proceed(pos); WriteHeader(*msg->GetHeader(), *data); data->Proceed(pos + msg->GetHeader()->Size); } Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos); counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested); } TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const { size_t dataSize; TBusMessage* message; if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) { TBuffer msg; { TBuffer plaindata; NCodecs::TCodecPtr c = Proto->GetTransportCodec(); try { TArrayRef payload = TBusMessage::GetPayload(dataRef); c->Decode(TStringBuf{payload.data(), payload.size()}, plaindata); } catch (...) { // catch all, because // http://nga.at.yandex-team.ru/replies.xml?item_no=3884 *status = MESSAGE_DECOMPRESS_ERROR; return nullptr; } msg.Append(dataRef.data(), sizeof(TBusHeader)); msg.Append(plaindata.Data(), plaindata.Size()); } TArrayRef msgRef(msg.Data(), msg.Size()); dataSize = sizeof(TBusHeader) + msgRef.size(); // TODO: track error types message = Proto->Deserialize(header->Type, msgRef.Slice(sizeof(TBusHeader))).Release(); if (!message) { *status = MESSAGE_DESERIALIZE_ERROR; return nullptr; } *message->GetHeader() = *header; message->SetCompressed(true); } else { dataSize = dataRef.size(); message = Proto->Deserialize(header->Type, dataRef.Slice(sizeof(TBusHeader))).Release(); if (!message) { *status = MESSAGE_DESERIALIZE_ERROR; return nullptr; } *message->GetHeader() = *header; } messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false); return message; } void TRemoteConnection::ResetOneWayFlag(TArrayRef messages) { for (auto message : messages) { message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; } } void TRemoteConnection::ReaderFlushMessages() { if (!ReaderData.ReadMessages.empty()) { Session->OnMessageReceived(this, ReaderData.ReadMessages); ReaderData.ReadMessages.clear(); } } // @return false if actor should break bool TRemoteConnection::MessageRead(TArrayRef readDataRef, TInstant now) { TBusHeader header(readDataRef); Y_ASSERT(readDataRef.size() == header.Size); if (header.GetVersionInternal() != YBUS_VERSION) { ReaderProcessMessageUnknownVersion(readDataRef); return true; } EMessageStatus deserializeFailureStatus = MESSAGE_OK; TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus); if (!r) { Y_ABORT_UNLESS(deserializeFailureStatus != MESSAGE_OK, "state check"); LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), ""); ReaderData.Status.Incremental.StatusCounter[deserializeFailureStatus] += 1; ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false); return false; } LWPROBE(Read, r->GetHeader()->Size); r->ReplyTo = PeerAddrSocketAddr; TBusMessagePtrAndHeader h(r); r->RecvTime = now; QuotaConsume(1, header.Size); ReaderData.ReadMessages.push_back(h); if (ReaderData.ReadMessages.size() >= 100) { ReaderFlushMessages(); } return true; } void TRemoteConnection::WriterFillBuffer() { Y_ASSERT(WriterData.State == WRITER_FILLING); Y_ASSERT(WriterData.Buffer.LeftSize() == 0); if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) { TVector headers; WrongVersionRequests.DequeueAllSingleConsumer(&headers); for (TVector::reverse_iterator header = headers.rbegin(); header != headers.rend(); ++header) { TBusHeader response = *header; response.SendTime = NBus::Now(); response.Size = sizeof(TBusHeader); response.FlagsInternal = 0; response.SetVersionInternal(YBUS_VERSION); WriteHeader(response, WriterData.Buffer.GetBuffer()); } Y_ASSERT(!WriterData.Buffer.Empty()); WriterData.State = WRITER_FLUSHING; return; } TTempTlsVector writeMessages; for (;;) { THolder writeMessage(WriterData.SendQueue.PopFront()); if (!writeMessage) { break; } if (Config.Cork != TDuration::Zero()) { if (WriterData.CorkUntil == TInstant::Zero()) { WriterData.CorkUntil = TInstant::Now() + Config.Cork; } } size_t sizeBeforeSerialize = WriterData.Buffer.Size(); TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter; SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter); size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize; if (written > Config.MaxMessageSize) { WriterData.Buffer.GetBuffer().EraseBack(written); WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE); continue; } WriterData.Status.Incremental.MessageCounter = messageCounter; TBusMessagePtrAndHeader h(writeMessage.Release()); writeMessages.GetVector()->push_back(h); Y_ASSERT(!WriterData.Buffer.Empty()); if (WriterData.Buffer.Size() >= Config.SendThreshold) { break; } } if (!WriterData.Buffer.Empty()) { if (WriterData.Buffer.Size() >= Config.SendThreshold) { WriterData.State = WRITER_FLUSHING; } else if (WriterData.CorkUntil == TInstant::Zero()) { WriterData.State = WRITER_FLUSHING; } else if (TInstant::Now() >= WriterData.CorkUntil) { WriterData.State = WRITER_FLUSHING; } else { // keep filling Y_ASSERT(WriterData.State == WRITER_FILLING); GetWriterSchedulerActor()->ScheduleAt(WriterData.CorkUntil); } } else { // keep filling Y_ASSERT(WriterData.State == WRITER_FILLING); } size_t bytes = MessageSize(*writeMessages.GetVector()); QuotaReturnSelf(writeMessages.GetVector()->size(), bytes); // This is called before `send` syscall inducing latency MessageSent(*writeMessages.GetVector()); } size_t TRemoteConnection::MessageSize(TArrayRef messages) { size_t size = 0; for (const auto& message : messages) size += message.MessagePtr->RequestSize; return size; } size_t TRemoteConnection::GetInFlight() { return AtomicGet(WriterData.InFlight); } size_t TRemoteConnection::GetConnectSyscallsNumForTest() { return WriterData.Status.ConnectSyscalls; } void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) { if (Session->IsSource_) { CheckedCast(Session.Get())->ReleaseInFlight({message}); WriterErrorMessage(message, status); } else { TBusMessagePtrAndHeader h(message); CheckedCast(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); WriterErrorMessage(h.MessagePtr.Release(), status); } } void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr m, EMessageStatus status) { TBusMessage* released = m.Release(); WriterErrorMessages(MakeArrayRef(&released, 1), status); } void TRemoteConnection::WriterErrorMessages(const TArrayRef ms, EMessageStatus status) { ResetOneWayFlag(ms); WriterData.Status.Incremental.StatusCounter[status] += ms.size(); for (auto m : ms) { Session->InvokeOnError(m, status); } } void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) { Y_ABORT_UNLESS(Session->IsSource_, "state check"); TClientConnectionEvent event(type, ConnectionId, PeerAddr); TRemoteClientSession* session = CheckedCast(Session.Get()); session->ClientHandler->OnClientConnectionEvent(event); } bool TRemoteConnection::IsAlive() const { return !AtomicGet(WriterData.Down); } } }