yql_execution_ut.cpp 32 KB


  1. #include "yql_execution.h"
  2. #include <yql/essentials/core/ut_common/yql_ut_common.h>
  3. #include <yql/essentials/ast/yql_ast_annotation.h>
  4. #include <yql/essentials/ast/yql_expr.h>
  5. #include <yql/essentials/core/type_ann/type_ann_core.h>
  6. #include <yql/essentials/core/yql_expr_optimize.h>
  7. #include <yql/essentials/core/yql_expr_type_annotation.h>
  8. #include <yql/essentials/core/yql_opt_proposed_by_data.h>
  9. #include <yql/essentials/core/yql_opt_rewrite_io.h>
  10. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  11. #include <yql/essentials/providers/common/schema/parser/yql_type_parser.h>
  12. #include <yql/essentials/providers/result/provider/yql_result_provider.h>
  13. #include <yql/essentials/core/facade/yql_facade.h>
  14. #include <contrib/ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
  15. #include <contrib/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h>
  16. #include <contrib/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h>
  17. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  18. #include <library/cpp/testing/unittest/registar.h>
  19. #include <util/system/user.h>
  20. #include <util/system/tempfile.h>
  21. #include <util/system/defaults.h>
  22. #include <util/system/fstat.h>
  23. #include <util/folder/path.h>
  24. #include <util/folder/tempdir.h>
  25. #include <util/string/cast.h>
  26. #include <util/string/builder.h>
  27. #include <util/system/sanitizers.h>
  28. namespace NYql {
  29. static TString BuildFileNameForTmpTable(TStringBuf table, TStringBuf tmpDir) {
  30. return TStringBuilder() << tmpDir << LOCSLASH_C << table.substr(4) << ".tmp";
  31. }
  32. struct TRunSingleProgram {
  33. TString Src;
  34. TString TmpDir;
  35. TString Parameters;
  36. IOutputStream& Err;
  37. TVector<TString> Res;
  38. THashMap<TString, TString> Tables;
  39. TRunSingleProgram(const TString& src, IOutputStream& err)
  40. : Src(src)
  41. , Err(err)
  42. {
  43. }
  44. bool Run(
  45. const NKikimr::NMiniKQL::IFunctionRegistry* funcReg
  46. ) {
  47. auto yqlNativeServices = NFile::TYtFileServices::Make(funcReg, Tables, {}, TmpDir);
  48. auto ytGateway = CreateYtFileGateway(yqlNativeServices);
  49. TVector<TDataProviderInitializer> dataProvidersInit;
  50. dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway));
  51. TProgramFactory factory(true, funcReg, 0ULL, dataProvidersInit, "ut");
  52. TProgramPtr program = factory.Create("-stdin-", Src);
  53. program->ConfigureYsonResultFormat(NYson::EYsonFormat::Text);
  54. if (!Parameters.empty()) {
  55. program->SetParametersYson(Parameters);
  56. }
  57. if (!program->ParseYql() || !program->Compile(GetUsername())) {
  58. program->PrintErrorsTo(Err);
  59. return false;
  60. }
  61. TProgram::TStatus status = program->Run(GetUsername());
  62. if (status == TProgram::TStatus::Error) {
  63. program->PrintErrorsTo(Err);
  64. }
  65. Res = program->Results();
  66. return status == TProgram::TStatus::Ok;
  67. }
  68. void AddResults(TVector<TString>& res) const {
  69. res.insert(res.end(), Res.begin(), Res.end());
  70. }
  71. bool Finished() const {
  72. return true;
  73. }
  74. };
  75. struct TRunMultiplePrograms: public TRunSingleProgram {
  76. TVector<TString> Srcs;
  77. size_t Curr;
  78. TRunMultiplePrograms(const TVector<TString>& srcs, IOutputStream& err)
  79. : TRunSingleProgram(TString(), err)
  80. , Srcs(srcs)
  81. , Curr(0)
  82. {
  83. }
  84. bool Run(
  85. const NKikimr::NMiniKQL::IFunctionRegistry* funcReg
  86. ) {
  87. TString origTmpDir = TmpDir;
  88. if (TmpDir) {
  89. TFsPath newTmp = TFsPath(TmpDir) / ToString(Curr);
  90. newTmp.MkDirs();
  91. TmpDir = newTmp.GetPath();
  92. }
  93. Src = Srcs[Curr];
  94. if (!TRunSingleProgram::Run(funcReg)) {
  95. return false;
  96. }
  97. ui32 idx = 0;
  98. for (auto& resStr: Res) {
  99. NYT::TNode res;
  100. if (!NCommon::ParseYson(res, resStr, Err)) {
  101. return false;
  102. }
  103. if (!res.IsMap() || !res.HasKey("Write") || !res["Write"].IsList()) {
  104. Err << "Invalid result: " << resStr << Endl;
  105. return false;
  106. }
  107. for (auto& elem: res["Write"].AsList()) {
  108. if (!elem.IsMap()) {
  109. Err << "Invalid result element in result: " << resStr << Endl;
  110. return false;
  111. }
  112. if (elem.HasKey("Ref")) {
  113. if (!elem["Ref"].IsList()) {
  114. Err << "Invalid reference in result: " << resStr << Endl;
  115. return false;
  116. }
  117. for (auto& refElem: elem["Ref"].AsList()) {
  118. if (!refElem.IsMap() || !refElem.HasKey("Reference")) {
  119. Err << "Invalid reference in result: " << resStr << Endl;
  120. return false;
  121. }
  122. if (!refElem["Remove"].AsBool()) {
  123. continue;
  124. }
  125. const auto& ref = refElem["Reference"].AsList();
  126. TStringStream name;
  127. name << ref[0].AsString() << "." << ref[1].AsString() << ".Result" << Curr << "_" << idx;
  128. Tables[name.Str()] = BuildFileNameForTmpTable(ref[2].AsString(), TmpDir);
  129. ++idx;
  130. }
  131. }
  132. }
  133. }
  134. ++Curr;
  135. origTmpDir.swap(TmpDir);
  136. return true;
  137. }
  138. bool Finished() const {
  139. return Curr == Srcs.size();
  140. }
  141. };
  142. template <typename TDriver>
  143. TVector<TString> Run(TDriver& driver) {
  144. auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry());
  145. TVector<TString> res;
  146. do {
  147. const bool runRes = driver.Run(functionRegistry.Get());
  148. UNIT_ASSERT(runRes);
  149. driver.AddResults(res);
  150. } while (!driver.Finished());
  151. return res;
  152. }
  153. TVector<TString> RunProgram(const TString& programSrc, const THashMap<TString, TString>& tables, const TString& tmpDir = TString(), const TString& params = TString()) {
  154. TRunSingleProgram driver(programSrc, Cerr);
  155. driver.Tables = tables;
  156. driver.TmpDir = tmpDir;
  157. driver.Parameters = params;
  158. return Run(driver);
  159. }
  160. static const TStringBuf KSV_ATTRS =
  161. "{\"_yql_row_spec\" = {\"Type\" = [\"StructType\";["
  162. "[\"key\";[\"DataType\";\"String\"]];"
  163. "[\"subkey\";[\"DataType\";\"String\"]];"
  164. "[\"value\";[\"DataType\";\"String\"]]"
  165. "]]}}"
  166. ;
  167. Y_UNIT_TEST_SUITE(ExecutionYqlExpr) {
  168. Y_UNIT_TEST(WriteToResultUsingIsolatedGraph) {
  169. auto s = "(\n"
  170. "(let res_sink (DataSink 'result))\n"
  171. "(let data (AsList (String 'x)))\n"
  172. "(let world (Write! world res_sink (Key) data '()))\n"
  173. "(let world (Commit! world res_sink))\n"
  174. "(return world)\n"
  175. ")\n";
  176. auto res = RunProgram(s, THashMap<TString, TString>());
  177. UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
  178. UNIT_ASSERT_NO_DIFF("{\"Write\"=[{\"Data\"=[\"x\"]}]}", res[0]);
  179. }
  180. Y_UNIT_TEST(WriteToResultTableOutput) {
  181. TTempFileHandle inputFile;
  182. TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
  183. TStringBuf data =
  184. "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
  185. "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
  186. "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
  187. "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
  188. ;
  189. inputFile.Write(data.data(), data.size());
  190. inputFile.FlushData();
  191. inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
  192. inputFileAttrs.FlushData();
  193. THashMap<TString, TString> tables;
  194. tables["yt.plato.Input"] = inputFile.Name();
  195. auto s = "(\n"
  196. "(let mr_source (DataSource 'yt 'plato))\n"
  197. "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
  198. "(let world (Left! x))\n"
  199. "(let table1 (Right! x))\n"
  200. "(let res_sink (DataSink 'result))\n"
  201. "(let data (AsList (String 'x)))\n"
  202. "(let world (Write! world res_sink (Key) table1 '()))\n"
  203. "(let world (Commit! world res_sink))\n"
  204. "(return world)\n"
  205. ")\n";
  206. auto res = RunProgram(s, tables);
  207. UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
  208. UNIT_ASSERT_NO_DIFF(
  209. "{\"Write\"=[{\"Data\"=["
  210. "[\"075\";\".\";\"abc\"];"
  211. "[\"800\";\".\";\"ddd\"];"
  212. "[\"020\";\".\";\"q\"];"
  213. "[\"150\";\".\";\"qzz\"]"
  214. "]}]}",
  215. res[0]
  216. );
  217. }
  218. Y_UNIT_TEST(WriteToResultTransformedTable) {
  219. TTempFileHandle inputFile;
  220. TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
  221. TStringBuf data =
  222. "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
  223. "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
  224. "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
  225. "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
  226. ;
  227. inputFile.Write(data.data(), data.size());
  228. inputFile.FlushData();
  229. inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
  230. inputFileAttrs.FlushData();
  231. THashMap<TString, TString> tables;
  232. tables["yt.plato.Input"] = inputFile.Name();
  233. auto s = "(\n"
  234. "(let mr_source (DataSource 'yt 'plato))\n"
  235. "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
  236. "(let world (Left! x))\n"
  237. "(let table1 (Right! x))\n"
  238. "(let table1low (FlatMap table1 (lambda '(item) (block '(\n"
  239. " (let intValueOpt (FromString (Member item 'key) 'Int32))\n"
  240. " (let ret (FlatMap intValueOpt (lambda '(item2) (block '(\n"
  241. " (return (ListIf (< item2 (Int32 '100)) item))\n"
  242. " )))))"
  243. " (return ret)"
  244. ")))))"
  245. "(let res_sink (DataSink 'result))\n"
  246. "(let data (AsList (String 'x)))\n"
  247. "(let world (Write! world res_sink (Key) table1low '()))\n"
  248. "(let world (Commit! world res_sink))\n"
  249. "(return world)\n"
  250. ")\n";
  251. auto res = RunProgram(s, tables);
  252. UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
  253. UNIT_ASSERT_NO_DIFF(
  254. "{\"Write\"=[{\"Data\"=["
  255. "[\"075\";\".\";\"abc\"];"
  256. "[\"020\";\".\";\"q\"]"
  257. "]}]}",
  258. res[0]
  259. );
  260. }
  261. Y_UNIT_TEST(DropTable) {
  262. TTempFileHandle outputFile;
  263. TTempFileHandle outputFileAttrs(outputFile.Name() + ".attr");
  264. TStringBuf data =
  265. "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"sv
  266. ;
  267. outputFile.Write(data.data(), data.size());
  268. outputFile.FlushData();
  269. outputFile.Close();
  270. outputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
  271. outputFileAttrs.FlushData();
  272. outputFileAttrs.Close();
  273. UNIT_ASSERT(TFileStat(outputFile.Name()).IsFile());
  274. THashMap<TString, TString> tables;
  275. tables["yt.plato.Output"] = outputFile.Name();
  276. auto s = "(\n"
  277. "(let mr_sink (DataSink 'yt 'plato))\n"
  278. "(let world (Write! world mr_sink (Key '('table (String 'Output))) (Void) '('('mode 'drop))))\n"
  279. "(let world (Commit! world mr_sink))\n"
  280. "(return world)\n"
  281. ")\n";
  282. RunProgram(s, tables);
  283. UNIT_ASSERT(!TFileStat(outputFile.Name()).IsFile());
  284. }
  285. Y_UNIT_TEST(WriteToResultTableByRef) {
  286. TTempFileHandle inputFile;
  287. TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
  288. TStringBuf data =
  289. "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
  290. "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
  291. "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
  292. "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
  293. ;
  294. inputFile.Write(data.data(), data.size());
  295. inputFile.FlushData();
  296. inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
  297. inputFileAttrs.FlushData();
  298. THashMap<TString, TString> tables;
  299. tables["yt.plato.Input"] = inputFile.Name();
  300. auto s = "(\n"
  301. "(let mr_source (DataSource 'yt 'plato))\n"
  302. "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
  303. "(let world (Left! x))\n"
  304. "(let table1 (Right! x))\n"
  305. "(let res_sink (DataSink 'result))\n"
  306. "(let mr_sink (DataSink 'yt 'plato))\n"
  307. "(let world (Write! world res_sink (Key) table1 '('('ref))))\n"
  308. "(let world (Commit! world mr_sink))\n"
  309. "(let world (Commit! world res_sink))\n"
  310. "(return world)\n"
  311. ")\n";
  312. auto res = RunProgram(s, tables);
  313. UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
  314. UNIT_ASSERT_NO_DIFF(
  315. "{\"Write\"=[{\"Ref\"=["
  316. "{\"Reference\"=[\"yt\";\"plato\";\"Input\"];\"Columns\"=[\"key\";\"subkey\";\"value\"];\"Remove\"=%false}"
  317. "]}]}",
  318. res[0]
  319. );
  320. }
  321. Y_UNIT_TEST(WriteToResultTransformedTableByRef) {
  322. TTempFileHandle inputFile;
  323. TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
  324. TTempDir tmpDir;
  325. TStringBuf data =
  326. "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
  327. "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
  328. "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
  329. "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
  330. ;
  331. inputFile.Write(data.data(), data.size());
  332. inputFile.FlushData();
  333. inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
  334. inputFileAttrs.FlushData();
  335. TVector<TString> progs;
  336. progs.push_back(
  337. "(\n"
  338. "(let mr_source (DataSource 'yt 'plato))\n"
  339. "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
  340. "(let world (Left! x))\n"
  341. "(let table1 (Right! x))\n"
  342. "(let table1low (FlatMap table1 (lambda '(item) (block '(\n"
  343. " (let intValueOpt (FromString (Member item 'key) 'Int32))\n"
  344. " (let ret (FlatMap intValueOpt (lambda '(item2) (block '(\n"
  345. " (return (ListIf (< item2 (Int32 '100)) item))\n"
  346. " )))))"
  347. " (return ret)"
  348. ")))))"
  349. "(let res_sink (DataSink 'result))\n"
  350. "(let mr_sink (DataSink 'yt 'plato))\n"
  351. "(let world (Write! world res_sink (Key) table1low '('('ref))))\n"
  352. "(let world (Commit! world mr_sink))\n"
  353. "(let world (Commit! world res_sink))\n"
  354. "(return world)\n"
  355. ")\n"
  356. );
  357. progs.push_back(
  358. "(\n"
  359. "(let mr_source (DataSource 'yt 'plato))\n"
  360. "(let x (Read! world mr_source (Key '('table (String 'Result0_0))) '('key 'subkey 'value) '()))\n"
  361. "(let world (Left! x))\n"
  362. "(let table1 (Right! x))\n"
  363. "(let res_sink (DataSink 'result))\n"
  364. "(let data (AsList (String 'x)))\n"
  365. "(let world (Write! world res_sink (Key) table1 '()))\n"
  366. "(let world (Commit! world res_sink))\n"
  367. "(return world)\n"
  368. ")\n"
  369. );
  370. TRunMultiplePrograms driver(progs, Cerr);
  371. driver.Tables["yt.plato.Input"] = inputFile.Name();
  372. driver.TmpDir = tmpDir.Name();
  373. auto res = Run(driver);
  374. UNIT_ASSERT_VALUES_EQUAL(res.size(), 2);
  375. UNIT_ASSERT_NO_DIFF(
  376. "{\"Write\"=[{\"Ref\"=["
  377. "{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"key\";\"subkey\";\"value\"];\"Remove\"=%true}"
  378. "]}]}",
  379. res[0]
  380. );
  381. UNIT_ASSERT_NO_DIFF(
  382. "{\"Write\"=[{\"Data\"=["
  383. "[\"075\";\".\";\"abc\"];"
  384. "[\"020\";\".\";\"q\"]"
  385. "]}]}",
  386. res[1]
  387. );
  388. }
  389. Y_UNIT_TEST(WriteAndTakeResult) {
  390. TTempFileHandle inputFile;
  391. TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
  392. TTempDir tmpDir;
  393. TStringBuf data =
  394. "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
  395. "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
  396. "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
  397. "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
  398. ;
  399. inputFile.Write(data.data(), data.size());
  400. inputFile.FlushData();
  401. inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
  402. inputFileAttrs.FlushData();
  403. TVector<TString> progs;
  404. progs.push_back(
  405. "(\n"
  406. "(let mr_source (DataSource 'yt 'plato))\n"
  407. "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
  408. "(let world (Left! x))\n"
  409. "(let table (Right! x))\n"
  410. "(let result (Map table (lambda '(item) (block '("
  411. " (let res (Struct))"
  412. " (let res (AddMember res 'k (Member item 'key)))"
  413. " (let res (AddMember res 's (Member item 'subkey)))"
  414. " (let res (AddMember res 'v (Member item 'value)))"
  415. " (return res)"
  416. ")))))"
  417. "(let res_sink (DataSink 'result))\n"
  418. "(let mr_sink (DataSink 'yt 'plato))\n"
  419. "(let world (Write! world res_sink (Key) result '('('ref))))\n"
  420. "(let world (Commit! world mr_sink))\n"
  421. "(let world (Commit! world res_sink))\n"
  422. "(return world)\n"
  423. ")\n"
  424. );
  425. progs.push_back(
  426. "(\n"
  427. "(let mr_source (DataSource 'yt 'plato))\n"
  428. "(let x (Read! world mr_source (Key '('table (String 'Result0_0))) '('k 's 'v) '()))\n"
  429. "(let world (Left! x))\n"
  430. "(let table (Right! x))\n"
  431. "(let result (Take table (Uint64 '2)))"
  432. "(let res_sink (DataSink 'result))\n"
  433. "(let mr_sink (DataSink 'yt 'plato))\n"
  434. "(let world (Write! world res_sink (Key) result '('('type))))\n"
  435. "(let world (Commit! world mr_sink))\n"
  436. "(let world (Commit! world res_sink))\n"
  437. "(return world)\n"
  438. ")\n"
  439. );
  440. TRunMultiplePrograms driver(progs, Cerr);
  441. driver.Tables["yt.plato.Input"] = inputFile.Name();
  442. driver.TmpDir = tmpDir.Name();
  443. auto res = Run(driver);
  444. UNIT_ASSERT_VALUES_EQUAL(res.size(), 2);
  445. //~ Cerr << res[0] << Endl;
  446. //~ Cerr << res[1] << Endl;
  447. UNIT_ASSERT_NO_DIFF(
  448. "{\"Write\"=[{\"Ref\"=["
  449. "{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"k\";\"s\";\"v\"];\"Remove\"=%true}"
  450. "]}]}",
  451. res[0]
  452. );
  453. UNIT_ASSERT_NO_DIFF(
  454. "{\"Write\"=[{"
  455. "\"Type\"=[\"ListType\";[\"StructType\";["
  456. "[\"k\";[\"DataType\";\"String\"]];[\"s\";[\"DataType\";\"String\"]];[\"v\";[\"DataType\";\"String\"]]"
  457. "]]];"
  458. "\"Data\"=["
  459. "[\"075\";\".\";\"abc\"];"
  460. "[\"800\";\".\";\"ddd\"]"
  461. "]}"
  462. "]}",
  463. res[1]
  464. );
  465. }
  466. Y_UNIT_TEST(WriteAndReadScheme) {
  467. TTempFileHandle inputFile;
  468. TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
  469. TTempDir tmpDir;
  470. TStringBuf data =
  471. "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
  472. "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
  473. "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
  474. "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
  475. ;
  476. inputFile.Write(data.data(), data.size());
  477. inputFile.FlushData();
  478. inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
  479. inputFileAttrs.FlushData();
  480. TVector<TString> progs;
  481. progs.push_back(
  482. "(\n"
  483. "(let mr_source (DataSource 'yt 'plato))\n"
  484. "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
  485. "(let world (Left! x))\n"
  486. "(let table (Right! x))\n"
  487. "(let result0 (Map table (lambda '(item) (block '("
  488. " (return (AsStruct '('bar (Coalesce (FromString (Member item 'key) 'Uint64) (Uint64 '0)))))"
  489. ")))))"
  490. "(let result1 (Map result0 (lambda '(item) (block '("
  491. " (return (AddMember (Struct) 'foo item))"
  492. ")))))"
  493. "(let res_sink (DataSink 'result))\n"
  494. "(let mr_sink (DataSink 'yt 'plato))\n"
  495. "(let world (Write! world res_sink (Key) result0 '('('ref))))\n"
  496. "(let world (Write! world res_sink (Key) result1 '('('ref))))\n"
  497. "(let world (Commit! world mr_sink))\n"
  498. "(let world (Commit! world res_sink))\n"
  499. "(return world)\n"
  500. ")\n"
  501. );
  502. progs.push_back(
  503. "(\n"
  504. "(let mr_source (DataSource 'yt 'plato))\n"
  505. "(let x (Read! world mr_source (Key '('tablescheme (String 'Result0_0))) (Void) '()))\n"
  506. "(let world (Left! x))\n"
  507. "(let scheme (Right! x))\n"
  508. "(let res_sink (DataSink 'result))\n"
  509. "(let mr_sink (DataSink 'yt 'plato))\n"
  510. "(let world (Write! world res_sink (Key) scheme '('('type))))\n"
  511. "(let world (Commit! world mr_sink))\n"
  512. "(let world (Commit! world res_sink))\n"
  513. "(return world)\n"
  514. ")\n"
  515. );
  516. progs.push_back(
  517. "(\n"
  518. "(let mr_source (DataSource 'yt 'plato))\n"
  519. "(let x (Read! world mr_source (Key '('tablescheme (String 'Result0_1))) (Void) '()))\n"
  520. "(let world (Left! x))\n"
  521. "(let scheme (Right! x))\n"
  522. "(let res_sink (DataSink 'result))\n"
  523. "(let mr_sink (DataSink 'yt 'plato))\n"
  524. "(let world (Write! world res_sink (Key) scheme '('('type))))\n"
  525. "(let world (Commit! world mr_sink))\n"
  526. "(let world (Commit! world res_sink))\n"
  527. "(return world)\n"
  528. ")\n"
  529. );
  530. TRunMultiplePrograms driver(progs, Cerr);
  531. driver.Tables["yt.plato.Input"] = inputFile.Name();
  532. driver.TmpDir = tmpDir.Name();
  533. auto res = Run(driver);
  534. UNIT_ASSERT_VALUES_EQUAL(res.size(), 3);
  535. //~ Cerr << res[0] << Endl;
  536. //~ Cerr << res[1] << Endl;
  537. //~ Cerr << res[2] << Endl;
  538. UNIT_ASSERT_NO_DIFF(
  539. "{\"Write\"=["
  540. "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"bar\"];\"Remove\"=%true}]};"
  541. "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/7ae6459a-7382d1e7-7935c08e-2\"];\"Columns\"=[\"foo\"];\"Remove\"=%true}]}"
  542. "]}",
  543. res[0]
  544. );
  545. UNIT_ASSERT(res[1].find("\"Fields\"=[{\"Name\"=\"bar\"") != TString::npos);
  546. UNIT_ASSERT(res[2].find("\"Fields\"=[{\"Name\"=\"foo\"") != TString::npos);
  547. }
  548. Y_UNIT_TEST(ExtendSortedWithNonSortedAndRead) {
  549. TTempFileHandle inputFile;
  550. TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
  551. TTempFileHandle outputFile;
  552. TTempFile outputFileAttr(outputFile.Name() + ".attr");
  553. TTempDir tmpDir;
  554. TStringBuf data =
  555. "{\"key\"=\"foo\";\"subkey\"=\"wat\";\"value\"=\"222\"};\n"
  556. "{\"key\"=\"bar\";\"subkey\"=\"wat\";\"value\"=\"111\"};\n"
  557. "{\"key\"=\"jar\";\"subkey\"=\"wat\";\"value\"=\"333\"};\n"sv
  558. ;
  559. inputFile.Write(data.data(), data.size());
  560. inputFile.FlushData();
  561. inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
  562. inputFileAttrs.FlushData();
  563. TVector<TString> progs;
  564. progs.push_back(
  565. "(\n"
  566. "(let source (DataSource 'yt 'plato))\n"
  567. "(let x (Read! world source (Key '('table (String 'Input))) '('key 'value) '()))\n"
  568. "(let world (Left! x))\n"
  569. "(let table (Right! x))\n"
  570. "(let sorted (Sort table (Bool 'true) (lambda '(item) (Member item 'value))))\n"
  571. "(let result (Extend table sorted))\n"
  572. "(let sink (DataSink 'yt 'plato))\n"
  573. "(let world (Write! world sink (Key '('table (String 'Output))) result '()))\n"
  574. "(let world (Commit! world sink))\n"
  575. "(return world)\n"
  576. ")\n"
  577. );
  578. progs.push_back(
  579. "(\n"
  580. "(let mr_source (DataSource 'yt 'plato))\n"
  581. "(let x (Read! world mr_source (Key '('table (String 'Output))) '('key 'value) '()))\n"
  582. "(let world (Left! x))\n"
  583. "(let result (Right! x))\n"
  584. "(let res_sink (DataSink 'result))\n"
  585. "(let mr_sink (DataSink 'yt 'plato))\n"
  586. "(let world (Write! world res_sink (Key) result '('('type))))\n"
  587. "(let world (Commit! world mr_sink))\n"
  588. "(let world (Commit! world res_sink))\n"
  589. "(return world)\n"
  590. ")\n"
  591. );
  592. TRunMultiplePrograms driver(progs, Cerr);
  593. driver.TmpDir = tmpDir.Name();
  594. driver.Tables["yt.plato.Input"] = inputFile.Name();
  595. driver.Tables["yt.plato.Output"] = outputFile.Name();
  596. auto res = Run(driver);
  597. UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
  598. //~ Cerr << res[0] << Endl;
  599. UNIT_ASSERT_NO_DIFF(
  600. "{\"Write\"=[{"
  601. "\"Type\"=[\"ListType\";[\"StructType\";[[\"key\";[\"DataType\";\"String\"]];[\"value\";[\"DataType\";\"String\"]]]]];"
  602. "\"Data\"=["
  603. "[\"foo\";\"222\"];[\"bar\";\"111\"];[\"jar\";\"333\"];[\"foo\";\"222\"];[\"bar\";\"111\"];[\"jar\";\"333\"]"
  604. "]}]}",
  605. res[0]
  606. );
  607. }
  608. Y_UNIT_TEST(OrderedExtendSortedWithNonSortedAndRead) {
  609. TTempFileHandle inputFile;
  610. TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
  611. TTempFileHandle outputFile;
  612. TTempFile outputFileAttr(outputFile.Name() + ".attr");
  613. TTempDir tmpDir;
  614. TStringBuf data =
  615. "{\"key\"=\"foo\";\"subkey\"=\"wat\";\"value\"=\"222\"};\n"
  616. "{\"key\"=\"bar\";\"subkey\"=\"wat\";\"value\"=\"111\"};\n"
  617. "{\"key\"=\"jar\";\"subkey\"=\"wat\";\"value\"=\"333\"};\n"sv
  618. ;
  619. inputFile.Write(data.data(), data.size());
  620. inputFile.FlushData();
  621. inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
  622. inputFileAttrs.FlushData();
  623. TVector<TString> progs;
  624. progs.push_back(
  625. "(\n"
  626. "(let source (DataSource 'yt 'plato))\n"
  627. "(let x (Read! world source (Key '('table (String 'Input))) '('key 'value) '()))\n"
  628. "(let world (Left! x))\n"
  629. "(let table (Right! x))\n"
  630. "(let sorted (Sort table (Bool 'true) (lambda '(item) (Member item 'value))))\n"
  631. "(let result (OrderedExtend table sorted))\n"
  632. "(let sink (DataSink 'yt 'plato))\n"
  633. "(let world (Write! world sink (Key '('table (String 'Output))) result '()))\n"
  634. "(let world (Commit! world sink))\n"
  635. "(return world)\n"
  636. ")\n"
  637. );
  638. progs.push_back(
  639. "(\n"
  640. "(let mr_source (DataSource 'yt 'plato))\n"
  641. "(let x (Read! world mr_source (Key '('table (String 'Output))) '('key 'value) '()))\n"
  642. "(let world (Left! x))\n"
  643. "(let result (Right! x))\n"
  644. "(let res_sink (DataSink 'result))\n"
  645. "(let mr_sink (DataSink 'yt 'plato))\n"
  646. "(let world (Write! world res_sink (Key) result '('('type))))\n"
  647. "(let world (Commit! world mr_sink))\n"
  648. "(let world (Commit! world res_sink))\n"
  649. "(return world)\n"
  650. ")\n"
  651. );
  652. TRunMultiplePrograms driver(progs, Cerr);
  653. driver.TmpDir = tmpDir.Name();
  654. driver.Tables["yt.plato.Input"] = inputFile.Name();
  655. driver.Tables["yt.plato.Output"] = outputFile.Name();
  656. auto res = Run(driver);
  657. UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
  658. //~ Cerr << res[0] << Endl;
  659. UNIT_ASSERT_NO_DIFF(
  660. "{\"Write\"=[{"
  661. "\"Type\"=[\"ListType\";[\"StructType\";[[\"key\";[\"DataType\";\"String\"]];[\"value\";[\"DataType\";\"String\"]]]]];"
  662. "\"Data\"=["
  663. "[\"foo\";\"222\"];[\"bar\";\"111\"];[\"jar\";\"333\"];[\"bar\";\"111\"];[\"foo\";\"222\"];[\"jar\";\"333\"]"
  664. "]}]}",
  665. res[0]
  666. );
  667. }
  668. Y_UNIT_TEST(TestParametersEvaluation) {
  669. auto s = "(\n"
  670. "(let res_sink (DataSink 'result))\n"
  671. "(let data (Parameter '\"$foo\" (ParseType '\"Tuple<String, Int32 ? , List<Uint32>, Dict<Int32, Bool>, Struct<a : Void, b : Double>, Variant<Int32, Bool>>\")))\n"
  672. "(let world (Write! world res_sink (Key) data '('('type))))\n"
  673. "(let world (Commit! world res_sink))\n"
  674. "(return world)\n"
  675. ")\n";
  676. auto params = R"__(
  677. {"$foo"={Data=[
  678. bar;
  679. "33";
  680. ["1";"2";"3"];
  681. [["7";%true];["12";%false]];
  682. [#;"-1.7"];
  683. ["0";"8"];
  684. ]}}
  685. )__";
  686. auto res = RunProgram(s, THashMap<TString, TString>(), "", params);
  687. UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
  688. UNIT_ASSERT_NO_DIFF(R"__({"Write"=[{"Type"=["TupleType";[["DataType";"String"];["OptionalType";["DataType";"Int32"]];["ListType";["DataType";"Uint32"]];["DictType";["DataType";"Int32"];["DataType";"Bool"]];["StructType";[["a";["VoidType"]];["b";["DataType";"Double"]]]];["VariantType";["TupleType";[["DataType";"Int32"];["DataType";"Bool"]]]]]];"Data"=["bar";["33"];["1";"2";"3"];[["7";%true];["12";%false]];["Void";"-1.7"];["0";"8"]]}]})__", res[0]);
  689. }
  690. }
  691. } // namespace NYql