1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- #include "mkql_source.h"
- #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
- #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
- #include <yql/essentials/minikql/mkql_node_cast.h>
- namespace NKikimr {
- namespace NMiniKQL {
- namespace {
- class TSourceOfWrapper : public TMutableComputationNode<TSourceOfWrapper> {
- typedef TMutableComputationNode<TSourceOfWrapper> TBaseComputation;
- private:
- class TValue : public TComputationValue<TValue> {
- public:
- TValue(TMemoryUsageInfo* memInfo)
- : TComputationValue<TValue>(memInfo)
- {}
- private:
- ui32 GetTraverseCount() const override { return 0U; }
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
- result = NUdf::TUnboxedValuePod();
- return NUdf::EFetchStatus::Ok;
- }
- };
- public:
- TSourceOfWrapper(TComputationMutables& mutables)
- : TBaseComputation(mutables)
- {}
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- return ctx.HolderFactory.Create<TValue>();
- }
- private:
- void RegisterDependencies() const final {}
- };
- class TSourceWrapper : public TStatelessWideFlowCodegeneratorNode<TSourceWrapper> {
- using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TSourceWrapper>;
- public:
- TSourceWrapper()
- : TStatelessWideFlowCodegeneratorNode<TSourceWrapper>(nullptr)
- {}
- EFetchResult DoCalculate(TComputationContext&, NUdf::TUnboxedValue*const*) const {
- return EFetchResult::One;
- }
- #ifndef MKQL_DISABLE_CODEGEN
- TGenerateResult DoGenGetValues(const TCodegenContext& ctx, BasicBlock*&) const {
- return {ConstantInt::get(Type::getInt32Ty(ctx.Codegen.GetContext()), static_cast<i32>(EFetchResult::One)), {}};
- }
- #endif
- private:
- void RegisterDependencies() const final {}
- };
- }
- IComputationNode* WrapSourceOf(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args.");
- const auto type = callable.GetType()->GetReturnType();
- if (type->IsFlow()) {
- return ctx.NodeFactory.CreateImmutableNode(NUdf::TUnboxedValuePod());
- } else if (type->IsStream()) {
- return new TSourceOfWrapper(ctx.Mutables);
- }
- THROW yexception() << "Expected flow or stream.";
- }
- IComputationNode* WrapSource(TCallable& callable, const TComputationNodeFactoryContext&) {
- MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args.");
- MKQL_ENSURE(!GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())), "Expected zero width of output flow.");
- return new TSourceWrapper;
- }
- }
- }
|