yql_yt_optimize.cpp 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. #include "yql_yt_optimize.h"
  2. #include "yql_yt_op_settings.h"
  3. #include "yql_yt_table.h"
  4. #include "yql_yt_helpers.h"
  5. #include "yql_yt_provider_impl.h"
  6. #include <yt/yql/providers/yt/lib/res_pull/table_limiter.h>
  7. #include <yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.h>
  8. #include <yt/yql/providers/yt/common/yql_configuration.h>
  9. #include <yql/essentials/providers/common/codec/yql_codec_type_flags.h>
  10. #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
  11. #include <yql/essentials/core/type_ann/type_ann_expr.h>
  12. #include <yql/essentials/core/yql_expr_optimize.h>
  13. #include <yql/essentials/core/yql_type_helpers.h>
  14. #include <yql/essentials/core/yql_expr_constraint.h>
  15. #include <yql/essentials/core/yql_graph_transformer.h>
  16. #include <yql/essentials/core/yql_expr_csee.h>
  17. #include <yql/essentials/public/udf/udf_value.h>
  18. #include <yql/essentials/utils/log/log.h>
  19. #include <yql/essentials/core/services/yql_transform_pipeline.h>
  20. #include <util/generic/xrange.h>
  21. #include <util/generic/ptr.h>
  22. #include <util/generic/vector.h>
  23. #include <util/generic/size_literals.h>
  24. #include <util/generic/maybe.h>
  25. #include <utility>
  26. namespace NYql {
  27. using namespace NNodes;
  28. namespace {
  29. TMaybeNode<TYtSection> MaterializeSectionIfRequired(TExprBase world, TYtSection section, TYtDSink dataSink, TYqlRowSpecInfo::TPtr outRowSpec, bool keepSortness,
  30. const TExprNode::TListType& limitNodes, const TYtState::TPtr& state, TExprContext& ctx)
  31. {
  32. const bool hasLimit = NYql::HasAnySetting(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip);
  33. bool needMaterialize = hasLimit && NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Sample);
  34. bool hasDynamic = false;
  35. if (!needMaterialize) {
  36. bool hasRanges = false;
  37. for (TYtPath path: section.Paths()) {
  38. TYtPathInfo pathInfo(path);
  39. hasDynamic = hasDynamic || (pathInfo.Table->Meta && pathInfo.Table->Meta->IsDynamic);
  40. hasRanges = hasRanges || pathInfo.Ranges;
  41. }
  42. needMaterialize = hasRanges || (hasLimit && hasDynamic);
  43. }
  44. if (needMaterialize) {
  45. auto scheme = section.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType();
  46. auto path = CopyOrTrivialMap(section.Pos(),
  47. world, dataSink,
  48. *scheme,
  49. Build<TYtSection>(ctx, section.Pos())
  50. .Paths(section.Paths())
  51. .Settings(NYql::RemoveSettings(section.Settings().Ref(),
  52. EYtSettingType::Take | EYtSettingType::Skip |
  53. EYtSettingType::KeyFilter | EYtSettingType::KeyFilter2 | EYtSettingType::JoinLabel |
  54. EYtSettingType::Unordered | EYtSettingType::NonUnique | EYtSettingType::StatColumns, ctx))
  55. .Done(),
  56. outRowSpec,
  57. ctx,
  58. state,
  59. TCopyOrTrivialMapOpts()
  60. .SetTryKeepSortness(keepSortness || (!ctx.IsConstraintEnabled<TSortedConstraintNode>() && (!hasDynamic || (!hasLimit && limitNodes.empty()))))
  61. .SetRangesResetSort(false)
  62. .SetSectionUniq(section.Ref().GetConstraint<TDistinctConstraintNode>())
  63. .SetLimitNodes(limitNodes)
  64. );
  65. return Build<TYtSection>(ctx, section.Pos())
  66. .Paths()
  67. .Add(path)
  68. .Build()
  69. .Settings(NYql::RemoveSettings(section.Settings().Ref(), EYtSettingType::Sample | EYtSettingType::SysColumns, ctx))
  70. .Done();
  71. }
  72. return {};
  73. }
  74. TMaybeNode<TYtSection> UpdateSectionWithRange(TExprBase world, TYtSection section, const TRecordsRange& range,
  75. TYtDSink dataSink, TYqlRowSpecInfo::TPtr outRowSpec, bool keepSortness, bool allowWorldDeps, bool allowMaterialize,
  76. TSyncMap& syncList, const TYtState::TPtr& state, TExprContext& ctx)
  77. {
  78. bool isEmptyInput = allowWorldDeps;
  79. TVector<TYtPath> updatedPaths;
  80. TVector<TYtPath> skippedPaths;
  81. if (auto limiter = TTableLimiter(range)) {
  82. if (auto materialized = MaterializeSectionIfRequired(world, section, dataSink, outRowSpec, keepSortness,
  83. {NYql::KeepOnlySettings(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip, ctx)}, state, ctx))
  84. {
  85. if (!allowMaterialize || state->Types->EvaluationInProgress) {
  86. // Keep section as is
  87. return {};
  88. }
  89. if (!allowWorldDeps) {
  90. if (const auto out = materialized.Paths().Item(0).Table().Maybe<TYtOutput>()) {
  91. syncList.emplace(GetOutputOp(out.Cast()).Ptr(), syncList.size());
  92. }
  93. }
  94. return materialized;
  95. }
  96. for (size_t i: xrange(section.Paths().Size())) {
  97. auto path = section.Paths().Item(i);
  98. TYtPathInfo pathInfo(path);
  99. if (!pathInfo.Table->Stat) {
  100. // Not all tables have required info
  101. return {};
  102. }
  103. ui64 startRecordInTable = 0;
  104. ui64 endRecordInTable = 0;
  105. if (pathInfo.Table->Stat->RecordsCount) {
  106. if (!limiter.NextTable(pathInfo.Table->Stat->RecordsCount)) {
  107. if (allowWorldDeps) {
  108. skippedPaths.push_back(path);
  109. } else {
  110. pathInfo.Stat.Drop();
  111. pathInfo.Ranges = TYtRangesInfo::MakeEmptyRange();
  112. updatedPaths.push_back(pathInfo.ToExprNode(ctx, path.Pos(), path.Table()).Cast<TYtPath>());
  113. }
  114. continue;
  115. }
  116. startRecordInTable = limiter.GetTableStart();
  117. endRecordInTable = limiter.GetTableZEnd(); // 0 means the entire table usage
  118. }
  119. if (startRecordInTable || endRecordInTable) {
  120. pathInfo.Stat.Drop();
  121. pathInfo.Ranges = MakeIntrusive<TYtRangesInfo>();
  122. TYtRangesInfo::TRowRange range;
  123. if (startRecordInTable) {
  124. range.Lower = startRecordInTable;
  125. }
  126. if (endRecordInTable) {
  127. range.Upper = endRecordInTable;
  128. }
  129. pathInfo.Ranges->AddRowRange(range);
  130. updatedPaths.push_back(pathInfo.ToExprNode(ctx, path.Pos(), path.Table()).Cast<TYtPath>());
  131. } else {
  132. updatedPaths.push_back(path);
  133. }
  134. isEmptyInput = false;
  135. if (limiter.Exceed()) {
  136. if (allowWorldDeps) {
  137. for (size_t j = i + 1; j < section.Paths().Size(); ++j) {
  138. skippedPaths.push_back(section.Paths().Item(j));
  139. }
  140. } else {
  141. for (size_t j = i + 1; j < section.Paths().Size(); ++j) {
  142. auto path = section.Paths().Item(j);
  143. path = Build<TYtPath>(ctx, path.Pos())
  144. .InitFrom(path)
  145. .Ranges<TExprList>()
  146. .Build()
  147. .Stat<TCoVoid>().Build()
  148. .Done();
  149. updatedPaths.push_back(path);
  150. }
  151. }
  152. break;
  153. }
  154. }
  155. } else if (!allowWorldDeps) {
  156. for (auto path: section.Paths()) {
  157. updatedPaths.push_back(Build<TYtPath>(ctx, path.Pos())
  158. .InitFrom(path)
  159. .Ranges<TExprList>()
  160. .Build()
  161. .Stat<TCoVoid>().Build()
  162. .Done());
  163. }
  164. }
  165. if (isEmptyInput) {
  166. skippedPaths.assign(section.Paths().begin(), section.Paths().end());
  167. }
  168. for (auto path: skippedPaths) {
  169. if (auto out = path.Table().Maybe<TYtOutput>()) {
  170. syncList.emplace(GetOutputOp(out.Cast()).Ptr(), syncList.size());
  171. }
  172. }
  173. if (isEmptyInput) {
  174. return MakeEmptySection(section, dataSink, keepSortness, state, ctx);
  175. }
  176. return Build<TYtSection>(ctx, section.Pos())
  177. .Paths()
  178. .Add(updatedPaths)
  179. .Build()
  180. .Settings(NYql::RemoveSettings(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip, ctx))
  181. .Done();
  182. }
  183. void EnableKeyBoundApi(TYtPathInfo& pathInfo, const TYtState::TPtr& state) {
  184. if (!pathInfo.Ranges) {
  185. return;
  186. }
  187. YQL_ENSURE(pathInfo.Table);
  188. const bool useKeyBoundApi =
  189. state->Configuration->_UseKeyBoundApi.Get(pathInfo.Table->Cluster).GetOrElse(DEFAULT_USE_KEY_BOUND_API);
  190. pathInfo.Ranges->SetUseKeyBoundApi(useKeyBoundApi);
  191. }
  192. TMaybeNode<TYtSection> UpdateSectionWithLegacyFilters(TYtSection section, const TVector<TExprBase>& filters, const TYtState::TPtr& state, TExprContext& ctx)
  193. {
  194. TVector<TExprBase> commonFilters;
  195. TMap<size_t, TVector<TExprBase>> tableFilters;
  196. for (auto filter: filters) {
  197. auto filterList = filter.Cast<TExprList>();
  198. if (filterList.Size() == 2) {
  199. tableFilters[FromString<size_t>(filterList.Item(1).Cast<TCoAtom>().Value())].push_back(filterList.Item(0));
  200. }
  201. else {
  202. commonFilters.push_back(filterList.Item(0));
  203. }
  204. }
  205. TVector<TYtPath> updatedPaths;
  206. size_t tableIndex = 0;
  207. for (auto path: section.Paths()) {
  208. if (commonFilters.size() == filters.size()) {
  209. TYtPathInfo pathInfo(path);
  210. pathInfo.Stat.Drop();
  211. pathInfo.Ranges = TYtRangesInfo::ApplyLegacyKeyFilters(commonFilters, pathInfo.Table->RowSpec, ctx);
  212. EnableKeyBoundApi(pathInfo, state);
  213. updatedPaths.push_back(pathInfo.ToExprNode(ctx, path.Pos(), path.Table()).Cast<TYtPath>());
  214. }
  215. else {
  216. TVector<TExprBase> pathFilters = commonFilters;
  217. if (auto p = tableFilters.FindPtr(tableIndex)) {
  218. pathFilters.insert(pathFilters.end(), p->begin(), p->end());
  219. }
  220. if (pathFilters.empty()) {
  221. updatedPaths.push_back(path);
  222. }
  223. else {
  224. TYtPathInfo pathInfo(path);
  225. pathInfo.Stat.Drop();
  226. pathInfo.Ranges = TYtRangesInfo::ApplyLegacyKeyFilters(pathFilters, pathInfo.Table->RowSpec, ctx);
  227. EnableKeyBoundApi(pathInfo, state);
  228. updatedPaths.push_back(pathInfo.ToExprNode(ctx, path.Pos(), path.Table()).Cast<TYtPath>());
  229. }
  230. }
  231. ++tableIndex;
  232. }
  233. auto updatedSettings = NYql::RemoveSetting(section.Settings().Ref(), EYtSettingType::KeyFilter, ctx);
  234. updatedSettings = NYql::AddSetting(*updatedSettings, EYtSettingType::KeyFilter, ctx.NewList(section.Pos(), {}), ctx);
  235. return Build<TYtSection>(ctx, section.Pos())
  236. .Paths()
  237. .Add(updatedPaths)
  238. .Build()
  239. .Settings(updatedSettings)
  240. .Done();
  241. }
  242. TMaybeNode<TYtSection> UpdateSectionWithFilters(TYtSection section, const TVector<TExprBase>& filters, const TYtState::TPtr& state, TExprContext& ctx) {
  243. TMap<size_t, TExprNode::TPtr> filtersByTableIndex;
  244. TExprNode::TPtr commonFilter;
  245. for (auto filter: filters) {
  246. auto filterList = filter.Cast<TExprList>();
  247. auto computedFilter = filterList.Item(0).Ptr();
  248. if (filterList.Size() == 3) {
  249. for (auto idxNode : filterList.Item(2).Cast<TCoAtomList>()) {
  250. size_t idx = FromString<size_t>(idxNode.Value());
  251. YQL_ENSURE(!filtersByTableIndex.contains(idx));
  252. filtersByTableIndex[idx] = computedFilter;
  253. }
  254. } else {
  255. YQL_ENSURE(!commonFilter);
  256. commonFilter = computedFilter;
  257. }
  258. }
  259. YQL_ENSURE(filtersByTableIndex.empty() && commonFilter || !commonFilter && !filtersByTableIndex.empty());
  260. TVector<TYtPath> updatedPaths;
  261. size_t tableIndex = 0;
  262. for (auto path: section.Paths()) {
  263. TExprNode::TPtr filter;
  264. if (commonFilter) {
  265. filter = commonFilter;
  266. } else {
  267. auto it = filtersByTableIndex.find(tableIndex);
  268. if (it != filtersByTableIndex.end()) {
  269. filter = it->second;
  270. }
  271. }
  272. if (!filter) {
  273. updatedPaths.push_back(path);
  274. } else {
  275. TYtPathInfo pathInfo(path);
  276. pathInfo.Stat.Drop();
  277. pathInfo.Ranges = TYtRangesInfo::ApplyKeyFilter(*filter);
  278. EnableKeyBoundApi(pathInfo, state);
  279. updatedPaths.push_back(pathInfo.ToExprNode(ctx, path.Pos(), path.Table()).Cast<TYtPath>());
  280. }
  281. ++tableIndex;
  282. }
  283. auto updatedSettings = NYql::RemoveSetting(section.Settings().Ref(), EYtSettingType::KeyFilter2, ctx);
  284. updatedSettings = NYql::AddSetting(*updatedSettings, EYtSettingType::KeyFilter2, ctx.NewList(section.Pos(), {}), ctx);
  285. return Build<TYtSection>(ctx, section.Pos())
  286. .Paths()
  287. .Add(updatedPaths)
  288. .Build()
  289. .Settings(updatedSettings)
  290. .Done();
  291. }
  292. } //namespace
  293. TMaybeNode<TYtSection> UpdateSectionWithSettings(TExprBase world, TYtSection section, TYtDSink dataSink, TYqlRowSpecInfo::TPtr outRowSpec, bool keepSortness, bool allowWorldDeps, bool allowMaterialize,
  294. TSyncMap& syncList, const TYtState::TPtr& state, TExprContext& ctx)
  295. {
  296. if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::DirectRead)) {
  297. return {};
  298. }
  299. if (!NYql::HasAnySetting(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip | EYtSettingType::KeyFilter | EYtSettingType::KeyFilter2)) {
  300. return {};
  301. }
  302. if (HasNodesToCalculate(section.Ptr())) {
  303. return {};
  304. }
  305. TRecordsRange range;
  306. TVector<TExprBase> keyFilters;
  307. bool legacyKeyFilters = false;
  308. for (auto s: section.Settings()) {
  309. switch (FromString<EYtSettingType>(s.Name().Value())) {
  310. case EYtSettingType::KeyFilter:
  311. legacyKeyFilters = true;
  312. [[fallthrough]];
  313. case EYtSettingType::KeyFilter2:
  314. if (s.Value().Cast<TExprList>().Size() > 0) {
  315. keyFilters.push_back(s.Value().Cast());
  316. }
  317. break;
  318. default:
  319. // Skip other settings
  320. break;
  321. }
  322. }
  323. range.Fill(section.Settings().Ref());
  324. if (range.Limit || range.Offset) {
  325. return UpdateSectionWithRange(world, section, range, dataSink, outRowSpec, keepSortness, allowWorldDeps, allowMaterialize, syncList, state, ctx);
  326. }
  327. if (!keyFilters.empty()) {
  328. return legacyKeyFilters ? UpdateSectionWithLegacyFilters(section, keyFilters, state, ctx) : UpdateSectionWithFilters(section, keyFilters, state, ctx);
  329. }
  330. return {};
  331. }
  332. TYtSection MakeEmptySection(TYtSection section, NNodes::TYtDSink dataSink, bool keepSortness, const TYtState::TPtr& state, TExprContext& ctx) {
  333. TYtOutTableInfo outTable(GetSequenceItemType(section, false)->Cast<TStructExprType>(),
  334. state->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE);
  335. if (section.Paths().Size() == 1) {
  336. auto srcTableInfo = TYtTableBaseInfo::Parse(section.Paths().Item(0).Table());
  337. if (keepSortness && srcTableInfo->RowSpec && srcTableInfo->RowSpec->IsSorted()) {
  338. outTable.RowSpec->CopySortness(ctx, *srcTableInfo->RowSpec, TYqlRowSpecInfo::ECopySort::WithCalc);
  339. }
  340. }
  341. outTable.SetUnique(section.Ref().GetConstraint<TDistinctConstraintNode>(), section.Pos(), ctx);
  342. return Build<TYtSection>(ctx, section.Pos())
  343. .Paths()
  344. .Add()
  345. .Table<TYtOutput>()
  346. .Operation<TYtTouch>()
  347. .World<TCoWorld>().Build()
  348. .DataSink(dataSink)
  349. .Output()
  350. .Add(outTable.ToExprNode(ctx, section.Pos()).Cast<TYtOutTable>())
  351. .Build()
  352. .Build()
  353. .OutIndex().Value("0").Build()
  354. .Build()
  355. .Columns<TCoVoid>().Build()
  356. .Ranges<TCoVoid>().Build()
  357. .Stat<TCoVoid>().Build()
  358. .Build()
  359. .Build()
  360. .Settings(NYql::RemoveSettings(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip | EYtSettingType::Sample, ctx))
  361. .Done();
  362. }
  363. TExprNode::TPtr OptimizeReadWithSettings(const TExprNode::TPtr& node, bool allowWorldDeps, bool allowMaterialize, TSyncMap& syncList,
  364. const TYtState::TPtr& state, TExprContext& ctx)
  365. {
  366. auto read = TYtReadTable(node);
  367. auto dataSink = TYtDSink(ctx.RenameNode(read.DataSource().Ref(), "DataSink"));
  368. bool hasUpdates = false;
  369. TVector<TExprBase> updatedSections;
  370. for (auto section: read.Input()) {
  371. updatedSections.push_back(section);
  372. const bool keepSort = ctx.IsConstraintEnabled<TSortedConstraintNode>() && !NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Unordered);
  373. if (auto updatedSection = UpdateSectionWithSettings(read.World(), section, dataSink, {}, keepSort, allowWorldDeps, allowMaterialize, syncList, state, ctx)) {
  374. updatedSections.back() = updatedSection.Cast();
  375. hasUpdates = true;
  376. }
  377. }
  378. if (!hasUpdates) {
  379. return node;
  380. }
  381. auto res = ctx.ChangeChild(read.Ref(), TYtReadTable::idx_Input,
  382. Build<TYtSectionList>(ctx, read.Input().Pos())
  383. .Add(updatedSections)
  384. .Done().Ptr());
  385. return res;
  386. }
  387. IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& input, TExprNode::TPtr& output, const TYtState::TPtr& state,
  388. TExprContext& ctx, bool estimateTableContentWeight)
  389. {
  390. auto current = input;
  391. output.Reset();
  392. for (;;) {
  393. TProcessedNodesSet ignoreNodes;
  394. VisitExpr(current, [&ignoreNodes](const TExprNode::TPtr& node) {
  395. if (TYtOutput::Match(node.Get())) {
  396. // Stop traversing dependent operations
  397. ignoreNodes.insert(node->UniqueId());
  398. return false;
  399. }
  400. return true;
  401. });
  402. TOptimizeExprSettings settings(state->Types);
  403. settings.CustomInstantTypeTransformer = state->Types->CustomInstantTypeTransformer.Get();
  404. settings.ProcessedNodes = &ignoreNodes;
  405. TParentsMap parentsMap;
  406. GatherParents(*current, parentsMap);
  407. TExprNode::TPtr newCurrent;
  408. auto status = OptimizeExpr(current, newCurrent,
  409. [&parentsMap, current, state, estimateTableContentWeight](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
  410. if (auto maybeContent = TMaybeNode<TYtTableContent>(node)) {
  411. auto content = maybeContent.Cast();
  412. if (NYql::HasAnySetting(content.Settings().Ref(), EYtSettingType::MemUsage | EYtSettingType::Small)) {
  413. return node;
  414. }
  415. ui64 collectRowFactor = 0;
  416. if (auto setting = NYql::GetSetting(content.Settings().Ref(), EYtSettingType::RowFactor)) {
  417. collectRowFactor = FromString<ui64>(setting->Child(1)->Content());
  418. } else {
  419. const auto contentItemType = content.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType();
  420. size_t fieldsCount = 0;
  421. switch (contentItemType->GetKind()) {
  422. case ETypeAnnotationKind::Struct:
  423. fieldsCount = contentItemType->Cast<TStructExprType>()->GetSize();
  424. break;
  425. case ETypeAnnotationKind::Tuple:
  426. fieldsCount = contentItemType->Cast<TTupleExprType>()->GetSize();
  427. break;
  428. default:
  429. break;
  430. }
  431. collectRowFactor = 2 * (1 + fieldsCount) * sizeof(NKikimr::NUdf::TUnboxedValuePod);
  432. }
  433. bool wrapToCollect = false;
  434. TVector<std::pair<double, ui64>> factors; // first: sizeFactor, second: rowFactor
  435. TNodeSet tableContentConsumers;
  436. if (!GetTableContentConsumerNodes(*node, *current, parentsMap, tableContentConsumers)) {
  437. wrapToCollect = true;
  438. factors.emplace_back(2., collectRowFactor);
  439. }
  440. else {
  441. for (auto consumer: tableContentConsumers) {
  442. if (consumer->IsCallable({"ToDict","SqueezeToDict", "SqlIn"})) {
  443. double sizeFactor = 1.;
  444. ui64 rowFactor = 0ULL;
  445. if (auto err = CalcToDictFactors(*consumer, ctx, sizeFactor, rowFactor)) {
  446. ctx.AddError(*err);
  447. return {};
  448. }
  449. factors.emplace_back(sizeFactor, rowFactor);
  450. }
  451. else if (consumer->IsCallable("Collect")) {
  452. factors.emplace_back(2., collectRowFactor);
  453. }
  454. }
  455. }
  456. ui64 memUsage = 0;
  457. ui64 dataWeight = 0;
  458. ui64 itemsCount = 0;
  459. bool useItemsCount = !NYql::HasSetting(content.Settings().Ref(), EYtSettingType::ItemsCount);
  460. if (factors.empty()) {
  461. // No ToDict or Collect consumers. Assume memory usage equals to max row size on YT
  462. memUsage = 16_MB;
  463. useItemsCount = false;
  464. }
  465. if (estimateTableContentWeight || !factors.empty()) {
  466. if (auto maybeRead = content.Input().Maybe<TYtReadTable>()) {
  467. TVector<ui64> records;
  468. TVector<TYtPathInfo::TPtr> tableInfos;
  469. bool hasNotCalculated = false;
  470. for (auto section: maybeRead.Cast().Input()) {
  471. for (auto path: section.Paths()) {
  472. TYtPathInfo::TPtr info = MakeIntrusive<TYtPathInfo>(path);
  473. if (info->Table->Stat) {
  474. ui64 tableRecord = info->Table->Stat->RecordsCount;
  475. if (info->Ranges) {
  476. const auto used = info->Ranges->GetUsedRows(tableRecord);
  477. tableRecord = used.GetOrElse(tableRecord);
  478. if (used) {
  479. itemsCount += *used;
  480. } else {
  481. useItemsCount = false;
  482. }
  483. } else {
  484. itemsCount += tableRecord;
  485. }
  486. if (info->Table->Meta->IsDynamic) {
  487. useItemsCount = false;
  488. }
  489. records.push_back(tableRecord);
  490. tableInfos.push_back(info);
  491. }
  492. else {
  493. YQL_CLOG(INFO, ProviderYt) << "Assume 1Gb memory usage for YtTableContent #"
  494. << node->UniqueId() << " because input table is not calculated yet";
  495. memUsage += 1_GB;
  496. hasNotCalculated = true;
  497. useItemsCount = false;
  498. break;
  499. }
  500. }
  501. if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Sample)) {
  502. useItemsCount = false;
  503. }
  504. if (hasNotCalculated) {
  505. break;
  506. }
  507. }
  508. if (!hasNotCalculated && !tableInfos.empty()) {
  509. if (auto dataSizes = EstimateDataSize(TString{maybeRead.Cast().DataSource().Cluster().Value()}, tableInfos, Nothing(), *state, ctx)) {
  510. YQL_ENSURE(dataSizes->size() == records.size());
  511. for (size_t i: xrange(records.size())) {
  512. for (auto& factor: factors) {
  513. memUsage += factor.first * dataSizes->at(i) + factor.second * records.at(i);
  514. }
  515. dataWeight += dataSizes->at(i);
  516. }
  517. } else {
  518. return {};
  519. }
  520. }
  521. }
  522. else {
  523. TYtOutTableInfo info(GetOutTable(content.Input().Cast<TYtOutput>()));
  524. if (info.Stat) {
  525. const ui64 dataSize = info.Stat->DataSize;
  526. const ui64 records = info.Stat->RecordsCount;
  527. for (auto& factor: factors) {
  528. memUsage += factor.first * dataSize + factor.second * records;
  529. }
  530. itemsCount += records;
  531. dataWeight += dataSize;
  532. }
  533. else {
  534. YQL_CLOG(INFO, ProviderYt) << "Assume 1Gb memory usage for YtTableContent #"
  535. << node->UniqueId() << " because input table is not calculated yet";
  536. memUsage += 1_GB;
  537. useItemsCount = false;
  538. }
  539. }
  540. }
  541. auto settings = content.Settings().Ptr();
  542. settings = NYql::AddSetting(*settings, EYtSettingType::MemUsage, ctx.NewAtom(node->Pos(), ToString(memUsage), TNodeFlags::Default), ctx);
  543. if (useItemsCount) {
  544. settings = NYql::AddSetting(*settings, EYtSettingType::ItemsCount, ctx.NewAtom(node->Pos(), ToString(itemsCount), TNodeFlags::Default), ctx);
  545. }
  546. if (estimateTableContentWeight && dataWeight < state->Configuration->TableContentLocalExecution.Get().GetOrElse(DEFAULT_TABLE_CONTENT_LOCAL_EXEC)) {
  547. settings = NYql::AddSetting(*settings, EYtSettingType::Small, {}, ctx);
  548. }
  549. return ctx.WrapByCallableIf(wrapToCollect, "Collect", ctx.ChangeChild(*node, TYtTableContent::idx_Settings, std::move(settings)));
  550. }
  551. return node;
  552. },
  553. ctx, settings);
  554. if (IGraphTransformer::TStatus::Error == status.Level) {
  555. ctx.AddError(TIssue(ctx.GetPosition(current->Pos()), TStringBuilder() << "Failed to update YtTableContent memory usage in node: " << current->Content()));
  556. return status;
  557. }
  558. if (newCurrent != current) {
  559. if (current->IsLambda()) {
  560. YQL_ENSURE(newCurrent->IsLambda());
  561. YQL_ENSURE(newCurrent->Head().ChildrenSize() == current->Head().ChildrenSize());
  562. for (size_t i = 0; i < newCurrent->Head().ChildrenSize(); ++i) {
  563. newCurrent->Head().Child(i)->SetTypeAnn(current->Head().Child(i)->GetTypeAnn());
  564. newCurrent->Head().Child(i)->CopyConstraints(*current->Head().Child(i));
  565. }
  566. }
  567. auto typeTransformer = CreateTypeAnnotationTransformer(CreateExtCallableTypeAnnotationTransformer(*state->Types, true), *state->Types);
  568. auto constrTransformer = CreateConstraintTransformer(*state->Types, true, true);
  569. TVector<TTransformStage> transformers;
  570. const auto issueCode = TIssuesIds::CORE_TYPE_ANN;
  571. transformers.push_back(TTransformStage(typeTransformer, "TypeAnnotation", issueCode));
  572. transformers.push_back(TTransformStage(
  573. CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { return UpdateCompletness(input, output, ctx); }),
  574. "UpdateCompletness", issueCode));
  575. transformers.push_back(TTransformStage(constrTransformer, "Constraints", issueCode));
  576. auto fullTransformer = CreateCompositeGraphTransformer(transformers, false);
  577. status = InstantTransform(*fullTransformer, newCurrent, ctx);
  578. if (status.Level == IGraphTransformer::TStatus::Error) {
  579. return status;
  580. }
  581. current = newCurrent;
  582. continue;
  583. }
  584. output = current;
  585. return IGraphTransformer::TStatus::Ok;
  586. }
  587. }
  588. struct TPeepholePipelineConfigurator : public IPipelineConfigurator {
  589. TPeepholePipelineConfigurator(TYtState::TPtr state)
  590. : State_(std::move(state))
  591. {}
  592. private:
  593. void AfterCreate(TTransformationPipeline*) const final {}
  594. void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
  595. pipeline->Add(CreateYtPeepholeTransformer(State_, {}), "Peephole");
  596. pipeline->Add(CreateYtWideFlowTransformer(State_), "WideFlow");
  597. pipeline->Add(CreateYtBlockInputTransformer(State_), "BlockInput");
  598. }
  599. void AfterOptimize(TTransformationPipeline*) const final {}
  600. const TYtState::TPtr State_;
  601. };
  602. struct TPeepholeFinalPipelineConfigurator : public IPipelineConfigurator {
  603. TPeepholeFinalPipelineConfigurator(TYtState::TPtr state)
  604. : State_(std::move(state))
  605. {}
  606. private:
  607. void AfterCreate(TTransformationPipeline*) const final {}
  608. void AfterTypeAnnotation(TTransformationPipeline*) const final {}
  609. void AfterOptimize(TTransformationPipeline* pipeline) const final {
  610. pipeline->Add(CreateYtBlockOutputTransformer(State_), "BlockOutput");
  611. }
  612. const TYtState::TPtr State_;
  613. };
  614. IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TExprNode::TPtr& output,
  615. const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, bool estimateTableContentWeight)
  616. {
  617. if (const auto status = UpdateTableContentMemoryUsage(input, output, state, ctx, estimateTableContentWeight);
  618. status.Level != IGraphTransformer::TStatus::Ok) {
  619. return status;
  620. }
  621. const TPeepholePipelineConfigurator wideFlowTransformers(state);
  622. const TPeepholeFinalPipelineConfigurator wideFlowFinalTransformers(state);
  623. TPeepholeSettings peepholeSettings;
  624. peepholeSettings.CommonConfig = &wideFlowTransformers;
  625. peepholeSettings.FinalConfig = &wideFlowFinalTransformers;
  626. return PeepHoleOptimizeNode(output, output, ctx, *state->Types, nullptr, hasNonDeterministicFunctions, peepholeSettings);
  627. }
  628. } // NYql