#include #include #include #include #include using namespace NYql::NPureCalc; namespace { class TStatelessInputSpec : public TInputSpecBase { public: TStatelessInputSpec() : Schemas_({NYT::TNode::CreateList() .Add("StructType") .Add(NYT::TNode::CreateList() .Add(NYT::TNode::CreateList() .Add("InputValue") .Add(NYT::TNode::CreateList() .Add("DataType") .Add("Utf8") ) ) ) }) {}; const TVector& GetSchemas() const override { return Schemas_; } private: const TVector Schemas_; }; class TStatelessInputConsumer : public IConsumer { public: TStatelessInputConsumer(TWorkerHolder worker) : Worker_(std::move(worker)) {} void OnObject(const NYql::NUdf::TUnboxedValue& value) override { with_lock (Worker_->GetScopedAlloc()) { NYql::NUdf::TUnboxedValue* items = nullptr; NYql::NUdf::TUnboxedValue result = Worker_->GetGraph().GetHolderFactory().CreateDirectArrayHolder(1, items); items[0] = value; Worker_->Push(std::move(result)); // Clear graph after each object because // values allocated on another allocator and should be released Worker_->Invalidate(); } } void OnFinish() override { with_lock(Worker_->GetScopedAlloc()) { Worker_->OnFinish(); } } private: TWorkerHolder Worker_; }; class TStatelessConsumer : public IConsumer { const TString ExpectedData_; const ui64 ExpectedRows_; ui64 RowId_ = 0; public: TStatelessConsumer(const TString& expectedData, ui64 expectedRows) : ExpectedData_(expectedData) , ExpectedRows_(expectedRows) {} void OnObject(NPureCalcProto::TStringMessage* message) override { UNIT_ASSERT_VALUES_EQUAL_C(ExpectedData_, message->GetX(), RowId_); RowId_++; } void OnFinish() override { UNIT_ASSERT_VALUES_EQUAL(ExpectedRows_, RowId_); } }; } template <> struct TInputSpecTraits { static constexpr bool IsPartial = false; static constexpr bool SupportPushStreamMode = true; using TConsumerType = THolder>; static TConsumerType MakeConsumer(const TStatelessInputSpec&, TWorkerHolder worker) { return MakeHolder(std::move(worker)); } }; Y_UNIT_TEST_SUITE(TestMixedAllocators) { Y_UNIT_TEST(TestPushStream) { const auto targetString = "large string >= 14 bytes"; const auto factory = MakeProgramFactory(); const auto sql = TStringBuilder() << "SELECT InputValue AS X FROM Input WHERE InputValue = \"" << targetString << "\";"; const auto program = factory->MakePushStreamProgram( TStatelessInputSpec(), TProtobufOutputSpec(), sql ); const ui64 numberRows = 5; const auto inputConsumer = program->Apply(MakeHolder(targetString, numberRows)); NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false); const auto pushString = [&](TString inputValue) { NYql::NUdf::TUnboxedValue stringValue; with_lock(alloc) { stringValue = NKikimr::NMiniKQL::MakeString(inputValue); alloc.Ref().LockObject(stringValue); } inputConsumer->OnObject(stringValue); with_lock(alloc) { alloc.Ref().UnlockObject(stringValue); stringValue.Clear(); } }; for (ui64 i = 0; i < numberRows; ++i) { pushString(targetString); pushString("another large string >= 14 bytes"); } inputConsumer->OnFinish(); } }