Просмотр исходного кода

Pass AddressResolver config to DQ worker (#7251)

Aleksandr Gaev 7 месяцев назад
Родитель
Сommit
cf21471051

+ 4 - 0
ydb/library/yql/providers/dq/actors/yt/resource_manager.h

@@ -10,11 +10,13 @@
 
 #include <library/cpp/threading/future/future.h>
 #include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/yt/yson_string/string.h>
 
 namespace NYql {
     namespace NCommonJobVars {
         extern const TString ACTOR_PORT;
         extern const TString ACTOR_NODE_ID;
+        extern const TString ADDRESS_RESOLVER_CONFIG;
         extern const TString UDFS_PATH;
         extern const TString OPERATION_SIZE;
         extern const TString YT_COORDINATOR;
@@ -71,6 +73,8 @@ namespace NYql {
 
         bool ForceIPv4 = false;
 
+        NYT::NYson::TYsonString AddressResolverConfig;
+
         // Pinger
         TString DieOnFileAbsence; // see YQL-14099
 

+ 2 - 0
ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp

@@ -29,6 +29,7 @@ namespace NYql {
     namespace NCommonJobVars {
         const TString ACTOR_PORT("ACTOR_PORT");
         const TString ACTOR_NODE_ID("ACTOR_NODE_ID");
+        const TString ADDRESS_RESOLVER_CONFIG("ADDRESS_RESOLVER_CONFIG");
         const TString UDFS_PATH("UDFS_PATH");
         const TString OPERATION_SIZE("OPERATION_SIZE");
         const TString YT_COORDINATOR("YT_COORDINATOR");
@@ -655,6 +656,7 @@ namespace NYql {
                                                 .Item(NCommonJobVars::OPERATION_SIZE).Value(ToString(nodes.size()))
                                                 .Item(NCommonJobVars::UDFS_PATH).Value(fileCache)
                                                 .Item(NCommonJobVars::ACTOR_NODE_ID).Value(ToString(nodeId))
+                                                .Item(NCommonJobVars::ADDRESS_RESOLVER_CONFIG).Value(ToString(NYT::NYson::ConvertToYsonString(Options.AddressResolverConfig, NYT::NYson::EYsonFormat::Text)))
                                                 .DoIf(!!GetEnv("YQL_DETERMINISTIC_MODE"), [&](NYT::TFluentMap fluent) {
                                                     fluent.Item("YQL_DETERMINISTIC_MODE").Value("1");
                                                 })

+ 4 - 7
ydb/library/yql/tools/dq/worker_job/dq_worker.cpp

@@ -171,8 +171,8 @@ namespace NYql::NDq::NWorker {
 
         TString fileCacheDir = GetEnv(NCommonJobVars::UDFS_PATH);
         TString ytCoordinatorStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_COORDINATOR);
-
         TString ytBackendStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_BACKEND);
+        NYT::NYson::TYsonString addressResolverYson = NYT::NYson::TYsonString(GetEnv(NCommonJobVars::ADDRESS_RESOLVER_CONFIG));
 
         TString operationId = GetEnv("YT_OPERATION_ID");
         TString jobId = GetEnv("YT_JOB_ID");
@@ -190,13 +190,10 @@ namespace NYql::NDq::NWorker {
         TRangeWalker<int> portWalker(startPort, startPort+100);
         auto ports = BindInRange(portWalker);
 
+        auto addressResolverConfig = NYT::NYTree::ConvertTo<NYT::NNet::TAddressResolverConfigPtr>(addressResolverYson);
+        NYT::NNet::TAddressResolver::Get()->Configure(addressResolverConfig);
+
         auto forceIPv4 = IsTrue(GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_FORCE_IPV4, ""));
-        if (forceIPv4) {
-            auto config = NYT::New<NYT::NNet::TAddressResolverConfig>();
-            config->EnableIPv4 = true;
-            config->EnableIPv6 = false;
-            NYT::NNet::TAddressResolver::Get()->Configure(config);
-        }
 
         auto [host, ip] = NYql::NDqs::GetLocalAddress(
             coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr,

+ 3 - 0
ydb/library/yql/yt/native/dq_manager.cpp

@@ -35,6 +35,8 @@ void TDqManagerConfig::Register(TRegistrar registrar)
         .GreaterThan(0);
     registrar.Parameter("use_ipv4", &TThis::UseIPv4)
         .Default(false);
+    registrar.Parameter("address_resolver", &TThis::AddressResolver)
+        .Default();
 
     registrar.Parameter("yt_backends", &TThis::YtBackends)
         .NonEmpty();
@@ -237,6 +239,7 @@ void TDqManager::Start()
             rmOptions.UploadPrefix = rmOptions.YtBackend.GetUploadPrefix() + "/bin/" + ToString(GetProgramCommitId());
             rmOptions.Counters = MetricsRegistry_->GetSensors()->GetSubgroup("counters", "ytrm")->GetSubgroup("ytname", rmOptions.YtBackend.GetClusterName());
             rmOptions.ForceIPv4 = Config_->UseIPv4;
+            rmOptions.AddressResolverConfig = ConvertToYsonString(Config_->AddressResolver, EYsonFormat::Text);
             ActorSystem_->Register(CreateResourceManager(rmOptions, Coordinator_));
         }
         rmOptions.UploadPrefix = rmOptions.YtBackend.GetUploadPrefix();

+ 1 - 0
ydb/library/yql/yt/native/dq_manager.h

@@ -31,6 +31,7 @@ struct TDqManagerConfig
     NYTree::INodePtr YtCoordinator;
     NYTree::INodePtr Scheduler;
     NYTree::INodePtr ICSettings;
+    NYTree::INodePtr AddressResolver;
 
     TMap<TString, TString> UdfsWithMd5; // autofilled by yql_plugin
     NYql::TFileStoragePtr FileStorage; // autofilled by yql_plugin