#pragma once /// /// @file yt/cpp/mapreduce/interface/io.h /// /// Header containing client interface for reading and writing tables and files. #include "fwd.h" #include "client_method_options.h" #include "common.h" #include "format.h" #include "node.h" #include "mpl.h" #include "skiff_row.h" #include #include #include #include #include namespace NYT { //////////////////////////////////////////////////////////////////////////////// /// /// @brief "Marker" type to use for several protobuf types in @ref NYT::TTableReader. /// /// @tparam Ts Possible types of rows to be read. template class TProtoOneOf { public: static_assert( (TIsBaseOf<::google::protobuf::Message, TProtoRowTypes>::Value && ...), "Template parameters can only be protobuf types"); TProtoOneOf() = delete; }; /// /// @brief "Marker" type to use for several skiff row types in @ref NYT::TTableReader. /// /// @tparam Ts Possible types of rows to be read. template class TSkiffRowOneOf { public: static_assert( (TIsSkiffRow::value && ...), "Template parameters can only be SkiffRow types"); TSkiffRowOneOf() = delete; }; //////////////////////////////////////////////////////////////////////////////// /// @cond Doxygen_Suppress namespace NDetail { //////////////////////////////////////////////////////////////////////////////// template struct TProtoOneOfFromTuple; template struct TProtoOneOfFromTuple> { using TType = TProtoOneOf; }; template struct TProtoOneOfUnique { using TTuple = typename TUniqueTypes, std::tuple>::TType; using TType = typename TProtoOneOfFromTuple::TType; }; //////////////////////////////////////////////////////////////////////////////// } // namespace NDetail /// @endcond //////////////////////////////////////////////////////////////////////////////// struct INodeReaderImpl; struct IYaMRReaderImpl; struct IProtoReaderImpl; struct ISkiffRowReaderImpl; struct INodeWriterImpl; struct IYaMRWriterImpl; struct IProtoWriterImpl; //////////////////////////////////////////////////////////////////////////////// /// Class of exceptions connected to reading or writing tables or files. class TIOException : public yexception { }; /////////////////////////////////////////////////////////////////////////////// /// Interface representing YT file reader. class IFileReader : public TThrRefBase , public IInputStream { }; /// Interface representing YT file writer. class IFileWriter : public TThrRefBase , public IOutputStream { public: virtual size_t GetBufferMemoryUsage() const { return 0; } }; //////////////////////////////////////////////////////////////////////////////// /// Low-level interface to read YT table with retries. class TRawTableReader : public TThrRefBase , public IInputStream { public: /// @brief Retry table read starting from the specified `rangeIndex` and `rowIndex`. /// /// @param rangeIndex Index of first range to read /// @param rowIndex Index of first row to read; if `rowIndex == Nothing` entire request will be retried. /// /// @return `true` on successful request retry, `false` if no retry attempts are left (then `Retry()` shouldn't be called any more). /// /// `rowIndex` must be inside the range with index `rangeIndex` if the latter is specified. /// /// After successful retry the user should reset `rangeIndex` / `rowIndex` values and read new ones /// from the stream. virtual bool Retry( const TMaybe& rangeIndex, const TMaybe& rowIndex) = 0; /// Resets retry attempt count to the initial value (then `Retry()` can be called again). virtual void ResetRetries() = 0; /// @brief May the input stream contain table ranges? /// /// In the case when it is `true` the `TRawTableReader` user is responsible /// to track active range index in order to pass it to Retry(). virtual bool HasRangeIndices() const = 0; }; /// @brief Low-level interface to write YT table. /// /// Retries must be handled by implementation. class TRawTableWriter : public TThrRefBase , public IOutputStream { public: /// @brief Call this method after complete row representation is written to the stream. /// /// When this method is called `TRowTableWriter` can check its buffer /// and if it is full send data to YT. /// @note `TRawTableWriter` never sends partial records to YT (due to retries). virtual void NotifyRowEnd() = 0; /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers). /// /// By default it does nothing, but implementations are welcome to override this method. virtual void Abort() { } virtual size_t GetBufferMemoryUsage() const { return 0; } }; /// @brief Interface to deal with multiple raw output streams. class IProxyOutput { public: virtual ~IProxyOutput() { } /// Get amount of managed streams. virtual size_t GetStreamCount() const = 0; /// Get stream corresponding to the specified table index. virtual IOutputStream* GetStream(size_t tableIndex) const = 0; /// This handler must be called right after the next row has been written. virtual void OnRowFinished(size_t tableIndex) = 0; /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers). /// /// By default it does nothing, but implementations are welcome to override this method. virtual void Abort() { } virtual size_t GetBufferMemoryUsage() const { return 0; } }; //////////////////////////////////////////////////////////////////////////////// /// @brief Class template to read typed rows from YT tables. /// /// @tparam T Row type. /// /// Correct usage of this class usually looks like /// ``` /// for (const auto& cursor : *reader) { /// const auto& row = cursor.GetRow(); /// ... /// } /// ``` /// or, more verbosely, /// ``` /// for (; reader->IsValid(); reader->Next()) { /// const auto& row = reader->GetRow(); /// ... /// } /// ``` /// /// @note Actual (partial) specializations of this template may look a bit different, /// e.g. @ref NYT::TTableReader::GetRow, @ref NYT::TTableReader::MoveRow may be method templates. template class TTableReader : public TThrRefBase { public: /// Get current row. const T& GetRow() const; /// Extract current row; further calls to `GetRow` and `MoveRow` will fail. T MoveRow(); /// Extract current row to `result`; further calls to `GetRow` and `MoveRow` will fail. void MoveRow(T* result); /// Check whether all the rows were read. bool IsValid() const; /// Move the cursor to the next row. void Next(); /// Get table index of the current row. ui32 GetTableIndex() const; /// Get range index of the current row (zero if it is unknown or read request contains no ranges) ui32 GetRangeIndex() const; /// Get current row index (zero if it unknown). ui64 GetRowIndex() const; /// Get current tablet index (for ordered dynamic tables). i64 GetTabletIndex() const; /// Returns `true` if job consumed all the input and `false` otherwise. bool IsEndOfStream() const; /// Returns `true` if job raw input stream was closed and `false` otherwise. bool IsRawReaderExhausted() const; }; /// @brief Iterator for use in range-based-for. /// /// @note Idiomatic usage: /// ``` /// for (const auto& cursor : *reader) { /// const auto& row = cursor.GetRow(); /// ... /// } /// ``` template class TTableReaderIterator { public: /// Construct iterator from table reader (can be `nullptr`). explicit TTableReaderIterator(TTableReader* reader) { if (reader && reader->IsValid()) { Reader_ = reader; } else { Reader_ = nullptr; } } /// Equality operator. bool operator==(const TTableReaderIterator& it) const { return Reader_ == it.Reader_; } /// Inequality operator. bool operator!=(const TTableReaderIterator& it) const { return Reader_ != it.Reader_; } /// Dereference operator. TTableReader& operator*() { return *Reader_; } /// Const dereference operator. const TTableReader& operator*() const { return *Reader_; } /// Preincrement operator. TTableReaderIterator& operator++() { Reader_->Next(); if (!Reader_->IsValid()) { Reader_ = nullptr; } return *this; } private: TTableReader* Reader_; }; /// @brief Function to facilitate range-based-for for @ref NYT::TTableReader. /// /// @see @ref NYT::TTableReaderIterator template TTableReaderIterator begin(TTableReader& reader) { return TTableReaderIterator(&reader); } /// @brief Function to facilitate range-based-for for @ref NYT::TTableReader. /// /// @see @ref NYT::TTableReaderIterator template TTableReaderIterator end(TTableReader&) { return TTableReaderIterator(nullptr); } //////////////////////////////////////////////////////////////////////////////// /// @brief Class to facilitate reading table rows sorted by key. /// /// Each reader returned from @ref NYT::TTableRangesReader::GetRange represents /// a range of rows with the same key. /// /// @note Idiomatic usage: /// ``` /// for (; reader->IsValid(); reader->Next()) { /// auto& rangeReader = reader->GetRange(); /// ... /// } /// ``` template class TTableRangesReader : public TThrRefBase { public: /// Get reader for rows with the same key. TTableReader& GetRange(); /// Check whether all rows are read. bool IsValid() const; /// Move cursor to the next range. void Next(); }; //////////////////////////////////////////////////////////////////////////////// /// Class template to write typed rows to YT tables. template class TTableWriter : public TThrRefBase { public: /// @brief Submit a row for writing. /// /// The row may (and very probably will) *not* be written immediately. void AddRow(const T& row); /// Stop writing data as soon as possible (without flushing data, e.g. before aborting parent transaction). void Finish(); size_t GetBufferMemoryUsage() const; }; //////////////////////////////////////////////////////////////////////////////// /// @brief Type representing YaMR table row. /// /// @deprecated struct TYaMRRow { /// Key column. TStringBuf Key; /// Subkey column. TStringBuf SubKey; /// Value column. TStringBuf Value; }; //////////////////////////////////////////////////////////////////////////////// /// Interface for creating table and file readers and writer. class IIOClient { public: virtual ~IIOClient() = default; /// Create a reader for file at `path`. virtual IFileReaderPtr CreateFileReader( const TRichYPath& path, const TFileReaderOptions& options = TFileReaderOptions()) = 0; /// Create a writer for file at `path`. virtual IFileWriterPtr CreateFileWriter( const TRichYPath& path, const TFileWriterOptions& options = TFileWriterOptions()) = 0; /// Create a typed reader for table at `path`. template TTableReaderPtr CreateTableReader( const TRichYPath& path, const TTableReaderOptions& options = TTableReaderOptions()); /// Create a typed writer for table at `path`. template TTableWriterPtr CreateTableWriter( const TRichYPath& path, const TTableWriterOptions& options = TTableWriterOptions()); /// Create a writer to write protobuf messages with specified descriptor. virtual TTableWriterPtr<::google::protobuf::Message> CreateTableWriter( const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options = TTableWriterOptions()) = 0; /// Create a reader to read a table using specified format. virtual TRawTableReaderPtr CreateRawReader( const TRichYPath& path, const TFormat& format, const TTableReaderOptions& options = TTableReaderOptions()) = 0; /// Create a reader to write a table using specified format. virtual TRawTableWriterPtr CreateRawWriter( const TRichYPath& path, const TFormat& format, const TTableWriterOptions& options = TTableWriterOptions()) = 0; /// /// @brief Create a reader for [blob table](https://docs.yandex-team.ru/docs/yt/description/storage/blobtables) at `path`. /// /// @param path Blob table path. /// @param blobId Key identifying the blob. /// @param options Optional parameters /// /// Blob table is a table that stores a number of blobs. /// Blobs are sliced into parts of the same size (maybe except of last part). /// Those parts are stored in the separate rows. /// /// Blob table have constraints on its schema. /// - There must be columns that identify blob (blob id columns). That columns might be of any type. /// - There must be a column of `int64` type that identify part inside the blob (this column is called `part index`). /// - There must be a column of `string` type that stores actual data (this column is called `data column`). virtual IFileReaderPtr CreateBlobTableReader( const TYPath& path, const TKey& blobId, const TBlobTableReaderOptions& options = TBlobTableReaderOptions()) = 0; private: virtual ::TIntrusivePtr CreateNodeReader( const TRichYPath& path, const TTableReaderOptions& options) = 0; virtual ::TIntrusivePtr CreateYaMRReader( const TRichYPath& path, const TTableReaderOptions& options) = 0; virtual ::TIntrusivePtr CreateProtoReader( const TRichYPath& path, const TTableReaderOptions& options, const ::google::protobuf::Message* prototype) = 0; virtual ::TIntrusivePtr CreateSkiffRowReader( const TRichYPath& path, const TTableReaderOptions& options, const ISkiffRowSkipperPtr& skipper, const NSkiff::TSkiffSchemaPtr& schema) = 0; virtual ::TIntrusivePtr CreateNodeWriter( const TRichYPath& path, const TTableWriterOptions& options) = 0; virtual ::TIntrusivePtr CreateYaMRWriter( const TRichYPath& path, const TTableWriterOptions& options) = 0; virtual ::TIntrusivePtr CreateProtoWriter( const TRichYPath& path, const TTableWriterOptions& options, const ::google::protobuf::Message* prototype) = 0; }; //////////////////////////////////////////////////////////////////////////////// /// /// @brief Create a protobuf table reader from a stream. /// /// @tparam T Protobuf message type to read (must be inherited from `Message`). /// /// @param stream Input stream in YT protobuf format. template TTableReaderPtr CreateTableReader( IInputStream* stream, const TTableReaderOptions& options = {}); /// /// @brief Create a protobuf multi table reader from a stream. /// /// @tparam Ts Protobuf message types to read (must be inherited from `Message`). /// /// @param stream Input stream in YT protobuf format. template TTableReaderPtr::TType> CreateProtoMultiTableReader( IInputStream* stream, const TTableReaderOptions& options = {}); /// /// @brief Create a homogeneous protobuf multi table reader from a stream. /// /// @tparam T Protobuf message type to read (must be inherited from `Message`). /// /// @param stream Input stream in YT protobuf format. /// @param tableCount Number of tables in input stream. template TTableReaderPtr CreateProtoMultiTableReader( IInputStream* stream, int tableCount, const TTableReaderOptions& options = {}); /// Create a @ref NYT::TNode table reader from a stream. template <> TTableReaderPtr CreateTableReader( IInputStream* stream, const TTableReaderOptions& options); /// Create a @ref NYT::TYaMRRow table reader from a stream. template <> TTableReaderPtr CreateTableReader( IInputStream* stream, const TTableReaderOptions& options); namespace NDetail { /// Create a protobuf table reader from a stream. ::TIntrusivePtr CreateProtoReader( IInputStream* stream, const TTableReaderOptions& options, const ::google::protobuf::Descriptor* descriptor); /// Create a protobuf table reader from a stream that can contain table switches. ::TIntrusivePtr CreateProtoReader( IInputStream* stream, const TTableReaderOptions& options, TVector descriptors); } // namespace NDetail //////////////////////////////////////////////////////////////////////////////// /// Convert generic protobuf table reader to a concrete one (for certain type `T`). template TTableReaderPtr CreateConcreteProtobufReader(TTableReader* reader); /// Convert generic protobuf table reader to a concrete one (for certain type `T`). template TTableReaderPtr CreateConcreteProtobufReader(const TTableReaderPtr& reader); /// Convert a concrete (for certain type `T`) protobuf table reader to a generic one. template TTableReaderPtr CreateGenericProtobufReader(TTableReader* reader); /// Convert a concrete (for certain type `T`) protobuf table reader to a generic one. template TTableReaderPtr CreateGenericProtobufReader(const TTableReaderPtr& reader); //////////////////////////////////////////////////////////////////////////////// } // namespace NYT #define IO_INL_H_ #include "io-inl.h" #undef IO_INL_H_