client_impl.cpp 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274
  1. #include "client_impl.h"
  2. #include "helpers.h"
  3. #include <contrib/libs/grpc/include/grpc/grpc.h>
  4. #include <contrib/libs/grpc/src/core/lib/gpr/string.h>
  5. #include <contrib/libs/grpc/src/core/lib/gprpp/fork.h>
  6. #include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
  7. #include <util/charset/utf8.h>
  8. #include <util/generic/size_literals.h>
  9. #include <util/system/env.h>
  10. using namespace NThreading;
  11. using namespace NMonitoring;
  12. namespace NUnifiedAgent::NPrivate {
  13. std::shared_ptr<grpc::Channel> CreateChannel(const grpc::string& target) {
  14. grpc::ChannelArguments args;
  15. args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
  16. args.SetMaxReceiveMessageSize(Max<int>());
  17. args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
  18. args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);
  19. args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
  20. args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 200);
  21. args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
  22. args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
  23. args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
  24. args.SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
  25. args.SetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE, 1024*1024);
  26. return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(), args);
  27. }
  28. void AddMeta(NUnifiedAgentProto::Request_Initialize& init, const TString& name, const TString& value) {
  29. auto* metaItem = init.MutableMeta()->Add();
  30. metaItem->SetName(name);
  31. metaItem->SetValue(value);
  32. }
  33. std::atomic<ui64> TClient::Id{0};
  34. TClient::TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector)
  35. : Parameters(parameters)
  36. , ForkProtector(forkProtector)
  37. , Counters(parameters.Counters ? parameters.Counters : MakeIntrusive<TClientCounters>())
  38. , Log(parameters.Log)
  39. , MainLogger(Log, MakeFMaybe(Parameters.LogRateLimitBytes))
  40. , Logger(MainLogger.Child(Sprintf("ua_%lu", Id.fetch_add(1))))
  41. , Channel(nullptr)
  42. , Stub(nullptr)
  43. , ActiveCompletionQueue(nullptr)
  44. , SessionLogLabel(0)
  45. , ActiveSessions()
  46. , Started(false)
  47. , Destroyed(false)
  48. , Lock()
  49. {
  50. MainLogger.SetDroppedBytesCounter(&Counters->ClientLogDroppedBytes);
  51. if (ForkProtector != nullptr) {
  52. ForkProtector->Register(*this);
  53. }
  54. EnsureStarted();
  55. YLOG_INFO(Sprintf("created, uri [%s]", Parameters.Uri.c_str()));
  56. }
  57. TClient::~TClient() {
  58. with_lock(Lock) {
  59. Y_VERIFY(ActiveSessions.empty(), "active sessions found");
  60. EnsureStoppedNoLock();
  61. Destroyed = true;
  62. }
  63. if (ForkProtector != nullptr) {
  64. ForkProtector->Unregister(*this);
  65. }
  66. YLOG_INFO(Sprintf("destroyed, uri [%s]", Parameters.Uri.c_str()));
  67. }
  68. TClientSessionPtr TClient::CreateSession(const TSessionParameters& parameters) {
  69. return MakeIntrusive<TClientSession>(this, parameters);
  70. }
  71. void TClient::StartTracing(ELogPriority logPriority) {
  72. MainLogger.StartTracing(logPriority);
  73. StartGrpcTracing();
  74. YLOG_INFO("tracing started");
  75. }
  76. void TClient::FinishTracing() {
  77. FinishGrpcTracing();
  78. MainLogger.FinishTracing();
  79. YLOG_INFO("tracing finished");
  80. }
  81. void TClient::RegisterSession(TClientSession* session) {
  82. with_lock(Lock) {
  83. ActiveSessions.push_back(session);
  84. }
  85. }
  86. void TClient::UnregisterSession(TClientSession* session) {
  87. with_lock(Lock) {
  88. const auto it = Find(ActiveSessions, session);
  89. Y_VERIFY(it != ActiveSessions.end());
  90. ActiveSessions.erase(it);
  91. }
  92. }
  93. void TClient::PreFork() {
  94. YLOG_INFO("pre fork started");
  95. Lock.Acquire();
  96. auto futures = TVector<TFuture<void>>(Reserve(ActiveSessions.size()));
  97. for (auto* s: ActiveSessions) {
  98. futures.push_back(s->PreFork());
  99. }
  100. YLOG_INFO("waiting for sessions");
  101. WaitAll(futures).Wait();
  102. EnsureStoppedNoLock();
  103. YLOG_INFO("shutdown grpc executor");
  104. grpc_core::Executor::SetThreadingAll(false);
  105. YLOG_INFO("pre fork finished");
  106. }
  107. void TClient::PostForkParent() {
  108. YLOG_INFO("post fork parent started");
  109. if (!Destroyed) {
  110. EnsureStartedNoLock();
  111. }
  112. Lock.Release();
  113. for (auto* s: ActiveSessions) {
  114. s->PostForkParent();
  115. }
  116. YLOG_INFO("post fork parent finished");
  117. }
  118. void TClient::PostForkChild() {
  119. YLOG_INFO("post fork child started");
  120. Lock.Release();
  121. for (auto* s: ActiveSessions) {
  122. s->PostForkChild();
  123. }
  124. YLOG_INFO("post fork child finished");
  125. }
  126. void TClient::EnsureStarted() {
  127. with_lock(Lock) {
  128. EnsureStartedNoLock();
  129. }
  130. }
  131. void TClient::EnsureStartedNoLock() {
  132. // Lock must be held
  133. if (Started) {
  134. return;
  135. }
  136. Channel = CreateChannel(Parameters.Uri);
  137. Stub = NUnifiedAgentProto::UnifiedAgentService::NewStub(Channel);
  138. ActiveCompletionQueue = MakeHolder<TGrpcCompletionQueueHost>();
  139. ActiveCompletionQueue->Start();
  140. Started = true;
  141. }
  142. void TClient::EnsureStoppedNoLock() {
  143. // Lock must be held
  144. if (!Started) {
  145. return;
  146. }
  147. YLOG_INFO("stopping");
  148. ActiveCompletionQueue->Stop();
  149. ActiveCompletionQueue = nullptr;
  150. Stub = nullptr;
  151. Channel = nullptr;
  152. YLOG_INFO("stopped");
  153. Started = false;
  154. }
  155. TScopeLogger TClient::CreateSessionLogger() {
  156. return Logger.Child(ToString(SessionLogLabel.fetch_add(1)));
  157. }
  158. TForkProtector::TForkProtector()
  159. : Clients()
  160. , GrpcInitializer()
  161. , Enabled(grpc_core::Fork::Enabled())
  162. , Lock()
  163. {
  164. }
  165. void TForkProtector::Register(TClient& client) {
  166. if (!Enabled) {
  167. return;
  168. }
  169. Y_VERIFY(grpc_is_initialized());
  170. Y_VERIFY(grpc_core::Fork::Enabled());
  171. with_lock(Lock) {
  172. Clients.push_back(&client);
  173. }
  174. }
  175. void TForkProtector::Unregister(TClient& client) {
  176. if (!Enabled) {
  177. return;
  178. }
  179. with_lock(Lock) {
  180. const auto it = Find(Clients, &client);
  181. Y_VERIFY(it != Clients.end());
  182. Clients.erase(it);
  183. }
  184. }
  185. std::shared_ptr<TForkProtector> TForkProtector::Get(bool createIfNotExists) {
  186. with_lock(InstanceLock) {
  187. auto result = Instance.lock();
  188. if (!result && createIfNotExists) {
  189. SetEnv("GRPC_ENABLE_FORK_SUPPORT", "true");
  190. result = std::make_shared<TForkProtector>();
  191. if (!result->Enabled) {
  192. TLog log("cerr");
  193. TLogger logger(log, Nothing());
  194. auto scopeLogger = logger.Child("ua client");
  195. YLOG(TLOG_WARNING,
  196. "Grpc is already initialized, can't enable fork support. "
  197. "If forks are possible, please set environment variable GRPC_ENABLE_FORK_SUPPORT to 'true'. "
  198. "If not, you can suppress this warning by setting EnableForkSupport "
  199. "to false when creating the ua client.",
  200. scopeLogger);
  201. } else if (!SubscribedToForks) {
  202. SubscribedToForks = true;
  203. #ifdef _unix_
  204. pthread_atfork(
  205. &TForkProtector::PreFork,
  206. &TForkProtector::PostForkParent,
  207. &TForkProtector::PostForkChild);
  208. #endif
  209. }
  210. Instance = result;
  211. }
  212. return result;
  213. }
  214. }
  215. void TForkProtector::PreFork() {
  216. auto self = Get(false);
  217. if (!self) {
  218. return;
  219. }
  220. self->Lock.Acquire();
  221. for (auto* c : self->Clients) {
  222. c->PreFork();
  223. }
  224. }
  225. void TForkProtector::PostForkParent() {
  226. auto self = Get(false);
  227. if (!self) {
  228. return;
  229. }
  230. for (auto* c : self->Clients) {
  231. c->PostForkParent();
  232. }
  233. self->Lock.Release();
  234. }
  235. void TForkProtector::PostForkChild() {
  236. auto self = Get(false);
  237. if (!self) {
  238. return;
  239. }
  240. for (auto* c : self->Clients) {
  241. c->PostForkChild();
  242. }
  243. self->Lock.Release();
  244. }
  245. std::weak_ptr<TForkProtector> TForkProtector::Instance{};
  246. TMutex TForkProtector::InstanceLock{};
  247. bool TForkProtector::SubscribedToForks{false};
  248. TClientSession::TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters)
  249. : AsyncJoiner()
  250. , Client(client)
  251. , OriginalSessionId(MakeFMaybe(parameters.SessionId))
  252. , SessionId(OriginalSessionId)
  253. , Meta(MakeFMaybe(parameters.Meta))
  254. , Logger(Client->CreateSessionLogger())
  255. , CloseStarted(false)
  256. , ForcedCloseStarted(false)
  257. , Closed(false)
  258. , ForkInProgressLocal(false)
  259. , Started(false)
  260. , ClosePromise()
  261. , ActiveGrpcCall(nullptr)
  262. , WriteQueue()
  263. , TrimmedCount(0)
  264. , NextIndex(0)
  265. , AckSeqNo(Nothing())
  266. , PollerLastEventTimestamp()
  267. , Counters(parameters.Counters ? parameters.Counters : Client->GetCounters()->GetDefaultSessionCounters())
  268. , MakeGrpcCallTimer(nullptr)
  269. , ForceCloseTimer(nullptr)
  270. , PollTimer(nullptr)
  271. , GrpcInflightMessages(0)
  272. , GrpcInflightBytes(0)
  273. , InflightBytes(0)
  274. , CloseRequested(false)
  275. , EventsBatchSize(0)
  276. , PollingStatus(EPollingStatus::Inactive)
  277. , EventNotification(nullptr)
  278. , EventNotificationTriggered(false)
  279. , EventsBatch()
  280. , SecondaryEventsBatch()
  281. , ForkInProgress(false)
  282. , Lock()
  283. , MaxInflightBytes(
  284. parameters.MaxInflightBytes.GetOrElse(Client->GetParameters().MaxInflightBytes))
  285. , AgentMaxReceiveMessage(Nothing()) {
  286. if (Meta.Defined() && !IsUtf8(*Meta)) {
  287. throw std::runtime_error("session meta contains non UTF-8 characters");
  288. }
  289. Y_ENSURE(!(Client->GetParameters().EnableForkSupport && SessionId.Defined()),
  290. "explicit session id is not supported with forks");
  291. Client->RegisterSession(this);
  292. with_lock(Lock) {
  293. DoStart();
  294. }
  295. }
  296. TFuture<void> TClientSession::PreFork() {
  297. YLOG_INFO("pre fork started");
  298. Lock.Acquire();
  299. YLOG_INFO("triggering event notification");
  300. if (!EventNotificationTriggered) {
  301. EventNotificationTriggered = true;
  302. EventNotification->Trigger();
  303. }
  304. YLOG_INFO("setting 'fork in progress' flag");
  305. ForkInProgress.store(true);
  306. if (!Started) {
  307. ClosePromise.TrySetValue();
  308. }
  309. YLOG_INFO("pre fork finished");
  310. return ClosePromise.GetFuture();
  311. }
  312. void TClientSession::PostForkParent() {
  313. YLOG_INFO("post fork parent started");
  314. ForkInProgress.store(false);
  315. ForkInProgressLocal = false;
  316. Started = false;
  317. if (!CloseRequested) {
  318. DoStart();
  319. YLOG_INFO("triggering event notification");
  320. EventNotificationTriggered = true;
  321. EventNotification->Trigger();
  322. }
  323. Lock.Release();
  324. YLOG_INFO("post fork parent finished");
  325. }
  326. void TClientSession::PostForkChild() {
  327. YLOG_INFO("post fork child started");
  328. ForkInProgress.store(false);
  329. ForkInProgressLocal = false;
  330. Started = false;
  331. SessionId.Clear();
  332. TrimmedCount = 0;
  333. NextIndex = 0;
  334. AckSeqNo.Clear();
  335. PurgeWriteQueue();
  336. EventsBatch.clear();
  337. SecondaryEventsBatch.clear();
  338. EventsBatchSize = 0;
  339. Lock.Release();
  340. YLOG_INFO("post fork child finished");
  341. }
  342. void TClientSession::SetAgentMaxReceiveMessage(size_t newValue) {
  343. AgentMaxReceiveMessage = newValue;
  344. }
  345. void TClientSession::DoStart() {
  346. // Lock must be held
  347. Y_VERIFY(!Started);
  348. YLOG_INFO("starting");
  349. Client->EnsureStarted();
  350. MakeGrpcCallTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
  351. MakeIOCallback([this](EIOStatus status) {
  352. if (status == EIOStatus::Error) {
  353. return;
  354. }
  355. MakeGrpcCall();
  356. }, &AsyncJoiner));
  357. ForceCloseTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
  358. MakeIOCallback([this](EIOStatus status) {
  359. if (status == EIOStatus::Error) {
  360. return;
  361. }
  362. YLOG_INFO("ForceCloseTimer");
  363. BeginClose(TInstant::Zero());
  364. }, &AsyncJoiner));
  365. PollTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
  366. MakeIOCallback([this](EIOStatus status) {
  367. if (status == EIOStatus::Error) {
  368. return;
  369. }
  370. Poll();
  371. }, &AsyncJoiner));
  372. EventNotification = MakeHolder<TGrpcNotification>(Client->GetCompletionQueue(),
  373. MakeIOCallback([this](EIOStatus status) {
  374. Y_VERIFY(status == EIOStatus::Ok);
  375. Poll();
  376. }, &AsyncJoiner));
  377. CloseStarted = false;
  378. ForcedCloseStarted = false;
  379. Closed = false;
  380. ClosePromise = NewPromise();
  381. EventNotificationTriggered = false;
  382. PollerLastEventTimestamp = Now();
  383. PollingStatus = EPollingStatus::Inactive;
  384. ++Client->GetCounters()->ActiveSessionsCount;
  385. MakeGrpcCallTimer->Set(Now());
  386. YLOG_INFO(Sprintf("started, sessionId [%s]", OriginalSessionId.GetOrElse("").c_str()));
  387. Started = true;
  388. }
  389. void TClientSession::MakeGrpcCall() {
  390. if (Closed) {
  391. YLOG_INFO("MakeGrpcCall, session already closed");
  392. return;
  393. }
  394. Y_VERIFY(!ForcedCloseStarted);
  395. Y_VERIFY(!ActiveGrpcCall);
  396. ActiveGrpcCall = MakeIntrusive<TGrpcCall>(*this);
  397. ActiveGrpcCall->Start();
  398. ++Counters->GrpcCalls;
  399. if (CloseStarted) {
  400. ActiveGrpcCall->BeginClose(false);
  401. }
  402. }
  403. TClientSession::~TClientSession() {
  404. Close(TInstant::Zero());
  405. AsyncJoiner.Join().Wait();
  406. Client->UnregisterSession(this);
  407. YLOG_INFO("destroyed");
  408. }
  409. void TClientSession::Send(TClientMessage&& message) {
  410. const auto messageSize = SizeOf(message);
  411. ++Counters->ReceivedMessages;
  412. Counters->ReceivedBytes += messageSize;
  413. if (messageSize > Client->GetParameters().GrpcMaxMessageSize) {
  414. YLOG_ERR(Sprintf("message size [%lu] is greater than max grpc message size [%lu], message dropped",
  415. messageSize, Client->GetParameters().GrpcMaxMessageSize));
  416. ++Counters->DroppedMessages;
  417. Counters->DroppedBytes += messageSize;
  418. ++Counters->ErrorsCount;
  419. return;
  420. }
  421. if (message.Meta.Defined() && !IsUtf8(*message.Meta)) {
  422. YLOG_ERR("message meta contains non UTF-8 characters, message dropped");
  423. ++Counters->DroppedMessages;
  424. Counters->DroppedBytes += messageSize;
  425. ++Counters->ErrorsCount;
  426. return;
  427. }
  428. if (!message.Timestamp.Defined()) {
  429. message.Timestamp = TInstant::Now();
  430. }
  431. ++Counters->InflightMessages;
  432. Counters->InflightBytes += messageSize;
  433. {
  434. auto g = Guard(Lock);
  435. if (!Started) {
  436. DoStart();
  437. }
  438. if (CloseRequested) {
  439. g.Release();
  440. YLOG_ERR(Sprintf("session is closing, message dropped, [%lu] bytes", messageSize));
  441. --Counters->InflightMessages;
  442. Counters->InflightBytes -= messageSize;
  443. ++Counters->DroppedMessages;
  444. Counters->DroppedBytes += messageSize;
  445. ++Counters->ErrorsCount;
  446. return;
  447. }
  448. if (InflightBytes.load() + messageSize > MaxInflightBytes) {
  449. g.Release();
  450. YLOG_ERR(Sprintf("max inflight of [%lu] bytes reached, [%lu] bytes dropped",
  451. MaxInflightBytes, messageSize));
  452. --Counters->InflightMessages;
  453. Counters->InflightBytes -= messageSize;
  454. ++Counters->DroppedMessages;
  455. Counters->DroppedBytes += messageSize;
  456. ++Counters->ErrorsCount;
  457. return;
  458. }
  459. InflightBytes.fetch_add(messageSize);
  460. EventsBatch.push_back(TMessageReceivedEvent{std::move(message), messageSize});
  461. EventsBatchSize += messageSize;
  462. if ((PollingStatus == EPollingStatus::Inactive ||
  463. EventsBatchSize >= Client->GetParameters().GrpcMaxMessageSize) &&
  464. !EventNotificationTriggered)
  465. {
  466. EventNotificationTriggered = true;
  467. EventNotification->Trigger();
  468. }
  469. }
  470. }
  471. TFuture<void> TClientSession::CloseAsync(TInstant deadline) {
  472. YLOG_INFO(Sprintf("close, deadline [%s]", ToString(deadline).c_str()));
  473. if (!ClosePromise.GetFuture().HasValue()) {
  474. with_lock(Lock) {
  475. if (!Started) {
  476. return MakeFuture();
  477. }
  478. CloseRequested = true;
  479. EventsBatch.push_back(TCloseRequestedEvent{deadline});
  480. if (!EventNotificationTriggered) {
  481. EventNotificationTriggered = true;
  482. EventNotification->Trigger();
  483. }
  484. }
  485. }
  486. return ClosePromise.GetFuture();
  487. }
  488. void TClientSession::BeginClose(TInstant deadline) {
  489. if (Closed) {
  490. return;
  491. }
  492. if (!CloseStarted) {
  493. CloseStarted = true;
  494. YLOG_INFO("close started");
  495. }
  496. const auto force = deadline == TInstant::Zero();
  497. if (force && !ForcedCloseStarted) {
  498. ForcedCloseStarted = true;
  499. YLOG_INFO("forced close started");
  500. }
  501. if (!ActiveGrpcCall && (ForcedCloseStarted || WriteQueue.empty())) {
  502. DoClose();
  503. } else {
  504. if (!force) {
  505. ForceCloseTimer->Set(deadline);
  506. }
  507. if (ActiveGrpcCall) {
  508. ActiveGrpcCall->BeginClose(ForcedCloseStarted);
  509. }
  510. }
  511. }
  512. void TClientSession::Poll() {
  513. if (ForkInProgressLocal) {
  514. return;
  515. }
  516. const auto now = Now();
  517. const auto sendDelay = Client->GetParameters().GrpcSendDelay;
  518. const auto oldPollingStatus = PollingStatus;
  519. {
  520. if (!Lock.TryAcquire()) {
  521. TSpinWait sw;
  522. while (Lock.IsLocked() || !Lock.TryAcquire()) {
  523. if (ForkInProgress.load()) {
  524. YLOG_INFO("poller 'fork in progress' signal received, stopping session");
  525. ForkInProgressLocal = true;
  526. if (!ActiveGrpcCall || !ActiveGrpcCall->Initialized()) {
  527. BeginClose(TInstant::Max());
  528. } else if (ActiveGrpcCall->ReuseSessions()) {
  529. ActiveGrpcCall->Poison();
  530. BeginClose(TInstant::Max());
  531. } else {
  532. BeginClose(TInstant::Zero());
  533. }
  534. return;
  535. }
  536. sw.Sleep();
  537. }
  538. }
  539. if (!EventsBatch.empty()) {
  540. DoSwap(EventsBatch, SecondaryEventsBatch);
  541. EventsBatchSize = 0;
  542. PollerLastEventTimestamp = now;
  543. }
  544. const auto needNextPollStep = sendDelay != TDuration::Zero() &&
  545. !CloseRequested &&
  546. (now - PollerLastEventTimestamp) < 10 * sendDelay;
  547. PollingStatus = needNextPollStep ? EPollingStatus::Active : EPollingStatus::Inactive;
  548. EventNotificationTriggered = false;
  549. Lock.Release();
  550. }
  551. if (PollingStatus == EPollingStatus::Active) {
  552. PollTimer->Set(now + sendDelay);
  553. }
  554. if (PollingStatus != oldPollingStatus) {
  555. YLOG_DEBUG(Sprintf("poller %s", PollingStatus == EPollingStatus::Active ? "started" : "stopped"));
  556. }
  557. if (auto& batch = SecondaryEventsBatch; !batch.empty()) {
  558. auto closeIt = FindIf(batch, [](const auto& e) {
  559. return std::holds_alternative<TCloseRequestedEvent>(e);
  560. });
  561. if (auto it = begin(batch); it != closeIt) {
  562. Y_VERIFY(!CloseStarted);
  563. do {
  564. auto& e = std::get<TMessageReceivedEvent>(*it++);
  565. WriteQueue.push_back({std::move(e.Message), e.Size, false});
  566. } while (it != closeIt);
  567. if (ActiveGrpcCall) {
  568. ActiveGrpcCall->NotifyMessageAdded();
  569. }
  570. }
  571. for (auto endIt = end(batch); closeIt != endIt; ++closeIt) {
  572. const auto& e = std::get<TCloseRequestedEvent>(*closeIt);
  573. BeginClose(e.Deadline);
  574. }
  575. batch.clear();
  576. }
  577. };
  578. void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) {
  579. auto& initializeMessage = *target.MutableInitialize();
  580. if (SessionId.Defined()) {
  581. initializeMessage.SetSessionId(*SessionId);
  582. }
  583. if (Client->GetParameters().SharedSecretKey.Defined()) {
  584. initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey);
  585. }
  586. if (Meta.Defined()) {
  587. for (const auto& p: *Meta) {
  588. AddMeta(initializeMessage, p.first, p.second);
  589. }
  590. }
  591. if (!Meta.Defined() || Meta->find("_reusable") == Meta->end()) {
  592. AddMeta(initializeMessage, "_reusable", "true");
  593. }
  594. }
  595. TClientSession::TRequestBuilder::TRequestBuilder(NUnifiedAgentProto::Request& target, size_t RequestPayloadLimitBytes,
  596. TFMaybe<size_t> serializedRequestLimitBytes)
  597. : Target(target)
  598. , PwTarget(MakeFMaybe<NPW::TRequest>())
  599. , MetaItems()
  600. , RequestPayloadSize(0)
  601. , RequestPayloadLimitBytes(RequestPayloadLimitBytes)
  602. , SerializedRequestSize(0)
  603. , SerializedRequestLimitBytes(serializedRequestLimitBytes)
  604. , CountersInvalid(false)
  605. {
  606. }
  607. void TClientSession::TRequestBuilder::ResetCounters() {
  608. RequestPayloadSize = 0;
  609. SerializedRequestSize = 0;
  610. PwTarget.Clear();
  611. PwTarget.ConstructInPlace();
  612. CountersInvalid = false;
  613. }
  614. TClientSession::TRequestBuilder::TAddResult TClientSession::TRequestBuilder::TryAddMessage(
  615. const TPendingMessage& message, size_t seqNo) {
  616. Y_VERIFY(!CountersInvalid);
  617. {
  618. // add item to pwRequest to increase calculated size
  619. PwTarget->DataBatch.SeqNo.Add(seqNo);
  620. PwTarget->DataBatch.Timestamp.Add(message.Message.Timestamp->MicroSeconds());
  621. PwTarget->DataBatch.Payload.Add().SetValue(message.Message.Payload);
  622. if (message.Message.Meta.Defined()) {
  623. for (const auto &m: *message.Message.Meta) {
  624. TMetaItemBuilder *metaItemBuilder = nullptr;
  625. {
  626. auto it = MetaItems.find(m.first);
  627. if (it == MetaItems.end()) {
  628. PwTarget->DataBatch.Meta.Add().Key.SetValue(m.first);
  629. } else {
  630. metaItemBuilder = &it->second;
  631. }
  632. }
  633. size_t metaItemIdx = (metaItemBuilder != nullptr) ? metaItemBuilder->ItemIndex :
  634. PwTarget->DataBatch.Meta.GetSize() - 1;
  635. auto &pwMetaItem = PwTarget->DataBatch.Meta.Get(metaItemIdx);
  636. pwMetaItem.Value.Add().SetValue(m.second);
  637. const auto index = Target.GetDataBatch().SeqNoSize();
  638. if ((metaItemBuilder != nullptr && metaItemBuilder->ValueIndex != index) ||
  639. (metaItemBuilder == nullptr && index != 0)) {
  640. const auto valueIdx = (metaItemBuilder) ? metaItemBuilder->ValueIndex : 0;
  641. pwMetaItem.SkipStart.Add(valueIdx);
  642. pwMetaItem.SkipLength.Add(index - valueIdx);
  643. }
  644. }
  645. }
  646. }
  647. const auto newSerializedRequestSize = PwTarget->ByteSizeLong();
  648. const auto newPayloadSize = RequestPayloadSize + message.Size;
  649. if ((SerializedRequestLimitBytes.Defined() && newSerializedRequestSize > *SerializedRequestLimitBytes) ||
  650. newPayloadSize > RequestPayloadLimitBytes) {
  651. CountersInvalid = true;
  652. return {true, newPayloadSize, newSerializedRequestSize};
  653. }
  654. {
  655. // add item to the real request
  656. auto& batch = *Target.MutableDataBatch();
  657. batch.AddSeqNo(seqNo);
  658. batch.AddTimestamp(message.Message.Timestamp->MicroSeconds());
  659. batch.AddPayload(message.Message.Payload);
  660. if (message.Message.Meta.Defined()) {
  661. for (const auto &m: *message.Message.Meta) {
  662. TMetaItemBuilder *metaItemBuilder;
  663. {
  664. auto it = MetaItems.find(m.first);
  665. if (it == MetaItems.end()) {
  666. batch.AddMeta()->SetKey(m.first);
  667. auto insertResult = MetaItems.insert({m.first, {batch.MetaSize() - 1}});
  668. Y_VERIFY(insertResult.second);
  669. metaItemBuilder = &insertResult.first->second;
  670. } else {
  671. metaItemBuilder = &it->second;
  672. }
  673. }
  674. auto *metaItem = batch.MutableMeta(metaItemBuilder->ItemIndex);
  675. metaItem->AddValue(m.second);
  676. const auto index = batch.SeqNoSize() - 1;
  677. if (metaItemBuilder->ValueIndex != index) {
  678. metaItem->AddSkipStart(metaItemBuilder->ValueIndex);
  679. metaItem->AddSkipLength(index - metaItemBuilder->ValueIndex);
  680. }
  681. metaItemBuilder->ValueIndex = index + 1;
  682. }
  683. }
  684. SerializedRequestSize = newSerializedRequestSize;
  685. RequestPayloadSize = newPayloadSize;
  686. }
  687. return {false, newPayloadSize, newSerializedRequestSize};
  688. }
  689. void TClientSession::PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target) {
  690. Y_VERIFY(AckSeqNo.Defined());
  691. TRequestBuilder requestBuilder(target, Client->GetParameters().GrpcMaxMessageSize, AgentMaxReceiveMessage);
  692. const auto startIndex = NextIndex - TrimmedCount;
  693. for (size_t i = startIndex; i < WriteQueue.size(); ++i) {
  694. auto& queueItem = WriteQueue[i];
  695. if (queueItem.Skipped) {
  696. NextIndex++;
  697. continue;
  698. }
  699. const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1);
  700. const auto serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0;
  701. if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) {
  702. YLOG_ERR(Sprintf("single serialized message is too large [%lu] > [%lu], dropping it",
  703. addResult.NewSerializedRequestSize, serializedLimitToLog));
  704. queueItem.Skipped = true;
  705. ++Counters->DroppedMessages;
  706. Counters->DroppedBytes += queueItem.Size;
  707. ++Counters->ErrorsCount;
  708. NextIndex++;
  709. requestBuilder.ResetCounters();
  710. continue;
  711. }
  712. if (addResult.LimitExceeded) {
  713. YLOG_DEBUG(Sprintf(
  714. "batch limit exceeded: [%lu] > [%lu] (limit for serialized batch)"
  715. "OR [%lu] > [%lu] (limit for raw batch)",
  716. addResult.NewSerializedRequestSize, serializedLimitToLog,
  717. addResult.NewRequestPayloadSize, Client->GetParameters().GrpcMaxMessageSize));
  718. break;
  719. }
  720. NextIndex++;
  721. }
  722. const auto messagesCount = target.GetDataBatch().SeqNoSize();
  723. if (messagesCount == 0) {
  724. return;
  725. }
  726. Y_VERIFY(requestBuilder.GetSerializedRequestSize() == target.ByteSizeLong(),
  727. "failed to calculate size for message [%s]", target.ShortDebugString().c_str());
  728. GrpcInflightMessages += messagesCount;
  729. GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
  730. YLOG_DEBUG(Sprintf("new write batch, [%lu] messages, [%lu] bytes, first seq_no [%lu], serialized size [%lu]",
  731. messagesCount, requestBuilder.GetRequestPayloadSize(),
  732. *target.GetDataBatch().GetSeqNo().begin(), requestBuilder.GetSerializedRequestSize()));
  733. ++Counters->GrpcWriteBatchRequests;
  734. Counters->GrpcInflightMessages += messagesCount;
  735. Counters->GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
  736. }
  737. void TClientSession::Acknowledge(ui64 seqNo) {
  738. size_t messagesCount = 0;
  739. size_t bytesCount = 0;
  740. size_t skippedMessagesCount = 0;
  741. size_t skippedBytesCount = 0;
  742. if (AckSeqNo.Defined()) {
  743. while (!WriteQueue.empty() && ((*AckSeqNo < seqNo) || WriteQueue.front().Skipped)) {
  744. if (WriteQueue.front().Skipped) {
  745. skippedMessagesCount++;
  746. skippedBytesCount += WriteQueue.front().Size;
  747. } else {
  748. ++messagesCount;
  749. bytesCount += WriteQueue.front().Size;
  750. }
  751. ++(*AckSeqNo);
  752. WriteQueue.pop_front();
  753. ++TrimmedCount;
  754. }
  755. }
  756. if (!AckSeqNo.Defined() || seqNo > *AckSeqNo) {
  757. AckSeqNo = seqNo;
  758. }
  759. Counters->AcknowledgedMessages += messagesCount;
  760. Counters->AcknowledgedBytes += bytesCount;
  761. Counters->InflightMessages -= (messagesCount + skippedMessagesCount);
  762. Counters->InflightBytes -= (bytesCount + skippedBytesCount);
  763. InflightBytes.fetch_sub(bytesCount);
  764. Counters->GrpcInflightMessages -= messagesCount;
  765. Counters->GrpcInflightBytes -= bytesCount;
  766. GrpcInflightMessages -= messagesCount;
  767. GrpcInflightBytes -= bytesCount;
  768. YLOG_DEBUG(Sprintf("ack [%lu], [%lu] messages, [%lu] bytes", seqNo, messagesCount, bytesCount));
  769. }
  770. void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) {
  771. SessionId = sessionId;
  772. Acknowledge(lastSeqNo);
  773. NextIndex = TrimmedCount;
  774. ++Counters->GrpcCallsInitialized;
  775. Counters->GrpcInflightMessages -= GrpcInflightMessages;
  776. Counters->GrpcInflightBytes -= GrpcInflightBytes;
  777. GrpcInflightMessages = 0;
  778. GrpcInflightBytes = 0;
  779. YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%lu]",
  780. sessionId.c_str(), lastSeqNo));
  781. }
  782. void TClientSession::OnGrpcCallFinished() {
  783. Y_VERIFY(!Closed);
  784. Y_VERIFY(ActiveGrpcCall);
  785. ActiveGrpcCall = nullptr;
  786. if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) {
  787. DoClose();
  788. } else {
  789. const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay;
  790. MakeGrpcCallTimer->Set(reconnectTime);
  791. YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str()));
  792. }
  793. }
  794. auto TClientSession::PurgeWriteQueue() -> TPurgeWriteQueueStats {
  795. size_t bytesCount = 0;
  796. for (const auto& m: WriteQueue) {
  797. bytesCount += m.Size;
  798. }
  799. auto result = TPurgeWriteQueueStats{WriteQueue.size(), bytesCount};
  800. Counters->DroppedMessages += WriteQueue.size();
  801. Counters->DroppedBytes += bytesCount;
  802. Counters->InflightMessages -= WriteQueue.size();
  803. Counters->InflightBytes -= bytesCount;
  804. Counters->GrpcInflightMessages -= GrpcInflightMessages;
  805. Counters->GrpcInflightBytes -= GrpcInflightBytes;
  806. InflightBytes.fetch_sub(bytesCount);
  807. GrpcInflightMessages = 0;
  808. GrpcInflightBytes = 0;
  809. WriteQueue.clear();
  810. return result;
  811. }
  812. void TClientSession::DoClose() {
  813. Y_VERIFY(CloseStarted);
  814. Y_VERIFY(!Closed);
  815. Y_VERIFY(!ClosePromise.HasValue());
  816. MakeGrpcCallTimer->Cancel();
  817. ForceCloseTimer->Cancel();
  818. PollTimer->Cancel();
  819. if (!ForkInProgressLocal && WriteQueue.size() > 0) {
  820. const auto stats = PurgeWriteQueue();
  821. ++Counters->ErrorsCount;
  822. YLOG_ERR(Sprintf("DoClose, dropped [%lu] messages, [%lu] bytes",
  823. stats.PurgedMessages, stats.PurgedBytes));
  824. }
  825. --Client->GetCounters()->ActiveSessionsCount;
  826. Closed = true;
  827. ClosePromise.SetValue();
  828. YLOG_INFO("session closed");
  829. }
  830. TGrpcCall::TGrpcCall(TClientSession& session)
  831. : Session(session)
  832. , AsyncJoinerToken(&Session.GetAsyncJoiner())
  833. , AcceptTag(MakeIOCallback(this, &TGrpcCall::EndAccept))
  834. , ReadTag(MakeIOCallback(this, &TGrpcCall::EndRead))
  835. , WriteTag(MakeIOCallback(this, &TGrpcCall::EndWrite))
  836. , WritesDoneTag(MakeIOCallback(this, &TGrpcCall::EndWritesDone))
  837. , FinishTag(MakeIOCallback(this, &TGrpcCall::EndFinish))
  838. , Logger(session.GetLogger().Child("grpc"))
  839. , AcceptPending(false)
  840. , Initialized_(false)
  841. , ReadPending(false)
  842. , ReadsDone(false)
  843. , WritePending(false)
  844. , WritesBlocked(false)
  845. , WritesDonePending(false)
  846. , WritesDone(false)
  847. , ErrorOccured(false)
  848. , FinishRequested(false)
  849. , FinishStarted(false)
  850. , FinishDone(false)
  851. , Cancelled(false)
  852. , Poisoned(false)
  853. , PoisonPillSent(false)
  854. , ReuseSessions_(false)
  855. , FinishStatus()
  856. , ClientContext()
  857. , ReaderWriter(nullptr)
  858. , Request()
  859. , Response()
  860. {
  861. }
  862. void TGrpcCall::Start() {
  863. AcceptPending = true;
  864. auto& client = Session.GetClient();
  865. ReaderWriter = client.GetStub().AsyncSession(&ClientContext,
  866. &client.GetCompletionQueue(),
  867. AcceptTag->Ref());
  868. YLOG_INFO("AsyncSession started");
  869. }
  870. TGrpcCall::~TGrpcCall() {
  871. YLOG_INFO("destroyed");
  872. }
  873. void TGrpcCall::EnsureFinishStarted() {
  874. if (!FinishStarted) {
  875. FinishStarted = true;
  876. ReaderWriter->Finish(&FinishStatus, FinishTag->Ref());
  877. YLOG_INFO("Finish started");
  878. }
  879. }
  880. bool TGrpcCall::CheckHasError(EIOStatus status, const char* method) {
  881. if (status == EIOStatus::Error) {
  882. SetError(Sprintf("%s %s", method, ToString(status).c_str()));
  883. return true;
  884. }
  885. if (ErrorOccured) {
  886. ScheduleFinishOnError();
  887. return true;
  888. }
  889. return false;
  890. }
  891. void TGrpcCall::SetError(const TString& error) {
  892. if (!Cancelled) {
  893. YLOG_ERR(error);
  894. ++Session.GetCounters().ErrorsCount;
  895. }
  896. ErrorOccured = true;
  897. ScheduleFinishOnError();
  898. }
  899. void TGrpcCall::ScheduleFinishOnError() {
  900. if (!AcceptPending && !WritePending && !WritesDonePending) {
  901. EnsureFinishStarted();
  902. }
  903. }
  904. void TGrpcCall::BeginClose(bool force) {
  905. if (force) {
  906. if (!Cancelled) {
  907. Cancelled = true;
  908. ClientContext.TryCancel();
  909. SetError("forced close");
  910. }
  911. return;
  912. }
  913. YLOG_INFO(Sprintf("Close Initialized [%d], AcceptPending [%d], "
  914. "WritePending [%d], FinishRequested [%d], "
  915. "ErrorOccured [%d]",
  916. static_cast<int>(Initialized_),
  917. static_cast<int>(AcceptPending),
  918. static_cast<int>(WritePending),
  919. static_cast<int>(FinishRequested),
  920. static_cast<int>(ErrorOccured)));
  921. if (ErrorOccured || FinishRequested) {
  922. return;
  923. }
  924. FinishRequested = true;
  925. if (!Initialized_ || WritePending) {
  926. return;
  927. }
  928. WritesBlocked = true;
  929. BeginWritesDone();
  930. }
  931. void TGrpcCall::Poison() {
  932. Poisoned = true;
  933. NotifyMessageAdded();
  934. }
  935. void TGrpcCall::NotifyMessageAdded() {
  936. if (WritePending || !Initialized_ || ErrorOccured || FinishRequested) {
  937. return;
  938. }
  939. ScheduleWrite();
  940. }
  941. void TGrpcCall::ScheduleWrite() {
  942. Request.Clear();
  943. if (!Poisoned) {
  944. Session.PrepareWriteBatchRequest(Request);
  945. } else if (!PoisonPillSent) {
  946. PoisonPillSent = true;
  947. auto& batch = *Request.mutable_data_batch();
  948. batch.AddSeqNo(std::numeric_limits<::google::protobuf::uint64>::max());
  949. batch.AddTimestamp(Now().MicroSeconds());
  950. batch.AddPayload("");
  951. YLOG_INFO("poison pill sent");
  952. }
  953. if (Request.GetDataBatch().GetSeqNo().empty()) {
  954. if (FinishRequested) {
  955. WritesBlocked = true;
  956. BeginWritesDone();
  957. }
  958. return;
  959. }
  960. BeginWrite();
  961. }
  962. void TGrpcCall::EndAccept(EIOStatus status) {
  963. Y_VERIFY(AcceptPending);
  964. AcceptPending = false;
  965. if (CheckHasError(status, "EndAccept")) {
  966. return;
  967. }
  968. BeginRead();
  969. Request.Clear();
  970. Session.PrepareInitializeRequest(Request);
  971. BeginWrite();
  972. }
  973. void TGrpcCall::EndRead(EIOStatus status) {
  974. ReadPending = false;
  975. if (FinishDone) {
  976. Session.OnGrpcCallFinished();
  977. return;
  978. }
  979. if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) {
  980. Y_VERIFY(!WritePending);
  981. YLOG_INFO("EndRead ReadsDone");
  982. ReadsDone = true;
  983. if (WritesDone) {
  984. EnsureFinishStarted();
  985. return;
  986. }
  987. return;
  988. }
  989. if (CheckHasError(status, "EndRead")) {
  990. return;
  991. }
  992. if (!Initialized_) {
  993. const auto metadata = ClientContext.GetServerInitialMetadata();
  994. {
  995. const auto it = metadata.find("ua-reuse-sessions");
  996. if (it != metadata.end() && it->second == "true") {
  997. ReuseSessions_ = true;
  998. }
  999. }
  1000. {
  1001. const auto it = metadata.find("ua-max-receive-message-size");
  1002. if (it != metadata.end()) {
  1003. Session.SetAgentMaxReceiveMessage(FromString<size_t>(TString{it->second.begin(), it->second.end()}));
  1004. }
  1005. }
  1006. if (Response.response_case() != NUnifiedAgentProto::Response::kInitialized) {
  1007. SetError(Sprintf("EndRead while initializing, unexpected response_case [%d]",
  1008. static_cast<int>(Response.response_case())));
  1009. return;
  1010. }
  1011. Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(),
  1012. Response.GetInitialized().GetLastSeqNo());
  1013. Initialized_ = true;
  1014. if (!WritePending) {
  1015. ScheduleWrite();
  1016. }
  1017. } else {
  1018. if (Response.response_case() != NUnifiedAgentProto::Response::kAck) {
  1019. SetError(Sprintf("EndRead unexpected response_case [%d]",
  1020. static_cast<int>(Response.response_case())));
  1021. return;
  1022. }
  1023. Session.Acknowledge(Response.GetAck().GetSeqNo());
  1024. }
  1025. BeginRead();
  1026. }
  1027. void TGrpcCall::EndWrite(EIOStatus status) {
  1028. WritePending = false;
  1029. if (CheckHasError(status, "EndWrite")) {
  1030. return;
  1031. }
  1032. if (!Initialized_) {
  1033. return;
  1034. }
  1035. ScheduleWrite();
  1036. }
  1037. void TGrpcCall::EndFinish(EIOStatus status) {
  1038. FinishDone = true;
  1039. const auto finishStatus = status == EIOStatus::Error
  1040. ? grpc::Status(grpc::UNKNOWN, "finish error")
  1041. : FinishStatus;
  1042. YLOG(finishStatus.ok() || Cancelled || Poisoned ? TLOG_INFO : TLOG_ERR,
  1043. Sprintf("EndFinish, code [%s], message [%s]",
  1044. ToString(finishStatus.error_code()).c_str(),
  1045. finishStatus.error_message().c_str()),
  1046. Logger);
  1047. if (!finishStatus.ok() && !Cancelled) {
  1048. ++Session.GetCounters().ErrorsCount;
  1049. }
  1050. if (!ReadPending) {
  1051. Session.OnGrpcCallFinished();
  1052. }
  1053. }
  1054. void TGrpcCall::EndWritesDone(EIOStatus status) {
  1055. YLOG_INFO(Sprintf("EndWritesDone [%s]", ToString(status).c_str()));
  1056. Y_VERIFY(!WritePending && !WritesDone && WritesDonePending);
  1057. WritesDonePending = false;
  1058. WritesDone = true;
  1059. if (CheckHasError(status, "EndWriteDone")) {
  1060. return;
  1061. }
  1062. if (ReadsDone) {
  1063. EnsureFinishStarted();
  1064. }
  1065. }
  1066. void TGrpcCall::BeginWritesDone() {
  1067. WritesDonePending = true;
  1068. ReaderWriter->WritesDone(WritesDoneTag->Ref());
  1069. YLOG_INFO("WritesDone started");
  1070. }
  1071. void TGrpcCall::BeginRead() {
  1072. ReadPending = true;
  1073. Response.Clear();
  1074. ReaderWriter->Read(&Response, ReadTag->Ref());
  1075. YLOG_DEBUG("Read started");
  1076. }
  1077. void TGrpcCall::BeginWrite() {
  1078. WritePending = true;
  1079. ReaderWriter->Write(Request, WriteTag->Ref());
  1080. YLOG_DEBUG("Write started");
  1081. }
  1082. }
  1083. namespace NUnifiedAgent {
  1084. size_t SizeOf(const TClientMessage& message) {
  1085. auto result = message.Payload.Size() + sizeof(TInstant);
  1086. if (message.Meta.Defined()) {
  1087. for (const auto& m: *message.Meta) {
  1088. result += m.first.Size() + m.second.Size();
  1089. }
  1090. }
  1091. return result;
  1092. }
  1093. TClientParameters::TClientParameters(const TString& uri)
  1094. : Uri(uri)
  1095. , SharedSecretKey(Nothing())
  1096. , MaxInflightBytes(DefaultMaxInflightBytes)
  1097. , Log(TLoggerOperator<TGlobalLog>::Log())
  1098. , LogRateLimitBytes(Nothing())
  1099. , GrpcReconnectDelay(TDuration::MilliSeconds(50))
  1100. , GrpcSendDelay(DefaultGrpcSendDelay)
  1101. , EnableForkSupport(false)
  1102. , GrpcMaxMessageSize(DefaultGrpcMaxMessageSize)
  1103. , Counters(nullptr)
  1104. {
  1105. }
  1106. TSessionParameters::TSessionParameters()
  1107. : SessionId(Nothing())
  1108. , Meta(Nothing())
  1109. , Counters(nullptr)
  1110. , MaxInflightBytes()
  1111. {
  1112. }
  1113. const size_t TClientParameters::DefaultMaxInflightBytes = 10_MB;
  1114. const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB;
  1115. const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10);
  1116. TClientPtr MakeClient(const TClientParameters& parameters) {
  1117. if (!grpc_is_initialized()) {
  1118. EnsureGrpcConfigured();
  1119. }
  1120. std::shared_ptr<NPrivate::TForkProtector> forkProtector{};
  1121. #ifdef _unix_
  1122. if (parameters.EnableForkSupport) {
  1123. forkProtector = NPrivate::TForkProtector::Get(true);
  1124. }
  1125. #endif
  1126. return MakeIntrusive<NPrivate::TClient>(parameters, forkProtector);
  1127. }
  1128. }