#include "raw_batch_request.h"

#include "raw_requests.h"
#include "rpc_parameters_serialization.h"

// NOTE(review): the targets of the 14 angle-bracket includes below are missing from this
// copy of the file (everything between '<' and '>' was lost). Restore them from version
// control before building; the directives are kept so that none is silently dropped.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace NYT::NDetail::NRawClient {

using NThreading::TFuture;
using NThreading::TPromise;
using NThreading::NewPromise;

////////////////////////////////////////////////////////////////////////////////

// Renders a batch subrequest as "<command> <parameters as YSON>" for log messages.
static TString RequestInfo(const TNode& request)
{
    return ::TStringBuilder()
        << request["command"].AsString() << ' '
        << NodeToYsonString(request["parameters"]);
}

// Fails (via Y_ENSURE) if a response is present.
// The message dereferences `node`; presumably Y_ENSURE builds the message only when
// the check fails, i.e. only when `node` holds a value -- TODO confirm.
static void EnsureNothing(const TMaybe<TNode>& node)
{
    Y_ENSURE(!node,
        "Internal error: expected to have no response, but got response of type " << node->GetType());
}

// Fails (via Y_ENSURE) if no response is present.
static void EnsureSomething(const TMaybe<TNode>& node)
{
    Y_ENSURE(node,
        "Internal error: expected to have response of any type, but got no response.");
}

// Fails (via Y_ENSURE) if `node` is not of the expected type.
static void EnsureType(const TNode& node, TNode::EType type)
{
    Y_ENSURE(node.GetType() == type,
        "Internal error: unexpected response type. "
        << "Expected: " << type << ", actual: " << node.GetType());
}

// Fails (via Y_ENSURE) if the response is absent or is not of the expected type.
static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
{
    Y_ENSURE(node,
        "Internal error: expected to have response of type " << type << ", but got no response.");
    EnsureType(*node, type);
}

////////////////////////////////////////////////////////////////////////////////

// Common base of the response parsers below: owns the promise for the typed result
// and hands out the corresponding future.
// Derived classes implement SetResponse and fulfil Result_ there.
template <typename TReturnType>
class TResponseParserBase
    : public THttpRawBatchRequest::IResponseItemParser
{
public:
    using TFutureResult = TFuture<TReturnType>;

public:
    TResponseParserBase()
        : Result_(NewPromise<TReturnType>())
    { }

    // Forwards the failure to the promise.
    void SetException(std::exception_ptr e) override
    {
        Result_.SetException(std::move(e));
    }

    TFuture<TReturnType> GetFuture()
    {
        return Result_.GetFuture();
    }

protected:
    TPromise<TReturnType> Result_;
};

////////////////////////////////////////////////////////////////////////////////

// Requires some response and passes the node through unchanged.
class TGetResponseParser
    : public TResponseParserBase<TNode>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureSomething(node);
        Result_.SetValue(std::move(*node));
    }
};

////////////////////////////////////////////////////////////////////////////////

// Requires that there is no response at all.
class TVoidResponseParser
    : public TResponseParserBase<void>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureNothing(node);
        Result_.SetValue();
    }
};

////////////////////////////////////////////////////////////////////////////////

// Requires a list node and yields its list payload.
class TListResponseParser
    : public TResponseParserBase<TNode::TListType>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::List);
        Result_.SetValue(std::move(node->AsList()));
    }
};

////////////////////////////////////////////////////////////////////////////////

// Requires a boolean node and yields its value.
class TExistsResponseParser
    : public TResponseParserBase<bool>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::Bool);
        Result_.SetValue(node->AsBool());
    }
};

////////////////////////////////////////////////////////////////////////////////

// Requires a string node and converts it with GetGuid.
class TGuidResponseParser
    : public TResponseParserBase<TGUID>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::String);
        Result_.SetValue(GetGuid(node->AsString()));
    }
};
//////////////////////////////////////////////////////////////////////////////// class TCanonizeYPathResponseParser : public TResponseParserBase { public: explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original) : OriginalNode_(PathToNode(original)) , PathPrefix_(std::move(pathPrefix)) { } void SetResponse(TMaybe node) override { EnsureType(node, TNode::String); for (const auto& item : OriginalNode_.GetAttributes().AsMap()) { node->Attributes()[item.first] = item.second; } TRichYPath result; Deserialize(result, *node); result.Path_ = AddPathPrefix(result.Path_, PathPrefix_); Result_.SetValue(result); } private: TNode OriginalNode_; TString PathPrefix_; }; //////////////////////////////////////////////////////////////////////////////// class TGetOperationResponseParser : public TResponseParserBase { public: void SetResponse(TMaybe node) override { EnsureType(node, TNode::Map); Result_.SetValue(ParseOperationAttributes(*node)); } }; //////////////////////////////////////////////////////////////////////////////// class TTableColumnarStatisticsParser : public TResponseParserBase> { public: void SetResponse(TMaybe node) override { EnsureType(node, TNode::List); TVector statistics; Deserialize(statistics, *node); Result_.SetValue(std::move(statistics)); } }; //////////////////////////////////////////////////////////////////////////////// class TTablePartitionsParser : public TResponseParserBase { public: void SetResponse(TMaybe node) override { EnsureType(node, TNode::Map); TMultiTablePartitions partitions; Deserialize(partitions, *node); Result_.SetValue(std::move(partitions)); } }; //////////////////////////////////////////////////////////////////////////////// class TGetFileFromCacheParser : public TResponseParserBase> { public: void SetResponse(TMaybe node) override { EnsureType(node, TNode::String); if (node->AsString().empty()) { Result_.SetValue(Nothing()); } else { Result_.SetValue(node->AsString()); } } }; 
//////////////////////////////////////////////////////////////////////////////// class TYPathParser : public TResponseParserBase { public: void SetResponse(TMaybe node) override { EnsureType(node, TNode::String); Result_.SetValue(node->AsString()); } }; //////////////////////////////////////////////////////////////////////////////// class TCheckPermissionParser : public TResponseParserBase { public: void SetResponse(TMaybe node) override { EnsureType(node, TNode::Map); Result_.SetValue(ParseCheckPermissionResponse(*node)); } }; //////////////////////////////////////////////////////////////////////////////// THttpRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr responseParser) : Parameters(std::move(parameters)) , ResponseParser(std::move(responseParser)) , NextTry() { } THttpRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry) : Parameters(batchItem.Parameters) , ResponseParser(batchItem.ResponseParser) , NextTry(nextTry) { } //////////////////////////////////////////////////////////////////////////////// THttpRawBatchRequest::THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy) : Context_(context) , RequestRetryPolicy_(std::move(retryPolicy)) { } THttpRawBatchRequest::~THttpRawBatchRequest() = default; void THttpRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options) { if (IsExecuted()) { ythrow yexception() << "Cannot execute batch request since it is already executed"; } Y_DEFER { MarkExecuted(); }; const auto concurrency = options.Concurrency_.GetOrElse(50); const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); if (!RequestRetryPolicy_) { RequestRetryPolicy_ = CreateDefaultRequestRetryPolicy(Context_.Config); } while (BatchSize()) { auto parameters = TNode::CreateMap(); TInstant nextTry; FillParameterList(batchPartMaxSize, ¶meters["requests"], &nextTry); if (nextTry) { SleepUntil(nextTry); } parameters["concurrency"] = 
concurrency; auto body = NodeToYsonString(parameters); THttpHeader header("POST", "execute_batch"); header.AddMutationId(); TResponseInfo result; try { result = RequestWithRetry( RequestRetryPolicy_, [this, &header, &body] (TMutationId& mutationId) { auto response = RequestWithoutRetry(Context_, mutationId, header, body); return TResponseInfo{ .RequestId = response->GetRequestId(), .Response = response->GetResponse(), .HttpCode = response->GetStatusCode(), }; }); } catch (const std::exception& e) { SetErrorResult(std::current_exception()); throw; } ParseResponse(std::move(result), RequestRetryPolicy_.Get()); } } bool THttpRawBatchRequest::IsExecuted() const { return Executed_; } void THttpRawBatchRequest::MarkExecuted() { Executed_ = true; } template typename TResponseParser::TFutureResult THttpRawBatchRequest::AddRequest( const TString& command, TNode parameters, TMaybe input) { return AddRequest(command, parameters, input, MakeIntrusive()); } template typename TResponseParser::TFutureResult THttpRawBatchRequest::AddRequest( const TString& command, TNode parameters, TMaybe input, ::TIntrusivePtr parser) { Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); TNode request; request["command"] = command; request["parameters"] = std::move(parameters); if (input) { request["input"] = std::move(*input); } BatchItemList_.emplace_back(std::move(request), parser); return parser->GetFuture(); } void THttpRawBatchRequest::AddRequest(TBatchItem batchItem) { Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed"); BatchItemList_.push_back(batchItem); } TFuture THttpRawBatchRequest::Create( const TTransactionId& transaction, const TYPath& path, ENodeType type, const TCreateOptions& options) { return AddRequest( "create", SerializeParamsForCreate(transaction, Context_.Config->Prefix, path, type, options), Nothing()); } TFuture THttpRawBatchRequest::Remove( const TTransactionId& transaction, const TYPath& path, const TRemoveOptions& 
options) { return AddRequest( "remove", SerializeParamsForRemove(transaction, Context_.Config->Prefix, path, options), Nothing()); } TFuture THttpRawBatchRequest::Exists( const TTransactionId& transaction, const TYPath& path, const TExistsOptions& options) { return AddRequest( "exists", SerializeParamsForExists(transaction, Context_.Config->Prefix, path, options), Nothing()); } TFuture THttpRawBatchRequest::Get( const TTransactionId& transaction, const TYPath& path, const TGetOptions& options) { return AddRequest( "get", SerializeParamsForGet(transaction, Context_.Config->Prefix, path, options), Nothing()); } TFuture THttpRawBatchRequest::Set( const TTransactionId& transaction, const TYPath& path, const TNode& node, const TSetOptions& options) { return AddRequest( "set", SerializeParamsForSet(transaction, Context_.Config->Prefix, path, options), node); } TFuture THttpRawBatchRequest::List( const TTransactionId& transaction, const TYPath& path, const TListOptions& options) { return AddRequest( "list", SerializeParamsForList(transaction, Context_.Config->Prefix, path, options), Nothing()); } TFuture THttpRawBatchRequest::Copy( const TTransactionId& transaction, const TYPath& sourcePath, const TYPath& destinationPath, const TCopyOptions& options) { return AddRequest( "copy", SerializeParamsForCopy(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options), Nothing()); } TFuture THttpRawBatchRequest::Move( const TTransactionId& transaction, const TYPath& sourcePath, const TYPath& destinationPath, const TMoveOptions& options) { return AddRequest( "move", SerializeParamsForMove(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options), Nothing()); } TFuture THttpRawBatchRequest::Link( const TTransactionId& transaction, const TYPath& targetPath, const TYPath& linkPath, const TLinkOptions& options) { return AddRequest( "link", SerializeParamsForLink(transaction, Context_.Config->Prefix, targetPath, linkPath, options), Nothing()); } 
TFuture THttpRawBatchRequest::Lock( const TTransactionId& transaction, const TYPath& path, ELockMode mode, const TLockOptions& options) { return AddRequest( "lock", SerializeParamsForLock(transaction, Context_.Config->Prefix, path, mode, options), Nothing()); } TFuture THttpRawBatchRequest::Unlock( const TTransactionId& transaction, const TYPath& path, const TUnlockOptions& options) { return AddRequest( "unlock", SerializeParamsForUnlock(transaction, Context_.Config->Prefix, path, options), Nothing()); } TFuture> THttpRawBatchRequest::GetFileFromCache( const TTransactionId& transactionId, const TString& md5Signature, const TYPath& cachePath, const TGetFileFromCacheOptions& options) { return AddRequest( "get_file_from_cache", SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options), Nothing()); } TFuture THttpRawBatchRequest::PutFileToCache( const TTransactionId& transactionId, const TYPath& filePath, const TString& md5Signature, const TYPath& cachePath, const TPutFileToCacheOptions& options) { return AddRequest( "put_file_to_cache", SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options), Nothing()); } TFuture THttpRawBatchRequest::CheckPermission( const TString& user, EPermission permission, const TYPath& path, const TCheckPermissionOptions& options) { return AddRequest( "check_permission", SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options), Nothing()); } TFuture THttpRawBatchRequest::GetOperation( const TOperationId& operationId, const TGetOperationOptions& options) { return AddRequest( "get_operation", SerializeParamsForGetOperation(operationId, options), Nothing()); } TFuture THttpRawBatchRequest::AbortOperation(const TOperationId& operationId) { return AddRequest( "abort_op", SerializeParamsForAbortOperation(operationId), Nothing()); } TFuture THttpRawBatchRequest::CompleteOperation(const TOperationId& operationId) { return 
AddRequest( "complete_op", SerializeParamsForCompleteOperation(operationId), Nothing()); } TFuture THttpRawBatchRequest::SuspendOperation( const TOperationId& operationId, const TSuspendOperationOptions& options) { return AddRequest( "suspend_operation", SerializeParamsForSuspendOperation(operationId, options), Nothing()); } TFuture THttpRawBatchRequest::ResumeOperation( const TOperationId& operationId, const TResumeOperationOptions& options) { return AddRequest( "resume_operation", SerializeParamsForResumeOperation(operationId, options), Nothing()); } TFuture THttpRawBatchRequest::UpdateOperationParameters( const TOperationId& operationId, const TUpdateOperationParametersOptions& options) { return AddRequest( "update_op_parameters", SerializeParamsForUpdateOperationParameters(operationId, options), Nothing()); } TFuture THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path) { TRichYPath result = path; // Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath. 
if (!result.Path_.StartsWith("<")) { result.Path_ = AddPathPrefix(result.Path_, Context_.Config->Prefix); } if (result.Path_.find_first_of("<>{}[]") != TString::npos) { return AddRequest( "parse_ypath", SerializeParamsForParseYPath(result), Nothing(), MakeIntrusive(Context_.Config->Prefix, result)); } return NThreading::MakeFuture(result); } TFuture> THttpRawBatchRequest::GetTableColumnarStatistics( const TTransactionId& transaction, const TVector& paths, const TGetTableColumnarStatisticsOptions& options) { return AddRequest( "get_table_columnar_statistics", SerializeParamsForGetTableColumnarStatistics(transaction, paths, options), Nothing()); } TFuture THttpRawBatchRequest::GetTablePartitions( const TTransactionId& transaction, const TVector& paths, const TGetTablePartitionsOptions& options) { return AddRequest( "partition_tables", SerializeParamsForGetTablePartitions(transaction, paths, options), Nothing()); } void THttpRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const { Y_ABORT_UNLESS(result); Y_ABORT_UNLESS(nextTry); *nextTry = TInstant(); maxSize = Min(maxSize, BatchItemList_.size()); *result = TNode::CreateList(); for (size_t i = 0; i < maxSize; ++i) { YT_LOG_DEBUG("ExecuteBatch preparing: %v", RequestInfo(BatchItemList_[i].Parameters)); result->Add(BatchItemList_[i].Parameters); if (BatchItemList_[i].NextTry > *nextTry) { *nextTry = BatchItemList_[i].NextTry; } } } void THttpRawBatchRequest::ParseResponse( const TResponseInfo& requestResult, const IRequestRetryPolicyPtr& retryPolicy, TInstant now) { TNode node = NodeFromYsonString(requestResult.Response); return ParseResponse(node, requestResult.RequestId, retryPolicy, now); } void THttpRawBatchRequest::ParseResponse( TNode node, const TString& requestId, const IRequestRetryPolicyPtr& retryPolicy, TInstant now) { EnsureType(node, TNode::List); auto& responseList = node.AsList(); const auto size = responseList.size(); Y_ENSURE(size <= BatchItemList_.size(), "Size of 
server response exceeds size of batch request;" " size of batch: " << BatchItemList_.size() << " size of server response: " << size << '.'); for (size_t i = 0; i != size; ++i) { try { EnsureType(responseList[i], TNode::Map); auto& responseNode = responseList[i].AsMap(); const auto outputIt = responseNode.find("output"); if (outputIt != responseNode.end()) { BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second)); } else { const auto errorIt = responseNode.find("error"); if (errorIt == responseNode.end()) { BatchItemList_[i].ResponseParser->SetResponse(Nothing()); } else { TErrorResponse error(400, requestId); error.SetError(TYtError(errorIt->second)); if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) { YT_LOG_INFO( "Batch subrequest (%s) failed, will retry, error: %s", RequestInfo(BatchItemList_[i].Parameters), error.what()); AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval)); } else { YT_LOG_ERROR( "Batch subrequest (%s) failed, error: %s", RequestInfo(BatchItemList_[i].Parameters), error.what()); BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error)); } } } } catch (const std::exception& e) { // We don't expect other exceptions, so we don't catch (...) BatchItemList_[i].ResponseParser->SetException(std::current_exception()); } } BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size); } void THttpRawBatchRequest::SetErrorResult(std::exception_ptr e) const { for (const auto& batchItem : BatchItemList_) { batchItem.ResponseParser->SetException(e); } } size_t THttpRawBatchRequest::BatchSize() const { return BatchItemList_.size(); } //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail::NRawClient