#include #include #include #include #include #include using namespace NYql::NPureCalc; namespace { class TStringMessageStreamImpl: public IStream { private: ui32 I_ = 0; NPureCalcProto::TStringMessage Message_{}; public: NPureCalcProto::TStringMessage* Fetch() override { if (I_ >= 3) { return nullptr; } else { Message_.SetX(ToString(I_)); ++I_; return &Message_; } } }; class TStringMessageConsumerImpl: public IConsumer { private: TVector* Buf_; public: TStringMessageConsumerImpl(TVector* buf) : Buf_(buf) { } public: void OnObject(NPureCalcProto::TStringMessage* t) override { Buf_->push_back(t->GetX()); } void OnFinish() override { } }; } Y_UNIT_TEST_SUITE(TestWorkerPool) { static TString sql = "SELECT 'abc'u || X AS X FROM Input"; static TVector expected{"abc0", "abc1", "abc2"}; void TestPullStreamImpl(bool useWorkerPool) { auto factory = MakeProgramFactory(TProgramFactoryOptions().SetUseWorkerPool(useWorkerPool)); auto program = factory->MakePullStreamProgram( TProtobufInputSpec(), TProtobufOutputSpec(), sql, ETranslationMode::SQL ); auto check = [](IStream* output) { TVector actual; while (auto *x = output->Fetch()) { actual.push_back(x->GetX()); } UNIT_ASSERT_VALUES_EQUAL(expected, actual); }; // Sequential use for (size_t i = 0; i < 2; ++i) { auto output = program->Apply(MakeHolder()); check(output.Get()); } // Parallel use { auto output1 = program->Apply(MakeHolder()); auto output2 = program->Apply(MakeHolder()); check(output1.Get()); check(output2.Get()); } } Y_UNIT_TEST(TestPullStreamUseWorkerPool) { TestPullStreamImpl(true); } Y_UNIT_TEST(TestPullStreamNoWorkerPool) { TestPullStreamImpl(false); } void TestPullListImpl(bool useWorkerPool) { auto factory = MakeProgramFactory(TProgramFactoryOptions().SetUseWorkerPool(useWorkerPool)); auto program = factory->MakePullListProgram( TProtobufInputSpec(), TProtobufOutputSpec(), sql, ETranslationMode::SQL ); auto check = [](IStream* output) { TVector actual; while (auto *x = output->Fetch()) { actual.push_back(x->GetX()); } UNIT_ASSERT_VALUES_EQUAL(expected, actual); }; // Sequential use for (size_t i = 0; i < 2; ++i) { auto output = program->Apply(MakeHolder()); check(output.Get()); } // Parallel use { auto output1 = program->Apply(MakeHolder()); auto output2 = program->Apply(MakeHolder()); check(output1.Get()); check(output2.Get()); } } Y_UNIT_TEST(TestPullListUseWorkerPool) { TestPullListImpl(true); } Y_UNIT_TEST(TestPullListNoWorkerPool) { TestPullListImpl(false); } void TestPushStreamImpl(bool useWorkerPool) { auto factory = MakeProgramFactory(TProgramFactoryOptions().SetUseWorkerPool(useWorkerPool)); auto program = factory->MakePushStreamProgram( TProtobufInputSpec(), TProtobufOutputSpec(), sql, ETranslationMode::SQL ); auto check = [](IConsumer* input, const TVector& result) { NPureCalcProto::TStringMessage message; for (auto s: {"0", "1", "2"}) { message.SetX(s); input->OnObject(&message); } input->OnFinish(); UNIT_ASSERT_VALUES_EQUAL(expected, result); }; // Sequential use for (size_t i = 0; i < 2; ++i) { TVector actual; auto input = program->Apply(MakeHolder(&actual)); check(input.Get(), actual); } // Parallel use { TVector actual1; auto input1 = program->Apply(MakeHolder(&actual1)); TVector actual2; auto input2 = program->Apply(MakeHolder(&actual2)); check(input1.Get(), actual1); check(input2.Get(), actual2); } } Y_UNIT_TEST(TestPushStreamUseWorkerPool) { TestPushStreamImpl(true); } Y_UNIT_TEST(TestPushStreamNoWorkerPool) { TestPushStreamImpl(false); } }