12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827 |
- #include "mkql_chopper.h"
- #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
- #include <yql/essentials/minikql/mkql_node_cast.h>
- namespace NKikimr {
- namespace NMiniKQL {
- namespace {
// Flow flavour of the chopper node.
// It pulls items from `Flow`, stores the current item in `ItemArg` and the value
// of `Key` in `KeyArg`, and repeatedly evaluates `Output`. `Output` reads the
// items of the current group through the external node `Input`, whose getter is
// Getter() below. `Chop` is evaluated right after a new item is stored in
// `ItemArg`; a true result marks the boundary between two groups.
// The node state holds a ui64-encoded EState and is invalid before the first call.
- class TChopperFlowWrapper : public TStatefulFlowCodegeneratorNode<TChopperFlowWrapper> {
- typedef TStatefulFlowCodegeneratorNode<TChopperFlowWrapper> TBaseComputation;
- public:
// State shared between DoCalculate (outer loop) and Getter (reads of Input).
- enum class EState : ui64 {
// Getter fetches the next item directly from Flow.
- Work,
// Getter saw Chop return true: it reported Finish and left the item in ItemArg.
- Chop,
// An item is already stored in ItemArg and has not been handed out by Getter yet.
- Next,
// Flow yielded while DoCalculate was advancing to the next boundary; resume there.
- Skip
- };
// Wires reads of `Input` to Getter (bound to this node's state via RefState).
// With codegen enabled, also registers GenerateHandler as the builder of the
// LLVM value getter for `Input`.
- TChopperFlowWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationNode* flow, IComputationExternalNode* itemArg, IComputationNode* key, IComputationExternalNode* keyArg, IComputationNode* chop, IComputationExternalNode* input, IComputationNode* output)
- : TBaseComputation(mutables, flow, kind, EValueRepresentation::Any)
- , Flow(flow)
- , ItemArg(itemArg)
- , Key(key)
- , KeyArg(keyArg)
- , Chop(chop)
- , Input(input)
- , Output(output)
- {
- Input->SetGetter(std::bind(&TChopperFlowWrapper::Getter, this, std::bind(&TChopperFlowWrapper::RefState, this, std::placeholders::_1), std::placeholders::_1));
- #ifndef MKQL_DISABLE_CODEGEN
- const auto codegenInput = dynamic_cast<ICodegeneratorExternalNode*>(Input);
- MKQL_ENSURE(codegenInput, "Input arg must be codegenerator node.");
- codegenInput->SetValueGetterBuilder([this](const TCodegenContext& ctx) {
- return GenerateHandler(ctx.Codegen);
- });
- #endif
- }
// Interpreted evaluation. Returns the next non-Finish value produced by Output,
// or a special value (e.g. Yield or Finish) taken from Flow.
- NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
// First call: fetch the very first item, publish it and its key, mark it pending.
- if (state.IsInvalid()) {
- if (auto item = Flow->GetValue(ctx); item.IsSpecial()) {
- return item.Release();
- } else {
- state = NUdf::TUnboxedValuePod(ui64(EState::Next));
- ItemArg->SetValue(ctx, std::move(item));
- KeyArg->SetValue(ctx, Key->GetValue(ctx));
- }
// Resuming after a Yield that interrupted the scan below: keep advancing Flow
// until Chop reports true, then publish the key for the item left in ItemArg.
- } else if (EState::Skip == EState(state.Get<ui64>())) {
- do {
- if (auto next = Flow->GetValue(ctx); next.IsSpecial())
- return next.Release();
- else
- ItemArg->SetValue(ctx, std::move(next));
- } while (!Chop->GetValue(ctx).Get<bool>());
- KeyArg->SetValue(ctx, Key->GetValue(ctx));
- state = NUdf::TUnboxedValuePod(ui64(EState::Next));
- }
- while (true) {
- auto output = Output->GetValue(ctx);
// Output is done with the current group: move on to the next one.
- if (output.IsFinish()) {
- Input->InvalidateValue(ctx);
- switch (EState(state.Get<ui64>())) {
// Getter has not reached a boundary yet: advance Flow until Chop reports true.
// A Yield from Flow is remembered as Skip so the scan resumes on the next call.
- case EState::Work:
- case EState::Next:
- do {
- if (auto next = Flow->GetValue(ctx); next.IsSpecial()) {
- if (next.IsYield()) {
- state = NUdf::TUnboxedValuePod(ui64(EState::Skip));
- }
- return next.Release();
- } else {
- ItemArg->SetValue(ctx, std::move(next));
- }
- } while (!Chop->GetValue(ctx).Get<bool>());
// No break above: control falls into the Chop case on purpose.
// ItemArg holds the boundary item; publish its key and mark it pending.
- case EState::Chop:
- KeyArg->SetValue(ctx, Key->GetValue(ctx));
- state = NUdf::TUnboxedValuePod(ui64(EState::Next));
// Every case ends here (again by fall-through) and re-evaluates Output.
- default:
- continue;
- }
- }
- return output.Release();
- }
- }
// Value getter installed on `Input`. Hands out the pending item if there is one,
// otherwise fetches from Flow; returns Finish when Chop reports true for the
// freshly fetched item, and passes special values from Flow through unchanged.
- NUdf::TUnboxedValuePod Getter(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
- if (EState::Next == EState(state.Get<ui64>())) {
- state = NUdf::TUnboxedValuePod(ui64(EState::Work));
- return ItemArg->GetValue(ctx).Release();
- }
- auto item = Flow->GetValue(ctx);
- if (!item.IsSpecial()) {
// ItemArg receives a copy because `item` itself may still be returned below.
- ItemArg->SetValue(ctx, NUdf::TUnboxedValue(item));
- if (Chop->GetValue(ctx).Get<bool>()) {
- state = NUdf::TUnboxedValuePod(ui64(EState::Chop));
- return NUdf::TUnboxedValuePod::MakeFinish();
- }
- }
- return item.Release();
- }
- #ifndef MKQL_DISABLE_CODEGEN
- private:
// Builds the LLVM function that mirrors Getter(): it takes a pointer to the
// computation context and returns an i128 value. The function name is derived
// from DebugString() and the address of this node; an existing function with
// that name in the module is reused.
- Function* GenerateHandler(NYql::NCodegen::ICodegen& codegen) const {
- auto& module = codegen.GetModule();
- auto& context = codegen.GetContext();
- TStringStream out;
- out << this->DebugString() << "::Handler_(" << static_cast<const void*>(this) << ").";
- const auto& name = out.Str();
- if (const auto f = module.getFunction(name.c_str()))
- return f;
- const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(ItemArg);
- const auto codegenKeyArg = dynamic_cast<ICodegeneratorExternalNode*>(KeyArg);
- MKQL_ENSURE(codegenItemArg, "Item arg must be codegenerator node.");
- MKQL_ENSURE(codegenKeyArg, "Key arg must be codegenerator node.");
- const auto valueType = Type::getInt128Ty(context);
- const auto funcType = FunctionType::get(valueType, {PointerType::getUnqual(GetCompContextType(context))}, false);
- TCodegenContext ctx(codegen);
- ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
- DISubprogramAnnotator annotator(ctx, ctx.Func);
- const auto main = BasicBlock::Create(context, "main", ctx.Func);
- ctx.Ctx = &*ctx.Func->arg_begin();
- ctx.Ctx->addAttr(Attribute::NonNull);
- auto block = main;
- const auto load = BasicBlock::Create(context, "load", ctx.Func);
- const auto work = BasicBlock::Create(context, "work", ctx.Func);
// Address of this node's state slot in the mutables array, indexed by GetIndex().
- const auto statePtr = GetElementPtrInst::CreateInBounds(valueType, ctx.GetMutables(), {ConstantInt::get(Type::getInt32Ty(context), static_cast<const IComputationNode*>(this)->GetIndex())}, "state_ptr", block);
- const auto entry = new LoadInst(valueType, statePtr, "entry", block);
- const auto next = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, entry, GetConstant(ui64(EState::Next), context), "next", block);
- BranchInst::Create(load, work, next, block);
// "load": state is Next -> switch to Work and return the item kept in ItemArg.
- {
- block = load;
- new StoreInst(GetConstant(ui64(EState::Work), context), statePtr, block);
- const auto item = GetNodeValue(ItemArg, ctx, block);
- ReturnInst::Create(context, item, block);
- }
// "work": fetch from Flow; special values go straight to "exit". Otherwise the
// item is stored in ItemArg and Chop decides between "step" (store Chop, return
// Finish) and "exit" (return the item).
- {
- const auto good = BasicBlock::Create(context, "good", ctx.Func);
- const auto step = BasicBlock::Create(context, "step", ctx.Func);
- const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
- block = work;
- const auto item = GetNodeValue(Flow, ctx, block);
- BranchInst::Create(exit, good, IsSpecial(item, block, context), block);
- block = good;
- codegenItemArg->CreateSetValue(ctx, block, item);
- const auto chop = GetNodeValue(Chop, ctx, block);
- const auto cast = CastInst::Create(Instruction::Trunc, chop, Type::getInt1Ty(context), "bool", block);
- BranchInst::Create(step, exit, cast, block);
- block = step;
- new StoreInst(GetConstant(ui64(EState::Chop), context), statePtr, block);
- ReturnInst::Create(context, GetFinish(context), block);
- block = exit;
- ReturnInst::Create(context, item, block);
- }
- return ctx.Func;
- }
- public:
// LLVM IR counterpart of DoCalculate(). Emits its blocks into ctx.Func starting
// from `block`; on return `block` is the "exit" block and the returned value is
// the "result" PHI collecting every value that can leave the node.
- Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
- const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(ItemArg);
- const auto codegenKeyArg = dynamic_cast<ICodegeneratorExternalNode*>(KeyArg);
- const auto codegenInput = dynamic_cast<ICodegeneratorExternalNode*>(Input);
- MKQL_ENSURE(codegenItemArg, "Item arg must be codegenerator node.");
- MKQL_ENSURE(codegenKeyArg, "Key arg must be codegenerator node.");
- MKQL_ENSURE(codegenInput, "Input arg must be codegenerator node.");
- auto& context = ctx.Codegen.GetContext();
- const auto init = BasicBlock::Create(context, "init", ctx.Func);
- const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
- const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
- const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
- const auto valueType = Type::getInt128Ty(context);
- const auto result = PHINode::Create(valueType, 5U, "result", exit);
// Dispatch on the stored state: invalid -> "init", Skip -> "pass", otherwise "loop".
- const auto first = new LoadInst(valueType, statePtr, "first", block);
- const auto enter = SwitchInst::Create(first, loop, 2U, block);
- enter->addCase(GetInvalid(context), init);
- enter->addCase(GetConstant(ui64(EState::Skip), context), pass);
// "init": fetch the first item; special values leave through "exit", otherwise
// "next" stores state Next, publishes the item and its key and enters "loop".
- {
- const auto next = BasicBlock::Create(context, "next", ctx.Func);
- block = init;
- const auto item = GetNodeValue(Flow, ctx, block);
- result->addIncoming(item, block);
- BranchInst::Create(exit, next, IsSpecial(item, block, context), block);
- block = next;
- new StoreInst(GetConstant(ui64(EState::Next), context), statePtr, block);
- codegenItemArg->CreateSetValue(ctx, block, item);
- const auto key = GetNodeValue(Key, ctx, block);
- codegenKeyArg->CreateSetValue(ctx, block, key);
- BranchInst::Create(loop, block);
- }
- {
- const auto part = BasicBlock::Create(context, "part", ctx.Func);
- const auto good = BasicBlock::Create(context, "good", ctx.Func);
- const auto step = BasicBlock::Create(context, "step", ctx.Func);
- const auto skip = BasicBlock::Create(context, "skip", ctx.Func);
// "loop": evaluate Output; the state is re-read after that evaluation.
// A non-Finish result leaves through "exit", Finish continues in "part".
- block = loop;
- const auto item = GetNodeValue(Output, ctx, block);
- const auto state = new LoadInst(valueType, statePtr, "state", block);
- result->addIncoming(item, block);
- BranchInst::Create(part, exit, IsFinish(item, block, context), block);
// "part": invalidate Input and branch on the state: Next/Work -> "pass",
// Chop -> "step", anything else -> "exit" with a Finish value.
- block = part;
- codegenInput->CreateInvalidate(ctx, block);
- result->addIncoming(GetFinish(context), block);
- const auto choise = SwitchInst::Create(state, exit, 3U, block);
- choise->addCase(GetConstant(ui64(EState::Next), context), pass);
- choise->addCase(GetConstant(ui64(EState::Work), context), pass);
- choise->addCase(GetConstant(ui64(EState::Chop), context), step);
// "pass": fetch the next item from Flow. Finish -> "exit", Yield -> "skip",
// any other value -> "good".
- block = pass;
- const auto next = GetNodeValue(Flow, ctx, block);
- result->addIncoming(next, block);
- const auto way = SwitchInst::Create(next, good, 2U, block);
- way->addCase(GetFinish(context), exit);
- way->addCase(GetYield(context), skip);
// "good": store the item and test Chop: true -> "step", false -> back to "pass".
- block = good;
- codegenItemArg->CreateSetValue(ctx, block, next);
- const auto chop = GetNodeValue(Chop, ctx, block);
- const auto cast = CastInst::Create(Instruction::Trunc, chop, Type::getInt1Ty(context), "bool", block);
- BranchInst::Create(step, pass, cast, block);
// "step": a new group starts: store state Next, publish the key, re-enter "loop".
- block = step;
- new StoreInst(GetConstant(ui64(EState::Next), context), statePtr, block);
- const auto key = GetNodeValue(Key, ctx, block);
- codegenKeyArg->CreateSetValue(ctx, block, key);
- BranchInst::Create(loop, block);
// "skip": remember the interrupted scan as state Skip and return the Yield value.
- block = skip;
- new StoreInst(GetConstant(ui64(EState::Skip), context), statePtr, block);
- result->addIncoming(next, block);
- BranchInst::Create(exit, block);
- }
- block = exit;
- return result;
- }
- #endif
- private:
// Registers Flow as the flow dependency; when that succeeds, takes ownership of
// the three external nodes and declares Key, Chop and Output as dependencies.
- void RegisterDependencies() const final {
- if (const auto flow = FlowDependsOn(Flow)) {
- Own(flow, ItemArg);
- DependsOn(flow, Key);
- Own(flow, KeyArg);
- DependsOn(flow, Chop);
- Own(flow, Input);
- DependsOn(flow, Output);
- }
- }
// Source flow of items.
- IComputationNode *const Flow;
// External node holding the current item.
- IComputationExternalNode *const ItemArg;
// Computes the key; its value is stored into KeyArg.
- IComputationNode *const Key;
// External node holding the key value.
- IComputationExternalNode *const KeyArg;
// Boolean node evaluated after each new item is stored in ItemArg.
- IComputationNode *const Chop;
// External node through which Output reads items (served by Getter).
- IComputationExternalNode *const Input;
// Node whose values are returned from DoCalculate.
- IComputationNode *const Output;
- };
// Stream flavour of the chopper node.
// DoCalculate() returns an outer stream value (TMainStream, or TCodegenOutput when
// LLVM is used) and stores an inner stream value (TSubStream or TCodegenInput) in
// the external node `Input`. Both values share one EState through a shared_ptr.
// The inner stream serves the items of the current group; the outer stream
// fetches from the stream obtained from `Output` and moves to the next group
// each time that stream finishes.
- class TChopperWrapper : public TCustomValueCodegeneratorNode<TChopperWrapper> {
- typedef TCustomValueCodegeneratorNode<TChopperWrapper> TBaseComputation;
- private:
// State shared by the inner and the outer stream.
- enum class EState : ui8 {
// Initial value set in DoCalculate: nothing has been fetched from the source yet.
- Init,
// The inner stream fetches items directly from the source stream.
- Work,
// The inner stream saw Chop return true and reported Finish; item is in ItemArg.
- Chop,
// An item is stored in ItemArg and has not been handed out by the inner stream yet.
- Next,
// The source yielded while the outer stream was advancing to the next boundary.
- Skip,
- };
- using TStatePtr = std::shared_ptr<EState>;
// Interpreted inner stream (the value stored in `Input`).
- class TSubStream : public TComputationValue<TSubStream> {
- public:
- using TBase = TComputationValue<TSubStream>;
- TSubStream(TMemoryUsageInfo* memInfo, const TStatePtr& state, const NUdf::TUnboxedValue& stream, IComputationExternalNode* itemArg, IComputationNode* chop, TComputationContext& ctx)
- : TBase(memInfo), State(state), Stream(stream)
- , ItemArg(itemArg)
- , Chop(chop)
- , Ctx(ctx)
- {}
- private:
// Returns the pending item when the state is Next; otherwise fetches from the
// source, stores the item in ItemArg and reports Finish (state := Chop) if
// Chop evaluates to true for it.
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
- auto& state = *State;
- if (EState::Next == state) {
- state = EState::Work;
- result = ItemArg->GetValue(Ctx);
- return NUdf::EFetchStatus::Ok;
- }
// Each status handled in the switch below returns from the function.
- while (true) {
- switch (const auto status = Stream.Fetch(result)) {
- case NUdf::EFetchStatus::Ok: {
- ItemArg->SetValue(Ctx, NUdf::TUnboxedValue(result));
- if (Chop->GetValue(Ctx).Get<bool>()) {
- state = EState::Chop;
- return NUdf::EFetchStatus::Finish;
- }
- return status;
- }
- case NUdf::EFetchStatus::Finish:
- case NUdf::EFetchStatus::Yield:
- return status;
- }
- }
- }
- const TStatePtr State;
- const NUdf::TUnboxedValue Stream;
- IComputationExternalNode *const ItemArg;
- IComputationNode *const Chop;
- TComputationContext& Ctx;
- };
// Interpreted outer stream (the value returned by DoCalculate).
- class TMainStream : public TComputationValue<TMainStream> {
- public:
- TMainStream(TMemoryUsageInfo* memInfo, TStatePtr&& state, NUdf::TUnboxedValue&& stream, const IComputationExternalNode *itemArg, const IComputationNode *key, const IComputationExternalNode *keyArg, const IComputationNode *chop, const IComputationExternalNode *input, const IComputationNode *output, TComputationContext& ctx)
- : TComputationValue(memInfo), State(std::move(state)), ItemArg(itemArg), Key(key), Chop(chop), KeyArg(keyArg), Input(input), Output(output), InputStream(std::move(stream)), Ctx(ctx)
- {}
- private:
// Fetches from the current `Stream` (obtained from Output) until it finishes,
// then positions the source at the next group and obtains a new `Stream`.
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
- while (true) {
- if (Stream) {
// Any status other than Finish is reported to the caller as is.
- if (const auto status = Stream.Fetch(result); NUdf::EFetchStatus::Finish != status) {
- return status;
- }
// Current stream is exhausted: drop it and invalidate Input.
- Stream = NUdf::TUnboxedValuePod();
- Input->InvalidateValue(Ctx);
- }
- switch (auto& state = *State) {
// Very first fetch: read the first item straight into ItemArg's value slot.
- case EState::Init:
- if (const auto status = InputStream.Fetch(ItemArg->RefValue(Ctx)); NUdf::EFetchStatus::Ok != status) {
- return status;
- }
- state = EState::Next;
- KeyArg->SetValue(Ctx, Key->GetValue(Ctx));
- break;
// The inner stream did not reach a boundary (or a previous scan was interrupted
// by Yield): advance the source until Chop reports true.
- case EState::Work:
- case EState::Next:
- case EState::Skip:
// `break` leaves only the inner switch, so an Ok item proceeds to the Chop test
// in the `while` condition.
- do switch (const auto status = InputStream.Fetch(ItemArg->RefValue(Ctx))) {
- case NUdf::EFetchStatus::Ok:
- break;
// Yield is recorded as Skip and then falls through to the return below.
- case NUdf::EFetchStatus::Yield:
- state = EState::Skip;
- case NUdf::EFetchStatus::Finish:
- return status;
- } while (!Chop->GetValue(Ctx).Get<bool>());
- [[fallthrough]];
// ItemArg holds the boundary item: mark it pending and publish its key.
- case EState::Chop:
- state = EState::Next;
- KeyArg->SetValue(Ctx, Key->GetValue(Ctx));
- break;
- }
- Stream = Output->GetValue(Ctx);
- }
- }
- const TStatePtr State;
- const IComputationExternalNode *const ItemArg;
- const IComputationNode* Key;
- const IComputationNode* Chop;
- const IComputationExternalNode* KeyArg;
- const IComputationExternalNode* Input;
- const IComputationNode* Output;
// Source stream of items.
- const NUdf::TUnboxedValue InputStream;
// Stream currently obtained from Output; empty between groups.
- NUdf::TUnboxedValue Stream;
- TComputationContext& Ctx;
- };
- #ifndef MKQL_DISABLE_CODEGEN
// Inner stream backed by generated code: Fetch forwards the context, the source
// stream, the shared state and the result slot to FetchFunc.
- class TCodegenInput : public TComputationValue<TCodegenInput> {
- public:
- using TBase = TComputationValue<TCodegenInput>;
- using TFetchPtr = NUdf::EFetchStatus (*)(TComputationContext*, NUdf::TUnboxedValuePod, EState&, NUdf::TUnboxedValuePod&);
- TCodegenInput(TMemoryUsageInfo* memInfo, TFetchPtr fetch, const NUdf::TUnboxedValue& stream, TComputationContext* ctx, const TStatePtr& init)
- : TBase(memInfo)
- , FetchFunc(fetch)
- , Stream(stream)
- , Ctx(ctx)
- , State(init)
- {}
- protected:
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
- return FetchFunc(Ctx, static_cast<const NUdf::TUnboxedValuePod&>(Stream), *State, result);
- }
- const TFetchPtr FetchFunc;
- const NUdf::TUnboxedValue Stream;
- TComputationContext* const Ctx;
- const TStatePtr State;
- };
// Outer stream backed by generated code: Fetch forwards the context, the slot of
// the current Output stream, the shared state, the source stream and the result
// slot to FetchFunc.
- class TCodegenOutput : public TComputationValue<TCodegenOutput> {
- public:
- using TBase = TComputationValue<TCodegenOutput>;
- using TFetchPtr = NUdf::EFetchStatus (*)(TComputationContext*, NUdf::TUnboxedValuePod&, EState&, NUdf::TUnboxedValuePod, NUdf::TUnboxedValuePod&);
- TCodegenOutput(TMemoryUsageInfo* memInfo, TFetchPtr fetch, TComputationContext* ctx, TStatePtr&& init, NUdf::TUnboxedValue&& input)
- : TBase(memInfo)
- , FetchFunc(fetch)
- , Ctx(ctx)
- , State(std::move(init))
- , InputStream(std::move(input))
- {}
- protected:
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
- return FetchFunc(Ctx, Stream, *State, InputStream, result);
- }
- const TFetchPtr FetchFunc;
- TComputationContext* const Ctx;
- const TStatePtr State;
- const NUdf::TUnboxedValue InputStream;
- NUdf::TUnboxedValue Stream;
- };
- #endif
- public:
- TChopperWrapper(TComputationMutables& mutables, IComputationNode* stream, IComputationExternalNode* itemArg, IComputationNode* key, IComputationExternalNode* keyArg, IComputationNode* chop, IComputationExternalNode* input, IComputationNode* output)
- : TBaseComputation(mutables)
- , Stream(stream)
- , ItemArg(itemArg)
- , Key(key)
- , KeyArg(keyArg)
- , Chop(chop)
- , Input(input)
- , Output(output)
- {}
// Creates the shared state (initially Init), evaluates the source stream, stores
// the inner stream in `Input` and returns the outer stream. The generated
// variants are used only when ctx.ExecuteLLVM is set and the corresponding
// function pointer has been resolved.
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto sharedState = std::allocate_shared<EState, TMKQLAllocator<EState>>(TMKQLAllocator<EState>(), EState::Init);
- auto stream = Stream->GetValue(ctx);
- #ifndef MKQL_DISABLE_CODEGEN
- if (ctx.ExecuteLLVM && InputPtr)
- Input->SetValue(ctx, ctx.HolderFactory.Create<TCodegenInput>(InputPtr, stream, &ctx, sharedState));
- else
- #endif
- Input->SetValue(ctx, ctx.HolderFactory.Create<TSubStream>(sharedState, stream, ItemArg, Chop, ctx));
- #ifndef MKQL_DISABLE_CODEGEN
- if (ctx.ExecuteLLVM && OutputPtr)
- return ctx.HolderFactory.Create<TCodegenOutput>(OutputPtr, &ctx, std::move(sharedState), std::move(stream));
- #endif
- return ctx.HolderFactory.Create<TMainStream>(std::move(sharedState), std::move(stream), ItemArg, Key, KeyArg, Chop, Input, Output, ctx);
- }
- private:
// Declares Stream, Key, Chop and Output as dependencies and takes ownership of
// the three external nodes.
- void RegisterDependencies() const final {
- DependsOn(Stream);
- Own(ItemArg);
- DependsOn(Key);
- Own(KeyArg);
- DependsOn(Chop);
- Own(Input);
- DependsOn(Output);
- }
- #ifndef MKQL_DISABLE_CODEGEN
// Generates both LLVM functions and exports their symbols.
- void GenerateFunctions(NYql::NCodegen::ICodegen& codegen) final {
- InputFunc = GenerateInput(codegen);
- OutputFunc = GenerateOutput(codegen);
- codegen.ExportSymbol(InputFunc);
- codegen.ExportSymbol(OutputFunc);
- }
// Resolves the generated functions to callable pointers.
- void FinalizeFunctions(NYql::NCodegen::ICodegen& codegen) final {
- if (InputFunc)
- InputPtr = reinterpret_cast<TInputPtr>(codegen.GetPointerToFunction(InputFunc));
- if (OutputFunc)
- OutputPtr = reinterpret_cast<TOutputPtr>(codegen.GetPointerToFunction(OutputFunc));
- }
// Builds (or reuses) the LLVM function mirroring TSubStream::Fetch.
// Signature: i32 status (context*, i128 source stream, i8* state, i128* result).
- Function* GenerateInput(NYql::NCodegen::ICodegen& codegen) const {
- auto& module = codegen.GetModule();
- auto& context = codegen.GetContext();
- const auto& name = MakeName("Input");
- if (const auto f = module.getFunction(name.c_str()))
- return f;
- const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(ItemArg);
- const auto codegenKeyArg = dynamic_cast<ICodegeneratorExternalNode*>(KeyArg);
- MKQL_ENSURE(codegenItemArg, "Item arg must be codegenerator node.");
- MKQL_ENSURE(codegenKeyArg, "Key arg must be codegenerator node.");
- const auto valueType = Type::getInt128Ty(context);
- const auto containerType = static_cast<Type*>(valueType);
- const auto contextType = GetCompContextType(context);
- const auto statusType = Type::getInt32Ty(context);
- const auto stateType = Type::getInt8Ty(context);
- const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), containerType, PointerType::getUnqual(stateType), PointerType::getUnqual(valueType)}, false);
- TCodegenContext ctx(codegen);
- ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
- DISubprogramAnnotator annotator(ctx, ctx.Func);
- auto args = ctx.Func->arg_begin();
- ctx.Ctx = &*args;
- const auto containerArg = &*++args;
- const auto stateArg = &*++args;
- const auto valuePtr = &*++args;
- const auto main = BasicBlock::Create(context, "main", ctx.Func);
- const auto load = BasicBlock::Create(context, "load", ctx.Func);
- const auto work = BasicBlock::Create(context, "work", ctx.Func);
- auto block = main;
- const auto container = static_cast<Value*>(containerArg);
- const auto first = new LoadInst(stateType, stateArg, "first", block);
- const auto reload = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, first, ConstantInt::get(stateType, ui8(EState::Next)), "reload", block);
- BranchInst::Create(load, work, reload, block);
// "load": state is Next -> store Work, copy ItemArg's value into the result slot
// (releasing what the slot held before) and return Ok.
- {
- block = load;
- new StoreInst(ConstantInt::get(stateType, ui8(EState::Work)), stateArg, block);
- SafeUnRefUnboxedOne(valuePtr, ctx, block);
- GetNodeValue(valuePtr, ItemArg, ctx, block);
- ReturnInst::Create(context, ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Ok)), block);
- }
// "work": fetch from the source directly into ItemArg's slot. A non-Ok status is
// returned via "exit". On Ok, Chop selects "step" (store Chop, return Finish) or
// "pass" (copy the item into the result slot, then return the status via "exit").
- {
- const auto good = BasicBlock::Create(context, "good", ctx.Func);
- const auto step = BasicBlock::Create(context, "step", ctx.Func);
- const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
- const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
- block = work;
- const auto itemPtr = codegenItemArg->CreateRefValue(ctx, block);
- const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, container, codegen, block, itemPtr);
- const auto none = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_NE, status, ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Ok)), "none", block);
- BranchInst::Create(exit, good, none, block);
- block = good;
- const auto chop = GetNodeValue(Chop, ctx, block);
- const auto cast = CastInst::Create(Instruction::Trunc, chop, Type::getInt1Ty(context), "bool", block);
- BranchInst::Create(step, pass, cast, block);
- block = step;
- new StoreInst(ConstantInt::get(stateType, ui8(EState::Chop)), stateArg, block);
- ReturnInst::Create(context, ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Finish)), block);
- block = pass;
- SafeUnRefUnboxedOne(valuePtr, ctx, block);
- GetNodeValue(valuePtr, ItemArg, ctx, block);
- BranchInst::Create(exit, block);
- block = exit;
- ReturnInst::Create(context, status, block);
- }
- return ctx.Func;
- }
// Builds (or reuses) the LLVM function mirroring TMainStream::Fetch.
// Signature: i32 status (context*, i128* current Output stream, i8* state,
// i128 source stream, i128* result).
- Function* GenerateOutput(NYql::NCodegen::ICodegen& codegen) const {
- auto& module = codegen.GetModule();
- auto& context = codegen.GetContext();
- const auto& name = MakeName("Output");
- if (const auto f = module.getFunction(name.c_str()))
- return f;
- const auto codegenInput = dynamic_cast<ICodegeneratorExternalNode*>(Input);
- const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(ItemArg);
- const auto codegenKeyArg = dynamic_cast<ICodegeneratorExternalNode*>(KeyArg);
- MKQL_ENSURE(codegenItemArg, "Item arg must be codegenerator node.");
- MKQL_ENSURE(codegenKeyArg, "Key arg must be codegenerator node.");
- MKQL_ENSURE(codegenInput, "Input arg must be codegenerator node.");
- const auto valueType = Type::getInt128Ty(context);
- const auto containerType = static_cast<Type*>(valueType);
- const auto contextType = GetCompContextType(context);
- const auto statusType = Type::getInt32Ty(context);
- const auto stateType = Type::getInt8Ty(context);
- const auto funcType = FunctionType::get(statusType, {PointerType::getUnqual(contextType), PointerType::getUnqual(valueType), PointerType::getUnqual(stateType), containerType, PointerType::getUnqual(valueType)}, false);
- TCodegenContext ctx(codegen);
- ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
- DISubprogramAnnotator annotator(ctx, ctx.Func);
- auto args = ctx.Func->arg_begin();
- ctx.Ctx = &*args;
- const auto streamArg = &*++args;
- const auto stateArg = &*++args;
- const auto inputArg = &*++args;
- const auto valuePtr = &*++args;
- const auto main = BasicBlock::Create(context, "main", ctx.Func);
- const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
- const auto work = BasicBlock::Create(context, "work", ctx.Func);
- const auto next = BasicBlock::Create(context, "next", ctx.Func);
- const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
- const auto skip = BasicBlock::Create(context, "skip", ctx.Func);
- const auto pull = BasicBlock::Create(context, "pull", ctx.Func);
- const auto init = BasicBlock::Create(context, "init", ctx.Func);
- auto block = main;
- const auto input = static_cast<Value*>(inputArg);
- BranchInst::Create(loop, block);
// "loop": load the current Output stream; empty -> "next", otherwise -> "work".
- block = loop;
- const auto stream = new LoadInst(valueType, streamArg, "stream", block);
- BranchInst::Create(next, work, IsEmpty(stream, block, context), block);
// "work": fetch from the current Output stream. Any status but Finish is
// returned ("good"); on Finish ("step") the stream is released, its slot is
// zeroed, Input is invalidated and control goes to "next".
- {
- const auto good = BasicBlock::Create(context, "good", ctx.Func);
- const auto step = BasicBlock::Create(context, "step", ctx.Func);
- block = work;
- const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, stream, codegen, block, valuePtr);
- const auto icmp = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_NE, status, ConstantInt::get(status->getType(), static_cast<ui32>(NUdf::EFetchStatus::Finish)), "cond", block);
- BranchInst::Create(good, step, icmp, block);
- block = good;
- ReturnInst::Create(context, status, block);
- block = step;
- UnRefBoxed(stream, ctx, block);
- new StoreInst(ConstantInt::get(stream->getType(), 0), streamArg, block);
- codegenInput->CreateInvalidate(ctx, block);
- BranchInst::Create(next, block);
- }
// "next": dispatch on the shared state: Init -> "init", Chop -> "pass",
// every other state -> "skip".
- block = next;
- const auto state = new LoadInst(stateType, stateArg, "state", block);
- const auto choise = SwitchInst::Create(state, skip, 2U, block);
- choise->addCase(ConstantInt::get(stateType, ui8(EState::Init)), init);
- choise->addCase(ConstantInt::get(stateType, ui8(EState::Chop)), pass);
// "init": fetch the first item into ItemArg's slot; a non-Ok status is returned,
// Ok continues in "pass".
- {
- const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
- block = init;
- const auto itemPtr = codegenItemArg->CreateRefValue(ctx, block);
- const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, input, codegen, block, itemPtr);
- const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_NE, status, ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Ok)), "special", block);
- BranchInst::Create(exit, pass, special, block);
- block = exit;
- ReturnInst::Create(context, status, block);
- }
// "skip": advance the source until Chop reports true. Yield stores state Skip
// ("exit") and returns; Finish returns ("done"); any other status goes to "test",
// which branches to "pass" when Chop is true and back to "skip" otherwise.
- {
- const auto test = BasicBlock::Create(context, "test", ctx.Func);
- const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
- const auto done = BasicBlock::Create(context, "done", ctx.Func);
- block = skip;
- const auto itemPtr = codegenItemArg->CreateRefValue(ctx, block);
- const auto status = CallBoxedValueVirtualMethod<NUdf::TBoxedValueAccessor::EMethod::Fetch>(statusType, input, codegen, block, itemPtr);
- const auto way = SwitchInst::Create(status, test, 2U, block);
- way->addCase(ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Yield)), exit);
- way->addCase(ConstantInt::get(statusType, ui32(NUdf::EFetchStatus::Finish)), done);
- block = exit;
- new StoreInst(ConstantInt::get(stateType, ui8(EState::Skip)), stateArg, block);
- BranchInst::Create(done, block);
- block = done;
- ReturnInst::Create(context, status, block);
- block = test;
- const auto chop = GetNodeValue(Chop, ctx, block);
- const auto cast = CastInst::Create(Instruction::Trunc, chop, Type::getInt1Ty(context), "bool", block);
- BranchInst::Create(pass, skip, cast, block);
- }
// "pass": a new group starts: store state Next and publish the key.
- block = pass;
- new StoreInst(ConstantInt::get(stateType, ui8(EState::Next)), stateArg, block);
- const auto key = GetNodeValue(Key, ctx, block);
- codegenKeyArg->CreateSetValue(ctx, block, key);
- BranchInst::Create(pull, block);
// "pull": evaluate Output into the stream slot and start over from "loop".
- block = pull;
- GetNodeValue(streamArg, Output, ctx, block);
- BranchInst::Create(loop, block);
- return ctx.Func;
- }
- using TInputPtr = typename TCodegenInput::TFetchPtr;
- using TOutputPtr = typename TCodegenOutput::TFetchPtr;
// Generated LLVM functions and their resolved entry points (null until set by
// GenerateFunctions / FinalizeFunctions).
- Function* InputFunc = nullptr;
- Function* OutputFunc = nullptr;
- TInputPtr InputPtr = nullptr;
- TOutputPtr OutputPtr = nullptr;
- #endif
// Node producing the source stream.
- IComputationNode *const Stream;
// External node holding the current item.
- IComputationExternalNode *const ItemArg;
// Computes the key; its value is stored into KeyArg.
- IComputationNode *const Key;
// External node holding the key value.
- IComputationExternalNode *const KeyArg;
// Boolean node evaluated after each new item is placed in ItemArg.
- IComputationNode *const Chop;
// External node that receives the inner stream value.
- IComputationExternalNode *const Input;
// Node producing the stream that the outer stream fetches from.
- IComputationNode *const Output;
- };
- }
- IComputationNode* WrapChopper(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 7U, "Expected seven args.");
- const auto type = callable.GetType()->GetReturnType();
- const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
- const auto keyResult = LocateNode(ctx.NodeLocator, callable, 2);
- const auto switchResult = LocateNode(ctx.NodeLocator, callable, 4);
- const auto output = LocateNode(ctx.NodeLocator, callable, 6);
- const auto itemArg = LocateExternalNode(ctx.NodeLocator, callable, 1);
- const auto keyArg = LocateExternalNode(ctx.NodeLocator, callable, 3);
- const auto input = LocateExternalNode(ctx.NodeLocator, callable, 5);
- if (type->IsFlow()) {
- const auto kind = GetValueRepresentation(AS_TYPE(TFlowType, type)->GetItemType());
- return new TChopperFlowWrapper(ctx.Mutables, kind, stream, itemArg, keyResult, keyArg, switchResult, input, output);
- } else if (type->IsStream()) {
- return new TChopperWrapper(ctx.Mutables, stream, itemArg, keyResult, keyArg, switchResult, input, output);
- }
- THROW yexception() << "Expected flow or stream.";
- }
- }
- }
|