readers_ut.cpp 7.6 KB


  1. #include <yt/cpp/mapreduce/io/node_table_reader.h>
  2. #include <yt/cpp/mapreduce/io/proto_table_reader.h>
  3. #include <yt/cpp/mapreduce/io/skiff_table_reader.h>
  4. #include <yt/cpp/mapreduce/io/stream_table_reader.h>
  5. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  6. #include <yt/cpp/mapreduce/io/ut/ut_row.pb.h>
  7. #include <yt/cpp/mapreduce/skiff/checked_parser.h>
  8. #include <yt/cpp/mapreduce/skiff/skiff_schema.h>
  9. #include <library/cpp/yson/node/node_io.h>
  10. #include <library/cpp/testing/gtest/gtest.h>
  11. using namespace NYT;
  12. using namespace NYT::NDetail;
  13. using namespace NSkiff;
  14. ////////////////////////////////////////////////////////////////////////////////
  15. class TRetryEmulatingRawTableReader
  16. : public TRawTableReader
  17. {
  18. public:
  19. TRetryEmulatingRawTableReader(const TString& string)
  20. : String_(string)
  21. , Stream_(String_)
  22. { }
  23. bool Retry(
  24. const TMaybe<ui32>& /*rangeIndex*/,
  25. const TMaybe<ui64>& /*rowIndex*/,
  26. const std::exception_ptr& /*error*/) override
  27. {
  28. if (RetriesLeft_ == 0) {
  29. return false;
  30. }
  31. Stream_ = TStringStream(String_);
  32. --RetriesLeft_;
  33. return true;
  34. }
  35. void ResetRetries() override
  36. {
  37. RetriesLeft_ = 10;
  38. }
  39. bool HasRangeIndices() const override
  40. {
  41. return false;
  42. }
  43. private:
  44. size_t DoRead(void* buf, size_t len) override
  45. {
  46. switch (DoReadCallCount_++) {
  47. case 0:
  48. return Stream_.Read(buf, std::min(len, String_.size() / 2));
  49. case 1:
  50. ythrow yexception() << "Just wanted to test you, first fail";
  51. case 2:
  52. ythrow yexception() << "Just wanted to test you, second fail";
  53. default:
  54. return Stream_.Read(buf, len);
  55. }
  56. }
  57. private:
  58. const TString String_;
  59. TStringStream Stream_;
  60. int RetriesLeft_ = 10;
  61. int DoReadCallCount_ = 0;
  62. };
  63. ////////////////////////////////////////////////////////////////////////////////
  64. TEST(TReadersTest, YsonGood)
  65. {
  66. auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;b = \"string\"}; {c = {d=12}}");
  67. TNodeTableReader reader(proxy);
  68. TVector<TNode> expectedRows = {TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))};
  69. for (const auto& expectedRow : expectedRows) {
  70. EXPECT_TRUE(reader.IsValid());
  71. EXPECT_TRUE(!reader.IsRawReaderExhausted());
  72. EXPECT_EQ(reader.GetRow(), expectedRow);
  73. reader.Next();
  74. }
  75. EXPECT_TRUE(!reader.IsValid());
  76. EXPECT_TRUE(reader.IsRawReaderExhausted());
  77. }
  78. TEST(TReadersTest, YsonBad)
  79. {
  80. auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;-b := \"string\"}; {c = {d=12}}");
  81. EXPECT_THROW(TNodeTableReader(proxy).GetRow(), yexception);
  82. }
  83. TEST(TReadersTest, SkiffGood)
  84. {
  85. const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\x06\x00\x00\x00""foobar" "\x01"
  86. "\x00\x00" "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" "\x03\x00\x00\x00""abc" "\x00";
  87. auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
  88. TSkiffSchemaPtr schema = CreateVariant16Schema({
  89. CreateTupleSchema({
  90. CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"),
  91. CreateSimpleTypeSchema(EWireType::String32)->SetName("b"),
  92. CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c")
  93. })
  94. });
  95. TSkiffTableReader reader(proxy, schema);
  96. TVector<TNode> expectedRows = {
  97. TNode()("a", 100500)("b", "foobar")("c", true),
  98. TNode()("a", -1)("b", "abc")("c", false),
  99. };
  100. for (const auto& expectedRow : expectedRows) {
  101. EXPECT_TRUE(reader.IsValid());
  102. EXPECT_TRUE(!reader.IsRawReaderExhausted());
  103. EXPECT_EQ(reader.GetRow(), expectedRow);
  104. reader.Next();
  105. }
  106. EXPECT_TRUE(!reader.IsValid());
  107. EXPECT_TRUE(reader.IsRawReaderExhausted());
  108. }
  109. TEST(TReadersTest, SkiffExtraColumns)
  110. {
  111. const char arr[] = "\x00\x00" "\x7B\x00\x00\x00\x00\x00\x00\x00";
  112. auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
  113. TSkiffSchemaPtr schema = CreateVariant16Schema({
  114. CreateTupleSchema({
  115. CreateSimpleTypeSchema(EWireType::Uint64)->SetName("$timestamp")
  116. })
  117. });
  118. TSkiffTableReader reader(proxy, schema);
  119. TVector<TNode> expectedRows = {
  120. TNode()("$timestamp", 123u),
  121. };
  122. for (const auto& expectedRow : expectedRows) {
  123. EXPECT_TRUE(reader.IsValid());
  124. EXPECT_TRUE(!reader.IsRawReaderExhausted());
  125. EXPECT_EQ(reader.GetRow(), expectedRow);
  126. reader.Next();
  127. }
  128. EXPECT_TRUE(!reader.IsValid());
  129. EXPECT_TRUE(reader.IsRawReaderExhausted());
  130. }
  131. TEST(TReadersTest, SkiffBad)
  132. {
  133. const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\xFF\x00\x00\x00""foobar" "\x01";
  134. auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
  135. TSkiffSchemaPtr schema = CreateVariant16Schema({
  136. CreateTupleSchema({
  137. CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"),
  138. CreateSimpleTypeSchema(EWireType::String32)->SetName("b"),
  139. CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c")
  140. })
  141. });
  142. EXPECT_THROW(TSkiffTableReader(proxy, schema).GetRow(), yexception);
  143. }
  144. TEST(TReadersTest, SkiffBadFormat)
  145. {
  146. const char arr[] = "\x00\x00" "\x12" "\x23\x34\x00\x00";
  147. auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
  148. TSkiffSchemaPtr schema = CreateVariant16Schema({
  149. CreateTupleSchema({
  150. CreateVariant8Schema({
  151. CreateSimpleTypeSchema(EWireType::Nothing),
  152. CreateSimpleTypeSchema(EWireType::Int32)
  153. })
  154. })
  155. });
  156. EXPECT_THROW_MESSAGE_HAS_SUBSTR(
  157. TSkiffTableReader(proxy, schema).GetRow(),
  158. yexception,
  159. "Tag for 'variant8<nothing,int32>' expected to be 0 or 1");
  160. }
  161. TEST(TReadersTest, ProtobufGood)
  162. {
  163. using NYT::NTesting::TRow;
  164. const char arr[] = "\x13\x00\x00\x00" "\x0A""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00"
  165. "\x10\x00\x00\x00" "\x0A""\x03""abc" "\x10""\x1F" "\x19""\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF";
  166. auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
  167. TLenvalProtoTableReader reader(proxy, {TRow::descriptor()});
  168. TRow row1, row2;
  169. row1.set_string_field("foobar");
  170. row1.set_int32_field(15);
  171. row1.set_fixed64_field(100500);
  172. row2.set_string_field("abc");
  173. row2.set_int32_field(31);
  174. row2.set_fixed64_field(-1);
  175. TVector<TRow> expectedRows = {row1, row2};
  176. for (const auto& expectedRow : expectedRows) {
  177. TRow row;
  178. EXPECT_TRUE(reader.IsValid());
  179. EXPECT_TRUE(!reader.IsRawReaderExhausted());
  180. reader.ReadRow(&row);
  181. EXPECT_EQ(row.string_field(), expectedRow.string_field());
  182. EXPECT_EQ(row.int32_field(), expectedRow.int32_field());
  183. EXPECT_EQ(row.fixed64_field(), expectedRow.fixed64_field());
  184. reader.Next();
  185. }
  186. EXPECT_TRUE(!reader.IsValid());
  187. EXPECT_TRUE(reader.IsRawReaderExhausted());
  188. }
  189. TEST(TReadersTest, ProtobufBad)
  190. {
  191. const char arr[] = "\x13\x00\x00\x00" "\x0F""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00";
  192. auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
  193. NYT::NTesting::TRow row;
  194. EXPECT_THROW(TLenvalProtoTableReader(proxy, { NYT::NTesting::TRow::descriptor() }).ReadRow(&row), yexception);
  195. }
  196. ////////////////////////////////////////////////////////////////////////////////