io.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. #pragma once
  2. ///
  3. /// @file yt/cpp/mapreduce/interface/io.h
  4. ///
  5. /// Header containing client interface for reading and writing tables and files.
  6. #include "fwd.h"
  7. #include "client_method_options.h"
  8. #include "common.h"
  9. #include "format.h"
  10. #include "node.h"
  11. #include "mpl.h"
  12. #include "skiff_row.h"
  13. #include <google/protobuf/message.h>
  14. #include <util/stream/input.h>
  15. #include <util/stream/output.h>
  16. #include <util/generic/yexception.h>
  17. #include <util/generic/maybe.h>
  18. namespace NYT {
  19. ////////////////////////////////////////////////////////////////////////////////
  20. ///
  21. /// @brief "Marker" type to use for several protobuf types in @ref NYT::TTableReader.
  22. ///
  23. /// @tparam Ts Possible types of rows to be read.
  24. template<class... TProtoRowTypes>
  25. class TProtoOneOf
  26. {
  27. public:
  28. static_assert(
  29. (TIsBaseOf<::google::protobuf::Message, TProtoRowTypes>::Value && ...),
  30. "Template parameters can only be protobuf types");
  31. TProtoOneOf() = delete;
  32. };
  33. ///
  34. /// @brief "Marker" type to use for several skiff row types in @ref NYT::TTableReader.
  35. ///
  36. /// @tparam Ts Possible types of rows to be read.
  37. template<class... TSkiffRowTypes>
  38. class TSkiffRowOneOf
  39. {
  40. public:
  41. static_assert(
  42. (TIsSkiffRow<TSkiffRowTypes>::value && ...),
  43. "Template parameters can only be SkiffRow types");
  44. TSkiffRowOneOf() = delete;
  45. };
  46. ////////////////////////////////////////////////////////////////////////////////
  47. /// @cond Doxygen_Suppress
  48. namespace NDetail {
  49. ////////////////////////////////////////////////////////////////////////////////
  50. template <class TTuple>
  51. struct TProtoOneOfFromTuple;
  52. template <class... Ts>
  53. struct TProtoOneOfFromTuple<std::tuple<Ts...>>
  54. {
  55. using TType = TProtoOneOf<Ts...>;
  56. };
  57. template <class... Ts>
  58. struct TProtoOneOfUnique
  59. {
  60. using TTuple = typename TUniqueTypes<std::tuple<>, std::tuple<Ts...>>::TType;
  61. using TType = typename TProtoOneOfFromTuple<TTuple>::TType;
  62. };
  63. ////////////////////////////////////////////////////////////////////////////////
  64. } // namespace NDetail
  65. /// @endcond
  66. ////////////////////////////////////////////////////////////////////////////////
// Implementation interfaces returned by the private factories of `IIOClient`;
// only forward-declared in this header.
// Reader implementations:
struct INodeReaderImpl;
struct IYaMRReaderImpl;
struct IProtoReaderImpl;
struct ISkiffRowReaderImpl;
// Writer implementations:
struct INodeWriterImpl;
struct IYaMRWriterImpl;
struct IProtoWriterImpl;
  74. ////////////////////////////////////////////////////////////////////////////////
  75. /// Class of exceptions connected to reading or writing tables or files.
  76. class TIOException
  77. : public yexception
  78. { };
  79. ///////////////////////////////////////////////////////////////////////////////
  80. /// Interface representing YT file reader.
  81. class IFileReader
  82. : public TThrRefBase
  83. , public IInputStream
  84. { };
  85. /// Interface representing YT file writer.
  86. class IFileWriter
  87. : public TThrRefBase
  88. , public IOutputStream
  89. {
  90. public:
  91. virtual size_t GetBufferMemoryUsage() const
  92. {
  93. return 0;
  94. }
  95. };
  96. ////////////////////////////////////////////////////////////////////////////////
/// Low-level interface to read YT table with retries.
class TRawTableReader
    : public TThrRefBase
    , public IInputStream
{
public:
    /// @brief Retry table read starting from the specified `rangeIndex` and `rowIndex`.
    ///
    /// @param rangeIndex Index of first range to read.
    /// @param rowIndex Index of first row to read; if `rowIndex == Nothing` the entire request will be retried.
    ///
    /// @return `true` on successful request retry, `false` if no retry attempts are left (then `Retry()` shouldn't be called any more).
    ///
    /// `rowIndex` must be inside the range with index `rangeIndex` if the latter is specified.
    ///
    /// After a successful retry the user should reset `rangeIndex` / `rowIndex` values and read new ones
    /// from the stream.
    virtual bool Retry(
        const TMaybe<ui32>& rangeIndex,
        const TMaybe<ui64>& rowIndex) = 0;

    /// Resets retry attempt count to the initial value (then `Retry()` can be called again).
    virtual void ResetRetries() = 0;

    /// @brief May the input stream contain table ranges?
    ///
    /// When this returns `true`, the `TRawTableReader` user is responsible
    /// for tracking the active range index in order to pass it to `Retry()`.
    virtual bool HasRangeIndices() const = 0;
};
  125. /// @brief Low-level interface to write YT table.
  126. ///
  127. /// Retries must be handled by implementation.
  128. class TRawTableWriter
  129. : public TThrRefBase
  130. , public IOutputStream
  131. {
  132. public:
  133. /// @brief Call this method after complete row representation is written to the stream.
  134. ///
  135. /// When this method is called `TRowTableWriter` can check its buffer
  136. /// and if it is full send data to YT.
  137. /// @note `TRawTableWriter` never sends partial records to YT (due to retries).
  138. virtual void NotifyRowEnd() = 0;
  139. /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers).
  140. ///
  141. /// By default it does nothing, but implementations are welcome to override this method.
  142. virtual void Abort()
  143. { }
  144. virtual size_t GetBufferMemoryUsage() const
  145. {
  146. return 0;
  147. }
  148. };
  149. /// @brief Interface to deal with multiple raw output streams.
  150. class IProxyOutput
  151. {
  152. public:
  153. virtual ~IProxyOutput()
  154. { }
  155. /// Get amount of managed streams.
  156. virtual size_t GetStreamCount() const = 0;
  157. /// Get stream corresponding to the specified table index.
  158. virtual IOutputStream* GetStream(size_t tableIndex) const = 0;
  159. /// This handler must be called right after the next row has been written.
  160. virtual void OnRowFinished(size_t tableIndex) = 0;
  161. /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers).
  162. ///
  163. /// By default it does nothing, but implementations are welcome to override this method.
  164. virtual void Abort()
  165. { }
  166. virtual size_t GetBufferMemoryUsage() const
  167. {
  168. return 0;
  169. }
  170. };
  171. ////////////////////////////////////////////////////////////////////////////////
  172. /// @brief Class template to read typed rows from YT tables.
  173. ///
  174. /// @tparam T Row type.
  175. ///
  176. /// Correct usage of this class usually looks like
  177. /// ```
  178. /// for (const auto& cursor : *reader) {
  179. /// const auto& row = cursor.GetRow();
  180. /// ...
  181. /// }
  182. /// ```
  183. /// or, more verbosely,
  184. /// ```
  185. /// for (; reader->IsValid(); reader->Next()) {
  186. /// const auto& row = reader->GetRow();
  187. /// ...
  188. /// }
  189. /// ```
  190. ///
  191. /// @note Actual (partial) specializations of this template may look a bit different,
  192. /// e.g. @ref NYT::TTableReader::GetRow, @ref NYT::TTableReader::MoveRow may be method templates.
  193. template <class T, class>
  194. class TTableReader
  195. : public TThrRefBase
  196. {
  197. public:
  198. /// Get current row.
  199. const T& GetRow() const;
  200. /// Extract current row; further calls to `GetRow` and `MoveRow` will fail.
  201. T MoveRow();
  202. /// Extract current row to `result`; further calls to `GetRow` and `MoveRow` will fail.
  203. void MoveRow(T* result);
  204. /// Check whether all the rows were read.
  205. bool IsValid() const;
  206. /// Move the cursor to the next row.
  207. void Next();
  208. /// Get table index of the current row.
  209. ui32 GetTableIndex() const;
  210. /// Get range index of the current row (zero if it is unknown or read request contains no ranges)
  211. ui32 GetRangeIndex() const;
  212. /// Get current row index (zero if it unknown).
  213. ui64 GetRowIndex() const;
  214. /// Get current tablet index (for ordered dynamic tables).
  215. i64 GetTabletIndex() const;
  216. /// Returns `true` if job consumed all the input and `false` otherwise.
  217. bool IsEndOfStream() const;
  218. /// Returns `true` if job raw input stream was closed and `false` otherwise.
  219. bool IsRawReaderExhausted() const;
  220. };
  221. /// @brief Iterator for use in range-based-for.
  222. ///
  223. /// @note Idiomatic usage:
  224. /// ```
  225. /// for (const auto& cursor : *reader) {
  226. /// const auto& row = cursor.GetRow();
  227. /// ...
  228. /// }
  229. /// ```
  230. template <class T>
  231. class TTableReaderIterator
  232. {
  233. public:
  234. /// Construct iterator from table reader (can be `nullptr`).
  235. explicit TTableReaderIterator<T>(TTableReader<T>* reader)
  236. {
  237. if (reader && reader->IsValid()) {
  238. Reader_ = reader;
  239. } else {
  240. Reader_ = nullptr;
  241. }
  242. }
  243. /// Equality operator.
  244. bool operator==(const TTableReaderIterator& it) const
  245. {
  246. return Reader_ == it.Reader_;
  247. }
  248. /// Inequality operator.
  249. bool operator!=(const TTableReaderIterator& it) const
  250. {
  251. return Reader_ != it.Reader_;
  252. }
  253. /// Dereference operator.
  254. TTableReader<T>& operator*()
  255. {
  256. return *Reader_;
  257. }
  258. /// Const dereference operator.
  259. const TTableReader<T>& operator*() const
  260. {
  261. return *Reader_;
  262. }
  263. /// Preincrement operator.
  264. TTableReaderIterator& operator++()
  265. {
  266. Reader_->Next();
  267. if (!Reader_->IsValid()) {
  268. Reader_ = nullptr;
  269. }
  270. return *this;
  271. }
  272. private:
  273. TTableReader<T>* Reader_;
  274. };
  275. /// @brief Function to facilitate range-based-for for @ref NYT::TTableReader.
  276. ///
  277. /// @see @ref NYT::TTableReaderIterator
  278. template <class T>
  279. TTableReaderIterator<T> begin(TTableReader<T>& reader)
  280. {
  281. return TTableReaderIterator<T>(&reader);
  282. }
  283. /// @brief Function to facilitate range-based-for for @ref NYT::TTableReader.
  284. ///
  285. /// @see @ref NYT::TTableReaderIterator
  286. template <class T>
  287. TTableReaderIterator<T> end(TTableReader<T>&)
  288. {
  289. return TTableReaderIterator<T>(nullptr);
  290. }
  291. ////////////////////////////////////////////////////////////////////////////////
  292. /// @brief Class to facilitate reading table rows sorted by key.
  293. ///
  294. /// Each reader returned from @ref NYT::TTableRangesReader::GetRange represents
  295. /// a range of rows with the same key.
  296. ///
  297. /// @note Idiomatic usage:
  298. /// ```
  299. /// for (; reader->IsValid(); reader->Next()) {
  300. /// auto& rangeReader = reader->GetRange();
  301. /// ...
  302. /// }
  303. /// ```
  304. template <class T, class>
  305. class TTableRangesReader
  306. : public TThrRefBase
  307. {
  308. public:
  309. /// Get reader for rows with the same key.
  310. TTableReader<T>& GetRange();
  311. /// Check whether all rows are read.
  312. bool IsValid() const;
  313. /// Move cursor to the next range.
  314. void Next();
  315. };
  316. ////////////////////////////////////////////////////////////////////////////////
/// Class template to write typed rows to YT tables.
template <class T, class>
class TTableWriter
    : public TThrRefBase
{
public:
    /// @brief Submit a row for writing.
    ///
    /// The row may (and very probably will) *not* be written immediately.
    void AddRow(const T& row);

    /// Stop writing data as soon as possible (without flushing data, e.g. before aborting parent transaction).
    void Finish();

    /// @brief Memory held by the writer's buffers.
    ///
    /// NOTE(review): semantics inferred from the name, mirroring the `GetBufferMemoryUsage`
    /// methods of the raw writer interfaces in this header; units (presumably bytes) — confirm.
    size_t GetBufferMemoryUsage() const;
};
  331. ////////////////////////////////////////////////////////////////////////////////
/// @brief Type representing YaMR table row.
///
/// Fields are `TStringBuf`s. NOTE(review): these are presumably non-owning views, so the
/// referenced data must outlive the row — confirm lifetime guarantees with the reader/writer.
///
/// @deprecated
struct TYaMRRow
{
    /// Key column.
    TStringBuf Key;

    /// Subkey column.
    TStringBuf SubKey;

    /// Value column.
    TStringBuf Value;
};
  344. ////////////////////////////////////////////////////////////////////////////////
  345. /// Interface for creating table and file readers and writer.
  346. class IIOClient
  347. {
  348. public:
  349. virtual ~IIOClient() = default;
  350. /// Create a reader for file at `path`.
  351. virtual IFileReaderPtr CreateFileReader(
  352. const TRichYPath& path,
  353. const TFileReaderOptions& options = TFileReaderOptions()) = 0;
  354. /// Create a writer for file at `path`.
  355. virtual IFileWriterPtr CreateFileWriter(
  356. const TRichYPath& path,
  357. const TFileWriterOptions& options = TFileWriterOptions()) = 0;
  358. /// Create a typed reader for table at `path`.
  359. template <class T>
  360. TTableReaderPtr<T> CreateTableReader(
  361. const TRichYPath& path,
  362. const TTableReaderOptions& options = TTableReaderOptions());
  363. /// Create a typed writer for table at `path`.
  364. template <class T>
  365. TTableWriterPtr<T> CreateTableWriter(
  366. const TRichYPath& path,
  367. const TTableWriterOptions& options = TTableWriterOptions());
  368. /// Create a writer to write protobuf messages with specified descriptor.
  369. virtual TTableWriterPtr<::google::protobuf::Message> CreateTableWriter(
  370. const TRichYPath& path,
  371. const ::google::protobuf::Descriptor& descriptor,
  372. const TTableWriterOptions& options = TTableWriterOptions()) = 0;
  373. /// Create a reader to read a table using specified format.
  374. virtual TRawTableReaderPtr CreateRawReader(
  375. const TRichYPath& path,
  376. const TFormat& format,
  377. const TTableReaderOptions& options = TTableReaderOptions()) = 0;
  378. /// Create a reader to write a table using specified format.
  379. virtual TRawTableWriterPtr CreateRawWriter(
  380. const TRichYPath& path,
  381. const TFormat& format,
  382. const TTableWriterOptions& options = TTableWriterOptions()) = 0;
  383. ///
  384. /// @brief Create a reader for [blob table](https://docs.yandex-team.ru/docs/yt/description/storage/blobtables) at `path`.
  385. ///
  386. /// @param path Blob table path.
  387. /// @param blobId Key identifying the blob.
  388. /// @param options Optional parameters
  389. ///
  390. /// Blob table is a table that stores a number of blobs.
  391. /// Blobs are sliced into parts of the same size (maybe except of last part).
  392. /// Those parts are stored in the separate rows.
  393. ///
  394. /// Blob table have constraints on its schema.
  395. /// - There must be columns that identify blob (blob id columns). That columns might be of any type.
  396. /// - There must be a column of `int64` type that identify part inside the blob (this column is called `part index`).
  397. /// - There must be a column of `string` type that stores actual data (this column is called `data column`).
  398. virtual IFileReaderPtr CreateBlobTableReader(
  399. const TYPath& path,
  400. const TKey& blobId,
  401. const TBlobTableReaderOptions& options = TBlobTableReaderOptions()) = 0;
  402. private:
  403. virtual ::TIntrusivePtr<INodeReaderImpl> CreateNodeReader(
  404. const TRichYPath& path, const TTableReaderOptions& options) = 0;
  405. virtual ::TIntrusivePtr<IYaMRReaderImpl> CreateYaMRReader(
  406. const TRichYPath& path, const TTableReaderOptions& options) = 0;
  407. virtual ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
  408. const TRichYPath& path,
  409. const TTableReaderOptions& options,
  410. const ::google::protobuf::Message* prototype) = 0;
  411. virtual ::TIntrusivePtr<ISkiffRowReaderImpl> CreateSkiffRowReader(
  412. const TRichYPath& path,
  413. const TTableReaderOptions& options,
  414. const ISkiffRowSkipperPtr& skipper,
  415. const NSkiff::TSkiffSchemaPtr& schema) = 0;
  416. virtual ::TIntrusivePtr<INodeWriterImpl> CreateNodeWriter(
  417. const TRichYPath& path, const TTableWriterOptions& options) = 0;
  418. virtual ::TIntrusivePtr<IYaMRWriterImpl> CreateYaMRWriter(
  419. const TRichYPath& path, const TTableWriterOptions& options) = 0;
  420. virtual ::TIntrusivePtr<IProtoWriterImpl> CreateProtoWriter(
  421. const TRichYPath& path,
  422. const TTableWriterOptions& options,
  423. const ::google::protobuf::Message* prototype) = 0;
  424. };
  425. ////////////////////////////////////////////////////////////////////////////////
///
/// @brief Create a protobuf table reader from a stream.
///
/// @tparam T Protobuf message type to read (must be inherited from `Message`).
///
/// @param stream Input stream in YT protobuf format.
/// @param options Reader options.
template <typename T>
TTableReaderPtr<T> CreateTableReader(
    IInputStream* stream,
    const TTableReaderOptions& options = {});

///
/// @brief Create a protobuf multi table reader from a stream.
///
/// @tparam Ts Protobuf message types to read (must be inherited from `Message`).
///
/// @param stream Input stream in YT protobuf format.
/// @param options Reader options.
///
/// The row type of the returned reader is `TProtoOneOf` over `Ts`, built via `NDetail::TProtoOneOfUnique`.
template <class... Ts>
TTableReaderPtr<typename NDetail::TProtoOneOfUnique<Ts...>::TType> CreateProtoMultiTableReader(
    IInputStream* stream,
    const TTableReaderOptions& options = {});

///
/// @brief Create a homogeneous protobuf multi table reader from a stream.
///
/// @tparam T Protobuf message type to read (must be inherited from `Message`).
///
/// @param stream Input stream in YT protobuf format.
/// @param tableCount Number of tables in input stream.
/// @param options Reader options.
template <class T>
TTableReaderPtr<T> CreateProtoMultiTableReader(
    IInputStream* stream,
    int tableCount,
    const TTableReaderOptions& options = {});

/// Create a @ref NYT::TNode table reader from a stream.
template <>
TTableReaderPtr<TNode> CreateTableReader<TNode>(
    IInputStream* stream, const TTableReaderOptions& options);

/// Create a @ref NYT::TYaMRRow table reader from a stream.
template <>
TTableReaderPtr<TYaMRRow> CreateTableReader<TYaMRRow>(
    IInputStream* stream, const TTableReaderOptions& options);
namespace NDetail {

/// Create a protobuf table reader from a stream.
///
/// `descriptor` describes the message type of every row.
::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
    IInputStream* stream,
    const TTableReaderOptions& options,
    const ::google::protobuf::Descriptor* descriptor);

/// Create a protobuf table reader from a stream that can contain table switches.
///
/// NOTE(review): `descriptors` presumably holds one descriptor per input table, indexed by
/// table index — confirm against the implementation.
::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
    IInputStream* stream,
    const TTableReaderOptions& options,
    TVector<const ::google::protobuf::Descriptor*> descriptors);

} // namespace NDetail
  478. ////////////////////////////////////////////////////////////////////////////////
// NOTE(review): unqualified `Message` below is expected to name `::google::protobuf::Message`
// (brought into `NYT` outside this header, presumably in fwd.h) — confirm.

/// Convert generic protobuf table reader to a concrete one (for certain type `T`).
template <typename T>
TTableReaderPtr<T> CreateConcreteProtobufReader(TTableReader<Message>* reader);

/// Convert generic protobuf table reader to a concrete one (for certain type `T`).
template <typename T>
TTableReaderPtr<T> CreateConcreteProtobufReader(const TTableReaderPtr<Message>& reader);

/// Convert a concrete (for certain type `T`) protobuf table reader to a generic one.
template <typename T>
TTableReaderPtr<Message> CreateGenericProtobufReader(TTableReader<T>* reader);

/// Convert a concrete (for certain type `T`) protobuf table reader to a generic one.
template <typename T>
TTableReaderPtr<Message> CreateGenericProtobufReader(const TTableReaderPtr<T>& reader);
  491. ////////////////////////////////////////////////////////////////////////////////
  492. } // namespace NYT
  493. #define IO_INL_H_
  494. #include "io-inl.h"
  495. #undef IO_INL_H_