utils.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. from __future__ import annotations
  2. import logging
  3. from importlib import import_module
  4. from typing import TYPE_CHECKING, Any
  5. from urllib.parse import parse_qs as urlparse_parse_qs
  6. from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
  7. from urllib.request import urlopen
  8. from django.conf import settings
  9. from django.contrib.contenttypes.models import ContentType
  10. from django.db.models import Model
  11. from sentry.hybridcloud.rpc import RpcModel
  12. from sentry.services.hybrid_cloud.user import RpcUser
  13. LEAVE_CHARS = getattr(settings, "SOCIAL_AUTH_LOG_SANITIZE_LEAVE_CHARS", 4)
  14. if TYPE_CHECKING:
  15. from sentry.services.hybrid_cloud.usersocialauth.model import RpcUserSocialAuth
  16. from social_auth.backends import SocialAuthBackend
  17. from .models import UserSocialAuth
  18. def get_backend(instance: UserSocialAuth | RpcUserSocialAuth) -> type[SocialAuthBackend] | None:
  19. # Make import here to avoid recursive imports :-/
  20. from social_auth.backends import get_backends
  21. return get_backends().get(instance.provider)
  22. def tokens(instance: UserSocialAuth | RpcUserSocialAuth) -> dict[str, Any]:
  23. """Return access_token stored in extra_data or None"""
  24. backend = instance.get_backend()
  25. if backend:
  26. return backend.AUTH_BACKEND.tokens(instance)
  27. else:
  28. return {}
  29. def sanitize_log_data(secret, data=None, leave_characters=LEAVE_CHARS):
  30. """
  31. Clean private/secret data from log statements and other data.
  32. Assumes data and secret are strings. Replaces all but the first
  33. `leave_characters` of `secret`, as found in `data`, with '*'.
  34. If no data is given, all but the first `leave_characters` of secret
  35. are simply replaced and returned.
  36. """
  37. replace_secret = secret[:leave_characters] + (len(secret) - leave_characters) * "*"
  38. if data:
  39. return data.replace(secret, replace_secret)
  40. return replace_secret
  41. def setting(name, default=None):
  42. """Return setting value for given name or default value."""
  43. return getattr(settings, name, default)
  44. def backend_setting(backend, name, default=None):
  45. """
  46. Looks for setting value following these rules:
  47. 1. Search for <backend_name> prefixed setting
  48. 2. Search for setting given by name
  49. 3. Return default
  50. """
  51. backend_name = get_backend_name(backend)
  52. setting_name = "{}_{}".format(backend_name.upper().replace("-", "_"), name)
  53. if hasattr(settings, setting_name):
  54. return setting(setting_name)
  55. elif hasattr(settings, name):
  56. return setting(name)
  57. else:
  58. return default
  59. logger = logging.getLogger("SocialAuth")
  60. logger.setLevel(logging.DEBUG)
  61. def log(level, *args, **kwargs):
  62. """Small wrapper around logger functions."""
  63. {
  64. "debug": logger.debug,
  65. "error": logger.error,
  66. "exception": logger.exception,
  67. "warn": logger.warn,
  68. }[level](*args, **kwargs)
  69. def model_to_ctype(val):
  70. """Converts values that are instance of Model to a dictionary
  71. with enough information to retrieve the instance back later."""
  72. if isinstance(val, Model):
  73. return {"pk": val.pk, "ctype": ContentType.objects.get_for_model(val).pk}
  74. if isinstance(val, RpcModel):
  75. return val.dict()
  76. return val
  77. def ctype_to_model(val):
  78. """Converts back the instance saved by model_to_ctype function."""
  79. if isinstance(val, dict) and "pk" in val and "ctype" in val:
  80. ctype = ContentType.objects.get_for_id(val["ctype"])
  81. ModelClass = ctype.model_class()
  82. assert ModelClass is not None
  83. return ModelClass.objects.get(pk=val["pk"])
  84. if isinstance(val, dict) and "username" in val and "name" in val:
  85. return RpcUser.parse_obj(val)
  86. return val
  87. def clean_partial_pipeline(request):
  88. """Cleans any data for partial pipeline."""
  89. name = setting("SOCIAL_AUTH_PARTIAL_PIPELINE_KEY", "partial_pipeline")
  90. # Check for key to avoid flagging the session as modified unnecessary
  91. if name in request.session:
  92. request.session.pop(name, None)
  93. def url_add_parameters(url, params):
  94. """Adds parameters to URL, parameter will be repeated if already present"""
  95. if params:
  96. fragments = list(urlparse(url))
  97. fragments[4] = urlencode(parse_qsl(fragments[4]) + list(params.items()))
  98. url = urlunparse(fragments)
  99. return url
  100. def dsa_urlopen(*args, **kwargs):
  101. """Like urllib2.urlopen but sets a timeout defined by
  102. SOCIAL_AUTH_URLOPEN_TIMEOUT setting if defined (and not already in
  103. kwargs)."""
  104. timeout = setting("SOCIAL_AUTH_URLOPEN_TIMEOUT")
  105. if timeout and "timeout" not in kwargs:
  106. kwargs["timeout"] = timeout
  107. return urlopen(*args, **kwargs)
  108. def get_backend_name(backend):
  109. return getattr(getattr(backend, "AUTH_BACKEND", backend), "name", None)
  110. def module_member(name):
  111. mod, member = name.rsplit(".", 1)
  112. module = import_module(mod)
  113. return getattr(module, member)
  114. def parse_qs(value):
  115. """Like urlparse.parse_qs but transform list values to single items"""
  116. return drop_lists(urlparse_parse_qs(value))
  117. def drop_lists(value):
  118. return {key: val[0] for key, val in value.items()}
  119. if __name__ == "__main__":
  120. import doctest
  121. doctest.testmod()