// NOTE(review): in this copy the targets of the four bare #include directives and the
// template arguments of several calls (e.g. Maybe(), Build(ctx, pos), THashSet,
// FromString, MakeBool) appear to have been stripped; they are left untouched here and
// must be restored from upstream for the file to build.
#include "yql_opt_match_recognize.h"
#include "yql_opt_utils.h"

#include
#include
#include
#include

namespace NYql {

using namespace NNodes;

namespace {

// Decides whether MATCH_RECOGNIZE over `input` is expanded in streaming mode.
// Disable / Force settle the answer directly from the type annotation context.
// In Auto mode the input expression tree is scanned, and streaming is chosen if a
// DataSource with category "pq" is found anywhere in it.
bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) {
    switch (typeAnnCtx.MatchRecognizeStreaming) {
        case EMatchRecognizeStreamingMode::Disable:
            return false;
        case EMatchRecognizeStreamingMode::Force:
            return true;
        case EMatchRecognizeStreamingMode::Auto: {
            bool hasPq = false;
            NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node) {
                if (auto maybeDataSource = TExprBase(node).Maybe()) {
                    // Only ever raise the flag. Assigning the comparison result directly would
                    // let a DataSource visited later with another category reset it to false,
                    // because returning false from this visitor presumably only skips the
                    // current node's children rather than aborting the whole traversal.
                    if (maybeDataSource.Cast().Category().Value() == "pq") {
                        hasPq = true;
                    }
                }
                // Stop descending once a "pq" source has been seen.
                return !hasPq;
            });
            return hasPq;
        }
    }
}

// Rewrites a measures node into a `MatchRecognizeMeasures` callable.
//
// Layout of `node`:
//  * children 0..2 are copied to the result unchanged;
//  * child 3 is the list of variable names, one entry per measure;
//  * children 4.. are the per-measure handlers.
//
// For each measure i, a lambda (data, vars) is emitted at result position 3 + i:
//  * If the measure's variable name is empty, the lambda returns the handler
//    expression itself. It is wrapped into Just unless its type is already Optional.
//  * Otherwise the lambda does the following:
//    - for every item {From, To} of `vars.<var>`, expands the row indices
//      From..To (ListFromRange up to To + 1);
//    - looks each index up in ToIndexDict(data);
//    - runs Aggregate over those rows with the handler as the single entry of its
//      child-2 list (children 1 and 3 are empty lists);
//    - returns the member named by handler->Child(0) of the Head of the aggregate.
TExprNode::TPtr ExpandMatchRecognizeMeasuresAggregates(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& /* typeAnnCtx */) {
    const auto pos = node->Pos();
    const auto vars = node->Child(3);
    static constexpr size_t AggregatesLambdasStartPos = 4;
    static constexpr size_t MeasuresLambdasStartPos = 3;

    return ctx.Builder(pos)
        .Callable("MatchRecognizeMeasures")
            .Add(0, node->ChildPtr(0))
            .Add(1, node->ChildPtr(1))
            .Add(2, node->ChildPtr(2))
            .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
                for (size_t i = 0; i < vars->ChildrenSize(); ++i) {
                    const auto var = vars->Child(i)->Content();
                    const auto handler = node->ChildPtr(AggregatesLambdasStartPos + i);
                    if (!var) {
                        // No pattern variable: the handler is used directly as the lambda body.
                        auto value = handler->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional ?
                            ctx.Builder(pos).Callable("Just").Add(0, handler).Seal().Build() : handler;
                        parent.Add(
                            MeasuresLambdasStartPos + i,
                            ctx.Builder(pos)
                                .Lambda()
                                    .Param("data")
                                    .Param("vars")
                                    .Add(0, std::move(value))
                                .Seal()
                                .Build()
                        );
                        continue;
                    }
                    parent.Add(
                        MeasuresLambdasStartPos + i,
                        ctx.Builder(pos)
                            .Lambda()
                                .Param("data")
                                .Param("vars")
                                .Callable(0, "Member")
                                    .Callable(0, "Head")
                                        .Callable(0, "Aggregate")
                                            .Callable(0, "OrderedMap")
                                                // All row indices matched by `var`.
                                                .Callable(0, "OrderedFlatMap")
                                                    .Callable(0, "Member")
                                                        .Arg(0, "vars")
                                                        .Atom(1, var)
                                                    .Seal()
                                                    .Lambda(1)
                                                        .Param("item")
                                                        .Callable(0, "ListFromRange")
                                                            .Callable(0, "Member")
                                                                .Arg(0, "item")
                                                                .Atom(1, "From")
                                                            .Seal()
                                                            // To + 1, so that row `To` is included.
                                                            .Callable(1, "+MayWarn")
                                                                .Callable(0, "Member")
                                                                    .Arg(0, "item")
                                                                    .Atom(1, "To")
                                                                .Seal()
                                                                .Callable(1, "Uint64")
                                                                    .Atom(0, "1")
                                                                .Seal()
                                                            .Seal()
                                                        .Seal()
                                                    .Seal()
                                                .Seal()
                                                // Index -> row of the partition data.
                                                .Lambda(1)
                                                    .Param("index")
                                                    .Callable(0, "Unwrap")
                                                        .Callable(0, "Lookup")
                                                            .Callable(0, "ToIndexDict")
                                                                .Arg(0, "data")
                                                            .Seal()
                                                            .Arg(1, "index")
                                                        .Seal()
                                                    .Seal()
                                                .Seal()
                                            .Seal()
                                            .List(1).Seal()
                                            .List(2)
                                                .Add(0, handler)
                                            .Seal()
                                            .List(3).Seal()
                                        .Seal()
                                    .Seal()
                                    .Atom(1, handler->Child(0)->Content())
                                .Seal()
                            .Seal()
                            .Build()
                    );
                }
                return parent;
            })
        .Seal()
        .Build();
}

// Collects the names of the pattern variables the query actually references:
//  * every variable name listed in the measures node (child 3 of params child 0);
//  * every `vars.<name>` member access inside a DEFINE lambda. The DEFINE lambdas
//    start at index 3 of params child 4, and `vars` is each lambda's second argument.
THashSet FindUsedVars(const TExprNode::TPtr& params) {
    THashSet result;

    const auto measures = params->Child(0);
    const auto measuresVars = measures->Child(3);
    for (const auto& var : measuresVars->Children()) {
        result.insert(var->Content());
    }

    const auto defines = params->Child(4);
    static constexpr size_t defineLambdasStartPos = 3;
    for (const auto& define : defines->Children() | std::views::drop(defineLambdasStartPos)) {
        const auto lambda = TCoLambda(define);
        const auto varsArg = lambda.Args().Arg(1).Ptr();
        const auto lambdaBody = lambda.Body().Ptr();
        NYql::VisitExpr(
            lambdaBody,
            [varsArg, &result](const TExprNode::TPtr& node) {
                if (auto maybeMember = TMaybeNode(node);
                    maybeMember && maybeMember.Cast().Struct().Ptr() == varsArg) {
                    result.insert(maybeMember.Cast().Name().Value());
                    // The member access has been recorded; no need to look inside it.
                    return false;
                }
                return true;
            }
        );
    }
    return result;
}

// Recursively rebuilds the pattern tree and recomputes the "unused" flag of each
// pattern-variable node.
//
// A pattern-variable node is recognised as a node with 6 children whose first child
// is an atom holding the variable name. Its child 4 is parsed as the output flag.
// The flag is rebuilt as the node's 6th child. It is ToString(true) when both hold:
//  * the variable is not in `usedVars`;
//  * its rows are not emitted, i.e. rows-per-match is not "RowsPerMatch_AllRows"
//    or the output flag is false.
// Every other node is rebuilt with recursively processed children.
TExprNode::TPtr MarkUnusedPatternVars(const TExprNode::TPtr& node, TExprContext& ctx, const THashSet& usedVars, const TExprNode::TPtr& rowsPerMatch) {
    const auto pos = node->Pos();
    if (node->ChildrenSize() == 6 && node->Child(0)->IsAtom()) {
        const auto varName = node->Child(0)->Content();
        const auto output = FromString(node->Child(4)->Content());
        const auto varUnused = ("RowsPerMatch_AllRows" != rowsPerMatch->Content() || !output) && !usedVars.contains(varName);
        return Build(ctx, pos)
            .Add(node->ChildPtr(0))
            .Add(node->ChildPtr(1))
            .Add(node->ChildPtr(2))
            .Add(node->ChildPtr(3))
            .Add(node->ChildPtr(4))
            .Add().Build(ToString(varUnused))
            .Done()
            .Ptr();
    }
    TExprNode::TListType newChildren;
    for (const auto& child : node->Children()) {
        newChildren.push_back(MarkUnusedPatternVars(child, ctx, usedVars, rowsPerMatch));
    }
    return ctx.ChangeChildren(*node, std::move(newChildren));
}

} // anonymous namespace

// Expands a MatchRecognize callable into a per-partition application of MatchRecognizeCore.
//
// Children of the params node, as used here:
//  * 0 - measures, expanded by ExpandMatchRecognizeMeasuresAggregates;
//  * 1 - rows-per-match;
//  * 2 - passed through unchanged;
//  * 3 - pattern, with unused variables re-marked by MarkUnusedPatternVars;
//  * 4 - defines.
//
// Non-streaming mode: the result carries the partition key selector plus the sort
// directions and sort key extracted from the ORDER BY traits.
//
// Streaming mode:
//  * no sort lambdas are attached;
//  * with exactly one ORDER BY column, each partition is first passed through
//    TimeOrderRecover;
//  * without partition columns, a constant key selector is used.
//
// Returns nullptr after adding an error to `ctx` when streaming mode meets an
// ORDER BY with more than one column.
TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
    YQL_CLOG(DEBUG, Core) << "Expand " << node->Content();
    TCoMatchRecognize matchRecognize(node);
    const auto input = matchRecognize.Input().Ptr();
    const auto partitionKeySelector = matchRecognize.PartitionKeySelector().Ptr();
    const auto partitionColumns = matchRecognize.PartitionColumns().Ptr();
    const auto sortTraits = matchRecognize.SortTraits().Ptr();
    const auto params = matchRecognize.Params().Ptr();
    const auto pos = matchRecognize.Pos();

    const auto isStreaming = IsStreaming(input, typeAnnCtx);

    // Default per-partition input: the partition itself.
    auto newInput = Build(ctx, pos)
        .Args({"partition"})
        .Body()
            .Input("partition")
        .Build()
        .Done()
        .Ptr();

    TExprNode::TPtr sortKey;
    TExprNode::TPtr sortOrder;
    ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);

    // Lambda producing MatchRecognizeCore's input from a partition.
    // It is null if an error was reported.
    auto timeOrderRecover = [&]() -> TExprNode::TPtr {
        if (!isStreaming) {
            return newInput;
        }
        switch (sortOrder->ChildrenSize()) {
            case 0:
                return newInput;
            case 1: {
                auto timeOrderRecover = ctx.Builder(pos)
                    .Lambda()
                        .Param("partition")
                        .Callable("TimeOrderRecover")
                            .Apply(0, std::move(newInput))
                                .With(0, "partition")
                            .Seal()
                            .Add(1, sortKey)
                            .Callable(2, "Interval")
                                .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverDelay))
                            .Seal()
                            .Callable(3, "Interval")
                                .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverAhead))
                            .Seal()
                            .Callable(4, "Uint32")
                                .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverRowLimit))
                            .Seal()
                        .Seal()
                    .Seal()
                    .Build();
                // Applies a per-row lambda that takes the row struct and
                // NTimeOrderRecover::OUT_OF_ORDER_MARKER as its member name.
                // Presumably this drops the marker column added by TimeOrderRecover
                // (the callable type is not visible in this copy) — confirm.
                return Build(ctx, pos)
                    .Args({"partition"})
                    .Body()
                        .Input()
                            .Apply(TCoLambda(timeOrderRecover))
                                .With(0, "partition")
                            .Build()
                        .Lambda()
                            .Args({"row"})
                            .Body()
                                .Struct("row")
                                .Name().Build(NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER)
                            .Build()
                        .Build()
                    .Build()
                    .Done()
                    .Ptr();
            }
            default:
                ctx.AddError(TIssue(ctx.GetPosition(sortTraits->Pos()), "Expect ORDER BY timestamp for MATCH_RECOGNIZE"));
                return {};
        }
    }();
    if (!timeOrderRecover) {
        // The error has already been reported to ctx. Propagate the failure instead of
        // building MatchRecognizeCore around a null lambda.
        return {};
    }

    auto measures = ExpandMatchRecognizeMeasuresAggregates(params->ChildPtr(0), ctx, typeAnnCtx);
    auto rowsPerMatch = params->ChildPtr(1);
    const auto usedVars = FindUsedVars(params);
    auto pattern = MarkUnusedPatternVars(params->ChildPtr(3), ctx, usedVars, rowsPerMatch);
    auto settings = AddSetting(*ctx.NewList(pos, {}), pos, "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx);

    auto newMatchRecognize = ctx.Builder(pos)
        .Lambda()
            .Param("partition")
            .Callable("MatchRecognizeCore")
                .Apply(0, std::move(timeOrderRecover))
                    .With(0, "partition")
                .Seal()
                .Add(1, partitionKeySelector)
                .Add(2, partitionColumns)
                .Callable(3, params->Content())
                    .Add(0, std::move(measures))
                    .Add(1, std::move(rowsPerMatch))
                    .Add(2, params->ChildPtr(2))
                    .Add(3, std::move(pattern))
                    .Add(4, params->ChildPtr(4))
                .Seal()
                .Add(4, std::move(settings))
            .Seal()
        .Seal()
        .Build();

    auto lambda = Build(ctx, pos)
        .Args({"partition"})
        .Body()
            .Stream()
                .Apply(TCoLambda(newMatchRecognize))
                    .With(0, "partition")
                .Build()
            .Build()
        .Build()
        .Done()
        .Ptr();

    if (isStreaming) {
        TExprNode::TPtr keySelector;
        if (partitionColumns->ChildrenSize() != 0) {
            keySelector = std::move(partitionKeySelector);
        } else {
            // Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage
            // TODO(zverevgeny): fixme
            keySelector = Build(ctx, pos)
                .Args({"row"})
                .Body(MakeBool(pos, ctx))
                .Done()
                .Ptr();
        }
        return Build(ctx, pos)
            .Input(std::move(input))
            .KeySelectorLambda(std::move(keySelector))
            .ListHandlerLambda(std::move(lambda))
            .Done()
            .Ptr();
    } else { // non-streaming
        return Build(ctx, pos)
            .Input(std::move(input))
            .KeySelectorLambda(std::move(partitionKeySelector))
            .SortDirections(std::move(sortOrder))
            .SortKeySelectorLambda(std::move(sortKey))
            .ListHandlerLambda(std::move(lambda))
            .Done()
            .Ptr();
    }
}

} // namespace NYql