yql_opt_match_recognize.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. #include "yql_opt_match_recognize.h"
  2. #include "yql_opt_utils.h"
  3. #include <yql/essentials/core/sql_types/time_order_recover.h>
  4. #include <yql/essentials/core/yql_expr_optimize.h>
  5. #include <yql/essentials/utils/log/log.h>
  6. #include <ranges>
  7. namespace NYql {
  8. using namespace NNodes;
  9. namespace {
  10. bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) {
  11. switch (typeAnnCtx.MatchRecognizeStreaming) {
  12. case EMatchRecognizeStreamingMode::Disable:
  13. return false;
  14. case EMatchRecognizeStreamingMode::Force:
  15. return true;
  16. case EMatchRecognizeStreamingMode::Auto: {
  17. bool hasPq = false;
  18. NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node) {
  19. if (auto maybeDataSource = TExprBase(node).Maybe<TCoDataSource>()) {
  20. hasPq = maybeDataSource.Cast().Category().Value() == "pq";
  21. }
  22. return !hasPq;
  23. });
  24. return hasPq;
  25. }
  26. }
  27. }
  28. TExprNode::TPtr ExpandMatchRecognizeMeasuresCallables(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& /* typeAnnCtx */) {
  29. YQL_CLOG(DEBUG, Core) << "Expand " << node->Content();
  30. static constexpr size_t MeasuresLambdasStartPos = 3;
  31. return ctx.Builder(node->Pos())
  32. .Callable("MatchRecognizeMeasures")
  33. .Add(0, node->ChildPtr(0))
  34. .Add(1, node->ChildPtr(1))
  35. .Add(2, node->ChildPtr(2))
  36. .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
  37. const auto aggregatesItems = node->Child(3);
  38. for (size_t i = 0; i < aggregatesItems->ChildrenSize(); ++i) {
  39. const auto item = aggregatesItems->Child(i);
  40. auto lambda = item->ChildPtr(0);
  41. const auto vars = item->Child(1);
  42. const auto aggregates = item->Child(2);
  43. parent.Lambda(MeasuresLambdasStartPos + i, lambda->Pos())
  44. .Param("data")
  45. .Param("vars")
  46. .Apply(std::move(lambda))
  47. .With(0)
  48. .Callable("FlattenMembers")
  49. .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
  50. for (size_t i = 0; i < aggregates->ChildrenSize(); ++i) {
  51. const auto var = vars->Child(i)->Content();
  52. auto aggregate = aggregates->Child(i);
  53. parent
  54. .List(i)
  55. .Atom(0, "")
  56. .Callable(1, "Head")
  57. .Callable(0, "Aggregate")
  58. .Callable(0, "OrderedMap")
  59. .Callable(0, "OrderedFlatMap")
  60. .Callable(0, "Member")
  61. .Arg(0, "vars")
  62. .Atom(1, var)
  63. .Seal()
  64. .Lambda(1)
  65. .Param("item")
  66. .Callable(0, "ListFromRange")
  67. .Callable(0, "Member")
  68. .Arg(0, "item")
  69. .Atom(1, "From")
  70. .Seal()
  71. .Callable(1, "+MayWarn")
  72. .Callable(0, "Member")
  73. .Arg(0, "item")
  74. .Atom(1, "To")
  75. .Seal()
  76. .Callable(1, "Uint64")
  77. .Atom(0, "1")
  78. .Seal()
  79. .Seal()
  80. .Seal()
  81. .Seal()
  82. .Seal()
  83. .Lambda(1)
  84. .Param("index")
  85. .Callable(0, "Unwrap")
  86. .Callable(0, "Lookup")
  87. .Callable(0, "ToIndexDict")
  88. .Arg(0, "data")
  89. .Seal()
  90. .Arg(1, "index")
  91. .Seal()
  92. .Seal()
  93. .Seal()
  94. .Seal()
  95. .List(1).Seal()
  96. .List(2)
  97. .Add(0, std::move(aggregate))
  98. .Seal()
  99. .List(3).Seal()
  100. .Seal()
  101. .Seal()
  102. .Seal();
  103. }
  104. return parent;
  105. })
  106. .Seal()
  107. .Done()
  108. .Seal()
  109. .Seal();
  110. }
  111. return parent;
  112. })
  113. .Seal()
  114. .Build();
  115. }
  116. std::unordered_set<std::string_view> FindUsedVars(const TExprNode::TPtr& params) {
  117. std::unordered_set<std::string_view> result;
  118. const auto measures = params->Child(0);
  119. const auto callablesItems = measures->Child(3);
  120. for (const auto& item : callablesItems->Children()) {
  121. const auto vars = item->Child(1);
  122. for (const auto& var : vars->Children()) {
  123. result.insert(var->Content());
  124. }
  125. }
  126. const auto defines = params->Child(4);
  127. static constexpr size_t defineLambdasStartPos = 3;
  128. for (const auto& define : defines->Children() | std::views::drop(defineLambdasStartPos)) {
  129. const auto lambda = TCoLambda(define);
  130. const auto varsArg = lambda.Args().Arg(1).Ptr();
  131. const auto lambdaBody = lambda.Body().Ptr();
  132. NYql::VisitExpr(
  133. lambdaBody,
  134. [varsArg, &result](const TExprNode::TPtr& node) {
  135. if (auto maybeMember = TMaybeNode<TCoMember>(node);
  136. maybeMember && maybeMember.Cast().Struct().Ptr() == varsArg) {
  137. result.insert(maybeMember.Cast().Name().Value());
  138. return false;
  139. }
  140. return true;
  141. }
  142. );
  143. }
  144. return result;
  145. }
  146. TExprNode::TPtr MarkUnusedPatternVars(const TExprNode::TPtr& node, TExprContext& ctx, const std::unordered_set<std::string_view>& usedVars, const TExprNode::TPtr& rowsPerMatch) {
  147. const auto pos = node->Pos();
  148. if (node->ChildrenSize() == 6 && node->Child(0)->IsAtom()) {
  149. const auto varName = node->Child(0)->Content();
  150. const auto output = FromString<bool>(node->Child(4)->Content());
  151. const auto varUnused = ("RowsPerMatch_AllRows" != rowsPerMatch->Content() || !output) && !usedVars.contains(varName);
  152. return Build<TExprList>(ctx, pos)
  153. .Add(node->ChildPtr(0))
  154. .Add(node->ChildPtr(1))
  155. .Add(node->ChildPtr(2))
  156. .Add(node->ChildPtr(3))
  157. .Add(node->ChildPtr(4))
  158. .Add<TCoAtom>().Build(ToString(varUnused))
  159. .Done()
  160. .Ptr();
  161. }
  162. TExprNode::TListType newChildren;
  163. for (const auto& child : node->Children()) {
  164. newChildren.push_back(MarkUnusedPatternVars(child, ctx, usedVars, rowsPerMatch));
  165. }
  166. return ctx.ChangeChildren(*node, std::move(newChildren));
  167. }
  168. } // anonymous namespace
  169. TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
  170. YQL_CLOG(DEBUG, Core) << "Expand " << node->Content();
  171. TCoMatchRecognize matchRecognize(node);
  172. const auto input = matchRecognize.Input().Ptr();
  173. const auto partitionKeySelector = matchRecognize.PartitionKeySelector().Ptr();
  174. const auto partitionColumns = matchRecognize.PartitionColumns().Ptr();
  175. const auto sortTraits = matchRecognize.SortTraits().Ptr();
  176. const auto params = matchRecognize.Params().Ptr();
  177. const auto pos = matchRecognize.Pos();
  178. const auto isStreaming = IsStreaming(input, typeAnnCtx);
  179. auto newInput = Build<TCoLambda>(ctx, pos)
  180. .Args({"partition"})
  181. .Body<TCoToFlow>()
  182. .Input("partition")
  183. .Build()
  184. .Done()
  185. .Ptr();
  186. TExprNode::TPtr sortKey;
  187. TExprNode::TPtr sortOrder;
  188. ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
  189. auto timeOrderRecover = [&]() -> TExprNode::TPtr {
  190. if (!isStreaming) {
  191. return newInput;
  192. }
  193. switch (sortOrder->ChildrenSize()) {
  194. case 0:
  195. return newInput;
  196. case 1: {
  197. auto timeOrderRecover = ctx.Builder(pos)
  198. .Lambda()
  199. .Param("partition")
  200. .Callable("TimeOrderRecover")
  201. .Apply(0, std::move(newInput))
  202. .With(0, "partition")
  203. .Seal()
  204. .Add(1, sortKey)
  205. .Callable(2, "Interval")
  206. .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverDelay))
  207. .Seal()
  208. .Callable(3, "Interval")
  209. .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverAhead))
  210. .Seal()
  211. .Callable(4, "Uint32")
  212. .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverRowLimit))
  213. .Seal()
  214. .Seal()
  215. .Seal()
  216. .Build();
  217. return Build<TCoLambda>(ctx, pos)
  218. .Args({"partition"})
  219. .Body<TCoOrderedMap>()
  220. .Input<TExprApplier>()
  221. .Apply(TCoLambda(timeOrderRecover))
  222. .With(0, "partition")
  223. .Build()
  224. .Lambda<TCoLambda>()
  225. .Args({"row"})
  226. .Body<TCoRemoveMember>()
  227. .Struct("row")
  228. .Name<TCoAtom>().Build(NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER)
  229. .Build()
  230. .Build()
  231. .Build()
  232. .Done()
  233. .Ptr();
  234. }
  235. default:
  236. ctx.AddError(TIssue(ctx.GetPosition(sortTraits->Pos()), "Expected no ORDER BY or ORDER BY timestamp for MATCH_RECOGNIZE"));
  237. return {};
  238. }
  239. }();
  240. if (!timeOrderRecover) {
  241. return {};
  242. }
  243. auto measures = ExpandMatchRecognizeMeasuresCallables(params->ChildPtr(0), ctx, typeAnnCtx);
  244. auto rowsPerMatch = params->ChildPtr(1);
  245. const auto usedVars = FindUsedVars(params);
  246. auto pattern = MarkUnusedPatternVars(params->ChildPtr(3), ctx, usedVars, rowsPerMatch);
  247. auto settings = AddSetting(*ctx.NewList(pos, {}), pos, "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx);
  248. auto newMatchRecognize = ctx.Builder(pos)
  249. .Lambda()
  250. .Param("partition")
  251. .Callable("MatchRecognizeCore")
  252. .Apply(0, std::move(timeOrderRecover))
  253. .With(0, "partition")
  254. .Seal()
  255. .Add(1, partitionKeySelector)
  256. .Add(2, partitionColumns)
  257. .Callable(3, params->Content())
  258. .Add(0, std::move(measures))
  259. .Add(1, std::move(rowsPerMatch))
  260. .Add(2, params->ChildPtr(2))
  261. .Add(3, std::move(pattern))
  262. .Add(4, params->ChildPtr(4))
  263. .Seal()
  264. .Add(4, std::move(settings))
  265. .Seal()
  266. .Seal()
  267. .Build();
  268. auto lambda = Build<TCoLambda>(ctx, pos)
  269. .Args({"partition"})
  270. .Body<TCoForwardList>()
  271. .Stream<TExprApplier>()
  272. .Apply(TCoLambda(newMatchRecognize))
  273. .With(0, "partition")
  274. .Build()
  275. .Build()
  276. .Done()
  277. .Ptr();
  278. if (isStreaming) {
  279. TExprNode::TPtr keySelector;
  280. if (partitionColumns->ChildrenSize() != 0) {
  281. keySelector = std::move(partitionKeySelector);
  282. } else {
  283. // Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage
  284. // TODO(zverevgeny): fixme
  285. keySelector = Build<TCoLambda>(ctx, pos)
  286. .Args({"row"})
  287. .Body(MakeBool<true>(pos, ctx))
  288. .Done()
  289. .Ptr();
  290. }
  291. return Build<TCoShuffleByKeys>(ctx, pos)
  292. .Input(std::move(input))
  293. .KeySelectorLambda(std::move(keySelector))
  294. .ListHandlerLambda(std::move(lambda))
  295. .Done()
  296. .Ptr();
  297. } else { // non-streaming
  298. return Build<TCoPartitionsByKeys>(ctx, pos)
  299. .Input(std::move(input))
  300. .KeySelectorLambda(std::move(partitionKeySelector))
  301. .SortDirections(std::move(sortOrder))
  302. .SortKeySelectorLambda(std::move(sortKey))
  303. .ListHandlerLambda(std::move(lambda))
  304. .Done()
  305. .Ptr();
  306. }
  307. }
  308. } // namespace NYql