Browse Source

feat(apis): add ApiTokenDetailsEndpoint (#74105)

created ApiTokenDetailsEndpoint with GET and PUT endpoints to fetch and
modify user auth tokens

https://github.com/getsentry/sentry/issues/73556
Mia Hsu 8 months ago
parent
commit
f24951b7b1

+ 92 - 0
src/sentry/api/endpoints/api_token_details.py

@@ -0,0 +1,92 @@
+from django.utils.decorators import method_decorator
+from django.views.decorators.cache import never_cache
+from rest_framework import serializers
+from rest_framework.fields import CharField
+from rest_framework.permissions import IsAuthenticated
+from rest_framework.request import Request
+from rest_framework.response import Response
+
+from sentry import analytics
+from sentry.api.api_owners import ApiOwner
+from sentry.api.api_publish_status import ApiPublishStatus
+from sentry.api.base import Endpoint, control_silo_endpoint
+from sentry.api.endpoints.api_tokens import ApiTokensEndpoint
+from sentry.api.exceptions import ResourceDoesNotExist
+from sentry.api.serializers import serialize
+from sentry.models.apitoken import ApiToken
+
+ALLOWED_FIELDS = ["name", "tokenId"]
+
+
+class ApiTokenNameSerializer(serializers.Serializer):
+    name = CharField(max_length=255, allow_blank=True, required=True)
+
+
+@control_silo_endpoint
+class ApiTokenDetailsEndpoint(Endpoint):
+    publish_status = {
+        "GET": ApiPublishStatus.PRIVATE,
+        "PUT": ApiPublishStatus.PRIVATE,
+        "DELETE": ApiPublishStatus.PRIVATE,
+    }
+    owner = ApiOwner.SECURITY
+    permission_classes = (IsAuthenticated,)
+
+    @method_decorator(never_cache)
+    def get(self, request: Request, token_id: int) -> Response:
+
+        user_id = ApiTokensEndpoint.get_appropriate_user_id(request=request)
+
+        try:
+            instance = ApiToken.objects.get(id=token_id, application__isnull=True, user_id=user_id)
+        except ApiToken.DoesNotExist:
+            raise ResourceDoesNotExist
+
+        return Response(serialize(instance, request.user, include_token=False))
+
+    @method_decorator(never_cache)
+    def put(self, request: Request, token_id: int) -> Response:
+        keys = list(request.data.keys())
+        if any(key not in ALLOWED_FIELDS for key in keys):
+            return Response(
+                {"error": "Only auth token name can be edited after creation"}, status=403
+            )
+
+        serializer = ApiTokenNameSerializer(data=request.data)
+
+        if not serializer.is_valid():
+            return Response(serializer.errors, status=400)
+
+        result = serializer.validated_data
+
+        user_id = ApiTokensEndpoint.get_appropriate_user_id(request=request)
+
+        try:
+            token_to_rename = ApiToken.objects.get(
+                id=token_id, application__isnull=True, user_id=user_id
+            )
+        except ApiToken.DoesNotExist:
+            raise ResourceDoesNotExist
+
+        token_to_rename.name = result.get("name")
+        token_to_rename.save()
+
+        return Response(serialize(token_to_rename, request.user, include_token=False), status=200)
+
+    @method_decorator(never_cache)
+    def delete(self, request: Request, token_id: int) -> Response:
+        user_id = ApiTokensEndpoint.get_appropriate_user_id(request=request)
+
+        try:
+            token_to_delete = ApiToken.objects.get(
+                id=token_id, application__isnull=True, user_id=user_id
+            )
+        except ApiToken.DoesNotExist:
+            raise ResourceDoesNotExist
+
+        token_to_delete.delete()
+        analytics.record(
+            "api_token.deleted",
+            user_id=user_id,
+        )
+        return Response(status=204)

+ 5 - 43
src/sentry/api/endpoints/api_tokens.py

@@ -24,11 +24,8 @@ from sentry.types.token import AuthTokenType
 ALLOWED_FIELDS = ["name", "tokenId"]
 
 
-class ApiTokenNameSerializer(serializers.Serializer):
+class ApiTokenSerializer(serializers.Serializer):
     name = CharField(max_length=255, allow_blank=True, required=False)
-
-
-class ApiTokenSerializer(ApiTokenNameSerializer):
     scopes = MultipleChoiceField(required=True, choices=settings.SENTRY_SCOPES)
 
 
@@ -39,13 +36,12 @@ class ApiTokensEndpoint(Endpoint):
         "DELETE": ApiPublishStatus.PRIVATE,
         "GET": ApiPublishStatus.PRIVATE,
         "POST": ApiPublishStatus.PRIVATE,
-        "PUT": ApiPublishStatus.PRIVATE,
     }
     authentication_classes = (SessionNoAuthTokenAuthentication,)
     permission_classes = (IsAuthenticated,)
 
-    @classmethod
-    def _get_appropriate_user_id(cls, request: Request) -> int:
+    @staticmethod
+    def get_appropriate_user_id(request: Request) -> int:
         """
         Gets the user id to use for the request, based on what the current state of the request is.
         If the request is made by a superuser, then they are allowed to act on behalf of other user's data.
@@ -66,7 +62,7 @@ class ApiTokensEndpoint(Endpoint):
 
     @method_decorator(never_cache)
     def get(self, request: Request) -> Response:
-        user_id = self._get_appropriate_user_id(request=request)
+        user_id = self.get_appropriate_user_id(request=request)
 
         token_list = list(
             ApiToken.objects.filter(application__isnull=True, user_id=user_id).select_related(
@@ -104,43 +100,9 @@ class ApiTokensEndpoint(Endpoint):
             return Response(serialize(token, request.user), status=201)
         return Response(serializer.errors, status=400)
 
-    @method_decorator(never_cache)
-    def put(self, request: Request) -> Response:
-        keys = list(request.data.keys())
-
-        if any(key not in ALLOWED_FIELDS for key in keys):
-            return Response(
-                {"error": "Only auth token name can be edited after creation"}, status=403
-            )
-
-        serializer = ApiTokenNameSerializer(data=request.data)
-
-        if serializer.is_valid():
-            result = serializer.validated_data
-
-            user_id = self._get_appropriate_user_id(request=request)
-            token_id = request.data.get("tokenId", None)
-
-            if token_id is None:
-                return Response({"tokenId": token_id}, status=400)
-
-            with outbox_context(transaction.atomic(router.db_for_write(ApiToken)), flush=False):
-                token_to_rename: ApiToken | None = ApiToken.objects.filter(
-                    id=token_id, application__isnull=True, user_id=user_id
-                ).first()
-
-                if token_to_rename is None:
-                    return Response({"tokenId": token_id, "userId": user_id}, status=400)
-
-                token_to_rename.name = result.get("name", None)
-                token_to_rename.save()
-
-            return Response(status=204)
-        return Response(serializer.errors, status=400)
-
     @method_decorator(never_cache)
     def delete(self, request: Request):
-        user_id = self._get_appropriate_user_id(request=request)
+        user_id = self.get_appropriate_user_id(request=request)
         token_id = request.data.get("tokenId", None)
         # Account for token_id being 0, which can be considered valid
         if token_id is None:

+ 6 - 0
src/sentry/api/urls.py

@@ -210,6 +210,7 @@ from .endpoints.api_application_details import ApiApplicationDetailsEndpoint
 from .endpoints.api_application_rotate_secret import ApiApplicationRotateSecretEndpoint
 from .endpoints.api_applications import ApiApplicationsEndpoint
 from .endpoints.api_authorizations import ApiAuthorizationsEndpoint
+from .endpoints.api_token_details import ApiTokenDetailsEndpoint
 from .endpoints.api_tokens import ApiTokensEndpoint
 from .endpoints.artifact_bundles import ArtifactBundlesEndpoint
 from .endpoints.artifact_lookup import ProjectArtifactLookupEndpoint
@@ -3146,6 +3147,11 @@ urlpatterns = [
         ApiTokensEndpoint.as_view(),
         name="sentry-api-0-api-tokens",
     ),
+    re_path(
+        r"^api-tokens/(?P<token_id>[^\/]+)/$",
+        ApiTokenDetailsEndpoint.as_view(),
+        name="sentry-api-0-api-token-details",
+    ),
     re_path(
         r"^prompts-activity/$",
         PromptsActivityEndpoint.as_view(),

+ 1 - 0
static/app/data/controlsiloUrlPatterns.ts

@@ -111,6 +111,7 @@ const patterns: RegExp[] = [
   new RegExp('^api/0/api-applications/[^/]+/rotate-secret/$'),
   new RegExp('^api/0/api-authorizations/$'),
   new RegExp('^api/0/api-tokens/$'),
+  new RegExp('^api/0/api-tokens/[^/]+/$'),
   new RegExp('^api/0/authenticators/$'),
   new RegExp('^api/0/accept-invite/[^/]+/[^/]+/[^/]+/$'),
   new RegExp('^api/0/accept-invite/[^/]+/[^/]+/$'),

+ 152 - 0
tests/sentry/api/endpoints/test_api_token_details.py

@@ -0,0 +1,152 @@
+from rest_framework import status
+
+from sentry.models.apitoken import ApiToken
+from sentry.testutils.cases import APITestCase
+from sentry.testutils.silo import control_silo_test
+
+
+@control_silo_test
+class ApiTokenGetTest(APITestCase):
+    endpoint = "sentry-api-0-api-token-details"
+
+    def test_simple(self):
+        token = ApiToken.objects.create(user=self.user, name="token 1", scope_list=["event:read"])
+
+        self.login_as(self.user)
+        response = self.get_success_response(token.id, status_code=status.HTTP_200_OK)
+        assert response.content
+
+        res = response.data
+
+        assert res.get("id") == str(token.id)
+        assert res.get("name") == "token 1"
+        assert res.get("scopes") == ["event:read"]
+
+    def test_never_cache(self):
+        token = ApiToken.objects.create(user=self.user, name="token 1")
+
+        self.login_as(self.user)
+        response = self.get_success_response(token.id, status_code=status.HTTP_200_OK)
+        assert (
+            response.get("cache-control")
+            == "max-age=0, no-cache, no-store, must-revalidate, private"
+        )
+
+    def test_not_exists(self):
+        self.login_as(self.user)
+        self.get_error_response(-1, status_code=status.HTTP_404_NOT_FOUND)
+
+    def test_no_auth(self):
+        token = ApiToken.objects.create(user=self.user, name="token 1")
+        self.get_error_response(token.id, status_code=status.HTTP_401_UNAUTHORIZED)
+
+
+@control_silo_test
+class ApiTokenPutTest(APITestCase):
+    endpoint = "sentry-api-0-api-token-details"
+    method = "PUT"
+
+    def test_simple(self):
+        token = ApiToken.objects.create(user=self.user, name="token 1")
+        payload = {"name": "new token"}
+
+        self.login_as(self.user)
+        self.get_success_response(token.id, status_code=status.HTTP_200_OK, **payload)
+
+        tokenNew = ApiToken.objects.get(user=self.user)
+        assert tokenNew.name == "new token"
+        assert tokenNew.get_scopes() == token.get_scopes()
+
+    def test_never_cache(self):
+        token = ApiToken.objects.create(user=self.user, name="token 1")
+        payload = {"name": "new token"}
+
+        self.login_as(self.user)
+        response = self.get_success_response(token.id, status_code=status.HTTP_200_OK, **payload)
+        assert (
+            response.get("cache-control")
+            == "max-age=0, no-cache, no-store, must-revalidate, private"
+        )
+
+    def test_remove_name(self):
+        token = ApiToken.objects.create(user=self.user, name="name")
+        payload = {"name": ""}
+
+        self.login_as(self.user)
+        self.get_success_response(token.id, status_code=status.HTTP_200_OK, **payload)
+
+        token = ApiToken.objects.get(user=self.user)
+        assert token.name == ""
+
+    def test_add_name(self):
+        token = ApiToken.objects.create(user=self.user)
+        payload = {"name": "new token"}
+
+        self.login_as(self.user)
+        self.get_success_response(token.id, status_code=status.HTTP_200_OK, **payload)
+
+        token = ApiToken.objects.get(user=self.user)
+        assert token.name == "new token"
+
+    def test_name_too_long(self):
+        token = ApiToken.objects.create(user=self.user)
+        payload = {
+            "name": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in"
+        }
+
+        self.login_as(self.user)
+        self.get_error_response(token.id, status_code=status.HTTP_400_BAD_REQUEST, **payload)
+
+    def test_editing_scopes(self):
+        token = ApiToken.objects.create(user=self.user)
+        payload = {"name": "new token", "scopes": ["event:read"]}
+
+        self.login_as(self.user)
+        response = self.get_error_response(
+            token.id, status_code=status.HTTP_403_FORBIDDEN, **payload
+        )
+        assert response.content
+        assert response.data == {"error": "Only auth token name can be edited after creation"}
+
+    def test_invalid_token_id(self):
+        payload = {"name": "new token"}
+
+        self.login_as(self.user)
+        self.get_error_response(-1, status_code=status.HTTP_404_NOT_FOUND, **payload)
+
+    def test_no_auth(self):
+        token = ApiToken.objects.create(user=self.user, name="token 1")
+        payload = {"name": "new token"}
+
+        self.get_error_response(token.id, status_code=status.HTTP_401_UNAUTHORIZED, **payload)
+
+
+@control_silo_test
+class ApiTokenDeleteTest(APITestCase):
+    endpoint = "sentry-api-0-api-token-details"
+    method = "DELETE"
+
+    def test_simple(self):
+        token = ApiToken.objects.create(user=self.user, name="token 1")
+        self.login_as(self.user)
+        self.get_success_response(token.id, status_code=status.HTTP_204_NO_CONTENT)
+        assert not ApiToken.objects.filter(id=token.id).exists()
+
+    def test_never_cache(self):
+        token = ApiToken.objects.create(user=self.user, name="token 1")
+
+        self.login_as(self.user)
+        response = self.get_success_response(token.id, status_code=status.HTTP_204_NO_CONTENT)
+        assert (
+            response.get("cache-control")
+            == "max-age=0, no-cache, no-store, must-revalidate, private"
+        )
+
+    def test_invalid_token_id(self):
+        self.login_as(self.user)
+        self.get_error_response(-1, status_code=status.HTTP_404_NOT_FOUND)
+
+    def test_no_auth(self):
+        token = ApiToken.objects.create(user=self.user, name="token 1")
+
+        self.get_error_response(token.id, status_code=status.HTTP_401_UNAUTHORIZED)

+ 0 - 98
tests/sentry/api/endpoints/test_api_tokens.py

@@ -123,104 +123,6 @@ class ApiTokensCreateTest(APITestCase):
         assert response.data[0]["name"] is None
 
 
-@control_silo_test
-class ApiTokensPutTest(APITestCase):
-    def test_simple(self):
-        token = ApiToken.objects.create(user=self.user, name="name")
-        self.login_as(self.user)
-        url = reverse("sentry-api-0-api-tokens")
-        assert token.name == "name"
-        response = self.client.put(
-            url,
-            data={"name": "rename1", "tokenId": token.id},
-        )
-        assert response.status_code == 204
-        token = ApiToken.objects.get(user=self.user)
-        assert token.name == "rename1"
-
-    def test_never_cache(self):
-        token = ApiToken.objects.create(user=self.user, name="name")
-        self.login_as(self.user)
-        url = reverse("sentry-api-0-api-tokens")
-        response = self.client.put(
-            url,
-            data={"name": "rename1", "tokenId": token.id},
-        )
-        assert response.status_code == 204
-        assert (
-            response.get("cache-control")
-            == "max-age=0, no-cache, no-store, must-revalidate, private"
-        )
-
-    def test_delete_name(self):
-        token = ApiToken.objects.create(user=self.user, name="name")
-        self.login_as(self.user)
-        url = reverse("sentry-api-0-api-tokens")
-        response = self.client.put(
-            url,
-            data={"tokenId": token.id},
-        )
-        assert response.status_code == 204
-        token = ApiToken.objects.get(user=self.user)
-        assert token.name is None
-
-    def test_add_name(self):
-        token = ApiToken.objects.create(user=self.user)
-        self.login_as(self.user)
-        url = reverse("sentry-api-0-api-tokens")
-        assert token.name is None
-        response = self.client.put(
-            url,
-            data={"name": "rename1", "tokenId": token.id},
-        )
-        assert response.status_code == 204
-        token = ApiToken.objects.get(user=self.user)
-        assert token.name == "rename1"
-
-    def test_invalid_name(self):
-        token = ApiToken.objects.create(user=self.user)
-        self.login_as(self.user)
-        url = reverse("sentry-api-0-api-tokens")
-        response = self.client.put(
-            url,
-            data={
-                "name": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in",
-                "tokenId": token.id,
-            },
-        )
-        assert response.status_code == 400
-
-    def test_editing_scopes(self):
-        token = ApiToken.objects.create(user=self.user)
-        self.login_as(self.user)
-        url = reverse("sentry-api-0-api-tokens")
-        response = self.client.put(
-            url,
-            data={
-                "name": "rename1",
-                "scopes": ["event:read"],
-                "tokenId": token.id,
-            },
-        )
-        assert response.status_code == 403
-
-    def test_invalid_token_id(self):
-        token = ApiToken.objects.create(user=self.user)
-        self.login_as(self.user)
-        url = reverse("sentry-api-0-api-tokens")
-        response = self.client.put(url, data={"tokenId": -1})
-        assert response.status_code == 400
-        assert ApiToken.objects.filter(id=token.id).exists()
-
-    def test_no_token_param(self):
-        token = ApiToken.objects.create(user=self.user)
-        self.login_as(self.user)
-        url = reverse("sentry-api-0-api-tokens")
-        response = self.client.put(url, data={})
-        assert response.status_code == 400
-        assert ApiToken.objects.filter(id=token.id).exists()
-
-
 @control_silo_test
 class ApiTokensDeleteTest(APITestCase):
     def test_simple(self):