Browse Source

Support YDB in YQL Generic Provider (YQv1) (#2300)

* Support YDB in YQL Generic Provider (for YQv1 only)
* Add `--token-accessor-endpoint` flag to dqrun
* Drop some outdated tests
Vitaly Isaev 1 year ago
parent
commit
fdb5fe47a6

+ 6 - 9
ydb/core/fq/libs/actors/clusters_from_connections.cpp

@@ -216,17 +216,14 @@ void AddClustersFromConnections(
         switch (conn.content().setting().connection_case()) {
         case FederatedQuery::ConnectionSetting::kYdbDatabase: {
             const auto& db = conn.content().setting().ydb_database();
-            auto* clusterCfg = gatewaysConfig.MutableYdb()->AddClusterMapping();
+            auto* clusterCfg = gatewaysConfig.MutableGeneric()->AddClusterMapping();
+            clusterCfg->SetKind(NYql::NConnector::NApi::EDataSourceKind::YDB);
+            clusterCfg->SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
             clusterCfg->SetName(connectionName);
-            clusterCfg->SetId(db.database_id());
-            if (db.database())
-                clusterCfg->SetDatabase(db.database());
-            if (db.endpoint())
-                clusterCfg->SetEndpoint(db.endpoint());
-            clusterCfg->SetSecure(db.secure());
-            clusterCfg->SetAddBearerToToken(common.GetUseBearerForYdb());
+            clusterCfg->SetDatabaseId(db.database_id());
+            clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
             FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures);
-            clusters.emplace(connectionName, YdbProviderName);
+            clusters.emplace(connectionName, GenericProviderName);
             break;
         }
         case FederatedQuery::ConnectionSetting::kClickhouseCluster: {

+ 13 - 3
ydb/core/fq/libs/actors/database_resolver.cpp

@@ -1,5 +1,6 @@
 #include "database_resolver.h"
 
+#include <util/string/split.h>
 #include <ydb/core/fq/libs/common/cache.h>
 #include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
 #include <ydb/core/fq/libs/events/events.h>
@@ -136,7 +137,7 @@ private:
         const auto requestIter = Requests.find(ev->Get()->Request);
         HandledIds++;
 
-        LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got MDB API response: code=" << ev->Get()->Response->Status);
+        LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got API response: code=" << ev->Get()->Response->Status);
 
         try {
             HandleResponse(ev, requestIter, errorMessage, result);
@@ -312,7 +313,12 @@ public:
             }
 
             Y_ENSURE(endpoint);
-            return TDatabaseDescription{endpoint, "", 0, database, secure};
+
+            TVector<TString> split = StringSplitter(endpoint).Split(':');
+
+            Y_ENSURE(split.size() == 2);
+
+            return TDatabaseDescription{endpoint, split[0], FromString(split[1]), database, secure};
         };
         Parsers[NYql::EDatabaseType::Ydb] = ydbParser;
         Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](
@@ -327,9 +333,11 @@ public:
             if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
                 // Replace "ydb." -> "yds."
                 ret.Endpoint[2] = 's';
+                ret.Host[2] = 's';
             }
             if (isDedicatedDb) {
                 ret.Endpoint = "u-" + ret.Endpoint;
+                ret.Host = "u-" + ret.Host;
             }
             return ret;
         };
@@ -486,6 +494,7 @@ private:
             try {
                 TString url;
                 if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, databaseType)) {
+                    YQL_ENSURE(ev->Get()->YdbMvpEndpoint.Size() > 0, "empty YDB MVP Endpoint");
                     url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
                             .AddUrlParam("databaseId", databaseId)
                             .Build();
@@ -497,7 +506,6 @@ private:
                             .AddPathComponent("hosts")
                             .Build();
                 }
-                LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << url);
 
                 NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(url);
 
@@ -507,6 +515,8 @@ private:
                     httpRequest->Set("Authorization", token);
                 }
 
+                LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << "url: "  << httpRequest->URL);
+
                 requests[httpRequest] = TResolveParams{databaseId, databaseType, databaseAuth};
             } catch (const std::exception& e) {
                 const TString msg = TStringBuilder() << "error while preparing to resolve database id: " << databaseId 

+ 1 - 6
ydb/core/fq/libs/actors/run_actor.cpp

@@ -30,7 +30,6 @@
 #include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
 #include <ydb/library/yql/providers/pq/task_meta/task_meta.h>
 #include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
-#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h>
 #include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
 #include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
 #include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
@@ -1940,11 +1939,7 @@ private:
         }
 
         {
-           dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
-        }
-
-        {
-           dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver));
+           dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver, Params.CredentialsFactory));
         }
 
         {

+ 6 - 6
ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp

@@ -175,8 +175,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
                 })",           
             NYql::TDatabaseResolverResponse::TDatabaseDescription{
                 TString{"ydb.serverless.yandexcloud.net:2135"},
-                TString{""},
-                0,
+                TString{"ydb.serverless.yandexcloud.net"},
+                2135,
                 TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
                 true
                 },
@@ -196,8 +196,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
                 })",
             NYql::TDatabaseResolverResponse::TDatabaseDescription{
                 TString{"yds.serverless.yandexcloud.net:2135"},
-                TString{""},
-                0,
+                TString{"yds.serverless.yandexcloud.net"},
+                2135,
                 TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
                 true
                 },
@@ -218,8 +218,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
                 })",
             NYql::TDatabaseResolverResponse::TDatabaseDescription{
                 TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"},
-                TString{""},
-                0,
+                TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net"},
+                2135,
                 TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
                 true
                 },

+ 0 - 1
ydb/core/fq/libs/actors/ya.make

@@ -80,7 +80,6 @@ PEERDIR(
     ydb/library/yql/providers/pq/provider
     ydb/library/yql/providers/pq/task_meta
     ydb/library/yql/providers/s3/provider
-    ydb/library/yql/providers/ydb/provider
     ydb/library/yql/public/issue
     ydb/library/yql/public/issue/protos
     ydb/library/yql/sql/settings

+ 4 - 1
ydb/core/fq/libs/init/init.cpp

@@ -177,7 +177,10 @@ void Init(
         &protoConfig.GetGateways().GetHttpGateway(),
         yqCounters->GetSubgroup("subcomponent", "http_gateway"));
 
-    const auto connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
+    NYql::NConnector::IClient::TPtr connectorClient = nullptr;
+    if (protoConfig.GetGateways().GetGeneric().HasConnector()) {
+        connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
+    }
 
     if (protoConfig.GetTokenAccessor().GetEnabled()) {
         const auto& tokenAccessorConfig = protoConfig.GetTokenAccessor();

+ 1 - 0
ydb/core/kqp/host/kqp_host.cpp

@@ -1552,6 +1552,7 @@ private:
             TypesCtx.Get(),
             FuncRegistry,
             FederatedQuerySetup->DatabaseAsyncResolver,
+            nullptr,
             FederatedQuerySetup->ConnectorClient,
             FederatedQuerySetup->GenericGatewayConfig
         );

+ 0 - 4
ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp

@@ -108,7 +108,6 @@ namespace NKikimr::NKqp {
             // step 3: ReadSplits
             std::vector<ui16> colData = {10, 20, 30, 40, 50};
             clientMock->ExpectReadSplits()
-                .DataSourceInstance(dataSourceInstance)
                 .Split()
                     .Description("some binary description")
                     .Select()
@@ -208,7 +207,6 @@ namespace NKikimr::NKqp {
 
             // step 3: ReadSplits
             clientMock->ExpectReadSplits()
-                .DataSourceInstance(dataSourceInstance)
                 .Split()
                     .Description("some binary description")
                     .Select()
@@ -304,7 +302,6 @@ namespace NKikimr::NKqp {
 
             // step 3: ReadSplits
             clientMock->ExpectReadSplits()
-                .DataSourceInstance(dataSourceInstance)
                 .Split()
                     .Description("some binary description")
                     .Select()
@@ -413,7 +410,6 @@ namespace NKikimr::NKqp {
             std::vector<i32> filterColumnData = {42, 24};
             // clang-format off
             clientMock->ExpectReadSplits()
-                .DataSourceInstance(dataSourceInstance)
                 .Split()
                     .Description("some binary description")
                     .Select(select)

+ 2 - 1
ydb/library/yql/providers/common/proto/gateways_config.proto

@@ -620,7 +620,8 @@ message TGenericGatewayConfig {
 /////////////////////////////// Db Resolver ///////////////////////////////////
 
 message TDbResolverConfig {
-    // Ydb / Yds mvp endpoint
+    // Ydb / Yds MVP endpoint. Expected format: 
+    // [http|https]://host:port/ydbc/cloud-prod/
     optional string YdbMvpEndpoint = 2;
 }
 

+ 2 - 0
ydb/library/yql/providers/generic/actors/ya.make

@@ -8,10 +8,12 @@ SRCS(
 PEERDIR(
     ydb/library/yql/dq/actors/compute
     ydb/library/yql/minikql/computation
+    ydb/library/yql/providers/common/structured_token
     ydb/library/yql/providers/common/token_accessor/client
     ydb/library/yql/providers/generic/proto
     ydb/library/yql/public/types
     ydb/library/yql/providers/generic/connector/libcpp
+    ydb/public/sdk/cpp/client/ydb_types/credentials
 )
 
 YQL_LAST_ABI_VERSION()

Some files were not shown because too many files changed in this diff