Browse Source

replace into olap table (#1164)

Nikita Vasilev 1 year ago
parent
commit
b26eb44f1b

+ 1 - 0
ydb/core/kqp/common/kqp_yql.h

@@ -44,6 +44,7 @@ struct TKqpPhyTxSettings {
 };
 
 constexpr TStringBuf KqpReadRangesSourceName = "KqpReadRangesSource";
+constexpr TStringBuf KqpTableSinkName = "KqpTableSinkName";
 
 static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv;
 static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv;

+ 4 - 2
ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

@@ -4,13 +4,14 @@
 
 #include <ydb/core/base/appdata.h>
 #include <ydb/core/kqp/runtime/kqp_compute.h>
-#include <ydb/core/kqp/runtime/kqp_read_table.h>
 #include <ydb/core/kqp/runtime/kqp_read_actor.h>
+#include <ydb/core/kqp/runtime/kqp_write_actor.h>
+#include <ydb/core/kqp/runtime/kqp_read_table.h>
 #include <ydb/core/kqp/runtime/kqp_sequencer_factory.h>
 #include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
+#include <ydb/library/yql/providers/generic/actors/yql_generic_source_factory.h>
 #include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
 #include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
-#include <ydb/library/yql/providers/generic/actors/yql_generic_source_factory.h>
 
 
 namespace NKikimr {
@@ -67,6 +68,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
     auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
     RegisterStreamLookupActorFactory(*factory, counters);
     RegisterKqpReadActor(*factory, counters);
+    RegisterKqpWriteActor(*factory, counters);
     RegisterSequencerActorFactory(*factory, counters);
 
     if (federatedQuerySetup) {

+ 3 - 2
ydb/core/kqp/executer_actor/kqp_data_executer.cpp

@@ -1665,8 +1665,9 @@ private:
             }
         }
 
-        for (const auto& tableOp : stage.GetTableOps()) {
-            if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
+        for (const auto &tableOp : stage.GetTableOps()) {
+            if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange
+                && tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kUpsertRows) {
                 return true;
             }
         }

+ 34 - 15
ydb/core/kqp/executer_actor/kqp_executer_impl.h

@@ -737,28 +737,45 @@ protected:
         }
     }
 
+    void BuildExternalSinks(const NKqpProto::TKqpSink& sink, TKqpTasksGraph::TTaskType& task) { // Configures task.Outputs[sink.GetOutputIndex()] as a sink from the external sink description.
+        const auto& extSink = sink.GetExternalSink();
+        auto sinkName = extSink.GetSinkName();
+        if (sinkName) { // Secure/task params are only registered when the sink name is set (presumably non-empty).
+            auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson();
+            task.Meta.SecureParams.emplace(sinkName, structuredToken); // Token JSON built from the sink's auth info, stored under the sink name.
+            if (GetUserRequestContext()->TraceId) { // "fq.job_id" is only attached when the request context carries a TraceId.
+                task.Meta.TaskParams.emplace("fq.job_id", GetUserRequestContext()->CustomerSuppliedId);
+                // "fq.restart_count"
+            }
+        }
+
+        auto& output = task.Outputs[sink.GetOutputIndex()]; // No bounds check here; BuildSinks verifies the index before calling.
+        output.Type = TTaskOutputType::Sink;
+        output.SinkType = extSink.GetType();
+        output.SinkSettings = extSink.GetSettings();
+    }
+
+    void BuildInternalSinks(const NKqpProto::TKqpSink& sink, TKqpTasksGraph::TTaskType& task) { // Configures task.Outputs[sink.GetOutputIndex()] as a sink from the internal sink description; no secure/task params are added.
+        const auto& intSink = sink.GetInternalSink();
+        auto& output = task.Outputs[sink.GetOutputIndex()]; // No bounds check here; BuildSinks verifies the index before calling.
+        output.Type = TTaskOutputType::Sink;
+        output.SinkType = intSink.GetType(); // Type and settings are copied from the internal sink as-is.
+        output.SinkSettings = intSink.GetSettings();
+    }
+
     void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
         if (stage.SinksSize() > 0) {
             YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
             const auto& sink = stage.GetSinks(0);
-            YQL_ENSURE(sink.HasExternalSink(), "only external sinks are supported");
-            const auto& extSink = sink.GetExternalSink();
             YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size());
 
-            auto sinkName = extSink.GetSinkName();
-            if (sinkName) {
-                auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson();
-                task.Meta.SecureParams.emplace(sinkName, structuredToken);
-                if (GetUserRequestContext()->TraceId) {
-                    task.Meta.TaskParams.emplace("fq.job_id", GetUserRequestContext()->CustomerSuppliedId);
-                    // "fq.restart_count"
-                }
+            if (sink.HasInternalSink()) { // Dispatch on the sink kind; the internal sink is checked first.
+                BuildInternalSinks(sink, task);
+            } else if (sink.HasExternalSink()) { // External sinks keep the handling that previously lived inline in this function.
+                BuildExternalSinks(sink, task);
+            } else { // Neither kind is set: fail the ensure instead of leaving the output unconfigured.
+                YQL_ENSURE(false, "unknown sink type");
             }
-
-            auto& output = task.Outputs[sink.GetOutputIndex()];
-            output.Type = TTaskOutputType::Sink;
-            output.SinkType = extSink.GetType();
-            output.SinkSettings = extSink.GetSettings();
         }
     }
 
@@ -1353,6 +1370,7 @@ protected:
                         auto& task = TasksGraph.GetTask(taskIdx);
                         task.Meta.SetEnableShardsSequentialScan(readSettings.Sorted);
                         PrepareScanMetaForUsage(task.Meta, keyTypes);
+                        BuildSinks(stage, task);
                     }
                 }
 
@@ -1380,6 +1398,7 @@ protected:
                         task.Meta.ScanTask = true;
                         task.Meta.Type = TTaskMeta::TTaskType::Scan;
                         task.SetMetaId(metaGlueingId);
+                        BuildSinks(stage, task);
                     }
                 }
             }

+ 19 - 0
ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

@@ -521,6 +521,25 @@
             "Base": "TKqlDeleteRowsBase",
             "Match": {"Type": "Callable", "Name": "KqpDeleteRows"}
         },
+        {
+            "Name": "TKqpTableSinkSettings",
+            "Base": "TCallable",
+            "Match": {"Type": "Callable", "Name": "KqpTableSinkSettings"},
+            "Children": [
+                {"Index": 0, "Name": "Table", "Type": "TKqpTable"},
+                {"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
+                {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
+            ]
+        },
+        {
+            "Name": "TKqpTableSink",
+            "Base": "TCallable",
+            "Match": {"Type": "Callable", "Name": "DataSink"},
+            "Children": [
+                {"Index": 0, "Name": "Category", "Type": "TCoAtom"},
+                {"Index": 1, "Name": "Cluster", "Type": "TCoAtom"}
+            ]
+        },
         {
             "Name": "TKqpSinkEffect",
             "Base": "TKqlEffectBase",

+ 12 - 0
ydb/core/kqp/host/kqp_type_ann.cpp

@@ -1748,6 +1748,14 @@ TStatus AnnotateKqpSinkEffect(const TExprNode::TPtr& node, TExprContext& ctx) {
     return TStatus::Ok;
 }
 
+TStatus AnnotateTableSinkSettings(const TExprNode::TPtr& input, TExprContext& ctx) { // Type-annotates a table sink settings node; returns Error on a bad child count, Ok otherwise.
+    if (!EnsureMinMaxArgsCount(*input, 2, 3, ctx)) { // Accepts 2 or 3 children; only the count is validated here, not the children themselves.
+        return TStatus::Error;
+    }
+    input->SetTypeAnn(ctx.MakeType<TVoidExprType>()); // The node itself is annotated with the Void type.
+    return TStatus::Ok;
+}
+
 } // namespace
 
 TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cluster,
@@ -1902,6 +1910,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
                 return AnnotateReturningList(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
             }
 
+            if (TKqpTableSinkSettings::Match(input.Get())) {
+                return AnnotateTableSinkSettings(input, ctx);
+            }
+
             return dqTransformer->Transform(input, output, ctx);
         });
 }

+ 2 - 1
ydb/core/kqp/opt/kqp_opt_build_txs.cpp

@@ -125,7 +125,8 @@ private:
         Y_DEBUG_ABORT_UNLESS(!stages.empty());
 
         TKqpPhyTxSettings txSettings;
-        txSettings.Type = EPhysicalTxType::Data;
+        YQL_ENSURE(QueryType != EKikimrQueryType::Scan);
+        txSettings.Type = GetPhyTxType(false);
         txSettings.WithEffects = true;
 
         auto tx = Build<TKqpPhysicalTx>(ctx, inputExpr->Pos())

+ 87 - 17
ydb/core/kqp/opt/kqp_opt_effects.cpp

@@ -217,6 +217,37 @@ bool IsMapWrite(const TKikimrTableDescription& table, TExprBase input, TExprCont
 #undef DBG
 }
 
+TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const TCoAtomList& columns, TExprContext& ctx) { // Builds a TDqStage with no inputs that feeds `expr` into a single TDqSink described by `table` and `columns`.
+    Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr)); // Precondition: expr must be a pure DQ expression (presumably enforced in debug builds only — confirm).
+
+    return Build<TDqStage>(ctx, expr.Pos())
+        .Inputs()
+            .Build()
+        .Program()
+            .Args({}) // The program takes no arguments because the stage has no inputs.
+            .Body<TCoToFlow>()
+                .Input(expr)
+                .Build()
+            .Build()
+        .Outputs<TDqStageOutputsList>()
+            .Add<TDqSink>()
+                .DataSink<TKqpTableSink>()
+                    .Category(ctx.NewAtom(expr.Pos(), NYql::KqpTableSinkName))
+                    .Cluster(ctx.NewAtom(expr.Pos(), "db")) // NOTE(review): cluster atom is the hard-coded literal "db" — confirm this is intended.
+                    .Build()
+                .Index().Value("0").Build() // The sink is bound to output index 0.
+                .Settings<TKqpTableSinkSettings>()
+                    .Table(table)
+                    .Columns(columns)
+                    .Settings() // The optional settings child is built as an empty list.
+                        .Build()
+                    .Build()
+                .Build()
+            .Build()
+        .Settings().Build()
+        .Done();
+}
+
 TDqPhyPrecompute BuildPrecomputeStage(TExprBase expr, TExprContext& ctx) {
     Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr));
 
@@ -247,23 +278,34 @@ TDqPhyPrecompute BuildPrecomputeStage(TExprBase expr, TExprContext& ctx) {
 }
 
 bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
-    const TCoArgument& inputArg, TMaybeNode<TExprBase>& stageInput, TMaybeNode<TExprBase>& effect)
+    const TCoArgument& inputArg, TMaybeNode<TExprBase>& stageInput, TMaybeNode<TExprBase>& effect, bool& sinkEffect)
 {
+    const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, node.Table().Path());
+
+    sinkEffect = kqpCtx.IsGenericQuery() && table.Metadata->Kind == EKikimrTableKind::Olap;
+
     TKqpUpsertRowsSettings settings;
     if (node.Settings()) {
         settings = TKqpUpsertRowsSettings::Parse(node.Settings().Cast());
     }
     if (IsDqPureExpr(node.Input())) {
-        stageInput = BuildPrecomputeStage(node.Input(), ctx);
-
-        effect = Build<TKqpUpsertRows>(ctx, node.Pos())
-            .Table(node.Table())
-            .Input<TCoIterator>()
-                .List(inputArg)
-                .Build()
-            .Columns(node.Columns())
-            .Settings(settings.BuildNode(ctx, node.Pos()))
-            .Done();
+        if (sinkEffect) {
+            stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), node.Columns(), ctx);
+            effect = Build<TKqpSinkEffect>(ctx, node.Pos())
+                .Stage(stageInput.Cast().Ptr())
+                .SinkIndex().Build("0")
+                .Done();
+        } else {
+            stageInput = BuildPrecomputeStage(node.Input(), ctx);
+            effect = Build<TKqpUpsertRows>(ctx, node.Pos())
+                .Table(node.Table())
+                .Input<TCoIterator>()
+                    .List(inputArg)
+                    .Build()
+                .Columns(node.Columns())
+                .Settings(settings.BuildNode(ctx, node.Pos()))
+                .Done();
+        }
         return true;
     }
 
@@ -271,16 +313,44 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
         return false;
     }
 
-    auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, node.Table().Path());
-
     auto dqUnion = node.Input().Cast<TDqCnUnionAll>();
-    auto program = dqUnion.Output().Stage().Program();
+    auto stage = dqUnion.Output().Stage();
+    auto program = stage.Program();
     auto input = program.Body();
 
-    if (InplaceUpdateEnabled(*kqpCtx.Config, table, node.Columns()) && IsMapWrite(table, input, ctx)) {
+    if (sinkEffect) {
+        stageInput = Build<TDqStage>(ctx, node.Pos())
+            .Inputs(stage.Inputs())
+            .Program()
+                .Args(program.Args())
+                .Body(input)
+                .Build()
+            .Outputs<TDqStageOutputsList>()
+                .Add<TDqSink>()
+                    .DataSink<TKqpTableSink>()
+                        .Category(ctx.NewAtom(node.Pos(), NYql::KqpTableSinkName))
+                        .Cluster(ctx.NewAtom(node.Pos(), "db"))
+                        .Build()
+                    .Index().Value("0").Build()
+                    .Settings<TKqpTableSinkSettings>()
+                        .Table(node.Table())
+                        .Columns(node.Columns())
+                        .Settings()
+                            .Build()
+                        .Build()
+                    .Build()
+                .Build()
+            .Settings().Build()
+            .Done();
+
+        effect = Build<TKqpSinkEffect>(ctx, node.Pos())
+            .Stage(stageInput.Cast().Ptr())
+            .SinkIndex().Build("0")
+            .Done();
+    } else if (InplaceUpdateEnabled(*kqpCtx.Config, table, node.Columns()) && IsMapWrite(table, input, ctx)) {
         stageInput = Build<TKqpCnMapShard>(ctx, node.Pos())
             .Output()
-                .Stage(dqUnion.Output().Stage())
+                .Stage(stage)
                 .Index(dqUnion.Output().Index())
                 .Build()
             .Done();
@@ -389,7 +459,7 @@ bool BuildEffects(TPositionHandle pos, const TVector<TExprBase>& effects,
                 .Done();
 
             if (auto maybeUpsertRows = effect.Maybe<TKqlUpsertRows>()) {
-                if (!BuildUpsertRowsEffect(maybeUpsertRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect)) {
+                if (!BuildUpsertRowsEffect(maybeUpsertRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect, sinkEffect)) {
                     return false;
                 }
             }

+ 0 - 1
ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp

@@ -40,7 +40,6 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim
     };
     TMaybe<TMatchedRead> matched;
 
-    TMaybeNode<TKqpReadTable> mayberead;
     VisitExpr(stage.Program().Body().Ptr(), [&](const TExprNode::TPtr& node) {
             TExprBase expr(node);
             if (auto cast = expr.Maybe<TKqpReadTable>()) {

+ 19 - 0
ydb/core/kqp/provider/yql_kikimr_datasink.cpp

@@ -607,6 +607,21 @@ public:
         return true;
     }
 
+    bool CheckIOOlap(const TKikimrKey& key, const TExprNode::TPtr& node, const TCoAtom& mode, TExprContext& ctx) { // Returns false and adds an issue to ctx when `mode` is not allowed for an OLAP table; returns true otherwise.
+        TKiDataSink dataSink(node->ChildPtr(1)); // Child 1 of the node is wrapped as the data sink to obtain the cluster name.
+        auto& tableDesc = SessionCtx->Tables().GetTable(TString{dataSink.Cluster()}, key.GetTablePath());
+        if (!tableDesc.Metadata || tableDesc.Metadata->Kind != EKikimrTableKind::Olap) { // Tables without metadata or of a non-OLAP kind are not restricted by this check.
+            return true;
+        }
+
+        if (mode != "replace" && mode != "drop" && mode != "drop_if_exists") { // Only these three modes pass for OLAP tables.
+            ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Write mode '" << static_cast<TStringBuf>(mode) << "' is not supported for olap tables."));
+            return false;
+        }
+
+        return true;
+    }
+
     TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override {
         YQL_ENSURE(node->IsCallable(WriteName), "Expected Write!, got: " << node->Content());
 
@@ -628,6 +643,10 @@ public:
                     return resultNode;
                 }
 
+                if (!CheckIOOlap(key, node, mode, ctx)) {
+                    return nullptr;
+                }
+
                 if (!settings.ReturningList.IsValid()) {
                     settings.ReturningList = Build<TExprList>(ctx, node->Pos()).Done();
                 }

Some files were not shown because too many files changed in this diff