yql_eval_expr.cpp 45 KB


  1. #include "yql_eval_expr.h"
  2. #include "yql_transform_pipeline.h"
  3. #include "yql_out_transformers.h"
  4. #include <yql/essentials/ast/serialize/yql_expr_serialize.h>
  5. #include <yql/essentials/core/type_ann/type_ann_core.h>
  6. #include <yql/essentials/core/type_ann/type_ann_expr.h>
  7. #include <yql/essentials/core/yql_expr_type_annotation.h>
  8. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  9. #include <yql/essentials/providers/common/codec/yql_codec.h>
  10. #include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
  11. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  12. #include <yql/essentials/providers/result/expr_nodes/yql_res_expr_nodes.h>
  13. #include <yql/essentials/utils/log/log.h>
  14. #include <yql/essentials/utils/yql_paths.h>
  15. #include <library/cpp/yson/node/node_io.h>
  16. #include <library/cpp/string_utils/base64/base64.h>
  17. #include <util/string/builder.h>
  18. namespace NYql {
  19. using namespace NKikimr;
  20. using namespace NKikimr::NMiniKQL;
  21. using namespace NNodes;
  22. const TString EvaluationComponent = "Evaluation";
  23. static THashSet<TStringBuf> EvaluationFuncs = {
  24. TStringBuf("EvaluateAtom"),
  25. TStringBuf("EvaluateExpr"),
  26. TStringBuf("EvaluateType"),
  27. TStringBuf("EvaluateCode")
  28. };
  29. static THashSet<TStringBuf> SubqueryExpandFuncs = {
  30. TStringBuf("SubqueryExtendFor"),
  31. TStringBuf("SubqueryUnionAllFor"),
  32. TStringBuf("SubqueryMergeFor"),
  33. TStringBuf("SubqueryUnionMergeFor"),
  34. TStringBuf("SubqueryOrderBy"),
  35. TStringBuf("SubqueryAssumeOrderBy")
  36. };
  37. bool CheckPendingArgs(const TExprNode& root, TNodeSet& visited, TNodeMap<const TExprNode*>& activeArgs, const TNodeMap<ui32>& externalWorlds, TExprContext& ctx,
  38. bool underTypeOf, bool& hasUnresolvedTypes) {
  39. if (!visited.emplace(&root).second) {
  40. return true;
  41. }
  42. if (root.IsCallable({"TypeOf", "SqlColumnOrType", "SqlPlainColumnOrType"})) {
  43. underTypeOf = true;
  44. }
  45. if (root.Type() == TExprNode::Argument) {
  46. if (activeArgs.find(&root) == activeArgs.cend()) {
  47. if (underTypeOf) {
  48. if (!root.GetTypeAnn() && !externalWorlds.count(&root)) {
  49. hasUnresolvedTypes = true;
  50. }
  51. } else if (!externalWorlds.count(&root)) {
  52. ctx.AddError(TIssue(ctx.GetPosition(root.Pos()), TStringBuilder() << "Failed to evaluate unresolved argument: " << root.Content() << ". Did you use a column?"));
  53. return false;
  54. }
  55. }
  56. }
  57. if (root.Type() == TExprNode::Lambda) {
  58. root.Child(0)->ForEachChild([&](const TExprNode& arg) {
  59. if (!activeArgs.emplace(&arg, &root).second) {
  60. ythrow yexception() << "argument is duplicated, #" << arg.UniqueId();
  61. }
  62. });
  63. }
  64. for (ui32 index = 0; index < root.ChildrenSize(); ++index) {
  65. const auto& child = *root.Child(index);
  66. auto onlyType = underTypeOf || (root.IsCallable("MatchType") || root.IsCallable("IfType")) && (index == 0);
  67. if (!CheckPendingArgs(child, visited, activeArgs, externalWorlds, ctx, onlyType,
  68. hasUnresolvedTypes)) {
  69. return false;
  70. }
  71. };
  72. return true;
  73. }
  74. class TMarkReachable {
  75. public:
  76. TNodeSet Reachable;
  77. TNodeMap<ui32> ExternalWorlds;
  78. TDeque<TExprNode::TPtr> ExternalWorldsList;
  79. bool HasConfigPending = false;
  80. public:
  81. void Scan(const TExprNode& node) {
  82. VisitExprByFirst(node, [this](const TExprNode& n) {
  83. if (n.IsCallable(ConfigureName)) {
  84. if (n.ChildrenSize() > 3 && n.Child(1)->Child(0)->Content() == ConfigProviderName) {
  85. bool pending = false;
  86. for (size_t i = 3; i < n.ChildrenSize(); ++i) {
  87. if (n.Child(i)->IsCallable("EvaluateAtom")) {
  88. pending = true;
  89. break;
  90. }
  91. }
  92. if (pending) {
  93. const TStringBuf command = n.Child(2)->Content();
  94. if (command == "AddFileByUrl") {
  95. PendingFileAliases.insert(n.Child(3)->Content());
  96. } else if (command == "AddFolderByUrl") {
  97. PendingFolderPrefixes.insert(n.Child(3)->Content());
  98. }
  99. }
  100. }
  101. }
  102. return true;
  103. });
  104. ScanImpl(node);
  105. }
  106. private:
  107. void ScanImpl(const TExprNode& node) {
  108. if (!Visited.emplace(&node).second) {
  109. return;
  110. }
  111. if (node.IsCallable("Seq!")) {
  112. for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
  113. auto lambda = node.Child(i);
  114. auto arg = lambda->Child(0)->ChildPtr(0);
  115. ui32 id = ExternalWorlds.size();
  116. YQL_ENSURE(ExternalWorlds.emplace(arg.Get(), id).second);
  117. ExternalWorldsList.push_back(arg);
  118. }
  119. }
  120. static THashSet<TStringBuf> FILE_CALLABLES = {"FilePath", "FileContent", "FolderPath"};
  121. if (node.IsCallable(FILE_CALLABLES)) {
  122. const auto alias = node.Head().Content();
  123. if (PendingFileAliases.contains(alias) || AnyOf(PendingFolderPrefixes, [alias](const TStringBuf prefix) {
  124. auto withSlash = TString(prefix) + "/";
  125. return alias.StartsWith(withSlash);
  126. })) {
  127. for (auto& curr: CurrentEvalNodes) {
  128. Reachable.erase(curr);
  129. }
  130. HasConfigPending = true;
  131. }
  132. }
  133. if (node.IsCallable("QuoteCode")) {
  134. Reachable.insert(&node);
  135. return;
  136. }
  137. bool pop = false;
  138. if (node.IsCallable(EvaluationFuncs) || node.IsCallable(SubqueryExpandFuncs)) {
  139. Reachable.insert(&node);
  140. CurrentEvalNodes.insert(&node);
  141. pop = true;
  142. }
  143. if (node.IsCallable({ "EvaluateIf!", "EvaluateFor!", "EvaluateParallelFor!" })) {
  144. // scan predicate/list only
  145. if (node.ChildrenSize() > 1) {
  146. CurrentEvalNodes.insert(&node);
  147. pop = true;
  148. ScanImpl(*node.Child(1));
  149. }
  150. } else if (node.IsCallable(SubqueryExpandFuncs)) {
  151. // scan list only if it's wrapped by evaluation func
  152. ui32 index = 0;
  153. if (node.IsCallable("SubqueryOrderBy") || node.IsCallable("SubqueryAssumeOrderBy")) {
  154. index = 1;
  155. }
  156. if (node.ChildrenSize() > index) {
  157. if (node.Child(index)->IsCallable(EvaluationFuncs)) {
  158. CurrentEvalNodes.insert(&node);
  159. pop = true;
  160. ScanImpl(*node.Child(index));
  161. } else {
  162. for (const auto& child : node.Children()) {
  163. ScanImpl(*child);
  164. }
  165. }
  166. }
  167. } else {
  168. for (const auto& child : node.Children()) {
  169. ScanImpl(*child);
  170. }
  171. }
  172. if (pop) {
  173. CurrentEvalNodes.erase(&node);
  174. }
  175. }
  176. private:
  177. TNodeSet Visited;
  178. THashSet<TStringBuf> PendingFileAliases;
  179. THashSet<TStringBuf> PendingFolderPrefixes;
  180. TNodeSet CurrentEvalNodes;
  181. };
  182. struct TEvalScope {
  183. TEvalScope(TTypeAnnotationContext& types)
  184. : Types(types)
  185. {
  186. ++Types.EvaluationInProgress;
  187. for (auto& dataProvider : Types.DataSources) {
  188. dataProvider->EnterEvaluation(Types.EvaluationInProgress);
  189. }
  190. }
  191. ~TEvalScope() {
  192. for (auto& dataProvider : Types.DataSources) {
  193. dataProvider->LeaveEvaluation(Types.EvaluationInProgress);
  194. }
  195. --Types.EvaluationInProgress;
  196. }
  197. TTypeAnnotationContext& Types;
  198. };
  199. bool ValidateCalcWorlds(const TExprNode& node, const TTypeAnnotationContext& types, TNodeSet& visited) {
  200. if (!visited.emplace(&node).second) {
  201. return true;
  202. }
  203. if (node.Type() == TExprNode::World) {
  204. return true;
  205. }
  206. if (node.IsCallable("Commit!") || node.IsCallable("CommitAll!") || node.IsCallable("Configure!")) {
  207. return ValidateCalcWorlds(*node.Child(0), types, visited);
  208. }
  209. if (node.IsCallable("Sync!")) {
  210. for (const auto& child : node.Children()) {
  211. if (!ValidateCalcWorlds(*child, types, visited)) {
  212. return false;
  213. }
  214. }
  215. return true;
  216. }
  217. for (auto& dataProvider : types.DataSources) {
  218. if (dataProvider->CanEvaluate(node)) {
  219. return true;
  220. }
  221. }
  222. return false;
  223. }
  224. TExprNode::TPtr QuoteCode(const TExprNode::TPtr& node, TExprContext& ctx, TNodeOnNodeOwnedMap& knownArgs, TNodeOnNodeOwnedMap& visited,
  225. const TNodeMap<ui32>& externalWorlds) {
  226. auto& res = visited[node.Get()];
  227. if (res) {
  228. return res;
  229. }
  230. switch (node->Type()) {
  231. case TExprNode::Atom: {
  232. return res = ctx.Builder(node->Pos())
  233. .Callable("AtomCode")
  234. .Callable(0, "String")
  235. .Atom(0, node->Content())
  236. .Seal()
  237. .Seal()
  238. .Build();
  239. }
  240. case TExprNode::Argument: {
  241. auto it = knownArgs.find(node.Get());
  242. if (it != knownArgs.end()) {
  243. return res = it->second;
  244. }
  245. auto externalWorldIt = externalWorlds.find(node.Get());
  246. if (externalWorldIt != externalWorlds.end()) {
  247. return ctx.Builder(node->Pos())
  248. .Callable("FuncCode")
  249. .Callable(0, "String")
  250. .Atom(0, "WorldArg")
  251. .Seal()
  252. .Callable(1, "AtomCode")
  253. .Callable(0, "String")
  254. .Atom(0, ToString(externalWorldIt->second))
  255. .Seal()
  256. .Seal()
  257. .Seal()
  258. .Build();
  259. }
  260. return res = ctx.Builder(node->Pos())
  261. .Callable("ReprCode")
  262. .Add(0, node)
  263. .Seal()
  264. .Build();
  265. }
  266. case TExprNode::List: {
  267. TExprNode::TListType children;
  268. children.reserve(node->ChildrenSize());
  269. for (auto& child : node->Children()) {
  270. auto childCode = QuoteCode(child, ctx, knownArgs, visited, externalWorlds);
  271. if (!childCode) {
  272. return nullptr;
  273. }
  274. children.push_back(childCode);
  275. }
  276. return res = ctx.NewCallable(node->Pos(), "ListCode", std::move(children));
  277. }
  278. case TExprNode::Callable: {
  279. TExprNode::TListType children;
  280. children.reserve(node->ChildrenSize() + 1);
  281. children.push_back(ctx.Builder(node->Pos())
  282. .Callable("String")
  283. .Atom(0, node->Content())
  284. .Seal()
  285. .Build());
  286. for (auto& child : node->Children()) {
  287. auto childCode = QuoteCode(child, ctx, knownArgs, visited, externalWorlds);
  288. if (!childCode) {
  289. return nullptr;
  290. }
  291. children.push_back(childCode);
  292. }
  293. return res = ctx.NewCallable(node->Pos(), "FuncCode", std::move(children));
  294. }
  295. case TExprNode::Lambda: {
  296. TExprNode::TListType lambdaArgsItems;
  297. for (auto arg : node->Child(0)->Children()) {
  298. auto lambdaArg = ctx.NewArgument(arg->Pos(), arg->Content());
  299. lambdaArgsItems.push_back(lambdaArg);
  300. knownArgs.emplace(arg.Get(), lambdaArg);
  301. }
  302. auto lambdaArgs = ctx.NewArguments(node->Pos(), std::move(lambdaArgsItems));
  303. auto body = QuoteCode(node->ChildPtr(1), ctx, knownArgs, visited, externalWorlds);
  304. if (!body) {
  305. return nullptr;
  306. }
  307. for (auto arg : node->Child(0)->Children()) {
  308. knownArgs.erase(arg.Get());
  309. }
  310. auto lambda = ctx.NewLambda(node->Pos(), std::move(lambdaArgs), std::move(body));
  311. return res = ctx.Builder(node->Pos())
  312. .Callable("LambdaCode")
  313. .Add(0, lambda)
  314. .Seal()
  315. .Build();
  316. }
  317. case TExprNode::World: {
  318. return res = ctx.Builder(node->Pos())
  319. .Callable("WorldCode")
  320. .Seal()
  321. .Build();
  322. }
  323. default:
  324. YQL_ENSURE(false, "Unknown type: " << node->Type());
  325. }
  326. }
  327. IGraphTransformer::TStatus EvaluateExpression(const TExprNode::TPtr& input, TExprNode::TPtr& output,
  328. TTypeAnnotationContext& types, TExprContext& ctx, const IFunctionRegistry& functionRegistry, IGraphTransformer* calcTransfomer) {
  329. output = input;
  330. if (ctx.Step.IsDone(TExprStep::ExprEval))
  331. return IGraphTransformer::TStatus::Ok;
  332. YQL_CLOG(DEBUG, CoreEval) << "EvaluateExpression - start";
  333. bool pure = false;
  334. TString nextProvider;
  335. TMaybe<IDataProvider*> calcProvider;
  336. TExprNode::TPtr calcWorldRoot;
  337. TPositionHandle pipelinePos;
  338. bool isAtomPipeline = false;
  339. bool isOptionalAtom = false;
  340. bool isTypePipeline = false;
  341. bool isCodePipeline = false;
  342. TTransformationPipeline pipeline(&types);
  343. pipeline.AddServiceTransformers();
  344. pipeline.AddPreTypeAnnotation();
  345. pipeline.AddExpressionEvaluation(functionRegistry);
  346. pipeline.AddIOAnnotation();
  347. pipeline.AddTypeAnnotationTransformer();
  348. pipeline.Add(CreateFunctorTransformer(
  349. [&](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
  350. output = input;
  351. if (!input->GetTypeAnn()) {
  352. ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Lambda is not allowed as argument for function: " << input->Content()));
  353. return IGraphTransformer::TStatus::Error;
  354. }
  355. if (isAtomPipeline) {
  356. const TDataExprType* dataType;
  357. if (!EnsureDataOrOptionalOfData(*input, isOptionalAtom, dataType, ctx)) {
  358. return IGraphTransformer::TStatus::Error;
  359. }
  360. if (!EnsureSpecificDataType(input->Pos(), *dataType, EDataSlot::String, ctx)) {
  361. return IGraphTransformer::TStatus::Error;
  362. }
  363. } else if (isTypePipeline) {
  364. if (!EnsureSpecificDataType(*input, EDataSlot::Yson, ctx)) {
  365. return IGraphTransformer::TStatus::Error;
  366. }
  367. } else if (isCodePipeline) {
  368. if (!EnsureSpecificDataType(*input, EDataSlot::String, ctx)) {
  369. return IGraphTransformer::TStatus::Error;
  370. }
  371. } else {
  372. if (!EnsurePersistable(*input, ctx)) {
  373. return IGraphTransformer::TStatus::Error;
  374. }
  375. }
  376. return IGraphTransformer::TStatus::Ok;
  377. }), "TopLevelType", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR, "Ensure type of expression is correct");
  378. const bool forSubGraph = true;
  379. pipeline.AddPostTypeAnnotation(forSubGraph);
  380. pipeline.Add(TExprLogTransformer::Sync("EvalExpressionOpt", NLog::EComponent::CoreEval, NLog::ELevel::TRACE),
  381. "EvalOptTrace", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR, "EvalOptTrace");
  382. pipeline.AddOptimization(false);
  383. pipeline.Add(CreateFunctorTransformer(
  384. [&](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
  385. output = input;
  386. if (!calcProvider) {
  387. pure = false;
  388. if (IsPureIsolatedLambda(*input)) {
  389. pure = true;
  390. if (calcTransfomer) {
  391. calcProvider.ConstructInPlace();
  392. } else {
  393. if (nextProvider.empty()) {
  394. nextProvider = types.GetDefaultDataSource();
  395. }
  396. if (!nextProvider.empty() &&
  397. types.DataSourceMap.contains(nextProvider)) {
  398. calcProvider = types.DataSourceMap[nextProvider].Get();
  399. }
  400. }
  401. } else if (!calcTransfomer) {
  402. for (auto& p : types.DataSources) {
  403. TSyncMap syncList;
  404. if (p->CanBuildResult(*input, syncList)) {
  405. bool canExec = true;
  406. for (auto& x : syncList) {
  407. if (x.first->Type() == TExprNode::World) {
  408. continue;
  409. }
  410. if (!p->GetExecWorld(x.first, calcWorldRoot)) {
  411. canExec = false;
  412. break;
  413. }
  414. if (!calcWorldRoot) {
  415. continue;
  416. }
  417. TNodeSet visited;
  418. if (!ValidateCalcWorlds(*calcWorldRoot, types, visited)) {
  419. canExec = false;
  420. break;
  421. }
  422. }
  423. if (canExec) {
  424. calcProvider = p.Get();
  425. output = (*calcProvider.Get())->CleanupWorld(input, ctx);
  426. if (!output) {
  427. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
  428. }
  429. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
  430. }
  431. }
  432. }
  433. }
  434. if (!calcProvider) {
  435. ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Only pure expressions are supported"));
  436. return IGraphTransformer::TStatus::Error;
  437. }
  438. }
  439. if (!calcWorldRoot) {
  440. calcWorldRoot = ctx.NewWorld(input->Pos());
  441. calcWorldRoot->SetTypeAnn(ctx.MakeType<TUnitExprType>());
  442. calcWorldRoot->SetState(TExprNode::EState::ConstrComplete);
  443. }
  444. return IGraphTransformer::TStatus::Ok;
  445. }), "CheckPure", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR, "Ensure expression is computable");
  446. pipeline.Add(MakePeepholeOptimization(&types), "PeepHole", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR, "Peephole optimizations");
  447. auto fullTransformer = pipeline.Build();
  448. TMarkReachable marked;
  449. marked.Scan(*output);
  450. THashSet<TStringBuf> modifyCallables;
  451. modifyCallables.insert(WriteName);
  452. modifyCallables.insert(ConfigureName);
  453. modifyCallables.insert(CommitName);
  454. modifyCallables.insert("CommitAll!");
  455. for (auto& dataSink: types.DataSinks) {
  456. dataSink->FillModifyCallables(modifyCallables);
  457. }
  458. IGraphTransformer::TStatus hasPendingEvaluations = IGraphTransformer::TStatus::Ok;
  459. TOptimizeExprSettings settings(nullptr);
  460. settings.VisitChanges = true;
  461. auto status = OptimizeExpr(output, output, [&](const TExprNode::TPtr& node, TExprContext& ctx)->TExprNode::TPtr {
  462. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  463. return MakeIntrusive<TIssue>(ctx.GetPosition(node->Pos()), TStringBuilder() << "At function: " << node->Content());
  464. });
  465. if (node->IsCallable("EvaluateIf!")) {
  466. if (!EnsureMinArgsCount(*node, 3, ctx)) {
  467. return nullptr;
  468. }
  469. if (!EnsureMaxArgsCount(*node, 4, ctx)) {
  470. return nullptr;
  471. }
  472. if (!EnsureLambda(*node->Child(2), ctx) || !EnsureArgsCount(*node->Child(2)->Child(0), 1, ctx)) {
  473. return nullptr;
  474. }
  475. if (node->ChildrenSize() == 4) {
  476. if (!EnsureLambda(*node->Child(3), ctx) || !EnsureArgsCount(*node->Child(3)->Child(0), 1, ctx)) {
  477. return nullptr;
  478. }
  479. }
  480. if (node->Child(1)->IsCallable(EvaluationFuncs)) {
  481. return node;
  482. }
  483. if (!node->Child(1)->IsCallable("Bool") || node->Child(1)->ChildrenSize() != 1) {
  484. ctx.AddError(TIssue(ctx.GetPosition(node->Child(1)->Pos()), TStringBuilder() << "Expected literal bool"));
  485. return nullptr;
  486. }
  487. auto predAtom = node->Child(1)->Child(0);
  488. ui8 predValue;
  489. if (predAtom->Flags() & TNodeFlags::BinaryContent) {
  490. if (predAtom->Content().size() != 1) {
  491. ctx.AddError(TIssue(ctx.GetPosition(predAtom->Pos()), TStringBuilder() << "Incorrect literal bool value"));
  492. return nullptr;
  493. }
  494. predValue = *(const ui8*)predAtom->Content().data();
  495. } else {
  496. predValue = (predAtom->Content() == "true" || predAtom->Content() == "1");
  497. }
  498. if (predValue) {
  499. return ctx.Builder(node->Pos())
  500. .Apply(node->ChildPtr(2))
  501. .With(0, node->ChildPtr(0))
  502. .Seal()
  503. .Build();
  504. } else if (node->ChildrenSize() == 4) {
  505. return ctx.Builder(node->Pos())
  506. .Apply(node->ChildPtr(3))
  507. .With(0, node->ChildPtr(0))
  508. .Seal()
  509. .Build();
  510. } else {
  511. return node->ChildPtr(0);
  512. }
  513. }
  514. if (node->IsCallable({"EvaluateFor!", "EvaluateParallelFor!"})) {
  515. const bool seq = node->IsCallable("EvaluateFor!");
  516. if (!EnsureMinArgsCount(*node, 3, ctx)) {
  517. return nullptr;
  518. }
  519. if (!EnsureMaxArgsCount(*node, 4, ctx)) {
  520. return nullptr;
  521. }
  522. if (!EnsureLambda(*node->Child(2), ctx) || !EnsureArgsCount(*node->Child(2)->Child(0), 2, ctx)) {
  523. return nullptr;
  524. }
  525. if (node->ChildrenSize() == 4) {
  526. if (!EnsureLambda(*node->Child(3), ctx) || !EnsureArgsCount(*node->Child(3)->Child(0), 1, ctx)) {
  527. return nullptr;
  528. }
  529. }
  530. auto list = node->Child(1);
  531. if (list->IsCallable(EvaluationFuncs)) {
  532. return node;
  533. }
  534. bool noData = false;
  535. if (list->IsCallable("Just")) {
  536. if (!EnsureArgsCount(*list, 1, ctx)) {
  537. return nullptr;
  538. }
  539. list = list->Child(0);
  540. } else if (list->IsCallable("Nothing") || list->IsCallable("Null")) {
  541. noData = true;
  542. }
  543. if (!noData && list->IsCallable("List") && list->ChildrenSize() == 1) {
  544. noData = true;
  545. }
  546. if (noData) {
  547. if (node->ChildrenSize() == 4) {
  548. return ctx.Builder(node->Pos())
  549. .Apply(node->ChildPtr(3))
  550. .With(0, node->ChildPtr(0))
  551. .Seal()
  552. .Build();
  553. } else {
  554. return node->ChildPtr(0);
  555. }
  556. }
  557. if (!list->IsCallable("List") && !list->IsCallable("AsList")) {
  558. ctx.AddError(TIssue(ctx.GetPosition(list->Pos()), TStringBuilder() << "Expected (optional) literal list"));
  559. return nullptr;
  560. }
  561. auto itemsCount = list->ChildrenSize() - (list->IsCallable("List") ? 1 : 0);
  562. const auto limit = seq ? types.EvaluateForLimit : types.EvaluateParallelForLimit;
  563. if (itemsCount > limit) {
  564. ctx.AddError(TIssue(ctx.GetPosition(list->Pos()), TStringBuilder() << "Too large list for EVALUATE " << (seq ? "" : "PARALLEL ") << "FOR, allowed: " <<
  565. limit << ", got: " << itemsCount));
  566. return nullptr;
  567. }
  568. auto world = node->ChildPtr(0);
  569. auto ret = ctx.Builder(node->Pos())
  570. .Callable(seq ? "Seq!" : "Sync!")
  571. .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
  572. ui32 pos = 0;
  573. if (seq) {
  574. parent.Add(pos++, world);
  575. }
  576. for (ui32 i = list->IsCallable("List") ? 1 : 0; i < list->ChildrenSize(); ++i) {
  577. auto arg = seq ? ctx.NewArgument(node->Pos(), "world") : world;
  578. auto body = ctx.Builder(node->Pos())
  579. .Apply(node->ChildPtr(2))
  580. .With(0, arg)
  581. .With(1, list->ChildPtr(i))
  582. .Seal()
  583. .Build();
  584. if (seq) {
  585. auto lambda = ctx.NewLambda(node->Pos(), ctx.NewArguments(node->Pos(), { arg }), std::move(body));
  586. parent.Add(pos++, lambda);
  587. } else {
  588. parent.Add(pos++, body);
  589. }
  590. }
  591. return parent;
  592. })
  593. .Seal()
  594. .Build();
  595. ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
  596. hasPendingEvaluations = hasPendingEvaluations.Combine(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true));
  597. return ret;
  598. }
  599. if (node->IsCallable(SubqueryExpandFuncs)) {
  600. if (!EnsureArgsCount(*node, 2, ctx)) {
  601. return nullptr;
  602. }
  603. if (node->IsCallable("SubqueryOrderBy") || node->IsCallable("SubqueryAssumeOrderBy")) {
  604. auto inputSub = node->Child(0);
  605. if (inputSub->IsArgument()) {
  606. return node;
  607. }
  608. if (!EnsureLambda(*inputSub, ctx) || !EnsureArgsCount(*inputSub->Child(0), 1, ctx)) {
  609. return nullptr;
  610. }
  611. auto keys = node->Child(1);
  612. if (keys->IsCallable(EvaluationFuncs)) {
  613. return node;
  614. }
  615. if (!keys->IsCallable("AsList") && !keys->IsCallable("List") && !keys->IsCallable("EmptyList")) {
  616. ctx.AddError(TIssue(ctx.GetPosition(keys->Pos()), TStringBuilder() << "Expected literal list"));
  617. return nullptr;
  618. }
  619. auto itemsCount = keys->ChildrenSize() - (keys->IsCallable("List") ? 1 : 0);
  620. if (itemsCount > types.EvaluateOrderByColumnLimit) {
  621. ctx.AddError(TIssue(ctx.GetPosition(keys->Pos()), TStringBuilder() << "Too many columns for subquery order by, allowed: " <<
  622. types.EvaluateOrderByColumnLimit << ", got: " << itemsCount));
  623. return nullptr;
  624. }
  625. auto arg = ctx.NewArgument(node->Pos(), "row");
  626. TExprNode::TListType dirItems;
  627. TExprNode::TListType extractorItems;
  628. for (ui32 i = keys->IsCallable("List") ? 1 : 0; i < keys->ChildrenSize(); ++i) {
  629. auto k = keys->Child(i);
  630. if (!k->IsList() || k->ChildrenSize() != 2) {
  631. ctx.AddError(TIssue(ctx.GetPosition(k->Pos()), TStringBuilder() << "Expected tuple of 2 items"));
  632. return nullptr;
  633. }
  634. auto columnName = k->Child(0);
  635. auto direction = k->Child(1);
  636. if (!columnName->IsCallable("String")) {
  637. ctx.AddError(TIssue(ctx.GetPosition(columnName->Pos()), TStringBuilder() << "Expected String as column name"));
  638. return nullptr;
  639. }
  640. if (!direction->IsCallable("Bool")) {
  641. ctx.AddError(TIssue(ctx.GetPosition(columnName->Pos()), TStringBuilder() << "Expected Bool as direction"));
  642. return nullptr;
  643. }
  644. dirItems.push_back(direction);
  645. extractorItems.push_back(ctx.Builder(k->Pos())
  646. .Callable("Member")
  647. .Add(0, arg)
  648. .Add(1, columnName->ChildPtr(0))
  649. .Seal()
  650. .Build());
  651. }
  652. auto args = ctx.NewArguments(node->Pos(), { arg });
  653. auto body = ctx.NewList(node->Pos(), std::move(extractorItems));
  654. auto extractorLambda = ctx.NewLambda(node->Pos(), std::move(args), std::move(body));
  655. auto dirs = ctx.NewList(node->Pos(), std::move(dirItems));
  656. auto sorted = ctx.Builder(node->Pos())
  657. .Lambda()
  658. .Param("world")
  659. .Callable(node->IsCallable("SubqueryOrderBy") ? "Sort" : "AssumeSorted")
  660. .Apply(0, inputSub)
  661. .With(0, "world")
  662. .Seal()
  663. .Add(1, dirs)
  664. .Add(2, extractorLambda)
  665. .Seal()
  666. .Seal()
  667. .Build();
  668. ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
  669. hasPendingEvaluations = hasPendingEvaluations.Combine(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true));
  670. return sorted;
  671. } else {
  672. auto list = node->Child(0);
  673. if (list->IsCallable(EvaluationFuncs)) {
  674. return node;
  675. }
  676. if (list->IsCallable("Just")) {
  677. list = list->Child(0);
  678. }
  679. if (!list->IsCallable("AsList") || list->ChildrenSize() == 0) {
  680. ctx.AddError(TIssue(ctx.GetPosition(list->Pos()), TStringBuilder() << "Expected non-empty literal list"));
  681. return nullptr;
  682. }
  683. auto itemsCount = list->ChildrenSize();
  684. if (itemsCount > types.EvaluateParallelForLimit) {
  685. ctx.AddError(TIssue(ctx.GetPosition(list->Pos()), TStringBuilder() << "Too large list for subquery loop, allowed: " <<
  686. types.EvaluateParallelForLimit << ", got: " << itemsCount));
  687. return nullptr;
  688. }
  689. if (node->Child(1)->IsCallable(EvaluationFuncs)) {
  690. return node;
  691. }
  692. const auto status = ConvertToLambda(node->ChildRef(1), ctx, 2, 2, false);
  693. if (status.Level == IGraphTransformer::TStatus::Error) {
  694. return nullptr;
  695. }
  696. const auto& lambda = node->Child(1);
  697. TExprNodeList argItems;
  698. argItems.push_back(ctx.NewArgument(node->Pos(), "world"));
  699. TExprNodeList inputs;
  700. for (ui32 i = 0; i < list->ChildrenSize(); ++i) {
  701. TNodeOnNodeOwnedMap replaces;
  702. replaces[lambda->Child(0)->Child(0)] = argItems[0];
  703. replaces[lambda->Child(0)->Child(1)] = list->ChildPtr(i);
  704. inputs.push_back(ctx.ReplaceNodes(lambda->TailPtr(), replaces));
  705. }
  706. auto body = ctx.NewCallable(node->Pos(), node->Content().substr(8, node->Content().size() - 8 - 3), std::move(inputs));
  707. auto args = ctx.NewArguments(node->Pos(), std::move(argItems));
  708. auto merged = ctx.NewLambda(node->Pos(), std::move(args), std::move(body));
  709. ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
  710. hasPendingEvaluations = hasPendingEvaluations.Combine(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true));
  711. return merged;
  712. }
  713. }
  714. if (node->IsCallable("MrTableEach") || node->IsCallable("MrTableEachStrict")) {
  715. TExprNode::TListType keys;
  716. TStringBuf prefix;
  717. if (node->ChildrenSize() != 0 && node->TailPtr()->IsAtom()) {
  718. prefix = node->TailPtr()->Content();
  719. }
  720. for (const auto& eachKey : node->Children()) {
  721. if (eachKey->IsAtom()) {
  722. continue;
  723. }
  724. if (!eachKey->IsCallable("Key")) {
  725. ctx.AddError(TIssue(ctx.GetPosition(eachKey->Pos()), TStringBuilder() << "Expected Key"));
  726. return nullptr;
  727. }
  728. if (!EnsureMinArgsCount(*eachKey, 1, ctx)) {
  729. return nullptr;
  730. }
  731. if (!eachKey->Child(0)->IsList() || eachKey->Child(0)->ChildrenSize() != 2 ||
  732. !eachKey->Child(0)->Child(0)->IsAtom() || eachKey->Child(0)->Child(0)->Content() != "table") {
  733. ctx.AddError(TIssue(ctx.GetPosition(eachKey->Pos()), TStringBuilder() << "Invalid Key"));
  734. return nullptr;
  735. }
  736. auto list = eachKey->Child(0)->Child(1);
  737. if (list->IsCallable(EvaluationFuncs)) {
  738. return node;
  739. }
  740. if (list->IsCallable("List") && list->ChildrenSize() == 0) {
  741. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Invalid literal list value"));
  742. return nullptr;
  743. }
  744. if (!list->IsCallable("List") && !list->IsCallable("AsList")) {
  745. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Expected literal list"));
  746. return nullptr;
  747. }
  748. for (ui32 i = list->IsCallable("List") ? 1 : 0; i < list->ChildrenSize(); ++i) {
  749. auto name = list->ChildPtr(i);
  750. if (!name->IsCallable("String")) {
  751. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Expected literal string as table name"));
  752. return nullptr;
  753. }
  754. if (prefix) {
  755. name = ctx.ChangeChild(*name, 0, ctx.NewAtom(node->Pos(), BuildTablePath(prefix, name->Child(0)->Content())));
  756. }
  757. keys.push_back(ctx.ReplaceNode(TExprNode::TPtr(eachKey), *list, std::move(name)));
  758. }
  759. }
  760. return node->IsCallable("MrTableEach") ?
  761. ctx.NewCallable(node->Pos(), "MrTableConcat", std::move(keys)) :
  762. ctx.NewList(node->Pos(), std::move(keys));
  763. }
  764. if (node->IsCallable("QuoteCode")) {
  765. if (marked.Reachable.find(node.Get()) == marked.Reachable.cend()) {
  766. hasPendingEvaluations = hasPendingEvaluations.Combine(IGraphTransformer::TStatus::Repeat);
  767. return node;
  768. }
  769. if (!EnsureArgsCount(*node, 1, ctx)) {
  770. return nullptr;
  771. }
  772. TNodeOnNodeOwnedMap knownArgs;
  773. TNodeOnNodeOwnedMap visited;
  774. return QuoteCode(node->ChildPtr(0), ctx, knownArgs, visited, marked.ExternalWorlds);
  775. }
  776. if (!node->IsCallable(EvaluationFuncs)) {
  777. return node;
  778. }
  779. if (!EnsureArgsCount(*node, 1, ctx)) {
  780. return nullptr;
  781. }
  782. if (marked.Reachable.find(node.Get()) == marked.Reachable.cend()) {
  783. if (marked.HasConfigPending) {
  784. ctx.Step.Repeat(TExprStep::Configure);
  785. }
  786. hasPendingEvaluations = hasPendingEvaluations.Combine(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, marked.HasConfigPending));
  787. return node;
  788. }
  789. auto newArg = node->ChildPtr(0);
  790. {
  791. TNodeSet visited;
  792. TNodeMap<const TExprNode*> activeArgs;
  793. bool hasUnresolvedTypes = false;
  794. if (!CheckPendingArgs(*newArg, visited, activeArgs, marked.ExternalWorlds, ctx, false, hasUnresolvedTypes)) {
  795. return nullptr;
  796. }
  797. if (hasUnresolvedTypes) {
  798. YQL_CLOG(DEBUG, CoreEval) << "EvaluateExpression - has unresolved types";
  799. return node;
  800. }
  801. }
  802. TNodeOnNodeOwnedMap externalWorldReplaces;
  803. for (auto& x : marked.ExternalWorlds) {
  804. externalWorldReplaces.emplace(x.first, ctx.NewWorld(x.first->Pos()));
  805. }
  806. newArg = ctx.ReplaceNodes(std::move(newArg), externalWorldReplaces);
  807. TExprNode::TPtr clonedArg;
  808. {
  809. TNodeOnNodeOwnedMap deepClones;
  810. clonedArg = ctx.DeepCopy(*newArg, ctx, deepClones, false, true, true);
  811. }
  812. // trim modifications
  813. TOptimizeExprSettings settings(nullptr);
  814. settings.VisitChanges = true;
  815. auto status = OptimizeExpr(clonedArg, clonedArg, [&](const TExprNode::TPtr& node, TExprContext& ctx) {
  816. Y_UNUSED(ctx);
  817. if (node->IsCallable(modifyCallables) && node->ChildrenSize() > 0) {
  818. return node->ChildPtr(0);
  819. }
  820. return node;
  821. }, ctx, settings);
  822. if (status.Level != IGraphTransformer::TStatus::Ok) {
  823. return nullptr;
  824. }
  825. pipelinePos = node->Pos();
  826. isAtomPipeline = node->IsCallable("EvaluateAtom");
  827. isTypePipeline = node->IsCallable("EvaluateType");
  828. isCodePipeline = node->IsCallable("EvaluateCode");
  829. isOptionalAtom = false;
  830. if (isTypePipeline) {
  831. clonedArg = ctx.NewCallable(clonedArg->Pos(), "SerializeTypeHandle", { clonedArg });
  832. } else if (isCodePipeline) {
  833. clonedArg = ctx.NewCallable(clonedArg->Pos(), "SerializeCode", { clonedArg });
  834. }
  835. TString key, yson;
  836. NYT::TNode ysonNode;
  837. if (types.QContext) {
  838. key = MakeCacheKey(*clonedArg);
  839. if (types.QContext.CanRead()) {
  840. auto item = types.QContext.GetReader()->Get({EvaluationComponent, key}).GetValueSync();
  841. if (!item) {
  842. throw yexception() << "Missing replay data";
  843. }
  844. ysonNode = NYT::NodeFromYsonString(item->Value);
  845. }
  846. }
  847. do {
  848. if (ysonNode.IsUndefined() && isAtomPipeline && clonedArg->IsCallable("String")) {
  849. ysonNode = NYT::TNode()("Data",NYT::TNode(clonedArg->Head().Content()));
  850. yson = NYT::NodeToYsonString(ysonNode, NYT::NYson::EYsonFormat::Binary);
  851. } else {
  852. calcProvider.Clear();
  853. calcWorldRoot.Drop();
  854. fullTransformer->Rewind();
  855. auto prevSteps = ctx.Step;
  856. TEvalScope scope(types);
  857. ctx.Step.Reset();
  858. if (prevSteps.IsDone(TExprStep::Recapture)) {
  859. ctx.Step.Done(TExprStep::Recapture);
  860. }
  861. status = SyncTransform(*fullTransformer, clonedArg, ctx);
  862. ctx.Step = prevSteps;
  863. if (status.Level == IGraphTransformer::TStatus::Error) {
  864. return nullptr;
  865. }
  866. // execute calcWorldRoot
  867. auto execTransformer = CreateExecutionTransformer(types, [](const TOperationProgress&){}, false);
  868. status = SyncTransform(*execTransformer, calcWorldRoot, ctx);
  869. if (status.Level == IGraphTransformer::TStatus::Error) {
  870. return nullptr;
  871. }
  872. if (types.QContext.CanRead()) {
  873. break;
  874. }
  875. IDataProvider::TFillSettings fillSettings;
  876. auto delegatedNode = Build<TResult>(ctx, node->Pos())
  877. .Input(clonedArg)
  878. .BytesLimit()
  879. .Value(TString())
  880. .Build()
  881. .RowsLimit()
  882. .Value(TString())
  883. .Build()
  884. .FormatDetails()
  885. .Value(ToString((ui32)NYson::EYsonFormat::Binary))
  886. .Build()
  887. .Settings().Build()
  888. .Format()
  889. .Value(ToString((ui32)IDataProvider::EResultFormat::Yson))
  890. .Build()
  891. .PublicId()
  892. .Value(TString())
  893. .Build()
  894. .Discard()
  895. .Value("false")
  896. .Build()
  897. .Origin(calcWorldRoot)
  898. .Done().Ptr();
  899. auto atomType = ctx.MakeType<TUnitExprType>();
  900. for (auto idx: {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails,
  901. TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard, TResOrPullBase::idx_Settings }) {
  902. delegatedNode->Child(idx)->SetTypeAnn(atomType);
  903. delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete);
  904. }
  905. delegatedNode->SetTypeAnn(atomType);
  906. delegatedNode->SetState(TExprNode::EState::ConstrComplete);
  907. auto& transformer = calcTransfomer ? *calcTransfomer : (*calcProvider.Get())->GetCallableExecutionTransformer();
  908. status = SyncTransform(transformer, delegatedNode, ctx);
  909. if (status.Level == IGraphTransformer::TStatus::Error) {
  910. return nullptr;
  911. }
  912. yson = TString{delegatedNode->GetResult().Content()};
  913. ysonNode = NYT::NodeFromYsonString(yson);
  914. }
  915. if (ysonNode.HasKey("FallbackProvider")) {
  916. nextProvider = ysonNode["FallbackProvider"].AsString();
  917. } else if (types.QContext.CanWrite()) {
  918. types.QContext.GetWriter()->Put({EvaluationComponent, key}, yson).GetValueSync();
  919. }
  920. } while (ysonNode.HasKey("FallbackProvider"));
  921. auto dataNode = ysonNode["Data"];
  922. if (isAtomPipeline) {
  923. if (isOptionalAtom) {
  924. if (dataNode.IsEntity() || dataNode.AsList().empty()) {
  925. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to get atom from an empty optional"));
  926. return nullptr;
  927. }
  928. dataNode = dataNode.AsList().front();
  929. }
  930. TString value;
  931. if (dataNode.IsString()) {
  932. value = dataNode.AsString();
  933. } else {
  934. YQL_ENSURE(dataNode.IsList() && dataNode.AsList().size() == 1 && dataNode.AsList().front().IsString(), "Unexpected atom value: " << NYT::NodeToYsonString(dataNode));
  935. value = Base64Decode(dataNode.AsList().front().AsString());
  936. }
  937. return ctx.NewAtom(node->Pos(), value);
  938. }
  939. TScopedAlloc alloc(__LOCATION__);
  940. TTypeEnvironment env(alloc);
  941. TStringStream err;
  942. TProgramBuilder pgmBuilder(env, functionRegistry);
  943. TType* mkqlType = NCommon::BuildType(*clonedArg->GetTypeAnn(), pgmBuilder, err);
  944. if (!mkqlType) {
  945. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to process type: " << err.Str()));
  946. return nullptr;
  947. }
  948. TMemoryUsageInfo memInfo("Eval");
  949. THolderFactory holderFactory(alloc.Ref(), memInfo);
  950. auto value = NCommon::ParseYsonNodeInResultFormat(holderFactory, dataNode, mkqlType, &err);
  951. if (!value) {
  952. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to parse data: " << err.Str()));
  953. return nullptr;
  954. }
  955. if (isTypePipeline) {
  956. auto yson = TStringBuf(value->AsStringRef());
  957. auto type = NCommon::ParseTypeFromYson(yson, ctx, ctx.GetPosition(node->Pos()));
  958. if (!type) {
  959. return nullptr;
  960. }
  961. return ExpandType(node->Pos(), *type, ctx);
  962. }
  963. if (isCodePipeline) {
  964. TExprNode::TPtr result = DeserializeGraph(node->Pos(), TStringBuf(value->AsStringRef()), ctx);
  965. if (!result) {
  966. return nullptr;
  967. }
  968. TNodeOnNodeOwnedMap replaces;
  969. VisitExpr(*result, [&](const TExprNode& input) {
  970. if (input.IsCallable("WorldArg")) {
  971. YQL_ENSURE(input.ChildrenSize() == 1 && input.Child(0)->IsAtom());
  972. auto index = FromString<ui32>(input.Child(0)->Content());
  973. YQL_ENSURE(index < marked.ExternalWorldsList.size());
  974. YQL_ENSURE(replaces.emplace(&input, marked.ExternalWorldsList[index]).second);
  975. }
  976. return true;
  977. });
  978. result = ctx.ReplaceNodes(std::move(result), replaces);
  979. ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
  980. hasPendingEvaluations = hasPendingEvaluations.Combine(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true));
  981. return result;
  982. }
  983. return NCommon::ValueToExprLiteral(clonedArg->GetTypeAnn(), *value, ctx, node->Pos());
  984. }, ctx, settings);
  985. if (status.Level == IGraphTransformer::TStatus::Error) {
  986. return status;
  987. }
  988. if (hasPendingEvaluations != IGraphTransformer::TStatus::Ok) {
  989. YQL_CLOG(DEBUG, CoreEval) << "EvaluateExpression - has pending evaluations";
  990. return hasPendingEvaluations;
  991. }
  992. YQL_CLOG(DEBUG, CoreEval) << "EvaluateExpression - finish";
  993. // repeat some steps
  994. ctx.Step.Repeat(TExprStep::ValidateProviders);
  995. ctx.Step.Repeat(TExprStep::Configure);
  996. ctx.Step.Done(TExprStep::ExprEval);
  997. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
  998. }
  999. } // namespace NYql