|
- #include "interconnect_tcp_proxy.h"
- #include "interconnect_tcp_session.h"
- #include "interconnect_handshake.h"
- #include <library/cpp/actors/core/probes.h>
- #include <library/cpp/actors/core/log.h>
- #include <library/cpp/actors/core/interconnect.h>
- #include <library/cpp/actors/util/datetime.h>
- #include <library/cpp/actors/protos/services_common.pb.h>
- #include <library/cpp/monlib/service/pages/templates.h>
- namespace NActors {
- LWTRACE_USING(ACTORLIB_PROVIDER);
- DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes));
- template<typename T>
- T Coalesce(T&& x) {
- return x;
- }
- template<typename T, typename T2, typename... TRest>
- typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) {
- if (first != typename std::remove_reference<T>::type()) {
- return first;
- } else {
- return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...);
- }
- }
- TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params)
- : TActor(&TInterconnectSessionTCP::StateFunc)
- , Created(TInstant::Now())
- , Proxy(proxy)
- , CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this))
- , LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this))
- , Params(std::move(params))
- , TotalOutputQueueSize(0)
- , OutputStuckFlag(false)
- , OutputQueueUtilization(16)
- , OutputCounter(0ULL)
- {
- Proxy->Metrics->SetConnected(0);
- ReceiveContext.Reset(new TReceiveContext);
- }
- TInterconnectSessionTCP::~TInterconnectSessionTCP() {
- // close socket ASAP when actor system is being shut down
- if (Socket) {
- Socket->Shutdown(SHUT_RDWR);
- }
- }
- void TInterconnectSessionTCP::Init() {
- auto destroyCallback = [as = TlsActivationContext->ExecutorThread.ActorSystem, id = Proxy->Common->DestructorId](THolder<IEventBase> event) {
- as->Send(id, event.Release());
- };
- Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback));
- ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool,
- Proxy->Common->Settings.MaxSerializedEventSize, Params);
- LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId);
- SetPrefix(Sprintf("Session %s [node %" PRIu32 "]", SelfId().ToString().data(), Proxy->PeerNodeId));
- SendUpdateToWhiteboard();
- }
- void TInterconnectSessionTCP::CloseInputSession() {
- Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession);
- }
- void TInterconnectSessionTCP::Handle(TEvTerminate::TPtr& ev) {
- Terminate(ev->Get()->Reason);
- }
- void TInterconnectSessionTCP::HandlePoison() {
- Terminate(TDisconnectReason());
- }
- void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) {
- LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? i64(*Socket) : -1));
- IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this);
- ShutdownSocket(std::move(reason));
- for (const auto& kv : Subscribers) {
- Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
- }
- Proxy->Metrics->SubSubscribersCount(Subscribers.size());
- Subscribers.clear();
- ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
- channel.NotifyUndelivered();
- });
- if (ReceiverId) {
- Send(ReceiverId, new TEvents::TEvPoisonPill);
- }
- SendUpdateToWhiteboard(false);
- Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize);
- Proxy->Metrics->SubInflightDataAmount(InflightDataAmount);
- LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId);
- if (!Subscribers.empty()) {
- Proxy->Metrics->SubSubscribersCount(Subscribers.size());
- }
- TActor::PassAway();
- }
- void TInterconnectSessionTCP::PassAway() {
- Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly");
- }
- void TInterconnectSessionTCP::Forward(STATEFN_SIG) {
- Proxy->ValidateEvent(ev, "Forward");
- LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data());
- ++MessagesGot;
- if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
- Subscribe(ev);
- }
- ui16 evChannel = ev->GetChannel();
- auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
- const bool wasWorking = oChannel.IsWorking();
- const auto [dataSize, event] = oChannel.Push(*ev);
- LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
- TotalOutputQueueSize += dataSize;
- Proxy->Metrics->AddOutputBuffersTotalSize(dataSize);
- if (!wasWorking) {
- // this channel has returned to work -- it was empty and this we have just put first event in the queue
- ChannelScheduler->AddToHeap(oChannel, EqualizeCounter);
- }
- SetOutputStuckFlag(true);
- ++NumEventsInReadyChannels;
- LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
- WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush,
- QueueSizeInEvents = oChannel.GetQueueSize(),
- QueueSizeInBytes = oChannel.GetBufferedAmountOfData());
- // check for overloaded queues
- ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
- if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) {
- LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64,
- Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit);
- return Terminate(TDisconnectReason::QueueOverload());
- }
- ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20);
- if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) {
- LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size");
- if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) {
- CreateSessionKillingActor(Proxy->Common);
- }
- }
- if (RamInQueue && !RamInQueue->Batching) {
- // we have pending TEvRam, so GenerateTraffic will be called no matter what
- } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket || ReceiveContext->WriteBlockedByFullSendBuffer) {
- // we can't issue more traffic now; GenerateTraffic will be called upon unblocking
- } else if (TotalOutputQueueSize >= 64 * 1024) {
- // output queue size is quite big to issue some traffic
- GenerateTraffic();
- } else if (!RamInQueue) {
- Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1);
- RamInQueue = new TEvRam(true);
- auto *ev = new IEventHandle(SelfId(), {}, RamInQueue);
- const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod;
- if (batchPeriod != TDuration()) {
- TActivationContext::Schedule(batchPeriod, ev);
- } else {
- TActivationContext::Send(ev);
- }
- LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat());
- LOG_DEBUG_IC_SESSION("ICS17", "batching started");
- }
- }
- void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
- LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
- const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
- if (inserted) {
- Proxy->Metrics->IncSubscribersCount();
- } else {
- it->second = ev->Cookie;
- }
- Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie);
- }
- void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
- LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
- Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
- }
- THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) {
- TEvHandshakeAsk *msg = ev->Get();
- // close existing input session, if any, and do nothing upon its destruction
- ReestablishConnection({}, false, TDisconnectReason::NewSession());
- const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial();
- LOG_INFO_IC_SESSION("ICS08", "incoming handshake Self# %s Peer# %s Counter# %" PRIu64 " LastInputSerial# %" PRIu64,
- msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial);
- return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params);
- }
- void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) {
- if (ReceiverId) {
- // upon destruction of input session actor invoke this callback again
- ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession());
- return;
- }
- LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64,
- ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(),
- i64(*ev->Get()->Socket));
- NewConnectionSet = TActivationContext::Now();
- PacketsWrittenToSocket = 0;
- SendBufferSize = ev->Get()->Socket->GetSendBufferSize();
- Socket = std::move(ev->Get()->Socket);
- // there may be a race
- const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
- // arm watchdogs
- CloseOnIdleWatchdog.Arm(SelfId());
- // reset activity timestamps
- LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
- LOG_INFO_IC_SESSION("ICS10", "traffic start");
- // create input session actor
- auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
- Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
- ReceiveContext->UnlockLastProcessedPacketSerial();
- ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
- // register our socket in poller actor
- LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
- const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
- Y_VERIFY(success);
- ReceiveContext->WriteBlockedByFullSendBuffer = false;
- LostConnectionWatchdog.Disarm();
- Proxy->Metrics->SetConnected(1);
- LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId);
- // arm pinger timer
- ResetFlushLogic();
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // REINITIALIZE SEND QUEUE
- //
- // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
- // auxiliary packets; also reset packet metrics to zero to start sending from the beginning
- // also reset SendQueuePos
- // drop confirmed packets first as we do not need unwanted retransmissions
- SendQueuePos = SendQueue.end();
- DropConfirmed(nextPacket);
- for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) {
- const TSendQueue::iterator next = std::next(it);
- if (it->IsEmpty()) {
- SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it);
- } else {
- it->ResetBufs();
- }
- it = next;
- }
- TrimSendQueueCache();
- SendQueuePos = SendQueue.begin();
- TMaybe<ui64> s;
- for (auto it = SendQueuePos; it != SendQueue.end(); ++it) {
- if (!it->IsEmpty()) {
- s = it->GetSerial();
- }
- }
- const ui64 serial = s.GetOrElse(Max<ui64>());
- Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed);
- LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n",
- SendQueue.size(), LastConfirmed, serial);
- BytesUnwritten = 0;
- for (const auto& packet : SendQueue) {
- BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) +
- packet.GetDataSize();
- }
- SwitchStuckPeriod();
- LastHandshakeDone = TActivationContext::Now();
- RamInQueue = nullptr;
- GenerateTraffic();
- }
- void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) {
- if (ev->Sender == ReceiverId) {
- TEvUpdateFromInputSession& msg = *ev->Get();
- // update ping time
- Ping = msg.Ping;
- LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat());
- bool needConfirm = false;
- // update activity timer for dead peer checker
- LastInputActivityTimestamp = TActivationContext::Now();
- if (msg.NumDataBytes) {
- UnconfirmedBytes += msg.NumDataBytes;
- if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) {
- needConfirm = true;
- } else {
- SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod);
- }
- // reset payload watchdog that controls close-on-idle behaviour
- LastPayloadActivityTimestamp = TActivationContext::Now();
- CloseOnIdleWatchdog.Reset();
- }
- bool unblockedSomething = false;
- LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) {
- unblockedSomething = DropConfirmed(msg.ConfirmedByInput);
- }
- // generate more traffic if we have unblocked state now
- if (unblockedSomething) {
- LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
- GenerateTraffic();
- }
- // if we haven't generated any packets, then make a lone Flush packet without any data
- if (needConfirm && Socket) {
- ++ConfirmPacketsForcedBySize;
- MakePacket(false);
- }
- for (;;) {
- switch (EUpdateState state = ReceiveContext->UpdateState) {
- case EUpdateState::NONE:
- case EUpdateState::CONFIRMING:
- Y_FAIL("unexpected state");
- case EUpdateState::INFLIGHT:
- // this message we are processing was the only one in flight, so we can reset state to NONE here
- if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::NONE)) {
- return;
- }
- break;
- case EUpdateState::INFLIGHT_AND_PENDING:
- // there is more messages pending from the input session actor, so we have to inform it to release
- // that message
- if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::CONFIRMING)) {
- Send(ev->Sender, new TEvConfirmUpdate);
- return;
- }
- break;
- }
- }
- }
- }
- void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) {
- if (ev->Get() == RamInQueue) {
- LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
- RamInQueue = nullptr;
- GenerateTraffic();
- }
- }
- void TInterconnectSessionTCP::GenerateTraffic() {
- // generate ping request, if needed
- IssuePingRequest();
- if (RamInQueue && !RamInQueue->Batching) {
- LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0);
- return; // we'll do it a bit later
- } else {
- RamInQueue = nullptr;
- }
- LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic");
- // There is a tradeoff between fairness and efficiency.
- // The less traffic is generated here, the less buffering is after fair scheduler,
- // the more fair system is, the less latency is present.
- // The more traffic is generated here, the less syscalls and actor-system overhead occurs,
- // the less cpu is consumed.
- static const ui64 generateLimit = 64 * 1024;
- const ui64 sizeBefore = TotalOutputQueueSize;
- ui32 generatedPackets = 0;
- ui64 generatedBytes = 0;
- ui64 generateStarted = GetCycleCountFast();
- // apply traffic changes
- auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); };
- // first, we create as many data packets as we can generate under certain conditions; they include presence
- // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions
- // we exit cycle
- while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
- if (generatedBytes >= generateLimit) {
- // resume later but ensure that we have issued at least one packet
- RamInQueue = new TEvRam(false);
- Send(SelfId(), RamInQueue);
- RamStartedCycles = GetCycleCountFast();
- LWPROBE(StartRam, Proxy->PeerNodeId);
- break;
- }
- try {
- generatedBytes += MakePacket(true);
- ++generatedPackets;
- } catch (const TExSerializedEventTooLarge& ex) {
- // terminate session if the event can't be serialized properly
- accountTraffic();
- LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
- return Terminate(TDisconnectReason::EventTooLarge());
- }
- }
- if (Socket) {
- WriteData();
- }
- LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes);
- accountTraffic();
- EqualizeCounter += ChannelScheduler->Equalize();
- }
- void TInterconnectSessionTCP::StartHandshake() {
- LOG_INFO_IC_SESSION("ICS15", "start handshake");
- IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial());
- }
- void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) {
- ReestablishConnection({}, true, std::move(reason));
- }
- void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose,
- TDisconnectReason reason) {
- if (Socket) {
- LOG_INFO_IC_SESSION("ICS13", "reestablish connection");
- ShutdownSocket(std::move(reason)); // stop sending/receiving on socket
- PendingHandshakeDoneEvent = std::move(ev);
- StartHandshakeOnSessionClose = startHandshakeOnSessionClose;
- if (!ReceiverId) {
- ReestablishConnectionExecute();
- }
- }
- }
- void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) {
- if (ev->Sender == ReceiverId) {
- const bool wasConnected(Socket);
- LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data());
- ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet
- if (wasConnected) {
- // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should
- // restart handshake process, closing our part of socket first
- ShutdownSocket(ev->Get()->Reason);
- StartHandshake();
- } else {
- ReestablishConnectionExecute();
- }
- }
- }
- void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) {
- if (Socket) {
- if (const TString& s = reason.ToString()) {
- Proxy->Metrics->IncDisconnectByReason(s);
- }
- LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data());
- Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data());
- Socket->Shutdown(SHUT_RDWR);
- Socket.Reset();
- Proxy->Metrics->IncDisconnections();
- CloseOnIdleWatchdog.Disarm();
- LostConnectionWatchdog.Arm(SelfId());
- Proxy->Metrics->SetConnected(0);
- LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
- }
- }
- void TInterconnectSessionTCP::ReestablishConnectionExecute() {
- bool startHandshakeOnSessionClose = std::exchange(StartHandshakeOnSessionClose, false);
- TEvHandshakeDone::TPtr ev = std::move(PendingHandshakeDoneEvent);
- if (startHandshakeOnSessionClose) {
- StartHandshake();
- } else if (ev) {
- SetNewConnection(ev);
- }
- }
- void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) {
- LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s",
- ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false");
- if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) {
- Proxy->Metrics->IncUsefulWriteWakeups();
- ui64 nowCycles = GetCycleCountFast();
- double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0;
- LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0);
- WriteBlockedTotal += TDuration::MicroSeconds(blockedUs);
- GenerateTraffic();
- } else if (!ev->Cookie) {
- Proxy->Metrics->IncSpuriousWriteWakeups();
- }
- if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) {
- Send(ReceiverId, ev->Release().Release(), 0, 1);
- }
- }
- void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
- PollerToken = std::move(ev->Get()->PollerToken);
- if (ReceiveContext->WriteBlockedByFullSendBuffer) {
- if (Params.Encryption) {
- auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
- PollerToken->Request(secure->WantRead(), secure->WantWrite());
- } else {
- PollerToken->Request(false, true);
- }
- }
- }
- void TInterconnectSessionTCP::WriteData() {
- ui64 written = 0;
- Y_VERIFY(Socket); // ensure that socket wasn't closed
- LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
- constexpr ui32 iovLimit = 256;
- #ifdef _linux_
- ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX));
- #else
- ui32 maxElementsInIOV = 64;
- #endif
- if (Params.Encryption) {
- maxElementsInIOV = 1;
- }
- // vector of write buffers with preallocated stack space
- TStackVec<TConstIoVec, iovLimit> wbuffers;
- LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu",
- ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size());
- // update last confirmed packet number if it has changed
- if (SendQueuePos != SendQueue.end()) {
- SendQueuePos->UpdateConfirmIfPossible(ReceiveContext->GetLastProcessedPacketSerial());
- }
- while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
- for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) {
- it->AppendToIoVector(wbuffers, maxElementsInIOV);
- }
- const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data());
- int iovcnt = wbuffers.size();
- Y_VERIFY(iovcnt > 0);
- Y_VERIFY(iovec->iov_len > 0);
- TString err;
- ssize_t r = 0;
- do {
- #ifndef _win_
- r = iovcnt == 1 ? Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err) : Socket->WriteV(iovec, iovcnt);
- #else
- r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err);
- #endif
- Proxy->Metrics->IncSendSyscalls();
- } while (r == -EINTR);
- LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data());
- wbuffers.clear();
- if (r > 0) {
- Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
- BytesUnwritten -= r;
- written += r;
- ui64 packets = 0;
- // advance SendQueuePos to eat all processed items
- for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) {
- if (!SendQueuePos->IsEmpty()) {
- LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial());
- }
- ++PacketsWrittenToSocket;
- ++packets;
- LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
- }
- LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
- } else if (-r != EAGAIN && -r != EWOULDBLOCK) {
- const TString message = r == 0 ? "connection closed by peer"
- : err ? err
- : Sprintf("socket: %s", strerror(-r));
- LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data());
- if (written) {
- Proxy->Metrics->AddTotalBytesWritten(written);
- }
- return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
- } else {
- // we have to do some hack for secure socket -- mark the packet as 'tried writing'
- if (Params.Encryption) {
- Y_VERIFY(SendQueuePos != SendQueue.end());
- SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL
- }
- // we have received EAGAIN error code, this means that we can't issue more data until we have received
- // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
- Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer);
- ReceiveContext->WriteBlockedByFullSendBuffer = true;
- WriteBlockedCycles = GetCycleCountFast();
- LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written);
- LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit");
- if (PollerToken) {
- if (Params.Encryption) {
- auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
- PollerToken->Request(secure->WantRead(), secure->WantWrite());
- } else {
- PollerToken->Request(false, true);
- }
- }
- }
- }
- }
- if (written) {
- Proxy->Metrics->AddTotalBytesWritten(written);
- }
- }
- void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
- if (period != TDuration::Max()) {
- const TInstant when = TActivationContext::Now() + period;
- if (when < ForcePacketTimestamp) {
- ForcePacketTimestamp = when;
- ScheduleFlush();
- }
- }
- }
- void TInterconnectSessionTCP::ScheduleFlush() {
- if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) {
- Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush);
- FlushSchedule.push(ForcePacketTimestamp);
- MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
- ++FlushEventsScheduled;
- }
- }
- void TInterconnectSessionTCP::HandleFlush() {
- const TInstant now = TActivationContext::Now();
- while (FlushSchedule && now >= FlushSchedule.top()) {
- FlushSchedule.pop();
- }
- IssuePingRequest();
- if (Socket) {
- if (now >= ForcePacketTimestamp) {
- ++ConfirmPacketsForcedByTimeout;
- ++FlushEventsProcessed;
- MakePacket(false); // just generate confirmation packet if we have preconditions for this
- } else if (ForcePacketTimestamp != TInstant::Max()) {
- ScheduleFlush();
- }
- }
- }
- void TInterconnectSessionTCP::ResetFlushLogic() {
- ForcePacketTimestamp = TInstant::Max();
- UnconfirmedBytes = 0;
- const TDuration ping = Proxy->Common->Settings.PingPeriod;
- if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
- SetForcePacketTimestamp(ping);
- }
- }
- void TInterconnectSessionTCP::TrimSendQueueCache() {
- static constexpr size_t maxItems = 32;
- static constexpr size_t trimThreshold = maxItems * 2;
- if (SendQueueCache.size() >= trimThreshold) {
- auto it = SendQueueCache.end();
- for (size_t n = SendQueueCache.size() - maxItems; n; --n) {
- --it;
- }
- auto ev = std::make_unique<TEvFreeItems>();
- ev->Items.splice(ev->Items.end(), SendQueueCache, it, SendQueueCache.end());
- ev->NumBytes = ev->Items.size() * sizeof(TTcpPacketOutTask);
- if (ev->GetInLineForDestruction(Proxy->Common)) {
- Send(Proxy->Common->DestructorId, ev.release());
- }
- }
- }
- ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
- Y_VERIFY(Socket);
- TSendQueue::iterator packet;
- if (SendQueueCache) {
- // we have entries in cache, take one and move it to the end of SendQueue
- packet = SendQueueCache.begin();
- SendQueue.splice(SendQueue.end(), SendQueueCache, packet);
- packet->Reuse(); // reset packet to initial state
- } else {
- // we have to allocate new packet, so just do it
- LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) {
- packet = SendQueue.emplace(SendQueue.end(), Params);
- }
- }
- // update send queue position
- if (SendQueuePos == SendQueue.end()) {
- SendQueuePos = packet; // start sending this packet if we are not sending anything for now
- }
- ui64 serial = 0;
- if (data) {
- // generate serial for this data packet
- serial = ++OutputCounter;
- // fill the data packet
- Y_VERIFY(NumEventsInReadyChannels);
- LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) {
- FillSendingBuffer(*packet, serial);
- }
- Y_VERIFY(!packet->IsEmpty());
- InflightDataAmount += packet->GetDataSize();
- Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
- if (InflightDataAmount > GetTotalInflightAmountOfData()) {
- Proxy->Metrics->IncInflyLimitReach();
- }
- if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
- AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast());
- AtomicSet(ReceiveContext->ControlPacketId, OutputCounter);
- }
- // update payload activity timer
- LastPayloadActivityTimestamp = TActivationContext::Now();
- } else if (pingMask) {
- serial = *pingMask;
- // make this packet a priority one
- if (SendQueuePos != packet) {
- Y_VERIFY(SendQueuePos != SendQueue.end());
- if (SendQueuePos->IsAtBegin()) {
- // insert this packet just before the next being sent and step back
- SendQueue.splice(SendQueuePos, SendQueue, packet);
- --SendQueuePos;
- Y_VERIFY(SendQueuePos == packet);
- } else {
- // current packet is already being sent, so move new packet just after it
- SendQueue.splice(std::next(SendQueuePos), SendQueue, packet);
- }
- }
- }
- const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial();
- packet->SetMetadata(serial, lastInputSerial);
- packet->Sign();
- // count number of bytes pending for write
- ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize();
- BytesUnwritten += packetSize;
- LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
- " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(),
- InflightDataAmount, BytesUnwritten);
- // reset forced packet sending timestamp as we have confirmed all received data
- ResetFlushLogic();
- ++PacketsGenerated;
- LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
- if (!data) {
- WriteData();
- }
- return packetSize;
- }
- bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
- LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm);
- Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
- "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64,
- LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial);
- LastConfirmed = confirm;
- ui64 droppedDataAmount = 0;
- ui32 numDropped = 0;
- // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
- // making Serial <= confirm true
- TSendQueue::iterator it;
- ui64 lastDroppedSerial = 0;
- for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) {
- if (!it->IsEmpty()) {
- lastDroppedSerial = it->GetSerial();
- }
- droppedDataAmount += it->GetDataSize();
- ++numDropped;
- }
- SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it);
- TrimSendQueueCache();
- ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
- channel.DropConfirmed(lastDroppedSerial);
- });
- const ui64 current = InflightDataAmount;
- const ui64 limit = GetTotalInflightAmountOfData();
- const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount;
- PacketsConfirmed += numDropped;
- InflightDataAmount -= droppedDataAmount;
- Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
- LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount);
- LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes"
- " dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped);
- Pool->Trim(); // send any unsent free requests
- return unblockedSomething;
- }
- void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) {
- ui32 bytesGenerated = 0;
- Y_VERIFY(NumEventsInReadyChannels);
- while (NumEventsInReadyChannels) {
- TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight();
- Y_VERIFY_DEBUG(!channel->IsEmpty());
- // generate some data within this channel
- const ui64 netBefore = channel->GetBufferedAmountOfData();
- ui64 gross = 0;
- const bool eventDone = channel->FeedBuf(task, serial, &gross);
- channel->UnaccountedTraffic += gross;
- const ui64 netAfter = channel->GetBufferedAmountOfData();
- Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink
- const ui64 net = netBefore - netAfter; // number of net bytes serialized
- // adjust metrics for local and global queue size
- TotalOutputQueueSize -= net;
- Proxy->Metrics->SubOutputBuffersTotalSize(net);
- bytesGenerated += gross;
- Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross);
- // return it back to queue or delete, depending on whether this channel is still working or not
- ChannelScheduler->FinishPick(gross, EqualizeCounter);
- // update some stats if the packet was fully serialized
- if (eventDone) {
- ++MessagesWrittenToBuffer;
- Y_VERIFY(NumEventsInReadyChannels);
- --NumEventsInReadyChannels;
- if (!NumEventsInReadyChannels) {
- SetOutputStuckFlag(false);
- }
- }
- if (!gross) { // no progress -- almost full packet buffer
- break;
- }
- }
- LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal);
- Y_VERIFY(bytesGenerated); // ensure we are not stalled in serialization
- }
- ui32 TInterconnectSessionTCP::CalculateQueueUtilization() {
- SwitchStuckPeriod();
- ui64 sumBusy = 0, sumPeriod = 0;
- for (auto iter = OutputQueueUtilization.begin(); iter != OutputQueueUtilization.end() - 1; ++iter) {
- sumBusy += iter->first;
- sumPeriod += iter->second;
- }
- return sumBusy * 1000000 / sumPeriod;
- }
- void TInterconnectSessionTCP::SendUpdateToWhiteboard(bool connected) {
- const ui32 utilization = Socket ? CalculateQueueUtilization() : 0;
- if (const auto& callback = Proxy->Common->UpdateWhiteboard) {
- enum class EFlag {
- GREEN,
- YELLOW,
- ORANGE,
- RED,
- };
- EFlag flagState = EFlag::RED;
- if (Socket) {
- flagState = EFlag::GREEN;
- do {
- auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
- if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
- flagState = EFlag::ORANGE;
- break;
- } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) {
- flagState = EFlag::YELLOW;
- }
- // check utilization
- if (utilization > 875000) { // 7/8
- flagState = EFlag::ORANGE;
- break;
- } else if (utilization > 500000) { // 1/2
- flagState = EFlag::YELLOW;
- }
- } while (false);
- }
- callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(),
- connected,
- flagState == EFlag::GREEN,
- flagState == EFlag::YELLOW,
- flagState == EFlag::ORANGE,
- flagState == EFlag::RED,
- TlsActivationContext->ExecutorThread.ActorSystem);
- }
- if (connected) {
- Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
- }
- }
- void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) {
- if (OutputStuckFlag == state)
- return;
- if (OutputQueueUtilization.Size() == 0)
- return;
- auto& lastpair = OutputQueueUtilization.Last();
- if (state)
- lastpair.first -= GetCycleCountFast();
- else
- lastpair.first += GetCycleCountFast();
- OutputStuckFlag = state;
- }
- void TInterconnectSessionTCP::SwitchStuckPeriod() {
- auto now = GetCycleCountFast();
- if (OutputQueueUtilization.Size() != 0) {
- auto& lastpair = OutputQueueUtilization.Last();
- lastpair.second = now - lastpair.second;
- if (OutputStuckFlag)
- lastpair.first += now;
- }
- OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now));
- if (OutputStuckFlag)
- OutputQueueUtilization.Last().first -= now;
- }
- TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const {
- return Coalesce(Proxy->Common->Settings.DeadPeer, DEFAULT_DEADPEER_TIMEOUT);
- }
- TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const {
- return Proxy->Common->Settings.CloseOnIdle;
- }
- TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const {
- return Coalesce(Proxy->Common->Settings.LostConnection, DEFAULT_LOST_CONNECTION_TIMEOUT);
- }
- ui32 TInterconnectSessionTCP::GetTotalInflightAmountOfData() const {
- return Coalesce(Proxy->Common->Settings.TotalInflightAmountOfData, DEFAULT_TOTAL_INFLIGHT_DATA);
- }
- ui64 TInterconnectSessionTCP::GetMaxCyclesPerEvent() const {
- return DurationToCycles(TDuration::MicroSeconds(50));
- }
- void TInterconnectSessionTCP::IssuePingRequest() {
- const TInstant now = TActivationContext::Now();
- if (now >= LastPingTimestamp + PingPeriodicity) {
- LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
- if (Socket) {
- MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask);
- }
- if (Socket) {
- MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask);
- }
- LastPingTimestamp = now;
- }
- }
- void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) {
- if (Socket) {
- MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask);
- }
- }
- void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) {
- HTML(str) {
- DIV_CLASS("panel panel-info") {
- DIV_CLASS("panel-heading") {
- str << "Session";
- }
- DIV_CLASS("panel-body") {
- TABLE_CLASS("table") {
- TABLEHEAD() {
- TABLER() {
- TABLEH() {
- str << "Sensor";
- }
- TABLEH() {
- str << "Value";
- }
- }
- }
- TABLEBODY() {
- TABLER() {
- TABLED() {
- str << "Encryption";
- }
- TABLED() {
- str << (Params.Encryption ? "<font color=green>Enabled</font>" : "<font color=red>Disabled</font>");
- }
- }
- if (auto *x = dynamic_cast<NInterconnect::TSecureSocket*>(Socket.Get())) {
- TABLER() {
- TABLED() {
- str << "Cipher name";
- }
- TABLED() {
- str << x->GetCipherName();
- }
- }
- TABLER() {
- TABLED() {
- str << "Cipher bits";
- }
- TABLED() {
- str << x->GetCipherBits();
- }
- }
- TABLER() {
- TABLED() {
- str << "Protocol";
- }
- TABLED() {
- str << x->GetProtocolName();
- }
- }
- TABLER() {
- TABLED() {
- str << "Peer CN";
- }
- TABLED() {
- str << x->GetPeerCommonName();
- }
- }
- }
- TABLER() {
- TABLED() { str << "AuthOnly CN"; }
- TABLED() { str << Params.AuthCN; }
- }
- TABLER() {
- TABLED() {
- str << "Local scope id";
- }
- TABLED() {
- str << ScopeIdToString(Proxy->Common->LocalScopeId);
- }
- }
- TABLER() {
- TABLED() {
- str << "Peer scope id";
- }
- TABLED() {
- str << ScopeIdToString(Params.PeerScopeId);
- }
- }
- TABLER() {
- TABLED() {
- str << "This page generated at";
- }
- TABLED() {
- str << TActivationContext::Now() << " / " << Now();
- }
- }
- TABLER() {
- TABLED() {
- str << "SelfID";
- }
- TABLED() {
- str << SelfId().ToString();
- }
- }
- TABLER() {
- TABLED() { str << "Frame version/Checksum"; }
- TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); }
- }
- #define MON_VAR(NAME) \
- TABLER() { \
- TABLED() { \
- str << #NAME; \
- } \
- TABLED() { \
- str << NAME; \
- } \
- }
- MON_VAR(Created)
- MON_VAR(NewConnectionSet)
- MON_VAR(ReceiverId)
- MON_VAR(MessagesGot)
- MON_VAR(MessagesWrittenToBuffer)
- MON_VAR(PacketsGenerated)
- MON_VAR(PacketsWrittenToSocket)
- MON_VAR(PacketsConfirmed)
- MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket))
- MON_VAR(ConfirmPacketsForcedBySize)
- MON_VAR(ConfirmPacketsForcedByTimeout)
- TABLER() {
- TABLED() {
- str << "Virtual self ID";
- }
- TABLED() {
- str << Proxy->SessionVirtualId.ToString();
- }
- }
- TABLER() {
- TABLED() {
- str << "Virtual peer ID";
- }
- TABLED() {
- str << Proxy->RemoteSessionVirtualId.ToString();
- }
- }
- TABLER() {
- TABLED() {
- str << "Socket";
- }
- TABLED() {
- str << (Socket ? i64(*Socket) : -1);
- }
- }
- ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
- MON_VAR(OutputStuckFlag)
- MON_VAR(SendQueue.size())
- MON_VAR(SendQueueCache.size())
- MON_VAR(NumEventsInReadyChannels)
- MON_VAR(TotalOutputQueueSize)
- MON_VAR(BytesUnwritten)
- MON_VAR(InflightDataAmount)
- MON_VAR(unsentQueueSize)
- MON_VAR(SendBufferSize)
- MON_VAR(LastInputActivityTimestamp)
- MON_VAR(LastPayloadActivityTimestamp)
- MON_VAR(LastHandshakeDone)
- MON_VAR(OutputCounter)
- MON_VAR(LastSentSerial)
- MON_VAR(ReceiveContext->GetLastProcessedPacketSerial())
- MON_VAR(LastConfirmed)
- MON_VAR(FlushSchedule.size())
- MON_VAR(MaxFlushSchedule)
- MON_VAR(FlushEventsScheduled)
- MON_VAR(FlushEventsProcessed)
- TString clockSkew;
- i64 x = GetClockSkew();
- if (x < 0) {
- clockSkew = Sprintf("-%s", TDuration::MicroSeconds(-x).ToString().data());
- } else {
- clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data());
- }
- MON_VAR(LastPingTimestamp)
- MON_VAR(GetPingRTT())
- MON_VAR(clockSkew)
- MON_VAR(GetDeadPeerTimeout())
- MON_VAR(GetTotalInflightAmountOfData())
- MON_VAR(GetCloseOnIdleTimeout())
- MON_VAR(Subscribers.size())
- }
- }
- }
- }
- }
- }
- void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) {
- TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common));
- }
- }
|