mkql_replicate.cpp 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. #include "mkql_replicate.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_program_builder.h>
  6. namespace NKikimr {
  7. namespace NMiniKQL {
  8. namespace {
  9. class TReplicateWrapper : public TMutableComputationNode<TReplicateWrapper> {
  10. typedef TMutableComputationNode<TReplicateWrapper> TBaseComputation;
  11. public:
  12. class TValue : public TCustomListValue {
  13. public:
  14. template <EDictItems Mode>
  15. class TIterator : public TComputationValue<TIterator<Mode>> {
  16. public:
  17. TIterator(TMemoryUsageInfo* memInfo, const NUdf::TUnboxedValue& item, ui64 count)
  18. : TComputationValue<TIterator<Mode>>(memInfo)
  19. , Item(item)
  20. , Current(0)
  21. , End(count)
  22. {}
  23. private:
  24. bool NextPair(NUdf::TUnboxedValue& key, NUdf::TUnboxedValue& payload) override {
  25. if (Current < End) {
  26. switch (Mode) {
  27. case EDictItems::Payloads:
  28. this->ThrowNotSupported(__func__);
  29. break;
  30. case EDictItems::Keys:
  31. this->ThrowNotSupported(__func__);
  32. break;
  33. case EDictItems::Both:
  34. key = NUdf::TUnboxedValuePod(ui64(Current));
  35. payload = Item;
  36. break;
  37. }
  38. ++Current;
  39. return true;
  40. }
  41. return false;
  42. }
  43. bool Next(NUdf::TUnboxedValue& value) override {
  44. if (Current < End) {
  45. switch (Mode) {
  46. case EDictItems::Payloads:
  47. value = Item;
  48. break;
  49. case EDictItems::Keys:
  50. value = NUdf::TUnboxedValuePod(ui64(Current));
  51. break;
  52. case EDictItems::Both:
  53. this->ThrowNotSupported(__func__);
  54. break;
  55. }
  56. ++Current;
  57. return true;
  58. }
  59. return false;
  60. }
  61. bool Skip() override {
  62. if (Current < End) {
  63. ++Current;
  64. return true;
  65. }
  66. return false;
  67. }
  68. const NUdf::TUnboxedValue Item;
  69. ui64 Current;
  70. const ui64 End;
  71. };
  72. TValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const NUdf::TUnboxedValue& item, ui64 count)
  73. : TCustomListValue(memInfo)
  74. , Ctx(ctx)
  75. , Item(item)
  76. , Count(count)
  77. {
  78. }
  79. private:
  80. NUdf::TUnboxedValue GetListIterator() const override {
  81. return Ctx.HolderFactory.Create<TIterator<EDictItems::Payloads>>(Item, Count);
  82. }
  83. bool HasFastListLength() const override {
  84. return true;
  85. }
  86. ui64 GetListLength() const override {
  87. return Count;
  88. }
  89. ui64 GetEstimatedListLength() const override {
  90. return Count;
  91. }
  92. bool HasListItems() const override {
  93. return Count > 0;
  94. }
  95. NUdf::IBoxedValuePtr ReverseListImpl(const NUdf::IValueBuilder& builder) const override {
  96. Y_UNUSED(builder);
  97. return const_cast<TValue*>(this);
  98. }
  99. NUdf::IBoxedValuePtr SkipListImpl(const NUdf::IValueBuilder& builder, ui64 count) const override {
  100. Y_UNUSED(builder);
  101. if (count == 0) {
  102. return const_cast<TValue*>(this);
  103. }
  104. if (count >= Count) {
  105. return Ctx.HolderFactory.GetEmptyContainerLazy().AsBoxed();
  106. }
  107. return Ctx.HolderFactory.Create<TValue>(Ctx, Item, Count - count).AsBoxed();
  108. }
  109. NUdf::IBoxedValuePtr TakeListImpl(const NUdf::IValueBuilder& builder, ui64 count) const override {
  110. Y_UNUSED(builder);
  111. if (count == 0) {
  112. return Ctx.HolderFactory.GetEmptyContainerLazy().AsBoxed();
  113. }
  114. if (count >= Count) {
  115. return const_cast<TValue*>(this);
  116. }
  117. return Ctx.HolderFactory.Create<TValue>(Ctx, Item, count).AsBoxed();
  118. }
  119. NUdf::IBoxedValuePtr ToIndexDictImpl(const NUdf::IValueBuilder& builder) const override {
  120. Y_UNUSED(builder);
  121. return const_cast<TValue*>(this);
  122. }
  123. ui64 GetDictLength() const override {
  124. return Count;
  125. }
  126. bool HasDictItems() const override {
  127. return Count > 0;
  128. }
  129. bool Contains(const NUdf::TUnboxedValuePod& key) const override {
  130. return key.Get<ui64>() < Count;
  131. }
  132. NUdf::TUnboxedValue Lookup(const NUdf::TUnboxedValuePod& key) const override {
  133. if (key.Get<ui64>() < Count) {
  134. return Item.MakeOptional();
  135. }
  136. return {};
  137. }
  138. NUdf::TUnboxedValue GetDictIterator() const override {
  139. return Ctx.HolderFactory.Create<TIterator<EDictItems::Both>>(Item, Count);
  140. }
  141. NUdf::TUnboxedValue GetKeysIterator() const override {
  142. return Ctx.HolderFactory.Create<TIterator<EDictItems::Keys>>(Item, Count);
  143. }
  144. NUdf::TUnboxedValue GetPayloadsIterator() const override {
  145. return GetListIterator();
  146. }
  147. bool IsSortedDict() const override {
  148. return true;
  149. }
  150. TComputationContext& Ctx;
  151. const NUdf::TUnboxedValue Item;
  152. const ui64 Count;
  153. };
  154. TReplicateWrapper(TComputationMutables& mutables, IComputationNode* item, IComputationNode* count,
  155. NUdf::TSourcePosition pos)
  156. : TBaseComputation(mutables)
  157. , Item(item)
  158. , Count(count)
  159. , Pos(pos)
  160. {
  161. }
  162. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  163. const auto count = Count->GetValue(ctx).Get<ui64>();
  164. const ui64 MAX_VALUE = 1ull << 32;
  165. if (count >= MAX_VALUE) {
  166. TStringBuilder res;
  167. res << Pos << " Second argument in ListReplicate = " << count << " exceeds maximum value = " << MAX_VALUE;
  168. UdfTerminate(res.data());
  169. }
  170. if (!count) {
  171. return ctx.HolderFactory.GetEmptyContainerLazy();
  172. }
  173. return ctx.HolderFactory.Create<TValue>(ctx, Item->GetValue(ctx), count);
  174. }
  175. private:
  176. void RegisterDependencies() const final {
  177. DependsOn(Item);
  178. DependsOn(Count);
  179. }
  180. IComputationNode* const Item;
  181. IComputationNode* const Count;
  182. const NUdf::TSourcePosition Pos;
  183. };
  184. }
  185. IComputationNode* WrapReplicate(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  186. MKQL_ENSURE(callable.GetInputsCount() == 2 || callable.GetInputsCount() == 5, "Expected 2 or 5 args");
  187. const auto countType = AS_TYPE(TDataType, callable.GetInput(1));
  188. MKQL_ENSURE(countType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64");
  189. const auto list = LocateNode(ctx.NodeLocator, callable, 0);
  190. const auto count = LocateNode(ctx.NodeLocator, callable, 1);
  191. NUdf::TSourcePosition pos;
  192. if (callable.GetInputsCount() == 5) {
  193. const TStringBuf file = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().AsStringRef();
  194. const ui32 row = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui32>();
  195. const ui32 column = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<ui32>();
  196. pos = NUdf::TSourcePosition(row, column, file);
  197. }
  198. return new TReplicateWrapper(ctx.Mutables, list, count, pos);
  199. }
  200. }
  201. }