sitecustomize.pyx 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. import pathlib
  2. import io
  3. import os
  4. import re
  5. import sys
  6. import warnings
  7. from importlib.metadata import (
  8. Distribution,
  9. DistributionFinder,
  10. PackageNotFoundError,
  11. Prepared,
  12. )
  13. from importlib.resources.abc import Traversable
  14. import __res
  15. with warnings.catch_warnings(action="ignore", category=DeprecationWarning):
  16. from importlib.abc import ResourceReader
  17. ResourceReader.register(__res._ResfsResourceReader)
  18. METADATA_NAME = re.compile("^Name: (.*)$", re.MULTILINE)
  19. class ArcadiaResource(Traversable):
  20. def __init__(self, resfs_key):
  21. self.resfs_key = resfs_key
  22. def is_file(self):
  23. return True
  24. def is_dir(self):
  25. return False
  26. def open(self, mode="r", *args, **kwargs):
  27. data = __res.find(self.resfs_key.encode("utf-8"))
  28. if data is None:
  29. raise FileNotFoundError(self.resfs_key)
  30. stream = io.BytesIO(data)
  31. if "b" not in mode:
  32. stream = io.TextIOWrapper(stream, *args, **kwargs)
  33. return stream
  34. def joinpath(self, *name):
  35. raise RuntimeError("Cannot traverse into a resource")
  36. def iterdir(self):
  37. return iter(())
  38. @property
  39. def name(self):
  40. return os.path.basename(self.resfs_key)
  41. def __repr__(self):
  42. return f"ArcadiaResource({self.resfs_key!r})"
  43. class ArcadiaResourceContainer(Traversable):
  44. def __init__(self, prefix):
  45. self.resfs_prefix = prefix
  46. def is_dir(self):
  47. return True
  48. def is_file(self):
  49. return False
  50. def iterdir(self):
  51. seen = set()
  52. for key, path_without_prefix in __res.iter_keys(
  53. self.resfs_prefix.encode("utf-8")
  54. ):
  55. if b"/" in path_without_prefix:
  56. subdir = path_without_prefix.split(b"/", maxsplit=1)[0].decode("utf-8")
  57. if subdir not in seen:
  58. seen.add(subdir)
  59. yield ArcadiaResourceContainer(f"{self.resfs_prefix}{subdir}/")
  60. else:
  61. yield ArcadiaResource(key.decode("utf-8"))
  62. def open(self, *args, **kwargs):
  63. raise IsADirectoryError(self.resfs_prefix)
  64. @staticmethod
  65. def _flatten(compound_names):
  66. for name in compound_names:
  67. yield from name.split("/")
  68. def joinpath(self, *descendants):
  69. if not descendants:
  70. return self
  71. names = self._flatten(descendants)
  72. target = next(names)
  73. for traversable in self.iterdir():
  74. if traversable.name == target:
  75. if isinstance(traversable, ArcadiaResource):
  76. return traversable
  77. else:
  78. return traversable.joinpath(*names)
  79. raise FileNotFoundError("/".join(self._flatten(descendants)))
  80. @property
  81. def name(self):
  82. return os.path.basename(self.resfs_prefix[:-1])
  83. def __repr__(self):
  84. return f"ArcadiaResourceContainer({self.resfs_prefix!r})"
  85. class ArcadiaDistribution(Distribution):
  86. def __init__(self, prefix):
  87. self._prefix = prefix
  88. self._path = pathlib.Path(prefix)
  89. def read_text(self, filename):
  90. data = __res.resfs_read(f"{self._prefix}{filename}")
  91. if data is not None:
  92. return data.decode("utf-8")
  93. read_text.__doc__ = Distribution.read_text.__doc__
  94. def locate_file(self, path):
  95. return self._path.parent / path
  96. class ArcadiaMetadataFinder(DistributionFinder):
  97. prefixes = {}
  98. @classmethod
  99. def find_distributions(cls, context=DistributionFinder.Context()):
  100. found = cls._search_prefixes(context.name)
  101. return map(ArcadiaDistribution, found)
  102. @classmethod
  103. def _init_prefixes(cls):
  104. cls.prefixes.clear()
  105. for resource in __res.resfs_files():
  106. resource = resource.decode("utf-8")
  107. if not resource.endswith("METADATA"):
  108. continue
  109. data = __res.resfs_read(resource).decode("utf-8")
  110. metadata_name = METADATA_NAME.search(data)
  111. if metadata_name:
  112. metadata_name = Prepared(metadata_name.group(1))
  113. cls.prefixes[metadata_name.normalized] = resource[: -len("METADATA")]
  114. @classmethod
  115. def _search_prefixes(cls, name):
  116. if not cls.prefixes:
  117. cls._init_prefixes()
  118. if name:
  119. try:
  120. yield cls.prefixes[Prepared(name).normalized]
  121. except KeyError:
  122. raise PackageNotFoundError(name)
  123. else:
  124. yield from sorted(cls.prefixes.values())
  125. # monkeypatch standart library
  126. import importlib.metadata
  127. importlib.metadata.MetadataPathFinder = ArcadiaMetadataFinder