strings.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. import locale
  2. import logging
  3. import six
  4. import sys
  5. import codecs
  6. import library.python.func
  7. logger = logging.getLogger(__name__)
  8. DEFAULT_ENCODING = 'utf-8'
  9. ENCODING_ERRORS_POLICY = 'replace'
  10. def left_strip(el, prefix):
  11. """
  12. Strips prefix at the left of el
  13. """
  14. if el.startswith(prefix):
  15. return el[len(prefix) :]
  16. return el
  17. # Explicit to-text conversion
  18. # Chooses between str/unicode, i.e. six.binary_type/six.text_type
  19. def to_basestring(value):
  20. if isinstance(value, (six.binary_type, six.text_type)):
  21. return value
  22. try:
  23. if six.PY2:
  24. return unicode(value) # noqa
  25. else:
  26. return str(value)
  27. except UnicodeDecodeError:
  28. try:
  29. return str(value)
  30. except UnicodeEncodeError:
  31. return repr(value)
  32. to_text = to_basestring
  33. def to_unicode(value, from_enc=DEFAULT_ENCODING):
  34. if isinstance(value, six.text_type):
  35. return value
  36. if isinstance(value, six.binary_type):
  37. if six.PY2:
  38. return unicode(value, from_enc, ENCODING_ERRORS_POLICY) # noqa
  39. else:
  40. return value.decode(from_enc, errors=ENCODING_ERRORS_POLICY)
  41. return six.text_type(value)
  42. # Optional from_enc enables transcoding
  43. def to_str(value, to_enc=DEFAULT_ENCODING, from_enc=None):
  44. if isinstance(value, six.binary_type):
  45. if from_enc is None or to_enc == from_enc:
  46. # Unknown input encoding or input and output encoding are the same
  47. return value
  48. value = to_unicode(value, from_enc=from_enc)
  49. if isinstance(value, six.text_type):
  50. return value.encode(to_enc, ENCODING_ERRORS_POLICY)
  51. return six.binary_type(value)
  52. def _convert_deep(x, enc, convert, relaxed=True):
  53. if x is None:
  54. return None
  55. if isinstance(x, (six.text_type, six.binary_type)):
  56. return convert(x, enc)
  57. if isinstance(x, dict):
  58. return {convert(k, enc): _convert_deep(v, enc, convert, relaxed) for k, v in six.iteritems(x)}
  59. if isinstance(x, list):
  60. return [_convert_deep(e, enc, convert, relaxed) for e in x]
  61. if isinstance(x, tuple):
  62. return tuple([_convert_deep(e, enc, convert, relaxed) for e in x])
  63. if relaxed:
  64. return x
  65. raise TypeError('unsupported type')
  66. # Result as from six.ensure_text
  67. def unicodize_deep(x, enc=DEFAULT_ENCODING, relaxed=True):
  68. return _convert_deep(x, enc, to_unicode, relaxed)
  69. # Result as from six.ensure_str
  70. def ensure_str_deep(x, enc=DEFAULT_ENCODING, relaxed=True):
  71. return _convert_deep(x, enc, six.ensure_str, relaxed)
  72. # Result as from six.ensure_binary
  73. def stringize_deep(x, enc=DEFAULT_ENCODING, relaxed=True):
  74. return _convert_deep(x, enc, to_str, relaxed)
  75. @library.python.func.memoize()
  76. def locale_encoding():
  77. try:
  78. if six.PY3:
  79. loc = locale.getencoding()
  80. else:
  81. loc = locale.getdefaultlocale()[1]
  82. if loc:
  83. codecs.lookup(loc)
  84. return loc
  85. except LookupError as e:
  86. logger.debug('Cannot get system locale: %s', e)
  87. return None
  88. except ValueError as e:
  89. logger.debug('Cannot get system locale: %s', e)
  90. return None
  91. def fs_encoding():
  92. return sys.getfilesystemencoding()
  93. def guess_default_encoding():
  94. enc = locale_encoding()
  95. return enc if enc else DEFAULT_ENCODING
  96. @library.python.func.memoize()
  97. def get_stream_encoding(stream):
  98. if stream.encoding:
  99. try:
  100. codecs.lookup(stream.encoding)
  101. return stream.encoding
  102. except LookupError:
  103. pass
  104. return DEFAULT_ENCODING
  105. def encode(value, encoding=DEFAULT_ENCODING):
  106. if isinstance(value, six.binary_type):
  107. value = value.decode(encoding, errors='ignore')
  108. return value.encode(encoding)
  109. class Whence(object):
  110. Start = 0
  111. End = 1
  112. Middle = 2
  113. def truncate(data, limit, whence=None, msg=None):
  114. msg = "..." if msg is None else msg
  115. msg = six.ensure_binary(msg)
  116. whence = Whence.End if whence is None else whence
  117. data = six.ensure_binary(data)
  118. if len(data) <= limit:
  119. return six.ensure_str(data)
  120. text_limit = limit - len(msg)
  121. assert text_limit >= 0
  122. if whence == Whence.Start:
  123. data = msg + data[-text_limit:]
  124. elif whence == Whence.End:
  125. data = data[:text_limit] + msg
  126. elif whence == Whence.Middle:
  127. headpos = limit // 2 - len(msg) // 2
  128. tailpos = len(data) - (text_limit - headpos)
  129. data = data[:headpos] + msg + data[tailpos:]
  130. else:
  131. raise AssertionError("Unknown whence: %s" % str(whence))
  132. return fix_utf8(data)
  133. def fix_utf8(data):
  134. # type: (six.string_types) -> str
  135. # remove destroyed symbol code
  136. udata = six.ensure_text(data, 'utf-8', 'ignore')
  137. return six.ensure_str(udata, 'utf-8', errors='ignore')
  138. _hexdig = "0123456789ABCDEFabcdef"
  139. _hextobyte = {
  140. (a + b).encode(): bytes.fromhex(a + b) if six.PY3 else (a + b).decode("hex") for a in _hexdig for b in _hexdig
  141. }
  142. def parse_qs_binary(qs, keep_blank_values=False, strict_parsing=False, max_num_fields=None, separator=b'&'):
  143. """Parse a query like original `parse_qs` from `urlparse`, `urllib.parse`, but query given as a bytes argument.
  144. Arguments:
  145. qs: percent-encoded query string to be parsed
  146. keep_blank_values: flag indicating whether blank values in
  147. percent-encoded queries should be treated as blank byte strings.
  148. A true value indicates that blanks should be retained as
  149. blank byte strings. The default false value indicates that
  150. blank values are to be ignored and treated as if they were
  151. not included.
  152. strict_parsing: flag indicating what to do with parsing errors.
  153. If false (the default), errors are silently ignored.
  154. If true, errors raise a ValueError exception.
  155. max_num_fields: int. If set, then throws a ValueError if there
  156. are more than n fields read by parse_qsl_binary().
  157. separator: bytes. The symbol to use for separating the query arguments.
  158. Defaults to &.
  159. Returns a dictionary.
  160. """
  161. parsed_result = {}
  162. pairs = parse_qsl_binary(qs, keep_blank_values, strict_parsing, max_num_fields=max_num_fields, separator=separator)
  163. for name, value in pairs:
  164. if name in parsed_result:
  165. parsed_result[name].append(value)
  166. else:
  167. parsed_result[name] = [value]
  168. return parsed_result
  169. def parse_qsl_binary(qs, keep_blank_values=False, strict_parsing=False, max_num_fields=None, separator=b'&'):
  170. """Parse a query like original `parse_qs` from `urlparse`, `urllib.parse`, but query given as a bytes argument.
  171. Arguments:
  172. qs: percent-encoded query bytes to be parsed
  173. keep_blank_values: flag indicating whether blank values in
  174. percent-encoded queries should be treated as blank byte strings.
  175. A true value indicates that blanks should be retained as blank
  176. byte strings. The default false value indicates that blank values
  177. are to be ignored and treated as if they were not included.
  178. strict_parsing: flag indicating what to do with parsing errors. If
  179. false (the default), errors are silently ignored. If true,
  180. errors raise a ValueError exception.
  181. max_num_fields: int. If set, then throws a ValueError
  182. if there are more than n fields read by parse_qsl_binary().
  183. separator: bytes. The symbol to use for separating the query arguments.
  184. Defaults to &.
  185. Returns a list.
  186. """
  187. if max_num_fields is not None:
  188. num_fields = 1 + qs.count(separator) if qs else 0
  189. if max_num_fields < num_fields:
  190. raise ValueError('Max number of fields exceeded')
  191. r = []
  192. query_args = qs.split(separator) if qs else []
  193. for name_value in query_args:
  194. if not name_value and not strict_parsing:
  195. continue
  196. nv = name_value.split(b'=', 1)
  197. if len(nv) != 2:
  198. if strict_parsing:
  199. raise ValueError("bad query field: %r" % (name_value,))
  200. # Handle case of a control-name with no equal sign
  201. if keep_blank_values:
  202. nv.append(b'')
  203. else:
  204. continue
  205. if len(nv[1]) or keep_blank_values:
  206. name = nv[0].replace(b'+', b' ')
  207. name = unquote_binary(name)
  208. value = nv[1].replace(b'+', b' ')
  209. value = unquote_binary(value)
  210. r.append((name, value))
  211. return r
  212. def unquote_binary(string):
  213. """Replace %xx escapes by their single-character equivalent.
  214. By default, percent-encoded sequences are replaced by ASCII character or
  215. byte code, and invalid sequences are replaced by a placeholder character.
  216. unquote('abc%20def') -> 'abc def'
  217. unquote('abc%FFdef') -> 'abc\xffdef'
  218. unquote('%no') -> '%no'
  219. """
  220. bits = string.split(b"%")
  221. if len(bits) == 1:
  222. return bits[0]
  223. res = [bits[0]]
  224. for item in bits[1:]:
  225. res.append(_hextobyte.get(item[:2], b"%"))
  226. res.append(item if res[-1] == b"%" else item[2:])
  227. return b"".join(res)