Browse Source

YQL-16443 test steaming version of match_recognize on tables

zverevgeny 1 year ago
parent
commit
ec05b76b4a

+ 12 - 2
ydb/library/yql/core/yql_opt_match_recognize.cpp

@@ -9,7 +9,16 @@ namespace NYql {
 using namespace NNodes;
 
 namespace {
-bool IsStreaming(const TExprNode::TPtr& input) {
+bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) {
+    if (TTypeAnnotationContext::EMatchRecognizeStreamingMode::Disable == typeAnnCtx.MatchRecognizeStreaming){
+        return false;
+    }
+    if (TTypeAnnotationContext::EMatchRecognizeStreamingMode::Force == typeAnnCtx.MatchRecognizeStreaming){
+        return true;
+    }
+
+    YQL_ENSURE(TTypeAnnotationContext::EMatchRecognizeStreamingMode::Auto == typeAnnCtx.MatchRecognizeStreaming, "Internal logic error");
+
     bool hasPq = false;
     NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node){
         if (node->IsCallable("DataSource")) {
@@ -31,7 +40,8 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
     const auto& params = node->ChildRef(4);
     const auto pos = node->Pos();
 
-    const bool isStreaming = IsStreaming(input);
+    const bool isStreaming = IsStreaming(input, typeAnnCtx);
+
     TExprNode::TPtr settings = AddSetting(*ctx.NewList(pos, {}), pos,
           "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx);
 

+ 6 - 0
ydb/library/yql/core/yql_type_annotation.h

@@ -238,6 +238,12 @@ struct TTypeAnnotationContext: public TThrRefBase {
     IArrowResolver::TPtr ArrowResolver;
     TString CostBasedOptimizerType;
     bool MatchRecognize = false;
+    enum class EMatchRecognizeStreamingMode {
+        Disable,
+        Auto,
+        Force,
+    };
+    EMatchRecognizeStreamingMode MatchRecognizeStreaming = EMatchRecognizeStreamingMode::Auto;
     i64 TimeOrderRecoverDelay = -10'000'000; //microseconds
     i64 TimeOrderRecoverAhead = 10'000'000; //microseconds
     ui32 TimeOrderRecoverRowLimit = 1'000'000;

+ 17 - 0
ydb/library/yql/providers/config/yql_config_provider.cpp

@@ -870,6 +870,23 @@ namespace {
                     return false;
                 }
             }
+            else if (name == "MatchRecognizeStream") {
+                if (args.size() != 1) {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected at most 1 argument, but got " << args.size()));
+                    return false;
+                }
+                const auto& arg = args[0];
+                if (arg == "disable") {
+                    Types.MatchRecognizeStreaming = TTypeAnnotationContext::EMatchRecognizeStreamingMode::Disable;
+                } else if (arg == "auto") {
+                    Types.MatchRecognizeStreaming = TTypeAnnotationContext::EMatchRecognizeStreamingMode::Auto;
+                } else if (arg == "force") {
+                    Types.MatchRecognizeStreaming = TTypeAnnotationContext::EMatchRecognizeStreamingMode::Force;
+                } else {
+                    ctx.AddError(TIssue(pos, TStringBuilder() << "Expected `disable|auto|force', but got: " << args[0]));
+                    return false;
+                }
+            }
             else {
                 ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
                 return false;