mkql_iterable.cpp 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. #include "mkql_replicate.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_program_builder.h>
  6. namespace NKikimr {
  7. namespace NMiniKQL {
  8. namespace {
  9. class TIterableWrapper : public TMutableComputationNode<TIterableWrapper> {
  10. typedef TMutableComputationNode<TIterableWrapper> TBaseComputation;
  11. public:
  12. class TValue : public TCustomListValue {
  13. public:
  14. class TIterator : public TComputationValue<TIterator> {
  15. public:
  16. TIterator(TMemoryUsageInfo* memInfo, const NUdf::TUnboxedValue& stream)
  17. : TComputationValue<TIterator>(memInfo)
  18. , Stream(stream)
  19. {}
  20. private:
  21. bool Next(NUdf::TUnboxedValue& value) override {
  22. auto status = Stream.Fetch(value);
  23. MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Yield is not supported");
  24. return status != NUdf::EFetchStatus::Finish;
  25. }
  26. bool Skip() override {
  27. NUdf::TUnboxedValue value;
  28. auto status = Stream.Fetch(value);
  29. MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Yield is not supported");
  30. return status != NUdf::EFetchStatus::Finish;
  31. }
  32. NUdf::TUnboxedValue Stream;
  33. };
  34. TValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx, IComputationNode* stream, IComputationExternalNode* arg)
  35. : TCustomListValue(memInfo)
  36. , Ctx(ctx)
  37. , Stream(stream)
  38. , Arg(arg)
  39. {
  40. }
  41. private:
  42. NUdf::TUnboxedValue GetListIterator() const override {
  43. auto stream = NewStream();
  44. return Ctx.HolderFactory.Create<TIterator>(stream);
  45. }
  46. bool HasFastListLength() const override {
  47. return Length.Defined();
  48. }
  49. ui64 GetListLength() const override {
  50. if (!Length) {
  51. auto stream = NewStream();
  52. NUdf::TUnboxedValue item;
  53. ui64 n = 0;
  54. for (;;) {
  55. auto status = stream.Fetch(item);
  56. MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Yield is not supported");
  57. if (status == NUdf::EFetchStatus::Finish) {
  58. break;
  59. }
  60. ++n;
  61. }
  62. Length = n;
  63. }
  64. return *Length;
  65. }
  66. ui64 GetEstimatedListLength() const override {
  67. return GetListLength();
  68. }
  69. bool HasListItems() const override {
  70. if (!HasItems) {
  71. if (Length) {
  72. HasItems = *Length > 0;
  73. } else {
  74. auto stream = NewStream();
  75. NUdf::TUnboxedValue item;
  76. auto status = stream.Fetch(item);
  77. MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Yield is not supported");
  78. HasItems = (status != NUdf::EFetchStatus::Finish);
  79. }
  80. }
  81. return *HasItems;
  82. }
  83. NUdf::TUnboxedValue NewStream() const {
  84. Arg->SetValue(Ctx, NUdf::TUnboxedValue());
  85. return Stream->GetValue(Ctx);
  86. }
  87. TComputationContext& Ctx;
  88. IComputationNode* const Stream;
  89. IComputationExternalNode* const Arg;
  90. mutable TMaybe<ui64> Length;
  91. mutable TMaybe<bool> HasItems;
  92. };
  93. TIterableWrapper(TComputationMutables& mutables, IComputationNode* stream, IComputationExternalNode* arg)
  94. : TBaseComputation(mutables)
  95. , Stream(stream)
  96. , Arg(arg)
  97. {
  98. }
  99. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  100. return ctx.HolderFactory.Create<TValue>(ctx, Stream, Arg);
  101. }
  102. private:
  103. void RegisterDependencies() const final {
  104. DependsOn(Stream);
  105. Arg->AddDependence(Stream);
  106. }
  107. IComputationNode* const Stream;
  108. IComputationExternalNode* const Arg;
  109. };
  110. }
  111. IComputationNode* WrapIterable(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  112. MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args");
  113. const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
  114. const auto arg = LocateExternalNode(ctx.NodeLocator, callable, 1);
  115. return new TIterableWrapper(ctx.Mutables, stream, arg);
  116. }
  117. }
  118. }