raw_client.cpp 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908
  1. #include "raw_client.h"
  2. #include "raw_requests.h"
  3. #include "rpc_parameters_serialization.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/http/helpers.h>
  7. #include <yt/cpp/mapreduce/http/http.h>
  8. #include <yt/cpp/mapreduce/http/requests.h>
  9. #include <yt/cpp/mapreduce/http/retry_request.h>
  10. #include <yt/cpp/mapreduce/interface/fluent.h>
  11. #include <yt/cpp/mapreduce/interface/fwd.h>
  12. #include <yt/cpp/mapreduce/interface/operation.h>
  13. #include <yt/cpp/mapreduce/interface/tvm.h>
  14. #include <yt/cpp/mapreduce/io/helpers.h>
  15. #include <library/cpp/yson/node/node_io.h>
  16. #include <library/cpp/yt/yson_string/string.h>
  17. namespace NYT::NDetail {
  18. ////////////////////////////////////////////////////////////////////////////////
  19. THttpRawClient::THttpRawClient(const TClientContext& context)
  20. : Context_(context)
  21. { }
  22. TNode THttpRawClient::Get(
  23. const TTransactionId& transactionId,
  24. const TYPath& path,
  25. const TGetOptions& options)
  26. {
  27. TMutationId mutationId;
  28. THttpHeader header("GET", "get");
  29. header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options));
  30. return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  31. }
  32. TNode THttpRawClient::TryGet(
  33. const TTransactionId& transactionId,
  34. const TYPath& path,
  35. const TGetOptions& options)
  36. {
  37. try {
  38. return Get(transactionId, path, options);
  39. } catch (const TErrorResponse& error) {
  40. if (!error.IsResolveError()) {
  41. throw;
  42. }
  43. return {};
  44. }
  45. }
  46. void THttpRawClient::Set(
  47. TMutationId& mutationId,
  48. const TTransactionId& transactionId,
  49. const TYPath& path,
  50. const TNode& value,
  51. const TSetOptions& options)
  52. {
  53. THttpHeader header("PUT", "set");
  54. header.AddMutationId();
  55. header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options));
  56. auto body = NodeToYsonString(value);
  57. RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
  58. }
  59. bool THttpRawClient::Exists(
  60. const TTransactionId& transactionId,
  61. const TYPath& path,
  62. const TExistsOptions& options)
  63. {
  64. TMutationId mutationId;
  65. THttpHeader header("GET", "exists");
  66. header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options));
  67. return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  68. }
  69. void THttpRawClient::MultisetAttributes(
  70. TMutationId& mutationId,
  71. const TTransactionId& transactionId,
  72. const TYPath& path,
  73. const TNode::TMapType& value,
  74. const TMultisetAttributesOptions& options)
  75. {
  76. THttpHeader header("PUT", "api/v4/multiset_attributes", false);
  77. header.AddMutationId();
  78. header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options));
  79. auto body = NodeToYsonString(value);
  80. RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
  81. }
  82. TNodeId THttpRawClient::Create(
  83. TMutationId& mutationId,
  84. const TTransactionId& transactionId,
  85. const TYPath& path,
  86. const ENodeType& type,
  87. const TCreateOptions& options)
  88. {
  89. THttpHeader header("POST", "create");
  90. header.AddMutationId();
  91. header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options));
  92. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  93. }
  94. TNodeId THttpRawClient::CopyWithoutRetries(
  95. const TTransactionId& transactionId,
  96. const TYPath& sourcePath,
  97. const TYPath& destinationPath,
  98. const TCopyOptions& options)
  99. {
  100. TMutationId mutationId;
  101. THttpHeader header("POST", "copy");
  102. header.AddMutationId();
  103. header.MergeParameters(NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
  104. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  105. }
  106. TNodeId THttpRawClient::CopyInsideMasterCell(
  107. TMutationId& mutationId,
  108. const TTransactionId& transactionId,
  109. const TYPath& sourcePath,
  110. const TYPath& destinationPath,
  111. const TCopyOptions& options)
  112. {
  113. THttpHeader header("POST", "copy");
  114. header.AddMutationId();
  115. auto params = NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
  116. // Make cross cell copying disable.
  117. params["enable_cross_cell_copying"] = false;
  118. header.MergeParameters(params);
  119. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  120. }
  121. TNodeId THttpRawClient::MoveWithoutRetries(
  122. const TTransactionId& transactionId,
  123. const TYPath& sourcePath,
  124. const TYPath& destinationPath,
  125. const TMoveOptions& options)
  126. {
  127. TMutationId mutationId;
  128. THttpHeader header("POST", "move");
  129. header.AddMutationId();
  130. header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
  131. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  132. }
  133. TNodeId THttpRawClient::MoveInsideMasterCell(
  134. TMutationId& mutationId,
  135. const TTransactionId& transactionId,
  136. const TYPath& sourcePath,
  137. const TYPath& destinationPath,
  138. const TMoveOptions& options)
  139. {
  140. THttpHeader header("POST", "move");
  141. header.AddMutationId();
  142. auto params = NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
  143. // Make cross cell copying disable.
  144. params["enable_cross_cell_copying"] = false;
  145. header.MergeParameters(params);
  146. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  147. }
  148. void THttpRawClient::Remove(
  149. TMutationId& mutationId,
  150. const TTransactionId& transactionId,
  151. const TYPath& path,
  152. const TRemoveOptions& options)
  153. {
  154. THttpHeader header("POST", "remove");
  155. header.AddMutationId();
  156. header.MergeParameters(NRawClient::SerializeParamsForRemove(transactionId, Context_.Config->Prefix, path, options));
  157. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  158. }
  159. TNode::TListType THttpRawClient::List(
  160. const TTransactionId& transactionId,
  161. const TYPath& path,
  162. const TListOptions& options)
  163. {
  164. TMutationId mutationId;
  165. THttpHeader header("GET", "list");
  166. TYPath updatedPath = AddPathPrefix(path, Context_.Config->Prefix);
  167. // Translate "//" to "/"
  168. // Translate "//some/constom/prefix/from/config/" to "//some/constom/prefix/from/config"
  169. if (path.empty() && updatedPath.EndsWith('/')) {
  170. updatedPath.pop_back();
  171. }
  172. header.MergeParameters(NRawClient::SerializeParamsForList(transactionId, Context_.Config->Prefix, updatedPath, options));
  173. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  174. return NodeFromYsonString(responseInfo->GetResponse()).AsList();
  175. }
  176. TNodeId THttpRawClient::Link(
  177. TMutationId& mutationId,
  178. const TTransactionId& transactionId,
  179. const TYPath& targetPath,
  180. const TYPath& linkPath,
  181. const TLinkOptions& options)
  182. {
  183. THttpHeader header("POST", "link");
  184. header.AddMutationId();
  185. header.MergeParameters(NRawClient::SerializeParamsForLink(transactionId, Context_.Config->Prefix, targetPath, linkPath, options));
  186. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  187. }
  188. TLockId THttpRawClient::Lock(
  189. TMutationId& mutationId,
  190. const TTransactionId& transactionId,
  191. const TYPath& path,
  192. ELockMode mode,
  193. const TLockOptions& options)
  194. {
  195. THttpHeader header("POST", "lock");
  196. header.AddMutationId();
  197. header.MergeParameters(NRawClient::SerializeParamsForLock(transactionId, Context_.Config->Prefix, path, mode, options));
  198. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  199. }
  200. void THttpRawClient::Unlock(
  201. TMutationId& mutationId,
  202. const TTransactionId& transactionId,
  203. const TYPath& path,
  204. const TUnlockOptions& options)
  205. {
  206. THttpHeader header("POST", "unlock");
  207. header.AddMutationId();
  208. header.MergeParameters(NRawClient::SerializeParamsForUnlock(transactionId, Context_.Config->Prefix, path, options));
  209. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  210. }
  211. void THttpRawClient::Concatenate(
  212. const TTransactionId& transactionId,
  213. const TVector<TRichYPath>& sourcePaths,
  214. const TRichYPath& destinationPath,
  215. const TConcatenateOptions& options)
  216. {
  217. TMutationId mutationId;
  218. THttpHeader header("POST", "concatenate");
  219. header.AddMutationId();
  220. header.MergeParameters(NRawClient::SerializeParamsForConcatenate(transactionId, Context_.Config->Prefix, sourcePaths, destinationPath, options));
  221. TRequestConfig config;
  222. config.IsHeavy = true;
  223. RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
  224. }
  225. TTransactionId THttpRawClient::StartTransaction(
  226. TMutationId& mutationId,
  227. const TTransactionId& parentTransactionId,
  228. const TStartTransactionOptions& options)
  229. {
  230. THttpHeader header("POST", "start_tx");
  231. header.AddMutationId();
  232. header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, Context_.Config->TxTimeout, options));
  233. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  234. }
  235. void THttpRawClient::PingTransaction(const TTransactionId& transactionId)
  236. {
  237. TMutationId mutationId;
  238. THttpHeader header("POST", "ping_tx");
  239. header.MergeParameters(NRawClient::SerializeParamsForPingTx(transactionId));
  240. TRequestConfig requestConfig;
  241. requestConfig.HttpConfig = NHttpClient::THttpConfig{
  242. .SocketTimeout = Context_.Config->PingTimeout
  243. };
  244. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  245. }
  246. void THttpRawClient::AbortTransaction(
  247. TMutationId& mutationId,
  248. const TTransactionId& transactionId)
  249. {
  250. THttpHeader header("POST", "abort_tx");
  251. header.AddMutationId();
  252. header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId));
  253. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  254. }
  255. void THttpRawClient::CommitTransaction(
  256. TMutationId& mutationId,
  257. const TTransactionId& transactionId)
  258. {
  259. THttpHeader header("POST", "commit_tx");
  260. header.AddMutationId();
  261. header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId));
  262. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  263. }
  264. TOperationId THttpRawClient::StartOperation(
  265. TMutationId& mutationId,
  266. const TTransactionId& transactionId,
  267. EOperationType type,
  268. const TNode& spec)
  269. {
  270. THttpHeader header("POST", "start_op");
  271. header.AddMutationId();
  272. header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec));
  273. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  274. }
  275. TOperationAttributes THttpRawClient::GetOperation(
  276. const TOperationId& operationId,
  277. const TGetOperationOptions& options)
  278. {
  279. TMutationId mutationId;
  280. THttpHeader header("GET", "get_operation");
  281. header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options));
  282. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  283. return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
  284. }
  285. TOperationAttributes THttpRawClient::GetOperation(
  286. const TString& alias,
  287. const TGetOperationOptions& options)
  288. {
  289. TMutationId mutationId;
  290. THttpHeader header("GET", "get_operation");
  291. header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options));
  292. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  293. return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
  294. }
  295. void THttpRawClient::AbortOperation(
  296. TMutationId& mutationId,
  297. const TOperationId& operationId)
  298. {
  299. THttpHeader header("POST", "abort_op");
  300. header.AddMutationId();
  301. header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId));
  302. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  303. }
  304. void THttpRawClient::CompleteOperation(
  305. TMutationId& mutationId,
  306. const TOperationId& operationId)
  307. {
  308. THttpHeader header("POST", "complete_op");
  309. header.AddMutationId();
  310. header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId));
  311. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  312. }
  313. void THttpRawClient::SuspendOperation(
  314. TMutationId& mutationId,
  315. const TOperationId& operationId,
  316. const TSuspendOperationOptions& options)
  317. {
  318. THttpHeader header("POST", "suspend_op");
  319. header.AddMutationId();
  320. header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options));
  321. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  322. }
  323. void THttpRawClient::ResumeOperation(
  324. TMutationId& mutationId,
  325. const TOperationId& operationId,
  326. const TResumeOperationOptions& options)
  327. {
  328. THttpHeader header("POST", "resume_op");
  329. header.AddMutationId();
  330. header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options));
  331. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  332. }
  333. template <typename TKey>
  334. static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
  335. {
  336. THashMap<TKey, i64> counts;
  337. for (const auto& entry : countsNode.AsMap()) {
  338. counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
  339. }
  340. return counts;
  341. }
  342. TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options)
  343. {
  344. TMutationId mutationId;
  345. THttpHeader header("GET", "list_operations");
  346. header.MergeParameters(NRawClient::SerializeParamsForListOperations(options));
  347. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  348. auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
  349. const auto& operationNodesList = resultNode["operations"].AsList();
  350. TListOperationsResult result;
  351. result.Operations.reserve(operationNodesList.size());
  352. for (const auto& operationNode : operationNodesList) {
  353. result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode));
  354. }
  355. if (resultNode.HasKey("pool_counts")) {
  356. result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
  357. }
  358. if (resultNode.HasKey("user_counts")) {
  359. result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
  360. }
  361. if (resultNode.HasKey("type_counts")) {
  362. result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
  363. }
  364. if (resultNode.HasKey("state_counts")) {
  365. result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
  366. }
  367. if (resultNode.HasKey("failed_jobs_count")) {
  368. result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
  369. }
  370. result.Incomplete = resultNode["incomplete"].AsBool();
  371. return result;
  372. }
  373. void THttpRawClient::UpdateOperationParameters(
  374. const TOperationId& operationId,
  375. const TUpdateOperationParametersOptions& options)
  376. {
  377. TMutationId mutationId;
  378. THttpHeader header("POST", "update_op_parameters");
  379. header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options));
  380. RequestWithoutRetry(Context_, mutationId, header);
  381. }
  382. NYson::TYsonString THttpRawClient::GetJob(
  383. const TOperationId& operationId,
  384. const TJobId& jobId,
  385. const TGetJobOptions& options)
  386. {
  387. TMutationId mutationId;
  388. THttpHeader header("GET", "get_job");
  389. header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options));
  390. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  391. return NYson::TYsonString(responseInfo->GetResponse());
  392. }
  393. TListJobsResult THttpRawClient::ListJobs(
  394. const TOperationId& operationId,
  395. const TListJobsOptions& options)
  396. {
  397. TMutationId mutationId;
  398. THttpHeader header("GET", "list_jobs");
  399. header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options));
  400. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  401. auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
  402. const auto& jobNodesList = resultNode["jobs"].AsList();
  403. TListJobsResult result;
  404. result.Jobs.reserve(jobNodesList.size());
  405. for (const auto& jobNode : jobNodesList) {
  406. result.Jobs.push_back(NRawClient::ParseJobAttributes(jobNode));
  407. }
  408. if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
  409. result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
  410. }
  411. if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
  412. result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64();
  413. }
  414. if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
  415. result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
  416. }
  417. return result;
  418. }
  419. IFileReaderPtr THttpRawClient::GetJobInput(
  420. const TJobId& jobId,
  421. const TGetJobInputOptions& /*options*/)
  422. {
  423. TMutationId mutationId;
  424. THttpHeader header("GET", "get_job_input");
  425. header.AddParameter("job_id", GetGuidAsString(jobId));
  426. TRequestConfig config;
  427. config.IsHeavy = true;
  428. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  429. return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  430. }
  431. IFileReaderPtr THttpRawClient::GetJobFailContext(
  432. const TOperationId& operationId,
  433. const TJobId& jobId,
  434. const TGetJobFailContextOptions& /*options*/)
  435. {
  436. TMutationId mutationId;
  437. THttpHeader header("GET", "get_job_fail_context");
  438. header.AddOperationId(operationId);
  439. header.AddParameter("job_id", GetGuidAsString(jobId));
  440. TRequestConfig config;
  441. config.IsHeavy = true;
  442. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  443. return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  444. }
  445. IFileReaderPtr THttpRawClient::GetJobStderr(
  446. const TOperationId& operationId,
  447. const TJobId& jobId,
  448. const TGetJobStderrOptions& /*options*/)
  449. {
  450. TMutationId mutationId;
  451. THttpHeader header("GET", "get_job_stderr");
  452. header.AddOperationId(operationId);
  453. header.AddParameter("job_id", GetGuidAsString(jobId));
  454. TRequestConfig config;
  455. config.IsHeavy = true;
  456. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  457. return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  458. }
  459. TJobTraceEvent ParseJobTraceEvent(const TNode& node)
  460. {
  461. const auto& mapNode = node.AsMap();
  462. TJobTraceEvent result;
  463. if (auto idNode = mapNode.FindPtr("operation_id")) {
  464. result.OperationId = GetGuid(idNode->AsString());
  465. }
  466. if (auto idNode = mapNode.FindPtr("job_id")) {
  467. result.JobId = GetGuid(idNode->AsString());
  468. }
  469. if (auto idNode = mapNode.FindPtr("trace_id")) {
  470. result.TraceId = GetGuid(idNode->AsString());
  471. }
  472. if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
  473. result.EventIndex = eventIndexNode->AsInt64();
  474. }
  475. if (auto eventNode = mapNode.FindPtr("event")) {
  476. result.Event = eventNode->AsString();
  477. }
  478. if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
  479. result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());;
  480. }
  481. return result;
  482. }
  483. std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace(
  484. const TOperationId& operationId,
  485. const TGetJobTraceOptions& options)
  486. {
  487. TMutationId mutationId;
  488. THttpHeader header("GET", "get_job_trace");
  489. header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, options));
  490. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  491. auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
  492. const auto& traceEventNodesList = resultNode.AsList();
  493. std::vector<TJobTraceEvent> result;
  494. result.reserve(traceEventNodesList.size());
  495. for (const auto& traceEventNode : traceEventNodesList) {
  496. result.push_back(ParseJobTraceEvent(traceEventNode));
  497. }
  498. return result;
  499. }
  500. std::unique_ptr<IInputStream> THttpRawClient::ReadFile(
  501. const TTransactionId& transactionId,
  502. const TRichYPath& path,
  503. const TFileReaderOptions& options)
  504. {
  505. TMutationId mutationId;
  506. THttpHeader header("GET", GetReadFileCommand(Context_.Config->ApiVersion));
  507. header.AddTransactionId(transactionId);
  508. header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
  509. header.MergeParameters(FormIORequestParameters(path, options));
  510. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  511. TRequestConfig config;
  512. config.IsHeavy = true;
  513. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  514. return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  515. }
  516. TMaybe<TYPath> THttpRawClient::GetFileFromCache(
  517. const TTransactionId& transactionId,
  518. const TString& md5Signature,
  519. const TYPath& cachePath,
  520. const TGetFileFromCacheOptions& options)
  521. {
  522. TMutationId mutationId;
  523. THttpHeader header("GET", "get_file_from_cache");
  524. header.MergeParameters(NRawClient::SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options));
  525. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  526. auto resultNode = NodeFromYsonString(responseInfo->GetResponse()).AsString();
  527. return resultNode.empty() ? Nothing() : TMaybe<TYPath>(resultNode);
  528. }
  529. TYPath THttpRawClient::PutFileToCache(
  530. const TTransactionId& transactionId,
  531. const TYPath& filePath,
  532. const TString& md5Signature,
  533. const TYPath& cachePath,
  534. const TPutFileToCacheOptions& options)
  535. {
  536. TMutationId mutationId;
  537. THttpHeader header("POST", "put_file_to_cache");
  538. header.MergeParameters(NRawClient::SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options));
  539. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  540. return NodeFromYsonString(responseInfo->GetResponse()).AsString();
  541. }
  542. void THttpRawClient::MountTable(
  543. TMutationId& mutationId,
  544. const TYPath& path,
  545. const TMountTableOptions& options)
  546. {
  547. THttpHeader header("POST", "mount_table");
  548. header.AddMutationId();
  549. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  550. if (options.CellId_) {
  551. header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
  552. }
  553. header.AddParameter("freeze", options.Freeze_);
  554. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  555. }
  556. void THttpRawClient::UnmountTable(
  557. TMutationId& mutationId,
  558. const TYPath& path,
  559. const TUnmountTableOptions& options)
  560. {
  561. THttpHeader header("POST", "unmount_table");
  562. header.AddMutationId();
  563. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  564. header.AddParameter("force", options.Force_);
  565. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  566. }
  567. void THttpRawClient::RemountTable(
  568. TMutationId& mutationId,
  569. const TYPath& path,
  570. const TRemountTableOptions& options)
  571. {
  572. THttpHeader header("POST", "remount_table");
  573. header.AddMutationId();
  574. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  575. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  576. }
  577. void THttpRawClient::ReshardTableByPivotKeys(
  578. TMutationId& mutationId,
  579. const TYPath& path,
  580. const TVector<TKey>& keys,
  581. const TReshardTableOptions& options)
  582. {
  583. THttpHeader header("POST", "reshard_table");
  584. header.AddMutationId();
  585. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  586. header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
  587. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  588. }
  589. void THttpRawClient::ReshardTableByTabletCount(
  590. TMutationId& mutationId,
  591. const TYPath& path,
  592. i64 tabletCount,
  593. const TReshardTableOptions& options)
  594. {
  595. THttpHeader header("POST", "reshard_table");
  596. header.AddMutationId();
  597. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  598. header.AddParameter("tablet_count", tabletCount);
  599. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  600. }
  601. void THttpRawClient::InsertRows(
  602. const TYPath& path,
  603. const TNode::TListType& rows,
  604. const TInsertRowsOptions& options)
  605. {
  606. NRawClient::InsertRows(Context_, path, rows, options);
  607. }
  608. void THttpRawClient::TrimRows(
  609. const TYPath& path,
  610. i64 tabletIndex,
  611. i64 rowCount,
  612. const TTrimRowsOptions& options)
  613. {
  614. TMutationId mutationId;
  615. THttpHeader header("POST", "trim_rows");
  616. header.AddParameter("trimmed_row_count", rowCount);
  617. header.AddParameter("tablet_index", tabletIndex);
  618. header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
  619. TRequestConfig config;
  620. config.IsHeavy = true;
  621. RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
  622. }
  623. TNode::TListType THttpRawClient::LookupRows(
  624. const TYPath& path,
  625. const TNode::TListType& keys,
  626. const TLookupRowsOptions& options)
  627. {
  628. return NRawClient::LookupRows(Context_, path, keys, options);
  629. }
  630. TNode::TListType THttpRawClient::SelectRows(
  631. const TString& query,
  632. const TSelectRowsOptions& options)
  633. {
  634. TMutationId mutationId;
  635. THttpHeader header("GET", "select_rows");
  636. header.SetInputFormat(TFormat::YsonBinary());
  637. header.SetOutputFormat(TFormat::YsonBinary());
  638. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  639. .Item("query").Value(query)
  640. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  641. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  642. })
  643. .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  644. fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
  645. })
  646. .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  647. fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
  648. })
  649. .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
  650. .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
  651. .Item("verbose_logging").Value(options.VerboseLogging_)
  652. .Item("enable_code_cache").Value(options.EnableCodeCache_)
  653. .EndMap());
  654. TRequestConfig config;
  655. config.IsHeavy = true;
  656. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  657. return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
  658. }
  659. std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
  660. const TTransactionId& transactionId,
  661. const TRichYPath& path,
  662. const TMaybe<TFormat>& format,
  663. const TTableReaderOptions& options)
  664. {
  665. TMutationId mutationId;
  666. THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
  667. header.SetOutputFormat(format);
  668. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  669. header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, options));
  670. header.MergeParameters(FormIORequestParameters(path, options));
  671. TRequestConfig config;
  672. config.IsHeavy = true;
  673. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  674. return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  675. }
  676. std::unique_ptr<IInputStream> THttpRawClient::ReadBlobTable(
  677. const TTransactionId& transactionId,
  678. const TRichYPath& path,
  679. const TKey& key,
  680. const TBlobTableReaderOptions& options)
  681. {
  682. TMutationId mutationId;
  683. THttpHeader header("GET", "read_blob_table");
  684. header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
  685. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  686. header.MergeParameters(NRawClient::SerializeParamsForReadBlobTable(transactionId, path, key, options));
  687. TRequestConfig config;
  688. config.IsHeavy = true;
  689. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  690. return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  691. }
  692. void THttpRawClient::AlterTable(
  693. TMutationId& mutationId,
  694. const TTransactionId& transactionId,
  695. const TYPath& path,
  696. const TAlterTableOptions& options)
  697. {
  698. THttpHeader header("POST", "alter_table");
  699. header.AddMutationId();
  700. header.MergeParameters(NRawClient::SerializeParamsForAlterTable(transactionId, Context_.Config->Prefix, path, options));
  701. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  702. }
  703. void THttpRawClient::AlterTableReplica(
  704. TMutationId& mutationId,
  705. const TReplicaId& replicaId,
  706. const TAlterTableReplicaOptions& options)
  707. {
  708. THttpHeader header("POST", "alter_table_replica");
  709. header.AddMutationId();
  710. header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options));
  711. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  712. }
  713. void THttpRawClient::DeleteRows(
  714. const TYPath& path,
  715. const TNode::TListType& keys,
  716. const TDeleteRowsOptions& options)
  717. {
  718. NRawClient::DeleteRows(Context_, path, keys, options);
  719. }
  720. void THttpRawClient::FreezeTable(
  721. const TYPath& path,
  722. const TFreezeTableOptions& options)
  723. {
  724. TMutationId mutationId;
  725. THttpHeader header("POST", "freeze_table");
  726. header.MergeParameters(NRawClient::SerializeParamsForFreezeTable(Context_.Config->Prefix, path, options));
  727. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  728. }
  729. void THttpRawClient::UnfreezeTable(
  730. const TYPath& path,
  731. const TUnfreezeTableOptions& options)
  732. {
  733. TMutationId mutationId;
  734. THttpHeader header("POST", "unfreeze_table");
  735. header.MergeParameters(NRawClient::SerializeParamsForUnfreezeTable(Context_.Config->Prefix, path, options));
  736. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  737. }
  738. TCheckPermissionResponse THttpRawClient::CheckPermission(
  739. const TString& user,
  740. EPermission permission,
  741. const TYPath& path,
  742. const TCheckPermissionOptions& options)
  743. {
  744. TMutationId mutationId;
  745. THttpHeader header("GET", "check_permission");
  746. header.MergeParameters(NRawClient::SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options));
  747. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  748. return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo->GetResponse()));
  749. }
  750. TVector<TTabletInfo> THttpRawClient::GetTabletInfos(
  751. const TYPath& path,
  752. const TVector<int>& tabletIndexes,
  753. const TGetTabletInfosOptions& options)
  754. {
  755. TMutationId mutationId;
  756. THttpHeader header("POST", "api/v4/get_tablet_infos", /*isApi*/ false);
  757. header.MergeParameters(NRawClient::SerializeParamsForGetTabletInfos(Context_.Config->Prefix, path, tabletIndexes, options));
  758. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  759. TVector<TTabletInfo> result;
  760. Deserialize(result, *NodeFromYsonString(responseInfo->GetResponse()).AsMap().FindPtr("tablets"));
  761. return result;
  762. }
  763. TVector<TTableColumnarStatistics> THttpRawClient::GetTableColumnarStatistics(
  764. const TTransactionId& transactionId,
  765. const TVector<TRichYPath>& paths,
  766. const TGetTableColumnarStatisticsOptions& options)
  767. {
  768. TMutationId mutationId;
  769. THttpHeader header("GET", "get_table_columnar_statistics");
  770. header.MergeParameters(NRawClient::SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options));
  771. TRequestConfig config;
  772. config.IsHeavy = true;
  773. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  774. TVector<TTableColumnarStatistics> result;
  775. Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
  776. return result;
  777. }
  778. TMultiTablePartitions THttpRawClient::GetTablePartitions(
  779. const TTransactionId& transactionId,
  780. const TVector<TRichYPath>& paths,
  781. const TGetTablePartitionsOptions& options)
  782. {
  783. TMutationId mutationId;
  784. THttpHeader header("GET", "partition_tables");
  785. header.MergeParameters(NRawClient::SerializeParamsForGetTablePartitions(transactionId, paths, options));
  786. TRequestConfig config;
  787. config.IsHeavy = true;
  788. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  789. TMultiTablePartitions result;
  790. Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
  791. return result;
  792. }
  793. ui64 THttpRawClient::GenerateTimestamp()
  794. {
  795. TMutationId mutationId;
  796. THttpHeader header("GET", "generate_timestamp");
  797. TRequestConfig config;
  798. config.IsHeavy = true;
  799. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  800. return NodeFromYsonString(responseInfo->GetResponse()).AsUint64();
  801. }
  802. IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest()
  803. {
  804. return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr);
  805. }
  806. IRawClientPtr THttpRawClient::Clone()
  807. {
  808. return ::MakeIntrusive<THttpRawClient>(Context_);
  809. }
  810. ////////////////////////////////////////////////////////////////////////////////
  811. } // namespace NYT::NDetail