raw_batch_request.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  1. #include "raw_batch_request.h"
  2. #include "raw_requests.h"
  3. #include "rpc_parameters_serialization.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  7. #include <yt/cpp/mapreduce/interface/client.h>
  8. #include <yt/cpp/mapreduce/interface/errors.h>
  9. #include <yt/cpp/mapreduce/interface/serialize.h>
  10. #include <library/cpp/yson/node/node.h>
  11. #include <yt/cpp/mapreduce/http/context.h>
  12. #include <yt/cpp/mapreduce/http/retry_request.h>
  13. #include <util/generic/guid.h>
  14. #include <util/string/builder.h>
  15. #include <exception>
  16. namespace NYT::NDetail::NRawClient {
  17. using NThreading::TFuture;
  18. using NThreading::TPromise;
  19. using NThreading::NewPromise;
  20. ////////////////////////////////////////////////////////////////////
  21. static TString RequestInfo(const TNode& request)
  22. {
  23. return ::TStringBuilder()
  24. << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]);
  25. }
  26. static void EnsureNothing(const TMaybe<TNode>& node)
  27. {
  28. Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType());
  29. }
  30. static void EnsureSomething(const TMaybe<TNode>& node)
  31. {
  32. Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response.");
  33. }
  34. static void EnsureType(const TNode& node, TNode::EType type)
  35. {
  36. Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. "
  37. << "Expected: " << type << ", actual: " << node.GetType());
  38. }
  39. static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
  40. {
  41. Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response.");
  42. EnsureType(*node, type);
  43. }
  44. ////////////////////////////////////////////////////////////////////
  45. template <typename TReturnType>
  46. class TResponseParserBase
  47. : public TRawBatchRequest::IResponseItemParser
  48. {
  49. public:
  50. using TFutureResult = TFuture<TReturnType>;
  51. public:
  52. TResponseParserBase()
  53. : Result_(NewPromise<TReturnType>())
  54. { }
  55. void SetException(std::exception_ptr e) override
  56. {
  57. Result_.SetException(std::move(e));
  58. }
  59. TFuture<TReturnType> GetFuture()
  60. {
  61. return Result_.GetFuture();
  62. }
  63. protected:
  64. TPromise<TReturnType> Result_;
  65. };
  66. ////////////////////////////////////////////////////////////////////
  67. class TGetResponseParser
  68. : public TResponseParserBase<TNode>
  69. {
  70. public:
  71. void SetResponse(TMaybe<TNode> node) override
  72. {
  73. EnsureSomething(node);
  74. Result_.SetValue(std::move(*node));
  75. }
  76. };
  77. ////////////////////////////////////////////////////////////////////
  78. class TVoidResponseParser
  79. : public TResponseParserBase<void>
  80. {
  81. public:
  82. void SetResponse(TMaybe<TNode> node) override
  83. {
  84. EnsureNothing(node);
  85. Result_.SetValue();
  86. }
  87. };
  88. ////////////////////////////////////////////////////////////////////
  89. class TListResponseParser
  90. : public TResponseParserBase<TNode::TListType>
  91. {
  92. public:
  93. void SetResponse(TMaybe<TNode> node) override
  94. {
  95. EnsureType(node, TNode::List);
  96. Result_.SetValue(std::move(node->AsList()));
  97. }
  98. };
  99. ////////////////////////////////////////////////////////////////////
  100. class TExistsResponseParser
  101. : public TResponseParserBase<bool>
  102. {
  103. public:
  104. void SetResponse(TMaybe<TNode> node) override
  105. {
  106. EnsureType(node, TNode::Bool);
  107. Result_.SetValue(std::move(node->AsBool()));
  108. }
  109. };
  110. ////////////////////////////////////////////////////////////////////
  111. class TGuidResponseParser
  112. : public TResponseParserBase<TGUID>
  113. {
  114. public:
  115. void SetResponse(TMaybe<TNode> node) override
  116. {
  117. EnsureType(node, TNode::String);
  118. Result_.SetValue(GetGuid(node->AsString()));
  119. }
  120. };
  121. ////////////////////////////////////////////////////////////////////
  122. class TCanonizeYPathResponseParser
  123. : public TResponseParserBase<TRichYPath>
  124. {
  125. public:
  126. explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original)
  127. : OriginalNode_(PathToNode(original))
  128. , PathPrefix_(std::move(pathPrefix))
  129. { }
  130. void SetResponse(TMaybe<TNode> node) override
  131. {
  132. EnsureType(node, TNode::String);
  133. for (const auto& item : OriginalNode_.GetAttributes().AsMap()) {
  134. node->Attributes()[item.first] = item.second;
  135. }
  136. TRichYPath result;
  137. Deserialize(result, *node);
  138. result.Path_ = AddPathPrefix(result.Path_, PathPrefix_);
  139. Result_.SetValue(result);
  140. }
  141. private:
  142. TNode OriginalNode_;
  143. TString PathPrefix_;
  144. };
  145. ////////////////////////////////////////////////////////////////////
  146. class TGetOperationResponseParser
  147. : public TResponseParserBase<TOperationAttributes>
  148. {
  149. public:
  150. void SetResponse(TMaybe<TNode> node) override
  151. {
  152. EnsureType(node, TNode::Map);
  153. Result_.SetValue(ParseOperationAttributes(*node));
  154. }
  155. };
  156. ////////////////////////////////////////////////////////////////////
  157. class TTableColumnarStatisticsParser
  158. : public TResponseParserBase<TVector<TTableColumnarStatistics>>
  159. {
  160. public:
  161. void SetResponse(TMaybe<TNode> node) override
  162. {
  163. EnsureType(node, TNode::Map);
  164. TVector<TTableColumnarStatistics> statistics;
  165. Deserialize(statistics, *node);
  166. Result_.SetValue(std::move(statistics));
  167. }
  168. };
  169. ////////////////////////////////////////////////////////////////////
  170. class TTablePartitionsParser
  171. : public TResponseParserBase<TMultiTablePartitions>
  172. {
  173. public:
  174. void SetResponse(TMaybe<TNode> node) override
  175. {
  176. EnsureType(node, TNode::Map);
  177. TMultiTablePartitions partitions;
  178. Deserialize(partitions, *node);
  179. Result_.SetValue(std::move(partitions));
  180. }
  181. };
  182. ////////////////////////////////////////////////////////////////////////////////
  183. class TGetFileFromCacheParser
  184. : public TResponseParserBase<TMaybe<TYPath>>
  185. {
  186. public:
  187. void SetResponse(TMaybe<TNode> node) override
  188. {
  189. EnsureType(node, TNode::String);
  190. if (node->AsString().empty()) {
  191. Result_.SetValue(Nothing());
  192. } else {
  193. Result_.SetValue(node->AsString());
  194. }
  195. }
  196. };
  197. ////////////////////////////////////////////////////////////////////
  198. class TYPathParser
  199. : public TResponseParserBase<TYPath>
  200. {
  201. public:
  202. void SetResponse(TMaybe<TNode> node) override
  203. {
  204. EnsureType(node, TNode::String);
  205. Result_.SetValue(node->AsString());
  206. }
  207. };
  208. ////////////////////////////////////////////////////////////////////
  209. class TCheckPermissionParser
  210. : public TResponseParserBase<TCheckPermissionResponse>
  211. {
  212. public:
  213. void SetResponse(TMaybe<TNode> node) override
  214. {
  215. EnsureType(node, TNode::Map);
  216. Result_.SetValue(ParseCheckPermissionResponse(*node));
  217. }
  218. };
  219. ////////////////////////////////////////////////////////////////////
  220. TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser)
  221. : Parameters(std::move(parameters))
  222. , ResponseParser(std::move(responseParser))
  223. , NextTry()
  224. { }
  225. TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry)
  226. : Parameters(batchItem.Parameters)
  227. , ResponseParser(batchItem.ResponseParser)
  228. , NextTry(nextTry)
  229. { }
  230. ////////////////////////////////////////////////////////////////////
  231. TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config)
  232. : Config_(config)
  233. { }
  234. TRawBatchRequest::~TRawBatchRequest() = default;
  235. bool TRawBatchRequest::IsExecuted() const
  236. {
  237. return Executed_;
  238. }
  239. void TRawBatchRequest::MarkExecuted()
  240. {
  241. Executed_ = true;
  242. }
  243. template <typename TResponseParser>
  244. typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
  245. const TString& command,
  246. TNode parameters,
  247. TMaybe<TNode> input)
  248. {
  249. return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>());
  250. }
  251. template <typename TResponseParser>
  252. typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
  253. const TString& command,
  254. TNode parameters,
  255. TMaybe<TNode> input,
  256. ::TIntrusivePtr<TResponseParser> parser)
  257. {
  258. Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
  259. TNode request;
  260. request["command"] = command;
  261. request["parameters"] = std::move(parameters);
  262. if (input) {
  263. request["input"] = std::move(*input);
  264. }
  265. BatchItemList_.emplace_back(std::move(request), parser);
  266. return parser->GetFuture();
  267. }
  268. void TRawBatchRequest::AddRequest(TBatchItem batchItem)
  269. {
  270. Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
  271. BatchItemList_.push_back(batchItem);
  272. }
  273. TFuture<TNodeId> TRawBatchRequest::Create(
  274. const TTransactionId& transaction,
  275. const TYPath& path,
  276. ENodeType type,
  277. const TCreateOptions& options)
  278. {
  279. return AddRequest<TGuidResponseParser>(
  280. "create",
  281. SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options),
  282. Nothing());
  283. }
  284. TFuture<void> TRawBatchRequest::Remove(
  285. const TTransactionId& transaction,
  286. const TYPath& path,
  287. const TRemoveOptions& options)
  288. {
  289. return AddRequest<TVoidResponseParser>(
  290. "remove",
  291. SerializeParamsForRemove(transaction, Config_->Prefix, path, options),
  292. Nothing());
  293. }
  294. TFuture<bool> TRawBatchRequest::Exists(
  295. const TTransactionId& transaction,
  296. const TYPath& path,
  297. const TExistsOptions& options)
  298. {
  299. return AddRequest<TExistsResponseParser>(
  300. "exists",
  301. SerializeParamsForExists(transaction, Config_->Prefix, path, options),
  302. Nothing());
  303. }
  304. TFuture<TNode> TRawBatchRequest::Get(
  305. const TTransactionId& transaction,
  306. const TYPath& path,
  307. const TGetOptions& options)
  308. {
  309. return AddRequest<TGetResponseParser>(
  310. "get",
  311. SerializeParamsForGet(transaction, Config_->Prefix, path, options),
  312. Nothing());
  313. }
  314. TFuture<void> TRawBatchRequest::Set(
  315. const TTransactionId& transaction,
  316. const TYPath& path,
  317. const TNode& node,
  318. const TSetOptions& options)
  319. {
  320. return AddRequest<TVoidResponseParser>(
  321. "set",
  322. SerializeParamsForSet(transaction, Config_->Prefix, path, options),
  323. node);
  324. }
  325. TFuture<TNode::TListType> TRawBatchRequest::List(
  326. const TTransactionId& transaction,
  327. const TYPath& path,
  328. const TListOptions& options)
  329. {
  330. return AddRequest<TListResponseParser>(
  331. "list",
  332. SerializeParamsForList(transaction, Config_->Prefix, path, options),
  333. Nothing());
  334. }
  335. TFuture<TNodeId> TRawBatchRequest::Copy(
  336. const TTransactionId& transaction,
  337. const TYPath& sourcePath,
  338. const TYPath& destinationPath,
  339. const TCopyOptions& options)
  340. {
  341. return AddRequest<TGuidResponseParser>(
  342. "copy",
  343. SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options),
  344. Nothing());
  345. }
  346. TFuture<TNodeId> TRawBatchRequest::Move(
  347. const TTransactionId& transaction,
  348. const TYPath& sourcePath,
  349. const TYPath& destinationPath,
  350. const TMoveOptions& options)
  351. {
  352. return AddRequest<TGuidResponseParser>(
  353. "move",
  354. SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options),
  355. Nothing());
  356. }
  357. TFuture<TNodeId> TRawBatchRequest::Link(
  358. const TTransactionId& transaction,
  359. const TYPath& targetPath,
  360. const TYPath& linkPath,
  361. const TLinkOptions& options)
  362. {
  363. return AddRequest<TGuidResponseParser>(
  364. "link",
  365. SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options),
  366. Nothing());
  367. }
  368. TFuture<TLockId> TRawBatchRequest::Lock(
  369. const TTransactionId& transaction,
  370. const TYPath& path,
  371. ELockMode mode,
  372. const TLockOptions& options)
  373. {
  374. return AddRequest<TGuidResponseParser>(
  375. "lock",
  376. SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options),
  377. Nothing());
  378. }
  379. TFuture<void> TRawBatchRequest::Unlock(
  380. const TTransactionId& transaction,
  381. const TYPath& path,
  382. const TUnlockOptions& options)
  383. {
  384. return AddRequest<TVoidResponseParser>(
  385. "unlock",
  386. SerializeParamsForUnlock(transaction, Config_->Prefix, path, options),
  387. Nothing());
  388. }
  389. TFuture<TMaybe<TYPath>> TRawBatchRequest::GetFileFromCache(
  390. const TTransactionId& transactionId,
  391. const TString& md5Signature,
  392. const TYPath& cachePath,
  393. const TGetFileFromCacheOptions& options)
  394. {
  395. return AddRequest<TGetFileFromCacheParser>(
  396. "get_file_from_cache",
  397. SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options),
  398. Nothing());
  399. }
  400. TFuture<TYPath> TRawBatchRequest::PutFileToCache(
  401. const TTransactionId& transactionId,
  402. const TYPath& filePath,
  403. const TString& md5Signature,
  404. const TYPath& cachePath,
  405. const TPutFileToCacheOptions& options)
  406. {
  407. return AddRequest<TYPathParser>(
  408. "put_file_to_cache",
  409. SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options),
  410. Nothing());
  411. }
  412. TFuture<TCheckPermissionResponse> TRawBatchRequest::CheckPermission(
  413. const TString& user,
  414. EPermission permission,
  415. const TYPath& path,
  416. const TCheckPermissionOptions& options)
  417. {
  418. return AddRequest<TCheckPermissionParser>(
  419. "check_permission",
  420. SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options),
  421. Nothing());
  422. }
  423. TFuture<TOperationAttributes> TRawBatchRequest::GetOperation(
  424. const TOperationId& operationId,
  425. const TGetOperationOptions& options)
  426. {
  427. return AddRequest<TGetOperationResponseParser>(
  428. "get_operation",
  429. SerializeParamsForGetOperation(operationId, options),
  430. Nothing());
  431. }
  432. TFuture<void> TRawBatchRequest::AbortOperation(const TOperationId& operationId)
  433. {
  434. return AddRequest<TVoidResponseParser>(
  435. "abort_op",
  436. SerializeParamsForAbortOperation(operationId),
  437. Nothing());
  438. }
  439. TFuture<void> TRawBatchRequest::CompleteOperation(const TOperationId& operationId)
  440. {
  441. return AddRequest<TVoidResponseParser>(
  442. "complete_op",
  443. SerializeParamsForCompleteOperation(operationId),
  444. Nothing());
  445. }
  446. TFuture<void> TRawBatchRequest::SuspendOperation(
  447. const TOperationId& operationId,
  448. const TSuspendOperationOptions& options)
  449. {
  450. return AddRequest<TVoidResponseParser>(
  451. "suspend_operation",
  452. SerializeParamsForSuspendOperation(operationId, options),
  453. Nothing());
  454. }
  455. TFuture<void> TRawBatchRequest::ResumeOperation(
  456. const TOperationId& operationId,
  457. const TResumeOperationOptions& options)
  458. {
  459. return AddRequest<TVoidResponseParser>(
  460. "resume_operation",
  461. SerializeParamsForResumeOperation(operationId, options),
  462. Nothing());
  463. }
  464. TFuture<void> TRawBatchRequest::UpdateOperationParameters(
  465. const TOperationId& operationId,
  466. const TUpdateOperationParametersOptions& options)
  467. {
  468. return AddRequest<TVoidResponseParser>(
  469. "update_op_parameters",
  470. SerializeParamsForUpdateOperationParameters(operationId, options),
  471. Nothing());
  472. }
  473. TFuture<TRichYPath> TRawBatchRequest::CanonizeYPath(const TRichYPath& path)
  474. {
  475. TRichYPath result = path;
  476. // Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath.
  477. if (!result.Path_.StartsWith("<")) {
  478. result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix);
  479. }
  480. if (result.Path_.find_first_of("<>{}[]") != TString::npos) {
  481. return AddRequest<TCanonizeYPathResponseParser>(
  482. "parse_ypath",
  483. SerializeParamsForParseYPath(result),
  484. Nothing(),
  485. MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, result));
  486. }
  487. return NThreading::MakeFuture(result);
  488. }
  489. TFuture<TVector<TTableColumnarStatistics>> TRawBatchRequest::GetTableColumnarStatistics(
  490. const TTransactionId& transaction,
  491. const TVector<TRichYPath>& paths,
  492. const TGetTableColumnarStatisticsOptions& options)
  493. {
  494. return AddRequest<TTableColumnarStatisticsParser>(
  495. "get_table_columnar_statistics",
  496. SerializeParamsForGetTableColumnarStatistics(transaction, paths, options),
  497. Nothing());
  498. }
  499. TFuture<TMultiTablePartitions> TRawBatchRequest::GetTablePartitions(
  500. const TTransactionId& transaction,
  501. const TVector<TRichYPath>& paths,
  502. const TGetTablePartitionsOptions& options)
  503. {
  504. return AddRequest<TTablePartitionsParser>(
  505. "partition_tables",
  506. SerializeParamsForGetTablePartitions(transaction, paths, options),
  507. Nothing());
  508. }
  509. void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const
  510. {
  511. Y_ABORT_UNLESS(result);
  512. Y_ABORT_UNLESS(nextTry);
  513. *nextTry = TInstant();
  514. maxSize = Min(maxSize, BatchItemList_.size());
  515. *result = TNode::CreateList();
  516. for (size_t i = 0; i < maxSize; ++i) {
  517. YT_LOG_DEBUG("ExecuteBatch preparing: %v",
  518. RequestInfo(BatchItemList_[i].Parameters));
  519. result->Add(BatchItemList_[i].Parameters);
  520. if (BatchItemList_[i].NextTry > *nextTry) {
  521. *nextTry = BatchItemList_[i].NextTry;
  522. }
  523. }
  524. }
  525. void TRawBatchRequest::ParseResponse(
  526. const TResponseInfo& requestResult,
  527. const IRequestRetryPolicyPtr& retryPolicy,
  528. TRawBatchRequest* retryBatch,
  529. TInstant now)
  530. {
  531. TNode node = NodeFromYsonString(requestResult.Response);
  532. return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now);
  533. }
  534. void TRawBatchRequest::ParseResponse(
  535. TNode node,
  536. const TString& requestId,
  537. const IRequestRetryPolicyPtr& retryPolicy,
  538. TRawBatchRequest* retryBatch,
  539. TInstant now)
  540. {
  541. Y_ABORT_UNLESS(retryBatch);
  542. EnsureType(node, TNode::List);
  543. auto& responseList = node.AsList();
  544. const auto size = responseList.size();
  545. Y_ENSURE(size <= BatchItemList_.size(),
  546. "Size of server response exceeds size of batch request;"
  547. " size of batch: " << BatchItemList_.size() <<
  548. " size of server response: " << size << '.');
  549. for (size_t i = 0; i != size; ++i) {
  550. try {
  551. EnsureType(responseList[i], TNode::Map);
  552. auto& responseNode = responseList[i].AsMap();
  553. const auto outputIt = responseNode.find("output");
  554. if (outputIt != responseNode.end()) {
  555. BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second));
  556. } else {
  557. const auto errorIt = responseNode.find("error");
  558. if (errorIt == responseNode.end()) {
  559. BatchItemList_[i].ResponseParser->SetResponse(Nothing());
  560. } else {
  561. TErrorResponse error(400, requestId);
  562. error.SetError(TYtError(errorIt->second));
  563. if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) {
  564. YT_LOG_INFO(
  565. "Batch subrequest (%s) failed, will retry, error: %s",
  566. RequestInfo(BatchItemList_[i].Parameters),
  567. error.what());
  568. retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
  569. } else {
  570. YT_LOG_ERROR(
  571. "Batch subrequest (%s) failed, error: %s",
  572. RequestInfo(BatchItemList_[i].Parameters),
  573. error.what());
  574. BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error));
  575. }
  576. }
  577. }
  578. } catch (const std::exception& e) {
  579. // We don't expect other exceptions, so we don't catch (...)
  580. BatchItemList_[i].ResponseParser->SetException(std::current_exception());
  581. }
  582. }
  583. BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size);
  584. }
  585. void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const
  586. {
  587. for (const auto& batchItem : BatchItemList_) {
  588. batchItem.ResponseParser->SetException(e);
  589. }
  590. }
  591. size_t TRawBatchRequest::BatchSize() const
  592. {
  593. return BatchItemList_.size();
  594. }
  595. ////////////////////////////////////////////////////////////////////////////////
  596. } // namespace NYT::NDetail::NRawClient