Просмотр исходного кода

YT-20315: Support retries of cross cell copying

add options

YT-20315: Support retries of cross cell copying
nadya02 1 год назад
Родитель
Сommit
b04e3c5ba7

+ 37 - 2
yt/cpp/mapreduce/client/client.cpp

@@ -25,6 +25,7 @@
 
 #include <yt/cpp/mapreduce/interface/config.h>
 #include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/error_codes.h>
 #include <yt/cpp/mapreduce/interface/fluent.h>
 #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
 #include <yt/cpp/mapreduce/interface/skiff_row.h>
@@ -132,7 +133,24 @@ TNodeId TClientBase::Copy(
     const TYPath& destinationPath,
     const TCopyOptions& options)
 {
-    return NRawClient::Copy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
+    try {
+        return NRawClient::CopyInsideMasterCell(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
+    } catch (const TErrorResponse& e) {
+        if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
+            // Do transaction for cross cell copying.
+
+            std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
+                return NRawClient::CopyWithoutRetries(Context_, transaction->GetId(), sourcePath, destinationPath, options);
+            };
+            return RetryTransactionWithPolicy<TNodeId>(
+                this,
+                lambda,
+                ClientRetryPolicy_->CreatePolicyForGenericRequest()
+            );
+        } else {
+            throw;
+        }
+    }
 }
 
 TNodeId TClientBase::Move(
@@ -140,7 +158,24 @@ TNodeId TClientBase::Move(
     const TYPath& destinationPath,
     const TMoveOptions& options)
 {
-    return NRawClient::Move(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
+    try {
+        return NRawClient::MoveInsideMasterCell(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
+    } catch (const TErrorResponse& e) {
+        if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
+            // Do transaction for cross cell moving.
+
+            std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
+                return NRawClient::MoveWithoutRetries(Context_, transaction->GetId(), sourcePath, destinationPath, options);
+            };
+            return RetryTransactionWithPolicy<TNodeId>(
+                this,
+                lambda,
+                ClientRetryPolicy_->CreatePolicyForGenericRequest()
+            );
+        } else {
+            throw;
+        }
+    }
 }
 
 TNodeId TClientBase::Link(

+ 1 - 0
yt/cpp/mapreduce/interface/client_method_options.h

@@ -36,6 +36,7 @@ enum ENodeType : int
     NT_SCHEDULER_POOL       /* "scheduler_pool" */,
     NT_LINK                 /* "link" */,
     NT_GROUP                /* "group" */,
+    NT_PORTAL               /* "portal_entrance" */,
 };
 
 ///

+ 1 - 0
yt/cpp/mapreduce/interface/error_codes.h

@@ -236,6 +236,7 @@ namespace NObjectClient {
 ////////////////////////////////////////////////////////////////////////////////
 
     constexpr int PrerequisiteCheckFailed = 1000;
+    constexpr int CrossCellAdditionalPath = 1002;
 
 ////////////////////////////////////////////////////////////////////////////////
 

+ 38 - 4
yt/cpp/mapreduce/raw_client/raw_requests.cpp

@@ -165,8 +165,7 @@ TNodeId Create(
     return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
 }
 
-TNodeId Copy(
-    const IRequestRetryPolicyPtr& retryPolicy,
+TNodeId CopyWithoutRetries(
     const TClientContext& context,
     const TTransactionId& transactionId,
     const TYPath& sourcePath,
@@ -176,10 +175,41 @@ TNodeId Copy(
     THttpHeader header("POST", "copy");
     header.AddMutationId();
     header.MergeParameters(SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options));
+    return ParseGuidFromResponse(RequestWithoutRetry(context, header).Response);
+}
+
+TNodeId CopyInsideMasterCell(
+    const IRequestRetryPolicyPtr& retryPolicy,
+    const TClientContext& context,
+    const TTransactionId& transactionId,
+    const TYPath& sourcePath,
+    const TYPath& destinationPath,
+    const TCopyOptions& options)
+{
+    THttpHeader header("POST", "copy");
+    header.AddMutationId();
+    auto params = SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options);
+
+    // Make cross cell copying disable.
+    params["enable_cross_cell_copying"] = false;
+    header.MergeParameters(params);
     return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
 }
 
-TNodeId Move(
+TNodeId MoveWithoutRetries(
+    const TClientContext& context,
+    const TTransactionId& transactionId,
+    const TYPath& sourcePath,
+    const TYPath& destinationPath,
+    const TMoveOptions& options)
+{
+    THttpHeader header("POST", "move");
+    header.AddMutationId();
+    header.MergeParameters(SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options));
+    return ParseGuidFromResponse(RequestWithoutRetry( context, header).Response);
+}
+
+TNodeId MoveInsideMasterCell(
     const IRequestRetryPolicyPtr& retryPolicy,
     const TClientContext& context,
     const TTransactionId& transactionId,
@@ -189,7 +219,11 @@ TNodeId Move(
 {
     THttpHeader header("POST", "move");
     header.AddMutationId();
-    header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options));
+    auto params = SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options);
+
+    // Make cross cell copying disable.
+    params["enable_cross_cell_copying"] = false;
+    header.MergeParameters(params);
     return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
 }
 

+ 16 - 2
yt/cpp/mapreduce/raw_client/raw_requests.h

@@ -84,7 +84,14 @@ TNodeId Create(
     const ENodeType& type,
     const TCreateOptions& options = TCreateOptions());
 
-TNodeId Copy(
+TNodeId CopyWithoutRetries(
+    const TClientContext& context,
+    const TTransactionId& transactionId,
+    const TYPath& sourcePath,
+    const TYPath& destinationPath,
+    const TCopyOptions& options = TCopyOptions());
+
+TNodeId CopyInsideMasterCell(
     const IRequestRetryPolicyPtr& retryPolicy,
     const TClientContext& context,
     const TTransactionId& transactionId,
@@ -92,7 +99,14 @@ TNodeId Copy(
     const TYPath& destinationPath,
     const TCopyOptions& options = TCopyOptions());
 
-TNodeId Move(
+TNodeId MoveWithoutRetries(
+    const TClientContext& context,
+    const TTransactionId& transactionId,
+    const TYPath& sourcePath,
+    const TYPath& destinationPath,
+    const TMoveOptions& options = TMoveOptions());
+
+TNodeId MoveInsideMasterCell(
     const IRequestRetryPolicyPtr& retryPolicy,
     const TClientContext& context,
     const TTransactionId& transactionId,