Browse Source

YQ Connector: refactor external databases and database resolving

1. Поправлено именование некоторых типов, связанных с ресолвингом MDB кластеров.
2. Устранено лишнее преобразование, подразумевавшее парсинг строки `TString -> std::pair<TString, ui32>`
3. Устранен лишний `enum`, функциональность которого полностью закрывается имеющимся `NYql::EDatabaseType`
4. Юнит-тесты на имплементации `IMdbEndpointGenerator` перенесены к ним поближе.
vitalyisaev 1 year ago
parent
commit
3a5947c0ad

+ 5 - 0
.mapping.json

@@ -4186,6 +4186,11 @@
   "ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt":"",
   "ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.txt":"",
   "ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/fq/libs/db_id_async_resolver_impl/ut/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/fq/libs/db_id_async_resolver_impl/ut/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/fq/libs/db_id_async_resolver_impl/ut/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/fq/libs/db_id_async_resolver_impl/ut/CMakeLists.txt":"",
+  "ydb/core/fq/libs/db_id_async_resolver_impl/ut/CMakeLists.windows-x86_64.txt":"",
   "ydb/core/fq/libs/db_schema/CMakeLists.darwin-x86_64.txt":"",
   "ydb/core/fq/libs/db_schema/CMakeLists.linux-aarch64.txt":"",
   "ydb/core/fq/libs/db_schema/CMakeLists.linux-x86_64.txt":"",

+ 1 - 1
ydb/core/external_sources/CMakeLists.darwin-x86_64.txt

@@ -15,7 +15,7 @@ target_link_libraries(ydb-core-external_sources PUBLIC
   library-cpp-scheme
   ydb-core-base
   ydb-core-protos
-  generic-connector-libcpp
+  providers-common-db_id_async_resolver
   providers-s3-path_generator
   cpp-client-ydb_params
   cpp-client-ydb_value

+ 1 - 1
ydb/core/external_sources/CMakeLists.linux-aarch64.txt

@@ -16,7 +16,7 @@ target_link_libraries(ydb-core-external_sources PUBLIC
   library-cpp-scheme
   ydb-core-base
   ydb-core-protos
-  generic-connector-libcpp
+  providers-common-db_id_async_resolver
   providers-s3-path_generator
   cpp-client-ydb_params
   cpp-client-ydb_value

+ 1 - 1
ydb/core/external_sources/CMakeLists.linux-x86_64.txt

@@ -16,7 +16,7 @@ target_link_libraries(ydb-core-external_sources PUBLIC
   library-cpp-scheme
   ydb-core-base
   ydb-core-protos
-  generic-connector-libcpp
+  providers-common-db_id_async_resolver
   providers-s3-path_generator
   cpp-client-ydb_params
   cpp-client-ydb_value

+ 1 - 1
ydb/core/external_sources/CMakeLists.windows-x86_64.txt

@@ -15,7 +15,7 @@ target_link_libraries(ydb-core-external_sources PUBLIC
   library-cpp-scheme
   ydb-core-base
   ydb-core-protos
-  generic-connector-libcpp
+  providers-common-db_id_async_resolver
   providers-s3-path_generator
   cpp-client-ydb_params
   cpp-client-ydb_value

+ 7 - 4
ydb/core/external_sources/external_source_factory.cpp

@@ -6,7 +6,7 @@
 #include <util/string/cast.h>
 
 #include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
-#include <ydb/library/yql/providers/generic/connector/libcpp/external_data_source.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
 
 
 namespace NKikimr::NExternalSource {
@@ -34,13 +34,16 @@ private:
 
 IExternalSourceFactory::TPtr CreateExternalSourceFactory() {
     return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
-        {"ObjectStorage", CreateObjectStorageExternalSource()},
         {
-            ToString(NYql::NConnector::EExternalDataSource::ClickHouse),
+            ToString(NYql::EDatabaseType::ObjectStorage),
+            CreateObjectStorageExternalSource()
+        },
+        {
+            ToString(NYql::EDatabaseType::ClickHouse),
             CreateExternalDataSource(TString{NYql::GenericProviderName}, {"MDB_BASIC", "BASIC"}, {"database_name", "protocol", "mdb_cluster_id", "use_tls"})
         },
         {
-            ToString(NYql::NConnector::EExternalDataSource::PostgreSQL), 
+            ToString(NYql::EDatabaseType::PostgreSQL),
             CreateExternalDataSource(TString{NYql::GenericProviderName}, {"MDB_BASIC", "BASIC"}, {"database_name", "protocol", "mdb_cluster_id", "use_tls", "schema"})
         }
     });

+ 1 - 1
ydb/core/external_sources/ya.make

@@ -10,7 +10,7 @@ PEERDIR(
     library/cpp/scheme
     ydb/core/base
     ydb/core/protos
-    ydb/library/yql/providers/generic/connector/libcpp
+    ydb/library/yql/providers/common/db_id_async_resolver
     ydb/library/yql/providers/s3/path_generator
     ydb/public/sdk/cpp/client/ydb_params
     ydb/public/sdk/cpp/client/ydb_value

+ 60 - 55
ydb/core/fq/libs/actors/database_resolver.cpp

@@ -19,18 +19,21 @@ namespace NFq {
 using namespace NActors;
 using namespace NYql;
 
-using TEndpoint = NYql::TDatabaseResolverResponse::TEndpoint;
-using TParser = std::function<TEndpoint(NJson::TJsonValue& body, const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator, bool useTls)>;
+using TDatabaseDescription = NYql::TDatabaseResolverResponse::TDatabaseDescription;
+using TParser = std::function<TDatabaseDescription(NJson::TJsonValue& body, const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator, bool useTls)>;
 using TParsers = THashMap<NYql::EDatabaseType, TParser>;
 
-struct TDatabaseDescription {
-    TString Id;
-    NYql::EDatabaseType Type = NYql::EDatabaseType::Ydb;
-    NYql::TDatabaseAuth Auth;
+struct TResolveParams {
+    // Treat ID as:
+    // - cluster ID (ClickHouse, PostgreSQL)
+    // - database ID (YDB)
+    TString Id; 
+    NYql::EDatabaseType DatabaseType = NYql::EDatabaseType::Ydb;
+    NYql::TDatabaseAuth DatabaseAuth;
 };
 
-using TCache = TTtlCache<std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>, std::variant<TEndpoint, TString>>;
-using TRequestMap = THashMap<NHttp::THttpOutgoingRequestPtr, TDatabaseDescription>;
+using TCache = TTtlCache<std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>, std::variant<TDatabaseDescription, TString>>;
+using TRequestMap = THashMap<NHttp::THttpOutgoingRequestPtr, TResolveParams>;
 
 class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
 {
@@ -42,7 +45,7 @@ public:
     TResponseProcessor(
         const TActorId sender,
         TCache& cache,
-        const TDatabaseResolverResponse::TDatabaseEndpointsMap& ready,
+        const TDatabaseResolverResponse::TDatabaseDescriptionMap& ready,
         const TRequestMap& requests,
         const TString& traceId,
         const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
@@ -85,7 +88,7 @@ private:
         TString errorMsg = "Could not resolve database ids: ";
         bool firstUnresolvedDbId = true;
         for (const auto& [_, params]: Requests) {
-            if (const auto it = DatabaseId2Endpoint.find(std::make_pair(params.Id, params.Type)); it == DatabaseId2Endpoint.end()) {
+            if (const auto it = DatabaseId2Endpoint.find(std::make_pair(params.Id, params.DatabaseType)); it == DatabaseId2Endpoint.end()) {
                 errorMsg += (firstUnresolvedDbId ? TString{""} : TString{", "}) + params.Id;
                 if (firstUnresolvedDbId)
                     firstUnresolvedDbId = false;
@@ -113,7 +116,7 @@ private:
     {
         TString status;
         TString errorMessage;
-        TMaybe<TEndpoint> result;
+        TMaybe<TDatabaseDescription> result;
         auto requestIter = Requests.find(ev->Get()->Request);
         HandledIds++;
 
@@ -121,39 +124,39 @@ private:
             NJson::TJsonReaderConfig jsonConfig;
             NJson::TJsonValue databaseInfo;
 
-            LOG_D("Got databaseId response " << ev->Get()->Response->Body);
+            LOG_D("Got database id response " << ev->Get()->Response->Body);
             if (requestIter == Requests.end()) {
-                errorMessage = "Unknown databaseId";
+                errorMessage = "Unknown database id";
             } else {
                 const auto& params = requestIter->second;
                 const bool parseJsonOk = NJson::ReadJsonTree(ev->Get()->Response->Body, &jsonConfig, &databaseInfo);
                 TParsers::const_iterator parserIt;
-                if (parseJsonOk && (parserIt = Parsers.find(params.Type)) != Parsers.end()) {
+                if (parseJsonOk && (parserIt = Parsers.find(params.DatabaseType)) != Parsers.end()) {
                     try {
-                        auto res = parserIt->second(databaseInfo, MdbEndpointGenerator, params.Auth.UseTls);
-                        LOG_D("Got db_id: " << params.Id
-                            << ", db type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(params.Type)
+                        auto res = parserIt->second(databaseInfo, MdbEndpointGenerator, params.DatabaseAuth.UseTls);
+                        LOG_D("database id: " << params.Id
+                            << ", database type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(params.DatabaseType)
                             << ", endpoint: " << res.Endpoint
                             << ", database: " << res.Database);
-                        DatabaseId2Endpoint[std::make_pair(params.Id, params.Type)] = res;
+                        DatabaseId2Endpoint[std::make_pair(params.Id, params.DatabaseType)] = res;
                         result.ConstructInPlace(res);
                     } catch (...) {
                         errorMessage = TStringBuilder()
                             << " Couldn't resolve "
-                            << "databaseId: " << params.Id
-                            << ", db type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(params.Type) << "\n"
+                            << "database id: " << params.Id
+                            << ", database type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(params.DatabaseType) << "\n"
                             << CurrentExceptionMessage();
                     }
                 } else {
                     errorMessage = TStringBuilder() << "Unable to parse database information. "
-                        << "Database Id: " << params.Id
-                        << ", db type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(params.Type);
+                        << "database id: " << params.Id
+                        << ", database type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(params.DatabaseType);
                 }
             }
         } else {
             errorMessage = ev->Get()->Error;
             const TString error = TStringBuilder()
-                << "Cannot resolve databaseId (status = " + ToString(status) + "). "
+                << "Cannot resolve database id (status = " + ToString(status) + "). "
                 << "Response body from " << ev->Get()->Request->URL << ": " << (ev->Get()->Response ? ev->Get()->Response->Body : "empty");
             if (!errorMessage.empty()) {
                 errorMessage += '\n';
@@ -166,7 +169,7 @@ private:
             Success = false;
         } else {
             const auto& params = requestIter->second;
-            auto key = std::make_tuple(params.Id, params.Type, params.Auth);
+            auto key = std::make_tuple(params.Id, params.DatabaseType, params.DatabaseAuth);
             if (errorMessage) {
                 Cache.Put(key, errorMessage);
             } else {
@@ -187,7 +190,7 @@ private:
     const TRequestMap Requests;
     const TString TraceId;
     const NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
-    TDatabaseResolverResponse::TDatabaseEndpointsMap DatabaseId2Endpoint;
+    TDatabaseResolverResponse::TDatabaseDescriptionMap DatabaseId2Endpoint;
     size_t HandledIds = 0;
     bool Success = true;
     const TParsers& Parsers;
@@ -226,7 +229,7 @@ public:
             }
 
             Y_ENSURE(endpoint);
-            return TEndpoint{endpoint, database, secure};
+            return TDatabaseDescription{endpoint, "", 0, database, secure};
         };
         Parsers[NYql::EDatabaseType::Ydb] = ydbParser;
         Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator, bool useTls)
@@ -240,29 +243,32 @@ public:
             return ret;
         };
         Parsers[NYql::EDatabaseType::ClickHouse] = [](NJson::TJsonValue& databaseInfo, const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator, bool useTls) {
-            TString endpoint;
+            NYql::IMdbEndpointGenerator::TEndpoint endpoint;
             TVector<TString> aliveHosts;
+
             for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) {
                 if (host["health"].GetString() == "ALIVE" && host["type"].GetString() == "CLICKHOUSE") {
                     aliveHosts.push_back(host["name"].GetString());
                 }
             }
-            if (!aliveHosts.empty()) {
-                endpoint = mdbEndpointGenerator->ToEndpoint(
-                    NYql::EDatabaseType::ClickHouse,
-                    aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())],
-                    useTls
-                );
-            }
-            if (!endpoint) {
+
+            if (aliveHosts.empty()) {
                 ythrow yexception() << "No ALIVE ClickHouse hosts found";
             }
-            return TEndpoint{endpoint, "", useTls};
+
+            endpoint = mdbEndpointGenerator->ToEndpoint(
+                NYql::EDatabaseType::ClickHouse,
+                aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())],
+                useTls
+            );
+
+            return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
         };
 
         Parsers[NYql::EDatabaseType::PostgreSQL] = [](NJson::TJsonValue& databaseInfo, const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator, bool useTls) {
-            TString endpoint;
+            NYql::IMdbEndpointGenerator::TEndpoint endpoint;
             TVector<TString> aliveHosts;
+
             for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) {
                 // all host services must be alive
                 bool alive = true;
@@ -277,19 +283,18 @@ public:
                     aliveHosts.push_back(host["name"].GetString());
                 }
             }
-            if (!aliveHosts.empty()) {
-                endpoint = mdbEndpointGenerator->ToEndpoint(
-                    NYql::EDatabaseType::PostgreSQL,
-                    aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())],
-                    useTls
-                );
-            }
-
-            if (!endpoint) {
+            
+            if (aliveHosts.empty()) {
                 ythrow yexception() << "No ALIVE PostgreSQL hosts found";
             }
 
-            return TEndpoint{endpoint, "", useTls};
+            endpoint = mdbEndpointGenerator->ToEndpoint(
+                NYql::EDatabaseType::PostgreSQL,
+                aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())],
+                useTls
+            );
+
+            return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
         };
     }
 
@@ -303,7 +308,7 @@ private:
 
     void SendResponse(
         const NActors::TActorId& recipient,
-        TDatabaseResolverResponse::TDatabaseEndpointsMap&& ready,
+        TDatabaseResolverResponse::TDatabaseDescriptionMap&& ready,
         bool success = true,
         const TString& errorMessage = "")
     {
@@ -318,12 +323,12 @@ private:
     void Handle(TEvents::TEvEndpointRequest::TPtr ev)
     {
         TraceId = ev->Get()->TraceId;
-        LOG_D("Start databaseId resolver for " << ev->Get()->DatabaseIds.size() << " ids");
-        TRequestMap requests; // request, (dbId, type, info)
-        TDatabaseResolverResponse::TDatabaseEndpointsMap ready;
+        LOG_D("Start database id resolver for " << ev->Get()->DatabaseIds.size() << " ids");
+        TRequestMap requests;
+        TDatabaseResolverResponse::TDatabaseDescriptionMap ready;
         for (const auto& [p, databaseAuth] : ev->Get()->DatabaseIds) {
             const auto& [databaseId, databaseType] = p;
-            TMaybe<std::variant<TEndpoint, TString>> cacheVal;
+            TMaybe<std::variant<TDatabaseDescription, TString>> cacheVal;
             auto key = std::make_tuple(databaseId, databaseType, databaseAuth);
             if (Cache.Get(key, &cacheVal)) {
                 switch(cacheVal->index()) {
@@ -351,7 +356,7 @@ private:
                 } else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL }, databaseType)) {
                     YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
                     url = TUrlBuilder(
-                        ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeToString(databaseType) + "/v1/clusters/")
+                        ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeToMdbUrlPath(databaseType) + "/v1/clusters/")
                             .AddPathComponent(databaseId)
                             .AddPathComponent("hosts")
                             .Build();
@@ -366,9 +371,9 @@ private:
                     httpRequest->Set("Authorization", token);
                 }
 
-                requests[httpRequest] = TDatabaseDescription{databaseId, databaseType, databaseAuth};
+                requests[httpRequest] = TResolveParams{databaseId, databaseType, databaseAuth};
             } catch (const std::exception& e) {
-                const TString msg = TStringBuilder() << " Error while preparing to resolve databaseId: " << databaseId << ", details: " << e.what();
+                const TString msg = TStringBuilder() << " Error while preparing to resolve database id: " << databaseId << ", details: " << e.what();
                 LOG_E(msg);
                 Cache.Put(key, msg);
                 SendResponse(ev->Sender, {}, /*success=*/false, msg);

+ 1 - 0
ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt

@@ -6,6 +6,7 @@
 # original buildsystem will not be accepted.
 
 
+add_subdirectory(ut)
 
 add_library(fq-libs-db_id_async_resolver_impl)
 target_compile_options(fq-libs-db_id_async_resolver_impl PRIVATE

+ 1 - 0
ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt

@@ -6,6 +6,7 @@
 # original buildsystem will not be accepted.
 
 
+add_subdirectory(ut)
 
 add_library(fq-libs-db_id_async_resolver_impl)
 target_compile_options(fq-libs-db_id_async_resolver_impl PRIVATE

Some files were not shown because too many files changed in this diff