hcpp 1 год назад
Родитель
Сommit
f95fc3633f

+ 20 - 0
ydb/core/fq/libs/compute/common/config.h

@@ -1,15 +1,22 @@
 #pragma once
 
 #include <ydb/core/fq/libs/config/protos/compute.pb.h>
+#include <ydb/core/fq/libs/protos/fq_private.pb.h>
 
 #include <util/digest/multi.h>
 #include <util/generic/algorithm.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/vector.h>
 #include <util/generic/yexception.h>
 
 namespace NFq {
 
 class TComputeConfig {
 public:
+    explicit TComputeConfig()
+        : TComputeConfig({})
+    {}
+
     explicit TComputeConfig(const NFq::NConfig::TComputeConfig& computeConfig)
         : ComputeConfig(computeConfig)
         , DefaultCompute(ComputeConfig.GetDefaultCompute() != NFq::NConfig::EComputeType::UNKNOWN
@@ -48,6 +55,19 @@ public:
         return DefaultCompute;
     }
 
+    TVector<TString> GetPinTenantNames(FederatedQuery::QueryContent::QueryType queryType, const TString& scope) const {
+        NFq::NConfig::EComputeType computeType = GetComputeType(queryType, scope);
+        switch (computeType) {
+            case NFq::NConfig::EComputeType::YDB:
+                return TVector<TString>{ComputeConfig.GetYdb().GetPinTenantName().begin(), ComputeConfig.GetYdb().GetPinTenantName().end()};
+            case NFq::NConfig::EComputeType::IN_PLACE:
+            case NFq::NConfig::EComputeType::UNKNOWN:
+            case NFq::NConfig::EComputeType_INT_MIN_SENTINEL_DO_NOT_USE_:
+            case NFq::NConfig::EComputeType_INT_MAX_SENTINEL_DO_NOT_USE_:
+                return TVector<TString>{};
+        }
+    }
+
     NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope) const {
         const auto& controlPlane = ComputeConfig.GetYdb().GetControlPlane();
         switch (controlPlane.type_case()) {

+ 1 - 0
ydb/core/fq/libs/config/protos/compute.proto

@@ -52,6 +52,7 @@ message TYdbCompute {
     bool Enable = 1;
     TYdbComputeControlPlane ControlPlane = 2;
     TSynchronizationService SynchronizationService = 3;
+    repeated string PinTenantName = 4;
 }
 
 enum EComputeType {

+ 11 - 5
ydb/core/fq/libs/control_plane_config/control_plane_config.cpp

@@ -34,17 +34,19 @@ class TControlPlaneConfigActor : public NActors::TActorBootstrapped<TControlPlan
     NDbPool::TDbPool::TPtr DbPool;
     ::NMonitoring::TDynamicCounterPtr Counters;
     NConfig::TControlPlaneStorageConfig Config;
+    NConfig::TComputeConfig ComputeConfig;
     TTenantInfo::TPtr TenantInfo;
     bool LoadInProgress = false;
     TDuration DbReloadPeriod;
     TString TablePathPrefix;
 
 public:
-    TControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters)
+    TControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const NConfig::TComputeConfig& computeConfig, const ::NMonitoring::TDynamicCounterPtr& counters)
         : YqSharedResources(yqSharedResources)
         , CredProviderFactory(credProviderFactory)
         , Counters(counters)
         , Config(config)
+        , ComputeConfig(computeConfig)
     {
         DbReloadPeriod = GetDuration(Config.GetDbReloadPeriod(), TDuration::Seconds(3));
     }
@@ -60,7 +62,7 @@ public:
             TablePathPrefix = YdbConnection->TablePathPrefix;
             Schedule(TDuration::Zero(), new NActors::TEvents::TEvWakeup());
         } else {
-            TenantInfo.reset(new TTenantInfo());
+            TenantInfo.reset(new TTenantInfo(ComputeConfig));
             const auto& mapping = Config.GetMapping();
             for (const auto& cloudToTenant : mapping.GetCloudIdToTenantName()) {
                 TenantInfo->SubjectMapping[SUBJECT_TYPE_CLOUD].emplace(cloudToTenant.GetKey(), cloudToTenant.GetValue());
@@ -108,7 +110,7 @@ private:
 
         LoadInProgress = true;
         TDbExecutable::TPtr executable;
-        auto& executer = TTenantExecuter::Create(executable, true, [](TTenantExecuter& executer) { executer.State.reset(new TTenantInfo()); } );
+        auto& executer = TTenantExecuter::Create(executable, true, [computeConfig=ComputeConfig](TTenantExecuter& executer) { executer.State.reset(new TTenantInfo(computeConfig)); } );
 
         executer.Read(
             [=](TTenantExecuter&, TSqlQueryBuilder& builder) {
@@ -242,8 +244,12 @@ TActorId ControlPlaneConfigActorId() {
     return NActors::TActorId(0, name);
 }
 
-NActors::IActor* CreateControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters) {
-    return new TControlPlaneConfigActor(yqSharedResources, credProviderFactory, config, counters);
+NActors::IActor* CreateControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources,
+                                               const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
+                                               const NConfig::TControlPlaneStorageConfig& config,
+                                               const NConfig::TComputeConfig& computeConfig,
+                                               const ::NMonitoring::TDynamicCounterPtr& counters) {
+    return new TControlPlaneConfigActor(yqSharedResources, credProviderFactory, config, computeConfig, counters);
 }
 
 }  // namespace NFq

+ 5 - 1
ydb/core/fq/libs/control_plane_config/control_plane_config.h

@@ -23,6 +23,10 @@ namespace NFq {
 
 NActors::TActorId ControlPlaneConfigActorId();
 
-NActors::IActor* CreateControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters);
+NActors::IActor* CreateControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources,
+                                               const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
+                                               const NConfig::TControlPlaneStorageConfig& config,
+                                               const NConfig::TComputeConfig& computeConfig,
+                                               const ::NMonitoring::TDynamicCounterPtr& counters);
 
 } // namespace NFq

+ 14 - 1
ydb/core/fq/libs/control_plane_config/events/events.h

@@ -2,6 +2,7 @@
 
 #include <ydb/core/fq/libs/events/event_subspace.h>
 #include <ydb/core/fq/libs/quota_manager/events/events.h>
+#include <ydb/core/fq/libs/compute/common/config.h>
 
 #include <ydb/public/api/protos/draft/fq.pb.h>
 
@@ -30,9 +31,21 @@ struct TTenantInfo {
     THashMap<TString /* vtenant */, TString /* tenant */> TenantMapping;
     THashMap<TString /* tenant */, ui32 /* state */> TenantState;
     TInstant StateTime;
+    NFq::TComputeConfig ComputeConfig;
+
+    TTenantInfo() = default;
+
+    TTenantInfo(const NFq::NConfig::TComputeConfig& computeConfig)
+        : ComputeConfig(computeConfig)
+    {}
 
     // this method must be thread safe
-    TString Assign(const TString& cloudId, const TString& /* scope */, const TString& DefaultTenantName = "") const {
+    TString Assign(const TString& cloudId, const TString& scope, FederatedQuery::QueryContent::QueryType queryType, const TString& DefaultTenantName = "") const {
+        auto pinTenants = ComputeConfig.GetPinTenantNames(queryType, scope);
+        if (pinTenants) {
+            return pinTenants[MultiHash(cloudId) % pinTenants.size()];
+        }
+
         auto it = SubjectMapping.find(SUBJECT_TYPE_CLOUD);
         auto vTenant = it == SubjectMapping.end() ? "" : it->second.Value(cloudId, "");
         if (!vTenant && CommonVTenants.size()) {

+ 1 - 1
ydb/core/fq/libs/control_plane_proxy/config.cpp

@@ -30,7 +30,7 @@ TControlPlaneProxyConfig::TControlPlaneProxyConfig(
     const NConfig::TCommonConfig& commonConfig,
     const NYql::TS3GatewayConfig& s3Config)
     : Proto(FillDefaultParameters(config))
-    , StorageConfig(TControlPlaneStorageConfig(storageConfig, s3Config, commonConfig))
+    , StorageConfig(TControlPlaneStorageConfig(storageConfig, s3Config, commonConfig, {}))
     , ComputeConfig(computeConfig)
     , CommonConfig(commonConfig)
     , RequestTimeout(GetDuration(Proto.GetRequestTimeout(), TDuration::Seconds(30)))

+ 1 - 1
ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp

@@ -405,7 +405,7 @@ private:
         );
 
         auto configService = CreateControlPlaneConfigActor(NFq::TYqSharedResources::TPtr{}, NKikimr::TYdbCredentialsProviderFactory(nullptr),
-            NConfig::TControlPlaneStorageConfig{}, MakeIntrusive<::NMonitoring::TDynamicCounters>());
+            NConfig::TControlPlaneStorageConfig{}, NConfig::TComputeConfig{}, MakeIntrusive<::NMonitoring::TDynamicCounters>());
         runtime->AddLocalService(
             NFq::ControlPlaneConfigActorId(),
             TActorSetupCmd(configService, TMailboxType::Simple, 0),

+ 2 - 1
ydb/core/fq/libs/control_plane_storage/config.cpp

@@ -19,8 +19,9 @@ FederatedQuery::BindingSetting::BindingCase GetBindingType(const TString& typeSt
 
 }
 
-TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common)
+TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const NConfig::TComputeConfig& computeConfigProto)
     : Proto(FillDefaultParameters(config))
+    , ComputeConfigProto(computeConfigProto)
     , IdsPrefix(common.GetIdsPrefix())
     , IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10)))
     , AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1)))

+ 2 - 1
ydb/core/fq/libs/control_plane_storage/config.h

@@ -15,6 +15,7 @@ namespace NFq {
 
 struct TControlPlaneStorageConfig {
     NConfig::TControlPlaneStorageConfig Proto;
+    NConfig::TComputeConfig ComputeConfigProto;
     TString IdsPrefix;
     TDuration IdempotencyKeyTtl;
     TDuration AutomaticQueriesTtl;
@@ -30,7 +31,7 @@ struct TControlPlaneStorageConfig {
     TDuration QuotaTtl;
     TDuration MetricsTtl;
 
-    TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common);
+    TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const NConfig::TComputeConfig& computeConfigProto);
 };
 
 } // NFq

+ 1 - 0
ydb/core/fq/libs/control_plane_storage/control_plane_storage.h

@@ -46,6 +46,7 @@ NActors::IActor* CreateYdbControlPlaneStorageServiceActor(
     const NConfig::TControlPlaneStorageConfig& config,
     const NYql::TS3GatewayConfig& s3Config,
     const NConfig::TCommonConfig& common,
+    const NConfig::TComputeConfig& computeConfig,
     const ::NMonitoring::TDynamicCounterPtr& counters,
     const NFq::TYqSharedResources::TPtr& yqSharedResources,
     const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,

Некоторые файлы не были показаны из-за большого количества измененных файлов