raw_requests.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. #include "raw_requests.h"
  2. #include "raw_batch_request.h"
  3. #include "rpc_parameters_serialization.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  7. #include <yt/cpp/mapreduce/http/fwd.h>
  8. #include <yt/cpp/mapreduce/http/context.h>
  9. #include <yt/cpp/mapreduce/http/helpers.h>
  10. #include <yt/cpp/mapreduce/http/http_client.h>
  11. #include <yt/cpp/mapreduce/http/retry_request.h>
  12. #include <yt/cpp/mapreduce/interface/config.h>
  13. #include <yt/cpp/mapreduce/interface/client.h>
  14. #include <yt/cpp/mapreduce/interface/fluent.h>
  15. #include <yt/cpp/mapreduce/interface/operation.h>
  16. #include <yt/cpp/mapreduce/interface/serialize.h>
  17. #include <yt/cpp/mapreduce/interface/tvm.h>
  18. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  19. #include <library/cpp/yson/node/node_io.h>
  20. #include <util/generic/guid.h>
  21. #include <util/generic/scope.h>
  22. namespace NYT::NDetail::NRawClient {
  23. ////////////////////////////////////////////////////////////////////////////////
  24. TOperationAttributes ParseOperationAttributes(const TNode& node)
  25. {
  26. const auto& mapNode = node.AsMap();
  27. TOperationAttributes result;
  28. if (auto idNode = mapNode.FindPtr("id")) {
  29. result.Id = GetGuid(idNode->AsString());
  30. }
  31. if (auto typeNode = mapNode.FindPtr("type")) {
  32. result.Type = FromString<EOperationType>(typeNode->AsString());
  33. } else if (auto operationTypeNode = mapNode.FindPtr("operation_type")) {
  34. // COMPAT(levysotsky): "operation_type" is a deprecated synonym for "type".
  35. // This branch should be removed when all clusters are updated.
  36. result.Type = FromString<EOperationType>(operationTypeNode->AsString());
  37. }
  38. if (auto stateNode = mapNode.FindPtr("state")) {
  39. result.State = stateNode->AsString();
  40. // We don't use FromString here, because OS_IN_PROGRESS unites many states: "initializing", "running", etc.
  41. if (*result.State == "completed") {
  42. result.BriefState = EOperationBriefState::Completed;
  43. } else if (*result.State == "aborted") {
  44. result.BriefState = EOperationBriefState::Aborted;
  45. } else if (*result.State == "failed") {
  46. result.BriefState = EOperationBriefState::Failed;
  47. } else {
  48. result.BriefState = EOperationBriefState::InProgress;
  49. }
  50. }
  51. if (auto authenticatedUserNode = mapNode.FindPtr("authenticated_user")) {
  52. result.AuthenticatedUser = authenticatedUserNode->AsString();
  53. }
  54. if (auto startTimeNode = mapNode.FindPtr("start_time")) {
  55. result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
  56. }
  57. if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
  58. result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
  59. }
  60. auto briefProgressNode = mapNode.FindPtr("brief_progress");
  61. if (briefProgressNode && briefProgressNode->HasKey("jobs")) {
  62. result.BriefProgress.ConstructInPlace();
  63. static auto load = [] (const TNode& item) {
  64. // Backward compatibility with old YT versions
  65. return item.IsInt64() ? item.AsInt64() : item["total"].AsInt64();
  66. };
  67. const auto& jobs = (*briefProgressNode)["jobs"];
  68. result.BriefProgress->Aborted = load(jobs["aborted"]);
  69. result.BriefProgress->Completed = load(jobs["completed"]);
  70. result.BriefProgress->Running = jobs["running"].AsInt64();
  71. result.BriefProgress->Total = jobs["total"].AsInt64();
  72. result.BriefProgress->Failed = jobs["failed"].AsInt64();
  73. result.BriefProgress->Lost = jobs["lost"].AsInt64();
  74. result.BriefProgress->Pending = jobs["pending"].AsInt64();
  75. }
  76. if (auto briefSpecNode = mapNode.FindPtr("brief_spec")) {
  77. result.BriefSpec = *briefSpecNode;
  78. }
  79. if (auto specNode = mapNode.FindPtr("spec")) {
  80. result.Spec = *specNode;
  81. }
  82. if (auto fullSpecNode = mapNode.FindPtr("full_spec")) {
  83. result.FullSpec = *fullSpecNode;
  84. }
  85. if (auto unrecognizedSpecNode = mapNode.FindPtr("unrecognized_spec")) {
  86. result.UnrecognizedSpec = *unrecognizedSpecNode;
  87. }
  88. if (auto suspendedNode = mapNode.FindPtr("suspended")) {
  89. result.Suspended = suspendedNode->AsBool();
  90. }
  91. if (auto resultNode = mapNode.FindPtr("result")) {
  92. result.Result.ConstructInPlace();
  93. auto error = TYtError((*resultNode)["error"]);
  94. if (error.GetCode() != 0) {
  95. result.Result->Error = std::move(error);
  96. }
  97. }
  98. if (auto progressNode = mapNode.FindPtr("progress")) {
  99. const auto& progressMap = progressNode->AsMap();
  100. TMaybe<TInstant> buildTime;
  101. if (auto buildTimeNode = progressMap.FindPtr("build_time")) {
  102. buildTime = TInstant::ParseIso8601(buildTimeNode->AsString());
  103. }
  104. TJobStatistics jobStatistics;
  105. if (auto jobStatisticsNode = progressMap.FindPtr("job_statistics")) {
  106. jobStatistics = TJobStatistics(*jobStatisticsNode);
  107. }
  108. TJobCounters jobCounters;
  109. if (auto jobCountersNode = progressMap.FindPtr("total_job_counter")) {
  110. jobCounters = TJobCounters(*jobCountersNode);
  111. }
  112. result.Progress = TOperationProgress{
  113. .JobStatistics = std::move(jobStatistics),
  114. .JobCounters = std::move(jobCounters),
  115. .BuildTime = buildTime,
  116. };
  117. }
  118. if (auto eventsNode = mapNode.FindPtr("events")) {
  119. result.Events.ConstructInPlace().reserve(eventsNode->Size());
  120. for (const auto& eventNode : eventsNode->AsList()) {
  121. result.Events->push_back(TOperationEvent{
  122. eventNode["state"].AsString(),
  123. TInstant::ParseIso8601(eventNode["time"].AsString()),
  124. });
  125. }
  126. }
  127. if (auto alertsNode = mapNode.FindPtr("alerts")) {
  128. result.Alerts.ConstructInPlace();
  129. for (const auto& [alertType, alertError] : alertsNode->AsMap()) {
  130. result.Alerts->emplace(alertType, TYtError(alertError));
  131. }
  132. }
  133. return result;
  134. }
  135. TJobAttributes ParseJobAttributes(const TNode& node)
  136. {
  137. const auto& mapNode = node.AsMap();
  138. TJobAttributes result;
  139. // Currently "get_job" returns "job_id" field and "list_jobs" returns "id" field.
  140. auto idNode = mapNode.FindPtr("id");
  141. if (!idNode) {
  142. idNode = mapNode.FindPtr("job_id");
  143. }
  144. if (idNode) {
  145. result.Id = GetGuid(idNode->AsString());
  146. }
  147. if (auto typeNode = mapNode.FindPtr("type")) {
  148. result.Type = FromString<EJobType>(typeNode->AsString());
  149. }
  150. if (auto stateNode = mapNode.FindPtr("state")) {
  151. result.State = FromString<EJobState>(stateNode->AsString());
  152. }
  153. if (auto addressNode = mapNode.FindPtr("address")) {
  154. result.Address = addressNode->AsString();
  155. }
  156. if (auto taskNameNode = mapNode.FindPtr("task_name")) {
  157. result.TaskName = taskNameNode->AsString();
  158. }
  159. if (auto startTimeNode = mapNode.FindPtr("start_time")) {
  160. result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
  161. }
  162. if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
  163. result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
  164. }
  165. if (auto progressNode = mapNode.FindPtr("progress")) {
  166. result.Progress = progressNode->AsDouble();
  167. }
  168. if (auto stderrSizeNode = mapNode.FindPtr("stderr_size")) {
  169. result.StderrSize = stderrSizeNode->AsUint64();
  170. }
  171. if (auto errorNode = mapNode.FindPtr("error")) {
  172. result.Error.ConstructInPlace(*errorNode);
  173. }
  174. if (auto briefStatisticsNode = mapNode.FindPtr("brief_statistics")) {
  175. result.BriefStatistics = *briefStatisticsNode;
  176. }
  177. if (auto inputPathsNode = mapNode.FindPtr("input_paths")) {
  178. const auto& inputPathNodesList = inputPathsNode->AsList();
  179. result.InputPaths.ConstructInPlace();
  180. result.InputPaths->reserve(inputPathNodesList.size());
  181. for (const auto& inputPathNode : inputPathNodesList) {
  182. TRichYPath path;
  183. Deserialize(path, inputPathNode);
  184. result.InputPaths->push_back(std::move(path));
  185. }
  186. }
  187. if (auto coreInfosNode = mapNode.FindPtr("core_infos")) {
  188. const auto& coreInfoNodesList = coreInfosNode->AsList();
  189. result.CoreInfos.ConstructInPlace();
  190. result.CoreInfos->reserve(coreInfoNodesList.size());
  191. for (const auto& coreInfoNode : coreInfoNodesList) {
  192. TCoreInfo coreInfo;
  193. coreInfo.ProcessId = coreInfoNode["process_id"].AsInt64();
  194. coreInfo.ExecutableName = coreInfoNode["executable_name"].AsString();
  195. if (coreInfoNode.HasKey("size")) {
  196. coreInfo.Size = coreInfoNode["size"].AsUint64();
  197. }
  198. if (coreInfoNode.HasKey("error")) {
  199. coreInfo.Error.ConstructInPlace(coreInfoNode["error"]);
  200. }
  201. result.CoreInfos->push_back(std::move(coreInfo));
  202. }
  203. }
  204. return result;
  205. }
  206. TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
  207. {
  208. auto parseSingleResult = [] (const TNode::TMapType& node) {
  209. TCheckPermissionResult result;
  210. result.Action = ::FromString<ESecurityAction>(node.at("action").AsString());
  211. if (auto objectId = node.FindPtr("object_id")) {
  212. result.ObjectId = GetGuid(objectId->AsString());
  213. }
  214. if (auto objectName = node.FindPtr("object_name")) {
  215. result.ObjectName = objectName->AsString();
  216. }
  217. if (auto subjectId = node.FindPtr("subject_id")) {
  218. result.SubjectId = GetGuid(subjectId->AsString());
  219. }
  220. if (auto subjectName = node.FindPtr("subject_name")) {
  221. result.SubjectName = subjectName->AsString();
  222. }
  223. return result;
  224. };
  225. const auto& mapNode = node.AsMap();
  226. TCheckPermissionResponse result;
  227. static_cast<TCheckPermissionResult&>(result) = parseSingleResult(mapNode);
  228. if (auto columns = mapNode.FindPtr("columns")) {
  229. result.Columns.reserve(columns->AsList().size());
  230. for (const auto& columnNode : columns->AsList()) {
  231. result.Columns.push_back(parseSingleResult(columnNode.AsMap()));
  232. }
  233. }
  234. return result;
  235. }
  236. TRichYPath CanonizeYPath(
  237. const IRawClientPtr& rawClient,
  238. const TRichYPath& path)
  239. {
  240. return CanonizeYPaths(rawClient, {path}).front();
  241. }
  242. TVector<TRichYPath> CanonizeYPaths(
  243. const IRawClientPtr& rawClient,
  244. const TVector<TRichYPath>& paths)
  245. {
  246. auto batch = rawClient->CreateRawBatchRequest();
  247. TVector<NThreading::TFuture<TRichYPath>> futures;
  248. futures.reserve(paths.size());
  249. for (const auto& path : paths) {
  250. futures.push_back(batch->CanonizeYPath(path));
  251. }
  252. batch->ExecuteBatch();
  253. TVector<TRichYPath> result;
  254. result.reserve(futures.size());
  255. for (auto& future : futures) {
  256. result.push_back(future.ExtractValueSync());
  257. }
  258. return result;
  259. }
  260. NHttpClient::IHttpResponsePtr SkyShareTable(
  261. const TClientContext& context,
  262. const std::vector<TYPath>& tablePaths,
  263. const TSkyShareTableOptions& options)
  264. {
  265. TMutationId mutationId;
  266. THttpHeader header("POST", "api/v1/share", /*IsApi*/ false);
  267. auto proxyName = context.ServerName.substr(0, context.ServerName.find('.'));
  268. auto host = context.Config->SkynetApiHost;
  269. if (host == "") {
  270. host = "skynet." + proxyName + ".yt.yandex.net";
  271. }
  272. TSkyShareTableOptions patchedOptions = options;
  273. if (context.Config->Pool && !patchedOptions.Pool_) {
  274. patchedOptions.Pool(context.Config->Pool);
  275. }
  276. header.MergeParameters(SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, patchedOptions));
  277. TClientContext skyApiHost({.ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient()});
  278. return RequestWithoutRetry(skyApiHost, mutationId, header, "");
  279. }
  280. void InsertRows(
  281. const TClientContext& context,
  282. const TYPath& path,
  283. const TNode::TListType& rows,
  284. const TInsertRowsOptions& options)
  285. {
  286. TMutationId mutationId;
  287. THttpHeader header("PUT", "insert_rows");
  288. header.SetInputFormat(TFormat::YsonBinary());
  289. header.MergeParameters(NRawClient::SerializeParametersForInsertRows(context.Config->Prefix, path, options));
  290. auto body = NodeListToYsonString(rows);
  291. TRequestConfig config;
  292. config.IsHeavy = true;
  293. RequestWithoutRetry(context, mutationId, header, body, config)->GetResponse();
  294. }
  295. TNode::TListType LookupRows(
  296. const TClientContext& context,
  297. const TYPath& path,
  298. const TNode::TListType& keys,
  299. const TLookupRowsOptions& options)
  300. {
  301. TMutationId mutationId;
  302. THttpHeader header("PUT", "lookup_rows");
  303. header.AddPath(AddPathPrefix(path, context.Config->ApiVersion));
  304. header.SetInputFormat(TFormat::YsonBinary());
  305. header.SetOutputFormat(TFormat::YsonBinary());
  306. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  307. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  308. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  309. })
  310. .Item("keep_missing_rows").Value(options.KeepMissingRows_)
  311. .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
  312. fluent.Item("versioned").Value(*options.Versioned_);
  313. })
  314. .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
  315. fluent.Item("column_names").Value(*options.Columns_);
  316. })
  317. .EndMap());
  318. auto body = NodeListToYsonString(keys);
  319. TRequestConfig config;
  320. config.IsHeavy = true;
  321. auto responseInfo = RequestWithoutRetry(context, mutationId, header, body, config);
  322. return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
  323. }
  324. void DeleteRows(
  325. const TClientContext& context,
  326. const TYPath& path,
  327. const TNode::TListType& keys,
  328. const TDeleteRowsOptions& options)
  329. {
  330. TMutationId mutationId;
  331. THttpHeader header("PUT", "delete_rows");
  332. header.SetInputFormat(TFormat::YsonBinary());
  333. header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(context.Config->Prefix, path, options));
  334. auto body = NodeListToYsonString(keys);
  335. TRequestConfig config;
  336. config.IsHeavy = true;
  337. RequestWithoutRetry(context, mutationId, header, body, config)->GetResponse();
  338. }
  339. TAuthorizationInfo WhoAmI(const TClientContext& context)
  340. {
  341. TMutationId mutationId;
  342. THttpHeader header("GET", "auth/whoami", /*isApi*/ false);
  343. auto requestResult = RequestWithoutRetry(context, mutationId, header);
  344. TAuthorizationInfo result;
  345. NJson::TJsonValue jsonValue;
  346. bool ok = NJson::ReadJsonTree(requestResult->GetResponse(), &jsonValue, /*throwOnError*/ true);
  347. Y_ABORT_UNLESS(ok);
  348. result.Login = jsonValue["login"].GetString();
  349. result.Realm = jsonValue["realm"].GetString();
  350. return result;
  351. }
  352. ////////////////////////////////////////////////////////////////////////////////
  353. } // namespace NYT::NDetail::NRawClient