mkql_switch.cpp 43 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030
  1. #include "mkql_switch.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  3. #include <yql/essentials/minikql/computation/mkql_llvm_base.h> // Y_IGNORE
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_stats_registry.h>
  6. #include <yql/essentials/utils/cast.h>
  7. #include <util/string/cast.h>
  8. namespace NKikimr {
  9. namespace NMiniKQL {
  10. using NYql::EnsureDynamicCast;
  11. namespace {
  12. static const TStatKey Switch_FlushesCount("Switch_FlushesCount", true);
  13. static const TStatKey Switch_MaxRowsCount("Switch_MaxRowsCount", false);
  14. using TPagedUnboxedValueList = TPagedList<NUdf::TUnboxedValue>;
  15. struct TSwitchHandler {
  16. std::vector<ui32, TMKQLAllocator<ui32>> InputIndices;
  17. IComputationExternalNode* Item = nullptr;
  18. IComputationNode* NewItem = nullptr;
  19. std::optional<ui32> ResultVariantOffset;
  20. bool IsOutputVariant = false;
  21. EValueRepresentation Kind = EValueRepresentation::Any;
  22. };
  23. using TSwitchHandlersList = std::vector<TSwitchHandler, TMKQLAllocator<TSwitchHandler>>;
  24. class TState : public TComputationValue<TState> {
  25. typedef TComputationValue<TState> TBase;
  26. public:
  27. TState(TMemoryUsageInfo* memInfo, ui32 size)
  28. : TBase(memInfo), ChildReadIndex(size)
  29. {}
  30. ui32 ChildReadIndex;
  31. NUdf::EFetchStatus InputStatus = NUdf::EFetchStatus::Ok;
  32. };
  33. #ifndef MKQL_DISABLE_CODEGEN
  34. class TLLVMFieldsStructureForState: public TLLVMFieldsStructure<TComputationValue<TState>> {
  35. private:
  36. using TBase = TLLVMFieldsStructure<TComputationValue<TState>>;
  37. llvm::IntegerType* IndexType;
  38. llvm::IntegerType* StatusType;
  39. const ui32 FieldsCount = 0;
  40. protected:
  41. using TBase::Context;
  42. ui32 GetFieldsCount() const {
  43. return FieldsCount;
  44. }
  45. std::vector<llvm::Type*> GetFields() {
  46. std::vector<llvm::Type*> result = TBase::GetFields();
  47. result.emplace_back(IndexType); // index
  48. result.emplace_back(StatusType); // status
  49. return result;
  50. }
  51. public:
  52. std::vector<llvm::Type*> GetFieldsArray() {
  53. return GetFields();
  54. }
  55. llvm::Constant* GetIndex() {
  56. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 0);
  57. }
  58. llvm::Constant* GetStatus() {
  59. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 1);
  60. }
  61. TLLVMFieldsStructureForState(llvm::LLVMContext& context)
  62. : TBase(context)
  63. , IndexType(Type::getInt32Ty(context))
  64. , StatusType(Type::getInt32Ty(context))
  65. , FieldsCount(GetFields().size())
  66. {
  67. }
  68. };
  69. #endif
  70. template <bool IsInputVariant, bool TrackRss>
  71. class TSwitchFlowWrapper : public TStatefulFlowCodegeneratorNode<TSwitchFlowWrapper<IsInputVariant, TrackRss>> {
  72. typedef TStatefulFlowCodegeneratorNode<TSwitchFlowWrapper<IsInputVariant, TrackRss>> TBaseComputation;
  73. private:
  74. class TFlowState : public TState {
  75. public:
  76. TFlowState(TMemoryUsageInfo* memInfo, TAlignedPagePool& pool, ui32 size)
  77. : TState(memInfo, size), Buffer(pool)
  78. {}
  79. void Add(NUdf::TUnboxedValuePod item) {
  80. Buffer.Add(std::move(item));
  81. }
  82. void PushStat(IStatsRegistry* stats) const {
  83. if (const auto size = Buffer.Size()) {
  84. MKQL_SET_MAX_STAT(stats, Switch_MaxRowsCount, static_cast<i64>(size));
  85. MKQL_INC_STAT(stats, Switch_FlushesCount);
  86. }
  87. }
  88. NUdf::TUnboxedValuePod Get(ui32 i) const {
  89. if (Buffer.Size() == i) {
  90. return NUdf::EFetchStatus::Finish == InputStatus ?
  91. NUdf::TUnboxedValuePod::MakeFinish():
  92. NUdf::TUnboxedValuePod::MakeYield();
  93. }
  94. return Buffer[i];
  95. }
  96. void Clear() {
  97. Buffer.Clear();
  98. }
  99. void ResetPosition() {
  100. Position = 0U;
  101. }
  102. NUdf::TUnboxedValuePod Handler(ui32, const TSwitchHandler& handler, TComputationContext& ctx) {
  103. while (true) {
  104. auto current = Get(Position);
  105. if (current.IsSpecial()) {
  106. if (current.IsYield())
  107. ResetPosition();
  108. return current;
  109. }
  110. ++Position;
  111. ui32 streamIndex = 0U;
  112. if constexpr (IsInputVariant) {
  113. streamIndex = current.GetVariantIndex();
  114. current = current.GetVariantItem().Release();
  115. }
  116. for (ui32 var = 0U; var < handler.InputIndices.size(); ++var) {
  117. if (handler.InputIndices[var] == streamIndex) {
  118. if (handler.InputIndices.size() > 1) {
  119. current = ctx.HolderFactory.CreateVariantHolder(current, var);
  120. }
  121. return current;
  122. }
  123. }
  124. }
  125. }
  126. ui32 Position = 0U;
  127. TPagedUnboxedValueList Buffer;
  128. };
  129. public:
  130. TSwitchFlowWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationNode* flow, ui64 memLimit, TSwitchHandlersList&& handlers)
  131. : TBaseComputation(mutables, flow, kind, EValueRepresentation::Any)
  132. , Flow(flow)
  133. , MemLimit(memLimit)
  134. , Handlers(std::move(handlers))
  135. {
  136. size_t handlersSize = Handlers.size();
  137. for (ui32 handlerIndex = 0; handlerIndex < handlersSize; ++handlerIndex) {
  138. Handlers[handlerIndex].Item->SetGetter([stateIndex = mutables.CurValueIndex - 1, handlerIndex, this](TComputationContext & context) {
  139. NUdf::TUnboxedValue& state = context.MutableValues[stateIndex];
  140. if (state.IsInvalid()) {
  141. MakeState(context, state);
  142. }
  143. auto ptr = static_cast<TFlowState*>(state.AsBoxed().Get());
  144. return ptr->Handler(handlerIndex, Handlers[handlerIndex], context);
  145. });
  146. #ifndef MKQL_DISABLE_CODEGEN
  147. EnsureDynamicCast<ICodegeneratorExternalNode*>(Handlers[handlerIndex].Item)->SetValueGetterBuilder([handlerIndex, this](const TCodegenContext& ctx) {
  148. return GenerateHandler(handlerIndex, ctx.Codegen);
  149. });
  150. #endif
  151. }
  152. }
  153. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  154. if (state.IsInvalid()) {
  155. MakeState(ctx, state);
  156. }
  157. auto ptr = static_cast<TFlowState*>(state.AsBoxed().Get());
  158. while (true) {
  159. if (ptr->ChildReadIndex == Handlers.size()) {
  160. switch (ptr->InputStatus) {
  161. case NUdf::EFetchStatus::Ok: break;
  162. case NUdf::EFetchStatus::Yield:
  163. ptr->InputStatus = NUdf::EFetchStatus::Ok;
  164. return NUdf::TUnboxedValuePod::MakeYield();
  165. case NUdf::EFetchStatus::Finish:
  166. return NUdf::TUnboxedValuePod::MakeFinish();
  167. }
  168. const auto initUsage = MemLimit ? ctx.HolderFactory.GetMemoryUsed() : 0ULL;
  169. do {
  170. auto current = Flow->GetValue(ctx);
  171. if (current.IsFinish()) {
  172. ptr->InputStatus = NUdf::EFetchStatus::Finish;
  173. break;
  174. } else if (current.IsYield()) {
  175. ptr->InputStatus = NUdf::EFetchStatus::Yield;
  176. break;
  177. }
  178. ptr->Add(current.Release());
  179. } while (!ctx.CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage));
  180. ptr->ChildReadIndex = 0U;
  181. ptr->PushStat(ctx.Stats);
  182. }
  183. const auto& handler = Handlers[ptr->ChildReadIndex];
  184. auto childRes = handler.NewItem->GetValue(ctx);
  185. if (childRes.IsSpecial()) {
  186. ptr->ResetPosition();
  187. if (++ptr->ChildReadIndex == Handlers.size()) {
  188. ptr->Clear();
  189. }
  190. continue;
  191. }
  192. if (const auto offset = handler.ResultVariantOffset) {
  193. ui32 localIndex = 0U;
  194. if (handler.IsOutputVariant) {
  195. localIndex = childRes.GetVariantIndex();
  196. childRes = childRes.Release().GetVariantItem();
  197. }
  198. childRes = ctx.HolderFactory.CreateVariantHolder(childRes.Release(), *offset + localIndex);
  199. }
  200. return childRes.Release();
  201. }
  202. Y_UNREACHABLE();
  203. }
  204. #ifndef MKQL_DISABLE_CODEGEN
  205. private:
  206. class TLLVMFieldsStructureForFlowState: public TLLVMFieldsStructureForState {
  207. private:
  208. using TBase = TLLVMFieldsStructureForState;
  209. llvm::PointerType* StructPtrType;
  210. llvm::IntegerType* IndexType;
  211. protected:
  212. using TBase::Context;
  213. public:
  214. std::vector<llvm::Type*> GetFieldsArray() {
  215. std::vector<llvm::Type*> result = TBase::GetFields();
  216. result.emplace_back(IndexType); // position
  217. result.emplace_back(StructPtrType); // buffer
  218. return result;
  219. }
  220. llvm::Constant* GetPosition() const {
  221. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 0);
  222. }
  223. llvm::Constant* GetBuffer() const {
  224. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 1);
  225. }
  226. TLLVMFieldsStructureForFlowState(llvm::LLVMContext& context)
  227. : TBase(context)
  228. , StructPtrType(PointerType::getUnqual(StructType::get(context)))
  229. , IndexType(Type::getInt32Ty(context)) {
  230. }
  231. };
  232. Function* GenerateHandler(ui32 i, NYql::NCodegen::ICodegen& codegen) const {
  233. auto& module = codegen.GetModule();
  234. auto& context = codegen.GetContext();
  235. TStringStream out;
  236. out << this->DebugString() << "::Handler_" << i << "_(" << static_cast<const void*>(this) << ").";
  237. const auto& name = out.Str();
  238. if (const auto f = module.getFunction(name.c_str()))
  239. return f;
  240. const auto valueType = Type::getInt128Ty(context);
  241. const auto funcType = FunctionType::get(valueType, {PointerType::getUnqual(GetCompContextType(context))}, false);
  242. TCodegenContext ctx(codegen);
  243. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  244. DISubprogramAnnotator annotator(ctx, ctx.Func);
  245. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  246. ctx.Ctx = &*ctx.Func->arg_begin();
  247. ctx.Ctx->addAttr(Attribute::NonNull);
  248. const auto indexType = Type::getInt32Ty(context);
  249. TLLVMFieldsStructureForFlowState fieldsStruct(context);
  250. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  251. const auto statePtrType = PointerType::getUnqual(stateType);
  252. auto block = main;
  253. const auto statePtr = GetElementPtrInst::CreateInBounds(valueType, ctx.GetMutables(), {ConstantInt::get(indexType, static_cast<const IComputationNode*>(this)->GetIndex())}, "state_ptr", block);
  254. const auto state = new LoadInst(valueType, statePtr, "state", block);
  255. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  256. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  257. const auto posPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetPosition() }, "pos_ptr", block);
  258. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  259. const auto back = BasicBlock::Create(context, "back", ctx.Func);
  260. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  261. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  262. BranchInst::Create(loop, block);
  263. block = loop;
  264. const auto pos = new LoadInst(indexType, posPtr, "pos", block);
  265. const auto getFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TFlowState::Get));
  266. const auto getType = FunctionType::get(valueType, {stateArg->getType(), pos->getType()}, false);
  267. const auto getPtr = CastInst::Create(Instruction::IntToPtr, getFunc, PointerType::getUnqual(getType), "get", block);
  268. const auto input = CallInst::Create(getType, getPtr, {stateArg, pos}, "input", block);
  269. const auto special = SwitchInst::Create(input, good, 2U, block);
  270. special->addCase(GetYield(context), back);
  271. special->addCase(GetFinish(context), done);
  272. block = back;
  273. new StoreInst(ConstantInt::get(pos->getType(), 0), posPtr, block);
  274. BranchInst::Create(done, block);
  275. block = done;
  276. ReturnInst::Create(context, input, block);
  277. block = good;
  278. const auto plus = BinaryOperator::CreateAdd(pos, ConstantInt::get(pos->getType(), 1), "plus", block);
  279. new StoreInst(plus, posPtr, block);
  280. const auto unpack = IsInputVariant ? GetVariantParts(input, ctx, block) : std::make_pair(ConstantInt::get(indexType, 0), input);
  281. const auto& handler = Handlers[i];
  282. const auto choise = SwitchInst::Create(unpack.first, loop, handler.InputIndices.size(), block);
  283. for (ui32 idx = 0U; idx < handler.InputIndices.size(); ++idx) {
  284. const auto var = BasicBlock::Create(context, (TString("var_") += ToString(idx)).c_str(), ctx.Func);
  285. choise->addCase(ConstantInt::get(indexType, handler.InputIndices[idx]), var);
  286. block = var;
  287. if (handler.InputIndices.size() > 1U) {
  288. const auto variant = MakeVariant(unpack.second, ConstantInt::get(indexType, idx), ctx, block);
  289. ReturnInst::Create(context, variant, block);
  290. } else {
  291. ReturnInst::Create(context, unpack.second, block);
  292. }
  293. }
  294. return ctx.Func;
  295. }
  296. public:
  297. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  298. auto& context = ctx.Codegen.GetContext();
  299. const auto valueType = Type::getInt128Ty(context);
  300. const auto statusType = Type::getInt32Ty(context);
  301. const auto indexType = Type::getInt32Ty(context);
  302. TLLVMFieldsStructureForFlowState fieldsStruct(context);
  303. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  304. const auto statePtrType = PointerType::getUnqual(stateType);
  305. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  306. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  307. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  308. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  309. const auto result = PHINode::Create(valueType, Handlers.size() + 2U, "result", exit);
  310. BranchInst::Create(make, main, IsInvalid(statePtr, block, context), block);
  311. block = make;
  312. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  313. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  314. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSwitchFlowWrapper::MakeState));
  315. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  316. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  317. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  318. BranchInst::Create(main, block);
  319. block = main;
  320. const auto state = new LoadInst(valueType, statePtr, "state", block);
  321. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  322. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  323. const auto indexPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetIndex() }, "index_ptr", block);
  324. BranchInst::Create(more, block);
  325. block = more;
  326. const auto index = new LoadInst(indexType, indexPtr, "index", block);
  327. const auto empty = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, index, ConstantInt::get(index->getType(), Handlers.size()), "empty", block);
  328. const auto next = BasicBlock::Create(context, "next", ctx.Func);
  329. const auto full = BasicBlock::Create(context, "full", ctx.Func);
  330. BranchInst::Create(next, full, empty, block);
  331. {
  332. block = next;
  333. const auto rest = BasicBlock::Create(context, "rest", ctx.Func);
  334. const auto pull = BasicBlock::Create(context, "pull", ctx.Func);
  335. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  336. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  337. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  338. const auto statusPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetStatus() }, "last", block);
  339. const auto last = new LoadInst(statusType, statusPtr, "last", block);
  340. result->addIncoming(GetFinish(context), block);
  341. const auto choise = SwitchInst::Create(last, pull, 2U, block);
  342. choise->addCase(ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Yield)), rest);
  343. choise->addCase(ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Finish)), exit);
  344. block = rest;
  345. new StoreInst(ConstantInt::get(last->getType(), static_cast<ui32>(NUdf::EFetchStatus::Ok)), statusPtr, block);
  346. result->addIncoming(GetYield(context), block);
  347. BranchInst::Create(exit, block);
  348. block = pull;
  349. const auto used = GetMemoryUsed(MemLimit, ctx, block);
  350. BranchInst::Create(loop, block);
  351. block = loop;
  352. const auto item = GetNodeValue(Flow, ctx, block);
  353. const auto finsh = IsFinish(item, block, context);
  354. const auto yield = IsYield(item, block, context);
  355. const auto special = BinaryOperator::CreateOr(finsh, yield, "special", block);
  356. const auto fin = SelectInst::Create(finsh, ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Finish)), ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), "fin", block);
  357. const auto save = SelectInst::Create(yield, ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Yield)), fin, "save", block);
  358. new StoreInst(save, statusPtr, block);
  359. BranchInst::Create(done, good, special, block);
  360. block = good;
  361. const auto addFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TFlowState::Add));
  362. const auto addArg = item;
  363. const auto addType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), addArg->getType()}, false);
  364. const auto addPtr = CastInst::Create(Instruction::IntToPtr, addFunc, PointerType::getUnqual(addType), "add", block);
  365. CallInst::Create(addType, addPtr, {stateArg, addArg}, "", block);
  366. const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, used, ctx, block);
  367. BranchInst::Create(done, loop, check, block);
  368. block = done;
  369. new StoreInst(ConstantInt::get(indexType, 0), indexPtr, block);
  370. const auto stat = ctx.GetStat();
  371. const auto statFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TFlowState::PushStat));
  372. const auto statType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), stat->getType()}, false);
  373. const auto statPtr = CastInst::Create(Instruction::IntToPtr, statFunc, PointerType::getUnqual(statType), "stat", block);
  374. CallInst::Create(statType, statPtr, {stateArg, stat}, "", block);
  375. BranchInst::Create(more, block);
  376. }
  377. {
  378. block = full;
  379. const auto stub = BasicBlock::Create(context, "stub", ctx.Func);
  380. const auto next = BasicBlock::Create(context, "next", ctx.Func);
  381. const auto drop = BasicBlock::Create(context, "drop", ctx.Func);
  382. new UnreachableInst(context, stub);
  383. const auto choise = SwitchInst::Create(index, stub, Handlers.size(), block);
  384. for (ui32 i = 0U; i < Handlers.size(); ++i) {
  385. const auto idx = ConstantInt::get(indexType, i);
  386. const auto var = BasicBlock::Create(context, (TString("var_") += ToString(i)).c_str(), ctx.Func);
  387. choise->addCase(idx, var);
  388. block = var;
  389. const auto output = GetNodeValue(Handlers[i].NewItem, ctx, block);
  390. if (const auto offset = Handlers[i].ResultVariantOffset) {
  391. const auto good = BasicBlock::Create(context, (TString("good_") += ToString(i)).c_str(), ctx.Func);
  392. BranchInst::Create(next, good, IsSpecial(output, block, context), block);
  393. block = good;
  394. const auto unpack = Handlers[i].IsOutputVariant ? GetVariantParts(output, ctx, block) : std::make_pair(ConstantInt::get(indexType, 0), output);
  395. const auto reindex = BinaryOperator::CreateAdd(unpack.first, ConstantInt::get(indexType, *offset), "reindex", block);
  396. const auto variant = MakeVariant(unpack.second, reindex, ctx, block);
  397. result->addIncoming(variant, block);
  398. BranchInst::Create(exit, block);
  399. } else {
  400. result->addIncoming(output, block);
  401. BranchInst::Create(next, exit, IsSpecial(output, block, context), block);
  402. }
  403. }
  404. block = next;
  405. const auto posPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetPosition() }, "pos_ptr", block);
  406. new StoreInst(ConstantInt::get(indexType, 0), posPtr, block);
  407. const auto plus = BinaryOperator::CreateAdd(index, ConstantInt::get(index->getType(), 1), "plus", block);
  408. new StoreInst(plus, indexPtr, block);
  409. const auto flush = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, plus, ConstantInt::get(plus->getType(), Handlers.size()), "flush", block);
  410. BranchInst::Create(drop, more, flush, block);
  411. block = drop;
  412. const auto clearFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TFlowState::Clear));
  413. const auto clearType = FunctionType::get(Type::getInt1Ty(context), {stateArg->getType()}, false);
  414. const auto clearPtr = CastInst::Create(Instruction::IntToPtr, clearFunc, PointerType::getUnqual(clearType), "clear", block);
  415. CallInst::Create(clearType, clearPtr, {stateArg}, "", block);
  416. BranchInst::Create(more, block);
  417. block = exit;
  418. return result;
  419. }
  420. }
  421. #endif
  422. private:
  423. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  424. state = ctx.HolderFactory.Create<TFlowState>(ctx.HolderFactory.GetPagePool(), Handlers.size());
  425. }
  426. void RegisterDependencies() const final {
  427. if (const auto flow = this->FlowDependsOn(Flow)) {
  428. for (const auto& x : Handlers) {
  429. this->Own(flow, x.Item);
  430. this->DependsOn(flow, x.NewItem);
  431. }
  432. }
  433. }
  434. IComputationNode *const Flow;
  435. const ui64 MemLimit;
  436. const TSwitchHandlersList Handlers;
  437. };
  438. template <bool IsInputVariant, bool TrackRss>
  439. class TSwitchWrapper : public TCustomValueCodegeneratorNode<TSwitchWrapper<IsInputVariant, TrackRss>> {
  440. typedef TCustomValueCodegeneratorNode<TSwitchWrapper<IsInputVariant, TrackRss>> TBaseComputation;
  441. private:
  442. class TChildStream : public TComputationValue<TChildStream> {
  443. public:
  444. using TBase = TComputationValue<TChildStream>;
  445. TChildStream(TMemoryUsageInfo* memInfo, const TSwitchHandler& handler,
  446. TComputationContext& ctx, const TPagedUnboxedValueList* buffer)
  447. : TBase(memInfo)
  448. , Handler(handler)
  449. , Ctx(ctx)
  450. , Buffer(buffer)
  451. {}
  452. void Reset(bool isFinished) {
  453. BufferIndex = InputIndex = 0U;
  454. IsFinished = isFinished;
  455. }
  456. private:
  457. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  458. for (;;) {
  459. if (BufferIndex == Buffer->Size()) {
  460. return IsFinished ? NUdf::EFetchStatus::Finish : NUdf::EFetchStatus::Yield;
  461. }
  462. auto current = (*Buffer)[BufferIndex];
  463. ui32 streamIndex = 0;
  464. if constexpr (IsInputVariant) {
  465. streamIndex = current.GetVariantIndex();
  466. current = current.Release().GetVariantItem();
  467. }
  468. for (; InputIndex < Handler.InputIndices.size(); ++InputIndex) {
  469. if (Handler.InputIndices[InputIndex] == streamIndex) {
  470. if (Handler.InputIndices.size() > 1) {
  471. current = Ctx.HolderFactory.CreateVariantHolder(current.Release(), InputIndex);
  472. }
  473. result = std::move(current);
  474. ++InputIndex;
  475. return NUdf::EFetchStatus::Ok;
  476. }
  477. }
  478. InputIndex = 0;
  479. ++BufferIndex;
  480. }
  481. }
  482. const TSwitchHandler Handler;
  483. TComputationContext& Ctx;
  484. const TPagedUnboxedValueList* const Buffer;
  485. ui32 BufferIndex = 0U;
  486. ui32 InputIndex = 0U;
  487. bool IsFinished = false;
  488. };
  489. class TValueBase : public TState {
  490. public:
  491. void Add(NUdf::TUnboxedValue&& item) {
  492. Buffer.Add(std::move(item));
  493. }
  494. void Reset() {
  495. if (const auto size = Buffer.Size()) {
  496. MKQL_SET_MAX_STAT(Ctx.Stats, Switch_MaxRowsCount, static_cast<i64>(size));
  497. MKQL_INC_STAT(Ctx.Stats, Switch_FlushesCount);
  498. }
  499. ChildReadIndex = 0U;
  500. for (const auto& stream : ChildrenInStreams) {
  501. stream->Reset(NUdf::EFetchStatus::Finish == InputStatus);
  502. }
  503. }
  504. bool Get(NUdf::TUnboxedValue& result) {
  505. if (ChildrenOutStreams[ChildReadIndex].Fetch(result) == NUdf::EFetchStatus::Ok) {
  506. return true;
  507. }
  508. if (++ChildReadIndex == Handlers.size()) {
  509. Buffer.Clear();
  510. }
  511. return false;
  512. }
  513. protected:
  514. TValueBase(TMemoryUsageInfo* memInfo, const TSwitchHandlersList& handlers, TComputationContext& ctx)
  515. : TState(memInfo, handlers.size())
  516. , Handlers(handlers)
  517. , Buffer(ctx.HolderFactory.GetPagePool())
  518. , Ctx(ctx)
  519. {
  520. ChildrenInStreams.reserve(Handlers.size());
  521. ChildrenOutStreams.reserve(Handlers.size());
  522. for (const auto& handler : Handlers) {
  523. const auto stream = Ctx.HolderFactory.Create<TChildStream>(handler, Ctx, &Buffer);
  524. ChildrenInStreams.emplace_back(static_cast<TChildStream*>(stream.AsBoxed().Get()));
  525. handler.Item->SetValue(Ctx, stream);
  526. ChildrenOutStreams.emplace_back(handler.NewItem->GetValue(Ctx));
  527. }
  528. }
  529. const TSwitchHandlersList Handlers;
  530. TPagedUnboxedValueList Buffer;
  531. TComputationContext& Ctx;
  532. std::vector<NUdf::TRefCountedPtr<TChildStream>, TMKQLAllocator<NUdf::TRefCountedPtr<TChildStream>>> ChildrenInStreams;
  533. TUnboxedValueVector ChildrenOutStreams;
  534. };
  535. class TValue : public TValueBase {
  536. public:
  537. TValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream,
  538. const TSwitchHandlersList& handlers, ui64 memLimit, TComputationContext& ctx)
  539. : TValueBase(memInfo, handlers, ctx)
  540. , Stream(std::move(stream)), MemLimit(memLimit)
  541. {}
  542. private:
  543. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  544. for (;;) {
  545. if (this->ChildReadIndex == this->Handlers.size()) {
  546. switch (this->InputStatus) {
  547. case NUdf::EFetchStatus::Ok: break;
  548. case NUdf::EFetchStatus::Yield:
  549. this->InputStatus = NUdf::EFetchStatus::Ok;
  550. return NUdf::EFetchStatus::Yield;
  551. case NUdf::EFetchStatus::Finish:
  552. return NUdf::EFetchStatus::Finish;
  553. }
  554. const auto initUsage = this->MemLimit ? this->Ctx.HolderFactory.GetMemoryUsed() : 0ULL;
  555. do {
  556. NUdf::TUnboxedValue current;
  557. this->InputStatus = this->Stream.Fetch(current);
  558. if (NUdf::EFetchStatus::Ok != this->InputStatus) {
  559. break;
  560. }
  561. this->Add(std::move(current));
  562. } while (!this->Ctx.template CheckAdjustedMemLimit<TrackRss>(this->MemLimit, initUsage));
  563. this->Reset();
  564. }
  565. if (!this->Get(result)) {
  566. continue;
  567. }
  568. const auto& handler = this->Handlers[this->ChildReadIndex];
  569. if (const auto offset = handler.ResultVariantOffset) {
  570. ui32 localIndex = 0;
  571. if (handler.IsOutputVariant) {
  572. localIndex = result.GetVariantIndex();
  573. result = result.Release().GetVariantItem();
  574. }
  575. result = this->Ctx.HolderFactory.CreateVariantHolder(result.Release(), *offset + localIndex);
  576. }
  577. return NUdf::EFetchStatus::Ok;
  578. }
  579. }
  580. const NUdf::TUnboxedValue Stream;
  581. const ui64 MemLimit;
  582. };
  583. #ifndef MKQL_DISABLE_CODEGEN
  584. class TCodegenValue : public TStreamCodegenSelfStateValue<TValueBase> {
  585. public:
  586. using TFetchPtr = typename TStreamCodegenSelfStateValue<TValueBase>::TFetchPtr;
  587. TCodegenValue(TMemoryUsageInfo* memInfo, TFetchPtr fetch, TComputationContext* ctx, NUdf::TUnboxedValue&& stream, const TSwitchHandlersList& handlers)
  588. : TStreamCodegenSelfStateValue<TValueBase>(memInfo, fetch, ctx, std::move(stream), handlers, *ctx)
  589. {}
  590. };
  591. #endif
  592. public:
  593. TSwitchWrapper(TComputationMutables& mutables, IComputationNode* stream, ui64 memLimit, TSwitchHandlersList&& handlers)
  594. : TBaseComputation(mutables)
  595. , Stream(stream)
  596. , MemLimit(memLimit)
  597. , Handlers(std::move(handlers))
  598. {}
  599. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  600. #ifndef MKQL_DISABLE_CODEGEN
  601. if (ctx.ExecuteLLVM && Switch)
  602. return ctx.HolderFactory.Create<TCodegenValue>(Switch, &ctx, Stream->GetValue(ctx), Handlers);
  603. #endif
  604. return ctx.HolderFactory.Create<TValue>(Stream->GetValue(ctx), Handlers, MemLimit, ctx);
  605. }
  606. private:
  607. void RegisterDependencies() const final {
  608. this->DependsOn(Stream);
  609. for (const auto& handler : Handlers) {
  610. this->Own(handler.Item);
  611. this->DependsOn(handler.NewItem);
  612. }
  613. }
  614. #ifndef MKQL_DISABLE_CODEGEN
  615. class TLLVMFieldsStructureForValueBase: public TLLVMFieldsStructureForState {
  616. private:
  617. using TBase = TLLVMFieldsStructureForState;
  618. protected:
  619. using TBase::Context;
  620. public:
  621. std::vector<llvm::Type*> GetFieldsArray() {
  622. std::vector<llvm::Type*> result = TBase::GetFields();
  623. return result;
  624. }
  625. TLLVMFieldsStructureForValueBase(llvm::LLVMContext& context)
  626. : TBase(context) {
  627. }
  628. };
  629. void GenerateFunctions(NYql::NCodegen::ICodegen& codegen) final {
  630. SwitchFunc = GenerateSwitch(codegen);
  631. codegen.ExportSymbol(SwitchFunc);
  632. }
  633. void FinalizeFunctions(NYql::NCodegen::ICodegen& codegen) final {
  634. if (SwitchFunc)
  635. Switch = reinterpret_cast<TSwitchPtr>(codegen.GetPointerToFunction(SwitchFunc));
  636. }
  637. Function* GenerateSwitch(NYql::NCodegen::ICodegen& codegen) const {
  638. auto& module = codegen.GetModule();
  639. auto& context = codegen.GetContext();
  640. const auto& name = this->MakeName("Fetch");
  641. if (const auto f = module.getFunction(name.c_str()))
  642. return f;
  643. const auto valueType = Type::getInt128Ty(context);
  644. const auto ptrValueType = PointerType::getUnqual(valueType);
  645. const auto containerType = static_cast<Type*>(valueType);
  646. const auto contextType = GetCompContextType(context);
  647. const auto statusType = Type::getInt32Ty(context);
  648. const auto indexType = Type::getInt32Ty(context);
  649. TLLVMFieldsStructureForValueBase fieldsStruct(context);
  650. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  651. const auto statePtrType = PointerType::getUnqual(stateType);
  652. const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), containerType, statePtrType, ptrValueType}, false);
  653. TCodegenContext ctx(codegen);
  654. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  655. DISubprogramAnnotator annotator(ctx, ctx.Func);
  656. auto args = ctx.Func->arg_begin();
  657. ctx.Ctx = &*args;
  658. const auto containerArg = &*++args;
  659. const auto stateArg = &*++args;
  660. const auto valuePtr = &*++args;
  661. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  662. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  663. auto block = main;
  664. const auto indexPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetIndex() }, "index_ptr", block);
  665. const auto itemPtr = new AllocaInst(valueType, 0U, "item_ptr", block);
  666. new StoreInst(ConstantInt::get(valueType, 0), itemPtr, block);
  667. BranchInst::Create(more, block);
  668. block = more;
  669. const auto index = new LoadInst(indexType, indexPtr, "index", block);
  670. const auto empty = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, index, ConstantInt::get(index->getType(), Handlers.size()), "empty", block);
  671. const auto next = BasicBlock::Create(context, "next", ctx.Func);
  672. const auto full = BasicBlock::Create(context, "full", ctx.Func);
  673. BranchInst::Create(next, full, empty, block);
  674. {
  675. block = next;
  676. const auto rest = BasicBlock::Create(context, "rest", ctx.Func);
  677. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  678. const auto pull = BasicBlock::Create(context, "pull", ctx.Func);
  679. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  680. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  681. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  682. const auto statusPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { fieldsStruct.This(), fieldsStruct.GetStatus() }, "last", block);
  683. const auto last = new LoadInst(statusType, statusPtr, "last", block);
  684. const auto choise = SwitchInst::Create(last, pull, 2U, block);
  685. choise->addCase(ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Yield)), rest);
  686. choise->addCase(ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Finish)), exit);
  687. block = rest;
  688. new StoreInst(ConstantInt::get(last->getType(), static_cast<ui32>(NUdf::EFetchStatus::Ok)), statusPtr, block);
  689. BranchInst::Create(exit, block);
  690. block = exit;
  691. ReturnInst::Create(context, last, block);
  692. block = pull;
  693. const auto used = GetMemoryUsed(MemLimit, ctx, block);
  694. const auto stream = static_cast<Value*>(containerArg);
  695. BranchInst::Create(loop, block);
  696. block = loop;
  697. const auto fetch = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, stream, codegen, block, itemPtr);
  698. new StoreInst(fetch, statusPtr, block);
  699. const auto ok = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, fetch, ConstantInt::get(fetch->getType(), static_cast<ui32>(NUdf::EFetchStatus::Ok)), "ok", block);
  700. BranchInst::Create(good, done, ok, block);
  701. block = good;
  702. const auto addFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TValueBase::Add));
  703. const auto addType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), itemPtr->getType()}, false);
  704. const auto addPtr = CastInst::Create(Instruction::IntToPtr, addFunc, PointerType::getUnqual(addType), "add", block);
  705. CallInst::Create(addType, addPtr, {stateArg, itemPtr}, "", block);
  706. const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, used, ctx, block);
  707. BranchInst::Create(done, loop, check, block);
  708. block = done;
  709. const auto resetFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TValueBase::Reset));
  710. const auto resetType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType()}, false);
  711. const auto resetPtr = CastInst::Create(Instruction::IntToPtr, resetFunc, PointerType::getUnqual(resetType), "reset", block);
  712. CallInst::Create(resetType, resetPtr, {stateArg}, "", block);
  713. BranchInst::Create(more, block);
  714. }
  715. {
  716. block = full;
  717. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  718. const auto stub = BasicBlock::Create(context, "stub", ctx.Func);
  719. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  720. ReturnInst::Create(context, ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), exit);
  721. new UnreachableInst(context, stub);
  722. const auto nextFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TValueBase::Get));
  723. const auto nextType = FunctionType::get(Type::getInt1Ty(context), {stateArg->getType(), valuePtr->getType()}, false);
  724. const auto nextPtr = CastInst::Create(Instruction::IntToPtr, nextFunc, PointerType::getUnqual(nextType), "next", block);
  725. const auto has = CallInst::Create(nextType, nextPtr, {stateArg, valuePtr}, "has", block);
  726. BranchInst::Create(good, more, has, block);
  727. block = good;
  728. const auto choise = SwitchInst::Create(index, stub, Handlers.size(), block);
  729. for (ui32 i = 0U; i < Handlers.size(); ++i) {
  730. const auto idx = ConstantInt::get(indexType, i);
  731. if (const auto offset = Handlers[i].ResultVariantOffset) {
  732. const auto var = BasicBlock::Create(context, (TString("var_") += ToString(i)).c_str(), ctx.Func);
  733. choise->addCase(idx, var);
  734. block = var;
  735. const auto output = new LoadInst(valueType, valuePtr, "output", block);
  736. ValueRelease(Handlers[i].Kind, output, ctx, block);
  737. const auto unpack = Handlers[i].IsOutputVariant ? GetVariantParts(output, ctx, block) : std::make_pair(ConstantInt::get(indexType, 0), output);
  738. const auto reindex = BinaryOperator::CreateAdd(unpack.first, ConstantInt::get(indexType, *offset), "reindex", block);
  739. const auto variant = MakeVariant(unpack.second, reindex, ctx, block);
  740. new StoreInst(variant, valuePtr, block);
  741. ValueAddRef(EValueRepresentation::Any, variant, ctx, block);
  742. ReturnInst::Create(context, ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), block);
  743. } else {
  744. choise->addCase(idx, exit);
  745. }
  746. }
  747. }
  748. return ctx.Func;
  749. }
  750. using TSwitchPtr = typename TCodegenValue::TFetchPtr;
  751. Function* SwitchFunc = nullptr;
  752. TSwitchPtr Switch = nullptr;
  753. #endif
  754. IComputationNode *const Stream;
  755. const ui64 MemLimit;
  756. const TSwitchHandlersList Handlers;
  757. };
  758. }
  759. IComputationNode* WrapSwitch(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  760. MKQL_ENSURE(callable.GetInputsCount() >= 6, "Expected at least 6 args");
  761. MKQL_ENSURE((callable.GetInputsCount() - 2) % 4 == 0, "Corrupted arguments for Switch");
  762. TSwitchHandlersList handlers;
  763. handlers.reserve(callable.GetInputsCount() >> 2U);
  764. const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
  765. const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui64>();
  766. const auto type = callable.GetType()->GetReturnType();
  767. for (ui32 i = 2; i < callable.GetInputsCount(); i += 4) {
  768. TSwitchHandler handler;
  769. const auto tuple = AS_VALUE(TTupleLiteral, callable.GetInput(i));
  770. for (ui32 tupleIndex = 0; tupleIndex < tuple->GetValuesCount(); ++tupleIndex) {
  771. handler.InputIndices.emplace_back(AS_VALUE(TDataLiteral, tuple->GetValue(tupleIndex))->AsValue().Get<ui32>());
  772. }
  773. const auto itemType = type->IsFlow() ?
  774. AS_TYPE(TFlowType, callable.GetInput(i + 2))->GetItemType():
  775. AS_TYPE(TStreamType, callable.GetInput(i + 2))->GetItemType();
  776. handler.IsOutputVariant = itemType->IsVariant();
  777. handler.Kind = GetValueRepresentation(itemType);
  778. handler.NewItem = LocateNode(ctx.NodeLocator, callable, i + 2);
  779. handler.Item = LocateExternalNode(ctx.NodeLocator, callable, i + 1);
  780. const auto offsetNode = callable.GetInput(i + 3);
  781. if (!offsetNode.GetStaticType()->IsVoid()) {
  782. handler.ResultVariantOffset = AS_VALUE(TDataLiteral, offsetNode)->AsValue().Get<ui32>();
  783. }
  784. handlers.emplace_back(std::move(handler));
  785. }
  786. const bool trackRss = EGraphPerProcess::Single == ctx.GraphPerProcess;
  787. if (type->IsFlow()) {
  788. const bool isInputVariant = AS_TYPE(TFlowType, callable.GetInput(0))->GetItemType()->IsVariant();
  789. const auto kind = GetValueRepresentation(type);
  790. if (isInputVariant && trackRss) {
  791. return new TSwitchFlowWrapper<true, true>(ctx.Mutables, kind, stream, memLimit, std::move(handlers));
  792. } else if (isInputVariant) {
  793. return new TSwitchFlowWrapper<true, false>(ctx.Mutables, kind, stream, memLimit, std::move(handlers));
  794. } else if (trackRss) {
  795. return new TSwitchFlowWrapper<false, true>(ctx.Mutables, kind, stream, memLimit, std::move(handlers));
  796. } else {
  797. return new TSwitchFlowWrapper<false, false>(ctx.Mutables, kind, stream, memLimit, std::move(handlers));
  798. }
  799. } else if (type->IsStream()) {
  800. const bool isInputVariant = AS_TYPE(TStreamType, callable.GetInput(0))->GetItemType()->IsVariant();
  801. if (isInputVariant && trackRss) {
  802. return new TSwitchWrapper<true, true>(ctx.Mutables, stream, memLimit, std::move(handlers));
  803. } else if (isInputVariant) {
  804. return new TSwitchWrapper<true, false>(ctx.Mutables, stream, memLimit, std::move(handlers));
  805. } else if (trackRss) {
  806. return new TSwitchWrapper<false, true>(ctx.Mutables, stream, memLimit, std::move(handlers));
  807. } else {
  808. return new TSwitchWrapper<false, false>(ctx.Mutables, stream, memLimit, std::move(handlers));
  809. }
  810. }
  811. THROW yexception() << "Expected flow or stream.";
  812. }
  813. }
  814. }