interconnect_tcp_proxy.cpp 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936
  1. #include "interconnect_tcp_proxy.h"
  2. #include "interconnect_handshake.h"
  3. #include "interconnect_tcp_session.h"
  4. #include <library/cpp/actors/core/log.h>
  5. #include <library/cpp/actors/protos/services_common.pb.h>
  6. #include <library/cpp/monlib/service/pages/templates.h>
  7. #include <util/system/getpid.h>
  8. namespace NActors {
  9. static constexpr TDuration GetNodeRequestTimeout = TDuration::Seconds(5);
  10. static constexpr TDuration FirstErrorSleep = TDuration::MilliSeconds(10);
  11. static constexpr TDuration MaxErrorSleep = TDuration::Seconds(10);
  12. static constexpr ui32 SleepRetryMultiplier = 4;
  13. static TString PeerNameForHuman(ui32 nodeNum, const TString& longName, ui16 port) {
  14. TStringBuf token;
  15. TStringBuf(longName).NextTok('.', token);
  16. return ToString<ui32>(nodeNum) + ":" + (token.size() > 0 ? TString(token) : longName) + ":" + ToString<ui16>(port);
  17. }
  18. TInterconnectProxyTCP::TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common,
  19. IActor **dynamicPtr)
  20. : TActor(&TThis::StateInit)
  21. , PeerNodeId(node)
  22. , DynamicPtr(dynamicPtr)
  23. , Common(std::move(common))
  24. , SecureContext(new NInterconnect::TSecureSocketContext(Common->Settings.Certificate, Common->Settings.PrivateKey,
  25. Common->Settings.CaFilePath, Common->Settings.CipherList))
  26. {
  27. Y_VERIFY(Common);
  28. Y_VERIFY(Common->NameserviceId);
  29. if (DynamicPtr) {
  30. Y_VERIFY(!*DynamicPtr);
  31. *DynamicPtr = this;
  32. }
  33. }
  34. void TInterconnectProxyTCP::Bootstrap() {
  35. SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId));
  36. SwitchToInitialState();
  37. PassAwayTimestamp = TActivationContext::Now() + TDuration::Seconds(15);
  38. LOG_INFO_IC("ICP01", "ready to work");
  39. }
  40. void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) {
  41. if (!DynamicPtr) {
  42. // perform usual bootstrap for static nodes
  43. sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0));
  44. }
  45. if (const auto& mon = Common->RegisterMonPage) {
  46. TString path = Sprintf("peer%04" PRIu32, PeerNodeId);
  47. TString title = Sprintf("Peer #%04" PRIu32, PeerNodeId);
  48. mon(path, title, sys, SelfId());
  49. }
  50. }
  51. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  52. // PendingActivation
  53. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  54. void TInterconnectProxyTCP::RequestNodeInfo(STATEFN_SIG) {
  55. ICPROXY_PROFILED;
  56. Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents);
  57. EnqueueSessionEvent(ev);
  58. StartConfiguring();
  59. }
  60. void TInterconnectProxyTCP::RequestNodeInfoForIncomingHandshake(STATEFN_SIG) {
  61. ICPROXY_PROFILED;
  62. if (!Terminated) {
  63. Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents);
  64. EnqueueIncomingHandshakeEvent(ev);
  65. StartConfiguring();
  66. }
  67. }
  68. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  69. // PendingNodeInfo
  70. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  71. void TInterconnectProxyTCP::StartConfiguring() {
  72. ICPROXY_PROFILED;
  73. Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
  74. // issue node info request
  75. Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId));
  76. // arm configure timer; store pointer to event to ensure that we will handle correct one if there were any other
  77. // wakeup events in flight
  78. SwitchToState(__LINE__, "PendingNodeInfo", &TThis::PendingNodeInfo, GetNodeRequestTimeout,
  79. ConfigureTimeoutCookie = new TEvents::TEvWakeup);
  80. }
  81. void TInterconnectProxyTCP::Configure(TEvInterconnect::TEvNodeInfo::TPtr& ev) {
  82. ICPROXY_PROFILED;
  83. Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !Session);
  84. if (!ev->Get()->Node) {
  85. TransitToErrorState("cannot get node info");
  86. } else {
  87. auto& info = *ev->Get()->Node;
  88. TString name = PeerNameForHuman(PeerNodeId, info.Host, info.Port);
  89. TechnicalPeerHostName = info.Host;
  90. if (!Metrics) {
  91. Metrics = Common->Metrics ? CreateInterconnectMetrics(Common) : CreateInterconnectCounters(Common);
  92. }
  93. Metrics->SetPeerInfo(name, info.Location.GetDataCenterId());
  94. LOG_DEBUG_IC("ICP02", "configured for host %s", name.data());
  95. ProcessConfigured();
  96. }
  97. }
  98. void TInterconnectProxyTCP::ConfigureTimeout(TEvents::TEvWakeup::TPtr& ev) {
  99. ICPROXY_PROFILED;
  100. if (ev->Get() == ConfigureTimeoutCookie) {
  101. TransitToErrorState("timed out while waiting for node info");
  102. }
  103. }
  104. void TInterconnectProxyTCP::ProcessConfigured() {
  105. ICPROXY_PROFILED;
  106. // if the request was initiated by some activity involving Interconnect, then we are expected to start handshake
  107. if (PendingSessionEvents) {
  108. StartInitialHandshake();
  109. }
  110. // process incoming handshake requests; all failures were ejected from the queue along with the matching initiation requests
  111. for (THolder<IEventHandle>& ev : PendingIncomingHandshakeEvents) {
  112. TAutoPtr<IEventHandle> x(ev.Release());
  113. IncomingHandshake(x);
  114. }
  115. PendingIncomingHandshakeEvents.clear();
  116. // possible situation -- incoming handshake arrives, but actually it is not satisfied and rejected; in this case
  117. // we are going to return to initial state as we have nothing to do
  118. if (!IncomingHandshakeActor && !OutgoingHandshakeActor) {
  119. SwitchToInitialState();
  120. }
  121. }
  122. void TInterconnectProxyTCP::StartInitialHandshake() {
  123. ICPROXY_PROFILED;
  124. // since we are starting initial handshake for some reason, we'll drop any existing handshakes, if any
  125. DropHandshakes();
  126. // create and register handshake actor
  127. OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, GenerateSessionVirtualId(),
  128. TActorId(), PeerNodeId, 0, TechnicalPeerHostName, TSessionParams()), TMailboxType::ReadAsFilled);
  129. OutgoingHandshakeActorCreated = TActivationContext::Now();
  130. // prepare for new handshake
  131. PrepareNewSessionHandshake();
  132. }
  133. void TInterconnectProxyTCP::StartResumeHandshake(ui64 inputCounter) {
  134. ICPROXY_PROFILED;
  135. // drop outgoing handshake if we have one; keep incoming handshakes as they may be useful
  136. DropOutgoingHandshake();
  137. // ensure that we have session
  138. Y_VERIFY(Session);
  139. // ensure that we have both virtual ids
  140. Y_VERIFY(SessionVirtualId);
  141. Y_VERIFY(RemoteSessionVirtualId);
  142. // create and register handshake actor
  143. OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, SessionVirtualId,
  144. RemoteSessionVirtualId, PeerNodeId, inputCounter, TechnicalPeerHostName, Session->Params),
  145. TMailboxType::ReadAsFilled);
  146. OutgoingHandshakeActorCreated = TActivationContext::Now();
  147. }
  148. void TInterconnectProxyTCP::IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId,
  149. THolder<IEventBase> event) {
  150. ICPROXY_PROFILED;
  151. Y_VERIFY(!IncomingHandshakeActor);
  152. IncomingHandshakeActor = handshakeId;
  153. IncomingHandshakeActorFilledIn = TActivationContext::Now();
  154. Y_VERIFY(!LastSerialFromIncomingHandshake || *LastSerialFromIncomingHandshake <= peerLocalId);
  155. LastSerialFromIncomingHandshake = peerLocalId;
  156. if (OutgoingHandshakeActor && SelfId().NodeId() < PeerNodeId) {
  157. // Both outgoing and incoming handshake are in progress. To prevent race condition during semultanous handshake
  158. // incoming handshake must be held till outgoing handshake is complete or failed
  159. LOG_DEBUG_IC("ICP06", "reply for incoming handshake (actor %s) is held", IncomingHandshakeActor.ToString().data());
  160. HeldHandshakeReply = std::move(event);
  161. // Check that we are in one of acceptable states that would properly handle handshake statuses.
  162. const auto state = CurrentStateFunc();
  163. Y_VERIFY(state == &TThis::PendingConnection || state == &TThis::StateWork, "invalid handshake request in state# %s", State);
  164. } else {
  165. LOG_DEBUG_IC("ICP07", "issued incoming handshake reply");
  166. // No race, so we can send reply immediately.
  167. Y_VERIFY(!HeldHandshakeReply);
  168. Send(IncomingHandshakeActor, event.Release());
  169. // Start waiting for handshake reply, if not yet started; also, if session is already created, then we don't
  170. // switch from working state.
  171. if (!Session) {
  172. LOG_INFO_IC("ICP08", "No active sessions, becoming PendingConnection");
  173. SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection);
  174. } else {
  175. Y_VERIFY(CurrentStateFunc() == &TThis::StateWork);
  176. }
  177. }
  178. }
  179. void TInterconnectProxyTCP::IncomingHandshake(TEvHandshakeAsk::TPtr& ev) {
  180. ICPROXY_PROFILED;
  181. TEvHandshakeAsk *msg = ev->Get();
  182. // TEvHandshakeAsk is only applicable for continuation requests
  183. LOG_DEBUG_IC("ICP09", "(actor %s) from: %s for: %s", ev->Sender.ToString().data(),
  184. ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data());
  185. if (!Session) {
  186. // if there is no open session, report error -- continuation request works only with open sessions
  187. LOG_NOTICE_IC("ICP12", "(actor %s) peer tries to resume nonexistent session Self# %s Peer# %s",
  188. ev->Sender.ToString().data(), msg->Self.ToString().data(), msg->Peer.ToString().data());
  189. } else if (SessionVirtualId != ev->Get()->Peer || RemoteSessionVirtualId != ev->Get()->Self) {
  190. // check session virtual ids for continuation
  191. LOG_NOTICE_IC("ICP13", "(actor %s) virtual id mismatch with existing session (Peer: %s Self: %s"
  192. " SessionVirtualId: %s RemoteSessionVirtualId: %s)", ev->Sender.ToString().data(),
  193. ev->Get()->Peer.ToString().data(), ev->Get()->Self.ToString().data(), SessionVirtualId.ToString().data(),
  194. RemoteSessionVirtualId.ToString().data());
  195. } else {
  196. // if we already have incoming handshake, then terminate existing one
  197. DropIncomingHandshake();
  198. // issue reply to the sender, possibly holding it while outgoing handshake is at race
  199. THolder<IEventBase> reply = IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::ProcessHandshakeRequest, ev);
  200. return IssueIncomingHandshakeReply(ev->Sender, RemoteSessionVirtualId.LocalId(), std::move(reply));
  201. }
  202. // error case -- report error to the handshake actor
  203. Send(ev->Sender, new TEvHandshakeNak);
  204. }
  205. void TInterconnectProxyTCP::IncomingHandshake(TEvHandshakeRequest::TPtr& ev) {
  206. ICPROXY_PROFILED;
  207. LOG_DEBUG_IC("ICP17", "incoming handshake (actor %s)", ev->Sender.ToString().data());
  208. const auto& record = ev->Get()->Record;
  209. ui64 remotePID = record.GetProgramPID();
  210. ui64 remoteStartTime = record.GetProgramStartTime();
  211. ui64 remoteSerial = record.GetSerial();
  212. if (RemoteProgramInfo && remotePID == RemoteProgramInfo->PID && remoteStartTime == RemoteProgramInfo->StartTime) {
  213. if (remoteSerial < RemoteProgramInfo->Serial) {
  214. LOG_INFO_IC("ICP18", "handshake (actor %s) is too old", ev->Sender.ToString().data());
  215. Send(ev->Sender, new TEvents::TEvPoisonPill);
  216. return;
  217. } else {
  218. RemoteProgramInfo->Serial = remoteSerial;
  219. }
  220. } else {
  221. const auto ptr = new TProgramInfo;
  222. ptr->PID = remotePID;
  223. ptr->StartTime = remoteStartTime;
  224. ptr->Serial = remoteSerial;
  225. RemoteProgramInfo.Reset(ptr);
  226. }
  227. /* Let's check peer technical hostname */
  228. if (record.HasSenderHostName() && TechnicalPeerHostName != record.GetSenderHostName()) {
  229. Send(ev->Sender, new TEvHandshakeReplyError("host name mismatch"));
  230. return;
  231. }
  232. // check sender actor id and check if it is not very old
  233. if (LastSerialFromIncomingHandshake) {
  234. const ui64 serial = record.GetSerial();
  235. if (serial < *LastSerialFromIncomingHandshake) {
  236. LOG_NOTICE_IC("ICP15", "Handshake# %s has duplicate serial# %" PRIu64
  237. " LastSerialFromIncomingHandshake# %" PRIu64, ev->Sender.ToString().data(),
  238. serial, *LastSerialFromIncomingHandshake);
  239. Send(ev->Sender, new TEvHandshakeReplyError("duplicate serial"));
  240. return;
  241. } else if (serial == *LastSerialFromIncomingHandshake) {
  242. LOG_NOTICE_IC("ICP15", "Handshake# %s is obsolete, serial# %" PRIu64
  243. " LastSerialFromIncomingHandshake# %" PRIu64, ev->Sender.ToString().data(),
  244. serial, *LastSerialFromIncomingHandshake);
  245. Send(ev->Sender, new TEvents::TEvPoisonPill);
  246. return;
  247. }
  248. }
  249. // drop incoming handshake as this is definitely more recent
  250. DropIncomingHandshake();
  251. // prepare for new session
  252. PrepareNewSessionHandshake();
  253. auto event = MakeHolder<TEvHandshakeReplyOK>();
  254. auto* pb = event->Record.MutableSuccess();
  255. const TActorId virtualId = GenerateSessionVirtualId();
  256. pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION);
  257. pb->SetSenderActorId(virtualId.ToString());
  258. pb->SetProgramPID(GetPID());
  259. pb->SetProgramStartTime(Common->StartTime);
  260. pb->SetSerial(virtualId.LocalId());
  261. IssueIncomingHandshakeReply(ev->Sender, 0, std::move(event));
  262. }
  263. void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeDone::TPtr& ev) {
  264. ICPROXY_PROFILED;
  265. TEvHandshakeDone *msg = ev->Get();
  266. // Terminate handshake actor working in opposite direction, if set up.
  267. if (ev->Sender == IncomingHandshakeActor) {
  268. LOG_INFO_IC("ICP19", "incoming handshake succeeded");
  269. DropIncomingHandshake(false);
  270. DropOutgoingHandshake();
  271. } else if (ev->Sender == OutgoingHandshakeActor) {
  272. LOG_INFO_IC("ICP20", "outgoing handshake succeeded");
  273. DropIncomingHandshake();
  274. DropOutgoingHandshake(false);
  275. } else {
  276. /* It seems to be an old handshake. */
  277. return;
  278. }
  279. Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
  280. SwitchToState(__LINE__, "StateWork", &TThis::StateWork);
  281. if (Session) {
  282. // this is continuation request, check that virtual ids match
  283. Y_VERIFY(SessionVirtualId == msg->Self && RemoteSessionVirtualId == msg->Peer);
  284. } else {
  285. // this is initial request, check that we have virtual ids not filled in
  286. Y_VERIFY(!SessionVirtualId && !RemoteSessionVirtualId);
  287. }
  288. auto error = [&](const char* description) {
  289. TransitToErrorState(description);
  290. };
  291. // If session is not created, then create new one.
  292. if (!Session) {
  293. RemoteProgramInfo = std::move(msg->ProgramInfo);
  294. if (!RemoteProgramInfo) {
  295. // we have received resume handshake, but session was closed concurrently while handshaking
  296. return error("Session continuation race");
  297. }
  298. // Create new session actor.
  299. SessionID = RegisterWithSameMailbox(Session = new TInterconnectSessionTCP(this, msg->Params));
  300. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Init);
  301. SessionVirtualId = msg->Self;
  302. RemoteSessionVirtualId = msg->Peer;
  303. LOG_INFO_IC("ICP22", "created new session: %s", SessionID.ToString().data());
  304. }
  305. // ensure that we have session local/peer virtual ids
  306. Y_VERIFY(Session && SessionVirtualId && RemoteSessionVirtualId);
  307. // Set up new connection for the session.
  308. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::SetNewConnection, ev);
  309. // Reset retry timer
  310. HoldByErrorWakeupDuration = TDuration::Zero();
  311. /* Forward all held events */
  312. ProcessPendingSessionEvents();
  313. }
  314. void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeFail::TPtr& ev) {
  315. ICPROXY_PROFILED;
  316. // update error state log; this fail is inconclusive unless this is the last pending handshake
  317. const bool inconclusive = (ev->Sender != IncomingHandshakeActor && ev->Sender != OutgoingHandshakeActor) ||
  318. (IncomingHandshakeActor && OutgoingHandshakeActor);
  319. LogHandshakeFail(ev, inconclusive);
  320. if (ev->Sender == IncomingHandshakeActor) {
  321. LOG_NOTICE_IC("ICP24", "incoming handshake failed, temporary: %" PRIu32 " explanation: %s outgoing: %s",
  322. ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), OutgoingHandshakeActor.ToString().data());
  323. DropIncomingHandshake(false);
  324. } else if (ev->Sender == OutgoingHandshakeActor) {
  325. LOG_NOTICE_IC("ICP25", "outgoing handshake failed, temporary: %" PRIu32 " explanation: %s incoming: %s held: %s",
  326. ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), IncomingHandshakeActor.ToString().data(),
  327. HeldHandshakeReply ? "yes" : "no");
  328. DropOutgoingHandshake(false);
  329. if (IEventBase* reply = HeldHandshakeReply.Release()) {
  330. Y_VERIFY(IncomingHandshakeActor);
  331. LOG_DEBUG_IC("ICP26", "sent held handshake reply to %s", IncomingHandshakeActor.ToString().data());
  332. Send(IncomingHandshakeActor, reply);
  333. }
  334. // if we have no current session, then we have to drop all pending events as the outgoing handshake has failed
  335. ProcessPendingSessionEvents();
  336. } else {
  337. /* It seems to be an old fail, just ignore it */
  338. LOG_NOTICE_IC("ICP27", "obsolete handshake fail ignored");
  339. return;
  340. }
  341. if (Metrics) {
  342. Metrics->IncHandshakeFails();
  343. }
  344. if (IncomingHandshakeActor || OutgoingHandshakeActor) {
  345. // one of handshakes is still going on
  346. LOG_DEBUG_IC("ICP28", "other handshake is still going on");
  347. return;
  348. }
  349. switch (ev->Get()->Temporary) {
  350. case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT:
  351. if (!Session) {
  352. if (PendingSessionEvents) {
  353. // try to start outgoing handshake as we have some events enqueued
  354. StartInitialHandshake();
  355. } else {
  356. // return back to initial state as we have no session and no pending handshakes
  357. SwitchToInitialState();
  358. }
  359. } else if (Session->Socket) {
  360. // try to reestablish connection -- meaning restart handshake from the last known position
  361. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::ReestablishConnectionWithHandshake,
  362. TDisconnectReason::HandshakeFailTransient());
  363. } else {
  364. // we have no active connection in that session, so just restart handshake from last known position
  365. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::StartHandshake);
  366. }
  367. break;
  368. case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH:
  369. StartInitialHandshake();
  370. break;
  371. case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT:
  372. TString timeExplanation = " LastSessionDieTime# " + LastSessionDieTime.ToString();
  373. if (Session) {
  374. InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate,
  375. TDisconnectReason::HandshakeFailPermanent());
  376. }
  377. TransitToErrorState(ev->Get()->Explanation + timeExplanation, false);
  378. break;
  379. }
  380. }
  381. void TInterconnectProxyTCP::LogHandshakeFail(TEvHandshakeFail::TPtr& ev, bool inconclusive) {
  382. ICPROXY_PROFILED;
  383. TString kind = "unknown";
  384. switch (ev->Get()->Temporary) {
  385. case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT:
  386. kind = Session ? "transient w/session" : "transient w/o session";
  387. break;
  388. case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH:
  389. kind = "session_mismatch";
  390. break;
  391. case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT:
  392. kind = "permanent";
  393. break;
  394. }
  395. if (inconclusive) {
  396. kind += " inconclusive";
  397. }
  398. UpdateErrorStateLog(TActivationContext::Now(), kind, ev->Get()->Explanation);
  399. }
  400. void TInterconnectProxyTCP::ProcessPendingSessionEvents() {
  401. ICPROXY_PROFILED;
  402. while (PendingSessionEvents) {
  403. TPendingSessionEvent ev = std::move(PendingSessionEvents.front());
  404. PendingSessionEventsSize -= ev.Size;
  405. TAutoPtr<IEventHandle> event(ev.Event.Release());
  406. PendingSessionEvents.pop_front();
  407. if (Session) {
  408. ForwardSessionEventToSession(event);
  409. } else {
  410. DropSessionEvent(event);
  411. }
  412. }
  413. }
  414. void TInterconnectProxyTCP::DropSessionEvent(STATEFN_SIG) {
  415. ICPROXY_PROFILED;
  416. ValidateEvent(ev, "DropSessionEvent");
  417. switch (ev->GetTypeRewrite()) {
  418. case TEvInterconnect::EvForward:
  419. if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
  420. Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie);
  421. }
  422. TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected));
  423. break;
  424. case TEvInterconnect::TEvConnectNode::EventType:
  425. case TEvents::TEvSubscribe::EventType:
  426. Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie);
  427. break;
  428. case TEvents::TEvUnsubscribe::EventType:
  429. /* Do nothing */
  430. break;
  431. default:
  432. Y_FAIL("Unexpected type of event in held event queue");
  433. }
  434. }
  435. void TInterconnectProxyTCP::UnregisterSession(TInterconnectSessionTCP* session) {
  436. ICPROXY_PROFILED;
  437. Y_VERIFY(Session && Session == session && SessionID);
  438. LOG_INFO_IC("ICP30", "unregister session Session# %s VirtualId# %s", SessionID.ToString().data(),
  439. SessionVirtualId.ToString().data());
  440. Session = nullptr;
  441. SessionID = TActorId();
  442. // drop all pending events as we are closed
  443. ProcessPendingSessionEvents();
  444. // reset virtual ids as this session is terminated
  445. SessionVirtualId = TActorId();
  446. RemoteSessionVirtualId = TActorId();
  447. if (Metrics) {
  448. Metrics->IncSessionDeaths();
  449. }
  450. LastSessionDieTime = TActivationContext::Now();
  451. if (IncomingHandshakeActor || OutgoingHandshakeActor) {
  452. PrepareNewSessionHandshake();
  453. } else {
  454. SwitchToInitialState();
  455. }
  456. }
  457. void TInterconnectProxyTCP::EnqueueSessionEvent(STATEFN_SIG) {
  458. ICPROXY_PROFILED;
  459. ValidateEvent(ev, "EnqueueSessionEvent");
  460. const ui32 size = ev->GetSize();
  461. PendingSessionEventsSize += size;
  462. PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev);
  463. ScheduleCleanupEventQueue();
  464. CleanupEventQueue();
  465. }
  466. void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(STATEFN_SIG) {
  467. ICPROXY_PROFILED;
  468. // enqueue handshake request
  469. Y_UNUSED();
  470. PendingIncomingHandshakeEvents.emplace_back(ev);
  471. }
  472. void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeDone::TPtr& /*ev*/) {
  473. ICPROXY_PROFILED;
  474. // TEvHandshakeDone can't get into the queue, because we have to process handshake request first; this may be the
  475. // race with the previous handshakes, so simply ignore it
  476. }
  477. void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeFail::TPtr& ev) {
  478. ICPROXY_PROFILED;
  479. for (auto it = PendingIncomingHandshakeEvents.begin(); it != PendingIncomingHandshakeEvents.end(); ++it) {
  480. THolder<IEventHandle>& pendingEvent = *it;
  481. if (pendingEvent->Sender == ev->Sender) {
  482. // we have found cancellation request for the pending handshake request; so simply remove it from the
  483. // deque, as we are not interested in failure reason; must likely it happens because of handshake timeout
  484. if (pendingEvent->GetTypeRewrite() == TEvHandshakeFail::EventType) {
  485. TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(pendingEvent.Release()));
  486. LogHandshakeFail(tmp, true);
  487. }
  488. PendingIncomingHandshakeEvents.erase(it);
  489. break;
  490. }
  491. }
  492. }
  493. void TInterconnectProxyTCP::ForwardSessionEventToSession(STATEFN_SIG) {
  494. ICPROXY_PROFILED;
  495. Y_VERIFY(Session && SessionID);
  496. ValidateEvent(ev, "ForwardSessionEventToSession");
  497. InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID));
  498. }
  499. void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) {
  500. ICPROXY_PROFILED;
  501. LOG_INFO_IC("ICP31", "proxy http called");
  502. TStringStream str;
  503. HTML(str) {
  504. DIV_CLASS("panel panel-info") {
  505. DIV_CLASS("panel-heading") {
  506. str << "Proxy";
  507. }
  508. DIV_CLASS("panel-body") {
  509. TABLE_CLASS("table") {
  510. TABLEHEAD() {
  511. TABLER() {
  512. TABLEH() {
  513. str << "Sensor";
  514. }
  515. TABLEH() {
  516. str << "Value";
  517. }
  518. }
  519. }
  520. #define MON_VAR(NAME) \
  521. TABLER() { \
  522. TABLED() { \
  523. str << #NAME; \
  524. } \
  525. TABLED() { \
  526. str << NAME; \
  527. } \
  528. }
  529. TABLEBODY() {
  530. MON_VAR(TActivationContext::Now())
  531. MON_VAR(SessionID)
  532. MON_VAR(LastSessionDieTime)
  533. MON_VAR(IncomingHandshakeActor)
  534. MON_VAR(IncomingHandshakeActorFilledIn)
  535. MON_VAR(IncomingHandshakeActorReset)
  536. MON_VAR(OutgoingHandshakeActor)
  537. MON_VAR(OutgoingHandshakeActorCreated)
  538. MON_VAR(OutgoingHandshakeActorReset)
  539. MON_VAR(State)
  540. MON_VAR(StateSwitchTime)
  541. }
  542. }
  543. }
  544. }
  545. DIV_CLASS("panel panel-info") {
  546. DIV_CLASS("panel-heading") {
  547. str << "Error Log";
  548. }
  549. DIV_CLASS("panel-body") {
  550. TABLE_CLASS("table") {
  551. TABLEHEAD() {
  552. TABLER() {
  553. TABLEH() {
  554. str << "Timestamp";
  555. }
  556. TABLEH() {
  557. str << "Elapsed";
  558. }
  559. TABLEH() {
  560. str << "Kind";
  561. }
  562. TABLEH() {
  563. str << "Explanation";
  564. }
  565. }
  566. }
  567. TABLEBODY() {
  568. const TInstant now = TActivationContext::Now();
  569. const TInstant barrier = now - TDuration::Minutes(1);
  570. for (auto it = ErrorStateLog.rbegin(); it != ErrorStateLog.rend(); ++it) {
  571. auto wrapper = [&](const auto& lambda) {
  572. if (std::get<0>(*it) > barrier) {
  573. str << "<strong>";
  574. lambda();
  575. str << "</strong>";
  576. } else {
  577. lambda();
  578. }
  579. };
  580. TABLER() {
  581. TABLED() {
  582. wrapper([&] {
  583. str << std::get<0>(*it);
  584. });
  585. }
  586. TABLED() {
  587. wrapper([&] {
  588. str << now - std::get<0>(*it);
  589. });
  590. }
  591. TABLED() {
  592. wrapper([&] {
  593. str << std::get<1>(*it);
  594. });
  595. }
  596. TABLED() {
  597. wrapper([&] {
  598. str << std::get<2>(*it);
  599. });
  600. ui32 rep = std::get<3>(*it);
  601. if (rep != 1) {
  602. str << " <strong>x" << rep << "</strong>";
  603. }
  604. }
  605. }
  606. }
  607. }
  608. }
  609. }
  610. }
  611. }
  612. if (Session != nullptr) {
  613. Session->GenerateHttpInfo(str);
  614. }
  615. Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
  616. }
  617. void TInterconnectProxyTCP::TransitToErrorState(TString explanation, bool updateErrorLog) {
  618. ICPROXY_PROFILED;
  619. LOG_NOTICE_IC("ICP32", "transit to hold-by-error state Explanation# %s", explanation.data());
  620. LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] error state: %s", PeerNodeId, explanation.data());
  621. if (updateErrorLog) {
  622. UpdateErrorStateLog(TActivationContext::Now(), "permanent conclusive", explanation);
  623. }
  624. Y_VERIFY(Session == nullptr);
  625. Y_VERIFY(!SessionID);
  626. // recalculate wakeup timeout -- if this is the first failure, then we sleep for default timeout; otherwise we
  627. // sleep N times longer than the previous try, but not longer than desired number of seconds
  628. HoldByErrorWakeupDuration = HoldByErrorWakeupDuration != TDuration::Zero()
  629. ? Min(HoldByErrorWakeupDuration * SleepRetryMultiplier, MaxErrorSleep)
  630. : FirstErrorSleep;
  631. // transit to required state and arm wakeup timer
  632. if (Terminated) {
  633. // switch to this state permanently
  634. SwitchToState(__LINE__, "HoldByError", &TThis::HoldByError);
  635. HoldByErrorWakeupCookie = nullptr;
  636. } else {
  637. SwitchToState(__LINE__, "HoldByError", &TThis::HoldByError, HoldByErrorWakeupDuration,
  638. HoldByErrorWakeupCookie = new TEvents::TEvWakeup);
  639. }
  640. /* Process all pending events. */
  641. ProcessPendingSessionEvents();
  642. /* Terminate handshakes */
  643. DropHandshakes();
  644. /* Terminate pending incoming handshake requests. */
  645. for (auto& ev : PendingIncomingHandshakeEvents) {
  646. Send(ev->Sender, new TEvents::TEvPoisonPill);
  647. if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) {
  648. TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release()));
  649. LogHandshakeFail(tmp, true);
  650. }
  651. }
  652. PendingIncomingHandshakeEvents.clear();
  653. }
  654. void TInterconnectProxyTCP::WakeupFromErrorState(TEvents::TEvWakeup::TPtr& ev) {
  655. ICPROXY_PROFILED;
  656. LOG_INFO_IC("ICP33", "wake up from error state");
  657. if (ev->Get() == HoldByErrorWakeupCookie) {
  658. SwitchToInitialState();
  659. }
  660. }
  661. void TInterconnectProxyTCP::Disconnect() {
  662. ICPROXY_PROFILED;
  663. // terminate handshakes (if any)
  664. DropHandshakes();
  665. if (Session) {
  666. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::UserRequest());
  667. } else {
  668. TransitToErrorState("forced disconnect");
  669. }
  670. }
  671. void TInterconnectProxyTCP::ScheduleCleanupEventQueue() {
  672. ICPROXY_PROFILED;
  673. if (!CleanupEventQueueScheduled && PendingSessionEvents) {
  674. // apply batching at 50 ms granularity
  675. Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue);
  676. CleanupEventQueueScheduled = true;
  677. }
  678. }
  679. void TInterconnectProxyTCP::HandleCleanupEventQueue() {
  680. ICPROXY_PROFILED;
  681. Y_VERIFY(CleanupEventQueueScheduled);
  682. CleanupEventQueueScheduled = false;
  683. CleanupEventQueue();
  684. ScheduleCleanupEventQueue();
  685. }
  686. void TInterconnectProxyTCP::CleanupEventQueue() {
  687. ICPROXY_PROFILED;
  688. const TInstant now = TActivationContext::Now();
  689. while (PendingSessionEvents) {
  690. TPendingSessionEvent& ev = PendingSessionEvents.front();
  691. if (now >= ev.Deadline || PendingSessionEventsSize > Common->Settings.MessagePendingSize) {
  692. TAutoPtr<IEventHandle> event(ev.Event.Release());
  693. PendingSessionEventsSize -= ev.Size;
  694. DropSessionEvent(event);
  695. PendingSessionEvents.pop_front();
  696. } else {
  697. break;
  698. }
  699. }
  700. }
  701. void TInterconnectProxyTCP::HandleClosePeerSocket() {
  702. ICPROXY_PROFILED;
  703. if (Session && Session->Socket) {
  704. LOG_INFO_IC("ICP34", "closed connection by debug command");
  705. Session->Socket->Shutdown(SHUT_RDWR);
  706. }
  707. }
  708. void TInterconnectProxyTCP::HandleCloseInputSession() {
  709. ICPROXY_PROFILED;
  710. if (Session) {
  711. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::CloseInputSession);
  712. }
  713. }
  714. void TInterconnectProxyTCP::HandlePoisonSession() {
  715. ICPROXY_PROFILED;
  716. if (Session) {
  717. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::Debug());
  718. }
  719. }
  720. void TInterconnectProxyTCP::HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev) {
  721. ICPROXY_PROFILED;
  722. ui64 bufSize = 0;
  723. if (Session) {
  724. bufSize = Session->TotalOutputQueueSize;
  725. }
  726. Send(ev->Sender, new TEvSessionBufferSizeResponse(SessionID, bufSize));
  727. }
  728. void TInterconnectProxyTCP::Handle(TEvQueryStats::TPtr& ev) {
  729. ICPROXY_PROFILED;
  730. TProxyStats stats;
  731. stats.Path = Sprintf("peer%04" PRIu32, PeerNodeId);
  732. stats.State = State;
  733. stats.PeerScopeId = Session ? Session->Params.PeerScopeId : TScopeId();
  734. stats.LastSessionDieTime = LastSessionDieTime;
  735. stats.TotalOutputQueueSize = Session ? Session->TotalOutputQueueSize : 0;
  736. stats.Connected = Session ? (bool)Session->Socket : false;
  737. stats.Host = TechnicalPeerHostName;
  738. stats.Port = 0;
  739. ui32 rep = 0;
  740. std::tie(stats.LastErrorTimestamp, stats.LastErrorKind, stats.LastErrorExplanation, rep) = ErrorStateLog
  741. ? ErrorStateLog.back()
  742. : std::make_tuple(TInstant(), TString(), TString(), 1U);
  743. if (rep != 1) {
  744. stats.LastErrorExplanation += Sprintf(" x%" PRIu32, rep);
  745. }
  746. stats.Ping = Session ? Session->GetPingRTT() : TDuration::Zero();
  747. stats.ClockSkew = Session ? Session->GetClockSkew() : 0;
  748. if (Session) {
  749. if (auto *x = dynamic_cast<NInterconnect::TSecureSocket*>(Session->Socket.Get())) {
  750. stats.Encryption = Sprintf("%s/%u", x->GetCipherName().data(), x->GetCipherBits());
  751. } else {
  752. stats.Encryption = "none";
  753. }
  754. }
  755. auto response = MakeHolder<TEvStats>();
  756. response->PeerNodeId = PeerNodeId;
  757. response->ProxyStats = std::move(stats);
  758. Send(ev->Sender, response.Release());
  759. }
  760. void TInterconnectProxyTCP::HandleTerminate() {
  761. ICPROXY_PROFILED;
  762. if (Session) {
  763. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason());
  764. }
  765. Terminated = true;
  766. TransitToErrorState("terminated");
  767. }
  768. void TInterconnectProxyTCP::PassAway() {
  769. if (Session) {
  770. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason());
  771. }
  772. if (DynamicPtr) {
  773. Y_VERIFY(*DynamicPtr == this);
  774. *DynamicPtr = nullptr;
  775. }
  776. // TODO: unregister actor mon page
  777. TActor::PassAway();
  778. }
  779. }