Просмотр исходного кода

YQL-19424: Use WideStream instead of WideFlow in WideFromBlocks computation node
commit_hash:21c84a9004cc57883d949b8fc637fc3ae7bfbda9

imunkin 1 месяц назад
Родитель
Сommit
5bf3fd8cf1

+ 147 - 120
yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp

@@ -128,21 +128,26 @@ TExprNode::TPtr RebuildArgumentsOnlyLambdaForBlocks(const TExprNode& lambda, TEx
 
 TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
     Y_UNUSED(types);
-
-    // Static assert to ensure backward compatible change: if the
-    // constant below is true, both input and output types of
-    // WideFromBlocks callable have to be WideStream; otherwise,
-    // both input and output types have to be WideFlow.
-    // FIXME: When all spots using WideFromBlocks are adjusted
-    // to work with WideStream, drop the assertion below.
-    static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
-    if (node->Head().IsCallable("WideFromBlocks")) {
-        YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content();
-        return ctx.NewCallable(node->Pos(), "ReplicateScalars", { node->Head().HeadPtr() });
-    }
-
-    if (const auto& input = node->Head(); input.IsCallable({"Extend", "OrderedExtend"})) {
+    const auto& input = node->Head();
+    if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
+        const auto& wideFromBlocks = input.Head();
+        // Technically, the code below rewrites the following sequence
+        // (WideToBlocks (ToFlow (WideFromBlocks (<input>)))))
+        // into (ReplicateScalars (<input>)), but ToFlow/FromFlow
+        // wrappers will be removed when all other nodes in block
+        // pipeline start using WideStream instead of the WideFlow.
+        // Hence, the logging is left intact.
+        YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideFromBlocks.Content();
+        // If tail is FromFlow, its input is WideFlow and can be
+        // used intact; Otherwise the input is WideStream, so the
+        // new input should be converted to WideFlow.
+        const auto tail = wideFromBlocks.HeadPtr();
+        const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
+                             : ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
+        return ctx.NewCallable(node->Pos(), "ReplicateScalars", { flowInput });
+    }
+
+    if (input.IsCallable({"Extend", "OrderedExtend"})) {
         YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << input.Content();
         TExprNodeList newChildren;
         newChildren.reserve(input.ChildrenSize());
@@ -157,23 +162,35 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext&
 
 TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
     Y_UNUSED(types);
-
-    // Static assert to ensure backward compatible change: if the
-    // constant below is true, both input and output types of
-    // WideFromBlocks callable have to be WideStream; otherwise,
-    // both input and output types have to be WideFlow.
-    // FIXME: When all spots using WideFromBlocks are adjusted
-    // to work with WideStream, drop the assertion below.
-    static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
-    if (node->Head().IsCallable("WideToBlocks")) {
-        YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content();
-        return node->Head().HeadPtr();
-    }
-
-    if (node->Head().IsCallable("ReplicateScalars")) {
-        YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content();
-        return ctx.ChangeChild(*node, 0, node->Head().HeadPtr());
+    const auto& input = node->Head();
+    if (input.IsCallable("FromFlow") && input.Head().IsCallable("WideToBlocks")) {
+        const auto& wideToBlocks = input.Head();
+        // Technically, the code below rewrites the following sequence
+        // (WideFromBlocks (FromFlow (WideToBlocks (<input>))))
+        // into (FromFlow (<input>)) (to match the ToFlow parent),
+        // but ToFlow/FromFlow wrappers will be removed when all
+        // other nodes in block pipeline start using WideStream
+        // instead of the WideFlow. Hence, the logging is left intact.
+        YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideToBlocks.Content();
+        return ctx.NewCallable(node->Pos(), "FromFlow", {wideToBlocks.HeadPtr()});
+    }
+
+    if (input.IsCallable("FromFlow") && input.Head().IsCallable("ReplicateScalars")) {
+        const auto& replicateScalars = input.Head();
+        // Technically, the code below rewrites the following sequence
+        // (WideFromBlocks (FromFlow (ReplicateScalars (<input>))))
+        // into (WideFromBlocks (FromFlow (<input>))), but ToFlow/FromFlow
+        // wrappers will be removed when all other nodes in block
+        // pipeline start using WideStream instead of the WideFlow.
+        // Hence, the logging is left intact.
+        YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << replicateScalars.Content() << " as input of " << node->Content();
+        return ctx.Builder(node->Pos())
+            .Callable(node->Content())
+                .Callable(0, input.Content())
+                    .Add(0, replicateScalars.HeadPtr())
+                .Seal()
+            .Seal()
+            .Build();
     }
 
     return node;
@@ -6101,34 +6118,41 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext&
     Y_ENSURE(node->IsCallable("WideMap"));
     const auto lambda = node->TailPtr();
     // Swap trivial WideMap and WideFromBlocks.
-    if (node->Head().IsCallable("WideFromBlocks")) {
+    const auto& input = node->Head();
+    if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
         if (auto newLambda = RebuildArgumentsOnlyLambdaForBlocks(*lambda, ctx, types)) {
-            YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Head().Content() << " with " << node->Content();
-
-            // Static assert to ensure backward compatible change: if the
-            // constant below is true, both input and output types of
-            // WideFromBlocks callable have to be WideStream; otherwise,
-            // both input and output types have to be WideFlow.
-            // FIXME: When all spots using WideFromBlocks are adjusted
-            // to work with WideStream, drop the assertion below.
-            static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
+            const auto& wideFromBlocks = input.Head();
+            // Technically, the code below rewrites the following sequence
+            // (WideMap (ToFlow (WideFromBlocks (FromFlow (<input>)))))
+            // into (ToFlow (WideFromBlocks (FromFlow (WideMap (<input>)).
+            // Hence, the logging is left intact.
+            YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << wideFromBlocks.Content() << " with " << node->Content();
+            // If tail is FromFlow, its input is WideFlow and can be
+            // used intact; Otherwise the input is WideStream, so the
+            // new input should be converted to WideFlow.
+            const auto tail = wideFromBlocks.HeadPtr();
+            const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
+                                 : ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
             return ctx.Builder(node->Pos())
-                .Callable("WideFromBlocks")
-                    .Callable(0, "WideMap")
-                        .Add(0, node->Head().HeadPtr())
-                        .Add(1, newLambda)
+                .Callable("ToFlow")
+                    .Callable(0, "WideFromBlocks")
+                        .Callable(0, "FromFlow")
+                            .Callable(0, "WideMap")
+                                .Add(0, flowInput)
+                                .Add(1, newLambda)
+                            .Seal()
+                        .Seal()
                     .Seal()
                 .Seal()
                 .Build();
         }
     }
 
-    if (!CanRewriteToBlocksWithInput(node->Head(), types)) {
+    if (!CanRewriteToBlocksWithInput(input, types)) {
         return node;
     }
 
-    auto multiInputType = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
+    auto multiInputType = input.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
     ui32 newNodes;
     TNodeMap<size_t> rewritePositions;
     TExprNode::TPtr blockLambda;
@@ -6144,22 +6168,17 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext&
 
     YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes
                                   << ", extra columns: " << rewritePositions.size();
-
-    // Static assert to ensure backward compatible change: if the
-    // constant below is true, both input and output types of
-    // WideFromBlocks callable have to be WideStream; otherwise,
-    // both input and output types have to be WideFlow.
-    // FIXME: When all spots using WideFromBlocks are adjusted
-    // to work with WideStream, drop the assertion below.
-    static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
     auto ret = ctx.Builder(node->Pos())
-        .Callable("WideFromBlocks")
-            .Callable(0, "WideMap")
-                .Callable(0, "WideToBlocks")
-                    .Add(0, node->HeadPtr())
+        .Callable("ToFlow")
+            .Callable(0, "WideFromBlocks")
+                .Callable(0, "FromFlow")
+                    .Callable(0, "WideMap")
+                        .Callable(0, "WideToBlocks")
+                            .Add(0, node->HeadPtr())
+                        .Seal()
+                        .Add(1, blockLambda)
+                    .Seal()
                 .Seal()
-                .Add(1, blockLambda)
             .Seal()
         .Seal()
         .Build();
@@ -6220,18 +6239,13 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
 
         YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes
                                       << ", extra columns: " << rewritePositions.size();
-
-        // Static assert to ensure backward compatible change: if the
-        // constant below is true, both input and output types of
-        // WideFromBlocks callable have to be WideStream; otherwise,
-        // both input and output types have to be WideFlow.
-        // FIXME: When all spots using WideFromBlocks are adjusted
-        // to work with WideStream, drop the assertion below.
-        static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
         return ctx.Builder(node->Pos())
-            .Callable("WideFromBlocks")
-                .Add(0, result)
+            .Callable("ToFlow")
+                .Callable(0, "WideFromBlocks")
+                    .Callable(0, "FromFlow")
+                        .Add(0, result)
+                    .Seal()
+                .Seal()
             .Seal()
             .Build();
     }
@@ -6239,19 +6253,14 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
     if (!newNodes) {
         return node;
     }
-
-    // Static assert to ensure backward compatible change: if the
-    // constant below is true, both input and output types of
-    // WideFromBlocks callable have to be WideStream; otherwise,
-    // both input and output types have to be WideFlow.
-    // FIXME: When all spots using WideFromBlocks are adjusted
-    // to work with WideStream, drop the assertion below.
-    static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
     auto filtered = ctx.Builder(node->Pos())
         .Callable("WideFilter")
-            .Callable(0, "WideFromBlocks")
-                .Add(0, blockMapped)
+            .Callable(0, "ToFlow")
+                .Callable(0, "WideFromBlocks")
+                    .Callable(0, "FromFlow")
+                        .Add(0, blockMapped)
+                    .Seal()
+                .Seal()
             .Seal()
             .Add(1, restLambda)
         .Seal()
@@ -6320,22 +6329,17 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
 
     TStringBuf newName = node->Content() == "Skip" ? "WideSkipBlocks" : "WideTakeBlocks";
     YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName;
-
-    // Static assert to ensure backward compatible change: if the
-    // constant below is true, both input and output types of
-    // WideFromBlocks callable have to be WideStream; otherwise,
-    // both input and output types have to be WideFlow.
-    // FIXME: When all spots using WideFromBlocks are adjusted
-    // to work with WideStream, drop the assertion below.
-    static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
     return ctx.Builder(node->Pos())
-        .Callable("WideFromBlocks")
-            .Callable(0, newName)
-                .Callable(0, "WideToBlocks")
-                    .Add(0, node->HeadPtr())
+        .Callable("ToFlow")
+            .Callable(0, "WideFromBlocks")
+                .Callable(0, "FromFlow")
+                    .Callable(0, newName)
+                        .Callable(0, "WideToBlocks")
+                            .Add(0, node->HeadPtr())
+                        .Seal()
+                        .Add(1, node->ChildPtr(1))
+                    .Seal()
                 .Seal()
-                .Add(1, node->ChildPtr(1))
             .Seal()
         .Seal()
         .Build();
@@ -6375,18 +6379,13 @@ TExprNode::TPtr OptimizeTopOrSortBlocks(const TExprNode::TPtr& node, TExprContex
     YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName;
     auto children = node->ChildrenList();
     children[0] = ctx.NewCallable(node->Pos(), "WideToBlocks", { children[0] });
-
-    // Static assert to ensure backward compatible change: if the
-    // constant below is true, both input and output types of
-    // WideFromBlocks callable have to be WideStream; otherwise,
-    // both input and output types have to be WideFlow.
-    // FIXME: When all spots using WideFromBlocks are adjusted
-    // to work with WideStream, drop the assertion below.
-    static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
     return ctx.Builder(node->Pos())
-        .Callable("WideFromBlocks")
-            .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children)))
+        .Callable("ToFlow")
+            .Callable(0, "WideFromBlocks")
+                .Callable(0, "FromFlow")
+                    .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children)))
+                .Seal()
+            .Seal()
         .Seal()
         .Build();
 }
@@ -6606,17 +6605,45 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx)
                         .Add(1, DropUnusedArgs(node->Tail(), actualUnused, ctx))
                     .Seal().Build();
             }
-        } else if (input.IsCallable({"WideFromBlocks", "WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) {
+        } else if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
+            const auto& wideFromBlocks = input.Head();
+            // WideFromBlocks uses WideStream instead of WideFlow,
+            // so it's wrapped with ToFlow/FromFlow. Hence, to drop
+            // unused fields for particular WideFromBlocks node,
+            // the optimizer has to rewrite FromFlow child, but
+            // logging is left intact.
+            YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << wideFromBlocks.Content() << " with " << unused.size() << " unused fields.";
+            const auto tail = wideFromBlocks.HeadPtr();
+            const auto width = tail->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize();
+            const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
+                                 : ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
+            return ctx.Builder(node->Pos())
+                .Callable(node->Content())
+                    .Callable(0, "ToFlow")
+                        .Callable(0, "WideFromBlocks")
+                            .Callable(0, "FromFlow")
+                                .Callable(0, "WideMap")
+                                    .Add(0, flowInput)
+                                    .Lambda(1)
+                                        .Params("items", width)
+                                        .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+                                            for (auto i = 0U, j = 0U; i < width; ++i) {
+                                                if (unused.cend() == std::find(unused.cbegin(), unused.cend(), i)) {
+                                                    parent.Arg(j++, "items", i);
+                                                }
+                                            }
+                                            return parent;
+                                        })
+                                    .Seal()
+                                .Seal()
+                            .Seal()
+                        .Seal()
+                    .Seal()
+                    .Add(1, DropUnusedArgs(node->Tail(), unused, ctx))
+                .Seal()
+                .Build();
+        } else if (input.IsCallable({"WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) {
             YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << input.Content() << " with " << unused.size() << " unused fields.";
-
-            // Static assert to ensure backward compatible change: if the
-            // constant below is true, both input and output types of
-            // WideFromBlocks callable have to be WideStream; otherwise,
-            // both input and output types have to be WideFlow.
-            // FIXME: When all spots using WideFromBlocks are adjusted
-            // to work with WideStream, drop the assertion below.
-            static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
             return ctx.Builder(node->Pos())
                 .Callable(node->Content())
                     .Add(0, ctx.ChangeChild(input, 0U, MakeWideMapForDropUnused(input.HeadPtr(), unused, ctx)))

+ 2 - 11
yql/essentials/core/type_ann/type_ann_blocks.cpp

@@ -966,28 +966,19 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
 
 IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
     Y_UNUSED(output);
-
-    // Static assert to ensure backward compatible change: if the
-    // constant below is true, both input and output types of
-    // WideFromBlocks callable have to be WideStream; otherwise,
-    // both input and output types have to be WideFlow.
-    // FIXME: When all spots using WideFromBlocks are adjusted
-    // to work with WideStream, drop the assertion below.
-    static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
     if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
         return IGraphTransformer::TStatus::Error;
     }
 
     TTypeAnnotationNode::TListType retMultiType;
-    if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) {
+    if (!EnsureWideStreamBlockType(input->Head(), retMultiType, ctx.Expr)) {
         return IGraphTransformer::TStatus::Error;
     }
 
     YQL_ENSURE(!retMultiType.empty());
     retMultiType.pop_back();
     auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
-    input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+    input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
     return IGraphTransformer::TStatus::Ok;
 }
 

+ 11 - 20
yql/essentials/core/yql_aggregate_expander.cpp

@@ -708,18 +708,9 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
 
     TExprNode::TPtr aggWideFlow;
     if (hashed) {
-
-        // Static assert to ensure backward compatible change: if the
-        // constant below is true, both input and output types of
-        // WideFromBlocks callable have to be WideStream; otherwise,
-        // both input and output types have to be WideFlow.
-        // FIXME: When all spots using WideFromBlocks are adjusted
-        // to work with WideStream, drop the assertion below.
-        static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
         aggWideFlow = Ctx.Builder(Node->Pos())
-            .Callable("WideFromBlocks")
-                .Callable(0, "ToFlow")
+            .Callable("ToFlow")
+                .Callable(0, "WideFromBlocks")
                     .Callable(0, "BlockCombineHashed")
                         .Callable(0, "FromFlow")
                             .Add(0, blocks)
@@ -2939,15 +2930,15 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
             .Build();
     }
 
-    // Static assert to ensure backward compatible change: if the
-    // constant below is true, both input and output types of
-    // WideFromBlocks callable have to be WideStream; otherwise,
-    // both input and output types have to be WideFlow.
-    // FIXME: When all spots using WideFromBlocks are adjusted
-    // to work with WideStream, drop the assertion below.
-    static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
-    auto aggWideFlow = Ctx.NewCallable(Node->Pos(), "WideFromBlocks", { aggBlocks });
+    auto aggWideFlow = Ctx.Builder(Node->Pos())
+        .Callable("ToFlow")
+            .Callable(0, "WideFromBlocks")
+                .Callable(0, "FromFlow")
+                    .Add(0, aggBlocks)
+                .Seal()
+            .Seal()
+        .Seal()
+        .Build();
     auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx);
     auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow });
     auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { streamArg }), std::move(root));

+ 1 - 1
yql/essentials/core/yql_expr_type_annotation.h

@@ -355,7 +355,7 @@ TStringBuf NormalizeCallableName(TStringBuf name);
 void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx);
 
 namespace NBlockStreamIO {
-    constexpr bool WideFromBlocks = false;
+    constexpr bool WideFromBlocks = true;
 } // namespace NBlockStreamIO
 
 }

+ 0 - 12
yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp

@@ -442,18 +442,6 @@ size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) {
     return GetSparseBitmapPopCount(src, len);
 }
 
-TArrayRef<TType *const> GetWideComponents(TType* type) {
-    if (type->IsFlow()) {
-        const auto outputFlowType = AS_TYPE(TFlowType, type);
-        return GetWideComponents(outputFlowType);
-    }
-    if (type->IsStream()) {
-        const auto outputStreamType = AS_TYPE(TStreamType, type);
-        return GetWideComponents(outputStreamType);
-    }
-    MKQL_ENSURE(false, "Expect either flow or stream");
-}
-
 size_t CalcMaxBlockLenForOutput(TType* out) {
     const auto wideComponents = GetWideComponents(out);
     MKQL_ENSURE(wideComponents.size() > 0, "Expecting at least one output column");

+ 132 - 50
yql/essentials/minikql/comp_nodes/mkql_blocks.cpp

@@ -517,10 +517,54 @@ private:
     TType* ItemType_;
 };
 
-class TWideFromBlocksWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper> {
-using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper>;
+struct TWideFromBlocksState : public TComputationValue<TWideFromBlocksState> {
+    size_t Count_ = 0;
+    size_t Index_ = 0;
+    size_t Current_ = 0;
+    NUdf::TUnboxedValue* Pointer_ = nullptr;
+    TUnboxedValueVector Values_;
+    std::vector<std::unique_ptr<IBlockReader>> Readers_;
+    std::vector<std::unique_ptr<IBlockItemConverter>> Converters_;
+    const std::vector<arrow::ValueDescr> ValuesDescr_;
+
+    TWideFromBlocksState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types)
+        : TComputationValue(memInfo)
+        , Values_(types.size() + 1)
+        , ValuesDescr_(ToValueDescr(types))
+    {
+        Pointer_ = Values_.data();
+
+        const auto& pgBuilder = ctx.Builder->GetPgBuilder();
+        for (size_t i = 0; i < types.size(); ++i) {
+            const TType* blockItemType = AS_TYPE(TBlockType, types[i])->GetItemType();
+            Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
+            Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
+        }
+    }
+
+    void ClearValues() {
+        Values_.assign(Values_.size(), NUdf::TUnboxedValuePod());
+    }
+
+    NUdf::TUnboxedValuePod Get(const THolderFactory& holderFactory, size_t idx) const {
+        TBlockItem item;
+        const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum();
+        ARROW_DEBUG_CHECK_DATUM_TYPES(ValuesDescr_[idx], datum.descr());
+        if (datum.is_scalar()) {
+            item = Readers_[idx]->GetScalarItem(*datum.scalar());
+        } else {
+            MKQL_ENSURE(datum.is_array(), "Expecting array");
+            item = Readers_[idx]->GetItem(*datum.array(), Current_);
+        }
+        return Converters_[idx]->MakeValue(item, holderFactory);
+    }
+};
+
+class TWideFromBlocksFlowWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFromBlocksFlowWrapper> {
+using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksFlowWrapper>;
+using TState = TWideFromBlocksState;
 public:
-    TWideFromBlocksWrapper(TComputationMutables& mutables,
+    TWideFromBlocksFlowWrapper(TComputationMutables& mutables,
         IComputationWideFlowNode* flow,
         TVector<TType*>&& types)
         : TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
@@ -589,7 +633,7 @@ public:
 
         const auto ptrType = PointerType::getUnqual(StructType::get(context));
         const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
-        const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideFromBlocksWrapper::MakeState));
+        const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideFromBlocksFlowWrapper::MakeState));
         const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
         const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
         CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
@@ -690,48 +734,6 @@ public:
     }
 #endif
 private:
-    struct TState : public TComputationValue<TState> {
-        size_t Count_ = 0;
-        size_t Index_ = 0;
-        size_t Current_ = 0;
-        NUdf::TUnboxedValue* Pointer_ = nullptr;
-        TUnboxedValueVector Values_;
-        std::vector<std::unique_ptr<IBlockReader>> Readers_;
-        std::vector<std::unique_ptr<IBlockItemConverter>> Converters_;
-        const std::vector<arrow::ValueDescr> ValuesDescr_;
-
-        TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types)
-            : TComputationValue(memInfo)
-            , Values_(types.size() + 1)
-            , ValuesDescr_(ToValueDescr(types))
-        {
-            Pointer_ = Values_.data();
-
-            const auto& pgBuilder = ctx.Builder->GetPgBuilder();
-            for (size_t i = 0; i < types.size(); ++i) {
-                const TType* blockItemType = AS_TYPE(TBlockType, types[i])->GetItemType();
-                Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
-                Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
-            }
-        }
-
-        void ClearValues() {
-            Values_.assign(Values_.size(), NUdf::TUnboxedValuePod());
-        }
-
-        NUdf::TUnboxedValuePod Get(const THolderFactory& holderFactory, size_t idx) const {
-            TBlockItem item;
-            const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum();
-            ARROW_DEBUG_CHECK_DATUM_TYPES(ValuesDescr_[idx], datum.descr());
-            if (datum.is_scalar()) {
-                item = Readers_[idx]->GetScalarItem(*datum.scalar());
-            } else {
-                MKQL_ENSURE(datum.is_array(), "Expecting array");
-                item = Readers_[idx]->GetItem(*datum.array(), Current_);
-            }
-            return Converters_[idx]->MakeValue(item, holderFactory);
-        }
-    };
 #ifndef MKQL_DISABLE_CODEGEN
     class TLLVMFieldsStructureState: public TLLVMFieldsStructure<TComputationValue<TState>> {
     private:
@@ -804,6 +806,74 @@ private:
     const size_t WideFieldsIndex_;
 };
 
+class TWideFromBlocksStreamWrapper : public TMutableComputationNode<TWideFromBlocksStreamWrapper>
+{
+using TBaseComputation = TMutableComputationNode<TWideFromBlocksStreamWrapper>;
+using TState = TWideFromBlocksState;
+public:
+    TWideFromBlocksStreamWrapper(TComputationMutables& mutables,
+        IComputationNode* stream,
+        TVector<TType*>&& types)
+        : TBaseComputation(mutables, EValueRepresentation::Boxed)
+        , Stream_(stream)
+        , Types_(std::move(types))
+    {}
+
+    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const
+    {
+        const auto state = ctx.HolderFactory.Create<TState>(ctx, Types_);
+        return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
+                                                      std::move(state),
+                                                      std::move(Stream_->GetValue(ctx)));
+    }
+
+private:
+    class TStreamValue : public TComputationValue<TStreamValue> {
+    using TBase = TComputationValue<TStreamValue>;
+    public:
+        TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory,
+                     NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream)
+            : TBase(memInfo)
+            , BlockState_(blockState)
+            , Stream_(stream)
+            , HolderFactory_(holderFactory)
+        {}
+
+    private:
+        NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
+            auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get());
+            auto* inputFields = blockState.Pointer_;
+            const size_t inputWidth = blockState.Values_.size();
+
+            if (blockState.Index_ == blockState.Count_) do {
+                if (const auto result = Stream_.WideFetch(inputFields, inputWidth); result != NUdf::EFetchStatus::Ok)
+                    return result;
+
+                blockState.Index_ = 0;
+                blockState.Count_ = GetBlockCount(blockState.Values_.back());
+            } while (!blockState.Count_);
+
+            blockState.Current_ = blockState.Index_++;
+            for (size_t i = 0; i < width; i++) {
+                output[i] = blockState.Get(HolderFactory_, i);
+            }
+
+            return NUdf::EFetchStatus::Ok;
+        }
+
+        NUdf::TUnboxedValue BlockState_;
+        NUdf::TUnboxedValue Stream_;
+        const THolderFactory& HolderFactory_;
+    };
+
+    void RegisterDependencies() const final {
+        this->DependsOn(Stream_);
+    }
+
+    IComputationNode* const Stream_;
+    const TVector<TType*> Types_;
+};
+
 class TPrecomputedArrowNode : public IArrowKernelComputationNode {
 public:
     TPrecomputedArrowNode(const arrow::Datum& datum, TStringBuf kernelName)
@@ -1209,17 +1279,29 @@ IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFact
 IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
     MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
 
-    const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
-    const auto wideComponents = GetWideComponents(flowType);
+    const auto inputType = callable.GetInput(0).GetStaticType();
+    MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(),
+                "Expected either WideStream or WideFlow as an input");
+    const auto yieldsStream = callable.GetType()->GetReturnType()->IsStream();
+    MKQL_ENSURE(yieldsStream == inputType->IsStream(),
+                "Expected both input and output have to be either WideStream or WideFlow");
+
+    const auto wideComponents = GetWideComponents(inputType);
     MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column");
     TVector<TType*> items;
     for (ui32 i = 0; i < wideComponents.size() - 1; ++i) {
         items.push_back(AS_TYPE(TBlockType, wideComponents[i]));
     }
 
-    const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+    const auto wideFlowOrStream = LocateNode(ctx.NodeLocator, callable, 0);
+    if (yieldsStream) {
+        const auto wideStream = wideFlowOrStream;
+        return new TWideFromBlocksStreamWrapper(ctx.Mutables, wideStream, std::move(items));
+    }
+    // FIXME: Drop the branch below, when the time comes.
+    const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(wideFlowOrStream);
     MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
-    return new TWideFromBlocksWrapper(ctx.Mutables, wideFlow, std::move(items));
+    return new TWideFromBlocksFlowWrapper(ctx.Mutables, wideFlow, std::move(items));
 }
 
 IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx) {

+ 3 - 2
yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp

@@ -86,7 +86,7 @@ void DoNestedTuplesCompressTest() {
     node = pb.BlockExpandChunked(node);
     node = pb.WideSkipBlocks(node, pb.template NewDataLiteral<ui64>(19));
     node = pb.BlockCompress(node, 2);
-    node = pb.WideFromBlocks(node);
+    node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node)));
 
     node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode {
         return pb.NewTuple(resultTupleType, {items[0], items[1]});
@@ -186,7 +186,8 @@ Y_UNIT_TEST_LLVM(CompressBasic) {
     const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
         return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)};
     });
-    const auto compressedFlow = pb.WideFromBlocks(pb.BlockCompress(pb.WideToBlocks(wideFlow), 0));
+    const auto compressedBlocks = pb.BlockCompress(pb.WideToBlocks(wideFlow), 0);
+    const auto compressedFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(compressedBlocks)));
     const auto narrowFlow = pb.NarrowMap(compressedFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
         return pb.NewTuple({items[0], items[1]});
     });

+ 1 - 1
yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp

@@ -66,7 +66,7 @@ void DoBlockExistsOffset(size_t length, size_t offset) {
             items[4],
         };
     });
-    node = pb.WideFromBlocks(node);
+    node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node)));
     node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode {
         return pb.NewTuple(outputTupleType, {items[0], items[1], items[2], items[3]});
     });

+ 3 - 3
yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp

@@ -131,7 +131,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideTakeSkipBlocks) {
         const auto flow = MakeFlow(setup);
 
         const auto part = pb.WideSkipBlocks(flow, pb.NewDataLiteral<ui64>(7));
-        const auto plain = pb.WideFromBlocks(part);
+        const auto plain = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(part)));
 
         const auto singleValueFlow = pb.NarrowMap(plain, [&](TRuntimeNode::TList items) -> TRuntimeNode {
             return pb.Add(items[0], items[1]);
@@ -163,7 +163,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideTakeSkipBlocks) {
         const auto flow = MakeFlow(setup);
 
         const auto part = pb.WideTakeBlocks(flow, pb.NewDataLiteral<ui64>(4));
-        const auto plain = pb.WideFromBlocks(part);
+        const auto plain = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(part)));
 
         const auto singleValueFlow = pb.NarrowMap(plain, [&](TRuntimeNode::TList items) -> TRuntimeNode {
             return pb.Add(items[0], items[1]);
@@ -198,7 +198,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideTakeSkipBlocks) {
         const auto flow = MakeFlow(setup);
 
         const auto part = pb.WideTakeBlocks(pb.WideSkipBlocks(flow, pb.NewDataLiteral<ui64>(3)), pb.NewDataLiteral<ui64>(5));
-        const auto plain = pb.WideFromBlocks(part);
+        const auto plain = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(part)));
 
         const auto singleValueFlow = pb.NarrowMap(plain, [&](TRuntimeNode::TList items) -> TRuntimeNode {
             // 0,  0;

+ 36 - 18
yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp

@@ -48,9 +48,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
 
         const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
 
-        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+        const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
             [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
-            pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}})),
+            pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}});
+        const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks)));
+        const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow,
             [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
         ));
 
@@ -111,9 +113,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
 
         const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
 
-        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+        const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
             [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
-            pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}})),
+            pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}});
+        const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks)));
+        const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow,
             [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
         ));
 
@@ -180,9 +184,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
 
         const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
 
-        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+        const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
             [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
-            pb.NewDataLiteral<ui64>(3ULL), {{1U, pb.NewDataLiteral<bool>(true)}})),
+            pb.NewDataLiteral<ui64>(3ULL), {{1U, pb.NewDataLiteral<bool>(true)}});
+        const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks)));
+        const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow,
             [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
         ));
 
@@ -240,9 +246,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
 
         const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
 
-        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+        const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
             [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
-            pb.NewDataLiteral<ui64>(2ULL), {{1U, pb.NewDataLiteral<bool>(false)}})),
+            pb.NewDataLiteral<ui64>(2ULL), {{1U, pb.NewDataLiteral<bool>(false)}});
+        const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks)));
+        const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow,
             [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
         ));
 
@@ -297,9 +305,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
 
         const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
 
-        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+        const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
             [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
-            pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}, {1U, pb.NewDataLiteral<bool>(false)}})),
+            pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}, {1U, pb.NewDataLiteral<bool>(false)}});
+        const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks)));
+        const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow,
             [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
         ));
 
@@ -360,9 +370,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
 
         const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
 
-        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+        const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
             [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
-            pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}, {1U, pb.NewDataLiteral<bool>(true)}})),
+            pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}, {1U, pb.NewDataLiteral<bool>(true)}});
+        const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks)));
+        const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow,
             [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
         ));
 
@@ -429,9 +441,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
 
         const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
 
-        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+        const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
             [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
-            pb.NewDataLiteral<ui64>(4ULL), {{1U, pb.NewDataLiteral<bool>(true)}, {0U, pb.NewDataLiteral<bool>(false)}})),
+            pb.NewDataLiteral<ui64>(4ULL), {{1U, pb.NewDataLiteral<bool>(true)}, {0U, pb.NewDataLiteral<bool>(false)}});
+        const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks)));
+        const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow,
             [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
         ));
 
@@ -492,9 +506,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
 
         const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
 
-        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+        const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
             [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
-            pb.NewDataLiteral<ui64>(6ULL), {{1U, pb.NewDataLiteral<bool>(false)}, {0U, pb.NewDataLiteral<bool>(true)}})),
+            pb.NewDataLiteral<ui64>(6ULL), {{1U, pb.NewDataLiteral<bool>(false)}, {0U, pb.NewDataLiteral<bool>(true)}});
+        const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks)));
+        const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow,
             [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
         ));
 
@@ -565,9 +581,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockSortTest) {
 
         const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
 
-        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+        const auto sortBlocks = pb.WideSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
             [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
-            {{0U, pb.NewDataLiteral<bool>(true)}})),
+            {{0U, pb.NewDataLiteral<bool>(true)}});
+        const auto sortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(sortBlocks)));
+        const auto pgmReturn = pb.Collect(pb.NarrowMap(sortFlow,
             [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
         ));
 

Некоторые файлы не были показаны из-за большого количества измененных файлов