mkql_chopper.cpp 33 KB


  1. #include "mkql_chopper.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  3. #include <yql/essentials/minikql/mkql_node_cast.h>
  4. namespace NKikimr {
  5. namespace NMiniKQL {
  6. namespace {
  7. class TChopperFlowWrapper : public TStatefulFlowCodegeneratorNode<TChopperFlowWrapper> {
  8. typedef TStatefulFlowCodegeneratorNode<TChopperFlowWrapper> TBaseComputation;
  9. public:
  10. enum class EState : ui64 {
  11. Work,
  12. Chop,
  13. Next,
  14. Skip
  15. };
  16. TChopperFlowWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationNode* flow, IComputationExternalNode* itemArg, IComputationNode* key, IComputationExternalNode* keyArg, IComputationNode* chop, IComputationExternalNode* input, IComputationNode* output)
  17. : TBaseComputation(mutables, flow, kind, EValueRepresentation::Any)
  18. , Flow(flow)
  19. , ItemArg(itemArg)
  20. , Key(key)
  21. , KeyArg(keyArg)
  22. , Chop(chop)
  23. , Input(input)
  24. , Output(output)
  25. {
  26. Input->SetGetter(std::bind(&TChopperFlowWrapper::Getter, this, std::bind(&TChopperFlowWrapper::RefState, this, std::placeholders::_1), std::placeholders::_1));
  27. #ifndef MKQL_DISABLE_CODEGEN
  28. const auto codegenInput = dynamic_cast<ICodegeneratorExternalNode*>(Input);
  29. MKQL_ENSURE(codegenInput, "Input arg must be codegenerator node.");
  30. codegenInput->SetValueGetterBuilder([this](const TCodegenContext& ctx) {
  31. return GenerateHandler(ctx.Codegen);
  32. });
  33. #endif
  34. }
  35. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  36. if (state.IsInvalid()) {
  37. if (auto item = Flow->GetValue(ctx); item.IsSpecial()) {
  38. return item.Release();
  39. } else {
  40. state = NUdf::TUnboxedValuePod(ui64(EState::Next));
  41. ItemArg->SetValue(ctx, std::move(item));
  42. KeyArg->SetValue(ctx, Key->GetValue(ctx));
  43. }
  44. } else if (EState::Skip == EState(state.Get<ui64>())) {
  45. do {
  46. if (auto next = Flow->GetValue(ctx); next.IsSpecial())
  47. return next.Release();
  48. else
  49. ItemArg->SetValue(ctx, std::move(next));
  50. } while (!Chop->GetValue(ctx).Get<bool>());
  51. KeyArg->SetValue(ctx, Key->GetValue(ctx));
  52. state = NUdf::TUnboxedValuePod(ui64(EState::Next));
  53. }
  54. while (true) {
  55. auto output = Output->GetValue(ctx);
  56. if (output.IsFinish()) {
  57. Input->InvalidateValue(ctx);
  58. switch (EState(state.Get<ui64>())) {
  59. case EState::Work:
  60. case EState::Next:
  61. do {
  62. if (auto next = Flow->GetValue(ctx); next.IsSpecial()) {
  63. if (next.IsYield()) {
  64. state = NUdf::TUnboxedValuePod(ui64(EState::Skip));
  65. }
  66. return next.Release();
  67. } else {
  68. ItemArg->SetValue(ctx, std::move(next));
  69. }
  70. } while (!Chop->GetValue(ctx).Get<bool>());
  71. case EState::Chop:
  72. KeyArg->SetValue(ctx, Key->GetValue(ctx));
  73. state = NUdf::TUnboxedValuePod(ui64(EState::Next));
  74. default:
  75. continue;
  76. }
  77. }
  78. return output.Release();
  79. }
  80. }
  81. NUdf::TUnboxedValuePod Getter(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  82. if (EState::Next == EState(state.Get<ui64>())) {
  83. state = NUdf::TUnboxedValuePod(ui64(EState::Work));
  84. return ItemArg->GetValue(ctx).Release();
  85. }
  86. auto item = Flow->GetValue(ctx);
  87. if (!item.IsSpecial()) {
  88. ItemArg->SetValue(ctx, NUdf::TUnboxedValue(item));
  89. if (Chop->GetValue(ctx).Get<bool>()) {
  90. state = NUdf::TUnboxedValuePod(ui64(EState::Chop));
  91. return NUdf::TUnboxedValuePod::MakeFinish();
  92. }
  93. }
  94. return item.Release();
  95. }
  96. #ifndef MKQL_DISABLE_CODEGEN
  97. private:
  98. Function* GenerateHandler(NYql::NCodegen::ICodegen& codegen) const {
  99. auto& module = codegen.GetModule();
  100. auto& context = codegen.GetContext();
  101. TStringStream out;
  102. out << this->DebugString() << "::Handler_(" << static_cast<const void*>(this) << ").";
  103. const auto& name = out.Str();
  104. if (const auto f = module.getFunction(name.c_str()))
  105. return f;
  106. const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(ItemArg);
  107. const auto codegenKeyArg = dynamic_cast<ICodegeneratorExternalNode*>(KeyArg);
  108. MKQL_ENSURE(codegenItemArg, "Item arg must be codegenerator node.");
  109. MKQL_ENSURE(codegenKeyArg, "Key arg must be codegenerator node.");
  110. const auto valueType = Type::getInt128Ty(context);
  111. const auto funcType = FunctionType::get(valueType, {PointerType::getUnqual(GetCompContextType(context))}, false);
  112. TCodegenContext ctx(codegen);
  113. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  114. DISubprogramAnnotator annotator(ctx, ctx.Func);
  115. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  116. ctx.Ctx = &*ctx.Func->arg_begin();
  117. ctx.Ctx->addAttr(Attribute::NonNull);
  118. auto block = main;
  119. const auto load = BasicBlock::Create(context, "load", ctx.Func);
  120. const auto work = BasicBlock::Create(context, "work", ctx.Func);
  121. const auto statePtr = GetElementPtrInst::CreateInBounds(valueType, ctx.GetMutables(), {ConstantInt::get(Type::getInt32Ty(context), static_cast<const IComputationNode*>(this)->GetIndex())}, "state_ptr", block);
  122. const auto entry = new LoadInst(valueType, statePtr, "entry", block);
  123. const auto next = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, entry, GetConstant(ui64(EState::Next), context), "next", block);
  124. BranchInst::Create(load, work, next, block);
  125. {
  126. block = load;
  127. new StoreInst(GetConstant(ui64(EState::Work), context), statePtr, block);
  128. const auto item = GetNodeValue(ItemArg, ctx, block);
  129. ReturnInst::Create(context, item, block);
  130. }
  131. {
  132. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  133. const auto step = BasicBlock::Create(context, "step", ctx.Func);
  134. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  135. block = work;
  136. const auto item = GetNodeValue(Flow, ctx, block);
  137. BranchInst::Create(exit, good, IsSpecial(item, block, context), block);
  138. block = good;
  139. codegenItemArg->CreateSetValue(ctx, block, item);
  140. const auto chop = GetNodeValue(Chop, ctx, block);
  141. const auto cast = CastInst::Create(Instruction::Trunc, chop, Type::getInt1Ty(context), "bool", block);
  142. BranchInst::Create(step, exit, cast, block);
  143. block = step;
  144. new StoreInst(GetConstant(ui64(EState::Chop), context), statePtr, block);
  145. ReturnInst::Create(context, GetFinish(context), block);
  146. block = exit;
  147. ReturnInst::Create(context, item, block);
  148. }
  149. return ctx.Func;
  150. }
  151. public:
  152. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  153. const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(ItemArg);
  154. const auto codegenKeyArg = dynamic_cast<ICodegeneratorExternalNode*>(KeyArg);
  155. const auto codegenInput = dynamic_cast<ICodegeneratorExternalNode*>(Input);
  156. MKQL_ENSURE(codegenItemArg, "Item arg must be codegenerator node.");
  157. MKQL_ENSURE(codegenKeyArg, "Key arg must be codegenerator node.");
  158. MKQL_ENSURE(codegenInput, "Input arg must be codegenerator node.");
  159. auto& context = ctx.Codegen.GetContext();
  160. const auto init = BasicBlock::Create(context, "init", ctx.Func);
  161. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  162. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  163. const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
  164. const auto valueType = Type::getInt128Ty(context);
  165. const auto result = PHINode::Create(valueType, 5U, "result", exit);
  166. const auto first = new LoadInst(valueType, statePtr, "first", block);
  167. const auto enter = SwitchInst::Create(first, loop, 2U, block);
  168. enter->addCase(GetInvalid(context), init);
  169. enter->addCase(GetConstant(ui64(EState::Skip), context), pass);
  170. {
  171. const auto next = BasicBlock::Create(context, "next", ctx.Func);
  172. block = init;
  173. const auto item = GetNodeValue(Flow, ctx, block);
  174. result->addIncoming(item, block);
  175. BranchInst::Create(exit, next, IsSpecial(item, block, context), block);
  176. block = next;
  177. new StoreInst(GetConstant(ui64(EState::Next), context), statePtr, block);
  178. codegenItemArg->CreateSetValue(ctx, block, item);
  179. const auto key = GetNodeValue(Key, ctx, block);
  180. codegenKeyArg->CreateSetValue(ctx, block, key);
  181. BranchInst::Create(loop, block);
  182. }
  183. {
  184. const auto part = BasicBlock::Create(context, "part", ctx.Func);
  185. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  186. const auto step = BasicBlock::Create(context, "step", ctx.Func);
  187. const auto skip = BasicBlock::Create(context, "skip", ctx.Func);
  188. block = loop;
  189. const auto item = GetNodeValue(Output, ctx, block);
  190. const auto state = new LoadInst(valueType, statePtr, "state", block);
  191. result->addIncoming(item, block);
  192. BranchInst::Create(part, exit, IsFinish(item, block, context), block);
  193. block = part;
  194. codegenInput->CreateInvalidate(ctx, block);
  195. result->addIncoming(GetFinish(context), block);
  196. const auto choise = SwitchInst::Create(state, exit, 3U, block);
  197. choise->addCase(GetConstant(ui64(EState::Next), context), pass);
  198. choise->addCase(GetConstant(ui64(EState::Work), context), pass);
  199. choise->addCase(GetConstant(ui64(EState::Chop), context), step);
  200. block = pass;
  201. const auto next = GetNodeValue(Flow, ctx, block);
  202. result->addIncoming(next, block);
  203. const auto way = SwitchInst::Create(next, good, 2U, block);
  204. way->addCase(GetFinish(context), exit);
  205. way->addCase(GetYield(context), skip);
  206. block = good;
  207. codegenItemArg->CreateSetValue(ctx, block, next);
  208. const auto chop = GetNodeValue(Chop, ctx, block);
  209. const auto cast = CastInst::Create(Instruction::Trunc, chop, Type::getInt1Ty(context), "bool", block);
  210. BranchInst::Create(step, pass, cast, block);
  211. block = step;
  212. new StoreInst(GetConstant(ui64(EState::Next), context), statePtr, block);
  213. const auto key = GetNodeValue(Key, ctx, block);
  214. codegenKeyArg->CreateSetValue(ctx, block, key);
  215. BranchInst::Create(loop, block);
  216. block = skip;
  217. new StoreInst(GetConstant(ui64(EState::Skip), context), statePtr, block);
  218. result->addIncoming(next, block);
  219. BranchInst::Create(exit, block);
  220. }
  221. block = exit;
  222. return result;
  223. }
  224. #endif
  225. private:
  226. void RegisterDependencies() const final {
  227. if (const auto flow = FlowDependsOn(Flow)) {
  228. Own(flow, ItemArg);
  229. DependsOn(flow, Key);
  230. Own(flow, KeyArg);
  231. DependsOn(flow, Chop);
  232. Own(flow, Input);
  233. DependsOn(flow, Output);
  234. }
  235. }
  236. IComputationNode *const Flow;
  237. IComputationExternalNode *const ItemArg;
  238. IComputationNode *const Key;
  239. IComputationExternalNode *const KeyArg;
  240. IComputationNode *const Chop;
  241. IComputationExternalNode *const Input;
  242. IComputationNode *const Output;
  243. };
  244. class TChopperWrapper : public TCustomValueCodegeneratorNode<TChopperWrapper> {
  245. typedef TCustomValueCodegeneratorNode<TChopperWrapper> TBaseComputation;
  246. private:
  247. enum class EState : ui8 {
  248. Init,
  249. Work,
  250. Chop,
  251. Next,
  252. Skip,
  253. };
  254. using TStatePtr = std::shared_ptr<EState>;
  255. class TSubStream : public TComputationValue<TSubStream> {
  256. public:
  257. using TBase = TComputationValue<TSubStream>;
  258. TSubStream(TMemoryUsageInfo* memInfo, const TStatePtr& state, const NUdf::TUnboxedValue& stream, IComputationExternalNode* itemArg, IComputationNode* chop, TComputationContext& ctx)
  259. : TBase(memInfo), State(state), Stream(stream)
  260. , ItemArg(itemArg)
  261. , Chop(chop)
  262. , Ctx(ctx)
  263. {}
  264. private:
  265. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  266. auto& state = *State;
  267. if (EState::Next == state) {
  268. state = EState::Work;
  269. result = ItemArg->GetValue(Ctx);
  270. return NUdf::EFetchStatus::Ok;
  271. }
  272. while (true) {
  273. switch (const auto status = Stream.Fetch(result)) {
  274. case NUdf::EFetchStatus::Ok: {
  275. ItemArg->SetValue(Ctx, NUdf::TUnboxedValue(result));
  276. if (Chop->GetValue(Ctx).Get<bool>()) {
  277. state = EState::Chop;
  278. return NUdf::EFetchStatus::Finish;
  279. }
  280. return status;
  281. }
  282. case NUdf::EFetchStatus::Finish:
  283. case NUdf::EFetchStatus::Yield:
  284. return status;
  285. }
  286. }
  287. }
  288. const TStatePtr State;
  289. const NUdf::TUnboxedValue Stream;
  290. IComputationExternalNode *const ItemArg;
  291. IComputationNode *const Chop;
  292. TComputationContext& Ctx;
  293. };
  294. class TMainStream : public TComputationValue<TMainStream> {
  295. public:
  296. TMainStream(TMemoryUsageInfo* memInfo, TStatePtr&& state, NUdf::TUnboxedValue&& stream, const IComputationExternalNode *itemArg, const IComputationNode *key, const IComputationExternalNode *keyArg, const IComputationNode *chop, const IComputationExternalNode *input, const IComputationNode *output, TComputationContext& ctx)
  297. : TComputationValue(memInfo), State(std::move(state)), ItemArg(itemArg), Key(key), Chop(chop), KeyArg(keyArg), Input(input), Output(output), InputStream(std::move(stream)), Ctx(ctx)
  298. {}
  299. private:
  300. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  301. while (true) {
  302. if (Stream) {
  303. if (const auto status = Stream.Fetch(result); NUdf::EFetchStatus::Finish != status) {
  304. return status;
  305. }
  306. Stream = NUdf::TUnboxedValuePod();
  307. Input->InvalidateValue(Ctx);
  308. }
  309. switch (auto& state = *State) {
  310. case EState::Init:
  311. if (const auto status = InputStream.Fetch(ItemArg->RefValue(Ctx)); NUdf::EFetchStatus::Ok != status) {
  312. return status;
  313. }
  314. state = EState::Next;
  315. KeyArg->SetValue(Ctx, Key->GetValue(Ctx));
  316. break;
  317. case EState::Work:
  318. case EState::Next:
  319. case EState::Skip:
  320. do switch (const auto status = InputStream.Fetch(ItemArg->RefValue(Ctx))) {
  321. case NUdf::EFetchStatus::Ok:
  322. break;
  323. case NUdf::EFetchStatus::Yield:
  324. state = EState::Skip;
  325. case NUdf::EFetchStatus::Finish:
  326. return status;
  327. } while (!Chop->GetValue(Ctx).Get<bool>());
  328. [[fallthrough]];
  329. case EState::Chop:
  330. state = EState::Next;
  331. KeyArg->SetValue(Ctx, Key->GetValue(Ctx));
  332. break;
  333. }
  334. Stream = Output->GetValue(Ctx);
  335. }
  336. }
  337. const TStatePtr State;
  338. const IComputationExternalNode *const ItemArg;
  339. const IComputationNode* Key;
  340. const IComputationNode* Chop;
  341. const IComputationExternalNode* KeyArg;
  342. const IComputationExternalNode* Input;
  343. const IComputationNode* Output;
  344. const NUdf::TUnboxedValue InputStream;
  345. NUdf::TUnboxedValue Stream;
  346. TComputationContext& Ctx;
  347. };
  348. #ifndef MKQL_DISABLE_CODEGEN
  349. class TCodegenInput : public TComputationValue<TCodegenInput> {
  350. public:
  351. using TBase = TComputationValue<TCodegenInput>;
  352. using TFetchPtr = NUdf::EFetchStatus (*)(TComputationContext*, NUdf::TUnboxedValuePod, EState&, NUdf::TUnboxedValuePod&);
  353. TCodegenInput(TMemoryUsageInfo* memInfo, TFetchPtr fetch, const NUdf::TUnboxedValue& stream, TComputationContext* ctx, const TStatePtr& init)
  354. : TBase(memInfo)
  355. , FetchFunc(fetch)
  356. , Stream(stream)
  357. , Ctx(ctx)
  358. , State(init)
  359. {}
  360. protected:
  361. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  362. return FetchFunc(Ctx, static_cast<const NUdf::TUnboxedValuePod&>(Stream), *State, result);
  363. }
  364. const TFetchPtr FetchFunc;
  365. const NUdf::TUnboxedValue Stream;
  366. TComputationContext* const Ctx;
  367. const TStatePtr State;
  368. };
  369. class TCodegenOutput : public TComputationValue<TCodegenOutput> {
  370. public:
  371. using TBase = TComputationValue<TCodegenOutput>;
  372. using TFetchPtr = NUdf::EFetchStatus (*)(TComputationContext*, NUdf::TUnboxedValuePod&, EState&, NUdf::TUnboxedValuePod, NUdf::TUnboxedValuePod&);
  373. TCodegenOutput(TMemoryUsageInfo* memInfo, TFetchPtr fetch, TComputationContext* ctx, TStatePtr&& init, NUdf::TUnboxedValue&& input)
  374. : TBase(memInfo)
  375. , FetchFunc(fetch)
  376. , Ctx(ctx)
  377. , State(std::move(init))
  378. , InputStream(std::move(input))
  379. {}
  380. protected:
  381. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  382. return FetchFunc(Ctx, Stream, *State, InputStream, result);
  383. }
  384. const TFetchPtr FetchFunc;
  385. TComputationContext* const Ctx;
  386. const TStatePtr State;
  387. const NUdf::TUnboxedValue InputStream;
  388. NUdf::TUnboxedValue Stream;
  389. };
  390. #endif
  391. public:
  392. TChopperWrapper(TComputationMutables& mutables, IComputationNode* stream, IComputationExternalNode* itemArg, IComputationNode* key, IComputationExternalNode* keyArg, IComputationNode* chop, IComputationExternalNode* input, IComputationNode* output)
  393. : TBaseComputation(mutables)
  394. , Stream(stream)
  395. , ItemArg(itemArg)
  396. , Key(key)
  397. , KeyArg(keyArg)
  398. , Chop(chop)
  399. , Input(input)
  400. , Output(output)
  401. {}
  402. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  403. auto sharedState = std::allocate_shared<EState, TMKQLAllocator<EState>>(TMKQLAllocator<EState>(), EState::Init);
  404. auto stream = Stream->GetValue(ctx);
  405. #ifndef MKQL_DISABLE_CODEGEN
  406. if (ctx.ExecuteLLVM && InputPtr)
  407. Input->SetValue(ctx, ctx.HolderFactory.Create<TCodegenInput>(InputPtr, stream, &ctx, sharedState));
  408. else
  409. #endif
  410. Input->SetValue(ctx, ctx.HolderFactory.Create<TSubStream>(sharedState, stream, ItemArg, Chop, ctx));
  411. #ifndef MKQL_DISABLE_CODEGEN
  412. if (ctx.ExecuteLLVM && OutputPtr)
  413. return ctx.HolderFactory.Create<TCodegenOutput>(OutputPtr, &ctx, std::move(sharedState), std::move(stream));
  414. #endif
  415. return ctx.HolderFactory.Create<TMainStream>(std::move(sharedState), std::move(stream), ItemArg, Key, KeyArg, Chop, Input, Output, ctx);
  416. }
  417. private:
  418. void RegisterDependencies() const final {
  419. DependsOn(Stream);
  420. Own(ItemArg);
  421. DependsOn(Key);
  422. Own(KeyArg);
  423. DependsOn(Chop);
  424. Own(Input);
  425. DependsOn(Output);
  426. }
  427. #ifndef MKQL_DISABLE_CODEGEN
  428. void GenerateFunctions(NYql::NCodegen::ICodegen& codegen) final {
  429. InputFunc = GenerateInput(codegen);
  430. OutputFunc = GenerateOutput(codegen);
  431. codegen.ExportSymbol(InputFunc);
  432. codegen.ExportSymbol(OutputFunc);
  433. }
  434. void FinalizeFunctions(NYql::NCodegen::ICodegen& codegen) final {
  435. if (InputFunc)
  436. InputPtr = reinterpret_cast<TInputPtr>(codegen.GetPointerToFunction(InputFunc));
  437. if (OutputFunc)
  438. OutputPtr = reinterpret_cast<TOutputPtr>(codegen.GetPointerToFunction(OutputFunc));
  439. }
  440. Function* GenerateInput(NYql::NCodegen::ICodegen& codegen) const {
  441. auto& module = codegen.GetModule();
  442. auto& context = codegen.GetContext();
  443. const auto& name = MakeName("Input");
  444. if (const auto f = module.getFunction(name.c_str()))
  445. return f;
  446. const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(ItemArg);
  447. const auto codegenKeyArg = dynamic_cast<ICodegeneratorExternalNode*>(KeyArg);
  448. MKQL_ENSURE(codegenItemArg, "Item arg must be codegenerator node.");
  449. MKQL_ENSURE(codegenKeyArg, "Key arg must be codegenerator node.");
  450. const auto valueType = Type::getInt128Ty(context);
  451. const auto containerType = static_cast<Type*>(valueType);
  452. const auto contextType = GetCompContextType(context);
  453. const auto statusType = Type::getInt32Ty(context);
  454. const auto stateType = Type::getInt8Ty(context);
  455. const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), containerType, PointerType::getUnqual(stateType), PointerType::getUnqual(valueType)}, false);
  456. TCodegenContext ctx(codegen);
  457. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  458. DISubprogramAnnotator annotator(ctx, ctx.Func);
  459. auto args = ctx.Func->arg_begin();
  460. ctx.Ctx = &*args;
  461. const auto containerArg = &*++args;
  462. const auto stateArg = &*++args;
  463. const auto valuePtr = &*++args;
  464. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  465. const auto load = BasicBlock::Create(context, "load", ctx.Func);
  466. const auto work = BasicBlock::Create(context, "work", ctx.Func);
  467. auto block = main;
  468. const auto container = static_cast<Value*>(containerArg);
  469. const auto first = new LoadInst(stateType, stateArg, "first", block);
  470. const auto reload = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, first, ConstantInt::get(stateType, ui8(EState::Next)), "reload", block);
  471. BranchInst::Create(load, work, reload, block);
  472. {
  473. block = load;
  474. new StoreInst(ConstantInt::get(stateType, ui8(EState::Work)), stateArg, block);
  475. SafeUnRefUnboxedOne(valuePtr, ctx, block);
  476. GetNodeValue(valuePtr, ItemArg, ctx, block);
  477. ReturnInst::Create(context, ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Ok)), block);
  478. }
  479. {
  480. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  481. const auto step = BasicBlock::Create(context, "step", ctx.Func);
  482. const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
  483. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  484. block = work;
  485. const auto itemPtr = codegenItemArg->CreateRefValue(ctx, block);
  486. const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, container, codegen, block, itemPtr);
  487. const auto none = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_NE, status, ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Ok)), "none", block);
  488. BranchInst::Create(exit, good, none, block);
  489. block = good;
  490. const auto chop = GetNodeValue(Chop, ctx, block);
  491. const auto cast = CastInst::Create(Instruction::Trunc, chop, Type::getInt1Ty(context), "bool", block);
  492. BranchInst::Create(step, pass, cast, block);
  493. block = step;
  494. new StoreInst(ConstantInt::get(stateType, ui8(EState::Chop)), stateArg, block);
  495. ReturnInst::Create(context, ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Finish)), block);
  496. block = pass;
  497. SafeUnRefUnboxedOne(valuePtr, ctx, block);
  498. GetNodeValue(valuePtr, ItemArg, ctx, block);
  499. BranchInst::Create(exit, block);
  500. block = exit;
  501. ReturnInst::Create(context, status, block);
  502. }
  503. return ctx.Func;
  504. }
  505. Function* GenerateOutput(NYql::NCodegen::ICodegen& codegen) const {
  506. auto& module = codegen.GetModule();
  507. auto& context = codegen.GetContext();
  508. const auto& name = MakeName("Output");
  509. if (const auto f = module.getFunction(name.c_str()))
  510. return f;
  511. const auto codegenInput = dynamic_cast<ICodegeneratorExternalNode*>(Input);
  512. const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(ItemArg);
  513. const auto codegenKeyArg = dynamic_cast<ICodegeneratorExternalNode*>(KeyArg);
  514. MKQL_ENSURE(codegenItemArg, "Item arg must be codegenerator node.");
  515. MKQL_ENSURE(codegenKeyArg, "Key arg must be codegenerator node.");
  516. MKQL_ENSURE(codegenInput, "Input arg must be codegenerator node.");
  517. const auto valueType = Type::getInt128Ty(context);
  518. const auto containerType = static_cast<Type*>(valueType);
  519. const auto contextType = GetCompContextType(context);
  520. const auto statusType = Type::getInt32Ty(context);
  521. const auto stateType = Type::getInt8Ty(context);
  522. const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), PointerType::getUnqual(valueType), PointerType::getUnqual(stateType), containerType, PointerType::getUnqual(valueType)}, false);
  523. TCodegenContext ctx(codegen);
  524. ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
  525. DISubprogramAnnotator annotator(ctx, ctx.Func);
  526. auto args = ctx.Func->arg_begin();
  527. ctx.Ctx = &*args;
  528. const auto streamArg = &*++args;
  529. const auto stateArg = &*++args;
  530. const auto inputArg = &*++args;
  531. const auto valuePtr = &*++args;
  532. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  533. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  534. const auto work = BasicBlock::Create(context, "work", ctx.Func);
  535. const auto next = BasicBlock::Create(context, "next", ctx.Func);
  536. const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
  537. const auto skip = BasicBlock::Create(context, "skip", ctx.Func);
  538. const auto pull = BasicBlock::Create(context, "pull", ctx.Func);
  539. const auto init = BasicBlock::Create(context, "init", ctx.Func);
  540. auto block = main;
  541. const auto input = static_cast<Value*>(inputArg);
  542. BranchInst::Create(loop, block);
  543. block = loop;
  544. const auto stream = new LoadInst(valueType, streamArg, "stream", block);
  545. BranchInst::Create(next, work, IsEmpty(stream, block, context), block);
  546. {
  547. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  548. const auto step = BasicBlock::Create(context, "step", ctx.Func);
  549. block = work;
  550. const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, stream, codegen, block, valuePtr);
  551. const auto icmp = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_NE, status, ConstantInt::get(status->getType(), static_cast<ui32>(NUdf::EFetchStatus::Finish)), "cond", block);
  552. BranchInst::Create(good, step, icmp, block);
  553. block = good;
  554. ReturnInst::Create(context, status, block);
  555. block = step;
  556. UnRefBoxed(stream, ctx, block);
  557. new StoreInst(ConstantInt::get(stream->getType(), 0), streamArg, block);
  558. codegenInput->CreateInvalidate(ctx, block);
  559. BranchInst::Create(next, block);
  560. }
  561. block = next;
  562. const auto state = new LoadInst(stateType, stateArg, "state", block);
  563. const auto choise = SwitchInst::Create(state, skip, 2U, block);
  564. choise->addCase(ConstantInt::get(stateType, ui8(EState::Init)), init);
  565. choise->addCase(ConstantInt::get(stateType, ui8(EState::Chop)), pass);
  566. {
  567. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  568. block = init;
  569. const auto itemPtr = codegenItemArg->CreateRefValue(ctx, block);
  570. const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, input, codegen, block, itemPtr);
  571. const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_NE, status, ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Ok)), "special", block);
  572. BranchInst::Create(exit, pass, special, block);
  573. block = exit;
  574. ReturnInst::Create(context, status, block);
  575. }
  576. {
  577. const auto test = BasicBlock::Create(context, "test", ctx.Func);
  578. const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
  579. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  580. block = skip;
  581. const auto itemPtr = codegenItemArg->CreateRefValue(ctx, block);
  582. const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, input, codegen, block, itemPtr);
  583. const auto way = SwitchInst::Create(status, test, 2U, block);
  584. way->addCase(ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Yield)), exit);
  585. way->addCase(ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Finish)), done);
  586. block = exit;
  587. new StoreInst(ConstantInt::get(stateType, ui8(EState::Skip)), stateArg, block);
  588. BranchInst::Create(done, block);
  589. block = done;
  590. ReturnInst::Create(context, status, block);
  591. block = test;
  592. const auto chop = GetNodeValue(Chop, ctx, block);
  593. const auto cast = CastInst::Create(Instruction::Trunc, chop, Type::getInt1Ty(context), "bool", block);
  594. BranchInst::Create(pass, skip, cast, block);
  595. }
  596. block = pass;
  597. new StoreInst(ConstantInt::get(stateType, ui8(EState::Next)), stateArg, block);
  598. const auto key = GetNodeValue(Key, ctx, block);
  599. codegenKeyArg->CreateSetValue(ctx, block, key);
  600. BranchInst::Create(pull, block);
  601. block = pull;
  602. GetNodeValue(streamArg, Output, ctx, block);
  603. BranchInst::Create(loop, block);
  604. return ctx.Func;
  605. }
  606. using TInputPtr = typename TCodegenInput::TFetchPtr;
  607. using TOutputPtr = typename TCodegenOutput::TFetchPtr;
  608. Function* InputFunc = nullptr;
  609. Function* OutputFunc = nullptr;
  610. TInputPtr InputPtr = nullptr;
  611. TOutputPtr OutputPtr = nullptr;
  612. #endif
  613. IComputationNode *const Stream;
  614. IComputationExternalNode *const ItemArg;
  615. IComputationNode *const Key;
  616. IComputationExternalNode *const KeyArg;
  617. IComputationNode *const Chop;
  618. IComputationExternalNode *const Input;
  619. IComputationNode *const Output;
  620. };
  621. }
  622. IComputationNode* WrapChopper(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  623. MKQL_ENSURE(callable.GetInputsCount() == 7U, "Expected seven args.");
  624. const auto type = callable.GetType()->GetReturnType();
  625. const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
  626. const auto keyResult = LocateNode(ctx.NodeLocator, callable, 2);
  627. const auto switchResult = LocateNode(ctx.NodeLocator, callable, 4);
  628. const auto output = LocateNode(ctx.NodeLocator, callable, 6);
  629. const auto itemArg = LocateExternalNode(ctx.NodeLocator, callable, 1);
  630. const auto keyArg = LocateExternalNode(ctx.NodeLocator, callable, 3);
  631. const auto input = LocateExternalNode(ctx.NodeLocator, callable, 5);
  632. if (type->IsFlow()) {
  633. const auto kind = GetValueRepresentation(AS_TYPE(TFlowType, type)->GetItemType());
  634. return new TChopperFlowWrapper(ctx.Mutables, kind, stream, itemArg, keyResult, keyArg, switchResult, input, output);
  635. } else if (type->IsStream()) {
  636. return new TChopperWrapper(ctx.Mutables, stream, itemArg, keyResult, keyArg, switchResult, input, output);
  637. }
  638. THROW yexception() << "Expected flow or stream.";
  639. }
  640. }
  641. }