Browse Source

YQL-16443 use TimeOrderRecover for streams

zverevgeny 1 year ago
parent
commit
6e4b3cea69

+ 1 - 1
ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp

@@ -7368,7 +7368,7 @@ struct TPeepHoleRules {
         {"CheckedMinus", &ExpandCheckedMinus},
         {"JsonValue", &ExpandJsonValue},
         {"JsonExists", &ExpandJsonExists},
-        {"MatchRecognize", &ExpandMatchRecognize}
+        //TODO(zverevgeny): add me {"MatchRecognize", &ExpandMatchRecognize}
     };
 
     static constexpr std::initializer_list<TExtPeepHoleOptimizerMap::value_type> CommonStageExtRulesInit = {

+ 67 - 9
ydb/library/yql/core/yql_opt_match_recognize.cpp

@@ -1,5 +1,6 @@
 #include "yql_opt_match_recognize.h"
 #include "yql_opt_utils.h"
+#include <ydb/library/yql/core/sql_types/time_order_recover.h>
 #include <ydb/library/yql/core/yql_expr_optimize.h>
 #include <ydb/library/yql/utils/log/log.h>
 
@@ -21,7 +22,7 @@ bool IsStreaming(const TExprNode::TPtr& input) {
 }
 } //namespace
 
-TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx) {
+TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& typeAnnCtx) {
     YQL_ENSURE(node->IsCallable({"MatchRecognize"}));
     const auto& input = node->ChildRef(0);
     const auto& partitionKeySelector = node->ChildRef(1);
@@ -56,18 +57,75 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
     ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
     TExprNode::TPtr result;
     if (isStreaming) {
-        //TODO use TimeOrderRecover
+        YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE on streams");
+        const auto reordered = ctx.Builder(pos)
+            .Lambda()
+            .Param("partition")
+                .Callable("ForwardList")
+                    .Callable(0, "OrderedMap")
+                        .Callable(0, "TimeOrderRecover")
+                            .Callable(0, "ToFlow").
+                                Arg(0, "partition")
+                            .Seal()
+                            .Add(1, sortKey)
+                            .Callable(2, "Interval")
+                                .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverDelay)))
+                            .Seal()
+                            .Callable(3, "Interval")
+                                .Add(0,  ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverAhead)))
+                            .Seal()
+                            .Callable(4, "Uint32")
+                                .Add(0,  ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverRowLimit)))
+                            .Seal()
+                        .Seal()
+                        .Lambda(1)
+                            .Param("row")
+                            .Callable("RemoveMember")
+                                .Arg(0, "row")
+                                .Add(1, ctx.NewAtom(pos, NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER))
+                            .Seal()
+                        .Seal()
+                    .Seal()
+                .Seal()
+            .Seal()
+        .Build();
+
+        const auto matchRecognizeOnReorderedPartition = ctx.Builder(pos)
+            .Lambda()
+                .Param("partition")
+                .Apply(matchRecognize)
+                    .With(0)
+                        .Apply(reordered)
+                            .With(0)
+                                .Arg("partition")
+                            .Done()
+                        .Seal()
+                    .Done()
+                .Seal()
+            .Seal()
+        .Build();
+        TExprNode::TPtr keySelector;
         if (partitionColumns->ChildrenSize() != 0) {
-            result = ctx.Builder(pos)
-                .Callable("ShuffleByKeys")
-                    .Add(0, input)
-                    .Add(1, partitionKeySelector)
-                    .Add(2, matchRecognize)
+            keySelector = partitionKeySelector;
+        } else {
+            //Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage
+            //TODO(zverevgeny): fixme
+            keySelector = ctx.Builder(pos)
+                .Lambda()
+                    .Param("row")
+                    .Callable("Bool")
+                        .Add(0, ctx.NewAtom(pos, "true"))
+                    .Seal()
                 .Seal()
             .Build();
-        } else {
-            result = matchRecognize;
         }
+        result = ctx.Builder(pos)
+            .Callable("ShuffleByKeys")
+                .Add(0, input)
+                .Add(1, keySelector)
+                .Add(2, matchRecognizeOnReorderedPartition)
+            .Seal()
+        .Build();
     } else { //non-streaming
         if (partitionColumns->ChildrenSize() != 0) {
             result = ctx.Builder(pos)

+ 3 - 2
ydb/library/yql/core/yql_opt_match_recognize.h

@@ -1,8 +1,9 @@
 #pragma once
 #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/core/yql_type_annotation.h>
 
 namespace NYql {
 
-TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr &node, TExprContext &ctx);
+TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr &node, TExprContext &ctx, const TTypeAnnotationContext& typeAnnCtx);
 
-} //namespace NYql
+} //namespace NYql

+ 3 - 1
ydb/library/yql/core/yql_type_annotation.h

@@ -238,7 +238,9 @@ struct TTypeAnnotationContext: public TThrRefBase {
     IArrowResolver::TPtr ArrowResolver;
     TString CostBasedOptimizerType;
     bool MatchRecognize = false;
-
+    i64 TimeOrderRecoverDelay = -10'000'000; //microseconds
+    i64 TimeOrderRecoverAhead = 10'000'000; //microseconds
+    ui32 TimeOrderRecoverRowLimit = 1'000'000;
     // compatibility with v0 or raw s-expression code
     bool OrderedColumns = false;
     TColumnOrderStorage::TPtr ColumnOrderStorage = new TColumnOrderStorage;

+ 3 - 4
ydb/library/yql/dq/opt/dq_opt_log.cpp

@@ -293,10 +293,9 @@ IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::T
     return status;
 }
 
-TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx) {
-    if (node.Maybe<TCoMatchRecognize>())
-        return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx));
-    return node;
+TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx, const TTypeAnnotationContext& typeAnnCtx) {
+    YQL_ENSURE(node.Maybe<TCoMatchRecognize>(), "Expected MatchRecognize"); // a non-MatchRecognize node now fails this check instead of being returned unchanged
+    return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx, typeAnnCtx)); // typeAnnCtx is passed straight through to ExpandMatchRecognize
 }
 
 }

+ 1 - 1
ydb/library/yql/dq/opt/dq_opt_log.h

@@ -34,6 +34,6 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx);
 
 IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config);
 
-NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx);
+NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx, const TTypeAnnotationContext& typeAnnCtx);
 
 } // namespace NYql::NDq

+ 42 - 0
ydb/library/yql/providers/config/yql_config_provider.cpp

@@ -828,6 +828,48 @@ namespace {
                 }
                 Types.MatchRecognize = name == "_EnableMatchRecognize";
             }
+            else if (name == "TimeOrderRecoverDelay") {
+                if (args.size() != 1) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected one argument, but got " << args.size()));
+                    return false;
+                }
+                if (!TryFromString(args[0], Types.TimeOrderRecoverDelay)) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0]));
+                    return false;
+                }
+                if (Types.TimeOrderRecoverDelay >= 0) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected negative value, but got: " << args[0]));
+                    return false;
+                }
+            }
+            else if (name == "TimeOrderRecoverAhead") {
+                if (args.size() != 1) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected one argument, but got " << args.size()));
+                    return false;
+                }
+                if (!TryFromString(args[0], Types.TimeOrderRecoverAhead)) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0]));
+                    return false;
+                }
+                if (Types.TimeOrderRecoverAhead <= 0) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected positive value, but got: " << args[0]));
+                    return false;
+                }
+            }
+            else if (name == "TimeOrderRecoverRowLimit") {
+                if (args.size() != 1) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected one argument, but got " << args.size()));
+                    return false;
+                }
+                if (!TryFromString(args[0], Types.TimeOrderRecoverRowLimit)) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0]));
+                    return false;
+                }
+                if (Types.TimeOrderRecoverRowLimit == 0) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected positive value, but got: " << args[0]));
+                    return false;
+                }
+            }
             else {
                 ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
                 return false;

+ 4 - 2
ydb/library/yql/providers/dq/opt/logical_optimize.cpp

@@ -146,8 +146,10 @@ protected:
     }
 
     TMaybeNode<TExprBase> ExpandMatchRecognize(TExprBase node, TExprContext& ctx) {
-        if (node.Cast<TCoInputBase>().Input().Maybe<TDqConnection>()) {
-            return DqExpandMatchRecognize(node, ctx);
+        if (node.Maybe<TCoMatchRecognize>() && // node kind is checked here, so DqExpandMatchRecognize is only reached for MatchRecognize nodes
+            node.Cast<TCoInputBase>().Input().Maybe<TDqConnection>() // and only when the node's input is a TDqConnection
+        ) {
+            return DqExpandMatchRecognize(node, ctx, TypesCtx); // TypesCtx fills the added typeAnnCtx parameter
         }
         return node;
     }

+ 1 - 1
ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp

@@ -2660,7 +2660,7 @@ protected:
     }
 
     TMaybeNode<TExprBase> MatchRecognize(TExprBase node, TExprContext& ctx) {
-        return ExpandMatchRecognize(node.Ptr(), ctx);
+        return ExpandMatchRecognize(node.Ptr(), ctx, *Types); // *Types is supplied as the typeAnnCtx argument of ExpandMatchRecognize
     }
 private:
     TYtState::TPtr State_;