|
- #include "raw_batch_request.h"
- #include "raw_requests.h"
- #include "rpc_parameters_serialization.h"
- #include <yt/cpp/mapreduce/common/helpers.h>
- #include <yt/cpp/mapreduce/common/retry_lib.h>
- #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
- #include <yt/cpp/mapreduce/interface/client.h>
- #include <yt/cpp/mapreduce/interface/errors.h>
- #include <yt/cpp/mapreduce/interface/serialize.h>
- #include <library/cpp/yson/node/node.h>
- #include <yt/cpp/mapreduce/http/context.h>
- #include <yt/cpp/mapreduce/http/retry_request.h>
- #include <util/generic/guid.h>
- #include <util/string/builder.h>
- #include <exception>
- namespace NYT::NDetail::NRawClient {
- using NThreading::TFuture;
- using NThreading::TPromise;
- using NThreading::NewPromise;
- ////////////////////////////////////////////////////////////////////
- static TString RequestInfo(const TNode& request)
- {
- return ::TStringBuilder()
- << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]);
- }
- static void EnsureNothing(const TMaybe<TNode>& node)
- {
- Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType());
- }
- static void EnsureSomething(const TMaybe<TNode>& node)
- {
- Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response.");
- }
- static void EnsureType(const TNode& node, TNode::EType type)
- {
- Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. "
- << "Expected: " << type << ", actual: " << node.GetType());
- }
- static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
- {
- Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response.");
- EnsureType(*node, type);
- }
- ////////////////////////////////////////////////////////////////////
- template <typename TReturnType>
- class TResponseParserBase
- : public TRawBatchRequest::IResponseItemParser
- {
- public:
- using TFutureResult = TFuture<TReturnType>;
- public:
- TResponseParserBase()
- : Result_(NewPromise<TReturnType>())
- { }
- void SetException(std::exception_ptr e) override
- {
- Result_.SetException(std::move(e));
- }
- TFuture<TReturnType> GetFuture()
- {
- return Result_.GetFuture();
- }
- protected:
- TPromise<TReturnType> Result_;
- };
- ////////////////////////////////////////////////////////////////////
- class TGetResponseParser
- : public TResponseParserBase<TNode>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureSomething(node);
- Result_.SetValue(std::move(*node));
- }
- };
- ////////////////////////////////////////////////////////////////////
- class TVoidResponseParser
- : public TResponseParserBase<void>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureNothing(node);
- Result_.SetValue();
- }
- };
- ////////////////////////////////////////////////////////////////////
- class TListResponseParser
- : public TResponseParserBase<TNode::TListType>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::List);
- Result_.SetValue(std::move(node->AsList()));
- }
- };
- ////////////////////////////////////////////////////////////////////
- class TExistsResponseParser
- : public TResponseParserBase<bool>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Bool);
- Result_.SetValue(std::move(node->AsBool()));
- }
- };
- ////////////////////////////////////////////////////////////////////
- class TGuidResponseParser
- : public TResponseParserBase<TGUID>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::String);
- Result_.SetValue(GetGuid(node->AsString()));
- }
- };
- ////////////////////////////////////////////////////////////////////
- class TCanonizeYPathResponseParser
- : public TResponseParserBase<TRichYPath>
- {
- public:
- explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original)
- : OriginalNode_(PathToNode(original))
- , PathPrefix_(std::move(pathPrefix))
- { }
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::String);
- for (const auto& item : OriginalNode_.GetAttributes().AsMap()) {
- node->Attributes()[item.first] = item.second;
- }
- TRichYPath result;
- Deserialize(result, *node);
- result.Path_ = AddPathPrefix(result.Path_, PathPrefix_);
- Result_.SetValue(result);
- }
- private:
- TNode OriginalNode_;
- TString PathPrefix_;
- };
- ////////////////////////////////////////////////////////////////////
- class TGetOperationResponseParser
- : public TResponseParserBase<TOperationAttributes>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Map);
- Result_.SetValue(ParseOperationAttributes(*node));
- }
- };
- ////////////////////////////////////////////////////////////////////
- class TTableColumnarStatisticsParser
- : public TResponseParserBase<TVector<TTableColumnarStatistics>>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Map);
- TVector<TTableColumnarStatistics> statistics;
- Deserialize(statistics, *node);
- Result_.SetValue(std::move(statistics));
- }
- };
- ////////////////////////////////////////////////////////////////////
- class TTablePartitionsParser
- : public TResponseParserBase<TMultiTablePartitions>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Map);
- TMultiTablePartitions partitions;
- Deserialize(partitions, *node);
- Result_.SetValue(std::move(partitions));
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- class TGetFileFromCacheParser
- : public TResponseParserBase<TMaybe<TYPath>>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::String);
- if (node->AsString().empty()) {
- Result_.SetValue(Nothing());
- } else {
- Result_.SetValue(node->AsString());
- }
- }
- };
- ////////////////////////////////////////////////////////////////////
- class TYPathParser
- : public TResponseParserBase<TYPath>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::String);
- Result_.SetValue(node->AsString());
- }
- };
- ////////////////////////////////////////////////////////////////////
- class TCheckPermissionParser
- : public TResponseParserBase<TCheckPermissionResponse>
- {
- public:
- void SetResponse(TMaybe<TNode> node) override
- {
- EnsureType(node, TNode::Map);
- Result_.SetValue(ParseCheckPermissionResponse(*node));
- }
- };
- ////////////////////////////////////////////////////////////////////
- TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser)
- : Parameters(std::move(parameters))
- , ResponseParser(std::move(responseParser))
- , NextTry()
- { }
- TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry)
- : Parameters(batchItem.Parameters)
- , ResponseParser(batchItem.ResponseParser)
- , NextTry(nextTry)
- { }
- ////////////////////////////////////////////////////////////////////
- TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config)
- : Config_(config)
- { }
- TRawBatchRequest::~TRawBatchRequest() = default;
- bool TRawBatchRequest::IsExecuted() const
- {
- return Executed_;
- }
- void TRawBatchRequest::MarkExecuted()
- {
- Executed_ = true;
- }
- template <typename TResponseParser>
- typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
- const TString& command,
- TNode parameters,
- TMaybe<TNode> input)
- {
- return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>());
- }
- template <typename TResponseParser>
- typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
- const TString& command,
- TNode parameters,
- TMaybe<TNode> input,
- ::TIntrusivePtr<TResponseParser> parser)
- {
- Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
- TNode request;
- request["command"] = command;
- request["parameters"] = std::move(parameters);
- if (input) {
- request["input"] = std::move(*input);
- }
- BatchItemList_.emplace_back(std::move(request), parser);
- return parser->GetFuture();
- }
- void TRawBatchRequest::AddRequest(TBatchItem batchItem)
- {
- Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
- BatchItemList_.push_back(batchItem);
- }
- TFuture<TNodeId> TRawBatchRequest::Create(
- const TTransactionId& transaction,
- const TYPath& path,
- ENodeType type,
- const TCreateOptions& options)
- {
- return AddRequest<TGuidResponseParser>(
- "create",
- SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options),
- Nothing());
- }
- TFuture<void> TRawBatchRequest::Remove(
- const TTransactionId& transaction,
- const TYPath& path,
- const TRemoveOptions& options)
- {
- return AddRequest<TVoidResponseParser>(
- "remove",
- SerializeParamsForRemove(transaction, Config_->Prefix, path, options),
- Nothing());
- }
- TFuture<bool> TRawBatchRequest::Exists(
- const TTransactionId& transaction,
- const TYPath& path,
- const TExistsOptions& options)
- {
- return AddRequest<TExistsResponseParser>(
- "exists",
- SerializeParamsForExists(transaction, Config_->Prefix, path, options),
- Nothing());
- }
- TFuture<TNode> TRawBatchRequest::Get(
- const TTransactionId& transaction,
- const TYPath& path,
- const TGetOptions& options)
- {
- return AddRequest<TGetResponseParser>(
- "get",
- SerializeParamsForGet(transaction, Config_->Prefix, path, options),
- Nothing());
- }
- TFuture<void> TRawBatchRequest::Set(
- const TTransactionId& transaction,
- const TYPath& path,
- const TNode& node,
- const TSetOptions& options)
- {
- return AddRequest<TVoidResponseParser>(
- "set",
- SerializeParamsForSet(transaction, Config_->Prefix, path, options),
- node);
- }
- TFuture<TNode::TListType> TRawBatchRequest::List(
- const TTransactionId& transaction,
- const TYPath& path,
- const TListOptions& options)
- {
- return AddRequest<TListResponseParser>(
- "list",
- SerializeParamsForList(transaction, Config_->Prefix, path, options),
- Nothing());
- }
- TFuture<TNodeId> TRawBatchRequest::Copy(
- const TTransactionId& transaction,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TCopyOptions& options)
- {
- return AddRequest<TGuidResponseParser>(
- "copy",
- SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options),
- Nothing());
- }
- TFuture<TNodeId> TRawBatchRequest::Move(
- const TTransactionId& transaction,
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TMoveOptions& options)
- {
- return AddRequest<TGuidResponseParser>(
- "move",
- SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options),
- Nothing());
- }
- TFuture<TNodeId> TRawBatchRequest::Link(
- const TTransactionId& transaction,
- const TYPath& targetPath,
- const TYPath& linkPath,
- const TLinkOptions& options)
- {
- return AddRequest<TGuidResponseParser>(
- "link",
- SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options),
- Nothing());
- }
- TFuture<TLockId> TRawBatchRequest::Lock(
- const TTransactionId& transaction,
- const TYPath& path,
- ELockMode mode,
- const TLockOptions& options)
- {
- return AddRequest<TGuidResponseParser>(
- "lock",
- SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options),
- Nothing());
- }
- TFuture<void> TRawBatchRequest::Unlock(
- const TTransactionId& transaction,
- const TYPath& path,
- const TUnlockOptions& options)
- {
- return AddRequest<TVoidResponseParser>(
- "unlock",
- SerializeParamsForUnlock(transaction, Config_->Prefix, path, options),
- Nothing());
- }
- TFuture<TMaybe<TYPath>> TRawBatchRequest::GetFileFromCache(
- const TTransactionId& transactionId,
- const TString& md5Signature,
- const TYPath& cachePath,
- const TGetFileFromCacheOptions& options)
- {
- return AddRequest<TGetFileFromCacheParser>(
- "get_file_from_cache",
- SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options),
- Nothing());
- }
- TFuture<TYPath> TRawBatchRequest::PutFileToCache(
- const TTransactionId& transactionId,
- const TYPath& filePath,
- const TString& md5Signature,
- const TYPath& cachePath,
- const TPutFileToCacheOptions& options)
- {
- return AddRequest<TYPathParser>(
- "put_file_to_cache",
- SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options),
- Nothing());
- }
- TFuture<TCheckPermissionResponse> TRawBatchRequest::CheckPermission(
- const TString& user,
- EPermission permission,
- const TYPath& path,
- const TCheckPermissionOptions& options)
- {
- return AddRequest<TCheckPermissionParser>(
- "check_permission",
- SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options),
- Nothing());
- }
- TFuture<TOperationAttributes> TRawBatchRequest::GetOperation(
- const TOperationId& operationId,
- const TGetOperationOptions& options)
- {
- return AddRequest<TGetOperationResponseParser>(
- "get_operation",
- SerializeParamsForGetOperation(operationId, options),
- Nothing());
- }
- TFuture<void> TRawBatchRequest::AbortOperation(const TOperationId& operationId)
- {
- return AddRequest<TVoidResponseParser>(
- "abort_op",
- SerializeParamsForAbortOperation(operationId),
- Nothing());
- }
- TFuture<void> TRawBatchRequest::CompleteOperation(const TOperationId& operationId)
- {
- return AddRequest<TVoidResponseParser>(
- "complete_op",
- SerializeParamsForCompleteOperation(operationId),
- Nothing());
- }
- TFuture<void> TRawBatchRequest::SuspendOperation(
- const TOperationId& operationId,
- const TSuspendOperationOptions& options)
- {
- return AddRequest<TVoidResponseParser>(
- "suspend_operation",
- SerializeParamsForSuspendOperation(operationId, options),
- Nothing());
- }
- TFuture<void> TRawBatchRequest::ResumeOperation(
- const TOperationId& operationId,
- const TResumeOperationOptions& options)
- {
- return AddRequest<TVoidResponseParser>(
- "resume_operation",
- SerializeParamsForResumeOperation(operationId, options),
- Nothing());
- }
- TFuture<void> TRawBatchRequest::UpdateOperationParameters(
- const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options)
- {
- return AddRequest<TVoidResponseParser>(
- "update_op_parameters",
- SerializeParamsForUpdateOperationParameters(operationId, options),
- Nothing());
- }
- TFuture<TRichYPath> TRawBatchRequest::CanonizeYPath(const TRichYPath& path)
- {
- TRichYPath result = path;
- // Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath.
- if (!result.Path_.StartsWith("<")) {
- result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix);
- }
- if (result.Path_.find_first_of("<>{}[]") != TString::npos) {
- return AddRequest<TCanonizeYPathResponseParser>(
- "parse_ypath",
- SerializeParamsForParseYPath(result),
- Nothing(),
- MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, result));
- }
- return NThreading::MakeFuture(result);
- }
- TFuture<TVector<TTableColumnarStatistics>> TRawBatchRequest::GetTableColumnarStatistics(
- const TTransactionId& transaction,
- const TVector<TRichYPath>& paths,
- const TGetTableColumnarStatisticsOptions& options)
- {
- return AddRequest<TTableColumnarStatisticsParser>(
- "get_table_columnar_statistics",
- SerializeParamsForGetTableColumnarStatistics(transaction, paths, options),
- Nothing());
- }
- TFuture<TMultiTablePartitions> TRawBatchRequest::GetTablePartitions(
- const TTransactionId& transaction,
- const TVector<TRichYPath>& paths,
- const TGetTablePartitionsOptions& options)
- {
- return AddRequest<TTablePartitionsParser>(
- "partition_tables",
- SerializeParamsForGetTablePartitions(transaction, paths, options),
- Nothing());
- }
- void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const
- {
- Y_ABORT_UNLESS(result);
- Y_ABORT_UNLESS(nextTry);
- *nextTry = TInstant();
- maxSize = Min(maxSize, BatchItemList_.size());
- *result = TNode::CreateList();
- for (size_t i = 0; i < maxSize; ++i) {
- YT_LOG_DEBUG("ExecuteBatch preparing: %v",
- RequestInfo(BatchItemList_[i].Parameters));
- result->Add(BatchItemList_[i].Parameters);
- if (BatchItemList_[i].NextTry > *nextTry) {
- *nextTry = BatchItemList_[i].NextTry;
- }
- }
- }
- void TRawBatchRequest::ParseResponse(
- const TResponseInfo& requestResult,
- const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
- TInstant now)
- {
- TNode node = NodeFromYsonString(requestResult.Response);
- return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now);
- }
- void TRawBatchRequest::ParseResponse(
- TNode node,
- const TString& requestId,
- const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
- TInstant now)
- {
- Y_ABORT_UNLESS(retryBatch);
- EnsureType(node, TNode::List);
- auto& responseList = node.AsList();
- const auto size = responseList.size();
- Y_ENSURE(size <= BatchItemList_.size(),
- "Size of server response exceeds size of batch request;"
- " size of batch: " << BatchItemList_.size() <<
- " size of server response: " << size << '.');
- for (size_t i = 0; i != size; ++i) {
- try {
- EnsureType(responseList[i], TNode::Map);
- auto& responseNode = responseList[i].AsMap();
- const auto outputIt = responseNode.find("output");
- if (outputIt != responseNode.end()) {
- BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second));
- } else {
- const auto errorIt = responseNode.find("error");
- if (errorIt == responseNode.end()) {
- BatchItemList_[i].ResponseParser->SetResponse(Nothing());
- } else {
- TErrorResponse error(400, requestId);
- error.SetError(TYtError(errorIt->second));
- if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) {
- YT_LOG_INFO(
- "Batch subrequest (%s) failed, will retry, error: %s",
- RequestInfo(BatchItemList_[i].Parameters),
- error.what());
- retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
- } else {
- YT_LOG_ERROR(
- "Batch subrequest (%s) failed, error: %s",
- RequestInfo(BatchItemList_[i].Parameters),
- error.what());
- BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error));
- }
- }
- }
- } catch (const std::exception& e) {
- // We don't expect other exceptions, so we don't catch (...)
- BatchItemList_[i].ResponseParser->SetException(std::current_exception());
- }
- }
- BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size);
- }
- void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const
- {
- for (const auto& batchItem : BatchItemList_) {
- batchItem.ResponseParser->SetException(e);
- }
- }
- size_t TRawBatchRequest::BatchSize() const
- {
- return BatchItemList_.size();
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT::NDetail::NRawClient
|