yql_provider_mkql.cpp 135 KB


  1. #include "yql_provider_mkql.h"
  2. #include "yql_type_mkql.h"
  3. #include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
  4. #include <ydb/library/yql/core/yql_expr_type_annotation.h>
  5. #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
  6. #include <ydb/library/yql/core/yql_expr_type_annotation.h>
  7. #include <ydb/library/yql/core/yql_match_recognize.h>
  8. #include <ydb/library/yql/core/yql_join.h>
  9. #include <ydb/library/yql/core/yql_opt_utils.h>
  10. #include <ydb/library/yql/minikql/mkql_node.h>
  11. #include <ydb/library/yql/minikql/mkql_node_cast.h>
  12. #include <ydb/library/yql/minikql/mkql_program_builder.h>
  13. #include <ydb/library/yql/minikql/mkql_runtime_version.h>
  14. #include <ydb/library/yql/minikql/mkql_type_ops.h>
  15. #include <ydb/library/yql/public/decimal/yql_decimal.h>
  16. #include <ydb/library/yql/parser/pg_catalog/catalog.h>
  17. #include <util/stream/null.h>
  18. #include <array>
  19. using namespace NKikimr;
  20. using namespace NKikimr::NMiniKQL;
  21. namespace NYql {
  22. namespace NCommon {
  23. TRuntimeNode WideTopImpl(const TExprNode& node, TMkqlBuildContext& ctx,
  24. TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) {
  25. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  26. const auto count = MkqlBuildExpr(*node.Child(1U), ctx);
  27. std::vector<std::pair<ui32, TRuntimeNode>> directions;
  28. directions.reserve(node.Tail().ChildrenSize());
  29. node.Tail().ForEachChild([&](const TExprNode& dir) {
  30. directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
  31. });
  32. return (ctx.ProgramBuilder.*func)(flow, count, directions);
  33. }
  34. TRuntimeNode WideSortImpl(const TExprNode& node, TMkqlBuildContext& ctx,
  35. TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) {
  36. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  37. std::vector<std::pair<ui32, TRuntimeNode>> directions;
  38. directions.reserve(node.Tail().ChildrenSize());
  39. node.Tail().ForEachChild([&](const TExprNode& dir) {
  40. directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
  41. });
  42. return (ctx.ProgramBuilder.*func)(flow, directions);
  43. }
  44. TRuntimeNode CombineByKeyImpl(const TExprNode& node, TMkqlBuildContext& ctx) {
  45. NNodes::TCoCombineByKey combine(&node);
  46. const bool isStreamOrFlow = combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream ||
  47. combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Flow;
  48. YQL_ENSURE(!isStreamOrFlow);
  49. const auto input = MkqlBuildExpr(combine.Input().Ref(), ctx);
  50. TRuntimeNode preMapList = ctx.ProgramBuilder.FlatMap(input, [&](TRuntimeNode item) {
  51. return MkqlBuildLambda(combine.PreMapLambda().Ref(), ctx, {item});
  52. });
  53. const auto dict = ctx.ProgramBuilder.ToHashedDict(preMapList, true, [&](TRuntimeNode item) {
  54. return MkqlBuildLambda(combine.KeySelectorLambda().Ref(), ctx, {item});
  55. }, [&](TRuntimeNode item) {
  56. return item;
  57. });
  58. const auto values = ctx.ProgramBuilder.DictItems(dict);
  59. return ctx.ProgramBuilder.FlatMap(values, [&](TRuntimeNode item) {
  60. auto key = ctx.ProgramBuilder.Nth(item, 0);
  61. auto payloadList = ctx.ProgramBuilder.Nth(item, 1);
  62. auto fold1 = ctx.ProgramBuilder.Fold1(payloadList, [&](TRuntimeNode item2) {
  63. return MkqlBuildLambda(combine.InitHandlerLambda().Ref(), ctx, {key, item2});
  64. }, [&](TRuntimeNode item2, TRuntimeNode state) {
  65. return MkqlBuildLambda(combine.UpdateHandlerLambda().Ref(), ctx, {key, item2, state});
  66. });
  67. auto res = ctx.ProgramBuilder.FlatMap(fold1, [&](TRuntimeNode state) {
  68. return MkqlBuildLambda(combine.FinishHandlerLambda().Ref(), ctx, {key, state});
  69. });
  70. return res;
  71. });
  72. }
  73. namespace {
  74. std::array<TRuntimeNode, 2U> MkqlBuildSplitLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const std::initializer_list<TRuntimeNode>& args) {
  75. TMkqlBuildContext::TArgumentsMap innerArguments;
  76. innerArguments.reserve(args.size());
  77. auto it = args.begin();
  78. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  79. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  80. const auto& body = lambda.Tail();
  81. MKQL_ENSURE(body.IsList() && body.ChildrenSize() == 2U, "Expected pair of nodes.");
  82. return {{MkqlBuildExpr(body.Head(), innerCtx), MkqlBuildExpr(body.Tail(), innerCtx)}};
  83. }
  84. TMkqlBuildContext* GetNodeContext(const TExprNode& node, TMkqlBuildContext& ctx) {
  85. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  86. const auto knownNode = currCtx->Memoization.find(&node);
  87. if (currCtx->Memoization.cend() != knownNode) {
  88. return currCtx;
  89. }
  90. }
  91. return nullptr;
  92. }
  93. TMkqlBuildContext* GetNodeContextByLambda(const TExprNode& node, TMkqlBuildContext& ctx) {
  94. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  95. if (currCtx->LambdaId == node.UniqueId()) {
  96. return currCtx;
  97. }
  98. }
  99. return nullptr;
  100. }
  101. TMkqlBuildContext* GetContextForMemoizeInUnknowScope(const TExprNode& node, TMkqlBuildContext& ctx) {
  102. TMkqlBuildContext* result = nullptr;
  103. for (const auto& c : node.Children()) {
  104. const auto& child = c->IsLambda() ? c->Tail() : *c;
  105. if (!child.IsAtom()) {
  106. auto nodeCtx = GetNodeContext(child, ctx);
  107. if (!nodeCtx) {
  108. nodeCtx = GetContextForMemoizeInUnknowScope(child, ctx);
  109. }
  110. if (!result || result->Level < nodeCtx->Level) {
  111. result = nodeCtx;
  112. if (result == &ctx) {
  113. break;
  114. }
  115. }
  116. }
  117. }
  118. if (!result) {
  119. for (result = &ctx; result->ParentCtx; result = result->ParentCtx)
  120. continue;
  121. }
  122. return result;
  123. }
  124. TMkqlBuildContext* GetContextForMemoize(const TExprNode& node, TMkqlBuildContext& ctx) {
  125. if (const auto scope = node.GetDependencyScope()) {
  126. if (const auto lambda = scope->second) {
  127. return GetNodeContextByLambda(*lambda, ctx);
  128. }
  129. } else {
  130. return GetContextForMemoizeInUnknowScope(node, ctx);
  131. }
  132. auto result = &ctx;
  133. while (result->ParentCtx) {
  134. result = result->ParentCtx;
  135. }
  136. return result;
  137. }
  138. const TRuntimeNode& CheckTypeAndMemoize(const TExprNode& node, TMkqlBuildContext& ctx, const TRuntimeNode& runtime) {
  139. if (node.GetTypeAnn()) {
  140. TNullOutput null;
  141. if (const auto type = BuildType(*node.GetTypeAnn(), ctx.ProgramBuilder, null)) {
  142. if (!type->IsSameType(*runtime.GetStaticType())) {
  143. ythrow TNodeException(node) << "Expected: " << *type << " type, but got: " << *runtime.GetStaticType() << ".";
  144. }
  145. }
  146. }
  147. return GetContextForMemoize(node, ctx)->Memoization.emplace(&node, runtime).first->second;
  148. }
  149. std::vector<TRuntimeNode> GetAllArguments(const TExprNode& node, TMkqlBuildContext& ctx) {
  150. std::vector<TRuntimeNode> args;
  151. args.reserve(node.ChildrenSize());
  152. node.ForEachChild([&](const TExprNode& child){ args.emplace_back(MkqlBuildExpr(child, ctx)); });
  153. return args;
  154. }
  155. template <size_t From>
  156. std::vector<TRuntimeNode> GetArgumentsFrom(const TExprNode& node, TMkqlBuildContext& ctx) {
  157. std::vector<TRuntimeNode> args;
  158. args.reserve(node.ChildrenSize() - From);
  159. for (auto i = From; i < node.ChildrenSize(); ++i) {
  160. args.emplace_back(MkqlBuildExpr(*node.Child(i), ctx));
  161. }
  162. return args;
  163. }
  164. NUdf::TDataTypeId ParseDataType(const TExprNode& owner, const std::string_view& type) {
  165. if (const auto slot = NUdf::FindDataSlot(type)) {
  166. return NUdf::GetDataTypeInfo(*slot).TypeId;
  167. }
  168. ythrow TNodeException(owner) << "Unsupported data type: " << type;
  169. }
  170. EJoinKind GetJoinKind(const TExprNode& owner, const std::string_view& content) {
  171. if (content == "Inner") {
  172. return EJoinKind::Inner;
  173. }
  174. else if (content == "Left") {
  175. return EJoinKind::Left;
  176. }
  177. else if (content == "Right") {
  178. return EJoinKind::Right;
  179. }
  180. else if (content == "Full") {
  181. return EJoinKind::Full;
  182. }
  183. else if (content == "LeftOnly") {
  184. return EJoinKind::LeftOnly;
  185. }
  186. else if (content == "RightOnly") {
  187. return EJoinKind::RightOnly;
  188. }
  189. else if (content == "Exclusion") {
  190. return EJoinKind::Exclusion;
  191. }
  192. else if (content == "LeftSemi") {
  193. return EJoinKind::LeftSemi;
  194. }
  195. else if (content == "RightSemi") {
  196. return EJoinKind::RightSemi;
  197. }
  198. else if (content == "Cross") {
  199. return EJoinKind::Cross;
  200. }
  201. else {
  202. ythrow TNodeException(owner) << "Unexpected join kind: " << content;
  203. }
  204. }
  205. template<typename TLayout>
  206. std::pair<TLayout, ui16> CutTimezone(const std::string_view& atom) {
  207. const auto pos = atom.find(',');
  208. MKQL_ENSURE(std::string_view::npos != pos, "Expected two components.");
  209. return std::make_pair(::FromString<TLayout>(atom.substr(0, pos)), GetTimezoneId(atom.substr(pos + 1)));
  210. }
  211. } // namespace
  212. bool TMkqlCallableCompilerBase::HasCallable(const std::string_view& name) const {
  213. return Callables.contains(name);
  214. }
  215. void TMkqlCallableCompilerBase::AddCallable(const std::string_view& name, TCompiler compiler) {
  216. const auto result = Callables.emplace(TString(name), compiler);
  217. YQL_ENSURE(result.second, "Callable already exists: " << name);
  218. }
  219. void TMkqlCallableCompilerBase::AddCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  220. for (const auto& name : names) {
  221. AddCallable(name, compiler);
  222. }
  223. }
  224. void TMkqlCallableCompilerBase::ChainCallable(const std::string_view& name, TCompiler compiler) {
  225. auto prevCompiler = GetCallable(name);
  226. auto chainedCompiler = [compiler = std::move(compiler), prevCompiler = std::move(prevCompiler)](const TExprNode& node, TMkqlBuildContext& ctx) -> NKikimr::NMiniKQL::TRuntimeNode {
  227. if (auto res = compiler(node, ctx)) {
  228. return res;
  229. }
  230. return prevCompiler(node, ctx);
  231. };
  232. OverrideCallable(name, chainedCompiler);
  233. }
  234. void TMkqlCallableCompilerBase::ChainCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  235. for (const auto& name : names) {
  236. ChainCallable(name, compiler);
  237. }
  238. }
  239. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::UnaryFunctionMethod>>& callables) {
  240. for (const auto& callable : callables) {
  241. AddCallable(callable.first,
  242. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  243. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  244. return (ctx.ProgramBuilder.*method)(arg);
  245. }
  246. );
  247. }
  248. }
  249. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::BinaryFunctionMethod>>& callables) {
  250. for (const auto& callable : callables) {
  251. AddCallable(callable.first,
  252. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  253. const auto one = MkqlBuildExpr(node.Head(), ctx);
  254. const auto two = MkqlBuildExpr(node.Tail(), ctx);
  255. return (ctx.ProgramBuilder.*method)(one, two);
  256. }
  257. );
  258. }
  259. }
  260. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::TernaryFunctionMethod>>& callables) {
  261. for (const auto& callable : callables) {
  262. AddCallable(callable.first,
  263. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  264. const auto arg1 = MkqlBuildExpr(node.Head(), ctx);
  265. const auto arg2 = MkqlBuildExpr(*node.Child(1U), ctx);
  266. const auto arg3 = MkqlBuildExpr(node.Tail(), ctx);
  267. return (ctx.ProgramBuilder.*method)(arg1, arg2, arg3);
  268. }
  269. );
  270. }
  271. }
  272. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::ArrayFunctionMethod>>& callables) {
  273. for (const auto& callable : callables) {
  274. AddCallable(callable.first,
  275. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  276. const auto& args = GetAllArguments(node, ctx);
  277. return (ctx.ProgramBuilder.*method)(args);
  278. }
  279. );
  280. }
  281. }
  282. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::ProcessFunctionMethod>>& callables) {
  283. for (const auto& callable : callables) {
  284. AddCallable(callable.first,
  285. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  286. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  287. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(node.Tail(), ctx, {item}); };
  288. return (ctx.ProgramBuilder.*method)(arg, lambda);
  289. }
  290. );
  291. }
  292. }
  293. void TMkqlCallableCompilerBase::AddSimpleCallables(const std::initializer_list<std::pair<std::string_view, TProgramBuilder::NarrowFunctionMethod>>& callables) {
  294. for (const auto& callable : callables) {
  295. AddCallable(callable.first,
  296. [method=callable.second](const TExprNode& node, TMkqlBuildContext& ctx) {
  297. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  298. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildLambda(node.Tail(), ctx, items); };
  299. return (ctx.ProgramBuilder.*method)(arg, lambda);
  300. }
  301. );
  302. }
  303. }
  304. void TMkqlCallableCompilerBase::OverrideCallable(const std::string_view& name, TCompiler compiler) {
  305. const auto prevCompiler = Callables.find(name);
  306. YQL_ENSURE(Callables.cend() != prevCompiler, "Missed callable: " << name);
  307. prevCompiler->second = compiler;
  308. Callables[name] = compiler;
  309. }
  310. IMkqlCallableCompiler::TCompiler TMkqlCallableCompilerBase::GetCallable(const std::string_view& name) const {
  311. const auto compiler = Callables.find(name);
  312. YQL_ENSURE(Callables.cend() != compiler, "Missed callable: " << name);
  313. return compiler->second;
  314. }
  315. IMkqlCallableCompiler::TCompiler TMkqlCallableCompilerBase::FindCallable(const std::string_view& name) const {
  316. const auto compiler = Callables.find(name);
  317. return Callables.cend() != compiler ? compiler->second : IMkqlCallableCompiler::TCompiler();
  318. }
  319. bool TMkqlCommonCallableCompiler::HasCallable(const std::string_view& name) const {
  320. if (TMkqlCallableCompilerBase::HasCallable(name)) {
  321. return true;
  322. }
  323. return GetShared().HasCallable(name);
  324. }
  325. IMkqlCallableCompiler::TCompiler TMkqlCommonCallableCompiler::FindCallable(const std::string_view& name) const {
  326. if (const auto func = TMkqlCallableCompilerBase::FindCallable(name)) {
  327. return func;
  328. }
  329. return GetShared().FindCallable(name);
  330. }
  331. IMkqlCallableCompiler::TCompiler TMkqlCommonCallableCompiler::GetCallable(const std::string_view& name) const {
  332. if (const auto func = TMkqlCallableCompilerBase::FindCallable(name)) {
  333. return func;
  334. }
  335. return GetShared().GetCallable(name);
  336. }
  337. void TMkqlCommonCallableCompiler::OverrideCallable(const std::string_view& name, TCompiler compiler) {
  338. if (TMkqlCallableCompilerBase::HasCallable(name)) {
  339. TMkqlCallableCompilerBase::OverrideCallable(name, compiler);
  340. } else {
  341. YQL_ENSURE(GetShared().HasCallable(name));
  342. TMkqlCallableCompilerBase::AddCallable(name, compiler);
  343. }
  344. }
  345. void TMkqlCommonCallableCompiler::AddCallable(const std::string_view& name, TCompiler compiler) {
  346. YQL_ENSURE(!GetShared().HasCallable(name), "Compiler already set for callable: " << name);
  347. TMkqlCallableCompilerBase::AddCallable(name, compiler);
  348. }
  349. void TMkqlCommonCallableCompiler::AddCallable(const std::initializer_list<std::string_view>& names, TCompiler compiler) {
  350. for (const auto& name : names) {
  351. AddCallable(name, compiler);
  352. }
  353. }
  354. TMkqlCommonCallableCompiler::TShared::TShared() {
  355. AddSimpleCallables({
  356. {"Abs", &TProgramBuilder::Abs},
  357. {"Plus", &TProgramBuilder::Plus},
  358. {"Minus", &TProgramBuilder::Minus},
  359. {"Inc", &TProgramBuilder::Increment},
  360. {"Dec", &TProgramBuilder::Decrement},
  361. {"Not", &TProgramBuilder::Not},
  362. {"BlockNot", &TProgramBuilder::BlockNot},
  363. {"BlockJust", &TProgramBuilder::BlockJust},
  364. {"BitNot", &TProgramBuilder::BitNot},
  365. {"Size", &TProgramBuilder::Size},
  366. {"Way", &TProgramBuilder::Way},
  367. {"VariantItem", &TProgramBuilder::VariantItem},
  368. {"CountBits", &TProgramBuilder::CountBits},
  369. {"Ascending", &TProgramBuilder::Ascending},
  370. {"Descending", &TProgramBuilder::Descending},
  371. {"ToOptional", &TProgramBuilder::Head},
  372. {"Head", &TProgramBuilder::Head},
  373. {"Last", &TProgramBuilder::Last},
  374. {"ToList", &TProgramBuilder::ToList},
  375. {"ToFlow", &TProgramBuilder::ToFlow},
  376. {"FromFlow", &TProgramBuilder::FromFlow},
  377. {"WideToBlocks", &TProgramBuilder::WideToBlocks},
  378. {"WideFromBlocks", &TProgramBuilder::WideFromBlocks},
  379. {"AsScalar", &TProgramBuilder::AsScalar},
  380. {"Just", &TProgramBuilder::NewOptional},
  381. {"Exists", &TProgramBuilder::Exists},
  382. {"Pickle", &TProgramBuilder::Pickle},
  383. {"StablePickle", &TProgramBuilder::StablePickle},
  384. {"Collect", &TProgramBuilder::Collect},
  385. {"Discard", &TProgramBuilder::Discard},
  386. {"LazyList", &TProgramBuilder::LazyList},
  387. {"ForwardList", &TProgramBuilder::ForwardList},
  388. {"Length", &TProgramBuilder::Length},
  389. {"HasItems", &TProgramBuilder::HasItems},
  390. {"Reverse", &TProgramBuilder::Reverse},
  391. {"ToIndexDict", &TProgramBuilder::ToIndexDict},
  392. {"ToString", &TProgramBuilder::ToString},
  393. {"ToBytes", &TProgramBuilder::ToBytes},
  394. {"AggrCountInit", &TProgramBuilder::AggrCountInit},
  395. {"NewMTRand", &TProgramBuilder::NewMTRand},
  396. {"NextMTRand", &TProgramBuilder::NextMTRand},
  397. {"TimezoneId", &TProgramBuilder::TimezoneId},
  398. {"TimezoneName", &TProgramBuilder::TimezoneName},
  399. {"RemoveTimezone", &TProgramBuilder::RemoveTimezone},
  400. {"DictItems", &TProgramBuilder::DictItems},
  401. {"DictKeys", &TProgramBuilder::DictKeys},
  402. {"DictPayloads", &TProgramBuilder::DictPayloads},
  403. {"QueuePop", &TProgramBuilder::QueuePop}
  404. });
  405. AddSimpleCallables({
  406. {"+", &TProgramBuilder::Add},
  407. {"-", &TProgramBuilder::Sub},
  408. {"*", &TProgramBuilder::Mul},
  409. {"/", &TProgramBuilder::Div},
  410. {"%", &TProgramBuilder::Mod},
  411. {"Add", &TProgramBuilder::Add},
  412. {"Sub", &TProgramBuilder::Sub},
  413. {"Mul", &TProgramBuilder::Mul},
  414. {"Div", &TProgramBuilder::Div},
  415. {"Mod", &TProgramBuilder::Mod},
  416. {"DecimalMul", &TProgramBuilder::DecimalMul},
  417. {"DecimalDiv", &TProgramBuilder::DecimalDiv},
  418. {"DecimalMod", &TProgramBuilder::DecimalMod},
  419. {"==", &TProgramBuilder::Equals},
  420. {"!=", &TProgramBuilder::NotEquals},
  421. {"<", &TProgramBuilder::Less},
  422. {"<=", &TProgramBuilder::LessOrEqual},
  423. {">", &TProgramBuilder::Greater},
  424. {">=", &TProgramBuilder::GreaterOrEqual},
  425. {"Equals", &TProgramBuilder::Equals},
  426. {"NotEquals", &TProgramBuilder::NotEquals},
  427. {"Less", &TProgramBuilder::Less},
  428. {"LessOrEqual", &TProgramBuilder::LessOrEqual},
  429. {"Greater", &TProgramBuilder::Greater},
  430. {"GreaterOrEqual", &TProgramBuilder::GreaterOrEqual},
  431. {"AggrEquals", &TProgramBuilder::AggrEquals},
  432. {"AggrNotEquals", &TProgramBuilder::AggrNotEquals},
  433. {"AggrLess", &TProgramBuilder::AggrLess},
  434. {"AggrLessOrEqual", &TProgramBuilder::AggrLessOrEqual},
  435. {"AggrGreater", &TProgramBuilder::AggrGreater},
  436. {"AggrGreaterOrEqual", &TProgramBuilder::AggrGreaterOrEqual},
  437. {"AggrMin", &TProgramBuilder::AggrMin},
  438. {"AggrMax", &TProgramBuilder::AggrMax},
  439. {"AggrAdd", &TProgramBuilder::AggrAdd},
  440. {"AggrCountUpdate", &TProgramBuilder::AggrCountUpdate},
  441. {"BitOr", &TProgramBuilder::BitOr},
  442. {"BitAnd", &TProgramBuilder::BitAnd},
  443. {"BitXor", &TProgramBuilder::BitXor},
  444. {"ShiftLeft", &TProgramBuilder::ShiftLeft},
  445. {"ShiftRight", &TProgramBuilder::ShiftRight},
  446. {"RotLeft", &TProgramBuilder::RotLeft},
  447. {"RotRight", &TProgramBuilder::RotRight},
  448. {"ListIf", &TProgramBuilder::ListIf},
  449. {"Concat", &TProgramBuilder::Concat},
  450. {"AggrConcat", &TProgramBuilder::AggrConcat},
  451. {"ByteAt", &TProgramBuilder::ByteAt},
  452. {"Nanvl", &TProgramBuilder::Nanvl},
  453. {"Skip", &TProgramBuilder::Skip},
  454. {"Take", &TProgramBuilder::Take},
  455. {"Limit", &TProgramBuilder::Take},
  456. {"WideTakeBlocks", &TProgramBuilder::WideTakeBlocks},
  457. {"WideSkipBlocks", &TProgramBuilder::WideSkipBlocks},
  458. {"BlockCoalesce", &TProgramBuilder::BlockCoalesce},
  459. {"ReplicateScalar", &TProgramBuilder::ReplicateScalar},
  460. {"BlockAnd", &TProgramBuilder::BlockAnd},
  461. {"BlockOr", &TProgramBuilder::BlockOr},
  462. {"BlockXor", &TProgramBuilder::BlockXor},
  463. {"Append", &TProgramBuilder::Append},
  464. {"Insert", &TProgramBuilder::Append},
  465. {"Prepend", &TProgramBuilder::Prepend},
  466. {"Lookup", &TProgramBuilder::Lookup},
  467. {"Contains", &TProgramBuilder::Contains},
  468. {"AddTimezone", &TProgramBuilder::AddTimezone},
  469. {"StartsWith", &TProgramBuilder::StartsWith},
  470. {"EndsWith", &TProgramBuilder::EndsWith},
  471. {"StringContains", &TProgramBuilder::StringContains},
  472. {"SqueezeToList", &TProgramBuilder::SqueezeToList},
  473. {"QueuePush", &TProgramBuilder::QueuePush}
  474. });
  475. AddSimpleCallables({
  476. {"Substring", &TProgramBuilder::Substring},
  477. {"Find", &TProgramBuilder::Find},
  478. {"RFind", &TProgramBuilder::RFind},
  479. {"ListFromRange", &TProgramBuilder::ListFromRange},
  480. {"PreserveStream", &TProgramBuilder::PreserveStream},
  481. {"BlockIf", &TProgramBuilder::BlockIf},
  482. });
  483. AddSimpleCallables({
  484. {"If", &TProgramBuilder::If},
  485. {"Or", &TProgramBuilder::Or},
  486. {"And", &TProgramBuilder::And},
  487. {"Xor", &TProgramBuilder::Xor},
  488. {"Min", &TProgramBuilder::Min},
  489. {"Max", &TProgramBuilder::Max},
  490. {"AsList", &TProgramBuilder::AsList},
  491. {"Extend", &TProgramBuilder::Extend},
  492. {"OrderedExtend", &TProgramBuilder::OrderedExtend},
  493. {"Zip", &TProgramBuilder::Zip},
  494. {"ZipAll", &TProgramBuilder::ZipAll},
  495. {"Random", &TProgramBuilder::Random},
  496. {"RandomNumber", &TProgramBuilder::RandomNumber},
  497. {"RandomUuid", &TProgramBuilder::RandomUuid},
  498. {"Now", &TProgramBuilder::Now},
  499. {"CurrentUtcDate", &TProgramBuilder::CurrentUtcDate},
  500. {"CurrentUtcDatetime", &TProgramBuilder::CurrentUtcDatetime},
  501. {"CurrentUtcTimestamp", &TProgramBuilder::CurrentUtcTimestamp},
  502. });
  503. AddSimpleCallables({
  504. {"Map", &TProgramBuilder::Map},
  505. {"OrderedMap", &TProgramBuilder::OrderedMap},
  506. {"FlatMap", &TProgramBuilder::FlatMap},
  507. {"OrderedFlatMap", &TProgramBuilder::OrderedFlatMap},
  508. {"SkipWhile", &TProgramBuilder::SkipWhile},
  509. {"TakeWhile", &TProgramBuilder::TakeWhile},
  510. {"SkipWhileInclusive", &TProgramBuilder::SkipWhileInclusive},
  511. {"TakeWhileInclusive", &TProgramBuilder::TakeWhileInclusive},
  512. });
  513. AddSimpleCallables({
  514. {"NarrowMap", &TProgramBuilder::NarrowMap},
  515. {"NarrowFlatMap", &TProgramBuilder::NarrowFlatMap},
  516. {"WideSkipWhile", &TProgramBuilder::WideSkipWhile},
  517. {"WideTakeWhile", &TProgramBuilder::WideTakeWhile},
  518. {"WideSkipWhileInclusive", &TProgramBuilder::WideSkipWhileInclusive},
  519. {"WideTakeWhileInclusive", &TProgramBuilder::WideTakeWhileInclusive},
  520. });
  521. AddSimpleCallables({
  522. {"RangeUnion", &TProgramBuilder::RangeUnion},
  523. {"RangeIntersect", &TProgramBuilder::RangeIntersect},
  524. {"RangeMultiply", &TProgramBuilder::RangeMultiply},
  525. });
  526. AddSimpleCallables({
  527. {"RangeCreate", &TProgramBuilder::RangeCreate},
  528. {"RangeFinalize", &TProgramBuilder::RangeFinalize},
  529. });
  530. AddCallable({"RoundUp", "RoundDown"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  531. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  532. const auto dstType = BuildType(node.Tail(), *node.Tail().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), ctx.ProgramBuilder);
  533. return ctx.ProgramBuilder.Round(node.Content(), arg, dstType);
  534. });
  535. AddSimpleCallables({
  536. {"NextValue", &TProgramBuilder::NextValue},
  537. });
  538. AddCallable({"MultiMap", "OrderedMultiMap"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  539. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  540. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildWideLambda(node.Tail(), ctx, {item}); };
  541. return ctx.ProgramBuilder.MultiMap(arg, lambda);
  542. });
  543. AddCallable("ExpandMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  544. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  545. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildWideLambda(node.Tail(), ctx, {item}); };
  546. return ctx.ProgramBuilder.ExpandMap(arg, lambda);
  547. });
  548. AddCallable("WideMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  549. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  550. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildWideLambda(node.Tail(), ctx, items); };
  551. TRuntimeNode result = ctx.ProgramBuilder.WideMap(arg, lambda);
  552. if (IsWideBlockType(*node.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType())) {
  553. result = ctx.ProgramBuilder.BlockExpandChunked(result);
  554. }
  555. return result;
  556. });
  557. AddCallable("WideChain1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  558. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  559. return ctx.ProgramBuilder.WideChain1Map(flow,
  560. [&](TRuntimeNode::TList items) {
  561. return MkqlBuildWideLambda(*node.Child(1), ctx, items);
  562. },
  563. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  564. items.insert(items.cend(), state.cbegin(), state.cend());
  565. return MkqlBuildWideLambda(node.Tail(), ctx, items);
  566. });
  567. });
  568. AddCallable("NarrowMultiMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  569. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  570. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildWideLambda(node.Tail(), ctx, items); };
  571. return ctx.ProgramBuilder.NarrowMultiMap(arg, lambda);
  572. });
  573. AddCallable("WideFilter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  574. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  575. const auto lambda = [&](TRuntimeNode::TList items) { return MkqlBuildLambda(*node.Child(1), ctx, items); };
  576. if (node.ChildrenSize() > 2U) {
  577. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  578. return ctx.ProgramBuilder.WideFilter(arg, limit, lambda);
  579. } else {
  580. return ctx.ProgramBuilder.WideFilter(arg, lambda);
  581. }
  582. });
  583. AddCallable("WideCondense1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  584. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  585. return ctx.ProgramBuilder.WideCondense1(flow,
  586. [&](TRuntimeNode::TList items) {
  587. return MkqlBuildWideLambda(*node.Child(1), ctx, items);
  588. },
  589. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  590. items.insert(items.cend(), state.cbegin(), state.cend());
  591. return MkqlBuildLambda(*node.Child(2), ctx, items);
  592. },
  593. [&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
  594. items.insert(items.cend(), state.cbegin(), state.cend());
  595. return MkqlBuildWideLambda(*node.Child(3), ctx, items);
  596. },
  597. HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
  598. });
  599. AddCallable("WideCombiner", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  600. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  601. i64 memLimit = 0LL;
  602. const bool withLimit = TryFromString<i64>(node.Child(1U)->Content(), memLimit);
  603. const auto keyExtractor = [&](TRuntimeNode::TList items) {
  604. return MkqlBuildWideLambda(*node.Child(2U), ctx, items);
  605. };
  606. const auto init = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) {
  607. keys.insert(keys.cend(), items.cbegin(), items.cend());
  608. return MkqlBuildWideLambda(*node.Child(3U), ctx, keys);
  609. };
  610. const auto update = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items, TRuntimeNode::TList state) {
  611. keys.insert(keys.cend(), items.cbegin(), items.cend());
  612. keys.insert(keys.cend(), state.cbegin(), state.cend());
  613. return MkqlBuildWideLambda(*node.Child(4U), ctx, keys);
  614. };
  615. const auto finish = [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) {
  616. keys.insert(keys.cend(), state.cbegin(), state.cend());
  617. return MkqlBuildWideLambda(node.Tail(), ctx, keys);
  618. };
  619. if (withLimit)
  620. return ctx.ProgramBuilder.WideCombiner(flow, memLimit, keyExtractor, init, update, finish);
  621. else
  622. return ctx.ProgramBuilder.WideLastCombiner(flow, keyExtractor, init, update, finish);
  623. });
  624. AddCallable("WideChopper", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  625. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  626. const auto keyExtractor = [&](TRuntimeNode::TList items) {
  627. return MkqlBuildWideLambda(*node.Child(1U), ctx, items);
  628. };
  629. const auto groupSwitch = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) {
  630. keys.insert(keys.cend(), items.cbegin(), items.cend());
  631. return MkqlBuildLambda(*node.Child(2U), ctx, keys);
  632. };
  633. const auto handler = [&](TRuntimeNode::TList keys, TRuntimeNode flow) {
  634. keys.emplace_back(flow);
  635. return MkqlBuildLambda(node.Tail(), ctx, keys);
  636. };
  637. return ctx.ProgramBuilder.WideChopper(flow, keyExtractor, groupSwitch, handler);
  638. });
  639. AddCallable("WideTop", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  640. return WideTopImpl(node, ctx, &TProgramBuilder::WideTop);
  641. });
  642. AddCallable("WideTopSort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  643. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSort);
  644. });
  645. AddCallable("WideSort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  646. return WideSortImpl(node, ctx, &TProgramBuilder::WideSort);
  647. });
  648. AddCallable("WideTopBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  649. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopBlocks);
  650. });
  651. AddCallable("WideTopSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  652. return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSortBlocks);
  653. });
  654. AddCallable("WideSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  655. return WideSortImpl(node, ctx, &TProgramBuilder::WideSortBlocks);
  656. });
  657. AddCallable("Iterable", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  658. const auto lambda = [&]() { return MkqlBuildLambda(node.Head(), ctx, {}); };
  659. return ctx.ProgramBuilder.Iterable(lambda);
  660. });
  661. AddCallable("Filter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  662. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  663. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(*node.Child(1), ctx, {item}); };
  664. if (node.ChildrenSize() > 2U) {
  665. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  666. return ctx.ProgramBuilder.Filter(arg, limit, lambda);
  667. } else {
  668. return ctx.ProgramBuilder.Filter(arg, lambda);
  669. }
  670. });
  671. AddCallable("OrderedFilter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  672. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  673. const auto lambda = [&](TRuntimeNode item) { return MkqlBuildLambda(*node.Child(1), ctx, {item}); };
  674. if (node.ChildrenSize() > 2U) {
  675. const auto limit = MkqlBuildExpr(node.Tail(), ctx);
  676. return ctx.ProgramBuilder.OrderedFilter(arg, limit, lambda);
  677. } else {
  678. return ctx.ProgramBuilder.OrderedFilter(arg, lambda);
  679. }
  680. });
  681. AddCallable("Member", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  682. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  683. const auto name = node.Tail().Content();
  684. return ctx.ProgramBuilder.Member(structObj, name);
  685. });
  686. AddCallable("RemoveMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  687. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  688. const auto name = node.Tail().Content();
  689. return ctx.ProgramBuilder.RemoveMember(structObj, name, false);
  690. });
  691. AddCallable("ForceRemoveMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  692. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  693. const auto name = node.Tail().Content();
  694. return ctx.ProgramBuilder.RemoveMember(structObj, name, true);
  695. });
  696. AddCallable("Nth", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  697. const auto tupleObj = MkqlBuildExpr(node.Head(), ctx);
  698. const auto index = FromString<ui32>(node.Tail().Content());
  699. return ctx.ProgramBuilder.Nth(tupleObj, index);
  700. });
  701. AddCallable("MatchRecognizeCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  702. const auto& inputStream = node.Child(0);
  703. const auto& partitionKeySelector = node.Child(1);
  704. const auto& partitionColumns = node.Child(2);
  705. const auto& params = node.Child(3);
  706. const auto& settings = node.Child(4);
  707. //explore params
  708. const auto& measures = params->ChildRef(0);
  709. const auto& pattern = params->ChildRef(3);
  710. const auto& defines = params->ChildRef(4);
  711. //explore measures
  712. const auto measureNames = measures->ChildRef(2);
  713. constexpr size_t FirstMeasureLambdaIndex = 3;
  714. //explore defines
  715. const auto defineNames = defines->ChildRef(2);
  716. const size_t FirstDefineLambdaIndex = 3;
  717. TVector<TStringBuf> partitionColumnNames;
  718. for (const auto& n: partitionColumns->Children()) {
  719. partitionColumnNames.push_back(n->Content());
  720. }
  721. TProgramBuilder::TUnaryLambda getPartitionKeySelector = [partitionKeySelector, &ctx](TRuntimeNode inputRowArg){
  722. return MkqlBuildLambda(*partitionKeySelector, ctx, {inputRowArg});
  723. };
  724. TVector<std::pair<TStringBuf, TProgramBuilder::TTernaryLambda>> getDefines(defineNames->ChildrenSize());
  725. for (size_t i = 0; i != defineNames->ChildrenSize(); ++i) {
  726. getDefines[i] = std::pair{
  727. defineNames->ChildRef(i)->Content(),
  728. [i, defines, &ctx](TRuntimeNode data, TRuntimeNode matchedVars, TRuntimeNode rowIndex) {
  729. return MkqlBuildLambda(*defines->ChildRef(FirstDefineLambdaIndex + i), ctx,
  730. {data, matchedVars, rowIndex});
  731. }
  732. };
  733. }
  734. TVector<std::pair<TStringBuf, TProgramBuilder::TBinaryLambda>> getMeasures(measureNames->ChildrenSize());
  735. for (size_t i = 0; i != measureNames->ChildrenSize(); ++i) {
  736. getMeasures[i] = std::pair{
  737. measureNames->ChildRef(i)->Content(),
  738. [i, measures, &ctx](TRuntimeNode data, TRuntimeNode matchedVars) {
  739. return MkqlBuildLambda(*measures->ChildRef(FirstMeasureLambdaIndex + i), ctx,
  740. {data, matchedVars});
  741. }
  742. };
  743. }
  744. const auto streamingMode = FromString<bool>(settings->Child(0)->Child(1)->Content());
  745. return ctx.ProgramBuilder.MatchRecognizeCore(
  746. MkqlBuildExpr(*inputStream, ctx),
  747. getPartitionKeySelector,
  748. partitionColumnNames,
  749. getMeasures,
  750. NYql::NMatchRecognize::ConvertPattern(pattern, ctx.ExprCtx),
  751. getDefines,
  752. streamingMode
  753. );
  754. });
  755. AddCallable("TimeOrderRecover", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  756. const auto inputStream = node.Child(0);
  757. const auto timeExtractor = node.Child(1);
  758. const auto delay = node.Child(2);
  759. const auto ahead = node.Child(3);
  760. const auto rowLimit = node.Child(4);
  761. return ctx.ProgramBuilder.TimeOrderRecover(
  762. MkqlBuildExpr(*inputStream, ctx),
  763. [timeExtractor, &ctx](TRuntimeNode row) {
  764. return MkqlBuildLambda(*timeExtractor, ctx, {row});
  765. },
  766. MkqlBuildExpr(*delay, ctx),
  767. MkqlBuildExpr(*ahead, ctx),
  768. MkqlBuildExpr(*rowLimit, ctx)
  769. );
  770. });
  771. AddCallable("Guess", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  772. const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
  773. auto type = node.Head().GetTypeAnn();
  774. if (type->GetKind() == ETypeAnnotationKind::Optional) {
  775. type = type->Cast<TOptionalExprType>()->GetItemType();
  776. }
  777. auto varType = type->Cast<TVariantExprType>();
  778. if (varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
  779. auto index = FromString<ui32>(node.Child(1)->Content());
  780. return ctx.ProgramBuilder.Guess(variantObj, index);
  781. } else {
  782. return ctx.ProgramBuilder.Guess(variantObj, node.Child(1)->Content());
  783. }
  784. });
  785. AddCallable("Visit", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  786. const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
  787. const auto type = node.Head().GetTypeAnn()->Cast<TVariantExprType>();
  788. const TTupleExprType* tupleType = nullptr;
  789. const TStructExprType* structType = nullptr;
  790. std::vector<TExprNode*> lambdas;
  791. TRuntimeNode defaultValue;
  792. if (type->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
  793. tupleType = type->GetUnderlyingType()->Cast<TTupleExprType>();
  794. lambdas.resize(tupleType->GetSize());
  795. } else {
  796. structType = type->GetUnderlyingType()->Cast<TStructExprType>();
  797. lambdas.resize(structType->GetSize());
  798. }
  799. for (ui32 index = 1; index < node.ChildrenSize(); ++index) {
  800. const auto child = node.Child(index);
  801. if (!child->IsAtom()) {
  802. defaultValue = MkqlBuildExpr(*child, ctx);
  803. continue;
  804. }
  805. ui32 itemIndex;
  806. if (tupleType) {
  807. itemIndex = FromString<ui32>(child->Content());
  808. } else {
  809. itemIndex = *structType->FindItem(child->Content());
  810. }
  811. YQL_ENSURE(itemIndex < lambdas.size());
  812. ++index;
  813. lambdas[itemIndex] = node.Child(index);
  814. }
  815. const auto handler = [&](ui32 index, TRuntimeNode item) {
  816. if (const auto lambda = lambdas[index]) {
  817. return MkqlBuildLambda(*lambda, ctx, {item});
  818. }
  819. return defaultValue;
  820. };
  821. return ctx.ProgramBuilder.VisitAll(variantObj, handler);
  822. });
  823. AddCallable("CurrentActorId", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  824. const auto retType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  825. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  826. return TRuntimeNode(call.Build(), false);
  827. });
  828. AddCallable("Uint8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  829. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui8>(node.Head(), NUdf::EDataSlot::Uint8));
  830. });
  831. AddCallable("Int8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  832. return ctx.ProgramBuilder.NewDataLiteral(FromString<i8>(node.Head(), NUdf::EDataSlot::Int8));
  833. });
  834. AddCallable("Uint16", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  835. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui16>(node.Head(), NUdf::EDataSlot::Uint16));
  836. });
  837. AddCallable("Int16", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  838. return ctx.ProgramBuilder.NewDataLiteral(FromString<i16>(node.Head(), NUdf::EDataSlot::Int16));
  839. });
  840. AddCallable("Int32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  841. return ctx.ProgramBuilder.NewDataLiteral(FromString<i32>(node.Head(), NUdf::EDataSlot::Int32));
  842. });
  843. AddCallable("Uint32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  844. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui32>(node.Head(), NUdf::EDataSlot::Uint32));
  845. });
  846. AddCallable("Int64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  847. return ctx.ProgramBuilder.NewDataLiteral(FromString<i64>(node.Head(), NUdf::EDataSlot::Int64));
  848. });
  849. AddCallable("Uint64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  850. return ctx.ProgramBuilder.NewDataLiteral(FromString<ui64>(node.Head(), NUdf::EDataSlot::Uint64));
  851. });
  852. AddCallable("String", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  853. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content());
  854. });
  855. AddCallable("Utf8", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  856. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Utf8>(node.Head().Content());
  857. });
  858. AddCallable("Yson", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  859. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(node.Head().Content());
  860. });
  861. AddCallable("Json", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  862. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Json>(node.Head().Content());
  863. });
  864. AddCallable("JsonDocument", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  865. // NOTE: ValueFromString returns TUnboxedValuePod. This type does not free string inside it during destruction.
  866. // To get smart pointer-like behaviour we convert TUnboxedValuePod to TUnboxedValue. Without this conversion there
  867. // will be a memory leak.
  868. NUdf::TUnboxedValue jsonDocument = ValueFromString(NUdf::EDataSlot::JsonDocument, node.Head().Content());
  869. MKQL_ENSURE(bool(jsonDocument), "Invalid JsonDocument literal");
  870. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::JsonDocument>(jsonDocument.AsStringRef());
  871. });
  872. AddCallable("Uuid", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  873. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Uuid>(node.Head().Content());
  874. });
  875. AddCallable("Decimal", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  876. const auto precision = FromString<ui8>(node.Child(1)->Content());
  877. const auto scale = FromString<ui8>(node.Child(2)->Content());
  878. MKQL_ENSURE(precision > 0, "Precision must be positive.");
  879. MKQL_ENSURE(scale <= precision, "Scale too large.");
  880. const auto data = NDecimal::FromString(node.Head().Content(), precision, scale);
  881. MKQL_ENSURE(!NDecimal::IsError(data), "Bad decimal.");
  882. return ctx.ProgramBuilder.NewDecimalLiteral(data, precision, scale);
  883. });
  884. AddCallable("Bool", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  885. return ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(node.Head(), NUdf::EDataSlot::Bool));
  886. });
  887. AddCallable("Float", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  888. return ctx.ProgramBuilder.NewDataLiteral(FromString<float>(node.Head(), NUdf::EDataSlot::Float));
  889. });
  890. AddCallable("Double", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  891. return ctx.ProgramBuilder.NewDataLiteral(FromString<double>(node.Head(), NUdf::EDataSlot::Double));
  892. });
  893. AddCallable("DyNumber", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  894. const NUdf::TUnboxedValue val = ValueFromString(NUdf::EDataSlot::DyNumber, node.Head().Content());
  895. MKQL_ENSURE(val, "Bad DyNumber: " << TString(node.Head().Content()).Quote());
  896. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::DyNumber>(val.AsStringRef());
  897. });
  898. AddCallable("Date", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  899. const auto value = FromString<ui16>(node.Head(), NUdf::EDataSlot::Date);
  900. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Date>(
  901. NUdf::TStringRef((const char*)&value, sizeof(value)));
  902. });
  903. AddCallable("Datetime", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  904. const auto value = FromString<ui32>(node.Head(), NUdf::EDataSlot::Datetime);
  905. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Datetime>(
  906. NUdf::TStringRef((const char*)&value, sizeof(value)));
  907. });
  908. AddCallable("Timestamp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  909. const auto value = FromString<ui64>(node.Head(), NUdf::EDataSlot::Timestamp);
  910. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Timestamp>(
  911. NUdf::TStringRef((const char*)&value, sizeof(value)));
  912. });
  913. AddCallable("Interval", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  914. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Interval);
  915. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(
  916. NUdf::TStringRef((const char*)&value, sizeof(value)));
  917. });
  918. AddCallable("TzDate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  919. const auto& parts = CutTimezone<ui16>(node.Head().Content());
  920. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDate>(parts.first, parts.second);
  921. });
  922. AddCallable("TzDatetime", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  923. const auto& parts = CutTimezone<ui32>(node.Head().Content());
  924. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzDatetime>(parts.first, parts.second);
  925. });
  926. AddCallable("TzTimestamp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  927. const auto& parts = CutTimezone<ui64>(node.Head().Content());
  928. return ctx.ProgramBuilder.NewTzDataLiteral<NUdf::TTzTimestamp>(parts.first, parts.second);
  929. });
  930. AddCallable("Date32", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  931. const auto value = FromString<i32>(node.Head(), NUdf::EDataSlot::Date32);
  932. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Date32>(
  933. NUdf::TStringRef((const char*)&value, sizeof(value)));
  934. });
  935. AddCallable("Datetime64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  936. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Datetime64);
  937. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Datetime64>(
  938. NUdf::TStringRef((const char*)&value, sizeof(value)));
  939. });
  940. AddCallable("Timestamp64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  941. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Timestamp64);
  942. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Timestamp64>(
  943. NUdf::TStringRef((const char*)&value, sizeof(value)));
  944. });
  945. AddCallable("Interval64", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  946. const auto value = FromString<i64>(node.Head(), NUdf::EDataSlot::Interval64);
  947. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Interval64>(
  948. NUdf::TStringRef((const char*)&value, sizeof(value)));
  949. });
  950. AddCallable("FoldMap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  951. const auto list = MkqlBuildExpr(node.Head(), ctx);
  952. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  953. return ctx.ProgramBuilder.ChainMap(list, state, [&](TRuntimeNode item, TRuntimeNode state) {
  954. return MkqlBuildSplitLambda(*node.Child(2), ctx, {item, state});
  955. });
  956. });
  957. AddCallable("Fold1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  958. const auto list = MkqlBuildExpr(node.Head(), ctx);
  959. return ctx.ProgramBuilder.Chain1Map(list, [&](TRuntimeNode item) {
  960. return MkqlBuildSplitLambda(*node.Child(1), ctx, {item});
  961. }, [&](TRuntimeNode item, TRuntimeNode state) {
  962. return MkqlBuildSplitLambda(*node.Child(2), ctx, {item, state});
  963. });
  964. });
  965. AddCallable("Chain1Map", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  966. const auto list = MkqlBuildExpr(node.Head(), ctx);
  967. return ctx.ProgramBuilder.Chain1Map(list,
  968. [&](TRuntimeNode item) -> std::array<TRuntimeNode, 2U> {
  969. const auto out = MkqlBuildLambda(*node.Child(1), ctx, {item});
  970. return {{out, out}};
  971. }, [&](TRuntimeNode item, TRuntimeNode state) -> std::array<TRuntimeNode, 2U> {
  972. const auto out = MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  973. return {{out, out}};
  974. });
  975. });
  976. AddCallable("Extract", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  977. const auto list = MkqlBuildExpr(node.Head(), ctx);
  978. const auto name = node.Tail().Content();
  979. return ctx.ProgramBuilder.Extract(list, name);
  980. });
  981. AddCallable("OrderedExtract", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  982. const auto list = MkqlBuildExpr(node.Head(), ctx);
  983. const auto name = node.Child(1)->Content();
  984. return ctx.ProgramBuilder.OrderedExtract(list, name);
  985. });
  986. AddCallable("Fold", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  987. const auto list = MkqlBuildExpr(node.Head(), ctx);
  988. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  989. return ctx.ProgramBuilder.Fold(list, state, [&](TRuntimeNode item, TRuntimeNode state) {
  990. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  991. });
  992. });
  993. AddCallable("MapNext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  994. const auto list = MkqlBuildExpr(node.Head(), ctx);
  995. return ctx.ProgramBuilder.MapNext(list, [&](TRuntimeNode item, TRuntimeNode nextItem) {
  996. return MkqlBuildLambda(node.Tail(), ctx, {item, nextItem});
  997. });
  998. });
  999. AddCallable("Fold1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1000. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1001. return ctx.ProgramBuilder.Fold1(list, [&](TRuntimeNode item) {
  1002. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1003. }, [&](TRuntimeNode item, TRuntimeNode state) {
  1004. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1005. });
  1006. });
  1007. AddCallable("Condense", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1008. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1009. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1010. return ctx.ProgramBuilder.Condense(stream, state,
  1011. [&](TRuntimeNode item, TRuntimeNode state) {
  1012. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1013. },
  1014. [&](TRuntimeNode item, TRuntimeNode state) {
  1015. return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
  1016. }, HasContextFuncs(*node.Child(3)));
  1017. });
  1018. AddCallable("Condense1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1019. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1020. return ctx.ProgramBuilder.Condense1(stream,
  1021. [&](TRuntimeNode item) {
  1022. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1023. },
  1024. [&](TRuntimeNode item, TRuntimeNode state) {
  1025. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1026. },
  1027. [&](TRuntimeNode item, TRuntimeNode state) {
  1028. return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
  1029. }, HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
  1030. });
  1031. AddCallable("Squeeze", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1032. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1033. const auto state = MkqlBuildExpr(*node.Child(1), ctx);
  1034. return ctx.ProgramBuilder.Squeeze(stream, state, [&](TRuntimeNode item, TRuntimeNode state) {
  1035. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1036. }, node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1037. return MkqlBuildLambda(*node.Child(3), ctx, {state});
  1038. }, node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1039. return MkqlBuildLambda(*node.Child(4), ctx, {state});
  1040. });
  1041. });
  1042. AddCallable("Squeeze1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1043. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1044. return ctx.ProgramBuilder.Squeeze1(stream, [&](TRuntimeNode item) {
  1045. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1046. }, [&](TRuntimeNode item, TRuntimeNode state) {
  1047. return MkqlBuildLambda(*node.Child(2), ctx, {item, state});
  1048. }, node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1049. return MkqlBuildLambda(*node.Child(3), ctx, {state});
  1050. }, node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1051. return MkqlBuildLambda(*node.Child(4), ctx, {state});
  1052. });
  1053. });
  1054. AddCallable("Sort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1055. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1056. const auto ascending = MkqlBuildExpr(*node.Child(1), ctx);
  1057. return ctx.ProgramBuilder.Sort(list, ascending, [&](TRuntimeNode item) {
  1058. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1059. });
  1060. });
  1061. AddCallable({"Top", "TopSort"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1062. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1063. const auto count = MkqlBuildExpr(*node.Child(1), ctx);
  1064. const auto ascending = MkqlBuildExpr(*node.Child(2), ctx);
  1065. return (ctx.ProgramBuilder.*(node.Content() == "Top" ? &TProgramBuilder::Top : &TProgramBuilder::TopSort))
  1066. (list, count, ascending, [&](TRuntimeNode item) {
  1067. return MkqlBuildLambda(*node.Child(3), ctx, {item});
  1068. });
  1069. });
  1070. AddCallable("KeepTop", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1071. const auto count = MkqlBuildExpr(node.Head(), ctx);
  1072. const auto list = MkqlBuildExpr(*node.Child(1), ctx);
  1073. const auto item = MkqlBuildExpr(*node.Child(2), ctx);
  1074. const auto ascending = MkqlBuildExpr(*node.Child(3), ctx);
  1075. return ctx.ProgramBuilder.KeepTop(count, list, item, ascending, [&](TRuntimeNode item) {
  1076. return MkqlBuildLambda(*node.Child(4), ctx, {item});
  1077. });
  1078. });
  1079. AddCallable("Struct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1080. const auto structType = BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), ctx.ProgramBuilder);
  1081. const auto verifiedStructType = AS_TYPE(TStructType, structType);
  1082. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  1083. members.reserve(verifiedStructType->GetMembersCount());
  1084. node.ForEachChild([&](const TExprNode& child) {
  1085. members.emplace_back(child.Head().Content(), MkqlBuildExpr(child.Tail(), ctx));
  1086. });
  1087. return ctx.ProgramBuilder.NewStruct(verifiedStructType, members);
  1088. });
  1089. AddCallable("AddMember", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1090. const auto structObj = MkqlBuildExpr(node.Head(), ctx);
  1091. const auto memberName = node.Child(1)->Content();
  1092. const auto value = MkqlBuildExpr(node.Tail(), ctx);
  1093. return ctx.ProgramBuilder.AddMember(structObj, memberName, value);
  1094. });
  1095. AddCallable("List", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1096. const auto listType = BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), ctx.ProgramBuilder);
  1097. const auto itemType = AS_TYPE(TListType, listType)->GetItemType();
  1098. const auto& items = GetArgumentsFrom<1U>(node, ctx);
  1099. return ctx.ProgramBuilder.NewList(itemType, items);
  1100. });
  1101. AddCallable("FromString", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1102. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1103. const auto type = BuildType(node.Head(), *node.GetTypeAnn(), ctx.ProgramBuilder);
  1104. return ctx.ProgramBuilder.FromString(arg, type);
  1105. });
  1106. AddCallable("StrictFromString", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1107. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1108. const auto type = BuildType(node.Head(), *node.GetTypeAnn(), ctx.ProgramBuilder);
  1109. return ctx.ProgramBuilder.StrictFromString(arg, type);
  1110. });
  1111. AddCallable("FromBytes", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1112. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1113. const auto schemeType = ParseDataType(node, node.Tail().Content());
  1114. return ctx.ProgramBuilder.FromBytes(arg, schemeType);
  1115. });
  1116. AddCallable("Convert", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1117. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1118. const auto type = BuildType(node.Head(), *node.GetTypeAnn(), ctx.ProgramBuilder);
  1119. return ctx.ProgramBuilder.Convert(arg, type);
  1120. });
  1121. AddCallable("ToIntegral", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1122. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1123. const auto type = BuildType(node.Head(), *node.GetTypeAnn(), ctx.ProgramBuilder);
  1124. return ctx.ProgramBuilder.ToIntegral(arg, type);
  1125. });
  1126. AddCallable("UnsafeTimestampCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1127. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1128. const auto type = BuildType(node.Head(), *node.GetTypeAnn(), ctx.ProgramBuilder);
  1129. return ctx.ProgramBuilder.Convert(arg, type);
  1130. });
  1131. AddCallable("SafeCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1132. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1133. const auto type = BuildType(node.Head(), *node.GetTypeAnn(), ctx.ProgramBuilder);
  1134. return ctx.ProgramBuilder.Cast(arg, type);
  1135. });
  1136. AddCallable("Default", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1137. const auto type = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1138. return ctx.ProgramBuilder.Default(type);
  1139. });
  1140. AddCallable("Coalesce", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1141. auto ret = MkqlBuildExpr(node.Head(), ctx);
  1142. for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
  1143. auto value = MkqlBuildExpr(*node.Child(i), ctx);
  1144. ret = ctx.ProgramBuilder.Coalesce(ret, value);
  1145. }
  1146. return ret;
  1147. });
  1148. AddCallable("Unwrap", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1149. const auto opt = MkqlBuildExpr(node.Head(), ctx);
  1150. const auto message = node.ChildrenSize() > 1 ? MkqlBuildExpr(node.Tail(), ctx) : ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  1151. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1152. return ctx.ProgramBuilder.Unwrap(opt, message, pos.File, pos.Row, pos.Column);
  1153. });
  1154. AddCallable("EmptyFrom", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1155. const auto type = BuildType(node.Head(), *node.GetTypeAnn(), ctx.ProgramBuilder);
  1156. switch (node.GetTypeAnn()->GetKind()) {
  1157. case ETypeAnnotationKind::Flow:
  1158. case ETypeAnnotationKind::Stream:
  1159. return ctx.ProgramBuilder.EmptyIterator(type);
  1160. case ETypeAnnotationKind::Optional:
  1161. return ctx.ProgramBuilder.NewEmptyOptional(type);
  1162. case ETypeAnnotationKind::List:
  1163. return ctx.ProgramBuilder.NewEmptyList(AS_TYPE(TListType, type)->GetItemType());
  1164. case ETypeAnnotationKind::Dict:
  1165. return ctx.ProgramBuilder.NewDict(type, {});
  1166. default:
  1167. ythrow TNodeException(node) << "Empty from " << *node.GetTypeAnn() << " isn't supported.";
  1168. }
  1169. });
  1170. AddCallable("Nothing", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1171. const auto optType = BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), ctx.ProgramBuilder);
  1172. return ctx.ProgramBuilder.NewEmptyOptional(optType);
  1173. });
  1174. AddCallable("Unpickle", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1175. const auto type = BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), ctx.ProgramBuilder);
  1176. const auto serialized = MkqlBuildExpr(node.Tail(), ctx);
  1177. return ctx.ProgramBuilder.Unpickle(type, serialized);
  1178. });
  1179. AddCallable("Optional", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1180. const auto optType = BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), ctx.ProgramBuilder);
  1181. const auto arg = MkqlBuildExpr(node.Tail(), ctx);
  1182. return ctx.ProgramBuilder.NewOptional(optType, arg);
  1183. });
  1184. AddCallable("Iterator", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1185. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1186. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  1187. return ctx.ProgramBuilder.Iterator(arg, args);
  1188. });
  1189. AddCallable("EmptyIterator", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1190. const auto streamType = BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), ctx.ProgramBuilder);
  1191. return ctx.ProgramBuilder.EmptyIterator(streamType);
  1192. });
  1193. AddCallable("Switch", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1194. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1195. std::vector<TSwitchInput> inputs;
  1196. ui64 memoryLimitBytes = FromString<ui64>(node.Child(1)->Content());
  1197. ui32 offset = 0;
  1198. for (ui32 i = 2; i < node.ChildrenSize(); i += 2) {
  1199. TSwitchInput input;
  1200. for (auto& child : node.Child(i)->Children()) {
  1201. input.Indicies.push_back(FromString<ui32>(child->Content()));
  1202. }
  1203. const auto& lambda = *node.Child(i + 1);
  1204. const auto& lambdaArg = lambda.Head().Head();
  1205. auto outputStreams = 1;
  1206. const auto& streamItemType = GetSeqItemType(*lambda.Tail().GetTypeAnn());
  1207. if (streamItemType.GetKind() == ETypeAnnotationKind::Variant) {
  1208. outputStreams = streamItemType.Cast<TVariantExprType>()->GetUnderlyingType()->Cast<TTupleExprType>()->GetSize();
  1209. }
  1210. if (node.ChildrenSize() > 4 || outputStreams != 1) {
  1211. input.ResultVariantOffset = offset;
  1212. }
  1213. offset += outputStreams;
  1214. input.InputType = BuildType(lambdaArg, *lambdaArg.GetTypeAnn(), ctx.ProgramBuilder);
  1215. inputs.emplace_back(input);
  1216. }
  1217. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1218. return ctx.ProgramBuilder.Switch(stream, inputs, [&](ui32 index, TRuntimeNode item) -> TRuntimeNode {
  1219. return MkqlBuildLambda(*node.Child(2 + 2 * index + 1), ctx, {item});
  1220. }, memoryLimitBytes, returnType);
  1221. });
  1222. AddCallable("ToStream", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1223. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1224. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  1225. return ctx.ProgramBuilder.Iterator(ctx.ProgramBuilder.ToList(arg), args);
  1226. });
  1227. AddCallable(LeftName, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1228. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1229. return ctx.ProgramBuilder.Nth(arg, 0);
  1230. });
  1231. AddCallable(RightName, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1232. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1233. return ctx.ProgramBuilder.Nth(arg, 1);
  1234. });
  1235. AddCallable("FilterNullMembers", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1236. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1237. if (node.ChildrenSize() < 2U) {
  1238. return ctx.ProgramBuilder.FilterNullMembers(list);
  1239. } else {
  1240. std::vector<std::string_view> members;
  1241. members.reserve(node.Tail().ChildrenSize());
  1242. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Content()); });
  1243. return ctx.ProgramBuilder.FilterNullMembers(list, members);
  1244. }
  1245. });
  1246. AddCallable("SkipNullMembers", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1247. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1248. if (node.ChildrenSize() < 2U) {
  1249. return ctx.ProgramBuilder.SkipNullMembers(list);
  1250. } else {
  1251. std::vector<std::string_view> members;
  1252. members.reserve(node.Tail().ChildrenSize());
  1253. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Content()); });
  1254. return ctx.ProgramBuilder.SkipNullMembers(list, members);
  1255. }
  1256. });
  1257. AddCallable("FilterNullElements", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1258. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1259. if (node.ChildrenSize() < 2U) {
  1260. return ctx.ProgramBuilder.FilterNullElements(list);
  1261. } else {
  1262. std::vector<ui32> members;
  1263. members.reserve(node.Tail().ChildrenSize());
  1264. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(FromString<ui32>(child.Content())); });
  1265. return ctx.ProgramBuilder.FilterNullElements(list, members);
  1266. }
  1267. });
  1268. AddCallable("SkipNullElements", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1269. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1270. if (node.ChildrenSize() < 2U) {
  1271. return ctx.ProgramBuilder.SkipNullElements(list);
  1272. } else {
  1273. std::vector<ui32> members;
  1274. members.reserve(node.Tail().ChildrenSize());
  1275. node.Tail().ForEachChild([&](const TExprNode& child){ members.emplace_back(FromString<ui32>(child.Content())); });
  1276. return ctx.ProgramBuilder.SkipNullElements(list, members);
  1277. }
  1278. });
  1279. AddCallable("MapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1280. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1281. const auto dict = MkqlBuildExpr(*node.Child(1), ctx);
  1282. const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
  1283. const auto& outputItemType = GetSeqItemType(*node.GetTypeAnn());
  1284. auto rightItemType = node.Child(1U)->GetTypeAnn()->Cast<TDictExprType>()->GetPayloadType();
  1285. if (ETypeAnnotationKind::List == rightItemType->GetKind()) {
  1286. rightItemType = rightItemType->Cast<TListExprType>()->GetItemType();
  1287. }
  1288. std::vector<ui32> leftKeyColumns, leftRenames, rightRenames;
  1289. switch (const auto& inputItemType = GetSeqItemType(*node.Head().GetTypeAnn()); inputItemType.GetKind()) {
  1290. case ETypeAnnotationKind::Struct: {
  1291. const auto inputStructType = inputItemType.Cast<TStructExprType>();
  1292. const auto outputStructType = outputItemType.Cast<TStructExprType>();
  1293. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1294. bool s = false;
  1295. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputStructType : *outputStructType, child.Content())); });
  1296. switch (rightItemType->GetKind()) {
  1297. case ETypeAnnotationKind::Struct: {
  1298. const auto rightStructType = rightItemType->Cast<TStructExprType>();
  1299. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1300. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightStructType : *outputStructType, child.Content())); });
  1301. }
  1302. break;
  1303. case ETypeAnnotationKind::Tuple: {
  1304. const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
  1305. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1306. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightTupleType, child.Content()) : *GetFieldPosition(*outputStructType, child.Content())); });
  1307. }
  1308. break;
  1309. default:
  1310. MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
  1311. }
  1312. break;
  1313. }
  1314. case ETypeAnnotationKind::Tuple: {
  1315. const auto inputTupleType = inputItemType.Cast<TTupleExprType>();
  1316. const auto outputTupleType = outputItemType.Cast<TTupleExprType>();
  1317. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1318. bool s = false;
  1319. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputTupleType : *outputTupleType, child.Content())); });
  1320. switch (rightItemType->GetKind()) {
  1321. case ETypeAnnotationKind::Tuple: {
  1322. const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
  1323. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1324. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightTupleType : *outputTupleType, child.Content())); });
  1325. }
  1326. break;
  1327. case ETypeAnnotationKind::Struct: {
  1328. const auto rightStructType = rightItemType->Cast<TStructExprType>();
  1329. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1330. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightStructType, child.Content()) : *GetFieldPosition(*outputTupleType, child.Content())); });
  1331. }
  1332. break;
  1333. default:
  1334. MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
  1335. }
  1336. break;
  1337. }
  1338. case ETypeAnnotationKind::Multi: {
  1339. const auto inputMultiType = inputItemType.Cast<TMultiExprType>();
  1340. const auto outputMultiType = outputItemType.Cast<TMultiExprType>();
  1341. node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1342. bool s = false;
  1343. node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *inputMultiType : *outputMultiType, child.Content())); });
  1344. switch (rightItemType->GetKind()) {
  1345. case ETypeAnnotationKind::Tuple: {
  1346. const auto rightTupleType = rightItemType->Cast<TTupleExprType>();
  1347. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1348. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightTupleType, child.Content()) : *GetFieldPosition(*outputMultiType, child.Content())); });
  1349. }
  1350. break;
  1351. case ETypeAnnotationKind::Struct: {
  1352. const auto rightStructType = rightItemType->Cast<TStructExprType>();
  1353. node.Child(6)->ForEachChild([&](const TExprNode& child){
  1354. rightRenames.emplace_back((s = !s) ? *GetFieldPosition(*rightStructType, child.Content()) : *GetFieldPosition(*outputMultiType, child.Content())); });
  1355. }
  1356. break;
  1357. default:
  1358. MKQL_ENSURE(!node.Child(6)->ChildrenSize(), "Expected empty right output columns.");
  1359. }
  1360. break;
  1361. }
  1362. default:
  1363. ythrow TNodeException(node) << "Wrong MapJoinCore input item type: " << inputItemType;
  1364. }
  1365. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1366. return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType);
  1367. });
  1368. AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1369. bool selfJoin = node.Content() == "GraceSelfJoinCore";
  1370. int shift = selfJoin ? 0 : 1;
  1371. const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx);
  1372. const auto flowRight = MkqlBuildExpr(*node.Child(shift), ctx);
  1373. const auto joinKind = GetJoinKind(node, node.Child(shift + 1)->Content());
  1374. const auto& outputItemType = GetSeqItemType(*node.GetTypeAnn());
  1375. std::vector<ui32> leftKeyColumns, rightKeyColumns, leftRenames, rightRenames;
  1376. const auto& leftItemType = GetSeqItemType(*node.Child(0)->GetTypeAnn());
  1377. const auto& rightItemType = GetSeqItemType(*node.Child(shift)->GetTypeAnn());
  1378. if (leftItemType.GetKind() != ETypeAnnotationKind::Multi ||
  1379. rightItemType.GetKind() != ETypeAnnotationKind::Multi ) {
  1380. ythrow TNodeException(node) << "Wrong GraceJoinCore input item type: " << leftItemType << " " << rightItemType;
  1381. }
  1382. if (outputItemType.GetKind() != ETypeAnnotationKind::Multi ) {
  1383. ythrow TNodeException(node) << "Wrong GraceJoinCore output item type: " << outputItemType;
  1384. }
  1385. const auto leftTupleType = leftItemType.Cast<TMultiExprType>();
  1386. const auto rightTupleType = rightItemType.Cast<TMultiExprType>();
  1387. const auto outputTupleType = outputItemType.Cast<TMultiExprType>();
  1388. node.Child(shift + 2)->ForEachChild([&](TExprNode& child){
  1389. leftKeyColumns.emplace_back(*GetFieldPosition(*leftTupleType, child.Content()));
  1390. });
  1391. node.Child(shift + 3)->ForEachChild([&](TExprNode& child){
  1392. rightKeyColumns.emplace_back(*GetFieldPosition(*rightTupleType, child.Content())); });
  1393. bool s = false;
  1394. node.Child(shift + 4)->ForEachChild([&](TExprNode& child){
  1395. leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *leftTupleType : *outputTupleType, child.Content())); });
  1396. s = false;
  1397. node.Child(shift + 5)->ForEachChild([&](TExprNode& child){
  1398. rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightTupleType : *outputTupleType, child.Content())); });
  1399. auto anyJoinSettings = EAnyJoinSettings::None;
  1400. node.Tail().ForEachChild([&](const TExprNode& flag) {
  1401. if (flag.IsAtom("LeftAny"))
  1402. anyJoinSettings = EAnyJoinSettings::Right == anyJoinSettings ? EAnyJoinSettings::Both : EAnyJoinSettings::Left;
  1403. else if (flag.IsAtom("RightAny"))
  1404. anyJoinSettings = EAnyJoinSettings::Left == anyJoinSettings ? EAnyJoinSettings::Both : EAnyJoinSettings::Right;
  1405. });
  1406. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1407. return selfJoin
  1408. ? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
  1409. : ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
  1410. });
  1411. AddCallable("CommonJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1412. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1413. const auto joinKind = GetJoinKind(node, node.Child(1)->Content());
  1414. std::vector<ui32> leftColumns, rightColumns, requiredColumns, keyColumns;
  1415. ui32 tableIndexFieldPos;
  1416. switch (const auto& inputItemType = GetSeqItemType(*node.Head().GetTypeAnn()); inputItemType.GetKind()) {
  1417. case ETypeAnnotationKind::Struct: {
  1418. const auto inputStructType = inputItemType.Cast<TStructExprType>();
  1419. const auto outputStructType = GetSeqItemType(*node.GetTypeAnn()).Cast<TStructExprType>();
  1420. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1421. leftColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content()));
  1422. leftColumns.emplace_back(*GetFieldPosition(*outputStructType, child.Content()));
  1423. });
  1424. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1425. rightColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content()));
  1426. rightColumns.emplace_back(*GetFieldPosition(*outputStructType, child.Content()));
  1427. });
  1428. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1429. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputStructType, child.Content())); });
  1430. tableIndexFieldPos = *GetFieldPosition(*inputStructType, node.Tail().Content());
  1431. break;
  1432. }
  1433. case ETypeAnnotationKind::Tuple: {
  1434. const auto inputTupleType = inputItemType.Cast<TTupleExprType>();
  1435. ui32 i = 0U;
  1436. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1437. leftColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content()));
  1438. leftColumns.emplace_back(i++);
  1439. });
  1440. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1441. rightColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content()));
  1442. rightColumns.emplace_back(i++);
  1443. });
  1444. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1445. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputTupleType, child.Content())); });
  1446. tableIndexFieldPos = *GetFieldPosition(*inputTupleType, node.Tail().Content());
  1447. break;
  1448. }
  1449. case ETypeAnnotationKind::Multi: {
  1450. const auto inputMultiType = inputItemType.Cast<TMultiExprType>();
  1451. ui32 i = 0U;
  1452. node.Child(2)->ForEachChild([&](const TExprNode& child){
  1453. leftColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content()));
  1454. leftColumns.emplace_back(i++);
  1455. });
  1456. node.Child(3)->ForEachChild([&](const TExprNode& child){
  1457. rightColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content()));
  1458. rightColumns.emplace_back(i++);
  1459. });
  1460. node.Child(4)->ForEachChild([&](const TExprNode& child){ requiredColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1461. node.Child(5)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetFieldPosition(*inputMultiType, child.Content())); });
  1462. tableIndexFieldPos = *GetFieldPosition(*inputMultiType, node.Tail().Content());
  1463. break;
  1464. }
  1465. default:
  1466. ythrow TNodeException(node) << "Wrong CommonJoinCore input item type: " << inputItemType;
  1467. }
  1468. ui64 memLimit = 0U;
  1469. if (const auto memLimitSetting = GetSetting(*node.Child(6), "memLimit")) {
  1470. memLimit = FromString<ui64>(memLimitSetting->Child(1)->Content());
  1471. }
  1472. std::optional<ui32> sortedTableOrder;
  1473. if (const auto sortSetting = GetSetting(*node.Child(6), "sorted")) {
  1474. sortedTableOrder = sortSetting->Child(1)->Content() == "left" ? 0 : 1;
  1475. }
  1476. EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None;
  1477. if (const auto anyNode = GetSetting(*node.Child(6), "any")) {
  1478. for (auto sideNode : anyNode->Child(1)->Children()) {
  1479. YQL_ENSURE(sideNode->IsAtom());
  1480. AddAnyJoinSide(anyJoinSettings, sideNode->Content() == "left" ? EAnyJoinSettings::Left : EAnyJoinSettings::Right);
  1481. }
  1482. }
  1483. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1484. return ctx.ProgramBuilder.CommonJoinCore(list, joinKind, leftColumns, rightColumns,
  1485. requiredColumns, keyColumns, memLimit, sortedTableOrder, anyJoinSettings, tableIndexFieldPos, returnType);
  1486. });
  1487. AddCallable("CombineCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1488. NNodes::TCoCombineCore core(&node);
  1489. const auto stream = MkqlBuildExpr(core.Input().Ref(), ctx);
  1490. const auto memLimit = FromString<ui64>(core.MemLimit().Cast().Value());
  1491. const auto keyExtractor = [&](TRuntimeNode item) {
  1492. return MkqlBuildLambda(core.KeyExtractor().Ref(), ctx, {item});
  1493. };
  1494. const auto init = [&](TRuntimeNode key, TRuntimeNode item) {
  1495. return MkqlBuildLambda(core.InitHandler().Ref(), ctx, {key, item});
  1496. };
  1497. const auto update = [&](TRuntimeNode key, TRuntimeNode item, TRuntimeNode state) {
  1498. return MkqlBuildLambda(core.UpdateHandler().Ref(), ctx, {key, item, state});
  1499. };
  1500. const auto finish = [&](TRuntimeNode key, TRuntimeNode state) {
  1501. return MkqlBuildLambda(core.FinishHandler().Ref(), ctx, {key, state});
  1502. };
  1503. return ctx.ProgramBuilder.CombineCore(stream, keyExtractor, init, update, finish, memLimit);
  1504. });
  1505. AddCallable("GroupingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1506. NNodes::TCoGroupingCore core(&node);
  1507. const auto stream = MkqlBuildExpr(core.Input().Ref(), ctx);
  1508. const auto groupSwitch = [&](TRuntimeNode key, TRuntimeNode item) {
  1509. return MkqlBuildLambda(core.GroupSwitch().Ref(), ctx, {key, item});
  1510. };
  1511. const auto keyExtractor = [&](TRuntimeNode item) {
  1512. return MkqlBuildLambda(core.KeyExtractor().Ref(), ctx, {item});
  1513. };
  1514. TProgramBuilder::TUnaryLambda handler;
  1515. if (auto lambda = core.ConvertHandler()) {
  1516. handler = [&](TRuntimeNode item) {
  1517. return MkqlBuildLambda(core.ConvertHandler().Ref(), ctx, {item});
  1518. };
  1519. }
  1520. return ctx.ProgramBuilder.GroupingCore(stream, groupSwitch, keyExtractor, handler);
  1521. });
  1522. AddCallable("Chopper", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1523. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1524. const auto keyExtractor = [&](TRuntimeNode item) {
  1525. return MkqlBuildLambda(*node.Child(1U), ctx, {item});
  1526. };
  1527. const auto groupSwitch = [&](TRuntimeNode key, TRuntimeNode item) {
  1528. return MkqlBuildLambda(*node.Child(2U), ctx, {key, item});
  1529. };
  1530. const auto handler = [&](TRuntimeNode key, TRuntimeNode flow) {
  1531. return MkqlBuildLambda(node.Tail(), ctx, {key, flow});
  1532. };
  1533. return ctx.ProgramBuilder.Chopper(stream, keyExtractor, groupSwitch, handler);
  1534. });
  1535. AddCallable("HoppingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1536. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1537. const auto timeExtractor = [&](TRuntimeNode item) {
  1538. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1539. };
  1540. const auto hop = MkqlBuildExpr(*node.Child(2), ctx);
  1541. const auto interval = MkqlBuildExpr(*node.Child(3), ctx);
  1542. const auto delay = MkqlBuildExpr(*node.Child(4), ctx);
  1543. const auto init = [&](TRuntimeNode item) {
  1544. return MkqlBuildLambda(*node.Child(5), ctx, {item});
  1545. };
  1546. const auto update = [&](TRuntimeNode item, TRuntimeNode state) {
  1547. return MkqlBuildLambda(*node.Child(6), ctx, {item, state});
  1548. };
  1549. const auto save = node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1550. return MkqlBuildLambda(*node.Child(7), ctx, {state});
  1551. };
  1552. const auto load = node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1553. return MkqlBuildLambda(*node.Child(8), ctx, {state});
  1554. };
  1555. const auto merge = [&](TRuntimeNode state1, TRuntimeNode state2) {
  1556. return MkqlBuildLambda(*node.Child(9), ctx, {state1, state2});
  1557. };
  1558. const auto finish = [&](TRuntimeNode state, TRuntimeNode time) {
  1559. return MkqlBuildLambda(*node.Child(10), ctx, {state, time});
  1560. };
  1561. return ctx.ProgramBuilder.HoppingCore(
  1562. stream, timeExtractor, init, update, save, load, merge, finish, hop, interval, delay);
  1563. });
  1564. AddCallable("MultiHoppingCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1565. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1566. const auto keyExtractor = [&](TRuntimeNode item) {
  1567. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1568. };
  1569. const auto timeExtractor = [&](TRuntimeNode item) {
  1570. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1571. };
  1572. const auto hop = MkqlBuildExpr(*node.Child(3), ctx);
  1573. const auto interval = MkqlBuildExpr(*node.Child(4), ctx);
  1574. const auto delay = MkqlBuildExpr(*node.Child(5), ctx);
  1575. const auto dataWatermarks = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(6), NUdf::EDataSlot::Bool));
  1576. const auto init = [&](TRuntimeNode item) {
  1577. return MkqlBuildLambda(*node.Child(7), ctx, {item});
  1578. };
  1579. const auto update = [&](TRuntimeNode item, TRuntimeNode state) {
  1580. return MkqlBuildLambda(*node.Child(8), ctx, {item, state});
  1581. };
  1582. const auto save = node.Child(3)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1583. return MkqlBuildLambda(*node.Child(9), ctx, {state});
  1584. };
  1585. const auto load = node.Child(4)->IsCallable("Void") ? std::function<TRuntimeNode(TRuntimeNode)>() : [&](TRuntimeNode state) {
  1586. return MkqlBuildLambda(*node.Child(10), ctx, {state});
  1587. };
  1588. const auto merge = [&](TRuntimeNode state1, TRuntimeNode state2) {
  1589. return MkqlBuildLambda(*node.Child(11), ctx, {state1, state2});
  1590. };
  1591. const auto finish = [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) {
  1592. return MkqlBuildLambda(*node.Child(12), ctx, {key, state, time});
  1593. };
  1594. const auto watermarksMode = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(13), NUdf::EDataSlot::Bool));
  1595. return ctx.ProgramBuilder.MultiHoppingCore(
  1596. stream, keyExtractor, timeExtractor, init, update, save, load, merge, finish,
  1597. hop, interval, delay, dataWatermarks, watermarksMode);
  1598. });
  1599. AddCallable("ToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1600. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1601. TMaybe<bool> isMany;
  1602. TMaybe<EDictType> type;
  1603. TMaybe<ui64> itemsCount;
  1604. bool isCompact;
  1605. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1606. ythrow TNodeException(node) << error->GetMessage();
  1607. }
  1608. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1609. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::ToHashedDict : &TProgramBuilder::ToSortedDict;
  1610. return (ctx.ProgramBuilder.*factory)(list, *isMany, [&](TRuntimeNode item) {
  1611. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1612. }, [&](TRuntimeNode item) {
  1613. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1614. }, isCompact, itemsCount.GetOrElse(0));
  1615. });
  1616. AddCallable("SqueezeToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1617. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1618. TMaybe<bool> isMany;
  1619. TMaybe<EDictType> type;
  1620. TMaybe<ui64> itemsCount;
  1621. bool isCompact;
  1622. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1623. ythrow TNodeException(node) << error->GetMessage();
  1624. }
  1625. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1626. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::SqueezeToHashedDict : &TProgramBuilder::SqueezeToSortedDict;
  1627. return (ctx.ProgramBuilder.*factory)(stream, *isMany, [&](TRuntimeNode item) {
  1628. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1629. }, [&](TRuntimeNode item) {
  1630. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  1631. }, isCompact, itemsCount.GetOrElse(0));
  1632. });
  1633. AddCallable("NarrowSqueezeToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1634. const auto stream = MkqlBuildExpr(node.Head(), ctx);
  1635. TMaybe<bool> isMany;
  1636. TMaybe<EDictType> type;
  1637. TMaybe<ui64> itemsCount;
  1638. bool isCompact;
  1639. if (const auto error = ParseToDictSettings(node, ctx.ExprCtx, type, isMany, itemsCount, isCompact)) {
  1640. ythrow TNodeException(node) << error->GetMessage();
  1641. }
  1642. *type = SelectDictType(*type, node.Child(1)->GetTypeAnn());
  1643. const auto factory = *type == EDictType::Hashed ? &TProgramBuilder::NarrowSqueezeToHashedDict : &TProgramBuilder::NarrowSqueezeToSortedDict;
  1644. return (ctx.ProgramBuilder.*factory)(stream, *isMany, [&](TRuntimeNode::TList items) {
  1645. return MkqlBuildLambda(*node.Child(1), ctx, items);
  1646. }, [&](TRuntimeNode::TList items) {
  1647. return MkqlBuildLambda(*node.Child(2), ctx, items);
  1648. }, isCompact, itemsCount.GetOrElse(0));
  1649. });
  1650. AddCallable("GroupByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1651. const auto list = MkqlBuildExpr(node.Head(), ctx);
  1652. const auto dict = ctx.ProgramBuilder.ToHashedDict(list, true, [&](TRuntimeNode item) {
  1653. return MkqlBuildLambda(*node.Child(1), ctx, {item});
  1654. }, [&](TRuntimeNode item) {
  1655. return item;
  1656. });
  1657. const auto values = ctx.ProgramBuilder.DictItems(dict);
  1658. return ctx.ProgramBuilder.FlatMap(values, [&](TRuntimeNode item) {
  1659. const auto key = ctx.ProgramBuilder.Nth(item, 0);
  1660. const auto payloadList = ctx.ProgramBuilder.Nth(item, 1);
  1661. return MkqlBuildLambda(*node.Child(2), ctx, {key, payloadList});
  1662. });
  1663. });
  1664. AddCallable("PartitionByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1665. const NNodes::TCoPartitionByKey partition(&node);
  1666. const auto input = MkqlBuildExpr(partition.Input().Ref(), ctx);
  1667. const auto makePartitions = [&](TRuntimeNode list) {
  1668. return ctx.ProgramBuilder.Map(
  1669. ctx.ProgramBuilder.DictItems(ctx.ProgramBuilder.ToHashedDict(list, true,
  1670. [&](TRuntimeNode item) { return MkqlBuildLambda(partition.KeySelectorLambda().Ref(), ctx, {item}); },
  1671. [&](TRuntimeNode item) { return item; }
  1672. )),
  1673. [&](TRuntimeNode pair) {
  1674. const auto payload = partition.SortDirections().Ref().IsCallable("Void") ?
  1675. ctx.ProgramBuilder.Nth(pair, 1):
  1676. ctx.ProgramBuilder.Sort(ctx.ProgramBuilder.Nth(pair, 1), MkqlBuildExpr(partition.SortDirections().Ref(), ctx),
  1677. [&](TRuntimeNode item) {
  1678. return MkqlBuildLambda(partition.SortKeySelectorLambda().Ref(), ctx, {item});
  1679. }
  1680. );
  1681. return ctx.ProgramBuilder.NewTuple({ctx.ProgramBuilder.Nth(pair, 0), ctx.ProgramBuilder.Iterator(payload, {list})});
  1682. }
  1683. );
  1684. };
  1685. switch (const auto kind = partition.Ref().GetTypeAnn()->GetKind()) {
  1686. case ETypeAnnotationKind::Flow:
  1687. case ETypeAnnotationKind::Stream: {
  1688. const auto sorted = ctx.ProgramBuilder.FlatMap(
  1689. ctx.ProgramBuilder.Condense1(input,
  1690. [&](TRuntimeNode item) { return ctx.ProgramBuilder.AsList(item); },
  1691. [&](TRuntimeNode, TRuntimeNode) { return ctx.ProgramBuilder.NewDataLiteral(false); },
  1692. [&](TRuntimeNode item, TRuntimeNode state) { return ctx.ProgramBuilder.Append(state, item); }
  1693. ),
  1694. makePartitions
  1695. );
  1696. return ETypeAnnotationKind::Stream == kind ?MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {sorted}):
  1697. ctx.ProgramBuilder.ToFlow(MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {ctx.ProgramBuilder.FromFlow(sorted)}));
  1698. }
  1699. case ETypeAnnotationKind::List: {
  1700. const auto sorted = ctx.ProgramBuilder.Iterator(makePartitions(input), {});
  1701. return ctx.ProgramBuilder.Collect(MkqlBuildLambda(partition.ListHandlerLambda().Ref(), ctx, {sorted}));
  1702. }
  1703. default: break;
  1704. }
  1705. Y_ABORT("Wrong case.");
  1706. });
  1707. AddCallable("CombineByKey", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1708. return CombineByKeyImpl(node, ctx);
  1709. });
  1710. AddCallable("Enumerate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1711. const auto arg = MkqlBuildExpr(node.Head(), ctx);
  1712. TRuntimeNode start;
  1713. if (node.ChildrenSize() > 1) {
  1714. start = MkqlBuildExpr(*node.Child(1), ctx);
  1715. } else {
  1716. start = ctx.ProgramBuilder.NewDataLiteral<ui64>(0);
  1717. }
  1718. TRuntimeNode step;
  1719. if (node.ChildrenSize() > 2) {
  1720. step = MkqlBuildExpr(node.Tail(), ctx);
  1721. } else {
  1722. step = ctx.ProgramBuilder.NewDataLiteral<ui64>(1);
  1723. }
  1724. return ctx.ProgramBuilder.Enumerate(arg, start, step);
  1725. });
  1726. AddCallable("Dict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1727. const auto listType = BuildType(node.Head(), *node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), ctx.ProgramBuilder);
  1728. const auto dictType = AS_TYPE(TDictType, listType);
  1729. std::vector<std::pair<TRuntimeNode, TRuntimeNode>> items;
  1730. for (size_t i = 1; i < node.ChildrenSize(); ++i) {
  1731. const auto key = MkqlBuildExpr(node.Child(i)->Head(), ctx);
  1732. const auto payload = MkqlBuildExpr(node.Child(i)->Tail(), ctx);
  1733. items.emplace_back(key, payload);
  1734. }
  1735. return ctx.ProgramBuilder.NewDict(dictType, items);
  1736. });
  1737. AddCallable("Variant", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1738. const auto varType = node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TVariantExprType>();
  1739. const auto type = BuildType(*node.Child(2), *varType, ctx.ProgramBuilder);
  1740. const auto item = MkqlBuildExpr(node.Head(), ctx);
  1741. return varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple ?
  1742. ctx.ProgramBuilder.NewVariant(item, FromString<ui32>(node.Child(1)->Content()), type) :
  1743. ctx.ProgramBuilder.NewVariant(item, node.Child(1)->Content(), type);
  1744. });
  1745. AddCallable("AsStruct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1746. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  1747. members.reserve(node.ChildrenSize());
  1748. node.ForEachChild([&](const TExprNode& child){ members.emplace_back(child.Head().Content(), MkqlBuildExpr(child.Tail(), ctx)); });
  1749. return ctx.ProgramBuilder.NewStruct(members);
  1750. });
  1751. AddCallable("AsDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1752. std::vector<std::pair<TRuntimeNode, TRuntimeNode>> items;
  1753. items.reserve(node.ChildrenSize());
  1754. node.ForEachChild([&](const TExprNode& child){ items.emplace_back(MkqlBuildExpr(*child.Child(0), ctx), MkqlBuildExpr(*child.Child(1), ctx)); });
  1755. const auto dictType = ctx.ProgramBuilder.NewDictType(items[0].first.GetStaticType(), items[0].second.GetStaticType(), false);
  1756. return ctx.ProgramBuilder.NewDict(dictType, items);
  1757. });
  1758. AddCallable("Ensure", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1759. const auto value = MkqlBuildExpr(node.Head(), ctx);
  1760. const auto predicate = MkqlBuildExpr(*node.Child(1), ctx);
  1761. const auto message = node.ChildrenSize() > 2 ? MkqlBuildExpr(node.Tail(), ctx) : ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  1762. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1763. return ctx.ProgramBuilder.Ensure(value, predicate, message, pos.File, pos.Row, pos.Column);
  1764. });
  1765. AddCallable("Replicate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1766. const auto value = MkqlBuildExpr(node.Head(), ctx);
  1767. const auto count = MkqlBuildExpr(*node.Child(1), ctx);
  1768. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1769. return ctx.ProgramBuilder.Replicate(value, count, pos.File, pos.Row, pos.Column);
  1770. });
  1771. AddCallable("IfPresent", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1772. TRuntimeNode::TList optionals;
  1773. const auto width = node.ChildrenSize() - 2U;
  1774. optionals.reserve(width);
  1775. auto i = 0U;
  1776. std::generate_n(std::back_inserter(optionals), width, [&](){ return MkqlBuildExpr(*node.Child(i++), ctx); });
  1777. const auto elseBranch = MkqlBuildExpr(node.Tail(), ctx);
  1778. return ctx.ProgramBuilder.IfPresent(optionals, [&](TRuntimeNode::TList items) {
  1779. return MkqlBuildLambda(*node.Child(width), ctx, items);
  1780. }, elseBranch);
  1781. });
  1782. AddCallable({"DataType",
  1783. "ListType",
  1784. "OptionalType",
  1785. "TupleType",
  1786. "StructType",
  1787. "DictType",
  1788. "VoidType",
  1789. "NullType",
  1790. "CallableType",
  1791. "UnitType",
  1792. "GenericType",
  1793. "ResourceType",
  1794. "TaggedType",
  1795. "VariantType",
  1796. "StreamType",
  1797. "FlowType",
  1798. "EmptyListType",
  1799. "EmptyDictType"},
  1800. [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1801. const auto type = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1802. return TRuntimeNode(type, true);
  1803. });
  1804. AddCallable("ParseType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1805. const auto type = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1806. return TRuntimeNode(type, true);
  1807. });
  1808. AddCallable("TypeOf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1809. const auto type = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1810. return TRuntimeNode(type, true);
  1811. });
  1812. AddCallable("EmptyList", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1813. Y_UNUSED(node);
  1814. if (RuntimeVersion < 11) {
  1815. return ctx.ProgramBuilder.NewEmptyList(ctx.ProgramBuilder.NewVoid().GetStaticType());
  1816. } else {
  1817. return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyList(), true);
  1818. }
  1819. });
  1820. AddCallable("EmptyDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1821. Y_UNUSED(node);
  1822. if (RuntimeVersion < 11) {
  1823. auto voidType = ctx.ProgramBuilder.NewVoid().GetStaticType();
  1824. auto dictType = ctx.ProgramBuilder.NewDictType(voidType, voidType, false);
  1825. return ctx.ProgramBuilder.NewDict(dictType, {});
  1826. } else {
  1827. return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyDict(), true);
  1828. }
  1829. });
  1830. AddCallable("SourceOf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1831. const auto type = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1832. return ctx.ProgramBuilder.SourceOf(type);
  1833. });
  1834. AddCallable("TypeHandle", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1835. const auto type = node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  1836. const auto yson = WriteTypeToYson(type);
  1837. const auto retType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1838. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1839. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(yson));
  1840. return TRuntimeNode(call.Build(), false);
  1841. });
  1842. AddCallable("ReprCode", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1843. const auto type = node.Head().GetTypeAnn();
  1844. const auto yson = WriteTypeToYson(type);
  1845. const auto& args = GetAllArguments(node, ctx);
  1846. const auto retType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1847. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1848. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1849. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1850. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1851. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1852. for (auto arg : args) {
  1853. call.Add(arg);
  1854. }
  1855. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::Yson>(yson));
  1856. return TRuntimeNode(call.Build(), false);
  1857. });
  1858. // safe and position unaware
  1859. AddCallable({
  1860. "SerializeTypeHandle",
  1861. "TypeKind",
  1862. "FormatCode",
  1863. "FormatCodeWithPositions",
  1864. "SerializeCode",
  1865. }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1866. const auto& args = GetAllArguments(node, ctx);
  1867. const auto retType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1868. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1869. for (auto arg : args) {
  1870. call.Add(arg);
  1871. }
  1872. return TRuntimeNode(call.Build(), false);
  1873. });
  1874. // with position
  1875. AddCallable({
  1876. "ParseTypeHandle",
  1877. "DataTypeComponents",
  1878. "DataTypeHandle",
  1879. "OptionalItemType",
  1880. "OptionalTypeHandle",
  1881. "ListItemType",
  1882. "ListTypeHandle",
  1883. "StreamItemType",
  1884. "StreamTypeHandle",
  1885. "TupleTypeComponents",
  1886. "TupleTypeHandle",
  1887. "StructTypeComponents",
  1888. "StructTypeHandle",
  1889. "DictTypeComponents",
  1890. "DictTypeHandle",
  1891. "ResourceTypeTag",
  1892. "ResourceTypeHandle",
  1893. "TaggedTypeComponents",
  1894. "TaggedTypeHandle",
  1895. "VariantUnderlyingType",
  1896. "VariantTypeHandle",
  1897. "VoidTypeHandle",
  1898. "NullTypeHandle",
  1899. "EmptyListTypeHandle",
  1900. "EmptyDictTypeHandle",
  1901. "CallableTypeComponents",
  1902. "CallableArgument",
  1903. "CallableTypeHandle",
  1904. "WorldCode",
  1905. "AtomCode",
  1906. "ListCode",
  1907. "FuncCode",
  1908. }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1909. const auto& args = GetAllArguments(node, ctx);
  1910. const auto retType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1911. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1912. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1913. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1914. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1915. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1916. for (auto arg : args) {
  1917. call.Add(arg);
  1918. }
  1919. return TRuntimeNode(call.Build(), false);
  1920. });
  1921. AddCallable("LambdaCode", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1922. const auto lambda = node.Child(node.ChildrenSize() - 1);
  1923. const auto retType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1924. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  1925. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), retType);
  1926. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(pos.File));
  1927. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Row));
  1928. call.Add(ctx.ProgramBuilder.NewDataLiteral(pos.Column));
  1929. if (node.ChildrenSize() == 2) {
  1930. auto count = MkqlBuildExpr(node.Head(), ctx);
  1931. call.Add(count);
  1932. } else {
  1933. call.Add(ctx.ProgramBuilder.NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
  1934. }
  1935. TRuntimeNode body;
  1936. {
  1937. TMkqlBuildContext::TArgumentsMap innerArguments;
  1938. innerArguments.reserve(lambda->Head().ChildrenSize());
  1939. lambda->Head().ForEachChild([&](const TExprNode& argNode) {
  1940. const auto argType = BuildType(argNode, *argNode.GetTypeAnn(), ctx.ProgramBuilder);
  1941. const auto arg = ctx.ProgramBuilder.Arg(argType);
  1942. innerArguments.emplace(&argNode, arg);
  1943. call.Add(arg);
  1944. });
  1945. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda->UniqueId());
  1946. body = MkqlBuildExpr(*lambda->Child(1), innerCtx);
  1947. }
  1948. call.Add(body);
  1949. return TRuntimeNode(call.Build(), false);
  1950. });
  1951. AddCallable("FormatType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1952. TRuntimeNode str;
  1953. if (node.Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource) {
  1954. auto handle = MkqlBuildExpr(node.Head(), ctx);
  1955. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  1956. call.Add(handle);
  1957. str = TRuntimeNode(call.Build(), false);
  1958. } else {
  1959. str = ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(FormatType(node.Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType()));
  1960. }
  1961. return str;
  1962. });
  1963. AddCallable("FormatTypeDiff", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1964. if (node.Child(0)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource) { // if we got resource + resource
  1965. YQL_ENSURE(node.Child(1)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Resource);
  1966. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  1967. call.Add(MkqlBuildExpr(*node.Child(0), ctx));
  1968. call.Add(MkqlBuildExpr(*node.Child(1), ctx));
  1969. call.Add(ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(2), NUdf::EDataSlot::Bool)));
  1970. return TRuntimeNode(call.Build(), false);
  1971. } else { // if we got type + type
  1972. bool pretty = FromString<bool>(*node.Child(2), NUdf::EDataSlot::Bool);
  1973. const auto type_left = node.Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  1974. const auto type_right = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  1975. return pretty ? ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(NYql::GetTypePrettyDiff(*type_left, *type_right)) :
  1976. ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(NYql::GetTypeDiff(*type_left, *type_right));
  1977. }
  1978. });
  1979. AddCallable("Void", [](const TExprNode&, TMkqlBuildContext& ctx) {
  1980. return ctx.ProgramBuilder.NewVoid();
  1981. });
  1982. AddCallable("Null", [](const TExprNode&, TMkqlBuildContext& ctx) {
  1983. return ctx.ProgramBuilder.NewNull();
  1984. });
  1985. AddCallable({ "AsTagged","Untag" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1986. auto input = MkqlBuildExpr(node.Head(), ctx);
  1987. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  1988. return ctx.ProgramBuilder.Nop(input, returnType);
  1989. });
  1990. AddCallable({"WithWorld"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  1991. return MkqlBuildExpr(node.Head(), ctx);
  1992. });
  1993. AddCallable("Error", [](const TExprNode& node, TMkqlBuildContext& ctx)->NKikimr::NMiniKQL::TRuntimeNode {
  1994. const auto err = node.GetTypeAnn()->Cast<TErrorExprType>()->GetError();
  1995. ythrow TNodeException(ctx.ExprCtx.AppendPosition(err.Position)) << err.GetMessage();
  1996. });
  1997. AddCallable("ErrorType", [](const TExprNode& node, TMkqlBuildContext& ctx)->NKikimr::NMiniKQL::TRuntimeNode {
  1998. const auto err = node.GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TErrorExprType>()->GetError();
  1999. ythrow TNodeException(ctx.ExprCtx.AppendPosition(err.Position)) << err.GetMessage();
  2000. });
  2001. AddCallable("Join", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2002. const auto list1 = MkqlBuildExpr(node.Head(), ctx);
  2003. const auto list2 = MkqlBuildExpr(*node.Child(1), ctx);
  2004. const auto dict1 = ctx.ProgramBuilder.ToHashedDict(list1, true, [&](TRuntimeNode item) {
  2005. return MkqlBuildLambda(*node.Child(2), ctx, {item});
  2006. }, [&](TRuntimeNode item) {
  2007. return item;
  2008. });
  2009. const auto dict2 = ctx.ProgramBuilder.ToHashedDict(list2, true, [&](TRuntimeNode item) {
  2010. return MkqlBuildLambda(*node.Child(3), ctx, {item});
  2011. }, [&](TRuntimeNode item) {
  2012. return item;
  2013. });
  2014. const auto joinKind = GetJoinKind(node, node.Child(4)->Content());
  2015. return ctx.ProgramBuilder.JoinDict(dict1, true, dict2, true, joinKind);
  2016. });
  2017. AddCallable("JoinDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2018. const auto dict1 = MkqlBuildExpr(*node.Child(0), ctx);
  2019. const auto dict2 = MkqlBuildExpr(*node.Child(1), ctx);
  2020. const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
  2021. bool multi1 = true, multi2 = true;
  2022. if (node.ChildrenSize() > 3) {
  2023. node.Tail().ForEachChild([&](const TExprNode& flag){
  2024. if (const auto& content = flag.Content(); content == "LeftUnique")
  2025. multi1 = false;
  2026. else if ( content == "RightUnique")
  2027. multi2 = false;
  2028. });
  2029. }
  2030. return ctx.ProgramBuilder.JoinDict(dict1, multi1, dict2, multi2, joinKind);
  2031. });
  2032. AddCallable({"FilePath", "FileContent", "FolderPath"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2033. TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id));
  2034. call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content()));
  2035. return TRuntimeNode(call.Build(), false);
  2036. });
  2037. AddCallable("TablePath", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2038. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("");
  2039. });
  2040. AddCallable("TableRecord", [](const TExprNode&, TMkqlBuildContext& ctx) {
  2041. return ctx.ProgramBuilder.NewDataLiteral<ui64>(0);
  2042. });
  2043. AddCallable("Udf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2044. YQL_ENSURE(node.ChildrenSize() == 8);
  2045. std::string_view function = node.Head().Content();
  2046. const auto runConfig = MkqlBuildExpr(*node.Child(1), ctx);
  2047. const auto userType = BuildType(*node.Child(2), *node.Child(2)->GetTypeAnn(), ctx.ProgramBuilder);
  2048. const auto typeConfig = node.Child(3)->Content();
  2049. const auto callableType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2050. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2051. return ctx.ProgramBuilder.TypedUdf(function, callableType, runConfig, userType, typeConfig,
  2052. pos.File, pos.Row, pos.Column);
  2053. });
  2054. AddCallable("ScriptUdf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2055. EScriptType scriptType = ScriptTypeFromStr(node.Head().Content());
  2056. if (scriptType == EScriptType::Unknown) {
  2057. ythrow TNodeException(node.Head())
  2058. << "Unknown script type '"
  2059. << node.Head().Content() << '\'';
  2060. }
  2061. std::string_view funcName = node.Child(1)->Content();
  2062. const auto typeNode = node.Child(2);
  2063. const auto funcType = BuildType(*typeNode, *typeNode->GetTypeAnn(), ctx.ProgramBuilder);
  2064. const auto script = MkqlBuildExpr(*node.Child(3), ctx);
  2065. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2066. return ctx.ProgramBuilder.ScriptUdf(node.Head().Content(), funcName, funcType, script,
  2067. pos.File, pos.Row, pos.Column);
  2068. });
  2069. AddCallable("Apply", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2070. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2071. const auto callable = MkqlBuildExpr(node.Head(), ctx);
  2072. const auto& args = GetArgumentsFrom<1U>(node, ctx);
  2073. return ctx.ProgramBuilder.Apply(callable, args, pos.File, pos.Row, pos.Column);
  2074. });
  2075. AddCallable("NamedApply", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2076. const auto pos = ctx.ExprCtx.GetPosition(node.Pos());
  2077. const auto callable = MkqlBuildExpr(node.Head(), ctx);
  2078. const auto positionalArgs = MkqlBuildExpr(*node.Child(1), ctx);
  2079. const auto namedArgs = MkqlBuildExpr(*node.Child(2), ctx);
  2080. const auto dependentNodes = node.ChildrenSize() - 3;
  2081. const auto callableType = node.Head().GetTypeAnn()->Cast<TCallableExprType>();
  2082. const auto tupleType = node.Child(1)->GetTypeAnn()->Cast<TTupleExprType>();
  2083. const auto structType = node.Child(2)->GetTypeAnn()->Cast<TStructExprType>();
  2084. std::vector<TRuntimeNode> args(callableType->GetArgumentsSize() + dependentNodes);
  2085. for (size_t i = 0; i < tupleType->GetSize(); ++i) {
  2086. args[i] = node.Child(1)->IsList() ?
  2087. MkqlBuildExpr(*node.Child(1)->Child(i), ctx):
  2088. ctx.ProgramBuilder.Nth(positionalArgs, i);
  2089. }
  2090. for (size_t i = 0; i < structType->GetSize(); ++i) {
  2091. auto memberName = structType->GetItems()[i]->GetName();
  2092. auto index = callableType->ArgumentIndexByName(memberName);
  2093. if (!index || *index < tupleType->GetSize()) {
  2094. ythrow TNodeException(node.Child(2)) << "Wrong named argument: " << memberName;
  2095. }
  2096. TRuntimeNode arg;
  2097. if (node.Child(2)->IsCallable("AsStruct")) {
  2098. for (auto& child : node.Child(2)->Children()) {
  2099. if (child->Head().Content() == memberName) {
  2100. arg = MkqlBuildExpr(child->Tail(), ctx);
  2101. break;
  2102. }
  2103. }
  2104. if (!arg.GetNode()) {
  2105. ythrow TNodeException(node.Child(2)) << "Missing argument: " << memberName;
  2106. }
  2107. }
  2108. else {
  2109. arg = ctx.ProgramBuilder.Member(namedArgs, memberName);
  2110. }
  2111. args[*index] = arg;
  2112. }
  2113. for (ui32 i = tupleType->GetSize(); i < callableType->GetArgumentsSize(); ++i) {
  2114. auto& arg = args[i];
  2115. if (arg.GetNode()) {
  2116. continue;
  2117. }
  2118. auto mkqlType = BuildType(node, *callableType->GetArguments()[i].Type, ctx.ProgramBuilder);
  2119. arg = ctx.ProgramBuilder.NewEmptyOptional(mkqlType);
  2120. }
  2121. for (ui32 i = 0; i < dependentNodes; ++i) {
  2122. args[callableType->GetArgumentsSize() + i] = MkqlBuildExpr(*node.Child(3 + i), ctx);
  2123. }
  2124. return ctx.ProgramBuilder.Apply(callable, args, pos.File, pos.Row, pos.Column, dependentNodes);
  2125. });
  2126. AddCallable("Callable", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2127. const auto callableType = BuildType(node.Head(), *node.Head().GetTypeAnn(), ctx.ProgramBuilder);
  2128. return ctx.ProgramBuilder.Callable(callableType, [&](const TArrayRef<const TRuntimeNode>& args) {
  2129. const auto& lambda = node.Tail();
  2130. TMkqlBuildContext::TArgumentsMap innerArguments;
  2131. innerArguments.reserve(lambda.Head().ChildrenSize());
  2132. MKQL_ENSURE(args.size() == lambda.Head().ChildrenSize(), "Mismatch of lambda arguments count");
  2133. auto it = args.cbegin();
  2134. lambda.Head().ForEachChild([&](const TExprNode& arg){ innerArguments.emplace(&arg, *it++); });
  2135. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2136. return MkqlBuildExpr(lambda.Tail(), innerCtx);
  2137. });
  2138. });
  2139. AddCallable("PgConst", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2140. auto type = AS_TYPE(TPgType, BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder));
  2141. TRuntimeNode typeMod;
  2142. if (node.ChildrenSize() >= 3) {
  2143. typeMod = MkqlBuildExpr(*node.Child(2), ctx);
  2144. }
  2145. auto typeMod1 = typeMod;
  2146. if (node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "interval" &&
  2147. node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "_interval") {
  2148. typeMod1 = TRuntimeNode();
  2149. }
  2150. auto ret = ctx.ProgramBuilder.PgConst(type, node.Head().Content(), typeMod1);
  2151. if (node.ChildrenSize() >= 3) {
  2152. return ctx.ProgramBuilder.PgCast(ret, type, typeMod);
  2153. } else {
  2154. return ret;
  2155. }
  2156. });
  2157. AddCallable("PgInternal0", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2158. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2159. return ctx.ProgramBuilder.PgInternal0(returnType);
  2160. });
  2161. AddCallable({"PgResolvedCall","PgResolvedCallCtx" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2162. auto name = node.Head().Content();
  2163. auto id = FromString<ui32>(node.Child(1)->Content());
  2164. std::vector<TRuntimeNode> args;
  2165. args.reserve(node.ChildrenSize() - 3);
  2166. for (ui32 i = 3; i < node.ChildrenSize(); ++i) {
  2167. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2168. }
  2169. bool rangeFunction = false;
  2170. for (const auto& child : node.Child(2)->Children()) {
  2171. if (child->Head().Content() == "range") {
  2172. rangeFunction = true;
  2173. }
  2174. }
  2175. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2176. return ctx.ProgramBuilder.PgResolvedCall(node.IsCallable("PgResolvedCallCtx"), name, id, args, returnType, rangeFunction);
  2177. });
  2178. AddCallable("PgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2179. auto operId = FromString<ui32>(node.Child(1)->Content());
  2180. auto procId = NPg::LookupOper(operId).ProcId;
  2181. auto procName = NPg::LookupProc(procId).Name;
  2182. std::vector<TRuntimeNode> args;
  2183. args.reserve(node.ChildrenSize() - 2);
  2184. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2185. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2186. }
  2187. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2188. return ctx.ProgramBuilder.PgResolvedCall(false, procName, procId, args, returnType, false);
  2189. });
  2190. AddCallable("BlockPgResolvedCall", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2191. auto name = node.Head().Content();
  2192. auto id = FromString<ui32>(node.Child(1)->Content());
  2193. std::vector<TRuntimeNode> args;
  2194. args.reserve(node.ChildrenSize() - 3);
  2195. for (ui32 i = 3; i < node.ChildrenSize(); ++i) {
  2196. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2197. }
  2198. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2199. return ctx.ProgramBuilder.BlockPgResolvedCall(name, id, args, returnType);
  2200. });
  2201. AddCallable("BlockPgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2202. auto operId = FromString<ui32>(node.Child(1)->Content());
  2203. auto procId = NPg::LookupOper(operId).ProcId;
  2204. auto procName = NPg::LookupProc(procId).Name;
  2205. std::vector<TRuntimeNode> args;
  2206. args.reserve(node.ChildrenSize() - 2);
  2207. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2208. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2209. }
  2210. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2211. return ctx.ProgramBuilder.BlockPgResolvedCall(procName, procId, args, returnType);
  2212. });
  2213. AddCallable("PgCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2214. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2215. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2216. TRuntimeNode typeMod;
  2217. if (node.ChildrenSize() >= 3) {
  2218. typeMod = MkqlBuildExpr(*node.Child(2), ctx);
  2219. }
  2220. auto typeMod1 = typeMod;
  2221. if (node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "interval" &&
  2222. node.GetTypeAnn()->Cast<TPgExprType>()->GetName() != "_interval") {
  2223. typeMod1 = TRuntimeNode();
  2224. }
  2225. if (node.Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Null) {
  2226. auto sourceTypeId = node.Head().GetTypeAnn()->Cast<TPgExprType>()->GetId();
  2227. auto targetTypeId = node.GetTypeAnn()->Cast<TPgExprType>()->GetId();
  2228. const auto& sourceTypeDesc = NPg::LookupType(sourceTypeId);
  2229. const auto& targetTypeDesc = NPg::LookupType(targetTypeId);
  2230. const bool isSourceArray = sourceTypeDesc.TypeId == sourceTypeDesc.ArrayTypeId;
  2231. const bool isTargetArray = targetTypeDesc.TypeId == targetTypeDesc.ArrayTypeId;
  2232. if (isSourceArray == isTargetArray && NPg::HasCast(
  2233. isSourceArray ? sourceTypeDesc.ElementTypeId : sourceTypeId,
  2234. isTargetArray ? targetTypeDesc.ElementTypeId : targetTypeId)) {
  2235. typeMod1 = typeMod;
  2236. }
  2237. }
  2238. auto cast = ctx.ProgramBuilder.PgCast(input, returnType, typeMod1);
  2239. if (node.ChildrenSize() >= 3) {
  2240. return ctx.ProgramBuilder.PgCast(cast, returnType, typeMod);
  2241. } else {
  2242. return cast;
  2243. }
  2244. });
  2245. AddCallable("FromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2246. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2247. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2248. return ctx.ProgramBuilder.FromPg(input, returnType);
  2249. });
  2250. AddCallable("ToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2251. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2252. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2253. return ctx.ProgramBuilder.ToPg(input, returnType);
  2254. });
  2255. AddCallable("BlockFromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2256. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2257. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2258. return ctx.ProgramBuilder.BlockFromPg(input, returnType);
  2259. });
  2260. AddCallable("BlockToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2261. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2262. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2263. return ctx.ProgramBuilder.BlockToPg(input, returnType);
  2264. });
  2265. AddCallable("PgClone", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2266. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2267. if (IsNull(node.Head())) {
  2268. return input;
  2269. }
  2270. if (NPg::LookupType(node.GetTypeAnn()->Cast<TPgExprType>()->GetId()).PassByValue) {
  2271. return input;
  2272. }
  2273. TVector<TRuntimeNode> dependentNodes;
  2274. for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
  2275. dependentNodes.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2276. }
  2277. return ctx.ProgramBuilder.PgClone(input, dependentNodes);
  2278. });
  2279. AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2280. auto input = MkqlBuildExpr(*node.Child(0), ctx);
  2281. return ctx.ProgramBuilder.WithContext(input, node.Child(1)->Content());
  2282. });
  2283. AddCallable("BlockFunc", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2284. TVector<TRuntimeNode> args;
  2285. for (ui32 i = 2; i < node.ChildrenSize(); ++i) {
  2286. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2287. }
  2288. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2289. return ctx.ProgramBuilder.BlockFunc(node.Child(0)->Content(), returnType, args);
  2290. });
  2291. AddCallable("BlockBitCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2292. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2293. auto targetType = BuildType(node, *node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(), ctx.ProgramBuilder);
  2294. return ctx.ProgramBuilder.BlockBitCast(arg, targetType);
  2295. });
  2296. AddCallable("BlockNth", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2297. const auto tupleObj = MkqlBuildExpr(node.Head(), ctx);
  2298. const auto index = FromString<ui32>(node.Tail().Content());
  2299. return ctx.ProgramBuilder.BlockNth(tupleObj, index);
  2300. });
  2301. AddCallable("BlockAsTuple", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2302. TVector<TRuntimeNode> args;
  2303. for (const auto& x : node.Children()) {
  2304. args.push_back(MkqlBuildExpr(*x, ctx));
  2305. }
  2306. return ctx.ProgramBuilder.BlockAsTuple(args);
  2307. });
  2308. AddCallable("BlockCombineAll", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2309. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2310. std::optional<ui32> filterColumn;
  2311. if (!node.Child(1)->IsCallable("Void")) {
  2312. filterColumn = FromString<ui32>(node.Child(1)->Content());
  2313. }
  2314. TVector<TAggInfo> aggs;
  2315. for (const auto& agg : node.Child(2)->Children()) {
  2316. TAggInfo info;
  2317. info.Name = TString(agg->Head().Head().Content());
  2318. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2319. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2320. }
  2321. aggs.push_back(info);
  2322. }
  2323. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2324. return ctx.ProgramBuilder.BlockCombineAll(arg, filterColumn, aggs, returnType);
  2325. });
  2326. AddCallable("BlockCombineHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2327. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2328. std::optional<ui32> filterColumn;
  2329. if (!node.Child(1)->IsCallable("Void")) {
  2330. filterColumn = FromString<ui32>(node.Child(1)->Content());
  2331. }
  2332. TVector<ui32> keys;
  2333. for (const auto& key : node.Child(2)->Children()) {
  2334. keys.push_back(FromString<ui32>(key->Content()));
  2335. }
  2336. TVector<TAggInfo> aggs;
  2337. for (const auto& agg : node.Child(3)->Children()) {
  2338. TAggInfo info;
  2339. info.Name = TString(agg->Head().Head().Content());
  2340. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2341. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2342. }
  2343. aggs.push_back(info);
  2344. }
  2345. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2346. return ctx.ProgramBuilder.BlockCombineHashed(arg, filterColumn, keys, aggs, returnType);
  2347. });
  2348. AddCallable("BlockMergeFinalizeHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2349. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2350. TVector<ui32> keys;
  2351. for (const auto& key : node.Child(1)->Children()) {
  2352. keys.push_back(FromString<ui32>(key->Content()));
  2353. }
  2354. TVector<TAggInfo> aggs;
  2355. for (const auto& agg : node.Child(2)->Children()) {
  2356. TAggInfo info;
  2357. info.Name = TString(agg->Head().Head().Content());
  2358. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2359. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2360. }
  2361. aggs.push_back(info);
  2362. }
  2363. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2364. return ctx.ProgramBuilder.BlockMergeFinalizeHashed(arg, keys, aggs, returnType);
  2365. });
  2366. AddCallable("BlockMergeManyFinalizeHashed", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2367. auto arg = MkqlBuildExpr(*node.Child(0), ctx);
  2368. TVector<ui32> keys;
  2369. for (const auto& key : node.Child(1)->Children()) {
  2370. keys.push_back(FromString<ui32>(key->Content()));
  2371. }
  2372. TVector<TAggInfo> aggs;
  2373. for (const auto& agg : node.Child(2)->Children()) {
  2374. TAggInfo info;
  2375. info.Name = TString(agg->Head().Head().Content());
  2376. for (ui32 i = 1; i < agg->ChildrenSize(); ++i) {
  2377. info.ArgsColumns.push_back(FromString<ui32>(agg->Child(i)->Content()));
  2378. }
  2379. aggs.push_back(info);
  2380. }
  2381. ui32 streamIndex = FromString<ui32>(node.Child(3)->Content());
  2382. TVector<TVector<ui32>> streams;
  2383. for (const auto& child : node.Child(4)->Children()) {
  2384. auto& stream = streams.emplace_back();
  2385. for (const auto& atom : child->Children()) {
  2386. stream.emplace_back(FromString<ui32>(atom->Content()));
  2387. }
  2388. }
  2389. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2390. return ctx.ProgramBuilder.BlockMergeManyFinalizeHashed(arg, keys, aggs, streamIndex, streams, returnType);
  2391. });
  2392. AddCallable("BlockCompress", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2393. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  2394. const auto index = FromString<ui32>(node.Child(1)->Content());
  2395. return ctx.ProgramBuilder.BlockCompress(flow, index);
  2396. });
  2397. AddCallable("BlockExpandChunked", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2398. const auto flow = MkqlBuildExpr(node.Head(), ctx);
  2399. return ctx.ProgramBuilder.BlockExpandChunked(flow);
  2400. });
  2401. AddCallable("PgArray", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2402. std::vector<TRuntimeNode> args;
  2403. args.reserve(node.ChildrenSize());
  2404. for (ui32 i = 0; i < node.ChildrenSize(); ++i) {
  2405. args.push_back(MkqlBuildExpr(*node.Child(i), ctx));
  2406. }
  2407. auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2408. return ctx.ProgramBuilder.PgArray(args, returnType);
  2409. });
  2410. AddCallable("QueueCreate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2411. const auto initCapacity = MkqlBuildExpr(*node.Child(1), ctx);
  2412. const auto initSize = MkqlBuildExpr(*node.Child(2), ctx);
  2413. const auto& args = GetArgumentsFrom<3U>(node, ctx);
  2414. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2415. return ctx.ProgramBuilder.QueueCreate(initCapacity, initSize, args, returnType);
  2416. });
  2417. AddCallable("QueuePeek", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2418. const auto resource = MkqlBuildExpr(node.Head(), ctx);
  2419. const auto index = MkqlBuildExpr(*node.Child(1), ctx);
  2420. const auto& args = GetArgumentsFrom<2U>(node, ctx);
  2421. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2422. return ctx.ProgramBuilder.QueuePeek(resource, index, args, returnType);
  2423. });
  2424. AddCallable("QueueRange", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2425. const auto resource = MkqlBuildExpr(node.Head(), ctx);
  2426. const auto begin = MkqlBuildExpr(*node.Child(1), ctx);
  2427. const auto end = MkqlBuildExpr(*node.Child(2), ctx);
  2428. const auto& args = GetArgumentsFrom<3U>(node, ctx);
  2429. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2430. return ctx.ProgramBuilder.QueueRange(resource, begin, end, args, returnType);
  2431. });
  2432. AddCallable("Seq", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2433. const auto& args = GetArgumentsFrom<0U>(node, ctx);
  2434. const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
  2435. return ctx.ProgramBuilder.Seq(args, returnType);
  2436. });
  2437. AddCallable("FromYsonSimpleType", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2438. const auto input = MkqlBuildExpr(node.Head(), ctx);
  2439. const auto schemeType = ParseDataType(node, node.Child(1)->Content());
  2440. return ctx.ProgramBuilder.FromYsonSimpleType(input, schemeType);
  2441. });
  2442. AddCallable("TryWeakMemberFromDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2443. const auto other = MkqlBuildExpr(node.Head(), ctx);
  2444. const auto rest = MkqlBuildExpr(*node.Child(1), ctx);
  2445. const auto schemeType = ParseDataType(node, node.Child(2)->Content());
  2446. const auto member = node.Child(3)->Content();
  2447. return ctx.ProgramBuilder.TryWeakMemberFromDict(other, rest, schemeType, member);
  2448. });
  2449. AddCallable("DependsOn", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2450. return MkqlBuildExpr(node.Head(), ctx);
  2451. });
  2452. AddCallable("Parameter", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2453. const NNodes::TCoParameter parameter(&node);
  2454. return ctx.ProgramBuilder.Member(ctx.Parameters, parameter.Name());
  2455. });
  2456. AddCallable("SecureParam", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2457. return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(node.Head().Content());
  2458. });
  2459. AddCallable(SkippableCallables, [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2460. return MkqlBuildExpr(node.Head(), ctx);
  2461. });
  2462. AddCallable("Merge", [](const TExprNode& node, TMkqlBuildContext& ctx) {
  2463. const auto& args = GetAllArguments(node, ctx);
  2464. auto extend = ctx.ProgramBuilder.Extend(args);
  2465. if (auto sortConstr = node.GetConstraint<TSortedConstraintNode>()) {
  2466. const auto input = MkqlBuildExpr(node.Head(), ctx);
  2467. const auto& content = sortConstr->GetContent();
  2468. std::vector<TRuntimeNode> ascending;
  2469. ascending.reserve(content.size());
  2470. for (const auto& c: content) {
  2471. ascending.push_back(ctx.ProgramBuilder.NewDataLiteral(c.second));
  2472. }
  2473. TProgramBuilder::TUnaryLambda keyExractor = [&](TRuntimeNode item) {
  2474. std::vector<TRuntimeNode> keys;
  2475. keys.reserve(content.size());
  2476. for (const auto& c : content) {
  2477. if (c.first.front().empty())
  2478. keys.push_back(item);
  2479. else {
  2480. MKQL_ENSURE(c.first.front().size() == 1U, "Just column expected.");
  2481. keys.push_back(ctx.ProgramBuilder.Member(item, c.first.front().front()));
  2482. }
  2483. }
  2484. return ctx.ProgramBuilder.NewTuple(keys);
  2485. };
  2486. return ctx.ProgramBuilder.Sort(extend, ctx.ProgramBuilder.NewTuple(ascending), keyExractor);
  2487. }
  2488. else {
  2489. return extend;
  2490. }
  2491. });
  2492. }
  2493. TRuntimeNode MkqlBuildLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const TRuntimeNode::TList& args) {
  2494. MKQL_ENSURE(2U == lambda.ChildrenSize(), "Wide lambda isn't supported.");
  2495. TMkqlBuildContext::TArgumentsMap innerArguments;
  2496. innerArguments.reserve(args.size());
  2497. auto it = args.begin();
  2498. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  2499. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2500. return MkqlBuildExpr(lambda.Tail(), innerCtx);
  2501. }
  2502. TRuntimeNode::TList MkqlBuildWideLambda(const TExprNode& lambda, TMkqlBuildContext& ctx, const TRuntimeNode::TList& args) {
  2503. MKQL_ENSURE(0U < lambda.ChildrenSize(), "Empty lambda.");
  2504. TMkqlBuildContext::TArgumentsMap innerArguments;
  2505. innerArguments.reserve(args.size());
  2506. auto it = args.begin();
  2507. lambda.Head().ForEachChild([&](const TExprNode& child){ innerArguments.emplace(&child, *it++); });
  2508. TMkqlBuildContext innerCtx(ctx, std::move(innerArguments), lambda.UniqueId());
  2509. TRuntimeNode::TList result;
  2510. result.reserve(lambda.ChildrenSize() - 1U);
  2511. for (ui32 i = 1U; i < lambda.ChildrenSize(); ++i)
  2512. result.emplace_back(MkqlBuildExpr(*lambda.Child(i), innerCtx));
  2513. return result;
  2514. }
  2515. TRuntimeNode MkqlBuildExpr(const TExprNode& node, TMkqlBuildContext& ctx) {
  2516. for (auto currCtx = &ctx; currCtx; currCtx = currCtx->ParentCtx) {
  2517. const auto knownNode = currCtx->Memoization.find(&node);
  2518. if (currCtx->Memoization.cend() != knownNode) {
  2519. return knownNode->second;
  2520. }
  2521. }
  2522. switch (const auto type = node.Type()) {
  2523. case TExprNode::List:
  2524. return CheckTypeAndMemoize(node, ctx, ctx.ProgramBuilder.NewTuple(GetAllArguments(node, ctx)));
  2525. case TExprNode::Callable:
  2526. return CheckTypeAndMemoize(node, ctx, ctx.MkqlCompiler.GetCallable(node.Content())(node, ctx));
  2527. case TExprNode::Argument:
  2528. ythrow TNodeException(node) << "Unexpected argument: " << node.Content();
  2529. default:
  2530. ythrow TNodeException(node) << "Unexpected node type: " << type;
  2531. }
  2532. }
  2533. } // namespace NCommon
  2534. } // namespace NYql