raw_batch_request.cpp 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737
  1. #include "raw_batch_request.h"
  2. #include "raw_requests.h"
  3. #include "rpc_parameters_serialization.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/common/retry_request.h>
  7. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  8. #include <yt/cpp/mapreduce/interface/client.h>
  9. #include <yt/cpp/mapreduce/interface/errors.h>
  10. #include <yt/cpp/mapreduce/interface/serialize.h>
  11. #include <library/cpp/yson/node/node.h>
  12. #include <yt/cpp/mapreduce/http/context.h>
  13. #include <yt/cpp/mapreduce/http/retry_request.h>
  14. #include <util/generic/guid.h>
  15. #include <util/generic/scope.h>
  16. #include <util/string/builder.h>
  17. #include <exception>
  18. namespace NYT::NDetail::NRawClient {
  19. using NThreading::TFuture;
  20. using NThreading::TPromise;
  21. using NThreading::NewPromise;
  22. ////////////////////////////////////////////////////////////////////////////////
  23. static TString RequestInfo(const TNode& request)
  24. {
  25. return ::TStringBuilder()
  26. << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]);
  27. }
  28. static void EnsureNothing(const TMaybe<TNode>& node)
  29. {
  30. Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType());
  31. }
  32. static void EnsureSomething(const TMaybe<TNode>& node)
  33. {
  34. Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response.");
  35. }
  36. static void EnsureType(const TNode& node, TNode::EType type)
  37. {
  38. Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. "
  39. << "Expected: " << type << ", actual: " << node.GetType());
  40. }
  41. static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
  42. {
  43. Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response.");
  44. EnsureType(*node, type);
  45. }
  46. ////////////////////////////////////////////////////////////////////////////////
  47. template <typename TReturnType>
  48. class TResponseParserBase
  49. : public THttpRawBatchRequest::IResponseItemParser
  50. {
  51. public:
  52. using TFutureResult = TFuture<TReturnType>;
  53. public:
  54. TResponseParserBase()
  55. : Result_(NewPromise<TReturnType>())
  56. { }
  57. void SetException(std::exception_ptr e) override
  58. {
  59. Result_.SetException(std::move(e));
  60. }
  61. TFuture<TReturnType> GetFuture()
  62. {
  63. return Result_.GetFuture();
  64. }
  65. protected:
  66. TPromise<TReturnType> Result_;
  67. };
  68. ////////////////////////////////////////////////////////////////////////////////
  69. class TGetResponseParser
  70. : public TResponseParserBase<TNode>
  71. {
  72. public:
  73. void SetResponse(TMaybe<TNode> node) override
  74. {
  75. EnsureSomething(node);
  76. Result_.SetValue(std::move(*node));
  77. }
  78. };
  79. ////////////////////////////////////////////////////////////////////////////////
  80. class TVoidResponseParser
  81. : public TResponseParserBase<void>
  82. {
  83. public:
  84. void SetResponse(TMaybe<TNode> node) override
  85. {
  86. EnsureNothing(node);
  87. Result_.SetValue();
  88. }
  89. };
  90. ////////////////////////////////////////////////////////////////////////////////
  91. class TListResponseParser
  92. : public TResponseParserBase<TNode::TListType>
  93. {
  94. public:
  95. void SetResponse(TMaybe<TNode> node) override
  96. {
  97. EnsureType(node, TNode::List);
  98. Result_.SetValue(std::move(node->AsList()));
  99. }
  100. };
  101. ////////////////////////////////////////////////////////////////////////////////
  102. class TExistsResponseParser
  103. : public TResponseParserBase<bool>
  104. {
  105. public:
  106. void SetResponse(TMaybe<TNode> node) override
  107. {
  108. EnsureType(node, TNode::Bool);
  109. Result_.SetValue(std::move(node->AsBool()));
  110. }
  111. };
  112. ////////////////////////////////////////////////////////////////////////////////
  113. class TGuidResponseParser
  114. : public TResponseParserBase<TGUID>
  115. {
  116. public:
  117. void SetResponse(TMaybe<TNode> node) override
  118. {
  119. EnsureType(node, TNode::String);
  120. Result_.SetValue(GetGuid(node->AsString()));
  121. }
  122. };
  123. ////////////////////////////////////////////////////////////////////////////////
  124. class TCanonizeYPathResponseParser
  125. : public TResponseParserBase<TRichYPath>
  126. {
  127. public:
  128. explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original)
  129. : OriginalNode_(PathToNode(original))
  130. , PathPrefix_(std::move(pathPrefix))
  131. { }
  132. void SetResponse(TMaybe<TNode> node) override
  133. {
  134. EnsureType(node, TNode::String);
  135. for (const auto& item : OriginalNode_.GetAttributes().AsMap()) {
  136. node->Attributes()[item.first] = item.second;
  137. }
  138. TRichYPath result;
  139. Deserialize(result, *node);
  140. result.Path_ = AddPathPrefix(result.Path_, PathPrefix_);
  141. Result_.SetValue(result);
  142. }
  143. private:
  144. TNode OriginalNode_;
  145. TString PathPrefix_;
  146. };
  147. ////////////////////////////////////////////////////////////////////////////////
  148. class TGetOperationResponseParser
  149. : public TResponseParserBase<TOperationAttributes>
  150. {
  151. public:
  152. void SetResponse(TMaybe<TNode> node) override
  153. {
  154. EnsureType(node, TNode::Map);
  155. Result_.SetValue(ParseOperationAttributes(*node));
  156. }
  157. };
  158. ////////////////////////////////////////////////////////////////////////////////
  159. class TTableColumnarStatisticsParser
  160. : public TResponseParserBase<TVector<TTableColumnarStatistics>>
  161. {
  162. public:
  163. void SetResponse(TMaybe<TNode> node) override
  164. {
  165. EnsureType(node, TNode::List);
  166. TVector<TTableColumnarStatistics> statistics;
  167. Deserialize(statistics, *node);
  168. Result_.SetValue(std::move(statistics));
  169. }
  170. };
  171. ////////////////////////////////////////////////////////////////////////////////
  172. class TTablePartitionsParser
  173. : public TResponseParserBase<TMultiTablePartitions>
  174. {
  175. public:
  176. void SetResponse(TMaybe<TNode> node) override
  177. {
  178. EnsureType(node, TNode::Map);
  179. TMultiTablePartitions partitions;
  180. Deserialize(partitions, *node);
  181. Result_.SetValue(std::move(partitions));
  182. }
  183. };
  184. ////////////////////////////////////////////////////////////////////////////////
  185. class TGetFileFromCacheParser
  186. : public TResponseParserBase<TMaybe<TYPath>>
  187. {
  188. public:
  189. void SetResponse(TMaybe<TNode> node) override
  190. {
  191. EnsureType(node, TNode::String);
  192. if (node->AsString().empty()) {
  193. Result_.SetValue(Nothing());
  194. } else {
  195. Result_.SetValue(node->AsString());
  196. }
  197. }
  198. };
  199. ////////////////////////////////////////////////////////////////////////////////
  200. class TYPathParser
  201. : public TResponseParserBase<TYPath>
  202. {
  203. public:
  204. void SetResponse(TMaybe<TNode> node) override
  205. {
  206. EnsureType(node, TNode::String);
  207. Result_.SetValue(node->AsString());
  208. }
  209. };
  210. ////////////////////////////////////////////////////////////////////////////////
  211. class TCheckPermissionParser
  212. : public TResponseParserBase<TCheckPermissionResponse>
  213. {
  214. public:
  215. void SetResponse(TMaybe<TNode> node) override
  216. {
  217. EnsureType(node, TNode::Map);
  218. Result_.SetValue(ParseCheckPermissionResponse(*node));
  219. }
  220. };
  221. ////////////////////////////////////////////////////////////////////////////////
  222. THttpRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser)
  223. : Parameters(std::move(parameters))
  224. , ResponseParser(std::move(responseParser))
  225. , NextTry()
  226. { }
  227. THttpRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry)
  228. : Parameters(batchItem.Parameters)
  229. , ResponseParser(batchItem.ResponseParser)
  230. , NextTry(nextTry)
  231. { }
  232. ////////////////////////////////////////////////////////////////////////////////
  233. THttpRawBatchRequest::THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy)
  234. : Context_(context)
  235. , RequestRetryPolicy_(std::move(retryPolicy))
  236. { }
  237. THttpRawBatchRequest::~THttpRawBatchRequest() = default;
  238. void THttpRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
  239. {
  240. if (IsExecuted()) {
  241. ythrow yexception() << "Cannot execute batch request since it is already executed";
  242. }
  243. Y_DEFER {
  244. MarkExecuted();
  245. };
  246. const auto concurrency = options.Concurrency_.GetOrElse(50);
  247. const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
  248. if (!RequestRetryPolicy_) {
  249. RequestRetryPolicy_ = CreateDefaultRequestRetryPolicy(Context_.Config);
  250. }
  251. while (BatchSize()) {
  252. auto parameters = TNode::CreateMap();
  253. TInstant nextTry;
  254. FillParameterList(batchPartMaxSize, &parameters["requests"], &nextTry);
  255. if (nextTry) {
  256. SleepUntil(nextTry);
  257. }
  258. parameters["concurrency"] = concurrency;
  259. auto body = NodeToYsonString(parameters);
  260. THttpHeader header("POST", "execute_batch");
  261. header.AddMutationId();
  262. TResponseInfo result;
  263. try {
  264. result = RequestWithRetry<TResponseInfo>(
  265. RequestRetryPolicy_,
  266. [this, &header, &body] (TMutationId& mutationId) {
  267. auto response = RequestWithoutRetry(Context_, mutationId, header, body);
  268. return TResponseInfo{
  269. .RequestId = response->GetRequestId(),
  270. .Response = response->GetResponse(),
  271. .HttpCode = response->GetStatusCode(),
  272. };
  273. });
  274. } catch (const std::exception& e) {
  275. SetErrorResult(std::current_exception());
  276. throw;
  277. }
  278. ParseResponse(std::move(result), RequestRetryPolicy_.Get());
  279. }
  280. }
  281. bool THttpRawBatchRequest::IsExecuted() const
  282. {
  283. return Executed_;
  284. }
  285. void THttpRawBatchRequest::MarkExecuted()
  286. {
  287. Executed_ = true;
  288. }
  289. template <typename TResponseParser>
  290. typename TResponseParser::TFutureResult THttpRawBatchRequest::AddRequest(
  291. const TString& command,
  292. TNode parameters,
  293. TMaybe<TNode> input)
  294. {
  295. return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>());
  296. }
  297. template <typename TResponseParser>
  298. typename TResponseParser::TFutureResult THttpRawBatchRequest::AddRequest(
  299. const TString& command,
  300. TNode parameters,
  301. TMaybe<TNode> input,
  302. ::TIntrusivePtr<TResponseParser> parser)
  303. {
  304. Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
  305. TNode request;
  306. request["command"] = command;
  307. request["parameters"] = std::move(parameters);
  308. if (input) {
  309. request["input"] = std::move(*input);
  310. }
  311. BatchItemList_.emplace_back(std::move(request), parser);
  312. return parser->GetFuture();
  313. }
  314. void THttpRawBatchRequest::AddRequest(TBatchItem batchItem)
  315. {
  316. Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
  317. BatchItemList_.push_back(batchItem);
  318. }
  319. TFuture<TNodeId> THttpRawBatchRequest::Create(
  320. const TTransactionId& transaction,
  321. const TYPath& path,
  322. ENodeType type,
  323. const TCreateOptions& options)
  324. {
  325. return AddRequest<TGuidResponseParser>(
  326. "create",
  327. SerializeParamsForCreate(transaction, Context_.Config->Prefix, path, type, options),
  328. Nothing());
  329. }
  330. TFuture<void> THttpRawBatchRequest::Remove(
  331. const TTransactionId& transaction,
  332. const TYPath& path,
  333. const TRemoveOptions& options)
  334. {
  335. return AddRequest<TVoidResponseParser>(
  336. "remove",
  337. SerializeParamsForRemove(transaction, Context_.Config->Prefix, path, options),
  338. Nothing());
  339. }
  340. TFuture<bool> THttpRawBatchRequest::Exists(
  341. const TTransactionId& transaction,
  342. const TYPath& path,
  343. const TExistsOptions& options)
  344. {
  345. return AddRequest<TExistsResponseParser>(
  346. "exists",
  347. SerializeParamsForExists(transaction, Context_.Config->Prefix, path, options),
  348. Nothing());
  349. }
  350. TFuture<TNode> THttpRawBatchRequest::Get(
  351. const TTransactionId& transaction,
  352. const TYPath& path,
  353. const TGetOptions& options)
  354. {
  355. return AddRequest<TGetResponseParser>(
  356. "get",
  357. SerializeParamsForGet(transaction, Context_.Config->Prefix, path, options),
  358. Nothing());
  359. }
  360. TFuture<void> THttpRawBatchRequest::Set(
  361. const TTransactionId& transaction,
  362. const TYPath& path,
  363. const TNode& node,
  364. const TSetOptions& options)
  365. {
  366. return AddRequest<TVoidResponseParser>(
  367. "set",
  368. SerializeParamsForSet(transaction, Context_.Config->Prefix, path, options),
  369. node);
  370. }
  371. TFuture<TNode::TListType> THttpRawBatchRequest::List(
  372. const TTransactionId& transaction,
  373. const TYPath& path,
  374. const TListOptions& options)
  375. {
  376. return AddRequest<TListResponseParser>(
  377. "list",
  378. SerializeParamsForList(transaction, Context_.Config->Prefix, path, options),
  379. Nothing());
  380. }
  381. TFuture<TNodeId> THttpRawBatchRequest::Copy(
  382. const TTransactionId& transaction,
  383. const TYPath& sourcePath,
  384. const TYPath& destinationPath,
  385. const TCopyOptions& options)
  386. {
  387. return AddRequest<TGuidResponseParser>(
  388. "copy",
  389. SerializeParamsForCopy(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options),
  390. Nothing());
  391. }
  392. TFuture<TNodeId> THttpRawBatchRequest::Move(
  393. const TTransactionId& transaction,
  394. const TYPath& sourcePath,
  395. const TYPath& destinationPath,
  396. const TMoveOptions& options)
  397. {
  398. return AddRequest<TGuidResponseParser>(
  399. "move",
  400. SerializeParamsForMove(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options),
  401. Nothing());
  402. }
  403. TFuture<TNodeId> THttpRawBatchRequest::Link(
  404. const TTransactionId& transaction,
  405. const TYPath& targetPath,
  406. const TYPath& linkPath,
  407. const TLinkOptions& options)
  408. {
  409. return AddRequest<TGuidResponseParser>(
  410. "link",
  411. SerializeParamsForLink(transaction, Context_.Config->Prefix, targetPath, linkPath, options),
  412. Nothing());
  413. }
  414. TFuture<TLockId> THttpRawBatchRequest::Lock(
  415. const TTransactionId& transaction,
  416. const TYPath& path,
  417. ELockMode mode,
  418. const TLockOptions& options)
  419. {
  420. return AddRequest<TGuidResponseParser>(
  421. "lock",
  422. SerializeParamsForLock(transaction, Context_.Config->Prefix, path, mode, options),
  423. Nothing());
  424. }
  425. TFuture<void> THttpRawBatchRequest::Unlock(
  426. const TTransactionId& transaction,
  427. const TYPath& path,
  428. const TUnlockOptions& options)
  429. {
  430. return AddRequest<TVoidResponseParser>(
  431. "unlock",
  432. SerializeParamsForUnlock(transaction, Context_.Config->Prefix, path, options),
  433. Nothing());
  434. }
  435. TFuture<TMaybe<TYPath>> THttpRawBatchRequest::GetFileFromCache(
  436. const TTransactionId& transactionId,
  437. const TString& md5Signature,
  438. const TYPath& cachePath,
  439. const TGetFileFromCacheOptions& options)
  440. {
  441. return AddRequest<TGetFileFromCacheParser>(
  442. "get_file_from_cache",
  443. SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options),
  444. Nothing());
  445. }
  446. TFuture<TYPath> THttpRawBatchRequest::PutFileToCache(
  447. const TTransactionId& transactionId,
  448. const TYPath& filePath,
  449. const TString& md5Signature,
  450. const TYPath& cachePath,
  451. const TPutFileToCacheOptions& options)
  452. {
  453. return AddRequest<TYPathParser>(
  454. "put_file_to_cache",
  455. SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options),
  456. Nothing());
  457. }
  458. TFuture<TCheckPermissionResponse> THttpRawBatchRequest::CheckPermission(
  459. const TString& user,
  460. EPermission permission,
  461. const TYPath& path,
  462. const TCheckPermissionOptions& options)
  463. {
  464. return AddRequest<TCheckPermissionParser>(
  465. "check_permission",
  466. SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options),
  467. Nothing());
  468. }
  469. TFuture<TOperationAttributes> THttpRawBatchRequest::GetOperation(
  470. const TOperationId& operationId,
  471. const TGetOperationOptions& options)
  472. {
  473. return AddRequest<TGetOperationResponseParser>(
  474. "get_operation",
  475. SerializeParamsForGetOperation(operationId, options),
  476. Nothing());
  477. }
  478. TFuture<void> THttpRawBatchRequest::AbortOperation(const TOperationId& operationId)
  479. {
  480. return AddRequest<TVoidResponseParser>(
  481. "abort_op",
  482. SerializeParamsForAbortOperation(operationId),
  483. Nothing());
  484. }
  485. TFuture<void> THttpRawBatchRequest::CompleteOperation(const TOperationId& operationId)
  486. {
  487. return AddRequest<TVoidResponseParser>(
  488. "complete_op",
  489. SerializeParamsForCompleteOperation(operationId),
  490. Nothing());
  491. }
  492. TFuture<void> THttpRawBatchRequest::SuspendOperation(
  493. const TOperationId& operationId,
  494. const TSuspendOperationOptions& options)
  495. {
  496. return AddRequest<TVoidResponseParser>(
  497. "suspend_operation",
  498. SerializeParamsForSuspendOperation(operationId, options),
  499. Nothing());
  500. }
  501. TFuture<void> THttpRawBatchRequest::ResumeOperation(
  502. const TOperationId& operationId,
  503. const TResumeOperationOptions& options)
  504. {
  505. return AddRequest<TVoidResponseParser>(
  506. "resume_operation",
  507. SerializeParamsForResumeOperation(operationId, options),
  508. Nothing());
  509. }
  510. TFuture<void> THttpRawBatchRequest::UpdateOperationParameters(
  511. const TOperationId& operationId,
  512. const TUpdateOperationParametersOptions& options)
  513. {
  514. return AddRequest<TVoidResponseParser>(
  515. "update_op_parameters",
  516. SerializeParamsForUpdateOperationParameters(operationId, options),
  517. Nothing());
  518. }
  519. TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path)
  520. {
  521. TRichYPath result = path;
  522. // Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath.
  523. if (!result.Path_.StartsWith("<")) {
  524. result.Path_ = AddPathPrefix(result.Path_, Context_.Config->Prefix);
  525. }
  526. if (result.Path_.find_first_of("<>{}[]") != TString::npos) {
  527. return AddRequest<TCanonizeYPathResponseParser>(
  528. "parse_ypath",
  529. SerializeParamsForParseYPath(result),
  530. Nothing(),
  531. MakeIntrusive<TCanonizeYPathResponseParser>(Context_.Config->Prefix, result));
  532. }
  533. return NThreading::MakeFuture(result);
  534. }
  535. TFuture<TVector<TTableColumnarStatistics>> THttpRawBatchRequest::GetTableColumnarStatistics(
  536. const TTransactionId& transaction,
  537. const TVector<TRichYPath>& paths,
  538. const TGetTableColumnarStatisticsOptions& options)
  539. {
  540. return AddRequest<TTableColumnarStatisticsParser>(
  541. "get_table_columnar_statistics",
  542. SerializeParamsForGetTableColumnarStatistics(transaction, paths, options),
  543. Nothing());
  544. }
  545. TFuture<TMultiTablePartitions> THttpRawBatchRequest::GetTablePartitions(
  546. const TTransactionId& transaction,
  547. const TVector<TRichYPath>& paths,
  548. const TGetTablePartitionsOptions& options)
  549. {
  550. return AddRequest<TTablePartitionsParser>(
  551. "partition_tables",
  552. SerializeParamsForGetTablePartitions(transaction, paths, options),
  553. Nothing());
  554. }
  555. void THttpRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const
  556. {
  557. Y_ABORT_UNLESS(result);
  558. Y_ABORT_UNLESS(nextTry);
  559. *nextTry = TInstant();
  560. maxSize = Min(maxSize, BatchItemList_.size());
  561. *result = TNode::CreateList();
  562. for (size_t i = 0; i < maxSize; ++i) {
  563. YT_LOG_DEBUG("ExecuteBatch preparing: %v",
  564. RequestInfo(BatchItemList_[i].Parameters));
  565. result->Add(BatchItemList_[i].Parameters);
  566. if (BatchItemList_[i].NextTry > *nextTry) {
  567. *nextTry = BatchItemList_[i].NextTry;
  568. }
  569. }
  570. }
  571. void THttpRawBatchRequest::ParseResponse(
  572. const TResponseInfo& requestResult,
  573. const IRequestRetryPolicyPtr& retryPolicy,
  574. TInstant now)
  575. {
  576. TNode node = NodeFromYsonString(requestResult.Response);
  577. return ParseResponse(node, requestResult.RequestId, retryPolicy, now);
  578. }
  579. void THttpRawBatchRequest::ParseResponse(
  580. TNode node,
  581. const TString& requestId,
  582. const IRequestRetryPolicyPtr& retryPolicy,
  583. TInstant now)
  584. {
  585. EnsureType(node, TNode::List);
  586. auto& responseList = node.AsList();
  587. const auto size = responseList.size();
  588. Y_ENSURE(size <= BatchItemList_.size(),
  589. "Size of server response exceeds size of batch request;"
  590. " size of batch: " << BatchItemList_.size() <<
  591. " size of server response: " << size << '.');
  592. for (size_t i = 0; i != size; ++i) {
  593. try {
  594. EnsureType(responseList[i], TNode::Map);
  595. auto& responseNode = responseList[i].AsMap();
  596. const auto outputIt = responseNode.find("output");
  597. if (outputIt != responseNode.end()) {
  598. BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second));
  599. } else {
  600. const auto errorIt = responseNode.find("error");
  601. if (errorIt == responseNode.end()) {
  602. BatchItemList_[i].ResponseParser->SetResponse(Nothing());
  603. } else {
  604. TErrorResponse error(400, requestId);
  605. error.SetError(TYtError(errorIt->second));
  606. if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) {
  607. YT_LOG_INFO(
  608. "Batch subrequest (%s) failed, will retry, error: %s",
  609. RequestInfo(BatchItemList_[i].Parameters),
  610. error.what());
  611. AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
  612. } else {
  613. YT_LOG_ERROR(
  614. "Batch subrequest (%s) failed, error: %s",
  615. RequestInfo(BatchItemList_[i].Parameters),
  616. error.what());
  617. BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error));
  618. }
  619. }
  620. }
  621. } catch (const std::exception& e) {
  622. // We don't expect other exceptions, so we don't catch (...)
  623. BatchItemList_[i].ResponseParser->SetException(std::current_exception());
  624. }
  625. }
  626. BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size);
  627. }
  628. void THttpRawBatchRequest::SetErrorResult(std::exception_ptr e) const
  629. {
  630. for (const auto& batchItem : BatchItemList_) {
  631. batchItem.ResponseParser->SetException(e);
  632. }
  633. }
  634. size_t THttpRawBatchRequest::BatchSize() const
  635. {
  636. return BatchItemList_.size();
  637. }
  638. ////////////////////////////////////////////////////////////////////////////////
  639. } // namespace NYT::NDetail::NRawClient