Browse Source

Refactoring of pqconfig.proto - extract message Consumer instead of separeted arrays of ReadRules (#2347)

Nikolay Shestakov 1 year ago
parent
commit
a3397fbf86

+ 2 - 10
ydb/core/kqp/topics/kqp_topics.cpp

@@ -1,6 +1,7 @@
 #include "kqp_topics.h"
 
 #include <ydb/core/base/path.h>
+#include <ydb/core/persqueue/utils.h>
 #include <ydb/library/actors/core/log.h>
 
 #define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
@@ -294,16 +295,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
                 result.PQGroupInfo->Description;
 
             if (Consumer_) {
-                bool found = false;
-
-                for (auto& consumer : description.GetPQTabletConfig().GetReadRules()) {
-                    if (Consumer_ == consumer) {
-                        found = true;
-                        break;
-                    }
-                }
-
-                if (!found) {
+                if (!NPQ::HasConsumer(description.GetPQTabletConfig(), *Consumer_)) {
                     builder << "Unknown consumer '" << *Consumer_ << "'";
 
                     status = Ydb::StatusIds::BAD_REQUEST;

+ 18 - 17
ydb/core/persqueue/partition.cpp

@@ -279,11 +279,13 @@ ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
 }
 
 ui64 TPartition::ImportantClientsMinOffset() const {
-    const auto& partConfig = Config.GetPartitionConfig();
-
     ui64 minOffset = EndOffset;
-    for (const auto& importantClientId : partConfig.GetImportantClientId()) {
-        const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId);
+    for (const auto& consumer : Config.GetConsumers()) {
+        if (!consumer.GetImportant()) {
+            continue;
+        }
+
+        const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName());
         ui64 curOffset = StartOffset;
         if (userInfo && userInfo->Offset >= 0) //-1 means no offset
             curOffset = userInfo->Offset;
@@ -544,7 +546,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
     InitDone = true;
     TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());
 
-    FillReadFromTimestamps(Config, ctx);
+    FillReadFromTimestamps(ctx);
     ResendPendingEvents(ctx);
     ProcessTxsAndUserActs(ctx);
 
@@ -1791,29 +1793,30 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
     }
 
     TSet<TString> important;
-    for (const auto& importantUser : config.GetPartitionConfig().GetImportantClientId()) {
-        important.insert(importantUser);
+    for (const auto& consumer : config.GetConsumers()) {
+        if (consumer.GetImportant()) {
+            important.insert(consumer.GetName());
+        }
     }
 
-    for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
-        const auto& consumer = config.GetReadRules(i);
-        auto& userInfo = GetOrCreatePendingUser(consumer, 0);
+    for (auto& consumer : config.GetConsumers()) {
+        auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0);
 
-        TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
+        TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
         if (!ts) {
             ts += TDuration::MilliSeconds(1);
         }
         userInfo.ReadFromTimestamp = ts;
-        userInfo.Important = important.contains(consumer);
+        userInfo.Important = important.contains(consumer.GetName());
 
-        ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
+        ui64 rrGen = consumer.GetGeneration();
         if (userInfo.ReadRuleGeneration != rrGen) {
-            TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, 0, TActorId{},
+            TEvPQ::TEvSetClientInfo act(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{},
                                         TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen);
 
             ProcessUserAct(act, ctx);
         }
-        hasReadRule.erase(consumer);
+        hasReadRule.erase(consumer.GetName());
     }
 
     for (auto& consumer : hasReadRule) {
@@ -1915,8 +1918,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
 
     Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);
 
-    UsersInfoStorage->UpdateConfig(Config);
-
     Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
     Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
 

+ 1 - 1
ydb/core/persqueue/partition.h

@@ -117,7 +117,7 @@ private:
     void CreateMirrorerActor();
     void DoRead(TEvPQ::TEvRead::TPtr&& ev, TDuration waitQuotaTime, const TActorContext& ctx);
     void FailBadClient(const TActorContext& ctx);
-    void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx);
+    void FillReadFromTimestamps(const TActorContext& ctx);
     void FilterDeadlinedWrites(const TActorContext& ctx);
 
     void Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx);

+ 3 - 0
ydb/core/persqueue/partition_init.cpp

@@ -163,6 +163,9 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
     switch (response.GetStatus()) {
     case NKikimrProto::OK:
         Y_ABORT_UNLESS(Partition()->Config.ParseFromString(response.GetValue()));
+
+        Migrate(Partition()->Config);
+
         if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) {
             auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter,
                                                                      Partition()->TabletConfig);

+ 20 - 14
ydb/core/persqueue/partition_read.cpp

@@ -33,7 +33,7 @@ void TPartition::SendReadingFinished(const TString& consumer) {
     Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId));
 }
 
-void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) {
+void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
     TSet<TString> hasReadRule;
 
     for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
@@ -41,14 +41,14 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
         userInfo.HasReadRule = false;
         hasReadRule.insert(consumer);
     }
-    for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
-        const auto& consumer = config.GetReadRules(i);
-        auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0);
+
+    for (auto& consumer : Config.GetConsumers()) {
+        auto& userInfo = UsersInfoStorage->GetOrCreate(consumer.GetName(), ctx, 0);
         userInfo.HasReadRule = true;
-        ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
-        if (userInfo.ReadRuleGeneration != rrGen) {
+
+        if (userInfo.ReadRuleGeneration != consumer.GetGeneration()) {
             THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(
-                    0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen
+                    0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, consumer.GetGeneration()
             );
             //
             // TODO(abcdef): заменить на вызов ProcessUserAct
@@ -61,12 +61,13 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
             }
             userInfo.Step = userInfo.Generation = 0;
         }
-        hasReadRule.erase(consumer);
-        TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
+        hasReadRule.erase(consumer.GetName());
+        TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
         if (!ts) ts += TDuration::MilliSeconds(1);
         if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts)
             userInfo.ReadFromTimestamp = ts;
     }
+
     for (auto& consumer : hasReadRule) {
         auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx);
         if (userInfo.NoConsumer) {
@@ -179,9 +180,14 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr&
 
 void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
     TSet<TString> important;
-    for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) {
-        important.insert(importantUser);
-        TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser);
+    for (const auto& consumer : Config.GetConsumers()) {
+        if (!consumer.GetImportant()) {
+            continue;
+        }
+
+        important.insert(consumer.GetName());
+
+        TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName());
         if (userInfo && !userInfo->Important && userInfo->LabeledCounters) {
             ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup()));
             userInfo->SetImportant(true);
@@ -189,12 +195,12 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
         }
         if (!userInfo) {
             userInfo = &UsersInfoStorage->Create(
-                    ctx, importantUser, 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
+                    ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
             );
         }
         if (userInfo->Offset < (i64)StartOffset)
             userInfo->Offset = StartOffset;
-        ReadTimestampForOffset(importantUser, *userInfo, ctx);
+        ReadTimestampForOffset(consumer.GetName(), *userInfo, ctx);
     }
     for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
         if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) {

+ 22 - 13
ydb/core/persqueue/pq_impl.cpp

@@ -4,6 +4,8 @@
 #include "partition_log.h"
 #include "partition.h"
 #include "read.h"
+#include "utils.h"
+
 #include <ydb/core/base/tx_processing.h>
 #include <ydb/core/base/feature_flags.h>
 #include <ydb/core/persqueue/config/config.h>
@@ -1013,6 +1015,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
         bool res = Config.ParseFromString(read.GetValue());
         Y_ABORT_UNLESS(res);
 
+        Migrate(Config);
+
         if (!Config.PartitionsSize()) {
             for (const auto partitionId : Config.GetPartitionIds()) {
                 Config.AddPartitions()->SetPartitionId(partitionId);
@@ -1145,10 +1149,12 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) {
     }
 
     auto countReadRulesWithPricing = [&](const TActorContext& ctx, const auto& config) {
+        auto& defaultClientServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName();
+
         ui32 result = 0;
-        for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
-            TString rrServiceType = config.ReadRuleServiceTypesSize() <= i ? "" : config.GetReadRuleServiceTypes(i);
-            if (rrServiceType.empty() || rrServiceType == AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName())
+        for (auto& consumer : config.GetConsumers()) {
+            TString serviceType = consumer.GetServiceType();
+            if (serviceType.empty() || serviceType == defaultClientServiceType)
                 ++result;
         }
         return result;
@@ -1618,24 +1624,27 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
     if (!cfg.HasCacheSize() && Config.HasCacheSize()) //if not set and it is alter - preserve old cache size
         cfg.SetCacheSize(Config.GetCacheSize());
 
+    Migrate(cfg);
+
     // set rr generation for provided read rules
     {
         THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
-        for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
-            auto version = i < Config.ReadRuleVersionsSize() ? Config.GetReadRuleVersions(i) : 0;
-            auto generation = i < Config.ReadRuleGenerationsSize() ? Config.GetReadRuleGenerations(i) : 0;
-            existed[Config.GetReadRules(i)] = std::make_pair(version, generation);
+        for (const auto& c : Config.GetConsumers()) {
+            existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
         }
-        for (ui32 i = 0; i < cfg.ReadRulesSize(); ++i) {
-            auto version = i < cfg.ReadRuleVersionsSize() ? cfg.GetReadRuleVersions(i) : 0;
-            auto it = existed.find(cfg.GetReadRules(i));
+
+        for (auto& c : *cfg.MutableConsumers()) {
+            auto it = existed.find(c.GetName());
             ui64 generation = 0;
-            if (it != existed.end() && it->second.first == version) {
+            if (it != existed.end() && it->second.first == c.GetVersion()) {
                 generation = it->second.second;
             } else {
                 generation = curConfigVersion;
             }
-            cfg.AddReadRuleGenerations(generation);
+            c.SetGeneration(generation);
+            if (ReadRuleCompatible()) {
+                cfg.AddReadRuleGenerations(generation);
+            }
         }
     }
 
@@ -1648,7 +1657,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
 
     BeginWriteConfig(cfg, bootstrapCfg, ctx);
 
-    NewConfig = cfg;
+    NewConfig = std::move(cfg);
 }
 
 void TPersQueue::BeginWriteConfig(const NKikimrPQ::TPQTabletConfig& cfg,

+ 14 - 18
ydb/core/persqueue/read_balancer.cpp

@@ -66,16 +66,12 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
             if (!config.empty()) {
                 bool res = Self->TabletConfig.ParseFromString(config);
                 Y_ABORT_UNLESS(res);
+
+                Migrate(Self->TabletConfig);
                 Self->Consumers.clear();
 
-                if (Self->TabletConfig.ReadRulesSize() == Self->TabletConfig.ConsumerScalingSupportSize()) {
-                    for (size_t i = 0; i < Self->TabletConfig.ReadRulesSize(); ++i) {
-                        Self->Consumers[Self->TabletConfig.GetReadRules(i)].ScalingSupport = Self->TabletConfig.GetConsumerScalingSupport(i);
-                    }
-                } else {
-                    for (const auto& rr : Self->TabletConfig.GetReadRules()) {
-                        Self->Consumers[rr].ScalingSupport = DefaultScalingSupport();
-                    }
+                for (auto& consumer : Self->TabletConfig.GetConsumers()) {
+                    Self->Consumers[consumer.GetName()].ScalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport();
                 }
 
                 Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);
@@ -532,10 +528,12 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
     Version = record.GetVersion();
     MaxPartsPerTablet = record.GetPartitionPerTablet();
     PathId = record.GetPathId();
-    Topic = record.GetTopicName();
-    Path = record.GetPath();
+    Topic = std::move(record.GetTopicName());
+    Path = std::move(record.GetPath());
     TxId = record.GetTxId();
-    TabletConfig = record.GetTabletConfig();
+    TabletConfig = std::move(record.GetTabletConfig());
+    Migrate(TabletConfig);
+
     SchemeShardId = record.GetSchemeShardId();
     TotalGroups = record.HasTotalGroupCount() ? record.GetTotalGroupCount() : 0;
     ui32 prevNextPartitionId = NextPartitionId;
@@ -547,17 +545,15 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
 
     auto oldConsumers = std::move(Consumers);
     Consumers.clear();
-    for (size_t i = 0; i < TabletConfig.ReadRulesSize(); ++i) {
-        auto& rr = TabletConfig.GetReadRules(i);
+    for (auto& consumer : TabletConfig.GetConsumers()) {
+        auto scalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport();
 
-        auto scalingSupport = i < TabletConfig.ConsumerScalingSupportSize() ? TabletConfig.GetConsumerScalingSupport(i)
-                                                                            : DefaultScalingSupport();
-        auto it = oldConsumers.find(rr);
+        auto it = oldConsumers.find(consumer.GetName());
         if (it != oldConsumers.end()) {
-            auto& c = Consumers[rr] = std::move(it->second);
+            auto& c = Consumers[consumer.GetName()] = std::move(it->second);
             c.ScalingSupport = scalingSupport;
         } else {
-            Consumers[rr].ScalingSupport = scalingSupport;
+            Consumers[consumer.GetName()].ScalingSupport = scalingSupport;
         }
     }
 

+ 4 - 0
ydb/core/persqueue/transaction.cpp

@@ -70,6 +70,8 @@ void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransactio
     TabletConfig = tx.GetTabletConfig();
     BootstrapConfig = tx.GetBootstrapConfig();
 
+    Migrate(TabletConfig);
+
     InitPartitions();
 }
 
@@ -157,6 +159,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
     TabletConfig = txBody.GetTabletConfig();
     BootstrapConfig = txBody.GetBootstrapConfig();
 
+    Migrate(TabletConfig);
+
     TPartitionGraph graph = MakePartitionGraph(TabletConfig);
 
     for (const auto& p : TabletConfig.GetPartitions()) {

+ 3 - 3
ydb/core/persqueue/user_info.cpp

@@ -181,9 +181,9 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx,
 {
     TString defaultServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName();
     TString userServiceType = "";
-    for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
-        if (Config.GetReadRules(i) == user) {
-            userServiceType = Config.ReadRuleServiceTypesSize() > i ? Config.GetReadRuleServiceTypes(i) : "";
+    for (auto& consumer : Config.GetConsumers()) {
+        if (consumer.GetName() == user) {
+            userServiceType = consumer.GetServiceType();
             break;
         }
     }

+ 1 - 5
ydb/core/persqueue/user_info.h

@@ -383,10 +383,6 @@ public:
     const TUserInfo* GetIfExists(const TString& user) const;
     TUserInfo* GetIfExists(const TString& user);
 
-    void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) {
-        Config = config;
-    }
-
     THashMap<TString, TUserInfo>& GetAll();
 
     TUserInfoBase CreateUserInfo(const TString& user,
@@ -422,7 +418,7 @@ private:
 
     TMaybe<TActorId> TabletActor;
     TMaybe<TActorId> PartitionActor;
-    NKikimrPQ::TPQTabletConfig Config;
+    const NKikimrPQ::TPQTabletConfig& Config;
 
     TString CloudId;
     TString DbId;

Some files were not shown because too many files changed in this diff