mkql_hopping.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. #include "mkql_hopping.h"
  2. #include "mkql_saveload.h"
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_stats_registry.h>
  6. #include <yql/essentials/minikql/mkql_string_util.h>
  7. #include <util/generic/scope.h>
  8. namespace NKikimr {
  9. namespace NMiniKQL {
  10. namespace {
  11. constexpr ui32 StateVersion = 1;
  12. const TStatKey Hop_NewHopsCount("Hop_NewHopsCount", true);
  13. const TStatKey Hop_ThrownEventsCount("Hop_ThrownEventsCount", true);
  14. class THoppingCoreWrapper : public TMutableComputationNode<THoppingCoreWrapper> {
  15. typedef TMutableComputationNode<THoppingCoreWrapper> TBaseComputation;
  16. public:
  17. using TSelf = THoppingCoreWrapper;
  18. class TStreamValue : public TComputationValue<TStreamValue> {
  19. public:
  20. using TBase = TComputationValue<TStreamValue>;
  21. TStreamValue(
  22. TMemoryUsageInfo* memInfo,
  23. NUdf::TUnboxedValue&& stream,
  24. const TSelf* self,
  25. ui64 hopTime,
  26. ui64 intervalHopCount,
  27. ui64 delayHopCount,
  28. TComputationContext& ctx)
  29. : TBase(memInfo)
  30. , Stream(std::move(stream))
  31. , Self(self)
  32. , HopTime(hopTime)
  33. , IntervalHopCount(intervalHopCount)
  34. , DelayHopCount(delayHopCount)
  35. , Buckets(IntervalHopCount + DelayHopCount)
  36. , Ctx(ctx)
  37. {}
  38. private:
  39. ui32 GetTraverseCount() const override {
  40. return 1;
  41. }
  42. NUdf::TUnboxedValue GetTraverseItem(ui32 index) const override {
  43. Y_UNUSED(index);
  44. return Stream;
  45. }
  46. NUdf::TUnboxedValue Save() const override {
  47. MKQL_ENSURE(Ready.empty(), "Inconsistent state to save, not all elements are fetched");
  48. TOutputSerializer out(EMkqlStateType::SIMPLE_BLOB, StateVersion, Ctx);
  49. out.Write<ui32>(Buckets.size());
  50. for (const auto& bucket : Buckets) {
  51. out(bucket.HasValue);
  52. if (bucket.HasValue) {
  53. Self->InSave->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
  54. if (Self->StateType) {
  55. out.WriteUnboxedValue(Self->Packer.RefMutableObject(Ctx, false, Self->StateType), Self->OutSave->GetValue(Ctx));
  56. }
  57. }
  58. }
  59. out(HopIndex, Started, Finished);
  60. return out.MakeState();
  61. }
  62. void Load(const NUdf::TStringRef& state) override {
  63. TInputSerializer in(state, EMkqlStateType::SIMPLE_BLOB);
  64. const auto loadStateVersion = in.GetStateVersion();
  65. if (loadStateVersion != StateVersion) {
  66. THROW yexception() << "Invalid state version " << loadStateVersion;
  67. }
  68. auto size = in.Read<ui32>();
  69. Buckets.resize(size);
  70. for (auto& bucket : Buckets) {
  71. bucket.HasValue = in.Read<bool>();
  72. if (bucket.HasValue) {
  73. if (Self->StateType) {
  74. Self->InLoad->SetValue(Ctx, in.ReadUnboxedValue(Self->Packer.RefMutableObject(Ctx, false, Self->StateType), Ctx));
  75. }
  76. bucket.Value = Self->OutLoad->GetValue(Ctx);
  77. }
  78. }
  79. in(HopIndex, Started, Finished);
  80. }
  81. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  82. if (!Ready.empty()) {
  83. result = std::move(Ready.front());
  84. Ready.pop_front();
  85. return NUdf::EFetchStatus::Ok;
  86. }
  87. if (Finished) {
  88. return NUdf::EFetchStatus::Finish;
  89. }
  90. i64 thrownEvents = 0;
  91. i64 newHops = 0;
  92. Y_DEFER {
  93. if (thrownEvents) {
  94. MKQL_ADD_STAT(Ctx.Stats, Hop_ThrownEventsCount, thrownEvents);
  95. }
  96. if (newHops) {
  97. MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHops);
  98. }
  99. };
  100. for (NUdf::TUnboxedValue item;;) {
  101. if (!Ready.empty()) {
  102. result = std::move(Ready.front());
  103. Ready.pop_front();
  104. return NUdf::EFetchStatus::Ok;
  105. }
  106. const auto status = Stream.Fetch(item);
  107. if (status != NUdf::EFetchStatus::Ok) {
  108. if (status == NUdf::EFetchStatus::Finish) {
  109. Finished = true;
  110. }
  111. return status;
  112. }
  113. Self->Item->SetValue(Ctx, std::move(item));
  114. auto time = Self->OutTime->GetValue(Ctx);
  115. if (!time) {
  116. continue;
  117. }
  118. auto hopIndex = time.Get<ui64>() / HopTime;
  119. if (!Started) {
  120. HopIndex = hopIndex + 1;
  121. Started = true;
  122. }
  123. while (hopIndex >= HopIndex) {
  124. auto firstBucketIndex = HopIndex % Buckets.size();
  125. auto bucketIndex = firstBucketIndex;
  126. TMaybe<NUdf::TUnboxedValue> aggregated;
  127. for (ui64 i = 0; i < IntervalHopCount; ++i) {
  128. const auto& bucket = Buckets[bucketIndex];
  129. if (bucket.HasValue) {
  130. if (!aggregated) { // todo: clone
  131. Self->InSave->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
  132. Self->InLoad->SetValue(Ctx, Self->OutSave->GetValue(Ctx));
  133. aggregated = Self->OutLoad->GetValue(Ctx);
  134. } else {
  135. Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
  136. Self->State2->SetValue(Ctx, NUdf::TUnboxedValue(*aggregated));
  137. aggregated = Self->OutMerge->GetValue(Ctx);
  138. }
  139. }
  140. if (++bucketIndex == Buckets.size()) {
  141. bucketIndex = 0;
  142. }
  143. }
  144. auto& clearBucket = Buckets[firstBucketIndex];
  145. clearBucket.Value = NUdf::TUnboxedValue();
  146. clearBucket.HasValue = false;
  147. if (aggregated) {
  148. Self->State->SetValue(Ctx, NUdf::TUnboxedValue(*aggregated));
  149. Self->Time->SetValue(Ctx, NUdf::TUnboxedValuePod((HopIndex - DelayHopCount) * HopTime));
  150. Ready.emplace_back(Self->OutFinish->GetValue(Ctx));
  151. }
  152. ++newHops;
  153. ++HopIndex;
  154. }
  155. if (hopIndex + DelayHopCount + 1 >= HopIndex) {
  156. auto& bucket = Buckets[hopIndex % Buckets.size()];
  157. if (!bucket.HasValue) {
  158. bucket.Value = Self->OutInit->GetValue(Ctx);
  159. bucket.HasValue = true;
  160. } else {
  161. Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
  162. bucket.Value = Self->OutUpdate->GetValue(Ctx);
  163. }
  164. } else {
  165. ++thrownEvents;
  166. }
  167. }
  168. }
  169. const NUdf::TUnboxedValue Stream;
  170. const TSelf *const Self;
  171. const ui64 HopTime;
  172. const ui64 IntervalHopCount;
  173. const ui64 DelayHopCount;
  174. struct TBucket {
  175. NUdf::TUnboxedValue Value;
  176. bool HasValue = false;
  177. };
  178. std::vector<TBucket> Buckets; // circular buffer
  179. std::deque<NUdf::TUnboxedValue> Ready; // buffer for fetching results
  180. ui64 HopIndex = 0;
  181. bool Started = false;
  182. bool Finished = false;
  183. TComputationContext& Ctx;
  184. };
  185. THoppingCoreWrapper(
  186. TComputationMutables& mutables,
  187. IComputationNode* stream,
  188. IComputationExternalNode* item,
  189. IComputationExternalNode* state,
  190. IComputationExternalNode* state2,
  191. IComputationExternalNode* time,
  192. IComputationExternalNode* inSave,
  193. IComputationExternalNode* inLoad,
  194. IComputationNode* outTime,
  195. IComputationNode* outInit,
  196. IComputationNode* outUpdate,
  197. IComputationNode* outSave,
  198. IComputationNode* outLoad,
  199. IComputationNode* outMerge,
  200. IComputationNode* outFinish,
  201. IComputationNode* hop,
  202. IComputationNode* interval,
  203. IComputationNode* delay,
  204. TType* stateType)
  205. : TBaseComputation(mutables)
  206. , Stream(stream)
  207. , Item(item)
  208. , State(state)
  209. , State2(state2)
  210. , Time(time)
  211. , InSave(inSave)
  212. , InLoad(inLoad)
  213. , OutTime(outTime)
  214. , OutInit(outInit)
  215. , OutUpdate(outUpdate)
  216. , OutSave(outSave)
  217. , OutLoad(outLoad)
  218. , OutMerge(outMerge)
  219. , OutFinish(outFinish)
  220. , Hop(hop)
  221. , Interval(interval)
  222. , Delay(delay)
  223. , StateType(stateType)
  224. , Packer(mutables)
  225. {
  226. Stateless = false;
  227. }
  228. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  229. const auto hopTime = Hop->GetValue(ctx).Get<ui64>();
  230. const auto interval = Interval->GetValue(ctx).Get<ui64>();
  231. const auto delay = Delay->GetValue(ctx).Get<ui64>();
  232. const auto intervalHopCount = interval / hopTime;
  233. const auto delayHopCount = delay / hopTime;
  234. return ctx.HolderFactory.Create<TStreamValue>(Stream->GetValue(ctx), this, hopTime, intervalHopCount, delayHopCount, ctx);
  235. }
  236. private:
  237. void RegisterDependencies() const final {
  238. DependsOn(Stream);
  239. Own(Item);
  240. Own(State);
  241. Own(State2);
  242. Own(Time);
  243. Own(InSave);
  244. Own(InLoad);
  245. DependsOn(OutTime);
  246. DependsOn(OutInit);
  247. DependsOn(OutUpdate);
  248. DependsOn(OutSave);
  249. DependsOn(OutLoad);
  250. DependsOn(OutMerge);
  251. DependsOn(OutFinish);
  252. DependsOn(Hop);
  253. DependsOn(Interval);
  254. DependsOn(Delay);
  255. }
  256. IComputationNode* const Stream;
  257. IComputationExternalNode* const Item;
  258. IComputationExternalNode* const State;
  259. IComputationExternalNode* const State2;
  260. IComputationExternalNode* const Time;
  261. IComputationExternalNode* const InSave;
  262. IComputationExternalNode* const InLoad;
  263. IComputationNode* const OutTime;
  264. IComputationNode* const OutInit;
  265. IComputationNode* const OutUpdate;
  266. IComputationNode* const OutSave;
  267. IComputationNode* const OutLoad;
  268. IComputationNode* const OutMerge;
  269. IComputationNode* const OutFinish;
  270. IComputationNode* const Hop;
  271. IComputationNode* const Interval;
  272. IComputationNode* const Delay;
  273. TType* const StateType;
  274. TMutableObjectOverBoxedValue<TValuePackerBoxed> Packer;
  275. };
  276. }
  277. IComputationNode* WrapHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  278. MKQL_ENSURE(callable.GetInputsCount() == 17, "Expected 17 args");
  279. auto hasSaveLoad = !callable.GetInput(10).GetStaticType()->IsVoid();
  280. IComputationExternalNode* inSave = nullptr;
  281. IComputationNode* outSave = nullptr;
  282. IComputationExternalNode* inLoad = nullptr;
  283. IComputationNode* outLoad = nullptr;
  284. auto streamType = callable.GetInput(0).GetStaticType();
  285. MKQL_ENSURE(streamType->IsStream(), "Expected stream");
  286. auto stream = LocateNode(ctx.NodeLocator, callable, 0);
  287. auto outTime = LocateNode(ctx.NodeLocator, callable, 7);
  288. auto outInit = LocateNode(ctx.NodeLocator, callable, 8);
  289. auto outUpdate = LocateNode(ctx.NodeLocator, callable, 9);
  290. if (hasSaveLoad) {
  291. outSave = LocateNode(ctx.NodeLocator, callable, 10);
  292. outLoad = LocateNode(ctx.NodeLocator, callable, 11);
  293. }
  294. auto outMerge = LocateNode(ctx.NodeLocator, callable, 12);
  295. auto outFinish = LocateNode(ctx.NodeLocator, callable, 13);
  296. auto hop = LocateNode(ctx.NodeLocator, callable, 14);
  297. auto interval = LocateNode(ctx.NodeLocator, callable, 15);
  298. auto delay = LocateNode(ctx.NodeLocator, callable, 16);
  299. auto item = LocateExternalNode(ctx.NodeLocator, callable, 1);
  300. auto state = LocateExternalNode(ctx.NodeLocator, callable, 2);
  301. auto state2 = LocateExternalNode(ctx.NodeLocator, callable, 3);
  302. auto time = LocateExternalNode(ctx.NodeLocator, callable, 4);
  303. if (hasSaveLoad) {
  304. inSave = LocateExternalNode(ctx.NodeLocator, callable, 5);
  305. inLoad = LocateExternalNode(ctx.NodeLocator, callable, 6);
  306. }
  307. auto stateType = hasSaveLoad ? callable.GetInput(10).GetStaticType() : nullptr;
  308. return new THoppingCoreWrapper(ctx.Mutables,
  309. stream, item, state, state2, time, inSave, inLoad,
  310. outTime, outInit, outUpdate, outSave, outLoad, outMerge, outFinish,
  311. hop, interval, delay, stateType);
  312. }
  313. }
  314. }