raw_requests.cpp 13 KB


  1. #include "raw_requests.h"
  2. #include "raw_batch_request.h"
  3. #include "rpc_parameters_serialization.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  7. #include <yt/cpp/mapreduce/http/fwd.h>
  8. #include <yt/cpp/mapreduce/http/context.h>
  9. #include <yt/cpp/mapreduce/http/helpers.h>
  10. #include <yt/cpp/mapreduce/http/http_client.h>
  11. #include <yt/cpp/mapreduce/http/retry_request.h>
  12. #include <yt/cpp/mapreduce/interface/config.h>
  13. #include <yt/cpp/mapreduce/interface/client.h>
  14. #include <yt/cpp/mapreduce/interface/operation.h>
  15. #include <yt/cpp/mapreduce/interface/serialize.h>
  16. #include <yt/cpp/mapreduce/interface/tvm.h>
  17. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  18. #include <library/cpp/yson/node/node_io.h>
  19. #include <util/generic/guid.h>
  20. #include <util/generic/scope.h>
  21. namespace NYT::NDetail::NRawClient {
  22. ////////////////////////////////////////////////////////////////////////////////
  23. void ExecuteBatch(
  24. IRequestRetryPolicyPtr retryPolicy,
  25. const TClientContext& context,
  26. TRawBatchRequest& batchRequest,
  27. const TExecuteBatchOptions& options)
  28. {
  29. if (batchRequest.IsExecuted()) {
  30. ythrow yexception() << "Cannot execute batch request since it is already executed";
  31. }
  32. Y_DEFER {
  33. batchRequest.MarkExecuted();
  34. };
  35. const auto concurrency = options.Concurrency_.GetOrElse(50);
  36. const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
  37. if (!retryPolicy) {
  38. retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
  39. }
  40. while (batchRequest.BatchSize()) {
  41. TRawBatchRequest retryBatch(context.Config);
  42. while (batchRequest.BatchSize()) {
  43. auto parameters = TNode::CreateMap();
  44. TInstant nextTry;
  45. batchRequest.FillParameterList(batchPartMaxSize, &parameters["requests"], &nextTry);
  46. if (nextTry) {
  47. SleepUntil(nextTry);
  48. }
  49. parameters["concurrency"] = concurrency;
  50. auto body = NodeToYsonString(parameters);
  51. THttpHeader header("POST", "execute_batch");
  52. header.AddMutationId();
  53. NDetail::TResponseInfo result;
  54. try {
  55. result = RetryRequestWithPolicy(retryPolicy, context, header, body);
  56. } catch (const std::exception& e) {
  57. batchRequest.SetErrorResult(std::current_exception());
  58. retryBatch.SetErrorResult(std::current_exception());
  59. throw;
  60. }
  61. batchRequest.ParseResponse(std::move(result), retryPolicy.Get(), &retryBatch);
  62. }
  63. batchRequest = std::move(retryBatch);
  64. }
  65. }
  66. TOperationAttributes ParseOperationAttributes(const TNode& node)
  67. {
  68. const auto& mapNode = node.AsMap();
  69. TOperationAttributes result;
  70. if (auto idNode = mapNode.FindPtr("id")) {
  71. result.Id = GetGuid(idNode->AsString());
  72. }
  73. if (auto typeNode = mapNode.FindPtr("type")) {
  74. result.Type = FromString<EOperationType>(typeNode->AsString());
  75. } else if (auto operationTypeNode = mapNode.FindPtr("operation_type")) {
  76. // COMPAT(levysotsky): "operation_type" is a deprecated synonym for "type".
  77. // This branch should be removed when all clusters are updated.
  78. result.Type = FromString<EOperationType>(operationTypeNode->AsString());
  79. }
  80. if (auto stateNode = mapNode.FindPtr("state")) {
  81. result.State = stateNode->AsString();
  82. // We don't use FromString here, because OS_IN_PROGRESS unites many states: "initializing", "running", etc.
  83. if (*result.State == "completed") {
  84. result.BriefState = EOperationBriefState::Completed;
  85. } else if (*result.State == "aborted") {
  86. result.BriefState = EOperationBriefState::Aborted;
  87. } else if (*result.State == "failed") {
  88. result.BriefState = EOperationBriefState::Failed;
  89. } else {
  90. result.BriefState = EOperationBriefState::InProgress;
  91. }
  92. }
  93. if (auto authenticatedUserNode = mapNode.FindPtr("authenticated_user")) {
  94. result.AuthenticatedUser = authenticatedUserNode->AsString();
  95. }
  96. if (auto startTimeNode = mapNode.FindPtr("start_time")) {
  97. result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
  98. }
  99. if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
  100. result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
  101. }
  102. auto briefProgressNode = mapNode.FindPtr("brief_progress");
  103. if (briefProgressNode && briefProgressNode->HasKey("jobs")) {
  104. result.BriefProgress.ConstructInPlace();
  105. static auto load = [] (const TNode& item) {
  106. // Backward compatibility with old YT versions
  107. return item.IsInt64() ? item.AsInt64() : item["total"].AsInt64();
  108. };
  109. const auto& jobs = (*briefProgressNode)["jobs"];
  110. result.BriefProgress->Aborted = load(jobs["aborted"]);
  111. result.BriefProgress->Completed = load(jobs["completed"]);
  112. result.BriefProgress->Running = jobs["running"].AsInt64();
  113. result.BriefProgress->Total = jobs["total"].AsInt64();
  114. result.BriefProgress->Failed = jobs["failed"].AsInt64();
  115. result.BriefProgress->Lost = jobs["lost"].AsInt64();
  116. result.BriefProgress->Pending = jobs["pending"].AsInt64();
  117. }
  118. if (auto briefSpecNode = mapNode.FindPtr("brief_spec")) {
  119. result.BriefSpec = *briefSpecNode;
  120. }
  121. if (auto specNode = mapNode.FindPtr("spec")) {
  122. result.Spec = *specNode;
  123. }
  124. if (auto fullSpecNode = mapNode.FindPtr("full_spec")) {
  125. result.FullSpec = *fullSpecNode;
  126. }
  127. if (auto unrecognizedSpecNode = mapNode.FindPtr("unrecognized_spec")) {
  128. result.UnrecognizedSpec = *unrecognizedSpecNode;
  129. }
  130. if (auto suspendedNode = mapNode.FindPtr("suspended")) {
  131. result.Suspended = suspendedNode->AsBool();
  132. }
  133. if (auto resultNode = mapNode.FindPtr("result")) {
  134. result.Result.ConstructInPlace();
  135. auto error = TYtError((*resultNode)["error"]);
  136. if (error.GetCode() != 0) {
  137. result.Result->Error = std::move(error);
  138. }
  139. }
  140. if (auto progressNode = mapNode.FindPtr("progress")) {
  141. const auto& progressMap = progressNode->AsMap();
  142. TMaybe<TInstant> buildTime;
  143. if (auto buildTimeNode = progressMap.FindPtr("build_time")) {
  144. buildTime = TInstant::ParseIso8601(buildTimeNode->AsString());
  145. }
  146. TJobStatistics jobStatistics;
  147. if (auto jobStatisticsNode = progressMap.FindPtr("job_statistics")) {
  148. jobStatistics = TJobStatistics(*jobStatisticsNode);
  149. }
  150. TJobCounters jobCounters;
  151. if (auto jobCountersNode = progressMap.FindPtr("total_job_counter")) {
  152. jobCounters = TJobCounters(*jobCountersNode);
  153. }
  154. result.Progress = TOperationProgress{
  155. .JobStatistics = std::move(jobStatistics),
  156. .JobCounters = std::move(jobCounters),
  157. .BuildTime = buildTime,
  158. };
  159. }
  160. if (auto eventsNode = mapNode.FindPtr("events")) {
  161. result.Events.ConstructInPlace().reserve(eventsNode->Size());
  162. for (const auto& eventNode : eventsNode->AsList()) {
  163. result.Events->push_back(TOperationEvent{
  164. eventNode["state"].AsString(),
  165. TInstant::ParseIso8601(eventNode["time"].AsString()),
  166. });
  167. }
  168. }
  169. if (auto alertsNode = mapNode.FindPtr("alerts")) {
  170. result.Alerts.ConstructInPlace();
  171. for (const auto& [alertType, alertError] : alertsNode->AsMap()) {
  172. result.Alerts->emplace(alertType, TYtError(alertError));
  173. }
  174. }
  175. return result;
  176. }
  177. TJobAttributes ParseJobAttributes(const TNode& node)
  178. {
  179. const auto& mapNode = node.AsMap();
  180. TJobAttributes result;
  181. // Currently "get_job" returns "job_id" field and "list_jobs" returns "id" field.
  182. auto idNode = mapNode.FindPtr("id");
  183. if (!idNode) {
  184. idNode = mapNode.FindPtr("job_id");
  185. }
  186. if (idNode) {
  187. result.Id = GetGuid(idNode->AsString());
  188. }
  189. if (auto typeNode = mapNode.FindPtr("type")) {
  190. result.Type = FromString<EJobType>(typeNode->AsString());
  191. }
  192. if (auto stateNode = mapNode.FindPtr("state")) {
  193. result.State = FromString<EJobState>(stateNode->AsString());
  194. }
  195. if (auto addressNode = mapNode.FindPtr("address")) {
  196. result.Address = addressNode->AsString();
  197. }
  198. if (auto taskNameNode = mapNode.FindPtr("task_name")) {
  199. result.TaskName = taskNameNode->AsString();
  200. }
  201. if (auto startTimeNode = mapNode.FindPtr("start_time")) {
  202. result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
  203. }
  204. if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
  205. result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
  206. }
  207. if (auto progressNode = mapNode.FindPtr("progress")) {
  208. result.Progress = progressNode->AsDouble();
  209. }
  210. if (auto stderrSizeNode = mapNode.FindPtr("stderr_size")) {
  211. result.StderrSize = stderrSizeNode->AsUint64();
  212. }
  213. if (auto errorNode = mapNode.FindPtr("error")) {
  214. result.Error.ConstructInPlace(*errorNode);
  215. }
  216. if (auto briefStatisticsNode = mapNode.FindPtr("brief_statistics")) {
  217. result.BriefStatistics = *briefStatisticsNode;
  218. }
  219. if (auto inputPathsNode = mapNode.FindPtr("input_paths")) {
  220. const auto& inputPathNodesList = inputPathsNode->AsList();
  221. result.InputPaths.ConstructInPlace();
  222. result.InputPaths->reserve(inputPathNodesList.size());
  223. for (const auto& inputPathNode : inputPathNodesList) {
  224. TRichYPath path;
  225. Deserialize(path, inputPathNode);
  226. result.InputPaths->push_back(std::move(path));
  227. }
  228. }
  229. if (auto coreInfosNode = mapNode.FindPtr("core_infos")) {
  230. const auto& coreInfoNodesList = coreInfosNode->AsList();
  231. result.CoreInfos.ConstructInPlace();
  232. result.CoreInfos->reserve(coreInfoNodesList.size());
  233. for (const auto& coreInfoNode : coreInfoNodesList) {
  234. TCoreInfo coreInfo;
  235. coreInfo.ProcessId = coreInfoNode["process_id"].AsInt64();
  236. coreInfo.ExecutableName = coreInfoNode["executable_name"].AsString();
  237. if (coreInfoNode.HasKey("size")) {
  238. coreInfo.Size = coreInfoNode["size"].AsUint64();
  239. }
  240. if (coreInfoNode.HasKey("error")) {
  241. coreInfo.Error.ConstructInPlace(coreInfoNode["error"]);
  242. }
  243. result.CoreInfos->push_back(std::move(coreInfo));
  244. }
  245. }
  246. return result;
  247. }
  248. TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
  249. {
  250. auto parseSingleResult = [] (const TNode::TMapType& node) {
  251. TCheckPermissionResult result;
  252. result.Action = ::FromString<ESecurityAction>(node.at("action").AsString());
  253. if (auto objectId = node.FindPtr("object_id")) {
  254. result.ObjectId = GetGuid(objectId->AsString());
  255. }
  256. if (auto objectName = node.FindPtr("object_name")) {
  257. result.ObjectName = objectName->AsString();
  258. }
  259. if (auto subjectId = node.FindPtr("subject_id")) {
  260. result.SubjectId = GetGuid(subjectId->AsString());
  261. }
  262. if (auto subjectName = node.FindPtr("subject_name")) {
  263. result.SubjectName = subjectName->AsString();
  264. }
  265. return result;
  266. };
  267. const auto& mapNode = node.AsMap();
  268. TCheckPermissionResponse result;
  269. static_cast<TCheckPermissionResult&>(result) = parseSingleResult(mapNode);
  270. if (auto columns = mapNode.FindPtr("columns")) {
  271. result.Columns.reserve(columns->AsList().size());
  272. for (const auto& columnNode : columns->AsList()) {
  273. result.Columns.push_back(parseSingleResult(columnNode.AsMap()));
  274. }
  275. }
  276. return result;
  277. }
  278. TRichYPath CanonizeYPath(
  279. const IRequestRetryPolicyPtr& retryPolicy,
  280. const TClientContext& context,
  281. const TRichYPath& path)
  282. {
  283. return CanonizeYPaths(retryPolicy, context, {path}).front();
  284. }
  285. TVector<TRichYPath> CanonizeYPaths(
  286. const IRequestRetryPolicyPtr& retryPolicy,
  287. const TClientContext& context,
  288. const TVector<TRichYPath>& paths)
  289. {
  290. TRawBatchRequest batch(context.Config);
  291. TVector<NThreading::TFuture<TRichYPath>> futures;
  292. futures.reserve(paths.size());
  293. for (int i = 0; i < static_cast<int>(paths.size()); ++i) {
  294. futures.push_back(batch.CanonizeYPath(paths[i]));
  295. }
  296. ExecuteBatch(retryPolicy, context, batch, TExecuteBatchOptions{});
  297. TVector<TRichYPath> result;
  298. result.reserve(futures.size());
  299. for (auto& future : futures) {
  300. result.push_back(future.ExtractValueSync());
  301. }
  302. return result;
  303. }
  304. ////////////////////////////////////////////////////////////////////////////////
  305. } // namespace NYT::NDetail::NRawClient