mkql_zip.cpp 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. #include "mkql_zip.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
  4. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  5. #include <yql/essentials/minikql/mkql_node_cast.h>
  6. namespace NKikimr {
  7. namespace NMiniKQL {
  8. namespace {
  9. template <bool All>
  10. class TZipWrapper : public TMutableComputationNode<TZipWrapper<All>> {
  11. typedef TMutableComputationNode<TZipWrapper<All>> TBaseComputation;
  12. public:
  13. using TSelf = TZipWrapper<All>;
  14. class TValue : public TCustomListValue {
  15. public:
  16. class TIterator : public TComputationValue<TIterator> {
  17. public:
  18. TIterator(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& iters, TComputationContext& ctx, const TSelf* self)
  19. : TComputationValue<TIterator>(memInfo)
  20. , Iters(std::move(iters))
  21. , Ctx(ctx)
  22. , Self(self)
  23. {}
  24. private:
  25. bool Next(NUdf::TUnboxedValue& value) override {
  26. bool hasSome = false;
  27. NUdf::TUnboxedValue* items = nullptr;
  28. auto tuple = Self->ResTuple.NewArray(Ctx, Iters.size(), items);
  29. for (auto& iter : Iters) {
  30. if (iter) {
  31. NUdf::TUnboxedValue item;
  32. if (!iter.Next(item)) {
  33. if (All) {
  34. *items = std::move(item);
  35. iter = NUdf::TUnboxedValue();
  36. } else {
  37. Iters.clear();
  38. return false;
  39. }
  40. } else {
  41. *items = All ? NUdf::TUnboxedValue(item.Release().MakeOptional()) : std::move(item);
  42. hasSome = true;
  43. }
  44. } else {
  45. if (All) {
  46. *items = NUdf::TUnboxedValuePod();
  47. } else {
  48. Iters.clear();
  49. return false;
  50. }
  51. }
  52. ++items;
  53. }
  54. if (!hasSome)
  55. return false;
  56. value = std::move(tuple);
  57. return true;
  58. }
  59. bool Skip() override {
  60. bool hasSome = false;
  61. for (size_t i = 0, e = Iters.size(); i < e; i++) {
  62. auto& iter = Iters[i];
  63. if (iter) {
  64. if (!iter.Skip()) {
  65. if (All) {
  66. Iters[i] = NUdf::TUnboxedValue();
  67. } else {
  68. Iters.clear();
  69. return false;
  70. }
  71. } else {
  72. hasSome = true;
  73. }
  74. } else if (!All) {
  75. return false;
  76. }
  77. }
  78. return hasSome;
  79. }
  80. TUnboxedValueVector Iters;
  81. TComputationContext& Ctx;
  82. const TSelf* const Self;
  83. };
  84. TValue(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& lists, TComputationContext& ctx,
  85. const TSelf* self)
  86. : TCustomListValue(memInfo)
  87. , Lists(std::move(lists))
  88. , Ctx(ctx)
  89. , Self(self)
  90. {
  91. MKQL_MEM_TAKE(memInfo, &Lists, Lists.capacity() * sizeof(NUdf::TUnboxedValue));
  92. Y_ASSERT(!Lists.empty());
  93. }
  94. ~TValue() {
  95. MKQL_MEM_RETURN(GetMemInfo(), &Lists, Lists.capacity() * sizeof(NUdf::TUnboxedValue));
  96. }
  97. private:
  98. NUdf::TUnboxedValue GetListIterator() const override {
  99. if (Lists.empty()) {
  100. return Ctx.HolderFactory.GetEmptyContainerLazy();
  101. }
  102. TUnboxedValueVector iters;
  103. iters.reserve(Lists.size());
  104. for (auto& list : Lists) {
  105. iters.emplace_back(list.GetListIterator());
  106. }
  107. return Ctx.HolderFactory.Create<TIterator>(std::move(iters), Ctx, Self);
  108. }
  109. ui64 GetListLength() const override {
  110. if (!Length) {
  111. ui64 length = 0;
  112. if (!Lists.empty()) {
  113. if (!All) {
  114. length = Max<ui64>();
  115. }
  116. for (auto& list : Lists) {
  117. ui64 partialLength = list.GetListLength();
  118. if (All) {
  119. length = Max(length, partialLength);
  120. } else {
  121. length = Min(length, partialLength);
  122. }
  123. }
  124. }
  125. Length = length;
  126. }
  127. return *Length;
  128. }
  129. bool HasListItems() const override {
  130. if (!HasItems) {
  131. HasItems = GetListLength() != 0;
  132. }
  133. return *HasItems;
  134. }
  135. TUnboxedValueVector Lists;
  136. TComputationContext& Ctx;
  137. const TSelf *const Self;
  138. };
  139. TZipWrapper(TComputationMutables& mutables, TComputationNodePtrVector& lists)
  140. : TBaseComputation(mutables)
  141. , Lists(std::move(lists))
  142. , ResTuple(mutables)
  143. {}
  144. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  145. TUnboxedValueVector listValues;
  146. TSmallVec<const NUdf::TUnboxedValue*, TMKQLAllocator<const NUdf::TUnboxedValue*>> arrays;
  147. listValues.reserve(Lists.size());
  148. arrays.reserve(Lists.size());
  149. for (auto& list : Lists) {
  150. listValues.emplace_back(list->GetValue(ctx));
  151. arrays.emplace_back(listValues.back().GetElements());
  152. }
  153. if (std::any_of(arrays.cbegin(), arrays.cend(), std::logical_not<const NUdf::TUnboxedValue*>()))
  154. return ctx.HolderFactory.Create<TValue>(std::move(listValues), ctx, this);
  155. TSmallVec<ui64, TMKQLAllocator<ui64>> sizes;
  156. sizes.reserve(listValues.size());
  157. std::transform(listValues.cbegin(), listValues.cend(), std::back_inserter(sizes), std::bind(&NUdf::TUnboxedValuePod::GetListLength, std::placeholders::_1));
  158. const auto size = *(All ? std::max_element(sizes.cbegin(), sizes.cend()) : std::min_element(sizes.cbegin(), sizes.cend()));
  159. if (!size)
  160. return ctx.HolderFactory.GetEmptyContainerLazy();
  161. NUdf::TUnboxedValue *listItems = nullptr;
  162. const auto list = ctx.HolderFactory.CreateDirectArrayHolder(size, listItems);
  163. for (auto i = 0U; i < size; ++i) {
  164. NUdf::TUnboxedValue *items = nullptr;
  165. *listItems++ = ctx.HolderFactory.CreateDirectArrayHolder(arrays.size(), items);
  166. for (auto j = 0U; j < arrays.size(); ++j) {
  167. if constexpr (All) {
  168. if (sizes[j] > i)
  169. *items++ = *arrays[j]++;
  170. else
  171. ++items;
  172. } else {
  173. *items++ = *arrays[j]++;
  174. }
  175. }
  176. }
  177. return list;
  178. }
  179. private:
  180. void RegisterDependencies() const final {
  181. std::for_each(Lists.cbegin(), Lists.cend(), std::bind(&TZipWrapper::DependsOn, this, std::placeholders::_1));
  182. }
  183. const TComputationNodePtrVector Lists;
  184. const TContainerCacheOnContext ResTuple;
  185. };
  186. }
  187. template <bool All>
  188. IComputationNode* WrapZip(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  189. TComputationNodePtrVector lists;
  190. lists.reserve(callable.GetInputsCount());
  191. for (ui32 i = 0, e = callable.GetInputsCount(); i < e; ++i) {
  192. auto type = callable.GetInput(i).GetStaticType();
  193. MKQL_ENSURE(type->IsList() || type->IsEmptyList(), "Unexpected list type");
  194. lists.push_back(LocateNode(ctx.NodeLocator, callable, i));
  195. }
  196. return new TZipWrapper<All>(ctx.Mutables, lists);
  197. }
  198. template
  199. IComputationNode* WrapZip<false>(TCallable& callable, const TComputationNodeFactoryContext& ctx);
  200. template
  201. IComputationNode* WrapZip<true>(TCallable& callable, const TComputationNodeFactoryContext& ctx);
  202. }
  203. }