interconnect_tcp_session.h 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. #pragma once
  2. #include <library/cpp/actors/core/hfunc.h>
  3. #include <library/cpp/actors/core/event_pb.h>
  4. #include <library/cpp/actors/core/events.h>
  5. #include <library/cpp/actors/core/log.h>
  6. #include <library/cpp/actors/helpers/mon_histogram_helper.h>
  7. #include <library/cpp/actors/protos/services_common.pb.h>
  8. #include <library/cpp/actors/util/datetime.h>
  9. #include <library/cpp/actors/util/rope.h>
  10. #include <library/cpp/actors/util/funnel_queue.h>
  11. #include <library/cpp/actors/util/recentwnd.h>
  12. #include <library/cpp/monlib/dynamic_counters/counters.h>
  13. #include <library/cpp/actors/core/actor_bootstrapped.h>
  14. #include <util/generic/queue.h>
  15. #include <util/generic/deque.h>
  16. #include <util/datetime/cputimer.h>
  17. #include "interconnect_impl.h"
  18. #include "poller_tcp.h"
  19. #include "poller_actor.h"
  20. #include "interconnect_channel.h"
  21. #include "logging.h"
  22. #include "watchdog_timer.h"
  23. #include "event_holder_pool.h"
  24. #include "channel_scheduler.h"
  25. #include <unordered_set>
  26. #include <unordered_map>
  27. namespace NActors {
  28. class TSlowPathChecker {
  29. using TTraceCallback = std::function<void(double)>;
  30. TTraceCallback Callback;
  31. const NHPTimer::STime Start;
  32. public:
  33. TSlowPathChecker(TTraceCallback&& callback)
  34. : Callback(std::move(callback))
  35. , Start(GetCycleCountFast())
  36. {
  37. }
  38. ~TSlowPathChecker() {
  39. const NHPTimer::STime end = GetCycleCountFast();
  40. const NHPTimer::STime elapsed = end - Start;
  41. if (elapsed > 1000000) {
  42. Callback(NHPTimer::GetSeconds(elapsed) * 1000);
  43. }
  44. }
  45. operator bool() const {
  46. return false;
  47. }
  48. };
  // Usage: LWPROBE_IF_TOO_LONG(probe args...) statement;
  // Times `statement`. If it runs longer than TSlowPathChecker's threshold, LWPROBE(probe args...)
  // is fired from the checker's destructor; the probe args may refer to `ms`, the elapsed time in
  // milliseconds. The checker converts to false, so `statement` becomes the `else` branch, and since
  // it is declared in the `if` condition it stays alive until that branch has finished.
  // The local is deliberately not named with a double underscore (such names are reserved).
  #define LWPROBE_IF_TOO_LONG(...) \
      if (auto lwprobeSlowPathChecker_ = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \
          ; \
      else
  53. class TTimeLimit {
  54. public:
  55. TTimeLimit(ui64 limitInCycles)
  56. : UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles)
  57. {
  58. }
  59. TTimeLimit(ui64 startTS, ui64 limitInCycles)
  60. : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles)
  61. {
  62. }
  63. bool CheckExceeded() {
  64. return UpperLimit != 0 && GetCycleCountFast() > UpperLimit;
  65. }
  66. const ui64 UpperLimit;
  67. };
  68. static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10);
  69. static constexpr TDuration DEFAULT_LOST_CONNECTION_TIMEOUT = TDuration::Seconds(10);
  70. static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024;
  71. static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024;
  72. class TInterconnectProxyTCP;
  73. enum class EUpdateState : ui8 {
  74. NONE, // no updates generated by input session yet
  75. INFLIGHT, // one update is inflight, and no more pending
  76. INFLIGHT_AND_PENDING, // one update is inflight, and one is pending
  77. CONFIRMING, // confirmation inflight
  78. };
  79. struct TReceiveContext: public TAtomicRefCount<TReceiveContext> {
  80. /* All invokations to these fields should be thread-safe */
  81. ui64 ControlPacketSendTimer = 0;
  82. ui64 ControlPacketId = 0;
  83. // number of packets received by input session
  84. TAtomic PacketsReadFromSocket = 0;
  85. TAtomic DataPacketsReadFromSocket = 0;
  86. // last processed packet by input session
  87. std::atomic_uint64_t LastProcessedPacketSerial = 0;
  88. static constexpr uint64_t LastProcessedPacketSerialLockBit = uint64_t(1) << 63;
  89. // for hardened checks
  90. TAtomic NumInputSessions = 0;
  91. NHPTimer::STime StartTime;
  92. std::atomic<ui64> PingRTT_us = 0;
  93. std::atomic<i64> ClockSkew_us = 0;
  94. std::atomic<EUpdateState> UpdateState;
  95. static_assert(std::atomic<EUpdateState>::is_always_lock_free);
  96. bool WriteBlockedByFullSendBuffer = false;
  97. bool ReadPending = false;
  98. std::array<TRope, 16> ChannelArray;
  99. std::unordered_map<ui16, TRope> ChannelMap;
  100. TReceiveContext() {
  101. GetTimeFast(&StartTime);
  102. }
  103. // returns false if sessions needs to be terminated and packet not to be processed
  104. bool AdvanceLastProcessedPacketSerial() {
  105. for (;;) {
  106. uint64_t value = LastProcessedPacketSerial.load();
  107. if (value & LastProcessedPacketSerialLockBit) {
  108. return false;
  109. }
  110. if (LastProcessedPacketSerial.compare_exchange_weak(value, value + 1)) {
  111. return true;
  112. }
  113. }
  114. }
  115. ui64 LockLastProcessedPacketSerial() {
  116. for (;;) {
  117. uint64_t value = LastProcessedPacketSerial.load();
  118. if (value & LastProcessedPacketSerialLockBit) {
  119. return value & ~LastProcessedPacketSerialLockBit;
  120. }
  121. if (LastProcessedPacketSerial.compare_exchange_strong(value, value | LastProcessedPacketSerialLockBit)) {
  122. return value;
  123. }
  124. }
  125. }
  126. void UnlockLastProcessedPacketSerial() {
  127. LastProcessedPacketSerial = LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit;
  128. }
  129. ui64 GetLastProcessedPacketSerial() {
  130. return LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit;
  131. }
  132. };
  // Receiving half of an interconnect TCP session, implemented as a separate actor. It reads from
  // Socket, parses packet headers and payloads (ProcessHeader / ProcessPayload / ProcessEvent) and
  // shares receive-side state with the owning session through Context (TReceiveContext).
  // NOTE(review): SessionId presumably identifies the owning TInterconnectSessionTCP — confirm in the .cpp.
  class TInputSessionTCP
      : public TActorBootstrapped<TInputSessionTCP>
      , public TInterconnectLoggingBase
  {
      // Actor-private event ids, allocated from the ES_PRIVATE event space.
      enum {
          EvCheckDeadPeer = EventSpaceBegin(TEvents::ES_PRIVATE),
          EvResumeReceiveData,
      };
      struct TEvCheckDeadPeer : TEventLocal<TEvCheckDeadPeer, EvCheckDeadPeer> {};
      struct TEvResumeReceiveData : TEventLocal<TEvResumeReceiveData, EvResumeReceiveData> {};
  public:
      static constexpr EActivityType ActorActivityType() {
          return INTERCONNECT_SESSION_TCP;
      }
      // Defined out of line. `context` is the TReceiveContext shared with the session actor;
      // `deadPeerTimeout` initializes DeadPeerTimeout (presumably — confirm in the .cpp).
      TInputSessionTCP(const TActorId& sessionId,
          TIntrusivePtr<NInterconnect::TStreamSocket> socket,
          TIntrusivePtr<TReceiveContext> context,
          TInterconnectProxyCommon::TPtr common,
          std::shared_ptr<IInterconnectMetrics> metrics,
          ui32 nodeId,
          ui64 lastConfirmed,
          TDuration deadPeerTimeout,
          TSessionParams params);
  private:
      friend class TActorBootstrapped<TInputSessionTCP>;
      void Bootstrap();
      // Event dispatch table of the working state.
      STRICT_STFUNC(WorkingState,
          cFunc(TEvents::TSystem::PoisonPill, PassAway)
          hFunc(TEvPollerReady, Handle)
          hFunc(TEvPollerRegisterResult, Handle)
          cFunc(EvResumeReceiveData, HandleResumeReceiveData)
          cFunc(TEvInterconnect::TEvCloseInputSession::EventType, CloseInputSession)
          cFunc(EvCheckDeadPeer, HandleCheckDeadPeer)
          cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate)
      )
  private:
      TRope IncomingData;
      const TActorId SessionId;
      TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
      TPollerToken::TPtr PollerToken;
      TIntrusivePtr<TReceiveContext> Context;
      TInterconnectProxyCommon::TPtr Common;
      const ui32 NodeId;
      const TSessionParams Params;
      // header we are currently processing (parsed from the stream)
      // v1/v2 are the two header layouts; Data gives byte-level access to the same storage.
      union {
          TTcpPacketHeader_v1 v1;
          TTcpPacketHeader_v2 v2;
          char Data[1];
      } Header;
      // NOTE(review): the scalars below have no in-class initializers; presumably they are set
      // while parsing each header — confirm in the .cpp.
      ui64 HeaderConfirm, HeaderSerial;
      size_t PayloadSize;
      ui32 ChecksumExpected, Checksum;
      bool IgnorePayload;
      TRope Payload;
      // Parser position within the current packet: reading its header or its payload.
      enum class EState {
          HEADER,
          PAYLOAD,
      };
      EState State = EState::HEADER;
      THolder<TEvUpdateFromInputSession> UpdateFromInputSession;
      ui64 ConfirmedByInput;
      std::shared_ptr<IInterconnectMetrics> Metrics;
      bool CloseInputSessionRequested = false;
      void CloseInputSession();
      void Handle(TEvPollerReady::TPtr ev);
      void Handle(TEvPollerRegisterResult::TPtr ev);
      void HandleResumeReceiveData();
      void HandleConfirmUpdate();
      void ReceiveData();
      void ProcessHeader(size_t headerLen);
      void ProcessPayload(ui64& numDataBytes);
      void ProcessEvent(TRope& data, TEventDescr& descr);
      bool ReadMore();
      void ReestablishConnection(TDisconnectReason reason);
      void DestroySession(TDisconnectReason reason);
      void PassAway() override;
      TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers;
      static constexpr size_t NumPreallocatedBuffers = 16;
      void PreallocateBuffers();
      // 500 microseconds expressed in CPU cycles; presumably the per-event processing budget.
      inline ui64 GetMaxCyclesPerEvent() const {
          return DurationToCycles(TDuration::MicroSeconds(500));
      }
      const TDuration DeadPeerTimeout;
      TInstant LastReceiveTimestamp;
      void HandleCheckDeadPeer();
      ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
      // pinger logic
      bool NewPingProtocol = false;
      TDeque<TDuration> PingQ; // last N ping samples
      TDeque<i64> SkewQ; // last N calculated clock skew samples
      void HandlePingResponse(TDuration passed);
      void HandleClock(TInstant clock);
  };
  // Session actor of an interconnect TCP connection, owned by TInterconnectProxyTCP (Proxy).
  // It handles forwarded events, subscriptions, packet generation and socket writes (WriteData),
  // reconnection/handshake, idle/lost-connection watchdogs and pings. The receive side is a
  // TInputSessionTCP (ReceiverId) sharing ReceiveContext with this actor.
  class TInterconnectSessionTCP
      : public TActor<TInterconnectSessionTCP>
      , public TInterconnectLoggingBase
  {
      // Actor-private event ids, allocated from the ES_PRIVATE event space.
      enum {
          EvCheckCloseOnIdle = EventSpaceBegin(TEvents::ES_PRIVATE),
          EvCheckLostConnection,
          EvRam,
          EvTerminate,
          EvFreeItems,
      };
      struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {};
      struct TEvCheckLostConnection : TEventLocal<TEvCheckLostConnection, EvCheckLostConnection> {};
      // Handled by HandleRam; Batching is set at construction.
      struct TEvRam : TEventLocal<TEvRam, EvRam> {
          const bool Batching;
          TEvRam(bool batching) : Batching(batching) {}
      };
      // Carries the reason for terminating the session; created via NewEvTerminate().
      struct TEvTerminate : TEventLocal<TEvTerminate, EvTerminate> {
          TDisconnectReason Reason;
          TEvTerminate(TDisconnectReason reason)
              : Reason(std::move(reason))
          {}
      };
      const TInstant Created;
      TInstant NewConnectionSet;
      // Statistics counters.
      ui64 MessagesGot = 0;
      ui64 MessagesWrittenToBuffer = 0;
      ui64 PacketsGenerated = 0;
      ui64 PacketsWrittenToSocket = 0;
      ui64 PacketsConfirmed = 0;
  public:
      static constexpr EActivityType ActorActivityType() {
          return INTERCONNECT_SESSION_TCP;
      }
      TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params);
      ~TInterconnectSessionTCP();
      void Init();
      void CloseInputSession();
      // Allocates a TEvTerminate; the caller takes ownership of the returned event.
      static TEvTerminate* NewEvTerminate(TDisconnectReason reason) {
          return new TEvTerminate(std::move(reason));
      }
      // Latest ping round-trip time stored in the shared receive context.
      TDuration GetPingRTT() const {
          return TDuration::MicroSeconds(ReceiveContext->PingRTT_us);
      }
      // Latest clock skew estimate (microseconds) stored in the shared receive context.
      i64 GetClockSkew() const {
          return ReceiveContext->ClockSkew_us;
      }
  private:
      friend class TInterconnectProxyTCP;
      void Handle(TEvTerminate::TPtr& ev);
      void HandlePoison();
      void Terminate(TDisconnectReason reason);
      void PassAway() override;
      void Forward(STATEFN_SIG);
      void Subscribe(STATEFN_SIG);
      void Unsubscribe(STATEFN_SIG);
      // Event dispatch table. The watchdog timers themselves are the handlers for their check events.
      STRICT_STFUNC(StateFunc,
          fFunc(TEvInterconnect::EvForward, Forward)
          cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison)
          fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe)
          fFunc(TEvents::TEvSubscribe::EventType, Subscribe)
          fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe)
          cFunc(TEvFlush::EventType, HandleFlush)
          hFunc(TEvPollerReady, Handle)
          hFunc(TEvPollerRegisterResult, Handle)
          hFunc(TEvUpdateFromInputSession, Handle)
          hFunc(TEvRam, HandleRam)
          hFunc(TEvCheckCloseOnIdle, CloseOnIdleWatchdog)
          hFunc(TEvCheckLostConnection, LostConnectionWatchdog)
          cFunc(TEvents::TSystem::Wakeup, SendUpdateToWhiteboard)
          hFunc(TEvSocketDisconnect, OnDisconnect)
          hFunc(TEvTerminate, Handle)
          hFunc(TEvProcessPingRequest, Handle)
      )
      void Handle(TEvUpdateFromInputSession::TPtr& ev);
      void OnDisconnect(TEvSocketDisconnect::TPtr& ev);
      THolder<TEvHandshakeAck> ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev);
      void SetNewConnection(TEvHandshakeDone::TPtr& ev);
      // Non-owning pointer; NOTE(review): presumably the TEvRam currently queued to self — confirm in the .cpp.
      TEvRam* RamInQueue = nullptr;
      ui64 RamStartedCycles = 0;
      void HandleRam(TEvRam::TPtr& ev);
      void GenerateTraffic();
      void SendUpdateToWhiteboard(bool connected = true);
      ui32 CalculateQueueUtilization();
      void Handle(TEvPollerReady::TPtr& ev);
      void Handle(TEvPollerRegisterResult::TPtr ev);
      void WriteData();
      ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
      void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
      bool DropConfirmed(ui64 confirm);
      void ShutdownSocket(TDisconnectReason reason);
      void StartHandshake();
      void ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose,
          TDisconnectReason reason);
      void ReestablishConnectionWithHandshake(TDisconnectReason reason);
      void ReestablishConnectionExecute();
      TInterconnectProxyTCP* const Proxy;
      // various connection settings access
      TDuration GetDeadPeerTimeout() const;
      TDuration GetCloseOnIdleTimeout() const;
      TDuration GetLostConnectionTimeout() const;
      ui32 GetTotalInflightAmountOfData() const;
      ui64 GetMaxCyclesPerEvent() const;
      ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
      // pinger
      TInstant LastPingTimestamp;
      static constexpr TDuration PingPeriodicity = TDuration::Seconds(1);
      void IssuePingRequest();
      void Handle(TEvProcessPingRequest::TPtr ev);
      ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
      TInstant LastInputActivityTimestamp;
      TInstant LastPayloadActivityTimestamp;
      // NOTE(review): presumably these timers invoke OnCloseOnIdleTimerHit / OnLostConnectionTimerHit
      // on expiry — confirm in watchdog_timer.h and the .cpp.
      TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog;
      TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog;
      // Logs and terminates the session with reason CloseOnIdle.
      void OnCloseOnIdleTimerHit() {
          LOG_INFO_IC("ICS27", "CloseOnIdle timer hit, session terminated");
          Terminate(TDisconnectReason::CloseOnIdle());
      }
      // Logs an error and terminates the session with reason LostConnection.
      void OnLostConnectionTimerHit() {
          LOG_ERROR_IC("ICS28", "LostConnection timer hit, session terminated");
          Terminate(TDisconnectReason::LostConnection());
      }
      ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
      const TSessionParams Params;
      TMaybe<TEventHolderPool> Pool;
      TMaybe<TChannelScheduler> ChannelScheduler;
      // NOTE(review): no in-class initializers; presumably set in the constructor — confirm.
      ui64 TotalOutputQueueSize;
      bool OutputStuckFlag;
      TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization;
      size_t NumEventsInReadyChannels = 0;
      void SetOutputStuckFlag(bool state);
      void SwitchStuckPeriod();
      using TSendQueue = TList<TTcpPacketOutTask>;
      TSendQueue SendQueue;
      TSendQueue SendQueueCache;
      TSendQueue::iterator SendQueuePos;
      ui64 WriteBlockedCycles = 0; // start of current block period
      TDuration WriteBlockedTotal; // total incremental duration that session has been blocked
      ui64 BytesUnwritten = 0;
      void TrimSendQueueCache();
      // WriteBlockedTotal plus, if writes are currently blocked by a full send buffer, the time
      // elapsed since WriteBlockedCycles.
      TDuration GetWriteBlockedTotal() const {
          if (ReceiveContext->WriteBlockedByFullSendBuffer) {
              double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0;
              return WriteBlockedTotal + TDuration::MicroSeconds(blockedUs); // append current blocking period if any
          } else {
              return WriteBlockedTotal;
          }
      }
      // NOTE(review): no in-class initializer; presumably set in the constructor — confirm.
      ui64 OutputCounter;
      ui64 LastSentSerial = 0;
      TInstant LastHandshakeDone;
      TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
      TPollerToken::TPtr PollerToken;
      ui32 SendBufferSize;
      ui64 InflightDataAmount = 0;
      std::unordered_map<TActorId, ui64, TActorId::THash> Subscribers;
      ui64 UnconfirmedBytes = 0;
      // time at which we want to send confirmation packet even if there was no outgoing data
      TInstant ForcePacketTimestamp = TInstant::Max();
      // Min-heap of TInstant values (std::greater puts the earliest on top).
      TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule;
      size_t MaxFlushSchedule = 0;
      ui64 FlushEventsScheduled = 0;
      ui64 FlushEventsProcessed = 0;
      void SetForcePacketTimestamp(TDuration period);
      void ScheduleFlush();
      void HandleFlush();
      void ResetFlushLogic();
      void GenerateHttpInfo(TStringStream& str);
      // Shared with the input session actor.
      TIntrusivePtr<TReceiveContext> ReceiveContext;
      TActorId ReceiverId;
      TDuration Ping;
      ui64 ConfirmPacketsForcedBySize = 0;
      ui64 ConfirmPacketsForcedByTimeout = 0;
      ui64 LastConfirmed = 0;
      TEvHandshakeDone::TPtr PendingHandshakeDoneEvent;
      bool StartHandshakeOnSessionClose = false;
      ui64 EqualizeCounter = 0;
  };
  405. class TInterconnectSessionKiller
  406. : public TActorBootstrapped<TInterconnectSessionKiller> {
  407. ui32 RepliesReceived = 0;
  408. ui32 RepliesNumber = 0;
  409. TActorId LargestSession = TActorId();
  410. ui64 MaxBufferSize = 0;
  411. TInterconnectProxyCommon::TPtr Common;
  412. public:
  413. static constexpr EActivityType ActorActivityType() {
  414. return INTERCONNECT_SESSION_KILLER;
  415. }
  416. TInterconnectSessionKiller(TInterconnectProxyCommon::TPtr common)
  417. : Common(common)
  418. {
  419. }
  420. void Bootstrap() {
  421. auto sender = SelfId();
  422. const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
  423. auto ev = new TEvSessionBufferSizeRequest();
  424. return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
  425. };
  426. RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric);
  427. Become(&TInterconnectSessionKiller::StateFunc);
  428. }
  429. STRICT_STFUNC(StateFunc,
  430. hFunc(TEvSessionBufferSizeResponse, ProcessResponse)
  431. cFunc(TEvents::TEvUndelivered::EventType, ProcessUndelivered)
  432. )
  433. void ProcessResponse(TEvSessionBufferSizeResponse::TPtr& ev) {
  434. RepliesReceived++;
  435. if (MaxBufferSize < ev->Get()->BufferSize) {
  436. MaxBufferSize = ev->Get()->BufferSize;
  437. LargestSession = ev->Get()->SessionID;
  438. }
  439. if (RepliesReceived == RepliesNumber) {
  440. Send(LargestSession, new TEvents::TEvPoisonPill);
  441. AtomicUnlock(&Common->StartedSessionKiller);
  442. PassAway();
  443. }
  444. }
  445. void ProcessUndelivered() {
  446. RepliesReceived++;
  447. }
  448. };
  // Declared here, defined out of line.
  // NOTE(review): the name suggests it starts a TInterconnectSessionKiller for `common` — confirm in the .cpp.
  void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common);
  450. }