yql_provider_mkql.cpp 141 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109
  1. #include "yql_provider_mkql.h"
  2. #include "yql_type_mkql.h"
  3. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  4. #include <yql/essentials/core/yql_expr_type_annotation.h>
  5. #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
  6. #include <yql/essentials/core/yql_expr_type_annotation.h>
  7. #include <yql/essentials/core/yql_match_recognize.h>
  8. #include <yql/essentials/core/yql_join.h>
  9. #include <yql/essentials/core/yql_opt_utils.h>
  10. #include <yql/essentials/minikql/mkql_node.h>
  11. #include <yql/essentials/minikql/mkql_node_cast.h>
  12. #include <yql/essentials/minikql/mkql_program_builder.h>
  13. #include <yql/essentials/minikql/mkql_runtime_version.h>
  14. #include <yql/essentials/minikql/mkql_type_ops.h>
  15. #include <yql/essentials/public/decimal/yql_decimal.h>
  16. #include <yql/essentials/parser/pg_catalog/catalog.h>
  17. #include <util/stream/null.h>
  18. #include <util/string/cast.h>
  19. #include <array>
  20. using namespace NKikimr;
  21. using namespace NKikimr::NMiniKQL;
  22. namespace NYql {
  23. namespace NCommon {
  24. TRuntimeNode WideTopImpl(const TExprNode& node, TMkqlBuildContext& ctx,
  25. TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) {
  26. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  27. const auto count = MkqlBuildExpr(*node.Child(1U), ctx);
  28. std::vector<std::pair<ui32, TRuntimeNode>> directions;
  29. directions.reserve(node.Tail().ChildrenSize());
  30. node.Tail().ForEachChild([&](const TExprNode& dir) {
  31. directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
  32. });
  33. return (ctx.ProgramBuilder.*func)(flow, count, directions);
  34. }
  35. TRuntimeNode WideSortImpl(const TExprNode& node, TMkqlBuildContext& ctx,
  36. TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) {
  37. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  38. std::vector<std::pair<ui32, TRuntimeNode>> directions;
  39. directions.reserve(node.Tail().ChildrenSize());
  40. node.Tail().ForEachChild([&](const TExprNode& dir) {
  41. directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
  42. });
  43. return (ctx.ProgramBuilder.*func)(flow, directions);
  44. }
  45. TRuntimeNode CombineByKeyImpl(const TExprNode& node, TMkqlBuildContext& ctx) {
  46. NNodes::TCoCombineByKey combine(&node);
  47. const bool isStreamOrFlow = combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream ||
  48. combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Flow;
  49. YQL_ENSURE(!isStreamOrFlow);
  50. const auto input = MkqlBuildExpr(combine.Input().Ref(), ctx);
  51. TRuntimeNode preMapList = ctx.ProgramBuilder.FlatMap(input, [&](TRuntimeNode item) {
  52. return MkqlBuildLambda(combine.PreMapLambda().Ref(), ctx, {item});
  53. });
  54. const auto dict = ctx.ProgramBuilder.ToHashedDict(preMapList, true, [&](TRuntimeNode item) {
  55. return MkqlBuildLambda(combine.KeySelectorLambda().Ref(), ctx, {item});
  56. }, [&](TRuntimeNode item) {
  57. return item;
  58. });
  59. const auto values = ctx.ProgramBuilder.DictItems(dict);
  60. return ctx.ProgramBuilder.FlatMap(values, [&](TRuntimeNode item) {
  61. auto key = ctx.ProgramBuilder.Nth(item, 0);
  62. auto payloadList = ctx.ProgramBuilder.Nth(item, 1);
  63. auto fold1 = ctx.ProgramBuilder.Fold1(payloadList, [&](TRuntimeNode item2) {
  64. return MkqlBuildLambda(combine.InitHandlerLambda().Ref(), ctx, {key, item2});
  65. }, [&](TRuntimeNode item2, TRuntimeNode state) {
  66. return MkqlBuildLambda(combine.UpdateHandlerLambda().Ref(), ctx, {key, item2, state});
  67. });
  68. auto res = ctx.ProgramBuilder.FlatMap(fold1, [&](TRuntimeNode state) {
  69. return MkqlBuildLambda(combine.FinishHandlerLambda().Ref(), ctx, {key, state});
  70. });
  71. return res;
  72. });
  73. }
  74. namespace {
  75. std::array<TRuntimeNode, 2U> MkqlBuildSplitLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const std::initializer_list<TRuntimeNode>& args) {
  76. TMkqlBuildContext::TArgumentsMap innerArguments;
  77. innerArguments.reserve(args.size());
  78. auto it = args.begin();
  79. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  80. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  81. const auto& body = lambda.Tail();
  82. MKQL_ENSURE(body.IsList() && body.ChildrenSize() == 2U, "Expected pair of nodes.");
  83. return {{MkqlBuildExpr(body.Head(), innerCtx), MkqlBuildExpr(body.Tail(), innerCtx)}};
  84. }
  85. TMkqlBuildContext* GetNodeContext(const TExprNode& node, TMkqlBuildContext& ctx) {
  86. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  87. const auto knownNode = currCtx->Memoization.find(&node);
  88. if (currCtx->Memoization.cend() != knownNode) {
  89. return currCtx;
  90. }
  91. }
  92. return nullptr;
  93. }
  94. TMkqlBuildContext* GetNodeContextByLambda(const TExprNode& node, TMkqlBuildContext& ctx) {
  95. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  96. if (currCtx->LambdaId == node.UniqueId()) {
  97. return currCtx;
  98. }
  99. }
  100. return nullptr;
  101. }
  102. TMkqlBuildContext* GetContextForMemoizeInUnknowScope(const TExprNode& node, TMkqlBuildContext& ctx) {
  103. TMkqlBuildContext* result = nullptr;
  104. for (const auto& c : node.Children()) {
  105. const auto& child = c->IsLambda() ? c->Tail() : *c;
  106. if (!child.IsAtom()) {
  107. auto nodeCtx = GetNodeContext(child, ctx);
  108. if (!nodeCtx) {
  109. nodeCtx = GetContextForMemoizeInUnknowScope(child, ctx);
  110. }
  111. if (!result || result->Level < nodeCtx->Level) {
  112. result = nodeCtx;
  113. if (result == &ctx) {
  114. break;
  115. }
  116. }
  117. }
  118. }
  119. if (!result) {
  120. for (result = &ctx; result->ParentCtx; result = result->ParentCtx)
  121. continue;
  122. }
  123. return result;
  124. }
  125. TMkqlBuildContext* GetContextForMemoize(const TExprNode& node, TMkqlBuildContext& ctx) {
  126. if (const auto scope = node.GetDependencyScope()) {
  127. if (const auto lambda = scope->second) {
  128. return GetNodeContextByLambda(*lambda, ctx);
  129. }
  130. } else {
  131. return GetContextForMemoizeInUnknowScope(node, ctx);
  132. }
  133. auto result = &ctx;
  134. while (result->ParentCtx) {
  135. result = result->ParentCtx;
  136. }
  137. return result;
  138. }
  139. const TRuntimeNode& CheckTypeAndMemoize(const TExprNode& node, TMkqlBuildContext& ctx, const TRuntimeNode& runtime) {
  140. if (node.GetTypeAnn()) {
  141. TNullOutput null;
  142. if (const auto type = BuildType(*node.GetTypeAnn(), ctx.ProgramBuilder, *ctx.TypeMemoization, null)) {
  143. if (!type->IsSameType(*runtime.GetStaticType())) {
  144. ythrow TNodeException(node) << "Expected: " << *type << " type, but got: " << *runtime.GetStaticType() << ".";
  145. }
  146. }
  147. }
  148. return GetContextForMemoize(node, ctx)->Memoization.emplace(&node, runtime).first->second;
  149. }
  150. std::vector<TRuntimeNode> GetAllArguments(const TExprNode& node, TMkqlBuildContext& ctx) {
  151. std::vector<TRuntimeNode> args;
  152. args.reserve(node.ChildrenSize());
  153. node.ForEachChild([&](const TExprNode& child){ args.emplace_back(MkqlBuildExpr(child, ctx)); });
  154. return args;
  155. }
  156. template <size_t From>
  157. std::vector<TRuntimeNode> GetArgumentsFrom(const TExprNode& node, TMkqlBuildContext& ctx) {
  158. std::vector<TRuntimeNode> args;
  159. args.reserve(node.ChildrenSize() - From);
  160. for (auto i = From; i < node.ChildrenSize(); ++i) {
  161. args.emplace_back(MkqlBuildExpr(*node.Child(i), ctx));
  162. }
  163. return args;
  164. }
  165. NUdf::TDataTypeId ParseDataType(const TExprNode& owner, const std::string_view& type) {
  166. if (const auto slot = NUdf::FindDataSlot(type)) {
  167. return NUdf::GetDataTypeInfo(*slot).TypeId;
  168. }
  169. ythrow TNodeException(owner) << "Unsupported data type: " << type;
  170. }
  171. EJoinKind GetJoinKind(const TExprNode& owner, const std::string_view& content) {
  172. if (content == "Inner") {
  173. return EJoinKind::Inner;
  174. }
  175. else if (content == "Left") {
  176. return EJoinKind::Left;
  177. }
  178. else if (content == "Right") {
  179. return EJoinKind::Right;
  180. }
  181. else if (content == "Full") {
  182. return EJoinKind::Full;
  183. }
  184. else if (content == "LeftOnly") {
  185. return EJoinKind::LeftOnly;
  186. }
  187. else if (content == "RightOnly") {
  188. return EJoinKind::RightOnly;
  189. }
  190. else if (content == "Exclusion") {
  191. return EJoinKind::Exclusion;
  192. }
  193. else if (content == "LeftSemi") {
  194. return EJoinKind::LeftSemi;
  195. }
  196. else if (content == "RightSemi") {
  197. return EJoinKind::RightSemi;
  198. }
  199. else if (content == "Cross") {
  200. return EJoinKind::Cross;
  201. }
  202. else {
  203. ythrow TNodeException(owner) << "Unexpected join kind: " << content;
  204. }
  205. }
  206. template<typename TLayout>
  207. std::pair<TLayout, ui16> CutTimezone(const std::string_view& atom) {
  208. const auto pos = atom.find(',');
  209. MKQL_ENSURE(std::string_view::npos != pos, "Expected two components.");
  210. return std::make_pair(::FromString<TLayout>(atom.substr(0, pos)), GetTimezoneId(atom.substr(pos + 1)));
  211. }
  212. } // namespace
  213. bool TMkqlCallableCompilerBase::HasCallable(const std::string_view& name) const {
  214. return Callables.contains(name);
  215. }
  216. void TMkqlCallableCompilerBase::AddCallable(const std::string_view& name, TCompiler compiler) {
  217. const auto result = Callables.emplace(TString(name), compiler);
  218. YQL_ENSURE(result.second, "Callable already exists: " << name);
  219. }
  220. void TMkqlCallableCompilerBase::AddCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  221. for (const auto& name : names) {
  222. AddCallable(name, compiler);
  223. }
  224. }
  225. void TMkqlCallableCompilerBase::ChainCallable(const std::string_view& name, TCompiler compiler) {
  226. auto prevCompiler = GetCallable(name);
  227. auto chainedCompiler = [compiler = std::move(compiler), prevCompiler = std::move(prevCompiler)](const TExprNode& node, TMkqlBuildContext& ctx) -> NKikimr::NMiniKQL::TRuntimeNode {
  228. if (auto res = compiler(node, ctx)) {
  229. return res;
  230. }
  231. return prevCompiler(node, ctx);
  232. };
  233. OverrideCallable(name, chainedCompiler);
  234. }
  235. void TMkqlCallableCompilerBase::ChainCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  236. for (const auto& name : names) {
  237. ChainCallable(name, compiler);
  238. }
  239. }
  240. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::UnaryFunctionMethod>>& callables) {
  241. for (const auto& callable : callables) {
  242. AddCallable(callable.first,
  243. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  244. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  245. return (ctx.ProgramBuilder.*method)(arg);
  246. }
  247. );
  248. }
  249. }
  250. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::BinaryFunctionMethod>>& callables) {
  251. for (const auto& callable : callables) {
  252. AddCallable(callable.first,
  253. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  254. const auto one = MkqlBuildExpr(node.Head(), ctx);
  255. const auto two = MkqlBuildExpr(node.Tail(), ctx);
  256. return (ctx.ProgramBuilder.*method)(one, two);
  257. }
  258. );
  259. }
  260. }
  261. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::TernaryFunctionMethod>>& callables) {
  262. for (const auto& callable : callables) {
  263. AddCallable(callable.first,
  264. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  265. const auto arg1 = MkqlBuildExpr(node.Head(), ctx);
  266. const auto arg2 = MkqlBuildExpr(*node.Child(1U), ctx);
  267. const auto arg3 = MkqlBuildExpr(node.Tail(), ctx);
  268. return (ctx.ProgramBuilder.*method)(arg1, arg2, arg3);
  269. }
  270. );
  271. }
  272. }
  273. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::ArrayFunctionMethod>>& callables) {
  274. for (const auto& callable : callables) {
  275. AddCallable(callable.first,
  276. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  277. const auto& args = GetAllArguments(node, ctx);
  278. return (ctx.ProgramBuilder.*method)(args);
  279. }
  280. );
  281. }
  282. }
  283. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::ProcessFunctionMethod>>& callables) {
  284. for (const auto& callable : callables) {
  285. AddCallable(callable.first,
  286. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  287. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  288. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(node.Tail(), ctx, {item}); };
  289. return (ctx.ProgramBuilder.*method)(arg, lambda);
  290. }
  291. );
  292. }
  293. }
  294. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::NarrowFunctionMethod>>& callables) {
  295. for (const auto& callable : callables) {
  296. AddCallable(callable.first,
  297. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  298. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  299. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildLambda(node.Tail(), ctx, items); };
  300. return (ctx.ProgramBuilder.*method)(arg, lambda);
  301. }
  302. );
  303. }
  304. }
  305. void TMkqlCallableCompilerBase::OverrideCallable(const std::string_view& name, TCompiler compiler) {
  306. const auto prevCompiler = Callables.find(name);
  307. YQL_ENSURE(Callables.cend() != prevCompiler, "Missed callable: " << name);
  308. prevCompiler->second = compiler;
  309. Callables[name] = compiler;
  310. }
  311. IMkqlCallableCompiler::TCompiler TMkqlCallableCompilerBase::GetCallable(const std::string_view& name) const {
  312. const auto compiler = Callables.find(name);
  313. YQL_ENSURE(Callables.cend() != compiler, "Missed callable: " << name);
  314. return compiler->second;
  315. }
  316. IMkqlCallableCompiler::TCompiler TMkqlCallableCompilerBase::FindCallable(const std::string_view& name) const {
  317. const auto compiler = Callables.find(name);
  318. return Callables.cend() != compiler ? compiler->second : IMkqlCallableCompiler::TCompiler();
  319. }
  320. bool TMkqlCommonCallableCompiler::HasCallable(const std::string_view& name) const {
  321. if (TMkqlCallableCompilerBase::HasCallable(name)) {
  322. return true;
  323. }
  324. return GetShared().HasCallable(name);
  325. }
  326. IMkqlCallableCompiler::TCompiler TMkqlCommonCallableCompiler::FindCallable(const std::string_view& name) const {
  327. if (const auto func = TMkqlCallableCompilerBase::FindCallable(name)) {
  328. return func;
  329. }
  330. return GetShared().FindCallable(name);
  331. }
  332. IMkqlCallableCompiler::TCompiler TMkqlCommonCallableCompiler::GetCallable(const std::string_view& name) const {
  333. if (const auto func = TMkqlCallableCompilerBase::FindCallable(name)) {
  334. return func;
  335. }
  336. return GetShared().GetCallable(name);
  337. }
  338. void TMkqlCommonCallableCompiler::OverrideCallable(const std::string_view& name, TCompiler compiler) {
  339. if (TMkqlCallableCompilerBase::HasCallable(name)) {
  340. TMkqlCallableCompilerBase::OverrideCallable(name, compiler);
  341. } else {
  342. YQL_ENSURE(GetShared().HasCallable(name));
  343. TMkqlCallableCompilerBase::AddCallable(name, compiler);
  344. }
  345. }
  346. void TMkqlCommonCallableCompiler::AddCallable(const std::string_view& name, TCompiler compiler) {
  347. YQL_ENSURE(!GetShared().HasCallable(name), "Compiler already set for callable: " << name);
  348. TMkqlCallableCompilerBase::AddCallable(name, compiler);
  349. }
  350. void TMkqlCommonCallableCompiler::AddCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  351. for (const auto& name : names) {
  352. AddCallable(name, compiler);
  353. }
  354. }
  355. TMkqlCommonCallableCompiler::TShared::TShared() {
  356. AddSimpleCallables({
  357. {"Abs", &TProgramBuilder::Abs},
  358. {"Plus", &TProgramBuilder::Plus},
  359. {"Minus", &TProgramBuilder::Minus},
  360. {"Inc", &TProgramBuilder::Increment},
  361. {"Dec", &TProgramBuilder::Decrement},
  362. {"Not", &TProgramBuilder::Not},
  363. {"BlockNot", &TProgramBuilder::BlockNot},
  364. {"BlockJust", &TProgramBuilder::BlockJust},
  365. {"BitNot", &TProgramBuilder::BitNot},
  366. {"Size", &TProgramBuilder::Size},
  367. {"Way", &TProgramBuilder::Way},
  368. {"VariantItem", &TProgramBuilder::VariantItem},
  369. {"CountBits", &TProgramBuilder::CountBits},
  370. {"Ascending", &TProgramBuilder::Ascending},
  371. {"Descending", &TProgramBuilder::Descending},
  372. {"ToOptional", &TProgramBuilder::Head},
  373. {"Head", &TProgramBuilder::Head},
  374. {"Last", &TProgramBuilder::Last},
  375. {"ToList", &TProgramBuilder::ToList},
  376. {"ToFlow", &TProgramBuilder::ToFlow},
  377. {"FromFlow", &TProgramBuilder::FromFlow},
  378. {"WideToBlocks", &TProgramBuilder::WideToBlocks},
  379. {"WideFromBlocks", &TProgramBuilder::WideFromBlocks},
  380. {"AsScalar", &TProgramBuilder::AsScalar},
  381. {"Just", &TProgramBuilder::NewOptional},
  382. {"Exists", &TProgramBuilder::Exists},
  383. {"BlockExists", &TProgramBuilder::BlockExists},
  384. {"Pickle", &TProgramBuilder::Pickle},
  385. {"StablePickle", &TProgramBuilder::StablePickle},
  386. {"Collect", &TProgramBuilder::Collect},
  387. {"Discard", &TProgramBuilder::Discard},
  388. {"LazyList", &TProgramBuilder::LazyList},
  389. {"ForwardList", &TProgramBuilder::ForwardList},
  390. {"Length", &TProgramBuilder::Length},
  391. {"HasItems", &TProgramBuilder::HasItems},
  392. {"Reverse", &TProgramBuilder::Reverse},
  393. {"ToIndexDict", &TProgramBuilder::ToIndexDict},
  394. {"ToString", &TProgramBuilder::ToString},
  395. {"ToBytes", &TProgramBuilder::ToBytes},
  396. {"AggrCountInit", &TProgramBuilder::AggrCountInit},
  397. {"NewMTRand", &TProgramBuilder::NewMTRand},
  398. {"NextMTRand", &TProgramBuilder::NextMTRand},
  399. {"TimezoneId", &TProgramBuilder::TimezoneId},
  400. {"TimezoneName", &TProgramBuilder::TimezoneName},
  401. {"RemoveTimezone", &TProgramBuilder::RemoveTimezone},
  402. {"DictItems", &TProgramBuilder::DictItems},
  403. {"DictKeys", &TProgramBuilder::DictKeys},
  404. {"DictPayloads", &TProgramBuilder::DictPayloads},
  405. {"QueuePop", &TProgramBuilder::QueuePop}
  406. });
  407. AddSimpleCallables({
  408. {"+", &TProgramBuilder::Add},
  409. {"-", &TProgramBuilder::Sub},
  410. {"*", &TProgramBuilder::Mul},
  411. {"/", &TProgramBuilder::Div},
  412. {"%", &TProgramBuilder::Mod},
  413. {"Add", &TProgramBuilder::Add},
  414. {"Sub", &TProgramBuilder::Sub},
  415. {"Mul", &TProgramBuilder::Mul},
  416. {"Div", &TProgramBuilder::Div},
  417. {"Mod", &TProgramBuilder::Mod},
  418. {"DecimalMul", &TProgramBuilder::DecimalMul},
  419. {"DecimalDiv", &TProgramBuilder::DecimalDiv},
  420. {"DecimalMod", &TProgramBuilder::DecimalMod},
  421. {"BlockDecimalMul", &TProgramBuilder::BlockDecimalMul},
  422. {"BlockDecimalDiv", &TProgramBuilder::BlockDecimalDiv},
  423. {"BlockDecimalMod", &TProgramBuilder::BlockDecimalMod},
  424. {"==", &TProgramBuilder::Equals},
  425. {"!=", &TProgramBuilder::NotEquals},
  426. {"<", &TProgramBuilder::Less},
  427. {"<=", &TProgramBuilder::LessOrEqual},
  428. {">", &TProgramBuilder::Greater},
  429. {">=", &TProgramBuilder::GreaterOrEqual},
  430. {"Equals", &TProgramBuilder::Equals},
  431. {"NotEquals", &TProgramBuilder::NotEquals},
  432. {"Less", &TProgramBuilder::Less},
  433. {"LessOrEqual", &TProgramBuilder::LessOrEqual},
  434. {"Greater", &TProgramBuilder::Greater},
  435. {"GreaterOrEqual", &TProgramBuilder::GreaterOrEqual},
  436. {"AggrEquals", &TProgramBuilder::AggrEquals},
  437. {"AggrNotEquals", &TProgramBuilder::AggrNotEquals},
  438. {"AggrLess", &TProgramBuilder::AggrLess},
  439. {"AggrLessOrEqual", &TProgramBuilder::AggrLessOrEqual},
  440. {"AggrGreater", &TProgramBuilder::AggrGreater},
  441. {"AggrGreaterOrEqual", &TProgramBuilder::AggrGreaterOrEqual},
  442. {"AggrMin", &TProgramBuilder::AggrMin},
  443. {"AggrMax", &TProgramBuilder::AggrMax},
  444. {"AggrAdd", &TProgramBuilder::AggrAdd},
  445. {"AggrCountUpdate", &TProgramBuilder::AggrCountUpdate},
  446. {"BitOr", &TProgramBuilder::BitOr},
  447. {"BitAnd", &TProgramBuilder::BitAnd},
  448. {"BitXor", &TProgramBuilder::BitXor},
  449. {"ShiftLeft", &TProgramBuilder::ShiftLeft},
  450. {"ShiftRight", &TProgramBuilder::ShiftRight},
  451. {"RotLeft", &TProgramBuilder::RotLeft},
  452. {"RotRight", &TProgramBuilder::RotRight},
  453. {"ListIf", &TProgramBuilder::ListIf},
  454. {"Concat", &TProgramBuilder::Concat},
  455. {"AggrConcat", &TProgramBuilder::AggrConcat},
  456. {"ByteAt", &TProgramBuilder::ByteAt},
  457. {"Nanvl", &TProgramBuilder::Nanvl},
  458. {"Skip", &TProgramBuilder::Skip},
  459. {"Take", &TProgramBuilder::Take},
  460. {"Limit", &TProgramBuilder::Take},
  461. {"WideTakeBlocks", &TProgramBuilder::WideTakeBlocks},
  462. {"WideSkipBlocks", &TProgramBuilder::WideSkipBlocks},
  463. {"BlockCoalesce", &TProgramBuilder::BlockCoalesce},
  464. {"ReplicateScalar", &TProgramBuilder::ReplicateScalar},
  465. {"BlockAnd", &TProgramBuilder::BlockAnd},
  466. {"BlockOr", &TProgramBuilder::BlockOr},
  467. {"BlockXor", &TProgramBuilder::BlockXor},
  468. {"Append", &TProgramBuilder::Append},
  469. {"Insert", &TProgramBuilder::Append},
  470. {"Prepend", &TProgramBuilder::Prepend},
  471. {"Lookup", &TProgramBuilder::Lookup},
  472. {"Contains", &TProgramBuilder::Contains},
  473. {"AddTimezone", &TProgramBuilder::AddTimezone},
  474. {"StartsWith", &TProgramBuilder::StartsWith},
  475. {"EndsWith", &TProgramBuilder::EndsWith},
  476. {"StringContains", &TProgramBuilder::StringContains},
  477. {"SqueezeToList", &TProgramBuilder::SqueezeToList},
  478. {"QueuePush", &TProgramBuilder::QueuePush}
  479. });
  480. AddSimpleCallables({
  481. {"Substring", &TProgramBuilder::Substring},
  482. {"Find", &TProgramBuilder::Find},
  483. {"RFind", &TProgramBuilder::RFind},
  484. {"ListFromRange", &TProgramBuilder::ListFromRange},
  485. {"PreserveStream", &TProgramBuilder::PreserveStream},
  486. {"BlockIf", &TProgramBuilder::BlockIf},
  487. });
  488. AddSimpleCallables({
  489. {"If", &TProgramBuilder::If},
  490. {"Or", &TProgramBuilder::Or},
  491. {"And", &TProgramBuilder::And},
  492. {"Xor", &TProgramBuilder::Xor},
  493. {"Min", &TProgramBuilder::Min},
  494. {"Max", &TProgramBuilder::Max},
  495. {"AsList", &TProgramBuilder::AsList},
  496. {"Extend", &TProgramBuilder::Extend},
  497. {"OrderedExtend", &TProgramBuilder::OrderedExtend},
  498. {"Zip", &TProgramBuilder::Zip},
  499. {"ZipAll", &TProgramBuilder::ZipAll},
  500. {"Random", &TProgramBuilder::Random},
  501. {"RandomNumber", &TProgramBuilder::RandomNumber},
  502. {"RandomUuid", &TProgramBuilder::RandomUuid},
  503. {"Now", &TProgramBuilder::Now},
  504. {"CurrentUtcDate", &TProgramBuilder::CurrentUtcDate},
  505. {"CurrentUtcDatetime", &TProgramBuilder::CurrentUtcDatetime},
  506. {"CurrentUtcTimestamp", &TProgramBuilder::CurrentUtcTimestamp},
  507. });
  508. AddSimpleCallables({
  509. {"Map", &TProgramBuilder::Map},
  510. {"OrderedMap", &TProgramBuilder::OrderedMap},
  511. {"FlatMap", &TProgramBuilder::FlatMap},
  512. {"OrderedFlatMap", &TProgramBuilder::OrderedFlatMap},
  513. {"SkipWhile", &TProgramBuilder::SkipWhile},
  514. {"TakeWhile", &TProgramBuilder::TakeWhile},
  515. {"SkipWhileInclusive", &TProgramBuilder::SkipWhileInclusive},
  516. {"TakeWhileInclusive", &TProgramBuilder::TakeWhileInclusive},
  517. });
  518. AddSimpleCallables({
  519. {"NarrowMap", &TProgramBuilder::NarrowMap},
  520. {"NarrowFlatMap", &TProgramBuilder::NarrowFlatMap},
  521. {"WideSkipWhile", &TProgramBuilder::WideSkipWhile},
  522. {"WideTakeWhile", &TProgramBuilder::WideTakeWhile},
  523. {"WideSkipWhileInclusive", &TProgramBuilder::WideSkipWhileInclusive},
  524. {"WideTakeWhileInclusive", &TProgramBuilder::WideTakeWhileInclusive},
  525. });
  526. AddSimpleCallables({
  527. {"RangeUnion", &TProgramBuilder::RangeUnion},
  528. {"RangeIntersect", &TProgramBuilder::RangeIntersect},
  529. {"RangeMultiply", &TProgramBuilder::RangeMultiply},
  530. });
  531. AddSimpleCallables({
  532. {"RangeCreate", &TProgramBuilder::RangeCreate},
  533. {"RangeFinalize", &TProgramBuilder::RangeFinalize},
  534. });
  535. AddCallable({"RoundUp", "RoundDown"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  536. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  537. const auto dstType = ctx.BuildType(node.Tail(), *node.Tail().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  538. return ctx.ProgramBuilder.Round(node.Content(), arg, dstType);
  539. });
  540. AddSimpleCallables({
  541. {"NextValue", &TProgramBuilder::NextValue},
  542. });
  543. AddCallable({"MultiMap", "OrderedMultiMap"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  544. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  545. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildWideLambda(node.Tail(), ctx, {item}); };
  546. return ctx.ProgramBuilder.MultiMap(arg, lambda);
  547. });
  548. AddCallable("ExpandMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  549. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  550. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildWideLambda(node.Tail(), ctx, {item}); };
  551. return ctx.ProgramBuilder.ExpandMap(arg, lambda);
  552. });
  553. AddCallable("WideMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  554. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  555. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildWideLambda(node.Tail(), ctx, items); };
  556. TRuntimeNode result = ctx.ProgramBuilder.WideMap(arg, lambda);
  557. if (IsWideBlockType(*node.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType())) {
  558. result = ctx.ProgramBuilder.BlockExpandChunked(result);
  559. }
  560. return result;
  561. });
  562. AddCallable("WideChain1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  563. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  564. return ctx.ProgramBuilder.WideChain1Map(flow,
  565. [&](TRuntimeNode::TList items) {
  566. return MkqlBuildWideLambda(*node.Child(1), ctx, items);
  567. },
  568. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  569. items.insert(items.cend(), state.cbegin(), state.cend());
  570. return MkqlBuildWideLambda(node.Tail(), ctx, items);
  571. });
  572. });
  573. AddCallable("NarrowMultiMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  574. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  575. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildWideLambda(node.Tail(), ctx, items); };
  576. return ctx.ProgramBuilder.NarrowMultiMap(arg, lambda);
  577. });
  578. AddCallable("WideFilter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  579. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  580. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildLambda(*node.Child(1), ctx, items); };
  581. if (node.ChildrenSize() > 2U) {
  582. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  583. return ctx.ProgramBuilder.WideFilter(arg, limit, lambda);
  584. } else {
  585. return ctx.ProgramBuilder.WideFilter(arg, lambda);
  586. }
  587. });
  588. AddCallable("WideCondense1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  589. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  590. return ctx.ProgramBuilder.WideCondense1(flow,
  591. [&](TRuntimeNode::TList items) {
  592. return MkqlBuildWideLambda(*node.Child(1), ctx, items);
  593. },
  594. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  595. items.insert(items.cend(), state.cbegin(), state.cend());
  596. return MkqlBuildLambda(*node.Child(2), ctx, items);
  597. },
  598. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  599. items.insert(items.cend(), state.cbegin(), state.cend());
  600. return MkqlBuildWideLambda(*node.Child(3), ctx, items);
  601. },
  602. HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
  603. });
  604. AddCallable("WideCombiner", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  605. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  606. i64 memLimit = 0LL;
  607. const bool withLimit = TryFromString<i64>(node.Child(1U)->Content(), memLimit);
  608. const auto keyExtractor = [&](TRuntimeNode::TList items) {
  609. return MkqlBuildWideLambda(*node.Child(2U), ctx, items);
  610. };
  611. const auto init = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) {
  612. keys.insert(keys.cend(), items.cbegin(), items.cend());
  613. return MkqlBuildWideLambda(*node.Child(3U), ctx, keys);
  614. };
  615. const auto update = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items, TRuntimeNode::TList state) {
  616. keys.insert(keys.cend(), items.cbegin(), items.cend());
  617. keys.insert(keys.cend(), state.cbegin(), state.cend());
  618. return MkqlBuildWideLambda(*node.Child(4U), ctx, keys);
  619. };
  620. const auto finish = [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) {
  621. keys.insert(keys.cend(), state.cbegin(), state.cend());
  622. return MkqlBuildWideLambda(node.Tail(), ctx, keys);
  623. };
  624. bool isStatePersistable = true;
  625. // Traverse through childs skipping input and limit children
  626. for (size_t i = 2U; i < node.ChildrenSize(); ++i) {
  627. isStatePersistable = isStatePersistable && node.Child(i)->GetTypeAnn()->IsPersistable();
  628. }
  629. if (withLimit) {
  630. return ctx.ProgramBuilder.WideCombiner(flow, memLimit, keyExtractor, init, update, finish);
  631. }
  632. if (isStatePersistable && RuntimeVersion >= 49U) {
  633. return ctx.ProgramBuilder.WideLastCombinerWithSpilling(flow, keyExtractor, init, update, finish);
  634. }
  635. return ctx.ProgramBuilder.WideLastCombiner(flow, keyExtractor, init, update, finish);
  636. });
  637. AddCallable("WideChopper", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  638. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  639. const auto keyExtractor = [&](TRuntimeNode::TList items) {
  640. return MkqlBuildWideLambda(*node.Child(1U), ctx, items);
  641. };
  642. const auto groupSwitch = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) {
  643. keys.insert(keys.cend(), items.cbegin(), items.cend());
  644. return MkqlBuildLambda(*node.Child(2U), ctx, keys);
  645. };
  646. const auto handler = [&](TRuntimeNode::TList keys, TRuntimeNode flow) {
  647. keys.emplace_back(flow);
  648. return MkqlBuildLambda(node.Tail(), ctx, keys);
  649. };
  650. return ctx.ProgramBuilder.WideChopper(flow, keyExtractor, groupSwitch, handler);
  651. });
  652. AddCallable("WideTop", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  653. return WideTopImpl(node, ctx, &TProgramBuilder::WideTop);
  654. });
  655. AddCallable("WideTopSort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  656. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSort);
  657. });
  658. AddCallable("WideSort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  659. return WideSortImpl(node, ctx, &TProgramBuilder::WideSort);
  660. });
  661. AddCallable("WideTopBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  662. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopBlocks);
  663. });
  664. AddCallable("WideTopSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  665. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSortBlocks);
  666. });
  667. AddCallable("WideSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  668. return WideSortImpl(node, ctx, &TProgramBuilder::WideSortBlocks);
  669. });
  670. AddCallable("Iterable", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  671. const auto lambda = [&]() { return MkqlBuildLambda(node.Head(), ctx, {}); };
  672. return ctx.ProgramBuilder.Iterable(lambda);
  673. });
  674. AddCallable("Filter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  675. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  676. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(*node.Child(1), ctx, {item}); };
  677. if (node.ChildrenSize() > 2U) {
  678. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  679. return ctx.ProgramBuilder.Filter(arg, limit, lambda);
  680. } else {
  681. return ctx.ProgramBuilder.Filter(arg, lambda);
  682. }
  683. });
  684. AddCallable("OrderedFilter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  685. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  686. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(*node.Child(1), ctx, {item}); };
  687. if (node.ChildrenSize() > 2U) {
  688. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  689. return ctx.ProgramBuilder.OrderedFilter(arg, limit, lambda);
  690. } else {
  691. return ctx.ProgramBuilder.OrderedFilter(arg, lambda);
  692. }
  693. });
  694. AddCallable("Member", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  695. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  696. const auto name = node.Tail().Content();
  697. return ctx.ProgramBuilder.Member(structObj, name);
  698. });
  699. AddCallable("RemoveMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  700. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  701. const auto name = node.Tail().Content();
  702. return ctx.ProgramBuilder.RemoveMember(structObj, name, false);
  703. });
  704. AddCallable("ForceRemoveMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  705. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  706. const auto name = node.Tail().Content();
  707. return ctx.ProgramBuilder.RemoveMember(structObj, name, true);
  708. });
  709. AddCallable("Nth", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  710. const auto tupleObj = MkqlBuildExpr(node.Head(), ctx);
  711. const auto index = FromString<ui32>(node.Tail().Content());
  712. return ctx.ProgramBuilder.Nth(tupleObj, index);
  713. });
  714. AddCallable("MatchRecognizeCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  715. const auto& inputStream = node.Child(0);
  716. const auto& partitionKeySelector = node.Child(1);
  717. const auto& partitionColumns = node.Child(2);
  718. const auto& params = node.Child(3);
  719. const auto& settings = node.Child(4);
  720. //explore params
  721. const auto measures = params->Child(0);
  722. const auto skipTo = params->Child(2);
  723. const auto pattern = params->Child(3);
  724. const auto defines = params->Child(4);
  725. //explore measures
  726. const auto measureNames = measures->Child(2);
  727. constexpr size_t FirstMeasureLambdaIndex = 3;
  728. //explore defines
  729. const auto defineNames = defines->Child(2);
  730. const size_t FirstDefineLambdaIndex = 3;
  731. TVector<TStringBuf> partitionColumnNames;
  732. for (const auto& n: partitionColumns->Children()) {
  733. partitionColumnNames.push_back(n->Content());
  734. }
  735. TProgramBuilder::TUnaryLambda getPartitionKeySelector = [partitionKeySelector, &ctx](TRuntimeNode inputRowArg){
  736. return MkqlBuildLambda(*partitionKeySelector, ctx, {inputRowArg});
  737. };
  738. TVector<TStringBuf> defineVarNames(defineNames->ChildrenSize());
  739. TVector<TProgramBuilder::TTernaryLambda> getDefines(defineNames->ChildrenSize());
  740. for (size_t i = 0; i != defineNames->ChildrenSize(); ++i) {
  741. defineVarNames[i] = defineNames->Child(i)->Content();
  742. getDefines[i] = [i, defines, &ctx](TRuntimeNode data, TRuntimeNode matchedVars, TRuntimeNode rowIndex) {
  743. return MkqlBuildLambda(*defines->Child(FirstDefineLambdaIndex + i), ctx, {data, matchedVars, rowIndex});
  744. };
  745. }
  746. TVector<TStringBuf> measureColumnNames(measureNames->ChildrenSize());
  747. TVector<TProgramBuilder::TBinaryLambda> getMeasures(measureNames->ChildrenSize());
  748. for (size_t i = 0; i != measureNames->ChildrenSize(); ++i) {
  749. measureColumnNames[i] = measureNames->Child(i)->Content();
  750. getMeasures[i] = [i, measures, &ctx](TRuntimeNode data, TRuntimeNode matchedVars) {
  751. return MkqlBuildLambda(*measures->Child(FirstMeasureLambdaIndex + i), ctx, {data, matchedVars});
  752. };
  753. }
  754. auto stringTo = skipTo->Child(0)->Content();
  755. auto var = skipTo->Child(1)->Content();
  756. MKQL_ENSURE(stringTo.SkipPrefix("AfterMatchSkip_"), R"(MATCH_RECOGNIZE: <row pattern skip to> should start with "AfterMatchSkip_")");
  757. NYql::NMatchRecognize::EAfterMatchSkipTo to;
  758. MKQL_ENSURE(TryFromString<NYql::NMatchRecognize::EAfterMatchSkipTo>(stringTo, to), "MATCH_RECOGNIZE: <row pattern skip to> cannot parse AfterMatchSkipTo mode");
  759. auto rowsPerMatchString = params->Child(1)->Content();
  760. MKQL_ENSURE(rowsPerMatchString.SkipPrefix("RowsPerMatch_"), R"(MATCH_RECOGNIZE: <row pattern rows per match> should start with "RowsPerMatch_")");
  761. NYql::NMatchRecognize::ERowsPerMatch rowsPerMatch;
  762. MKQL_ENSURE(TryFromString<NYql::NMatchRecognize::ERowsPerMatch>(rowsPerMatchString, rowsPerMatch), "MATCH_RECOGNIZE: cannot parse RowsPerMatch mode");
  763. const auto streamingMode = FromString<bool>(settings->Child(0)->Child(1)->Content());
  764. return ctx.ProgramBuilder.MatchRecognizeCore(
  765. MkqlBuildExpr(*inputStream, ctx),
  766. getPartitionKeySelector,
  767. partitionColumnNames,
  768. measureColumnNames,
  769. getMeasures,
  770. NYql::NMatchRecognize::ConvertPattern(pattern, ctx.ExprCtx),
  771. defineVarNames,
  772. getDefines,
  773. streamingMode,
  774. NYql::NMatchRecognize::TAfterMatchSkipTo{to, TString{var}},
  775. rowsPerMatch
  776. );
  777. });
  778. AddCallable("TimeOrderRecover", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  779. const auto inputStream = node.Child(0);
  780. const auto timeExtractor = node.Child(1);
  781. const auto delay = node.Child(2);
  782. const auto ahead = node.Child(3);
  783. const auto rowLimit = node.Child(4);
  784. return ctx.ProgramBuilder.TimeOrderRecover(
  785. MkqlBuildExpr(*inputStream, ctx),
  786. [timeExtractor, &ctx](TRuntimeNode row) {
  787. return MkqlBuildLambda(*timeExtractor, ctx, {row});
  788. },
  789. MkqlBuildExpr(*delay, ctx),
  790. MkqlBuildExpr(*ahead, ctx),
  791. MkqlBuildExpr(*rowLimit, ctx)
  792. );
  793. });
  794. AddCallable("Guess", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  795. const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
  796. auto type = node.Head().GetTypeAnn();
  797. if (type->GetKind() == ETypeAnnotationKind::Optional) {
  798. type = type->Cast<TOptionalExprType>()->GetItemType();
  799. }
  800. auto varType = type->Cast<TVariantExprType>();
  801. if (varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
  802. auto index = FromString<ui32>(node.Child(1)->Content());
  803. return ctx.ProgramBuilder.Guess(variantObj, index);
  804. } else {
  805. return ctx.ProgramBuilder.Guess(variantObj, node.Child(1)->Content());
  806. }
  807. });
  808. AddCallable("Visit", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  809. const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
  810. const auto type = node.Head().GetTypeAnn()->Cast<TVariantExprType>();
  811. const TTupleExprType* tupleType = nullptr;
  812. const TStructExprType* structType = nullptr;
  813. std::vector<TExprNode*> lambdas;
  814. TRuntimeNode defaultValue;
  815. if (type->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
  816. tupleType = type->GetUnderlyingType()->Cast<TTupleExprType>();
  817. lambdas.resize(tupleType->GetSize());
  818. } else {
  819. structType = type->GetUnderlyingType()->Cast<TStructExprType>();
  820. lambdas.resize(structType->GetSize());
  821. }
  822. for (ui32 index = 1; index < node.ChildrenSize(); ++index) {
  823. const auto child = node.Child(index);
  824. if (!child->IsAtom()) {
  825. defaultValue = MkqlBuildExpr(*child, ctx);
  826. continue;
  827. }
  828. ui32 itemIndex;
  829. if (tupleType) {
  830. itemIndex = FromString<ui32>(child->Content());
  831. } else {
  832. itemIndex = *structType->FindItem(child->Content());
  833. }
  834. YQL_ENSURE(itemIndex < lambdas.size());
  835. ++index;
  836. lambdas[itemIndex] = node.Child(index);
  837. }
  838. const auto handler = [&](ui32 index, TRuntimeNode item) {
  839. if (const auto lambda = lambdas[index]) {
  840. return MkqlBuildLambda(*lambda, ctx, {item});
  841. }
  842. return defaultValue;
  843. };
  844. return ctx.ProgramBuilder.VisitAll(variantObj, handler);
  845. });
  846. AddCallable("CurrentActorId", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  847. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  848. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  849. return TRuntimeNode(call.Build(), false);
  850. });
  851. AddCallable("Uint8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  852. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui8>(node.Head(), NUdf::EDataSlot::Uint8));
  853. });
  854. AddCallable("Int8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  855. return ctx.ProgramBuilder.NewDataLiteral(FromString<i8>(node.Head(), NUdf::EDataSlot::Int8));
  856. });
  857. AddCallable("Uint16", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  858. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui16>(node.Head(), NUdf::EDataSlot::Uint16));
  859. });
  860. AddCallable("Int16", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  861. return ctx.ProgramBuilder.NewDataLiteral(FromString<i16>(node.Head(), NUdf::EDataSlot::Int16));
  862. });
  863. AddCallable("Int32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  864. return ctx.ProgramBuilder.NewDataLiteral(FromString<i32>(node.Head(), NUdf::EDataSlot::Int32));
  865. });
  866. AddCallable("Uint32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  867. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui32>(node.Head(), NUdf::EDataSlot::Uint32));
  868. });
  869. AddCallable("Int64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  870. return ctx.ProgramBuilder.NewDataLiteral(FromString<i64>(node.Head(), NUdf::EDataSlot::Int64));
  871. });
  872. AddCallable("Uint64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  873. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui64>(node.Head(), NUdf::EDataSlot::Uint64));
  874. });
  875. AddCallable("String", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  876. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content());
  877. });
  878. AddCallable("Utf8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  879. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Utf8>(node.Head().Content());
  880. });
  881. AddCallable("Yson", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  882. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(node.Head().Content());
  883. });
  884. AddCallable("Json", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  885. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Json>(node.Head().Content());
  886. });
  887. AddCallable("JsonDocument", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  888. // NOTE: ValueFromString returns TUnboxedValuePod. This type does not free string inside it during destruction.
  889. // To get smart pointer-like behaviour we convert TUnboxedValuePod to TUnboxedValue. Without this conversion there
  890. // will be a memory leak.
  891. NUdf::TUnboxedValue jsonDocument = ValueFromString(NUdf::EDataSlot::JsonDocument, node.Head().Content());
  892. MKQL_ENSURE(bool(jsonDocument), "Invalid JsonDocument literal");
  893. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::JsonDocument>(jsonDocument.AsStringRef());
  894. });
  895. AddCallable("Uuid", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  896. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Uuid>(node.Head().Content());
  897. });
  898. AddCallable("Decimal", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  899. const auto precision = FromString<ui8>(node.Child(1)->Content());
  900. const auto scale = FromString<ui8>(node.Child(2)->Content());
  901. MKQL_ENSURE(precision > 0, "Precision must be positive.");
  902. MKQL_ENSURE(scale <= precision, "Scale too large.");
  903. const auto data = NDecimal::FromString(node.Head().Content(), precision, scale);
  904. MKQL_ENSURE(!NDecimal::IsError(data), "Bad decimal.");
  905. return ctx.ProgramBuilder.NewDecimalLiteral(data, precision, scale);
  906. });
  907. AddCallable("Bool", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  908. return ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(node.Head(), NUdf::EDataSlot::Bool));
  909. });
  910. AddCallable("Float", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  911. return ctx.ProgramBuilder.NewDataLiteral(FromString<float>(node.Head(), NUdf::EDataSlot::Float));
  912. });
  913. AddCallable("Double", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  914. return ctx.ProgramBuilder.NewDataLiteral(FromString<double>(node.Head(), NUdf::EDataSlot::Double));
  915. });
  916. AddCallable("DyNumber", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  917. const NUdf::TUnboxedValue val = ValueFromString(NUdf::EDataSlot::DyNumber, node.Head().Content());
  918. MKQL_ENSURE(val, "Bad DyNumber: " << TString(node.Head().Content()).Quote());
  919. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::DyNumber>(val.AsStringRef());
  920. });
  921. AddCallable("Date", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  922. const auto value = FromString<ui16>(node.Head(), NUdf::EDataSlot::Date);
  923. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Date>(
  924. NUdf::TStringRef((const char*)&value, sizeof(value)));
  925. });
  926. AddCallable("Datetime", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  927. const auto value = FromString<ui32>(node.Head(), NUdf::EDataSlot::Datetime);
  928. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Datetime>(
  929. NUdf::TStringRef((const char*)&value, sizeof(value)));
  930. });
  931. AddCallable("Timestamp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  932. const auto value = FromString<ui64>(node.Head(), NUdf::EDataSlot::Timestamp);
  933. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Timestamp>(
  934. NUdf::TStringRef((const char*)&value, sizeof(value)));
  935. });
  936. AddCallable("Interval", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  937. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Interval);
  938. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(
  939. NUdf::TStringRef((const char*)&value, sizeof(value)));
  940. });
  941. AddCallable("TzDate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  942. const auto& parts = CutTimezone<ui16>(node.Head().Content());
  943. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDate>(parts.first, parts.second);
  944. });
  945. AddCallable("TzDatetime", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  946. const auto& parts = CutTimezone<ui32>(node.Head().Content());
  947. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDatetime>(parts.first, parts.second);
  948. });
  949. AddCallable("TzTimestamp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  950. const auto& parts = CutTimezone<ui64>(node.Head().Content());
  951. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzTimestamp>(parts.first, parts.second);
  952. });
  953. AddCallable("Date32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  954. const auto value = FromString<i32>(node.Head(), NUdf::EDataSlot::Date32);
  955. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Date32>(
  956. NUdf::TStringRef((const char*)&value, sizeof(value)));
  957. });
  958. AddCallable("Datetime64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  959. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Datetime64);
  960. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Datetime64>(
  961. NUdf::TStringRef((const char*)&value, sizeof(value)));
  962. });
  963. AddCallable("Timestamp64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  964. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Timestamp64);
  965. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Timestamp64>(
  966. NUdf::TStringRef((const char*)&value, sizeof(value)));
  967. });
  968. AddCallable("Interval64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  969. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Interval64);
  970. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Interval64>(
  971. NUdf::TStringRef((const char*)&value, sizeof(value)));
  972. });
  973. AddCallable("TzDate32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  974. const auto& parts = CutTimezone<i32>(node.Head().Content());
  975. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDate32>(parts.first, parts.second);
  976. });
  977. AddCallable("TzDatetime64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  978. const auto& parts = CutTimezone<i64>(node.Head().Content());
  979. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDatetime64>(parts.first, parts.second);
  980. });
  981. AddCallable("TzTimestamp64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  982. const auto& parts = CutTimezone<i64>(node.Head().Content());
  983. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzTimestamp64>(parts.first, parts.second);
  984. });
  985. AddCallable("FoldMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  986. const auto list = MkqlBuildExpr(node.Head(), ctx);
  987. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  988. return ctx.ProgramBuilder.ChainMap(list, state, [&](TRuntimeNode item, TRuntimeNode state) {
  989. return MkqlBuildSplitLambda(*node.Child(2), ctx, {item, state});
  990. });
  991. });
  992. AddCallable("Fold1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  993. const auto list = MkqlBuildExpr(node.Head(), ctx);
  994. return ctx.ProgramBuilder.Chain1Map(list, [&](TRuntimeNode item) {
  995. return MkqlBuildSplitLambda(*node.Child(1), ctx, {item});
  996. }, [&](TRuntimeNode item, TRuntimeNode state) {
  997. return MkqlBuildSplitLambda(*node.Child(2), ctx, {item, state});
  998. });
  999. });
  1000. AddCallable("Chain1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1001. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1002. return ctx.ProgramBuilder.Chain1Map(list,
  1003. [&](TRuntimeNode item) -> std::array<TRuntimeNode, 2U> {
  1004. const auto out = MkqlBuildLambda(*node.Child(1), ctx, {item});
  1005. return {{out, out}};
  1006. }, [&](TRuntimeNode item, TRuntimeNode state) -> std::array<TRuntimeNode, 2U> {
  1007. const auto out = MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1008. return {{out, out}};
  1009. });
  1010. });
  1011. AddCallable("Extract", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1012. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1013. const auto name = node.Tail().Content();
  1014. return ctx.ProgramBuilder.Extract(list, name);
  1015. });
  1016. AddCallable("OrderedExtract", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1017. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1018. const auto name = node.Child(1)->Content();
  1019. return ctx.ProgramBuilder.OrderedExtract(list, name);
  1020. });
  1021. AddCallable("Fold", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1022. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1023. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1024. return ctx.ProgramBuilder.Fold(list, state, [&](TRuntimeNode item, TRuntimeNode state) {
  1025. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1026. });
  1027. });
  1028. AddCallable("MapNext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1029. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1030. return ctx.ProgramBuilder.MapNext(list, [&](TRuntimeNode item, TRuntimeNode nextItem) {
  1031. return MkqlBuildLambda(node.Tail(), ctx, {item, nextItem});
  1032. });
  1033. });
  1034. AddCallable("Fold1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1035. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1036. return ctx.ProgramBuilder.Fold1(list, [&](TRuntimeNode item) {
  1037. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1038. }, [&](TRuntimeNode item, TRuntimeNode state) {
  1039. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1040. });
  1041. });
  1042. AddCallable("Condense", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1043. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1044. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1045. return ctx.ProgramBuilder.Condense(stream, state,
  1046. [&](TRuntimeNode item, TRuntimeNode state) {
  1047. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1048. },
  1049. [&](TRuntimeNode item, TRuntimeNode state) {
  1050. return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
  1051. }, HasContextFuncs(*node.Child(3)));
  1052. });
  1053. AddCallable("Condense1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1054. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1055. return ctx.ProgramBuilder.Condense1(stream,
  1056. [&](TRuntimeNode item) {
  1057. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1058. },
  1059. [&](TRuntimeNode item, TRuntimeNode state) {
  1060. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1061. },
  1062. [&](TRuntimeNode item, TRuntimeNode state) {
  1063. return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
  1064. }, HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
  1065. });
  1066. AddCallable("Squeeze", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1067. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1068. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1069. return ctx.ProgramBuilder.Squeeze(stream, state, [&](TRuntimeNode item, TRuntimeNode state) {
  1070. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1071. }, node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1072. return MkqlBuildLambda(*node.Child(3), ctx, {state});
  1073. }, node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1074. return MkqlBuildLambda(*node.Child(4), ctx, {state});
  1075. });
  1076. });
  1077. AddCallable("Squeeze1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1078. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1079. return ctx.ProgramBuilder.Squeeze1(stream, [&](TRuntimeNode item) {
  1080. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1081. }, [&](TRuntimeNode item, TRuntimeNode state) {
  1082. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1083. }, node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1084. return MkqlBuildLambda(*node.Child(3), ctx, {state});
  1085. }, node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1086. return MkqlBuildLambda(*node.Child(4), ctx, {state});
  1087. });
  1088. });
  1089. AddCallable("Sort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1090. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1091. const auto ascending = MkqlBuildExpr(*node.Child(1), ctx);
  1092. return ctx.ProgramBuilder.Sort(list, ascending, [&](TRuntimeNode item) {
  1093. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1094. });
  1095. });
  1096. AddCallable({"Top", "TopSort"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1097. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1098. const auto count = MkqlBuildExpr(*node.Child(1), ctx);
  1099. const auto ascending = MkqlBuildExpr(*node.Child(2), ctx);
  1100. return (ctx.ProgramBuilder.*(node.Content() == "Top" ? &TProgramBuilder::Top : &TProgramBuilder::TopSort))
  1101. (list, count, ascending, [&](TRuntimeNode item) {
  1102. return MkqlBuildLambda(*node.Child(3), ctx, {item});
  1103. });
  1104. });
  1105. AddCallable("KeepTop", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1106. const auto count = MkqlBuildExpr(node.Head(), ctx);
  1107. const auto list = MkqlBuildExpr(*node.Child(1), ctx);
  1108. const auto item = MkqlBuildExpr(*node.Child(2), ctx);
  1109. const auto ascending = MkqlBuildExpr(*node.Child(3), ctx);
  1110. return ctx.ProgramBuilder.KeepTop(count, list, item, ascending, [&](TRuntimeNode item) {
  1111. return MkqlBuildLambda(*node.Child(4), ctx, {item});
  1112. });
  1113. });
  1114. AddCallable("Struct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1115. const auto structType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1116. const auto verifiedStructType = AS_TYPE(TStructType, structType);
  1117. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  1118. members.reserve(verifiedStructType->GetMembersCount());
  1119. node.ForEachChild([&](const TExprNode& child) {
  1120. members.emplace_back(child.Head().Content(), MkqlBuildExpr(child.Tail(), ctx));
  1121. });
  1122. return ctx.ProgramBuilder.NewStruct(verifiedStructType, members);
  1123. });
  1124. AddCallable("AddMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1125. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  1126. const auto memberName = node.Child(1)->Content();
  1127. const auto value = MkqlBuildExpr(node.Tail(), ctx);
  1128. return ctx.ProgramBuilder.AddMember(structObj, memberName, value);
  1129. });
  1130. AddCallable("List", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1131. const auto listType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1132. const auto itemType = AS_TYPE(TListType, listType)->GetItemType();
  1133. const auto& items = GetArgumentsFrom<1U>(node, ctx);
  1134. return ctx.ProgramBuilder.NewList(itemType, items);
  1135. });
  1136. AddCallable("FromString", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1137. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1138. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1139. return ctx.ProgramBuilder.FromString(arg, type);
  1140. });
  1141. AddCallable("StrictFromString", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1142. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1143. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1144. return ctx.ProgramBuilder.StrictFromString(arg, type);
  1145. });
  1146. AddCallable("FromBytes", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1147. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1148. const auto type = ctx.BuildType(node, *node.GetTypeAnn()->Cast<TOptionalExprType>()->GetItemType());
  1149. return ctx.ProgramBuilder.FromBytes(arg, type);
  1150. });
  1151. AddCallable("Convert", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1152. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1153. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1154. return ctx.ProgramBuilder.Convert(arg, type);
  1155. });
  1156. AddCallable("ToIntegral", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1157. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1158. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1159. return ctx.ProgramBuilder.ToIntegral(arg, type);
  1160. });
  1161. AddCallable("UnsafeTimestampCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1162. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1163. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1164. return ctx.ProgramBuilder.Convert(arg, type);
  1165. });
  1166. AddCallable("SafeCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1167. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1168. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1169. return ctx.ProgramBuilder.Cast(arg, type);
  1170. });
  1171. AddCallable("Default", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1172. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1173. return ctx.ProgramBuilder.Default(type);
  1174. });
  1175. AddCallable("Coalesce", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1176. auto ret = MkqlBuildExpr(node.Head(), ctx);
  1177. for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
  1178. auto value = MkqlBuildExpr(*node.Child(i), ctx);
  1179. ret = ctx.ProgramBuilder.Coalesce(ret, value);
  1180. }
  1181. return ret;
  1182. });
  1183. AddCallable("Unwrap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1184. const auto opt = MkqlBuildExpr(node.Head(), ctx);
  1185. const auto message = node.ChildrenSize() > 1 ? MkqlBuildExpr(node.Tail(), ctx) : ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  1186. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1187. return ctx.ProgramBuilder.Unwrap(opt, message, pos.File, pos.Row, pos.Column);
  1188. });
  1189. AddCallable("EmptyFrom", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1190. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1191. switch (node.GetTypeAnn()->GetKind()) {
  1192. case ETypeAnnotationKind::Flow:
  1193. case ETypeAnnotationKind::Stream:
  1194. return ctx.ProgramBuilder.EmptyIterator(type);
  1195. case ETypeAnnotationKind::Optional:
  1196. return ctx.ProgramBuilder.NewEmptyOptional(type);
  1197. case ETypeAnnotationKind::List:
  1198. return ctx.ProgramBuilder.NewEmptyList(AS_TYPE(TListType, type)->GetItemType());
  1199. case ETypeAnnotationKind::Dict:
  1200. return ctx.ProgramBuilder.NewDict(type, {});
  1201. default:
  1202. ythrow TNodeException(node) << "Empty from " << *node.GetTypeAnn() << " isn't supported.";
  1203. }
  1204. });
  1205. AddCallable("Nothing", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1206. const auto optType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1207. return ctx.ProgramBuilder.NewEmptyOptional(optType);
  1208. });
  1209. AddCallable("Unpickle", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1210. const auto type = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1211. const auto serialized = MkqlBuildExpr(node.Tail(), ctx);
  1212. return ctx.ProgramBuilder.Unpickle(type, serialized);
  1213. });
  1214. AddCallable("Optional", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1215. const auto optType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1216. const auto arg = MkqlBuildExpr(node.Tail(), ctx);
  1217. return ctx.ProgramBuilder.NewOptional(optType, arg);
  1218. });
  1219. AddCallable("Iterator", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1220. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1221. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  1222. return ctx.ProgramBuilder.Iterator(arg, args);
  1223. });
  1224. AddCallable("EmptyIterator", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1225. const auto streamType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1226. return ctx.ProgramBuilder.EmptyIterator(streamType);
  1227. });
  1228. AddCallable("Switch", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1229. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1230. std::vector<TSwitchInput> inputs;
  1231. ui64 memoryLimitBytes = FromString<ui64>(node.Child(1)->Content());
  1232. ui32 offset = 0;
  1233. for (ui32 i = 2; i < node.ChildrenSize(); i += 2) {
  1234. TSwitchInput input;
  1235. for (auto& child : node.Child(i)->Children()) {
  1236. input.Indicies.push_back(FromString<ui32>(child->Content()));
  1237. }
  1238. const auto& lambda = *node.Child(i + 1);
  1239. const auto& lambdaArg = lambda.Head().Head();
  1240. auto outputStreams = 1;
  1241. const auto& streamItemType = GetSeqItemType(*lambda.Tail().GetTypeAnn());
  1242. if (streamItemType.GetKind() == ETypeAnnotationKind::Variant) {
  1243. outputStreams = streamItemType.Cast<TVariantExprType>()->GetUnderlyingType()->Cast<TTupleExprType>()->GetSize();
  1244. }
  1245. if (node.ChildrenSize() > 4 || outputStreams != 1) {
  1246. input.ResultVariantOffset = offset;
  1247. }
  1248. offset += outputStreams;
  1249. input.InputType = ctx.BuildType(lambdaArg, *lambdaArg.GetTypeAnn());
  1250. inputs.emplace_back(input);
  1251. }
  1252. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1253. return ctx.ProgramBuilder.Switch(stream, inputs, [&](ui32 index, TRuntimeNode item) -> TRuntimeNode {
  1254. return MkqlBuildLambda(*node.Child(2 + 2 * index + 1), ctx, {item});
  1255. }, memoryLimitBytes, returnType);
  1256. });
  1257. AddCallable("ToStream", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1258. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1259. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  1260. return ctx.ProgramBuilder.Iterator(ctx.ProgramBuilder.ToList(arg), args);
  1261. });
  1262. AddCallable(LeftName, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1263. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1264. return ctx.ProgramBuilder.Nth(arg, 0);
  1265. });
  1266. AddCallable(RightName, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1267. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1268. return ctx.ProgramBuilder.Nth(arg, 1);
  1269. });
  1270. AddCallable("FilterNullMembers", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1271. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1272. if (node.ChildrenSize() < 2U) {
  1273. return ctx.ProgramBuilder.FilterNullMembers(list);
  1274. } else {
  1275. std::vector<std::string_view> members;
  1276. members.reserve(node.Tail().ChildrenSize());
  1277. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Content()); });
  1278. return ctx.ProgramBuilder.FilterNullMembers(list, members);
  1279. }
  1280. });
  1281. AddCallable("SkipNullMembers", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1282. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1283. if (node.ChildrenSize() < 2U) {
  1284. return ctx.ProgramBuilder.SkipNullMembers(list);
  1285. } else {
  1286. std::vector<std::string_view> members;
  1287. members.reserve(node.Tail().ChildrenSize());
  1288. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Content()); });
  1289. return ctx.ProgramBuilder.SkipNullMembers(list, members);
  1290. }
  1291. });
  1292. AddCallable("FilterNullElements", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1293. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1294. if (node.ChildrenSize() < 2U) {
  1295. return ctx.ProgramBuilder.FilterNullElements(list);
  1296. } else {
  1297. std::vector<ui32> members;
  1298. members.reserve(node.Tail().ChildrenSize());
  1299. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(FromString<ui32>(child.Content())); });
  1300. return ctx.ProgramBuilder.FilterNullElements(list, members);
  1301. }
  1302. });
  1303. AddCallable("SkipNullElements", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1304. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1305. if (node.ChildrenSize() < 2U) {
  1306. return ctx.ProgramBuilder.SkipNullElements(list);
  1307. } else {
  1308. std::vector<ui32> members;
  1309. members.reserve(node.Tail().ChildrenSize());
  1310. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(FromString<ui32>(child.Content())); });
  1311. return ctx.ProgramBuilder.SkipNullElements(list, members);
  1312. }
  1313. });
  1314. AddCallable("MapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1315. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1316. const auto dict = MkqlBuildExpr(*node.Child(1), ctx);
  1317. const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
  1318. const auto& outputItemType = GetSeqItemType(*node.GetTypeAnn());
  1319. auto rightItemType = node.Child(1U)->GetTypeAnn()->Cast<TDictExprType>()->GetPayloadType();
  1320. if (ETypeAnnotationKind::List == rightItemType->GetKind()) {
  1321. rightItemType = rightItemType->Cast<TListExprType>()->GetItemType();
  1322. }
  1323. std::vector<ui32> leftKeyColumns, leftRenames, rightRenames;
  1324. switch (const auto& inputItemType = GetSeqItemType(*node.Head().GetTypeAnn()); inputItemType.GetKind()) {
  1325. case ETypeAnnotationKind::Struct: {
  1326. const auto inputStructType = inputItemType.Cast<TStructExprType>();
  1327. const auto outputStructType = outputItemType.Cast<TStructExprType>();
  1328. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1329. bool s = false;
  1330. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputStructType : *outputStructType, child.Content())); });
  1331. switch (rightItemType->GetKind()) {
  1332. case ETypeAnnotationKind::Struct: {
  1333. const auto rightStructType = rightItemType->Cast<TStructExprType>();
  1334. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1335. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightStructType : *outputStructType, child.Content())); });
  1336. }
  1337. break;
  1338. case ETypeAnnotationKind::Tuple: {
  1339. const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
  1340. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1341. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightTupleType, child.Content()) : *GetFieldPosition(*outputStructType, child.Content())); });
  1342. }
  1343. break;
  1344. default:
  1345. MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
  1346. }
  1347. break;
  1348. }
  1349. case ETypeAnnotationKind::Tuple: {
  1350. const auto inputTupleType = inputItemType.Cast<TTupleExprType>();
  1351. const auto outputTupleType = outputItemType.Cast<TTupleExprType>();
  1352. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1353. bool s = false;
  1354. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputTupleType : *outputTupleType, child.Content())); });
  1355. switch (rightItemType->GetKind()) {
  1356. case ETypeAnnotationKind::Tuple: {
  1357. const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
  1358. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1359. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightTupleType : *outputTupleType, child.Content())); });
  1360. }
  1361. break;
  1362. case ETypeAnnotationKind::Struct: {
  1363. const auto rightStructType = rightItemType->Cast<TStructExprType>();
  1364. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1365. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightStructType, child.Content()) : *GetFieldPosition(*outputTupleType, child.Content())); });
  1366. }
  1367. break;
  1368. default:
  1369. MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
  1370. }
  1371. break;
  1372. }
  1373. case ETypeAnnotationKind::Multi: {
  1374. const auto inputMultiType = inputItemType.Cast<TMultiExprType>();
  1375. const auto outputMultiType = outputItemType.Cast<TMultiExprType>();
  1376. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1377. bool s = false;
  1378. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputMultiType : *outputMultiType, child.Content())); });
  1379. switch (rightItemType->GetKind()) {
  1380. case ETypeAnnotationKind::Tuple: {
  1381. const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
  1382. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1383. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightTupleType, child.Content()) : *GetFieldPosition(*outputMultiType, child.Content())); });
  1384. }
  1385. break;
  1386. case ETypeAnnotationKind::Struct: {
  1387. const auto rightStructType = rightItemType->Cast<TStructExprType>();
  1388. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1389. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightStructType, child.Content()) : *GetFieldPosition(*outputMultiType, child.Content())); });
  1390. }
  1391. break;
  1392. default:
  1393. MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
  1394. }
  1395. break;
  1396. }
  1397. default:
  1398. ythrow TNodeException(node) << "Wrong MapJoinCore input item type: " << inputItemType;
  1399. }
  1400. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1401. return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType);
  1402. });
  1403. AddCallable("BlockStorage", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1404. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1405. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1406. return ctx.ProgramBuilder.BlockStorage(stream, returnType);
  1407. });
  1408. AddCallable("BlockMapJoinIndex", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1409. const auto blockStorage = MkqlBuildExpr(node.Head(), ctx);
  1410. const auto itemType = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TMultiExprType>();
  1411. std::vector<ui32> keyColumns;
  1412. node.Child(2)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetWideBlockFieldPosition(*itemType, child.Content())); });
  1413. const bool any = HasSetting(node.Tail(), "any");
  1414. const auto itemMkqlType = BuildType(*node.Child(1), *itemType, ctx.ProgramBuilder);
  1415. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1416. return ctx.ProgramBuilder.BlockMapJoinIndex(blockStorage, itemMkqlType, keyColumns, any, returnType);
  1417. });
  1418. AddCallable("BlockMapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1419. const auto leftStream = MkqlBuildExpr(node.Head(), ctx);
  1420. const auto rightBlockStorage = MkqlBuildExpr(*node.Child(1), ctx);
  1421. const auto leftItemType = node.Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
  1422. const auto rightItemType = node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TMultiExprType>();
  1423. const auto joinKind = GetJoinKind(node, node.Child(3)->Content());
  1424. std::vector<ui32> leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops;
  1425. node.Child(4)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); });
  1426. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftKeyDrops.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); });
  1427. node.Child(6)->ForEachChild([&](const TExprNode& child){ rightKeyColumns.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); });
  1428. node.Child(7)->ForEachChild([&](const TExprNode& child){ rightKeyDrops.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); });
  1429. const auto rightItemMkqlType = BuildType(*node.Child(2), *rightItemType, ctx.ProgramBuilder);
  1430. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1431. return ctx.ProgramBuilder.BlockMapJoinCore(leftStream, rightBlockStorage, rightItemMkqlType, joinKind, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, returnType);
  1432. });
  1433. AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1434. bool selfJoin = node.Content() == "GraceSelfJoinCore";
  1435. int shift = selfJoin ? 0 : 1;
  1436. const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx);
  1437. const auto flowRight = MkqlBuildExpr(*node.Child(shift), ctx);
  1438. const auto joinKind = GetJoinKind(node, node.Child(shift + 1)->Content());
  1439. const auto& outputItemType = GetSeqItemType(*node.GetTypeAnn());
  1440. std::vector<ui32> leftKeyColumns, rightKeyColumns, leftRenames, rightRenames;
  1441. const auto& leftItemType = GetSeqItemType(*node.Child(0)->GetTypeAnn());
  1442. const auto& rightItemType = GetSeqItemType(*node.Child(shift)->GetTypeAnn());
  1443. if (leftItemType.GetKind() != ETypeAnnotationKind::Multi ||
  1444. rightItemType.GetKind() != ETypeAnnotationKind::Multi ) {
  1445. ythrow TNodeException(node) << "Wrong GraceJoinCore input item type: " << leftItemType << " " << rightItemType;
  1446. }
  1447. if (outputItemType.GetKind() != ETypeAnnotationKind::Multi ) {
  1448. ythrow TNodeException(node) << "Wrong GraceJoinCore output item type: " << outputItemType;
  1449. }
  1450. const auto leftTupleType = leftItemType.Cast<TMultiExprType>();
  1451. const auto rightTupleType = rightItemType.Cast<TMultiExprType>();
  1452. const auto outputTupleType = outputItemType.Cast<TMultiExprType>();
  1453. node.Child(shift + 2)->ForEachChild([&](TExprNode& child){
  1454. leftKeyColumns.emplace_back(*GetFieldPosition(*leftTupleType, child.Content()));
  1455. });
  1456. node.Child(shift + 3)->ForEachChild([&](TExprNode& child){
  1457. rightKeyColumns.emplace_back(*GetFieldPosition(*rightTupleType, child.Content())); });
  1458. bool s = false;
  1459. node.Child(shift + 4)->ForEachChild([&](TExprNode& child){
  1460. leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *leftTupleType : *outputTupleType, child.Content())); });
  1461. s = false;
  1462. node.Child(shift + 5)->ForEachChild([&](TExprNode& child){
  1463. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightTupleType : *outputTupleType, child.Content())); });
  1464. auto anyJoinSettings = EAnyJoinSettings::None;
  1465. node.Tail().ForEachChild([&](const TExprNode& flag) {
  1466. if (flag.IsAtom("LeftAny"))
  1467. anyJoinSettings = EAnyJoinSettings::Right == anyJoinSettings ? EAnyJoinSettings::Both : EAnyJoinSettings::Left;
  1468. else if (flag.IsAtom("RightAny"))
  1469. anyJoinSettings = EAnyJoinSettings::Left == anyJoinSettings ? EAnyJoinSettings::Both : EAnyJoinSettings::Right;
  1470. });
  1471. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1472. return selfJoin
  1473. ? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
  1474. : ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
  1475. });
  1476. AddCallable("CommonJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1477. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1478. const auto joinKind = GetJoinKind(node, node.Child(1)->Content());
  1479. std::vector<ui32> leftColumns, rightColumns, requiredColumns, keyColumns;
  1480. ui32 tableIndexFieldPos;
  1481. switch (const auto& inputItemType = GetSeqItemType(*node.Head().GetTypeAnn()); inputItemType.GetKind()) {
  1482. case ETypeAnnotationKind::Struct: {
  1483. const auto inputStructType = inputItemType.Cast<TStructExprType>();
  1484. const auto outputStructType = GetSeqItemType(*node.GetTypeAnn()).Cast<TStructExprType>();
  1485. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1486. leftColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content()));
  1487. leftColumns.emplace_back(*GetFieldPosition(*outputStructType, child.Content()));
  1488. });
  1489. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1490. rightColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content()));
  1491. rightColumns.emplace_back(*GetFieldPosition(*outputStructType, child.Content()));
  1492. });
  1493. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1494. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1495. tableIndexFieldPos = *GetFieldPosition(*inputStructType, node.Tail().Content());
  1496. break;
  1497. }
  1498. case ETypeAnnotationKind::Tuple: {
  1499. const auto inputTupleType = inputItemType.Cast<TTupleExprType>();
  1500. ui32 i = 0U;
  1501. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1502. leftColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content()));
  1503. leftColumns.emplace_back(i++);
  1504. });
  1505. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1506. rightColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content()));
  1507. rightColumns.emplace_back(i++);
  1508. });
  1509. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1510. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1511. tableIndexFieldPos = *GetFieldPosition(*inputTupleType, node.Tail().Content());
  1512. break;
  1513. }
  1514. case ETypeAnnotationKind::Multi: {
  1515. const auto inputMultiType = inputItemType.Cast<TMultiExprType>();
  1516. ui32 i = 0U;
  1517. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1518. leftColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content()));
  1519. leftColumns.emplace_back(i++);
  1520. });
  1521. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1522. rightColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content()));
  1523. rightColumns.emplace_back(i++);
  1524. });
  1525. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1526. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1527. tableIndexFieldPos = *GetFieldPosition(*inputMultiType, node.Tail().Content());
  1528. break;
  1529. }
  1530. default:
  1531. ythrow TNodeException(node) << "Wrong CommonJoinCore input item type: " << inputItemType;
  1532. }
  1533. ui64 memLimit = 0U;
  1534. if (const auto memLimitSetting = GetSetting(*node.Child(6), "memLimit")) {
  1535. memLimit = FromString<ui64>(memLimitSetting->Child(1)->Content());
  1536. }
  1537. std::optional<ui32> sortedTableOrder;
  1538. if (const auto sortSetting = GetSetting(*node.Child(6), "sorted")) {
  1539. sortedTableOrder = sortSetting->Child(1)->Content() == "left" ? 0 : 1;
  1540. }
  1541. EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None;
  1542. if (const auto anyNode = GetSetting(*node.Child(6), "any")) {
  1543. for (auto sideNode : anyNode->Child(1)->Children()) {
  1544. YQL_ENSURE(sideNode->IsAtom());
  1545. AddAnyJoinSide(anyJoinSettings, sideNode->Content() == "left" ? EAnyJoinSettings::Left : EAnyJoinSettings::Right);
  1546. }
  1547. }
  1548. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1549. return ctx.ProgramBuilder.CommonJoinCore(list, joinKind, leftColumns, rightColumns,
  1550. requiredColumns, keyColumns, memLimit, sortedTableOrder, anyJoinSettings, tableIndexFieldPos, returnType);
  1551. });
  1552. AddCallable("CombineCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1553. NNodes::TCoCombineCore core(&node);
  1554. const auto stream = MkqlBuildExpr(core.Input().Ref(), ctx);
  1555. const auto memLimit = NNodes::TCoCombineCore::idx_MemLimit < node.ChildrenSize() ?
  1556. FromString<ui64>(core.MemLimit().Cast().Value()) : 0;
  1557. const auto keyExtractor = [&](TRuntimeNode item) {
  1558. return MkqlBuildLambda(core.KeyExtractor().Ref(), ctx, {item});
  1559. };
  1560. const auto init = [&](TRuntimeNode key, TRuntimeNode item) {
  1561. return MkqlBuildLambda(core.InitHandler().Ref(), ctx, {key, item});
  1562. };
  1563. const auto update = [&](TRuntimeNode key, TRuntimeNode item, TRuntimeNode state) {
  1564. return MkqlBuildLambda(core.UpdateHandler().Ref(), ctx, {key, item, state});
  1565. };
  1566. const auto finish = [&](TRuntimeNode key, TRuntimeNode state) {
  1567. return MkqlBuildLambda(core.FinishHandler().Ref(), ctx, {key, state});
  1568. };
  1569. return ctx.ProgramBuilder.CombineCore(stream, keyExtractor, init, update, finish, memLimit);
  1570. });
  1571. AddCallable("GroupingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1572. NNodes::TCoGroupingCore core(&node);
  1573. const auto stream = MkqlBuildExpr(core.Input().Ref(), ctx);
  1574. const auto groupSwitch = [&](TRuntimeNode key, TRuntimeNode item) {
  1575. return MkqlBuildLambda(core.GroupSwitch().Ref(), ctx, {key, item});
  1576. };
  1577. const auto keyExtractor = [&](TRuntimeNode item) {
  1578. return MkqlBuildLambda(core.KeyExtractor().Ref(), ctx, {item});
  1579. };
  1580. TProgramBuilder::TUnaryLambda handler;
  1581. if (auto lambda = core.ConvertHandler()) {
  1582. handler = [&](TRuntimeNode item) {
  1583. return MkqlBuildLambda(core.ConvertHandler().Ref(), ctx, {item});
  1584. };
  1585. }
  1586. return ctx.ProgramBuilder.GroupingCore(stream, groupSwitch, keyExtractor, handler);
  1587. });
  1588. AddCallable("Chopper", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1589. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1590. const auto keyExtractor = [&](TRuntimeNode item) {
  1591. return MkqlBuildLambda(*node.Child(1U), ctx, {item});
  1592. };
  1593. const auto groupSwitch = [&](TRuntimeNode key, TRuntimeNode item) {
  1594. return MkqlBuildLambda(*node.Child(2U), ctx, {key, item});
  1595. };
  1596. const auto handler = [&](TRuntimeNode key, TRuntimeNode flow) {
  1597. return MkqlBuildLambda(node.Tail(), ctx, {key, flow});
  1598. };
  1599. return ctx.ProgramBuilder.Chopper(stream, keyExtractor, groupSwitch, handler);
  1600. });
  1601. AddCallable("HoppingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1602. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1603. const auto timeExtractor = [&](TRuntimeNode item) {
  1604. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1605. };
  1606. const auto hop = MkqlBuildExpr(*node.Child(2), ctx);
  1607. const auto interval = MkqlBuildExpr(*node.Child(3), ctx);
  1608. const auto delay = MkqlBuildExpr(*node.Child(4), ctx);
  1609. const auto init = [&](TRuntimeNode item) {
  1610. return MkqlBuildLambda(*node.Child(5), ctx, {item});
  1611. };
  1612. const auto update = [&](TRuntimeNode item, TRuntimeNode state) {
  1613. return MkqlBuildLambda(*node.Child(6), ctx, {item, state});
  1614. };
  1615. const auto save = node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1616. return MkqlBuildLambda(*node.Child(7), ctx, {state});
  1617. };
  1618. const auto load = node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1619. return MkqlBuildLambda(*node.Child(8), ctx, {state});
  1620. };
  1621. const auto merge = [&](TRuntimeNode state1, TRuntimeNode state2) {
  1622. return MkqlBuildLambda(*node.Child(9), ctx, {state1, state2});
  1623. };
  1624. const auto finish = [&](TRuntimeNode state, TRuntimeNode time) {
  1625. return MkqlBuildLambda(*node.Child(10), ctx, {state, time});
  1626. };
  1627. return ctx.ProgramBuilder.HoppingCore(
  1628. stream, timeExtractor, init, update, save, load, merge, finish, hop, interval, delay);
  1629. });
  1630. AddCallable("MultiHoppingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1631. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1632. const auto keyExtractor = [&](TRuntimeNode item) {
  1633. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1634. };
  1635. const auto timeExtractor = [&](TRuntimeNode item) {
  1636. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1637. };
  1638. const auto hop = MkqlBuildExpr(*node.Child(3), ctx);
  1639. const auto interval = MkqlBuildExpr(*node.Child(4), ctx);
  1640. const auto delay = MkqlBuildExpr(*node.Child(5), ctx);
  1641. const auto dataWatermarks = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(6), NUdf::EDataSlot::Bool));
  1642. const auto init = [&](TRuntimeNode item) {
  1643. return MkqlBuildLambda(*node.Child(7), ctx, {item});
  1644. };
  1645. const auto update = [&](TRuntimeNode item, TRuntimeNode state) {
  1646. return MkqlBuildLambda(*node.Child(8), ctx, {item, state});
  1647. };
  1648. const auto save = node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1649. return MkqlBuildLambda(*node.Child(9), ctx, {state});
  1650. };
  1651. const auto load = node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1652. return MkqlBuildLambda(*node.Child(10), ctx, {state});
  1653. };
  1654. const auto merge = [&](TRuntimeNode state1, TRuntimeNode state2) {
  1655. return MkqlBuildLambda(*node.Child(11), ctx, {state1, state2});
  1656. };
  1657. const auto finish = [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) {
  1658. return MkqlBuildLambda(*node.Child(12), ctx, {key, state, time});
  1659. };
  1660. const auto watermarksMode = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(13), NUdf::EDataSlot::Bool));
  1661. return ctx.ProgramBuilder.MultiHoppingCore(
  1662. stream, keyExtractor, timeExtractor, init, update, save, load, merge, finish,
  1663. hop, interval, delay, dataWatermarks, watermarksMode);
  1664. });
  1665. AddCallable("ToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1666. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1667. TMaybe<bool> isMany;
  1668. TMaybe<EDictType> type;
  1669. TMaybe<ui64> itemsCount;
  1670. bool isCompact;
  1671. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1672. ythrow TNodeException(node) << error->GetMessage();
  1673. }
  1674. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1675. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::ToHashedDict : &TProgramBuilder::ToSortedDict;
  1676. return (ctx.ProgramBuilder.*factory)(list, *isMany, [&](TRuntimeNode item) {
  1677. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1678. }, [&](TRuntimeNode item) {
  1679. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1680. }, isCompact, itemsCount.GetOrElse(0));
  1681. });
  1682. AddCallable("SqueezeToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1683. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1684. TMaybe<bool> isMany;
  1685. TMaybe<EDictType> type;
  1686. TMaybe<ui64> itemsCount;
  1687. bool isCompact;
  1688. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1689. ythrow TNodeException(node) << error->GetMessage();
  1690. }
  1691. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1692. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::SqueezeToHashedDict : &TProgramBuilder::SqueezeToSortedDict;
  1693. return (ctx.ProgramBuilder.*factory)(stream, *isMany, [&](TRuntimeNode item) {
  1694. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1695. }, [&](TRuntimeNode item) {
  1696. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1697. }, isCompact, itemsCount.GetOrElse(0));
  1698. });
  1699. AddCallable("NarrowSqueezeToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1700. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1701. TMaybe<bool> isMany;
  1702. TMaybe<EDictType> type;
  1703. TMaybe<ui64> itemsCount;
  1704. bool isCompact;
  1705. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1706. ythrow TNodeException(node) << error->GetMessage();
  1707. }
  1708. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1709. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::NarrowSqueezeToHashedDict : &TProgramBuilder::NarrowSqueezeToSortedDict;
  1710. return (ctx.ProgramBuilder.*factory)(stream, *isMany, [&](TRuntimeNode::TList items) {
  1711. return MkqlBuildLambda(*node.Child(1), ctx, items);
  1712. }, [&](TRuntimeNode::TList items) {
  1713. return MkqlBuildLambda(*node.Child(2), ctx, items);
  1714. }, isCompact, itemsCount.GetOrElse(0));
  1715. });
  1716. AddCallable("GroupByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1717. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1718. const auto dict = ctx.ProgramBuilder.ToHashedDict(list, true, [&](TRuntimeNode item) {
  1719. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1720. }, [&](TRuntimeNode item) {
  1721. return item;
  1722. });
  1723. const auto values = ctx.ProgramBuilder.DictItems(dict);
  1724. return ctx.ProgramBuilder.FlatMap(values, [&](TRuntimeNode item) {
  1725. const auto key = ctx.ProgramBuilder.Nth(item, 0);
  1726. const auto payloadList = ctx.ProgramBuilder.Nth(item, 1);
  1727. return MkqlBuildLambda(*node.Child(2), ctx, {key, payloadList});
  1728. });
  1729. });
  1730. AddCallable("PartitionByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1731. const NNodes::TCoPartitionByKey partition(&node);
  1732. const auto input = MkqlBuildExpr(partition.Input().Ref(), ctx);
  1733. const auto makePartitions = [&](TRuntimeNode list) {
  1734. return ctx.ProgramBuilder.Map(
  1735. ctx.ProgramBuilder.DictItems(ctx.ProgramBuilder.ToHashedDict(list, true,
  1736. [&](TRuntimeNode item) { return MkqlBuildLambda(partition.KeySelectorLambda().Ref(), ctx, {item}); },
  1737. [&](TRuntimeNode item) { return item; }
  1738. )),
  1739. [&](TRuntimeNode pair) {
  1740. const auto payload = partition.SortDirections().Ref().IsCallable("Void") ?
  1741. ctx.ProgramBuilder.Nth(pair, 1):
  1742. ctx.ProgramBuilder.Sort(ctx.ProgramBuilder.Nth(pair, 1), MkqlBuildExpr(partition.SortDirections().Ref(), ctx),
  1743. [&](TRuntimeNode item) {
  1744. return MkqlBuildLambda(partition.SortKeySelectorLambda().Ref(), ctx, {item});
  1745. }
  1746. );
  1747. return ctx.ProgramBuilder.NewTuple({ctx.ProgramBuilder.Nth(pair, 0), ctx.ProgramBuilder.Iterator(payload, {list})});
  1748. }
  1749. );
  1750. };
  1751. switch (const auto kind = partition.Ref().GetTypeAnn()->GetKind()) {
  1752. case ETypeAnnotationKind::Flow:
  1753. case ETypeAnnotationKind::Stream: {
  1754. const auto sorted = ctx.ProgramBuilder.FlatMap(
  1755. ctx.ProgramBuilder.Condense1(input,
  1756. [&](TRuntimeNode item) { return ctx.ProgramBuilder.AsList(item); },
  1757. [&](TRuntimeNode, TRuntimeNode) { return ctx.ProgramBuilder.NewDataLiteral(false); },
  1758. [&](TRuntimeNode item, TRuntimeNode state) { return ctx.ProgramBuilder.Append(state, item); }
  1759. ),
  1760. makePartitions
  1761. );
  1762. return ETypeAnnotationKind::Stream == kind ?MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {sorted}):
  1763. ctx.ProgramBuilder.ToFlow(MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {ctx.ProgramBuilder.FromFlow(sorted)}));
  1764. }
  1765. case ETypeAnnotationKind::List: {
  1766. const auto sorted = ctx.ProgramBuilder.Iterator(makePartitions(input), {});
  1767. return ctx.ProgramBuilder.Collect(MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {sorted}));
  1768. }
  1769. default: break;
  1770. }
  1771. Y_ABORT("Wrong case.");
  1772. });
  1773. AddCallable("CombineByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1774. return CombineByKeyImpl(node, ctx);
  1775. });
  1776. AddCallable("Enumerate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1777. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1778. TRuntimeNode start;
  1779. if (node.ChildrenSize() > 1) {
  1780. start = MkqlBuildExpr(*node.Child(1), ctx);
  1781. } else {
  1782. start = ctx.ProgramBuilder.NewDataLiteral<ui64>(0);
  1783. }
  1784. TRuntimeNode step;
  1785. if (node.ChildrenSize() > 2) {
  1786. step = MkqlBuildExpr(node.Tail(), ctx);
  1787. } else {
  1788. step = ctx.ProgramBuilder.NewDataLiteral<ui64>(1);
  1789. }
  1790. return ctx.ProgramBuilder.Enumerate(arg, start, step);
  1791. });
  1792. AddCallable("Dict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1793. const auto listType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1794. const auto dictType = AS_TYPE(TDictType, listType);
  1795. std::vector<std::pair<TRuntimeNode, TRuntimeNode>> items;
  1796. for (size_t i = 1; i < node.ChildrenSize(); ++i) {
  1797. const auto key = MkqlBuildExpr(node.Child(i)->Head(), ctx);
  1798. const auto payload = MkqlBuildExpr(node.Child(i)->Tail(), ctx);
  1799. items.emplace_back(key, payload);
  1800. }
  1801. return ctx.ProgramBuilder.NewDict(dictType, items);
  1802. });
  1803. AddCallable("Variant", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1804. const auto varType = node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TVariantExprType>();
  1805. const auto type = ctx.BuildType(*node.Child(2), *varType);
  1806. const auto item = MkqlBuildExpr(node.Head(), ctx);
  1807. return varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple ?
  1808. ctx.ProgramBuilder.NewVariant(item, FromString<ui32>(node.Child(1)->Content()), type) :
  1809. ctx.ProgramBuilder.NewVariant(item, node.Child(1)->Content(), type);
  1810. });
  1811. AddCallable("DynamicVariant", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1812. const auto varType = ctx.BuildType(*node.Child(2), *node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1813. const auto item = MkqlBuildExpr(node.Head(), ctx);
  1814. const auto index = MkqlBuildExpr(*node.Child(1), ctx);
  1815. return ctx.ProgramBuilder.DynamicVariant(item, index, varType);
  1816. });
  1817. AddCallable("AsStruct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1818. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  1819. members.reserve(node.ChildrenSize());
  1820. node.ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Head().Content(), MkqlBuildExpr(child.Tail(), ctx)); });
  1821. return ctx.ProgramBuilder.NewStruct(members);
  1822. });
  1823. AddCallable("AsDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1824. std::vector<std::pair<TRuntimeNode, TRuntimeNode>> items;
  1825. items.reserve(node.ChildrenSize());
  1826. node.ForEachChild([&](const TExprNode& child){ items.emplace_back(MkqlBuildExpr(*child.Child(0), ctx), MkqlBuildExpr(*child.Child(1), ctx)); });
  1827. const auto dictType = ctx.ProgramBuilder.NewDictType(items[0].first.GetStaticType(), items[0].second.GetStaticType(), false);
  1828. return ctx.ProgramBuilder.NewDict(dictType, items);
  1829. });
  1830. AddCallable("Ensure", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1831. const auto value = MkqlBuildExpr(node.Head(), ctx);
  1832. const auto predicate = MkqlBuildExpr(*node.Child(1), ctx);
  1833. const auto message = node.ChildrenSize() > 2 ? MkqlBuildExpr(node.Tail(), ctx) : ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  1834. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1835. return ctx.ProgramBuilder.Ensure(value, predicate, message, pos.File, pos.Row, pos.Column);
  1836. });
  1837. AddCallable("Replicate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1838. const auto value = MkqlBuildExpr(node.Head(), ctx);
  1839. const auto count = MkqlBuildExpr(*node.Child(1), ctx);
  1840. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1841. return ctx.ProgramBuilder.Replicate(value, count, pos.File, pos.Row, pos.Column);
  1842. });
  1843. AddCallable("IfPresent", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1844. TRuntimeNode::TList optionals;
  1845. const auto width = node.ChildrenSize() - 2U;
  1846. optionals.reserve(width);
  1847. auto i = 0U;
  1848. std::generate_n(std::back_inserter(optionals), width, [&](){ return MkqlBuildExpr(*node.Child(i++), ctx); });
  1849. const auto elseBranch = MkqlBuildExpr(node.Tail(), ctx);
  1850. return ctx.ProgramBuilder.IfPresent(optionals, [&](TRuntimeNode::TList items) {
  1851. return MkqlBuildLambda(*node.Child(width), ctx, items);
  1852. }, elseBranch);
  1853. });
  1854. AddCallable({"DataType",
  1855. "ListType",
  1856. "OptionalType",
  1857. "TupleType",
  1858. "StructType",
  1859. "DictType",
  1860. "VoidType",
  1861. "NullType",
  1862. "CallableType",
  1863. "UnitType",
  1864. "GenericType",
  1865. "ResourceType",
  1866. "TaggedType",
  1867. "VariantType",
  1868. "StreamType",
  1869. "FlowType",
  1870. "EmptyListType",
  1871. "EmptyDictType"},
  1872. [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1873. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1874. return TRuntimeNode(type, true);
  1875. });
  1876. AddCallable("ParseType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1877. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1878. return TRuntimeNode(type, true);
  1879. });
  1880. AddCallable("TypeOf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1881. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1882. return TRuntimeNode(type, true);
  1883. });
  1884. AddCallable("EmptyList", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1885. Y_UNUSED(node);
  1886. if (RuntimeVersion < 11) {
  1887. return ctx.ProgramBuilder.NewEmptyList(ctx.ProgramBuilder.NewVoid().GetStaticType());
  1888. } else {
  1889. return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyListLazy(), true);
  1890. }
  1891. });
  1892. AddCallable("EmptyDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1893. Y_UNUSED(node);
  1894. if (RuntimeVersion < 11) {
  1895. auto voidType = ctx.ProgramBuilder.NewVoid().GetStaticType();
  1896. auto dictType = ctx.ProgramBuilder.NewDictType(voidType, voidType, false);
  1897. return ctx.ProgramBuilder.NewDict(dictType, {});
  1898. } else {
  1899. return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyDictLazy(), true);
  1900. }
  1901. });
  1902. AddCallable("SourceOf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1903. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1904. return ctx.ProgramBuilder.SourceOf(type);
  1905. });
  1906. AddCallable("TypeHandle", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1907. const auto type = node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  1908. const auto yson = WriteTypeToYson(type);
  1909. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1910. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1911. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(yson));
  1912. return TRuntimeNode(call.Build(), false);
  1913. });
  1914. AddCallable("ReprCode", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1915. const auto type = node.Head().GetTypeAnn();
  1916. const auto yson = WriteTypeToYson(type);
  1917. const auto& args = GetAllArguments(node, ctx);
  1918. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1919. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1920. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1921. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1922. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1923. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1924. for (auto arg : args) {
  1925. call.Add(arg);
  1926. }
  1927. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(yson));
  1928. return TRuntimeNode(call.Build(), false);
  1929. });
  1930. // safe and position unaware
  1931. AddCallable({
  1932. "SerializeTypeHandle",
  1933. "TypeKind",
  1934. "FormatCode",
  1935. "FormatCodeWithPositions",
  1936. "SerializeCode",
  1937. }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1938. const auto& args = GetAllArguments(node, ctx);
  1939. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1940. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1941. for (auto arg : args) {
  1942. call.Add(arg);
  1943. }
  1944. return TRuntimeNode(call.Build(), false);
  1945. });
  1946. // with position
  1947. AddCallable({
  1948. "ParseTypeHandle",
  1949. "DataTypeComponents",
  1950. "DataTypeHandle",
  1951. "OptionalItemType",
  1952. "OptionalTypeHandle",
  1953. "ListItemType",
  1954. "ListTypeHandle",
  1955. "StreamItemType",
  1956. "StreamTypeHandle",
  1957. "TupleTypeComponents",
  1958. "TupleTypeHandle",
  1959. "StructTypeComponents",
  1960. "StructTypeHandle",
  1961. "DictTypeComponents",
  1962. "DictTypeHandle",
  1963. "ResourceTypeTag",
  1964. "ResourceTypeHandle",
  1965. "TaggedTypeComponents",
  1966. "TaggedTypeHandle",
  1967. "VariantUnderlyingType",
  1968. "VariantTypeHandle",
  1969. "VoidTypeHandle",
  1970. "NullTypeHandle",
  1971. "EmptyListTypeHandle",
  1972. "EmptyDictTypeHandle",
  1973. "CallableTypeComponents",
  1974. "CallableArgument",
  1975. "CallableTypeHandle",
  1976. "PgTypeName",
  1977. "PgTypeHandle",
  1978. "WorldCode",
  1979. "AtomCode",
  1980. "ListCode",
  1981. "FuncCode",
  1982. }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1983. const auto& args = GetAllArguments(node, ctx);
  1984. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1985. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1986. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1987. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1988. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1989. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1990. for (auto arg : args) {
  1991. call.Add(arg);
  1992. }
  1993. return TRuntimeNode(call.Build(), false);
  1994. });
  1995. AddCallable("LambdaCode", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1996. const auto lambda = node.Child(node.ChildrenSize() - 1);
  1997. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1998. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1999. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  2000. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  2001. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  2002. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  2003. if (node.ChildrenSize() == 2) {
  2004. auto count = MkqlBuildExpr(node.Head(), ctx);
  2005. call.Add(count);
  2006. } else {
  2007. call.Add(ctx.ProgramBuilder.NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
  2008. }
  2009. TRuntimeNode body;
  2010. {
  2011. TMkqlBuildContext::TArgumentsMap innerArguments;
  2012. innerArguments.reserve(lambda->Head().ChildrenSize());
  2013. lambda->Head().ForEachChild([&](const TExprNode& argNode) {
  2014. const auto argType = ctx.BuildType(argNode, *argNode.GetTypeAnn());
  2015. const auto arg = ctx.ProgramBuilder.Arg(argType);
  2016. innerArguments.emplace(&argNode, arg);
  2017. call.Add(arg);
  2018. });
  2019. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda->UniqueId());
  2020. body = MkqlBuildExpr(*lambda->Child(1), innerCtx);
  2021. }
  2022. call.Add(body);
  2023. return TRuntimeNode(call.Build(), false);
  2024. });
  2025. AddCallable("FormatType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2026. TRuntimeNode str;
  2027. if (node.Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource) {
  2028. auto handle = MkqlBuildExpr(node.Head(), ctx);
  2029. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2030. call.Add(handle);
  2031. str = TRuntimeNode(call.Build(), false);
  2032. } else {
  2033. str = ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(FormatType(node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType()));
  2034. }
  2035. return str;
  2036. });
  2037. AddCallable("FormatTypeDiff", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2038. if (node.Child(0)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource) { // if we got resource + resource
  2039. YQL_ENSURE(node.Child(1)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource);
  2040. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2041. call.Add(MkqlBuildExpr(*node.Child(0), ctx));
  2042. call.Add(MkqlBuildExpr(*node.Child(1), ctx));
  2043. call.Add(ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(2), NUdf::EDataSlot::Bool)));
  2044. return TRuntimeNode(call.Build(), false);
  2045. } else { // if we got type + type
  2046. bool pretty = FromString<bool>(*node.Child(2), NUdf::EDataSlot::Bool);
  2047. const auto type_left = node.Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  2048. const auto type_right = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  2049. return pretty ? ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(NYql::GetTypePrettyDiff(*type_left, *type_right)) :
  2050. ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(NYql::GetTypeDiff(*type_left, *type_right));
  2051. }
  2052. });
  2053. AddCallable("Void", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2054. return ctx.ProgramBuilder.NewVoid();
  2055. });
  2056. AddCallable("Null", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2057. return ctx.ProgramBuilder.NewNull();
  2058. });
  2059. AddCallable({ "AsTagged","Untag" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2060. auto input = MkqlBuildExpr(node.Head(), ctx);
  2061. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2062. return ctx.ProgramBuilder.Nop(input, returnType);
  2063. });
  2064. AddCallable({"TableSource", "WideTableSource"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2065. return MkqlBuildExpr(node.Head(), ctx);
  2066. });
  2067. AddCallable({"WithWorld"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2068. return MkqlBuildExpr(node.Head(), ctx);
  2069. });
  2070. AddCallable("Error", [](const TExprNode& node, TMkqlBuildContext& ctx)->NKikimr::NMiniKQL::TRuntimeNode {
  2071. const auto err = node.GetTypeAnn()->Cast<TErrorExprType>()->GetError();
  2072. ythrow TNodeException(ctx.ExprCtx.AppendPosition(err.Position)) << err.GetMessage();
  2073. });
  2074. AddCallable("ErrorType", [](const TExprNode& node, TMkqlBuildContext& ctx)->NKikimr::NMiniKQL::TRuntimeNode {
  2075. const auto err = node.GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TErrorExprType>()->GetError();
  2076. ythrow TNodeException(ctx.ExprCtx.AppendPosition(err.Position)) << err.GetMessage();
  2077. });
  2078. AddCallable("Join", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2079. const auto list1 = MkqlBuildExpr(node.Head(), ctx);
  2080. const auto list2 = MkqlBuildExpr(*node.Child(1), ctx);
  2081. const auto dict1 = ctx.ProgramBuilder.ToHashedDict(list1, true, [&](TRuntimeNode item) {
  2082. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  2083. }, [&](TRuntimeNode item) {
  2084. return item;
  2085. });
  2086. const auto dict2 = ctx.ProgramBuilder.ToHashedDict(list2, true, [&](TRuntimeNode item) {
  2087. return MkqlBuildLambda(*node.Child(3), ctx, {item});
  2088. }, [&](TRuntimeNode item) {
  2089. return item;
  2090. });
  2091. const auto joinKind = GetJoinKind(node, node.Child(4)->Content());
  2092. return ctx.ProgramBuilder.JoinDict(dict1, true, dict2, true, joinKind);
  2093. });
  2094. AddCallable("JoinDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2095. const auto dict1 = MkqlBuildExpr(*node.Child(0), ctx);
  2096. const auto dict2 = MkqlBuildExpr(*node.Child(1), ctx);
  2097. const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
  2098. bool multi1 = true, multi2 = true;
  2099. if (node.ChildrenSize() > 3) {
  2100. node.Tail().ForEachChild([&](const TExprNode& flag){
  2101. if (const auto& content = flag.Content(); content == "LeftUnique")
  2102. multi1 = false;
  2103. else if ( content == "RightUnique")
  2104. multi2 = false;
  2105. });
  2106. }
  2107. return ctx.ProgramBuilder.JoinDict(dict1, multi1, dict2, multi2, joinKind);
  2108. });
  2109. AddCallable({"FilePath", "FileContent", "FolderPath"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2110. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2111. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content()));
  2112. return TRuntimeNode(call.Build(), false);
  2113. });
  2114. AddCallable("TablePath", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2115. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  2116. });
  2117. AddCallable("TableRecord", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2118. return ctx.ProgramBuilder.NewDataLiteral<ui64>(0);
  2119. });
  2120. AddCallable("Udf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2121. YQL_ENSURE(node.ChildrenSize() == 8);
  2122. std::string_view function = node.Head().Content();
  2123. const auto runConfig = MkqlBuildExpr(*node.Child(1), ctx);
  2124. const auto userType = ctx.BuildType(*node.Child(2), *node.Child(2)->GetTypeAnn());
  2125. const auto typeConfig = node.Child(3)->Content();
  2126. const auto callableType = ctx.BuildType(node, *node.GetTypeAnn());
  2127. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2128. return ctx.ProgramBuilder.TypedUdf(function, callableType, runConfig, userType, typeConfig,
  2129. pos.File, pos.Row, pos.Column);
  2130. });
  2131. AddCallable("ScriptUdf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2132. EScriptType scriptType = ScriptTypeFromStr(node.Head().Content());
  2133. if (scriptType == EScriptType::Unknown) {
  2134. ythrow TNodeException(node.Head())
  2135. << "Unknown script type '"
  2136. << node.Head().Content() << '\'';
  2137. }
  2138. std::string_view funcName = node.Child(1)->Content();
  2139. const auto typeNode = node.Child(2);
  2140. const auto funcType = ctx.BuildType(*typeNode, *typeNode->GetTypeAnn());
  2141. const auto script = MkqlBuildExpr(*node.Child(3), ctx);
  2142. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2143. return ctx.ProgramBuilder.ScriptUdf(node.Head().Content(), funcName, funcType, script,
  2144. pos.File, pos.Row, pos.Column);
  2145. });
  2146. AddCallable("Apply", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2147. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2148. const auto callable = MkqlBuildExpr(node.Head(), ctx);
  2149. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  2150. return ctx.ProgramBuilder.Apply(callable, args, pos.File, pos.Row, pos.Column);
  2151. });
  2152. AddCallable("NamedApply", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2153. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2154. const auto callable = MkqlBuildExpr(node.Head(), ctx);
  2155. const auto positionalArgs = MkqlBuildExpr(*node.Child(1), ctx);
  2156. const auto namedArgs = MkqlBuildExpr(*node.Child(2), ctx);
  2157. const auto dependentNodes = node.ChildrenSize() - 3;
  2158. const auto callableType = node.Head().GetTypeAnn()->Cast<TCallableExprType>();
  2159. const auto tupleType = node.Child(1)->GetTypeAnn()->Cast<TTupleExprType>();
  2160. const auto structType = node.Child(2)->GetTypeAnn()->Cast<TStructExprType>();
  2161. std::vector<TRuntimeNode> args(callableType->GetArgumentsSize() + dependentNodes);
  2162. for (size_t i = 0; i < tupleType->GetSize(); ++i) {
  2163. args[i] = node.Child(1)->IsList() ?
  2164. MkqlBuildExpr(*node.Child(1)->Child(i), ctx):
  2165. ctx.ProgramBuilder.Nth(positionalArgs, i);
  2166. }
  2167. for (size_t i = 0; i < structType->GetSize(); ++i) {
  2168. auto memberName = structType->GetItems()[i]->GetName();
  2169. auto index = callableType->ArgumentIndexByName(memberName);
  2170. if (!index || *index < tupleType->GetSize()) {
  2171. ythrow TNodeException(node.Child(2)) << "Wrong named argument: " << memberName;
  2172. }
  2173. TRuntimeNode arg;
  2174. if (node.Child(2)->IsCallable("AsStruct")) {
  2175. for (auto& child : node.Child(2)->Children()) {
  2176. if (child->Head().Content() == memberName) {
  2177. arg = MkqlBuildExpr(child->Tail(), ctx);
  2178. break;
  2179. }
  2180. }
  2181. if (!arg.GetNode()) {
  2182. ythrow TNodeException(node.Child(2)) << "Missing argument: " << memberName;
  2183. }
  2184. }
  2185. else {
  2186. arg = ctx.ProgramBuilder.Member(namedArgs, memberName);
  2187. }
  2188. args[*index] = arg;
  2189. }
  2190. for (ui32 i = tupleType->GetSize(); i < callableType->GetArgumentsSize(); ++i) {
  2191. auto& arg = args[i];
  2192. if (arg.GetNode()) {
  2193. continue;
  2194. }
  2195. auto mkqlType = ctx.BuildType(node, *callableType->GetArguments()[i].Type);
  2196. arg = ctx.ProgramBuilder.NewEmptyOptional(mkqlType);
  2197. }
  2198. for (ui32 i = 0; i < dependentNodes; ++i) {
  2199. args[callableType->GetArgumentsSize() + i] = MkqlBuildExpr(*node.Child(3 + i), ctx);
  2200. }
  2201. return ctx.ProgramBuilder.Apply(callable, args, pos.File, pos.Row, pos.Column, dependentNodes);
  2202. });
  2203. AddCallable("Callable", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2204. const auto callableType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn());
  2205. return ctx.ProgramBuilder.Callable(callableType, [&](const TArrayRef<const TRuntimeNode>& args) {
  2206. const auto& lambda = node.Tail();
  2207. TMkqlBuildContext::TArgumentsMap innerArguments;
  2208. innerArguments.reserve(lambda.Head().ChildrenSize());
  2209. MKQL_ENSURE(args.size() == lambda.Head().ChildrenSize(), "Mismatch of lambda arguments count");
  2210. auto it = args.cbegin();
  2211. lambda.Head().ForEachChild([&](const TExprNode& arg){ innerArguments.emplace(&arg, *it++); });
  2212. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2213. return MkqlBuildExpr(lambda.Tail(), innerCtx);
  2214. });
  2215. });
  2216. AddCallable("PgConst", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2217. auto type = AS_TYPE(TPgType, ctx.BuildType(node, *node.GetTypeAnn()));
  2218. TRuntimeNode typeMod;
  2219. if (node.ChildrenSize() >= 3) {
  2220. typeMod = MkqlBuildExpr(*node.Child(2), ctx);
  2221. }
  2222. auto typeMod1 = typeMod;
  2223. if (node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "interval" &&
  2224. node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "_interval") {
  2225. typeMod1 = TRuntimeNode();
  2226. }
  2227. auto ret = ctx.ProgramBuilder.PgConst(type, node.Head().Content(), typeMod1);
  2228. if (node.ChildrenSize() >= 3) {
  2229. return ctx.ProgramBuilder.PgCast(ret, type, typeMod);
  2230. } else {
  2231. return ret;
  2232. }
  2233. });
  2234. AddCallable("PgInternal0", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2235. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2236. return ctx.ProgramBuilder.PgInternal0(returnType);
  2237. });
  2238. AddCallable({"PgResolvedCall","PgResolvedCallCtx" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2239. auto name = node.Head().Content();
  2240. auto id = FromString<ui32>(node.Child(1)->Content());
  2241. std::vector<TRuntimeNode> args;
  2242. args.reserve(node.ChildrenSize() - 3);
  2243. for (ui32 i = 3; i < node.ChildrenSize(); ++i) {
  2244. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2245. }
  2246. bool rangeFunction = false;
  2247. for (const auto& child : node.Child(2)->Children()) {
  2248. if (child->Head().Content() == "range") {
  2249. rangeFunction = true;
  2250. }
  2251. }
  2252. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2253. return ctx.ProgramBuilder.PgResolvedCall(node.IsCallable("PgResolvedCallCtx"), name, id, args, returnType, rangeFunction);
  2254. });
  2255. AddCallable("PgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2256. auto operId = FromString<ui32>(node.Child(1)->Content());
  2257. auto procId = NPg::LookupOper(operId).ProcId;
  2258. auto procName = NPg::LookupProc(procId).Name;
  2259. std::vector<TRuntimeNode> args;
  2260. args.reserve(node.ChildrenSize() - 2);
  2261. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2262. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2263. }
  2264. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2265. return ctx.ProgramBuilder.PgResolvedCall(false, procName, procId, args, returnType, false);
  2266. });
  2267. AddCallable("BlockPgResolvedCall", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2268. auto name = node.Head().Content();
  2269. auto id = FromString<ui32>(node.Child(1)->Content());
  2270. std::vector<TRuntimeNode> args;
  2271. args.reserve(node.ChildrenSize() - 3);
  2272. for (ui32 i = 3; i < node.ChildrenSize(); ++i) {
  2273. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2274. }
  2275. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2276. return ctx.ProgramBuilder.BlockPgResolvedCall(name, id, args, returnType);
  2277. });
  2278. AddCallable("BlockPgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2279. auto operId = FromString<ui32>(node.Child(1)->Content());
  2280. auto procId = NPg::LookupOper(operId).ProcId;
  2281. auto procName = NPg::LookupProc(procId).Name;
  2282. std::vector<TRuntimeNode> args;
  2283. args.reserve(node.ChildrenSize() - 2);
  2284. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2285. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2286. }
  2287. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2288. return ctx.ProgramBuilder.BlockPgResolvedCall(procName, procId, args, returnType);
  2289. });
  2290. AddCallable("PgCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2291. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2292. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2293. TRuntimeNode typeMod;
  2294. if (node.ChildrenSize() >= 3) {
  2295. typeMod = MkqlBuildExpr(*node.Child(2), ctx);
  2296. }
  2297. auto typeMod1 = typeMod;
  2298. if (node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "interval" &&
  2299. node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "_interval") {
  2300. typeMod1 = TRuntimeNode();
  2301. }
  2302. if (node.Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Null) {
  2303. auto sourceTypeId = node.Head().GetTypeAnn()->Cast<TPgExprType>()->GetId();
  2304. auto targetTypeId = node.GetTypeAnn()->Cast<TPgExprType>()->GetId();
  2305. const auto& sourceTypeDesc = NPg::LookupType(sourceTypeId);
  2306. const auto& targetTypeDesc = NPg::LookupType(targetTypeId);
  2307. const bool isSourceArray = sourceTypeDesc.TypeId == sourceTypeDesc.ArrayTypeId;
  2308. const bool isTargetArray = targetTypeDesc.TypeId == targetTypeDesc.ArrayTypeId;
  2309. if (isSourceArray == isTargetArray && NPg::HasCast(
  2310. isSourceArray ? sourceTypeDesc.ElementTypeId : sourceTypeId,
  2311. isTargetArray ? targetTypeDesc.ElementTypeId : targetTypeId)) {
  2312. typeMod1 = typeMod;
  2313. }
  2314. }
  2315. auto cast = ctx.ProgramBuilder.PgCast(input, returnType, typeMod1);
  2316. if (node.ChildrenSize() >= 3) {
  2317. return ctx.ProgramBuilder.PgCast(cast, returnType, typeMod);
  2318. } else {
  2319. return cast;
  2320. }
  2321. });
  2322. AddCallable("FromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2323. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2324. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2325. return ctx.ProgramBuilder.FromPg(input, returnType);
  2326. });
  2327. AddCallable("ToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2328. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2329. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2330. return ctx.ProgramBuilder.ToPg(input, returnType);
  2331. });
  2332. AddCallable("BlockFromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2333. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2334. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2335. return ctx.ProgramBuilder.BlockFromPg(input, returnType);
  2336. });
  2337. AddCallable("BlockToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2338. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2339. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2340. return ctx.ProgramBuilder.BlockToPg(input, returnType);
  2341. });
  2342. AddCallable("PgClone", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2343. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2344. if (IsNull(node.Head())) {
  2345. return input;
  2346. }
  2347. if (NPg::LookupType(node.GetTypeAnn()->Cast<TPgExprType>()->GetId()).PassByValue) {
  2348. return input;
  2349. }
  2350. TVector<TRuntimeNode> dependentNodes;
  2351. for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
  2352. dependentNodes.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2353. }
  2354. return ctx.ProgramBuilder.PgClone(input, dependentNodes);
  2355. });
  2356. AddCallable("PgTableContent", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2357. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2358. return ctx.ProgramBuilder.PgTableContent(
  2359. node.Child(0)->Content(),
  2360. node.Child(1)->Content(),
  2361. returnType);
  2362. });
  2363. AddCallable("PgToRecord", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2364. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2365. TVector<std::pair<std::string_view, std::string_view>> members;
  2366. for (auto child : node.Child(1)->Children()) {
  2367. members.push_back({child->Head().Content(), child->Tail().Content()});
  2368. }
  2369. return ctx.ProgramBuilder.PgToRecord(input, members);
  2370. });
  2371. AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2372. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2373. return ctx.ProgramBuilder.WithContext(input, node.Child(1)->Content());
  2374. });
  2375. AddCallable("BlockFunc", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2376. TVector<TRuntimeNode> args;
  2377. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2378. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2379. }
  2380. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2381. return ctx.ProgramBuilder.BlockFunc(node.Child(0)->Content(), returnType, args);
  2382. });
  2383. AddCallable("BlockMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2384. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  2385. const auto name = node.Tail().Content();
  2386. return ctx.ProgramBuilder.BlockMember(structObj, name);
  2387. });
  2388. AddCallable("BlockNth", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2389. const auto tupleObj = MkqlBuildExpr(node.Head(), ctx);
  2390. const auto index = FromString<ui32>(node.Tail().Content());
  2391. return ctx.ProgramBuilder.BlockNth(tupleObj, index);
  2392. });
  2393. AddCallable("BlockAsStruct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2394. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  2395. for (const auto& x : node.Children()) {
  2396. members.emplace_back(x->Head().Content(), MkqlBuildExpr(x->Tail(), ctx));
  2397. }
  2398. return ctx.ProgramBuilder.BlockAsStruct(members);
  2399. });
  2400. AddCallable("BlockAsTuple", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2401. TVector<TRuntimeNode> args;
  2402. for (const auto& x : node.Children()) {
  2403. args.push_back(MkqlBuildExpr(*x, ctx));
  2404. }
  2405. return ctx.ProgramBuilder.BlockAsTuple(args);
  2406. });
  2407. AddCallable("BlockCombineAll", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2408. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2409. std::optional<ui32> filterColumn;
  2410. if (!node.Child(1)->IsCallable("Void")) {
  2411. filterColumn = FromString<ui32>(node.Child(1)->Content());
  2412. }
  2413. TVector<TAggInfo> aggs;
  2414. for (const auto& agg : node.Child(2)->Children()) {
  2415. TAggInfo info;
  2416. info.Name = TString(agg->Head().Head().Content());
  2417. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2418. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2419. }
  2420. aggs.push_back(info);
  2421. }
  2422. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2423. return ctx.ProgramBuilder.BlockCombineAll(arg, filterColumn, aggs, returnType);
  2424. });
  2425. AddCallable("BlockCombineHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2426. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2427. std::optional<ui32> filterColumn;
  2428. if (!node.Child(1)->IsCallable("Void")) {
  2429. filterColumn = FromString<ui32>(node.Child(1)->Content());
  2430. }
  2431. TVector<ui32> keys;
  2432. for (const auto& key : node.Child(2)->Children()) {
  2433. keys.push_back(FromString<ui32>(key->Content()));
  2434. }
  2435. TVector<TAggInfo> aggs;
  2436. for (const auto& agg : node.Child(3)->Children()) {
  2437. TAggInfo info;
  2438. info.Name = TString(agg->Head().Head().Content());
  2439. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2440. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2441. }
  2442. aggs.push_back(info);
  2443. }
  2444. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2445. return ctx.ProgramBuilder.BlockCombineHashed(arg, filterColumn, keys, aggs, returnType);
  2446. });
  2447. AddCallable("BlockMergeFinalizeHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2448. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2449. TVector<ui32> keys;
  2450. for (const auto& key : node.Child(1)->Children()) {
  2451. keys.push_back(FromString<ui32>(key->Content()));
  2452. }
  2453. TVector<TAggInfo> aggs;
  2454. for (const auto& agg : node.Child(2)->Children()) {
  2455. TAggInfo info;
  2456. info.Name = TString(agg->Head().Head().Content());
  2457. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2458. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2459. }
  2460. aggs.push_back(info);
  2461. }
  2462. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2463. return ctx.ProgramBuilder.BlockMergeFinalizeHashed(arg, keys, aggs, returnType);
  2464. });
  2465. AddCallable("BlockMergeManyFinalizeHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2466. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2467. TVector<ui32> keys;
  2468. for (const auto& key : node.Child(1)->Children()) {
  2469. keys.push_back(FromString<ui32>(key->Content()));
  2470. }
  2471. TVector<TAggInfo> aggs;
  2472. for (const auto& agg : node.Child(2)->Children()) {
  2473. TAggInfo info;
  2474. info.Name = TString(agg->Head().Head().Content());
  2475. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2476. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2477. }
  2478. aggs.push_back(info);
  2479. }
  2480. ui32 streamIndex = FromString<ui32>(node.Child(3)->Content());
  2481. TVector<TVector<ui32>> streams;
  2482. for (const auto& child : node.Child(4)->Children()) {
  2483. auto& stream = streams.emplace_back();
  2484. for (const auto& atom : child->Children()) {
  2485. stream.emplace_back(FromString<ui32>(atom->Content()));
  2486. }
  2487. }
  2488. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2489. return ctx.ProgramBuilder.BlockMergeManyFinalizeHashed(arg, keys, aggs, streamIndex, streams, returnType);
  2490. });
  2491. AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2492. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  2493. const auto index = FromString<ui32>(node.Child(1)->Content());
  2494. return ctx.ProgramBuilder.BlockCompress(flow, index);
  2495. });
  2496. AddCallable("PgArray", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2497. std::vector<TRuntimeNode> args;
  2498. args.reserve(node.ChildrenSize());
  2499. for (ui32 i = 0; i < node.ChildrenSize(); ++i) {
  2500. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2501. }
  2502. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2503. return ctx.ProgramBuilder.PgArray(args, returnType);
  2504. });
  2505. AddCallable("QueueCreate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2506. const auto initCapacity = MkqlBuildExpr(*node.Child(1), ctx);
  2507. const auto initSize = MkqlBuildExpr(*node.Child(2), ctx);
  2508. const auto& args = GetArgumentsFrom<3U>(node, ctx);
  2509. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2510. return ctx.ProgramBuilder.QueueCreate(initCapacity, initSize, args, returnType);
  2511. });
  2512. AddCallable("QueuePeek", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2513. const auto resource = MkqlBuildExpr(node.Head(), ctx);
  2514. const auto index = MkqlBuildExpr(*node.Child(1), ctx);
  2515. const auto& args = GetArgumentsFrom<2U>(node, ctx);
  2516. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2517. return ctx.ProgramBuilder.QueuePeek(resource, index, args, returnType);
  2518. });
  2519. AddCallable("QueueRange", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2520. const auto resource = MkqlBuildExpr(node.Head(), ctx);
  2521. const auto begin = MkqlBuildExpr(*node.Child(1), ctx);
  2522. const auto end = MkqlBuildExpr(*node.Child(2), ctx);
  2523. const auto& args = GetArgumentsFrom<3U>(node, ctx);
  2524. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2525. return ctx.ProgramBuilder.QueueRange(resource, begin, end, args, returnType);
  2526. });
  2527. AddCallable("Seq", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2528. const auto& args = GetArgumentsFrom<0U>(node, ctx);
  2529. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2530. return ctx.ProgramBuilder.Seq(args, returnType);
  2531. });
  2532. AddCallable("FromYsonSimpleType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2533. const auto input = MkqlBuildExpr(node.Head(), ctx);
  2534. const auto schemeType = ParseDataType(node, node.Child(1)->Content());
  2535. return ctx.ProgramBuilder.FromYsonSimpleType(input, schemeType);
  2536. });
  2537. AddCallable("TryWeakMemberFromDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2538. const auto other = MkqlBuildExpr(node.Head(), ctx);
  2539. const auto rest = MkqlBuildExpr(*node.Child(1), ctx);
  2540. const auto schemeType = ParseDataType(node, node.Child(2)->Content());
  2541. const auto member = node.Child(3)->Content();
  2542. return ctx.ProgramBuilder.TryWeakMemberFromDict(other, rest, schemeType, member);
  2543. });
  2544. AddCallable("DependsOn", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2545. return MkqlBuildExpr(node.Head(), ctx);
  2546. });
  2547. AddCallable("Parameter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2548. const NNodes::TCoParameter parameter(&node);
  2549. return ctx.ProgramBuilder.Member(ctx.Parameters, parameter.Name());
  2550. });
  2551. AddCallable("SecureParam", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2552. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content());
  2553. });
  2554. AddCallable(SkippableCallables, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2555. return MkqlBuildExpr(node.Head(), ctx);
  2556. });
  2557. AddCallable({ "AssumeStrict", "AssumeNonStrict", "Likely" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2558. return MkqlBuildExpr(node.Head(), ctx);
  2559. });
  2560. AddCallable("Merge", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2561. const auto& args = GetAllArguments(node, ctx);
  2562. auto extend = ctx.ProgramBuilder.Extend(args);
  2563. if (auto sortConstr = node.GetConstraint<TSortedConstraintNode>()) {
  2564. const auto input = MkqlBuildExpr(node.Head(), ctx);
  2565. const auto& content = sortConstr->GetContent();
  2566. std::vector<TRuntimeNode> ascending;
  2567. ascending.reserve(content.size());
  2568. for (const auto& c: content) {
  2569. ascending.push_back(ctx.ProgramBuilder.NewDataLiteral(c.second));
  2570. }
  2571. TProgramBuilder::TUnaryLambda keyExractor = [&](TRuntimeNode item) {
  2572. std::vector<TRuntimeNode> keys;
  2573. keys.reserve(content.size());
  2574. for (const auto& c : content) {
  2575. if (c.first.front().empty())
  2576. keys.push_back(item);
  2577. else {
  2578. MKQL_ENSURE(c.first.front().size() == 1U, "Just column expected.");
  2579. keys.push_back(ctx.ProgramBuilder.Member(item, c.first.front().front()));
  2580. }
  2581. }
  2582. return ctx.ProgramBuilder.NewTuple(keys);
  2583. };
  2584. return ctx.ProgramBuilder.Sort(extend, ctx.ProgramBuilder.NewTuple(ascending), keyExractor);
  2585. }
  2586. else {
  2587. return extend;
  2588. }
  2589. });
  2590. }
  2591. TRuntimeNode MkqlBuildLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const TRuntimeNode::TList& args) {
  2592. MKQL_ENSURE(2U == lambda.ChildrenSize(), "Wide lambda isn't supported.");
  2593. TMkqlBuildContext::TArgumentsMap innerArguments;
  2594. innerArguments.reserve(args.size());
  2595. auto it = args.begin();
  2596. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  2597. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2598. return MkqlBuildExpr(lambda.Tail(), innerCtx);
  2599. }
  2600. TRuntimeNode::TList MkqlBuildWideLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const TRuntimeNode::TList& args) {
  2601. MKQL_ENSURE(0U < lambda.ChildrenSize(), "Empty lambda.");
  2602. TMkqlBuildContext::TArgumentsMap innerArguments;
  2603. innerArguments.reserve(args.size());
  2604. auto it = args.begin();
  2605. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  2606. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2607. TRuntimeNode::TList result;
  2608. result.reserve(lambda.ChildrenSize() - 1U);
  2609. for (ui32 i = 1U; i < lambda.ChildrenSize(); ++i)
  2610. result.emplace_back(MkqlBuildExpr(*lambda.Child(i), innerCtx));
  2611. return result;
  2612. }
  2613. TRuntimeNode MkqlBuildExpr(const TExprNode& node, TMkqlBuildContext& ctx) {
  2614. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  2615. const auto knownNode = currCtx->Memoization.find(&node);
  2616. if (currCtx->Memoization.cend() != knownNode) {
  2617. return knownNode->second;
  2618. }
  2619. }
  2620. switch (const auto type = node.Type()) {
  2621. case TExprNode::List:
  2622. return CheckTypeAndMemoize(node, ctx, ctx.ProgramBuilder.NewTuple(GetAllArguments(node, ctx)));
  2623. case TExprNode::Callable:
  2624. return CheckTypeAndMemoize(node, ctx, ctx.MkqlCompiler.GetCallable(node.Content())(node, ctx));
  2625. case TExprNode::Argument:
  2626. ythrow TNodeException(node) << "Unexpected argument: " << node.Content();
  2627. default:
  2628. ythrow TNodeException(node) << "Unexpected node type: " << type;
  2629. }
  2630. }
  2631. } // namespace NCommon
  2632. } // namespace NYql