yql_yt_helpers.cpp 85 KB


  1. #include "yql_yt_helpers.h"
  2. #include "yql_yt_provider_impl.h"
  3. #include "yql_yt_op_settings.h"
  4. #include "yql_yt_op_hash.h"
  5. #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
  6. #include <ydb/library/yql/providers/yt/lib/mkql_helpers/mkql_helpers.h>
  7. #include <ydb/library/yql/providers/yt/common/yql_configuration.h>
  8. #include <ydb/library/yql/providers/yt/opt/yql_yt_key_selector.h>
  9. #include <ydb/library/yql/providers/common/provider/yql_provider.h>
  10. #include <ydb/library/yql/providers/common/codec/yql_codec_type_flags.h>
  11. #include <ydb/library/yql/providers/common/codec/yql_codec.h>
  12. #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
  13. #include <ydb/library/yql/core/type_ann/type_ann_expr.h>
  14. #include <ydb/library/yql/core/type_ann/type_ann_core.h>
  15. #include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
  16. #include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h>
  17. #include <ydb/library/yql/core/yql_expr_optimize.h>
  18. #include <ydb/library/yql/core/yql_expr_constraint.h>
  19. #include <ydb/library/yql/core/yql_expr_csee.h>
  20. #include <ydb/library/yql/core/yql_graph_transformer.h>
  21. #include <ydb/library/yql/core/yql_opt_utils.h>
  22. #include <ydb/library/yql/ast/yql_expr.h>
  23. #include <ydb/library/yql/utils/log/log.h>
  24. #include <library/cpp/yson/node/node_io.h>
  25. #include <util/string/cast.h>
  26. #include <util/string/hex.h>
  27. #include <util/generic/xrange.h>
  28. #include <util/generic/utility.h>
  29. #include <util/generic/algorithm.h>
  30. #include <util/generic/bitmap.h>
  31. namespace NYql {
  32. using namespace NNodes;
  33. namespace {
  34. bool IsYtIsolatedLambdaImpl(const TExprNode& lambdaBody, TSyncMap& syncList, TString* usedCluster, bool supportsDq, TNodeSet& visited) {
  35. if (!visited.insert(&lambdaBody).second) {
  36. return true;
  37. }
  38. if (TMaybeNode<TCoTypeOf>(&lambdaBody)) {
  39. return true;
  40. }
  41. if (auto maybeLength = TMaybeNode<TYtLength>(&lambdaBody)) {
  42. if (auto maybeRead = maybeLength.Input().Maybe<TYtReadTable>()) {
  43. auto read = maybeRead.Cast();
  44. if (usedCluster && !UpdateUsedCluster(*usedCluster, TString{read.DataSource().Cluster().Value()})) {
  45. return false;
  46. }
  47. syncList.emplace(read.Ptr(), syncList.size());
  48. }
  49. if (auto maybeOutput = maybeLength.Input().Maybe<TYtOutput>()) {
  50. auto output = maybeOutput.Cast();
  51. if (usedCluster && !UpdateUsedCluster(*usedCluster, TString{GetOutputOp(output).DataSink().Cluster().Value()})) {
  52. return false;
  53. }
  54. syncList.emplace(output.Operation().Ptr(), syncList.size());
  55. }
  56. return true;
  57. }
  58. if (auto maybeContent = TMaybeNode<TYtTableContent>(&lambdaBody)) {
  59. if (auto maybeRead = maybeContent.Input().Maybe<TYtReadTable>()) {
  60. auto read = maybeRead.Cast();
  61. if (usedCluster && !UpdateUsedCluster(*usedCluster, TString{read.DataSource().Cluster().Value()})) {
  62. return false;
  63. }
  64. syncList.emplace(read.Ptr(), syncList.size());
  65. }
  66. if (auto maybeOutput = maybeContent.Input().Maybe<TYtOutput>()) {
  67. auto output = maybeOutput.Cast();
  68. if (usedCluster && !UpdateUsedCluster(*usedCluster, TString{GetOutputOp(output).DataSink().Cluster().Value()})) {
  69. return false;
  70. }
  71. syncList.emplace(output.Operation().Ptr(), syncList.size());
  72. }
  73. return true;
  74. }
  75. if (auto maybeContent = TMaybeNode<TDqReadWrapBase>(&lambdaBody)) {
  76. if (!supportsDq) {
  77. return false;
  78. }
  79. if (auto maybeRead = maybeContent.Input().Maybe<TYtReadTable>()) {
  80. auto read = maybeRead.Cast();
  81. if (usedCluster && !UpdateUsedCluster(*usedCluster, TString{read.DataSource().Cluster().Value()})) {
  82. return false;
  83. }
  84. syncList.emplace(read.Ptr(), syncList.size());
  85. }
  86. if (auto maybeOutput = maybeContent.Input().Maybe<TYtOutput>()) {
  87. auto output = maybeOutput.Cast();
  88. if (usedCluster && !UpdateUsedCluster(*usedCluster, TString{GetOutputOp(output).DataSink().Cluster().Value()})) {
  89. return false;
  90. }
  91. syncList.emplace(output.Operation().Ptr(), syncList.size());
  92. }
  93. return true;
  94. }
  95. if (!supportsDq && (TDqConnection::Match(&lambdaBody) || TDqPhyPrecompute::Match(&lambdaBody) || TDqStageBase::Match(&lambdaBody) || TDqSourceWrapBase::Match(&lambdaBody))) {
  96. return false;
  97. }
  98. if (auto maybeRead = TMaybeNode<TCoRight>(&lambdaBody).Input().Maybe<TYtReadTable>()) {
  99. auto read = maybeRead.Cast();
  100. if (usedCluster && !UpdateUsedCluster(*usedCluster, TString{read.DataSource().Cluster().Value()})) {
  101. return false;
  102. }
  103. syncList.emplace(read.Ptr(), syncList.size());
  104. return true;
  105. } else if (auto out = TMaybeNode<TYtOutput>(&lambdaBody)) {
  106. auto op = GetOutputOp(out.Cast());
  107. if (usedCluster && !UpdateUsedCluster(*usedCluster, TString{op.DataSink().Cluster().Value()})) {
  108. return false;
  109. }
  110. syncList.emplace(out.Cast().Operation().Ptr(), syncList.size());
  111. return true;
  112. }
  113. if (auto right = TMaybeNode<TCoRight>(&lambdaBody).Input()) {
  114. if (auto maybeCons = right.Maybe<TCoCons>()) {
  115. syncList.emplace(maybeCons.Cast().World().Ptr(), syncList.size());
  116. return IsYtIsolatedLambdaImpl(maybeCons.Cast().Input().Ref(), syncList, usedCluster, supportsDq, visited);
  117. }
  118. if (right.Cast().Raw()->IsCallable("PgReadTable!")) {
  119. syncList.emplace(right.Cast().Raw()->HeadPtr(), syncList.size());
  120. return true;
  121. }
  122. }
  123. if (lambdaBody.IsCallable("WithWorld")) {
  124. syncList.emplace(lambdaBody.ChildPtr(1), syncList.size());
  125. return true;
  126. }
  127. if (!lambdaBody.GetTypeAnn()->IsComposable()) {
  128. return false;
  129. }
  130. for (auto& child : lambdaBody.Children()) {
  131. if (!IsYtIsolatedLambdaImpl(*child, syncList, usedCluster, supportsDq, visited)) {
  132. return false;
  133. }
  134. }
  135. return true;
  136. }
  137. IGraphTransformer::TStatus EstimateDataSize(TVector<ui64>& result, TSet<TString>& requestedColumns,
  138. const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
  139. const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx, bool sync)
  140. {
  141. result.clear();
  142. result.resize(paths.size(), 0);
  143. requestedColumns.clear();
  144. const bool useColumnarStat = GetJoinCollectColumnarStatisticsMode(*state.Configuration) != EJoinCollectColumnarStatisticsMode::Disable
  145. && !state.Types->UseTableMetaFromGraph;
  146. TVector<size_t> reqMap;
  147. TVector<IYtGateway::TPathStatReq> pathStatReqs;
  148. for (size_t i: xrange(paths.size())) {
  149. const TYtPathInfo::TPtr& pathInfo = paths[i];
  150. YQL_ENSURE(pathInfo->Table->Stat);
  151. result[i] = pathInfo->Table->Stat->DataSize;
  152. if (pathInfo->Ranges) {
  153. if (auto usedRows = pathInfo->Ranges->GetUsedRows(pathInfo->Table->Stat->RecordsCount)) {
  154. if (usedRows.GetRef() && pathInfo->Table->Stat->RecordsCount) {
  155. result[i] *= double(usedRows.GetRef()) / double(pathInfo->Table->Stat->RecordsCount);
  156. } else {
  157. result[i] = 0;
  158. }
  159. }
  160. }
  161. if (useColumnarStat) {
  162. TMaybe<TVector<TString>> overrideColumns;
  163. if (columns && pathInfo->Table->RowSpec && (pathInfo->Table->RowSpec->StrictSchema || nullptr == FindPtr(*columns, YqlOthersColumnName))) {
  164. overrideColumns = columns;
  165. }
  166. auto ytPath = BuildYtPathForStatRequest(cluster, *pathInfo, overrideColumns, state, ctx);
  167. if (!ytPath) {
  168. return IGraphTransformer::TStatus::Error;
  169. }
  170. if (ytPath->Columns_) {
  171. pathStatReqs.push_back(
  172. IYtGateway::TPathStatReq()
  173. .Path(*ytPath)
  174. .IsTemp(pathInfo->Table->IsTemp)
  175. .IsAnonymous(pathInfo->Table->IsAnonymous)
  176. .Epoch(pathInfo->Table->Epoch.GetOrElse(0))
  177. );
  178. reqMap.push_back(i);
  179. }
  180. }
  181. }
  182. if (!pathStatReqs.empty()) {
  183. for (auto& req : pathStatReqs) {
  184. YQL_ENSURE(req.Path().Columns_);
  185. requestedColumns.insert(req.Path().Columns_->Parts_.begin(), req.Path().Columns_->Parts_.end());
  186. }
  187. IYtGateway::TPathStatResult pathStats;
  188. IYtGateway::TPathStatOptions pathStatOptions =
  189. IYtGateway::TPathStatOptions(state.SessionId)
  190. .Cluster(cluster)
  191. .Paths(pathStatReqs)
  192. .Config(state.Configuration->Snapshot());
  193. if (sync) {
  194. auto future = state.Gateway->PathStat(std::move(pathStatOptions));
  195. pathStats = future.GetValueSync();
  196. pathStats.ReportIssues(ctx.IssueManager);
  197. if (!pathStats.Success()) {
  198. return IGraphTransformer::TStatus::Error;
  199. }
  200. } else {
  201. pathStats = state.Gateway->TryPathStat(std::move(pathStatOptions));
  202. if (!pathStats.Success()) {
  203. return IGraphTransformer::TStatus::Repeat;
  204. }
  205. }
  206. YQL_ENSURE(pathStats.DataSize.size() == reqMap.size());
  207. for (size_t i: xrange(pathStats.DataSize.size())) {
  208. result[reqMap[i]] = pathStats.DataSize[i];
  209. }
  210. }
  211. return IGraphTransformer::TStatus::Ok;
  212. }
  213. bool NeedCalc(NNodes::TExprBase node) {
  214. auto type = node.Ref().GetTypeAnn();
  215. if (type->IsSingleton()) {
  216. return false;
  217. }
  218. if (type->GetKind() == ETypeAnnotationKind::Optional) {
  219. if (node.Maybe<TCoNothing>()) {
  220. return false;
  221. }
  222. if (auto maybeJust = node.Maybe<TCoJust>()) {
  223. return NeedCalc(maybeJust.Cast().Input());
  224. }
  225. return true;
  226. }
  227. if (type->GetKind() == ETypeAnnotationKind::Tuple) {
  228. if (auto maybeTuple = node.Maybe<TExprList>()) {
  229. return AnyOf(maybeTuple.Cast(), [](const auto& item) { return NeedCalc(item); });
  230. }
  231. return true;
  232. }
  233. if (type->GetKind() == ETypeAnnotationKind::List) {
  234. if (node.Maybe<TCoList>()) {
  235. YQL_ENSURE(node.Ref().ChildrenSize() == 1, "Should be rewritten to AsList");
  236. return false;
  237. }
  238. if (auto maybeAsList = node.Maybe<TCoAsList>()) {
  239. return AnyOf(maybeAsList.Cast().Args(), [](const auto& item) { return NeedCalc(NNodes::TExprBase(item)); });
  240. }
  241. return true;
  242. }
  243. YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::Data,
  244. "Object of type " << *type << " should not be considered for calculation");
  245. return !node.Maybe<TCoDataCtor>();
  246. }
  247. } // unnamed
  248. bool UpdateUsedCluster(TString& usedCluster, const TString& newCluster) {
  249. if (!usedCluster) {
  250. usedCluster = newCluster;
  251. } else if (usedCluster != newCluster) {
  252. return false;
  253. }
  254. return true;
  255. }
  256. bool IsYtIsolatedLambda(const TExprNode& lambdaBody, TSyncMap& syncList, bool supportsDq) {
  257. TNodeSet visited;
  258. return IsYtIsolatedLambdaImpl(lambdaBody, syncList, nullptr, supportsDq, visited);
  259. }
  260. bool IsYtIsolatedLambda(const TExprNode& lambdaBody, TSyncMap& syncList, TString& usedCluster, bool supportsDq) {
  261. TNodeSet visited;
  262. return IsYtIsolatedLambdaImpl(lambdaBody, syncList, &usedCluster, supportsDq, visited);
  263. }
  264. bool IsYtCompleteIsolatedLambda(const TExprNode& lambda, TSyncMap& syncList, bool supportsDq) {
  265. return lambda.IsComplete() && IsYtIsolatedLambda(lambda, syncList, supportsDq);
  266. }
  267. bool IsYtCompleteIsolatedLambda(const TExprNode& lambda, TSyncMap& syncList, TString& usedCluster, bool supportsDq) {
  268. return lambda.IsComplete() && IsYtIsolatedLambda(lambda, syncList, usedCluster, supportsDq);
  269. }
// Returns a copy of `input` in which world (execution-order) dependencies carried by YT reads and outputs
// are removed.
//
// First pass:
//   * Right over YtReadTable       -> YtTableContent over that read;
//   * WithWorld(x, ...)            -> x;
//   * every YtOutput child of a node -> wrapped into YtTableContent;
//   * Right over Cons              -> the Cons payload (its Input);
//   * Right over PgReadTable!      -> PgTableContent built from the read's children.
// It does not descend into YtLength, YtTableContent, YtReadTable or YtOutput, nor into replaced
// Right/WithWorld nodes (except the PgReadTable! case). A lambda whose body is a YtOutput also gets
// that body wrapped into YtTableContent.
//
// Second pass: the world child of every reachable YtReadTable that is not already a World is replaced
// by a fresh World. While state->Types->EvaluationInProgress is set and EvaluationTableSizeLimit is
// configured, the DataSize of every distinct read table is summed. If the sum exceeds the limit, an
// error is added to `ctx` and nullptr is returned. During evaluation, tables are also substituted via
// SubstTables.
//
// Throws (YQL_ENSURE) if remapping or SubstTables reports an error.
TExprNode::TPtr YtCleanupWorld(const TExprNode::TPtr& input, TExprContext& ctx, TYtState::TPtr state) {
    TExprNode::TPtr output = input;
    TNodeOnNodeOwnedMap remaps;
    // Pass 1: replace world-dependent YT reads/outputs with content nodes.
    VisitExpr(output, [&remaps, &ctx](const TExprNode::TPtr& node) {
        if (TYtLength::Match(node.Get())) {
            return false;
        }
        if (TYtTableContent::Match(node.Get())) {
            return false;
        }
        if (auto read = TMaybeNode<TCoRight>(node).Input().Maybe<TYtReadTable>()) {
            remaps[node.Get()] = Build<TYtTableContent>(ctx, node->Pos())
                .Input(read.Cast())
                .Settings().Build()
                .Done().Ptr();
            return false;
        }
        if (TYtReadTable::Match(node.Get())) {
            return false;
        }
        if (node->IsCallable("WithWorld")) {
            // Drop the attached world, keep only the value (first child).
            remaps[node.Get()] = node->HeadPtr();
            return false;
        }
        // Collect positions of children that are YtOutput and wrap each of them into YtTableContent.
        TDynBitMap outs;
        for (size_t i = 0; i < node->ChildrenSize(); ++i) {
            if (TYtOutput::Match(node->Child(i))) {
                outs.Set(i);
            }
        }
        if (!outs.Empty()) {
            auto res = node;
            Y_FOR_EACH_BIT(i, outs) {
                res = ctx.ChangeChild(*res, i,
                    Build<TYtTableContent>(ctx, node->Pos())
                        .Input(node->ChildPtr(i))
                        .Settings().Build()
                        .Done().Ptr()
                );
            }
            // NOTE(review): a Right branch below may overwrite this remap for the same node.
            remaps[node.Get()] = res;
        }
        if (TYtOutput::Match(node.Get())) {
            return false;
        }
        if (auto right = TMaybeNode<TCoRight>(node)) {
            auto cons = right.Cast().Input().Maybe<TCoCons>();
            if (cons) {
                remaps[node.Get()] = cons.Cast().Input().Ptr();
                return false;
            }
            if (right.Cast().Input().Ref().IsCallable("PgReadTable!")) {
                const auto& read = right.Cast().Input().Ref();
                remaps[node.Get()] = ctx.Builder(node->Pos())
                    .Callable("PgTableContent")
                        .Add(0, read.Child(1)->TailPtr())
                        .Add(1, read.ChildPtr(2))
                        .Add(2, read.ChildPtr(3))
                        .Add(3, read.ChildPtr(4))
                    .Seal()
                    .Build();
                // NOTE(review): unlike the Cons case, traversal continues into this node's children.
            }
        }
        return true;
    });
    // A lambda body that is itself a YtOutput is not a child visited above, so handle it here.
    if (output->IsLambda() && TYtOutput::Match(output->Child(1))) {
        remaps[output->Child(1)] = Build<TYtTableContent>(ctx, output->Child(1)->Pos())
            .Input(output->ChildPtr(1))
            .Settings().Build()
            .Done().Ptr();
    }
    IGraphTransformer::TStatus status = IGraphTransformer::TStatus::Ok;
    if (!remaps.empty()) {
        TOptimizeExprSettings settings(state->Types);
        settings.VisitChanges = true;
        settings.VisitTuples = true;
        status = RemapExpr(output, output, remaps, ctx, settings);
    }
    remaps.clear();
    // Pass 2: detach every YtReadTable from its world and, during evaluation, account table sizes.
    TNodeSet visitedReadTables;
    ui64 sumSize = 0;
    TMaybe<TPositionHandle> bigPos; // position of a table that alone exceeds the limit, if any
    VisitExpr(output, [&remaps, &ctx, &visitedReadTables, &sumSize, &bigPos, state](const TExprNode::TPtr& node) {
        if (auto maybeRead = TMaybeNode<TYtReadTable>(node)) {
            // Each distinct read is counted once.
            if (state->Types->EvaluationInProgress &&
                state->Configuration->EvaluationTableSizeLimit.Get() &&
                visitedReadTables.emplace(maybeRead.Cast().Raw()).second) {
                for (auto section : TYtSectionList(maybeRead.Cast().Input())) {
                    for (auto path : section.Paths()) {
                        auto info = TYtTableBaseInfo::Parse(path.Table());
                        if (info && info->Stat) {
                            sumSize += info->Stat->DataSize;
                            if (info->Stat->DataSize > *state->Configuration->EvaluationTableSizeLimit.Get()) {
                                bigPos = path.Table().Pos();
                            }
                        }
                    }
                }
            }
            // Child 0 of YtReadTable is its world; replace it with a fresh World.
            if (maybeRead.Cast().World().Ref().Type() != TExprNode::World) {
                remaps[node.Get()] = ctx.ChangeChild(*node, 0, ctx.NewWorld(node->Pos()));
            }
            return false;
        }
        if (TYtOutput::Match(node.Get())) {
            return false;
        }
        return true;
    });
    if (state->Types->EvaluationInProgress && state->Configuration->EvaluationTableSizeLimit.Get()) {
        if (sumSize > *state->Configuration->EvaluationTableSizeLimit.Get()) {
            ctx.AddError(TIssue(ctx.GetPosition(bigPos.GetOrElse(input->Pos())), TStringBuilder() << "Too large table(s) for evaluation pass: "
                << sumSize << " > " << *state->Configuration->EvaluationTableSizeLimit.Get()));
            return nullptr;
        }
    }
    if (!remaps.empty()) {
        TOptimizeExprSettings settings(state->Types);
        settings.VisitChanges = true;
        status = status.Combine(RemapExpr(output, output, remaps, ctx, settings));
    }
    YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Error, "Bad input graph");
    if (state->Types->EvaluationInProgress) {
        status = status.Combine(SubstTables(output, state, false, ctx));
        YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Error, "Subst tables failed");
    }
    return output;
}
  398. TYtOutputOpBase GetOutputOp(TYtOutput output) {
  399. if (const auto tr = output.Operation().Maybe<TYtTryFirst>()) {
  400. return tr.Cast().Second();
  401. }
  402. return output.Operation().Cast<TYtOutputOpBase>();
  403. }
  404. TVector<TYtTableBaseInfo::TPtr> GetInputTableInfos(TExprBase input) {
  405. TVector<TYtTableBaseInfo::TPtr> res;
  406. if (auto out = input.Maybe<TYtOutput>()) {
  407. res.push_back(MakeIntrusive<TYtOutTableInfo>(GetOutTable(out.Cast())));
  408. res.back()->IsUnordered = IsUnorderedOutput(out.Cast());
  409. } else {
  410. auto read = input.Maybe<TCoRight>().Input().Maybe<TYtReadTable>();
  411. YQL_ENSURE(read, "Unknown operation input");
  412. for (auto section: read.Cast().Input()) {
  413. for (auto path: section.Paths()) {
  414. res.push_back(TYtTableBaseInfo::Parse(path.Table()));
  415. }
  416. }
  417. }
  418. return res;
  419. }
  420. TVector<TYtPathInfo::TPtr> GetInputPaths(TExprBase input) {
  421. TVector<TYtPathInfo::TPtr> res;
  422. if (auto out = input.Maybe<TYtOutput>()) {
  423. res.push_back(MakeIntrusive<TYtPathInfo>());
  424. res.back()->Table = MakeIntrusive<TYtOutTableInfo>(GetOutTable(out.Cast()));
  425. res.back()->Table->IsUnordered = IsUnorderedOutput(out.Cast());
  426. } else {
  427. auto read = input.Maybe<TCoRight>().Input().Maybe<TYtReadTable>();
  428. YQL_ENSURE(read, "Unknown operation input");
  429. for (auto section: read.Cast().Input()) {
  430. for (auto path: section.Paths()) {
  431. res.push_back(MakeIntrusive<TYtPathInfo>(path));
  432. }
  433. }
  434. }
  435. return res;
  436. }
  437. TStringBuf GetClusterName(NNodes::TExprBase input) {
  438. if (auto read = input.Maybe<TCoRight>().Input().Maybe<TYtReadTable>()) {
  439. return read.Cast().DataSource().Cluster().Value();
  440. } else if (auto output = input.Maybe<TYtOutput>()) {
  441. return GetOutputOp(output.Cast()).DataSink().Cluster().Value();
  442. } else if (auto op = input.Maybe<TCoRight>().Input().Maybe<TYtOutputOpBase>()) {
  443. return op.Cast().DataSink().Cluster().Value();
  444. } else {
  445. YQL_ENSURE(false, "Unknown operation input");
  446. }
  447. }
  448. bool IsYtProviderInput(NNodes::TExprBase input, bool withVariantList) {
  449. if (input.Maybe<TYtOutput>()) {
  450. return true;
  451. }
  452. if (auto maybeYtInput = input.Maybe<TCoRight>().Input()) {
  453. if (withVariantList && maybeYtInput.Maybe<TYtOutputOpBase>()) {
  454. return true;
  455. }
  456. if (auto maybeRead = maybeYtInput.Maybe<TYtReadTable>()) {
  457. return withVariantList || maybeRead.Cast().Input().Size() == 1;
  458. }
  459. }
  460. return false;
  461. }
  462. bool IsConstExpSortDirections(NNodes::TExprBase sortDirections) {
  463. if (sortDirections.Maybe<TCoBool>()) {
  464. return true;
  465. } else if (sortDirections.Maybe<TExprList>()) {
  466. for (auto child: sortDirections.Cast<TExprList>()) {
  467. if (!child.Maybe<TCoBool>()) {
  468. return false;
  469. }
  470. }
  471. return true;
  472. }
  473. return false;
  474. }
  475. TExprNode::TListType GetNodesToCalculate(const TExprNode::TPtr& input) {
  476. TExprNode::TListType needCalc;
  477. TNodeSet uniqNodes;
  478. VisitExpr(input, [&needCalc, &uniqNodes](const TExprNode::TPtr& node) {
  479. if (auto maybeOp = TMaybeNode<TYtTransientOpBase>(node)) {
  480. auto op = maybeOp.Cast();
  481. for (auto setting: op.Settings()) {
  482. switch (FromString<EYtSettingType>(setting.Name().Value())) {
  483. case EYtSettingType::Limit:
  484. for (auto expr: setting.Value().Cast().Ref().Children()) {
  485. for (auto item: expr->Children()) {
  486. if (uniqNodes.insert(item->Child(1)).second) {
  487. if (NeedCalc(TExprBase(item->Child(1)))) {
  488. needCalc.push_back(item->ChildPtr(1));
  489. }
  490. }
  491. }
  492. }
  493. break;
  494. default:
  495. break;
  496. }
  497. }
  498. }
  499. else if (auto maybeSection = TMaybeNode<TYtSection>(node)) {
  500. TYtSection section = maybeSection.Cast();
  501. for (auto setting: section.Settings()) {
  502. switch (FromString<EYtSettingType>(setting.Name().Value())) {
  503. case EYtSettingType::Take:
  504. case EYtSettingType::Skip:
  505. if (uniqNodes.insert(setting.Value().Cast().Raw()).second) {
  506. if (NeedCalc(setting.Value().Cast())) {
  507. needCalc.push_back(setting.Value().Cast().Ptr());
  508. }
  509. }
  510. break;
  511. case EYtSettingType::KeyFilter: {
  512. auto value = setting.Value().Cast<TExprList>();
  513. if (value.Size() > 0) {
  514. for (auto member: value.Item(0).Cast<TCoNameValueTupleList>()) {
  515. for (auto cmp: member.Value().Cast<TCoNameValueTupleList>()) {
  516. if (cmp.Value() && uniqNodes.insert(cmp.Value().Cast().Raw()).second) {
  517. if (NeedCalc(cmp.Value().Cast())) {
  518. needCalc.push_back(cmp.Value().Cast().Ptr());
  519. }
  520. }
  521. }
  522. }
  523. }
  524. break;
  525. }
  526. case EYtSettingType::KeyFilter2: {
  527. auto value = setting.Value().Cast<TExprList>();
  528. if (value.Size() > 0) {
  529. if (uniqNodes.insert(value.Item(0).Raw()).second && NeedCalc(value.Item(0))) {
  530. needCalc.push_back(value.Item(0).Ptr());
  531. }
  532. }
  533. break;
  534. }
  535. default:
  536. break;
  537. }
  538. }
  539. }
  540. else if (TMaybeNode<TYtOutput>(node)) {
  541. // Stop traversing dependent operations
  542. return false;
  543. }
  544. return true;
  545. });
  546. return needCalc;
  547. }
  548. bool HasNodesToCalculate(const TExprNode::TPtr& input) {
  549. bool needCalc = false;
  550. VisitExpr(input, [&needCalc](const TExprNode::TPtr& node) {
  551. if (auto maybeOp = TMaybeNode<TYtTransientOpBase>(node)) {
  552. auto op = maybeOp.Cast();
  553. for (auto setting: op.Settings()) {
  554. switch (FromString<EYtSettingType>(setting.Name().Value())) {
  555. case EYtSettingType::Limit:
  556. for (auto expr: setting.Value().Cast().Ref().Children()) {
  557. for (auto item: expr->Children()) {
  558. if (NeedCalc(TExprBase(item->Child(1)))) {
  559. needCalc = true;
  560. return false;
  561. }
  562. }
  563. }
  564. break;
  565. default:
  566. break;
  567. }
  568. }
  569. }
  570. else if (auto maybeSection = TMaybeNode<TYtSection>(node)) {
  571. TYtSection section = maybeSection.Cast();
  572. for (auto setting: section.Settings()) {
  573. switch (FromString<EYtSettingType>(setting.Name().Value())) {
  574. case EYtSettingType::Take:
  575. case EYtSettingType::Skip:
  576. if (NeedCalc(setting.Value().Cast())) {
  577. needCalc = true;
  578. return false;
  579. }
  580. break;
  581. case EYtSettingType::KeyFilter: {
  582. auto value = setting.Value().Cast<TExprList>();
  583. if (value.Size() > 0) {
  584. for (auto member: value.Item(0).Cast<TCoNameValueTupleList>()) {
  585. for (auto cmp: member.Value().Cast<TCoNameValueTupleList>()) {
  586. if (cmp.Value() && NeedCalc(cmp.Value().Cast())) {
  587. needCalc = true;
  588. return false;
  589. }
  590. }
  591. }
  592. }
  593. break;
  594. }
  595. case EYtSettingType::KeyFilter2: {
  596. auto value = setting.Value().Cast<TExprList>();
  597. if (value.Size() > 0) {
  598. if (value.Item(0).Raw() && NeedCalc(value.Item(0))) {
  599. needCalc = true;
  600. return false;
  601. }
  602. }
  603. break;
  604. }
  605. default:
  606. break;
  607. }
  608. }
  609. }
  610. else if (TMaybeNode<TYtOutput>(node)) {
  611. // Stop traversing dependent operations
  612. return false;
  613. }
  614. return !needCalc;
  615. });
  616. return needCalc;
  617. }
  618. std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> CalculateNodes(TYtState::TPtr state,
  619. const TExprNode::TPtr& input,
  620. const TString& cluster,
  621. const TExprNode::TListType& needCalc,
  622. TExprContext& ctx)
  623. {
  624. YQL_ENSURE(!needCalc.empty());
  625. YQL_ENSURE(!input->HasResult(), "Infinitive calculation loop detected");
  626. TNodeMap<size_t> calcNodes;
  627. TUserDataTable files;
  628. TExprNode::TPtr list = ctx.NewList(input->Pos(), TExprNode::TListType(needCalc));
  629. TTypeAnnotationNode::TListType tupleTypes;
  630. std::transform(needCalc.cbegin(), needCalc.cend(), std::back_inserter(tupleTypes), [](const TExprNode::TPtr& n) { return n->GetTypeAnn(); });
  631. list->SetTypeAnn(ctx.MakeType<TTupleExprType>(tupleTypes));
  632. list->SetState(TExprNode::EState::ConstrComplete);
  633. auto status = SubstTables(list, state, /*anonOnly*/true, ctx);
  634. if (status.Level == IGraphTransformer::TStatus::Error) {
  635. return SyncStatus(status);
  636. }
  637. auto callableTransformer = CreateExtCallableTypeAnnotationTransformer(*state->Types);
  638. auto typeTransformer = CreateTypeAnnotationTransformer(callableTransformer, *state->Types);
  639. TExprNode::TPtr optimized;
  640. bool hasNonDeterministicFunctions = false;
  641. status = PeepHoleOptimizeNode(list, optimized, ctx, *state->Types, typeTransformer.Get(), hasNonDeterministicFunctions);
  642. if (status.Level == IGraphTransformer::TStatus::Error) {
  643. return SyncStatus(status);
  644. }
  645. auto filesRes = NCommon::FreezeUsedFiles(*optimized, files, *state->Types, ctx, MakeUserFilesDownloadFilter(*state->Gateway, cluster));
  646. if (filesRes.first.Level != IGraphTransformer::TStatus::Ok) {
  647. return filesRes;
  648. }
  649. TString calcHash;
  650. auto config = state->Configuration->GetSettingsForNode(*input);
  651. const auto queryCacheMode = config->QueryCacheMode.Get().GetOrElse(EQueryCacheMode::Disable);
  652. if (queryCacheMode != EQueryCacheMode::Disable) {
  653. if (!hasNonDeterministicFunctions && config->QueryCacheUseForCalc.Get().GetOrElse(true)) {
  654. calcHash = TYtNodeHashCalculator(state, cluster, config).GetHash(*list);
  655. }
  656. YQL_CLOG(DEBUG, ProviderYt) << "Calc hash: " << HexEncode(calcHash).Quote()
  657. << ", cache mode: " << queryCacheMode;
  658. }
  659. for (size_t i: xrange(needCalc.size())) {
  660. calcNodes.emplace(needCalc[i].Get(), i);
  661. }
  662. THashMap<TString, TString> secureParams;
  663. NCommon::FillSecureParams(input, *state->Types, secureParams);
  664. auto future = state->Gateway->Calc(optimized->ChildrenList(), ctx,
  665. IYtGateway::TCalcOptions(state->SessionId)
  666. .Cluster(cluster)
  667. .UserDataBlocks(files)
  668. .UdfModules(state->Types->UdfModules)
  669. .UdfResolver(state->Types->UdfResolver)
  670. .UdfValidateMode(state->Types->ValidateMode)
  671. .PublicId(state->Types->TranslateOperationId(input->UniqueId()))
  672. .Config(state->Configuration->GetSettingsForNode(*input))
  673. .OptLLVM(state->Types->OptLLVM.GetOrElse(TString()))
  674. .OperationHash(calcHash)
  675. .SecureParams(secureParams)
  676. );
  677. return WrapFutureCallback(future, [state, calcNodes](const IYtGateway::TCalcResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  678. YQL_ENSURE(res.Data.size() == calcNodes.size());
  679. TProcessedNodesSet processedNodes;
  680. if (TYtOpBase::Match(input.Get())) {
  681. processedNodes.insert(input->Child(TYtOpBase::idx_World)->UniqueId());
  682. }
  683. VisitExpr(input, [&processedNodes](const TExprNode::TPtr& node) {
  684. if (TYtOutput::Match(node.Get())) {
  685. // Stop traversing dependent operations
  686. processedNodes.insert(node->UniqueId());
  687. return false;
  688. }
  689. return true;
  690. });
  691. TNodeOnNodeOwnedMap remaps;
  692. for (auto& it: calcNodes) {
  693. auto node = it.first;
  694. auto type = node->GetTypeAnn();
  695. YQL_ENSURE(type);
  696. NYT::TNode data = res.Data[it.second];
  697. remaps.emplace(node, NCommon::NodeToExprLiteral(node->Pos(), *type, data, ctx));
  698. }
  699. TOptimizeExprSettings settings(state->Types);
  700. settings.VisitChanges = true;
  701. settings.VisitStarted = true;
  702. settings.ProcessedNodes = &processedNodes;
  703. auto status = RemapExpr(input, output, remaps, ctx, settings);
  704. if (status.Level == IGraphTransformer::TStatus::Error) {
  705. return status;
  706. }
  707. input->SetState(TExprNode::EState::ExecutionComplete);
  708. output->SetResult(ctx.NewAtom(output->Pos(), "calc")); // Special marker to check infinitive loop
  709. return status.Combine(IGraphTransformer::TStatus::Repeat);
  710. });
  711. }
  712. TMaybe<ui64> GetLimit(const TExprNode& settings) {
  713. auto limitNode = NYql::GetSetting(settings, EYtSettingType::Limit);
  714. if (!limitNode) {
  715. return Nothing();
  716. }
  717. limitNode = limitNode->ChildPtr(1);
  718. TMaybe<ui64> limit;
  719. for (auto part: limitNode->Children()) {
  720. TRecordsRange partialRange;
  721. partialRange.Fill(*part);
  722. if (!partialRange.Limit.Defined()) {
  723. return Nothing();
  724. }
  725. // check overflow
  726. if (std::numeric_limits<ui64>::max() - partialRange.Limit.GetRef() < partialRange.Offset.GetOrElse(0)) {
  727. return Nothing();
  728. }
  729. if (!limit.Defined()) {
  730. limit = partialRange.Limit.GetRef() + partialRange.Offset.GetOrElse(0);
  731. } else {
  732. limit = Max(limit.GetRef(), partialRange.Limit.GetRef() + partialRange.Offset.GetOrElse(0));
  733. }
  734. }
  735. return limit == std::numeric_limits<ui64>::max() ? Nothing() : limit;
  736. }
  737. TExprNode::TPtr GetLimitExpr(const TExprNode::TPtr& limitSetting, TExprContext& ctx) {
  738. auto limitItems = limitSetting->ChildPtr(1);
  739. TExprNode::TListType limitValues;
  740. for (const auto& child : limitItems->Children()) {
  741. TExprNode::TPtr skip, take;
  742. for (auto& setting: child->Children()) {
  743. if (setting->ChildrenSize() == 0) {
  744. continue;
  745. }
  746. auto settingName = setting->Child(0)->Content();
  747. if (settingName == TStringBuf("take")) {
  748. take = setting->ChildPtr(1);
  749. } else if (settingName == TStringBuf("skip")) {
  750. skip = setting->ChildPtr(1);
  751. }
  752. }
  753. if (!take) {
  754. return nullptr;
  755. }
  756. if (skip) {
  757. limitValues.push_back(ctx.NewCallable(child->Pos(), "+", { take, skip }));
  758. } else {
  759. limitValues.push_back(take);
  760. }
  761. }
  762. if (limitValues.empty()) {
  763. return nullptr;
  764. }
  765. if (limitValues.size() == 1) {
  766. return limitValues.front();
  767. }
  768. return ctx.NewCallable(limitSetting->Pos(), "Max", std::move(limitValues));
  769. }
  770. IGraphTransformer::TStatus UpdateTableMeta(const TExprNode::TPtr& tableNode, TExprNode::TPtr& newTableNode,
  771. const TYtTablesData::TPtr& tablesData, bool checkSqlView, bool updateRowSpecType, TExprContext& ctx)
  772. {
  773. newTableNode = tableNode;
  774. TYtTableInfo tableInfo = tableNode;
  775. const TYtTableDescription& tableDesc = tablesData->GetTable(tableInfo.Cluster, tableInfo.Name, tableInfo.Epoch);
  776. const bool withQB = NYql::HasSetting(tableInfo.Settings.Ref(), EYtSettingType::WithQB);
  777. const bool hasUserSchema = NYql::HasSetting(tableInfo.Settings.Ref(), EYtSettingType::UserSchema);
  778. const bool hasUserColumns = NYql::HasSetting(tableInfo.Settings.Ref(), EYtSettingType::UserColumns);
  779. bool update = false;
  780. auto rowSpec = withQB ? tableDesc.QB2RowSpec : tableDesc.RowSpec;
  781. if (updateRowSpecType) {
  782. if (rowSpec && tableInfo.RowSpec && !rowSpec->GetType()) {
  783. rowSpec->CopyType(*tableInfo.RowSpec);
  784. rowSpec->SortedByTypes = tableInfo.RowSpec->SortedByTypes;
  785. }
  786. }
  787. if (!tableInfo.Stat) {
  788. if (tableDesc.Stat) {
  789. tableInfo.Stat = tableDesc.Stat;
  790. update = true;
  791. }
  792. else if (tableDesc.Meta && tableDesc.Meta->DoesExist && tableInfo.Epoch.GetOrElse(0) == 0) {
  793. ctx.AddError(TIssue(ctx.GetPosition(tableNode->Pos()), TStringBuilder() <<
  794. "Table " << tableInfo.Name << " stat was not loaded"));
  795. return IGraphTransformer::TStatus::Error;
  796. }
  797. }
  798. if (!tableInfo.Meta) {
  799. if (!tableDesc.Meta) {
  800. if (tableInfo.Epoch.GetOrElse(0) != 0) {
  801. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
  802. }
  803. ctx.AddError(TIssue(ctx.GetPosition(tableNode->Pos()), TStringBuilder() <<
  804. "Table " << tableInfo.Name << " metadata was not loaded"));
  805. return IGraphTransformer::TStatus::Error;
  806. }
  807. tableInfo.Meta = tableDesc.Meta;
  808. tableInfo.RowSpec = rowSpec;
  809. update = true;
  810. }
  811. else if (rowSpec && !tableInfo.RowSpec) {
  812. tableInfo.RowSpec = rowSpec;
  813. update = true;
  814. }
  815. if (checkSqlView && tableInfo.Meta->SqlView) {
  816. ctx.AddError(TIssue(ctx.GetPosition(tableNode->Pos()), TStringBuilder()
  817. << "Reading from " << tableInfo.Name.Quote() << " view is not supported"));
  818. return IGraphTransformer::TStatus::Error;
  819. }
  820. if (hasUserSchema || hasUserColumns) {
  821. const auto setting = GetSetting(tableInfo.Settings.Ref(), hasUserSchema ? EYtSettingType::UserSchema : EYtSettingType::UserColumns);
  822. auto type = setting->Tail().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
  823. const auto prevRowSpec = tableInfo.RowSpec;
  824. if (!(prevRowSpec && prevRowSpec->StrictSchema) && type->Cast<TStructExprType>()->FindItem("_other")) {
  825. ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()), "It is forbidden to specify the column '_other'."));
  826. return IGraphTransformer::TStatus::Error;
  827. }
  828. TVector<TString> explicitYson;
  829. if (prevRowSpec && hasUserColumns) {
  830. const bool hasNativeFlags = prevRowSpec->GetNativeYtTypeFlags() != 0;
  831. // patch original type
  832. auto items = prevRowSpec->GetType()->GetItems();
  833. for (const auto& newItem : type->GetItems()) {
  834. if (auto pos = prevRowSpec->GetType()->FindItem(newItem->GetName())) {
  835. if (hasNativeFlags) {
  836. bool isOptional = false;
  837. const TDataExprType* dataType = nullptr;
  838. if (IsDataOrOptionalOfData(items[*pos]->GetItemType(), isOptional, dataType)
  839. && dataType->GetSlot() == EDataSlot::Yson
  840. && !IsDataOrOptionalOfData(newItem->GetItemType()))
  841. {
  842. explicitYson.emplace_back(newItem->GetName());
  843. }
  844. }
  845. items[*pos] = ctx.MakeType<TItemExprType>(newItem->GetName(), newItem->GetItemType());
  846. } else {
  847. items.push_back(newItem);
  848. }
  849. }
  850. type = ctx.MakeType<TStructExprType>(items);
  851. }
  852. if ((prevRowSpec && !IsSameAnnotation(*prevRowSpec->GetType(), *type)) || (!prevRowSpec && hasUserSchema)) {
  853. update = true;
  854. auto strict = hasUserSchema;
  855. if (hasUserColumns) {
  856. if (prevRowSpec) {
  857. strict = prevRowSpec->StrictSchema;
  858. }
  859. }
  860. tableInfo.RowSpec = MakeIntrusive<TYqlRowSpecInfo>();
  861. tableInfo.RowSpec->SetType(type, prevRowSpec ? prevRowSpec->GetNativeYtTypeFlags() : 0ul);
  862. tableInfo.RowSpec->UniqueKeys = false;
  863. tableInfo.RowSpec->StrictSchema = strict;
  864. tableInfo.RowSpec->ExplicitYson = explicitYson;
  865. if (prevRowSpec) {
  866. if (auto nativeType = prevRowSpec->GetNativeYtType()) {
  867. tableInfo.RowSpec->CopyTypeOrders(*nativeType);
  868. }
  869. if (prevRowSpec->IsSorted()) {
  870. tableInfo.RowSpec->CopySortness(ctx, *prevRowSpec, TYqlRowSpecInfo::ECopySort::WithDesc);
  871. tableInfo.RowSpec->MakeCommonSortness(ctx, *prevRowSpec); // Truncated keys with changed types
  872. }
  873. }
  874. }
  875. } else {
  876. if (!update && rowSpec && tableInfo.RowSpec && (!rowSpec->CompareSortness(*tableInfo.RowSpec) || rowSpec->GetNativeYtType() != tableInfo.RowSpec->GetNativeYtType())) {
  877. tableInfo.RowSpec = rowSpec;
  878. update = true;
  879. }
  880. }
  881. if (update) {
  882. newTableNode = tableInfo.ToExprNode(ctx, tableNode->Pos()).Ptr();
  883. return IGraphTransformer::TStatus::Repeat;
  884. }
  885. return IGraphTransformer::TStatus::Ok;
  886. }
  887. TExprNode::TPtr ValidateAndUpdateTablesMeta(const TExprNode::TPtr& input, TStringBuf cluster, const TYtTablesData::TPtr& tablesData, bool updateRowSpecType, TExprContext& ctx) {
  888. TNodeSet tables;
  889. VisitExpr(input, [&](const TExprNode::TPtr& node) {
  890. if (auto maybeTable = TMaybeNode<TYtTable>(node)) {
  891. tables.insert(maybeTable.Cast().Raw());
  892. return false;
  893. }
  894. else if (TMaybeNode<TYtOutput>(node)) {
  895. // Don't traverse deeper to inner operations
  896. return false;
  897. }
  898. return true;
  899. });
  900. if (!tables.empty()) {
  901. bool valid = true;
  902. for (auto table: tables) {
  903. if (cluster != table->Child(TYtTable::idx_Cluster)->Content()) {
  904. ctx.AddError(TIssue(ctx.GetPosition(table->Child(TYtTable::idx_Cluster)->Pos()), TStringBuilder()
  905. << "Table " << TString{table->Child(TYtTable::idx_Name)->Content()}.Quote()
  906. << " cluster doesn't match DataSource/DataSink cluster: "
  907. << TString{table->Child(TYtTable::idx_Cluster)->Content()}.Quote() << " != " << TString{cluster}.Quote()));
  908. valid = false;
  909. }
  910. }
  911. if (!valid) {
  912. return {};
  913. }
  914. TOptimizeExprSettings settings(nullptr);
  915. settings.VisitChanges = true;
  916. TExprNode::TPtr output = input;
  917. auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
  918. if (tables.find(node.Get()) != tables.cend()) {
  919. if (!TYtTableInfo::HasSubstAnonymousLabel(TExprBase(node))) {
  920. TExprNode::TPtr newNode;
  921. auto status = UpdateTableMeta(node, newNode, tablesData, true, updateRowSpecType, ctx);
  922. if (IGraphTransformer::TStatus::Error == status.Level) {
  923. return {};
  924. }
  925. return newNode;
  926. }
  927. }
  928. return node;
  929. }, ctx, settings);
  930. if (IGraphTransformer::TStatus::Error == status.Level) {
  931. return {};
  932. }
  933. return output;
  934. }
  935. return input;
  936. }
  937. TExprNode::TPtr ResetTableMeta(const TExprNode::TPtr& tableNode, TExprContext& ctx) {
  938. TExprNode::TListType children;
  939. for (auto id: {TYtTable::idx_Meta, TYtTable::idx_Stat, TYtTable::idx_RowSpec}) {
  940. if (!TCoVoid::Match(tableNode->Child(id))) {
  941. if (children.empty()) {
  942. children = tableNode->ChildrenList();
  943. }
  944. children[id] = ctx.NewCallable(tableNode->Pos(), TCoVoid::CallableName(), {});
  945. }
  946. }
  947. if (children.empty()) {
  948. return tableNode;
  949. }
  950. return ctx.ChangeChildren(*tableNode, std::move(children));
  951. }
  952. TExprNode::TPtr ResetOutTableMeta(const TExprNode::TPtr& tableNode, TExprContext& ctx) {
  953. TExprNode::TListType children;
  954. if (!TCoVoid::Match(tableNode->Child(TYtOutTable::idx_Stat))) {
  955. if (children.empty()) {
  956. children = tableNode->ChildrenList();
  957. }
  958. children[TYtOutTable::idx_Stat] = ctx.NewCallable(tableNode->Pos(), TCoVoid::CallableName(), {});
  959. }
  960. if (tableNode->Child(TYtOutTable::idx_Name)->Content()) {
  961. if (children.empty()) {
  962. children = tableNode->ChildrenList();
  963. }
  964. children[TYtOutTable::idx_Name] = ctx.NewAtom(tableNode->Pos(), TStringBuf());
  965. }
  966. if (children.empty()) {
  967. return tableNode;
  968. }
  969. return ctx.ChangeChildren(*tableNode, std::move(children));
  970. }
  971. TExprNode::TPtr ResetTablesMeta(const TExprNode::TPtr& input, TExprContext& ctx, bool resetTmpOnly, bool isEvaluationInProgress) {
  972. TNodeSet tables;
  973. TNodeSet outTables;
  974. VisitExpr(input, [&](const TExprNode::TPtr& node) {
  975. if (auto maybeTable = TMaybeNode<TYtTable>(node)) {
  976. const bool isAnonymous = NYql::HasSetting(maybeTable.Cast().Settings().Ref(), EYtSettingType::Anonymous);
  977. if (!resetTmpOnly && !(isEvaluationInProgress && isAnonymous)) {
  978. if (!TCoVoid::Match(maybeTable.Stat().Raw()) || !TCoVoid::Match(maybeTable.Meta().Raw()) || !TCoVoid::Match(maybeTable.RowSpec().Raw())) {
  979. tables.insert(maybeTable.Raw());
  980. }
  981. }
  982. return false;
  983. }
  984. else if (auto maybeTable = TMaybeNode<TYtOutTable>(node)) {
  985. if (!isEvaluationInProgress) {
  986. if (!TCoVoid::Match(maybeTable.Stat().Raw()) || maybeTable.Cast().Name().Value()) {
  987. outTables.insert(maybeTable.Raw());
  988. }
  989. }
  990. return false;
  991. }
  992. else if (TMaybeNode<TYtOutput>(node)) {
  993. // Don't traverse deeper to inner operations
  994. return false;
  995. }
  996. return true;
  997. });
  998. if (!tables.empty() || !outTables.empty()) {
  999. TOptimizeExprSettings settings(nullptr);
  1000. settings.VisitChanges = true;
  1001. TExprNode::TPtr output = input;
  1002. auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
  1003. if (tables.find(node.Get()) != tables.cend()) {
  1004. return ResetTableMeta(node, ctx);
  1005. }
  1006. else if (outTables.find(node.Get()) != outTables.cend()) {
  1007. return ResetOutTableMeta(node, ctx);
  1008. }
  1009. return node;
  1010. }, ctx, settings);
  1011. if (IGraphTransformer::TStatus::Error == status.Level) {
  1012. return {};
  1013. }
  1014. return output;
  1015. }
  1016. return input;
  1017. }
  1018. std::pair<TExprBase, TString> GetOutTableWithCluster(TExprBase ytOutput) {
  1019. const auto output = ytOutput.Cast<TYtOutput>();
  1020. const auto op = GetOutputOp(output);
  1021. const auto cluster = TString{ op.DataSink().Cluster().Value() };
  1022. size_t ndx = 0;
  1023. YQL_ENSURE(TryFromString<size_t>(output.OutIndex().Value(), ndx), "Bad " << TYtOutput::CallableName() << " output index value");
  1024. const auto opOut = op.Output();
  1025. YQL_ENSURE(ndx < opOut.Size());
  1026. return { opOut.Item(ndx), cluster };
  1027. }
  1028. TExprBase GetOutTable(TExprBase ytOutput) {
  1029. return GetOutTableWithCluster(ytOutput).first;
  1030. }
  1031. TMaybeNode<TCoFlatMapBase> GetFlatMapOverInputStream(TCoLambda opLambda, const TParentsMap& parentsMap) {
  1032. TMaybeNode<TCoFlatMapBase> map;
  1033. if (const auto it = parentsMap.find(opLambda.Args().Arg(0).Raw()); parentsMap.cend() != it) {
  1034. for (const auto& parent : it->second) {
  1035. if (!map) {
  1036. if (map = TMaybeNode<TCoFlatMapBase>(parent))
  1037. continue;
  1038. }
  1039. if (!TCoDependsOn::Match(parent)) {
  1040. map = {};
  1041. break;
  1042. }
  1043. }
  1044. }
  1045. return map;
  1046. }
  1047. TMaybeNode<TCoFlatMapBase> GetFlatMapOverInputStream(TCoLambda opLambda) {
  1048. TParentsMap parentsMap;
  1049. GatherParents(opLambda.Body().Ref(), parentsMap);
  1050. return GetFlatMapOverInputStream(opLambda, parentsMap);
  1051. }
  1052. TExprNode::TPtr ToOutTableWithHash(TExprBase output, const TYtState::TPtr& state, TExprContext& ctx) {
  1053. auto [outTableNode, cluster] = GetOutTableWithCluster(output);
  1054. auto outTable = outTableNode.Ptr();
  1055. auto hash = TYtNodeHashCalculator(state, cluster, state->Configuration->Snapshot()).GetHash(output.Ref());
  1056. outTable = ctx.ChangeChild(*outTable, TYtOutTable::idx_Settings,
  1057. NYql::AddSetting(*outTable->Child(TYtOutTable::idx_Settings), EYtSettingType::OpHash, ctx.NewAtom(output.Pos(), HexEncode(hash)), ctx)
  1058. );
  1059. return outTable;
  1060. }
  1061. IGraphTransformer::TStatus SubstTables(TExprNode::TPtr& input, const TYtState::TPtr& state, bool anonOnly, TExprContext& ctx)
  1062. {
  1063. TProcessedNodesSet processedNodes;
  1064. VisitExpr(input, [&processedNodes](const TExprNode::TPtr& node) {
  1065. if (TYtOutput::Match(node.Get())) {
  1066. // Stop traversing dependent operations
  1067. processedNodes.insert(node->UniqueId());
  1068. return false;
  1069. }
  1070. return true;
  1071. });
  1072. TOptimizeExprSettings settings(state->Types);
  1073. settings.VisitChanges = true;
  1074. settings.VisitStarted = true;
  1075. settings.CustomInstantTypeTransformer = state->Types->CustomInstantTypeTransformer.Get();
  1076. settings.ProcessedNodes = &processedNodes;
  1077. TExprNode::TPtr optimizedInput = input;
  1078. auto status = OptimizeExpr(optimizedInput, optimizedInput, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
  1079. if (auto maybeTable = TMaybeNode<TYtTable>(node)) {
  1080. auto table = maybeTable.Cast();
  1081. if (auto anon = NYql::GetSetting(table.Settings().Ref(), EYtSettingType::Anonymous)) {
  1082. if (anon->ChildrenSize() == 1) {
  1083. TString cluster = TString{table.Cluster().Value()};
  1084. TString anonTableName = TString{table.Name().Value()};
  1085. TString realTableName = state->AnonymousLabels.Value(std::make_pair(cluster, anonTableName), TString());
  1086. if (!realTableName) {
  1087. ctx.AddError(TIssue(ctx.GetPosition(table.Pos()), TStringBuilder() << "Unaccounted anonymous table: " << cluster << '.' << anonTableName));
  1088. return {};
  1089. }
  1090. auto children = node->ChildrenList();
  1091. children[TYtTable::idx_Name] = ctx.NewAtom(node->Pos(), realTableName);
  1092. children[TYtTable::idx_Settings] = NYql::AddSetting(
  1093. *NYql::RemoveSetting(table.Settings().Ref(), EYtSettingType::Anonymous, ctx),
  1094. EYtSettingType::Anonymous, ctx.NewAtom(node->Pos(), anonTableName), ctx);
  1095. return ctx.ChangeChildren(*node, std::move(children));
  1096. }
  1097. }
  1098. }
  1099. return node;
  1100. }, ctx, settings);
  1101. if (status.Level == IGraphTransformer::TStatus::Error) {
  1102. return status;
  1103. }
  1104. if (!anonOnly) {
  1105. const bool useQueryCache = state->Configuration->QueryCacheMode.Get().GetOrElse(EQueryCacheMode::Disable) != EQueryCacheMode::Disable
  1106. && state->Configuration->QueryCacheUseForCalc.Get().GetOrElse(true);
  1107. TNodeOnNodeOwnedMap toOpt;
  1108. VisitExpr(optimizedInput, [&toOpt, &state, useQueryCache, &ctx](const TExprNode::TPtr& node) {
  1109. if (auto maybePath = TMaybeNode<TYtPath>(node)) {
  1110. if (maybePath.Table().Maybe<TYtOutput>()) {
  1111. auto path = maybePath.Cast();
  1112. toOpt[node.Get()] = Build<TYtPath>(ctx, node->Pos())
  1113. .InitFrom(path)
  1114. .Table(useQueryCache ? ToOutTableWithHash(path.Table(), state, ctx) : GetOutTable(path.Table()).Ptr())
  1115. .Done().Ptr();
  1116. }
  1117. return false;
  1118. }
  1119. if (TMaybeNode<TYtLength>(node).Input().Maybe<TYtOutput>()) {
  1120. auto length = TYtLength(node);
  1121. toOpt[node.Get()] = Build<TYtLength>(ctx, node->Pos())
  1122. .InitFrom(length)
  1123. .Input<TYtReadTable>()
  1124. .World<TCoWorld>().Build()
  1125. .DataSource(ctx.RenameNode(GetOutputOp(length.Input().Cast<TYtOutput>()).DataSink().Ref(), TYtDSource::CallableName()))
  1126. .Input()
  1127. .Add()
  1128. .Paths()
  1129. .Add()
  1130. .Table(useQueryCache ? ToOutTableWithHash(length.Input(), state, ctx) : GetOutTable(length.Input()).Ptr())
  1131. .Columns<TCoVoid>().Build()
  1132. .Ranges<TCoVoid>().Build()
  1133. .Stat<TCoVoid>().Build()
  1134. .Build()
  1135. .Build()
  1136. .Settings()
  1137. .Build()
  1138. .Build()
  1139. .Build()
  1140. .Build()
  1141. .Done().Ptr();
  1142. return false;
  1143. }
  1144. if (TMaybeNode<TYtTableContent>(node).Input().Maybe<TYtOutput>()) {
  1145. auto content = TYtTableContent(node);
  1146. toOpt[node.Get()] = Build<TYtTableContent>(ctx, node->Pos())
  1147. .InitFrom(content)
  1148. .Input<TYtReadTable>()
  1149. .World<TCoWorld>().Build()
  1150. .DataSource(ctx.RenameNode(GetOutputOp(content.Input().Cast<TYtOutput>()).DataSink().Ref(), TYtDSource::CallableName()))
  1151. .Input()
  1152. .Add()
  1153. .Paths()
  1154. .Add()
  1155. .Table(useQueryCache ? ToOutTableWithHash(content.Input(), state, ctx) : GetOutTable(content.Input()).Ptr())
  1156. .Columns<TCoVoid>().Build()
  1157. .Ranges<TCoVoid>().Build()
  1158. .Stat<TCoVoid>().Build()
  1159. .Build()
  1160. .Build()
  1161. .Settings()
  1162. .Build()
  1163. .Build()
  1164. .Build()
  1165. .Build()
  1166. .Done().Ptr();
  1167. return false;
  1168. }
  1169. if (auto maybeOut = TMaybeNode<TYtOutput>(node)) {
  1170. auto out = maybeOut.Cast();
  1171. toOpt[node.Get()] = Build<TCoRight>(ctx, node->Pos())
  1172. .Input<TYtReadTable>()
  1173. .World<TCoWorld>().Build()
  1174. .DataSource(ctx.RenameNode(GetOutputOp(out).DataSink().Ref(), TYtDSource::CallableName()))
  1175. .Input()
  1176. .Add()
  1177. .Paths()
  1178. .Add()
  1179. .Table(useQueryCache ? ToOutTableWithHash(out, state, ctx) : GetOutTable(out).Ptr())
  1180. .Columns<TCoVoid>().Build()
  1181. .Ranges<TCoVoid>().Build()
  1182. .Stat<TCoVoid>().Build()
  1183. .Build()
  1184. .Build()
  1185. .Settings()
  1186. .Build()
  1187. .Build()
  1188. .Build()
  1189. .Build()
  1190. .Done().Ptr();
  1191. return false;
  1192. }
  1193. return true;
  1194. });
  1195. if (!toOpt.empty()) {
  1196. settings.ProcessedNodes = nullptr;
  1197. status = RemapExpr(optimizedInput, optimizedInput, toOpt, ctx, settings);
  1198. if (status.Level == IGraphTransformer::TStatus::Error) {
  1199. return status;
  1200. }
  1201. }
  1202. }
  1203. if (optimizedInput != input) {
  1204. auto typeTransformer = CreateTypeAnnotationTransformer(CreateExtCallableTypeAnnotationTransformer(*state->Types, true), *state->Types);
  1205. auto constrTransformer = CreateConstraintTransformer(*state->Types, true, true);
  1206. TVector<TTransformStage> transformers;
  1207. const auto issueCode = TIssuesIds::CORE_TYPE_ANN;
  1208. transformers.push_back(TTransformStage(typeTransformer, "TypeAnnotation", issueCode));
  1209. transformers.push_back(TTransformStage(
  1210. CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { return UpdateCompletness(input, output, ctx); }),
  1211. "UpdateCompletness", issueCode));
  1212. transformers.push_back(TTransformStage(constrTransformer, "Constraints", issueCode));
  1213. auto fullTransformer = CreateCompositeGraphTransformer(transformers, false);
  1214. status = InstantTransform(*fullTransformer, optimizedInput, ctx);
  1215. if (status.Level == IGraphTransformer::TStatus::Error) {
  1216. return status;
  1217. }
  1218. input = optimizedInput;
  1219. }
  1220. return IGraphTransformer::TStatus::Ok;
  1221. }
  1222. TYtPath CopyOrTrivialMap(TPositionHandle pos, TExprBase world, TYtDSink dataSink, const TTypeAnnotationNode& scheme,
  1223. TYtSection section, TYqlRowSpecInfo::TPtr outRowSpec, TExprContext& ctx, const TYtState::TPtr& state, const TCopyOrTrivialMapOpts& opts)
  1224. {
  1225. bool tryKeepSortness = opts.TryKeepSortness;
  1226. const bool singleInput = section.Paths().Size() == 1;
  1227. bool needMap = false;
  1228. const auto sysColumns = NYql::GetSetting(section.Settings().Ref(), EYtSettingType::SysColumns);
  1229. bool useExplicitColumns = false;
  1230. bool exactCopySort = false;
  1231. bool hasAux = false;
  1232. TVector<std::pair<TYqlRowSpecInfo::TPtr, bool>> rowSpecs;
  1233. const ui64 outNativeYtTypeFlags = outRowSpec ? outRowSpec->GetNativeYtTypeFlags() : (state->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE);
  1234. TYtOutTableInfo outTable(scheme.Cast<TStructExprType>(), outNativeYtTypeFlags);
  1235. outTable.RowSpec->SetConstraints(opts.Constraints);
  1236. TMaybe<NYT::TNode> outNativeType;
  1237. if (outRowSpec) {
  1238. outNativeType = outRowSpec->GetNativeYtType();
  1239. }
  1240. bool first = !outRowSpec;
  1241. const bool useNativeDescSort = state->Configuration->UseNativeDescSort.Get().GetOrElse(DEFAULT_USE_NATIVE_DESC_SORT);
  1242. for (auto path: section.Paths()) {
  1243. TYtPathInfo pathInfo(path);
  1244. const bool hasRowSpec = !!pathInfo.Table->RowSpec;
  1245. const bool tableHasAux = hasRowSpec && pathInfo.Table->RowSpec->HasAuxColumns();
  1246. TMaybe<NYT::TNode> currentNativeType;
  1247. if (hasRowSpec) {
  1248. currentNativeType = pathInfo.GetNativeYtType();
  1249. }
  1250. if (first) {
  1251. outNativeType = currentNativeType;
  1252. first = false;
  1253. }
  1254. const bool needTableMap = pathInfo.RequiresRemap() || bool(sysColumns)
  1255. || outTable.RowSpec->GetNativeYtTypeFlags() != pathInfo.GetNativeYtTypeFlags()
  1256. || currentNativeType != outNativeType;
  1257. useExplicitColumns = useExplicitColumns || !pathInfo.Table->IsTemp || (tableHasAux && pathInfo.HasColumns());
  1258. needMap = needMap || needTableMap;
  1259. hasAux = hasAux || tableHasAux;
  1260. if (tryKeepSortness) {
  1261. if (pathInfo.Table->IsUnordered || (opts.RangesResetSort && pathInfo.Ranges && pathInfo.Ranges->GetRangesCount() > 1)) {
  1262. tryKeepSortness = false;
  1263. }
  1264. rowSpecs.emplace_back(pathInfo.Table->RowSpec, needTableMap);
  1265. exactCopySort = singleInput && pathInfo.Table->IsTemp && hasRowSpec
  1266. && IsSameAnnotation(scheme, *pathInfo.Table->RowSpec->GetType());
  1267. }
  1268. }
  1269. if (!needMap && outNativeType) {
  1270. outTable.RowSpec->CopyTypeOrders(*outNativeType);
  1271. }
  1272. useExplicitColumns = useExplicitColumns || (!tryKeepSortness && hasAux);
  1273. bool trimSort = false;
  1274. const bool sortConstraintEnabled = ctx.IsConstraintEnabled<TSortedConstraintNode>();
  1275. if (tryKeepSortness) {
  1276. bool sortIsChanged = false;
  1277. for (size_t i = 0; i < rowSpecs.size(); ++i) {
  1278. if (!rowSpecs[i].first) {
  1279. sortIsChanged = outTable.RowSpec->ClearSortness(ctx);
  1280. continue;
  1281. }
  1282. if (0 == i) {
  1283. TYqlRowSpecInfo::ECopySort mode = TYqlRowSpecInfo::ECopySort::Pure;
  1284. if (rowSpecs[i].second) {
  1285. if (sortConstraintEnabled) {
  1286. mode = TYqlRowSpecInfo::ECopySort::WithDesc;
  1287. }
  1288. } else {
  1289. mode = exactCopySort
  1290. ? TYqlRowSpecInfo::ECopySort::Exact
  1291. : TYqlRowSpecInfo::ECopySort::WithDesc;
  1292. }
  1293. sortIsChanged = outTable.RowSpec->CopySortness(ctx, *rowSpecs[i].first, mode);
  1294. } else {
  1295. sortIsChanged = outTable.RowSpec->MakeCommonSortness(ctx, *rowSpecs[i].first) || sortIsChanged;
  1296. if (rowSpecs[i].second && !sortConstraintEnabled) {
  1297. sortIsChanged = outTable.RowSpec->KeepPureSortOnly(ctx) || sortIsChanged;
  1298. }
  1299. }
  1300. }
  1301. useExplicitColumns = useExplicitColumns || (sortIsChanged && hasAux);
  1302. tryKeepSortness = outTable.RowSpec->IsSorted();
  1303. trimSort = !tryKeepSortness;
  1304. }
  1305. outTable.SetUnique(opts.SectionUniq, pos, ctx);
  1306. if (tryKeepSortness) {
  1307. if (needMap && !singleInput) {
  1308. auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, pos);
  1309. settingsBuilder
  1310. .Add()
  1311. .Name()
  1312. .Value(ToString(EYtSettingType::Ordered))
  1313. .Build()
  1314. .Build();
  1315. if (!opts.LimitNodes.empty()) {
  1316. settingsBuilder
  1317. .Add()
  1318. .Name()
  1319. .Value(ToString(EYtSettingType::Limit))
  1320. .Build()
  1321. .Value<TExprList>()
  1322. .Add(opts.LimitNodes)
  1323. .Build()
  1324. .Build();
  1325. }
  1326. if (state->Configuration->UseFlow.Get().GetOrElse(DEFAULT_USE_FLOW)) {
  1327. settingsBuilder
  1328. .Add()
  1329. .Name()
  1330. .Value(ToString(EYtSettingType::Flow))
  1331. .Build()
  1332. .Build();
  1333. }
  1334. TExprNode::TPtr mapSectionSettings = ctx.NewList(section.Pos(), {});
  1335. TExprNode::TPtr sectionSettings = section.Settings().Ptr();
  1336. if (sysColumns) {
  1337. mapSectionSettings = NYql::AddSetting(*mapSectionSettings, EYtSettingType::SysColumns, sysColumns->ChildPtr(1), ctx);
  1338. sectionSettings = NYql::RemoveSetting(*sectionSettings, EYtSettingType::SysColumns, ctx);
  1339. }
  1340. auto getPathUniq = [] (const TYtPath& path) {
  1341. if (path.Ref().GetState() != TExprNode::EState::Initial) {
  1342. return path.Ref().GetConstraint<TDistinctConstraintNode>();
  1343. }
  1344. // Dynamically constructed YtPath for YtOutput
  1345. return path.Table().Ref().GetConstraint<TDistinctConstraintNode>();
  1346. };
  1347. TVector<TYtPath> updatedPaths;
  1348. YQL_ENSURE(rowSpecs.size() == section.Paths().Size());
  1349. for (size_t i = 0; i < section.Paths().Size(); ++i) {
  1350. auto path = section.Paths().Item(i);
  1351. if (rowSpecs[i].second) {
  1352. TYtOutTableInfo mapOutTable(scheme.Cast<TStructExprType>(), outNativeYtTypeFlags);
  1353. if (outNativeType) {
  1354. mapOutTable.RowSpec->CopyTypeOrders(*outNativeType);
  1355. }
  1356. YQL_ENSURE(rowSpecs[i].first);
  1357. mapOutTable.SetUnique(getPathUniq(path), path.Pos(), ctx);
  1358. auto mapper = Build<TCoLambda>(ctx, path.Pos())
  1359. .Args({"stream"})
  1360. .Body("stream")
  1361. .Done().Ptr();
  1362. mapOutTable.RowSpec->CopySortness(ctx, *rowSpecs[i].first, sortConstraintEnabled ? TYqlRowSpecInfo::ECopySort::WithDesc : TYqlRowSpecInfo::ECopySort::Pure);
  1363. if (sortConstraintEnabled) {
  1364. TKeySelectorBuilder builder(path.Pos(), ctx, useNativeDescSort, scheme.Cast<TStructExprType>());
  1365. builder.ProcessRowSpec(*mapOutTable.RowSpec);
  1366. if (builder.NeedMap()) {
  1367. mapper = builder.MakeRemapLambda(true);
  1368. }
  1369. }
  1370. path = Build<TYtPath>(ctx, path.Pos())
  1371. .Table<TYtOutput>()
  1372. .Operation<TYtMap>()
  1373. .World(world)
  1374. .DataSink(dataSink)
  1375. .Input()
  1376. .Add()
  1377. .Paths()
  1378. .Add(path)
  1379. .Build()
  1380. .Settings(mapSectionSettings)
  1381. .Build()
  1382. .Build()
  1383. .Output()
  1384. .Add(mapOutTable.ToExprNode(ctx, path.Pos()).Cast<TYtOutTable>())
  1385. .Build()
  1386. .Settings(settingsBuilder.Done())
  1387. .Mapper(mapper)
  1388. .Build()
  1389. .OutIndex()
  1390. .Value("0")
  1391. .Build()
  1392. .Build()
  1393. .Columns<TCoVoid>().Build()
  1394. .Ranges<TCoVoid>().Build()
  1395. .Stat<TCoVoid>().Build()
  1396. .Done();
  1397. }
  1398. updatedPaths.push_back(path);
  1399. }
  1400. section = Build<TYtSection>(ctx, section.Pos())
  1401. .InitFrom(section)
  1402. .Paths()
  1403. .Add(updatedPaths)
  1404. .Build()
  1405. .Settings(sectionSettings)
  1406. .Done();
  1407. needMap = false;
  1408. }
  1409. } else if (!trimSort) {
  1410. section = MakeUnorderedSection(section, ctx);
  1411. }
  1412. if (needMap) {
  1413. auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, pos);
  1414. settingsBuilder
  1415. .Add()
  1416. .Name()
  1417. .Value(ToString(EYtSettingType::Ordered))
  1418. .Build()
  1419. .Build();
  1420. if (!opts.LimitNodes.empty()) {
  1421. settingsBuilder
  1422. .Add()
  1423. .Name()
  1424. .Value(ToString(EYtSettingType::Limit))
  1425. .Build()
  1426. .Value<TExprList>()
  1427. .Add(opts.LimitNodes)
  1428. .Build()
  1429. .Build();
  1430. }
  1431. if (state->Configuration->UseFlow.Get().GetOrElse(DEFAULT_USE_FLOW)) {
  1432. settingsBuilder
  1433. .Add()
  1434. .Name()
  1435. .Value(ToString(EYtSettingType::Flow))
  1436. .Build()
  1437. .Build();
  1438. }
  1439. auto mapper = Build<TCoLambda>(ctx, pos)
  1440. .Args({"stream"})
  1441. .Body("stream")
  1442. .Done().Ptr();
  1443. if (sortConstraintEnabled && outTable.RowSpec->IsSorted()) {
  1444. TKeySelectorBuilder builder(pos, ctx, useNativeDescSort, scheme.Cast<TStructExprType>());
  1445. builder.ProcessRowSpec(*outTable.RowSpec);
  1446. if (builder.NeedMap()) {
  1447. mapper = builder.MakeRemapLambda(true);
  1448. }
  1449. }
  1450. return Build<TYtPath>(ctx, pos)
  1451. .Table<TYtOutput>()
  1452. .Operation<TYtMap>()
  1453. .World(world)
  1454. .DataSink(dataSink)
  1455. .Input()
  1456. .Add(section)
  1457. .Build()
  1458. .Output()
  1459. .Add(outTable.ToExprNode(ctx, pos).Cast<TYtOutTable>())
  1460. .Build()
  1461. .Settings(settingsBuilder.Done())
  1462. .Mapper(mapper)
  1463. .Build()
  1464. .OutIndex()
  1465. .Value("0")
  1466. .Build()
  1467. .Build()
  1468. .Columns<TCoVoid>().Build()
  1469. .Ranges<TCoVoid>().Build()
  1470. .Stat<TCoVoid>().Build()
  1471. .Done();
  1472. }
  1473. auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, pos);
  1474. if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Sample)) {
  1475. settingsBuilder
  1476. .Add()
  1477. .Name()
  1478. .Value(ToString(EYtSettingType::ForceTransform))
  1479. .Build()
  1480. .Build();
  1481. }
  1482. if (opts.CombineChunks) {
  1483. settingsBuilder
  1484. .Add()
  1485. .Name()
  1486. .Value(ToString(EYtSettingType::CombineChunks))
  1487. .Build()
  1488. .Build();
  1489. }
  1490. if (!opts.LimitNodes.empty()) {
  1491. settingsBuilder
  1492. .Add()
  1493. .Name()
  1494. .Value(ToString(EYtSettingType::Limit))
  1495. .Build()
  1496. .Value<TExprList>()
  1497. .Add(opts.LimitNodes)
  1498. .Build()
  1499. .Build();
  1500. }
  1501. if (useExplicitColumns) {
  1502. TSet<TStringBuf> columns;
  1503. for (auto item: outTable.RowSpec->GetType()->GetItems()) {
  1504. columns.insert(item->GetName());
  1505. }
  1506. for (auto item: outTable.RowSpec->GetAuxColumns()) {
  1507. columns.insert(item.first);
  1508. }
  1509. section = UpdateInputFields(section, std::move(columns), ctx, false);
  1510. }
  1511. return Build<TYtPath>(ctx, pos)
  1512. .Table<TYtOutput>()
  1513. .Operation<TYtMerge>()
  1514. .World(world)
  1515. .DataSink(dataSink)
  1516. .Input()
  1517. .Add(section)
  1518. .Build()
  1519. .Output()
  1520. .Add(outTable.ToExprNode(ctx, pos).Cast<TYtOutTable>())
  1521. .Build()
  1522. .Settings(settingsBuilder.Done())
  1523. .Build()
  1524. .OutIndex()
  1525. .Value(TStringBuf("0"))
  1526. .Build()
  1527. .Build()
  1528. .Columns<TCoVoid>().Build()
  1529. .Ranges<TCoVoid>().Build()
  1530. .Stat<TCoVoid>().Build()
  1531. .Done();
  1532. }
  1533. namespace {
  1534. template <class T>
  1535. const TExprNode* GetSingleParent(const TExprNode* node, const TParentsMap& parentsMap) {
  1536. if (T::Match(node)) {
  1537. auto parentsIt = parentsMap.find(node);
  1538. YQL_ENSURE(parentsIt != parentsMap.cend());
  1539. if (parentsIt->second.size() != 1) {
  1540. return nullptr;
  1541. }
  1542. return *parentsIt->second.begin();
  1543. }
  1544. return node;
  1545. }
  1546. }
  1547. bool IsOutputUsedMultipleTimes(const TExprNode& op, const TParentsMap& parentsMap) {
  1548. const TExprNode* node = &op;
  1549. node = GetSingleParent<TYtOutputOpBase>(node, parentsMap);
  1550. if (nullptr == node) {
  1551. return true;
  1552. }
  1553. node = GetSingleParent<TYtOutput>(node, parentsMap);
  1554. if (nullptr == node) {
  1555. return true;
  1556. }
  1557. node = GetSingleParent<TYtPath>(node, parentsMap);
  1558. if (nullptr == node) {
  1559. return true;
  1560. }
  1561. node = GetSingleParent<TYtPathList>(node, parentsMap);
  1562. if (nullptr == node) {
  1563. return true;
  1564. }
  1565. node = GetSingleParent<TYtSection>(node, parentsMap);
  1566. if (nullptr == node) {
  1567. return true;
  1568. }
  1569. node = GetSingleParent<TYtSectionList>(node, parentsMap);
  1570. return node == nullptr;
  1571. }
  1572. TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const TYtPathInfo& pathInfo,
  1573. const TMaybe<TVector<TString>>& overrideColumns, const TYtState& state, TExprContext& ctx)
  1574. {
  1575. auto ytPath = NYT::TRichYPath(pathInfo.Table->Name);
  1576. pathInfo.FillRichYPath(ytPath);
  1577. if (overrideColumns) {
  1578. ytPath.Columns(*overrideColumns);
  1579. }
  1580. if (ytPath.Columns_ && dynamic_cast<TYtTableInfo*>(pathInfo.Table.Get()) && pathInfo.Table->IsAnonymous
  1581. && !TYtTableInfo::HasSubstAnonymousLabel(pathInfo.Table->FromNode.Cast())) {
  1582. TString realTableName = state.AnonymousLabels.Value(std::make_pair(cluster, pathInfo.Table->Name), TString());
  1583. if (!realTableName) {
  1584. TPositionHandle pos;
  1585. if (pathInfo.FromNode) {
  1586. pos = pathInfo.FromNode.Cast().Pos();
  1587. }
  1588. ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Unaccounted anonymous table: " << cluster << '.' << pathInfo.Table->Name));
  1589. return {};
  1590. }
  1591. ytPath.Path_ = realTableName;
  1592. }
  1593. return ytPath;
  1594. }
  1595. TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
  1596. const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
  1597. {
  1598. TVector<ui64> result;
  1599. TSet<TString> requestedColumns;
  1600. bool sync = true;
  1601. auto status = EstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx, sync);
  1602. if (status != IGraphTransformer::TStatus::Ok) {
  1603. return {};
  1604. }
  1605. return result;
  1606. }
  1607. IGraphTransformer::TStatus TryEstimateDataSize(TVector<ui64>& result, TSet<TString>& requestedColumns,
  1608. const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
  1609. const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
  1610. {
  1611. bool sync = false;
  1612. return EstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx, sync);
  1613. }
// Returns a copy of `section` where every path's column list is replaced by `fields`.
// When the section has a SysColumns setting and `fields` is an explicit list, each system column is
// kept in the setting only if its prefixed name (YqlSysColumnPrefix + name) occurs in `fields`, and
// such names are removed from the field list; the setting is rewritten (or removed) accordingly.
TYtSection UpdateInputFields(TYtSection section, TExprBase fields, TExprContext& ctx) {
auto settings = section.Settings().Ptr();
auto sysColumns = NYql::GetSettingAsColumnList(*settings, EYtSettingType::SysColumns);
if (!sysColumns.empty()) {
if (auto list = fields.Maybe<TExprList>()) {
// Column name -> original field node. A field is either an atom or an atom list whose first item is
// the name. TMap keeps names ordered; emplace keeps the first node for a duplicated name.
TMap<TStringBuf, TExprNode::TPtr> fieldMap;
for (auto item: list.Cast()) {
if (auto atom = item.Maybe<TCoAtom>()) {
fieldMap.emplace(atom.Cast().Value(), item.Ptr());
} else {
fieldMap.emplace(item.Cast<TCoAtomList>().Item(0).Value(), item.Ptr());
}
}
// Keep only the system columns that were actually requested, and strip them from the field list.
TVector<TString> updatedSysColumns;
for (auto sys: sysColumns) {
auto sysColName = TString(YqlSysColumnPrefix).append(sys);
if (fieldMap.contains(sysColName)) {
updatedSysColumns.push_back(sys);
fieldMap.erase(sysColName);
}
}
if (updatedSysColumns.size() != sysColumns.size()) {
settings = NYql::RemoveSetting(*settings, EYtSettingType::SysColumns, ctx);
if (!updatedSysColumns.empty()) {
settings = NYql::AddSettingAsColumnList(*settings, EYtSettingType::SysColumns, updatedSysColumns, ctx);
}
}
// The list shrank (a system column was removed or a name was duplicated): rebuild it from the map.
// NOTE: the rebuilt list is ordered by column name and de-duplicated, not in the original order.
if (fieldMap.size() != list.Cast().Size()) {
TExprNode::TListType children;
std::transform(fieldMap.begin(), fieldMap.end(), std::back_inserter(children), [](const auto& pair) { return pair.second; });
fields = TExprBase(ctx.NewList(fields.Pos(), std::move(children)));
}
}
}
// Apply the (possibly updated) fields to every path and rebuild the section with the new settings.
auto pathsBuilder = Build<TYtPathList>(ctx, section.Paths().Pos());
for (const auto& path : section.Paths()) {
pathsBuilder.Add<TYtPath>()
.InitFrom(path)
.Columns(fields)
.Build();
}
return Build<TYtSection>(ctx, section.Pos())
.InitFrom(section)
.Paths(pathsBuilder.Done())
.Settings(settings)
.Done();
}
// Overload taking the requested member names. `members` is consumed: system columns found in it
// (prefixed with YqlSysColumnPrefix) are erased from it and kept in the SysColumns setting, while
// system columns not requested are dropped from that setting.
// When `hasWeakFields` is set, paths that already have an explicit column list keep their existing
// two-element column entries (matched by the entry's first child) instead of a plain atom.
TYtSection UpdateInputFields(TYtSection section, TSet<TStringBuf>&& members, TExprContext& ctx, bool hasWeakFields) {
auto settings = section.Settings().Ptr();
auto sysColumns = NYql::GetSettingAsColumnList(*settings, EYtSettingType::SysColumns);
if (!sysColumns.empty()) {
TVector<TString> updatedSysColumns;
for (auto sys: sysColumns) {
auto sysColName = TString(YqlSysColumnPrefix).append(sys);
if (members.contains(sysColName)) {
updatedSysColumns.push_back(sys);
members.erase(sysColName);
}
}
if (updatedSysColumns.size() != sysColumns.size()) {
settings = NYql::RemoveSetting(*settings, EYtSettingType::SysColumns, ctx);
if (!updatedSysColumns.empty()) {
settings = NYql::AddSettingAsColumnList(*settings, EYtSettingType::SysColumns, updatedSysColumns, ctx);
}
}
}
// Remaining (non-system) member names, in TSet order, as an atom list.
auto fields = ToAtomList(members, section.Pos(), ctx);
auto pathsBuilder = Build<TYtPathList>(ctx, section.Paths().Pos());
for (const auto& path : section.Paths()) {
if (!hasWeakFields || path.Columns().Maybe<TCoVoid>()) {
pathsBuilder.Add<TYtPath>()
.InitFrom(path)
.Columns(fields)
.Build();
} else {
// Remember the path's existing two-child column entries, keyed by their first child's content,
// so they can be reused in place of the plain atom for the same name.
THashMap<TStringBuf, TExprNode::TPtr> weakFields;
for (auto col: path.Columns().Cast<TExprList>()) {
if (col.Ref().ChildrenSize() == 2) {
weakFields[col.Ref().Child(0)->Content()] = col.Ptr();
}
}
TExprNode::TListType updatedColumns;
for (auto member: fields->Children()) {
if (auto p = weakFields.FindPtr(member->Content())) {
updatedColumns.push_back(*p);
} else {
updatedColumns.push_back(member);
}
}
pathsBuilder.Add<TYtPath>()
.InitFrom(path)
.Columns(ctx.NewList(path.Pos(), std::move(updatedColumns)))
.Build();
}
}
return Build<TYtSection>(ctx, section.Pos())
.InitFrom(section)
.Paths(pathsBuilder.Done())
.Settings(settings)
.Done();
}
// Returns `path` with its YtOutput table switched to Mode = Unordered when that is safe.
// The path is left unchanged when:
//  - its table is not a YtOutput, or the output is already unordered;
//  - it has a key range (YtKeyExact / YtKeyRange);
//  - the read is limited (`hasLimits`, a YtRow / YtRowRange range, or a Limit setting on a transient
//    producer) AND the producer keeps sort order (KeepSorted setting or a YtSort producer).
TYtPath MakeUnorderedPath(TYtPath path, bool hasLimits, TExprContext& ctx) {
bool makeUnordered = false;
bool keepSort = false;
if (auto maybeOut = path.Table().Maybe<TYtOutput>()) {
const auto out = maybeOut.Cast();
if (!IsUnorderedOutput(out)) {
makeUnordered = true;
if (!path.Ranges().Maybe<TCoVoid>()) {
for (auto range: path.Ranges().Cast<TExprList>()) {
if (range.Maybe<TYtKeyExact>() || range.Maybe<TYtKeyRange>()) {
// Key ranges rely on the output order.
makeUnordered = false;
} else if (range.Maybe<TYtRow>() || range.Maybe<TYtRowRange>()) {
// Row ranges act as a limit on the read.
hasLimits = true;
}
}
}
}
// Inspect the producing operation for limits and sort preservation.
if (auto settings = GetOutputOp(out).Maybe<TYtTransientOpBase>().Settings()) {
hasLimits = hasLimits || NYql::HasSetting(settings.Ref(), EYtSettingType::Limit);
keepSort = NYql::HasSetting(settings.Ref(), EYtSettingType::KeepSorted);
} else if (auto settings = GetOutputOp(out).Maybe<TYtFill>().Settings()) {
keepSort = NYql::HasSetting(settings.Ref(), EYtSettingType::KeepSorted);
}
keepSort = keepSort || GetOutputOp(out).Maybe<TYtSort>();
}
// A limited read of sorted data must stay ordered: which rows fall inside the limit depends on order.
if (makeUnordered && hasLimits && keepSort) {
makeUnordered = false;
}
if (makeUnordered) {
return Build<TYtPath>(ctx, path.Pos())
.InitFrom(path)
.Table<TYtOutput>()
.InitFrom(path.Table().Cast<TYtOutput>())
.Mode()
.Value(ToString(EYtSettingType::Unordered))
.Build()
.Build()
.Done();
}
return path;
}
// Applies MakeUnorderedPath to every path of `section`; Take/Skip section settings count as limits.
// WithUnorderedSetting = true: the Unordered setting is also added and a rebuilt section is always returned.
// WithUnorderedSetting = false: the original `section` is returned when no path changed.
// Sections with a non-empty key filter keep their paths untouched (only the setting is added, if requested).
// NOTE(review): AddSetting is called without checking whether Unordered is already present; confirm it
// does not produce a duplicate entry for already-unordered sections.
template<bool WithUnorderedSetting>
TYtSection MakeUnorderedSection(TYtSection section, TExprContext& ctx) {
if (HasNonEmptyKeyFilter(section)) {
if constexpr (WithUnorderedSetting)
return Build<TYtSection>(ctx, section.Pos())
.Paths(section.Paths())
.Settings(NYql::AddSetting(section.Settings().Ref(), EYtSettingType::Unordered, {}, ctx))
.Done();
else
return section;
}
const bool hasLimits = NYql::HasAnySetting(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip);
bool hasUpdated = false;
TVector<TYtPath> updatedPaths;
for (auto path: section.Paths()) {
updatedPaths.push_back(MakeUnorderedPath(path, hasLimits, ctx));
// Node identity tells whether MakeUnorderedPath produced a new path.
hasUpdated = hasUpdated || updatedPaths.back().Raw() != path.Raw();
}
if constexpr (WithUnorderedSetting) {
return Build<TYtSection>(ctx, section.Pos())
.Paths()
.Add(updatedPaths)
.Build()
.Settings(NYql::AddSetting(section.Settings().Ref(), EYtSettingType::Unordered, {}, ctx))
.Done();
} else {
if (!hasUpdated)
return section;
return Build<TYtSection>(ctx, section.Pos())
.Paths()
.Add(updatedPaths)
.Build()
.Settings(section.Settings())
.Done();
}
}
// Explicit instantiations for both variants used outside this translation unit.
template TYtSection MakeUnorderedSection<true>(TYtSection section, TExprContext& ctx);
template TYtSection MakeUnorderedSection<false>(TYtSection section, TExprContext& ctx);
  1794. TYtSection ClearUnorderedSection(TYtSection section, TExprContext& ctx) {
  1795. const bool hasUnorderedOut = AnyOf(section.Paths(), [](const auto& path) { auto out = path.Table().template Maybe<TYtOutput>(); return out && IsUnorderedOutput(out.Cast()); });
  1796. if (hasUnorderedOut) {
  1797. TVector<TYtPath> updatedPaths;
  1798. for (auto path: section.Paths()) {
  1799. if (auto out = path.Table().Maybe<TYtOutput>()) {
  1800. if (IsUnorderedOutput(out.Cast())) {
  1801. path = Build<TYtPath>(ctx, path.Pos())
  1802. .InitFrom(path)
  1803. .Table<TYtOutput>()
  1804. .InitFrom(out.Cast())
  1805. .Mode(TMaybeNode<TCoAtom>())
  1806. .Build()
  1807. .Done();
  1808. }
  1809. }
  1810. updatedPaths.push_back(path);
  1811. }
  1812. section = Build<TYtSection>(ctx, section.Pos())
  1813. .InitFrom(section)
  1814. .Paths()
  1815. .Add(updatedPaths)
  1816. .Build()
  1817. .Done();
  1818. }
  1819. if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Unordered)) {
  1820. section = Build<TYtSection>(ctx, section.Pos())
  1821. .InitFrom(section)
  1822. .Settings(NYql::RemoveSetting(section.Settings().Ref(), EYtSettingType::Unordered, ctx))
  1823. .Done();
  1824. }
  1825. return section;
  1826. }
  1827. TYtDSource GetDataSource(TExprBase input, TExprContext& ctx) {
  1828. TMaybeNode<TExprBase> n = input;
  1829. if (auto right = input.Maybe<TCoRight>()) {
  1830. n = right.Input();
  1831. } else if (auto content = input.Maybe<TYtTableContent>()) {
  1832. n = content.Input();
  1833. }
  1834. if (auto read = n.Maybe<TYtReadTable>())
  1835. return read.Cast().DataSource();
  1836. if (auto out = n.Maybe<TYtOutput>()) {
  1837. return TYtDSource(ctx.RenameNode(GetOutputOp(out.Cast()).DataSink().Ref(), "DataSource"));
  1838. } else {
  1839. YQL_ENSURE(false, "Unknown operation input");
  1840. }
  1841. }
  1842. TExprNode::TPtr BuildEmptyTablesRead(TPositionHandle pos, const TExprNode& userSchema, TExprContext& ctx) {
  1843. if (!EnsureArgsCount(userSchema, 2, ctx)) {
  1844. return {};
  1845. }
  1846. return ctx.Builder(pos)
  1847. .Callable("Cons!")
  1848. .World(0)
  1849. .Callable(1, "List")
  1850. .Callable(0, "ListType")
  1851. .Add(0, userSchema.ChildPtr(1))
  1852. .Seal()
  1853. .Seal()
  1854. .Seal()
  1855. .Build();
  1856. }
  1857. TExprNode::TPtr GetFlowSettings(TPositionHandle pos, const TYtState& state, TExprContext& ctx, TExprNode::TPtr settings) {
  1858. if (!settings) {
  1859. settings = ctx.NewList(pos, {});
  1860. }
  1861. if (state.Configuration->UseFlow.Get().GetOrElse(DEFAULT_USE_FLOW)) {
  1862. settings = NYql::AddSetting(*settings, EYtSettingType::Flow, {}, ctx);
  1863. }
  1864. return settings;
  1865. }
  1866. TVector<TStringBuf> GetKeyFilterColumns(const NNodes::TYtSection& section, EYtSettingTypes kind) {
  1867. TVector<TStringBuf> result;
  1868. if (kind.HasFlags(EYtSettingType::KeyFilter) && NYql::HasSetting(section.Settings().Ref(), EYtSettingType::KeyFilter)) {
  1869. for (auto keyFilter: NYql::GetAllSettingValues(section.Settings().Ref(), EYtSettingType::KeyFilter)) {
  1870. auto value = TExprList(keyFilter);
  1871. if (value.Size() > 0) {
  1872. for (auto member: value.Item(0).Cast<TCoNameValueTupleList>()) {
  1873. result.emplace_back(member.Name().Value());
  1874. }
  1875. }
  1876. }
  1877. }
  1878. if (kind.HasFlags(EYtSettingType::KeyFilter2) && NYql::HasSetting(section.Settings().Ref(), EYtSettingType::KeyFilter2)) {
  1879. for (auto keyFilter: NYql::GetAllSettingValues(section.Settings().Ref(), EYtSettingType::KeyFilter2)) {
  1880. auto value = TExprList(keyFilter);
  1881. if (value.Size() > 0) {
  1882. for (auto member: value.Item(1).Cast<TCoNameValueTupleList>()) {
  1883. if (member.Name().Value() == "usedKeys") {
  1884. for (auto key : member.Value().Cast<TCoAtomList>()) {
  1885. result.emplace_back(key.Value());
  1886. }
  1887. }
  1888. }
  1889. }
  1890. }
  1891. }
  1892. return result;
  1893. }
  1894. bool HasNonEmptyKeyFilter(const NNodes::TYtSection& section) {
  1895. auto hasChildren = [](const auto& node) { return node->ChildrenSize() > 0; };
  1896. return AnyOf(NYql::GetAllSettingValues(section.Settings().Ref(), EYtSettingType::KeyFilter), hasChildren) ||
  1897. AnyOf(NYql::GetAllSettingValues(section.Settings().Ref(), EYtSettingType::KeyFilter2), hasChildren);
  1898. }
// Converts a table-content input into a YtReadTable over the same data.
// - YtOutput: builds a single-section read with a new world. `settings` (or an empty list) becomes the
//   section settings, and `customFields` (or Void) becomes the path columns.
// - YtReadTable: reuses its world and sections, merging `settings` into each section's settings and
//   applying `customFields` through UpdateInputFields.
// Any other input fails with YQL_ENSURE ("Unknown operation input").
TYtReadTable ConvertContentInputToRead(TExprBase input, TMaybeNode<TCoNameValueTupleList> settings, TExprContext& ctx, TMaybeNode<TCoAtomList> customFields) {
TExprNode::TPtr world;
TVector<TYtSection> sections;
// Path columns for the YtOutput case: the custom field list, or Void when none is given.
TExprBase columns = customFields ? TExprBase(customFields.Cast()) : TExprBase(Build<TCoVoid>(ctx, input.Pos()).Done());
if (auto out = input.Maybe<TYtOutput>()) {
world = ctx.NewWorld(input.Pos());
if (!settings) {
settings = Build<TCoNameValueTupleList>(ctx, input.Pos()).Done();
}
sections.push_back(Build<TYtSection>(ctx, input.Pos())
.Paths()
.Add()
.Table(out.Cast())
.Columns(columns)
.Ranges<TCoVoid>().Build()
.Stat<TCoVoid>().Build()
.Build()
.Build()
.Settings(settings.Cast())
.Done());
}
else {
auto read = input.Maybe<TYtReadTable>();
YQL_ENSURE(read, "Unknown operation input");
world = read.Cast().World().Ptr();
for (auto section: read.Cast().Input()) {
if (settings) {
section = Build<TYtSection>(ctx, section.Pos())
.InitFrom(section)
.Settings(MergeSettings(section.Settings().Ref(), settings.Cast().Ref(), ctx))
.Done();
}
if (customFields) {
section = UpdateInputFields(section, customFields.Cast(), ctx);
}
sections.push_back(section);
}
}
return Build<TYtReadTable>(ctx, input.Pos())
.World(world)
.DataSource(GetDataSource(input, ctx))
.Input()
.Add(sections)
.Build()
.Done();
}
  1945. size_t GetMapDirectOutputsCount(const NNodes::TYtMapReduce& mapReduce) {
  1946. if (mapReduce.Mapper().Maybe<TCoVoid>()) {
  1947. return 0;
  1948. }
  1949. const auto& mapOutputType = GetSeqItemType(*mapReduce.Mapper().Ref().GetTypeAnn());
  1950. if (mapOutputType.GetKind() != ETypeAnnotationKind::Variant) {
  1951. return 0;
  1952. }
  1953. auto numVariants = mapOutputType.Cast<TVariantExprType>()->GetUnderlyingType()->Cast<TTupleExprType>()->GetSize();
  1954. YQL_ENSURE(numVariants > 1);
  1955. return numVariants - 1;
  1956. }
  1957. bool HasYtRowNumber(const TExprNode& node) {
  1958. bool hasRowNumber = false;
  1959. VisitExpr(node, [&hasRowNumber](const TExprNode& n) {
  1960. if (TYtRowNumber::Match(&n)) {
  1961. hasRowNumber = true;
  1962. } else if (TYtOutput::Match(&n)) {
  1963. return false;
  1964. }
  1965. return !hasRowNumber;
  1966. });
  1967. return hasRowNumber;
  1968. }
  1969. } // NYql