#include #include using namespace NYT; template <> void Out>(IOutputStream& out, const std::tuple& value) { out << "{" << std::get<0>(value) << ", " << std::get<1>(value) << ", " << std::get<2>(value) << "}"; } //////////////////////////////////////////////////////////////////////////////// class TRowCollection { public: void AddRow(TStringBuf key, TStringBuf subkey, TStringBuf value) { TStringStream row; auto appendLenval = [&] (TStringBuf value) { ui32 size = value.size(); row.Write(&size, sizeof(size)); row.Write(value); }; appendLenval(key); appendLenval(subkey); appendLenval(value); RowList_.push_back(row.Str()); } TString GetStreamDataStartFromRow(ui64 rowIndex) const { Y_ABORT_UNLESS(rowIndex < RowList_.size()); TStringStream ss; ss.Write("\xFC\xFF\xFF\xFF"); ss.Write(&rowIndex, sizeof(rowIndex)); for (size_t i = rowIndex; i != RowList_.size(); ++i) { ss.Write(RowList_[i]); } return ss.Str(); } size_t ComputeTotalStreamSize() const { return GetStreamDataStartFromRow(0).size(); } private: TVector RowList_; }; class TTestRawTableReader : public TRawTableReader { public: TTestRawTableReader(const TRowCollection& rowCollection) : RowCollection_(rowCollection) , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0)) , Input_(MakeHolder(DataToRead_)) { } TTestRawTableReader(const TRowCollection& rowCollection, size_t breakPoint) : RowCollection_(rowCollection) , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0).substr(0, breakPoint)) , Input_(MakeHolder(DataToRead_)) , Broken_(true) { } size_t DoRead(void* buf, size_t size) override { Y_ABORT_UNLESS(Input_); size_t res = Input_->Read(buf, size); if (!res && Broken_) { ythrow yexception() << "Stream is broken"; } return res; } bool Retry( const TMaybe& /*rangeIndex*/, const TMaybe& rowIndex, const std::exception_ptr& /*error*/) override { if (--Retries < 0) { return false; } ui64 actualRowIndex = rowIndex ? *rowIndex : 0; DataToRead_ = RowCollection_.GetStreamDataStartFromRow(actualRowIndex); Input_ = MakeHolder(DataToRead_); Broken_ = false; return true; } void ResetRetries() override { } bool HasRangeIndices() const override { return false; } private: TRowCollection RowCollection_; TString DataToRead_; THolder Input_; bool Broken_ = false; i32 Retries = 1; }; TEST(TYamrTableReaderTest, TestReadRetry) { const TVector> expectedResult = { {"foo1", "bar1", "baz1"}, {"foo2", "bar2", "baz2"}, {"foo3", "bar3", "baz3"}, }; TRowCollection rowCollection; for (const auto& row : expectedResult) { rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row)); } ssize_t streamSize = rowCollection.ComputeTotalStreamSize(); for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) { ::TIntrusivePtr rawReader; if (breakPoint == -1) { rawReader = ::MakeIntrusive(rowCollection); } else { rawReader = ::MakeIntrusive(rowCollection, static_cast(breakPoint)); } TYaMRTableReader tableReader(rawReader); TVector> actualResult; for (; tableReader.IsValid(); tableReader.Next()) { EXPECT_TRUE(!tableReader.IsRawReaderExhausted()); auto row = tableReader.GetRow(); actualResult.emplace_back(row.Key, row.SubKey, row.Value); } EXPECT_TRUE(tableReader.IsRawReaderExhausted()); EXPECT_EQ(actualResult, expectedResult); } } TEST(TYamrTableReaderTest, TestSkipRetry) { const TVector> expectedResult = { {"foo1", "bar1", "baz1"}, {"foo2", "bar2", "baz2"}, {"foo3", "bar3", "baz3"}, }; TRowCollection rowCollection; for (const auto& row : expectedResult) { rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row)); } ssize_t streamSize = rowCollection.ComputeTotalStreamSize(); for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) { try { ::TIntrusivePtr rawReader; if (breakPoint == -1) { rawReader = ::MakeIntrusive(rowCollection); } else { rawReader = ::MakeIntrusive(rowCollection, static_cast(breakPoint)); } TYaMRTableReader tableReader(rawReader); ui32 rowCount = 0; for (; tableReader.IsValid(); tableReader.Next()) { EXPECT_TRUE(!tableReader.IsRawReaderExhausted()); ++rowCount; } EXPECT_TRUE(tableReader.IsRawReaderExhausted()); EXPECT_EQ(rowCount, 3u); } catch (const std::exception& ex) { Cerr << breakPoint << Endl; Cerr << ex.what() << Endl; throw; } } } ////////////////////////////////////////////////////////////////////////////////