mkql_multihopping.cpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. #include "mkql_multihopping.h"
  2. #include "mkql_saveload.h"
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_stats_registry.h>
  6. #include <yql/essentials/minikql/mkql_string_util.h>
  7. #include <yql/essentials/minikql/mkql_type_builder.h>
  8. #include <yql/essentials/minikql/watermark_tracker.h>
  9. #include <util/generic/scope.h>
  10. namespace NKikimr {
  11. namespace NMiniKQL {
  12. namespace {
  13. const TStatKey Hop_NewHopsCount("MultiHop_NewHopsCount", true);
  14. const TStatKey Hop_EarlyThrownEventsCount("MultiHop_EarlyThrownEventsCount", true);
  15. const TStatKey Hop_LateThrownEventsCount("MultiHop_LateThrownEventsCount", true);
  16. const TStatKey Hop_EmptyTimeCount("MultiHop_EmptyTimeCount", true);
  17. const TStatKey Hop_KeysCount("MultiHop_KeysCount", true);
  18. constexpr ui32 StateVersion = 1;
  19. using TEqualsFunc = std::function<bool(NUdf::TUnboxedValuePod, NUdf::TUnboxedValuePod)>;
  20. using THashFunc = std::function<NYql::NUdf::THashType(NUdf::TUnboxedValuePod)>;
  21. class TMultiHoppingCoreWrapper : public TStatefulSourceComputationNode<TMultiHoppingCoreWrapper, true> {
  22. using TBaseComputation = TStatefulSourceComputationNode<TMultiHoppingCoreWrapper, true>;
  23. public:
  24. using TSelf = TMultiHoppingCoreWrapper;
  25. class TStreamValue : public TComputationValue<TStreamValue> {
  26. public:
  27. using TBase = TComputationValue<TStreamValue>;
  28. TStreamValue(
  29. TMemoryUsageInfo* memInfo,
  30. NUdf::TUnboxedValue&& stream,
  31. const TSelf* self,
  32. ui64 hopTime,
  33. ui64 intervalHopCount,
  34. ui64 delayHopCount,
  35. bool dataWatermarks,
  36. bool watermarkMode,
  37. TComputationContext& ctx,
  38. const THashFunc& hash,
  39. const TEqualsFunc& equal,
  40. TWatermark& watermark)
  41. : TBase(memInfo)
  42. , Stream(std::move(stream))
  43. , Self(self)
  44. , HopTime(hopTime)
  45. , IntervalHopCount(intervalHopCount)
  46. , DelayHopCount(delayHopCount)
  47. , Watermark(watermark)
  48. , WatermarkMode(watermarkMode)
  49. , StatesMap(0, hash, equal)
  50. , Ctx(ctx)
  51. {
  52. if (!watermarkMode && dataWatermarks) {
  53. DataWatermarkTracker.emplace(TWatermarkTracker(delayHopCount * hopTime, hopTime));
  54. }
  55. }
  56. ~TStreamValue() {
  57. ClearState();
  58. }
  59. private:
  60. struct TBucket {
  61. NUdf::TUnboxedValue Value;
  62. bool HasValue = false;
  63. };
  64. struct TKeyState {
  65. std::vector<TBucket, TMKQLAllocator<TBucket>> Buckets; // circular buffer
  66. ui64 HopIndex; // Start index of current window
  67. TKeyState(ui64 bucketsCount, ui64 hopIndex)
  68. : Buckets(bucketsCount)
  69. , HopIndex(hopIndex)
  70. {}
  71. TKeyState(TKeyState&& state)
  72. : Buckets(std::move(state.Buckets))
  73. , HopIndex(state.HopIndex)
  74. {}
  75. };
  76. ui32 GetTraverseCount() const override {
  77. return 1;
  78. }
  79. NUdf::TUnboxedValue GetTraverseItem(ui32 index) const override {
  80. Y_UNUSED(index);
  81. return Stream;
  82. }
  83. NUdf::TUnboxedValue Save() const override {
  84. MKQL_ENSURE(Ready.empty(), "Inconsistent state to save, not all elements are fetched");
  85. TOutputSerializer out(EMkqlStateType::SIMPLE_BLOB, StateVersion, Ctx);
  86. out.Write<ui32>(StatesMap.size());
  87. for (const auto& [key, state] : StatesMap) {
  88. out.WriteUnboxedValue(Self->KeyPacker.RefMutableObject(Ctx, false, Self->KeyType), key);
  89. out(state.HopIndex);
  90. out.Write<ui32>(state.Buckets.size());
  91. for (const auto& bucket : state.Buckets) {
  92. out(bucket.HasValue);
  93. if (bucket.HasValue) {
  94. Self->InSave->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
  95. if (Self->StateType) {
  96. out.WriteUnboxedValue(Self->StatePacker.RefMutableObject(Ctx, false, Self->StateType),
  97. Self->OutSave->GetValue(Ctx));
  98. }
  99. }
  100. }
  101. }
  102. out(Finished);
  103. return out.MakeState();
  104. }
  105. void Load(const NUdf::TStringRef& state) override {
  106. TInputSerializer in(state, EMkqlStateType::SIMPLE_BLOB);
  107. LoadStateImpl(in);
  108. }
  109. bool Load2(const NUdf::TUnboxedValue& state) override {
  110. TInputSerializer in(state, EMkqlStateType::SIMPLE_BLOB);
  111. LoadStateImpl(in);
  112. return true;
  113. }
  114. void LoadStateImpl(TInputSerializer& in) {
  115. const auto loadStateVersion = in.GetStateVersion();
  116. if (loadStateVersion != StateVersion) {
  117. THROW yexception() << "Invalid state version " << loadStateVersion;
  118. }
  119. const auto statesMapSize = in.Read<ui32>();
  120. ClearState();
  121. StatesMap.reserve(statesMapSize);
  122. for (auto i = 0U; i < statesMapSize; ++i) {
  123. auto key = in.ReadUnboxedValue(Self->KeyPacker.RefMutableObject(Ctx, false, Self->KeyType), Ctx);
  124. const auto hopIndex = in.Read<ui64>();
  125. const auto bucketsSize = in.Read<ui32>();
  126. TKeyState keyState(bucketsSize, hopIndex);
  127. for (auto& bucket : keyState.Buckets) {
  128. in(bucket.HasValue);
  129. if (bucket.HasValue) {
  130. if (Self->StateType) {
  131. Self->InLoad->SetValue(Ctx, in.ReadUnboxedValue(Self->StatePacker.RefMutableObject(Ctx, false, Self->StateType), Ctx));
  132. }
  133. bucket.Value = Self->OutLoad->GetValue(Ctx);
  134. }
  135. }
  136. StatesMap.emplace(key, std::move(keyState));
  137. key.Ref();
  138. }
  139. in(Finished);
  140. }
  141. bool HasListItems() const override {
  142. return false;
  143. }
  144. TInstant GetWatermark() {
  145. return Watermark.WatermarkIn;
  146. }
  147. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  148. if (!Ready.empty()) {
  149. result = std::move(Ready.front());
  150. Ready.pop_front();
  151. return NUdf::EFetchStatus::Ok;
  152. }
  153. if (PendingYield) {
  154. PendingYield = false;
  155. return NUdf::EFetchStatus::Yield;
  156. }
  157. if (Finished) {
  158. return NUdf::EFetchStatus::Finish;
  159. }
  160. i64 EarlyEventsThrown = 0;
  161. i64 LateEventsThrown = 0;
  162. i64 newHopsStat = 0;
  163. i64 emptyTimeCtStat = 0;
  164. Y_DEFER {
  165. MKQL_ADD_STAT(Ctx.Stats, Hop_EarlyThrownEventsCount, EarlyEventsThrown);
  166. MKQL_ADD_STAT(Ctx.Stats, Hop_LateThrownEventsCount, LateEventsThrown);
  167. MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHopsStat);
  168. MKQL_ADD_STAT(Ctx.Stats, Hop_EmptyTimeCount, emptyTimeCtStat);
  169. };
  170. for (NUdf::TUnboxedValue item;;) {
  171. if (!Ready.empty()) {
  172. result = std::move(Ready.front());
  173. Ready.pop_front();
  174. return NUdf::EFetchStatus::Ok;
  175. }
  176. const auto status = Stream.Fetch(item);
  177. if (status != NUdf::EFetchStatus::Ok) {
  178. if (status == NUdf::EFetchStatus::Finish) {
  179. CloseOldBuckets(Max<ui64>(), newHopsStat);
  180. Finished = true;
  181. if (!Ready.empty()) {
  182. result = std::move(Ready.front());
  183. Ready.pop_front();
  184. return NUdf::EFetchStatus::Ok;
  185. }
  186. } else if (status == NUdf::EFetchStatus::Yield) {
  187. if (!WatermarkMode) {
  188. return status;
  189. }
  190. PendingYield = true;
  191. CloseOldBuckets(GetWatermark().MicroSeconds(), newHopsStat);
  192. if (!Ready.empty()) {
  193. result = std::move(Ready.front());
  194. Ready.pop_front();
  195. return NUdf::EFetchStatus::Ok;
  196. }
  197. PendingYield = false;
  198. return NUdf::EFetchStatus::Yield;
  199. }
  200. return status;
  201. }
  202. Self->Item->SetValue(Ctx, std::move(item));
  203. auto key = Self->KeyExtract->GetValue(Ctx);
  204. const auto& time = Self->OutTime->GetValue(Ctx);
  205. if (!time) {
  206. ++emptyTimeCtStat;
  207. continue;
  208. }
  209. const auto ts = time.Get<ui64>();
  210. const auto hopIndex = ts / HopTime;
  211. auto& keyState = GetOrCreateKeyState(key, WatermarkMode ? GetWatermark().MicroSeconds() / HopTime : hopIndex);
  212. if (hopIndex < keyState.HopIndex) {
  213. ++LateEventsThrown;
  214. continue;
  215. }
  216. if (WatermarkMode && (hopIndex >= keyState.HopIndex + DelayHopCount + IntervalHopCount)) {
  217. ++EarlyEventsThrown;
  218. continue;
  219. }
  220. // Overflow is not possible, because hopIndex is a product of a division
  221. if (!WatermarkMode) {
  222. auto closeBeforeIndex = Max<i64>(hopIndex + 1 - DelayHopCount - IntervalHopCount, 0);
  223. CloseOldBucketsForKey(key, keyState, closeBeforeIndex, newHopsStat);
  224. }
  225. auto& bucket = keyState.Buckets[hopIndex % keyState.Buckets.size()];
  226. if (!bucket.HasValue) {
  227. bucket.Value = Self->OutInit->GetValue(Ctx);
  228. bucket.HasValue = true;
  229. } else {
  230. Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key));
  231. Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
  232. bucket.Value = Self->OutUpdate->GetValue(Ctx);
  233. }
  234. if (DataWatermarkTracker) {
  235. const auto newWatermark = DataWatermarkTracker->HandleNextEventTime(ts);
  236. if (newWatermark && !WatermarkMode) {
  237. CloseOldBuckets(*newWatermark, newHopsStat);
  238. }
  239. }
  240. MKQL_SET_STAT(Ctx.Stats, Hop_KeysCount, StatesMap.size());
  241. }
  242. }
  243. TKeyState& GetOrCreateKeyState(NUdf::TUnboxedValue& key, ui64 hopIndex) {
  244. i64 keyHopIndex = Max<i64>(hopIndex + 1 - IntervalHopCount, 0);
  245. // For first element we shouldn't forget windows in the past
  246. // Overflow is not possible, because hopIndex is a product of a division
  247. const auto iter = StatesMap.try_emplace(
  248. key,
  249. IntervalHopCount + DelayHopCount,
  250. keyHopIndex
  251. );
  252. if (iter.second) {
  253. key.Ref();
  254. }
  255. return iter.first->second;
  256. }
  257. // Will return true if key state became empty
  258. bool CloseOldBucketsForKey(
  259. const NUdf::TUnboxedValue& key,
  260. TKeyState& keyState,
  261. const ui64 closeBeforeIndex, // Excluded bound
  262. i64& newHopsStat)
  263. {
  264. auto& bucketsForKey = keyState.Buckets;
  265. bool becameEmpty = false;
  266. for (auto i = 0U; i < bucketsForKey.size(); ++i) {
  267. const auto curHopIndex = keyState.HopIndex;
  268. if (curHopIndex >= closeBeforeIndex) {
  269. break;
  270. }
  271. i64 lastIndexWithValue = -1;
  272. TMaybe<NUdf::TUnboxedValue> aggregated;
  273. for (ui64 j = 0; j < IntervalHopCount; j++) {
  274. const auto curBucketIndex = (curHopIndex + j) % bucketsForKey.size();
  275. const auto& bucket = bucketsForKey[curBucketIndex];
  276. if (!bucket.HasValue) {
  277. continue;
  278. }
  279. if (!aggregated) { // todo: clone
  280. Self->InSave->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
  281. Self->InLoad->SetValue(Ctx, Self->OutSave->GetValue(Ctx));
  282. aggregated = Self->OutLoad->GetValue(Ctx);
  283. } else {
  284. Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
  285. Self->State2->SetValue(Ctx, NUdf::TUnboxedValue(*aggregated));
  286. aggregated = Self->OutMerge->GetValue(Ctx);
  287. }
  288. lastIndexWithValue = Max<i64>(lastIndexWithValue, j);
  289. }
  290. if (aggregated) {
  291. Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key));
  292. Self->State->SetValue(Ctx, NUdf::TUnboxedValue(*aggregated));
  293. // Outer code requires window end time (not start as could be expected)
  294. Self->Time->SetValue(Ctx, NUdf::TUnboxedValuePod((curHopIndex + IntervalHopCount) * HopTime));
  295. Ready.emplace_back(Self->OutFinish->GetValue(Ctx));
  296. newHopsStat++;
  297. }
  298. auto& clearBucket = bucketsForKey[curHopIndex % bucketsForKey.size()];
  299. clearBucket.Value = NUdf::TUnboxedValue();
  300. clearBucket.HasValue = false;
  301. keyState.HopIndex++;
  302. if (lastIndexWithValue == 0) {
  303. // Check if there is extra data in delayed buckets
  304. for (ui64 j = IntervalHopCount; j < bucketsForKey.size(); j++) {
  305. const auto curBucketIndex = (curHopIndex + j) % bucketsForKey.size();
  306. const auto& bucket = bucketsForKey[curBucketIndex];
  307. if (bucket.HasValue) {
  308. lastIndexWithValue = Max<i64>(lastIndexWithValue, j);
  309. }
  310. }
  311. if (lastIndexWithValue == 0) {
  312. becameEmpty = true;
  313. break;
  314. }
  315. }
  316. }
  317. keyState.HopIndex = Max<ui64>(keyState.HopIndex, closeBeforeIndex);
  318. return becameEmpty;
  319. }
  320. void CloseOldBuckets(ui64 watermarkTs, i64& newHops) {
  321. const auto watermarkIndex = watermarkTs / HopTime;
  322. EraseNodesIf(StatesMap, [&](auto& iter) {
  323. auto& [key, val] = iter;
  324. ui64 closeBeforeIndex = watermarkIndex + 1 - IntervalHopCount;
  325. const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, closeBeforeIndex, newHops);
  326. if (keyStateBecameEmpty) {
  327. key.UnRef();
  328. }
  329. return keyStateBecameEmpty;
  330. });
  331. return;
  332. }
  333. void ClearState() {
  334. EraseNodesIf(StatesMap, [&](auto& iter) {
  335. iter.first.UnRef();
  336. return true;
  337. });
  338. StatesMap.rehash(0);
  339. }
  340. const NUdf::TUnboxedValue Stream;
  341. const TSelf *const Self;
  342. const ui64 HopTime;
  343. const ui64 IntervalHopCount;
  344. const ui64 DelayHopCount;
  345. TWatermark& Watermark;
  346. bool WatermarkMode;
  347. bool PendingYield = false;
  348. using TStatesMap = std::unordered_map<
  349. NUdf::TUnboxedValuePod, TKeyState,
  350. THashFunc, TEqualsFunc,
  351. TMKQLAllocator<std::pair<const NUdf::TUnboxedValuePod, TKeyState>>>;
  352. TStatesMap StatesMap; // Map of states for each key
  353. std::deque<NUdf::TUnboxedValue> Ready; // buffer for fetching results
  354. bool Finished = false;
  355. TComputationContext& Ctx;
  356. std::optional<TWatermarkTracker> DataWatermarkTracker;
  357. };
  358. TMultiHoppingCoreWrapper(
  359. TComputationMutables& mutables,
  360. IComputationNode* stream,
  361. IComputationExternalNode* item,
  362. IComputationExternalNode* key,
  363. IComputationExternalNode* state,
  364. IComputationExternalNode* state2,
  365. IComputationExternalNode* time,
  366. IComputationExternalNode* inSave,
  367. IComputationExternalNode* inLoad,
  368. IComputationNode* keyExtract,
  369. IComputationNode* outTime,
  370. IComputationNode* outInit,
  371. IComputationNode* outUpdate,
  372. IComputationNode* outSave,
  373. IComputationNode* outLoad,
  374. IComputationNode* outMerge,
  375. IComputationNode* outFinish,
  376. IComputationNode* hop,
  377. IComputationNode* interval,
  378. IComputationNode* delay,
  379. IComputationNode* dataWatermarks,
  380. IComputationNode* watermarkMode,
  381. TType* keyType,
  382. TType* stateType,
  383. TWatermark& watermark)
  384. : TBaseComputation(mutables)
  385. , Stream(stream)
  386. , Item(item)
  387. , Key(key)
  388. , State(state)
  389. , State2(state2)
  390. , Time(time)
  391. , InSave(inSave)
  392. , InLoad(inLoad)
  393. , KeyExtract(keyExtract)
  394. , OutTime(outTime)
  395. , OutInit(outInit)
  396. , OutUpdate(outUpdate)
  397. , OutSave(outSave)
  398. , OutLoad(outLoad)
  399. , OutMerge(outMerge)
  400. , OutFinish(outFinish)
  401. , Hop(hop)
  402. , Interval(interval)
  403. , Delay(delay)
  404. , DataWatermarks(dataWatermarks)
  405. , WatermarkMode(watermarkMode)
  406. , KeyType(keyType)
  407. , StateType(stateType)
  408. , KeyPacker(mutables)
  409. , StatePacker(mutables)
  410. , KeyTypes()
  411. , IsTuple(false)
  412. , UseIHash(false)
  413. , Watermark(watermark)
  414. {
  415. Stateless = false;
  416. bool encoded;
  417. GetDictionaryKeyTypes(keyType, KeyTypes, IsTuple, encoded, UseIHash);
  418. Y_ABORT_UNLESS(!encoded, "TODO");
  419. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  420. Hash = UseIHash ? MakeHashImpl(KeyType) : nullptr;
  421. }
  422. NUdf::TUnboxedValuePod CreateStream(TComputationContext& ctx) const {
  423. const auto hopTime = Hop->GetValue(ctx).Get<ui64>();
  424. const auto interval = Interval->GetValue(ctx).Get<ui64>();
  425. const auto delay = Delay->GetValue(ctx).Get<ui64>();
  426. const auto dataWatermarks = DataWatermarks->GetValue(ctx).Get<bool>();
  427. const auto watermarkMode = WatermarkMode->GetValue(ctx).Get<bool>();
  428. const auto intervalHopCount = interval / hopTime;
  429. const auto delayHopCount = delay / hopTime;
  430. return ctx.HolderFactory.Create<TStreamValue>(
  431. Stream->GetValue(ctx),
  432. this,
  433. hopTime,
  434. intervalHopCount,
  435. delayHopCount,
  436. dataWatermarks,
  437. watermarkMode,
  438. ctx,
  439. TValueHasher(KeyTypes, IsTuple, Hash.Get()),
  440. TValueEqual(KeyTypes, IsTuple, Equate.Get()),
  441. Watermark
  442. );
  443. }
  444. NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override {
  445. NUdf::TUnboxedValue& valueRef = ValueRef(compCtx);
  446. if (valueRef.IsInvalid()) {
  447. // Create new.
  448. valueRef = CreateStream(compCtx);
  449. } else if (valueRef.HasValue()) {
  450. MKQL_ENSURE(valueRef.IsBoxed(), "Expected boxed value");
  451. bool isStateToLoad = valueRef.HasListItems();
  452. if (isStateToLoad) {
  453. // Load from saved state.
  454. NUdf::TUnboxedValue stream = CreateStream(compCtx);
  455. stream.Load2(valueRef);
  456. valueRef = stream;
  457. }
  458. }
  459. return valueRef;
  460. }
  461. private:
  462. void RegisterDependencies() const final {
  463. DependsOn(Stream);
  464. Own(Item);
  465. Own(Key);
  466. Own(State);
  467. Own(State2);
  468. Own(Time);
  469. Own(InSave);
  470. Own(InLoad);
  471. DependsOn(KeyExtract);
  472. DependsOn(OutTime);
  473. DependsOn(OutInit);
  474. DependsOn(OutUpdate);
  475. DependsOn(OutSave);
  476. DependsOn(OutLoad);
  477. DependsOn(OutMerge);
  478. DependsOn(OutFinish);
  479. DependsOn(Hop);
  480. DependsOn(Interval);
  481. DependsOn(Delay);
  482. DependsOn(DataWatermarks);
  483. DependsOn(WatermarkMode);
  484. }
  485. IComputationNode* const Stream;
  486. IComputationExternalNode* const Item;
  487. IComputationExternalNode* const Key;
  488. IComputationExternalNode* const State;
  489. IComputationExternalNode* const State2;
  490. IComputationExternalNode* const Time;
  491. IComputationExternalNode* const InSave;
  492. IComputationExternalNode* const InLoad;
  493. IComputationNode* const KeyExtract;
  494. IComputationNode* const OutTime;
  495. IComputationNode* const OutInit;
  496. IComputationNode* const OutUpdate;
  497. IComputationNode* const OutSave;
  498. IComputationNode* const OutLoad;
  499. IComputationNode* const OutMerge;
  500. IComputationNode* const OutFinish;
  501. IComputationNode* const Hop;
  502. IComputationNode* const Interval;
  503. IComputationNode* const Delay;
  504. IComputationNode* const DataWatermarks;
  505. IComputationNode* const WatermarkMode;
  506. TType* const KeyType;
  507. TType* const StateType;
  508. TMutableObjectOverBoxedValue<TValuePackerBoxed> KeyPacker;
  509. TMutableObjectOverBoxedValue<TValuePackerBoxed> StatePacker;
  510. TKeyTypes KeyTypes;
  511. bool IsTuple;
  512. bool UseIHash;
  513. TWatermark& Watermark;
  514. NUdf::IEquate::TPtr Equate;
  515. NUdf::IHash::TPtr Hash;
  516. };
  517. }
  518. IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark) {
  519. MKQL_ENSURE(callable.GetInputsCount() == 21, "Expected 21 args");
  520. auto hasSaveLoad = !callable.GetInput(12).GetStaticType()->IsVoid();
  521. IComputationExternalNode* inSave = nullptr;
  522. IComputationNode* outSave = nullptr;
  523. IComputationExternalNode* inLoad = nullptr;
  524. IComputationNode* outLoad = nullptr;
  525. auto streamType = callable.GetInput(0).GetStaticType();
  526. MKQL_ENSURE(streamType->IsStream(), "Expected stream");
  527. const auto keyType = callable.GetInput(8).GetStaticType();
  528. auto stream = LocateNode(ctx.NodeLocator, callable, 0);
  529. auto keyExtract = LocateNode(ctx.NodeLocator, callable, 8);
  530. auto outTime = LocateNode(ctx.NodeLocator, callable, 9);
  531. auto outInit = LocateNode(ctx.NodeLocator, callable, 10);
  532. auto outUpdate = LocateNode(ctx.NodeLocator, callable, 11);
  533. if (hasSaveLoad) {
  534. outSave = LocateNode(ctx.NodeLocator, callable, 12);
  535. outLoad = LocateNode(ctx.NodeLocator, callable, 13);
  536. }
  537. auto outMerge = LocateNode(ctx.NodeLocator, callable, 14);
  538. auto outFinish = LocateNode(ctx.NodeLocator, callable, 15);
  539. auto hop = LocateNode(ctx.NodeLocator, callable, 16);
  540. auto interval = LocateNode(ctx.NodeLocator, callable, 17);
  541. auto delay = LocateNode(ctx.NodeLocator, callable, 18);
  542. auto dataWatermarks = LocateNode(ctx.NodeLocator, callable, 19);
  543. auto watermarkMode = LocateNode(ctx.NodeLocator, callable, 20);
  544. auto item = LocateExternalNode(ctx.NodeLocator, callable, 1);
  545. auto key = LocateExternalNode(ctx.NodeLocator, callable, 2);
  546. auto state = LocateExternalNode(ctx.NodeLocator, callable, 3);
  547. auto state2 = LocateExternalNode(ctx.NodeLocator, callable, 4);
  548. auto time = LocateExternalNode(ctx.NodeLocator, callable, 5);
  549. if (hasSaveLoad) {
  550. inSave = LocateExternalNode(ctx.NodeLocator, callable, 6);
  551. inLoad = LocateExternalNode(ctx.NodeLocator, callable, 7);
  552. }
  553. auto stateType = hasSaveLoad ? callable.GetInput(12).GetStaticType() : nullptr;
  554. return new TMultiHoppingCoreWrapper(ctx.Mutables,
  555. stream, item, key, state, state2, time, inSave, inLoad, keyExtract,
  556. outTime, outInit, outUpdate, outSave, outLoad, outMerge, outFinish,
  557. hop, interval, delay, dataWatermarks, watermarkMode, keyType, stateType, watermark);
  558. }
  559. }
  560. }