yql_provider_mkql.cpp 140 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089
  1. #include "yql_provider_mkql.h"
  2. #include "yql_type_mkql.h"
  3. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  4. #include <yql/essentials/core/yql_expr_type_annotation.h>
  5. #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
  6. #include <yql/essentials/core/yql_expr_type_annotation.h>
  7. #include <yql/essentials/core/yql_match_recognize.h>
  8. #include <yql/essentials/core/yql_join.h>
  9. #include <yql/essentials/core/yql_opt_utils.h>
  10. #include <yql/essentials/minikql/mkql_node.h>
  11. #include <yql/essentials/minikql/mkql_node_cast.h>
  12. #include <yql/essentials/minikql/mkql_program_builder.h>
  13. #include <yql/essentials/minikql/mkql_runtime_version.h>
  14. #include <yql/essentials/minikql/mkql_type_ops.h>
  15. #include <yql/essentials/public/decimal/yql_decimal.h>
  16. #include <yql/essentials/parser/pg_catalog/catalog.h>
  17. #include <util/stream/null.h>
  18. #include <util/string/cast.h>
  19. #include <array>
  20. using namespace NKikimr;
  21. using namespace NKikimr::NMiniKQL;
  22. namespace NYql {
  23. namespace NCommon {
  24. TRuntimeNode WideTopImpl(const TExprNode& node, TMkqlBuildContext& ctx,
  25. TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) {
  26. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  27. const auto count = MkqlBuildExpr(*node.Child(1U), ctx);
  28. std::vector<std::pair<ui32, TRuntimeNode>> directions;
  29. directions.reserve(node.Tail().ChildrenSize());
  30. node.Tail().ForEachChild([&](const TExprNode& dir) {
  31. directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
  32. });
  33. return (ctx.ProgramBuilder.*func)(flow, count, directions);
  34. }
  35. TRuntimeNode WideSortImpl(const TExprNode& node, TMkqlBuildContext& ctx,
  36. TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) {
  37. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  38. std::vector<std::pair<ui32, TRuntimeNode>> directions;
  39. directions.reserve(node.Tail().ChildrenSize());
  40. node.Tail().ForEachChild([&](const TExprNode& dir) {
  41. directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
  42. });
  43. return (ctx.ProgramBuilder.*func)(flow, directions);
  44. }
  45. TRuntimeNode CombineByKeyImpl(const TExprNode& node, TMkqlBuildContext& ctx) {
  46. NNodes::TCoCombineByKey combine(&node);
  47. const bool isStreamOrFlow = combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream ||
  48. combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Flow;
  49. YQL_ENSURE(!isStreamOrFlow);
  50. const auto input = MkqlBuildExpr(combine.Input().Ref(), ctx);
  51. TRuntimeNode preMapList = ctx.ProgramBuilder.FlatMap(input, [&](TRuntimeNode item) {
  52. return MkqlBuildLambda(combine.PreMapLambda().Ref(), ctx, {item});
  53. });
  54. const auto dict = ctx.ProgramBuilder.ToHashedDict(preMapList, true, [&](TRuntimeNode item) {
  55. return MkqlBuildLambda(combine.KeySelectorLambda().Ref(), ctx, {item});
  56. }, [&](TRuntimeNode item) {
  57. return item;
  58. });
  59. const auto values = ctx.ProgramBuilder.DictItems(dict);
  60. return ctx.ProgramBuilder.FlatMap(values, [&](TRuntimeNode item) {
  61. auto key = ctx.ProgramBuilder.Nth(item, 0);
  62. auto payloadList = ctx.ProgramBuilder.Nth(item, 1);
  63. auto fold1 = ctx.ProgramBuilder.Fold1(payloadList, [&](TRuntimeNode item2) {
  64. return MkqlBuildLambda(combine.InitHandlerLambda().Ref(), ctx, {key, item2});
  65. }, [&](TRuntimeNode item2, TRuntimeNode state) {
  66. return MkqlBuildLambda(combine.UpdateHandlerLambda().Ref(), ctx, {key, item2, state});
  67. });
  68. auto res = ctx.ProgramBuilder.FlatMap(fold1, [&](TRuntimeNode state) {
  69. return MkqlBuildLambda(combine.FinishHandlerLambda().Ref(), ctx, {key, state});
  70. });
  71. return res;
  72. });
  73. }
  74. namespace {
  75. std::array<TRuntimeNode, 2U> MkqlBuildSplitLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const std::initializer_list<TRuntimeNode>& args) {
  76. TMkqlBuildContext::TArgumentsMap innerArguments;
  77. innerArguments.reserve(args.size());
  78. auto it = args.begin();
  79. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  80. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  81. const auto& body = lambda.Tail();
  82. MKQL_ENSURE(body.IsList() && body.ChildrenSize() == 2U, "Expected pair of nodes.");
  83. return {{MkqlBuildExpr(body.Head(), innerCtx), MkqlBuildExpr(body.Tail(), innerCtx)}};
  84. }
  85. TMkqlBuildContext* GetNodeContext(const TExprNode& node, TMkqlBuildContext& ctx) {
  86. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  87. const auto knownNode = currCtx->Memoization.find(&node);
  88. if (currCtx->Memoization.cend() != knownNode) {
  89. return currCtx;
  90. }
  91. }
  92. return nullptr;
  93. }
  94. TMkqlBuildContext* GetNodeContextByLambda(const TExprNode& node, TMkqlBuildContext& ctx) {
  95. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  96. if (currCtx->LambdaId == node.UniqueId()) {
  97. return currCtx;
  98. }
  99. }
  100. return nullptr;
  101. }
  102. TMkqlBuildContext* GetContextForMemoizeInUnknowScope(const TExprNode& node, TMkqlBuildContext& ctx) {
  103. TMkqlBuildContext* result = nullptr;
  104. for (const auto& c : node.Children()) {
  105. const auto& child = c->IsLambda() ? c->Tail() : *c;
  106. if (!child.IsAtom()) {
  107. auto nodeCtx = GetNodeContext(child, ctx);
  108. if (!nodeCtx) {
  109. nodeCtx = GetContextForMemoizeInUnknowScope(child, ctx);
  110. }
  111. if (!result || result->Level < nodeCtx->Level) {
  112. result = nodeCtx;
  113. if (result == &ctx) {
  114. break;
  115. }
  116. }
  117. }
  118. }
  119. if (!result) {
  120. for (result = &ctx; result->ParentCtx; result = result->ParentCtx)
  121. continue;
  122. }
  123. return result;
  124. }
  125. TMkqlBuildContext* GetContextForMemoize(const TExprNode& node, TMkqlBuildContext& ctx) {
  126. if (const auto scope = node.GetDependencyScope()) {
  127. if (const auto lambda = scope->second) {
  128. return GetNodeContextByLambda(*lambda, ctx);
  129. }
  130. } else {
  131. return GetContextForMemoizeInUnknowScope(node, ctx);
  132. }
  133. auto result = &ctx;
  134. while (result->ParentCtx) {
  135. result = result->ParentCtx;
  136. }
  137. return result;
  138. }
  139. const TRuntimeNode& CheckTypeAndMemoize(const TExprNode& node, TMkqlBuildContext& ctx, const TRuntimeNode& runtime) {
  140. if (node.GetTypeAnn()) {
  141. TNullOutput null;
  142. if (const auto type = BuildType(*node.GetTypeAnn(), ctx.ProgramBuilder, *ctx.TypeMemoization, null)) {
  143. if (!type->IsSameType(*runtime.GetStaticType())) {
  144. ythrow TNodeException(node) << "Expected: " << *type << " type, but got: " << *runtime.GetStaticType() << ".";
  145. }
  146. }
  147. }
  148. return GetContextForMemoize(node, ctx)->Memoization.emplace(&node, runtime).first->second;
  149. }
  150. std::vector<TRuntimeNode> GetAllArguments(const TExprNode& node, TMkqlBuildContext& ctx) {
  151. std::vector<TRuntimeNode> args;
  152. args.reserve(node.ChildrenSize());
  153. node.ForEachChild([&](const TExprNode& child){ args.emplace_back(MkqlBuildExpr(child, ctx)); });
  154. return args;
  155. }
  156. template <size_t From>
  157. std::vector<TRuntimeNode> GetArgumentsFrom(const TExprNode& node, TMkqlBuildContext& ctx) {
  158. std::vector<TRuntimeNode> args;
  159. args.reserve(node.ChildrenSize() - From);
  160. for (auto i = From; i < node.ChildrenSize(); ++i) {
  161. args.emplace_back(MkqlBuildExpr(*node.Child(i), ctx));
  162. }
  163. return args;
  164. }
  165. NUdf::TDataTypeId ParseDataType(const TExprNode& owner, const std::string_view& type) {
  166. if (const auto slot = NUdf::FindDataSlot(type)) {
  167. return NUdf::GetDataTypeInfo(*slot).TypeId;
  168. }
  169. ythrow TNodeException(owner) << "Unsupported data type: " << type;
  170. }
  171. EJoinKind GetJoinKind(const TExprNode& owner, const std::string_view& content) {
  172. if (content == "Inner") {
  173. return EJoinKind::Inner;
  174. }
  175. else if (content == "Left") {
  176. return EJoinKind::Left;
  177. }
  178. else if (content == "Right") {
  179. return EJoinKind::Right;
  180. }
  181. else if (content == "Full") {
  182. return EJoinKind::Full;
  183. }
  184. else if (content == "LeftOnly") {
  185. return EJoinKind::LeftOnly;
  186. }
  187. else if (content == "RightOnly") {
  188. return EJoinKind::RightOnly;
  189. }
  190. else if (content == "Exclusion") {
  191. return EJoinKind::Exclusion;
  192. }
  193. else if (content == "LeftSemi") {
  194. return EJoinKind::LeftSemi;
  195. }
  196. else if (content == "RightSemi") {
  197. return EJoinKind::RightSemi;
  198. }
  199. else if (content == "Cross") {
  200. return EJoinKind::Cross;
  201. }
  202. else {
  203. ythrow TNodeException(owner) << "Unexpected join kind: " << content;
  204. }
  205. }
  206. template<typename TLayout>
  207. std::pair<TLayout, ui16> CutTimezone(const std::string_view& atom) {
  208. const auto pos = atom.find(',');
  209. MKQL_ENSURE(std::string_view::npos != pos, "Expected two components.");
  210. return std::make_pair(::FromString<TLayout>(atom.substr(0, pos)), GetTimezoneId(atom.substr(pos + 1)));
  211. }
  212. } // namespace
  213. bool TMkqlCallableCompilerBase::HasCallable(const std::string_view& name) const {
  214. return Callables.contains(name);
  215. }
  216. void TMkqlCallableCompilerBase::AddCallable(const std::string_view& name, TCompiler compiler) {
  217. const auto result = Callables.emplace(TString(name), compiler);
  218. YQL_ENSURE(result.second, "Callable already exists: " << name);
  219. }
  220. void TMkqlCallableCompilerBase::AddCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  221. for (const auto& name : names) {
  222. AddCallable(name, compiler);
  223. }
  224. }
  225. void TMkqlCallableCompilerBase::ChainCallable(const std::string_view& name, TCompiler compiler) {
  226. auto prevCompiler = GetCallable(name);
  227. auto chainedCompiler = [compiler = std::move(compiler), prevCompiler = std::move(prevCompiler)](const TExprNode& node, TMkqlBuildContext& ctx) -> NKikimr::NMiniKQL::TRuntimeNode {
  228. if (auto res = compiler(node, ctx)) {
  229. return res;
  230. }
  231. return prevCompiler(node, ctx);
  232. };
  233. OverrideCallable(name, chainedCompiler);
  234. }
  235. void TMkqlCallableCompilerBase::ChainCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  236. for (const auto& name : names) {
  237. ChainCallable(name, compiler);
  238. }
  239. }
  240. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::UnaryFunctionMethod>>& callables) {
  241. for (const auto& callable : callables) {
  242. AddCallable(callable.first,
  243. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  244. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  245. return (ctx.ProgramBuilder.*method)(arg);
  246. }
  247. );
  248. }
  249. }
  250. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::BinaryFunctionMethod>>& callables) {
  251. for (const auto& callable : callables) {
  252. AddCallable(callable.first,
  253. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  254. const auto one = MkqlBuildExpr(node.Head(), ctx);
  255. const auto two = MkqlBuildExpr(node.Tail(), ctx);
  256. return (ctx.ProgramBuilder.*method)(one, two);
  257. }
  258. );
  259. }
  260. }
  261. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::TernaryFunctionMethod>>& callables) {
  262. for (const auto& callable : callables) {
  263. AddCallable(callable.first,
  264. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  265. const auto arg1 = MkqlBuildExpr(node.Head(), ctx);
  266. const auto arg2 = MkqlBuildExpr(*node.Child(1U), ctx);
  267. const auto arg3 = MkqlBuildExpr(node.Tail(), ctx);
  268. return (ctx.ProgramBuilder.*method)(arg1, arg2, arg3);
  269. }
  270. );
  271. }
  272. }
  273. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::ArrayFunctionMethod>>& callables) {
  274. for (const auto& callable : callables) {
  275. AddCallable(callable.first,
  276. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  277. const auto& args = GetAllArguments(node, ctx);
  278. return (ctx.ProgramBuilder.*method)(args);
  279. }
  280. );
  281. }
  282. }
  283. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::ProcessFunctionMethod>>& callables) {
  284. for (const auto& callable : callables) {
  285. AddCallable(callable.first,
  286. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  287. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  288. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(node.Tail(), ctx, {item}); };
  289. return (ctx.ProgramBuilder.*method)(arg, lambda);
  290. }
  291. );
  292. }
  293. }
  294. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::NarrowFunctionMethod>>& callables) {
  295. for (const auto& callable : callables) {
  296. AddCallable(callable.first,
  297. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  298. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  299. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildLambda(node.Tail(), ctx, items); };
  300. return (ctx.ProgramBuilder.*method)(arg, lambda);
  301. }
  302. );
  303. }
  304. }
  305. void TMkqlCallableCompilerBase::OverrideCallable(const std::string_view& name, TCompiler compiler) {
  306. const auto prevCompiler = Callables.find(name);
  307. YQL_ENSURE(Callables.cend() != prevCompiler, "Missed callable: " << name);
  308. prevCompiler->second = compiler;
  309. Callables[name] = compiler;
  310. }
  311. IMkqlCallableCompiler::TCompiler TMkqlCallableCompilerBase::GetCallable(const std::string_view& name) const {
  312. const auto compiler = Callables.find(name);
  313. YQL_ENSURE(Callables.cend() != compiler, "Missed callable: " << name);
  314. return compiler->second;
  315. }
  316. IMkqlCallableCompiler::TCompiler TMkqlCallableCompilerBase::FindCallable(const std::string_view& name) const {
  317. const auto compiler = Callables.find(name);
  318. return Callables.cend() != compiler ? compiler->second : IMkqlCallableCompiler::TCompiler();
  319. }
  320. bool TMkqlCommonCallableCompiler::HasCallable(const std::string_view& name) const {
  321. if (TMkqlCallableCompilerBase::HasCallable(name)) {
  322. return true;
  323. }
  324. return GetShared().HasCallable(name);
  325. }
  326. IMkqlCallableCompiler::TCompiler TMkqlCommonCallableCompiler::FindCallable(const std::string_view& name) const {
  327. if (const auto func = TMkqlCallableCompilerBase::FindCallable(name)) {
  328. return func;
  329. }
  330. return GetShared().FindCallable(name);
  331. }
  332. IMkqlCallableCompiler::TCompiler TMkqlCommonCallableCompiler::GetCallable(const std::string_view& name) const {
  333. if (const auto func = TMkqlCallableCompilerBase::FindCallable(name)) {
  334. return func;
  335. }
  336. return GetShared().GetCallable(name);
  337. }
  338. void TMkqlCommonCallableCompiler::OverrideCallable(const std::string_view& name, TCompiler compiler) {
  339. if (TMkqlCallableCompilerBase::HasCallable(name)) {
  340. TMkqlCallableCompilerBase::OverrideCallable(name, compiler);
  341. } else {
  342. YQL_ENSURE(GetShared().HasCallable(name));
  343. TMkqlCallableCompilerBase::AddCallable(name, compiler);
  344. }
  345. }
  346. void TMkqlCommonCallableCompiler::AddCallable(const std::string_view& name, TCompiler compiler) {
  347. YQL_ENSURE(!GetShared().HasCallable(name), "Compiler already set for callable: " << name);
  348. TMkqlCallableCompilerBase::AddCallable(name, compiler);
  349. }
  350. void TMkqlCommonCallableCompiler::AddCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  351. for (const auto& name : names) {
  352. AddCallable(name, compiler);
  353. }
  354. }
  355. TMkqlCommonCallableCompiler::TShared::TShared() {
  356. AddSimpleCallables({
  357. {"Abs", &TProgramBuilder::Abs},
  358. {"Plus", &TProgramBuilder::Plus},
  359. {"Minus", &TProgramBuilder::Minus},
  360. {"Inc", &TProgramBuilder::Increment},
  361. {"Dec", &TProgramBuilder::Decrement},
  362. {"Not", &TProgramBuilder::Not},
  363. {"BlockNot", &TProgramBuilder::BlockNot},
  364. {"BlockJust", &TProgramBuilder::BlockJust},
  365. {"BitNot", &TProgramBuilder::BitNot},
  366. {"Size", &TProgramBuilder::Size},
  367. {"Way", &TProgramBuilder::Way},
  368. {"VariantItem", &TProgramBuilder::VariantItem},
  369. {"CountBits", &TProgramBuilder::CountBits},
  370. {"Ascending", &TProgramBuilder::Ascending},
  371. {"Descending", &TProgramBuilder::Descending},
  372. {"ToOptional", &TProgramBuilder::Head},
  373. {"Head", &TProgramBuilder::Head},
  374. {"Last", &TProgramBuilder::Last},
  375. {"ToList", &TProgramBuilder::ToList},
  376. {"ToFlow", &TProgramBuilder::ToFlow},
  377. {"FromFlow", &TProgramBuilder::FromFlow},
  378. {"WideToBlocks", &TProgramBuilder::WideToBlocks},
  379. {"WideFromBlocks", &TProgramBuilder::WideFromBlocks},
  380. {"AsScalar", &TProgramBuilder::AsScalar},
  381. {"Just", &TProgramBuilder::NewOptional},
  382. {"Exists", &TProgramBuilder::Exists},
  383. {"BlockExists", &TProgramBuilder::BlockExists},
  384. {"Pickle", &TProgramBuilder::Pickle},
  385. {"StablePickle", &TProgramBuilder::StablePickle},
  386. {"Collect", &TProgramBuilder::Collect},
  387. {"Discard", &TProgramBuilder::Discard},
  388. {"LazyList", &TProgramBuilder::LazyList},
  389. {"ForwardList", &TProgramBuilder::ForwardList},
  390. {"Length", &TProgramBuilder::Length},
  391. {"HasItems", &TProgramBuilder::HasItems},
  392. {"Reverse", &TProgramBuilder::Reverse},
  393. {"ToIndexDict", &TProgramBuilder::ToIndexDict},
  394. {"ToString", &TProgramBuilder::ToString},
  395. {"ToBytes", &TProgramBuilder::ToBytes},
  396. {"AggrCountInit", &TProgramBuilder::AggrCountInit},
  397. {"NewMTRand", &TProgramBuilder::NewMTRand},
  398. {"NextMTRand", &TProgramBuilder::NextMTRand},
  399. {"TimezoneId", &TProgramBuilder::TimezoneId},
  400. {"TimezoneName", &TProgramBuilder::TimezoneName},
  401. {"RemoveTimezone", &TProgramBuilder::RemoveTimezone},
  402. {"DictItems", &TProgramBuilder::DictItems},
  403. {"DictKeys", &TProgramBuilder::DictKeys},
  404. {"DictPayloads", &TProgramBuilder::DictPayloads},
  405. {"QueuePop", &TProgramBuilder::QueuePop}
  406. });
  407. AddSimpleCallables({
  408. {"+", &TProgramBuilder::Add},
  409. {"-", &TProgramBuilder::Sub},
  410. {"*", &TProgramBuilder::Mul},
  411. {"/", &TProgramBuilder::Div},
  412. {"%", &TProgramBuilder::Mod},
  413. {"Add", &TProgramBuilder::Add},
  414. {"Sub", &TProgramBuilder::Sub},
  415. {"Mul", &TProgramBuilder::Mul},
  416. {"Div", &TProgramBuilder::Div},
  417. {"Mod", &TProgramBuilder::Mod},
  418. {"DecimalMul", &TProgramBuilder::DecimalMul},
  419. {"DecimalDiv", &TProgramBuilder::DecimalDiv},
  420. {"DecimalMod", &TProgramBuilder::DecimalMod},
  421. {"BlockDecimalMul", &TProgramBuilder::BlockDecimalMul},
  422. {"BlockDecimalDiv", &TProgramBuilder::BlockDecimalDiv},
  423. {"BlockDecimalMod", &TProgramBuilder::BlockDecimalMod},
  424. {"==", &TProgramBuilder::Equals},
  425. {"!=", &TProgramBuilder::NotEquals},
  426. {"<", &TProgramBuilder::Less},
  427. {"<=", &TProgramBuilder::LessOrEqual},
  428. {">", &TProgramBuilder::Greater},
  429. {">=", &TProgramBuilder::GreaterOrEqual},
  430. {"Equals", &TProgramBuilder::Equals},
  431. {"NotEquals", &TProgramBuilder::NotEquals},
  432. {"Less", &TProgramBuilder::Less},
  433. {"LessOrEqual", &TProgramBuilder::LessOrEqual},
  434. {"Greater", &TProgramBuilder::Greater},
  435. {"GreaterOrEqual", &TProgramBuilder::GreaterOrEqual},
  436. {"AggrEquals", &TProgramBuilder::AggrEquals},
  437. {"AggrNotEquals", &TProgramBuilder::AggrNotEquals},
  438. {"AggrLess", &TProgramBuilder::AggrLess},
  439. {"AggrLessOrEqual", &TProgramBuilder::AggrLessOrEqual},
  440. {"AggrGreater", &TProgramBuilder::AggrGreater},
  441. {"AggrGreaterOrEqual", &TProgramBuilder::AggrGreaterOrEqual},
  442. {"AggrMin", &TProgramBuilder::AggrMin},
  443. {"AggrMax", &TProgramBuilder::AggrMax},
  444. {"AggrAdd", &TProgramBuilder::AggrAdd},
  445. {"AggrCountUpdate", &TProgramBuilder::AggrCountUpdate},
  446. {"BitOr", &TProgramBuilder::BitOr},
  447. {"BitAnd", &TProgramBuilder::BitAnd},
  448. {"BitXor", &TProgramBuilder::BitXor},
  449. {"ShiftLeft", &TProgramBuilder::ShiftLeft},
  450. {"ShiftRight", &TProgramBuilder::ShiftRight},
  451. {"RotLeft", &TProgramBuilder::RotLeft},
  452. {"RotRight", &TProgramBuilder::RotRight},
  453. {"ListIf", &TProgramBuilder::ListIf},
  454. {"Concat", &TProgramBuilder::Concat},
  455. {"AggrConcat", &TProgramBuilder::AggrConcat},
  456. {"ByteAt", &TProgramBuilder::ByteAt},
  457. {"Nanvl", &TProgramBuilder::Nanvl},
  458. {"Skip", &TProgramBuilder::Skip},
  459. {"Take", &TProgramBuilder::Take},
  460. {"Limit", &TProgramBuilder::Take},
  461. {"WideTakeBlocks", &TProgramBuilder::WideTakeBlocks},
  462. {"WideSkipBlocks", &TProgramBuilder::WideSkipBlocks},
  463. {"BlockCoalesce", &TProgramBuilder::BlockCoalesce},
  464. {"ReplicateScalar", &TProgramBuilder::ReplicateScalar},
  465. {"BlockAnd", &TProgramBuilder::BlockAnd},
  466. {"BlockOr", &TProgramBuilder::BlockOr},
  467. {"BlockXor", &TProgramBuilder::BlockXor},
  468. {"Append", &TProgramBuilder::Append},
  469. {"Insert", &TProgramBuilder::Append},
  470. {"Prepend", &TProgramBuilder::Prepend},
  471. {"Lookup", &TProgramBuilder::Lookup},
  472. {"Contains", &TProgramBuilder::Contains},
  473. {"AddTimezone", &TProgramBuilder::AddTimezone},
  474. {"StartsWith", &TProgramBuilder::StartsWith},
  475. {"EndsWith", &TProgramBuilder::EndsWith},
  476. {"StringContains", &TProgramBuilder::StringContains},
  477. {"SqueezeToList", &TProgramBuilder::SqueezeToList},
  478. {"QueuePush", &TProgramBuilder::QueuePush}
  479. });
  480. AddSimpleCallables({
  481. {"Substring", &TProgramBuilder::Substring},
  482. {"Find", &TProgramBuilder::Find},
  483. {"RFind", &TProgramBuilder::RFind},
  484. {"ListFromRange", &TProgramBuilder::ListFromRange},
  485. {"PreserveStream", &TProgramBuilder::PreserveStream},
  486. {"BlockIf", &TProgramBuilder::BlockIf},
  487. });
  488. AddSimpleCallables({
  489. {"If", &TProgramBuilder::If},
  490. {"Or", &TProgramBuilder::Or},
  491. {"And", &TProgramBuilder::And},
  492. {"Xor", &TProgramBuilder::Xor},
  493. {"Min", &TProgramBuilder::Min},
  494. {"Max", &TProgramBuilder::Max},
  495. {"AsList", &TProgramBuilder::AsList},
  496. {"Extend", &TProgramBuilder::Extend},
  497. {"OrderedExtend", &TProgramBuilder::OrderedExtend},
  498. {"Zip", &TProgramBuilder::Zip},
  499. {"ZipAll", &TProgramBuilder::ZipAll},
  500. {"Random", &TProgramBuilder::Random},
  501. {"RandomNumber", &TProgramBuilder::RandomNumber},
  502. {"RandomUuid", &TProgramBuilder::RandomUuid},
  503. {"Now", &TProgramBuilder::Now},
  504. {"CurrentUtcDate", &TProgramBuilder::CurrentUtcDate},
  505. {"CurrentUtcDatetime", &TProgramBuilder::CurrentUtcDatetime},
  506. {"CurrentUtcTimestamp", &TProgramBuilder::CurrentUtcTimestamp},
  507. });
  508. AddSimpleCallables({
  509. {"Map", &TProgramBuilder::Map},
  510. {"OrderedMap", &TProgramBuilder::OrderedMap},
  511. {"FlatMap", &TProgramBuilder::FlatMap},
  512. {"OrderedFlatMap", &TProgramBuilder::OrderedFlatMap},
  513. {"SkipWhile", &TProgramBuilder::SkipWhile},
  514. {"TakeWhile", &TProgramBuilder::TakeWhile},
  515. {"SkipWhileInclusive", &TProgramBuilder::SkipWhileInclusive},
  516. {"TakeWhileInclusive", &TProgramBuilder::TakeWhileInclusive},
  517. });
  518. AddSimpleCallables({
  519. {"NarrowMap", &TProgramBuilder::NarrowMap},
  520. {"NarrowFlatMap", &TProgramBuilder::NarrowFlatMap},
  521. {"WideSkipWhile", &TProgramBuilder::WideSkipWhile},
  522. {"WideTakeWhile", &TProgramBuilder::WideTakeWhile},
  523. {"WideSkipWhileInclusive", &TProgramBuilder::WideSkipWhileInclusive},
  524. {"WideTakeWhileInclusive", &TProgramBuilder::WideTakeWhileInclusive},
  525. });
  526. AddSimpleCallables({
  527. {"RangeUnion", &TProgramBuilder::RangeUnion},
  528. {"RangeIntersect", &TProgramBuilder::RangeIntersect},
  529. {"RangeMultiply", &TProgramBuilder::RangeMultiply},
  530. });
  531. AddSimpleCallables({
  532. {"RangeCreate", &TProgramBuilder::RangeCreate},
  533. {"RangeFinalize", &TProgramBuilder::RangeFinalize},
  534. });
  535. AddCallable({"RoundUp", "RoundDown"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  536. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  537. const auto dstType = ctx.BuildType(node.Tail(), *node.Tail().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  538. return ctx.ProgramBuilder.Round(node.Content(), arg, dstType);
  539. });
  540. AddSimpleCallables({
  541. {"NextValue", &TProgramBuilder::NextValue},
  542. });
  543. AddCallable({"MultiMap", "OrderedMultiMap"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  544. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  545. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildWideLambda(node.Tail(), ctx, {item}); };
  546. return ctx.ProgramBuilder.MultiMap(arg, lambda);
  547. });
  548. AddCallable("ExpandMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  549. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  550. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildWideLambda(node.Tail(), ctx, {item}); };
  551. return ctx.ProgramBuilder.ExpandMap(arg, lambda);
  552. });
  553. AddCallable("WideMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  554. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  555. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildWideLambda(node.Tail(), ctx, items); };
  556. TRuntimeNode result = ctx.ProgramBuilder.WideMap(arg, lambda);
  557. if (IsWideBlockType(*node.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType())) {
  558. result = ctx.ProgramBuilder.BlockExpandChunked(result);
  559. }
  560. return result;
  561. });
  562. AddCallable("WideChain1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  563. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  564. return ctx.ProgramBuilder.WideChain1Map(flow,
  565. [&](TRuntimeNode::TList items) {
  566. return MkqlBuildWideLambda(*node.Child(1), ctx, items);
  567. },
  568. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  569. items.insert(items.cend(), state.cbegin(), state.cend());
  570. return MkqlBuildWideLambda(node.Tail(), ctx, items);
  571. });
  572. });
  573. AddCallable("NarrowMultiMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  574. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  575. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildWideLambda(node.Tail(), ctx, items); };
  576. return ctx.ProgramBuilder.NarrowMultiMap(arg, lambda);
  577. });
  578. AddCallable("WideFilter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  579. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  580. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildLambda(*node.Child(1), ctx, items); };
  581. if (node.ChildrenSize() > 2U) {
  582. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  583. return ctx.ProgramBuilder.WideFilter(arg, limit, lambda);
  584. } else {
  585. return ctx.ProgramBuilder.WideFilter(arg, lambda);
  586. }
  587. });
  588. AddCallable("WideCondense1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  589. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  590. return ctx.ProgramBuilder.WideCondense1(flow,
  591. [&](TRuntimeNode::TList items) {
  592. return MkqlBuildWideLambda(*node.Child(1), ctx, items);
  593. },
  594. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  595. items.insert(items.cend(), state.cbegin(), state.cend());
  596. return MkqlBuildLambda(*node.Child(2), ctx, items);
  597. },
  598. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  599. items.insert(items.cend(), state.cbegin(), state.cend());
  600. return MkqlBuildWideLambda(*node.Child(3), ctx, items);
  601. },
  602. HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
  603. });
  604. AddCallable("WideCombiner", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  605. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  606. i64 memLimit = 0LL;
  607. const bool withLimit = TryFromString<i64>(node.Child(1U)->Content(), memLimit);
  608. const auto keyExtractor = [&](TRuntimeNode::TList items) {
  609. return MkqlBuildWideLambda(*node.Child(2U), ctx, items);
  610. };
  611. const auto init = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) {
  612. keys.insert(keys.cend(), items.cbegin(), items.cend());
  613. return MkqlBuildWideLambda(*node.Child(3U), ctx, keys);
  614. };
  615. const auto update = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items, TRuntimeNode::TList state) {
  616. keys.insert(keys.cend(), items.cbegin(), items.cend());
  617. keys.insert(keys.cend(), state.cbegin(), state.cend());
  618. return MkqlBuildWideLambda(*node.Child(4U), ctx, keys);
  619. };
  620. const auto finish = [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) {
  621. keys.insert(keys.cend(), state.cbegin(), state.cend());
  622. return MkqlBuildWideLambda(node.Tail(), ctx, keys);
  623. };
  624. bool isStatePersistable = true;
  625. // Traverse through childs skipping input and limit children
  626. for (size_t i = 2U; i < node.ChildrenSize(); ++i) {
  627. isStatePersistable = isStatePersistable && node.Child(i)->GetTypeAnn()->IsPersistable();
  628. }
  629. if (withLimit) {
  630. return ctx.ProgramBuilder.WideCombiner(flow, memLimit, keyExtractor, init, update, finish);
  631. }
  632. if (isStatePersistable && RuntimeVersion >= 49U) {
  633. return ctx.ProgramBuilder.WideLastCombinerWithSpilling(flow, keyExtractor, init, update, finish);
  634. }
  635. return ctx.ProgramBuilder.WideLastCombiner(flow, keyExtractor, init, update, finish);
  636. });
  637. AddCallable("WideChopper", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  638. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  639. const auto keyExtractor = [&](TRuntimeNode::TList items) {
  640. return MkqlBuildWideLambda(*node.Child(1U), ctx, items);
  641. };
  642. const auto groupSwitch = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) {
  643. keys.insert(keys.cend(), items.cbegin(), items.cend());
  644. return MkqlBuildLambda(*node.Child(2U), ctx, keys);
  645. };
  646. const auto handler = [&](TRuntimeNode::TList keys, TRuntimeNode flow) {
  647. keys.emplace_back(flow);
  648. return MkqlBuildLambda(node.Tail(), ctx, keys);
  649. };
  650. return ctx.ProgramBuilder.WideChopper(flow, keyExtractor, groupSwitch, handler);
  651. });
  652. AddCallable("WideTop", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  653. return WideTopImpl(node, ctx, &TProgramBuilder::WideTop);
  654. });
  655. AddCallable("WideTopSort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  656. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSort);
  657. });
  658. AddCallable("WideSort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  659. return WideSortImpl(node, ctx, &TProgramBuilder::WideSort);
  660. });
  661. AddCallable("WideTopBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  662. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopBlocks);
  663. });
  664. AddCallable("WideTopSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  665. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSortBlocks);
  666. });
  667. AddCallable("WideSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  668. return WideSortImpl(node, ctx, &TProgramBuilder::WideSortBlocks);
  669. });
  670. AddCallable("Iterable", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  671. const auto lambda = [&]() { return MkqlBuildLambda(node.Head(), ctx, {}); };
  672. return ctx.ProgramBuilder.Iterable(lambda);
  673. });
  674. AddCallable("Filter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  675. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  676. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(*node.Child(1), ctx, {item}); };
  677. if (node.ChildrenSize() > 2U) {
  678. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  679. return ctx.ProgramBuilder.Filter(arg, limit, lambda);
  680. } else {
  681. return ctx.ProgramBuilder.Filter(arg, lambda);
  682. }
  683. });
  684. AddCallable("OrderedFilter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  685. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  686. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(*node.Child(1), ctx, {item}); };
  687. if (node.ChildrenSize() > 2U) {
  688. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  689. return ctx.ProgramBuilder.OrderedFilter(arg, limit, lambda);
  690. } else {
  691. return ctx.ProgramBuilder.OrderedFilter(arg, lambda);
  692. }
  693. });
  694. AddCallable("Member", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  695. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  696. const auto name = node.Tail().Content();
  697. return ctx.ProgramBuilder.Member(structObj, name);
  698. });
  699. AddCallable("RemoveMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  700. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  701. const auto name = node.Tail().Content();
  702. return ctx.ProgramBuilder.RemoveMember(structObj, name, false);
  703. });
  704. AddCallable("ForceRemoveMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  705. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  706. const auto name = node.Tail().Content();
  707. return ctx.ProgramBuilder.RemoveMember(structObj, name, true);
  708. });
  709. AddCallable("Nth", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  710. const auto tupleObj = MkqlBuildExpr(node.Head(), ctx);
  711. const auto index = FromString<ui32>(node.Tail().Content());
  712. return ctx.ProgramBuilder.Nth(tupleObj, index);
  713. });
  714. AddCallable("MatchRecognizeCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  715. const auto& inputStream = node.Child(0);
  716. const auto& partitionKeySelector = node.Child(1);
  717. const auto& partitionColumns = node.Child(2);
  718. const auto& params = node.Child(3);
  719. const auto& settings = node.Child(4);
  720. //explore params
  721. const auto measures = params->Child(0);
  722. const auto skipTo = params->Child(2);
  723. const auto pattern = params->Child(3);
  724. const auto defines = params->Child(4);
  725. //explore measures
  726. const auto measureNames = measures->Child(2);
  727. constexpr size_t FirstMeasureLambdaIndex = 3;
  728. //explore defines
  729. const auto defineNames = defines->Child(2);
  730. const size_t FirstDefineLambdaIndex = 3;
  731. TVector<TStringBuf> partitionColumnNames;
  732. for (const auto& n: partitionColumns->Children()) {
  733. partitionColumnNames.push_back(n->Content());
  734. }
  735. TProgramBuilder::TUnaryLambda getPartitionKeySelector = [partitionKeySelector, &ctx](TRuntimeNode inputRowArg){
  736. return MkqlBuildLambda(*partitionKeySelector, ctx, {inputRowArg});
  737. };
  738. TVector<TStringBuf> defineVarNames(defineNames->ChildrenSize());
  739. TVector<TProgramBuilder::TTernaryLambda> getDefines(defineNames->ChildrenSize());
  740. for (size_t i = 0; i != defineNames->ChildrenSize(); ++i) {
  741. defineVarNames[i] = defineNames->Child(i)->Content();
  742. getDefines[i] = [i, defines, &ctx](TRuntimeNode data, TRuntimeNode matchedVars, TRuntimeNode rowIndex) {
  743. return MkqlBuildLambda(*defines->Child(FirstDefineLambdaIndex + i), ctx, {data, matchedVars, rowIndex});
  744. };
  745. }
  746. TVector<TStringBuf> measureColumnNames(measureNames->ChildrenSize());
  747. TVector<TProgramBuilder::TBinaryLambda> getMeasures(measureNames->ChildrenSize());
  748. for (size_t i = 0; i != measureNames->ChildrenSize(); ++i) {
  749. measureColumnNames[i] = measureNames->Child(i)->Content();
  750. getMeasures[i] = [i, measures, &ctx](TRuntimeNode data, TRuntimeNode matchedVars) {
  751. return MkqlBuildLambda(*measures->Child(FirstMeasureLambdaIndex + i), ctx, {data, matchedVars});
  752. };
  753. }
  754. auto stringTo = skipTo->Child(0)->Content();
  755. auto var = skipTo->Child(1)->Content();
  756. MKQL_ENSURE(stringTo.SkipPrefix("AfterMatchSkip_"), R"(MATCH_RECOGNIZE: <row pattern skip to> should start with "AfterMatchSkip_")");
  757. NYql::NMatchRecognize::EAfterMatchSkipTo to;
  758. MKQL_ENSURE(TryFromString<NYql::NMatchRecognize::EAfterMatchSkipTo>(stringTo, to), "MATCH_RECOGNIZE: <row pattern skip to> cannot parse AfterMatchSkipTo mode");
  759. auto rowsPerMatchString = params->Child(1)->Content();
  760. MKQL_ENSURE(rowsPerMatchString.SkipPrefix("RowsPerMatch_"), R"(MATCH_RECOGNIZE: <row pattern rows per match> should start with "RowsPerMatch_")");
  761. NYql::NMatchRecognize::ERowsPerMatch rowsPerMatch;
  762. MKQL_ENSURE(TryFromString<NYql::NMatchRecognize::ERowsPerMatch>(rowsPerMatchString, rowsPerMatch), "MATCH_RECOGNIZE: cannot parse RowsPerMatch mode");
  763. const auto streamingMode = FromString<bool>(settings->Child(0)->Child(1)->Content());
  764. return ctx.ProgramBuilder.MatchRecognizeCore(
  765. MkqlBuildExpr(*inputStream, ctx),
  766. getPartitionKeySelector,
  767. partitionColumnNames,
  768. measureColumnNames,
  769. getMeasures,
  770. NYql::NMatchRecognize::ConvertPattern(pattern, ctx.ExprCtx),
  771. defineVarNames,
  772. getDefines,
  773. streamingMode,
  774. NYql::NMatchRecognize::TAfterMatchSkipTo{to, TString{var}},
  775. rowsPerMatch
  776. );
  777. });
  778. AddCallable("TimeOrderRecover", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  779. const auto inputStream = node.Child(0);
  780. const auto timeExtractor = node.Child(1);
  781. const auto delay = node.Child(2);
  782. const auto ahead = node.Child(3);
  783. const auto rowLimit = node.Child(4);
  784. return ctx.ProgramBuilder.TimeOrderRecover(
  785. MkqlBuildExpr(*inputStream, ctx),
  786. [timeExtractor, &ctx](TRuntimeNode row) {
  787. return MkqlBuildLambda(*timeExtractor, ctx, {row});
  788. },
  789. MkqlBuildExpr(*delay, ctx),
  790. MkqlBuildExpr(*ahead, ctx),
  791. MkqlBuildExpr(*rowLimit, ctx)
  792. );
  793. });
  794. AddCallable("Guess", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  795. const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
  796. auto type = node.Head().GetTypeAnn();
  797. if (type->GetKind() == ETypeAnnotationKind::Optional) {
  798. type = type->Cast<TOptionalExprType>()->GetItemType();
  799. }
  800. auto varType = type->Cast<TVariantExprType>();
  801. if (varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
  802. auto index = FromString<ui32>(node.Child(1)->Content());
  803. return ctx.ProgramBuilder.Guess(variantObj, index);
  804. } else {
  805. return ctx.ProgramBuilder.Guess(variantObj, node.Child(1)->Content());
  806. }
  807. });
  808. AddCallable("Visit", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  809. const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
  810. const auto type = node.Head().GetTypeAnn()->Cast<TVariantExprType>();
  811. const TTupleExprType* tupleType = nullptr;
  812. const TStructExprType* structType = nullptr;
  813. std::vector<TExprNode*> lambdas;
  814. TRuntimeNode defaultValue;
  815. if (type->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
  816. tupleType = type->GetUnderlyingType()->Cast<TTupleExprType>();
  817. lambdas.resize(tupleType->GetSize());
  818. } else {
  819. structType = type->GetUnderlyingType()->Cast<TStructExprType>();
  820. lambdas.resize(structType->GetSize());
  821. }
  822. for (ui32 index = 1; index < node.ChildrenSize(); ++index) {
  823. const auto child = node.Child(index);
  824. if (!child->IsAtom()) {
  825. defaultValue = MkqlBuildExpr(*child, ctx);
  826. continue;
  827. }
  828. ui32 itemIndex;
  829. if (tupleType) {
  830. itemIndex = FromString<ui32>(child->Content());
  831. } else {
  832. itemIndex = *structType->FindItem(child->Content());
  833. }
  834. YQL_ENSURE(itemIndex < lambdas.size());
  835. ++index;
  836. lambdas[itemIndex] = node.Child(index);
  837. }
  838. const auto handler = [&](ui32 index, TRuntimeNode item) {
  839. if (const auto lambda = lambdas[index]) {
  840. return MkqlBuildLambda(*lambda, ctx, {item});
  841. }
  842. return defaultValue;
  843. };
  844. return ctx.ProgramBuilder.VisitAll(variantObj, handler);
  845. });
  846. AddCallable("CurrentActorId", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  847. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  848. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  849. return TRuntimeNode(call.Build(), false);
  850. });
  851. AddCallable("Uint8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  852. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui8>(node.Head(), NUdf::EDataSlot::Uint8));
  853. });
  854. AddCallable("Int8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  855. return ctx.ProgramBuilder.NewDataLiteral(FromString<i8>(node.Head(), NUdf::EDataSlot::Int8));
  856. });
  857. AddCallable("Uint16", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  858. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui16>(node.Head(), NUdf::EDataSlot::Uint16));
  859. });
  860. AddCallable("Int16", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  861. return ctx.ProgramBuilder.NewDataLiteral(FromString<i16>(node.Head(), NUdf::EDataSlot::Int16));
  862. });
  863. AddCallable("Int32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  864. return ctx.ProgramBuilder.NewDataLiteral(FromString<i32>(node.Head(), NUdf::EDataSlot::Int32));
  865. });
  866. AddCallable("Uint32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  867. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui32>(node.Head(), NUdf::EDataSlot::Uint32));
  868. });
  869. AddCallable("Int64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  870. return ctx.ProgramBuilder.NewDataLiteral(FromString<i64>(node.Head(), NUdf::EDataSlot::Int64));
  871. });
  872. AddCallable("Uint64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  873. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui64>(node.Head(), NUdf::EDataSlot::Uint64));
  874. });
  875. AddCallable("String", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  876. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content());
  877. });
  878. AddCallable("Utf8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  879. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Utf8>(node.Head().Content());
  880. });
  881. AddCallable("Yson", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  882. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(node.Head().Content());
  883. });
  884. AddCallable("Json", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  885. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Json>(node.Head().Content());
  886. });
  887. AddCallable("JsonDocument", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  888. // NOTE: ValueFromString returns TUnboxedValuePod. This type does not free string inside it during destruction.
  889. // To get smart pointer-like behaviour we convert TUnboxedValuePod to TUnboxedValue. Without this conversion there
  890. // will be a memory leak.
  891. NUdf::TUnboxedValue jsonDocument = ValueFromString(NUdf::EDataSlot::JsonDocument, node.Head().Content());
  892. MKQL_ENSURE(bool(jsonDocument), "Invalid JsonDocument literal");
  893. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::JsonDocument>(jsonDocument.AsStringRef());
  894. });
  895. AddCallable("Uuid", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  896. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Uuid>(node.Head().Content());
  897. });
  898. AddCallable("Decimal", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  899. const auto precision = FromString<ui8>(node.Child(1)->Content());
  900. const auto scale = FromString<ui8>(node.Child(2)->Content());
  901. MKQL_ENSURE(precision > 0, "Precision must be positive.");
  902. MKQL_ENSURE(scale <= precision, "Scale too large.");
  903. const auto data = NDecimal::FromString(node.Head().Content(), precision, scale);
  904. MKQL_ENSURE(!NDecimal::IsError(data), "Bad decimal.");
  905. return ctx.ProgramBuilder.NewDecimalLiteral(data, precision, scale);
  906. });
  907. AddCallable("Bool", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  908. return ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(node.Head(), NUdf::EDataSlot::Bool));
  909. });
  910. AddCallable("Float", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  911. return ctx.ProgramBuilder.NewDataLiteral(FromString<float>(node.Head(), NUdf::EDataSlot::Float));
  912. });
  913. AddCallable("Double", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  914. return ctx.ProgramBuilder.NewDataLiteral(FromString<double>(node.Head(), NUdf::EDataSlot::Double));
  915. });
  916. AddCallable("DyNumber", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  917. const NUdf::TUnboxedValue val = ValueFromString(NUdf::EDataSlot::DyNumber, node.Head().Content());
  918. MKQL_ENSURE(val, "Bad DyNumber: " << TString(node.Head().Content()).Quote());
  919. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::DyNumber>(val.AsStringRef());
  920. });
  921. AddCallable("Date", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  922. const auto value = FromString<ui16>(node.Head(), NUdf::EDataSlot::Date);
  923. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Date>(
  924. NUdf::TStringRef((const char*)&value, sizeof(value)));
  925. });
  926. AddCallable("Datetime", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  927. const auto value = FromString<ui32>(node.Head(), NUdf::EDataSlot::Datetime);
  928. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Datetime>(
  929. NUdf::TStringRef((const char*)&value, sizeof(value)));
  930. });
  931. AddCallable("Timestamp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  932. const auto value = FromString<ui64>(node.Head(), NUdf::EDataSlot::Timestamp);
  933. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Timestamp>(
  934. NUdf::TStringRef((const char*)&value, sizeof(value)));
  935. });
  936. AddCallable("Interval", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  937. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Interval);
  938. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(
  939. NUdf::TStringRef((const char*)&value, sizeof(value)));
  940. });
  941. AddCallable("TzDate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  942. const auto& parts = CutTimezone<ui16>(node.Head().Content());
  943. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDate>(parts.first, parts.second);
  944. });
  945. AddCallable("TzDatetime", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  946. const auto& parts = CutTimezone<ui32>(node.Head().Content());
  947. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDatetime>(parts.first, parts.second);
  948. });
  949. AddCallable("TzTimestamp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  950. const auto& parts = CutTimezone<ui64>(node.Head().Content());
  951. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzTimestamp>(parts.first, parts.second);
  952. });
  953. AddCallable("Date32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  954. const auto value = FromString<i32>(node.Head(), NUdf::EDataSlot::Date32);
  955. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Date32>(
  956. NUdf::TStringRef((const char*)&value, sizeof(value)));
  957. });
  958. AddCallable("Datetime64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  959. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Datetime64);
  960. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Datetime64>(
  961. NUdf::TStringRef((const char*)&value, sizeof(value)));
  962. });
  963. AddCallable("Timestamp64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  964. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Timestamp64);
  965. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Timestamp64>(
  966. NUdf::TStringRef((const char*)&value, sizeof(value)));
  967. });
  968. AddCallable("Interval64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  969. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Interval64);
  970. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Interval64>(
  971. NUdf::TStringRef((const char*)&value, sizeof(value)));
  972. });
  973. AddCallable("TzDate32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  974. const auto& parts = CutTimezone<i32>(node.Head().Content());
  975. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDate32>(parts.first, parts.second);
  976. });
  977. AddCallable("TzDatetime64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  978. const auto& parts = CutTimezone<i64>(node.Head().Content());
  979. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDatetime64>(parts.first, parts.second);
  980. });
  981. AddCallable("TzTimestamp64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  982. const auto& parts = CutTimezone<i64>(node.Head().Content());
  983. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzTimestamp64>(parts.first, parts.second);
  984. });
  985. AddCallable("FoldMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  986. const auto list = MkqlBuildExpr(node.Head(), ctx);
  987. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  988. return ctx.ProgramBuilder.ChainMap(list, state, [&](TRuntimeNode item, TRuntimeNode state) {
  989. return MkqlBuildSplitLambda(*node.Child(2), ctx, {item, state});
  990. });
  991. });
  992. AddCallable("Fold1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  993. const auto list = MkqlBuildExpr(node.Head(), ctx);
  994. return ctx.ProgramBuilder.Chain1Map(list, [&](TRuntimeNode item) {
  995. return MkqlBuildSplitLambda(*node.Child(1), ctx, {item});
  996. }, [&](TRuntimeNode item, TRuntimeNode state) {
  997. return MkqlBuildSplitLambda(*node.Child(2), ctx, {item, state});
  998. });
  999. });
  1000. AddCallable("Chain1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1001. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1002. return ctx.ProgramBuilder.Chain1Map(list,
  1003. [&](TRuntimeNode item) -> std::array<TRuntimeNode, 2U> {
  1004. const auto out = MkqlBuildLambda(*node.Child(1), ctx, {item});
  1005. return {{out, out}};
  1006. }, [&](TRuntimeNode item, TRuntimeNode state) -> std::array<TRuntimeNode, 2U> {
  1007. const auto out = MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1008. return {{out, out}};
  1009. });
  1010. });
  1011. AddCallable("Extract", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1012. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1013. const auto name = node.Tail().Content();
  1014. return ctx.ProgramBuilder.Extract(list, name);
  1015. });
  1016. AddCallable("OrderedExtract", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1017. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1018. const auto name = node.Child(1)->Content();
  1019. return ctx.ProgramBuilder.OrderedExtract(list, name);
  1020. });
  1021. AddCallable("Fold", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1022. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1023. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1024. return ctx.ProgramBuilder.Fold(list, state, [&](TRuntimeNode item, TRuntimeNode state) {
  1025. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1026. });
  1027. });
  1028. AddCallable("MapNext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1029. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1030. return ctx.ProgramBuilder.MapNext(list, [&](TRuntimeNode item, TRuntimeNode nextItem) {
  1031. return MkqlBuildLambda(node.Tail(), ctx, {item, nextItem});
  1032. });
  1033. });
  1034. AddCallable("Fold1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1035. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1036. return ctx.ProgramBuilder.Fold1(list, [&](TRuntimeNode item) {
  1037. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1038. }, [&](TRuntimeNode item, TRuntimeNode state) {
  1039. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1040. });
  1041. });
  1042. AddCallable("Condense", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1043. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1044. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1045. return ctx.ProgramBuilder.Condense(stream, state,
  1046. [&](TRuntimeNode item, TRuntimeNode state) {
  1047. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1048. },
  1049. [&](TRuntimeNode item, TRuntimeNode state) {
  1050. return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
  1051. }, HasContextFuncs(*node.Child(3)));
  1052. });
  1053. AddCallable("Condense1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1054. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1055. return ctx.ProgramBuilder.Condense1(stream,
  1056. [&](TRuntimeNode item) {
  1057. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1058. },
  1059. [&](TRuntimeNode item, TRuntimeNode state) {
  1060. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1061. },
  1062. [&](TRuntimeNode item, TRuntimeNode state) {
  1063. return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
  1064. }, HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
  1065. });
  1066. AddCallable("Squeeze", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1067. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1068. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1069. return ctx.ProgramBuilder.Squeeze(stream, state, [&](TRuntimeNode item, TRuntimeNode state) {
  1070. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1071. }, node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1072. return MkqlBuildLambda(*node.Child(3), ctx, {state});
  1073. }, node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1074. return MkqlBuildLambda(*node.Child(4), ctx, {state});
  1075. });
  1076. });
  1077. AddCallable("Squeeze1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1078. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1079. return ctx.ProgramBuilder.Squeeze1(stream, [&](TRuntimeNode item) {
  1080. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1081. }, [&](TRuntimeNode item, TRuntimeNode state) {
  1082. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1083. }, node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1084. return MkqlBuildLambda(*node.Child(3), ctx, {state});
  1085. }, node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1086. return MkqlBuildLambda(*node.Child(4), ctx, {state});
  1087. });
  1088. });
  1089. AddCallable("Sort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1090. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1091. const auto ascending = MkqlBuildExpr(*node.Child(1), ctx);
  1092. return ctx.ProgramBuilder.Sort(list, ascending, [&](TRuntimeNode item) {
  1093. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1094. });
  1095. });
  1096. AddCallable({"Top", "TopSort"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1097. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1098. const auto count = MkqlBuildExpr(*node.Child(1), ctx);
  1099. const auto ascending = MkqlBuildExpr(*node.Child(2), ctx);
  1100. return (ctx.ProgramBuilder.*(node.Content() == "Top" ? &TProgramBuilder::Top : &TProgramBuilder::TopSort))
  1101. (list, count, ascending, [&](TRuntimeNode item) {
  1102. return MkqlBuildLambda(*node.Child(3), ctx, {item});
  1103. });
  1104. });
  1105. AddCallable("KeepTop", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1106. const auto count = MkqlBuildExpr(node.Head(), ctx);
  1107. const auto list = MkqlBuildExpr(*node.Child(1), ctx);
  1108. const auto item = MkqlBuildExpr(*node.Child(2), ctx);
  1109. const auto ascending = MkqlBuildExpr(*node.Child(3), ctx);
  1110. return ctx.ProgramBuilder.KeepTop(count, list, item, ascending, [&](TRuntimeNode item) {
  1111. return MkqlBuildLambda(*node.Child(4), ctx, {item});
  1112. });
  1113. });
  1114. AddCallable("Struct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1115. const auto structType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1116. const auto verifiedStructType = AS_TYPE(TStructType, structType);
  1117. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  1118. members.reserve(verifiedStructType->GetMembersCount());
  1119. node.ForEachChild([&](const TExprNode& child) {
  1120. members.emplace_back(child.Head().Content(), MkqlBuildExpr(child.Tail(), ctx));
  1121. });
  1122. return ctx.ProgramBuilder.NewStruct(verifiedStructType, members);
  1123. });
  1124. AddCallable("AddMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1125. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  1126. const auto memberName = node.Child(1)->Content();
  1127. const auto value = MkqlBuildExpr(node.Tail(), ctx);
  1128. return ctx.ProgramBuilder.AddMember(structObj, memberName, value);
  1129. });
  1130. AddCallable("List", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1131. const auto listType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1132. const auto itemType = AS_TYPE(TListType, listType)->GetItemType();
  1133. const auto& items = GetArgumentsFrom<1U>(node, ctx);
  1134. return ctx.ProgramBuilder.NewList(itemType, items);
  1135. });
  1136. AddCallable("FromString", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1137. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1138. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1139. return ctx.ProgramBuilder.FromString(arg, type);
  1140. });
  1141. AddCallable("StrictFromString", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1142. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1143. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1144. return ctx.ProgramBuilder.StrictFromString(arg, type);
  1145. });
  1146. AddCallable("FromBytes", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1147. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1148. const auto type = ctx.BuildType(node, *node.GetTypeAnn()->Cast<TOptionalExprType>()->GetItemType());
  1149. return ctx.ProgramBuilder.FromBytes(arg, type);
  1150. });
  1151. AddCallable("Convert", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1152. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1153. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1154. return ctx.ProgramBuilder.Convert(arg, type);
  1155. });
  1156. AddCallable("ToIntegral", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1157. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1158. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1159. return ctx.ProgramBuilder.ToIntegral(arg, type);
  1160. });
  1161. AddCallable("UnsafeTimestampCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1162. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1163. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1164. return ctx.ProgramBuilder.Convert(arg, type);
  1165. });
  1166. AddCallable("SafeCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1167. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1168. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1169. return ctx.ProgramBuilder.Cast(arg, type);
  1170. });
  1171. AddCallable("Default", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1172. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1173. return ctx.ProgramBuilder.Default(type);
  1174. });
  1175. AddCallable("Coalesce", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1176. auto ret = MkqlBuildExpr(node.Head(), ctx);
  1177. for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
  1178. auto value = MkqlBuildExpr(*node.Child(i), ctx);
  1179. ret = ctx.ProgramBuilder.Coalesce(ret, value);
  1180. }
  1181. return ret;
  1182. });
  1183. AddCallable("Unwrap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1184. const auto opt = MkqlBuildExpr(node.Head(), ctx);
  1185. const auto message = node.ChildrenSize() > 1 ? MkqlBuildExpr(node.Tail(), ctx) : ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  1186. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1187. return ctx.ProgramBuilder.Unwrap(opt, message, pos.File, pos.Row, pos.Column);
  1188. });
  1189. AddCallable("EmptyFrom", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1190. const auto type = ctx.BuildType(node.Head(), *node.GetTypeAnn());
  1191. switch (node.GetTypeAnn()->GetKind()) {
  1192. case ETypeAnnotationKind::Flow:
  1193. case ETypeAnnotationKind::Stream:
  1194. return ctx.ProgramBuilder.EmptyIterator(type);
  1195. case ETypeAnnotationKind::Optional:
  1196. return ctx.ProgramBuilder.NewEmptyOptional(type);
  1197. case ETypeAnnotationKind::List:
  1198. return ctx.ProgramBuilder.NewEmptyList(AS_TYPE(TListType, type)->GetItemType());
  1199. case ETypeAnnotationKind::Dict:
  1200. return ctx.ProgramBuilder.NewDict(type, {});
  1201. default:
  1202. ythrow TNodeException(node) << "Empty from " << *node.GetTypeAnn() << " isn't supported.";
  1203. }
  1204. });
  1205. AddCallable("Nothing", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1206. const auto optType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1207. return ctx.ProgramBuilder.NewEmptyOptional(optType);
  1208. });
  1209. AddCallable("Unpickle", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1210. const auto type = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1211. const auto serialized = MkqlBuildExpr(node.Tail(), ctx);
  1212. return ctx.ProgramBuilder.Unpickle(type, serialized);
  1213. });
  1214. AddCallable("Optional", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1215. const auto optType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1216. const auto arg = MkqlBuildExpr(node.Tail(), ctx);
  1217. return ctx.ProgramBuilder.NewOptional(optType, arg);
  1218. });
  1219. AddCallable("Iterator", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1220. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1221. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  1222. return ctx.ProgramBuilder.Iterator(arg, args);
  1223. });
  1224. AddCallable("EmptyIterator", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1225. const auto streamType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1226. return ctx.ProgramBuilder.EmptyIterator(streamType);
  1227. });
  // Switch(stream, memLimit, (indices) handler, (indices) handler, ...).
  // Child(1) is the memory limit in bytes; from child 2 on, children come in pairs of
  // an index list and the handler lambda that consumes those inputs.
  1228. AddCallable("Switch", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1229. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1230. std::vector<TSwitchInput> inputs;
  1231. ui64 memoryLimitBytes = FromString<ui64>(node.Child(1)->Content());
  // Running position of the next handler's outputs in the combined result variant.
  1232. ui32 offset = 0;
  1233. for (ui32 i = 2; i < node.ChildrenSize(); i += 2) {
  1234. TSwitchInput input;
  1235. for (auto& child : node.Child(i)->Children()) {
  1236. input.Indicies.push_back(FromString<ui32>(child->Content()));
  1237. }
  1238. const auto& lambda = *node.Child(i + 1);
  // First argument of the handler lambda (Head() is the lambda's argument list).
  1239. const auto& lambdaArg = lambda.Head().Head();
  // A handler yields one output stream unless its item type is a variant over a tuple,
  // in which case it yields one stream per tuple element.
  1240. auto outputStreams = 1;
  1241. const auto& streamItemType = GetSeqItemType(*lambda.Tail().GetTypeAnn());
  1242. if (streamItemType.GetKind() == ETypeAnnotationKind::Variant) {
  1243. outputStreams = streamItemType.Cast<TVariantExprType>()->GetUnderlyingType()->Cast<TTupleExprType>()->GetSize();
  1244. }
  // The offset is recorded only when there is more than one handler (more than 4 children)
  // or this handler does not produce exactly one stream; otherwise it stays unset.
  1245. if (node.ChildrenSize() > 4 || outputStreams != 1) {
  1246. input.ResultVariantOffset = offset;
  1247. }
  1248. offset += outputStreams;
  1249. input.InputType = ctx.BuildType(lambdaArg, *lambdaArg.GetTypeAnn());
  1250. inputs.emplace_back(input);
  1251. }
  1252. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  // Handler number `index` is the lambda at child 2 + 2 * index + 1 (after its index list).
  1253. return ctx.ProgramBuilder.Switch(stream, inputs, [&](ui32 index, TRuntimeNode item) -> TRuntimeNode {
  1254. return MkqlBuildLambda(*node.Child(2 + 2 * index + 1), ctx, {item});
  1255. }, memoryLimitBytes, returnType);
  1256. });
  1257. AddCallable("ToStream", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1258. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1259. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  1260. return ctx.ProgramBuilder.Iterator(ctx.ProgramBuilder.ToList(arg), args);
  1261. });
  1262. AddCallable(LeftName, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1263. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1264. return ctx.ProgramBuilder.Nth(arg, 0);
  1265. });
  1266. AddCallable(RightName, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1267. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1268. return ctx.ProgramBuilder.Nth(arg, 1);
  1269. });
  1270. AddCallable("FilterNullMembers", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1271. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1272. if (node.ChildrenSize() < 2U) {
  1273. return ctx.ProgramBuilder.FilterNullMembers(list);
  1274. } else {
  1275. std::vector<std::string_view> members;
  1276. members.reserve(node.Tail().ChildrenSize());
  1277. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Content()); });
  1278. return ctx.ProgramBuilder.FilterNullMembers(list, members);
  1279. }
  1280. });
  1281. AddCallable("SkipNullMembers", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1282. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1283. if (node.ChildrenSize() < 2U) {
  1284. return ctx.ProgramBuilder.SkipNullMembers(list);
  1285. } else {
  1286. std::vector<std::string_view> members;
  1287. members.reserve(node.Tail().ChildrenSize());
  1288. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Content()); });
  1289. return ctx.ProgramBuilder.SkipNullMembers(list, members);
  1290. }
  1291. });
  1292. AddCallable("FilterNullElements", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1293. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1294. if (node.ChildrenSize() < 2U) {
  1295. return ctx.ProgramBuilder.FilterNullElements(list);
  1296. } else {
  1297. std::vector<ui32> members;
  1298. members.reserve(node.Tail().ChildrenSize());
  1299. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(FromString<ui32>(child.Content())); });
  1300. return ctx.ProgramBuilder.FilterNullElements(list, members);
  1301. }
  1302. });
  1303. AddCallable("SkipNullElements", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1304. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1305. if (node.ChildrenSize() < 2U) {
  1306. return ctx.ProgramBuilder.SkipNullElements(list);
  1307. } else {
  1308. std::vector<ui32> members;
  1309. members.reserve(node.Tail().ChildrenSize());
  1310. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(FromString<ui32>(child.Content())); });
  1311. return ctx.ProgramBuilder.SkipNullElements(list, members);
  1312. }
  1313. });
  // MapJoinCore(list, dict, kind, leftKeys, <child 4, not read here>, leftRenames, rightRenames).
  // Names in the key/rename lists are resolved to field positions. Rename lists are flat
  // (source, output) pairs; the toggling flag `s` resolves odd entries against the input
  // (left) or dict payload (right) type and even entries against the output item type.
  1314. AddCallable("MapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1315. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1316. const auto dict = MkqlBuildExpr(*node.Child(1), ctx);
  1317. const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
  1318. const auto& outputItemType = GetSeqItemType(*node.GetTypeAnn());
  // Right-side row type is the dict payload; a List payload is unwrapped to its item type.
  1319. auto rightItemType = node.Child(1U)->GetTypeAnn()->Cast<TDictExprType>()->GetPayloadType();
  1320. if (ETypeAnnotationKind::List == rightItemType->GetKind()) {
  1321. rightItemType = rightItemType->Cast<TListExprType>()->GetItemType();
  1322. }
  1323. std::vector<ui32> leftKeyColumns, leftRenames, rightRenames;
  // Dispatch on the input item kind (Struct / Tuple / Multi); the output item is cast to the same kind.
  1324. switch (const auto& inputItemType = GetSeqItemType(*node.Head().GetTypeAnn()); inputItemType.GetKind()) {
  1325. case ETypeAnnotationKind::Struct: {
  1326. const auto inputStructType = inputItemType.Cast<TStructExprType>();
  1327. const auto outputStructType = outputItemType.Cast<TStructExprType>();
  1328. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  // `s` flips on every entry; after a pair-structured list it is back to false.
  1329. bool s = false;
  1330. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputStructType : *outputStructType, child.Content())); });
  1331. switch (rightItemType->GetKind()) {
  1332. case ETypeAnnotationKind::Struct: {
  1333. const auto rightStructType = rightItemType->Cast<TStructExprType>();
  1334. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1335. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightStructType : *outputStructType, child.Content())); });
  1336. }
  1337. break;
  // Mixed kinds cannot share one ternary type, so each side is resolved in its own branch.
  1338. case ETypeAnnotationKind::Tuple: {
  1339. const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
  1340. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1341. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightTupleType, child.Content()) : *GetFieldPosition(*outputStructType, child.Content())); });
  1342. }
  1343. break;
  // Any other payload kind: no right-side columns may be requested.
  1344. default:
  1345. MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
  1346. }
  1347. break;
  1348. }
  1349. case ETypeAnnotationKind::Tuple: {
  1350. const auto inputTupleType = inputItemType.Cast<TTupleExprType>();
  1351. const auto outputTupleType = outputItemType.Cast<TTupleExprType>();
  1352. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1353. bool s = false;
  1354. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputTupleType : *outputTupleType, child.Content())); });
  1355. switch (rightItemType->GetKind()) {
  1356. case ETypeAnnotationKind::Tuple: {
  1357. const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
  1358. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1359. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightTupleType : *outputTupleType, child.Content())); });
  1360. }
  1361. break;
  1362. case ETypeAnnotationKind::Struct: {
  1363. const auto rightStructType = rightItemType->Cast<TStructExprType>();
  1364. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1365. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightStructType, child.Content()) : *GetFieldPosition(*outputTupleType, child.Content())); });
  1366. }
  1367. break;
  1368. default:
  1369. MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
  1370. }
  1371. break;
  1372. }
  1373. case ETypeAnnotationKind::Multi: {
  1374. const auto inputMultiType = inputItemType.Cast<TMultiExprType>();
  1375. const auto outputMultiType = outputItemType.Cast<TMultiExprType>();
  1376. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1377. bool s = false;
  1378. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputMultiType : *outputMultiType, child.Content())); });
  // Right payloads are never Multi, so both supported right kinds use the split form.
  1379. switch (rightItemType->GetKind()) {
  1380. case ETypeAnnotationKind::Tuple: {
  1381. const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
  1382. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1383. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightTupleType, child.Content()) : *GetFieldPosition(*outputMultiType, child.Content())); });
  1384. }
  1385. break;
  1386. case ETypeAnnotationKind::Struct: {
  1387. const auto rightStructType = rightItemType->Cast<TStructExprType>();
  1388. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1389. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightStructType, child.Content()) : *GetFieldPosition(*outputMultiType, child.Content())); });
  1390. }
  1391. break;
  1392. default:
  1393. MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
  1394. }
  1395. break;
  1396. }
  1397. default:
  1398. ythrow TNodeException(node) << "Wrong MapJoinCore input item type: " << inputItemType;
  1399. }
  1400. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1401. return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType);
  1402. });
  1403. AddCallable("BlockMapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1404. const auto leftStream = MkqlBuildExpr(node.Head(), ctx);
  1405. const auto rightStream = MkqlBuildExpr(*node.Child(1), ctx);
  1406. const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
  1407. const auto leftItemType = node.Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
  1408. const auto rightItemType = node.Child(1U)->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
  1409. std::vector<ui32> leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops;
  1410. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); });
  1411. node.Child(4)->ForEachChild([&](const TExprNode& child){ leftKeyDrops.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); });
  1412. node.Child(5)->ForEachChild([&](const TExprNode& child){ rightKeyColumns.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); });
  1413. node.Child(6)->ForEachChild([&](const TExprNode& child){ rightKeyDrops.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); });
  1414. bool rightAny = HasSetting(node.Tail(), "rightAny");
  1415. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1416. return ctx.ProgramBuilder.BlockMapJoinCore(leftStream, rightStream, joinKind, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, rightAny, returnType);
  1417. });
  1418. AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1419. bool selfJoin = node.Content() == "GraceSelfJoinCore";
  1420. int shift = selfJoin ? 0 : 1;
  1421. const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx);
  1422. const auto flowRight = MkqlBuildExpr(*node.Child(shift), ctx);
  1423. const auto joinKind = GetJoinKind(node, node.Child(shift + 1)->Content());
  1424. const auto& outputItemType = GetSeqItemType(*node.GetTypeAnn());
  1425. std::vector<ui32> leftKeyColumns, rightKeyColumns, leftRenames, rightRenames;
  1426. const auto& leftItemType = GetSeqItemType(*node.Child(0)->GetTypeAnn());
  1427. const auto& rightItemType = GetSeqItemType(*node.Child(shift)->GetTypeAnn());
  1428. if (leftItemType.GetKind() != ETypeAnnotationKind::Multi ||
  1429. rightItemType.GetKind() != ETypeAnnotationKind::Multi ) {
  1430. ythrow TNodeException(node) << "Wrong GraceJoinCore input item type: " << leftItemType << " " << rightItemType;
  1431. }
  1432. if (outputItemType.GetKind() != ETypeAnnotationKind::Multi ) {
  1433. ythrow TNodeException(node) << "Wrong GraceJoinCore output item type: " << outputItemType;
  1434. }
  1435. const auto leftTupleType = leftItemType.Cast<TMultiExprType>();
  1436. const auto rightTupleType = rightItemType.Cast<TMultiExprType>();
  1437. const auto outputTupleType = outputItemType.Cast<TMultiExprType>();
  1438. node.Child(shift + 2)->ForEachChild([&](TExprNode& child){
  1439. leftKeyColumns.emplace_back(*GetFieldPosition(*leftTupleType, child.Content()));
  1440. });
  1441. node.Child(shift + 3)->ForEachChild([&](TExprNode& child){
  1442. rightKeyColumns.emplace_back(*GetFieldPosition(*rightTupleType, child.Content())); });
  1443. bool s = false;
  1444. node.Child(shift + 4)->ForEachChild([&](TExprNode& child){
  1445. leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *leftTupleType : *outputTupleType, child.Content())); });
  1446. s = false;
  1447. node.Child(shift + 5)->ForEachChild([&](TExprNode& child){
  1448. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightTupleType : *outputTupleType, child.Content())); });
  1449. auto anyJoinSettings = EAnyJoinSettings::None;
  1450. node.Tail().ForEachChild([&](const TExprNode& flag) {
  1451. if (flag.IsAtom("LeftAny"))
  1452. anyJoinSettings = EAnyJoinSettings::Right == anyJoinSettings ? EAnyJoinSettings::Both : EAnyJoinSettings::Left;
  1453. else if (flag.IsAtom("RightAny"))
  1454. anyJoinSettings = EAnyJoinSettings::Left == anyJoinSettings ? EAnyJoinSettings::Both : EAnyJoinSettings::Right;
  1455. });
  1456. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1457. return selfJoin
  1458. ? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
  1459. : ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
  1460. });
  // CommonJoinCore(input, kind, leftColumns, rightColumns, requiredColumns, keyColumns, settings, tableIndexField).
  // leftColumns / rightColumns are flattened (input position, output position) pairs.
  // For Struct items the output position is looked up by name in the output struct. For
  // Tuple / Multi items it is a running counter shared by the left and then the right columns.
  1461. AddCallable("CommonJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1462. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1463. const auto joinKind = GetJoinKind(node, node.Child(1)->Content());
  1464. std::vector<ui32> leftColumns, rightColumns, requiredColumns, keyColumns;
  // Assigned in every non-throwing branch of the switch below.
  1465. ui32 tableIndexFieldPos;
  1466. switch (const auto& inputItemType = GetSeqItemType(*node.Head().GetTypeAnn()); inputItemType.GetKind()) {
  1467. case ETypeAnnotationKind::Struct: {
  1468. const auto inputStructType = inputItemType.Cast<TStructExprType>();
  1469. const auto outputStructType = GetSeqItemType(*node.GetTypeAnn()).Cast<TStructExprType>();
  1470. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1471. leftColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content()));
  1472. leftColumns.emplace_back(*GetFieldPosition(*outputStructType, child.Content()));
  1473. });
  1474. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1475. rightColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content()));
  1476. rightColumns.emplace_back(*GetFieldPosition(*outputStructType, child.Content()));
  1477. });
  1478. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1479. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1480. tableIndexFieldPos = *GetFieldPosition(*inputStructType, node.Tail().Content());
  1481. break;
  1482. }
  1483. case ETypeAnnotationKind::Tuple: {
  1484. const auto inputTupleType = inputItemType.Cast<TTupleExprType>();
  // Output positions are assigned sequentially: left columns first, then right columns.
  1485. ui32 i = 0U;
  1486. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1487. leftColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content()));
  1488. leftColumns.emplace_back(i++);
  1489. });
  1490. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1491. rightColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content()));
  1492. rightColumns.emplace_back(i++);
  1493. });
  1494. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1495. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1496. tableIndexFieldPos = *GetFieldPosition(*inputTupleType, node.Tail().Content());
  1497. break;
  1498. }
  // Same encoding as the Tuple case, resolved against the wide (Multi) item type.
  1499. case ETypeAnnotationKind::Multi: {
  1500. const auto inputMultiType = inputItemType.Cast<TMultiExprType>();
  1501. ui32 i = 0U;
  1502. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1503. leftColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content()));
  1504. leftColumns.emplace_back(i++);
  1505. });
  1506. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1507. rightColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content()));
  1508. rightColumns.emplace_back(i++);
  1509. });
  1510. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1511. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1512. tableIndexFieldPos = *GetFieldPosition(*inputMultiType, node.Tail().Content());
  1513. break;
  1514. }
  1515. default:
  1516. ythrow TNodeException(node) << "Wrong CommonJoinCore input item type: " << inputItemType;
  1517. }
  // Settings (Child(6)): optional "memLimit" (ui64, default 0), "sorted" ("left" -> 0,
  // anything else -> 1) and "any" (list of sides, each "left" or otherwise right).
  1518. ui64 memLimit = 0U;
  1519. if (const auto memLimitSetting = GetSetting(*node.Child(6), "memLimit")) {
  1520. memLimit = FromString<ui64>(memLimitSetting->Child(1)->Content());
  1521. }
  1522. std::optional<ui32> sortedTableOrder;
  1523. if (const auto sortSetting = GetSetting(*node.Child(6), "sorted")) {
  1524. sortedTableOrder = sortSetting->Child(1)->Content() == "left" ? 0 : 1;
  1525. }
  1526. EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None;
  1527. if (const auto anyNode = GetSetting(*node.Child(6), "any")) {
  1528. for (auto sideNode : anyNode->Child(1)->Children()) {
  1529. YQL_ENSURE(sideNode->IsAtom());
  1530. AddAnyJoinSide(anyJoinSettings, sideNode->Content() == "left" ? EAnyJoinSettings::Left : EAnyJoinSettings::Right);
  1531. }
  1532. }
  1533. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  1534. return ctx.ProgramBuilder.CommonJoinCore(list, joinKind, leftColumns, rightColumns,
  1535. requiredColumns, keyColumns, memLimit, sortedTableOrder, anyJoinSettings, tableIndexFieldPos, returnType);
  1536. });
  1537. AddCallable("CombineCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1538. NNodes::TCoCombineCore core(&node);
  1539. const auto stream = MkqlBuildExpr(core.Input().Ref(), ctx);
  1540. const auto memLimit = NNodes::TCoCombineCore::idx_MemLimit < node.ChildrenSize() ?
  1541. FromString<ui64>(core.MemLimit().Cast().Value()) : 0;
  1542. const auto keyExtractor = [&](TRuntimeNode item) {
  1543. return MkqlBuildLambda(core.KeyExtractor().Ref(), ctx, {item});
  1544. };
  1545. const auto init = [&](TRuntimeNode key, TRuntimeNode item) {
  1546. return MkqlBuildLambda(core.InitHandler().Ref(), ctx, {key, item});
  1547. };
  1548. const auto update = [&](TRuntimeNode key, TRuntimeNode item, TRuntimeNode state) {
  1549. return MkqlBuildLambda(core.UpdateHandler().Ref(), ctx, {key, item, state});
  1550. };
  1551. const auto finish = [&](TRuntimeNode key, TRuntimeNode state) {
  1552. return MkqlBuildLambda(core.FinishHandler().Ref(), ctx, {key, state});
  1553. };
  1554. return ctx.ProgramBuilder.CombineCore(stream, keyExtractor, init, update, finish, memLimit);
  1555. });
  1556. AddCallable("GroupingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1557. NNodes::TCoGroupingCore core(&node);
  1558. const auto stream = MkqlBuildExpr(core.Input().Ref(), ctx);
  1559. const auto groupSwitch = [&](TRuntimeNode key, TRuntimeNode item) {
  1560. return MkqlBuildLambda(core.GroupSwitch().Ref(), ctx, {key, item});
  1561. };
  1562. const auto keyExtractor = [&](TRuntimeNode item) {
  1563. return MkqlBuildLambda(core.KeyExtractor().Ref(), ctx, {item});
  1564. };
  1565. TProgramBuilder::TUnaryLambda handler;
  1566. if (auto lambda = core.ConvertHandler()) {
  1567. handler = [&](TRuntimeNode item) {
  1568. return MkqlBuildLambda(core.ConvertHandler().Ref(), ctx, {item});
  1569. };
  1570. }
  1571. return ctx.ProgramBuilder.GroupingCore(stream, groupSwitch, keyExtractor, handler);
  1572. });
  1573. AddCallable("Chopper", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1574. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1575. const auto keyExtractor = [&](TRuntimeNode item) {
  1576. return MkqlBuildLambda(*node.Child(1U), ctx, {item});
  1577. };
  1578. const auto groupSwitch = [&](TRuntimeNode key, TRuntimeNode item) {
  1579. return MkqlBuildLambda(*node.Child(2U), ctx, {key, item});
  1580. };
  1581. const auto handler = [&](TRuntimeNode key, TRuntimeNode flow) {
  1582. return MkqlBuildLambda(node.Tail(), ctx, {key, flow});
  1583. };
  1584. return ctx.ProgramBuilder.Chopper(stream, keyExtractor, groupSwitch, handler);
  1585. });
  1586. AddCallable("HoppingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1587. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1588. const auto timeExtractor = [&](TRuntimeNode item) {
  1589. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1590. };
  1591. const auto hop = MkqlBuildExpr(*node.Child(2), ctx);
  1592. const auto interval = MkqlBuildExpr(*node.Child(3), ctx);
  1593. const auto delay = MkqlBuildExpr(*node.Child(4), ctx);
  1594. const auto init = [&](TRuntimeNode item) {
  1595. return MkqlBuildLambda(*node.Child(5), ctx, {item});
  1596. };
  1597. const auto update = [&](TRuntimeNode item, TRuntimeNode state) {
  1598. return MkqlBuildLambda(*node.Child(6), ctx, {item, state});
  1599. };
  1600. const auto save = node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1601. return MkqlBuildLambda(*node.Child(7), ctx, {state});
  1602. };
  1603. const auto load = node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1604. return MkqlBuildLambda(*node.Child(8), ctx, {state});
  1605. };
  1606. const auto merge = [&](TRuntimeNode state1, TRuntimeNode state2) {
  1607. return MkqlBuildLambda(*node.Child(9), ctx, {state1, state2});
  1608. };
  1609. const auto finish = [&](TRuntimeNode state, TRuntimeNode time) {
  1610. return MkqlBuildLambda(*node.Child(10), ctx, {state, time});
  1611. };
  1612. return ctx.ProgramBuilder.HoppingCore(
  1613. stream, timeExtractor, init, update, save, load, merge, finish, hop, interval, delay);
  1614. });
  1615. AddCallable("MultiHoppingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1616. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1617. const auto keyExtractor = [&](TRuntimeNode item) {
  1618. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1619. };
  1620. const auto timeExtractor = [&](TRuntimeNode item) {
  1621. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1622. };
  1623. const auto hop = MkqlBuildExpr(*node.Child(3), ctx);
  1624. const auto interval = MkqlBuildExpr(*node.Child(4), ctx);
  1625. const auto delay = MkqlBuildExpr(*node.Child(5), ctx);
  1626. const auto dataWatermarks = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(6), NUdf::EDataSlot::Bool));
  1627. const auto init = [&](TRuntimeNode item) {
  1628. return MkqlBuildLambda(*node.Child(7), ctx, {item});
  1629. };
  1630. const auto update = [&](TRuntimeNode item, TRuntimeNode state) {
  1631. return MkqlBuildLambda(*node.Child(8), ctx, {item, state});
  1632. };
  1633. const auto save = node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1634. return MkqlBuildLambda(*node.Child(9), ctx, {state});
  1635. };
  1636. const auto load = node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1637. return MkqlBuildLambda(*node.Child(10), ctx, {state});
  1638. };
  1639. const auto merge = [&](TRuntimeNode state1, TRuntimeNode state2) {
  1640. return MkqlBuildLambda(*node.Child(11), ctx, {state1, state2});
  1641. };
  1642. const auto finish = [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) {
  1643. return MkqlBuildLambda(*node.Child(12), ctx, {key, state, time});
  1644. };
  1645. const auto watermarksMode = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(13), NUdf::EDataSlot::Bool));
  1646. return ctx.ProgramBuilder.MultiHoppingCore(
  1647. stream, keyExtractor, timeExtractor, init, update, save, load, merge, finish,
  1648. hop, interval, delay, dataWatermarks, watermarksMode);
  1649. });
  1650. AddCallable("ToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1651. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1652. TMaybe<bool> isMany;
  1653. TMaybe<EDictType> type;
  1654. TMaybe<ui64> itemsCount;
  1655. bool isCompact;
  1656. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1657. ythrow TNodeException(node) << error->GetMessage();
  1658. }
  1659. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1660. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::ToHashedDict : &TProgramBuilder::ToSortedDict;
  1661. return (ctx.ProgramBuilder.*factory)(list, *isMany, [&](TRuntimeNode item) {
  1662. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1663. }, [&](TRuntimeNode item) {
  1664. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1665. }, isCompact, itemsCount.GetOrElse(0));
  1666. });
  1667. AddCallable("SqueezeToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1668. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1669. TMaybe<bool> isMany;
  1670. TMaybe<EDictType> type;
  1671. TMaybe<ui64> itemsCount;
  1672. bool isCompact;
  1673. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1674. ythrow TNodeException(node) << error->GetMessage();
  1675. }
  1676. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1677. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::SqueezeToHashedDict : &TProgramBuilder::SqueezeToSortedDict;
  1678. return (ctx.ProgramBuilder.*factory)(stream, *isMany, [&](TRuntimeNode item) {
  1679. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1680. }, [&](TRuntimeNode item) {
  1681. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1682. }, isCompact, itemsCount.GetOrElse(0));
  1683. });
  1684. AddCallable("NarrowSqueezeToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1685. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1686. TMaybe<bool> isMany;
  1687. TMaybe<EDictType> type;
  1688. TMaybe<ui64> itemsCount;
  1689. bool isCompact;
  1690. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1691. ythrow TNodeException(node) << error->GetMessage();
  1692. }
  1693. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1694. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::NarrowSqueezeToHashedDict : &TProgramBuilder::NarrowSqueezeToSortedDict;
  1695. return (ctx.ProgramBuilder.*factory)(stream, *isMany, [&](TRuntimeNode::TList items) {
  1696. return MkqlBuildLambda(*node.Child(1), ctx, items);
  1697. }, [&](TRuntimeNode::TList items) {
  1698. return MkqlBuildLambda(*node.Child(2), ctx, items);
  1699. }, isCompact, itemsCount.GetOrElse(0));
  1700. });
  1701. AddCallable("GroupByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1702. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1703. const auto dict = ctx.ProgramBuilder.ToHashedDict(list, true, [&](TRuntimeNode item) {
  1704. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1705. }, [&](TRuntimeNode item) {
  1706. return item;
  1707. });
  1708. const auto values = ctx.ProgramBuilder.DictItems(dict);
  1709. return ctx.ProgramBuilder.FlatMap(values, [&](TRuntimeNode item) {
  1710. const auto key = ctx.ProgramBuilder.Nth(item, 0);
  1711. const auto payloadList = ctx.ProgramBuilder.Nth(item, 1);
  1712. return MkqlBuildLambda(*node.Child(2), ctx, {key, payloadList});
  1713. });
  1714. });
  1715. AddCallable("PartitionByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1716. const NNodes::TCoPartitionByKey partition(&node);
  1717. const auto input = MkqlBuildExpr(partition.Input().Ref(), ctx);
  1718. const auto makePartitions = [&](TRuntimeNode list) {
  1719. return ctx.ProgramBuilder.Map(
  1720. ctx.ProgramBuilder.DictItems(ctx.ProgramBuilder.ToHashedDict(list, true,
  1721. [&](TRuntimeNode item) { return MkqlBuildLambda(partition.KeySelectorLambda().Ref(), ctx, {item}); },
  1722. [&](TRuntimeNode item) { return item; }
  1723. )),
  1724. [&](TRuntimeNode pair) {
  1725. const auto payload = partition.SortDirections().Ref().IsCallable("Void") ?
  1726. ctx.ProgramBuilder.Nth(pair, 1):
  1727. ctx.ProgramBuilder.Sort(ctx.ProgramBuilder.Nth(pair, 1), MkqlBuildExpr(partition.SortDirections().Ref(), ctx),
  1728. [&](TRuntimeNode item) {
  1729. return MkqlBuildLambda(partition.SortKeySelectorLambda().Ref(), ctx, {item});
  1730. }
  1731. );
  1732. return ctx.ProgramBuilder.NewTuple({ctx.ProgramBuilder.Nth(pair, 0), ctx.ProgramBuilder.Iterator(payload, {list})});
  1733. }
  1734. );
  1735. };
  1736. switch (const auto kind = partition.Ref().GetTypeAnn()->GetKind()) {
  1737. case ETypeAnnotationKind::Flow:
  1738. case ETypeAnnotationKind::Stream: {
  1739. const auto sorted = ctx.ProgramBuilder.FlatMap(
  1740. ctx.ProgramBuilder.Condense1(input,
  1741. [&](TRuntimeNode item) { return ctx.ProgramBuilder.AsList(item); },
  1742. [&](TRuntimeNode, TRuntimeNode) { return ctx.ProgramBuilder.NewDataLiteral(false); },
  1743. [&](TRuntimeNode item, TRuntimeNode state) { return ctx.ProgramBuilder.Append(state, item); }
  1744. ),
  1745. makePartitions
  1746. );
  1747. return ETypeAnnotationKind::Stream == kind ?MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {sorted}):
  1748. ctx.ProgramBuilder.ToFlow(MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {ctx.ProgramBuilder.FromFlow(sorted)}));
  1749. }
  1750. case ETypeAnnotationKind::List: {
  1751. const auto sorted = ctx.ProgramBuilder.Iterator(makePartitions(input), {});
  1752. return ctx.ProgramBuilder.Collect(MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {sorted}));
  1753. }
  1754. default: break;
  1755. }
  1756. Y_ABORT("Wrong case.");
  1757. });
  1758. AddCallable("CombineByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1759. return CombineByKeyImpl(node, ctx);
  1760. });
  1761. AddCallable("Enumerate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1762. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1763. TRuntimeNode start;
  1764. if (node.ChildrenSize() > 1) {
  1765. start = MkqlBuildExpr(*node.Child(1), ctx);
  1766. } else {
  1767. start = ctx.ProgramBuilder.NewDataLiteral<ui64>(0);
  1768. }
  1769. TRuntimeNode step;
  1770. if (node.ChildrenSize() > 2) {
  1771. step = MkqlBuildExpr(node.Tail(), ctx);
  1772. } else {
  1773. step = ctx.ProgramBuilder.NewDataLiteral<ui64>(1);
  1774. }
  1775. return ctx.ProgramBuilder.Enumerate(arg, start, step);
  1776. });
  1777. AddCallable("Dict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1778. const auto listType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1779. const auto dictType = AS_TYPE(TDictType, listType);
  1780. std::vector<std::pair<TRuntimeNode, TRuntimeNode>> items;
  1781. for (size_t i = 1; i < node.ChildrenSize(); ++i) {
  1782. const auto key = MkqlBuildExpr(node.Child(i)->Head(), ctx);
  1783. const auto payload = MkqlBuildExpr(node.Child(i)->Tail(), ctx);
  1784. items.emplace_back(key, payload);
  1785. }
  1786. return ctx.ProgramBuilder.NewDict(dictType, items);
  1787. });
  1788. AddCallable("Variant", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1789. const auto varType = node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TVariantExprType>();
  1790. const auto type = ctx.BuildType(*node.Child(2), *varType);
  1791. const auto item = MkqlBuildExpr(node.Head(), ctx);
  1792. return varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple ?
  1793. ctx.ProgramBuilder.NewVariant(item, FromString<ui32>(node.Child(1)->Content()), type) :
  1794. ctx.ProgramBuilder.NewVariant(item, node.Child(1)->Content(), type);
  1795. });
  1796. AddCallable("DynamicVariant", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1797. const auto varType = ctx.BuildType(*node.Child(2), *node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType());
  1798. const auto item = MkqlBuildExpr(node.Head(), ctx);
  1799. const auto index = MkqlBuildExpr(*node.Child(1), ctx);
  1800. return ctx.ProgramBuilder.DynamicVariant(item, index, varType);
  1801. });
  1802. AddCallable("AsStruct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1803. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  1804. members.reserve(node.ChildrenSize());
  1805. node.ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Head().Content(), MkqlBuildExpr(child.Tail(), ctx)); });
  1806. return ctx.ProgramBuilder.NewStruct(members);
  1807. });
  1808. AddCallable("AsDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1809. std::vector<std::pair<TRuntimeNode, TRuntimeNode>> items;
  1810. items.reserve(node.ChildrenSize());
  1811. node.ForEachChild([&](const TExprNode& child){ items.emplace_back(MkqlBuildExpr(*child.Child(0), ctx), MkqlBuildExpr(*child.Child(1), ctx)); });
  1812. const auto dictType = ctx.ProgramBuilder.NewDictType(items[0].first.GetStaticType(), items[0].second.GetStaticType(), false);
  1813. return ctx.ProgramBuilder.NewDict(dictType, items);
  1814. });
  1815. AddCallable("Ensure", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1816. const auto value = MkqlBuildExpr(node.Head(), ctx);
  1817. const auto predicate = MkqlBuildExpr(*node.Child(1), ctx);
  1818. const auto message = node.ChildrenSize() > 2 ? MkqlBuildExpr(node.Tail(), ctx) : ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  1819. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1820. return ctx.ProgramBuilder.Ensure(value, predicate, message, pos.File, pos.Row, pos.Column);
  1821. });
  1822. AddCallable("Replicate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1823. const auto value = MkqlBuildExpr(node.Head(), ctx);
  1824. const auto count = MkqlBuildExpr(*node.Child(1), ctx);
  1825. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1826. return ctx.ProgramBuilder.Replicate(value, count, pos.File, pos.Row, pos.Column);
  1827. });
  1828. AddCallable("IfPresent", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1829. TRuntimeNode::TList optionals;
  1830. const auto width = node.ChildrenSize() - 2U;
  1831. optionals.reserve(width);
  1832. auto i = 0U;
  1833. std::generate_n(std::back_inserter(optionals), width, [&](){ return MkqlBuildExpr(*node.Child(i++), ctx); });
  1834. const auto elseBranch = MkqlBuildExpr(node.Tail(), ctx);
  1835. return ctx.ProgramBuilder.IfPresent(optionals, [&](TRuntimeNode::TList items) {
  1836. return MkqlBuildLambda(*node.Child(width), ctx, items);
  1837. }, elseBranch);
  1838. });
  1839. AddCallable({"DataType",
  1840. "ListType",
  1841. "OptionalType",
  1842. "TupleType",
  1843. "StructType",
  1844. "DictType",
  1845. "VoidType",
  1846. "NullType",
  1847. "CallableType",
  1848. "UnitType",
  1849. "GenericType",
  1850. "ResourceType",
  1851. "TaggedType",
  1852. "VariantType",
  1853. "StreamType",
  1854. "FlowType",
  1855. "EmptyListType",
  1856. "EmptyDictType"},
  1857. [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1858. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1859. return TRuntimeNode(type, true);
  1860. });
  1861. AddCallable("ParseType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1862. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1863. return TRuntimeNode(type, true);
  1864. });
  1865. AddCallable("TypeOf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1866. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1867. return TRuntimeNode(type, true);
  1868. });
  1869. AddCallable("EmptyList", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1870. Y_UNUSED(node);
  1871. if (RuntimeVersion < 11) {
  1872. return ctx.ProgramBuilder.NewEmptyList(ctx.ProgramBuilder.NewVoid().GetStaticType());
  1873. } else {
  1874. return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyListLazy(), true);
  1875. }
  1876. });
  1877. AddCallable("EmptyDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1878. Y_UNUSED(node);
  1879. if (RuntimeVersion < 11) {
  1880. auto voidType = ctx.ProgramBuilder.NewVoid().GetStaticType();
  1881. auto dictType = ctx.ProgramBuilder.NewDictType(voidType, voidType, false);
  1882. return ctx.ProgramBuilder.NewDict(dictType, {});
  1883. } else {
  1884. return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyDictLazy(), true);
  1885. }
  1886. });
  1887. AddCallable("SourceOf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1888. const auto type = ctx.BuildType(node, *node.GetTypeAnn());
  1889. return ctx.ProgramBuilder.SourceOf(type);
  1890. });
  1891. AddCallable("TypeHandle", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1892. const auto type = node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  1893. const auto yson = WriteTypeToYson(type);
  1894. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1895. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1896. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(yson));
  1897. return TRuntimeNode(call.Build(), false);
  1898. });
  1899. AddCallable("ReprCode", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1900. const auto type = node.Head().GetTypeAnn();
  1901. const auto yson = WriteTypeToYson(type);
  1902. const auto& args = GetAllArguments(node, ctx);
  1903. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1904. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1905. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1906. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1907. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1908. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1909. for (auto arg : args) {
  1910. call.Add(arg);
  1911. }
  1912. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(yson));
  1913. return TRuntimeNode(call.Build(), false);
  1914. });
  1915. // safe and position unaware
  1916. AddCallable({
  1917. "SerializeTypeHandle",
  1918. "TypeKind",
  1919. "FormatCode",
  1920. "FormatCodeWithPositions",
  1921. "SerializeCode",
  1922. }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1923. const auto& args = GetAllArguments(node, ctx);
  1924. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1925. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1926. for (auto arg : args) {
  1927. call.Add(arg);
  1928. }
  1929. return TRuntimeNode(call.Build(), false);
  1930. });
  1931. // with position
  1932. AddCallable({
  1933. "ParseTypeHandle",
  1934. "DataTypeComponents",
  1935. "DataTypeHandle",
  1936. "OptionalItemType",
  1937. "OptionalTypeHandle",
  1938. "ListItemType",
  1939. "ListTypeHandle",
  1940. "StreamItemType",
  1941. "StreamTypeHandle",
  1942. "TupleTypeComponents",
  1943. "TupleTypeHandle",
  1944. "StructTypeComponents",
  1945. "StructTypeHandle",
  1946. "DictTypeComponents",
  1947. "DictTypeHandle",
  1948. "ResourceTypeTag",
  1949. "ResourceTypeHandle",
  1950. "TaggedTypeComponents",
  1951. "TaggedTypeHandle",
  1952. "VariantUnderlyingType",
  1953. "VariantTypeHandle",
  1954. "VoidTypeHandle",
  1955. "NullTypeHandle",
  1956. "EmptyListTypeHandle",
  1957. "EmptyDictTypeHandle",
  1958. "CallableTypeComponents",
  1959. "CallableArgument",
  1960. "CallableTypeHandle",
  1961. "PgTypeName",
  1962. "PgTypeHandle",
  1963. "WorldCode",
  1964. "AtomCode",
  1965. "ListCode",
  1966. "FuncCode",
  1967. }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1968. const auto& args = GetAllArguments(node, ctx);
  1969. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1970. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1971. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1972. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1973. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1974. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1975. for (auto arg : args) {
  1976. call.Add(arg);
  1977. }
  1978. return TRuntimeNode(call.Build(), false);
  1979. });
  1980. AddCallable("LambdaCode", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1981. const auto lambda = node.Child(node.ChildrenSize() - 1);
  1982. const auto retType = ctx.BuildType(node, *node.GetTypeAnn());
  1983. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1984. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1985. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1986. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1987. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1988. if (node.ChildrenSize() == 2) {
  1989. auto count = MkqlBuildExpr(node.Head(), ctx);
  1990. call.Add(count);
  1991. } else {
  1992. call.Add(ctx.ProgramBuilder.NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
  1993. }
  1994. TRuntimeNode body;
  1995. {
  1996. TMkqlBuildContext::TArgumentsMap innerArguments;
  1997. innerArguments.reserve(lambda->Head().ChildrenSize());
  1998. lambda->Head().ForEachChild([&](const TExprNode& argNode) {
  1999. const auto argType = ctx.BuildType(argNode, *argNode.GetTypeAnn());
  2000. const auto arg = ctx.ProgramBuilder.Arg(argType);
  2001. innerArguments.emplace(&argNode, arg);
  2002. call.Add(arg);
  2003. });
  2004. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda->UniqueId());
  2005. body = MkqlBuildExpr(*lambda->Child(1), innerCtx);
  2006. }
  2007. call.Add(body);
  2008. return TRuntimeNode(call.Build(), false);
  2009. });
  2010. AddCallable("FormatType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2011. TRuntimeNode str;
  2012. if (node.Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource) {
  2013. auto handle = MkqlBuildExpr(node.Head(), ctx);
  2014. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2015. call.Add(handle);
  2016. str = TRuntimeNode(call.Build(), false);
  2017. } else {
  2018. str = ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(FormatType(node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType()));
  2019. }
  2020. return str;
  2021. });
  2022. AddCallable("FormatTypeDiff", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2023. if (node.Child(0)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource) { // if we got resource + resource
  2024. YQL_ENSURE(node.Child(1)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource);
  2025. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2026. call.Add(MkqlBuildExpr(*node.Child(0), ctx));
  2027. call.Add(MkqlBuildExpr(*node.Child(1), ctx));
  2028. call.Add(ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(2), NUdf::EDataSlot::Bool)));
  2029. return TRuntimeNode(call.Build(), false);
  2030. } else { // if we got type + type
  2031. bool pretty = FromString<bool>(*node.Child(2), NUdf::EDataSlot::Bool);
  2032. const auto type_left = node.Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  2033. const auto type_right = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  2034. return pretty ? ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(NYql::GetTypePrettyDiff(*type_left, *type_right)) :
  2035. ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(NYql::GetTypeDiff(*type_left, *type_right));
  2036. }
  2037. });
  2038. AddCallable("Void", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2039. return ctx.ProgramBuilder.NewVoid();
  2040. });
  2041. AddCallable("Null", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2042. return ctx.ProgramBuilder.NewNull();
  2043. });
  2044. AddCallable({ "AsTagged","Untag" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2045. auto input = MkqlBuildExpr(node.Head(), ctx);
  2046. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2047. return ctx.ProgramBuilder.Nop(input, returnType);
  2048. });
  2049. AddCallable({"TableSource", "WideTableSource"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2050. return MkqlBuildExpr(node.Head(), ctx);
  2051. });
  2052. AddCallable({"WithWorld"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2053. return MkqlBuildExpr(node.Head(), ctx);
  2054. });
  2055. AddCallable("Error", [](const TExprNode& node, TMkqlBuildContext& ctx)->NKikimr::NMiniKQL::TRuntimeNode {
  2056. const auto err = node.GetTypeAnn()->Cast<TErrorExprType>()->GetError();
  2057. ythrow TNodeException(ctx.ExprCtx.AppendPosition(err.Position)) << err.GetMessage();
  2058. });
  2059. AddCallable("ErrorType", [](const TExprNode& node, TMkqlBuildContext& ctx)->NKikimr::NMiniKQL::TRuntimeNode {
  2060. const auto err = node.GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TErrorExprType>()->GetError();
  2061. ythrow TNodeException(ctx.ExprCtx.AppendPosition(err.Position)) << err.GetMessage();
  2062. });
  2063. AddCallable("Join", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2064. const auto list1 = MkqlBuildExpr(node.Head(), ctx);
  2065. const auto list2 = MkqlBuildExpr(*node.Child(1), ctx);
  2066. const auto dict1 = ctx.ProgramBuilder.ToHashedDict(list1, true, [&](TRuntimeNode item) {
  2067. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  2068. }, [&](TRuntimeNode item) {
  2069. return item;
  2070. });
  2071. const auto dict2 = ctx.ProgramBuilder.ToHashedDict(list2, true, [&](TRuntimeNode item) {
  2072. return MkqlBuildLambda(*node.Child(3), ctx, {item});
  2073. }, [&](TRuntimeNode item) {
  2074. return item;
  2075. });
  2076. const auto joinKind = GetJoinKind(node, node.Child(4)->Content());
  2077. return ctx.ProgramBuilder.JoinDict(dict1, true, dict2, true, joinKind);
  2078. });
  2079. AddCallable("JoinDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2080. const auto dict1 = MkqlBuildExpr(*node.Child(0), ctx);
  2081. const auto dict2 = MkqlBuildExpr(*node.Child(1), ctx);
  2082. const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
  2083. bool multi1 = true, multi2 = true;
  2084. if (node.ChildrenSize() > 3) {
  2085. node.Tail().ForEachChild([&](const TExprNode& flag){
  2086. if (const auto& content = flag.Content(); content == "LeftUnique")
  2087. multi1 = false;
  2088. else if ( content == "RightUnique")
  2089. multi2 = false;
  2090. });
  2091. }
  2092. return ctx.ProgramBuilder.JoinDict(dict1, multi1, dict2, multi2, joinKind);
  2093. });
  2094. AddCallable({"FilePath", "FileContent", "FolderPath"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2095. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2096. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content()));
  2097. return TRuntimeNode(call.Build(), false);
  2098. });
  2099. AddCallable("TablePath", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2100. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  2101. });
  2102. AddCallable("TableRecord", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2103. return ctx.ProgramBuilder.NewDataLiteral<ui64>(0);
  2104. });
  2105. AddCallable("Udf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2106. YQL_ENSURE(node.ChildrenSize() == 8);
  2107. std::string_view function = node.Head().Content();
  2108. const auto runConfig = MkqlBuildExpr(*node.Child(1), ctx);
  2109. const auto userType = ctx.BuildType(*node.Child(2), *node.Child(2)->GetTypeAnn());
  2110. const auto typeConfig = node.Child(3)->Content();
  2111. const auto callableType = ctx.BuildType(node, *node.GetTypeAnn());
  2112. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2113. return ctx.ProgramBuilder.TypedUdf(function, callableType, runConfig, userType, typeConfig,
  2114. pos.File, pos.Row, pos.Column);
  2115. });
  2116. AddCallable("ScriptUdf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2117. EScriptType scriptType = ScriptTypeFromStr(node.Head().Content());
  2118. if (scriptType == EScriptType::Unknown) {
  2119. ythrow TNodeException(node.Head())
  2120. << "Unknown script type '"
  2121. << node.Head().Content() << '\'';
  2122. }
  2123. std::string_view funcName = node.Child(1)->Content();
  2124. const auto typeNode = node.Child(2);
  2125. const auto funcType = ctx.BuildType(*typeNode, *typeNode->GetTypeAnn());
  2126. const auto script = MkqlBuildExpr(*node.Child(3), ctx);
  2127. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2128. return ctx.ProgramBuilder.ScriptUdf(node.Head().Content(), funcName, funcType, script,
  2129. pos.File, pos.Row, pos.Column);
  2130. });
  2131. AddCallable("Apply", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2132. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2133. const auto callable = MkqlBuildExpr(node.Head(), ctx);
  2134. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  2135. return ctx.ProgramBuilder.Apply(callable, args, pos.File, pos.Row, pos.Column);
  2136. });
  2137. AddCallable("NamedApply", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2138. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2139. const auto callable = MkqlBuildExpr(node.Head(), ctx);
  2140. const auto positionalArgs = MkqlBuildExpr(*node.Child(1), ctx);
  2141. const auto namedArgs = MkqlBuildExpr(*node.Child(2), ctx);
  2142. const auto dependentNodes = node.ChildrenSize() - 3;
  2143. const auto callableType = node.Head().GetTypeAnn()->Cast<TCallableExprType>();
  2144. const auto tupleType = node.Child(1)->GetTypeAnn()->Cast<TTupleExprType>();
  2145. const auto structType = node.Child(2)->GetTypeAnn()->Cast<TStructExprType>();
  2146. std::vector<TRuntimeNode> args(callableType->GetArgumentsSize() + dependentNodes);
  2147. for (size_t i = 0; i < tupleType->GetSize(); ++i) {
  2148. args[i] = node.Child(1)->IsList() ?
  2149. MkqlBuildExpr(*node.Child(1)->Child(i), ctx):
  2150. ctx.ProgramBuilder.Nth(positionalArgs, i);
  2151. }
  2152. for (size_t i = 0; i < structType->GetSize(); ++i) {
  2153. auto memberName = structType->GetItems()[i]->GetName();
  2154. auto index = callableType->ArgumentIndexByName(memberName);
  2155. if (!index || *index < tupleType->GetSize()) {
  2156. ythrow TNodeException(node.Child(2)) << "Wrong named argument: " << memberName;
  2157. }
  2158. TRuntimeNode arg;
  2159. if (node.Child(2)->IsCallable("AsStruct")) {
  2160. for (auto& child : node.Child(2)->Children()) {
  2161. if (child->Head().Content() == memberName) {
  2162. arg = MkqlBuildExpr(child->Tail(), ctx);
  2163. break;
  2164. }
  2165. }
  2166. if (!arg.GetNode()) {
  2167. ythrow TNodeException(node.Child(2)) << "Missing argument: " << memberName;
  2168. }
  2169. }
  2170. else {
  2171. arg = ctx.ProgramBuilder.Member(namedArgs, memberName);
  2172. }
  2173. args[*index] = arg;
  2174. }
  2175. for (ui32 i = tupleType->GetSize(); i < callableType->GetArgumentsSize(); ++i) {
  2176. auto& arg = args[i];
  2177. if (arg.GetNode()) {
  2178. continue;
  2179. }
  2180. auto mkqlType = ctx.BuildType(node, *callableType->GetArguments()[i].Type);
  2181. arg = ctx.ProgramBuilder.NewEmptyOptional(mkqlType);
  2182. }
  2183. for (ui32 i = 0; i < dependentNodes; ++i) {
  2184. args[callableType->GetArgumentsSize() + i] = MkqlBuildExpr(*node.Child(3 + i), ctx);
  2185. }
  2186. return ctx.ProgramBuilder.Apply(callable, args, pos.File, pos.Row, pos.Column, dependentNodes);
  2187. });
  2188. AddCallable("Callable", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2189. const auto callableType = ctx.BuildType(node.Head(), *node.Head().GetTypeAnn());
  2190. return ctx.ProgramBuilder.Callable(callableType, [&](const TArrayRef<const TRuntimeNode>& args) {
  2191. const auto& lambda = node.Tail();
  2192. TMkqlBuildContext::TArgumentsMap innerArguments;
  2193. innerArguments.reserve(lambda.Head().ChildrenSize());
  2194. MKQL_ENSURE(args.size() == lambda.Head().ChildrenSize(), "Mismatch of lambda arguments count");
  2195. auto it = args.cbegin();
  2196. lambda.Head().ForEachChild([&](const TExprNode& arg){ innerArguments.emplace(&arg, *it++); });
  2197. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2198. return MkqlBuildExpr(lambda.Tail(), innerCtx);
  2199. });
  2200. });
  2201. AddCallable("PgConst", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2202. auto type = AS_TYPE(TPgType, ctx.BuildType(node, *node.GetTypeAnn()));
  2203. TRuntimeNode typeMod;
  2204. if (node.ChildrenSize() >= 3) {
  2205. typeMod = MkqlBuildExpr(*node.Child(2), ctx);
  2206. }
  2207. auto typeMod1 = typeMod;
  2208. if (node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "interval" &&
  2209. node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "_interval") {
  2210. typeMod1 = TRuntimeNode();
  2211. }
  2212. auto ret = ctx.ProgramBuilder.PgConst(type, node.Head().Content(), typeMod1);
  2213. if (node.ChildrenSize() >= 3) {
  2214. return ctx.ProgramBuilder.PgCast(ret, type, typeMod);
  2215. } else {
  2216. return ret;
  2217. }
  2218. });
  2219. AddCallable("PgInternal0", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2220. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2221. return ctx.ProgramBuilder.PgInternal0(returnType);
  2222. });
  2223. AddCallable({"PgResolvedCall","PgResolvedCallCtx" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2224. auto name = node.Head().Content();
  2225. auto id = FromString<ui32>(node.Child(1)->Content());
  2226. std::vector<TRuntimeNode> args;
  2227. args.reserve(node.ChildrenSize() - 3);
  2228. for (ui32 i = 3; i < node.ChildrenSize(); ++i) {
  2229. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2230. }
  2231. bool rangeFunction = false;
  2232. for (const auto& child : node.Child(2)->Children()) {
  2233. if (child->Head().Content() == "range") {
  2234. rangeFunction = true;
  2235. }
  2236. }
  2237. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2238. return ctx.ProgramBuilder.PgResolvedCall(node.IsCallable("PgResolvedCallCtx"), name, id, args, returnType, rangeFunction);
  2239. });
  2240. AddCallable("PgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2241. auto operId = FromString<ui32>(node.Child(1)->Content());
  2242. auto procId = NPg::LookupOper(operId).ProcId;
  2243. auto procName = NPg::LookupProc(procId).Name;
  2244. std::vector<TRuntimeNode> args;
  2245. args.reserve(node.ChildrenSize() - 2);
  2246. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2247. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2248. }
  2249. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2250. return ctx.ProgramBuilder.PgResolvedCall(false, procName, procId, args, returnType, false);
  2251. });
  2252. AddCallable("BlockPgResolvedCall", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2253. auto name = node.Head().Content();
  2254. auto id = FromString<ui32>(node.Child(1)->Content());
  2255. std::vector<TRuntimeNode> args;
  2256. args.reserve(node.ChildrenSize() - 3);
  2257. for (ui32 i = 3; i < node.ChildrenSize(); ++i) {
  2258. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2259. }
  2260. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2261. return ctx.ProgramBuilder.BlockPgResolvedCall(name, id, args, returnType);
  2262. });
  2263. AddCallable("BlockPgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2264. auto operId = FromString<ui32>(node.Child(1)->Content());
  2265. auto procId = NPg::LookupOper(operId).ProcId;
  2266. auto procName = NPg::LookupProc(procId).Name;
  2267. std::vector<TRuntimeNode> args;
  2268. args.reserve(node.ChildrenSize() - 2);
  2269. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2270. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2271. }
  2272. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2273. return ctx.ProgramBuilder.BlockPgResolvedCall(procName, procId, args, returnType);
  2274. });
  2275. AddCallable("PgCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2276. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2277. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2278. TRuntimeNode typeMod;
  2279. if (node.ChildrenSize() >= 3) {
  2280. typeMod = MkqlBuildExpr(*node.Child(2), ctx);
  2281. }
  2282. auto typeMod1 = typeMod;
  2283. if (node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "interval" &&
  2284. node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "_interval") {
  2285. typeMod1 = TRuntimeNode();
  2286. }
  2287. if (node.Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Null) {
  2288. auto sourceTypeId = node.Head().GetTypeAnn()->Cast<TPgExprType>()->GetId();
  2289. auto targetTypeId = node.GetTypeAnn()->Cast<TPgExprType>()->GetId();
  2290. const auto& sourceTypeDesc = NPg::LookupType(sourceTypeId);
  2291. const auto& targetTypeDesc = NPg::LookupType(targetTypeId);
  2292. const bool isSourceArray = sourceTypeDesc.TypeId == sourceTypeDesc.ArrayTypeId;
  2293. const bool isTargetArray = targetTypeDesc.TypeId == targetTypeDesc.ArrayTypeId;
  2294. if (isSourceArray == isTargetArray && NPg::HasCast(
  2295. isSourceArray ? sourceTypeDesc.ElementTypeId : sourceTypeId,
  2296. isTargetArray ? targetTypeDesc.ElementTypeId : targetTypeId)) {
  2297. typeMod1 = typeMod;
  2298. }
  2299. }
  2300. auto cast = ctx.ProgramBuilder.PgCast(input, returnType, typeMod1);
  2301. if (node.ChildrenSize() >= 3) {
  2302. return ctx.ProgramBuilder.PgCast(cast, returnType, typeMod);
  2303. } else {
  2304. return cast;
  2305. }
  2306. });
  2307. AddCallable("FromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2308. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2309. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2310. return ctx.ProgramBuilder.FromPg(input, returnType);
  2311. });
  2312. AddCallable("ToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2313. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2314. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2315. return ctx.ProgramBuilder.ToPg(input, returnType);
  2316. });
  2317. AddCallable("BlockFromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2318. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2319. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2320. return ctx.ProgramBuilder.BlockFromPg(input, returnType);
  2321. });
  2322. AddCallable("BlockToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2323. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2324. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2325. return ctx.ProgramBuilder.BlockToPg(input, returnType);
  2326. });
  2327. AddCallable("PgClone", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2328. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2329. if (IsNull(node.Head())) {
  2330. return input;
  2331. }
  2332. if (NPg::LookupType(node.GetTypeAnn()->Cast<TPgExprType>()->GetId()).PassByValue) {
  2333. return input;
  2334. }
  2335. TVector<TRuntimeNode> dependentNodes;
  2336. for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
  2337. dependentNodes.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2338. }
  2339. return ctx.ProgramBuilder.PgClone(input, dependentNodes);
  2340. });
  2341. AddCallable("PgTableContent", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2342. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2343. return ctx.ProgramBuilder.PgTableContent(
  2344. node.Child(0)->Content(),
  2345. node.Child(1)->Content(),
  2346. returnType);
  2347. });
  2348. AddCallable("PgToRecord", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2349. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2350. TVector<std::pair<std::string_view, std::string_view>> members;
  2351. for (auto child : node.Child(1)->Children()) {
  2352. members.push_back({child->Head().Content(), child->Tail().Content()});
  2353. }
  2354. return ctx.ProgramBuilder.PgToRecord(input, members);
  2355. });
  2356. AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2357. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2358. return ctx.ProgramBuilder.WithContext(input, node.Child(1)->Content());
  2359. });
  2360. AddCallable("BlockFunc", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2361. TVector<TRuntimeNode> args;
  2362. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2363. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2364. }
  2365. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2366. return ctx.ProgramBuilder.BlockFunc(node.Child(0)->Content(), returnType, args);
  2367. });
  2368. AddCallable("BlockMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2369. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  2370. const auto name = node.Tail().Content();
  2371. return ctx.ProgramBuilder.BlockMember(structObj, name);
  2372. });
  2373. AddCallable("BlockNth", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2374. const auto tupleObj = MkqlBuildExpr(node.Head(), ctx);
  2375. const auto index = FromString<ui32>(node.Tail().Content());
  2376. return ctx.ProgramBuilder.BlockNth(tupleObj, index);
  2377. });
  2378. AddCallable("BlockAsStruct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2379. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  2380. for (const auto& x : node.Children()) {
  2381. members.emplace_back(x->Head().Content(), MkqlBuildExpr(x->Tail(), ctx));
  2382. }
  2383. return ctx.ProgramBuilder.BlockAsStruct(members);
  2384. });
  2385. AddCallable("BlockAsTuple", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2386. TVector<TRuntimeNode> args;
  2387. for (const auto& x : node.Children()) {
  2388. args.push_back(MkqlBuildExpr(*x, ctx));
  2389. }
  2390. return ctx.ProgramBuilder.BlockAsTuple(args);
  2391. });
  2392. AddCallable("BlockCombineAll", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2393. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2394. std::optional<ui32> filterColumn;
  2395. if (!node.Child(1)->IsCallable("Void")) {
  2396. filterColumn = FromString<ui32>(node.Child(1)->Content());
  2397. }
  2398. TVector<TAggInfo> aggs;
  2399. for (const auto& agg : node.Child(2)->Children()) {
  2400. TAggInfo info;
  2401. info.Name = TString(agg->Head().Head().Content());
  2402. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2403. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2404. }
  2405. aggs.push_back(info);
  2406. }
  2407. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2408. return ctx.ProgramBuilder.BlockCombineAll(arg, filterColumn, aggs, returnType);
  2409. });
  2410. AddCallable("BlockCombineHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2411. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2412. std::optional<ui32> filterColumn;
  2413. if (!node.Child(1)->IsCallable("Void")) {
  2414. filterColumn = FromString<ui32>(node.Child(1)->Content());
  2415. }
  2416. TVector<ui32> keys;
  2417. for (const auto& key : node.Child(2)->Children()) {
  2418. keys.push_back(FromString<ui32>(key->Content()));
  2419. }
  2420. TVector<TAggInfo> aggs;
  2421. for (const auto& agg : node.Child(3)->Children()) {
  2422. TAggInfo info;
  2423. info.Name = TString(agg->Head().Head().Content());
  2424. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2425. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2426. }
  2427. aggs.push_back(info);
  2428. }
  2429. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2430. return ctx.ProgramBuilder.BlockCombineHashed(arg, filterColumn, keys, aggs, returnType);
  2431. });
  2432. AddCallable("BlockMergeFinalizeHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2433. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2434. TVector<ui32> keys;
  2435. for (const auto& key : node.Child(1)->Children()) {
  2436. keys.push_back(FromString<ui32>(key->Content()));
  2437. }
  2438. TVector<TAggInfo> aggs;
  2439. for (const auto& agg : node.Child(2)->Children()) {
  2440. TAggInfo info;
  2441. info.Name = TString(agg->Head().Head().Content());
  2442. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2443. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2444. }
  2445. aggs.push_back(info);
  2446. }
  2447. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2448. return ctx.ProgramBuilder.BlockMergeFinalizeHashed(arg, keys, aggs, returnType);
  2449. });
  2450. AddCallable("BlockMergeManyFinalizeHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2451. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2452. TVector<ui32> keys;
  2453. for (const auto& key : node.Child(1)->Children()) {
  2454. keys.push_back(FromString<ui32>(key->Content()));
  2455. }
  2456. TVector<TAggInfo> aggs;
  2457. for (const auto& agg : node.Child(2)->Children()) {
  2458. TAggInfo info;
  2459. info.Name = TString(agg->Head().Head().Content());
  2460. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2461. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2462. }
  2463. aggs.push_back(info);
  2464. }
  2465. ui32 streamIndex = FromString<ui32>(node.Child(3)->Content());
  2466. TVector<TVector<ui32>> streams;
  2467. for (const auto& child : node.Child(4)->Children()) {
  2468. auto& stream = streams.emplace_back();
  2469. for (const auto& atom : child->Children()) {
  2470. stream.emplace_back(FromString<ui32>(atom->Content()));
  2471. }
  2472. }
  2473. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2474. return ctx.ProgramBuilder.BlockMergeManyFinalizeHashed(arg, keys, aggs, streamIndex, streams, returnType);
  2475. });
  2476. AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2477. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  2478. const auto index = FromString<ui32>(node.Child(1)->Content());
  2479. return ctx.ProgramBuilder.BlockCompress(flow, index);
  2480. });
  2481. AddCallable("PgArray", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2482. std::vector<TRuntimeNode> args;
  2483. args.reserve(node.ChildrenSize());
  2484. for (ui32 i = 0; i < node.ChildrenSize(); ++i) {
  2485. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2486. }
  2487. auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2488. return ctx.ProgramBuilder.PgArray(args, returnType);
  2489. });
  2490. AddCallable("QueueCreate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2491. const auto initCapacity = MkqlBuildExpr(*node.Child(1), ctx);
  2492. const auto initSize = MkqlBuildExpr(*node.Child(2), ctx);
  2493. const auto& args = GetArgumentsFrom<3U>(node, ctx);
  2494. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2495. return ctx.ProgramBuilder.QueueCreate(initCapacity, initSize, args, returnType);
  2496. });
  2497. AddCallable("QueuePeek", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2498. const auto resource = MkqlBuildExpr(node.Head(), ctx);
  2499. const auto index = MkqlBuildExpr(*node.Child(1), ctx);
  2500. const auto& args = GetArgumentsFrom<2U>(node, ctx);
  2501. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2502. return ctx.ProgramBuilder.QueuePeek(resource, index, args, returnType);
  2503. });
  2504. AddCallable("QueueRange", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2505. const auto resource = MkqlBuildExpr(node.Head(), ctx);
  2506. const auto begin = MkqlBuildExpr(*node.Child(1), ctx);
  2507. const auto end = MkqlBuildExpr(*node.Child(2), ctx);
  2508. const auto& args = GetArgumentsFrom<3U>(node, ctx);
  2509. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2510. return ctx.ProgramBuilder.QueueRange(resource, begin, end, args, returnType);
  2511. });
  2512. AddCallable("Seq", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2513. const auto& args = GetArgumentsFrom<0U>(node, ctx);
  2514. const auto returnType = ctx.BuildType(node, *node.GetTypeAnn());
  2515. return ctx.ProgramBuilder.Seq(args, returnType);
  2516. });
  2517. AddCallable("FromYsonSimpleType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2518. const auto input = MkqlBuildExpr(node.Head(), ctx);
  2519. const auto schemeType = ParseDataType(node, node.Child(1)->Content());
  2520. return ctx.ProgramBuilder.FromYsonSimpleType(input, schemeType);
  2521. });
  2522. AddCallable("TryWeakMemberFromDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2523. const auto other = MkqlBuildExpr(node.Head(), ctx);
  2524. const auto rest = MkqlBuildExpr(*node.Child(1), ctx);
  2525. const auto schemeType = ParseDataType(node, node.Child(2)->Content());
  2526. const auto member = node.Child(3)->Content();
  2527. return ctx.ProgramBuilder.TryWeakMemberFromDict(other, rest, schemeType, member);
  2528. });
  2529. AddCallable("DependsOn", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2530. return MkqlBuildExpr(node.Head(), ctx);
  2531. });
  2532. AddCallable("Parameter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2533. const NNodes::TCoParameter parameter(&node);
  2534. return ctx.ProgramBuilder.Member(ctx.Parameters, parameter.Name());
  2535. });
  2536. AddCallable("SecureParam", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2537. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content());
  2538. });
  2539. AddCallable(SkippableCallables, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2540. return MkqlBuildExpr(node.Head(), ctx);
  2541. });
  2542. AddCallable({ "AssumeStrict", "AssumeNonStrict", "Likely" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2543. return MkqlBuildExpr(node.Head(), ctx);
  2544. });
  2545. AddCallable("Merge", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2546. const auto& args = GetAllArguments(node, ctx);
  2547. auto extend = ctx.ProgramBuilder.Extend(args);
  2548. if (auto sortConstr = node.GetConstraint<TSortedConstraintNode>()) {
  2549. const auto input = MkqlBuildExpr(node.Head(), ctx);
  2550. const auto& content = sortConstr->GetContent();
  2551. std::vector<TRuntimeNode> ascending;
  2552. ascending.reserve(content.size());
  2553. for (const auto& c: content) {
  2554. ascending.push_back(ctx.ProgramBuilder.NewDataLiteral(c.second));
  2555. }
  2556. TProgramBuilder::TUnaryLambda keyExractor = [&](TRuntimeNode item) {
  2557. std::vector<TRuntimeNode> keys;
  2558. keys.reserve(content.size());
  2559. for (const auto& c : content) {
  2560. if (c.first.front().empty())
  2561. keys.push_back(item);
  2562. else {
  2563. MKQL_ENSURE(c.first.front().size() == 1U, "Just column expected.");
  2564. keys.push_back(ctx.ProgramBuilder.Member(item, c.first.front().front()));
  2565. }
  2566. }
  2567. return ctx.ProgramBuilder.NewTuple(keys);
  2568. };
  2569. return ctx.ProgramBuilder.Sort(extend, ctx.ProgramBuilder.NewTuple(ascending), keyExractor);
  2570. }
  2571. else {
  2572. return extend;
  2573. }
  2574. });
  2575. }
  2576. TRuntimeNode MkqlBuildLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const TRuntimeNode::TList& args) {
  2577. MKQL_ENSURE(2U == lambda.ChildrenSize(), "Wide lambda isn't supported.");
  2578. TMkqlBuildContext::TArgumentsMap innerArguments;
  2579. innerArguments.reserve(args.size());
  2580. auto it = args.begin();
  2581. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  2582. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2583. return MkqlBuildExpr(lambda.Tail(), innerCtx);
  2584. }
  2585. TRuntimeNode::TList MkqlBuildWideLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const TRuntimeNode::TList& args) {
  2586. MKQL_ENSURE(0U < lambda.ChildrenSize(), "Empty lambda.");
  2587. TMkqlBuildContext::TArgumentsMap innerArguments;
  2588. innerArguments.reserve(args.size());
  2589. auto it = args.begin();
  2590. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  2591. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2592. TRuntimeNode::TList result;
  2593. result.reserve(lambda.ChildrenSize() - 1U);
  2594. for (ui32 i = 1U; i < lambda.ChildrenSize(); ++i)
  2595. result.emplace_back(MkqlBuildExpr(*lambda.Child(i), innerCtx));
  2596. return result;
  2597. }
  2598. TRuntimeNode MkqlBuildExpr(const TExprNode& node, TMkqlBuildContext& ctx) {
  2599. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  2600. const auto knownNode = currCtx->Memoization.find(&node);
  2601. if (currCtx->Memoization.cend() != knownNode) {
  2602. return knownNode->second;
  2603. }
  2604. }
  2605. switch (const auto type = node.Type()) {
  2606. case TExprNode::List:
  2607. return CheckTypeAndMemoize(node, ctx, ctx.ProgramBuilder.NewTuple(GetAllArguments(node, ctx)));
  2608. case TExprNode::Callable:
  2609. return CheckTypeAndMemoize(node, ctx, ctx.MkqlCompiler.GetCallable(node.Content())(node, ctx));
  2610. case TExprNode::Argument:
  2611. ythrow TNodeException(node) << "Unexpected argument: " << node.Content();
  2612. default:
  2613. ythrow TNodeException(node) << "Unexpected node type: " << type;
  2614. }
  2615. }
  2616. } // namespace NCommon
  2617. } // namespace NYql