12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079 |
- #include "raw_requests.h"
- #include "raw_batch_request.h"
- #include "rpc_parameters_serialization.h"
- #include <yt/cpp/mapreduce/common/helpers.h>
- #include <yt/cpp/mapreduce/common/retry_lib.h>
- #include <yt/cpp/mapreduce/common/wait_proxy.h>
- #include <yt/cpp/mapreduce/http/fwd.h>
- #include <yt/cpp/mapreduce/http/context.h>
- #include <yt/cpp/mapreduce/http/helpers.h>
- #include <yt/cpp/mapreduce/http/http_client.h>
- #include <yt/cpp/mapreduce/http/retry_request.h>
- #include <yt/cpp/mapreduce/interface/config.h>
- #include <yt/cpp/mapreduce/interface/client.h>
- #include <yt/cpp/mapreduce/interface/operation.h>
- #include <yt/cpp/mapreduce/interface/serialize.h>
- #include <yt/cpp/mapreduce/interface/tvm.h>
- #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
- #include <library/cpp/yson/node/node_io.h>
- #include <util/generic/guid.h>
- #include <util/generic/scope.h>
- namespace NYT::NDetail::NRawClient {
- ////////////////////////////////////////////////////////////////////////////////
- void ExecuteBatch(
- IRequestRetryPolicyPtr retryPolicy,
- const TClientContext& context,
- TRawBatchRequest& batchRequest,
- const TExecuteBatchOptions& options)
- {
- if (batchRequest.IsExecuted()) {
- ythrow yexception() << "Cannot execute batch request since it is already executed";
- }
- Y_DEFER {
- batchRequest.MarkExecuted();
- };
- const auto concurrency = options.Concurrency_.GetOrElse(50);
- const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
- if (!retryPolicy) {
- retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
- }
- while (batchRequest.BatchSize()) {
- TRawBatchRequest retryBatch(context.Config);
- while (batchRequest.BatchSize()) {
- auto parameters = TNode::CreateMap();
- TInstant nextTry;
- batchRequest.FillParameterList(batchPartMaxSize, ¶meters["requests"], &nextTry);
- if (nextTry) {
- SleepUntil(nextTry);
- }
- parameters["concurrency"] = concurrency;
- auto body = NodeToYsonString(parameters);
- THttpHeader header("POST", "execute_batch");
- header.AddMutationId();
- NDetail::TResponseInfo result;
- try {
- result = RetryRequestWithPolicy(retryPolicy, context, header, body);
- } catch (const std::exception& e) {
- batchRequest.SetErrorResult(std::current_exception());
- retryBatch.SetErrorResult(std::current_exception());
- throw;
- }
- batchRequest.ParseResponse(std::move(result), retryPolicy.Get(), &retryBatch);
- }
- batchRequest = std::move(retryBatch);
- }
- }
- TNode Get(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TGetOptions& options)
- {
- THttpHeader header("GET", "get");
- header.MergeParameters(SerializeParamsForGet(transactionId, context.Config->Prefix, path, options));
- return NodeFromYsonString(RetryRequestWithPolicy(retryPolicy, context, header).Response);
- }
- TNode TryGet(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TGetOptions& options)
- {
- try {
- return Get(retryPolicy, context, transactionId, path, options);
- } catch (const TErrorResponse& error) {
- if (!error.IsResolveError()) {
- throw;
- }
- return TNode();
- }
- }
- void Set(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TNode& value,
- const TSetOptions& options)
- {
- THttpHeader header("PUT", "set");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForSet(transactionId, context.Config->Prefix, path, options));
- auto body = NodeToYsonString(value);
- RetryRequestWithPolicy(retryPolicy, context, header, body);
- }
- void MultisetAttributes(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TNode::TMapType& value,
- const TMultisetAttributesOptions& options)
- {
- THttpHeader header("PUT", "api/v4/multiset_attributes", false);
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForMultisetAttributes(transactionId, context.Config->Prefix, path, options));
- auto body = NodeToYsonString(value);
- RetryRequestWithPolicy(retryPolicy, context, header, body);
- }
- bool Exists(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TExistsOptions& options)
- {
- THttpHeader header("GET", "exists");
- header.MergeParameters(SerializeParamsForExists(transactionId, context.Config->Prefix, path, options));
- return ParseBoolFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
- }
- TNodeId Create(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const ENodeType& type,
- const TCreateOptions& options)
- {
- THttpHeader header("POST", "create");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForCreate(transactionId, context.Config->Prefix, path, type, options));
- return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
- }
- TNodeId CopyWithoutRetries(
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TCopyOptions& options)
- {
- THttpHeader header("POST", "copy");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry(context, header).Response);
- }
- TNodeId CopyInsideMasterCell(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TCopyOptions& options)
- {
- THttpHeader header("POST", "copy");
- header.AddMutationId();
- auto params = SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options);
- // Make cross cell copying disable.
- params["enable_cross_cell_copying"] = false;
- header.MergeParameters(params);
- return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
- }
- TNodeId MoveWithoutRetries(
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TMoveOptions& options)
- {
- THttpHeader header("POST", "move");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry( context, header).Response);
- }
- TNodeId MoveInsideMasterCell(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TMoveOptions& options)
- {
- THttpHeader header("POST", "move");
- header.AddMutationId();
- auto params = SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options);
- // Make cross cell copying disable.
- params["enable_cross_cell_copying"] = false;
- header.MergeParameters(params);
- return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
- }
- void Remove(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TRemoveOptions& options)
- {
- THttpHeader header("POST", "remove");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForRemove(transactionId, context.Config->Prefix, path, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- TNode::TListType List(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TListOptions& options)
- {
- THttpHeader header("GET", "list");
- TYPath updatedPath = AddPathPrefix(path, context.Config->Prefix);
- // Translate "//" to "/"
- // Translate "//some/constom/prefix/from/config/" to "//some/constom/prefix/from/config"
- if (path.empty() && updatedPath.EndsWith('/')) {
- updatedPath.pop_back();
- }
- header.MergeParameters(SerializeParamsForList(transactionId, context.Config->Prefix, updatedPath, options));
- auto result = RetryRequestWithPolicy(retryPolicy, context, header);
- return NodeFromYsonString(result.Response).AsList();
- }
- TNodeId Link(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& targetPath,
- const TYPath& linkPath,
- const TLinkOptions& options)
- {
- THttpHeader header("POST", "link");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForLink(transactionId, context.Config->Prefix, targetPath, linkPath, options));
- return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
- }
- TLockId Lock(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- ELockMode mode,
- const TLockOptions& options)
- {
- THttpHeader header("POST", "lock");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForLock(transactionId, context.Config->Prefix, path, mode, options));
- return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
- }
- void Unlock(
- IRequestRetryPolicyPtr retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TUnlockOptions& options)
- {
- THttpHeader header("POST", "unlock");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForUnlock(transactionId, context.Config->Prefix, path, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- void Concatenate(
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TVector<TRichYPath>& sourcePaths,
- const TRichYPath& destinationPath,
- const TConcatenateOptions& options)
- {
- THttpHeader header("POST", "concatenate");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForConcatenate(transactionId, context.Config->Prefix, sourcePaths, destinationPath, options));
- RequestWithoutRetry(context, header);
- }
- void PingTx(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId)
- {
- THttpHeader header("POST", "ping_tx");
- header.MergeParameters(SerializeParamsForPingTx(transactionId));
- TRequestConfig requestConfig;
- requestConfig.HttpConfig = NHttpClient::THttpConfig{
- .SocketTimeout = context.Config->PingTimeout
- };
- RetryRequestWithPolicy(retryPolicy, context, header, {}, requestConfig);
- }
- TOperationAttributes ParseOperationAttributes(const TNode& node)
- {
- const auto& mapNode = node.AsMap();
- TOperationAttributes result;
- if (auto idNode = mapNode.FindPtr("id")) {
- result.Id = GetGuid(idNode->AsString());
- }
- if (auto typeNode = mapNode.FindPtr("type")) {
- result.Type = FromString<EOperationType>(typeNode->AsString());
- } else if (auto operationTypeNode = mapNode.FindPtr("operation_type")) {
- // COMPAT(levysotsky): "operation_type" is a deprecated synonym for "type".
- // This branch should be removed when all clusters are updated.
- result.Type = FromString<EOperationType>(operationTypeNode->AsString());
- }
- if (auto stateNode = mapNode.FindPtr("state")) {
- result.State = stateNode->AsString();
- // We don't use FromString here, because OS_IN_PROGRESS unites many states: "initializing", "running", etc.
- if (*result.State == "completed") {
- result.BriefState = EOperationBriefState::Completed;
- } else if (*result.State == "aborted") {
- result.BriefState = EOperationBriefState::Aborted;
- } else if (*result.State == "failed") {
- result.BriefState = EOperationBriefState::Failed;
- } else {
- result.BriefState = EOperationBriefState::InProgress;
- }
- }
- if (auto authenticatedUserNode = mapNode.FindPtr("authenticated_user")) {
- result.AuthenticatedUser = authenticatedUserNode->AsString();
- }
- if (auto startTimeNode = mapNode.FindPtr("start_time")) {
- result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
- }
- if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
- result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
- }
- auto briefProgressNode = mapNode.FindPtr("brief_progress");
- if (briefProgressNode && briefProgressNode->HasKey("jobs")) {
- result.BriefProgress.ConstructInPlace();
- static auto load = [] (const TNode& item) {
- // Backward compatibility with old YT versions
- return item.IsInt64() ? item.AsInt64() : item["total"].AsInt64();
- };
- const auto& jobs = (*briefProgressNode)["jobs"];
- result.BriefProgress->Aborted = load(jobs["aborted"]);
- result.BriefProgress->Completed = load(jobs["completed"]);
- result.BriefProgress->Running = jobs["running"].AsInt64();
- result.BriefProgress->Total = jobs["total"].AsInt64();
- result.BriefProgress->Failed = jobs["failed"].AsInt64();
- result.BriefProgress->Lost = jobs["lost"].AsInt64();
- result.BriefProgress->Pending = jobs["pending"].AsInt64();
- }
- if (auto briefSpecNode = mapNode.FindPtr("brief_spec")) {
- result.BriefSpec = *briefSpecNode;
- }
- if (auto specNode = mapNode.FindPtr("spec")) {
- result.Spec = *specNode;
- }
- if (auto fullSpecNode = mapNode.FindPtr("full_spec")) {
- result.FullSpec = *fullSpecNode;
- }
- if (auto unrecognizedSpecNode = mapNode.FindPtr("unrecognized_spec")) {
- result.UnrecognizedSpec = *unrecognizedSpecNode;
- }
- if (auto suspendedNode = mapNode.FindPtr("suspended")) {
- result.Suspended = suspendedNode->AsBool();
- }
- if (auto resultNode = mapNode.FindPtr("result")) {
- result.Result.ConstructInPlace();
- auto error = TYtError((*resultNode)["error"]);
- if (error.GetCode() != 0) {
- result.Result->Error = std::move(error);
- }
- }
- if (auto progressNode = mapNode.FindPtr("progress")) {
- const auto& progressMap = progressNode->AsMap();
- TMaybe<TInstant> buildTime;
- if (auto buildTimeNode = progressMap.FindPtr("build_time")) {
- buildTime = TInstant::ParseIso8601(buildTimeNode->AsString());
- }
- TJobStatistics jobStatistics;
- if (auto jobStatisticsNode = progressMap.FindPtr("job_statistics")) {
- jobStatistics = TJobStatistics(*jobStatisticsNode);
- }
- TJobCounters jobCounters;
- if (auto jobCountersNode = progressMap.FindPtr("total_job_counter")) {
- jobCounters = TJobCounters(*jobCountersNode);
- }
- result.Progress = TOperationProgress{
- .JobStatistics = std::move(jobStatistics),
- .JobCounters = std::move(jobCounters),
- .BuildTime = buildTime,
- };
- }
- if (auto eventsNode = mapNode.FindPtr("events")) {
- result.Events.ConstructInPlace().reserve(eventsNode->Size());
- for (const auto& eventNode : eventsNode->AsList()) {
- result.Events->push_back(TOperationEvent{
- eventNode["state"].AsString(),
- TInstant::ParseIso8601(eventNode["time"].AsString()),
- });
- }
- }
- if (auto alertsNode = mapNode.FindPtr("alerts")) {
- result.Alerts.ConstructInPlace();
- for (const auto& [alertType, alertError] : alertsNode->AsMap()) {
- result.Alerts->emplace(alertType, TYtError(alertError));
- }
- }
- return result;
- }
- TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TGetOperationOptions& options)
- {
- THttpHeader header("GET", "get_operation");
- header.MergeParameters(SerializeParamsForGetOperation(operationId, options));
- auto result = RetryRequestWithPolicy(retryPolicy, context, header);
- return ParseOperationAttributes(NodeFromYsonString(result.Response));
- }
- TOperationAttributes GetOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TString& alias,
- const TGetOperationOptions& options)
- {
- THttpHeader header("GET", "get_operation");
- header.MergeParameters(SerializeParamsForGetOperation(alias, options));
- auto result = RetryRequestWithPolicy(retryPolicy, context, header);
- return ParseOperationAttributes(NodeFromYsonString(result.Response));
- }
- void AbortOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId)
- {
- THttpHeader header("POST", "abort_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForAbortOperation(operationId));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- void CompleteOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId)
- {
- THttpHeader header("POST", "complete_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForCompleteOperation(operationId));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- void SuspendOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TSuspendOperationOptions& options)
- {
- THttpHeader header("POST", "suspend_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForSuspendOperation(operationId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- void ResumeOperation(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TResumeOperationOptions& options)
- {
- THttpHeader header("POST", "resume_op");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForResumeOperation(operationId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- template <typename TKey>
- static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
- {
- THashMap<TKey, i64> counts;
- for (const auto& entry : countsNode.AsMap()) {
- counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
- }
- return counts;
- }
- TListOperationsResult ListOperations(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TListOperationsOptions& options)
- {
- THttpHeader header("GET", "list_operations");
- header.MergeParameters(SerializeParamsForListOperations(options));
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
- TListOperationsResult result;
- for (const auto& operationNode : resultNode["operations"].AsList()) {
- result.Operations.push_back(ParseOperationAttributes(operationNode));
- }
- if (resultNode.HasKey("pool_counts")) {
- result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
- }
- if (resultNode.HasKey("user_counts")) {
- result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
- }
- if (resultNode.HasKey("type_counts")) {
- result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
- }
- if (resultNode.HasKey("state_counts")) {
- result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
- }
- if (resultNode.HasKey("failed_jobs_count")) {
- result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
- }
- result.Incomplete = resultNode["incomplete"].AsBool();
- return result;
- }
- void UpdateOperationParameters(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options)
- {
- THttpHeader header("POST", "update_op_parameters");
- header.MergeParameters(SerializeParamsForUpdateOperationParameters(operationId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- TJobAttributes ParseJobAttributes(const TNode& node)
- {
- const auto& mapNode = node.AsMap();
- TJobAttributes result;
- // Currently "get_job" returns "job_id" field and "list_jobs" returns "id" field.
- auto idNode = mapNode.FindPtr("id");
- if (!idNode) {
- idNode = mapNode.FindPtr("job_id");
- }
- if (idNode) {
- result.Id = GetGuid(idNode->AsString());
- }
- if (auto typeNode = mapNode.FindPtr("type")) {
- result.Type = FromString<EJobType>(typeNode->AsString());
- }
- if (auto stateNode = mapNode.FindPtr("state")) {
- result.State = FromString<EJobState>(stateNode->AsString());
- }
- if (auto addressNode = mapNode.FindPtr("address")) {
- result.Address = addressNode->AsString();
- }
- if (auto taskNameNode = mapNode.FindPtr("task_name")) {
- result.TaskName = taskNameNode->AsString();
- }
- if (auto startTimeNode = mapNode.FindPtr("start_time")) {
- result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString());
- }
- if (auto finishTimeNode = mapNode.FindPtr("finish_time")) {
- result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString());
- }
- if (auto progressNode = mapNode.FindPtr("progress")) {
- result.Progress = progressNode->AsDouble();
- }
- if (auto stderrSizeNode = mapNode.FindPtr("stderr_size")) {
- result.StderrSize = stderrSizeNode->AsUint64();
- }
- if (auto errorNode = mapNode.FindPtr("error")) {
- result.Error.ConstructInPlace(*errorNode);
- }
- if (auto briefStatisticsNode = mapNode.FindPtr("brief_statistics")) {
- result.BriefStatistics = *briefStatisticsNode;
- }
- if (auto inputPathsNode = mapNode.FindPtr("input_paths")) {
- const auto& inputPathNodesList = inputPathsNode->AsList();
- result.InputPaths.ConstructInPlace();
- result.InputPaths->reserve(inputPathNodesList.size());
- for (const auto& inputPathNode : inputPathNodesList) {
- TRichYPath path;
- Deserialize(path, inputPathNode);
- result.InputPaths->push_back(std::move(path));
- }
- }
- if (auto coreInfosNode = mapNode.FindPtr("core_infos")) {
- const auto& coreInfoNodesList = coreInfosNode->AsList();
- result.CoreInfos.ConstructInPlace();
- result.CoreInfos->reserve(coreInfoNodesList.size());
- for (const auto& coreInfoNode : coreInfoNodesList) {
- TCoreInfo coreInfo;
- coreInfo.ProcessId = coreInfoNode["process_id"].AsInt64();
- coreInfo.ExecutableName = coreInfoNode["executable_name"].AsString();
- if (coreInfoNode.HasKey("size")) {
- coreInfo.Size = coreInfoNode["size"].AsUint64();
- }
- if (coreInfoNode.HasKey("error")) {
- coreInfo.Error.ConstructInPlace(coreInfoNode["error"]);
- }
- result.CoreInfos->push_back(std::move(coreInfo));
- }
- }
- return result;
- }
- TJobAttributes GetJob(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobOptions& options)
- {
- THttpHeader header("GET", "get_job");
- header.MergeParameters(SerializeParamsForGetJob(operationId, jobId, options));
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
- return ParseJobAttributes(resultNode);
- }
- TListJobsResult ListJobs(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TListJobsOptions& options)
- {
- THttpHeader header("GET", "list_jobs");
- header.MergeParameters(SerializeParamsForListJobs(operationId, options));
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
- TListJobsResult result;
- const auto& jobNodesList = resultNode["jobs"].AsList();
- result.Jobs.reserve(jobNodesList.size());
- for (const auto& jobNode : jobNodesList) {
- result.Jobs.push_back(ParseJobAttributes(jobNode));
- }
- if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
- result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
- }
- if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
- result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64();
- }
- if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
- result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
- }
- return result;
- }
- class TResponseReader
- : public IFileReader
- {
- public:
- TResponseReader(const TClientContext& context, THttpHeader header)
- {
- if (context.ServiceTicketAuth) {
- header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
- } else {
- header.SetToken(context.Token);
- }
- if (context.ImpersonationUser) {
- header.SetImpersonationUser(*context.ImpersonationUser);
- }
- auto hostName = GetProxyForHeavyRequest(context);
- auto requestId = CreateGuidAsString();
- UpdateHeaderForProxyIfNeed(hostName, context, header);
- Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
- ResponseStream_ = Response_->GetResponseStream();
- }
- private:
- size_t DoRead(void* buf, size_t len) override
- {
- return ResponseStream_->Read(buf, len);
- }
- size_t DoSkip(size_t len) override
- {
- return ResponseStream_->Skip(len);
- }
- private:
- THttpRequest Request_;
- NHttpClient::IHttpResponsePtr Response_;
- IInputStream* ResponseStream_;
- };
- IFileReaderPtr GetJobInput(
- const TClientContext& context,
- const TJobId& jobId,
- const TGetJobInputOptions& /* options */)
- {
- THttpHeader header("GET", "get_job_input");
- header.AddParameter("job_id", GetGuidAsString(jobId));
- return new TResponseReader(context, std::move(header));
- }
- IFileReaderPtr GetJobFailContext(
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobFailContextOptions& /* options */)
- {
- THttpHeader header("GET", "get_job_fail_context");
- header.AddOperationId(operationId);
- header.AddParameter("job_id", GetGuidAsString(jobId));
- return new TResponseReader(context, std::move(header));
- }
- TString GetJobStderrWithRetries(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobStderrOptions& /* options */)
- {
- THttpHeader header("GET", "get_job_stderr");
- header.AddOperationId(operationId);
- header.AddParameter("job_id", GetGuidAsString(jobId));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header, {}, config);
- return responseInfo.Response;
- }
- IFileReaderPtr GetJobStderr(
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobStderrOptions& /* options */)
- {
- THttpHeader header("GET", "get_job_stderr");
- header.AddOperationId(operationId);
- header.AddParameter("job_id", GetGuidAsString(jobId));
- return new TResponseReader(context, std::move(header));
- }
- TMaybe<TYPath> GetFileFromCache(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TString& md5Signature,
- const TYPath& cachePath,
- const TGetFileFromCacheOptions& options)
- {
- THttpHeader header("GET", "get_file_from_cache");
- header.MergeParameters(SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options));
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
- auto path = NodeFromYsonString(responseInfo.Response).AsString();
- return path.empty() ? Nothing() : TMaybe<TYPath>(path);
- }
- TYPath PutFileToCache(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& filePath,
- const TString& md5Signature,
- const TYPath& cachePath,
- const TPutFileToCacheOptions& options)
- {
- THttpHeader header("POST", "put_file_to_cache");
- header.MergeParameters(SerializeParamsForPutFileToCache(transactionId, context.Config->Prefix, filePath, md5Signature, cachePath, options));
- auto result = RetryRequestWithPolicy(retryPolicy, context, header);
- return NodeFromYsonString(result.Response).AsString();
- }
- TNode::TListType SkyShareTable(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const std::vector<TYPath>& tablePaths,
- const TSkyShareTableOptions& options)
- {
- THttpHeader header("POST", "api/v1/share", /*IsApi*/ false);
- auto proxyName = context.ServerName.substr(0, context.ServerName.find('.'));
- auto host = context.Config->SkynetApiHost;
- if (host == "") {
- host = "skynet." + proxyName + ".yt.yandex.net";
- }
- header.MergeParameters(SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, options));
- TClientContext skyApiHost({ .ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient() });
- TResponseInfo response = {};
- // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu
- // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK).
- while (response.HttpCode != 200) {
- response = RetryRequestWithPolicy(retryPolicy, skyApiHost, header, "");
- TWaitProxy::Get()->Sleep(TDuration::Seconds(5));
- }
- if (options.KeyColumns_) {
- return NodeFromJsonString(response.Response)["torrents"].AsList();
- } else {
- TNode torrent;
- torrent["key"] = TNode::CreateList();
- torrent["rbtorrent"] = response.Response;
- return TNode::TListType{ torrent };
- }
- }
- TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
- {
- auto parseSingleResult = [] (const TNode::TMapType& node) {
- TCheckPermissionResult result;
- result.Action = ::FromString<ESecurityAction>(node.at("action").AsString());
- if (auto objectId = node.FindPtr("object_id")) {
- result.ObjectId = GetGuid(objectId->AsString());
- }
- if (auto objectName = node.FindPtr("object_name")) {
- result.ObjectName = objectName->AsString();
- }
- if (auto subjectId = node.FindPtr("subject_id")) {
- result.SubjectId = GetGuid(subjectId->AsString());
- }
- if (auto subjectName = node.FindPtr("subject_name")) {
- result.SubjectName = subjectName->AsString();
- }
- return result;
- };
- const auto& mapNode = node.AsMap();
- TCheckPermissionResponse result;
- static_cast<TCheckPermissionResult&>(result) = parseSingleResult(mapNode);
- if (auto columns = mapNode.FindPtr("columns")) {
- result.Columns.reserve(columns->AsList().size());
- for (const auto& columnNode : columns->AsList()) {
- result.Columns.push_back(parseSingleResult(columnNode.AsMap()));
- }
- }
- return result;
- }
- TCheckPermissionResponse CheckPermission(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TString& user,
- EPermission permission,
- const TYPath& path,
- const TCheckPermissionOptions& options)
- {
- THttpHeader header("GET", "check_permission");
- header.MergeParameters(SerializeParamsForCheckPermission(user, permission, context.Config->Prefix, path, options));
- auto response = RetryRequestWithPolicy(retryPolicy, context, header);
- return ParseCheckPermissionResponse(NodeFromYsonString(response.Response));
- }
- TVector<TTabletInfo> GetTabletInfos(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TYPath& path,
- const TVector<int>& tabletIndexes,
- const TGetTabletInfosOptions& options)
- {
- THttpHeader header("POST", "api/v4/get_tablet_infos", false);
- header.MergeParameters(SerializeParamsForGetTabletInfos(context.Config->Prefix, path, tabletIndexes, options));
- auto response = RetryRequestWithPolicy(retryPolicy, context, header);
- TVector<TTabletInfo> result;
- Deserialize(result, *NodeFromYsonString(response.Response).AsMap().FindPtr("tablets"));
- return result;
- }
- TVector<TTableColumnarStatistics> GetTableColumnarStatistics(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TVector<TRichYPath>& paths,
- const TGetTableColumnarStatisticsOptions& options)
- {
- THttpHeader header("GET", "get_table_columnar_statistics");
- header.MergeParameters(SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options));
- TRequestConfig config;
- config.IsHeavy = true;
- auto requestResult = RetryRequestWithPolicy(retryPolicy, context, header, {}, config);
- auto response = NodeFromYsonString(requestResult.Response);
- TVector<TTableColumnarStatistics> result;
- Deserialize(result, response);
- return result;
- }
- TMultiTablePartitions GetTablePartitions(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TVector<TRichYPath>& paths,
- const TGetTablePartitionsOptions& options)
- {
- THttpHeader header("GET", "partition_tables");
- header.MergeParameters(SerializeParamsForGetTablePartitions(transactionId, paths, options));
- TRequestConfig config;
- config.IsHeavy = true;
- auto requestResult = RetryRequestWithPolicy(retryPolicy, context, header, {}, config);
- auto response = NodeFromYsonString(requestResult.Response);
- TMultiTablePartitions result;
- Deserialize(result, response);
- return result;
- }
- TRichYPath CanonizeYPath(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TRichYPath& path)
- {
- return CanonizeYPaths(retryPolicy, context, {path}).front();
- }
- TVector<TRichYPath> CanonizeYPaths(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TVector<TRichYPath>& paths)
- {
- TRawBatchRequest batch(context.Config);
- TVector<NThreading::TFuture<TRichYPath>> futures;
- futures.reserve(paths.size());
- for (int i = 0; i < static_cast<int>(paths.size()); ++i) {
- futures.push_back(batch.CanonizeYPath(paths[i]));
- }
- ExecuteBatch(retryPolicy, context, batch, TExecuteBatchOptions{});
- TVector<TRichYPath> result;
- result.reserve(futures.size());
- for (auto& future : futures) {
- result.push_back(future.ExtractValueSync());
- }
- return result;
- }
- void AlterTable(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TAlterTableOptions& options)
- {
- THttpHeader header("POST", "alter_table");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForAlterTable(transactionId, context.Config->Prefix, path, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- void AlterTableReplica(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TReplicaId& replicaId,
- const TAlterTableReplicaOptions& options)
- {
- THttpHeader header("POST", "alter_table_replica");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- void DeleteRows(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TYPath& path,
- const TNode::TListType& keys,
- const TDeleteRowsOptions& options)
- {
- THttpHeader header("PUT", "delete_rows");
- header.SetInputFormat(TFormat::YsonBinary());
- header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(context.Config->Prefix, path, options));
- auto body = NodeListToYsonString(keys);
- TRequestConfig requestConfig;
- requestConfig.IsHeavy = true;
- RetryRequestWithPolicy(retryPolicy, context, header, body, requestConfig);
- }
- void FreezeTable(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TYPath& path,
- const TFreezeTableOptions& options)
- {
- THttpHeader header("POST", "freeze_table");
- header.MergeParameters(SerializeParamsForFreezeTable(context.Config->Prefix, path, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- void UnfreezeTable(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TYPath& path,
- const TUnfreezeTableOptions& options)
- {
- THttpHeader header("POST", "unfreeze_table");
- header.MergeParameters(SerializeParamsForUnfreezeTable(context.Config->Prefix, path, options));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- void AbortTransaction(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId)
- {
- THttpHeader header("POST", "abort_tx");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- void CommitTransaction(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId)
- {
- THttpHeader header("POST", "commit_tx");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId));
- RetryRequestWithPolicy(retryPolicy, context, header);
- }
- TTransactionId StartTransaction(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& parentTransactionId,
- const TStartTransactionOptions& options)
- {
- THttpHeader header("POST", "start_tx");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, context.Config->TxTimeout, options));
- return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT::NDetail::NRawClient
|