Browse Source

fix(kqp): pass right table filter through stream index lookup join (#1034)

Iuliia Sidorina 1 year ago
parent
commit
bc9e922a58

+ 14 - 15
ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

@@ -180,21 +180,10 @@
         {
             "Name": "TKqlStreamLookupTable",
             "Base": "TKqlLookupTableBase",
-            "Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"}
-        },
-        {
-            "Name": "TKqlStreamIdxLookupJoin",
-            "Base": "TCallable",
-            "Match": {"Type": "Callable", "Name": "KqlStreamIdxLookupJoin"},
+            "Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"},
             "Children": [
-                {"Index": 0, "Name": "LeftInput", "Type": "TExprBase"},
-                {"Index": 1, "Name": "LeftLabel", "Type": "TCoAtom"},
-                {"Index": 2, "Name": "RightTable", "Type": "TKqpTable"},
-                {"Index": 3, "Name": "RightColumns", "Type": "TCoAtomList"},
-                {"Index": 4, "Name": "RightLabel", "Type": "TCoAtom"},
-                {"Index": 5, "Name": "JoinType", "Type": "TCoAtom"}
+                {"Index": 3, "Name": "LookupStrategy", "Type": "TCoAtom"}
             ]
-
         },
         {
             "Name": "TKqlSequencer",
@@ -469,9 +458,9 @@
             ]
         },
         {
-            "Name": "TKqpIndexLookupJoin",
+            "Name": "TKqlIndexLookupJoinBase",
             "Base": "TCallable",
-            "Match": {"Type": "Callable", "Name": "KqpIndexLookupJoin"},
+            "Match": {"Type": "CallableBase"},
             "Children": [
                 {"Index": 0, "Name": "Input", "Type": "TExprBase"},
                 {"Index": 1, "Name": "JoinType", "Type": "TCoAtom"},
@@ -479,6 +468,16 @@
                 {"Index": 3, "Name": "RightLabel", "Type": "TCoAtom"}
             ]
         },
+        {
+            "Name": "TKqlIndexLookupJoin",
+            "Base": "TKqlIndexLookupJoinBase",
+            "Match": {"Type": "Callable", "Name": "KqlIndexLookupJoin"}
+        },
+        {
+            "Name": "TKqpIndexLookupJoin",
+            "Base": "TKqlIndexLookupJoinBase",
+            "Match": {"Type": "Callable", "Name": "KqpIndexLookupJoin"}
+        },
         {
             "Name": "TKqpCnSequencer",
             "Base": "TKqpConnection",

+ 68 - 118
ydb/core/kqp/host/kqp_type_ann.cpp

@@ -425,7 +425,7 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
 TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
     const TKikimrTablesData& tablesData, bool withSystemColumns)
 {
-    if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) {
+    if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) || TKqlStreamLookupTable::Match(node.Get()) ? 4 : 3, ctx)) {
         return TStatus::Error;
     }
 
@@ -467,10 +467,57 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
     }
 
     YQL_ENSURE(lookupType);
-    if (!EnsureStructType(node->Pos(), *lookupType, ctx)) {
-        return TStatus::Error;
+
+    const TStructExprType* structType = nullptr;
+    bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get());
+    if (isStreamLookup) {
+        auto lookupStrategy = node->Child(TKqlStreamLookupTable::idx_LookupStrategy);
+        if (!EnsureAtom(*lookupStrategy, ctx)) {
+            return TStatus::Error;
+        }
+
+        if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName) {
+            if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) {
+                return TStatus::Error;
+            }
+
+            if (!EnsureTupleTypeSize(node->Pos(), lookupType, 2, ctx)) {
+                return TStatus::Error;
+            }
+
+            auto tupleType = lookupType->Cast<TTupleExprType>();
+            if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[0], ctx)) {
+                return TStatus::Error;
+            }
+
+            if (!EnsureStructType(node->Pos(), *tupleType->GetItems()[1], ctx)) {
+                return TStatus::Error;
+            }
+
+            structType = tupleType->GetItems()[0]->Cast<TStructExprType>();
+            auto leftRowType = tupleType->GetItems()[1]->Cast<TStructExprType>();
+
+            TVector<const TTypeAnnotationNode*> outputTypes;
+            outputTypes.push_back(leftRowType);
+            outputTypes.push_back(ctx.MakeType<TOptionalExprType>(rowType));
+
+            rowType = ctx.MakeType<TTupleExprType>(outputTypes);
+        } else {
+            if (!EnsureStructType(node->Pos(), *lookupType, ctx)) {
+                return TStatus::Error;
+            }
+
+            structType = lookupType->Cast<TStructExprType>();
+        }
+    } else {
+        if (!EnsureStructType(node->Pos(), *lookupType, ctx)) {
+            return TStatus::Error;
+        }
+
+        structType = lookupType->Cast<TStructExprType>();
     }
-    auto structType = lookupType->Cast<TStructExprType>();
+
+    YQL_ENSURE(structType);
 
     ui32 keyColumnsCount = 0;
     if (TKqlLookupIndexBase::Match(node.Get())) {
@@ -1338,105 +1385,6 @@ TStatus AnnotateSequencer(const TExprNode::TPtr& node, TExprContext& ctx, const
     return TStatus::Ok;
 }
 
-TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
-    const TKikimrTablesData& tablesData, bool withSystemColumns)
-{
-    if (!EnsureArgsCount(*node, 6, ctx)) {
-        return TStatus::Error;
-    }
-
-    auto leftInputType = node->Child(TKqlStreamIdxLookupJoin::idx_LeftInput)->GetTypeAnn();
-    const TTypeAnnotationNode* leftInputItemType;
-    if (!EnsureNewSeqType<false>(node->Pos(), *leftInputType, ctx, &leftInputItemType)) {
-        return TStatus::Error;
-    }
-
-    YQL_ENSURE(leftInputItemType);
-    if (!EnsureTupleType(node->Pos(), *leftInputItemType, ctx)) {
-        return TStatus::Error;
-    }
-
-    if (!EnsureTupleTypeSize(node->Pos(), leftInputItemType, 2, ctx)) {
-        return TStatus::Error;
-    }
-
-    auto leftInputTupleType = leftInputItemType->Cast<TTupleExprType>();
-    if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[0], ctx)) {
-        return TStatus::Error;
-    }
-
-    if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[1], ctx)) {
-        return TStatus::Error;
-    }
-
-    if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel), ctx)) {
-        return TStatus::Error;
-    }
-
-    TCoAtom leftLabel(node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel));
-
-    auto rightTable = ResolveTable(node->Child(TKqlStreamIdxLookupJoin::idx_RightTable), ctx, cluster, tablesData);
-    if (!rightTable.second) {
-        return TStatus::Error;
-    }
-
-    const TStructExprType* inputKeysType = leftInputTupleType->GetItems()[0]->Cast<TStructExprType>();
-    for (const auto& inputKey : inputKeysType->GetItems()) {
-        if (!rightTable.second->GetKeyColumnIndex(TString(inputKey->GetName()))) {
-            return TStatus::Error;
-        }
-    }
-
-    if (!EnsureTupleOfAtoms(*node->Child(TKqlStreamIdxLookupJoin::idx_RightColumns), ctx)) {
-        return TStatus::Error;
-    }
-
-    TCoAtomList rightColumns{node->ChildPtr(TKqlStreamIdxLookupJoin::idx_RightColumns)};
-    for (const auto& rightColumn : rightColumns) {
-        if (!rightTable.second->GetColumnType(TString(rightColumn.Value()))) {
-            return TStatus::Error;
-        }
-    }
-
-    auto rightDataType = GetReadTableRowType(ctx, tablesData, cluster, rightTable.first, rightColumns, withSystemColumns);
-    if (!rightDataType) {
-        return TStatus::Error;
-    }
-
-    if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel), ctx)) {
-        return TStatus::Error;
-    }
-
-    TCoAtom rightLabel(node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel));
-    TCoAtom joinType(node->Child(TKqlStreamIdxLookupJoin::idx_JoinType));
-
-    const TStructExprType* leftDataType = leftInputTupleType->GetItems()[1]->Cast<TStructExprType>();
-    TVector<const TItemExprType*> resultStructItems;
-    for (const auto& member : leftDataType->GetItems()) {
-        resultStructItems.emplace_back(
-            ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", member->GetName()), member->GetItemType())
-        );
-    }
-
-    if (RightJoinSideAllowed(joinType.Value())) {
-        for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) {
-            const bool makeOptional = RightJoinSideOptional(joinType.Value()) && !member->GetItemType()->IsOptionalOrNull();
-
-            const TTypeAnnotationNode* memberType = makeOptional
-                ? ctx.MakeType<TOptionalExprType>(member->GetItemType())
-                : member->GetItemType();
-
-            resultStructItems.emplace_back(
-                ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), memberType)
-            );
-        }
-    }
-
-    auto rowType = ctx.MakeType<TStructExprType>(resultStructItems);
-    node->SetTypeAnn(ctx.MakeType<TListExprType>(rowType));
-    return TStatus::Ok;
-}
-
 TStatus AnnotateKqpProgram(const TExprNode::TPtr& node, TExprContext& ctx) {
     if (!EnsureArgsCount(*node, 2, ctx)) {
         return TStatus::Error;
@@ -1621,7 +1569,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
 
     YQL_ENSURE(inputItemType);
 
-    if (lookupStrategy.Value() == "LookupRows") {
+    if (lookupStrategy.Value() == TKqpStreamLookupStrategyName) {
         if (!EnsureStructType(node->Pos(), *inputItemType, ctx)) {
             return TStatus::Error;
         }
@@ -1640,7 +1588,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
 
         node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));
 
-    } else if (lookupStrategy.Value() == "LookupJoinRows") {
+    } else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName) {
         if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
             return TStatus::Error;
         }
@@ -1694,7 +1642,7 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
         return TStatus::Error;
     }
 
-    auto inputType = node->Child(TKqpIndexLookupJoin::idx_Input)->GetTypeAnn();
+    auto inputType = node->Child(TKqlIndexLookupJoinBase::idx_Input)->GetTypeAnn();
     const TTypeAnnotationNode* inputItemType;
     if (!EnsureNewSeqType<false>(node->Pos(), *inputType, ctx, &inputItemType)) {
         return TStatus::Error;
@@ -1725,22 +1673,22 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
         return TStatus::Error;
     }
 
-    if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_JoinType), ctx)) {
+    if (!EnsureAtom(*node->Child(TKqlIndexLookupJoinBase::idx_JoinType), ctx)) {
         return TStatus::Error;
     }
 
-    if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_LeftLabel), ctx)) {
+    if (!EnsureAtom(*node->Child(TKqlIndexLookupJoinBase::idx_LeftLabel), ctx)) {
         return TStatus::Error;
     }
 
-    TCoAtom leftLabel(node->Child(TKqpIndexLookupJoin::idx_LeftLabel));
+    TCoAtom leftLabel(node->Child(TKqlIndexLookupJoinBase::idx_LeftLabel));
 
-    if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_RightLabel), ctx)) {
+    if (!EnsureAtom(*node->Child(TKqlIndexLookupJoinBase::idx_RightLabel), ctx)) {
         return TStatus::Error;
     }
 
-    TCoAtom rightLabel(node->Child(TKqpIndexLookupJoin::idx_RightLabel));
-    TCoAtom joinType(node->Child(TKqpIndexLookupJoin::idx_JoinType));
+    TCoAtom rightLabel(node->Child(TKqlIndexLookupJoinBase::idx_RightLabel));
+    TCoAtom joinType(node->Child(TKqlIndexLookupJoinBase::idx_JoinType));
 
     TVector<const TItemExprType*> resultStructItems;
     for (const auto& item : leftRowType->GetItems()) {
@@ -1764,7 +1712,13 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
     }
 
     auto outputRowType = ctx.MakeType<TStructExprType>(resultStructItems);
-    node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType));
+    const bool isPhysical = TKqpIndexLookupJoin::Match(node.Get());
+    if (isPhysical) {
+        node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType));
+    } else {
+        node->SetTypeAnn(ctx.MakeType<TListExprType>(outputRowType));
+    }
+    
     return TStatus::Ok;
 }
 
@@ -1892,7 +1846,7 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
                 return AnnotateStreamLookupConnection(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
             }
 
-            if (TKqpIndexLookupJoin::Match(input.Get())) {
+            if (TKqlIndexLookupJoinBase::Match(input.Get())) {
                 return AnnotateIndexLookupJoin(input, ctx);
             }
 
@@ -1924,10 +1878,6 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
                 return AnnotateSequencer(input, ctx, cluster, *tablesData);
             }
 
-            if (TKqlStreamIdxLookupJoin::Match(input.Get())) {
-                return AnnotateStreamIdxLookupJoin(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
-            }
-
             if (TKqpProgram::Match(input.Get())) {
                 return AnnotateKqpProgram(input, ctx);
             }

+ 11 - 0
ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp

@@ -214,6 +214,17 @@ TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx,
             .Done();
     }
 
+    if (auto maybeStreamLookup = lookup.Maybe<TKqlStreamLookupTable>()) {
+        auto streamLookup = maybeStreamLookup.Cast();
+
+        return Build<TKqlStreamLookupTable>(ctx, lookup.Pos())
+            .Table(streamLookup.Table())
+            .LookupKeys(streamLookup.LookupKeys())
+            .Columns(usedColumns.Cast())
+            .LookupStrategy(streamLookup.LookupStrategy())
+            .Done();
+    }
+
     return Build<TKqlLookupTableBase>(ctx, lookup.Pos())
         .CallableName(lookup.CallableName())
         .Table(lookup.Table())

+ 7 - 0
ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp

@@ -321,6 +321,7 @@ TExprBase DoRewriteIndexRead(const TReadMatch& read, TExprContext& ctx,
             .Table(read.Table())
             .LookupKeys(readIndexTable.Ptr())
             .Columns(read.Columns())
+            .LookupStrategy().Build(TKqpStreamLookupStrategyName)
             .Done();
     } else {
         return Build<TKqlLookupTable>(ctx, read.Pos())
@@ -365,6 +366,7 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
                     .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
                     .LookupKeys(lookupIndex.LookupKeys())
                     .Columns(lookupIndex.Columns())
+                    .LookupStrategy().Build(TKqpStreamLookupStrategyName)
                     .Done();
             }
 
@@ -382,12 +384,14 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
                 .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
                 .LookupKeys(lookupIndex.LookupKeys())
                 .Columns(keyColumnsList)
+                .LookupStrategy().Build(TKqpStreamLookupStrategyName)
                 .Done();
 
             return Build<TKqlStreamLookupTable>(ctx, node.Pos())
                 .Table(lookupIndex.Table())
                 .LookupKeys(lookupIndexTable.Ptr())
                 .Columns(lookupIndex.Columns())
+                .LookupStrategy().Build(TKqpStreamLookupStrategyName)
                 .Done();
         }
 
@@ -424,6 +428,7 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
                 .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
                 .LookupKeys(streamLookupIndex.LookupKeys())
                 .Columns(streamLookupIndex.Columns())
+                .LookupStrategy().Build(TKqpStreamLookupStrategyName)
                 .Done();
         }
 
@@ -433,12 +438,14 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
             .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
             .LookupKeys(streamLookupIndex.LookupKeys())
             .Columns(keyColumnsList)
+            .LookupStrategy().Build(TKqpStreamLookupStrategyName)
             .Done();
 
         return Build<TKqlStreamLookupTable>(ctx, node.Pos())
             .Table(streamLookupIndex.Table())
             .LookupKeys(lookupIndexTable.Ptr())
             .Columns(streamLookupIndex.Columns())
+            .LookupStrategy().Build(TKqpStreamLookupStrategyName)
             .Done();
     }
 

+ 105 - 6
ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

@@ -236,6 +236,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos,
                     .Build()
                 .Build()
             .Columns(columns)
+            .LookupStrategy().Build(TKqpStreamLookupStrategyName)
             .Done();
     }
 
@@ -249,6 +250,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos,
                     .Build()
                 .Build()
             .Columns(columns)
+            .LookupStrategy().Build(TKqpStreamLookupStrategyName)
             .Done();
     }
 
@@ -338,15 +340,112 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) {
 #define DBG(...)
 
 template<typename ReadType>
-TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(const TDqJoin& join, TExprBase leftInput, ReadType rightRead, TExprContext& ctx) {
+TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(const TDqJoin& join, TExprBase leftInput, const TKqpMatchReadResult& rightReadMatch, TExprContext& ctx) {
     TString leftLabel = join.LeftLabel().Maybe<TCoAtom>() ? TString(join.LeftLabel().Cast<TCoAtom>().Value()) : "";
     TString rightLabel = join.RightLabel().Maybe<TCoAtom>() ? TString(join.RightLabel().Cast<TCoAtom>().Value()) : "";
+    auto rightRead = rightReadMatch.Read.template Cast<ReadType>();
 
-    return Build<TKqlStreamIdxLookupJoin>(ctx, join.Pos())
-        .LeftInput(leftInput)
+    TExprBase lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
+        .Table(rightRead.Table())
+        .LookupKeys(leftInput)
+        .Columns(rightRead.Columns())
+        .LookupStrategy().Build(TKqpStreamLookupJoinStrategyName)
+        .Done();
+
+    // Stream lookup join output: stream<tuple<left_row_struct, optional<right_row_struct>>>,
+    // so the right-table filters must be applied to the second element of each row's tuple.
+
+    if (rightReadMatch.ExtractMembers) {
+        lookupJoin = Build<TCoMap>(ctx, join.Pos())
+            .Input(lookupJoin)
+            .Lambda()
+                .Args({"tuple"})
+                .Body<TExprList>()
+                    .Add<TCoNth>()
+                        .Tuple("tuple")
+                        .Index().Value("0").Build()
+                        .Build()
+                    .Add<TCoExtractMembers>()
+                        .Input<TCoNth>()
+                            .Tuple("tuple")
+                            .Index().Value("1").Build()
+                            .Build()
+                        .Members(rightReadMatch.ExtractMembers.Cast().Members())
+                        .Build()    
+                    .Build()
+                .Build()
+            .Done();
+    }    
+
+    if (rightReadMatch.FilterNullMembers) {
+        lookupJoin = Build<TCoMap>(ctx, join.Pos())
+            .Input(lookupJoin)
+            .Lambda()
+                .Args({"tuple"})
+                .Body<TExprList>()
+                    .Add<TCoNth>()
+                        .Tuple("tuple")
+                        .Index().Value("0").Build()
+                        .Build()
+                    .Add<TCoFilterNullMembers>()
+                        .Input<TCoNth>()
+                            .Tuple("tuple")
+                            .Index().Value("1").Build()
+                            .Build()
+                        .Members(rightReadMatch.FilterNullMembers.Cast().Members())
+                        .Build()    
+                    .Build()
+                .Build()
+            .Done();
+    }
+    
+    if (rightReadMatch.SkipNullMembers) {
+        lookupJoin = Build<TCoMap>(ctx, join.Pos())
+            .Input(lookupJoin)
+            .Lambda()
+                .Args({"tuple"})
+                .Body<TExprList>()
+                    .Add<TCoNth>()
+                        .Tuple("tuple")
+                        .Index().Value("0").Build()
+                        .Build()
+                    .Add<TCoSkipNullMembers>()
+                        .Input<TCoNth>()
+                            .Tuple("tuple")
+                            .Index().Value("1").Build()
+                            .Build()
+                        .Members(rightReadMatch.SkipNullMembers.Cast().Members())
+                        .Build()    
+                    .Build()
+                .Build()
+            .Done();
+    }
+
+    if (rightReadMatch.FlatMap) {
+        lookupJoin = Build<TCoMap>(ctx, join.Pos())
+            .Input(lookupJoin)
+            .Lambda()
+                .Args({"tuple"})
+                .Body<TExprList>()
+                    .Add<TCoNth>()
+                        .Tuple("tuple")
+                        .Index().Value("0").Build()
+                        .Build()
+                    .Add<TCoFlatMap>()
+                        .Input<TCoNth>()
+                            .Tuple("tuple")
+                            .Index().Value("1").Build()
+                            .Build()
+                        .Lambda(rightReadMatch.FlatMap.Cast().Lambda())
+                        .Build()    
+                    .Build()  
+                .Build()    
+            .Done();
+    }
+
+    return Build<TKqlIndexLookupJoin>(ctx, join.Pos())
+        .Input(lookupJoin)
         .LeftLabel().Build(leftLabel)
-        .RightTable(rightRead.Table())
-        .RightColumns(rightRead.Columns())
         .RightLabel().Build(rightLabel)
         .JoinType(join.JoinType())
         .Done();
@@ -689,7 +788,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
                 .Build()
             .Done();
 
-        return BuildKqpStreamIndexLookupJoin(join, leftInput, rightRead, ctx);
+        return BuildKqpStreamIndexLookupJoin<ReadType>(join, leftInput, *rightReadMatch, ctx);
     }
 
     auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos());

+ 2 - 0
ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp

@@ -273,6 +273,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
                     .Table(read.Table())
                     .LookupKeys(lookupKeys)
                     .Columns(read.Columns())
+                    .LookupStrategy().Build(TKqpStreamLookupStrategyName)
                     .Done();
             }
         } else {
@@ -433,6 +434,7 @@ TExprBase KqpRewriteLookupTable(const TExprBase& node, TExprContext& ctx, const
         .Table(lookup.Table())
         .LookupKeys(lookup.LookupKeys())
         .Columns(lookup.Columns())
+        .LookupStrategy().Build(TKqpStreamLookupStrategyName)
         .Done();
 }
 

+ 1 - 0
ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp

@@ -399,6 +399,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
                                 .Table(read.Table())
                                 .Columns(read.Columns())
                                 .LookupKeys(keys)
+                                .LookupStrategy().Build(TKqpStreamLookupStrategyName)
                                 .Done();
                         }
                     } else {

+ 1 - 1
ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

@@ -34,7 +34,7 @@ public:
         AddHandler(0, &TKqlReadTableRanges::Match, HNDL(BuildReadTableRangesStage));
         AddHandler(0, &TKqlLookupTable::Match, HNDL(BuildLookupTableStage));
         AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages));
-        AddHandler(0, &TKqlStreamIdxLookupJoin::Match, HNDL(BuildStreamIdxLookupJoinStages));
+        AddHandler(0, &TKqlIndexLookupJoin::Match, HNDL(BuildStreamIdxLookupJoinStages));
         AddHandler(0, &TKqlSequencer::Match, HNDL(BuildSequencerStages));
         AddHandler(0, [](auto) { return true; }, HNDL(RemoveRedundantSortByPk));
         AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable));

+ 8 - 16
ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp

@@ -702,7 +702,7 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
             .Table(lookup.Table())
             .Columns(lookup.Columns())
             .InputType(ExpandType(lookup.Pos(), *lookup.LookupKeys().Ref().GetTypeAnn(), ctx))
-            .LookupStrategy().Build(TKqpStreamLookupStrategyName)
+            .LookupStrategy(lookup.LookupStrategy())
             .Done();
 
     } else if (lookup.LookupKeys().Maybe<TDqCnUnionAll>()) {
@@ -713,7 +713,7 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
             .Table(lookup.Table())
             .Columns(lookup.Columns())
             .InputType(ExpandType(lookup.Pos(), *output.Ref().GetTypeAnn(), ctx))
-            .LookupStrategy().Build(TKqpStreamLookupStrategyName)
+            .LookupStrategy(lookup.LookupStrategy())
             .Done();
     } else {
         return node;
@@ -738,32 +738,24 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
 }
 
 NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx) {
-    if (!node.Maybe<TKqlStreamIdxLookupJoin>()) {
+    if (!node.Maybe<TKqlIndexLookupJoin>()) {
         return node;
     }
 
-    const auto& idxLookupJoin = node.Cast<TKqlStreamIdxLookupJoin>();
-    YQL_ENSURE(idxLookupJoin.LeftInput().Maybe<TDqCnUnionAll>(), "Expected UnionAll as left input");
-
-    auto output = idxLookupJoin.LeftInput().Cast<TDqCnUnionAll>().Output();
-    auto cnStreamIdxLookupJoin = Build<TKqpCnStreamLookup>(ctx, idxLookupJoin.Pos())
-        .Output(output)
-        .Table(idxLookupJoin.RightTable())
-        .Columns(idxLookupJoin.RightColumns())
-        .InputType(ExpandType(idxLookupJoin.Pos(), *output.Ref().GetTypeAnn(), ctx))
-        .LookupStrategy().Build(TKqpStreamLookupJoinStrategyName)
-        .Done();
+    const auto& idxLookupJoin = node.Cast<TKqlIndexLookupJoin>();
 
     return Build<TDqCnUnionAll>(ctx, node.Pos())
         .Output()
             .Stage<TDqStage>()
             .Inputs()
-                .Add(cnStreamIdxLookupJoin)
+                .Add(idxLookupJoin.Input())
                 .Build()
             .Program()
                 .Args({"stream_lookup_join_output"})
                 .Body<TKqpIndexLookupJoin>()
-                    .Input("stream_lookup_join_output")
+                    .Input<TCoToStream>()
+                        .Input("stream_lookup_join_output")
+                        .Build()
                     .JoinType(idxLookupJoin.JoinType())
                     .LeftLabel(idxLookupJoin.LeftLabel())
                     .RightLabel(idxLookupJoin.RightLabel())

+ 90 - 51
ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp

@@ -61,7 +61,8 @@ void PrepareTables(TSession session) {
             (101, "Value21"),
             (102, "Value22"),
             (103, "Value23"),
-            (NULL, "Value24");
+            (NULL, "Value24"),
+            (104, NULL);
 
         REPLACE INTO `/Root/LaunchByProcessIdAndPinned` (idx_processId, idx_pinned, idx_launchNumber) VALUES
             ("eProcess", false, 4),
@@ -174,7 +175,7 @@ Y_UNIT_TEST(MultiJoins) {
     CompareYson(answer, FormatResultSetYson(result.GetResultSet(0)));
 }
 
-Y_UNIT_TEST(Inner) {
+Y_UNIT_TEST_TWIN(Inner, StreamLookup) {
     Test(
         R"(
             SELECT l.Key, l.Fk, l.Value, r.Key, r.Value
@@ -186,10 +187,10 @@ Y_UNIT_TEST(Inner) {
         )",
         R"([
             [[1];[101];["Value1"];[101];["Value21"]]
-        ])", 2);
+        ])", 2, StreamLookup);
 }
 
-Y_UNIT_TEST(Left) {
+Y_UNIT_TEST_TWIN(Left, StreamLookup) {
     Test(
         R"(
             SELECT l.Key, l.Fk, l.Value, r.Key, r.Value
@@ -201,14 +202,14 @@ Y_UNIT_TEST(Left) {
         )",
         R"([
             [[3];[103];["Value2"];[103];["Value23"]];
-            [[4];[104];["Value2"];#;#];
+            [[4];[104];["Value2"];[104];#];
             [[5];[105];["Value3"];#;#];
             [[6];#;["Value6"];#;#];
             [[7];#;["Value7"];#;#]
-        ])", 1);
+        ])", 2, StreamLookup);
 }
 
-Y_UNIT_TEST(LeftOnly) {
+Y_UNIT_TEST_TWIN(LeftOnly, StreamLookup) {
     Test(
         R"(
             SELECT l.Key, l.Fk, l.Value
@@ -219,11 +220,10 @@ Y_UNIT_TEST(LeftOnly) {
             ORDER BY l.Key
         )",
         R"([
-            [[4];[104];["Value2"]];
             [[5];[105];["Value3"]];
             [[6];#;["Value6"]];
             [[7];#;["Value7"]]
-        ])", 1);
+        ])", 2, StreamLookup);
 }
 
 Y_UNIT_TEST(LeftSemi) {
@@ -236,8 +236,9 @@ Y_UNIT_TEST(LeftSemi) {
             WHERE l.Value != 'Value1'   -- left table payload filter
         )",
         R"([
-            [[3];[103];["Value2"]]
-        ])", 1);
+            [[3];[103];["Value2"]];
+            [[4];[104];["Value2"]]
+        ])", 2);
 }
 
 Y_UNIT_TEST(RightSemi) {
@@ -253,7 +254,7 @@ Y_UNIT_TEST(RightSemi) {
         R"([
             [[101];["Value21"]];
             [[103];["Value23"]]
-        ])", 3);
+        ])", 4);
 }
 
 Y_UNIT_TEST_TWIN(SimpleInnerJoin, StreamLookup) {
@@ -268,8 +269,9 @@ Y_UNIT_TEST_TWIN(SimpleInnerJoin, StreamLookup) {
         R"([
             [[1];[101];["Value1"];[101];["Value21"]];
             [[2];[102];["Value1"];[102];["Value22"]];
-            [[3];[103];["Value2"];[103];["Value23"]]
-        ])", 3, StreamLookup);
+            [[3];[103];["Value2"];[103];["Value23"]];
+            [[4];[104];["Value2"];[104];#]
+        ])", 4, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(InnerJoinCustomColumnOrder, StreamLookup) {
@@ -284,8 +286,9 @@ Y_UNIT_TEST_TWIN(InnerJoinCustomColumnOrder, StreamLookup) {
         R"([
             [["Value21"];[1];[101];["Value1"];[101]];
             [["Value22"];[2];[102];["Value1"];[102]];
-            [["Value23"];[3];[103];["Value2"];[103]]
-        ])", 3, StreamLookup);
+            [["Value23"];[3];[103];["Value2"];[103]];
+            [#;[4];[104];["Value2"];[104]]
+        ])", 4, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(InnerJoinOnlyRightColumn, StreamLookup) {
@@ -298,10 +301,11 @@ Y_UNIT_TEST_TWIN(InnerJoinOnlyRightColumn, StreamLookup) {
             ORDER BY r.Value;
         )",
         R"([
+            [#];
             [["Value21"]];
             [["Value22"]];
             [["Value23"]]
-        ])", 3, StreamLookup);
+        ])", 4, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(InnerJoinOnlyLeftColumn, StreamLookup) {
@@ -316,8 +320,9 @@ Y_UNIT_TEST_TWIN(InnerJoinOnlyLeftColumn, StreamLookup) {
         R"([
             [[101]];
             [[102]];
-            [[103]]
-        ])", 3, StreamLookup);
+            [[103]];
+            [[104]]
+        ])", 4, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(InnerJoinLeftFilter, StreamLookup) {
@@ -331,8 +336,9 @@ Y_UNIT_TEST_TWIN(InnerJoinLeftFilter, StreamLookup) {
             ORDER BY l.Key;
         )",
         R"([
-            [[3];[103];["Value2"];[103];["Value23"]]
-        ])", 1, StreamLookup);
+            [[3];[103];["Value2"];[103];["Value23"]];
+            [[4];[104];["Value2"];[104];#]
+        ])", 2, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(SimpleLeftJoin, StreamLookup) {
@@ -348,11 +354,11 @@ Y_UNIT_TEST_TWIN(SimpleLeftJoin, StreamLookup) {
             [[1];[101];["Value1"];[101];["Value21"]];
             [[2];[102];["Value1"];[102];["Value22"]];
             [[3];[103];["Value2"];[103];["Value23"]];
-            [[4];[104];["Value2"];#;#];
+            [[4];[104];["Value2"];[104];#];
             [[5];[105];["Value3"];#;#];
             [[6];#;["Value6"];#;#];
             [[7];#;["Value7"];#;#]
-        ])", 3, StreamLookup);
+        ])", 4, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(LeftJoinCustomColumnOrder, StreamLookup) {
@@ -368,11 +374,11 @@ Y_UNIT_TEST_TWIN(LeftJoinCustomColumnOrder, StreamLookup) {
             [["Value21"];[1];[101];["Value1"];[101]];
             [["Value22"];[2];[102];["Value1"];[102]];
             [["Value23"];[3];[103];["Value2"];[103]];
-            [#;[4];#;["Value2"];[104]];
+            [#;[4];[104];["Value2"];[104]];
             [#;[5];#;["Value3"];[105]];
             [#;[6];#;["Value6"];#];
             [#;[7];#;["Value7"];#]
-        ])", 3, StreamLookup);
+        ])", 4, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(LeftJoinOnlyRightColumn, StreamLookup) {
@@ -392,7 +398,7 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyRightColumn, StreamLookup) {
             [["Value21"]];
             [["Value22"]];
             [["Value23"]]
-        ])", 3, StreamLookup);
+        ])", 4, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) {
@@ -412,41 +418,74 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) {
             [[103]];
             [[104]];
             [[105]]
-        ])", 3, StreamLookup);
+        ])", 4, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(SimpleLeftOnlyJoin, StreamLookup) {
     Test(
         R"(
-        SELECT l.Key, l.Fk, l.Value
-        FROM `/Root/Left` AS l
-        LEFT ONLY JOIN `/Root/Right` AS r
-            ON l.Fk = r.Key
-        ORDER BY l.Key
-    )",
-    R"([
-        [[4];[104];["Value2"]];
-        [[5];[105];["Value3"]];
-        [[6];#;["Value6"]];
-        [[7];#;["Value7"]]
-    ])", 3, StreamLookup);
+            SELECT l.Key, l.Fk, l.Value
+            FROM `/Root/Left` AS l
+            LEFT ONLY JOIN `/Root/Right` AS r
+                ON l.Fk = r.Key
+            ORDER BY l.Key
+        )",
+        R"([
+            [[5];[105];["Value3"]];
+            [[6];#;["Value6"]];
+            [[7];#;["Value7"]]
+        ])", 4, StreamLookup);
 }
 
 Y_UNIT_TEST_TWIN(LeftOnlyJoinValueColumn, StreamLookup) {
     Test(
         R"(
-        SELECT l.Value
-        FROM `/Root/Left` AS l
-        LEFT ONLY JOIN `/Root/Right` AS r
-            ON l.Fk = r.Key
-        ORDER BY l.Value
-    )",
-    R"([
-        [["Value2"]];
-        [["Value3"]];
-        [["Value6"]];
-        [["Value7"]]
-    ])", 3, StreamLookup);
+            SELECT l.Value
+            FROM `/Root/Left` AS l
+            LEFT ONLY JOIN `/Root/Right` AS r
+                ON l.Fk = r.Key
+            ORDER BY l.Value
+        )",
+        R"([
+            [["Value3"]];
+            [["Value6"]];
+            [["Value7"]]
+        ])", 4, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(LeftJoinRightNullFilter, StreamLookup) {
+    Test(
+        R"(
+            SELECT l.Value, r.Value
+            FROM `/Root/Left` AS l
+            LEFT JOIN `/Root/Right` AS r
+                ON l.Fk = r.Key
+            WHERE r.Value IS NULL
+            ORDER BY l.Value
+        )",
+        R"([
+            [["Value2"];#];
+            [["Value3"];#];
+            [["Value6"];#];
+            [["Value7"];#]
+        ])", 4, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(LeftJoinSkipNullFilter, StreamLookup) {
+    Test(
+        R"(
+            SELECT l.Value, r.Value
+            FROM `/Root/Left` AS l
+            LEFT JOIN `/Root/Right` AS r
+                ON l.Fk = r.Key
+            WHERE r.Value IS NOT NULL
+            ORDER BY l.Value
+        )",
+        R"([
+            [["Value1"];["Value21"]];
+            [["Value1"];["Value22"]];
+            [["Value2"];["Value23"]]
+        ])", 4, StreamLookup);
 }
 
 void CreateSimpleTableWithKeyType(TSession session, const TString& columnType) {