interconnect_tcp_session.cpp 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228
  1. #include "interconnect_tcp_proxy.h"
  2. #include "interconnect_tcp_session.h"
  3. #include "interconnect_handshake.h"
  4. #include <library/cpp/actors/core/probes.h>
  5. #include <library/cpp/actors/core/log.h>
  6. #include <library/cpp/actors/core/interconnect.h>
  7. #include <library/cpp/actors/util/datetime.h>
  8. #include <library/cpp/actors/protos/services_common.pb.h>
  9. #include <library/cpp/monlib/service/pages/templates.h>
  10. namespace NActors {
// Brings the ACTORLIB_PROVIDER probe set into scope for the LWTRACK/LWPROBE calls in this file
// (presumably the usual lwtrace provider import -- macro is defined outside this file; confirm).
LWTRACE_USING(ACTORLIB_PROVIDER);
// Wilson trace event emitted by Forward() right after an event is pushed into an output channel;
// carries that channel's queue length in events and its buffered amount of data in bytes.
DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes));
  13. template<typename T>
  14. T Coalesce(T&& x) {
  15. return x;
  16. }
  17. template<typename T, typename T2, typename... TRest>
  18. typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) {
  19. if (first != typename std::remove_reference<T>::type()) {
  20. return first;
  21. } else {
  22. return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...);
  23. }
  24. }
  25. TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params)
  26. : TActor(&TInterconnectSessionTCP::StateFunc)
  27. , Created(TInstant::Now())
  28. , Proxy(proxy)
  29. , CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this))
  30. , LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this))
  31. , Params(std::move(params))
  32. , TotalOutputQueueSize(0)
  33. , OutputStuckFlag(false)
  34. , OutputQueueUtilization(16)
  35. , OutputCounter(0ULL)
  36. {
  37. Proxy->Metrics->SetConnected(0);
  38. ReceiveContext.Reset(new TReceiveContext);
  39. }
  40. TInterconnectSessionTCP::~TInterconnectSessionTCP() {
  41. // close socket ASAP when actor system is being shut down
  42. if (Socket) {
  43. Socket->Shutdown(SHUT_RDWR);
  44. }
  45. }
  46. void TInterconnectSessionTCP::Init() {
  47. auto destroyCallback = [as = TlsActivationContext->ExecutorThread.ActorSystem, id = Proxy->Common->DestructorId](THolder<IEventBase> event) {
  48. as->Send(id, event.Release());
  49. };
  50. Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback));
  51. ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool,
  52. Proxy->Common->Settings.MaxSerializedEventSize, Params);
  53. LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId);
  54. SetPrefix(Sprintf("Session %s [node %" PRIu32 "]", SelfId().ToString().data(), Proxy->PeerNodeId));
  55. SendUpdateToWhiteboard();
  56. }
  57. void TInterconnectSessionTCP::CloseInputSession() {
  58. Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession);
  59. }
  60. void TInterconnectSessionTCP::Handle(TEvTerminate::TPtr& ev) {
  61. Terminate(ev->Get()->Reason);
  62. }
  63. void TInterconnectSessionTCP::HandlePoison() {
  64. Terminate(TDisconnectReason());
  65. }
  66. void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) {
  67. LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? i64(*Socket) : -1));
  68. IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this);
  69. ShutdownSocket(std::move(reason));
  70. for (const auto& kv : Subscribers) {
  71. Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
  72. }
  73. Proxy->Metrics->SubSubscribersCount(Subscribers.size());
  74. Subscribers.clear();
  75. ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
  76. channel.NotifyUndelivered();
  77. });
  78. if (ReceiverId) {
  79. Send(ReceiverId, new TEvents::TEvPoisonPill);
  80. }
  81. SendUpdateToWhiteboard(false);
  82. Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize);
  83. Proxy->Metrics->SubInflightDataAmount(InflightDataAmount);
  84. LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId);
  85. if (!Subscribers.empty()) {
  86. Proxy->Metrics->SubSubscribersCount(Subscribers.size());
  87. }
  88. TActor::PassAway();
  89. }
// Intentionally fatal: the session must leave only through Terminate(), which performs the
// subscriber/metrics/socket cleanup and then calls TActor::PassAway() itself.
void TInterconnectSessionTCP::PassAway() {
    Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly");
}
// Accepts an outgoing event for the peer node. It:
//  - enqueues the event into its output channel and updates queue-size accounting;
//  - terminates the session if the per-session send buffer limit is exceeded;
//  - starts the session-killing actor once if the global output buffer limit is exceeded;
//  - then either generates traffic now, leaves it to a pending/unblocking path, or starts batching
//    via a TEvRam(true).
void TInterconnectSessionTCP::Forward(STATEFN_SIG) {
    Proxy->ValidateEvent(ev, "Forward");
    LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data());
    ++MessagesGot;
    // sender asked to be subscribed to session state along with this event
    if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
        Subscribe(ev);
    }
    ui16 evChannel = ev->GetChannel();
    auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
    const bool wasWorking = oChannel.IsWorking();
    const auto [dataSize, event] = oChannel.Push(*ev);
    LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
    TotalOutputQueueSize += dataSize;
    Proxy->Metrics->AddOutputBuffersTotalSize(dataSize);
    if (!wasWorking) {
        // this channel has returned to work -- it was empty and we have just put the first event in the queue
        ChannelScheduler->AddToHeap(oChannel, EqualizeCounter);
    }
    SetOutputStuckFlag(true);
    ++NumEventsInReadyChannels;
    LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
    WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush,
        QueueSizeInEvents = oChannel.GetQueueSize(),
        QueueSizeInBytes = oChannel.GetBufferedAmountOfData());
    // check for overloaded queues; a limit of 0 MB disables the check
    ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
    if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) {
        LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64,
            Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit);
        return Terminate(TDisconnectReason::QueueOverload());
    }
    // limit across all sessions (value read from proxy metrics); the killer actor is started by at most
    // one session thanks to the StartedSessionKiller try-lock
    ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20);
    if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) {
        LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size");
        if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) {
            CreateSessionKillingActor(Proxy->Common);
        }
    }
    if (RamInQueue && !RamInQueue->Batching) {
        // we have pending TEvRam, so GenerateTraffic will be called no matter what
    } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket || ReceiveContext->WriteBlockedByFullSendBuffer) {
        // we can't issue more traffic now; GenerateTraffic will be called upon unblocking
    } else if (TotalOutputQueueSize >= 64 * 1024) {
        // output queue size is big enough to issue some traffic right away
        GenerateTraffic();
    } else if (!RamInQueue) {
        // start batching: deliver TEvRam(true) to ourselves after BatchPeriod (immediately if it is zero)
        Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1);
        RamInQueue = new TEvRam(true);
        // NOTE(review): this local `ev` shadows the handler's `ev` parameter
        auto *ev = new IEventHandle(SelfId(), {}, RamInQueue);
        const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod;
        if (batchPeriod != TDuration()) {
            TActivationContext::Schedule(batchPeriod, ev);
        } else {
            TActivationContext::Send(ev);
        }
        LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat());
        LOG_DEBUG_IC_SESSION("ICS17", "batching started");
    }
}
  152. void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
  153. LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
  154. const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
  155. if (inserted) {
  156. Proxy->Metrics->IncSubscribersCount();
  157. } else {
  158. it->second = ev->Cookie;
  159. }
  160. Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie);
  161. }
  162. void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
  163. LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
  164. Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
  165. }
  166. THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) {
  167. TEvHandshakeAsk *msg = ev->Get();
  168. // close existing input session, if any, and do nothing upon its destruction
  169. ReestablishConnection({}, false, TDisconnectReason::NewSession());
  170. const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial();
  171. LOG_INFO_IC_SESSION("ICS08", "incoming handshake Self# %s Peer# %s Counter# %" PRIu64 " LastInputSerial# %" PRIu64,
  172. msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial);
  173. return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params);
  174. }
// Installs the socket from a completed handshake (TEvHandshakeDone) and restarts traffic on it.
// If an input session actor still exists, the event is handed to ReestablishConnection() instead.
// That call keeps it as PendingHandshakeDoneEvent only while a socket is still open.
// Otherwise this method:
//  - takes the socket, arms the close-on-idle watchdog and creates/registers a new TInputSessionTCP;
//  - registers the socket with the poller;
//  - rewinds the send queue so unconfirmed data packets are resent;
//  - calls GenerateTraffic().
void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) {
    if (ReceiverId) {
        // upon destruction of input session actor invoke this callback again
        // NOTE(review): ReestablishConnection() does nothing when Socket is already null, so in that
        // case this event is not retained anywhere -- confirm that this cannot happen or is harmless.
        ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession());
        return;
    }
    LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64,
        ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(),
        i64(*ev->Get()->Socket));
    NewConnectionSet = TActivationContext::Now();
    PacketsWrittenToSocket = 0;
    // read the buffer size before ownership of the socket is moved out of the event
    SendBufferSize = ev->Get()->Socket->GetSendBufferSize();
    Socket = std::move(ev->Get()->Socket);
    // there may be a race
    const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
    // arm the close-on-idle watchdog
    CloseOnIdleWatchdog.Arm(SelfId());
    // reset activity timestamps
    LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
    LOG_INFO_IC_SESSION("ICS10", "traffic start");
    // create input session actor
    auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
        Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
    ReceiveContext->UnlockLastProcessedPacketSerial();
    // with encryption the input session shares this actor's mailbox
    ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
    // register our socket in poller actor
    LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
    const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
    Y_VERIFY(success);
    ReceiveContext->WriteBlockedByFullSendBuffer = false;
    LostConnectionWatchdog.Disarm();
    Proxy->Metrics->SetConnected(1);
    LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId);
    // reset forced-packet/confirmation state (may schedule a ping-period flush)
    ResetFlushLogic();
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    // REINITIALIZE SEND QUEUE
    //
    // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
    // auxiliary packets; also reset packet metrics to zero to start sending from the beginning
    // also reset SendQueuePos
    // drop confirmed packets first as we do not need unwanted retransmissions
    SendQueuePos = SendQueue.end();
    DropConfirmed(nextPacket);
    for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) {
        // take `next` before splicing `it` away into the cache
        const TSendQueue::iterator next = std::next(it);
        if (it->IsEmpty()) {
            SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it);
        } else {
            it->ResetBufs();
        }
        it = next;
    }
    TrimSendQueueCache();
    SendQueuePos = SendQueue.begin();
    // NOTE(review): there is no `break`, so `s` ends up holding the serial of the LAST non-empty packet,
    // while the log below labels it "SendQueuePos.Serial" (the first one). After the loop above only
    // non-empty packets remain -- confirm which packet the Y_VERIFY is meant to check.
    TMaybe<ui64> s;
    for (auto it = SendQueuePos; it != SendQueue.end(); ++it) {
        if (!it->IsEmpty()) {
            s = it->GetSerial();
        }
    }
    const ui64 serial = s.GetOrElse(Max<ui64>());
    Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed);
    LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n",
        SendQueue.size(), LastConfirmed, serial);
    // recompute how many bytes (header + payload per packet) remain to be written for the kept packets
    BytesUnwritten = 0;
    for (const auto& packet : SendQueue) {
        BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) +
            packet.GetDataSize();
    }
    SwitchStuckPeriod();
    LastHandshakeDone = TActivationContext::Now();
    // forget any outstanding TEvRam so GenerateTraffic() runs right away; a stale TEvRam is ignored by HandleRam
    RamInQueue = nullptr;
    GenerateTraffic();
}
  250. void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) {
  251. if (ev->Sender == ReceiverId) {
  252. TEvUpdateFromInputSession& msg = *ev->Get();
  253. // update ping time
  254. Ping = msg.Ping;
  255. LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat());
  256. bool needConfirm = false;
  257. // update activity timer for dead peer checker
  258. LastInputActivityTimestamp = TActivationContext::Now();
  259. if (msg.NumDataBytes) {
  260. UnconfirmedBytes += msg.NumDataBytes;
  261. if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) {
  262. needConfirm = true;
  263. } else {
  264. SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod);
  265. }
  266. // reset payload watchdog that controls close-on-idle behaviour
  267. LastPayloadActivityTimestamp = TActivationContext::Now();
  268. CloseOnIdleWatchdog.Reset();
  269. }
  270. bool unblockedSomething = false;
  271. LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) {
  272. unblockedSomething = DropConfirmed(msg.ConfirmedByInput);
  273. }
  274. // generate more traffic if we have unblocked state now
  275. if (unblockedSomething) {
  276. LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
  277. GenerateTraffic();
  278. }
  279. // if we haven't generated any packets, then make a lone Flush packet without any data
  280. if (needConfirm && Socket) {
  281. ++ConfirmPacketsForcedBySize;
  282. MakePacket(false);
  283. }
  284. for (;;) {
  285. switch (EUpdateState state = ReceiveContext->UpdateState) {
  286. case EUpdateState::NONE:
  287. case EUpdateState::CONFIRMING:
  288. Y_FAIL("unexpected state");
  289. case EUpdateState::INFLIGHT:
  290. // this message we are processing was the only one in flight, so we can reset state to NONE here
  291. if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::NONE)) {
  292. return;
  293. }
  294. break;
  295. case EUpdateState::INFLIGHT_AND_PENDING:
  296. // there is more messages pending from the input session actor, so we have to inform it to release
  297. // that message
  298. if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::CONFIRMING)) {
  299. Send(ev->Sender, new TEvConfirmUpdate);
  300. return;
  301. }
  302. break;
  303. }
  304. }
  305. }
  306. }
  307. void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) {
  308. if (ev->Get() == RamInQueue) {
  309. LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
  310. RamInQueue = nullptr;
  311. GenerateTraffic();
  312. }
  313. }
  314. void TInterconnectSessionTCP::GenerateTraffic() {
  315. // generate ping request, if needed
  316. IssuePingRequest();
  317. if (RamInQueue && !RamInQueue->Batching) {
  318. LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0);
  319. return; // we'll do it a bit later
  320. } else {
  321. RamInQueue = nullptr;
  322. }
  323. LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic");
  324. // There is a tradeoff between fairness and efficiency.
  325. // The less traffic is generated here, the less buffering is after fair scheduler,
  326. // the more fair system is, the less latency is present.
  327. // The more traffic is generated here, the less syscalls and actor-system overhead occurs,
  328. // the less cpu is consumed.
  329. static const ui64 generateLimit = 64 * 1024;
  330. const ui64 sizeBefore = TotalOutputQueueSize;
  331. ui32 generatedPackets = 0;
  332. ui64 generatedBytes = 0;
  333. ui64 generateStarted = GetCycleCountFast();
  334. // apply traffic changes
  335. auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); };
  336. // first, we create as many data packets as we can generate under certain conditions; they include presence
  337. // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions
  338. // we exit cycle
  339. while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
  340. if (generatedBytes >= generateLimit) {
  341. // resume later but ensure that we have issued at least one packet
  342. RamInQueue = new TEvRam(false);
  343. Send(SelfId(), RamInQueue);
  344. RamStartedCycles = GetCycleCountFast();
  345. LWPROBE(StartRam, Proxy->PeerNodeId);
  346. break;
  347. }
  348. try {
  349. generatedBytes += MakePacket(true);
  350. ++generatedPackets;
  351. } catch (const TExSerializedEventTooLarge& ex) {
  352. // terminate session if the event can't be serialized properly
  353. accountTraffic();
  354. LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
  355. return Terminate(TDisconnectReason::EventTooLarge());
  356. }
  357. }
  358. if (Socket) {
  359. WriteData();
  360. }
  361. LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes);
  362. accountTraffic();
  363. EqualizeCounter += ChannelScheduler->Equalize();
  364. }
  365. void TInterconnectSessionTCP::StartHandshake() {
  366. LOG_INFO_IC_SESSION("ICS15", "start handshake");
  367. IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial());
  368. }
  369. void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) {
  370. ReestablishConnection({}, true, std::move(reason));
  371. }
  372. void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose,
  373. TDisconnectReason reason) {
  374. if (Socket) {
  375. LOG_INFO_IC_SESSION("ICS13", "reestablish connection");
  376. ShutdownSocket(std::move(reason)); // stop sending/receiving on socket
  377. PendingHandshakeDoneEvent = std::move(ev);
  378. StartHandshakeOnSessionClose = startHandshakeOnSessionClose;
  379. if (!ReceiverId) {
  380. ReestablishConnectionExecute();
  381. }
  382. }
  383. }
  384. void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) {
  385. if (ev->Sender == ReceiverId) {
  386. const bool wasConnected(Socket);
  387. LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data());
  388. ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet
  389. if (wasConnected) {
  390. // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should
  391. // restart handshake process, closing our part of socket first
  392. ShutdownSocket(ev->Get()->Reason);
  393. StartHandshake();
  394. } else {
  395. ReestablishConnectionExecute();
  396. }
  397. }
  398. }
  399. void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) {
  400. if (Socket) {
  401. if (const TString& s = reason.ToString()) {
  402. Proxy->Metrics->IncDisconnectByReason(s);
  403. }
  404. LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data());
  405. Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data());
  406. Socket->Shutdown(SHUT_RDWR);
  407. Socket.Reset();
  408. Proxy->Metrics->IncDisconnections();
  409. CloseOnIdleWatchdog.Disarm();
  410. LostConnectionWatchdog.Arm(SelfId());
  411. Proxy->Metrics->SetConnected(0);
  412. LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
  413. }
  414. }
  415. void TInterconnectSessionTCP::ReestablishConnectionExecute() {
  416. bool startHandshakeOnSessionClose = std::exchange(StartHandshakeOnSessionClose, false);
  417. TEvHandshakeDone::TPtr ev = std::move(PendingHandshakeDoneEvent);
  418. if (startHandshakeOnSessionClose) {
  419. StartHandshake();
  420. } else if (ev) {
  421. SetNewConnection(ev);
  422. }
  423. }
  424. void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) {
  425. LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s",
  426. ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false");
  427. if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) {
  428. Proxy->Metrics->IncUsefulWriteWakeups();
  429. ui64 nowCycles = GetCycleCountFast();
  430. double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0;
  431. LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0);
  432. WriteBlockedTotal += TDuration::MicroSeconds(blockedUs);
  433. GenerateTraffic();
  434. } else if (!ev->Cookie) {
  435. Proxy->Metrics->IncSpuriousWriteWakeups();
  436. }
  437. if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) {
  438. Send(ReceiverId, ev->Release().Release(), 0, 1);
  439. }
  440. }
  441. void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
  442. PollerToken = std::move(ev->Get()->PollerToken);
  443. if (ReceiveContext->WriteBlockedByFullSendBuffer) {
  444. if (Params.Encryption) {
  445. auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
  446. PollerToken->Request(secure->WantRead(), secure->WantWrite());
  447. } else {
  448. PollerToken->Request(false, true);
  449. }
  450. }
  451. }
// Writes as much of the send queue (starting at SendQueuePos) to the socket as it will accept.
//  - Successful writes advance SendQueuePos, BytesUnwritten and LastSentSerial.
//  - EAGAIN/EWOULDBLOCK marks the write as blocked and requests a poller wakeup.
//  - Any other error (or a zero-byte write) triggers ReestablishConnectionWithHandshake().
// Bytes written in this call are added to the TotalBytesWritten metric on every exit path.
void TInterconnectSessionTCP::WriteData() {
    ui64 written = 0;
    Y_VERIFY(Socket); // ensure that socket wasn't closed
    // NOTE(review): presumably times this scope and fires the SlowICWriteData probe when it runs too long
    LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
        constexpr ui32 iovLimit = 256;
#ifdef _linux_
        ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX));
#else
        ui32 maxElementsInIOV = 64;
#endif
        // the encrypted path writes one buffer at a time
        if (Params.Encryption) {
            maxElementsInIOV = 1;
        }
        // vector of write buffers with preallocated stack space
        TStackVec<TConstIoVec, iovLimit> wbuffers;
        LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu",
            ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size());
        // update last confirmed packet number if it has changed
        if (SendQueuePos != SendQueue.end()) {
            SendQueuePos->UpdateConfirmIfPossible(ReceiveContext->GetLastProcessedPacketSerial());
        }
        while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
            // gather buffers of consecutive packets up to the iovec limit
            for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) {
                it->AppendToIoVector(wbuffers, maxElementsInIOV);
            }
            const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data());
            int iovcnt = wbuffers.size();
            Y_VERIFY(iovcnt > 0);
            Y_VERIFY(iovec->iov_len > 0);
            TString err;
            ssize_t r = 0;
            // errors are returned as negated errno values; retry when interrupted by a signal
            do {
#ifndef _win_
                r = iovcnt == 1 ? Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err) : Socket->WriteV(iovec, iovcnt);
#else
                r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err);
#endif
                Proxy->Metrics->IncSendSyscalls();
            } while (r == -EINTR);
            LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data());
            wbuffers.clear();
            if (r > 0) {
                Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
                BytesUnwritten -= r;
                written += r;
                ui64 packets = 0;
                // advance SendQueuePos to eat all processed items
                for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) {
                    if (!SendQueuePos->IsEmpty()) {
                        LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial());
                    }
                    ++PacketsWrittenToSocket;
                    ++packets;
                    LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
                }
                LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
            } else if (-r != EAGAIN && -r != EWOULDBLOCK) {
                // hard error or peer closed the connection (r == 0): flush the metric, then reconnect
                const TString message = r == 0 ? "connection closed by peer"
                    : err ? err
                    : Sprintf("socket: %s", strerror(-r));
                LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data());
                if (written) {
                    Proxy->Metrics->AddTotalBytesWritten(written);
                }
                return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
            } else {
                // we have to do some hack for secure socket -- mark the packet as 'tried writing'
                if (Params.Encryption) {
                    Y_VERIFY(SendQueuePos != SendQueue.end());
                    SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL
                }
                // we have received EAGAIN error code, this means that we can't issue more data until we have received
                // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
                Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer);
                ReceiveContext->WriteBlockedByFullSendBuffer = true;
                WriteBlockedCycles = GetCycleCountFast();
                LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written);
                LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit");
                // without a token yet, the request is issued when TEvPollerRegisterResult arrives
                if (PollerToken) {
                    if (Params.Encryption) {
                        auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
                        PollerToken->Request(secure->WantRead(), secure->WantWrite());
                    } else {
                        PollerToken->Request(false, true);
                    }
                }
            }
        }
    }
    if (written) {
        Proxy->Metrics->AddTotalBytesWritten(written);
    }
}
  545. void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
  546. if (period != TDuration::Max()) {
  547. const TInstant when = TActivationContext::Now() + period;
  548. if (when < ForcePacketTimestamp) {
  549. ForcePacketTimestamp = when;
  550. ScheduleFlush();
  551. }
  552. }
  553. }
  554. void TInterconnectSessionTCP::ScheduleFlush() {
  555. if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) {
  556. Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush);
  557. FlushSchedule.push(ForcePacketTimestamp);
  558. MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
  559. ++FlushEventsScheduled;
  560. }
  561. }
  562. void TInterconnectSessionTCP::HandleFlush() {
  563. const TInstant now = TActivationContext::Now();
  564. while (FlushSchedule && now >= FlushSchedule.top()) {
  565. FlushSchedule.pop();
  566. }
  567. IssuePingRequest();
  568. if (Socket) {
  569. if (now >= ForcePacketTimestamp) {
  570. ++ConfirmPacketsForcedByTimeout;
  571. ++FlushEventsProcessed;
  572. MakePacket(false); // just generate confirmation packet if we have preconditions for this
  573. } else if (ForcePacketTimestamp != TInstant::Max()) {
  574. ScheduleFlush();
  575. }
  576. }
  577. }
  578. void TInterconnectSessionTCP::ResetFlushLogic() {
  579. ForcePacketTimestamp = TInstant::Max();
  580. UnconfirmedBytes = 0;
  581. const TDuration ping = Proxy->Common->Settings.PingPeriod;
  582. if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
  583. SetForcePacketTimestamp(ping);
  584. }
  585. }
  586. void TInterconnectSessionTCP::TrimSendQueueCache() {
  587. static constexpr size_t maxItems = 32;
  588. static constexpr size_t trimThreshold = maxItems * 2;
  589. if (SendQueueCache.size() >= trimThreshold) {
  590. auto it = SendQueueCache.end();
  591. for (size_t n = SendQueueCache.size() - maxItems; n; --n) {
  592. --it;
  593. }
  594. auto ev = std::make_unique<TEvFreeItems>();
  595. ev->Items.splice(ev->Items.end(), SendQueueCache, it, SendQueueCache.end());
  596. ev->NumBytes = ev->Items.size() * sizeof(TTcpPacketOutTask);
  597. if (ev->GetInLineForDestruction(Proxy->Common)) {
  598. Send(Proxy->Common->DestructorId, ev.release());
  599. }
  600. }
  601. }
  // Builds one outgoing packet, places it in SendQueue and returns its size in bytes (frame header + payload).
  //
  //  * data == true: the packet takes the next OutputCounter serial and is filled from the ready channels
  //    (NumEventsInReadyChannels must be non-zero); inflight accounting and the payload activity timer are updated.
  //  * data == false, pingMask set: the serial field carries *pingMask, and the packet is moved so it is sent
  //    as early as possible (right before SendQueuePos if that packet has not started, else right after it).
  //  * data == false, no pingMask: serial stays 0; the packet only carries the confirmation (last processed
  //    input serial).
  //
  // Every packet resets the forced-flush logic. WriteData() is invoked here only for non-data packets.
  // Requires an attached Socket (Y_VERIFY).
  602. ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
  603. Y_VERIFY(Socket);
  // obtain a packet object at the tail of SendQueue: recycle a cached one if possible, otherwise allocate
  604. TSendQueue::iterator packet;
  605. if (SendQueueCache) {
  606. // we have entries in cache, take one and move it to the end of SendQueue
  607. packet = SendQueueCache.begin();
  608. SendQueue.splice(SendQueue.end(), SendQueueCache, packet);
  609. packet->Reuse(); // reset packet to initial state
  610. } else {
  611. // we have to allocate new packet, so just do it
  612. LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) {
  613. packet = SendQueue.emplace(SendQueue.end(), Params);
  614. }
  615. }
  616. // update send queue position
  617. if (SendQueuePos == SendQueue.end()) {
  618. SendQueuePos = packet; // start sending this packet if we are not sending anything for now
  619. }
  // serial remains 0 for a pure confirmation packet (neither data nor ping)
  620. ui64 serial = 0;
  621. if (data) {
  622. // generate serial for this data packet
  623. serial = ++OutputCounter;
  624. // fill the data packet
  625. Y_VERIFY(NumEventsInReadyChannels);
  626. LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) {
  627. FillSendingBuffer(*packet, serial);
  628. }
  629. Y_VERIFY(!packet->IsEmpty());
  630. InflightDataAmount += packet->GetDataSize();
  631. Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
  // exceeding the inflight limit is only reported to metrics here; packet creation is not blocked
  632. if (InflightDataAmount > GetTotalInflightAmountOfData()) {
  633. Proxy->Metrics->IncInflyLimitReach();
  634. }
  // when no control packet is being tracked, record this packet's serial and send cycle count;
  // NOTE(review): presumably reset by the receive side when the confirmation arrives -- confirm there
  635. if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
  636. AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast());
  637. AtomicSet(ReceiveContext->ControlPacketId, OutputCounter);
  638. }
  639. // update payload activity timer
  640. LastPayloadActivityTimestamp = TActivationContext::Now();
  641. } else if (pingMask) {
  642. serial = *pingMask;
  643. // make this packet a priority one
  // (if SendQueuePos already points at this packet -- the queue was idle -- no move is needed)
  644. if (SendQueuePos != packet) {
  645. Y_VERIFY(SendQueuePos != SendQueue.end());
  646. if (SendQueuePos->IsAtBegin()) {
  647. // insert this packet just before the next being sent and step back
  648. SendQueue.splice(SendQueuePos, SendQueue, packet);
  649. --SendQueuePos;
  650. Y_VERIFY(SendQueuePos == packet);
  651. } else {
  652. // current packet is already being sent, so move new packet just after it
  653. SendQueue.splice(std::next(SendQueuePos), SendQueue, packet);
  654. }
  655. }
  656. }
  // stamp serial + confirmation (last processed input serial) into the packet, then Sign() it
  // (NOTE(review): presumably finalizes header/checksum -- confirm in TTcpPacketOutTask)
  657. const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial();
  658. packet->SetMetadata(serial, lastInputSerial);
  659. packet->Sign();
  660. // count number of bytes pending for write
  // header size depends on the negotiated frame version (v2 when UseModernFrame, v1 otherwise)
  661. ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize();
  662. BytesUnwritten += packetSize;
  663. LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
  664. " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(),
  665. InflightDataAmount, BytesUnwritten);
  666. // reset forced packet sending timestamp as we have confirmed all received data
  667. ResetFlushLogic();
  668. ++PacketsGenerated;
  669. LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
  // non-data packets are pushed to the socket right away; for data packets WriteData() is not called here
  670. if (!data) {
  671. WriteData();
  672. }
  673. return packetSize;
  674. }
  675. bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
  676. LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm);
  677. Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
  678. "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64,
  679. LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial);
  680. LastConfirmed = confirm;
  681. ui64 droppedDataAmount = 0;
  682. ui32 numDropped = 0;
  683. // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
  684. // making Serial <= confirm true
  685. TSendQueue::iterator it;
  686. ui64 lastDroppedSerial = 0;
  687. for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) {
  688. if (!it->IsEmpty()) {
  689. lastDroppedSerial = it->GetSerial();
  690. }
  691. droppedDataAmount += it->GetDataSize();
  692. ++numDropped;
  693. }
  694. SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it);
  695. TrimSendQueueCache();
  696. ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
  697. channel.DropConfirmed(lastDroppedSerial);
  698. });
  699. const ui64 current = InflightDataAmount;
  700. const ui64 limit = GetTotalInflightAmountOfData();
  701. const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount;
  702. PacketsConfirmed += numDropped;
  703. InflightDataAmount -= droppedDataAmount;
  704. Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
  705. LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount);
  706. LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes"
  707. " dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped);
  708. Pool->Trim(); // send any unsent free requests
  709. return unblockedSomething;
  710. }
  711. void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) {
  712. ui32 bytesGenerated = 0;
  713. Y_VERIFY(NumEventsInReadyChannels);
  714. while (NumEventsInReadyChannels) {
  715. TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight();
  716. Y_VERIFY_DEBUG(!channel->IsEmpty());
  717. // generate some data within this channel
  718. const ui64 netBefore = channel->GetBufferedAmountOfData();
  719. ui64 gross = 0;
  720. const bool eventDone = channel->FeedBuf(task, serial, &gross);
  721. channel->UnaccountedTraffic += gross;
  722. const ui64 netAfter = channel->GetBufferedAmountOfData();
  723. Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink
  724. const ui64 net = netBefore - netAfter; // number of net bytes serialized
  725. // adjust metrics for local and global queue size
  726. TotalOutputQueueSize -= net;
  727. Proxy->Metrics->SubOutputBuffersTotalSize(net);
  728. bytesGenerated += gross;
  729. Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross);
  730. // return it back to queue or delete, depending on whether this channel is still working or not
  731. ChannelScheduler->FinishPick(gross, EqualizeCounter);
  732. // update some stats if the packet was fully serialized
  733. if (eventDone) {
  734. ++MessagesWrittenToBuffer;
  735. Y_VERIFY(NumEventsInReadyChannels);
  736. --NumEventsInReadyChannels;
  737. if (!NumEventsInReadyChannels) {
  738. SetOutputStuckFlag(false);
  739. }
  740. }
  741. if (!gross) { // no progress -- almost full packet buffer
  742. break;
  743. }
  744. }
  745. LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal);
  746. Y_VERIFY(bytesGenerated); // ensure we are not stalled in serialization
  747. }
  748. ui32 TInterconnectSessionTCP::CalculateQueueUtilization() {
  749. SwitchStuckPeriod();
  750. ui64 sumBusy = 0, sumPeriod = 0;
  751. for (auto iter = OutputQueueUtilization.begin(); iter != OutputQueueUtilization.end() - 1; ++iter) {
  752. sumBusy += iter->first;
  753. sumPeriod += iter->second;
  754. }
  755. return sumBusy * 1000000 / sumPeriod;
  756. }
  757. void TInterconnectSessionTCP::SendUpdateToWhiteboard(bool connected) {
  758. const ui32 utilization = Socket ? CalculateQueueUtilization() : 0;
  759. if (const auto& callback = Proxy->Common->UpdateWhiteboard) {
  760. enum class EFlag {
  761. GREEN,
  762. YELLOW,
  763. ORANGE,
  764. RED,
  765. };
  766. EFlag flagState = EFlag::RED;
  767. if (Socket) {
  768. flagState = EFlag::GREEN;
  769. do {
  770. auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
  771. if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
  772. flagState = EFlag::ORANGE;
  773. break;
  774. } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) {
  775. flagState = EFlag::YELLOW;
  776. }
  777. // check utilization
  778. if (utilization > 875000) { // 7/8
  779. flagState = EFlag::ORANGE;
  780. break;
  781. } else if (utilization > 500000) { // 1/2
  782. flagState = EFlag::YELLOW;
  783. }
  784. } while (false);
  785. }
  786. callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(),
  787. connected,
  788. flagState == EFlag::GREEN,
  789. flagState == EFlag::YELLOW,
  790. flagState == EFlag::ORANGE,
  791. flagState == EFlag::RED,
  792. TlsActivationContext->ExecutorThread.ActorSystem);
  793. }
  794. if (connected) {
  795. Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
  796. }
  797. }
  798. void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) {
  799. if (OutputStuckFlag == state)
  800. return;
  801. if (OutputQueueUtilization.Size() == 0)
  802. return;
  803. auto& lastpair = OutputQueueUtilization.Last();
  804. if (state)
  805. lastpair.first -= GetCycleCountFast();
  806. else
  807. lastpair.first += GetCycleCountFast();
  808. OutputStuckFlag = state;
  809. }
  810. void TInterconnectSessionTCP::SwitchStuckPeriod() {
  811. auto now = GetCycleCountFast();
  812. if (OutputQueueUtilization.Size() != 0) {
  813. auto& lastpair = OutputQueueUtilization.Last();
  814. lastpair.second = now - lastpair.second;
  815. if (OutputStuckFlag)
  816. lastpair.first += now;
  817. }
  818. OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now));
  819. if (OutputStuckFlag)
  820. OutputQueueUtilization.Last().first -= now;
  821. }
  822. TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const {
  823. return Coalesce(Proxy->Common->Settings.DeadPeer, DEFAULT_DEADPEER_TIMEOUT);
  824. }
  825. TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const {
  826. return Proxy->Common->Settings.CloseOnIdle;
  827. }
  828. TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const {
  829. return Coalesce(Proxy->Common->Settings.LostConnection, DEFAULT_LOST_CONNECTION_TIMEOUT);
  830. }
  831. ui32 TInterconnectSessionTCP::GetTotalInflightAmountOfData() const {
  832. return Coalesce(Proxy->Common->Settings.TotalInflightAmountOfData, DEFAULT_TOTAL_INFLIGHT_DATA);
  833. }
  834. ui64 TInterconnectSessionTCP::GetMaxCyclesPerEvent() const {
  835. return DurationToCycles(TDuration::MicroSeconds(50));
  836. }
  837. void TInterconnectSessionTCP::IssuePingRequest() {
  838. const TInstant now = TActivationContext::Now();
  839. if (now >= LastPingTimestamp + PingPeriodicity) {
  840. LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
  841. if (Socket) {
  842. MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask);
  843. }
  844. if (Socket) {
  845. MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask);
  846. }
  847. LastPingTimestamp = now;
  848. }
  849. }
  850. void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) {
  851. if (Socket) {
  852. MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask);
  853. }
  854. }
  855. void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) {
  856. HTML(str) {
  857. DIV_CLASS("panel panel-info") {
  858. DIV_CLASS("panel-heading") {
  859. str << "Session";
  860. }
  861. DIV_CLASS("panel-body") {
  862. TABLE_CLASS("table") {
  863. TABLEHEAD() {
  864. TABLER() {
  865. TABLEH() {
  866. str << "Sensor";
  867. }
  868. TABLEH() {
  869. str << "Value";
  870. }
  871. }
  872. }
  873. TABLEBODY() {
  874. TABLER() {
  875. TABLED() {
  876. str << "Encryption";
  877. }
  878. TABLED() {
  879. str << (Params.Encryption ? "<font color=green>Enabled</font>" : "<font color=red>Disabled</font>");
  880. }
  881. }
  882. if (auto *x = dynamic_cast<NInterconnect::TSecureSocket*>(Socket.Get())) {
  883. TABLER() {
  884. TABLED() {
  885. str << "Cipher name";
  886. }
  887. TABLED() {
  888. str << x->GetCipherName();
  889. }
  890. }
  891. TABLER() {
  892. TABLED() {
  893. str << "Cipher bits";
  894. }
  895. TABLED() {
  896. str << x->GetCipherBits();
  897. }
  898. }
  899. TABLER() {
  900. TABLED() {
  901. str << "Protocol";
  902. }
  903. TABLED() {
  904. str << x->GetProtocolName();
  905. }
  906. }
  907. TABLER() {
  908. TABLED() {
  909. str << "Peer CN";
  910. }
  911. TABLED() {
  912. str << x->GetPeerCommonName();
  913. }
  914. }
  915. }
  916. TABLER() {
  917. TABLED() { str << "AuthOnly CN"; }
  918. TABLED() { str << Params.AuthCN; }
  919. }
  920. TABLER() {
  921. TABLED() {
  922. str << "Local scope id";
  923. }
  924. TABLED() {
  925. str << ScopeIdToString(Proxy->Common->LocalScopeId);
  926. }
  927. }
  928. TABLER() {
  929. TABLED() {
  930. str << "Peer scope id";
  931. }
  932. TABLED() {
  933. str << ScopeIdToString(Params.PeerScopeId);
  934. }
  935. }
  936. TABLER() {
  937. TABLED() {
  938. str << "This page generated at";
  939. }
  940. TABLED() {
  941. str << TActivationContext::Now() << " / " << Now();
  942. }
  943. }
  944. TABLER() {
  945. TABLED() {
  946. str << "SelfID";
  947. }
  948. TABLED() {
  949. str << SelfId().ToString();
  950. }
  951. }
  952. TABLER() {
  953. TABLED() { str << "Frame version/Checksum"; }
  954. TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); }
  955. }
  956. #define MON_VAR(NAME) \
  957. TABLER() { \
  958. TABLED() { \
  959. str << #NAME; \
  960. } \
  961. TABLED() { \
  962. str << NAME; \
  963. } \
  964. }
  965. MON_VAR(Created)
  966. MON_VAR(NewConnectionSet)
  967. MON_VAR(ReceiverId)
  968. MON_VAR(MessagesGot)
  969. MON_VAR(MessagesWrittenToBuffer)
  970. MON_VAR(PacketsGenerated)
  971. MON_VAR(PacketsWrittenToSocket)
  972. MON_VAR(PacketsConfirmed)
  973. MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket))
  974. MON_VAR(ConfirmPacketsForcedBySize)
  975. MON_VAR(ConfirmPacketsForcedByTimeout)
  976. TABLER() {
  977. TABLED() {
  978. str << "Virtual self ID";
  979. }
  980. TABLED() {
  981. str << Proxy->SessionVirtualId.ToString();
  982. }
  983. }
  984. TABLER() {
  985. TABLED() {
  986. str << "Virtual peer ID";
  987. }
  988. TABLED() {
  989. str << Proxy->RemoteSessionVirtualId.ToString();
  990. }
  991. }
  992. TABLER() {
  993. TABLED() {
  994. str << "Socket";
  995. }
  996. TABLED() {
  997. str << (Socket ? i64(*Socket) : -1);
  998. }
  999. }
  1000. ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
  1001. MON_VAR(OutputStuckFlag)
  1002. MON_VAR(SendQueue.size())
  1003. MON_VAR(SendQueueCache.size())
  1004. MON_VAR(NumEventsInReadyChannels)
  1005. MON_VAR(TotalOutputQueueSize)
  1006. MON_VAR(BytesUnwritten)
  1007. MON_VAR(InflightDataAmount)
  1008. MON_VAR(unsentQueueSize)
  1009. MON_VAR(SendBufferSize)
  1010. MON_VAR(LastInputActivityTimestamp)
  1011. MON_VAR(LastPayloadActivityTimestamp)
  1012. MON_VAR(LastHandshakeDone)
  1013. MON_VAR(OutputCounter)
  1014. MON_VAR(LastSentSerial)
  1015. MON_VAR(ReceiveContext->GetLastProcessedPacketSerial())
  1016. MON_VAR(LastConfirmed)
  1017. MON_VAR(FlushSchedule.size())
  1018. MON_VAR(MaxFlushSchedule)
  1019. MON_VAR(FlushEventsScheduled)
  1020. MON_VAR(FlushEventsProcessed)
  1021. TString clockSkew;
  1022. i64 x = GetClockSkew();
  1023. if (x < 0) {
  1024. clockSkew = Sprintf("-%s", TDuration::MicroSeconds(-x).ToString().data());
  1025. } else {
  1026. clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data());
  1027. }
  1028. MON_VAR(LastPingTimestamp)
  1029. MON_VAR(GetPingRTT())
  1030. MON_VAR(clockSkew)
  1031. MON_VAR(GetDeadPeerTimeout())
  1032. MON_VAR(GetTotalInflightAmountOfData())
  1033. MON_VAR(GetCloseOnIdleTimeout())
  1034. MON_VAR(Subscribers.size())
  1035. }
  1036. }
  1037. }
  1038. }
  1039. }
  1040. }
  1041. void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) {
  1042. TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common));
  1043. }
  1044. }