Browse Source

YQL-16196 Add FileOption pragma and bypass_artifact_cache option for MR job spec

avevad 1 year ago
parent
commit
8d05d68959

+ 2 - 0
ydb/library/yql/core/yql_user_data.h

@@ -39,6 +39,8 @@ struct TUserDataBlock {
     TFileLinkPtr FrozenFile;
     // Prefix added to all UDF module names
     TString CustomUdfPrefix = {};
+
+    THashMap<TString, TString> Options = {};
 };
 
 class TUserDataKey {

+ 23 - 0
ydb/library/yql/providers/config/yql_config_provider.cpp

@@ -477,6 +477,10 @@ namespace {
                 if (!AddFileByUrl(pos, args, ctx)) {
                     return false;
                 }
+            } else if (name == "SetFileOption") {
+                if (!SetFileOption(pos, args, ctx)) {
+                    return false;
+                }
             } else if (name == "AddFolderByUrl") {
                 if (!AddFolderByUrl(pos, args, ctx)) {
                     return false;
@@ -979,6 +983,25 @@ namespace {
             return AddFileByUrlImpl(args[0], args[1], token, pos, ctx);
         }
 
+        // Stores option `key` = `value` on the user data block registered under `alias`.
+        // When no block is found for the alias, an error is added to `ctx` at `pos`
+        // and false is returned; otherwise returns true.
+        bool SetFileOptionImpl(const TStringBuf alias, const TString& key, const TString& value, const TPosition& pos, TExprContext& ctx) {
+            const auto block = Types.UserDataStorage->FindUserDataBlock(TUserDataStorage::ComposeUserDataKey(alias));
+            if (!block) {
+                ctx.AddError(TIssue(pos, TStringBuilder() << "No such file '" << alias << "'"));
+                return false;
+            }
+
+            // operator[] inserts the key or overwrites a previously set value.
+            block->Options[key] = value;
+            return true;
+        }
+
+        // Entry point for the "SetFileOption" config command.
+        // Requires exactly three arguments: file alias, option key, option value.
+        // Any other argument count adds an error to `ctx` and returns false.
+        bool SetFileOption(const TPosition& pos, const TVector<TStringBuf>& args, TExprContext& ctx) {
+            if (args.size() == 3) {
+                const TString optionKey = ToString(args[1]);
+                const TString optionValue = ToString(args[2]);
+                return SetFileOptionImpl(args[0], optionKey, optionValue, pos, ctx);
+            }
+
+            ctx.AddError(TIssue(pos, TStringBuilder() << "Expected 3 arguments, but got " << args.size()));
+            return false;
+        }
+
         bool SetPackageVersion(const TPosition& pos, const TVector<TStringBuf>& args, TExprContext& ctx) {
             if (args.size() != 2) {
                 ctx.AddError(TIssue(pos, TStringBuilder() << "Expected 2 arguments, but got " << args.size()));

+ 9 - 0
ydb/library/yql/providers/yt/gateway/lib/user_files.cpp

@@ -27,6 +27,15 @@ void TUserFiles::AddFile(const TUserDataKey& key, const TUserDataBlock& block) {
     TFileInfo userFile;
     userFile.IsUdf = block.Usage.Test(EUserDataBlockUsage::Udf);
 
+    if (block.Options.contains("bypass_artifact_cache")) {
+        auto option = block.Options.at(TString("bypass_artifact_cache"));
+        try {
+            userFile.BypassArtifactCache = FromString<bool>(option);
+        } catch (const TFromStringException &) {
+            YQL_LOG_CTX_THROW yexception() << "FileOption: invalid value for option bypass_artifact_cache: " << option;
+        }
+    }
+
     // we can optimize file copy if file resides on the same cluster
     // and provide only link
     TString cluster;

+ 1 - 0
ydb/library/yql/providers/yt/gateway/lib/user_files.h

@@ -26,6 +26,7 @@ public:
         ui64 InMemorySize = 0;
         TString RemotePath;
         double RemoteMemoryFactor = 0.;
+        bool BypassArtifactCache = false;
     };
 
 public:

+ 14 - 12
ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp

@@ -55,8 +55,8 @@ TGatewayTransformer::TGatewayTransformer(const TExecContextBase& execCtx, TYtSet
     , JobUdfs_(std::make_shared<THashMap<TString, TString>>())
     , UniqFiles_(std::make_shared<THashMap<TString, TString>>())
     , RemoteFiles_(std::make_shared<TVector<NYT::TRichYPath>>())
-    , LocalFiles_(std::make_shared<TVector<std::pair<TString, TString>>>())
-    , DeferredUdfFiles_(std::make_shared<TVector<std::pair<TString, TString>>>())
+    , LocalFiles_(std::make_shared<TVector<std::pair<TString, TLocalFileInfo>>>())
+    , DeferredUdfFiles_(std::make_shared<TVector<std::pair<TString, TLocalFileInfo>>>())
 
 {
     if (optLLVM != "OFF") {
@@ -289,7 +289,7 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName name) {
                             YQL_CLOG(DEBUG, ProviderYt) << "Passing table " << richYPathDesc << " as file "
                                 << fileName.Quote() << " (size=" << TFileStat(outPath).Size << ')';
 
-                            LocalFiles_->emplace_back(outPath, TString());
+                            LocalFiles_->emplace_back(outPath, TLocalFileInfo{TString(), false});
                         }
                     }
                 }
@@ -424,19 +424,21 @@ void TGatewayTransformer::ApplyUserJobSpec(NYT::TUserJobSpec& spec, bool localRu
     bool fakeChecksum = (GetEnv("YQL_LOCAL") == "1");  // YQL-15353
     for (auto& file: *LocalFiles_) {
         TAddLocalFileOptions opts;
-        if (!fakeChecksum && file.second) {
-            opts.MD5CheckSum(file.second);
+        if (!fakeChecksum && file.second.Hash) {
+            opts.MD5CheckSum(file.second.Hash);
         }
+        opts.BypassArtifactCache(file.second.BypassArtifactCache);
         spec.AddLocalFile(file.first, opts);
     }
     const TString binTmpFolder = Settings_->BinaryTmpFolder.Get().GetOrElse(TString());
     if (localRun || !binTmpFolder) {
         for (auto& file: *DeferredUdfFiles_) {
             TAddLocalFileOptions opts;
-            if (!fakeChecksum && file.second) {
-                opts.MD5CheckSum(file.second);
+            if (!fakeChecksum && file.second.Hash) {
+                opts.MD5CheckSum(file.second.Hash);
             }
             YQL_ENSURE(TFileStat(file.first).Size != 0);
+            opts.BypassArtifactCache(file.second.BypassArtifactCache);
             spec.AddLocalFile(file.first, opts);
         }
     } else {
@@ -444,8 +446,8 @@ void TGatewayTransformer::ApplyUserJobSpec(NYT::TUserJobSpec& spec, bool localRu
         auto entry = GetEntry();
         for (auto& file: *DeferredUdfFiles_) {
             YQL_ENSURE(TFileStat(file.first).Size != 0);
-            auto snapshot = entry->GetBinarySnapshot(binTmpFolder, file.second, file.first, binExpiration);
-            spec.AddFile(TRichYPath(snapshot.first).TransactionId(snapshot.second).FileName(TFsPath(file.first).GetName()).Executable(true));
+            auto snapshot = entry->GetBinarySnapshot(binTmpFolder, file.second.Hash, file.first, binExpiration);
+            spec.AddFile(TRichYPath(snapshot.first).TransactionId(snapshot.second).FileName(TFsPath(file.first).GetName()).Executable(true).BypassArtifactCache(file.second.BypassArtifactCache));
         }
     }
     RemoteFiles_->clear();
@@ -482,9 +484,9 @@ void TGatewayTransformer::AddFile(TString alias,
             filePath = fileInfo.Path->GetPath();
             *UsedMem_ += fileInfo.InMemorySize;
             if (fileInfo.IsUdf) {
-                DeferredUdfFiles_->emplace_back(filePath, fileInfo.Path->GetMd5());
+                DeferredUdfFiles_->emplace_back(filePath, TLocalFileInfo{fileInfo.Path->GetMd5(), fileInfo.BypassArtifactCache});
             } else {
-                LocalFiles_->emplace_back(filePath, fileInfo.Path->GetMd5());
+                LocalFiles_->emplace_back(filePath, TLocalFileInfo{fileInfo.Path->GetMd5(), fileInfo.BypassArtifactCache});
             }
         } else {
             filePath = insertRes.first->second;
@@ -507,7 +509,7 @@ void TGatewayTransformer::AddFile(TString alias,
         }
         auto insertRes = UniqFiles_->insert({alias, remoteFile.Path_});
         if (insertRes.second) {
-            RemoteFiles_->push_back(remoteFile.Executable(true));
+            RemoteFiles_->push_back(remoteFile.Executable(true).BypassArtifactCache(fileInfo.BypassArtifactCache));
             if (fileInfo.RemoteMemoryFactor > 0.) {
                 *UsedMem_ += fileInfo.RemoteMemoryFactor * GetUncompressedFileSize(GetTx(), remoteFile.Path_).GetOrElse(ui64(1) << 10);
             }

+ 7 - 2
ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.h

@@ -48,6 +48,11 @@ public:
         All
     };
 
+    // Per-file attributes stored alongside the path in LocalFiles_ / DeferredUdfFiles_.
+    struct TLocalFileInfo {
+        // MD5 hash of the file; may be empty, in which case no checksum is
+        // passed when the file is added to the job spec.
+        TString Hash;
+        // Forwarded to the BypassArtifactCache option of the file in the job spec.
+        // Initialized explicitly so a default-constructed value is never indeterminate.
+        bool BypassArtifactCache = false;
+    };
+
     NKikimr::NMiniKQL::TCallableVisitFunc operator()(NKikimr::NMiniKQL::TInternName name);
 
     void SetTwoPhaseTransform() {
@@ -100,8 +105,8 @@ private:
     std::shared_ptr<THashMap<TString, TString>> JobUdfs_;
     std::shared_ptr<THashMap<TString, TString>> UniqFiles_;
     std::shared_ptr<TVector<NYT::TRichYPath>> RemoteFiles_;
-    std::shared_ptr<TVector<std::pair<TString, TString>>> LocalFiles_;
-    std::shared_ptr<TVector<std::pair<TString, TString>>> DeferredUdfFiles_;
+    std::shared_ptr<TVector<std::pair<TString, TLocalFileInfo>>> LocalFiles_;
+    std::shared_ptr<TVector<std::pair<TString, TLocalFileInfo>>> DeferredUdfFiles_;
 };
 
 } // NNative

+ 1 - 1
ydb/library/yql/sql/v1/query.cpp

@@ -2797,7 +2797,7 @@ public:
                 Node = L(Node, Values[i].Build());
             }
         }
-        else if (Name == TStringBuf("AddFileByUrl") || Name == TStringBuf("AddFolderByUrl") || Name == TStringBuf("ImportUdfs") || Name == TStringBuf("SetPackageVersion")) {
+        else if (Name == TStringBuf("AddFileByUrl") || Name == TStringBuf("SetFileOption") || Name == TStringBuf("AddFolderByUrl") || Name == TStringBuf("ImportUdfs") || Name == TStringBuf("SetPackageVersion")) {
             Node = L(Node, BuildQuotedAtom(Pos, Name));
             for (ui32 i = 0; i < Values.size(); ++i) {
                 Node = L(Node, Values[i].Build());

+ 10 - 0
ydb/library/yql/sql/v1/sql_query.cpp

@@ -1482,6 +1482,16 @@ TNodePtr TSqlQuery::PragmaStatement(const TRule_pragma_stmt& stmt, bool& success
             Ctx.IncrementMonCounter("sql_pragma", "file");
             success = true;
             return BuildPragma(Ctx.Pos(), TString(ConfigProviderName), "AddFileByUrl", values, false);
+        } else if (normalizedPragma == "fileoption") {
+            if (values.size() < 3U) {
+                Error() << "Expected file alias, option key and value";
+                Ctx.IncrementMonCounter("sql_errors", "BadPragmaValue");
+                return {};
+            }
+
+            Ctx.IncrementMonCounter("sql_pragma", "FileOption");
+            success = true;
+            return BuildPragma(Ctx.Pos(), TString(ConfigProviderName), "SetFileOption", values, false);
         } else if (normalizedPragma == "folder") {
             if (values.size() < 2U || values.size() > 3U || pragmaValueDefault) {
                 Error() << "Expected folder alias, url and optional token name as pragma values";