Просмотр исходного кода

YQ-3493 support resource pool classifiers kqp_proxy cache (#7688)

Pisarenko Grigoriy 7 месяцев назад
Родитель
Сommit
a00d2864ab

+ 20 - 0
ydb/core/kqp/common/events/workload_service.h

@@ -12,6 +12,16 @@
 
 namespace NKikimr::NKqp::NWorkload {
 
+struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
+    TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
+        : Database(database)
+        , PoolId(poolId)
+    {}
+
+    const TString Database;
+    const TString PoolId;
+};
+
 struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
     TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
         : Database(database)
@@ -80,4 +90,14 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
     const std::optional<NACLib::TSecurityObject> SecurityObject;
 };
 
+struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
+    TEvUpdateDatabaseInfo(const TString& database, bool serverless)
+        : Database(database)
+        , Serverless(serverless)
+    {}
+
+    const TString Database;
+    const bool Serverless;
+};
+
 }  // NKikimr::NKqp::NWorkload

+ 2 - 0
ydb/core/kqp/common/simple/kqp_event_ids.h

@@ -175,6 +175,8 @@ struct TKqpWorkloadServiceEvents {
         EvCleanupRequest,
         EvCleanupResponse,
         EvUpdatePoolInfo,
+        EvUpdateDatabaseInfo,
+        EvSubscribeOnPoolChanges,
     };
 };
 

+ 1 - 1
ydb/core/kqp/compile_service/kqp_compile_actor.cpp

@@ -275,7 +275,7 @@ private:
 
         KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
             FederatedQuerySetup, UserToken, GUCSettings, QueryServiceConfig, ApplicationName, AppData(ctx)->FunctionRegistry,
-            false, false, std::move(TempTablesState), nullptr, SplitCtx);
+            false, false, std::move(TempTablesState), nullptr, SplitCtx, UserRequestContext);
 
         IKqpHost::TPrepareSettings prepareSettings;
         prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;

+ 1 - 1
ydb/core/kqp/gateway/behaviour/resource_pool_classifier/object.cpp

@@ -123,7 +123,7 @@ NJson::TJsonValue TResourcePoolClassifierConfig::GetDebugJson() const {
 }
 
 bool TResourcePoolClassifierConfig::operator==(const TResourcePoolClassifierConfig& other) const {
-    return std::tie(Database, Name, Rank, ConfigJson) != std::tie(other.Database, other.Name, other.Rank, other.ConfigJson);
+    return std::tie(Database, Name, Rank, ConfigJson) == std::tie(other.Database, other.Name, other.Rank, other.ConfigJson);
 }
 
 NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierConfig::GetBehaviour() {

+ 5 - 4
ydb/core/kqp/host/kqp_host.cpp

@@ -1033,7 +1033,8 @@ public:
         std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
         const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
         TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr,
-        NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig())
+        NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig(),
+        const TIntrusivePtr<TUserRequestContext>& userRequestContext = nullptr)
         : Gateway(gateway)
         , Cluster(cluster)
         , GUCSettings(gUCSettings)
@@ -1044,7 +1045,7 @@ public:
         , KeepConfigChanges(keepConfigChanges)
         , IsInternalCall(isInternalCall)
         , FederatedQuerySetup(federatedQuerySetup)
-        , SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
+        , SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken, nullptr, userRequestContext))
         , Config(config)
         , TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
         , PlanBuilder(CreatePlanBuilder(*TypesCtx))
@@ -1958,10 +1959,10 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const
     const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
     std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TGUCSettings::TPtr& gUCSettings,
     const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
-    bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx)
+    bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
 {
     return MakeIntrusive<TKqpHost>(gateway, cluster, database, gUCSettings, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
-                                   keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig);
+                                   keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig, userRequestContext);
 }
 
 } // namespace NKqp

+ 1 - 1
ydb/core/kqp/host/kqp_host.h

@@ -123,7 +123,7 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
     const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
     bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
     NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/,
-    NYql::TExprContext* ctx = nullptr);
+    NYql::TExprContext* ctx = nullptr, const TIntrusivePtr<TUserRequestContext>& userRequestContext = nullptr);
 
 } // namespace NKqp
 } // namespace NKikimr

+ 1 - 1
ydb/core/kqp/host/kqp_runner.cpp

@@ -146,7 +146,7 @@ public:
         , Config(sessionCtx->ConfigPtr())
         , TransformCtx(transformCtx)
         , OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
-            sessionCtx->TablesPtr()))
+            sessionCtx->TablesPtr(), sessionCtx->GetUserRequestContext()))
         , BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
         , Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel)))
     {

+ 4 - 1
ydb/core/kqp/opt/kqp_opt.h

@@ -10,11 +10,13 @@ namespace NKikimr::NKqp::NOpt {
 
 struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
     TKqpOptimizeContext(const TString& cluster, const NYql::TKikimrConfiguration::TPtr& config,
-        const TIntrusivePtr<NYql::TKikimrQueryContext> queryCtx, const TIntrusivePtr<NYql::TKikimrTablesData>& tables)
+        const TIntrusivePtr<NYql::TKikimrQueryContext> queryCtx, const TIntrusivePtr<NYql::TKikimrTablesData>& tables,
+        const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext>& userRequestContext)
         : Cluster(cluster)
         , Config(config)
         , QueryCtx(queryCtx)
         , Tables(tables)
+        , UserRequestContext(userRequestContext)
     {
         YQL_ENSURE(QueryCtx);
         YQL_ENSURE(Tables);
@@ -24,6 +26,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
     const NYql::TKikimrConfiguration::TPtr Config;
     const TIntrusivePtr<NYql::TKikimrQueryContext> QueryCtx;
     const TIntrusivePtr<NYql::TKikimrTablesData> Tables;
+    const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext> UserRequestContext;
     int JoinsCount{};
     int EquiJoinsCount{};
     std::shared_ptr<NJson::TJsonValue> OverrideStatistics{};

+ 16 - 3
ydb/core/kqp/opt/kqp_query_plan.cpp

@@ -2384,11 +2384,21 @@ void PhyQuerySetTxPlans(NKqpProto::TKqpPhyQuery& queryProto, const TKqpPhysicalQ
         txPlans.emplace_back(phyTx.GetPlan());
     }
 
+    TString queryStats = "";
+    if (optCtx && optCtx->UserRequestContext && optCtx->UserRequestContext->PoolId) {
+        NJsonWriter::TBuf writer;
+        writer.BeginObject();
+        writer.WriteKey("ResourcePoolId").WriteString(optCtx->UserRequestContext->PoolId);
+        writer.EndObject();
+
+        queryStats = writer.Str();
+    }
+
     NJsonWriter::TBuf writer;
     writer.SetIndentSpaces(2);
     WriteCommonTablesInfo(writer, serializerCtx.Tables);
 
-    queryProto.SetQueryPlan(SerializeTxPlans(txPlans, optCtx, writer.Str()));
+    queryProto.SetQueryPlan(SerializeTxPlans(txPlans, optCtx, writer.Str(), queryStats));
 }
 
 void FillAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqStatsAggr& aggr, const TString& name) {
@@ -2740,7 +2750,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
     return AddExecStatsToTxPlan(txPlanJson, stats, TIntrusivePtr<NOpt::TKqpOptimizeContext>());
 }
 
-TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
+TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats, const TString& poolId) {
     TVector<const TString> txPlans;
     for (const auto& execStats: queryStats.GetExecutions()) {
         for (const auto& txPlan: execStats.GetTxPlansWithStats()) {
@@ -2764,7 +2774,10 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
 
     writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs());
     writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs());
-    writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
+    if (poolId) {
+        writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
+        writer.WriteKey("ResourcePoolId").WriteString(poolId);
+    }
     writer.EndObject();
 
     return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>(), "", writer.Str());

+ 1 - 1
ydb/core/kqp/opt/kqp_query_plan.h

@@ -44,7 +44,7 @@ void PhyQuerySetTxPlans(NKqpProto::TKqpPhyQuery& queryProto, const NYql::NNodes:
  */
 TString AddExecStatsToTxPlan(const TString& txPlan, const NYql::NDqProto::TDqExecutionStats& stats);
 
-TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats);
+TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats, const TString& poolId = "");
 
 TString SerializeScriptPlan(const TVector<const TString>& queryPlans);
 

Некоторые файлы не были показаны из-за большого количества измененных файлов