readers.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. import collections
  2. import itertools
  3. import pathlib
  4. import operator
  5. import zipfile
  6. from . import abc
  7. from ._itertools import only
  8. def remove_duplicates(items):
  9. return iter(collections.OrderedDict.fromkeys(items))
  10. class FileReader(abc.TraversableResources):
  11. def __init__(self, loader):
  12. self.path = pathlib.Path(loader.path).parent
  13. def resource_path(self, resource):
  14. """
  15. Return the file system path to prevent
  16. `resources.path()` from creating a temporary
  17. copy.
  18. """
  19. return str(self.path.joinpath(resource))
  20. def files(self):
  21. return self.path
  22. class ZipReader(abc.TraversableResources):
  23. def __init__(self, loader, module):
  24. self.prefix = loader.prefix.replace('\\', '/')
  25. if loader.is_package(module):
  26. _, _, name = module.rpartition('.')
  27. self.prefix += name + '/'
  28. self.archive = loader.archive
  29. def open_resource(self, resource):
  30. try:
  31. return super().open_resource(resource)
  32. except KeyError as exc:
  33. raise FileNotFoundError(exc.args[0])
  34. def is_resource(self, path):
  35. """
  36. Workaround for `zipfile.Path.is_file` returning true
  37. for non-existent paths.
  38. """
  39. target = self.files().joinpath(path)
  40. return target.is_file() and target.exists()
  41. def files(self):
  42. return zipfile.Path(self.archive, self.prefix)
  43. class MultiplexedPath(abc.Traversable):
  44. """
  45. Given a series of Traversable objects, implement a merged
  46. version of the interface across all objects. Useful for
  47. namespace packages which may be multihomed at a single
  48. name.
  49. """
  50. def __init__(self, *paths):
  51. self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
  52. if not self._paths:
  53. message = 'MultiplexedPath must contain at least one path'
  54. raise FileNotFoundError(message)
  55. if not all(path.is_dir() for path in self._paths):
  56. raise NotADirectoryError('MultiplexedPath only supports directories')
  57. def iterdir(self):
  58. children = (child for path in self._paths for child in path.iterdir())
  59. by_name = operator.attrgetter('name')
  60. groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
  61. return map(self._follow, (locs for name, locs in groups))
  62. def read_bytes(self):
  63. raise FileNotFoundError(f'{self} is not a file')
  64. def read_text(self, *args, **kwargs):
  65. raise FileNotFoundError(f'{self} is not a file')
  66. def is_dir(self):
  67. return True
  68. def is_file(self):
  69. return False
  70. def joinpath(self, *descendants):
  71. try:
  72. return super().joinpath(*descendants)
  73. except abc.TraversalError:
  74. # One of the paths did not resolve (a directory does not exist).
  75. # Just return something that will not exist.
  76. return self._paths[0].joinpath(*descendants)
  77. @classmethod
  78. def _follow(cls, children):
  79. """
  80. Construct a MultiplexedPath if needed.
  81. If children contains a sole element, return it.
  82. Otherwise, return a MultiplexedPath of the items.
  83. Unless one of the items is not a Directory, then return the first.
  84. """
  85. subdirs, one_dir, one_file = itertools.tee(children, 3)
  86. try:
  87. return only(one_dir)
  88. except ValueError:
  89. try:
  90. return cls(*subdirs)
  91. except NotADirectoryError:
  92. return next(one_file)
  93. def open(self, *args, **kwargs):
  94. raise FileNotFoundError(f'{self} is not a file')
  95. @property
  96. def name(self):
  97. return self._paths[0].name
  98. def __repr__(self):
  99. paths = ', '.join(f"'{path}'" for path in self._paths)
  100. return f'MultiplexedPath({paths})'
  101. class NamespaceReader(abc.TraversableResources):
  102. def __init__(self, namespace_path):
  103. if 'NamespacePath' not in str(namespace_path):
  104. raise ValueError('Invalid path')
  105. self.path = MultiplexedPath(*list(namespace_path))
  106. def resource_path(self, resource):
  107. """
  108. Return the file system path to prevent
  109. `resources.path()` from creating a temporary
  110. copy.
  111. """
  112. return str(self.path.joinpath(resource))
  113. def files(self):
  114. return self.path