client_impl.h 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. #pragma once
  2. #include <library/cpp/unified_agent_client/client.h>
  3. #include <library/cpp/unified_agent_client/client_proto_weighing.h>
  4. #include <library/cpp/unified_agent_client/counters.h>
  5. #include <library/cpp/unified_agent_client/logger.h>
  6. #include <library/cpp/unified_agent_client/variant.h>
  7. #include <library/cpp/unified_agent_client/proto/unified_agent.grpc.pb.h>
  8. #include <library/cpp/unified_agent_client/grpc_io.h>
  9. #include <library/cpp/logger/global/global.h>
  10. #include <util/generic/deque.h>
  11. #include <util/system/mutex.h>
  12. namespace NUnifiedAgent::NPrivate {
  13. class TClientSession;
  14. class TGrpcCall;
  15. class TForkProtector;
  16. class TClient: public IClient {
  17. public:
  18. explicit TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector);
  19. ~TClient() override;
  20. TClientSessionPtr CreateSession(const TSessionParameters& parameters) override;
  21. void StartTracing(ELogPriority logPriority) override;
  22. void FinishTracing() override;
  23. inline const TIntrusivePtr<TClientCounters>& GetCounters() const noexcept {
  24. return Counters;
  25. }
  26. inline NUnifiedAgentProto::UnifiedAgentService::Stub& GetStub() noexcept {
  27. return *Stub;
  28. }
  29. TScopeLogger CreateSessionLogger();
  30. inline const TClientParameters& GetParameters() const noexcept {
  31. return Parameters;
  32. }
  33. inline grpc::CompletionQueue& GetCompletionQueue() noexcept {
  34. return ActiveCompletionQueue->GetCompletionQueue();
  35. }
  36. void RegisterSession(TClientSession* session);
  37. void UnregisterSession(TClientSession* session);
  38. void PreFork();
  39. void PostForkParent();
  40. void PostForkChild();
  41. void EnsureStarted();
  42. private:
  43. void EnsureStartedNoLock();
  44. void EnsureStoppedNoLock();
  45. private:
  46. const TClientParameters Parameters;
  47. std::shared_ptr<TForkProtector> ForkProtector;
  48. TIntrusivePtr<TClientCounters> Counters;
  49. TLog Log;
  50. TLogger MainLogger;
  51. TScopeLogger Logger;
  52. std::shared_ptr<grpc::Channel> Channel;
  53. std::unique_ptr<NUnifiedAgentProto::UnifiedAgentService::Stub> Stub;
  54. THolder<TGrpcCompletionQueueHost> ActiveCompletionQueue;
  55. std::atomic<size_t> SessionLogLabel;
  56. TVector<TClientSession*> ActiveSessions;
  57. bool Started;
  58. bool Destroyed;
  59. TAdaptiveLock Lock;
  60. static std::atomic<ui64> Id;
  61. };
  62. class TForkProtector {
  63. public:
  64. TForkProtector();
  65. void Register(TClient& client);
  66. void Unregister(TClient& client);
  67. static std::shared_ptr<TForkProtector> Get(bool createIfNotExists);
  68. private:
  69. static void PreFork();
  70. static void PostForkParent();
  71. static void PostForkChild();
  72. private:
  73. TVector<TClient*> Clients;
  74. grpc::internal::GrpcLibrary GrpcInitializer;
  75. bool Enabled;
  76. TAdaptiveLock Lock;
  77. static std::weak_ptr<TForkProtector> Instance;
  78. static TMutex InstanceLock;
  79. static bool SubscribedToForks;
  80. };
  81. class TClientSession: public IClientSession {
  82. public:
  83. TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters);
  84. ~TClientSession();
  85. void Send(TClientMessage&& message) override;
  86. NThreading::TFuture<void> CloseAsync(TInstant deadline) override;
  87. inline TClient& GetClient() noexcept {
  88. return *Client;
  89. }
  90. inline TScopeLogger& GetLogger() noexcept {
  91. return Logger;
  92. }
  93. inline TClientSessionCounters& GetCounters() noexcept {
  94. return *Counters;
  95. }
  96. inline TAsyncJoiner& GetAsyncJoiner() noexcept {
  97. return AsyncJoiner;
  98. }
  99. void PrepareInitializeRequest(NUnifiedAgentProto::Request& target);
  100. void PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target);
  101. void Acknowledge(ui64 seqNo);
  102. void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo);
  103. void OnGrpcCallFinished();
  104. NThreading::TFuture<void> PreFork();
  105. void PostForkParent();
  106. void PostForkChild();
  107. void SetAgentMaxReceiveMessage(size_t);
  108. private:
  109. enum class EPollingStatus {
  110. Active,
  111. Inactive
  112. };
  113. struct TCloseRequestedEvent {
  114. TInstant Deadline;
  115. };
  116. struct TMessageReceivedEvent {
  117. TClientMessage Message;
  118. size_t Size;
  119. };
  120. struct TPurgeWriteQueueStats {
  121. size_t PurgedMessages{};
  122. size_t PurgedBytes{};
  123. };
  124. using TEvent = std::variant<TCloseRequestedEvent, TMessageReceivedEvent>;
  125. public:
  126. struct TPendingMessage {
  127. TClientMessage Message;
  128. size_t Size;
  129. bool Skipped;
  130. };
  131. class TRequestBuilder {
  132. public:
  133. struct TAddResult;
  134. public:
  135. TRequestBuilder(NUnifiedAgentProto::Request &target, size_t RequestPayloadLimitBytes,
  136. TFMaybe<size_t> serializedRequestLimitBytes);
  137. TAddResult TryAddMessage(const TPendingMessage& message, size_t seqNo);
  138. void ResetCounters();
  139. inline size_t GetSerializedRequestSize() const {
  140. return SerializedRequestSize;
  141. }
  142. inline size_t GetRequestPayloadSize() const {
  143. return RequestPayloadSize;
  144. }
  145. public:
  146. struct TAddResult {
  147. bool LimitExceeded;
  148. size_t NewRequestPayloadSize; // == actual value, if !LimitExceeded
  149. size_t NewSerializedRequestSize; // == actual value, if !LimitExceeded
  150. };
  151. private:
  152. struct TMetaItemBuilder {
  153. size_t ItemIndex;
  154. size_t ValueIndex{0};
  155. };
  156. private:
  157. NUnifiedAgentProto::Request& Target;
  158. TFMaybe<NPW::TRequest> PwTarget;
  159. THashMap<TString, TMetaItemBuilder> MetaItems;
  160. size_t RequestPayloadSize;
  161. size_t RequestPayloadLimitBytes;
  162. size_t SerializedRequestSize;
  163. TFMaybe<size_t> SerializedRequestLimitBytes;
  164. bool CountersInvalid;
  165. };
  166. private:
  167. void MakeGrpcCall();
  168. void DoClose();
  169. void BeginClose(TInstant deadline);
  170. void Poll();
  171. TPurgeWriteQueueStats PurgeWriteQueue();
  172. void DoStart();
  173. private:
  174. TAsyncJoiner AsyncJoiner;
  175. TIntrusivePtr<TClient> Client;
  176. TFMaybe<TString> OriginalSessionId;
  177. TFMaybe<TString> SessionId;
  178. TFMaybe<THashMap<TString, TString>> Meta;
  179. TScopeLogger Logger;
  180. bool CloseStarted;
  181. bool ForcedCloseStarted;
  182. bool Closed;
  183. bool ForkInProgressLocal;
  184. bool Started;
  185. NThreading::TPromise<void> ClosePromise;
  186. TIntrusivePtr<TGrpcCall> ActiveGrpcCall;
  187. TDeque<TPendingMessage> WriteQueue;
  188. size_t TrimmedCount;
  189. size_t NextIndex;
  190. TFMaybe<ui64> AckSeqNo;
  191. TInstant PollerLastEventTimestamp;
  192. TIntrusivePtr<TClientSessionCounters> Counters;
  193. THolder<TGrpcTimer> MakeGrpcCallTimer;
  194. THolder<TGrpcTimer> ForceCloseTimer;
  195. THolder<TGrpcTimer> PollTimer;
  196. ui64 GrpcInflightMessages;
  197. ui64 GrpcInflightBytes;
  198. std::atomic<size_t> InflightBytes;
  199. bool CloseRequested;
  200. size_t EventsBatchSize;
  201. EPollingStatus PollingStatus;
  202. THolder<TGrpcNotification> EventNotification;
  203. bool EventNotificationTriggered;
  204. TVector<TEvent> EventsBatch;
  205. TVector<TEvent> SecondaryEventsBatch;
  206. std::atomic<bool> ForkInProgress;
  207. TAdaptiveLock Lock;
  208. size_t MaxInflightBytes;
  209. TFMaybe<size_t> AgentMaxReceiveMessage;
  210. };
  211. class TGrpcCall final: public TAtomicRefCount<TGrpcCall> {
  212. public:
  213. explicit TGrpcCall(TClientSession& session);
  214. void Start();
  215. ~TGrpcCall();
  216. void BeginClose(bool force);
  217. void Poison();
  218. void NotifyMessageAdded();
  219. inline bool Initialized() const {
  220. return Initialized_;
  221. }
  222. inline bool ReuseSessions() const {
  223. return ReuseSessions_;
  224. }
  225. private:
  226. void EndAccept(EIOStatus status);
  227. void EndRead(EIOStatus status);
  228. void EndWrite(EIOStatus status);
  229. void EndFinish(EIOStatus status);
  230. void EndWritesDone(EIOStatus);
  231. void ScheduleWrite();
  232. void BeginWritesDone();
  233. bool CheckHasError(EIOStatus status, const char* method);
  234. void SetError(const TString& error);
  235. void EnsureFinishStarted();
  236. void BeginRead();
  237. void BeginWrite();
  238. void ScheduleFinishOnError();
  239. private:
  240. TClientSession& Session;
  241. TAsyncJoinerToken AsyncJoinerToken;
  242. THolder<IIOCallback> AcceptTag;
  243. THolder<IIOCallback> ReadTag;
  244. THolder<IIOCallback> WriteTag;
  245. THolder<IIOCallback> WritesDoneTag;
  246. THolder<IIOCallback> FinishTag;
  247. TScopeLogger Logger;
  248. bool AcceptPending;
  249. bool Initialized_;
  250. bool ReadPending;
  251. bool ReadsDone;
  252. bool WritePending;
  253. bool WritesBlocked;
  254. bool WritesDonePending;
  255. bool WritesDone;
  256. bool ErrorOccured;
  257. bool FinishRequested;
  258. bool FinishStarted;
  259. bool FinishDone;
  260. bool Cancelled;
  261. bool Poisoned;
  262. bool PoisonPillSent;
  263. bool ReuseSessions_;
  264. grpc::Status FinishStatus;
  265. grpc::ClientContext ClientContext;
  266. std::unique_ptr<grpc::ClientAsyncReaderWriter<NUnifiedAgentProto::Request, NUnifiedAgentProto::Response>> ReaderWriter;
  267. NUnifiedAgentProto::Request Request;
  268. NUnifiedAgentProto::Response Response;
  269. };
  270. }