Browse Source

feat(auth): Add /userinfo endpoint for OIDC (#52493)

Implements the `/userinfo` endpoint for OIDC compliance (initial spec is
[here](https://connect2id.com/products/server/docs/api/userinfo)).

Note that anyone can call this endpoint - the
authentication/authorization comes from the bearer token they pass in.
Eric Hasegawa 1 year ago
parent
commit
8839b6d675

+ 50 - 0
src/sentry/api/endpoints/oauth_userinfo.py

@@ -0,0 +1,50 @@
+from rest_framework import status
+from rest_framework.authentication import get_authorization_header
+from rest_framework.request import Request
+from rest_framework.response import Response
+
+from sentry.api.base import Endpoint, control_silo_endpoint
+from sentry.api.exceptions import ParameterValidationError, ResourceDoesNotExist, SentryAPIException
+from sentry.models import ApiToken, UserEmail
+
+
+class InsufficientScopesError(SentryAPIException):
+    status_code = status.HTTP_403_FORBIDDEN
+    code = "insufficient-scope"
+    message = "openid scope is required for userinfo access"
+
+
+@control_silo_endpoint
+class OAuthUserInfoEndpoint(Endpoint):
+    authentication_classes = ()
+    permission_classes = ()
+
+    def get(self, request: Request) -> Response:
+        try:
+            access_token = get_authorization_header(request).split()[1].decode("utf-8")
+        except IndexError:
+            raise ParameterValidationError("Bearer token not found in authorization header")
+        try:
+            token_details = ApiToken.objects.get(token=access_token)
+        except ApiToken.DoesNotExist:
+            raise ResourceDoesNotExist("Access token not found")
+
+        scopes = token_details.get_scopes()
+        if "openid" not in scopes:
+            raise InsufficientScopesError
+
+        user = token_details.user
+        user_output = {"sub": user.id}
+        if "profile" in scopes:
+            profile_details = {
+                "name": user.name,
+                "avatar_type": user.avatar_type,
+                "avatar_url": user.avatar_url,
+                "date_joined": user.date_joined,
+            }
+            user_output.update(profile_details)
+        if "email" in scopes:
+            email = UserEmail.objects.get(user=user)
+            email_details = {"email": email.email, "email_verified": email.is_verified}
+            user_output.update(email_details)
+        return Response(user_output)

+ 6 - 0
src/sentry/web/urls.py

@@ -8,6 +8,7 @@ from django.http import HttpResponse
 from django.urls import URLPattern, URLResolver, re_path
 from django.views.generic import RedirectView
 
+from sentry.api.endpoints.oauth_userinfo import OAuthUserInfoEndpoint
 from sentry.auth.providers.saml2.provider import SAML2AcceptACSView, SAML2MetadataView, SAML2SLSView
 from sentry.charts.endpoints import serve_chartcuterie_config
 from sentry.web import api
@@ -164,6 +165,11 @@ urlpatterns += [
                     r"^token/$",
                     OAuthTokenView.as_view(),
                 ),
+                re_path(
+                    r"userinfo/$",
+                    OAuthUserInfoEndpoint.as_view(),
+                    name="sentry-api-0-oauth-userinfo",
+                ),
             ]
         ),
     ),

+ 107 - 0
tests/sentry/api/endpoints/test_oauth_userinfo.py

@@ -0,0 +1,107 @@
+import datetime
+
+from django.urls import reverse
+from rest_framework.test import APIClient
+
+from sentry.models import ApiToken
+from sentry.testutils import APITestCase
+from sentry.testutils.silo import control_silo_test
+
+
+@control_silo_test(stable=True)
+class OAuthUserInfoTest(APITestCase):
+    def setUp(self):
+        super().setUp()
+        self.login_as(self.user)
+        self.path = reverse(
+            "sentry-api-0-oauth-userinfo",
+        )
+        self.client = APIClient()
+
+    def test_requires_access_token(self):
+        response = self.client.get(self.path)
+
+        assert response.status_code == 400
+        assert response.data["detail"]["code"] == "parameter-validation-error"
+        assert (
+            response.data["detail"]["message"] == "Bearer token not found in authorization header"
+        )
+
+    def test_declines_invalid_token(self):
+        self.client.credentials(HTTP_AUTHORIZATION="Bearer  abcd")
+        response = self.client.get(self.path)
+        assert response.status_code == 404
+        assert response.data["detail"] == "Access token not found"
+
+    def test_declines_if_no_openid_scope(self):
+        token_without_openid_scope = ApiToken.objects.create(user=self.user, scope_list=[])
+        self.client.credentials(HTTP_AUTHORIZATION="Bearer " + token_without_openid_scope.token)
+
+        response = self.client.get(self.path)
+
+        assert response.status_code == 403
+        assert response.data["detail"]["code"] == "insufficient-scope"
+        assert response.data["detail"]["message"] == "openid scope is required for userinfo access"
+
+    def test_gets_sub_with_openid_scope(self):
+        """
+        Ensures we get `sub`, and only `sub`, if the only scope is openid.
+        """
+        openid_only_token = ApiToken.objects.create(user=self.user, scope_list=["openid"])
+
+        self.client.credentials(HTTP_AUTHORIZATION="Bearer " + openid_only_token.token)
+
+        response = self.client.get(self.path)
+
+        assert response.status_code == 200
+        assert response.data == {"sub": self.user.id}
+
+    def test_gets_email_information(self):
+        email_token = ApiToken.objects.create(user=self.user, scope_list=["openid", "email"])
+        self.client.credentials(HTTP_AUTHORIZATION="Bearer " + email_token.token)
+
+        response = self.client.get(self.path)
+
+        assert response.status_code == 200
+        assert response.data == {
+            "sub": self.user.id,
+            "email": self.user.email,
+            "email_verified": True,
+        }
+
+    def test_gets_profile_information(self):
+        profile_token = ApiToken.objects.create(user=self.user, scope_list=["openid", "profile"])
+        self.client.credentials(HTTP_AUTHORIZATION="Bearer " + profile_token.token)
+
+        response = self.client.get(self.path)
+
+        assert response.status_code == 200
+
+        assert response.data["avatar_type"] == 0
+        assert response.data["avatar_url"] is None
+        assert isinstance(response.data["date_joined"], datetime.datetime)
+        assert response.data["name"] == ""
+        assert response.data["sub"] == self.user.id
+
+    def test_gets_multiple_scopes(self):
+        all_access_token = ApiToken.objects.create(
+            user=self.user, scope_list=["openid", "profile", "email", "address", "phone"]
+        )
+        self.client.credentials(HTTP_AUTHORIZATION="Bearer " + all_access_token.token)
+
+        response = self.client.get(self.path)
+
+        assert response.status_code == 200
+
+        # profile information
+        assert response.data["avatar_type"] == 0
+        assert response.data["avatar_url"] is None
+        assert isinstance(response.data["date_joined"], datetime.datetime)
+        assert response.data["name"] == ""
+
+        # email information
+        assert response.data["email"] == self.user.email
+        assert response.data["email_verified"]
+
+        # openid information
+        assert response.data["sub"] == self.user.id