123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- import importlib
- import logging
- import unicodedata
- from codecs import IncrementalDecoder
- from encodings.aliases import aliases
- from functools import lru_cache
- from re import findall
- from typing import Generator, List, Optional, Set, Tuple, Union
- from _multibytecodec import MultibyteIncrementalDecoder
- from .constant import (
- ENCODING_MARKS,
- IANA_SUPPORTED_SIMILAR,
- RE_POSSIBLE_ENCODING_INDICATION,
- UNICODE_RANGES_COMBINED,
- UNICODE_SECONDARY_RANGE_KEYWORD,
- UTF8_MAXIMAL_ALLOCATION,
- )
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_accentuated(character: str) -> bool:
    """
    Tell whether the Unicode name of a character mentions one of the handled accents.

    Only grave, acute, cedilla, diaeresis, circumflex and tilde are recognised.
    A character without a Unicode name is never reported as accentuated.
    """
    try:
        name: str = unicodedata.name(character)
    except ValueError:
        # unicodedata.name raises ValueError when the code point has no name.
        return False

    accent_markers: Tuple[str, ...] = (
        "WITH GRAVE",
        "WITH ACUTE",
        "WITH CEDILLA",
        "WITH DIAERESIS",
        "WITH CIRCUMFLEX",
        "WITH TILDE",
    )
    return any(marker in name for marker in accent_markers)
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def remove_accent(character: str) -> str:
    """
    Return the base character of a canonically decomposable character.

    The first code point of the Unicode decomposition mapping is used as the
    base character. The character is returned unchanged when it has no
    decomposition, or when the mapping is a tagged (compatibility) one.
    """
    decomposed: str = unicodedata.decomposition(character)
    if not decomposed:
        return character

    codes: List[str] = decomposed.split(" ")

    # Tagged mappings look like "<compat> 0020 0308": the first token is a
    # formatting tag, not a hexadecimal code point, and int(..., 16) would raise
    # ValueError on it. Such mappings do not describe "base letter + accent",
    # so the character is kept as-is.
    if codes[0].startswith("<"):
        return character

    return chr(int(codes[0], 16))
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def unicode_range(character: str) -> Optional[str]:
    """
    Look up the name of the Unicode range that contains a single character.

    Ranges are probed in the iteration order of UNICODE_RANGES_COMBINED and the
    first one containing the code point wins. None is returned when no range matches.
    """
    code_point: int = ord(character)
    return next(
        (
            name
            for name, span in UNICODE_RANGES_COMBINED.items()
            if code_point in span
        ),
        None,
    )
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_latin(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "LATIN".

    Characters without a Unicode name are reported as non-Latin.
    """
    try:
        return "LATIN" in unicodedata.name(character)
    except ValueError:
        # Unnamed code point.
        return False
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_punctuation(character: str) -> bool:
    """
    Tell whether a character is punctuation.

    A character qualifies when its Unicode general category contains "P", or,
    failing that, when the name of its Unicode range contains "Punctuation".
    """
    if "P" in unicodedata.category(character):
        return True

    range_name: Optional[str] = unicode_range(character)
    return range_name is not None and "Punctuation" in range_name
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_symbol(character: str) -> bool:
    """
    Tell whether a character is treated as a symbol.

    A character qualifies when its Unicode general category contains "S" or "N",
    or, failing that, when the name of its Unicode range contains "Forms".
    """
    category: str = unicodedata.category(character)
    if any(flag in category for flag in ("S", "N")):
        return True

    range_name: Optional[str] = unicode_range(character)
    return range_name is not None and "Forms" in range_name
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_emoticon(character: str) -> bool:
    """
    Tell whether a character belongs to a Unicode range whose name contains "Emoticons".

    Characters that fall in no known range are not emoticons.
    """
    range_name: Optional[str] = unicode_range(character)
    return range_name is not None and "Emoticons" in range_name
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_separator(character: str) -> bool:
    """
    Tell whether a character acts as a separator.

    Whitespace and the literal characters | + < > always qualify. Otherwise the
    Unicode general category decides: any category containing "Z", or one of
    "Po", "Pd", "Pc".
    """
    if character.isspace():
        return True
    if character in ("|", "+", "<", ">"):
        return True

    category: str = unicodedata.category(character)
    return "Z" in category or category in ("Po", "Pd", "Pc")
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_case_variable(character: str) -> bool:
    """
    Tell whether a character is cased, i.e. exactly one of islower()/isupper() holds.
    """
    lower: bool = character.islower()
    upper: bool = character.isupper()
    # Both are plain bools, so an identity mismatch is the same as inequality.
    return lower is not upper
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_cjk(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "CJK".

    Characters without a Unicode name are reported as non-CJK.
    """
    # Supplying a default makes unicodedata.name return it instead of raising
    # ValueError for unnamed code points; the empty string never matches.
    return "CJK" in unicodedata.name(character, "")
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hiragana(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "HIRAGANA".

    Characters without a Unicode name are reported as non-Hiragana.
    """
    try:
        name: str = unicodedata.name(character)
    except ValueError:
        # Unnamed code point.
        return False
    else:
        return "HIRAGANA" in name
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_katakana(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "KATAKANA".

    Characters without a Unicode name are reported as non-Katakana.
    """
    # The empty default stands in for unnamed code points and never matches.
    name: str = unicodedata.name(character, "")
    return "KATAKANA" in name
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hangul(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "HANGUL".

    Characters without a Unicode name are reported as non-Hangul.
    """
    try:
        return "HANGUL" in unicodedata.name(character)
    except ValueError:
        # Unnamed code point.
        return False
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_thai(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "THAI".

    Characters without a Unicode name are reported as non-Thai.
    """
    # With a default, unnamed code points yield "" rather than a ValueError.
    return "THAI" in unicodedata.name(character, "")
@lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
def is_unicode_range_secondary(range_name: str) -> bool:
    """
    Tell whether a Unicode range name contains any keyword of UNICODE_SECONDARY_RANGE_KEYWORD.
    """
    for keyword in UNICODE_SECONDARY_RANGE_KEYWORD:
        if keyword in range_name:
            return True
    return False
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_unprintable(character: str) -> bool:
    """
    Tell whether a character is neither whitespace nor printable.

    Two characters are deliberately exempted: the ASCII substitute character and
    the Zero Width No-Break Space.
    """
    # Whitespace (which covers \n \t \r \v) and printable characters are fine.
    if character.isspace() or character.isprintable():
        return False

    # "\x1A" is the ASCII substitute character.
    # "\ufeff" is the Zero Width No-Break Space (Arabic Presentation Forms-B,
    # Unicode 1.1), which Python does not acknowledge as a space.
    return character not in ("\x1A", "\ufeff")
def any_specified_encoding(sequence: bytes, search_zone: int = 8192) -> Optional[str]:
    """
    Search the head of a byte sequence for a declared encoding name.

    The first search_zone bytes are decoded as ASCII (undecodable bytes are
    dropped) and scanned with RE_POSSIBLE_ENCODING_INDICATION. Each hit is
    lower-cased, has "-" replaced by "_", and is looked up among Python's known
    encoding aliases and canonical names.

    Returns the canonical encoding name of the first recognised hit, or None.
    Raises TypeError when sequence is not a bytes instance.
    """
    if not isinstance(sequence, bytes):
        raise TypeError

    head: str = sequence[: min(len(sequence), search_zone)].decode(
        "ascii", errors="ignore"
    )
    candidates: List[str] = findall(RE_POSSIBLE_ENCODING_INDICATION, head)

    for candidate in candidates:
        normalized: str = candidate.lower().replace("-", "_")

        # Walk the alias table in order: the first entry whose alias or
        # canonical name equals the candidate decides the result.
        for alias, canonical in aliases.items():
            if normalized in (alias, canonical):
                return canonical

    return None
@lru_cache(maxsize=128)
def is_multi_byte_encoding(name: str) -> bool:
    """
    Verify whether a specific encoding is a multi-byte one, based on its IANA name.

    The UTF family is listed explicitly. Any other encoding is multi-byte when the
    IncrementalDecoder of its "encodings.<name>" module derives from
    MultibyteIncrementalDecoder. Importing an unknown codec module raises ImportError.
    """
    unicode_transformation_formats = {
        "utf_8",
        "utf_8_sig",
        "utf_16",
        "utf_16_be",
        "utf_16_le",
        "utf_32",
        "utf_32_le",
        "utf_32_be",
        "utf_7",
    }
    if name in unicode_transformation_formats:
        return True

    codec_module = importlib.import_module("encodings.{}".format(name))
    return issubclass(codec_module.IncrementalDecoder, MultibyteIncrementalDecoder)
def identify_sig_or_bom(sequence: bytes) -> Tuple[Optional[str], bytes]:
    """
    Identify the SIG/BOM a byte sequence starts with, if any.

    Returns a pair (encoding name, matched mark) for the first entry of
    ENCODING_MARKS having a mark that prefixes the sequence, or (None, b"")
    when nothing matches.
    """
    for iana_encoding in ENCODING_MARKS:
        declared: Union[bytes, List[bytes]] = ENCODING_MARKS[iana_encoding]

        # An entry holds either a single mark or a list of alternative marks.
        candidates: List[bytes] = [declared] if isinstance(declared, bytes) else declared

        matched: Optional[bytes] = next(
            (mark for mark in candidates if sequence.startswith(mark)),
            None,
        )
        if matched is not None:
            return iana_encoding, matched

    return None, b""
def should_strip_sig_or_bom(iana_encoding: str) -> bool:
    """
    Tell whether the SIG/BOM should be stripped for the given encoding.

    Only "utf_16" and "utf_32" keep their mark.
    """
    # NOTE(review): presumably these two generic codecs need the BOM to pick a
    # byte order when decoding - confirm against callers.
    keeps_mark: bool = iana_encoding in ("utf_16", "utf_32")
    return not keeps_mark
def iana_name(cp_name: str, strict: bool = True) -> str:
    """
    Resolve a code page name to the canonical name used by Python's alias table.

    The name is lower-cased and "-" is replaced by "_" before the lookup. When
    nothing matches, ValueError is raised in strict mode; otherwise the
    normalised name is returned as-is.
    """
    normalized: str = cp_name.lower().replace("-", "_")

    for alias, canonical in aliases.items():
        if normalized == alias or normalized == canonical:
            return canonical

    if not strict:
        return normalized

    raise ValueError("Unable to retrieve IANA for '{}'".format(normalized))
def range_scan(decoded_sequence: str) -> List[str]:
    """
    Collect the distinct Unicode range names present in a decoded string.

    Characters belonging to no known range are skipped. The order of the
    returned list is that of the underlying set.
    """
    detected = (unicode_range(character) for character in decoded_sequence)
    distinct: Set[str] = {name for name in detected if name is not None}
    return list(distinct)
def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
    """
    Measure how alike two single-byte code pages are.

    Every possible byte value (0x00-0xFF) is decoded with both encodings and the
    share of bytes decoding to the same text is returned, between 0.0 and 1.0.
    Multi-byte encodings are not comparable this way and always yield 0.0.

    NOTE(review): this ratio used to be computed over 255 bytes and divided by
    254, so it could exceed 1.0 and never looked at byte 0xFF. Tables precomputed
    from the old values may differ marginally from what this now returns.
    """
    if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
        return 0.0

    decoder_a = importlib.import_module(
        "encodings.{}".format(iana_name_a)
    ).IncrementalDecoder
    decoder_b = importlib.import_module(
        "encodings.{}".format(iana_name_b)
    ).IncrementalDecoder

    # errors="ignore": an unmapped byte decodes to "" instead of raising, so two
    # code pages that both leave a byte undefined count as agreeing on it.
    id_a: IncrementalDecoder = decoder_a(errors="ignore")
    id_b: IncrementalDecoder = decoder_b(errors="ignore")

    byte_value_count: int = 256
    character_match_count: int = 0

    for i in range(byte_value_count):
        to_be_decoded: bytes = bytes([i])
        if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded):
            character_match_count += 1

    return character_match_count / byte_value_count
def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
    """
    Determine if two code pages are at least 80% similar.

    This is a pure lookup in IANA_SUPPORTED_SIMILAR, a dict that was generated
    using the function cp_similarity.
    """
    if iana_name_a not in IANA_SUPPORTED_SIMILAR:
        return False
    return iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
def set_logging_handler(
    name: str = "charset_normalizer",
    level: int = logging.INFO,
    format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
) -> None:
    """
    Attach a stream handler to a named logger and set that logger's level.

    Each call adds one more StreamHandler, formatted with format_string, to the
    logger called name.
    """
    target_logger = logging.getLogger(name)
    target_logger.setLevel(level)

    stream_handler = logging.StreamHandler()
    formatter = logging.Formatter(format_string)
    stream_handler.setFormatter(formatter)

    target_logger.addHandler(stream_handler)
def cut_sequence_chunks(
    sequences: bytes,
    encoding_iana: str,
    offsets: range,
    chunk_size: int,
    bom_or_sig_available: bool,
    strip_sig_or_bom: bool,
    sig_payload: bytes,
    is_multi_byte_decoder: bool,
    decoded_payload: Optional[str] = None,
) -> Generator[str, None, None]:
    """
    Yield decoded chunks of a payload, one per offset.

    When an already decoded payload is supplied and the encoding is single-byte,
    chunks are plain slices of that string and iteration stops at the first
    empty slice.

    Otherwise each chunk is cut from the raw bytes and decoded with
    encoding_iana. Offsets whose chunk would end more than 8 bytes past the end
    of the payload are skipped. When a SIG/BOM is available and must not be
    stripped, sig_payload is prepended to every cut before decoding.
    Single-byte encodings are decoded strictly, so UnicodeDecodeError may
    propagate; multi-byte encodings are decoded with errors ignored.

    For multi-byte encodings a cut may land in the middle of a character. If
    the start of a chunk cannot be found in decoded_payload, the cut is retried
    from up to three bytes earlier until it can.
    """
    if decoded_payload and is_multi_byte_decoder is False:
        for i in offsets:
            chunk = decoded_payload[i : i + chunk_size]
            if not chunk:
                break
            yield chunk
        return

    # Loop-invariant: whether the SIG/BOM has to be glued back onto every cut.
    prepend_sig = bom_or_sig_available and strip_sig_or_bom is False

    for i in offsets:
        chunk_end = i + chunk_size
        if chunk_end > len(sequences) + 8:
            continue

        cut_sequence = sequences[i:chunk_end]

        if prepend_sig:
            cut_sequence = sig_payload + cut_sequence

        chunk = cut_sequence.decode(
            encoding_iana,
            errors="ignore" if is_multi_byte_decoder else "strict",
        )

        # multi-byte bad cutting detector and adjustment
        # not the cleanest way to perform that fix but clever enough for now.
        if is_multi_byte_decoder and i > 0:
            chunk_partial_size_chk: int = min(chunk_size, 16)

            if (
                decoded_payload
                and chunk[:chunk_partial_size_chk] not in decoded_payload
            ):
                # Step the start back one byte at a time, at most three bytes.
                # The lower bound is clamped so j never goes negative: a
                # negative start would slice from the END of the payload,
                # usually giving an empty cut whose empty prefix trivially
                # "matches" and ends up yielded as an empty chunk.
                for j in range(i, max(i - 4, -1), -1):
                    cut_sequence = sequences[j:chunk_end]

                    if prepend_sig:
                        cut_sequence = sig_payload + cut_sequence

                    chunk = cut_sequence.decode(encoding_iana, errors="ignore")

                    if chunk[:chunk_partial_size_chk] in decoded_payload:
                        break

        yield chunk
|