rpc_commit_transaction.cpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. #include "service_table.h"
  2. #include <ydb/core/grpc_services/base/base.h>
  3. #include "rpc_calls.h"
  4. #include "rpc_kqp_base.h"
  5. #include "rpc_common.h"
  6. #include "service_table.h"
  7. #include <ydb/library/yql/public/issue/yql_issue_message.h>
  8. #include <ydb/library/yql/public/issue/yql_issue.h>
  9. namespace NKikimr {
  10. namespace NGRpcService {
  11. using namespace NActors;
  12. using namespace Ydb;
  13. using namespace NKqp;
  14. using TEvCommitTransactionRequest = TGrpcRequestOperationCall<Ydb::Table::CommitTransactionRequest,
  15. Ydb::Table::CommitTransactionResponse>;
  16. class TCommitTransactionRPC : public TRpcKqpRequestActor<TCommitTransactionRPC, TEvCommitTransactionRequest> {
  17. using TBase = TRpcKqpRequestActor<TCommitTransactionRPC, TEvCommitTransactionRequest>;
  18. public:
  19. TCommitTransactionRPC(IRequestOpCtx* msg)
  20. : TBase(msg) {}
  21. void Bootstrap(const TActorContext& ctx) {
  22. TBase::Bootstrap(ctx);
  23. CommitTransactionImpl(ctx);
  24. Become(&TCommitTransactionRPC::StateWork);
  25. }
  26. void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
  27. switch (ev->GetTypeRewrite()) {
  28. HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
  29. default: TBase::StateWork(ev, ctx);
  30. }
  31. }
  32. private:
  33. void CommitTransactionImpl(const TActorContext &ctx) {
  34. const auto req = GetProtoRequest();
  35. const auto traceId = Request_->GetTraceId();
  36. TString sessionId;
  37. auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
  38. SetAuthToken(ev, *Request_);
  39. SetDatabase(ev, *Request_);
  40. NYql::TIssues issues;
  41. if (CheckSession(req->session_id(), issues)) {
  42. ev->Record.MutableRequest()->SetSessionId(req->session_id());
  43. } else {
  44. return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
  45. }
  46. if (traceId) {
  47. ev->Record.SetTraceId(traceId.GetRef());
  48. }
  49. if (!req->tx_id()) {
  50. NYql::TIssues issues;
  51. issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty transaction id."));
  52. return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
  53. }
  54. ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_COMMIT_TX);
  55. ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(req->tx_id());
  56. ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
  57. ev->Record.MutableRequest()->SetStatsMode(GetKqpStatsMode(req->collect_stats()));
  58. ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
  59. }
  60. void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
  61. const auto& record = ev->Get()->Record.GetRef();
  62. SetCost(record.GetConsumedRu());
  63. AddServerHintsIfAny(record);
  64. if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
  65. const auto& kqpResponse = record.GetResponse();
  66. const auto& issueMessage = kqpResponse.GetQueryIssues();
  67. auto commitResult = TEvCommitTransactionRequest::AllocateResult<Ydb::Table::CommitTransactionResult>(Request_);
  68. if (kqpResponse.HasQueryStats()) {
  69. FillQueryStats(*commitResult->mutable_query_stats(), kqpResponse);
  70. }
  71. ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, *commitResult, ctx);
  72. } else {
  73. return OnGenericQueryResponseError(record, ctx);
  74. }
  75. }
  76. };
  77. void DoCommitTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
  78. TActivationContext::AsActorContext().Register(new TCommitTransactionRPC(p.release()));
  79. }
  80. } // namespace NGRpcService
  81. } // namespace NKikimr