Browse Source

YQ-1154: support pg types in yq results

ref:0c1fb8697ed1f7d184838f77a5bdf85424f68540
Sergey Uzhakov 2 years ago
parent
commit
c28bb661e3

+ 1 - 0
ydb/core/driver_lib/run/CMakeLists.txt

@@ -108,6 +108,7 @@ target_link_libraries(run PUBLIC
   ydb-library-pdisk_io
   ydb-library-security
   yql-minikql-comp_nodes
+  yql-minikql-computation
   udf-service-exception_policy
   public-lib-base
   lib-deprecated-client

+ 3 - 1
ydb/core/driver_lib/run/factories.h

@@ -15,6 +15,7 @@
 #include <ydb/library/pdisk_io/aio.h>
 #include <ydb/core/yq/libs/config/protos/audit.pb.h>
 
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
 #include <ydb/library/yql/providers/pq/cm_client/interface/client.h>
 
 #include <library/cpp/actors/core/actorsystem.h>
@@ -36,7 +37,7 @@ struct TModuleFactories {
     std::shared_ptr<NKqp::IQueryReplayBackendFactory> QueryReplayBackendFactory;
     //
     std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory;
-    // Can be nullptr. In that case there would be no ability to work with Yandex Logbroker in Yandex Query.
+    // Can be nullptr. In that case there would be no ability to work with internal configuration manager.
     NPq::NConfigurationManager::IConnections::TPtr PqCmConnections;
     // Export implementation for Data Shards
     std::shared_ptr<NDataShard::IExportFactory> DataShardExportFactory;
@@ -59,6 +60,7 @@ struct TModuleFactories {
     std::shared_ptr<NSQS::IAuthFactory> SqsAuthFactory;
 
     std::shared_ptr<NHttpProxy::IAuthFactory> DataStreamsAuthFactory;
+    std::vector<NKikimr::NMiniKQL::TComputationNodeFactory> AdditionalComputationNodeFactories;
 
     ~TModuleFactories();
 };

+ 3 - 1
ydb/core/driver_lib/run/kikimr_services_initializers.cpp

@@ -142,6 +142,7 @@
 #include <ydb/library/folder_service/proto/config.pb.h>
 
 #include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+#include <ydb/library/yql/parser/pg_wrapper/comp_factory.h>
 
 #include <library/cpp/actors/protos/services_common.pb.h>
 
@@ -2332,7 +2333,8 @@ void TYandexQueryInitializer::InitializeServices(TActorSystemSetup* setup, const
         Factories->FolderServiceFactory,
         Factories->YqAuditServiceFactory,
         Factories->YdbCredentialProviderFactory,
-        IcPort
+        IcPort,
+        Factories->AdditionalComputationNodeFactories
         );
 }
 

+ 2 - 1
ydb/core/kqp/provider/yql_kikimr_results.cpp

@@ -1212,8 +1212,9 @@ bool IsSameType(const NKikimrMiniKQL::TType& actual, const NKikimrMiniKQL::TType
             return IsSameType(actual.GetVariant(), expected.GetVariant());
         case NKikimrMiniKQL::ETypeKind::Null:
             return true;
+        case NKikimrMiniKQL::ETypeKind::Pg:
+            return actual.GetPg().Getoid() == expected.GetPg().Getoid();
         case NKikimrMiniKQL::ETypeKind::Unknown:
-        case NKikimrMiniKQL::ETypeKind::Reserved_10:
         case NKikimrMiniKQL::ETypeKind::Reserved_11:
         case NKikimrMiniKQL::ETypeKind::Reserved_12:
         case NKikimrMiniKQL::ETypeKind::Reserved_13:

+ 2 - 1
ydb/core/testlib/test_client.cpp

@@ -818,7 +818,8 @@ namespace Tests {
                 NKikimr::NFolderService::CreateMockFolderServiceActor,
                 NYq::CreateMockYqAuditServiceActor,
                 ydbCredFactory,
-                /*IcPort = */0
+                /*IcPort = */0,
+                {}
                 );
             NYq::InitTest(Runtime.Get(), port, Settings->GrpcPort, YqSharedResources);
         }

+ 7 - 3
ydb/core/yq/libs/init/init.cpp

@@ -57,7 +57,8 @@ void Init(
     const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
     const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig, const NMonitoring::TDynamicCounterPtr& counters)>& auditServiceFactory,
     const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
-    const ui32& icPort
+    ui32 icPort,
+    const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories
     )
 {
     Y_VERIFY(iyqSharedResources, "No YQ shared resources created");
@@ -104,11 +105,14 @@ void Init(
     auto yqCounters = appData->Counters->GetSubgroup("counters", "yq");
     auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters(yqCounters->GetSubgroup("subsystem", "worker_manager"));
 
-    NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
+    TVector<NKikimr::NMiniKQL::TComputationNodeFactory> compNodeFactories = {
         NYql::GetCommonDqFactory(),
         NYql::GetDqYdbFactory(yqSharedResources->UserSpaceYdbDriver),
         NKikimr::NMiniKQL::GetYqlFactory()
-    });
+    };
+
+    compNodeFactories.insert(compNodeFactories.end(), additionalCompNodeFactories.begin(), additionalCompNodeFactories.end());
+    NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory(std::move(compNodeFactories));
 
     NYql::TTaskTransformFactory dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({
         NYql::CreateCommonDqTaskTransformFactory(),

+ 2 - 1
ydb/core/yq/libs/init/init.h

@@ -37,7 +37,8 @@ void Init(
     const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
     const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig, const NMonitoring::TDynamicCounterPtr& counters)>& auditServiceFactory,
     const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
-    const ui32& icPort
+    ui32 icPort,
+    const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories
 );
 
 } // NYq

+ 1 - 0
ydb/library/mkql_proto/CMakeLists.txt

@@ -19,6 +19,7 @@ target_link_libraries(ydb-library-mkql_proto PUBLIC
   api-protos
   library-yql-minikql
   yql-minikql-computation
+  providers-common-codec
 )
 target_sources(ydb-library-mkql_proto PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/mkql_proto/mkql_proto.cpp

+ 60 - 10
ydb/library/mkql_proto/mkql_proto.cpp

@@ -6,6 +6,7 @@
 #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
 #include <ydb/library/yql/minikql/mkql_type_ops.h>
 #include <ydb/library/yql/public/decimal/yql_decimal.h>
+#include <ydb/library/yql/providers/common/codec/yql_pg_codec.h>
 
 #include <library/cpp/containers/stack_vector/stack_vec.h>
 
@@ -148,6 +149,13 @@ void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) {
             break;
         }
 
+        case TType::EKind::Pg: {
+            auto pgType = static_cast<TPgType *>(type);
+            res.SetKind(NKikimrMiniKQL::ETypeKind::Pg);
+            res.MutablePg()->set_oid(pgType->GetTypeId());
+            break;
+        }
+
         case TType::EKind::Optional: {
             auto optionalType = static_cast<TOptionalType *>(type);
             res.SetKind(NKikimrMiniKQL::ETypeKind::Optional);
@@ -255,6 +263,13 @@ void ExportTypeToProtoImpl(TType* type, Ydb::Type& res) {
             break;
         }
 
+        case TType::EKind::Pg: {
+            auto pgType = static_cast<TPgType*>(type);
+            auto t = res.mutable_pg_type();
+            t->set_oid(pgType->GetTypeId());
+            break;
+        }
+
         case TType::EKind::Optional: {
             auto optionalType = static_cast<TOptionalType*>(type);
             ExportTypeToProtoImpl(optionalType->GetItemType(), *res.mutable_optional_type()->mutable_item());
@@ -439,6 +454,17 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, NK
             break;
         }
 
+        case TType::EKind::Pg: {
+            if (!value) {
+                // do not set Text and Bytes fields
+                return;
+            }
+            auto pgType = static_cast<TPgType*>(type);
+            auto textValue = NYql::NCommon::PgValueToString(value, pgType->GetTypeId());
+            res.SetText(textValue);
+            break;
+        }
+
         case TType::EKind::Optional: {
             auto optionalType = static_cast<TOptionalType*>(type);
             if (value) {
@@ -523,7 +549,7 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, Yd
         case TType::EKind::Void:
         case TType::EKind::EmptyList:
         case TType::EKind::EmptyDict:
-	    break;
+        break;
 
         case TType::EKind::Null: {
             res.set_null_flag_value(::google::protobuf::NULL_VALUE);
@@ -535,6 +561,17 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, Yd
             break;
         }
 
+        case TType::EKind::Pg: {
+            if (!value) {
+                // do not set Text and Bytes fields
+                return;
+            }
+            auto pgType = static_cast<TPgType*>(type);
+            auto textValue = NYql::NCommon::PgValueToString(value, pgType->GetTypeId());
+            res.set_text_value(textValue);
+            break;
+        }
+
         case TType::EKind::Optional: {
             auto optionalType = static_cast<TOptionalType*>(type);
             if (value.HasValue()) {
@@ -951,6 +988,10 @@ TType* TProtoImporter::ImportTypeFromProto(const NKikimrMiniKQL::TType& type) {
                 return TDataType::Create(schemeType, env);
             }
         }
+        case NKikimrMiniKQL::ETypeKind::Pg: {
+            const NKikimrMiniKQL::TPgType& protoPgType = type.GetPg();
+            return TPgType::Create(protoPgType.Getoid(), env);
+        }
         case NKikimrMiniKQL::ETypeKind::Optional: {
             const NKikimrMiniKQL::TOptionalType& protoOptionalType = type.GetOptional();
             TType* child = ImportTypeFromProto(protoOptionalType.GetItem());
@@ -1010,13 +1051,13 @@ TType* TProtoImporter::ImportTypeFromProto(const NKikimrMiniKQL::TType& type) {
 
 TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TValue& value) {
     switch (type->GetKind()) {
-        case TCallableType::EKind::Void: {
+        case TType::EKind::Void: {
             return env.GetVoid();
         }
-        case TCallableType::EKind::Null: {
+        case TType::EKind::Null: {
             return env.GetNull();
         }
-        case TCallableType::EKind::Data: {
+        case TType::EKind::Data: {
             TDataType* dataType = static_cast<TDataType*>(type);
             TDataLiteral* dataNode = nullptr;
             switch (const auto schemeType = dataType->GetSchemeType()) {
@@ -1114,7 +1155,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
             }
             return dataNode;
         }
-        case TCallableType::EKind::Optional: {
+        case TType::EKind::Optional: {
             TOptionalType* optionalType = static_cast<TOptionalType*>(type);
             TOptionalLiteral* optionalNode;
             if (value.HasOptional()) {
@@ -1126,7 +1167,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
             }
             return optionalNode;
         }
-        case TCallableType::EKind::List: {
+        case TType::EKind::List: {
             TListType* listType = static_cast<TListType*>(type);
             TType* itemType = listType->GetItemType();
             TVector<TRuntimeNode> items;
@@ -1140,7 +1181,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
             TListLiteral* listNode = TListLiteral::Create(items.data(), items.size(), listType, env);
             return listNode;
         }
-        case TCallableType::EKind::Tuple: {
+        case TType::EKind::Tuple: {
             TTupleType* tupleType = static_cast<TTupleType*>(type);
             ui32 elementsCount = tupleType->GetElementsCount();
             MKQL_ENSURE(elementsCount == value.TupleSize(), "Invalid protobuf format, tuple size mismatch between Type and Value");
@@ -1154,7 +1195,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
             TTupleLiteral* tupleNode = TTupleLiteral::Create(elements.size(), elements.data(), tupleType, env);
             return tupleNode;
         }
-        case TCallableType::EKind::Struct: {
+        case TType::EKind::Struct: {
             TStructType* structType = static_cast<TStructType*>(type);
             ui32 membersCount = structType->GetMembersCount();
             MKQL_ENSURE(membersCount == value.StructSize(), "Invalid protobuf format, struct size mismatch between Type and Value");
@@ -1170,7 +1211,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
             TStructLiteral* structNode = TStructLiteral::Create(members.size(), members.data(), structType, env);
             return structNode;
         }
-        case TCallableType::EKind::Dict: {
+        case TType::EKind::Dict: {
             TDictType* dictType = static_cast<TDictType*>(type);
             ui32 dictSize = value.DictSize();
 
@@ -1185,7 +1226,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
             TDictLiteral* dictNode = TDictLiteral::Create(items.size(), items.data(), dictType, env);
             return dictNode;
         }
-        case TCallableType::EKind::Variant: {
+        case TType::EKind::Variant: {
             TVariantType* variantType = static_cast<TVariantType*>(type);
             auto variantIndex = value.GetVariantIndex();
             TType* innerType = variantType->GetAlternativeType(variantIndex);
@@ -1218,6 +1259,15 @@ NUdf::TUnboxedValue TProtoImporter::ImportValueFromProto(const TType* type, cons
         case TType::EKind::Data:
             return HandleKindDataImport(type, value);
 
+        case TType::EKind::Pg: {
+            auto pgType = static_cast<const TPgType*>(type);
+            MKQL_ENSURE(!value.HasBytes(), "Pg binary format is not supported");
+            if (!value.HasText() && !value.HasBytes()) {
+                return NUdf::TUnboxedValue();
+            }
+            return NYql::NCommon::PgValueFromString(value.GetText(), pgType->GetTypeId());
+        }
+
         case TType::EKind::Optional: {
             auto optionalType = static_cast<const TOptionalType*>(type);
             if (value.HasOptional()) {

+ 14 - 1
ydb/library/mkql_proto/mkql_proto_ut.cpp

@@ -16,6 +16,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) {
 
         UNIT_ASSERT(!CanExportType(pgmBuilder.NewVoid().GetStaticType()->GetType(), env));
         UNIT_ASSERT(CanExportType(pgmBuilder.NewVoid().GetStaticType(), env));
+        UNIT_ASSERT(CanExportType(pgmBuilder.NewPgType(16), env));
         auto dtype = pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id);
         UNIT_ASSERT(CanExportType(dtype, env));
         UNIT_ASSERT(CanExportType(pgmBuilder.NewOptionalType(dtype), env));
@@ -45,7 +46,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) {
         "}\n");
     }
 
- Y_UNIT_TEST(TestExportDecimalType) {
+    Y_UNIT_TEST(TestExportDecimalType) {
         TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) {
             NYql::NDecimal::TInt128 x;
             ui64* p = (ui64*)&x;
@@ -64,6 +65,18 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) {
         "}\n");
     }
 
+    Y_UNIT_TEST(TestExportPgType) {
+        TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) {
+            auto pgType = static_cast<TPgType*>(pgmBuilder.NewPgType(16));
+            auto pgmReturn = pgmBuilder.PgConst(pgType, "true");
+            return pgmReturn;
+        },
+        "Kind: Pg\n"
+        "Pg {\n"
+        "  oid: 16\n"
+        "}\n");
+    }
+
     Y_UNIT_TEST(TestExportUuidType) {
         TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) {
             auto pgmReturn = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Uuid>(TStringBuf("\1\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"sv));

Some files were not shown because too many files changed in this diff