lock.cpp 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. #include "lock.h"
  2. #include "yt_poller.h"
  3. #include <yt/cpp/mapreduce/common/retry_lib.h>
  4. #include <yt/cpp/mapreduce/http/retry_request.h>
  5. #include <yt/cpp/mapreduce/interface/raw_batch_request.h>
  6. #include <util/string/builder.h>
  7. namespace NYT {
  8. namespace NDetail {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. class TLockPollerItem
  11. : public IYtPollerItem
  12. {
  13. public:
  14. TLockPollerItem(const TLockId& lockId, ::NThreading::TPromise<void> acquired)
  15. : LockStateYPath_("#" + GetGuidAsString(lockId) + "/@state")
  16. , Acquired_(acquired)
  17. { }
  18. void PrepareRequest(IRawBatchRequest* batchRequest) override
  19. {
  20. LockState_ = batchRequest->Get(TTransactionId(), LockStateYPath_, TGetOptions());
  21. }
  22. EStatus OnRequestExecuted() override
  23. {
  24. try {
  25. const auto& state = LockState_.GetValue().AsString();
  26. if (state == "acquired") {
  27. Acquired_.SetValue();
  28. return PollBreak;
  29. }
  30. } catch (const TErrorResponse& e) {
  31. if (!IsRetriable(e)) {
  32. Acquired_.SetException(std::current_exception());
  33. return PollBreak;
  34. }
  35. } catch (const std::exception& e) {
  36. if (!IsRetriable(e)) {
  37. Acquired_.SetException(std::current_exception());
  38. return PollBreak;
  39. }
  40. }
  41. return PollContinue;
  42. }
  43. void OnItemDiscarded() override
  44. {
  45. Acquired_.SetException(std::make_exception_ptr(yexception() << "Operation cancelled"));
  46. }
  47. private:
  48. const TString LockStateYPath_;
  49. ::NThreading::TPromise<void> Acquired_;
  50. ::NThreading::TFuture<TNode> LockState_;
  51. };
  52. ////////////////////////////////////////////////////////////////////////////////
  53. TLock::TLock(const TLockId& lockId, TClientPtr client, bool waitable)
  54. : LockId_(lockId)
  55. , Client_(std::move(client))
  56. {
  57. if (!waitable) {
  58. Acquired_ = ::NThreading::MakeFuture();
  59. }
  60. }
  61. const TLockId& TLock::GetId() const
  62. {
  63. return LockId_;
  64. }
  65. TNodeId TLock::GetLockedNodeId() const
  66. {
  67. auto nodeIdNode = Client_->Get(
  68. ::TStringBuilder() << '#' << GetGuidAsString(LockId_) << "/@node_id",
  69. TGetOptions());
  70. return GetGuid(nodeIdNode.AsString());
  71. }
  72. const ::NThreading::TFuture<void>& TLock::GetAcquiredFuture() const
  73. {
  74. if (!Acquired_) {
  75. auto promise = ::NThreading::NewPromise<void>();
  76. Client_->GetYtPoller().Watch(::MakeIntrusive<TLockPollerItem>(LockId_, promise));
  77. Acquired_ = promise.GetFuture();
  78. }
  79. return *Acquired_;
  80. }
  81. ////////////////////////////////////////////////////////////////////////////////
  82. } // namespace NDetail
  83. } // namespace NYT