Browse Source

Forward compatibility with TEvQueryRequest message

dcherednik 2 years ago
parent
commit
8a9b85e341

+ 17 - 0
ydb/core/kqp/kqp_session_actor.cpp

@@ -333,6 +333,23 @@ public:
         YQL_ENSURE(event.GetRequest().GetSessionId() == SessionId,
                 "Invalid session, expected: " << SessionId << ", got: " << event.GetRequest().GetSessionId());
 
+        if (event.HasYdbStatus()) {
+            if (event.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+                NYql::TIssues issues;
+                NYql::IssuesFromMessage(event.GetQueryIssues(), issues);
+                TString errMsg = issues.ToString();
+
+                LOG_N(TKqpRequestInfo("", SessionId)
+                    << "Got invalid query request, reply with status: "
+                    << event.GetYdbStatus()
+                    << " msg: "
+                    << errMsg <<".");
+                ReplyProcessError(ev->Sender, proxyRequestId, event.GetYdbStatus(), errMsg);
+                FinalCleanup();
+                return;
+            }
+        }
+
         if (ShutdownState && ShutdownState->SoftTimeoutReached()) {
             // we reached the soft timeout, so at this point we don't allow to accept new queries for session.
             LOG_N("system shutdown requested: soft timeout reached, no queries can be accepted");

+ 39 - 0
ydb/core/kqp/proxy/kqp_proxy_ut.cpp

@@ -1,4 +1,5 @@
 #include <ydb/core/base/tablet.h>
+#include <ydb/core/base/kikimr_issue.h>
 #include <ydb/core/kqp/ut/common/kqp_ut_common.h>
 #include <ydb/core/kqp/proxy/kqp_proxy_service.h>
 #include <ydb/core/kqp/kqp.h>
@@ -97,6 +98,44 @@ Y_UNIT_TEST_SUITE(KqpProxy) {
         SendBadRequestToSession("ydb://session/1?id=ZjY5NWRlM2EtYWMyYjA5YWEtNzQ0MTVlYTMtM2Q4ZDgzOWQ=&node_id=eqweq");
     }
 
+    Y_UNIT_TEST(PassErrroViaSessionActor) {
+        TPortManager tp;
+
+        ui16 mbusport = tp.GetPort(2134);
+        auto settings = Tests::TServerSettings(mbusport);
+
+        Tests::TServer server(settings);
+        Tests::TClient client(settings);
+
+        server.GetRuntime()->SetLogPriority(NKikimrServices::KQP_PROXY, NActors::NLog::PRI_DEBUG);
+        client.InitRootScheme();
+        auto runtime = server.GetRuntime();
+
+        TActorId kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+        TActorId sender = runtime->AllocateEdgeActor();
+
+        auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+        //ev->Record.MutableRequest()->SetSessionId(sessionId);
+        ev->Record.SetYdbStatus(Ydb::StatusIds::BAD_REQUEST);
+        auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "SomeUniqTextForUt");
+
+        NYql::TIssues issues;
+        issues.AddIssue(issue);
+        NYql::IssuesToMessage(issues, ev->Record.MutableQueryIssues());
+
+        ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+        ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT);
+        ev->Record.MutableRequest()->SetQuery("SELECT 1; COMMIT;");
+        ev->Record.MutableRequest()->SetKeepSession(true);
+        ev->Record.MutableRequest()->SetTimeoutMs(10);
+
+        runtime->Send(new IEventHandle(kqpProxy, sender, ev.Release()));
+        TAutoPtr<IEventHandle> handle;
+        auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvProcessResponse>(sender);
+        UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetYdbStatus(), Ydb::StatusIds::BAD_REQUEST);
+        UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetError(), "<main>: Error: SomeUniqTextForUt\n");
+    }
+
     Y_UNIT_TEST(LoadedMetadataAfterCompilationTimeout) {
 
         TPortManager tp;

+ 3 - 0
ydb/core/protos/kqp.proto

@@ -164,6 +164,9 @@ message TEvQueryRequest {
     optional NActorsProto.TActorId RequestActorId = 4;
     optional string RequestType = 5;
     optional TRlPath RlPath = 6;
+
+    repeated Ydb.Issue.IssueMessage QueryIssues = 7;
+    optional Ydb.StatusIds.StatusCode YdbStatus = 8;
 }
 
 message TMkqlProfile {