Просмотр исходного кода

Add federated query sources/sinks to KQP plan

galaxycrab 1 год назад
Родитель
Сommit
be44b88ef3

+ 12 - 0
.mapping.json

@@ -10746,6 +10746,18 @@
   "ydb/library/yql/utils/log/ut/CMakeLists.linux-x86_64.txt":"",
   "ydb/library/yql/utils/log/ut/CMakeLists.txt":"",
   "ydb/library/yql/utils/log/ut/CMakeLists.windows-x86_64.txt":"",
+  "ydb/library/yql/utils/plan/CMakeLists.darwin-arm64.txt":"",
+  "ydb/library/yql/utils/plan/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/library/yql/utils/plan/CMakeLists.linux-aarch64.txt":"",
+  "ydb/library/yql/utils/plan/CMakeLists.linux-x86_64.txt":"",
+  "ydb/library/yql/utils/plan/CMakeLists.txt":"",
+  "ydb/library/yql/utils/plan/CMakeLists.windows-x86_64.txt":"",
+  "ydb/library/yql/utils/plan/ut/CMakeLists.darwin-arm64.txt":"",
+  "ydb/library/yql/utils/plan/ut/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/library/yql/utils/plan/ut/CMakeLists.linux-aarch64.txt":"",
+  "ydb/library/yql/utils/plan/ut/CMakeLists.linux-x86_64.txt":"",
+  "ydb/library/yql/utils/plan/ut/CMakeLists.txt":"",
+  "ydb/library/yql/utils/plan/ut/CMakeLists.windows-x86_64.txt":"",
   "ydb/library/yql/utils/simd/CMakeLists.darwin-arm64.txt":"",
   "ydb/library/yql/utils/simd/CMakeLists.darwin-x86_64.txt":"",
   "ydb/library/yql/utils/simd/CMakeLists.linux-aarch64.txt":"",

+ 1 - 1
ydb/core/kqp/opt/CMakeLists.darwin-arm64.txt

@@ -30,7 +30,7 @@ target_link_libraries(core-kqp-opt PUBLIC
   yql-dq-common
   yql-dq-opt
   yql-dq-type_ann
-  providers-s3-expr_nodes
+  yql-utils-plan
   core-kqp-provider
   tools-enum_parser-enum_serialization_runtime
 )

+ 1 - 1
ydb/core/kqp/opt/CMakeLists.darwin-x86_64.txt

@@ -30,7 +30,7 @@ target_link_libraries(core-kqp-opt PUBLIC
   yql-dq-common
   yql-dq-opt
   yql-dq-type_ann
-  providers-s3-expr_nodes
+  yql-utils-plan
   core-kqp-provider
   tools-enum_parser-enum_serialization_runtime
 )

+ 1 - 1
ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt

@@ -31,7 +31,7 @@ target_link_libraries(core-kqp-opt PUBLIC
   yql-dq-common
   yql-dq-opt
   yql-dq-type_ann
-  providers-s3-expr_nodes
+  yql-utils-plan
   core-kqp-provider
   tools-enum_parser-enum_serialization_runtime
 )

+ 1 - 1
ydb/core/kqp/opt/CMakeLists.linux-x86_64.txt

@@ -31,7 +31,7 @@ target_link_libraries(core-kqp-opt PUBLIC
   yql-dq-common
   yql-dq-opt
   yql-dq-type_ann
-  providers-s3-expr_nodes
+  yql-utils-plan
   core-kqp-provider
   tools-enum_parser-enum_serialization_runtime
 )

+ 1 - 1
ydb/core/kqp/opt/CMakeLists.windows-x86_64.txt

@@ -30,7 +30,7 @@ target_link_libraries(core-kqp-opt PUBLIC
   yql-dq-common
   yql-dq-opt
   yql-dq-type_ann
-  providers-s3-expr_nodes
+  yql-utils-plan
   core-kqp-provider
   tools-enum_parser-enum_serialization_runtime
 )

+ 108 - 222
ydb/core/kqp/opt/kqp_query_plan.cpp

@@ -10,9 +10,10 @@
 #include <ydb/library/yql/core/yql_expr_optimize.h>
 #include <ydb/library/yql/core/yql_opt_utils.h>
 #include <ydb/library/yql/dq/opt/dq_opt.h>
+#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
 #include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
 #include <ydb/library/yql/dq/tasks/dq_tasks_graph.h>
-#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+#include <ydb/library/yql/utils/plan/plan_utils.h>
 
 #include <library/cpp/json/writer/json.h>
 #include <library/cpp/json/json_reader.h>
@@ -238,11 +239,6 @@ public:
     }
 
 private:
-    struct TPredicate {
-        TVector<TString> Args;
-        TString Body;
-    };
-
     struct TOperator {
         TMap<TString, NJson::TJsonValue> Properties;
         TSet<ui32> Inputs;
@@ -395,157 +391,6 @@ private:
         return QueryPlanNodes[SerializerCtx.PlanNodeId];
     }
 
-    TString ToStr(const TCoDataCtor& data) {
-        TStringStream out;
-        EscapeArbitraryAtom(data.Literal().Value(), '"', &out);
-        return out.Str();
-    }
-
-    TString ToStr(const TCoLambda& lambda) {
-        return PrettyExprStr(lambda.Body());
-    }
-
-    TString ToStr(const TCoAsStruct& asStruct) {
-        TVector<TString> args;
-        for (const auto& kv : asStruct.Args()) {
-            auto key = PrettyExprStr(TExprBase(kv->Child(0)));
-            auto value = PrettyExprStr(TExprBase(kv->Child(1)));
-
-            if (!key.empty() && !value.empty()) {
-                args.push_back(TStringBuilder() << key << ": " << value);
-            }
-        }
-
-        return TStringBuilder() << "{" << JoinStrings(std::move(args), ",") << "}";
-    }
-
-    TString ToStr(const TCoAsList& asList) {
-        TVector<TString> args;
-        for (const auto& arg : asList.Args()) {
-            if (auto str = PrettyExprStr(TExprBase(arg))) {
-                args.push_back(std::move(str));
-            }
-        }
-
-        return TStringBuilder() << "[" << JoinStrings(std::move(args), ",") << "]";
-    }
-
-    TString ToStr(const TCoMember& member) {
-        auto structName = PrettyExprStr(member.Struct());
-        auto memberName = PrettyExprStr(member.Name());
-
-        if (!structName.empty() && !memberName.empty()) {
-            return TStringBuilder() << structName << "." << memberName;
-        }
-
-        return {};
-    }
-
-    TString ToStr(const TCoIfPresent& ifPresent) {
-        /* expected IfPresent with 3 children:
-         * 0-Optional, 1-PresentHandler, 2-MissingValue */
-        if (ifPresent.Ref().ChildrenSize() == 3) {
-            auto arg = PrettyExprStr(ifPresent.Optional());
-            auto pred = ExtractPredicate(ifPresent.PresentHandler());
-
-            Y_ENSURE(!pred.Args.empty());
-            return std::regex_replace(pred.Body.c_str(),
-                   std::regex(pred.Args[0].c_str()), arg.c_str()).data();
-        }
-
-        return "...";
-    }
-
-    TString ToStr(const TCoExists& exist) {
-        if (auto str = PrettyExprStr(exist.Optional())) {
-            return TStringBuilder() << "Exist(" << str << ")";
-        }
-
-        return {};
-    }
-
-    TString AggrOpToStr(const TExprBase& aggr) {
-        TVector<TString> args;
-        for (const auto& child : aggr.Ref().Children()) {
-            if (auto str = PrettyExprStr(TExprBase(child))) {
-                args.push_back(std::move(str));
-            }
-        }
-
-        return TStringBuilder() << aggr.Ref().Content() << "("
-               << JoinStrings(std::move(args), ",") << ")";
-    }
-
-    TString BinaryOpToStr(const TExprBase& op) {
-        auto left = PrettyExprStr(TExprBase(op.Ref().Child(0)));
-        auto right = PrettyExprStr(TExprBase(op.Ref().Child(1)));
-
-        TStringBuilder str;
-        str << left;
-        if (left && right) {
-            str << " " << op.Ref().Content() << " ";
-        }
-        str << right;
-
-        return str;
-    }
-
-    TString LogicOpToStr(const TExprBase& op) {
-        TVector<TString> args;
-        for (const auto& child : op.Ref().Children()) {
-            if (auto str = PrettyExprStr(TExprBase(child))) {
-                args.push_back(std::move(str));
-            }
-        }
-
-        return JoinStrings(std::move(args), TStringBuilder() << " " << op.Ref().Content() << " ");
-    }
-
-    TString PrettyExprStr(const TExprBase& expr) {
-        static const THashMap<TString, TString> aggregations = {
-           {"AggrMin", "MIN"},
-           {"AggrMax", "MAX"},
-           {"AggrCountUpdate", "COUNT"},
-           {"AggrAdd", "SUM"}
-        };
-
-        TStringBuilder str;
-
-        if (expr.Maybe<TCoIntegralCtor>()) {
-            str << expr.Ref().Child(0)->Content();
-        } else if (auto data = expr.Maybe<TCoDataCtor>()) {
-            str << ToStr(data.Cast());
-        } else if (auto lambda = expr.Maybe<TCoLambda>()) {
-            str << ToStr(lambda.Cast());
-        } else if (auto asStruct = expr.Maybe<TCoAsStruct>()) {
-            str << ToStr(asStruct.Cast());
-        } else if (auto asList = expr.Maybe<TCoAsList>()) {
-            str << ToStr(asList.Cast());
-        } else if (auto member = expr.Maybe<TCoMember>()) {
-            str << ToStr(member.Cast());
-        } else if (auto ifPresent = expr.Maybe<TCoIfPresent>()) {
-            str << ToStr(ifPresent.Cast());
-        } else if (auto exist = expr.Maybe<TCoExists>()) {
-            str << ToStr(exist.Cast());
-        } else if (expr.Maybe<TCoMin>() || expr.Maybe<TCoMax>() || expr.Maybe<TCoInc>()) {
-            str << AggrOpToStr(expr);
-        } else if (aggregations.contains(expr.Ref().Content())) {
-            str << aggregations.at(expr.Ref().Content()) << "("
-                << PrettyExprStr(TExprBase(expr.Ref().Child(0))) << ")";
-        } else if (expr.Maybe<TCoBinaryArithmetic>() || expr.Maybe<TCoCompare>()) {
-            str << BinaryOpToStr(expr);
-        } else if (expr.Maybe<TCoAnd>() || expr.Maybe<TCoOr>() || expr.Maybe<TCoXor>()) {
-            str << LogicOpToStr(expr);
-        } else if (expr.Maybe<TCoParameter>() || expr.Maybe<TCoJust>() || expr.Maybe<TCoSafeCast>()
-              || expr.Maybe<TCoCoalesce>() || expr.Maybe<TCoConvert>()) {
-            str << PrettyExprStr(TExprBase(expr.Ref().Child(0)));
-        } else {
-            str << expr.Ref().Content();
-        }
-
-        return str;
-    }
-
     void FillConnectionPlanNode(const TDqConnection& connection, TQueryPlanNode& planNode) {
         planNode.Type = EPlanNodeType::Connection;
 
@@ -771,7 +616,7 @@ private:
             op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table;
             planNode.NodeInfo["Tables"].AppendValue(op.Properties["Table"]);
 
-            auto rangesDesc = PrettyExprStr(sourceSettings.RangesExpr());
+            auto rangesDesc = NPlanUtils::PrettyExprStr(sourceSettings.RangesExpr());
             if (rangesDesc == "Void" || explainPrompt.UsedKeyColumns.empty()) {
                 readInfo.Type = EPlanTableReadType::FullScan;
 
@@ -864,6 +709,92 @@ private:
         }
     }
 
+    // Try get cluster from data surce or data sink node
+    TMaybe<TString> TryGetCluster(const TExprBase& d) {
+        if (d.Raw()->ChildrenSize() >= 2) {
+            TExprBase child(d.Raw()->Child(1));
+            auto cluster = child.Maybe<TCoAtom>();
+            if (cluster) {
+                return cluster.Cast().StringValue();
+            }
+        }
+        return Nothing();
+    }
+
+    TString RemovePathPrefix(TString path) {
+        const auto& prefix = SerializerCtx.Config->_KqpTablePathPrefix.Get();
+        if (prefix && path.StartsWith(*prefix)) {
+            size_t count = prefix->size();
+            if (path.size() > count && path[count] == '/') {
+                ++count;
+            }
+            path.erase(0, count);
+        }
+        return path;
+    }
+
+    void Visit(const TDqSource& source, TQueryPlanNode& stagePlanNode) {
+        // YDB sources
+        if (auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>(); settings.IsValid()) {
+            Visit(settings.Cast(), stagePlanNode);
+            return;
+        }
+
+        // Federated providers
+        TOperator op;
+        TCoDataSource dataSource = source.DataSource().Cast<TCoDataSource>();
+        const TString dataSourceCategory = dataSource.Category().StringValue();
+        IDqIntegration* dqIntegration = nullptr;
+
+        {
+            auto providerIt = SerializerCtx.TypeCtx.DataSourceMap.find(dataSourceCategory);
+            if (providerIt != SerializerCtx.TypeCtx.DataSourceMap.end()) {
+                dqIntegration = providerIt->second->GetDqIntegration();
+            }
+        }
+
+        // Common settings that can be overwritten by provider
+        op.Properties["Name"] = "Read from external data source";
+        op.Properties["SourceType"] = dataSourceCategory;
+        if (auto cluster = TryGetCluster(dataSource)) {
+            op.Properties["ExternalDataSource"] = RemovePathPrefix(std::move(*cluster));
+        }
+
+        if (dqIntegration) {
+            dqIntegration->FillSourcePlanProperties(source, op.Properties);
+        }
+
+        AddOperator(stagePlanNode, "Source", op);
+    }
+
+    void Visit(const TDqSink& sink, TQueryPlanNode& stagePlanNode) {
+        // Federated providers
+        TOperator op;
+        TCoDataSink dataSink = sink.DataSink().Cast<TCoDataSink>();
+        const TString dataSinkCategory = dataSink.Category().StringValue();
+        IDqIntegration* dqIntegration = nullptr;
+
+        {
+            auto providerIt = SerializerCtx.TypeCtx.DataSinkMap.find(dataSinkCategory);
+            if (providerIt != SerializerCtx.TypeCtx.DataSinkMap.end()) {
+                dqIntegration = providerIt->second->GetDqIntegration();
+            }
+        }
+
+        // Common settings that can be overwritten by provider
+        op.Properties["Name"] = "Write to external data source";
+        op.Properties["SinkType"] = dataSinkCategory;
+        if (auto cluster = TryGetCluster(dataSink)) {
+            op.Properties["ExternalDataSource"] = RemovePathPrefix(std::move(*cluster));
+        }
+
+        if (dqIntegration) {
+            dqIntegration->FillSinkPlanProperties(sink, op.Properties);
+        }
+
+        AddOperator(stagePlanNode, "Sink", op);
+    }
+
     void Visit(const TExprBase& expr, TQueryPlanNode& planNode) {
         if (expr.Maybe<TDqPhyStage>()) {
             auto stageGuid = NDq::TDqStageSettings::Parse(expr.Cast<TDqPhyStage>()).Id;
@@ -907,39 +838,7 @@ private:
 
             for (const auto& input : expr.Cast<TDqStageBase>().Inputs()) {
                 if (auto source = input.Maybe<TDqSource>()) {
-                    if (auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>(); settings.IsValid()) {
-                        Visit(settings.Cast(), stagePlanNode);
-                    } else if (auto settings = source.Settings().Maybe<TS3SourceSettings>(); settings.IsValid()) {
-                        TOperator op;
-                        op.Properties["Name"] = S3ProviderName;
-                        op.Properties["Format"] = "raw";
-                        auto cluster = source.Cast().DataSource().Cast<TS3DataSource>().Cluster().StringValue();
-                        if (auto pos = cluster.rfind('/'); pos != TString::npos) {
-                            cluster = cluster.substr(pos + 1);
-                        }
-                        op.Properties["Cluster"] = cluster;
-                        AddOperator(stagePlanNode, "Source", op);
-                    } else if (auto settings = source.Settings().Maybe<TS3ParseSettings>(); settings.IsValid()) {
-                        TOperator op;
-                        op.Properties["Name"] = S3ProviderName;
-                        op.Properties["Format"] = settings.Cast().Format().StringValue();
-                        auto cluster = source.Cast().DataSource().Cast<TS3DataSource>().Cluster().StringValue();
-                        if (auto pos = cluster.rfind('/'); pos != TString::npos) {
-                            cluster = cluster.substr(pos + 1);
-                        }
-                        op.Properties["Cluster"] = cluster;
-                        const TStructExprType* fullRowType = settings.Cast().RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
-                        auto rowTypeItems = fullRowType->GetItems();
-                        auto& columns = op.Properties["ReadColumns"];
-                        for (auto& item : rowTypeItems) {
-                            columns.AppendValue(item->GetName());
-                        }
-                        AddOperator(stagePlanNode, "Source", op);
-                    } else {
-                        TOperator op;
-                        op.Properties["Name"] = source.Cast().DataSource().Cast<TCoDataSource>().Category().StringValue();
-                        AddOperator(stagePlanNode, "Source", op);
-                    }
+                    Visit(source.Cast(), stagePlanNode);
                 } else {
                     auto inputCn = input.Cast<TDqConnection>();
 
@@ -952,10 +851,8 @@ private:
 
             if (auto outputs = expr.Cast<TDqStageBase>().Outputs()) {
                 for (auto output : outputs.Cast()) {
-                    if (output.Maybe<TDqSink>()) {
-                        TOperator op;
-                        op.Properties["Name"] = output.DataSink().Cast<TCoDataSink>().Category().StringValue();
-                        AddOperator(stagePlanNode, "Sink", op);
+                    if (auto sink = output.Maybe<TDqSink>()) {
+                        Visit(sink.Cast(), stagePlanNode);
                     }
                 }
             }
@@ -1077,8 +974,8 @@ private:
     ui32 Visit(const TCoCombineCore& combiner, TQueryPlanNode& planNode) {
         TOperator op;
         op.Properties["Name"] = "Aggregate";
-        op.Properties["GroupBy"] = PrettyExprStr(combiner.KeyExtractor());
-        op.Properties["Aggregation"] = PrettyExprStr(combiner.UpdateHandler());
+        op.Properties["GroupBy"] = NPlanUtils::PrettyExprStr(combiner.KeyExtractor());
+        op.Properties["Aggregation"] = NPlanUtils::PrettyExprStr(combiner.UpdateHandler());
 
         return AddOperator(planNode, "Aggregate", std::move(op));
     }
@@ -1086,7 +983,7 @@ private:
     ui32 Visit(const TCoSort& sort, TQueryPlanNode& planNode) {
         TOperator op;
         op.Properties["Name"] = "Sort";
-        op.Properties["SortBy"] = PrettyExprStr(sort.KeySelectorLambda());
+        op.Properties["SortBy"] = NPlanUtils::PrettyExprStr(sort.KeySelectorLambda());
 
         return AddOperator(planNode, "Sort", std::move(op));
     }
@@ -1094,8 +991,8 @@ private:
     ui32 Visit(const TCoTop& top, TQueryPlanNode& planNode) {
         TOperator op;
         op.Properties["Name"] = "Top";
-        op.Properties["TopBy"] = PrettyExprStr(top.KeySelectorLambda());
-        op.Properties["Limit"] = PrettyExprStr(top.Count());
+        op.Properties["TopBy"] = NPlanUtils::PrettyExprStr(top.KeySelectorLambda());
+        op.Properties["Limit"] = NPlanUtils::PrettyExprStr(top.Count());
 
         return AddOperator(planNode, "Top", std::move(op));
     }
@@ -1103,8 +1000,8 @@ private:
     ui32 Visit(const TCoTopSort& topSort, TQueryPlanNode& planNode) {
         TOperator op;
         op.Properties["Name"] = "TopSort";
-        op.Properties["TopSortBy"] = PrettyExprStr(topSort.KeySelectorLambda());
-        op.Properties["Limit"] = PrettyExprStr(topSort.Count());
+        op.Properties["TopSortBy"] = NPlanUtils::PrettyExprStr(topSort.KeySelectorLambda());
+        op.Properties["Limit"] = NPlanUtils::PrettyExprStr(topSort.Count());
 
         return AddOperator(planNode, "TopSort", std::move(op));
     }
@@ -1112,7 +1009,7 @@ private:
     ui32 Visit(const TCoTake& take, TQueryPlanNode& planNode) {
         TOperator op;
         op.Properties["Name"] = "Limit";
-        op.Properties["Limit"] = PrettyExprStr(take.Count());
+        op.Properties["Limit"] = NPlanUtils::PrettyExprStr(take.Count());
 
         return AddOperator(planNode, "Limit", std::move(op));
     }
@@ -1120,7 +1017,7 @@ private:
     ui32 Visit(const TCoSkip& skip, TQueryPlanNode& planNode) {
         TOperator op;
         op.Properties["Name"] = "Offset";
-        op.Properties["Offset"] = PrettyExprStr(skip.Count());
+        op.Properties["Offset"] = NPlanUtils::PrettyExprStr(skip.Count());
 
         return AddOperator(planNode, "Offset", std::move(op));
     }
@@ -1133,7 +1030,7 @@ private:
     }
 
     ui32 Visit(const TCoIterator& iter, TQueryPlanNode& planNode) {
-        const auto iterValue = PrettyExprStr(iter.List());
+        const auto iterValue = NPlanUtils::PrettyExprStr(iter.List());
 
         TOperator op;
         op.Properties["Name"] = "Iterator";
@@ -1150,7 +1047,7 @@ private:
     }
 
     ui32 Visit(const TCoPartitionByKey& partitionByKey, TQueryPlanNode& planNode) {
-        const auto inputValue = PrettyExprStr(partitionByKey.Input());
+        const auto inputValue = NPlanUtils::PrettyExprStr(partitionByKey.Input());
 
         TOperator op;
         op.Properties["Name"] = "PartitionByKey";
@@ -1278,22 +1175,11 @@ private:
         return AddOperator(planNode, name, std::move(op));
     }
 
-    TPredicate ExtractPredicate(const TCoLambda& expr) {
-        TPredicate pred;
-        pred.Args.reserve(expr.Args().Ref().ChildrenSize());
-        for (const auto& child : expr.Args().Ref().Children()) {
-            pred.Args.push_back(PrettyExprStr(TExprBase(child)));
-        }
-
-        pred.Body = PrettyExprStr(expr.Body());
-        return pred;
-    }
-
     void AddOptimizerEstimates(TOperator& op, const TExprBase& expr) {
         if (!SerializerCtx.Config->HasOptEnableCostBasedOptimization()) {
             return;
         }
-        
+
         if (auto stats = SerializerCtx.TypeCtx.GetStats(expr.Raw())) {
             op.Properties["E-Rows"] = stats->Nrows;
             op.Properties["E-Cost"] = stats->Cost;
@@ -1309,13 +1195,13 @@ private:
         TOperator op;
         op.Properties["Name"] = "Filter";
 
-        auto pred = ExtractPredicate(filter.Lambda());
+        auto pred = NPlanUtils::ExtractPredicate(filter.Lambda());
         op.Properties["Predicate"] = pred.Body;
 
         AddOptimizerEstimates(op, filter);
 
         if (filter.Limit()) {
-            op.Properties["Limit"] = PrettyExprStr(filter.Limit().Cast());
+            op.Properties["Limit"] = NPlanUtils::PrettyExprStr(filter.Limit().Cast());
         }
 
         return AddOperator(planNode, "Filter", std::move(op));
@@ -1371,7 +1257,7 @@ private:
         op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table;
         planNode.NodeInfo["Tables"].AppendValue(op.Properties["Table"]);
 
-        auto rangesDesc = PrettyExprStr(read.Ranges());
+        auto rangesDesc = NPlanUtils::PrettyExprStr(read.Ranges());
         if (rangesDesc == "Void" || explainPrompt.UsedKeyColumns.empty()) {
             readInfo.Type = EPlanTableReadType::FullScan;
 
@@ -2113,7 +1999,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
                         }
                     }
                 }
- 
+
                 NKqpProto::TKqpStageExtraStats kqpStageStats;
                 if ((*stat)->GetExtra().UnpackTo(&kqpStageStats)) {
                     auto& nodesStats = stats.InsertValue("NodesScanShards", NJson::JSON_ARRAY);

+ 1 - 1
ydb/core/kqp/opt/ya.make

@@ -23,7 +23,7 @@ PEERDIR(
     ydb/library/yql/dq/common
     ydb/library/yql/dq/opt
     ydb/library/yql/dq/type_ann
-    ydb/library/yql/providers/s3/expr_nodes
+    ydb/library/yql/utils/plan
     ydb/core/kqp/provider
 )
 

+ 4 - 0
ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-arm64.txt

@@ -35,7 +35,11 @@ target_link_options(ydb-core-kqp-ut-federated_query-generic PRIVATE
   CoreFoundation
 )
 target_sources(ydb-core-kqp-ut-federated_query-generic PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/ch_recipe_ut_helpers.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/connector_recipe_ut_helpers.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/kqp_generic_plan_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_join_ut.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/pg_recipe_ut_helpers.cpp
 )
 set_property(
   TARGET

+ 4 - 0
ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt

@@ -36,7 +36,11 @@ target_link_options(ydb-core-kqp-ut-federated_query-generic PRIVATE
   CoreFoundation
 )
 target_sources(ydb-core-kqp-ut-federated_query-generic PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/ch_recipe_ut_helpers.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/connector_recipe_ut_helpers.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/kqp_generic_plan_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_join_ut.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/generic/pg_recipe_ut_helpers.cpp
 )
 set_property(
   TARGET

Некоторые файлы не были показаны из-за большого количества измененных файлов