123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- from __future__ import annotations
- import re
- from abc import ABC
- from dataclasses import dataclass
- from typing import Any
- from .common import RequestHandler, register_preference
- from .exceptions import UnsupportedRequest
- from ..compat.types import NoneType
- from ..utils import classproperty, join_nonempty
- from ..utils.networking import std_headers
@dataclass(order=True, frozen=True)
class ImpersonateTarget:
    """
    Describes a client (typically a browser) to impersonate.

    Parameters:
    @param client: name of the client to impersonate
    @param version: version of that client
    @param os: operating system of the client
    @param os_version: version of that operating system

    Note: a field left as None acts as a wildcard and matches anything.
    """
    client: str | None = None
    version: str | None = None
    os: str | None = None
    os_version: str | None = None

    def __post_init__(self):
        # A version is only meaningful together with the thing it versions.
        if not self.client and self.version:
            raise ValueError('client is required if version is set')
        if not self.os and self.os_version:
            raise ValueError('os is required if os_version is set')

    def __contains__(self, target: ImpersonateTarget):
        """
        Return True if `target` is compatible with this target.

        Two targets are compatible when, for every field, at least one side
        is None or both sides are equal. Non-ImpersonateTarget objects are
        never contained.
        """
        if not isinstance(target, ImpersonateTarget):
            return False
        for field_name in ('client', 'version', 'os', 'os_version'):
            mine = getattr(self, field_name)
            theirs = getattr(target, field_name)
            # Only two concrete, differing values make the targets incompatible.
            if mine is not None and theirs is not None and mine != theirs:
                return False
        return True

    def __str__(self):
        """Format as `client-version:os-os_version`, omitting unset parts."""
        client_part = join_nonempty(self.client, self.version)
        os_part = join_nonempty(self.os, self.os_version)
        # Drop the separator when there is no OS part after it.
        return f'{client_part}:{os_part}'.rstrip(':')

    @classmethod
    def from_str(cls, target: str):
        """
        Parse a `client[-version][:os[-os_version]]` string into a target.

        Every component is optional; components missing from the string are
        left as None.

        @param target: the string to parse
        @raises ValueError: if the string does not match the expected format
        """
        parsed = re.fullmatch(
            r'(?:(?P<client>[^:-]+)(?:-(?P<version>[^:-]+))?)?(?::(?:(?P<os>[^:-]+)(?:-(?P<os_version>[^:-]+))?)?)?',
            target)
        if parsed is None:
            raise ValueError(f'Invalid impersonate target "{target}"')
        return cls(**parsed.groupdict())
class ImpersonateRequestHandler(RequestHandler, ABC):
    """
    Base class for request handlers that support browser impersonation.

    This provides a method for checking the validity of the impersonate extension,
    which can be used in _check_extensions.

    Impersonate targets consist of a client, version, os and os_version.
    See the ImpersonateTarget class for more details.

    The following may be defined:
     - `_SUPPORTED_IMPERSONATE_TARGET_MAP`: a dict mapping supported targets to custom object.
                Any Request with an impersonate target not in this list will raise an UnsupportedRequest.
                Set to None to disable this check.
                Note: Entries are in order of preference

    Parameters:
    @param impersonate: the default impersonate target to use for requests.
                        Set to None to disable impersonation.
    """
    _SUPPORTED_IMPERSONATE_TARGET_MAP: dict[ImpersonateTarget, Any] | None = {}

    def __init__(self, *, impersonate: ImpersonateTarget | None = None, **kwargs):
        super().__init__(**kwargs)
        self.impersonate = impersonate

    def _check_impersonate_target(self, target: ImpersonateTarget | None):
        """
        Validate an impersonate target against the supported targets.

        Does nothing when `target` is None or when no supported targets are
        declared (the check is then disabled).

        @raises UnsupportedRequest: if `target` matches no supported target
        """
        assert isinstance(target, (ImpersonateTarget, NoneType))
        if target is None or not self.supported_targets:
            return
        if not self.is_supported_target(target):
            raise UnsupportedRequest(f'Unsupported impersonate target: {target}')

    def _check_extensions(self, extensions):
        """Run the parent extension checks, then validate the `impersonate` extension if present."""
        super()._check_extensions(extensions)
        if 'impersonate' in extensions:
            self._check_impersonate_target(extensions.get('impersonate'))

    def _validate(self, request):
        """Run the parent validation, then validate the handler's default impersonate target."""
        super()._validate(request)
        self._check_impersonate_target(self.impersonate)

    def _resolve_target(self, target: ImpersonateTarget | None):
        """
        Resolve a target to a supported target.

        Returns the first entry of `supported_targets` that is compatible with
        `target`, or None if `target` is None or nothing matches.
        """
        if target is None:
            return
        # supported_targets is in order of preference, so the first match wins
        for supported_target in self.supported_targets:
            if target in supported_target:
                if self.verbose:
                    self._logger.stdout(
                        f'{self.RH_NAME}: resolved impersonate target {target} to {supported_target}')
                return supported_target

    @classproperty
    def supported_targets(cls) -> tuple[ImpersonateTarget, ...]:
        """
        The supported targets, in the order they appear in `_SUPPORTED_IMPERSONATE_TARGET_MAP`.

        An unset (None) or empty map yields an empty tuple, which
        `_check_impersonate_target` treats as "check disabled".
        """
        target_map = cls._SUPPORTED_IMPERSONATE_TARGET_MAP
        # The map is documented as nullable; calling .keys() on None would raise AttributeError
        if not target_map:
            return ()
        return tuple(target_map.keys())

    def is_supported_target(self, target: ImpersonateTarget):
        """Return True if `target` resolves to one of the supported targets."""
        assert isinstance(target, ImpersonateTarget)
        return self._resolve_target(target) is not None

    def _get_request_target(self, request):
        """
        Get the requested target for the request.

        The request's `impersonate` extension takes precedence over the
        handler's default; the chosen target is resolved to a supported target.
        """
        return self._resolve_target(request.extensions.get('impersonate') or self.impersonate)

    def _get_impersonate_headers(self, request):
        """
        Build the headers to send for the request.

        When the request resolves to an impersonate target, any header whose
        value is identical to the `std_headers` default is dropped.
        """
        headers = self._merge_headers(request.headers)
        if self._get_request_target(request) is not None:
            # remove all headers present in std_headers
            # TODO: change this to not depend on std_headers
            # NOTE(review): presumably so the impersonated client's own defaults apply - confirm
            for k, v in std_headers.items():
                if headers.get(k) == v:
                    headers.pop(k)
        return headers
@register_preference(ImpersonateRequestHandler)
def impersonate_preference(rh, request):
    """
    Preference score for impersonate-capable handlers.

    Returns 1000 when impersonation is requested, either through the request's
    `impersonate` extension or the handler's default target; otherwise 0.
    """
    wants_impersonation = request.extensions.get('impersonate') or rh.impersonate
    return 1000 if wants_impersonation else 0
|