mkql_source.cpp 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. #include "mkql_source.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  4. #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
  5. #include <yql/essentials/minikql/mkql_node_cast.h>
  6. namespace NKikimr {
  7. namespace NMiniKQL {
  8. namespace {
  9. class TSourceOfWrapper : public TMutableComputationNode<TSourceOfWrapper> {
  10. typedef TMutableComputationNode<TSourceOfWrapper> TBaseComputation;
  11. private:
  12. class TValue : public TComputationValue<TValue> {
  13. public:
  14. TValue(TMemoryUsageInfo* memInfo)
  15. : TComputationValue<TValue>(memInfo)
  16. {}
  17. private:
  18. ui32 GetTraverseCount() const override { return 0U; }
  19. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  20. result = NUdf::TUnboxedValuePod();
  21. return NUdf::EFetchStatus::Ok;
  22. }
  23. };
  24. public:
  25. TSourceOfWrapper(TComputationMutables& mutables)
  26. : TBaseComputation(mutables)
  27. {}
  28. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  29. return ctx.HolderFactory.Create<TValue>();
  30. }
  31. private:
  32. void RegisterDependencies() const final {}
  33. };
  34. class TSourceWrapper : public TStatelessWideFlowCodegeneratorNode<TSourceWrapper> {
  35. using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TSourceWrapper>;
  36. public:
  37. TSourceWrapper()
  38. : TStatelessWideFlowCodegeneratorNode<TSourceWrapper>(nullptr)
  39. {}
  40. EFetchResult DoCalculate(TComputationContext&, NUdf::TUnboxedValue*const*) const {
  41. return EFetchResult::One;
  42. }
  43. #ifndef MKQL_DISABLE_CODEGEN
  44. TGenerateResult DoGenGetValues(const TCodegenContext& ctx, BasicBlock*&) const {
  45. return {ConstantInt::get(Type::getInt32Ty(ctx.Codegen.GetContext()), static_cast<i32>(EFetchResult::One)), {}};
  46. }
  47. #endif
  48. private:
  49. void RegisterDependencies() const final {}
  50. };
  51. }
  52. IComputationNode* WrapSourceOf(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  53. MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args.");
  54. const auto type = callable.GetType()->GetReturnType();
  55. if (type->IsFlow()) {
  56. return ctx.NodeFactory.CreateImmutableNode(NUdf::TUnboxedValuePod());
  57. } else if (type->IsStream()) {
  58. return new TSourceOfWrapper(ctx.Mutables);
  59. }
  60. THROW yexception() << "Expected flow or stream.";
  61. }
  62. IComputationNode* WrapSource(TCallable& callable, const TComputationNodeFactoryContext&) {
  63. MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args.");
  64. MKQL_ENSURE(!GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())), "Expected zero width of output flow.");
  65. return new TSourceWrapper;
  66. }
  67. }
  68. }