yql_provider_mkql.cpp 141 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099
  1. #include "yql_provider_mkql.h"
  2. #include "yql_type_mkql.h"
  3. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  4. #include <yql/essentials/core/yql_expr_type_annotation.h>
  5. #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
  6. #include <yql/essentials/core/yql_expr_type_annotation.h>
  7. #include <yql/essentials/core/yql_match_recognize.h>
  8. #include <yql/essentials/core/yql_join.h>
  9. #include <yql/essentials/core/yql_opt_utils.h>
  10. #include <yql/essentials/minikql/mkql_node.h>
  11. #include <yql/essentials/minikql/mkql_node_cast.h>
  12. #include <yql/essentials/minikql/mkql_program_builder.h>
  13. #include <yql/essentials/minikql/mkql_runtime_version.h>
  14. #include <yql/essentials/minikql/mkql_type_ops.h>
  15. #include <yql/essentials/public/decimal/yql_decimal.h>
  16. #include <yql/essentials/parser/pg_catalog/catalog.h>
  17. #include <util/stream/null.h>
  18. #include <util/string/cast.h>
  19. #include <array>
  20. using namespace NKikimr;
  21. using namespace NKikimr::NMiniKQL;
  22. namespace NYql {
  23. namespace NCommon {
  24. TRuntimeNode WideTopImpl(const TExprNode& node, TMkqlBuildContext& ctx,
  25. TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) {
  26. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  27. const auto count = MkqlBuildExpr(*node.Child(1U), ctx);
  28. std::vector<std::pair<ui32, TRuntimeNode>> directions;
  29. directions.reserve(node.Tail().ChildrenSize());
  30. node.Tail().ForEachChild([&](const TExprNode& dir) {
  31. directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
  32. });
  33. return (ctx.ProgramBuilder.*func)(flow, count, directions);
  34. }
  35. TRuntimeNode WideSortImpl(const TExprNode& node, TMkqlBuildContext& ctx,
  36. TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) {
  37. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  38. std::vector<std::pair<ui32, TRuntimeNode>> directions;
  39. directions.reserve(node.Tail().ChildrenSize());
  40. node.Tail().ForEachChild([&](const TExprNode& dir) {
  41. directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
  42. });
  43. return (ctx.ProgramBuilder.*func)(flow, directions);
  44. }
  45. TRuntimeNode CombineByKeyImpl(const TExprNode& node, TMkqlBuildContext& ctx) {
  46. NNodes::TCoCombineByKey combine(&node);
  47. const bool isStreamOrFlow = combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream ||
  48. combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Flow;
  49. YQL_ENSURE(!isStreamOrFlow);
  50. const auto input = MkqlBuildExpr(combine.Input().Ref(), ctx);
  51. TRuntimeNode preMapList = ctx.ProgramBuilder.FlatMap(input, [&](TRuntimeNode item) {
  52. return MkqlBuildLambda(combine.PreMapLambda().Ref(), ctx, {item});
  53. });
  54. const auto dict = ctx.ProgramBuilder.ToHashedDict(preMapList, true, [&](TRuntimeNode item) {
  55. return MkqlBuildLambda(combine.KeySelectorLambda().Ref(), ctx, {item});
  56. }, [&](TRuntimeNode item) {
  57. return item;
  58. });
  59. const auto values = ctx.ProgramBuilder.DictItems(dict);
  60. return ctx.ProgramBuilder.FlatMap(values, [&](TRuntimeNode item) {
  61. auto key = ctx.ProgramBuilder.Nth(item, 0);
  62. auto payloadList = ctx.ProgramBuilder.Nth(item, 1);
  63. auto fold1 = ctx.ProgramBuilder.Fold1(payloadList, [&](TRuntimeNode item2) {
  64. return MkqlBuildLambda(combine.InitHandlerLambda().Ref(), ctx, {key, item2});
  65. }, [&](TRuntimeNode item2, TRuntimeNode state) {
  66. return MkqlBuildLambda(combine.UpdateHandlerLambda().Ref(), ctx, {key, item2, state});
  67. });
  68. auto res = ctx.ProgramBuilder.FlatMap(fold1, [&](TRuntimeNode state) {
  69. return MkqlBuildLambda(combine.FinishHandlerLambda().Ref(), ctx, {key, state});
  70. });
  71. return res;
  72. });
  73. }
  74. namespace {
  75. std::array<TRuntimeNode, 2U> MkqlBuildSplitLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const std::initializer_list<TRuntimeNode>& args) {
  76. TMkqlBuildContext::TArgumentsMap innerArguments;
  77. innerArguments.reserve(args.size());
  78. auto it = args.begin();
  79. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  80. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  81. const auto& body = lambda.Tail();
  82. MKQL_ENSURE(body.IsList() && body.ChildrenSize() == 2U, "Expected pair of nodes.");
  83. return {{MkqlBuildExpr(body.Head(), innerCtx), MkqlBuildExpr(body.Tail(), innerCtx)}};
  84. }
  85. TMkqlBuildContext* GetNodeContext(const TExprNode& node, TMkqlBuildContext& ctx) {
  86. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  87. const auto knownNode = currCtx->Memoization.find(&node);
  88. if (currCtx->Memoization.cend() != knownNode) {
  89. return currCtx;
  90. }
  91. }
  92. return nullptr;
  93. }
  94. TMkqlBuildContext* GetNodeContextByLambda(const TExprNode& node, TMkqlBuildContext& ctx) {
  95. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  96. if (currCtx->LambdaId == node.UniqueId()) {
  97. return currCtx;
  98. }
  99. }
  100. return nullptr;
  101. }
  102. TMkqlBuildContext* GetContextForMemoizeInUnknowScope(const TExprNode& node, TMkqlBuildContext& ctx) {
  103. TMkqlBuildContext* result = nullptr;
  104. for (const auto& c : node.Children()) {
  105. const auto& child = c->IsLambda() ? c->Tail() : *c;
  106. if (!child.IsAtom()) {
  107. auto nodeCtx = GetNodeContext(child, ctx);
  108. if (!nodeCtx) {
  109. nodeCtx = GetContextForMemoizeInUnknowScope(child, ctx);
  110. }
  111. if (!result || result->Level < nodeCtx->Level) {
  112. result = nodeCtx;
  113. if (result == &ctx) {
  114. break;
  115. }
  116. }
  117. }
  118. }
  119. if (!result) {
  120. for (result = &ctx; result->ParentCtx; result = result->ParentCtx)
  121. continue;
  122. }
  123. return result;
  124. }
  125. TMkqlBuildContext* GetContextForMemoize(const TExprNode& node, TMkqlBuildContext& ctx) {
  126. if (const auto scope = node.GetDependencyScope()) {
  127. if (const auto lambda = scope->second) {
  128. return GetNodeContextByLambda(*lambda, ctx);
  129. }
  130. } else {
  131. return GetContextForMemoizeInUnknowScope(node, ctx);
  132. }
  133. auto result = &ctx;
  134. while (result->ParentCtx) {
  135. result = result->ParentCtx;
  136. }
  137. return result;
  138. }
  139. const TRuntimeNode& CheckTypeAndMemoize(const TExprNode& node, TMkqlBuildContext& ctx, const TRuntimeNode& runtime) {
  140. if (node.GetTypeAnn()) {
  141. TNullOutput null;
  142. if (const auto type = BuildType(*node.GetTypeAnn(), ctx.ProgramBuilder, *ctx.TypeMemoization, null)) {
  143. if (!type->IsSameType(*runtime.GetStaticType())) {
  144. ythrow TNodeException(node) << "Expected: " << *type << " type, but got: " << *runtime.GetStaticType() << ".";
  145. }
  146. }
  147. }
  148. return GetContextForMemoize(node, ctx)->Memoization.emplace(&node, runtime).first->second;
  149. }
  150. std::vector<TRuntimeNode> GetAllArguments(const TExprNode& node, TMkqlBuildContext& ctx) {
  151. std::vector<TRuntimeNode> args;
  152. args.reserve(node.ChildrenSize());
  153. node.ForEachChild([&](const TExprNode& child){ args.emplace_back(MkqlBuildExpr(child, ctx)); });
  154. return args;
  155. }
  156. template <size_t From>
  157. std::vector<TRuntimeNode> GetArgumentsFrom(const TExprNode& node, TMkqlBuildContext& ctx) {
  158. std::vector<TRuntimeNode> args;
  159. args.reserve(node.ChildrenSize() - From);
  160. for (auto i = From; i < node.ChildrenSize(); ++i) {
  161. args.emplace_back(MkqlBuildExpr(*node.Child(i), ctx));
  162. }
  163. return args;
  164. }
  165. NUdf::TDataTypeId ParseDataType(const TExprNode& owner, const std::string_view& type) {
  166. if (const auto slot = NUdf::FindDataSlot(type)) {
  167. return NUdf::GetDataTypeInfo(*slot).TypeId;
  168. }
  169. ythrow TNodeException(owner) << "Unsupported data type: " << type;
  170. }
  171. EJoinKind GetJoinKind(const TExprNode& owner, const std::string_view& content) {
  172. if (content == "Inner") {
  173. return EJoinKind::Inner;
  174. }
  175. else if (content == "Left") {
  176. return EJoinKind::Left;
  177. }
  178. else if (content == "Right") {
  179. return EJoinKind::Right;
  180. }
  181. else if (content == "Full") {
  182. return EJoinKind::Full;
  183. }
  184. else if (content == "LeftOnly") {
  185. return EJoinKind::LeftOnly;
  186. }
  187. else if (content == "RightOnly") {
  188. return EJoinKind::RightOnly;
  189. }
  190. else if (content == "Exclusion") {
  191. return EJoinKind::Exclusion;
  192. }
  193. else if (content == "LeftSemi") {
  194. return EJoinKind::LeftSemi;
  195. }
  196. else if (content == "RightSemi") {
  197. return EJoinKind::RightSemi;
  198. }
  199. else if (content == "Cross") {
  200. return EJoinKind::Cross;
  201. }
  202. else {
  203. ythrow TNodeException(owner) << "Unexpected join kind: " << content;
  204. }
  205. }
  206. template<typename TLayout>
  207. std::pair<TLayout, ui16> CutTimezone(const std::string_view& atom) {
  208. const auto pos = atom.find(',');
  209. MKQL_ENSURE(std::string_view::npos != pos, "Expected two components.");
  210. return std::make_pair(::FromString<TLayout>(atom.substr(0, pos)), GetTimezoneId(atom.substr(pos + 1)));
  211. }
  212. } // namespace
  213. bool TMkqlCallableCompilerBase::HasCallable(const std::string_view& name) const {
  214. return Callables.contains(name);
  215. }
  216. void TMkqlCallableCompilerBase::AddCallable(const std::string_view& name, TCompiler compiler) {
  217. const auto result = Callables.emplace(TString(name), compiler);
  218. YQL_ENSURE(result.second, "Callable already exists: " << name);
  219. }
  220. void TMkqlCallableCompilerBase::AddCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  221. for (const auto& name : names) {
  222. AddCallable(name, compiler);
  223. }
  224. }
  225. void TMkqlCallableCompilerBase::ChainCallable(const std::string_view& name, TCompiler compiler) {
  226. auto prevCompiler = GetCallable(name);
  227. auto chainedCompiler = [compiler = std::move(compiler), prevCompiler = std::move(prevCompiler)](const TExprNode& node, TMkqlBuildContext& ctx) -> NKikimr::NMiniKQL::TRuntimeNode {
  228. if (auto res = compiler(node, ctx)) {
  229. return res;
  230. }
  231. return prevCompiler(node, ctx);
  232. };
  233. OverrideCallable(name, chainedCompiler);
  234. }
  235. void TMkqlCallableCompilerBase::ChainCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  236. for (const auto& name : names) {
  237. ChainCallable(name, compiler);
  238. }
  239. }
  240. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::UnaryFunctionMethod>>& callables) {
  241. for (const auto& callable : callables) {
  242. AddCallable(callable.first,
  243. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  244. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  245. return (ctx.ProgramBuilder.*method)(arg);
  246. }
  247. );
  248. }
  249. }
  250. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::BinaryFunctionMethod>>& callables) {
  251. for (const auto& callable : callables) {
  252. AddCallable(callable.first,
  253. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  254. const auto one = MkqlBuildExpr(node.Head(), ctx);
  255. const auto two = MkqlBuildExpr(node.Tail(), ctx);
  256. return (ctx.ProgramBuilder.*method)(one, two);
  257. }
  258. );
  259. }
  260. }
  261. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::TernaryFunctionMethod>>& callables) {
  262. for (const auto& callable : callables) {
  263. AddCallable(callable.first,
  264. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  265. const auto arg1 = MkqlBuildExpr(node.Head(), ctx);
  266. const auto arg2 = MkqlBuildExpr(*node.Child(1U), ctx);
  267. const auto arg3 = MkqlBuildExpr(node.Tail(), ctx);
  268. return (ctx.ProgramBuilder.*method)(arg1, arg2, arg3);
  269. }
  270. );
  271. }
  272. }
  273. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::ArrayFunctionMethod>>& callables) {
  274. for (const auto& callable : callables) {
  275. AddCallable(callable.first,
  276. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  277. const auto& args = GetAllArguments(node, ctx);
  278. return (ctx.ProgramBuilder.*method)(args);
  279. }
  280. );
  281. }
  282. }
  283. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::ProcessFunctionMethod>>& callables) {
  284. for (const auto& callable : callables) {
  285. AddCallable(callable.first,
  286. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  287. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  288. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(node.Tail(), ctx, {item}); };
  289. return (ctx.ProgramBuilder.*method)(arg, lambda);
  290. }
  291. );
  292. }
  293. }
  294. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::NarrowFunctionMethod>>& callables) {
  295. for (const auto& callable : callables) {
  296. AddCallable(callable.first,
  297. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  298. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  299. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildLambda(node.Tail(), ctx, items); };
  300. return (ctx.ProgramBuilder.*method)(arg, lambda);
  301. }
  302. );
  303. }
  304. }
  305. void TMkqlCallableCompilerBase::OverrideCallable(const std::string_view& name, TCompiler compiler) {
  306. const auto prevCompiler = Callables.find(name);
  307. YQL_ENSURE(Callables.cend() != prevCompiler, "Missed callable: " << name);
  308. prevCompiler->second = compiler;
  309. Callables[name] = compiler;
  310. }
  311. IMkqlCallableCompiler::TCompiler TMkqlCallableCompilerBase::GetCallable(const std::string_view& name) const {
  312. const auto compiler = Callables.find(name);
  313. YQL_ENSURE(Callables.cend() != compiler, "Missed callable: " << name);
  314. return compiler->second;
  315. }
  316. IMkqlCallableCompiler::TCompiler TMkqlCallableCompilerBase::FindCallable(const std::string_view& name) const {
  317. const auto compiler = Callables.find(name);
  318. return Callables.cend() != compiler ? compiler->second : IMkqlCallableCompiler::TCompiler();
  319. }
  320. bool TMkqlCommonCallableCompiler::HasCallable(const std::string_view& name) const {
  321. if (TMkqlCallableCompilerBase::HasCallable(name)) {
  322. return true;
  323. }
  324. return GetShared().HasCallable(name);
  325. }
  326. IMkqlCallableCompiler::TCompiler TMkqlCommonCallableCompiler::FindCallable(const std::string_view& name) const {
  327. if (const auto func = TMkqlCallableCompilerBase::FindCallable(name)) {
  328. return func;
  329. }
  330. return GetShared().FindCallable(name);
  331. }
  332. IMkqlCallableCompiler::TCompiler TMkqlCommonCallableCompiler::GetCallable(const std::string_view& name) const {
  333. if (const auto func = TMkqlCallableCompilerBase::FindCallable(name)) {
  334. return func;
  335. }
  336. return GetShared().GetCallable(name);
  337. }
  338. void TMkqlCommonCallableCompiler::OverrideCallable(const std::string_view& name, TCompiler compiler) {
  339. if (TMkqlCallableCompilerBase::HasCallable(name)) {
  340. TMkqlCallableCompilerBase::OverrideCallable(name, compiler);
  341. } else {
  342. YQL_ENSURE(GetShared().HasCallable(name));
  343. TMkqlCallableCompilerBase::AddCallable(name, compiler);
  344. }
  345. }
  346. void TMkqlCommonCallableCompiler::AddCallable(const std::string_view& name, TCompiler compiler) {
  347. YQL_ENSURE(!GetShared().HasCallable(name), "Compiler already set for callable: " << name);
  348. TMkqlCallableCompilerBase::AddCallable(name, compiler);
  349. }
  350. void TMkqlCommonCallableCompiler::AddCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  351. for (const auto& name : names) {
  352. AddCallable(name, compiler);
  353. }
  354. }
  355. TMkqlCommonCallableCompiler::TShared::TShared() {
  356. AddSimpleCallables({
  357. {"Abs", &TProgramBuilder::Abs},
  358. {"Plus", &TProgramBuilder::Plus},
  359. {"Minus", &TProgramBuilder::Minus},
  360. {"Inc", &TProgramBuilder::Increment},
  361. {"Dec", &TProgramBuilder::Decrement},
  362. {"Not", &TProgramBuilder::Not},
  363. {"BlockNot", &TProgramBuilder::BlockNot},
  364. {"BlockJust", &TProgramBuilder::BlockJust},
  365. {"BitNot", &TProgramBuilder::BitNot},
  366. {"Size", &TProgramBuilder::Size},
  367. {"Way", &TProgramBuilder::Way},
  368. {"VariantItem", &TProgramBuilder::VariantItem},
  369. {"CountBits", &TProgramBuilder::CountBits},
  370. {"Ascending", &TProgramBuilder::Ascending},
  371. {"Descending", &TProgramBuilder::Descending},
  372. {"ToOptional", &TProgramBuilder::Head},
  373. {"Head", &TProgramBuilder::Head},
  374. {"Last", &TProgramBuilder::Last},
  375. {"ToList", &TProgramBuilder::ToList},
  376. {"ToFlow", &TProgramBuilder::ToFlow},
  377. {"FromFlow", &TProgramBuilder::FromFlow},
  378. {"WideToBlocks", &TProgramBuilder::WideToBlocks},
  379. {"WideFromBlocks", &TProgramBuilder::WideFromBlocks},
  380. {"AsScalar", &TProgramBuilder::AsScalar},
  381. {"Just", &TProgramBuilder::NewOptional},
  382. {"Exists", &TProgramBuilder::Exists},
  383. {"BlockExists", &TProgramBuilder::BlockExists},
  384. {"Pickle", &TProgramBuilder::Pickle},
  385. {"StablePickle", &TProgramBuilder::StablePickle},
  386. {"Collect", &TProgramBuilder::Collect},
  387. {"Discard", &TProgramBuilder::Discard},
  388. {"LazyList", &TProgramBuilder::LazyList},
  389. {"ForwardList", &TProgramBuilder::ForwardList},
  390. {"Length", &TProgramBuilder::Length},
  391. {"HasItems", &TProgramBuilder::HasItems},
  392. {"Reverse", &TProgramBuilder::Reverse},
  393. {"ToIndexDict", &TProgramBuilder::ToIndexDict},
  394. {"ToString", &TProgramBuilder::ToString},
  395. {"ToBytes", &TProgramBuilder::ToBytes},
  396. {"AggrCountInit", &TProgramBuilder::AggrCountInit},
  397. {"NewMTRand", &TProgramBuilder::NewMTRand},
  398. {"NextMTRand", &TProgramBuilder::NextMTRand},
  399. {"TimezoneId", &TProgramBuilder::TimezoneId},
  400. {"TimezoneName", &TProgramBuilder::TimezoneName},
  401. {"RemoveTimezone", &TProgramBuilder::RemoveTimezone},
  402. {"DictItems", &TProgramBuilder::DictItems},
  403. {"DictKeys", &TProgramBuilder::DictKeys},
  404. {"DictPayloads", &TProgramBuilder::DictPayloads},
  405. {"QueuePop", &TProgramBuilder::QueuePop}
  406. });
  407. AddSimpleCallables({
  408. {"+", &TProgramBuilder::Add},
  409. {"-", &TProgramBuilder::Sub},
  410. {"*", &TProgramBuilder::Mul},
  411. {"/", &TProgramBuilder::Div},
  412. {"%", &TProgramBuilder::Mod},
  413. {"Add", &TProgramBuilder::Add},
  414. {"Sub", &TProgramBuilder::Sub},
  415. {"Mul", &TProgramBuilder::Mul},
  416. {"Div", &TProgramBuilder::Div},
  417. {"Mod", &TProgramBuilder::Mod},
  418. {"DecimalMul", &TProgramBuilder::DecimalMul},
  419. {"DecimalDiv", &TProgramBuilder::DecimalDiv},
  420. {"DecimalMod", &TProgramBuilder::DecimalMod},
  421. {"BlockDecimalMul", &TProgramBuilder::BlockDecimalMul},
  422. {"BlockDecimalDiv", &TProgramBuilder::BlockDecimalDiv},
  423. {"BlockDecimalMod", &TProgramBuilder::BlockDecimalMod},
  424. {"==", &TProgramBuilder::Equals},
  425. {"!=", &TProgramBuilder::NotEquals},
  426. {"<", &TProgramBuilder::Less},
  427. {"<=", &TProgramBuilder::LessOrEqual},
  428. {">", &TProgramBuilder::Greater},
  429. {">=", &TProgramBuilder::GreaterOrEqual},
  430. {"Equals", &TProgramBuilder::Equals},
  431. {"NotEquals", &TProgramBuilder::NotEquals},
  432. {"Less", &TProgramBuilder::Less},
  433. {"LessOrEqual", &TProgramBuilder::LessOrEqual},
  434. {"Greater", &TProgramBuilder::Greater},
  435. {"GreaterOrEqual", &TProgramBuilder::GreaterOrEqual},
  436. {"AggrEquals", &TProgramBuilder::AggrEquals},
  437. {"AggrNotEquals", &TProgramBuilder::AggrNotEquals},
  438. {"AggrLess", &TProgramBuilder::AggrLess},
  439. {"AggrLessOrEqual", &TProgramBuilder::AggrLessOrEqual},
  440. {"AggrGreater", &TProgramBuilder::AggrGreater},
  441. {"AggrGreaterOrEqual", &TProgramBuilder::AggrGreaterOrEqual},
  442. {"AggrMin", &TProgramBuilder::AggrMin},
  443. {"AggrMax", &TProgramBuilder::AggrMax},
  444. {"AggrAdd", &TProgramBuilder::AggrAdd},
  445. {"AggrCountUpdate", &TProgramBuilder::AggrCountUpdate},
  446. {"BitOr", &TProgramBuilder::BitOr},
  447. {"BitAnd", &TProgramBuilder::BitAnd},
  448. {"BitXor", &TProgramBuilder::BitXor},
  449. {"ShiftLeft", &TProgramBuilder::ShiftLeft},
  450. {"ShiftRight", &TProgramBuilder::ShiftRight},
  451. {"RotLeft", &TProgramBuilder::RotLeft},
  452. {"RotRight", &TProgramBuilder::RotRight},
  453. {"ListIf", &TProgramBuilder::ListIf},
  454. {"Concat", &TProgramBuilder::Concat},
  455. {"AggrConcat", &TProgramBuilder::AggrConcat},
  456. {"ByteAt", &TProgramBuilder::ByteAt},
  457. {"Nanvl", &TProgramBuilder::Nanvl},
  458. {"Skip", &TProgramBuilder::Skip},
  459. {"Take", &TProgramBuilder::Take},
  460. {"Limit", &TProgramBuilder::Take},
  461. {"WideTakeBlocks", &TProgramBuilder::WideTakeBlocks},
  462. {"WideSkipBlocks", &TProgramBuilder::WideSkipBlocks},
  463. {"BlockCoalesce", &TProgramBuilder::BlockCoalesce},
  464. {"ReplicateScalar", &TProgramBuilder::ReplicateScalar},
  465. {"BlockAnd", &TProgramBuilder::BlockAnd},
  466. {"BlockOr", &TProgramBuilder::BlockOr},
  467. {"BlockXor", &TProgramBuilder::BlockXor},
  468. {"Append", &TProgramBuilder::Append},
  469. {"Insert", &TProgramBuilder::Append},
  470. {"Prepend", &TProgramBuilder::Prepend},
  471. {"Lookup", &TProgramBuilder::Lookup},
  472. {"Contains", &TProgramBuilder::Contains},
  473. {"AddTimezone", &TProgramBuilder::AddTimezone},
  474. {"StartsWith", &TProgramBuilder::StartsWith},
  475. {"EndsWith", &TProgramBuilder::EndsWith},
  476. {"StringContains", &TProgramBuilder::StringContains},
  477. {"SqueezeToList", &TProgramBuilder::SqueezeToList},
  478. {"QueuePush", &TProgramBuilder::QueuePush}
  479. });
  480. AddSimpleCallables({
  481. {"Substring", &TProgramBuilder::Substring},
  482. {"Find", &TProgramBuilder::Find},
  483. {"RFind", &TProgramBuilder::RFind},
  484. {"ListFromRange", &TProgramBuilder::ListFromRange},
  485. {"PreserveStream", &TProgramBuilder::PreserveStream},
  486. {"BlockIf", &TProgramBuilder::BlockIf},
  487. });
  488. AddSimpleCallables({
  489. {"If", &TProgramBuilder::If},
  490. {"Or", &TProgramBuilder::Or},
  491. {"And", &TProgramBuilder::And},
  492. {"Xor", &TProgramBuilder::Xor},
  493. {"Min", &TProgramBuilder::Min},
  494. {"Max", &TProgramBuilder::Max},
  495. {"AsList", &TProgramBuilder::AsList},
  496. {"Extend", &TProgramBuilder::Extend},
  497. {"OrderedExtend", &TProgramBuilder::OrderedExtend},
  498. {"Zip", &TProgramBuilder::Zip},
  499. {"ZipAll", &TProgramBuilder::ZipAll},
  500. {"Random", &TProgramBuilder::Random},
  501. {"RandomNumber", &TProgramBuilder::RandomNumber},
  502. {"RandomUuid", &TProgramBuilder::RandomUuid},
  503. {"Now", &TProgramBuilder::Now},
  504. {"CurrentUtcDate", &TProgramBuilder::CurrentUtcDate},
  505. {"CurrentUtcDatetime", &TProgramBuilder::CurrentUtcDatetime},
  506. {"CurrentUtcTimestamp", &TProgramBuilder::CurrentUtcTimestamp},
  507. });
  508. AddSimpleCallables({
  509. {"Map", &TProgramBuilder::Map},
  510. {"OrderedMap", &TProgramBuilder::OrderedMap},
  511. {"FlatMap", &TProgramBuilder::FlatMap},
  512. {"OrderedFlatMap", &TProgramBuilder::OrderedFlatMap},
  513. {"SkipWhile", &TProgramBuilder::SkipWhile},
  514. {"TakeWhile", &TProgramBuilder::TakeWhile},
  515. {"SkipWhileInclusive", &TProgramBuilder::SkipWhileInclusive},
  516. {"TakeWhileInclusive", &TProgramBuilder::TakeWhileInclusive},
  517. });
  518. AddSimpleCallables({
  519. {"NarrowMap", &TProgramBuilder::NarrowMap},
  520. {"NarrowFlatMap", &TProgramBuilder::NarrowFlatMap},
  521. {"WideSkipWhile", &TProgramBuilder::WideSkipWhile},
  522. {"WideTakeWhile", &TProgramBuilder::WideTakeWhile},
  523. {"WideSkipWhileInclusive", &TProgramBuilder::WideSkipWhileInclusive},
  524. {"WideTakeWhileInclusive", &TProgramBuilder::WideTakeWhileInclusive},
  525. });
  526. AddSimpleCallables({
  527. {"RangeUnion", &TProgramBuilder::RangeUnion},
  528. {"RangeIntersect", &TProgramBuilder::RangeIntersect},
  529. {"RangeMultiply", &TProgramBuilder::RangeMultiply},
  530. });
  531. AddSimpleCallables({
  532. {"RangeCreate", &TProgramBuilder::RangeCreate},
  533. {"RangeFinalize", &TProgramBuilder::RangeFinalize},
  534. });
  535. AddCallable({"RoundUp", "RoundDown"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  536. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  537. const auto dstType = ctx.BuildType(node.Tail(), *node.Tail().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  538. return ctx.ProgramBuilder.Round(node.Content(), arg, dstType);
  539. });
  540. AddSimpleCallables({
  541. {"NextValue", &TProgramBuilder::NextValue},
  542. });
  543. AddCallable({"MultiMap", "OrderedMultiMap"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  544. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  545. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildWideLambda(node.Tail(), ctx, {item}); };
  546. return ctx.ProgramBuilder.MultiMap(arg, lambda);
  547. });
  548. AddCallable("ExpandMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  549. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  550. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildWideLambda(node.Tail(), ctx, {item}); };
  551. return ctx.ProgramBuilder.ExpandMap(arg, lambda);
  552. });
  553. AddCallable("WideMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  554. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  555. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildWideLambda(node.Tail(), ctx, items); };
  556. TRuntimeNode result = ctx.ProgramBuilder.WideMap(arg, lambda);
  557. if (IsWideBlockType(*node.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType())) {
  558. result = ctx.ProgramBuilder.BlockExpandChunked(result);
  559. }
  560. return result;
  561. });
  562. AddCallable("WideChain1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  563. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  564. return ctx.ProgramBuilder.WideChain1Map(flow,
  565. [&](TRuntimeNode::TList items) {
  566. return MkqlBuildWideLambda(*node.Child(1), ctx, items);
  567. },
  568. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  569. items.insert(items.cend(), state.cbegin(), state.cend());
  570. return MkqlBuildWideLambda(node.Tail(), ctx, items);
  571. });
  572. });
  573. AddCallable("NarrowMultiMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  574. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  575. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildWideLambda(node.Tail(), ctx, items); };
  576. return ctx.ProgramBuilder.NarrowMultiMap(arg, lambda);
  577. });
  578. AddCallable("WideFilter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  579. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  580. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildLambda(*node.Child(1), ctx, items); };
  581. if (node.ChildrenSize() > 2U) {
  582. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  583. return ctx.ProgramBuilder.WideFilter(arg, limit, lambda);
  584. } else {
  585. return ctx.ProgramBuilder.WideFilter(arg, lambda);
  586. }
  587. });
  588. AddCallable("WideCondense1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  589. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  590. return ctx.ProgramBuilder.WideCondense1(flow,
  591. [&](TRuntimeNode::TList items) {
  592. return MkqlBuildWideLambda(*node.Child(1), ctx, items);
  593. },
  594. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  595. items.insert(items.cend(), state.cbegin(), state.cend());
  596. return MkqlBuildLambda(*node.Child(2), ctx, items);
  597. },
  598. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  599. items.insert(items.cend(), state.cbegin(), state.cend());
  600. return MkqlBuildWideLambda(*node.Child(3), ctx, items);
  601. },
  602. HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
  603. });
  604. AddCallable("WideCombiner", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  605. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  606. i64 memLimit = 0LL;
  607. const bool withLimit = TryFromString<i64>(node.Child(1U)->Content(), memLimit);
  608. const auto keyExtractor = [&](TRuntimeNode::TList items) {
  609. return MkqlBuildWideLambda(*node.Child(2U), ctx, items);
  610. };
  611. const auto init = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) {
  612. keys.insert(keys.cend(), items.cbegin(), items.cend());
  613. return MkqlBuildWideLambda(*node.Child(3U), ctx, keys);
  614. };
  615. const auto update = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items, TRuntimeNode::TList state) {
  616. keys.insert(keys.cend(), items.cbegin(), items.cend());
  617. keys.insert(keys.cend(), state.cbegin(), state.cend());
  618. return MkqlBuildWideLambda(*node.Child(4U), ctx, keys);
  619. };
  620. const auto finish = [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) {
  621. keys.insert(keys.cend(), state.cbegin(), state.cend());
  622. return MkqlBuildWideLambda(node.Tail(), ctx, keys);
  623. };
  624. bool isStatePersistable = true;
  625. // Traverse through childs skipping input and limit children
  626. for (size_t i = 2U; i < node.ChildrenSize(); ++i) {
  627. isStatePersistable = isStatePersistable && node.Child(i)->GetTypeAnn()->IsPersistable();
  628. }
  629. if (withLimit) {
  630. return ctx.ProgramBuilder.WideCombiner(flow, memLimit, keyExtractor, init, update, finish);
  631. }
  632. if (isStatePersistable && RuntimeVersion >= 49U) {
  633. return ctx.ProgramBuilder.WideLastCombinerWithSpilling(flow, keyExtractor, init, update, finish);
  634. }
  635. return ctx.ProgramBuilder.WideLastCombiner(flow, keyExtractor, init, update, finish);
  636. });
  637. AddCallable("WideChopper", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  638. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  639. const auto keyExtractor = [&](TRuntimeNode::TList items) {
  640. return MkqlBuildWideLambda(*node.Child(1U), ctx, items);
  641. };
  642. const auto groupSwitch = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) {
  643. keys.insert(keys.cend(), items.cbegin(), items.cend());
  644. return MkqlBuildLambda(*node.Child(2U), ctx, keys);
  645. };
  646. const auto handler = [&](TRuntimeNode::TList keys, TRuntimeNode flow) {
  647. keys.emplace_back(flow);
  648. return MkqlBuildLambda(node.Tail(), ctx, keys);
  649. };
  650. return ctx.ProgramBuilder.WideChopper(flow, keyExtractor, groupSwitch, handler);
  651. });
  652. AddCallable("WideTop", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  653. return WideTopImpl(node, ctx, &TProgramBuilder::WideTop);
  654. });
  655. AddCallable("WideTopSort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  656. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSort);
  657. });
  658. AddCallable("WideSort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  659. return WideSortImpl(node, ctx, &TProgramBuilder::WideSort);
  660. });
  661. AddCallable("WideTopBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  662. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopBlocks);
  663. });
  664. AddCallable("WideTopSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  665. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSortBlocks);
  666. });
  667. AddCallable("WideSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  668. return WideSortImpl(node, ctx, &TProgramBuilder::WideSortBlocks);
  669. });
  670. AddCallable("Iterable", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  671. const auto lambda = [&]() { return MkqlBuildLambda(node.Head(), ctx, {}); };
  672. return ctx.ProgramBuilder.Iterable(lambda);
  673. });
  674. AddCallable("Filter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  675. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  676. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(*node.Child(1), ctx, {item}); };
  677. if (node.ChildrenSize() > 2U) {
  678. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  679. return ctx.ProgramBuilder.Filter(arg, limit, lambda);
  680. } else {
  681. return ctx.ProgramBuilder.Filter(arg, lambda);
  682. }
  683. });
  684. AddCallable("OrderedFilter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  685. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  686. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(*node.Child(1), ctx, {item}); };
  687. if (node.ChildrenSize() > 2U) {
  688. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  689. return ctx.ProgramBuilder.OrderedFilter(arg, limit, lambda);
  690. } else {
  691. return ctx.ProgramBuilder.OrderedFilter(arg, lambda);
  692. }
  693. });
  694. AddCallable("Member", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  695. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  696. const auto name = node.Tail().Content();
  697. return ctx.ProgramBuilder.Member(structObj, name);
  698. });
  699. AddCallable("RemoveMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  700. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  701. const auto name = node.Tail().Content();
  702. return ctx.ProgramBuilder.RemoveMember(structObj, name, false);
  703. });
  704. AddCallable("ForceRemoveMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  705. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  706. const auto name = node.Tail().Content();
  707. return ctx.ProgramBuilder.RemoveMember(structObj, name, true);
  708. });
  709. AddCallable("Nth", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  710. const auto tupleObj = MkqlBuildExpr(node.Head(), ctx);
  711. const auto index = FromString<ui32>(node.Tail().Content());
  712. return ctx.ProgramBuilder.Nth(tupleObj, index);
  713. });
  714. AddCallable("MatchRecognizeCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  715. const auto& inputStream = node.Child(0);
  716. const auto& partitionKeySelector = node.Child(1);
  717. const auto& partitionColumns = node.Child(2);
  718. const auto& params = node.Child(3);
  719. const auto& settings = node.Child(4);
  720. //explore params
  721. const auto measures = params->Child(0);
  722. const auto skipTo = params->Child(2);
  723. const auto pattern = params->Child(3);
  724. const auto defines = params->Child(4);
  725. //explore measures
  726. const auto measureNames = measures->Child(2);
  727. constexpr size_t FirstMeasureLambdaIndex = 3;
  728. //explore defines
  729. const auto defineNames = defines->Child(2);
  730. const size_t FirstDefineLambdaIndex = 3;
  731. TVector<TStringBuf> partitionColumnNames;
  732. for (const auto& n: partitionColumns->Children()) {
  733. partitionColumnNames.push_back(n->Content());
  734. }
  735. TProgramBuilder::TUnaryLambda getPartitionKeySelector = [partitionKeySelector, &ctx](TRuntimeNode inputRowArg){
  736. return MkqlBuildLambda(*partitionKeySelector, ctx, {inputRowArg});
  737. };
  738. TVector<TStringBuf> defineVarNames(defineNames->ChildrenSize());
  739. TVector<TProgramBuilder::TTernaryLambda> getDefines(defineNames->ChildrenSize());
  740. for (size_t i = 0; i != defineNames->ChildrenSize(); ++i) {
  741. defineVarNames[i] = defineNames->Child(i)->Content();
  742. getDefines[i] = [i, defines, &ctx](TRuntimeNode data, TRuntimeNode matchedVars, TRuntimeNode rowIndex) {
  743. return MkqlBuildLambda(*defines->Child(FirstDefineLambdaIndex + i), ctx, {data, matchedVars, rowIndex});
  744. };
  745. }
  746. TVector<TStringBuf> measureColumnNames(measureNames->ChildrenSize());
  747. TVector<TProgramBuilder::TBinaryLambda> getMeasures(measureNames->ChildrenSize());
  748. for (size_t i = 0; i != measureNames->ChildrenSize(); ++i) {
  749. measureColumnNames[i] = measureNames->Child(i)->Content();
  750. getMeasures[i] = [i, measures, &ctx](TRuntimeNode data, TRuntimeNode matchedVars) {
  751. return MkqlBuildLambda(*measures->Child(FirstMeasureLambdaIndex + i), ctx, {data, matchedVars});
  752. };
  753. }
  754. auto stringTo = skipTo->Child(0)->Content();
  755. auto var = skipTo->Child(1)->Content();
  756. MKQL_ENSURE(stringTo.SkipPrefix("AfterMatchSkip_"), R"(MATCH_RECOGNIZE: <row pattern skip to> should start with "AfterMatchSkip_")");
  757. NYql::NMatchRecognize::EAfterMatchSkipTo to;
  758. MKQL_ENSURE(TryFromString<NYql::NMatchRecognize::EAfterMatchSkipTo>(stringTo, to), "MATCH_RECOGNIZE: <row pattern skip to> cannot parse AfterMatchSkipTo mode");
  759. auto rowsPerMatchString = params->Child(1)->Content();
  760. MKQL_ENSURE(rowsPerMatchString.SkipPrefix("RowsPerMatch_"), R"(MATCH_RECOGNIZE: <row pattern rows per match> should start with "RowsPerMatch_")");
  761. NYql::NMatchRecognize::ERowsPerMatch rowsPerMatch;
  762. MKQL_ENSURE(TryFromString<NYql::NMatchRecognize::ERowsPerMatch>(rowsPerMatchString, rowsPerMatch), "MATCH_RECOGNIZE: cannot parse RowsPerMatch mode");
  763. const auto streamingMode = FromString<bool>(settings->Child(0)->Child(1)->Content());
  764. return ctx.ProgramBuilder.MatchRecognizeCore(
  765. MkqlBuildExpr(*inputStream, ctx),
  766. getPartitionKeySelector,
  767. partitionColumnNames,
  768. measureColumnNames,
  769. getMeasures,
  770. NYql::NMatchRecognize::ConvertPattern(pattern, ctx.ExprCtx),
  771. defineVarNames,
  772. getDefines,
  773. streamingMode,
  774. NYql::NMatchRecognize::TAfterMatchSkipTo{to, TString{var}},
  775. rowsPerMatch
  776. );
  777. });
  778. AddCallable("TimeOrderRecover", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  779. const auto inputStream = node.Child(0);
  780. const auto timeExtractor = node.Child(1);
  781. const auto delay = node.Child(2);
  782. const auto ahead = node.Child(3);
  783. const auto rowLimit = node.Child(4);
  784. return ctx.ProgramBuilder.TimeOrderRecover(
  785. MkqlBuildExpr(*inputStream, ctx),
  786. [timeExtractor, &ctx](TRuntimeNode row) {
  787. return MkqlBuildLambda(*timeExtractor, ctx, {row});
  788. },
  789. MkqlBuildExpr(*delay, ctx),
  790. MkqlBuildExpr(*ahead, ctx),
  791. MkqlBuildExpr(*rowLimit, ctx)
  792. );
  793. });
  794. AddCallable("Guess", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  795. const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
  796. auto type = node.Head().GetTypeAnn();
  797. if (type->GetKind() == ETypeAnnotationKind::Optional) {
  798. type = type->Cast<TOptionalExprType>()->GetItemType();
  799. }
  800. auto varType = type->Cast<TVariantExprType>();
  801. if (varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
  802. auto index = FromString<ui32>(node.Child(1)->Content());
  803. return ctx.ProgramBuilder.Guess(variantObj, index);
  804. } else {
  805. return ctx.ProgramBuilder.Guess(variantObj, node.Child(1)->Content());
  806. }
  807. });
  808. AddCallable("Visit", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  809. const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
  810. const auto type = node.Head().GetTypeAnn()->Cast<TVariantExprType>();
  811. const TTupleExprType* tupleType = nullptr;
  812. const TStructExprType* structType = nullptr;
  813. std::vector<TExprNode*> lambdas;
  814. TRuntimeNode defaultValue;
  815. if (type->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
  816. tupleType = type->GetUnderlyingType()->Cast<TTupleExprType>();
  817. lambdas.resize(tupleType->GetSize());
  818. } else {
  819. structType = type->GetUnderlyingType()->Cast<TStructExprType>();
  820. lambdas.resize(structType->GetSize());
  821. }
  822. for (ui32 index = 1; index < node.ChildrenSize(); ++index) {
  823. const auto child = node.Child(index);
  824. if (!child->IsAtom()) {
  825. defaultValue = MkqlBuildExpr(*child, ctx);
  826. continue;
  827. }
  828. ui32 itemIndex;
  829. if (tupleType) {
  830. itemIndex = FromString<ui32>(child->Content());
  831. } else {
  832. itemIndex = *structType->FindItem(child->Content());
  833. }
  834. YQL_ENSURE(itemIndex < lambdas.size());
  835. ++index;
  836. lambdas[itemIndex] = node.Child(index);
  837. }
  838. const auto handler = [&](ui32 index, TRuntimeNode item) {
  839. if (const auto lambda = lambdas[index]) {
  840. return MkqlBuildLambda(*lambda, ctx, {item});
  841. }
  842. return defaultValue;
  843. };
  844. return ctx.ProgramBuilder.VisitAll(variantObj, handler);
  845. });
  846. AddCallable("CurrentActorId", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  847. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  848. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  849. return TRuntimeNode(call.Build(), false);
  850. });
  851. AddCallable("Uint8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  852. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui8>(node.Head(), NUdf::EDataSlot::Uint8));
  853. });
  854. AddCallable("Int8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  855. return ctx.ProgramBuilder.NewDataLiteral(FromString<i8>(node.Head(), NUdf::EDataSlot::Int8));
  856. });
  857. AddCallable("Uint16", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  858. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui16>(node.Head(), NUdf::EDataSlot::Uint16));
  859. });
  860. AddCallable("Int16", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  861. return ctx.ProgramBuilder.NewDataLiteral(FromString<i16>(node.Head(), NUdf::EDataSlot::Int16));
  862. });
  863. AddCallable("Int32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  864. return ctx.ProgramBuilder.NewDataLiteral(FromString<i32>(node.Head(), NUdf::EDataSlot::Int32));
  865. });
  866. AddCallable("Uint32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  867. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui32>(node.Head(), NUdf::EDataSlot::Uint32));
  868. });
  869. AddCallable("Int64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  870. return ctx.ProgramBuilder.NewDataLiteral(FromString<i64>(node.Head(), NUdf::EDataSlot::Int64));
  871. });
  872. AddCallable("Uint64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  873. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui64>(node.Head(), NUdf::EDataSlot::Uint64));
  874. });
  875. AddCallable("String", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  876. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content());
  877. });
  878. AddCallable("Utf8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  879. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Utf8>(node.Head().Content());
  880. });
  881. AddCallable("Yson", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  882. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(node.Head().Content());
  883. });
  884. AddCallable("Json", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  885. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Json>(node.Head().Content());
  886. });
  887. AddCallable("JsonDocument", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  888. // NOTE: ValueFromString returns TUnboxedValuePod. This type does not free string inside it during destruction.
  889. // To get smart pointer-like behaviour we convert TUnboxedValuePod to TUnboxedValue. Without this conversion there
  890. // will be a memory leak.
  891. NUdf::TUnboxedValue jsonDocument = ValueFromString(NUdf::EDataSlot::JsonDocument, node.Head().Content());
  892. MKQL_ENSURE(bool(jsonDocument), "Invalid JsonDocument literal");
  893. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::JsonDocument>(jsonDocument.AsStringRef());
  894. });
  895. AddCallable("Uuid", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  896. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Uuid>(node.Head().Content());
  897. });
  898. AddCallable("Decimal", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  899. const auto precision = FromString<ui8>(node.Child(1)->Content());
  900. const auto scale = FromString<ui8>(node.Child(2)->Content());
  901. MKQL_ENSURE(precision > 0, "Precision must be positive.");
  902. MKQL_ENSURE(scale <= precision, "Scale too large.");
  903. const auto data = NDecimal::FromString(node.Head().Content(), precision, scale);
  904. MKQL_ENSURE(!NDecimal::IsError(data), "Bad decimal.");
  905. return ctx.ProgramBuilder.NewDecimalLiteral(data, precision, scale);
  906. });
  907. AddCallable("Bool", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  908. return ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(node.Head(), NUdf::EDataSlot::Bool));
  909. });
  910. AddCallable("Float", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  911. return ctx.ProgramBuilder.NewDataLiteral(FromString<float>(node.Head(), NUdf::EDataSlot::Float));
  912. });
  913. AddCallable("Double", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  914. return ctx.ProgramBuilder.NewDataLiteral(FromString<double>(node.Head(), NUdf::EDataSlot::Double));
  915. });
  916. AddCallable("DyNumber", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  917. const NUdf::TUnboxedValue val = ValueFromString(NUdf::EDataSlot::DyNumber, node.Head().Content());
  918. MKQL_ENSURE(val, "Bad DyNumber: " << TString(node.Head().Content()).Quote());
  919. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::DyNumber>(val.AsStringRef());
  920. });
  921. AddCallable("Date", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  922. const auto value = FromString<ui16>(node.Head(), NUdf::EDataSlot::Date);
  923. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Date>(
  924. NUdf::TStringRef((const char*)&value, sizeof(value)));
  925. });
  926. AddCallable("Datetime", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  927. const auto value = FromString<ui32>(node.Head(), NUdf::EDataSlot::Datetime);
  928. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Datetime>(
  929. NUdf::TStringRef((const char*)&value, sizeof(value)));
  930. });
  931. AddCallable("Timestamp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  932. const auto value = FromString<ui64>(node.Head(), NUdf::EDataSlot::Timestamp);
  933. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Timestamp>(
  934. NUdf::TStringRef((const char*)&value, sizeof(value)));
  935. });
  936. AddCallable("Interval", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  937. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Interval);
  938. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(
  939. NUdf::TStringRef((const char*)&value, sizeof(value)));
  940. });
  941. AddCallable("TzDate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  942. const auto& parts = CutTimezone<ui16>(node.Head().Content());
  943. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDate>(parts.first, parts.second);
  944. });
  945. AddCallable("TzDatetime", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  946. const auto& parts = CutTimezone<ui32>(node.Head().Content());
  947. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDatetime>(parts.first, parts.second);
  948. });
  949. AddCallable("TzTimestamp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  950. const auto& parts = CutTimezone<ui64>(node.Head().Content());
  951. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzTimestamp>(parts.first, parts.second);
  952. });
  953. AddCallable("Date32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  954. const auto value = FromString<i32>(node.Head(), NUdf::EDataSlot::Date32);
  955. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Date32>(
  956. NUdf::TStringRef((const char*)&value, sizeof(value)));
  957. });
  958. AddCallable("Datetime64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  959. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Datetime64);
  960. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Datetime64>(
  961. NUdf::TStringRef((const char*)&value, sizeof(value)));
  962. });
  963. AddCallable("Timestamp64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  964. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Timestamp64);
  965. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Timestamp64>(
  966. NUdf::TStringRef((const char*)&value, sizeof(value)));
  967. });
  968. AddCallable("Interval64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  969. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Interval64);
  970. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Interval64>(
  971. NUdf::TStringRef((const char*)&value, sizeof(value)));
  972. });
  973. AddCallable("TzDate32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  974. const auto& parts = CutTimezone<i32>(node.Head().Content());
  975. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDate32>(parts.first, parts.second);
  976. });
  977. AddCallable("TzDatetime64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  978. const auto& parts = CutTimezone<i64>(node.Head().Content());
  979. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDatetime64>(parts.first, parts.second);
  980. });
  981. AddCallable("TzTimestamp64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  982. const auto& parts = CutTimezone<i64>(node.Head().Content());
  983. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzTimestamp64>(parts.first, parts.second);
  984. });
  985. AddCallable("FoldMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  986. const auto list = MkqlBuildExpr(node.Head(), ctx);
  987. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  988. return ctx.ProgramBuilder.ChainMap(list, state, [&](TRuntimeNode item, TRuntimeNode state) {
  989. return MkqlBuildSplitLambda(*node.Child(2), ctx, {item, state});
  990. });
  991. });
  992. AddCallable("Fold1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  993. const auto list = MkqlBuildExpr(node.Head(), ctx);
  994. return ctx.ProgramBuilder.Chain1Map(list, [&](TRuntimeNode item) {
  995. return MkqlBuildSplitLambda(*node.Child(1), ctx, {item});
  996. }, [&](TRuntimeNode item, TRuntimeNode state) {
  997. return MkqlBuildSplitLambda(*node.Child(2), ctx, {item, state});
  998. });
  999. });
  1000. AddCallable("Chain1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1001. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1002. return ctx.ProgramBuilder.Chain1Map(list,
  1003. [&](TRuntimeNode item) -> std::array<TRuntimeNode, 2U> {
  1004. const auto out = MkqlBuildLambda(*node.Child(1), ctx, {item});
  1005. return {{out, out}};
  1006. }, [&](TRuntimeNode item, TRuntimeNode state) -> std::array<TRuntimeNode, 2U> {
  1007. const auto out = MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1008. return {{out, out}};
  1009. });
  1010. });
  1011. AddCallable("Extract", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1012. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1013. const auto name = node.Tail().Content();
  1014. return ctx.ProgramBuilder.Extract(list, name);
  1015. });
  1016. AddCallable("OrderedExtract", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1017. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1018. const auto name = node.Child(1)->Content();
  1019. return ctx.ProgramBuilder.OrderedExtract(list, name);
  1020. });
  1021. AddCallable("Fold", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1022. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1023. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1024. return ctx.ProgramBuilder.Fold(list, state, [&](TRuntimeNode item, TRuntimeNode state) {
  1025. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1026. });
  1027. });
  1028. AddCallable("MapNext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1029. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1030. return ctx.ProgramBuilder.MapNext(list, [&](TRuntimeNode item, TRuntimeNode nextItem) {
  1031. return MkqlBuildLambda(node.Tail(), ctx, {item, nextItem});
  1032. });
  1033. });
  1034. AddCallable("Fold1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1035. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1036. return ctx.ProgramBuilder.Fold1(list, [&](TRuntimeNode item) {
  1037. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1038. }, [&](TRuntimeNode item, TRuntimeNode state) {
  1039. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1040. });
  1041. });
  1042. AddCallable("Condense", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1043. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1044. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1045. return ctx.ProgramBuilder.Condense(stream, state,
  1046. [&](TRuntimeNode item, TRuntimeNode state) {
  1047. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1048. },
  1049. [&](TRuntimeNode item, TRuntimeNode state) {
  1050. return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
  1051. }, HasContextFuncs(*node.Child(3)));
  1052. });
  1053. AddCallable("Condense1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1054. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1055. return ctx.ProgramBuilder.Condense1(stream,
  1056. [&](TRuntimeNode item) {
  1057. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1058. },
  1059. [&](TRuntimeNode item, TRuntimeNode state) {
  1060. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1061. },
  1062. [&](TRuntimeNode item, TRuntimeNode state) {
  1063. return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
  1064. }, HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
  1065. });
  1066. AddCallable("Squeeze", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1067. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1068. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1069. return ctx.ProgramBuilder.Squeeze(stream, state, [&](TRuntimeNode item, TRuntimeNode state) {
  1070. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1071. }, node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1072. return MkqlBuildLambda(*node.Child(3), ctx, {state});
  1073. }, node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1074. return MkqlBuildLambda(*node.Child(4), ctx, {state});
  1075. });
  1076. });
  1077. AddCallable("Squeeze1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1078. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1079. return ctx.ProgramBuilder.Squeeze1(stream, [&](TRuntimeNode item) {
  1080. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1081. }, [&](TRuntimeNode item, TRuntimeNode state) {
  1082. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1083. }, node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1084. return MkqlBuildLambda(*node.Child(3), ctx, {state});
  1085. }, node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1086. return MkqlBuildLambda(*node.Child(4), ctx, {state});
  1087. });
  1088. });
  1089. AddCallable("Sort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1090. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1091. const auto ascending = MkqlBuildExpr(*node.Child(1), ctx);
  1092. return ctx.ProgramBuilder.Sort(list, ascending, [&](TRuntimeNode item) {
  1093. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1094. });
  1095. });
  1096. AddCallable({"Top", "TopSort"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1097. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1098. const auto count = MkqlBuildExpr(*node.Child(1), ctx);
  1099. const auto ascending = MkqlBuildExpr(*node.Child(2), ctx);
  1100. return (ctx.ProgramBuilder.*(node.Content() == "Top" ? &TProgramBuilder::Top : &TProgramBuilder::TopSort))
  1101. (list, count, ascending, [&](TRuntimeNode item) {
  1102. return MkqlBuildLambda(*node.Child(3), ctx, {item});
  1103. });
  1104. });
  1105. AddCallable("KeepTop", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1106. const auto count = MkqlBuildExpr(node.Head(), ctx);
  1107. const auto list = MkqlBuildExpr(*node.Child(1), ctx);
  1108. const auto item = MkqlBuildExpr(*node.Child(2), ctx);
  1109. const auto ascending = MkqlBuildExpr(*node.Child(3), ctx);
  1110. return ctx.ProgramBuilder.KeepTop(count, list, item, ascending, [&](TRuntimeNode item) {
  1111. return MkqlBuildLambda(*node.Child(4), ctx, {item});
  1112. });
  1113. });
  1114. AddCallable("Struct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1115. const auto structType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1116. const auto verifiedStructType = AS_TYPE(TStructType, structType);
  1117. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  1118. members.reserve(verifiedStructType->GetMembersCount());
  1119. node.ForEachChild([&](const TExprNode& child) {
  1120. members.emplace_back(child.Head().Content(), MkqlBuildExpr(child.Tail(), ctx));
  1121. });
  1122. return ctx.ProgramBuilder.NewStruct(verifiedStructType, members);
  1123. });
  1124. AddCallable("AddMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1125. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  1126. const auto memberName = node.Child(1)->Content();
  1127. const auto value = MkqlBuildExpr(node.Tail(), ctx);
  1128. return ctx.ProgramBuilder.AddMember(structObj, memberName, value);
  1129. });
  1130. AddCallable("List", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1131. const auto listType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1132. const auto itemType = AS_TYPE(TListType, listType)->GetItemType();
  1133. const auto& items = GetArgumentsFrom<1U>(node, ctx);
  1134. return ctx.ProgramBuilder.NewList(itemType, items);
  1135. });
  1136. AddCallable("FromString", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1137. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1138. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1139. return ctx.ProgramBuilder.FromString(arg, type);
  1140. });
  1141. AddCallable("StrictFromString", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1142. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1143. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1144. return ctx.ProgramBuilder.StrictFromString(arg, type);
  1145. });
  1146. AddCallable("FromBytes", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1147. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1148. const auto type = ctx.BuildType(node, *node.GetTypeAnn()->Cast<TOptionalExprType>()->GetItemType());
  1149. return ctx.ProgramBuilder.FromBytes(arg, type);
  1150. });
  1151. AddCallable("Convert", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1152. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1153. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1154. return ctx.ProgramBuilder.Convert(arg, type);
  1155. });
  1156. AddCallable("ToIntegral", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1157. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1158. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1159. return ctx.ProgramBuilder.ToIntegral(arg, type);
  1160. });
  1161. AddCallable("UnsafeTimestampCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1162. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1163. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1164. return ctx.ProgramBuilder.Convert(arg, type);
  1165. });
  1166. AddCallable("SafeCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1167. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1168. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1169. return ctx.ProgramBuilder.Cast(arg, type);
  1170. });
  1171. AddCallable("Default", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1172. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1173. return ctx.ProgramBuilder.Default(type);
  1174. });
  1175. AddCallable("Coalesce", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1176. auto ret = MkqlBuildExpr(node.Head(), ctx);
  1177. for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
  1178. auto value = MkqlBuildExpr(*node.Child(i), ctx);
  1179. ret = ctx.ProgramBuilder.Coalesce(ret, value);
  1180. }
  1181. return ret;
  1182. });
  1183. AddCallable("Unwrap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1184. const auto opt = MkqlBuildExpr(node.Head(), ctx);
  1185. const auto message = node.ChildrenSize() > 1 ? MkqlBuildExpr(node.Tail(), ctx) : ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  1186. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1187. return ctx.ProgramBuilder.Unwrap(opt, message, pos.File, pos.Row, pos.Column);
  1188. });
  1189. AddCallable("EmptyFrom", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1190. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1191. switch (node.GetTypeAnn()->GetKind()) {
  1192. case ETypeAnnotationKind::Flow:
  1193. case ETypeAnnotationKind::Stream:
  1194. return ctx.ProgramBuilder.EmptyIterator(type);
  1195. case ETypeAnnotationKind::Optional:
  1196. return ctx.ProgramBuilder.NewEmptyOptional(type);
  1197. case ETypeAnnotationKind::List:
  1198. return ctx.ProgramBuilder.NewEmptyList(AS_TYPE(TListType, type)->GetItemType());
  1199. case ETypeAnnotationKind::Dict:
  1200. return ctx.ProgramBuilder.NewDict(type, {});
  1201. default:
  1202. ythrow TNodeException(node) << "Empty from " << *node.GetTypeAnn() << " isn't supported.";
  1203. }
  1204. });
  1205. AddCallable("Nothing", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1206. const auto optType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1207. return ctx.ProgramBuilder.NewEmptyOptional(optType);
  1208. });
  1209. AddCallable("Unpickle", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1210. const auto type = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1211. const auto serialized = MkqlBuildExpr(node.Tail(), ctx);
  1212. return ctx.ProgramBuilder.Unpickle(type, serialized);
  1213. });
  1214. AddCallable("Optional", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1215. const auto optType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1216. const auto arg = MkqlBuildExpr(node.Tail(), ctx);
  1217. return ctx.ProgramBuilder.NewOptional(optType, arg);
  1218. });
  1219. AddCallable("Iterator", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1220. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1221. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  1222. return ctx.ProgramBuilder.Iterator(arg, args);
  1223. });
  1224. AddCallable("EmptyIterator", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1225. const auto streamType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1226. return ctx.ProgramBuilder.EmptyIterator(streamType);
  1227. });
  1228. AddCallable("Switch", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1229. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1230. std::vector<TSwitchInput> inputs;
  1231. ui64 memoryLimitBytes = FromString<ui64>(node.Child(1)->Content());
  1232. ui32 offset = 0;
  1233. for (ui32 i = 2; i < node.ChildrenSize(); i += 2) {
  1234. TSwitchInput input;
  1235. for (auto& child : node.Child(i)->Children()) {
  1236. input.Indicies.push_back(FromString<ui32>(child->Content()));
  1237. }
  1238. const auto& lambda = *node.Child(i + 1);
  1239. const auto& lambdaArg = lambda.Head().Head();
  1240. auto outputStreams = 1;
  1241. const auto& streamItemType = GetSeqItemType(*lambda.Tail().GetTypeAnn());
  1242. if (streamItemType.GetKind() == ETypeAnnotationKind::Variant) {
  1243. outputStreams = streamItemType.Cast<TVariantExprType>()->GetUnderlyingType()->Cast<TTupleExprType>()->GetSize();
  1244. }
  1245. if (node.ChildrenSize() > 4 || outputStreams != 1) {
  1246. input.ResultVariantOffset = offset;
  1247. }
  1248. offset += outputStreams;
  1249. input.InputType = ctx.BuildType(lambdaArg, *lambdaArg.GetTypeAnn());
  1250. inputs.emplace_back(input);
  1251. }
  1252. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1253. return ctx.ProgramBuilder.Switch(stream, inputs, [&](ui32 index, TRuntimeNode item) -> TRuntimeNode {
  1254. return MkqlBuildLambda(*node.Child(2 + 2 * index + 1), ctx, {item});
  1255. }, memoryLimitBytes, returnType);
  1256. });
  1257. AddCallable("ToStream", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1258. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1259. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  1260. return ctx.ProgramBuilder.Iterator(ctx.ProgramBuilder.ToList(arg), args);
  1261. });
  1262. AddCallable(LeftName, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1263. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1264. return ctx.ProgramBuilder.Nth(arg, 0);
  1265. });
  1266. AddCallable(RightName, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1267. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1268. return ctx.ProgramBuilder.Nth(arg, 1);
  1269. });
  1270. AddCallable("FilterNullMembers", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1271. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1272. if (node.ChildrenSize() < 2U) {
  1273. return ctx.ProgramBuilder.FilterNullMembers(list);
  1274. } else {
  1275. std::vector<std::string_view> members;
  1276. members.reserve(node.Tail().ChildrenSize());
  1277. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Content()); });
  1278. return ctx.ProgramBuilder.FilterNullMembers(list, members);
  1279. }
  1280. });
  1281. AddCallable("SkipNullMembers", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1282. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1283. if (node.ChildrenSize() < 2U) {
  1284. return ctx.ProgramBuilder.SkipNullMembers(list);
  1285. } else {
  1286. std::vector<std::string_view> members;
  1287. members.reserve(node.Tail().ChildrenSize());
  1288. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Content()); });
  1289. return ctx.ProgramBuilder.SkipNullMembers(list, members);
  1290. }
  1291. });
  1292. AddCallable("FilterNullElements", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1293. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1294. if (node.ChildrenSize() < 2U) {
  1295. return ctx.ProgramBuilder.FilterNullElements(list);
  1296. } else {
  1297. std::vector<ui32> members;
  1298. members.reserve(node.Tail().ChildrenSize());
  1299. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(FromString<ui32>(child.Content())); });
  1300. return ctx.ProgramBuilder.FilterNullElements(list, members);
  1301. }
  1302. });
  1303. AddCallable("SkipNullElements", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1304. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1305. if (node.ChildrenSize() < 2U) {
  1306. return ctx.ProgramBuilder.SkipNullElements(list);
  1307. } else {
  1308. std::vector<ui32> members;
  1309. members.reserve(node.Tail().ChildrenSize());
  1310. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(FromString<ui32>(child.Content())); });
  1311. return ctx.ProgramBuilder.SkipNullElements(list, members);
  1312. }
  1313. });
  // MapJoinCore: joins the input sequence (child 0) against a dictionary (child 1).
  // Child 2 is the join kind, child 3 the left key column names, and children 5 and 6
  // the left and right rename lists. These are resolved into positional indices for
  // ProgramBuilder.MapJoinCore. NOTE(review): child 4 is not read here.
  // Rename lists are flat sequences of alternating names. The toggle `s` resolves the 1st,
  // 3rd, ... entry against the source type and the 2nd, 4th, ... entry against the output type.
  AddCallable("MapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
      const auto list = MkqlBuildExpr(node.Head(), ctx);
      const auto dict = MkqlBuildExpr(*node.Child(1), ctx);
      const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
      const auto& outputItemType = GetSeqItemType(*node.GetTypeAnn());
      // The right row type is the dict payload, unwrapped once if the payload is a List
      // (presumably the multi-value dict case — confirm against the dict builder).
      auto rightItemType = node.Child(1U)->GetTypeAnn()->Cast<TDictExprType>()->GetPayloadType();
      if (ETypeAnnotationKind::List == rightItemType->GetKind()) {
          rightItemType = rightItemType->Cast<TListExprType>()->GetItemType();
      }
      std::vector<ui32> leftKeyColumns, leftRenames, rightRenames;
      switch (const auto& inputItemType = GetSeqItemType(*node.Head().GetTypeAnn()); inputItemType.GetKind()) {
          case ETypeAnnotationKind::Struct: {
              const auto inputStructType = inputItemType.Cast<TStructExprType>();
              const auto outputStructType = outputItemType.Cast<TStructExprType>();
              node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
              bool s = false;
              node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputStructType : *outputStructType, child.Content())); });
              // NOTE: `s` is not reset here. This relies on the left rename list having an
              // even length, so that `s` is false again.
              switch (rightItemType->GetKind()) {
                  case ETypeAnnotationKind::Struct: {
                      const auto rightStructType = rightItemType->Cast<TStructExprType>();
                      node.Child(6)->ForEachChild([&](const TExprNode& child){
                          rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightStructType : *outputStructType, child.Content())); });
                  }
                  break;
                  case ETypeAnnotationKind::Tuple: {
                      // Source and output types differ here, so the two lookups are separate calls.
                      const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
                      node.Child(6)->ForEachChild([&](const TExprNode& child){
                          rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightTupleType, child.Content()) : *GetFieldPosition(*outputStructType, child.Content())); });
                  }
                  break;
                  default:
                      // Any other payload kind contributes no right-side columns.
                      MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
              }
              break;
          }
          case ETypeAnnotationKind::Tuple: {
              // Same as the Struct case, with tuple input/output; names are resolved as tuple positions.
              const auto inputTupleType = inputItemType.Cast<TTupleExprType>();
              const auto outputTupleType = outputItemType.Cast<TTupleExprType>();
              node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
              bool s = false;
              node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputTupleType : *outputTupleType, child.Content())); });
              switch (rightItemType->GetKind()) {
                  case ETypeAnnotationKind::Tuple: {
                      const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
                      node.Child(6)->ForEachChild([&](const TExprNode& child){
                          rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightTupleType : *outputTupleType, child.Content())); });
                  }
                  break;
                  case ETypeAnnotationKind::Struct: {
                      const auto rightStructType = rightItemType->Cast<TStructExprType>();
                      node.Child(6)->ForEachChild([&](const TExprNode& child){
                          rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightStructType, child.Content()) : *GetFieldPosition(*outputTupleType, child.Content())); });
                  }
                  break;
                  default:
                      MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
              }
              break;
          }
          case ETypeAnnotationKind::Multi: {
              // Wide (multi) input and output. The right payload may be a tuple or a struct.
              const auto inputMultiType = inputItemType.Cast<TMultiExprType>();
              const auto outputMultiType = outputItemType.Cast<TMultiExprType>();
              node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
              bool s = false;
              node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputMultiType : *outputMultiType, child.Content())); });
              switch (rightItemType->GetKind()) {
                  case ETypeAnnotationKind::Tuple: {
                      const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
                      node.Child(6)->ForEachChild([&](const TExprNode& child){
                          rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightTupleType, child.Content()) : *GetFieldPosition(*outputMultiType, child.Content())); });
                  }
                  break;
                  case ETypeAnnotationKind::Struct: {
                      const auto rightStructType = rightItemType->Cast<TStructExprType>();
                      node.Child(6)->ForEachChild([&](const TExprNode& child){
                          rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightStructType, child.Content()) : *GetFieldPosition(*outputMultiType, child.Content())); });
                  }
                  break;
                  default:
                      MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
              }
              break;
          }
          default:
              ythrow TNodeException(node) << "Wrong MapJoinCore input item type: " << inputItemType;
      }
      const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
      return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType);
  });
  1403. AddCallable("BlockStorage", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1404. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1405. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1406. return ctx.ProgramBuilder.BlockStorage(stream, returnType);
  1407. });
  1408. AddCallable("BlockMapJoinIndex", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1409. const auto blockStorage = MkqlBuildExpr(node.Head(), ctx);
  1410. const auto itemType = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TMultiExprType>();
  1411. std::vector<ui32> keyColumns;
  1412. node.Child(2)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetWideBlockFieldPosition(*itemType, child.Content())); });
  1413. const bool any = HasSetting(node.Tail(), "any");
  1414. const auto itemMkqlType = BuildType(*node.Child(1), *itemType, ctx.ProgramBuilder);
  1415. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1416. return ctx.ProgramBuilder.BlockMapJoinIndex(blockStorage, itemMkqlType, keyColumns, any, returnType);
  1417. });
  1418. AddCallable("BlockMapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1419. const auto leftStream = MkqlBuildExpr(node.Head(), ctx);
  1420. const auto rightBlockStorage = MkqlBuildExpr(*node.Child(1), ctx);
  1421. const auto leftItemType = node.Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
  1422. const auto rightItemType = node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TMultiExprType>();
  1423. const auto joinKind = GetJoinKind(node, node.Child(3)->Content());
  1424. std::vector<ui32> leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops;
  1425. node.Child(4)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); });
  1426. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftKeyDrops.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); });
  1427. node.Child(6)->ForEachChild([&](const TExprNode& child){ rightKeyColumns.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); });
  1428. node.Child(7)->ForEachChild([&](const TExprNode& child){ rightKeyDrops.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); });
  1429. const auto rightItemMkqlType = BuildType(*node.Child(2), *rightItemType, ctx.ProgramBuilder);
  1430. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1431. return ctx.ProgramBuilder.BlockMapJoinCore(leftStream, rightBlockStorage, rightItemMkqlType, joinKind, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, returnType);
  1432. });
  1433. AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1434. bool selfJoin = node.Content() == "GraceSelfJoinCore";
  1435. int shift = selfJoin ? 0 : 1;
  1436. const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx);
  1437. const auto flowRight = MkqlBuildExpr(*node.Child(shift), ctx);
  1438. const auto joinKind = GetJoinKind(node, node.Child(shift + 1)->Content());
  1439. const auto& outputItemType = GetSeqItemType(*node.GetTypeAnn());
  1440. std::vector<ui32> leftKeyColumns, rightKeyColumns, leftRenames, rightRenames;
  1441. const auto& leftItemType = GetSeqItemType(*node.Child(0)->GetTypeAnn());
  1442. const auto& rightItemType = GetSeqItemType(*node.Child(shift)->GetTypeAnn());
  1443. if (leftItemType.GetKind() != ETypeAnnotationKind::Multi ||
  1444. rightItemType.GetKind() != ETypeAnnotationKind::Multi ) {
  1445. ythrow TNodeException(node) << "Wrong GraceJoinCore input item type: " << leftItemType << " " << rightItemType;
  1446. }
  1447. if (outputItemType.GetKind() != ETypeAnnotationKind::Multi ) {
  1448. ythrow TNodeException(node) << "Wrong GraceJoinCore output item type: " << outputItemType;
  1449. }
  1450. const auto leftTupleType = leftItemType.Cast<TMultiExprType>();
  1451. const auto rightTupleType = rightItemType.Cast<TMultiExprType>();
  1452. const auto outputTupleType = outputItemType.Cast<TMultiExprType>();
  1453. node.Child(shift + 2)->ForEachChild([&](TExprNode& child){
  1454. leftKeyColumns.emplace_back(*GetFieldPosition(*leftTupleType, child.Content()));
  1455. });
  1456. node.Child(shift + 3)->ForEachChild([&](TExprNode& child){
  1457. rightKeyColumns.emplace_back(*GetFieldPosition(*rightTupleType, child.Content())); });
  1458. bool s = false;
  1459. node.Child(shift + 4)->ForEachChild([&](TExprNode& child){
  1460. leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *leftTupleType : *outputTupleType, child.Content())); });
  1461. s = false;
  1462. node.Child(shift + 5)->ForEachChild([&](TExprNode& child){
  1463. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightTupleType : *outputTupleType, child.Content())); });
  1464. auto anyJoinSettings = EAnyJoinSettings::None;
  1465. node.Tail().ForEachChild([&](const TExprNode& flag) {
  1466. if (flag.IsAtom("LeftAny"))
  1467. anyJoinSettings = EAnyJoinSettings::Right == anyJoinSettings ? EAnyJoinSettings::Both : EAnyJoinSettings::Left;
  1468. else if (flag.IsAtom("RightAny"))
  1469. anyJoinSettings = EAnyJoinSettings::Left == anyJoinSettings ? EAnyJoinSettings::Both : EAnyJoinSettings::Right;
  1470. });
  1471. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1472. return selfJoin
  1473. ? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
  1474. : ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
  1475. });
  1476. AddCallable("CommonJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1477. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1478. const auto joinKind = GetJoinKind(node, node.Child(1)->Content());
  1479. std::vector<ui32> leftColumns, rightColumns, requiredColumns, keyColumns;
  1480. ui32 tableIndexFieldPos;
  1481. switch (const auto& inputItemType = GetSeqItemType(*node.Head().GetTypeAnn()); inputItemType.GetKind()) {
  1482. case ETypeAnnotationKind::Struct: {
  1483. const auto inputStructType = inputItemType.Cast<TStructExprType>();
  1484. const auto outputStructType = GetSeqItemType(*node.GetTypeAnn()).Cast<TStructExprType>();
  1485. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1486. leftColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content()));
  1487. leftColumns.emplace_back(*GetFieldPosition(*outputStructType, child.Content()));
  1488. });
  1489. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1490. rightColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content()));
  1491. rightColumns.emplace_back(*GetFieldPosition(*outputStructType, child.Content()));
  1492. });
  1493. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1494. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1495. tableIndexFieldPos = *GetFieldPosition(*inputStructType, node.Tail().Content());
  1496. break;
  1497. }
  1498. case ETypeAnnotationKind::Tuple: {
  1499. const auto inputTupleType = inputItemType.Cast<TTupleExprType>();
  1500. ui32 i = 0U;
  1501. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1502. leftColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content()));
  1503. leftColumns.emplace_back(i++);
  1504. });
  1505. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1506. rightColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content()));
  1507. rightColumns.emplace_back(i++);
  1508. });
  1509. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1510. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1511. tableIndexFieldPos = *GetFieldPosition(*inputTupleType, node.Tail().Content());
  1512. break;
  1513. }
  1514. case ETypeAnnotationKind::Multi: {
  1515. const auto inputMultiType = inputItemType.Cast<TMultiExprType>();
  1516. ui32 i = 0U;
  1517. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1518. leftColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content()));
  1519. leftColumns.emplace_back(i++);
  1520. });
  1521. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1522. rightColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content()));
  1523. rightColumns.emplace_back(i++);
  1524. });
  1525. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1526. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1527. tableIndexFieldPos = *GetFieldPosition(*inputMultiType, node.Tail().Content());
  1528. break;
  1529. }
  1530. default:
  1531. ythrow TNodeException(node) << "Wrong CommonJoinCore input item type: " << inputItemType;
  1532. }
  1533. ui64 memLimit = 0U;
  1534. if (const auto memLimitSetting = GetSetting(*node.Child(6), "memLimit")) {
  1535. memLimit = FromString<ui64>(memLimitSetting->Child(1)->Content());
  1536. }
  1537. std::optional<ui32> sortedTableOrder;
  1538. if (const auto sortSetting = GetSetting(*node.Child(6), "sorted")) {
  1539. sortedTableOrder = sortSetting->Child(1)->Content() == "left" ? 0 : 1;
  1540. }
  1541. EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None;
  1542. if (const auto anyNode = GetSetting(*node.Child(6), "any")) {
  1543. for (auto sideNode : anyNode->Child(1)->Children()) {
  1544. YQL_ENSURE(sideNode->IsAtom());
  1545. AddAnyJoinSide(anyJoinSettings, sideNode->Content() == "left" ? EAnyJoinSettings::Left : EAnyJoinSettings::Right);
  1546. }
  1547. }
  1548. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1549. return ctx.ProgramBuilder.CommonJoinCore(list, joinKind, leftColumns, rightColumns,
  1550. requiredColumns, keyColumns, memLimit, sortedTableOrder, anyJoinSettings, tableIndexFieldPos, returnType);
  1551. });
  1552. AddCallable("CombineCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1553. NNodes::TCoCombineCore core(&node);
  1554. const auto stream = MkqlBuildExpr(core.Input().Ref(), ctx);
  1555. const auto memLimit = NNodes::TCoCombineCore::idx_MemLimit < node.ChildrenSize() ?
  1556. FromString<ui64>(core.MemLimit().Cast().Value()) : 0;
  1557. const auto keyExtractor = [&](TRuntimeNode item) {
  1558. return MkqlBuildLambda(core.KeyExtractor().Ref(), ctx, {item});
  1559. };
  1560. const auto init = [&](TRuntimeNode key, TRuntimeNode item) {
  1561. return MkqlBuildLambda(core.InitHandler().Ref(), ctx, {key, item});
  1562. };
  1563. const auto update = [&](TRuntimeNode key, TRuntimeNode item, TRuntimeNode state) {
  1564. return MkqlBuildLambda(core.UpdateHandler().Ref(), ctx, {key, item, state});
  1565. };
  1566. const auto finish = [&](TRuntimeNode key, TRuntimeNode state) {
  1567. return MkqlBuildLambda(core.FinishHandler().Ref(), ctx, {key, state});
  1568. };
  1569. return ctx.ProgramBuilder.CombineCore(stream, keyExtractor, init, update, finish, memLimit);
  1570. });
  1571. AddCallable("GroupingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1572. NNodes::TCoGroupingCore core(&node);
  1573. const auto stream = MkqlBuildExpr(core.Input().Ref(), ctx);
  1574. const auto groupSwitch = [&](TRuntimeNode key, TRuntimeNode item) {
  1575. return MkqlBuildLambda(core.GroupSwitch().Ref(), ctx, {key, item});
  1576. };
  1577. const auto keyExtractor = [&](TRuntimeNode item) {
  1578. return MkqlBuildLambda(core.KeyExtractor().Ref(), ctx, {item});
  1579. };
  1580. TProgramBuilder::TUnaryLambda handler;
  1581. if (auto lambda = core.ConvertHandler()) {
  1582. handler = [&](TRuntimeNode item) {
  1583. return MkqlBuildLambda(core.ConvertHandler().Ref(), ctx, {item});
  1584. };
  1585. }
  1586. return ctx.ProgramBuilder.GroupingCore(stream, groupSwitch, keyExtractor, handler);
  1587. });
  1588. AddCallable("Chopper", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1589. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1590. const auto keyExtractor = [&](TRuntimeNode item) {
  1591. return MkqlBuildLambda(*node.Child(1U), ctx, {item});
  1592. };
  1593. const auto groupSwitch = [&](TRuntimeNode key, TRuntimeNode item) {
  1594. return MkqlBuildLambda(*node.Child(2U), ctx, {key, item});
  1595. };
  1596. const auto handler = [&](TRuntimeNode key, TRuntimeNode flow) {
  1597. return MkqlBuildLambda(node.Tail(), ctx, {key, flow});
  1598. };
  1599. return ctx.ProgramBuilder.Chopper(stream, keyExtractor, groupSwitch, handler);
  1600. });
  1601. AddCallable("HoppingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1602. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1603. const auto timeExtractor = [&](TRuntimeNode item) {
  1604. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1605. };
  1606. const auto hop = MkqlBuildExpr(*node.Child(2), ctx);
  1607. const auto interval = MkqlBuildExpr(*node.Child(3), ctx);
  1608. const auto delay = MkqlBuildExpr(*node.Child(4), ctx);
  1609. const auto init = [&](TRuntimeNode item) {
  1610. return MkqlBuildLambda(*node.Child(5), ctx, {item});
  1611. };
  1612. const auto update = [&](TRuntimeNode item, TRuntimeNode state) {
  1613. return MkqlBuildLambda(*node.Child(6), ctx, {item, state});
  1614. };
  1615. const auto save = node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1616. return MkqlBuildLambda(*node.Child(7), ctx, {state});
  1617. };
  1618. const auto load = node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1619. return MkqlBuildLambda(*node.Child(8), ctx, {state});
  1620. };
  1621. const auto merge = [&](TRuntimeNode state1, TRuntimeNode state2) {
  1622. return MkqlBuildLambda(*node.Child(9), ctx, {state1, state2});
  1623. };
  1624. const auto finish = [&](TRuntimeNode state, TRuntimeNode time) {
  1625. return MkqlBuildLambda(*node.Child(10), ctx, {state, time});
  1626. };
  1627. return ctx.ProgramBuilder.HoppingCore(
  1628. stream, timeExtractor, init, update, save, load, merge, finish, hop, interval, delay);
  1629. });
  1630. AddCallable("MultiHoppingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1631. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1632. const auto keyExtractor = [&](TRuntimeNode item) {
  1633. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1634. };
  1635. const auto timeExtractor = [&](TRuntimeNode item) {
  1636. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1637. };
  1638. const auto hop = MkqlBuildExpr(*node.Child(3), ctx);
  1639. const auto interval = MkqlBuildExpr(*node.Child(4), ctx);
  1640. const auto delay = MkqlBuildExpr(*node.Child(5), ctx);
  1641. const auto dataWatermarks = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(6), NUdf::EDataSlot::Bool));
  1642. const auto init = [&](TRuntimeNode item) {
  1643. return MkqlBuildLambda(*node.Child(7), ctx, {item});
  1644. };
  1645. const auto update = [&](TRuntimeNode item, TRuntimeNode state) {
  1646. return MkqlBuildLambda(*node.Child(8), ctx, {item, state});
  1647. };
  1648. const auto save = node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1649. return MkqlBuildLambda(*node.Child(9), ctx, {state});
  1650. };
  1651. const auto load = node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1652. return MkqlBuildLambda(*node.Child(10), ctx, {state});
  1653. };
  1654. const auto merge = [&](TRuntimeNode state1, TRuntimeNode state2) {
  1655. return MkqlBuildLambda(*node.Child(11), ctx, {state1, state2});
  1656. };
  1657. const auto finish = [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) {
  1658. return MkqlBuildLambda(*node.Child(12), ctx, {key, state, time});
  1659. };
  1660. const auto watermarksMode = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(13), NUdf::EDataSlot::Bool));
  1661. return ctx.ProgramBuilder.MultiHoppingCore(
  1662. stream, keyExtractor, timeExtractor, init, update, save, load, merge, finish,
  1663. hop, interval, delay, dataWatermarks, watermarksMode);
  1664. });
  1665. AddCallable("ToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1666. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1667. TMaybe<bool> isMany;
  1668. TMaybe<EDictType> type;
  1669. TMaybe<ui64> itemsCount;
  1670. bool isCompact;
  1671. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1672. ythrow TNodeException(node) << error->GetMessage();
  1673. }
  1674. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1675. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::ToHashedDict : &TProgramBuilder::ToSortedDict;
  1676. return (ctx.ProgramBuilder.*factory)(list, *isMany, [&](TRuntimeNode item) {
  1677. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1678. }, [&](TRuntimeNode item) {
  1679. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1680. }, isCompact, itemsCount.GetOrElse(0));
  1681. });
  1682. AddCallable("SqueezeToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1683. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1684. TMaybe<bool> isMany;
  1685. TMaybe<EDictType> type;
  1686. TMaybe<ui64> itemsCount;
  1687. bool isCompact;
  1688. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1689. ythrow TNodeException(node) << error->GetMessage();
  1690. }
  1691. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1692. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::SqueezeToHashedDict : &TProgramBuilder::SqueezeToSortedDict;
  1693. return (ctx.ProgramBuilder.*factory)(stream, *isMany, [&](TRuntimeNode item) {
  1694. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1695. }, [&](TRuntimeNode item) {
  1696. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1697. }, isCompact, itemsCount.GetOrElse(0));
  1698. });
  1699. AddCallable("NarrowSqueezeToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1700. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1701. TMaybe<bool> isMany;
  1702. TMaybe<EDictType> type;
  1703. TMaybe<ui64> itemsCount;
  1704. bool isCompact;
  1705. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1706. ythrow TNodeException(node) << error->GetMessage();
  1707. }
  1708. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1709. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::NarrowSqueezeToHashedDict : &TProgramBuilder::NarrowSqueezeToSortedDict;
  1710. return (ctx.ProgramBuilder.*factory)(stream, *isMany, [&](TRuntimeNode::TList items) {
  1711. return MkqlBuildLambda(*node.Child(1), ctx, items);
  1712. }, [&](TRuntimeNode::TList items) {
  1713. return MkqlBuildLambda(*node.Child(2), ctx, items);
  1714. }, isCompact, itemsCount.GetOrElse(0));
  1715. });
  1716. AddCallable("GroupByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1717. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1718. const auto dict = ctx.ProgramBuilder.ToHashedDict(list, true, [&](TRuntimeNode item) {
  1719. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1720. }, [&](TRuntimeNode item) {
  1721. return item;
  1722. });
  1723. const auto values = ctx.ProgramBuilder.DictItems(dict);
  1724. return ctx.ProgramBuilder.FlatMap(values, [&](TRuntimeNode item) {
  1725. const auto key = ctx.ProgramBuilder.Nth(item, 0);
  1726. const auto payloadList = ctx.ProgramBuilder.Nth(item, 1);
  1727. return MkqlBuildLambda(*node.Child(2), ctx, {key, payloadList});
  1728. });
  1729. });
  1730. AddCallable("PartitionByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1731. const NNodes::TCoPartitionByKey partition(&node);
  1732. const auto input = MkqlBuildExpr(partition.Input().Ref(), ctx);
  1733. const auto makePartitions = [&](TRuntimeNode list) {
  1734. return ctx.ProgramBuilder.Map(
  1735. ctx.ProgramBuilder.DictItems(ctx.ProgramBuilder.ToHashedDict(list, true,
  1736. [&](TRuntimeNode item) { return MkqlBuildLambda(partition.KeySelectorLambda().Ref(), ctx, {item}); },
  1737. [&](TRuntimeNode item) { return item; }
  1738. )),
  1739. [&](TRuntimeNode pair) {
  1740. const auto payload = partition.SortDirections().Ref().IsCallable("Void") ?
  1741. ctx.ProgramBuilder.Nth(pair, 1):
  1742. ctx.ProgramBuilder.Sort(ctx.ProgramBuilder.Nth(pair, 1), MkqlBuildExpr(partition.SortDirections().Ref(), ctx),
  1743. [&](TRuntimeNode item) {
  1744. return MkqlBuildLambda(partition.SortKeySelectorLambda().Ref(), ctx, {item});
  1745. }
  1746. );
  1747. return ctx.ProgramBuilder.NewTuple({ctx.ProgramBuilder.Nth(pair, 0), ctx.ProgramBuilder.Iterator(payload, {list})});
  1748. }
  1749. );
  1750. };
  1751. switch (const auto kind = partition.Ref().GetTypeAnn()->GetKind()) {
  1752. case ETypeAnnotationKind::Flow:
  1753. case ETypeAnnotationKind::Stream: {
  1754. const auto sorted = ctx.ProgramBuilder.FlatMap(
  1755. ctx.ProgramBuilder.Condense1(input,
  1756. [&](TRuntimeNode item) { return ctx.ProgramBuilder.AsList(item); },
  1757. [&](TRuntimeNode, TRuntimeNode) { return ctx.ProgramBuilder.NewDataLiteral(false); },
  1758. [&](TRuntimeNode item, TRuntimeNode state) { return ctx.ProgramBuilder.Append(state, item); }
  1759. ),
  1760. makePartitions
  1761. );
  1762. return ETypeAnnotationKind::Stream == kind ?MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {sorted}):
  1763. ctx.ProgramBuilder.ToFlow(MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {ctx.ProgramBuilder.FromFlow(sorted)}));
  1764. }
  1765. case ETypeAnnotationKind::List: {
  1766. const auto sorted = ctx.ProgramBuilder.Iterator(makePartitions(input), {});
  1767. return ctx.ProgramBuilder.Collect(MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {sorted}));
  1768. }
  1769. default: break;
  1770. }
  1771. Y_ABORT("Wrong case.");
  1772. });
  1773. AddCallable("CombineByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1774. return CombineByKeyImpl(node, ctx);
  1775. });
  1776. AddCallable("Enumerate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1777. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1778. TRuntimeNode start;
  1779. if (node.ChildrenSize() > 1) {
  1780. start = MkqlBuildExpr(*node.Child(1), ctx);
  1781. } else {
  1782. start = ctx.ProgramBuilder.NewDataLiteral<ui64>(0);
  1783. }
  1784. TRuntimeNode step;
  1785. if (node.ChildrenSize() > 2) {
  1786. step = MkqlBuildExpr(node.Tail(), ctx);
  1787. } else {
  1788. step = ctx.ProgramBuilder.NewDataLiteral<ui64>(1);
  1789. }
  1790. return ctx.ProgramBuilder.Enumerate(arg, start, step);
  1791. });
  1792. AddCallable("Dict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1793. const auto listType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1794. const auto dictType = AS_TYPE(TDictType, listType);
  1795. std::vector<std::pair<TRuntimeNode, TRuntimeNode>> items;
  1796. for (size_t i = 1; i < node.ChildrenSize(); ++i) {
  1797. const auto key = MkqlBuildExpr(node.Child(i)->Head(), ctx);
  1798. const auto payload = MkqlBuildExpr(node.Child(i)->Tail(), ctx);
  1799. items.emplace_back(key, payload);
  1800. }
  1801. return ctx.ProgramBuilder.NewDict(dictType, items);
  1802. });
  1803. AddCallable("Variant", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1804. const auto varType = node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TVariantExprType>();
  1805. const auto type = ctx.BuildType(*node.Child(2), *varType);
  1806. const auto item = MkqlBuildExpr(node.Head(), ctx);
  1807. return varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple ?
  1808. ctx.ProgramBuilder.NewVariant(item, FromString<ui32>(node.Child(1)->Content()), type) :
  1809. ctx.ProgramBuilder.NewVariant(item, node.Child(1)->Content(), type);
  1810. });
  1811. AddCallable("DynamicVariant", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1812. const auto varType = ctx.BuildType(*node.Child(2), *node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1813. const auto item = MkqlBuildExpr(node.Head(), ctx);
  1814. const auto index = MkqlBuildExpr(*node.Child(1), ctx);
  1815. return ctx.ProgramBuilder.DynamicVariant(item, index, varType);
  1816. });
  1817. AddCallable("AsStruct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1818. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  1819. members.reserve(node.ChildrenSize());
  1820. node.ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Head().Content(), MkqlBuildExpr(child.Tail(), ctx)); });
  1821. return ctx.ProgramBuilder.NewStruct(members);
  1822. });
  1823. AddCallable("AsDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1824. std::vector<std::pair<TRuntimeNode, TRuntimeNode>> items;
  1825. items.reserve(node.ChildrenSize());
  1826. node.ForEachChild([&](const TExprNode& child){ items.emplace_back(MkqlBuildExpr(*child.Child(0), ctx), MkqlBuildExpr(*child.Child(1), ctx)); });
  1827. const auto dictType = ctx.ProgramBuilder.NewDictType(items[0].first.GetStaticType(), items[0].second.GetStaticType(), false);
  1828. return ctx.ProgramBuilder.NewDict(dictType, items);
  1829. });
  1830. AddCallable("Ensure", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1831. const auto value = MkqlBuildExpr(node.Head(), ctx);
  1832. const auto predicate = MkqlBuildExpr(*node.Child(1), ctx);
  1833. const auto message = node.ChildrenSize() > 2 ? MkqlBuildExpr(node.Tail(), ctx) : ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  1834. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1835. return ctx.ProgramBuilder.Ensure(value, predicate, message, pos.File, pos.Row, pos.Column);
  1836. });
  1837. AddCallable("Replicate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1838. const auto value = MkqlBuildExpr(node.Head(), ctx);
  1839. const auto count = MkqlBuildExpr(*node.Child(1), ctx);
  1840. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1841. return ctx.ProgramBuilder.Replicate(value, count, pos.File, pos.Row, pos.Column);
  1842. });
  1843. AddCallable("IfPresent", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1844. TRuntimeNode::TList optionals;
  1845. const auto width = node.ChildrenSize() - 2U;
  1846. optionals.reserve(width);
  1847. auto i = 0U;
  1848. std::generate_n(std::back_inserter(optionals), width, [&](){ return MkqlBuildExpr(*node.Child(i++), ctx); });
  1849. const auto elseBranch = MkqlBuildExpr(node.Tail(), ctx);
  1850. return ctx.ProgramBuilder.IfPresent(optionals, [&](TRuntimeNode::TList items) {
  1851. return MkqlBuildLambda(*node.Child(width), ctx, items);
  1852. }, elseBranch);
  1853. });
  1854. AddCallable({"DataType",
  1855. "ListType",
  1856. "OptionalType",
  1857. "TupleType",
  1858. "StructType",
  1859. "DictType",
  1860. "VoidType",
  1861. "NullType",
  1862. "CallableType",
  1863. "UnitType",
  1864. "GenericType",
  1865. "ResourceType",
  1866. "TaggedType",
  1867. "VariantType",
  1868. "StreamType",
  1869. "FlowType",
  1870. "EmptyListType",
  1871. "EmptyDictType"},
  1872. [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1873. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1874. return TRuntimeNode(type, true);
  1875. });
  1876. AddCallable("ParseType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1877. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1878. return TRuntimeNode(type, true);
  1879. });
  1880. AddCallable("TypeOf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1881. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1882. return TRuntimeNode(type, true);
  1883. });
  1884. AddCallable("EmptyList", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1885. Y_UNUSED(node);
  1886. return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyListLazy(), true);
  1887. });
  1888. AddCallable("EmptyDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1889. Y_UNUSED(node);
  1890. return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyDictLazy(), true);
  1891. });
  1892. AddCallable("SourceOf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1893. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1894. return ctx.ProgramBuilder.SourceOf(type);
  1895. });
  1896. AddCallable("TypeHandle", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1897. const auto type = node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  1898. const auto yson = WriteTypeToYson(type);
  1899. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1900. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1901. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(yson));
  1902. return TRuntimeNode(call.Build(), false);
  1903. });
  1904. AddCallable("ReprCode", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1905. const auto type = node.Head().GetTypeAnn();
  1906. const auto yson = WriteTypeToYson(type);
  1907. const auto& args = GetAllArguments(node, ctx);
  1908. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1909. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1910. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1911. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1912. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1913. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1914. for (auto arg : args) {
  1915. call.Add(arg);
  1916. }
  1917. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(yson));
  1918. return TRuntimeNode(call.Build(), false);
  1919. });
  1920. // safe and position unaware
  1921. AddCallable({
  1922. "SerializeTypeHandle",
  1923. "TypeKind",
  1924. "FormatCode",
  1925. "FormatCodeWithPositions",
  1926. "SerializeCode",
  1927. }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1928. const auto& args = GetAllArguments(node, ctx);
  1929. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1930. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1931. for (auto arg : args) {
  1932. call.Add(arg);
  1933. }
  1934. return TRuntimeNode(call.Build(), false);
  1935. });
  1936. // with position
  1937. AddCallable({
  1938. "ParseTypeHandle",
  1939. "DataTypeComponents",
  1940. "DataTypeHandle",
  1941. "OptionalItemType",
  1942. "OptionalTypeHandle",
  1943. "ListItemType",
  1944. "ListTypeHandle",
  1945. "StreamItemType",
  1946. "StreamTypeHandle",
  1947. "TupleTypeComponents",
  1948. "TupleTypeHandle",
  1949. "StructTypeComponents",
  1950. "StructTypeHandle",
  1951. "DictTypeComponents",
  1952. "DictTypeHandle",
  1953. "ResourceTypeTag",
  1954. "ResourceTypeHandle",
  1955. "TaggedTypeComponents",
  1956. "TaggedTypeHandle",
  1957. "VariantUnderlyingType",
  1958. "VariantTypeHandle",
  1959. "VoidTypeHandle",
  1960. "NullTypeHandle",
  1961. "EmptyListTypeHandle",
  1962. "EmptyDictTypeHandle",
  1963. "CallableTypeComponents",
  1964. "CallableArgument",
  1965. "CallableTypeHandle",
  1966. "PgTypeName",
  1967. "PgTypeHandle",
  1968. "WorldCode",
  1969. "AtomCode",
  1970. "ListCode",
  1971. "FuncCode",
  1972. }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1973. const auto& args = GetAllArguments(node, ctx);
  1974. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1975. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1976. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1977. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1978. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1979. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1980. for (auto arg : args) {
  1981. call.Add(arg);
  1982. }
  1983. return TRuntimeNode(call.Build(), false);
  1984. });
  1985. AddCallable("LambdaCode", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1986. const auto lambda = node.Child(node.ChildrenSize() - 1);
  1987. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1988. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1989. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1990. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1991. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1992. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1993. if (node.ChildrenSize() == 2) {
  1994. auto count = MkqlBuildExpr(node.Head(), ctx);
  1995. call.Add(count);
  1996. } else {
  1997. call.Add(ctx.ProgramBuilder.NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
  1998. }
  1999. TRuntimeNode body;
  2000. {
  2001. TMkqlBuildContext::TArgumentsMap innerArguments;
  2002. innerArguments.reserve(lambda->Head().ChildrenSize());
  2003. lambda->Head().ForEachChild([&](const TExprNode& argNode) {
  2004. const auto argType = ctx.BuildType(argNode, *argNode.GetTypeAnn());
  2005. const auto arg = ctx.ProgramBuilder.Arg(argType);
  2006. innerArguments.emplace(&argNode, arg);
  2007. call.Add(arg);
  2008. });
  2009. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda->UniqueId());
  2010. body = MkqlBuildExpr(*lambda->Child(1), innerCtx);
  2011. }
  2012. call.Add(body);
  2013. return TRuntimeNode(call.Build(), false);
  2014. });
  2015. AddCallable("FormatType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2016. TRuntimeNode str;
  2017. if (node.Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource) {
  2018. auto handle = MkqlBuildExpr(node.Head(), ctx);
  2019. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2020. call.Add(handle);
  2021. str = TRuntimeNode(call.Build(), false);
  2022. } else {
  2023. str = ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(FormatType(node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType()));
  2024. }
  2025. return str;
  2026. });
  2027. AddCallable("FormatTypeDiff", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2028. if (node.Child(0)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource) { // if we got resource + resource
  2029. YQL_ENSURE(node.Child(1)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource);
  2030. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2031. call.Add(MkqlBuildExpr(*node.Child(0), ctx));
  2032. call.Add(MkqlBuildExpr(*node.Child(1), ctx));
  2033. call.Add(ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(2), NUdf::EDataSlot::Bool)));
  2034. return TRuntimeNode(call.Build(), false);
  2035. } else { // if we got type + type
  2036. bool pretty = FromString<bool>(*node.Child(2), NUdf::EDataSlot::Bool);
  2037. const auto type_left = node.Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  2038. const auto type_right = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  2039. return pretty ? ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(NYql::GetTypePrettyDiff(*type_left, *type_right)) :
  2040. ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(NYql::GetTypeDiff(*type_left, *type_right));
  2041. }
  2042. });
  2043. AddCallable("Void", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2044. return ctx.ProgramBuilder.NewVoid();
  2045. });
  2046. AddCallable("Null", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2047. return ctx.ProgramBuilder.NewNull();
  2048. });
  2049. AddCallable({ "AsTagged","Untag" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2050. auto input = MkqlBuildExpr(node.Head(), ctx);
  2051. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2052. return ctx.ProgramBuilder.Nop(input, returnType);
  2053. });
  2054. AddCallable({"TableSource", "WideTableSource"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2055. return MkqlBuildExpr(node.Head(), ctx);
  2056. });
  2057. AddCallable({"WithWorld"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2058. return MkqlBuildExpr(node.Head(), ctx);
  2059. });
  2060. AddCallable("Error", [](const TExprNode& node, TMkqlBuildContext& ctx)->NKikimr::NMiniKQL::TRuntimeNode {
  2061. const auto err = node.GetTypeAnn()->Cast<TErrorExprType>()->GetError();
  2062. ythrow TNodeException(ctx.ExprCtx.AppendPosition(err.Position)) << err.GetMessage();
  2063. });
  2064. AddCallable("ErrorType", [](const TExprNode& node, TMkqlBuildContext& ctx)->NKikimr::NMiniKQL::TRuntimeNode {
  2065. const auto err = node.GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TErrorExprType>()->GetError();
  2066. ythrow TNodeException(ctx.ExprCtx.AppendPosition(err.Position)) << err.GetMessage();
  2067. });
  2068. AddCallable("Join", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2069. const auto list1 = MkqlBuildExpr(node.Head(), ctx);
  2070. const auto list2 = MkqlBuildExpr(*node.Child(1), ctx);
  2071. const auto dict1 = ctx.ProgramBuilder.ToHashedDict(list1, true, [&](TRuntimeNode item) {
  2072. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  2073. }, [&](TRuntimeNode item) {
  2074. return item;
  2075. });
  2076. const auto dict2 = ctx.ProgramBuilder.ToHashedDict(list2, true, [&](TRuntimeNode item) {
  2077. return MkqlBuildLambda(*node.Child(3), ctx, {item});
  2078. }, [&](TRuntimeNode item) {
  2079. return item;
  2080. });
  2081. const auto joinKind = GetJoinKind(node, node.Child(4)->Content());
  2082. return ctx.ProgramBuilder.JoinDict(dict1, true, dict2, true, joinKind);
  2083. });
  2084. AddCallable("JoinDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2085. const auto dict1 = MkqlBuildExpr(*node.Child(0), ctx);
  2086. const auto dict2 = MkqlBuildExpr(*node.Child(1), ctx);
  2087. const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
  2088. bool multi1 = true, multi2 = true;
  2089. if (node.ChildrenSize() > 3) {
  2090. node.Tail().ForEachChild([&](const TExprNode& flag){
  2091. if (const auto& content = flag.Content(); content == "LeftUnique")
  2092. multi1 = false;
  2093. else if ( content == "RightUnique")
  2094. multi2 = false;
  2095. });
  2096. }
  2097. return ctx.ProgramBuilder.JoinDict(dict1, multi1, dict2, multi2, joinKind);
  2098. });
  2099. AddCallable({"FilePath", "FileContent", "FolderPath"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2100. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2101. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content()));
  2102. return TRuntimeNode(call.Build(), false);
  2103. });
  2104. AddCallable("TablePath", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2105. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  2106. });
  2107. AddCallable("TableRecord", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2108. return ctx.ProgramBuilder.NewDataLiteral<ui64>(0);
  2109. });
  2110. AddCallable("Udf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2111. YQL_ENSURE(node.ChildrenSize() == 8);
  2112. std::string_view function = node.Head().Content();
  2113. const auto runConfig = MkqlBuildExpr(*node.Child(1), ctx);
  2114. const auto userType = ctx.BuildType(*node.Child(2), *node.Child(2)->GetTypeAnn());
  2115. const auto typeConfig = node.Child(3)->Content();
  2116. const auto callableType = ctx.BuildType(node, *node.GetTypeAnn());
  2117. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2118. return ctx.ProgramBuilder.TypedUdf(function, callableType, runConfig, userType, typeConfig,
  2119. pos.File, pos.Row, pos.Column);
  2120. });
  2121. AddCallable("ScriptUdf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2122. EScriptType scriptType = ScriptTypeFromStr(node.Head().Content());
  2123. if (scriptType == EScriptType::Unknown) {
  2124. ythrow TNodeException(node.Head())
  2125. << "Unknown script type '"
  2126. << node.Head().Content() << '\'';
  2127. }
  2128. std::string_view funcName = node.Child(1)->Content();
  2129. const auto typeNode = node.Child(2);
  2130. const auto funcType = ctx.BuildType(*typeNode, *typeNode->GetTypeAnn());
  2131. const auto script = MkqlBuildExpr(*node.Child(3), ctx);
  2132. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2133. return ctx.ProgramBuilder.ScriptUdf(node.Head().Content(), funcName, funcType, script,
  2134. pos.File, pos.Row, pos.Column);
  2135. });
  2136. AddCallable("Apply", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2137. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2138. const auto callable = MkqlBuildExpr(node.Head(), ctx);
  2139. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  2140. return ctx.ProgramBuilder.Apply(callable, args, pos.File, pos.Row, pos.Column);
  2141. });
  2142. AddCallable("NamedApply", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2143. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2144. const auto callable = MkqlBuildExpr(node.Head(), ctx);
  2145. const auto positionalArgs = MkqlBuildExpr(*node.Child(1), ctx);
  2146. const auto namedArgs = MkqlBuildExpr(*node.Child(2), ctx);
  2147. const auto dependentNodes = node.ChildrenSize() - 3;
  2148. const auto callableType = node.Head().GetTypeAnn()->Cast<TCallableExprType>();
  2149. const auto tupleType = node.Child(1)->GetTypeAnn()->Cast<TTupleExprType>();
  2150. const auto structType = node.Child(2)->GetTypeAnn()->Cast<TStructExprType>();
  2151. std::vector<TRuntimeNode> args(callableType->GetArgumentsSize() + dependentNodes);
  2152. for (size_t i = 0; i < tupleType->GetSize(); ++i) {
  2153. args[i] = node.Child(1)->IsList() ?
  2154. MkqlBuildExpr(*node.Child(1)->Child(i), ctx):
  2155. ctx.ProgramBuilder.Nth(positionalArgs, i);
  2156. }
  2157. for (size_t i = 0; i < structType->GetSize(); ++i) {
  2158. auto memberName = structType->GetItems()[i]->GetName();
  2159. auto index = callableType->ArgumentIndexByName(memberName);
  2160. if (!index || *index < tupleType->GetSize()) {
  2161. ythrow TNodeException(node.Child(2)) << "Wrong named argument: " << memberName;
  2162. }
  2163. TRuntimeNode arg;
  2164. if (node.Child(2)->IsCallable("AsStruct")) {
  2165. for (auto& child : node.Child(2)->Children()) {
  2166. if (child->Head().Content() == memberName) {
  2167. arg = MkqlBuildExpr(child->Tail(), ctx);
  2168. break;
  2169. }
  2170. }
  2171. if (!arg.GetNode()) {
  2172. ythrow TNodeException(node.Child(2)) << "Missing argument: " << memberName;
  2173. }
  2174. }
  2175. else {
  2176. arg = ctx.ProgramBuilder.Member(namedArgs, memberName);
  2177. }
  2178. args[*index] = arg;
  2179. }
  2180. for (ui32 i = tupleType->GetSize(); i < callableType->GetArgumentsSize(); ++i) {
  2181. auto& arg = args[i];
  2182. if (arg.GetNode()) {
  2183. continue;
  2184. }
  2185. auto mkqlType = ctx.BuildType(node, *callableType->GetArguments()[i].Type);
  2186. arg = ctx.ProgramBuilder.NewEmptyOptional(mkqlType);
  2187. }
  2188. for (ui32 i = 0; i < dependentNodes; ++i) {
  2189. args[callableType->GetArgumentsSize() + i] = MkqlBuildExpr(*node.Child(3 + i), ctx);
  2190. }
  2191. return ctx.ProgramBuilder.Apply(callable, args, pos.File, pos.Row, pos.Column, dependentNodes);
  2192. });
  2193. AddCallable("Callable", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2194. const auto callableType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn());
  2195. return ctx.ProgramBuilder.Callable(callableType, [&](const TArrayRef<const TRuntimeNode>& args) {
  2196. const auto& lambda = node.Tail();
  2197. TMkqlBuildContext::TArgumentsMap innerArguments;
  2198. innerArguments.reserve(lambda.Head().ChildrenSize());
  2199. MKQL_ENSURE(args.size() == lambda.Head().ChildrenSize(), "Mismatch of lambda arguments count");
  2200. auto it = args.cbegin();
  2201. lambda.Head().ForEachChild([&](const TExprNode& arg){ innerArguments.emplace(&arg, *it++); });
  2202. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2203. return MkqlBuildExpr(lambda.Tail(), innerCtx);
  2204. });
  2205. });
  2206. AddCallable("PgConst", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2207. auto type = AS_TYPE(TPgType, ctx.BuildType(node, *node.GetTypeAnn()));
  2208. TRuntimeNode typeMod;
  2209. if (node.ChildrenSize() >= 3) {
  2210. typeMod = MkqlBuildExpr(*node.Child(2), ctx);
  2211. }
  2212. auto typeMod1 = typeMod;
  2213. if (node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "interval" &&
  2214. node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "_interval") {
  2215. typeMod1 = TRuntimeNode();
  2216. }
  2217. auto ret = ctx.ProgramBuilder.PgConst(type, node.Head().Content(), typeMod1);
  2218. if (node.ChildrenSize() >= 3) {
  2219. return ctx.ProgramBuilder.PgCast(ret, type, typeMod);
  2220. } else {
  2221. return ret;
  2222. }
  2223. });
  2224. AddCallable("PgInternal0", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2225. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2226. return ctx.ProgramBuilder.PgInternal0(returnType);
  2227. });
  2228. AddCallable({"PgResolvedCall","PgResolvedCallCtx" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2229. auto name = node.Head().Content();
  2230. auto id = FromString<ui32>(node.Child(1)->Content());
  2231. std::vector<TRuntimeNode> args;
  2232. args.reserve(node.ChildrenSize() - 3);
  2233. for (ui32 i = 3; i < node.ChildrenSize(); ++i) {
  2234. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2235. }
  2236. bool rangeFunction = false;
  2237. for (const auto& child : node.Child(2)->Children()) {
  2238. if (child->Head().Content() == "range") {
  2239. rangeFunction = true;
  2240. }
  2241. }
  2242. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2243. return ctx.ProgramBuilder.PgResolvedCall(node.IsCallable("PgResolvedCallCtx"), name, id, args, returnType, rangeFunction);
  2244. });
  2245. AddCallable("PgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2246. auto operId = FromString<ui32>(node.Child(1)->Content());
  2247. auto procId = NPg::LookupOper(operId).ProcId;
  2248. auto procName = NPg::LookupProc(procId).Name;
  2249. std::vector<TRuntimeNode> args;
  2250. args.reserve(node.ChildrenSize() - 2);
  2251. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2252. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2253. }
  2254. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2255. return ctx.ProgramBuilder.PgResolvedCall(false, procName, procId, args, returnType, false);
  2256. });
  2257. AddCallable("BlockPgResolvedCall", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2258. auto name = node.Head().Content();
  2259. auto id = FromString<ui32>(node.Child(1)->Content());
  2260. std::vector<TRuntimeNode> args;
  2261. args.reserve(node.ChildrenSize() - 3);
  2262. for (ui32 i = 3; i < node.ChildrenSize(); ++i) {
  2263. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2264. }
  2265. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2266. return ctx.ProgramBuilder.BlockPgResolvedCall(name, id, args, returnType);
  2267. });
  2268. AddCallable("BlockPgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2269. auto operId = FromString<ui32>(node.Child(1)->Content());
  2270. auto procId = NPg::LookupOper(operId).ProcId;
  2271. auto procName = NPg::LookupProc(procId).Name;
  2272. std::vector<TRuntimeNode> args;
  2273. args.reserve(node.ChildrenSize() - 2);
  2274. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2275. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2276. }
  2277. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2278. return ctx.ProgramBuilder.BlockPgResolvedCall(procName, procId, args, returnType);
  2279. });
  2280. AddCallable("PgCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2281. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2282. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2283. TRuntimeNode typeMod;
  2284. if (node.ChildrenSize() >= 3) {
  2285. typeMod = MkqlBuildExpr(*node.Child(2), ctx);
  2286. }
  2287. auto typeMod1 = typeMod;
  2288. if (node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "interval" &&
  2289. node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "_interval") {
  2290. typeMod1 = TRuntimeNode();
  2291. }
  2292. if (node.Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Null) {
  2293. auto sourceTypeId = node.Head().GetTypeAnn()->Cast<TPgExprType>()->GetId();
  2294. auto targetTypeId = node.GetTypeAnn()->Cast<TPgExprType>()->GetId();
  2295. const auto& sourceTypeDesc = NPg::LookupType(sourceTypeId);
  2296. const auto& targetTypeDesc = NPg::LookupType(targetTypeId);
  2297. const bool isSourceArray = sourceTypeDesc.TypeId == sourceTypeDesc.ArrayTypeId;
  2298. const bool isTargetArray = targetTypeDesc.TypeId == targetTypeDesc.ArrayTypeId;
  2299. if (isSourceArray == isTargetArray && NPg::HasCast(
  2300. isSourceArray ? sourceTypeDesc.ElementTypeId : sourceTypeId,
  2301. isTargetArray ? targetTypeDesc.ElementTypeId : targetTypeId)) {
  2302. typeMod1 = typeMod;
  2303. }
  2304. }
  2305. auto cast = ctx.ProgramBuilder.PgCast(input, returnType, typeMod1);
  2306. if (node.ChildrenSize() >= 3) {
  2307. return ctx.ProgramBuilder.PgCast(cast, returnType, typeMod);
  2308. } else {
  2309. return cast;
  2310. }
  2311. });
  2312. AddCallable("FromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2313. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2314. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2315. return ctx.ProgramBuilder.FromPg(input, returnType);
  2316. });
  2317. AddCallable("ToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2318. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2319. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2320. return ctx.ProgramBuilder.ToPg(input, returnType);
  2321. });
  2322. AddCallable("BlockFromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2323. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2324. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2325. return ctx.ProgramBuilder.BlockFromPg(input, returnType);
  2326. });
  2327. AddCallable("BlockToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2328. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2329. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2330. return ctx.ProgramBuilder.BlockToPg(input, returnType);
  2331. });
  2332. AddCallable("PgClone", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2333. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2334. if (IsNull(node.Head())) {
  2335. return input;
  2336. }
  2337. if (NPg::LookupType(node.GetTypeAnn()->Cast<TPgExprType>()->GetId()).PassByValue) {
  2338. return input;
  2339. }
  2340. TVector<TRuntimeNode> dependentNodes;
  2341. for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
  2342. dependentNodes.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2343. }
  2344. return ctx.ProgramBuilder.PgClone(input, dependentNodes);
  2345. });
  2346. AddCallable("PgTableContent", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2347. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2348. return ctx.ProgramBuilder.PgTableContent(
  2349. node.Child(0)->Content(),
  2350. node.Child(1)->Content(),
  2351. returnType);
  2352. });
  2353. AddCallable("PgToRecord", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2354. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2355. TVector<std::pair<std::string_view, std::string_view>> members;
  2356. for (auto child : node.Child(1)->Children()) {
  2357. members.push_back({child->Head().Content(), child->Tail().Content()});
  2358. }
  2359. return ctx.ProgramBuilder.PgToRecord(input, members);
  2360. });
  2361. AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2362. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2363. return ctx.ProgramBuilder.WithContext(input, node.Child(1)->Content());
  2364. });
  2365. AddCallable("BlockFunc", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2366. TVector<TRuntimeNode> args;
  2367. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2368. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2369. }
  2370. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2371. return ctx.ProgramBuilder.BlockFunc(node.Child(0)->Content(), returnType, args);
  2372. });
  2373. AddCallable("BlockMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2374. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  2375. const auto name = node.Tail().Content();
  2376. return ctx.ProgramBuilder.BlockMember(structObj, name);
  2377. });
  2378. AddCallable("BlockNth", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2379. const auto tupleObj = MkqlBuildExpr(node.Head(), ctx);
  2380. const auto index = FromString<ui32>(node.Tail().Content());
  2381. return ctx.ProgramBuilder.BlockNth(tupleObj, index);
  2382. });
  2383. AddCallable("BlockAsStruct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2384. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  2385. for (const auto& x : node.Children()) {
  2386. members.emplace_back(x->Head().Content(), MkqlBuildExpr(x->Tail(), ctx));
  2387. }
  2388. return ctx.ProgramBuilder.BlockAsStruct(members);
  2389. });
  2390. AddCallable("BlockAsTuple", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2391. TVector<TRuntimeNode> args;
  2392. for (const auto& x : node.Children()) {
  2393. args.push_back(MkqlBuildExpr(*x, ctx));
  2394. }
  2395. return ctx.ProgramBuilder.BlockAsTuple(args);
  2396. });
  2397. AddCallable("BlockCombineAll", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2398. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2399. std::optional<ui32> filterColumn;
  2400. if (!node.Child(1)->IsCallable("Void")) {
  2401. filterColumn = FromString<ui32>(node.Child(1)->Content());
  2402. }
  2403. TVector<TAggInfo> aggs;
  2404. for (const auto& agg : node.Child(2)->Children()) {
  2405. TAggInfo info;
  2406. info.Name = TString(agg->Head().Head().Content());
  2407. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2408. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2409. }
  2410. aggs.push_back(info);
  2411. }
  2412. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2413. return ctx.ProgramBuilder.BlockCombineAll(arg, filterColumn, aggs, returnType);
  2414. });
  2415. AddCallable("BlockCombineHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2416. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2417. std::optional<ui32> filterColumn;
  2418. if (!node.Child(1)->IsCallable("Void")) {
  2419. filterColumn = FromString<ui32>(node.Child(1)->Content());
  2420. }
  2421. TVector<ui32> keys;
  2422. for (const auto& key : node.Child(2)->Children()) {
  2423. keys.push_back(FromString<ui32>(key->Content()));
  2424. }
  2425. TVector<TAggInfo> aggs;
  2426. for (const auto& agg : node.Child(3)->Children()) {
  2427. TAggInfo info;
  2428. info.Name = TString(agg->Head().Head().Content());
  2429. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2430. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2431. }
  2432. aggs.push_back(info);
  2433. }
  2434. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2435. return ctx.ProgramBuilder.BlockCombineHashed(arg, filterColumn, keys, aggs, returnType);
  2436. });
  2437. AddCallable("BlockMergeFinalizeHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2438. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2439. TVector<ui32> keys;
  2440. for (const auto& key : node.Child(1)->Children()) {
  2441. keys.push_back(FromString<ui32>(key->Content()));
  2442. }
  2443. TVector<TAggInfo> aggs;
  2444. for (const auto& agg : node.Child(2)->Children()) {
  2445. TAggInfo info;
  2446. info.Name = TString(agg->Head().Head().Content());
  2447. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2448. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2449. }
  2450. aggs.push_back(info);
  2451. }
  2452. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2453. return ctx.ProgramBuilder.BlockMergeFinalizeHashed(arg, keys, aggs, returnType);
  2454. });
  2455. AddCallable("BlockMergeManyFinalizeHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2456. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2457. TVector<ui32> keys;
  2458. for (const auto& key : node.Child(1)->Children()) {
  2459. keys.push_back(FromString<ui32>(key->Content()));
  2460. }
  2461. TVector<TAggInfo> aggs;
  2462. for (const auto& agg : node.Child(2)->Children()) {
  2463. TAggInfo info;
  2464. info.Name = TString(agg->Head().Head().Content());
  2465. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2466. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2467. }
  2468. aggs.push_back(info);
  2469. }
  2470. ui32 streamIndex = FromString<ui32>(node.Child(3)->Content());
  2471. TVector<TVector<ui32>> streams;
  2472. for (const auto& child : node.Child(4)->Children()) {
  2473. auto& stream = streams.emplace_back();
  2474. for (const auto& atom : child->Children()) {
  2475. stream.emplace_back(FromString<ui32>(atom->Content()));
  2476. }
  2477. }
  2478. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2479. return ctx.ProgramBuilder.BlockMergeManyFinalizeHashed(arg, keys, aggs, streamIndex, streams, returnType);
  2480. });
  2481. AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2482. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  2483. const auto index = FromString<ui32>(node.Child(1)->Content());
  2484. return ctx.ProgramBuilder.BlockCompress(flow, index);
  2485. });
  2486. AddCallable("PgArray", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2487. std::vector<TRuntimeNode> args;
  2488. args.reserve(node.ChildrenSize());
  2489. for (ui32 i = 0; i < node.ChildrenSize(); ++i) {
  2490. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2491. }
  2492. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2493. return ctx.ProgramBuilder.PgArray(args, returnType);
  2494. });
  2495. AddCallable("QueueCreate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2496. const auto initCapacity = MkqlBuildExpr(*node.Child(1), ctx);
  2497. const auto initSize = MkqlBuildExpr(*node.Child(2), ctx);
  2498. const auto& args = GetArgumentsFrom<3U>(node, ctx);
  2499. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2500. return ctx.ProgramBuilder.QueueCreate(initCapacity, initSize, args, returnType);
  2501. });
  2502. AddCallable("QueuePeek", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2503. const auto resource = MkqlBuildExpr(node.Head(), ctx);
  2504. const auto index = MkqlBuildExpr(*node.Child(1), ctx);
  2505. const auto& args = GetArgumentsFrom<2U>(node, ctx);
  2506. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2507. return ctx.ProgramBuilder.QueuePeek(resource, index, args, returnType);
  2508. });
  2509. AddCallable("QueueRange", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2510. const auto resource = MkqlBuildExpr(node.Head(), ctx);
  2511. const auto begin = MkqlBuildExpr(*node.Child(1), ctx);
  2512. const auto end = MkqlBuildExpr(*node.Child(2), ctx);
  2513. const auto& args = GetArgumentsFrom<3U>(node, ctx);
  2514. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2515. return ctx.ProgramBuilder.QueueRange(resource, begin, end, args, returnType);
  2516. });
  2517. AddCallable("Seq", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2518. const auto& args = GetArgumentsFrom<0U>(node, ctx);
  2519. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2520. return ctx.ProgramBuilder.Seq(args, returnType);
  2521. });
  2522. AddCallable("FromYsonSimpleType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2523. const auto input = MkqlBuildExpr(node.Head(), ctx);
  2524. const auto schemeType = ParseDataType(node, node.Child(1)->Content());
  2525. return ctx.ProgramBuilder.FromYsonSimpleType(input, schemeType);
  2526. });
  2527. AddCallable("TryWeakMemberFromDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2528. const auto other = MkqlBuildExpr(node.Head(), ctx);
  2529. const auto rest = MkqlBuildExpr(*node.Child(1), ctx);
  2530. const auto schemeType = ParseDataType(node, node.Child(2)->Content());
  2531. const auto member = node.Child(3)->Content();
  2532. return ctx.ProgramBuilder.TryWeakMemberFromDict(other, rest, schemeType, member);
  2533. });
  2534. AddCallable("DependsOn", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2535. return MkqlBuildExpr(node.Head(), ctx);
  2536. });
  2537. AddCallable("Parameter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2538. const NNodes::TCoParameter parameter(&node);
  2539. return ctx.ProgramBuilder.Member(ctx.Parameters, parameter.Name());
  2540. });
  2541. AddCallable("SecureParam", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2542. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content());
  2543. });
  2544. AddCallable(SkippableCallables, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2545. return MkqlBuildExpr(node.Head(), ctx);
  2546. });
  2547. AddCallable({ "AssumeStrict", "AssumeNonStrict", "Likely" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2548. return MkqlBuildExpr(node.Head(), ctx);
  2549. });
  2550. AddCallable("Merge", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2551. const auto& args = GetAllArguments(node, ctx);
  2552. auto extend = ctx.ProgramBuilder.Extend(args);
  2553. if (auto sortConstr = node.GetConstraint<TSortedConstraintNode>()) {
  2554. const auto input = MkqlBuildExpr(node.Head(), ctx);
  2555. const auto& content = sortConstr->GetContent();
  2556. std::vector<TRuntimeNode> ascending;
  2557. ascending.reserve(content.size());
  2558. for (const auto& c: content) {
  2559. ascending.push_back(ctx.ProgramBuilder.NewDataLiteral(c.second));
  2560. }
  2561. TProgramBuilder::TUnaryLambda keyExractor = [&](TRuntimeNode item) {
  2562. std::vector<TRuntimeNode> keys;
  2563. keys.reserve(content.size());
  2564. for (const auto& c : content) {
  2565. if (c.first.front().empty())
  2566. keys.push_back(item);
  2567. else {
  2568. MKQL_ENSURE(c.first.front().size() == 1U, "Just column expected.");
  2569. keys.push_back(ctx.ProgramBuilder.Member(item, c.first.front().front()));
  2570. }
  2571. }
  2572. return ctx.ProgramBuilder.NewTuple(keys);
  2573. };
  2574. return ctx.ProgramBuilder.Sort(extend, ctx.ProgramBuilder.NewTuple(ascending), keyExractor);
  2575. }
  2576. else {
  2577. return extend;
  2578. }
  2579. });
  2580. }
  2581. TRuntimeNode MkqlBuildLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const TRuntimeNode::TList& args) {
  2582. MKQL_ENSURE(2U == lambda.ChildrenSize(), "Wide lambda isn't supported.");
  2583. TMkqlBuildContext::TArgumentsMap innerArguments;
  2584. innerArguments.reserve(args.size());
  2585. auto it = args.begin();
  2586. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  2587. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2588. return MkqlBuildExpr(lambda.Tail(), innerCtx);
  2589. }
  2590. TRuntimeNode::TList MkqlBuildWideLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const TRuntimeNode::TList& args) {
  2591. MKQL_ENSURE(0U < lambda.ChildrenSize(), "Empty lambda.");
  2592. TMkqlBuildContext::TArgumentsMap innerArguments;
  2593. innerArguments.reserve(args.size());
  2594. auto it = args.begin();
  2595. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  2596. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2597. TRuntimeNode::TList result;
  2598. result.reserve(lambda.ChildrenSize() - 1U);
  2599. for (ui32 i = 1U; i < lambda.ChildrenSize(); ++i)
  2600. result.emplace_back(MkqlBuildExpr(*lambda.Child(i), innerCtx));
  2601. return result;
  2602. }
  2603. TRuntimeNode MkqlBuildExpr(const TExprNode& node, TMkqlBuildContext& ctx) {
  2604. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  2605. const auto knownNode = currCtx->Memoization.find(&node);
  2606. if (currCtx->Memoization.cend() != knownNode) {
  2607. return knownNode->second;
  2608. }
  2609. }
  2610. switch (const auto type = node.Type()) {
  2611. case TExprNode::List:
  2612. return CheckTypeAndMemoize(node, ctx, ctx.ProgramBuilder.NewTuple(GetAllArguments(node, ctx)));
  2613. case TExprNode::Callable:
  2614. return CheckTypeAndMemoize(node, ctx, ctx.MkqlCompiler.GetCallable(node.Content())(node, ctx));
  2615. case TExprNode::Argument:
  2616. ythrow TNodeException(node) << "Unexpected argument: " << node.Content();
  2617. default:
  2618. ythrow TNodeException(node) << "Unexpected node type: " << type;
  2619. }
  2620. }
  2621. } // namespace NCommon
  2622. } // namespace NYql