lock.cpp 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. #include "lock.h"
  2. #include "yt_poller.h"
  3. #include <yt/cpp/mapreduce/http/retry_request.h>
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  6. #include <util/string/builder.h>
  7. namespace NYT {
  8. namespace NDetail {
  9. using namespace NRawClient;
  10. ////////////////////////////////////////////////////////////////////////////////
  11. class TLockPollerItem
  12. : public IYtPollerItem
  13. {
  14. public:
  15. TLockPollerItem(const TLockId& lockId, ::NThreading::TPromise<void> acquired)
  16. : LockStateYPath_("#" + GetGuidAsString(lockId) + "/@state")
  17. , Acquired_(acquired)
  18. { }
  19. void PrepareRequest(TRawBatchRequest* batchRequest) override
  20. {
  21. LockState_ = batchRequest->Get(TTransactionId(), LockStateYPath_, TGetOptions());
  22. }
  23. EStatus OnRequestExecuted() override
  24. {
  25. try {
  26. const auto& state = LockState_.GetValue().AsString();
  27. if (state == "acquired") {
  28. Acquired_.SetValue();
  29. return PollBreak;
  30. }
  31. } catch (const TErrorResponse& e) {
  32. if (!IsRetriable(e)) {
  33. Acquired_.SetException(std::current_exception());
  34. return PollBreak;
  35. }
  36. } catch (const std::exception& e) {
  37. if (!IsRetriable(e)) {
  38. Acquired_.SetException(std::current_exception());
  39. return PollBreak;
  40. }
  41. }
  42. return PollContinue;
  43. }
  44. void OnItemDiscarded() override
  45. {
  46. Acquired_.SetException(std::make_exception_ptr(yexception() << "Operation cancelled"));
  47. }
  48. private:
  49. const TString LockStateYPath_;
  50. ::NThreading::TPromise<void> Acquired_;
  51. ::NThreading::TFuture<TNode> LockState_;
  52. };
  53. ////////////////////////////////////////////////////////////////////////////////
  54. TLock::TLock(const TLockId& lockId, TClientPtr client, bool waitable)
  55. : LockId_(lockId)
  56. , Client_(std::move(client))
  57. {
  58. if (!waitable) {
  59. Acquired_ = ::NThreading::MakeFuture();
  60. }
  61. }
  62. const TLockId& TLock::GetId() const
  63. {
  64. return LockId_;
  65. }
  66. TNodeId TLock::GetLockedNodeId() const
  67. {
  68. auto nodeIdNode = Client_->Get(
  69. ::TStringBuilder() << '#' << GetGuidAsString(LockId_) << "/@node_id",
  70. TGetOptions());
  71. return GetGuid(nodeIdNode.AsString());
  72. }
  73. const ::NThreading::TFuture<void>& TLock::GetAcquiredFuture() const
  74. {
  75. if (!Acquired_) {
  76. auto promise = ::NThreading::NewPromise<void>();
  77. Client_->GetYtPoller().Watch(::MakeIntrusive<TLockPollerItem>(LockId_, promise));
  78. Acquired_ = promise.GetFuture();
  79. }
  80. return *Acquired_;
  81. }
  82. ////////////////////////////////////////////////////////////////////////////////
  83. } // namespace NDetail
  84. } // namespace NYT