mkql_time_order_recover.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #include "mkql_time_order_recover.h"
  2. #include "mkql_saveload.h"
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  4. #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
  5. #include <yql/essentials/minikql/mkql_node_cast.h>
  6. #include <yql/essentials/minikql/mkql_string_util.h>
  7. #include <queue>
  8. namespace NKikimr::NMiniKQL {
  9. namespace {
  10. constexpr ui32 StateVersion = 1;
  11. class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover, true> {
  12. using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover, true>;
  13. public:
  14. class TState: public TComputationValue<TState> {
  15. public:
  16. using TTimestamp = i64; //use signed integers to simplify arithmetics
  17. using TTimeinterval = i64;
  18. using TSelf = TTimeOrderRecover;
  19. TState(
  20. TMemoryUsageInfo* memInfo,
  21. const TSelf* self,
  22. TTimeinterval delay,
  23. TTimeinterval ahead,
  24. ui32 rowLimit,
  25. TComputationContext& ctx)
  26. : TComputationValue<TState>(memInfo)
  27. , Self(self)
  28. , Heap(Greater)
  29. , Delay(delay)
  30. , Ahead(ahead)
  31. , RowLimit(rowLimit + 1)
  32. , Latest(0)
  33. , Terminating(false)
  34. , MonotonicCounter(0)
  35. , Ctx(ctx)
  36. {}
  37. private:
  38. using THeapKey = std::pair<TTimestamp, ui64>;
  39. using TEntry = std::pair<THeapKey, NUdf::TUnboxedValue>;
  40. static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) {
  41. return lhs.first > rhs.first;
  42. };
  43. using TStdHeap = std::priority_queue<
  44. TEntry,
  45. std::vector<TEntry, TMKQLAllocator<TEntry>>,
  46. decltype(Greater)>;
  47. struct THeap: public TStdHeap {
  48. template<typename...TArgs>
  49. THeap(TArgs... args) : TStdHeap(args...) {}
  50. auto begin() const { return c.begin(); }
  51. auto end() const { return c.end(); }
  52. auto clear() { return c.clear(); }
  53. };
  54. public:
  55. NUdf::TUnboxedValue GetOutputIfReady() {
  56. if (Terminating && Heap.empty()) {
  57. return NUdf::TUnboxedValue::MakeFinish();
  58. }
  59. if (Heap.empty()) {
  60. return NUdf::TUnboxedValue{};
  61. }
  62. THeapKey oldestKey = Heap.top().first;
  63. TTimestamp oldest = oldestKey.first;
  64. if (oldest < Latest + Delay || Heap.size() == RowLimit || Terminating) {
  65. auto result = std::move(Heap.top().second);
  66. Heap.pop();
  67. return result;
  68. }
  69. return NUdf::TUnboxedValue{};
  70. }
  71. ///return input row in case it cannot process it correctly
  72. NUdf::TUnboxedValue ProcessRow(TTimestamp t, NUdf::TUnboxedValue&& row) {
  73. MKQL_ENSURE(!row.IsSpecial(), "Internal logic error");
  74. MKQL_ENSURE(Heap.size() < RowLimit, "Internal logic error");
  75. if (Heap.empty()) {
  76. Latest = t;
  77. }
  78. if (Latest + Delay < t && t < Latest + Ahead) {
  79. Heap.emplace(THeapKey(t, ++MonotonicCounter), std::move(row));
  80. } else {
  81. return row;
  82. }
  83. Latest = std::max(Latest, t);
  84. return NUdf::TUnboxedValue{};
  85. }
  86. void Finish() {
  87. Terminating = true;
  88. }
  89. private:
  90. bool HasListItems() const override {
  91. return false;
  92. }
  93. bool Load2(const NUdf::TUnboxedValue& state) override {
  94. TInputSerializer in(state, EMkqlStateType::SIMPLE_BLOB);
  95. const auto loadStateVersion = in.GetStateVersion();
  96. if (loadStateVersion != StateVersion) {
  97. THROW yexception() << "Invalid state version " << loadStateVersion;
  98. }
  99. const auto heapSize = in.Read<ui32>();
  100. ClearState();
  101. for (auto i = 0U; i < heapSize; ++i) {
  102. TTimestamp t = in.Read<ui64>();
  103. in(MonotonicCounter);
  104. NUdf::TUnboxedValue row = in.ReadUnboxedValue(Self->Packer.RefMutableObject(Ctx, false, Self->StateType), Ctx);
  105. Heap.emplace(THeapKey(t, MonotonicCounter), std::move(row));
  106. }
  107. in(Latest, Terminating);
  108. return true;
  109. }
  110. NUdf::TUnboxedValue Save() const override {
  111. TOutputSerializer out(EMkqlStateType::SIMPLE_BLOB, StateVersion, Ctx);
  112. out.Write<ui32>(Heap.size());
  113. for (const TEntry& entry : Heap) {
  114. THeapKey key = entry.first;
  115. out(key);
  116. out.WriteUnboxedValue(Self->Packer.RefMutableObject(Ctx, false, Self->StateType), entry.second);
  117. }
  118. out(Latest, Terminating);
  119. return out.MakeState();
  120. }
  121. void ClearState() {
  122. Heap.clear();
  123. Latest = 0;
  124. Terminating = false;
  125. }
  126. private:
  127. const TSelf *const Self;
  128. THeap Heap;
  129. const TTimeinterval Delay;
  130. const TTimeinterval Ahead;
  131. const ui32 RowLimit;
  132. TTimestamp Latest;
  133. bool Terminating; //not applicable for streams, but useful for debug and testing
  134. ui64 MonotonicCounter;
  135. TComputationContext& Ctx;
  136. };
  137. TTimeOrderRecover(
  138. TComputationMutables& mutables,
  139. EValueRepresentation kind,
  140. IComputationNode* inputFlow,
  141. IComputationExternalNode* inputRowArg,
  142. IComputationNode* rowTime,
  143. ui32 inputRowColumnCount,
  144. ui32 outOfOrderColumnIndex,
  145. IComputationNode* delay,
  146. IComputationNode* ahead,
  147. IComputationNode* rowLimit,
  148. TType* stateType)
  149. : TBaseComputation(mutables, inputFlow, kind)
  150. , InputFlow(inputFlow)
  151. , InputRowArg(inputRowArg)
  152. , RowTime(rowTime)
  153. , InputRowColumnCount(inputRowColumnCount)
  154. , OutOfOrderColumnIndex(outOfOrderColumnIndex)
  155. , Delay(delay)
  156. , Ahead(ahead)
  157. , RowLimit(rowLimit)
  158. , Cache(mutables)
  159. , StateType(stateType)
  160. , Packer(mutables)
  161. { }
  162. NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx) const {
  163. if (stateValue.IsInvalid()) {
  164. stateValue = ctx.HolderFactory.Create<TState>(
  165. this,
  166. Delay->GetValue(ctx).Get<i64>(),
  167. Ahead->GetValue(ctx).Get<i64>(),
  168. RowLimit->GetValue(ctx).Get<ui32>(),
  169. ctx);
  170. } else if (stateValue.HasValue()) {
  171. MKQL_ENSURE(stateValue.IsBoxed(), "Expected boxed value");
  172. bool isStateToLoad = stateValue.HasListItems();
  173. if (isStateToLoad) {
  174. // Load from saved state.
  175. NUdf::TUnboxedValue state = ctx.HolderFactory.Create<TState>(
  176. this,
  177. Delay->GetValue(ctx).Get<i64>(),
  178. Ahead->GetValue(ctx).Get<i64>(),
  179. RowLimit->GetValue(ctx).Get<ui32>(),
  180. ctx);
  181. state.Load2(stateValue);
  182. stateValue = state;
  183. }
  184. }
  185. auto& state = *static_cast<TState *>(stateValue.AsBoxed().Get());
  186. while (true) {
  187. if (auto out = state.GetOutputIfReady()) {
  188. return AddColumn(std::move(out), false, ctx);
  189. }
  190. auto item = InputFlow->GetValue(ctx);
  191. if (item.IsSpecial()) {
  192. if (item.IsFinish()) {
  193. state.Finish();
  194. } else {
  195. return item;
  196. }
  197. } else {
  198. InputRowArg->SetValue(ctx, NUdf::TUnboxedValue{item});
  199. const auto t = RowTime->GetValue(ctx).Get<ui64>();
  200. if (auto row = state.ProcessRow(static_cast<TState::TTimestamp>(t), std::move(item))) {
  201. return AddColumn(std::move(row), true, ctx);
  202. }
  203. }
  204. }
  205. }
  206. private:
  207. void RegisterDependencies() const final {
  208. if (const auto flow = FlowDependsOn(InputFlow)) {
  209. Own(flow, InputRowArg);
  210. DependsOn(flow, RowTime);
  211. }
  212. }
  213. NUdf::TUnboxedValue AddColumn(NUdf::TUnboxedValue&& row, bool outOfOrder, TComputationContext& ctx) const {
  214. if (row.IsSpecial()) {
  215. return row;
  216. }
  217. NUdf::TUnboxedValue* itemsPtr = nullptr;
  218. auto result = Cache.NewArray(ctx, InputRowColumnCount + 1, itemsPtr);
  219. ui32 inputColumnIndex = 0;
  220. for (ui32 i = 0; i != InputRowColumnCount + 1; ++i) {
  221. if (OutOfOrderColumnIndex == i) {
  222. *itemsPtr++ = NUdf::TUnboxedValuePod{outOfOrder};
  223. } else {
  224. *itemsPtr++ = std::move(row.GetElements()[inputColumnIndex++]);
  225. }
  226. }
  227. return result;
  228. }
  229. IComputationNode* const InputFlow;
  230. IComputationExternalNode* const InputRowArg;
  231. IComputationNode* const RowTime;
  232. const ui32 InputRowColumnCount;
  233. const ui32 OutOfOrderColumnIndex;
  234. const IComputationNode* Delay;
  235. const IComputationNode* Ahead;
  236. const IComputationNode* RowLimit;
  237. const TContainerCacheOnContext Cache;
  238. TType* const StateType;
  239. TMutableObjectOverBoxedValue<TValuePackerBoxed> Packer;
  240. };
  241. } //namespace
  242. IComputationNode* TimeOrderRecover(const TComputationNodeFactoryContext& ctx,
  243. TRuntimeNode inputFlow,
  244. TRuntimeNode inputRowArg,
  245. TRuntimeNode rowTime,
  246. TRuntimeNode inputRowColumnCount,
  247. TRuntimeNode outOfOrderColumnIndex,
  248. TRuntimeNode delay,
  249. TRuntimeNode ahead,
  250. TRuntimeNode rowLimit)
  251. {
  252. auto* rowType = AS_TYPE(TStructType, AS_TYPE(TFlowType, inputFlow.GetStaticType())->GetItemType());
  253. return new TTimeOrderRecover(ctx.Mutables
  254. , GetValueRepresentation(inputFlow.GetStaticType())
  255. , LocateNode(ctx.NodeLocator, *inputFlow.GetNode())
  256. , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode()))
  257. , LocateNode(ctx.NodeLocator, *rowTime.GetNode())
  258. , AS_VALUE(TDataLiteral, inputRowColumnCount)->AsValue().Get<ui32>()
  259. , AS_VALUE(TDataLiteral, outOfOrderColumnIndex)->AsValue().Get<ui32>()
  260. , LocateNode(ctx.NodeLocator, *delay.GetNode())
  261. , LocateNode(ctx.NodeLocator, *ahead.GetNode())
  262. , LocateNode(ctx.NodeLocator, *rowLimit.GetNode())
  263. , rowType
  264. );
  265. }
  266. }//namespace NKikimr::NMiniKQL