sitecustomize.pyx 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import pathlib
  2. import io
  3. import os
  4. import re
  5. import sys
  6. from importlib.metadata import (
  7. Distribution,
  8. DistributionFinder,
  9. Prepared,
  10. )
  11. from importlib.resources.abc import Traversable
  12. import __res
  13. METADATA_NAME = re.compile("^Name: (.*)$", re.MULTILINE)
  14. class ArcadiaTraversable(Traversable):
  15. def __init__(self, resfs):
  16. self._resfs = resfs
  17. self._path = pathlib.Path(resfs)
  18. def __eq__(self, other) -> bool:
  19. if isinstance(other, ArcadiaTraversable):
  20. return self._path == other._path
  21. raise NotImplementedError
  22. def __lt__(self, other) -> bool:
  23. if isinstance(other, ArcadiaTraversable):
  24. return self._path < other._path
  25. raise NotImplementedError
  26. def __hash__(self) -> int:
  27. return hash(self._path)
  28. @property
  29. def name(self):
  30. return self._path.name
  31. @property
  32. def suffix(self):
  33. return self._path.suffix
  34. @property
  35. def stem(self):
  36. return self._path.stem
  37. class ArcadiaResource(ArcadiaTraversable):
  38. def is_file(self):
  39. return True
  40. def is_dir(self):
  41. return False
  42. def open(self, mode="r", *args, **kwargs):
  43. data = __res.find(self._resfs.encode("utf-8"))
  44. if data is None:
  45. raise FileNotFoundError(self._resfs)
  46. stream = io.BytesIO(data)
  47. if "b" not in mode:
  48. stream = io.TextIOWrapper(stream, *args, **kwargs)
  49. return stream
  50. def joinpath(self, *name):
  51. raise RuntimeError("Cannot traverse into a resource")
  52. def iterdir(self):
  53. return iter(())
  54. def __repr__(self) -> str:
  55. return f"ArcadiaResource({self._resfs!r})"
  56. class ArcadiaResourceContainer(ArcadiaTraversable):
  57. def is_dir(self):
  58. return True
  59. def is_file(self):
  60. return False
  61. def iterdir(self):
  62. seen = set()
  63. for key, path_without_prefix in __res.iter_keys(self._resfs.encode("utf-8")):
  64. if b"/" in path_without_prefix:
  65. subdir = path_without_prefix.split(b"/", maxsplit=1)[0].decode("utf-8")
  66. if subdir not in seen:
  67. seen.add(subdir)
  68. yield ArcadiaResourceContainer(f"{self._resfs}{subdir}/")
  69. else:
  70. yield ArcadiaResource(key.decode("utf-8"))
  71. def open(self, *args, **kwargs):
  72. raise IsADirectoryError(self._resfs)
  73. @staticmethod
  74. def _flatten(compound_names):
  75. for name in compound_names:
  76. yield from name.split("/")
  77. def joinpath(self, *descendants):
  78. if not descendants:
  79. return self
  80. names = self._flatten(descendants)
  81. target = next(names)
  82. for traversable in self.iterdir():
  83. if traversable.name == target:
  84. if isinstance(traversable, ArcadiaResource):
  85. return traversable
  86. else:
  87. return traversable.joinpath(*names)
  88. raise FileNotFoundError("/".join(self._flatten(descendants)))
  89. def __repr__(self):
  90. return f"ArcadiaResourceContainer({self._resfs!r})"
  91. class ArcadiaDistribution(Distribution):
  92. def __init__(self, prefix):
  93. self._prefix = prefix
  94. self._path = pathlib.Path(prefix)
  95. def read_text(self, filename):
  96. data = __res.resfs_read(f"{self._prefix}{filename}")
  97. if data is not None:
  98. return data.decode("utf-8")
  99. read_text.__doc__ = Distribution.read_text.__doc__
  100. def locate_file(self, path):
  101. return self._path.parent / path
  102. class MetadataArcadiaFinder(DistributionFinder):
  103. prefixes = {}
  104. @classmethod
  105. def find_distributions(cls, context=DistributionFinder.Context()):
  106. found = cls._search_prefixes(context.name)
  107. return map(ArcadiaDistribution, found)
  108. @classmethod
  109. def _init_prefixes(cls):
  110. cls.prefixes.clear()
  111. for resource in __res.resfs_files():
  112. resource = resource.decode("utf-8")
  113. if not resource.endswith("METADATA"):
  114. continue
  115. data = __res.resfs_read(resource).decode("utf-8")
  116. metadata_name = METADATA_NAME.search(data)
  117. if metadata_name:
  118. cls.prefixes[Prepared(metadata_name.group(1)).normalized] = resource.removesuffix("METADATA")
  119. @classmethod
  120. def _search_prefixes(cls, name):
  121. if not cls.prefixes:
  122. cls._init_prefixes()
  123. if name:
  124. try:
  125. yield cls.prefixes[Prepared(name).normalized]
  126. except KeyError:
  127. pass
  128. else:
  129. yield from sorted(cls.prefixes.values())