Browse Source

Fast mr template

Fast mr template
commit_hash:8a018ae643735f63d073fa7e68cf92402a3e7718
cdzyura171 1 month ago
parent
commit
37731049da

+ 1 - 0
yql/essentials/utils/log/log.cpp

@@ -161,6 +161,7 @@ NYql::NLog::EComponent ConvertComponent(NYql::NProto::TLoggingConfig::EComponent
     case TLoggingConfig::PROVIDER_GENERIC: return EComponent::ProviderGeneric;
     case TLoggingConfig::PROVIDER_PG: return EComponent::ProviderPg;
     case TLoggingConfig::PROVIDER_PURE: return EComponent::ProviderPure;
+    case TLoggingConfig::FAST_MAP_REDUCE: return EComponent::FastMapReduce;
     }
 
     ythrow yexception() << "unknown log component: "

+ 3 - 0
yql/essentials/utils/log/log_component.h

@@ -36,6 +36,7 @@ enum class EComponent {
     ProviderGeneric,
     ProviderPg,
     ProviderPure,
+    FastMapReduce,
     // <--- put other log components here
     MaxValue
 };
@@ -81,6 +82,7 @@ struct EComponentHelpers {
         case EComponent::ProviderGeneric: return TStringBuf("generic");
         case EComponent::ProviderPg: return TStringBuf("PG");
         case EComponent::ProviderPure: return TStringBuf("pure");
+        case EComponent::FastMapReduce: return TStringBuf("fast map reduce");
         default:
             ythrow yexception() << "invalid log component value: "
                                 << ToInt(component);
@@ -115,6 +117,7 @@ struct EComponentHelpers {
         if (str == TStringBuf("generic")) return EComponent::ProviderGeneric;
         if (str == TStringBuf("PG")) return EComponent::ProviderPg;
         if (str == TStringBuf("pure")) return EComponent::ProviderPure;
+        if (str == TStringBuf("fast map reduce")) return EComponent::FastMapReduce;
         ythrow yexception() << "unknown log component: '" << str << '\'';
     }
 

+ 1 - 0
yql/essentials/utils/log/proto/logger_config.proto

@@ -48,6 +48,7 @@ message TLoggingConfig {
         PROVIDER_GENERIC = 24;
         PROVIDER_PG = 25;
         PROVIDER_PURE = 26;
+        FAST_MAP_REDUCE = 27;
     }
 
     message TComponentLevel {

+ 1 - 0
yt/yql/providers/yt/provider/ya.make

@@ -16,6 +16,7 @@ SRCS(
     yql_yt_datasource_type_ann.cpp
     yql_yt_datasource.cpp
     yql_yt_epoch.cpp
+    yql_yt_forwarding_gateway.cpp
     yql_yt_gateway.cpp
     yql_yt_horizontal_join.cpp
     yql_yt_helpers.cpp

+ 140 - 0
yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp

@@ -0,0 +1,140 @@
+#include "yql_yt_forwarding_gateway.h"
+
+using namespace NThreading;
+
+namespace NYql {
+
+TYtForwardingGatewayBase::TYtForwardingGatewayBase(IYtGateway::TPtr&& slave)
+    : Slave_(std::move(slave))
+{
+}
+
+void TYtForwardingGatewayBase::OpenSession(TOpenSessionOptions&& options) {
+    Slave_->OpenSession(std::move(options));
+}
+
+TFuture<void> TYtForwardingGatewayBase::CloseSession(TCloseSessionOptions&& options) {
+    return Slave_->CloseSession(std::move(options));
+}
+
+TFuture<void> TYtForwardingGatewayBase::CleanupSession(TCleanupSessionOptions&& options) {
+    return Slave_->CleanupSession(std::move(options));
+}
+
+TFuture<IYtGateway::TFinalizeResult> TYtForwardingGatewayBase::Finalize(TFinalizeOptions&& options) {
+    return Slave_->Finalize(std::move(options));
+}
+
+TFuture<IYtGateway::TCanonizePathsResult> TYtForwardingGatewayBase::CanonizePaths(TCanonizePathsOptions&& options) {
+    return Slave_->CanonizePaths(std::move(options));
+}
+
+TFuture<IYtGateway::TTableInfoResult> TYtForwardingGatewayBase::GetTableInfo(TGetTableInfoOptions&& options) {
+    return Slave_->GetTableInfo(std::move(options));
+}
+
+TFuture<IYtGateway::TTableRangeResult> TYtForwardingGatewayBase::GetTableRange(TTableRangeOptions&& options) {
+    return Slave_->GetTableRange(std::move(options));
+}
+
+TFuture<IYtGateway::TFolderResult> TYtForwardingGatewayBase::GetFolder(TFolderOptions&& options) {
+    return Slave_->GetFolder(std::move(options));
+}
+
+TFuture<IYtGateway::TBatchFolderResult> TYtForwardingGatewayBase::ResolveLinks(TResolveOptions&& options) {
+    return Slave_->ResolveLinks(std::move(options));
+}
+
+TFuture<IYtGateway::TBatchFolderResult> TYtForwardingGatewayBase::GetFolders(TBatchFolderOptions&& options) {
+    return Slave_->GetFolders(std::move(options));
+}
+
+TFuture<IYtGateway::TResOrPullResult> TYtForwardingGatewayBase::ResOrPull(const TExprNode::TPtr& node, TExprContext& ctx, TResOrPullOptions&& options) {
+    return Slave_->ResOrPull(node, ctx, std::move(options));
+}
+
+TFuture<IYtGateway::TRunResult> TYtForwardingGatewayBase::Run(const TExprNode::TPtr& node, TExprContext& ctx, TRunOptions&& options) {
+    return Slave_->Run(node, ctx, std::move(options));
+}
+
+TFuture<IYtGateway::TRunResult> TYtForwardingGatewayBase::Prepare(const TExprNode::TPtr& node, TExprContext& ctx, TPrepareOptions&& options) const {
+    return Slave_->Prepare(node, ctx, std::move(options));
+}
+
+TFuture<IYtGateway::TCalcResult> TYtForwardingGatewayBase::Calc(const TExprNode::TListType& nodes, TExprContext& ctx, TCalcOptions&& options) {
+    return Slave_->Calc(nodes, ctx, std::move(options));
+}
+
+TFuture<IYtGateway::TPublishResult> TYtForwardingGatewayBase::Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) {
+    return Slave_->Publish(node, ctx, std::move(options));
+}
+
+TFuture<IYtGateway::TCommitResult> TYtForwardingGatewayBase::Commit(TCommitOptions&& options) {
+    return Slave_->Commit(std::move(options));
+}
+
+TFuture<IYtGateway::TDropTrackablesResult> TYtForwardingGatewayBase::DropTrackables(TDropTrackablesOptions&& options) {
+    return Slave_->DropTrackables(std::move(options));
+}
+
+TFuture<IYtGateway::TPathStatResult> TYtForwardingGatewayBase::PathStat(TPathStatOptions&& options) {
+    return Slave_->PathStat(std::move(options));
+}
+
+IYtGateway::TPathStatResult TYtForwardingGatewayBase::TryPathStat(TPathStatOptions&& options) {
+    return Slave_->TryPathStat(std::move(options));
+}
+
+bool TYtForwardingGatewayBase::TryParseYtUrl(const TString& url, TString* cluster, TString* path) const {
+    return Slave_->TryParseYtUrl(url, cluster, path);
+}
+
+TString TYtForwardingGatewayBase::GetDefaultClusterName() const {
+    return Slave_->GetDefaultClusterName();
+}
+
+TString TYtForwardingGatewayBase::GetClusterServer(const TString& cluster) const {
+    return Slave_->GetClusterServer(cluster);
+}
+
+NYT::TRichYPath TYtForwardingGatewayBase::GetRealTable(const TString& sessionId, const TString& cluster, const TString& table, ui32 epoch, const TString& tmpFolder) const {
+    return Slave_->GetRealTable(sessionId, cluster, table, epoch, tmpFolder);
+}
+
+NYT::TRichYPath TYtForwardingGatewayBase::GetWriteTable(const TString& sessionId, const TString& cluster, const TString& table, const TString& tmpFolder) const {
+    return Slave_->GetWriteTable(sessionId, cluster, table, tmpFolder);
+}
+
+TFuture<IYtGateway::TDownloadTablesResult> TYtForwardingGatewayBase::DownloadTables(TDownloadTablesOptions&& options) {
+    return Slave_->DownloadTables(std::move(options));
+}
+
+TFuture<IYtGateway::TUploadTableResult> TYtForwardingGatewayBase::UploadTable(TUploadTableOptions&& options) {
+    return Slave_->UploadTable(std::move(options));
+}
+
+NThreading::TFuture<IYtGateway::TRunResult> TYtForwardingGatewayBase::GetTableStat(const TExprNode::TPtr& node, TExprContext& ctx, TPrepareOptions&& options) {
+    return Slave_->GetTableStat(node, ctx, std::move(options));
+}
+
+IYtGateway::TFullResultTableResult TYtForwardingGatewayBase::PrepareFullResultTable(TFullResultTableOptions&& options) {
+    return Slave_->PrepareFullResultTable(std::move(options));
+}
+
+void TYtForwardingGatewayBase::SetStatUploader(IStatUploader::TPtr statUploader) {
+    Slave_->SetStatUploader(statUploader);
+}
+
+void TYtForwardingGatewayBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) {
+    Slave_->RegisterMkqlCompiler(compiler);
+}
+
+IYtGateway::TGetTablePartitionsResult TYtForwardingGatewayBase::GetTablePartitions(TGetTablePartitionsOptions&& options) {
+    return Slave_->GetTablePartitions(std::move(options));
+}
+
+void TYtForwardingGatewayBase::AddCluster(const TYtClusterConfig& config) {
+    Slave_->AddCluster(config);
+}
+
+} // namspace NYql

+ 78 - 0
yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h

@@ -0,0 +1,78 @@
+#include "yql_yt_provider.h"
+
+using namespace NThreading;
+
+namespace NYql {
+
+class TYtForwardingGatewayBase: public IYtGateway {
+public:
+    TYtForwardingGatewayBase(IYtGateway::TPtr&& slave);
+
+    void OpenSession(TOpenSessionOptions&& options) override;
+
+    TFuture<void> CloseSession(TCloseSessionOptions&& options) override;
+
+    TFuture<void> CleanupSession(TCleanupSessionOptions&& options) override;
+
+    TFuture<TFinalizeResult> Finalize(TFinalizeOptions&& options) override;
+
+    TFuture<TCanonizePathsResult> CanonizePaths(TCanonizePathsOptions&& options) override;
+
+    TFuture<TTableInfoResult> GetTableInfo(TGetTableInfoOptions&& options) override;
+    TFuture<TTableRangeResult> GetTableRange(TTableRangeOptions&& options) override;
+
+    TFuture<TFolderResult> GetFolder(TFolderOptions&& options) override;
+
+    TFuture<TBatchFolderResult> ResolveLinks(TResolveOptions&& options) override;
+
+    TFuture<TBatchFolderResult> GetFolders(TBatchFolderOptions&& options) override;
+
+    TFuture<TResOrPullResult> ResOrPull(const TExprNode::TPtr& node, TExprContext& ctx, TResOrPullOptions&& options) override;
+
+    TFuture<TRunResult> Run(const TExprNode::TPtr& node, TExprContext& ctx, TRunOptions&& options) override;
+
+    TFuture<TRunResult> Prepare(const TExprNode::TPtr& node, TExprContext& ctx, TPrepareOptions&& options) const override;
+
+    TFuture<TCalcResult> Calc(const TExprNode::TListType& nodes, TExprContext& ctx, TCalcOptions&& options) override;
+
+    TFuture<TPublishResult> Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) override;
+
+    TFuture<TCommitResult> Commit(TCommitOptions&& options) override;
+
+    TFuture<TDropTrackablesResult> DropTrackables(TDropTrackablesOptions&& options) override;
+
+    TFuture<TPathStatResult> PathStat(TPathStatOptions&& options) override;
+
+    TPathStatResult TryPathStat(TPathStatOptions&& options) override;
+
+    bool TryParseYtUrl(const TString& url, TString* cluster, TString* path) const override;
+
+    TString GetDefaultClusterName() const override;
+
+    TString GetClusterServer(const TString& cluster) const override;
+
+    NYT::TRichYPath GetRealTable(const TString& sessionId, const TString& cluster, const TString& table, ui32 epoch, const TString& tmpFolder) const override;
+
+    NYT::TRichYPath GetWriteTable(const TString& sessionId, const TString& cluster, const TString& table, const TString& tmpFolder) const override;
+
+    TFuture<TDownloadTablesResult> DownloadTables(TDownloadTablesOptions&& options) override;
+
+    TFuture<TUploadTableResult> UploadTable(TUploadTableOptions&& options) override;
+
+    NThreading::TFuture<TRunResult> GetTableStat(const TExprNode::TPtr& node, TExprContext& ctx, TPrepareOptions&& options) override;
+
+    TFullResultTableResult PrepareFullResultTable(TFullResultTableOptions&& options) override;
+
+    void SetStatUploader(IStatUploader::TPtr statUploader) override;
+
+    void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override;
+
+    TGetTablePartitionsResult GetTablePartitions(TGetTablePartitionsOptions&& options) override;
+
+    void AddCluster(const TYtClusterConfig& config) override;
+
+protected:
+    IYtGateway::TPtr Slave_;
+};
+
+} // namspace NYql