mkql_discard.cpp 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. #include "mkql_discard.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_runtime_version.h>
  6. namespace NKikimr {
  7. namespace NMiniKQL {
  8. namespace {
  9. class TDiscardFlowWrapper : public TStatelessFlowCodegeneratorRootNode<TDiscardFlowWrapper> {
  10. typedef TStatelessFlowCodegeneratorRootNode<TDiscardFlowWrapper> TBaseComputation;
  11. public:
  12. TDiscardFlowWrapper(IComputationNode* flow)
  13. : TBaseComputation(flow, EValueRepresentation::Embedded), Flow(flow)
  14. {}
  15. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  16. while (true) {
  17. if (auto item = Flow->GetValue(ctx); item.IsSpecial())
  18. return item.Release();
  19. }
  20. }
  21. #ifndef MKQL_DISABLE_CODEGEN
  22. Value* DoGenerateGetValue(const TCodegenContext& ctx, BasicBlock*& block) const {
  23. auto& context = ctx.Codegen.GetContext();
  24. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  25. const auto skip = BasicBlock::Create(context, "skip", ctx.Func);
  26. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  27. BranchInst::Create(loop, block);
  28. block = loop;
  29. const auto item = GetNodeValue(Flow, ctx, block);
  30. BranchInst::Create(exit, skip, IsSpecial(item, block, context), block);
  31. block = skip;
  32. ValueCleanup(Flow->GetRepresentation(), item, ctx, block);
  33. BranchInst::Create(loop, block);
  34. block = exit;
  35. return item;
  36. }
  37. #endif
  38. private:
  39. void RegisterDependencies() const final {
  40. FlowDependsOn(Flow);
  41. }
  42. IComputationNode* const Flow;
  43. };
  44. class TDiscardWideFlowWrapper : public TStatelessFlowCodegeneratorRootNode<TDiscardWideFlowWrapper> {
  45. using TBaseComputation = TStatelessFlowCodegeneratorRootNode<TDiscardWideFlowWrapper>;
  46. public:
  47. TDiscardWideFlowWrapper(IComputationWideFlowNode* flow, ui32 size)
  48. : TBaseComputation(flow, EValueRepresentation::Embedded), Flow(flow), Stub(size, nullptr)
  49. {}
  50. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  51. while (true) {
  52. switch (Flow->FetchValues(ctx, Stub.data())) {
  53. case EFetchResult::Finish:
  54. return NUdf::TUnboxedValuePod::MakeFinish();
  55. case EFetchResult::Yield:
  56. return NUdf::TUnboxedValuePod::MakeYield();
  57. default:
  58. continue;
  59. }
  60. }
  61. }
  62. #ifndef MKQL_DISABLE_CODEGEN
  63. Value* DoGenerateGetValue(const TCodegenContext& ctx, BasicBlock*& block) const {
  64. auto& context = ctx.Codegen.GetContext();
  65. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  66. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  67. BranchInst::Create(loop, block);
  68. block = loop;
  69. const auto result = GetNodeValues(Flow, ctx, block).first;
  70. const auto good = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SGT, result, ConstantInt::get(result->getType(), 0), "good", block);
  71. BranchInst::Create(loop, exit, good, block);
  72. block = exit;
  73. const auto yield = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, result, ConstantInt::get(result->getType(), 0), "yield", block);
  74. const auto outres = SelectInst::Create(yield, GetYield(context), GetFinish(context), "outres", block);
  75. return outres;
  76. }
  77. #endif
  78. private:
  79. void RegisterDependencies() const final {
  80. FlowDependsOn(Flow);
  81. }
  82. IComputationWideFlowNode* const Flow;
  83. mutable std::vector<NUdf::TUnboxedValue*> Stub;
  84. };
  85. class TDiscardWrapper : public TCustomValueCodegeneratorNode<TDiscardWrapper> {
  86. typedef TCustomValueCodegeneratorNode<TDiscardWrapper> TBaseComputation;
  87. public:
  88. class TValue : public TComputationValue<TValue> {
  89. public:
  90. TValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream)
  91. : TComputationValue(memInfo)
  92. , Stream(std::move(stream))
  93. {
  94. }
  95. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue&) override {
  96. for (NUdf::TUnboxedValue item;;) {
  97. const auto status = Stream.Fetch(item);
  98. if (status != NUdf::EFetchStatus::Ok) {
  99. return status;
  100. }
  101. }
  102. }
  103. private:
  104. const NUdf::TUnboxedValue Stream;
  105. };
  106. TDiscardWrapper(TComputationMutables& mutables, IComputationNode* stream)
  107. : TBaseComputation(mutables)
  108. , Stream(stream)
  109. {
  110. }
  111. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  112. #ifndef MKQL_DISABLE_CODEGEN
  113. if (ctx.ExecuteLLVM && Fetch)
  114. return ctx.HolderFactory.Create<TStreamCodegenValueStateless>(Fetch, &ctx, Stream->GetValue(ctx));
  115. #endif
  116. return ctx.HolderFactory.Create<TValue>(Stream->GetValue(ctx));
  117. }
  118. private:
  119. void RegisterDependencies() const final {
  120. DependsOn(Stream);
  121. }
  122. #ifndef MKQL_DISABLE_CODEGEN
  123. void GenerateFunctions(NYql::NCodegen::ICodegen& codegen) final {
  124. FetchFunc = GenerateFetch(codegen);
  125. codegen.ExportSymbol(FetchFunc);
  126. }
  127. void FinalizeFunctions(NYql::NCodegen::ICodegen& codegen) final {
  128. if (FetchFunc)
  129. Fetch = reinterpret_cast<TFetchPtr>(codegen.GetPointerToFunction(FetchFunc));
  130. }
  131. Function* GenerateFetch(NYql::NCodegen::ICodegen& codegen) const {
  132. auto& module = codegen.GetModule();
  133. auto& context = codegen.GetContext();
  134. const auto& name = TBaseComputation::MakeName("Fetch");
  135. if (const auto f = module.getFunction(name.c_str()))
  136. return f;
  137. const auto valueType = Type::getInt128Ty(context);
  138. const auto containerType = static_cast<Type*>(valueType);
  139. const auto contextType = GetCompContextType(context);
  140. const auto statusType = Type::getInt32Ty(context);
  141. const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), containerType, PointerType::getUnqual(valueType)}, false);
  142. TCodegenContext ctx(codegen);
  143. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  144. DISubprogramAnnotator annotator(ctx, ctx.Func);
  145. auto args = ctx.Func->arg_begin();
  146. ctx.Ctx = &*args;
  147. const auto containerArg = &*++args;
  148. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  149. auto block = main;
  150. const auto container = static_cast<Value*>(containerArg);
  151. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  152. const auto stub = new AllocaInst(valueType, 0U, "stub", block);
  153. new StoreInst(ConstantInt::get(valueType, 0), stub, block);
  154. BranchInst::Create(loop, block);
  155. block = loop;
  156. const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, container, codegen, block, stub);
  157. const auto icmp = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_NE, status, ConstantInt::get(statusType, static_cast<ui32>(NUdf::EFetchStatus::Ok)), "cond", block);
  158. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  159. BranchInst::Create(done, loop, icmp, block);
  160. block = done;
  161. ReturnInst::Create(context, status, block);
  162. return ctx.Func;
  163. }
  164. using TFetchPtr = TStreamCodegenValueStateless::TFetchPtr;
  165. Function* FetchFunc = nullptr;
  166. TFetchPtr Fetch = nullptr;
  167. #endif
  168. IComputationNode* const Stream;
  169. };
  170. }
  171. IComputationNode* WrapDiscard(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  172. MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg");
  173. const auto type = callable.GetType()->GetReturnType();
  174. const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
  175. if (type->IsFlow()) {
  176. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
  177. auto flowType = AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType());
  178. if (RuntimeVersion > 35 && flowType->GetItemType()->IsMulti() || flowType->GetItemType()->IsTuple()) {
  179. return new TDiscardWideFlowWrapper(wide, GetWideComponentsCount(flowType));
  180. }
  181. return new TDiscardWideFlowWrapper(wide, 0U);
  182. } else {
  183. return new TDiscardFlowWrapper(flow);
  184. }
  185. } else if (type->IsStream()) {
  186. return new TDiscardWrapper(ctx.Mutables, flow);
  187. }
  188. THROW yexception() << "Expected flow or stream.";
  189. }
  190. }
  191. }