// mkql_multimap.cpp (24 KB)
  1. #include "mkql_multimap.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/utils/cast.h>
  6. #include <util/string/cast.h>
  7. namespace NKikimr {
  8. namespace NMiniKQL {
  9. using NYql::EnsureDynamicCast;
  10. namespace {
  11. class TFlowMultiMapWrapper : public TStatefulFlowCodegeneratorNode<TFlowMultiMapWrapper> {
  12. typedef TStatefulFlowCodegeneratorNode<TFlowMultiMapWrapper> TBaseComputation;
  13. public:
  14. TFlowMultiMapWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationNode* flow, IComputationExternalNode* item, TComputationNodePtrVector&& newItems)
  15. : TBaseComputation(mutables, flow, kind), Flow(flow), Item(item), NewItems(std::move(newItems))
  16. {}
  17. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  18. if (state.IsFinish())
  19. return NUdf::TUnboxedValuePod::MakeFinish();
  20. const auto pos = state.IsInvalid() ? 0ULL : state.Get<ui64>();
  21. if (!pos) {
  22. if (auto item = Flow->GetValue(ctx); item.IsSpecial()) {
  23. return item.Release();
  24. } else {
  25. Item->SetValue(ctx, std::move(item));
  26. }
  27. }
  28. const auto next = pos + 1ULL;
  29. state = NewItems.size() == next ? NUdf::TUnboxedValuePod::Invalid() : NUdf::TUnboxedValuePod(ui64(next));
  30. return NewItems[pos]->GetValue(ctx).Release();
  31. }
  32. #ifndef MKQL_DISABLE_CODEGEN
  33. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  34. auto& context = ctx.Codegen.GetContext();
  35. const auto codegenItem = dynamic_cast<ICodegeneratorExternalNode*>(Item);
  36. MKQL_ENSURE(codegenItem, "Item must be codegenerator node.");
  37. const auto valueType = Type::getInt128Ty(context);
  38. const auto state = new LoadInst(valueType, statePtr, "state", block);
  39. const auto zero = BasicBlock::Create(context, "zero", ctx.Func);
  40. const auto work = BasicBlock::Create(context, "work", ctx.Func);
  41. const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
  42. const auto result = PHINode::Create(valueType, NewItems.size() + 1U, "result", pass);
  43. const auto choise = SwitchInst::Create(state, zero, NewItems.size() - 1U, block);
  44. for (ui32 i = 1U; i < NewItems.size();) {
  45. const auto part = BasicBlock::Create(context, (TString("part_") += ToString(i)).c_str(), ctx.Func);
  46. choise->addCase(GetConstant(i, context), part);
  47. block = part;
  48. const auto out = GetNodeValue(NewItems[i], ctx, block);
  49. result->addIncoming(out, block);
  50. const auto next = ++i;
  51. new StoreInst(NewItems.size() <= next ? GetInvalid(context) : GetConstant(next, context), statePtr, block);
  52. BranchInst::Create(pass, block);
  53. }
  54. {
  55. block = zero;
  56. const auto item = GetNodeValue(Flow, ctx, block);
  57. result->addIncoming(item, block);
  58. BranchInst::Create(pass, work, IsSpecial(item, block, context), block);
  59. block = work;
  60. codegenItem->CreateSetValue(ctx, block, item);
  61. const auto out = GetNodeValue(NewItems.front(), ctx, block);
  62. result->addIncoming(out, block);
  63. new StoreInst(GetConstant(1ULL, context), statePtr, block);
  64. BranchInst::Create(pass, block);
  65. }
  66. block = pass;
  67. return result;
  68. }
  69. #endif
  70. private:
  71. void RegisterDependencies() const final {
  72. if (const auto flow = FlowDependsOn(Flow)) {
  73. Own(flow, Item);
  74. }
  75. }
  76. IComputationNode* const Flow;
  77. IComputationExternalNode* const Item;
  78. const TComputationNodePtrVector NewItems;
  79. };
  80. class TListMultiMapWrapper : public TBothWaysCodegeneratorNode<TListMultiMapWrapper> {
  81. private:
  82. typedef TBothWaysCodegeneratorNode<TListMultiMapWrapper> TBaseComputation;
  83. class TListValue : public TCustomListValue {
  84. public:
  85. class TIterator : public TComputationValue<TIterator> {
  86. public:
  87. TIterator(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, NUdf::TUnboxedValue&& iter, IComputationExternalNode* item, const TComputationNodePtrVector& newItems)
  88. : TComputationValue<TIterator>(memInfo)
  89. , CompCtx(compCtx)
  90. , Iter(std::move(iter))
  91. , Item(item)
  92. , NewItems(newItems)
  93. {}
  94. private:
  95. bool Next(NUdf::TUnboxedValue& value) override {
  96. if (!Position) {
  97. if (!Iter.Next(Item->RefValue(CompCtx))) {
  98. return false;
  99. }
  100. }
  101. value = NewItems[Position]->GetValue(CompCtx);
  102. if (++Position == NewItems.size())
  103. Position = 0U;
  104. return true;
  105. }
  106. TComputationContext& CompCtx;
  107. const NUdf::TUnboxedValue Iter;
  108. IComputationExternalNode* const Item;
  109. const TComputationNodePtrVector NewItems;
  110. size_t Position = 0U;
  111. };
  112. TListValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, NUdf::TUnboxedValue&& list, IComputationExternalNode* item, const TComputationNodePtrVector& newItems)
  113. : TCustomListValue(memInfo)
  114. , CompCtx(compCtx)
  115. , List(std::move(list))
  116. , Item(item)
  117. , NewItems(newItems)
  118. {}
  119. private:
  120. NUdf::TUnboxedValue GetListIterator() const final {
  121. return CompCtx.HolderFactory.Create<TIterator>(CompCtx, List.GetListIterator(), Item, NewItems);
  122. }
  123. ui64 GetListLength() const final {
  124. if (!Length) {
  125. Length = List.GetListLength() * NewItems.size();
  126. }
  127. return *Length;
  128. }
  129. bool HasListItems() const final {
  130. if (!HasItems) {
  131. HasItems = List.HasListItems();
  132. }
  133. return *HasItems;
  134. }
  135. bool HasFastListLength() const final {
  136. return List.HasFastListLength();
  137. }
  138. TComputationContext& CompCtx;
  139. const NUdf::TUnboxedValue List;
  140. IComputationExternalNode* const Item;
  141. const TComputationNodePtrVector NewItems;
  142. };
  143. public:
  144. TListMultiMapWrapper(TComputationMutables& mutables, IComputationNode* list, IComputationExternalNode* item, TComputationNodePtrVector&& newItems)
  145. : TBaseComputation(mutables), List(list), Item(item), NewItems(std::move(newItems))
  146. {}
  147. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  148. auto list = List->GetValue(ctx);
  149. if (auto elements = list.GetElements()) {
  150. auto size = list.GetListLength();
  151. NUdf::TUnboxedValue* items = nullptr;
  152. const auto result = ctx.HolderFactory.CreateDirectArrayHolder(size * NewItems.size(), items);
  153. while (size--) {
  154. Item->SetValue(ctx, NUdf::TUnboxedValue(*elements++));
  155. for (const auto newItem : NewItems)
  156. *items++ = newItem->GetValue(ctx);
  157. }
  158. return result;
  159. }
  160. return ctx.HolderFactory.Create<TListValue>(ctx, std::move(list), Item, NewItems);
  161. }
  162. #ifndef MKQL_DISABLE_CODEGEN
  163. using TCodegenValue = TCustomListCodegenStatefulValueT<TCodegenStatefulIterator<ui64>>;
  164. NUdf::TUnboxedValuePod MakeLazyList(TComputationContext& ctx, const NUdf::TUnboxedValuePod value) const {
  165. return ctx.HolderFactory.Create<TCodegenValue>(Map, &ctx, value);
  166. }
  167. Value* DoGenerateGetValue(const TCodegenContext& ctx, BasicBlock*& block) const {
  168. auto& context = ctx.Codegen.GetContext();
  169. const auto codegenItem = dynamic_cast<ICodegeneratorExternalNode*>(Item);
  170. MKQL_ENSURE(codegenItem, "Item must be codegenerator node.");
  171. const auto list = GetNodeValue(List, ctx, block);
  172. const auto lazy = BasicBlock::Create(context, "lazy", ctx.Func);
  173. const auto hard = BasicBlock::Create(context, "hard", ctx.Func);
  174. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  175. const auto map = PHINode::Create(list->getType(), 2U, "map", done);
  176. const auto elementsType = PointerType::getUnqual(list->getType());
  177. const auto elements = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::GetElements>(elementsType, list, ctx.Codegen, block);
  178. const auto fill = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_NE, elements, ConstantPointerNull::get(elementsType), "fill", block);
  179. BranchInst::Create(hard, lazy, fill, block);
  180. {
  181. block = hard;
  182. const auto size = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::GetListLength>(Type::getInt64Ty(context), list, ctx.Codegen, block);
  183. const auto itemsPtr = *Stateless || ctx.AlwaysInline ?
  184. new AllocaInst(elementsType, 0U, "items_ptr", &ctx.Func->getEntryBlock().back()):
  185. new AllocaInst(elementsType, 0U, "items_ptr", block);
  186. const auto full = BinaryOperator::CreateMul(size, ConstantInt::get(size->getType(), NewItems.size()), "full", block);
  187. const auto array = GenNewArray(ctx, full, itemsPtr, block);
  188. const auto items = new LoadInst(elementsType, itemsPtr, "items", block);
  189. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  190. const auto next = BasicBlock::Create(context, "next", ctx.Func);
  191. const auto stop = BasicBlock::Create(context, "stop", ctx.Func);
  192. const auto index = PHINode::Create(size->getType(), 2U, "index", loop);
  193. index->addIncoming(ConstantInt::get(size->getType(), 0), block);
  194. BranchInst::Create(loop, block);
  195. block = loop;
  196. const auto more = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, size, index, "more", block);
  197. BranchInst::Create(next, stop, more, block);
  198. block = next;
  199. const auto src = GetElementPtrInst::CreateInBounds(list->getType(), elements, {index}, "src", block);
  200. const auto item = new LoadInst(list->getType(), src, "item", block);
  201. codegenItem->CreateSetValue(ctx, block, item);
  202. const auto from = BinaryOperator::CreateMul(index, ConstantInt::get(index->getType(), NewItems.size()), "from", block);
  203. for (ui32 i = 0U; i < NewItems.size(); ++i) {
  204. const auto pos = BinaryOperator::CreateAdd(from, ConstantInt::get(from->getType(), i), (TString("pos_") += ToString(i)).c_str(), block);
  205. const auto dst = GetElementPtrInst::CreateInBounds(list->getType(), items, {pos}, (TString("dst_") += ToString(i)).c_str(), block);
  206. GetNodeValue(dst, NewItems[i], ctx, block);
  207. }
  208. const auto plus = BinaryOperator::CreateAdd(index, ConstantInt::get(size->getType(), 1), "plus", block);
  209. index->addIncoming(plus, block);
  210. BranchInst::Create(loop, block);
  211. block = stop;
  212. if (List->IsTemporaryValue()) {
  213. CleanupBoxed(list, ctx, block);
  214. }
  215. map->addIncoming(array, block);
  216. BranchInst::Create(done, block);
  217. }
  218. {
  219. block = lazy;
  220. const auto doFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TListMultiMapWrapper::MakeLazyList));
  221. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  222. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  223. const auto funType = FunctionType::get(list->getType() , {self->getType(), ctx.Ctx->getType(), list->getType()}, false);
  224. const auto doFuncPtr = CastInst::Create(Instruction::IntToPtr, doFunc, PointerType::getUnqual(funType), "function", block);
  225. const auto value = CallInst::Create(funType, doFuncPtr, {self, ctx.Ctx, list}, "value", block);
  226. map->addIncoming(value, block);
  227. BranchInst::Create(done, block);
  228. }
  229. block = done;
  230. return map;
  231. }
  232. #endif
  233. private:
  234. void RegisterDependencies() const final {
  235. DependsOn(List);
  236. Own(Item);
  237. std::for_each(NewItems.cbegin(), NewItems.cend(), std::bind(&TListMultiMapWrapper::DependsOn, this, std::placeholders::_1));
  238. }
  239. #ifndef MKQL_DISABLE_CODEGEN
  240. void GenerateFunctions(NYql::NCodegen::ICodegen& codegen) final {
  241. TMutableCodegeneratorRootNode<TListMultiMapWrapper>::GenerateFunctions(codegen);
  242. MapFunc = GenerateMapper(codegen, TBaseComputation::MakeName("Next"));
  243. codegen.ExportSymbol(MapFunc);
  244. }
  245. void FinalizeFunctions(NYql::NCodegen::ICodegen& codegen) final {
  246. TMutableCodegeneratorRootNode<TListMultiMapWrapper>::FinalizeFunctions(codegen);
  247. if (MapFunc)
  248. Map = reinterpret_cast<TMapPtr>(codegen.GetPointerToFunction(MapFunc));
  249. }
  250. Function* GenerateMapper(NYql::NCodegen::ICodegen& codegen, const TString& name) const {
  251. auto& module = codegen.GetModule();
  252. auto& context = codegen.GetContext();
  253. const auto codegenItem = dynamic_cast<ICodegeneratorExternalNode*>(Item);
  254. MKQL_ENSURE(codegenItem, "Item must be codegenerator node.");
  255. if (const auto f = module.getFunction(name.c_str()))
  256. return f;
  257. const auto valueType = Type::getInt128Ty(context);
  258. const auto positionType = Type::getInt64Ty(context);
  259. const auto containerType = static_cast<Type*>(valueType);
  260. const auto contextType = GetCompContextType(context);
  261. const auto statusType = Type::getInt1Ty(context);
  262. const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), containerType, PointerType::getUnqual(positionType), PointerType::getUnqual(valueType)}, false);
  263. TCodegenContext ctx(codegen);
  264. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  265. DISubprogramAnnotator annotator(ctx, ctx.Func);
  266. auto args = ctx.Func->arg_begin();
  267. ctx.Ctx = &*args;
  268. const auto containerArg = &*++args;
  269. const auto positionArg = &*++args;
  270. const auto valuePtr = &*++args;
  271. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  272. auto block = main;
  273. const auto container = static_cast<Value*>(containerArg);
  274. const auto position = new LoadInst(positionType, positionArg, "position", false, block);
  275. const auto zero = BasicBlock::Create(context, "zero", ctx.Func);
  276. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  277. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  278. const auto choise = SwitchInst::Create(position, zero, NewItems.size() - 1U, block);
  279. for (ui32 i = 1U; i < NewItems.size();) {
  280. const auto part = BasicBlock::Create(context, (TString("part_") += ToString(i)).c_str(), ctx.Func);
  281. choise->addCase(ConstantInt::get(positionType, i), part);
  282. block = part;
  283. SafeUnRefUnboxedOne(valuePtr, ctx, block);
  284. GetNodeValue(valuePtr, NewItems[i], ctx, block);
  285. const auto next = ++i;
  286. new StoreInst(ConstantInt::get(positionType, NewItems.size() <= next ? 0 : next), positionArg, block);
  287. ReturnInst::Create(context, ConstantInt::getTrue(context), block);
  288. }
  289. block = zero;
  290. const auto itemPtr = codegenItem->CreateRefValue(ctx, block);
  291. const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Next>(statusType, container, codegen, block, itemPtr);
  292. BranchInst::Create(good, done, status, block);
  293. block = good;
  294. SafeUnRefUnboxedOne(valuePtr, ctx, block);
  295. GetNodeValue(valuePtr, NewItems.front(), ctx, block);
  296. new StoreInst(ConstantInt::get(positionType, 1), positionArg, block);
  297. BranchInst::Create(done, block);
  298. block = done;
  299. ReturnInst::Create(context, status, block);
  300. return ctx.Func;
  301. }
  302. using TMapPtr = TCodegenValue::TNextPtr;
  303. Function* MapFunc = nullptr;
  304. TMapPtr Map = nullptr;
  305. #endif
  306. IComputationNode* const List;
  307. IComputationExternalNode* const Item;
  308. const TComputationNodePtrVector NewItems;
  309. };
  310. class TNarrowMultiMapWrapper : public TStatefulFlowCodegeneratorNode<TNarrowMultiMapWrapper> {
  311. using TBaseComputation = TStatefulFlowCodegeneratorNode<TNarrowMultiMapWrapper>;
  312. public:
  313. TNarrowMultiMapWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& newItems)
  314. : TBaseComputation(mutables, flow, kind)
  315. , Flow(flow)
  316. , Items(std::move(items))
  317. , NewItems(std::move(newItems))
  318. , PasstroughtMap(GetPasstroughtMap(Items, NewItems))
  319. , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
  320. {}
  321. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  322. if (state.IsFinish())
  323. return NUdf::TUnboxedValuePod::MakeFinish();
  324. const auto pos = state.IsInvalid() ? 0ULL : state.Get<ui64>();
  325. if (!pos) {
  326. auto** fields = ctx.WideFields.data() + WideFieldsIndex;
  327. for (auto i = 0U; i < Items.size(); ++i)
  328. if (Items[i]->GetDependencesCount() > 0U || PasstroughtMap[i])
  329. fields[i] = &Items[i]->RefValue(ctx);
  330. switch (Flow->FetchValues(ctx, fields)) {
  331. case EFetchResult::Finish:
  332. return NUdf::TUnboxedValuePod::MakeFinish();
  333. case EFetchResult::Yield:
  334. return NUdf::TUnboxedValuePod::MakeYield();
  335. default:
  336. break;
  337. }
  338. }
  339. const auto next = pos + 1ULL;
  340. state = NewItems.size() == next ? NUdf::TUnboxedValuePod::Invalid() : NUdf::TUnboxedValuePod(ui64(next));
  341. return NewItems[pos]->GetValue(ctx).Release();
  342. }
  343. #ifndef MKQL_DISABLE_CODEGEN
  344. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  345. auto& context = ctx.Codegen.GetContext();
  346. const auto valueType = Type::getInt128Ty(context);
  347. const auto state = new LoadInst(valueType, statePtr, "state", block);
  348. const auto zero = BasicBlock::Create(context, "zero", ctx.Func);
  349. const auto work = BasicBlock::Create(context, "work", ctx.Func);
  350. const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
  351. const auto result = PHINode::Create(valueType, NewItems.size() + 1U, "result", pass);
  352. const auto choise = SwitchInst::Create(state, zero, NewItems.size() - 1U, block);
  353. for (ui32 i = 1U; i < NewItems.size();) {
  354. const auto part = BasicBlock::Create(context, (TString("part_") += ToString(i)).c_str(), ctx.Func);
  355. choise->addCase(GetConstant(i, context), part);
  356. block = part;
  357. const auto out = GetNodeValue(NewItems[i], ctx, block);
  358. result->addIncoming(out, block);
  359. const auto next = ++i;
  360. new StoreInst(NewItems.size() <= next ? GetInvalid(context) : GetConstant(next, context), statePtr, block);
  361. BranchInst::Create(pass, block);
  362. }
  363. {
  364. block = zero;
  365. const auto getres = GetNodeValues(Flow, ctx, block);
  366. const auto yield = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, getres.first, ConstantInt::get(getres.first->getType(), 0), "yield", block);
  367. const auto good = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SGT, getres.first, ConstantInt::get(getres.first->getType(), 0), "good", block);
  368. const auto outres = SelectInst::Create(yield, GetYield(context), GetFinish(context), "outres", block);
  369. result->addIncoming(outres, block);
  370. BranchInst::Create(work, pass, good, block);
  371. block = work;
  372. Value* head = nullptr;
  373. for (auto i = 0U; i < Items.size(); ++i) {
  374. if (Items[i]->GetDependencesCount() > 0U || PasstroughtMap[i]) {
  375. EnsureDynamicCast<ICodegeneratorExternalNode*>(Items[i])->CreateSetValue(ctx, block, NewItems.front() == Items[i] ? (head = getres.second[i](ctx, block)) : getres.second[i](ctx, block));
  376. }
  377. }
  378. const auto out = head ? head : GetNodeValue(NewItems.front(), ctx, block);
  379. result->addIncoming(out, block);
  380. new StoreInst(GetConstant(1ULL, context), statePtr, block);
  381. BranchInst::Create(pass, block);
  382. }
  383. block = pass;
  384. return result;
  385. }
  386. #endif
  387. private:
  388. void RegisterDependencies() const final {
  389. if (const auto flow = FlowDependsOn(Flow)) {
  390. std::for_each(Items.cbegin(), Items.cend(), std::bind(&TNarrowMultiMapWrapper::Own, flow, std::placeholders::_1));
  391. std::for_each(NewItems.cbegin(), NewItems.cend(), std::bind(&TNarrowMultiMapWrapper::DependsOn, flow, std::placeholders::_1));
  392. }
  393. }
  394. IComputationWideFlowNode* const Flow;
  395. const TComputationExternalNodePtrVector Items;
  396. const TComputationNodePtrVector NewItems;
  397. const TPasstroughtMap PasstroughtMap;
  398. const ui32 WideFieldsIndex;
  399. };
  400. }
  401. IComputationNode* WrapMultiMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  402. MKQL_ENSURE(callable.GetInputsCount() > 2U, "Expected at least three arguments.");
  403. const auto listType = callable.GetInput(0).GetStaticType();
  404. const auto type = callable.GetType()->GetReturnType();
  405. const auto list = LocateNode(ctx.NodeLocator, callable, 0);
  406. TComputationNodePtrVector newItems;
  407. newItems.reserve(callable.GetInputsCount() - 2U);
  408. ui32 index = 1U;
  409. std::generate_n(std::back_inserter(newItems), callable.GetInputsCount() - 2U, [&](){ return LocateNode(ctx.NodeLocator, callable, ++index); });
  410. const auto itemArg = LocateExternalNode(ctx.NodeLocator, callable, 1U);
  411. if (listType->IsFlow()) {
  412. return new TFlowMultiMapWrapper(ctx.Mutables, GetValueRepresentation(type), list, itemArg, std::move(newItems));
  413. } else if (listType->IsList()) {
  414. return new TListMultiMapWrapper(ctx.Mutables, list, itemArg, std::move(newItems));
  415. }
  416. THROW yexception() << "Expected flow or list.";
  417. }
  418. IComputationNode* WrapNarrowMultiMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  419. MKQL_ENSURE(callable.GetInputsCount() > 2U, "Expected at least three arguments.");
  420. auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
  421. const auto width = wideComponents.size();
  422. MKQL_ENSURE(callable.GetInputsCount() > width + 2U, "Wrong signature.");
  423. const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
  424. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
  425. TComputationNodePtrVector newItems;
  426. newItems.reserve(callable.GetInputsCount() - width - 1U);
  427. ui32 index = width;
  428. std::generate_n(std::back_inserter(newItems), callable.GetInputsCount() - width - 1U, [&](){ return LocateNode(ctx.NodeLocator, callable, ++index); });
  429. TComputationExternalNodePtrVector args;
  430. args.reserve(width);
  431. index = 0U;
  432. std::generate_n(std::back_inserter(args), width, [&](){ return LocateExternalNode(ctx.NodeLocator, callable, ++index); });
  433. return new TNarrowMultiMapWrapper(ctx.Mutables, GetValueRepresentation(callable.GetType()->GetReturnType()), wide, std::move(args), std::move(newItems));
  434. }
  435. THROW yexception() << "Expected wide flow.";
  436. }
  437. }
  438. }