Browse Source

remove dangerous bulk-replaces

ssmike 2 years ago
parent
commit
bc6c67e288

+ 11 - 33
ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp

@@ -97,51 +97,29 @@ NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TE
         }
     }
 
-    TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")};
-    NYql::TNodeOnNodeOwnedMap bodyReplaces;
-
-    bodyReplaces[sourceArg.Raw()] =
-        Build<TCoExtractMembers>(ctx, node.Pos())
-            .Members(readRangesSource.Columns())
-            .Input<TCoSkipNullMembers>()
-                .Input(replaceArg)
-                .Members().Add(skipNullColumns).Build()
-            .Build()
-        .Done().Ptr();
-
-    NYql::TNodeOnNodeOwnedMap inputsReplaces;
     settings.SkipNullKeys.clear();
-
-    auto newSource = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos())
+    auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos())
         .Table(readRangesSource.Table())
         .Columns().Add(columns).Build()
         .Settings(settings.BuildNode(ctx, source.Settings().Pos()))
         .RangesExpr(readRangesSource.RangesExpr())
         .ExplainPrompt(readRangesSource.ExplainPrompt())
         .Done();
-    inputsReplaces[readRangesSource.Raw()] = newSource.Ptr();
-
-    TVector<TCoArgument> args;
-    for (auto arg : stage.Program().Args()) {
-        if (arg.Raw() == sourceArg.Raw()) {
-            args.push_back(replaceArg);
-        } else {
-            args.push_back(arg);
-        }
-    }
+    TDqStage replacedSettings = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx);
 
-    return Build<TDqStage>(ctx, node.Pos())
-        .Settings(stage.Settings())
-        .Inputs(TExprList(ctx.ReplaceNodes(stage.Inputs().Ptr(), inputsReplaces)))
-        .Outputs(stage.Outputs())
-        .Program<TCoLambda>()
-            .Args(args)
-            .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), bodyReplaces))
+    TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")};
+    auto replaceExpr =
+        Build<TCoExtractMembers>(ctx, node.Pos())
+            .Members(readRangesSource.Columns())
+            .Input<TCoSkipNullMembers>()
+                .Input(replaceArg)
+                .Members().Add(skipNullColumns).Build()
             .Build()
         .Done();
+
+    return ReplaceStageArg(replacedSettings, *tableSourceIndex, replaceArg, replaceExpr, ctx);
 }
 
-//FIXME: simplify KIKIMR-16987
 TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
     if (!node.Maybe<TKqlReadTable>()) {
         return node;

+ 64 - 0
ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp

@@ -103,4 +103,68 @@ bool AllowFuseJoinInputs(TExprBase node) {
     return true;
 }
 
+NYql::NNodes::TDqStage ReplaceStageArg(NYql::NNodes::TDqStage stage, size_t inputIndex,
+    NYql::NNodes::TCoArgument replaceArg, NYql::NNodes::TExprBase bodyExpression, NYql::TExprContext& ctx)
+{
+    auto sourceArg = stage.Program().Args().Arg(inputIndex);
+
+    size_t index = 0;
+    TVector<TCoArgument> args;
+    NYql::TNodeOnNodeOwnedMap bodyReplaces;
+    for (auto arg : stage.Program().Args()) {
+        if (arg.Raw() == sourceArg.Raw()) {
+            args.push_back(replaceArg);
+        } else {
+            TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_" << index)};
+            args.push_back(replaceArg);
+            bodyReplaces[arg.Raw()] = replaceArg.Ptr();
+        }
+        index += 1;
+    }
+
+    bodyReplaces[sourceArg.Raw()] = bodyExpression.Ptr();
+
+    return Build<TDqStage>(ctx, stage.Pos())
+        .Settings(stage.Settings())
+        .Inputs(stage.Inputs())
+        .Outputs(stage.Outputs())
+        .Program<TCoLambda>()
+            .Args(args)
+            .Body(TExprBase(ctx.ReplaceNodes(stage.Program().Body().Ptr(), bodyReplaces)))
+            .Build()
+        .Done();
+}
+
+NYql::NNodes::TDqStage ReplaceTableSourceSettings(NYql::NNodes::TDqStage stage, size_t inputIndex,
+    NYql::NNodes::TKqpReadRangesSourceSettings settings, NYql::TExprContext& ctx)
+{
+    auto source = stage.Inputs().Item(inputIndex).Cast<TDqSource>();
+    auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>();
+    auto sourceArg = stage.Program().Args().Arg(inputIndex);
+
+    TVector<NYql::NNodes::TExprBase> inputs;
+    size_t index = 0;
+    for (auto input : stage.Inputs()) {
+        if (index == inputIndex) {
+            inputs.push_back(
+                Build<TDqSource>(ctx, input.Pos())
+                    .DataSource<TCoDataSource>()
+                        .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build()
+                        .Build()
+                    .Settings(settings)
+                .Done());
+        } else {
+            inputs.push_back(input);
+        }
+        index += 1;
+    }
+
+    return Build<TDqStage>(ctx, stage.Pos())
+        .Settings(stage.Settings())
+        .Inputs().Add(inputs).Build()
+        .Outputs(stage.Outputs())
+        .Program(stage.Program())
+        .Done();
+}
+
 } // namespace NKikimr::NKqp::NOpt

+ 6 - 0
ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h

@@ -22,6 +22,12 @@ NYql::NNodes::TCoAtomList BuildColumnsList(const THashSet<TStringBuf>& columns,
 NYql::NNodes::TCoAtomList BuildColumnsList(const TVector<TString>& columns, NYql::TPositionHandle pos,
     NYql::TExprContext& ctx);
 
+NYql::NNodes::TDqStage ReplaceStageArg(NYql::NNodes::TDqStage stage, size_t inputIndex,
+    NYql::NNodes::TCoArgument replaceArg, NYql::NNodes::TExprBase bodyExpression, NYql::TExprContext& ctx);
+
+NYql::NNodes::TDqStage ReplaceTableSourceSettings(NYql::NNodes::TDqStage stage, size_t inputIndex,
+    NYql::NNodes::TKqpReadRangesSourceSettings settings, NYql::TExprContext& ctx);
+
 } // NKikimr::NKqp::NOpt
 
 

+ 3 - 4
ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp

@@ -38,7 +38,6 @@ TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, cons
         return node; // already set?
     }
 
-    NYql::TNodeOnNodeOwnedMap replaces;
     auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex);
     TExprNode::TPtr foundTake;
     bool singleConsumer = true;
@@ -105,6 +104,7 @@ TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, cons
             .Input(limitValue.Cast())
             .Done().Ptr());
     }
+
     auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos())
         .Table(readRangesSource.Table())
         .Columns(readRangesSource.Columns())
@@ -112,9 +112,8 @@ TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, cons
         .RangesExpr(readRangesSource.RangesExpr())
         .ExplainPrompt(readRangesSource.ExplainPrompt())
         .Done();
-    replaces[readRangesSource.Raw()] = newSettings.Ptr();
-                              
-    return TExprBase(ctx.ReplaceNodes(node.Ptr(), replaces));
+
+    return ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx);
 }                             
                               
 

+ 22 - 12
ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp

@@ -204,8 +204,7 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(
     auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readRangesSource.Table().Path());
 
     TVector<NYql::TKqpReadTableSettings> newSettings;
-    NYql::TNodeOnNodeOwnedMap replaces;
-    auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex);
+    NYql::TNodeOnNodeOwnedMap bodyReplaces;
     VisitExpr(stage.Program().Body().Ptr(),
         [&](const TExprNode::TPtr& exprPtr) -> bool {
             TExprBase expr(exprPtr);
@@ -225,7 +224,7 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(
                         return input;
                     });
                 if (newExpr.Ptr() != expr.Ptr()) {
-                    replaces[expr.Raw()] = newExpr.Ptr();
+                    bodyReplaces[expr.Raw()] = newExpr.Ptr();
                 }
             }
             return true;
@@ -238,17 +237,28 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(
             }
         }
 
-        auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos())
-            .Table(readRangesSource.Table())
-            .Columns(readRangesSource.Columns())
-            .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos()))
-            .RangesExpr(readRangesSource.RangesExpr())
-            .ExplainPrompt(readRangesSource.ExplainPrompt())
-            .Done();
-        replaces[readRangesSource.Raw()] = newSource.Ptr();
+        if (settings != newSettings[0]) {
+            auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos())
+                .Table(readRangesSource.Table())
+                .Columns(readRangesSource.Columns())
+                .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos()))
+                .RangesExpr(readRangesSource.RangesExpr())
+                .ExplainPrompt(readRangesSource.ExplainPrompt())
+                .Done();
+            stage = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSource, exprCtx);
+        }
+    }
+
+    if (bodyReplaces.empty()) {
+        return stage;
     }
 
-    return TExprBase(exprCtx.ReplaceNodes(node.Ptr(), replaces));
+    return Build<TDqStage>(exprCtx, stage.Pos())
+        .Inputs(stage.Inputs())
+        .Outputs(stage.Outputs())
+        .Settings(stage.Settings())
+        .Program(TCoLambda(exprCtx.ReplaceNodes(stage.Program().Ptr(), bodyReplaces)))
+        .Done();
 }
 
 } // namespace NKikimr::NKqp::NOpt