12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283 |
- #include "client_impl.h"
- #include "helpers.h"
- #include <contrib/libs/grpc/include/grpc/grpc.h>
- #include <contrib/libs/grpc/src/core/lib/gpr/string.h>
- #include <contrib/libs/grpc/src/core/lib/gprpp/fork.h>
- #include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
- #include <util/charset/utf8.h>
- #include <util/generic/size_literals.h>
- #include <util/system/env.h>
- using namespace NThreading;
- using namespace NMonitoring;
- namespace NUnifiedAgent::NPrivate {
- std::shared_ptr<grpc::Channel> CreateChannel(const grpc::string& target) {
- grpc::ChannelArguments args;
- args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
- args.SetMaxReceiveMessageSize(Max<int>());
- args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
- args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);
- args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
- args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 200);
- args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
- args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
- args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
- args.SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
- args.SetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE, 1024*1024);
- return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(), args);
- }
- void AddMeta(NUnifiedAgentProto::Request_Initialize& init, const TString& name, const TString& value) {
- auto* metaItem = init.MutableMeta()->Add();
- metaItem->SetName(name);
- metaItem->SetValue(value);
- }
- std::atomic<ui64> TClient::Id{0};
- TClient::TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector)
- : Parameters(parameters)
- , ForkProtector(forkProtector)
- , Counters(parameters.Counters ? parameters.Counters : MakeIntrusive<TClientCounters>())
- , Log(parameters.Log)
- , MainLogger(Log, MakeFMaybe(Parameters.LogRateLimitBytes))
- , Logger(MainLogger.Child(Sprintf("ua_%" PRIu64, Id.fetch_add(1))))
- , Channel(nullptr)
- , Stub(nullptr)
- , ActiveCompletionQueue(nullptr)
- , SessionLogLabel(0)
- , ActiveSessions()
- , Started(false)
- , Destroyed(false)
- , Lock()
- {
- MainLogger.SetDroppedBytesCounter(&Counters->ClientLogDroppedBytes);
- if (ForkProtector != nullptr) {
- ForkProtector->Register(*this);
- }
- EnsureStarted();
- YLOG_INFO(Sprintf("created, uri [%s]", Parameters.Uri.c_str()));
- }
- TClient::~TClient() {
- with_lock(Lock) {
- Y_ABORT_UNLESS(ActiveSessions.empty(), "active sessions found");
- EnsureStoppedNoLock();
- Destroyed = true;
- }
- if (ForkProtector != nullptr) {
- ForkProtector->Unregister(*this);
- }
- YLOG_DEBUG(Sprintf("destroyed, uri [%s]", Parameters.Uri.c_str()));
- }
- TClientSessionPtr TClient::CreateSession(const TSessionParameters& parameters) {
- return MakeIntrusive<TClientSession>(this, parameters);
- }
- void TClient::StartTracing(ELogPriority logPriority) {
- MainLogger.StartTracing(logPriority);
- StartGrpcTracing();
- YLOG_INFO("tracing started");
- }
- void TClient::FinishTracing() {
- FinishGrpcTracing();
- MainLogger.FinishTracing();
- YLOG_INFO("tracing finished");
- }
- void TClient::RegisterSession(TClientSession* session) {
- with_lock(Lock) {
- ActiveSessions.push_back(session);
- }
- }
- void TClient::UnregisterSession(TClientSession* session) {
- with_lock(Lock) {
- const auto it = Find(ActiveSessions, session);
- Y_ABORT_UNLESS(it != ActiveSessions.end());
- ActiveSessions.erase(it);
- }
- }
// Fork "prepare" step for this client, driven by TForkProtector::PreFork.
// Steps:
//  1. Acquire Lock and deliberately keep it held; it is released in
//     PostForkParent / PostForkChild.
//  2. Ask every active session to quiesce and wait for all of them.
//  3. Tear down the channel, stub and completion queue.
//  4. Turn off gRPC executor threading.
void TClient::PreFork() {
    YLOG_INFO("pre fork started");
    // Intentionally not scoped: released by the post-fork handlers.
    Lock.Acquire();
    auto futures = TVector<TFuture<void>>(Reserve(ActiveSessions.size()));
    for (auto* s: ActiveSessions) {
        // Each session also takes (and keeps) its own Lock and returns its
        // close future.
        futures.push_back(s->PreFork());
    }
    YLOG_INFO("waiting for sessions");
    WaitAll(futures).Wait();
    EnsureStoppedNoLock();
    YLOG_INFO("shutdown grpc executor");
    grpc_core::Executor::SetThreadingAll(false);
    YLOG_INFO("pre fork finished");
}
// Fork "parent" step. Restarts the gRPC machinery unless the client has already
// been destroyed, releases the Lock taken in PreFork, and then lets every session
// restart. Lock is released before the loop because a session's
// PostForkParent -> DoStart -> TClient::EnsureStarted takes Lock again.
// NOTE(review): ActiveSessions is iterated without holding Lock, so a concurrent
// RegisterSession/UnregisterSession in another parent thread could race with
// this loop. Confirm that callers rule this out.
void TClient::PostForkParent() {
    YLOG_INFO("post fork parent started");
    if (!Destroyed) {
        EnsureStartedNoLock();
    }
    Lock.Release();
    for (auto* s: ActiveSessions) {
        s->PostForkParent();
    }
    YLOG_INFO("post fork parent finished");
}
// Fork "child" step. Releases the Lock taken in PreFork and resets every session.
// The gRPC machinery is not restarted here. Each session is left not-started,
// so it restarts the client lazily on its next Send (DoStart -> EnsureStarted).
void TClient::PostForkChild() {
    YLOG_INFO("post fork child started");
    Lock.Release();
    for (auto* s: ActiveSessions) {
        s->PostForkChild();
    }
    YLOG_INFO("post fork child finished");
}
- void TClient::EnsureStarted() {
- with_lock(Lock) {
- EnsureStartedNoLock();
- }
- }
- void TClient::EnsureStartedNoLock() {
- // Lock must be held
- if (Started) {
- return;
- }
- Channel = CreateChannel(Parameters.Uri);
- Stub = NUnifiedAgentProto::UnifiedAgentService::NewStub(Channel);
- ActiveCompletionQueue = MakeHolder<TGrpcCompletionQueueHost>();
- ActiveCompletionQueue->Start();
- Started = true;
- }
- void TClient::EnsureStoppedNoLock() {
- // Lock must be held
- if (!Started) {
- return;
- }
- YLOG_DEBUG("stopping");
- ActiveCompletionQueue->Stop();
- ActiveCompletionQueue = nullptr;
- Stub = nullptr;
- Channel = nullptr;
- YLOG_DEBUG("stopped");
- Started = false;
- }
- TScopeLogger TClient::CreateSessionLogger() {
- return Logger.Child(ToString(SessionLogLabel.fetch_add(1)));
- }
- TForkProtector::TForkProtector()
- : Clients()
- , GrpcInitializer()
- , Enabled(grpc_core::Fork::Enabled())
- , Lock()
- {
- }
- void TForkProtector::Register(TClient& client) {
- if (!Enabled) {
- return;
- }
- Y_ABORT_UNLESS(grpc_is_initialized());
- Y_ABORT_UNLESS(grpc_core::Fork::Enabled());
- with_lock(Lock) {
- Clients.push_back(&client);
- }
- }
- void TForkProtector::Unregister(TClient& client) {
- if (!Enabled) {
- return;
- }
- with_lock(Lock) {
- const auto it = Find(Clients, &client);
- Y_ABORT_UNLESS(it != Clients.end());
- Clients.erase(it);
- }
- }
- std::shared_ptr<TForkProtector> TForkProtector::Get(bool createIfNotExists) {
- with_lock(InstanceLock) {
- auto result = Instance.lock();
- if (!result && createIfNotExists) {
- result = std::make_shared<TForkProtector>();
- if (!result->Enabled) {
- TLog log("cerr");
- TLogger logger(log, Nothing());
- auto scopeLogger = logger.Child("ua client");
- YLOG(TLOG_WARNING,
- "Grpc is already initialized, can't enable fork support. "
- "If forks are possible, please set environment variable GRPC_ENABLE_FORK_SUPPORT to 'true'. "
- "If not, you can suppress this warning by setting EnableForkSupport "
- "to false when creating the ua client.",
- scopeLogger);
- } else if (!SubscribedToForks) {
- SubscribedToForks = true;
- #ifdef _unix_
- pthread_atfork(
- &TForkProtector::PreFork,
- &TForkProtector::PostForkParent,
- &TForkProtector::PostForkChild);
- #endif
- }
- Instance = result;
- }
- return result;
- }
- }
- void TForkProtector::PreFork() {
- auto self = Get(false);
- if (!self) {
- return;
- }
- self->Lock.Acquire();
- for (auto* c : self->Clients) {
- c->PreFork();
- }
- }
- void TForkProtector::PostForkParent() {
- auto self = Get(false);
- if (!self) {
- return;
- }
- for (auto* c : self->Clients) {
- c->PostForkParent();
- }
- self->Lock.Release();
- }
- void TForkProtector::PostForkChild() {
- auto self = Get(false);
- if (!self) {
- return;
- }
- for (auto* c : self->Clients) {
- c->PostForkChild();
- }
- self->Lock.Release();
- }
- std::weak_ptr<TForkProtector> TForkProtector::Instance{};
- TMutex TForkProtector::InstanceLock{};
- bool TForkProtector::SubscribedToForks{false};
- TClientSession::TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters)
- : AsyncJoiner()
- , Client(client)
- , OriginalSessionId(MakeFMaybe(parameters.SessionId))
- , SessionId(OriginalSessionId)
- , Meta(MakeFMaybe(parameters.Meta))
- , Logger(Client->CreateSessionLogger())
- , CloseStarted(false)
- , ForcedCloseStarted(false)
- , Closed(false)
- , ForkInProgressLocal(false)
- , Started(false)
- , ClosePromise()
- , ActiveGrpcCall(nullptr)
- , WriteQueue()
- , TrimmedCount(0)
- , NextIndex(0)
- , AckSeqNo(Nothing())
- , PollerLastEventTimestamp()
- , Counters(parameters.Counters ? parameters.Counters : Client->GetCounters()->GetDefaultSessionCounters())
- , MakeGrpcCallTimer(nullptr)
- , ForceCloseTimer(nullptr)
- , PollTimer(nullptr)
- , GrpcInflightMessages(0)
- , GrpcInflightBytes(0)
- , InflightBytes(0)
- , CloseRequested(false)
- , EventsBatchSize(0)
- , PollingStatus(EPollingStatus::Inactive)
- , EventNotification(nullptr)
- , EventNotificationTriggered(false)
- , EventsBatch()
- , SecondaryEventsBatch()
- , ForkInProgress(false)
- , Lock()
- , MaxInflightBytes(
- parameters.MaxInflightBytes.GetOrElse(Client->GetParameters().MaxInflightBytes))
- , AgentMaxReceiveMessage(Nothing()) {
- if (Meta.Defined() && !IsUtf8(*Meta)) {
- throw std::runtime_error("session meta contains non UTF-8 characters");
- }
- Y_ENSURE(!(Client->GetParameters().EnableForkSupport && SessionId.Defined()),
- "explicit session id is not supported with forks");
- Client->RegisterSession(this);
- with_lock(Lock) {
- DoStart();
- }
- }
// Fork "prepare" step for the session, called from TClient::PreFork.
// Steps:
//  1. Take Lock and keep it; PostForkParent / PostForkChild release it.
//  2. Wake the poller and raise ForkInProgress. Poll then fails to take Lock,
//     sees the flag and closes the session instead of processing events.
// Returns the close future, which the client waits on. When the session is not
// started there is nothing to close, so the promise is completed immediately.
TFuture<void> TClientSession::PreFork() {
    YLOG_INFO("pre fork started");
    // Intentionally not scoped: released by the post-fork handlers.
    Lock.Acquire();
    YLOG_INFO("triggering event notification");
    if (!EventNotificationTriggered) {
        EventNotificationTriggered = true;
        EventNotification->Trigger();
    }
    YLOG_INFO("setting 'fork in progress' flag");
    ForkInProgress.store(true);
    if (!Started) {
        ClosePromise.TrySetValue();
    }
    YLOG_INFO("pre fork finished");
    return ClosePromise.GetFuture();
}
// Fork "parent" step for the session.
// Clears the fork flags and marks the session not-started. Unless a close was
// requested, it restarts the session (DoStart) and wakes the poller so that
// events still buffered in EventsBatch are processed. Finally it releases the
// Lock taken in PreFork.
void TClientSession::PostForkParent() {
    YLOG_INFO("post fork parent started");
    ForkInProgress.store(false);
    ForkInProgressLocal = false;
    // DoStart requires !Started.
    Started = false;
    if (!CloseRequested) {
        DoStart();
        YLOG_INFO("triggering event notification");
        EventNotificationTriggered = true;
        EventNotification->Trigger();
    }
    Lock.Release();
    YLOG_INFO("post fork parent finished");
}
- void TClientSession::PostForkChild() {
- YLOG_INFO("post fork child started");
- ForkInProgress.store(false);
- ForkInProgressLocal = false;
- Started = false;
- SessionId.Clear();
- TrimmedCount = 0;
- NextIndex = 0;
- AckSeqNo.Clear();
- PurgeWriteQueue();
- EventsBatch.clear();
- SecondaryEventsBatch.clear();
- EventsBatchSize = 0;
- Lock.Release();
- YLOG_INFO("post fork child finished");
- }
- void TClientSession::SetAgentMaxReceiveMessage(size_t newValue) {
- AgentMaxReceiveMessage = newValue;
- }
- void TClientSession::DoStart() {
- // Lock must be held
- Y_ABORT_UNLESS(!Started);
- YLOG_DEBUG("starting");
- Client->EnsureStarted();
- MakeGrpcCallTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
- MakeIOCallback([this](EIOStatus status) {
- if (status == EIOStatus::Error) {
- return;
- }
- MakeGrpcCall();
- }, &AsyncJoiner));
- ForceCloseTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
- MakeIOCallback([this](EIOStatus status) {
- if (status == EIOStatus::Error) {
- return;
- }
- YLOG_INFO("ForceCloseTimer");
- BeginClose(TInstant::Zero());
- }, &AsyncJoiner));
- PollTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
- MakeIOCallback([this](EIOStatus status) {
- if (status == EIOStatus::Error) {
- return;
- }
- Poll();
- }, &AsyncJoiner));
- EventNotification = MakeHolder<TGrpcNotification>(Client->GetCompletionQueue(),
- MakeIOCallback([this](EIOStatus status) {
- Y_ABORT_UNLESS(status == EIOStatus::Ok);
- Poll();
- }, &AsyncJoiner));
- CloseStarted = false;
- ForcedCloseStarted = false;
- Closed = false;
- ClosePromise = NewPromise();
- EventNotificationTriggered = false;
- PollerLastEventTimestamp = Now();
- PollingStatus = EPollingStatus::Inactive;
- ++Client->GetCounters()->ActiveSessionsCount;
- MakeGrpcCallTimer->Set(Now());
- YLOG_DEBUG(Sprintf("started, sessionId [%s]", OriginalSessionId.GetOrElse("").c_str()));
- Started = true;
- }
- void TClientSession::MakeGrpcCall() {
- if (Closed) {
- YLOG_INFO("MakeGrpcCall, session already closed");
- return;
- }
- Y_ABORT_UNLESS(!ForcedCloseStarted);
- Y_ABORT_UNLESS(!ActiveGrpcCall);
- ActiveGrpcCall = MakeIntrusive<TGrpcCall>(*this);
- ActiveGrpcCall->Start();
- ++Counters->GrpcCalls;
- if (CloseStarted) {
- ActiveGrpcCall->BeginClose(false);
- }
- }
- TClientSession::~TClientSession() {
- Close(TInstant::Zero());
- AsyncJoiner.Join().Wait();
- Client->UnregisterSession(this);
- YLOG_DEBUG("destroyed");
- }
- void TClientSession::Send(TClientMessage&& message) {
- const size_t messageSize = SizeOf(message);
- ++Counters->ReceivedMessages;
- Counters->ReceivedBytes += messageSize;
- if (messageSize > Client->GetParameters().GrpcMaxMessageSize) {
- YLOG_ERR(Sprintf("message size [%zu] is greater than max grpc message size [%zu], message dropped",
- messageSize, Client->GetParameters().GrpcMaxMessageSize));
- ++Counters->DroppedMessages;
- Counters->DroppedBytes += messageSize;
- ++Counters->ErrorsCount;
- return;
- }
- if (message.Meta.Defined() && !IsUtf8(*message.Meta)) {
- YLOG_ERR("message meta contains non UTF-8 characters, message dropped");
- ++Counters->DroppedMessages;
- Counters->DroppedBytes += messageSize;
- ++Counters->ErrorsCount;
- return;
- }
- if (!message.Timestamp.Defined()) {
- message.Timestamp = TInstant::Now();
- }
- ++Counters->InflightMessages;
- Counters->InflightBytes += messageSize;
- {
- auto g = Guard(Lock);
- if (!Started) {
- DoStart();
- }
- if (CloseRequested) {
- g.Release();
- YLOG_ERR(Sprintf("session is closing, message dropped, [%zu] bytes", messageSize));
- --Counters->InflightMessages;
- Counters->InflightBytes -= messageSize;
- ++Counters->DroppedMessages;
- Counters->DroppedBytes += messageSize;
- ++Counters->ErrorsCount;
- return;
- }
- if (InflightBytes.load() + messageSize > MaxInflightBytes) {
- g.Release();
- YLOG_ERR(Sprintf("max inflight of [%zu] bytes reached, [%zu] bytes dropped",
- MaxInflightBytes, messageSize));
- --Counters->InflightMessages;
- Counters->InflightBytes -= messageSize;
- ++Counters->DroppedMessages;
- Counters->DroppedBytes += messageSize;
- ++Counters->ErrorsCount;
- return;
- }
- InflightBytes.fetch_add(messageSize);
- EventsBatch.push_back(TMessageReceivedEvent{std::move(message), messageSize});
- EventsBatchSize += messageSize;
- if ((PollingStatus == EPollingStatus::Inactive ||
- EventsBatchSize >= Client->GetParameters().GrpcMaxMessageSize) &&
- !EventNotificationTriggered)
- {
- EventNotificationTriggered = true;
- EventNotification->Trigger();
- }
- }
- }
- TFuture<void> TClientSession::CloseAsync(TInstant deadline) {
- YLOG_DEBUG(Sprintf("close, deadline [%s]", ToString(deadline).c_str()));
- if (!ClosePromise.GetFuture().HasValue()) {
- with_lock(Lock) {
- if (!Started) {
- return MakeFuture();
- }
- CloseRequested = true;
- EventsBatch.push_back(TCloseRequestedEvent{deadline});
- if (!EventNotificationTriggered) {
- EventNotificationTriggered = true;
- EventNotification->Trigger();
- }
- }
- }
- return ClosePromise.GetFuture();
- }
- void TClientSession::BeginClose(TInstant deadline) {
- if (Closed) {
- return;
- }
- if (!CloseStarted) {
- CloseStarted = true;
- YLOG_DEBUG("close started");
- }
- const auto force = deadline == TInstant::Zero();
- if (force && !ForcedCloseStarted) {
- ForcedCloseStarted = true;
- YLOG_INFO("forced close started");
- }
- if (!ActiveGrpcCall && (ForcedCloseStarted || WriteQueue.empty())) {
- DoClose();
- } else {
- if (!force) {
- ForceCloseTimer->Set(deadline);
- }
- if (ActiveGrpcCall) {
- ActiveGrpcCall->BeginClose(ForcedCloseStarted);
- }
- }
- }
- void TClientSession::Poll() {
- if (ForkInProgressLocal) {
- return;
- }
- const auto now = Now();
- const auto sendDelay = Client->GetParameters().GrpcSendDelay;
- const auto oldPollingStatus = PollingStatus;
- {
- if (!Lock.TryAcquire()) {
- TSpinWait sw;
- while (Lock.IsLocked() || !Lock.TryAcquire()) {
- if (ForkInProgress.load()) {
- YLOG_INFO("poller 'fork in progress' signal received, stopping session");
- ForkInProgressLocal = true;
- if (!ActiveGrpcCall || !ActiveGrpcCall->Initialized()) {
- BeginClose(TInstant::Max());
- } else if (ActiveGrpcCall->ReuseSessions()) {
- ActiveGrpcCall->Poison();
- BeginClose(TInstant::Max());
- } else {
- BeginClose(TInstant::Zero());
- }
- return;
- }
- sw.Sleep();
- }
- }
- if (!EventsBatch.empty()) {
- DoSwap(EventsBatch, SecondaryEventsBatch);
- EventsBatchSize = 0;
- PollerLastEventTimestamp = now;
- }
- const auto needNextPollStep = sendDelay != TDuration::Zero() &&
- !CloseRequested &&
- (now - PollerLastEventTimestamp) < 10 * sendDelay;
- PollingStatus = needNextPollStep ? EPollingStatus::Active : EPollingStatus::Inactive;
- EventNotificationTriggered = false;
- Lock.Release();
- }
- if (PollingStatus == EPollingStatus::Active) {
- PollTimer->Set(now + sendDelay);
- }
- if (PollingStatus != oldPollingStatus) {
- YLOG_DEBUG(Sprintf("poller %s", PollingStatus == EPollingStatus::Active ? "started" : "stopped"));
- }
- if (auto& batch = SecondaryEventsBatch; !batch.empty()) {
- auto closeIt = FindIf(batch, [](const auto& e) {
- return std::holds_alternative<TCloseRequestedEvent>(e);
- });
- if (auto it = begin(batch); it != closeIt) {
- Y_ABORT_UNLESS(!CloseStarted);
- do {
- auto& e = std::get<TMessageReceivedEvent>(*it++);
- WriteQueue.push_back({std::move(e.Message), e.Size, false});
- } while (it != closeIt);
- if (ActiveGrpcCall) {
- ActiveGrpcCall->NotifyMessageAdded();
- }
- }
- for (auto endIt = end(batch); closeIt != endIt; ++closeIt) {
- const auto& e = std::get<TCloseRequestedEvent>(*closeIt);
- BeginClose(e.Deadline);
- }
- batch.clear();
- }
- };
- void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) {
- auto& initializeMessage = *target.MutableInitialize();
- if (SessionId.Defined()) {
- initializeMessage.SetSessionId(*SessionId);
- }
- if (Client->GetParameters().SharedSecretKey.Defined()) {
- initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey);
- }
- if (Meta.Defined()) {
- for (const auto& p: *Meta) {
- AddMeta(initializeMessage, p.first, p.second);
- }
- }
- if (!Meta.Defined() || Meta->find("_reusable") == Meta->end()) {
- AddMeta(initializeMessage, "_reusable", "true");
- }
- }
- TClientSession::TRequestBuilder::TRequestBuilder(NUnifiedAgentProto::Request& target, size_t RequestPayloadLimitBytes,
- TFMaybe<size_t> serializedRequestLimitBytes)
- : Target(target)
- , PwTarget(MakeFMaybe<NPW::TRequest>())
- , MetaItems()
- , RequestPayloadSize(0)
- , RequestPayloadLimitBytes(RequestPayloadLimitBytes)
- , SerializedRequestSize(0)
- , SerializedRequestLimitBytes(serializedRequestLimitBytes)
- , CountersInvalid(false)
- {
- }
- void TClientSession::TRequestBuilder::ResetCounters() {
- RequestPayloadSize = 0;
- SerializedRequestSize = 0;
- PwTarget.Clear();
- PwTarget.ConstructInPlace();
- CountersInvalid = false;
- }
// Tries to append `message` with sequence number `seqNo` to the request.
// Phase 1: append the message to PwTarget (the size-estimation request) and
// compute the resulting serialized and raw payload sizes. If either limit is
// exceeded, mark the builder CountersInvalid and return a result with
// LimitExceeded set, leaving Target untouched.
// Phase 2: otherwise append the same data to Target and commit the new sizes.
// Meta values are stored per key. A run of messages lacking a key is encoded in
// that key's entry as a (SkipStart, SkipLength) range. After a LimitExceeded
// result, ResetCounters() must be called before the next call; the
// Y_ABORT_UNLESS below enforces this.
TClientSession::TRequestBuilder::TAddResult TClientSession::TRequestBuilder::TryAddMessage(
    const TPendingMessage& message, size_t seqNo) {
    Y_ABORT_UNLESS(!CountersInvalid);
    {
        // add item to pwRequest to increase calculated size
        PwTarget->DataBatch.SeqNo.Add(seqNo);
        PwTarget->DataBatch.Timestamp.Add(message.Message.Timestamp->MicroSeconds());
        PwTarget->DataBatch.Payload.Add().SetValue(message.Message.Payload);
        if (message.Message.Meta.Defined()) {
            for (const auto &m: *message.Message.Meta) {
                // Non-null when the key already exists in the committed request.
                TMetaItemBuilder *metaItemBuilder = nullptr;
                {
                    auto it = MetaItems.find(m.first);
                    if (it == MetaItems.end()) {
                        PwTarget->DataBatch.Meta.Add().Key.SetValue(m.first);
                    } else {
                        metaItemBuilder = &it->second;
                    }
                }
                // A new key's item is the one just appended above.
                size_t metaItemIdx = (metaItemBuilder != nullptr) ? metaItemBuilder->ItemIndex :
                    PwTarget->DataBatch.Meta.GetSize() - 1;
                auto &pwMetaItem = PwTarget->DataBatch.Meta.Get(metaItemIdx);
                pwMetaItem.Value.Add().SetValue(m.second);
                // Position of this message within the committed batch.
                const auto index = Target.GetDataBatch().SeqNoSize();
                // Mirror the skip range phase 2 would add for the messages
                // since this key's last value.
                if ((metaItemBuilder != nullptr && metaItemBuilder->ValueIndex != index) ||
                    (metaItemBuilder == nullptr && index != 0)) {
                    const auto valueIdx = (metaItemBuilder) ? metaItemBuilder->ValueIndex : 0;
                    pwMetaItem.SkipStart.Add(valueIdx);
                    pwMetaItem.SkipLength.Add(index - valueIdx);
                }
            }
        }
    }
    const auto newSerializedRequestSize = PwTarget->ByteSizeLong();
    const auto newPayloadSize = RequestPayloadSize + message.Size;
    if ((SerializedRequestLimitBytes.Defined() && newSerializedRequestSize > *SerializedRequestLimitBytes) ||
        newPayloadSize > RequestPayloadLimitBytes) {
        // PwTarget now contains an item Target does not; it is out of sync
        // until ResetCounters().
        CountersInvalid = true;
        return {true, newPayloadSize, newSerializedRequestSize};
    }
    {
        // add item to the real request
        auto& batch = *Target.MutableDataBatch();
        batch.AddSeqNo(seqNo);
        batch.AddTimestamp(message.Message.Timestamp->MicroSeconds());
        batch.AddPayload(message.Message.Payload);
        if (message.Message.Meta.Defined()) {
            for (const auto &m: *message.Message.Meta) {
                TMetaItemBuilder *metaItemBuilder;
                {
                    auto it = MetaItems.find(m.first);
                    if (it == MetaItems.end()) {
                        batch.AddMeta()->SetKey(m.first);
                        auto insertResult = MetaItems.insert({m.first, {batch.MetaSize() - 1}});
                        Y_ABORT_UNLESS(insertResult.second);
                        metaItemBuilder = &insertResult.first->second;
                    } else {
                        metaItemBuilder = &it->second;
                    }
                }
                auto *metaItem = batch.MutableMeta(metaItemBuilder->ItemIndex);
                metaItem->AddValue(m.second);
                // Position of this message (already appended above).
                const auto index = batch.SeqNoSize() - 1;
                if (metaItemBuilder->ValueIndex != index) {
                    // Messages [ValueIndex, index) have no value for this key.
                    metaItem->AddSkipStart(metaItemBuilder->ValueIndex);
                    metaItem->AddSkipLength(index - metaItemBuilder->ValueIndex);
                }
                // The next expected value position for this key.
                metaItemBuilder->ValueIndex = index + 1;
            }
        }
        SerializedRequestSize = newSerializedRequestSize;
        RequestPayloadSize = newPayloadSize;
    }
    return {false, newPayloadSize, newSerializedRequestSize};
}
- void TClientSession::PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target) {
- Y_ABORT_UNLESS(AckSeqNo.Defined());
- TRequestBuilder requestBuilder(target, Client->GetParameters().GrpcMaxMessageSize, AgentMaxReceiveMessage);
- const auto startIndex = NextIndex - TrimmedCount;
- for (size_t i = startIndex; i < WriteQueue.size(); ++i) {
- auto& queueItem = WriteQueue[i];
- if (queueItem.Skipped) {
- NextIndex++;
- continue;
- }
- const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1);
- const size_t serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0;
- if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) {
- YLOG_ERR(Sprintf("single serialized message is too large [%zu] > [%zu], dropping it",
- addResult.NewSerializedRequestSize, serializedLimitToLog));
- queueItem.Skipped = true;
- ++Counters->DroppedMessages;
- Counters->DroppedBytes += queueItem.Size;
- ++Counters->ErrorsCount;
- NextIndex++;
- requestBuilder.ResetCounters();
- continue;
- }
- if (addResult.LimitExceeded) {
- YLOG_DEBUG(Sprintf(
- "batch limit exceeded: [%zu] > [%zu] (limit for serialized batch)"
- "OR [%zu] > [%zu] (limit for raw batch)",
- addResult.NewSerializedRequestSize, serializedLimitToLog,
- addResult.NewRequestPayloadSize, Client->GetParameters().GrpcMaxMessageSize));
- break;
- }
- NextIndex++;
- }
- const auto messagesCount = target.GetDataBatch().SeqNoSize();
- if (messagesCount == 0) {
- return;
- }
- Y_ABORT_UNLESS(requestBuilder.GetSerializedRequestSize() == target.ByteSizeLong(),
- "failed to calculate size for message [%s]", target.ShortDebugString().c_str());
- GrpcInflightMessages += messagesCount;
- GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
- YLOG_DEBUG(Sprintf("new write batch, [%zu] messages, [%zu] bytes, first seq_no [%" PRIu64 "], serialized size [%zu]",
- messagesCount, requestBuilder.GetRequestPayloadSize(),
- *target.GetDataBatch().GetSeqNo().begin(), requestBuilder.GetSerializedRequestSize()));
- ++Counters->GrpcWriteBatchRequests;
- Counters->GrpcInflightMessages += messagesCount;
- Counters->GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
- }
- void TClientSession::Acknowledge(ui64 seqNo) {
- size_t messagesCount = 0;
- size_t bytesCount = 0;
- size_t skippedMessagesCount = 0;
- size_t skippedBytesCount = 0;
- if (AckSeqNo.Defined()) {
- while (!WriteQueue.empty() && ((*AckSeqNo < seqNo) || WriteQueue.front().Skipped)) {
- if (WriteQueue.front().Skipped) {
- skippedMessagesCount++;
- skippedBytesCount += WriteQueue.front().Size;
- } else {
- ++messagesCount;
- bytesCount += WriteQueue.front().Size;
- }
- ++(*AckSeqNo);
- WriteQueue.pop_front();
- ++TrimmedCount;
- }
- }
- if (!AckSeqNo.Defined() || seqNo > *AckSeqNo) {
- AckSeqNo = seqNo;
- }
- Counters->AcknowledgedMessages += messagesCount;
- Counters->AcknowledgedBytes += bytesCount;
- Counters->InflightMessages -= (messagesCount + skippedMessagesCount);
- Counters->InflightBytes -= (bytesCount + skippedBytesCount);
- InflightBytes.fetch_sub(bytesCount);
- Counters->GrpcInflightMessages -= messagesCount;
- Counters->GrpcInflightBytes -= bytesCount;
- GrpcInflightMessages -= messagesCount;
- GrpcInflightBytes -= bytesCount;
- YLOG_DEBUG(Sprintf("ack [%" PRIu64 "], [%zu] messages, [%zu] bytes", seqNo, messagesCount, bytesCount));
- }
- void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) {
- SessionId = sessionId;
- Acknowledge(lastSeqNo);
- NextIndex = TrimmedCount;
- ++Counters->GrpcCallsInitialized;
- Counters->GrpcInflightMessages -= GrpcInflightMessages;
- Counters->GrpcInflightBytes -= GrpcInflightBytes;
- GrpcInflightMessages = 0;
- GrpcInflightBytes = 0;
- YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%" PRIu64 "]",
- sessionId.c_str(), lastSeqNo));
- }
- void TClientSession::OnGrpcCallFinished() {
- Y_ABORT_UNLESS(!Closed);
- Y_ABORT_UNLESS(ActiveGrpcCall);
- ActiveGrpcCall = nullptr;
- if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) {
- DoClose();
- } else {
- const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay;
- MakeGrpcCallTimer->Set(reconnectTime);
- YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str()));
- }
- }
- auto TClientSession::PurgeWriteQueue() -> TPurgeWriteQueueStats {
- size_t bytesCount = 0;
- for (const auto& m: WriteQueue) {
- bytesCount += m.Size;
- }
- auto result = TPurgeWriteQueueStats{WriteQueue.size(), bytesCount};
- Counters->DroppedMessages += WriteQueue.size();
- Counters->DroppedBytes += bytesCount;
- Counters->InflightMessages -= WriteQueue.size();
- Counters->InflightBytes -= bytesCount;
- Counters->GrpcInflightMessages -= GrpcInflightMessages;
- Counters->GrpcInflightBytes -= GrpcInflightBytes;
- InflightBytes.fetch_sub(bytesCount);
- GrpcInflightMessages = 0;
- GrpcInflightBytes = 0;
- WriteQueue.clear();
- return result;
- }
- void TClientSession::DoClose() {
- Y_ABORT_UNLESS(CloseStarted);
- Y_ABORT_UNLESS(!Closed);
- Y_ABORT_UNLESS(!ClosePromise.HasValue());
- MakeGrpcCallTimer->Cancel();
- ForceCloseTimer->Cancel();
- PollTimer->Cancel();
- if (!ForkInProgressLocal && WriteQueue.size() > 0) {
- const auto stats = PurgeWriteQueue();
- ++Counters->ErrorsCount;
- YLOG_ERR(Sprintf("DoClose, dropped [%zu] messages, [%zu] bytes",
- stats.PurgedMessages, stats.PurgedBytes));
- }
- --Client->GetCounters()->ActiveSessionsCount;
- Closed = true;
- ClosePromise.SetValue();
- YLOG_DEBUG("session closed");
- }
- TGrpcCall::TGrpcCall(TClientSession& session)
- : Session(session)
- , AsyncJoinerToken(&Session.GetAsyncJoiner())
- , AcceptTag(MakeIOCallback(this, &TGrpcCall::EndAccept))
- , ReadTag(MakeIOCallback(this, &TGrpcCall::EndRead))
- , WriteTag(MakeIOCallback(this, &TGrpcCall::EndWrite))
- , WritesDoneTag(MakeIOCallback(this, &TGrpcCall::EndWritesDone))
- , FinishTag(MakeIOCallback(this, &TGrpcCall::EndFinish))
- , Logger(session.GetLogger().Child("grpc"))
- , AcceptPending(false)
- , Initialized_(false)
- , ReadPending(false)
- , ReadsDone(false)
- , WritePending(false)
- , WritesBlocked(false)
- , WritesDonePending(false)
- , WritesDone(false)
- , ErrorOccured(false)
- , FinishRequested(false)
- , FinishStarted(false)
- , FinishDone(false)
- , Cancelled(false)
- , Poisoned(false)
- , PoisonPillSent(false)
- , ReuseSessions_(false)
- , FinishStatus()
- , ClientContext()
- , ReaderWriter(nullptr)
- , Request()
- , Response()
- {
- }
- void TGrpcCall::Start() {
- AcceptPending = true;
- auto& client = Session.GetClient();
- ReaderWriter = client.GetStub().AsyncSession(&ClientContext,
- &client.GetCompletionQueue(),
- AcceptTag->Ref());
- YLOG_DEBUG("AsyncSession started");
- }
- TGrpcCall::~TGrpcCall() {
- YLOG_DEBUG("destroyed");
- }
- void TGrpcCall::EnsureFinishStarted() {
- if (!FinishStarted) {
- FinishStarted = true;
- ReaderWriter->Finish(&FinishStatus, FinishTag->Ref());
- YLOG_DEBUG("Finish started");
- }
- }
- bool TGrpcCall::CheckHasError(EIOStatus status, const char* method) {
- if (status == EIOStatus::Error) {
- SetError(Sprintf("%s %s", method, ToString(status).c_str()));
- return true;
- }
- if (ErrorOccured) {
- ScheduleFinishOnError();
- return true;
- }
- return false;
- }
- void TGrpcCall::SetError(const TString& error) {
- if (!Cancelled) {
- YLOG_ERR(error);
- ++Session.GetCounters().ErrorsCount;
- }
- ErrorOccured = true;
- ScheduleFinishOnError();
- }
- void TGrpcCall::ScheduleFinishOnError() {
- if (!AcceptPending && !WritePending && !WritesDonePending) {
- EnsureFinishStarted();
- }
- }
- void TGrpcCall::BeginClose(bool force) {
- if (force) {
- if (!Cancelled) {
- Cancelled = true;
- ClientContext.TryCancel();
- SetError("forced close");
- }
- return;
- }
- YLOG_DEBUG(Sprintf("Close Initialized [%d], AcceptPending [%d], "
- "WritePending [%d], FinishRequested [%d], "
- "ErrorOccured [%d]",
- static_cast<int>(Initialized_),
- static_cast<int>(AcceptPending),
- static_cast<int>(WritePending),
- static_cast<int>(FinishRequested),
- static_cast<int>(ErrorOccured)));
- if (ErrorOccured || FinishRequested) {
- return;
- }
- FinishRequested = true;
- if (!Initialized_ || WritePending) {
- return;
- }
- WritesBlocked = true;
- BeginWritesDone();
- }
- void TGrpcCall::Poison() {
- Poisoned = true;
- NotifyMessageAdded();
- }
- void TGrpcCall::NotifyMessageAdded() {
- if (WritePending || !Initialized_ || ErrorOccured || FinishRequested) {
- return;
- }
- ScheduleWrite();
- }
- void TGrpcCall::ScheduleWrite() {
- Request.Clear();
- if (!Poisoned) {
- Session.PrepareWriteBatchRequest(Request);
- } else if (!PoisonPillSent) {
- PoisonPillSent = true;
- auto& batch = *Request.mutable_data_batch();
- batch.AddSeqNo(std::numeric_limits<::google::protobuf::uint64>::max());
- batch.AddTimestamp(Now().MicroSeconds());
- batch.AddPayload("");
- YLOG_INFO("poison pill sent");
- }
- if (Request.GetDataBatch().GetSeqNo().empty()) {
- if (FinishRequested) {
- WritesBlocked = true;
- BeginWritesDone();
- }
- return;
- }
- BeginWrite();
- }
- void TGrpcCall::EndAccept(EIOStatus status) {
- Y_ABORT_UNLESS(AcceptPending);
- AcceptPending = false;
- if (CheckHasError(status, "EndAccept")) {
- return;
- }
- BeginRead();
- Request.Clear();
- Session.PrepareInitializeRequest(Request);
- BeginWrite();
- }
- void TGrpcCall::EndRead(EIOStatus status) {
- ReadPending = false;
- if (FinishDone) {
- Session.OnGrpcCallFinished();
- return;
- }
- if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) {
- Y_ABORT_UNLESS(!WritePending);
- YLOG_DEBUG("EndRead ReadsDone");
- ReadsDone = true;
- if (WritesDone) {
- EnsureFinishStarted();
- return;
- }
- return;
- }
- if (CheckHasError(status, "EndRead")) {
- return;
- }
- if (!Initialized_) {
- const auto metadata = ClientContext.GetServerInitialMetadata();
- {
- const auto it = metadata.find("ua-reuse-sessions");
- if (it != metadata.end() && it->second == "true") {
- ReuseSessions_ = true;
- }
- }
- {
- const auto it = metadata.find("ua-max-receive-message-size");
- if (it != metadata.end()) {
- Session.SetAgentMaxReceiveMessage(FromString<size_t>(TString{it->second.begin(), it->second.end()}));
- }
- }
- if (Response.response_case() != NUnifiedAgentProto::Response::kInitialized) {
- SetError(Sprintf("EndRead while initializing, unexpected response_case [%d]",
- static_cast<int>(Response.response_case())));
- return;
- }
- Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(),
- Response.GetInitialized().GetLastSeqNo());
- Initialized_ = true;
- if (!WritePending) {
- ScheduleWrite();
- }
- } else {
- if (Response.response_case() != NUnifiedAgentProto::Response::kAck) {
- SetError(Sprintf("EndRead unexpected response_case [%d]",
- static_cast<int>(Response.response_case())));
- return;
- }
- Session.Acknowledge(Response.GetAck().GetSeqNo());
- }
- BeginRead();
- }
- void TGrpcCall::EndWrite(EIOStatus status) {
- WritePending = false;
- if (CheckHasError(status, "EndWrite")) {
- return;
- }
- if (!Initialized_) {
- return;
- }
- ScheduleWrite();
- }
- void TGrpcCall::EndFinish(EIOStatus status) {
- FinishDone = true;
- const auto finishStatus = status == EIOStatus::Error
- ? grpc::Status(grpc::UNKNOWN, "finish error")
- : FinishStatus;
- YLOG(finishStatus.ok() || Cancelled || Poisoned ? TLOG_DEBUG : TLOG_ERR,
- Sprintf("EndFinish, code [%s], message [%s]",
- ToString(finishStatus.error_code()).c_str(),
- finishStatus.error_message().c_str()),
- Logger);
- if (!finishStatus.ok() && !Cancelled) {
- ++Session.GetCounters().ErrorsCount;
- }
- if (!ReadPending) {
- Session.OnGrpcCallFinished();
- }
- }
- void TGrpcCall::EndWritesDone(EIOStatus status) {
- YLOG_DEBUG(Sprintf("EndWritesDone [%s]", ToString(status).c_str()));
- Y_ABORT_UNLESS(!WritePending && !WritesDone && WritesDonePending);
- WritesDonePending = false;
- WritesDone = true;
- if (CheckHasError(status, "EndWriteDone")) {
- return;
- }
- if (ReadsDone) {
- EnsureFinishStarted();
- }
- }
- void TGrpcCall::BeginWritesDone() {
- WritesDonePending = true;
- ReaderWriter->WritesDone(WritesDoneTag->Ref());
- YLOG_DEBUG("WritesDone started");
- }
- void TGrpcCall::BeginRead() {
- ReadPending = true;
- Response.Clear();
- ReaderWriter->Read(&Response, ReadTag->Ref());
- YLOG_DEBUG("Read started");
- }
- void TGrpcCall::BeginWrite() {
- WritePending = true;
- ReaderWriter->Write(Request, WriteTag->Ref());
- YLOG_DEBUG("Write started");
- }
- }
- namespace NUnifiedAgent {
- size_t SizeOf(const TClientMessage& message) {
- auto result = message.Payload.Size() + sizeof(TInstant);
- if (message.Meta.Defined()) {
- for (const auto& m: *message.Meta) {
- result += m.first.Size() + m.second.Size();
- }
- }
- return result;
- }
- TClientParameters::TClientParameters(const TString& uri)
- : Uri(uri)
- , SharedSecretKey(Nothing())
- , MaxInflightBytes(DefaultMaxInflightBytes)
- , Log(TLoggerOperator<TGlobalLog>::Log())
- , LogRateLimitBytes(Nothing())
- , GrpcReconnectDelay(TDuration::MilliSeconds(50))
- , GrpcSendDelay(DefaultGrpcSendDelay)
- , EnableForkSupport(false)
- , GrpcMaxMessageSize(DefaultGrpcMaxMessageSize)
- , Counters(nullptr)
- {
- }
- TSessionParameters::TSessionParameters()
- : SessionId(Nothing())
- , Meta(Nothing())
- , Counters(nullptr)
- , MaxInflightBytes()
- {
- }
- const size_t TClientParameters::DefaultMaxInflightBytes = 10_MB;
- const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB;
- const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10);
- TClientPtr MakeClient(const TClientParameters& parameters) {
- // Initialization of the Fork in newest grcp core is performed
- // in 'do_basic_init', which is called inside 'grpc_is_initialized'.
- // So the set of the fork env variable has to be done before
- // grpc_is_initialized call.
- #ifdef _unix_
- if (parameters.EnableForkSupport) {
- SetEnv("GRPC_ENABLE_FORK_SUPPORT", "true");
- }
- #endif
- if (!grpc_is_initialized()) {
- EnsureGrpcConfigured();
- }
- std::shared_ptr<NPrivate::TForkProtector> forkProtector{};
- #ifdef _unix_
- if (parameters.EnableForkSupport) {
- forkProtector = NPrivate::TForkProtector::Get(true);
- }
- #endif
- return MakeIntrusive<NPrivate::TClient>(parameters, forkProtector);
- }
- }
|