test_udf.cpp 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include <yql/essentials/public/purecalc/purecalc.h>
  3. #include <yql/essentials/public/purecalc/io_specs/protobuf/spec.h>
  4. #include <yql/essentials/public/purecalc/ut/protos/test_structs.pb.h>
  5. #include <yql/essentials/public/udf/udf_counter.h>
  6. #include <yql/essentials/public/udf/udf_type_builder.h>
  7. #include <library/cpp/testing/unittest/registar.h>
  8. class TMyModule : public NKikimr::NUdf::IUdfModule {
  9. public:
  10. class TFunc : public NKikimr::NUdf::TBoxedValue {
  11. public:
  12. TFunc(NKikimr::NUdf::TCounter counter, NKikimr::NUdf::TScopedProbe scopedProbe)
  13. : Counter_(counter)
  14. , ScopedProbe_(scopedProbe)
  15. {}
  16. NKikimr::NUdf::TUnboxedValue Run(const NKikimr::NUdf::IValueBuilder* valueBuilder, const NKikimr::NUdf::TUnboxedValuePod* args) const override {
  17. Y_UNUSED(valueBuilder);
  18. with_lock(ScopedProbe_) {
  19. Counter_.Inc();
  20. return NKikimr::NUdf::TUnboxedValuePod(args[0].Get<i32>());
  21. }
  22. }
  23. private:
  24. mutable NKikimr::NUdf::TCounter Counter_;
  25. mutable NKikimr::NUdf::TScopedProbe ScopedProbe_;
  26. };
  27. void GetAllFunctions(NKikimr::NUdf::IFunctionsSink& sink) const override {
  28. Y_UNUSED(sink);
  29. }
  30. void BuildFunctionTypeInfo(
  31. const NKikimr::NUdf::TStringRef& name,
  32. NKikimr::NUdf::TType* userType,
  33. const NKikimr::NUdf::TStringRef& typeConfig,
  34. ui32 flags,
  35. NKikimr::NUdf::IFunctionTypeInfoBuilder& builder) const override {
  36. Y_UNUSED(userType);
  37. Y_UNUSED(typeConfig);
  38. Y_UNUSED(flags);
  39. if (name == NKikimr::NUdf::TStringRef::Of("Func")) {
  40. builder.SimpleSignature<i32(i32)>();
  41. builder.Implementation(new TFunc(
  42. builder.GetCounter("FuncCalls",true),
  43. builder.GetScopedProbe("FuncTime")
  44. ));
  45. }
  46. }
  47. void CleanupOnTerminate() const override {
  48. }
  49. };
  50. class TMyCountersProvider : public NKikimr::NUdf::ICountersProvider, public NKikimr::NUdf::IScopedProbeHost {
  51. public:
  52. TMyCountersProvider(i64* calls, TString* log)
  53. : Calls_(calls)
  54. , Log_(log)
  55. {}
  56. NKikimr::NUdf::TCounter GetCounter(const NKikimr::NUdf::TStringRef& module, const NKikimr::NUdf::TStringRef& name, bool deriv) override {
  57. UNIT_ASSERT_VALUES_EQUAL(module, "MyModule");
  58. UNIT_ASSERT_VALUES_EQUAL(name, "FuncCalls");
  59. UNIT_ASSERT_VALUES_EQUAL(deriv, true);
  60. return NKikimr::NUdf::TCounter(Calls_);
  61. }
  62. NKikimr::NUdf::TScopedProbe GetScopedProbe(const NKikimr::NUdf::TStringRef& module, const NKikimr::NUdf::TStringRef& name) override {
  63. UNIT_ASSERT_VALUES_EQUAL(module, "MyModule");
  64. UNIT_ASSERT_VALUES_EQUAL(name, "FuncTime");
  65. return NKikimr::NUdf::TScopedProbe(Log_ ? this : nullptr, Log_);
  66. }
  67. void Acquire(void* cookie) override {
  68. UNIT_ASSERT(cookie == Log_);
  69. *Log_ += "Enter\n";
  70. }
  71. void Release(void* cookie) override {
  72. UNIT_ASSERT(cookie == Log_);
  73. *Log_ += "Exit\n";
  74. }
  75. private:
  76. i64* Calls_;
  77. TString* Log_;
  78. };
  79. namespace NPureCalcProto {
  80. class TUnparsed;
  81. class TParsed;
  82. }
  83. class TDocInput : public NYql::NPureCalc::IStream<NPureCalcProto::TUnparsed*> {
  84. public:
  85. NPureCalcProto::TUnparsed* Fetch() override {
  86. if (Extracted) {
  87. return nullptr;
  88. }
  89. Extracted = true;
  90. Msg.SetS("foo");
  91. return &Msg;
  92. }
  93. public:
  94. NPureCalcProto::TUnparsed Msg;
  95. bool Extracted = false;
  96. };
  97. Y_UNIT_TEST_SUITE(TestUdf) {
  98. Y_UNIT_TEST(TestCounters) {
  99. using namespace NYql::NPureCalc;
  100. auto factory = MakeProgramFactory();
  101. i64 callCounter = 0;
  102. TMyCountersProvider myCountersProvider(&callCounter, nullptr);
  103. factory->AddUdfModule("MyModule", new TMyModule);
  104. factory->SetCountersProvider(&myCountersProvider);
  105. auto program = factory->MakePullStreamProgram(
  106. TProtobufInputSpec<NPureCalcProto::TUnparsed>(),
  107. TProtobufOutputSpec<NPureCalcProto::TParsed>(),
  108. "select MyModule::Func(1) as A, 2 as B, 3 as C from Input",
  109. ETranslationMode::SQL);
  110. auto out = program->Apply(MakeHolder<TDocInput>());
  111. auto* message = out->Fetch();
  112. UNIT_ASSERT(message);
  113. UNIT_ASSERT_VALUES_EQUAL(message->GetA(), 1);
  114. UNIT_ASSERT_VALUES_EQUAL(message->GetB(), 2);
  115. UNIT_ASSERT_VALUES_EQUAL(message->GetC(), 3);
  116. UNIT_ASSERT_VALUES_EQUAL(callCounter, 1);
  117. UNIT_ASSERT(!out->Fetch());
  118. }
  119. Y_UNIT_TEST(TestCountersFilteredColumns) {
  120. using namespace NYql::NPureCalc;
  121. auto factory = MakeProgramFactory();
  122. i64 callCounter = 0;
  123. TMyCountersProvider myCountersProvider(&callCounter, nullptr);
  124. factory->AddUdfModule("MyModule", new TMyModule);
  125. factory->SetCountersProvider(&myCountersProvider);
  126. auto ospec = TProtobufOutputSpec<NPureCalcProto::TParsed>();
  127. ospec.SetOutputColumnsFilter(THashSet<TString>({"B", "C"}));
  128. auto program = factory->MakePullStreamProgram(
  129. TProtobufInputSpec<NPureCalcProto::TUnparsed>(),
  130. ospec,
  131. "select MyModule::Func(1) as A, 2 as B, 3 as C from Input",
  132. ETranslationMode::SQL);
  133. auto out = program->Apply(MakeHolder<TDocInput>());
  134. auto* message = out->Fetch();
  135. UNIT_ASSERT(message);
  136. UNIT_ASSERT_VALUES_EQUAL(message->GetA(), 0);
  137. UNIT_ASSERT_VALUES_EQUAL(message->GetB(), 2);
  138. UNIT_ASSERT_VALUES_EQUAL(message->GetC(), 3);
  139. UNIT_ASSERT_VALUES_EQUAL(callCounter, 0);
  140. UNIT_ASSERT(!out->Fetch());
  141. }
  142. Y_UNIT_TEST(TestScopedProbes) {
  143. using namespace NYql::NPureCalc;
  144. auto factory = MakeProgramFactory();
  145. TString log;
  146. TMyCountersProvider myCountersProvider(nullptr, &log);
  147. factory->AddUdfModule("MyModule", new TMyModule);
  148. factory->SetCountersProvider(&myCountersProvider);
  149. auto program = factory->MakePullStreamProgram(
  150. TProtobufInputSpec<NPureCalcProto::TUnparsed>(),
  151. TProtobufOutputSpec<NPureCalcProto::TParsed>(),
  152. "select MyModule::Func(1) as A, 2 as B, 3 as C from Input",
  153. ETranslationMode::SQL);
  154. auto out = program->Apply(MakeHolder<TDocInput>());
  155. auto* message = out->Fetch();
  156. UNIT_ASSERT(message);
  157. UNIT_ASSERT_VALUES_EQUAL(message->GetA(), 1);
  158. UNIT_ASSERT_VALUES_EQUAL(message->GetB(), 2);
  159. UNIT_ASSERT_VALUES_EQUAL(message->GetC(), 3);
  160. UNIT_ASSERT_VALUES_EQUAL(log, "Enter\nExit\n");
  161. UNIT_ASSERT(!out->Fetch());
  162. }
  163. }