123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- #include "skiff_row_table_reader.h"
- #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
- #include <yt/cpp/mapreduce/interface/skiff_row.h>
- #include <library/cpp/skiff/skiff.h>
- #include <library/cpp/yt/logging/logger.h>
- namespace NYT {
- ////////////////////////////////////////////////////////////////////////////////
- TSkiffRowTableReader::TSkiffRowTableReader(
- ::TIntrusivePtr<TRawTableReader> input,
- const NSkiff::TSkiffSchemaPtr& schema,
- TVector<ISkiffRowSkipperPtr>&& skippers,
- NDetail::TCreateSkiffSchemaOptions&& options)
- : Input_(std::move(input))
- , BufferedInput_(&Input_)
- , Parser_({schema, &BufferedInput_})
- , Skippers_(std::move(skippers))
- , Options_(std::move(options))
- {
- Next();
- }
- TSkiffRowTableReader::~TSkiffRowTableReader()
- { }
- bool TSkiffRowTableReader::Retry()
- {
- if (PrepareRetry()) {
- RowTaken_ = true;
- Next();
- return true;
- }
- return false;
- }
- bool TSkiffRowTableReader::PrepareRetry()
- {
- if (Input_.Retry(RangeIndex_, RowIndex_)) {
- if (RangeIndex_) {
- RangeIndexShift_ += *RangeIndex_;
- }
- RowIndex_.Clear();
- RangeIndex_.Clear();
- BufferedInput_ = TBufferedInput(&Input_);
- Parser_.emplace(&BufferedInput_);
- return true;
- }
- return false;
- }
- void TSkiffRowTableReader::ReadRow(const ISkiffRowParserPtr& parser)
- {
- while (true) {
- try {
- parser->Parse(&Parser_.value());
- RowTaken_ = true;
- // We successfully parsed one more row from the stream,
- // so reset retry count to their initial value.
- Input_.ResetRetries();
- break;
- } catch (const std::exception& ex) {
- YT_LOG_ERROR("Read error during parsing: %v", ex.what());
- if (!Retry()) {
- throw;
- }
- }
- }
- }
- bool TSkiffRowTableReader::IsValid() const
- {
- return Valid_;
- }
- void TSkiffRowTableReader::SkipRow()
- {
- CheckValidity();
- while (true) {
- try {
- Skippers_[TableIndex_]->SkipRow(&Parser_.value());
- break;
- } catch (const std::exception& ex) {
- YT_LOG_ERROR("Read error during skipping row: %v", ex.what());
- if (!Retry()) {
- throw;
- }
- }
- }
- }
- void TSkiffRowTableReader::CheckValidity() const {
- if (!IsValid()) {
- ythrow yexception() << "Iterator is not valid";
- }
- }
- void TSkiffRowTableReader::Next()
- {
- if (!RowTaken_) {
- SkipRow();
- }
- CheckValidity();
- if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) {
- Finished_ = true;
- Valid_ = false;
- return;
- }
- if (AfterKeySwitch_) {
- AfterKeySwitch_ = false;
- return;
- }
- if (RowIndex_) {
- ++*RowIndex_;
- }
- while (true) {
- try {
- auto tag = Parser_->ParseVariant16Tag();
- if (tag == NSkiff::EndOfSequenceTag<ui16>()) {
- IsEndOfStream_ = true;
- break;
- } else {
- TableIndex_ = tag;
- }
- if (TableIndex_ >= Skippers_.size()) {
- ythrow TIOException() <<
- "Table index " << TableIndex_ <<
- " is out of range [0, " << Skippers_.size() <<
- ") in read";
- }
- if (Options_.HasKeySwitch_) {
- auto keySwitch = Parser_->ParseBoolean();
- if (keySwitch) {
- AfterKeySwitch_ = true;
- Valid_ = false;
- }
- }
- auto tagRowIndex = Parser_->ParseVariant8Tag();
- if (tagRowIndex == 1) {
- RowIndex_ = Parser_->ParseInt64();
- } else {
- Y_ENSURE(tagRowIndex == 0, "Tag for row_index was expected to be 0 or 1, got " << tagRowIndex);
- }
- if (Options_.HasRangeIndex_) {
- auto tagRangeIndex = Parser_->ParseVariant8Tag();
- if (tagRangeIndex == 1) {
- RangeIndex_ = Parser_->ParseInt64();
- } else {
- Y_ENSURE(tagRangeIndex == 0, "Tag for range_index was expected to be 0 or 1, got " << tagRangeIndex);
- }
- }
- break;
- } catch (const std::exception& ex) {
- YT_LOG_ERROR("Read error: %v", ex.what());
- if (!PrepareRetry()) {
- throw;
- }
- }
- }
- RowTaken_ = false;
- }
- ui32 TSkiffRowTableReader::GetTableIndex() const
- {
- CheckValidity();
- return TableIndex_;
- }
- ui32 TSkiffRowTableReader::GetRangeIndex() const
- {
- CheckValidity();
- return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
- }
- ui64 TSkiffRowTableReader::GetRowIndex() const
- {
- CheckValidity();
- return RowIndex_.GetOrElse(0ULL);
- }
- void TSkiffRowTableReader::NextKey() {
- while (Valid_) {
- Next();
- }
- if (Finished_) {
- return;
- }
- Valid_ = true;
- if (RowIndex_) {
- --*RowIndex_;
- }
- RowTaken_ = true;
- }
- TMaybe<size_t> TSkiffRowTableReader::GetReadByteCount() const {
- return Input_.GetReadByteCount();
- }
- bool TSkiffRowTableReader::IsEndOfStream() const {
- return IsEndOfStream_;
- }
- bool TSkiffRowTableReader::IsRawReaderExhausted() const {
- return Finished_;
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT
|