Browse Source

refactor alter table: use scheme executer

gvit 1 year ago
parent
commit
3be69bf3de

+ 4 - 3
ydb/core/kqp/executer_actor/kqp_executer.h

@@ -94,9 +94,10 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
     const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
     TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
 
-IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database,
-    TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc,
-    bool temporary, TString SessionId);
+IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target,
+    const TMaybe<TString>& requestType, const TString& database,
+    TIntrusiveConstPtr<NACLib::TUserToken> userToken,
+    bool temporary, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx);
 
 std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(
     IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, const TIntrusivePtr<TUserRequestContext>& userRequestContext);

+ 2 - 1
ydb/core/kqp/executer_actor/kqp_executer_impl.cpp

@@ -35,6 +35,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPt
 
 void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch&& rows) {
     YQL_ENSURE(idx < TxResults.size());
+    YQL_ENSURE(AllocState);
     ResultRowsCount += rows.RowCount();
     ResultRowsBytes += rows.Size();
     auto guard = AllocState->TypeEnv.BindAllocator();
@@ -48,7 +49,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch
 }
 
 TEvKqpExecuter::TEvTxResponse::~TEvTxResponse() {
-    if (!TxResults.empty()) {
+    if (!TxResults.empty() && Y_LIKELY(AllocState)) {
         with_lock(AllocState->Alloc) {
             TxResults.crop(0);
         }

+ 24 - 13
ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

@@ -1,4 +1,5 @@
 #include "kqp_executer.h"
+#include "kqp_executer_impl.h"
 
 #include <ydb/core/kqp/gateway/actors/scheme.h>
 #include <ydb/core/kqp/gateway/local_rpc/helper.h>
@@ -8,15 +9,10 @@
 
 namespace NKikimr::NKqp {
 
-#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext,   NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << ". " << stream)
-#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext,  NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << ". " << stream)
-#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext,   NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << ". " << stream)
-
 using namespace NThreading;
 
 namespace {
 
-
 static bool CheckAlterAccess(const NACLib::TUserToken& userToken, const NSchemeCache::TSchemeCacheNavigate* navigate) {
     bool isDatabase = true; // first entry is always database
 
@@ -53,20 +49,22 @@ public:
         return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR;
     }
 
-    TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database,
-        TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc,
-        bool temporary, TString sessionId)
+    TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TMaybe<TString>& requestType, 
+        const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken,
+        bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx)
         : PhyTx(phyTx)
         , Target(target)
         , Database(database)
         , UserToken(userToken)
         , Temporary(temporary)
         , SessionId(sessionId)
+        , RequestContext(std::move(ctx))
+        , RequestType(requestType)
     {
         YQL_ENSURE(PhyTx);
         YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME);
 
-        ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(txAlloc);
+        ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(nullptr);
     }
 
     void StartBuildOperation() {
@@ -85,6 +83,10 @@ public:
             ev->Record.SetUserToken(UserToken->GetSerializedToken());
         }
 
+        if (RequestType) {
+            ev->Record.SetRequestType(*RequestType);
+        }
+
         const auto& schemeOp = PhyTx->GetSchemeOperation();
         switch (schemeOp.GetOperationCase()) {
             case NKqpProto::TKqpSchemeOperation::kCreateTable: {
@@ -123,7 +125,7 @@ public:
             case NKqpProto::TKqpSchemeOperation::kAlterTable: {
                 auto modifyScheme = schemeOp.GetAlterTable();
                 ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
-                return;
+                break;
             }
 
             case NKqpProto::TKqpSchemeOperation::kBuildOperation: {
@@ -269,6 +271,10 @@ public:
         IActor::PassAway();
     }
 
+    const TIntrusivePtr<TUserRequestContext>& GetUserRequestContext() const {
+        return RequestContext;
+    }
+
     void Handle(NSchemeShard::TEvIndexBuilder::TEvCreateResponse::TPtr& ev) {
         const auto& response = ev->Get()->Record;
         const auto status = response.GetStatus();
@@ -376,6 +382,7 @@ private:
     void UnexpectedEvent(const TString& state, ui32 eventType) {
         LOG_C("TKqpSchemeExecuter, unexpected event: " << eventType
             << ", at state:" << state << ", selfID: " << SelfId());
+
         InternalError(TStringBuilder() << "Unexpected event at TKqpSchemeExecuter, state: " << state
             << ", event: " << eventType);
     }
@@ -419,14 +426,18 @@ private:
     ui64 TxId = 0;
     TActorId SchemePipeActorId_;
     ui64 SchemeShardTabletId = 0;
+    TIntrusivePtr<TUserRequestContext> RequestContext;
+    const TMaybe<TString> RequestType;
 };
 
 } // namespace
 
-IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database,
-    TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc, bool temporary, TString sessionId)
+IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target,
+    const TMaybe<TString>& requestType, const TString& database,
+    TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool temporary, TString sessionId,
+    TIntrusivePtr<TUserRequestContext> ctx)
 {
-    return new TKqpSchemeExecuter(phyTx, target, database, userToken, txAlloc, temporary, sessionId);
+    return new TKqpSchemeExecuter(phyTx, target, requestType, database, userToken, temporary, sessionId, std::move(ctx));
 }
 
 } // namespace NKikimr::NKqp

+ 21 - 0
ydb/core/kqp/gateway/actors/scheme.h

@@ -135,6 +135,27 @@ public:
                         return;
                     }
 
+                    case NKikimrScheme::EStatus::StatusSchemeError: {
+                        Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_SCHEME_ERROR,
+                            response.GetSchemeShardReason(), {}));
+                        this->Die(ctx);
+                        return;
+                    }
+
+                    case NKikimrScheme::EStatus::StatusPreconditionFailed: {
+                        Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
+                            response.GetSchemeShardReason(), {}));
+                        this->Die(ctx);
+                        return;
+                    }
+
+                    case NKikimrScheme::EStatus::StatusInvalidParameter: {
+                        Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_BAD_REQUEST,
+                            response.GetSchemeShardReason(), {}));
+                        this->Die(ctx);
+                        return;
+                    }
+
                     default:
                         break;
                 }

+ 59 - 15
ydb/core/kqp/gateway/kqp_ic_gateway.cpp

@@ -467,6 +467,55 @@ private:
     TVector<NYql::NDqProto::TDqExecutionStats> Executions;
 };
 
+class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped<TKqpSchemeExecuterRequestHandler> {
+public:
+    using TResult = IKqpGateway::TGenericResult;
+
+    TKqpSchemeExecuterRequestHandler(TKqpPhyTxHolder::TConstPtr phyTx, const TMaybe<TString>& requestType, const TString& database,
+        TIntrusiveConstPtr<NACLib::TUserToken> userToken, TPromise<TResult> promise)
+        : PhyTx(std::move(phyTx))
+        , Database(database)
+        , UserToken(std::move(userToken))
+        , Promise(promise)
+        , RequestType(requestType)
+    {}
+
+    void Bootstrap() {
+        auto ctx = MakeIntrusive<TUserRequestContext>();
+        IActor* actor = CreateKqpSchemeExecuter(PhyTx, SelfId(), RequestType, Database, UserToken, false /* temporary */, TString() /* sessionId */, ctx);
+        Register(actor);
+        Become(&TThis::WaitState);
+    }
+
+    STATEFN(WaitState) {
+        switch(ev->GetTypeRewrite()) {
+            hFunc(TEvKqpExecuter::TEvTxResponse, Handle);
+        }
+    }
+
+    void Handle(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
+        auto* response = ev->Get()->Record.MutableResponse();
+
+        TResult result;
+        if (response->GetStatus() == Ydb::StatusIds::SUCCESS) {
+            result.SetSuccess();
+        } else {
+            for (auto& issue : response->GetIssues()) {
+                result.AddIssue(NYql::IssueFromMessage(issue));
+            }
+        }
+
+        Promise.SetValue(result);
+        this->PassAway();
+    }
+
+private:
+    TKqpPhyTxHolder::TConstPtr PhyTx;
+    const TString Database;
+    TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+    TPromise<TResult> Promise;
+    const TMaybe<TString> RequestType;
+};
 
 class TKqpExecLiteralRequestHandler: public TActorBootstrapped<TKqpExecLiteralRequestHandler> {
 public:
@@ -798,23 +847,11 @@ public:
         }
     }
 
-    TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
-        const TMaybe<TString>& requestType, ui64 flags) override
+    TFuture<TGenericResult> AlterTable(const TString&, Ydb::Table::AlterTableRequest&&, const TMaybe<TString>&, ui64) override
     {
         try {
-            YQL_ENSURE(!flags); //Supported only for prepared mode
-            if (!CheckCluster(cluster)) {
-                return InvalidCluster<TGenericResult>(cluster);
-            }
-
-            // FIXME: should be defined in grpc_services/rpc_calls.h, but cause cyclic dependency
-            using namespace NGRpcService;
-            using TEvAlterTableRequest = TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest,
-                Ydb::Table::AlterTableResponse>;
-
-            return SendLocalRpcRequestNoResult<TEvAlterTableRequest>(std::move(req), Database, GetTokenCompat(), requestType);
-        }
-        catch (yexception& e) {
+            YQL_ENSURE(false, "gateway doesn't implement alter");
+        } catch (yexception& e) {
             return MakeFuture(ResultFromException<TGenericResult>(e));
         }
     }
@@ -2075,6 +2112,13 @@ private:
         return promise.GetFuture();
     }
 
+    TFuture<TGenericResult> SendSchemeExecuterRequest(const TString&, const TMaybe<TString>& requestType, const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder>& phyTx) override {
+        auto promise = NewPromise<TGenericResult>();
+        IActor* requestHandler = new TKqpSchemeExecuterRequestHandler(phyTx, requestType, Database, UserToken, promise);
+        RegisterActor(requestHandler);
+        return promise.GetFuture();
+    }
+
     template<typename TRpc>
     TFuture<TGenericResult> SendLocalRpcRequestNoResult(typename TRpc::TRequest&& proto, const TString& databse, const TString& token, const TMaybe<TString>& requestType = {}) {
         return NRpcService::DoLocalRpc<TRpc>(std::move(proto), databse, token, requestType, ActorSystem).Apply([](NThreading::TFuture<typename TRpc::TResponse> future) {

+ 72 - 11
ydb/core/kqp/host/kqp_gateway_proxy.cpp

@@ -544,18 +544,10 @@ public:
         return tablePromise.GetFuture();
     }
 
-    TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
-        const TMaybe<TString>& requestType, ui64 flags) override
+    TFuture<TGenericResult> PrepareAlterTable(const TString&, Ydb::Table::AlterTableRequest&& req,
+        const TMaybe<TString>&, ui64 flags)
     {
-        CHECK_PREPARED_DDL(AlterTable);
-
-        if (!IsPrepare()) {
-            return Gateway->AlterTable(cluster, std::move(req), requestType, flags);
-        }
-        auto &phyQuery =
-            *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
-        auto &phyTx = *phyQuery.AddTransactions();
-        phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+        YQL_ENSURE(SessionCtx->Query().PreparingQuery);
         auto promise = NewPromise<TGenericResult>();
         const auto ops = GetAlterOperationKinds(&req);
         if (ops.size() != 1) {
@@ -571,6 +563,10 @@ public:
         const auto opType = *ops.begin();
         auto tablePromise = NewPromise<TGenericResult>();
         if (opType == EAlterOperationKind::AddIndex) {
+            auto &phyQuery =
+                *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+            auto &phyTx = *phyQuery.AddTransactions();
+            phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
             auto buildOp = phyTx.MutableSchemeOperation()->MutableBuildOperation();
             Ydb::StatusIds::StatusCode code;
             TString error;
@@ -602,6 +598,8 @@ public:
                 auto &phyQuery =
                     *sessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
                 auto &phyTx = *phyQuery.AddTransactions();
+                phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+
                 auto alter = phyTx.MutableSchemeOperation()->MutableAlterTable();
                 const TPathId invalidPathId;
                 Ydb::StatusIds::StatusCode code;
@@ -621,6 +619,69 @@ public:
         return tablePromise.GetFuture();
     }
 
+    TFuture<TGenericResult> SendSchemeExecuterRequest(const TString& cluster, const TMaybe<TString>& requestType,
+        const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder> &phyTx) override
+    {
+        return Gateway->SendSchemeExecuterRequest(cluster, requestType, phyTx);
+    }
+
+    TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
+        const TMaybe<TString>& requestType, ui64 flags) override
+    {
+        CHECK_PREPARED_DDL(AlterTable);
+
+        auto tablePromise = NewPromise<TGenericResult>();
+    
+        if (!IsPrepare()) {
+            SessionCtx->Query().PrepareOnly = false;
+            if (SessionCtx->Query().PreparingQuery) {
+                auto code = Ydb::StatusIds::BAD_REQUEST;
+                auto error = TStringBuilder() << "multiple transactions are not supported for alter table operation.";
+                IKqpGateway::TGenericResult errResult;
+                errResult.AddIssue(NYql::TIssue(error));
+                errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
+                tablePromise.SetValue(errResult);
+                return tablePromise.GetFuture();
+            }
+
+            SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
+        }
+
+        auto prepareFuture = PrepareAlterTable(cluster, std::move(req), requestType, flags);
+        if (IsPrepare())
+            return prepareFuture;
+
+        auto sessionCtx = SessionCtx;
+        auto gateway = Gateway;
+        prepareFuture.Subscribe([cluster, requestType, tablePromise, sessionCtx, gateway](const TFuture<IKqpGateway::TGenericResult> &future) mutable {
+            auto result = future.GetValue();
+            TPreparedQueryHolder::TConstPtr preparedQuery = std::make_shared<TPreparedQueryHolder>(sessionCtx->Query().PreparingQuery.release(), nullptr);
+            if (result.Success()) {
+                auto executeFuture = gateway->SendSchemeExecuterRequest(cluster, requestType, preparedQuery->GetPhyTx(0));
+                executeFuture.Subscribe([tablePromise](const TFuture<IKqpGateway::TGenericResult> &future) mutable {
+                    auto fresult = future.GetValue();
+                    if (fresult.Success()) {
+                        TGenericResult result;
+                        result.SetSuccess();
+                        tablePromise.SetValue(result);
+                    } else {
+                        tablePromise.SetValue(
+                            ResultFromIssues<TGenericResult>(fresult.Status(), fresult.Issues())
+                        );
+                    }
+                });
+                return;
+            } else {
+                tablePromise.SetValue(ResultFromIssues<TGenericResult>(
+                    result.Status(), result.Issues()));
+
+                return;
+            }
+        });
+
+        return tablePromise.GetFuture();
+    }
+
     TFuture<TGenericResult> RenameTable(const TString& src, const TString& dst, const TString& cluster) override {
         FORWARD_ENSURE_NO_PREPARE(RenameTable, src, dst, cluster);
     }

+ 9 - 0
ydb/core/kqp/provider/yql_kikimr_gateway.h

@@ -16,6 +16,7 @@
 #include <ydb/services/metadata/manager/abstract.h>
 
 #include <ydb/core/kqp/query_data/kqp_query_data.h>
+#include <ydb/core/kqp/query_data/kqp_prepared_query.h>
 #include <ydb/core/protos/flat_scheme_op.pb.h>
 #include <ydb/core/protos/kqp.pb.h>
 #include <ydb/core/scheme/scheme_types_proto.h>
@@ -28,6 +29,10 @@ namespace NKikimr {
     namespace NMiniKQL {
         class IFunctionRegistry;
     }
+
+    namespace NKqp {
+        class TKqpPhyTxHolder;
+    }
 }
 
 namespace NYql {
@@ -767,6 +772,10 @@ public:
 
     virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;
 
+    virtual NThreading::TFuture<TGenericResult> SendSchemeExecuterRequest(const TString& cluster,
+        const TMaybe<TString>& requestType,
+        const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder> &phyTx) = 0;
+
     virtual NThreading::TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
         const TMaybe<TString>& requestType, ui64 flags) = 0;
 

+ 7 - 1
ydb/core/kqp/query_data/kqp_prepared_query.cpp

@@ -74,6 +74,7 @@ TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPrepar
         const auto& txResult = Proto->GetResults(i);
         auto& result = TxResultsMeta[i];
 
+        YQL_ENSURE(Alloc);
         result.MkqlItemType = ImportTypeFromProto(txResult.GetItemType(), Alloc->TypeEnv);
         //Hack to prevent data race. Side effect of IsPresortSupported - fill cached value.
         //So no more concurent write subsequently
@@ -107,9 +108,14 @@ const NKikimr::NKqp::TStagePredictor& TKqpPhyTxHolder::GetCalculationPredictor(c
 TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto,
     const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry)
     : Proto(proto)
-    , Alloc(std::move(std::make_shared<TPreparedQueryAllocHolder>(functionRegistry)))
+    , Alloc(nullptr)
     , TableConstInfoById(MakeIntrusive<TTableConstInfoMap>())
 {
+
+    if (functionRegistry) {
+        Alloc = std::make_shared<TPreparedQueryAllocHolder>(functionRegistry);
+    }
+
     THashSet<TString> tablesSet;
     const auto& phyQuery = Proto->GetPhysicalQuery();
     Transactions.reserve(phyQuery.TransactionsSize());

+ 4 - 0
ydb/core/kqp/session_actor/kqp_query_state.h

@@ -139,6 +139,10 @@ public:
         return RequestEv->GetSyntax();
     }
 
+    const TString& GetRequestType() const {
+        return RequestEv->GetRequestType();
+    }
+
     std::shared_ptr<std::map<TString, Ydb::Type>> GetQueryParameterTypes() const {
         return QueryParameterTypes;
     }

+ 3 - 2
ydb/core/kqp/session_actor/kqp_session_actor.cpp

@@ -1040,10 +1040,11 @@ public:
 
     void SendToSchemeExecuter(const TKqpPhyTxHolder::TConstPtr& tx) {
         auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>();
+        const TString requestType = QueryState ? QueryState->GetRequestType() : TString(); 
         bool temporary = GetTemporaryTableInfo(tx).has_value();
 
-        auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), Settings.Database, userToken,
-            QueryState->TxCtx->TxAlloc, temporary, *TempTablesState.SessionId);
+        auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), requestType, Settings.Database, userToken,
+            temporary, *TempTablesState.SessionId, QueryState->UserRequestContext);
 
         ExecuterId = RegisterWithSameMailbox(executerActor);
     }

Some files were not shown because too many files changed in this diff