|
- import importlib.util
- import mimetypes
- import os
- import pathlib
- import pkgutil
- import sys
- import typing as t
- from collections import defaultdict
- from functools import update_wrapper
- from json import JSONDecoder
- from json import JSONEncoder
- from jinja2 import ChoiceLoader, FileSystemLoader, ResourceLoader, PackageLoader
- from werkzeug.exceptions import default_exceptions
- from werkzeug.exceptions import HTTPException
- from . import typing as ft
- from .cli import AppGroup
- from .globals import current_app
- from .helpers import get_root_path
- from .helpers import locked_cached_property
- from .helpers import send_file
- from .helpers import send_from_directory
- from .templating import _default_template_ctx_processor
- if t.TYPE_CHECKING:
- from .wrappers import Response
# Type variable bound to "any callable"; lets a decorator declare that it
# returns the same callable type it was given.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])

# Unique marker object for parameter defaults. Compare against it with
# ``is`` to tell "argument omitted" apart from an explicit ``None``.
_sentinel = object()
def setupmethod(f: F) -> F:
    """Guard a setup method against being called too late.

    The returned wrapper asks the instance's ``_is_setup_finished()``
    before every call. If that reports ``True`` an
    :exc:`AssertionError` explaining the likely cause is raised;
    otherwise the call is forwarded to ``f`` unchanged and its result
    is returned. The wrapper carries ``f``'s metadata via
    :func:`functools.update_wrapper`.
    """

    def wrapper_func(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
        # Happy path first: setup is still open, just delegate.
        if not self._is_setup_finished():
            return f(self, *args, **kwargs)

        raise AssertionError(
            "A setup function was called after the first request "
            "was handled. This usually indicates a bug in the"
            " application where a module was not imported and"
            " decorators or other functionality was called too"
            " late.\nTo fix this make sure to import all your view"
            " modules, database models, and everything related at a"
            " central place before the application starts serving"
            " requests."
        )

    guarded = update_wrapper(wrapper_func, f)
    return t.cast(F, guarded)
class Scaffold:
    """Common behavior shared between :class:`~flask.Flask` and
    :class:`~flask.blueprints.Blueprint`.

    :param import_name: The import name of the module where this object
        is defined. Usually :attr:`__name__` should be used.
    :param static_folder: Path to a folder of static files to serve.
        If this is set, a static route will be added.
    :param static_url_path: URL prefix for the static route.
    :param template_folder: Path to a folder containing template files
        for rendering. If this is set, a Jinja loader will be added.
    :param root_path: The path that static, template, and resource files
        are relative to. Typically not set, it is discovered based on
        the ``import_name``.

    .. versionadded:: 2.0
    """

    name: str
    _static_folder: t.Optional[str] = None
    _static_url_path: t.Optional[str] = None

    #: JSON encoder class used by :func:`flask.json.dumps`. If a
    #: blueprint sets this, it will be used instead of the app's value.
    json_encoder: t.Optional[t.Type[JSONEncoder]] = None

    #: JSON decoder class used by :func:`flask.json.loads`. If a
    #: blueprint sets this, it will be used instead of the app's value.
    json_decoder: t.Optional[t.Type[JSONDecoder]] = None

    def __init__(
        self,
        import_name: str,
        static_folder: t.Optional[t.Union[str, os.PathLike]] = None,
        static_url_path: t.Optional[str] = None,
        template_folder: t.Optional[str] = None,
        root_path: t.Optional[str] = None,
    ):
        #: The name of the package or module that this object belongs
        #: to. Do not change this once it is set by the constructor.
        self.import_name = import_name

        self.static_folder = static_folder  # type: ignore
        self.static_url_path = static_url_path

        # Locate the import spec without ``pkgutil.find_loader`` (deprecated
        # in Python 3.12, removed in 3.14). A failed lookup, for example
        # ``__main__`` run as a script where ``__spec__`` is ``None``, is
        # treated as "no loader" instead of aborting construction.
        try:
            spec = importlib.util.find_spec(import_name)
        except (ImportError, AttributeError, TypeError, ValueError):
            spec = None

        #: The import loader for ``import_name``, or ``None`` if it could
        #: not be determined. Used to read static files through
        #: ``get_data`` and passed to the ``ResourceLoader`` for templates.
        self.module_loader = spec.loader if spec is not None else None

        package_name = import_name

        if self.module_loader and spec.submodule_search_locations is None:
            # ``import_name`` is a plain module: its resources live next to
            # it, i.e. in the parent package. ``rpartition`` yields ``""``
            # for a top-level module, which has no parent package.
            package_name = import_name.rpartition(".")[0]

        #: ``/``-separated resource path of the package that contains this
        #: object's resources. Prepended to the static and template
        #: folders when they are read through :attr:`module_loader`.
        self._builtin_resource_prefix = package_name.replace(".", "/")

        #: The path to the templates folder, relative to
        #: :attr:`root_path`, to add to the template loader. ``None`` if
        #: templates should not be added.
        self.template_folder = template_folder

        if root_path is None:
            root_path = get_root_path(self.import_name)

        #: Absolute path to the package on the filesystem. Used to look
        #: up resources contained in the package.
        self.root_path = root_path

        #: The Click command group for registering CLI commands for this
        #: object. The commands are available from the ``flask`` command
        #: once the application has been discovered and blueprints have
        #: been registered.
        self.cli = AppGroup()

        #: A dictionary mapping endpoint names to view functions.
        #:
        #: To register a view function, use the :meth:`route` decorator.
        #:
        #: This data structure is internal. It should not be modified
        #: directly and its format may change at any time.
        self.view_functions: t.Dict[str, t.Callable] = {}

        #: A data structure of registered error handlers, in the format
        #: ``{scope: {code: {class: handler}}}``. The ``scope`` key is
        #: the name of a blueprint the handlers are active for, or
        #: ``None`` for all requests. The ``code`` key is the HTTP
        #: status code for ``HTTPException``, or ``None`` for
        #: other exceptions. The innermost dictionary maps exception
        #: classes to handler functions.
        #:
        #: To register an error handler, use the :meth:`errorhandler`
        #: decorator.
        #:
        #: This data structure is internal. It should not be modified
        #: directly and its format may change at any time.
        self.error_handler_spec: t.Dict[
            ft.AppOrBlueprintKey,
            t.Dict[t.Optional[int], t.Dict[t.Type[Exception], ft.ErrorHandlerCallable]],
        ] = defaultdict(lambda: defaultdict(dict))

        #: A data structure of functions to call at the beginning of
        #: each request, in the format ``{scope: [functions]}``. The
        #: ``scope`` key is the name of a blueprint the functions are
        #: active for, or ``None`` for all requests.
        #:
        #: To register a function, use the :meth:`before_request`
        #: decorator.
        #:
        #: This data structure is internal. It should not be modified
        #: directly and its format may change at any time.
        self.before_request_funcs: t.Dict[
            ft.AppOrBlueprintKey, t.List[ft.BeforeRequestCallable]
        ] = defaultdict(list)

        #: A data structure of functions to call at the end of each
        #: request, in the format ``{scope: [functions]}``. The
        #: ``scope`` key is the name of a blueprint the functions are
        #: active for, or ``None`` for all requests.
        #:
        #: To register a function, use the :meth:`after_request`
        #: decorator.
        #:
        #: This data structure is internal. It should not be modified
        #: directly and its format may change at any time.
        self.after_request_funcs: t.Dict[
            ft.AppOrBlueprintKey, t.List[ft.AfterRequestCallable]
        ] = defaultdict(list)

        #: A data structure of functions to call at the end of each
        #: request even if an exception is raised, in the format
        #: ``{scope: [functions]}``. The ``scope`` key is the name of a
        #: blueprint the functions are active for, or ``None`` for all
        #: requests.
        #:
        #: To register a function, use the :meth:`teardown_request`
        #: decorator.
        #:
        #: This data structure is internal. It should not be modified
        #: directly and its format may change at any time.
        self.teardown_request_funcs: t.Dict[
            ft.AppOrBlueprintKey, t.List[ft.TeardownCallable]
        ] = defaultdict(list)

        #: A data structure of functions to call to pass extra context
        #: values when rendering templates, in the format
        #: ``{scope: [functions]}``. The ``scope`` key is the name of a
        #: blueprint the functions are active for, or ``None`` for all
        #: requests.
        #:
        #: To register a function, use the :meth:`context_processor`
        #: decorator.
        #:
        #: This data structure is internal. It should not be modified
        #: directly and its format may change at any time.
        self.template_context_processors: t.Dict[
            ft.AppOrBlueprintKey, t.List[ft.TemplateContextProcessorCallable]
        ] = defaultdict(list, {None: [_default_template_ctx_processor]})

        #: A data structure of functions to call to modify the keyword
        #: arguments passed to the view function, in the format
        #: ``{scope: [functions]}``. The ``scope`` key is the name of a
        #: blueprint the functions are active for, or ``None`` for all
        #: requests.
        #:
        #: To register a function, use the
        #: :meth:`url_value_preprocessor` decorator.
        #:
        #: This data structure is internal. It should not be modified
        #: directly and its format may change at any time.
        self.url_value_preprocessors: t.Dict[
            ft.AppOrBlueprintKey,
            t.List[ft.URLValuePreprocessorCallable],
        ] = defaultdict(list)

        #: A data structure of functions to call to modify the keyword
        #: arguments when generating URLs, in the format
        #: ``{scope: [functions]}``. The ``scope`` key is the name of a
        #: blueprint the functions are active for, or ``None`` for all
        #: requests.
        #:
        #: To register a function, use the :meth:`url_defaults`
        #: decorator.
        #:
        #: This data structure is internal. It should not be modified
        #: directly and its format may change at any time.
        self.url_default_functions: t.Dict[
            ft.AppOrBlueprintKey, t.List[ft.URLDefaultCallable]
        ] = defaultdict(list)

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.name!r}>"

    def _is_setup_finished(self) -> bool:
        """Report whether setup methods may no longer be called.

        Consulted by :func:`setupmethod`. Must be provided by subclasses.
        """
        raise NotImplementedError

    @property
    def static_folder(self) -> t.Optional[str]:
        """The absolute path to the configured static folder. ``None``
        if no static folder is set.
        """
        if self._static_folder is not None:
            return os.path.join(self.root_path, self._static_folder)
        else:
            return None

    @static_folder.setter
    def static_folder(self, value: t.Optional[t.Union[str, os.PathLike]]) -> None:
        if value is not None:
            # Normalize to ``str`` and drop trailing path separators.
            value = os.fspath(value).rstrip(r"\/")

        self._static_folder = value

    @property
    def has_static_folder(self) -> bool:
        """``True`` if :attr:`static_folder` is set.

        .. versionadded:: 0.5
        """
        return self.static_folder is not None

    @property
    def static_url_path(self) -> t.Optional[str]:
        """The URL prefix that the static route will be accessible from.

        If it was not configured during init, it is derived from
        :attr:`static_folder`.
        """
        if self._static_url_path is not None:
            return self._static_url_path

        if self.static_folder is not None:
            basename = os.path.basename(self.static_folder)
            return f"/{basename}".rstrip("/")

        return None

    @static_url_path.setter
    def static_url_path(self, value: t.Optional[str]) -> None:
        if value is not None:
            value = value.rstrip("/")

        self._static_url_path = value

    def get_send_file_max_age(self, filename: t.Optional[str]) -> t.Optional[int]:
        """Used by :func:`send_file` to determine the ``max_age`` cache
        value for a given file path if it wasn't passed.

        By default, this returns :data:`SEND_FILE_MAX_AGE_DEFAULT` from
        the configuration of :data:`~flask.current_app`. This defaults
        to ``None``, which tells the browser to use conditional requests
        instead of a timed cache, which is usually preferable.

        .. versionchanged:: 2.0
            The default configuration is ``None`` instead of 12 hours.

        .. versionadded:: 0.9
        """
        value = current_app.send_file_max_age_default

        if value is None:
            return None

        return int(value.total_seconds())

    def _read_builtin_static(self, filename: str) -> t.Optional[bytes]:
        """Try to read a static file through :attr:`module_loader`.

        :param filename: Path of the requested file relative to the
            static folder. Treated as untrusted input.
        :return: The file content, or ``None`` if there is no loader
            with a ``get_data`` method, no static folder is configured,
            ``filename`` would escape the static folder, or the loader
            raises :exc:`OSError`.
        """
        # Local import, like ``BytesIO`` in :meth:`send_static_file`.
        from werkzeug.security import safe_join

        get_data = getattr(self.module_loader, "get_data", None)

        if get_data is None or self._static_folder is None:
            return None

        base = os.path.join(self._builtin_resource_prefix, self._static_folder)
        # ``safe_join`` returns ``None`` for absolute names and for names
        # that climb out of ``base`` (``..``), which a plain join accepts.
        path = safe_join(base, filename)

        if path is None:
            return None

        try:
            return get_data(path)
        except OSError:
            return None

    def send_static_file(self, filename: str) -> "Response":
        """The view function used to serve files from
        :attr:`static_folder`. A route is automatically registered for
        this view at :attr:`static_url_path` if :attr:`static_folder` is
        set.

        The file is first looked up through :attr:`module_loader`. If
        that yields no data, it is served from the filesystem with
        :func:`send_from_directory`.

        :param filename: Path of the file relative to the static folder.
        :raises RuntimeError: If no static folder is configured.

        .. versionadded:: 0.5
        """
        # Check the configuration first so a missing static folder is
        # reported as such, whether or not a module loader is available.
        if not self.has_static_folder:
            raise RuntimeError("'static_folder' must be set to serve static_files.")

        data = self._read_builtin_static(filename)

        # send_file only knows to call get_send_file_max_age on the app,
        # call it here so it works for blueprints too.
        max_age = self.get_send_file_max_age(filename)

        # Empty or missing data falls through to the filesystem lookup.
        if data:
            from io import BytesIO

            # The in-memory file has no name to guess a type from, so an
            # unknown extension needs an explicit generic type.
            mimetype = mimetypes.guess_type(filename)[0] or "application/octet-stream"
            fobj = BytesIO(data)
            # Note: in case of uWSGI, might also need to set
            # `wsgi-disable-file-wrapper = true`
            # because, otherwise, uwsgi expects a `fileno` on it.
            return send_file(fobj, mimetype=mimetype, max_age=max_age, conditional=True)

        return send_from_directory(
            t.cast(str, self.static_folder), filename, max_age=max_age
        )

    @locked_cached_property
    def jinja_loader(self) -> t.Optional[ChoiceLoader]:
        """The Jinja loader for this object's templates. If
        :attr:`template_folder` is set this is a
        :class:`jinja2.loaders.ChoiceLoader` that tries, in order, a
        ``FileSystemLoader`` for the folder under :attr:`root_path`, a
        ``PackageLoader`` for :attr:`import_name`, and a
        ``ResourceLoader`` backed by :attr:`module_loader`. Otherwise
        it is ``None``.

        .. versionadded:: 0.5
        """
        if self.template_folder is not None:
            return ChoiceLoader(
                [
                    FileSystemLoader(
                        os.path.join(self.root_path, self.template_folder)
                    ),
                    PackageLoader(
                        self.import_name,
                        self.template_folder,
                        skip_unknown_package=True,
                    ),
                    ResourceLoader(
                        os.path.join(
                            self._builtin_resource_prefix, self.template_folder
                        ),
                        self.module_loader,
                    ),
                ]
            )
        else:
            return None

    def open_resource(self, resource: str, mode: str = "rb") -> t.IO[t.AnyStr]:
        """Open a resource file relative to :attr:`root_path` for
        reading.

        For example, if the file ``schema.sql`` is next to the file
        ``app.py`` where the ``Flask`` app is defined, it can be opened
        with:

        .. code-block:: python

            with app.open_resource("schema.sql") as f:
                conn.executescript(f.read())

        :param resource: Path to the resource relative to
            :attr:`root_path`.
        :param mode: Open the file in this mode. Only reading is
            supported, valid values are "r" (or "rt") and "rb".
        """
        if mode not in {"r", "rt", "rb"}:
            raise ValueError("Resources can only be opened for reading.")

        return open(os.path.join(self.root_path, resource), mode)

    def _method_route(
        self,
        method: str,
        rule: str,
        options: dict,
    ) -> t.Callable[[ft.RouteDecorator], ft.RouteDecorator]:
        """Shared implementation of the per-method shortcuts: call
        :meth:`route` with ``methods=[method]``.

        :raises TypeError: If ``options`` already contains ``methods``.
        """
        if "methods" in options:
            raise TypeError("Use the 'route' decorator to use the 'methods' argument.")

        return self.route(rule, methods=[method], **options)

    def get(
        self, rule: str, **options: t.Any
    ) -> t.Callable[[ft.RouteDecorator], ft.RouteDecorator]:
        """Shortcut for :meth:`route` with ``methods=["GET"]``.

        .. versionadded:: 2.0
        """
        return self._method_route("GET", rule, options)

    def post(
        self, rule: str, **options: t.Any
    ) -> t.Callable[[ft.RouteDecorator], ft.RouteDecorator]:
        """Shortcut for :meth:`route` with ``methods=["POST"]``.

        .. versionadded:: 2.0
        """
        return self._method_route("POST", rule, options)

    def put(
        self, rule: str, **options: t.Any
    ) -> t.Callable[[ft.RouteDecorator], ft.RouteDecorator]:
        """Shortcut for :meth:`route` with ``methods=["PUT"]``.

        .. versionadded:: 2.0
        """
        return self._method_route("PUT", rule, options)

    def delete(
        self, rule: str, **options: t.Any
    ) -> t.Callable[[ft.RouteDecorator], ft.RouteDecorator]:
        """Shortcut for :meth:`route` with ``methods=["DELETE"]``.

        .. versionadded:: 2.0
        """
        return self._method_route("DELETE", rule, options)

    def patch(
        self, rule: str, **options: t.Any
    ) -> t.Callable[[ft.RouteDecorator], ft.RouteDecorator]:
        """Shortcut for :meth:`route` with ``methods=["PATCH"]``.

        .. versionadded:: 2.0
        """
        return self._method_route("PATCH", rule, options)

    def route(
        self, rule: str, **options: t.Any
    ) -> t.Callable[[ft.RouteDecorator], ft.RouteDecorator]:
        """Decorate a view function to register it with the given URL
        rule and options. Calls :meth:`add_url_rule`, which has more
        details about the implementation.

        .. code-block:: python

            @app.route("/")
            def index():
                return "Hello, World!"

        See :ref:`url-route-registrations`.

        The endpoint name for the route defaults to the name of the view
        function if the ``endpoint`` parameter isn't passed.

        The ``methods`` parameter defaults to ``["GET"]``. ``HEAD`` and
        ``OPTIONS`` are added automatically.

        :param rule: The URL rule string.
        :param options: Extra options passed to the
            :class:`~werkzeug.routing.Rule` object.
        """

        def decorator(f: ft.RouteDecorator) -> ft.RouteDecorator:
            endpoint = options.pop("endpoint", None)
            self.add_url_rule(rule, endpoint, f, **options)
            return f

        return decorator

    @setupmethod
    def add_url_rule(
        self,
        rule: str,
        endpoint: t.Optional[str] = None,
        view_func: t.Optional[ft.ViewCallable] = None,
        provide_automatic_options: t.Optional[bool] = None,
        **options: t.Any,
    ) -> None:
        """Register a rule for routing incoming requests and building
        URLs. The :meth:`route` decorator is a shortcut to call this
        with the ``view_func`` argument. These are equivalent:

        .. code-block:: python

            @app.route("/")
            def index():
                ...

        .. code-block:: python

            def index():
                ...

            app.add_url_rule("/", view_func=index)

        See :ref:`url-route-registrations`.

        The endpoint name for the route defaults to the name of the view
        function if the ``endpoint`` parameter isn't passed. An error
        will be raised if a function has already been registered for the
        endpoint.

        The ``methods`` parameter defaults to ``["GET"]``. ``HEAD`` is
        always added automatically, and ``OPTIONS`` is added
        automatically by default.

        ``view_func`` does not necessarily need to be passed, but if the
        rule should participate in routing an endpoint name must be
        associated with a view function at some point with the
        :meth:`endpoint` decorator.

        .. code-block:: python

            app.add_url_rule("/", endpoint="index")

            @app.endpoint("index")
            def index():
                ...

        If ``view_func`` has a ``required_methods`` attribute, those
        methods are added to the passed and automatic methods. If it
        has a ``provide_automatic_methods`` attribute, it is used as the
        default if the parameter is not passed.

        :param rule: The URL rule string.
        :param endpoint: The endpoint name to associate with the rule
            and view function. Used when routing and building URLs.
            Defaults to ``view_func.__name__``.
        :param view_func: The view function to associate with the
            endpoint name.
        :param provide_automatic_options: Add the ``OPTIONS`` method and
            respond to ``OPTIONS`` requests automatically.
        :param options: Extra options passed to the
            :class:`~werkzeug.routing.Rule` object.
        """
        raise NotImplementedError

    def endpoint(self, endpoint: str) -> t.Callable:
        """Decorate a view function to register it for the given
        endpoint. Used if a rule is added without a ``view_func`` with
        :meth:`add_url_rule`.

        .. code-block:: python

            app.add_url_rule("/ex", endpoint="example")

            @app.endpoint("example")
            def example():
                ...

        :param endpoint: The endpoint name to associate with the view
            function.
        """

        def decorator(f):
            self.view_functions[endpoint] = f
            return f

        return decorator

    @setupmethod
    def before_request(self, f: ft.BeforeRequestCallable) -> ft.BeforeRequestCallable:
        """Register a function to run before each request.

        For example, this can be used to open a database connection, or
        to load the logged in user from the session.

        .. code-block:: python

            @app.before_request
            def load_user():
                if "user_id" in session:
                    g.user = db.session.get(session["user_id"])

        The function will be called without any arguments. If it returns
        a non-``None`` value, the value is handled as if it was the
        return value from the view, and further request handling is
        stopped.
        """
        self.before_request_funcs.setdefault(None, []).append(f)
        return f

    @setupmethod
    def after_request(self, f: ft.AfterRequestCallable) -> ft.AfterRequestCallable:
        """Register a function to run after each request to this object.

        The function is called with the response object, and must return
        a response object. This allows the functions to modify or
        replace the response before it is sent.

        If a function raises an exception, any remaining
        ``after_request`` functions will not be called. Therefore, this
        should not be used for actions that must execute, such as to
        close resources. Use :meth:`teardown_request` for that.
        """
        self.after_request_funcs.setdefault(None, []).append(f)
        return f

    @setupmethod
    def teardown_request(self, f: ft.TeardownCallable) -> ft.TeardownCallable:
        """Register a function to be run at the end of each request,
        regardless of whether there was an exception or not.  These functions
        are executed when the request context is popped, even if not an
        actual request was performed.

        Example::

            ctx = app.test_request_context()
            ctx.push()
            ...
            ctx.pop()

        When ``ctx.pop()`` is executed in the above example, the teardown
        functions are called just before the request context moves from the
        stack of active contexts.  This becomes relevant if you are using
        such constructs in tests.

        Teardown functions must avoid raising exceptions. If
        they execute code that might fail they
        will have to surround the execution of that code with try/except
        statements and log any errors.

        When a teardown function was called because of an exception it will
        be passed an error object.

        The return values of teardown functions are ignored.

        .. admonition:: Debug Note

           In debug mode Flask will not tear down a request on an exception
           immediately.  Instead it will keep it alive so that the interactive
           debugger can still access it.  This behavior can be controlled
           by the ``PRESERVE_CONTEXT_ON_EXCEPTION`` configuration variable.
        """
        self.teardown_request_funcs.setdefault(None, []).append(f)
        return f

    @setupmethod
    def context_processor(
        self, f: ft.TemplateContextProcessorCallable
    ) -> ft.TemplateContextProcessorCallable:
        """Registers a template context processor function."""
        self.template_context_processors[None].append(f)
        return f

    @setupmethod
    def url_value_preprocessor(
        self, f: ft.URLValuePreprocessorCallable
    ) -> ft.URLValuePreprocessorCallable:
        """Register a URL value preprocessor function for all view
        functions in the application. These functions will be called before the
        :meth:`before_request` functions.

        The function can modify the values captured from the matched url before
        they are passed to the view. For example, this can be used to pop a
        common language code value and place it in ``g`` rather than pass it to
        every view.

        The function is passed the endpoint name and values dict. The return
        value is ignored.
        """
        self.url_value_preprocessors[None].append(f)
        return f

    @setupmethod
    def url_defaults(self, f: ft.URLDefaultCallable) -> ft.URLDefaultCallable:
        """Callback function for URL defaults for all view functions of the
        application.  It's called with the endpoint and values and should
        update the values passed in place.
        """
        self.url_default_functions[None].append(f)
        return f

    @setupmethod
    def errorhandler(
        self, code_or_exception: t.Union[t.Type[Exception], int]
    ) -> t.Callable[[ft.ErrorHandlerDecorator], ft.ErrorHandlerDecorator]:
        """Register a function to handle errors by code or exception class.

        A decorator that is used to register a function given an
        error code.  Example::

            @app.errorhandler(404)
            def page_not_found(error):
                return 'This page does not exist', 404

        You can also register handlers for arbitrary exceptions::

            @app.errorhandler(DatabaseError)
            def special_exception_handler(error):
                return 'Database connection failed', 500

        .. versionadded:: 0.7
            Use :meth:`register_error_handler` instead of modifying
            :attr:`error_handler_spec` directly, for application wide error
            handlers.

        .. versionadded:: 0.7
           One can now additionally also register custom exception types
           that do not necessarily have to be a subclass of the
           :class:`~werkzeug.exceptions.HTTPException` class.

        :param code_or_exception: the code as integer for the handler, or
                                  an arbitrary exception
        """

        def decorator(f: ft.ErrorHandlerDecorator) -> ft.ErrorHandlerDecorator:
            self.register_error_handler(code_or_exception, f)
            return f

        return decorator

    @setupmethod
    def register_error_handler(
        self,
        code_or_exception: t.Union[t.Type[Exception], int],
        f: ft.ErrorHandlerCallable,
    ) -> None:
        """Alternative error attach function to the :meth:`errorhandler`
        decorator that is more straightforward to use for non decorator
        usage.

        :raises ValueError: If an ``HTTPException`` instance is passed
            instead of a class or status code.
        :raises KeyError: If an integer is passed that is not a known
            HTTP error code.

        .. versionadded:: 0.7
        """
        if isinstance(code_or_exception, HTTPException):  # old broken behavior
            raise ValueError(
                "Tried to register a handler for an exception instance"
                f" {code_or_exception!r}. Handlers can only be"
                " registered for exception classes or HTTP error codes."
            )

        try:
            exc_class, code = self._get_exc_class_and_code(code_or_exception)
        except KeyError:
            raise KeyError(
                f"'{code_or_exception}' is not a recognized HTTP error"
                " code. Use a subclass of HTTPException with that code"
                " instead."
            ) from None

        self.error_handler_spec[None][code][exc_class] = f

    @staticmethod
    def _get_exc_class_and_code(
        exc_class_or_code: t.Union[t.Type[Exception], int]
    ) -> t.Tuple[t.Type[Exception], t.Optional[int]]:
        """Get the exception class being handled. For HTTP status codes
        or ``HTTPException`` subclasses, return both the exception and
        status code.

        :param exc_class_or_code: Any exception class, or an HTTP status
            code as an integer.
        """
        exc_class: t.Type[Exception]

        if isinstance(exc_class_or_code, int):
            # Raises ``KeyError`` for codes werkzeug has no exception for.
            exc_class = default_exceptions[exc_class_or_code]
        else:
            exc_class = exc_class_or_code

        assert issubclass(
            exc_class, Exception
        ), "Custom exceptions must be subclasses of Exception."

        if issubclass(exc_class, HTTPException):
            return exc_class, exc_class.code
        else:
            return exc_class, None
def _endpoint_from_view_func(view_func: t.Callable) -> str:
    """Return the default endpoint name for ``view_func``.

    The default endpoint is always the function's ``__name__``.
    ``view_func`` must not be ``None``.
    """
    assert view_func is not None, "expected view func if endpoint is not provided."
    endpoint_name = view_func.__name__
    return endpoint_name
def _matching_loader_thinks_module_is_package(loader, mod_name):
    """Decide whether ``mod_name`` names a package, according to the
    loader that handled it.

    :param loader: The loader that handled the name.
    :param mod_name: The name of the package or module.
    :raises AttributeError: If the loader offers no ``is_package`` and
        is not the import system's ``NamespaceLoader``.
    """
    loader_cls = type(loader)

    # Prefer the loader's own answer whenever it can give one.
    if hasattr(loader, "is_package"):
        return loader.is_package(mod_name)

    # NamespaceLoader doesn't implement is_package, but every name it
    # loads is a package by construction.
    is_namespace_loader = (loader_cls.__module__, loader_cls.__name__) == (
        "_frozen_importlib",
        "NamespaceLoader",
    )

    if is_namespace_loader:
        return True

    # Nothing left to try: tell the hook's author what is missing.
    raise AttributeError(
        f"'{loader_cls.__name__}.is_package()' must be implemented for PEP 302"
        f" import hooks."
    )
def _path_is_relative_to(path: pathlib.PurePath, base: str) -> bool:
    """Return ``True`` if ``path`` is ``base`` or lies beneath it.

    Stand-in for ``PurePath.is_relative_to``, which only exists from
    Python 3.9 on.
    """
    try:
        path.relative_to(base)
    except ValueError:
        # ``relative_to`` signals "not a sub-path" with ValueError.
        return False
    else:
        return True
def _find_package_path(import_name):
    """Find the path that contains the package or module.

    :param import_name: Dotted import name. Only its first component
        (the root module or package) decides the result, except for
        namespace packages, where the full name picks the portion.
    :return: The directory from which the root module or package would
        be imported, or the current working directory if the root
        cannot be located or is ``__main__`` without a spec.
    """
    root_mod_name, _, _ = import_name.partition(".")

    try:
        root_spec = importlib.util.find_spec(root_mod_name)

        if root_spec is None:
            raise ValueError("not found")
    # ImportError: the machinery told us it does not exist
    # ValueError:
    #    - the module name was invalid
    #    - the module name is __main__
    #    - *we* raised `ValueError` due to `root_spec` being `None`
    except (ImportError, ValueError):
        pass  # handled below
    else:
        # namespace package
        if root_spec.origin in {"namespace", None}:
            root_locations = list(root_spec.submodule_search_locations or ())

            # A spec with neither origin nor search locations gives nothing
            # to work with; fall through to the legacy lookup below.
            if root_locations:
                chosen = None
                package_spec = importlib.util.find_spec(import_name)

                if (
                    package_spec is not None
                    and package_spec.submodule_search_locations
                ):
                    # Pick the path in the namespace that contains the submodule.
                    package_path = pathlib.Path(
                        os.path.commonpath(package_spec.submodule_search_locations)
                    )
                    chosen = next(
                        (
                            location
                            for location in root_locations
                            if _path_is_relative_to(package_path, location)
                        ),
                        None,
                    )

                if chosen is None:
                    # No submodule to go by, or no portion contains it:
                    # pick the first path.
                    chosen = root_locations[0]

                return os.path.dirname(chosen)
        # a package (with __init__.py)
        elif root_spec.submodule_search_locations:
            return os.path.dirname(os.path.dirname(root_spec.origin))
        # just a normal module
        else:
            return os.path.dirname(root_spec.origin)

    # we were unable to find the `package_path` using PEP 451 loaders
    # ``pkgutil.get_loader`` is deprecated since Python 3.12 and removed in
    # 3.14. Reading ``__loader__`` off an already imported module covers
    # the case that lookup could still answer once ``find_spec`` has failed.
    loader = getattr(sys.modules.get(root_mod_name), "__loader__", None)

    if loader is None or root_mod_name == "__main__":
        # import name is not found, or interactive/main module
        return os.getcwd()

    if hasattr(loader, "get_filename"):
        filename = loader.get_filename(root_mod_name)
    elif hasattr(loader, "archive"):
        # zipimporter's loader.archive points to the .egg or .zip file.
        filename = loader.archive
    else:
        # At least one loader is missing both get_filename and archive:
        # Google App Engine's HardenedModulesHook, use __file__.
        filename = importlib.import_module(root_mod_name).__file__

    package_path = os.path.abspath(os.path.dirname(filename))

    # If the imported name is a package, filename is currently pointing
    # to the root of the package, need to get the current directory.
    if _matching_loader_thinks_module_is_package(loader, root_mod_name):
        package_path = os.path.dirname(package_path)

    return package_path
def find_package(import_name: str):
    """Locate the install prefix and import path of a package.

    Returns a ``(prefix, path)`` pair.

    ``prefix`` is the directory holding the standard hierarchy (lib,
    bin, etc.) the package is installed under. It is ``None`` when the
    package is neither below :attr:`sys.prefix` nor inside a
    ``site-packages`` directory.

    ``path`` is the directory the package would be imported from, as
    reported by :func:`_find_package_path`; for a package that cannot
    be located that is the current working directory.
    """
    import_path = _find_package_path(import_name)
    interpreter_prefix = os.path.abspath(sys.prefix)

    # installed to the system
    if _path_is_relative_to(pathlib.PurePath(import_path), interpreter_prefix):
        return interpreter_prefix, import_path

    # not installed: the containing directory is not ``site-packages``
    if os.path.basename(import_path).lower() != "site-packages":
        return None, import_path

    # installed to a virtualenv
    above_site = os.path.dirname(import_path)
    two_above, lib_candidate = os.path.split(above_site)

    # Windows (prefix/lib/site-packages)
    if lib_candidate.lower() == "lib":
        return two_above, import_path

    # Unix (prefix/lib/pythonX.Y/site-packages)
    if os.path.basename(two_above).lower() == "lib":
        return os.path.dirname(two_above), import_path

    # something else (prefix/site-packages)
    return above_site, import_path
|