Browse Source

Intermediate changes

robot-piglet 1 year ago
parent
commit
69ca112108

+ 19 - 3
yt/yql/plugin/bridge/interface.h

@@ -82,10 +82,24 @@ struct TBridgeQueryFile
     EQueryFileContentType Type;
 };
 
+struct TBridgeAbortResult
+{
+    const char* YsonError = nullptr;
+    ssize_t YsonErrorLength = 0;
+};
+
 using TFuncBridgeFreeQueryResult = void(TBridgeQueryResult* result);
-using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* files, int fileCount);
+using TFuncBridgeRun = TBridgeQueryResult*(
+    TBridgeYqlPlugin* plugin,
+    const char* queryId,
+    const char* impersonationUser,
+    const char* queryText,
+    const char* settings,
+    const TBridgeQueryFile* files,
+    int fileCount);
 using TFuncBridgeGetProgress = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId);
-using TFuncBridgeAbort = void(TBridgeYqlPlugin* plugin, const char* queryId);
+using TFuncBridgeAbort = TBridgeAbortResult*(TBridgeYqlPlugin* plugin, const char* queryId);
+using TFuncBridgeFreeAbortResult = void(TBridgeAbortResult* result);
 
 ////////////////////////////////////////////////////////////////////////////////
 
@@ -95,6 +109,8 @@ using TFuncBridgeAbort = void(TBridgeYqlPlugin* plugin, const char* queryId);
     XX(BridgeFreeQueryResult) \
     XX(BridgeRun) \
     XX(BridgeGetProgress) \
-    XX(BridgeGetAbiVersion)
+    XX(BridgeGetAbiVersion) \
+    XX(BridgeAbort) \
+    XX(BridgeFreeAbortResult)
 
 ////////////////////////////////////////////////////////////////////////////////

+ 45 - 10
yt/yql/plugin/bridge/plugin.cpp

@@ -58,6 +58,12 @@ public:
                 } else { \
                     function = reinterpret_cast<TFunc ## function*>(Library_.Sym(#function)); \
                 } \
+            } else if constexpr(#function == TStringBuf("BridgeFreeAbortResult")) { \
+                if (AbiVersion_ < EYqlPluginAbiVersion::AbortQuery) { \
+                    function = reinterpret_cast<TFunc ## function*>(FreeAbortResultStub); \
+                } else { \
+                    function = reinterpret_cast<TFunc ## function*>(Library_.Sym(#function)); \
+                } \
             } else { \
                 function = reinterpret_cast<TFunc ## function*>(Library_.Sym(#function)); \
             } \
@@ -69,8 +75,6 @@ public:
         GetYqlPluginAbiVersion();
 
         FOR_EACH_BRIDGE_INTERFACE_FUNCTION(XX);
-        // COMPAT(gritukan): Remove after commit in YDB repository.
-        XX(BridgeAbort)
         #undef XX
     }
 
@@ -81,15 +85,18 @@ protected:
 
     #define XX(function) TFunc ## function* function;
     FOR_EACH_BRIDGE_INTERFACE_FUNCTION(XX)
-
-    // COMPAT(gritukan): Remove after commit in YDB repository.
-    XX(BridgeAbort)
     #undef XX
 
     // COMPAT(gritukan): AbortQuery
-    static void AbortQueryStub(TBridgeYqlPlugin* /*plugin*/, const char* /*queryId*/)
+    static TBridgeAbortResult* AbortQueryStub(TBridgeYqlPlugin* /*plugin*/, const char* /*queryId*/)
     {
         // Just do nothing. It is not worse than in used to be before.
+        return nullptr;
+    }
+
+    static void FreeAbortResultStub(TBridgeAbortResult* /*result*/)
+    {
+        YT_ABORT();
     }
 
     void GetYqlPluginAbiVersion()
@@ -138,7 +145,12 @@ public:
         BridgePlugin_ = BridgeCreateYqlPlugin(&bridgeOptions);
     }
 
-    TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, NYson::TYsonString settings, std::vector<TQueryFile> files) noexcept override
+    TQueryResult Run(
+        TQueryId queryId,
+        TString impersonationUser,
+        TString queryText,
+        NYson::TYsonString settings,
+        std::vector<TQueryFile> files) noexcept override
     {
         auto settingsString = settings ? settings.ToString() : "{}";
         auto queryIdStr = ToString(queryId);
@@ -155,8 +167,15 @@ public:
             });
         }
 
-        auto* bridgeQueryResult = BridgeRun(BridgePlugin_, queryIdStr.data(), impersonationUser.data(), queryText.data(), settingsString.data(), filesData.data(), filesData.size());
-        TQueryResult queryResult = {
+        auto* bridgeQueryResult = BridgeRun(
+            BridgePlugin_,
+            queryIdStr.data(),
+            impersonationUser.data(),
+            queryText.data(),
+            settingsString.data(),
+            filesData.data(),
+            filesData.size());
+        TQueryResult queryResult{
             .YsonResult = ToString(bridgeQueryResult->YsonResult, bridgeQueryResult->YsonResultLength),
             .Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength),
             .Statistics = ToString(bridgeQueryResult->Statistics, bridgeQueryResult->StatisticsLength),
@@ -172,7 +191,7 @@ public:
     {
         auto queryIdStr = ToString(queryId);
         auto* bridgeQueryResult = BridgeGetProgress(BridgePlugin_, queryIdStr.data());
-        TQueryResult queryResult = {
+        TQueryResult queryResult{
             .Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength),
             .Progress = ToString(bridgeQueryResult->Progress, bridgeQueryResult->ProgressLength),
         };
@@ -180,6 +199,22 @@ public:
         return queryResult;
     }
 
+    TAbortResult Abort(TQueryId queryId) noexcept override
+    {
+        auto queryIdStr = ToString(queryId);
+        auto* bridgeAbortResult = BridgeAbort(BridgePlugin_, queryIdStr.data());
+        // COMPAT(gritukan): AbortQuery
+        if (!bridgeAbortResult) {
+            return {};
+        }
+
+        TAbortResult abortResult{
+            .YsonError = ToString(bridgeAbortResult->YsonError, bridgeAbortResult->YsonErrorLength),
+        };
+        BridgeFreeAbortResult(bridgeAbortResult);
+        return abortResult;
+    }
+
     ~TYqlPlugin() override
     {
         BridgeFreeYqlPlugin(BridgePlugin_);

+ 2 - 0
yt/yql/plugin/dynamic/dylib.exports

@@ -5,6 +5,8 @@ BridgeFreeQueryResult
 BridgeRun
 BridgeGetProgress
 BridgeGetAbiVersion
+BridgeAbort
+BridgeFreeAbortResult
 
 # YQL <-> YQL UDFs interface.
 UdfAllocateWithSize

+ 26 - 2
yt/yql/plugin/dynamic/impl.cpp

@@ -12,7 +12,7 @@ extern "C" {
 
 ssize_t BridgeGetAbiVersion()
 {
-    return 0;
+    return 1; // EYqlPluginAbiVersion::AbortQuery
 }
 
 TBridgeYqlPlugin* BridgeCreateYqlPlugin(const TBridgeYqlPluginOptions* bridgeOptions)
@@ -68,7 +68,14 @@ void FillString(const char*& str, ssize_t& strLength, const std::optional<TStrin
     strLength = original->size();
 }
 
-TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* bridgeFiles, int bridgeFileCount)
+TBridgeQueryResult* BridgeRun(
+    TBridgeYqlPlugin* plugin,
+    const char* queryId,
+    const char* impersonationUser,
+    const char* queryText,
+    const char* settings,
+    const TBridgeQueryFile* bridgeFiles,
+    int bridgeFileCount)
 {
     static const auto EmptyMap = TYsonString(TString("{}"));
 
@@ -113,6 +120,23 @@ TBridgeQueryResult* BridgeGetProgress(TBridgeYqlPlugin* plugin, const char* quer
     return bridgeResult;
 }
 
+TBridgeAbortResult* BridgeAbort(TBridgeYqlPlugin* plugin, const char* queryId)
+{
+    auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
+    auto* bridgeResult = new TBridgeAbortResult;
+
+    auto result = nativePlugin->GetProgress(NYT::TGuid::FromString(queryId));
+    FillString(bridgeResult->YsonError, bridgeResult->YsonErrorLength, result.YsonError);
+
+    return bridgeResult;
+}
+
+void BridgeFreeAbortResult(TBridgeAbortResult* result)
+{
+    delete result->YsonError;
+    delete result;
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 
 // Validate that the all functions from the bridge interface are implemented with proper signatures.

+ 2 - 2
yt/yql/plugin/native/error_helpers.h

@@ -1,9 +1,9 @@
 #pragma once
 
-#include <util/generic/string.h>
-
 #include <ydb/library/yql/public/issue/yql_issue.h>
 
+#include <util/generic/string.h>
+
 namespace NYT::NYqlPlugin {
 
 ////////////////////////////////////////////////////////////////////////////////

+ 59 - 3
yt/yql/plugin/native/plugin.cpp

@@ -8,7 +8,7 @@
 #include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
 #include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
 
-#include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
 #include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
 #include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
 
@@ -40,11 +40,15 @@
 #include <library/cpp/yson/writer.h>
 
 #include <library/cpp/digest/md5/md5.h>
+
 #include <library/cpp/resource/resource.h>
 
 #include <util/folder/path.h>
+
 #include <util/stream/file.h>
+
 #include <util/string/builder.h>
+
 #include <util/system/fs.h>
 
 namespace NYT::NYqlPlugin {
@@ -72,6 +76,9 @@ struct TQueryPlan
 
 struct TActiveQuery
 {
+    NYql::TProgramPtr Program;
+    bool Compiled = false;
+
     TProgressMerger ProgressMerger;
     std::optional<TString> Plan;
 };
@@ -242,9 +249,19 @@ public:
         YQL_LOG(INFO) << "YQL plugin initialized";
     }
 
-    TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files)
+    TQueryResult GuardedRun(
+        TQueryId queryId,
+        TString impersonationUser,
+        TString queryText,
+        TYsonString settings,
+        std::vector<TQueryFile> files)
     {
         auto program = ProgramFactory_->Create("-memory-", queryText);
+        {
+            auto guard = WriterGuard(ProgressSpinLock);
+            ActiveQueriesProgress_[queryId].Program = program;
+        }
+
         program->AddCredentials({{"impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser)}});
         program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
 
@@ -291,6 +308,11 @@ public:
             };
         }
 
+        {
+            auto guard = WriterGuard(ProgressSpinLock);
+            ActiveQueriesProgress_[queryId].Compiled = true;
+        }
+
         NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error;
         status = program->RunWithConfig(impersonationUser, pipelineConfigurator);
 
@@ -327,7 +349,12 @@ public:
         };
     }
 
-    TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept override
+    TQueryResult Run(
+        TQueryId queryId,
+        TString impersonationUser,
+        TString queryText,
+        TYsonString settings,
+        std::vector<TQueryFile> files) noexcept override
     {
         try {
             return GuardedRun(queryId, impersonationUser, queryText, settings, files);
@@ -360,6 +387,35 @@ public:
         }
     }
 
+    TAbortResult Abort(TQueryId queryId) noexcept override
+    {
+        NYql::TProgramPtr program;
+        {
+            auto guard = WriterGuard(ProgressSpinLock);
+            if (!ActiveQueriesProgress_.contains(queryId)) {
+                return TAbortResult{
+                    .YsonError = MessageToYtErrorYson(Format("Query %v is not found", queryId)),
+                };
+            }
+            if (!ActiveQueriesProgress_[queryId].Compiled) {
+                return TAbortResult{
+                    .YsonError = MessageToYtErrorYson(Format("Query %v is not compiled", queryId)),
+                };
+            }
+            program = ActiveQueriesProgress_[queryId].Program;
+        }
+
+        try {
+            program->Abort().GetValueSync();
+        } catch (...) {
+            return TAbortResult{
+                .YsonError = MessageToYtErrorYson(Format("Failed to abort query %v: %v", queryId, CurrentExceptionMessage())),
+            };
+        }
+
+        return {};
+    }
+
 private:
     NYql::TFileStoragePtr FileStorage_;
     NYql::TExprContext ExprContext_;

+ 0 - 1
yt/yql/plugin/native/plugin.h

@@ -2,7 +2,6 @@
 
 #include <yt/yql/plugin/plugin.h>
 
-
 namespace NYT::NYqlPlugin {
 
 ////////////////////////////////////////////////////////////////////////////////

+ 21 - 10
yt/yql/plugin/plugin.h

@@ -2,21 +2,19 @@
 
 #include <yt/yql/plugin/bridge/interface.h>
 
-#include <util/generic/hash.h>
-#include <util/generic/string.h>
-
 #include <library/cpp/logger/log.h>
 
 #include <library/cpp/yt/string/guid.h>
 
 #include <library/cpp/yt/yson_string/string.h>
 
+#include <util/generic/hash.h>
+#include <util/generic/string.h>
+
 #include <optional>
 
 namespace NYT::NYqlPlugin {
 
-using namespace NYson;
-
 ////////////////////////////////////////////////////////////////////////////////
 
 using TQueryId = TGuid;
@@ -26,10 +24,10 @@ using TQueryId = TGuid;
 class TYqlPluginOptions
 {
 public:
-    TYsonString SingletonsConfig;
-    TYsonString GatewayConfig;
-    TYsonString FileStorageConfig;
-    TYsonString OperationAttributes;
+    NYson::TYsonString SingletonsConfig;
+    NYson::TYsonString GatewayConfig;
+    NYson::TYsonString FileStorageConfig;
+    NYson::TYsonString OperationAttributes;
 
     TString YTTokenPath;
 
@@ -57,6 +55,12 @@ struct TQueryFile
     EQueryFileContentType Type;
 };
 
+struct TAbortResult
+{
+    //! YSON representation of a YT error.
+    std::optional<TString> YsonError;
+};
+
 //! This interface encapsulates YT <-> YQL integration.
 //! There are two major implementation: one of them is based
 //! on YQL code and another wraps the pure C bridge interface, which
@@ -66,9 +70,16 @@ struct TQueryFile
 */
 struct IYqlPlugin
 {
-    virtual TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept = 0;
+    virtual TQueryResult Run(
+        TQueryId queryId,
+        TString impersonationUser,
+        TString queryText,
+        NYson::TYsonString settings,
+        std::vector<TQueryFile> files) noexcept = 0;
     virtual TQueryResult GetProgress(TQueryId queryId) noexcept = 0;
 
+    virtual TAbortResult Abort(TQueryId queryId) noexcept = 0;
+
     virtual ~IYqlPlugin() = default;
 };