client.h 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. #pragma once
  2. #include <library/cpp/unified_agent_client/counters.h>
  3. #include <library/cpp/logger/log.h>
  4. #include <library/cpp/threading/future/future.h>
  5. #include <util/datetime/base.h>
  6. #include <util/generic/hash.h>
  7. #include <util/generic/maybe.h>
  8. #include <util/generic/string.h>
  9. namespace NUnifiedAgent {
  10. struct TClientParameters {
  11. // uri format https://github.com/grpc/grpc/blob/master/doc/naming.md
  12. // for example: unix:///unified_agent for unix domain sockets or localhost:12345 for tcp
  13. explicit TClientParameters(const TString& uri);
  14. // Simple way to protect against writing to unintended/invalid Unified Agent endpoint.
  15. // Must correspond to 'shared_secret_key' grpc input parameter
  16. // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219),
  17. // session would end with error otherwise.
  18. //
  19. // Default: not set
  20. TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) {
  21. SharedSecretKey = sharedSecretKey;
  22. return *this;
  23. }
  24. // Max bytes count that have been received by client session but not acknowledged yet.
  25. // When exceeded, new messages will be discarded, an error message
  26. // will be written to the TLog instance and drop counter will be incremented.
  27. //
  28. // Default: 10 mb
  29. TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
  30. MaxInflightBytes = maxInflightBytes;
  31. return *this;
  32. }
  33. // TLog instance for client library's own logs.
  34. //
  35. // Default: TLoggerOperator<TGlobalLog>::Log()
  36. TClientParameters& SetLog(TLog log) {
  37. Log = std::move(log);
  38. return *this;
  39. }
  40. // Throttle client library log by rate limit in bytes, excess will be discarded.
  41. //
  42. // Default: not set
  43. TClientParameters& SetLogRateLimit(size_t bytesPerSec) {
  44. LogRateLimitBytes = bytesPerSec;
  45. return *this;
  46. }
  47. // Try to establish new grpc session if the current one become broken.
  48. // Session may break either due to agent unavailability, or the agent itself may
  49. // reject new session creation if it does not satisfy certain
  50. // conditions - shared_secret_key does not match, the session creation rate has been
  51. // exceeded, invalid session metadata has been used and so on.
  52. // Attempts to establish a grpc session will continue indefinitely.
  53. //
  54. // Default: 50 millis
  55. TClientParameters& SetGrpcReconnectDelay(TDuration delay) {
  56. GrpcReconnectDelay = delay;
  57. return *this;
  58. }
  59. // Grpc usually writes data to the socket faster than it comes from the client.
  60. // This means that it's possible that each TClientMessage would be sent in it's own grpc message.
  61. // This is expensive in terms of cpu, since grpc makes at least one syscall
  62. // for each message on the sender and receiver sides.
  63. // To avoid a large number of syscalls, the client holds incoming messages
  64. // in internal buffer in hope of being able to assemble bigger grpc batch.
  65. // This parameter sets the timeout for this delay - from IClientSession::Send
  66. // call to the actual sending of the corresponding grpc message.
  67. //
  68. // Default: 10 millis.
  69. TClientParameters& SetGrpcSendDelay(TDuration delay) {
  70. GrpcSendDelay = delay;
  71. return *this;
  72. }
  73. // Client library sends messages to grpc in batches, this parameter
  74. // establishes upper limit on the size of single batch in bytes.
  75. // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185)
  76. // in grpc input config, it must be grater than GrpcMaxMessageSize.
  77. //
  78. // Default: 1 mb
  79. TClientParameters& SetGrpcMaxMessageSize(size_t size) {
  80. GrpcMaxMessageSize = size;
  81. return *this;
  82. }
  83. // Enable forks handling in client library.
  84. // Multiple threads and concurrent forks are all supported is this regime.
  85. //
  86. // Default: false
  87. TClientParameters& SetEnableForkSupport(bool value) {
  88. EnableForkSupport = value;
  89. return *this;
  90. }
  91. // Client library counters.
  92. // App can set this to some leaf of it's TDynamicCounters tree.
  93. // Actual provided counters are listed in TClientCounters.
  94. //
  95. // Default: not set
  96. TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
  97. return SetCounters(MakeIntrusive<TClientCounters>(counters));
  98. }
  99. TClientParameters& SetCounters(const TIntrusivePtr<TClientCounters>& counters) {
  100. Counters = counters;
  101. return *this;
  102. }
  103. public:
  104. static const size_t DefaultMaxInflightBytes;
  105. static const size_t DefaultGrpcMaxMessageSize;
  106. static const TDuration DefaultGrpcSendDelay;
  107. public:
  108. TString Uri;
  109. TMaybe<TString> SharedSecretKey;
  110. size_t MaxInflightBytes;
  111. TLog Log;
  112. TMaybe<size_t> LogRateLimitBytes;
  113. TDuration GrpcReconnectDelay;
  114. TDuration GrpcSendDelay;
  115. bool EnableForkSupport;
  116. size_t GrpcMaxMessageSize;
  117. TIntrusivePtr<TClientCounters> Counters;
  118. };
  119. struct TSessionParameters {
  120. TSessionParameters();
  121. // Session unique identifier.
  122. // It's guaranteed that for messages with the same sessionId relative
  123. // ordering of the messages will be preserved at all processing stages
  124. // in library, in Unified Agent and in other systems that respect ordering (e.g., Logbroker)
  125. //
  126. // Default: generated automatically by Unified Agent.
  127. TSessionParameters& SetSessionId(const TString& sessionId) {
  128. SessionId = sessionId;
  129. return *this;
  130. }
  131. // Session metadata as key-value set.
  132. // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
  133. //
  134. // Default: not set
  135. TSessionParameters& SetMeta(const THashMap<TString, TString>& meta) {
  136. Meta = meta;
  137. return *this;
  138. }
  139. // Session counters.
  140. // Actual provided counters are listed in TClientSessionCounters.
  141. //
  142. // Default: A single common for all sessions subgroup of client TDynamicCounters instance
  143. // with label ('session': 'default').
  144. TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
  145. return SetCounters(MakeIntrusive<TClientSessionCounters>(counters));
  146. }
  147. TSessionParameters& SetCounters(const TIntrusivePtr<TClientSessionCounters>& counters) {
  148. Counters = counters;
  149. return *this;
  150. }
  151. // Max bytes count that have been received by client session but not acknowledged yet.
  152. // When exceeded, new messages will be discarded, an error message
  153. // will be written to the TLog instance and drop counter will be incremented.
  154. //
  155. // Default: value from client settings
  156. TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
  157. MaxInflightBytes = maxInflightBytes;
  158. return *this;
  159. }
  160. public:
  161. TMaybe<TString> SessionId;
  162. TMaybe<THashMap<TString, TString>> Meta;
  163. TIntrusivePtr<TClientSessionCounters> Counters;
  164. TMaybe<size_t> MaxInflightBytes;
  165. };
  166. // Message data to be sent to unified agent.
  167. struct TClientMessage {
  168. // Opaque message payload.
  169. TString Payload;
  170. // Message metadata as key-value set.
  171. // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
  172. //
  173. // Default: not set
  174. TMaybe<THashMap<TString, TString>> Meta{};
  175. // Message timestamp.
  176. //
  177. // Default: time the client library has received this instance of TClientMessage.
  178. TMaybe<TInstant> Timestamp{};
  179. };
  180. // Message size as it is accounted in byte-related metrics (ReceivedBytes, InflightBytes, etc).
  181. size_t SizeOf(const TClientMessage& message);
  182. class IClientSession: public TAtomicRefCount<IClientSession> {
  183. public:
  184. virtual ~IClientSession() = default;
  185. // Places the message into send queue. Actual grpc call may occur later asynchronously,
  186. // based on settings GrpcSendDelay and GrpcMaxMessageSize.
  187. // A message can be discarded if the limits defined by the GrpcMaxMessageSize and MaxInflightBytes
  188. // settings are exceeded, or if the Close method has already been called.
  189. // In this case an error message will be written to the TLog instance
  190. // and drop counter will be incremented.
  191. virtual void Send(TClientMessage&& message) = 0;
  192. void Send(const TClientMessage& message) {
  193. Send(TClientMessage(message));
  194. }
  195. // Waits until either all current inflight messages are
  196. // acknowledged or the specified deadline is reached.
  197. // Upon the deadline grpc connection would be forcefully dropped (via grpc::ClientContext::TryCancel).
  198. virtual NThreading::TFuture<void> CloseAsync(TInstant deadline) = 0;
  199. void Close(TInstant deadline) {
  200. CloseAsync(deadline).Wait();
  201. }
  202. void Close(TDuration timeout = TDuration::Seconds(3)) {
  203. Close(Now() + timeout);
  204. }
  205. };
  206. using TClientSessionPtr = TIntrusivePtr<IClientSession>;
  207. class IClient: public TAtomicRefCount<IClient> {
  208. public:
  209. virtual ~IClient() = default;
  210. virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0;
  211. virtual void StartTracing(ELogPriority) {
  212. }
  213. virtual void FinishTracing() {
  214. }
  215. };
  216. using TClientPtr = TIntrusivePtr<IClient>;
  217. TClientPtr MakeClient(const TClientParameters& parameters);
  218. }