#pragma once #ifndef IO_INL_H_ #error "Direct inclusion of this file is not allowed, use io.h" #endif #undef IO_INL_H_ #include "finish_or_die.h" #include #include #include #include #include #include #include namespace NYT { //////////////////////////////////////////////////////////////////////////////// namespace NDetail { template struct TIsProtoOneOf : std::false_type { }; template struct TIsProtoOneOf> : std::true_type { }; template struct TIsSkiffRowOneOf : std::false_type { }; template struct TIsSkiffRowOneOf> : std::true_type { }; } // namespace NDetail //////////////////////////////////////////////////////////////////////////////// template struct TRowTraits; template <> struct TRowTraits { using TRowType = TNode; using IReaderImpl = INodeReaderImpl; using IWriterImpl = INodeWriterImpl; }; template <> struct TRowTraits { using TRowType = TYaMRRow; using IReaderImpl = IYaMRReaderImpl; using IWriterImpl = IYaMRWriterImpl; }; template <> struct TRowTraits { using TRowType = Message; using IReaderImpl = IProtoReaderImpl; using IWriterImpl = IProtoWriterImpl; }; template struct TRowTraits::Value>> { using TRowType = T; using IReaderImpl = IProtoReaderImpl; using IWriterImpl = IProtoWriterImpl; }; template struct TRowTraits::value>> { using TRowType = T; using IReaderImpl = ISkiffRowReaderImpl; }; template struct TRowTraits> { using TRowType = TSkiffRowOneOf; using IReaderImpl = ISkiffRowReaderImpl; }; template struct TRowTraits> { using TRowType = TProtoOneOf; using IReaderImpl = IProtoReaderImpl; using IWriterImpl = IProtoWriterImpl; }; //////////////////////////////////////////////////////////////////////////////// struct IReaderImplBase : public TThrRefBase { virtual bool IsValid() const = 0; virtual void Next() = 0; virtual ui32 GetTableIndex() const = 0; virtual ui32 GetRangeIndex() const = 0; virtual ui64 GetRowIndex() const = 0; virtual void NextKey() = 0; // Not pure virtual because of clients that has already implemented this interface. virtual TMaybe GetReadByteCount() const; virtual i64 GetTabletIndex() const; virtual bool IsEndOfStream() const; virtual bool IsRawReaderExhausted() const; }; struct INodeReaderImpl : public IReaderImplBase { virtual const TNode& GetRow() const = 0; virtual void MoveRow(TNode* row) = 0; }; struct IYaMRReaderImpl : public IReaderImplBase { virtual const TYaMRRow& GetRow() const = 0; virtual void MoveRow(TYaMRRow* row) { *row = GetRow(); } }; struct IProtoReaderImpl : public IReaderImplBase { virtual void ReadRow(Message* row) = 0; }; struct ISkiffRowReaderImpl : public IReaderImplBase { virtual void ReadRow(const ISkiffRowParserPtr& parser) = 0; }; //////////////////////////////////////////////////////////////////////////////// namespace NDetail { //////////////////////////////////////////////////////////////////////////////// // We don't include in this file // to avoid macro name clashes (specifically YT_LOG_DEBUG) void LogTableReaderStatistics(ui64 rowCount, TMaybe byteCount); template class TTableReaderBase : public TThrRefBase { public: using TRowType = typename TRowTraits::TRowType; using IReaderImpl = typename TRowTraits::IReaderImpl; explicit TTableReaderBase(::TIntrusivePtr reader) : Reader_(reader) { } ~TTableReaderBase() override { NDetail::LogTableReaderStatistics(ReadRowCount_, Reader_->GetReadByteCount()); } bool IsValid() const { return Reader_->IsValid(); } void Next() { Reader_->Next(); ++ReadRowCount_; RowState_ = ERowState::None; } bool IsEndOfStream() { return Reader_->IsEndOfStream(); } bool IsRawReaderExhausted() { return Reader_->IsRawReaderExhausted(); } ui32 GetTableIndex() const { return Reader_->GetTableIndex(); } ui32 GetRangeIndex() const { return Reader_->GetRangeIndex(); } ui64 GetRowIndex() const { return Reader_->GetRowIndex(); } i64 GetTabletIndex() const { return Reader_->GetTabletIndex(); } protected: template const auto& DoGetRowCached(TCacher cacher, TCacheGetter cacheGetter) const { switch (RowState_) { case ERowState::None: cacher(); RowState_ = ERowState::Cached; break; case ERowState::Cached: break; case ERowState::MovedOut: ythrow yexception() << "Row is already moved"; } return *cacheGetter(); } template void DoMoveRowCached(U* result, TMover mover, TCacheMover cacheMover) { Y_ABORT_UNLESS(result); switch (RowState_) { case ERowState::None: mover(result); break; case ERowState::Cached: cacheMover(result); break; case ERowState::MovedOut: ythrow yexception() << "Row is already moved"; } RowState_ = ERowState::MovedOut; } private: enum class ERowState { None, Cached, MovedOut, }; protected: ::TIntrusivePtr Reader_; private: ui64 ReadRowCount_ = 0; mutable ERowState RowState_ = ERowState::None; }; template class TSimpleTableReader : public TTableReaderBase { public: using TBase = TTableReaderBase; using typename TBase::TRowType; using TBase::TBase; const TRowType& GetRow() const { // Caching is implemented in underlying reader. return TBase::DoGetRowCached( /* cacher */ [&] {}, /* cacheGetter */ [&] { return &Reader_->GetRow(); }); } void MoveRow(TRowType* result) { // Caching is implemented in underlying reader. TBase::DoMoveRowCached( result, /* mover */ [&] (TRowType* result) { Reader_->MoveRow(result); }, /* cacheMover */ [&] (TRowType* result) { Reader_->MoveRow(result); }); } TRowType MoveRow() { TRowType result; MoveRow(&result); return result; } private: using TBase::Reader_; }; //////////////////////////////////////////////////////////////////////////////// } // namespace NDetail template <> class TTableReader : public NDetail::TSimpleTableReader { using TSimpleTableReader::TSimpleTableReader; }; template <> class TTableReader : public NDetail::TSimpleTableReader { using TSimpleTableReader::TSimpleTableReader; }; template <> class TTableReader : public NDetail::TTableReaderBase { public: using TBase = NDetail::TTableReaderBase; using TBase::TBase; template const U& GetRow() const { static_assert(TIsBaseOf::Value); return TBase::DoGetRowCached( /* cacher */ [&] { CachedRow_.Reset(new U); Reader_->ReadRow(CachedRow_.Get()); }, /* cacheGetter */ [&] { auto result = dynamic_cast(CachedRow_.Get()); Y_ABORT_UNLESS(result); return result; }); } template void MoveRow(U* result) { static_assert(TIsBaseOf::Value); TBase::DoMoveRowCached( result, /* mover */ [&] (U* result) { Reader_->ReadRow(result); }, /* cacheMover */ [&] (U* result) { auto cast = dynamic_cast(CachedRow_.Get()); Y_ABORT_UNLESS(cast); result->Swap(cast); }); } template U MoveRow() { static_assert(TIsBaseOf::Value); U result; MoveRow(&result); return result; } ::TIntrusivePtr GetReaderImpl() const { return Reader_; } private: using TBase::Reader_; mutable THolder CachedRow_; }; template class TTableReader> : public NDetail::TTableReaderBase> { public: using TBase = NDetail::TTableReaderBase>; using TBase::TBase; template const U& GetRow() const { AssertIsOneOf(); return TBase::DoGetRowCached( /* cacher */ [&] { Reader_->ReadRow(&std::get(CachedRows_)); CachedIndex_ = NDetail::TIndexInTuple::Value; }, /* cacheGetter */ [&] { return &std::get(CachedRows_); }); } template void MoveRow(U* result) { AssertIsOneOf(); return TBase::DoMoveRowCached( result, /* mover */ [&] (U* result) { Reader_->ReadRow(result); }, /* cacheMover */ [&] (U* result) { Y_ABORT_UNLESS((NDetail::TIndexInTuple::Value) == CachedIndex_); *result = std::move(std::get(CachedRows_)); }); } template U MoveRow() { U result; MoveRow(&result); return result; } ::TIntrusivePtr GetReaderImpl() const { return Reader_; } private: using TBase::Reader_; // std::variant could also be used here, but std::tuple leads to better performance // because of deallocations that std::variant has to do mutable std::tuple CachedRows_; mutable int CachedIndex_; template static constexpr void AssertIsOneOf() { static_assert( (std::is_same::value || ...), "Template parameter must be one of TProtoOneOf template parameter"); } }; template class TTableReader::Value>> : public TTableReader> { public: using TRowType = T; using TBase = TTableReader>; using TBase::TBase; const T& GetRow() const { return TBase::template GetRow(); } void MoveRow(T* result) { TBase::template MoveRow(result); } T MoveRow() { return TBase::template MoveRow(); } }; template class TTableReader> : public NDetail::TTableReaderBase> { public: using TBase = NDetail::TTableReaderBase>; using TBase::TBase; explicit TTableReader(::TIntrusivePtr reader, const TMaybe& hints) : TBase(reader) , Parsers_({(CreateSkiffParser(&std::get(CachedRows_), hints))...}) { } template const U& GetRow() const { AssertIsOneOf(); return TBase::DoGetRowCached( /* cacher */ [&] { auto index = NDetail::TIndexInTuple::Value; Reader_->ReadRow(Parsers_[index]); CachedIndex_ = index; }, /* cacheGetter */ [&] { return &std::get(CachedRows_); }); } template void MoveRow(U* result) { AssertIsOneOf(); return TBase::DoMoveRowCached( result, /* mover */ [&] (U* result) { auto index = NDetail::TIndexInTuple::Value; Reader_->ReadRow(Parsers_[index]); *result = std::move(std::get(CachedRows_)); }, /* cacheMover */ [&] (U* result) { Y_ABORT_UNLESS((NDetail::TIndexInTuple::Value) == CachedIndex_); *result = std::move(std::get(CachedRows_)); }); } template U MoveRow() { U result; MoveRow(&result); return result; } ::TIntrusivePtr GetReaderImpl() const { return Reader_; } private: using TBase::Reader_; // std::variant could also be used here, but std::tuple leads to better performance // because of deallocations that std::variant has to do mutable std::tuple CachedRows_; mutable std::vector Parsers_; mutable int CachedIndex_; template static constexpr void AssertIsOneOf() { static_assert( (std::is_same::value || ...), "Template parameter must be one of TSkiffRowOneOf template parameter"); } }; template class TTableReader::value>> : public TTableReader> { public: using TRowType = T; using TBase = TTableReader>; using TBase::TBase; const T& GetRow() { return TBase::template GetRow(); } void MoveRow(T* result) { TBase::template MoveRow(result); } T MoveRow() { return TBase::template MoveRow(); } }; template <> inline TTableReaderPtr IIOClient::CreateTableReader( const TRichYPath& path, const TTableReaderOptions& options) { return new TTableReader(CreateNodeReader(path, options)); } template <> inline TTableReaderPtr IIOClient::CreateTableReader( const TRichYPath& path, const TTableReaderOptions& options) { return new TTableReader(CreateYaMRReader(path, options)); } template ::Value>> struct TReaderCreator { static TTableReaderPtr Create(::TIntrusivePtr reader) { return new TTableReader(reader); } }; template inline TTableReaderPtr IIOClient::CreateTableReader( const TRichYPath& path, const TTableReaderOptions& options) { if constexpr (TIsBaseOf::Value) { TAutoPtr prototype(new T); return new TTableReader(CreateProtoReader(path, options, prototype.Get())); } else if constexpr (TIsSkiffRow::value) { const auto& hints = options.FormatHints_ ? options.FormatHints_->SkiffRowHints_ : Nothing(); auto schema = GetSkiffSchema(hints); auto skipper = CreateSkiffSkipper(hints); return new TTableReader(CreateSkiffRowReader(path, options, skipper, schema), hints); } else { static_assert(TDependentFalse, "Unsupported type for table reader"); } } //////////////////////////////////////////////////////////////////////////////// template TTableReaderPtr CreateTableReader( IInputStream* stream, const TTableReaderOptions& options) { return TReaderCreator::Create(NDetail::CreateProtoReader(stream, options, T::descriptor())); } template TTableReaderPtr::TType> CreateProtoMultiTableReader( IInputStream* stream, const TTableReaderOptions& options) { return new TTableReader::TType>( NDetail::CreateProtoReader(stream, options, {Ts::descriptor()...})); } template TTableReaderPtr CreateProtoMultiTableReader( IInputStream* stream, int tableCount, const TTableReaderOptions& options) { static_assert(TIsBaseOf<::google::protobuf::Message, T>::Value); TVector descriptors(tableCount, T::descriptor()); return new TTableReader(NDetail::CreateProtoReader(stream, options, std::move(descriptors))); } //////////////////////////////////////////////////////////////////////////////// template class TTableRangesReader : public TThrRefBase { public: using TRowType = T; private: using TReaderImpl = typename TRowTraits::IReaderImpl; public: TTableRangesReader(::TIntrusivePtr readerImpl) : ReaderImpl_(readerImpl) , Reader_(MakeIntrusive>(readerImpl)) , IsValid_(Reader_->IsValid()) { } TTableReader& GetRange() { return *Reader_; } bool IsValid() const { return IsValid_; } void Next() { ReaderImpl_->NextKey(); if ((IsValid_ = Reader_->IsValid())) { Reader_->Next(); } } private: ::TIntrusivePtr ReaderImpl_; ::TIntrusivePtr> Reader_; bool IsValid_; }; //////////////////////////////////////////////////////////////////////////////// template struct IWriterImplBase : public TThrRefBase { virtual void AddRow(const T& row, size_t tableIndex) = 0; virtual void AddRow(const T& row, size_t tableIndex, size_t /*rowWeight*/) { AddRow(row, tableIndex); } virtual void AddRow(T&& row, size_t tableIndex) = 0; virtual void AddRow(T&& row, size_t tableIndex, size_t /*rowWeight*/) { AddRow(std::move(row), tableIndex); } virtual void AddRowBatch(const TVector& rowBatch, size_t tableIndex, size_t rowBatchWeight = 0) { for (const auto& row : rowBatch) { AddRow(row, tableIndex, rowBatchWeight / rowBatch.size()); } } virtual void AddRowBatch(TVector&& rowBatch, size_t tableIndex, size_t rowBatchWeight = 0) { auto rowBatchSize = rowBatch.size(); for (auto&& row : std::move(rowBatch)) { AddRow(std::move(row), tableIndex, rowBatchWeight / rowBatchSize); } } virtual size_t GetBufferMemoryUsage() const { return 0; } virtual size_t GetTableCount() const = 0; virtual void FinishTable(size_t tableIndex) = 0; virtual void Abort() { } }; struct INodeWriterImpl : public IWriterImplBase { }; struct IYaMRWriterImpl : public IWriterImplBase { }; struct IProtoWriterImpl : public IWriterImplBase { }; //////////////////////////////////////////////////////////////////////////////// template class TTableWriterBase : public TThrRefBase { public: using TRowType = T; using IWriterImpl = typename TRowTraits::IWriterImpl; explicit TTableWriterBase(::TIntrusivePtr writer) : Writer_(writer) , Locks_(MakeAtomicShared>(writer->GetTableCount())) { } ~TTableWriterBase() override { if (Locks_.RefCount() == 1) { NDetail::FinishOrDie(this, /*autoFinish*/ true, "TTableWriterBase"); } } void Abort() { Writer_->Abort(); } void AddRow(const T& row, size_t tableIndex = 0, size_t rowWeight = 0) { DoAddRow(row, tableIndex, rowWeight); } void AddRow(T&& row, size_t tableIndex = 0, size_t rowWeight = 0) { DoAddRow(std::move(row), tableIndex, rowWeight); } void AddRowBatch(const TVector& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0) { DoAddRowBatch(rowBatch, tableIndex, rowBatchWeight); } void AddRowBatch(TVector&& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0) { DoAddRowBatch(std::move(rowBatch), tableIndex, rowBatchWeight); } void Finish() { for (size_t i = 0; i < Locks_->size(); ++i) { auto guard = Guard((*Locks_)[i]); Writer_->FinishTable(i); } } size_t GetBufferMemoryUsage() const { return DoGetBufferMemoryUsage(); } protected: template void DoAddRow(const U& row, size_t tableIndex = 0, size_t rowWeight = 0) { if (tableIndex >= Locks_->size()) { ythrow TIOException() << "Table index " << tableIndex << " is out of range [0, " << Locks_->size() << ")"; } auto guard = Guard((*Locks_)[tableIndex]); Writer_->AddRow(row, tableIndex, rowWeight); } template void DoAddRow(U&& row, size_t tableIndex = 0, size_t rowWeight = 0) { if (tableIndex >= Locks_->size()) { ythrow TIOException() << "Table index " << tableIndex << " is out of range [0, " << Locks_->size() << ")"; } auto guard = Guard((*Locks_)[tableIndex]); Writer_->AddRow(std::move(row), tableIndex, rowWeight); } template void DoAddRowBatch(const TVector& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0) { if (tableIndex >= Locks_->size()) { ythrow TIOException() << "Table index " << tableIndex << " is out of range [0, " << Locks_->size() << ")"; } auto guard = Guard((*Locks_)[tableIndex]); Writer_->AddRowBatch(rowBatch, tableIndex, rowBatchWeight); } template void DoAddRowBatch(TVector&& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0) { if (tableIndex >= Locks_->size()) { ythrow TIOException() << "Table index " << tableIndex << " is out of range [0, " << Locks_->size() << ")"; } auto guard = Guard((*Locks_)[tableIndex]); Writer_->AddRowBatch(std::move(rowBatch), tableIndex, rowBatchWeight); } size_t DoGetBufferMemoryUsage() const { return Writer_->GetBufferMemoryUsage(); } ::TIntrusivePtr GetWriterImpl() { return Writer_; } private: ::TIntrusivePtr Writer_; TAtomicSharedPtr> Locks_; }; template <> class TTableWriter : public TTableWriterBase { public: using TBase = TTableWriterBase; explicit TTableWriter(::TIntrusivePtr writer) : TBase(writer) { } }; template <> class TTableWriter : public TTableWriterBase { public: using TBase = TTableWriterBase; explicit TTableWriter(::TIntrusivePtr writer) : TBase(writer) { } }; template <> class TTableWriter : public TTableWriterBase { public: using TBase = TTableWriterBase; explicit TTableWriter(::TIntrusivePtr writer) : TBase(writer) { } template ::value>* = nullptr> void AddRow(const U& row, size_t tableIndex = 0, size_t rowWeight = 0) { TBase::AddRow(row, tableIndex, rowWeight); } template ::value>* = nullptr> void AddRowBatch(const TVector& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0) { for (const auto& row : rowBatch) { AddRow(row, tableIndex, rowBatchWeight / rowBatch.size()); } } }; template class TTableWriter::Value>> : public TTableWriter { public: using TRowType = T; using TBase = TTableWriter; explicit TTableWriter(::TIntrusivePtr writer) : TBase(writer) { } void AddRow(const T& row, size_t tableIndex = 0, size_t rowWeight = 0) { TBase::AddRow(row, tableIndex, rowWeight); } void AddRowBatch(const TVector& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0) { TBase::AddRowBatch(rowBatch, tableIndex, rowBatchWeight); } }; template <> inline TTableWriterPtr IIOClient::CreateTableWriter( const TRichYPath& path, const TTableWriterOptions& options) { return new TTableWriter(CreateNodeWriter(path, options)); } template <> inline TTableWriterPtr IIOClient::CreateTableWriter( const TRichYPath& path, const TTableWriterOptions& options) { return new TTableWriter(CreateYaMRWriter(path, options)); } template inline TTableWriterPtr IIOClient::CreateTableWriter( const TRichYPath& path, const TTableWriterOptions& options) { if constexpr (TIsBaseOf::Value) { TAutoPtr prototype(new T); return new TTableWriter(CreateProtoWriter(path, options, prototype.Get())); } else { static_assert(TDependentFalse, "Unsupported type for table writer"); } } //////////////////////////////////////////////////////////////////////////////// template TTableReaderPtr CreateConcreteProtobufReader(TTableReader* reader) { static_assert(std::is_base_of_v, "T must be a protobuf type (either Message or its descendant)"); Y_ENSURE(reader, "reader must be non-null"); return ::MakeIntrusive>(reader->GetReaderImpl()); } template TTableReaderPtr CreateConcreteProtobufReader(const TTableReaderPtr& reader) { Y_ENSURE(reader, "reader must be non-null"); return CreateConcreteProtobufReader(reader.Get()); } template TTableReaderPtr CreateGenericProtobufReader(TTableReader* reader) { static_assert(std::is_base_of_v, "T must be a protobuf type (either Message or its descendant)"); Y_ENSURE(reader, "reader must be non-null"); return ::MakeIntrusive>(reader->GetReaderImpl()); } template TTableReaderPtr CreateGenericProtobufReader(const TTableReaderPtr& reader) { Y_ENSURE(reader, "reader must be non-null"); return CreateGenericProtobufReader(reader.Get()); } //////////////////////////////////////////////////////////////////////////////// } // namespace NYT