parquet.py
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from collections import defaultdict
  18. from concurrent import futures
  19. from functools import partial, reduce
  20. import json
  21. from collections.abc import Collection
  22. import numpy as np
  23. import os
  24. import re
  25. import operator
  26. import urllib.parse
  27. import warnings
  28. import pyarrow as pa
  29. import pyarrow.lib as lib
  30. import pyarrow._parquet as _parquet
  31. from pyarrow._parquet import (ParquetReader, Statistics, # noqa
  32. FileMetaData, RowGroupMetaData,
  33. ColumnChunkMetaData,
  34. ParquetSchema, ColumnSchema)
  35. from pyarrow.fs import (LocalFileSystem, FileSystem,
  36. _resolve_filesystem_and_path, _ensure_filesystem)
  37. from pyarrow import filesystem as legacyfs
  38. from pyarrow.util import guid, _is_path_like, _stringify_path
  39. _URI_STRIP_SCHEMES = ('hdfs',)
  40. def _parse_uri(path):
  41. path = _stringify_path(path)
  42. parsed_uri = urllib.parse.urlparse(path)
  43. if parsed_uri.scheme in _URI_STRIP_SCHEMES:
  44. return parsed_uri.path
  45. else:
  46. # ARROW-4073: On Windows returning the path with the scheme
  47. # stripped removes the drive letter, if any
  48. return path
  49. def _get_filesystem_and_path(passed_filesystem, path):
  50. if passed_filesystem is None:
  51. return legacyfs.resolve_filesystem_and_path(path, passed_filesystem)
  52. else:
  53. passed_filesystem = legacyfs._ensure_filesystem(passed_filesystem)
  54. parsed_path = _parse_uri(path)
  55. return passed_filesystem, parsed_path
  56. def _check_contains_null(val):
  57. if isinstance(val, bytes):
  58. for byte in val:
  59. if isinstance(byte, bytes):
  60. compare_to = chr(0)
  61. else:
  62. compare_to = 0
  63. if byte == compare_to:
  64. return True
  65. elif isinstance(val, str):
  66. return '\x00' in val
  67. return False
  68. def _check_filters(filters, check_null_strings=True):
  69. """
  70. Check if filters are well-formed.
  71. """
  72. if filters is not None:
  73. if len(filters) == 0 or any(len(f) == 0 for f in filters):
  74. raise ValueError("Malformed filters")
  75. if isinstance(filters[0][0], str):
  76. # We have encountered the situation where we have one nesting level
  77. # too few:
  78. # We have [(,,), ..] instead of [[(,,), ..]]
  79. filters = [filters]
  80. if check_null_strings:
  81. for conjunction in filters:
  82. for col, op, val in conjunction:
  83. if (
  84. isinstance(val, list) and
  85. all(_check_contains_null(v) for v in val) or
  86. _check_contains_null(val)
  87. ):
  88. raise NotImplementedError(
  89. "Null-terminated binary strings are not supported "
  90. "as filter values."
  91. )
  92. return filters
  93. _DNF_filter_doc = """Predicates are expressed in disjunctive normal form (DNF), like
  94. ``[[('x', '=', 0), ...], ...]``. DNF allows arbitrary boolean logical
  95. combinations of single column predicates. The innermost tuples each
  96. describe a single column predicate. The list of inner predicates is
97. interpreted as a conjunction (AND), forming a more selective,
98. multi-column predicate. Finally, the outermost list combines these
  99. filters as a disjunction (OR).
  100. Predicates may also be passed as List[Tuple]. This form is interpreted
  101. as a single conjunction. To express OR in predicates, one must
  102. use the (preferred) List[List[Tuple]] notation.
  103. Each tuple has format: (``key``, ``op``, ``value``) and compares the
  104. ``key`` with the ``value``.
  105. The supported ``op`` are: ``=`` or ``==``, ``!=``, ``<``, ``>``, ``<=``,
  106. ``>=``, ``in`` and ``not in``. If the ``op`` is ``in`` or ``not in``, the
  107. ``value`` must be a collection such as a ``list``, a ``set`` or a
  108. ``tuple``.
  109. Examples:
  110. .. code-block:: python
  111. ('x', '=', 0)
  112. ('y', 'in', ['a', 'b', 'c'])
  113. ('z', 'not in', {'a','b'})
  114. """
  115. def _filters_to_expression(filters):
  116. """
117. Check if filters are well-formed and convert them to a dataset
118. ``Expression``. See _DNF_filter_doc above for more details.
  119. """
  120. import pyarrow.dataset as ds
  121. if isinstance(filters, ds.Expression):
  122. return filters
  123. filters = _check_filters(filters, check_null_strings=False)
  124. def convert_single_predicate(col, op, val):
  125. field = ds.field(col)
  126. if op == "=" or op == "==":
  127. return field == val
  128. elif op == "!=":
  129. return field != val
  130. elif op == '<':
  131. return field < val
  132. elif op == '>':
  133. return field > val
  134. elif op == '<=':
  135. return field <= val
  136. elif op == '>=':
  137. return field >= val
  138. elif op == 'in':
  139. return field.isin(val)
  140. elif op == 'not in':
  141. return ~field.isin(val)
  142. else:
  143. raise ValueError(
  144. '"{0}" is not a valid operator in predicates.'.format(
  145. (col, op, val)))
  146. disjunction_members = []
  147. for conjunction in filters:
  148. conjunction_members = [
  149. convert_single_predicate(col, op, val)
  150. for col, op, val in conjunction
  151. ]
  152. disjunction_members.append(reduce(operator.and_, conjunction_members))
  153. return reduce(operator.or_, disjunction_members)
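# Illustrative usage sketch of the conversion above; the column names 'x'
# and 'y' and their values are hypothetical placeholders.
#
#   >>> import pyarrow.parquet as pq
#   >>> expr = pq._filters_to_expression(
#   ...     [[('x', '=', 0), ('y', 'in', ['a', 'b', 'c'])]])
#   >>> # expr is a pyarrow.dataset Expression equivalent to
#   >>> # (ds.field('x') == 0) & ds.field('y').isin(['a', 'b', 'c'])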
  154. # ----------------------------------------------------------------------
  155. # Reading a single Parquet file
  156. class ParquetFile:
  157. """
  158. Reader interface for a single Parquet file.
  159. Parameters
  160. ----------
  161. source : str, pathlib.Path, pyarrow.NativeFile, or file-like object
  162. Readable source. For passing bytes or buffer-like file containing a
  163. Parquet file, use pyarrow.BufferReader.
  164. metadata : FileMetaData, default None
  165. Use existing metadata object, rather than reading from file.
  166. common_metadata : FileMetaData, default None
  167. Will be used in reads for pandas schema metadata if not found in the
168. main file's metadata; it has no other use at the moment.
  169. memory_map : bool, default False
  170. If the source is a file path, use a memory map to read file, which can
  171. improve performance in some environments.
  172. buffer_size : int, default 0
  173. If positive, perform read buffering when deserializing individual
  174. column chunks. Otherwise IO calls are unbuffered.
  175. pre_buffer : bool, default False
  176. Coalesce and issue file reads in parallel to improve performance on
  177. high-latency filesystems (e.g. S3). If True, Arrow will use a
  178. background I/O thread pool.
  179. coerce_int96_timestamp_unit : str, default None.
  180. Cast timestamps that are stored in INT96 format to a particular
  181. resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
182. and therefore INT96 timestamps will be inferred as timestamps
  183. in nanoseconds.
  184. """
  185. def __init__(self, source, metadata=None, common_metadata=None,
  186. read_dictionary=None, memory_map=False, buffer_size=0,
  187. pre_buffer=False, coerce_int96_timestamp_unit=None):
  188. self.reader = ParquetReader()
  189. self.reader.open(
  190. source, use_memory_map=memory_map,
  191. buffer_size=buffer_size, pre_buffer=pre_buffer,
  192. read_dictionary=read_dictionary, metadata=metadata,
  193. coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
  194. )
  195. self.common_metadata = common_metadata
  196. self._nested_paths_by_prefix = self._build_nested_paths()
  197. def _build_nested_paths(self):
  198. paths = self.reader.column_paths
  199. result = defaultdict(list)
  200. for i, path in enumerate(paths):
  201. key = path[0]
  202. rest = path[1:]
  203. while True:
  204. result[key].append(i)
  205. if not rest:
  206. break
  207. key = '.'.join((key, rest[0]))
  208. rest = rest[1:]
  209. return result
  210. @property
  211. def metadata(self):
  212. return self.reader.metadata
  213. @property
  214. def schema(self):
  215. """
  216. Return the Parquet schema, unconverted to Arrow types
  217. """
  218. return self.metadata.schema
  219. @property
  220. def schema_arrow(self):
  221. """
  222. Return the inferred Arrow schema, converted from the whole Parquet
  223. file's schema
  224. """
  225. return self.reader.schema_arrow
  226. @property
  227. def num_row_groups(self):
  228. return self.reader.num_row_groups
  229. def read_row_group(self, i, columns=None, use_threads=True,
  230. use_pandas_metadata=False):
  231. """
  232. Read a single row group from a Parquet file.
  233. Parameters
  234. ----------
  235. columns: list
  236. If not None, only these columns will be read from the row group. A
  237. column name may be a prefix of a nested field, e.g. 'a' will select
  238. 'a.b', 'a.c', and 'a.d.e'.
  239. use_threads : bool, default True
  240. Perform multi-threaded column reads.
  241. use_pandas_metadata : bool, default False
  242. If True and file has custom pandas schema metadata, ensure that
  243. index columns are also loaded.
  244. Returns
  245. -------
  246. pyarrow.table.Table
  247. Content of the row group as a table (of columns)
  248. """
  249. column_indices = self._get_column_indices(
  250. columns, use_pandas_metadata=use_pandas_metadata)
  251. return self.reader.read_row_group(i, column_indices=column_indices,
  252. use_threads=use_threads)
  253. def read_row_groups(self, row_groups, columns=None, use_threads=True,
  254. use_pandas_metadata=False):
  255. """
256. Read multiple row groups from a Parquet file.
  257. Parameters
  258. ----------
  259. row_groups: list
  260. Only these row groups will be read from the file.
  261. columns: list
  262. If not None, only these columns will be read from the row group. A
  263. column name may be a prefix of a nested field, e.g. 'a' will select
  264. 'a.b', 'a.c', and 'a.d.e'.
  265. use_threads : bool, default True
  266. Perform multi-threaded column reads.
  267. use_pandas_metadata : bool, default False
  268. If True and file has custom pandas schema metadata, ensure that
  269. index columns are also loaded.
  270. Returns
  271. -------
  272. pyarrow.table.Table
  273. Content of the row groups as a table (of columns).
  274. """
  275. column_indices = self._get_column_indices(
  276. columns, use_pandas_metadata=use_pandas_metadata)
  277. return self.reader.read_row_groups(row_groups,
  278. column_indices=column_indices,
  279. use_threads=use_threads)
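# Illustrative sketch of the row-group readers above; the path, row-group
# indices and column names are hypothetical.
#
#   >>> pf = pq.ParquetFile('example.parquet')
#   >>> first = pf.read_row_group(0, columns=['a'])        # one row group
#   >>> both = pf.read_row_groups([0, 1], columns=['a'])   # several at once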
  280. def iter_batches(self, batch_size=65536, row_groups=None, columns=None,
  281. use_threads=True, use_pandas_metadata=False):
  282. """
283. Read streaming batches from a Parquet file.
  284. Parameters
  285. ----------
  286. batch_size: int, default 64K
  287. Maximum number of records to yield per batch. Batches may be
  288. smaller if there aren't enough rows in the file.
  289. row_groups: list
  290. Only these row groups will be read from the file.
  291. columns: list
  292. If not None, only these columns will be read from the file. A
  293. column name may be a prefix of a nested field, e.g. 'a' will select
  294. 'a.b', 'a.c', and 'a.d.e'.
  295. use_threads : boolean, default True
  296. Perform multi-threaded column reads.
  297. use_pandas_metadata : boolean, default False
  298. If True and file has custom pandas schema metadata, ensure that
  299. index columns are also loaded.
  300. Returns
  301. -------
  302. iterator of pyarrow.RecordBatch
  303. Contents of each batch as a record batch
  304. """
  305. if row_groups is None:
  306. row_groups = range(0, self.metadata.num_row_groups)
  307. column_indices = self._get_column_indices(
  308. columns, use_pandas_metadata=use_pandas_metadata)
  309. batches = self.reader.iter_batches(batch_size,
  310. row_groups=row_groups,
  311. column_indices=column_indices,
  312. use_threads=use_threads)
  313. return batches
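# Illustrative sketch of streaming reads with iter_batches; the path,
# batch size and handle_batch() callback are hypothetical.
#
#   >>> pf = pq.ParquetFile('example.parquet')
#   >>> for batch in pf.iter_batches(batch_size=1024, columns=['a']):
#   ...     handle_batch(batch)   # each item is a pyarrow.RecordBatch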
  314. def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
  315. """
316. Read a Table from Parquet format.
  317. Parameters
  318. ----------
  319. columns: list
  320. If not None, only these columns will be read from the file. A
  321. column name may be a prefix of a nested field, e.g. 'a' will select
  322. 'a.b', 'a.c', and 'a.d.e'.
  323. use_threads : bool, default True
  324. Perform multi-threaded column reads.
  325. use_pandas_metadata : bool, default False
  326. If True and file has custom pandas schema metadata, ensure that
  327. index columns are also loaded.
  328. Returns
  329. -------
  330. pyarrow.table.Table
  331. Content of the file as a table (of columns).
  332. """
  333. column_indices = self._get_column_indices(
  334. columns, use_pandas_metadata=use_pandas_metadata)
  335. return self.reader.read_all(column_indices=column_indices,
  336. use_threads=use_threads)
  337. def scan_contents(self, columns=None, batch_size=65536):
  338. """
  339. Read contents of file for the given columns and batch size.
  340. Notes
  341. -----
  342. This function's primary purpose is benchmarking.
  343. The scan is executed on a single thread.
  344. Parameters
  345. ----------
  346. columns : list of integers, default None
  347. Select columns to read, if None scan all columns.
  348. batch_size : int, default 64K
  349. Number of rows to read at a time internally.
  350. Returns
  351. -------
  352. num_rows : number of rows in file
  353. """
  354. column_indices = self._get_column_indices(columns)
  355. return self.reader.scan_contents(column_indices,
  356. batch_size=batch_size)
  357. def _get_column_indices(self, column_names, use_pandas_metadata=False):
  358. if column_names is None:
  359. return None
  360. indices = []
  361. for name in column_names:
  362. if name in self._nested_paths_by_prefix:
  363. indices.extend(self._nested_paths_by_prefix[name])
  364. if use_pandas_metadata:
  365. file_keyvalues = self.metadata.metadata
  366. common_keyvalues = (self.common_metadata.metadata
  367. if self.common_metadata is not None
  368. else None)
  369. if file_keyvalues and b'pandas' in file_keyvalues:
  370. index_columns = _get_pandas_index_columns(file_keyvalues)
  371. elif common_keyvalues and b'pandas' in common_keyvalues:
  372. index_columns = _get_pandas_index_columns(common_keyvalues)
  373. else:
  374. index_columns = []
  375. if indices is not None and index_columns:
  376. indices += [self.reader.column_name_idx(descr)
  377. for descr in index_columns
  378. if not isinstance(descr, dict)]
  379. return indices
  380. _SPARK_DISALLOWED_CHARS = re.compile('[ ,;{}()\n\t=]')
  381. def _sanitized_spark_field_name(name):
  382. return _SPARK_DISALLOWED_CHARS.sub('_', name)
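# Worked example of the sanitization above: every character disallowed by
# Spark (spaces, commas, semicolons, braces, parentheses, tabs, newlines,
# '=') is replaced with an underscore.
#
#   >>> _sanitized_spark_field_name('col name,with;chars')
#   'col_name_with_chars'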
  383. def _sanitize_schema(schema, flavor):
  384. if 'spark' in flavor:
  385. sanitized_fields = []
  386. schema_changed = False
  387. for field in schema:
  388. name = field.name
  389. sanitized_name = _sanitized_spark_field_name(name)
  390. if sanitized_name != name:
  391. schema_changed = True
  392. sanitized_field = pa.field(sanitized_name, field.type,
  393. field.nullable, field.metadata)
  394. sanitized_fields.append(sanitized_field)
  395. else:
  396. sanitized_fields.append(field)
  397. new_schema = pa.schema(sanitized_fields, metadata=schema.metadata)
  398. return new_schema, schema_changed
  399. else:
  400. return schema, False
  401. def _sanitize_table(table, new_schema, flavor):
  402. # TODO: This will not handle prohibited characters in nested field names
  403. if 'spark' in flavor:
  404. column_data = [table[i] for i in range(table.num_columns)]
  405. return pa.Table.from_arrays(column_data, schema=new_schema)
  406. else:
  407. return table
  408. _parquet_writer_arg_docs = """version : {"1.0", "2.0"}, default "1.0"
  409. Determine which Parquet logical types are available for use, whether the
  410. reduced set from the Parquet 1.x.x format or the expanded logical types
  411. added in format version 2.0.0 and after. Note that files written with
  412. version='2.0' may not be readable in all Parquet implementations, so
  413. version='1.0' is likely the choice that maximizes file compatibility. Some
  414. features, such as lossless storage of nanosecond timestamps as INT64
  415. physical storage, are only available with version='2.0'. The Parquet 2.0.0
  416. format version also introduced a new serialized data page format; this can
  417. be enabled separately using the data_page_version option.
  418. use_dictionary : bool or list
  419. Specify if we should use dictionary encoding in general or only for
  420. some columns.
  421. use_deprecated_int96_timestamps : bool, default None
  422. Write timestamps to INT96 Parquet format. Defaults to False unless enabled
423. by flavor argument. This takes priority over the coerce_timestamps option.
  424. coerce_timestamps : str, default None
425. Cast timestamps to a particular resolution. The default depends on `version`.
  426. For ``version='1.0'`` (the default), nanoseconds will be cast to
  427. microseconds ('us'), and seconds to milliseconds ('ms') by default. For
  428. ``version='2.0'``, the original resolution is preserved and no casting
  429. is done by default. The casting might result in loss of data, in which
  430. case ``allow_truncated_timestamps=True`` can be used to suppress the
  431. raised exception.
  432. Valid values: {None, 'ms', 'us'}
  433. data_page_size : int, default None
  434. Set a target threshold for the approximate encoded size of data
  435. pages within a column chunk (in bytes). If None, use the default data page
  436. size of 1MByte.
  437. allow_truncated_timestamps : bool, default False
  438. Allow loss of data when coercing timestamps to a particular
  439. resolution. E.g. if microsecond or nanosecond data is lost when coercing to
440. 'ms', do not raise an exception. Passing ``allow_truncated_timestamps=True``
  441. will NOT result in the truncation exception being ignored unless
  442. ``coerce_timestamps`` is not None.
  443. compression : str or dict
  444. Specify the compression codec, either on a general basis or per-column.
  445. Valid values: {'NONE', 'SNAPPY', 'GZIP', 'BROTLI', 'LZ4', 'ZSTD'}.
  446. write_statistics : bool or list
  447. Specify if we should write statistics in general (default is True) or only
  448. for some columns.
  449. flavor : {'spark'}, default None
  450. Sanitize schema or set other compatibility options to work with
  451. various target systems.
  452. filesystem : FileSystem, default None
  453. If nothing passed, will be inferred from `where` if path-like, else
  454. `where` is already a file-like object so no filesystem is needed.
  455. compression_level: int or dict, default None
  456. Specify the compression level for a codec, either on a general basis or
457. per-column. If None is passed, Arrow selects the compression level for
  458. the compression codec in use. The compression level has a different
  459. meaning for each codec, so you have to read the documentation of the
  460. codec you are using.
  461. An exception is thrown if the compression codec does not allow specifying
  462. a compression level.
  463. use_byte_stream_split: bool or list, default False
  464. Specify if the byte_stream_split encoding should be used in general or
465. only for some columns. If both dictionary and byte_stream_split are
  466. enabled, then dictionary is preferred.
  467. The byte_stream_split encoding is valid only for floating-point data types
  468. and should be combined with a compression codec.
  469. data_page_version : {"1.0", "2.0"}, default "1.0"
  470. The serialized Parquet data page format version to write, defaults to
  471. 1.0. This does not impact the file schema logical types and Arrow to
  472. Parquet type casting behavior; for that use the "version" option.
  473. use_compliant_nested_type: bool, default False
  474. Whether to write compliant Parquet nested type (lists) as defined
  475. `here <https://github.com/apache/parquet-format/blob/master/
  476. LogicalTypes.md#nested-types>`_, defaults to ``False``.
  477. For ``use_compliant_nested_type=True``, this will write into a list
  478. with 3-level structure where the middle level, named ``list``,
  479. is a repeated group with a single field named ``element``::
  480. <list-repetition> group <name> (LIST) {
  481. repeated group list {
  482. <element-repetition> <element-type> element;
  483. }
  484. }
  485. For ``use_compliant_nested_type=False``, this will also write into a list
  486. with 3-level structure, where the name of the single field of the middle
  487. level ``list`` is taken from the element name for nested columns in Arrow,
  488. which defaults to ``item``::
  489. <list-repetition> group <name> (LIST) {
  490. repeated group list {
  491. <element-repetition> <element-type> item;
  492. }
  493. }
  494. """
  495. class ParquetWriter:
  496. __doc__ = """
  497. Class for incrementally building a Parquet file for Arrow tables.
  498. Parameters
  499. ----------
  500. where : path or file-like object
  501. schema : arrow Schema
  502. {}
  503. **options : dict
  504. If options contains a key `metadata_collector` then the
  505. corresponding value is assumed to be a list (or any object with
  506. `.append` method) that will be filled with the file metadata instance
  507. of the written file.
  508. """.format(_parquet_writer_arg_docs)
  509. def __init__(self, where, schema, filesystem=None,
  510. flavor=None,
  511. version='1.0',
  512. use_dictionary=True,
  513. compression='snappy',
  514. write_statistics=True,
  515. use_deprecated_int96_timestamps=None,
  516. compression_level=None,
  517. use_byte_stream_split=False,
  518. writer_engine_version=None,
  519. data_page_version='1.0',
  520. use_compliant_nested_type=False,
  521. **options):
  522. if use_deprecated_int96_timestamps is None:
  523. # Use int96 timestamps for Spark
  524. if flavor is not None and 'spark' in flavor:
  525. use_deprecated_int96_timestamps = True
  526. else:
  527. use_deprecated_int96_timestamps = False
  528. self.flavor = flavor
  529. if flavor is not None:
  530. schema, self.schema_changed = _sanitize_schema(schema, flavor)
  531. else:
  532. self.schema_changed = False
  533. self.schema = schema
  534. self.where = where
  535. # If we open a file using a filesystem, store file handle so we can be
  536. # sure to close it when `self.close` is called.
  537. self.file_handle = None
  538. filesystem, path = _resolve_filesystem_and_path(
  539. where, filesystem, allow_legacy_filesystem=True
  540. )
  541. if filesystem is not None:
  542. if isinstance(filesystem, legacyfs.FileSystem):
  543. # legacy filesystem (eg custom subclass)
  544. # TODO deprecate
  545. sink = self.file_handle = filesystem.open(path, 'wb')
  546. else:
  547. # ARROW-10480: do not auto-detect compression. While
  548. # a filename like foo.parquet.gz is nonconforming, it
  549. # shouldn't implicitly apply compression.
  550. sink = self.file_handle = filesystem.open_output_stream(
  551. path, compression=None)
  552. else:
  553. sink = where
  554. self._metadata_collector = options.pop('metadata_collector', None)
  555. engine_version = 'V2'
  556. self.writer = _parquet.ParquetWriter(
  557. sink, schema,
  558. version=version,
  559. compression=compression,
  560. use_dictionary=use_dictionary,
  561. write_statistics=write_statistics,
  562. use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
  563. compression_level=compression_level,
  564. use_byte_stream_split=use_byte_stream_split,
  565. writer_engine_version=engine_version,
  566. data_page_version=data_page_version,
  567. use_compliant_nested_type=use_compliant_nested_type,
  568. **options)
  569. self.is_open = True
  570. def __del__(self):
  571. if getattr(self, 'is_open', False):
  572. self.close()
  573. def __enter__(self):
  574. return self
  575. def __exit__(self, *args, **kwargs):
  576. self.close()
  577. # return false since we want to propagate exceptions
  578. return False
  579. def write_table(self, table, row_group_size=None):
  580. if self.schema_changed:
  581. table = _sanitize_table(table, self.schema, self.flavor)
  582. assert self.is_open
  583. if not table.schema.equals(self.schema, check_metadata=False):
  584. msg = ('Table schema does not match schema used to create file: '
  585. '\ntable:\n{!s} vs. \nfile:\n{!s}'
  586. .format(table.schema, self.schema))
  587. raise ValueError(msg)
  588. self.writer.write_table(table, row_group_size=row_group_size)
  589. def close(self):
  590. if self.is_open:
  591. self.writer.close()
  592. self.is_open = False
  593. if self._metadata_collector is not None:
  594. self._metadata_collector.append(self.writer.metadata)
  595. if self.file_handle is not None:
  596. self.file_handle.close()
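# Illustrative sketch of incremental writing with ParquetWriter; the
# schema, data and output path are hypothetical. The context manager
# protocol (__enter__/__exit__ above) closes the file automatically.
#
#   >>> import pyarrow as pa
#   >>> import pyarrow.parquet as pq
#   >>> schema = pa.schema([('x', pa.int64())])
#   >>> with pq.ParquetWriter('example.parquet', schema) as writer:
#   ...     writer.write_table(pa.table({'x': [1, 2, 3]}, schema=schema))
#   ...     writer.write_table(pa.table({'x': [4, 5]}, schema=schema))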
  597. def _get_pandas_index_columns(keyvalues):
  598. return (json.loads(keyvalues[b'pandas'].decode('utf8'))
  599. ['index_columns'])
  600. # ----------------------------------------------------------------------
  601. # Metadata container providing instructions about reading a single Parquet
  602. # file, possibly part of a partitioned dataset
  603. class ParquetDatasetPiece:
  604. """
  605. DEPRECATED: A single chunk of a potentially larger Parquet dataset to read.
606. The arguments indicate whether to read a single row group or all row
607. groups, and whether to add partition keys to the resulting pyarrow.Table.
  608. .. deprecated:: 5.0
  609. Directly constructing a ``ParquetDatasetPiece`` is deprecated, as well
  610. as accessing the pieces of a ``ParquetDataset`` object. Specify
  611. ``use_legacy_dataset=False`` when constructing the ``ParquetDataset``
  612. and use the ``ParquetDataset.fragments`` attribute instead.
  613. Parameters
  614. ----------
  615. path : str or pathlib.Path
  616. Path to file in the file system where this piece is located.
  617. open_file_func : callable
  618. Function to use for obtaining file handle to dataset piece.
  619. partition_keys : list of tuples
  620. Two-element tuples of ``(column name, ordinal index)``.
  621. row_group : int, default None
  622. Row group to load. By default, reads all row groups.
  623. """
  624. def __init__(self, path, open_file_func=partial(open, mode='rb'),
  625. file_options=None, row_group=None, partition_keys=None):
  626. warnings.warn(
  627. "ParquetDatasetPiece is deprecated as of pyarrow 5.0.0 and will "
  628. "be removed in a future version.",
  629. DeprecationWarning, stacklevel=2)
  630. self._init(
  631. path, open_file_func, file_options, row_group, partition_keys)
  632. @staticmethod
  633. def _create(path, open_file_func=partial(open, mode='rb'),
  634. file_options=None, row_group=None, partition_keys=None):
  635. self = ParquetDatasetPiece.__new__(ParquetDatasetPiece)
  636. self._init(
  637. path, open_file_func, file_options, row_group, partition_keys)
  638. return self
  639. def _init(self, path, open_file_func, file_options, row_group,
  640. partition_keys):
  641. self.path = _stringify_path(path)
  642. self.open_file_func = open_file_func
  643. self.row_group = row_group
  644. self.partition_keys = partition_keys or []
  645. self.file_options = file_options or {}
  646. def __eq__(self, other):
  647. if not isinstance(other, ParquetDatasetPiece):
  648. return False
  649. return (self.path == other.path and
  650. self.row_group == other.row_group and
  651. self.partition_keys == other.partition_keys)
  652. def __repr__(self):
  653. return ('{}({!r}, row_group={!r}, partition_keys={!r})'
  654. .format(type(self).__name__, self.path,
  655. self.row_group,
  656. self.partition_keys))
  657. def __str__(self):
  658. result = ''
  659. if len(self.partition_keys) > 0:
  660. partition_str = ', '.join('{}={}'.format(name, index)
  661. for name, index in self.partition_keys)
  662. result += 'partition[{}] '.format(partition_str)
  663. result += self.path
  664. if self.row_group is not None:
  665. result += ' | row_group={}'.format(self.row_group)
  666. return result
  667. def get_metadata(self):
  668. """
  669. Return the file's metadata.
  670. Returns
  671. -------
  672. metadata : FileMetaData
  673. """
  674. f = self.open()
  675. return f.metadata
  676. def open(self):
  677. """
  678. Return instance of ParquetFile.
  679. """
  680. reader = self.open_file_func(self.path)
  681. if not isinstance(reader, ParquetFile):
  682. reader = ParquetFile(reader, **self.file_options)
  683. return reader
  684. def read(self, columns=None, use_threads=True, partitions=None,
  685. file=None, use_pandas_metadata=False):
  686. """
  687. Read this piece as a pyarrow.Table.
  688. Parameters
  689. ----------
  690. columns : list of column names, default None
  691. use_threads : bool, default True
  692. Perform multi-threaded column reads.
  693. partitions : ParquetPartitions, default None
  694. file : file-like object
  695. Passed to ParquetFile.
  696. Returns
  697. -------
  698. table : pyarrow.Table
  699. """
  700. if self.open_file_func is not None:
  701. reader = self.open()
  702. elif file is not None:
  703. reader = ParquetFile(file, **self.file_options)
  704. else:
  705. # try to read the local path
  706. reader = ParquetFile(self.path, **self.file_options)
  707. options = dict(columns=columns,
  708. use_threads=use_threads,
  709. use_pandas_metadata=use_pandas_metadata)
  710. if self.row_group is not None:
  711. table = reader.read_row_group(self.row_group, **options)
  712. else:
  713. table = reader.read(**options)
  714. if len(self.partition_keys) > 0:
  715. if partitions is None:
  716. raise ValueError('Must pass partition sets')
  717. # Here, the index is the categorical code of the partition where
  718. # this piece is located. Suppose we had
  719. #
  720. # /foo=a/0.parq
  721. # /foo=b/0.parq
  722. # /foo=c/0.parq
  723. #
  724. # Then we assign a=0, b=1, c=2. And the resulting Table pieces will
  725. # have a DictionaryArray column named foo having the constant index
  726. # value as indicated. The distinct categories of the partition have
  727. # been computed in the ParquetManifest
  728. for i, (name, index) in enumerate(self.partition_keys):
  729. # The partition code is the same for all values in this piece
  730. indices = np.full(len(table), index, dtype='i4')
  731. # This is set of all partition values, computed as part of the
  732. # manifest, so ['a', 'b', 'c'] as in our example above.
  733. dictionary = partitions.levels[i].dictionary
  734. arr = pa.DictionaryArray.from_arrays(indices, dictionary)
  735. table = table.append_column(name, arr)
  736. return table
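# Worked sketch of the partition-key materialization in read() above: for
# a piece located under /foo=b/, every row gets the categorical code of
# 'b' and the distinct partition values form the dictionary. The values
# below are hypothetical.
#
#   >>> import numpy as np
#   >>> import pyarrow as pa
#   >>> indices = np.full(4, 1, dtype='i4')       # code 1 == 'b' for all rows
#   >>> dictionary = pa.array(['a', 'b', 'c'])    # levels from ParquetManifest
#   >>> pa.DictionaryArray.from_arrays(indices, dictionary)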
  737. class PartitionSet:
  738. """
  739. A data structure for cataloguing the observed Parquet partitions at a
  740. particular level. So if we have
  741. /foo=a/bar=0
  742. /foo=a/bar=1
  743. /foo=a/bar=2
  744. /foo=b/bar=0
  745. /foo=b/bar=1
  746. /foo=b/bar=2
  747. Then we have two partition sets, one for foo, another for bar. As we visit
  748. levels of the partition hierarchy, a PartitionSet tracks the distinct
  749. values and assigns categorical codes to use when reading the pieces
  750. """
  751. def __init__(self, name, keys=None):
  752. self.name = name
  753. self.keys = keys or []
  754. self.key_indices = {k: i for i, k in enumerate(self.keys)}
  755. self._dictionary = None
  756. def get_index(self, key):
  757. """
  758. Get the index of the partition value if it is known, otherwise assign
  759. one
  760. """
  761. if key in self.key_indices:
  762. return self.key_indices[key]
  763. else:
  764. index = len(self.key_indices)
  765. self.keys.append(key)
  766. self.key_indices[key] = index
  767. return index
  768. @property
  769. def dictionary(self):
  770. if self._dictionary is not None:
  771. return self._dictionary
  772. if len(self.keys) == 0:
  773. raise ValueError('No known partition keys')
  774. # Only integer and string partition types are supported right now
  775. try:
  776. integer_keys = [int(x) for x in self.keys]
  777. dictionary = lib.array(integer_keys)
  778. except ValueError:
  779. dictionary = lib.array(self.keys)
  780. self._dictionary = dictionary
  781. return dictionary
  782. @property
  783. def is_sorted(self):
  784. return list(self.keys) == sorted(self.keys)
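# Worked sketch of PartitionSet code assignment: the first distinct value
# observed gets code 0, the next gets 1, and repeated values reuse their
# existing code. The partition name 'foo' is hypothetical.
#
#   >>> ps = PartitionSet('foo')
#   >>> ps.get_index('a'), ps.get_index('b'), ps.get_index('a')
#   (0, 1, 0)
#   >>> ps.dictionary   # arrow array of the distinct keys: ['a', 'b']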
  785. class ParquetPartitions:
  786. def __init__(self):
  787. self.levels = []
  788. self.partition_names = set()
  789. def __len__(self):
  790. return len(self.levels)
  791. def __getitem__(self, i):
  792. return self.levels[i]
  793. def equals(self, other):
  794. if not isinstance(other, ParquetPartitions):
  795. raise TypeError('`other` must be an instance of ParquetPartitions')
  796. return (self.levels == other.levels and
  797. self.partition_names == other.partition_names)
  798. def __eq__(self, other):
  799. try:
  800. return self.equals(other)
  801. except TypeError:
  802. return NotImplemented
  803. def get_index(self, level, name, key):
  804. """
  805. Record a partition value at a particular level, returning the distinct
  806. code for that value at that level.
  807. Example:
  808. partitions.get_index(1, 'foo', 'a') returns 0
  809. partitions.get_index(1, 'foo', 'b') returns 1
  810. partitions.get_index(1, 'foo', 'c') returns 2
  811. partitions.get_index(1, 'foo', 'a') returns 0
  812. Parameters
  813. ----------
  814. level : int
  815. The nesting level of the partition we are observing
  816. name : str
  817. The partition name
  818. key : str or int
  819. The partition value
  820. """
  821. if level == len(self.levels):
  822. if name in self.partition_names:
  823. raise ValueError('{} was the name of the partition in '
  824. 'another level'.format(name))
  825. part_set = PartitionSet(name)
  826. self.levels.append(part_set)
  827. self.partition_names.add(name)
  828. return self.levels[level].get_index(key)
  829. def filter_accepts_partition(self, part_key, filter, level):
  830. p_column, p_value_index = part_key
  831. f_column, op, f_value = filter
  832. if p_column != f_column:
  833. return True
  834. f_type = type(f_value)
  835. if op in {'in', 'not in'}:
  836. if not isinstance(f_value, Collection):
  837. raise TypeError(
  838. "'%s' object is not a collection", f_type.__name__)
  839. if not f_value:
  840. raise ValueError("Cannot use empty collection as filter value")
  841. if len({type(item) for item in f_value}) != 1:
  842. raise ValueError("All elements of the collection '%s' must be"
  843. " of same type", f_value)
  844. f_type = type(next(iter(f_value)))
  845. elif not isinstance(f_value, str) and isinstance(f_value, Collection):
  846. raise ValueError(
  847. "Op '%s' not supported with a collection value", op)
  848. p_value = f_type(self.levels[level]
  849. .dictionary[p_value_index].as_py())
  850. if op == "=" or op == "==":
  851. return p_value == f_value
  852. elif op == "!=":
  853. return p_value != f_value
  854. elif op == '<':
  855. return p_value < f_value
  856. elif op == '>':
  857. return p_value > f_value
  858. elif op == '<=':
  859. return p_value <= f_value
  860. elif op == '>=':
  861. return p_value >= f_value
  862. elif op == 'in':
  863. return p_value in f_value
  864. elif op == 'not in':
  865. return p_value not in f_value
  866. else:
  867. raise ValueError("'%s' is not a valid operator in predicates.",
  868. filter[1])
  869. class ParquetManifest:
  870. def __init__(self, dirpath, open_file_func=None, filesystem=None,
  871. pathsep='/', partition_scheme='hive', metadata_nthreads=1):
  872. filesystem, dirpath = _get_filesystem_and_path(filesystem, dirpath)
  873. self.filesystem = filesystem
  874. self.open_file_func = open_file_func
  875. self.pathsep = pathsep
  876. self.dirpath = _stringify_path(dirpath)
  877. self.partition_scheme = partition_scheme
  878. self.partitions = ParquetPartitions()
  879. self.pieces = []
  880. self._metadata_nthreads = metadata_nthreads
  881. self._thread_pool = futures.ThreadPoolExecutor(
  882. max_workers=metadata_nthreads)
  883. self.common_metadata_path = None
  884. self.metadata_path = None
  885. self._visit_level(0, self.dirpath, [])
886. # Due to concurrency, pieces will potentially be out of order if the
  887. # dataset is partitioned so we sort them to yield stable results
  888. self.pieces.sort(key=lambda piece: piece.path)
  889. if self.common_metadata_path is None:
  890. # _common_metadata is a subset of _metadata
  891. self.common_metadata_path = self.metadata_path
  892. self._thread_pool.shutdown()
  893. def _visit_level(self, level, base_path, part_keys):
  894. fs = self.filesystem
  895. _, directories, files = next(fs.walk(base_path))
  896. filtered_files = []
  897. for path in files:
  898. full_path = self.pathsep.join((base_path, path))
  899. if path.endswith('_common_metadata'):
  900. self.common_metadata_path = full_path
  901. elif path.endswith('_metadata'):
  902. self.metadata_path = full_path
  903. elif self._should_silently_exclude(path):
  904. continue
  905. else:
  906. filtered_files.append(full_path)
  907. # ARROW-1079: Filter out "private" directories starting with underscore
  908. filtered_directories = [self.pathsep.join((base_path, x))
  909. for x in directories
  910. if not _is_private_directory(x)]
  911. filtered_files.sort()
  912. filtered_directories.sort()
  913. if len(filtered_files) > 0 and len(filtered_directories) > 0:
  914. raise ValueError('Found files in an intermediate '
  915. 'directory: {}'.format(base_path))
  916. elif len(filtered_directories) > 0:
  917. self._visit_directories(level, filtered_directories, part_keys)
  918. else:
  919. self._push_pieces(filtered_files, part_keys)
  920. def _should_silently_exclude(self, file_name):
  921. return (file_name.endswith('.crc') or # Checksums
  922. file_name.endswith('_$folder$') or # HDFS directories in S3
  923. file_name.startswith('.') or # Hidden files starting with .
  924. file_name.startswith('_') or # Hidden files starting with _
  925. file_name in EXCLUDED_PARQUET_PATHS)
  926. def _visit_directories(self, level, directories, part_keys):
  927. futures_list = []
  928. for path in directories:
  929. head, tail = _path_split(path, self.pathsep)
  930. name, key = _parse_hive_partition(tail)
  931. index = self.partitions.get_index(level, name, key)
  932. dir_part_keys = part_keys + [(name, index)]
933. # If you have fewer threads than levels, the wait call will block
  934. # indefinitely due to multiple waits within a thread.
  935. if level < self._metadata_nthreads:
  936. future = self._thread_pool.submit(self._visit_level,
  937. level + 1,
  938. path,
  939. dir_part_keys)
  940. futures_list.append(future)
  941. else:
  942. self._visit_level(level + 1, path, dir_part_keys)
  943. if futures_list:
  944. futures.wait(futures_list)
  945. def _parse_partition(self, dirname):
  946. if self.partition_scheme == 'hive':
  947. return _parse_hive_partition(dirname)
  948. else:
949. raise NotImplementedError('partition scheme: {}'
  950. .format(self.partition_scheme))
  951. def _push_pieces(self, files, part_keys):
  952. self.pieces.extend([
  953. ParquetDatasetPiece._create(path, partition_keys=part_keys,
  954. open_file_func=self.open_file_func)
  955. for path in files
  956. ])
  957. def _parse_hive_partition(value):
  958. if '=' not in value:
  959. raise ValueError('Directory name did not appear to be a '
  960. 'partition: {}'.format(value))
  961. return value.split('=', 1)
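# Worked example of the hive-style directory name parsing above; the value
# is split on the first '=' only.
#
#   >>> _parse_hive_partition('year=2009')
#   ['year', '2009']
#   >>> _parse_hive_partition('key=a=b')
#   ['key', 'a=b']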
  962. def _is_private_directory(x):
  963. _, tail = os.path.split(x)
  964. return (tail.startswith('_') or tail.startswith('.')) and '=' not in tail
  965. def _path_split(path, sep):
  966. i = path.rfind(sep) + 1
  967. head, tail = path[:i], path[i:]
  968. head = head.rstrip(sep)
  969. return head, tail
  970. EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
  971. class _ParquetDatasetMetadata:
  972. __slots__ = ('fs', 'memory_map', 'read_dictionary', 'common_metadata',
  973. 'buffer_size')
  974. def _open_dataset_file(dataset, path, meta=None):
  975. if (dataset.fs is not None and
  976. not isinstance(dataset.fs, legacyfs.LocalFileSystem)):
  977. path = dataset.fs.open(path, mode='rb')
  978. return ParquetFile(
  979. path,
  980. metadata=meta,
  981. memory_map=dataset.memory_map,
  982. read_dictionary=dataset.read_dictionary,
  983. common_metadata=dataset.common_metadata,
  984. buffer_size=dataset.buffer_size
  985. )
  986. _DEPR_MSG = (
  987. "'{}' attribute is deprecated as of pyarrow 5.0.0 and will be removed "
  988. "in a future version.{}"
  989. )
  990. _read_docstring_common = """\
  991. read_dictionary : list, default None
  992. List of names or column paths (for nested types) to read directly
  993. as DictionaryArray. Only supported for BYTE_ARRAY storage. To read
  994. a flat column as dictionary-encoded pass the column name. For
  995. nested types, you must pass the full column "path", which could be
  996. something like level1.level2.list.item. Refer to the Parquet
  997. file's schema to obtain the paths.
  998. memory_map : bool, default False
  999. If the source is a file path, use a memory map to read file, which can
  1000. improve performance in some environments.
  1001. buffer_size : int, default 0
  1002. If positive, perform read buffering when deserializing individual
  1003. column chunks. Otherwise IO calls are unbuffered.
  1004. partitioning : Partitioning or str or list of str, default "hive"
  1005. The partitioning scheme for a partitioned dataset. The default of "hive"
  1006. assumes directory names with key=value pairs like "/year=2009/month=11".
  1007. In addition, a scheme like "/2009/11" is also supported, in which case
  1008. you need to specify the field names or a full schema. See the
  1009. ``pyarrow.dataset.partitioning()`` function for more details."""
  1010. class ParquetDataset:
  1011. __doc__ = """
  1012. Encapsulates details of reading a complete Parquet dataset possibly
  1013. consisting of multiple files and partitions in subdirectories.
  1014. Parameters
  1015. ----------
  1016. path_or_paths : str or List[str]
  1017. A directory name, single file name, or list of file names.
  1018. filesystem : FileSystem, default None
  1019. If nothing passed, paths assumed to be found in the local on-disk
  1020. filesystem.
  1021. metadata : pyarrow.parquet.FileMetaData
  1022. Use metadata obtained elsewhere to validate file schemas.
  1023. schema : pyarrow.parquet.Schema
  1024. Use schema obtained elsewhere to validate file schemas. Alternative to
  1025. metadata parameter.
  1026. split_row_groups : bool, default False
  1027. Divide files into pieces for each row group in the file.
  1028. validate_schema : bool, default True
  1029. Check that individual file schemas are all the same / compatible.
  1030. filters : List[Tuple] or List[List[Tuple]] or None (default)
  1031. Rows which do not match the filter predicate will be removed from scanned
  1032. data. Partition keys embedded in a nested directory structure will be
  1033. exploited to avoid loading files at all if they contain no matching rows.
  1034. If `use_legacy_dataset` is True, filters can only reference partition
  1035. keys and only a hive-style directory structure is supported. When
  1036. setting `use_legacy_dataset` to False, also within-file level filtering
  1037. and different partitioning schemes are supported.
  1038. {1}
  1039. metadata_nthreads: int, default 1
1040. How many threads to use in the thread pool that reads the
1041. dataset metadata. Increasing this is helpful when reading partitioned
1042. datasets.
  1043. {0}
  1044. use_legacy_dataset : bool, default True
  1045. Set to False to enable the new code path (experimental, using the
1046. new Arrow Dataset API). Among other things, this allows passing
  1047. `filters` for all columns and not only the partition keys, enables
  1048. different partitioning schemes, etc.
  1049. pre_buffer : bool, default True
  1050. Coalesce and issue file reads in parallel to improve performance on
  1051. high-latency filesystems (e.g. S3). If True, Arrow will use a
  1052. background I/O thread pool. This option is only supported for
  1053. use_legacy_dataset=False. If using a filesystem layer that itself
  1054. performs readahead (e.g. fsspec's S3FS), disable readahead for best
  1055. results.
  1056. coerce_int96_timestamp_unit : str, default None.
  1057. Cast timestamps that are stored in INT96 format to a particular resolution
  1058. (e.g. 'ms'). Setting to None is equivalent to 'ns' and therefore INT96
1059. timestamps will be inferred as timestamps in nanoseconds.
  1060. """.format(_read_docstring_common, _DNF_filter_doc)
  1061. def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
  1062. metadata=None, split_row_groups=False, validate_schema=True,
  1063. filters=None, metadata_nthreads=1, read_dictionary=None,
  1064. memory_map=False, buffer_size=0, partitioning="hive",
  1065. use_legacy_dataset=None, pre_buffer=True,
  1066. coerce_int96_timestamp_unit=None):
  1067. if use_legacy_dataset is None:
  1068. # if a new filesystem is passed -> default to new implementation
  1069. if isinstance(filesystem, FileSystem):
  1070. use_legacy_dataset = False
  1071. # otherwise the default is still True
  1072. else:
  1073. use_legacy_dataset = True
  1074. if not use_legacy_dataset:
  1075. return _ParquetDatasetV2(
  1076. path_or_paths, filesystem=filesystem,
  1077. filters=filters,
  1078. partitioning=partitioning,
  1079. read_dictionary=read_dictionary,
  1080. memory_map=memory_map,
  1081. buffer_size=buffer_size,
  1082. pre_buffer=pre_buffer,
  1083. coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
  1084. # unsupported keywords
  1085. schema=schema, metadata=metadata,
  1086. split_row_groups=split_row_groups,
  1087. validate_schema=validate_schema,
  1088. metadata_nthreads=metadata_nthreads
  1089. )
  1090. self = object.__new__(cls)
  1091. return self

    def __init__(self, path_or_paths, filesystem=None, schema=None,
                 metadata=None, split_row_groups=False, validate_schema=True,
                 filters=None, metadata_nthreads=1, read_dictionary=None,
                 memory_map=False, buffer_size=0, partitioning="hive",
                 use_legacy_dataset=True, pre_buffer=True,
                 coerce_int96_timestamp_unit=None):
        if partitioning != "hive":
            raise ValueError(
                'Only "hive" for hive-like partitioning is supported when '
                'using use_legacy_dataset=True')
        self._metadata = _ParquetDatasetMetadata()
        a_path = path_or_paths
        if isinstance(a_path, list):
            a_path = a_path[0]

        self._metadata.fs, _ = _get_filesystem_and_path(filesystem, a_path)
        if isinstance(path_or_paths, list):
            self.paths = [_parse_uri(path) for path in path_or_paths]
        else:
            self.paths = _parse_uri(path_or_paths)

        self._metadata.read_dictionary = read_dictionary
        self._metadata.memory_map = memory_map
        self._metadata.buffer_size = buffer_size

        (self._pieces,
         self._partitions,
         self.common_metadata_path,
         self.metadata_path) = _make_manifest(
             path_or_paths, self._fs, metadata_nthreads=metadata_nthreads,
             open_file_func=partial(_open_dataset_file, self._metadata)
        )

        if self.common_metadata_path is not None:
            with self._fs.open(self.common_metadata_path) as f:
                self._metadata.common_metadata = read_metadata(
                    f,
                    memory_map=memory_map
                )
        else:
            self._metadata.common_metadata = None

        if metadata is None and self.metadata_path is not None:
            with self._fs.open(self.metadata_path) as f:
                self.metadata = read_metadata(f, memory_map=memory_map)
        else:
            self.metadata = metadata

        self.schema = schema

        self.split_row_groups = split_row_groups

        if split_row_groups:
            raise NotImplementedError("split_row_groups not yet implemented")

        if filters is not None:
            filters = _check_filters(filters)
            self._filter(filters)

        if validate_schema:
            self.validate_schemas()

    def equals(self, other):
        if not isinstance(other, ParquetDataset):
            raise TypeError('`other` must be an instance of ParquetDataset')

        if self._fs.__class__ != other._fs.__class__:
            return False
        for prop in ('paths', '_pieces', '_partitions',
                     'common_metadata_path', 'metadata_path',
                     'common_metadata', 'metadata', 'schema',
                     'split_row_groups'):
            if getattr(self, prop) != getattr(other, prop):
                return False
        for prop in ('memory_map', 'buffer_size'):
            if (getattr(self._metadata, prop) !=
                    getattr(other._metadata, prop)):
                return False

        return True

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def validate_schemas(self):
        if self.metadata is None and self.schema is None:
            if self.common_metadata is not None:
                self.schema = self.common_metadata.schema
            else:
                self.schema = self._pieces[0].get_metadata().schema
        elif self.schema is None:
            self.schema = self.metadata.schema

        # Verify schemas are all compatible
        dataset_schema = self.schema.to_arrow_schema()
        # Exclude the partition columns from the schema, they are provided
        # by the path, not the DatasetPiece
        if self._partitions is not None:
            for partition_name in self._partitions.partition_names:
                if dataset_schema.get_field_index(partition_name) != -1:
                    field_idx = dataset_schema.get_field_index(partition_name)
                    dataset_schema = dataset_schema.remove(field_idx)

        for piece in self._pieces:
            file_metadata = piece.get_metadata()
            file_schema = file_metadata.schema.to_arrow_schema()
            if not dataset_schema.equals(file_schema, check_metadata=False):
                raise ValueError('Schema in {!s} was different. \n'
                                 '{!s}\n\nvs\n\n{!s}'
                                 .format(piece, file_schema,
                                         dataset_schema))

    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
        """
        Read multiple Parquet files as a single pyarrow.Table.

        Parameters
        ----------
        columns : List[str]
            Names of columns to read from the file.
        use_threads : bool, default True
            Perform multi-threaded column reads
        use_pandas_metadata : bool, default False
            Passed through to each dataset piece.

        Returns
        -------
        pyarrow.Table
            Content of the file as a table (of columns).
        """
        tables = []
        for piece in self._pieces:
            table = piece.read(columns=columns, use_threads=use_threads,
                               partitions=self._partitions,
                               use_pandas_metadata=use_pandas_metadata)
            tables.append(table)

        all_data = lib.concat_tables(tables)

        if use_pandas_metadata:
            # We need to ensure that this metadata is set in the Table's
            # schema so that Table.to_pandas will construct pandas.DataFrame
            # with the right index
            common_metadata = self._get_common_pandas_metadata()
            current_metadata = all_data.schema.metadata or {}

            if common_metadata and b'pandas' not in current_metadata:
                all_data = all_data.replace_schema_metadata({
                    b'pandas': common_metadata})

        return all_data
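
    # A minimal usage sketch for read() above (hypothetical dataset path):
    #
    #   import pyarrow.parquet as pq
    #
    #   dataset = pq.ParquetDataset('/tmp/my_dataset')
    #   table = dataset.read(columns=['a', 'b'], use_threads=True)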

    def read_pandas(self, **kwargs):
        """
        Read dataset including pandas metadata, if any. Other arguments passed
        through to ParquetDataset.read, see docstring for further details.

        Returns
        -------
        pyarrow.Table
            Content of the file as a table (of columns).
        """
        return self.read(use_pandas_metadata=True, **kwargs)

    def _get_common_pandas_metadata(self):
        if self.common_metadata is None:
            return None

        keyvalues = self.common_metadata.metadata
        return keyvalues.get(b'pandas', None)

    def _filter(self, filters):
        accepts_filter = self._partitions.filter_accepts_partition

        def one_filter_accepts(piece, filter):
            return all(accepts_filter(part_key, filter, level)
                       for level, part_key in enumerate(piece.partition_keys))

        def all_filters_accept(piece):
            return any(all(one_filter_accepts(piece, f) for f in conjunction)
                       for conjunction in filters)

        self._pieces = [p for p in self._pieces if all_filters_accept(p)]
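
    # Sketch of the DNF semantics implemented by _filter above: the outer
    # list is an OR of conjunctions, each inner list an AND of predicates
    # over partition keys (values below are illustrative):
    #
    #   filters = [
    #       [('year', '=', 2020), ('month', '>', 6)],  # year=2020 AND month>6
    #       [('year', '=', 2021)],                     # OR year=2021
    #   ]
    #   dataset = ParquetDataset('/tmp/partitioned_dataset', filters=filters)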

    @property
    def pieces(self):
        warnings.warn(
            _DEPR_MSG.format(
                "ParquetDataset.pieces",
                " Specify 'use_legacy_dataset=False' while constructing the "
                "ParquetDataset, and then use the '.fragments' attribute "
                "instead."),
            DeprecationWarning, stacklevel=2)
        return self._pieces

    @property
    def partitions(self):
        warnings.warn(
            _DEPR_MSG.format("ParquetDataset.partitions", ""),
            DeprecationWarning, stacklevel=2)
        return self._partitions

    @property
    def memory_map(self):
        warnings.warn(
            _DEPR_MSG.format("ParquetDataset.memory_map", ""),
            DeprecationWarning, stacklevel=2)
        return self._metadata.memory_map

    @property
    def read_dictionary(self):
        warnings.warn(
            _DEPR_MSG.format("ParquetDataset.read_dictionary", ""),
            DeprecationWarning, stacklevel=2)
        return self._metadata.read_dictionary

    @property
    def buffer_size(self):
        warnings.warn(
            _DEPR_MSG.format("ParquetDataset.buffer_size", ""),
            DeprecationWarning, stacklevel=2)
        return self._metadata.buffer_size

    _fs = property(
        operator.attrgetter('_metadata.fs')
    )

    @property
    def fs(self):
        warnings.warn(
            _DEPR_MSG.format(
                "ParquetDataset.fs",
                " Specify 'use_legacy_dataset=False' while constructing the "
                "ParquetDataset, and then use the '.filesystem' attribute "
                "instead."),
            DeprecationWarning, stacklevel=2)
        return self._metadata.fs

    common_metadata = property(
        operator.attrgetter('_metadata.common_metadata')
    )


def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
                   open_file_func=None):
    partitions = None
    common_metadata_path = None
    metadata_path = None

    if isinstance(path_or_paths, list) and len(path_or_paths) == 1:
        # Dask passes a directory as a list of length 1
        path_or_paths = path_or_paths[0]

    if _is_path_like(path_or_paths) and fs.isdir(path_or_paths):
        manifest = ParquetManifest(path_or_paths, filesystem=fs,
                                   open_file_func=open_file_func,
                                   pathsep=getattr(fs, "pathsep", "/"),
                                   metadata_nthreads=metadata_nthreads)
        common_metadata_path = manifest.common_metadata_path
        metadata_path = manifest.metadata_path
        pieces = manifest.pieces
        partitions = manifest.partitions
    else:
        if not isinstance(path_or_paths, list):
            path_or_paths = [path_or_paths]

        # List of paths
        if len(path_or_paths) == 0:
            raise ValueError('Must pass at least one file path')

        pieces = []
        for path in path_or_paths:
            if not fs.isfile(path):
                raise OSError('Passed non-file path: {}'
                              .format(path))
            piece = ParquetDatasetPiece._create(
                path, open_file_func=open_file_func)
            pieces.append(piece)

    return pieces, partitions, common_metadata_path, metadata_path
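

# A brief sketch of _make_manifest behaviour (hypothetical paths; assumes a
# legacy pyarrow.filesystem LocalFileSystem):
#
#   fs = legacyfs.LocalFileSystem()
#   # directory input -> pieces/partitions discovered via ParquetManifest
#   pieces, parts, common_md, md = _make_manifest('/tmp/dataset', fs)
#   # list-of-files input -> one ParquetDatasetPiece per file, no partitions
#   pieces, parts, _, _ = _make_manifest(
#       ['/tmp/a.parquet', '/tmp/b.parquet'], fs)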


def _is_local_file_system(fs):
    return isinstance(fs, LocalFileSystem) or isinstance(
        fs, legacyfs.LocalFileSystem
    )


class _ParquetDatasetV2:
    """
    ParquetDataset shim using the Dataset API under the hood.
    """

    def __init__(self, path_or_paths, filesystem=None, filters=None,
                 partitioning="hive", read_dictionary=None, buffer_size=None,
                 memory_map=False, ignore_prefixes=None, pre_buffer=True,
                 coerce_int96_timestamp_unit=None, **kwargs):
        import pyarrow.dataset as ds

        # Raise error for not supported keywords
        for keyword, default in [
                ("schema", None), ("metadata", None),
                ("split_row_groups", False), ("validate_schema", True),
                ("metadata_nthreads", 1)]:
            if keyword in kwargs and kwargs[keyword] is not default:
                raise ValueError(
                    "Keyword '{0}' is not yet supported with the new "
                    "Dataset API".format(keyword))

        # map format arguments
        read_options = {
            "pre_buffer": pre_buffer,
            "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit
        }
        if buffer_size:
            read_options.update(use_buffered_stream=True,
                                buffer_size=buffer_size)
        if read_dictionary is not None:
            read_options.update(dictionary_columns=read_dictionary)

        # map filters to Expressions
        self._filters = filters
        self._filter_expression = filters and _filters_to_expression(filters)

        # map old filesystems to new one
        if filesystem is not None:
            filesystem = _ensure_filesystem(
                filesystem, use_mmap=memory_map)
        elif filesystem is None and memory_map:
            # if memory_map is specified, assume local file system (string
            # path can in principle be URI for any filesystem)
            filesystem = LocalFileSystem(use_mmap=memory_map)

        # This needs to be checked after _ensure_filesystem, because that
        # handles the case of an fsspec LocalFileSystem
        if (
            hasattr(path_or_paths, "__fspath__") and
            filesystem is not None and
            not _is_local_file_system(filesystem)
        ):
            raise TypeError(
                "Path-like objects with __fspath__ must only be used with "
                f"local file systems, not {type(filesystem)}"
            )

        # check for single fragment dataset
        single_file = None
        if isinstance(path_or_paths, list):
            if len(path_or_paths) == 1:
                single_file = path_or_paths[0]
        else:
            if _is_path_like(path_or_paths):
                path_or_paths = _stringify_path(path_or_paths)
                if filesystem is None:
                    # path might be a URI describing the FileSystem as well
                    try:
                        filesystem, path_or_paths = FileSystem.from_uri(
                            path_or_paths)
                    except ValueError:
                        filesystem = LocalFileSystem(use_mmap=memory_map)
                if filesystem.get_file_info(path_or_paths).is_file:
                    single_file = path_or_paths
            else:
                single_file = path_or_paths

        if single_file is not None:
            self._enable_parallel_column_conversion = True
            read_options.update(enable_parallel_column_conversion=True)

            parquet_format = ds.ParquetFileFormat(**read_options)
            fragment = parquet_format.make_fragment(single_file, filesystem)

            self._dataset = ds.FileSystemDataset(
                [fragment], schema=fragment.physical_schema,
                format=parquet_format,
                filesystem=fragment.filesystem
            )
            return
        else:
            self._enable_parallel_column_conversion = False

        parquet_format = ds.ParquetFileFormat(**read_options)

        # check partitioning to enable dictionary encoding
        if partitioning == "hive":
            partitioning = ds.HivePartitioning.discover(
                infer_dictionary=True)

        self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
                                   format=parquet_format,
                                   partitioning=partitioning,
                                   ignore_prefixes=ignore_prefixes)
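
    # Sketch of how the keyword mapping above plays out when this shim is
    # created through ParquetDataset(..., use_legacy_dataset=False)
    # (hypothetical path and column names):
    #
    #   dataset = ParquetDataset('/tmp/my_dataset', use_legacy_dataset=False,
    #                            buffer_size=64 * 1024,
    #                            read_dictionary=['key'])
    #   # read_options then roughly corresponds to
    #   #   ds.ParquetFileFormat(pre_buffer=True, use_buffered_stream=True,
    #   #                        buffer_size=64 * 1024,
    #   #                        dictionary_columns=['key'], ...)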

    @property
    def schema(self):
        return self._dataset.schema

    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
        """
        Read (multiple) Parquet files as a single pyarrow.Table.

        Parameters
        ----------
        columns : List[str]
            Names of columns to read from the dataset. The partition fields
            are not automatically included (in contrast to when setting
            ``use_legacy_dataset=True``).
        use_threads : bool, default True
            Perform multi-threaded column reads.
        use_pandas_metadata : bool, default False
            If True and file has custom pandas schema metadata, ensure that
            index columns are also loaded.

        Returns
        -------
        pyarrow.Table
            Content of the file as a table (of columns).
        """
        # if use_pandas_metadata, we need to include index columns in the
        # column selection, to be able to restore those in the pandas
        # DataFrame
        metadata = self.schema.metadata
        if columns is not None and use_pandas_metadata:
            if metadata and b'pandas' in metadata:
                # RangeIndex can be represented as dict instead of column name
                index_columns = [
                    col for col in _get_pandas_index_columns(metadata)
                    if not isinstance(col, dict)
                ]
                columns = (
                    list(columns) + list(set(index_columns) - set(columns))
                )

        if self._enable_parallel_column_conversion:
            if use_threads:
                # Allow per-column parallelism; would otherwise cause
                # contention in the presence of per-file parallelism.
                use_threads = False

        table = self._dataset.to_table(
            columns=columns, filter=self._filter_expression,
            use_threads=use_threads
        )

        # if use_pandas_metadata, restore the pandas metadata (which gets
        # lost if doing a specific `columns` selection in to_table)
        if use_pandas_metadata:
            if metadata and b"pandas" in metadata:
                new_metadata = table.schema.metadata or {}
                new_metadata.update({b"pandas": metadata[b"pandas"]})
                table = table.replace_schema_metadata(new_metadata)

        return table
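
    # Usage sketch for read() above with pandas metadata restoration
    # (hypothetical dataset written from a pandas DataFrame with an index):
    #
    #   dataset = ParquetDataset('/tmp/my_dataset', use_legacy_dataset=False)
    #   table = dataset.read(columns=['a'], use_pandas_metadata=True)
    #   df = table.to_pandas()  # index column(s) restored from the metadata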

    def read_pandas(self, **kwargs):
        """
        Read dataset including pandas metadata, if any. Other arguments passed
        through to ParquetDataset.read, see docstring for further details.
        """
        return self.read(use_pandas_metadata=True, **kwargs)

    @property
    def pieces(self):
        warnings.warn(
            _DEPR_MSG.format("ParquetDataset.pieces",
                             " Use the '.fragments' attribute instead"),
            DeprecationWarning, stacklevel=2)
        return list(self._dataset.get_fragments())

    @property
    def fragments(self):
        return list(self._dataset.get_fragments())

    @property
    def files(self):
        return self._dataset.files

    @property
    def filesystem(self):
        return self._dataset.filesystem


_read_table_docstring = """
{0}

Parameters
----------
source : str, pyarrow.NativeFile, or file-like object
    If a string is passed, it can be a single file name or directory name.
    For file-like objects, only a single file is read. Use
    pyarrow.BufferReader to read a file contained in a bytes or buffer-like
    object.
columns : list
    If not None, only these columns will be read from the file. A column
    name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
    'a.c', and 'a.d.e'.
use_threads : bool, default True
    Perform multi-threaded column reads.
metadata : FileMetaData
    If separately computed
{1}
use_legacy_dataset : bool, default False
    By default, `read_table` uses the new Arrow Datasets API since
    pyarrow 1.0.0. Among other things, this allows passing `filters`
    for all columns and not only the partition keys, enables
    different partitioning schemes, etc.
    Set to True to use the legacy behaviour.
ignore_prefixes : list, optional
    Files matching any of these prefixes will be ignored by the
    discovery process if use_legacy_dataset=False.
    This is matched to the basename of a path.
    By default this is ['.', '_'].
    Note that discovery happens only if a directory is passed as source.
filesystem : FileSystem, default None
    If nothing is passed, paths are assumed to be found in the local on-disk
    filesystem.
filters : List[Tuple] or List[List[Tuple]] or None (default)
    Rows which do not match the filter predicate will be removed from scanned
    data. Partition keys embedded in a nested directory structure will be
    exploited to avoid loading files at all if they contain no matching rows.
    If `use_legacy_dataset` is True, filters can only reference partition
    keys and only a hive-style directory structure is supported. When
    setting `use_legacy_dataset` to False, within-file level filtering
    and different partitioning schemes are also supported.

    {3}
pre_buffer : bool, default True
    Coalesce and issue file reads in parallel to improve performance on
    high-latency filesystems (e.g. S3). If True, Arrow will use a
    background I/O thread pool. This option is only supported for
    use_legacy_dataset=False. If using a filesystem layer that itself
    performs readahead (e.g. fsspec's S3FS), disable readahead for best
    results.

Returns
-------
{2}
"""


def read_table(source, columns=None, use_threads=True, metadata=None,
               use_pandas_metadata=False, memory_map=False,
               read_dictionary=None, filesystem=None, filters=None,
               buffer_size=0, partitioning="hive", use_legacy_dataset=False,
               ignore_prefixes=None, pre_buffer=True,
               coerce_int96_timestamp_unit=None):
    if not use_legacy_dataset:
        if metadata is not None:
            raise ValueError(
                "The 'metadata' keyword is no longer supported with the new "
                "datasets-based implementation. Specify "
                "'use_legacy_dataset=True' to temporarily recover the old "
                "behaviour."
            )
        try:
            dataset = _ParquetDatasetV2(
                source,
                filesystem=filesystem,
                partitioning=partitioning,
                memory_map=memory_map,
                read_dictionary=read_dictionary,
                buffer_size=buffer_size,
                filters=filters,
                ignore_prefixes=ignore_prefixes,
                pre_buffer=pre_buffer,
                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
            )
        except ImportError:
            # fall back on ParquetFile for simple cases when pyarrow.dataset
            # module is not available
            if filters is not None:
                raise ValueError(
                    "the 'filters' keyword is not supported when the "
                    "pyarrow.dataset module is not available"
                )
            if partitioning != "hive":
                raise ValueError(
                    "the 'partitioning' keyword is not supported when the "
                    "pyarrow.dataset module is not available"
                )
            filesystem, path = _resolve_filesystem_and_path(
                source, filesystem)
            if filesystem is not None:
                source = filesystem.open_input_file(path)
            # TODO test that source is not a directory or a list
            dataset = ParquetFile(
                source, metadata=metadata, read_dictionary=read_dictionary,
                memory_map=memory_map, buffer_size=buffer_size,
                pre_buffer=pre_buffer,
                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
            )

        return dataset.read(columns=columns, use_threads=use_threads,
                            use_pandas_metadata=use_pandas_metadata)

    if ignore_prefixes is not None:
        raise ValueError(
            "The 'ignore_prefixes' keyword is only supported when "
            "use_legacy_dataset=False")

    if _is_path_like(source):
        pf = ParquetDataset(
            source, metadata=metadata, memory_map=memory_map,
            read_dictionary=read_dictionary,
            buffer_size=buffer_size,
            filesystem=filesystem, filters=filters,
            partitioning=partitioning,
            coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
        )
    else:
        pf = ParquetFile(
            source, metadata=metadata,
            read_dictionary=read_dictionary,
            memory_map=memory_map,
            buffer_size=buffer_size,
            coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
        )
    return pf.read(columns=columns, use_threads=use_threads,
                   use_pandas_metadata=use_pandas_metadata)


read_table.__doc__ = _read_table_docstring.format(
    """Read a Table from Parquet format

Note: starting with pyarrow 1.0, the default for `use_legacy_dataset` is
switched to False.""",
    "\n".join((_read_docstring_common,
               """use_pandas_metadata : bool, default False
    If True and file has custom pandas schema metadata, ensure that
    index columns are also loaded.""")),
    """pyarrow.Table
    Content of the file as a table (of columns)""",
    _DNF_filter_doc)
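

# A hedged usage sketch for read_table (hypothetical paths and column names):
#
#   import pyarrow.parquet as pq
#
#   # single file, column projection
#   table = pq.read_table('/tmp/data.parquet', columns=['a', 'b'])
#
#   # partitioned directory with row-level filtering (new dataset API)
#   table = pq.read_table('/tmp/my_dataset',
#                         filters=[('year', '=', 2021), ('a', '>', 0)])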


def read_pandas(source, columns=None, **kwargs):
    return read_table(
        source, columns=columns, use_pandas_metadata=True, **kwargs
    )


read_pandas.__doc__ = _read_table_docstring.format(
    'Read a Table from Parquet format, also reading DataFrame\n'
    'index values if known in the file metadata',
    _read_docstring_common,
    """pyarrow.Table
    Content of the file as a Table of Columns, including DataFrame
    indexes as columns""",
    _DNF_filter_doc)


def write_table(table, where, row_group_size=None, version='1.0',
                use_dictionary=True, compression='snappy',
                write_statistics=True,
                use_deprecated_int96_timestamps=None,
                coerce_timestamps=None,
                allow_truncated_timestamps=False,
                data_page_size=None, flavor=None,
                filesystem=None,
                compression_level=None,
                use_byte_stream_split=False,
                data_page_version='1.0',
                use_compliant_nested_type=False,
                **kwargs):
    row_group_size = kwargs.pop('chunk_size', row_group_size)
    use_int96 = use_deprecated_int96_timestamps
    try:
        with ParquetWriter(
                where, table.schema,
                filesystem=filesystem,
                version=version,
                flavor=flavor,
                use_dictionary=use_dictionary,
                write_statistics=write_statistics,
                coerce_timestamps=coerce_timestamps,
                data_page_size=data_page_size,
                allow_truncated_timestamps=allow_truncated_timestamps,
                compression=compression,
                use_deprecated_int96_timestamps=use_int96,
                compression_level=compression_level,
                use_byte_stream_split=use_byte_stream_split,
                data_page_version=data_page_version,
                use_compliant_nested_type=use_compliant_nested_type,
                **kwargs) as writer:
            writer.write_table(table, row_group_size=row_group_size)
    except Exception:
        if _is_path_like(where):
            try:
                os.remove(_stringify_path(where))
            except os.error:
                pass
        raise


write_table.__doc__ = """
Write a Table to Parquet format.

Parameters
----------
table : pyarrow.Table
where : string or pyarrow.NativeFile
row_group_size : int
    The number of rows per rowgroup
{}
""".format(_parquet_writer_arg_docs)
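

# A minimal write_table sketch (hypothetical output path):
#
#   import pyarrow as pa
#   import pyarrow.parquet as pq
#
#   table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
#   pq.write_table(table, '/tmp/data.parquet', compression='snappy',
#                  row_group_size=2)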


def _mkdir_if_not_exists(fs, path):
    if fs._isfilestore() and not fs.exists(path):
        try:
            fs.mkdir(path)
        except OSError:
            assert fs.exists(path)


def write_to_dataset(table, root_path, partition_cols=None,
                     partition_filename_cb=None, filesystem=None,
                     use_legacy_dataset=None, **kwargs):
    """Wrapper around parquet.write_table for writing a Table to
    Parquet format by partitions.
    For each combination of partition columns and values,
    a subdirectory is created in the following
    manner:

    root_dir/
      group1=value1
        group2=value1
          <uuid>.parquet
        group2=value2
          <uuid>.parquet
      group1=valueN
        group2=value1
          <uuid>.parquet
        group2=valueN
          <uuid>.parquet

    Parameters
    ----------
    table : pyarrow.Table
    root_path : str, pathlib.Path
        The root directory of the dataset
    filesystem : FileSystem, default None
        If nothing is passed, paths are assumed to be found in the local
        on-disk filesystem
    partition_cols : list
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given
    partition_filename_cb : callable
        A callback function that takes the partition key(s) as an argument
        and allows you to override the partition filename. If nothing is
        passed, the filename will consist of a uuid.
    use_legacy_dataset : bool
        Default is True unless a ``pyarrow.fs`` filesystem is passed.
        Set to False to enable the new code path (experimental, using the
        new Arrow Dataset API). This is more efficient when using partition
        columns, but does not (yet) support `partition_filename_cb` and
        `metadata_collector` keywords.
    **kwargs : dict
        Additional kwargs for write_table function. See docstring for
        `write_table` or `ParquetWriter` for more information.
        Using `metadata_collector` in kwargs allows one to collect the
        file metadata instances of dataset pieces. The file paths in the
        ColumnChunkMetaData will be set relative to `root_path`.
    """
    if use_legacy_dataset is None:
        # if a new filesystem is passed -> default to new implementation
        if isinstance(filesystem, FileSystem):
            use_legacy_dataset = False
        # otherwise the default is still True
        else:
            use_legacy_dataset = True

    if not use_legacy_dataset:
        import pyarrow.dataset as ds

        # extract non-file format options
        schema = kwargs.pop("schema", None)
        use_threads = kwargs.pop("use_threads", True)

        # raise for unsupported keywords
        msg = (
            "The '{}' argument is not supported with the new dataset "
            "implementation."
        )
        metadata_collector = kwargs.pop('metadata_collector', None)
        file_visitor = None
        if metadata_collector is not None:
            def file_visitor(written_file):
                metadata_collector.append(written_file.metadata)
        if partition_filename_cb is not None:
            raise ValueError(msg.format("partition_filename_cb"))

        # map format arguments
        parquet_format = ds.ParquetFileFormat()
        write_options = parquet_format.make_write_options(**kwargs)

        # map old filesystems to new one
        if filesystem is not None:
            filesystem = _ensure_filesystem(filesystem)

        partitioning = None
        if partition_cols:
            part_schema = table.select(partition_cols).schema
            partitioning = ds.partitioning(part_schema, flavor="hive")

        ds.write_dataset(
            table, root_path, filesystem=filesystem,
            format=parquet_format, file_options=write_options, schema=schema,
            partitioning=partitioning, use_threads=use_threads,
            file_visitor=file_visitor)
        return

    fs, root_path = legacyfs.resolve_filesystem_and_path(
        root_path, filesystem)

    _mkdir_if_not_exists(fs, root_path)

    metadata_collector = kwargs.pop('metadata_collector', None)

    if partition_cols is not None and len(partition_cols) > 0:
        df = table.to_pandas()
        partition_keys = [df[col] for col in partition_cols]
        data_df = df.drop(partition_cols, axis='columns')
        data_cols = df.columns.drop(partition_cols)
        if len(data_cols) == 0:
            raise ValueError('No data left to save outside partition columns')

        subschema = table.schema

        # ARROW-2891: Ensure the output_schema is preserved when writing a
        # partitioned dataset
        for col in table.schema.names:
            if col in partition_cols:
                subschema = subschema.remove(subschema.get_field_index(col))

        for keys, subgroup in data_df.groupby(partition_keys):
            if not isinstance(keys, tuple):
                keys = (keys,)
            subdir = '/'.join(
                ['{colname}={value}'.format(colname=name, value=val)
                 for name, val in zip(partition_cols, keys)])
            subtable = pa.Table.from_pandas(subgroup, schema=subschema,
                                            safe=False)
            _mkdir_if_not_exists(fs, '/'.join([root_path, subdir]))
            if partition_filename_cb:
                outfile = partition_filename_cb(keys)
            else:
                outfile = guid() + '.parquet'
            relative_path = '/'.join([subdir, outfile])
            full_path = '/'.join([root_path, relative_path])
            with fs.open(full_path, 'wb') as f:
                write_table(subtable, f,
                            metadata_collector=metadata_collector, **kwargs)
            if metadata_collector is not None:
                metadata_collector[-1].set_file_path(relative_path)
    else:
        if partition_filename_cb:
            outfile = partition_filename_cb(None)
        else:
            outfile = guid() + '.parquet'
        full_path = '/'.join([root_path, outfile])
        with fs.open(full_path, 'wb') as f:
            write_table(table, f, metadata_collector=metadata_collector,
                        **kwargs)
        if metadata_collector is not None:
            metadata_collector[-1].set_file_path(outfile)
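

# A hedged write_to_dataset sketch (hypothetical table and root path):
#
#   import pyarrow as pa
#   import pyarrow.parquet as pq
#
#   table = pa.table({'year': [2020, 2021], 'value': [1.0, 2.0]})
#   pq.write_to_dataset(table, '/tmp/partitioned_dataset',
#                       partition_cols=['year'])
#   # -> /tmp/partitioned_dataset/year=2020/<uuid>.parquet, etc.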


def write_metadata(schema, where, metadata_collector=None, **kwargs):
    """
    Write metadata-only Parquet file from schema. This can be used with
    `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
    files.

    Parameters
    ----------
    schema : pyarrow.Schema
    where : string or pyarrow.NativeFile
    metadata_collector : list, optional
        A list of FileMetaData objects (e.g. collected via the
        `metadata_collector` keyword of `write_to_dataset`) whose row groups
        are appended to the written metadata file.
    **kwargs : dict
        Additional kwargs for ParquetWriter class. See docstring for
        `ParquetWriter` for more information.

    Examples
    --------
    Write a dataset and collect metadata information.

    >>> metadata_collector = []
    >>> write_to_dataset(
    ...     table, root_path,
    ...     metadata_collector=metadata_collector, **writer_kwargs)

    Write the `_common_metadata` parquet file without row groups statistics.

    >>> write_metadata(
    ...     table.schema, root_path / '_common_metadata', **writer_kwargs)

    Write the `_metadata` parquet file with row groups statistics.

    >>> write_metadata(
    ...     table.schema, root_path / '_metadata',
    ...     metadata_collector=metadata_collector, **writer_kwargs)
    """
    writer = ParquetWriter(where, schema, **kwargs)
    writer.close()

    if metadata_collector is not None:
        # ParquetWriter doesn't expose the metadata until it's written. Write
        # it and read it again.
        metadata = read_metadata(where)
        for m in metadata_collector:
            metadata.append_row_groups(m)
        metadata.write_metadata_file(where)


def read_metadata(where, memory_map=False):
    """
    Read FileMetadata from footer of a single Parquet file.

    Parameters
    ----------
    where : str (filepath) or file-like object
    memory_map : bool, default False
        Create memory map when the source is a file path.

    Returns
    -------
    metadata : FileMetadata
    """
    return ParquetFile(where, memory_map=memory_map).metadata


def read_schema(where, memory_map=False):
    """
    Read effective Arrow schema from Parquet file metadata.

    Parameters
    ----------
    where : str (filepath) or file-like object
    memory_map : bool, default False
        Create memory map when the source is a file path.

    Returns
    -------
    schema : pyarrow.Schema
    """
    return ParquetFile(where, memory_map=memory_map).schema.to_arrow_schema()
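

# A short sketch combining read_metadata and read_schema
# (hypothetical file path):
#
#   import pyarrow.parquet as pq
#
#   meta = pq.read_metadata('/tmp/data.parquet')
#   print(meta.num_rows, meta.num_row_groups)
#   schema = pq.read_schema('/tmp/data.parquet')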