Browse Source

YQ-831 Refactoring DatabaseAsyncResolver

Refactoring

ref:99c3eae012ffa2985b4824a0b9aeebff6f5efb3c
dinmukhammed 3 years ago
parent
commit
457c97d555

+ 1 - 0
ydb/core/yq/libs/actors/pending_fetcher.cpp

@@ -67,6 +67,7 @@
 namespace NYq {
 
 using namespace NActors;
+using namespace NYql;
 
 namespace {
 

+ 2 - 5
ydb/core/yq/libs/actors/run_actor.cpp

@@ -69,7 +69,6 @@
 #include <ydb/core/yq/libs/checkpointing_common/defs.h>
 #include <ydb/core/yq/libs/checkpoint_storage/storage_service.h>
 #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h>
-#include <ydb/core/yq/libs/common/database_token_builder.h>
 #include <ydb/core/yq/libs/private_client/private_client.h>
 
 #define LOG_E(stream) \
@@ -283,7 +282,6 @@ private:
                 LOG_D("Connection with empty name " << connection.meta().id());
                 continue;
             }
-            Connections[connection.content().name()] = connection; // Necessary for TDatabaseAsyncResolverWithMeta
             YqConnections.emplace(connection.meta().id(), connection);
         }
     }
@@ -1138,8 +1136,8 @@ private:
             clusters);
 
         TVector<TDataProviderInitializer> dataProvidersInit;
-        const auto dbResolver = std::make_shared<TDatabaseAsyncResolverWithMeta>(TDatabaseAsyncResolverWithMeta(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver,
-            Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId, Params.AuthToken, Params.AccountIdSignatures, Connections));
+        const std::shared_ptr<IDatabaseAsyncResolver> dbResolver = std::make_shared<TDatabaseAsyncResolverImpl>(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver,
+            Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId);
         {
             // TBD: move init to better place
             QueryStateUpdateRequest.set_scope(Params.Scope.ToString());
@@ -1350,7 +1348,6 @@ private:
     bool EnableCheckpointCoordinator = false;
     bool RetryNeeded = false;
     Yq::Private::PingTaskRequest QueryStateUpdateRequest;
-    THashMap<TString, YandexQuery::Connection> Connections; // Necessary for DbAsyncResolver
 
     const ui64 MaxTasksPerOperation = 100;
 

+ 1 - 1
ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp

@@ -66,7 +66,7 @@ public:
         }
 
         const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
-        AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) {
+        AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) {
             if (const auto res = response.lock())
                 *res = std::move(future.ExtractValue());
         });

+ 3 - 3
ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp

@@ -6,9 +6,9 @@ namespace NYql {
 
 TDataProviderInitializer GetClickHouseDataProviderInitializer(
     IHTTPGateway::TPtr gateway,
-    const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta)
+    const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver)
 {
-    return [gateway, dbResolverWithMeta] (
+    return [gateway, dbResolver] (
         const TString& userName,
         const TString& sessionId,
         const TGatewaysConfig* gatewaysConfig,
@@ -29,7 +29,7 @@ TDataProviderInitializer GetClickHouseDataProviderInitializer(
 
         state->Types = typeCtx.Get();
         state->FunctionRegistry = functionRegistry;
-        state->DbResolver = dbResolverWithMeta;
+        state->DbResolver = dbResolver;
         if (gatewaysConfig) {
             state->Configuration->Init(gatewaysConfig->GetClickHouse(), state->DbResolver, state->DatabaseIds);
         }

+ 5 - 4
ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h

@@ -1,9 +1,10 @@
 #pragma once
 
+#include "yql_clickhouse_settings.h"
+
 #include <ydb/library/yql/core/yql_data_provider.h>
 #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
-#include "yql_clickhouse_settings.h"
-#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
 
 namespace NKikimr::NMiniKQL {
    class IFunctionRegistry;
@@ -27,12 +28,12 @@ struct TClickHouseState : public TThrRefBase
     TClickHouseConfiguration::TPtr Configuration = MakeIntrusive<TClickHouseConfiguration>();
     const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
     THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds;
-    std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> DbResolver;
+    std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver;
 };
 
 TDataProviderInitializer GetClickHouseDataProviderInitializer(
     IHTTPGateway::TPtr gateway,
-    std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta = nullptr
+    std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr
 );
 
 TIntrusivePtr<IDataProvider> CreateClickHouseDataSource(TClickHouseState::TPtr state, IHTTPGateway::TPtr gateway);

+ 7 - 5
ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h

@@ -1,12 +1,13 @@
 #pragma once
 
+#include <ydb/library/yql/utils/log/log.h>
 #include <ydb/library/yql/providers/common/config/yql_dispatch.h>
 #include <ydb/library/yql/providers/common/config/yql_setting.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
 #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
 
 #include <ydb/core/yq/libs/events/events.h>
-#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h>
-#include <ydb/core/yq/libs/common/database_token_builder.h>
+
 
 namespace NYql {
 
@@ -23,7 +24,7 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS
     template <typename TProtoConfig>
     void Init(
         const TProtoConfig& config,
-        const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver,
+        const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
         THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds)
     {
         TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
@@ -37,10 +38,11 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS
         for (auto& cluster: config.GetClusterMapping()) {
             this->Dispatch(cluster.GetName(), cluster.GetSettings());
 
-            if (dbResolver) { //TODO: change log level
+            if (dbResolver) {
                 YQL_CLOG(DEBUG, ProviderClickHouse) << "Settings: clusterName = " << cluster.GetName()
                     << ", clusterDbId = "  << cluster.GetId() << ", cluster.GetCluster(): " << cluster.GetCluster() << ", HasCluster: " << (cluster.HasCluster() ? "TRUE" : "FALSE") ;
-                dbResolver->TryAddDbIdToResolve(cluster.HasCluster(), cluster.GetName(), cluster.GetId(), NYq::DatabaseType::ClickHouse, databaseIds);
+                databaseIds[std::make_pair(cluster.GetId(), NYq::DatabaseType::ClickHouse)] =
+                    NYq::TEvents::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false};
                 if (cluster.GetId()) {
                     DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName());
                     YQL_CLOG(DEBUG, ProviderClickHouse) << "Add dbId: " << cluster.GetId() << " to DbId2Clusters";

+ 2 - 6
ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h

@@ -5,14 +5,10 @@
 
 namespace NYq {
 
-struct TResolveParams {
-    THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth> Ids;
-    TString TraceId;
-};
-
 class IDatabaseAsyncResolver {
 public:
-    virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const = 0;
+    virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(
+        const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const = 0;
 
     virtual ~IDatabaseAsyncResolver() = default;
 };

+ 11 - 6
ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp

@@ -1,22 +1,27 @@
 #include "db_async_resolver_impl.h"
 
 namespace NYq {
+using namespace NThreading;
 
-TDatabaseAsyncResolver::TDatabaseAsyncResolver(
+TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl(
     NActors::TActorSystem* actorSystem,
     const NActors::TActorId& recipient,
     const TString& ydbMvpEndpoint,
     const TString& mdbGateway,
-    const bool mdbTransformHost)
+    bool mdbTransformHost,
+    const TString& traceId)
     : ActorSystem(actorSystem)
     , Recipient(recipient)
     , YdbMvpEndpoint(ydbMvpEndpoint)
     , MdbGateway(mdbGateway)
     , MdbTransformHost(mdbTransformHost)
+    , TraceId(traceId)
 {}
 
-NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolver::ResolveIds(const TResolveParams& params) const {
-    auto promise = NThreading::NewPromise<TEvents::TDbResolverResponse>();
+TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(
+    const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const
+{
+    auto promise = NewPromise<TEvents::TDbResolverResponse>();
     TDuration timeout = TDuration::Seconds(40);
     auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>(
         [promise] (TAutoPtr<NActors::TEventHandle<TEvents::TEvEndpointResponse>>& event) mutable {
@@ -31,8 +36,8 @@ NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolver::Resolv
     NActors::TActorId callbackId = ActorSystem->Register(callback.Release());
 
     ActorSystem->Send(new NActors::IEventHandle(Recipient, callbackId,
-        new TEvents::TEvEndpointRequest(params.Ids, YdbMvpEndpoint, MdbGateway,
-            params.TraceId, MdbTransformHost)));
+        new TEvents::TEvEndpointRequest(ids, YdbMvpEndpoint, MdbGateway,
+            TraceId, MdbTransformHost)));
     return promise.GetFuture();
 }
 

+ 7 - 4
ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h

@@ -4,23 +4,26 @@
 
 namespace NYq {
 
-class TDatabaseAsyncResolver : public IDatabaseAsyncResolver {
+class TDatabaseAsyncResolverImpl : public IDatabaseAsyncResolver {
 public:
-    TDatabaseAsyncResolver(
+    TDatabaseAsyncResolverImpl(
         NActors::TActorSystem* actorSystem,
         const NActors::TActorId& recipient,
         const TString& ydbMvpEndpoint,
         const TString& mdbGateway,
-        const bool mdbTransformHost
+        bool mdbTransformHost = false,
+        const TString& traceId = ""
     );
 
-    NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const override;
+    NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(
+        const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const override;
 private:
     NActors::TActorSystem* ActorSystem;
     const NActors::TActorId Recipient;
     const TString YdbMvpEndpoint;
     const TString MdbGateway;
     const bool MdbTransformHost = false;
+    const TString TraceId;
 };
 
 } // NYq

+ 0 - 72
ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.cpp

@@ -1,72 +0,0 @@
-#include "db_async_resolver_with_meta.h"
-#include <ydb/core/yq/libs/common/database_token_builder.h>
-
-namespace NYq {
-
-    TDatabaseAsyncResolverWithMeta::TDatabaseAsyncResolverWithMeta(
-        NActors::TActorSystem* actorSystem,
-        const NActors::TActorId& recipient,
-        const TString& ydbMvpEndpoint,
-        const TString& mdbGateway,
-        const bool mdbTransformHost,
-        const TString& traceId,
-        const TString& token,
-        const THashMap<TString, TString>& accountIdSignatures,
-        const THashMap<TString, YandexQuery::Connection>& connections)
-        : DbResolver(actorSystem, recipient, ydbMvpEndpoint, mdbGateway, mdbTransformHost)
-        , TraceId(traceId)
-        , Token(token)
-        , AccountIdSignatures(accountIdSignatures)
-        , Connections(connections)
-    {}
-
-    NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverWithMeta::ResolveIds(const TResolveParams& params) const {
-        return DbResolver.ResolveIds(params);
-    }
-
-    TString TDatabaseAsyncResolverWithMeta::GetTraceId() const {
-        return TraceId;
-    }
-
-    TString TDatabaseAsyncResolverWithMeta::GetToken() const {
-        return Token;
-    }
-
-    const THashMap<TString, TString>& TDatabaseAsyncResolverWithMeta::GetAccountIdSignatures() const {
-        return AccountIdSignatures;
-    }
-
-    void TDatabaseAsyncResolverWithMeta::TryAddDbIdToResolve(
-        const bool isEndpoint,
-        const TString& clusterName,
-        const TString& dbId,
-        const DatabaseType type,
-        THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& databaseIds) const {
-            if (isEndpoint) {
-                return;
-            }
-            const auto iter = Connections.find(clusterName);
-            if (iter == Connections.end()) {
-                return;
-            }
-            const auto& conn = iter->second;
-            const auto& setting = conn.content().setting();
-            YandexQuery::IamAuth auth;
-            switch (type) {
-            case DatabaseType::Ydb:
-                auth = setting.ydb_database().auth();
-                break;
-            case DatabaseType::ClickHouse:
-                auth = setting.clickhouse_cluster().auth();
-                break;
-            case DatabaseType::DataStreams:
-                auth = setting.data_streams().auth();
-                break;
-            default:
-                return;
-            }
-            TryAddDatabaseToResolve(auth, dbId, type, Token, AccountIdSignatures, databaseIds);
-        }
-
-} // NYq
-

Some files were not shown because too many files changed in this diff