123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- #include "yql_opt_match_recognize.h"
- #include "yql_opt_utils.h"
- #include <yql/essentials/core/sql_types/time_order_recover.h>
- #include <yql/essentials/core/yql_expr_optimize.h>
- #include <yql/essentials/utils/log/log.h>
- #include <ranges>
- namespace NYql {
- using namespace NNodes;
- namespace {
- bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) {
- switch (typeAnnCtx.MatchRecognizeStreaming) {
- case EMatchRecognizeStreamingMode::Disable:
- return false;
- case EMatchRecognizeStreamingMode::Force:
- return true;
- case EMatchRecognizeStreamingMode::Auto: {
- bool hasPq = false;
- NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node) {
- if (auto maybeDataSource = TExprBase(node).Maybe<TCoDataSource>()) {
- hasPq = maybeDataSource.Cast().Category().Value() == "pq";
- }
- return !hasPq;
- });
- return hasPq;
- }
- }
- }
- TExprNode::TPtr ExpandMatchRecognizeMeasuresAggregates(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& /* typeAnnCtx */) {
- const auto pos = node->Pos();
- const auto vars = node->Child(3);
- static constexpr size_t AggregatesLambdasStartPos = 4;
- static constexpr size_t MeasuresLambdasStartPos = 3;
- return ctx.Builder(pos)
- .Callable("MatchRecognizeMeasures")
- .Add(0, node->ChildPtr(0))
- .Add(1, node->ChildPtr(1))
- .Add(2, node->ChildPtr(2))
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (size_t i = 0; i < vars->ChildrenSize(); ++i) {
- const auto var = vars->Child(i)->Content();
- const auto handler = node->ChildPtr(AggregatesLambdasStartPos + i);
- if (!var) {
- auto value = handler->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional
- ? ctx.Builder(pos).Callable("Just").Add(0, handler).Seal().Build()
- : handler;
- parent.Add(
- MeasuresLambdasStartPos + i,
- ctx.Builder(pos)
- .Lambda()
- .Param("data")
- .Param("vars")
- .Add(0, std::move(value))
- .Seal()
- .Build()
- );
- continue;
- }
- parent.Add(
- MeasuresLambdasStartPos + i,
- ctx.Builder(pos)
- .Lambda()
- .Param("data")
- .Param("vars")
- .Callable(0, "Member")
- .Callable(0, "Head")
- .Callable(0, "Aggregate")
- .Callable(0, "OrderedMap")
- .Callable(0, "OrderedFlatMap")
- .Callable(0, "Member")
- .Arg(0, "vars")
- .Atom(1, var)
- .Seal()
- .Lambda(1)
- .Param("item")
- .Callable(0, "ListFromRange")
- .Callable(0, "Member")
- .Arg(0, "item")
- .Atom(1, "From")
- .Seal()
- .Callable(1, "+MayWarn")
- .Callable(0, "Member")
- .Arg(0, "item")
- .Atom(1, "To")
- .Seal()
- .Callable(1, "Uint64")
- .Atom(0, "1")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Lambda(1)
- .Param("index")
- .Callable(0, "Unwrap")
- .Callable(0, "Lookup")
- .Callable(0, "ToIndexDict")
- .Arg(0, "data")
- .Seal()
- .Arg(1, "index")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .List(1).Seal()
- .List(2)
- .Add(0, handler)
- .Seal()
- .List(3).Seal()
- .Seal()
- .Seal()
- .Atom(1, handler->Child(0)->Content())
- .Seal()
- .Seal()
- .Build()
- );
- }
- return parent;
- })
- .Seal()
- .Build();
- }
- THashSet<TStringBuf> FindUsedVars(const TExprNode::TPtr& params) {
- THashSet<TStringBuf> result;
- const auto measures = params->Child(0);
- const auto measuresVars = measures->Child(3);
- for (const auto& var : measuresVars->Children()) {
- result.insert(var->Content());
- }
- const auto defines = params->Child(4);
- static constexpr size_t defineLambdasStartPos = 3;
- for (const auto& define : defines->Children() | std::views::drop(defineLambdasStartPos)) {
- const auto lambda = TCoLambda(define);
- const auto varsArg = lambda.Args().Arg(1).Ptr();
- const auto lambdaBody = lambda.Body().Ptr();
- NYql::VisitExpr(
- lambdaBody,
- [varsArg, &result](const TExprNode::TPtr& node) {
- if (auto maybeMember = TMaybeNode<TCoMember>(node);
- maybeMember && maybeMember.Cast().Struct().Ptr() == varsArg) {
- result.insert(maybeMember.Cast().Name().Value());
- return false;
- }
- return true;
- }
- );
- }
- return result;
- }
- TExprNode::TPtr MarkUnusedPatternVars(const TExprNode::TPtr& node, TExprContext& ctx, const THashSet<TStringBuf>& usedVars, const TExprNode::TPtr& rowsPerMatch) {
- const auto pos = node->Pos();
- if (node->ChildrenSize() == 6 && node->Child(0)->IsAtom()) {
- const auto varName = node->Child(0)->Content();
- const auto output = FromString<bool>(node->Child(4)->Content());
- const auto varUnused = ("RowsPerMatch_AllRows" != rowsPerMatch->Content() || !output) && !usedVars.contains(varName);
- return Build<TExprList>(ctx, pos)
- .Add(node->ChildPtr(0))
- .Add(node->ChildPtr(1))
- .Add(node->ChildPtr(2))
- .Add(node->ChildPtr(3))
- .Add(node->ChildPtr(4))
- .Add<TCoAtom>().Build(ToString(varUnused))
- .Done()
- .Ptr();
- }
- TExprNode::TListType newChildren;
- for (const auto& child : node->Children()) {
- newChildren.push_back(MarkUnusedPatternVars(child, ctx, usedVars, rowsPerMatch));
- }
- return ctx.ChangeChildren(*node, std::move(newChildren));
- }
- } // anonymous namespace
- TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
- YQL_CLOG(DEBUG, Core) << "Expand " << node->Content();
- TCoMatchRecognize matchRecognize(node);
- const auto input = matchRecognize.Input().Ptr();
- const auto partitionKeySelector = matchRecognize.PartitionKeySelector().Ptr();
- const auto partitionColumns = matchRecognize.PartitionColumns().Ptr();
- const auto sortTraits = matchRecognize.SortTraits().Ptr();
- const auto params = matchRecognize.Params().Ptr();
- const auto pos = matchRecognize.Pos();
- const auto isStreaming = IsStreaming(input, typeAnnCtx);
- auto newInput = Build<TCoLambda>(ctx, pos)
- .Args({"partition"})
- .Body<TCoToFlow>()
- .Input("partition")
- .Build()
- .Done()
- .Ptr();
- TExprNode::TPtr sortKey;
- TExprNode::TPtr sortOrder;
- ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
- auto timeOrderRecover = [&]() -> TExprNode::TPtr {
- if (!isStreaming) {
- return newInput;
- }
- switch (sortOrder->ChildrenSize()) {
- case 0:
- return newInput;
- case 1: {
- auto timeOrderRecover = ctx.Builder(pos)
- .Lambda()
- .Param("partition")
- .Callable("TimeOrderRecover")
- .Apply(0, std::move(newInput))
- .With(0, "partition")
- .Seal()
- .Add(1, sortKey)
- .Callable(2, "Interval")
- .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverDelay))
- .Seal()
- .Callable(3, "Interval")
- .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverAhead))
- .Seal()
- .Callable(4, "Uint32")
- .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverRowLimit))
- .Seal()
- .Seal()
- .Seal()
- .Build();
- return Build<TCoLambda>(ctx, pos)
- .Args({"partition"})
- .Body<TCoOrderedMap>()
- .Input<TExprApplier>()
- .Apply(TCoLambda(timeOrderRecover))
- .With(0, "partition")
- .Build()
- .Lambda<TCoLambda>()
- .Args({"row"})
- .Body<TCoRemoveMember>()
- .Struct("row")
- .Name<TCoAtom>().Build(NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER)
- .Build()
- .Build()
- .Build()
- .Done()
- .Ptr();
- }
- default:
- ctx.AddError(TIssue(ctx.GetPosition(sortTraits->Pos()), "Expect ORDER BY timestamp for MATCH_RECOGNIZE"));
- return {};
- }
- }();
- auto measures = ExpandMatchRecognizeMeasuresAggregates(params->ChildPtr(0), ctx, typeAnnCtx);
- auto rowsPerMatch = params->ChildPtr(1);
- const auto usedVars = FindUsedVars(params);
- auto pattern = MarkUnusedPatternVars(params->ChildPtr(3), ctx, usedVars, rowsPerMatch);
- auto settings = AddSetting(*ctx.NewList(pos, {}), pos, "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx);
- auto newMatchRecognize = ctx.Builder(pos)
- .Lambda()
- .Param("partition")
- .Callable("MatchRecognizeCore")
- .Apply(0, std::move(timeOrderRecover))
- .With(0, "partition")
- .Seal()
- .Add(1, partitionKeySelector)
- .Add(2, partitionColumns)
- .Callable(3, params->Content())
- .Add(0, std::move(measures))
- .Add(1, std::move(rowsPerMatch))
- .Add(2, params->ChildPtr(2))
- .Add(3, std::move(pattern))
- .Add(4, params->ChildPtr(4))
- .Seal()
- .Add(4, std::move(settings))
- .Seal()
- .Seal()
- .Build();
- auto lambda = Build<TCoLambda>(ctx, pos)
- .Args({"partition"})
- .Body<TCoForwardList>()
- .Stream<TExprApplier>()
- .Apply(TCoLambda(newMatchRecognize))
- .With(0, "partition")
- .Build()
- .Build()
- .Done()
- .Ptr();
- if (isStreaming) {
- TExprNode::TPtr keySelector;
- if (partitionColumns->ChildrenSize() != 0) {
- keySelector = std::move(partitionKeySelector);
- } else {
- // Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage
- // TODO(zverevgeny): fixme
- keySelector = Build<TCoLambda>(ctx, pos)
- .Args({"row"})
- .Body(MakeBool<true>(pos, ctx))
- .Done()
- .Ptr();
- }
- return Build<TCoShuffleByKeys>(ctx, pos)
- .Input(std::move(input))
- .KeySelectorLambda(std::move(keySelector))
- .ListHandlerLambda(std::move(lambda))
- .Done()
- .Ptr();
- } else { // non-streaming
- return Build<TCoPartitionsByKeys>(ctx, pos)
- .Input(std::move(input))
- .KeySelectorLambda(std::move(partitionKeySelector))
- .SortDirections(std::move(sortOrder))
- .SortKeySelectorLambda(std::move(sortKey))
- .ListHandlerLambda(std::move(lambda))
- .Done()
- .Ptr();
- }
- }
- } // namespace NYql
|