Reader.cc 58 KB


  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "Reader.hh"
  19. #include "Adaptor.hh"
  20. #include "BloomFilter.hh"
  21. #include "Options.hh"
  22. #include "Statistics.hh"
  23. #include "StripeStream.hh"
  24. #include "Utils.hh"
  25. #include "wrap/coded-stream-wrapper.h"
  26. #include <algorithm>
  27. #include <iostream>
  28. #include <iterator>
  29. #include <memory>
  30. #include <set>
  31. #include <sstream>
  32. #include <string>
  33. #include <vector>
  34. namespace orc {
  // ORC files written by these versions of the C++ writer have inconsistent bloom
  // filter hashing, so bloom filters found in such files should not be used.
  // The table is fully immutable: neither the strings nor the pointers may change.
  static const char* const BAD_CPP_BLOOM_FILTER_VERSIONS[] = {
      "1.6.0", "1.6.1", "1.6.2", "1.6.3",  "1.6.4",  "1.6.5", "1.6.6",
      "1.6.7", "1.6.8", "1.6.9", "1.6.10", "1.6.11", "1.7.0"};
  40. ReaderMetrics* getDefaultReaderMetrics() {
  41. static ReaderMetrics internal;
  42. return &internal;
  43. }
  44. const RowReaderOptions::IdReadIntentMap EMPTY_IDREADINTENTMAP() {
  45. return {};
  46. }
  47. const WriterVersionImpl& WriterVersionImpl::VERSION_HIVE_8732() {
  48. static const WriterVersionImpl version(WriterVersion_HIVE_8732);
  49. return version;
  50. }
  51. uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
  52. if (ps.has_compression_block_size()) {
  53. return ps.compression_block_size();
  54. } else {
  55. return 256 * 1024;
  56. }
  57. }
  58. CompressionKind convertCompressionKind(const proto::PostScript& ps) {
  59. if (ps.has_compression()) {
  60. return static_cast<CompressionKind>(ps.compression());
  61. } else {
  62. throw ParseError("Unknown compression type");
  63. }
  64. }
  65. std::string ColumnSelector::toDotColumnPath() {
  66. if (columns.empty()) {
  67. return std::string();
  68. }
  69. std::ostringstream columnStream;
  70. std::copy(columns.begin(), columns.end(),
  71. std::ostream_iterator<std::string>(columnStream, "."));
  72. std::string columnPath = columnStream.str();
  73. return columnPath.substr(0, columnPath.length() - 1);
  74. }
  75. WriterVersion getWriterVersionImpl(const FileContents* contents) {
  76. if (!contents->postscript->has_writer_version()) {
  77. return WriterVersion_ORIGINAL;
  78. }
  79. return static_cast<WriterVersion>(contents->postscript->writer_version());
  80. }
  81. void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type) {
  82. return selectChildren(selectedColumns, type, EMPTY_IDREADINTENTMAP());
  83. }
  84. void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type,
  85. const RowReaderOptions::IdReadIntentMap& idReadIntentMap) {
  86. size_t id = static_cast<size_t>(type.getColumnId());
  87. TypeKind kind = type.getKind();
  88. if (!selectedColumns[id]) {
  89. selectedColumns[id] = true;
  90. bool selectChild = true;
  91. if (kind == TypeKind::LIST || kind == TypeKind::MAP || kind == TypeKind::UNION) {
  92. auto elem = idReadIntentMap.find(id);
  93. if (elem != idReadIntentMap.end() && elem->second == ReadIntent_OFFSETS) {
  94. selectChild = false;
  95. }
  96. }
  97. if (selectChild) {
  98. for (size_t c = id; c <= type.getMaximumColumnId(); ++c) {
  99. selectedColumns[c] = true;
  100. }
  101. }
  102. }
  103. }
  104. /**
  105. * Recurses over a type tree and selects the parents of every selected type.
  106. * @return true if any child was selected.
  107. */
  108. bool ColumnSelector::selectParents(std::vector<bool>& selectedColumns, const Type& type) {
  109. size_t id = static_cast<size_t>(type.getColumnId());
  110. bool result = selectedColumns[id];
  111. uint64_t numSubtypeSelected = 0;
  112. for (uint64_t c = 0; c < type.getSubtypeCount(); ++c) {
  113. if (selectParents(selectedColumns, *type.getSubtype(c))) {
  114. result = true;
  115. numSubtypeSelected++;
  116. }
  117. }
  118. selectedColumns[id] = result;
  119. if (type.getKind() == TypeKind::UNION && selectedColumns[id]) {
  120. if (0 < numSubtypeSelected && numSubtypeSelected < type.getSubtypeCount()) {
  121. // Subtypes of UNION should be fully selected or not selected at all.
  122. // Override partial subtype selections with full selections.
  123. for (uint64_t c = 0; c < type.getSubtypeCount(); ++c) {
  124. selectChildren(selectedColumns, *type.getSubtype(c));
  125. }
  126. }
  127. }
  128. return result;
  129. }
  130. /**
  131. * Recurses over a type tree and build two maps
  132. * map<TypeName, TypeId>, map<TypeId, Type>
  133. */
  134. void ColumnSelector::buildTypeNameIdMap(const Type* type) {
  135. // map<type_id, Type*>
  136. idTypeMap[type->getColumnId()] = type;
  137. if (STRUCT == type->getKind()) {
  138. for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
  139. const std::string& fieldName = type->getFieldName(i);
  140. columns.push_back(fieldName);
  141. nameIdMap[toDotColumnPath()] = type->getSubtype(i)->getColumnId();
  142. buildTypeNameIdMap(type->getSubtype(i));
  143. columns.pop_back();
  144. }
  145. } else {
  146. // other non-primitive type
  147. for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
  148. buildTypeNameIdMap(type->getSubtype(j));
  149. }
  150. }
  151. }
  152. void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns,
  153. const RowReaderOptions& options) {
  154. selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
  155. if (contents->schema->getKind() == STRUCT && options.getIndexesSet()) {
  156. for (std::list<uint64_t>::const_iterator field = options.getInclude().begin();
  157. field != options.getInclude().end(); ++field) {
  158. updateSelectedByFieldId(selectedColumns, *field);
  159. }
  160. } else if (contents->schema->getKind() == STRUCT && options.getNamesSet()) {
  161. for (std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
  162. field != options.getIncludeNames().end(); ++field) {
  163. updateSelectedByName(selectedColumns, *field);
  164. }
  165. } else if (options.getTypeIdsSet()) {
  166. const RowReaderOptions::IdReadIntentMap idReadIntentMap = options.getIdReadIntentMap();
  167. for (std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
  168. typeId != options.getInclude().end(); ++typeId) {
  169. updateSelectedByTypeId(selectedColumns, *typeId, idReadIntentMap);
  170. }
  171. } else {
  172. // default is to select all columns
  173. std::fill(selectedColumns.begin(), selectedColumns.end(), true);
  174. }
  175. selectParents(selectedColumns, *contents->schema.get());
  176. selectedColumns[0] = true; // column 0 is selected by default
  177. }
  178. void ColumnSelector::updateSelectedByFieldId(std::vector<bool>& selectedColumns,
  179. uint64_t fieldId) {
  180. if (fieldId < contents->schema->getSubtypeCount()) {
  181. selectChildren(selectedColumns, *contents->schema->getSubtype(fieldId));
  182. } else {
  183. std::stringstream buffer;
  184. buffer << "Invalid column selected " << fieldId << " out of "
  185. << contents->schema->getSubtypeCount();
  186. throw ParseError(buffer.str());
  187. }
  188. }
  189. void ColumnSelector::updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId) {
  190. updateSelectedByTypeId(selectedColumns, typeId, EMPTY_IDREADINTENTMAP());
  191. }
  192. void ColumnSelector::updateSelectedByTypeId(
  193. std::vector<bool>& selectedColumns, uint64_t typeId,
  194. const RowReaderOptions::IdReadIntentMap& idReadIntentMap) {
  195. if (typeId < selectedColumns.size()) {
  196. const Type& type = *idTypeMap[typeId];
  197. selectChildren(selectedColumns, type, idReadIntentMap);
  198. } else {
  199. std::stringstream buffer;
  200. buffer << "Invalid type id selected " << typeId << " out of " << selectedColumns.size();
  201. throw ParseError(buffer.str());
  202. }
  203. }
  204. void ColumnSelector::updateSelectedByName(std::vector<bool>& selectedColumns,
  205. const std::string& fieldName) {
  206. std::map<std::string, uint64_t>::const_iterator ite = nameIdMap.find(fieldName);
  207. if (ite != nameIdMap.end()) {
  208. updateSelectedByTypeId(selectedColumns, ite->second);
  209. } else {
  210. bool first = true;
  211. std::ostringstream ss;
  212. ss << "Invalid column selected " << fieldName << ". Valid names are ";
  213. for (auto it = nameIdMap.begin(); it != nameIdMap.end(); ++it) {
  214. if (!first) ss << ", ";
  215. ss << it->first;
  216. first = false;
  217. }
  218. throw ParseError(ss.str());
  219. }
  220. }
  221. ColumnSelector::ColumnSelector(const FileContents* _contents) : contents(_contents) {
  222. buildTypeNameIdMap(contents->schema.get());
  223. }
  224. RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
  225. const RowReaderOptions& opts)
  226. : localTimezone(getLocalTimezone()),
  227. contents(_contents),
  228. throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
  229. forcedScaleOnHive11Decimal(opts.getForcedScaleOnHive11Decimal()),
  230. footer(contents->footer.get()),
  231. firstRowOfStripe(*contents->pool, 0),
  232. enableEncodedBlock(opts.getEnableLazyDecoding()),
  233. readerTimezone(getTimezoneByName(opts.getTimezoneName())),
  234. schemaEvolution(opts.getReadType(), contents->schema.get()) {
  235. uint64_t numberOfStripes;
  236. numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
  237. currentStripe = numberOfStripes;
  238. lastStripe = 0;
  239. currentRowInStripe = 0;
  240. rowsInCurrentStripe = 0;
  241. numRowGroupsInStripeRange = 0;
  242. useTightNumericVector = opts.getUseTightNumericVector();
  243. throwOnSchemaEvolutionOverflow = opts.getThrowOnSchemaEvolutionOverflow();
  244. uint64_t rowTotal = 0;
  245. firstRowOfStripe.resize(numberOfStripes);
  246. for (size_t i = 0; i < numberOfStripes; ++i) {
  247. firstRowOfStripe[i] = rowTotal;
  248. proto::StripeInformation stripeInfo = footer->stripes(static_cast<int>(i));
  249. rowTotal += stripeInfo.number_of_rows();
  250. bool isStripeInRange = stripeInfo.offset() >= opts.getOffset() &&
  251. stripeInfo.offset() < opts.getOffset() + opts.getLength();
  252. if (isStripeInRange) {
  253. if (i < currentStripe) {
  254. currentStripe = i;
  255. }
  256. if (i >= lastStripe) {
  257. lastStripe = i + 1;
  258. }
  259. if (footer->row_index_stride() > 0) {
  260. numRowGroupsInStripeRange +=
  261. (stripeInfo.number_of_rows() + footer->row_index_stride() - 1) /
  262. footer->row_index_stride();
  263. }
  264. }
  265. }
  266. firstStripe = currentStripe;
  267. processingStripe = lastStripe;
  268. if (currentStripe == 0) {
  269. previousRow = (std::numeric_limits<uint64_t>::max)();
  270. } else if (currentStripe == numberOfStripes) {
  271. previousRow = footer->number_of_rows();
  272. } else {
  273. previousRow = firstRowOfStripe[firstStripe] - 1;
  274. }
  275. ColumnSelector column_selector(contents.get());
  276. column_selector.updateSelected(selectedColumns, opts);
  277. // prepare SargsApplier if SearchArgument is available
  278. if (opts.getSearchArgument() && footer->row_index_stride() > 0) {
  279. sargs = opts.getSearchArgument();
  280. sargsApplier.reset(
  281. new SargsApplier(*contents->schema, sargs.get(), footer->row_index_stride(),
  282. getWriterVersionImpl(_contents.get()), contents->readerMetrics));
  283. }
  284. skipBloomFilters = hasBadBloomFilters();
  285. }
  286. // Check if the file has inconsistent bloom filters.
  287. bool RowReaderImpl::hasBadBloomFilters() {
  288. // Only C++ writer in old releases could have bad bloom filters.
  289. if (footer->writer() != ORC_CPP_WRITER) return false;
  290. // 'softwareVersion' is added in 1.5.13, 1.6.11, and 1.7.0.
  291. // 1.6.x releases before 1.6.11 won't have it. On the other side, the C++ writer
  292. // supports writing bloom filters since 1.6.0. So files written by the C++ writer
  293. // and with 'softwareVersion' unset would have bad bloom filters.
  294. if (!footer->has_software_version()) return true;
  295. const std::string& fullVersion = footer->software_version();
  296. std::string version;
  297. // Deal with snapshot versions, e.g. 1.6.12-SNAPSHOT.
  298. if (fullVersion.find('-') != std::string::npos) {
  299. version = fullVersion.substr(0, fullVersion.find('-'));
  300. } else {
  301. version = fullVersion;
  302. }
  303. for (const char* v : BAD_CPP_BLOOM_FILTER_VERSIONS) {
  304. if (version == v) {
  305. return true;
  306. }
  307. }
  308. return false;
  309. }
  310. CompressionKind RowReaderImpl::getCompression() const {
  311. return contents->compression;
  312. }
  313. uint64_t RowReaderImpl::getCompressionSize() const {
  314. return contents->blockSize;
  315. }
  316. const std::vector<bool> RowReaderImpl::getSelectedColumns() const {
  317. return selectedColumns;
  318. }
  319. const Type& RowReaderImpl::getSelectedType() const {
  320. if (selectedSchema.get() == nullptr) {
  321. selectedSchema = buildSelectedType(contents->schema.get(), selectedColumns);
  322. }
  323. return *(selectedSchema.get());
  324. }
  325. uint64_t RowReaderImpl::getRowNumber() const {
  326. return previousRow;
  327. }
  328. void RowReaderImpl::seekToRow(uint64_t rowNumber) {
  329. // Empty file
  330. if (lastStripe == 0) {
  331. return;
  332. }
  333. // If we are reading only a portion of the file
  334. // (bounded by firstStripe and lastStripe),
  335. // seeking before or after the portion of interest should return no data.
  336. // Implement this by setting previousRow to the number of rows in the file.
  337. // seeking past lastStripe
  338. uint64_t num_stripes = static_cast<uint64_t>(footer->stripes_size());
  339. if ((lastStripe == num_stripes && rowNumber >= footer->number_of_rows()) ||
  340. (lastStripe < num_stripes && rowNumber >= firstRowOfStripe[lastStripe])) {
  341. currentStripe = num_stripes;
  342. previousRow = footer->number_of_rows();
  343. return;
  344. }
  345. uint64_t seekToStripe = 0;
  346. while (seekToStripe + 1 < lastStripe && firstRowOfStripe[seekToStripe + 1] <= rowNumber) {
  347. seekToStripe++;
  348. }
  349. // seeking before the first stripe
  350. if (seekToStripe < firstStripe) {
  351. currentStripe = num_stripes;
  352. previousRow = footer->number_of_rows();
  353. return;
  354. }
  355. previousRow = rowNumber;
  356. auto rowIndexStride = footer->row_index_stride();
  357. if (!isCurrentStripeInited() || currentStripe != seekToStripe || rowIndexStride == 0 ||
  358. currentStripeInfo.index_length() == 0) {
  359. // current stripe is not initialized or
  360. // target stripe is not current stripe or
  361. // current stripe doesn't have row indexes
  362. currentStripe = seekToStripe;
  363. currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
  364. startNextStripe();
  365. if (currentStripe >= lastStripe) {
  366. return;
  367. }
  368. } else {
  369. currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
  370. if (sargsApplier) {
  371. // advance to selected row group if predicate pushdown is enabled
  372. currentRowInStripe =
  373. advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe,
  374. footer->row_index_stride(), sargsApplier->getNextSkippedRows());
  375. }
  376. }
  377. uint64_t rowsToSkip = currentRowInStripe;
  378. // seek to the target row group if row indexes exists
  379. if (rowIndexStride > 0 && currentStripeInfo.index_length() > 0) {
  380. if (rowIndexes.empty()) {
  381. loadStripeIndex();
  382. }
  383. // TODO(ORC-1175): process the failures of loadStripeIndex() call
  384. seekToRowGroup(static_cast<uint32_t>(rowsToSkip / rowIndexStride));
  385. // skip leading rows in the target row group
  386. rowsToSkip %= rowIndexStride;
  387. }
  388. // 'reader' is reset in startNextStripe(). It could be nullptr if 'rowsToSkip' is 0,
  389. // e.g. when startNextStripe() skips all remaining rows of the file.
  390. if (rowsToSkip > 0) {
  391. reader->skip(rowsToSkip);
  392. }
  393. }
  394. void RowReaderImpl::loadStripeIndex() {
  395. // reset all previous row indexes
  396. rowIndexes.clear();
  397. bloomFilterIndex.clear();
  398. // obtain row indexes for selected columns
  399. uint64_t offset = currentStripeInfo.offset();
  400. for (int i = 0; i < currentStripeFooter.streams_size(); ++i) {
  401. const proto::Stream& pbStream = currentStripeFooter.streams(i);
  402. uint64_t colId = pbStream.column();
  403. if (selectedColumns[colId] && pbStream.has_kind() &&
  404. (pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
  405. pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
  406. std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
  407. getCompression(),
  408. std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
  409. contents->stream.get(), offset, pbStream.length(), *contents->pool)),
  410. getCompressionSize(), *contents->pool, contents->readerMetrics);
  411. if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
  412. proto::RowIndex rowIndex;
  413. if (!rowIndex.ParseFromZeroCopyStream(inStream.get())) {
  414. throw ParseError("Failed to parse the row index");
  415. }
  416. rowIndexes[colId] = rowIndex;
  417. } else if (!skipBloomFilters) { // Stream_Kind_BLOOM_FILTER_UTF8
  418. proto::BloomFilterIndex pbBFIndex;
  419. if (!pbBFIndex.ParseFromZeroCopyStream(inStream.get())) {
  420. throw ParseError("Failed to parse bloom filter index");
  421. }
  422. BloomFilterIndex bfIndex;
  423. for (int j = 0; j < pbBFIndex.bloom_filter_size(); j++) {
  424. bfIndex.entries.push_back(BloomFilterUTF8Utils::deserialize(
  425. pbStream.kind(), currentStripeFooter.columns(static_cast<int>(pbStream.column())),
  426. pbBFIndex.bloom_filter(j)));
  427. }
  428. // add bloom filters to result for one column
  429. bloomFilterIndex[pbStream.column()] = bfIndex;
  430. }
  431. }
  432. offset += pbStream.length();
  433. }
  434. }
  435. void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
  436. // store positions for selected columns
  437. std::list<std::list<uint64_t>> positions;
  438. // store position providers for selected colimns
  439. std::unordered_map<uint64_t, PositionProvider> positionProviders;
  440. for (auto rowIndex = rowIndexes.cbegin(); rowIndex != rowIndexes.cend(); ++rowIndex) {
  441. uint64_t colId = rowIndex->first;
  442. const proto::RowIndexEntry& entry =
  443. rowIndex->second.entry(static_cast<int32_t>(rowGroupEntryId));
  444. // copy index positions for a specific column
  445. positions.push_back({});
  446. auto& position = positions.back();
  447. for (int pos = 0; pos != entry.positions_size(); ++pos) {
  448. position.push_back(entry.positions(pos));
  449. }
  450. positionProviders.insert(std::make_pair(colId, PositionProvider(position)));
  451. }
  452. reader->seekToRowGroup(positionProviders);
  453. }
  454. const FileContents& RowReaderImpl::getFileContents() const {
  455. return *contents;
  456. }
  457. bool RowReaderImpl::getThrowOnHive11DecimalOverflow() const {
  458. return throwOnHive11DecimalOverflow;
  459. }
  460. bool RowReaderImpl::getIsDecimalAsLong() const {
  461. return contents->isDecimalAsLong;
  462. }
  463. int32_t RowReaderImpl::getForcedScaleOnHive11Decimal() const {
  464. return forcedScaleOnHive11Decimal;
  465. }
  466. proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
  467. const FileContents& contents) {
  468. uint64_t stripeFooterStart = info.offset() + info.index_length() + info.data_length();
  469. uint64_t stripeFooterLength = info.footer_length();
  470. std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
  471. contents.compression,
  472. std::make_unique<SeekableFileInputStream>(contents.stream.get(), stripeFooterStart,
  473. stripeFooterLength, *contents.pool),
  474. contents.blockSize, *contents.pool, contents.readerMetrics);
  475. proto::StripeFooter result;
  476. if (!result.ParseFromZeroCopyStream(pbStream.get())) {
  477. throw ParseError(std::string("bad StripeFooter from ") + pbStream->getName());
  478. }
  479. // Verify StripeFooter in case it's corrupt
  480. if (result.columns_size() != contents.footer->types_size()) {
  481. std::stringstream msg;
  482. msg << "bad number of ColumnEncodings in StripeFooter: expected="
  483. << contents.footer->types_size() << ", actual=" << result.columns_size();
  484. throw ParseError(msg.str());
  485. }
  486. return result;
  487. }
  488. ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> _contents, const ReaderOptions& opts,
  489. uint64_t _fileLength, uint64_t _postscriptLength)
  490. : contents(std::move(_contents)),
  491. options(opts),
  492. fileLength(_fileLength),
  493. postscriptLength(_postscriptLength),
  494. footer(contents->footer.get()) {
  495. isMetadataLoaded = false;
  496. checkOrcVersion();
  497. numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
  498. contents->schema = convertType(footer->types(0), *footer);
  499. contents->blockSize = getCompressionBlockSize(*contents->postscript);
  500. contents->compression = convertCompressionKind(*contents->postscript);
  501. }
  502. std::string ReaderImpl::getSerializedFileTail() const {
  503. proto::FileTail tail;
  504. proto::PostScript* mutable_ps = tail.mutable_postscript();
  505. mutable_ps->CopyFrom(*contents->postscript);
  506. proto::Footer* mutableFooter = tail.mutable_footer();
  507. mutableFooter->CopyFrom(*footer);
  508. tail.set_file_length(fileLength);
  509. tail.set_postscript_length(postscriptLength);
  510. TProtoStringType result;
  511. if (!tail.SerializeToString(&result)) {
  512. throw ParseError("Failed to serialize file tail");
  513. }
  514. return result;
  515. }
  516. const ReaderOptions& ReaderImpl::getReaderOptions() const {
  517. return options;
  518. }
  519. CompressionKind ReaderImpl::getCompression() const {
  520. return contents->compression;
  521. }
  522. uint64_t ReaderImpl::getCompressionSize() const {
  523. return contents->blockSize;
  524. }
  525. uint64_t ReaderImpl::getNumberOfStripes() const {
  526. return numberOfStripes;
  527. }
  528. uint64_t ReaderImpl::getNumberOfStripeStatistics() const {
  529. if (!isMetadataLoaded) {
  530. readMetadata();
  531. }
  532. return contents->metadata == nullptr
  533. ? 0
  534. : static_cast<uint64_t>(contents->metadata->stripe_stats_size());
  535. }
  536. std::unique_ptr<StripeInformation> ReaderImpl::getStripe(uint64_t stripeIndex) const {
  537. if (stripeIndex > getNumberOfStripes()) {
  538. throw std::logic_error("stripe index out of range");
  539. }
  540. proto::StripeInformation stripeInfo = footer->stripes(static_cast<int>(stripeIndex));
  541. return std::unique_ptr<StripeInformation>(new StripeInformationImpl(
  542. stripeInfo.offset(), stripeInfo.index_length(), stripeInfo.data_length(),
  543. stripeInfo.footer_length(), stripeInfo.number_of_rows(), contents->stream.get(),
  544. *contents->pool, contents->compression, contents->blockSize, contents->readerMetrics));
  545. }
  546. FileVersion ReaderImpl::getFormatVersion() const {
  547. if (contents->postscript->version_size() != 2) {
  548. return FileVersion::v_0_11();
  549. }
  550. return {contents->postscript->version(0), contents->postscript->version(1)};
  551. }
  552. uint64_t ReaderImpl::getNumberOfRows() const {
  553. return footer->number_of_rows();
  554. }
  555. WriterId ReaderImpl::getWriterId() const {
  556. if (footer->has_writer()) {
  557. uint32_t id = footer->writer();
  558. if (id > WriterId::CUDF_WRITER) {
  559. return WriterId::UNKNOWN_WRITER;
  560. } else {
  561. return static_cast<WriterId>(id);
  562. }
  563. }
  564. return WriterId::ORC_JAVA_WRITER;
  565. }
  566. uint32_t ReaderImpl::getWriterIdValue() const {
  567. if (footer->has_writer()) {
  568. return footer->writer();
  569. } else {
  570. return WriterId::ORC_JAVA_WRITER;
  571. }
  572. }
  573. std::string ReaderImpl::getSoftwareVersion() const {
  574. std::ostringstream buffer;
  575. buffer << writerIdToString(getWriterIdValue());
  576. if (footer->has_software_version()) {
  577. buffer << " " << footer->software_version();
  578. }
  579. return buffer.str();
  580. }
  581. WriterVersion ReaderImpl::getWriterVersion() const {
  582. return getWriterVersionImpl(contents.get());
  583. }
  584. uint64_t ReaderImpl::getContentLength() const {
  585. return footer->content_length();
  586. }
  587. uint64_t ReaderImpl::getStripeStatisticsLength() const {
  588. return contents->postscript->metadata_length();
  589. }
  590. uint64_t ReaderImpl::getFileFooterLength() const {
  591. return contents->postscript->footer_length();
  592. }
  593. uint64_t ReaderImpl::getFilePostscriptLength() const {
  594. return postscriptLength;
  595. }
  596. uint64_t ReaderImpl::getFileLength() const {
  597. return fileLength;
  598. }
  599. uint64_t ReaderImpl::getRowIndexStride() const {
  600. return footer->row_index_stride();
  601. }
  602. const std::string& ReaderImpl::getStreamName() const {
  603. return contents->stream->getName();
  604. }
  605. std::list<std::string> ReaderImpl::getMetadataKeys() const {
  606. std::list<std::string> result;
  607. for (int i = 0; i < footer->metadata_size(); ++i) {
  608. result.push_back(footer->metadata(i).name());
  609. }
  610. return result;
  611. }
  612. std::string ReaderImpl::getMetadataValue(const std::string& key) const {
  613. for (int i = 0; i < footer->metadata_size(); ++i) {
  614. if (footer->metadata(i).name() == TProtoStringType(key)) {
  615. return footer->metadata(i).value();
  616. }
  617. }
  618. throw std::range_error("key not found");
  619. }
  620. void ReaderImpl::getRowIndexStatistics(
  621. const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
  622. const proto::StripeFooter& currentStripeFooter,
  623. std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const {
  624. int num_streams = currentStripeFooter.streams_size();
  625. uint64_t offset = stripeInfo.offset();
  626. uint64_t indexEnd = stripeInfo.offset() + stripeInfo.index_length();
  627. for (int i = 0; i < num_streams; i++) {
  628. const proto::Stream& stream = currentStripeFooter.streams(i);
  629. StreamKind streamKind = static_cast<StreamKind>(stream.kind());
  630. uint64_t length = static_cast<uint64_t>(stream.length());
  631. if (streamKind == StreamKind::StreamKind_ROW_INDEX) {
  632. if (offset + length > indexEnd) {
  633. std::stringstream msg;
  634. msg << "Malformed RowIndex stream meta in stripe " << stripeIndex
  635. << ": streamOffset=" << offset << ", streamLength=" << length
  636. << ", stripeOffset=" << stripeInfo.offset()
  637. << ", stripeIndexLength=" << stripeInfo.index_length();
  638. throw ParseError(msg.str());
  639. }
  640. std::unique_ptr<SeekableInputStream> pbStream =
  641. createDecompressor(contents->compression,
  642. std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
  643. contents->stream.get(), offset, length, *contents->pool)),
  644. contents->blockSize, *(contents->pool), contents->readerMetrics);
  645. proto::RowIndex rowIndex;
  646. if (!rowIndex.ParseFromZeroCopyStream(pbStream.get())) {
  647. throw ParseError("Failed to parse RowIndex from stripe footer");
  648. }
  649. int num_entries = rowIndex.entry_size();
  650. size_t column = static_cast<size_t>(stream.column());
  651. for (int j = 0; j < num_entries; j++) {
  652. const proto::RowIndexEntry& entry = rowIndex.entry(j);
  653. (*indexStats)[column].push_back(entry.statistics());
  654. }
  655. }
  656. offset += length;
  657. }
  658. }
  659. bool ReaderImpl::hasMetadataValue(const std::string& key) const {
  660. for (int i = 0; i < footer->metadata_size(); ++i) {
  661. if (footer->metadata(i).name() == TProtoStringType(key)) {
  662. return true;
  663. }
  664. }
  665. return false;
  666. }
  667. const Type& ReaderImpl::getType() const {
  668. return *(contents->schema.get());
  669. }
  670. std::unique_ptr<StripeStatistics> ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const {
  671. if (!isMetadataLoaded) {
  672. readMetadata();
  673. }
  674. if (contents->metadata == nullptr) {
  675. throw std::logic_error("No stripe statistics in file");
  676. }
  677. size_t num_cols = static_cast<size_t>(
  678. contents->metadata->stripe_stats(static_cast<int>(stripeIndex)).col_stats_size());
  679. std::vector<std::vector<proto::ColumnStatistics>> indexStats(num_cols);
  680. proto::StripeInformation currentStripeInfo = footer->stripes(static_cast<int>(stripeIndex));
  681. proto::StripeFooter currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
  682. getRowIndexStatistics(currentStripeInfo, stripeIndex, currentStripeFooter, &indexStats);
  683. const Timezone& writerTZ = currentStripeFooter.has_writer_timezone()
  684. ? getTimezoneByName(currentStripeFooter.writer_timezone())
  685. : getLocalTimezone();
  686. StatContext statContext(hasCorrectStatistics(), &writerTZ);
  687. return std::make_unique<StripeStatisticsImpl>(
  688. contents->metadata->stripe_stats(static_cast<int>(stripeIndex)), indexStats, statContext);
  689. }
  690. std::unique_ptr<Statistics> ReaderImpl::getStatistics() const {
  691. StatContext statContext(hasCorrectStatistics());
  692. return std::make_unique<StatisticsImpl>(*footer, statContext);
  693. }
  694. std::unique_ptr<ColumnStatistics> ReaderImpl::getColumnStatistics(uint32_t index) const {
  695. if (index >= static_cast<uint64_t>(footer->statistics_size())) {
  696. throw std::logic_error("column index out of range");
  697. }
  698. proto::ColumnStatistics col = footer->statistics(static_cast<int32_t>(index));
  699. StatContext statContext(hasCorrectStatistics());
  700. return std::unique_ptr<ColumnStatistics>(convertColumnStatistics(col, statContext));
  701. }
  702. void ReaderImpl::readMetadata() const {
  703. uint64_t metadataSize = contents->postscript->metadata_length();
  704. uint64_t footerLength = contents->postscript->footer_length();
  705. if (fileLength < metadataSize + footerLength + postscriptLength + 1) {
  706. std::stringstream msg;
  707. msg << "Invalid Metadata length: fileLength=" << fileLength
  708. << ", metadataLength=" << metadataSize << ", footerLength=" << footerLength
  709. << ", postscriptLength=" << postscriptLength;
  710. throw ParseError(msg.str());
  711. }
  712. uint64_t metadataStart = fileLength - metadataSize - footerLength - postscriptLength - 1;
  713. if (metadataSize != 0) {
  714. std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
  715. contents->compression,
  716. std::make_unique<SeekableFileInputStream>(contents->stream.get(), metadataStart,
  717. metadataSize, *contents->pool),
  718. contents->blockSize, *contents->pool, contents->readerMetrics);
  719. contents->metadata.reset(new proto::Metadata());
  720. if (!contents->metadata->ParseFromZeroCopyStream(pbStream.get())) {
  721. throw ParseError("Failed to parse the metadata");
  722. }
  723. }
  724. isMetadataLoaded = true;
  725. }
  726. bool ReaderImpl::hasCorrectStatistics() const {
  727. return !WriterVersionImpl::VERSION_HIVE_8732().compareGT(getWriterVersion());
  728. }
  729. void ReaderImpl::checkOrcVersion() {
  730. FileVersion version = getFormatVersion();
  731. if (version != FileVersion(0, 11) && version != FileVersion(0, 12)) {
  732. *(options.getErrorStream()) << "Warning: ORC file " << contents->stream->getName()
  733. << " was written in an unknown format version "
  734. << version.toString() << "\n";
  735. }
  736. }
  737. std::unique_ptr<RowReader> ReaderImpl::createRowReader() const {
  738. RowReaderOptions defaultOpts;
  739. return createRowReader(defaultOpts);
  740. }
  741. std::unique_ptr<RowReader> ReaderImpl::createRowReader(const RowReaderOptions& opts) const {
  742. if (opts.getSearchArgument() && !isMetadataLoaded) {
  743. // load stripe statistics for PPD
  744. readMetadata();
  745. }
  746. return std::make_unique<RowReaderImpl>(contents, opts);
  747. }
  748. uint64_t maxStreamsForType(const proto::Type& type) {
  749. switch (static_cast<int64_t>(type.kind())) {
  750. case proto::Type_Kind_STRUCT:
  751. return 1;
  752. case proto::Type_Kind_INT:
  753. case proto::Type_Kind_LONG:
  754. case proto::Type_Kind_SHORT:
  755. case proto::Type_Kind_FLOAT:
  756. case proto::Type_Kind_DOUBLE:
  757. case proto::Type_Kind_BOOLEAN:
  758. case proto::Type_Kind_BYTE:
  759. case proto::Type_Kind_DATE:
  760. case proto::Type_Kind_LIST:
  761. case proto::Type_Kind_MAP:
  762. case proto::Type_Kind_UNION:
  763. return 2;
  764. case proto::Type_Kind_BINARY:
  765. case proto::Type_Kind_DECIMAL:
  766. case proto::Type_Kind_TIMESTAMP:
  767. case proto::Type_Kind_TIMESTAMP_INSTANT:
  768. return 3;
  769. case proto::Type_Kind_CHAR:
  770. case proto::Type_Kind_STRING:
  771. case proto::Type_Kind_VARCHAR:
  772. return 4;
  773. default:
  774. return 0;
  775. }
  776. }
  777. uint64_t ReaderImpl::getMemoryUse(int stripeIx) {
  778. std::vector<bool> selectedColumns;
  779. selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), true);
  780. return getMemoryUse(stripeIx, selectedColumns);
  781. }
  782. uint64_t ReaderImpl::getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx) {
  783. std::vector<bool> selectedColumns;
  784. selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
  785. ColumnSelector column_selector(contents.get());
  786. if (contents->schema->getKind() == STRUCT && include.begin() != include.end()) {
  787. for (std::list<uint64_t>::const_iterator field = include.begin(); field != include.end();
  788. ++field) {
  789. column_selector.updateSelectedByFieldId(selectedColumns, *field);
  790. }
  791. } else {
  792. // default is to select all columns
  793. std::fill(selectedColumns.begin(), selectedColumns.end(), true);
  794. }
  795. column_selector.selectParents(selectedColumns, *contents->schema.get());
  796. selectedColumns[0] = true; // column 0 is selected by default
  797. return getMemoryUse(stripeIx, selectedColumns);
  798. }
  799. uint64_t ReaderImpl::getMemoryUseByName(const std::list<std::string>& names, int stripeIx) {
  800. std::vector<bool> selectedColumns;
  801. selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
  802. ColumnSelector column_selector(contents.get());
  803. if (contents->schema->getKind() == STRUCT && names.begin() != names.end()) {
  804. for (std::list<std::string>::const_iterator field = names.begin(); field != names.end();
  805. ++field) {
  806. column_selector.updateSelectedByName(selectedColumns, *field);
  807. }
  808. } else {
  809. // default is to select all columns
  810. std::fill(selectedColumns.begin(), selectedColumns.end(), true);
  811. }
  812. column_selector.selectParents(selectedColumns, *contents->schema.get());
  813. selectedColumns[0] = true; // column 0 is selected by default
  814. return getMemoryUse(stripeIx, selectedColumns);
  815. }
  816. uint64_t ReaderImpl::getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx) {
  817. std::vector<bool> selectedColumns;
  818. selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
  819. ColumnSelector column_selector(contents.get());
  820. if (include.begin() != include.end()) {
  821. for (std::list<uint64_t>::const_iterator field = include.begin(); field != include.end();
  822. ++field) {
  823. column_selector.updateSelectedByTypeId(selectedColumns, *field);
  824. }
  825. } else {
  826. // default is to select all columns
  827. std::fill(selectedColumns.begin(), selectedColumns.end(), true);
  828. }
  829. column_selector.selectParents(selectedColumns, *contents->schema.get());
  830. selectedColumns[0] = true; // column 0 is selected by default
  831. return getMemoryUse(stripeIx, selectedColumns);
  832. }
  833. uint64_t ReaderImpl::getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns) {
  834. uint64_t maxDataLength = 0;
  835. if (stripeIx >= 0 && stripeIx < footer->stripes_size()) {
  836. uint64_t stripe = footer->stripes(stripeIx).data_length();
  837. if (maxDataLength < stripe) {
  838. maxDataLength = stripe;
  839. }
  840. } else {
  841. for (int i = 0; i < footer->stripes_size(); i++) {
  842. uint64_t stripe = footer->stripes(i).data_length();
  843. if (maxDataLength < stripe) {
  844. maxDataLength = stripe;
  845. }
  846. }
  847. }
  848. bool hasStringColumn = false;
  849. uint64_t nSelectedStreams = 0;
  850. for (int i = 0; !hasStringColumn && i < footer->types_size(); i++) {
  851. if (selectedColumns[static_cast<size_t>(i)]) {
  852. const proto::Type& type = footer->types(i);
  853. nSelectedStreams += maxStreamsForType(type);
  854. switch (static_cast<int64_t>(type.kind())) {
  855. case proto::Type_Kind_CHAR:
  856. case proto::Type_Kind_STRING:
  857. case proto::Type_Kind_VARCHAR:
  858. case proto::Type_Kind_BINARY: {
  859. hasStringColumn = true;
  860. break;
  861. }
  862. default: {
  863. break;
  864. }
  865. }
  866. }
  867. }
  868. /* If a string column is read, use stripe data_length as a memory estimate
  869. * because we don't know the dictionary size. Multiply by 2 because
  870. * a string column requires two buffers:
  871. * in the input stream and in the seekable input stream.
  872. * If no string column is read, estimate from the number of streams.
  873. */
  874. uint64_t memory = hasStringColumn
  875. ? 2 * maxDataLength
  876. : std::min(uint64_t(maxDataLength),
  877. nSelectedStreams * contents->stream->getNaturalReadSize());
  878. // Do we need even more memory to read the footer or the metadata?
  879. if (memory < contents->postscript->footer_length() + DIRECTORY_SIZE_GUESS) {
  880. memory = contents->postscript->footer_length() + DIRECTORY_SIZE_GUESS;
  881. }
  882. if (memory < contents->postscript->metadata_length()) {
  883. memory = contents->postscript->metadata_length();
  884. }
  885. // Account for firstRowOfStripe.
  886. memory += static_cast<uint64_t>(footer->stripes_size()) * sizeof(uint64_t);
  887. // Decompressors need buffers for each stream
  888. uint64_t decompressorMemory = 0;
  889. if (contents->compression != CompressionKind_NONE) {
  890. for (int i = 0; i < footer->types_size(); i++) {
  891. if (selectedColumns[static_cast<size_t>(i)]) {
  892. const proto::Type& type = footer->types(i);
  893. decompressorMemory += maxStreamsForType(type) * contents->blockSize;
  894. }
  895. }
  896. if (contents->compression == CompressionKind_SNAPPY) {
  897. decompressorMemory *= 2; // Snappy decompressor uses a second buffer
  898. }
  899. }
  900. return memory + decompressorMemory;
  901. }
  902. // Update fields to indicate we've reached the end of file
  903. void RowReaderImpl::markEndOfFile() {
  904. currentStripe = lastStripe;
  905. currentRowInStripe = 0;
  906. rowsInCurrentStripe = 0;
  907. if (lastStripe == 0) {
  908. // Empty file
  909. previousRow = 0;
  910. } else {
  911. previousRow = firstRowOfStripe[lastStripe - 1] +
  912. footer->stripes(static_cast<int>(lastStripe - 1)).number_of_rows();
  913. }
  914. }
  /**
   * Open the next stripe to read, starting the search at currentStripe.
   *
   * Without a search argument the stripe at currentStripe is opened directly.
   * With one, file statistics, stripe statistics (when metadata is loaded) and
   * the row group selection are evaluated, and stripes without a selected row
   * group are skipped. If no stripe before lastStripe remains, the reader is
   * put into the end-of-file state.
   *
   * @throws ParseError if the stripe information does not fit into the file
   */
  void RowReaderImpl::startNextStripe() {
    reader.reset();  // ColumnReaders use lots of memory; free old memory first
    rowIndexes.clear();
    bloomFilterIndex.clear();

    // evaluate file statistics if it exists
    if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer, numRowGroupsInStripeRange)) {
      // skip the entire file
      markEndOfFile();
      return;
    }

    // Runs once without a search argument; with one it repeats until a stripe
    // with a selected row group is found or lastStripe is reached.
    do {
      currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
      uint64_t fileLength = contents->stream->getLength();
      // Reject a stripe whose end offset is not inside the file.
      // NOTE(review): the sum is built from values stored in the file and is
      // not guarded against uint64_t wrap-around -- confirm whether that matters.
      if (currentStripeInfo.offset() + currentStripeInfo.index_length() +
              currentStripeInfo.data_length() + currentStripeInfo.footer_length() >=
          fileLength) {
        std::stringstream msg;
        msg << "Malformed StripeInformation at stripe index " << currentStripe
            << ": fileLength=" << fileLength
            << ", StripeInfo=(offset=" << currentStripeInfo.offset()
            << ", indexLength=" << currentStripeInfo.index_length()
            << ", dataLength=" << currentStripeInfo.data_length()
            << ", footerLength=" << currentStripeInfo.footer_length() << ")";
        throw ParseError(msg.str());
      }
      currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
      rowsInCurrentStripe = currentStripeInfo.number_of_rows();
      processingStripe = currentStripe;

      if (sargsApplier) {
        bool isStripeNeeded = true;
        // Stripe statistics are only consulted when the metadata was loaded.
        if (contents->metadata) {
          const auto& currentStripeStats =
              contents->metadata->stripe_stats(static_cast<int>(currentStripe));
          // skip this stripe after stats fail to satisfy sargs
          // (row group count = rows in the stripe divided by the stride, rounded up)
          uint64_t stripeRowGroupCount =
              (rowsInCurrentStripe + footer->row_index_stride() - 1) / footer->row_index_stride();
          isStripeNeeded =
              sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount);
        }

        if (isStripeNeeded) {
          // read row group statistics and bloom filters of current stripe
          loadStripeIndex();

          // select row groups to read in the current stripe
          sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, bloomFilterIndex);
          if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
            // current stripe has at least one row group matching the predicate
            break;
          }
          isStripeNeeded = false;
        }

        if (!isStripeNeeded) {
          // advance to next stripe when current stripe has no matching rows
          currentStripe += 1;
          currentRowInStripe = 0;
        }
      }
    } while (sargsApplier && currentStripe < lastStripe);

    if (currentStripe < lastStripe) {
      // get writer timezone info from stripe footer to help understand timestamp values.
      const Timezone& writerTimezone =
          currentStripeFooter.has_writer_timezone()
              ? getTimezoneByName(currentStripeFooter.writer_timezone())
              : localTimezone;
      StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, currentStripeFooter,
                                      currentStripeInfo.offset(), *contents->stream, writerTimezone,
                                      readerTimezone);
      reader = buildReader(*contents->schema, stripeStreams, useTightNumericVector,
                           throwOnSchemaEvolutionOverflow, /*convertToReadType=*/true);

      if (sargsApplier) {
        // move to the 1st selected row group when PPD is enabled.
        currentRowInStripe =
            advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe,
                                  footer->row_index_stride(), sargsApplier->getNextSkippedRows());
        // previousRow is set to the row right before the one that is read next.
        previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1;
        if (currentRowInStripe > 0) {
          seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->row_index_stride()));
        }
      }
    } else {
      // All remaining stripes are skipped.
      markEndOfFile();
    }
  }
  /**
   * Read the next batch of rows into data.
   *
   * At most data.capacity rows are read and a batch never crosses the end of
   * the current stripe; with a search argument it is further limited by
   * computeBatchSize() so that it stays within the selected row groups.
   *
   * @param data the batch to fill; numElements is set to the rows read
   * @return true if at least one row was read, false once no rows are left
   */
  bool RowReaderImpl::next(ColumnVectorBatch& data) {
    SCOPED_STOPWATCH(contents->readerMetrics, ReaderInclusiveLatencyUs, ReaderCall);
    if (currentStripe >= lastStripe) {
      data.numElements = 0;
      markEndOfFile();
      return false;
    }
    // A row position of 0 means the current stripe has not been opened yet.
    if (currentRowInStripe == 0) {
      startNextStripe();
    }
    uint64_t rowsToRead =
        std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe - currentRowInStripe);
    if (sargsApplier && rowsToRead > 0) {
      rowsToRead = computeBatchSize(rowsToRead, currentRowInStripe, rowsInCurrentStripe,
                                    footer->row_index_stride(), sargsApplier->getNextSkippedRows());
    }
    data.numElements = rowsToRead;
    // startNextStripe() may have skipped every remaining stripe, leaving nothing to read.
    if (rowsToRead == 0) {
      markEndOfFile();
      return false;
    }
    if (enableEncodedBlock) {
      reader->nextEncoded(data, rowsToRead, nullptr);
    } else {
      reader->next(data, rowsToRead, nullptr);
    }
    // update row number
    // (previousRow is the file-wide number of the first row of this batch)
    previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
    currentRowInStripe += rowsToRead;

    // check if we need to advance to next selected row group
    if (sargsApplier) {
      uint64_t nextRowToRead =
          advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe, footer->row_index_stride(),
                                sargsApplier->getNextSkippedRows());
      if (currentRowInStripe != nextRowToRead) {
        // it is guaranteed to be at start of a row group
        currentRowInStripe = nextRowToRead;
        if (currentRowInStripe < rowsInCurrentStripe) {
          seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->row_index_stride()));
        }
      }
    }

    // Stripe exhausted: the next call opens the following stripe.
    if (currentRowInStripe >= rowsInCurrentStripe) {
      currentStripe += 1;
      currentRowInStripe = 0;
    }
    return rowsToRead != 0;
  }
  1046. uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize, uint64_t currentRowInStripe,
  1047. uint64_t rowsInCurrentStripe, uint64_t rowIndexStride,
  1048. const std::vector<uint64_t>& nextSkippedRows) {
  1049. // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
  1050. // groups are selected then marker position is set to the end of range (subset of row groups
  1051. // within stripe).
  1052. uint64_t endRowInStripe = rowsInCurrentStripe;
  1053. uint64_t groupsInStripe = nextSkippedRows.size();
  1054. if (groupsInStripe > 0) {
  1055. auto rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
  1056. if (rg >= groupsInStripe) return 0;
  1057. uint64_t nextSkippedRow = nextSkippedRows[rg];
  1058. if (nextSkippedRow == 0) return 0;
  1059. endRowInStripe = nextSkippedRow;
  1060. }
  1061. return std::min(requestedSize, endRowInStripe - currentRowInStripe);
  1062. }
  1063. uint64_t RowReaderImpl::advanceToNextRowGroup(uint64_t currentRowInStripe,
  1064. uint64_t rowsInCurrentStripe,
  1065. uint64_t rowIndexStride,
  1066. const std::vector<uint64_t>& nextSkippedRows) {
  1067. auto groupsInStripe = nextSkippedRows.size();
  1068. if (groupsInStripe == 0) {
  1069. // No PPD, keeps using the current row in stripe
  1070. return std::min(currentRowInStripe, rowsInCurrentStripe);
  1071. }
  1072. auto rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
  1073. if (rg >= groupsInStripe) {
  1074. // Points to the end of the stripe
  1075. return rowsInCurrentStripe;
  1076. }
  1077. if (nextSkippedRows[rg] != 0) {
  1078. // Current row group is selected
  1079. return currentRowInStripe;
  1080. }
  1081. // Advance to the next selected row group
  1082. while (rg < groupsInStripe && nextSkippedRows[rg] == 0) ++rg;
  1083. if (rg < groupsInStripe) {
  1084. return rg * rowIndexStride;
  1085. }
  1086. return rowsInCurrentStripe;
  1087. }
  1088. static void getColumnIds(const Type* type, std::set<uint64_t>& columnIds) {
  1089. columnIds.insert(type->getColumnId());
  1090. for (uint64_t i = 0; i < type->getSubtypeCount(); ++i) {
  1091. getColumnIds(type->getSubtype(i), columnIds);
  1092. }
  1093. }
  1094. std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch(uint64_t capacity) const {
  1095. // If the read type is specified, then check that the selected schema matches the read type
  1096. // on the first call to createRowBatch.
  1097. if (schemaEvolution.getReadType() && selectedSchema.get() == nullptr) {
  1098. auto fileSchema = &getSelectedType();
  1099. auto readType = schemaEvolution.getReadType();
  1100. std::set<uint64_t> readColumns, fileColumns;
  1101. getColumnIds(readType, readColumns);
  1102. getColumnIds(fileSchema, fileColumns);
  1103. if (readColumns != fileColumns) {
  1104. std::ostringstream ss;
  1105. ss << "The selected schema " << fileSchema->toString() << " doesn't match read type "
  1106. << readType->toString();
  1107. throw SchemaEvolutionError(ss.str());
  1108. }
  1109. }
  1110. const Type& readType =
  1111. schemaEvolution.getReadType() ? *schemaEvolution.getReadType() : getSelectedType();
  1112. return readType.createRowBatch(capacity, *contents->pool, enableEncodedBlock,
  1113. useTightNumericVector);
  1114. }
  1115. void ensureOrcFooter(InputStream* stream, DataBuffer<char>* buffer, uint64_t postscriptLength) {
  1116. const std::string MAGIC("ORC");
  1117. const uint64_t magicLength = MAGIC.length();
  1118. const char* const bufferStart = buffer->data();
  1119. const uint64_t bufferLength = buffer->size();
  1120. if (postscriptLength < magicLength || bufferLength < magicLength) {
  1121. throw ParseError("Invalid ORC postscript length");
  1122. }
  1123. const char* magicStart = bufferStart + bufferLength - 1 - magicLength;
  1124. // Look for the magic string at the end of the postscript.
  1125. if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) {
  1126. // If there is no magic string at the end, check the beginning.
  1127. // Only files written by Hive 0.11.0 don't have the tail ORC string.
  1128. std::unique_ptr<char[]> frontBuffer(new char[magicLength]);
  1129. stream->read(frontBuffer.get(), magicLength, 0);
  1130. bool foundMatch = memcmp(frontBuffer.get(), MAGIC.c_str(), magicLength) == 0;
  1131. if (!foundMatch) {
  1132. throw ParseError("Not an ORC file");
  1133. }
  1134. }
  1135. }
  1136. /**
  1137. * Read the file's postscript from the given buffer.
  1138. * @param stream the file stream
  1139. * @param buffer the buffer with the tail of the file.
  1140. * @param postscriptSize the length of postscript in bytes
  1141. */
  1142. std::unique_ptr<proto::PostScript> readPostscript(InputStream* stream, DataBuffer<char>* buffer,
  1143. uint64_t postscriptSize) {
  1144. char* ptr = buffer->data();
  1145. uint64_t readSize = buffer->size();
  1146. ensureOrcFooter(stream, buffer, postscriptSize);
  1147. auto postscript = std::make_unique<proto::PostScript>();
  1148. if (readSize < 1 + postscriptSize) {
  1149. std::stringstream msg;
  1150. msg << "Invalid ORC postscript length: " << postscriptSize
  1151. << ", file length = " << stream->getLength();
  1152. throw ParseError(msg.str());
  1153. }
  1154. if (!postscript->ParseFromArray(ptr + readSize - 1 - postscriptSize,
  1155. static_cast<int>(postscriptSize))) {
  1156. throw ParseError("Failed to parse the postscript from " + stream->getName());
  1157. }
  1158. return postscript;
  1159. }
  1160. /**
  1161. * Check that proto Types are valid. Indices in the type tree should be valid,
  1162. * so we won't crash when we convert the proto::Types to TypeImpls (ORC-317).
  1163. * For STRUCT types, fieldName size should match subTypes size (ORC-581).
  1164. */
  1165. void checkProtoTypes(const proto::Footer& footer) {
  1166. std::stringstream msg;
  1167. int maxId = footer.types_size();
  1168. if (maxId <= 0) {
  1169. throw ParseError("Footer is corrupt: no types found");
  1170. }
  1171. for (int i = 0; i < maxId; ++i) {
  1172. const proto::Type& type = footer.types(i);
  1173. if (type.kind() == proto::Type_Kind_STRUCT &&
  1174. type.subtypes_size() != type.field_names_size()) {
  1175. msg << "Footer is corrupt: STRUCT type " << i << " has " << type.subtypes_size()
  1176. << " subTypes, but has " << type.field_names_size() << " fieldNames";
  1177. throw ParseError(msg.str());
  1178. }
  1179. for (int j = 0; j < type.subtypes_size(); ++j) {
  1180. int subTypeId = static_cast<int>(type.subtypes(j));
  1181. if (subTypeId <= i) {
  1182. msg << "Footer is corrupt: malformed link from type " << i << " to " << subTypeId;
  1183. throw ParseError(msg.str());
  1184. }
  1185. if (subTypeId >= maxId) {
  1186. msg << "Footer is corrupt: types(" << subTypeId << ") not exists";
  1187. throw ParseError(msg.str());
  1188. }
  1189. if (j > 0 && static_cast<int>(type.subtypes(j - 1)) >= subTypeId) {
  1190. msg << "Footer is corrupt: subType(" << (j - 1) << ") >= subType(" << j << ") in types("
  1191. << i << "). (" << type.subtypes(j - 1) << " >= " << subTypeId << ")";
  1192. throw ParseError(msg.str());
  1193. }
  1194. }
  1195. }
  1196. }
  1197. /**
  1198. * Parse the footer from the given buffer.
  1199. * @param stream the file's stream
  1200. * @param buffer the buffer to parse the footer from
  1201. * @param footerOffset the offset within the buffer that contains the footer
  1202. * @param ps the file's postscript
  1203. * @param memoryPool the memory pool to use
  1204. */
  1205. std::unique_ptr<proto::Footer> readFooter(InputStream* stream, const DataBuffer<char>* buffer,
  1206. uint64_t footerOffset, const proto::PostScript& ps,
  1207. MemoryPool& memoryPool, ReaderMetrics* readerMetrics) {
  1208. const char* footerPtr = buffer->data() + footerOffset;
  1209. std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
  1210. convertCompressionKind(ps),
  1211. std::make_unique<SeekableArrayInputStream>(footerPtr, ps.footer_length()),
  1212. getCompressionBlockSize(ps), memoryPool, readerMetrics);
  1213. auto footer = std::make_unique<proto::Footer>();
  1214. if (!footer->ParseFromZeroCopyStream(pbStream.get())) {
  1215. throw ParseError("Failed to parse the footer from " + stream->getName());
  1216. }
  1217. checkProtoTypes(*footer);
  1218. return footer;
  1219. }
  1220. std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream,
  1221. const ReaderOptions& options) {
  1222. auto contents = std::make_shared<FileContents>();
  1223. contents->pool = options.getMemoryPool();
  1224. contents->errorStream = options.getErrorStream();
  1225. contents->readerMetrics = options.getReaderMetrics();
  1226. std::string serializedFooter = options.getSerializedFileTail();
  1227. uint64_t fileLength;
  1228. uint64_t postscriptLength;
  1229. if (serializedFooter.length() != 0) {
  1230. // Parse the file tail from the serialized one.
  1231. proto::FileTail tail;
  1232. if (!tail.ParseFromString(TProtoStringType(serializedFooter))) {
  1233. throw ParseError("Failed to parse the file tail from string");
  1234. }
  1235. contents->postscript = std::make_unique<proto::PostScript>(tail.postscript());
  1236. contents->footer = std::make_unique<proto::Footer>(tail.footer());
  1237. fileLength = tail.file_length();
  1238. postscriptLength = tail.postscript_length();
  1239. } else {
  1240. // figure out the size of the file using the option or filesystem
  1241. fileLength = std::min(options.getTailLocation(), static_cast<uint64_t>(stream->getLength()));
  1242. // read last bytes into buffer to get PostScript
  1243. uint64_t readSize = std::min(fileLength, DIRECTORY_SIZE_GUESS);
  1244. if (readSize < 4) {
  1245. throw ParseError("File size too small");
  1246. }
  1247. auto buffer = std::make_unique<DataBuffer<char>>(*contents->pool, readSize);
  1248. stream->read(buffer->data(), readSize, fileLength - readSize);
  1249. postscriptLength = buffer->data()[readSize - 1] & 0xff;
  1250. contents->postscript = readPostscript(stream.get(), buffer.get(), postscriptLength);
  1251. uint64_t footerSize = contents->postscript->footer_length();
  1252. uint64_t tailSize = 1 + postscriptLength + footerSize;
  1253. if (tailSize >= fileLength) {
  1254. std::stringstream msg;
  1255. msg << "Invalid ORC tailSize=" << tailSize << ", fileLength=" << fileLength;
  1256. throw ParseError(msg.str());
  1257. }
  1258. uint64_t footerOffset;
  1259. if (tailSize > readSize) {
  1260. buffer->resize(footerSize);
  1261. stream->read(buffer->data(), footerSize, fileLength - tailSize);
  1262. footerOffset = 0;
  1263. } else {
  1264. footerOffset = readSize - tailSize;
  1265. }
  1266. contents->footer = readFooter(stream.get(), buffer.get(), footerOffset, *contents->postscript,
  1267. *contents->pool, contents->readerMetrics);
  1268. }
  1269. contents->isDecimalAsLong = false;
  1270. if (contents->postscript->version_size() == 2) {
  1271. FileVersion v(contents->postscript->version(0), contents->postscript->version(1));
  1272. if (v == FileVersion::UNSTABLE_PRE_2_0()) {
  1273. contents->isDecimalAsLong = true;
  1274. }
  1275. }
  1276. contents->stream = std::move(stream);
  1277. return std::make_unique<ReaderImpl>(std::move(contents), options, fileLength, postscriptLength);
  1278. }
  1279. std::map<uint32_t, BloomFilterIndex> ReaderImpl::getBloomFilters(
  1280. uint32_t stripeIndex, const std::set<uint32_t>& included) const {
  1281. std::map<uint32_t, BloomFilterIndex> ret;
  1282. // find stripe info
  1283. if (stripeIndex >= static_cast<uint32_t>(footer->stripes_size())) {
  1284. throw std::logic_error("Illegal stripe index: " +
  1285. to_string(static_cast<int64_t>(stripeIndex)));
  1286. }
  1287. const proto::StripeInformation currentStripeInfo =
  1288. footer->stripes(static_cast<int>(stripeIndex));
  1289. const proto::StripeFooter currentStripeFooter = getStripeFooter(currentStripeInfo, *contents);
  1290. // iterate stripe footer to get stream of bloom_filter
  1291. uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset());
  1292. for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
  1293. const proto::Stream& stream = currentStripeFooter.streams(i);
  1294. uint32_t column = static_cast<uint32_t>(stream.column());
  1295. uint64_t length = static_cast<uint64_t>(stream.length());
  1296. // a bloom filter stream from a selected column is found
  1297. if (stream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8 &&
  1298. (included.empty() || included.find(column) != included.end())) {
  1299. std::unique_ptr<SeekableInputStream> pbStream =
  1300. createDecompressor(contents->compression,
  1301. std::make_unique<SeekableFileInputStream>(
  1302. contents->stream.get(), offset, length, *contents->pool),
  1303. contents->blockSize, *(contents->pool), contents->readerMetrics);
  1304. proto::BloomFilterIndex pbBFIndex;
  1305. if (!pbBFIndex.ParseFromZeroCopyStream(pbStream.get())) {
  1306. throw ParseError("Failed to parse BloomFilterIndex");
  1307. }
  1308. BloomFilterIndex bfIndex;
  1309. for (int j = 0; j < pbBFIndex.bloom_filter_size(); j++) {
  1310. std::unique_ptr<BloomFilter> entry = BloomFilterUTF8Utils::deserialize(
  1311. stream.kind(), currentStripeFooter.columns(static_cast<int>(stream.column())),
  1312. pbBFIndex.bloom_filter(j));
  1313. bfIndex.entries.push_back(std::shared_ptr<BloomFilter>(std::move(entry)));
  1314. }
  1315. // add bloom filters to result for one column
  1316. ret[column] = bfIndex;
  1317. }
  1318. offset += length;
  1319. }
  1320. return ret;
  1321. }
  1322. RowReader::~RowReader() {
  1323. // PASS
  1324. }
  1325. Reader::~Reader() {
  1326. // PASS
  1327. }
  1328. InputStream::~InputStream(){
  1329. // PASS
  1330. };
  1331. } // namespace orc