#include "node_table_reader.h"

// NOTE(review): this copy of the file had lost the targets of the four
// includes below (and every template argument list in the file). The first,
// third and fourth paths are the headers the code appears to need
// (TNodeBuilder, YT_LOG_ERROR, ::NYson::TYsonListParser); wait_proxy.h is not
// evidenced by any use in this file. Confirm all four against version control.
#include <yt/cpp/mapreduce/common/node_builder.h>
#include <yt/cpp/mapreduce/common/wait_proxy.h>

#include <yt/cpp/mapreduce/interface/logging/yt_log.h>

#include <library/cpp/yson/parser.h>

namespace NYT {

////////////////////////////////////////////////////////////////////////////////

// YSON consumer that assembles one row at a time.
//
// Every event except a top-level list item is forwarded to an internal
// TNodeBuilder that writes into Row_.Node. A list-item event seen at nesting
// depth 0 is treated as a row boundary: the row accumulated so far is moved
// into *ResultRow_ and a fresh builder is started.
//
// Row_.Size is increased on scalar, entity and map-key events as a running
// size figure for the row.
class TRowBuilder
    : public ::NYson::TYsonConsumerBase
{
public:
    // resultRow is not owned; finished rows are moved into it.
    explicit TRowBuilder(TMaybe<TRowElement>* resultRow);

    void OnStringScalar(TStringBuf value) override;
    void OnInt64Scalar(i64 value) override;
    void OnUint64Scalar(ui64 value) override;
    void OnDoubleScalar(double value) override;
    void OnBooleanScalar(bool value) override;
    void OnBeginList() override;
    void OnEntity() override;
    void OnListItem() override;
    void OnEndList() override;
    void OnBeginMap() override;
    void OnKeyedItem(TStringBuf key) override;
    void OnEndMap() override;
    void OnBeginAttributes() override;
    void OnEndAttributes() override;

    // Publishes the row that is still being accumulated, if any row was started.
    void Finalize();

private:
    // Created by SaveResultRow(); null until the first top-level list item.
    THolder<TNodeBuilder> Builder_;
    // Row currently being accumulated.
    TRowElement Row_;
    // Nesting level of lists, maps and attribute blocks inside the current row.
    int Depth_ = 0;
    // Set by the first top-level list item.
    bool Started_ = false;
    TMaybe<TRowElement>* ResultRow_;

    void SaveResultRow();
};

TRowBuilder::TRowBuilder(TMaybe<TRowElement>* resultRow)
    : ResultRow_(resultRow)
{ }

void TRowBuilder::OnStringScalar(TStringBuf value)
{
    Row_.Size += sizeof(TNode) + sizeof(TString) + value.size();
    Builder_->OnStringScalar(value);
}

void TRowBuilder::OnInt64Scalar(i64 value)
{
    Row_.Size += sizeof(TNode);
    Builder_->OnInt64Scalar(value);
}

void TRowBuilder::OnUint64Scalar(ui64 value)
{
    Row_.Size += sizeof(TNode);
    Builder_->OnUint64Scalar(value);
}

void TRowBuilder::OnDoubleScalar(double value)
{
    Row_.Size += sizeof(TNode);
    Builder_->OnDoubleScalar(value);
}

void TRowBuilder::OnBooleanScalar(bool value)
{
    Row_.Size += sizeof(TNode);
    Builder_->OnBooleanScalar(value);
}

void TRowBuilder::OnBeginList()
{
    ++Depth_;
    Builder_->OnBeginList();
}

void TRowBuilder::OnEntity()
{
    Row_.Size += sizeof(TNode);
    Builder_->OnEntity();
}

void TRowBuilder::OnListItem()
{
    if (Depth_ == 0) {
        // Top-level item: this is a row boundary, not part of the row itself.
        SaveResultRow();
    } else {
        Builder_->OnListItem();
    }
}

void TRowBuilder::OnEndList()
{
    --Depth_;
    Builder_->OnEndList();
}

void TRowBuilder::OnBeginMap()
{
    ++Depth_;
    Builder_->OnBeginMap();
}

void TRowBuilder::OnKeyedItem(TStringBuf key)
{
    Row_.Size += sizeof(TString) + key.size();
    Builder_->OnKeyedItem(key);
}

void TRowBuilder::OnEndMap()
{
    --Depth_;
    Builder_->OnEndMap();
}

void TRowBuilder::OnBeginAttributes()
{
    ++Depth_;
    Builder_->OnBeginAttributes();
}

void TRowBuilder::OnEndAttributes()
{
    --Depth_;
    Builder_->OnEndAttributes();
}

// Called on every top-level list item. The first call only marks the builder
// as started, since no row has been accumulated yet; later calls publish the
// accumulated row. In both cases Row_ is reset and a new TNodeBuilder is
// attached to it.
void TRowBuilder::SaveResultRow()
{
    if (!Started_) {
        Started_ = true;
    } else {
        *ResultRow_ = std::move(Row_);
    }
    Row_.Reset();
    Builder_.Reset(new TNodeBuilder(&Row_.Node));
}

void TRowBuilder::Finalize()
{
    if (Started_) {
        *ResultRow_ = std::move(Row_);
    }
}

////////////////////////////////////////////////////////////////////////////////

// Sets up the parser and immediately advances to the first row.
//
// NOTE(review): the template argument of TIntrusivePtr was lost in this copy
// of the file; TRawTableReader is assumed. It must match the declaration in
// node_table_reader.h.
TNodeTableReader::TNodeTableReader(::TIntrusivePtr<TRawTableReader> input)
    : Input_(std::move(input))
{
    PrepareParsing();
    Next();
}

TNodeTableReader::~TNodeTableReader()
{ }

// Runs one parser step. When Parser_->Parse() returns false, the pending row
// is flushed from the builder and the reader remembers that the following
// item is the last one.
void TNodeTableReader::ParseListFragmentItem()
{
    if (!Parser_->Parse()) {
        Builder_->Finalize();
        IsLast_ = true;
    }
}

// Returns the current row. Throws yexception if the reader is not valid or
// the row has already been taken by MoveRow().
const TNode& TNodeTableReader::GetRow() const
{
    CheckValidity();
    if (!Row_) {
        ythrow yexception() << "Row is moved";
    }
    return Row_->Node;
}

// Moves the current row into *result and clears it, so further GetRow() /
// MoveRow() calls throw until Next() is called. Throws yexception if the
// reader is not valid or the row has already been moved.
void TNodeTableReader::MoveRow(TNode* result)
{
    CheckValidity();
    if (!Row_) {
        ythrow yexception() << "Row is moved";
    }
    *result = std::move(Row_->Node);
    Row_.Clear();
}

bool TNodeTableReader::IsValid() const
{
    return Valid_;
}

// Advances to the next row; any std::exception is logged and rethrown.
void TNodeTableReader::Next()
{
    try {
        NextImpl();
    } catch (const std::exception& ex) {
        YT_LOG_ERROR("TNodeTableReader::Next failed: %v", ex.what());
        throw;
    }
}

// Reads items until a data row is found, the input ends, or a key switch
// invalidates the reader.
//
// Items whose node is null carry only attributes; these control attributes
// update the table/row/range/tablet indexes and the end-of-stream flag and
// are not exposed as rows.
void TNodeTableReader::NextImpl()
{
    CheckValidity();

    if (RowIndex_) {
        ++*RowIndex_;
    }

    // At the beginning of the stream the parser doesn't return a finished row.
    ParseFirstListFragmentItem();

    while (true) {
        if (IsLast_) {
            Finished_ = true;
            Valid_ = false;
            break;
        }

        try {
            ParseListFragmentItem();
        } catch (const std::exception& ex) {
            // OnStreamError either re-creates the parser for a retry or
            // rethrows; after a retry the first item has to be re-read.
            NeedParseFirst_ = true;
            OnStreamError(std::current_exception(), ex.what());
            ParseFirstListFragmentItem();
            continue;
        }

        Row_ = std::move(*NextRow_);
        if (!Row_) {
            throw yexception() << "No row in NextRow_";
        }

        // We successfully parsed one more row from the stream,
        // so reset the retry count to its initial value.
        Input_.ResetRetries();

        if (!Row_->Node.IsNull()) {
            // A real data row.
            AtStart_ = false;
            break;
        }

        for (auto& entry : Row_->Node.GetAttributes().AsMap()) {
            if (entry.first == "key_switch") {
                // A key switch seen before any data row is ignored.
                if (!AtStart_) {
                    Valid_ = false;
                }
            } else if (entry.first == "table_index") {
                TableIndex_ = static_cast<ui32>(entry.second.AsInt64());
            } else if (entry.first == "row_index") {
                RowIndex_ = static_cast<ui64>(entry.second.AsInt64());
            } else if (entry.first == "range_index") {
                RangeIndex_ = static_cast<ui32>(entry.second.AsInt64());
            } else if (entry.first == "tablet_index") {
                TabletIndex_ = entry.second.AsInt64();
            } else if (entry.first == "end_of_stream") {
                IsEndOfStream_ = true;
            }
        }

        if (!Valid_) {
            break;
        }
    }
}

// Performs the initial parser step that is needed before the first row can be
// produced, retrying through OnStreamError() (which rethrows when no retry is
// possible) until the step succeeds.
void TNodeTableReader::ParseFirstListFragmentItem()
{
    while (NeedParseFirst_) {
        try {
            ParseListFragmentItem();
            NeedParseFirst_ = false;
            break;
        } catch (const std::exception& ex) {
            OnStreamError(std::current_exception(), ex.what());
        }
    }
}

ui32 TNodeTableReader::GetTableIndex() const
{
    CheckValidity();
    return TableIndex_;
}

// The range index reported by the stream restarts after a retry, so the
// indexes accumulated before retries (RangeIndexShift_) are added back.
ui32 TNodeTableReader::GetRangeIndex() const
{
    CheckValidity();
    return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
}

ui64 TNodeTableReader::GetRowIndex() const
{
    CheckValidity();
    return RowIndex_.GetOrElse(0UL);
}

i64 TNodeTableReader::GetTabletIndex() const
{
    CheckValidity();
    return TabletIndex_.GetOrElse(0L);
}

// Skips the remaining rows of the current key. If the input is not finished,
// the reader is made valid again and positioned on the row at which it became
// invalid.
void TNodeTableReader::NextKey()
{
    while (Valid_) {
        Next();
    }

    if (Finished_) {
        return;
    }

    Valid_ = true;

    // NextImpl() increments RowIndex_ on entry; undo the increment made by the
    // call that stopped on the key switch.
    if (RowIndex_) {
        --*RowIndex_;
    }
}

// NOTE(review): the template argument of the return type was lost in this copy
// of the file; size_t is assumed. It must match the declaration in
// node_table_reader.h.
TMaybe<size_t> TNodeTableReader::GetReadByteCount() const
{
    return Input_.GetReadByteCount();
}

bool TNodeTableReader::IsEndOfStream() const
{
    return IsEndOfStream_;
}

bool TNodeTableReader::IsRawReaderExhausted() const
{
    return Finished_;
}

////////////////////////////////////////////////////////////////////////////////

// Drops any pending row and creates a fresh row builder and list parser
// reading from Input_.
void TNodeTableReader::PrepareParsing()
{
    NextRow_.Clear();
    Builder_.Reset(new TRowBuilder(&NextRow_));
    Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_));
}

// Handles a read failure: logs it, remembers the exception and asks Input_ to
// retry from the current range/row position.
//
// If the retry is accepted, the current range index is folded into
// RangeIndexShift_, the row and range indexes are cleared and parsing is
// restarted. Otherwise the stored exception is rethrown.
void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error)
{
    YT_LOG_ERROR("Read error (RangeIndex: %v, RowIndex: %v, Error: %v)",
        RangeIndex_,
        RowIndex_,
        error);
    Exception_ = exception;
    if (Input_.Retry(RangeIndex_, RowIndex_)) {
        if (RangeIndex_) {
            RangeIndexShift_ += *RangeIndex_;
        }
        RowIndex_.Clear();
        RangeIndex_.Clear();
        PrepareParsing();
    } else {
        std::rethrow_exception(Exception_);
    }
}

// Throws yexception unless the reader is currently positioned on a row.
void TNodeTableReader::CheckValidity() const
{
    if (!Valid_) {
        ythrow yexception() << "Iterator is not valid";
    }
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT