123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023 |
- #pragma once
- #ifndef IO_INL_H_
- #error "Direct inclusion of this file is not allowed, use io.h"
- #endif
- #undef IO_INL_H_
- #include "finish_or_die.h"
- #include <util/generic/typetraits.h>
- #include <util/generic/yexception.h>
- #include <util/stream/length.h>
- #include <util/system/mutex.h>
- #include <util/system/spinlock.h>
- #include <library/cpp/yson/node/node_builder.h>
- #include <yt/cpp/mapreduce/interface/serialize.h>
- namespace NYT {
- ////////////////////////////////////////////////////////////////////////////////
- namespace NDetail {
- template<class T>
- struct TIsProtoOneOf
- : std::false_type
- { };
- template <class ...TProtoRowTypes>
- struct TIsProtoOneOf<TProtoOneOf<TProtoRowTypes...>>
- : std::true_type
- { };
- template <class T>
- struct TIsSkiffRowOneOf
- : std::false_type
- { };
- template <class ...TSkiffRowTypes>
- struct TIsSkiffRowOneOf<TSkiffRowOneOf<TSkiffRowTypes...>>
- : std::true_type
- { };
- } // namespace NDetail
- ////////////////////////////////////////////////////////////////////////////////
- template <class T, class = void>
- struct TRowTraits;
- template <>
- struct TRowTraits<TNode>
- {
- using TRowType = TNode;
- using IReaderImpl = INodeReaderImpl;
- using IWriterImpl = INodeWriterImpl;
- };
- template <>
- struct TRowTraits<TYaMRRow>
- {
- using TRowType = TYaMRRow;
- using IReaderImpl = IYaMRReaderImpl;
- using IWriterImpl = IYaMRWriterImpl;
- };
- template <>
- struct TRowTraits<Message>
- {
- using TRowType = Message;
- using IReaderImpl = IProtoReaderImpl;
- using IWriterImpl = IProtoWriterImpl;
- };
- template <class T>
- struct TRowTraits<T, std::enable_if_t<TIsBaseOf<Message, T>::Value>>
- {
- using TRowType = T;
- using IReaderImpl = IProtoReaderImpl;
- using IWriterImpl = IProtoWriterImpl;
- };
- template <class T>
- struct TRowTraits<T, std::enable_if_t<TIsSkiffRow<T>::value>>
- {
- using TRowType = T;
- using IReaderImpl = ISkiffRowReaderImpl;
- };
- template <class... TSkiffRowTypes>
- struct TRowTraits<TSkiffRowOneOf<TSkiffRowTypes...>>
- {
- using TRowType = TSkiffRowOneOf<TSkiffRowTypes...>;
- using IReaderImpl = ISkiffRowReaderImpl;
- };
- template <class... TProtoRowTypes>
- struct TRowTraits<TProtoOneOf<TProtoRowTypes...>>
- {
- using TRowType = TProtoOneOf<TProtoRowTypes...>;
- using IReaderImpl = IProtoReaderImpl;
- using IWriterImpl = IProtoWriterImpl;
- };
- ////////////////////////////////////////////////////////////////////////////////
- struct IReaderImplBase
- : public TThrRefBase
- {
- virtual bool IsValid() const = 0;
- virtual void Next() = 0;
- virtual ui32 GetTableIndex() const = 0;
- virtual ui32 GetRangeIndex() const = 0;
- virtual ui64 GetRowIndex() const = 0;
- virtual void NextKey() = 0;
- // Not pure virtual because of clients that has already implemented this interface.
- virtual TMaybe<size_t> GetReadByteCount() const;
- virtual i64 GetTabletIndex() const;
- virtual bool IsEndOfStream() const;
- virtual bool IsRawReaderExhausted() const;
- };
- struct INodeReaderImpl
- : public IReaderImplBase
- {
- virtual const TNode& GetRow() const = 0;
- virtual void MoveRow(TNode* row) = 0;
- };
- struct IYaMRReaderImpl
- : public IReaderImplBase
- {
- virtual const TYaMRRow& GetRow() const = 0;
- virtual void MoveRow(TYaMRRow* row)
- {
- *row = GetRow();
- }
- };
- struct IProtoReaderImpl
- : public IReaderImplBase
- {
- virtual void ReadRow(Message* row) = 0;
- };
- struct ISkiffRowReaderImpl
- : public IReaderImplBase
- {
- virtual void ReadRow(const ISkiffRowParserPtr& parser) = 0;
- };
- ////////////////////////////////////////////////////////////////////////////////
- namespace NDetail {
- ////////////////////////////////////////////////////////////////////////////////
- // We don't include <yt/cpp/mapreduce/interface/logging/yt_log.h> in this file
- // to avoid macro name clashes (specifically YT_LOG_DEBUG)
- void LogTableReaderStatistics(ui64 rowCount, TMaybe<size_t> byteCount);
- template <class T>
- class TTableReaderBase
- : public TThrRefBase
- {
- public:
- using TRowType = typename TRowTraits<T>::TRowType;
- using IReaderImpl = typename TRowTraits<T>::IReaderImpl;
- explicit TTableReaderBase(::TIntrusivePtr<IReaderImpl> reader)
- : Reader_(reader)
- { }
- ~TTableReaderBase() override
- {
- NDetail::LogTableReaderStatistics(ReadRowCount_, Reader_->GetReadByteCount());
- }
- bool IsValid() const
- {
- return Reader_->IsValid();
- }
- void Next()
- {
- Reader_->Next();
- ++ReadRowCount_;
- RowState_ = ERowState::None;
- }
- bool IsEndOfStream()
- {
- return Reader_->IsEndOfStream();
- }
- bool IsRawReaderExhausted()
- {
- return Reader_->IsRawReaderExhausted();
- }
- ui32 GetTableIndex() const
- {
- return Reader_->GetTableIndex();
- }
- ui32 GetRangeIndex() const
- {
- return Reader_->GetRangeIndex();
- }
- ui64 GetRowIndex() const
- {
- return Reader_->GetRowIndex();
- }
- i64 GetTabletIndex() const
- {
- return Reader_->GetTabletIndex();
- }
- protected:
- template <typename TCacher, typename TCacheGetter>
- const auto& DoGetRowCached(TCacher cacher, TCacheGetter cacheGetter) const
- {
- switch (RowState_) {
- case ERowState::None:
- cacher();
- RowState_ = ERowState::Cached;
- break;
- case ERowState::Cached:
- break;
- case ERowState::MovedOut:
- ythrow yexception() << "Row is already moved";
- }
- return *cacheGetter();
- }
- template <typename U, typename TMover, typename TCacheMover>
- void DoMoveRowCached(U* result, TMover mover, TCacheMover cacheMover)
- {
- Y_ABORT_UNLESS(result);
- switch (RowState_) {
- case ERowState::None:
- mover(result);
- break;
- case ERowState::Cached:
- cacheMover(result);
- break;
- case ERowState::MovedOut:
- ythrow yexception() << "Row is already moved";
- }
- RowState_ = ERowState::MovedOut;
- }
- private:
- enum class ERowState
- {
- None,
- Cached,
- MovedOut,
- };
- protected:
- ::TIntrusivePtr<IReaderImpl> Reader_;
- private:
- ui64 ReadRowCount_ = 0;
- mutable ERowState RowState_ = ERowState::None;
- };
- template <class T>
- class TSimpleTableReader
- : public TTableReaderBase<T>
- {
- public:
- using TBase = TTableReaderBase<T>;
- using typename TBase::TRowType;
- using TBase::TBase;
- const TRowType& GetRow() const
- {
- // Caching is implemented in underlying reader.
- return TBase::DoGetRowCached(
- /* cacher */ [&] {},
- /* cacheGetter */ [&] {
- return &Reader_->GetRow();
- });
- }
- void MoveRow(TRowType* result)
- {
- // Caching is implemented in underlying reader.
- TBase::DoMoveRowCached(
- result,
- /* mover */ [&] (TRowType* result) {
- Reader_->MoveRow(result);
- },
- /* cacheMover */ [&] (TRowType* result) {
- Reader_->MoveRow(result);
- });
- }
- TRowType MoveRow()
- {
- TRowType result;
- MoveRow(&result);
- return result;
- }
- private:
- using TBase::Reader_;
- };
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NDetail
- template <>
- class TTableReader<TNode>
- : public NDetail::TSimpleTableReader<TNode>
- {
- using TSimpleTableReader<TNode>::TSimpleTableReader;
- };
- template <>
- class TTableReader<TYaMRRow>
- : public NDetail::TSimpleTableReader<TYaMRRow>
- {
- using TSimpleTableReader<TYaMRRow>::TSimpleTableReader;
- };
- template <>
- class TTableReader<Message>
- : public NDetail::TTableReaderBase<Message>
- {
- public:
- using TBase = NDetail::TTableReaderBase<Message>;
- using TBase::TBase;
- template <class U>
- const U& GetRow() const
- {
- static_assert(TIsBaseOf<Message, U>::Value);
- return TBase::DoGetRowCached(
- /* cacher */ [&] {
- CachedRow_.reset(new U);
- Reader_->ReadRow(CachedRow_.get());
- },
- /* cacheGetter */ [&] {
- auto result = dynamic_cast<const U*>(CachedRow_.get());
- Y_ABORT_UNLESS(result);
- return result;
- });
- }
- template <class U>
- void MoveRow(U* result)
- {
- static_assert(TIsBaseOf<Message, U>::Value);
- TBase::DoMoveRowCached(
- result,
- /* mover */ [&] (U* result) {
- Reader_->ReadRow(result);
- },
- /* cacheMover */ [&] (U* result) {
- auto cast = dynamic_cast<U*>(CachedRow_.get());
- Y_ABORT_UNLESS(cast);
- result->Swap(cast);
- });
- }
- template <class U>
- U MoveRow()
- {
- static_assert(TIsBaseOf<Message, U>::Value);
- U result;
- MoveRow(&result);
- return result;
- }
- ::TIntrusivePtr<IProtoReaderImpl> GetReaderImpl() const
- {
- return Reader_;
- }
- private:
- using TBase::Reader_;
- mutable std::unique_ptr<Message> CachedRow_;
- };
- template<class... TProtoRowTypes>
- class TTableReader<TProtoOneOf<TProtoRowTypes...>>
- : public NDetail::TTableReaderBase<TProtoOneOf<TProtoRowTypes...>>
- {
- public:
- using TBase = NDetail::TTableReaderBase<TProtoOneOf<TProtoRowTypes...>>;
- using TBase::TBase;
- template <class U>
- const U& GetRow() const
- {
- AssertIsOneOf<U>();
- return TBase::DoGetRowCached(
- /* cacher */ [&] {
- Reader_->ReadRow(&std::get<U>(CachedRows_));
- CachedIndex_ = NDetail::TIndexInTuple<U, decltype(CachedRows_)>::Value;
- },
- /* cacheGetter */ [&] {
- return &std::get<U>(CachedRows_);
- });
- }
- template <class U>
- void MoveRow(U* result)
- {
- AssertIsOneOf<U>();
- return TBase::DoMoveRowCached(
- result,
- /* mover */ [&] (U* result) {
- Reader_->ReadRow(result);
- },
- /* cacheMover */ [&] (U* result) {
- Y_ABORT_UNLESS((NDetail::TIndexInTuple<U, decltype(CachedRows_)>::Value) == CachedIndex_);
- *result = std::move(std::get<U>(CachedRows_));
- });
- }
- template <class U>
- U MoveRow()
- {
- U result;
- MoveRow(&result);
- return result;
- }
- ::TIntrusivePtr<IProtoReaderImpl> GetReaderImpl() const
- {
- return Reader_;
- }
- private:
- using TBase::Reader_;
- // std::variant could also be used here, but std::tuple leads to better performance
- // because of deallocations that std::variant has to do
- mutable std::tuple<TProtoRowTypes...> CachedRows_;
- mutable int CachedIndex_;
- template <class U>
- static constexpr void AssertIsOneOf()
- {
- static_assert(
- (std::is_same<U, TProtoRowTypes>::value || ...),
- "Template parameter must be one of TProtoOneOf template parameter");
- }
- };
- template <class T>
- class TTableReader<T, std::enable_if_t<TIsBaseOf<Message, T>::Value>>
- : public TTableReader<TProtoOneOf<T>>
- {
- public:
- using TRowType = T;
- using TBase = TTableReader<TProtoOneOf<T>>;
- using TBase::TBase;
- const T& GetRow() const
- {
- return TBase::template GetRow<T>();
- }
- void MoveRow(T* result)
- {
- TBase::template MoveRow<T>(result);
- }
- T MoveRow()
- {
- return TBase::template MoveRow<T>();
- }
- };
- template<class... TSkiffRowTypes>
- class TTableReader<TSkiffRowOneOf<TSkiffRowTypes...>>
- : public NDetail::TTableReaderBase<TSkiffRowOneOf<TSkiffRowTypes...>>
- {
- public:
- using TBase = NDetail::TTableReaderBase<TSkiffRowOneOf<TSkiffRowTypes...>>;
- using TBase::TBase;
- explicit TTableReader(::TIntrusivePtr<typename TBase::IReaderImpl> reader, const TMaybe<TSkiffRowHints>& hints)
- : TBase(reader)
- , Parsers_({(CreateSkiffParser<TSkiffRowTypes>(&std::get<TSkiffRowTypes>(CachedRows_), hints))...})
- { }
- template <class U>
- const U& GetRow() const
- {
- AssertIsOneOf<U>();
- return TBase::DoGetRowCached(
- /* cacher */ [&] {
- auto index = NDetail::TIndexInTuple<U, decltype(CachedRows_)>::Value;
- Reader_->ReadRow(Parsers_[index]);
- CachedIndex_ = index;
- },
- /* cacheGetter */ [&] {
- return &std::get<U>(CachedRows_);
- });
- }
- template <class U>
- void MoveRow(U* result)
- {
- AssertIsOneOf<U>();
- return TBase::DoMoveRowCached(
- result,
- /* mover */ [&] (U* result) {
- auto index = NDetail::TIndexInTuple<U, decltype(CachedRows_)>::Value;
- Reader_->ReadRow(Parsers_[index]);
- *result = std::move(std::get<U>(CachedRows_));
- },
- /* cacheMover */ [&] (U* result) {
- Y_ABORT_UNLESS((NDetail::TIndexInTuple<U, decltype(CachedRows_)>::Value) == CachedIndex_);
- *result = std::move(std::get<U>(CachedRows_));
- });
- }
- template <class U>
- U MoveRow()
- {
- U result;
- MoveRow(&result);
- return result;
- }
- ::TIntrusivePtr<ISkiffRowReaderImpl> GetReaderImpl() const
- {
- return Reader_;
- }
- private:
- using TBase::Reader_;
- // std::variant could also be used here, but std::tuple leads to better performance
- // because of deallocations that std::variant has to do
- mutable std::tuple<TSkiffRowTypes...> CachedRows_;
- mutable std::vector<ISkiffRowParserPtr> Parsers_;
- mutable int CachedIndex_;
- template <class U>
- static constexpr void AssertIsOneOf()
- {
- static_assert(
- (std::is_same<U, TSkiffRowTypes>::value || ...),
- "Template parameter must be one of TSkiffRowOneOf template parameter");
- }
- };
- template <class T>
- class TTableReader<T, std::enable_if_t<TIsSkiffRow<T>::value>>
- : public TTableReader<TSkiffRowOneOf<T>>
- {
- public:
- using TRowType = T;
- using TBase = TTableReader<TSkiffRowOneOf<T>>;
- using TBase::TBase;
- const T& GetRow()
- {
- return TBase::template GetRow<T>();
- }
- void MoveRow(T* result)
- {
- TBase::template MoveRow<T>(result);
- }
- T MoveRow()
- {
- return TBase::template MoveRow<T>();
- }
- };
- template <>
- inline TTableReaderPtr<TNode> IIOClient::CreateTableReader<TNode>(
- const TRichYPath& path, const TTableReaderOptions& options)
- {
- return new TTableReader<TNode>(CreateNodeReader(path, options));
- }
- template <>
- inline TTableReaderPtr<TYaMRRow> IIOClient::CreateTableReader<TYaMRRow>(
- const TRichYPath& path, const TTableReaderOptions& options)
- {
- return new TTableReader<TYaMRRow>(CreateYaMRReader(path, options));
- }
- template <class T, class = std::enable_if_t<TIsBaseOf<Message, T>::Value>>
- struct TReaderCreator
- {
- static TTableReaderPtr<T> Create(::TIntrusivePtr<IProtoReaderImpl> reader)
- {
- return new TTableReader<T>(reader);
- }
- };
- template <class T>
- inline TTableReaderPtr<T> IIOClient::CreateTableReader(
- const TRichYPath& path, const TTableReaderOptions& options)
- {
- if constexpr (TIsBaseOf<Message, T>::Value) {
- TAutoPtr<T> prototype(new T);
- return new TTableReader<T>(CreateProtoReader(path, options, prototype.Get()));
- } else if constexpr (TIsSkiffRow<T>::value) {
- const auto& hints = options.FormatHints_ ? options.FormatHints_->SkiffRowHints_ : Nothing();
- auto schema = GetSkiffSchema<T>(hints);
- auto skipper = CreateSkiffSkipper<T>(hints);
- return new TTableReader<T>(CreateSkiffRowReader(path, options, skipper, schema), hints);
- } else {
- static_assert(TDependentFalse<T>, "Unsupported type for table reader");
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- TTableReaderPtr<T> CreateTableReader(
- IInputStream* stream,
- const TTableReaderOptions& options)
- {
- return TReaderCreator<T>::Create(NDetail::CreateProtoReader(stream, options, T::descriptor()));
- }
- template <class... Ts>
- TTableReaderPtr<typename NDetail::TProtoOneOfUnique<Ts...>::TType> CreateProtoMultiTableReader(
- IInputStream* stream,
- const TTableReaderOptions& options)
- {
- return new TTableReader<typename NDetail::TProtoOneOfUnique<Ts...>::TType>(
- NDetail::CreateProtoReader(stream, options, {Ts::descriptor()...}));
- }
- template <class T>
- TTableReaderPtr<T> CreateProtoMultiTableReader(
- IInputStream* stream,
- int tableCount,
- const TTableReaderOptions& options)
- {
- static_assert(TIsBaseOf<::google::protobuf::Message, T>::Value);
- TVector<const ::google::protobuf::Descriptor*> descriptors(tableCount, T::descriptor());
- return new TTableReader<T>(NDetail::CreateProtoReader(stream, options, std::move(descriptors)));
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <class T>
- class TTableRangesReader<T>
- : public TThrRefBase
- {
- public:
- using TRowType = T;
- private:
- using TReaderImpl = typename TRowTraits<TRowType>::IReaderImpl;
- public:
- TTableRangesReader(::TIntrusivePtr<TReaderImpl> readerImpl)
- : ReaderImpl_(readerImpl)
- , Reader_(MakeIntrusive<TTableReader<TRowType>>(readerImpl))
- , IsValid_(Reader_->IsValid())
- { }
- TTableReader<T>& GetRange()
- {
- return *Reader_;
- }
- bool IsValid() const
- {
- return IsValid_;
- }
- void Next()
- {
- ReaderImpl_->NextKey();
- if ((IsValid_ = Reader_->IsValid())) {
- Reader_->Next();
- }
- }
- private:
- ::TIntrusivePtr<TReaderImpl> ReaderImpl_;
- ::TIntrusivePtr<TTableReader<TRowType>> Reader_;
- bool IsValid_;
- };
- ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- struct IWriterImplBase
- : public TThrRefBase
- {
- virtual void AddRow(const T& row, size_t tableIndex) = 0;
- virtual void AddRow(const T& row, size_t tableIndex, size_t /*rowWeight*/)
- {
- AddRow(row, tableIndex);
- }
- virtual void AddRow(T&& row, size_t tableIndex) = 0;
- virtual void AddRow(T&& row, size_t tableIndex, size_t /*rowWeight*/)
- {
- AddRow(std::move(row), tableIndex);
- }
- virtual void AddRowBatch(const TVector<T>& rowBatch, size_t tableIndex, size_t rowBatchWeight = 0)
- {
- for (const auto& row : rowBatch) {
- AddRow(row, tableIndex, rowBatchWeight / rowBatch.size());
- }
- }
- virtual void AddRowBatch(TVector<T>&& rowBatch, size_t tableIndex, size_t rowBatchWeight = 0)
- {
- auto rowBatchSize = rowBatch.size();
- for (auto&& row : std::move(rowBatch)) {
- AddRow(std::move(row), tableIndex, rowBatchWeight / rowBatchSize);
- }
- }
- virtual size_t GetBufferMemoryUsage() const
- {
- return 0;
- }
- virtual size_t GetTableCount() const = 0;
- virtual void FinishTable(size_t tableIndex) = 0;
- virtual void Abort()
- { }
- };
- struct INodeWriterImpl
- : public IWriterImplBase<TNode>
- {
- };
- struct IYaMRWriterImpl
- : public IWriterImplBase<TYaMRRow>
- {
- };
- struct IProtoWriterImpl
- : public IWriterImplBase<Message>
- {
- };
- ////////////////////////////////////////////////////////////////////////////////
- template <class T>
- class TTableWriterBase
- : public TThrRefBase
- {
- public:
- using TRowType = T;
- using IWriterImpl = typename TRowTraits<T>::IWriterImpl;
- explicit TTableWriterBase(::TIntrusivePtr<IWriterImpl> writer)
- : Writer_(writer)
- , Locks_(MakeAtomicShared<TVector<TAdaptiveLock>>(writer->GetTableCount()))
- { }
- void Abort()
- {
- Writer_->Abort();
- }
- void AddRow(const T& row, size_t tableIndex = 0, size_t rowWeight = 0)
- {
- DoAddRow<T>(row, tableIndex, rowWeight);
- }
- void AddRow(T&& row, size_t tableIndex = 0, size_t rowWeight = 0)
- {
- DoAddRow<T>(std::move(row), tableIndex, rowWeight);
- }
- void AddRowBatch(const TVector<T>& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0)
- {
- DoAddRowBatch<T>(rowBatch, tableIndex, rowBatchWeight);
- }
- void AddRowBatch(TVector<T>&& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0)
- {
- DoAddRowBatch<T>(std::move(rowBatch), tableIndex, rowBatchWeight);
- }
- void Finish()
- {
- for (size_t i = 0; i < Locks_->size(); ++i) {
- auto guard = Guard((*Locks_)[i]);
- Writer_->FinishTable(i);
- }
- }
- size_t GetBufferMemoryUsage() const
- {
- return DoGetBufferMemoryUsage();
- }
- protected:
- template <class U>
- void DoAddRow(const U& row, size_t tableIndex = 0, size_t rowWeight = 0)
- {
- if (tableIndex >= Locks_->size()) {
- ythrow TIOException() <<
- "Table index " << tableIndex <<
- " is out of range [0, " << Locks_->size() << ")";
- }
- auto guard = Guard((*Locks_)[tableIndex]);
- Writer_->AddRow(row, tableIndex, rowWeight);
- }
- template <class U>
- void DoAddRow(U&& row, size_t tableIndex = 0, size_t rowWeight = 0)
- {
- if (tableIndex >= Locks_->size()) {
- ythrow TIOException() <<
- "Table index " << tableIndex <<
- " is out of range [0, " << Locks_->size() << ")";
- }
- auto guard = Guard((*Locks_)[tableIndex]);
- Writer_->AddRow(std::move(row), tableIndex, rowWeight);
- }
- template <class U>
- void DoAddRowBatch(const TVector<U>& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0)
- {
- if (tableIndex >= Locks_->size()) {
- ythrow TIOException() <<
- "Table index " << tableIndex <<
- " is out of range [0, " << Locks_->size() << ")";
- }
- auto guard = Guard((*Locks_)[tableIndex]);
- Writer_->AddRowBatch(rowBatch, tableIndex, rowBatchWeight);
- }
- template <class U>
- void DoAddRowBatch(TVector<U>&& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0)
- {
- if (tableIndex >= Locks_->size()) {
- ythrow TIOException() <<
- "Table index " << tableIndex <<
- " is out of range [0, " << Locks_->size() << ")";
- }
- auto guard = Guard((*Locks_)[tableIndex]);
- Writer_->AddRowBatch(std::move(rowBatch), tableIndex, rowBatchWeight);
- }
- size_t DoGetBufferMemoryUsage() const
- {
- return Writer_->GetBufferMemoryUsage();
- }
- ::TIntrusivePtr<IWriterImpl> GetWriterImpl()
- {
- return Writer_;
- }
- private:
- ::TIntrusivePtr<IWriterImpl> Writer_;
- TAtomicSharedPtr<TVector<TAdaptiveLock>> Locks_;
- };
- template <>
- class TTableWriter<TNode>
- : public TTableWriterBase<TNode>
- {
- public:
- using TBase = TTableWriterBase<TNode>;
- explicit TTableWriter(::TIntrusivePtr<IWriterImpl> writer)
- : TBase(writer)
- { }
- };
- template <>
- class TTableWriter<TYaMRRow>
- : public TTableWriterBase<TYaMRRow>
- {
- public:
- using TBase = TTableWriterBase<TYaMRRow>;
- explicit TTableWriter(::TIntrusivePtr<IWriterImpl> writer)
- : TBase(writer)
- { }
- };
- template <>
- class TTableWriter<Message>
- : public TTableWriterBase<Message>
- {
- public:
- using TBase = TTableWriterBase<Message>;
- explicit TTableWriter(::TIntrusivePtr<IWriterImpl> writer)
- : TBase(writer)
- { }
- template <class U, std::enable_if_t<std::is_base_of<Message, U>::value>* = nullptr>
- void AddRow(const U& row, size_t tableIndex = 0, size_t rowWeight = 0)
- {
- TBase::AddRow(row, tableIndex, rowWeight);
- }
- template <class U, std::enable_if_t<std::is_base_of<Message, U>::value>* = nullptr>
- void AddRowBatch(const TVector<U>& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0)
- {
- for (const auto& row : rowBatch) {
- AddRow(row, tableIndex, rowBatchWeight / rowBatch.size());
- }
- }
- };
- template <class T>
- class TTableWriter<T, std::enable_if_t<TIsBaseOf<Message, T>::Value>>
- : public TTableWriter<Message>
- {
- public:
- using TRowType = T;
- using TBase = TTableWriter<Message>;
- explicit TTableWriter(::TIntrusivePtr<IWriterImpl> writer)
- : TBase(writer)
- { }
- void AddRow(const T& row, size_t tableIndex = 0, size_t rowWeight = 0)
- {
- TBase::AddRow<T>(row, tableIndex, rowWeight);
- }
- void AddRowBatch(const TVector<T>& rowBatch, size_t tableIndex = 0, size_t rowBatchWeight = 0)
- {
- TBase::AddRowBatch<T>(rowBatch, tableIndex, rowBatchWeight);
- }
- };
- template <>
- inline TTableWriterPtr<TNode> IIOClient::CreateTableWriter<TNode>(
- const TRichYPath& path, const TTableWriterOptions& options)
- {
- return new TTableWriter<TNode>(CreateNodeWriter(path, options));
- }
- template <>
- inline TTableWriterPtr<TYaMRRow> IIOClient::CreateTableWriter<TYaMRRow>(
- const TRichYPath& path, const TTableWriterOptions& options)
- {
- return new TTableWriter<TYaMRRow>(CreateYaMRWriter(path, options));
- }
- template <class T>
- inline TTableWriterPtr<T> IIOClient::CreateTableWriter(
- const TRichYPath& path, const TTableWriterOptions& options)
- {
- if constexpr (TIsBaseOf<Message, T>::Value) {
- TAutoPtr<T> prototype(new T);
- return new TTableWriter<T>(CreateProtoWriter(path, options, prototype.Get()));
- } else {
- static_assert(TDependentFalse<T>, "Unsupported type for table writer");
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- TTableReaderPtr<T> CreateConcreteProtobufReader(TTableReader<Message>* reader)
- {
- static_assert(std::is_base_of_v<Message, T>, "T must be a protobuf type (either Message or its descendant)");
- Y_ENSURE(reader, "reader must be non-null");
- return ::MakeIntrusive<TTableReader<T>>(reader->GetReaderImpl());
- }
- template <typename T>
- TTableReaderPtr<T> CreateConcreteProtobufReader(const TTableReaderPtr<Message>& reader)
- {
- Y_ENSURE(reader, "reader must be non-null");
- return CreateConcreteProtobufReader<T>(reader.Get());
- }
- template <typename T>
- TTableReaderPtr<Message> CreateGenericProtobufReader(TTableReader<T>* reader)
- {
- static_assert(std::is_base_of_v<Message, T>, "T must be a protobuf type (either Message or its descendant)");
- Y_ENSURE(reader, "reader must be non-null");
- return ::MakeIntrusive<TTableReader<Message>>(reader->GetReaderImpl());
- }
- template <typename T>
- TTableReaderPtr<Message> CreateGenericProtobufReader(const TTableReaderPtr<T>& reader)
- {
- Y_ENSURE(reader, "reader must be non-null");
- return CreateGenericProtobufReader(reader.Get());
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT
|