Browse Source

KIKIMR-19831: exclude right columns from left only stream join result

fix(kqp): exclude right columns from left only join result
ulya-sidorina 1 year ago
parent
commit
bd90972502

+ 18 - 8
ydb/core/kqp/host/kqp_type_ann.cpp

@@ -23,6 +23,10 @@ using TStatus = IGraphTransformer::TStatus;
 
 
 namespace {
 namespace {
 
 
+// Tells whether the join result row may include columns of the right side.
+// Returns false only for the "LeftOnly" join type; every other join type
+// yields true. The argument is a string view, so it is taken by value
+// rather than by const reference.
+bool RightJoinSideAllowed(TStringBuf joinType) {
+    return joinType != "LeftOnly";
+}
+
 const TTypeAnnotationNode* MakeKqpEffectType(TExprContext& ctx) {
 const TTypeAnnotationNode* MakeKqpEffectType(TExprContext& ctx) {
     return ctx.MakeType<TResourceExprType>(KqpEffectTag);
     return ctx.MakeType<TResourceExprType>(KqpEffectTag);
 }
 }
@@ -1363,6 +1367,7 @@ TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& c
     }
     }
 
 
     TCoAtom rightLabel(node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel));
     TCoAtom rightLabel(node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel));
+    TCoAtom joinType(node->Child(TKqlStreamIdxLookupJoin::idx_JoinType));
 
 
     const TStructExprType* leftDataType = leftInputTupleType->GetItems()[1]->Cast<TStructExprType>();
     const TStructExprType* leftDataType = leftInputTupleType->GetItems()[1]->Cast<TStructExprType>();
     TVector<const TItemExprType*> resultStructItems;
     TVector<const TItemExprType*> resultStructItems;
@@ -1372,10 +1377,12 @@ TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& c
         );
         );
     }
     }
 
 
-    for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) {
-        resultStructItems.emplace_back(
-            ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), member->GetItemType())
-        );
+    if (RightJoinSideAllowed(joinType.Value())) {
+        for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) {
+            resultStructItems.emplace_back(
+                ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), member->GetItemType())
+            );
+        }
     }
     }
 
 
     auto rowType = ctx.MakeType<TStructExprType>(resultStructItems);
     auto rowType = ctx.MakeType<TStructExprType>(resultStructItems);
@@ -1686,6 +1693,7 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
     }
     }
 
 
     TCoAtom rightLabel(node->Child(TKqpIndexLookupJoin::idx_RightLabel));
     TCoAtom rightLabel(node->Child(TKqpIndexLookupJoin::idx_RightLabel));
+    TCoAtom joinType(node->Child(TKqpIndexLookupJoin::idx_JoinType));
 
 
     TVector<const TItemExprType*> resultStructItems;
     TVector<const TItemExprType*> resultStructItems;
     for (const auto& item : leftRowType->GetItems()) {
     for (const auto& item : leftRowType->GetItems()) {
@@ -1694,10 +1702,12 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
         );
         );
     }
     }
 
 
-    for (const auto& item : rightRowType->Cast<TStructExprType>()->GetItems()) {
-        resultStructItems.emplace_back(
-            ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), item->GetItemType())
-        );
+    if (RightJoinSideAllowed(joinType.Value())) {
+        for (const auto& item : rightRowType->Cast<TStructExprType>()->GetItems()) {
+            resultStructItems.emplace_back(
+                ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), item->GetItemType())
+            );
+        }
     }
     }
 
 
     auto outputRowType = ctx.MakeType<TStructExprType>(resultStructItems);
     auto outputRowType = ctx.MakeType<TStructExprType>(resultStructItems);

+ 1 - 1
ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

@@ -362,7 +362,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
     }
     }
 
 
     static THashSet<TStringBuf> supportedJoinKinds = {"Inner", "Left", "LeftOnly", "LeftSemi", "RightSemi"};
     static THashSet<TStringBuf> supportedJoinKinds = {"Inner", "Left", "LeftOnly", "LeftSemi", "RightSemi"};
-    static THashSet<TStringBuf> supportedStreamJoinKinds = {"Inner", "Left"};
+    static THashSet<TStringBuf> supportedStreamJoinKinds = {"Inner", "Left", "LeftOnly"};
     if (!supportedJoinKinds.contains(join.JoinType().Value())) {
     if (!supportedJoinKinds.contains(join.JoinType().Value())) {
         return {};
         return {};
     }
     }

+ 10 - 4
ydb/core/kqp/runtime/kqp_program_builder.cpp

@@ -159,6 +159,10 @@ EJoinKind GetIndexLookupJoinKind(const TString& joinKind) {
     }
     }
 }
 }
 
 
+// Reports whether right-side members belong in the joined row type.
+// The answer is false for the "LeftOnly" join type and true otherwise.
+bool RightJoinSideAllowed(const TString& joinType) {
+    const bool leftOnly = (joinType == "LeftOnly");
+    return !leftOnly;
+}
+
 } // namespace
 } // namespace
 
 
 TKqpProgramBuilder::TKqpProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry)
 TKqpProgramBuilder::TKqpProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry)
@@ -347,10 +351,12 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c
         rowTypeBuilder.Add(newMemberName, leftRowType->GetMemberType(i));
         rowTypeBuilder.Add(newMemberName, leftRowType->GetMemberType(i));
     }
     }
 
 
-    for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) {
-        TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i))
-            : TString::Join(rightLabel, ".", rightRowType->GetMemberName(i));
-        rowTypeBuilder.Add(newMemberName, rightRowType->GetMemberType(i));
+    if (RightJoinSideAllowed(joinType)) {
+        for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) {
+            TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i))
+                : TString::Join(rightLabel, ".", rightRowType->GetMemberName(i));
+            rowTypeBuilder.Add(newMemberName, rightRowType->GetMemberType(i));
+        }
     }
     }
 
 
     auto returnType = NewStreamType(rowTypeBuilder.Build());
     auto returnType = NewStreamType(rowTypeBuilder.Build());

+ 2 - 1
ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

@@ -779,7 +779,8 @@ private:
             resultRowItems[1] = NUdf::TUnboxedValuePod();
             resultRowItems[1] = NUdf::TUnboxedValuePod();
         }
         }
 
 
-        rowStats.ReadRowsCount += (rightRowSize > 0 ? 1 : 0);
+        rowStats.ReadRowsCount += (leftRowInfo.RightRowExist ? 1 : 0);
+        // TODO: use datashard statistics KIKIMR-16924
         rowStats.ReadBytesCount += rightRowSize;
         rowStats.ReadBytesCount += rightRowSize;
         rowStats.ResultRowsCount += 1;
         rowStats.ResultRowsCount += 1;
         rowStats.ResultBytesCount += leftRowSize + rightRowSize;
         rowStats.ResultBytesCount += leftRowSize + rightRowSize;

+ 34 - 0
ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp

@@ -414,6 +414,40 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) {
         ])", 3, StreamLookup);
         ])", 3, StreamLookup);
 }
 }
 
 
+Y_UNIT_TEST_TWIN(SimpleLeftOnlyJoin, StreamLookup) { // LEFT ONLY JOIN on l.Fk = r.Key selecting Key, Fk and Value of the left table.
+    Test( // Args: query text, expected rows ordered by l.Key, the number 3 (NOTE(review): its meaning is defined by Test, not visible here - confirm), StreamLookup.
+        R"(
+        SELECT l.Key, l.Fk, l.Value
+        FROM `/Root/Left` AS l
+        LEFT ONLY JOIN `/Root/Right` AS r
+            ON l.Fk = r.Key
+        ORDER BY l.Key
+    )",
+    R"([
+        [[4];[104];["Value2"]];
+        [[5];[105];["Value3"]];
+        [[6];#;["Value6"]];
+        [[7];#;["Value7"]]
+    ])", 3, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(LeftOnlyJoinValueColumn, StreamLookup) { // LEFT ONLY JOIN on l.Fk = r.Key selecting only l.Value, which is not part of the join condition.
+    Test( // Args: query text, expected rows ordered by l.Value, the number 3 (NOTE(review): its meaning is defined by Test, not visible here - confirm), StreamLookup.
+        R"(
+        SELECT l.Value
+        FROM `/Root/Left` AS l
+        LEFT ONLY JOIN `/Root/Right` AS r
+            ON l.Fk = r.Key
+        ORDER BY l.Value
+    )",
+    R"([
+        [["Value2"]];
+        [["Value3"]];
+        [["Value6"]];
+        [["Value7"]]
+    ])", 3, StreamLookup);
+}
+
 void CreateSimpleTableWithKeyType(TSession session, const TString& columnType) {
 void CreateSimpleTableWithKeyType(TSession session, const TString& columnType) {
     using namespace fmt::literals;
     using namespace fmt::literals;