transaction.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. #include "transaction.h"
  2. #include "transaction_pinger.h"
  3. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/http/requests.h>
  6. #include <yt/cpp/mapreduce/http/retry_request.h>
  7. #include <yt/cpp/mapreduce/interface/config.h>
  8. #include <yt/cpp/mapreduce/interface/error_codes.h>
  9. #include <yt/cpp/mapreduce/interface/raw_client.h>
  10. #include <util/datetime/base.h>
  11. #include <util/generic/scope.h>
  12. #include <util/random/random.h>
  13. #include <util/string/builder.h>
  14. namespace NYT {
  15. ////////////////////////////////////////////////////////////////////////////////
  16. TPingableTransaction::TPingableTransaction(
  17. const IRawClientPtr& rawClient,
  18. const IClientRetryPolicyPtr& retryPolicy,
  19. const TClientContext& context,
  20. const TTransactionId& parentId,
  21. ITransactionPingerPtr transactionPinger,
  22. const TStartTransactionOptions& options)
  23. : RawClient_(rawClient)
  24. , ClientRetryPolicy_(retryPolicy)
  25. , Context_(context)
  26. , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
  27. , AbortOnTermination_(true)
  28. , AutoPingable_(options.AutoPingable_)
  29. , Pinger_(std::move(transactionPinger))
  30. {
  31. auto transactionId = NDetail::RequestWithRetry<TTransactionId>(
  32. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  33. [this, &parentId, &options] (TMutationId& mutationId) {
  34. return RawClient_->StartTransaction(mutationId, parentId, options);
  35. });
  36. auto actualTimeout = options.Timeout_.GetOrElse(Context_.Config->TxTimeout);
  37. Init(rawClient, context, transactionId, actualTimeout);
  38. }
  39. TPingableTransaction::TPingableTransaction(
  40. const IRawClientPtr& rawClient,
  41. const IClientRetryPolicyPtr& retryPolicy,
  42. const TClientContext& context,
  43. const TTransactionId& transactionId,
  44. ITransactionPingerPtr transactionPinger,
  45. const TAttachTransactionOptions& options)
  46. : RawClient_(rawClient)
  47. , ClientRetryPolicy_(retryPolicy)
  48. , Context_(context)
  49. , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
  50. , AbortOnTermination_(options.AbortOnTermination_)
  51. , AutoPingable_(options.AutoPingable_)
  52. , Pinger_(std::move(transactionPinger))
  53. {
  54. auto timeoutNode = NDetail::RequestWithRetry<TNode>(
  55. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  56. [this, &transactionId] (TMutationId /*mutationId*/) {
  57. return RawClient_->TryGet(
  58. TTransactionId(),
  59. "#" + GetGuidAsString(transactionId) + "/@timeout",
  60. TGetOptions());
  61. });
  62. if (timeoutNode.IsUndefined()) {
  63. throw yexception() << "Transaction " << GetGuidAsString(transactionId) << " does not exist";
  64. }
  65. auto timeout = TDuration::MilliSeconds(timeoutNode.AsInt64());
  66. Init(rawClient, context, transactionId, timeout);
  67. }
  68. void TPingableTransaction::Init(
  69. const IRawClientPtr& rawClient,
  70. const TClientContext& context,
  71. const TTransactionId& transactionId,
  72. TDuration timeout)
  73. {
  74. TransactionId_ = transactionId;
  75. if (AbortOnTermination_) {
  76. AbortableRegistry_->Add(
  77. TransactionId_,
  78. ::MakeIntrusive<NDetail::TTransactionAbortable>(rawClient, context, TransactionId_));
  79. }
  80. if (AutoPingable_) {
  81. // Compute 'MaxPingInterval_' and 'MinPingInterval_' such that 'pingInterval == (max + min) / 2'.
  82. auto pingInterval = context.Config->PingInterval;
  83. auto safeTimeout = timeout - TDuration::Seconds(5);
  84. MaxPingInterval_ = Max(pingInterval, Min(safeTimeout, pingInterval * 1.5));
  85. MinPingInterval_ = pingInterval - (MaxPingInterval_ - pingInterval);
  86. Pinger_->RegisterTransaction(*this);
  87. }
  88. }
  89. TPingableTransaction::~TPingableTransaction()
  90. {
  91. try {
  92. Stop(AbortOnTermination_ ? EStopAction::Abort : EStopAction::Detach);
  93. } catch (...) {
  94. }
  95. }
  96. const TTransactionId TPingableTransaction::GetId() const
  97. {
  98. return TransactionId_;
  99. }
  100. const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const
  101. {
  102. return {MinPingInterval_, MaxPingInterval_};
  103. }
  104. const TClientContext TPingableTransaction::GetContext() const
  105. {
  106. return Context_;
  107. }
  108. void TPingableTransaction::Ping() const
  109. {
  110. RawClient_->PingTransaction(TransactionId_);
  111. }
  112. void TPingableTransaction::Commit()
  113. {
  114. Stop(EStopAction::Commit);
  115. }
  116. void TPingableTransaction::Abort()
  117. {
  118. Stop(EStopAction::Abort);
  119. }
  120. void TPingableTransaction::Detach()
  121. {
  122. Stop(EStopAction::Detach);
  123. }
  124. void TPingableTransaction::Stop(EStopAction action)
  125. {
  126. if (Finalized_) {
  127. return;
  128. }
  129. Y_DEFER {
  130. Finalized_ = true;
  131. if (AutoPingable_ && Pinger_->HasTransaction(*this)) {
  132. Pinger_->RemoveTransaction(*this);
  133. }
  134. };
  135. switch (action) {
  136. case EStopAction::Commit:
  137. NDetail::RequestWithRetry<void>(
  138. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  139. [this] (TMutationId& mutationId) {
  140. RawClient_->CommitTransaction(mutationId, TransactionId_);
  141. });
  142. break;
  143. case EStopAction::Abort:
  144. NDetail::RequestWithRetry<void>(
  145. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  146. [this] (TMutationId& mutationId) {
  147. RawClient_->AbortTransaction(mutationId, TransactionId_);
  148. });
  149. break;
  150. case EStopAction::Detach:
  151. // Do nothing.
  152. break;
  153. }
  154. AbortableRegistry_->Remove(TransactionId_);
  155. }
  156. ////////////////////////////////////////////////////////////////////////////////
  157. TYPath Snapshot(
  158. const IRawClientPtr& rawClient,
  159. const IClientRetryPolicyPtr& clientRetryPolicy,
  160. const TTransactionId& transactionId,
  161. const TYPath& path)
  162. {
  163. auto lockId = NDetail::RequestWithRetry<TLockId>(
  164. clientRetryPolicy->CreatePolicyForGenericRequest(),
  165. [&rawClient, &transactionId, &path] (TMutationId& mutationId) {
  166. return rawClient->Lock(
  167. mutationId,
  168. transactionId,
  169. path,
  170. ELockMode::LM_SNAPSHOT);
  171. });
  172. auto lockedNodeId = NDetail::RequestWithRetry<TNode>(
  173. clientRetryPolicy->CreatePolicyForGenericRequest(),
  174. [&rawClient, &transactionId, &lockId] (TMutationId /*mutationId*/) {
  175. return rawClient->Get(
  176. transactionId,
  177. ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id");
  178. });
  179. return "#" + lockedNodeId.AsString();
  180. }
  181. ////////////////////////////////////////////////////////////////////////////////
  182. } // namespace NYT