123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792 |
- #include "yql_execution.h"
- #include <yql/essentials/core/ut_common/yql_ut_common.h>
- #include <yql/essentials/ast/yql_ast_annotation.h>
- #include <yql/essentials/ast/yql_expr.h>
- #include <yql/essentials/core/type_ann/type_ann_core.h>
- #include <yql/essentials/core/yql_expr_optimize.h>
- #include <yql/essentials/core/yql_expr_type_annotation.h>
- #include <yql/essentials/core/yql_opt_proposed_by_data.h>
- #include <yql/essentials/core/yql_opt_rewrite_io.h>
- #include <yql/essentials/providers/common/provider/yql_provider_names.h>
- #include <yql/essentials/providers/common/schema/parser/yql_type_parser.h>
- #include <yql/essentials/providers/result/provider/yql_result_provider.h>
- #include <yql/essentials/core/facade/yql_facade.h>
- #include <contrib/ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
- #include <contrib/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h>
- #include <contrib/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h>
- #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/system/user.h>
- #include <util/system/tempfile.h>
- #include <util/system/defaults.h>
- #include <util/system/fstat.h>
- #include <util/folder/path.h>
- #include <util/folder/tempdir.h>
- #include <util/string/cast.h>
- #include <util/string/builder.h>
- #include <util/system/sanitizers.h>
- namespace NYql {
- static TString BuildFileNameForTmpTable(TStringBuf table, TStringBuf tmpDir) {
- return TStringBuilder() << tmpDir << LOCSLASH_C << table.substr(4) << ".tmp";
- }
- struct TRunSingleProgram {
- TString Src;
- TString TmpDir;
- TString Parameters;
- IOutputStream& Err;
- TVector<TString> Res;
- THashMap<TString, TString> Tables;
- TRunSingleProgram(const TString& src, IOutputStream& err)
- : Src(src)
- , Err(err)
- {
- }
- bool Run(
- const NKikimr::NMiniKQL::IFunctionRegistry* funcReg
- ) {
- auto yqlNativeServices = NFile::TYtFileServices::Make(funcReg, Tables, {}, TmpDir);
- auto ytGateway = CreateYtFileGateway(yqlNativeServices);
- TVector<TDataProviderInitializer> dataProvidersInit;
- dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway));
- TProgramFactory factory(true, funcReg, 0ULL, dataProvidersInit, "ut");
- TProgramPtr program = factory.Create("-stdin-", Src);
- program->ConfigureYsonResultFormat(NYson::EYsonFormat::Text);
- if (!Parameters.empty()) {
- program->SetParametersYson(Parameters);
- }
- if (!program->ParseYql() || !program->Compile(GetUsername())) {
- program->PrintErrorsTo(Err);
- return false;
- }
- TProgram::TStatus status = program->Run(GetUsername());
- if (status == TProgram::TStatus::Error) {
- program->PrintErrorsTo(Err);
- }
- Res = program->Results();
- return status == TProgram::TStatus::Ok;
- }
- void AddResults(TVector<TString>& res) const {
- res.insert(res.end(), Res.begin(), Res.end());
- }
- bool Finished() const {
- return true;
- }
- };
- struct TRunMultiplePrograms: public TRunSingleProgram {
- TVector<TString> Srcs;
- size_t Curr;
- TRunMultiplePrograms(const TVector<TString>& srcs, IOutputStream& err)
- : TRunSingleProgram(TString(), err)
- , Srcs(srcs)
- , Curr(0)
- {
- }
- bool Run(
- const NKikimr::NMiniKQL::IFunctionRegistry* funcReg
- ) {
- TString origTmpDir = TmpDir;
- if (TmpDir) {
- TFsPath newTmp = TFsPath(TmpDir) / ToString(Curr);
- newTmp.MkDirs();
- TmpDir = newTmp.GetPath();
- }
- Src = Srcs[Curr];
- if (!TRunSingleProgram::Run(funcReg)) {
- return false;
- }
- ui32 idx = 0;
- for (auto& resStr: Res) {
- NYT::TNode res;
- if (!NCommon::ParseYson(res, resStr, Err)) {
- return false;
- }
- if (!res.IsMap() || !res.HasKey("Write") || !res["Write"].IsList()) {
- Err << "Invalid result: " << resStr << Endl;
- return false;
- }
- for (auto& elem: res["Write"].AsList()) {
- if (!elem.IsMap()) {
- Err << "Invalid result element in result: " << resStr << Endl;
- return false;
- }
- if (elem.HasKey("Ref")) {
- if (!elem["Ref"].IsList()) {
- Err << "Invalid reference in result: " << resStr << Endl;
- return false;
- }
- for (auto& refElem: elem["Ref"].AsList()) {
- if (!refElem.IsMap() || !refElem.HasKey("Reference")) {
- Err << "Invalid reference in result: " << resStr << Endl;
- return false;
- }
- if (!refElem["Remove"].AsBool()) {
- continue;
- }
- const auto& ref = refElem["Reference"].AsList();
- TStringStream name;
- name << ref[0].AsString() << "." << ref[1].AsString() << ".Result" << Curr << "_" << idx;
- Tables[name.Str()] = BuildFileNameForTmpTable(ref[2].AsString(), TmpDir);
- ++idx;
- }
- }
- }
- }
- ++Curr;
- origTmpDir.swap(TmpDir);
- return true;
- }
- bool Finished() const {
- return Curr == Srcs.size();
- }
- };
- template <typename TDriver>
- TVector<TString> Run(TDriver& driver) {
- auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry());
- TVector<TString> res;
- do {
- const bool runRes = driver.Run(functionRegistry.Get());
- UNIT_ASSERT(runRes);
- driver.AddResults(res);
- } while (!driver.Finished());
- return res;
- }
- TVector<TString> RunProgram(const TString& programSrc, const THashMap<TString, TString>& tables, const TString& tmpDir = TString(), const TString& params = TString()) {
- TRunSingleProgram driver(programSrc, Cerr);
- driver.Tables = tables;
- driver.TmpDir = tmpDir;
- driver.Parameters = params;
- return Run(driver);
- }
- static const TStringBuf KSV_ATTRS =
- "{\"_yql_row_spec\" = {\"Type\" = [\"StructType\";["
- "[\"key\";[\"DataType\";\"String\"]];"
- "[\"subkey\";[\"DataType\";\"String\"]];"
- "[\"value\";[\"DataType\";\"String\"]]"
- "]]}}"
- ;
- Y_UNIT_TEST_SUITE(ExecutionYqlExpr) {
- Y_UNIT_TEST(WriteToResultUsingIsolatedGraph) {
- auto s = "(\n"
- "(let res_sink (DataSink 'result))\n"
- "(let data (AsList (String 'x)))\n"
- "(let world (Write! world res_sink (Key) data '()))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n";
- auto res = RunProgram(s, THashMap<TString, TString>());
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
- UNIT_ASSERT_NO_DIFF("{\"Write\"=[{\"Data\"=[\"x\"]}]}", res[0]);
- }
- Y_UNIT_TEST(WriteToResultTableOutput) {
- TTempFileHandle inputFile;
- TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
- TStringBuf data =
- "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
- "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
- "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
- "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
- ;
- inputFile.Write(data.data(), data.size());
- inputFile.FlushData();
- inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
- inputFileAttrs.FlushData();
- THashMap<TString, TString> tables;
- tables["yt.plato.Input"] = inputFile.Name();
- auto s = "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let table1 (Right! x))\n"
- "(let res_sink (DataSink 'result))\n"
- "(let data (AsList (String 'x)))\n"
- "(let world (Write! world res_sink (Key) table1 '()))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n";
- auto res = RunProgram(s, tables);
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=[{\"Data\"=["
- "[\"075\";\".\";\"abc\"];"
- "[\"800\";\".\";\"ddd\"];"
- "[\"020\";\".\";\"q\"];"
- "[\"150\";\".\";\"qzz\"]"
- "]}]}",
- res[0]
- );
- }
- Y_UNIT_TEST(WriteToResultTransformedTable) {
- TTempFileHandle inputFile;
- TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
- TStringBuf data =
- "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
- "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
- "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
- "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
- ;
- inputFile.Write(data.data(), data.size());
- inputFile.FlushData();
- inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
- inputFileAttrs.FlushData();
- THashMap<TString, TString> tables;
- tables["yt.plato.Input"] = inputFile.Name();
- auto s = "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let table1 (Right! x))\n"
- "(let table1low (FlatMap table1 (lambda '(item) (block '(\n"
- " (let intValueOpt (FromString (Member item 'key) 'Int32))\n"
- " (let ret (FlatMap intValueOpt (lambda '(item2) (block '(\n"
- " (return (ListIf (< item2 (Int32 '100)) item))\n"
- " )))))"
- " (return ret)"
- ")))))"
- "(let res_sink (DataSink 'result))\n"
- "(let data (AsList (String 'x)))\n"
- "(let world (Write! world res_sink (Key) table1low '()))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n";
- auto res = RunProgram(s, tables);
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=[{\"Data\"=["
- "[\"075\";\".\";\"abc\"];"
- "[\"020\";\".\";\"q\"]"
- "]}]}",
- res[0]
- );
- }
- Y_UNIT_TEST(DropTable) {
- TTempFileHandle outputFile;
- TTempFileHandle outputFileAttrs(outputFile.Name() + ".attr");
- TStringBuf data =
- "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"sv
- ;
- outputFile.Write(data.data(), data.size());
- outputFile.FlushData();
- outputFile.Close();
- outputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
- outputFileAttrs.FlushData();
- outputFileAttrs.Close();
- UNIT_ASSERT(TFileStat(outputFile.Name()).IsFile());
- THashMap<TString, TString> tables;
- tables["yt.plato.Output"] = outputFile.Name();
- auto s = "(\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world mr_sink (Key '('table (String 'Output))) (Void) '('('mode 'drop))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(return world)\n"
- ")\n";
- RunProgram(s, tables);
- UNIT_ASSERT(!TFileStat(outputFile.Name()).IsFile());
- }
- Y_UNIT_TEST(WriteToResultTableByRef) {
- TTempFileHandle inputFile;
- TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
- TStringBuf data =
- "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
- "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
- "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
- "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
- ;
- inputFile.Write(data.data(), data.size());
- inputFile.FlushData();
- inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
- inputFileAttrs.FlushData();
- THashMap<TString, TString> tables;
- tables["yt.plato.Input"] = inputFile.Name();
- auto s = "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let table1 (Right! x))\n"
- "(let res_sink (DataSink 'result))\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world res_sink (Key) table1 '('('ref))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n";
- auto res = RunProgram(s, tables);
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=[{\"Ref\"=["
- "{\"Reference\"=[\"yt\";\"plato\";\"Input\"];\"Columns\"=[\"key\";\"subkey\";\"value\"];\"Remove\"=%false}"
- "]}]}",
- res[0]
- );
- }
- Y_UNIT_TEST(WriteToResultTransformedTableByRef) {
- TTempFileHandle inputFile;
- TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
- TTempDir tmpDir;
- TStringBuf data =
- "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
- "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
- "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
- "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
- ;
- inputFile.Write(data.data(), data.size());
- inputFile.FlushData();
- inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
- inputFileAttrs.FlushData();
- TVector<TString> progs;
- progs.push_back(
- "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let table1 (Right! x))\n"
- "(let table1low (FlatMap table1 (lambda '(item) (block '(\n"
- " (let intValueOpt (FromString (Member item 'key) 'Int32))\n"
- " (let ret (FlatMap intValueOpt (lambda '(item2) (block '(\n"
- " (return (ListIf (< item2 (Int32 '100)) item))\n"
- " )))))"
- " (return ret)"
- ")))))"
- "(let res_sink (DataSink 'result))\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world res_sink (Key) table1low '('('ref))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n"
- );
- progs.push_back(
- "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Result0_0))) '('key 'subkey 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let table1 (Right! x))\n"
- "(let res_sink (DataSink 'result))\n"
- "(let data (AsList (String 'x)))\n"
- "(let world (Write! world res_sink (Key) table1 '()))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n"
- );
- TRunMultiplePrograms driver(progs, Cerr);
- driver.Tables["yt.plato.Input"] = inputFile.Name();
- driver.TmpDir = tmpDir.Name();
- auto res = Run(driver);
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 2);
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=[{\"Ref\"=["
- "{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"key\";\"subkey\";\"value\"];\"Remove\"=%true}"
- "]}]}",
- res[0]
- );
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=[{\"Data\"=["
- "[\"075\";\".\";\"abc\"];"
- "[\"020\";\".\";\"q\"]"
- "]}]}",
- res[1]
- );
- }
- Y_UNIT_TEST(WriteAndTakeResult) {
- TTempFileHandle inputFile;
- TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
- TTempDir tmpDir;
- TStringBuf data =
- "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
- "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
- "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
- "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
- ;
- inputFile.Write(data.data(), data.size());
- inputFile.FlushData();
- inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
- inputFileAttrs.FlushData();
- TVector<TString> progs;
- progs.push_back(
- "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let table (Right! x))\n"
- "(let result (Map table (lambda '(item) (block '("
- " (let res (Struct))"
- " (let res (AddMember res 'k (Member item 'key)))"
- " (let res (AddMember res 's (Member item 'subkey)))"
- " (let res (AddMember res 'v (Member item 'value)))"
- " (return res)"
- ")))))"
- "(let res_sink (DataSink 'result))\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world res_sink (Key) result '('('ref))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n"
- );
- progs.push_back(
- "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Result0_0))) '('k 's 'v) '()))\n"
- "(let world (Left! x))\n"
- "(let table (Right! x))\n"
- "(let result (Take table (Uint64 '2)))"
- "(let res_sink (DataSink 'result))\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world res_sink (Key) result '('('type))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n"
- );
- TRunMultiplePrograms driver(progs, Cerr);
- driver.Tables["yt.plato.Input"] = inputFile.Name();
- driver.TmpDir = tmpDir.Name();
- auto res = Run(driver);
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 2);
- //~ Cerr << res[0] << Endl;
- //~ Cerr << res[1] << Endl;
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=[{\"Ref\"=["
- "{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"k\";\"s\";\"v\"];\"Remove\"=%true}"
- "]}]}",
- res[0]
- );
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=[{"
- "\"Type\"=[\"ListType\";[\"StructType\";["
- "[\"k\";[\"DataType\";\"String\"]];[\"s\";[\"DataType\";\"String\"]];[\"v\";[\"DataType\";\"String\"]]"
- "]]];"
- "\"Data\"=["
- "[\"075\";\".\";\"abc\"];"
- "[\"800\";\".\";\"ddd\"]"
- "]}"
- "]}",
- res[1]
- );
- }
- Y_UNIT_TEST(WriteAndReadScheme) {
- TTempFileHandle inputFile;
- TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
- TTempDir tmpDir;
- TStringBuf data =
- "{\"key\"=\"075\";\"subkey\"=\".\";\"value\"=\"abc\"};\n"
- "{\"key\"=\"800\";\"subkey\"=\".\";\"value\"=\"ddd\"};\n"
- "{\"key\"=\"020\";\"subkey\"=\".\";\"value\"=\"q\"};\n"
- "{\"key\"=\"150\";\"subkey\"=\".\";\"value\"=\"qzz\"};\n"sv
- ;
- inputFile.Write(data.data(), data.size());
- inputFile.FlushData();
- inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
- inputFileAttrs.FlushData();
- TVector<TString> progs;
- progs.push_back(
- "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let table (Right! x))\n"
- "(let result0 (Map table (lambda '(item) (block '("
- " (return (AsStruct '('bar (Coalesce (FromString (Member item 'key) 'Uint64) (Uint64 '0)))))"
- ")))))"
- "(let result1 (Map result0 (lambda '(item) (block '("
- " (return (AddMember (Struct) 'foo item))"
- ")))))"
- "(let res_sink (DataSink 'result))\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world res_sink (Key) result0 '('('ref))))\n"
- "(let world (Write! world res_sink (Key) result1 '('('ref))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n"
- );
- progs.push_back(
- "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('tablescheme (String 'Result0_0))) (Void) '()))\n"
- "(let world (Left! x))\n"
- "(let scheme (Right! x))\n"
- "(let res_sink (DataSink 'result))\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world res_sink (Key) scheme '('('type))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n"
- );
- progs.push_back(
- "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('tablescheme (String 'Result0_1))) (Void) '()))\n"
- "(let world (Left! x))\n"
- "(let scheme (Right! x))\n"
- "(let res_sink (DataSink 'result))\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world res_sink (Key) scheme '('('type))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n"
- );
- TRunMultiplePrograms driver(progs, Cerr);
- driver.Tables["yt.plato.Input"] = inputFile.Name();
- driver.TmpDir = tmpDir.Name();
- auto res = Run(driver);
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 3);
- //~ Cerr << res[0] << Endl;
- //~ Cerr << res[1] << Endl;
- //~ Cerr << res[2] << Endl;
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=["
- "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"bar\"];\"Remove\"=%true}]};"
- "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/7ae6459a-7382d1e7-7935c08e-2\"];\"Columns\"=[\"foo\"];\"Remove\"=%true}]}"
- "]}",
- res[0]
- );
- UNIT_ASSERT(res[1].find("\"Fields\"=[{\"Name\"=\"bar\"") != TString::npos);
- UNIT_ASSERT(res[2].find("\"Fields\"=[{\"Name\"=\"foo\"") != TString::npos);
- }
- Y_UNIT_TEST(ExtendSortedWithNonSortedAndRead) {
- TTempFileHandle inputFile;
- TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
- TTempFileHandle outputFile;
- TTempFile outputFileAttr(outputFile.Name() + ".attr");
- TTempDir tmpDir;
- TStringBuf data =
- "{\"key\"=\"foo\";\"subkey\"=\"wat\";\"value\"=\"222\"};\n"
- "{\"key\"=\"bar\";\"subkey\"=\"wat\";\"value\"=\"111\"};\n"
- "{\"key\"=\"jar\";\"subkey\"=\"wat\";\"value\"=\"333\"};\n"sv
- ;
- inputFile.Write(data.data(), data.size());
- inputFile.FlushData();
- inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
- inputFileAttrs.FlushData();
- TVector<TString> progs;
- progs.push_back(
- "(\n"
- "(let source (DataSource 'yt 'plato))\n"
- "(let x (Read! world source (Key '('table (String 'Input))) '('key 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let table (Right! x))\n"
- "(let sorted (Sort table (Bool 'true) (lambda '(item) (Member item 'value))))\n"
- "(let result (Extend table sorted))\n"
- "(let sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world sink (Key '('table (String 'Output))) result '()))\n"
- "(let world (Commit! world sink))\n"
- "(return world)\n"
- ")\n"
- );
- progs.push_back(
- "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Output))) '('key 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let result (Right! x))\n"
- "(let res_sink (DataSink 'result))\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world res_sink (Key) result '('('type))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n"
- );
- TRunMultiplePrograms driver(progs, Cerr);
- driver.TmpDir = tmpDir.Name();
- driver.Tables["yt.plato.Input"] = inputFile.Name();
- driver.Tables["yt.plato.Output"] = outputFile.Name();
- auto res = Run(driver);
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
- //~ Cerr << res[0] << Endl;
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=[{"
- "\"Type\"=[\"ListType\";[\"StructType\";[[\"key\";[\"DataType\";\"String\"]];[\"value\";[\"DataType\";\"String\"]]]]];"
- "\"Data\"=["
- "[\"foo\";\"222\"];[\"bar\";\"111\"];[\"jar\";\"333\"];[\"foo\";\"222\"];[\"bar\";\"111\"];[\"jar\";\"333\"]"
- "]}]}",
- res[0]
- );
- }
- Y_UNIT_TEST(OrderedExtendSortedWithNonSortedAndRead) {
- TTempFileHandle inputFile;
- TTempFileHandle inputFileAttrs(inputFile.Name() + ".attr");
- TTempFileHandle outputFile;
- TTempFile outputFileAttr(outputFile.Name() + ".attr");
- TTempDir tmpDir;
- TStringBuf data =
- "{\"key\"=\"foo\";\"subkey\"=\"wat\";\"value\"=\"222\"};\n"
- "{\"key\"=\"bar\";\"subkey\"=\"wat\";\"value\"=\"111\"};\n"
- "{\"key\"=\"jar\";\"subkey\"=\"wat\";\"value\"=\"333\"};\n"sv
- ;
- inputFile.Write(data.data(), data.size());
- inputFile.FlushData();
- inputFileAttrs.Write(KSV_ATTRS.data(), KSV_ATTRS.size());
- inputFileAttrs.FlushData();
- TVector<TString> progs;
- progs.push_back(
- "(\n"
- "(let source (DataSource 'yt 'plato))\n"
- "(let x (Read! world source (Key '('table (String 'Input))) '('key 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let table (Right! x))\n"
- "(let sorted (Sort table (Bool 'true) (lambda '(item) (Member item 'value))))\n"
- "(let result (OrderedExtend table sorted))\n"
- "(let sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world sink (Key '('table (String 'Output))) result '()))\n"
- "(let world (Commit! world sink))\n"
- "(return world)\n"
- ")\n"
- );
- progs.push_back(
- "(\n"
- "(let mr_source (DataSource 'yt 'plato))\n"
- "(let x (Read! world mr_source (Key '('table (String 'Output))) '('key 'value) '()))\n"
- "(let world (Left! x))\n"
- "(let result (Right! x))\n"
- "(let res_sink (DataSink 'result))\n"
- "(let mr_sink (DataSink 'yt 'plato))\n"
- "(let world (Write! world res_sink (Key) result '('('type))))\n"
- "(let world (Commit! world mr_sink))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n"
- );
- TRunMultiplePrograms driver(progs, Cerr);
- driver.TmpDir = tmpDir.Name();
- driver.Tables["yt.plato.Input"] = inputFile.Name();
- driver.Tables["yt.plato.Output"] = outputFile.Name();
- auto res = Run(driver);
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
- //~ Cerr << res[0] << Endl;
- UNIT_ASSERT_NO_DIFF(
- "{\"Write\"=[{"
- "\"Type\"=[\"ListType\";[\"StructType\";[[\"key\";[\"DataType\";\"String\"]];[\"value\";[\"DataType\";\"String\"]]]]];"
- "\"Data\"=["
- "[\"foo\";\"222\"];[\"bar\";\"111\"];[\"jar\";\"333\"];[\"bar\";\"111\"];[\"foo\";\"222\"];[\"jar\";\"333\"]"
- "]}]}",
- res[0]
- );
- }
- Y_UNIT_TEST(TestParametersEvaluation) {
- auto s = "(\n"
- "(let res_sink (DataSink 'result))\n"
- "(let data (Parameter '\"$foo\" (ParseType '\"Tuple<String, Int32 ? , List<Uint32>, Dict<Int32, Bool>, Struct<a : Void, b : Double>, Variant<Int32, Bool>>\")))\n"
- "(let world (Write! world res_sink (Key) data '('('type))))\n"
- "(let world (Commit! world res_sink))\n"
- "(return world)\n"
- ")\n";
- auto params = R"__(
- {"$foo"={Data=[
- bar;
- "33";
- ["1";"2";"3"];
- [["7";%true];["12";%false]];
- [#;"-1.7"];
- ["0";"8"];
- ]}}
- )__";
- auto res = RunProgram(s, THashMap<TString, TString>(), "", params);
- UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
- UNIT_ASSERT_NO_DIFF(R"__({"Write"=[{"Type"=["TupleType";[["DataType";"String"];["OptionalType";["DataType";"Int32"]];["ListType";["DataType";"Uint32"]];["DictType";["DataType";"Int32"];["DataType";"Bool"]];["StructType";[["a";["VoidType"]];["b";["DataType";"Double"]]]];["VariantType";["TupleType";[["DataType";"Int32"];["DataType";"Bool"]]]]]];"Data"=["bar";["33"];["1";"2";"3"];[["7";%true];["12";%false]];["Void";"-1.7"];["0";"8"]]}]})__", res[0]);
- }
- }
- } // namespace NYql
|