kqp_explain_ut.cpp 29 KB


  1. #include <ydb/core/kqp/ut/common/kqp_ut_common.h>
  2. #include <ydb/public/sdk/cpp/client/ydb_table/table.h>
  3. #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
  4. namespace NKikimr {
  5. namespace NKqp {
  6. using namespace NYdb;
  7. using namespace NYdb::NTable;
  8. namespace {
  9. void CreateSampleTables(TKikimrRunner& kikimr) {
  10. kikimr.GetTestClient().CreateTable("/Root", R"(
  11. Name: "FourShard"
  12. Columns { Name: "Key", Type: "Uint64" }
  13. Columns { Name: "Value1", Type: "String" }
  14. Columns { Name: "Value2", Type: "String" }
  15. KeyColumnNames: ["Key"],
  16. SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 100 } } } }
  17. SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 200 } } } }
  18. SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 300 } } } }
  19. )");
  20. TTableClient tableClient{kikimr.GetDriver()};
  21. auto session = tableClient.CreateSession().GetValueSync().GetSession();
  22. auto result = session.ExecuteDataQuery(R"(
  23. REPLACE INTO `/Root/FourShard` (Key, Value1, Value2) VALUES
  24. (1u, "Value-001", "1"),
  25. (2u, "Value-002", "2"),
  26. (101u, "Value-101", "101"),
  27. (102u, "Value-102", "102"),
  28. (201u, "Value-201", "201"),
  29. (202u, "Value-202", "202"),
  30. (301u, "Value-301", "301"),
  31. (302u, "Value-302", "302")
  32. )", TTxControl::BeginTx().CommitTx()).GetValueSync();
  33. UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
  34. session.Close();
  35. }
  36. }
  37. Y_UNIT_TEST_SUITE(KqpExplain) {
  38. Y_UNIT_TEST_TWIN(Explain, UseSessionActor) {
  39. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  40. auto db = kikimr.GetTableClient();
  41. TStreamExecScanQuerySettings settings;
  42. settings.Explain(true);
  43. auto it = db.StreamExecuteScanQuery(R"(
  44. SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
  45. )", settings).GetValueSync();
  46. auto res = CollectStreamResult(it);
  47. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  48. UNIT_ASSERT(res.PlanJson);
  49. Cerr << *res.PlanJson;
  50. NJson::TJsonValue plan;
  51. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  52. auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
  53. UNIT_ASSERT(join.IsDefined());
  54. auto left = FindPlanNodeByKv(join, "Table", "EightShard");
  55. UNIT_ASSERT(left.IsDefined());
  56. auto right = FindPlanNodeByKv(join, "Table", "KeyValue");
  57. UNIT_ASSERT(right.IsDefined());
  58. }
  59. Y_UNIT_TEST_TWIN(ExplainStream, UseSessionActor) {
  60. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  61. auto db = kikimr.GetTableClient();
  62. TStreamExecScanQuerySettings settings;
  63. settings.Explain(true);
  64. auto it = db.StreamExecuteScanQuery(R"(
  65. SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
  66. )", settings).GetValueSync();
  67. auto res = CollectStreamResult(it);
  68. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  69. UNIT_ASSERT(res.PlanJson);
  70. Cerr << *res.PlanJson;
  71. NJson::TJsonValue plan;
  72. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  73. auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
  74. UNIT_ASSERT(join.IsDefined());
  75. auto left = FindPlanNodeByKv(join, "Table", "EightShard");
  76. UNIT_ASSERT(left.IsDefined());
  77. auto right = FindPlanNodeByKv(join, "Table", "KeyValue");
  78. UNIT_ASSERT(right.IsDefined());
  79. }
  80. Y_UNIT_TEST_TWIN(ExplainScanQueryWithParams, UseSessionActor) {
  81. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  82. auto db = kikimr.GetTableClient();
  83. TStreamExecScanQuerySettings settings;
  84. settings.Explain(true);
  85. auto it = db.StreamExecuteScanQuery(R"(
  86. --SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
  87. PRAGMA Kikimr.UseNewEngine = "false";
  88. DECLARE $value as Utf8;
  89. SELECT $value as value;
  90. )", settings).GetValueSync();
  91. auto res = CollectStreamResult(it);
  92. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  93. }
  94. Y_UNIT_TEST_TWIN(AggGroupLimit, UseSessionActor) {
  95. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  96. auto db = kikimr.GetTableClient();
  97. TStreamExecScanQuerySettings settings;
  98. settings.Explain(true);
  99. auto it = db.StreamExecuteScanQuery(R"(
  100. SELECT min(Message), max(Message) FROM `/Root/Logs` WHERE Ts > 1 and Ts <= 4 or App="ydb" GROUP BY App;
  101. )", settings).GetValueSync();
  102. auto res = CollectStreamResult(it);
  103. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  104. UNIT_ASSERT(res.PlanJson);
  105. Cerr << *res.PlanJson;
  106. NJson::TJsonValue plan;
  107. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  108. auto read = FindPlanNodeByKv(plan, "Node Type", "Aggregate-Filter-TableFullScan");
  109. UNIT_ASSERT(read.IsDefined());
  110. auto tables = read.GetMapSafe().at("Tables").GetArraySafe();
  111. UNIT_ASSERT(tables[0].GetStringSafe() == "Logs");
  112. auto shuffle = FindPlanNodeByKv(plan, "Node Type", "HashShuffle");
  113. UNIT_ASSERT(shuffle.IsDefined());
  114. auto& columns = shuffle.GetMapSafe().at("KeyColumns").GetArraySafe();
  115. UNIT_ASSERT(!columns.empty() && columns[0] == "App");
  116. auto aggregate = FindPlanNodeByKv(read, "Name", "Aggregate");
  117. UNIT_ASSERT(aggregate.IsDefined());
  118. UNIT_ASSERT(aggregate.GetMapSafe().at("GroupBy").GetStringSafe() == "item.App");
  119. UNIT_ASSERT(aggregate.GetMapSafe().at("Aggregation").GetStringSafe() ==
  120. "{_yql_agg_0: MIN(item.Message),_yql_agg_1: MAX(item.Message)}");
  121. }
  122. Y_UNIT_TEST_TWIN(ComplexJoin, UseSessionActor) {
  123. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  124. CreateSampleTables(kikimr);
  125. auto db = kikimr.GetTableClient();
  126. TStreamExecScanQuerySettings settings;
  127. settings.Explain(true);
  128. auto it = db.StreamExecuteScanQuery(R"(
  129. $join = (
  130. SELECT l.Key as Key, l.Text as Text, l.Data as Data, r.Value1 as Value1, r.Value2 as Value2
  131. FROM `/Root/EightShard` AS l JOIN `/Root/FourShard` AS r ON l.Key = r.Key
  132. );
  133. SELECT Key, COUNT(*) AS Cnt
  134. FROM $join
  135. WHERE Cast(Data As Int64) < (Key - 100) and Value1 != 'Value-101'
  136. GROUP BY Key
  137. UNION ALL
  138. (SELECT Key FROM `/Root/KeyValue` ORDER BY Key LIMIT 1)
  139. )", settings).GetValueSync();
  140. auto res = CollectStreamResult(it);
  141. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  142. UNIT_ASSERT(res.PlanJson);
  143. NJson::TJsonValue plan;
  144. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  145. auto join = FindPlanNodeByKv(
  146. plan,
  147. "Node Type",
  148. "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan"
  149. );
  150. UNIT_ASSERT(join.IsDefined());
  151. auto left = FindPlanNodeByKv(join, "Table", "EightShard");
  152. UNIT_ASSERT(left.IsDefined());
  153. auto right = FindPlanNodeByKv(join, "Table", "FourShard");
  154. UNIT_ASSERT(right.IsDefined());
  155. }
  156. Y_UNIT_TEST_TWIN(PrecomputeRange, UseSessionActor) {
  157. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  158. auto db = kikimr.GetTableClient();
  159. TStreamExecScanQuerySettings settings;
  160. settings.Explain(true);
  161. auto it = db.StreamExecuteScanQuery(R"(
  162. SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 149 + 1 AND 266 ORDER BY Data LIMIT 4;
  163. )", settings).GetValueSync();
  164. auto res = CollectStreamResult(it);
  165. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  166. UNIT_ASSERT(res.PlanJson);
  167. Cerr << *res.PlanJson;
  168. NJson::TJsonValue plan;
  169. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  170. auto node = FindPlanNodeByKv(plan, "Node Type", "TopSort-TableRangesScan");
  171. UNIT_ASSERT(node.IsDefined());
  172. auto operators = node.GetMapSafe().at("Operators").GetArraySafe();
  173. UNIT_ASSERT(operators[1].GetMapSafe().at("Name") == "TableRangesScan");
  174. auto& readRanges = operators[1].GetMapSafe().at("ReadRanges").GetArraySafe();
  175. UNIT_ASSERT(readRanges[0] == "Key [150, 266]");
  176. }
  177. Y_UNIT_TEST_TWIN(CompoundKeyRange, UseSessionActor) {
  178. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  179. auto db = kikimr.GetTableClient();
  180. TStreamExecScanQuerySettings settings;
  181. settings.Explain(true);
  182. auto it = db.StreamExecuteScanQuery(R"(
  183. PRAGMA Kikimr.OptEnablePredicateExtract = "false";
  184. SELECT * FROM `/Root/Logs` WHERE App = "new_app_1" AND Host < "xyz" AND Ts = (42+7) Limit 10;
  185. )", settings).GetValueSync();
  186. auto res = CollectStreamResult(it);
  187. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  188. UNIT_ASSERT(res.PlanJson);
  189. Cerr << *res.PlanJson;
  190. NJson::TJsonValue plan;
  191. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  192. auto read = FindPlanNodeByKv(plan, "Node Type", "Limit-TablePointLookup");
  193. auto& operators = read.GetMapSafe().at("Operators").GetArraySafe();
  194. UNIT_ASSERT(operators.size() == 2);
  195. auto& lookup = operators[1].GetMapSafe();
  196. UNIT_ASSERT(lookup.at("Name") == "TablePointLookup");
  197. UNIT_ASSERT(lookup.at("ReadRange").GetArraySafe()[0] == "App (new_app_1)");
  198. }
  199. Y_UNIT_TEST_TWIN(SortStage, UseSessionActor) {
  200. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  201. auto db = kikimr.GetTableClient();
  202. TStreamExecScanQuerySettings settings;
  203. settings.Explain(true);
  204. auto it = db.StreamExecuteScanQuery(R"(
  205. SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Text;
  206. )", settings).GetValueSync();
  207. auto res = CollectStreamResult(it);
  208. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  209. UNIT_ASSERT(res.PlanJson);
  210. Cerr << *res.PlanJson;
  211. NJson::TJsonValue plan;
  212. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  213. auto scanSort = FindPlanNodeByKv(plan, "Node Type", "Sort-TableRangeScan");
  214. UNIT_ASSERT(scanSort.IsDefined());
  215. }
  216. Y_UNIT_TEST_TWIN(LimitOffset, UseSessionActor) {
  217. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  218. auto db = kikimr.GetTableClient();
  219. TStreamExecScanQuerySettings settings;
  220. settings.Explain(true);
  221. auto it = db.StreamExecuteScanQuery(R"(
  222. SELECT * FROM `/Root/EightShard` ORDER BY Text LIMIT 10 OFFSET 15;
  223. )", settings).GetValueSync();
  224. auto res = CollectStreamResult(it);
  225. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  226. UNIT_ASSERT(res.PlanJson);
  227. Cerr << *res.PlanJson;
  228. NJson::TJsonValue plan;
  229. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  230. auto limit = FindPlanNodeByKv(plan, "Limit", "10");
  231. UNIT_ASSERT(limit.IsDefined());
  232. auto offset = FindPlanNodeByKv(plan, "Offset", "15");
  233. UNIT_ASSERT(offset.IsDefined());
  234. }
  235. Y_UNIT_TEST_TWIN(SelfJoin3xSameLabels, UseSessionActor) {
  236. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  237. auto db = kikimr.GetTableClient();
  238. TStreamExecScanQuerySettings settings;
  239. settings.Explain(true);
  240. auto it = db.StreamExecuteScanQuery(R"(
  241. $foo = (
  242. SELECT t1.Key AS Key
  243. FROM `/Root/KeyValue` AS t1
  244. JOIN `/Root/KeyValue` AS t2
  245. ON t1.Key = t2.Key
  246. GROUP BY t1.Key
  247. );
  248. SELECT t1.Key AS Key
  249. FROM $foo AS Foo
  250. JOIN `/Root/KeyValue` AS t1
  251. ON t1.Key = Foo.Key
  252. ORDER BY Key
  253. )", settings).GetValueSync();
  254. auto res = CollectStreamResult(it);
  255. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  256. UNIT_ASSERT(res.PlanJson);
  257. Cerr << *res.PlanJson;
  258. NJson::TJsonValue plan;
  259. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  260. auto join1 = FindPlanNodeByKv(plan, "Node Type", "Sort-InnerJoin (MapJoin)-Filter-Aggregate");
  261. UNIT_ASSERT(join1.IsDefined());
  262. auto join2 = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter");
  263. UNIT_ASSERT(join2.IsDefined());
  264. }
  265. Y_UNIT_TEST_TWIN(PureExpr, UseSessionActor) {
  266. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  267. auto db = kikimr.GetTableClient();
  268. TStreamExecScanQuerySettings settings;
  269. settings.Explain(true);
  270. auto it = db.StreamExecuteScanQuery(R"(
  271. SELECT 1,2,3 UNION ALL SELECT 4,5,6;
  272. )", settings).GetValueSync();
  273. auto res = CollectStreamResult(it);
  274. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  275. UNIT_ASSERT(res.PlanJson);
  276. Cerr << *res.PlanJson;
  277. NJson::TJsonValue plan;
  278. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  279. auto constExpr = FindPlanNodeByKv(plan, "Node Type", "ConstantExpr");
  280. UNIT_ASSERT(constExpr.IsDefined());
  281. }
  282. Y_UNIT_TEST_TWIN(MultiUsedStage, UseSessionActor) {
  283. NKikimrConfig::TAppConfig appCfg;
  284. auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig();
  285. spilling->SetEnable(true);
  286. spilling->SetRoot("./spilling/");
  287. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, appCfg);
  288. auto db = kikimr.GetTableClient();
  289. TStreamExecScanQuerySettings settings;
  290. settings.Explain(true);
  291. auto it = db.StreamExecuteScanQuery(R"(
  292. select count(*) from `/Root/KeyValue` AS t1 join `/Root/KeyValue` AS t2 on t1.Key = t2.Key;
  293. )", settings).GetValueSync();
  294. auto res = CollectStreamResult(it);
  295. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  296. UNIT_ASSERT(res.PlanJson);
  297. Cerr << *res.PlanJson;
  298. NJson::TJsonValue plan;
  299. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  300. bool containCte = false;
  301. auto& plans = plan.GetMapSafe().at("Plan").GetMapSafe().at("Plans");
  302. for (auto& planNode : plans.GetArraySafe()) {
  303. auto& planMap = planNode.GetMapSafe();
  304. if (planMap.contains("Subplan Name") && planMap["Subplan Name"].GetStringSafe().Contains("CTE")) {
  305. containCte = true;
  306. break;
  307. }
  308. }
  309. UNIT_ASSERT(containCte);
  310. }
  311. Y_UNIT_TEST_TWIN(SqlIn, UseSessionActor) {
  312. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  313. CreateSampleTables(kikimr);
  314. TStreamExecScanQuerySettings settings;
  315. settings.Explain(true);
  316. auto db = kikimr.GetTableClient();
  317. auto query = R"(
  318. PRAGMA Kikimr.OptEnablePredicateExtract = "false";
  319. SELECT Key, Value FROM `/Root/KeyValue` WHERE Key IN (1, 2, 3, 42)
  320. ORDER BY Key
  321. )";
  322. auto it = db.StreamExecuteScanQuery(query, settings).GetValueSync();
  323. auto res = CollectStreamResult(it);
  324. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  325. UNIT_ASSERT(res.PlanJson);
  326. Cerr << *res.PlanJson;
  327. NJson::TJsonValue plan;
  328. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  329. auto unionNode = FindPlanNodeByKv(plan, "Node Type", "Sort-Union");
  330. UNIT_ASSERT_EQUAL(unionNode.GetMap().at("Plans").GetArraySafe().size(), 4);
  331. }
  332. Y_UNIT_TEST_TWIN(ExplainDataQueryOldEngine, UseSessionActor) {
  333. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  334. auto db = kikimr.GetTableClient();
  335. auto session = db.CreateSession().GetValueSync().GetSession();
  336. auto result = session.ExplainDataQuery(R"(
  337. SELECT Key, Value FROM `/Root/KeyValue` WHERE Key IN (1, 2, 3, 42) ORDER BY Key;
  338. )").ExtractValueSync();
  339. result.GetIssues().PrintTo(Cerr);
  340. NJson::TJsonValue plan;
  341. NJson::ReadJsonTree(result.GetPlan(), &plan, true);
  342. UNIT_ASSERT_EQUAL(plan.GetMapSafe().at("tables").GetArraySafe()[0].GetMapSafe().at("name").GetStringSafe(), "/Root/KeyValue");
  343. }
  344. Y_UNIT_TEST_TWIN(ExplainDataQueryNewEngine, UseSessionActor) {
  345. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  346. auto db = kikimr.GetTableClient();
  347. auto session = db.CreateSession().GetValueSync().GetSession();
  348. auto result = session.ExplainDataQuery(R"(
  349. PRAGMA kikimr.UseNewEngine = "true";
  350. SELECT Key, Value FROM `/Root/KeyValue` WHERE Key IN (1, 2, 3, 42) ORDER BY Key;
  351. SELECT Key, Value FROM `/Root/KeyValue` WHERE Key IN (1, 2, 3, 4*2) ORDER BY Key;
  352. SELECT count(distinct Value) FROM `/Root/KeyValue` WHERE Key > 20 and Key <= 120;
  353. SELECT count(distinct Value) FROM `/Root/KeyValue`;
  354. SELECT count(distinct Value) FROM `/Root/KeyValue` WHERE Key >= 10;
  355. )").ExtractValueSync();
  356. result.GetIssues().PrintTo(Cerr);
  357. NJson::TJsonValue plan;
  358. NJson::ReadJsonTree(result.GetPlan(), &plan, true);
  359. auto node = FindPlanNodeByKv(plan, "Name", "TableRangeScan");
  360. UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
  361. node = FindPlanNodeByKv(plan, "Name", "TableFullScan");
  362. UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
  363. node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
  364. UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
  365. }
  366. Y_UNIT_TEST_TWIN(FewEffects, UseSessionActor) {
  367. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  368. auto db = kikimr.GetTableClient();
  369. auto session = db.CreateSession().GetValueSync().GetSession();
  370. auto result = session.ExplainDataQuery(R"(
  371. PRAGMA kikimr.UseNewEngine = "true";
  372. UPDATE `/Root/EightShard` SET Data=Data+1;
  373. UPDATE `/Root/EightShard` SET Data=Data-1 WHERE Key In (100,200,300);
  374. DELETE FROM `/Root/EightShard` WHERE Key > 350;
  375. )").ExtractValueSync();
  376. UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
  377. NJson::TJsonValue plan;
  378. NJson::ReadJsonTree(result.GetPlan(), &plan, true);
  379. // Cerr << plan << Endl;
  380. auto upsertsCount = CountPlanNodesByKv(plan, "Node Type", "Upsert-ConstantExpr");
  381. UNIT_ASSERT_VALUES_EQUAL(upsertsCount, 2);
  382. auto deletesCount = CountPlanNodesByKv(plan, "Node Type", "Delete-ConstantExpr");
  383. UNIT_ASSERT_VALUES_EQUAL(deletesCount, 1);
  384. auto fullScansCount = CountPlanNodesByKv(plan, "Node Type", "TableFullScan");
  385. UNIT_ASSERT_VALUES_EQUAL(fullScansCount, 1);
  386. auto rangeScansCount = CountPlanNodesByKv(plan, "Node Type", "TableRangeScan");
  387. UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1);
  388. auto lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
  389. UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 3);
  390. /* check tables section */
  391. const auto& tableInfo = plan.GetMapSafe().at("tables").GetArraySafe()[0].GetMapSafe();
  392. UNIT_ASSERT_VALUES_EQUAL(tableInfo.at("name"), "/Root/EightShard");
  393. THashMap<TString, int> counter;
  394. auto countOperationsByType = [&tableInfo, &counter](const auto& type) {
  395. for (const auto& op : tableInfo.at(type).GetArraySafe()) {
  396. ++counter[op.GetMapSafe().at("type").GetStringSafe()];
  397. }
  398. };
  399. countOperationsByType("reads");
  400. countOperationsByType("writes");
  401. UNIT_ASSERT_VALUES_EQUAL(counter["MultiUpsert"], upsertsCount);
  402. UNIT_ASSERT_VALUES_EQUAL(counter["MultiErase"], deletesCount);
  403. UNIT_ASSERT_VALUES_EQUAL(counter["FullScan"], fullScansCount);
  404. UNIT_ASSERT_VALUES_EQUAL(counter["Scan"], rangeScansCount);
  405. UNIT_ASSERT_VALUES_EQUAL(counter["Lookup"], lookupsCount);
  406. }
  407. Y_UNIT_TEST_TWIN(ExplainDataQueryWithParams, UseSessionActor) {
  408. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  409. auto db = kikimr.GetTableClient();
  410. auto session = db.CreateSession().GetValueSync().GetSession();
  411. auto result = session.ExplainDataQuery(R"(
  412. PRAGMA Kikimr.UseNewEngine = "false";
  413. DECLARE $value as Utf8;
  414. SELECT $value as value;
  415. )").ExtractValueSync();
  416. UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
  417. auto result2 = session.ExplainDataQuery(R"(
  418. PRAGMA Kikimr.UseNewEngine = "true";
  419. DECLARE $value as Utf8;
  420. SELECT $value as value;
  421. )").ExtractValueSync();
  422. UNIT_ASSERT_VALUES_EQUAL_C(result2.GetStatus(), EStatus::SUCCESS, result2.GetIssues().ToString());
  423. }
  424. Y_UNIT_TEST_TWIN(FullOuterJoin, UseSessionActor) {
  425. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  426. CreateSampleTables(kikimr);
  427. TStreamExecScanQuerySettings settings;
  428. settings.Explain(true);
  429. auto db = kikimr.GetTableClient();
  430. auto it = db.StreamExecuteScanQuery(R"(
  431. SELECT l.Key, l.Text, l.Data, r.Value1, r.Value2
  432. FROM `/Root/EightShard` AS l FULL OUTER JOIN `/Root/FourShard` AS r
  433. ON l.Key = r.Key
  434. )", settings).GetValueSync();
  435. auto res = CollectStreamResult(it);
  436. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  437. UNIT_ASSERT(res.PlanJson);
  438. NJson::TJsonValue plan;
  439. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  440. auto join = FindPlanNodeByKv(plan, "Node Type", "FullJoin (JoinDict)");
  441. UNIT_ASSERT(join.IsDefined());
  442. auto left = FindPlanNodeByKv(join, "Table", "EightShard");
  443. UNIT_ASSERT(left.IsDefined());
  444. auto right = FindPlanNodeByKv(join, "Table", "FourShard");
  445. UNIT_ASSERT(right.IsDefined());
  446. }
  447. Y_UNIT_TEST_TWIN(ReadTableRangesFullScan, UseSessionActor) {
  448. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  449. TStreamExecScanQuerySettings settings;
  450. settings.Explain(true);
  451. auto db = kikimr.GetTableClient();
  452. auto session = db.CreateSession().GetValueSync().GetSession();
  453. auto res = session.ExecuteSchemeQuery(R"(
  454. CREATE TABLE `/Root/TwoKeys` (
  455. Key1 Int32,
  456. Key2 Int32,
  457. Value Int32,
  458. PRIMARY KEY (Key1, Key2)
  459. );
  460. )").GetValueSync();
  461. UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
  462. auto result = session.ExecuteDataQuery(R"(
  463. REPLACE INTO `TwoKeys` (Key1, Key2, Value) VALUES
  464. (1, 1, 1),
  465. (2, 1, 2),
  466. (3, 2, 3),
  467. (4, 2, 4),
  468. (1000, 100, 5),
  469. (1001, 101, 6),
  470. (1002, 102, 7),
  471. (1003, 103, 8);
  472. )", TTxControl::BeginTx().CommitTx()).GetValueSync();
  473. UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
  474. TVector<std::pair<TString, TString>> testData = {
  475. {
  476. "SELECT * FROM `/Root/TwoKeys`;",
  477. "TableFullScan"
  478. },
  479. {
  480. "SELECT * FROM `/Root/TwoKeys` WHERE Key2 > 101;",
  481. "Filter-TableFullScan"
  482. }
  483. };
  484. for (auto& data: testData) {
  485. auto it = db.StreamExecuteScanQuery(data.first, settings).GetValueSync();
  486. auto res = CollectStreamResult(it);
  487. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  488. UNIT_ASSERT(res.PlanJson);
  489. NJson::TJsonValue plan;
  490. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  491. auto read = FindPlanNodeByKv(plan, "Node Type", data.second);
  492. UNIT_ASSERT(read.IsDefined());
  493. auto rangesKeys = FindPlanNodeByKv(plan, "ReadRangesKeys", "[]");
  494. UNIT_ASSERT(!rangesKeys.IsDefined());
  495. auto expected = FindPlanNodeByKv(plan, "ReadRangesExpectedSize", "");
  496. UNIT_ASSERT(!expected.IsDefined());
  497. }
  498. }
  499. Y_UNIT_TEST_TWIN(ReadTableRanges, UseSessionActor) {
  500. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  501. CreateSampleTables(kikimr);
  502. TStreamExecScanQuerySettings settings;
  503. settings.Explain(true);
  504. auto db = kikimr.GetTableClient();
  505. auto it = db.StreamExecuteScanQuery(R"(
  506. SELECT * FROM `/Root/KeyValue`
  507. WHERE Key >= 2000 OR Key < 99 + 1;
  508. )", settings).GetValueSync();
  509. auto res = CollectStreamResult(it);
  510. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  511. UNIT_ASSERT(res.PlanJson);
  512. NJson::TJsonValue plan;
  513. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  514. auto read = FindPlanNodeByKv(plan, "Node Type", "TableRangesScan");
  515. UNIT_ASSERT(read.IsDefined());
  516. auto keys = FindPlanNodeByKv(plan, "ReadRangesKeys", "[\"Key\"]");
  517. UNIT_ASSERT(keys.IsDefined());
  518. auto count = FindPlanNodeByKv(plan, "ReadRangesExpectedSize", "2");
  519. UNIT_ASSERT(count.IsDefined());
  520. }
  521. Y_UNIT_TEST_TWIN(Predicates, UseSessionActor) {
  522. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  523. TStreamExecScanQuerySettings settings;
  524. settings.Explain(true);
  525. auto db = kikimr.GetTableClient();
  526. auto session = db.CreateSession().GetValueSync().GetSession();
  527. auto res = session.ExecuteSchemeQuery(R"(
  528. CREATE TABLE `/Root/TwoKeys` (
  529. Key1 Int32,
  530. Key2 Int32,
  531. Value Int64,
  532. PRIMARY KEY (Key1, Key2)
  533. );
  534. )").GetValueSync();
  535. UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
  536. auto result = session.ExecuteDataQuery(R"(
  537. REPLACE INTO `TwoKeys` (Key1, Key2, Value) VALUES
  538. (1, 1, 1),
  539. (2, 1, 2),
  540. (3, 2, 3),
  541. (4, 2, 4),
  542. (1000, 100, 5),
  543. (1001, 101, 6),
  544. (1002, 102, 7),
  545. (1003, 103, 8);
  546. )", TTxControl::BeginTx().CommitTx()).GetValueSync();
  547. UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
  548. TVector<std::pair<TString, TString>> testData = {
  549. {
  550. "SELECT * FROM `/Root/TwoKeys` WHERE Value > 5 And Value <= 10",
  551. "item.Value > 5 And item.Value <= 10"
  552. },
  553. {
  554. "SELECT * FROM `/Root/TwoKeys` WHERE Key2 < 100 Or Value == 5",
  555. "item.Key2 < 100 Or item.Value == 5"
  556. },
  557. {
  558. "SELECT * FROM `/Root/TwoKeys` WHERE Key2 < 100 And Key2 >= 10 And Value != 5",
  559. "item.Key2 < 100 And item.Key2 >= 10 And item.Value != 5"
  560. },
  561. {
  562. "SELECT * FROM `/Root/TwoKeys` WHERE Key2 < 10 Or Cast(Key2 As Int64) < Value",
  563. "item.Key2 < 10 Or ..."
  564. }
  565. };
  566. for (const auto& [query, predicate] : testData) {
  567. auto it = db.StreamExecuteScanQuery(query, settings).GetValueSync();
  568. auto res = CollectStreamResult(it);
  569. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  570. UNIT_ASSERT(res.PlanJson);
  571. NJson::TJsonValue plan;
  572. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  573. auto filter = FindPlanNodeByKv(plan, "Name", "Filter");
  574. UNIT_ASSERT(filter.IsDefined());
  575. UNIT_ASSERT_C(filter.GetMapSafe().at("Predicate") == predicate,
  576. TStringBuilder() << "For query: " << query
  577. << " expected predicate: " << predicate
  578. << " but received: " << filter.GetMapSafe().at("Predicate"));
  579. }
  580. }
  581. Y_UNIT_TEST_TWIN(MergeConnection, UseSessionActor) {
  582. auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
  583. TStreamExecScanQuerySettings settings;
  584. settings.Explain(true);
  585. auto db = kikimr.GetTableClient();
  586. auto it = db.StreamExecuteScanQuery(R"(
  587. SELECT * FROM `/Root/KeyValue` ORDER BY Key;
  588. )", settings).GetValueSync();
  589. auto res = CollectStreamResult(it);
  590. UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
  591. UNIT_ASSERT(res.PlanJson);
  592. Cerr << res.PlanJson << Endl;
  593. NJson::TJsonValue plan;
  594. NJson::ReadJsonTree(*res.PlanJson, &plan, true);
  595. auto merge = FindPlanNodeByKv(
  596. plan,
  597. "Node Type",
  598. "Merge"
  599. );
  600. UNIT_ASSERT(merge.IsDefined());
  601. const auto& sortColumns = merge.GetMapSafe().at("SortColumns").GetArraySafe();
  602. UNIT_ASSERT(sortColumns.size() == 1);
  603. UNIT_ASSERT(sortColumns.at(0) == "Key (Asc)");
  604. }
  605. }
  606. } // namespace NKqp
  607. } // namespace NKikimr