Browse Source

ref(superuser): Extract superuser exception from permission layer (#63541)

### Background
We previously handled the logic of raising the `SuperuserRequired`
exception from the `SuperuserPermission` class itself. This breaks the
abstraction between permissions which are supposed to simply return a
boolean, and the handling of the failed checks which is done in the
`permission_denied` function in DRF's `APIView` class.

### Changes
This PR removes that logic from `SuperuserPermission` and overrides
`permission_denied` to handle raising the specific `SuperuserRequired`
exception. The flow goes:

1. APIView's check_permissions calls `SuperuserPermission`
2. `SuperuserPermission` fails
3. `check_permissions` calls `permission_denied` on failure
4. `permission_denied` raises `SuperuserRequired` appropriately

Note that we only want to raise the exception when it's the only
permission class. If there are other permission checks, we're using an
endpoint that is not solely limited to superusers.

### In action
To ensure the exception is still being raised properly, I went go onto
_admin, delete the superuser cookie, and make sure that the next call
raises `SuperuserRequired` which prompts a modal to open for
re-authentication


https://github.com/getsentry/sentry/assets/67301797/ca2c16b5-5f73-4c9c-9f73-8925b26bedbc

---------

Co-authored-by: Cathy Teng <70817427+cathteng@users.noreply.github.com>
Seiji Chew 1 year ago
parent
commit
a20f0cb56b

+ 19 - 1
src/sentry/api/base.py

@@ -24,6 +24,7 @@ from sentry_sdk import Scope
 from sentry import analytics, options, tsdb
 from sentry.api.api_owners import ApiOwner
 from sentry.api.api_publish_status import ApiPublishStatus
+from sentry.api.exceptions import SuperuserRequired
 from sentry.apidocs.hooks import HTTP_METHOD_NAME
 from sentry.auth import access
 from sentry.models.environment import Environment
@@ -48,7 +49,7 @@ from .authentication import (
     UserAuthTokenAuthentication,
 )
 from .paginator import BadPaginationError, Paginator
-from .permissions import NoPermission
+from .permissions import NoPermission, SuperuserPermission
 
 __all__ = [
     "Endpoint",
@@ -205,6 +206,23 @@ class Endpoint(APIView):
     def convert_args(self, request: Request, *args, **kwargs):
         return (args, kwargs)
 
+    def permission_denied(self, request, message=None, code=None):
+        """
+        Raise a specific superuser exception if the user can become superuser
+        and the only permission class is SuperuserPermission. Otherwise, raises
+        the appropriate exception according to parent DRF function.
+        """
+        permissions = self.get_permissions()
+
+        can_be_superuser = request.user.is_authenticated and request.user.is_superuser
+        has_only_superuser_permission = len(permissions) == 1 and isinstance(
+            permissions[0], SuperuserPermission
+        )
+
+        if can_be_superuser and has_only_superuser_permission:
+            raise SuperuserRequired()
+        super().permission_denied(request, message, code)
+
     def handle_exception(  # type: ignore[override]
         self,
         request: Request,

+ 1 - 5
src/sentry/api/endpoints/relocations/index.py

@@ -17,7 +17,6 @@ from sentry.api.api_owners import ApiOwner
 from sentry.api.api_publish_status import ApiPublishStatus
 from sentry.api.base import Endpoint, region_silo_endpoint
 from sentry.api.endpoints.relocations import ERR_FEATURE_DISABLED
-from sentry.api.exceptions import SuperuserRequired
 from sentry.api.paginator import OffsetPaginator
 from sentry.api.permissions import SuperuserPermission
 from sentry.api.serializers import serialize
@@ -89,10 +88,7 @@ def validate_new_relocation_request(
     request: Request, owner_username: str, org_slugs: list[str], file_size: int
 ) -> Response | None:
     # We only honor the `relocation.enabled` flag for non-superusers.
-    try:
-        is_superuser = SuperuserPermission().has_permission(request, None)
-    except SuperuserRequired:
-        is_superuser = False
+    is_superuser = SuperuserPermission().has_permission(request, None)
     if not options.get("relocation.enabled") and not is_superuser:
         return Response({"detail": ERR_FEATURE_DISABLED}, status=status.HTTP_403_FORBIDDEN)
 

+ 5 - 10
src/sentry/api/permissions.py

@@ -43,6 +43,11 @@ class NoPermission(permissions.BasePermission):
         return False
 
 
+class SuperuserPermission(permissions.BasePermission):
+    def has_permission(self, request: Request, view: object) -> bool:
+        return is_active_superuser(request)
+
+
 class ScopedPermission(permissions.BasePermission):
     """
     Permissions work depending on the type of authentication:
@@ -76,15 +81,6 @@ class ScopedPermission(permissions.BasePermission):
         return False
 
 
-class SuperuserPermission(permissions.BasePermission):
-    def has_permission(self, request: Request, view: object) -> bool:
-        if is_active_superuser(request):
-            return True
-        if request.user.is_authenticated and request.user.is_superuser:
-            raise SuperuserRequired
-        return False
-
-
 class SentryPermission(ScopedPermission):
     def is_not_2fa_compliant(
         self, request: Request, organization: RpcOrganization | Organization
@@ -163,7 +159,6 @@ class SentryPermission(ScopedPermission):
         elif request.user.is_authenticated:
             # session auth needs to confirm various permissions
             if self.needs_sso(request, org_context.organization):
-
                 logger.info(
                     "access.must-sso",
                     extra=extra,

+ 37 - 3
tests/sentry/api/test_base.py

@@ -4,12 +4,15 @@ from unittest.mock import MagicMock
 from django.http import HttpRequest, QueryDict, StreamingHttpResponse
 from django.test import override_settings
 from pytest import raises
+from rest_framework.permissions import AllowAny
 from rest_framework.response import Response
 from sentry_sdk import Scope
 from sentry_sdk.utils import exc_info_from_error
 
 from sentry.api.base import Endpoint, EndpointSiloLimit
+from sentry.api.exceptions import SuperuserRequired
 from sentry.api.paginator import GenericOffsetPaginator
+from sentry.api.permissions import SuperuserPermission
 from sentry.models.apikey import ApiKey
 from sentry.services.hybrid_cloud.util import FunctionSiloLimit
 from sentry.silo import SiloMode
@@ -33,6 +36,17 @@ class DummyEndpoint(Endpoint):
         return Response({"ok": True})
 
 
+class DummySuperuserPermissionEndpoint(DummyEndpoint):
+    permission_classes = (SuperuserPermission,)
+
+
+class DummySuperuserOrAnyPermissionEndpoint(DummyEndpoint):
+    permission_classes = (
+        SuperuserPermission,
+        AllowAny,
+    )
+
+
 class DummyErroringEndpoint(Endpoint):
     permission_classes = ()
     # `as_view` requires that any init args passed to it match attributes already on the
@@ -82,9 +96,6 @@ class DummyPaginationEndpoint(Endpoint):
         )
 
 
-_dummy_endpoint = DummyEndpoint.as_view()
-
-
 class DummyPaginationStreamingEndpoint(Endpoint):
     permission_classes = ()
 
@@ -104,6 +115,7 @@ class DummyPaginationStreamingEndpoint(Endpoint):
         )
 
 
+_dummy_endpoint = DummyEndpoint.as_view()
 _dummy_streaming_endpoint = DummyPaginationStreamingEndpoint.as_view()
 
 
@@ -575,3 +587,25 @@ class FunctionSiloLimitTest(APITestCase):
     def test_with_monolith_mode(self):
         self._test_active_on(SiloMode.REGION, SiloMode.MONOLITH, True)
         self._test_active_on(SiloMode.CONTROL, SiloMode.MONOLITH, True)
+
+
+class SuperuserPermissionTest(APITestCase):
+    def setUp(self):
+        super().setUp()
+        self.request = self.make_request(user=self.user, method="GET")
+        self.superuser_permission_view = DummySuperuserPermissionEndpoint().as_view()
+        self.superuser_or_any_permission_view = DummySuperuserOrAnyPermissionEndpoint().as_view()
+
+    def test_superuser_exception_raised(self):
+        response = self.superuser_permission_view(self.request)
+        response_detail = response.data["detail"]
+
+        assert response.status_code == SuperuserRequired.status_code
+        assert response_detail["code"] == SuperuserRequired.code
+        assert response_detail["message"] == SuperuserRequired.message
+
+    @mock.patch("sentry.api.permissions.is_active_superuser", return_value=True)
+    def test_superuser_or_any_no_exception_raised(self, mock_is_active_superuser):
+        response = self.superuser_or_any_permission_view(self.request)
+
+        assert response.status_code == 200, response.content