Browse Source

YQL-17250: Fix operation name at fallback message (#1194)

Maxim Kovalev 1 year ago
parent
commit
495492149e

+ 1 - 0
ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp

@@ -130,6 +130,7 @@ private:
         if (markFinished && !TYtDqProcessWrite::Match(input.Get())) {
             with_lock(state->StatisticsMutex) {
                 state->HybridStatistics[input->Content()].Entries.emplace_back(TString{"YtExecution"}, 0, 0, 0, 0, 1);
+                state->Statistics[Max<ui32>()].Entries.emplace_back(TString{"YtExecution"}, 0, 0, 0, 0, 1);
             }
         }
         auto outSection = TYtOutputOpBase(input).Output();

+ 11 - 20
ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp

@@ -129,10 +129,6 @@ private:
                 flow = node->IsCallable(TCoToFlow::CallableName()) && node->Head().IsCallable(TYtTableContent::CallableName());
                 return false;
             })) {
-                TExprNode::TListType flags;
-                flags.emplace_back(ctx.NewAtom(fill.Pos(), "FallbackOnError", TNodeFlags::Default));
-                flags.emplace_back(ctx.NewAtom(fill.Pos(), "FallbackOpYtFill", TNodeFlags::Default));
-
                 return Build<TYtTryFirst>(ctx, fill.Pos())
                     .First<TYtDqProcessWrite>()
                         .World(fill.World())
@@ -156,7 +152,7 @@ private:
                             .Build()
                             .ColumnHints().Build()
                         .Build()
-                        .Flags().Add(std::move(flags)).Build()
+                        .Flags().Add(GetHybridFlags(fill, ctx)).Build()
                     .Build()
                     .Second<TYtFill>()
                         .InitFrom(fill)
@@ -234,10 +230,6 @@ private:
         auto settings = NYql::AddSetting(sort.Settings().Ref(), EYtSettingType::NoDq, {}, ctx);
         auto operation = ctx.ChangeChild(sort.Ref(), TYtTransientOpBase::idx_Settings, std::move(settings));
 
-        TExprNode::TListType flags;
-        flags.emplace_back(ctx.NewAtom(sort.Pos(), "FallbackOnError", TNodeFlags::Default));
-        flags.emplace_back(ctx.NewAtom(sort.Pos(), "FallbackOpYtSort", TNodeFlags::Default));
-
         return Build<TYtTryFirst>(ctx, sort.Pos())
             .First<TYtDqProcessWrite>()
                 .World(std::move(newWorld))
@@ -261,7 +253,7 @@ private:
                     .Build()
                     .ColumnHints().Build()
                 .Build()
-                .Flags().Add(std::move(flags)).Build()
+                .Flags().Add(GetHybridFlags(sort, ctx)).Build()
             .Build()
             .Second(std::move(operation))
             .Done();
@@ -410,10 +402,6 @@ private:
                                 .Done();
                         }
 
-                        TExprNode::TListType flags;
-                        flags.emplace_back(ctx.NewAtom(map.Pos(), "FallbackOnError", TNodeFlags::Default));
-                        flags.emplace_back(ctx.NewAtom(map.Pos(), "FallbackOpYtMap", TNodeFlags::Default));
-
                         return Build<TYtTryFirst>(ctx, map.Pos())
                             .First<TYtDqProcessWrite>()
                                 .World(std::move(newWorld))
@@ -426,7 +414,7 @@ private:
                                         .Build()
                                     .ColumnHints().Build()
                                     .Build()
-                                .Flags().Add(std::move(flags)).Build()
+                                .Flags().Add(GetHybridFlags(map, ctx)).Build()
                                 .Build()
                             .Second<TYtMap>()
                                 .InitFrom(map)
@@ -581,10 +569,6 @@ private:
 
         auto reducer = ctx.NewLambda(reduce.Pos(), ctx.NewArguments(reduce.Pos(), {std::move(arg)}), std::move(body));
 
-        TExprNode::TListType flags;
-        flags.emplace_back(ctx.NewAtom(reduce.Pos(), "FallbackOnError", TNodeFlags::Default));
-        flags.emplace_back(ctx.NewAtom(reduce.Pos(), "FallbackOpYtReduce", TNodeFlags::Default));
-
         return Build<TYtTryFirst>(ctx, reduce.Pos())
             .template First<TYtDqProcessWrite>()
                 .World(std::move(newWorld))
@@ -614,7 +598,7 @@ private:
                     .Build()
                     .ColumnHints().Build()
                 .Build()
-                .Flags().Add(std::move(flags)).Build()
+                .Flags().Add(GetHybridFlags(reduce, ctx)).Build()
             .Build()
             .template Second<TYtOperation>()
                 .InitFrom(reduce)
@@ -681,6 +665,13 @@ private:
         }
     };
 
+    TExprNode::TListType GetHybridFlags(TExprBase node, TExprContext& ctx) const {
+        TExprNode::TListType flags;
+        flags.emplace_back(ctx.NewAtom(node.Pos(), "FallbackOnError", TNodeFlags::Default));
+        flags.emplace_back(ctx.NewAtom(node.Pos(), "FallbackOp" + TString(node.Raw()->Content()), TNodeFlags::Default));
+        return flags;
+    }
+
     const TYtState::TPtr State_;
     const THolder<IGraphTransformer> Finalizer_;
 };