syntax = "proto3"; import "google/protobuf/descriptor.proto"; package NUnifiedAgentProto; option java_package = "com.yandex.unified_agent"; option go_package = "a.yandex-team.ru/library/cpp/unified_agent_client/proto;unifiedagent"; extend google.protobuf.FileOptions { bool GenerateYaStyle = 66777; } message Request { message SessionMetaItem { string name = 1; string value = 2; } message Initialize { // Session_id provided by server, use it in case of reconnects. string session_id = 1; // Session metadata repeated SessionMetaItem meta = 2; string shared_secret_key = 3; } message MessageMetaItem { // Arbitrary key-value pairs. Can be used by agent filters to modify/filter messages // or to route them to target outputs. // Meta items of all messages should be grouped by meta key, it's expected in the 'key' field. // Meta values should be passed in the 'value' sequence, it corresponds to the payload // sequence from DataBatch. If some messages don't have a meta with this key, the range of such messages // can be passed via skip_start/skip_length sequences. // For example, [{m:v1}, {}, {}, {m: v2}, {}, {m: v3}, {}, {}] can be represented as follows: // key: 'm' // value: ['v1', 'v2', 'v3'] // skip_start: [1, 4] // skip_length: [2, 1] string key = 1; repeated string value = 2; repeated uint32 skip_start = 3; repeated uint32 skip_length = 4; } message DataBatch { repeated uint64 seq_no = 1; repeated uint64 timestamp = 2; //microseconds repeated bytes payload = 100; repeated MessageMetaItem meta = 101; } oneof request { Initialize initialize = 1; DataBatch data_batch = 2; } } message Response { message Initialized { // Session identifier for log and deduplication purposes. string session_id = 1; // Application can skip all formed messages by seq_no upto last_seq_no - they are consumed by server. uint64 last_seq_no = 2; } message Ack { uint64 seq_no = 1; } oneof response { Initialized initialized = 1; Ack ack = 2; } } service UnifiedAgentService { rpc Session(stream Request) returns (stream Response); } // dataflow: // Request.initialize -> UnifiedAgent; // specify session_id when this is a retry. Сlient can already have sesison_id from previous init response, // or it can use some pregenerated sessionId for each session. // Response.initializeded -> client; // Request.entry -> UnifiedAgent; // .... // Response.ack -> client; // when this record is consumed by UnifiedAgent with choosen garanties UnifiedAgent will send ack to client; // client can forget about this log record now // // grpc finish session -> client; // something went wrong; client must reconnect and retry all not acknowleged records // // Exactly once retries - when reconnect, client must provide previous session_id and same seq_no`s // for records - only in this case UnifiedAgent can dedup.