safe.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. import logging
  2. from collections.abc import Mapping, MutableMapping, Sequence
  3. from typing import Any, Union
  4. from django.conf import settings
  5. from django.utils.encoding import force_str
  6. from django.utils.http import urlencode
  7. from sentry.utils import json
  8. from sentry.utils.strings import truncatechars
  9. PathSearchable = Union[Mapping[str, Any], Sequence[Any], None]
  10. def safe_execute(func, *args, **kwargs):
  11. try:
  12. result = func(*args, **kwargs)
  13. except Exception as e:
  14. if hasattr(func, "im_class"):
  15. cls = func.im_class
  16. else:
  17. cls = func.__class__
  18. func_name = getattr(func, "__name__", str(func))
  19. cls_name = cls.__name__
  20. logger = logging.getLogger(f"sentry.safe.{cls_name.lower()}")
  21. logger.exception("%s.process_error", func_name, extra={"exception": e})
  22. else:
  23. return result
  24. def trim(
  25. value,
  26. max_size=settings.SENTRY_MAX_VARIABLE_SIZE,
  27. max_depth=6,
  28. object_hook=None,
  29. _depth=0,
  30. _size=0,
  31. **kwargs,
  32. ):
  33. """
  34. Truncates a value to ```MAX_VARIABLE_SIZE```.
  35. The method of truncation depends on the type of value.
  36. """
  37. options = {
  38. "max_depth": max_depth,
  39. "max_size": max_size,
  40. "object_hook": object_hook,
  41. "_depth": _depth + 1,
  42. }
  43. if _depth > max_depth:
  44. if not isinstance(value, str):
  45. value = json.dumps(value)
  46. return trim(value, _size=_size, max_size=max_size)
  47. elif isinstance(value, dict):
  48. result: Any = {}
  49. _size += 2
  50. for k in sorted(value.keys(), key=lambda x: (len(force_str(value[x])), x)):
  51. v = value[k]
  52. trim_v = trim(v, _size=_size, **options)
  53. result[k] = trim_v
  54. _size += len(force_str(trim_v)) + 1
  55. if _size >= max_size:
  56. break
  57. elif isinstance(value, (list, tuple)):
  58. result = []
  59. _size += 2
  60. for v in value:
  61. trim_v = trim(v, _size=_size, **options)
  62. result.append(trim_v)
  63. _size += len(force_str(trim_v))
  64. if _size >= max_size:
  65. break
  66. if isinstance(value, tuple):
  67. result = tuple(result)
  68. elif isinstance(value, str):
  69. result = truncatechars(value, max_size - _size)
  70. else:
  71. result = value
  72. if object_hook is None:
  73. return result
  74. return object_hook(result)
  75. def get_path(data: PathSearchable, *path, should_log=False, **kwargs):
  76. """
  77. Safely resolves data from a recursive data structure. A value is only
  78. returned if the full path exists, otherwise ``None`` is returned.
  79. If the ``default`` argument is specified, it is returned instead of ``None``.
  80. If the ``filter`` argument is specified and the value is a list, it is
  81. filtered with the given callback. Alternatively, pass ``True`` as filter to
  82. only filter ``None`` values.
  83. """
  84. logger = logging.getLogger(__name__)
  85. default = kwargs.pop("default", None)
  86. f: bool | None = kwargs.pop("filter", None)
  87. for k in kwargs:
  88. raise TypeError("get_path() got an undefined keyword argument '%s'" % k)
  89. logger_data = {}
  90. if should_log:
  91. logger_data = {
  92. "path_searchable": json.dumps(data),
  93. "path_arg": json.dumps(path),
  94. }
  95. for p in path:
  96. if isinstance(data, Mapping) and p in data:
  97. data = data[p]
  98. elif isinstance(data, (list, tuple)) and isinstance(p, int) and -len(data) <= p < len(data):
  99. data = data[p]
  100. else:
  101. if should_log:
  102. logger_data["invalid_path"] = json.dumps(p)
  103. logger.info("sentry.safe.get_path.invalid_path_section", extra=logger_data)
  104. return default
  105. if should_log:
  106. if data is None:
  107. logger.info("sentry.safe.get_path.iterated_path_is_none", extra=logger_data)
  108. else:
  109. logger_data["iterated_path"] = json.dumps(data)
  110. if f and data and isinstance(data, (list, tuple)):
  111. data = list(filter((lambda x: x is not None) if f is True else f, data))
  112. if should_log and len(data) == 0 and "iterated_path" in logger_data:
  113. logger.info("sentry.safe.get_path.filtered_path_is_none", extra=logger_data)
  114. return data if data is not None else default
  115. def set_path(data, *path, **kwargs):
  116. """
  117. Recursively traverses or creates the specified path and sets the given value
  118. argument. `None` is treated like a missing value. If a non-mapping item is
  119. encountered while traversing, the value is not set.
  120. This function is equivalent to a recursive dict.__setitem__. Returns True if
  121. the value was set, otherwise False.
  122. If the ``overwrite` kwarg is set to False, the value is only set if there is
  123. no existing value or it is None. See ``setdefault_path``.
  124. """
  125. try:
  126. value = kwargs.pop("value")
  127. except KeyError:
  128. raise TypeError("set_path() requires a 'value' keyword argument")
  129. overwrite = kwargs.pop("overwrite", True)
  130. for k in kwargs:
  131. raise TypeError("set_path() got an undefined keyword argument '%s'" % k)
  132. for p in path[:-1]:
  133. if not isinstance(data, MutableMapping):
  134. return False
  135. if data.get(p) is None:
  136. data[p] = {}
  137. data = data[p]
  138. if not isinstance(data, MutableMapping):
  139. return False
  140. p = path[-1]
  141. if overwrite or data.get(p) is None:
  142. data[p] = value
  143. return True
  144. return False
  145. def setdefault_path(data, *path, **kwargs):
  146. """
  147. Recursively traverses or creates the specified path and sets the given value
  148. argument if it does not exist. `None` is treated like a missing value. If a
  149. non-mapping item is encountered while traversing, the value is not set.
  150. This function is equivalent to a recursive dict.setdefault, except for None
  151. values. Returns True if the value was set, otherwise False.
  152. """
  153. kwargs["overwrite"] = False
  154. return set_path(data, *path, **kwargs)
  155. def safe_urlencode(query, **kwargs):
  156. """
  157. django.utils.http.urlencode wrapper that replaces query parameter values
  158. of None with empty string so that urlencode doesn't raise TypeError
  159. "Cannot encode None in a query string".
  160. """
  161. # sequence of 2-element tuples
  162. if isinstance(query, (list, tuple)):
  163. query_seq = ((pair[0], "" if pair[1] is None else pair[1]) for pair in query)
  164. return urlencode(query_seq, **kwargs)
  165. elif isinstance(query, dict):
  166. query_d = {k: "" if v is None else v for k, v in query.items()}
  167. return urlencode(query_d, **kwargs)
  168. else:
  169. return urlencode(query, **kwargs)