transaction.cpp 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. #include "transaction.h"
  2. #include "transaction_pinger.h"
  3. #include <yt/cpp/mapreduce/interface/config.h>
  4. #include <yt/cpp/mapreduce/interface/error_codes.h>
  5. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  6. #include <yt/cpp/mapreduce/common/retry_lib.h>
  7. #include <yt/cpp/mapreduce/http/requests.h>
  8. #include <yt/cpp/mapreduce/http/retry_request.h>
  9. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  10. #include <util/datetime/base.h>
  11. #include <util/generic/scope.h>
  12. #include <util/random/random.h>
  13. #include <util/string/builder.h>
  14. namespace NYT {
  15. ////////////////////////////////////////////////////////////////////////////////
  16. TPingableTransaction::TPingableTransaction(
  17. const IClientRetryPolicyPtr& retryPolicy,
  18. const TClientContext& context,
  19. const TTransactionId& parentId,
  20. ITransactionPingerPtr transactionPinger,
  21. const TStartTransactionOptions& options)
  22. : ClientRetryPolicy_(retryPolicy)
  23. , Context_(context)
  24. , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
  25. , AbortOnTermination_(true)
  26. , AutoPingable_(options.AutoPingable_)
  27. , Pinger_(std::move(transactionPinger))
  28. {
  29. auto transactionId = NDetail::NRawClient::StartTransaction(
  30. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  31. context,
  32. parentId,
  33. options);
  34. auto actualTimeout = options.Timeout_.GetOrElse(Context_.Config->TxTimeout);
  35. Init(context, transactionId, actualTimeout);
  36. }
  37. TPingableTransaction::TPingableTransaction(
  38. const IClientRetryPolicyPtr& retryPolicy,
  39. const TClientContext& context,
  40. const TTransactionId& transactionId,
  41. ITransactionPingerPtr transactionPinger,
  42. const TAttachTransactionOptions& options)
  43. : ClientRetryPolicy_(retryPolicy)
  44. , Context_(context)
  45. , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
  46. , AbortOnTermination_(options.AbortOnTermination_)
  47. , AutoPingable_(options.AutoPingable_)
  48. , Pinger_(std::move(transactionPinger))
  49. {
  50. auto timeoutNode = NDetail::NRawClient::TryGet(
  51. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  52. context,
  53. TTransactionId(),
  54. "#" + GetGuidAsString(transactionId) + "/@timeout",
  55. TGetOptions());
  56. if (timeoutNode.IsUndefined()) {
  57. throw yexception() << "Transaction " << GetGuidAsString(transactionId) << " does not exist";
  58. }
  59. auto timeout = TDuration::MilliSeconds(timeoutNode.AsInt64());
  60. Init(context, transactionId, timeout);
  61. }
  62. void TPingableTransaction::Init(
  63. const TClientContext& context,
  64. const TTransactionId& transactionId,
  65. TDuration timeout)
  66. {
  67. TransactionId_ = transactionId;
  68. if (AbortOnTermination_) {
  69. AbortableRegistry_->Add(
  70. TransactionId_,
  71. ::MakeIntrusive<NDetail::TTransactionAbortable>(context, TransactionId_));
  72. }
  73. if (AutoPingable_) {
  74. // Compute 'MaxPingInterval_' and 'MinPingInterval_' such that 'pingInterval == (max + min) / 2'.
  75. auto pingInterval = Context_.Config->PingInterval;
  76. auto safeTimeout = timeout - TDuration::Seconds(5);
  77. MaxPingInterval_ = Max(pingInterval, Min(safeTimeout, pingInterval * 1.5));
  78. MinPingInterval_ = pingInterval - (MaxPingInterval_ - pingInterval);
  79. Pinger_->RegisterTransaction(*this);
  80. }
  81. }
  82. TPingableTransaction::~TPingableTransaction()
  83. {
  84. try {
  85. Stop(AbortOnTermination_ ? EStopAction::Abort : EStopAction::Detach);
  86. } catch (...) {
  87. }
  88. }
  89. const TTransactionId TPingableTransaction::GetId() const
  90. {
  91. return TransactionId_;
  92. }
  93. const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const {
  94. return {MinPingInterval_, MaxPingInterval_};
  95. }
  96. const TClientContext TPingableTransaction::GetContext() const {
  97. return Context_;
  98. }
  99. void TPingableTransaction::Commit()
  100. {
  101. Stop(EStopAction::Commit);
  102. }
  103. void TPingableTransaction::Abort()
  104. {
  105. Stop(EStopAction::Abort);
  106. }
  107. void TPingableTransaction::Detach()
  108. {
  109. Stop(EStopAction::Detach);
  110. }
  111. void TPingableTransaction::Stop(EStopAction action)
  112. {
  113. if (Finalized_) {
  114. return;
  115. }
  116. Y_DEFER {
  117. Finalized_ = true;
  118. if (AutoPingable_ && Pinger_->HasTransaction(*this)) {
  119. Pinger_->RemoveTransaction(*this);
  120. }
  121. };
  122. switch (action) {
  123. case EStopAction::Commit:
  124. NDetail::NRawClient::CommitTransaction(
  125. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  126. Context_,
  127. TransactionId_);
  128. break;
  129. case EStopAction::Abort:
  130. NDetail::NRawClient::AbortTransaction(
  131. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  132. Context_,
  133. TransactionId_);
  134. break;
  135. case EStopAction::Detach:
  136. // Do nothing.
  137. break;
  138. }
  139. AbortableRegistry_->Remove(TransactionId_);
  140. }
  141. ////////////////////////////////////////////////////////////////////////////////
  142. TYPath Snapshot(
  143. const IClientRetryPolicyPtr& clientRetryPolicy,
  144. const TClientContext& context,
  145. const TTransactionId& transactionId,
  146. const TYPath& path)
  147. {
  148. auto lockId = NDetail::NRawClient::Lock(
  149. clientRetryPolicy->CreatePolicyForGenericRequest(),
  150. context,
  151. transactionId,
  152. path,
  153. ELockMode::LM_SNAPSHOT);
  154. auto lockedNodeId = NDetail::NRawClient::Get(
  155. clientRetryPolicy->CreatePolicyForGenericRequest(),
  156. context,
  157. transactionId,
  158. ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id");
  159. return "#" + lockedNodeId.AsString();
  160. }
  161. ////////////////////////////////////////////////////////////////////////////////
  162. } // namespace NYT