yql_opt_match_recognize.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. #include "yql_opt_match_recognize.h"
  2. #include "yql_opt_utils.h"
  3. #include <yql/essentials/core/sql_types/time_order_recover.h>
  4. #include <yql/essentials/core/yql_expr_optimize.h>
  5. #include <yql/essentials/utils/log/log.h>
  6. #include <ranges>
  7. namespace NYql {
  8. using namespace NNodes;
  9. namespace {
  10. bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) {
  11. switch (typeAnnCtx.MatchRecognizeStreaming) {
  12. case EMatchRecognizeStreamingMode::Disable:
  13. return false;
  14. case EMatchRecognizeStreamingMode::Force:
  15. return true;
  16. case EMatchRecognizeStreamingMode::Auto: {
  17. bool hasPq = false;
  18. NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node) {
  19. if (auto maybeDataSource = TExprBase(node).Maybe<TCoDataSource>()) {
  20. hasPq = maybeDataSource.Cast().Category().Value() == "pq";
  21. }
  22. return !hasPq;
  23. });
  24. return hasPq;
  25. }
  26. }
  27. }
  28. TExprNode::TPtr ExpandMatchRecognizeMeasuresAggregates(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& /* typeAnnCtx */) {
  29. const auto pos = node->Pos();
  30. const auto vars = node->Child(3);
  31. static constexpr size_t AggregatesLambdasStartPos = 4;
  32. static constexpr size_t MeasuresLambdasStartPos = 3;
  33. return ctx.Builder(pos)
  34. .Callable("MatchRecognizeMeasures")
  35. .Add(0, node->ChildPtr(0))
  36. .Add(1, node->ChildPtr(1))
  37. .Add(2, node->ChildPtr(2))
  38. .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
  39. for (size_t i = 0; i < vars->ChildrenSize(); ++i) {
  40. const auto var = vars->Child(i)->Content();
  41. const auto handler = node->ChildPtr(AggregatesLambdasStartPos + i);
  42. if (!var) {
  43. auto value = handler->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional
  44. ? ctx.Builder(pos).Callable("Just").Add(0, handler).Seal().Build()
  45. : handler;
  46. parent.Add(
  47. MeasuresLambdasStartPos + i,
  48. ctx.Builder(pos)
  49. .Lambda()
  50. .Param("data")
  51. .Param("vars")
  52. .Add(0, std::move(value))
  53. .Seal()
  54. .Build()
  55. );
  56. continue;
  57. }
  58. parent.Add(
  59. MeasuresLambdasStartPos + i,
  60. ctx.Builder(pos)
  61. .Lambda()
  62. .Param("data")
  63. .Param("vars")
  64. .Callable(0, "Member")
  65. .Callable(0, "Head")
  66. .Callable(0, "Aggregate")
  67. .Callable(0, "OrderedMap")
  68. .Callable(0, "OrderedFlatMap")
  69. .Callable(0, "Member")
  70. .Arg(0, "vars")
  71. .Atom(1, var)
  72. .Seal()
  73. .Lambda(1)
  74. .Param("item")
  75. .Callable(0, "ListFromRange")
  76. .Callable(0, "Member")
  77. .Arg(0, "item")
  78. .Atom(1, "From")
  79. .Seal()
  80. .Callable(1, "+MayWarn")
  81. .Callable(0, "Member")
  82. .Arg(0, "item")
  83. .Atom(1, "To")
  84. .Seal()
  85. .Callable(1, "Uint64")
  86. .Atom(0, "1")
  87. .Seal()
  88. .Seal()
  89. .Seal()
  90. .Seal()
  91. .Seal()
  92. .Lambda(1)
  93. .Param("index")
  94. .Callable(0, "Unwrap")
  95. .Callable(0, "Lookup")
  96. .Callable(0, "ToIndexDict")
  97. .Arg(0, "data")
  98. .Seal()
  99. .Arg(1, "index")
  100. .Seal()
  101. .Seal()
  102. .Seal()
  103. .Seal()
  104. .List(1).Seal()
  105. .List(2)
  106. .Add(0, handler)
  107. .Seal()
  108. .List(3).Seal()
  109. .Seal()
  110. .Seal()
  111. .Atom(1, handler->Child(0)->Content())
  112. .Seal()
  113. .Seal()
  114. .Build()
  115. );
  116. }
  117. return parent;
  118. })
  119. .Seal()
  120. .Build();
  121. }
  122. THashSet<TStringBuf> FindUsedVars(const TExprNode::TPtr& params) {
  123. THashSet<TStringBuf> result;
  124. const auto measures = params->Child(0);
  125. const auto measuresVars = measures->Child(3);
  126. for (const auto& var : measuresVars->Children()) {
  127. result.insert(var->Content());
  128. }
  129. const auto defines = params->Child(4);
  130. static constexpr size_t defineLambdasStartPos = 3;
  131. for (const auto& define : defines->Children() | std::views::drop(defineLambdasStartPos)) {
  132. const auto lambda = TCoLambda(define);
  133. const auto varsArg = lambda.Args().Arg(1).Ptr();
  134. const auto lambdaBody = lambda.Body().Ptr();
  135. NYql::VisitExpr(
  136. lambdaBody,
  137. [varsArg, &result](const TExprNode::TPtr& node) {
  138. if (auto maybeMember = TMaybeNode<TCoMember>(node);
  139. maybeMember && maybeMember.Cast().Struct().Ptr() == varsArg) {
  140. result.insert(maybeMember.Cast().Name().Value());
  141. return false;
  142. }
  143. return true;
  144. }
  145. );
  146. }
  147. return result;
  148. }
  149. TExprNode::TPtr MarkUnusedPatternVars(const TExprNode::TPtr& node, TExprContext& ctx, const THashSet<TStringBuf>& usedVars, const TExprNode::TPtr& rowsPerMatch) {
  150. const auto pos = node->Pos();
  151. if (node->ChildrenSize() == 6 && node->Child(0)->IsAtom()) {
  152. const auto varName = node->Child(0)->Content();
  153. const auto output = FromString<bool>(node->Child(4)->Content());
  154. const auto varUnused = ("RowsPerMatch_AllRows" != rowsPerMatch->Content() || !output) && !usedVars.contains(varName);
  155. return Build<TExprList>(ctx, pos)
  156. .Add(node->ChildPtr(0))
  157. .Add(node->ChildPtr(1))
  158. .Add(node->ChildPtr(2))
  159. .Add(node->ChildPtr(3))
  160. .Add(node->ChildPtr(4))
  161. .Add<TCoAtom>().Build(ToString(varUnused))
  162. .Done()
  163. .Ptr();
  164. }
  165. TExprNode::TListType newChildren;
  166. for (const auto& child : node->Children()) {
  167. newChildren.push_back(MarkUnusedPatternVars(child, ctx, usedVars, rowsPerMatch));
  168. }
  169. return ctx.ChangeChildren(*node, std::move(newChildren));
  170. }
  171. } // anonymous namespace
  172. TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
  173. YQL_CLOG(DEBUG, Core) << "Expand " << node->Content();
  174. TCoMatchRecognize matchRecognize(node);
  175. const auto input = matchRecognize.Input().Ptr();
  176. const auto partitionKeySelector = matchRecognize.PartitionKeySelector().Ptr();
  177. const auto partitionColumns = matchRecognize.PartitionColumns().Ptr();
  178. const auto sortTraits = matchRecognize.SortTraits().Ptr();
  179. const auto params = matchRecognize.Params().Ptr();
  180. const auto pos = matchRecognize.Pos();
  181. const auto isStreaming = IsStreaming(input, typeAnnCtx);
  182. auto newInput = Build<TCoLambda>(ctx, pos)
  183. .Args({"partition"})
  184. .Body<TCoToFlow>()
  185. .Input("partition")
  186. .Build()
  187. .Done()
  188. .Ptr();
  189. TExprNode::TPtr sortKey;
  190. TExprNode::TPtr sortOrder;
  191. ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
  192. auto timeOrderRecover = [&]() -> TExprNode::TPtr {
  193. if (!isStreaming) {
  194. return newInput;
  195. }
  196. switch (sortOrder->ChildrenSize()) {
  197. case 0:
  198. return newInput;
  199. case 1: {
  200. auto timeOrderRecover = ctx.Builder(pos)
  201. .Lambda()
  202. .Param("partition")
  203. .Callable("TimeOrderRecover")
  204. .Apply(0, std::move(newInput))
  205. .With(0, "partition")
  206. .Seal()
  207. .Add(1, sortKey)
  208. .Callable(2, "Interval")
  209. .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverDelay))
  210. .Seal()
  211. .Callable(3, "Interval")
  212. .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverAhead))
  213. .Seal()
  214. .Callable(4, "Uint32")
  215. .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverRowLimit))
  216. .Seal()
  217. .Seal()
  218. .Seal()
  219. .Build();
  220. return Build<TCoLambda>(ctx, pos)
  221. .Args({"partition"})
  222. .Body<TCoOrderedMap>()
  223. .Input<TExprApplier>()
  224. .Apply(TCoLambda(timeOrderRecover))
  225. .With(0, "partition")
  226. .Build()
  227. .Lambda<TCoLambda>()
  228. .Args({"row"})
  229. .Body<TCoRemoveMember>()
  230. .Struct("row")
  231. .Name<TCoAtom>().Build(NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER)
  232. .Build()
  233. .Build()
  234. .Build()
  235. .Done()
  236. .Ptr();
  237. }
  238. default:
  239. ctx.AddError(TIssue(ctx.GetPosition(sortTraits->Pos()), "Expect ORDER BY timestamp for MATCH_RECOGNIZE"));
  240. return {};
  241. }
  242. }();
  243. auto measures = ExpandMatchRecognizeMeasuresAggregates(params->ChildPtr(0), ctx, typeAnnCtx);
  244. auto rowsPerMatch = params->ChildPtr(1);
  245. const auto usedVars = FindUsedVars(params);
  246. auto pattern = MarkUnusedPatternVars(params->ChildPtr(3), ctx, usedVars, rowsPerMatch);
  247. auto settings = AddSetting(*ctx.NewList(pos, {}), pos, "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx);
  248. auto newMatchRecognize = ctx.Builder(pos)
  249. .Lambda()
  250. .Param("partition")
  251. .Callable("MatchRecognizeCore")
  252. .Apply(0, std::move(timeOrderRecover))
  253. .With(0, "partition")
  254. .Seal()
  255. .Add(1, partitionKeySelector)
  256. .Add(2, partitionColumns)
  257. .Callable(3, params->Content())
  258. .Add(0, std::move(measures))
  259. .Add(1, std::move(rowsPerMatch))
  260. .Add(2, params->ChildPtr(2))
  261. .Add(3, std::move(pattern))
  262. .Add(4, params->ChildPtr(4))
  263. .Seal()
  264. .Add(4, std::move(settings))
  265. .Seal()
  266. .Seal()
  267. .Build();
  268. auto lambda = Build<TCoLambda>(ctx, pos)
  269. .Args({"partition"})
  270. .Body<TCoForwardList>()
  271. .Stream<TExprApplier>()
  272. .Apply(TCoLambda(newMatchRecognize))
  273. .With(0, "partition")
  274. .Build()
  275. .Build()
  276. .Done()
  277. .Ptr();
  278. if (isStreaming) {
  279. TExprNode::TPtr keySelector;
  280. if (partitionColumns->ChildrenSize() != 0) {
  281. keySelector = std::move(partitionKeySelector);
  282. } else {
  283. // Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage
  284. // TODO(zverevgeny): fixme
  285. keySelector = Build<TCoLambda>(ctx, pos)
  286. .Args({"row"})
  287. .Body(MakeBool<true>(pos, ctx))
  288. .Done()
  289. .Ptr();
  290. }
  291. return Build<TCoShuffleByKeys>(ctx, pos)
  292. .Input(std::move(input))
  293. .KeySelectorLambda(std::move(keySelector))
  294. .ListHandlerLambda(std::move(lambda))
  295. .Done()
  296. .Ptr();
  297. } else { // non-streaming
  298. return Build<TCoPartitionsByKeys>(ctx, pos)
  299. .Input(std::move(input))
  300. .KeySelectorLambda(std::move(partitionKeySelector))
  301. .SortDirections(std::move(sortOrder))
  302. .SortKeySelectorLambda(std::move(sortKey))
  303. .ListHandlerLambda(std::move(lambda))
  304. .Done()
  305. .Ptr();
  306. }
  307. }
  308. } // namespace NYql