client_impl.cpp 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283
  1. #include "client_impl.h"
  2. #include "helpers.h"
  3. #include <contrib/libs/grpc/include/grpc/grpc.h>
  4. #include <contrib/libs/grpc/src/core/lib/gpr/string.h>
  5. #include <contrib/libs/grpc/src/core/lib/gprpp/fork.h>
  6. #include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
  7. #include <util/charset/utf8.h>
  8. #include <util/generic/size_literals.h>
  9. #include <util/system/env.h>
  10. using namespace NThreading;
  11. using namespace NMonitoring;
  12. namespace NUnifiedAgent::NPrivate {
  13. std::shared_ptr<grpc::Channel> CreateChannel(const grpc::string& target) {
  14. grpc::ChannelArguments args;
  15. args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
  16. args.SetMaxReceiveMessageSize(Max<int>());
  17. args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
  18. args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);
  19. args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
  20. args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 200);
  21. args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
  22. args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
  23. args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
  24. args.SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
  25. args.SetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE, 1024*1024);
  26. return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(), args);
  27. }
  28. void AddMeta(NUnifiedAgentProto::Request_Initialize& init, const TString& name, const TString& value) {
  29. auto* metaItem = init.MutableMeta()->Add();
  30. metaItem->SetName(name);
  31. metaItem->SetValue(value);
  32. }
  33. std::atomic<ui64> TClient::Id{0};
  34. TClient::TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector)
  35. : Parameters(parameters)
  36. , ForkProtector(forkProtector)
  37. , Counters(parameters.Counters ? parameters.Counters : MakeIntrusive<TClientCounters>())
  38. , Log(parameters.Log)
  39. , MainLogger(Log, MakeFMaybe(Parameters.LogRateLimitBytes))
  40. , Logger(MainLogger.Child(Sprintf("ua_%" PRIu64, Id.fetch_add(1))))
  41. , Channel(nullptr)
  42. , Stub(nullptr)
  43. , ActiveCompletionQueue(nullptr)
  44. , SessionLogLabel(0)
  45. , ActiveSessions()
  46. , Started(false)
  47. , Destroyed(false)
  48. , Lock()
  49. {
  50. MainLogger.SetDroppedBytesCounter(&Counters->ClientLogDroppedBytes);
  51. if (ForkProtector != nullptr) {
  52. ForkProtector->Register(*this);
  53. }
  54. EnsureStarted();
  55. YLOG_INFO(Sprintf("created, uri [%s]", Parameters.Uri.c_str()));
  56. }
  57. TClient::~TClient() {
  58. with_lock(Lock) {
  59. Y_ABORT_UNLESS(ActiveSessions.empty(), "active sessions found");
  60. EnsureStoppedNoLock();
  61. Destroyed = true;
  62. }
  63. if (ForkProtector != nullptr) {
  64. ForkProtector->Unregister(*this);
  65. }
  66. YLOG_DEBUG(Sprintf("destroyed, uri [%s]", Parameters.Uri.c_str()));
  67. }
  68. TClientSessionPtr TClient::CreateSession(const TSessionParameters& parameters) {
  69. return MakeIntrusive<TClientSession>(this, parameters);
  70. }
  71. void TClient::StartTracing(ELogPriority logPriority) {
  72. MainLogger.StartTracing(logPriority);
  73. StartGrpcTracing();
  74. YLOG_INFO("tracing started");
  75. }
  76. void TClient::FinishTracing() {
  77. FinishGrpcTracing();
  78. MainLogger.FinishTracing();
  79. YLOG_INFO("tracing finished");
  80. }
  81. void TClient::RegisterSession(TClientSession* session) {
  82. with_lock(Lock) {
  83. ActiveSessions.push_back(session);
  84. }
  85. }
  86. void TClient::UnregisterSession(TClientSession* session) {
  87. with_lock(Lock) {
  88. const auto it = Find(ActiveSessions, session);
  89. Y_ABORT_UNLESS(it != ActiveSessions.end());
  90. ActiveSessions.erase(it);
  91. }
  92. }
  93. void TClient::PreFork() {
  94. YLOG_INFO("pre fork started");
  95. Lock.Acquire();
  96. auto futures = TVector<TFuture<void>>(Reserve(ActiveSessions.size()));
  97. for (auto* s: ActiveSessions) {
  98. futures.push_back(s->PreFork());
  99. }
  100. YLOG_INFO("waiting for sessions");
  101. WaitAll(futures).Wait();
  102. EnsureStoppedNoLock();
  103. YLOG_INFO("shutdown grpc executor");
  104. grpc_core::Executor::SetThreadingAll(false);
  105. YLOG_INFO("pre fork finished");
  106. }
  107. void TClient::PostForkParent() {
  108. YLOG_INFO("post fork parent started");
  109. if (!Destroyed) {
  110. EnsureStartedNoLock();
  111. }
  112. Lock.Release();
  113. for (auto* s: ActiveSessions) {
  114. s->PostForkParent();
  115. }
  116. YLOG_INFO("post fork parent finished");
  117. }
  118. void TClient::PostForkChild() {
  119. YLOG_INFO("post fork child started");
  120. Lock.Release();
  121. for (auto* s: ActiveSessions) {
  122. s->PostForkChild();
  123. }
  124. YLOG_INFO("post fork child finished");
  125. }
  126. void TClient::EnsureStarted() {
  127. with_lock(Lock) {
  128. EnsureStartedNoLock();
  129. }
  130. }
  131. void TClient::EnsureStartedNoLock() {
  132. // Lock must be held
  133. if (Started) {
  134. return;
  135. }
  136. Channel = CreateChannel(Parameters.Uri);
  137. Stub = NUnifiedAgentProto::UnifiedAgentService::NewStub(Channel);
  138. ActiveCompletionQueue = MakeHolder<TGrpcCompletionQueueHost>();
  139. ActiveCompletionQueue->Start();
  140. Started = true;
  141. }
  142. void TClient::EnsureStoppedNoLock() {
  143. // Lock must be held
  144. if (!Started) {
  145. return;
  146. }
  147. YLOG_DEBUG("stopping");
  148. ActiveCompletionQueue->Stop();
  149. ActiveCompletionQueue = nullptr;
  150. Stub = nullptr;
  151. Channel = nullptr;
  152. YLOG_DEBUG("stopped");
  153. Started = false;
  154. }
  155. TScopeLogger TClient::CreateSessionLogger() {
  156. return Logger.Child(ToString(SessionLogLabel.fetch_add(1)));
  157. }
  158. TForkProtector::TForkProtector()
  159. : Clients()
  160. , GrpcInitializer()
  161. , Enabled(grpc_core::Fork::Enabled())
  162. , Lock()
  163. {
  164. }
  165. void TForkProtector::Register(TClient& client) {
  166. if (!Enabled) {
  167. return;
  168. }
  169. Y_ABORT_UNLESS(grpc_is_initialized());
  170. Y_ABORT_UNLESS(grpc_core::Fork::Enabled());
  171. with_lock(Lock) {
  172. Clients.push_back(&client);
  173. }
  174. }
  175. void TForkProtector::Unregister(TClient& client) {
  176. if (!Enabled) {
  177. return;
  178. }
  179. with_lock(Lock) {
  180. const auto it = Find(Clients, &client);
  181. Y_ABORT_UNLESS(it != Clients.end());
  182. Clients.erase(it);
  183. }
  184. }
  185. std::shared_ptr<TForkProtector> TForkProtector::Get(bool createIfNotExists) {
  186. with_lock(InstanceLock) {
  187. auto result = Instance.lock();
  188. if (!result && createIfNotExists) {
  189. result = std::make_shared<TForkProtector>();
  190. if (!result->Enabled) {
  191. TLog log("cerr");
  192. TLogger logger(log, Nothing());
  193. auto scopeLogger = logger.Child("ua client");
  194. YLOG(TLOG_WARNING,
  195. "Grpc is already initialized, can't enable fork support. "
  196. "If forks are possible, please set environment variable GRPC_ENABLE_FORK_SUPPORT to 'true'. "
  197. "If not, you can suppress this warning by setting EnableForkSupport "
  198. "to false when creating the ua client.",
  199. scopeLogger);
  200. } else if (!SubscribedToForks) {
  201. SubscribedToForks = true;
  202. #ifdef _unix_
  203. pthread_atfork(
  204. &TForkProtector::PreFork,
  205. &TForkProtector::PostForkParent,
  206. &TForkProtector::PostForkChild);
  207. #endif
  208. }
  209. Instance = result;
  210. }
  211. return result;
  212. }
  213. }
  214. void TForkProtector::PreFork() {
  215. auto self = Get(false);
  216. if (!self) {
  217. return;
  218. }
  219. self->Lock.Acquire();
  220. for (auto* c : self->Clients) {
  221. c->PreFork();
  222. }
  223. }
  224. void TForkProtector::PostForkParent() {
  225. auto self = Get(false);
  226. if (!self) {
  227. return;
  228. }
  229. for (auto* c : self->Clients) {
  230. c->PostForkParent();
  231. }
  232. self->Lock.Release();
  233. }
  234. void TForkProtector::PostForkChild() {
  235. auto self = Get(false);
  236. if (!self) {
  237. return;
  238. }
  239. for (auto* c : self->Clients) {
  240. c->PostForkChild();
  241. }
  242. self->Lock.Release();
  243. }
  244. std::weak_ptr<TForkProtector> TForkProtector::Instance{};
  245. TMutex TForkProtector::InstanceLock{};
  246. bool TForkProtector::SubscribedToForks{false};
  247. TClientSession::TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters)
  248. : AsyncJoiner()
  249. , Client(client)
  250. , OriginalSessionId(MakeFMaybe(parameters.SessionId))
  251. , SessionId(OriginalSessionId)
  252. , Meta(MakeFMaybe(parameters.Meta))
  253. , Logger(Client->CreateSessionLogger())
  254. , CloseStarted(false)
  255. , ForcedCloseStarted(false)
  256. , Closed(false)
  257. , ForkInProgressLocal(false)
  258. , Started(false)
  259. , ClosePromise()
  260. , ActiveGrpcCall(nullptr)
  261. , WriteQueue()
  262. , TrimmedCount(0)
  263. , NextIndex(0)
  264. , AckSeqNo(Nothing())
  265. , PollerLastEventTimestamp()
  266. , Counters(parameters.Counters ? parameters.Counters : Client->GetCounters()->GetDefaultSessionCounters())
  267. , MakeGrpcCallTimer(nullptr)
  268. , ForceCloseTimer(nullptr)
  269. , PollTimer(nullptr)
  270. , GrpcInflightMessages(0)
  271. , GrpcInflightBytes(0)
  272. , InflightBytes(0)
  273. , CloseRequested(false)
  274. , EventsBatchSize(0)
  275. , PollingStatus(EPollingStatus::Inactive)
  276. , EventNotification(nullptr)
  277. , EventNotificationTriggered(false)
  278. , EventsBatch()
  279. , SecondaryEventsBatch()
  280. , ForkInProgress(false)
  281. , Lock()
  282. , MaxInflightBytes(
  283. parameters.MaxInflightBytes.GetOrElse(Client->GetParameters().MaxInflightBytes))
  284. , AgentMaxReceiveMessage(Nothing()) {
  285. if (Meta.Defined() && !IsUtf8(*Meta)) {
  286. throw std::runtime_error("session meta contains non UTF-8 characters");
  287. }
  288. Y_ENSURE(!(Client->GetParameters().EnableForkSupport && SessionId.Defined()),
  289. "explicit session id is not supported with forks");
  290. Client->RegisterSession(this);
  291. with_lock(Lock) {
  292. DoStart();
  293. }
  294. }
  295. TFuture<void> TClientSession::PreFork() {
  296. YLOG_INFO("pre fork started");
  297. Lock.Acquire();
  298. YLOG_INFO("triggering event notification");
  299. if (!EventNotificationTriggered) {
  300. EventNotificationTriggered = true;
  301. EventNotification->Trigger();
  302. }
  303. YLOG_INFO("setting 'fork in progress' flag");
  304. ForkInProgress.store(true);
  305. if (!Started) {
  306. ClosePromise.TrySetValue();
  307. }
  308. YLOG_INFO("pre fork finished");
  309. return ClosePromise.GetFuture();
  310. }
  311. void TClientSession::PostForkParent() {
  312. YLOG_INFO("post fork parent started");
  313. ForkInProgress.store(false);
  314. ForkInProgressLocal = false;
  315. Started = false;
  316. if (!CloseRequested) {
  317. DoStart();
  318. YLOG_INFO("triggering event notification");
  319. EventNotificationTriggered = true;
  320. EventNotification->Trigger();
  321. }
  322. Lock.Release();
  323. YLOG_INFO("post fork parent finished");
  324. }
  325. void TClientSession::PostForkChild() {
  326. YLOG_INFO("post fork child started");
  327. ForkInProgress.store(false);
  328. ForkInProgressLocal = false;
  329. Started = false;
  330. SessionId.Clear();
  331. TrimmedCount = 0;
  332. NextIndex = 0;
  333. AckSeqNo.Clear();
  334. PurgeWriteQueue();
  335. EventsBatch.clear();
  336. SecondaryEventsBatch.clear();
  337. EventsBatchSize = 0;
  338. Lock.Release();
  339. YLOG_INFO("post fork child finished");
  340. }
  341. void TClientSession::SetAgentMaxReceiveMessage(size_t newValue) {
  342. AgentMaxReceiveMessage = newValue;
  343. }
  344. void TClientSession::DoStart() {
  345. // Lock must be held
  346. Y_ABORT_UNLESS(!Started);
  347. YLOG_DEBUG("starting");
  348. Client->EnsureStarted();
  349. MakeGrpcCallTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
  350. MakeIOCallback([this](EIOStatus status) {
  351. if (status == EIOStatus::Error) {
  352. return;
  353. }
  354. MakeGrpcCall();
  355. }, &AsyncJoiner));
  356. ForceCloseTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
  357. MakeIOCallback([this](EIOStatus status) {
  358. if (status == EIOStatus::Error) {
  359. return;
  360. }
  361. YLOG_INFO("ForceCloseTimer");
  362. BeginClose(TInstant::Zero());
  363. }, &AsyncJoiner));
  364. PollTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
  365. MakeIOCallback([this](EIOStatus status) {
  366. if (status == EIOStatus::Error) {
  367. return;
  368. }
  369. Poll();
  370. }, &AsyncJoiner));
  371. EventNotification = MakeHolder<TGrpcNotification>(Client->GetCompletionQueue(),
  372. MakeIOCallback([this](EIOStatus status) {
  373. Y_ABORT_UNLESS(status == EIOStatus::Ok);
  374. Poll();
  375. }, &AsyncJoiner));
  376. CloseStarted = false;
  377. ForcedCloseStarted = false;
  378. Closed = false;
  379. ClosePromise = NewPromise();
  380. EventNotificationTriggered = false;
  381. PollerLastEventTimestamp = Now();
  382. PollingStatus = EPollingStatus::Inactive;
  383. ++Client->GetCounters()->ActiveSessionsCount;
  384. MakeGrpcCallTimer->Set(Now());
  385. YLOG_DEBUG(Sprintf("started, sessionId [%s]", OriginalSessionId.GetOrElse("").c_str()));
  386. Started = true;
  387. }
  388. void TClientSession::MakeGrpcCall() {
  389. if (Closed) {
  390. YLOG_INFO("MakeGrpcCall, session already closed");
  391. return;
  392. }
  393. Y_ABORT_UNLESS(!ForcedCloseStarted);
  394. Y_ABORT_UNLESS(!ActiveGrpcCall);
  395. ActiveGrpcCall = MakeIntrusive<TGrpcCall>(*this);
  396. ActiveGrpcCall->Start();
  397. ++Counters->GrpcCalls;
  398. if (CloseStarted) {
  399. ActiveGrpcCall->BeginClose(false);
  400. }
  401. }
  402. TClientSession::~TClientSession() {
  403. Close(TInstant::Zero());
  404. AsyncJoiner.Join().Wait();
  405. Client->UnregisterSession(this);
  406. YLOG_DEBUG("destroyed");
  407. }
  408. void TClientSession::Send(TClientMessage&& message) {
  409. const size_t messageSize = SizeOf(message);
  410. ++Counters->ReceivedMessages;
  411. Counters->ReceivedBytes += messageSize;
  412. if (messageSize > Client->GetParameters().GrpcMaxMessageSize) {
  413. YLOG_ERR(Sprintf("message size [%zu] is greater than max grpc message size [%zu], message dropped",
  414. messageSize, Client->GetParameters().GrpcMaxMessageSize));
  415. ++Counters->DroppedMessages;
  416. Counters->DroppedBytes += messageSize;
  417. ++Counters->ErrorsCount;
  418. return;
  419. }
  420. if (message.Meta.Defined() && !IsUtf8(*message.Meta)) {
  421. YLOG_ERR("message meta contains non UTF-8 characters, message dropped");
  422. ++Counters->DroppedMessages;
  423. Counters->DroppedBytes += messageSize;
  424. ++Counters->ErrorsCount;
  425. return;
  426. }
  427. if (!message.Timestamp.Defined()) {
  428. message.Timestamp = TInstant::Now();
  429. }
  430. ++Counters->InflightMessages;
  431. Counters->InflightBytes += messageSize;
  432. {
  433. auto g = Guard(Lock);
  434. if (!Started) {
  435. DoStart();
  436. }
  437. if (CloseRequested) {
  438. g.Release();
  439. YLOG_ERR(Sprintf("session is closing, message dropped, [%zu] bytes", messageSize));
  440. --Counters->InflightMessages;
  441. Counters->InflightBytes -= messageSize;
  442. ++Counters->DroppedMessages;
  443. Counters->DroppedBytes += messageSize;
  444. ++Counters->ErrorsCount;
  445. return;
  446. }
  447. if (InflightBytes.load() + messageSize > MaxInflightBytes) {
  448. g.Release();
  449. YLOG_ERR(Sprintf("max inflight of [%zu] bytes reached, [%zu] bytes dropped",
  450. MaxInflightBytes, messageSize));
  451. --Counters->InflightMessages;
  452. Counters->InflightBytes -= messageSize;
  453. ++Counters->DroppedMessages;
  454. Counters->DroppedBytes += messageSize;
  455. ++Counters->ErrorsCount;
  456. return;
  457. }
  458. InflightBytes.fetch_add(messageSize);
  459. EventsBatch.push_back(TMessageReceivedEvent{std::move(message), messageSize});
  460. EventsBatchSize += messageSize;
  461. if ((PollingStatus == EPollingStatus::Inactive ||
  462. EventsBatchSize >= Client->GetParameters().GrpcMaxMessageSize) &&
  463. !EventNotificationTriggered)
  464. {
  465. EventNotificationTriggered = true;
  466. EventNotification->Trigger();
  467. }
  468. }
  469. }
  470. TFuture<void> TClientSession::CloseAsync(TInstant deadline) {
  471. YLOG_DEBUG(Sprintf("close, deadline [%s]", ToString(deadline).c_str()));
  472. if (!ClosePromise.GetFuture().HasValue()) {
  473. with_lock(Lock) {
  474. if (!Started) {
  475. return MakeFuture();
  476. }
  477. CloseRequested = true;
  478. EventsBatch.push_back(TCloseRequestedEvent{deadline});
  479. if (!EventNotificationTriggered) {
  480. EventNotificationTriggered = true;
  481. EventNotification->Trigger();
  482. }
  483. }
  484. }
  485. return ClosePromise.GetFuture();
  486. }
  487. void TClientSession::BeginClose(TInstant deadline) {
  488. if (Closed) {
  489. return;
  490. }
  491. if (!CloseStarted) {
  492. CloseStarted = true;
  493. YLOG_DEBUG("close started");
  494. }
  495. const auto force = deadline == TInstant::Zero();
  496. if (force && !ForcedCloseStarted) {
  497. ForcedCloseStarted = true;
  498. YLOG_INFO("forced close started");
  499. }
  500. if (!ActiveGrpcCall && (ForcedCloseStarted || WriteQueue.empty())) {
  501. DoClose();
  502. } else {
  503. if (!force) {
  504. ForceCloseTimer->Set(deadline);
  505. }
  506. if (ActiveGrpcCall) {
  507. ActiveGrpcCall->BeginClose(ForcedCloseStarted);
  508. }
  509. }
  510. }
  511. void TClientSession::Poll() {
  512. if (ForkInProgressLocal) {
  513. return;
  514. }
  515. const auto now = Now();
  516. const auto sendDelay = Client->GetParameters().GrpcSendDelay;
  517. const auto oldPollingStatus = PollingStatus;
  518. {
  519. if (!Lock.TryAcquire()) {
  520. TSpinWait sw;
  521. while (Lock.IsLocked() || !Lock.TryAcquire()) {
  522. if (ForkInProgress.load()) {
  523. YLOG_INFO("poller 'fork in progress' signal received, stopping session");
  524. ForkInProgressLocal = true;
  525. if (!ActiveGrpcCall || !ActiveGrpcCall->Initialized()) {
  526. BeginClose(TInstant::Max());
  527. } else if (ActiveGrpcCall->ReuseSessions()) {
  528. ActiveGrpcCall->Poison();
  529. BeginClose(TInstant::Max());
  530. } else {
  531. BeginClose(TInstant::Zero());
  532. }
  533. return;
  534. }
  535. sw.Sleep();
  536. }
  537. }
  538. if (!EventsBatch.empty()) {
  539. DoSwap(EventsBatch, SecondaryEventsBatch);
  540. EventsBatchSize = 0;
  541. PollerLastEventTimestamp = now;
  542. }
  543. const auto needNextPollStep = sendDelay != TDuration::Zero() &&
  544. !CloseRequested &&
  545. (now - PollerLastEventTimestamp) < 10 * sendDelay;
  546. PollingStatus = needNextPollStep ? EPollingStatus::Active : EPollingStatus::Inactive;
  547. EventNotificationTriggered = false;
  548. Lock.Release();
  549. }
  550. if (PollingStatus == EPollingStatus::Active) {
  551. PollTimer->Set(now + sendDelay);
  552. }
  553. if (PollingStatus != oldPollingStatus) {
  554. YLOG_DEBUG(Sprintf("poller %s", PollingStatus == EPollingStatus::Active ? "started" : "stopped"));
  555. }
  556. if (auto& batch = SecondaryEventsBatch; !batch.empty()) {
  557. auto closeIt = FindIf(batch, [](const auto& e) {
  558. return std::holds_alternative<TCloseRequestedEvent>(e);
  559. });
  560. if (auto it = begin(batch); it != closeIt) {
  561. Y_ABORT_UNLESS(!CloseStarted);
  562. do {
  563. auto& e = std::get<TMessageReceivedEvent>(*it++);
  564. WriteQueue.push_back({std::move(e.Message), e.Size, false});
  565. } while (it != closeIt);
  566. if (ActiveGrpcCall) {
  567. ActiveGrpcCall->NotifyMessageAdded();
  568. }
  569. }
  570. for (auto endIt = end(batch); closeIt != endIt; ++closeIt) {
  571. const auto& e = std::get<TCloseRequestedEvent>(*closeIt);
  572. BeginClose(e.Deadline);
  573. }
  574. batch.clear();
  575. }
  576. };
  577. void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) {
  578. auto& initializeMessage = *target.MutableInitialize();
  579. if (SessionId.Defined()) {
  580. initializeMessage.SetSessionId(*SessionId);
  581. }
  582. if (Client->GetParameters().SharedSecretKey.Defined()) {
  583. initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey);
  584. }
  585. if (Meta.Defined()) {
  586. for (const auto& p: *Meta) {
  587. AddMeta(initializeMessage, p.first, p.second);
  588. }
  589. }
  590. if (!Meta.Defined() || Meta->find("_reusable") == Meta->end()) {
  591. AddMeta(initializeMessage, "_reusable", "true");
  592. }
  593. }
  594. TClientSession::TRequestBuilder::TRequestBuilder(NUnifiedAgentProto::Request& target, size_t RequestPayloadLimitBytes,
  595. TFMaybe<size_t> serializedRequestLimitBytes)
  596. : Target(target)
  597. , PwTarget(MakeFMaybe<NPW::TRequest>())
  598. , MetaItems()
  599. , RequestPayloadSize(0)
  600. , RequestPayloadLimitBytes(RequestPayloadLimitBytes)
  601. , SerializedRequestSize(0)
  602. , SerializedRequestLimitBytes(serializedRequestLimitBytes)
  603. , CountersInvalid(false)
  604. {
  605. }
  606. void TClientSession::TRequestBuilder::ResetCounters() {
  607. RequestPayloadSize = 0;
  608. SerializedRequestSize = 0;
  609. PwTarget.Clear();
  610. PwTarget.ConstructInPlace();
  611. CountersInvalid = false;
  612. }
  613. TClientSession::TRequestBuilder::TAddResult TClientSession::TRequestBuilder::TryAddMessage(
  614. const TPendingMessage& message, size_t seqNo) {
  615. Y_ABORT_UNLESS(!CountersInvalid);
  616. {
  617. // add item to pwRequest to increase calculated size
  618. PwTarget->DataBatch.SeqNo.Add(seqNo);
  619. PwTarget->DataBatch.Timestamp.Add(message.Message.Timestamp->MicroSeconds());
  620. PwTarget->DataBatch.Payload.Add().SetValue(message.Message.Payload);
  621. if (message.Message.Meta.Defined()) {
  622. for (const auto &m: *message.Message.Meta) {
  623. TMetaItemBuilder *metaItemBuilder = nullptr;
  624. {
  625. auto it = MetaItems.find(m.first);
  626. if (it == MetaItems.end()) {
  627. PwTarget->DataBatch.Meta.Add().Key.SetValue(m.first);
  628. } else {
  629. metaItemBuilder = &it->second;
  630. }
  631. }
  632. size_t metaItemIdx = (metaItemBuilder != nullptr) ? metaItemBuilder->ItemIndex :
  633. PwTarget->DataBatch.Meta.GetSize() - 1;
  634. auto &pwMetaItem = PwTarget->DataBatch.Meta.Get(metaItemIdx);
  635. pwMetaItem.Value.Add().SetValue(m.second);
  636. const auto index = Target.GetDataBatch().SeqNoSize();
  637. if ((metaItemBuilder != nullptr && metaItemBuilder->ValueIndex != index) ||
  638. (metaItemBuilder == nullptr && index != 0)) {
  639. const auto valueIdx = (metaItemBuilder) ? metaItemBuilder->ValueIndex : 0;
  640. pwMetaItem.SkipStart.Add(valueIdx);
  641. pwMetaItem.SkipLength.Add(index - valueIdx);
  642. }
  643. }
  644. }
  645. }
  646. const auto newSerializedRequestSize = PwTarget->ByteSizeLong();
  647. const auto newPayloadSize = RequestPayloadSize + message.Size;
  648. if ((SerializedRequestLimitBytes.Defined() && newSerializedRequestSize > *SerializedRequestLimitBytes) ||
  649. newPayloadSize > RequestPayloadLimitBytes) {
  650. CountersInvalid = true;
  651. return {true, newPayloadSize, newSerializedRequestSize};
  652. }
  653. {
  654. // add item to the real request
  655. auto& batch = *Target.MutableDataBatch();
  656. batch.AddSeqNo(seqNo);
  657. batch.AddTimestamp(message.Message.Timestamp->MicroSeconds());
  658. batch.AddPayload(message.Message.Payload);
  659. if (message.Message.Meta.Defined()) {
  660. for (const auto &m: *message.Message.Meta) {
  661. TMetaItemBuilder *metaItemBuilder;
  662. {
  663. auto it = MetaItems.find(m.first);
  664. if (it == MetaItems.end()) {
  665. batch.AddMeta()->SetKey(m.first);
  666. auto insertResult = MetaItems.insert({m.first, {batch.MetaSize() - 1}});
  667. Y_ABORT_UNLESS(insertResult.second);
  668. metaItemBuilder = &insertResult.first->second;
  669. } else {
  670. metaItemBuilder = &it->second;
  671. }
  672. }
  673. auto *metaItem = batch.MutableMeta(metaItemBuilder->ItemIndex);
  674. metaItem->AddValue(m.second);
  675. const auto index = batch.SeqNoSize() - 1;
  676. if (metaItemBuilder->ValueIndex != index) {
  677. metaItem->AddSkipStart(metaItemBuilder->ValueIndex);
  678. metaItem->AddSkipLength(index - metaItemBuilder->ValueIndex);
  679. }
  680. metaItemBuilder->ValueIndex = index + 1;
  681. }
  682. }
  683. SerializedRequestSize = newSerializedRequestSize;
  684. RequestPayloadSize = newPayloadSize;
  685. }
  686. return {false, newPayloadSize, newSerializedRequestSize};
  687. }
  688. void TClientSession::PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target) {
  689. Y_ABORT_UNLESS(AckSeqNo.Defined());
  690. TRequestBuilder requestBuilder(target, Client->GetParameters().GrpcMaxMessageSize, AgentMaxReceiveMessage);
  691. const auto startIndex = NextIndex - TrimmedCount;
  692. for (size_t i = startIndex; i < WriteQueue.size(); ++i) {
  693. auto& queueItem = WriteQueue[i];
  694. if (queueItem.Skipped) {
  695. NextIndex++;
  696. continue;
  697. }
  698. const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1);
  699. const size_t serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0;
  700. if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) {
  701. YLOG_ERR(Sprintf("single serialized message is too large [%zu] > [%zu], dropping it",
  702. addResult.NewSerializedRequestSize, serializedLimitToLog));
  703. queueItem.Skipped = true;
  704. ++Counters->DroppedMessages;
  705. Counters->DroppedBytes += queueItem.Size;
  706. ++Counters->ErrorsCount;
  707. NextIndex++;
  708. requestBuilder.ResetCounters();
  709. continue;
  710. }
  711. if (addResult.LimitExceeded) {
  712. YLOG_DEBUG(Sprintf(
  713. "batch limit exceeded: [%zu] > [%zu] (limit for serialized batch)"
  714. "OR [%zu] > [%zu] (limit for raw batch)",
  715. addResult.NewSerializedRequestSize, serializedLimitToLog,
  716. addResult.NewRequestPayloadSize, Client->GetParameters().GrpcMaxMessageSize));
  717. break;
  718. }
  719. NextIndex++;
  720. }
  721. const auto messagesCount = target.GetDataBatch().SeqNoSize();
  722. if (messagesCount == 0) {
  723. return;
  724. }
  725. Y_ABORT_UNLESS(requestBuilder.GetSerializedRequestSize() == target.ByteSizeLong(),
  726. "failed to calculate size for message [%s]", target.ShortDebugString().c_str());
  727. GrpcInflightMessages += messagesCount;
  728. GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
  729. YLOG_DEBUG(Sprintf("new write batch, [%zu] messages, [%zu] bytes, first seq_no [%" PRIu64 "], serialized size [%zu]",
  730. messagesCount, requestBuilder.GetRequestPayloadSize(),
  731. *target.GetDataBatch().GetSeqNo().begin(), requestBuilder.GetSerializedRequestSize()));
  732. ++Counters->GrpcWriteBatchRequests;
  733. Counters->GrpcInflightMessages += messagesCount;
  734. Counters->GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
  735. }
  736. void TClientSession::Acknowledge(ui64 seqNo) {
  737. size_t messagesCount = 0;
  738. size_t bytesCount = 0;
  739. size_t skippedMessagesCount = 0;
  740. size_t skippedBytesCount = 0;
  741. if (AckSeqNo.Defined()) {
  742. while (!WriteQueue.empty() && ((*AckSeqNo < seqNo) || WriteQueue.front().Skipped)) {
  743. if (WriteQueue.front().Skipped) {
  744. skippedMessagesCount++;
  745. skippedBytesCount += WriteQueue.front().Size;
  746. } else {
  747. ++messagesCount;
  748. bytesCount += WriteQueue.front().Size;
  749. }
  750. ++(*AckSeqNo);
  751. WriteQueue.pop_front();
  752. ++TrimmedCount;
  753. }
  754. }
  755. if (!AckSeqNo.Defined() || seqNo > *AckSeqNo) {
  756. AckSeqNo = seqNo;
  757. }
  758. Counters->AcknowledgedMessages += messagesCount;
  759. Counters->AcknowledgedBytes += bytesCount;
  760. Counters->InflightMessages -= (messagesCount + skippedMessagesCount);
  761. Counters->InflightBytes -= (bytesCount + skippedBytesCount);
  762. InflightBytes.fetch_sub(bytesCount);
  763. Counters->GrpcInflightMessages -= messagesCount;
  764. Counters->GrpcInflightBytes -= bytesCount;
  765. GrpcInflightMessages -= messagesCount;
  766. GrpcInflightBytes -= bytesCount;
  767. YLOG_DEBUG(Sprintf("ack [%" PRIu64 "], [%zu] messages, [%zu] bytes", seqNo, messagesCount, bytesCount));
  768. }
  769. void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) {
  770. SessionId = sessionId;
  771. Acknowledge(lastSeqNo);
  772. NextIndex = TrimmedCount;
  773. ++Counters->GrpcCallsInitialized;
  774. Counters->GrpcInflightMessages -= GrpcInflightMessages;
  775. Counters->GrpcInflightBytes -= GrpcInflightBytes;
  776. GrpcInflightMessages = 0;
  777. GrpcInflightBytes = 0;
  778. YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%" PRIu64 "]",
  779. sessionId.c_str(), lastSeqNo));
  780. }
  781. void TClientSession::OnGrpcCallFinished() {
  782. Y_ABORT_UNLESS(!Closed);
  783. Y_ABORT_UNLESS(ActiveGrpcCall);
  784. ActiveGrpcCall = nullptr;
  785. if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) {
  786. DoClose();
  787. } else {
  788. const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay;
  789. MakeGrpcCallTimer->Set(reconnectTime);
  790. YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str()));
  791. }
  792. }
  793. auto TClientSession::PurgeWriteQueue() -> TPurgeWriteQueueStats {
  794. size_t bytesCount = 0;
  795. for (const auto& m: WriteQueue) {
  796. bytesCount += m.Size;
  797. }
  798. auto result = TPurgeWriteQueueStats{WriteQueue.size(), bytesCount};
  799. Counters->DroppedMessages += WriteQueue.size();
  800. Counters->DroppedBytes += bytesCount;
  801. Counters->InflightMessages -= WriteQueue.size();
  802. Counters->InflightBytes -= bytesCount;
  803. Counters->GrpcInflightMessages -= GrpcInflightMessages;
  804. Counters->GrpcInflightBytes -= GrpcInflightBytes;
  805. InflightBytes.fetch_sub(bytesCount);
  806. GrpcInflightMessages = 0;
  807. GrpcInflightBytes = 0;
  808. WriteQueue.clear();
  809. return result;
  810. }
  811. void TClientSession::DoClose() {
  812. Y_ABORT_UNLESS(CloseStarted);
  813. Y_ABORT_UNLESS(!Closed);
  814. Y_ABORT_UNLESS(!ClosePromise.HasValue());
  815. MakeGrpcCallTimer->Cancel();
  816. ForceCloseTimer->Cancel();
  817. PollTimer->Cancel();
  818. if (!ForkInProgressLocal && WriteQueue.size() > 0) {
  819. const auto stats = PurgeWriteQueue();
  820. ++Counters->ErrorsCount;
  821. YLOG_ERR(Sprintf("DoClose, dropped [%zu] messages, [%zu] bytes",
  822. stats.PurgedMessages, stats.PurgedBytes));
  823. }
  824. --Client->GetCounters()->ActiveSessionsCount;
  825. Closed = true;
  826. ClosePromise.SetValue();
  827. YLOG_DEBUG("session closed");
  828. }
  829. TGrpcCall::TGrpcCall(TClientSession& session)
  830. : Session(session)
  831. , AsyncJoinerToken(&Session.GetAsyncJoiner())
  832. , AcceptTag(MakeIOCallback(this, &TGrpcCall::EndAccept))
  833. , ReadTag(MakeIOCallback(this, &TGrpcCall::EndRead))
  834. , WriteTag(MakeIOCallback(this, &TGrpcCall::EndWrite))
  835. , WritesDoneTag(MakeIOCallback(this, &TGrpcCall::EndWritesDone))
  836. , FinishTag(MakeIOCallback(this, &TGrpcCall::EndFinish))
  837. , Logger(session.GetLogger().Child("grpc"))
  838. , AcceptPending(false)
  839. , Initialized_(false)
  840. , ReadPending(false)
  841. , ReadsDone(false)
  842. , WritePending(false)
  843. , WritesBlocked(false)
  844. , WritesDonePending(false)
  845. , WritesDone(false)
  846. , ErrorOccured(false)
  847. , FinishRequested(false)
  848. , FinishStarted(false)
  849. , FinishDone(false)
  850. , Cancelled(false)
  851. , Poisoned(false)
  852. , PoisonPillSent(false)
  853. , ReuseSessions_(false)
  854. , FinishStatus()
  855. , ClientContext()
  856. , ReaderWriter(nullptr)
  857. , Request()
  858. , Response()
  859. {
  860. }
  861. void TGrpcCall::Start() {
  862. AcceptPending = true;
  863. auto& client = Session.GetClient();
  864. ReaderWriter = client.GetStub().AsyncSession(&ClientContext,
  865. &client.GetCompletionQueue(),
  866. AcceptTag->Ref());
  867. YLOG_DEBUG("AsyncSession started");
  868. }
  869. TGrpcCall::~TGrpcCall() {
  870. YLOG_DEBUG("destroyed");
  871. }
  872. void TGrpcCall::EnsureFinishStarted() {
  873. if (!FinishStarted) {
  874. FinishStarted = true;
  875. ReaderWriter->Finish(&FinishStatus, FinishTag->Ref());
  876. YLOG_DEBUG("Finish started");
  877. }
  878. }
  879. bool TGrpcCall::CheckHasError(EIOStatus status, const char* method) {
  880. if (status == EIOStatus::Error) {
  881. SetError(Sprintf("%s %s", method, ToString(status).c_str()));
  882. return true;
  883. }
  884. if (ErrorOccured) {
  885. ScheduleFinishOnError();
  886. return true;
  887. }
  888. return false;
  889. }
  890. void TGrpcCall::SetError(const TString& error) {
  891. if (!Cancelled) {
  892. YLOG_ERR(error);
  893. ++Session.GetCounters().ErrorsCount;
  894. }
  895. ErrorOccured = true;
  896. ScheduleFinishOnError();
  897. }
  898. void TGrpcCall::ScheduleFinishOnError() {
  899. if (!AcceptPending && !WritePending && !WritesDonePending) {
  900. EnsureFinishStarted();
  901. }
  902. }
  903. void TGrpcCall::BeginClose(bool force) {
  904. if (force) {
  905. if (!Cancelled) {
  906. Cancelled = true;
  907. ClientContext.TryCancel();
  908. SetError("forced close");
  909. }
  910. return;
  911. }
  912. YLOG_DEBUG(Sprintf("Close Initialized [%d], AcceptPending [%d], "
  913. "WritePending [%d], FinishRequested [%d], "
  914. "ErrorOccured [%d]",
  915. static_cast<int>(Initialized_),
  916. static_cast<int>(AcceptPending),
  917. static_cast<int>(WritePending),
  918. static_cast<int>(FinishRequested),
  919. static_cast<int>(ErrorOccured)));
  920. if (ErrorOccured || FinishRequested) {
  921. return;
  922. }
  923. FinishRequested = true;
  924. if (!Initialized_ || WritePending) {
  925. return;
  926. }
  927. WritesBlocked = true;
  928. BeginWritesDone();
  929. }
  930. void TGrpcCall::Poison() {
  931. Poisoned = true;
  932. NotifyMessageAdded();
  933. }
  934. void TGrpcCall::NotifyMessageAdded() {
  935. if (WritePending || !Initialized_ || ErrorOccured || FinishRequested) {
  936. return;
  937. }
  938. ScheduleWrite();
  939. }
  940. void TGrpcCall::ScheduleWrite() {
  941. Request.Clear();
  942. if (!Poisoned) {
  943. Session.PrepareWriteBatchRequest(Request);
  944. } else if (!PoisonPillSent) {
  945. PoisonPillSent = true;
  946. auto& batch = *Request.mutable_data_batch();
  947. batch.AddSeqNo(std::numeric_limits<::google::protobuf::uint64>::max());
  948. batch.AddTimestamp(Now().MicroSeconds());
  949. batch.AddPayload("");
  950. YLOG_INFO("poison pill sent");
  951. }
  952. if (Request.GetDataBatch().GetSeqNo().empty()) {
  953. if (FinishRequested) {
  954. WritesBlocked = true;
  955. BeginWritesDone();
  956. }
  957. return;
  958. }
  959. BeginWrite();
  960. }
  961. void TGrpcCall::EndAccept(EIOStatus status) {
  962. Y_ABORT_UNLESS(AcceptPending);
  963. AcceptPending = false;
  964. if (CheckHasError(status, "EndAccept")) {
  965. return;
  966. }
  967. BeginRead();
  968. Request.Clear();
  969. Session.PrepareInitializeRequest(Request);
  970. BeginWrite();
  971. }
  972. void TGrpcCall::EndRead(EIOStatus status) {
  973. ReadPending = false;
  974. if (FinishDone) {
  975. Session.OnGrpcCallFinished();
  976. return;
  977. }
  978. if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) {
  979. Y_ABORT_UNLESS(!WritePending);
  980. YLOG_DEBUG("EndRead ReadsDone");
  981. ReadsDone = true;
  982. if (WritesDone) {
  983. EnsureFinishStarted();
  984. return;
  985. }
  986. return;
  987. }
  988. if (CheckHasError(status, "EndRead")) {
  989. return;
  990. }
  991. if (!Initialized_) {
  992. const auto metadata = ClientContext.GetServerInitialMetadata();
  993. {
  994. const auto it = metadata.find("ua-reuse-sessions");
  995. if (it != metadata.end() && it->second == "true") {
  996. ReuseSessions_ = true;
  997. }
  998. }
  999. {
  1000. const auto it = metadata.find("ua-max-receive-message-size");
  1001. if (it != metadata.end()) {
  1002. Session.SetAgentMaxReceiveMessage(FromString<size_t>(TString{it->second.begin(), it->second.end()}));
  1003. }
  1004. }
  1005. if (Response.response_case() != NUnifiedAgentProto::Response::kInitialized) {
  1006. SetError(Sprintf("EndRead while initializing, unexpected response_case [%d]",
  1007. static_cast<int>(Response.response_case())));
  1008. return;
  1009. }
  1010. Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(),
  1011. Response.GetInitialized().GetLastSeqNo());
  1012. Initialized_ = true;
  1013. if (!WritePending) {
  1014. ScheduleWrite();
  1015. }
  1016. } else {
  1017. if (Response.response_case() != NUnifiedAgentProto::Response::kAck) {
  1018. SetError(Sprintf("EndRead unexpected response_case [%d]",
  1019. static_cast<int>(Response.response_case())));
  1020. return;
  1021. }
  1022. Session.Acknowledge(Response.GetAck().GetSeqNo());
  1023. }
  1024. BeginRead();
  1025. }
  1026. void TGrpcCall::EndWrite(EIOStatus status) {
  1027. WritePending = false;
  1028. if (CheckHasError(status, "EndWrite")) {
  1029. return;
  1030. }
  1031. if (!Initialized_) {
  1032. return;
  1033. }
  1034. ScheduleWrite();
  1035. }
  1036. void TGrpcCall::EndFinish(EIOStatus status) {
  1037. FinishDone = true;
  1038. const auto finishStatus = status == EIOStatus::Error
  1039. ? grpc::Status(grpc::UNKNOWN, "finish error")
  1040. : FinishStatus;
  1041. YLOG(finishStatus.ok() || Cancelled || Poisoned ? TLOG_DEBUG : TLOG_ERR,
  1042. Sprintf("EndFinish, code [%s], message [%s]",
  1043. ToString(finishStatus.error_code()).c_str(),
  1044. finishStatus.error_message().c_str()),
  1045. Logger);
  1046. if (!finishStatus.ok() && !Cancelled) {
  1047. ++Session.GetCounters().ErrorsCount;
  1048. }
  1049. if (!ReadPending) {
  1050. Session.OnGrpcCallFinished();
  1051. }
  1052. }
  1053. void TGrpcCall::EndWritesDone(EIOStatus status) {
  1054. YLOG_DEBUG(Sprintf("EndWritesDone [%s]", ToString(status).c_str()));
  1055. Y_ABORT_UNLESS(!WritePending && !WritesDone && WritesDonePending);
  1056. WritesDonePending = false;
  1057. WritesDone = true;
  1058. if (CheckHasError(status, "EndWriteDone")) {
  1059. return;
  1060. }
  1061. if (ReadsDone) {
  1062. EnsureFinishStarted();
  1063. }
  1064. }
  1065. void TGrpcCall::BeginWritesDone() {
  1066. WritesDonePending = true;
  1067. ReaderWriter->WritesDone(WritesDoneTag->Ref());
  1068. YLOG_DEBUG("WritesDone started");
  1069. }
  1070. void TGrpcCall::BeginRead() {
  1071. ReadPending = true;
  1072. Response.Clear();
  1073. ReaderWriter->Read(&Response, ReadTag->Ref());
  1074. YLOG_DEBUG("Read started");
  1075. }
  1076. void TGrpcCall::BeginWrite() {
  1077. WritePending = true;
  1078. ReaderWriter->Write(Request, WriteTag->Ref());
  1079. YLOG_DEBUG("Write started");
  1080. }
  1081. }
  1082. namespace NUnifiedAgent {
  1083. size_t SizeOf(const TClientMessage& message) {
  1084. auto result = message.Payload.Size() + sizeof(TInstant);
  1085. if (message.Meta.Defined()) {
  1086. for (const auto& m: *message.Meta) {
  1087. result += m.first.Size() + m.second.Size();
  1088. }
  1089. }
  1090. return result;
  1091. }
  1092. TClientParameters::TClientParameters(const TString& uri)
  1093. : Uri(uri)
  1094. , SharedSecretKey(Nothing())
  1095. , MaxInflightBytes(DefaultMaxInflightBytes)
  1096. , Log(TLoggerOperator<TGlobalLog>::Log())
  1097. , LogRateLimitBytes(Nothing())
  1098. , GrpcReconnectDelay(TDuration::MilliSeconds(50))
  1099. , GrpcSendDelay(DefaultGrpcSendDelay)
  1100. , EnableForkSupport(false)
  1101. , GrpcMaxMessageSize(DefaultGrpcMaxMessageSize)
  1102. , Counters(nullptr)
  1103. {
  1104. }
  1105. TSessionParameters::TSessionParameters()
  1106. : SessionId(Nothing())
  1107. , Meta(Nothing())
  1108. , Counters(nullptr)
  1109. , MaxInflightBytes()
  1110. {
  1111. }
  1112. const size_t TClientParameters::DefaultMaxInflightBytes = 10_MB;
  1113. const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB;
  1114. const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10);
  1115. TClientPtr MakeClient(const TClientParameters& parameters) {
  1116. // Initialization of the Fork in newest grcp core is performed
  1117. // in 'do_basic_init', which is called inside 'grpc_is_initialized'.
  1118. // So the set of the fork env variable has to be done before
  1119. // grpc_is_initialized call.
  1120. #ifdef _unix_
  1121. if (parameters.EnableForkSupport) {
  1122. SetEnv("GRPC_ENABLE_FORK_SUPPORT", "true");
  1123. }
  1124. #endif
  1125. if (!grpc_is_initialized()) {
  1126. EnsureGrpcConfigured();
  1127. }
  1128. std::shared_ptr<NPrivate::TForkProtector> forkProtector{};
  1129. #ifdef _unix_
  1130. if (parameters.EnableForkSupport) {
  1131. forkProtector = NPrivate::TForkProtector::Get(true);
  1132. }
  1133. #endif
  1134. return MakeIntrusive<NPrivate::TClient>(parameters, forkProtector);
  1135. }
  1136. }