raw_batch_request.cpp 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. #include "raw_batch_request.h"
  2. #include "raw_requests.h"
  3. #include "rpc_parameters_serialization.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  7. #include <yt/cpp/mapreduce/interface/client.h>
  8. #include <yt/cpp/mapreduce/interface/errors.h>
  9. #include <yt/cpp/mapreduce/interface/serialize.h>
  10. #include <library/cpp/yson/node/node.h>
  11. #include <yt/cpp/mapreduce/http/context.h>
  12. #include <yt/cpp/mapreduce/http/retry_request.h>
  13. #include <util/generic/guid.h>
  14. #include <util/generic/scope.h>
  15. #include <util/string/builder.h>
  16. #include <exception>
  17. namespace NYT::NDetail::NRawClient {
  18. using NThreading::TFuture;
  19. using NThreading::TPromise;
  20. using NThreading::NewPromise;
  21. ////////////////////////////////////////////////////////////////////////////////
  22. static TString RequestInfo(const TNode& request)
  23. {
  24. return ::TStringBuilder()
  25. << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]);
  26. }
  27. static void EnsureNothing(const TMaybe<TNode>& node)
  28. {
  29. Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType());
  30. }
  31. static void EnsureSomething(const TMaybe<TNode>& node)
  32. {
  33. Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response.");
  34. }
  35. static void EnsureType(const TNode& node, TNode::EType type)
  36. {
  37. Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. "
  38. << "Expected: " << type << ", actual: " << node.GetType());
  39. }
  40. static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
  41. {
  42. Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response.");
  43. EnsureType(*node, type);
  44. }
  45. ////////////////////////////////////////////////////////////////////////////////
  46. template <typename TReturnType>
  47. class TResponseParserBase
  48. : public THttpRawBatchRequest::IResponseItemParser
  49. {
  50. public:
  51. using TFutureResult = TFuture<TReturnType>;
  52. public:
  53. TResponseParserBase()
  54. : Result_(NewPromise<TReturnType>())
  55. { }
  56. void SetException(std::exception_ptr e) override
  57. {
  58. Result_.SetException(std::move(e));
  59. }
  60. TFuture<TReturnType> GetFuture()
  61. {
  62. return Result_.GetFuture();
  63. }
  64. protected:
  65. TPromise<TReturnType> Result_;
  66. };
  67. ////////////////////////////////////////////////////////////////////////////////
  68. class TGetResponseParser
  69. : public TResponseParserBase<TNode>
  70. {
  71. public:
  72. void SetResponse(TMaybe<TNode> node) override
  73. {
  74. EnsureSomething(node);
  75. Result_.SetValue(std::move(*node));
  76. }
  77. };
  78. ////////////////////////////////////////////////////////////////////////////////
  79. class TVoidResponseParser
  80. : public TResponseParserBase<void>
  81. {
  82. public:
  83. void SetResponse(TMaybe<TNode> node) override
  84. {
  85. EnsureNothing(node);
  86. Result_.SetValue();
  87. }
  88. };
  89. ////////////////////////////////////////////////////////////////////////////////
  90. class TListResponseParser
  91. : public TResponseParserBase<TNode::TListType>
  92. {
  93. public:
  94. void SetResponse(TMaybe<TNode> node) override
  95. {
  96. EnsureType(node, TNode::List);
  97. Result_.SetValue(std::move(node->AsList()));
  98. }
  99. };
  100. ////////////////////////////////////////////////////////////////////////////////
  101. class TExistsResponseParser
  102. : public TResponseParserBase<bool>
  103. {
  104. public:
  105. void SetResponse(TMaybe<TNode> node) override
  106. {
  107. EnsureType(node, TNode::Bool);
  108. Result_.SetValue(std::move(node->AsBool()));
  109. }
  110. };
  111. ////////////////////////////////////////////////////////////////////////////////
  112. class TGuidResponseParser
  113. : public TResponseParserBase<TGUID>
  114. {
  115. public:
  116. void SetResponse(TMaybe<TNode> node) override
  117. {
  118. EnsureType(node, TNode::String);
  119. Result_.SetValue(GetGuid(node->AsString()));
  120. }
  121. };
  122. ////////////////////////////////////////////////////////////////////////////////
  123. class TCanonizeYPathResponseParser
  124. : public TResponseParserBase<TRichYPath>
  125. {
  126. public:
  127. explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original)
  128. : OriginalNode_(PathToNode(original))
  129. , PathPrefix_(std::move(pathPrefix))
  130. { }
  131. void SetResponse(TMaybe<TNode> node) override
  132. {
  133. EnsureType(node, TNode::String);
  134. for (const auto& item : OriginalNode_.GetAttributes().AsMap()) {
  135. node->Attributes()[item.first] = item.second;
  136. }
  137. TRichYPath result;
  138. Deserialize(result, *node);
  139. result.Path_ = AddPathPrefix(result.Path_, PathPrefix_);
  140. Result_.SetValue(result);
  141. }
  142. private:
  143. TNode OriginalNode_;
  144. TString PathPrefix_;
  145. };
  146. ////////////////////////////////////////////////////////////////////////////////
  147. class TGetOperationResponseParser
  148. : public TResponseParserBase<TOperationAttributes>
  149. {
  150. public:
  151. void SetResponse(TMaybe<TNode> node) override
  152. {
  153. EnsureType(node, TNode::Map);
  154. Result_.SetValue(ParseOperationAttributes(*node));
  155. }
  156. };
  157. ////////////////////////////////////////////////////////////////////////////////
  158. class TTableColumnarStatisticsParser
  159. : public TResponseParserBase<TVector<TTableColumnarStatistics>>
  160. {
  161. public:
  162. void SetResponse(TMaybe<TNode> node) override
  163. {
  164. EnsureType(node, TNode::List);
  165. TVector<TTableColumnarStatistics> statistics;
  166. Deserialize(statistics, *node);
  167. Result_.SetValue(std::move(statistics));
  168. }
  169. };
  170. ////////////////////////////////////////////////////////////////////////////////
  171. class TTablePartitionsParser
  172. : public TResponseParserBase<TMultiTablePartitions>
  173. {
  174. public:
  175. void SetResponse(TMaybe<TNode> node) override
  176. {
  177. EnsureType(node, TNode::Map);
  178. TMultiTablePartitions partitions;
  179. Deserialize(partitions, *node);
  180. Result_.SetValue(std::move(partitions));
  181. }
  182. };
  183. ////////////////////////////////////////////////////////////////////////////////
  184. class TGetFileFromCacheParser
  185. : public TResponseParserBase<TMaybe<TYPath>>
  186. {
  187. public:
  188. void SetResponse(TMaybe<TNode> node) override
  189. {
  190. EnsureType(node, TNode::String);
  191. if (node->AsString().empty()) {
  192. Result_.SetValue(Nothing());
  193. } else {
  194. Result_.SetValue(node->AsString());
  195. }
  196. }
  197. };
  198. ////////////////////////////////////////////////////////////////////////////////
  199. class TYPathParser
  200. : public TResponseParserBase<TYPath>
  201. {
  202. public:
  203. void SetResponse(TMaybe<TNode> node) override
  204. {
  205. EnsureType(node, TNode::String);
  206. Result_.SetValue(node->AsString());
  207. }
  208. };
  209. ////////////////////////////////////////////////////////////////////////////////
  210. class TCheckPermissionParser
  211. : public TResponseParserBase<TCheckPermissionResponse>
  212. {
  213. public:
  214. void SetResponse(TMaybe<TNode> node) override
  215. {
  216. EnsureType(node, TNode::Map);
  217. Result_.SetValue(ParseCheckPermissionResponse(*node));
  218. }
  219. };
  220. ////////////////////////////////////////////////////////////////////////////////
  221. THttpRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser)
  222. : Parameters(std::move(parameters))
  223. , ResponseParser(std::move(responseParser))
  224. , NextTry()
  225. { }
  226. THttpRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry)
  227. : Parameters(batchItem.Parameters)
  228. , ResponseParser(batchItem.ResponseParser)
  229. , NextTry(nextTry)
  230. { }
  231. ////////////////////////////////////////////////////////////////////////////////
  232. THttpRawBatchRequest::THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy)
  233. : Context_(context)
  234. , RequestRetryPolicy_(std::move(retryPolicy))
  235. { }
  236. THttpRawBatchRequest::~THttpRawBatchRequest() = default;
  237. void THttpRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
  238. {
  239. if (IsExecuted()) {
  240. ythrow yexception() << "Cannot execute batch request since it is already executed";
  241. }
  242. Y_DEFER {
  243. MarkExecuted();
  244. };
  245. const auto concurrency = options.Concurrency_.GetOrElse(50);
  246. const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
  247. if (!RequestRetryPolicy_) {
  248. RequestRetryPolicy_ = CreateDefaultRequestRetryPolicy(Context_.Config);
  249. }
  250. while (BatchSize()) {
  251. auto parameters = TNode::CreateMap();
  252. TInstant nextTry;
  253. FillParameterList(batchPartMaxSize, &parameters["requests"], &nextTry);
  254. if (nextTry) {
  255. SleepUntil(nextTry);
  256. }
  257. parameters["concurrency"] = concurrency;
  258. auto body = NodeToYsonString(parameters);
  259. THttpHeader header("POST", "execute_batch");
  260. header.AddMutationId();
  261. TResponseInfo result;
  262. try {
  263. result = RequestWithRetry<TResponseInfo>(
  264. RequestRetryPolicy_,
  265. [this, &header, &body] (TMutationId& mutationId) {
  266. auto response = RequestWithoutRetry(Context_, mutationId, header, body);
  267. return TResponseInfo{
  268. .RequestId = response->GetRequestId(),
  269. .Response = response->GetResponse(),
  270. .HttpCode = response->GetStatusCode(),
  271. };
  272. });
  273. } catch (const std::exception& e) {
  274. SetErrorResult(std::current_exception());
  275. throw;
  276. }
  277. ParseResponse(std::move(result), RequestRetryPolicy_.Get());
  278. }
  279. }
  280. bool THttpRawBatchRequest::IsExecuted() const
  281. {
  282. return Executed_;
  283. }
  284. void THttpRawBatchRequest::MarkExecuted()
  285. {
  286. Executed_ = true;
  287. }
  288. template <typename TResponseParser>
  289. typename TResponseParser::TFutureResult THttpRawBatchRequest::AddRequest(
  290. const TString& command,
  291. TNode parameters,
  292. TMaybe<TNode> input)
  293. {
  294. return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>());
  295. }
  296. template <typename TResponseParser>
  297. typename TResponseParser::TFutureResult THttpRawBatchRequest::AddRequest(
  298. const TString& command,
  299. TNode parameters,
  300. TMaybe<TNode> input,
  301. ::TIntrusivePtr<TResponseParser> parser)
  302. {
  303. Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
  304. TNode request;
  305. request["command"] = command;
  306. request["parameters"] = std::move(parameters);
  307. if (input) {
  308. request["input"] = std::move(*input);
  309. }
  310. BatchItemList_.emplace_back(std::move(request), parser);
  311. return parser->GetFuture();
  312. }
  313. void THttpRawBatchRequest::AddRequest(TBatchItem batchItem)
  314. {
  315. Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
  316. BatchItemList_.push_back(batchItem);
  317. }
  318. TFuture<TNodeId> THttpRawBatchRequest::Create(
  319. const TTransactionId& transaction,
  320. const TYPath& path,
  321. ENodeType type,
  322. const TCreateOptions& options)
  323. {
  324. return AddRequest<TGuidResponseParser>(
  325. "create",
  326. SerializeParamsForCreate(transaction, Context_.Config->Prefix, path, type, options),
  327. Nothing());
  328. }
  329. TFuture<void> THttpRawBatchRequest::Remove(
  330. const TTransactionId& transaction,
  331. const TYPath& path,
  332. const TRemoveOptions& options)
  333. {
  334. return AddRequest<TVoidResponseParser>(
  335. "remove",
  336. SerializeParamsForRemove(transaction, Context_.Config->Prefix, path, options),
  337. Nothing());
  338. }
  339. TFuture<bool> THttpRawBatchRequest::Exists(
  340. const TTransactionId& transaction,
  341. const TYPath& path,
  342. const TExistsOptions& options)
  343. {
  344. return AddRequest<TExistsResponseParser>(
  345. "exists",
  346. SerializeParamsForExists(transaction, Context_.Config->Prefix, path, options),
  347. Nothing());
  348. }
  349. TFuture<TNode> THttpRawBatchRequest::Get(
  350. const TTransactionId& transaction,
  351. const TYPath& path,
  352. const TGetOptions& options)
  353. {
  354. return AddRequest<TGetResponseParser>(
  355. "get",
  356. SerializeParamsForGet(transaction, Context_.Config->Prefix, path, options),
  357. Nothing());
  358. }
  359. TFuture<void> THttpRawBatchRequest::Set(
  360. const TTransactionId& transaction,
  361. const TYPath& path,
  362. const TNode& node,
  363. const TSetOptions& options)
  364. {
  365. return AddRequest<TVoidResponseParser>(
  366. "set",
  367. SerializeParamsForSet(transaction, Context_.Config->Prefix, path, options),
  368. node);
  369. }
  370. TFuture<TNode::TListType> THttpRawBatchRequest::List(
  371. const TTransactionId& transaction,
  372. const TYPath& path,
  373. const TListOptions& options)
  374. {
  375. return AddRequest<TListResponseParser>(
  376. "list",
  377. SerializeParamsForList(transaction, Context_.Config->Prefix, path, options),
  378. Nothing());
  379. }
  380. TFuture<TNodeId> THttpRawBatchRequest::Copy(
  381. const TTransactionId& transaction,
  382. const TYPath& sourcePath,
  383. const TYPath& destinationPath,
  384. const TCopyOptions& options)
  385. {
  386. return AddRequest<TGuidResponseParser>(
  387. "copy",
  388. SerializeParamsForCopy(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options),
  389. Nothing());
  390. }
  391. TFuture<TNodeId> THttpRawBatchRequest::Move(
  392. const TTransactionId& transaction,
  393. const TYPath& sourcePath,
  394. const TYPath& destinationPath,
  395. const TMoveOptions& options)
  396. {
  397. return AddRequest<TGuidResponseParser>(
  398. "move",
  399. SerializeParamsForMove(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options),
  400. Nothing());
  401. }
  402. TFuture<TNodeId> THttpRawBatchRequest::Link(
  403. const TTransactionId& transaction,
  404. const TYPath& targetPath,
  405. const TYPath& linkPath,
  406. const TLinkOptions& options)
  407. {
  408. return AddRequest<TGuidResponseParser>(
  409. "link",
  410. SerializeParamsForLink(transaction, Context_.Config->Prefix, targetPath, linkPath, options),
  411. Nothing());
  412. }
  413. TFuture<TLockId> THttpRawBatchRequest::Lock(
  414. const TTransactionId& transaction,
  415. const TYPath& path,
  416. ELockMode mode,
  417. const TLockOptions& options)
  418. {
  419. return AddRequest<TGuidResponseParser>(
  420. "lock",
  421. SerializeParamsForLock(transaction, Context_.Config->Prefix, path, mode, options),
  422. Nothing());
  423. }
  424. TFuture<void> THttpRawBatchRequest::Unlock(
  425. const TTransactionId& transaction,
  426. const TYPath& path,
  427. const TUnlockOptions& options)
  428. {
  429. return AddRequest<TVoidResponseParser>(
  430. "unlock",
  431. SerializeParamsForUnlock(transaction, Context_.Config->Prefix, path, options),
  432. Nothing());
  433. }
  434. TFuture<TMaybe<TYPath>> THttpRawBatchRequest::GetFileFromCache(
  435. const TTransactionId& transactionId,
  436. const TString& md5Signature,
  437. const TYPath& cachePath,
  438. const TGetFileFromCacheOptions& options)
  439. {
  440. return AddRequest<TGetFileFromCacheParser>(
  441. "get_file_from_cache",
  442. SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options),
  443. Nothing());
  444. }
  445. TFuture<TYPath> THttpRawBatchRequest::PutFileToCache(
  446. const TTransactionId& transactionId,
  447. const TYPath& filePath,
  448. const TString& md5Signature,
  449. const TYPath& cachePath,
  450. const TPutFileToCacheOptions& options)
  451. {
  452. return AddRequest<TYPathParser>(
  453. "put_file_to_cache",
  454. SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options),
  455. Nothing());
  456. }
  457. TFuture<TCheckPermissionResponse> THttpRawBatchRequest::CheckPermission(
  458. const TString& user,
  459. EPermission permission,
  460. const TYPath& path,
  461. const TCheckPermissionOptions& options)
  462. {
  463. return AddRequest<TCheckPermissionParser>(
  464. "check_permission",
  465. SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options),
  466. Nothing());
  467. }
  468. TFuture<TOperationAttributes> THttpRawBatchRequest::GetOperation(
  469. const TOperationId& operationId,
  470. const TGetOperationOptions& options)
  471. {
  472. return AddRequest<TGetOperationResponseParser>(
  473. "get_operation",
  474. SerializeParamsForGetOperation(operationId, options),
  475. Nothing());
  476. }
  477. TFuture<void> THttpRawBatchRequest::AbortOperation(const TOperationId& operationId)
  478. {
  479. return AddRequest<TVoidResponseParser>(
  480. "abort_op",
  481. SerializeParamsForAbortOperation(operationId),
  482. Nothing());
  483. }
  484. TFuture<void> THttpRawBatchRequest::CompleteOperation(const TOperationId& operationId)
  485. {
  486. return AddRequest<TVoidResponseParser>(
  487. "complete_op",
  488. SerializeParamsForCompleteOperation(operationId),
  489. Nothing());
  490. }
  491. TFuture<void> THttpRawBatchRequest::SuspendOperation(
  492. const TOperationId& operationId,
  493. const TSuspendOperationOptions& options)
  494. {
  495. return AddRequest<TVoidResponseParser>(
  496. "suspend_operation",
  497. SerializeParamsForSuspendOperation(operationId, options),
  498. Nothing());
  499. }
  500. TFuture<void> THttpRawBatchRequest::ResumeOperation(
  501. const TOperationId& operationId,
  502. const TResumeOperationOptions& options)
  503. {
  504. return AddRequest<TVoidResponseParser>(
  505. "resume_operation",
  506. SerializeParamsForResumeOperation(operationId, options),
  507. Nothing());
  508. }
  509. TFuture<void> THttpRawBatchRequest::UpdateOperationParameters(
  510. const TOperationId& operationId,
  511. const TUpdateOperationParametersOptions& options)
  512. {
  513. return AddRequest<TVoidResponseParser>(
  514. "update_op_parameters",
  515. SerializeParamsForUpdateOperationParameters(operationId, options),
  516. Nothing());
  517. }
  518. TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path)
  519. {
  520. TRichYPath result = path;
  521. // Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath.
  522. if (!result.Path_.StartsWith("<")) {
  523. result.Path_ = AddPathPrefix(result.Path_, Context_.Config->Prefix);
  524. }
  525. if (result.Path_.find_first_of("<>{}[]") != TString::npos) {
  526. return AddRequest<TCanonizeYPathResponseParser>(
  527. "parse_ypath",
  528. SerializeParamsForParseYPath(result),
  529. Nothing(),
  530. MakeIntrusive<TCanonizeYPathResponseParser>(Context_.Config->Prefix, result));
  531. }
  532. return NThreading::MakeFuture(result);
  533. }
  534. TFuture<TVector<TTableColumnarStatistics>> THttpRawBatchRequest::GetTableColumnarStatistics(
  535. const TTransactionId& transaction,
  536. const TVector<TRichYPath>& paths,
  537. const TGetTableColumnarStatisticsOptions& options)
  538. {
  539. return AddRequest<TTableColumnarStatisticsParser>(
  540. "get_table_columnar_statistics",
  541. SerializeParamsForGetTableColumnarStatistics(transaction, paths, options),
  542. Nothing());
  543. }
  544. TFuture<TMultiTablePartitions> THttpRawBatchRequest::GetTablePartitions(
  545. const TTransactionId& transaction,
  546. const TVector<TRichYPath>& paths,
  547. const TGetTablePartitionsOptions& options)
  548. {
  549. return AddRequest<TTablePartitionsParser>(
  550. "partition_tables",
  551. SerializeParamsForGetTablePartitions(transaction, paths, options),
  552. Nothing());
  553. }
  554. void THttpRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const
  555. {
  556. Y_ABORT_UNLESS(result);
  557. Y_ABORT_UNLESS(nextTry);
  558. *nextTry = TInstant();
  559. maxSize = Min(maxSize, BatchItemList_.size());
  560. *result = TNode::CreateList();
  561. for (size_t i = 0; i < maxSize; ++i) {
  562. YT_LOG_DEBUG("ExecuteBatch preparing: %v",
  563. RequestInfo(BatchItemList_[i].Parameters));
  564. result->Add(BatchItemList_[i].Parameters);
  565. if (BatchItemList_[i].NextTry > *nextTry) {
  566. *nextTry = BatchItemList_[i].NextTry;
  567. }
  568. }
  569. }
  570. void THttpRawBatchRequest::ParseResponse(
  571. const TResponseInfo& requestResult,
  572. const IRequestRetryPolicyPtr& retryPolicy,
  573. TInstant now)
  574. {
  575. TNode node = NodeFromYsonString(requestResult.Response);
  576. return ParseResponse(node, requestResult.RequestId, retryPolicy, now);
  577. }
  578. void THttpRawBatchRequest::ParseResponse(
  579. TNode node,
  580. const TString& requestId,
  581. const IRequestRetryPolicyPtr& retryPolicy,
  582. TInstant now)
  583. {
  584. EnsureType(node, TNode::List);
  585. auto& responseList = node.AsList();
  586. const auto size = responseList.size();
  587. Y_ENSURE(size <= BatchItemList_.size(),
  588. "Size of server response exceeds size of batch request;"
  589. " size of batch: " << BatchItemList_.size() <<
  590. " size of server response: " << size << '.');
  591. for (size_t i = 0; i != size; ++i) {
  592. try {
  593. EnsureType(responseList[i], TNode::Map);
  594. auto& responseNode = responseList[i].AsMap();
  595. const auto outputIt = responseNode.find("output");
  596. if (outputIt != responseNode.end()) {
  597. BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second));
  598. } else {
  599. const auto errorIt = responseNode.find("error");
  600. if (errorIt == responseNode.end()) {
  601. BatchItemList_[i].ResponseParser->SetResponse(Nothing());
  602. } else {
  603. TErrorResponse error(400, requestId);
  604. error.SetError(TYtError(errorIt->second));
  605. if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) {
  606. YT_LOG_INFO(
  607. "Batch subrequest (%s) failed, will retry, error: %s",
  608. RequestInfo(BatchItemList_[i].Parameters),
  609. error.what());
  610. AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
  611. } else {
  612. YT_LOG_ERROR(
  613. "Batch subrequest (%s) failed, error: %s",
  614. RequestInfo(BatchItemList_[i].Parameters),
  615. error.what());
  616. BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error));
  617. }
  618. }
  619. }
  620. } catch (const std::exception& e) {
  621. // We don't expect other exceptions, so we don't catch (...)
  622. BatchItemList_[i].ResponseParser->SetException(std::current_exception());
  623. }
  624. }
  625. BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size);
  626. }
  627. void THttpRawBatchRequest::SetErrorResult(std::exception_ptr e) const
  628. {
  629. for (const auto& batchItem : BatchItemList_) {
  630. batchItem.ResponseParser->SetException(e);
  631. }
  632. }
  633. size_t THttpRawBatchRequest::BatchSize() const
  634. {
  635. return BatchItemList_.size();
  636. }
  637. ////////////////////////////////////////////////////////////////////////////////
  638. } // namespace NYT::NDetail::NRawClient