mkql_switch.cpp 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044
  1. #include "mkql_switch.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  3. #include <yql/essentials/minikql/computation/mkql_llvm_base.h> // Y_IGNORE
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_stats_registry.h>
  6. #include <yql/essentials/utils/cast.h>
  7. #include <util/string/cast.h>
  8. namespace NKikimr {
  9. namespace NMiniKQL {
  10. using NYql::EnsureDynamicCast;
  11. namespace {
  12. static const TStatKey Switch_FlushesCount("Switch_FlushesCount", true);
  13. static const TStatKey Switch_MaxRowsCount("Switch_MaxRowsCount", false);
  14. using TPagedUnboxedValueList = TPagedList<NUdf::TUnboxedValue>;
  15. struct TSwitchHandler {
  16. std::vector<ui32, TMKQLAllocator<ui32>> InputIndices;
  17. IComputationExternalNode* Item = nullptr;
  18. IComputationNode* NewItem = nullptr;
  19. std::optional<ui32> ResultVariantOffset;
  20. bool IsOutputVariant = false;
  21. EValueRepresentation Kind = EValueRepresentation::Any;
  22. };
  23. using TSwitchHandlersList = std::vector<TSwitchHandler, TMKQLAllocator<TSwitchHandler>>;
  24. class TState : public TComputationValue<TState> {
  25. typedef TComputationValue<TState> TBase;
  26. public:
  27. TState(TMemoryUsageInfo* memInfo, ui32 size)
  28. : TBase(memInfo), ChildReadIndex(size)
  29. {}
  30. ui32 ChildReadIndex;
  31. NUdf::EFetchStatus InputStatus = NUdf::EFetchStatus::Ok;
  32. };
  33. #ifndef MKQL_DISABLE_CODEGEN
  34. class TLLVMFieldsStructureForState: public TLLVMFieldsStructure<TComputationValue<TState>> {
  35. private:
  36. using TBase = TLLVMFieldsStructure<TComputationValue<TState>>;
  37. llvm::IntegerType* IndexType;
  38. llvm::IntegerType* StatusType;
  39. const ui32 FieldsCount = 0;
  40. protected:
  41. using TBase::Context;
  42. ui32 GetFieldsCount() const {
  43. return FieldsCount;
  44. }
  45. std::vector<llvm::Type*> GetFields() {
  46. std::vector<llvm::Type*> result = TBase::GetFields();
  47. result.emplace_back(IndexType); // index
  48. result.emplace_back(StatusType); // status
  49. return result;
  50. }
  51. public:
  52. std::vector<llvm::Type*> GetFieldsArray() {
  53. return GetFields();
  54. }
  55. llvm::Constant* GetIndex() {
  56. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 0);
  57. }
  58. llvm::Constant* GetStatus() {
  59. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 1);
  60. }
  61. TLLVMFieldsStructureForState(llvm::LLVMContext& context)
  62. : TBase(context)
  63. , IndexType(Type::getInt32Ty(context))
  64. , StatusType(Type::getInt32Ty(context))
  65. , FieldsCount(GetFields().size())
  66. {
  67. }
  68. };
  69. #endif
  70. template <bool IsInputVariant, bool TrackRss>
  71. class TSwitchFlowWrapper : public TStatefulFlowCodegeneratorNode<TSwitchFlowWrapper<IsInputVariant, TrackRss>> {
  72. typedef TStatefulFlowCodegeneratorNode<TSwitchFlowWrapper<IsInputVariant, TrackRss>> TBaseComputation;
  73. private:
  74. class TFlowState : public TState {
  75. public:
  76. TFlowState(TMemoryUsageInfo* memInfo, TAlignedPagePool& pool, ui32 size)
  77. : TState(memInfo, size), Buffer(pool)
  78. {}
  79. void Add(NUdf::TUnboxedValuePod item) {
  80. Buffer.Add(std::move(item));
  81. }
  82. void PushStat(IStatsRegistry* stats) const {
  83. if (const auto size = Buffer.Size()) {
  84. MKQL_SET_MAX_STAT(stats, Switch_MaxRowsCount, static_cast<i64>(size));
  85. MKQL_INC_STAT(stats, Switch_FlushesCount);
  86. }
  87. }
  88. NUdf::TUnboxedValuePod Get(ui32 i) const {
  89. if (Buffer.Size() == i) {
  90. return NUdf::EFetchStatus::Finish == InputStatus ?
  91. NUdf::TUnboxedValuePod::MakeFinish():
  92. NUdf::TUnboxedValuePod::MakeYield();
  93. }
  94. return Buffer[i];
  95. }
  96. void Clear() {
  97. Buffer.Clear();
  98. }
  99. void ResetPosition() {
  100. Position = 0U;
  101. }
  102. NUdf::TUnboxedValuePod Handler(ui32, const TSwitchHandler& handler, TComputationContext& ctx) {
  103. while (true) {
  104. auto current = Get(Position);
  105. if (current.IsSpecial()) {
  106. if (current.IsYield())
  107. ResetPosition();
  108. return current;
  109. }
  110. ++Position;
  111. ui32 streamIndex = 0U;
  112. if constexpr (IsInputVariant) {
  113. streamIndex = current.GetVariantIndex();
  114. current = current.GetVariantItem().Release();
  115. }
  116. for (ui32 var = 0U; var < handler.InputIndices.size(); ++var) {
  117. if (handler.InputIndices[var] == streamIndex) {
  118. if (handler.InputIndices.size() > 1) {
  119. current = ctx.HolderFactory.CreateVariantHolder(current, var);
  120. }
  121. return current;
  122. }
  123. }
  124. }
  125. }
  126. ui32 Position = 0U;
  127. TPagedUnboxedValueList Buffer;
  128. };
  129. public:
  130. TSwitchFlowWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationNode* flow, ui64 memLimit, TSwitchHandlersList&& handlers)
  131. : TBaseComputation(mutables, flow, kind, EValueRepresentation::Any)
  132. , Flow(flow)
  133. , MemLimit(memLimit)
  134. , Handlers(std::move(handlers))
  135. {
  136. size_t handlersSize = Handlers.size();
  137. for (ui32 handlerIndex = 0; handlerIndex < handlersSize; ++handlerIndex) {
  138. Handlers[handlerIndex].Item->SetGetter([stateIndex = mutables.CurValueIndex - 1, handlerIndex, this](TComputationContext & context) {
  139. NUdf::TUnboxedValue& state = context.MutableValues[stateIndex];
  140. if (state.IsInvalid()) {
  141. MakeState(context, state);
  142. }
  143. auto ptr = static_cast<TFlowState*>(state.AsBoxed().Get());
  144. return ptr->Handler(handlerIndex, Handlers[handlerIndex], context);
  145. });
  146. #ifndef MKQL_DISABLE_CODEGEN
  147. EnsureDynamicCast<ICodegeneratorExternalNode*>(Handlers[handlerIndex].Item)->SetValueGetterBuilder([handlerIndex, this](const TCodegenContext& ctx) {
  148. return GenerateHandler(handlerIndex, ctx.Codegen);
  149. });
  150. #endif
  151. }
  152. }
  153. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  154. if (state.IsInvalid()) {
  155. MakeState(ctx, state);
  156. }
  157. auto ptr = static_cast<TFlowState*>(state.AsBoxed().Get());
  158. while (true) {
  159. if (ptr->ChildReadIndex == Handlers.size()) {
  160. switch (ptr->InputStatus) {
  161. case NUdf::EFetchStatus::Ok: break;
  162. case NUdf::EFetchStatus::Yield:
  163. ptr->InputStatus = NUdf::EFetchStatus::Ok;
  164. return NUdf::TUnboxedValuePod::MakeYield();
  165. case NUdf::EFetchStatus::Finish:
  166. return NUdf::TUnboxedValuePod::MakeFinish();
  167. }
  168. const auto initUsage = MemLimit ? ctx.HolderFactory.GetMemoryUsed() : 0ULL;
  169. do {
  170. auto current = Flow->GetValue(ctx);
  171. if (current.IsFinish()) {
  172. ptr->InputStatus = NUdf::EFetchStatus::Finish;
  173. break;
  174. } else if (current.IsYield()) {
  175. ptr->InputStatus = NUdf::EFetchStatus::Yield;
  176. break;
  177. }
  178. ptr->Add(current.Release());
  179. } while (!ctx.CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage));
  180. ptr->ChildReadIndex = 0U;
  181. ptr->PushStat(ctx.Stats);
  182. }
  183. const auto& handler = Handlers[ptr->ChildReadIndex];
  184. auto childRes = handler.NewItem->GetValue(ctx);
  185. if (childRes.IsSpecial()) {
  186. ptr->ResetPosition();
  187. if (++ptr->ChildReadIndex == Handlers.size()) {
  188. ptr->Clear();
  189. }
  190. continue;
  191. }
  192. if (const auto offset = handler.ResultVariantOffset) {
  193. ui32 localIndex = 0U;
  194. if (handler.IsOutputVariant) {
  195. localIndex = childRes.GetVariantIndex();
  196. childRes = childRes.Release().GetVariantItem();
  197. }
  198. childRes = ctx.HolderFactory.CreateVariantHolder(childRes.Release(), *offset + localIndex);
  199. }
  200. return childRes.Release();
  201. }
  202. Y_UNREACHABLE();
  203. }
  204. #ifndef MKQL_DISABLE_CODEGEN
  205. private:
  206. class TLLVMFieldsStructureForFlowState: public TLLVMFieldsStructureForState {
  207. private:
  208. using TBase = TLLVMFieldsStructureForState;
  209. llvm::PointerType* StructPtrType;
  210. llvm::IntegerType* IndexType;
  211. protected:
  212. using TBase::Context;
  213. public:
  214. std::vector<llvm::Type*> GetFieldsArray() {
  215. std::vector<llvm::Type*> result = TBase::GetFields();
  216. result.emplace_back(IndexType); // position
  217. result.emplace_back(StructPtrType); // buffer
  218. return result;
  219. }
  220. llvm::Constant* GetPosition() const {
  221. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 0);
  222. }
  223. llvm::Constant* GetBuffer() const {
  224. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 1);
  225. }
  226. TLLVMFieldsStructureForFlowState(llvm::LLVMContext& context)
  227. : TBase(context)
  228. , StructPtrType(PointerType::getUnqual(StructType::get(context)))
  229. , IndexType(Type::getInt32Ty(context)) {
  230. }
  231. };
  232. Function* GenerateHandler(ui32 i, NYql::NCodegen::ICodegen& codegen) const {
  233. auto& module = codegen.GetModule();
  234. auto& context = codegen.GetContext();
  235. TStringStream out;
  236. out << this->DebugString() << "::Handler_" << i << "_(" << static_cast<const void*>(this) << ").";
  237. const auto& name = out.Str();
  238. if (const auto f = module.getFunction(name.c_str()))
  239. return f;
  240. const auto valueType = Type::getInt128Ty(context);
  241. const auto funcType = FunctionType::get(valueType, {PointerType::getUnqual(GetCompContextType(context))}, false);
  242. TCodegenContext ctx(codegen);
  243. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  244. DISubprogramAnnotator annotator(ctx, ctx.Func);
  245. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  246. ctx.Ctx = &*ctx.Func->arg_begin();
  247. ctx.Ctx->addAttr(Attribute::NonNull);
  248. const auto indexType = Type::getInt32Ty(context);
  249. TLLVMFieldsStructureForFlowState fieldsStruct(context);
  250. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  251. const auto statePtrType = PointerType::getUnqual(stateType);
  252. auto block = main;
  253. const auto placeholder = NYql::NCodegen::ETarget::Windows == ctx.Codegen.GetEffectiveTarget() ?
  254. new AllocaInst(valueType, 0U, "placeholder", block) : nullptr;
  255. const auto statePtr = GetElementPtrInst::CreateInBounds(valueType, ctx.GetMutables(), {ConstantInt::get(indexType, static_cast<const IComputationNode*>(this)->GetIndex())}, "state_ptr", block);
  256. const auto state = new LoadInst(valueType, statePtr, "state", block);
  257. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  258. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  259. const auto posPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetPosition() }, "pos_ptr", block);
  260. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  261. const auto back = BasicBlock::Create(context, "back", ctx.Func);
  262. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  263. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  264. BranchInst::Create(loop, block);
  265. block = loop;
  266. const auto pos = new LoadInst(indexType, posPtr, "pos", block);
  267. const auto getFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TFlowState::Get));
  268. Value* input;
  269. if (NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget()) {
  270. const auto getType = FunctionType::get(valueType, {stateArg->getType(), pos->getType()}, false);
  271. const auto getPtr = CastInst::Create(Instruction::IntToPtr, getFunc, PointerType::getUnqual(getType), "get", block);
  272. input = CallInst::Create(getType, getPtr, {stateArg, pos}, "input", block);
  273. } else {
  274. const auto getType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), placeholder->getType(), pos->getType()}, false);
  275. const auto getPtr = CastInst::Create(Instruction::IntToPtr, getFunc, PointerType::getUnqual(getType), "get", block);
  276. CallInst::Create(getType, getPtr, {stateArg, placeholder, pos}, "", block);
  277. input = new LoadInst(valueType, placeholder, "input", block);
  278. }
  279. const auto special = SwitchInst::Create(input, good, 2U, block);
  280. special->addCase(GetYield(context), back);
  281. special->addCase(GetFinish(context), done);
  282. block = back;
  283. new StoreInst(ConstantInt::get(pos->getType(), 0), posPtr, block);
  284. BranchInst::Create(done, block);
  285. block = done;
  286. ReturnInst::Create(context, input, block);
  287. block = good;
  288. const auto plus = BinaryOperator::CreateAdd(pos, ConstantInt::get(pos->getType(), 1), "plus", block);
  289. new StoreInst(plus, posPtr, block);
  290. const auto unpack = IsInputVariant ? GetVariantParts(input, ctx, block) : std::make_pair(ConstantInt::get(indexType, 0), input);
  291. const auto& handler = Handlers[i];
  292. const auto choise = SwitchInst::Create(unpack.first, loop, handler.InputIndices.size(), block);
  293. for (ui32 idx = 0U; idx < handler.InputIndices.size(); ++idx) {
  294. const auto var = BasicBlock::Create(context, (TString("var_") += ToString(idx)).c_str(), ctx.Func);
  295. choise->addCase(ConstantInt::get(indexType, handler.InputIndices[idx]), var);
  296. block = var;
  297. if (handler.InputIndices.size() > 1U) {
  298. const auto variant = MakeVariant(unpack.second, ConstantInt::get(indexType, idx), ctx, block);
  299. ReturnInst::Create(context, variant, block);
  300. } else {
  301. ReturnInst::Create(context, unpack.second, block);
  302. }
  303. }
  304. return ctx.Func;
  305. }
  306. public:
  307. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  308. auto& context = ctx.Codegen.GetContext();
  309. const auto valueType = Type::getInt128Ty(context);
  310. const auto statusType = Type::getInt32Ty(context);
  311. const auto indexType = Type::getInt32Ty(context);
  312. TLLVMFieldsStructureForFlowState fieldsStruct(context);
  313. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  314. const auto statePtrType = PointerType::getUnqual(stateType);
  315. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  316. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  317. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  318. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  319. const auto result = PHINode::Create(valueType, Handlers.size() + 2U, "result", exit);
  320. BranchInst::Create(make, main, IsInvalid(statePtr, block), block);
  321. block = make;
  322. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  323. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  324. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSwitchFlowWrapper::MakeState));
  325. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  326. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  327. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  328. BranchInst::Create(main, block);
  329. block = main;
  330. const auto state = new LoadInst(valueType, statePtr, "state", block);
  331. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  332. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  333. const auto indexPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetIndex() }, "index_ptr", block);
  334. BranchInst::Create(more, block);
  335. block = more;
  336. const auto index = new LoadInst(indexType, indexPtr, "index", block);
  337. const auto empty = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, index, ConstantInt::get(index->getType(), Handlers.size()), "empty", block);
  338. const auto next = BasicBlock::Create(context, "next", ctx.Func);
  339. const auto full = BasicBlock::Create(context, "full", ctx.Func);
  340. BranchInst::Create(next, full, empty, block);
  341. {
  342. block = next;
  343. const auto rest = BasicBlock::Create(context, "rest", ctx.Func);
  344. const auto pull = BasicBlock::Create(context, "pull", ctx.Func);
  345. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  346. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  347. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  348. const auto statusPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetStatus() }, "last", block);
  349. const auto last = new LoadInst(statusType, statusPtr, "last", block);
  350. result->addIncoming(GetFinish(context), block);
  351. const auto choise = SwitchInst::Create(last, pull, 2U, block);
  352. choise->addCase(ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Yield)), rest);
  353. choise->addCase(ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Finish)), exit);
  354. block = rest;
  355. new StoreInst(ConstantInt::get(last->getType(), static_cast<ui32>(NUdf::EFetchStatus::Ok)), statusPtr, block);
  356. result->addIncoming(GetYield(context), block);
  357. BranchInst::Create(exit, block);
  358. block = pull;
  359. const auto used = GetMemoryUsed(MemLimit, ctx, block);
  360. BranchInst::Create(loop, block);
  361. block = loop;
  362. const auto item = GetNodeValue(Flow, ctx, block);
  363. const auto finsh = IsFinish(item, block);
  364. const auto yield = IsYield(item, block);
  365. const auto special = BinaryOperator::CreateOr(finsh, yield, "special", block);
  366. const auto fin = SelectInst::Create(finsh, ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Finish)), ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), "fin", block);
  367. const auto save = SelectInst::Create(yield, ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Yield)), fin, "save", block);
  368. new StoreInst(save, statusPtr, block);
  369. BranchInst::Create(done, good, special, block);
  370. block = good;
  371. const auto addFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TFlowState::Add));
  372. const auto addArg = WrapArgumentForWindows(item, ctx, block);
  373. const auto addType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), addArg->getType()}, false);
  374. const auto addPtr = CastInst::Create(Instruction::IntToPtr, addFunc, PointerType::getUnqual(addType), "add", block);
  375. CallInst::Create(addType, addPtr, {stateArg, addArg}, "", block);
  376. const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, used, ctx, block);
  377. BranchInst::Create(done, loop, check, block);
  378. block = done;
  379. new StoreInst(ConstantInt::get(indexType, 0), indexPtr, block);
  380. const auto stat = ctx.GetStat();
  381. const auto statFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TFlowState::PushStat));
  382. const auto statType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), stat->getType()}, false);
  383. const auto statPtr = CastInst::Create(Instruction::IntToPtr, statFunc, PointerType::getUnqual(statType), "stat", block);
  384. CallInst::Create(statType, statPtr, {stateArg, stat}, "", block);
  385. BranchInst::Create(more, block);
  386. }
  387. {
  388. block = full;
  389. const auto stub = BasicBlock::Create(context, "stub", ctx.Func);
  390. const auto next = BasicBlock::Create(context, "next", ctx.Func);
  391. const auto drop = BasicBlock::Create(context, "drop", ctx.Func);
  392. new UnreachableInst(context, stub);
  393. const auto choise = SwitchInst::Create(index, stub, Handlers.size(), block);
  394. for (ui32 i = 0U; i < Handlers.size(); ++i) {
  395. const auto idx = ConstantInt::get(indexType, i);
  396. const auto var = BasicBlock::Create(context, (TString("var_") += ToString(i)).c_str(), ctx.Func);
  397. choise->addCase(idx, var);
  398. block = var;
  399. const auto output = GetNodeValue(Handlers[i].NewItem, ctx, block);
  400. if (const auto offset = Handlers[i].ResultVariantOffset) {
  401. const auto good = BasicBlock::Create(context, (TString("good_") += ToString(i)).c_str(), ctx.Func);
  402. BranchInst::Create(next, good, IsSpecial(output, block), block);
  403. block = good;
  404. const auto unpack = Handlers[i].IsOutputVariant ? GetVariantParts(output, ctx, block) : std::make_pair(ConstantInt::get(indexType, 0), output);
  405. const auto reindex = BinaryOperator::CreateAdd(unpack.first, ConstantInt::get(indexType, *offset), "reindex", block);
  406. const auto variant = MakeVariant(unpack.second, reindex, ctx, block);
  407. result->addIncoming(variant, block);
  408. BranchInst::Create(exit, block);
  409. } else {
  410. result->addIncoming(output, block);
  411. BranchInst::Create(next, exit, IsSpecial(output, block), block);
  412. }
  413. }
  414. block = next;
  415. const auto posPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetPosition() }, "pos_ptr", block);
  416. new StoreInst(ConstantInt::get(indexType, 0), posPtr, block);
  417. const auto plus = BinaryOperator::CreateAdd(index, ConstantInt::get(index->getType(), 1), "plus", block);
  418. new StoreInst(plus, indexPtr, block);
  419. const auto flush = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, plus, ConstantInt::get(plus->getType(), Handlers.size()), "flush", block);
  420. BranchInst::Create(drop, more, flush, block);
  421. block = drop;
  422. const auto clearFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TFlowState::Clear));
  423. const auto clearType = FunctionType::get(Type::getInt1Ty(context), {stateArg->getType()}, false);
  424. const auto clearPtr = CastInst::Create(Instruction::IntToPtr, clearFunc, PointerType::getUnqual(clearType), "clear", block);
  425. CallInst::Create(clearType, clearPtr, {stateArg}, "", block);
  426. BranchInst::Create(more, block);
  427. block = exit;
  428. return result;
  429. }
  430. }
  431. #endif
  432. private:
  433. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  434. state = ctx.HolderFactory.Create<TFlowState>(ctx.HolderFactory.GetPagePool(), Handlers.size());
  435. }
  436. void RegisterDependencies() const final {
  437. if (const auto flow = this->FlowDependsOn(Flow)) {
  438. for (const auto& x : Handlers) {
  439. this->Own(flow, x.Item);
  440. this->DependsOn(flow, x.NewItem);
  441. }
  442. }
  443. }
  444. IComputationNode *const Flow;
  445. const ui64 MemLimit;
  446. const TSwitchHandlersList Handlers;
  447. };
  448. template <bool IsInputVariant, bool TrackRss>
  449. class TSwitchWrapper : public TCustomValueCodegeneratorNode<TSwitchWrapper<IsInputVariant, TrackRss>> {
  450. typedef TCustomValueCodegeneratorNode<TSwitchWrapper<IsInputVariant, TrackRss>> TBaseComputation;
  451. private:
  452. class TChildStream : public TComputationValue<TChildStream> {
  453. public:
  454. using TBase = TComputationValue<TChildStream>;
  455. TChildStream(TMemoryUsageInfo* memInfo, const TSwitchHandler& handler,
  456. TComputationContext& ctx, const TPagedUnboxedValueList* buffer)
  457. : TBase(memInfo)
  458. , Handler(handler)
  459. , Ctx(ctx)
  460. , Buffer(buffer)
  461. {}
  462. void Reset(bool isFinished) {
  463. BufferIndex = InputIndex = 0U;
  464. IsFinished = isFinished;
  465. }
  466. private:
  467. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  468. for (;;) {
  469. if (BufferIndex == Buffer->Size()) {
  470. return IsFinished ? NUdf::EFetchStatus::Finish : NUdf::EFetchStatus::Yield;
  471. }
  472. auto current = (*Buffer)[BufferIndex];
  473. ui32 streamIndex = 0;
  474. if constexpr (IsInputVariant) {
  475. streamIndex = current.GetVariantIndex();
  476. current = current.Release().GetVariantItem();
  477. }
  478. for (; InputIndex < Handler.InputIndices.size(); ++InputIndex) {
  479. if (Handler.InputIndices[InputIndex] == streamIndex) {
  480. if (Handler.InputIndices.size() > 1) {
  481. current = Ctx.HolderFactory.CreateVariantHolder(current.Release(), InputIndex);
  482. }
  483. result = std::move(current);
  484. ++InputIndex;
  485. return NUdf::EFetchStatus::Ok;
  486. }
  487. }
  488. InputIndex = 0;
  489. ++BufferIndex;
  490. }
  491. }
  492. const TSwitchHandler Handler;
  493. TComputationContext& Ctx;
  494. const TPagedUnboxedValueList* const Buffer;
  495. ui32 BufferIndex = 0U;
  496. ui32 InputIndex = 0U;
  497. bool IsFinished = false;
  498. };
  499. class TValueBase : public TState {
  500. public:
  501. void Add(NUdf::TUnboxedValue&& item) {
  502. Buffer.Add(std::move(item));
  503. }
  504. void Reset() {
  505. if (const auto size = Buffer.Size()) {
  506. MKQL_SET_MAX_STAT(Ctx.Stats, Switch_MaxRowsCount, static_cast<i64>(size));
  507. MKQL_INC_STAT(Ctx.Stats, Switch_FlushesCount);
  508. }
  509. ChildReadIndex = 0U;
  510. for (const auto& stream : ChildrenInStreams) {
  511. stream->Reset(NUdf::EFetchStatus::Finish == InputStatus);
  512. }
  513. }
  514. bool Get(NUdf::TUnboxedValue& result) {
  515. if (ChildrenOutStreams[ChildReadIndex].Fetch(result) == NUdf::EFetchStatus::Ok) {
  516. return true;
  517. }
  518. if (++ChildReadIndex == Handlers.size()) {
  519. Buffer.Clear();
  520. }
  521. return false;
  522. }
  523. protected:
  524. TValueBase(TMemoryUsageInfo* memInfo, const TSwitchHandlersList& handlers, TComputationContext& ctx)
  525. : TState(memInfo, handlers.size())
  526. , Handlers(handlers)
  527. , Buffer(ctx.HolderFactory.GetPagePool())
  528. , Ctx(ctx)
  529. {
  530. ChildrenInStreams.reserve(Handlers.size());
  531. ChildrenOutStreams.reserve(Handlers.size());
  532. for (const auto& handler : Handlers) {
  533. const auto stream = Ctx.HolderFactory.Create<TChildStream>(handler, Ctx, &Buffer);
  534. ChildrenInStreams.emplace_back(static_cast<TChildStream*>(stream.AsBoxed().Get()));
  535. handler.Item->SetValue(Ctx, stream);
  536. ChildrenOutStreams.emplace_back(handler.NewItem->GetValue(Ctx));
  537. }
  538. }
  539. const TSwitchHandlersList Handlers;
  540. TPagedUnboxedValueList Buffer;
  541. TComputationContext& Ctx;
  542. std::vector<NUdf::TRefCountedPtr<TChildStream>, TMKQLAllocator<NUdf::TRefCountedPtr<TChildStream>>> ChildrenInStreams;
  543. TUnboxedValueVector ChildrenOutStreams;
  544. };
  545. class TValue : public TValueBase {
  546. public:
  547. TValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream,
  548. const TSwitchHandlersList& handlers, ui64 memLimit, TComputationContext& ctx)
  549. : TValueBase(memInfo, handlers, ctx)
  550. , Stream(std::move(stream)), MemLimit(memLimit)
  551. {}
  552. private:
  553. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  554. for (;;) {
  555. if (this->ChildReadIndex == this->Handlers.size()) {
  556. switch (this->InputStatus) {
  557. case NUdf::EFetchStatus::Ok: break;
  558. case NUdf::EFetchStatus::Yield:
  559. this->InputStatus = NUdf::EFetchStatus::Ok;
  560. return NUdf::EFetchStatus::Yield;
  561. case NUdf::EFetchStatus::Finish:
  562. return NUdf::EFetchStatus::Finish;
  563. }
  564. const auto initUsage = this->MemLimit ? this->Ctx.HolderFactory.GetMemoryUsed() : 0ULL;
  565. do {
  566. NUdf::TUnboxedValue current;
  567. this->InputStatus = this->Stream.Fetch(current);
  568. if (NUdf::EFetchStatus::Ok != this->InputStatus) {
  569. break;
  570. }
  571. this->Add(std::move(current));
  572. } while (!this->Ctx.template CheckAdjustedMemLimit<TrackRss>(this->MemLimit, initUsage));
  573. this->Reset();
  574. }
  575. if (!this->Get(result)) {
  576. continue;
  577. }
  578. const auto& handler = this->Handlers[this->ChildReadIndex];
  579. if (const auto offset = handler.ResultVariantOffset) {
  580. ui32 localIndex = 0;
  581. if (handler.IsOutputVariant) {
  582. localIndex = result.GetVariantIndex();
  583. result = result.Release().GetVariantItem();
  584. }
  585. result = this->Ctx.HolderFactory.CreateVariantHolder(result.Release(), *offset + localIndex);
  586. }
  587. return NUdf::EFetchStatus::Ok;
  588. }
  589. }
  590. const NUdf::TUnboxedValue Stream;
  591. const ui64 MemLimit;
  592. };
  593. #ifndef MKQL_DISABLE_CODEGEN
  594. class TCodegenValue : public TStreamCodegenSelfStateValue<TValueBase> {
  595. public:
  596. using TFetchPtr = typename TStreamCodegenSelfStateValue<TValueBase>::TFetchPtr;
  597. TCodegenValue(TMemoryUsageInfo* memInfo, TFetchPtr fetch, TComputationContext* ctx, NUdf::TUnboxedValue&& stream, const TSwitchHandlersList& handlers)
  598. : TStreamCodegenSelfStateValue<TValueBase>(memInfo, fetch, ctx, std::move(stream), handlers, *ctx)
  599. {}
  600. };
  601. #endif
  602. public:
  603. TSwitchWrapper(TComputationMutables& mutables, IComputationNode* stream, ui64 memLimit, TSwitchHandlersList&& handlers)
  604. : TBaseComputation(mutables)
  605. , Stream(stream)
  606. , MemLimit(memLimit)
  607. , Handlers(std::move(handlers))
  608. {}
  609. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  610. #ifndef MKQL_DISABLE_CODEGEN
  611. if (ctx.ExecuteLLVM && Switch)
  612. return ctx.HolderFactory.Create<TCodegenValue>(Switch, &ctx, Stream->GetValue(ctx), Handlers);
  613. #endif
  614. return ctx.HolderFactory.Create<TValue>(Stream->GetValue(ctx), Handlers, MemLimit, ctx);
  615. }
  616. private:
  617. void RegisterDependencies() const final {
  618. this->DependsOn(Stream);
  619. for (const auto& handler : Handlers) {
  620. this->Own(handler.Item);
  621. this->DependsOn(handler.NewItem);
  622. }
  623. }
  624. #ifndef MKQL_DISABLE_CODEGEN
  625. class TLLVMFieldsStructureForValueBase: public TLLVMFieldsStructureForState {
  626. private:
  627. using TBase = TLLVMFieldsStructureForState;
  628. protected:
  629. using TBase::Context;
  630. public:
  631. std::vector<llvm::Type*> GetFieldsArray() {
  632. std::vector<llvm::Type*> result = TBase::GetFields();
  633. return result;
  634. }
  635. TLLVMFieldsStructureForValueBase(llvm::LLVMContext& context)
  636. : TBase(context) {
  637. }
  638. };
  639. void GenerateFunctions(NYql::NCodegen::ICodegen& codegen) final {
  640. SwitchFunc = GenerateSwitch(codegen);
  641. codegen.ExportSymbol(SwitchFunc);
  642. }
  643. void FinalizeFunctions(NYql::NCodegen::ICodegen& codegen) final {
  644. if (SwitchFunc)
  645. Switch = reinterpret_cast<TSwitchPtr>(codegen.GetPointerToFunction(SwitchFunc));
  646. }
  // Builds the LLVM function used as the fetch routine of TCodegenValue, or returns the
  // function already present in the module under the same name.
  // Generated signature: i32 status (context*, container, state*, i128* output).
  // The control flow follows the interpreted TValue::Fetch: when the state's index equals
  // Handlers.size() the buffer is refilled from the input stream, otherwise TValueBase::Get
  // is asked for the next item, which is re-wrapped into a variant for handlers that have
  // a ResultVariantOffset.
  647. Function* GenerateSwitch(NYql::NCodegen::ICodegen& codegen) const {
  648. auto& module = codegen.GetModule();
  649. auto& context = codegen.GetContext();
  650. const auto& name = this->MakeName("Fetch");
  // Reuse a previously generated function with this name.
  651. if (const auto f = module.getFunction(name.c_str()))
  652. return f;
  // Unboxed values are represented as i128 in the generated code.
  653. const auto valueType = Type::getInt128Ty(context);
  654. const auto ptrValueType = PointerType::getUnqual(valueType);
  // On the Windows target the container argument is passed as a pointer to the value, otherwise by value.
  655. const auto containerType = codegen.GetEffectiveTarget() == NYql::NCodegen::ETarget::Windows ? static_cast<Type*>(ptrValueType) : static_cast<Type*>(valueType);
  656. const auto contextType = GetCompContextType(context);
  657. const auto statusType = Type::getInt32Ty(context);
  658. const auto indexType = Type::getInt32Ty(context);
  659. TLLVMFieldsStructureForValueBase fieldsStruct(context);
  660. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  661. const auto statePtrType = PointerType::getUnqual(stateType);
  662. const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), containerType, statePtrType, ptrValueType}, false);
  663. TCodegenContext ctx(codegen);
  664. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  665. DISubprogramAnnotator annotator(ctx, ctx.Func);
  // Bind the four function arguments in declaration order.
  666. auto args = ctx.Func->arg_begin();
  667. ctx.Ctx = &*args;
  668. const auto containerArg = &*++args;
  669. const auto stateArg = &*++args;
  670. const auto valuePtr = &*++args;
  671. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  672. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  // "main": take the address of the state's index field and prepare a zero-initialized
  // stack slot that receives items fetched from the input stream.
  673. auto block = main;
  674. const auto indexPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetIndex() }, "index_ptr", block);
  675. const auto itemPtr = new AllocaInst(valueType, 0U, "item_ptr", block);
  676. new StoreInst(ConstantInt::get(valueType, 0), itemPtr, block);
  677. BranchInst::Create(more, block);
  // "more": loop head. If the index equals the handler count, go to "next" (refill),
  // otherwise go to "full" (read a buffered result).
  678. block = more;
  679. const auto index = new LoadInst(indexType, indexPtr, "index", block);
  680. const auto empty = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, index, ConstantInt::get(index->getType(), Handlers.size()), "empty", block);
  681. const auto next = BasicBlock::Create(context, "next", ctx.Func);
  682. const auto full = BasicBlock::Create(context, "full", ctx.Func);
  683. BranchInst::Create(next, full, empty, block);
  684. {
  // "next": dispatch on the status stored in the state by the previous input fetch.
  685. block = next;
  686. const auto rest = BasicBlock::Create(context, "rest", ctx.Func);
  687. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  688. const auto pull = BasicBlock::Create(context, "pull", ctx.Func);
  689. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  690. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  691. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  692. const auto statusPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetStatus() }, "last", block);
  693. const auto last = new LoadInst(statusType, statusPtr, "last", block);
  // Yield -> "rest", Finish -> "exit", anything else -> "pull".
  694. const auto choise = SwitchInst::Create(last, pull, 2U, block);
  695. choise->addCase(ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Yield)), rest);
  696. choise->addCase(ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Finish)), exit);
  // "rest": reset the stored status to Ok, then leave through "exit".
  697. block = rest;
  698. new StoreInst(ConstantInt::get(last->getType(), static_cast<ui32>(NUdf::EFetchStatus::Ok)), statusPtr, block);
  699. BranchInst::Create(exit, block);
  // "exit": return the status value that was loaded in "next" (Yield or Finish).
  700. block = exit;
  701. ReturnInst::Create(context, last, block);
  // "pull": obtain the value passed to the memory check below and the stream to read from.
  // On the Windows target the container has to be loaded through its pointer first.
  702. block = pull;
  703. const auto used = GetMemoryUsed(MemLimit, ctx, block);
  704. const auto stream = codegen.GetEffectiveTarget() == NYql::NCodegen::ETarget::Windows ?
  705. new LoadInst(valueType, containerArg, "load_container", false, block) : static_cast<Value*>(containerArg);
  706. BranchInst::Create(loop, block);
  // "loop": fetch one item from the input stream into the stack slot and store the status
  // into the state. Ok -> "good", any other status -> "done".
  707. block = loop;
  708. const auto fetch = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, stream, codegen, block, itemPtr);
  709. new StoreInst(fetch, statusPtr, block);
  710. const auto ok = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, fetch, ConstantInt::get(fetch->getType(), static_cast<ui32>(NUdf::EFetchStatus::Ok)), "ok", block);
  711. BranchInst::Create(good, done, ok, block);
  // "good": call TValueBase::Add(state, item). The value from GetMethodPtr is embedded as an
  // i64 constant and cast to a function pointer. Then run the memory check: true -> "done",
  // false -> back to "loop".
  712. block = good;
  713. const auto addFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TValueBase::Add));
  714. const auto addType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), itemPtr->getType()}, false);
  715. const auto addPtr = CastInst::Create(Instruction::IntToPtr, addFunc, PointerType::getUnqual(addType), "add", block);
  716. CallInst::Create(addType, addPtr, {stateArg, itemPtr}, "", block);
  717. const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, used, ctx, block);
  718. BranchInst::Create(done, loop, check, block);
  // "done": call TValueBase::Reset(state) and return to the loop head.
  719. block = done;
  720. const auto resetFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TValueBase::Reset));
  721. const auto resetType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType()}, false);
  722. const auto resetPtr = CastInst::Create(Instruction::IntToPtr, resetFunc, PointerType::getUnqual(resetType), "reset", block);
  723. CallInst::Create(resetType, resetPtr, {stateArg}, "", block);
  724. BranchInst::Create(more, block);
  725. }
  726. {
  // "full": ask TValueBase::Get(state, output) for the next result; when it returns false
  // go back to the loop head, otherwise continue in "good".
  727. block = full;
  728. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  729. const auto stub = BasicBlock::Create(context, "stub", ctx.Func);
  730. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  // "exit" returns Ok with the output untouched; "stub" is the unreachable default of the switch below.
  731. ReturnInst::Create(context, ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), exit);
  732. new UnreachableInst(context, stub);
  733. const auto nextFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TValueBase::Get));
  734. const auto nextType = FunctionType::get(Type::getInt1Ty(context), {stateArg->getType(), valuePtr->getType()}, false);
  735. const auto nextPtr = CastInst::Create(Instruction::IntToPtr, nextFunc, PointerType::getUnqual(nextType), "next", block);
  736. const auto has = CallInst::Create(nextType, nextPtr, {stateArg, valuePtr}, "has", block);
  737. BranchInst::Create(good, more, has, block);
  // "good": switch on the index value loaded in "more", one case per handler.
  738. block = good;
  739. const auto choise = SwitchInst::Create(index, stub, Handlers.size(), block);
  740. for (ui32 i = 0U; i < Handlers.size(); ++i) {
  741. const auto idx = ConstantInt::get(indexType, i);
  742. if (const auto offset = Handlers[i].ResultVariantOffset) {
  // Handler with a result offset: block "var_<i>" rebuilds the output as a variant whose
  // index is the handler's offset plus the index of the handler's own variant
  // (or plus 0 when the handler output is not a variant).
  743. const auto var = BasicBlock::Create(context, (TString("var_") += ToString(i)).c_str(), ctx.Func);
  744. choise->addCase(idx, var);
  745. block = var;
  746. const auto output = new LoadInst(valueType, valuePtr, "output", block);
  747. ValueRelease(Handlers[i].Kind, output, ctx, block);
  748. const auto unpack = Handlers[i].IsOutputVariant ? GetVariantParts(output, ctx, block) : std::make_pair(ConstantInt::get(indexType, 0), output);
  749. const auto reindex = BinaryOperator::CreateAdd(unpack.first, ConstantInt::get(indexType, *offset), "reindex", block);
  750. const auto variant = MakeVariant(unpack.second, reindex, ctx, block);
  751. new StoreInst(variant, valuePtr, block);
  752. ValueAddRef(EValueRepresentation::Any, variant, ctx, block);
  753. ReturnInst::Create(context, ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), block);
  754. } else {
  // Handler without a result offset: the value written by Get is returned as is.
  755. choise->addCase(idx, exit);
  756. }
  757. }
  758. }
  759. return ctx.Func;
  760. }
  // Pointer type of the compiled fetch function, as declared by TCodegenValue.
  761. using TSwitchPtr = typename TCodegenValue::TFetchPtr;
  // IR function returned by GenerateSwitch; assigned in GenerateFunctions.
  762. Function* SwitchFunc = nullptr;
  // Callable pointer resolved from SwitchFunc in FinalizeFunctions; DoCalculate creates
  // the codegen value only when it is non-null.
  763. TSwitchPtr Switch = nullptr;
  764. #endif
  // Input node; its value is handed to the created stream value as the source stream.
  765. IComputationNode *const Stream;
  // Memory limit passed to the memory checks of the buffering loop.
  766. const ui64 MemLimit;
  // Handlers of the switch; their Item and NewItem nodes are registered in RegisterDependencies.
  767. const TSwitchHandlersList Handlers;
  768. };
  769. }
  770. IComputationNode* WrapSwitch(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  771. MKQL_ENSURE(callable.GetInputsCount() >= 6, "Expected at least 6 args");
  772. MKQL_ENSURE((callable.GetInputsCount() - 2) % 4 == 0, "Corrupted arguments for Switch");
  773. TSwitchHandlersList handlers;
  774. handlers.reserve(callable.GetInputsCount() >> 2U);
  775. const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
  776. const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui64>();
  777. const auto type = callable.GetType()->GetReturnType();
  778. for (ui32 i = 2; i < callable.GetInputsCount(); i += 4) {
  779. TSwitchHandler handler;
  780. const auto tuple = AS_VALUE(TTupleLiteral, callable.GetInput(i));
  781. for (ui32 tupleIndex = 0; tupleIndex < tuple->GetValuesCount(); ++tupleIndex) {
  782. handler.InputIndices.emplace_back(AS_VALUE(TDataLiteral, tuple->GetValue(tupleIndex))->AsValue().Get<ui32>());
  783. }
  784. const auto itemType = type->IsFlow() ?
  785. AS_TYPE(TFlowType, callable.GetInput(i + 2))->GetItemType():
  786. AS_TYPE(TStreamType, callable.GetInput(i + 2))->GetItemType();
  787. handler.IsOutputVariant = itemType->IsVariant();
  788. handler.Kind = GetValueRepresentation(itemType);
  789. handler.NewItem = LocateNode(ctx.NodeLocator, callable, i + 2);
  790. handler.Item = LocateExternalNode(ctx.NodeLocator, callable, i + 1);
  791. const auto offsetNode = callable.GetInput(i + 3);
  792. if (!offsetNode.GetStaticType()->IsVoid()) {
  793. handler.ResultVariantOffset = AS_VALUE(TDataLiteral, offsetNode)->AsValue().Get<ui32>();
  794. }
  795. handlers.emplace_back(std::move(handler));
  796. }
  797. const bool trackRss = EGraphPerProcess::Single == ctx.GraphPerProcess;
  798. if (type->IsFlow()) {
  799. const bool isInputVariant = AS_TYPE(TFlowType, callable.GetInput(0))->GetItemType()->IsVariant();
  800. const auto kind = GetValueRepresentation(type);
  801. if (isInputVariant && trackRss) {
  802. return new TSwitchFlowWrapper<true, true>(ctx.Mutables, kind, stream, memLimit, std::move(handlers));
  803. } else if (isInputVariant) {
  804. return new TSwitchFlowWrapper<true, false>(ctx.Mutables, kind, stream, memLimit, std::move(handlers));
  805. } else if (trackRss) {
  806. return new TSwitchFlowWrapper<false, true>(ctx.Mutables, kind, stream, memLimit, std::move(handlers));
  807. } else {
  808. return new TSwitchFlowWrapper<false, false>(ctx.Mutables, kind, stream, memLimit, std::move(handlers));
  809. }
  810. } else if (type->IsStream()) {
  811. const bool isInputVariant = AS_TYPE(TStreamType, callable.GetInput(0))->GetItemType()->IsVariant();
  812. if (isInputVariant && trackRss) {
  813. return new TSwitchWrapper<true, true>(ctx.Mutables, stream, memLimit, std::move(handlers));
  814. } else if (isInputVariant) {
  815. return new TSwitchWrapper<true, false>(ctx.Mutables, stream, memLimit, std::move(handlers));
  816. } else if (trackRss) {
  817. return new TSwitchWrapper<false, true>(ctx.Mutables, stream, memLimit, std::move(handlers));
  818. } else {
  819. return new TSwitchWrapper<false, false>(ctx.Mutables, stream, memLimit, std::move(handlers));
  820. }
  821. }
  822. THROW yexception() << "Expected flow or stream.";
  823. }
  824. }
  825. }