// mkql_multihopping_ut.cpp
  1. #include "../mkql_multihopping.h"
  2. #include "mkql_computation_node_ut.h"
  3. #include <yql/essentials/minikql/mkql_node.h>
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_program_builder.h>
  6. #include <yql/essentials/minikql/mkql_function_registry.h>
  7. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  8. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  9. #include <yql/essentials/minikql/computation/mkql_computation_node_graph_saveload.h>
  10. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  11. #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
  12. #include <library/cpp/testing/unittest/registar.h>
  13. namespace NKikimr {
  14. namespace NMiniKQL {
  15. namespace {
  16. struct TInputItem {
  17. ui32 Key = 0;
  18. i64 Time = 0;
  19. ui32 Val = 0;
  20. };
  21. struct TOutputItem {
  22. ui32 Key = 0;
  23. ui32 Val = 0;
  24. ui64 Time = 0;
  25. constexpr bool operator==(const TOutputItem& rhs) const
  26. {
  27. return this->Key == rhs.Key && this->Val == rhs.Val && this->Time == rhs.Time;
  28. }
  29. };
  30. struct TOutputGroup {
  31. TOutputGroup(std::initializer_list<TOutputItem> items) : Items(items) {}
  32. std::vector<TOutputItem> Items;
  33. };
  34. std::vector<TOutputItem> Ordered(std::vector<TOutputItem> vec) {
  35. std::sort(vec.begin(), vec.end(), [](auto l, auto r) {
  36. return std::make_tuple(l.Key, l.Val, l.Time) < std::make_tuple(r.Key, r.Val, r.Time);
  37. });
  38. return vec;
  39. }
  40. IOutputStream &operator<<(IOutputStream &output, std::vector<TOutputItem> items) {
  41. output << "[";
  42. for (ui32 i = 0; i < items.size(); ++i) {
  43. output << "(" << items.at(i).Key << ";" << items.at(i).Val << ";" << items.at(i).Time << ")";
  44. if (i != items.size() - 1)
  45. output << ",";
  46. }
  47. output << "]";
  48. return output;
  49. }
  50. TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark) {
  51. return [&watermark](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
  52. if (callable.GetType()->GetName() == "MyStream") {
  53. return new TExternalComputationNode(ctx.Mutables);
  54. } else if (callable.GetType()->GetName() == "MultiHoppingCore") {
  55. return WrapMultiHoppingCore(callable, ctx, watermark);
  56. }
  57. return GetBuiltinFactory()(callable, ctx);
  58. };
  59. }
  60. struct TStream : public NUdf::TBoxedValue {
  61. TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback, bool* yield)
  62. : Items(items)
  63. , FetchCallback(fetchCallback)
  64. , yield(yield) {}
  65. private:
  66. TUnboxedValueVector Items;
  67. ui32 Index = 0;
  68. std::function<void()> FetchCallback;
  69. bool* yield;
  70. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
  71. FetchCallback();
  72. if (*yield) {
  73. return NUdf::EFetchStatus::Yield;
  74. }
  75. if (Index >= Items.size()) {
  76. return NUdf::EFetchStatus::Finish;
  77. }
  78. result = Items[Index++];
  79. return NUdf::EFetchStatus::Ok;
  80. }
  81. };
  82. THolder<IComputationGraph> BuildGraph(
  83. TSetup<false>& setup,
  84. bool watermarkMode,
  85. const std::vector<TInputItem> items,
  86. std::function<void()> fetchCallback,
  87. bool dataWatermarks,
  88. bool* yield,
  89. ui64 hop = 10,
  90. ui64 interval = 30,
  91. ui64 delay = 20)
  92. {
  93. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  94. auto structType = pgmBuilder.NewEmptyStructType();
  95. structType = pgmBuilder.NewStructType(structType, "key",
  96. pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id));
  97. structType = pgmBuilder.NewStructType(structType, "time",
  98. pgmBuilder.NewDataType(NUdf::TDataType<NUdf::TTimestamp>::Id));
  99. structType = pgmBuilder.NewStructType(structType, "sum",
  100. pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id));
  101. auto keyIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("key");
  102. auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time");
  103. auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum");
  104. auto inStreamType = pgmBuilder.NewStreamType(structType);
  105. TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "MyStream", inStreamType);
  106. auto streamNode = inStream.Build();
  107. auto pgmReturn = pgmBuilder.MultiHoppingCore(
  108. TRuntimeNode(streamNode, false),
  109. [&](TRuntimeNode item) { // keyExtractor
  110. return pgmBuilder.Member(item, "key");
  111. },
  112. [&](TRuntimeNode item) { // timeExtractor
  113. return pgmBuilder.Member(item, "time");
  114. },
  115. [&](TRuntimeNode item) { // init
  116. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  117. members.emplace_back("sum", pgmBuilder.Member(item, "sum"));
  118. return pgmBuilder.NewStruct(members);
  119. },
  120. [&](TRuntimeNode item, TRuntimeNode state) { // update
  121. auto add = pgmBuilder.AggrAdd(
  122. pgmBuilder.Member(item, "sum"),
  123. pgmBuilder.Member(state, "sum"));
  124. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  125. members.emplace_back("sum", add);
  126. return pgmBuilder.NewStruct(members);
  127. },
  128. [&](TRuntimeNode state) { // save
  129. return pgmBuilder.Member(state, "sum");
  130. },
  131. [&](TRuntimeNode savedState) { // load
  132. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  133. members.emplace_back("sum", savedState);
  134. return pgmBuilder.NewStruct(members);
  135. },
  136. [&](TRuntimeNode state1, TRuntimeNode state2) { // merge
  137. auto add = pgmBuilder.AggrAdd(
  138. pgmBuilder.Member(state1, "sum"),
  139. pgmBuilder.Member(state2, "sum"));
  140. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  141. members.emplace_back("sum", add);
  142. return pgmBuilder.NewStruct(members);
  143. },
  144. [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) { // finish
  145. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  146. members.emplace_back("key", key);
  147. members.emplace_back("sum", pgmBuilder.Member(state, "sum"));
  148. members.emplace_back("time", time);
  149. return pgmBuilder.NewStruct(members);
  150. },
  151. pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop
  152. pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval
  153. pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))), // delay
  154. pgmBuilder.NewDataLiteral<bool>(dataWatermarks),
  155. pgmBuilder.NewDataLiteral<bool>(watermarkMode)
  156. );
  157. auto graph = setup.BuildGraph(pgmReturn, {streamNode});
  158. TUnboxedValueVector streamItems;
  159. for (size_t i = 0; i < items.size(); ++i) {
  160. NUdf::TUnboxedValue* itemsPtr;
  161. auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(3, itemsPtr);
  162. itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(items.at(i).Key);
  163. itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items.at(i).Time);
  164. itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items.at(i).Val);
  165. streamItems.push_back(std::move(structValues));
  166. }
  167. auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback, yield));
  168. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
  169. return graph;
  170. }
  171. }
  172. Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
  173. void TestImpl(
  174. const std::vector<TInputItem>& input,
  175. const std::vector<TOutputGroup>& expected,
  176. bool dataWatermarks,
  177. ui64 hop = 10,
  178. ui64 interval = 30,
  179. ui64 delay = 20,
  180. std::function<void(ui32, TSetup<false>&)> customCheck = [](ui32, TSetup<false>&){},
  181. TWatermark* watermark = nullptr,
  182. bool* yield = nullptr,
  183. std::function<void()> fetch_callback= [](){},
  184. bool watermarkMode = false)
  185. {
  186. bool yield_clone = false;
  187. if (!yield) {
  188. yield = &yield_clone;
  189. }
  190. if (watermarkMode) {
  191. dataWatermarks = false;
  192. }
  193. TWatermark watermark_clone{TInstant::Zero()};
  194. if (watermark == nullptr) {
  195. watermark = &watermark_clone;
  196. }
  197. TSetup<false> setup1(GetAuxCallableFactory(*watermark));
  198. ui32 curGroupId = 0;
  199. std::vector<TOutputItem> curResult;
  200. auto check = [&curResult, &curGroupId, &expected, customCheck, &setup1, &fetch_callback]() {
  201. fetch_callback();
  202. auto expectedItems = Ordered(expected.at(curGroupId).Items); // Add more empty lists at yield in expected
  203. curResult = Ordered(curResult);
  204. UNIT_ASSERT_EQUAL_C(curResult, expectedItems, "curGroup: " << curGroupId << " actual: " << curResult << " expected: " << expectedItems);
  205. customCheck(curGroupId, setup1);
  206. curGroupId++;
  207. curResult.clear();
  208. };
  209. auto graph1 = BuildGraph(setup1, watermarkMode, input, check, dataWatermarks, yield, hop, interval, delay);
  210. auto root1 = graph1->GetValue();
  211. NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
  212. while (status == NUdf::EFetchStatus::Ok || status == NUdf::EFetchStatus::Yield) {
  213. NUdf::TUnboxedValue val;
  214. status = root1.Fetch(val);
  215. if (status == NUdf::EFetchStatus::Ok) {
  216. curResult.emplace_back(TOutputItem{val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>(), val.GetElement(2).Get<ui64>()});
  217. }
  218. }
  219. check();
  220. UNIT_ASSERT_EQUAL_C(curGroupId, expected.size(), "1: " << curGroupId << " 2: " << expected.size());
  221. }
  222. void TestWatermarksImpl(
  223. const std::vector<TInputItem>& input,
  224. const std::vector<TOutputGroup>& expected,
  225. const std::vector<std::pair<ui64, TInstant>>& watermarks)
  226. {
  227. bool yield = false;
  228. TWatermark watermark;
  229. ui64 inp_index = 0;
  230. ui64 pattern_index = 0;
  231. auto avant_fetch = [&yield, &watermark, &watermarks, &inp_index, &pattern_index](){
  232. yield = false;
  233. if (pattern_index >= watermarks.size()) {
  234. return;
  235. }
  236. if (inp_index == watermarks[pattern_index].first) {
  237. yield = true;
  238. watermark.WatermarkIn = watermarks[pattern_index].second;
  239. ++pattern_index;
  240. } else {
  241. ++inp_index;
  242. }
  243. };
  244. TestImpl(input, expected, false, 10, 30, 20, [](ui32, TSetup<false>&){}, &watermark, &yield, avant_fetch, true);
  245. }
  246. Y_UNIT_TEST(TestThrowWatermarkFromPast) {
  247. const std::vector<TInputItem> input = {
  248. // Group; Time; Value
  249. {1, 101, 2},
  250. {1, 131, 3},
  251. {1, 200, 4},
  252. {1, 300, 5},
  253. {1, 400, 6}
  254. };
  255. const std::vector<TOutputGroup> expected = {
  256. TOutputGroup({}),
  257. TOutputGroup({}),
  258. TOutputGroup({}),
  259. TOutputGroup({}),
  260. TOutputGroup({}),
  261. TOutputGroup({}),
  262. TOutputGroup({}),
  263. TOutputGroup({}),
  264. TOutputGroup({})
  265. };
  266. std::vector<std::pair<ui64, TInstant>> yield_pattern = {
  267. {2, TInstant::MicroSeconds(20)},
  268. {3, TInstant::MicroSeconds(40)}
  269. };
  270. TestWatermarksImpl(input, expected, yield_pattern);
  271. }
  272. Y_UNIT_TEST(TestThrowWatermarkFromFuture) {
  273. const std::vector<TInputItem> input = {
  274. // Group; Time; Value
  275. {1, 101, 2},
  276. {1, 131, 3},
  277. {1, 200, 4},
  278. {1, 300, 5},
  279. {1, 400, 6}
  280. };
  281. const std::vector<TOutputGroup> expected = {
  282. TOutputGroup({}),
  283. TOutputGroup({}),
  284. TOutputGroup({}),
  285. TOutputGroup({}),
  286. TOutputGroup({}),
  287. TOutputGroup({}),
  288. TOutputGroup({}),
  289. TOutputGroup({}),
  290. TOutputGroup({})
  291. };
  292. std::vector<std::pair<ui64, TInstant>> yield_pattern = {
  293. {2, TInstant::MicroSeconds(1000)},
  294. {3, TInstant::MicroSeconds(2000)}
  295. };
  296. TestWatermarksImpl(input, expected, yield_pattern);
  297. }
  298. Y_UNIT_TEST(TestWatermarkFlow1) {
  299. const std::vector<TInputItem> input = {
  300. // Group; Time; Value
  301. {1, 101, 2},
  302. {1, 131, 3},
  303. {1, 200, 4},
  304. {1, 300, 5},
  305. {1, 400, 6}
  306. };
  307. const std::vector<TOutputGroup> expected = {
  308. TOutputGroup({}),
  309. TOutputGroup({}),
  310. TOutputGroup({}),
  311. TOutputGroup({}),
  312. TOutputGroup({}),
  313. TOutputGroup({{1, 2, 110},{1, 2, 120},{1, 2, 130}}),
  314. TOutputGroup({}),
  315. TOutputGroup({}),
  316. TOutputGroup({})
  317. };
  318. std::vector<std::pair<ui64, TInstant>> yield_pattern = {
  319. {0, TInstant::MicroSeconds(100)},
  320. {3, TInstant::MicroSeconds(200)}
  321. };
  322. TestWatermarksImpl(input, expected, yield_pattern);
  323. }
  324. Y_UNIT_TEST(TestWatermarkFlow2) {
  325. const std::vector<TInputItem> input = {
  326. // Group; Time; Value
  327. {1, 100, 2},
  328. {1, 105, 3},
  329. {1, 80, 4},
  330. {1, 107, 5},
  331. {1, 106, 6}
  332. };
  333. const std::vector<TOutputGroup> expected = {
  334. TOutputGroup({}),
  335. TOutputGroup({}),
  336. TOutputGroup({}),
  337. TOutputGroup({}),
  338. TOutputGroup({}),
  339. TOutputGroup({}),
  340. TOutputGroup({}),
  341. TOutputGroup({{1, 4, 90}, {1, 4, 100}, {1, 4, 110}})
  342. };
  343. std::vector<std::pair<ui64, TInstant>> yield_pattern = {
  344. {0, TInstant::MicroSeconds(76)},
  345. };
  346. TestWatermarksImpl(input, expected, yield_pattern);
  347. }
  348. Y_UNIT_TEST(TestWatermarkFlow3) {
  349. const std::vector<TInputItem> input = {
  350. // Group; Time; Value
  351. {1, 90, 2},
  352. {1, 99, 3},
  353. {1, 80, 4},
  354. {1, 107, 5},
  355. {1, 106, 6}
  356. };
  357. const std::vector<TOutputGroup> expected = {
  358. TOutputGroup({}),
  359. TOutputGroup({}),
  360. TOutputGroup({}),
  361. TOutputGroup({}),
  362. TOutputGroup({}),
  363. TOutputGroup({}),
  364. TOutputGroup({}),
  365. TOutputGroup({{1, 4, 90}, {1, 9, 100}, {1, 9, 110}, {1, 5, 120}})
  366. };
  367. std::vector<std::pair<ui64, TInstant>> yield_pattern = {
  368. {0, TInstant::MicroSeconds(76)},
  369. };
  370. TestWatermarksImpl(input, expected, yield_pattern);
  371. }
  372. Y_UNIT_TEST(TestDataWatermarks) {
  373. const std::vector<TInputItem> input = {
  374. // Group; Time; Value
  375. {1, 101, 2},
  376. {2, 101, 2},
  377. {1, 111, 3},
  378. {2, 140, 5},
  379. {2, 160, 1}
  380. };
  381. const std::vector<TOutputGroup> expected = {
  382. TOutputGroup({}),
  383. TOutputGroup({}),
  384. TOutputGroup({}),
  385. TOutputGroup({}),
  386. TOutputGroup({{1, 2, 110}, {1, 5, 120}, {2, 2, 110}, {2, 2, 120}}),
  387. TOutputGroup({{2, 2, 130}, {1, 5, 130}, {1, 3, 140}}),
  388. TOutputGroup({{2, 5, 150}, {2, 5, 160}, {2, 6, 170}, {2, 1, 180}, {2, 1, 190}}),
  389. };
  390. TestImpl(input, expected, true);
  391. }
  392. Y_UNIT_TEST(TestDataWatermarksNoGarbage) {
  393. const std::vector<TInputItem> input = {
  394. // Group; Time; Value
  395. {1, 100, 2},
  396. {2, 150, 1}
  397. };
  398. const std::vector<TOutputGroup> expected = {
  399. TOutputGroup({}),
  400. TOutputGroup({}),
  401. TOutputGroup({{1, 2, 110}, {1, 2, 120}, {1, 2, 130}}),
  402. TOutputGroup({{2, 1, 160}, {2, 1, 170}, {2, 1, 180}}),
  403. };
  404. TestImpl(input, expected, true, 10, 30, 20,
  405. [](ui32 curGroup, TSetup<false>& setup) {
  406. if (curGroup != 2) {
  407. return;
  408. }
  409. setup.StatsRegistry->ForEachStat([](const TStatKey& key, i64 value) {
  410. if (key.GetName() == "MultiHop_KeysCount") {
  411. UNIT_ASSERT_EQUAL_C(value, 1, "actual: " << value << " expected: " << 1);
  412. }
  413. });
  414. });
  415. }
  416. Y_UNIT_TEST(TestValidness1) {
  417. const std::vector<TInputItem> input1 = {
  418. // Group; Time; Value
  419. {1, 101, 2},
  420. {2, 101, 2},
  421. {1, 111, 3},
  422. {2, 140, 5},
  423. {2, 160, 1}
  424. };
  425. const std::vector<TOutputGroup> expected = {
  426. TOutputGroup({}),
  427. TOutputGroup({}),
  428. TOutputGroup({}),
  429. TOutputGroup({}),
  430. TOutputGroup({{2, 2, 110}, {2, 2, 120}}),
  431. TOutputGroup({{2, 2, 130}}),
  432. TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}, {2, 5, 150},
  433. {2, 5, 160}, {2, 6, 170}, {2, 1, 190}, {2, 1, 180}}),
  434. };
  435. TestImpl(input1, expected, false);
  436. }
  437. Y_UNIT_TEST(TestValidness2) {
  438. const std::vector<TInputItem> input = {
  439. // Group; Time; Value
  440. {2, 101, 2}, {1, 101, 2}, {2, 102, 3}, {1, 102, 3}, {2, 115, 4},
  441. {1, 115, 4}, {2, 123, 6}, {1, 123, 6}, {2, 124, 5}, {1, 124, 5},
  442. {2, 125, 7}, {1, 125, 7}, {2, 140, 2}, {1, 140, 2}, {2, 147, 1},
  443. {1, 147, 1}, {2, 151, 6}, {1, 151, 6}, {2, 159, 2}, {1, 159, 2},
  444. {2, 185, 8}, {1, 185, 8}
  445. };
  446. const std::vector<TOutputGroup> expected = {
  447. TOutputGroup({}),
  448. TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
  449. TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
  450. TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
  451. TOutputGroup({{1, 5, 110}, {1, 9, 120}, {2, 5, 110}, {2, 9, 120}}),
  452. TOutputGroup({}),
  453. TOutputGroup({}), TOutputGroup({}),
  454. TOutputGroup({{2, 27, 130}, {1, 27, 130}}),
  455. TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
  456. TOutputGroup({{2, 22, 140}, {2, 21, 150}, {2, 11, 160}, {1, 22, 140}, {1, 21, 150}, {1, 11, 160}}),
  457. TOutputGroup({}),
  458. TOutputGroup({{1, 11, 170}, {1, 8, 180}, {1, 8, 190}, {1, 8, 200}, {1, 8, 210}, {2, 11, 170},
  459. {2, 8, 180}, {2, 8, 190}, {2, 8, 200}, {2, 8, 210}}),
  460. };
  461. TestImpl(input, expected, true);
  462. }
  463. Y_UNIT_TEST(TestValidness3) {
  464. const std::vector<TInputItem> input = {
  465. // Group; Time; Value
  466. {1, 105, 1}, {1, 107, 4}, {2, 106, 3}, {1, 111, 7}, {1, 117, 3},
  467. {2, 110, 2}, {1, 108, 9}, {1, 121, 4}, {2, 107, 2}, {2, 141, 5},
  468. {1, 141, 10}
  469. };
  470. const std::vector<TOutputGroup> expected = {
  471. TOutputGroup({}),
  472. TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
  473. TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
  474. TOutputGroup({{1, 14, 110}, {2, 3, 110}}),
  475. TOutputGroup({}),
  476. TOutputGroup({{2, 7, 115}, {2, 2, 120}, {1, 21, 115}, {1, 10, 120}, {1, 7, 125}, {1, 4, 130}}),
  477. TOutputGroup({}),
  478. TOutputGroup({{1, 10, 145}, {1, 10, 150}, {2, 5, 145}, {2, 5, 150}})
  479. };
  480. TestImpl(input, expected, true, 5, 10, 10);
  481. }
  482. Y_UNIT_TEST(TestDelay) {
  483. const std::vector<TInputItem> input = {
  484. // Group; Time; Value
  485. {1, 101, 3}, {1, 111, 5}, {1, 120, 7}, {1, 80, 9}, {1, 79, 11}
  486. };
  487. const std::vector<TOutputGroup> expected = {
  488. TOutputGroup({}),
  489. TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
  490. TOutputGroup({}), TOutputGroup({}),
  491. TOutputGroup({{1, 12, 110}, {1, 8, 120}, {1, 15, 130}, {1, 12, 140}, {1, 7, 150}})
  492. };
  493. TestImpl(input, expected, false);
  494. }
  495. Y_UNIT_TEST(TestWindowsBeforeFirstElement) {
  496. const std::vector<TInputItem> input = {
  497. // Group; Time; Value
  498. {1, 101, 2}, {1, 111, 3}
  499. };
  500. const std::vector<TOutputGroup> expected = {
  501. TOutputGroup({}),
  502. TOutputGroup({}),
  503. TOutputGroup({}),
  504. TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}})
  505. };
  506. TestImpl(input, expected, false);
  507. }
  508. Y_UNIT_TEST(TestSubzeroValues) {
  509. const std::vector<TInputItem> input = {
  510. // Group; Time; Value
  511. {1, 1, 2}
  512. };
  513. const std::vector<TOutputGroup> expected = {
  514. TOutputGroup({}),
  515. TOutputGroup({}),
  516. TOutputGroup({{1, 2, 30}}),
  517. };
  518. TestImpl(input, expected, false);
  519. }
  520. }
  521. } // namespace NMiniKQL
  522. } // namespace NKikimr