// yamr_table_reader_ut.cpp
  1. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  2. #include <library/cpp/testing/gtest/gtest.h>
  3. using namespace NYT;
  4. template <>
  5. void Out<std::tuple<TString, TString, TString>>(IOutputStream& out, const std::tuple<TString, TString, TString>& value) {
  6. out << "{" << std::get<0>(value) << ", " << std::get<1>(value) << ", " << std::get<2>(value) << "}";
  7. }
  8. ////////////////////////////////////////////////////////////////////////////////
  9. class TRowCollection
  10. {
  11. public:
  12. void AddRow(TStringBuf key, TStringBuf subkey, TStringBuf value)
  13. {
  14. TStringStream row;
  15. auto appendLenval = [&] (TStringBuf value) {
  16. ui32 size = value.size();
  17. row.Write(&size, sizeof(size));
  18. row.Write(value);
  19. };
  20. appendLenval(key);
  21. appendLenval(subkey);
  22. appendLenval(value);
  23. RowList_.push_back(row.Str());
  24. }
  25. TString GetStreamDataStartFromRow(ui64 rowIndex) const
  26. {
  27. Y_ABORT_UNLESS(rowIndex < RowList_.size());
  28. TStringStream ss;
  29. ss.Write("\xFC\xFF\xFF\xFF");
  30. ss.Write(&rowIndex, sizeof(rowIndex));
  31. for (size_t i = rowIndex; i != RowList_.size(); ++i) {
  32. ss.Write(RowList_[i]);
  33. }
  34. return ss.Str();
  35. }
  36. size_t ComputeTotalStreamSize() const {
  37. return GetStreamDataStartFromRow(0).size();
  38. }
  39. private:
  40. TVector<TString> RowList_;
  41. };
  42. class TTestRawTableReader
  43. : public TRawTableReader
  44. {
  45. public:
  46. TTestRawTableReader(const TRowCollection& rowCollection)
  47. : RowCollection_(rowCollection)
  48. , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0))
  49. , Input_(MakeHolder<TStringStream>(DataToRead_))
  50. { }
  51. TTestRawTableReader(const TRowCollection& rowCollection, size_t breakPoint)
  52. : RowCollection_(rowCollection)
  53. , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0).substr(0, breakPoint))
  54. , Input_(MakeHolder<TStringStream>(DataToRead_))
  55. , Broken_(true)
  56. { }
  57. size_t DoRead(void* buf, size_t size) override
  58. {
  59. Y_ABORT_UNLESS(Input_);
  60. size_t res = Input_->Read(buf, size);
  61. if (!res && Broken_) {
  62. ythrow yexception() << "Stream is broken";
  63. }
  64. return res;
  65. }
  66. bool Retry(
  67. const TMaybe<ui32>& /*rangeIndex*/,
  68. const TMaybe<ui64>& rowIndex,
  69. const std::exception_ptr& /*error*/) override
  70. {
  71. if (--Retries < 0) {
  72. return false;
  73. }
  74. ui64 actualRowIndex = rowIndex ? *rowIndex : 0;
  75. DataToRead_ = RowCollection_.GetStreamDataStartFromRow(actualRowIndex);
  76. Input_ = MakeHolder<TStringInput>(DataToRead_);
  77. Broken_ = false;
  78. return true;
  79. }
  80. void ResetRetries() override
  81. { }
  82. bool HasRangeIndices() const override
  83. {
  84. return false;
  85. }
  86. private:
  87. TRowCollection RowCollection_;
  88. TString DataToRead_;
  89. THolder<IInputStream> Input_;
  90. bool Broken_ = false;
  91. i32 Retries = 1;
  92. };
  93. TEST(TYamrTableReaderTest, TestReadRetry)
  94. {
  95. const TVector<std::tuple<TString, TString, TString>> expectedResult = {
  96. {"foo1", "bar1", "baz1"},
  97. {"foo2", "bar2", "baz2"},
  98. {"foo3", "bar3", "baz3"},
  99. };
  100. TRowCollection rowCollection;
  101. for (const auto& row : expectedResult) {
  102. rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row));
  103. }
  104. ssize_t streamSize = rowCollection.ComputeTotalStreamSize();
  105. for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) {
  106. ::TIntrusivePtr<TRawTableReader> rawReader;
  107. if (breakPoint == -1) {
  108. rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection);
  109. } else {
  110. rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint));
  111. }
  112. TYaMRTableReader tableReader(rawReader);
  113. TVector<std::tuple<TString, TString, TString>> actualResult;
  114. for (; tableReader.IsValid(); tableReader.Next()) {
  115. EXPECT_TRUE(!tableReader.IsRawReaderExhausted());
  116. auto row = tableReader.GetRow();
  117. actualResult.emplace_back(row.Key, row.SubKey, row.Value);
  118. }
  119. EXPECT_TRUE(tableReader.IsRawReaderExhausted());
  120. EXPECT_EQ(actualResult, expectedResult);
  121. }
  122. }
  123. TEST(TYamrTableReaderTest, TestSkipRetry)
  124. {
  125. const TVector<std::tuple<TString, TString, TString>> expectedResult = {
  126. {"foo1", "bar1", "baz1"},
  127. {"foo2", "bar2", "baz2"},
  128. {"foo3", "bar3", "baz3"},
  129. };
  130. TRowCollection rowCollection;
  131. for (const auto& row : expectedResult) {
  132. rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row));
  133. }
  134. ssize_t streamSize = rowCollection.ComputeTotalStreamSize();
  135. for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) {
  136. try {
  137. ::TIntrusivePtr<TRawTableReader> rawReader;
  138. if (breakPoint == -1) {
  139. rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection);
  140. } else {
  141. rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint));
  142. }
  143. TYaMRTableReader tableReader(rawReader);
  144. ui32 rowCount = 0;
  145. for (; tableReader.IsValid(); tableReader.Next()) {
  146. EXPECT_TRUE(!tableReader.IsRawReaderExhausted());
  147. ++rowCount;
  148. }
  149. EXPECT_TRUE(tableReader.IsRawReaderExhausted());
  150. EXPECT_EQ(rowCount, 3u);
  151. } catch (const std::exception& ex) {
  152. Cerr << breakPoint << Endl;
  153. Cerr << ex.what() << Endl;
  154. throw;
  155. }
  156. }
  157. }
  158. ////////////////////////////////////////////////////////////////////////////////