Browse Source

unknown data source has been fixed

hcpp 1 year ago
parent
commit
bd8c060132

+ 50 - 46
ydb/core/fq/libs/actors/run_actor.cpp

@@ -183,64 +183,68 @@ public:
     }
 
     void Bootstrap() {
-        TProgramFactory progFactory(false, FunctionRegistry, NextUniqueId, DataProvidersInit, "yq");
-        progFactory.SetModules(ModuleResolver);
-        progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FunctionRegistry, nullptr));
-        progFactory.SetGatewaysConfig(&GatewaysConfig);
+        try {
+            TProgramFactory progFactory(false, FunctionRegistry, NextUniqueId, DataProvidersInit, "yq");
+            progFactory.SetModules(ModuleResolver);
+            progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FunctionRegistry, nullptr));
+            progFactory.SetGatewaysConfig(&GatewaysConfig);
 
-        Program = progFactory.Create("-stdin-", Sql, SessionId);
-        Program->EnableResultPosition();
+            Program = progFactory.Create("-stdin-", Sql, SessionId);
+            Program->EnableResultPosition();
 
-        // parse phase
-        {
-            if (!Program->ParseSql(SqlSettings)) {
-                SendStatusAndDie(TProgram::TStatus::Error, "Failed to parse query");
-                return;
-            }
+            // parse phase
+            {
+                if (!Program->ParseSql(SqlSettings)) {
+                    SendStatusAndDie(TProgram::TStatus::Error, "Failed to parse query");
+                    return;
+                }
 
-            if (ExecuteMode == FederatedQuery::ExecuteMode::PARSE) {
-                SendStatusAndDie(TProgram::TStatus::Ok);
-                return;
+                if (ExecuteMode == FederatedQuery::ExecuteMode::PARSE) {
+                    SendStatusAndDie(TProgram::TStatus::Ok);
+                    return;
+                }
             }
-        }
 
-        // compile phase
-        {
-            if (!Program->Compile("")) {
-                SendStatusAndDie(TProgram::TStatus::Error, "Failed to compile query");
-                return;
+            // compile phase
+            {
+                if (!Program->Compile("")) {
+                    SendStatusAndDie(TProgram::TStatus::Error, "Failed to compile query");
+                    return;
+                }
+
+                if (ExecuteMode == FederatedQuery::ExecuteMode::COMPILE) {
+                    SendStatusAndDie(TProgram::TStatus::Ok);
+                    return;
+                }
             }
 
-            if (ExecuteMode == FederatedQuery::ExecuteMode::COMPILE) {
-                SendStatusAndDie(TProgram::TStatus::Ok);
+            Compiled = true;
+
+            // next phases can be async: optimize, validate, run
+            TProgram::TFutureStatus futureStatus;
+            switch (ExecuteMode) {
+            case FederatedQuery::ExecuteMode::EXPLAIN:
+                futureStatus = Program->OptimizeAsyncWithConfig("", TraceOptPipelineConfigurator);
+                break;
+            case FederatedQuery::ExecuteMode::VALIDATE:
+                futureStatus = Program->ValidateAsync("");
+                break;
+            case FederatedQuery::ExecuteMode::RUN:
+                futureStatus = Program->RunAsyncWithConfig("", TraceOptPipelineConfigurator);
+                break;
+            default:
+                SendStatusAndDie(TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast<int>(ExecuteMode));
                 return;
             }
-        }
 
-        Compiled = true;
+            futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) {
+                actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f));
+            });
 
-        // next phases can be async: optimize, validate, run
-        TProgram::TFutureStatus futureStatus;
-        switch (ExecuteMode) {
-        case FederatedQuery::ExecuteMode::EXPLAIN:
-            futureStatus = Program->OptimizeAsyncWithConfig("", TraceOptPipelineConfigurator);
-            break;
-        case FederatedQuery::ExecuteMode::VALIDATE:
-            futureStatus = Program->ValidateAsync("");
-            break;
-        case FederatedQuery::ExecuteMode::RUN:
-            futureStatus = Program->RunAsyncWithConfig("", TraceOptPipelineConfigurator);
-            break;
-        default:
-            SendStatusAndDie(TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast<int>(ExecuteMode));
-            return;
+            Become(&TProgramRunnerActor::StateFunc);
+        } catch (...) {
+            SendStatusAndDie(TProgram::TStatus::Error, CurrentExceptionMessage());
         }
-
-        futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) {
-            actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f));
-        });
-
-        Become(&TProgramRunnerActor::StateFunc);
     }
 
     void SendStatusAndDie(NYql::TProgram::TStatus status, const TString& message = "") {

+ 37 - 0
ydb/tests/fq/common/conftest.py

@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+from ydb.tests.tools.fq_runner.custom_hooks import *  # noqa: F401,F403 Adding custom hooks for YQv2 support
+from ydb.tests.tools.fq_runner.kikimr_utils import AddInflightExtension
+from ydb.tests.tools.fq_runner.kikimr_utils import AddDataInflightExtension
+from ydb.tests.tools.fq_runner.kikimr_utils import AddFormatSizeLimitExtension
+from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension
+from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
+from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension
+from ydb.tests.tools.fq_runner.kikimr_utils import StatsModeExtension
+from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr
+
+
+@pytest.fixture
+def kikimr(request: pytest.FixtureRequest, yq_version: str):
+    kikimr_extensions = [AddInflightExtension(),
+                         AddDataInflightExtension(),
+                         AddFormatSizeLimitExtension(),
+                         DefaultConfigExtension(''),
+                         YQv2Extension(yq_version),
+                         ComputeExtension(),
+                         StatsModeExtension('')]
+    with start_kikimr(request, kikimr_extensions) as kikimr:
+        yield kikimr
+
+
+@pytest.fixture
+def client(kikimr, request=None):
+    client = FederatedQueryClient(request.param["folder_id"]
+                                  if request is not None
+                                  else "my_folder",
+                                  streaming_over_kikimr=kikimr)
+    return client

+ 30 - 0
ydb/tests/fq/common/test_unknown_data_source.py

@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+import ydb.public.api.protos.draft.fq_pb2 as fq
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_all
+
+
+class TestUnknownDataSource:
+    @yq_all
+    @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+    def test_should_fail_unknown_data_source(self, kikimr, client, yq_version):
+        kikimr.control_plane.wait_bootstrap(1)
+        sql = R'''
+            $h = "hahn";
+
+            SELECT
+                *
+            FROM kikimr:$h.`home/yql/tutorial/users`
+            LIMIT 100;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        issues = str(client.describe_query(query_id).result.query.issue)
+
+        if yq_version == 'v1':
+            assert "Unknown DataSource: kikimr" in issues, "Incorrect Issues: " + issues
+        else:
+            assert "ATOM evaluation is not supported in YDB queries." in issues, "Incorrect Issues: " + issues

+ 28 - 0
ydb/tests/fq/common/ya.make

@@ -0,0 +1,28 @@
+PY3TEST()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc)
+
+PEERDIR(
+    library/python/testing/yatest_common
+)
+
+TEST_SRCS(
+    test_unknown_data_source.py
+)
+
+PY_SRCS(
+    conftest.py
+)
+
+IF (SANITIZER_TYPE == "thread")
+    TIMEOUT(2400)
+    SIZE(LARGE)
+    TAG(ya:fat)
+ELSE()
+    TIMEOUT(600)
+    SIZE(MEDIUM)
+ENDIF()
+
+REQUIREMENTS(ram:16)
+
+END()

+ 1 - 0
ydb/tests/fq/ya.make

@@ -1,4 +1,5 @@
 RECURSE_FOR_TESTS(
+    common
     http_api
     mem_alloc
     multi_plane