interconnect_handshake.cpp 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995
  1. #include "interconnect_handshake.h"
  2. #include "interconnect_tcp_proxy.h"
  3. #include <library/cpp/actors/core/actor_coroutine.h>
  4. #include <library/cpp/actors/core/log.h>
  5. #include <library/cpp/actors/protos/services_common.pb.h>
  6. #include <util/system/getpid.h>
  7. #include <google/protobuf/text_format.h>
  8. #include <variant>
  9. namespace NActors {
  10. static constexpr size_t StackSize = 64 * 1024; // 64k should be enough
  11. class THandshakeActor
  12. : public TActorCoroImpl
  13. , public TInterconnectLoggingBase
  14. {
  // Exception thrown by Fail() (and directly after answering a cookie-check request) to unwind the
  // handshake coroutine; it is caught in Run() and in CheckPeerCookie().
  15. struct TExHandshakeFailed : yexception {};
  // Time allowed for a single nameservice resolve wait in Connect(); Run() also adds it twice to the
  // overall handshake timeout.
  16. static constexpr TDuration ResolveTimeout = TDuration::Seconds(1);
  17. #pragma pack(push, 1)
  18. struct TInitialPacket {
  19. struct {
  20. TActorId SelfVirtualId;
  21. TActorId PeerVirtualId;
  22. ui64 NextPacket;
  23. ui64 Version;
  24. } Header;
  25. ui32 Checksum;
  26. TInitialPacket() = default;
  27. TInitialPacket(const TActorId& self, const TActorId& peer, ui64 nextPacket, ui64 version) {
  28. Header.SelfVirtualId = self;
  29. Header.PeerVirtualId = peer;
  30. Header.NextPacket = nextPacket;
  31. Header.Version = version;
  32. Checksum = Crc32cExtendMSanCompatible(0, &Header, sizeof(Header));
  33. }
  34. bool Check() const {
  35. return Checksum == Crc32cExtendMSanCompatible(0, &Header, sizeof(Header));
  36. }
  37. TString ToString() const {
  38. return TStringBuilder()
  39. << "{SelfVirtualId# " << Header.SelfVirtualId.ToString()
  40. << " PeerVirtualId# " << Header.PeerVirtualId.ToString()
  41. << " NextPacket# " << Header.NextPacket
  42. << " Version# " << Header.Version
  43. << "}";
  44. }
  45. };
  46. struct TExHeader {
  47. static constexpr ui32 MaxSize = 1024 * 1024;
  48. ui32 Checksum;
  49. ui32 Size;
  50. ui32 CalculateChecksum(const void* data, size_t len) const {
  51. return Crc32cExtendMSanCompatible(Crc32cExtendMSanCompatible(0, &Size, sizeof(Size)), data, len);
  52. }
  53. void Sign(const void* data, size_t len) {
  54. Checksum = CalculateChecksum(data, len);
  55. }
  56. bool Check(const void* data, size_t len) const {
  57. return Checksum == CalculateChecksum(data, len);
  58. }
  59. };
  60. #pragma pack(pop)
  61. private:
  62. TInterconnectProxyCommon::TPtr Common; // shared proxy state: settings, cookie, cluster UUIDs, version info, nameservice id
  63. TActorId SelfVirtualId; // our virtual id for this session (from the ctor, the proxy's ack or the proxy's reply)
  64. TActorId PeerVirtualId; // peer's virtual id; empty on the outgoing side means a new session is requested
  65. ui32 PeerNodeId = 0; // peer node id; for incoming handshakes it is taken from the initial packet
  66. ui64 NextPacketToPeer = 0; // set from the ctor argument or from the proxy's TEvHandshakeAck
  67. TMaybe<ui64> NextPacketFromPeer; // will be obtained from incoming initial packet
  68. TString PeerHostName; // shown in failure messages; from the ctor or from the peer's SenderHostName
  69. TString PeerAddr; // textual remote address; for incoming handshakes filled from the accepted socket
  70. TSocketPtr Socket; // connection being handshaked; moved into TEvHandshakeDone on success
  71. TPollerToken::TPtr PollerToken; // poller registration for Socket; reset before the socket is replaced or handed over
  72. TString State; // label of the wait in progress, reported when the handshake times out
  73. TString HandshakeKind; // "outgoing handshake" or "incoming handshake"; prefix of failure messages
  74. TMaybe<THolder<TProgramInfo>> ProgramInfo; // filled in in case of successful handshake; even if null
  75. TSessionParams Params; // session parameters negotiated here (Encryption, AuthOnly, UseModernFrame, PeerScopeId, AuthCN)
  76. bool ResolveTimedOut = false; // when set, Fail() adds "[resolve timeout] " to the message
  77. THashMap<ui32, TInstant> LastLogNotice; // per peer node id: when a network error was last logged at NOTICE level
  78. const TDuration MuteDuration = TDuration::Seconds(15); // network errors for a node are logged at NOTICE at most once per this interval
  79. TInstant Deadline; // overall handshake deadline computed in Run()
  80. public:
  81. static constexpr IActor::EActivityType ActorActivityType() {
  82. return IActor::INTERCONNECT_HANDSHAKE;
  83. }
  84. THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
  85. ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params)
  86. : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
  87. , Common(std::move(common))
  88. , SelfVirtualId(self)
  89. , PeerVirtualId(peer)
  90. , PeerNodeId(nodeId)
  91. , NextPacketToPeer(nextPacket)
  92. , PeerHostName(std::move(peerHostName))
  93. , HandshakeKind("outgoing handshake")
  94. , Params(std::move(params))
  95. {
  96. Y_VERIFY(SelfVirtualId);
  97. Y_VERIFY(SelfVirtualId.NodeId());
  98. Y_VERIFY(PeerNodeId);
  99. }
  100. THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
  101. : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
  102. , Common(std::move(common))
  103. , Socket(std::move(socket))
  104. , HandshakeKind("incoming handshake")
  105. {
  106. Y_VERIFY(Socket);
  107. PeerAddr = TString::Uninitialized(1024);
  108. if (GetRemoteAddr(*Socket, PeerAddr.Detach(), PeerAddr.size())) {
  109. PeerAddr.resize(strlen(PeerAddr.data()));
  110. } else {
  111. PeerAddr.clear();
  112. }
  113. }
  114. void UpdatePrefix() {
  115. SetPrefix(Sprintf("Handshake %s [node %" PRIu32 "]", SelfActorId.ToString().data(), PeerNodeId));
  116. }
  117. void Run() override {
  118. UpdatePrefix();
  119. // set up overall handshake process timer
  120. TDuration timeout = Common->Settings.Handshake;
  121. if (timeout == TDuration::Zero()) {
  122. timeout = DEFAULT_HANDSHAKE_TIMEOUT;
  123. }
  124. timeout += ResolveTimeout * 2;
  125. Deadline = Now() + timeout;
  126. Schedule(Deadline, new TEvents::TEvWakeup);
  127. try {
  128. if (Socket) {
  129. PerformIncomingHandshake();
  130. } else {
  131. PerformOutgoingHandshake();
  132. }
  133. // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
  134. if (ProgramInfo) {
  135. if (Params.Encryption) {
  136. EstablishSecureConnection();
  137. } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
  138. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
  139. }
  140. }
  141. } catch (const TExHandshakeFailed&) {
  142. ProgramInfo.Clear();
  143. }
  144. if (ProgramInfo) {
  145. LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
  146. Y_VERIFY(NextPacketFromPeer);
  147. if (PollerToken) {
  148. Y_VERIFY(PollerToken->RefCount() == 1);
  149. PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
  150. }
  151. SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
  152. *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
  153. }
  154. Socket.Reset();
  155. }
  156. void EstablishSecureConnection() {
  157. Y_VERIFY(PollerToken && PollerToken->RefCount() == 1);
  158. PollerToken.Reset();
  159. auto ev = AskProxy<TEvSecureSocket>(MakeHolder<TEvGetSecureSocket>(Socket), "AskProxy(TEvSecureContext)");
  160. Socket = std::move(ev->Get()->Socket);
  161. RegisterInPoller();
  162. const ui32 myNodeId = GetActorSystem()->NodeId;
  163. const bool server = myNodeId < PeerNodeId; // keep server/client role permanent to enable easy TLS session resuming
  164. for (;;) {
  165. TString err;
  166. auto& secure = static_cast<NInterconnect::TSecureSocket&>(*Socket);
  167. switch (secure.Establish(server, Params.AuthOnly, err)) {
  168. case NInterconnect::TSecureSocket::EStatus::SUCCESS:
  169. if (Params.AuthOnly) {
  170. Params.Encryption = false;
  171. Params.AuthCN = secure.GetPeerCommonName();
  172. Y_VERIFY(PollerToken && PollerToken->RefCount() == 1);
  173. PollerToken.Reset();
  174. Socket = secure.Detach();
  175. }
  176. return;
  177. case NInterconnect::TSecureSocket::EStatus::ERROR:
  178. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, err, true);
  179. [[fallthrough]];
  180. case NInterconnect::TSecureSocket::EStatus::WANT_READ:
  181. WaitPoller(true, false, "ReadEstablish");
  182. break;
  183. case NInterconnect::TSecureSocket::EStatus::WANT_WRITE:
  184. WaitPoller(false, true, "WriteEstablish");
  185. break;
  186. }
  187. }
  188. }
  189. void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override {
  190. switch (const ui32 type = ev->GetTypeRewrite()) {
  191. case TEvents::TSystem::Wakeup:
  192. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, Sprintf("Handshake timed out, State# %s", State.data()), true);
  193. [[fallthrough]];
  194. case ui32(ENetwork::NodeInfo):
  195. case TEvInterconnect::EvNodeAddress:
  196. case ui32(ENetwork::ResolveError):
  197. break; // most likely a race with resolve timeout
  198. case TEvPollerReady::EventType:
  199. break;
  200. default:
  201. Y_FAIL("unexpected event 0x%08" PRIx32, type);
  202. }
  203. }
  204. template<typename T>
  205. void SetupVersionTag(T& proto) {
  206. if (Common->VersionInfo) {
  207. proto.SetVersionTag(Common->VersionInfo->Tag);
  208. for (const TString& accepted : Common->VersionInfo->AcceptedTags) {
  209. proto.AddAcceptedVersionTags(accepted);
  210. }
  211. }
  212. }
  213. template<typename T>
  214. void SetupClusterUUID(T& proto) {
  215. auto *pb = proto.MutableClusterUUIDs();
  216. pb->SetClusterUUID(Common->ClusterUUID);
  217. for (const TString& uuid : Common->AcceptUUID) {
  218. pb->AddAcceptUUID(uuid);
  219. }
  220. }
  221. template<typename T, typename TCallback>
  222. void ValidateVersionTag(const T& proto, TCallback&& errorCallback) {
  223. // check if we will accept peer's version tag (if peer provides one and if we have accepted list non-empty)
  224. if (Common->VersionInfo) {
  225. if (!proto.HasVersionTag()) {
  226. LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH06", NLog::PRI_WARN,
  227. "peer did not report VersionTag, accepting by default");
  228. } else if (!Common->VersionInfo->AcceptedTags.count(proto.GetVersionTag())) {
  229. // we will not accept peer's tag, so check if remote peer would accept our version tag
  230. size_t i;
  231. for (i = 0; i < proto.AcceptedVersionTagsSize() && Common->VersionInfo->Tag != proto.GetAcceptedVersionTags(i); ++i)
  232. {}
  233. if (i == proto.AcceptedVersionTagsSize()) {
  234. // peer will neither accept our version -- this is total failure
  235. TStringStream s("local/peer version tags did not match accepted ones");
  236. s << " local Tag# " << Common->VersionInfo->Tag << " accepted Tags# [";
  237. bool first = true;
  238. for (const auto& tag : Common->VersionInfo->AcceptedTags) {
  239. s << (std::exchange(first, false) ? "" : " ") << tag;
  240. }
  241. s << "] peer Tag# " << proto.GetVersionTag() << " accepted Tags# [";
  242. first = true;
  243. for (const auto& tag : proto.GetAcceptedVersionTags()) {
  244. s << (std::exchange(first, false) ? "" : " ") << tag;
  245. }
  246. s << "]";
  247. errorCallback(s.Str());
  248. }
  249. }
  250. }
  251. }
  252. template<typename T, typename TCallback>
  253. void ValidateClusterUUID(const T& proto, TCallback&& errorCallback, const TMaybe<TString>& uuid = {}) {
  254. auto formatList = [](const auto& list) {
  255. TStringStream s;
  256. s << "[";
  257. for (auto it = list.begin(); it != list.end(); ++it) {
  258. if (it != list.begin()) {
  259. s << " ";
  260. }
  261. s << *it;
  262. }
  263. s << "]";
  264. return s.Str();
  265. };
  266. if (!Common->AcceptUUID) {
  267. return; // promiscuous mode -- we accept every other peer
  268. }
  269. if (!proto.HasClusterUUIDs()) {
  270. if (uuid) {
  271. // old-style checking, peer does not support symmetric protoocol
  272. bool matching = false;
  273. for (const TString& accepted : Common->AcceptUUID) {
  274. if (*uuid == accepted) {
  275. matching = true;
  276. break;
  277. }
  278. }
  279. if (!matching) {
  280. errorCallback(Sprintf("Peer ClusterUUID# %s mismatch, AcceptUUID# %s", uuid->data(), formatList(Common->AcceptUUID).data()));
  281. }
  282. }
  283. return; // remote side did not fill in this field -- old version, symmetric protocol is not supported
  284. }
  285. const auto& uuids = proto.GetClusterUUIDs();
  286. // check if our UUID matches remote accept list
  287. for (const TString& item : uuids.GetAcceptUUID()) {
  288. if (item == Common->ClusterUUID) {
  289. return; // match
  290. }
  291. }
  292. // check if remote UUID matches our accept list
  293. const TString& remoteUUID = uuids.GetClusterUUID();
  294. for (const TString& item : Common->AcceptUUID) {
  295. if (item == remoteUUID) {
  296. return; // match
  297. }
  298. }
  299. // no match
  300. errorCallback(Sprintf("Peer ClusterUUID# %s mismatch, AcceptUUID# %s", remoteUUID.data(), formatList(Common->AcceptUUID).data()));
  301. }
  302. void ParsePeerScopeId(const NActorsInterconnect::TScopeId& proto) {
  303. Params.PeerScopeId = {proto.GetX1(), proto.GetX2()};
  304. }
  305. void FillInScopeId(NActorsInterconnect::TScopeId& proto) {
  306. const TScopeId& scope = Common->LocalScopeId;
  307. proto.SetX1(scope.first);
  308. proto.SetX2(scope.second);
  309. }
  310. template<typename T>
  311. void ReportProto(const T& protobuf, const char *msg) {
  312. auto formatString = [&] {
  313. google::protobuf::TextFormat::Printer p;
  314. p.SetSingleLineMode(true);
  315. TString s;
  316. p.PrintToString(protobuf, &s);
  317. return s;
  318. };
  319. LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH07", NLog::PRI_DEBUG, "%s %s", msg,
  320. formatString().data());
  321. }
  322. bool CheckPeerCookie(const TString& cookie, TString *error) {
  323. // create a temporary socket to connect to the peer
  324. TSocketPtr tempSocket;
  325. std::swap(tempSocket, Socket);
  326. TPollerToken::TPtr tempPollerToken;
  327. std::swap(tempPollerToken, PollerToken);
  328. // set up virtual self id to ensure peer will not drop our connection
  329. char buf[12] = {'c', 'o', 'o', 'k', 'i', 'e', ' ', 'c', 'h', 'e', 'c', 'k'};
  330. SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12));
  331. bool success = true;
  332. try {
  333. // issue connection and send initial packet
  334. Connect(false);
  335. SendInitialPacket();
  336. // wait for basic response
  337. TInitialPacket response;
  338. ReceiveData(&response, sizeof(response), "ReceiveResponse");
  339. if (!response.Check()) {
  340. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error");
  341. } else if (response.Header.Version != INTERCONNECT_PROTOCOL_VERSION) {
  342. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, Sprintf("Incompatible protocol %" PRIu64, response.Header.Version));
  343. }
  344. // issue cookie check request
  345. NActorsInterconnect::THandshakeRequest request;
  346. request.SetProtocol(INTERCONNECT_PROTOCOL_VERSION);
  347. request.SetProgramPID(0);
  348. request.SetProgramStartTime(0);
  349. request.SetSerial(0);
  350. request.SetReceiverNodeId(0);
  351. request.SetSenderActorId(TString());
  352. request.SetCookie(cookie);
  353. request.SetDoCheckCookie(true);
  354. SendExBlock(request, "SendExBlockDoCheckCookie");
  355. // process cookie check reply
  356. NActorsInterconnect::THandshakeReply reply;
  357. if (!reply.ParseFromString(ReceiveExBlock("ReceiveExBlockDoCheckCookie"))) {
  358. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect packet from peer");
  359. } else if (reply.HasCookieCheckResult() && !reply.GetCookieCheckResult()) {
  360. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Cookie check error -- possible network problem");
  361. }
  362. } catch (const TExHandshakeFailed& e) {
  363. *error = e.what();
  364. success = false;
  365. }
  366. // restore state
  367. SelfVirtualId = TActorId();
  368. std::swap(tempSocket, Socket);
  369. std::swap(tempPollerToken, PollerToken);
  370. return success;
  371. }
  372. void PerformOutgoingHandshake() {
  373. LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH01", NLog::PRI_DEBUG,
  374. "starting outgoing handshake");
  375. // perform connection
  376. Connect(true);
  377. // send initial request packet
  378. SendInitialPacket();
  379. TInitialPacket response;
  380. ReceiveData(&response, sizeof(response), "ReceiveResponse");
  381. if (!response.Check()) {
  382. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error");
  383. } else if (response.Header.Version != INTERCONNECT_PROTOCOL_VERSION) {
  384. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, Sprintf("Incompatible protocol %" PRIu64, response.Header.Version));
  385. }
  386. // extract next packet
  387. NextPacketFromPeer = response.Header.NextPacket;
  388. if (!PeerVirtualId) {
  389. // creating new session -- we have to generate request
  390. NActorsInterconnect::THandshakeRequest request;
  391. request.SetProtocol(INTERCONNECT_PROTOCOL_VERSION);
  392. request.SetProgramPID(GetPID());
  393. request.SetProgramStartTime(Common->StartTime);
  394. request.SetSerial(SelfVirtualId.LocalId());
  395. request.SetReceiverNodeId(PeerNodeId);
  396. request.SetSenderActorId(SelfVirtualId.ToString());
  397. request.SetSenderHostName(Common->TechnicalSelfHostName);
  398. request.SetReceiverHostName(PeerHostName);
  399. if (Common->LocalScopeId != TScopeId()) {
  400. FillInScopeId(*request.MutableClientScopeId());
  401. }
  402. if (Common->Cookie) {
  403. request.SetCookie(Common->Cookie);
  404. }
  405. if (Common->ClusterUUID) {
  406. request.SetUUID(Common->ClusterUUID);
  407. }
  408. SetupClusterUUID(request);
  409. SetupVersionTag(request);
  410. if (const ui32 size = Common->HandshakeBallastSize) {
  411. TString ballast(size, 0);
  412. char* data = ballast.Detach();
  413. for (ui32 i = 0; i < size; ++i) {
  414. data[i] = i;
  415. }
  416. request.SetBallast(ballast);
  417. }
  418. switch (Common->Settings.EncryptionMode) {
  419. case EEncryptionMode::DISABLED:
  420. break;
  421. case EEncryptionMode::OPTIONAL:
  422. request.SetRequireEncryption(false);
  423. break;
  424. case EEncryptionMode::REQUIRED:
  425. request.SetRequireEncryption(true);
  426. break;
  427. }
  428. request.SetRequestModernFrame(true);
  429. request.SetRequestAuthOnly(Common->Settings.TlsAuthOnly);
  430. SendExBlock(request, "ExRequest");
  431. NActorsInterconnect::THandshakeReply reply;
  432. if (!reply.ParseFromString(ReceiveExBlock("ExReply"))) {
  433. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect THandshakeReply");
  434. }
  435. ReportProto(reply, "ReceiveExBlock ExReply");
  436. if (reply.HasErrorExplaination()) {
  437. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "error from peer: " + reply.GetErrorExplaination());
  438. } else if (!reply.HasSuccess()) {
  439. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "empty reply");
  440. }
  441. auto generateError = [this](TString msg) {
  442. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, msg);
  443. };
  444. const auto& success = reply.GetSuccess();
  445. ValidateClusterUUID(success, generateError);
  446. ValidateVersionTag(success, generateError);
  447. const auto& s = success.GetSenderActorId();
  448. PeerVirtualId.Parse(s.data(), s.size());
  449. // recover flags
  450. Params.Encryption = success.GetStartEncryption();
  451. Params.UseModernFrame = success.GetUseModernFrame();
  452. Params.AuthOnly = Params.Encryption && success.GetAuthOnly();
  453. if (success.HasServerScopeId()) {
  454. ParsePeerScopeId(success.GetServerScopeId());
  455. }
  456. // recover peer process info from peer's reply
  457. ProgramInfo = GetProgramInfo(success);
  458. } else if (!response.Header.SelfVirtualId) {
  459. // peer reported error -- empty ack was generated by proxy for this request
  460. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH, "Peer rejected session continuation handshake");
  461. } else if (response.Header.SelfVirtualId != PeerVirtualId || response.Header.PeerVirtualId != SelfVirtualId) {
  462. // resuming existing session; check that virtual ids of peers match each other
  463. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH, "Session virtual ID mismatch");
  464. } else {
  465. ProgramInfo.ConstructInPlace(); // successful handshake
  466. }
  467. }
  468. void PerformIncomingHandshake() {
  469. LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH02", NLog::PRI_DEBUG,
  470. "starting incoming handshake");
  471. // set up incoming socket
  472. SetupSocket();
  473. // wait for initial request packet
  474. TInitialPacket request;
  475. ReceiveData(&request, sizeof(request), "ReceiveRequest");
  476. if (!request.Check()) {
  477. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error");
  478. } else if (request.Header.Version != INTERCONNECT_PROTOCOL_VERSION) {
  479. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, Sprintf("Incompatible protocol %" PRIu64, request.Header.Version));
  480. }
  481. // extract peer node id from the peer
  482. PeerNodeId = request.Header.SelfVirtualId.NodeId();
  483. if (!PeerNodeId) {
  484. Y_VERIFY_DEBUG(false, "PeerNodeId is zero request# %s", request.ToString().data());
  485. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "SelfVirtualId.NodeId is empty in initial packet");
  486. }
  487. UpdatePrefix();
  488. // extract next packet
  489. NextPacketFromPeer = request.Header.NextPacket;
  490. if (request.Header.PeerVirtualId) {
  491. // issue request to the proxy and wait for the response
  492. auto reply = AskProxy<TEvHandshakeAck, TEvHandshakeNak>(MakeHolder<TEvHandshakeAsk>(
  493. request.Header.SelfVirtualId, request.Header.PeerVirtualId, request.Header.NextPacket),
  494. "TEvHandshakeAsk");
  495. if (auto *ack = reply->CastAsLocal<TEvHandshakeAck>()) {
  496. // extract self/peer virtual ids
  497. SelfVirtualId = ack->Self;
  498. PeerVirtualId = request.Header.SelfVirtualId;
  499. NextPacketToPeer = ack->NextPacket;
  500. Params = ack->Params;
  501. // only succeed in case when proxy returned valid SelfVirtualId; otherwise it wants us to terminate
  502. // the handshake process and it does not expect the handshake reply
  503. ProgramInfo.ConstructInPlace();
  504. } else {
  505. LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH08", NLog::PRI_NOTICE,
  506. "Continuation request rejected by proxy");
  507. // report continuation reject to peer
  508. SelfVirtualId = TActorId();
  509. PeerVirtualId = TActorId();
  510. NextPacketToPeer = 0;
  511. }
  512. // issue response to the peer
  513. SendInitialPacket();
  514. } else {
  515. // peer wants a new session, clear fields and send initial packet
  516. SelfVirtualId = TActorId();
  517. PeerVirtualId = TActorId();
  518. NextPacketToPeer = 0;
  519. SendInitialPacket();
  520. // wait for extended request
  521. auto ev = MakeHolder<TEvHandshakeRequest>();
  522. auto& request = ev->Record;
  523. if (!request.ParseFromString(ReceiveExBlock("ExRequest"))) {
  524. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect THandshakeRequest");
  525. }
  526. ReportProto(request, "ReceiveExBlock ExRequest");
  527. auto generateError = [this](TString msg) {
  528. // issue reply to the peer to prevent repeating connection retries
  529. NActorsInterconnect::THandshakeReply reply;
  530. reply.SetErrorExplaination(msg);
  531. SendExBlock(reply, "ExReply");
  532. // terminate ths handshake
  533. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, msg);
  534. };
  535. // check request cookie
  536. TString error;
  537. if (request.HasDoCheckCookie()) {
  538. NActorsInterconnect::THandshakeReply reply;
  539. reply.SetCookieCheckResult(request.GetCookie() == Common->Cookie);
  540. SendExBlock(reply, "ExReplyDoCheckCookie");
  541. throw TExHandshakeFailed();
  542. } else if (request.HasCookie() && !CheckPeerCookie(request.GetCookie(), &error)) {
  543. generateError(TStringBuilder() << "Peer connectivity-checking failed, error# " << error);
  544. }
  545. // update log prefix with the reported peer host name
  546. PeerHostName = request.GetSenderHostName();
  547. // parse peer virtual id
  548. const auto& str = request.GetSenderActorId();
  549. PeerVirtualId.Parse(str.data(), str.size());
  550. // validate request
  551. ValidateClusterUUID(request, generateError, request.GetUUID());
  552. if (request.GetReceiverNodeId() != SelfActorId.NodeId()) {
  553. generateError(Sprintf("Incorrect ReceiverNodeId# %" PRIu32 " from the peer, expected# %" PRIu32,
  554. request.GetReceiverNodeId(), SelfActorId.NodeId()));
  555. } else if (request.GetReceiverHostName() != Common->TechnicalSelfHostName) {
  556. generateError(Sprintf("ReceiverHostName# %s mismatch, expected# %s", request.GetReceiverHostName().data(),
  557. Common->TechnicalSelfHostName.data()));
  558. }
  559. ValidateVersionTag(request, generateError);
  560. // check peer node
  561. auto peerNodeInfo = GetPeerNodeInfo();
  562. if (!peerNodeInfo) {
  563. generateError("Peer node not registered in nameservice");
  564. } else if (peerNodeInfo->Host != request.GetSenderHostName()) {
  565. generateError("SenderHostName mismatch");
  566. }
  567. // check request against encryption
  568. switch (Common->Settings.EncryptionMode) {
  569. case EEncryptionMode::DISABLED:
  570. if (request.GetRequireEncryption()) {
  571. generateError("Peer requested encryption, but it is disabled locally");
  572. }
  573. break;
  574. case EEncryptionMode::OPTIONAL:
  575. Params.Encryption = request.HasRequireEncryption();
  576. break;
  577. case EEncryptionMode::REQUIRED:
  578. if (!request.HasRequireEncryption()) {
  579. generateError("Peer did not request encryption, but it is required locally");
  580. }
  581. Params.Encryption = true;
  582. break;
  583. }
  584. Params.UseModernFrame = request.GetRequestModernFrame();
  585. Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly;
  586. if (request.HasClientScopeId()) {
  587. ParsePeerScopeId(request.GetClientScopeId());
  588. }
  589. // remember program info (assuming successful handshake)
  590. ProgramInfo = GetProgramInfo(request);
  591. // send to proxy
  592. auto reply = AskProxy<TEvHandshakeReplyOK, TEvHandshakeReplyError>(std::move(ev), "TEvHandshakeRequest");
  593. // parse it
  594. if (auto ev = reply->CastAsLocal<TEvHandshakeReplyOK>()) {
  595. // issue successful reply to the peer
  596. auto& record = ev->Record;
  597. Y_VERIFY(record.HasSuccess());
  598. auto& success = *record.MutableSuccess();
  599. SetupClusterUUID(success);
  600. SetupVersionTag(success);
  601. success.SetStartEncryption(Params.Encryption);
  602. if (Common->LocalScopeId != TScopeId()) {
  603. FillInScopeId(*success.MutableServerScopeId());
  604. }
  605. success.SetUseModernFrame(Params.UseModernFrame);
  606. success.SetAuthOnly(Params.AuthOnly);
  607. SendExBlock(record, "ExReply");
  608. // extract sender actor id (self virtual id)
  609. const auto& str = success.GetSenderActorId();
  610. SelfVirtualId.Parse(str.data(), str.size());
  611. } else if (auto ev = reply->CastAsLocal<TEvHandshakeReplyError>()) {
  612. // in case of error just send reply to the peer and terminate handshake
  613. SendExBlock(ev->Record, "ExReply");
  614. ProgramInfo.Clear(); // do not issue reply to the proxy
  615. } else {
  616. Y_FAIL("unexpected event Type# 0x%08" PRIx32, reply->GetTypeRewrite());
  617. }
  618. }
  619. }
  620. template <typename T>
  621. void SendExBlock(const T& proto, const char* what) {
  622. TString data;
  623. Y_PROTOBUF_SUPPRESS_NODISCARD proto.SerializeToString(&data);
  624. Y_VERIFY(data.size() <= TExHeader::MaxSize);
  625. ReportProto(proto, Sprintf("SendExBlock %s", what).data());
  626. TExHeader header;
  627. header.Size = data.size();
  628. header.Sign(data.data(), data.size());
  629. SendData(&header, sizeof(header), Sprintf("Send%sHeader", what));
  630. SendData(data.data(), data.size(), Sprintf("Send%sData", what));
  631. }
  632. TString ReceiveExBlock(const char* what) {
  633. TExHeader header;
  634. ReceiveData(&header, sizeof(header), Sprintf("Receive%sHeader", what));
  635. if (header.Size > TExHeader::MaxSize) {
  636. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect extended header size");
  637. }
  638. TString data;
  639. data.resize(header.Size);
  640. ReceiveData(data.Detach(), data.size(), Sprintf("Receive%sData", what));
  641. if (!header.Check(data.data(), data.size())) {
  642. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Extended header CRC error");
  643. }
  644. return data;
  645. }
  646. private:
  647. void SendToProxy(THolder<IEventBase> ev) {
  648. Y_VERIFY(PeerNodeId);
  649. Send(GetActorSystem()->InterconnectProxy(PeerNodeId), ev.Release());
  650. }
  651. template <typename TEvent>
  652. THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) {
  653. State = std::move(state);
  654. return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline);
  655. }
  656. template <typename T1, typename T2, typename... TEvents>
  657. THolder<IEventHandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) {
  658. State = std::move(state);
  659. return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline);
  660. }
  661. template <typename TEvent>
  662. THolder<typename TEvent::THandle> AskProxy(THolder<IEventBase> ev, TString state) {
  663. SendToProxy(std::move(ev));
  664. return WaitForSpecificEvent<TEvent>(std::move(state));
  665. }
  666. template <typename T1, typename T2, typename... TOther>
  667. THolder<IEventHandle> AskProxy(THolder<IEventBase> ev, TString state) {
  668. SendToProxy(std::move(ev));
  669. return WaitForSpecificEvent<T1, T2, TOther...>(std::move(state));
  670. }
  671. void Fail(TEvHandshakeFail::EnumHandshakeFail reason, TString explanation, bool network = false) {
  672. TString msg = Sprintf("%s Peer# %s(%s) %s%s", HandshakeKind.data(), PeerHostName ? PeerHostName.data() : "<unknown>",
  673. PeerAddr.size() ? PeerAddr.data() : "<unknown>", ResolveTimedOut ? "[resolve timeout] " : "",
  674. explanation.data());
  675. if (network) {
  676. TInstant now = Now();
  677. TInstant prevLog = LastLogNotice[PeerNodeId];
  678. NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG;
  679. if (now - prevLog > MuteDuration) {
  680. logPriority = NActors::NLog::PRI_NOTICE;
  681. LastLogNotice[PeerNodeId] = now;
  682. }
  683. LOG_LOG_NET_X(logPriority, PeerNodeId, "network-related error occured on handshake: %s", msg.data());
  684. } else {
  685. // calculate log severity based on failure type; permanent failures lead to error log messages
  686. auto severity = reason == TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT
  687. ? NActors::NLog::PRI_NOTICE
  688. : NActors::NLog::PRI_INFO;
  689. LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH03", severity, "handshake failed, explanation# %s", msg.data());
  690. }
  691. if (PeerNodeId) {
  692. SendToProxy(MakeHolder<TEvHandshakeFail>(reason, std::move(msg)));
  693. }
  694. throw TExHandshakeFailed() << explanation;
  695. }
  696. private:
  697. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  698. // COMMUNICATION BLOCK
  699. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  700. void Connect(bool updatePeerAddr) {
  701. // issue request to a nameservice to resolve peer node address
  702. Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, Deadline));
  703. // wait for the result
  704. auto ev = WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>("ResolveNode",
  705. Now() + ResolveTimeout);
  706. // extract address from the result
  707. NInterconnect::TAddress address;
  708. if (!ev) {
  709. ResolveTimedOut = true;
  710. if (auto peerNodeInfo = GetPeerNodeInfo(); peerNodeInfo && peerNodeInfo->Address) {
  711. address = {peerNodeInfo->Address, peerNodeInfo->Port};
  712. } else {
  713. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve timed out and no static address defined", true);
  714. }
  715. } else if (auto *p = ev->CastAsLocal<TEvLocalNodeInfo>()) {
  716. if (!p->Address) {
  717. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true);
  718. }
  719. address = {*p->Address};
  720. } else if (auto *p = ev->CastAsLocal<TEvInterconnect::TEvNodeAddress>()) {
  721. const auto& r = p->Record;
  722. if (!r.HasAddress() || !r.HasPort()) {
  723. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true);
  724. }
  725. address = {r.GetAddress(), static_cast<ui16>(r.GetPort())};
  726. } else {
  727. Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError));
  728. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain, true);
  729. }
  730. // create the socket with matching address family
  731. Socket = NInterconnect::TStreamSocket::Make(address.GetFamily());
  732. if (*Socket == -1) {
  733. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: failed to create socket");
  734. }
  735. // extract peer address
  736. if (updatePeerAddr) {
  737. PeerAddr = address.ToString();
  738. }
  739. // set up socket parameters
  740. SetupSocket();
  741. // start connecting
  742. switch (int err = -Socket->Connect(address)) {
  743. case 0: // successful connection
  744. break;
  745. case EINPROGRESS: // connection in progress
  746. WaitPoller(false, true, "WaitConnect");
  747. err = Socket->GetConnectStatus();
  748. if (err) {
  749. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, Sprintf("Connection failed: %s", strerror(err)), true);
  750. }
  751. break;
  752. default:
  753. break;
  754. }
  755. auto it = LastLogNotice.find(PeerNodeId);
  756. NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG;
  757. if (it != LastLogNotice.end()) {
  758. LastLogNotice.erase(it);
  759. logPriority = NActors::NLog::PRI_NOTICE;
  760. }
  761. LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH05", logPriority, "connected to peer");
  762. }
  763. void SetupSocket() {
  764. // switch to nonblocking mode
  765. try {
  766. SetNonBlock(*Socket);
  767. SetNoDelay(*Socket, true);
  768. } catch (...) {
  769. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: can't up nonblocking mode for socket");
  770. }
  771. // setup send buffer size
  772. Socket->SetSendBufferSize(Common->Settings.GetSendBufferSize());
  773. // register in poller
  774. RegisterInPoller();
  775. }
  776. void RegisterInPoller() {
  777. const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, SelfActorId, SelfActorId));
  778. Y_VERIFY(success);
  779. auto result = WaitForSpecificEvent<TEvPollerRegisterResult>("RegisterPoller");
  780. PollerToken = std::move(result->Get()->PollerToken);
  781. Y_VERIFY(PollerToken);
  782. Y_VERIFY(PollerToken->RefCount() == 1); // ensure exclusive ownership
  783. }
  784. void SendInitialPacket() {
  785. TInitialPacket packet(SelfVirtualId, PeerVirtualId, NextPacketToPeer, INTERCONNECT_PROTOCOL_VERSION);
  786. SendData(&packet, sizeof(packet), "SendInitialPacket");
  787. }
  788. void WaitPoller(bool read, bool write, TString state) {
  789. PollerToken->Request(read, write);
  790. WaitForSpecificEvent<TEvPollerReady>(std::move(state));
  791. }
  792. template <typename TDataPtr, typename TSendRecvFunc>
  793. void Process(TDataPtr buffer, size_t len, TSendRecvFunc&& sendRecv, bool read, bool write, TString state) {
  794. Y_VERIFY(Socket);
  795. NInterconnect::TStreamSocket* sock = Socket.Get();
  796. ssize_t (NInterconnect::TStreamSocket::*pfn)(TDataPtr, size_t, TString*) const = sendRecv;
  797. size_t processed = 0;
  798. auto error = [&](TString msg) {
  799. Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, Sprintf("Socket error# %s state# %s processed# %zu remain# %zu",
  800. msg.data(), state.data(), processed, len), true);
  801. };
  802. while (len) {
  803. TString err;
  804. ssize_t nbytes = (sock->*pfn)(buffer, len, &err);
  805. if (nbytes > 0) {
  806. buffer = (char*)buffer + nbytes;
  807. len -= nbytes;
  808. processed += nbytes;
  809. } else if (-nbytes == EAGAIN || -nbytes == EWOULDBLOCK) {
  810. WaitPoller(read, write, state);
  811. } else if (!nbytes) {
  812. error("connection unexpectedly closed");
  813. } else if (-nbytes != EINTR) {
  814. error(err ? err : TString(strerror(-nbytes)));
  815. }
  816. }
  817. }
  818. void SendData(const void* buffer, size_t len, TString state) {
  819. Process(buffer, len, &NInterconnect::TStreamSocket::Send, false, true, std::move(state));
  820. }
  821. void ReceiveData(void* buffer, size_t len, TString state) {
  822. Process(buffer, len, &NInterconnect::TStreamSocket::Recv, true, false, std::move(state));
  823. }
  824. THolder<TEvInterconnect::TNodeInfo> GetPeerNodeInfo() {
  825. Y_VERIFY(PeerNodeId);
  826. Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, Deadline));
  827. auto response = WaitForSpecificEvent<TEvInterconnect::TEvNodeInfo>("GetPeerNodeInfo");
  828. return std::move(response->Get()->Node);
  829. }
  830. template <typename T>
  831. static THolder<TProgramInfo> GetProgramInfo(const T& proto) {
  832. auto programInfo = MakeHolder<TProgramInfo>();
  833. programInfo->PID = proto.GetProgramPID();
  834. programInfo->StartTime = proto.GetProgramStartTime();
  835. programInfo->Serial = proto.GetSerial();
  836. return programInfo;
  837. }
  838. };
  839. IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
  840. const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
  841. TSessionParams params) {
  842. return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket,
  843. std::move(peerHostName), std::move(params)));
  844. }
  845. IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) {
  846. return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket)));
  847. }
  848. }