|
@@ -3,26 +3,26 @@
|
|
|
#include "error_helpers.h"
|
|
|
#include "progress_merger.h"
|
|
|
|
|
|
+#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
|
|
|
#include <ydb/library/yql/providers/yt/lib/log/yt_logger.h>
|
|
|
#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
|
|
|
-#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
|
|
|
#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
|
|
|
|
|
|
-#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
|
|
|
#include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
|
|
|
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
|
|
|
+#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
|
|
|
|
|
|
#include <ydb/library/yql/ast/yql_expr.h>
|
|
|
-#include <ydb/library/yql/minikql/mkql_function_registry.h>
|
|
|
-#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
|
|
|
#include <ydb/library/yql/core/facade/yql_facade.h>
|
|
|
#include <ydb/library/yql/core/file_storage/file_storage.h>
|
|
|
-#include "ydb/library/yql/core/file_storage/proto/file_storage.pb.h"
|
|
|
+#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
|
|
|
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
|
|
|
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
|
|
|
#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
|
|
|
-#include <ydb/library/yql/utils/log/log.h>
|
|
|
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
|
|
|
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
|
|
|
#include <ydb/library/yql/utils/backtrace/backtrace.h>
|
|
|
+#include <ydb/library/yql/utils/log/log.h>
|
|
|
|
|
|
#include <yt/yt/core/ytree/convert.h>
|
|
|
|
|
@@ -31,14 +31,16 @@
|
|
|
|
|
|
#include <yt/cpp/mapreduce/interface/logging/logger.h>
|
|
|
|
|
|
+#include <yt/yt/core/yson/protobuf_interop.h>
|
|
|
+
|
|
|
#include <library/cpp/yt/threading/rw_spin_lock.h>
|
|
|
|
|
|
#include <library/cpp/yson/node/node_io.h>
|
|
|
#include <library/cpp/yson/parser.h>
|
|
|
#include <library/cpp/yson/writer.h>
|
|
|
|
|
|
-#include <library/cpp/resource/resource.h>
|
|
|
#include <library/cpp/digest/md5/md5.h>
|
|
|
+#include <library/cpp/resource/resource.h>
|
|
|
|
|
|
#include <util/folder/path.h>
|
|
|
#include <util/stream/file.h>
|
|
@@ -81,7 +83,7 @@ class TQueryPipelineConfigurator
|
|
|
{
|
|
|
public:
|
|
|
TQueryPipelineConfigurator(NYql::TProgramPtr program, TQueryPlan& plan)
|
|
|
- : Program_(program)
|
|
|
+ : Program_(std::move(program))
|
|
|
, Plan_(plan)
|
|
|
{ }
|
|
|
|
|
@@ -116,7 +118,7 @@ class TYqlPlugin
|
|
|
: public IYqlPlugin
|
|
|
{
|
|
|
public:
|
|
|
- TYqlPlugin(TYqlPluginOptions& options)
|
|
|
+ TYqlPlugin(TYqlPluginOptions options)
|
|
|
{
|
|
|
try {
|
|
|
auto singletonsConfig = NYTree::ConvertTo<TSingletonsConfigPtr>(options.SingletonsConfig);
|
|
@@ -128,7 +130,7 @@ public:
|
|
|
|
|
|
logger.SetDefaultPriority(ELogPriority::TLOG_DEBUG);
|
|
|
for (int i = 0; i < NYql::NLog::EComponentHelpers::ToInt(NYql::NLog::EComponent::MaxValue); ++i) {
|
|
|
- logger.SetComponentLevel((NYql::NLog::EComponent) i, NYql::NLog::ELevel::DEBUG);
|
|
|
+ logger.SetComponentLevel((NYql::NLog::EComponent)i, NYql::NLog::ELevel::DEBUG);
|
|
|
}
|
|
|
|
|
|
NYql::SetYtLoggerGlobalBackend(NYT::ILogger::ELevel::DEBUG);
|
|
@@ -136,45 +138,30 @@ public:
|
|
|
NYT::TConfig::Get()->Prefix = "//";
|
|
|
}
|
|
|
|
|
|
- auto yqlCoreFlags = GatewaysConfig_.GetYqlCore()
|
|
|
- .GetFlags();
|
|
|
-
|
|
|
- auto ytConfig = GatewaysConfig_.MutableYt();
|
|
|
- if (!ytConfig->HasExecuteUdfLocallyIfPossible()) {
|
|
|
- ytConfig->SetExecuteUdfLocallyIfPossible(true);
|
|
|
- }
|
|
|
-
|
|
|
- auto pattern = ytConfig->AddRemoteFilePatterns();
|
|
|
- pattern->SetPattern("yt://([a-zA-Z0-9\\-_]+)/([^&@?]+)$");
|
|
|
- pattern->SetCluster("$1");
|
|
|
- pattern->SetPath("$2");
|
|
|
+ NYson::TProtobufWriterOptions protobufWriterOptions;
|
|
|
+ protobufWriterOptions.ConvertSnakeToCamelCase = true;
|
|
|
|
|
|
- ytConfig->SetYtLogLevel(NYql::EYtLogLevel::YL_DEBUG);
|
|
|
- ytConfig->SetMrJobBin(options.MRJobBinary);
|
|
|
- ytConfig->SetMrJobBinMd5(MD5::File(options.MRJobBinary));
|
|
|
+ auto* gatewayYtConfig = GatewaysConfig_.MutableYt();
|
|
|
+ gatewayYtConfig->ParseFromStringOrThrow(NYson::YsonStringToProto(
|
|
|
+ options.GatewayConfig,
|
|
|
+ NYson::ReflectProtobufMessageType<NYql::TYtGatewayConfig>(),
|
|
|
+ protobufWriterOptions));
|
|
|
|
|
|
- ytConfig->ClearMrJobUdfsDir();
|
|
|
+ NYql::TFileStorageConfig fileStorageConfig;
|
|
|
+ fileStorageConfig.ParseFromStringOrThrow(NYson::YsonStringToProto(
|
|
|
+ options.FileStorageConfig,
|
|
|
+ NYson::ReflectProtobufMessageType<NYql::TFileStorageConfig>(),
|
|
|
+ protobufWriterOptions));
|
|
|
|
|
|
- auto setting = ytConfig->AddDefaultSettings();
|
|
|
- setting->SetName("NativeYtTypeCompatibility");
|
|
|
- setting->SetValue("all");
|
|
|
+ gatewayYtConfig->SetMrJobBinMd5(MD5::File(gatewayYtConfig->GetMrJobBin()));
|
|
|
|
|
|
- for (const auto& [cluster, address]: options.Clusters) {
|
|
|
- auto item = ytConfig->AddClusterMapping();
|
|
|
- item->SetName(cluster);
|
|
|
- item->SetCluster(address);
|
|
|
- if (cluster == options.DefaultCluster) {
|
|
|
- item->SetDefault(true);
|
|
|
+ for (const auto& mapping : gatewayYtConfig->GetClusterMapping()) {
|
|
|
+ Clusters_.insert({mapping.name(), TString(NYql::YtProviderName)});
|
|
|
+ if (mapping.GetDefault()) {
|
|
|
+ DefaultCluster_ = mapping.name();
|
|
|
}
|
|
|
-
|
|
|
- Clusters_.insert({item->GetName(), TString(NYql::YtProviderName)});
|
|
|
}
|
|
|
- DefaultCluster_ = options.DefaultCluster;
|
|
|
|
|
|
- NYql::TFileStorageConfig fileStorageConfig;
|
|
|
- fileStorageConfig.SetMaxSizeMb(options.MaxFilesSizeMb);
|
|
|
- fileStorageConfig.SetMaxFiles(options.MaxFileCount);
|
|
|
- fileStorageConfig.SetRetryCount(options.DownloadFileRetryCount);
|
|
|
FileStorage_ = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)}));
|
|
|
|
|
|
FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
|
|
@@ -187,8 +174,8 @@ public:
|
|
|
NKikimr::NMiniKQL::TUdfModulePathsMap systemModules;
|
|
|
|
|
|
TVector<TString> udfPaths;
|
|
|
- NKikimr::NMiniKQL::FindUdfsInDir(options.UdfDirectory, &udfPaths);
|
|
|
- for (const auto& path: udfPaths) {
|
|
|
+ NKikimr::NMiniKQL::FindUdfsInDir(gatewayYtConfig->GetMrJobUdfsDir(), &udfPaths);
|
|
|
+ for (const auto& path : udfPaths) {
|
|
|
// Skip YQL plugin shared library itself, it is not a UDF.
|
|
|
if (path.EndsWith("libyqlplugin.so")) {
|
|
|
continue;
|
|
@@ -196,7 +183,9 @@ public:
|
|
|
FuncRegistry_->LoadUdfs(path, emptyRemappings, 0);
|
|
|
}
|
|
|
|
|
|
- for (auto& m: FuncRegistry_->GetAllModuleNames()) {
|
|
|
+ gatewayYtConfig->ClearMrJobUdfsDir();
|
|
|
+
|
|
|
+ for (const auto& m : FuncRegistry_->GetAllModuleNames()) {
|
|
|
TMaybe<TString> path = FuncRegistry_->FindUdfPath(m);
|
|
|
if (!path) {
|
|
|
YQL_LOG(FATAL) << "Unable to detect UDF path for module " << m;
|
|
@@ -226,7 +215,7 @@ public:
|
|
|
NYql::TYtNativeServices ytServices;
|
|
|
ytServices.FunctionRegistry = FuncRegistry_.Get();
|
|
|
ytServices.FileStorage = FileStorage_;
|
|
|
- ytServices.Config = std::make_shared<NYql::TYtGatewayConfig>(*ytConfig);
|
|
|
+ ytServices.Config = std::make_shared<NYql::TYtGatewayConfig>(*gatewayYtConfig);
|
|
|
auto ytNativeGateway = CreateYtNativeGateway(ytServices);
|
|
|
dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
|
|
|
|
|
@@ -312,7 +301,7 @@ public:
|
|
|
if (program->HasResults()) {
|
|
|
::NYson::TYsonWriter yson(&result, EYsonFormat::Binary);
|
|
|
yson.OnBeginList();
|
|
|
- for (const auto& result: program->Results()) {
|
|
|
+ for (const auto& result : program->Results()) {
|
|
|
yson.OnListItem();
|
|
|
yson.OnRaw(result);
|
|
|
}
|
|
@@ -383,19 +372,19 @@ private:
|
|
|
THashMap<TQueryId, TActiveQuery> ActiveQueriesProgress_;
|
|
|
TVector<NYql::TDataProviderInitializer> DataProvidersInit_;
|
|
|
|
|
|
- TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
|
|
|
+ static TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
|
|
|
{
|
|
|
auto querySettingsMap = NodeFromYsonString(querySettings.ToString());
|
|
|
auto resultAttributesMap = NodeFromYsonString(configAttributes.ToString());
|
|
|
|
|
|
- for (const auto& item: querySettingsMap.AsMap()) {
|
|
|
+ for (const auto& item : querySettingsMap.AsMap()) {
|
|
|
resultAttributesMap[item.first] = item.second;
|
|
|
}
|
|
|
|
|
|
return NodeToYsonString(resultAttributesMap);
|
|
|
}
|
|
|
|
|
|
- NYql::TUserDataTable FilesToUserTable(const std::vector<TQueryFile>& files)
|
|
|
+ static NYql::TUserDataTable FilesToUserTable(const std::vector<TQueryFile>& files)
|
|
|
{
|
|
|
NYql::TUserDataTable table;
|
|
|
|
|
@@ -428,9 +417,9 @@ private:
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
-std::unique_ptr<IYqlPlugin> CreateYqlPlugin(TYqlPluginOptions& options) noexcept
|
|
|
+std::unique_ptr<IYqlPlugin> CreateYqlPlugin(TYqlPluginOptions options) noexcept
|
|
|
{
|
|
|
- return std::make_unique<NNative::TYqlPlugin>(options);
|
|
|
+ return std::make_unique<NNative::TYqlPlugin>(std::move(options));
|
|
|
}
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|