raw_requests.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. #include "raw_requests.h"
  2. #include "raw_batch_request.h"
  3. #include "rpc_parameters_serialization.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  7. #include <yt/cpp/mapreduce/http/fwd.h>
  8. #include <yt/cpp/mapreduce/http/context.h>
  9. #include <yt/cpp/mapreduce/http/helpers.h>
  10. #include <yt/cpp/mapreduce/http/http_client.h>
  11. #include <yt/cpp/mapreduce/http/retry_request.h>
  12. #include <yt/cpp/mapreduce/interface/config.h>
  13. #include <yt/cpp/mapreduce/interface/client.h>
  14. #include <yt/cpp/mapreduce/interface/operation.h>
  15. #include <yt/cpp/mapreduce/interface/serialize.h>
  16. #include <yt/cpp/mapreduce/interface/tvm.h>
  17. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  18. #include <library/cpp/yson/node/node_io.h>
  19. #include <util/generic/guid.h>
  20. #include <util/generic/scope.h>
  21. namespace NYT::NDetail::NRawClient {
  22. ////////////////////////////////////////////////////////////////////////////////
  23. TOperationAttributes ParseOperationAttributes(const TNode& node)
  24. {
  25. const auto& mapNode = node.AsMap();
  26. TOperationAttributes result;
  27. if (auto idNode = mapNode.FindPtr("id")) {
  28. result.Id = GetGuid(idNode->AsString());
  29. }
  30. if (auto typeNode = mapNode.FindPtr("type")) {
  31. result.Type = FromString<EOperationType>(typeNode->AsString());
  32. } else if (auto operationTypeNode = mapNode.FindPtr("operation_type")) {
  33. // COMPAT(levysotsky): "operation_type" is a deprecated synonym for "type".
  34. // This branch should be removed when all clusters are updated.
  35. result.Type = FromString<EOperationType>(operationTypeNode->AsString());
  36. }
  37. if (auto stateNode = mapNode.FindPtr("state")) {
  38. result.State = stateNode->AsString();
  39. // We don't use FromString here, because OS_IN_PROGRESS unites many states: "initializing", "running", etc.
  40. if (*result.State == "completed") {
  41. result.BriefState = EOperationBriefState::Completed;
  42. } else if (*result.State == "aborted") {
  43. result.BriefState = EOperationBriefState::Aborted;
  44. } else if (*result.State == "failed") {
  45. result.BriefState = EOperationBriefState::Failed;
  46. } else {
  47. result.BriefState = EOperationBriefState::InProgress;
  48. }
  49. }
  50. if (auto authenticatedUserNode = mapNode.FindPtr("authenticated_user")) {
  51. result.AuthenticatedUser = authenticatedUserNode->AsString();
  52. }
  53. if (auto startTimeNode = mapNode.FindPtr("start_time")) {
  54. result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
  55. }
  56. if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
  57. result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
  58. }
  59. auto briefProgressNode = mapNode.FindPtr("brief_progress");
  60. if (briefProgressNode && briefProgressNode->HasKey("jobs")) {
  61. result.BriefProgress.ConstructInPlace();
  62. static auto load = [] (const TNode& item) {
  63. // Backward compatibility with old YT versions
  64. return item.IsInt64() ? item.AsInt64() : item["total"].AsInt64();
  65. };
  66. const auto& jobs = (*briefProgressNode)["jobs"];
  67. result.BriefProgress->Aborted = load(jobs["aborted"]);
  68. result.BriefProgress->Completed = load(jobs["completed"]);
  69. result.BriefProgress->Running = jobs["running"].AsInt64();
  70. result.BriefProgress->Total = jobs["total"].AsInt64();
  71. result.BriefProgress->Failed = jobs["failed"].AsInt64();
  72. result.BriefProgress->Lost = jobs["lost"].AsInt64();
  73. result.BriefProgress->Pending = jobs["pending"].AsInt64();
  74. }
  75. if (auto briefSpecNode = mapNode.FindPtr("brief_spec")) {
  76. result.BriefSpec = *briefSpecNode;
  77. }
  78. if (auto specNode = mapNode.FindPtr("spec")) {
  79. result.Spec = *specNode;
  80. }
  81. if (auto fullSpecNode = mapNode.FindPtr("full_spec")) {
  82. result.FullSpec = *fullSpecNode;
  83. }
  84. if (auto unrecognizedSpecNode = mapNode.FindPtr("unrecognized_spec")) {
  85. result.UnrecognizedSpec = *unrecognizedSpecNode;
  86. }
  87. if (auto suspendedNode = mapNode.FindPtr("suspended")) {
  88. result.Suspended = suspendedNode->AsBool();
  89. }
  90. if (auto resultNode = mapNode.FindPtr("result")) {
  91. result.Result.ConstructInPlace();
  92. auto error = TYtError((*resultNode)["error"]);
  93. if (error.GetCode() != 0) {
  94. result.Result->Error = std::move(error);
  95. }
  96. }
  97. if (auto progressNode = mapNode.FindPtr("progress")) {
  98. const auto& progressMap = progressNode->AsMap();
  99. TMaybe<TInstant> buildTime;
  100. if (auto buildTimeNode = progressMap.FindPtr("build_time")) {
  101. buildTime = TInstant::ParseIso8601(buildTimeNode->AsString());
  102. }
  103. TJobStatistics jobStatistics;
  104. if (auto jobStatisticsNode = progressMap.FindPtr("job_statistics")) {
  105. jobStatistics = TJobStatistics(*jobStatisticsNode);
  106. }
  107. TJobCounters jobCounters;
  108. if (auto jobCountersNode = progressMap.FindPtr("total_job_counter")) {
  109. jobCounters = TJobCounters(*jobCountersNode);
  110. }
  111. result.Progress = TOperationProgress{
  112. .JobStatistics = std::move(jobStatistics),
  113. .JobCounters = std::move(jobCounters),
  114. .BuildTime = buildTime,
  115. };
  116. }
  117. if (auto eventsNode = mapNode.FindPtr("events")) {
  118. result.Events.ConstructInPlace().reserve(eventsNode->Size());
  119. for (const auto& eventNode : eventsNode->AsList()) {
  120. result.Events->push_back(TOperationEvent{
  121. eventNode["state"].AsString(),
  122. TInstant::ParseIso8601(eventNode["time"].AsString()),
  123. });
  124. }
  125. }
  126. if (auto alertsNode = mapNode.FindPtr("alerts")) {
  127. result.Alerts.ConstructInPlace();
  128. for (const auto& [alertType, alertError] : alertsNode->AsMap()) {
  129. result.Alerts->emplace(alertType, TYtError(alertError));
  130. }
  131. }
  132. return result;
  133. }
  134. TJobAttributes ParseJobAttributes(const TNode& node)
  135. {
  136. const auto& mapNode = node.AsMap();
  137. TJobAttributes result;
  138. // Currently "get_job" returns "job_id" field and "list_jobs" returns "id" field.
  139. auto idNode = mapNode.FindPtr("id");
  140. if (!idNode) {
  141. idNode = mapNode.FindPtr("job_id");
  142. }
  143. if (idNode) {
  144. result.Id = GetGuid(idNode->AsString());
  145. }
  146. if (auto typeNode = mapNode.FindPtr("type")) {
  147. result.Type = FromString<EJobType>(typeNode->AsString());
  148. }
  149. if (auto stateNode = mapNode.FindPtr("state")) {
  150. result.State = FromString<EJobState>(stateNode->AsString());
  151. }
  152. if (auto addressNode = mapNode.FindPtr("address")) {
  153. result.Address = addressNode->AsString();
  154. }
  155. if (auto taskNameNode = mapNode.FindPtr("task_name")) {
  156. result.TaskName = taskNameNode->AsString();
  157. }
  158. if (auto startTimeNode = mapNode.FindPtr("start_time")) {
  159. result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
  160. }
  161. if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
  162. result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
  163. }
  164. if (auto progressNode = mapNode.FindPtr("progress")) {
  165. result.Progress = progressNode->AsDouble();
  166. }
  167. if (auto stderrSizeNode = mapNode.FindPtr("stderr_size")) {
  168. result.StderrSize = stderrSizeNode->AsUint64();
  169. }
  170. if (auto errorNode = mapNode.FindPtr("error")) {
  171. result.Error.ConstructInPlace(*errorNode);
  172. }
  173. if (auto briefStatisticsNode = mapNode.FindPtr("brief_statistics")) {
  174. result.BriefStatistics = *briefStatisticsNode;
  175. }
  176. if (auto inputPathsNode = mapNode.FindPtr("input_paths")) {
  177. const auto& inputPathNodesList = inputPathsNode->AsList();
  178. result.InputPaths.ConstructInPlace();
  179. result.InputPaths->reserve(inputPathNodesList.size());
  180. for (const auto& inputPathNode : inputPathNodesList) {
  181. TRichYPath path;
  182. Deserialize(path, inputPathNode);
  183. result.InputPaths->push_back(std::move(path));
  184. }
  185. }
  186. if (auto coreInfosNode = mapNode.FindPtr("core_infos")) {
  187. const auto& coreInfoNodesList = coreInfosNode->AsList();
  188. result.CoreInfos.ConstructInPlace();
  189. result.CoreInfos->reserve(coreInfoNodesList.size());
  190. for (const auto& coreInfoNode : coreInfoNodesList) {
  191. TCoreInfo coreInfo;
  192. coreInfo.ProcessId = coreInfoNode["process_id"].AsInt64();
  193. coreInfo.ExecutableName = coreInfoNode["executable_name"].AsString();
  194. if (coreInfoNode.HasKey("size")) {
  195. coreInfo.Size = coreInfoNode["size"].AsUint64();
  196. }
  197. if (coreInfoNode.HasKey("error")) {
  198. coreInfo.Error.ConstructInPlace(coreInfoNode["error"]);
  199. }
  200. result.CoreInfos->push_back(std::move(coreInfo));
  201. }
  202. }
  203. return result;
  204. }
  205. TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
  206. {
  207. auto parseSingleResult = [] (const TNode::TMapType& node) {
  208. TCheckPermissionResult result;
  209. result.Action = ::FromString<ESecurityAction>(node.at("action").AsString());
  210. if (auto objectId = node.FindPtr("object_id")) {
  211. result.ObjectId = GetGuid(objectId->AsString());
  212. }
  213. if (auto objectName = node.FindPtr("object_name")) {
  214. result.ObjectName = objectName->AsString();
  215. }
  216. if (auto subjectId = node.FindPtr("subject_id")) {
  217. result.SubjectId = GetGuid(subjectId->AsString());
  218. }
  219. if (auto subjectName = node.FindPtr("subject_name")) {
  220. result.SubjectName = subjectName->AsString();
  221. }
  222. return result;
  223. };
  224. const auto& mapNode = node.AsMap();
  225. TCheckPermissionResponse result;
  226. static_cast<TCheckPermissionResult&>(result) = parseSingleResult(mapNode);
  227. if (auto columns = mapNode.FindPtr("columns")) {
  228. result.Columns.reserve(columns->AsList().size());
  229. for (const auto& columnNode : columns->AsList()) {
  230. result.Columns.push_back(parseSingleResult(columnNode.AsMap()));
  231. }
  232. }
  233. return result;
  234. }
  235. TRichYPath CanonizeYPath(
  236. const IRawClientPtr& rawClient,
  237. const TRichYPath& path)
  238. {
  239. return CanonizeYPaths(rawClient, {path}).front();
  240. }
  241. TVector<TRichYPath> CanonizeYPaths(
  242. const IRawClientPtr& rawClient,
  243. const TVector<TRichYPath>& paths)
  244. {
  245. auto batch = rawClient->CreateRawBatchRequest();
  246. TVector<NThreading::TFuture<TRichYPath>> futures;
  247. futures.reserve(paths.size());
  248. for (const auto& path : paths) {
  249. futures.push_back(batch->CanonizeYPath(path));
  250. }
  251. batch->ExecuteBatch();
  252. TVector<TRichYPath> result;
  253. result.reserve(futures.size());
  254. for (auto& future : futures) {
  255. result.push_back(future.ExtractValueSync());
  256. }
  257. return result;
  258. }
  259. NHttpClient::IHttpResponsePtr SkyShareTable(
  260. const TClientContext& context,
  261. const std::vector<TYPath>& tablePaths,
  262. const TSkyShareTableOptions& options)
  263. {
  264. TMutationId mutationId;
  265. THttpHeader header("POST", "api/v1/share", /*IsApi*/ false);
  266. auto proxyName = context.ServerName.substr(0, context.ServerName.find('.'));
  267. auto host = context.Config->SkynetApiHost;
  268. if (host == "") {
  269. host = "skynet." + proxyName + ".yt.yandex.net";
  270. }
  271. TSkyShareTableOptions patchedOptions = options;
  272. if (context.Config->Pool && !patchedOptions.Pool_) {
  273. patchedOptions.Pool(context.Config->Pool);
  274. }
  275. header.MergeParameters(SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, patchedOptions));
  276. TClientContext skyApiHost({.ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient()});
  277. return RequestWithoutRetry(skyApiHost, mutationId, header, "");
  278. }
  279. TAuthorizationInfo WhoAmI(const TClientContext& context)
  280. {
  281. TMutationId mutationId;
  282. THttpHeader header("GET", "auth/whoami", /*isApi*/ false);
  283. auto requestResult = RequestWithoutRetry(context, mutationId, header);
  284. TAuthorizationInfo result;
  285. NJson::TJsonValue jsonValue;
  286. bool ok = NJson::ReadJsonTree(requestResult->GetResponse(), &jsonValue, /*throwOnError*/ true);
  287. Y_ABORT_UNLESS(ok);
  288. result.Login = jsonValue["login"].GetString();
  289. result.Realm = jsonValue["realm"].GetString();
  290. return result;
  291. }
  292. ////////////////////////////////////////////////////////////////////////////////
  293. } // namespace NYT::NDetail::NRawClient