123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626 |
- import logging
- from os import PathLike
- from typing import BinaryIO, List, Optional, Set, Union
- from .cd import (
- coherence_ratio,
- encoding_languages,
- mb_encoding_languages,
- merge_coherence_ratios,
- )
- from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
- from .md import mess_ratio
- from .models import CharsetMatch, CharsetMatches
- from .utils import (
- any_specified_encoding,
- cut_sequence_chunks,
- iana_name,
- identify_sig_or_bom,
- is_cp_similar,
- is_multi_byte_encoding,
- should_strip_sig_or_bom,
- )
# Registering a display name for the custom TRACE level is left disabled on
# purpose, as doing so will most likely be controversial:
# logging.addLevelName(TRACE, "TRACE")

# Package-wide logger used by the detection routines.
logger = logging.getLogger("charset_normalizer")

# Stream handler that is attached to `logger` only for the duration of a call
# made with explain=True. Output format: "<time> | <level name> | <message>".
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter(fmt="%(asctime)s | %(levelname)s | %(message)s")
)
def from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possible charsets usable to render str objects.
    If there are no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512 bytes each to assess the mess and coherence of a given
    sequence, and will give up on a particular code page after 20% of measured mess. Those criteria are
    customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritizes a particular
    code page but never takes it for granted. Can improve the performance.

    You may want to focus your attention on some code pages and/or not others, use cp_isolation and cp_exclusion
    for that purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.

    By default the library does not set up any handler other than the NullHandler. If you set the 'explain'
    toggle to True, the logger configuration is altered for the duration of the call: a StreamHandler suitable
    for debugging is attached and the level is lowered to TRACE. The handler is always detached and the previous
    level is always restored when the call ends, including when detection raises.
    Custom logging format and handler can be set manually.

    :param sequences: Raw payload to analyse.
    :param steps: Number of chunks sampled from the payload. Forced to 1 when the payload is not larger than
        ``chunk_size * steps``.
    :param chunk_size: Size, in bytes, of each sampled chunk. Forced to the payload length in the same case.
    :param threshold: Mess ratio at or above which a chunk counts against a code page, and at or above which
        the mean ratio excludes the code page.
    :param cp_isolation: When given and non-empty, only these encodings are tested.
    :param cp_exclusion: When given and non-empty, these encodings are never tested.
    :param preemptive_behaviour: Look for an encoding declared inside the payload and test it first.
    :param explain: Temporarily emit TRACE-level details through ``explain_handler``.
    :param language_threshold: Threshold forwarded to the coherence (language) detection.
    :param enable_fallback: Allow ascii, utf_8 or the declared encoding to be returned as a last resort when
        they decoded the payload but failed the mess probing, and nothing else matched.
    :return: The plausible matches. A single utf_8 match is returned for an empty payload.
    :raises TypeError: If ``sequences`` is neither ``bytes`` nor ``bytearray``.
    """
    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(sequences)
            )
        )

    if not explain:
        return _detect_from_bytes(
            sequences,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )

    previous_logger_level: int = logger.level
    logger.addHandler(explain_handler)
    logger.setLevel(TRACE)

    try:
        return _detect_from_bytes(
            sequences,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )
    finally:
        # Single restoration point: runs on every return path and on any
        # exception, so the debugging handler and TRACE level never leak.
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)


def _detect_from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int,
    chunk_size: int,
    threshold: float,
    cp_isolation: Optional[List[str]],
    cp_exclusion: Optional[List[str]],
    preemptive_behaviour: bool,
    explain: bool,
    language_threshold: float,
    enable_fallback: bool,
) -> CharsetMatches:
    """
    Run the detection workflow of from_bytes on an already type-checked payload.
    This helper never touches the logger configuration; from_bytes owns it.
    """
    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Payload not larger than the requested sampling window: analyse it as one chunk.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = length < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = length >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings tested before the generic IANA_SUPPORTED list, in insertion order.
    prioritized_encodings: List[str] = []

    specified_encoding: Optional[str] = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: Set[str] = set()
    tested_but_hard_failure: List[str] = []
    tested_but_soft_failure: List[str] = []

    fallback_ascii: Optional[CharsetMatch] = None
    fallback_u8: Optional[CharsetMatch] = None
    fallback_specified: Optional[CharsetMatch] = None

    results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        # Prioritized encodings also appear in IANA_SUPPORTED; test each only once.
        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: Optional[str] = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                # Large single-byte payload: only validate a leading slice here
                # and keep decoded_payload unset (lazy decoding).
                str(
                    sequences[: int(50e4)]
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) : int(50e4)],
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    sequences
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) :],
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Skip code pages that are similar to one already rejected by the mess probing.
        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        # Start offsets of the sampled chunks (starting after the SIG/BOM when one matched).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Number of chunks at/above the threshold tolerated before giving up (at least 2).
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: List[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences, encoding_iana, threshold, False, [], decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: List[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload,
            )
        )

        # Short-circuit: a declared/ascii/utf_8 candidate with very little mess is accepted at once.
        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            logger.debug(
                "Encoding detection: %s is most likely the one.", encoding_iana
            )
            return CharsetMatches([results[encoding_iana]])

        # Short-circuit: the encoding announced by the SIG/BOM passed the probing.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif fallback_u8 is not None:
            # The previous three-way condition (including a fingerprint
            # comparison with the ascii fallback) always had the truth value of
            # this single test, so ascii is only used when no utf_8 fallback exists.
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    return results
def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same as from_bytes, but the payload is taken from an already opened file object.
    The content is obtained with a single fp.read() call and every other argument is
    forwarded unchanged. The file object is not closed.
    """
    payload = fp.read()

    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )
def from_path(
    path: Union[str, bytes, PathLike],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same as from_bytes, with one extra step: the file at the given path is opened in
    binary mode and handed to from_fp. The file is closed before returning.
    Can raise IOError.
    """
    with open(path, "rb") as binary_file:
        matches = from_fp(
            binary_file,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

    return matches
def is_binary(
    fp_or_path_or_payload: Union[PathLike, str, BinaryIO, bytes],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Tell whether the given input (path, raw payload or file object) is binary, i.e. not text.
    It relies on the same detection routines and default arguments as the other entry
    points, except that fallback matches are disabled by default, to be stricter with
    content that is ASCII-compatible but unlikely to be a string.

    Dispatch rules: str / PathLike goes to from_path, bytes / bytearray goes to from_bytes,
    anything else is treated as a file object and goes to from_fp.
    Returns True when the detection yields no match.
    """
    # Select the entry point matching the kind of input received.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        detect = from_path
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        detect = from_bytes
    else:
        detect = from_fp

    guesses = detect(
        fp_or_path_or_payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

    return not guesses
|