Browse Source

alter with partial fields usage

ref:614b6696343378fc384a9465085d98e2f5651c75
alexnick 2 years ago
parent
commit
fd94790c9e

+ 1 - 0
ydb/public/api/protos/CMakeLists.txt

@@ -45,6 +45,7 @@ target_proto_messages(api-protos PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_table.proto
   ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_value.proto
   ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_s3_internal.proto
+  ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_topic.proto
   ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/yq.proto
 )
 generate_enum_serilization(api-protos

+ 1 - 0
ydb/public/api/protos/ydb_scheme.proto

@@ -58,6 +58,7 @@ message Entry {
         COLUMN_TABLE = 13;
         SEQUENCE = 15;
         REPLICATION = 16;
+        TOPIC = 17;
     }
 
     // Name of scheme entry (dir2 of /dir1/dir2)

+ 101 - 131
ydb/public/api/protos/ydb_topic.proto

@@ -3,11 +3,12 @@ import "ydb/public/api/protos/ydb_operation.proto";
 import "ydb/public/api/protos/ydb_scheme.proto";
 import "ydb/public/api/protos/ydb_status_codes.proto";
 import "ydb/public/api/protos/ydb_issue_message.proto";
-import "ydb/public/api/protos/annotations/validation.proto";
 
 import "google/protobuf/duration.proto";
 import "google/protobuf/timestamp.proto";
 
+import "google/protobuf/wrappers.proto";
+
 package Ydb.Topic;
 
 option java_package = "com.yandex.ydb.topic";
@@ -99,8 +100,7 @@ message StreamWriteMessage {
         int64 partition_id = 3;
 
         // Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise.
-        // See enum Codec above for values.
-        repeated int64 supported_codecs = 10;
+        SupportedCodecs supported_codecs = 4;
     }
 
     // Represents portion of client messages.
@@ -425,9 +425,11 @@ message StreamReadMessage {
         // InitRequest.max_lag and InitRequest.read_from could lead to skip of more messages.
         // Server will return data starting from offset that is maximum of actual committed offset, read_offset (if set)
         // and offsets calculated from InitRequest.max_lag and InitRequest.read_from.
-        optional int64 read_offset = 2;
+//        optional int64 read_offset = 2;
+        google.protobuf.Int64Value read_offset = 2;
         // All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet.
-        optional int64 commit_offset = 3;
+        //optional int64 commit_offset = 3;
+        google.protobuf.Int64Value commit_offset = 3;
     }
 
     // Command from server to stop and destroy concrete partition session.
@@ -474,6 +476,55 @@ message DropTopicResponse {
 message DropTopicResult {
 }
 
+// Description of supported codecs.
+message SupportedCodecs {
+    // List of supported codecs.
+    // See enum Codec above for values.
+    repeated int32 codecs = 1;
+}
+
+// Consumer description.
+message Consumer {
+    // Must have valid not empty name as a key.
+    string name = 1;
+    // Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
+    // User should take care that such consumer never stalls, to prevent running out of disk space.
+    // Flag that this consumer is important.
+    bool important = 2;
+    // All messages with smaller server written_at timestamp will be skipped.
+    google.protobuf.Timestamp read_from = 3;
+    reserved 4; // supported_format
+    // List of supported codecs by this consumer.
+    // supported_codecs on topic must be contained inside this list.
+    SupportedCodecs supported_codecs = 5;
+
+    // Attributes of consumer
+    map<string, string> attributes = 6;
+}
+
+
+// Consumer alter description.
+message AlterConsumer {
+    // Must have valid not empty name as a key.
+    string name = 1;
+    // Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
+    // User should take care that such consumer never stalls, to prevent running out of disk space.
+    // Flag that this consumer is important.
+//    optional bool important = 2;
+    google.protobuf.BoolValue important = 2;
+    // All messages with smaller server written_at timestamp will be skipped.
+    google.protobuf.Timestamp read_from = 3;
+    reserved 4; // supported_format
+    // List of supported codecs by this consumer.
+    // supported_codecs on topic must be contained inside this list.
+    SupportedCodecs supported_codecs = 5;
+
+    // User and server attributes of consumer. Server attributes starts from "_" and will be validated by server.
+    // Leave the value blank to drop an attribute.
+    map<string, string> alter_attributes = 6;
+}
+
+
 // Message for describing topic internals.
 message TopicSettings {
     oneof partitions {
@@ -483,44 +534,24 @@ message TopicSettings {
     }
     // How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
     // Default limit - 36 hours.
-    google.protobuf.Duration retention_period = 2;
-    // How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
-    int64 retention_storage_mb = 3;
-    reserved 4; // supported_format.
+    google.protobuf.Duration retention_period = 3;
+    // How much data in partition should be stored. Must be greater than 0 and less than limit for this database. Zero means infinite limit.
+    int64 retention_storage_mb = 4;
+    reserved 5; // supported_format.
     // List of allowed codecs for writers.
-    // See enum Codec above for values.
     // Writes with codec not from this list are forbidden.
-    repeated int64 supported_codecs = 5;
-    // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s.
-    int64 partition_write_speed_bytes_per_second = 6;
-    // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB.
-    int64 partition_write_burst_bytes = 7;
+    SupportedCodecs supported_codecs = 6;
+    // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s - used when value is zero.
+    int64 partition_write_speed_bytes_per_second = 7;
+    // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB - used when value is zero.
+    int64 partition_write_burst_bytes = 8;
 
     // User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
-    map<string, string> attributes = 8;
+    map<string, string> attributes = 9;
 
     // List of consumers for this topic.
-    repeated Consumer consumers = 9;
-
-    // Consumer description.
-    message Consumer {
-        // Must have valid not empty name as a key.
-        string name = 1;
-        // Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
-        // User should take care that such consumer never stalls, to prevent running out of disk space.
-        // Flag that this consumer is important.
-        bool important = 2;
-        // All messages with smaller server written_at timestamp will be skipped.
-        google.protobuf.Timestamp read_from = 3;
-        reserved 4; // supported_format
-        // List of supported codecs by this consumer.
-        // See enum Codec above for values.
-        // supported_codecs on topic must be contained inside this list.
-        repeated int64 supported_codecs = 5;
+    repeated Consumer consumers = 10;
 
-        // Attributes of consumer
-        map<string, string> attributes = 6;
-    }
 }
 
 // Create topic request sent from client to server.
@@ -528,8 +559,9 @@ message CreateTopicRequest {
     Ydb.Operations.OperationParams operation_params = 1;
     // Topic path.
     string path = 2;
+
     // Topic settings.
-    TopicSettings settings = 4;
+    TopicSettings settings = 3;
 }
 
 
@@ -560,45 +592,42 @@ message AlterTopicResponse {
 
 // Update topic result message that will be inside UpdateTopicResponse.operation.
 message AlterTopicResult {
-}
-
-// Add consumer for existing topic request.
-message AddConsumerRequest {
-    Ydb.Operations.OperationParams operation_params = 1;
-    // Topic path.
-    string path = 2;
-    // consumers to add
-    TopicSettings.Consumer consumer = 3;
-}
-
-// Add consumer for existing topic response.
-message AddConsumerResponse {
-    // Result of request will be inside operation.
-    Ydb.Operations.Operation operation = 1;
-}
-
-// Add consumer result message that will be inside AddConsumerReponse.operation.
-message AddConsumerResult {
-}
+    oneof partitions {
+        // How many uniform partitions in topic. Must less than database limit. Default limit - 10.
+        int32 partitions_count = 1;
+        // 2 reserved for partition_at_keys
+    }
+    // How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
+    // Default limit - 36 hours.
+    google.protobuf.Duration retention_period = 3;
+    // How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
+//    optional int64 retention_storage_mb = 4;
+    google.protobuf.Int64Value retention_storage_mb = 4;
+    reserved 5; // supported_format.
+    // List of allowed codecs for writers.
+    // See enum Codec above for values.
+    // Writes with codec not from this list are forbidden.
+    SupportedCodecs supported_codecs = 6;
 
-// Drop consumer request for existing topic.
-message DropConsumerRequest {
-    Ydb.Operations.OperationParams operation_params = 1;
-    // Topic path.
-    string path = 2;
-    // Name of consumer to drop.
-    string consumer = 3;
-}
+    // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s.
+//    optional int64 partition_write_speed_bytes_per_second = 7;
+    google.protobuf.Int64Value partition_write_speed_bytes_per_second = 7;
+    // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB.
+//    optional int64 partition_write_burst_bytes = 8;
+    google.protobuf.Int64Value partition_write_burst_bytes = 8;
 
-// Drop consumer response for existing topic.
-message DropConsumerResponse {
-    // Result of request will be inside operation.
-    Ydb.Operations.Operation operation = 1;
+    // User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
+    // Leave the value blank to drop an attribute.
+    map<string, string> alter_attributes = 9;
+
+    // Add consumers.
+    repeated Consumer add_consumers = 10;
+    // Remove consumers (by its names)
+    repeated string drop_consumers = 11;
+    // Alter consumers
+    repeated AlterConsumer alter_consumers = 12;
 }
 
-// Drop consumer result message that will be inside DropConsumerReponse.operation.
-message DropConsumerResult {
-}
 
 // Describe topic request sent from client to server.
 message DescribeTopicRequest {
@@ -606,14 +635,6 @@ message DescribeTopicRequest {
 
     // Topic path.
     string path = 2;
-
-    // If this message is present, response will include runtime topic statistics.
-    IncludeStats include_stats = 3;
-
-    message IncludeStats {
-        // Consumer statistics for reading this topic may be included in response.
-        string consumer = 1;
-    }
 }
 
 // Describe topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR".
@@ -628,55 +649,4 @@ message DescribeTopicResult {
     Ydb.Scheme.Entry self = 1;
     // Settings of topic.
     TopicSettings settings = 2;
-
-    // Message containing information about concrete topic reading, if requested.
-    TopicStats topic_stats = 3;
-
-    message TopicStats {
-        repeated PartitionStats partition_stats = 1;
-
-        // Message containing information about concrete topic's partition reading.
-        message PartitionStats {
-            // Patition identifier inside topic.
-            int64 partition_id = 1;
-
-            // Partition contains messages with offsets in range [start, end).
-            OffsetsRange partition_offsets = 2;
-
-            // Host name of node where partition leader is running.
-            string tablet_node = 3;
-
-            // Statistics of particular consumer, if requested.
-            ConsumerStats consumer_stats = 4;
-
-            message ConsumerStats {
-                // Offset of consumer committed message a.k.a. first not processed message.
-                // If commit_offset == end_offset then all messages from partition are processed.
-                int64 commit_offset = 2;
-                // Consumer lag in time between committed and last messages in partition.
-                int64 commit_time_lag_ms = 3;
-
-                // Offset ranges client wants to commit, but server is waiting for commits of gaps.
-                repeated OffsetsRange out_of_order_commit_offset_ranges = 4;
-
-                // Offset of first not read message by consumer from this partition.
-                // read_offset can be bigger that committed_offset - consumer could read some messages but not yet commit them.
-                int64 read_offset = 5;
-                // Consumer lag in time between read and last messages in partition.
-                int64 read_time_lag_ms = 6;
-
-                // Session identifier that locked and reading this partition right now.
-                string session_id = 7;
-                // Ip of node that created reading this session.
-                string client_ip = 8;
-                // Host name of proxy node that processing this reading session.
-                string proxy_node = 9;
-
-                // Assign identifier of actual partition assignment.
-                int64 partition_session_id = 10;
-                // Timestamp of partition session start.
-                google.protobuf.Timestamp partition_session_started_at = 11;
-            }
-        }
-    }
 }

+ 8 - 0
ydb/public/lib/validation/helpers.cpp

@@ -5,6 +5,7 @@
 #include <google/protobuf/duration.pb.h>
 #include <google/protobuf/empty.pb.h>
 #include <google/protobuf/timestamp.pb.h>
+#include <google/protobuf/wrappers.pb.h>
 
 #include <util/string/builder.h>
 #include <util/string/subst.h>
@@ -55,6 +56,13 @@ bool IsCustomMessage(const google::protobuf::Descriptor* message) {
     if (message->full_name() == google::protobuf::Timestamp::descriptor()->full_name()) {
         return false;
     }
+    if (message->full_name() == google::protobuf::Int64Value::descriptor()->full_name()) {
+        return false;
+    }
+    if (message->full_name() == google::protobuf::BoolValue::descriptor()->full_name()) {
+        return false;
+    }
+
     if (message->options().map_entry()) {
         return false;
     }