sitecustomize.pyx 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. import pathlib
  2. import io
  3. import os
  4. import re
  5. import sys
  6. from importlib.metadata import (
  7. Distribution,
  8. DistributionFinder,
  9. Prepared,
  10. )
  11. from importlib.resources.abc import Traversable
  12. import __res
  13. METADATA_NAME = re.compile("^Name: (.*)$", re.MULTILINE)
  14. class ArcadiaTraversable(Traversable):
  15. def __init__(self, resfs):
  16. self._resfs = resfs
  17. self._path = pathlib.Path(resfs)
  18. def __eq__(self, other) -> bool:
  19. if isinstance(other, ArcadiaTraversable):
  20. return self._path == other._path
  21. raise NotImplementedError
  22. def __lt__(self, other) -> bool:
  23. if isinstance(other, ArcadiaTraversable):
  24. return self._path < other._path
  25. raise NotImplementedError
  26. def __hash__(self) -> int:
  27. return hash(self._path)
  28. @property
  29. def name(self):
  30. return self._path.name
  31. @property
  32. def suffix(self):
  33. return self._path.suffix
  34. class ArcadiaResource(ArcadiaTraversable):
  35. def is_file(self):
  36. return True
  37. def is_dir(self):
  38. return False
  39. def open(self, mode="r", *args, **kwargs):
  40. data = __res.find(self._resfs.encode("utf-8"))
  41. if data is None:
  42. raise FileNotFoundError(self._resfs)
  43. stream = io.BytesIO(data)
  44. if "b" not in mode:
  45. stream = io.TextIOWrapper(stream, *args, **kwargs)
  46. return stream
  47. def joinpath(self, *name):
  48. raise RuntimeError("Cannot traverse into a resource")
  49. def iterdir(self):
  50. return iter(())
  51. def __repr__(self) -> str:
  52. return f"ArcadiaResource({self._resfs!r})"
  53. class ArcadiaResourceContainer(ArcadiaTraversable):
  54. def is_dir(self):
  55. return True
  56. def is_file(self):
  57. return False
  58. def iterdir(self):
  59. seen = set()
  60. for key, path_without_prefix in __res.iter_keys(self._resfs.encode("utf-8")):
  61. if b"/" in path_without_prefix:
  62. subdir = path_without_prefix.split(b"/", maxsplit=1)[0].decode("utf-8")
  63. if subdir not in seen:
  64. seen.add(subdir)
  65. yield ArcadiaResourceContainer(f"{self._resfs}{subdir}/")
  66. else:
  67. yield ArcadiaResource(key.decode("utf-8"))
  68. def open(self, *args, **kwargs):
  69. raise IsADirectoryError(self._resfs)
  70. @staticmethod
  71. def _flatten(compound_names):
  72. for name in compound_names:
  73. yield from name.split("/")
  74. def joinpath(self, *descendants):
  75. if not descendants:
  76. return self
  77. names = self._flatten(descendants)
  78. target = next(names)
  79. for traversable in self.iterdir():
  80. if traversable.name == target:
  81. if isinstance(traversable, ArcadiaResource):
  82. return traversable
  83. else:
  84. return traversable.joinpath(*names)
  85. raise FileNotFoundError("/".join(self._flatten(descendants)))
  86. def __repr__(self):
  87. return f"ArcadiaResourceContainer({self._resfs!r})"
  88. class ArcadiaDistribution(Distribution):
  89. def __init__(self, prefix):
  90. self._prefix = prefix
  91. self._path = pathlib.Path(prefix)
  92. def read_text(self, filename):
  93. data = __res.resfs_read(f"{self._prefix}{filename}")
  94. if data is not None:
  95. return data.decode("utf-8")
  96. read_text.__doc__ = Distribution.read_text.__doc__
  97. def locate_file(self, path):
  98. return self._path.parent / path
  99. class MetadataArcadiaFinder(DistributionFinder):
  100. prefixes = {}
  101. @classmethod
  102. def find_distributions(cls, context=DistributionFinder.Context()):
  103. found = cls._search_prefixes(context.name)
  104. return map(ArcadiaDistribution, found)
  105. @classmethod
  106. def _init_prefixes(cls):
  107. cls.prefixes.clear()
  108. for resource in __res.resfs_files():
  109. resource = resource.decode("utf-8")
  110. if not resource.endswith("METADATA"):
  111. continue
  112. data = __res.resfs_read(resource).decode("utf-8")
  113. metadata_name = METADATA_NAME.search(data)
  114. if metadata_name:
  115. cls.prefixes[Prepared(metadata_name.group(1)).normalized] = resource.removesuffix("METADATA")
  116. @classmethod
  117. def _search_prefixes(cls, name):
  118. if not cls.prefixes:
  119. cls._init_prefixes()
  120. if name:
  121. try:
  122. yield cls.prefixes[Prepared(name).normalized]
  123. except KeyError:
  124. pass
  125. else:
  126. yield from sorted(cls.prefixes.values())