123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- import pathlib
- import io
- import os
- import re
- import sys
- import warnings
- from importlib.metadata import Distribution, DistributionFinder, PackageNotFoundError, Prepared
- from importlib.resources.abc import Traversable
- import __res
- with warnings.catch_warnings(action="ignore", category=DeprecationWarning):
- from importlib.abc import ResourceReader
- ResourceReader.register(__res._ResfsResourceReader)
- METADATA_NAME = re.compile('^Name: (.*)$', re.MULTILINE)
- class ArcadiaResourceHandle(Traversable):
- def __init__(self, key):
- self.resfs_key = key
- def is_file(self):
- return True
- def is_dir(self):
- return False
- def open(self, mode='r', *args, **kwargs):
- data = __res.find(self.resfs_key.encode("utf-8"))
- if data is None:
- raise FileNotFoundError(self.resfs_key)
- stream = io.BytesIO(data)
- if 'b' not in mode:
- stream = io.TextIOWrapper(stream, *args, **kwargs)
- return stream
- def joinpath(self, *name):
- raise RuntimeError("Cannot traverse into a resource")
- def iterdir(self):
- return iter(())
- @property
- def name(self):
- return os.path.basename(self.resfs_key)
- class ArcadiaResourceContainer(Traversable):
- def __init__(self, prefix):
- self.resfs_prefix = prefix
- def is_dir(self):
- return True
- def is_file(self):
- return False
- def iterdir(self):
- for key, path_without_prefix in __res.iter_keys(self.resfs_prefix.encode("utf-8")):
- if b"/" in path_without_prefix:
- name = path_without_prefix.decode("utf-8").split("/", maxsplit=1)[0]
- yield ArcadiaResourceContainer(f"{self.resfs_prefix}{name}/")
- else:
- yield ArcadiaResourceHandle(key.decode("utf-8"))
- def open(self, *args, **kwargs):
- raise IsADirectoryError(self.resfs_prefix)
- def joinpath(self, *descendants):
- if not descendants:
- return self
- return ArcadiaResourceHandle(os.path.join(self.resfs_prefix, *descendants))
- @property
- def name(self):
- return os.path.basename(self.resfs_prefix[:-1])
- class ArcadiaDistribution(Distribution):
- def __init__(self, prefix):
- self.prefix = prefix
- @property
- def _path(self):
- return pathlib.Path(self.prefix)
- def read_text(self, filename):
- data = __res.resfs_read(f'{self.prefix}{filename}')
- if data:
- return data.decode('utf-8')
- read_text.__doc__ = Distribution.read_text.__doc__
- def locate_file(self, path):
- return f'{self.prefix}{path}'
- class ArcadiaMetadataFinder(DistributionFinder):
- prefixes = {}
- @classmethod
- def find_distributions(cls, context=DistributionFinder.Context()):
- found = cls._search_prefixes(context.name)
- return map(ArcadiaDistribution, found)
- @classmethod
- def _init_prefixes(cls):
- cls.prefixes.clear()
- for resource in __res.resfs_files():
- resource = resource.decode('utf-8')
- if not resource.endswith('METADATA'):
- continue
- data = __res.resfs_read(resource).decode('utf-8')
- metadata_name = METADATA_NAME.search(data)
- if metadata_name:
- metadata_name = Prepared(metadata_name.group(1))
- cls.prefixes[metadata_name.normalized] = resource[:-len('METADATA')]
- @classmethod
- def _search_prefixes(cls, name):
- if not cls.prefixes:
- cls._init_prefixes()
- if name:
- try:
- yield cls.prefixes[Prepared(name).normalized]
- except KeyError:
- raise PackageNotFoundError(name)
- else:
- for prefix in sorted(cls.prefixes.values()):
- yield prefix
- # monkeypatch standart library
- import importlib.metadata
- importlib.metadata.MetadataPathFinder = ArcadiaMetadataFinder
|