Browse Source

YQ Connector:Pushdown of simple filters. First version with support of several predicates/operations

galaxycrab 1 year ago
parent
commit
1ed4742ff7

+ 5 - 0
.mapping.json

@@ -7426,6 +7426,11 @@
   "ydb/library/yql/providers/common/provider/CMakeLists.linux-x86_64.txt":"",
   "ydb/library/yql/providers/common/provider/CMakeLists.txt":"",
   "ydb/library/yql/providers/common/provider/CMakeLists.windows-x86_64.txt":"",
+  "ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt":"",
+  "ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt":"",
+  "ydb/library/yql/providers/common/pushdown/CMakeLists.txt":"",
+  "ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt":"",
   "ydb/library/yql/providers/common/schema/CMakeLists.darwin-x86_64.txt":"",
   "ydb/library/yql/providers/common/schema/CMakeLists.linux-aarch64.txt":"",
   "ydb/library/yql/providers/common/schema/CMakeLists.linux-x86_64.txt":"",

+ 1 - 1
ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt

@@ -19,12 +19,12 @@ target_link_libraries(kqp-opt-physical PUBLIC
   opt-physical-effects
   yql-dq-common
   yql-dq-opt
+  providers-common-pushdown
 )
 target_sources(kqp-opt-physical PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp

+ 1 - 1
ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt

@@ -20,12 +20,12 @@ target_link_libraries(kqp-opt-physical PUBLIC
   opt-physical-effects
   yql-dq-common
   yql-dq-opt
+  providers-common-pushdown
 )
 target_sources(kqp-opt-physical PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp

+ 1 - 1
ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt

@@ -20,12 +20,12 @@ target_link_libraries(kqp-opt-physical PUBLIC
   opt-physical-effects
   yql-dq-common
   yql-dq-opt
+  providers-common-pushdown
 )
 target_sources(kqp-opt-physical PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp

+ 1 - 1
ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt

@@ -19,12 +19,12 @@ target_link_libraries(kqp-opt-physical PUBLIC
   opt-physical-effects
   yql-dq-common
   yql-dq-opt
+  providers-common-pushdown
 )
 target_sources(kqp-opt-physical PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp

+ 19 - 9
ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp

@@ -1,10 +1,11 @@
 #include "kqp_opt_phy_rules.h"
-#include "kqp_opt_phy_olap_filter_collection.h"
 
 #include <ydb/core/formats/arrow/ssa_runtime_version.h>
 #include <ydb/core/kqp/common/kqp_yql.h>
 #include <ydb/library/yql/core/extract_predicate/extract_predicate.h>
 #include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/providers/common/pushdown/collection.h>
+#include <ydb/library/yql/providers/common/pushdown/predicate_node.h>
 
 #include <unordered_set>
 
@@ -23,6 +24,15 @@ static const std::unordered_set<std::string> SecondLevelFilters = {
     "ends_with"
 };
 
+struct TPushdownSettings : public NPushdown::TSettings {
+    TPushdownSettings() {
+        using EFlag = NPushdown::TSettings::EFeatureFlag;
+        Enable(EFlag::LikeOperator, NSsa::RuntimeVersion >= 2U);
+        Enable(EFlag::LikeOperatorOnlyForUtf8, NSsa::RuntimeVersion < 3U);
+        Enable(EFlag::JsonQueryOperators | EFlag::JsonExistsOperator, NSsa::RuntimeVersion >= 3U);
+    }
+};
+
 struct TFilterOpsLevels {
     TFilterOpsLevels(const TMaybeNode<TExprBase>& firstLevel, const TMaybeNode<TExprBase>& secondLevel)
         : FirstLevelOps(firstLevel)
@@ -511,7 +521,7 @@ TFilterOpsLevels PredicatePushdown(const TExprBase& predicate, TExprContext& ctx
     return TFilterOpsLevels(ops, NullNode);
 }
 
-void SplitForPartialPushdown(const TPredicateNode& predicateTree, TPredicateNode& predicatesToPush, TPredicateNode& remainingPredicates,
+void SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree, NPushdown::TPredicateNode& predicatesToPush, NPushdown::TPredicateNode& remainingPredicates,
     TExprContext& ctx, TPositionHandle pos)
 {
     if (predicateTree.CanBePushed) {
@@ -520,7 +530,7 @@ void SplitForPartialPushdown(const TPredicateNode& predicateTree, TPredicateNode
         return;
     }
 
-    if (predicateTree.Op != EBoolOp::And) {
+    if (predicateTree.Op != NPushdown::EBoolOp::And) {
         // We can partially pushdown predicates from AND operator only.
         // For OR operator we would need to have several read operators which is not acceptable.
         // TODO: Add support for NOT(op1 OR op2), because it expands to (!op1 AND !op2).
@@ -529,8 +539,8 @@ void SplitForPartialPushdown(const TPredicateNode& predicateTree, TPredicateNode
     }
 
     bool isFoundNotStrictOp = false;
-    std::vector<TPredicateNode> pushable;
-    std::vector<TPredicateNode> remaining;
+    std::vector<NPushdown::TPredicateNode> pushable;
+    std::vector<NPushdown::TPredicateNode> remaining;
     for (auto& predicate : predicateTree.Children) {
         if (predicate.CanBePushed && !isFoundNotStrictOp) {
             pushable.emplace_back(predicate);
@@ -578,12 +588,12 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
     }
 
     auto optionalIf = maybeOptionalIf.Cast();
-    TPredicateNode predicateTree(optionalIf.Predicate());
-    CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg, read.Process().Body());
+    NPushdown::TPredicateNode predicateTree(optionalIf.Predicate());
+    CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg, read.Process().Body(), TPushdownSettings());
     YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
 
-    TPredicateNode predicatesToPush;
-    TPredicateNode remainingPredicates;
+    NPushdown::TPredicateNode predicatesToPush;
+    NPushdown::TPredicateNode remainingPredicates;
     SplitForPartialPushdown(predicateTree, predicatesToPush, remainingPredicates, ctx, node.Pos());
     if (!predicatesToPush.IsValid()) {
         return node;

+ 0 - 55
ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.h

@@ -1,55 +0,0 @@
-#pragma once
-
-#include <ydb/core/kqp/common/kqp_yql.h>
-
-namespace NKikimr::NKqp::NOpt {
-
-enum class EBoolOp {
-    Undefined = 0,
-    And,
-    Or,
-    Xor,
-    Not
-};
-
-struct TPredicateNode {
-    TPredicateNode()
-        : ExprNode(nullptr)
-        , Op(EBoolOp::Undefined)
-        , CanBePushed(false)
-    {}
-
-    TPredicateNode(NYql::TExprNode::TPtr nodePtr)
-        : ExprNode(nodePtr)
-        , Op(EBoolOp::Undefined)
-        , CanBePushed(false)
-    {}
-
-    TPredicateNode(NYql::NNodes::TExprBase node)
-        : ExprNode(node)
-        , Op(EBoolOp::Undefined)
-        , CanBePushed(false)
-    {}
-
-    TPredicateNode(const TPredicateNode& predNode)
-        : ExprNode(predNode.ExprNode)
-        , Children(predNode.Children)
-        , Op(predNode.Op)
-        , CanBePushed(predNode.CanBePushed)
-    {}
-
-    ~TPredicateNode() {}
-
-    bool IsValid() const;
-    void SetPredicates(const std::vector<TPredicateNode>& predicates, NYql::TExprContext& ctx, NYql::TPositionHandle pos);
-
-    NYql::NNodes::TMaybeNode<NYql::NNodes::TExprBase> ExprNode;
-    std::vector<TPredicateNode> Children;
-    EBoolOp Op;
-    bool CanBePushed;
-};
-
-void CollectPredicates(const NYql::NNodes::TExprBase& predicate, TPredicateNode& predicateTree,
-    const NYql::TExprNode* lambdaArg, const NYql::NNodes::TExprBase& lambdaBody);
-
-} // namespace NKikimr::NKqp::NOpt

+ 1 - 1
ydb/core/kqp/opt/physical/ya.make

@@ -4,7 +4,6 @@ SRCS(
     kqp_opt_phy_build_stage.cpp
     kqp_opt_phy_limit.cpp
     kqp_opt_phy_olap_agg.cpp
-    kqp_opt_phy_olap_filter_collection.cpp
     kqp_opt_phy_olap_filter.cpp
     kqp_opt_phy_precompute.cpp
     kqp_opt_phy_sort.cpp
@@ -19,6 +18,7 @@ PEERDIR(
     ydb/core/kqp/opt/physical/effects
     ydb/library/yql/dq/common
     ydb/library/yql/dq/opt
+    ydb/library/yql/providers/common/pushdown
 )
 
 YQL_LAST_ABI_VERSION()

+ 108 - 7
ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp

@@ -114,10 +114,9 @@ namespace NKikimr::NKqp {
 
             const TString query = fmt::format(
                 R"(
-                SELECT * FROM {data_source_name}.`{database_name}.{table_name}`;
+                SELECT * FROM {data_source_name}.{table_name};
             )",
                 "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
-                "database_name"_a = DEFAULT_DATABASE,
                 "table_name"_a = DEFAULT_TABLE);
 
             auto db = kikimr->GetQueryClient();
@@ -206,11 +205,10 @@ namespace NKikimr::NKqp {
 
             const TString query = fmt::format(
                 R"(
-                SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`;
-                SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`;
+                SELECT 42 FROM {data_source_name}.{table_name};
+                SELECT 42 FROM {data_source_name}.{table_name};
             )",
                 "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
-                "database_name"_a = DEFAULT_DATABASE,
                 "table_name"_a = DEFAULT_TABLE);
 
             auto db = kikimr->GetQueryClient();
@@ -297,10 +295,9 @@ namespace NKikimr::NKqp {
 
             const TString query = fmt::format(
                 R"(
-                SELECT COUNT(*) FROM {data_source_name}.`{database_name}.{table_name}`;
+                SELECT COUNT(*) FROM {data_source_name}.{table_name};
             )",
                 "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
-                "database_name"_a = DEFAULT_DATABASE,
                 "table_name"_a = DEFAULT_TABLE);
 
             auto db = kikimr->GetQueryClient();
@@ -323,5 +320,109 @@ namespace NKikimr::NKqp {
         Y_UNIT_TEST(ClickHouseSelectCount) {
             TestSelectCount(EProviderType::ClickHouse);
         }
+
+        void TestFilterPushdown(EProviderType providerType) {
+            // prepare mock
+            auto clientMock = std::make_shared<TConnectorClientMock>();
+
+            const NApi::TDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType);
+            // clang-format off
+            const NApi::TSelect select = TConnectorClientMock::TSelectBuilder<>()
+                .DataSourceInstance(dataSourceInstance)
+                .What()
+                    .NullableColumn("data_column", Ydb::Type::STRING)
+                    .NullableColumn("filtered_column", Ydb::Type::INT32)
+                    .Done()
+                .Where()
+                    .Filter()
+                        .Equal()
+                            .Column("filtered_column")
+                            .Value<i32>(42)
+                            .Done()
+                        .Done()
+                    .Done()
+                .GetResult();
+            // clang-format on
+
+            // step 1: DescribeTable
+            // clang-format off
+            clientMock->ExpectDescribeTable()
+                .DataSourceInstance(dataSourceInstance)
+                .Response()
+                    .NullableColumn("filtered_column", Ydb::Type::INT32)
+                    .NullableColumn("data_column", Ydb::Type::STRING);
+            // clang-format on
+
+            // step 2: ListSplits
+            // clang-format off
+            clientMock->ExpectListSplits()
+                .Select(select)
+                .Result()
+                    .AddResponse(NewSuccess())
+                        .Description("some binary description")
+                        .Select(select);
+            // clang-format on
+
+            // step 3: ReadSplits
+            // Return data such that it contains values not satisfying the filter conditions.
+            // Then check that, despite that connector reads additional data,
+            // our generic provider then filters it out.
+            std::vector<std::string> colData = {"Filtered text", "Text"};
+            std::vector<i32> filterColumnData = {42, 24};
+            // clang-format off
+            clientMock->ExpectReadSplits()
+                .DataSourceInstance(dataSourceInstance)
+                .Split()
+                    .Description("some binary description")
+                    .Select(select)
+                    .Done()
+                .Result()
+                    .AddResponse(MakeRecordBatch(
+                        MakeArray<arrow::StringBuilder>("data_column", colData, arrow::utf8()),
+                        MakeArray<arrow::Int32Builder>("filtered_column", filterColumnData, arrow::int32())),
+                        NewSuccess());
+            // clang-format on
+
+            // prepare database resolver mock
+            std::shared_ptr<TDatabaseAsyncResolverMock> databaseAsyncResolverMock;
+            if (providerType == EProviderType::ClickHouse) {
+                databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>();
+                databaseAsyncResolverMock->AddClickHouseCluster();
+            }
+
+            // run test
+            auto kikimr = MakeKikimrRunner(nullptr, clientMock, databaseAsyncResolverMock);
+
+            CreateExternalDataSource(providerType, kikimr);
+
+            const TString query = fmt::format(
+                R"(
+                PRAGMA generic.UsePredicatePushdown="true";
+                SELECT data_column FROM {data_source_name}.{table_name} WHERE filtered_column = 42;
+            )",
+                "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
+                "table_name"_a = DEFAULT_TABLE);
+
+            auto db = kikimr->GetQueryClient();
+            auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync();
+            UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString());
+
+            TResultSetParser resultSet(queryResult.GetResultSetParser(0));
+            UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
+
+            // check every row
+            // Check that, despite returning nonfiltered data in connector, response will be correct
+            std::vector<TMaybe<TString>> result = {"Filtered text"}; // Only data satisfying filter conditions
+            MATCH_RESULT_WITH_INPUT(result, resultSet, GetOptionalString);
+        }
+
+        Y_UNIT_TEST(PostgreSQLFilterPushdown) {
+            TestFilterPushdown(EProviderType::PostgreSQL);
+        }
+
+        Y_UNIT_TEST(ClickHouseFilterPushdown) {
+            TestFilterPushdown(EProviderType::ClickHouse);
+        }
     }
 }

+ 1 - 0
ydb/library/yql/providers/common/CMakeLists.txt

@@ -20,6 +20,7 @@ add_subdirectory(metrics)
 add_subdirectory(mkql)
 add_subdirectory(proto)
 add_subdirectory(provider)
+add_subdirectory(pushdown)
 add_subdirectory(schema)
 add_subdirectory(structured_token)
 add_subdirectory(token_accessor)

Some files were not shown because too many files changed in this diff