123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946 |
- #include "raw_client.h"
- #include "raw_requests.h"
- #include "rpc_parameters_serialization.h"
- #include <yt/cpp/mapreduce/common/helpers.h>
- #include <yt/cpp/mapreduce/common/retry_lib.h>
- #include <yt/cpp/mapreduce/http/helpers.h>
- #include <yt/cpp/mapreduce/http/http.h>
- #include <yt/cpp/mapreduce/http/requests.h>
- #include <yt/cpp/mapreduce/http/retry_request.h>
- #include <yt/cpp/mapreduce/interface/fluent.h>
- #include <yt/cpp/mapreduce/interface/fwd.h>
- #include <yt/cpp/mapreduce/interface/operation.h>
- #include <yt/cpp/mapreduce/interface/tvm.h>
- #include <yt/cpp/mapreduce/io/helpers.h>
- #include <library/cpp/yson/node/node_io.h>
- #include <library/cpp/yt/yson_string/string.h>
- namespace NYT::NDetail {
- ////////////////////////////////////////////////////////////////////////////////
- THttpRawClient::THttpRawClient(const TClientContext& context)
- : Context_(context)
- { }
- TNode THttpRawClient::Get(
- const TTransactionId& transactionId,
- const TYPath& path,
- const TGetOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get");
- header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options));
- return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- TNode THttpRawClient::TryGet(
- const TTransactionId& transactionId,
- const TYPath& path,
- const TGetOptions& options)
- {
- try {
- return Get(transactionId, path, options);
- } catch (const TErrorResponse& error) {
- if (!error.IsResolveError()) {
- throw;
- }
- return {};
- }
- }
- void THttpRawClient::Set(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TNode& value,
- const TSetOptions& options)
- {
- THttpHeader header("PUT", "set");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options));
- auto body = NodeToYsonString(value);
- RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
- }
- bool THttpRawClient::Exists(
- const TTransactionId& transactionId,
- const TYPath& path,
- const TExistsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "exists");
- header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options));
- return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- void THttpRawClient::MultisetAttributes(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TNode::TMapType& value,
- const TMultisetAttributesOptions& options)
- {
- THttpHeader header("PUT", "api/v4/multiset_attributes", false);
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options));
- auto body = NodeToYsonString(value);
- RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
- }
- TNodeId THttpRawClient::Create(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& path,
- const ENodeType& type,
- const TCreateOptions& options)
- {
- THttpHeader header("POST", "create");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- TNodeId THttpRawClient::CopyWithoutRetries(
- const TTransactionId& transactionId,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TCopyOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "copy");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- TNodeId THttpRawClient::CopyInsideMasterCell(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TCopyOptions& options)
- {
- THttpHeader header("POST", "copy");
- header.AddMutationId();
- auto params = NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
- // Make cross cell copying disable.
- params["enable_cross_cell_copying"] = false;
- header.MergeParameters(params);
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- TNodeId THttpRawClient::MoveWithoutRetries(
- const TTransactionId& transactionId,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TMoveOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "move");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- TNodeId THttpRawClient::MoveInsideMasterCell(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TMoveOptions& options)
- {
- THttpHeader header("POST", "move");
- header.AddMutationId();
- auto params = NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);
- // Make cross cell copying disable.
- params["enable_cross_cell_copying"] = false;
- header.MergeParameters(params);
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- void THttpRawClient::Remove(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TRemoveOptions& options)
- {
- THttpHeader header("POST", "remove");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForRemove(transactionId, Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- TNode::TListType THttpRawClient::List(
- const TTransactionId& transactionId,
- const TYPath& path,
- const TListOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "list");
- TYPath updatedPath = AddPathPrefix(path, Context_.Config->Prefix);
- // Translate "//" to "/"
- // Translate "//some/constom/prefix/from/config/" to "//some/constom/prefix/from/config"
- if (path.empty() && updatedPath.EndsWith('/')) {
- updatedPath.pop_back();
- }
- header.MergeParameters(NRawClient::SerializeParamsForList(transactionId, Context_.Config->Prefix, updatedPath, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NodeFromYsonString(responseInfo->GetResponse()).AsList();
- }
- TNodeId THttpRawClient::Link(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& targetPath,
- const TYPath& linkPath,
- const TLinkOptions& options)
- {
- THttpHeader header("POST", "link");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForLink(transactionId, Context_.Config->Prefix, targetPath, linkPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- TLockId THttpRawClient::Lock(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& path,
- ELockMode mode,
- const TLockOptions& options)
- {
- THttpHeader header("POST", "lock");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForLock(transactionId, Context_.Config->Prefix, path, mode, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- void THttpRawClient::Unlock(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TUnlockOptions& options)
- {
- THttpHeader header("POST", "unlock");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForUnlock(transactionId, Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::Concatenate(
- const TTransactionId& transactionId,
- const TVector<TRichYPath>& sourcePaths,
- const TRichYPath& destinationPath,
- const TConcatenateOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "concatenate");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForConcatenate(transactionId, Context_.Config->Prefix, sourcePaths, destinationPath, options));
- TRequestConfig config;
- config.IsHeavy = true;
- RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
- }
- TTransactionId THttpRawClient::StartTransaction(
- TMutationId& mutationId,
- const TTransactionId& parentTransactionId,
- const TStartTransactionOptions& options)
- {
- THttpHeader header("POST", "start_tx");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, Context_.Config->TxTimeout, options));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- void THttpRawClient::PingTransaction(const TTransactionId& transactionId)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "ping_tx");
- header.MergeParameters(NRawClient::SerializeParamsForPingTx(transactionId));
- TRequestConfig requestConfig;
- requestConfig.HttpConfig = NHttpClient::THttpConfig{
- .SocketTimeout = Context_.Config->PingTimeout
- };
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::AbortTransaction(
- TMutationId& mutationId,
- const TTransactionId& transactionId)
- {
- THttpHeader header("POST", "abort_tx");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForAbortTransaction(transactionId));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::CommitTransaction(
- TMutationId& mutationId,
- const TTransactionId& transactionId)
- {
- THttpHeader header("POST", "commit_tx");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- TOperationId THttpRawClient::StartOperation(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- EOperationType type,
- const TNode& spec)
- {
- THttpHeader header("POST", "start_op");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec));
- return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
- }
- TOperationAttributes THttpRawClient::GetOperation(
- const TOperationId& operationId,
- const TGetOperationOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get_operation");
- header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
- }
- TOperationAttributes THttpRawClient::GetOperation(
- const TString& alias,
- const TGetOperationOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get_operation");
- header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
- }
- void THttpRawClient::AbortOperation(
- TMutationId& mutationId,
- const TOperationId& operationId)
- {
- THttpHeader header("POST", "abort_op");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::CompleteOperation(
- TMutationId& mutationId,
- const TOperationId& operationId)
- {
- THttpHeader header("POST", "complete_op");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::SuspendOperation(
- TMutationId& mutationId,
- const TOperationId& operationId,
- const TSuspendOperationOptions& options)
- {
- THttpHeader header("POST", "suspend_op");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::ResumeOperation(
- TMutationId& mutationId,
- const TOperationId& operationId,
- const TResumeOperationOptions& options)
- {
- THttpHeader header("POST", "resume_op");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- template <typename TKey>
- static THashMap<TKey, i64> GetCounts(const TNode& countsNode)
- {
- THashMap<TKey, i64> counts;
- for (const auto& entry : countsNode.AsMap()) {
- counts.emplace(FromString<TKey>(entry.first), entry.second.AsInt64());
- }
- return counts;
- }
- TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "list_operations");
- header.MergeParameters(NRawClient::SerializeParamsForListOperations(options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
- const auto& operationNodesList = resultNode["operations"].AsList();
- TListOperationsResult result;
- result.Operations.reserve(operationNodesList.size());
- for (const auto& operationNode : operationNodesList) {
- result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode));
- }
- if (resultNode.HasKey("pool_counts")) {
- result.PoolCounts = GetCounts<TString>(resultNode["pool_counts"]);
- }
- if (resultNode.HasKey("user_counts")) {
- result.UserCounts = GetCounts<TString>(resultNode["user_counts"]);
- }
- if (resultNode.HasKey("type_counts")) {
- result.TypeCounts = GetCounts<EOperationType>(resultNode["type_counts"]);
- }
- if (resultNode.HasKey("state_counts")) {
- result.StateCounts = GetCounts<TString>(resultNode["state_counts"]);
- }
- if (resultNode.HasKey("failed_jobs_count")) {
- result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
- }
- result.Incomplete = resultNode["incomplete"].AsBool();
- return result;
- }
- void THttpRawClient::UpdateOperationParameters(
- const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "update_op_parameters");
- header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options));
- RequestWithoutRetry(Context_, mutationId, header);
- }
- NYson::TYsonString THttpRawClient::GetJob(
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get_job");
- header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NYson::TYsonString(responseInfo->GetResponse());
- }
- TListJobsResult THttpRawClient::ListJobs(
- const TOperationId& operationId,
- const TListJobsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "list_jobs");
- header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
- const auto& jobNodesList = resultNode["jobs"].AsList();
- TListJobsResult result;
- result.Jobs.reserve(jobNodesList.size());
- for (const auto& jobNode : jobNodesList) {
- result.Jobs.push_back(NRawClient::ParseJobAttributes(jobNode));
- }
- if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
- result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
- }
- if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
- result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64();
- }
- if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
- result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
- }
- return result;
- }
- IFileReaderPtr THttpRawClient::GetJobInput(
- const TJobId& jobId,
- const TGetJobInputOptions& /*options*/)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get_job_input");
- header.AddParameter("job_id", GetGuidAsString(jobId));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
- }
- IFileReaderPtr THttpRawClient::GetJobFailContext(
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobFailContextOptions& /*options*/)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get_job_fail_context");
- header.AddOperationId(operationId);
- header.AddParameter("job_id", GetGuidAsString(jobId));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
- }
- IFileReaderPtr THttpRawClient::GetJobStderr(
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobStderrOptions& /*options*/)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get_job_stderr");
- header.AddOperationId(operationId);
- header.AddParameter("job_id", GetGuidAsString(jobId));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
- }
- TJobTraceEvent ParseJobTraceEvent(const TNode& node)
- {
- const auto& mapNode = node.AsMap();
- TJobTraceEvent result;
- if (auto idNode = mapNode.FindPtr("operation_id")) {
- result.OperationId = GetGuid(idNode->AsString());
- }
- if (auto idNode = mapNode.FindPtr("job_id")) {
- result.JobId = GetGuid(idNode->AsString());
- }
- if (auto idNode = mapNode.FindPtr("trace_id")) {
- result.TraceId = GetGuid(idNode->AsString());
- }
- if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
- result.EventIndex = eventIndexNode->AsInt64();
- }
- if (auto eventNode = mapNode.FindPtr("event")) {
- result.Event = eventNode->AsString();
- }
- if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
- result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());;
- }
- return result;
- }
- std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace(
- const TOperationId& operationId,
- const TGetJobTraceOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get_job_trace");
- header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- auto resultNode = NodeFromYsonString(responseInfo->GetResponse());
- const auto& traceEventNodesList = resultNode.AsList();
- std::vector<TJobTraceEvent> result;
- result.reserve(traceEventNodesList.size());
- for (const auto& traceEventNode : traceEventNodesList) {
- result.push_back(ParseJobTraceEvent(traceEventNode));
- }
- return result;
- }
- std::unique_ptr<IInputStream> THttpRawClient::ReadFile(
- const TTransactionId& transactionId,
- const TRichYPath& path,
- const TFileReaderOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", GetReadFileCommand(Context_.Config->ApiVersion));
- header.AddTransactionId(transactionId);
- header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
- header.MergeParameters(FormIORequestParameters(path, options));
- header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
- }
- TMaybe<TYPath> THttpRawClient::GetFileFromCache(
- const TTransactionId& transactionId,
- const TString& md5Signature,
- const TYPath& cachePath,
- const TGetFileFromCacheOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get_file_from_cache");
- header.MergeParameters(NRawClient::SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- auto resultNode = NodeFromYsonString(responseInfo->GetResponse()).AsString();
- return resultNode.empty() ? Nothing() : TMaybe<TYPath>(resultNode);
- }
- TYPath THttpRawClient::PutFileToCache(
- const TTransactionId& transactionId,
- const TYPath& filePath,
- const TString& md5Signature,
- const TYPath& cachePath,
- const TPutFileToCacheOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "put_file_to_cache");
- header.MergeParameters(NRawClient::SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NodeFromYsonString(responseInfo->GetResponse()).AsString();
- }
- void THttpRawClient::MountTable(
- TMutationId& mutationId,
- const TYPath& path,
- const TMountTableOptions& options)
- {
- THttpHeader header("POST", "mount_table");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
- if (options.CellId_) {
- header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
- }
- header.AddParameter("freeze", options.Freeze_);
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::UnmountTable(
- TMutationId& mutationId,
- const TYPath& path,
- const TUnmountTableOptions& options)
- {
- THttpHeader header("POST", "unmount_table");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
- header.AddParameter("force", options.Force_);
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::RemountTable(
- TMutationId& mutationId,
- const TYPath& path,
- const TRemountTableOptions& options)
- {
- THttpHeader header("POST", "remount_table");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::ReshardTableByPivotKeys(
- TMutationId& mutationId,
- const TYPath& path,
- const TVector<TKey>& keys,
- const TReshardTableOptions& options)
- {
- THttpHeader header("POST", "reshard_table");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
- header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::ReshardTableByTabletCount(
- TMutationId& mutationId,
- const TYPath& path,
- i64 tabletCount,
- const TReshardTableOptions& options)
- {
- THttpHeader header("POST", "reshard_table");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options));
- header.AddParameter("tablet_count", tabletCount);
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::InsertRows(
- const TYPath& path,
- const TNode::TListType& rows,
- const TInsertRowsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("PUT", "insert_rows");
- header.SetInputFormat(TFormat::YsonBinary());
- header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
- auto body = NodeListToYsonString(rows);
- TRequestConfig config;
- config.IsHeavy = true;
- RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
- }
- void THttpRawClient::TrimRows(
- const TYPath& path,
- i64 tabletIndex,
- i64 rowCount,
- const TTrimRowsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "trim_rows");
- header.AddParameter("trimmed_row_count", rowCount);
- header.AddParameter("tablet_index", tabletIndex);
- header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
- TRequestConfig config;
- config.IsHeavy = true;
- RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
- }
- TNode::TListType THttpRawClient::LookupRows(
- const TYPath& path,
- const TNode::TListType& keys,
- const TLookupRowsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("PUT", "lookup_rows");
- header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion));
- header.SetInputFormat(TFormat::YsonBinary());
- header.SetOutputFormat(TFormat::YsonBinary());
- header.MergeParameters(BuildYsonNodeFluently().BeginMap()
- .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
- })
- .Item("keep_missing_rows").Value(options.KeepMissingRows_)
- .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("versioned").Value(*options.Versioned_);
- })
- .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("column_names").Value(*options.Columns_);
- })
- .EndMap());
- auto body = NodeListToYsonString(keys);
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, body, config);
- return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
- }
- TNode::TListType THttpRawClient::SelectRows(
- const TString& query,
- const TSelectRowsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "select_rows");
- header.SetInputFormat(TFormat::YsonBinary());
- header.SetOutputFormat(TFormat::YsonBinary());
- header.MergeParameters(BuildYsonNodeFluently().BeginMap()
- .Item("query").Value(query)
- .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
- })
- .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
- })
- .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
- })
- .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
- .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
- .Item("verbose_logging").Value(options.VerboseLogging_)
- .Item("enable_code_cache").Value(options.EnableCodeCache_)
- .EndMap());
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList();
- }
- std::unique_ptr<IInputStream> THttpRawClient::ReadTable(
- const TTransactionId& transactionId,
- const TRichYPath& path,
- const TMaybe<TFormat>& format,
- const TTableReaderOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
- header.SetOutputFormat(format);
- header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
- header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, options));
- header.MergeParameters(FormIORequestParameters(path, options));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
- }
- std::unique_ptr<IInputStream> THttpRawClient::ReadBlobTable(
- const TTransactionId& transactionId,
- const TRichYPath& path,
- const TKey& key,
- const TBlobTableReaderOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "read_blob_table");
- header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
- header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
- header.MergeParameters(NRawClient::SerializeParamsForReadBlobTable(transactionId, path, key, options));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return std::make_unique<NHttpClient::THttpResponseStream>(std::move(responseInfo));
- }
- void THttpRawClient::AlterTable(
- TMutationId& mutationId,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TAlterTableOptions& options)
- {
- THttpHeader header("POST", "alter_table");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForAlterTable(transactionId, Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::AlterTableReplica(
- TMutationId& mutationId,
- const TReplicaId& replicaId,
- const TAlterTableReplicaOptions& options)
- {
- THttpHeader header("POST", "alter_table_replica");
- header.AddMutationId();
- header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::DeleteRows(
- const TYPath& path,
- const TNode::TListType& keys,
- const TDeleteRowsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("PUT", "delete_rows");
- header.SetInputFormat(TFormat::YsonBinary());
- header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(Context_.Config->Prefix, path, options));
- auto body = NodeListToYsonString(keys);
- TRequestConfig config;
- config.IsHeavy = true;
- RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse();
- }
- void THttpRawClient::FreezeTable(
- const TYPath& path,
- const TFreezeTableOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "freeze_table");
- header.MergeParameters(NRawClient::SerializeParamsForFreezeTable(Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- void THttpRawClient::UnfreezeTable(
- const TYPath& path,
- const TUnfreezeTableOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "unfreeze_table");
- header.MergeParameters(NRawClient::SerializeParamsForUnfreezeTable(Context_.Config->Prefix, path, options));
- RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
- }
- TCheckPermissionResponse THttpRawClient::CheckPermission(
- const TString& user,
- EPermission permission,
- const TYPath& path,
- const TCheckPermissionOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "check_permission");
- header.MergeParameters(NRawClient::SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo->GetResponse()));
- }
- TVector<TTabletInfo> THttpRawClient::GetTabletInfos(
- const TYPath& path,
- const TVector<int>& tabletIndexes,
- const TGetTabletInfosOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("POST", "api/v4/get_tablet_infos", /*isApi*/ false);
- header.MergeParameters(NRawClient::SerializeParamsForGetTabletInfos(Context_.Config->Prefix, path, tabletIndexes, options));
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
- TVector<TTabletInfo> result;
- Deserialize(result, *NodeFromYsonString(responseInfo->GetResponse()).AsMap().FindPtr("tablets"));
- return result;
- }
- TVector<TTableColumnarStatistics> THttpRawClient::GetTableColumnarStatistics(
- const TTransactionId& transactionId,
- const TVector<TRichYPath>& paths,
- const TGetTableColumnarStatisticsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "get_table_columnar_statistics");
- header.MergeParameters(NRawClient::SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- TVector<TTableColumnarStatistics> result;
- Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
- return result;
- }
- TMultiTablePartitions THttpRawClient::GetTablePartitions(
- const TTransactionId& transactionId,
- const TVector<TRichYPath>& paths,
- const TGetTablePartitionsOptions& options)
- {
- TMutationId mutationId;
- THttpHeader header("GET", "partition_tables");
- header.MergeParameters(NRawClient::SerializeParamsForGetTablePartitions(transactionId, paths, options));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- TMultiTablePartitions result;
- Deserialize(result, NodeFromYsonString(responseInfo->GetResponse()));
- return result;
- }
- ui64 THttpRawClient::GenerateTimestamp()
- {
- TMutationId mutationId;
- THttpHeader header("GET", "generate_timestamp");
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
- return NodeFromYsonString(responseInfo->GetResponse()).AsUint64();
- }
- IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest()
- {
- return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr);
- }
- IRawClientPtr THttpRawClient::Clone()
- {
- return ::MakeIntrusive<THttpRawClient>(Context_);
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT::NDetail
|