interconnect_tcp_session.h 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. #pragma once
  2. #include <library/cpp/actors/core/hfunc.h>
  3. #include <library/cpp/actors/core/event_pb.h>
  4. #include <library/cpp/actors/core/events.h>
  5. #include <library/cpp/actors/core/log.h>
  6. #include <library/cpp/actors/helpers/mon_histogram_helper.h>
  7. #include <library/cpp/actors/protos/services_common.pb.h>
  8. #include <library/cpp/actors/util/datetime.h>
  9. #include <library/cpp/actors/util/rope.h>
  10. #include <library/cpp/actors/util/funnel_queue.h>
  11. #include <library/cpp/actors/util/recentwnd.h>
  12. #include <library/cpp/monlib/dynamic_counters/counters.h>
  13. #include <library/cpp/actors/core/actor_bootstrapped.h>
  14. #include <util/generic/queue.h>
  15. #include <util/generic/deque.h>
  16. #include <util/datetime/cputimer.h>
  17. #include "interconnect_impl.h"
  18. #include "poller_tcp.h"
  19. #include "poller_actor.h"
  20. #include "interconnect_channel.h"
  21. #include "logging.h"
  22. #include "watchdog_timer.h"
  23. #include "event_holder_pool.h"
  24. #include "channel_scheduler.h"
  25. #include <unordered_set>
  26. #include <unordered_map>
  27. namespace NActors {
  28. class TSlowPathChecker {
  29. using TTraceCallback = std::function<void(double)>;
  30. TTraceCallback Callback;
  31. const NHPTimer::STime Start;
  32. public:
  33. TSlowPathChecker(TTraceCallback&& callback)
  34. : Callback(std::move(callback))
  35. , Start(GetCycleCountFast())
  36. {
  37. }
  38. ~TSlowPathChecker() {
  39. const NHPTimer::STime end = GetCycleCountFast();
  40. const NHPTimer::STime elapsed = end - Start;
  41. if (elapsed > 1000000) {
  42. Callback(NHPTimer::GetSeconds(elapsed) * 1000);
  43. }
  44. }
  45. operator bool() const {
  46. return false;
  47. }
  48. };
  // LWPROBE_IF_TOO_LONG(<LWPROBE arguments>) <statement>
  //
  // Executes <statement> and, if it ran for longer than the TSlowPathChecker threshold, fires
  // LWPROBE with the given arguments. The arguments may refer to `ms`, the elapsed time in
  // milliseconds passed to the lambda.
  //
  // How it works: a TSlowPathChecker is declared in the `if` condition. It always converts to
  // false, so the empty `then` branch is skipped and the statement following the macro becomes
  // the `else` branch. The checker stays alive for the whole if/else statement and its
  // destructor (which performs the measurement and invokes the lambda) runs after <statement>.
  // The lambda captures by reference, so probe arguments may use variables of the enclosing scope.
  //
  // NOTE(review): `__x` contains a double underscore, which makes it an identifier reserved for
  // the implementation.
  #define LWPROBE_IF_TOO_LONG(...) \
      if (auto __x = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \
          ; \
      else
  53. class TTimeLimit {
  54. public:
  55. TTimeLimit(ui64 limitInCycles)
  56. : UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles)
  57. {
  58. }
  59. TTimeLimit(ui64 startTS, ui64 limitInCycles)
  60. : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles)
  61. {
  62. }
  63. bool CheckExceeded() {
  64. return UpperLimit != 0 && GetCycleCountFast() > UpperLimit;
  65. }
  66. const ui64 UpperLimit;
  67. };
  // Default values for session timeouts and in-flight data limits.
  // NOTE(review): the code that consumes these defaults is not part of this header -- the
  // per-constant descriptions below are derived from the names and values only.

  // 10 seconds
  static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10);
  // 10 seconds
  static constexpr TDuration DEFAULT_LOST_CONNECTION_TIMEOUT = TDuration::Seconds(10);
  // 10485760; presumably bytes (10 MiB) -- TODO confirm the unit
  static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024;
  // 41943040, i.e. four times DEFAULT_MAX_INFLIGHT_DATA; presumably bytes -- TODO confirm the unit
  static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024;

  // Forward declaration: TInterconnectSessionTCP below keeps a pointer to its proxy and
  // declares it a friend.
  class TInterconnectProxyTCP;
  // State of the "update" exchange originated by the input session; stored in
  // TReceiveContext::UpdateState as a lock-free atomic.
  // Backed by ui8 to keep the atomic small.
  enum class EUpdateState : ui8 {
      NONE, // no updates generated by input session yet
      INFLIGHT, // one update is inflight, and no more pending
      INFLIGHT_AND_PENDING, // one update is inflight, and one is pending
      CONFIRMING, // confirmation inflight
  };
  79. struct TReceiveContext: public TAtomicRefCount<TReceiveContext> {
  80. /* All invokations to these fields should be thread-safe */
  81. ui64 ControlPacketSendTimer = 0;
  82. ui64 ControlPacketId = 0;
  83. // number of packets received by input session
  84. TAtomic PacketsReadFromSocket = 0;
  85. TAtomic DataPacketsReadFromSocket = 0;
  86. // last processed packet by input session
  87. std::atomic_uint64_t LastProcessedPacketSerial = 0;
  88. static constexpr uint64_t LastProcessedPacketSerialLockBit = uint64_t(1) << 63;
  89. // for hardened checks
  90. TAtomic NumInputSessions = 0;
  91. NHPTimer::STime StartTime;
  92. std::atomic<ui64> PingRTT_us = 0;
  93. std::atomic<i64> ClockSkew_us = 0;
  94. std::atomic<EUpdateState> UpdateState;
  95. static_assert(std::atomic<EUpdateState>::is_always_lock_free);
  96. bool WriteBlockedByFullSendBuffer = false;
  97. bool ReadPending = false;
  98. std::array<TRope, 16> ChannelArray;
  99. std::unordered_map<ui16, TRope> ChannelMap;
  100. TReceiveContext() {
  101. GetTimeFast(&StartTime);
  102. }
  103. // returns false if sessions needs to be terminated and packet not to be processed
  104. bool AdvanceLastProcessedPacketSerial() {
  105. for (;;) {
  106. uint64_t value = LastProcessedPacketSerial.load();
  107. if (value & LastProcessedPacketSerialLockBit) {
  108. return false;
  109. }
  110. if (LastProcessedPacketSerial.compare_exchange_weak(value, value + 1)) {
  111. return true;
  112. }
  113. }
  114. }
  115. ui64 LockLastProcessedPacketSerial() {
  116. for (;;) {
  117. uint64_t value = LastProcessedPacketSerial.load();
  118. if (value & LastProcessedPacketSerialLockBit) {
  119. return value & ~LastProcessedPacketSerialLockBit;
  120. }
  121. if (LastProcessedPacketSerial.compare_exchange_strong(value, value | LastProcessedPacketSerialLockBit)) {
  122. return value;
  123. }
  124. }
  125. }
  126. void UnlockLastProcessedPacketSerial() {
  127. LastProcessedPacketSerial = LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit;
  128. }
  129. ui64 GetLastProcessedPacketSerial() {
  130. return LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit;
  131. }
  132. };
  // Actor that owns the receiving side of an interconnect TCP connection.
  // Only declarations live in this header; the behaviour described below is limited to what
  // the declarations themselves show.
  class TInputSessionTCP
      : public TActorBootstrapped<TInputSessionTCP>
      , public TInterconnectLoggingBase
  {
      // Private event ids, allocated from the ES_PRIVATE event space.
      enum {
          EvCheckDeadPeer = EventSpaceBegin(TEvents::ES_PRIVATE),
          EvResumeReceiveData,
      };

      // Local (in-process) events carrying no payload.
      struct TEvCheckDeadPeer : TEventLocal<TEvCheckDeadPeer, EvCheckDeadPeer> {};
      struct TEvResumeReceiveData : TEventLocal<TEvResumeReceiveData, EvResumeReceiveData> {};

  public:
      static constexpr EActivityType ActorActivityType() {
          return INTERCONNECT_SESSION_TCP;
      }

      // Defined out of line. Several parameters have same-named members below (SessionId,
      // Socket, Context, Common, Metrics, NodeId, DeadPeerTimeout, Params).
      // NOTE(review): how `lastConfirmed` is used cannot be seen from this header.
      TInputSessionTCP(const TActorId& sessionId,
                       TIntrusivePtr<NInterconnect::TStreamSocket> socket,
                       TIntrusivePtr<TReceiveContext> context,
                       TInterconnectProxyCommon::TPtr common,
                       std::shared_ptr<IInterconnectMetrics> metrics,
                       ui32 nodeId,
                       ui64 lastConfirmed,
                       TDuration deadPeerTimeout,
                       TSessionParams params);

  private:
      // TActorBootstrapped needs access to the private Bootstrap()
      friend class TActorBootstrapped<TInputSessionTCP>;

      void Bootstrap();

      // Event dispatch table of the WorkingState state function. Handlers bound with cFunc
      // take no arguments, handlers bound with hFunc receive the event pointer.
      STRICT_STFUNC(WorkingState,
          cFunc(TEvents::TSystem::PoisonPill, PassAway)
          hFunc(TEvPollerReady, Handle)
          hFunc(TEvPollerRegisterResult, Handle)
          cFunc(EvResumeReceiveData, HandleResumeReceiveData)
          cFunc(TEvInterconnect::TEvCloseInputSession::EventType, CloseInputSession)
          cFunc(EvCheckDeadPeer, HandleCheckDeadPeer)
          cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate)
      )

  private:
      // NOTE(review): data member order matters -- members are initialized in declaration
      // order by the out-of-line constructor; do not reorder without checking it.
      TRope IncomingData;

      const TActorId SessionId;
      TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
      TPollerToken::TPtr PollerToken;
      TIntrusivePtr<TReceiveContext> Context;
      TInterconnectProxyCommon::TPtr Common;
      const ui32 NodeId;
      const TSessionParams Params;

      // header we are currently processing (parsed from the stream)
      union {
          TTcpPacketHeader_v1 v1;
          TTcpPacketHeader_v2 v2;
          char Data[1];
      } Header;
      // NOTE(review): the following fields have no in-class initializers; presumably they are
      // set by the constructor or while a header is parsed -- confirm in the implementation.
      ui64 HeaderConfirm, HeaderSerial;

      size_t PayloadSize;
      ui32 ChecksumExpected, Checksum;
      bool IgnorePayload;
      TRope Payload;
      // which part of a packet is expected next; starts with HEADER
      enum class EState {
          HEADER,
          PAYLOAD,
      };
      EState State = EState::HEADER;

      THolder<TEvUpdateFromInputSession> UpdateFromInputSession;

      // NOTE(review): no in-class initializer; presumably set by the constructor -- confirm.
      ui64 ConfirmedByInput;

      std::shared_ptr<IInterconnectMetrics> Metrics;

      bool CloseInputSessionRequested = false;

      // Handlers referenced from WorkingState above, plus the receive pipeline helpers.
      void CloseInputSession();

      void Handle(TEvPollerReady::TPtr ev);
      void Handle(TEvPollerRegisterResult::TPtr ev);
      void HandleResumeReceiveData();
      void HandleConfirmUpdate();
      void ReceiveData();
      void ProcessHeader(size_t headerLen);
      void ProcessPayload(ui64& numDataBytes);
      void ProcessEvent(TRope& data, TEventDescr& descr);
      bool ReadMore();

      void ReestablishConnection(TDisconnectReason reason);
      void DestroySession(TDisconnectReason reason);

      TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers;

      static constexpr size_t NumPreallocatedBuffers = 16;
      void PreallocateBuffers();

      // Fixed budget of 500 microseconds, converted to CPU cycles.
      inline ui64 GetMaxCyclesPerEvent() const {
          return DurationToCycles(TDuration::MicroSeconds(500));
      }

      const TDuration DeadPeerTimeout;
      TInstant LastReceiveTimestamp;
      void HandleCheckDeadPeer();

      ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
      // pinger logic

      bool NewPingProtocol = false;
      TDeque<TDuration> PingQ; // last N ping samples
      TDeque<i64> SkewQ; // last N calculated clock skew samples

      void HandlePingResponse(TDuration passed);
      void HandleClock(TInstant clock);
  };
  // Session actor of an interconnect TCP connection. It shares a TReceiveContext with the
  // input session (see ReceiveContext / ReceiverId below) and is driven by its proxy
  // (TInterconnectProxyTCP is a friend and is referenced through Proxy).
  // Only declarations and a few trivial inline helpers live in this header.
  class TInterconnectSessionTCP
      : public TActor<TInterconnectSessionTCP>
      , public TInterconnectLoggingBase
  {
      // Private event ids, allocated from the ES_PRIVATE event space.
      // NOTE(review): EvFreeItems has no event struct declared in this header.
      enum {
          EvCheckCloseOnIdle = EventSpaceBegin(TEvents::ES_PRIVATE),
          EvCheckLostConnection,
          EvRam,
          EvTerminate,
          EvFreeItems,
      };

      // Local (in-process) events.
      struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {};
      struct TEvCheckLostConnection : TEventLocal<TEvCheckLostConnection, EvCheckLostConnection> {};

      struct TEvRam : TEventLocal<TEvRam, EvRam> {
          const bool Batching;
          TEvRam(bool batching) : Batching(batching) {}
      };

      // Carries the reason for which the session has to be terminated.
      struct TEvTerminate : TEventLocal<TEvTerminate, EvTerminate> {
          TDisconnectReason Reason;

          TEvTerminate(TDisconnectReason reason)
              : Reason(std::move(reason))
          {}
      };

      // NOTE(review): data member order matters -- members are initialized in declaration
      // order by the out-of-line constructor; do not reorder without checking it.
      const TInstant Created;
      TInstant NewConnectionSet;
      // plain counters, all starting from zero
      ui64 MessagesGot = 0;
      ui64 MessagesWrittenToBuffer = 0;
      ui64 PacketsGenerated = 0;
      ui64 PacketsWrittenToSocket = 0;
      ui64 PacketsConfirmed = 0;

  public:
      static constexpr EActivityType ActorActivityType() {
          return INTERCONNECT_SESSION_TCP;
      }

      TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params);
      ~TInterconnectSessionTCP();

      void Init();
      void CloseInputSession();

      // Allocates a new TEvTerminate; the caller receives a raw pointer to it.
      static TEvTerminate* NewEvTerminate(TDisconnectReason reason) {
          return new TEvTerminate(std::move(reason));
      }

      // Ping round-trip time taken from the shared receive context (stored in microseconds).
      TDuration GetPingRTT() const {
          return TDuration::MicroSeconds(ReceiveContext->PingRTT_us);
      }

      // Clock skew taken from the shared receive context (stored in microseconds).
      i64 GetClockSkew() const {
          return ReceiveContext->ClockSkew_us;
      }

  private:
      friend class TInterconnectProxyTCP;

      void Handle(TEvTerminate::TPtr& ev);
      void HandlePoison();
      void Terminate(TDisconnectReason reason);
      void PassAway() override;

      void Forward(STATEFN_SIG);
      void Subscribe(STATEFN_SIG);
      void Unsubscribe(STATEFN_SIG);

      // Event dispatch table of the StateFunc state function. Handlers bound with fFunc take
      // STATEFN_SIG, those bound with cFunc are called without arguments, those bound with
      // hFunc receive the event pointer. CloseOnIdleWatchdog and LostConnectionWatchdog are
      // TWatchdogTimer members used as the handlers of their respective events;
      // SendUpdateToWhiteboard is invoked with its default argument.
      STRICT_STFUNC(StateFunc,
          fFunc(TEvInterconnect::EvForward, Forward)
          cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison)
          fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe)
          fFunc(TEvents::TEvSubscribe::EventType, Subscribe)
          fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe)
          cFunc(TEvFlush::EventType, HandleFlush)
          hFunc(TEvPollerReady, Handle)
          hFunc(TEvPollerRegisterResult, Handle)
          hFunc(TEvUpdateFromInputSession, Handle)
          hFunc(TEvRam, HandleRam)
          hFunc(TEvCheckCloseOnIdle, CloseOnIdleWatchdog)
          hFunc(TEvCheckLostConnection, LostConnectionWatchdog)
          cFunc(TEvents::TSystem::Wakeup, SendUpdateToWhiteboard)
          hFunc(TEvSocketDisconnect, OnDisconnect)
          hFunc(TEvTerminate, Handle)
          hFunc(TEvProcessPingRequest, Handle)
      )

      void Handle(TEvUpdateFromInputSession::TPtr& ev);

      void OnDisconnect(TEvSocketDisconnect::TPtr& ev);

      THolder<TEvHandshakeAck> ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev);
      void SetNewConnection(TEvHandshakeDone::TPtr& ev);

      // NOTE(review): raw pointer to a TEvRam; presumably the event currently sitting in the
      // mailbox (non-owning) -- confirm in the implementation.
      TEvRam* RamInQueue = nullptr;
      ui64 RamStartedCycles = 0;
      void HandleRam(TEvRam::TPtr& ev);
      void GenerateTraffic();

      void SendUpdateToWhiteboard(bool connected = true);
      ui32 CalculateQueueUtilization();

      void Handle(TEvPollerReady::TPtr& ev);
      void Handle(TEvPollerRegisterResult::TPtr ev);
      void WriteData();

      ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
      void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
      bool DropConfirmed(ui64 confirm);
      void ShutdownSocket(TDisconnectReason reason);

      void StartHandshake();
      void ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose,
              TDisconnectReason reason);
      void ReestablishConnectionWithHandshake(TDisconnectReason reason);
      void ReestablishConnectionExecute();

      // set once from the constructor argument; the pointer itself never changes
      TInterconnectProxyTCP* const Proxy;

      // various connection settings access
      TDuration GetDeadPeerTimeout() const;
      TDuration GetCloseOnIdleTimeout() const;
      TDuration GetLostConnectionTimeout() const;
      ui32 GetTotalInflightAmountOfData() const;
      ui64 GetMaxCyclesPerEvent() const;

      ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
      // pinger

      TInstant LastPingTimestamp;
      static constexpr TDuration PingPeriodicity = TDuration::Seconds(1);
      void IssuePingRequest();
      void Handle(TEvProcessPingRequest::TPtr ev);

      ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

      TInstant LastInputActivityTimestamp;
      TInstant LastPayloadActivityTimestamp;
      TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog;
      TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog;

      // Logs at INFO level and terminates the session with the CloseOnIdle reason.
      // NOTE(review): presumably invoked by CloseOnIdleWatchdog -- the wiring is not in this header.
      void OnCloseOnIdleTimerHit() {
          LOG_INFO_IC("ICS27", "CloseOnIdle timer hit, session terminated");
          Terminate(TDisconnectReason::CloseOnIdle());
      }

      // Logs at ERROR level and terminates the session with the LostConnection reason.
      // NOTE(review): presumably invoked by LostConnectionWatchdog -- the wiring is not in this header.
      void OnLostConnectionTimerHit() {
          LOG_ERROR_IC("ICS28", "LostConnection timer hit, session terminated");
          Terminate(TDisconnectReason::LostConnection());
      }

      ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

      const TSessionParams Params;
      TMaybe<TEventHolderPool> Pool;
      TMaybe<TChannelScheduler> ChannelScheduler;
      // NOTE(review): TotalOutputQueueSize and OutputStuckFlag have no in-class initializers;
      // presumably set by the constructor -- confirm.
      ui64 TotalOutputQueueSize;
      bool OutputStuckFlag;
      TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization;
      size_t NumEventsInReadyChannels = 0;

      void SetOutputStuckFlag(bool state);
      void SwitchStuckPeriod();

      using TSendQueue = TList<TTcpPacketOutTask>;
      TSendQueue SendQueue;
      TSendQueue SendQueueCache;
      TSendQueue::iterator SendQueuePos;
      ui64 WriteBlockedCycles = 0; // start of current block period
      TDuration WriteBlockedTotal; // total incremental duration that session has been blocked
      ui64 BytesUnwritten = 0;

      void TrimSendQueueCache();

      // Total time the session has been write-blocked. While the shared context reports the
      // session as currently blocked, the time elapsed since WriteBlockedCycles (cycles
      // converted to microseconds) is added on top of the accumulated total.
      TDuration GetWriteBlockedTotal() const {
          if (ReceiveContext->WriteBlockedByFullSendBuffer) {
              double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0;
              return WriteBlockedTotal + TDuration::MicroSeconds(blockedUs); // append current blocking period if any
          } else {
              return WriteBlockedTotal;
          }
      }

      // NOTE(review): OutputCounter has no in-class initializer; presumably set by the
      // constructor -- confirm.
      ui64 OutputCounter;
      ui64 LastSentSerial = 0;

      TInstant LastHandshakeDone;

      TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
      TPollerToken::TPtr PollerToken;
      // NOTE(review): no in-class initializer; presumably set when a connection is
      // established -- confirm.
      ui32 SendBufferSize;
      ui64 InflightDataAmount = 0;

      std::unordered_map<TActorId, ui64, TActorId::THash> Subscribers;

      // time at which we want to send confirmation packet even if there was no outgoing data
      // NOTE(review): the comment above appears to describe ForcePacketTimestamp rather than
      // UnconfirmedBytes, which is declared first.
      ui64 UnconfirmedBytes = 0;
      TInstant ForcePacketTimestamp = TInstant::Max();
      // min-heap of instants (std::greater puts the earliest instant on top)
      TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule;
      size_t MaxFlushSchedule = 0;
      ui64 FlushEventsScheduled = 0;
      ui64 FlushEventsProcessed = 0;

      void SetForcePacketTimestamp(TDuration period);
      void ScheduleFlush();
      void HandleFlush();
      void ResetFlushLogic();

      void GenerateHttpInfo(TStringStream& str);

      // state shared with the input session and the id of the actor that receives data
      TIntrusivePtr<TReceiveContext> ReceiveContext;
      TActorId ReceiverId;
      TDuration Ping;

      ui64 ConfirmPacketsForcedBySize = 0;
      ui64 ConfirmPacketsForcedByTimeout = 0;

      ui64 LastConfirmed = 0;

      TEvHandshakeDone::TPtr PendingHandshakeDoneEvent;
      bool StartHandshakeOnSessionClose = false;

      ui64 EqualizeCounter = 0;
  };
  404. class TInterconnectSessionKiller
  405. : public TActorBootstrapped<TInterconnectSessionKiller> {
  406. ui32 RepliesReceived = 0;
  407. ui32 RepliesNumber = 0;
  408. TActorId LargestSession = TActorId();
  409. ui64 MaxBufferSize = 0;
  410. TInterconnectProxyCommon::TPtr Common;
  411. public:
  412. static constexpr EActivityType ActorActivityType() {
  413. return INTERCONNECT_SESSION_KILLER;
  414. }
  415. TInterconnectSessionKiller(TInterconnectProxyCommon::TPtr common)
  416. : Common(common)
  417. {
  418. }
  419. void Bootstrap() {
  420. auto sender = SelfId();
  421. const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
  422. auto ev = new TEvSessionBufferSizeRequest();
  423. return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
  424. };
  425. RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric);
  426. Become(&TInterconnectSessionKiller::StateFunc);
  427. }
  428. STRICT_STFUNC(StateFunc,
  429. hFunc(TEvSessionBufferSizeResponse, ProcessResponse)
  430. cFunc(TEvents::TEvUndelivered::EventType, ProcessUndelivered)
  431. )
  432. void ProcessResponse(TEvSessionBufferSizeResponse::TPtr& ev) {
  433. RepliesReceived++;
  434. if (MaxBufferSize < ev->Get()->BufferSize) {
  435. MaxBufferSize = ev->Get()->BufferSize;
  436. LargestSession = ev->Get()->SessionID;
  437. }
  438. if (RepliesReceived == RepliesNumber) {
  439. Send(LargestSession, new TEvents::TEvPoisonPill);
  440. AtomicUnlock(&Common->StartedSessionKiller);
  441. PassAway();
  442. }
  443. }
  444. void ProcessUndelivered() {
  445. RepliesReceived++;
  446. }
  447. };
  // Declared here, defined elsewhere.
  // NOTE(review): presumably registers a TInterconnectSessionKiller built from `common` --
  // confirm against the definition.
  void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common);
  449. }