#pragma once #include #include #include #include #include #include #include namespace NUnifiedAgent { struct TClientParameters { // uri format https://github.com/grpc/grpc/blob/master/doc/naming.md // for example: unix:///unified_agent for unix domain sockets or localhost:12345 for tcp explicit TClientParameters(const TString& uri); // Simple way to protect against writing to unintended/invalid Unified Agent endpoint. // Must correspond to 'shared_secret_key' grpc input parameter // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219), // session would end with error otherwise. // // Default: not set TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) { SharedSecretKey = sharedSecretKey; return *this; } // Max bytes count that have been received by client session but not acknowledged yet. // When exceeded, new messages will be discarded, an error message // will be written to the TLog instance and drop counter will be incremented. // // Default: 10 mb TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) { MaxInflightBytes = maxInflightBytes; return *this; } // TLog instance for client library's own logs. // // Default: TLoggerOperator::Log() TClientParameters& SetLog(TLog log) { Log = std::move(log); return *this; } // Throttle client library log by rate limit in bytes, excess will be discarded. // // Default: not set TClientParameters& SetLogRateLimit(size_t bytesPerSec) { LogRateLimitBytes = bytesPerSec; return *this; } // Try to establish new grpc session if the current one become broken. // Session may break either due to agent unavailability, or the agent itself may // reject new session creation if it does not satisfy certain // conditions - shared_secret_key does not match, the session creation rate has been // exceeded, invalid session metadata has been used and so on. // Attempts to establish a grpc session will continue indefinitely. // // Default: 50 millis TClientParameters& SetGrpcReconnectDelay(TDuration delay) { GrpcReconnectDelay = delay; return *this; } // Grpc usually writes data to the socket faster than it comes from the client. // This means that it's possible that each TClientMessage would be sent in it's own grpc message. // This is expensive in terms of cpu, since grpc makes at least one syscall // for each message on the sender and receiver sides. // To avoid a large number of syscalls, the client holds incoming messages // in internal buffer in hope of being able to assemble bigger grpc batch. // This parameter sets the timeout for this delay - from IClientSession::Send // call to the actual sending of the corresponding grpc message. // // Default: 10 millis. TClientParameters& SetGrpcSendDelay(TDuration delay) { GrpcSendDelay = delay; return *this; } // Client library sends messages to grpc in batches, this parameter // establishes upper limit on the size of single batch in bytes. // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185) // in grpc input config, it must be grater than GrpcMaxMessageSize. // // Default: 1 mb TClientParameters& SetGrpcMaxMessageSize(size_t size) { GrpcMaxMessageSize = size; return *this; } // Enable forks handling in client library. // Multiple threads and concurrent forks are all supported is this regime. // // Default: false TClientParameters& SetEnableForkSupport(bool value) { EnableForkSupport = value; return *this; } // Client library counters. // App can set this to some leaf of it's TDynamicCounters tree. // Actual provided counters are listed in TClientCounters. // // Default: not set TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) { return SetCounters(MakeIntrusive(counters)); } TClientParameters& SetCounters(const TIntrusivePtr& counters) { Counters = counters; return *this; } public: static const size_t DefaultMaxInflightBytes; static const size_t DefaultGrpcMaxMessageSize; static const TDuration DefaultGrpcSendDelay; public: TString Uri; TMaybe SharedSecretKey; size_t MaxInflightBytes; TLog Log; TMaybe LogRateLimitBytes; TDuration GrpcReconnectDelay; TDuration GrpcSendDelay; bool EnableForkSupport; size_t GrpcMaxMessageSize; TIntrusivePtr Counters; }; struct TSessionParameters { TSessionParameters(); // Session unique identifier. // It's guaranteed that for messages with the same sessionId relative // ordering of the messages will be preserved at all processing stages // in library, in Unified Agent and in other systems that respect ordering (e.g., Logbroker) // // Default: generated automatically by Unified Agent. TSessionParameters& SetSessionId(const TString& sessionId) { SessionId = sessionId; return *this; } // Session metadata as key-value set. // Can be used by agent filters and outputs for validation/routing/enrichment/etc. // // Default: not set TSessionParameters& SetMeta(const THashMap& meta) { Meta = meta; return *this; } // Session counters. // Actual provided counters are listed in TClientSessionCounters. // // Default: A single common for all sessions subgroup of client TDynamicCounters instance // with label ('session': 'default'). TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) { return SetCounters(MakeIntrusive(counters)); } TSessionParameters& SetCounters(const TIntrusivePtr& counters) { Counters = counters; return *this; } // Max bytes count that have been received by client session but not acknowledged yet. // When exceeded, new messages will be discarded, an error message // will be written to the TLog instance and drop counter will be incremented. // // Default: value from client settings TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) { MaxInflightBytes = maxInflightBytes; return *this; } public: TMaybe SessionId; TMaybe> Meta; TIntrusivePtr Counters; TMaybe MaxInflightBytes; }; // Message data to be sent to unified agent. struct TClientMessage { // Opaque message payload. TString Payload; // Message metadata as key-value set. // Can be used by agent filters and outputs for validation/routing/enrichment/etc. // // Default: not set TMaybe> Meta{}; // Message timestamp. // // Default: time the client library has received this instance of TClientMessage. TMaybe Timestamp{}; }; // Message size as it is accounted in byte-related metrics (ReceivedBytes, InflightBytes, etc). size_t SizeOf(const TClientMessage& message); class IClientSession: public TAtomicRefCount { public: virtual ~IClientSession() = default; // Places the message into send queue. Actual grpc call may occur later asynchronously, // based on settings GrpcSendDelay and GrpcMaxMessageSize. // A message can be discarded if the limits defined by the GrpcMaxMessageSize and MaxInflightBytes // settings are exceeded, or if the Close method has already been called. // In this case an error message will be written to the TLog instance // and drop counter will be incremented. virtual void Send(TClientMessage&& message) = 0; void Send(const TClientMessage& message) { Send(TClientMessage(message)); } // Waits until either all current inflight messages are // acknowledged or the specified deadline is reached. // Upon the deadline grpc connection would be forcefully dropped (via grpc::ClientContext::TryCancel). virtual NThreading::TFuture CloseAsync(TInstant deadline) = 0; void Close(TInstant deadline) { CloseAsync(deadline).Wait(); } void Close(TDuration timeout = TDuration::Seconds(3)) { Close(Now() + timeout); } }; using TClientSessionPtr = TIntrusivePtr; class IClient: public TAtomicRefCount { public: virtual ~IClient() = default; virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0; virtual void StartTracing(ELogPriority) { } virtual void FinishTracing() { } }; using TClientPtr = TIntrusivePtr; TClientPtr MakeClient(const TClientParameters& parameters); }