Browse Source

support ydb pragma MaxTasksPerStage

Added MaxTasksPerStage setting.
grigoriypisar 1 year ago
parent
commit
1f319d6f63

+ 1 - 1
ydb/core/kqp/host/kqp_runner.cpp

@@ -307,7 +307,7 @@ private:
         auto& preparedQuery = *TransformCtx->QueryCtx->PreparingQuery;
         auto& preparedQuery = *TransformCtx->QueryCtx->PreparingQuery;
         TKqpPhysicalQuery physicalQuery(transformedQuery);
         TKqpPhysicalQuery physicalQuery(transformedQuery);
 
 
-        auto compiler = CreateKqpQueryCompiler(Cluster, OptimizeCtx->Tables, FuncRegistry, TypesCtx);
+        auto compiler = CreateKqpQueryCompiler(Cluster, OptimizeCtx->Tables, FuncRegistry, TypesCtx, Config);
         auto ret = compiler->CompilePhysicalQuery(physicalQuery, dataQueryBlocks, *preparedQuery.MutablePhysicalQuery(), ctx);
         auto ret = compiler->CompilePhysicalQuery(physicalQuery, dataQueryBlocks, *preparedQuery.MutablePhysicalQuery(), ctx);
         if (!ret) {
         if (!ret) {
             ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to compile physical query."));
             ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to compile physical query."));

+ 2 - 0
ydb/core/kqp/provider/yql_kikimr_settings.cpp

@@ -65,6 +65,8 @@ TKikimrConfiguration::TKikimrConfiguration() {
     REGISTER_SETTING(*this, OptUseFinalizeByKey);
     REGISTER_SETTING(*this, OptUseFinalizeByKey);
     REGISTER_SETTING(*this, OptEnableCostBasedOptimization);
     REGISTER_SETTING(*this, OptEnableCostBasedOptimization);
 
 
+    REGISTER_SETTING(*this, MaxTasksPerStage);
+
     /* Runtime */
     /* Runtime */
     REGISTER_SETTING(*this, ScanQuery);
     REGISTER_SETTING(*this, ScanQuery);
 }
 }

+ 2 - 0
ydb/core/kqp/provider/yql_kikimr_settings.h

@@ -58,6 +58,8 @@ struct TKikimrSettings {
     NCommon::TConfSetting<bool, false> OptUseFinalizeByKey;
     NCommon::TConfSetting<bool, false> OptUseFinalizeByKey;
     NCommon::TConfSetting<bool, false> OptEnableCostBasedOptimization;
     NCommon::TConfSetting<bool, false> OptEnableCostBasedOptimization;
 
 
+    NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
+
     /* Runtime */
     /* Runtime */
     NCommon::TConfSetting<bool, true> ScanQuery;
     NCommon::TConfSetting<bool, true> ScanQuery;
 
 

+ 7 - 8
ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

@@ -411,7 +411,7 @@ void FillOlapProgram(const TCoLambda& process, const NKikimr::NMiniKQL::TType* m
 class TKqpQueryCompiler : public IKqpQueryCompiler {
 class TKqpQueryCompiler : public IKqpQueryCompiler {
 public:
 public:
     TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
     TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
-        const NMiniKQL::IFunctionRegistry& funcRegistry, TTypeAnnotationContext& typesCtx)
+        const NMiniKQL::IFunctionRegistry& funcRegistry, TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config)
         : Cluster(cluster)
         : Cluster(cluster)
         , TablesData(tablesData)
         , TablesData(tablesData)
         , FuncRegistry(funcRegistry)
         , FuncRegistry(funcRegistry)
@@ -420,6 +420,7 @@ public:
         , KqlCtx(cluster, tablesData, TypeEnv, FuncRegistry)
         , KqlCtx(cluster, tablesData, TypeEnv, FuncRegistry)
         , KqlCompiler(CreateKqlCompiler(KqlCtx, typesCtx))
         , KqlCompiler(CreateKqlCompiler(KqlCtx, typesCtx))
         , TypesCtx(typesCtx)
         , TypesCtx(typesCtx)
+        , Config(config)
     {
     {
         Alloc.Release();
         Alloc.Release();
     }
     }
@@ -928,11 +929,8 @@ private:
             // In runtime, number of tasks with Sources is limited by 2x of node count
             // In runtime, number of tasks with Sources is limited by 2x of node count
             // We prepare a lot of partitions and distribute them between these tasks
             // We prepare a lot of partitions and distribute them between these tasks
             // Constraint of 1 task per partition is NOT valid anymore
             // Constraint of 1 task per partition is NOT valid anymore
-            // We choose 120 as number with a lot of divisors for even final distribution
-            //
-            // TODO: Replace with ydb.MaxTasksPerStage when implemented
-            //
-            dqIntegration->Partition(NYql::TDqSettings(), 120, source.Ref(), partitionParams, &clusterName, ctx, false);
+            auto maxTasksPerStage = Config->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
+            dqIntegration->Partition(NYql::TDqSettings(), maxTasksPerStage, source.Ref(), partitionParams, &clusterName, ctx, false);
             externalSource.SetTaskParamKey(TString(dataSourceCategory));
             externalSource.SetTaskParamKey(TString(dataSourceCategory));
             for (const TString& partitionParam : partitionParams) {
             for (const TString& partitionParam : partitionParams) {
                 externalSource.AddPartitionedTaskParams(partitionParam);
                 externalSource.AddPartitionedTaskParams(partitionParam);
@@ -1146,6 +1144,7 @@ private:
     TKqlCompileContext KqlCtx;
     TKqlCompileContext KqlCtx;
     TIntrusivePtr<NCommon::IMkqlCallableCompiler> KqlCompiler;
     TIntrusivePtr<NCommon::IMkqlCallableCompiler> KqlCompiler;
     TTypeAnnotationContext& TypesCtx;
     TTypeAnnotationContext& TypesCtx;
+    TKikimrConfiguration::TPtr Config;
     TSet<TString> SecretNames;
     TSet<TString> SecretNames;
 };
 };
 
 
@@ -1153,9 +1152,9 @@ private:
 
 
 TIntrusivePtr<IKqpQueryCompiler> CreateKqpQueryCompiler(const TString& cluster,
 TIntrusivePtr<IKqpQueryCompiler> CreateKqpQueryCompiler(const TString& cluster,
     const TIntrusivePtr<TKikimrTablesData> tablesData, const IFunctionRegistry& funcRegistry,
     const TIntrusivePtr<TKikimrTablesData> tablesData, const IFunctionRegistry& funcRegistry,
-    TTypeAnnotationContext& typesCtx)
+    TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config)
 {
 {
-    return MakeIntrusive<TKqpQueryCompiler>(cluster, tablesData, funcRegistry, typesCtx);
+    return MakeIntrusive<TKqpQueryCompiler>(cluster, tablesData, funcRegistry, typesCtx, config);
 }
 }
 
 
 } // namespace NKqp
 } // namespace NKqp

+ 2 - 1
ydb/core/kqp/query_compiler/kqp_query_compiler.h

@@ -2,6 +2,7 @@
 
 
 #include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
 #include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
 #include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
 #include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
+#include <ydb/core/kqp/provider/yql_kikimr_settings.h>
 #include <ydb/core/protos/kqp_physical.pb.h>
 #include <ydb/core/protos/kqp_physical.pb.h>
 
 
 #include <ydb/core/kqp/provider/yql_kikimr_provider.h>
 #include <ydb/core/kqp/provider/yql_kikimr_provider.h>
@@ -18,7 +19,7 @@ public:
 
 
 TIntrusivePtr<IKqpQueryCompiler> CreateKqpQueryCompiler(const TString& cluster,
 TIntrusivePtr<IKqpQueryCompiler> CreateKqpQueryCompiler(const TString& cluster,
     const TIntrusivePtr<NYql::TKikimrTablesData> tablesData, const NMiniKQL::IFunctionRegistry& funcRegistry,
     const TIntrusivePtr<NYql::TKikimrTablesData> tablesData, const NMiniKQL::IFunctionRegistry& funcRegistry,
-    NYql::TTypeAnnotationContext& typesCtx);
+    NYql::TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config);
 
 
 } // namespace NKqp
 } // namespace NKqp
 } // namespace NKikimr
 } // namespace NKikimr