sitecustomize.pyx 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import pathlib
  2. import io
  3. import os
  4. import re
  5. import sys
  6. import __res
  7. from importlib.abc import ResourceReader
  8. from importlib.metadata import Distribution, DistributionFinder, PackageNotFoundError, Prepared
  9. from importlib.resources.abc import Traversable
  10. ResourceReader.register(__res._ResfsResourceReader)
  11. METADATA_NAME = re.compile('^Name: (.*)$', re.MULTILINE)
  12. class ArcadiaResourceHandle(Traversable):
  13. def __init__(self, key):
  14. self.resfs_key = key
  15. def is_file(self):
  16. return True
  17. def is_dir(self):
  18. return False
  19. def open(self, mode='r', *args, **kwargs):
  20. data = __res.find(self.resfs_key.encode("utf-8"))
  21. if data is None:
  22. raise FileNotFoundError(self.resfs_key)
  23. stream = io.BytesIO(data)
  24. if 'b' not in mode:
  25. stream = io.TextIOWrapper(stream, *args, **kwargs)
  26. return stream
  27. def joinpath(self, *name):
  28. raise RuntimeError("Cannot traverse into a resource")
  29. def iterdir(self):
  30. return iter(())
  31. @property
  32. def name(self):
  33. return os.path.basename(self.resfs_key)
  34. class ArcadiaResourceContainer(Traversable):
  35. def __init__(self, prefix):
  36. self.resfs_prefix = prefix
  37. def is_dir(self):
  38. return True
  39. def is_file(self):
  40. return False
  41. def iterdir(self):
  42. for key, path_without_prefix in __res.iter_keys(self.resfs_prefix.encode("utf-8")):
  43. if b"/" in path_without_prefix:
  44. name = path_without_prefix.decode("utf-8").split("/", maxsplit=1)[0]
  45. yield ArcadiaResourceContainer(f"{self.resfs_prefix}{name}/")
  46. else:
  47. yield ArcadiaResourceHandle(key.decode("utf-8"))
  48. def open(self, *args, **kwargs):
  49. raise IsADirectoryError(self.resfs_prefix)
  50. def joinpath(self, *descendants):
  51. if not descendants:
  52. return self
  53. return ArcadiaResourceHandle(os.path.join(self.resfs_prefix, *descendants))
  54. @property
  55. def name(self):
  56. return os.path.basename(self.resfs_prefix[:-1])
  57. class ArcadiaDistribution(Distribution):
  58. def __init__(self, prefix):
  59. self.prefix = prefix
  60. @property
  61. def _path(self):
  62. return pathlib.Path(self.prefix)
  63. def read_text(self, filename):
  64. data = __res.resfs_read(f'{self.prefix}{filename}')
  65. if data:
  66. return data.decode('utf-8')
  67. read_text.__doc__ = Distribution.read_text.__doc__
  68. def locate_file(self, path):
  69. return f'{self.prefix}{path}'
  70. class ArcadiaMetadataFinder(DistributionFinder):
  71. prefixes = {}
  72. @classmethod
  73. def find_distributions(cls, context=DistributionFinder.Context()):
  74. found = cls._search_prefixes(context.name)
  75. return map(ArcadiaDistribution, found)
  76. @classmethod
  77. def _init_prefixes(cls):
  78. cls.prefixes.clear()
  79. for resource in __res.resfs_files():
  80. resource = resource.decode('utf-8')
  81. if not resource.endswith('METADATA'):
  82. continue
  83. data = __res.resfs_read(resource).decode('utf-8')
  84. metadata_name = METADATA_NAME.search(data)
  85. if metadata_name:
  86. metadata_name = Prepared(metadata_name.group(1))
  87. cls.prefixes[metadata_name.normalized] = resource[:-len('METADATA')]
  88. @classmethod
  89. def _search_prefixes(cls, name):
  90. if not cls.prefixes:
  91. cls._init_prefixes()
  92. if name:
  93. try:
  94. yield cls.prefixes[Prepared(name).normalized]
  95. except KeyError:
  96. raise PackageNotFoundError(name)
  97. else:
  98. for prefix in sorted(cls.prefixes.values()):
  99. yield prefix
  100. # monkeypatch standart library
  101. import importlib.metadata
  102. importlib.metadata.MetadataPathFinder = ArcadiaMetadataFinder