#include "mkql_computation_node_ut.h"

#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/mkql_runtime_version.h>
#include <yql/essentials/minikql/mkql_string_util.h>

#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/minikql/computation/mock_spiller_factory_ut.h>

#include <cstring>
#include <algorithm>

namespace NKikimr {
namespace NMiniKQL {
namespace {

// NOTE(review): no use of this constant is visible in this part of the file —
// confirm it is still referenced further down (and what it bounds) before relying on it.
constexpr auto border = 9124596000000000ULL;

// Script for the synthetic "TestYieldStream" source used by the tests below.
struct TTestStreamParams {
    // Sentinel entry of TestYieldStreamData: the stream reports Yield instead of producing a row.
    static constexpr ui64 Yield = std::numeric_limits<ui64>::max();

    // Number of times the decimal text of a value is repeated to form the row's string column.
    ui64 StringSize{1};
    // Values (or Yield sentinels) the stream emits, one per Fetch call, in order.
    std::vector<ui64> TestYieldStreamData;
};

// Computation node whose value is a stream replaying the script held in TTestStreamParams.
class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper> {
    using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
public:
    // Stream value: each Fetch consumes one entry of Params.TestYieldStreamData.
    class TStreamValue : public TComputationValue<TStreamValue> {
    public:
        using TBase = TComputationValue<TStreamValue>;

        TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, TTestStreamParams& params)
            : TBase(memInfo)
            , CompCtx(compCtx)
            , Params(params)
        {}
    private:
        // Returns Finish once the script is exhausted, Yield for a sentinel entry,
        // otherwise Ok with a two-element row: the value and its text repeated StringSize times.
        NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
            const auto& script = Params.TestYieldStreamData;
            if (script.size() == Index) {
                return NUdf::EFetchStatus::Finish;
            }

            const ui64 current = script[Index];
            if (TTestStreamParams::Yield == current) {
                ++Index;
                return NUdf::EFetchStatus::Yield;
            }

            NUdf::TUnboxedValue* fields = nullptr;
            result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, fields);
            fields[0] = NUdf::TUnboxedValuePod(current);
            fields[1] = NUdf::TUnboxedValuePod(MakeString(ToString(current) * Params.StringSize));

            // Advance only after the row has been built.
            ++Index;

            return NUdf::EFetchStatus::Ok;
        }

    private:
        TComputationContext& CompCtx;
        ui64 Index = 0; // position of the next script entry to replay
        TTestStreamParams& Params;
    };

    TTestStreamWrapper(TComputationMutables& mutables, TTestStreamParams& params)
        : TBaseComputation(mutables)
        , Params(params)
    {}

    // Creates a fresh stream value bound to the shared script.
    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
        return ctx.HolderFactory.Create<TStreamValue>(ctx, Params);
    }
private:
    // The node has no upstream dependencies to register.
    void RegisterDependencies() const final {}

    TTestStreamParams& Params;
};

// Allocates the test stream node; `params` is held by reference, so it must outlive the node.
IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx, TTestStreamParams& params) {
    IComputationNode* const node = new TTestStreamWrapper(ctx.Mutables, params);
    return node;
}

// Builds a node factory that maps the "TestYieldStream" callable to the test stream node
// and forwards every other callable to the builtin factory.
// The returned factory keeps a reference to `params`.
TComputationNodeFactory GetNodeFactory(TTestStreamParams& params) {
    return [&params](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
        const bool isTestStream = callable.GetType()->GetName() == "TestYieldStream";
        return isTestStream
            ? WrapTestStream(ctx, params)
            : GetBuiltinFactory()(callable, ctx);
    };
}

// Builds the "TestYieldStream" callable node: a stream of structs {a: Uint64, b: String}.
template <bool LLVM>
TRuntimeNode MakeStream(TSetup<LLVM>& setup) {
    TProgramBuilder& builder = *setup.PgmBuilder;

    const auto rowType = builder.NewStructType({
        {TStringBuf("a"), builder.NewDataType(NUdf::EDataSlot::Uint64)},
        {TStringBuf("b"), builder.NewDataType(NUdf::EDataSlot::String)}
    });
    const auto streamType = builder.NewStreamType(rowType);

    TCallableBuilder callableBuilder(*setup.Env, "TestYieldStream", streamType);
    return TRuntimeNode(callableBuilder.Build(), false);
}

// Wraps `stream` in a CombineCore keyed by member "a": the state starts as the first item,
// and every further item adds its "a" and prepends its "b" to the state.
// With OverFlow the combiner runs over a flow (ToFlow/FromFlow), otherwise directly over the stream.
template <bool OverFlow>
TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)> finishLambda) {
    constexpr auto memLimit = 64ul << 20;

    const auto keyOf = [&](TRuntimeNode item) {
        return pb.Member(item, "a");
    };
    const auto initState = [&](TRuntimeNode /*key*/, TRuntimeNode item) {
        return item;
    };
    const auto mergeState = [&](TRuntimeNode /*key*/, TRuntimeNode item, TRuntimeNode state) {
        return pb.NewStruct({
            {TStringBuf("a"), pb.Add(pb.Member(item, "a"), pb.Member(state, "a"))},
            {TStringBuf("b"), pb.Concat(pb.Member(item, "b"), pb.Member(state, "b"))},
        });
    };

    if (OverFlow) {
        return pb.FromFlow(pb.CombineCore(pb.ToFlow(stream), keyOf, initState, mergeState, finishLambda, memLimit));
    }
    return pb.CombineCore(stream, keyOf, initState, mergeState, finishLambda, memLimit);
}

// Dispatches to the spilling or the plain WideLastCombiner builder according to SPILLING.
template<bool SPILLING>
TRuntimeNode WideLastCombiner(TProgramBuilder& pb, TRuntimeNode flow, const TProgramBuilder::TWideLambda& extractor, const TProgramBuilder::TBinaryWideLambda& init, const TProgramBuilder::TTernaryWideLambda& update, const TProgramBuilder::TBinaryWideLambda& finish) {
    if (SPILLING) {
        return pb.WideLastCombinerWithSpilling(flow, extractor, init, update, finish);
    }
    return pb.WideLastCombiner(flow, extractor, init, update, finish);
}

// Fetches from `streamValue` until every string in `expected` has been seen (in any order),
// skipping Yield statuses, then requires the next fetch to report Finish.
// `expected` is consumed: each matched string is erased from it.
void CheckIfStreamHasExpectedStringValues(const NUdf::TUnboxedValue& streamValue, std::unordered_set<TString>& expected) {
    NUdf::TUnboxedValue fetched;
    while (!expected.empty()) {
        const NUdf::EFetchStatus status = streamValue.Fetch(fetched);
        UNIT_ASSERT_UNEQUAL(status, NUdf::EFetchStatus::Finish);
        if (status != NUdf::EFetchStatus::Yield) {
            const auto found = expected.find(TString(fetched.AsStringRef()));
            UNIT_ASSERT(found != expected.end());
            expected.erase(found);
        }
    }
    const NUdf::EFetchStatus lastStatus = streamValue.Fetch(fetched);
    UNIT_ASSERT_EQUAL(lastStatus, NUdf::EFetchStatus::Finish);
}

} // unnamed

#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
    // Combines (key, value) string rows by key. The state is a four-slot window of optional
    // strings: the newest value is pushed to the front and older entries shift right.
    // On finish the front slot is dropped, empty optionals are filtered out, and the
    // remaining strings are joined with " / ".
    Y_UNIT_TEST_LLVM(TestLongStringsRefCounting) {
        TSetup<LLVM> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto stringType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto optStringType = pb.NewOptionalType(stringType);
        const auto rowType = pb.NewTupleType({stringType, stringType});

        // Shorthand for a String literal node.
        const auto str = [&pb](const auto& text) {
            return pb.NewDataLiteral<NUdf::EDataSlot::String>(text);
        };

        const auto shortKeyOne = str("key one");
        const auto shortKeyTwo = str("key two");
        const auto longKeyOne = str("very long key one");
        const auto longKeyTwo = str("very long key two");

        const auto list = pb.NewList(rowType, {
            pb.NewTuple(rowType, {shortKeyOne, str("very long value 1")}),
            pb.NewTuple(rowType, {shortKeyTwo, str("very long value 2")}),
            pb.NewTuple(rowType, {shortKeyTwo, str("very long value 3")}),
            pb.NewTuple(rowType, {longKeyOne, str("very long value 4")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 5")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 6")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 7")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 8")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 9")}),
        });

        const auto splitRow = [&](TRuntimeNode row) -> TRuntimeNode::TList {
            return {pb.Nth(row, 0U), pb.Nth(row, 1U)};
        };
        const auto keyOf = [](TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return {columns.front()};
        };
        const auto initState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return {pb.NewOptional(columns.back()), pb.NewOptional(keys.front()), pb.NewEmptyOptional(optStringType), pb.NewEmptyOptional(optStringType)};
        };
        const auto updateState = [&](TRuntimeNode::TList, TRuntimeNode::TList columns, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {pb.NewOptional(columns.back()), state.front(), state[1U], state[2U]};
        };
        const auto finishState = [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            // Drop the newest slot, then keep only the filled optionals.
            state.erase(state.cbegin());
            return {pb.FlatMap(pb.NewList(optStringType, state), [&](TRuntimeNode entry) { return entry; } )};
        };
        const auto joinWithSlash = [&](TRuntimeNode::TList columns) -> TRuntimeNode {
            return pb.Fold1(columns.front(),
                [&](TRuntimeNode first) { return first; },
                [&](TRuntimeNode next, TRuntimeNode acc) {
                    return pb.AggrConcat(pb.AggrConcat(acc, pb.NewDataLiteral<NUdf::EDataSlot::String>(" / ")), next);
                }
            );
        };

        const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), splitRow);
        const auto combined = pb.WideCombiner(wideFlow, -100000LL, keyOf, initState, updateState, finishState);
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined, joinWithSlash));

        const auto graph = setup.BuildGraph(pgmReturn);
        const auto rows = graph->GetValue().GetListIterator();
        NUdf::TUnboxedValue row;
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "key one");
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "very long value 2 / key two");
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "very long key one");
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "very long value 8 / very long value 7 / very long value 6");
        // The iterator must stay exhausted on repeated calls.
        UNIT_ASSERT(!rows.Next(row));
        UNIT_ASSERT(!rows.Next(row));
    }

    // Combines (key, value) string rows by key with a state of
    // {latest value, key, first value, first key}; the state is emitted unchanged on finish
    // and its four columns are joined with " / ".
    Y_UNIT_TEST_LLVM(TestLongStringsPasstroughtRefCounting) {
        TSetup<LLVM> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto stringType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto rowType = pb.NewTupleType({stringType, stringType});

        // Shorthand for a String literal node.
        const auto str = [&pb](const auto& text) {
            return pb.NewDataLiteral<NUdf::EDataSlot::String>(text);
        };

        const auto shortKeyOne = str("key one");
        const auto shortKeyTwo = str("key two");
        const auto longKeyOne = str("very long key one");
        const auto longKeyTwo = str("very long key two");

        const auto list = pb.NewList(rowType, {
            pb.NewTuple(rowType, {shortKeyOne, str("very long value 1")}),
            pb.NewTuple(rowType, {shortKeyTwo, str("very long value 2")}),
            pb.NewTuple(rowType, {shortKeyTwo, str("very long value 3")}),
            pb.NewTuple(rowType, {longKeyOne, str("very long value 4")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 5")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 6")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 7")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 8")}),
            pb.NewTuple(rowType, {longKeyTwo, str("very long value 9")}),
        });

        const auto splitRow = [&](TRuntimeNode row) -> TRuntimeNode::TList {
            return {pb.Nth(row, 0U), pb.Nth(row, 1U)};
        };
        const auto keyOf = [](TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return {columns.front()};
        };
        const auto initState = [](TRuntimeNode::TList keys, TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return {columns.back(), keys.front(), columns.back(), columns.front()};
        };
        const auto updateState = [](TRuntimeNode::TList keys, TRuntimeNode::TList columns, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            // Refresh the first two slots; the last two keep what init stored.
            return {columns.back(), keys.front(), state[2U], state.back()};
        };
        const auto finishState = [](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return state;
        };
        const auto joinWithSlash = [&](TRuntimeNode::TList columns) -> TRuntimeNode {
            return pb.Fold1(pb.NewList(stringType, columns),
                [&](TRuntimeNode first) { return first; },
                [&](TRuntimeNode next, TRuntimeNode acc) {
                    return pb.AggrConcat(pb.AggrConcat(acc, pb.NewDataLiteral<NUdf::EDataSlot::String>(" / ")), next);
                }
            );
        };

        const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), splitRow);
        const auto combined = pb.WideCombiner(wideFlow, -1000000LL, keyOf, initState, updateState, finishState);
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined, joinWithSlash));

        const auto graph = setup.BuildGraph(pgmReturn);
        const auto rows = graph->GetValue().GetListIterator();
        NUdf::TUnboxedValue row;
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "very long value 1 / key one / very long value 1 / key one");
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "very long value 3 / key two / very long value 2 / key two");
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "very long value 4 / very long key one / very long value 4 / very long key one");
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "very long value 9 / very long key two / very long value 5 / very long key two");
        // The iterator must stay exhausted on repeated calls.
        UNIT_ASSERT(!rows.Next(row));
        UNIT_ASSERT(!rows.Next(row));
    }

    // Rows are (key, empty optional, value). The expanded middle column unwraps the empty
    // optional (the "landmine"), but no combiner lambda reads that column; as the test name
    // says, the unwrap is expected never to be evaluated.
    // State: the key's four most recent values; finish prepends the key.
    Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedInput) {
        TSetup<LLVM> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto stringType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto optStringType = pb.NewOptionalType(stringType);
        const auto rowType = pb.NewTupleType({stringType, optStringType, stringType});

        // Shorthand for a String literal node.
        const auto str = [&pb](const auto& text) {
            return pb.NewDataLiteral<NUdf::EDataSlot::String>(text);
        };

        const auto keyOne = str("key one");
        const auto keyTwo = str("key two");

        const auto value1 = str("value 1");
        const auto value2 = str("value 2");
        const auto value3 = str("value 3");
        const auto value4 = str("value 4");
        const auto value5 = str("value 5");

        const auto blank = str("");

        const auto nothing = pb.NewEmptyOptional(optStringType);

        const auto list = pb.NewList(rowType, {
            pb.NewTuple(rowType, {keyOne, nothing, value1}),
            pb.NewTuple(rowType, {keyTwo, nothing, value2}),
            pb.NewTuple(rowType, {keyTwo, nothing, value3}),
            pb.NewTuple(rowType, {keyOne, nothing, value4}),
            pb.NewTuple(rowType, {keyOne, nothing, value5}),
            pb.NewTuple(rowType, {keyOne, nothing, value1}),
            pb.NewTuple(rowType, {keyOne, nothing, value2}),
            pb.NewTuple(rowType, {keyTwo, nothing, value3}),
            pb.NewTuple(rowType, {keyTwo, nothing, value4}),
        });

        const auto landmine = str("ACHTUNG MINEN!");

        const auto splitRow = [&](TRuntimeNode row) -> TRuntimeNode::TList {
            return {pb.Nth(row, 0U), pb.Unwrap(pb.Nth(row, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(row, 2U)};
        };
        const auto keyOf = [](TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return {columns.front()};
        };
        const auto initState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return {columns.back(), keys.front(), blank, blank};
        };
        const auto updateState = [](TRuntimeNode::TList, TRuntimeNode::TList columns, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {columns.back(), state.front(), state[1U], state[2U]};
        };
        const auto finishState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            state.insert(state.cbegin(), keys.cbegin(), keys.cend());
            return {pb.NewList(stringType, state)};
        };
        const auto joinWithSlash = [&](TRuntimeNode::TList columns) -> TRuntimeNode {
            return pb.Fold1(columns.front(),
                [&](TRuntimeNode first) { return first; },
                [&](TRuntimeNode next, TRuntimeNode acc) {
                    return pb.AggrConcat(pb.AggrConcat(acc, pb.NewDataLiteral<NUdf::EDataSlot::String>(" / ")), next);
                }
            );
        };

        const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), splitRow);
        const auto combined = pb.WideCombiner(wideFlow, -1000000LL, keyOf, initState, updateState, finishState);
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined, joinWithSlash));

        const auto graph = setup.BuildGraph(pgmReturn);
        const auto rows = graph->GetValue().GetListIterator();
        NUdf::TUnboxedValue row;
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "key one / value 2 / value 1 / value 5 / value 4");
        UNIT_ASSERT(rows.Next(row));
        UNBOXED_VALUE_STR_EQUAL(row, "key two / value 4 / value 3 / value 3 / value 2");
        // The iterator must stay exhausted on repeated calls.
        UNIT_ASSERT(!rows.Next(row));
        UNIT_ASSERT(!rows.Next(row));
    }

    // Rows are (key, empty optional, value). The combiner state is
    // {concatenation of the optional column, ", "-joined values}; on finish the first output
    // unwraps that optional (the "landmine") and the second is "<key>: <values>".
    // The narrowing lambda reads only the last output, so, as the test name says, the
    // unwrap of the first output is expected never to be evaluated.
    Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedOutput) {
        TSetup<LLVM> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto optionalType = pb.NewOptionalType(dataType);
        const auto tupleType = pb.NewTupleType({dataType, optionalType, dataType});

        const auto keyOne = pb.NewDataLiteral<NUdf::EDataSlot::String>("key one");
        const auto keyTwo = pb.NewDataLiteral<NUdf::EDataSlot::String>("key two");

        const auto value1 = pb.NewDataLiteral<NUdf::EDataSlot::String>("value 1");
        const auto value2 = pb.NewDataLiteral<NUdf::EDataSlot::String>("value 2");
        const auto value3 = pb.NewDataLiteral<NUdf::EDataSlot::String>("value 3");
        const auto value4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("value 4");
        const auto value5 = pb.NewDataLiteral<NUdf::EDataSlot::String>("value 5");

        const auto none = pb.NewEmptyOptional(optionalType);

        const auto data1 = pb.NewTuple(tupleType, {keyOne, none, value1});
        const auto data2 = pb.NewTuple(tupleType, {keyTwo, none, value2});
        const auto data3 = pb.NewTuple(tupleType, {keyTwo, none, value3});
        const auto data4 = pb.NewTuple(tupleType, {keyOne, none, value4});
        const auto data5 = pb.NewTuple(tupleType, {keyOne, none, value5});
        const auto data6 = pb.NewTuple(tupleType, {keyOne, none, value1});
        const auto data7 = pb.NewTuple(tupleType, {keyOne, none, value2});
        const auto data8 = pb.NewTuple(tupleType, {keyTwo, none, value3});
        const auto data9 = pb.NewTuple(tupleType, {keyTwo, none, value4});

        const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});

        // Message passed to Unwrap for the output that must stay unevaluated.
        const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");

        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list),
            [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }), 0ULL,
            // Key: first column.
            [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
            // Init: {optional column, value}.
            [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
                return {items[1U], items.back()};
            },
            // Update: extend both state columns with the new row.
            [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList {
                return {pb.Concat(state.front(), items[1U]), pb.AggrConcat(pb.AggrConcat(state.back(), pb.NewDataLiteral<NUdf::EDataSlot::String>(", ")), items.back())};
            },
            // Finish: {landmine unwrap, "<key>: <values>"}.
            [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList {
                return {pb.Unwrap(state.front(), landmine, __FILE__, __LINE__, 0),  pb.AggrConcat(pb.AggrConcat(keys.front(), pb.NewDataLiteral<NUdf::EDataSlot::String>(": ")), state.back())};
            }),
            // Only the last output column is consumed.
            [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.back(); }
        ));

        const auto graph = setup.BuildGraph(pgmReturn);
        const auto iterator = graph->GetValue().GetListIterator();
        NUdf::TUnboxedValue item;
        UNIT_ASSERT(iterator.Next(item));
        UNBOXED_VALUE_STR_EQUAL(item, "key one: value 1, value 4, value 5, value 1, value 2");
        UNIT_ASSERT(iterator.Next(item));
        UNBOXED_VALUE_STR_EQUAL(item, "key two: value 2, value 3, value 3, value 4");
        // The iterator must stay exhausted on repeated calls.
        UNIT_ASSERT(!iterator.Next(item));
        UNIT_ASSERT(!iterator.Next(item));
    }

    // Degenerate case: rows are empty tuples expanded to zero columns, and every combiner
    // lambda passes its (empty) input through. Asserts that the collected list is empty.
    Y_UNIT_TEST_LLVM(TestThinAllLambdas) {
        TSetup<LLVM> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto emptyTupleType = pb.NewTupleType({});
        const auto emptyTuple = pb.NewTuple({});

        const auto list = pb.NewList(emptyTupleType, {emptyTuple, emptyTuple, emptyTuple, emptyTuple});

        const auto dropColumns = [](TRuntimeNode) -> TRuntimeNode::TList { return {}; };
        const auto keyOf = [](TRuntimeNode::TList columns) { return columns; };
        const auto initState = [](TRuntimeNode::TList, TRuntimeNode::TList columns) { return columns; };
        const auto updateState = [](TRuntimeNode::TList, TRuntimeNode::TList, TRuntimeNode::TList state) { return state; };
        const auto finishState = [](TRuntimeNode::TList, TRuntimeNode::TList state) { return state; };
        const auto makeTuple = [&](TRuntimeNode::TList) { return pb.NewTuple({}); };

        const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), dropColumns);
        const auto combined = pb.WideCombiner(wideFlow, 0ULL, keyOf, initState, updateState, finishState);
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined, makeTuple));

        const auto graph = setup.BuildGraph(pgmReturn);
        const auto rows = graph->GetValue().GetListIterator();
        NUdf::TUnboxedValue row;
        UNIT_ASSERT(!rows.Next(row));
        UNIT_ASSERT(!rows.Next(row));
    }
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 46u
    // Feeds the combiner from the scripted test stream with three Yield entries interleaved.
    // Expects the three Yields to surface first, then one concatenated "b" string per key
    // (keys 0, 1, 2, 3), then Finish.
    Y_UNIT_TEST_LLVM(TestHasLimitButPasstroughtYields) {
        const auto pause = TTestStreamParams::Yield;

        TTestStreamParams params;
        params.TestYieldStreamData = {0, 1, 0, 2, pause, 0, pause, 1, 2, 0, 1, 3, 0, pause, 1, 2};
        TSetup<LLVM> setup(GetNodeFactory(params));
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto splitStruct = [&](TRuntimeNode row) -> TRuntimeNode::TList {
            return {pb.Member(row, "a"), pb.Member(row, "b")};
        };
        const auto keyOf = [](TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return {columns.front()};
        };
        const auto initState = [](TRuntimeNode::TList, TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return columns;
        };
        const auto updateState = [&](TRuntimeNode::TList, TRuntimeNode::TList columns, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {state.front(), pb.AggrConcat(state.back(), columns.back())};
        };
        const auto finishState = [](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return state;
        };
        const auto lastColumn = [](TRuntimeNode::TList columns) { return columns.back(); };

        const auto stream = MakeStream<LLVM>(setup);
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(stream), splitStruct);
        const auto combined = pb.WideCombiner(wideFlow, -123456789LL, keyOf, initState, updateState, finishState);
        const auto pgmReturn = pb.FromFlow(pb.NarrowMap(combined, lastColumn));

        const auto graph = setup.BuildGraph(pgmReturn);
        const auto output = graph->GetValue();
        NUdf::TUnboxedValue fetched;
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Yield);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Yield);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Yield);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Ok);
        UNIT_ASSERT_VALUES_EQUAL(TStringBuf(fetched.AsStringRef()), "00000");
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Ok);
        UNIT_ASSERT_VALUES_EQUAL(TStringBuf(fetched.AsStringRef()), "1111");
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Ok);
        UNIT_ASSERT_VALUES_EQUAL(TStringBuf(fetched.AsStringRef()), "222");
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Ok);
        UNIT_ASSERT_VALUES_EQUAL(TStringBuf(fetched.AsStringRef()), "3");
        // Finish must be reported on every further fetch.
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Finish);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Finish);
    }
#endif
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 46u
    // Same pipeline as the yield pass-through test, but each row carries a string repeated
    // 50000 times and the combiner gets a -100000 memory-limit argument.
    // Checks only the sequence of fetch statuses, not the row contents.
    Y_UNIT_TEST_LLVM(TestSkipYieldRespectsMemLimit) {
        const auto pause = TTestStreamParams::Yield;

        TTestStreamParams params;
        params.StringSize = 50000;
        params.TestYieldStreamData = {0, pause, 2, pause, 3, pause, 4};
        TSetup<LLVM> setup(GetNodeFactory(params));
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto splitStruct = [&](TRuntimeNode row) -> TRuntimeNode::TList {
            return {pb.Member(row, "a"), pb.Member(row, "b")};
        };
        const auto keyOf = [](TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return {columns.front()};
        };
        const auto initState = [](TRuntimeNode::TList, TRuntimeNode::TList columns) -> TRuntimeNode::TList {
            return columns;
        };
        const auto updateState = [&](TRuntimeNode::TList, TRuntimeNode::TList columns, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {state.front(), pb.AggrConcat(state.back(), columns.back())};
        };
        const auto finishState = [](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return state;
        };
        const auto lastColumn = [](TRuntimeNode::TList columns) { return columns.back(); };

        const auto stream = MakeStream<LLVM>(setup);
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(stream), splitStruct);
        const auto combined = pb.WideCombiner(wideFlow, -100000LL, keyOf, initState, updateState, finishState);
        const auto pgmReturn = pb.FromFlow(pb.NarrowMap(combined, lastColumn));

        const auto graph = setup.BuildGraph(pgmReturn);
        const auto output = graph->GetValue();
        NUdf::TUnboxedValue fetched;

        // The first two yields are passed through
        UNIT_ASSERT_VALUES_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Yield);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Yield);
        // then the values collected so far are returned
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Ok);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Ok);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Ok);
        // followed by the remaining yield, the last value and Finish (repeatedly)
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Yield);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Ok);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Finish);
        UNIT_ASSERT_EQUAL(output.Fetch(fetched), NUdf::EFetchStatus::Finish);
    }
#endif
}

Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) {
    // Sums the double column of I8Samples in two groups, keyed by "value > 0",
    // through WideCombiner and checks both totals against sums computed in plain C++.
    Y_UNIT_TEST_LLVM(TestSumDoubleBooleanKeys) {
        TSetup<LLVM> setup;

        // Reference totals; the elapsed time is only reported, never asserted.
        double positive = 0.0;
        double negative = 0.0;
        const auto cppStart = TInstant::Now();
        for (const auto& entry : I8Samples) {
            if (entry.second > 0.0) {
                positive += entry.second;
            } else {
                negative += entry.second;
            }
        }
        const auto cppTime = TInstant::Now() - cppStart;

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto doubleType = pb.NewDataType(NUdf::TDataType<double>::Id);
        const auto listType = pb.NewListType(doubleType);
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Stage 1: turn every list item into a one-column wide row.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [](TRuntimeNode value) -> TRuntimeNode::TList { return {value}; });

        // Stage 2: combine rows; the lambdas are, in order: key, initial state, state update, output row.
        const auto combined = pb.WideCombiner(wideFlow, 0ULL,
            [&pb](TRuntimeNode::TList row) -> TRuntimeNode::TList { return {pb.AggrGreater(row.front(), pb.NewDataLiteral(0.0))}; },
            [](TRuntimeNode::TList, TRuntimeNode::TList row) -> TRuntimeNode::TList { return row; },
            [&pb](TRuntimeNode::TList, TRuntimeNode::TList row, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return {pb.AggrAdd(acc.front(), row.front())}; },
            [](TRuntimeNode::TList, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return acc; });

        // Stage 3: narrow each output row back to a single value and collect into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [](TRuntimeNode::TList row) { return row.front(); }));

        const auto graph = setup.BuildGraph(pgmReturn, {list});

        // Feed the sample doubles into the graph's single entry point.
        NUdf::TUnboxedValue* slots = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), slots));
        for (const auto& entry : I8Samples) {
            *slots++ = ToValue<double>(entry.second);
        }

        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto first = value.GetElement(0);
        const auto second = value.GetElement(1);
        const auto runEnd = TInstant::Now();

        // Group order in the output is not fixed, so detect which element holds the positive sum.
        if (first.template Get<double>() > 0.0) {
            UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), positive);
            UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), negative);
        } else {
            UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), negative);
            UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), positive);
        }

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }

    // Computes sum, min and max of the double column in two groups, keyed by
    // "value > 0", through WideCombiner and checks them against plain C++ results.
    Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleBooleanKeys) {
        TSetup<LLVM> setup;

        // Work on a copy of the samples that is guaranteed to populate both groups.
        auto extended = I8Samples;
        extended.emplace_back(-1, -1.0); // at least one negative value
        extended.emplace_back(1, 1.0);   // at least one positive value

        double pSum = 0.0;
        double nSum = 0.0;
        double pMax = 0.0;
        double nMax = -1000.0;
        double pMin = 1000.0;
        double nMin = 0.0;
        const auto cppStart = TInstant::Now();
        for (const auto& entry : extended) {
            const double current = entry.second;
            const bool isPositive = current > 0.0;
            // Select the accumulators of the group this value belongs to.
            double& sum = isPositive ? pSum : nSum;
            double& hi = isPositive ? pMax : nMax;
            double& lo = isPositive ? pMin : nMin;
            sum += current;
            hi = std::max(hi, current);
            lo = std::min(lo, current);
        }

        const auto cppTime = TInstant::Now() - cppStart;

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto doubleType = pb.NewDataType(NUdf::TDataType<double>::Id);
        const auto listType = pb.NewListType(doubleType);
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Stage 1: one-column wide rows.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [](TRuntimeNode value) -> TRuntimeNode::TList { return {value}; });

        // Stage 2: state is {sum, min, max}, seeded with the first value of each group.
        const auto combined = pb.WideCombiner(wideFlow, 0ULL,
            [&pb](TRuntimeNode::TList row) -> TRuntimeNode::TList { return {pb.AggrGreater(row.front(), pb.NewDataLiteral(0.0))}; },
            [](TRuntimeNode::TList, TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row.front(), row.front(), row.front()}; },
            [&pb](TRuntimeNode::TList, TRuntimeNode::TList row, TRuntimeNode::TList acc) -> TRuntimeNode::TList {
                return {pb.AggrAdd(acc.front(), row.front()), pb.AggrMin(acc[1U], row.front()), pb.AggrMax(acc.back(), row.back()) };
            },
            [](TRuntimeNode::TList, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return acc; });

        // Stage 3: pack every output row into a tuple and collect into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&pb](TRuntimeNode::TList row) { return pb.NewTuple(row); }));

        const auto graph = setup.BuildGraph(pgmReturn, {list});

        // Feed the extended sample doubles into the graph's single entry point.
        NUdf::TUnboxedValue* slots = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(extended.size(), slots));
        for (const auto& entry : extended) {
            *slots++ = ToValue<double>(entry.second);
        }

        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto first = value.GetElement(0);
        const auto second = value.GetElement(1);
        const auto runEnd = TInstant::Now();

        // Group order in the output is not fixed; the sign of the sum tells the groups apart.
        if (first.GetElement(0).template Get<double>() > 0.0) {
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), pSum);
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), pMin);
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), pMax);

            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), nSum);
            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), nMin);
            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), nMax);
        } else {
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), nSum);
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), nMin);
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), nMax);

            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), pSum);
            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), pMin);
            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), pMax);
        }

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }

    // Per-key sum of doubles with an i8 key: the WideCombiner output, sorted by key,
    // must equal the totals accumulated in a std::unordered_map.
    Y_UNIT_TEST_LLVM(TestSumDoubleSmallKey) {
        TSetup<LLVM> setup;

        // Reference aggregation; the elapsed time is only reported, never asserted.
        std::unordered_map<i8, double> expects(201);
        const auto cppStart = TInstant::Now();
        for (const auto& entry : I8Samples) {
            expects.emplace(entry.first, 0.0).first->second += entry.second;
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Both result sets are ordered by key before they are compared.
        const auto byKey = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };

        std::vector<std::pair<i8, double>> one;
        std::vector<std::pair<i8, double>> two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto keyType = pb.NewDataType(NUdf::TDataType<i8>::Id);
        const auto doubleType = pb.NewDataType(NUdf::TDataType<double>::Id);
        const auto listType = pb.NewListType(pb.NewTupleType({keyType, doubleType}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Stage 1: split every (key, value) tuple into a two-column wide row.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&pb](TRuntimeNode tuple) -> TRuntimeNode::TList { return { pb.Nth(tuple, 0U), pb.Nth(tuple, 1U) }; });

        // Stage 2: key is column 0, state is the running sum of column 1.
        const auto combined = pb.WideCombiner(wideFlow, 0ULL,
            [](TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row.front()}; },
            [](TRuntimeNode::TList, TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row.back()}; },
            [&pb](TRuntimeNode::TList, TRuntimeNode::TList row, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return {pb.AggrAdd(acc.front(), row.back())}; },
            [](TRuntimeNode::TList keys, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return {keys.front(), acc.front()}; });

        // Stage 3: pack (key, sum) into a tuple and collect into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&pb](TRuntimeNode::TList row) { return pb.NewTuple(row); }));

        const auto graph = setup.BuildGraph(pgmReturn, {list});

        // Feed the samples as two-element arrays into the graph's single entry point.
        NUdf::TUnboxedValue* slots = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), slots));
        for (const auto& entry : I8Samples) {
            NUdf::TUnboxedValue* fields = nullptr;
            *slots++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
            fields[0] = NUdf::TUnboxedValuePod(entry.first);
            fields[1] = NUdf::TUnboxedValuePod(entry.second);
        }

        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto runEnd = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Read back exactly as many rows as there are distinct keys.
        const auto ptr = value.GetElements();
        for (auto row = ptr, end = ptr + expects.size(); row != end; ++row) {
            two.emplace_back(row->GetElement(0).template Get<i8>(), row->GetElement(1).template Get<double>());
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }

    // Per-key sum, min and max of doubles with an i8 key: the WideCombiner output,
    // sorted by key, must equal the aggregates accumulated in a std::unordered_map.
    Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleSmallKey) {
        TSetup<LLVM> setup;

        // Reference aggregation; the elapsed time is only reported, never asserted.
        std::unordered_map<i8, std::array<double, 3U>> expects(201);
        const auto t = TInstant::Now();
        for (const auto& sample : I8Samples) {
            // Seeds are {sum, running min, running max}. The max seed must be lowest():
            // numeric_limits<double>::min() is the smallest POSITIVE normalized value and
            // would win over every sample of a key whose values are all non-positive.
            auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, std::numeric_limits<double>::max(), std::numeric_limits<double>::lowest()}).first->second;
            std::get<0U>(item) += sample.second;
            std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
            std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
        }
        const auto cppTime = TInstant::Now() - t;

        std::vector<std::pair<i8, std::array<double, 3U>>> one, two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        // Both result sets are ordered by key before they are compared.
        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), [](const std::pair<i8, std::array<double, 3U>> l, const std::pair<i8, std::array<double, 3U>> r){ return l.first < r.first; });

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Lambdas passed to WideCombiner, in order: key (column 0), initial state
        // {value, value, value}, state update {sum, min, max}, output row {key, sum, min, max}.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&](TRuntimeNode item) -> TRuntimeNode::TList { return { pb.Nth(item, 0U), pb.Nth(item, 1U) }; }), 0ULL,
            [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
            [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.back(), items.back(), items.back()}; },
            [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.AggrAdd(state.front(), items.back()), pb.AggrMin(state[1U], items.back()), pb.AggrMax(state.back(), items.back())}; },
            [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList { state.insert(state.cbegin(), keys.front()); return state; }),
            [&](TRuntimeNode::TList items) { return pb.NewTuple(items); }
        ));

        const auto graph = setup.BuildGraph(pgmReturn, {list});

        // Feed the samples as two-element arrays into the graph's single entry point.
        NUdf::TUnboxedValue* items = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
        for (const auto& sample : I8Samples) {
            NUdf::TUnboxedValue* pair = nullptr;
            *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
            pair[0] = NUdf::TUnboxedValuePod(sample.first);
            pair[1] = NUdf::TUnboxedValuePod(sample.second);
        }

        const auto t1 = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto t2 = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Read back exactly as many rows as there are distinct keys.
        const auto ptr = value.GetElements();
        for (size_t i = 0ULL; i < expects.size(); ++i) {
            two.emplace_back(ptr[i].GetElement(0).template Get<i8>(), std::array<double, 3U>{ptr[i].GetElement(1).template Get<double>(), ptr[i].GetElement(2).template Get<double>(), ptr[i].GetElement(3).template Get<double>()});
        }

        std::sort(two.begin(), two.end(), [](const std::pair<i8, std::array<double, 3U>> l, const std::pair<i8, std::array<double, 3U>> r){ return l.first < r.first; });
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
    }

    // Per-key sum of doubles with a string key (the i8 key rendered via ToString):
    // the WideCombiner output, sorted by key, must equal the std::unordered_map totals.
    Y_UNIT_TEST_LLVM(TestSumDoubleStringKey) {
        TSetup<LLVM> setup;

        // Same samples as I8Samples, but with the key converted to a string.
        std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
        std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(),
            [](const std::pair<i8, double>& src) { return std::make_pair(ToString(src.first), src.second); });

        // Reference aggregation; the elapsed time is only reported, never asserted.
        std::unordered_map<std::string, double> expects(201);
        const auto cppStart = TInstant::Now();
        for (const auto& entry : stringI8Samples) {
            expects.emplace(entry.first, 0.0).first->second += entry.second;
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Both result sets are ordered by key before they are compared.
        const auto byKey = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };

        std::vector<std::pair<std::string_view, double>> one;
        std::vector<std::pair<std::string_view, double>> two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto keyType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto doubleType = pb.NewDataType(NUdf::TDataType<double>::Id);
        const auto listType = pb.NewListType(pb.NewTupleType({keyType, doubleType}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Stage 1: split every (key, value) tuple into a two-column wide row.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&pb](TRuntimeNode tuple) -> TRuntimeNode::TList { return { pb.Nth(tuple, 0U), pb.Nth(tuple, 1U) }; });

        // Stage 2: key is column 0, state is the running sum of column 1.
        const auto combined = pb.WideCombiner(wideFlow, 0ULL,
            [](TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row.front()}; },
            [](TRuntimeNode::TList, TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row.back()}; },
            [&pb](TRuntimeNode::TList, TRuntimeNode::TList row, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return {pb.AggrAdd(acc.front(), row.back())}; },
            [](TRuntimeNode::TList keys, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return {keys.front(), acc.front()}; });

        // Stage 3: pack (key, sum) into a tuple and collect into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&pb](TRuntimeNode::TList row) { return pb.NewTuple(row); }));

        const auto graph = setup.BuildGraph(pgmReturn, {list});

        // Feed the samples as two-element arrays into the graph's single entry point.
        NUdf::TUnboxedValue* slots = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), slots));
        for (const auto& entry : stringI8Samples) {
            NUdf::TUnboxedValue* fields = nullptr;
            *slots++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
            fields[0] = NUdf::TUnboxedValuePod::Embedded(entry.first);
            fields[1] = NUdf::TUnboxedValuePod(entry.second);
        }

        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto runEnd = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Read back exactly as many rows as there are distinct keys.
        const auto ptr = value.GetElements();
        for (auto row = ptr, end = ptr + expects.size(); row != end; ++row) {
            two.emplace_back(row->GetElements()->AsStringRef(), row->GetElement(1).template Get<double>());
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }

    // Per-key sum, min and max of doubles with a string key (the i8 key rendered via
    // ToString): the WideCombiner output, sorted by key, must equal the C++ aggregates.
    Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleStringKey) {
        TSetup<LLVM> setup;

        // Same samples as I8Samples, but with the key converted to a string.
        std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
        std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(),
            [](const std::pair<i8, double>& src) { return std::make_pair(ToString(src.first), src.second); });

        // Reference aggregation; each entry is {sum, min, max}, seeded with {0, +1E7, -1E7}.
        std::unordered_map<std::string, std::array<double, 3U>> expects(201);
        const auto cppStart = TInstant::Now();
        for (const auto& entry : stringI8Samples) {
            auto& agg = expects.emplace(entry.first, std::array<double, 3U>{0.0, +1E7, -1E7}).first->second;
            agg[0U] += entry.second;
            agg[1U] = std::min(agg[1U], entry.second);
            agg[2U] = std::max(agg[2U], entry.second);
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Both result sets are ordered by key before they are compared.
        const auto byKey = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };

        std::vector<std::pair<std::string_view, std::array<double, 3U>>> one;
        std::vector<std::pair<std::string_view, std::array<double, 3U>>> two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto keyType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto doubleType = pb.NewDataType(NUdf::TDataType<double>::Id);
        const auto listType = pb.NewListType(pb.NewTupleType({keyType, doubleType}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Stage 1: split every (key, value) tuple into a two-column wide row.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&pb](TRuntimeNode tuple) -> TRuntimeNode::TList { return { pb.Nth(tuple, 0U), pb.Nth(tuple, 1U) }; });

        // Stage 2: key is column 0; state is {sum, min, max} of column 1; output row is {key, sum, min, max}.
        const auto combined = pb.WideCombiner(wideFlow, 0ULL,
            [](TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row.front()}; },
            [](TRuntimeNode::TList, TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row.back(), row.back(), row.back()}; },
            [&pb](TRuntimeNode::TList, TRuntimeNode::TList row, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return {pb.AggrAdd(acc.front(), row.back()), pb.AggrMin(acc[1U], row.back()), pb.AggrMax(acc.back(), row.back())}; },
            [](TRuntimeNode::TList keys, TRuntimeNode::TList acc) -> TRuntimeNode::TList { acc.insert(acc.cbegin(), keys.front()); return acc; });

        // Stage 3: pack every output row into a tuple and collect into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&pb](TRuntimeNode::TList row) { return pb.NewTuple(row); }));

        const auto graph = setup.BuildGraph(pgmReturn, {list});

        // Feed the samples as two-element arrays into the graph's single entry point.
        NUdf::TUnboxedValue* slots = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), slots));
        for (const auto& entry : stringI8Samples) {
            NUdf::TUnboxedValue* fields = nullptr;
            *slots++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
            fields[0] = NUdf::TUnboxedValuePod::Embedded(entry.first);
            fields[1] = NUdf::TUnboxedValuePod(entry.second);
        }

        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto runEnd = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Read back exactly as many rows as there are distinct keys.
        const auto ptr = value.GetElements();
        for (auto row = ptr, end = ptr + expects.size(); row != end; ++row) {
            const double total = row->GetElement(1).template Get<double>();
            const double lo = row->GetElement(2).template Get<double>();
            const double hi = row->GetElement(3).template Get<double>();
            two.emplace_back(row->GetElements()->AsStringRef(), std::array<double, 3U>{total, lo, hi});
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }

    // Per-key sum, min and max of doubles with a composite (ui32, string) key derived
    // from Ui16Samples: the WideCombiner output, sorted by key, must equal the C++ aggregates.
    Y_UNIT_TEST_LLVM(TestMinMaxSumTupleKey) {
        TSetup<LLVM> setup;

        // Key parts: (key / 10 % 100) as ui32 and (key % 10) rendered as a string.
        std::vector<std::pair<std::pair<ui32, std::string>, double>> pairSamples(Ui16Samples.size());
        std::transform(Ui16Samples.cbegin(), Ui16Samples.cend(), pairSamples.begin(),
            [](const std::pair<ui16, double>& src) {
                return std::make_pair(std::make_pair(ui32(src.first / 10U % 100U), ToString(src.first % 10U)), src.second);
            });

        struct TPairHash {
            size_t operator()(const std::pair<ui32, std::string>& p) const {
                return CombineHashes(std::hash<ui32>()(p.first), std::hash<std::string_view>()(p.second));
            }
        };

        // Reference aggregation; each entry is {sum, min, max}, seeded with {0, +1E7, -1E7}.
        std::unordered_map<std::pair<ui32, std::string>, std::array<double, 3U>, TPairHash> expects;
        const auto cppStart = TInstant::Now();
        for (const auto& entry : pairSamples) {
            auto& agg = expects.emplace(entry.first, std::array<double, 3U>{0.0, +1E7, -1E7}).first->second;
            agg[0U] += entry.second;
            agg[1U] = std::min(agg[1U], entry.second);
            agg[2U] = std::max(agg[2U], entry.second);
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Both result sets are ordered by the composite key before they are compared.
        const auto byKey = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };

        std::vector<std::pair<std::pair<ui32, std::string>, std::array<double, 3U>>> one;
        std::vector<std::pair<std::pair<ui32, std::string>, std::array<double, 3U>>> two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto numKeyType = pb.NewDataType(NUdf::TDataType<ui32>::Id);
        const auto strKeyType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto keyTupleType = pb.NewTupleType({numKeyType, strKeyType});
        const auto doubleType = pb.NewDataType(NUdf::TDataType<double>::Id);
        const auto listType = pb.NewListType(pb.NewTupleType({keyTupleType, doubleType}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Stage 1: flatten ((k0, k1), value) into a three-column wide row.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&pb](TRuntimeNode tuple) -> TRuntimeNode::TList { return { pb.Nth(pb.Nth(tuple, 0U), 0U), pb.Nth(pb.Nth(tuple, 0U), 1U), pb.Nth(tuple, 1U) }; });

        // Stage 2: key is columns 0 and 1; state is {sum, min, max} of the last column.
        const auto combined = pb.WideCombiner(wideFlow, 0ULL,
            [](TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row.front(), row[1U]}; },
            [](TRuntimeNode::TList, TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row.back(), row.back(), row.back()}; },
            [&pb](TRuntimeNode::TList, TRuntimeNode::TList row, TRuntimeNode::TList acc) -> TRuntimeNode::TList {
                return {pb.AggrAdd(acc.front(), row.back()), pb.AggrMin(acc[1U], row.back()), pb.AggrMax(acc.back(), row.back()) };
            },
            [](TRuntimeNode::TList keys, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return {keys.front(), keys.back(), acc.front(), acc[1U], acc.back()}; });

        // Stage 3: rebuild ((k0, k1), sum, min, max) tuples and collect into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&pb](TRuntimeNode::TList row) { return pb.NewTuple({pb.NewTuple({row[0U], row[1U]}), row[2U], row[3U], row[4U]}); }));

        const auto graph = setup.BuildGraph(pgmReturn, {list});

        // Feed the samples as nested arrays into the graph's single entry point.
        NUdf::TUnboxedValue* slots = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(pairSamples.size(), slots));
        for (const auto& entry : pairSamples) {
            NUdf::TUnboxedValue* fields = nullptr;
            *slots++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
            fields[1] = NUdf::TUnboxedValuePod(entry.second);
            NUdf::TUnboxedValue* keyParts = nullptr;
            fields[0] = graph->GetHolderFactory().CreateDirectArrayHolder(2U, keyParts);
            keyParts[0] = NUdf::TUnboxedValuePod(entry.first.first);
            keyParts[1] = NUdf::TUnboxedValuePod::Embedded(entry.first.second);
        }

        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto runEnd = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Read back exactly as many rows as there are distinct keys.
        const auto ptr = value.GetElements();
        for (auto row = ptr, end = ptr + expects.size(); row != end; ++row) {
            const auto columns = row->GetElements();
            const auto& keyTuple = columns[0];
            const double total = columns[1].template Get<double>();
            const double lo = columns[2].template Get<double>();
            const double hi = columns[3].template Get<double>();
            two.emplace_back(
                std::make_pair(keyTuple.GetElement(0).template Get<ui32>(), (keyTuple.GetElements()[1]).AsStringRef()),
                std::array<double, 3U>{total, lo, hi});
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }

    // Filters TpchSamples by "column 0 <= border", groups by the two string columns
    // and aggregates a row count plus five double aggregates through WideFilter +
    // WideCombiner; the output, sorted by key, must equal the same aggregation in C++.
    Y_UNIT_TEST_LLVM(TestTpch) {
        TSetup<LLVM> setup;

        struct TPairHash {
            size_t operator()(const std::pair<std::string_view, std::string_view>& p) const {
                return CombineHashes(std::hash<std::string_view>()(p.first), std::hash<std::string_view>()(p.second));
            }
        };

        // Reference aggregation: key -> (row count, five running sums).
        std::unordered_map<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>, TPairHash> expects;
        const auto cppStart = TInstant::Now();
        for (const auto& sample : TpchSamples) {
            if (std::get<0U>(sample) > border) {
                continue;
            }
            auto& agg = expects.emplace(std::pair<std::string_view, std::string_view>{std::get<1U>(sample), std::get<2U>(sample)}, std::pair<ui64, std::array<double, 5U>>{0ULL, {0., 0., 0., 0., 0.}}).first->second;
            auto& sums = agg.second;
            ++agg.first;
            sums[0U] += std::get<3U>(sample);
            sums[1U] += std::get<5U>(sample);
            sums[2U] += std::get<6U>(sample);
            const auto v = std::get<3U>(sample) * (1. - std::get<5U>(sample));
            sums[3U] += v;
            sums[4U] += v * (1. + std::get<4U>(sample));
        }
        // The second aggregate is reported as an average: divide its sum by the row count.
        for (auto& entry : expects) {
            auto& agg = entry.second;
            agg.second[1U] /= agg.first;
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Both result sets are ordered by the composite key before they are compared.
        const auto byKey = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };

        std::vector<std::pair<std::pair<std::string, std::string>, std::pair<ui64, std::array<double, 5U>>>> one;
        std::vector<std::pair<std::pair<std::string, std::string>, std::pair<ui64, std::array<double, 5U>>>> two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto listType = pb.NewListType(pb.NewTupleType({
            pb.NewDataType(NUdf::TDataType<ui64>::Id),
            pb.NewDataType(NUdf::TDataType<const char*>::Id),
            pb.NewDataType(NUdf::TDataType<const char*>::Id),
            pb.NewDataType(NUdf::TDataType<double>::Id),
            pb.NewDataType(NUdf::TDataType<double>::Id),
            pb.NewDataType(NUdf::TDataType<double>::Id),
            pb.NewDataType(NUdf::TDataType<double>::Id)
        }));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Stage 1: expand every 7-tuple into a seven-column wide row.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&pb](TRuntimeNode tuple) -> TRuntimeNode::TList { return {pb.Nth(tuple, 0U), pb.Nth(tuple, 1U), pb.Nth(tuple, 2U), pb.Nth(tuple, 3U), pb.Nth(tuple, 4U), pb.Nth(tuple, 5U), pb.Nth(tuple, 6U)}; });

        // Stage 2: keep only rows whose first column does not exceed the border value.
        const auto filtered = pb.WideFilter(wideFlow,
            [&pb](TRuntimeNode::TList row) { return pb.AggrLessOrEqual(row.front(), pb.NewDataLiteral<ui64>(border)); });

        // Stage 3: key is columns 1 and 2; state is {count, five sums}; the output row
        // divides the third state column by the count.
        const auto combined = pb.WideCombiner(filtered, 0ULL,
            [](TRuntimeNode::TList row) -> TRuntimeNode::TList { return {row[1U], row[2U]}; },
            [&pb](TRuntimeNode::TList, TRuntimeNode::TList row) -> TRuntimeNode::TList {
                const auto price = row[3U];
                const auto disco = row[5U];
                const auto v = pb.Mul(price, pb.Sub(pb.NewDataLiteral<double>(1.), disco));
                return {pb.NewDataLiteral<ui64>(1ULL), price, disco, row[6U], v, pb.Mul(v, pb.Add(pb.NewDataLiteral<double>(1.), row[4U]))};
            },
            [&pb](TRuntimeNode::TList, TRuntimeNode::TList row, TRuntimeNode::TList acc) -> TRuntimeNode::TList {
                const auto price = row[3U];
                const auto disco = row[5U];
                const auto v = pb.Mul(price, pb.Sub(pb.NewDataLiteral<double>(1.), disco));
                return {pb.Increment(acc[0U]), pb.AggrAdd(acc[1U], price), pb.AggrAdd(acc[2U], disco), pb.AggrAdd(acc[3U], row[6U]), pb.AggrAdd(acc[4U], v), pb.AggrAdd(acc[5U], pb.Mul(v, pb.Add(pb.NewDataLiteral<double>(1.), row[4U])))};
            },
            [&pb](TRuntimeNode::TList key, TRuntimeNode::TList acc) -> TRuntimeNode::TList { return {key.front(), key.back(), acc[0U], acc[1U], pb.Div(acc[2U], acc[0U]), acc[3U], acc[4U], acc[5U]}; });

        // Stage 4: pack every output row into a tuple and collect into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&pb](TRuntimeNode::TList row) { return pb.NewTuple(row); }));

        const auto graph = setup.BuildGraph(pgmReturn, {list});

        // Feed the samples as seven-element arrays into the graph's single entry point.
        NUdf::TUnboxedValue* slots = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(TpchSamples.size(), slots));
        for (const auto& sample : TpchSamples) {
            NUdf::TUnboxedValue* fields = nullptr;
            *slots++ = graph->GetHolderFactory().CreateDirectArrayHolder(7U, fields);
            fields[0] = NUdf::TUnboxedValuePod(std::get<0U>(sample));
            fields[1] = NUdf::TUnboxedValuePod::Embedded(std::get<1U>(sample));
            fields[2] = NUdf::TUnboxedValuePod::Embedded(std::get<2U>(sample));
            fields[3] = NUdf::TUnboxedValuePod(std::get<3U>(sample));
            fields[4] = NUdf::TUnboxedValuePod(std::get<4U>(sample));
            fields[5] = NUdf::TUnboxedValuePod(std::get<5U>(sample));
            fields[6] = NUdf::TUnboxedValuePod(std::get<6U>(sample));
        }

        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto runEnd = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Read back exactly as many rows as there are distinct keys.
        const auto ptr = value.GetElements();
        for (auto row = ptr, end = ptr + expects.size(); row != end; ++row) {
            const auto elements = row->GetElements();
            two.emplace_back(std::make_pair(elements[0].AsStringRef(), elements[1].AsStringRef()), std::pair<ui64, std::array<double, 5U>>{elements[2].template Get<ui64>(), {elements[3].template Get<double>(), elements[4].template Get<double>(), elements[5].template Get<double>(), elements[6].template Get<double>(), elements[7].template Get<double>()}});
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }
}
#endif
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 29u
Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
    // Runs nine (key, value) string rows through WideLastCombiner keyed by the first column.
    // The state is a four-slot window of optional strings: every update pushes the newest value
    // to the front and shifts the previous slots right. On finish the newest slot is dropped,
    // the remaining optionals are flattened and the survivors are joined with " / ".
    Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsRefCounting) {
        // The WideLastCombinerWithSpilling callable only exists since runtime version 49.
        if (SPILLING && MKQL_RUNTIME_VERSION < 49U) {
            return;
        }

        TSetup<LLVM, SPILLING> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto stringType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto optStringType = pb.NewOptionalType(stringType);
        const auto rowType = pb.NewTupleType({stringType, stringType});

        // Helper: String data literal node.
        const auto str = [&pb](const auto& text) { return pb.NewDataLiteral<NUdf::EDataSlot::String>(text); };
        // Helper: (key, value) tuple node.
        const auto row = [&](TRuntimeNode key, TRuntimeNode value) { return pb.NewTuple(rowType, {key, value}); };

        const auto shortKeyA = str("key one");
        const auto shortKeyB = str("key two");

        const auto longKeyA = str("very long key one");
        const auto longKeyB = str("very long key two");

        const auto val1 = str("very long value 1");
        const auto val2 = str("very long value 2");
        const auto val3 = str("very long value 3");
        const auto val4 = str("very long value 4");
        const auto val5 = str("very long value 5");
        const auto val6 = str("very long value 6");
        const auto val7 = str("very long value 7");
        const auto val8 = str("very long value 8");
        const auto val9 = str("very long value 9");

        const auto row1 = row(shortKeyA, val1);

        const auto row2 = row(shortKeyB, val2);
        const auto row3 = row(shortKeyB, val3);

        const auto row4 = row(longKeyA, val4);

        const auto row5 = row(longKeyB, val5);
        const auto row6 = row(longKeyB, val6);
        const auto row7 = row(longKeyB, val7);
        const auto row8 = row(longKeyB, val8);
        const auto row9 = row(longKeyB, val9);

        const auto input = pb.NewList(rowType, {row1, row2, row3, row4, row5, row6, row7, row8, row9});

        // Tuple -> wide (key, value).
        const auto expand = [&](TRuntimeNode item) -> TRuntimeNode::TList {
            return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
        };
        // Grouping key is the first wide column.
        const auto extractKey = [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items.front()};
        };
        // Initial state: [value, key, none, none], all wrapped as optionals.
        const auto initState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {pb.NewOptional(items.back()), pb.NewOptional(keys.front()), pb.NewEmptyOptional(optStringType), pb.NewEmptyOptional(optStringType)};
        };
        // Shift the window: the new value goes first, the last slot falls off.
        const auto updateState = [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {pb.NewOptional(items.back()), state.front(), state[1U], state[2U]};
        };
        // Drop the newest slot and keep only the filled optionals as a list.
        const auto finishState = [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            state.erase(state.cbegin());
            return {pb.FlatMap(pb.NewList(optStringType, state), [&](TRuntimeNode item) { return item; } )};
        };
        // Join the resulting list of strings with " / ".
        const auto joinWithSlash = [&](TRuntimeNode::TList items) -> TRuntimeNode {
            return pb.Fold1(items.front(),
                [&](TRuntimeNode item) { return item; },
                [&](TRuntimeNode item, TRuntimeNode state) {
                    const auto withSeparator = pb.AggrConcat(state, str(" / "));
                    return pb.AggrConcat(withSeparator, item);
                }
            );
        };

        const auto combined = WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(input), expand),
            extractKey, initState, updateState, finishState);
        const auto pgmReturn = pb.FromFlow(pb.NarrowMap(combined, joinWithSlash));

        const auto graph = setup.BuildGraph(pgmReturn);
        if (SPILLING) {
            graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
        }

        const auto resultStream = graph->GetValue();
        std::unordered_set<TString> expectedRows {
            "key one",
            "very long value 2 / key two",
            "very long key one",
            "very long value 8 / very long value 7 / very long value 6"
        };

        CheckIfStreamHasExpectedStringValues(resultStream, expectedRows);
    }

    // Same nine-row input as the ref-counting test above in the file, but the state stores the
    // raw string nodes directly: [last value, key, first value, first key]. The finish lambda
    // passes the state through untouched and the narrow map joins all four slots with " / ".
    Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsPasstroughtRefCounting) {
        // The WideLastCombinerWithSpilling callable only exists since runtime version 49.
        if (SPILLING && MKQL_RUNTIME_VERSION < 49U) {
            return;
        }

        TSetup<LLVM, SPILLING> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto stringType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto rowType = pb.NewTupleType({stringType, stringType});

        // Helper: String data literal node.
        const auto str = [&pb](const auto& text) { return pb.NewDataLiteral<NUdf::EDataSlot::String>(text); };
        // Helper: (key, value) tuple node.
        const auto row = [&](TRuntimeNode key, TRuntimeNode value) { return pb.NewTuple(rowType, {key, value}); };

        const auto shortKeyA = str("key one");
        const auto shortKeyB = str("key two");

        const auto longKeyA = str("very long key one");
        const auto longKeyB = str("very long key two");

        const auto val1 = str("very long value 1");
        const auto val2 = str("very long value 2");
        const auto val3 = str("very long value 3");
        const auto val4 = str("very long value 4");
        const auto val5 = str("very long value 5");
        const auto val6 = str("very long value 6");
        const auto val7 = str("very long value 7");
        const auto val8 = str("very long value 8");
        const auto val9 = str("very long value 9");

        const auto row1 = row(shortKeyA, val1);

        const auto row2 = row(shortKeyB, val2);
        const auto row3 = row(shortKeyB, val3);

        const auto row4 = row(longKeyA, val4);

        const auto row5 = row(longKeyB, val5);
        const auto row6 = row(longKeyB, val6);
        const auto row7 = row(longKeyB, val7);
        const auto row8 = row(longKeyB, val8);
        const auto row9 = row(longKeyB, val9);

        const auto input = pb.NewList(rowType, {row1, row2, row3, row4, row5, row6, row7, row8, row9});

        // Tuple -> wide (key, value).
        const auto expand = [&](TRuntimeNode item) -> TRuntimeNode::TList {
            return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
        };
        // Grouping key is the first wide column.
        const auto extractKey = [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items.front()};
        };
        // Initial state: [value, key, value, first input column].
        const auto initState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items.back(), keys.front(), items.back(), items.front()};
        };
        // Refresh the first two slots, carry the last two over from the previous state.
        const auto updateState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {items.back(), keys.front(), state[2U], state.back()};
        };
        // The state is emitted as is.
        const auto finishState = [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return state;
        };
        // Join all output columns with " / ".
        const auto joinWithSlash = [&](TRuntimeNode::TList items) -> TRuntimeNode {
            return pb.Fold1(pb.NewList(stringType, items),
                [&](TRuntimeNode item) { return item; },
                [&](TRuntimeNode item, TRuntimeNode state) {
                    const auto withSeparator = pb.AggrConcat(state, str(" / "));
                    return pb.AggrConcat(withSeparator, item);
                }
            );
        };

        const auto combined = WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(input), expand),
            extractKey, initState, updateState, finishState);
        const auto pgmReturn = pb.FromFlow(pb.NarrowMap(combined, joinWithSlash));

        const auto graph = setup.BuildGraph(pgmReturn);
        if (SPILLING) {
            graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
        }

        const auto resultStream = graph->GetValue();
        std::unordered_set<TString> expectedRows {
            "very long value 1 / key one / very long value 1 / key one",
            "very long value 3 / key two / very long value 2 / key two",
            "very long value 4 / very long key one / very long value 4 / very long key one",
            "very long value 9 / very long key two / very long value 5 / very long key two"
        };

        CheckIfStreamHasExpectedStringValues(resultStream, expectedRows);
    }

    // Every input row carries an empty optional in its middle column, and the expand lambda
    // wraps that column in an Unwrap with a "landmine" message. None of the combiner lambdas
    // reads that column, so the expected output can only be produced if it is never evaluated.
    Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedInput) {
        // Test is broken. Remove this if after YQL-18808.
        if (SPILLING) {
            return;
        }

        // The WideLastCombinerWithSpilling callable only exists since runtime version 49.
        if (SPILLING && MKQL_RUNTIME_VERSION < 49U) {
            return;
        }

        TSetup<LLVM, SPILLING> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto stringType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto optStringType = pb.NewOptionalType(stringType);
        const auto rowType = pb.NewTupleType({stringType, optStringType, stringType});

        // Helper: String data literal node.
        const auto str = [&pb](const auto& text) { return pb.NewDataLiteral<NUdf::EDataSlot::String>(text); };

        const auto keyA = str("key one");
        const auto keyB = str("key two");

        const auto val1 = str("value 1");
        const auto val2 = str("value 2");
        const auto val3 = str("value 3");
        const auto val4 = str("value 4");
        const auto val5 = str("value 5");

        const auto blank = str("");

        const auto missing = pb.NewEmptyOptional(optStringType);

        // Helper: (key, <empty optional>, value) tuple node.
        const auto row = [&](TRuntimeNode key, TRuntimeNode value) { return pb.NewTuple(rowType, {key, missing, value}); };

        const auto row1 = row(keyA, val1);
        const auto row2 = row(keyB, val2);
        const auto row3 = row(keyB, val3);
        const auto row4 = row(keyA, val4);
        const auto row5 = row(keyA, val5);
        const auto row6 = row(keyA, val1);
        const auto row7 = row(keyA, val2);
        const auto row8 = row(keyB, val3);
        const auto row9 = row(keyB, val4);

        const auto input = pb.NewList(rowType, {row1, row2, row3, row4, row5, row6, row7, row8, row9});

        const auto landmine = str("ACHTUNG MINEN!");

        // Tuple -> wide (key, unwrapped middle column, value); the middle column is the trap.
        const auto expand = [&](TRuntimeNode item) -> TRuntimeNode::TList {
            return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)};
        };
        // Grouping key is the first wide column.
        const auto extractKey = [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items.front()};
        };
        // Initial state: [value, key, "", ""].
        const auto initState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items.back(), keys.front(), blank, blank};
        };
        // Shift the window: the new value goes first, the last slot falls off.
        const auto updateState = [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {items.back(), state.front(), state[1U], state[2U]};
        };
        // Emit a single list: the key(s) followed by the whole state.
        const auto finishState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            state.insert(state.cbegin(), keys.cbegin(), keys.cend());
            return {pb.NewList(stringType, state)};
        };
        // Join the resulting list of strings with " / ".
        const auto joinWithSlash = [&](TRuntimeNode::TList items) -> TRuntimeNode {
            return pb.Fold1(items.front(),
                [&](TRuntimeNode item) { return item; },
                [&](TRuntimeNode item, TRuntimeNode state) {
                    const auto withSeparator = pb.AggrConcat(state, str(" / "));
                    return pb.AggrConcat(withSeparator, item);
                }
            );
        };

        const auto combined = WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(input), expand),
            extractKey, initState, updateState, finishState);
        const auto pgmReturn = pb.FromFlow(pb.NarrowMap(combined, joinWithSlash));

        const auto graph = setup.BuildGraph(pgmReturn);
        if (SPILLING) {
            graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
        }

        const auto resultStream = graph->GetValue();
        std::unordered_set<TString> expectedRows {
            "key one / value 2 / value 1 / value 5 / value 4",
            "key two / value 4 / value 3 / value 3 / value 2"
        };

        CheckIfStreamHasExpectedStringValues(resultStream, expectedRows);
    }

    // The finish lambda produces two columns: an Unwrap of the accumulated (always empty)
    // optional guarded by a "landmine" message, and "<key>: <values joined by ', '>".
    // The narrow map keeps only the last column, so the expected output can only be produced
    // if the first output column is never evaluated.
    Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedOutput) {
        // The WideLastCombinerWithSpilling callable only exists since runtime version 49.
        if (SPILLING && MKQL_RUNTIME_VERSION < 49U) {
            return;
        }

        TSetup<LLVM, SPILLING> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto stringType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto optStringType = pb.NewOptionalType(stringType);
        const auto rowType = pb.NewTupleType({stringType, optStringType, stringType});

        // Helper: String data literal node.
        const auto str = [&pb](const auto& text) { return pb.NewDataLiteral<NUdf::EDataSlot::String>(text); };

        const auto keyA = str("key one");
        const auto keyB = str("key two");

        const auto val1 = str("value 1");
        const auto val2 = str("value 2");
        const auto val3 = str("value 3");
        const auto val4 = str("value 4");
        const auto val5 = str("value 5");

        // Not referenced by the program below; kept so the builder sees the same literals.
        const auto empty = str("");

        const auto missing = pb.NewEmptyOptional(optStringType);

        // Helper: (key, <empty optional>, value) tuple node.
        const auto row = [&](TRuntimeNode key, TRuntimeNode value) { return pb.NewTuple(rowType, {key, missing, value}); };

        const auto row1 = row(keyA, val1);
        const auto row2 = row(keyB, val2);
        const auto row3 = row(keyB, val3);
        const auto row4 = row(keyA, val4);
        const auto row5 = row(keyA, val5);
        const auto row6 = row(keyA, val1);
        const auto row7 = row(keyA, val2);
        const auto row8 = row(keyB, val3);
        const auto row9 = row(keyB, val4);

        const auto input = pb.NewList(rowType, {row1, row2, row3, row4, row5, row6, row7, row8, row9});

        const auto landmine = str("ACHTUNG MINEN!");

        // Tuple -> wide (key, optional, value).
        const auto expand = [&](TRuntimeNode item) -> TRuntimeNode::TList {
            return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)};
        };
        // Grouping key is the first wide column.
        const auto extractKey = [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items.front()};
        };
        // Initial state: [optional column, value].
        const auto initState = [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items[1U], items.back()};
        };
        // Concatenate optionals in slot 0 and append ", <value>" to slot 1.
        const auto updateState = [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {pb.Concat(state.front(), items[1U]), pb.AggrConcat(pb.AggrConcat(state.back(), str(", ")), items.back())};
        };
        // Column 0 is the trap; column 1 is "<key>: <accumulated values>".
        const auto finishState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {pb.Unwrap(state.front(), landmine, __FILE__, __LINE__, 0),  pb.AggrConcat(pb.AggrConcat(keys.front(), str(": ")), state.back())};
        };
        // Only the last output column is consumed.
        const auto takeLast = [&](TRuntimeNode::TList items) -> TRuntimeNode {
            return items.back();
        };

        const auto combined = WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(input), expand),
            extractKey, initState, updateState, finishState);
        const auto pgmReturn = pb.FromFlow(pb.NarrowMap(combined, takeLast));

        const auto graph = setup.BuildGraph(pgmReturn);
        if (SPILLING) {
            graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
        }

        const auto resultStream = graph->GetValue();
        std::unordered_set<TString> expectedRows {
            "key one: value 1, value 4, value 5, value 1, value 2",
            "key two: value 2, value 3, value 3, value 4"
        };

        CheckIfStreamHasExpectedStringValues(resultStream, expectedRows);
    }

    // Degenerate program: the input is a list of four empty tuples and every lambda is "thin",
    // i.e. it produces no columns or forwards its arguments unchanged. The test asserts that
    // the very first Fetch on the resulting stream reports Finish.
    Y_UNIT_TEST_LLVM_SPILLING(TestThinAllLambdas) {
        // The WideLastCombinerWithSpilling callable only exists since runtime version 49.
        if (SPILLING && MKQL_RUNTIME_VERSION < 49U) {
            return;
        }

        TSetup<LLVM, SPILLING> setup;
        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto emptyTupleType = pb.NewTupleType({});
        const auto emptyTuple = pb.NewTuple({});

        const auto input = pb.NewList(emptyTupleType, {emptyTuple, emptyTuple, emptyTuple, emptyTuple});

        // Expands every item into zero wide columns.
        const auto expand = [](TRuntimeNode) -> TRuntimeNode::TList { return {}; };
        // Keys are the (empty) set of input columns.
        const auto extractKey = [](TRuntimeNode::TList items) { return items; };
        // The initial state mirrors the input columns.
        const auto initState = [](TRuntimeNode::TList, TRuntimeNode::TList items) { return items; };
        // Updating keeps the state untouched.
        const auto updateState = [](TRuntimeNode::TList, TRuntimeNode::TList, TRuntimeNode::TList state) { return state; };
        // Finishing emits the state as is.
        const auto finishState = [](TRuntimeNode::TList, TRuntimeNode::TList state) { return state; };
        // Every output row is narrowed back to an empty tuple.
        const auto narrow = [&](TRuntimeNode::TList) { return pb.NewTuple({}); };

        const auto combined = WideLastCombiner<SPILLING>(pb, pb.ExpandMap(pb.ToFlow(input), expand),
            extractKey, initState, updateState, finishState);
        const auto pgmReturn = pb.FromFlow(pb.NarrowMap(combined, narrow));

        const auto graph = setup.BuildGraph(pgmReturn);
        const auto resultStream = graph->GetValue();

        NUdf::TUnboxedValue fetched;
        const auto status = resultStream.Fetch(fetched);
        UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish);
    }

    // Feeds 8 * 128 distinct ui64 keys (each with value 1) through WideLastCombinerWithSpilling
    // backed by a mock spiller factory, drains the resulting stream, and then checks that
    //  - exactly one spiller was created,
    //  - the spiller received exactly `expectedBucketsCount` Put calls,
    //  - none of those Put calls had size zero.
    Y_UNIT_TEST_LLVM(TestSpillingBucketsDistribution) {
        // callable WideLastCombinerWithSpilling was introduced in 49 version of runtime;
        // this test always uses it, so skip on older runtimes like the other spilling tests do.
        if (MKQL_RUNTIME_VERSION < 49U) return;

        const size_t expectedBucketsCount = 128;
        const size_t sampleSize = 8 * 128;

        TSetup<LLVM, true> setup;

        // Keys form the sequence 65, 129, 193, ... (the counter is bumped by 64 before use);
        // every sample carries the value 1.
        std::vector<std::pair<ui64, ui64>> samples(sampleSize);
        std::generate(samples.begin(), samples.end(), [key = (ui64)1] () mutable -> std::pair<ui64, ui64> {
            key += 64;
            return {key, 1};
        });

        TProgramBuilder& pb = *setup.PgmBuilder;

        // The input list is an external entry point ("TestList") filled in after the graph is built.
        const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<ui64>::Id), pb.NewDataType(NUdf::TDataType<ui64>::Id)}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Group by the first column and sum the second one; emit (key, sum) tuples.
        const auto pgmReturn = pb.FromFlow(pb.NarrowMap(pb.WideLastCombinerWithSpilling(pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&](TRuntimeNode item) -> TRuntimeNode::TList { return { pb.Nth(item, 0U), pb.Nth(item, 1U) }; }),
            [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
            [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.back()}; },
            [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.AggrAdd(state.front(), items.back())}; },
            [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {keys.front(), state.front()}; }),
            [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); }
        ));

        // Keep a handle to the factory so the created spillers can be inspected afterwards.
        const auto spillerFactory = std::make_shared<TMockSpillerFactory>();
        const auto graph = setup.BuildGraph(pgmReturn, {list});
        graph->GetContext().SpillerFactory = spillerFactory;

        // Materialize the samples as a list of two-element arrays and bind it to the entry point.
        NUdf::TUnboxedValue* items = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(samples.size(), items));
        for (const auto& sample : samples) {
            NUdf::TUnboxedValue* pair = nullptr;
            *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
            pair[0] = NUdf::TUnboxedValuePod(sample.first);
            pair[1] = NUdf::TUnboxedValuePod(sample.second);
        }

        const auto& value = graph->GetValue();

        // Drain the stream completely; the fetched items themselves are not inspected.
        NUdf::TUnboxedValue item;
        while (value.Fetch(item) != NUdf::EFetchStatus::Finish) {
            ;
        }

        UNIT_ASSERT_EQUAL_C(spillerFactory->GetCreatedSpillers().size(), 1, "WideLastCombiner expected to create one spiller ");
        const auto wideCombinerSpiller = std::dynamic_pointer_cast<TMockSpiller>(spillerFactory->GetCreatedSpillers()[0]);
        UNIT_ASSERT_C(wideCombinerSpiller != nullptr, "MockSpillerFactory expected to create only MockSpillers");

        auto flushedBucketsSizes = wideCombinerSpiller->GetPutSizes();
        UNIT_ASSERT_EQUAL_C(flushedBucketsSizes.size(), expectedBucketsCount, "Spiller doesn't Put expected number of buckets");

        auto anyEmpty = std::any_of(flushedBucketsSizes.begin(), flushedBucketsSizes.end(), [](size_t size) { return size == 0; });
        UNIT_ASSERT_C(!anyEmpty, "Spiller flushed empty bucket");
    }
}

Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerPerfTest) {
    // Sums the double component of I8Samples separately for positive and non-positive values,
    // once in plain C++ and once through WideLastCombiner keyed by `value > 0.0`, then checks
    // both group sums match and prints the two timings.
    Y_UNIT_TEST_LLVM(TestSumDoubleBooleanKeys) {
        TSetup<LLVM> setup;

        // Reference computation in plain C++ (timed).
        double positive = 0.0;
        double negative = 0.0;
        const auto cppStart = TInstant::Now();
        for (const auto& sample : I8Samples) {
            if (sample.second > 0.0) {
                positive += sample.second;
            } else {
                negative += sample.second;
            }
        }
        const auto cppTime = TInstant::Now() - cppStart;

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto inputType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
        const auto inputList = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", inputType).Build();

        // Each double becomes a single wide column.
        const auto expand = [&](TRuntimeNode item) -> TRuntimeNode::TList {
            return {item};
        };
        // Boolean key: is the value greater than zero?
        const auto extractKey = [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {pb.AggrGreater(items.front(), pb.NewDataLiteral(0.0))};
        };
        // The state starts as the first value of the group.
        const auto initState = [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return items;
        };
        // Running sum.
        const auto updateState = [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {pb.AggrAdd(state.front(), items.front())};
        };
        // Emit the sum only.
        const auto finishState = [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return state;
        };
        const auto narrow = [&](TRuntimeNode::TList items) { return items.front(); };

        const auto combined = pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(TRuntimeNode(inputList, false)), expand),
            extractKey, initState, updateState, finishState);
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined, narrow));

        const auto graph = setup.BuildGraph(pgmReturn, {inputList});

        // Bind the samples' double components to the "TestList" entry point.
        NUdf::TUnboxedValue* cells = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), cells));
        for (const auto& sample : I8Samples) {
            *cells++ = ToValue<double>(sample.second);
        }

        // Timed section: evaluate the graph and read both group sums.
        const auto t1 = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto first = value.GetElement(0);
        const auto second = value.GetElement(1);
        const auto t2 = TInstant::Now();

        // The order of the two groups is not fixed; use the sign of the first sum to tell them apart.
        const bool positiveComesFirst = first.template Get<double>() > 0.0;
        if (positiveComesFirst) {
            UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), positive);
            UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), negative);
        } else {
            UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), negative);
            UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), positive);
        }

        Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
    }

    // Computes sum / min / max of the double component of I8Samples separately for positive
    // and non-positive values, once in plain C++ and once through WideLastCombiner keyed by
    // `value > 0.0`, then checks that all six aggregates match and prints the two timings.
    Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleBooleanKeys) {
        TSetup<LLVM> setup;

        // Reference computation in plain C++ (timed).
        double pSum = 0.0;
        double nSum = 0.0;
        double pMax = 0.0;
        double nMax = -1000.0;
        double pMin = 1000.0;
        double nMin = 0.0;
        const auto cppStart = TInstant::Now();
        for (const auto& sample : I8Samples) {
            const bool isPositive = sample.second > 0.0;
            if (isPositive) {
                pSum += sample.second;
                pMax = std::max(pMax, sample.second);
                pMin = std::min(pMin, sample.second);
            } else {
                nSum += sample.second;
                nMax = std::max(nMax, sample.second);
                nMin = std::min(nMin, sample.second);
            }
        }

        const auto cppTime = TInstant::Now() - cppStart;

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto inputType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
        const auto inputList = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", inputType).Build();

        // Each double becomes a single wide column.
        const auto expand = [&](TRuntimeNode item) -> TRuntimeNode::TList {
            return {item};
        };
        // Boolean key: is the value greater than zero?
        const auto extractKey = [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {pb.AggrGreater(items.front(), pb.NewDataLiteral(0.0))};
        };
        // State is [sum, min, max], all seeded with the first value of the group.
        const auto initState = [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items.front(), items.front(), items.front()};
        };
        const auto updateState = [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {pb.AggrAdd(state.front(), items.front()), pb.AggrMin(state[1U], items.front()), pb.AggrMax(state.back(), items.back()) };
        };
        // Emit the state as is.
        const auto finishState = [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return state;
        };
        const auto narrow = [&](TRuntimeNode::TList items) { return pb.NewTuple(items); };

        const auto combined = pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(TRuntimeNode(inputList, false)), expand),
            extractKey, initState, updateState, finishState);
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined, narrow));

        const auto graph = setup.BuildGraph(pgmReturn, {inputList});

        // Bind the samples' double components to the "TestList" entry point.
        NUdf::TUnboxedValue* cells = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), cells));
        for (const auto& sample : I8Samples) {
            *cells++ = ToValue<double>(sample.second);
        }

        // Timed section: evaluate the graph and pick both result tuples.
        const auto t1 = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto first = value.GetElement(0);
        const auto second = value.GetElement(1);
        const auto t2 = TInstant::Now();

        // The order of the two groups is not fixed; use the sign of the first sum to tell them apart.
        const bool positiveComesFirst = first.GetElement(0).template Get<double>() > 0.0;
        if (positiveComesFirst) {
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), pSum);
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), pMin);
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), pMax);

            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), nSum);
            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), nMin);
            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), nMax);
        } else {
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), nSum);
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), nMin);
            UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), nMax);

            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), pSum);
            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), pMin);
            UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), pMax);
        }

        Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
    }

    // Sums the double component of I8Samples per i8 key, once with std::unordered_map and once
    // through WideLastCombiner, then compares the two results after sorting both by key and
    // prints the two timings.
    Y_UNIT_TEST_LLVM(TestSumDoubleSmallKey) {
        TSetup<LLVM> setup;

        // Reference computation in plain C++ (timed).
        std::unordered_map<i8, double> expects(201);
        const auto cppStart = TInstant::Now();
        for (const auto& sample : I8Samples) {
            const auto slot = expects.emplace(sample.first, 0.0).first;
            slot->second += sample.second;
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Orders (key, sum) pairs by key only.
        const auto byKey = [](const std::pair<i8, double>& lhs, const std::pair<i8, double>& rhs) {
            return lhs.first < rhs.first;
        };

        std::vector<std::pair<i8, double>> one;
        std::vector<std::pair<i8, double>> two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        const auto inputType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
        const auto inputList = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", inputType).Build();

        // Tuple -> wide (key, value).
        const auto expand = [&](TRuntimeNode item) -> TRuntimeNode::TList {
            return { pb.Nth(item, 0U), pb.Nth(item, 1U) };
        };
        // Grouping key is the first wide column.
        const auto extractKey = [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items.front()};
        };
        // The state starts as the first value of the group.
        const auto initState = [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
            return {items.back()};
        };
        // Running sum.
        const auto updateState = [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {pb.AggrAdd(state.front(), items.back())};
        };
        // Emit (key, sum).
        const auto finishState = [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList {
            return {keys.front(), state.front()};
        };
        const auto narrow = [&](TRuntimeNode::TList items) { return pb.NewTuple(items); };

        const auto combined = pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(TRuntimeNode(inputList, false)), expand),
            extractKey, initState, updateState, finishState);
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined, narrow));

        const auto graph = setup.BuildGraph(pgmReturn, {inputList});

        // Bind the samples, as two-element arrays, to the "TestList" entry point.
        NUdf::TUnboxedValue* rows = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), rows));
        for (const auto& sample : I8Samples) {
            NUdf::TUnboxedValue* cells = nullptr;
            *rows++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, cells);
            cells[0] = NUdf::TUnboxedValuePod(sample.first);
            cells[1] = NUdf::TUnboxedValuePod(sample.second);
        }

        // Timed section: evaluate the graph.
        const auto t1 = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto t2 = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Copy the (key, sum) tuples out of the result list.
        const auto resultRows = value.GetElements();
        const size_t rowCount = expects.size();
        for (size_t idx = 0ULL; idx < rowCount; ++idx) {
            two.emplace_back(resultRows[idx].GetElement(0).template Get<i8>(), resultRows[idx].GetElement(1).template Get<double>());
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
    }

    // Groups I8Samples by their i8 key and aggregates the double payload into {sum, min, max}
    // with WideLastCombiner, then compares the output with a reference computed in plain C++.
    Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleSmallKey) {
        TSetup<LLVM> setup;

        // Reference aggregation; each entry holds {sum, min, max} for one key.
        std::unordered_map<i8, std::array<double, 3U>> expects(201);
        const auto t = TInstant::Now();
        for (const auto& sample : I8Samples) {
            // The max slot is seeded with lowest(): numeric_limits<double>::min() is the smallest
            // positive normal value and would win over any zero or negative sample.
            auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, std::numeric_limits<double>::max(), std::numeric_limits<double>::lowest()}).first->second;
            std::get<0U>(item) += sample.second;
            std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
            std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
        }
        const auto cppTime = TInstant::Now() - t;

        // Both sides are sorted by key before they are compared.
        const auto byKey = [](const auto& l, const auto& r) { return l.first < r.first; };

        std::vector<std::pair<i8, std::array<double, 3U>>> one, two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        // Input: list of (i8 key, double value) tuples supplied through the "TestList" entry point.
        const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            // Tuple -> wide row {key, value}.
            [&](TRuntimeNode item) -> TRuntimeNode::TList { return { pb.Nth(item, 0U), pb.Nth(item, 1U) }; }),
            // Key: the first column.
            [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
            // Initial state: {sum, min, max} all start from the first value.
            [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.back(), items.back(), items.back()}; },
            // Updated state: fold the next value into sum, min and max.
            [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.AggrAdd(state.front(), items.back()), pb.AggrMin(state[1U], items.back()), pb.AggrMax(state.back(), items.back())}; },
            // Output row: key followed by the three state columns.
            [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList { state.insert(state.cbegin(), keys.front()); return state; }),
            // Wide row -> tuple.
            [&](TRuntimeNode::TList items) { return pb.NewTuple(items); }
        ));

        // Feed the samples into the graph as an array of 2-element arrays.
        const auto graph = setup.BuildGraph(pgmReturn, {list});
        NUdf::TUnboxedValue* items = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
        for (const auto& sample : I8Samples) {
            NUdf::TUnboxedValue* pair = nullptr;
            *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
            pair[0] = NUdf::TUnboxedValuePod(sample.first);
            pair[1] = NUdf::TUnboxedValuePod(sample.second);
        }

        // Only the graph evaluation itself is timed.
        const auto t1 = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto t2 = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Each result row is (key, sum, min, max).
        const auto ptr = value.GetElements();
        for (size_t i = 0ULL; i < expects.size(); ++i) {
            two.emplace_back(ptr[i].GetElement(0).template Get<i8>(), std::array<double, 3U>{ptr[i].GetElement(1).template Get<double>(), ptr[i].GetElement(2).template Get<double>(), ptr[i].GetElement(3).template Get<double>()});
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
    }

    // Sums double values grouped by a string key through WideLastCombiner and checks the
    // outcome against a reference built with std::unordered_map.
    Y_UNIT_TEST_LLVM(TestSumDoubleStringKey) {
        TSetup<LLVM> setup;

        // I8Samples with the numeric key rendered as text.
        std::vector<std::pair<std::string, double>> samples(I8Samples.size());
        std::transform(I8Samples.cbegin(), I8Samples.cend(), samples.begin(),
            [](std::pair<i8, double> src) { return std::make_pair(ToString(src.first), src.second); });

        // Reference sums computed in plain C++; the elapsed time is reported at the end.
        std::unordered_map<std::string, double> expects(201);
        const auto cppStart = TInstant::Now();
        for (const auto& sample : samples) {
            const auto slot = expects.emplace(sample.first, 0.0).first;
            slot->second += sample.second;
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Both sides are ordered by key before the comparison.
        const auto byKey = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };

        using TRow = std::pair<std::string_view, double>;
        std::vector<TRow> one, two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        // Input: list of (string key, double value) tuples supplied through "TestList".
        const auto keyType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto valueType = pb.NewDataType(NUdf::TDataType<double>::Id);
        const auto listType = pb.NewListType(pb.NewTupleType({keyType, valueType}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Tuple -> wide row {key, value}.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&](TRuntimeNode row) -> TRuntimeNode::TList { return {pb.Nth(row, 0U), pb.Nth(row, 1U)}; });

        const auto combined = pb.WideLastCombiner(wideFlow,
            // Key: the first column.
            [&](TRuntimeNode::TList columns) -> TRuntimeNode::TList { return {columns.front()}; },
            // Initial state: the first value seen for the key.
            [&](TRuntimeNode::TList, TRuntimeNode::TList columns) -> TRuntimeNode::TList { return {columns.back()}; },
            // Updated state: running sum plus the next value.
            [&](TRuntimeNode::TList, TRuntimeNode::TList columns, TRuntimeNode::TList state) -> TRuntimeNode::TList {
                return {pb.AggrAdd(state.front(), columns.back())};
            },
            // Output row: key followed by the sum.
            [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {keys.front(), state.front()}; });

        // Wide row -> tuple, gathered into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&](TRuntimeNode::TList columns) { return pb.NewTuple(columns); }));

        // Feed the samples into the graph as an array of 2-element arrays.
        const auto graph = setup.BuildGraph(pgmReturn, {list});
        NUdf::TUnboxedValue* rows = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(samples.size(), rows));
        for (size_t i = 0U; i < samples.size(); ++i) {
            NUdf::TUnboxedValue* fields = nullptr;
            rows[i] = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
            fields[0] = NUdf::TUnboxedValuePod::Embedded(samples[i].first);
            fields[1] = NUdf::TUnboxedValuePod(samples[i].second);
        }

        // Only the graph evaluation itself is timed.
        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto runEnd = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Each result row is (key, sum).
        const auto resultRows = value.GetElements();
        for (auto row = resultRows; row != resultRows + expects.size(); ++row) {
            two.emplace_back(row->GetElements()->AsStringRef(), row->GetElement(1).template Get<double>());
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }

    // Aggregates {sum, min, max} of double values grouped by a string key through WideLastCombiner
    // and verifies the output against a reference built with std::unordered_map.
    Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleStringKey) {
        TSetup<LLVM> setup;

        using TStats = std::array<double, 3U>;
        using TRow = std::pair<std::string_view, TStats>;

        // I8Samples with the numeric key rendered as text.
        std::vector<std::pair<std::string, double>> samples(I8Samples.size());
        std::transform(I8Samples.cbegin(), I8Samples.cend(), samples.begin(),
            [](std::pair<i8, double> src) { return std::make_pair(ToString(src.first), src.second); });

        // Reference aggregation; each entry is {sum, min, max}, with min/max seeded by +1E7 / -1E7.
        std::unordered_map<std::string, TStats> expects(201);
        const auto cppStart = TInstant::Now();
        for (const auto& sample : samples) {
            const auto slot = expects.emplace(sample.first, TStats{0.0, +1E7, -1E7}).first;
            auto& stats = slot->second;
            stats[0U] += sample.second;
            stats[1U] = std::min(stats[1U], sample.second);
            stats[2U] = std::max(stats[2U], sample.second);
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Both sides are ordered by key before the comparison.
        const auto byKey = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };

        std::vector<TRow> one, two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        // Input: list of (string key, double value) tuples supplied through "TestList".
        const auto keyType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
        const auto valueType = pb.NewDataType(NUdf::TDataType<double>::Id);
        const auto listType = pb.NewListType(pb.NewTupleType({keyType, valueType}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Tuple -> wide row {key, value}.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&](TRuntimeNode row) -> TRuntimeNode::TList { return {pb.Nth(row, 0U), pb.Nth(row, 1U)}; });

        const auto combined = pb.WideLastCombiner(wideFlow,
            // Key: the first column.
            [&](TRuntimeNode::TList columns) -> TRuntimeNode::TList { return {columns.front()}; },
            // Initial state: {sum, min, max} all start from the first value.
            [&](TRuntimeNode::TList, TRuntimeNode::TList columns) -> TRuntimeNode::TList {
                return {columns.back(), columns.back(), columns.back()};
            },
            // Updated state: fold the next value into sum, min and max.
            [&](TRuntimeNode::TList, TRuntimeNode::TList columns, TRuntimeNode::TList state) -> TRuntimeNode::TList {
                return {
                    pb.AggrAdd(state.front(), columns.back()),
                    pb.AggrMin(state[1U], columns.back()),
                    pb.AggrMax(state.back(), columns.back())
                };
            },
            // Output row: key followed by the three state columns.
            [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList {
                return {keys.front(), state[0U], state[1U], state[2U]};
            });

        // Wide row -> tuple, gathered into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&](TRuntimeNode::TList columns) { return pb.NewTuple(columns); }));

        // Feed the samples into the graph as an array of 2-element arrays.
        const auto graph = setup.BuildGraph(pgmReturn, {list});
        NUdf::TUnboxedValue* rows = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(samples.size(), rows));
        for (size_t i = 0U; i < samples.size(); ++i) {
            NUdf::TUnboxedValue* fields = nullptr;
            rows[i] = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
            fields[0] = NUdf::TUnboxedValuePod::Embedded(samples[i].first);
            fields[1] = NUdf::TUnboxedValuePod(samples[i].second);
        }

        // Only the graph evaluation itself is timed.
        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto runEnd = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Each result row is (key, sum, min, max).
        const auto resultRows = value.GetElements();
        for (auto row = resultRows; row != resultRows + expects.size(); ++row) {
            const auto fields = row->GetElements();
            two.emplace_back(fields->AsStringRef(), TStats{fields[1].template Get<double>(), fields[2].template Get<double>(), fields[3].template Get<double>()});
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }

    // Aggregates {sum, min, max} of double values grouped by a composite (ui32, string) key through
    // WideLastCombiner and checks the output against a reference built with std::unordered_map.
    Y_UNIT_TEST_LLVM(TestMinMaxSumTupleKey) {
        TSetup<LLVM> setup;

        using TKey = std::pair<ui32, std::string>;
        using TStats = std::array<double, 3U>;

        // Every ui16 key of Ui16Samples is split into (key / 10 % 100, last decimal digit as text).
        std::vector<std::pair<TKey, double>> samples(Ui16Samples.size());
        std::transform(Ui16Samples.cbegin(), Ui16Samples.cend(), samples.begin(),
            [](std::pair<ui16, double> src) {
                return std::make_pair(std::make_pair(ui32(src.first / 10U % 100U), ToString(src.first % 10U)), src.second);
            });

        // Hash for the composite key: combines the hashes of both components.
        struct TPairHash {
            size_t operator()(const std::pair<ui32, std::string>& p) const {
                return CombineHashes(std::hash<ui32>()(p.first), std::hash<std::string_view>()(p.second));
            }
        };

        // Reference aggregation; each entry is {sum, min, max}, with min/max seeded by +1E7 / -1E7.
        std::unordered_map<TKey, TStats, TPairHash> expects;
        const auto cppStart = TInstant::Now();
        for (const auto& sample : samples) {
            const auto slot = expects.emplace(sample.first, TStats{0.0, +1E7, -1E7}).first;
            auto& stats = slot->second;
            stats[0U] += sample.second;
            stats[1U] = std::min(stats[1U], sample.second);
            stats[2U] = std::max(stats[2U], sample.second);
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Both sides are ordered by the composite key before the comparison.
        const auto byKey = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };

        std::vector<std::pair<TKey, TStats>> one, two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        // Input: list of ((ui32, string), double) tuples supplied through "TestList".
        const auto keyType = pb.NewTupleType({pb.NewDataType(NUdf::TDataType<ui32>::Id), pb.NewDataType(NUdf::TDataType<const char*>::Id)});
        const auto valueType = pb.NewDataType(NUdf::TDataType<double>::Id);
        const auto listType = pb.NewListType(pb.NewTupleType({keyType, valueType}));
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Nested tuple -> wide row {key part 0, key part 1, value}.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&](TRuntimeNode row) -> TRuntimeNode::TList {
                return {pb.Nth(pb.Nth(row, 0U), 0U), pb.Nth(pb.Nth(row, 0U), 1U), pb.Nth(row, 1U)};
            });

        const auto combined = pb.WideLastCombiner(wideFlow,
            // Key: the first two columns.
            [&](TRuntimeNode::TList columns) -> TRuntimeNode::TList { return {columns.front(), columns[1U]}; },
            // Initial state: {sum, min, max} all start from the first value.
            [&](TRuntimeNode::TList, TRuntimeNode::TList columns) -> TRuntimeNode::TList {
                return {columns.back(), columns.back(), columns.back()};
            },
            // Updated state: fold the next value into sum, min and max.
            [&](TRuntimeNode::TList, TRuntimeNode::TList columns, TRuntimeNode::TList state) -> TRuntimeNode::TList {
                return {
                    pb.AggrAdd(state.front(), columns.back()),
                    pb.AggrMin(state[1U], columns.back()),
                    pb.AggrMax(state.back(), columns.back())
                };
            },
            // Output row: both key parts followed by the three state columns.
            [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList {
                return {keys.front(), keys.back(), state.front(), state[1U], state.back()};
            });

        // Wide row -> ((key part 0, key part 1), sum, min, max), gathered into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&](TRuntimeNode::TList columns) {
                return pb.NewTuple({pb.NewTuple({columns[0U], columns[1U]}), columns[2U], columns[3U], columns[4U]});
            }));

        // Feed the samples into the graph: each row is [[ui32, string], double].
        const auto graph = setup.BuildGraph(pgmReturn, {list});
        NUdf::TUnboxedValue* rows = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(samples.size(), rows));
        for (size_t i = 0U; i < samples.size(); ++i) {
            const auto& sample = samples[i];
            NUdf::TUnboxedValue* fields = nullptr;
            rows[i] = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
            NUdf::TUnboxedValue* keyFields = nullptr;
            fields[0] = graph->GetHolderFactory().CreateDirectArrayHolder(2U, keyFields);
            keyFields[0] = NUdf::TUnboxedValuePod(sample.first.first);
            keyFields[1] = NUdf::TUnboxedValuePod::Embedded(sample.first.second);
            fields[1] = NUdf::TUnboxedValuePod(sample.second);
        }

        // Only the graph evaluation itself is timed.
        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto runEnd = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Each result row is ((ui32, string), sum, min, max).
        const auto resultRows = value.GetElements();
        for (auto row = resultRows; row != resultRows + expects.size(); ++row) {
            const auto fields = row->GetElements();
            const auto& key = fields[0];
            two.emplace_back(
                std::make_pair(key.GetElement(0).template Get<ui32>(), (key.GetElements()[1]).AsStringRef()),
                TStats{fields[1].template Get<double>(), fields[2].template Get<double>(), fields[3].template Get<double>()});
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }

    // Runs a filter + multi-column aggregation over TpchSamples (rows whose first column is <= border,
    // grouped by columns 1 and 2) through WideFilter/WideLastCombiner and compares the output with
    // the same aggregation done in plain C++.
    Y_UNIT_TEST_LLVM(TestTpch) {
        TSetup<LLVM> setup;

        using TKey = std::pair<std::string_view, std::string_view>;
        using TStats = std::pair<ui64, std::array<double, 5U>>;

        // Hash for the two-string key: combines the hashes of both components.
        struct TPairHash {
            size_t operator()(const std::pair<std::string_view, std::string_view>& p) const {
                return CombineHashes(std::hash<std::string_view>()(p.first), std::hash<std::string_view>()(p.second));
            }
        };

        // Reference aggregation: per key a row count plus five running sums.
        std::unordered_map<TKey, TStats, TPairHash> expects;
        const auto cppStart = TInstant::Now();
        for (const auto& sample : TpchSamples) {
            if (!(std::get<0U>(sample) <= border)) {
                continue;
            }

            const auto slot = expects.emplace(TKey{std::get<1U>(sample), std::get<2U>(sample)}, TStats{0ULL, {0., 0., 0., 0., 0.}}).first;
            auto& stats = slot->second;
            auto& sums = stats.second;
            ++stats.first;
            sums[0U] += std::get<3U>(sample);
            sums[1U] += std::get<5U>(sample);
            sums[2U] += std::get<6U>(sample);
            const auto v = std::get<3U>(sample) * (1. - std::get<5U>(sample));
            sums[3U] += v;
            sums[4U] += v * (1. + std::get<4U>(sample));
        }
        // The second sum is reported as an average over the rows of the group.
        for (auto& entry : expects) {
            entry.second.second[1U] /= entry.second.first;
        }
        const auto cppTime = TInstant::Now() - cppStart;

        // Both sides are ordered by key before the comparison.
        const auto byKey = [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; };

        std::vector<std::pair<std::pair<std::string, std::string>, TStats>> one, two;
        one.reserve(expects.size());
        two.reserve(expects.size());

        one.insert(one.cend(), expects.cbegin(), expects.cend());
        std::sort(one.begin(), one.end(), byKey);

        TProgramBuilder& pb = *setup.PgmBuilder;

        // Input: list of 7-column tuples (ui64, string, string, double x4) supplied through "TestList".
        const auto rowType = pb.NewTupleType({
            pb.NewDataType(NUdf::TDataType<ui64>::Id),
            pb.NewDataType(NUdf::TDataType<const char*>::Id),
            pb.NewDataType(NUdf::TDataType<const char*>::Id),
            pb.NewDataType(NUdf::TDataType<double>::Id),
            pb.NewDataType(NUdf::TDataType<double>::Id),
            pb.NewDataType(NUdf::TDataType<double>::Id),
            pb.NewDataType(NUdf::TDataType<double>::Id)
        });
        const auto listType = pb.NewListType(rowType);
        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();

        // Tuple -> wide row of all seven columns.
        const auto wideFlow = pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
            [&](TRuntimeNode row) -> TRuntimeNode::TList {
                TRuntimeNode::TList columns;
                columns.reserve(7U);
                for (ui32 index = 0U; index < 7U; ++index) {
                    columns.emplace_back(pb.Nth(row, index));
                }
                return columns;
            });

        // Keep only rows whose first column does not exceed the border.
        const auto filtered = pb.WideFilter(wideFlow,
            [&](TRuntimeNode::TList columns) { return pb.AggrLessOrEqual(columns.front(), pb.NewDataLiteral<ui64>(border)); });

        const auto combined = pb.WideLastCombiner(filtered,
            // Key: columns 1 and 2.
            [&](TRuntimeNode::TList columns) -> TRuntimeNode::TList { return {columns[1U], columns[2U]}; },
            // Initial state: count of 1 plus the five per-row terms.
            [&](TRuntimeNode::TList, TRuntimeNode::TList columns) -> TRuntimeNode::TList {
                const auto price = columns[3U];
                const auto disco = columns[5U];
                const auto reduced = pb.Mul(price, pb.Sub(pb.NewDataLiteral<double>(1.), disco));
                return {
                    pb.NewDataLiteral<ui64>(1ULL),
                    price,
                    disco,
                    columns[6U],
                    reduced,
                    pb.Mul(reduced, pb.Add(pb.NewDataLiteral<double>(1.), columns[4U]))
                };
            },
            // Updated state: increment the count and add the per-row terms to the running sums.
            [&](TRuntimeNode::TList, TRuntimeNode::TList columns, TRuntimeNode::TList state) -> TRuntimeNode::TList {
                const auto price = columns[3U];
                const auto disco = columns[5U];
                const auto reduced = pb.Mul(price, pb.Sub(pb.NewDataLiteral<double>(1.), disco));
                return {
                    pb.Increment(state[0U]),
                    pb.AggrAdd(state[1U], price),
                    pb.AggrAdd(state[2U], disco),
                    pb.AggrAdd(state[3U], columns[6U]),
                    pb.AggrAdd(state[4U], reduced),
                    pb.AggrAdd(state[5U], pb.Mul(reduced, pb.Add(pb.NewDataLiteral<double>(1.), columns[4U])))
                };
            },
            // Output row: key, count, sums; the third state column is divided by the count.
            [&](TRuntimeNode::TList key, TRuntimeNode::TList state) -> TRuntimeNode::TList {
                return {key.front(), key.back(), state[0U], state[1U], pb.Div(state[2U], state[0U]), state[3U], state[4U], state[5U]};
            });

        // Wide row -> tuple, gathered into a list.
        const auto pgmReturn = pb.Collect(pb.NarrowMap(combined,
            [&](TRuntimeNode::TList columns) { return pb.NewTuple(columns); }));

        // Feed every sample (filtering happens inside the graph) as a 7-element array.
        const auto graph = setup.BuildGraph(pgmReturn, {list});
        NUdf::TUnboxedValue* rows = nullptr;
        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(TpchSamples.size(), rows));
        size_t rowIndex = 0U;
        for (const auto& sample : TpchSamples) {
            NUdf::TUnboxedValue* fields = nullptr;
            rows[rowIndex++] = graph->GetHolderFactory().CreateDirectArrayHolder(7U, fields);
            fields[0] = NUdf::TUnboxedValuePod(std::get<0U>(sample));
            fields[1] = NUdf::TUnboxedValuePod::Embedded(std::get<1U>(sample));
            fields[2] = NUdf::TUnboxedValuePod::Embedded(std::get<2U>(sample));
            fields[3] = NUdf::TUnboxedValuePod(std::get<3U>(sample));
            fields[4] = NUdf::TUnboxedValuePod(std::get<4U>(sample));
            fields[5] = NUdf::TUnboxedValuePod(std::get<5U>(sample));
            fields[6] = NUdf::TUnboxedValuePod(std::get<6U>(sample));
        }

        // Only the graph evaluation itself is timed.
        const auto runStart = TInstant::Now();
        const auto& value = graph->GetValue();
        const auto runEnd = TInstant::Now();

        UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());

        // Each result row is (key part 0, key part 1, count, five doubles).
        const auto resultRows = value.GetElements();
        for (auto row = resultRows; row != resultRows + expects.size(); ++row) {
            const auto fields = row->GetElements();
            two.emplace_back(
                std::make_pair(fields[0].AsStringRef(), fields[1].AsStringRef()),
                TStats{fields[2].template Get<ui64>(), {fields[3].template Get<double>(), fields[4].template Get<double>(), fields[5].template Get<double>(), fields[6].template Get<double>(), fields[7].template Get<double>()}});
        }

        std::sort(two.begin(), two.end(), byKey);
        UNIT_ASSERT_VALUES_EQUAL(one, two);

        Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
    }
}
#endif
}
}