Browse Source

YT-18503: Mirror Cypress Tx to Sequoia Ground
e6d585180289325f8082f42f85a60478194ba266

kvk1920 11 months ago
parent
commit
72eeab5172

+ 2 - 1
yt/cpp/mapreduce/common/retry_lib.cpp

@@ -220,10 +220,11 @@ static TMaybe<TDuration> TryGetBackoffDuration(const TErrorResponse& errorRespon
         // chunk client errors
         return config->ChunkErrorsRetryInterval;
     }
-    for (auto code : TVector<int>{
+    for (auto code : {
         NRpc::TransportError,
         NRpc::Unavailable,
         NApi::RetriableArchiveError,
+        NSequoiaClient::SequoiaRetriableError,
         Canceled,
     }) {
         if (allCodes.contains(code)) {

+ 13 - 0
yt/cpp/mapreduce/interface/error_codes.h

@@ -465,5 +465,18 @@ namespace NJobProberClient {
 
 } // namespace NJobProberClient
 
+
+
+// from ./client/sequoia_client/public.h
+namespace NSequoiaClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+    constexpr int SequoiaRetriableError = 6002; // mirrors NSequoiaClient::EErrorCode::SequoiaRetriableError
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NSequoiaClient
+
 } // namespace NClusterErrorCodes
 } // namespace NYT

+ 4 - 0
yt/python/yt/common.py

@@ -359,6 +359,10 @@ class YtError(Exception):
         """Cross-cell "copy"/"move" command is explicitly disabled"""
         return self.contains_code(1002)
 
+    def is_sequoia_retriable_error(self):
+        """Sequoia retriable error (code 6002); probably a lock conflict in Sequoia tables"""
+        return self.contains_code(6002)
+
 
 class YtResponseError(YtError):
     """Represents an error in YT response."""

+ 7 - 0
yt/yt/client/api/rpc_proxy/client_base.cpp

@@ -141,6 +141,13 @@ TFuture<ITransactionPtr> TClientBase::StartTransaction(
         ToProto(req->mutable_parent_id(), options.ParentId);
     }
     ToProto(req->mutable_prerequisite_transaction_ids(), options.PrerequisiteTransactionIds);
+
+    if (options.ReplicateToMasterCellTags) {
+        ToProto(
+            req->mutable_replicate_to_master_cell_tags()->mutable_cell_tags(),
+            *options.ReplicateToMasterCellTags);
+    }
+
     // XXX(sandello): Better? Remove these fields from the protocol at all?
     // COMPAT(kiselyovp): remove auto_abort from the protocol
     req->set_auto_abort(false);

+ 12 - 13
yt/yt/client/api/rpc_proxy/client_impl.cpp

@@ -68,7 +68,7 @@ TClient::TClient(
     TConnectionPtr connection,
     const TClientOptions& clientOptions)
     : Connection_(std::move(connection))
-    , RetryingChannel_(MaybeCreateRetryingChannel(
+    , RetryingChannel_(CreateSequoiaAwareRetryingChannel(
         CreateCredentialsInjectingChannel(
             Connection_->CreateChannel(false),
             clientOptions),
@@ -103,19 +103,18 @@ void TClient::Terminate()
 
 ////////////////////////////////////////////////////////////////////////////////
 
-IChannelPtr TClient::MaybeCreateRetryingChannel(NRpc::IChannelPtr channel, bool retryProxyBanned) const
+IChannelPtr TClient::CreateSequoiaAwareRetryingChannel(NRpc::IChannelPtr channel, bool retryProxyBanned) const
 {
     const auto& config = Connection_->GetConfig();
-    if (config->EnableRetries) {
-        return NRpc::CreateRetryingChannel(
-            config->RetryingChannel,
-            std::move(channel),
-            BIND([=] (const TError& error) {
-                return IsRetriableError(error, retryProxyBanned);
-            }));
-    } else {
-        return channel;
-    }
+    bool retrySequoiaErrorsOnly = !config->EnableRetries;
+    // NB: Even when client retries are disabled (!EnableRetries), transient Sequoia
+    // failures are still retried; all other errors are not. See IsRetriableError().
+    return NRpc::CreateRetryingChannel(
+        config->RetryingChannel,
+        std::move(channel),
+        BIND([=] (const TError& error) {
+            return IsRetriableError(error, retryProxyBanned, retrySequoiaErrorsOnly);
+        }));
 }
 
 IChannelPtr TClient::CreateNonRetryingChannelByAddress(const TString& address) const
@@ -153,7 +152,7 @@ IChannelPtr TClient::CreateNonRetryingStickyChannel() const
 
 IChannelPtr TClient::WrapStickyChannelIntoRetrying(IChannelPtr underlying) const
 {
-    return MaybeCreateRetryingChannel(
+    return CreateSequoiaAwareRetryingChannel( // NB: Sequoia transient errors are retried even if EnableRetries is off.
         std::move(underlying),
         /*retryProxyBanned*/ false);
 }

+ 1 - 1
yt/yt/client/api/rpc_proxy/client_impl.h

@@ -548,7 +548,7 @@ private:
 
     NTransactionClient::ITimestampProviderPtr CreateTimestampProvider() const;
 
-    NRpc::IChannelPtr MaybeCreateRetryingChannel(NRpc::IChannelPtr channel, bool retryProxyBanned) const;
+    NRpc::IChannelPtr CreateSequoiaAwareRetryingChannel(NRpc::IChannelPtr channel, bool retryProxyBanned) const;
     // Returns an RPC channel to use for API calls to the particular address (e.g.: AttachTransaction).
     // The channel is non-retrying, so should be wrapped into retrying channel on demand.
     NRpc::IChannelPtr CreateNonRetryingChannelByAddress(const TString& address) const;

+ 15 - 1
yt/yt/client/api/rpc_proxy/helpers.cpp

@@ -3,6 +3,8 @@
 #include <yt/yt/client/api/rowset.h>
 #include <yt/yt/client/api/table_client.h>
 
+#include <yt/yt/client/sequoia_client/public.h>
+
 #include <yt/yt/client/table_client/columnar_statistics.h>
 #include <yt/yt/client/table_client/column_sort_schema.h>
 #include <yt/yt/client/table_client/logical_type.h>
@@ -1888,8 +1890,20 @@ bool IsDynamicTableRetriableError(const TError& error)
         error.FindMatching(NTabletClient::EErrorCode::NoSuchTablet);
 }
 
-bool IsRetriableError(const TError& error, bool retryProxyBanned)
+bool IsRetriableError(const TError& error, bool retryProxyBanned, bool retrySequoiaErrorsOnly)
 {
+    // For now, transient Sequoia failures are always retriable, even if the
+    // client's retries are disabled.
+    // TODO(kvk1920): consider adding a separate "EnableSequoiaRetries" flag
+    // for this.
+    if (error.FindMatching(NSequoiaClient::EErrorCode::SequoiaRetriableError)) {
+        return true;
+    }
+
+    if (retrySequoiaErrorsOnly) {
+        return false;
+    }
+
     if (error.FindMatching(NRpcProxy::EErrorCode::ProxyBanned) ||
         error.FindMatching(NRpc::EErrorCode::PeerBanned))
     {

+ 4 - 1
yt/yt/client/api/rpc_proxy/helpers.h

@@ -275,7 +275,10 @@ NQueryTrackerClient::EQueryState ConvertQueryStateFromProto(
 
 ////////////////////////////////////////////////////////////////////////////////
 
-bool IsRetriableError(const TError& error, bool retryProxyBanned = true);
+bool IsRetriableError(
+    const TError& error,
+    bool retryProxyBanned = true,
+    bool retrySequoiaErrorsOnly = false); // If true, only SequoiaRetriableError is considered retriable.
 
 ////////////////////////////////////////////////////////////////////////////////
 

+ 17 - 0
yt/yt/client/sequoia_client/public.h

@@ -0,0 +1,17 @@
+#pragma once
+
+#include <yt/yt/core/misc/error_code.h>
+
+namespace NYT::NSequoiaClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+YT_DEFINE_ERROR_ENUM(
+    ((SequoiaClientNotReady)          (6000))
+    ((SequoiaTableCorrupted)          (6001))
+    ((SequoiaRetriableError)          (6002)) // Transient failure; NRpcProxy IsRetriableError() always reports it as retriable.
+);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NSequoiaClient

+ 7 - 0
yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto

@@ -366,6 +366,11 @@ message TRowsetStatistics
 
 message TReqStartTransaction
 {
+    message TReplicateToMasterCellTags
+    {
+        repeated int32 cell_tags = 1; // Filled from options.ReplicateToMasterCellTags in TClientBase::StartTransaction.
+    }
+
     required ETransactionType type = 1;
 
     optional int64 timeout = 2;
@@ -393,6 +398,8 @@ message TReqStartTransaction
 
     optional uint64 start_timestamp = 14;
 
+    optional TReplicateToMasterCellTags replicate_to_master_cell_tags = 15;
+
     optional TMutatingOptions mutating_options = 103;
 }