yql_yt_provider_impl.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. #include "yql_yt_provider_impl.h"
  2. #include "yql_yt_op_settings.h"
  3. #include "yql_yt_table.h"
  4. #include "yql_yt_helpers.h"
  5. #include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
  6. #include <yql/essentials/core/yql_expr_optimize.h>
  7. #include <util/string/builder.h>
  8. namespace NYql {
  9. using namespace NNodes;
  10. void ScanPlanDependencies(const TExprNode::TPtr& input, TExprNode::TListType& children) {
  11. VisitExpr(input, [&children](const TExprNode::TPtr& node) {
  12. if (TMaybeNode<TYtReadTable>(node)) {
  13. children.push_back(node);
  14. return false;
  15. }
  16. if (const auto maybeOutput = TMaybeNode<TYtOutput>(node)) {
  17. const auto& output = maybeOutput.Cast();
  18. if (const auto& maybeTryFirst = output.Operation().Maybe<TYtTryFirst>()) {
  19. const auto& tryFirst = maybeTryFirst.Cast();
  20. children.emplace_back(tryFirst.Second().Ptr());
  21. children.emplace_back(tryFirst.First().Ptr());
  22. } else
  23. children.emplace_back(GetOutputOp(output).Ptr());
  24. return false;
  25. }
  26. if (node->IsCallable("DqCnResult")) { // For TYtDqProcessWrite.
  27. children.emplace_back(node->HeadPtr());
  28. return false;
  29. }
  30. return true;
  31. });
  32. }
  33. void ScanForUsedOutputTables(const TExprNode& input, TVector<TString>& usedNodeIds)
  34. {
  35. VisitExpr(input, [&usedNodeIds](const TExprNode& node) {
  36. if (auto maybeYtOutput = TMaybeNode<TYtOutput>(&node)) {
  37. auto ytOutput = maybeYtOutput.Cast();
  38. TString cluster = TString{GetOutputOp(ytOutput).DataSink().Cluster().Value()};
  39. TString table = TString{GetOutTable(ytOutput).Cast<TYtOutTable>().Name().Value()};
  40. if (!cluster.empty() && !table.empty()) {
  41. usedNodeIds.push_back(MakeUsedNodeId(cluster, table));
  42. }
  43. return false;
  44. }
  45. return true;
  46. });
  47. }
  48. TString MakeUsedNodeId(const TString& cluster, const TString& table)
  49. {
  50. YQL_ENSURE(!cluster.empty());
  51. YQL_ENSURE(!table.empty());
  52. return cluster + "." + table;
  53. }
  54. TString MakeTableDisplayName(NNodes::TExprBase table, bool isOutput) {
  55. TStringBuilder name;
  56. if (table.Maybe<TYtTable>()) {
  57. auto ytTable = table.Cast<TYtTable>();
  58. name << ytTable.Cluster().Value() << ".";
  59. if (NYql::HasSetting(ytTable.Settings().Ref(), EYtSettingType::Anonymous)) {
  60. name << ytTable.Name().Value();
  61. }
  62. else {
  63. name << '`' << ytTable.Name().Value() << '`';
  64. }
  65. auto epoch = isOutput ? ytTable.CommitEpoch() : ytTable.Epoch();
  66. if (auto epochVal = TEpochInfo::Parse(epoch.Ref()).GetOrElse(0)) {
  67. name << " #" << epochVal;
  68. }
  69. }
  70. else {
  71. name << "(tmp)";
  72. }
  73. return name;
  74. }
  75. } // NYql