mkql_flow.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. #include "mkql_flow.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. namespace NKikimr {
  6. namespace NMiniKQL {
  7. namespace {
  8. template <bool IsStream>
  9. class TToFlowWrapper : public TFlowSourceCodegeneratorNode<TToFlowWrapper<IsStream>> {
  10. typedef TFlowSourceCodegeneratorNode<TToFlowWrapper<IsStream>> TBaseComputation;
  11. public:
  12. TToFlowWrapper(TComputationMutables& mutables, EValueRepresentation kind,IComputationNode* stream)
  13. : TBaseComputation(mutables, kind, EValueRepresentation::Any), Stream(stream)
  14. {}
  15. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& stream, TComputationContext& ctx) const {
  16. if (stream.IsInvalid()) {
  17. stream = IsStream ? Stream->GetValue(ctx) : Stream->GetValue(ctx).GetListIterator();
  18. }
  19. NUdf::TUnboxedValue next;
  20. if constexpr (IsStream) {
  21. switch (const auto state = stream.Fetch(next)) {
  22. case NUdf::EFetchStatus::Ok: return next.Release();
  23. case NUdf::EFetchStatus::Finish: return NUdf::TUnboxedValuePod::MakeFinish();
  24. case NUdf::EFetchStatus::Yield: return NUdf::TUnboxedValuePod::MakeYield();
  25. }
  26. } else {
  27. return stream.Next(next) ? next.Release() : NUdf::TUnboxedValuePod::MakeFinish();
  28. }
  29. }
  30. #ifndef MKQL_DISABLE_CODEGEN
  31. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  32. auto& context = ctx.Codegen.GetContext();
  33. const auto valueType = Type::getInt128Ty(context);
  34. const auto init = BasicBlock::Create(context, "init", ctx.Func);
  35. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  36. const auto load = new LoadInst(valueType, statePtr, "load", block);
  37. const auto state = PHINode::Create(load->getType(), 2U, "state", main);
  38. state->addIncoming(load, block);
  39. BranchInst::Create(init, main, IsInvalid(load, block, context), block);
  40. block = init;
  41. if constexpr (IsStream) {
  42. GetNodeValue(statePtr, Stream, ctx, block);
  43. } else {
  44. const auto list = GetNodeValue(Stream, ctx, block);
  45. CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::GetListIterator>(statePtr, list, ctx.Codegen, block);
  46. if (Stream->IsTemporaryValue())
  47. CleanupBoxed(list, ctx, block);
  48. }
  49. const auto save = new LoadInst(valueType, statePtr, "save", block);
  50. state->addIncoming(save, block);
  51. BranchInst::Create(main, block);
  52. block = main;
  53. const auto valuePtr = new AllocaInst(valueType, 0U, "value_ptr", &ctx.Func->getEntryBlock().back());
  54. new StoreInst(ConstantInt::get(valueType, 0), valuePtr, block);
  55. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  56. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  57. const auto result = PHINode::Create(valueType, 2U, "result", done);
  58. if constexpr (IsStream) {
  59. const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(Type::getInt32Ty(context), state, ctx.Codegen, block, valuePtr);
  60. const auto ok = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, status, ConstantInt::get(status->getType(), static_cast<ui32>(NUdf::EFetchStatus::Ok)), "ok", block);
  61. const auto none = BasicBlock::Create(context, "none", ctx.Func);
  62. BranchInst::Create(good, none, ok, block);
  63. block = none;
  64. const auto yield = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, status, ConstantInt::get(status->getType(), static_cast<ui32>(NUdf::EFetchStatus::Yield)), "yield", block);
  65. const auto special = SelectInst::Create(yield, GetYield(context), GetFinish(context), "special", block);
  66. result->addIncoming(special, block);
  67. BranchInst::Create(done, block);
  68. } else {
  69. const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Next>(Type::getInt1Ty(context), state, ctx.Codegen, block, valuePtr);
  70. result->addIncoming(GetFinish(context), block);
  71. BranchInst::Create(good, done, status, block);
  72. }
  73. block = good;
  74. const auto value = new LoadInst(valueType, valuePtr, "value", block);
  75. ValueRelease(static_cast<const IComputationNode*>(this)->GetRepresentation(), value, ctx, block);
  76. result->addIncoming(value, block);
  77. BranchInst::Create(done, block);
  78. block = done;
  79. return result;
  80. }
  81. #endif
  82. private:
  83. void RegisterDependencies() const final {
  84. this->DependsOn(Stream);
  85. }
  86. IComputationNode* const Stream;
  87. };
  88. template <bool IsItemOptional = true>
  89. class TOptToFlowWrapper : public TFlowSourceCodegeneratorNode<TOptToFlowWrapper<IsItemOptional>> {
  90. typedef TFlowSourceCodegeneratorNode<TOptToFlowWrapper<IsItemOptional>> TBaseComputation;
  91. public:
  92. TOptToFlowWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationNode* optional)
  93. : TBaseComputation(mutables, kind, EValueRepresentation::Embedded), Optional(optional)
  94. {}
  95. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  96. if (state.IsFinish()) {
  97. return state;
  98. }
  99. state = NUdf::TUnboxedValue::MakeFinish();
  100. if (auto value = Optional->GetValue(ctx)) {
  101. return value.Release().GetOptionalValueIf<IsItemOptional>();
  102. }
  103. return state;
  104. }
  105. #ifndef MKQL_DISABLE_CODEGEN
  106. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  107. auto& context = ctx.Codegen.GetContext();
  108. const auto valueType = Type::getInt128Ty(context);
  109. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  110. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  111. const auto load = new LoadInst(valueType, statePtr, "load", block);
  112. const auto result = PHINode::Create(valueType, 2U, "state", done);
  113. result->addIncoming(load, block);
  114. BranchInst::Create(done, main, IsFinish(load, block, context), block);
  115. block = main;
  116. const auto finish = GetFinish(context);
  117. new StoreInst(finish, statePtr, block);
  118. const auto optional = GetNodeValue(Optional, ctx, block);
  119. const auto value = IsItemOptional ? GetOptionalValue(context, optional, block) : optional;
  120. const auto output = SelectInst::Create(IsEmpty(optional, block, context), finish, value, "output", block);
  121. result->addIncoming(output, block);
  122. BranchInst::Create(done, block);
  123. block = done;
  124. return result;
  125. }
  126. #endif
  127. private:
  128. void RegisterDependencies() const final {
  129. this->DependsOn(Optional);
  130. }
  131. IComputationNode* const Optional;
  132. };
  133. class TFromFlowWrapper : public TCustomValueCodegeneratorNode<TFromFlowWrapper> {
  134. typedef TCustomValueCodegeneratorNode<TFromFlowWrapper> TBaseComputation;
  135. public:
  136. class TStreamValue : public TComputationValue<TStreamValue> {
  137. public:
  138. using TBase = TComputationValue<TStreamValue>;
  139. TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, IComputationNode* flow)
  140. : TBase(memInfo), CompCtx(compCtx), Flow(flow)
  141. {}
  142. private:
  143. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  144. result = Flow->GetValue(CompCtx);
  145. if (result.IsFinish())
  146. return NUdf::EFetchStatus::Finish;
  147. if (result.IsYield())
  148. return NUdf::EFetchStatus::Yield;
  149. return NUdf::EFetchStatus::Ok;
  150. }
  151. TComputationContext& CompCtx;
  152. IComputationNode* const Flow;
  153. };
  154. class TStreamCodegenValue : public TComputationValue<TStreamCodegenValue> {
  155. public:
  156. using TBase = TComputationValue<TStreamCodegenValue>;
  157. using TFetchPtr = NUdf::EFetchStatus (*)(TComputationContext*, NUdf::TUnboxedValuePod&);
  158. TStreamCodegenValue(TMemoryUsageInfo* memInfo, TFetchPtr fetch, TComputationContext* ctx)
  159. : TBase(memInfo), FetchFunc(fetch), Ctx(ctx)
  160. {}
  161. protected:
  162. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  163. return FetchFunc(Ctx, result);
  164. }
  165. const TFetchPtr FetchFunc;
  166. TComputationContext* const Ctx;
  167. };
  168. TFromFlowWrapper(TComputationMutables& mutables, IComputationNode* flow)
  169. : TBaseComputation(mutables), Flow(flow)
  170. {}
  171. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  172. #ifndef MKQL_DISABLE_CODEGEN
  173. if (ctx.ExecuteLLVM && Fetch)
  174. return ctx.HolderFactory.Create<TStreamCodegenValue>(Fetch, &ctx);
  175. #endif
  176. return ctx.HolderFactory.Create<TStreamValue>(ctx, Flow);
  177. }
  178. private:
  179. void RegisterDependencies() const final {
  180. this->DependsOn(Flow);
  181. }
  182. #ifndef MKQL_DISABLE_CODEGEN
  183. void GenerateFunctions(NYql::NCodegen::ICodegen& codegen) final {
  184. FetchFunc = GenerateFetcher(codegen);
  185. codegen.ExportSymbol(FetchFunc);
  186. }
  187. void FinalizeFunctions(NYql::NCodegen::ICodegen& codegen) final {
  188. if (FetchFunc)
  189. Fetch = reinterpret_cast<TStreamCodegenValue::TFetchPtr>(codegen.GetPointerToFunction(FetchFunc));
  190. }
  191. Function* GenerateFetcher(NYql::NCodegen::ICodegen& codegen) const {
  192. auto& module = codegen.GetModule();
  193. auto& context = codegen.GetContext();
  194. const auto& name = TBaseComputation::MakeName("Fetch");
  195. if (const auto f = module.getFunction(name.c_str()))
  196. return f;
  197. const auto valueType = Type::getInt128Ty(context);
  198. const auto contextType = GetCompContextType(context);
  199. const auto statusType = Type::getInt32Ty(context);
  200. const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), PointerType::getUnqual(valueType)}, false);
  201. TCodegenContext ctx(codegen);
  202. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  203. DISubprogramAnnotator annotator(ctx, ctx.Func);
  204. auto args = ctx.Func->arg_begin();
  205. ctx.Ctx = &*args;
  206. const auto valuePtr = &*++args;
  207. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  208. auto block = main;
  209. SafeUnRefUnboxedOne(valuePtr, ctx, block);
  210. GetNodeValue(valuePtr, Flow, ctx, block);
  211. const auto value = new LoadInst(valueType, valuePtr, "value", block);
  212. const auto second = SelectInst::Create(IsYield(value, block, context), ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Yield)), ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), "second", block);
  213. const auto first = SelectInst::Create(IsFinish(value, block, context), ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Finish)), second, "second", block);
  214. ReturnInst::Create(context, first, block);
  215. return ctx.Func;
  216. }
  217. Function* FetchFunc = nullptr;
  218. TStreamCodegenValue::TFetchPtr Fetch = nullptr;
  219. #endif
  220. IComputationNode* const Flow;
  221. };
  222. class TToWideFlowWrapper : public TWideFlowSourceCodegeneratorNode<TToWideFlowWrapper> {
  223. using TBaseComputation = TWideFlowSourceCodegeneratorNode<TToWideFlowWrapper>;
  224. public:
  225. TToWideFlowWrapper(TComputationMutables& mutables, IComputationNode* stream, ui32 width)
  226. : TBaseComputation(mutables, EValueRepresentation::Any)
  227. , Stream(stream)
  228. , Width(width)
  229. , TempStateIndex(std::exchange(mutables.CurValueIndex, mutables.CurValueIndex + Width))
  230. {}
  231. EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
  232. if (state.IsInvalid()) {
  233. state = Stream->GetValue(ctx);
  234. }
  235. switch (const auto status = state.WideFetch(ctx.MutableValues.get() + TempStateIndex, Width)) {
  236. case NUdf::EFetchStatus::Finish:
  237. return EFetchResult::Finish;
  238. case NUdf::EFetchStatus::Yield:
  239. return EFetchResult::Yield;
  240. case NUdf::EFetchStatus::Ok:
  241. break;
  242. }
  243. for (auto i = 0U; i < Width; ++i) {
  244. if (const auto out = output[i]) {
  245. *out = std::move(ctx.MutableValues[TempStateIndex + i]);
  246. }
  247. }
  248. return EFetchResult::One;
  249. }
  250. #ifndef MKQL_DISABLE_CODEGEN
  251. TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  252. auto& context = ctx.Codegen.GetContext();
  253. const auto valueType = Type::getInt128Ty(context);
  254. const auto indexType = Type::getInt32Ty(context);
  255. const auto values = GetElementPtrInst::CreateInBounds(valueType, ctx.GetMutables(), {ConstantInt::get(indexType, TempStateIndex)}, "values", &ctx.Func->getEntryBlock().back());
  256. const auto init = BasicBlock::Create(context, "init", ctx.Func);
  257. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  258. const auto load = new LoadInst(valueType, statePtr, "load", block);
  259. const auto state = PHINode::Create(load->getType(), 2U, "state", main);
  260. state->addIncoming(load, block);
  261. BranchInst::Create(init, main, IsInvalid(load, block, context), block);
  262. block = init;
  263. GetNodeValue(statePtr, Stream, ctx, block);
  264. const auto save = new LoadInst(valueType, statePtr, "save", block);
  265. state->addIncoming(save, block);
  266. BranchInst::Create(main, block);
  267. block = main;
  268. const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::WideFetch>(indexType, state, ctx.Codegen, block, values, ConstantInt::get(indexType, Width));
  269. const auto ok = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, status, ConstantInt::get(indexType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), "ok", block);
  270. const auto yield = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, status, ConstantInt::get(indexType, static_cast<ui32>(NUdf::EFetchStatus::Yield)), "yield", block);
  271. const auto special = SelectInst::Create(yield, ConstantInt::get(indexType, static_cast<i32>(EFetchResult::Yield)), ConstantInt::get(indexType, static_cast<i32>(EFetchResult::Finish)), "special", block);
  272. const auto complete = SelectInst::Create(ok, ConstantInt::get(indexType, static_cast<i32>(EFetchResult::One)), special, "complete", block);
  273. TGettersList getters(Width);
  274. for (auto i = 0U; i < getters.size(); ++i) {
  275. getters[i] = [idx = TempStateIndex + i, valueType, indexType](const TCodegenContext& ctx, BasicBlock*& block) {
  276. const auto valuePtr = GetElementPtrInst::CreateInBounds(valueType, ctx.GetMutables(), {ConstantInt::get(indexType, idx)}, (TString("ptr_") += ToString(idx)).c_str(), block);
  277. return new LoadInst(valueType, valuePtr, (TString("val_") += ToString(idx)).c_str(), block);
  278. };
  279. };
  280. return {complete, std::move(getters)};
  281. }
  282. #endif
  283. private:
  284. void RegisterDependencies() const final {
  285. this->DependsOn(Stream);
  286. }
  287. IComputationNode* const Stream;
  288. const ui32 Width;
  289. const ui32 TempStateIndex;
  290. };
  291. class TFromWideFlowWrapper : public TCustomValueCodegeneratorNode<TFromWideFlowWrapper> {
  292. using TBaseComputation = TCustomValueCodegeneratorNode<TFromWideFlowWrapper>;
  293. public:
  294. class TStreamValue : public TComputationValue<TStreamValue> {
  295. public:
  296. using TBase = TComputationValue<TStreamValue>;
  297. TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, IComputationWideFlowNode* wideFlow, ui32 width, ui32 stubsIndex)
  298. : TBase(memInfo)
  299. , CompCtx(compCtx)
  300. , WideFlow(wideFlow)
  301. , Width(width)
  302. , StubsIndex(stubsIndex)
  303. , ClientBuffer(nullptr)
  304. {}
  305. private:
  306. NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final {
  307. if (width != Width)
  308. Throw(width, Width);
  309. const auto valuePtrs = CompCtx.WideFields.data() + StubsIndex;
  310. if (result != ClientBuffer) {
  311. for (ui32 i = 0; i < width; ++i) {
  312. valuePtrs[i] = result + i;
  313. }
  314. ClientBuffer = result;
  315. }
  316. switch (const auto status = WideFlow->FetchValues(CompCtx, valuePtrs)) {
  317. case EFetchResult::Finish:
  318. return NUdf::EFetchStatus::Finish;
  319. case EFetchResult::Yield:
  320. return NUdf::EFetchStatus::Yield;
  321. case EFetchResult::One:
  322. return NUdf::EFetchStatus::Ok;
  323. }
  324. }
  325. TComputationContext& CompCtx;
  326. IComputationWideFlowNode* const WideFlow;
  327. const ui32 Width;
  328. const ui32 StubsIndex;
  329. const NUdf::TUnboxedValue* ClientBuffer;
  330. };
  331. class TStreamCodegenValue : public TComputationValue<TStreamCodegenValue> {
  332. public:
  333. using TBase = TComputationValue<TStreamCodegenValue>;
  334. using TWideFetchPtr = NUdf::EFetchStatus (*)(TComputationContext*, NUdf::TUnboxedValuePod*, ui32);
  335. TStreamCodegenValue(TMemoryUsageInfo* memInfo, TWideFetchPtr fetch, TComputationContext* ctx)
  336. : TBase(memInfo), WideFetchFunc(fetch), Ctx(ctx)
  337. {}
  338. private:
  339. NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final {
  340. return WideFetchFunc(Ctx, result, width);
  341. }
  342. const TWideFetchPtr WideFetchFunc;
  343. TComputationContext* const Ctx;
  344. };
  345. TFromWideFlowWrapper(TComputationMutables& mutables, IComputationWideFlowNode* wideFlow, std::vector<EValueRepresentation>&& representations)
  346. : TBaseComputation(mutables)
  347. , WideFlow(wideFlow)
  348. , Representations(std::move(representations))
  349. , StubsIndex(mutables.IncrementWideFieldsIndex(Representations.size()))
  350. {}
  351. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  352. #ifndef MKQL_DISABLE_CODEGEN
  353. if (ctx.ExecuteLLVM && WideFetch)
  354. return ctx.HolderFactory.Create<TStreamCodegenValue>(WideFetch, &ctx);
  355. #endif
  356. return ctx.HolderFactory.Create<TStreamValue>(ctx, WideFlow, Representations.size(), StubsIndex);
  357. }
  358. private:
  359. void RegisterDependencies() const final {
  360. this->DependsOn(WideFlow);
  361. }
  362. [[noreturn]] static void Throw(ui32 requested, ui32 expected) {
  363. TStringBuilder res;
  364. res << "Requested " << requested << " fields, but expected " << expected;
  365. UdfTerminate(res.data());
  366. }
  367. #ifndef MKQL_DISABLE_CODEGEN
  368. void GenerateFunctions(NYql::NCodegen::ICodegen& codegen) final {
  369. WideFetchFunc = GenerateFetcher(codegen);
  370. codegen.ExportSymbol(WideFetchFunc);
  371. }
  372. void FinalizeFunctions(NYql::NCodegen::ICodegen& codegen) final {
  373. if (WideFetchFunc)
  374. WideFetch = reinterpret_cast<TStreamCodegenValue::TWideFetchPtr>(codegen.GetPointerToFunction(WideFetchFunc));
  375. }
  376. Function* GenerateFetcher(NYql::NCodegen::ICodegen& codegen) const {
  377. auto& module = codegen.GetModule();
  378. auto& context = codegen.GetContext();
  379. const auto& name = TBaseComputation::MakeName("WideFetch");
  380. if (const auto f = module.getFunction(name.c_str()))
  381. return f;
  382. const auto valueType = Type::getInt128Ty(context);
  383. const auto contextType = GetCompContextType(context);
  384. const auto statusType = Type::getInt32Ty(context);
  385. const auto indexType = Type::getInt32Ty(context);
  386. const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), PointerType::getUnqual(valueType), indexType}, false);
  387. TCodegenContext ctx(codegen);
  388. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  389. DISubprogramAnnotator annotator(ctx, ctx.Func);
  390. auto args = ctx.Func->arg_begin();
  391. ctx.Ctx = &*args;
  392. const auto valuesPtr = &*++args;
  393. const auto width = &*++args;
  394. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  395. const auto work = BasicBlock::Create(context, "work", ctx.Func);
  396. const auto fail = BasicBlock::Create(context, "fail", ctx.Func);
  397. auto block = main;
  398. const auto check = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, width, ConstantInt::get(width->getType(), Representations.size()), "check", block);
  399. BranchInst::Create(work, fail, check, block);
  400. block = work;
  401. std::vector<Value*> pointers(Representations.size());
  402. for (auto i = 0U; i < pointers.size(); ++i) {
  403. pointers[i] = GetElementPtrInst::CreateInBounds(valueType, valuesPtr, {ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), block);
  404. SafeUnRefUnboxedOne(pointers[i], ctx, block);
  405. }
  406. const auto getres = GetNodeValues(WideFlow, ctx, block);
  407. const auto yield = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, getres.first, ConstantInt::get(indexType, static_cast<i32>(EFetchResult::Yield)), "yield", block);
  408. const auto special = SelectInst::Create(yield, ConstantInt::get(indexType, static_cast<ui32>(NUdf::EFetchStatus::Yield)), ConstantInt::get(indexType, static_cast<ui32>(NUdf::EFetchStatus::Finish)), "special", block);
  409. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  410. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  411. const auto result = PHINode::Create(statusType, 2U, "result", done);
  412. result->addIncoming(special, block);
  413. const auto row = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, getres.first, ConstantInt::get(indexType, static_cast<i32>(EFetchResult::One)), "row", block);
  414. BranchInst::Create(good, done, row, block);
  415. block = good;
  416. for (auto i = 0U; i < pointers.size(); ++i) {
  417. auto value = getres.second[i](ctx, block);
  418. ValueAddRef(Representations[i], value, ctx, block);
  419. new StoreInst(value, pointers[i], block);
  420. }
  421. result->addIncoming(ConstantInt::get(indexType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), block);
  422. BranchInst::Create(done, block);
  423. block = done;
  424. ReturnInst::Create(context, result, block);
  425. block = fail;
  426. const auto throwFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TFromWideFlowWrapper::Throw));
  427. const auto throwFuncType = FunctionType::get(Type::getVoidTy(context), { indexType, indexType }, false);
  428. const auto throwFuncPtr = CastInst::Create(Instruction::IntToPtr, throwFunc, PointerType::getUnqual(throwFuncType), "thrower", block);
  429. CallInst::Create(throwFuncType, throwFuncPtr, { width, ConstantInt::get(width->getType(), Representations.size()) }, "", block)->setTailCall();
  430. new UnreachableInst(context, block);
  431. return ctx.Func;
  432. }
  433. Function* WideFetchFunc = nullptr;
  434. TStreamCodegenValue::TWideFetchPtr WideFetch = nullptr;
  435. #endif
  436. IComputationWideFlowNode* const WideFlow;
  437. const std::vector<EValueRepresentation> Representations;
  438. const ui32 StubsIndex;
  439. };
  440. } // namespace
  441. IComputationNode* WrapToFlow(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  442. MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
  443. const auto type = callable.GetInput(0).GetStaticType();
  444. const auto outType = AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType();
  445. const auto kind = GetValueRepresentation(outType);
  446. if (type->IsStream()) {
  447. if (const auto streamType = AS_TYPE(TStreamType, type); streamType->GetItemType()->IsMulti()) {
  448. const auto multiType = AS_TYPE(TMultiType, streamType->GetItemType());
  449. return new TToWideFlowWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0), multiType->GetElementsCount());
  450. }
  451. return new TToFlowWrapper<true>(ctx.Mutables, kind, LocateNode(ctx.NodeLocator, callable, 0));
  452. } else if (type->IsList()) {
  453. return new TToFlowWrapper<false>(ctx.Mutables, kind, LocateNode(ctx.NodeLocator, callable, 0));
  454. } else if (type->IsOptional()) {
  455. if (outType->IsOptional()) {
  456. return new TOptToFlowWrapper<true>(ctx.Mutables, kind, LocateNode(ctx.NodeLocator, callable, 0));
  457. } else {
  458. return new TOptToFlowWrapper<false>(ctx.Mutables, kind, LocateNode(ctx.NodeLocator, callable, 0));
  459. }
  460. }
  461. THROW yexception() << "Expected optional, list or stream.";
  462. }
  463. IComputationNode* WrapFromFlow(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  464. MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
  465. const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
  466. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
  467. const auto multiType = AS_TYPE(TMultiType, AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType())->GetItemType());
  468. std::vector<EValueRepresentation> outputRepresentations(multiType->GetElementsCount());
  469. for (auto i = 0U; i < outputRepresentations.size(); ++i)
  470. outputRepresentations[i] = GetValueRepresentation(multiType->GetElementType(i));
  471. return new TFromWideFlowWrapper(ctx.Mutables, wide, std::move(outputRepresentations));
  472. }
  473. return new TFromFlowWrapper(ctx.Mutables, flow);
  474. }
  475. }
  476. }