123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- #include "service_table.h"
- #include <ydb/core/grpc_services/base/base.h>
- #include "rpc_calls.h"
- #include "rpc_kqp_base.h"
- #include "rpc_common.h"
- #include "service_table.h"
- #include <ydb/library/yql/public/issue/yql_issue_message.h>
- #include <ydb/library/yql/public/issue/yql_issue.h>
- namespace NKikimr {
- namespace NGRpcService {
- using namespace NActors;
- using namespace Ydb;
- using namespace NKqp;
- using TEvCommitTransactionRequest = TGrpcRequestOperationCall<Ydb::Table::CommitTransactionRequest,
- Ydb::Table::CommitTransactionResponse>;
- class TCommitTransactionRPC : public TRpcKqpRequestActor<TCommitTransactionRPC, TEvCommitTransactionRequest> {
- using TBase = TRpcKqpRequestActor<TCommitTransactionRPC, TEvCommitTransactionRequest>;
- public:
- TCommitTransactionRPC(IRequestOpCtx* msg)
- : TBase(msg) {}
- void Bootstrap(const TActorContext& ctx) {
- TBase::Bootstrap(ctx);
- CommitTransactionImpl(ctx);
- Become(&TCommitTransactionRPC::StateWork);
- }
- void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
- default: TBase::StateWork(ev, ctx);
- }
- }
- private:
- void CommitTransactionImpl(const TActorContext &ctx) {
- const auto req = GetProtoRequest();
- const auto traceId = Request_->GetTraceId();
- TString sessionId;
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
- SetAuthToken(ev, *Request_);
- SetDatabase(ev, *Request_);
- NYql::TIssues issues;
- if (CheckSession(req->session_id(), issues)) {
- ev->Record.MutableRequest()->SetSessionId(req->session_id());
- } else {
- return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
- }
- if (traceId) {
- ev->Record.SetTraceId(traceId.GetRef());
- }
- if (!req->tx_id()) {
- NYql::TIssues issues;
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty transaction id."));
- return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
- }
- ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_COMMIT_TX);
- ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(req->tx_id());
- ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
- ev->Record.MutableRequest()->SetStatsMode(GetKqpStatsMode(req->collect_stats()));
- ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
- }
- void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record.GetRef();
- SetCost(record.GetConsumedRu());
- AddServerHintsIfAny(record);
- if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
- const auto& kqpResponse = record.GetResponse();
- const auto& issueMessage = kqpResponse.GetQueryIssues();
- auto commitResult = TEvCommitTransactionRequest::AllocateResult<Ydb::Table::CommitTransactionResult>(Request_);
- if (kqpResponse.HasQueryStats()) {
- FillQueryStats(*commitResult->mutable_query_stats(), kqpResponse);
- }
- ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, *commitResult, ctx);
- } else {
- return OnGenericQueryResponseError(record, ctx);
- }
- }
- };
- void DoCommitTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
- TActivationContext::AsActorContext().Register(new TCommitTransactionRPC(p.release()));
- }
- } // namespace NGRpcService
- } // namespace NKikimr
|