Browse Source

YT-19415: QueryTracker, add files in yql queries

mpereskokova 1 year ago
parent
commit
1ddfdf5e03

+ 21 - 1
yt/yql/plugin/bridge/interface.h

@@ -26,6 +26,10 @@ struct TBridgeYqlPluginOptions
     const char* OperationAttributes;
     ssize_t OperationAttributesLength = 0;
 
+    ssize_t MaxFilesSizeMb;
+    ssize_t MaxFileCount;
+    ssize_t DownloadFileRetryCount;
+
     const char* YTTokenPath;
 
     // TODO(max42): passing C++ objects across shared libraries is incredibly
@@ -63,8 +67,24 @@ struct TBridgeQueryResult
     ssize_t YsonErrorLength = 0;
 };
 
+//! Tells the plugin how the Content field of a TBridgeQueryFile is to be interpreted.
+//! The native plugin maps RawInlineData to NYql::EUserDataType::RAW_INLINE_DATA
+//! and Url to NYql::EUserDataType::URL.
+enum EQueryFileContentType {
+    RawInlineData = 0,
+    Url = 1,
+};
+
+//! A single file attached to a query, in the form passed across the bridge.
+/*!
+ *  Name and Content are raw (pointer, length) pairs; the struct does not own
+ *  or free the memory they point to, so the caller of BridgeRun must keep the
+ *  underlying buffers alive for the duration of the call.
+ *
+ *  All fields carry defaults so that a default-constructed instance is in a
+ *  well-defined (empty) state, consistent with the length fields.
+ */
+struct TBridgeQueryFile
+{
+    const char* Name = nullptr;
+    size_t NameLength = 0;
+
+    const char* Content = nullptr;
+    size_t ContentLength = 0;
+
+    EQueryFileContentType Type = RawInlineData;
+};
+
 using TFuncBridgeFreeQueryResult = void(TBridgeQueryResult* result);
-using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings);
+using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* files, int fileCount);
 using TFuncBridgeGetProgress = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId);
 
 ////////////////////////////////////////////////////////////////////////////////

+ 19 - 3
yt/yql/plugin/bridge/plugin.cpp

@@ -79,7 +79,10 @@ public:
             .Clusters = bridgeClusters.data(),
             .DefaultCluster = defaultCluster,
             .OperationAttributes = operationAttributesString.data(),
-            .OperationAttributesLength = ssize(operationAttributesString),
+            .OperationAttributesLength = static_cast<int>(operationAttributesString.size()),
+            .MaxFilesSizeMb = options.MaxFilesSizeMb,
+            .MaxFileCount = options.MaxFileCount,
+            .DownloadFileRetryCount = options.DownloadFileRetryCount,
             .YTTokenPath = options.YTTokenPath.data(),
             .LogBackend = &options.LogBackend,
         };
@@ -87,11 +90,24 @@ public:
         BridgePlugin_ = BridgeCreateYqlPlugin(&bridgeOptions);
     }
 
-    TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, NYson::TYsonString settings) noexcept override
+    TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, NYson::TYsonString settings, std::vector<TQueryFile> files) noexcept override
     {
         auto settingsString = settings ? settings.ToString() : "{}";
         auto queryIdStr = ToString(queryId);
-        auto* bridgeQueryResult = BridgeRun(BridgePlugin_, queryIdStr.data(), impersonationUser.data(), queryText.data(), settingsString.data());
+
+        std::vector<TBridgeQueryFile> filesData;
+        filesData.reserve(files.size());
+        for (const auto& file : files) {
+            filesData.push_back(TBridgeQueryFile {
+                .Name = file.Name.data(),
+                .NameLength = file.Name.size(),
+                .Content = file.Content.data(),
+                .ContentLength = file.Content.size(),
+                .Type = file.Type,
+            });
+        }
+
+        auto* bridgeQueryResult = BridgeRun(BridgePlugin_, queryIdStr.data(), impersonationUser.data(), queryText.data(), settingsString.data(), filesData.data(), filesData.size());
         TQueryResult queryResult = {
             .YsonResult = ToString(bridgeQueryResult->YsonResult, bridgeQueryResult->YsonResultLength),
             .Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength),

+ 20 - 2
yt/yql/plugin/dynamic/impl.cpp

@@ -28,6 +28,9 @@ TBridgeYqlPlugin* BridgeCreateYqlPlugin(const TBridgeYqlPluginOptions* bridgeOpt
         .Clusters = std::move(clusters),
         .DefaultCluster = std::optional<TString>(bridgeOptions->DefaultCluster),
         .OperationAttributes = operationAttributes,
+        .MaxFilesSizeMb = static_cast<int>(bridgeOptions->MaxFilesSizeMb),
+        .MaxFileCount = static_cast<int>(bridgeOptions->MaxFileCount),
+        .DownloadFileRetryCount = static_cast<int>(bridgeOptions->DownloadFileRetryCount),
         .YTTokenPath = TString(bridgeOptions->YTTokenPath),
         .LogBackend = std::move(*reinterpret_cast<THolder<TLogBackend>*>(bridgeOptions->LogBackend)),
     };
@@ -64,14 +67,29 @@ void FillString(const char*& str, ssize_t& strLength, const std::optional<TStrin
     strLength = original->size();
 }
 
-TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings)
+TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* bridgeFiles, int bridgeFileCount)
 {
     static const auto EmptyMap = TYsonString(TString("{}"));
 
     auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
     auto* bridgeResult = new TBridgeQueryResult;
 
-    auto result = nativePlugin->Run(NYT::TGuid::FromString(queryId), TString(impersonationUser), TString(queryText), settings ? TYsonString(TString(settings)) : EmptyMap);
+    std::vector<TQueryFile> files(bridgeFileCount);
+    for (int index = 0; index < bridgeFileCount; index++) {
+        const auto& file = bridgeFiles[index];
+        files.push_back(TQueryFile {
+            .Name = TStringBuf(file.Name, file.NameLength),
+            .Content = TStringBuf(file.Content, file.ContentLength),
+            .Type = file.Type,
+        });
+    }
+
+    auto result = nativePlugin->Run(
+        NYT::TGuid::FromString(queryId),
+        TString(impersonationUser),
+        TString(queryText),
+        settings ? TYsonString(TString(settings)) : EmptyMap,
+        files);
     FillString(bridgeResult->YsonResult, bridgeResult->YsonResultLength, result.YsonResult);
     FillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan);
     FillString(bridgeResult->Statistics, bridgeResult->StatisticsLength, result.Statistics);

+ 41 - 4
yt/yql/plugin/native/plugin.cpp

@@ -86,6 +86,11 @@ public:
                 ytConfig->SetExecuteUdfLocallyIfPossible(true);
             }
 
+            auto pattern = ytConfig->AddRemoteFilePatterns();
+            pattern->SetPattern("yt://([a-zA-Z0-9\\-_]+)/([^&@?]+)$");
+            pattern->SetCluster("$1");
+            pattern->SetPath("$2");
+
             ytConfig->SetYtLogLevel(NYql::EYtLogLevel::YL_DEBUG);
             ytConfig->SetMrJobBin(options.MRJobBinary);
             ytConfig->SetMrJobBinMd5(MD5::File(options.MRJobBinary));
@@ -109,7 +114,9 @@ public:
             DefaultCluster_ = options.DefaultCluster;
 
             NYql::TFileStorageConfig fileStorageConfig;
-            fileStorageConfig.SetMaxSizeMb(1 << 14);
+            fileStorageConfig.SetMaxSizeMb(options.MaxFilesSizeMb);
+            fileStorageConfig.SetMaxFiles(options.MaxFileCount);
+            fileStorageConfig.SetRetryCount(options.DownloadFileRetryCount);
             FileStorage_ = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)}));
 
             FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
@@ -181,7 +188,7 @@ public:
         YQL_LOG(INFO) << "YQL plugin initialized";
     }
 
-    TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings)
+    TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files)
     {
         auto credentials = MakeIntrusive<NYql::TCredentials>();
         if (YTTokenPath_) {
@@ -197,6 +204,9 @@ public:
         auto program = ProgramFactory_->Create("-memory-", queryText);
         program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
 
+        auto userDataTable = FilesToUserTable(files);
+        program->AddUserDataTable(userDataTable);
+
         auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> {
             if (!maybeStr) {
                 return std::nullopt;
@@ -267,10 +277,10 @@ public:
         };
     }
 
-    TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) noexcept override
+    TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept override
     {
         try {
-            return GuardedRun(queryId, impersonationUser, queryText, settings);
+            return GuardedRun(queryId, impersonationUser, queryText, settings, files);
         } catch (const std::exception& ex) {
             {
                 auto guard = WriterGuard(ProgressSpinLock);
@@ -314,6 +324,7 @@ private:
     TYsonString OperationAttributes_;
     YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ProgressSpinLock);
     THashMap<TQueryId, TActiveQuery> ActiveQueriesProgress_;
+    TVector<NYql::TDataProviderInitializer> DataProvidersInit_;
 
     TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
     {
@@ -326,6 +337,32 @@ private:
 
         return NodeToYsonString(resultAttributesMap);
     }
+
+    //! Builds the YQL user data table describing the files attached to a query.
+    /*!
+     *  Every file is stored under a file key made of the default file prefix
+     *  followed by the file name. The file content is assigned to the block data
+     *  and the plugin-level content type is translated to NYql::EUserDataType.
+     *
+     *  Throws yexception if a file carries a content type other than
+     *  RawInlineData or Url.
+     */
+    NYql::TUserDataTable FilesToUserTable(const std::vector<TQueryFile>& files)
+    {
+        NYql::TUserDataTable userData;
+
+        for (const auto& queryFile : files) {
+            NYql::TUserDataBlock& entry = userData[NYql::TUserDataKey::File(NYql::GetDefaultFilePrefix() + queryFile.Name)];
+            entry.Data = queryFile.Content;
+
+            if (queryFile.Type == EQueryFileContentType::RawInlineData) {
+                entry.Type = NYql::EUserDataType::RAW_INLINE_DATA;
+            } else if (queryFile.Type == EQueryFileContentType::Url) {
+                entry.Type = NYql::EUserDataType::URL;
+            } else {
+                ythrow yexception() << "Unexpected file content type";
+            }
+        }
+
+        return userData;
+    }
 };
 
 ////////////////////////////////////////////////////////////////////////////////

+ 14 - 1
yt/yql/plugin/plugin.h

@@ -1,5 +1,7 @@
 #pragma once
 
+#include <yt/yql/plugin/bridge/interface.h>
+
 #include <util/generic/hash.h>
 #include <util/generic/string.h>
 
@@ -33,6 +35,10 @@ public:
 
     TYsonString OperationAttributes;
 
+    int MaxFilesSizeMb;
+    int MaxFileCount;
+    int DownloadFileRetryCount;
+
     TString YTTokenPath;
 
     THolder<TLogBackend> LogBackend;
@@ -52,6 +58,13 @@ struct TQueryResult
     std::optional<TString> YsonError;
 };
 
+//! A file attached to a query, as seen by IYqlPlugin::Run.
+/*!
+ *  Name and Content are TStringBuf views: they do not own the bytes they
+ *  refer to, so whoever builds a TQueryFile must keep the underlying storage
+ *  alive for as long as the plugin uses the file.
+ *
+ *  Type has a default so that a default-constructed TQueryFile never exposes
+ *  an indeterminate enum value to code that switches on it.
+ */
+struct TQueryFile
+{
+    TStringBuf Name;
+    TStringBuf Content;
+    EQueryFileContentType Type = EQueryFileContentType::RawInlineData;
+};
+
 //! This interface encapsulates YT <-> YQL integration.
 //! There are two major implementation: one of them is based
 //! on YQL code and another wraps the pure C bridge interface, which
@@ -61,7 +74,7 @@ struct TQueryResult
 */
 struct IYqlPlugin
 {
-    virtual TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) noexcept = 0;
+    virtual TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept = 0;
     virtual TQueryResult GetProgress(TQueryId queryId) noexcept = 0;
 
     virtual ~IYqlPlugin() = default;

+ 2 - 0
yt/yt/client/api/public.h

@@ -164,6 +164,8 @@ DECLARE_REFCOUNTED_STRUCT(TPrerequisiteRevisionConfig)
 
 DECLARE_REFCOUNTED_STRUCT(TDetailedProfilingInfo)
 
+DECLARE_REFCOUNTED_STRUCT(TQueryFile)
+
 DECLARE_REFCOUNTED_STRUCT(TSchedulingOptions)
 
 DECLARE_REFCOUNTED_CLASS(TJobInputReader)

+ 2 - 1
yt/yt/client/api/query_tracker_client.cpp

@@ -14,12 +14,13 @@ using namespace NYTree;
 
 void Serialize(const TQuery& query, NYson::IYsonConsumer* consumer)
 {
-    static_assert(pfr::tuple_size<TQuery>::value == 13);
+    static_assert(pfr::tuple_size<TQuery>::value == 14);
     BuildYsonFluently(consumer)
         .BeginMap()
             .OptionalItem("id", query.Id)
             .OptionalItem("engine", query.Engine)
             .OptionalItem("query", query.Query)
+            .OptionalItem("files", query.Files)
             .OptionalItem("start_time", query.StartTime)
             .OptionalItem("finish_time", query.FinishTime)
             .OptionalItem("settings", query.Settings)

+ 27 - 0
yt/yt/client/api/query_tracker_client.h

@@ -15,6 +15,31 @@ struct TQueryTrackerOptions
     TString QueryTrackerStage = "production";
 };
 
+//! Content type of a file attached to a query; this is the type of TQueryFile::Type.
+DEFINE_ENUM(ContentType,
+    ((RawInlineData)    (0))
+    ((Url)              (1))
+);
+
+//! Client-side description of a file supplied together with a query.
+/*!
+ *  Registered YSON parameters:
+ *    - "name": bound to Name, registered with the NonEmpty() constraint;
+ *    - "content": bound to Content;
+ *    - "type": bound to Type.
+ */
+struct TQueryFile
+    : public NYTree::TYsonStruct
+{
+    TString Name;
+    TString Content;
+    ContentType Type;
+
+    REGISTER_YSON_STRUCT(TQueryFile);
+
+    static void Register(TRegistrar registrar)
+    {
+        registrar
+            .Parameter("name", &TThis::Name)
+            .NonEmpty();
+        registrar
+            .Parameter("content", &TThis::Content);
+        registrar
+            .Parameter("type", &TThis::Type);
+    }
+};
+
+DEFINE_REFCOUNTED_TYPE(TQueryFile)
+
 struct TStartQueryOptions
     : public TTimeoutOptions
     , public TQueryTrackerOptions
@@ -22,6 +47,7 @@ struct TStartQueryOptions
     NYTree::INodePtr Settings;
     bool Draft = false;
     NYTree::IMapNodePtr Annotations;
+    std::vector<TQueryFilePtr> Files;
 };
 
 struct TAbortQueryOptions
@@ -76,6 +102,7 @@ struct TQuery
     NQueryTrackerClient::TQueryId Id;
     std::optional<NQueryTrackerClient::EQueryEngine> Engine;
     std::optional<TString> Query;
+    NYson::TYsonString Files;
     std::optional<TInstant> StartTime;
     std::optional<TInstant> FinishTime;
     NYson::TYsonString Settings;

+ 2 - 0
yt/yt/client/driver/query_commands.cpp

@@ -23,6 +23,8 @@ TStartQueryCommand::TStartQueryCommand()
 {
     RegisterParameter("engine", Engine);
     RegisterParameter("query", Query);
+    RegisterParameter("files", Options.Files)
+        .Optional();
     RegisterParameter("stage", Options.QueryTrackerStage)
         .Optional();
     RegisterParameter("settings", Options.Settings)