test_mixed_allocators.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <yql/essentials/minikql/mkql_string_util.h>
  4. #include <yql/essentials/public/purecalc/io_specs/protobuf/spec.h>
  5. #include <yql/essentials/public/purecalc/ut/protos/test_structs.pb.h>
  6. using namespace NYql::NPureCalc;
  7. namespace {
  8. class TStatelessInputSpec : public TInputSpecBase {
  9. public:
  10. TStatelessInputSpec()
  11. : Schemas_({NYT::TNode::CreateList()
  12. .Add("StructType")
  13. .Add(NYT::TNode::CreateList()
  14. .Add(NYT::TNode::CreateList()
  15. .Add("InputValue")
  16. .Add(NYT::TNode::CreateList()
  17. .Add("DataType")
  18. .Add("Utf8")
  19. )
  20. )
  21. )
  22. })
  23. {};
  24. const TVector<NYT::TNode>& GetSchemas() const override {
  25. return Schemas_;
  26. }
  27. private:
  28. const TVector<NYT::TNode> Schemas_;
  29. };
  30. class TStatelessInputConsumer : public IConsumer<const NYql::NUdf::TUnboxedValue&> {
  31. public:
  32. TStatelessInputConsumer(TWorkerHolder<IPushStreamWorker> worker)
  33. : Worker_(std::move(worker))
  34. {}
  35. void OnObject(const NYql::NUdf::TUnboxedValue& value) override {
  36. with_lock (Worker_->GetScopedAlloc()) {
  37. NYql::NUdf::TUnboxedValue* items = nullptr;
  38. NYql::NUdf::TUnboxedValue result = Worker_->GetGraph().GetHolderFactory().CreateDirectArrayHolder(1, items);
  39. items[0] = value;
  40. Worker_->Push(std::move(result));
  41. // Clear graph after each object because
  42. // values allocated on another allocator and should be released
  43. Worker_->Invalidate();
  44. }
  45. }
  46. void OnFinish() override {
  47. with_lock(Worker_->GetScopedAlloc()) {
  48. Worker_->OnFinish();
  49. }
  50. }
  51. private:
  52. TWorkerHolder<IPushStreamWorker> Worker_;
  53. };
  54. class TStatelessConsumer : public IConsumer<NPureCalcProto::TStringMessage*> {
  55. const TString ExpectedData_;
  56. const ui64 ExpectedRows_;
  57. ui64 RowId_ = 0;
  58. public:
  59. TStatelessConsumer(const TString& expectedData, ui64 expectedRows)
  60. : ExpectedData_(expectedData)
  61. , ExpectedRows_(expectedRows)
  62. {}
  63. void OnObject(NPureCalcProto::TStringMessage* message) override {
  64. UNIT_ASSERT_VALUES_EQUAL_C(ExpectedData_, message->GetX(), RowId_);
  65. RowId_++;
  66. }
  67. void OnFinish() override {
  68. UNIT_ASSERT_VALUES_EQUAL(ExpectedRows_, RowId_);
  69. }
  70. };
  71. }
  72. template <>
  73. struct TInputSpecTraits<TStatelessInputSpec> {
  74. static constexpr bool IsPartial = false;
  75. static constexpr bool SupportPushStreamMode = true;
  76. using TConsumerType = THolder<IConsumer<const NYql::NUdf::TUnboxedValue&>>;
  77. static TConsumerType MakeConsumer(const TStatelessInputSpec&, TWorkerHolder<IPushStreamWorker> worker) {
  78. return MakeHolder<TStatelessInputConsumer>(std::move(worker));
  79. }
  80. };
  81. Y_UNIT_TEST_SUITE(TestMixedAllocators) {
  82. Y_UNIT_TEST(TestPushStream) {
  83. const auto targetString = "large string >= 14 bytes";
  84. const auto factory = MakeProgramFactory();
  85. const auto sql = TStringBuilder() << "SELECT InputValue AS X FROM Input WHERE InputValue = \"" << targetString << "\";";
  86. const auto program = factory->MakePushStreamProgram(
  87. TStatelessInputSpec(),
  88. TProtobufOutputSpec<NPureCalcProto::TStringMessage>(),
  89. sql
  90. );
  91. const ui64 numberRows = 5;
  92. const auto inputConsumer = program->Apply(MakeHolder<TStatelessConsumer>(targetString, numberRows));
  93. NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false);
  94. const auto pushString = [&](TString inputValue) {
  95. NYql::NUdf::TUnboxedValue stringValue;
  96. with_lock(alloc) {
  97. stringValue = NKikimr::NMiniKQL::MakeString(inputValue);
  98. alloc.Ref().LockObject(stringValue);
  99. }
  100. inputConsumer->OnObject(stringValue);
  101. with_lock(alloc) {
  102. alloc.Ref().UnlockObject(stringValue);
  103. stringValue.Clear();
  104. }
  105. };
  106. for (ui64 i = 0; i < numberRows; ++i) {
  107. pushString(targetString);
  108. pushString("another large string >= 14 bytes");
  109. }
  110. inputConsumer->OnFinish();
  111. }
  112. }