|
@@ -1,5 +1,8 @@
|
|
|
#include "mkql_computation_node_ut.h"
|
|
|
+
|
|
|
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
|
|
|
#include <ydb/library/yql/minikql/mkql_runtime_version.h>
|
|
|
+#include <ydb/library/yql/minikql/mkql_string_util.h>
|
|
|
|
|
|
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
|
|
|
|
|
@@ -10,9 +13,112 @@
|
|
|
namespace NKikimr {
|
|
|
namespace NMiniKQL {
|
|
|
namespace {
|
|
|
-const auto border = 9124596000000000ULL;
|
|
|
+
|
|
|
+constexpr auto border = 9124596000000000ULL;
|
|
|
+constexpr ui64 g_Yield = std::numeric_limits<ui64>::max();
|
|
|
+constexpr ui64 g_TestYieldStreamData[] = {0, 1, 0, 2, g_Yield, 0, g_Yield, 1, 2, 0, 1, 3, 0, g_Yield, 1, 2};
|
|
|
+
|
|
|
+class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper> {
|
|
|
+using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
|
|
|
+public:
|
|
|
+ class TStreamValue : public TComputationValue<TStreamValue> {
|
|
|
+ public:
|
|
|
+ using TBase = TComputationValue<TStreamValue>;
|
|
|
+
|
|
|
+ TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx)
|
|
|
+ : TBase(memInfo), CompCtx(compCtx)
|
|
|
+ {}
|
|
|
+ private:
|
|
|
+ NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
|
|
|
+ constexpr auto size = Y_ARRAY_SIZE(g_TestYieldStreamData);
|
|
|
+ if (Index == size) {
|
|
|
+ return NUdf::EFetchStatus::Finish;
|
|
|
+ }
|
|
|
+
|
|
|
+ const auto val = g_TestYieldStreamData[Index];
|
|
|
+ if (g_Yield == val) {
|
|
|
+ ++Index;
|
|
|
+ return NUdf::EFetchStatus::Yield;
|
|
|
+ }
|
|
|
+
|
|
|
+ NUdf::TUnboxedValue* items = nullptr;
|
|
|
+ result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, items);
|
|
|
+ items[0] = NUdf::TUnboxedValuePod(val);
|
|
|
+ items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val)));
|
|
|
+
|
|
|
+ ++Index;
|
|
|
+ return NUdf::EFetchStatus::Ok;
|
|
|
+ }
|
|
|
+
|
|
|
+ private:
|
|
|
+ TComputationContext& CompCtx;
|
|
|
+ ui64 Index = 0;
|
|
|
+ };
|
|
|
+
|
|
|
+ TTestStreamWrapper(TComputationMutables& mutables)
|
|
|
+ : TBaseComputation(mutables)
|
|
|
+ {}
|
|
|
+
|
|
|
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
|
|
|
+ return ctx.HolderFactory.Create<TStreamValue>(ctx);
|
|
|
+ }
|
|
|
+private:
|
|
|
+ void RegisterDependencies() const final {}
|
|
|
+};
|
|
|
+
|
|
|
+IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx) {
|
|
|
+ return new TTestStreamWrapper(ctx.Mutables);
|
|
|
+}
|
|
|
+
|
|
|
+TComputationNodeFactory GetNodeFactory() {
|
|
|
+ return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
|
|
|
+ if (callable.GetType()->GetName() == "TestYieldStream") {
|
|
|
+ return WrapTestStream(ctx);
|
|
|
+ }
|
|
|
+ return GetBuiltinFactory()(callable, ctx);
|
|
|
+ };
|
|
|
}
|
|
|
|
|
|
+template <bool LLVM>
|
|
|
+TRuntimeNode MakeStream(TSetup<LLVM>& setup) {
|
|
|
+ TProgramBuilder& pb = *setup.PgmBuilder;
|
|
|
+
|
|
|
+ TCallableBuilder callableBuilder(*setup.Env, "TestYieldStream",
|
|
|
+ pb.NewStreamType(
|
|
|
+ pb.NewStructType({
|
|
|
+ {TStringBuf("a"), pb.NewDataType(NUdf::EDataSlot::Uint64)},
|
|
|
+ {TStringBuf("b"), pb.NewDataType(NUdf::EDataSlot::String)}
|
|
|
+ })
|
|
|
+ )
|
|
|
+ );
|
|
|
+
|
|
|
+ return TRuntimeNode(callableBuilder.Build(), false);
|
|
|
+}
|
|
|
+
|
|
|
+template <bool OverFlow>
|
|
|
+TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)> finishLambda) {
|
|
|
+ const auto keyExtractor = [&](TRuntimeNode item) {
|
|
|
+ return pb.Member(item, "a");
|
|
|
+ };
|
|
|
+ const auto init = [&](TRuntimeNode /*key*/, TRuntimeNode item) {
|
|
|
+ return item;
|
|
|
+ };
|
|
|
+ const auto update = [&](TRuntimeNode /*key*/, TRuntimeNode item, TRuntimeNode state) {
|
|
|
+ const auto a = pb.Add(pb.Member(item, "a"), pb.Member(state, "a"));
|
|
|
+ const auto b = pb.Concat(pb.Member(item, "b"), pb.Member(state, "b"));
|
|
|
+ return pb.NewStruct({
|
|
|
+ {TStringBuf("a"), a},
|
|
|
+ {TStringBuf("b"), b},
|
|
|
+ });
|
|
|
+ };
|
|
|
+
|
|
|
+ return OverFlow ?
|
|
|
+ pb.FromFlow(pb.CombineCore(pb.ToFlow(stream), keyExtractor, init, update, finishLambda, 64ul << 20)):
|
|
|
+ pb.CombineCore(stream, keyExtractor, init, update, finishLambda, 64ul << 20);
|
|
|
+}
|
|
|
+
|
|
|
+} // unnamed
|
|
|
+
|
|
|
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
|
|
|
Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
|
|
|
Y_UNIT_TEST_LLVM(TestLongStringsRefCounting) {
|
|
@@ -55,7 +161,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
|
|
|
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
|
|
|
|
|
|
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list),
|
|
|
- [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), 0ULL,
|
|
|
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), -100000LL,
|
|
|
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
|
|
|
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
|
|
|
return {pb.NewOptional(items.back()), pb.NewOptional(keys.front()), pb.NewEmptyOptional(optionalType), pb.NewEmptyOptional(optionalType)};
|
|
@@ -131,7 +237,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
|
|
|
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
|
|
|
|
|
|
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list),
|
|
|
- [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), 0ULL,
|
|
|
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), -1000000LL,
|
|
|
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
|
|
|
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
|
|
|
return {items.back(), keys.front(), items.back(), items.front()};
|
|
@@ -203,7 +309,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
|
|
|
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");
|
|
|
|
|
|
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list),
|
|
|
- [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }), 0ULL,
|
|
|
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }), -1000000LL,
|
|
|
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
|
|
|
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
|
|
|
return {items.back(), keys.front(), empty, empty};
|
|
@@ -321,6 +427,38 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
|
|
|
UNIT_ASSERT(!iterator.Next(item));
|
|
|
UNIT_ASSERT(!iterator.Next(item));
|
|
|
}
|
|
|
+#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 46u
|
|
|
+ Y_UNIT_TEST_LLVM(TestHasLimitButPasstroughtYields) {
|
|
|
+ TSetup<LLVM> setup(GetNodeFactory());
|
|
|
+ TProgramBuilder& pb = *setup.PgmBuilder;
|
|
|
+
|
|
|
+ const auto stream = MakeStream<LLVM>(setup);
|
|
|
+ const auto pgmReturn = pb.FromFlow(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(stream),
|
|
|
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Member(item, "a"), pb.Member(item, "b")}; }), -123456789LL,
|
|
|
+ [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
|
|
|
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return items; },
|
|
|
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {state.front(), pb.AggrConcat(state.back(), items.back())}; },
|
|
|
+ [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList { return state; }),
|
|
|
+ [&](TRuntimeNode::TList items) { return items.back(); }
|
|
|
+ ));
|
|
|
+ const auto graph = setup.BuildGraph(pgmReturn);
|
|
|
+ const auto streamVal = graph->GetValue();
|
|
|
+ NUdf::TUnboxedValue result;
|
|
|
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
|
|
|
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
|
|
|
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
|
|
|
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
|
|
|
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "00000");
|
|
|
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
|
|
|
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "1111");
|
|
|
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
|
|
|
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "222");
|
|
|
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
|
|
|
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "3");
|
|
|
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
|
|
|
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
|
|
|
+ }
|
|
|
+#endif
|
|
|
}
|
|
|
|
|
|
Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) {
|