#include "raw_requests.h" #include "raw_batch_request.h" #include "rpc_parameters_serialization.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace NYT::NDetail::NRawClient { /////////////////////////////////////////////////////////////////////////////// void ExecuteBatch( IRequestRetryPolicyPtr retryPolicy, const TClientContext& context, TRawBatchRequest& batchRequest, const TExecuteBatchOptions& options) { if (batchRequest.IsExecuted()) { ythrow yexception() << "Cannot execute batch request since it is already executed"; } Y_DEFER { batchRequest.MarkExecuted(); }; const auto concurrency = options.Concurrency_.GetOrElse(50); const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); if (!retryPolicy) { retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); } while (batchRequest.BatchSize()) { TRawBatchRequest retryBatch(context.Config); while (batchRequest.BatchSize()) { auto parameters = TNode::CreateMap(); TInstant nextTry; batchRequest.FillParameterList(batchPartMaxSize, ¶meters["requests"], &nextTry); if (nextTry) { SleepUntil(nextTry); } parameters["concurrency"] = concurrency; auto body = NodeToYsonString(parameters); THttpHeader header("POST", "execute_batch"); header.AddMutationId(); NDetail::TResponseInfo result; try { result = RetryRequestWithPolicy(retryPolicy, context, header, body); } catch (const std::exception& e) { batchRequest.SetErrorResult(std::current_exception()); retryBatch.SetErrorResult(std::current_exception()); throw; } batchRequest.ParseResponse(std::move(result), retryPolicy.Get(), &retryBatch); } batchRequest = std::move(retryBatch); } } TNode Get( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const TGetOptions& options) { THttpHeader header("GET", "get"); header.MergeParameters(SerializeParamsForGet(transactionId, context.Config->Prefix, path, options)); return NodeFromYsonString(RetryRequestWithPolicy(retryPolicy, context, header).Response); } TNode TryGet( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const TGetOptions& options) { try { return Get(retryPolicy, context, transactionId, path, options); } catch (const TErrorResponse& error) { if (!error.IsResolveError()) { throw; } return TNode(); } } void Set( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const TNode& value, const TSetOptions& options) { THttpHeader header("PUT", "set"); header.AddMutationId(); header.MergeParameters(SerializeParamsForSet(transactionId, context.Config->Prefix, path, options)); auto body = NodeToYsonString(value); RetryRequestWithPolicy(retryPolicy, context, header, body); } void MultisetAttributes( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options) { THttpHeader header("PUT", "api/v4/multiset_attributes", false); header.AddMutationId(); header.MergeParameters(SerializeParamsForMultisetAttributes(transactionId, context.Config->Prefix, path, options)); auto body = NodeToYsonString(value); RetryRequestWithPolicy(retryPolicy, context, header, body); } bool Exists( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const TExistsOptions& options) { THttpHeader header("GET", "exists"); header.MergeParameters(SerializeParamsForExists(transactionId, context.Config->Prefix, path, options)); return ParseBoolFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); } TNodeId Create( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const ENodeType& type, const TCreateOptions& options) { THttpHeader header("POST", "create"); header.AddMutationId(); header.MergeParameters(SerializeParamsForCreate(transactionId, context.Config->Prefix, path, type, options)); return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); } TNodeId CopyWithoutRetries( const TClientContext& context, const TTransactionId& transactionId, const TYPath& sourcePath, const TYPath& destinationPath, const TCopyOptions& options) { THttpHeader header("POST", "copy"); header.AddMutationId(); header.MergeParameters(SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options)); return ParseGuidFromResponse(RequestWithoutRetry(context, header).Response); } TNodeId CopyInsideMasterCell( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& sourcePath, const TYPath& destinationPath, const TCopyOptions& options) { THttpHeader header("POST", "copy"); header.AddMutationId(); auto params = SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options); // Make cross cell copying disable. params["enable_cross_cell_copying"] = false; header.MergeParameters(params); return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); } TNodeId MoveWithoutRetries( const TClientContext& context, const TTransactionId& transactionId, const TYPath& sourcePath, const TYPath& destinationPath, const TMoveOptions& options) { THttpHeader header("POST", "move"); header.AddMutationId(); header.MergeParameters(SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options)); return ParseGuidFromResponse(RequestWithoutRetry( context, header).Response); } TNodeId MoveInsideMasterCell( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& sourcePath, const TYPath& destinationPath, const TMoveOptions& options) { THttpHeader header("POST", "move"); header.AddMutationId(); auto params = SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options); // Make cross cell copying disable. params["enable_cross_cell_copying"] = false; header.MergeParameters(params); return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); } void Remove( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const TRemoveOptions& options) { THttpHeader header("POST", "remove"); header.AddMutationId(); header.MergeParameters(SerializeParamsForRemove(transactionId, context.Config->Prefix, path, options)); RetryRequestWithPolicy(retryPolicy, context, header); } TNode::TListType List( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const TListOptions& options) { THttpHeader header("GET", "list"); TYPath updatedPath = AddPathPrefix(path, context.Config->Prefix); // Translate "//" to "/" // Translate "//some/constom/prefix/from/config/" to "//some/constom/prefix/from/config" if (path.empty() && updatedPath.EndsWith('/')) { updatedPath.pop_back(); } header.MergeParameters(SerializeParamsForList(transactionId, context.Config->Prefix, updatedPath, options)); auto result = RetryRequestWithPolicy(retryPolicy, context, header); return NodeFromYsonString(result.Response).AsList(); } TNodeId Link( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& targetPath, const TYPath& linkPath, const TLinkOptions& options) { THttpHeader header("POST", "link"); header.AddMutationId(); header.MergeParameters(SerializeParamsForLink(transactionId, context.Config->Prefix, targetPath, linkPath, options)); return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); } TLockId Lock( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, ELockMode mode, const TLockOptions& options) { THttpHeader header("POST", "lock"); header.AddMutationId(); header.MergeParameters(SerializeParamsForLock(transactionId, context.Config->Prefix, path, mode, options)); return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); } void Unlock( IRequestRetryPolicyPtr retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const TUnlockOptions& options) { THttpHeader header("POST", "unlock"); header.AddMutationId(); header.MergeParameters(SerializeParamsForUnlock(transactionId, context.Config->Prefix, path, options)); RetryRequestWithPolicy(retryPolicy, context, header); } void Concatenate( const TClientContext& context, const TTransactionId& transactionId, const TVector& sourcePaths, const TRichYPath& destinationPath, const TConcatenateOptions& options) { THttpHeader header("POST", "concatenate"); header.AddMutationId(); header.MergeParameters(SerializeParamsForConcatenate(transactionId, context.Config->Prefix, sourcePaths, destinationPath, options)); RequestWithoutRetry(context, header); } void PingTx( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId) { THttpHeader header("POST", "ping_tx"); header.MergeParameters(SerializeParamsForPingTx(transactionId)); TRequestConfig requestConfig; requestConfig.HttpConfig = NHttpClient::THttpConfig{ .SocketTimeout = context.Config->PingTimeout }; RetryRequestWithPolicy(retryPolicy, context, header, {}, requestConfig); } TOperationAttributes ParseOperationAttributes(const TNode& node) { const auto& mapNode = node.AsMap(); TOperationAttributes result; if (auto idNode = mapNode.FindPtr("id")) { result.Id = GetGuid(idNode->AsString()); } if (auto typeNode = mapNode.FindPtr("type")) { result.Type = FromString(typeNode->AsString()); } else if (auto operationTypeNode = mapNode.FindPtr("operation_type")) { // COMPAT(levysotsky): "operation_type" is a deprecated synonym for "type". // This branch should be removed when all clusters are updated. result.Type = FromString(operationTypeNode->AsString()); } if (auto stateNode = mapNode.FindPtr("state")) { result.State = stateNode->AsString(); // We don't use FromString here, because OS_IN_PROGRESS unites many states: "initializing", "running", etc. if (*result.State == "completed") { result.BriefState = EOperationBriefState::Completed; } else if (*result.State == "aborted") { result.BriefState = EOperationBriefState::Aborted; } else if (*result.State == "failed") { result.BriefState = EOperationBriefState::Failed; } else { result.BriefState = EOperationBriefState::InProgress; } } if (auto authenticatedUserNode = mapNode.FindPtr("authenticated_user")) { result.AuthenticatedUser = authenticatedUserNode->AsString(); } if (auto startTimeNode = mapNode.FindPtr("start_time")) { result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString()); } if (auto finishTimeNode = mapNode.FindPtr("finish_time")) { result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString()); } auto briefProgressNode = mapNode.FindPtr("brief_progress"); if (briefProgressNode && briefProgressNode->HasKey("jobs")) { result.BriefProgress.ConstructInPlace(); static auto load = [] (const TNode& item) { // Backward compatibility with old YT versions return item.IsInt64() ? item.AsInt64() : item["total"].AsInt64(); }; const auto& jobs = (*briefProgressNode)["jobs"]; result.BriefProgress->Aborted = load(jobs["aborted"]); result.BriefProgress->Completed = load(jobs["completed"]); result.BriefProgress->Running = jobs["running"].AsInt64(); result.BriefProgress->Total = jobs["total"].AsInt64(); result.BriefProgress->Failed = jobs["failed"].AsInt64(); result.BriefProgress->Lost = jobs["lost"].AsInt64(); result.BriefProgress->Pending = jobs["pending"].AsInt64(); } if (auto briefSpecNode = mapNode.FindPtr("brief_spec")) { result.BriefSpec = *briefSpecNode; } if (auto specNode = mapNode.FindPtr("spec")) { result.Spec = *specNode; } if (auto fullSpecNode = mapNode.FindPtr("full_spec")) { result.FullSpec = *fullSpecNode; } if (auto unrecognizedSpecNode = mapNode.FindPtr("unrecognized_spec")) { result.UnrecognizedSpec = *unrecognizedSpecNode; } if (auto suspendedNode = mapNode.FindPtr("suspended")) { result.Suspended = suspendedNode->AsBool(); } if (auto resultNode = mapNode.FindPtr("result")) { result.Result.ConstructInPlace(); auto error = TYtError((*resultNode)["error"]); if (error.GetCode() != 0) { result.Result->Error = std::move(error); } } if (auto progressNode = mapNode.FindPtr("progress")) { const auto& progressMap = progressNode->AsMap(); TMaybe buildTime; if (auto buildTimeNode = progressMap.FindPtr("build_time")) { buildTime = TInstant::ParseIso8601(buildTimeNode->AsString()); } TJobStatistics jobStatistics; if (auto jobStatisticsNode = progressMap.FindPtr("job_statistics")) { jobStatistics = TJobStatistics(*jobStatisticsNode); } TJobCounters jobCounters; if (auto jobCountersNode = progressMap.FindPtr("total_job_counter")) { jobCounters = TJobCounters(*jobCountersNode); } result.Progress = TOperationProgress{ .JobStatistics = std::move(jobStatistics), .JobCounters = std::move(jobCounters), .BuildTime = buildTime, }; } if (auto eventsNode = mapNode.FindPtr("events")) { result.Events.ConstructInPlace().reserve(eventsNode->Size()); for (const auto& eventNode : eventsNode->AsList()) { result.Events->push_back(TOperationEvent{ eventNode["state"].AsString(), TInstant::ParseIso8601(eventNode["time"].AsString()), }); } } if (auto alertsNode = mapNode.FindPtr("alerts")) { result.Alerts.ConstructInPlace(); for (const auto& [alertType, alertError] : alertsNode->AsMap()) { result.Alerts->emplace(alertType, TYtError(alertError)); } } return result; } TOperationAttributes GetOperation( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TOperationId& operationId, const TGetOperationOptions& options) { THttpHeader header("GET", "get_operation"); header.MergeParameters(SerializeParamsForGetOperation(operationId, options)); auto result = RetryRequestWithPolicy(retryPolicy, context, header); return ParseOperationAttributes(NodeFromYsonString(result.Response)); } TOperationAttributes GetOperation( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TString& alias, const TGetOperationOptions& options) { THttpHeader header("GET", "get_operation"); header.MergeParameters(SerializeParamsForGetOperation(alias, options)); auto result = RetryRequestWithPolicy(retryPolicy, context, header); return ParseOperationAttributes(NodeFromYsonString(result.Response)); } void AbortOperation( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TOperationId& operationId) { THttpHeader header("POST", "abort_op"); header.AddMutationId(); header.MergeParameters(SerializeParamsForAbortOperation(operationId)); RetryRequestWithPolicy(retryPolicy, context, header); } void CompleteOperation( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TOperationId& operationId) { THttpHeader header("POST", "complete_op"); header.AddMutationId(); header.MergeParameters(SerializeParamsForCompleteOperation(operationId)); RetryRequestWithPolicy(retryPolicy, context, header); } void SuspendOperation( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TOperationId& operationId, const TSuspendOperationOptions& options) { THttpHeader header("POST", "suspend_op"); header.AddMutationId(); header.MergeParameters(SerializeParamsForSuspendOperation(operationId, options)); RetryRequestWithPolicy(retryPolicy, context, header); } void ResumeOperation( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TOperationId& operationId, const TResumeOperationOptions& options) { THttpHeader header("POST", "resume_op"); header.AddMutationId(); header.MergeParameters(SerializeParamsForResumeOperation(operationId, options)); RetryRequestWithPolicy(retryPolicy, context, header); } template static THashMap GetCounts(const TNode& countsNode) { THashMap counts; for (const auto& entry : countsNode.AsMap()) { counts.emplace(FromString(entry.first), entry.second.AsInt64()); } return counts; } TListOperationsResult ListOperations( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TListOperationsOptions& options) { THttpHeader header("GET", "list_operations"); header.MergeParameters(SerializeParamsForListOperations(options)); auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); auto resultNode = NodeFromYsonString(responseInfo.Response); TListOperationsResult result; for (const auto& operationNode : resultNode["operations"].AsList()) { result.Operations.push_back(ParseOperationAttributes(operationNode)); } if (resultNode.HasKey("pool_counts")) { result.PoolCounts = GetCounts(resultNode["pool_counts"]); } if (resultNode.HasKey("user_counts")) { result.UserCounts = GetCounts(resultNode["user_counts"]); } if (resultNode.HasKey("type_counts")) { result.TypeCounts = GetCounts(resultNode["type_counts"]); } if (resultNode.HasKey("state_counts")) { result.StateCounts = GetCounts(resultNode["state_counts"]); } if (resultNode.HasKey("failed_jobs_count")) { result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64(); } result.Incomplete = resultNode["incomplete"].AsBool(); return result; } void UpdateOperationParameters( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TOperationId& operationId, const TUpdateOperationParametersOptions& options) { THttpHeader header("POST", "update_op_parameters"); header.MergeParameters(SerializeParamsForUpdateOperationParameters(operationId, options)); RetryRequestWithPolicy(retryPolicy, context, header); } TJobAttributes ParseJobAttributes(const TNode& node) { const auto& mapNode = node.AsMap(); TJobAttributes result; // Currently "get_job" returns "job_id" field and "list_jobs" returns "id" field. auto idNode = mapNode.FindPtr("id"); if (!idNode) { idNode = mapNode.FindPtr("job_id"); } if (idNode) { result.Id = GetGuid(idNode->AsString()); } if (auto typeNode = mapNode.FindPtr("type")) { result.Type = FromString(typeNode->AsString()); } if (auto stateNode = mapNode.FindPtr("state")) { result.State = FromString(stateNode->AsString()); } if (auto addressNode = mapNode.FindPtr("address")) { result.Address = addressNode->AsString(); } if (auto taskNameNode = mapNode.FindPtr("task_name")) { result.TaskName = taskNameNode->AsString(); } if (auto startTimeNode = mapNode.FindPtr("start_time")) { result.StartTime = TInstant::ParseIso8601(startTimeNode->AsString()); } if (auto finishTimeNode = mapNode.FindPtr("finish_time")) { result.FinishTime = TInstant::ParseIso8601(finishTimeNode->AsString()); } if (auto progressNode = mapNode.FindPtr("progress")) { result.Progress = progressNode->AsDouble(); } if (auto stderrSizeNode = mapNode.FindPtr("stderr_size")) { result.StderrSize = stderrSizeNode->AsUint64(); } if (auto errorNode = mapNode.FindPtr("error")) { result.Error.ConstructInPlace(*errorNode); } if (auto briefStatisticsNode = mapNode.FindPtr("brief_statistics")) { result.BriefStatistics = *briefStatisticsNode; } if (auto inputPathsNode = mapNode.FindPtr("input_paths")) { const auto& inputPathNodesList = inputPathsNode->AsList(); result.InputPaths.ConstructInPlace(); result.InputPaths->reserve(inputPathNodesList.size()); for (const auto& inputPathNode : inputPathNodesList) { TRichYPath path; Deserialize(path, inputPathNode); result.InputPaths->push_back(std::move(path)); } } if (auto coreInfosNode = mapNode.FindPtr("core_infos")) { const auto& coreInfoNodesList = coreInfosNode->AsList(); result.CoreInfos.ConstructInPlace(); result.CoreInfos->reserve(coreInfoNodesList.size()); for (const auto& coreInfoNode : coreInfoNodesList) { TCoreInfo coreInfo; coreInfo.ProcessId = coreInfoNode["process_id"].AsInt64(); coreInfo.ExecutableName = coreInfoNode["executable_name"].AsString(); if (coreInfoNode.HasKey("size")) { coreInfo.Size = coreInfoNode["size"].AsUint64(); } if (coreInfoNode.HasKey("error")) { coreInfo.Error.ConstructInPlace(coreInfoNode["error"]); } result.CoreInfos->push_back(std::move(coreInfo)); } } return result; } TJobAttributes GetJob( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TOperationId& operationId, const TJobId& jobId, const TGetJobOptions& options) { THttpHeader header("GET", "get_job"); header.MergeParameters(SerializeParamsForGetJob(operationId, jobId, options)); auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); auto resultNode = NodeFromYsonString(responseInfo.Response); return ParseJobAttributes(resultNode); } TListJobsResult ListJobs( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TOperationId& operationId, const TListJobsOptions& options) { THttpHeader header("GET", "list_jobs"); header.MergeParameters(SerializeParamsForListJobs(operationId, options)); auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); auto resultNode = NodeFromYsonString(responseInfo.Response); TListJobsResult result; const auto& jobNodesList = resultNode["jobs"].AsList(); result.Jobs.reserve(jobNodesList.size()); for (const auto& jobNode : jobNodesList) { result.Jobs.push_back(ParseJobAttributes(jobNode)); } if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) { result.CypressJobCount = resultNode["cypress_job_count"].AsInt64(); } if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) { result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64(); } if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) { result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64(); } return result; } class TResponseReader : public IFileReader { public: TResponseReader(const TClientContext& context, THttpHeader header) { if (context.ServiceTicketAuth) { header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); } else { header.SetToken(context.Token); } if (context.ImpersonationUser) { header.SetImpersonationUser(*context.ImpersonationUser); } auto hostName = GetProxyForHeavyRequest(context); auto requestId = CreateGuidAsString(); UpdateHeaderForProxyIfNeed(hostName, context, header); Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); ResponseStream_ = Response_->GetResponseStream(); } private: size_t DoRead(void* buf, size_t len) override { return ResponseStream_->Read(buf, len); } size_t DoSkip(size_t len) override { return ResponseStream_->Skip(len); } private: THttpRequest Request_; NHttpClient::IHttpResponsePtr Response_; IInputStream* ResponseStream_; }; IFileReaderPtr GetJobInput( const TClientContext& context, const TJobId& jobId, const TGetJobInputOptions& /* options */) { THttpHeader header("GET", "get_job_input"); header.AddParameter("job_id", GetGuidAsString(jobId)); return new TResponseReader(context, std::move(header)); } IFileReaderPtr GetJobFailContext( const TClientContext& context, const TOperationId& operationId, const TJobId& jobId, const TGetJobFailContextOptions& /* options */) { THttpHeader header("GET", "get_job_fail_context"); header.AddOperationId(operationId); header.AddParameter("job_id", GetGuidAsString(jobId)); return new TResponseReader(context, std::move(header)); } TString GetJobStderrWithRetries( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TOperationId& operationId, const TJobId& jobId, const TGetJobStderrOptions& /* options */) { THttpHeader header("GET", "get_job_stderr"); header.AddOperationId(operationId); header.AddParameter("job_id", GetGuidAsString(jobId)); TRequestConfig config; config.IsHeavy = true; auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header, {}, config); return responseInfo.Response; } IFileReaderPtr GetJobStderr( const TClientContext& context, const TOperationId& operationId, const TJobId& jobId, const TGetJobStderrOptions& /* options */) { THttpHeader header("GET", "get_job_stderr"); header.AddOperationId(operationId); header.AddParameter("job_id", GetGuidAsString(jobId)); return new TResponseReader(context, std::move(header)); } TMaybe GetFileFromCache( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TString& md5Signature, const TYPath& cachePath, const TGetFileFromCacheOptions& options) { THttpHeader header("GET", "get_file_from_cache"); header.MergeParameters(SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options)); auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); auto path = NodeFromYsonString(responseInfo.Response).AsString(); return path.empty() ? Nothing() : TMaybe(path); } TYPath PutFileToCache( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& filePath, const TString& md5Signature, const TYPath& cachePath, const TPutFileToCacheOptions& options) { THttpHeader header("POST", "put_file_to_cache"); header.MergeParameters(SerializeParamsForPutFileToCache(transactionId, context.Config->Prefix, filePath, md5Signature, cachePath, options)); auto result = RetryRequestWithPolicy(retryPolicy, context, header); return NodeFromYsonString(result.Response).AsString(); } TNode::TListType SkyShareTable( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const std::vector& tablePaths, const TSkyShareTableOptions& options) { THttpHeader header("POST", "api/v1/share", /*IsApi*/ false); auto proxyName = context.ServerName.substr(0, context.ServerName.find('.')); auto host = context.Config->SkynetApiHost; if (host == "") { host = "skynet." + proxyName + ".yt.yandex.net"; } header.MergeParameters(SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, options)); TClientContext skyApiHost({ .ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient() }); TResponseInfo response = {}; // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK). while (response.HttpCode != 200) { response = RetryRequestWithPolicy(retryPolicy, skyApiHost, header, ""); TWaitProxy::Get()->Sleep(TDuration::Seconds(5)); } if (options.KeyColumns_) { return NodeFromJsonString(response.Response)["torrents"].AsList(); } else { TNode torrent; torrent["key"] = TNode::CreateList(); torrent["rbtorrent"] = response.Response; return TNode::TListType{ torrent }; } } TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node) { auto parseSingleResult = [] (const TNode::TMapType& node) { TCheckPermissionResult result; result.Action = ::FromString(node.at("action").AsString()); if (auto objectId = node.FindPtr("object_id")) { result.ObjectId = GetGuid(objectId->AsString()); } if (auto objectName = node.FindPtr("object_name")) { result.ObjectName = objectName->AsString(); } if (auto subjectId = node.FindPtr("subject_id")) { result.SubjectId = GetGuid(subjectId->AsString()); } if (auto subjectName = node.FindPtr("subject_name")) { result.SubjectName = subjectName->AsString(); } return result; }; const auto& mapNode = node.AsMap(); TCheckPermissionResponse result; static_cast(result) = parseSingleResult(mapNode); if (auto columns = mapNode.FindPtr("columns")) { result.Columns.reserve(columns->AsList().size()); for (const auto& columnNode : columns->AsList()) { result.Columns.push_back(parseSingleResult(columnNode.AsMap())); } } return result; } TCheckPermissionResponse CheckPermission( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TString& user, EPermission permission, const TYPath& path, const TCheckPermissionOptions& options) { THttpHeader header("GET", "check_permission"); header.MergeParameters(SerializeParamsForCheckPermission(user, permission, context.Config->Prefix, path, options)); auto response = RetryRequestWithPolicy(retryPolicy, context, header); return ParseCheckPermissionResponse(NodeFromYsonString(response.Response)); } TVector GetTabletInfos( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TYPath& path, const TVector& tabletIndexes, const TGetTabletInfosOptions& options) { THttpHeader header("POST", "api/v4/get_tablet_infos", false); header.MergeParameters(SerializeParamsForGetTabletInfos(context.Config->Prefix, path, tabletIndexes, options)); auto response = RetryRequestWithPolicy(retryPolicy, context, header); TVector result; Deserialize(result, *NodeFromYsonString(response.Response).AsMap().FindPtr("tablets")); return result; } TVector GetTableColumnarStatistics( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TVector& paths, const TGetTableColumnarStatisticsOptions& options) { THttpHeader header("GET", "get_table_columnar_statistics"); header.MergeParameters(SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options)); TRequestConfig config; config.IsHeavy = true; auto requestResult = RetryRequestWithPolicy(retryPolicy, context, header, {}, config); auto response = NodeFromYsonString(requestResult.Response); TVector result; Deserialize(result, response); return result; } TMultiTablePartitions GetTablePartitions( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TVector& paths, const TGetTablePartitionsOptions& options) { THttpHeader header("GET", "partition_tables"); header.MergeParameters(SerializeParamsForGetTablePartitions(transactionId, paths, options)); TRequestConfig config; config.IsHeavy = true; auto requestResult = RetryRequestWithPolicy(retryPolicy, context, header, {}, config); auto response = NodeFromYsonString(requestResult.Response); TMultiTablePartitions result; Deserialize(result, response); return result; } TRichYPath CanonizeYPath( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TRichYPath& path) { return CanonizeYPaths(retryPolicy, context, {path}).front(); } TVector CanonizeYPaths( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TVector& paths) { TRawBatchRequest batch(context.Config); TVector> futures; futures.reserve(paths.size()); for (int i = 0; i < static_cast(paths.size()); ++i) { futures.push_back(batch.CanonizeYPath(paths[i])); } ExecuteBatch(retryPolicy, context, batch, TExecuteBatchOptions{}); TVector result; result.reserve(futures.size()); for (auto& future : futures) { result.push_back(future.ExtractValueSync()); } return result; } void AlterTable( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId, const TYPath& path, const TAlterTableOptions& options) { THttpHeader header("POST", "alter_table"); header.AddMutationId(); header.MergeParameters(SerializeParamsForAlterTable(transactionId, context.Config->Prefix, path, options)); RetryRequestWithPolicy(retryPolicy, context, header); } void AlterTableReplica( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TReplicaId& replicaId, const TAlterTableReplicaOptions& options) { THttpHeader header("POST", "alter_table_replica"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options)); RetryRequestWithPolicy(retryPolicy, context, header); } void DeleteRows( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TYPath& path, const TNode::TListType& keys, const TDeleteRowsOptions& options) { THttpHeader header("PUT", "delete_rows"); header.SetInputFormat(TFormat::YsonBinary()); header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(context.Config->Prefix, path, options)); auto body = NodeListToYsonString(keys); TRequestConfig requestConfig; requestConfig.IsHeavy = true; RetryRequestWithPolicy(retryPolicy, context, header, body, requestConfig); } void FreezeTable( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TYPath& path, const TFreezeTableOptions& options) { THttpHeader header("POST", "freeze_table"); header.MergeParameters(SerializeParamsForFreezeTable(context.Config->Prefix, path, options)); RetryRequestWithPolicy(retryPolicy, context, header); } void UnfreezeTable( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TYPath& path, const TUnfreezeTableOptions& options) { THttpHeader header("POST", "unfreeze_table"); header.MergeParameters(SerializeParamsForUnfreezeTable(context.Config->Prefix, path, options)); RetryRequestWithPolicy(retryPolicy, context, header); } void AbortTransaction( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId) { THttpHeader header("POST", "abort_tx"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId)); RetryRequestWithPolicy(retryPolicy, context, header); } void CommitTransaction( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& transactionId) { THttpHeader header("POST", "commit_tx"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId)); RetryRequestWithPolicy(retryPolicy, context, header); } TTransactionId StartTransaction( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, const TTransactionId& parentTransactionId, const TStartTransactionOptions& options) { THttpHeader header("POST", "start_tx"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, context.Config->TxTimeout, options)); return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response); } //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail::NRawClient