Browse Source

[yt/cpp/mapreduce] YT-23616: Move rest of Cypress methods to THttpRawClient
commit_hash:2d705aa0369f1425152547803e0fcf80780e1bf6

hiddenpath 2 months ago
parent
commit
17c15e9fcc

+ 61 - 35
yt/cpp/mapreduce/client/client.cpp

@@ -9,7 +9,6 @@
 #include "init.h"
 #include "lock.h"
 #include "operation.h"
-#include "retry_transaction.h"
 #include "retryful_writer.h"
 #include "transaction.h"
 #include "transaction_pinger.h"
@@ -123,7 +122,11 @@ void TClientBase::Remove(
     const TYPath& path,
     const TRemoveOptions& options)
 {
-    return NRawClient::Remove(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
+    RequestWithRetry<void>(
+        ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+        [this, &path, &options] (TMutationId& mutationId) {
+            RawClient_->Remove(mutationId, TransactionId_, path, options);
+        });
 }
 
 bool TClientBase::Exists(
@@ -176,7 +179,11 @@ TNode::TListType TClientBase::List(
     const TYPath& path,
     const TListOptions& options)
 {
-    return NRawClient::List(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
+    return RequestWithRetry<TNode::TListType>(
+        ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+        [this, &path, &options] (TMutationId /*mutationId*/) {
+            return RawClient_->List(TransactionId_, path, options);
+        });
 }
 
 TNodeId TClientBase::Copy(
@@ -193,16 +200,14 @@ TNodeId TClientBase::Copy(
     } catch (const TErrorResponse& e) {
         if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
             // Do transaction for cross cell copying.
-
-            std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
-                TMutationId mutationId;
-                return RawClient_->CopyWithoutRetries(mutationId, transaction->GetId(), sourcePath, destinationPath, options);
-            };
-            return RetryTransactionWithPolicy<TNodeId>(
-                this,
-                lambda,
-                ClientRetryPolicy_->CreatePolicyForGenericRequest()
-            );
+            return RequestWithRetry<TNodeId>(
+                ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+                [this, &sourcePath, &destinationPath, &options] (TMutationId /*mutationId*/) {
+                    auto transaction = StartTransaction(TStartTransactionOptions());
+                    auto nodeId = RawClient_->CopyWithoutRetries(transaction->GetId(), sourcePath, destinationPath, options);
+                    transaction->Commit();
+                    return nodeId;
+                });
         } else {
             throw;
         }
@@ -215,19 +220,22 @@ TNodeId TClientBase::Move(
     const TMoveOptions& options)
 {
     try {
-        return NRawClient::MoveInsideMasterCell(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
+        return RequestWithRetry<TNodeId>(
+            ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+            [this, &sourcePath, &destinationPath, &options] (TMutationId& mutationId) {
+                return RawClient_->MoveInsideMasterCell(mutationId, TransactionId_, sourcePath, destinationPath, options);
+            });
     } catch (const TErrorResponse& e) {
         if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
             // Do transaction for cross cell moving.
-
-            std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
-                return NRawClient::MoveWithoutRetries(Context_, transaction->GetId(), sourcePath, destinationPath, options);
-            };
-            return RetryTransactionWithPolicy<TNodeId>(
-                this,
-                lambda,
-                ClientRetryPolicy_->CreatePolicyForGenericRequest()
-            );
+            return RequestWithRetry<TNodeId>(
+                ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+                [this, &sourcePath, &destinationPath, &options] (TMutationId /*mutationId*/) {
+                    auto transaction = StartTransaction(TStartTransactionOptions());
+                    auto nodeId = RawClient_->MoveWithoutRetries(transaction->GetId(), sourcePath, destinationPath, options);
+                    transaction->Commit();
+                    return nodeId;
+                });
         } else {
             throw;
         }
@@ -239,7 +247,11 @@ TNodeId TClientBase::Link(
     const TYPath& linkPath,
     const TLinkOptions& options)
 {
-    return NRawClient::Link(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, targetPath, linkPath, options);
+    return RequestWithRetry<TNodeId>(
+        ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+        [this, &targetPath, &linkPath, &options] (TMutationId& mutationId) {
+            return RawClient_->Link(mutationId, TransactionId_, targetPath, linkPath, options);
+        });
 }
 
 void TClientBase::Concatenate(
@@ -247,15 +259,21 @@ void TClientBase::Concatenate(
     const TRichYPath& destinationPath,
     const TConcatenateOptions& options)
 {
-    std::function<void(ITransactionPtr)> lambda = [&sourcePaths, &destinationPath, &options, this](ITransactionPtr transaction) {
-        if (!options.Append_ && !sourcePaths.empty() && !transaction->Exists(destinationPath.Path_)) {
-            auto typeNode = transaction->Get(CanonizeYPath(sourcePaths.front()).Path_ + "/@type");
-            auto type = FromString<ENodeType>(typeNode.AsString());
-            transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
-        }
-        NRawClient::Concatenate(this->Context_, transaction->GetId(), sourcePaths, destinationPath, options);
-    };
-    RetryTransactionWithPolicy(this, lambda, ClientRetryPolicy_->CreatePolicyForGenericRequest());
+    RequestWithRetry<void>(
+        ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+        [this, &sourcePaths, &destinationPath, &options] (TMutationId /*mutationId*/) {
+            auto transaction = StartTransaction(TStartTransactionOptions());
+
+            if (!options.Append_ && !sourcePaths.empty() && !transaction->Exists(destinationPath.Path_)) {
+                auto typeNode = transaction->Get(CanonizeYPath(sourcePaths.front()).Path_ + "/@type");
+                auto type = FromString<ENodeType>(typeNode.AsString());
+                transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
+            }
+
+            RawClient_->Concatenate(transaction->GetId(), sourcePaths, destinationPath, options);
+
+            transaction->Commit();
+        });
 }
 
 TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)
@@ -947,7 +965,11 @@ ILockPtr TTransaction::Lock(
     ELockMode mode,
     const TLockOptions& options)
 {
-    auto lockId = NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, mode, options);
+    auto lockId = RequestWithRetry<TLockId>(
+        ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+        [this, &path, &mode, &options] (TMutationId& mutationId) {
+            return RawClient_->Lock(mutationId, TransactionId_, path, mode, options);
+        });
     return ::MakeIntrusive<TLock>(lockId, GetParentClientImpl(), options.Waitable_);
 }
 
@@ -955,7 +977,11 @@ void TTransaction::Unlock(
     const TYPath& path,
     const TUnlockOptions& options)
 {
-    NRawClient::Unlock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
+    RequestWithRetry<void>(
+        ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+        [this, &path, &options] (TMutationId& mutationId) {
+            RawClient_->Unlock(mutationId, TransactionId_, path, options);
+        });
 }
 
 void TTransaction::Commit()

+ 0 - 2
yt/cpp/mapreduce/client/client.h

@@ -332,8 +332,6 @@ protected:
     TClientPtr GetParentClientImpl() override;
 
 private:
-    const IRawClientPtr RawClient_;
-
     ITransactionPingerPtr TransactionPinger_;
     THolder<TPingableTransaction> PingableTx_;
     TClientPtr ParentClient_;

+ 0 - 1
yt/cpp/mapreduce/client/client_reader.cpp

@@ -68,7 +68,6 @@ TClientReader::TClientReader(
         Path_.Path(Snapshot(
             RawClient_,
             ClientRetryPolicy_,
-            Context_,
             ReadTransaction_->GetId(),
             path.Path_));
     }

+ 1 - 1
yt/cpp/mapreduce/client/file_reader.cpp

@@ -61,7 +61,7 @@ TStreamReaderBase::~TStreamReaderBase() = default;
 
 TYPath TStreamReaderBase::Snapshot(const TYPath& path)
 {
-    return NYT::Snapshot(RawClient_, ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path);
+    return NYT::Snapshot(RawClient_, ClientRetryPolicy_, ReadTransaction_->GetId(), path);
 }
 
 TString TStreamReaderBase::GetActiveRequestId() const

+ 4 - 5
yt/cpp/mapreduce/client/operation_preparer.cpp

@@ -559,12 +559,11 @@ TString TJobPreparer::PutFileToCypressCache(
         GetCachePath(),
         putFileToCacheOptions);
 
-    Remove(
+    RequestWithRetry<void>(
         OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
-        OperationPreparer_.GetContext(),
-        transactionId,
-        path,
-        TRemoveOptions().Force(true));
+        [this, &transactionId, &path] (TMutationId& mutationId) {
+            RawClient_->Remove(mutationId, transactionId, path, TRemoveOptions().Force(true));
+        });
 
     return cachePath;
 }

+ 0 - 1
yt/cpp/mapreduce/client/retry_transaction.cpp

@@ -1 +0,0 @@
-#include "retry_transaction.h"

+ 0 - 71
yt/cpp/mapreduce/client/retry_transaction.h

@@ -1,71 +0,0 @@
-#pragma once
-
-#include <yt/cpp/mapreduce/http/retry_request.h>
-
-#include <yt/cpp/mapreduce/client/client.h>
-
-#include <yt/cpp/mapreduce/common/wait_proxy.h>
-#include <yt/cpp/mapreduce/common/retry_lib.h>
-
-#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
-
-namespace NYT::NDetail {
-
-template <typename TResult>
-TResult RetryTransactionWithPolicy(
-    const TClientBasePtr& client,
-    std::function<TResult(ITransactionPtr)> func,
-    IRequestRetryPolicyPtr retryPolicy)
-{
-    if (!retryPolicy) {
-        retryPolicy = CreateDefaultRequestRetryPolicy(client->GetContext().Config);
-    }
-
-    while (true) {
-        try {
-            retryPolicy->NotifyNewAttempt();
-            auto transaction = client->StartTransaction(TStartTransactionOptions());
-            if constexpr (std::is_same<TResult, void>::value) {
-                func(transaction);
-                transaction->Commit();
-                return;
-            } else {
-                auto result = func(transaction);
-                transaction->Commit();
-                return result;
-            }
-        } catch (const TErrorResponse& e) {
-            YT_LOG_ERROR("Retry failed %v - %v",
-                e.GetError().GetMessage(),
-                retryPolicy->GetAttemptDescription());
-
-            if (!IsRetriable(e)) {
-                throw;
-            }
-
-            auto maybeRetryTimeout = retryPolicy->OnRetriableError(e);
-            if (maybeRetryTimeout) {
-                TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
-            } else {
-                throw;
-            }
-        } catch (const std::exception& e) {
-            YT_LOG_ERROR("Retry failed %v - %v",
-                e.what(),
-                retryPolicy->GetAttemptDescription());
-
-            if (!IsRetriable(e)) {
-                throw;
-            }
-
-            auto maybeRetryTimeout = retryPolicy->OnGenericError(e);
-            if (maybeRetryTimeout) {
-                TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
-            } else {
-                throw;
-            }
-        }
-    }
-}
-
-} // namespace NYT::NDetail

+ 11 - 2
yt/cpp/mapreduce/client/retryful_writer.h

@@ -4,18 +4,23 @@
 #include "transaction_pinger.h"
 
 #include <yt/cpp/mapreduce/common/retry_lib.h>
+
 #include <yt/cpp/mapreduce/http/http.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
 #include <yt/cpp/mapreduce/interface/common.h>
 #include <yt/cpp/mapreduce/interface/io.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
+
 #include <yt/cpp/mapreduce/io/helpers.h>
 
 #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
 
 #include <library/cpp/threading/blocking_queue/blocking_queue.h>
 
-#include <util/stream/output.h>
 #include <util/generic/buffer.h>
 #include <util/stream/buffer.h>
+#include <util/stream/output.h>
 #include <util/system/thread.h>
 #include <util/system/event.h>
 
@@ -67,7 +72,11 @@ public:
             WriteTransaction_.ConstructInPlace(rawClient, ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
             auto append = path.Append_.GetOrElse(false);
             auto lockMode = (append  ? LM_SHARED : LM_EXCLUSIVE);
-            NDetail::NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, WriteTransaction_->GetId(), path.Path_, lockMode);
+            NDetail::RequestWithRetry<void>(
+                ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+                [this, &path, &lockMode] (TMutationId& mutationId) {
+                    RawClient_->Lock(mutationId, WriteTransaction_->GetId(), path.Path_, lockMode);
+                });
         }
 
         EmptyBuffers_.Push(TBuffer(BufferSize_ * 2));

+ 4 - 6
yt/cpp/mapreduce/client/retryful_writer_v2.cpp

@@ -321,13 +321,11 @@ TRetryfulWriterV2::TRetryfulWriterV2(
         );
         auto append = path.Append_.GetOrElse(false);
         auto lockMode = (append  ? LM_SHARED : LM_EXCLUSIVE);
-        NDetail::NRawClient::Lock(
+        NDetail::RequestWithRetry<void>(
             clientRetryPolicy->CreatePolicyForGenericRequest(),
-            context,
-            WriteTransaction_->GetId(),
-            path.Path_,
-            lockMode
-        );
+            [this, &rawClient, &path, &lockMode] (TMutationId& mutationId) {
+                rawClient->Lock(mutationId, WriteTransaction_->GetId(), path.Path_, lockMode);
+            });
     }
 
     THeavyRequestRetrier::TParameters parameters = {

+ 8 - 6
yt/cpp/mapreduce/client/transaction.cpp

@@ -180,16 +180,18 @@ void TPingableTransaction::Stop(EStopAction action)
 TYPath Snapshot(
     const IRawClientPtr& rawClient,
     const IClientRetryPolicyPtr& clientRetryPolicy,
-    const TClientContext& context,
     const TTransactionId& transactionId,
     const TYPath& path)
 {
-    auto lockId = NDetail::NRawClient::Lock(
+    auto lockId = NDetail::RequestWithRetry<TLockId>(
         clientRetryPolicy->CreatePolicyForGenericRequest(),
-        context,
-        transactionId,
-        path,
-        ELockMode::LM_SNAPSHOT);
+        [&rawClient, &transactionId, &path] (TMutationId& mutationId) {
+            return rawClient->Lock(
+                mutationId,
+                transactionId,
+                path,
+                ELockMode::LM_SNAPSHOT);
+        });
 
     auto lockedNodeId = NDetail::RequestWithRetry<TNode>(
         clientRetryPolicy->CreatePolicyForGenericRequest(),

Some files were not shown because too many files changed in this diff