sitecustomize.pyx 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. import pathlib
  2. import io
  3. import os
  4. import re
  5. import sys
  6. import warnings
  7. from importlib.metadata import (
  8. Distribution,
  9. DistributionFinder,
  10. Prepared,
  11. )
  12. from importlib.resources.abc import Traversable
  13. import __res
  14. with warnings.catch_warnings(action="ignore", category=DeprecationWarning):
  15. from importlib.abc import ResourceReader
  16. ResourceReader.register(__res._ResfsResourceReader)
  17. METADATA_NAME = re.compile("^Name: (.*)$", re.MULTILINE)
  18. class ArcadiaTraversable(Traversable):
  19. def __init__(self, resfs):
  20. self._resfs = resfs
  21. self._path = pathlib.Path(resfs)
  22. def __eq__(self, other) -> bool:
  23. if isinstance(other, ArcadiaTraversable):
  24. return self._path == other._path
  25. raise NotImplementedError
  26. def __lt__(self, other) -> bool:
  27. if isinstance(other, ArcadiaTraversable):
  28. return self._path < other._path
  29. raise NotImplementedError
  30. def __hash__(self) -> int:
  31. return hash(self._path)
  32. @property
  33. def name(self):
  34. return self._path.name
  35. @property
  36. def suffix(self):
  37. return self._path.suffix
  38. class ArcadiaResource(ArcadiaTraversable):
  39. def is_file(self):
  40. return True
  41. def is_dir(self):
  42. return False
  43. def open(self, mode="r", *args, **kwargs):
  44. data = __res.find(self._resfs.encode("utf-8"))
  45. if data is None:
  46. raise FileNotFoundError(self._resfs)
  47. stream = io.BytesIO(data)
  48. if "b" not in mode:
  49. stream = io.TextIOWrapper(stream, *args, **kwargs)
  50. return stream
  51. def joinpath(self, *name):
  52. raise RuntimeError("Cannot traverse into a resource")
  53. def iterdir(self):
  54. return iter(())
  55. def __repr__(self) -> str:
  56. return f"ArcadiaResource({self._resfs!r})"
  57. class ArcadiaResourceContainer(ArcadiaTraversable):
  58. def is_dir(self):
  59. return True
  60. def is_file(self):
  61. return False
  62. def iterdir(self):
  63. seen = set()
  64. for key, path_without_prefix in __res.iter_keys(self._resfs.encode("utf-8")):
  65. if b"/" in path_without_prefix:
  66. subdir = path_without_prefix.split(b"/", maxsplit=1)[0].decode("utf-8")
  67. if subdir not in seen:
  68. seen.add(subdir)
  69. yield ArcadiaResourceContainer(f"{self._resfs}{subdir}/")
  70. else:
  71. yield ArcadiaResource(key.decode("utf-8"))
  72. def open(self, *args, **kwargs):
  73. raise IsADirectoryError(self._resfs)
  74. @staticmethod
  75. def _flatten(compound_names):
  76. for name in compound_names:
  77. yield from name.split("/")
  78. def joinpath(self, *descendants):
  79. if not descendants:
  80. return self
  81. names = self._flatten(descendants)
  82. target = next(names)
  83. for traversable in self.iterdir():
  84. if traversable.name == target:
  85. if isinstance(traversable, ArcadiaResource):
  86. return traversable
  87. else:
  88. return traversable.joinpath(*names)
  89. raise FileNotFoundError("/".join(self._flatten(descendants)))
  90. def __repr__(self):
  91. return f"ArcadiaResourceContainer({self._resfs!r})"
  92. class ArcadiaDistribution(Distribution):
  93. def __init__(self, prefix):
  94. self._prefix = prefix
  95. self._path = pathlib.Path(prefix)
  96. def read_text(self, filename):
  97. data = __res.resfs_read(f"{self._prefix}{filename}")
  98. if data is not None:
  99. return data.decode("utf-8")
  100. read_text.__doc__ = Distribution.read_text.__doc__
  101. def locate_file(self, path):
  102. return self._path.parent / path
  103. class MetadataArcadiaFinder(DistributionFinder):
  104. prefixes = {}
  105. @classmethod
  106. def find_distributions(cls, context=DistributionFinder.Context()):
  107. found = cls._search_prefixes(context.name)
  108. return map(ArcadiaDistribution, found)
  109. @classmethod
  110. def _init_prefixes(cls):
  111. cls.prefixes.clear()
  112. for resource in __res.resfs_files():
  113. resource = resource.decode("utf-8")
  114. if not resource.endswith("METADATA"):
  115. continue
  116. data = __res.resfs_read(resource).decode("utf-8")
  117. metadata_name = METADATA_NAME.search(data)
  118. if metadata_name:
  119. cls.prefixes[Prepared(metadata_name.group(1)).normalized] = resource.removesuffix("METADATA")
  120. @classmethod
  121. def _search_prefixes(cls, name):
  122. if not cls.prefixes:
  123. cls._init_prefixes()
  124. if name:
  125. try:
  126. yield cls.prefixes[Prepared(name).normalized]
  127. except KeyError:
  128. pass
  129. else:
  130. yield from sorted(cls.prefixes.values())