raw_client.cpp 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946
  1. #include "raw_client.h"
  2. #include "raw_requests.h"
  3. #include "rpc_parameters_serialization.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/http/helpers.h>
  7. #include <yt/cpp/mapreduce/http/http.h>
  8. #include <yt/cpp/mapreduce/http/requests.h>
  9. #include <yt/cpp/mapreduce/http/retry_request.h>
  10. #include <yt/cpp/mapreduce/interface/fluent.h>
  11. #include <yt/cpp/mapreduce/interface/fwd.h>
  12. #include <yt/cpp/mapreduce/interface/operation.h>
  13. #include <yt/cpp/mapreduce/interface/tvm.h>
  14. #include <yt/cpp/mapreduce/io/helpers.h>
  15. #include <library/cpp/yson/node/node_io.h>
  16. #include <library/cpp/yt/yson_string/string.h>
  17. namespace NYT::NDetail {
  18. ////////////////////////////////////////////////////////////////////////////////
  19. THttpRawClient::THttpRawClient(const TClientContext& context)
  20. : Context_(context)
  21. { }
  22. TNode THttpRawClient::Get(
  23. const TTransactionId& transactionId,
  24. const TYPath& path,
  25. const TGetOptions& options)
  26. {
  27. TMutationId mutationId;
  28. THttpHeader header("GET", "get");
  29. header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options));
  30. return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  31. }
  32. TNode THttpRawClient::TryGet(
  33. const TTransactionId& transactionId,
  34. const TYPath& path,
  35. const TGetOptions& options)
  36. {
  37. try {
  38. return Get(transactionId, path, options);
  39. } catch (const TErrorResponse& error) {
  40. if (!error.IsResolveError()) {
  41. throw;
  42. }
  43. return {};
  44. }
  45. }
  46. void THttpRawClient::Set(
  47. TMutationId& mutationId,
  48. const TTransactionId& transactionId,
  49. const TYPath& path,
  50. const TNode& value,
  51. const TSetOptions& options)
  52. {
  53. THttpHeader header("PUT", "set");
  54. header.AddMutationId();
  55. header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options));
  56. auto body = NodeToYsonString(value);
  57. RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
  58. }
  59. bool THttpRawClient::Exists(
  60. const TTransactionId& transactionId,
  61. const TYPath& path,
  62. const TExistsOptions& options)
  63. {
  64. TMutationId mutationId;
  65. THttpHeader header("GET", "exists");
  66. header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options));
  67. return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  68. }
  69. void THttpRawClient::MultisetAttributes(
  70. TMutationId& mutationId,
  71. const TTransactionId& transactionId,
  72. const TYPath& path,
  73. const TNode::TMapType& value,
  74. const TMultisetAttributesOptions& options)
  75. {
  76. THttpHeader header("PUT", "api/v4/multiset_attributes", false);
  77. header.AddMutationId();
  78. header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options));
  79. auto body = NodeToYsonString(value);
  80. RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
  81. }
  82. TNodeId THttpRawClient::Create(
  83. TMutationId& mutationId,
  84. const TTransactionId& transactionId,
  85. const TYPath& path,
  86. const ENodeType& type,
  87. const TCreateOptions& options)
  88. {
  89. THttpHeader header("POST", "create");
  90. header.AddMutationId();
  91. header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options));
  92. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  93. }
  94. TNodeId THttpRawClient::CopyWithoutRetries(
  95. const TTransactionId& transactionId,
  96. const TYPath& sourcePath,
  97. const TYPath& destinationPath,
  98. const TCopyOptions& options)
  99. {
  100. TMutationId mutationId;
  101. THttpHeader header("POST", "copy");
  102. header.AddMutationId();
  103. header.MergeParameters(NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
  104. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  105. }
  106. TNodeId THttpRawClient::CopyInsideMasterCell(
  107. TMutationId& mutationId,
  108. const TTransactionId& transactionId,
  109. const TYPath& sourcePath,
  110. const TYPath& destinationPath,
  111. const TCopyOptions& options)
  112. {
  113. THttpHeader header("POST", "copy");
  114. header.AddMutationId();
  115. auto params = NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
  116. // Make cross cell copying disable.
  117. params["enable_cross_cell_copying"] = false;
  118. header.MergeParameters(params);
  119. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  120. }
  121. TNodeId THttpRawClient::MoveWithoutRetries(
  122. const TTransactionId& transactionId,
  123. const TYPath& sourcePath,
  124. const TYPath& destinationPath,
  125. const TMoveOptions& options)
  126. {
  127. TMutationId mutationId;
  128. THttpHeader header("POST", "move");
  129. header.AddMutationId();
  130. header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
  131. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  132. }
  133. TNodeId THttpRawClient::MoveInsideMasterCell(
  134. TMutationId& mutationId,
  135. const TTransactionId& transactionId,
  136. const TYPath& sourcePath,
  137. const TYPath& destinationPath,
  138. const TMoveOptions& options)
  139. {
  140. THttpHeader header("POST", "move");
  141. header.AddMutationId();
  142. auto params = NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
  143. // Make cross cell copying disable.
  144. params["enable_cross_cell_copying"] = false;
  145. header.MergeParameters(params);
  146. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  147. }
  148. void THttpRawClient::Remove(
  149. TMutationId& mutationId,
  150. const TTransactionId& transactionId,
  151. const TYPath& path,
  152. const TRemoveOptions& options)
  153. {
  154. THttpHeader header("POST", "remove");
  155. header.AddMutationId();
  156. header.MergeParameters(NRawClient::SerializeParamsForRemove(transactionId, Context_.Config->Prefix, path, options));
  157. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  158. }
  159. TNode::TListType THttpRawClient::List(
  160. const TTransactionId& transactionId,
  161. const TYPath& path,
  162. const TListOptions& options)
  163. {
  164. TMutationId mutationId;
  165. THttpHeader header("GET", "list");
  166. TYPath updatedPath = AddPathPrefix(path, Context_.Config->Prefix);
  167. // Translate "//" to "/"
  168. // Translate "//some/constom/prefix/from/config/" to "//some/constom/prefix/from/config"
  169. if (path.empty() && updatedPath.EndsWith('/')) {
  170. updatedPath.pop_back();
  171. }
  172. header.MergeParameters(NRawClient::SerializeParamsForList(transactionId, Context_.Config->Prefix, updatedPath, options));
  173. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  174. return NodeFromYsonString(responseInfo->GetResponse()).AsList();
  175. }
  176. TNodeId THttpRawClient::Link(
  177. TMutationId& mutationId,
  178. const TTransactionId& transactionId,
  179. const TYPath& targetPath,
  180. const TYPath& linkPath,
  181. const TLinkOptions& options)
  182. {
  183. THttpHeader header("POST", "link");
  184. header.AddMutationId();
  185. header.MergeParameters(NRawClient::SerializeParamsForLink(transactionId, Context_.Config->Prefix, targetPath, linkPath, options));
  186. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  187. }
  188. TLockId THttpRawClient::Lock(
  189. TMutationId& mutationId,
  190. const TTransactionId& transactionId,
  191. const TYPath& path,
  192. ELockMode mode,
  193. const TLockOptions& options)
  194. {
  195. THttpHeader header("POST", "lock");
  196. header.AddMutationId();
  197. header.MergeParameters(NRawClient::SerializeParamsForLock(transactionId, Context_.Config->Prefix, path, mode, options));
  198. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  199. }
  200. void THttpRawClient::Unlock(
  201. TMutationId& mutationId,
  202. const TTransactionId& transactionId,
  203. const TYPath& path,
  204. const TUnlockOptions& options)
  205. {
  206. THttpHeader header("POST", "unlock");
  207. header.AddMutationId();
  208. header.MergeParameters(NRawClient::SerializeParamsForUnlock(transactionId, Context_.Config->Prefix, path, options));
  209. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  210. }
  211. void THttpRawClient::Concatenate(
  212. const TTransactionId& transactionId,
  213. const TVector<TRichYPath>& sourcePaths,
  214. const TRichYPath& destinationPath,
  215. const TConcatenateOptions& options)
  216. {
  217. TMutationId mutationId;
  218. THttpHeader header("POST", "concatenate");
  219. header.AddMutationId();
  220. header.MergeParameters(NRawClient::SerializeParamsForConcatenate(transactionId, Context_.Config->Prefix, sourcePaths, destinationPath, options));
  221. TRequestConfig config;
  222. config.IsHeavy = true;
  223. RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
  224. }
  225. TTransactionId THttpRawClient::StartTransaction(
  226. TMutationId& mutationId,
  227. const TTransactionId& parentTransactionId,
  228. const TStartTransactionOptions& options)
  229. {
  230. THttpHeader header("POST", "start_tx");
  231. header.AddMutationId();
  232. header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, Context_.Config->TxTimeout, options));
  233. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  234. }
  235. void THttpRawClient::PingTransaction(const TTransactionId& transactionId)
  236. {
  237. TMutationId mutationId;
  238. THttpHeader header("POST", "ping_tx");
  239. header.MergeParameters(NRawClient::SerializeParamsForPingTx(transactionId));
  240. TRequestConfig requestConfig;
  241. requestConfig.HttpConfig = NHttpClient::THttpConfig{
  242. .SocketTimeout = Context_.Config->PingTimeout
  243. };
  244. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  245. }
  246. void THttpRawClient::AbortTransaction(
  247. TMutationId& mutationId,
  248. const TTransactionId& transactionId)
  249. {
  250. THttpHeader header("POST", "abort_tx");
  251. header.AddMutationId();
  252. header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId));
  253. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  254. }
  255. void THttpRawClient::CommitTransaction(
  256. TMutationId& mutationId,
  257. const TTransactionId& transactionId)
  258. {
  259. THttpHeader header("POST", "commit_tx");
  260. header.AddMutationId();
  261. header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId));
  262. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  263. }
  264. TOperationId THttpRawClient::StartOperation(
  265. TMutationId& mutationId,
  266. const TTransactionId& transactionId,
  267. EOperationType type,
  268. const TNode& spec)
  269. {
  270. THttpHeader header("POST", "start_op");
  271. header.AddMutationId();
  272. header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec));
  273. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  274. }
  275. TOperationAttributes THttpRawClient::GetOperation(
  276. const TOperationId& operationId,
  277. const TGetOperationOptions& options)
  278. {
  279. TMutationId mutationId;
  280. THttpHeader header("GET", "get_operation");
  281. header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options));
  282. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  283. return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
  284. }
  285. TOperationAttributes THttpRawClient::GetOperation(
  286. const TString& alias,
  287. const TGetOperationOptions& options)
  288. {
  289. TMutationId mutationId;
  290. THttpHeader header("GET", "get_operation");
  291. header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options));
  292. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  293. return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
  294. }
  295. void THttpRawClient::AbortOperation(
  296. TMutationId& mutationId,
  297. const TOperationId& operationId)
  298. {
  299. THttpHeader header("POST", "abort_op");
  300. header.AddMutationId();
  301. header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId));
  302. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  303. }
  304. void THttpRawClient::CompleteOperation(
  305. TMutationId& mutationId,
  306. const TOperationId& operationId)
  307. {
  308. THttpHeader header("POST", "complete_op");
  309. header.AddMutationId();
  310. header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId));
  311. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  312. }
  313. void THttpRawClient::SuspendOperation(
  314. TMutationId& mutationId,
  315. const TOperationId& operationId,
  316. const TSuspendOperationOptions& options)
  317. {
  318. THttpHeader header("POST", "suspend_op");
  319. header.AddMutationId();
  320. header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options));
  321. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  322. }
  323. void THttpRawClient::ResumeOperation(
  324. TMutationId& mutationId,
  325. const TOperationId& operationId,
  326. const TResumeOperationOptions& options)
  327. {
  328. THttpHeader header("POST", "resume_op");
  329. header.AddMutationId();
  330. header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options));
  331. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  332. }
  333. template <typename TKey>
  334. static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
  335. {
  336. THashMap<TKey, i64> counts;
  337. for (const auto& entry : countsNode.AsMap()) {
  338. counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
  339. }
  340. return counts;
  341. }
  342. TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options)
  343. {
  344. TMutationId mutationId;
  345. THttpHeader header("GET", "list_operations");
  346. header.MergeParameters(NRawClient::SerializeParamsForListOperations(options));
  347. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  348. auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
  349. const auto& operationNodesList = resultNode["operations"].AsList();
  350. TListOperationsResult result;
  351. result.Operations.reserve(operationNodesList.size());
  352. for (const auto& operationNode : operationNodesList) {
  353. result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode));
  354. }
  355. if (resultNode.HasKey("pool_counts")) {
  356. result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
  357. }
  358. if (resultNode.HasKey("user_counts")) {
  359. result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
  360. }
  361. if (resultNode.HasKey("type_counts")) {
  362. result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
  363. }
  364. if (resultNode.HasKey("state_counts")) {
  365. result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
  366. }
  367. if (resultNode.HasKey("failed_jobs_count")) {
  368. result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
  369. }
  370. result.Incomplete = resultNode["incomplete"].AsBool();
  371. return result;
  372. }
  373. void THttpRawClient::UpdateOperationParameters(
  374. const TOperationId& operationId,
  375. const TUpdateOperationParametersOptions& options)
  376. {
  377. TMutationId mutationId;
  378. THttpHeader header("POST", "update_op_parameters");
  379. header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options));
  380. RequestWithoutRetry(Context_, mutationId, header);
  381. }
  382. NYson::TYsonString THttpRawClient::GetJob(
  383. const TOperationId& operationId,
  384. const TJobId& jobId,
  385. const TGetJobOptions& options)
  386. {
  387. TMutationId mutationId;
  388. THttpHeader header("GET", "get_job");
  389. header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options));
  390. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  391. return NYson::TYsonString(responseInfo->GetResponse());
  392. }
  393. TListJobsResult THttpRawClient::ListJobs(
  394. const TOperationId& operationId,
  395. const TListJobsOptions& options)
  396. {
  397. TMutationId mutationId;
  398. THttpHeader header("GET", "list_jobs");
  399. header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options));
  400. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  401. auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
  402. const auto& jobNodesList = resultNode["jobs"].AsList();
  403. TListJobsResult result;
  404. result.Jobs.reserve(jobNodesList.size());
  405. for (const auto& jobNode : jobNodesList) {
  406. result.Jobs.push_back(NRawClient::ParseJobAttributes(jobNode));
  407. }
  408. if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
  409. result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
  410. }
  411. if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
  412. result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64();
  413. }
  414. if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
  415. result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
  416. }
  417. return result;
  418. }
  419. IFileReaderPtr THttpRawClient::GetJobInput(
  420. const TJobId& jobId,
  421. const TGetJobInputOptions& /*options*/)
  422. {
  423. TMutationId mutationId;
  424. THttpHeader header("GET", "get_job_input");
  425. header.AddParameter("job_id", GetGuidAsString(jobId));
  426. TRequestConfig config;
  427. config.IsHeavy = true;
  428. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  429. return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  430. }
  431. IFileReaderPtr THttpRawClient::GetJobFailContext(
  432. const TOperationId& operationId,
  433. const TJobId& jobId,
  434. const TGetJobFailContextOptions& /*options*/)
  435. {
  436. TMutationId mutationId;
  437. THttpHeader header("GET", "get_job_fail_context");
  438. header.AddOperationId(operationId);
  439. header.AddParameter("job_id", GetGuidAsString(jobId));
  440. TRequestConfig config;
  441. config.IsHeavy = true;
  442. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  443. return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  444. }
  445. IFileReaderPtr THttpRawClient::GetJobStderr(
  446. const TOperationId& operationId,
  447. const TJobId& jobId,
  448. const TGetJobStderrOptions& /*options*/)
  449. {
  450. TMutationId mutationId;
  451. THttpHeader header("GET", "get_job_stderr");
  452. header.AddOperationId(operationId);
  453. header.AddParameter("job_id", GetGuidAsString(jobId));
  454. TRequestConfig config;
  455. config.IsHeavy = true;
  456. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  457. return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  458. }
  459. TJobTraceEvent ParseJobTraceEvent(const TNode& node)
  460. {
  461. const auto& mapNode = node.AsMap();
  462. TJobTraceEvent result;
  463. if (auto idNode = mapNode.FindPtr("operation_id")) {
  464. result.OperationId = GetGuid(idNode->AsString());
  465. }
  466. if (auto idNode = mapNode.FindPtr("job_id")) {
  467. result.JobId = GetGuid(idNode->AsString());
  468. }
  469. if (auto idNode = mapNode.FindPtr("trace_id")) {
  470. result.TraceId = GetGuid(idNode->AsString());
  471. }
  472. if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
  473. result.EventIndex = eventIndexNode->AsInt64();
  474. }
  475. if (auto eventNode = mapNode.FindPtr("event")) {
  476. result.Event = eventNode->AsString();
  477. }
  478. if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
  479. result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());;
  480. }
  481. return result;
  482. }
  483. std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace(
  484. const TOperationId& operationId,
  485. const TGetJobTraceOptions& options)
  486. {
  487. TMutationId mutationId;
  488. THttpHeader header("GET", "get_job_trace");
  489. header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, options));
  490. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  491. auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
  492. const auto& traceEventNodesList = resultNode.AsList();
  493. std::vector<TJobTraceEvent> result;
  494. result.reserve(traceEventNodesList.size());
  495. for (const auto& traceEventNode : traceEventNodesList) {
  496. result.push_back(ParseJobTraceEvent(traceEventNode));
  497. }
  498. return result;
  499. }
  500. std::unique_ptr<IInputStream> THttpRawClient::ReadFile(
  501. const TTransactionId& transactionId,
  502. const TRichYPath& path,
  503. const TFileReaderOptions& options)
  504. {
  505. TMutationId mutationId;
  506. THttpHeader header("GET", GetReadFileCommand(Context_.Config->ApiVersion));
  507. header.AddTransactionId(transactionId);
  508. header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
  509. header.MergeParameters(FormIORequestParameters(path, options));
  510. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  511. TRequestConfig config;
  512. config.IsHeavy = true;
  513. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  514. return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  515. }
  516. TMaybe<TYPath> THttpRawClient::GetFileFromCache(
  517. const TTransactionId& transactionId,
  518. const TString& md5Signature,
  519. const TYPath& cachePath,
  520. const TGetFileFromCacheOptions& options)
  521. {
  522. TMutationId mutationId;
  523. THttpHeader header("GET", "get_file_from_cache");
  524. header.MergeParameters(NRawClient::SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options));
  525. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  526. auto resultNode = NodeFromYsonString(responseInfo->GetResponse()).AsString();
  527. return resultNode.empty() ? Nothing() : TMaybe<TYPath>(resultNode);
  528. }
  529. TYPath THttpRawClient::PutFileToCache(
  530. const TTransactionId& transactionId,
  531. const TYPath& filePath,
  532. const TString& md5Signature,
  533. const TYPath& cachePath,
  534. const TPutFileToCacheOptions& options)
  535. {
  536. TMutationId mutationId;
  537. THttpHeader header("POST", "put_file_to_cache");
  538. header.MergeParameters(NRawClient::SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options));
  539. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  540. return NodeFromYsonString(responseInfo->GetResponse()).AsString();
  541. }
  542. void THttpRawClient::MountTable(
  543. TMutationId& mutationId,
  544. const TYPath& path,
  545. const TMountTableOptions& options)
  546. {
  547. THttpHeader header("POST", "mount_table");
  548. header.AddMutationId();
  549. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  550. if (options.CellId_) {
  551. header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
  552. }
  553. header.AddParameter("freeze", options.Freeze_);
  554. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  555. }
  556. void THttpRawClient::UnmountTable(
  557. TMutationId& mutationId,
  558. const TYPath& path,
  559. const TUnmountTableOptions& options)
  560. {
  561. THttpHeader header("POST", "unmount_table");
  562. header.AddMutationId();
  563. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  564. header.AddParameter("force", options.Force_);
  565. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  566. }
  567. void THttpRawClient::RemountTable(
  568. TMutationId& mutationId,
  569. const TYPath& path,
  570. const TRemountTableOptions& options)
  571. {
  572. THttpHeader header("POST", "remount_table");
  573. header.AddMutationId();
  574. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  575. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  576. }
  577. void THttpRawClient::ReshardTableByPivotKeys(
  578. TMutationId& mutationId,
  579. const TYPath& path,
  580. const TVector<TKey>& keys,
  581. const TReshardTableOptions& options)
  582. {
  583. THttpHeader header("POST", "reshard_table");
  584. header.AddMutationId();
  585. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  586. header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
  587. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  588. }
  589. void THttpRawClient::ReshardTableByTabletCount(
  590. TMutationId& mutationId,
  591. const TYPath& path,
  592. i64 tabletCount,
  593. const TReshardTableOptions& options)
  594. {
  595. THttpHeader header("POST", "reshard_table");
  596. header.AddMutationId();
  597. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  598. header.AddParameter("tablet_count", tabletCount);
  599. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  600. }
  601. void THttpRawClient::InsertRows(
  602. const TYPath& path,
  603. const TNode::TListType& rows,
  604. const TInsertRowsOptions& options)
  605. {
  606. TMutationId mutationId;
  607. THttpHeader header("PUT", "insert_rows");
  608. header.SetInputFormat(TFormat::YsonBinary());
  609. header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
  610. auto body = NodeListToYsonString(rows);
  611. TRequestConfig config;
  612. config.IsHeavy = true;
  613. RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
  614. }
  615. void THttpRawClient::TrimRows(
  616. const TYPath& path,
  617. i64 tabletIndex,
  618. i64 rowCount,
  619. const TTrimRowsOptions& options)
  620. {
  621. TMutationId mutationId;
  622. THttpHeader header("POST", "trim_rows");
  623. header.AddParameter("trimmed_row_count", rowCount);
  624. header.AddParameter("tablet_index", tabletIndex);
  625. header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
  626. TRequestConfig config;
  627. config.IsHeavy = true;
  628. RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
  629. }
  630. TNode::TListType THttpRawClient::LookupRows(
  631. const TYPath& path,
  632. const TNode::TListType& keys,
  633. const TLookupRowsOptions& options)
  634. {
  635. TMutationId mutationId;
  636. THttpHeader header("PUT", "lookup_rows");
  637. header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion));
  638. header.SetInputFormat(TFormat::YsonBinary());
  639. header.SetOutputFormat(TFormat::YsonBinary());
  640. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  641. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  642. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  643. })
  644. .Item("keep_missing_rows").Value(options.KeepMissingRows_)
  645. .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
  646. fluent.Item("versioned").Value(*options.Versioned_);
  647. })
  648. .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
  649. fluent.Item("column_names").Value(*options.Columns_);
  650. })
  651. .EndMap());
  652. auto body = NodeListToYsonString(keys);
  653. TRequestConfig config;
  654. config.IsHeavy = true;
  655. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, body, config);
  656. return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
  657. }
  658. TNode::TListType THttpRawClient::SelectRows(
  659. const TString& query,
  660. const TSelectRowsOptions& options)
  661. {
  662. TMutationId mutationId;
  663. THttpHeader header("GET", "select_rows");
  664. header.SetInputFormat(TFormat::YsonBinary());
  665. header.SetOutputFormat(TFormat::YsonBinary());
  666. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  667. .Item("query").Value(query)
  668. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  669. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  670. })
  671. .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  672. fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
  673. })
  674. .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  675. fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
  676. })
  677. .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
  678. .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
  679. .Item("verbose_logging").Value(options.VerboseLogging_)
  680. .Item("enable_code_cache").Value(options.EnableCodeCache_)
  681. .EndMap());
  682. TRequestConfig config;
  683. config.IsHeavy = true;
  684. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  685. return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
  686. }
  687. std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
  688. const TTransactionId& transactionId,
  689. const TRichYPath& path,
  690. const TMaybe<TFormat>& format,
  691. const TTableReaderOptions& options)
  692. {
  693. TMutationId mutationId;
  694. THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
  695. header.SetOutputFormat(format);
  696. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  697. header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, options));
  698. header.MergeParameters(FormIORequestParameters(path, options));
  699. TRequestConfig config;
  700. config.IsHeavy = true;
  701. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  702. return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  703. }
  704. std::unique_ptr<IInputStream> THttpRawClient::ReadBlobTable(
  705. const TTransactionId& transactionId,
  706. const TRichYPath& path,
  707. const TKey& key,
  708. const TBlobTableReaderOptions& options)
  709. {
  710. TMutationId mutationId;
  711. THttpHeader header("GET", "read_blob_table");
  712. header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
  713. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  714. header.MergeParameters(NRawClient::SerializeParamsForReadBlobTable(transactionId, path, key, options));
  715. TRequestConfig config;
  716. config.IsHeavy = true;
  717. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  718. return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  719. }
  720. void THttpRawClient::AlterTable(
  721. TMutationId& mutationId,
  722. const TTransactionId& transactionId,
  723. const TYPath& path,
  724. const TAlterTableOptions& options)
  725. {
  726. THttpHeader header("POST", "alter_table");
  727. header.AddMutationId();
  728. header.MergeParameters(NRawClient::SerializeParamsForAlterTable(transactionId, Context_.Config->Prefix, path, options));
  729. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  730. }
  731. void THttpRawClient::AlterTableReplica(
  732. TMutationId& mutationId,
  733. const TReplicaId& replicaId,
  734. const TAlterTableReplicaOptions& options)
  735. {
  736. THttpHeader header("POST", "alter_table_replica");
  737. header.AddMutationId();
  738. header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options));
  739. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  740. }
  741. void THttpRawClient::DeleteRows(
  742. const TYPath& path,
  743. const TNode::TListType& keys,
  744. const TDeleteRowsOptions& options)
  745. {
  746. TMutationId mutationId;
  747. THttpHeader header("PUT", "delete_rows");
  748. header.SetInputFormat(TFormat::YsonBinary());
  749. header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(Context_.Config->Prefix, path, options));
  750. auto body = NodeListToYsonString(keys);
  751. TRequestConfig config;
  752. config.IsHeavy = true;
  753. RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
  754. }
  755. void THttpRawClient::FreezeTable(
  756. const TYPath& path,
  757. const TFreezeTableOptions& options)
  758. {
  759. TMutationId mutationId;
  760. THttpHeader header("POST", "freeze_table");
  761. header.MergeParameters(NRawClient::SerializeParamsForFreezeTable(Context_.Config->Prefix, path, options));
  762. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  763. }
  764. void THttpRawClient::UnfreezeTable(
  765. const TYPath& path,
  766. const TUnfreezeTableOptions& options)
  767. {
  768. TMutationId mutationId;
  769. THttpHeader header("POST", "unfreeze_table");
  770. header.MergeParameters(NRawClient::SerializeParamsForUnfreezeTable(Context_.Config->Prefix, path, options));
  771. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  772. }
  773. TCheckPermissionResponse THttpRawClient::CheckPermission(
  774. const TString& user,
  775. EPermission permission,
  776. const TYPath& path,
  777. const TCheckPermissionOptions& options)
  778. {
  779. TMutationId mutationId;
  780. THttpHeader header("GET", "check_permission");
  781. header.MergeParameters(NRawClient::SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options));
  782. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  783. return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo->GetResponse()));
  784. }
  785. TVector<TTabletInfo> THttpRawClient::GetTabletInfos(
  786. const TYPath& path,
  787. const TVector<int>& tabletIndexes,
  788. const TGetTabletInfosOptions& options)
  789. {
  790. TMutationId mutationId;
  791. THttpHeader header("POST", "api/v4/get_tablet_infos", /*isApi*/ false);
  792. header.MergeParameters(NRawClient::SerializeParamsForGetTabletInfos(Context_.Config->Prefix, path, tabletIndexes, options));
  793. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  794. TVector<TTabletInfo> result;
  795. Deserialize(result, *NodeFromYsonString(responseInfo->GetResponse()).AsMap().FindPtr("tablets"));
  796. return result;
  797. }
  798. TVector<TTableColumnarStatistics> THttpRawClient::GetTableColumnarStatistics(
  799. const TTransactionId& transactionId,
  800. const TVector<TRichYPath>& paths,
  801. const TGetTableColumnarStatisticsOptions& options)
  802. {
  803. TMutationId mutationId;
  804. THttpHeader header("GET", "get_table_columnar_statistics");
  805. header.MergeParameters(NRawClient::SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options));
  806. TRequestConfig config;
  807. config.IsHeavy = true;
  808. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  809. TVector<TTableColumnarStatistics> result;
  810. Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
  811. return result;
  812. }
  813. TMultiTablePartitions THttpRawClient::GetTablePartitions(
  814. const TTransactionId& transactionId,
  815. const TVector<TRichYPath>& paths,
  816. const TGetTablePartitionsOptions& options)
  817. {
  818. TMutationId mutationId;
  819. THttpHeader header("GET", "partition_tables");
  820. header.MergeParameters(NRawClient::SerializeParamsForGetTablePartitions(transactionId, paths, options));
  821. TRequestConfig config;
  822. config.IsHeavy = true;
  823. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  824. TMultiTablePartitions result;
  825. Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
  826. return result;
  827. }
  828. ui64 THttpRawClient::GenerateTimestamp()
  829. {
  830. TMutationId mutationId;
  831. THttpHeader header("GET", "generate_timestamp");
  832. TRequestConfig config;
  833. config.IsHeavy = true;
  834. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  835. return NodeFromYsonString(responseInfo->GetResponse()).AsUint64();
  836. }
  837. IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest()
  838. {
  839. return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr);
  840. }
  841. IRawClientPtr THttpRawClient::Clone()
  842. {
  843. return ::MakeIntrusive<THttpRawClient>(Context_);
  844. }
  845. ////////////////////////////////////////////////////////////////////////////////
  846. } // namespace NYT::NDetail