123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- #include <library/cpp/testing/unittest/registar.h>
- #include <yql/essentials/public/purecalc/purecalc.h>
- #include <yql/essentials/public/purecalc/io_specs/protobuf/spec.h>
- #include <yql/essentials/public/purecalc/ut/protos/test_structs.pb.h>
- #include <yql/essentials/public/udf/udf_counter.h>
- #include <yql/essentials/public/udf/udf_type_builder.h>
- #include <library/cpp/testing/unittest/registar.h>
- class TMyModule : public NKikimr::NUdf::IUdfModule {
- public:
- class TFunc : public NKikimr::NUdf::TBoxedValue {
- public:
- TFunc(NKikimr::NUdf::TCounter counter, NKikimr::NUdf::TScopedProbe scopedProbe)
- : Counter_(counter)
- , ScopedProbe_(scopedProbe)
- {}
- NKikimr::NUdf::TUnboxedValue Run(const NKikimr::NUdf::IValueBuilder* valueBuilder, const NKikimr::NUdf::TUnboxedValuePod* args) const override {
- Y_UNUSED(valueBuilder);
- with_lock(ScopedProbe_) {
- Counter_.Inc();
- return NKikimr::NUdf::TUnboxedValuePod(args[0].Get<i32>());
- }
- }
- private:
- mutable NKikimr::NUdf::TCounter Counter_;
- mutable NKikimr::NUdf::TScopedProbe ScopedProbe_;
- };
- void GetAllFunctions(NKikimr::NUdf::IFunctionsSink& sink) const override {
- Y_UNUSED(sink);
- }
- void BuildFunctionTypeInfo(
- const NKikimr::NUdf::TStringRef& name,
- NKikimr::NUdf::TType* userType,
- const NKikimr::NUdf::TStringRef& typeConfig,
- ui32 flags,
- NKikimr::NUdf::IFunctionTypeInfoBuilder& builder) const override {
- Y_UNUSED(userType);
- Y_UNUSED(typeConfig);
- Y_UNUSED(flags);
- if (name == NKikimr::NUdf::TStringRef::Of("Func")) {
- builder.SimpleSignature<i32(i32)>();
- builder.Implementation(new TFunc(
- builder.GetCounter("FuncCalls",true),
- builder.GetScopedProbe("FuncTime")
- ));
- }
- }
- void CleanupOnTerminate() const override {
- }
- };
- class TMyCountersProvider : public NKikimr::NUdf::ICountersProvider, public NKikimr::NUdf::IScopedProbeHost {
- public:
- TMyCountersProvider(i64* calls, TString* log)
- : Calls_(calls)
- , Log_(log)
- {}
- NKikimr::NUdf::TCounter GetCounter(const NKikimr::NUdf::TStringRef& module, const NKikimr::NUdf::TStringRef& name, bool deriv) override {
- UNIT_ASSERT_VALUES_EQUAL(module, "MyModule");
- UNIT_ASSERT_VALUES_EQUAL(name, "FuncCalls");
- UNIT_ASSERT_VALUES_EQUAL(deriv, true);
- return NKikimr::NUdf::TCounter(Calls_);
- }
- NKikimr::NUdf::TScopedProbe GetScopedProbe(const NKikimr::NUdf::TStringRef& module, const NKikimr::NUdf::TStringRef& name) override {
- UNIT_ASSERT_VALUES_EQUAL(module, "MyModule");
- UNIT_ASSERT_VALUES_EQUAL(name, "FuncTime");
- return NKikimr::NUdf::TScopedProbe(Log_ ? this : nullptr, Log_);
- }
- void Acquire(void* cookie) override {
- UNIT_ASSERT(cookie == Log_);
- *Log_ += "Enter\n";
- }
- void Release(void* cookie) override {
- UNIT_ASSERT(cookie == Log_);
- *Log_ += "Exit\n";
- }
- private:
- i64* Calls_;
- TString* Log_;
- };
// Forward declarations of the protobuf messages that the tests in this file
// use as program input (TUnparsed) and program output (TParsed).
namespace NPureCalcProto {
    class TParsed;
    class TUnparsed;
}
- class TDocInput : public NYql::NPureCalc::IStream<NPureCalcProto::TUnparsed*> {
- public:
- NPureCalcProto::TUnparsed* Fetch() override {
- if (Extracted) {
- return nullptr;
- }
- Extracted = true;
- Msg.SetS("foo");
- return &Msg;
- }
- public:
- NPureCalcProto::TUnparsed Msg;
- bool Extracted = false;
- };
- Y_UNIT_TEST_SUITE(TestUdf) {
- Y_UNIT_TEST(TestCounters) {
- using namespace NYql::NPureCalc;
- auto factory = MakeProgramFactory();
- i64 callCounter = 0;
- TMyCountersProvider myCountersProvider(&callCounter, nullptr);
- factory->AddUdfModule("MyModule", new TMyModule);
- factory->SetCountersProvider(&myCountersProvider);
- auto program = factory->MakePullStreamProgram(
- TProtobufInputSpec<NPureCalcProto::TUnparsed>(),
- TProtobufOutputSpec<NPureCalcProto::TParsed>(),
- "select MyModule::Func(1) as A, 2 as B, 3 as C from Input",
- ETranslationMode::SQL);
- auto out = program->Apply(MakeHolder<TDocInput>());
- auto* message = out->Fetch();
- UNIT_ASSERT(message);
- UNIT_ASSERT_VALUES_EQUAL(message->GetA(), 1);
- UNIT_ASSERT_VALUES_EQUAL(message->GetB(), 2);
- UNIT_ASSERT_VALUES_EQUAL(message->GetC(), 3);
- UNIT_ASSERT_VALUES_EQUAL(callCounter, 1);
- UNIT_ASSERT(!out->Fetch());
- }
- Y_UNIT_TEST(TestCountersFilteredColumns) {
- using namespace NYql::NPureCalc;
- auto factory = MakeProgramFactory();
- i64 callCounter = 0;
- TMyCountersProvider myCountersProvider(&callCounter, nullptr);
- factory->AddUdfModule("MyModule", new TMyModule);
- factory->SetCountersProvider(&myCountersProvider);
- auto ospec = TProtobufOutputSpec<NPureCalcProto::TParsed>();
- ospec.SetOutputColumnsFilter(THashSet<TString>({"B", "C"}));
- auto program = factory->MakePullStreamProgram(
- TProtobufInputSpec<NPureCalcProto::TUnparsed>(),
- ospec,
- "select MyModule::Func(1) as A, 2 as B, 3 as C from Input",
- ETranslationMode::SQL);
- auto out = program->Apply(MakeHolder<TDocInput>());
- auto* message = out->Fetch();
- UNIT_ASSERT(message);
- UNIT_ASSERT_VALUES_EQUAL(message->GetA(), 0);
- UNIT_ASSERT_VALUES_EQUAL(message->GetB(), 2);
- UNIT_ASSERT_VALUES_EQUAL(message->GetC(), 3);
- UNIT_ASSERT_VALUES_EQUAL(callCounter, 0);
- UNIT_ASSERT(!out->Fetch());
- }
- Y_UNIT_TEST(TestScopedProbes) {
- using namespace NYql::NPureCalc;
- auto factory = MakeProgramFactory();
- TString log;
- TMyCountersProvider myCountersProvider(nullptr, &log);
- factory->AddUdfModule("MyModule", new TMyModule);
- factory->SetCountersProvider(&myCountersProvider);
- auto program = factory->MakePullStreamProgram(
- TProtobufInputSpec<NPureCalcProto::TUnparsed>(),
- TProtobufOutputSpec<NPureCalcProto::TParsed>(),
- "select MyModule::Func(1) as A, 2 as B, 3 as C from Input",
- ETranslationMode::SQL);
- auto out = program->Apply(MakeHolder<TDocInput>());
- auto* message = out->Fetch();
- UNIT_ASSERT(message);
- UNIT_ASSERT_VALUES_EQUAL(message->GetA(), 1);
- UNIT_ASSERT_VALUES_EQUAL(message->GetB(), 2);
- UNIT_ASSERT_VALUES_EQUAL(message->GetC(), 3);
- UNIT_ASSERT_VALUES_EQUAL(log, "Enter\nExit\n");
- UNIT_ASSERT(!out->Fetch());
- }
- }
|