abortable_registry.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. #include "abortable_registry.h"
  2. #include <yt/cpp/mapreduce/common/retry_lib.h>
  3. #include <yt/cpp/mapreduce/interface/common.h>
  4. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  5. #include <util/generic/singleton.h>
  6. namespace NYT {
  7. namespace NDetail {
  8. using namespace NRawClient;
  9. ////////////////////////////////////////////////////////////////////////////////
  10. TTransactionAbortable::TTransactionAbortable(const TClientContext& context, const TTransactionId& transactionId)
  11. : Context_(context)
  12. , TransactionId_(transactionId)
  13. { }
  14. void TTransactionAbortable::Abort()
  15. {
  16. AbortTransaction(nullptr, Context_, TransactionId_);
  17. }
  18. TString TTransactionAbortable::GetType() const
  19. {
  20. return "transaction";
  21. }
  22. ////////////////////////////////////////////////////////////////////////////////
  23. TOperationAbortable::TOperationAbortable(IClientRetryPolicyPtr clientRetryPolicy, TClientContext context, const TOperationId& operationId)
  24. : ClientRetryPolicy_(std::move(clientRetryPolicy))
  25. , Context_(std::move(context))
  26. , OperationId_(operationId)
  27. { }
  28. void TOperationAbortable::Abort()
  29. {
  30. AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, OperationId_);
  31. }
  32. TString TOperationAbortable::GetType() const
  33. {
  34. return "operation";
  35. }
  36. ////////////////////////////////////////////////////////////////////////////////
  37. void TAbortableRegistry::AbortAllAndBlockForever()
  38. {
  39. auto guard = Guard(Lock_);
  40. for (const auto& entry : ActiveAbortables_) {
  41. const auto& id = entry.first;
  42. const auto& abortable = entry.second;
  43. try {
  44. abortable->Abort();
  45. } catch (std::exception& ex) {
  46. YT_LOG_ERROR("Exception while aborting %v %v: %v",
  47. abortable->GetType(),
  48. id,
  49. ex.what());
  50. }
  51. }
  52. Running_ = false;
  53. }
  54. void TAbortableRegistry::Add(const TGUID& id, IAbortablePtr abortable)
  55. {
  56. auto guard = Guard(Lock_);
  57. if (!Running_) {
  58. Sleep(TDuration::Max());
  59. }
  60. ActiveAbortables_[id] = abortable;
  61. }
  62. void TAbortableRegistry::Remove(const TGUID& id)
  63. {
  64. auto guard = Guard(Lock_);
  65. if (!Running_) {
  66. Sleep(TDuration::Max());
  67. }
  68. ActiveAbortables_.erase(id);
  69. }
  70. ////////////////////////////////////////////////////////////////////////////////
  71. namespace {
  72. class TRegistryHolder
  73. {
  74. public:
  75. TRegistryHolder()
  76. : Registry_(::MakeIntrusive<TAbortableRegistry>())
  77. { }
  78. ::TIntrusivePtr<TAbortableRegistry> Get()
  79. {
  80. return Registry_;
  81. }
  82. private:
  83. ::TIntrusivePtr<TAbortableRegistry> Registry_;
  84. };
  85. } // namespace
  86. ::TIntrusivePtr<TAbortableRegistry> TAbortableRegistry::Get()
  87. {
  88. return Singleton<TRegistryHolder>()->Get();
  89. }
  90. ////////////////////////////////////////////////////////////////////////////////
  91. } // namespace NDetail
  92. } // namespace NYT