test_spec.cpp 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996
  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include <yql/essentials/public/purecalc/common/interface.h>
  3. #include <yql/essentials/public/purecalc/io_specs/protobuf/spec.h>
  4. #include <yql/essentials/public/purecalc/ut/protos/test_structs.pb.h>
  5. #include <library/cpp/protobuf/util/pb_io.h>
  6. #include <util/generic/xrange.h>
  7. namespace {
  8. TMaybe<NPureCalcProto::TAllTypes> allTypesMessage;
  9. NPureCalcProto::TAllTypes& GetCanonicalMessage() {
  10. if (!allTypesMessage) {
  11. allTypesMessage = NPureCalcProto::TAllTypes();
  12. allTypesMessage->SetFDouble(1);
  13. allTypesMessage->SetFFloat(2);
  14. allTypesMessage->SetFInt64(3);
  15. allTypesMessage->SetFSfixed64(4);
  16. allTypesMessage->SetFSint64(5);
  17. allTypesMessage->SetFUint64(6);
  18. allTypesMessage->SetFFixed64(7);
  19. allTypesMessage->SetFInt32(8);
  20. allTypesMessage->SetFSfixed32(9);
  21. allTypesMessage->SetFSint32(10);
  22. allTypesMessage->SetFUint32(11);
  23. allTypesMessage->SetFFixed32(12);
  24. allTypesMessage->SetFBool(true);
  25. allTypesMessage->SetFString("asd");
  26. allTypesMessage->SetFBytes("dsa");
  27. }
  28. return allTypesMessage.GetRef();
  29. }
  30. template <typename T1, typename T2>
  31. void AssertEqualToCanonical(const T1& got, const T2& expected) {
  32. UNIT_ASSERT_EQUAL(expected.GetFDouble(), got.GetFDouble());
  33. UNIT_ASSERT_EQUAL(expected.GetFFloat(), got.GetFFloat());
  34. UNIT_ASSERT_EQUAL(expected.GetFInt64(), got.GetFInt64());
  35. UNIT_ASSERT_EQUAL(expected.GetFSfixed64(), got.GetFSfixed64());
  36. UNIT_ASSERT_EQUAL(expected.GetFSint64(), got.GetFSint64());
  37. UNIT_ASSERT_EQUAL(expected.GetFUint64(), got.GetFUint64());
  38. UNIT_ASSERT_EQUAL(expected.GetFFixed64(), got.GetFFixed64());
  39. UNIT_ASSERT_EQUAL(expected.GetFInt32(), got.GetFInt32());
  40. UNIT_ASSERT_EQUAL(expected.GetFSfixed32(), got.GetFSfixed32());
  41. UNIT_ASSERT_EQUAL(expected.GetFSint32(), got.GetFSint32());
  42. UNIT_ASSERT_EQUAL(expected.GetFUint32(), got.GetFUint32());
  43. UNIT_ASSERT_EQUAL(expected.GetFFixed32(), got.GetFFixed32());
  44. UNIT_ASSERT_EQUAL(expected.GetFBool(), got.GetFBool());
  45. UNIT_ASSERT_EQUAL(expected.GetFString(), got.GetFString());
  46. UNIT_ASSERT_EQUAL(expected.GetFBytes(), got.GetFBytes());
  47. }
  48. template <typename T>
  49. void AssertEqualToCanonical(const T& got) {
  50. AssertEqualToCanonical(got, GetCanonicalMessage());
  51. }
  52. TString SerializeToTextFormatAsString(const google::protobuf::Message& message) {
  53. TString result;
  54. {
  55. TStringOutput output(result);
  56. SerializeToTextFormat(message, output);
  57. }
  58. return result;
  59. }
  60. template <typename T>
  61. void AssertProtoEqual(const T& actual, const T& expected) {
  62. UNIT_ASSERT_VALUES_EQUAL(SerializeToTextFormatAsString(actual), SerializeToTextFormatAsString(expected));
  63. }
  64. }
  65. class TAllTypesStreamImpl: public NYql::NPureCalc::IStream<NPureCalcProto::TAllTypes*> {
  66. private:
  67. int I_ = 0;
  68. NPureCalcProto::TAllTypes Message_ = GetCanonicalMessage();
  69. public:
  70. NPureCalcProto::TAllTypes* Fetch() override {
  71. if (I_ > 0) {
  72. return nullptr;
  73. } else {
  74. I_ += 1;
  75. return &Message_;
  76. }
  77. }
  78. };
  79. class TSimpleMessageStreamImpl: public NYql::NPureCalc::IStream<NPureCalcProto::TSimpleMessage*> {
  80. public:
  81. TSimpleMessageStreamImpl(i32 value)
  82. {
  83. Message_.SetX(value);
  84. }
  85. NPureCalcProto::TSimpleMessage* Fetch() override {
  86. if (Exhausted_) {
  87. return nullptr;
  88. } else {
  89. Exhausted_ = true;
  90. return &Message_;
  91. }
  92. }
  93. private:
  94. NPureCalcProto::TSimpleMessage Message_;
  95. bool Exhausted_ = false;
  96. };
  97. class TAllTypesConsumerImpl: public NYql::NPureCalc::IConsumer<NPureCalcProto::TAllTypes*> {
  98. private:
  99. int I_ = 0;
  100. public:
  101. void OnObject(NPureCalcProto::TAllTypes* t) override {
  102. I_ += 1;
  103. AssertEqualToCanonical(*t);
  104. }
  105. void OnFinish() override {
  106. UNIT_ASSERT(I_ > 0);
  107. }
  108. };
  109. class TStringMessageStreamImpl: public NYql::NPureCalc::IStream<NPureCalcProto::TStringMessage*> {
  110. private:
  111. int I_ = 0;
  112. NPureCalcProto::TStringMessage Message_{};
  113. public:
  114. NPureCalcProto::TStringMessage* Fetch() override {
  115. if (I_ >= 3) {
  116. return nullptr;
  117. } else {
  118. Message_.SetX(TString("-") * I_);
  119. I_ += 1;
  120. return &Message_;
  121. }
  122. }
  123. };
  124. class TSimpleMessageConsumerImpl: public NYql::NPureCalc::IConsumer<NPureCalcProto::TSimpleMessage*> {
  125. private:
  126. TVector<int>* Buf_;
  127. public:
  128. TSimpleMessageConsumerImpl(TVector<int>* buf)
  129. : Buf_(buf)
  130. {
  131. }
  132. public:
  133. void OnObject(NPureCalcProto::TSimpleMessage* t) override {
  134. Buf_->push_back(t->GetX());
  135. }
  136. void OnFinish() override {
  137. Buf_->push_back(-100);
  138. }
  139. };
  140. using TMessagesVariant = std::variant<NPureCalcProto::TSplitted1*, NPureCalcProto::TSplitted2*, NPureCalcProto::TStringMessage*>;
  141. class TVariantConsumerImpl: public NYql::NPureCalc::IConsumer<TMessagesVariant> {
  142. public:
  143. using TType0 = TVector<std::pair<i32, TString>>;
  144. using TType1 = TVector<std::pair<ui32, TString>>;
  145. using TType2 = TVector<TString>;
  146. public:
  147. TVariantConsumerImpl(TType0* q0, TType1* q1, TType2* q2, int* v)
  148. : Queue0_(q0)
  149. , Queue1_(q1)
  150. , Queue2_(q2)
  151. , Value_(v)
  152. {
  153. }
  154. void OnObject(TMessagesVariant value) override {
  155. if (auto* p = std::get_if<0>(&value)) {
  156. Queue0_->push_back({(*p)->GetBInt(), std::move(*(*p)->MutableBString())});
  157. } else if (auto* p = std::get_if<1>(&value)) {
  158. Queue1_->push_back({(*p)->GetCUint(), std::move(*(*p)->MutableCString())});
  159. } else if (auto* p = std::get_if<2>(&value)) {
  160. Queue2_->push_back(std::move(*(*p)->MutableX()));
  161. } else {
  162. Y_ABORT("invalid variant alternative");
  163. }
  164. }
  165. void OnFinish() override {
  166. *Value_ = 42;
  167. }
  168. private:
  169. TType0* Queue0_;
  170. TType1* Queue1_;
  171. TType2* Queue2_;
  172. int* Value_;
  173. };
  174. class TUnsplittedStreamImpl: public NYql::NPureCalc::IStream<NPureCalcProto::TUnsplitted*> {
  175. public:
  176. TUnsplittedStreamImpl()
  177. {
  178. Message_.SetAInt(-23);
  179. Message_.SetAUint(111);
  180. Message_.SetAString("Hello!");
  181. }
  182. public:
  183. NPureCalcProto::TUnsplitted* Fetch() override {
  184. switch (I_) {
  185. case 0:
  186. ++I_;
  187. return &Message_;
  188. case 1:
  189. ++I_;
  190. Message_.SetABool(false);
  191. return &Message_;
  192. case 2:
  193. ++I_;
  194. Message_.SetABool(true);
  195. return &Message_;
  196. default:
  197. return nullptr;
  198. }
  199. }
  200. private:
  201. NPureCalcProto::TUnsplitted Message_;
  202. ui32 I_ = 0;
  203. };
  204. template<typename T>
  205. struct TVectorConsumer: public NYql::NPureCalc::IConsumer<T*> {
  206. TVector<T> Data;
  207. void OnObject(T* t) override {
  208. Data.push_back(*t);
  209. }
  210. void OnFinish() override {
  211. }
  212. };
  213. template <typename T>
  214. struct TVectorStream: public NYql::NPureCalc::IStream<T*> {
  215. TVector<T> Data;
  216. size_t Index = 0;
  217. public:
  218. T* Fetch() override {
  219. return Index < Data.size() ? &Data[Index++] : nullptr;
  220. }
  221. };
  222. Y_UNIT_TEST_SUITE(TestProtoIO) {
  223. Y_UNIT_TEST(TestAllTypes) {
  224. using namespace NYql::NPureCalc;
  225. auto factory = MakeProgramFactory();
  226. {
  227. auto program = factory->MakePullStreamProgram(
  228. TProtobufInputSpec<NPureCalcProto::TAllTypes>(),
  229. TProtobufOutputSpec<NPureCalcProto::TAllTypes>(),
  230. "SELECT * FROM Input",
  231. ETranslationMode::SQL
  232. );
  233. auto stream = program->Apply(MakeHolder<TAllTypesStreamImpl>());
  234. NPureCalcProto::TAllTypes* message;
  235. UNIT_ASSERT(message = stream->Fetch());
  236. AssertEqualToCanonical(*message);
  237. UNIT_ASSERT(!stream->Fetch());
  238. }
  239. {
  240. auto program = factory->MakePullListProgram(
  241. TProtobufInputSpec<NPureCalcProto::TAllTypes>(),
  242. TProtobufOutputSpec<NPureCalcProto::TAllTypes>(),
  243. "SELECT * FROM Input",
  244. ETranslationMode::SQL
  245. );
  246. auto stream = program->Apply(MakeHolder<TAllTypesStreamImpl>());
  247. NPureCalcProto::TAllTypes* message;
  248. UNIT_ASSERT(message = stream->Fetch());
  249. AssertEqualToCanonical(*message);
  250. UNIT_ASSERT(!stream->Fetch());
  251. }
  252. {
  253. auto program = factory->MakePushStreamProgram(
  254. TProtobufInputSpec<NPureCalcProto::TAllTypes>(),
  255. TProtobufOutputSpec<NPureCalcProto::TAllTypes>(),
  256. "SELECT * FROM Input",
  257. ETranslationMode::SQL
  258. );
  259. auto consumer = program->Apply(MakeHolder<TAllTypesConsumerImpl>());
  260. UNIT_ASSERT_NO_EXCEPTION([&](){ consumer->OnObject(&GetCanonicalMessage()); }());
  261. UNIT_ASSERT_NO_EXCEPTION([&](){ consumer->OnFinish(); }());
  262. }
  263. }
  264. template <typename T>
  265. void CheckPassThroughYql(T& testInput, google::protobuf::Arena* arena = nullptr) {
  266. using namespace NYql::NPureCalc;
  267. auto resetArena = [arena]() {
  268. if (arena != nullptr) {
  269. arena->Reset();
  270. }
  271. };
  272. auto factory = MakeProgramFactory();
  273. {
  274. auto program = factory->MakePushStreamProgram(
  275. TProtobufInputSpec<T>(),
  276. TProtobufOutputSpec<T>({}, arena),
  277. "SELECT * FROM Input",
  278. ETranslationMode::SQL
  279. );
  280. auto resultConsumer = MakeHolder<TVectorConsumer<T>>();
  281. auto* resultConsumerPtr = resultConsumer.Get();
  282. auto sourceConsumer = program->Apply(std::move(resultConsumer));
  283. sourceConsumer->OnObject(&testInput);
  284. UNIT_ASSERT_VALUES_EQUAL(1, resultConsumerPtr->Data.size());
  285. AssertProtoEqual(resultConsumerPtr->Data[0], testInput);
  286. resultConsumerPtr->Data.clear();
  287. sourceConsumer->OnObject(&testInput);
  288. UNIT_ASSERT_VALUES_EQUAL(1, resultConsumerPtr->Data.size());
  289. AssertProtoEqual(resultConsumerPtr->Data[0], testInput);
  290. }
  291. resetArena();
  292. {
  293. auto program = factory->MakePullStreamProgram(
  294. TProtobufInputSpec<T>(),
  295. TProtobufOutputSpec<T>({}, arena),
  296. "SELECT * FROM Input",
  297. ETranslationMode::SQL
  298. );
  299. auto sourceStream = MakeHolder<TVectorStream<T>>();
  300. auto* sourceStreamPtr = sourceStream.Get();
  301. auto resultStream = program->Apply(std::move(sourceStream));
  302. sourceStreamPtr->Data.push_back(testInput);
  303. T* resultMessage;
  304. UNIT_ASSERT(resultMessage = resultStream->Fetch());
  305. AssertProtoEqual(*resultMessage, testInput);
  306. UNIT_ASSERT(!resultStream->Fetch());
  307. UNIT_ASSERT_VALUES_EQUAL(resultMessage->GetArena(), arena);
  308. }
  309. resetArena();
  310. {
  311. auto program = factory->MakePullListProgram(
  312. TProtobufInputSpec<T>(),
  313. TProtobufOutputSpec<T>({}, arena),
  314. "SELECT * FROM Input",
  315. ETranslationMode::SQL
  316. );
  317. auto sourceStream = MakeHolder<TVectorStream<T>>();
  318. auto* sourceStreamPtr = sourceStream.Get();
  319. auto resultStream = program->Apply(std::move(sourceStream));
  320. sourceStreamPtr->Data.push_back(testInput);
  321. T* resultMessage;
  322. UNIT_ASSERT(resultMessage = resultStream->Fetch());
  323. AssertProtoEqual(*resultMessage, testInput);
  324. UNIT_ASSERT(!resultStream->Fetch());
  325. UNIT_ASSERT_VALUES_EQUAL(resultMessage->GetArena(), arena);
  326. }
  327. resetArena();
  328. }
  329. template <typename T>
  330. void CheckMessageIsInvalid(const TString& expectedExceptionMessage) {
  331. using namespace NYql::NPureCalc;
  332. auto factory = MakeProgramFactory();
  333. UNIT_ASSERT_EXCEPTION_CONTAINS([&]() {
  334. factory->MakePushStreamProgram(TProtobufInputSpec<T>(), TProtobufOutputSpec<T>(), "SELECT * FROM Input", ETranslationMode::SQL);
  335. }(), yexception, expectedExceptionMessage);
  336. UNIT_ASSERT_EXCEPTION_CONTAINS([&]() {
  337. factory->MakePullStreamProgram(TProtobufInputSpec<T>(), TProtobufOutputSpec<T>(), "SELECT * FROM Input", ETranslationMode::SQL);
  338. }(), yexception, expectedExceptionMessage);
  339. UNIT_ASSERT_EXCEPTION_CONTAINS([&]() {
  340. factory->MakePullListProgram(TProtobufInputSpec<T>(), TProtobufOutputSpec<T>(), "SELECT * FROM Input", ETranslationMode::SQL);
  341. }(), yexception, expectedExceptionMessage);
  342. }
  343. Y_UNIT_TEST(TestSimpleNested) {
  344. NPureCalcProto::TSimpleNested input;
  345. input.SetX(10);
  346. {
  347. auto* item = input.MutableY();
  348. *item = GetCanonicalMessage();
  349. item->SetFUint64(100);
  350. }
  351. CheckPassThroughYql(input);
  352. }
  353. Y_UNIT_TEST(TestOptionalNested) {
  354. NPureCalcProto::TOptionalNested input;
  355. {
  356. auto* item = input.MutableX();
  357. *item = GetCanonicalMessage();
  358. item->SetFUint64(100);
  359. }
  360. CheckPassThroughYql(input);
  361. }
  362. Y_UNIT_TEST(TestSimpleRepeated) {
  363. NPureCalcProto::TSimpleRepeated input;
  364. input.SetX(20);
  365. input.AddY(100);
  366. input.AddY(200);
  367. input.AddY(300);
  368. CheckPassThroughYql(input);
  369. }
  370. Y_UNIT_TEST(TestNestedRepeated) {
  371. NPureCalcProto::TNestedRepeated input;
  372. input.SetX(20);
  373. {
  374. auto* item = input.MutableY()->Add();
  375. item->SetX(100);
  376. {
  377. auto* y = item->MutableY();
  378. *y = GetCanonicalMessage();
  379. y->SetFUint64(1000);
  380. }
  381. }
  382. {
  383. auto* item = input.MutableY()->Add();
  384. item->SetX(200);
  385. {
  386. auto* y = item->MutableY();
  387. *y = GetCanonicalMessage();
  388. y->SetFUint64(2000);
  389. }
  390. }
  391. CheckPassThroughYql(input);
  392. }
  393. Y_UNIT_TEST(TestMessageWithEnum) {
  394. NPureCalcProto::TMessageWithEnum input;
  395. input.AddEnumValue(NPureCalcProto::TMessageWithEnum::VALUE1);
  396. input.AddEnumValue(NPureCalcProto::TMessageWithEnum::VALUE2);
  397. CheckPassThroughYql(input);
  398. }
  399. Y_UNIT_TEST(TestRecursive) {
  400. CheckMessageIsInvalid<NPureCalcProto::TRecursive>("NPureCalcProto.TRecursive->NPureCalcProto.TRecursive");
  401. }
  402. Y_UNIT_TEST(TestRecursiveIndirectly) {
  403. CheckMessageIsInvalid<NPureCalcProto::TRecursiveIndirectly>(
  404. "NPureCalcProto.TRecursiveIndirectly->NPureCalcProto.TRecursiveIndirectly.TNested->NPureCalcProto.TRecursiveIndirectly");
  405. }
  406. Y_UNIT_TEST(TestColumnsFilter) {
  407. using namespace NYql::NPureCalc;
  408. auto factory = MakeProgramFactory();
  409. auto filter = THashSet<TString>({"FFixed64", "FBool", "FBytes"});
  410. NPureCalcProto::TOptionalAllTypes canonicalMessage;
  411. canonicalMessage.SetFFixed64(GetCanonicalMessage().GetFFixed64());
  412. canonicalMessage.SetFBool(GetCanonicalMessage().GetFBool());
  413. canonicalMessage.SetFBytes(GetCanonicalMessage().GetFBytes());
  414. {
  415. auto inputSpec = TProtobufInputSpec<NPureCalcProto::TAllTypes>();
  416. auto outputSpec = TProtobufOutputSpec<NPureCalcProto::TOptionalAllTypes>();
  417. outputSpec.SetOutputColumnsFilter(filter);
  418. auto program = factory->MakePullStreamProgram(
  419. inputSpec,
  420. outputSpec,
  421. "SELECT * FROM Input",
  422. ETranslationMode::SQL
  423. );
  424. UNIT_ASSERT_EQUAL(program->GetUsedColumns(), filter);
  425. auto stream = program->Apply(MakeHolder<TAllTypesStreamImpl>());
  426. NPureCalcProto::TOptionalAllTypes* message;
  427. UNIT_ASSERT(message = stream->Fetch());
  428. AssertEqualToCanonical(*message, canonicalMessage);
  429. UNIT_ASSERT(!stream->Fetch());
  430. }
  431. }
  432. Y_UNIT_TEST(TestColumnsFilterWithOptionalFields) {
  433. using namespace NYql::NPureCalc;
  434. auto factory = MakeProgramFactory();
  435. auto fields = THashSet<TString>({"FFixed64", "FBool", "FBytes"});
  436. NPureCalcProto::TOptionalAllTypes canonicalMessage;
  437. canonicalMessage.SetFFixed64(GetCanonicalMessage().GetFFixed64());
  438. canonicalMessage.SetFBool(GetCanonicalMessage().GetFBool());
  439. canonicalMessage.SetFBytes(GetCanonicalMessage().GetFBytes());
  440. {
  441. auto program = factory->MakePullStreamProgram(
  442. TProtobufInputSpec<NPureCalcProto::TAllTypes>(),
  443. TProtobufOutputSpec<NPureCalcProto::TOptionalAllTypes>(),
  444. "SELECT FFixed64, FBool, FBytes FROM Input",
  445. ETranslationMode::SQL
  446. );
  447. UNIT_ASSERT_EQUAL(program->GetUsedColumns(), fields);
  448. auto stream = program->Apply(MakeHolder<TAllTypesStreamImpl>());
  449. NPureCalcProto::TOptionalAllTypes* message;
  450. UNIT_ASSERT(message = stream->Fetch());
  451. AssertEqualToCanonical(*message, canonicalMessage);
  452. UNIT_ASSERT(!stream->Fetch());
  453. }
  454. UNIT_ASSERT_EXCEPTION_CONTAINS([&](){
  455. factory->MakePullStreamProgram(
  456. TProtobufInputSpec<NPureCalcProto::TAllTypes>(),
  457. TProtobufOutputSpec<NPureCalcProto::TAllTypes>(),
  458. "SELECT FFixed64, FBool, FBytes FROM Input",
  459. ETranslationMode::SQL
  460. );
  461. }(), TCompileError, "Failed to optimize");
  462. }
  463. Y_UNIT_TEST(TestUsedColumns) {
  464. using namespace NYql::NPureCalc;
  465. auto factory = MakeProgramFactory();
  466. auto allFields = THashSet<TString>();
  467. for (auto i: xrange(NPureCalcProto::TOptionalAllTypes::descriptor()->field_count())) {
  468. allFields.emplace(NPureCalcProto::TOptionalAllTypes::descriptor()->field(i)->name());
  469. }
  470. {
  471. auto program = factory->MakePullStreamProgram(
  472. TProtobufInputSpec<NPureCalcProto::TAllTypes>(),
  473. TProtobufOutputSpec<NPureCalcProto::TOptionalAllTypes>(),
  474. "SELECT * FROM Input",
  475. ETranslationMode::SQL
  476. );
  477. UNIT_ASSERT_EQUAL(program->GetUsedColumns(), allFields);
  478. }
  479. }
  480. Y_UNIT_TEST(TestChaining) {
  481. using namespace NYql::NPureCalc;
  482. auto factory = MakeProgramFactory();
  483. TString sql1 = "SELECT UNWRAP(X || CAST(\"HI\" AS Utf8)) AS X FROM Input";
  484. TString sql2 = "SELECT LENGTH(X) AS X FROM Input";
  485. {
  486. auto program1 = factory->MakePullStreamProgram(
  487. TProtobufInputSpec<NPureCalcProto::TStringMessage>(),
  488. TProtobufOutputSpec<NPureCalcProto::TStringMessage>(),
  489. sql1,
  490. ETranslationMode::SQL
  491. );
  492. auto program2 = factory->MakePullStreamProgram(
  493. TProtobufInputSpec<NPureCalcProto::TStringMessage>(),
  494. TProtobufOutputSpec<NPureCalcProto::TSimpleMessage>(),
  495. sql2,
  496. ETranslationMode::SQL
  497. );
  498. auto input = MakeHolder<TStringMessageStreamImpl>();
  499. auto intermediate = program1->Apply(std::move(input));
  500. auto output = program2->Apply(std::move(intermediate));
  501. TVector<int> expected = {2, 3, 4};
  502. TVector<int> actual{};
  503. while (auto *x = output->Fetch()) {
  504. actual.push_back(x->GetX());
  505. }
  506. UNIT_ASSERT_EQUAL(expected, actual);
  507. }
  508. {
  509. auto program1 = factory->MakePullListProgram(
  510. TProtobufInputSpec<NPureCalcProto::TStringMessage>(),
  511. TProtobufOutputSpec<NPureCalcProto::TStringMessage>(),
  512. sql1,
  513. ETranslationMode::SQL
  514. );
  515. auto program2 = factory->MakePullListProgram(
  516. TProtobufInputSpec<NPureCalcProto::TStringMessage>(),
  517. TProtobufOutputSpec<NPureCalcProto::TSimpleMessage>(),
  518. sql2,
  519. ETranslationMode::SQL
  520. );
  521. auto input = MakeHolder<TStringMessageStreamImpl>();
  522. auto intermediate = program1->Apply(std::move(input));
  523. auto output = program2->Apply(std::move(intermediate));
  524. TVector<int> expected = {2, 3, 4};
  525. TVector<int> actual{};
  526. while (auto *x = output->Fetch()) {
  527. actual.push_back(x->GetX());
  528. }
  529. UNIT_ASSERT_EQUAL(expected, actual);
  530. }
  531. {
  532. auto program1 = factory->MakePushStreamProgram(
  533. TProtobufInputSpec<NPureCalcProto::TStringMessage>(),
  534. TProtobufOutputSpec<NPureCalcProto::TStringMessage>(),
  535. sql1,
  536. ETranslationMode::SQL
  537. );
  538. auto program2 = factory->MakePushStreamProgram(
  539. TProtobufInputSpec<NPureCalcProto::TStringMessage>(),
  540. TProtobufOutputSpec<NPureCalcProto::TSimpleMessage>(),
  541. sql2,
  542. ETranslationMode::SQL
  543. );
  544. TVector<int> expected = {2, 3, 4, -100};
  545. TVector<int> actual{};
  546. auto consumer = MakeHolder<TSimpleMessageConsumerImpl>(&actual);
  547. auto intermediate = program2->Apply(std::move(consumer));
  548. auto input = program1->Apply(std::move(intermediate));
  549. NPureCalcProto::TStringMessage Message;
  550. Message.SetX("");
  551. input->OnObject(&Message);
  552. Message.SetX("1");
  553. input->OnObject(&Message);
  554. Message.SetX("22");
  555. input->OnObject(&Message);
  556. input->OnFinish();
  557. UNIT_ASSERT_EQUAL(expected, actual);
  558. }
  559. }
  560. Y_UNIT_TEST(TestTimestampColumn) {
  561. using namespace NYql::NPureCalc;
  562. auto factory = MakeProgramFactory(TProgramFactoryOptions()
  563. .SetDeterministicTimeProviderSeed(1)); // seconds
  564. NPureCalcProto::TOptionalAllTypes canonicalMessage;
  565. {
  566. auto inputSpec = TProtobufInputSpec<NPureCalcProto::TAllTypes>("MyTimestamp");
  567. auto outputSpec = TProtobufOutputSpec<NPureCalcProto::TOptionalAllTypes>();
  568. auto program = factory->MakePullStreamProgram(
  569. inputSpec,
  570. outputSpec,
  571. "SELECT MyTimestamp AS FFixed64 FROM Input",
  572. ETranslationMode::SQL
  573. );
  574. auto stream = program->Apply(MakeHolder<TAllTypesStreamImpl>());
  575. NPureCalcProto::TOptionalAllTypes* message;
  576. UNIT_ASSERT(message = stream->Fetch());
  577. UNIT_ASSERT_VALUES_EQUAL(message->GetFFixed64(), 1000000); // microseconds
  578. UNIT_ASSERT(!stream->Fetch());
  579. }
  580. }
  581. Y_UNIT_TEST(TestTableNames) {
  582. using namespace NYql::NPureCalc;
  583. auto factory = MakeProgramFactory(TProgramFactoryOptions().SetUseSystemColumns(true));
  584. auto runTest = [&](TStringBuf tableName, i32 value) {
  585. auto program = factory->MakePullStreamProgram(
  586. TProtobufInputSpec<NPureCalcProto::TSimpleMessage>(),
  587. TProtobufOutputSpec<NPureCalcProto::TNamedSimpleMessage>(),
  588. TString::Join("SELECT TableName() AS Name, X FROM ", tableName),
  589. ETranslationMode::SQL
  590. );
  591. auto stream = program->Apply(MakeHolder<TSimpleMessageStreamImpl>(value));
  592. auto message = stream->Fetch();
  593. UNIT_ASSERT(message);
  594. UNIT_ASSERT_VALUES_EQUAL(message->GetX(), value);
  595. UNIT_ASSERT_VALUES_EQUAL(message->GetName(), tableName);
  596. UNIT_ASSERT(!stream->Fetch());
  597. };
  598. runTest("Input", 37);
  599. runTest("Input0", -23);
  600. }
  601. void CheckMultiOutputs(TMaybe<TVector<google::protobuf::Arena*>> arenas) {
  602. using namespace NYql::NPureCalc;
  603. auto factory = MakeProgramFactory();
  604. TString sExpr = R"(
  605. (
  606. (let $type (ParseType '"Variant<Struct<BInt:Int32,BString:Utf8>, Struct<CUint:Uint32,CString:Utf8>, Struct<X:Utf8>>"))
  607. (let $stream (Self '0))
  608. (return (FlatMap (Self '0) (lambda '(x) (block '(
  609. (let $cond (Member x 'ABool))
  610. (let $item0 (Variant (AsStruct '('BInt (Member x 'AInt)) '('BString (Member x 'AString))) '0 $type))
  611. (let $item1 (Variant (AsStruct '('CUint (Member x 'AUint)) '('CString (Member x 'AString))) '1 $type))
  612. (let $item2 (Variant (AsStruct '('X (Utf8 'Error))) '2 $type))
  613. (return (If (Exists $cond) (If (Unwrap $cond) (AsList $item0) (AsList $item1)) (AsList $item2)))
  614. )))))
  615. )
  616. )";
  617. {
  618. auto program = factory->MakePushStreamProgram(
  619. TProtobufInputSpec<NPureCalcProto::TUnsplitted>(),
  620. TProtobufMultiOutputSpec<NPureCalcProto::TSplitted1, NPureCalcProto::TSplitted2, NPureCalcProto::TStringMessage>(
  621. {}, arenas
  622. ),
  623. sExpr,
  624. ETranslationMode::SExpr
  625. );
  626. TVariantConsumerImpl::TType0 queue0;
  627. TVariantConsumerImpl::TType1 queue1;
  628. TVariantConsumerImpl::TType2 queue2;
  629. int finalValue = 0;
  630. auto consumer = MakeHolder<TVariantConsumerImpl>(&queue0, &queue1, &queue2, &finalValue);
  631. auto input = program->Apply(std::move(consumer));
  632. NPureCalcProto::TUnsplitted message;
  633. message.SetAInt(-13);
  634. message.SetAUint(47);
  635. message.SetAString("first message");
  636. message.SetABool(true);
  637. input->OnObject(&message);
  638. UNIT_ASSERT(queue0.size() == 1 && queue1.empty() && queue2.empty() && finalValue == 0);
  639. message.SetABool(false);
  640. message.SetAString("second message");
  641. input->OnObject(&message);
  642. UNIT_ASSERT(queue0.size() == 1 && queue1.size() == 1 && queue2.empty() && finalValue == 0);
  643. message.ClearABool();
  644. input->OnObject(&message);
  645. UNIT_ASSERT(queue0.size() == 1 && queue1.size() == 1 && queue2.size() == 1 && finalValue == 0);
  646. input->OnFinish();
  647. UNIT_ASSERT(queue0.size() == 1 && queue1.size() == 1 && queue2.size() == 1 && finalValue == 42);
  648. TVariantConsumerImpl::TType0 expected0 = {{-13, "first message"}};
  649. UNIT_ASSERT_EQUAL(queue0, expected0);
  650. TVariantConsumerImpl::TType1 expected1 = {{47, "second message"}};
  651. UNIT_ASSERT_EQUAL(queue1, expected1);
  652. TVariantConsumerImpl::TType2 expected2 = {{"Error"}};
  653. UNIT_ASSERT_EQUAL(queue2, expected2);
  654. }
  655. {
  656. auto program1 = factory->MakePullStreamProgram(
  657. TProtobufInputSpec<NPureCalcProto::TUnsplitted>(),
  658. TProtobufMultiOutputSpec<NPureCalcProto::TSplitted1, NPureCalcProto::TSplitted2, NPureCalcProto::TStringMessage>(
  659. {}, arenas
  660. ),
  661. sExpr,
  662. ETranslationMode::SExpr
  663. );
  664. auto program2 = factory->MakePullListProgram(
  665. TProtobufInputSpec<NPureCalcProto::TUnsplitted>(),
  666. TProtobufMultiOutputSpec<NPureCalcProto::TSplitted1, NPureCalcProto::TSplitted2, NPureCalcProto::TStringMessage>(
  667. {}, arenas
  668. ),
  669. sExpr,
  670. ETranslationMode::SExpr
  671. );
  672. auto input1 = MakeHolder<TUnsplittedStreamImpl>();
  673. auto output1 = program1->Apply(std::move(input1));
  674. auto input2 = MakeHolder<TUnsplittedStreamImpl>();
  675. auto output2 = program2->Apply(std::move(input2));
  676. decltype(output1->Fetch()) variant1;
  677. decltype(output2->Fetch()) variant2;
  678. #define ASSERT_EQUAL_FIELDS(X1, X2, I, F, E) \
  679. UNIT_ASSERT_EQUAL(X1.index(), I); \
  680. UNIT_ASSERT_EQUAL(X2.index(), I); \
  681. UNIT_ASSERT_EQUAL(std::get<I>(X1)->Get##F(), E); \
  682. UNIT_ASSERT_EQUAL(std::get<I>(X2)->Get##F(), E)
  683. variant1 = output1->Fetch();
  684. variant2 = output2->Fetch();
  685. ASSERT_EQUAL_FIELDS(variant1, variant2, 2, X, "Error");
  686. ASSERT_EQUAL_FIELDS(variant1, variant2, 2, Arena, (arenas.Defined() ? arenas->at(2) : nullptr));
  687. variant1 = output1->Fetch();
  688. variant2 = output2->Fetch();
  689. ASSERT_EQUAL_FIELDS(variant1, variant2, 1, CUint, 111);
  690. ASSERT_EQUAL_FIELDS(variant1, variant2, 1, CString, "Hello!");
  691. ASSERT_EQUAL_FIELDS(variant1, variant2, 1, Arena, (arenas.Defined() ? arenas->at(1) : nullptr));
  692. variant1 = output1->Fetch();
  693. variant2 = output2->Fetch();
  694. ASSERT_EQUAL_FIELDS(variant1, variant2, 0, BInt, -23);
  695. ASSERT_EQUAL_FIELDS(variant1, variant2, 0, BString, "Hello!");
  696. ASSERT_EQUAL_FIELDS(variant1, variant2, 0, Arena, (arenas.Defined() ? arenas->at(0) : nullptr));
  697. variant1 = output1->Fetch();
  698. variant2 = output2->Fetch();
  699. UNIT_ASSERT_EQUAL(variant1.index(), 0);
  700. UNIT_ASSERT_EQUAL(variant2.index(), 0);
  701. UNIT_ASSERT_EQUAL(std::get<0>(variant1), nullptr);
  702. UNIT_ASSERT_EQUAL(std::get<0>(variant1), nullptr);
  703. #undef ASSERT_EQUAL_FIELDS
  704. }
  705. }
  706. Y_UNIT_TEST(TestMultiOutputs) {
  707. CheckMultiOutputs(Nothing());
  708. }
// NOTE(review): this test has an empty body and asserts nothing; presumably a
// placeholder - confirm whether it should be filled in or removed.
Y_UNIT_TEST(TestSupportedTypes) {
}
  711. Y_UNIT_TEST(TestProtobufArena) {
  712. {
  713. NPureCalcProto::TNestedRepeated input;
  714. input.SetX(20);
  715. {
  716. auto* item = input.MutableY()->Add();
  717. item->SetX(100);
  718. {
  719. auto* y = item->MutableY();
  720. *y = GetCanonicalMessage();
  721. y->SetFUint64(1000);
  722. }
  723. }
  724. {
  725. auto* item = input.MutableY()->Add();
  726. item->SetX(200);
  727. {
  728. auto* y = item->MutableY();
  729. *y = GetCanonicalMessage();
  730. y->SetFUint64(2000);
  731. }
  732. }
  733. google::protobuf::Arena arena;
  734. CheckPassThroughYql(input, &arena);
  735. }
  736. {
  737. google::protobuf::Arena arena1;
  738. google::protobuf::Arena arena2;
  739. TVector<google::protobuf::Arena*> arenas{&arena1, &arena2, &arena1};
  740. CheckMultiOutputs(arenas);
  741. }
  742. }
  743. Y_UNIT_TEST(TestFieldRenames) {
  744. using namespace NYql::NPureCalc;
  745. auto factory = MakeProgramFactory();
  746. TString query = "SELECT InputAlias AS OutputAlias FROM Input";
  747. auto inputProtoOptions = TProtoSchemaOptions();
  748. inputProtoOptions.SetFieldRenames({{"X", "InputAlias"}});
  749. auto inputSpec = TProtobufInputSpec<NPureCalcProto::TSimpleMessage>(
  750. Nothing(), std::move(inputProtoOptions)
  751. );
  752. auto outputProtoOptions = TProtoSchemaOptions();
  753. outputProtoOptions.SetFieldRenames({{"X", "OutputAlias"}});
  754. auto outputSpec = TProtobufOutputSpec<NPureCalcProto::TSimpleMessage>(
  755. std::move(outputProtoOptions)
  756. );
  757. {
  758. auto program = factory->MakePullStreamProgram(
  759. inputSpec, outputSpec, query, ETranslationMode::SQL
  760. );
  761. auto input = MakeHolder<TSimpleMessageStreamImpl>(1);
  762. auto output = program->Apply(std::move(input));
  763. TVector<int> expected = {1};
  764. TVector<int> actual;
  765. while (auto* x = output->Fetch()) {
  766. actual.push_back(x->GetX());
  767. }
  768. UNIT_ASSERT_VALUES_EQUAL(expected, actual);
  769. }
  770. {
  771. auto program = factory->MakePullListProgram(
  772. inputSpec, outputSpec, query, ETranslationMode::SQL
  773. );
  774. auto input = MakeHolder<TSimpleMessageStreamImpl>(1);
  775. auto output = program->Apply(std::move(input));
  776. TVector<int> expected = {1};
  777. TVector<int> actual;
  778. while (auto* x = output->Fetch()) {
  779. actual.push_back(x->GetX());
  780. }
  781. UNIT_ASSERT_VALUES_EQUAL(expected, actual);
  782. }
  783. {
  784. auto program = factory->MakePushStreamProgram(
  785. inputSpec, outputSpec, query, ETranslationMode::SQL
  786. );
  787. TVector<int> expected = {1, -100};
  788. TVector<int> actual;
  789. auto consumer = MakeHolder<TSimpleMessageConsumerImpl>(&actual);
  790. auto input = program->Apply(std::move(consumer));
  791. NPureCalcProto::TSimpleMessage Message;
  792. Message.SetX(1);
  793. input->OnObject(&Message);
  794. input->OnFinish();
  795. UNIT_ASSERT_VALUES_EQUAL(expected, actual);
  796. }
  797. }
  798. }