Browse Source

KIKIMR-17412: add tests for stream yql query

shumkovnd 1 year ago
parent
commit
3424811d47

+ 29 - 6
ydb/core/kqp/ut/common/kqp_ut_common.cpp

@@ -727,15 +727,25 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
         if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>) {
             UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(),
                 "Unexpected empty scan query response.");
+
+            if (streamPart.HasResultSet()) {
+                auto resultSet = streamPart.ExtractResultSet();
+                PrintResultSet(resultSet, resultSetWriter);
+                res.RowsCount += resultSet.RowsCount();
+            }
         }
 
-        if (streamPart.HasResultSet()) {
-            auto resultSet = streamPart.ExtractResultSet();
-            PrintResultSet(resultSet, resultSetWriter);
-            res.RowsCount += resultSet.RowsCount();
+        if constexpr (std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) {
+            if (streamPart.HasPartialResult()) {
+                const auto& partialResult = streamPart.GetPartialResult();
+                const auto& resultSet = partialResult.GetResultSet();
+                PrintResultSet(resultSet, resultSetWriter);
+                res.RowsCount += resultSet.RowsCount();
+            }
         }
 
-        if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>) {
+        if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>
+                || std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) {
             if (streamPart.HasQueryStats() ) {
                 res.QueryStats = NYdb::TProtoAccessor::GetProto(streamPart.GetQueryStats());
 
@@ -757,10 +767,14 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
     return res;
 }
 
-TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it) {
+template<typename TIterator>
+TCollectedStreamResult CollectStreamResult(TIterator& it) {
     return CollectStreamResultImpl(it);
 }
 
+template TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it);
+template TCollectedStreamResult CollectStreamResult(NYdb::NScripting::TYqlResultPartIterator& it);
+
 TString ReadTableToYson(NYdb::NTable::TSession session, const TString& table) {
     TReadTableSettings settings;
     settings.Ordered(true);
@@ -871,6 +885,15 @@ NJson::TJsonValue FindPlanNodeByKv(const NJson::TJsonValue& plan, const TString&
                 }
             }
         }
+
+        if (map.contains("queries")) {
+            for (const auto &node : map["queries"].GetArraySafe()) {
+                auto op = FindPlanNodeByKv(node, key, value);
+                if (op.IsDefined()) {
+                    return op;
+                }
+            }
+        }
     } else {
         Y_ASSERT(false);
     }

+ 2 - 1
ydb/core/kqp/ut/common/kqp_ut_common.h

@@ -179,7 +179,8 @@ struct TCollectedStreamResult {
     ui64 RowsCount = 0;
 };
 
-TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it);
+template<typename TIterator>
+TCollectedStreamResult CollectStreamResult(TIterator& it);
 
 enum class EIndexTypeSql {
     Global,

+ 89 - 36
ydb/core/kqp/ut/query/kqp_stats_ut.cpp

@@ -4,6 +4,7 @@
 #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
 
 #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
 
 #include <cstdlib>
 
@@ -15,17 +16,39 @@ using namespace NYdb::NTable;
 
 Y_UNIT_TEST_SUITE(KqpStats) {
 
-Y_UNIT_TEST(MultiTxStatsFullExp) {
-    auto kikimr = DefaultKikimrRunner();
+auto GetYqlStreamIterator(
+        TKikimrRunner& kikimr,
+        ECollectQueryStatsMode mode,
+        const TString& query) {
+    NYdb::NScripting::TExecuteYqlRequestSettings settings;
+    settings.CollectQueryStats(mode);
+
+    NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
+
+    auto it = client.StreamExecuteYqlScript(query, settings).GetValueSync();
+    return it;
+}
+
+auto GetScanStreamIterator(
+        TKikimrRunner& kikimr,
+        ECollectQueryStatsMode mode,
+        const TString& query) {
     auto db = kikimr.GetTableClient();
 
     TStreamExecScanQuerySettings settings;
-    settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
+    settings.CollectQueryStats(mode);
 
-    auto it = db.StreamExecuteScanQuery(R"(
-        SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4;
-    )", settings).GetValueSync();
+    auto it = db.StreamExecuteScanQuery(query, settings).GetValueSync();
+    return it;
+}
 
+template <typename Iterator>
+void MultiTxStatsFullExp(
+        std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
+    auto kikimr = DefaultKikimrRunner();
+    auto it = getIter(kikimr, ECollectQueryStatsMode::Profile, R"(
+        SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4;
+    )");
     auto res = CollectStreamResult(it);
     CompareYson(R"([
         [[1];[202u];["Value2"]];
@@ -34,7 +57,6 @@ Y_UNIT_TEST(MultiTxStatsFullExp) {
     ])", res.ResultSetYson);
 
     UNIT_ASSERT(res.PlanJson);
-    Cerr << *res.PlanJson << Endl;
     NJson::TJsonValue plan;
     NJson::ReadJsonTree(*res.PlanJson, &plan, true);
     auto node = FindPlanNodeByKv(plan, "Node Type", "TopSort-TableRangeScan");
@@ -44,16 +66,21 @@ Y_UNIT_TEST(MultiTxStatsFullExp) {
     UNIT_ASSERT_EQUAL(node.GetMap().at("Stats").GetMapSafe().at("TotalTasks").GetIntegerSafe(), 2);
 }
 
-Y_UNIT_TEST(JoinNoStats) {
-    auto kikimr = DefaultKikimrRunner();
-    auto db = kikimr.GetTableClient();
-    TStreamExecScanQuerySettings settings;
-    settings.CollectQueryStats(ECollectQueryStatsMode::None);
+Y_UNIT_TEST(MultiTxStatsFullExpYql) {
+    MultiTxStatsFullExp<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator);
+}
 
-    auto it = db.StreamExecuteScanQuery(R"(
-        SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
-    )", settings).GetValueSync();
+Y_UNIT_TEST(MultiTxStatsFullExpScan) {
+    MultiTxStatsFullExp<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator);
+}
 
+template <typename Iterator>
+void JoinNoStats(
+        std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
+    auto kikimr = DefaultKikimrRunner();
+    auto it = getIter(kikimr, ECollectQueryStatsMode::None, R"(
+        SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
+    )");
     auto res = CollectStreamResult(it);
     UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
     UNIT_ASSERT_VALUES_EQUAL(res.ResultSetYson, "[[16u]]");
@@ -62,27 +89,50 @@ Y_UNIT_TEST(JoinNoStats) {
     UNIT_ASSERT(!res.PlanJson);
 }
 
-Y_UNIT_TEST(JoinStatsBasic) {
+Y_UNIT_TEST(JoinNoStatsYql) {
+    JoinNoStats<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator);
+}
+
+Y_UNIT_TEST(JoinNoStatsScan) {
+    JoinNoStats<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator);
+}
+
+template <typename Iterator>
+TCollectedStreamResult JoinStatsBasic(
+        std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
     NKikimrConfig::TAppConfig appConfig;
     appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
     auto settings = TKikimrSettings()
-        .SetAppConfig(appConfig);  // TODO: enable stream lookup KIKIMR-14294
-
+        .SetAppConfig(appConfig);
     TKikimrRunner kikimr(settings);
-    auto db = kikimr.GetTableClient();
-    TStreamExecScanQuerySettings querySettings;
-    querySettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
 
-    auto it = db.StreamExecuteScanQuery(R"(
+    auto it = getIter(kikimr, ECollectQueryStatsMode::Basic, R"(
         SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
-    )", querySettings).GetValueSync();
-
+    )");
     auto res = CollectStreamResult(it);
     UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
 
     UNIT_ASSERT_VALUES_EQUAL(res.ResultSetYson, "[[16u]]");
 
     UNIT_ASSERT(res.QueryStats);
+    return res;
+}
+
+Y_UNIT_TEST(JoinStatsBasicYql) {
+    auto res = JoinStatsBasic<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator);
+
+    UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 3);
+    if (res.QueryStats->query_phases(0).table_access(0).name() == "/Root/KeyValue") {
+        UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(2).table_access(0).name(), "/Root/EightShard");
+    } else {
+        UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).table_access(0).name(), "/Root/EightShard");
+        UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(2).table_access(0).name(), "/Root/KeyValue");
+    }
+}
+
+Y_UNIT_TEST(JoinStatsBasicScan) {
+    auto res = JoinStatsBasic<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator);
+
     UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 2);
     if (res.QueryStats->query_phases(0).table_access(0).name() == "/Root/KeyValue") {
         UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).table_access(0).name(), "/Root/KeyValue");
@@ -99,17 +149,15 @@ Y_UNIT_TEST(JoinStatsBasic) {
     UNIT_ASSERT(!res.PlanJson);
 }
 
-Y_UNIT_TEST(MultiTxStatsFull) {
+template <typename Iterator>
+void MultiTxStatsFull(
+        std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getResult) {
     auto kikimr = DefaultKikimrRunner();
-    auto db = kikimr.GetTableClient();
-    TStreamExecScanQuerySettings settings;
-    settings.CollectQueryStats(ECollectQueryStatsMode::Full);
-
-    auto it = db.StreamExecuteScanQuery(R"(
+    auto it = getResult(kikimr, ECollectQueryStatsMode::Full, R"(
         SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4;
-    )", settings).GetValueSync();
-
+    )");
     auto res = CollectStreamResult(it);
+
     UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
     UNIT_ASSERT_VALUES_EQUAL(
         res.ResultSetYson,
@@ -131,6 +179,14 @@ Y_UNIT_TEST(MultiTxStatsFull) {
     UNIT_ASSERT_EQUAL(node.GetMap().at("Stats").GetMapSafe().at("TotalTasks").GetIntegerSafe(), 2);
 }
 
+Y_UNIT_TEST(MultiTxStatsFullYql) {
+    MultiTxStatsFull<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator);
+}
+
+Y_UNIT_TEST(MultiTxStatsFullScan) {
+    MultiTxStatsFull<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator);
+}
+
 Y_UNIT_TEST(DeferredEffects) {
     auto kikimr = DefaultKikimrRunner();
     auto db = kikimr.GetTableClient();
@@ -142,7 +198,6 @@ Y_UNIT_TEST(DeferredEffects) {
     settings.CollectQueryStats(ECollectQueryStatsMode::Full);
 
     auto result = session.ExecuteDataQuery(R"(
-
         UPSERT INTO `/Root/TwoShard`
         SELECT Key + 100u AS Key, Value1 FROM `/Root/TwoShard` WHERE Key in (1,2,3,4,5);
     )", TTxControl::BeginTx(), settings).ExtractValueSync();
@@ -204,7 +259,6 @@ Y_UNIT_TEST(DataQueryWithEffects) {
     settings.CollectQueryStats(ECollectQueryStatsMode::Full);
 
     auto result = session.ExecuteDataQuery(R"(
-
         UPSERT INTO `/Root/TwoShard`
         SELECT Key + 1u AS Key, Value1 FROM `/Root/TwoShard`;
     )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), settings).ExtractValueSync();
@@ -227,7 +281,6 @@ Y_UNIT_TEST(DataQueryMulti) {
     settings.CollectQueryStats(ECollectQueryStatsMode::Full);
 
     auto result = session.ExecuteDataQuery(R"(
-
         SELECT 1;
         SELECT 2;
         SELECT 3;
@@ -368,7 +421,7 @@ Y_UNIT_TEST(StreamLookupStats) {
     NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
     auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
     UNIT_ASSERT(streamLookup.IsDefined());
-    
+
     auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
     UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
     UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);