Browse Source

restrictions on supported codecs inside ydb LOGBROKER-7533

ref:3d370cafe7740d66ccab1b9bb80d74b79c3d4562
alexnick 2 years ago
parent
commit
d035a83fda

+ 1 - 1
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp

@@ -12,7 +12,7 @@
 namespace NYdb::NPersQueue {
 
 const TVector<ECodec>& GetDefaultCodecs() {
-    static const TVector<ECodec> codecs = {ECodec::RAW, ECodec::GZIP, ECodec::LZOP};
+    static const TVector<ECodec> codecs = {};
     return codecs;
 }
 

+ 1 - 1
ydb/services/datastreams/datastreams_proxy.cpp

@@ -987,7 +987,7 @@ namespace NKikimr::NDataStreams::V1 {
         TString error = AddReadRuleToConfig(pqConfig, readRule, serviceTypes, ctx);
         bool hasDuplicates = false;
         if (error.Empty()) {
-            hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error);
+            hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error, ctx);
         }
 
         if (!error.Empty()) {

+ 23 - 4
ydb/services/lib/actors/pq_schema_actor.cpp

@@ -269,7 +269,7 @@ namespace NKikimr::NGRpcProxy::V1 {
 
     bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config,
                               const TClientServiceTypes& supportedClientServiceTypes,
-                              TString& error) {
+                              TString& error, const TActorContext& ctx) {
 
         if (config.GetReadRules().size() > MAX_READ_RULES_COUNT) {
             error = TStringBuilder() << "read rules count cannot be more than "
@@ -299,6 +299,25 @@ namespace NKikimr::NGRpcProxy::V1 {
                 return false;
             }
         }
+        if (config.GetCodecs().IdsSize() > 0) {
+            for (ui32 i = 0; i < config.ConsumerCodecsSize(); ++i) {
+                TString name = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx);
+
+                auto& consumerCodecs = config.GetConsumerCodecs(i);
+                if (consumerCodecs.IdsSize() > 0) {
+                    THashSet<i64> codecs;
+                    for (auto& cc : consumerCodecs.GetIds()) {
+                        codecs.insert(cc);
+                    }
+                    for (auto& cc : config.GetCodecs().GetIds()) {
+                        if (codecs.find(cc) == codecs.end()) {
+                            error = TStringBuilder() << "for consumer '" << name << "' got unsupported codec " << (cc+1) << " which is suppored by topic";
+                            return false;
+                        }
+                    }
+                }
+            }
+        }
 
         return false;
     }
@@ -712,7 +731,7 @@ namespace NKikimr::NGRpcProxy::V1 {
             }
         }
 
-        CheckReadRulesConfig(*config, supportedClientServiceTypes, error);
+        CheckReadRulesConfig(*config, supportedClientServiceTypes, error, ctx);
         return error.empty() ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::BAD_REQUEST;
     }
 
@@ -852,7 +871,7 @@ namespace NKikimr::NGRpcProxy::V1 {
             }
         }
 
-        CheckReadRulesConfig(*config, supportedClientServiceTypes, error);
+        CheckReadRulesConfig(*config, supportedClientServiceTypes, error, ctx);
         return error.empty() ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::BAD_REQUEST;
     }
 
@@ -1019,7 +1038,7 @@ namespace NKikimr::NGRpcProxy::V1 {
             }
         }
 
-        bool hasDuplicates = CheckReadRulesConfig(*config, supportedClientServiceTypes, error);
+        bool hasDuplicates = CheckReadRulesConfig(*config, supportedClientServiceTypes, error, ctx);
         return error.empty() ? Ydb::StatusIds::SUCCESS : (hasDuplicates ? Ydb::StatusIds::ALREADY_EXISTS : Ydb::StatusIds::BAD_REQUEST);
     }
 

+ 18 - 2
ydb/services/lib/actors/pq_schema_actor.h

@@ -51,7 +51,7 @@ namespace NKikimr::NGRpcProxy::V1 {
     TClientServiceTypes GetSupportedClientServiceTypes(const TActorContext& ctx);
 
     // Returns true if have duplicated read rules
-    bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedReadRuleServiceTypes, TString& error);
+    bool CheckReadRulesConfig(const NKikimrPQ::TPQTabletConfig& config, const TClientServiceTypes& supportedReadRuleServiceTypes, TString& error, const TActorContext& ctx);
 
     TString AddReadRuleToConfig(
         NKikimrPQ::TPQTabletConfig *config,
@@ -361,9 +361,25 @@ namespace NKikimr::NGRpcProxy::V1 {
             return this->SendProposeRequest(ctx);
         }
 
+        void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) {
+            auto msg = ev->Get();
+            const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(ev->Get()->Record.GetStatus());
+
+            if (status ==  TEvTxUserProxy::TResultStatus::ExecError && msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusPreconditionFailed)
+            {
+                return TBase::ReplyWithError(Ydb::StatusIds::OVERLOADED,
+                                                         Ydb::PersQueue::ErrorCode::ERROR,
+                                                         TStringBuilder() << "Topic with name " << TBase::GetTopicPath(ctx) << " has another alter in progress",
+                                                         ctx);
+            }
+
+            return TBase::TBase::Handle(ev, ctx);
+        }
+
         void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
             switch (ev->GetTypeRewrite()) {
-            default: TBase::StateWork(ev, ctx);
+                HFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle);
+                default: TBase::StateWork(ev, ctx);
             }
         }
 

+ 1 - 1
ydb/services/persqueue_v1/actors/schema_actors.cpp

@@ -239,7 +239,7 @@ void TAddReadRuleActor::ModifyPersqueueConfig(
     TString error = AddReadRuleToConfig(pqConfig, rule, serviceTypes, ctx);
     bool hasDuplicates = false;
     if (error.Empty()) {
-        hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error);
+        hasDuplicates = CheckReadRulesConfig(*pqConfig, serviceTypes, error, ctx);
     }
 
     if (!error.Empty()) {

+ 2 - 2
ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp

@@ -622,7 +622,7 @@ namespace NKikimr::NPersQueueTests {
                 rr->set_version(0);
                 rr->set_important(true);
                 rr->set_supported_format(TopicSettings::FORMAT_BASE);
-                rr->add_supported_codecs(CODEC_ZSTD);
+                rr->add_supported_codecs(CODEC_GZIP);
                 auto status = stub->AddReadRule(&grpcContext, addRuleRequest, &addRuleResponse);
                 Cerr << "ADD RR RESPONSE " << addRuleResponse << "\n";
                 UNIT_ASSERT(status.ok() && addRuleResponse.operation().status() == Ydb::StatusIds::SUCCESS);
@@ -696,7 +696,7 @@ namespace NKikimr::NPersQueueTests {
                 rr->set_version(0);
                 rr->set_important(true);
                 rr->set_supported_format(TopicSettings::FORMAT_BASE);
-                rr->add_supported_codecs(CODEC_ZSTD);
+                rr->add_supported_codecs(CODEC_GZIP);
                 auto status = stub->AddReadRule(&grpcContext, addRuleRequest, &addRuleResponse);
                 Cerr << addRuleResponse << "\n";
                 UNIT_ASSERT(status.ok() && addRuleResponse.operation().status() == Ydb::StatusIds::SUCCESS);

+ 7 - 1
ydb/services/persqueue_v1/persqueue_ut.cpp

@@ -3035,8 +3035,14 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
         (*request.mutable_alter_attributes())["_allow_unauthenticated_read"] = "true";
 
         (*request.mutable_alter_attributes())["_partitions_per_tablet"] = "5";
-        alter(request, Ydb::StatusIds::SUCCESS, false);
 
+        rr->mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_LZOP);
+
+        alter(request, Ydb::StatusIds::BAD_REQUEST, false);
+
+        rr->mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_CUSTOM + 5);
+
+        alter(request, Ydb::StatusIds::SUCCESS, false);
 
         request = Ydb::Topic::AlterTopicRequest{};
         request.set_path(TStringBuilder() << "/Root/PQ/" << topic3);