io.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  1. #pragma once
  2. ///
  3. /// @file yt/cpp/mapreduce/interface/io.h
  4. ///
  5. /// Header containing client interface for reading and writing tables and files.
  6. #include "fwd.h"
  7. #include "client_method_options.h"
  8. #include "common.h"
  9. #include "format.h"
  10. #include "node.h"
  11. #include "mpl.h"
  12. #include "skiff_row.h"
  13. #include <google/protobuf/message.h>
  14. #include <util/stream/input.h>
  15. #include <util/stream/output.h>
  16. #include <util/generic/yexception.h>
  17. #include <util/generic/maybe.h>
  18. namespace NYT {
  19. ////////////////////////////////////////////////////////////////////////////////
  20. ///
  21. /// @brief "Marker" type to use for several protobuf types in @ref NYT::TTableReader.
  22. ///
  23. /// @tparam Ts Possible types of rows to be read.
  24. template<class... TProtoRowTypes>
  25. class TProtoOneOf
  26. {
  27. public:
  28. static_assert(
  29. (TIsBaseOf<::google::protobuf::Message, TProtoRowTypes>::Value && ...),
  30. "Template parameters can only be protobuf types");
  31. TProtoOneOf() = delete;
  32. };
  33. ///
  34. /// @brief "Marker" type to use for several skiff row types in @ref NYT::TTableReader.
  35. ///
  36. /// @tparam Ts Possible types of rows to be read.
  37. template<class... TSkiffRowTypes>
  38. class TSkiffRowOneOf
  39. {
  40. public:
  41. static_assert(
  42. (TIsSkiffRow<TSkiffRowTypes>::value && ...),
  43. "Template parameters can only be SkiffRow types");
  44. TSkiffRowOneOf() = delete;
  45. };
  46. ////////////////////////////////////////////////////////////////////////////////
  47. /// @cond Doxygen_Suppress
  48. namespace NDetail {
  49. ////////////////////////////////////////////////////////////////////////////////
  50. template <class TTuple>
  51. struct TProtoOneOfFromTuple;
  52. template <class... Ts>
  53. struct TProtoOneOfFromTuple<std::tuple<Ts...>>
  54. {
  55. using TType = TProtoOneOf<Ts...>;
  56. };
  57. template <class... Ts>
  58. struct TProtoOneOfUnique
  59. {
  60. using TTuple = typename TUniqueTypes<std::tuple<>, std::tuple<Ts...>>::TType;
  61. using TType = typename TProtoOneOfFromTuple<TTuple>::TType;
  62. };
  63. ////////////////////////////////////////////////////////////////////////////////
  64. } // namespace NDetail
  65. /// @endcond
  66. ////////////////////////////////////////////////////////////////////////////////
  67. struct INodeReaderImpl;
  68. struct IYaMRReaderImpl;
  69. struct IProtoReaderImpl;
  70. struct ISkiffRowReaderImpl;
  71. struct INodeWriterImpl;
  72. struct IYaMRWriterImpl;
  73. struct IProtoWriterImpl;
  74. ////////////////////////////////////////////////////////////////////////////////
  75. /// Class of exceptions connected to reading or writing tables or files.
  76. class TIOException
  77. : public yexception
  78. { };
  79. ////////////////////////////////////////////////////////////////////////////////
  80. /// Interface representing YT file reader.
  81. class IFileReader
  82. : public TThrRefBase
  83. , public IInputStream
  84. { };
  85. /// Interface representing YT file writer.
  86. class IFileWriter
  87. : public TThrRefBase
  88. , public IOutputStream
  89. {
  90. public:
  91. virtual size_t GetBufferMemoryUsage() const
  92. {
  93. return 0;
  94. }
  95. };
  96. ////////////////////////////////////////////////////////////////////////////////
  97. /// Low-level interface to read YT table with retries.
  98. class TRawTableReader
  99. : public TThrRefBase
  100. , public IInputStream
  101. {
  102. public:
  103. /// @brief Retry table read starting from the specified `rangeIndex` and `rowIndex`.
  104. ///
  105. /// @param rangeIndex Index of first range to read
  106. /// @param rowIndex Index of first row to read; if `rowIndex == Nothing` entire request will be retried.
  107. ///
  108. /// @return `true` on successful request retry, `false` if no retry attempts are left (then `Retry()` shouldn't be called any more).
  109. ///
  110. /// `rowIndex` must be inside the range with index `rangeIndex` if the latter is specified.
  111. ///
  112. /// After successful retry the user should reset `rangeIndex` / `rowIndex` values and read new ones
  113. /// from the stream.
  114. virtual bool Retry(
  115. const TMaybe<ui32>& rangeIndex,
  116. const TMaybe<ui64>& rowIndex,
  117. const std::exception_ptr& error) = 0;
  118. /// Resets retry attempt count to the initial value (then `Retry()` can be called again).
  119. virtual void ResetRetries() = 0;
  120. /// @brief May the input stream contain table ranges?
  121. ///
  122. /// In the case when it is `true` the `TRawTableReader` user is responsible
  123. /// to track active range index in order to pass it to Retry().
  124. virtual bool HasRangeIndices() const = 0;
  125. };
  126. /// @brief Low-level interface to write YT table.
  127. ///
  128. /// Retries must be handled by implementation.
  129. class TRawTableWriter
  130. : public TThrRefBase
  131. , public IOutputStream
  132. {
  133. public:
  134. /// @brief Call this method after complete row representation is written to the stream.
  135. ///
  136. /// When this method is called `TRowTableWriter` can check its buffer
  137. /// and if it is full send data to YT.
  138. /// @note `TRawTableWriter` never sends partial records to YT (due to retries).
  139. virtual void NotifyRowEnd() = 0;
  140. /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers).
  141. ///
  142. /// By default it does nothing, but implementations are welcome to override this method.
  143. virtual void Abort()
  144. { }
  145. virtual size_t GetBufferMemoryUsage() const
  146. {
  147. return 0;
  148. }
  149. };
  150. /// @brief Interface to deal with multiple raw output streams.
  151. class IProxyOutput
  152. {
  153. public:
  154. virtual ~IProxyOutput()
  155. { }
  156. /// Get amount of managed streams.
  157. virtual size_t GetStreamCount() const = 0;
  158. /// Get stream corresponding to the specified table index.
  159. virtual IOutputStream* GetStream(size_t tableIndex) const = 0;
  160. /// This handler must be called right after the next row has been written.
  161. virtual void OnRowFinished(size_t tableIndex) = 0;
  162. /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers).
  163. ///
  164. /// By default it does nothing, but implementations are welcome to override this method.
  165. virtual void Abort()
  166. { }
  167. virtual size_t GetBufferMemoryUsage() const
  168. {
  169. return 0;
  170. }
  171. };
  172. ////////////////////////////////////////////////////////////////////////////////
  173. /// @brief Class template to read typed rows from YT tables.
  174. ///
  175. /// @tparam T Row type.
  176. ///
  177. /// Correct usage of this class usually looks like
  178. /// ```
  179. /// for (const auto& cursor : *reader) {
  180. /// const auto& row = cursor.GetRow();
  181. /// ...
  182. /// }
  183. /// ```
  184. /// or, more verbosely,
  185. /// ```
  186. /// for (; reader->IsValid(); reader->Next()) {
  187. /// const auto& row = reader->GetRow();
  188. /// ...
  189. /// }
  190. /// ```
  191. ///
  192. /// @note Actual (partial) specializations of this template may look a bit different,
  193. /// e.g. @ref NYT::TTableReader::GetRow, @ref NYT::TTableReader::MoveRow may be method templates.
  194. template <class T, class>
  195. class TTableReader
  196. : public TThrRefBase
  197. {
  198. public:
  199. /// Get current row.
  200. const T& GetRow() const;
  201. /// Extract current row; further calls to `GetRow` and `MoveRow` will fail.
  202. T MoveRow();
  203. /// Extract current row to `result`; further calls to `GetRow` and `MoveRow` will fail.
  204. void MoveRow(T* result);
  205. /// Check whether all the rows were read.
  206. bool IsValid() const;
  207. /// Move the cursor to the next row.
  208. void Next();
  209. /// Get table index of the current row.
  210. ui32 GetTableIndex() const;
  211. /// Get range index of the current row (zero if it is unknown or read request contains no ranges)
  212. ui32 GetRangeIndex() const;
  213. /// Get current row index (zero if it unknown).
  214. ui64 GetRowIndex() const;
  215. /// Get current tablet index (for ordered dynamic tables).
  216. i64 GetTabletIndex() const;
  217. /// Returns `true` if job consumed all the input and `false` otherwise.
  218. bool IsEndOfStream() const;
  219. /// Returns `true` if job raw input stream was closed and `false` otherwise.
  220. bool IsRawReaderExhausted() const;
  221. };
  222. /// @brief Iterator for use in range-based-for.
  223. ///
  224. /// @note Idiomatic usage:
  225. /// ```
  226. /// for (const auto& cursor : *reader) {
  227. /// const auto& row = cursor.GetRow();
  228. /// ...
  229. /// }
  230. /// ```
  231. template <class T>
  232. class TTableReaderIterator
  233. {
  234. public:
  235. /// Construct iterator from table reader (can be `nullptr`).
  236. explicit TTableReaderIterator<T>(TTableReader<T>* reader)
  237. {
  238. if (reader && reader->IsValid()) {
  239. Reader_ = reader;
  240. } else {
  241. Reader_ = nullptr;
  242. }
  243. }
  244. /// Equality operator.
  245. bool operator==(const TTableReaderIterator& it) const
  246. {
  247. return Reader_ == it.Reader_;
  248. }
  249. /// Dereference operator.
  250. TTableReader<T>& operator*()
  251. {
  252. return *Reader_;
  253. }
  254. /// Const dereference operator.
  255. const TTableReader<T>& operator*() const
  256. {
  257. return *Reader_;
  258. }
  259. /// Preincrement operator.
  260. TTableReaderIterator& operator++()
  261. {
  262. Reader_->Next();
  263. if (!Reader_->IsValid()) {
  264. Reader_ = nullptr;
  265. }
  266. return *this;
  267. }
  268. private:
  269. TTableReader<T>* Reader_;
  270. };
  271. /// @brief Function to facilitate range-based-for for @ref NYT::TTableReader.
  272. ///
  273. /// @see @ref NYT::TTableReaderIterator
  274. template <class T>
  275. TTableReaderIterator<T> begin(TTableReader<T>& reader)
  276. {
  277. return TTableReaderIterator<T>(&reader);
  278. }
  279. /// @brief Function to facilitate range-based-for for @ref NYT::TTableReader.
  280. ///
  281. /// @see @ref NYT::TTableReaderIterator
  282. template <class T>
  283. TTableReaderIterator<T> end(TTableReader<T>&)
  284. {
  285. return TTableReaderIterator<T>(nullptr);
  286. }
  287. ////////////////////////////////////////////////////////////////////////////////
  288. /// @brief Class to facilitate reading table rows sorted by key.
  289. ///
  290. /// Each reader returned from @ref NYT::TTableRangesReader::GetRange represents
  291. /// a range of rows with the same key.
  292. ///
  293. /// @note Idiomatic usage:
  294. /// ```
  295. /// for (; reader->IsValid(); reader->Next()) {
  296. /// auto& rangeReader = reader->GetRange();
  297. /// ...
  298. /// }
  299. /// ```
  300. template <class T, class>
  301. class TTableRangesReader
  302. : public TThrRefBase
  303. {
  304. public:
  305. /// Get reader for rows with the same key.
  306. TTableReader<T>& GetRange();
  307. /// Check whether all rows are read.
  308. bool IsValid() const;
  309. /// Move cursor to the next range.
  310. void Next();
  311. };
  312. ////////////////////////////////////////////////////////////////////////////////
  313. /// Class template to write typed rows to YT tables.
  314. template <class T, class>
  315. class TTableWriter
  316. : public TThrRefBase
  317. {
  318. public:
  319. /// @brief Submit a row for writing.
  320. ///
  321. /// The row may (and very probably will) *not* be written immediately.
  322. void AddRow(const T& row);
  323. /// Complete writing and check that everything is written successfully.
  324. /// No other data can be written after Finish is called.
  325. void Finish();
  326. size_t GetBufferMemoryUsage() const;
  327. };
  328. ////////////////////////////////////////////////////////////////////////////////
  329. /// @brief Type representing YaMR table row.
  330. ///
  331. /// @deprecated
  332. struct TYaMRRow
  333. {
  334. /// Key column.
  335. TStringBuf Key;
  336. /// Subkey column.
  337. TStringBuf SubKey;
  338. /// Value column.
  339. TStringBuf Value;
  340. };
  341. ////////////////////////////////////////////////////////////////////////////////
  342. /// Interface for creating table and file readers and writer.
  343. class IIOClient
  344. {
  345. public:
  346. virtual ~IIOClient() = default;
  347. /// Create a reader for file at `path`.
  348. virtual IFileReaderPtr CreateFileReader(
  349. const TRichYPath& path,
  350. const TFileReaderOptions& options = TFileReaderOptions()) = 0;
  351. /// Create a writer for file at `path`.
  352. virtual IFileWriterPtr CreateFileWriter(
  353. const TRichYPath& path,
  354. const TFileWriterOptions& options = TFileWriterOptions()) = 0;
  355. /// Create a typed reader for table at `path`.
  356. template <class T>
  357. TTableReaderPtr<T> CreateTableReader(
  358. const TRichYPath& path,
  359. const TTableReaderOptions& options = TTableReaderOptions());
  360. /// Create a typed writer for table at `path`.
  361. template <class T>
  362. TTableWriterPtr<T> CreateTableWriter(
  363. const TRichYPath& path,
  364. const TTableWriterOptions& options = TTableWriterOptions());
  365. /// Create a writer to write protobuf messages with specified descriptor.
  366. virtual TTableWriterPtr<::google::protobuf::Message> CreateTableWriter(
  367. const TRichYPath& path,
  368. const ::google::protobuf::Descriptor& descriptor,
  369. const TTableWriterOptions& options = TTableWriterOptions()) = 0;
  370. /// Create a reader to read a table using specified format.
  371. virtual TRawTableReaderPtr CreateRawReader(
  372. const TRichYPath& path,
  373. const TFormat& format,
  374. const TTableReaderOptions& options = TTableReaderOptions()) = 0;
  375. /// Create a reader to write a table using specified format.
  376. virtual TRawTableWriterPtr CreateRawWriter(
  377. const TRichYPath& path,
  378. const TFormat& format,
  379. const TTableWriterOptions& options = TTableWriterOptions()) = 0;
  380. ///
  381. /// @brief Create a reader for [blob table](https://docs.yandex-team.ru/docs/yt/description/storage/blobtables) at `path`.
  382. ///
  383. /// @param path Blob table path.
  384. /// @param blobId Key identifying the blob.
  385. /// @param options Optional parameters
  386. ///
  387. /// Blob table is a table that stores a number of blobs.
  388. /// Blobs are sliced into parts of the same size (maybe except of last part).
  389. /// Those parts are stored in the separate rows.
  390. ///
  391. /// Blob table have constraints on its schema.
  392. /// - There must be columns that identify blob (blob id columns). That columns might be of any type.
  393. /// - There must be a column of `int64` type that identify part inside the blob (this column is called `part index`).
  394. /// - There must be a column of `string` type that stores actual data (this column is called `data column`).
  395. virtual IFileReaderPtr CreateBlobTableReader(
  396. const TYPath& path,
  397. const TKey& blobId,
  398. const TBlobTableReaderOptions& options = TBlobTableReaderOptions()) = 0;
  399. private:
  400. virtual ::TIntrusivePtr<INodeReaderImpl> CreateNodeReader(
  401. const TRichYPath& path, const TTableReaderOptions& options) = 0;
  402. virtual ::TIntrusivePtr<IYaMRReaderImpl> CreateYaMRReader(
  403. const TRichYPath& path, const TTableReaderOptions& options) = 0;
  404. virtual ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
  405. const TRichYPath& path,
  406. const TTableReaderOptions& options,
  407. const ::google::protobuf::Message* prototype) = 0;
  408. virtual ::TIntrusivePtr<ISkiffRowReaderImpl> CreateSkiffRowReader(
  409. const TRichYPath& path,
  410. const TTableReaderOptions& options,
  411. const ISkiffRowSkipperPtr& skipper,
  412. const NSkiff::TSkiffSchemaPtr& schema) = 0;
  413. virtual ::TIntrusivePtr<INodeWriterImpl> CreateNodeWriter(
  414. const TRichYPath& path, const TTableWriterOptions& options) = 0;
  415. virtual ::TIntrusivePtr<IYaMRWriterImpl> CreateYaMRWriter(
  416. const TRichYPath& path, const TTableWriterOptions& options) = 0;
  417. virtual ::TIntrusivePtr<IProtoWriterImpl> CreateProtoWriter(
  418. const TRichYPath& path,
  419. const TTableWriterOptions& options,
  420. const ::google::protobuf::Message* prototype) = 0;
  421. };
  422. ////////////////////////////////////////////////////////////////////////////////
  423. ///
  424. /// @brief Create a protobuf table reader from a stream.
  425. ///
  426. /// @tparam T Protobuf message type to read (must be inherited from `Message`).
  427. ///
  428. /// @param stream Input stream in YT protobuf format.
  429. template <typename T>
  430. TTableReaderPtr<T> CreateTableReader(
  431. IInputStream* stream,
  432. const TTableReaderOptions& options = {});
  433. ///
  434. /// @brief Create a protobuf multi table reader from a stream.
  435. ///
  436. /// @tparam Ts Protobuf message types to read (must be inherited from `Message`).
  437. ///
  438. /// @param stream Input stream in YT protobuf format.
  439. template <class... Ts>
  440. TTableReaderPtr<typename NDetail::TProtoOneOfUnique<Ts...>::TType> CreateProtoMultiTableReader(
  441. IInputStream* stream,
  442. const TTableReaderOptions& options = {});
  443. ///
  444. /// @brief Create a homogeneous protobuf multi table reader from a stream.
  445. ///
  446. /// @tparam T Protobuf message type to read (must be inherited from `Message`).
  447. ///
  448. /// @param stream Input stream in YT protobuf format.
  449. /// @param tableCount Number of tables in input stream.
  450. template <class T>
  451. TTableReaderPtr<T> CreateProtoMultiTableReader(
  452. IInputStream* stream,
  453. int tableCount,
  454. const TTableReaderOptions& options = {});
  455. /// Create a @ref NYT::TNode table reader from a stream.
  456. template <>
  457. TTableReaderPtr<TNode> CreateTableReader<TNode>(
  458. IInputStream* stream, const TTableReaderOptions& options);
  459. /// Create a @ref NYT::TYaMRRow table reader from a stream.
  460. template <>
  461. TTableReaderPtr<TYaMRRow> CreateTableReader<TYaMRRow>(
  462. IInputStream* stream, const TTableReaderOptions& options);
  463. namespace NDetail {
  464. /// Create a protobuf table reader from a stream.
  465. ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
  466. IInputStream* stream,
  467. const TTableReaderOptions& options,
  468. const ::google::protobuf::Descriptor* descriptor);
  469. /// Create a protobuf table reader from a stream that can contain table switches.
  470. ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
  471. IInputStream* stream,
  472. const TTableReaderOptions& options,
  473. TVector<const ::google::protobuf::Descriptor*> descriptors);
  474. } // namespace NDetail
  475. ////////////////////////////////////////////////////////////////////////////////
  476. /// Convert generic protobuf table reader to a concrete one (for certain type `T`).
  477. template <typename T>
  478. TTableReaderPtr<T> CreateConcreteProtobufReader(TTableReader<Message>* reader);
  479. /// Convert generic protobuf table reader to a concrete one (for certain type `T`).
  480. template <typename T>
  481. TTableReaderPtr<T> CreateConcreteProtobufReader(const TTableReaderPtr<Message>& reader);
  482. /// Convert a concrete (for certain type `T`) protobuf table reader to a generic one.
  483. template <typename T>
  484. TTableReaderPtr<Message> CreateGenericProtobufReader(TTableReader<T>* reader);
  485. /// Convert a concrete (for certain type `T`) protobuf table reader to a generic one.
  486. template <typename T>
  487. TTableReaderPtr<Message> CreateGenericProtobufReader(const TTableReaderPtr<T>& reader);
  488. ////////////////////////////////////////////////////////////////////////////////
  489. } // namespace NYT
  490. #define IO_INL_H_
  491. #include "io-inl.h"
  492. #undef IO_INL_H_