Browse Source

YQL-17264 PostgreSQL provider (pg_catalog) - initial commit

vvvv 1 year ago
parent
commit
3be304b490

+ 13 - 0
.mapping.json

@@ -9541,6 +9541,19 @@
   "ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-x86_64.txt":"",
   "ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.txt":"",
   "ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.windows-x86_64.txt":"",
+  "ydb/library/yql/providers/pg/CMakeLists.txt":"",
+  "ydb/library/yql/providers/pg/expr_nodes/CMakeLists.darwin-arm64.txt":"",
+  "ydb/library/yql/providers/pg/expr_nodes/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/library/yql/providers/pg/expr_nodes/CMakeLists.linux-aarch64.txt":"",
+  "ydb/library/yql/providers/pg/expr_nodes/CMakeLists.linux-x86_64.txt":"",
+  "ydb/library/yql/providers/pg/expr_nodes/CMakeLists.txt":"",
+  "ydb/library/yql/providers/pg/expr_nodes/CMakeLists.windows-x86_64.txt":"",
+  "ydb/library/yql/providers/pg/provider/CMakeLists.darwin-arm64.txt":"",
+  "ydb/library/yql/providers/pg/provider/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/library/yql/providers/pg/provider/CMakeLists.linux-aarch64.txt":"",
+  "ydb/library/yql/providers/pg/provider/CMakeLists.linux-x86_64.txt":"",
+  "ydb/library/yql/providers/pg/provider/CMakeLists.txt":"",
+  "ydb/library/yql/providers/pg/provider/CMakeLists.windows-x86_64.txt":"",
   "ydb/library/yql/providers/pq/CMakeLists.txt":"",
   "ydb/library/yql/providers/pq/async_io/CMakeLists.darwin-arm64.txt":"",
   "ydb/library/yql/providers/pq/async_io/CMakeLists.darwin-x86_64.txt":"",

+ 5 - 0
ydb/library/yql/core/common_opt/yql_co_simple1.cpp

@@ -3649,6 +3649,11 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
             return ctx.SwapWithHead(*node);
         }
 
+        if (node->Head().IsCallable("PgTableContent")) {
+            YQL_CLOG(DEBUG, Core) << "Pushdown ExtractMembers to " << node->Head().Content();
+            return ctx.ChangeChild(node->Head(), 2, node->TailPtr());
+        }
+
         return node;
     };
 

+ 9 - 5
ydb/library/yql/core/yql_expr_type_annotation.cpp

@@ -4871,13 +4871,17 @@ bool IsPureIsolatedLambdaImpl(const TExprNode& lambdaBody, TNodeSet& visited, TS
 
     if (syncList) {
         if (auto right = TMaybeNode<TCoRight>(&lambdaBody)) {
-            auto cons = right.Cast().Input().Maybe<TCoCons>();
-            if (!cons) {
-                return false;
+            if (auto cons = right.Cast().Input().Maybe<TCoCons>()) {
+                syncList->emplace(cons.Cast().World().Ptr(), syncList->size());
+                return IsPureIsolatedLambdaImpl(cons.Cast().Input().Ref(), visited, syncList);
             }
 
-            syncList->emplace(cons.Cast().World().Ptr(), syncList->size());
-            return IsPureIsolatedLambdaImpl(cons.Cast().Input().Ref(), visited, syncList);
+            if (right.Cast().Input().Ref().IsCallable("PgReadTable!")) {
+                syncList->emplace(right.Cast().Input().Ref().HeadPtr(), syncList->size());
+                return true;
+            }
+
+            return false;
         }
     }
 

+ 14 - 0
ydb/library/yql/minikql/mkql_program_builder.cpp

@@ -5536,6 +5536,20 @@ TRuntimeNode TProgramBuilder::PgArray(const TArrayRef<const TRuntimeNode>& args,
     return TRuntimeNode(callableBuilder.Build(), false);
 }
 
+TRuntimeNode TProgramBuilder::PgTableContent(
+    const std::string_view& cluster,
+    const std::string_view& table,
+    TType* returnType) {
+    if constexpr (RuntimeVersion < 47U) {
+        THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+    }
+
+    TCallableBuilder callableBuilder(Env, __func__, returnType);
+    callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(cluster));
+    callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(table));
+    return TRuntimeNode(callableBuilder.Build(), false);
+}
+
 TRuntimeNode TProgramBuilder::PgCast(TRuntimeNode input, TType* returnType, TRuntimeNode typeMod) {
     if constexpr (RuntimeVersion < 30U) {
         THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;

+ 4 - 0
ydb/library/yql/minikql/mkql_program_builder.h

@@ -693,6 +693,10 @@ public:
     TRuntimeNode WithContext(TRuntimeNode input, const std::string_view& contextType);
     TRuntimeNode PgInternal0(TType* returnType);
     TRuntimeNode PgArray(const TArrayRef<const TRuntimeNode>& args, TType* returnType);
+    TRuntimeNode PgTableContent(
+        const std::string_view& cluster,
+        const std::string_view& table,
+        TType* returnType);
 
     TRuntimeNode ScalarApply(const TArrayRef<const TRuntimeNode>& args, const TArrayLambda& handler);
 

+ 1 - 1
ydb/library/yql/minikql/mkql_runtime_version.h

@@ -24,7 +24,7 @@ namespace NMiniKQL {
 // 1. Bump this version every time incompatible runtime nodes are introduced.
 // 2. Make sure you provide runtime node generation for previous runtime versions.
 #ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 46U
+#define MKQL_RUNTIME_VERSION 47U
 #endif
 
 // History:

+ 55 - 0
ydb/library/yql/parser/pg_wrapper/comp_factory.cpp

@@ -253,6 +253,52 @@ private:
     }
 };
 
+class TPgTableContent : public TMutableComputationNode<TPgTableContent> {
+    typedef TMutableComputationNode<TPgTableContent> TBaseComputation;
+public:
+    TPgTableContent(
+        TComputationMutables& mutables,
+        const std::string_view& cluster,
+        const std::string_view& table,
+        TType* returnType)
+        : TBaseComputation(mutables)
+        , Cluster_(cluster)
+        , Table_(table)
+        , ItemType_(AS_TYPE(TStructType, AS_TYPE(TListType, returnType)->GetItemType()))
+    {
+        YQL_ENSURE(Cluster_ == "pg_catalog");
+    }
+
+    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const {
+        TUnboxedValueVector rows;
+        if (Table_ == "pg_type") {
+            NPg::EnumTypes([&](ui32 oid, const NPg::TTypeDesc& desc) {
+                NUdf::TUnboxedValue* items;
+                auto row = compCtx.HolderFactory.CreateDirectArrayHolder(ItemType_->GetMembersCount(), items);
+                if (auto oidPos = ItemType_->FindMemberIndex("oid")) {
+                    items[*oidPos] = ScalarDatumToPod(Datum(oid));
+                }
+
+                if (auto typnamePos = ItemType_->FindMemberIndex("typname")) {
+                    items[*typnamePos] = PointerDatumToPod((Datum)MakeFixedString(desc.Name, NAMEDATALEN));
+                }
+
+                rows.emplace_back(row);
+            });
+        }
+
+        return compCtx.HolderFactory.VectorAsVectorHolder(std::move(rows));
+    }
+
+private:
+    void RegisterDependencies() const final {
+    }
+
+    const std::string_view Cluster_;
+    const std::string_view Table_;
+    TStructType* const ItemType_;
+};
+
 class TFunctionCallInfo {
 public:
     TFunctionCallInfo(ui32 numArgs, const FmgrInfo* finfo)
@@ -1897,6 +1943,15 @@ TComputationNodeFactory GetPgFactory() {
                 return new TPgInternal0(ctx.Mutables);
             }
 
+            if (name == "PgTableContent") {
+                const auto clusterData = AS_VALUE(TDataLiteral, callable.GetInput(0));
+                const auto tableData = AS_VALUE(TDataLiteral, callable.GetInput(1));
+                const auto cluster = clusterData->AsValue().AsStringRef();
+                const auto table = tableData->AsValue().AsStringRef();
+                const auto returnType = callable.GetType()->GetReturnType();
+                return new TPgTableContent(ctx.Mutables, cluster, table, returnType);
+            }
+
             if (name == "PgResolvedCall") {
                 const auto useContextData = AS_VALUE(TDataLiteral, callable.GetInput(0));
                 const auto rangeFunctionData = AS_VALUE(TDataLiteral, callable.GetInput(1));

+ 1 - 0
ydb/library/yql/providers/CMakeLists.txt

@@ -12,6 +12,7 @@ add_subdirectory(config)
 add_subdirectory(dq)
 add_subdirectory(function)
 add_subdirectory(generic)
+add_subdirectory(pg)
 add_subdirectory(pq)
 add_subdirectory(result)
 add_subdirectory(s3)

+ 8 - 0
ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp

@@ -2683,6 +2683,14 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
         return ctx.ProgramBuilder.PgClone(input, dependentNodes);
     });
 
+     AddCallable("PgTableContent", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+        auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
+        return ctx.ProgramBuilder.PgTableContent(
+            node.Child(0)->Content(),
+            node.Child(1)->Content(),
+            returnType);
+    });
+
     AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
         auto input = MkqlBuildExpr(*node.Child(0), ctx);
         return ctx.ProgramBuilder.WithContext(input, node.Child(1)->Content());

+ 12 - 0
ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp

@@ -334,6 +334,18 @@ TExprNode::TPtr DefaultCleanupWorld(const TExprNode::TPtr& node, TExprContext& c
             if (cons) {
                 return cons.Cast().Input().Ptr();
             }
+
+            if (right.Cast().Input().Ref().IsCallable("PgReadTable!")) {
+                const auto& read = right.Cast().Input().Ref();
+                return ctx.Builder(node->Pos())
+                    .Callable("PgTableContent")
+                        .Add(0, read.Child(1)->TailPtr())
+                        .Add(1, read.ChildPtr(2))
+                        .Add(2, read.ChildPtr(3))
+                        .Add(3, read.ChildPtr(4))
+                    .Seal()
+                    .Build();
+            }
         }
 
         return node;

Some files were not shown because too many files changed in this diff