// transaction.cpp (6.5 KB)
  1. #include "transaction.h"
  2. #include "transaction_pinger.h"
  3. #include <yt/cpp/mapreduce/common/retry_lib.h>
  4. #include <yt/cpp/mapreduce/common/retry_request.h>
  5. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  6. #include <yt/cpp/mapreduce/interface/config.h>
  7. #include <yt/cpp/mapreduce/interface/error_codes.h>
  8. #include <yt/cpp/mapreduce/interface/raw_client.h>
  9. #include <util/datetime/base.h>
  10. #include <util/generic/scope.h>
  11. #include <util/random/random.h>
  12. #include <util/string/builder.h>
  13. namespace NYT {
  14. ////////////////////////////////////////////////////////////////////////////////
  15. TPingableTransaction::TPingableTransaction(
  16. const IRawClientPtr& rawClient,
  17. const IClientRetryPolicyPtr& retryPolicy,
  18. const TClientContext& context,
  19. const TTransactionId& parentId,
  20. ITransactionPingerPtr transactionPinger,
  21. const TStartTransactionOptions& options)
  22. : RawClient_(rawClient)
  23. , ClientRetryPolicy_(retryPolicy)
  24. , Context_(context)
  25. , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
  26. , AbortOnTermination_(true)
  27. , AutoPingable_(options.AutoPingable_)
  28. , Pinger_(std::move(transactionPinger))
  29. {
  30. auto transactionId = NDetail::RequestWithRetry<TTransactionId>(
  31. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  32. [this, &parentId, &options] (TMutationId& mutationId) {
  33. return RawClient_->StartTransaction(mutationId, parentId, options);
  34. });
  35. auto actualTimeout = options.Timeout_.GetOrElse(Context_.Config->TxTimeout);
  36. Init(rawClient, context, transactionId, actualTimeout);
  37. }
  38. TPingableTransaction::TPingableTransaction(
  39. const IRawClientPtr& rawClient,
  40. const IClientRetryPolicyPtr& retryPolicy,
  41. const TClientContext& context,
  42. const TTransactionId& transactionId,
  43. ITransactionPingerPtr transactionPinger,
  44. const TAttachTransactionOptions& options)
  45. : RawClient_(rawClient)
  46. , ClientRetryPolicy_(retryPolicy)
  47. , Context_(context)
  48. , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
  49. , AbortOnTermination_(options.AbortOnTermination_)
  50. , AutoPingable_(options.AutoPingable_)
  51. , Pinger_(std::move(transactionPinger))
  52. {
  53. auto timeoutNode = NDetail::RequestWithRetry<TNode>(
  54. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  55. [this, &transactionId] (TMutationId /*mutationId*/) {
  56. return RawClient_->TryGet(
  57. TTransactionId(),
  58. "#" + GetGuidAsString(transactionId) + "/@timeout",
  59. TGetOptions());
  60. });
  61. if (timeoutNode.IsUndefined()) {
  62. throw yexception() << "Transaction " << GetGuidAsString(transactionId) << " does not exist";
  63. }
  64. auto timeout = TDuration::MilliSeconds(timeoutNode.AsInt64());
  65. Init(rawClient, context, transactionId, timeout);
  66. }
  67. void TPingableTransaction::Init(
  68. const IRawClientPtr& rawClient,
  69. const TClientContext& context,
  70. const TTransactionId& transactionId,
  71. TDuration timeout)
  72. {
  73. TransactionId_ = transactionId;
  74. if (AbortOnTermination_) {
  75. AbortableRegistry_->Add(
  76. TransactionId_,
  77. ::MakeIntrusive<NDetail::TTransactionAbortable>(rawClient, context, TransactionId_));
  78. }
  79. if (AutoPingable_) {
  80. // Compute 'MaxPingInterval_' and 'MinPingInterval_' such that 'pingInterval == (max + min) / 2'.
  81. auto pingInterval = context.Config->PingInterval;
  82. auto safeTimeout = timeout - TDuration::Seconds(5);
  83. MaxPingInterval_ = Max(pingInterval, Min(safeTimeout, pingInterval * 1.5));
  84. MinPingInterval_ = pingInterval - (MaxPingInterval_ - pingInterval);
  85. Pinger_->RegisterTransaction(*this);
  86. }
  87. }
  88. TPingableTransaction::~TPingableTransaction()
  89. {
  90. try {
  91. Stop(AbortOnTermination_ ? EStopAction::Abort : EStopAction::Detach);
  92. } catch (...) {
  93. }
  94. }
  95. const TTransactionId TPingableTransaction::GetId() const
  96. {
  97. return TransactionId_;
  98. }
  99. const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const
  100. {
  101. return {MinPingInterval_, MaxPingInterval_};
  102. }
  103. const TClientContext TPingableTransaction::GetContext() const
  104. {
  105. return Context_;
  106. }
  107. void TPingableTransaction::Ping() const
  108. {
  109. RawClient_->PingTransaction(TransactionId_);
  110. }
  111. void TPingableTransaction::Commit()
  112. {
  113. Stop(EStopAction::Commit);
  114. }
  115. void TPingableTransaction::Abort()
  116. {
  117. Stop(EStopAction::Abort);
  118. }
  119. void TPingableTransaction::Detach()
  120. {
  121. Stop(EStopAction::Detach);
  122. }
  123. void TPingableTransaction::Stop(EStopAction action)
  124. {
  125. if (Finalized_) {
  126. return;
  127. }
  128. Y_DEFER {
  129. Finalized_ = true;
  130. if (AutoPingable_ && Pinger_->HasTransaction(*this)) {
  131. Pinger_->RemoveTransaction(*this);
  132. }
  133. };
  134. switch (action) {
  135. case EStopAction::Commit:
  136. NDetail::RequestWithRetry<void>(
  137. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  138. [this] (TMutationId& mutationId) {
  139. RawClient_->CommitTransaction(mutationId, TransactionId_);
  140. });
  141. break;
  142. case EStopAction::Abort:
  143. NDetail::RequestWithRetry<void>(
  144. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  145. [this] (TMutationId& mutationId) {
  146. RawClient_->AbortTransaction(mutationId, TransactionId_);
  147. });
  148. break;
  149. case EStopAction::Detach:
  150. // Do nothing.
  151. break;
  152. }
  153. AbortableRegistry_->Remove(TransactionId_);
  154. }
  155. ////////////////////////////////////////////////////////////////////////////////
  156. TYPath Snapshot(
  157. const IRawClientPtr& rawClient,
  158. const IClientRetryPolicyPtr& clientRetryPolicy,
  159. const TTransactionId& transactionId,
  160. const TYPath& path)
  161. {
  162. auto lockId = NDetail::RequestWithRetry<TLockId>(
  163. clientRetryPolicy->CreatePolicyForGenericRequest(),
  164. [&rawClient, &transactionId, &path] (TMutationId& mutationId) {
  165. return rawClient->Lock(
  166. mutationId,
  167. transactionId,
  168. path,
  169. ELockMode::LM_SNAPSHOT);
  170. });
  171. auto lockedNodeId = NDetail::RequestWithRetry<TNode>(
  172. clientRetryPolicy->CreatePolicyForGenericRequest(),
  173. [&rawClient, &transactionId, &lockId] (TMutationId /*mutationId*/) {
  174. return rawClient->Get(
  175. transactionId,
  176. ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id");
  177. });
  178. return "#" + lockedNodeId.AsString();
  179. }
  180. ////////////////////////////////////////////////////////////////////////////////
  181. } // namespace NYT