123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- from __future__ import annotations
- import logging
- from importlib import import_module
- from typing import TYPE_CHECKING, Any
- from urllib.parse import parse_qs as urlparse_parse_qs
- from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
- from urllib.request import urlopen
- from django.conf import settings
- from django.contrib.contenttypes.models import ContentType
- from django.db.models import Model
- from sentry.hybridcloud.rpc import RpcModel
- from sentry.services.hybrid_cloud.user import RpcUser
- LEAVE_CHARS = getattr(settings, "SOCIAL_AUTH_LOG_SANITIZE_LEAVE_CHARS", 4)
- if TYPE_CHECKING:
- from sentry.services.hybrid_cloud.usersocialauth.model import RpcUserSocialAuth
- from social_auth.backends import SocialAuthBackend
- from .models import UserSocialAuth
- def get_backend(instance: UserSocialAuth | RpcUserSocialAuth) -> type[SocialAuthBackend] | None:
- # Make import here to avoid recursive imports :-/
- from social_auth.backends import get_backends
- return get_backends().get(instance.provider)
- def tokens(instance: UserSocialAuth | RpcUserSocialAuth) -> dict[str, Any]:
- """Return access_token stored in extra_data or None"""
- backend = instance.get_backend()
- if backend:
- return backend.AUTH_BACKEND.tokens(instance)
- else:
- return {}
- def sanitize_log_data(secret, data=None, leave_characters=LEAVE_CHARS):
- """
- Clean private/secret data from log statements and other data.
- Assumes data and secret are strings. Replaces all but the first
- `leave_characters` of `secret`, as found in `data`, with '*'.
- If no data is given, all but the first `leave_characters` of secret
- are simply replaced and returned.
- """
- replace_secret = secret[:leave_characters] + (len(secret) - leave_characters) * "*"
- if data:
- return data.replace(secret, replace_secret)
- return replace_secret
- def setting(name, default=None):
- """Return setting value for given name or default value."""
- return getattr(settings, name, default)
- def backend_setting(backend, name, default=None):
- """
- Looks for setting value following these rules:
- 1. Search for <backend_name> prefixed setting
- 2. Search for setting given by name
- 3. Return default
- """
- backend_name = get_backend_name(backend)
- setting_name = "{}_{}".format(backend_name.upper().replace("-", "_"), name)
- if hasattr(settings, setting_name):
- return setting(setting_name)
- elif hasattr(settings, name):
- return setting(name)
- else:
- return default
- logger = logging.getLogger("SocialAuth")
- logger.setLevel(logging.DEBUG)
- def log(level, *args, **kwargs):
- """Small wrapper around logger functions."""
- {
- "debug": logger.debug,
- "error": logger.error,
- "exception": logger.exception,
- "warn": logger.warn,
- }[level](*args, **kwargs)
- def model_to_ctype(val):
- """Converts values that are instance of Model to a dictionary
- with enough information to retrieve the instance back later."""
- if isinstance(val, Model):
- return {"pk": val.pk, "ctype": ContentType.objects.get_for_model(val).pk}
- if isinstance(val, RpcModel):
- return val.dict()
- return val
- def ctype_to_model(val):
- """Converts back the instance saved by model_to_ctype function."""
- if isinstance(val, dict) and "pk" in val and "ctype" in val:
- ctype = ContentType.objects.get_for_id(val["ctype"])
- ModelClass = ctype.model_class()
- assert ModelClass is not None
- return ModelClass.objects.get(pk=val["pk"])
- if isinstance(val, dict) and "username" in val and "name" in val:
- return RpcUser.parse_obj(val)
- return val
- def clean_partial_pipeline(request):
- """Cleans any data for partial pipeline."""
- name = setting("SOCIAL_AUTH_PARTIAL_PIPELINE_KEY", "partial_pipeline")
- # Check for key to avoid flagging the session as modified unnecessary
- if name in request.session:
- request.session.pop(name, None)
- def url_add_parameters(url, params):
- """Adds parameters to URL, parameter will be repeated if already present"""
- if params:
- fragments = list(urlparse(url))
- fragments[4] = urlencode(parse_qsl(fragments[4]) + list(params.items()))
- url = urlunparse(fragments)
- return url
- def dsa_urlopen(*args, **kwargs):
- """Like urllib2.urlopen but sets a timeout defined by
- SOCIAL_AUTH_URLOPEN_TIMEOUT setting if defined (and not already in
- kwargs)."""
- timeout = setting("SOCIAL_AUTH_URLOPEN_TIMEOUT")
- if timeout and "timeout" not in kwargs:
- kwargs["timeout"] = timeout
- return urlopen(*args, **kwargs)
- def get_backend_name(backend):
- return getattr(getattr(backend, "AUTH_BACKEND", backend), "name", None)
- def module_member(name):
- mod, member = name.rsplit(".", 1)
- module = import_module(mod)
- return getattr(module, member)
- def parse_qs(value):
- """Like urlparse.parse_qs but transform list values to single items"""
- return drop_lists(urlparse_parse_qs(value))
- def drop_lists(value):
- return {key: val[0] for key, val in value.items()}
- if __name__ == "__main__":
- import doctest
- doctest.testmod()
|