yql_opt_match_recognize.cpp 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. #include "yql_opt_match_recognize.h"
  2. #include "yql_opt_utils.h"
  3. #include <yql/essentials/core/sql_types/time_order_recover.h>
  4. #include <yql/essentials/core/yql_expr_optimize.h>
  5. #include <yql/essentials/utils/log/log.h>
  6. namespace NYql {
  7. using namespace NNodes;
  8. namespace {
  9. bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) {
  10. if (EMatchRecognizeStreamingMode::Disable == typeAnnCtx.MatchRecognizeStreaming){
  11. return false;
  12. }
  13. if (EMatchRecognizeStreamingMode::Force == typeAnnCtx.MatchRecognizeStreaming){
  14. return true;
  15. }
  16. YQL_ENSURE(EMatchRecognizeStreamingMode::Auto == typeAnnCtx.MatchRecognizeStreaming, "Internal logic error");
  17. bool hasPq = false;
  18. NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node){
  19. if (node->IsCallable("DataSource")) {
  20. YQL_ENSURE(node->ChildrenSize() > 0 and node->ChildRef(0)->IsAtom());
  21. hasPq = node->ChildRef(0)->Content() == "pq";
  22. }
  23. return !hasPq;
  24. });
  25. return hasPq;
  26. }
  27. } //namespace
  28. // returns std::nullopt if all vars could be used
  29. std::optional<TSet<TStringBuf>> FindUsedVars(const TExprNode::TPtr& params) {
  30. TSet<TStringBuf> usedVars;
  31. bool allVarsUsed = false;
  32. const auto createVisitor = [&usedVars, &allVarsUsed](const TExprNode::TPtr& varsArg) {
  33. return [&varsArg, &usedVars, &allVarsUsed](const TExprNode::TPtr& node) -> bool {
  34. if (node->IsCallable("Member")) {
  35. if (node->ChildRef(0) == varsArg) {
  36. usedVars.insert(node->ChildRef(1)->Content());
  37. return false;
  38. }
  39. }
  40. if (node == varsArg) {
  41. allVarsUsed = true;
  42. }
  43. return true;
  44. };
  45. };
  46. const auto& measures = params->ChildRef(0);
  47. static constexpr size_t measureLambdasStartPos = 3;
  48. for (size_t pos = measureLambdasStartPos; pos != measures->ChildrenSize(); pos++) {
  49. const auto& lambda = measures->ChildRef(pos);
  50. const auto& lambdaArgs = lambda->ChildRef(0);
  51. const auto& lambdaBody = lambda->ChildRef(1);
  52. const auto& varsArg = lambdaArgs->ChildRef(1);
  53. NYql::VisitExpr(lambdaBody, createVisitor(varsArg));
  54. }
  55. const auto& defines = params->ChildRef(4);
  56. static constexpr size_t defineLambdasStartPos = 3;
  57. for (size_t pos = defineLambdasStartPos; pos != defines->ChildrenSize(); pos++) {
  58. const auto& lambda = defines->ChildRef(pos);
  59. const auto& lambdaArgs = lambda->ChildRef(0);
  60. const auto& lambdaBody = lambda->ChildRef(1);
  61. const auto& varsArg = lambdaArgs->ChildRef(1);
  62. NYql::VisitExpr(lambdaBody, createVisitor(varsArg));
  63. }
  64. return allVarsUsed ? std::nullopt : std::make_optional(usedVars);
  65. }
  66. // usedVars can be std::nullopt if all vars could probably be used
  67. TExprNode::TPtr MarkUnusedPatternVars(const TExprNode::TPtr& node, TExprContext& ctx, const std::optional<TSet<TStringBuf>> &usedVars) {
  68. const auto pos = node->Pos();
  69. if (node->ChildrenSize() != 0 && node->ChildRef(0)->IsAtom()) {
  70. const auto& varName = node->ChildRef(0)->Content();
  71. bool varUsed = !usedVars.has_value() || usedVars.value().contains(varName);
  72. return ctx.Builder(pos)
  73. .List()
  74. .Add(0, node->ChildRef(0))
  75. .Add(1, node->ChildRef(1))
  76. .Add(2, node->ChildRef(2))
  77. .Add(3, node->ChildRef(3))
  78. .Add(4, node->ChildRef(4))
  79. .Add(5, ctx.NewAtom(pos, varUsed ? "0" : "1"))
  80. .Seal()
  81. .Build();
  82. }
  83. TExprNodeList newChildren;
  84. for (size_t chPos = 0; chPos != node->ChildrenSize(); chPos++) {
  85. newChildren.push_back(MarkUnusedPatternVars(node->ChildRef(chPos), ctx, usedVars));
  86. }
  87. if (node->IsCallable()) {
  88. return ctx.Builder(pos).Callable(node->Content()).Add(std::move(newChildren)).Seal().Build();
  89. } else if (node->IsList()) {
  90. return ctx.Builder(pos).List().Add(std::move(newChildren)).Seal().Build();
  91. } else { // Atom
  92. return node;
  93. }
  94. }
  95. TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
  96. YQL_ENSURE(node->IsCallable({"MatchRecognize"}));
  97. const auto& input = node->ChildRef(0);
  98. const auto& partitionKeySelector = node->ChildRef(1);
  99. const auto& partitionColumns = node->ChildRef(2);
  100. const auto& sortTraits = node->ChildRef(3);
  101. const auto& params = node->ChildRef(4);
  102. const auto pos = node->Pos();
  103. const bool isStreaming = IsStreaming(input, typeAnnCtx);
  104. TExprNode::TPtr settings = AddSetting(*ctx.NewList(pos, {}), pos,
  105. "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx);
  106. const auto matchRecognize = ctx.Builder(pos)
  107. .Lambda()
  108. .Param("sortedPartition")
  109. .Callable(0, "ForwardList")
  110. .Callable(0, "MatchRecognizeCore")
  111. .Callable(0, "ToFlow")
  112. .Arg(0, "sortedPartition")
  113. .Seal()
  114. .Add(1, partitionKeySelector)
  115. .Add(2, partitionColumns)
  116. .Callable(3, params->Content())
  117. .Add(0, params->ChildRef(0))
  118. .Add(1, params->ChildRef(1))
  119. .Add(2, params->ChildRef(2))
  120. .Add(3, MarkUnusedPatternVars(params->ChildRef(3), ctx, FindUsedVars(params)))
  121. .Add(4, params->ChildRef(4))
  122. .Seal()
  123. .Add(4, settings)
  124. .Seal()
  125. .Seal()
  126. .Seal()
  127. .Build();
  128. TExprNode::TPtr sortKey;
  129. TExprNode::TPtr sortOrder;
  130. ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
  131. TExprNode::TPtr result;
  132. if (isStreaming) {
  133. YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE");
  134. const auto reordered = ctx.Builder(pos)
  135. .Lambda()
  136. .Param("partition")
  137. .Callable("ForwardList")
  138. .Callable(0, "OrderedMap")
  139. .Callable(0, "TimeOrderRecover")
  140. .Callable(0, "ToFlow").
  141. Arg(0, "partition")
  142. .Seal()
  143. .Add(1, sortKey)
  144. .Callable(2, "Interval")
  145. .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverDelay)))
  146. .Seal()
  147. .Callable(3, "Interval")
  148. .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverAhead)))
  149. .Seal()
  150. .Callable(4, "Uint32")
  151. .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverRowLimit)))
  152. .Seal()
  153. .Seal()
  154. .Lambda(1)
  155. .Param("row")
  156. .Callable("RemoveMember")
  157. .Arg(0, "row")
  158. .Add(1, ctx.NewAtom(pos, NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER))
  159. .Seal()
  160. .Seal()
  161. .Seal()
  162. .Seal()
  163. .Seal()
  164. .Build();
  165. const auto matchRecognizeOnReorderedPartition = ctx.Builder(pos)
  166. .Lambda()
  167. .Param("partition")
  168. .Apply(matchRecognize)
  169. .With(0)
  170. .Apply(reordered)
  171. .With(0)
  172. .Arg("partition")
  173. .Done()
  174. .Seal()
  175. .Done()
  176. .Seal()
  177. .Seal()
  178. .Build();
  179. TExprNode::TPtr keySelector;
  180. if (partitionColumns->ChildrenSize() != 0) {
  181. keySelector = partitionKeySelector;
  182. } else {
  183. //Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage
  184. //TODO(zverevgeny): fixme
  185. keySelector = ctx.Builder(pos)
  186. .Lambda()
  187. .Param("row")
  188. .Callable("Bool")
  189. .Add(0, ctx.NewAtom(pos, "true"))
  190. .Seal()
  191. .Seal()
  192. .Build();
  193. }
  194. result = ctx.Builder(pos)
  195. .Callable("ShuffleByKeys")
  196. .Add(0, input)
  197. .Add(1, keySelector)
  198. .Add(2, matchRecognizeOnReorderedPartition)
  199. .Seal()
  200. .Build();
  201. } else { //non-streaming
  202. result = ctx.Builder(pos)
  203. .Callable("PartitionsByKeys")
  204. .Add(0, input)
  205. .Add(1, partitionKeySelector)
  206. .Add(2, sortOrder)
  207. .Add(3, sortKey)
  208. .Add(4, matchRecognize)
  209. .Seal()
  210. .Build();
  211. }
  212. YQL_CLOG(INFO, Core) << "Expanded MatchRecognize";
  213. return result;
  214. }
  215. } //namespace NYql