123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598 |
- #pragma once
- ///
- /// @file yt/cpp/mapreduce/interface/io.h
- ///
- /// Header containing client interface for reading and writing tables and files.
- #include "fwd.h"
- #include "client_method_options.h"
- #include "common.h"
- #include "format.h"
- #include "node.h"
- #include "mpl.h"
- #include "skiff_row.h"
- #include <google/protobuf/message.h>
- #include <util/stream/input.h>
- #include <util/stream/output.h>
- #include <util/generic/yexception.h>
- #include <util/generic/maybe.h>
- namespace NYT {
- ////////////////////////////////////////////////////////////////////////////////
- ///
- /// @brief "Marker" type to use for several protobuf types in @ref NYT::TTableReader.
- ///
- /// @tparam Ts Possible types of rows to be read.
- template<class... TProtoRowTypes>
- class TProtoOneOf
- {
- public:
- static_assert(
- (TIsBaseOf<::google::protobuf::Message, TProtoRowTypes>::Value && ...),
- "Template parameters can only be protobuf types");
- TProtoOneOf() = delete;
- };
- ///
- /// @brief "Marker" type to use for several skiff row types in @ref NYT::TTableReader.
- ///
- /// @tparam Ts Possible types of rows to be read.
- template<class... TSkiffRowTypes>
- class TSkiffRowOneOf
- {
- public:
- static_assert(
- (TIsSkiffRow<TSkiffRowTypes>::value && ...),
- "Template parameters can only be SkiffRow types");
- TSkiffRowOneOf() = delete;
- };
- ////////////////////////////////////////////////////////////////////////////////
- /// @cond Doxygen_Suppress
- namespace NDetail {
- ////////////////////////////////////////////////////////////////////////////////
- template <class TTuple>
- struct TProtoOneOfFromTuple;
- template <class... Ts>
- struct TProtoOneOfFromTuple<std::tuple<Ts...>>
- {
- using TType = TProtoOneOf<Ts...>;
- };
- template <class... Ts>
- struct TProtoOneOfUnique
- {
- using TTuple = typename TUniqueTypes<std::tuple<>, std::tuple<Ts...>>::TType;
- using TType = typename TProtoOneOfFromTuple<TTuple>::TType;
- };
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NDetail
- /// @endcond
- ////////////////////////////////////////////////////////////////////////////////
- struct INodeReaderImpl;
- struct IYaMRReaderImpl;
- struct IProtoReaderImpl;
- struct ISkiffRowReaderImpl;
- struct INodeWriterImpl;
- struct IYaMRWriterImpl;
- struct IProtoWriterImpl;
- ////////////////////////////////////////////////////////////////////////////////
- /// Class of exceptions connected to reading or writing tables or files.
- class TIOException
- : public yexception
- { };
- ///////////////////////////////////////////////////////////////////////////////
- /// Interface representing YT file reader.
- class IFileReader
- : public TThrRefBase
- , public IInputStream
- { };
- /// Interface representing YT file writer.
- class IFileWriter
- : public TThrRefBase
- , public IOutputStream
- {
- public:
- virtual size_t GetBufferMemoryUsage() const
- {
- return 0;
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- /// Low-level interface to read YT table with retries.
- class TRawTableReader
- : public TThrRefBase
- , public IInputStream
- {
- public:
- /// @brief Retry table read starting from the specified `rangeIndex` and `rowIndex`.
- ///
- /// @param rangeIndex Index of first range to read
- /// @param rowIndex Index of first row to read; if `rowIndex == Nothing` entire request will be retried.
- ///
- /// @return `true` on successful request retry, `false` if no retry attempts are left (then `Retry()` shouldn't be called any more).
- ///
- /// `rowIndex` must be inside the range with index `rangeIndex` if the latter is specified.
- ///
- /// After successful retry the user should reset `rangeIndex` / `rowIndex` values and read new ones
- /// from the stream.
- virtual bool Retry(
- const TMaybe<ui32>& rangeIndex,
- const TMaybe<ui64>& rowIndex) = 0;
- /// Resets retry attempt count to the initial value (then `Retry()` can be called again).
- virtual void ResetRetries() = 0;
- /// @brief May the input stream contain table ranges?
- ///
- /// In the case when it is `true` the `TRawTableReader` user is responsible
- /// to track active range index in order to pass it to Retry().
- virtual bool HasRangeIndices() const = 0;
- };
- /// @brief Low-level interface to write YT table.
- ///
- /// Retries must be handled by implementation.
- class TRawTableWriter
- : public TThrRefBase
- , public IOutputStream
- {
- public:
- /// @brief Call this method after complete row representation is written to the stream.
- ///
- /// When this method is called `TRowTableWriter` can check its buffer
- /// and if it is full send data to YT.
- /// @note `TRawTableWriter` never sends partial records to YT (due to retries).
- virtual void NotifyRowEnd() = 0;
- /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers).
- ///
- /// By default it does nothing, but implementations are welcome to override this method.
- virtual void Abort()
- { }
- virtual size_t GetBufferMemoryUsage() const
- {
- return 0;
- }
- };
- /// @brief Interface to deal with multiple raw output streams.
- class IProxyOutput
- {
- public:
- virtual ~IProxyOutput()
- { }
- /// Get amount of managed streams.
- virtual size_t GetStreamCount() const = 0;
- /// Get stream corresponding to the specified table index.
- virtual IOutputStream* GetStream(size_t tableIndex) const = 0;
- /// This handler must be called right after the next row has been written.
- virtual void OnRowFinished(size_t tableIndex) = 0;
- /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers).
- ///
- /// By default it does nothing, but implementations are welcome to override this method.
- virtual void Abort()
- { }
- virtual size_t GetBufferMemoryUsage() const
- {
- return 0;
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- /// @brief Class template to read typed rows from YT tables.
- ///
- /// @tparam T Row type.
- ///
- /// Correct usage of this class usually looks like
- /// ```
- /// for (const auto& cursor : *reader) {
- /// const auto& row = cursor.GetRow();
- /// ...
- /// }
- /// ```
- /// or, more verbosely,
- /// ```
- /// for (; reader->IsValid(); reader->Next()) {
- /// const auto& row = reader->GetRow();
- /// ...
- /// }
- /// ```
- ///
- /// @note Actual (partial) specializations of this template may look a bit different,
- /// e.g. @ref NYT::TTableReader::GetRow, @ref NYT::TTableReader::MoveRow may be method templates.
- template <class T, class>
- class TTableReader
- : public TThrRefBase
- {
- public:
- /// Get current row.
- const T& GetRow() const;
- /// Extract current row; further calls to `GetRow` and `MoveRow` will fail.
- T MoveRow();
- /// Extract current row to `result`; further calls to `GetRow` and `MoveRow` will fail.
- void MoveRow(T* result);
- /// Check whether all the rows were read.
- bool IsValid() const;
- /// Move the cursor to the next row.
- void Next();
- /// Get table index of the current row.
- ui32 GetTableIndex() const;
- /// Get range index of the current row (zero if it is unknown or read request contains no ranges)
- ui32 GetRangeIndex() const;
- /// Get current row index (zero if it unknown).
- ui64 GetRowIndex() const;
- /// Get current tablet index (for ordered dynamic tables).
- i64 GetTabletIndex() const;
- /// Returns `true` if job consumed all the input and `false` otherwise.
- bool IsEndOfStream() const;
- /// Returns `true` if job raw input stream was closed and `false` otherwise.
- bool IsRawReaderExhausted() const;
- };
- /// @brief Iterator for use in range-based-for.
- ///
- /// @note Idiomatic usage:
- /// ```
- /// for (const auto& cursor : *reader) {
- /// const auto& row = cursor.GetRow();
- /// ...
- /// }
- /// ```
- template <class T>
- class TTableReaderIterator
- {
- public:
- /// Construct iterator from table reader (can be `nullptr`).
- explicit TTableReaderIterator<T>(TTableReader<T>* reader)
- {
- if (reader && reader->IsValid()) {
- Reader_ = reader;
- } else {
- Reader_ = nullptr;
- }
- }
- /// Equality operator.
- bool operator==(const TTableReaderIterator& it) const
- {
- return Reader_ == it.Reader_;
- }
- /// Dereference operator.
- TTableReader<T>& operator*()
- {
- return *Reader_;
- }
- /// Const dereference operator.
- const TTableReader<T>& operator*() const
- {
- return *Reader_;
- }
- /// Preincrement operator.
- TTableReaderIterator& operator++()
- {
- Reader_->Next();
- if (!Reader_->IsValid()) {
- Reader_ = nullptr;
- }
- return *this;
- }
- private:
- TTableReader<T>* Reader_;
- };
- /// @brief Function to facilitate range-based-for for @ref NYT::TTableReader.
- ///
- /// @see @ref NYT::TTableReaderIterator
- template <class T>
- TTableReaderIterator<T> begin(TTableReader<T>& reader)
- {
- return TTableReaderIterator<T>(&reader);
- }
- /// @brief Function to facilitate range-based-for for @ref NYT::TTableReader.
- ///
- /// @see @ref NYT::TTableReaderIterator
- template <class T>
- TTableReaderIterator<T> end(TTableReader<T>&)
- {
- return TTableReaderIterator<T>(nullptr);
- }
- ////////////////////////////////////////////////////////////////////////////////
- /// @brief Class to facilitate reading table rows sorted by key.
- ///
- /// Each reader returned from @ref NYT::TTableRangesReader::GetRange represents
- /// a range of rows with the same key.
- ///
- /// @note Idiomatic usage:
- /// ```
- /// for (; reader->IsValid(); reader->Next()) {
- /// auto& rangeReader = reader->GetRange();
- /// ...
- /// }
- /// ```
- template <class T, class>
- class TTableRangesReader
- : public TThrRefBase
- {
- public:
- /// Get reader for rows with the same key.
- TTableReader<T>& GetRange();
- /// Check whether all rows are read.
- bool IsValid() const;
- /// Move cursor to the next range.
- void Next();
- };
- ////////////////////////////////////////////////////////////////////////////////
- /// Class template to write typed rows to YT tables.
- template <class T, class>
- class TTableWriter
- : public TThrRefBase
- {
- public:
- /// @brief Submit a row for writing.
- ///
- /// The row may (and very probably will) *not* be written immediately.
- void AddRow(const T& row);
- /// Stop writing data as soon as possible (without flushing data, e.g. before aborting parent transaction).
- void Finish();
- size_t GetBufferMemoryUsage() const;
- };
- ////////////////////////////////////////////////////////////////////////////////
- /// @brief Type representing YaMR table row.
- ///
- /// @deprecated
- struct TYaMRRow
- {
- /// Key column.
- TStringBuf Key;
- /// Subkey column.
- TStringBuf SubKey;
- /// Value column.
- TStringBuf Value;
- };
- ////////////////////////////////////////////////////////////////////////////////
- /// Interface for creating table and file readers and writer.
- class IIOClient
- {
- public:
- virtual ~IIOClient() = default;
- /// Create a reader for file at `path`.
- virtual IFileReaderPtr CreateFileReader(
- const TRichYPath& path,
- const TFileReaderOptions& options = TFileReaderOptions()) = 0;
- /// Create a writer for file at `path`.
- virtual IFileWriterPtr CreateFileWriter(
- const TRichYPath& path,
- const TFileWriterOptions& options = TFileWriterOptions()) = 0;
- /// Create a typed reader for table at `path`.
- template <class T>
- TTableReaderPtr<T> CreateTableReader(
- const TRichYPath& path,
- const TTableReaderOptions& options = TTableReaderOptions());
- /// Create a typed writer for table at `path`.
- template <class T>
- TTableWriterPtr<T> CreateTableWriter(
- const TRichYPath& path,
- const TTableWriterOptions& options = TTableWriterOptions());
- /// Create a writer to write protobuf messages with specified descriptor.
- virtual TTableWriterPtr<::google::protobuf::Message> CreateTableWriter(
- const TRichYPath& path,
- const ::google::protobuf::Descriptor& descriptor,
- const TTableWriterOptions& options = TTableWriterOptions()) = 0;
- /// Create a reader to read a table using specified format.
- virtual TRawTableReaderPtr CreateRawReader(
- const TRichYPath& path,
- const TFormat& format,
- const TTableReaderOptions& options = TTableReaderOptions()) = 0;
- /// Create a reader to write a table using specified format.
- virtual TRawTableWriterPtr CreateRawWriter(
- const TRichYPath& path,
- const TFormat& format,
- const TTableWriterOptions& options = TTableWriterOptions()) = 0;
- ///
- /// @brief Create a reader for [blob table](https://docs.yandex-team.ru/docs/yt/description/storage/blobtables) at `path`.
- ///
- /// @param path Blob table path.
- /// @param blobId Key identifying the blob.
- /// @param options Optional parameters
- ///
- /// Blob table is a table that stores a number of blobs.
- /// Blobs are sliced into parts of the same size (maybe except of last part).
- /// Those parts are stored in the separate rows.
- ///
- /// Blob table have constraints on its schema.
- /// - There must be columns that identify blob (blob id columns). That columns might be of any type.
- /// - There must be a column of `int64` type that identify part inside the blob (this column is called `part index`).
- /// - There must be a column of `string` type that stores actual data (this column is called `data column`).
- virtual IFileReaderPtr CreateBlobTableReader(
- const TYPath& path,
- const TKey& blobId,
- const TBlobTableReaderOptions& options = TBlobTableReaderOptions()) = 0;
- private:
- virtual ::TIntrusivePtr<INodeReaderImpl> CreateNodeReader(
- const TRichYPath& path, const TTableReaderOptions& options) = 0;
- virtual ::TIntrusivePtr<IYaMRReaderImpl> CreateYaMRReader(
- const TRichYPath& path, const TTableReaderOptions& options) = 0;
- virtual ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
- const TRichYPath& path,
- const TTableReaderOptions& options,
- const ::google::protobuf::Message* prototype) = 0;
- virtual ::TIntrusivePtr<ISkiffRowReaderImpl> CreateSkiffRowReader(
- const TRichYPath& path,
- const TTableReaderOptions& options,
- const ISkiffRowSkipperPtr& skipper,
- const NSkiff::TSkiffSchemaPtr& schema) = 0;
- virtual ::TIntrusivePtr<INodeWriterImpl> CreateNodeWriter(
- const TRichYPath& path, const TTableWriterOptions& options) = 0;
- virtual ::TIntrusivePtr<IYaMRWriterImpl> CreateYaMRWriter(
- const TRichYPath& path, const TTableWriterOptions& options) = 0;
- virtual ::TIntrusivePtr<IProtoWriterImpl> CreateProtoWriter(
- const TRichYPath& path,
- const TTableWriterOptions& options,
- const ::google::protobuf::Message* prototype) = 0;
- };
- ////////////////////////////////////////////////////////////////////////////////
- ///
- /// @brief Create a protobuf table reader from a stream.
- ///
- /// @tparam T Protobuf message type to read (must be inherited from `Message`).
- ///
- /// @param stream Input stream in YT protobuf format.
- template <typename T>
- TTableReaderPtr<T> CreateTableReader(
- IInputStream* stream,
- const TTableReaderOptions& options = {});
- ///
- /// @brief Create a protobuf multi table reader from a stream.
- ///
- /// @tparam Ts Protobuf message types to read (must be inherited from `Message`).
- ///
- /// @param stream Input stream in YT protobuf format.
- template <class... Ts>
- TTableReaderPtr<typename NDetail::TProtoOneOfUnique<Ts...>::TType> CreateProtoMultiTableReader(
- IInputStream* stream,
- const TTableReaderOptions& options = {});
- ///
- /// @brief Create a homogeneous protobuf multi table reader from a stream.
- ///
- /// @tparam T Protobuf message type to read (must be inherited from `Message`).
- ///
- /// @param stream Input stream in YT protobuf format.
- /// @param tableCount Number of tables in input stream.
- template <class T>
- TTableReaderPtr<T> CreateProtoMultiTableReader(
- IInputStream* stream,
- int tableCount,
- const TTableReaderOptions& options = {});
- /// Create a @ref NYT::TNode table reader from a stream.
- template <>
- TTableReaderPtr<TNode> CreateTableReader<TNode>(
- IInputStream* stream, const TTableReaderOptions& options);
- /// Create a @ref NYT::TYaMRRow table reader from a stream.
- template <>
- TTableReaderPtr<TYaMRRow> CreateTableReader<TYaMRRow>(
- IInputStream* stream, const TTableReaderOptions& options);
- namespace NDetail {
- /// Create a protobuf table reader from a stream.
- ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
- IInputStream* stream,
- const TTableReaderOptions& options,
- const ::google::protobuf::Descriptor* descriptor);
- /// Create a protobuf table reader from a stream that can contain table switches.
- ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
- IInputStream* stream,
- const TTableReaderOptions& options,
- TVector<const ::google::protobuf::Descriptor*> descriptors);
- } // namespace NDetail
- ////////////////////////////////////////////////////////////////////////////////
- /// Convert generic protobuf table reader to a concrete one (for certain type `T`).
- template <typename T>
- TTableReaderPtr<T> CreateConcreteProtobufReader(TTableReader<Message>* reader);
- /// Convert generic protobuf table reader to a concrete one (for certain type `T`).
- template <typename T>
- TTableReaderPtr<T> CreateConcreteProtobufReader(const TTableReaderPtr<Message>& reader);
- /// Convert a concrete (for certain type `T`) protobuf table reader to a generic one.
- template <typename T>
- TTableReaderPtr<Message> CreateGenericProtobufReader(TTableReader<T>* reader);
- /// Convert a concrete (for certain type `T`) protobuf table reader to a generic one.
- template <typename T>
- TTableReaderPtr<Message> CreateGenericProtobufReader(const TTableReaderPtr<T>& reader);
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT
- #define IO_INL_H_
- #include "io-inl.h"
- #undef IO_INL_H_
|