123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- #pragma once
- #include <library/cpp/unified_agent_client/counters.h>
- #include <library/cpp/logger/log.h>
- #include <library/cpp/threading/future/future.h>
- #include <util/datetime/base.h>
- #include <util/generic/hash.h>
- #include <util/generic/maybe.h>
- #include <util/generic/string.h>
- namespace NUnifiedAgent {
- struct TClientParameters {
- // uri format https://github.com/grpc/grpc/blob/master/doc/naming.md
- // for example: unix:///unified_agent for unix domain sockets or localhost:12345 for tcp
- explicit TClientParameters(const TString& uri);
- // Simple way to protect against writing to unintended/invalid Unified Agent endpoint.
- // Must correspond to 'shared_secret_key' grpc input parameter
- // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219),
- // session would end with error otherwise.
- //
- // Default: not set
- TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) {
- SharedSecretKey = sharedSecretKey;
- return *this;
- }
- // Max bytes count that have been received by client session but not acknowledged yet.
- // When exceeded, new messages will be discarded, an error message
- // will be written to the TLog instance and drop counter will be incremented.
- //
- // Default: 10 mb
- TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
- MaxInflightBytes = maxInflightBytes;
- return *this;
- }
- // TLog instance for client library's own logs.
- //
- // Default: TLoggerOperator<TGlobalLog>::Log()
- TClientParameters& SetLog(TLog log) {
- Log = std::move(log);
- return *this;
- }
- // Throttle client library log by rate limit in bytes, excess will be discarded.
- //
- // Default: not set
- TClientParameters& SetLogRateLimit(size_t bytesPerSec) {
- LogRateLimitBytes = bytesPerSec;
- return *this;
- }
- // Try to establish new grpc session if the current one become broken.
- // Session may break either due to agent unavailability, or the agent itself may
- // reject new session creation if it does not satisfy certain
- // conditions - shared_secret_key does not match, the session creation rate has been
- // exceeded, invalid session metadata has been used and so on.
- // Attempts to establish a grpc session will continue indefinitely.
- //
- // Default: 50 millis
- TClientParameters& SetGrpcReconnectDelay(TDuration delay) {
- GrpcReconnectDelay = delay;
- return *this;
- }
- // Grpc usually writes data to the socket faster than it comes from the client.
- // This means that it's possible that each TClientMessage would be sent in it's own grpc message.
- // This is expensive in terms of cpu, since grpc makes at least one syscall
- // for each message on the sender and receiver sides.
- // To avoid a large number of syscalls, the client holds incoming messages
- // in internal buffer in hope of being able to assemble bigger grpc batch.
- // This parameter sets the timeout for this delay - from IClientSession::Send
- // call to the actual sending of the corresponding grpc message.
- //
- // Default: 10 millis.
- TClientParameters& SetGrpcSendDelay(TDuration delay) {
- GrpcSendDelay = delay;
- return *this;
- }
- // Client library sends messages to grpc in batches, this parameter
- // establishes upper limit on the size of single batch in bytes.
- // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185)
- // in grpc input config, it must be grater than GrpcMaxMessageSize.
- //
- // Default: 1 mb
- TClientParameters& SetGrpcMaxMessageSize(size_t size) {
- GrpcMaxMessageSize = size;
- return *this;
- }
- // Enable forks handling in client library.
- // Multiple threads and concurrent forks are all supported is this regime.
- //
- // Default: false
- TClientParameters& SetEnableForkSupport(bool value) {
- EnableForkSupport = value;
- return *this;
- }
- // Client library counters.
- // App can set this to some leaf of it's TDynamicCounters tree.
- // Actual provided counters are listed in TClientCounters.
- //
- // Default: not set
- TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
- return SetCounters(MakeIntrusive<TClientCounters>(counters));
- }
- TClientParameters& SetCounters(const TIntrusivePtr<TClientCounters>& counters) {
- Counters = counters;
- return *this;
- }
- public:
- static const size_t DefaultMaxInflightBytes;
- static const size_t DefaultGrpcMaxMessageSize;
- static const TDuration DefaultGrpcSendDelay;
- public:
- TString Uri;
- TMaybe<TString> SharedSecretKey;
- size_t MaxInflightBytes;
- TLog Log;
- TMaybe<size_t> LogRateLimitBytes;
- TDuration GrpcReconnectDelay;
- TDuration GrpcSendDelay;
- bool EnableForkSupport;
- size_t GrpcMaxMessageSize;
- TIntrusivePtr<TClientCounters> Counters;
- };
- struct TSessionParameters {
- TSessionParameters();
- // Session unique identifier.
- // It's guaranteed that for messages with the same sessionId relative
- // ordering of the messages will be preserved at all processing stages
- // in library, in Unified Agent and in other systems that respect ordering (e.g., Logbroker)
- //
- // Default: generated automatically by Unified Agent.
- TSessionParameters& SetSessionId(const TString& sessionId) {
- SessionId = sessionId;
- return *this;
- }
- // Session metadata as key-value set.
- // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
- //
- // Default: not set
- TSessionParameters& SetMeta(const THashMap<TString, TString>& meta) {
- Meta = meta;
- return *this;
- }
- // Session counters.
- // Actual provided counters are listed in TClientSessionCounters.
- //
- // Default: A single common for all sessions subgroup of client TDynamicCounters instance
- // with label ('session': 'default').
- TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
- return SetCounters(MakeIntrusive<TClientSessionCounters>(counters));
- }
- TSessionParameters& SetCounters(const TIntrusivePtr<TClientSessionCounters>& counters) {
- Counters = counters;
- return *this;
- }
- // Max bytes count that have been received by client session but not acknowledged yet.
- // When exceeded, new messages will be discarded, an error message
- // will be written to the TLog instance and drop counter will be incremented.
- //
- // Default: value from client settings
- TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
- MaxInflightBytes = maxInflightBytes;
- return *this;
- }
- public:
- TMaybe<TString> SessionId;
- TMaybe<THashMap<TString, TString>> Meta;
- TIntrusivePtr<TClientSessionCounters> Counters;
- TMaybe<size_t> MaxInflightBytes;
- };
- // Message data to be sent to unified agent.
- struct TClientMessage {
- // Opaque message payload.
- TString Payload;
- // Message metadata as key-value set.
- // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
- //
- // Default: not set
- TMaybe<THashMap<TString, TString>> Meta{};
- // Message timestamp.
- //
- // Default: time the client library has received this instance of TClientMessage.
- TMaybe<TInstant> Timestamp{};
- };
- // Message size as it is accounted in byte-related metrics (ReceivedBytes, InflightBytes, etc).
- size_t SizeOf(const TClientMessage& message);
- class IClientSession: public TAtomicRefCount<IClientSession> {
- public:
- virtual ~IClientSession() = default;
- // Places the message into send queue. Actual grpc call may occur later asynchronously,
- // based on settings GrpcSendDelay and GrpcMaxMessageSize.
- // A message can be discarded if the limits defined by the GrpcMaxMessageSize and MaxInflightBytes
- // settings are exceeded, or if the Close method has already been called.
- // In this case an error message will be written to the TLog instance
- // and drop counter will be incremented.
- virtual void Send(TClientMessage&& message) = 0;
- void Send(const TClientMessage& message) {
- Send(TClientMessage(message));
- }
- // Waits until either all current inflight messages are
- // acknowledged or the specified deadline is reached.
- // Upon the deadline grpc connection would be forcefully dropped (via grpc::ClientContext::TryCancel).
- virtual NThreading::TFuture<void> CloseAsync(TInstant deadline) = 0;
- void Close(TInstant deadline) {
- CloseAsync(deadline).Wait();
- }
- void Close(TDuration timeout = TDuration::Seconds(3)) {
- Close(Now() + timeout);
- }
- };
- using TClientSessionPtr = TIntrusivePtr<IClientSession>;
- class IClient: public TAtomicRefCount<IClient> {
- public:
- virtual ~IClient() = default;
- virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0;
- virtual void StartTracing(ELogPriority) {
- }
- virtual void FinishTracing() {
- }
- };
- using TClientPtr = TIntrusivePtr<IClient>;
- TClientPtr MakeClient(const TClientParameters& parameters);
- }
|