From 308af41748bc15ff794126226cb7bc4be14bf084 Mon Sep 17 00:00:00 2001 From: Aleksandr Khoroshilov Date: Mon, 6 Mar 2023 02:11:54 +0300 Subject: [PATCH 1/2] Split arrow::FileReader::ReadRowGroups() for flexible async IO --- cpp/src/parquet/arrow/reader.cc | 30 +++++++++++++++++++++++++++--- cpp/src/parquet/arrow/reader.h | 7 +++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 5b39de93d9ccf..11e543f935b61 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -311,6 +311,13 @@ class FileReaderImpl : public FileReader { return ReadTable(Iota(reader_->metadata()->num_columns()), table); } + Status WillNeedRowGroups(const std::vector& row_groups, + const std::vector& column_indices) override; + + Status DecodeRowGroups(const std::vector& row_groups, + const std::vector& column_indices, + std::shared_ptr<::arrow::Table>* out) override; + Status ReadRowGroups(const std::vector& row_groups, const std::vector& indices, std::shared_ptr* table) override; @@ -1216,9 +1223,8 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto return Status::OK(); } -Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups, - const std::vector& column_indices, - std::shared_ptr
* out) { +Status FileReaderImpl::WillNeedRowGroups(const std::vector& row_groups, + const std::vector& column_indices) { RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled @@ -1229,6 +1235,24 @@ Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups, reader_properties_.cache_options()); END_PARQUET_CATCH_EXCEPTIONS } + return Status::OK(); +} + +Status FileReaderImpl::DecodeRowGroups(const std::vector& row_groups, + const std::vector& column_indices, + std::shared_ptr<::arrow::Table>* out) { + RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); + + auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices, + /*cpu_executor=*/nullptr); + ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult()); + return Status::OK(); +} + +Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups, + const std::vector& column_indices, + std::shared_ptr
* out) { + RETURN_NOT_OK(WillNeedRowGroups(row_groups, column_indices)); auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices, /*cpu_executor=*/nullptr); diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 5dff35e887ef0..fbabeba7c764f 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -249,6 +249,13 @@ class PARQUET_EXPORT FileReader { virtual ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out) = 0; + virtual ::arrow::Status WillNeedRowGroups(const std::vector& row_groups, + const std::vector& column_indices) = 0; + + virtual ::arrow::Status DecodeRowGroups(const std::vector& row_groups, + const std::vector& column_indices, + std::shared_ptr<::arrow::Table>* out) = 0; + virtual ::arrow::Status ReadRowGroups(const std::vector& row_groups, const std::vector& column_indices, std::shared_ptr<::arrow::Table>* out) = 0; From a82d7512faa11b01ff29fb724dd115f62e223aed Mon Sep 17 00:00:00 2001 From: Aleksandr Khoroshilov Date: Mon, 6 Mar 2023 03:16:53 +0300 Subject: [PATCH 2/2] Clang formatting --- cpp/src/parquet/arrow/reader.cc | 6 +++--- cpp/src/parquet/arrow/reader.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 11e543f935b61..d361319f3c96a 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -312,7 +312,7 @@ class FileReaderImpl : public FileReader { } Status WillNeedRowGroups(const std::vector& row_groups, - const std::vector& column_indices) override; + const std::vector& column_indices) override; Status DecodeRowGroups(const std::vector& row_groups, const std::vector& column_indices, @@ -1239,8 +1239,8 @@ Status FileReaderImpl::WillNeedRowGroups(const std::vector& row_groups, } Status FileReaderImpl::DecodeRowGroups(const std::vector& row_groups, - const std::vector& column_indices, - std::shared_ptr<::arrow::Table>* out) { + const std::vector& column_indices, + std::shared_ptr<::arrow::Table>* out) { RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices, diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index fbabeba7c764f..33e8677ef7c15 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -250,7 +250,7 @@ class PARQUET_EXPORT FileReader { virtual ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out) = 0; virtual ::arrow::Status WillNeedRowGroups(const std::vector& row_groups, - const std::vector& column_indices) = 0; + const std::vector& column_indices) = 0; virtual ::arrow::Status DecodeRowGroups(const std::vector& row_groups, const std::vector& column_indices,