replace_table_reads.cpp 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. #include "replace_table_reads.h"
  2. #include <yql/essentials/public/purecalc/common/names.h>
  3. #include <yql/essentials/public/purecalc/common/transformations/utils.h>
  4. #include <yql/essentials/core/yql_expr_optimize.h>
  5. #include <yql/essentials/core/yql_expr_type_annotation.h>
  6. using namespace NYql;
  7. using namespace NYql::NPureCalc;
  8. namespace {
  9. class TTableReadsReplacer: public TSyncTransformerBase {
  10. private:
  11. const TVector<const TStructExprType*>& InputStructs_;
  12. bool UseSystemColumns_;
  13. EProcessorMode ProcessorMode_;
  14. TString CallableName_;
  15. TString TablePrefix_;
  16. bool Complete_ = false;
  17. public:
  18. explicit TTableReadsReplacer(
  19. const TVector<const TStructExprType*>& inputStructs,
  20. bool useSystemColumns,
  21. EProcessorMode processorMode,
  22. TString inputNodeName,
  23. TString tablePrefix
  24. )
  25. : InputStructs_(inputStructs)
  26. , UseSystemColumns_(useSystemColumns)
  27. , ProcessorMode_(processorMode)
  28. , CallableName_(std::move(inputNodeName))
  29. , TablePrefix_(std::move(tablePrefix))
  30. {
  31. }
  32. TTableReadsReplacer(TVector<const TStructExprType*>&&, TString, TString) = delete;
  33. public:
  34. TStatus DoTransform(const TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  35. output = input;
  36. if (Complete_) {
  37. return TStatus::Ok;
  38. }
  39. TOptimizeExprSettings settings(nullptr);
  40. auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
  41. if (node->IsCallable(NNodes::TCoRight::CallableName())) {
  42. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  43. return new TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "At function: " << node->Content());
  44. });
  45. if (!EnsureMinArgsCount(*node, 1, ctx)) {
  46. return nullptr;
  47. }
  48. if (node->Child(0)->IsCallable(NNodes::TCoCons::CallableName())) {
  49. return node;
  50. }
  51. if (!node->Child(0)->IsCallable(NNodes::TCoRead::CallableName())) {
  52. ctx.AddError(TIssue(ctx.GetPosition(node->Child(0)->Pos()), TStringBuilder() << "Expected Read!"));
  53. return nullptr;
  54. }
  55. return BuildInputFromRead(node->Pos(), node->ChildPtr(0), ctx);
  56. } else if (node->IsCallable(NNodes::TCoLeft::CallableName())) {
  57. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  58. return new TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "At function: " << node->Content());
  59. });
  60. if (!EnsureMinArgsCount(*node, 1, ctx)) {
  61. return nullptr;
  62. }
  63. if (!node->Child(0)->IsCallable(NNodes::TCoRead::CallableName())) {
  64. ctx.AddError(TIssue(ctx.GetPosition(node->Child(0)->Pos()), TStringBuilder() << "Expected Read!"));
  65. return nullptr;
  66. }
  67. return node->Child(0)->HeadPtr();
  68. }
  69. return node;
  70. }, ctx, settings);
  71. if (status.Level == TStatus::Ok) {
  72. Complete_ = true;
  73. }
  74. return status;
  75. }
  76. void Rewind() override {
  77. Complete_ = false;
  78. }
  79. private:
  80. TExprNode::TPtr BuildInputFromRead(TPositionHandle replacePos, const TExprNode::TPtr& node, TExprContext& ctx) {
  81. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  82. return MakeIntrusive<TIssue>(ctx.GetPosition(node->Pos()), TStringBuilder() << "At function: " << node->Content());
  83. });
  84. if (!EnsureMinArgsCount(*node, 3, ctx)) {
  85. return nullptr;
  86. }
  87. const auto source = node->ChildPtr(2);
  88. if (source->IsCallable(NNodes::TCoKey::CallableName())) {
  89. return BuildInputFromKey(replacePos, source, ctx);
  90. }
  91. if (source->IsCallable("DataTables")) {
  92. return BuildInputFromDataTables(replacePos, source, ctx);
  93. }
  94. ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), TStringBuilder() << "Unsupported read source: " << source->Content()));
  95. return nullptr;
  96. }
  97. TExprNode::TPtr BuildInputFromKey(TPositionHandle replacePos, const TExprNode::TPtr& node, TExprContext& ctx) {
  98. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  99. return MakeIntrusive<TIssue>(ctx.GetPosition(node->Pos()), TStringBuilder() << "At function: " << node->Content());
  100. });
  101. ui32 inputIndex;
  102. TExprNode::TPtr inputTableName;
  103. if (!TryFetchInputIndexFromKey(node, ctx, inputIndex, inputTableName)) {
  104. return nullptr;
  105. }
  106. YQL_ENSURE(inputTableName->IsCallable(NNodes::TCoString::CallableName()));
  107. auto inputNode = ctx.Builder(replacePos)
  108. .Callable(CallableName_)
  109. .Atom(0, ToString(inputIndex))
  110. .Seal()
  111. .Build();
  112. if (inputNode->IsCallable(PurecalcBlockInputCallableName)) {
  113. const auto inputStruct = InputStructs_[inputIndex]->Cast<TStructExprType>();
  114. const auto blocksLambda = NodeFromBlocks(replacePos, inputStruct, ctx);
  115. bool wrapLMap = ProcessorMode_ == EProcessorMode::PullList;
  116. inputNode = ApplyToIterable(replacePos, inputNode, blocksLambda, wrapLMap, ctx);
  117. }
  118. if (UseSystemColumns_) {
  119. auto mapLambda = ctx.Builder(replacePos)
  120. .Lambda()
  121. .Param("row")
  122. .Callable(0, NNodes::TCoAddMember::CallableName())
  123. .Arg(0, "row")
  124. .Atom(1, PurecalcSysColumnTablePath)
  125. .Add(2, inputTableName)
  126. .Seal()
  127. .Seal()
  128. .Build();
  129. return ctx.Builder(replacePos)
  130. .Callable(NNodes::TCoMap::CallableName())
  131. .Add(0, std::move(inputNode))
  132. .Add(1, std::move(mapLambda))
  133. .Seal()
  134. .Build();
  135. }
  136. return inputNode;
  137. }
  138. TExprNode::TPtr BuildInputFromDataTables(TPositionHandle replacePos, const TExprNode::TPtr& node, TExprContext& ctx) {
  139. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  140. return MakeIntrusive<TIssue>(ctx.GetPosition(node->Pos()), TStringBuilder() << "At function: " << node->Content());
  141. });
  142. if (InputStructs_.empty()) {
  143. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "No inputs provided by input spec"));
  144. return nullptr;
  145. }
  146. if (!EnsureArgsCount(*node, 0, ctx)) {
  147. return nullptr;
  148. }
  149. auto builder = ctx.Builder(replacePos);
  150. if (InputStructs_.size() > 1) {
  151. auto listBuilder = builder.List();
  152. for (ui32 i = 0; i < InputStructs_.size(); ++i) {
  153. listBuilder.Callable(i, CallableName_).Atom(0, ToString(i)).Seal();
  154. }
  155. return listBuilder.Seal().Build();
  156. }
  157. return builder.Callable(CallableName_).Atom(0, "0").Seal().Build();
  158. }
  159. bool TryFetchInputIndexFromKey(const TExprNode::TPtr& node, TExprContext& ctx, ui32& resultIndex, TExprNode::TPtr& resultTableName) {
  160. if (!EnsureArgsCount(*node, 1, ctx)) {
  161. return false;
  162. }
  163. const auto* keyArg = node->Child(0);
  164. if (!keyArg->IsList() || keyArg->ChildrenSize() != 2 || !keyArg->Child(0)->IsAtom("table") ||
  165. !keyArg->Child(1)->IsCallable(NNodes::TCoString::CallableName()))
  166. {
  167. ctx.AddError(TIssue(ctx.GetPosition(keyArg->Pos()), "Expected single table name"));
  168. return false;
  169. }
  170. resultTableName = keyArg->ChildPtr(1);
  171. auto tableName = resultTableName->Child(0)->Content();
  172. if (!tableName.StartsWith(TablePrefix_)) {
  173. ctx.AddError(TIssue(ctx.GetPosition(resultTableName->Child(0)->Pos()),
  174. TStringBuilder() << "Invalid table name " << TString{tableName}.Quote() << ": prefix must be " << TablePrefix_.Quote()));
  175. return false;
  176. }
  177. tableName.SkipPrefix(TablePrefix_);
  178. if (!tableName) {
  179. resultIndex = 0;
  180. } else if (!TryFromString(tableName, resultIndex)) {
  181. ctx.AddError(TIssue(ctx.GetPosition(resultTableName->Child(0)->Pos()),
  182. TStringBuilder() << "Invalid table name " << TString{tableName}.Quote() << ": suffix must be UI32 number"));
  183. return false;
  184. }
  185. return true;
  186. }
  187. };
  188. }
  189. TAutoPtr<IGraphTransformer> NYql::NPureCalc::MakeTableReadsReplacer(
  190. const TVector<const TStructExprType*>& inputStructs,
  191. bool useSystemColumns,
  192. EProcessorMode processorMode,
  193. TString callableName,
  194. TString tablePrefix
  195. ) {
  196. return new TTableReadsReplacer(inputStructs, useSystemColumns, processorMode, std::move(callableName), std::move(tablePrefix));
  197. }