Browse Source

YT-24309 Fix crashes related to missing TStatus::Repeat

Refactor code in a way that allows returning TStatus::Repeat early (when rebuilding section).
commit_hash:ffe00f7ba7ac0a52ff01bb5074c1112fd0c966b8
orlovorlov 5 days ago
parent
commit
f6bfb8011c

+ 3 - 5
yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp

@@ -2,6 +2,7 @@
 #include "yql_yt_phy_opt_helper.h"
 
 #include <yt/yql/providers/yt/provider/yql_yt_helpers.h>
+#include <yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h>
 #include <yt/yql/providers/yt/provider/yql_yt_join_impl.h>
 #include <yql/essentials/providers/common/codec/yql_codec_type_flags.h>
 
@@ -351,16 +352,13 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa
     }
     const auto tree = ImportYtEquiJoin(equiJoin, ctx);
 
-    const TMaybe<ui64> maxChunkCountExtendedStats = State_->Configuration->ExtendedStatsMaxChunkCount.Get();
-
-    if (tryReorder && waitAllInputs && maxChunkCountExtendedStats) {
+    if (tryReorder) {
         YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin";
         auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx);
         if (collectStatus == TStatus::Repeat) {
             return ExportYtEquiJoin(equiJoin, *tree, ctx, State_);
         }
-    }
-    if (tryReorder) {
+
         const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx);
         if (optimizedTree != tree) {
             return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_);

+ 1 - 0
yt/yql/providers/yt/provider/ya.make

@@ -5,6 +5,7 @@ SRCS(
     yql_yt_block_io_filter.cpp
     yql_yt_block_io_utils.cpp
     yql_yt_block_output.cpp
+    yql_yt_cbo_helpers.cpp
     yql_yt_datasink_constraints.cpp
     yql_yt_datasink_exec.cpp
     yql_yt_datasink_finalize.cpp

+ 281 - 0
yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp

@@ -0,0 +1,281 @@
+
+#include "yql_yt_cbo_helpers.h"
+#include "yql_yt_helpers.h"
+
+#include <yql/essentials/utils/log/log.h>
+
+namespace NYql {
+namespace {
+
+void AddJoinColumns(THashMap<TString, THashSet<TString>>& relJoinColumns, const TYtJoinNodeOp& op) {
+    for (ui32 i = 0; i < op.LeftLabel->ChildrenSize(); i += 2) {
+        auto ltable = op.LeftLabel->Child(i)->Content();
+        auto lcolumn = op.LeftLabel->Child(i + 1)->Content();
+        auto rtable = op.RightLabel->Child(i)->Content();
+        auto rcolumn = op.RightLabel->Child(i + 1)->Content();
+
+        relJoinColumns[TString(ltable)].insert(TString(lcolumn));
+        relJoinColumns[TString(rtable)].insert(TString(rcolumn));
+    }
+}
+
+IGraphTransformer::TStatus ExtractInMemorySize(
+    const TYtState::TPtr& state,
+    TString cluster,
+    TExprContext& ctx,
+    TMaybe<ui64>& leftMemorySize,
+    TMaybe<ui64>& rightMemorySize,
+    ESizeStatCollectMode mode,
+    TYtJoinNodeOp* op,
+    const TJoinLabels& labels,
+    int numLeaves,
+    TYtJoinNodeLeaf* leftLeaf,
+    bool leftTablesReady,
+    const TVector<TYtPathInfo::TPtr>& leftTables,
+    const THashSet<TString>& leftJoinKeys,
+    const TStructExprType* leftItemType,
+    TYtJoinNodeLeaf* rightLeaf,
+    bool rightTablesReady,
+    const TVector<TYtPathInfo::TPtr>& rightTables,
+    const THashSet<TString>& rightJoinKeys,
+    const TStructExprType* rightItemType)
+{
+    TMapJoinSettings mapSettings;
+    TJoinSideStats leftStats;
+    TJoinSideStats rightStats;
+    bool isCross = false;
+    auto status = CollectStatsAndMapJoinSettings(mode, mapSettings, leftStats, rightStats,
+                                                 leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
+                                                 leftLeaf, rightLeaf, *state, isCross, cluster, ctx);
+    if (status != IGraphTransformer::TStatus::Ok) {
+        YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status;
+        return status;
+    }
+    if (leftLeaf) {
+        const bool needPayload = op->JoinKind->IsAtom("Inner") || op->JoinKind->IsAtom("Right");
+        const auto& label = labels.Inputs[0];
+        TVector<TString> leftJoinKeyList(leftJoinKeys.begin(), leftJoinKeys.end());
+        const ui64 rows = mapSettings.LeftRows;
+        ui64 size = 0;
+        auto status = CalculateJoinLeafSize(size, mapSettings, leftLeaf->Section, *op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables);
+        if (status != IGraphTransformer::TStatus::Ok) {
+            YQL_CLOG(WARN, ProviderYt) << "Unable to calculate left join leaf size: " << status;
+            return status;
+        }
+        if (op->JoinKind->IsAtom("Cross")) {
+            leftMemorySize = size + rows * (1ULL + label.InputType->GetSize()) * sizeof(NKikimr::NUdf::TUnboxedValuePod);
+        } else {
+            leftMemorySize = CalcInMemorySizeNoCrossJoin(
+                label, *op, mapSettings, true, ctx, needPayload, size);
+        }
+    }
+
+    if (rightLeaf) {
+        const bool needPayload = op->JoinKind->IsAtom("Inner") || op->JoinKind->IsAtom("Left");
+        const auto& label = labels.Inputs[numLeaves - 1];
+        TVector<TString> rightJoinKeyList(rightJoinKeys.begin(), rightJoinKeys.end());
+        const ui64 rows = mapSettings.RightRows;
+        ui64 size = 0;
+
+        auto status = CalculateJoinLeafSize(size, mapSettings, rightLeaf->Section, *op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables);
+        if (status != IGraphTransformer::TStatus::Ok) {
+            YQL_CLOG(WARN, ProviderYt) << "Unable to calculate right join leaf size: " << status;
+            return status;
+        }
+        if (op->JoinKind->IsAtom("Cross")) {
+            rightMemorySize = size + rows * (1ULL + label.InputType->GetSize()) * sizeof(NKikimr::NUdf::TUnboxedValuePod);
+        } else {
+            rightMemorySize = CalcInMemorySizeNoCrossJoin(
+                label, *op, mapSettings, false, ctx, needPayload, size);
+        }
+    }
+    return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus CollectCboStatsLeaf(
+    const THashMap<TString, THashSet<TString>>& relJoinColumns,
+    const TString& cluster,
+    TYtJoinNodeLeaf& leaf,
+    const TYtState::TPtr& state,
+    TExprContext& ctx) {
+
+    const TMaybe<ui64> maxChunkCountExtendedStats = state->Configuration->ExtendedStatsMaxChunkCount.Get();
+    TVector<TYtPathInfo::TPtr> tables;
+    if (maxChunkCountExtendedStats) {
+        TVector<TString> requestedColumnList;
+        auto columnsPos = relJoinColumns.find(JoinLeafLabel(leaf.Label));
+        if (columnsPos != relJoinColumns.end()) {
+            requestedColumnList.assign(columnsPos->second.begin(), columnsPos->second.end());
+        }
+
+        THashSet<TString> memSizeColumns(requestedColumnList.begin(), requestedColumnList.end());
+        TVector<IYtGateway::TPathStatReq> pathStatReqs;
+
+        ui64 sectionChunkCount = 0;
+        for (auto path: leaf.Section.Paths()) {
+            auto pathInfo = MakeIntrusive<TYtPathInfo>(path);
+            tables.push_back(pathInfo);
+            sectionChunkCount += pathInfo->Table->Stat->ChunkCount;
+
+            if (pathInfo->HasColumns()) {
+                NYT::TRichYPath path;
+                pathInfo->FillRichYPath(path);
+                std::copy(path.Columns_->Parts_.begin(), path.Columns_->Parts_.end(), std::inserter(memSizeColumns, memSizeColumns.end()));
+            }
+
+            auto ytPath = BuildYtPathForStatRequest(cluster, *pathInfo, requestedColumnList, *state, ctx);
+
+            if (!ytPath) {
+                return IGraphTransformer::TStatus::Error;
+            }
+
+            pathStatReqs.push_back(
+                IYtGateway::TPathStatReq()
+                    .Path(*ytPath)
+                    .IsTemp(pathInfo->Table->IsTemp)
+                    .IsAnonymous(pathInfo->Table->IsAnonymous)
+                    .Epoch(pathInfo->Table->Epoch.GetOrElse(0)));
+        }
+
+        if (!pathStatReqs.empty() && (*maxChunkCountExtendedStats == 0 || sectionChunkCount <= *maxChunkCountExtendedStats)) {
+            IYtGateway::TPathStatOptions pathStatOptions =
+                IYtGateway::TPathStatOptions(state->SessionId)
+                    .Cluster(cluster)
+                    .Paths(pathStatReqs)
+                    .Config(state->Configuration->Snapshot())
+                    .Extended(true);
+
+            IYtGateway::TPathStatResult pathStats = state->Gateway->TryPathStat(std::move(pathStatOptions));
+
+            if (!pathStats.Success()) {
+                leaf.Section = Build<TYtSection>(ctx, leaf.Section.Ref().Pos())
+                    .InitFrom(leaf.Section)
+                    .Settings(NYql::AddSettingAsColumnList(leaf.Section.Settings().Ref(), EYtSettingType::StatColumns, requestedColumnList, ctx))
+                    .Done();
+                return IGraphTransformer::TStatus::Repeat;
+            }
+        }
+    }
+
+    TVector<ui64> dataSize;
+    return TryEstimateDataSizeChecked(dataSize, leaf.Section, cluster, tables, {}, *state, ctx);
+}
+
+IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
+    TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Left.Get());
+    TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Right.Get());
+    AddJoinColumns(relJoinColumns, op);
+
+    TRelSizeInfo leftSizeInfo;
+    TRelSizeInfo rightSizeInfo;
+    auto result = PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, state, cluster, ctx, &op);
+    if (result != IGraphTransformer::TStatus::Ok) {
+        return result;
+    }
+
+    if (leftLeaf) {
+        result = CollectCboStatsLeaf(relJoinColumns, cluster, *leftLeaf, state, ctx);
+    } else {
+        auto& leftOp = *dynamic_cast<TYtJoinNodeOp*>(op.Left.Get());
+        result = CollectCboStatsNode(relJoinColumns, cluster, leftOp, state, ctx);
+    }
+    if (result != IGraphTransformer::TStatus::Ok) {
+        return result;
+    }
+
+    if (rightLeaf) {
+        result = CollectCboStatsLeaf(relJoinColumns, cluster, *rightLeaf, state, ctx);
+    } else {
+        auto& rightOp = *dynamic_cast<TYtJoinNodeOp*>(op.Right.Get());
+        result = CollectCboStatsNode(relJoinColumns, cluster, rightOp, state, ctx);
+    }
+    return result;
+}
+
+}  // namespace
+
+IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(
+    TRelSizeInfo& outLeft,
+    TRelSizeInfo& outRight,
+    const TYtState::TPtr& state,
+    TString cluster,
+    TExprContext& ctx,
+    TYtJoinNodeOp* op) {
+    auto mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW);
+    if (!mapJoinUseFlow) {
+        // Only support flow map joins in CBO.
+        return IGraphTransformer::TStatus::Ok;
+    }
+
+    TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op->Left.Get());
+    TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op->Right.Get());
+
+    bool leftTablesReady = false;
+    TVector<TYtPathInfo::TPtr> leftTables;
+    bool rightTablesReady = false;
+    TVector<TYtPathInfo::TPtr> rightTables;
+    THashSet<TString> leftJoinKeys, rightJoinKeys;
+    int numLeaves = 0;
+    TJoinLabels labels;
+    const TStructExprType* leftItemType = nullptr;
+    const TStructExprType* leftItemTypeBeforePremap = nullptr;
+    const TStructExprType* rightItemType = nullptr;
+    const TStructExprType* rightItemTypeBeforePremap = nullptr;
+
+    {
+        if (leftLeaf) {
+            TYtSection section{leftLeaf->Section};
+            if (Y_UNLIKELY(!section.Settings().Empty() && section.Settings().Item(0).Name() == "Test")) {
+                return IGraphTransformer::TStatus::Ok;
+            }
+
+            auto status = CollectPathsAndLabelsReady(leftTablesReady, leftTables, labels, leftItemType, leftItemTypeBeforePremap, *leftLeaf, ctx);
+            if (status != IGraphTransformer::TStatus::Ok) {
+                YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status;
+                return status;
+            }
+            if (!labels.Inputs.empty()) {
+                leftJoinKeys = BuildJoinKeys(labels.Inputs[0], *op->LeftLabel);
+            }
+            ++numLeaves;
+        }
+        if (rightLeaf) {
+            TYtSection section{rightLeaf->Section};
+            if (Y_UNLIKELY(!section.Settings().Empty() && section.Settings().Item(0).Name() == "Test")) {
+                return IGraphTransformer::TStatus::Ok;
+            }
+            auto status = CollectPathsAndLabelsReady(rightTablesReady, rightTables, labels, rightItemType, rightItemTypeBeforePremap, *rightLeaf, ctx);
+            if (status != IGraphTransformer::TStatus::Ok) {
+                YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status;
+                return status;
+            }
+            if (std::ssize(labels.Inputs) > numLeaves) {
+                rightJoinKeys = BuildJoinKeys(labels.Inputs[numLeaves], *op->RightLabel);
+            }
+            ++numLeaves;
+        }
+    }
+
+    if (numLeaves == 0) {
+        return IGraphTransformer::TStatus::Ok;
+    }
+
+    auto status = ExtractInMemorySize(state, cluster, ctx, outLeft.MapJoinMemSize, outRight.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels,
+        numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType,
+        rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType);
+    if (status != IGraphTransformer::TStatus::Ok) {
+        return status;
+    }
+
+    status = ExtractInMemorySize(state, cluster, ctx, outLeft.LookupJoinMemSize, outRight.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels,
+        numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType,
+        rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType);
+    return status;
+}
+
+IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
+    THashMap<TString, THashSet<TString>> relJoinColumns;
+    return CollectCboStatsNode(relJoinColumns, cluster, op, state, ctx);
+}
+
+}

+ 12 - 0
yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h

@@ -0,0 +1,12 @@
+#pragma once
+
+#include "yql_yt_join_impl.h"
+#include "yql_yt_provider_context.h"
+
+namespace NYql {
+
+IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
+
+IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, TString cluster, TExprContext& ctx, TYtJoinNodeOp* op);
+
+}  // namespace NYql

+ 42 - 147
yt/yql/providers/yt/provider/yql_yt_join_impl.cpp

@@ -191,47 +191,6 @@ bool HasNonTrivialAny(const TEquiJoinLinkSettings& linkSettings, const TMapJoinS
     return hints.contains("any") && !unique;
 }
 
-IGraphTransformer::TStatus TryEstimateDataSizeChecked(TVector<ui64>& result, TYtSection& inputSection, const TString& cluster,
-    const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
-{
-    if (GetJoinCollectColumnarStatisticsMode(*state.Configuration) == EJoinCollectColumnarStatisticsMode::Sync) {
-        auto syncResult = EstimateDataSize(cluster, paths, columns, state, ctx);
-        if (!syncResult) {
-            return IGraphTransformer::TStatus::Error;
-        }
-        result = std::move(*syncResult);
-        return IGraphTransformer::TStatus::Ok;
-    }
-
-    TSet<TString> requestedColumns;
-    auto status = TryEstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx);
-    auto settings = inputSection.Settings().Ptr();
-    if (status == TStatus::Repeat) {
-        bool hasStatColumns = NYql::HasSetting(inputSection.Settings().Ref(), EYtSettingType::StatColumns);
-        if (hasStatColumns) {
-            auto oldColumns = NYql::GetSettingAsColumnList(*settings, EYtSettingType::StatColumns);
-            TSet<TString> oldColumnSet(oldColumns.begin(), oldColumns.end());
-
-            bool alreadyRequested = AllOf(requestedColumns, [&](const auto& c) {
-                return oldColumnSet.contains(c);
-            });
-
-            YQL_ENSURE(!alreadyRequested);
-
-            settings = NYql::RemoveSetting(*settings, EYtSettingType::StatColumns, ctx);
-        }
-
-        YQL_CLOG(INFO, ProviderYt) << "Stat missing for columns: " << JoinSeq(", ", requestedColumns) << ", rebuilding section";
-        TVector<TString> requestedColumnList(requestedColumns.begin(), requestedColumns.end());
-
-        inputSection = Build<TYtSection>(ctx, inputSection.Ref().Pos())
-            .InitFrom(inputSection)
-            .Settings(NYql::AddSettingAsColumnList(*settings, EYtSettingType::StatColumns, requestedColumnList, ctx))
-            .Done();
-    }
-    return status;
-}
-
 TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputSection, const TJoinLabels& labels,
     const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
     const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
@@ -4857,6 +4816,48 @@ EStarRewriteStatus RewriteYtEquiJoinStar(TYtEquiJoin equiJoin, TYtJoinNodeOp& op
 
 } // namespace
 
+IGraphTransformer::TStatus TryEstimateDataSizeChecked(TVector<ui64>& result, TYtSection& inputSection, const TString& cluster,
+    const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
+{
+    if (GetJoinCollectColumnarStatisticsMode(*state.Configuration) == EJoinCollectColumnarStatisticsMode::Sync) {
+        auto syncResult = EstimateDataSize(cluster, paths, columns, state, ctx);
+        if (!syncResult) {
+            return IGraphTransformer::TStatus::Error;
+        }
+        result = std::move(*syncResult);
+        return IGraphTransformer::TStatus::Ok;
+    }
+
+    TSet<TString> requestedColumns;
+    auto status = TryEstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx);
+    auto settings = inputSection.Settings().Ptr();
+    if (status == TStatus::Repeat) {
+        bool hasStatColumns = NYql::HasSetting(inputSection.Settings().Ref(), EYtSettingType::StatColumns);
+        if (hasStatColumns) {
+            auto oldColumns = NYql::GetSettingAsColumnList(*settings, EYtSettingType::StatColumns);
+            TSet<TString> oldColumnSet(oldColumns.begin(), oldColumns.end());
+
+            bool alreadyRequested = AllOf(requestedColumns, [&](const auto& c) {
+                return oldColumnSet.contains(c);
+            });
+
+            YQL_ENSURE(!alreadyRequested);
+
+            settings = NYql::RemoveSetting(*settings, EYtSettingType::StatColumns, ctx);
+        }
+
+        YQL_CLOG(INFO, ProviderYt) << "Stat missing for columns: " << JoinSeq(", ", requestedColumns) << ", rebuilding section";
+        TVector<TString> requestedColumnList(requestedColumns.begin(), requestedColumns.end());
+
+        inputSection = Build<TYtSection>(ctx, inputSection.Ref().Pos())
+            .InitFrom(inputSection)
+            .Settings(NYql::AddSettingAsColumnList(*settings, EYtSettingType::StatColumns, requestedColumnList, ctx))
+            .Done();
+    }
+    return status;
+}
+
+
 ui64 CalcInMemorySizeNoCrossJoin(const TJoinLabel& label, const TYtJoinNodeOp& op, const TMapJoinSettings& settings, bool isLeft, TExprContext& ctx, bool needPayload, ui64 size)
 {
     const auto& keys = *(isLeft ? op.LeftLabel : op.RightLabel);
@@ -5048,112 +5049,6 @@ TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx) {
     return root;
 }
 
-IGraphTransformer::TStatus CollectCboStatsLeaf(
-    const THashMap<TString, THashSet<TString>>& relJoinColumns,
-    const TString& cluster,
-    TYtJoinNodeLeaf& leaf,
-    const TYtState::TPtr& state,
-    TExprContext& ctx) {
-
-    const TMaybe<ui64> maxChunkCountExtendedStats = state->Configuration->ExtendedStatsMaxChunkCount.Get();
-    TVector<TYtPathInfo::TPtr> tables;
-    if (maxChunkCountExtendedStats) {
-        TVector<TString> requestedColumnList;
-        auto columnsPos = relJoinColumns.find(JoinLeafLabel(leaf.Label));
-        if (columnsPos != relJoinColumns.end()) {
-            requestedColumnList.assign(columnsPos->second.begin(), columnsPos->second.end());
-        }
-
-        THashSet<TString> memSizeColumns(requestedColumnList.begin(), requestedColumnList.end());
-        TVector<IYtGateway::TPathStatReq> pathStatReqs;
-
-        ui64 sectionChunkCount = 0;
-        for (auto path: leaf.Section.Paths()) {
-            auto pathInfo = MakeIntrusive<TYtPathInfo>(path);
-            tables.push_back(pathInfo);
-            sectionChunkCount += pathInfo->Table->Stat->ChunkCount;
-
-            if (pathInfo->HasColumns()) {
-                NYT::TRichYPath path;
-                pathInfo->FillRichYPath(path);
-                std::copy(path.Columns_->Parts_.begin(), path.Columns_->Parts_.end(), std::inserter(memSizeColumns, memSizeColumns.end()));
-            }
-
-            auto ytPath = BuildYtPathForStatRequest(cluster, *pathInfo, requestedColumnList, *state, ctx);
-
-            if (!ytPath) {
-                return IGraphTransformer::TStatus::Error;
-            }
-
-            pathStatReqs.push_back(
-                IYtGateway::TPathStatReq()
-                    .Path(*ytPath)
-                    .IsTemp(pathInfo->Table->IsTemp)
-                    .IsAnonymous(pathInfo->Table->IsAnonymous)
-                    .Epoch(pathInfo->Table->Epoch.GetOrElse(0)));
-        }
-
-        if (!pathStatReqs.empty() && (*maxChunkCountExtendedStats == 0 || sectionChunkCount <= *maxChunkCountExtendedStats)) {
-            IYtGateway::TPathStatOptions pathStatOptions =
-                IYtGateway::TPathStatOptions(state->SessionId)
-                    .Cluster(cluster)
-                    .Paths(pathStatReqs)
-                    .Config(state->Configuration->Snapshot())
-                    .Extended(true);
-
-            IYtGateway::TPathStatResult pathStats = state->Gateway->TryPathStat(std::move(pathStatOptions));
-
-            if (!pathStats.Success()) {
-                leaf.Section = Build<TYtSection>(ctx, leaf.Section.Ref().Pos())
-                    .InitFrom(leaf.Section)
-                    .Settings(NYql::AddSettingAsColumnList(leaf.Section.Settings().Ref(), EYtSettingType::StatColumns, requestedColumnList, ctx))
-                    .Done();
-                return TStatus::Repeat;
-            }
-        }
-    }
-
-    TVector<ui64> dataSize;
-    return TryEstimateDataSizeChecked(dataSize, leaf.Section, cluster, tables, {}, *state, ctx);
-}
-
-void AddJoinColumns(THashMap<TString, THashSet<TString>>& relJoinColumns, const TYtJoinNodeOp& op) {
-    for (ui32 i = 0; i < op.LeftLabel->ChildrenSize(); i += 2) {
-        auto ltable = op.LeftLabel->Child(i)->Content();
-        auto lcolumn = op.LeftLabel->Child(i + 1)->Content();
-        auto rtable = op.RightLabel->Child(i)->Content();
-        auto rcolumn = op.RightLabel->Child(i + 1)->Content();
-
-        relJoinColumns[TString(ltable)].insert(TString(lcolumn));
-        relJoinColumns[TString(rtable)].insert(TString(rcolumn));
-    }
-}
-
-IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
-    IGraphTransformer::TStatus result = TStatus::Ok;
-    TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Left.Get());
-    TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Right.Get());
-    AddJoinColumns(relJoinColumns, op);
-    if (leftLeaf) {
-        result = result.Combine(CollectCboStatsLeaf(relJoinColumns, cluster, *leftLeaf, state, ctx));
-    } else {
-        auto& leftOp = *dynamic_cast<TYtJoinNodeOp*>(op.Left.Get());
-        result = result.Combine(CollectCboStatsNode(relJoinColumns, cluster, leftOp, state, ctx));
-    }
-    if (rightLeaf) {
-        result = result.Combine(CollectCboStatsLeaf(relJoinColumns, cluster, *rightLeaf, state, ctx));
-    } else {
-        auto& rightOp = *dynamic_cast<TYtJoinNodeOp*>(op.Right.Get());
-        result = result.Combine(CollectCboStatsNode(relJoinColumns, cluster, rightOp, state, ctx));
-    }
-    return result;
-}
-
-IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
-    THashMap<TString, THashSet<TString>> relJoinColumns;
-    return CollectCboStatsNode(relJoinColumns, cluster, op, state, ctx);
-}
-
 IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
     switch (RewriteYtEquiJoinStar(equiJoin, op, state, ctx)) {
     case EStarRewriteStatus::Error:

+ 3 - 2
yt/yql/providers/yt/provider/yql_yt_join_impl.h

@@ -69,8 +69,6 @@ struct TOptimizerLinkSettings {
 
 TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx);
 
-IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
-
 IGraphTransformer::TStatus RewriteYtEquiJoinLeaves(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
 IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
 TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp& op, TExprContext& ctx, const TYtState::TPtr& state);
@@ -120,6 +118,9 @@ IGraphTransformer::TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode s
     TYtJoinNodeLeaf* leftLeaf, TYtJoinNodeLeaf* rightLeaf, const TYtState& state, bool isCross,
     TString cluster, TExprContext& ctx);
 
+IGraphTransformer::TStatus TryEstimateDataSizeChecked(TVector<ui64>& result, TYtSection& inputSection, const TString& cluster,
+    const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx);
+
 ui64 CalcInMemorySizeNoCrossJoin(const TJoinLabel& label, const TYtJoinNodeOp& op, const TMapJoinSettings& settings, bool isLeft,
     TExprContext& ctx, bool needPayload, ui64 size);
 

+ 2 - 140
yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp

@@ -1,4 +1,5 @@
 
+#include "yql_yt_cbo_helpers.h"
 #include "yql_yt_provider_context.h"
 #include "yql_yt_join_impl.h"
 #include "yql_yt_helpers.h"
@@ -250,7 +251,7 @@ private:
         }
         TRelSizeInfo leftSizeInfo;
         TRelSizeInfo rightSizeInfo;
-        ExtractMapJoinStats(leftSizeInfo, rightSizeInfo, op);
+        PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, State, Cluster, Ctx, op);
 
         auto left = ProcessNode(op->Left, leftSizeInfo);
         auto right = ProcessNode(op->Right, rightSizeInfo);
@@ -333,145 +334,6 @@ private:
         return std::make_shared<TYtRelOptimizerNode>(std::move(label), std::move(*stat), leaf);
     }
 
-    void ExtractInMemorySize(
-        TMaybe<ui64>& leftMemorySize,
-        TMaybe<ui64>& rightMemorySize,
-        ESizeStatCollectMode mode,
-        TYtJoinNodeOp* op,
-        const TJoinLabels& labels,
-        int numLeaves,
-        TYtJoinNodeLeaf* leftLeaf,
-        bool leftTablesReady,
-        const TVector<TYtPathInfo::TPtr>& leftTables,
-        const THashSet<TString>& leftJoinKeys,
-        const TStructExprType* leftItemType,
-        TYtJoinNodeLeaf* rightLeaf,
-        bool rightTablesReady,
-        const TVector<TYtPathInfo::TPtr>& rightTables,
-        const THashSet<TString>& rightJoinKeys,
-        const TStructExprType* rightItemType)
-    {
-        TMapJoinSettings mapSettings;
-        TJoinSideStats leftStats;
-        TJoinSideStats rightStats;
-        bool isCross = false;
-        auto status = CollectStatsAndMapJoinSettings(mode, mapSettings, leftStats, rightStats,
-                                                     leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
-                                                     leftLeaf, rightLeaf, *State, isCross, Cluster, Ctx);
-        if (status != IGraphTransformer::TStatus::Ok) {
-            YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status;
-            return;
-        }
-        if (leftLeaf) {
-            const bool needPayload = op->JoinKind->IsAtom("Inner") || op->JoinKind->IsAtom("Right");
-            const auto& label = labels.Inputs[0];
-            TVector<TString> leftJoinKeyList(leftJoinKeys.begin(), leftJoinKeys.end());
-            TYtSection inputSection{leftLeaf->Section};
-            const ui64 rows = mapSettings.LeftRows;
-            ui64 size = 0;
-            auto status = CalculateJoinLeafSize(size, mapSettings, inputSection, *op, Ctx, true, leftItemType, leftJoinKeyList, State, Cluster, leftTables);
-            if (status != IGraphTransformer::TStatus::Ok) {
-                YQL_CLOG(WARN, ProviderYt) << "Unable to calculate join leaf size: " << status;
-                return;
-            }
-            if (op->JoinKind->IsAtom("Cross")) {
-                leftMemorySize = size + rows * (1ULL + label.InputType->GetSize()) * sizeof(NKikimr::NUdf::TUnboxedValuePod);
-            } else {
-                leftMemorySize = CalcInMemorySizeNoCrossJoin(
-                    label, *op, mapSettings, true, Ctx, needPayload, size);
-            }
-        }
-
-        if (rightLeaf) {
-            const bool needPayload = op->JoinKind->IsAtom("Inner") || op->JoinKind->IsAtom("Left");
-            const auto& label = labels.Inputs[numLeaves - 1];
-            TVector<TString> rightJoinKeyList(rightJoinKeys.begin(), rightJoinKeys.end());
-            TYtSection inputSection{rightLeaf->Section};
-            const ui64 rows = mapSettings.RightRows;
-            ui64 size = 0;
-
-            auto status = CalculateJoinLeafSize(size, mapSettings, inputSection, *op, Ctx, false, rightItemType, rightJoinKeyList, State, Cluster, rightTables);
-            if (status != IGraphTransformer::TStatus::Ok) {
-                YQL_CLOG(WARN, ProviderYt) << "Unable to calculate join leaf size: " << status;
-                return;
-            }
-            if (op->JoinKind->IsAtom("Cross")) {
-                rightMemorySize = size + rows * (1ULL + label.InputType->GetSize()) * sizeof(NKikimr::NUdf::TUnboxedValuePod);
-            } else {
-                rightMemorySize = CalcInMemorySizeNoCrossJoin(
-                    label, *op, mapSettings, false, Ctx, needPayload, size);
-            }
-        }
-    }
-
-    void ExtractMapJoinStats(TRelSizeInfo& leftSizeInfo, TRelSizeInfo& rightSizeInfo, TYtJoinNodeOp* op) {
-        auto mapJoinUseFlow = State->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW);
-        if (!mapJoinUseFlow) {
-            // Only support flow map joins in CBO.
-            return;
-        }
-
-        TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op->Left.Get());
-        TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op->Right.Get());
-
-        bool leftTablesReady = false;
-        TVector<TYtPathInfo::TPtr> leftTables;
-        bool rightTablesReady = false;
-        TVector<TYtPathInfo::TPtr> rightTables;
-        THashSet<TString> leftJoinKeys, rightJoinKeys;
-        int numLeaves = 0;
-        TJoinLabels labels;
-        const TStructExprType* leftItemType = nullptr;
-        const TStructExprType* leftItemTypeBeforePremap = nullptr;
-        const TStructExprType* rightItemType = nullptr;
-        const TStructExprType* rightItemTypeBeforePremap = nullptr;
-
-        {
-            if (leftLeaf) {
-                TYtSection section{leftLeaf->Section};
-                if (Y_UNLIKELY(!section.Settings().Empty() && section.Settings().Item(0).Name() == "Test")) {
-                    return;
-                }
-
-                auto status = CollectPathsAndLabelsReady(leftTablesReady, leftTables, labels, leftItemType, leftItemTypeBeforePremap, *leftLeaf, Ctx);
-                if (status != IGraphTransformer::TStatus::Ok) {
-                    YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status;
-                    return;
-                }
-                if (!labels.Inputs.empty()) {
-                    leftJoinKeys = BuildJoinKeys(labels.Inputs[0], *op->LeftLabel);
-                }
-                ++numLeaves;
-            }
-            if (rightLeaf) {
-                TYtSection section{rightLeaf->Section};
-                if (Y_UNLIKELY(!section.Settings().Empty() && section.Settings().Item(0).Name() == "Test")) {
-                    return;
-                }
-                auto status = CollectPathsAndLabelsReady(rightTablesReady, rightTables, labels, rightItemType, rightItemTypeBeforePremap, *rightLeaf, Ctx);
-                if (status != IGraphTransformer::TStatus::Ok) {
-                    YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status;
-                    return;
-                }
-                if (labels.Inputs.size() > 1) {
-                    rightJoinKeys = BuildJoinKeys(labels.Inputs[1], *op->RightLabel);
-                }
-                ++numLeaves;
-            }
-        }
-
-        if (numLeaves == 0) {
-            return;
-        }
-
-        ExtractInMemorySize(leftSizeInfo.MapJoinMemSize, rightSizeInfo.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels,
-            numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType,
-            rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType);
-        ExtractInMemorySize(leftSizeInfo.LookupJoinMemSize, rightSizeInfo.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels,
-            numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType,
-            rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType);
-    }
-
     TVector<TMaybe<IYtGateway::TPathStatResult::TExtendedResult>> GetStatsFromCache(
         TYtJoinNodeLeaf* nodeLeaf, const TVector<TString>& columns, ui64 maxChunkCount) {
         TVector<IYtGateway::TPathStatReq> pathStatReqs;