abortable_registry.cpp 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. #include "abortable_registry.h"
  2. #include <yt/cpp/mapreduce/common/retry_lib.h>
  3. #include <yt/cpp/mapreduce/common/retry_request.h>
  4. #include <yt/cpp/mapreduce/interface/common.h>
  5. #include <yt/cpp/mapreduce/interface/raw_client.h>
  6. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  7. #include <util/generic/singleton.h>
  8. namespace NYT::NDetail {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. TTransactionAbortable::TTransactionAbortable(
  11. const IRawClientPtr& rawClient,
  12. const TClientContext& context,
  13. const TTransactionId& transactionId)
  14. : RawClient_(rawClient)
  15. , Context_(context)
  16. , TransactionId_(transactionId)
  17. { }
  18. void TTransactionAbortable::Abort()
  19. {
  20. RequestWithRetry<void>(
  21. CreateDefaultRequestRetryPolicy(Context_.Config),
  22. [this] (TMutationId& mutationId) {
  23. RawClient_->AbortTransaction(mutationId, TransactionId_);
  24. });
  25. }
  26. TString TTransactionAbortable::GetType() const
  27. {
  28. return "transaction";
  29. }
  30. ////////////////////////////////////////////////////////////////////////////////
  31. TOperationAbortable::TOperationAbortable(
  32. IRawClientPtr rawClient,
  33. IClientRetryPolicyPtr clientRetryPolicy,
  34. const TOperationId& operationId)
  35. : RawClient_(std::move(rawClient))
  36. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  37. , OperationId_(operationId)
  38. { }
  39. void TOperationAbortable::Abort()
  40. {
  41. RequestWithRetry<void>(
  42. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  43. [this] (TMutationId& mutationId) {
  44. RawClient_->AbortOperation(mutationId, OperationId_);
  45. });
  46. }
  47. TString TOperationAbortable::GetType() const
  48. {
  49. return "operation";
  50. }
  51. ////////////////////////////////////////////////////////////////////////////////
  52. void TAbortableRegistry::AbortAllAndBlockForever()
  53. {
  54. auto guard = Guard(Lock_);
  55. for (const auto& entry : ActiveAbortables_) {
  56. const auto& id = entry.first;
  57. const auto& abortable = entry.second;
  58. try {
  59. abortable->Abort();
  60. } catch (std::exception& ex) {
  61. YT_LOG_ERROR("Exception while aborting %v %v: %v",
  62. abortable->GetType(),
  63. id,
  64. ex.what());
  65. }
  66. }
  67. Running_ = false;
  68. }
  69. void TAbortableRegistry::Add(const TGUID& id, IAbortablePtr abortable)
  70. {
  71. auto guard = Guard(Lock_);
  72. if (!Running_) {
  73. Sleep(TDuration::Max());
  74. }
  75. ActiveAbortables_[id] = abortable;
  76. }
  77. void TAbortableRegistry::Remove(const TGUID& id)
  78. {
  79. auto guard = Guard(Lock_);
  80. if (!Running_) {
  81. Sleep(TDuration::Max());
  82. }
  83. ActiveAbortables_.erase(id);
  84. }
  85. ////////////////////////////////////////////////////////////////////////////////
  86. namespace {
  87. class TRegistryHolder
  88. {
  89. public:
  90. TRegistryHolder()
  91. : Registry_(::MakeIntrusive<TAbortableRegistry>())
  92. { }
  93. ::TIntrusivePtr<TAbortableRegistry> Get()
  94. {
  95. return Registry_;
  96. }
  97. private:
  98. ::TIntrusivePtr<TAbortableRegistry> Registry_;
  99. };
  100. } // namespace
  101. ::TIntrusivePtr<TAbortableRegistry> TAbortableRegistry::Get()
  102. {
  103. return Singleton<TRegistryHolder>()->Get();
  104. }
  105. ////////////////////////////////////////////////////////////////////////////////
  106. } // namespace NYT::NDetail