raw_client.cpp 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936
  1. #include "raw_client.h"
  2. #include "raw_requests.h"
  3. #include "rpc_parameters_serialization.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/http/helpers.h>
  6. #include <yt/cpp/mapreduce/http/http.h>
  7. #include <yt/cpp/mapreduce/http/requests.h>
  8. #include <yt/cpp/mapreduce/http/retry_request.h>
  9. #include <yt/cpp/mapreduce/interface/fluent.h>
  10. #include <yt/cpp/mapreduce/interface/fwd.h>
  11. #include <yt/cpp/mapreduce/interface/operation.h>
  12. #include <yt/cpp/mapreduce/interface/tvm.h>
  13. #include <yt/cpp/mapreduce/io/helpers.h>
  14. #include <library/cpp/yson/node/node_io.h>
  15. namespace NYT::NDetail {
  16. ////////////////////////////////////////////////////////////////////////////////
  17. THttpRawClient::THttpRawClient(const TClientContext& context)
  18. : Context_(context)
  19. { }
  20. TNode THttpRawClient::Get(
  21. const TTransactionId& transactionId,
  22. const TYPath& path,
  23. const TGetOptions& options)
  24. {
  25. TMutationId mutationId;
  26. THttpHeader header("GET", "get");
  27. header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options));
  28. return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  29. }
  30. TNode THttpRawClient::TryGet(
  31. const TTransactionId& transactionId,
  32. const TYPath& path,
  33. const TGetOptions& options)
  34. {
  35. try {
  36. return Get(transactionId, path, options);
  37. } catch (const TErrorResponse& error) {
  38. if (!error.IsResolveError()) {
  39. throw;
  40. }
  41. return {};
  42. }
  43. }
  44. void THttpRawClient::Set(
  45. TMutationId& mutationId,
  46. const TTransactionId& transactionId,
  47. const TYPath& path,
  48. const TNode& value,
  49. const TSetOptions& options)
  50. {
  51. THttpHeader header("PUT", "set");
  52. header.AddMutationId();
  53. header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options));
  54. auto body = NodeToYsonString(value);
  55. RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
  56. }
  57. bool THttpRawClient::Exists(
  58. const TTransactionId& transactionId,
  59. const TYPath& path,
  60. const TExistsOptions& options)
  61. {
  62. TMutationId mutationId;
  63. THttpHeader header("GET", "exists");
  64. header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options));
  65. return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  66. }
  67. void THttpRawClient::MultisetAttributes(
  68. TMutationId& mutationId,
  69. const TTransactionId& transactionId,
  70. const TYPath& path,
  71. const TNode::TMapType& value,
  72. const TMultisetAttributesOptions& options)
  73. {
  74. THttpHeader header("PUT", "api/v4/multiset_attributes", false);
  75. header.AddMutationId();
  76. header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options));
  77. auto body = NodeToYsonString(value);
  78. RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
  79. }
  80. TNodeId THttpRawClient::Create(
  81. TMutationId& mutationId,
  82. const TTransactionId& transactionId,
  83. const TYPath& path,
  84. const ENodeType& type,
  85. const TCreateOptions& options)
  86. {
  87. THttpHeader header("POST", "create");
  88. header.AddMutationId();
  89. header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options));
  90. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  91. }
  92. TNodeId THttpRawClient::CopyWithoutRetries(
  93. const TTransactionId& transactionId,
  94. const TYPath& sourcePath,
  95. const TYPath& destinationPath,
  96. const TCopyOptions& options)
  97. {
  98. TMutationId mutationId;
  99. THttpHeader header("POST", "copy");
  100. header.AddMutationId();
  101. header.MergeParameters(NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
  102. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  103. }
  104. TNodeId THttpRawClient::CopyInsideMasterCell(
  105. TMutationId& mutationId,
  106. const TTransactionId& transactionId,
  107. const TYPath& sourcePath,
  108. const TYPath& destinationPath,
  109. const TCopyOptions& options)
  110. {
  111. THttpHeader header("POST", "copy");
  112. header.AddMutationId();
  113. auto params = NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
  114. // Make cross cell copying disable.
  115. params["enable_cross_cell_copying"] = false;
  116. header.MergeParameters(params);
  117. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  118. }
  119. TNodeId THttpRawClient::MoveWithoutRetries(
  120. const TTransactionId& transactionId,
  121. const TYPath& sourcePath,
  122. const TYPath& destinationPath,
  123. const TMoveOptions& options)
  124. {
  125. TMutationId mutationId;
  126. THttpHeader header("POST", "move");
  127. header.AddMutationId();
  128. header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
  129. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  130. }
  131. TNodeId THttpRawClient::MoveInsideMasterCell(
  132. TMutationId& mutationId,
  133. const TTransactionId& transactionId,
  134. const TYPath& sourcePath,
  135. const TYPath& destinationPath,
  136. const TMoveOptions& options)
  137. {
  138. THttpHeader header("POST", "move");
  139. header.AddMutationId();
  140. auto params = NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
  141. // Make cross cell copying disable.
  142. params["enable_cross_cell_copying"] = false;
  143. header.MergeParameters(params);
  144. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  145. }
  146. void THttpRawClient::Remove(
  147. TMutationId& mutationId,
  148. const TTransactionId& transactionId,
  149. const TYPath& path,
  150. const TRemoveOptions& options)
  151. {
  152. THttpHeader header("POST", "remove");
  153. header.AddMutationId();
  154. header.MergeParameters(NRawClient::SerializeParamsForRemove(transactionId, Context_.Config->Prefix, path, options));
  155. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  156. }
  157. TNode::TListType THttpRawClient::List(
  158. const TTransactionId& transactionId,
  159. const TYPath& path,
  160. const TListOptions& options)
  161. {
  162. TMutationId mutationId;
  163. THttpHeader header("GET", "list");
  164. TYPath updatedPath = AddPathPrefix(path, Context_.Config->Prefix);
  165. // Translate "//" to "/"
  166. // Translate "//some/constom/prefix/from/config/" to "//some/constom/prefix/from/config"
  167. if (path.empty() && updatedPath.EndsWith('/')) {
  168. updatedPath.pop_back();
  169. }
  170. header.MergeParameters(NRawClient::SerializeParamsForList(transactionId, Context_.Config->Prefix, updatedPath, options));
  171. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  172. return NodeFromYsonString(responseInfo->GetResponse()).AsList();
  173. }
  174. TNodeId THttpRawClient::Link(
  175. TMutationId& mutationId,
  176. const TTransactionId& transactionId,
  177. const TYPath& targetPath,
  178. const TYPath& linkPath,
  179. const TLinkOptions& options)
  180. {
  181. THttpHeader header("POST", "link");
  182. header.AddMutationId();
  183. header.MergeParameters(NRawClient::SerializeParamsForLink(transactionId, Context_.Config->Prefix, targetPath, linkPath, options));
  184. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  185. }
  186. TLockId THttpRawClient::Lock(
  187. TMutationId& mutationId,
  188. const TTransactionId& transactionId,
  189. const TYPath& path,
  190. ELockMode mode,
  191. const TLockOptions& options)
  192. {
  193. THttpHeader header("POST", "lock");
  194. header.AddMutationId();
  195. header.MergeParameters(NRawClient::SerializeParamsForLock(transactionId, Context_.Config->Prefix, path, mode, options));
  196. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  197. }
  198. void THttpRawClient::Unlock(
  199. TMutationId& mutationId,
  200. const TTransactionId& transactionId,
  201. const TYPath& path,
  202. const TUnlockOptions& options)
  203. {
  204. THttpHeader header("POST", "unlock");
  205. header.AddMutationId();
  206. header.MergeParameters(NRawClient::SerializeParamsForUnlock(transactionId, Context_.Config->Prefix, path, options));
  207. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  208. }
  209. void THttpRawClient::Concatenate(
  210. const TTransactionId& transactionId,
  211. const TVector<TRichYPath>& sourcePaths,
  212. const TRichYPath& destinationPath,
  213. const TConcatenateOptions& options)
  214. {
  215. TMutationId mutationId;
  216. THttpHeader header("POST", "concatenate");
  217. header.AddMutationId();
  218. header.MergeParameters(NRawClient::SerializeParamsForConcatenate(transactionId, Context_.Config->Prefix, sourcePaths, destinationPath, options));
  219. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  220. }
  221. TTransactionId THttpRawClient::StartTransaction(
  222. TMutationId& mutationId,
  223. const TTransactionId& parentTransactionId,
  224. const TStartTransactionOptions& options)
  225. {
  226. THttpHeader header("POST", "start_tx");
  227. header.AddMutationId();
  228. header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, Context_.Config->TxTimeout, options));
  229. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  230. }
  231. void THttpRawClient::PingTransaction(const TTransactionId& transactionId)
  232. {
  233. TMutationId mutationId;
  234. THttpHeader header("POST", "ping_tx");
  235. header.MergeParameters(NRawClient::SerializeParamsForPingTx(transactionId));
  236. TRequestConfig requestConfig;
  237. requestConfig.HttpConfig = NHttpClient::THttpConfig{
  238. .SocketTimeout = Context_.Config->PingTimeout
  239. };
  240. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  241. }
  242. void THttpRawClient::AbortTransaction(
  243. TMutationId& mutationId,
  244. const TTransactionId& transactionId)
  245. {
  246. THttpHeader header("POST", "abort_tx");
  247. header.AddMutationId();
  248. header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId));
  249. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  250. }
  251. void THttpRawClient::CommitTransaction(
  252. TMutationId& mutationId,
  253. const TTransactionId& transactionId)
  254. {
  255. THttpHeader header("POST", "commit_tx");
  256. header.AddMutationId();
  257. header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId));
  258. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  259. }
  260. TOperationId THttpRawClient::StartOperation(
  261. TMutationId& mutationId,
  262. const TTransactionId& transactionId,
  263. EOperationType type,
  264. const TNode& spec)
  265. {
  266. THttpHeader header("POST", "start_op");
  267. header.AddMutationId();
  268. header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec));
  269. return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
  270. }
  271. TOperationAttributes THttpRawClient::GetOperation(
  272. const TOperationId& operationId,
  273. const TGetOperationOptions& options)
  274. {
  275. TMutationId mutationId;
  276. THttpHeader header("GET", "get_operation");
  277. header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options));
  278. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  279. return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
  280. }
  281. TOperationAttributes THttpRawClient::GetOperation(
  282. const TString& alias,
  283. const TGetOperationOptions& options)
  284. {
  285. TMutationId mutationId;
  286. THttpHeader header("GET", "get_operation");
  287. header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options));
  288. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  289. return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
  290. }
  291. void THttpRawClient::AbortOperation(
  292. TMutationId& mutationId,
  293. const TOperationId& operationId)
  294. {
  295. THttpHeader header("POST", "abort_op");
  296. header.AddMutationId();
  297. header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId));
  298. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  299. }
  300. void THttpRawClient::CompleteOperation(
  301. TMutationId& mutationId,
  302. const TOperationId& operationId)
  303. {
  304. THttpHeader header("POST", "complete_op");
  305. header.AddMutationId();
  306. header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId));
  307. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  308. }
  309. void THttpRawClient::SuspendOperation(
  310. TMutationId& mutationId,
  311. const TOperationId& operationId,
  312. const TSuspendOperationOptions& options)
  313. {
  314. THttpHeader header("POST", "suspend_op");
  315. header.AddMutationId();
  316. header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options));
  317. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  318. }
  319. void THttpRawClient::ResumeOperation(
  320. TMutationId& mutationId,
  321. const TOperationId& operationId,
  322. const TResumeOperationOptions& options)
  323. {
  324. THttpHeader header("POST", "resume_op");
  325. header.AddMutationId();
  326. header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options));
  327. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  328. }
  329. template <typename TKey>
  330. static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
  331. {
  332. THashMap<TKey, i64> counts;
  333. for (const auto& entry : countsNode.AsMap()) {
  334. counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
  335. }
  336. return counts;
  337. }
  338. TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options)
  339. {
  340. TMutationId mutationId;
  341. THttpHeader header("GET", "list_operations");
  342. header.MergeParameters(NRawClient::SerializeParamsForListOperations(options));
  343. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  344. auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
  345. const auto& operationNodesList = resultNode["operations"].AsList();
  346. TListOperationsResult result;
  347. result.Operations.reserve(operationNodesList.size());
  348. for (const auto& operationNode : operationNodesList) {
  349. result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode));
  350. }
  351. if (resultNode.HasKey("pool_counts")) {
  352. result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
  353. }
  354. if (resultNode.HasKey("user_counts")) {
  355. result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
  356. }
  357. if (resultNode.HasKey("type_counts")) {
  358. result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
  359. }
  360. if (resultNode.HasKey("state_counts")) {
  361. result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
  362. }
  363. if (resultNode.HasKey("failed_jobs_count")) {
  364. result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
  365. }
  366. result.Incomplete = resultNode["incomplete"].AsBool();
  367. return result;
  368. }
  369. void THttpRawClient::UpdateOperationParameters(
  370. const TOperationId& operationId,
  371. const TUpdateOperationParametersOptions& options)
  372. {
  373. TMutationId mutationId;
  374. THttpHeader header("POST", "update_op_parameters");
  375. header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options));
  376. RequestWithoutRetry(Context_, mutationId, header);
  377. }
  378. NYson::TYsonString THttpRawClient::GetJob(
  379. const TOperationId& operationId,
  380. const TJobId& jobId,
  381. const TGetJobOptions& options)
  382. {
  383. TMutationId mutationId;
  384. THttpHeader header("GET", "get_job");
  385. header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options));
  386. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  387. return NYson::TYsonString(responseInfo->GetResponse());
  388. }
  389. TListJobsResult THttpRawClient::ListJobs(
  390. const TOperationId& operationId,
  391. const TListJobsOptions& options)
  392. {
  393. TMutationId mutationId;
  394. THttpHeader header("GET", "list_jobs");
  395. header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options));
  396. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  397. auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
  398. const auto& jobNodesList = resultNode["jobs"].AsList();
  399. TListJobsResult result;
  400. result.Jobs.reserve(jobNodesList.size());
  401. for (const auto& jobNode : jobNodesList) {
  402. result.Jobs.push_back(NRawClient::ParseJobAttributes(jobNode));
  403. }
  404. if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
  405. result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
  406. }
  407. if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
  408. result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64();
  409. }
  410. if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
  411. result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
  412. }
  413. return result;
  414. }
  415. IFileReaderPtr THttpRawClient::GetJobInput(
  416. const TJobId& jobId,
  417. const TGetJobInputOptions& /*options*/)
  418. {
  419. TMutationId mutationId;
  420. THttpHeader header("GET", "get_job_input");
  421. header.AddParameter("job_id", GetGuidAsString(jobId));
  422. TRequestConfig config;
  423. config.IsHeavy = true;
  424. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  425. return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  426. }
  427. IFileReaderPtr THttpRawClient::GetJobFailContext(
  428. const TOperationId& operationId,
  429. const TJobId& jobId,
  430. const TGetJobFailContextOptions& /*options*/)
  431. {
  432. TMutationId mutationId;
  433. THttpHeader header("GET", "get_job_fail_context");
  434. header.AddOperationId(operationId);
  435. header.AddParameter("job_id", GetGuidAsString(jobId));
  436. TRequestConfig config;
  437. config.IsHeavy = true;
  438. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  439. return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  440. }
  441. IFileReaderPtr THttpRawClient::GetJobStderr(
  442. const TOperationId& operationId,
  443. const TJobId& jobId,
  444. const TGetJobStderrOptions& /*options*/)
  445. {
  446. TMutationId mutationId;
  447. THttpHeader header("GET", "get_job_stderr");
  448. header.AddOperationId(operationId);
  449. header.AddParameter("job_id", GetGuidAsString(jobId));
  450. TRequestConfig config;
  451. config.IsHeavy = true;
  452. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  453. return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  454. }
  455. TJobTraceEvent ParseJobTraceEvent(const TNode& node)
  456. {
  457. const auto& mapNode = node.AsMap();
  458. TJobTraceEvent result;
  459. if (auto idNode = mapNode.FindPtr("operation_id")) {
  460. result.OperationId = GetGuid(idNode->AsString());
  461. }
  462. if (auto idNode = mapNode.FindPtr("job_id")) {
  463. result.JobId = GetGuid(idNode->AsString());
  464. }
  465. if (auto idNode = mapNode.FindPtr("trace_id")) {
  466. result.TraceId = GetGuid(idNode->AsString());
  467. }
  468. if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
  469. result.EventIndex = eventIndexNode->AsInt64();
  470. }
  471. if (auto eventNode = mapNode.FindPtr("event")) {
  472. result.Event = eventNode->AsString();
  473. }
  474. if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
  475. result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());;
  476. }
  477. return result;
  478. }
  479. std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace(
  480. const TOperationId& operationId,
  481. const TGetJobTraceOptions& options)
  482. {
  483. TMutationId mutationId;
  484. THttpHeader header("GET", "get_job_trace");
  485. header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, options));
  486. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  487. auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
  488. const auto& traceEventNodesList = resultNode.AsList();
  489. std::vector<TJobTraceEvent> result;
  490. result.reserve(traceEventNodesList.size());
  491. for (const auto& traceEventNode : traceEventNodesList) {
  492. result.push_back(ParseJobTraceEvent(traceEventNode));
  493. }
  494. return result;
  495. }
  496. std::unique_ptr<IInputStream> THttpRawClient::ReadFile(
  497. const TTransactionId& transactionId,
  498. const TRichYPath& path,
  499. const TFileReaderOptions& options)
  500. {
  501. TMutationId mutationId;
  502. THttpHeader header("GET", GetReadFileCommand(Context_.Config->ApiVersion));
  503. header.AddTransactionId(transactionId);
  504. header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
  505. header.MergeParameters(FormIORequestParameters(path, options));
  506. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  507. TRequestConfig config;
  508. config.IsHeavy = true;
  509. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  510. return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  511. }
  512. TMaybe<TYPath> THttpRawClient::GetFileFromCache(
  513. const TTransactionId& transactionId,
  514. const TString& md5Signature,
  515. const TYPath& cachePath,
  516. const TGetFileFromCacheOptions& options)
  517. {
  518. TMutationId mutationId;
  519. THttpHeader header("GET", "get_file_from_cache");
  520. header.MergeParameters(NRawClient::SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options));
  521. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  522. auto resultNode = NodeFromYsonString(responseInfo->GetResponse()).AsString();
  523. return resultNode.empty() ? Nothing() : TMaybe<TYPath>(resultNode);
  524. }
  525. TYPath THttpRawClient::PutFileToCache(
  526. const TTransactionId& transactionId,
  527. const TYPath& filePath,
  528. const TString& md5Signature,
  529. const TYPath& cachePath,
  530. const TPutFileToCacheOptions& options)
  531. {
  532. TMutationId mutationId;
  533. THttpHeader header("POST", "put_file_to_cache");
  534. header.MergeParameters(NRawClient::SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options));
  535. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  536. return NodeFromYsonString(responseInfo->GetResponse()).AsString();
  537. }
  538. void THttpRawClient::MountTable(
  539. TMutationId& mutationId,
  540. const TYPath& path,
  541. const TMountTableOptions& options)
  542. {
  543. THttpHeader header("POST", "mount_table");
  544. header.AddMutationId();
  545. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  546. if (options.CellId_) {
  547. header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
  548. }
  549. header.AddParameter("freeze", options.Freeze_);
  550. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  551. }
  552. void THttpRawClient::UnmountTable(
  553. TMutationId& mutationId,
  554. const TYPath& path,
  555. const TUnmountTableOptions& options)
  556. {
  557. THttpHeader header("POST", "unmount_table");
  558. header.AddMutationId();
  559. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  560. header.AddParameter("force", options.Force_);
  561. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  562. }
  563. void THttpRawClient::RemountTable(
  564. TMutationId& mutationId,
  565. const TYPath& path,
  566. const TRemountTableOptions& options)
  567. {
  568. THttpHeader header("POST", "remount_table");
  569. header.AddMutationId();
  570. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  571. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  572. }
  573. void THttpRawClient::ReshardTableByPivotKeys(
  574. TMutationId& mutationId,
  575. const TYPath& path,
  576. const TVector<TKey>& keys,
  577. const TReshardTableOptions& options)
  578. {
  579. THttpHeader header("POST", "reshard_table");
  580. header.AddMutationId();
  581. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  582. header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
  583. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  584. }
  585. void THttpRawClient::ReshardTableByTabletCount(
  586. TMutationId& mutationId,
  587. const TYPath& path,
  588. i64 tabletCount,
  589. const TReshardTableOptions& options)
  590. {
  591. THttpHeader header("POST", "reshard_table");
  592. header.AddMutationId();
  593. header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
  594. header.AddParameter("tablet_count", tabletCount);
  595. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  596. }
  597. void THttpRawClient::InsertRows(
  598. const TYPath& path,
  599. const TNode::TListType& rows,
  600. const TInsertRowsOptions& options)
  601. {
  602. TMutationId mutationId;
  603. THttpHeader header("PUT", "insert_rows");
  604. header.SetInputFormat(TFormat::YsonBinary());
  605. header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
  606. auto body = NodeListToYsonString(rows);
  607. TRequestConfig config;
  608. config.IsHeavy = true;
  609. RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
  610. }
  611. void THttpRawClient::TrimRows(
  612. const TYPath& path,
  613. i64 tabletIndex,
  614. i64 rowCount,
  615. const TTrimRowsOptions& options)
  616. {
  617. TMutationId mutationId;
  618. THttpHeader header("POST", "trim_rows");
  619. header.AddParameter("trimmed_row_count", rowCount);
  620. header.AddParameter("tablet_index", tabletIndex);
  621. header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
  622. TRequestConfig config;
  623. config.IsHeavy = true;
  624. RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
  625. }
  626. TNode::TListType THttpRawClient::LookupRows(
  627. const TYPath& path,
  628. const TNode::TListType& keys,
  629. const TLookupRowsOptions& options)
  630. {
  631. TMutationId mutationId;
  632. THttpHeader header("PUT", "lookup_rows");
  633. header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion));
  634. header.SetInputFormat(TFormat::YsonBinary());
  635. header.SetOutputFormat(TFormat::YsonBinary());
  636. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  637. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  638. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  639. })
  640. .Item("keep_missing_rows").Value(options.KeepMissingRows_)
  641. .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
  642. fluent.Item("versioned").Value(*options.Versioned_);
  643. })
  644. .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
  645. fluent.Item("column_names").Value(*options.Columns_);
  646. })
  647. .EndMap());
  648. auto body = NodeListToYsonString(keys);
  649. TRequestConfig config;
  650. config.IsHeavy = true;
  651. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, body, config);
  652. return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
  653. }
  654. TNode::TListType THttpRawClient::SelectRows(
  655. const TString& query,
  656. const TSelectRowsOptions& options)
  657. {
  658. TMutationId mutationId;
  659. THttpHeader header("GET", "select_rows");
  660. header.SetInputFormat(TFormat::YsonBinary());
  661. header.SetOutputFormat(TFormat::YsonBinary());
  662. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  663. .Item("query").Value(query)
  664. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  665. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  666. })
  667. .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  668. fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
  669. })
  670. .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  671. fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
  672. })
  673. .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
  674. .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
  675. .Item("verbose_logging").Value(options.VerboseLogging_)
  676. .Item("enable_code_cache").Value(options.EnableCodeCache_)
  677. .EndMap());
  678. TRequestConfig config;
  679. config.IsHeavy = true;
  680. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  681. return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
  682. }
  683. std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
  684. const TTransactionId& transactionId,
  685. const TRichYPath& path,
  686. const TMaybe<TFormat>& format,
  687. const TTableReaderOptions& options)
  688. {
  689. TMutationId mutationId;
  690. THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
  691. header.SetOutputFormat(format);
  692. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  693. header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, options));
  694. header.MergeParameters(FormIORequestParameters(path, options));
  695. TRequestConfig config;
  696. config.IsHeavy = true;
  697. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  698. return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  699. }
  700. std::unique_ptr<IInputStream> THttpRawClient::ReadBlobTable(
  701. const TTransactionId& transactionId,
  702. const TRichYPath& path,
  703. const TKey& key,
  704. const TBlobTableReaderOptions& options)
  705. {
  706. TMutationId mutationId;
  707. THttpHeader header("GET", "read_blob_table");
  708. header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
  709. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  710. header.MergeParameters(NRawClient::SerializeParamsForReadBlobTable(transactionId, path, key, options));
  711. TRequestConfig config;
  712. config.IsHeavy = true;
  713. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  714. return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
  715. }
  716. void THttpRawClient::AlterTable(
  717. TMutationId& mutationId,
  718. const TTransactionId& transactionId,
  719. const TYPath& path,
  720. const TAlterTableOptions& options)
  721. {
  722. THttpHeader header("POST", "alter_table");
  723. header.AddMutationId();
  724. header.MergeParameters(NRawClient::SerializeParamsForAlterTable(transactionId, Context_.Config->Prefix, path, options));
  725. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  726. }
  727. void THttpRawClient::AlterTableReplica(
  728. TMutationId& mutationId,
  729. const TReplicaId& replicaId,
  730. const TAlterTableReplicaOptions& options)
  731. {
  732. THttpHeader header("POST", "alter_table_replica");
  733. header.AddMutationId();
  734. header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options));
  735. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  736. }
  737. void THttpRawClient::DeleteRows(
  738. const TYPath& path,
  739. const TNode::TListType& keys,
  740. const TDeleteRowsOptions& options)
  741. {
  742. TMutationId mutationId;
  743. THttpHeader header("PUT", "delete_rows");
  744. header.SetInputFormat(TFormat::YsonBinary());
  745. header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(Context_.Config->Prefix, path, options));
  746. auto body = NodeListToYsonString(keys);
  747. TRequestConfig config;
  748. config.IsHeavy = true;
  749. RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
  750. }
  751. void THttpRawClient::FreezeTable(
  752. const TYPath& path,
  753. const TFreezeTableOptions& options)
  754. {
  755. TMutationId mutationId;
  756. THttpHeader header("POST", "freeze_table");
  757. header.MergeParameters(NRawClient::SerializeParamsForFreezeTable(Context_.Config->Prefix, path, options));
  758. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  759. }
  760. void THttpRawClient::UnfreezeTable(
  761. const TYPath& path,
  762. const TUnfreezeTableOptions& options)
  763. {
  764. TMutationId mutationId;
  765. THttpHeader header("POST", "unfreeze_table");
  766. header.MergeParameters(NRawClient::SerializeParamsForUnfreezeTable(Context_.Config->Prefix, path, options));
  767. RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
  768. }
  769. TCheckPermissionResponse THttpRawClient::CheckPermission(
  770. const TString& user,
  771. EPermission permission,
  772. const TYPath& path,
  773. const TCheckPermissionOptions& options)
  774. {
  775. TMutationId mutationId;
  776. THttpHeader header("GET", "check_permission");
  777. header.MergeParameters(NRawClient::SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options));
  778. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  779. return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo->GetResponse()));
  780. }
  781. TVector<TTabletInfo> THttpRawClient::GetTabletInfos(
  782. const TYPath& path,
  783. const TVector<int>& tabletIndexes,
  784. const TGetTabletInfosOptions& options)
  785. {
  786. TMutationId mutationId;
  787. THttpHeader header("POST", "api/v4/get_tablet_infos", /*isApi*/ false);
  788. header.MergeParameters(NRawClient::SerializeParamsForGetTabletInfos(Context_.Config->Prefix, path, tabletIndexes, options));
  789. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
  790. TVector<TTabletInfo> result;
  791. Deserialize(result, *NodeFromYsonString(responseInfo->GetResponse()).AsMap().FindPtr("tablets"));
  792. return result;
  793. }
  794. TVector<TTableColumnarStatistics> THttpRawClient::GetTableColumnarStatistics(
  795. const TTransactionId& transactionId,
  796. const TVector<TRichYPath>& paths,
  797. const TGetTableColumnarStatisticsOptions& options)
  798. {
  799. TMutationId mutationId;
  800. THttpHeader header("GET", "get_table_columnar_statistics");
  801. header.MergeParameters(NRawClient::SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options));
  802. TRequestConfig config;
  803. config.IsHeavy = true;
  804. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  805. TVector<TTableColumnarStatistics> result;
  806. Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
  807. return result;
  808. }
  809. TMultiTablePartitions THttpRawClient::GetTablePartitions(
  810. const TTransactionId& transactionId,
  811. const TVector<TRichYPath>& paths,
  812. const TGetTablePartitionsOptions& options)
  813. {
  814. TMutationId mutationId;
  815. THttpHeader header("GET", "partition_tables");
  816. header.MergeParameters(NRawClient::SerializeParamsForGetTablePartitions(transactionId, paths, options));
  817. TRequestConfig config;
  818. config.IsHeavy = true;
  819. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  820. TMultiTablePartitions result;
  821. Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
  822. return result;
  823. }
  824. ui64 THttpRawClient::GenerateTimestamp()
  825. {
  826. TMutationId mutationId;
  827. THttpHeader header("GET", "generate_timestamp");
  828. TRequestConfig config;
  829. config.IsHeavy = true;
  830. auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
  831. return NodeFromYsonString(responseInfo->GetResponse()).AsUint64();
  832. }
  833. IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest()
  834. {
  835. return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr);
  836. }
  837. ////////////////////////////////////////////////////////////////////////////////
  838. } // namespace NYT::NDetail