batch_request_impl.cpp 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. #include "batch_request_impl.h"
  2. #include "lock.h"
  3. #include <yt/cpp/mapreduce/common/helpers.h>
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/http/retry_request.h>
  6. #include <yt/cpp/mapreduce/interface/config.h>
  7. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  8. #include <library/cpp/yson/node/node.h>
  9. #include <library/cpp/yson/node/serialize.h>
  10. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  11. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  12. #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h>
  13. #include <util/generic/guid.h>
  14. #include <util/string/builder.h>
  15. #include <exception>
  16. namespace NYT {
  17. namespace NDetail {
  18. using namespace NRawClient;
  19. using ::NThreading::TFuture;
  20. using ::NThreading::TPromise;
  21. using ::NThreading::NewPromise;
  22. ////////////////////////////////////////////////////////////////////
  23. TBatchRequest::TBatchRequest(const TTransactionId& defaultTransaction, ::TIntrusivePtr<TClient> client)
  24. : DefaultTransaction_(defaultTransaction)
  25. , Impl_(MakeIntrusive<TRawBatchRequest>(client->GetContext().Config))
  26. , Client_(client)
  27. { }
  28. TBatchRequest::TBatchRequest(TRawBatchRequest* impl, ::TIntrusivePtr<TClient> client)
  29. : Impl_(impl)
  30. , Client_(std::move(client))
  31. { }
  32. TBatchRequest::~TBatchRequest() = default;
  33. IBatchRequestBase& TBatchRequest::WithTransaction(const TTransactionId& transactionId)
  34. {
  35. if (!TmpWithTransaction_) {
  36. TmpWithTransaction_.Reset(new TBatchRequest(Impl_.Get(), Client_));
  37. }
  38. TmpWithTransaction_->DefaultTransaction_ = transactionId;
  39. return *TmpWithTransaction_;
  40. }
  41. TFuture<TNode> TBatchRequest::Get(
  42. const TYPath& path,
  43. const TGetOptions& options)
  44. {
  45. return Impl_->Get(DefaultTransaction_, path, options);
  46. }
  47. TFuture<void> TBatchRequest::Set(const TYPath& path, const TNode& node, const TSetOptions& options)
  48. {
  49. return Impl_->Set(DefaultTransaction_, path, node, options);
  50. }
  51. TFuture<TNode::TListType> TBatchRequest::List(const TYPath& path, const TListOptions& options)
  52. {
  53. return Impl_->List(DefaultTransaction_, path, options);
  54. }
  55. TFuture<bool> TBatchRequest::Exists(const TYPath& path, const TExistsOptions& options)
  56. {
  57. return Impl_->Exists(DefaultTransaction_, path, options);
  58. }
  59. TFuture<ILockPtr> TBatchRequest::Lock(
  60. const TYPath& path,
  61. ELockMode mode,
  62. const TLockOptions& options)
  63. {
  64. auto convert = [waitable=options.Waitable_, client=Client_] (TFuture<TNodeId> nodeIdFuture) -> ILockPtr {
  65. return ::MakeIntrusive<TLock>(nodeIdFuture.GetValue(), client, waitable);
  66. };
  67. return Impl_->Lock(DefaultTransaction_, path, mode, options).Apply(convert);
  68. }
  69. ::NThreading::TFuture<void> TBatchRequest::Unlock(
  70. const TYPath& path,
  71. const TUnlockOptions& options = TUnlockOptions())
  72. {
  73. return Impl_->Unlock(DefaultTransaction_, path, options);
  74. }
  75. TFuture<TLockId> TBatchRequest::Create(
  76. const TYPath& path,
  77. ENodeType type,
  78. const TCreateOptions& options)
  79. {
  80. return Impl_->Create(DefaultTransaction_, path, type, options);
  81. }
  82. TFuture<void> TBatchRequest::Remove(
  83. const TYPath& path,
  84. const TRemoveOptions& options)
  85. {
  86. return Impl_->Remove(DefaultTransaction_, path, options);
  87. }
  88. TFuture<TNodeId> TBatchRequest::Move(
  89. const TYPath& sourcePath,
  90. const TYPath& destinationPath,
  91. const TMoveOptions& options)
  92. {
  93. return Impl_->Move(DefaultTransaction_, sourcePath, destinationPath, options);
  94. }
  95. TFuture<TNodeId> TBatchRequest::Copy(
  96. const TYPath& sourcePath,
  97. const TYPath& destinationPath,
  98. const TCopyOptions& options)
  99. {
  100. return Impl_->Copy(DefaultTransaction_, sourcePath, destinationPath, options);
  101. }
  102. TFuture<TNodeId> TBatchRequest::Link(
  103. const TYPath& targetPath,
  104. const TYPath& linkPath,
  105. const TLinkOptions& options)
  106. {
  107. return Impl_->Link(DefaultTransaction_, targetPath, linkPath, options);
  108. }
  109. TFuture<void> TBatchRequest::AbortOperation(const NYT::TOperationId& operationId)
  110. {
  111. return Impl_->AbortOperation(operationId);
  112. }
  113. TFuture<void> TBatchRequest::CompleteOperation(const NYT::TOperationId& operationId)
  114. {
  115. return Impl_->CompleteOperation(operationId);
  116. }
  117. TFuture<void> TBatchRequest::SuspendOperation(
  118. const TOperationId& operationId,
  119. const TSuspendOperationOptions& options)
  120. {
  121. return Impl_->SuspendOperation(operationId, options);
  122. }
  123. TFuture<void> TBatchRequest::ResumeOperation(
  124. const TOperationId& operationId,
  125. const TResumeOperationOptions& options)
  126. {
  127. return Impl_->ResumeOperation(operationId, options);
  128. }
  129. TFuture<void> TBatchRequest::UpdateOperationParameters(
  130. const NYT::TOperationId& operationId,
  131. const NYT::TUpdateOperationParametersOptions& options)
  132. {
  133. return Impl_->UpdateOperationParameters(operationId, options);
  134. }
  135. TFuture<TRichYPath> TBatchRequest::CanonizeYPath(const TRichYPath& path)
  136. {
  137. return Impl_->CanonizeYPath(path);
  138. }
  139. TFuture<TVector<TTableColumnarStatistics>> TBatchRequest::GetTableColumnarStatistics(
  140. const TVector<TRichYPath>& paths,
  141. const NYT::TGetTableColumnarStatisticsOptions& options)
  142. {
  143. return Impl_->GetTableColumnarStatistics(DefaultTransaction_, paths, options);
  144. }
  145. TFuture<TCheckPermissionResponse> TBatchRequest::CheckPermission(
  146. const TString& user,
  147. EPermission permission,
  148. const TYPath& path,
  149. const TCheckPermissionOptions& options)
  150. {
  151. return Impl_->CheckPermission(user, permission, path, options);
  152. }
  153. void TBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
  154. {
  155. NYT::NDetail::ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), *Impl_, options);
  156. }
  157. ////////////////////////////////////////////////////////////////////////////////
  158. } // namespace NDetail
  159. } // namespace NYT