Browse Source

YQ-911 Use YDB driver from YQ shared resources in internal YDB connections for CP and checkpointing

Use CaCert setting for client

Refactor YDB connection

Refactor ydb lib

Split two drivers

ref:b529cad63e7e5aa22cb8d32f77f150e75135f71a
Vasily Gerasimov 3 years ago
parent
commit
3db4a2c736

+ 1 - 0
ydb/core/testlib/CMakeLists.txt

@@ -62,6 +62,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
   libs-audit-mock
   yq-libs-init
   yq-libs-mock
+  yq-libs-shared_resources
   ydb-library-aclib
   library-folder_service-mock
   library-mkql_proto-protos

+ 7 - 3
ydb/core/testlib/test_client.cpp

@@ -804,7 +804,7 @@ namespace Tests {
 
             const auto ydbCredFactory = NKikimr::CreateYdbCredentialsProviderFactory;
             auto counters = MakeIntrusive<NMonitoring::TDynamicCounters>();
-            auto yqSharedResources = NYq::CreateYqSharedResources(protoConfig, ydbCredFactory, counters);
+            YqSharedResources = NYq::CreateYqSharedResources(protoConfig, ydbCredFactory, counters);
             NYq::Init(
                 protoConfig,
                 Runtime->GetNodeId(nodeIdx),
@@ -812,13 +812,13 @@ namespace Tests {
                 &appData,
                 "TestTenant",
                 nullptr, // MakeIntrusive<NPq::NConfigurationManager::TConnections>(),
-                yqSharedResources,
+                YqSharedResources,
                 NKikimr::NFolderService::CreateMockFolderServiceActor,
                 NYq::CreateMockYqAuditServiceActor,
                 ydbCredFactory,
                 /*IcPort = */0
                 );
-            NYq::InitTest(Runtime.Get(), port, Settings->GrpcPort, yqSharedResources);
+            NYq::InitTest(Runtime.Get(), port, Settings->GrpcPort, YqSharedResources);
         }
     }
 
@@ -885,6 +885,10 @@ namespace Tests {
             GRpcServer->Stop();
         }
 
+        if (YqSharedResources) {
+            YqSharedResources->Stop();
+        }
+
         if (Runtime) {
             Runtime.Destroy();
         }

+ 2 - 0
ydb/core/testlib/test_client.h

@@ -24,6 +24,7 @@
 #include <ydb/core/kesus/tablet/events.h>
 #include <ydb/core/security/ticket_parser.h>
 #include <ydb/core/base/grpc_service_factory.h>
+#include <ydb/core/yq/libs/shared_resources/interface/shared_resources.h>
 
 #include <google/protobuf/text_format.h>
 
@@ -260,6 +261,7 @@ namespace Tests {
         TAutoPtr<NMsgBusProxy::IMessageBusServer> BusServer;
         std::unique_ptr<NGrpc::TGRpcServer> GRpcServer;
         TIntrusivePtr<NMonitoring::TDynamicCounters> GRpcServerRootCounters;
+        NYq::IYqSharedResources::TPtr YqSharedResources;
     };
 
     class TClient {

+ 1 - 1
ydb/core/yq/libs/actors/nodes_manager.cpp

@@ -73,7 +73,7 @@ public:
         , YqSharedResources(yqSharedResources)
         , IcPort(icPort)
         , Client(
-            YqSharedResources->YdbDriver,
+            YqSharedResources->CoreYdbDriver,
             NYdb::TCommonClientSettings()
                 .DiscoveryEndpoint(PrivateApiConfig.GetTaskServiceEndpoint())
                 .EnableSsl(PrivateApiConfig.GetSecureTaskService())

+ 2 - 2
ydb/core/yq/libs/actors/pending_fetcher.cpp

@@ -160,7 +160,7 @@ public:
         , Guid(CreateGuidAsString())
         , ClientCounters(clientCounters)
         , Client(
-            YqSharedResources->YdbDriver,
+            YqSharedResources->CoreYdbDriver,
             NYdb::TCommonClientSettings()
                 .DiscoveryEndpoint(PrivateApiConfig.GetTaskServiceEndpoint())
                 .EnableSsl(PrivateApiConfig.GetSecureTaskService())
@@ -336,7 +336,7 @@ private:
         const auto createdAt = TInstant::Now();
 
         TRunActorParams params(
-            YqSharedResources->YdbDriver, S3Gateway,
+            YqSharedResources, S3Gateway,
             FunctionRegistry, RandomProvider,
             ModuleResolver, ModuleResolver->GetNextUniqueId(),
             DqCompFactory, PqCmConnections,

+ 6 - 6
ydb/core/yq/libs/actors/run_actor.cpp

@@ -113,7 +113,7 @@ public:
                 Params.QueryId,
                 Params.Owner,
                 TPrivateClient(
-                    Params.Driver,
+                    Params.YqSharedResources->CoreYdbDriver,
                     NYdb::TCommonClientSettings()
                     .DiscoveryEndpoint(Params.PrivateApiConfig.GetTaskServiceEndpoint())
                     .EnableSsl(Params.PrivateApiConfig.GetSecureTaskService())
@@ -412,7 +412,7 @@ private:
                     ::NYq::MakeReadRuleCreatorActor(
                         SelfId(),
                         Params.QueryId,
-                        Params.Driver,
+                        Params.YqSharedResources->UserSpaceYdbDriver,
                         std::move(TopicsForConsumersCreation),
                         std::move(CredentialsForConsumersCreation)
                     )
@@ -771,7 +771,7 @@ private:
             ::NYq::MakeReadRuleDeleterActor(
                 SelfId(),
                 Params.QueryId,
-                Params.Driver,
+                Params.YqSharedResources->UserSpaceYdbDriver,
                 Params.CreatedTopicConsumers,
                 std::move(credentials)
             )
@@ -832,7 +832,7 @@ private:
             }
             resultId = NActors::TActivationContext::Register(
                     CreateResultWriter(
-                        Params.Driver, ExecuterId, dqGraphParams.GetResultType(), Params.PrivateApiConfig,
+                        Params.YqSharedResources->UserSpaceYdbDriver, ExecuterId, dqGraphParams.GetResultType(), Params.PrivateApiConfig,
                         writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ClientCounters));
         } else {
             LOG_D("ResultWriter was NOT CREATED since ResultType is empty");
@@ -1096,7 +1096,7 @@ private:
         }
 
         {
-            dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.Driver, Params.CredentialsFactory, dbResolver));
+            dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
         }
 
         {
@@ -1109,7 +1109,7 @@ private:
 
         {
             NYql::TPqGatewayServices pqServices(
-                Params.Driver,
+                Params.YqSharedResources->UserSpaceYdbDriver,
                 Params.PqCmConnections,
                 Params.CredentialsFactory,
                 std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()),

+ 2 - 2
ydb/core/yq/libs/actors/run_actor_params.cpp

@@ -5,7 +5,7 @@ namespace NYq {
 using namespace NActors;
 
 TRunActorParams::TRunActorParams(
-    NYdb::TDriver driver,
+    TYqSharedResources::TPtr yqSharedResources,
     NYql::IHTTPGateway::TPtr s3Gateway,
     const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
     TIntrusivePtr<IRandomProvider> randomProvider,
@@ -48,7 +48,7 @@ TRunActorParams::TRunActorParams(
     TInstant createdAt,
     const TString& tenantName
     )
-    : Driver(driver)
+    : YqSharedResources(yqSharedResources)
     , S3Gateway(s3Gateway)
     , FunctionRegistry(functionRegistry)
     , RandomProvider(randomProvider)

+ 3 - 2
ydb/core/yq/libs/actors/run_actor_params.h

@@ -3,6 +3,7 @@
 #include <ydb/core/yq/libs/config/protos/pinger.pb.h>
 #include <ydb/core/yq/libs/config/protos/yq_config.pb.h>
 #include <ydb/core/yq/libs/events/events.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
 
 #include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
 #include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
@@ -19,7 +20,7 @@ namespace NYq {
 
 struct TRunActorParams { // TODO2 : Change name
     TRunActorParams(
-        NYdb::TDriver driver,
+        TYqSharedResources::TPtr yqSharedResources,
         NYql::IHTTPGateway::TPtr s3Gateway,
         const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
         TIntrusivePtr<IRandomProvider> randomProvider,
@@ -66,7 +67,7 @@ struct TRunActorParams { // TODO2 : Change name
     TRunActorParams(const TRunActorParams& params) = default;
     TRunActorParams(TRunActorParams&& params) = default;
 
-    NYdb::TDriver Driver;
+    TYqSharedResources::TPtr YqSharedResources;
     NYql::IHTTPGateway::TPtr S3Gateway;
     const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
     TIntrusivePtr<IRandomProvider> RandomProvider;

+ 1 - 0
ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt

@@ -22,6 +22,7 @@ target_link_libraries(yq-libs-checkpoint_storage PUBLIC
   libs-checkpoint_storage-events
   libs-checkpoint_storage-proto
   yq-libs-checkpointing_common
+  yq-libs-shared_resources
   ydb-library-security
   cpp-client-ydb_scheme
   cpp-client-ydb_table

+ 12 - 7
ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp

@@ -41,12 +41,14 @@ class TStorageProxy : public TActorBootstrapped<TStorageProxy> {
     TStateStoragePtr StateStorage;
     TActorId ActorGC;
     NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
+    TYqSharedResources::TPtr YqSharedResources;
 
 public:
     explicit TStorageProxy(
         const NConfig::TCheckpointCoordinatorConfig& config,
         const NConfig::TCommonConfig& commonConfig,
-        const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+        const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+        const TYqSharedResources::TPtr& yqSharedResources);
 
     void Bootstrap();
 
@@ -100,22 +102,24 @@ static void FillDefaultParameters(NConfig::TCheckpointCoordinatorConfig& checkpo
 TStorageProxy::TStorageProxy(
     const NConfig::TCheckpointCoordinatorConfig& config,
     const NConfig::TCommonConfig& commonConfig,
-    const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+    const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+    const TYqSharedResources::TPtr& yqSharedResources)
     : Config(config)
     , CommonConfig(commonConfig)
     , StorageConfig(Config.GetStorage())
-    , CredentialsProviderFactory(credentialsProviderFactory) {
+    , CredentialsProviderFactory(credentialsProviderFactory)
+    , YqSharedResources(yqSharedResources) {
     FillDefaultParameters(Config, StorageConfig);
 }
 
 void TStorageProxy::Bootstrap() {
-    CheckpointStorage = NewYdbCheckpointStorage(StorageConfig, CredentialsProviderFactory, CreateEntityIdGenerator(CommonConfig.GetIdsPrefix()));
+    CheckpointStorage = NewYdbCheckpointStorage(StorageConfig, CredentialsProviderFactory, CreateEntityIdGenerator(CommonConfig.GetIdsPrefix()), YqSharedResources);
     auto issues = CheckpointStorage->Init().GetValueSync();
     if (!issues.Empty()) {
         LOG_STREAMS_STORAGE_SERVICE_ERROR("Failed to init checkpoint storage: " << issues.ToOneLineString());
     }
 
-    StateStorage = NewYdbStateStorage(StorageConfig, CredentialsProviderFactory);
+    StateStorage = NewYdbStateStorage(StorageConfig, CredentialsProviderFactory, YqSharedResources);
     issues = StateStorage->Init().GetValueSync();
     if (!issues.Empty()) {
         LOG_STREAMS_STORAGE_SERVICE_ERROR("Failed to init checkpoint state storage: " << issues.ToOneLineString());
@@ -393,9 +397,10 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
 std::unique_ptr<NActors::IActor> NewStorageProxy(
     const NConfig::TCheckpointCoordinatorConfig& config,
     const NConfig::TCommonConfig& commonConfig,
-    const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+    const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+    const TYqSharedResources::TPtr& yqSharedResources)
 {
-    return std::unique_ptr<NActors::IActor>(new TStorageProxy(config, commonConfig, credentialsProviderFactory));
+    return std::unique_ptr<NActors::IActor>(new TStorageProxy(config, commonConfig, credentialsProviderFactory, yqSharedResources));
 }
 
 } // namespace NYq

Some files were not shown because too many files changed in this diff