Просмотр исходного кода

ref(superuser): switch UserPermission to use superuser_has_permission (#64444)

Cathy Teng 1 год назад
Родитель
Сommit
6f3dd03c82

+ 9 - 2
src/sentry/api/bases/user.py

@@ -7,12 +7,13 @@ from typing_extensions import override
 from sentry.api.base import Endpoint
 from sentry.api.exceptions import ResourceDoesNotExist
 from sentry.api.permissions import SentryPermission, StaffPermissionMixin
-from sentry.auth.superuser import is_active_superuser
+from sentry.auth.superuser import is_active_superuser, superuser_has_permission
 from sentry.auth.system import is_system_auth
 from sentry.models.organization import OrganizationStatus
 from sentry.models.organizationmapping import OrganizationMapping
 from sentry.models.organizationmembermapping import OrganizationMemberMapping
 from sentry.models.user import User
+from sentry.services.hybrid_cloud.access.service import access_service
 from sentry.services.hybrid_cloud.organization import organization_service
 from sentry.services.hybrid_cloud.user import RpcUser
 from sentry.services.hybrid_cloud.user.service import user_service
@@ -26,8 +27,14 @@ class UserPermission(SentryPermission):
             return True
         if request.auth:
             return False
+
         if is_active_superuser(request):
-            return True
+            # collect admin level permissions (only used when a user is active superuser)
+            permissions = access_service.get_permissions_for_user(request.user.id)
+
+            if superuser_has_permission(request, permissions):
+                return True
+
         return False
 
 

+ 13 - 2
src/sentry/auth/superuser.py

@@ -87,12 +87,17 @@ def get_superuser_scopes(auth_state: RpcAuthState, user: Any):
     return superuser_scopes
 
 
-def superuser_has_permission(request: HttpRequest | Request) -> bool:
+def superuser_has_permission(
+    request: HttpRequest | Request, permissions: frozenset[str] | None = None
+) -> bool:
     """
     This is used in place of is_active_superuser() in APIs / permission classes.
     Checks if superuser has permission for the request.
     Superuser read-only is restricted to GET and OPTIONS requests.
     These checks do not affect self-hosted.
+
+    The `permissions` arg is passed in and used when request.access is not populated,
+    e.g. in UserPermission
     """
     if not is_active_superuser(request):
         return False
@@ -104,8 +109,14 @@ def superuser_has_permission(request: HttpRequest | Request) -> bool:
     if not features.has("auth:enterprise-superuser-read-write", actor=request.user):
         return True
 
+    # either request.access or permissions must exist
+    assert getattr(request, "access", None) or permissions is not None
+
     # superuser write can access all requests
-    if request.access.has_permission("superuser.write"):
+    if getattr(request, "access", None) and request.access.has_permission("superuser.write"):
+        return True
+
+    elif permissions is not None and "superuser.write" in permissions:
         return True
 
     # superuser read-only can only hit GET and OPTIONS (pre-flight) requests

+ 35 - 0
tests/sentry/api/bases/test_user.py

@@ -3,6 +3,7 @@ from __future__ import annotations
 from unittest.mock import patch
 
 import pytest
+from django.test import override_settings
 
 from sentry.api.bases.user import (
     RegionSiloUserEndpoint,
@@ -13,6 +14,7 @@ from sentry.api.bases.user import (
 from sentry.api.exceptions import ResourceDoesNotExist
 from sentry.auth.staff import is_active_staff
 from sentry.testutils.cases import DRFPermissionTestCase
+from sentry.testutils.helpers.features import with_feature
 from sentry.testutils.silo import all_silo_test, control_silo_test, region_silo_test
 
 
@@ -32,13 +34,46 @@ class UserPermissionTest(DRFPermissionTestCase):
             self.make_request(self.normal_user), None, self.normal_user
         )
 
+    @override_settings(SUPERUSER_ORG_ID=1000)
     def test_allows_active_superuser(self):
         # The user passed in and the user on the request must be different to
         # check superuser.
+        self.create_organization(owner=self.superuser_user, id=1000)
         assert self.user_permission.has_object_permission(
             self.superuser_request, None, self.normal_user
         )
 
+        with self.settings(SENTRY_SELF_HOSTED=False):
+            assert self.user_permission.has_object_permission(
+                self.superuser_request, None, self.normal_user
+            )
+
+    @override_settings(SENTRY_SELF_HOSTED=False, SUPERUSER_ORG_ID=1000)
+    @with_feature("auth:enterprise-superuser-read-write")
+    def test_active_superuser_read(self):
+        # superuser read can hit GET
+        request = self.make_request(user=self.superuser_user, is_superuser=True, method="GET")
+        self.create_organization(owner=self.superuser_user, id=1000)
+        assert self.user_permission.has_object_permission(request, None, self.normal_user)
+
+        # superuser read cannot hit POST
+        request.method = "POST"
+        assert not self.user_permission.has_object_permission(request, None, self.normal_user)
+
+    @override_settings(SENTRY_SELF_HOSTED=False, SUPERUSER_ORG_ID=1000)
+    @with_feature("auth:enterprise-superuser-read-write")
+    def test_active_superuser_write(self):
+        # superuser write can hit GET
+        self.add_user_permission(self.superuser_user, "superuser.write")
+        self.create_organization(owner=self.superuser_user, id=1000)
+
+        request = self.make_request(user=self.superuser_user, is_superuser=True, method="GET")
+        assert self.user_permission.has_object_permission(request, None, self.normal_user)
+
+        # superuser write can hit POST
+        request.method = "POST"
+        assert self.user_permission.has_object_permission(request, None, self.normal_user)
+
     def test_rejects_active_staff(self):
         # The user passed in and the user on the request must be different to
         # check staff.

+ 94 - 70
tests/sentry/auth/test_superuser.py

@@ -5,6 +5,7 @@ from unittest.mock import Mock, patch
 import pytest
 from django.contrib.auth.models import AnonymousUser
 from django.core import signing
+from django.test import override_settings
 from django.utils import timezone as django_timezone
 
 from sentry.auth.superuser import (
@@ -174,47 +175,44 @@ class SuperuserTestCase(TestCase):
         superuser = Superuser(request, allowed_ips=())
         assert superuser.is_active is False
 
+    @override_settings(SENTRY_SELF_HOSTED=False, VALIDATE_SUPERUSER_ACCESS_CATEGORY_AND_REASON=True)
     @mock.patch("sentry.auth.superuser.logger")
     def test_su_access_logs(self, logger):
-        with self.settings(
-            SENTRY_SELF_HOSTED=False, VALIDATE_SUPERUSER_ACCESS_CATEGORY_AND_REASON=True
-        ):
-            user = User(is_superuser=True, email="test@sentry.io")
-            request = self.make_request(user=user, method="PUT")
-            request._body = json.dumps(
-                {
-                    "superuserAccessCategory": "for_unit_test",
-                    "superuserReason": "Edit organization settings",
-                    "isSuperuserModal": True,
-                }
-            ).encode()
-
-            superuser = Superuser(request, org_id=None)
-            superuser.set_logged_in(request.user)
-            assert superuser.is_active is True
-            assert logger.info.call_count == 2
-            logger.info.assert_any_call(
-                "superuser.superuser_access",
-                extra={
-                    "superuser_token_id": superuser.token,
-                    "user_id": user.id,
-                    "user_email": user.email,
-                    "su_access_category": "for_unit_test",
-                    "reason_for_su": "Edit organization settings",
-                },
-            )
+        user = User(is_superuser=True, email="test@sentry.io")
+        request = self.make_request(user=user, method="PUT")
+        request._body = json.dumps(
+            {
+                "superuserAccessCategory": "for_unit_test",
+                "superuserReason": "Edit organization settings",
+                "isSuperuserModal": True,
+            }
+        ).encode()
+
+        superuser = Superuser(request, org_id=None)
+        superuser.set_logged_in(request.user)
+        assert superuser.is_active is True
+        assert logger.info.call_count == 2
+        logger.info.assert_any_call(
+            "superuser.superuser_access",
+            extra={
+                "superuser_token_id": superuser.token,
+                "user_id": user.id,
+                "user_email": user.email,
+                "su_access_category": "for_unit_test",
+                "reason_for_su": "Edit organization settings",
+            },
+        )
 
+    @override_settings(SENTRY_SELF_HOSTED=False, VALIDATE_SUPERUSER_ACCESS_CATEGORY_AND_REASON=True)
     def test_su_access_no_request(self):
         user = User(is_superuser=True)
         request = self.make_request(user=user, method="PUT")
 
         superuser = Superuser(request, org_id=None)
-        with self.settings(
-            SENTRY_SELF_HOSTED=False, VALIDATE_SUPERUSER_ACCESS_CATEGORY_AND_REASON=True
-        ):
-            with pytest.raises(EmptySuperuserAccessForm):
-                superuser.set_logged_in(request.user)
-                assert superuser.is_active is False
+
+        with pytest.raises(EmptySuperuserAccessForm):
+            superuser.set_logged_in(request.user)
+            assert superuser.is_active is False
 
     @freeze_time(BASETIME + OUTSIDE_PRIVILEGE_ACCESS_EXPIRE_TIME)
     def test_not_expired_check_org_in_request(self):
@@ -250,6 +248,7 @@ class SuperuserTestCase(TestCase):
             extra={"superuser_token": "abcdefghjiklmnog"},
         )
 
+    @override_settings(SENTRY_SELF_HOSTED=False, VALIDATE_SUPERUSER_ACCESS_CATEGORY_AND_REASON=True)
     @mock.patch("sentry.auth.superuser.logger")
     def test_su_access_no_request_user_missing_info(self, logger):
         user = User(is_superuser=True)
@@ -263,12 +262,11 @@ class SuperuserTestCase(TestCase):
         del request.user.id
 
         superuser = Superuser(request, org_id=None)
-        with self.settings(
-            SENTRY_SELF_HOSTED=False, VALIDATE_SUPERUSER_ACCESS_CATEGORY_AND_REASON=True
-        ):
-            superuser.set_logged_in(request.user)
-            logger.exception.assert_any_call("superuser.superuser_access.missing_user_info")
 
+        superuser.set_logged_in(request.user)
+        logger.exception.assert_any_call("superuser.superuser_access.missing_user_info")
+
+    @override_settings(SENTRY_SELF_HOSTED=False, VALIDATE_SUPERUSER_ACCESS_CATEGORY_AND_REASON=True)
     def test_su_access_invalid_request_body(
         self,
     ):
@@ -277,12 +275,10 @@ class SuperuserTestCase(TestCase):
         request._body = b'{"invalid" "json"}'
 
         superuser = Superuser(request, org_id=None)
-        with self.settings(
-            SENTRY_SELF_HOSTED=False, VALIDATE_SUPERUSER_ACCESS_CATEGORY_AND_REASON=True
-        ):
-            with pytest.raises(SuperuserAccessFormInvalidJson):
-                superuser.set_logged_in(request.user)
-                assert superuser.is_active is False
+
+        with pytest.raises(SuperuserAccessFormInvalidJson):
+            superuser.set_logged_in(request.user)
+            assert superuser.is_active is False
 
     def test_login_saves_session(self):
         user = self.create_user("foo@example.com", is_superuser=True)
@@ -474,6 +470,7 @@ class SuperuserTestCase(TestCase):
             == '{"superuserReason":["Ensure this field has no more than 128 characters."]}'
         )
 
+    @override_settings(SENTRY_SELF_HOSTED=False)
     def test_superuser_scopes(self):
         user = self.create_user(is_superuser=True)
 
@@ -482,14 +479,13 @@ class SuperuserTestCase(TestCase):
             sso_state=RpcMemberSsoState(), permissions=["superuser.write"]
         )
 
-        with self.settings(SENTRY_SELF_HOSTED=False):
-            assert get_superuser_scopes(auth_state, user) == SUPERUSER_SCOPES
-            assert get_superuser_scopes(auth_state_with_write, user) == SUPERUSER_SCOPES
+        assert get_superuser_scopes(auth_state, user) == SUPERUSER_SCOPES
+        assert get_superuser_scopes(auth_state_with_write, user) == SUPERUSER_SCOPES
 
-            # test scope separation
-            with self.feature("auth:enterprise-superuser-read-write"):
-                assert get_superuser_scopes(auth_state, user) == SUPERUSER_READONLY_SCOPES
-                assert get_superuser_scopes(auth_state_with_write, user) == SUPERUSER_SCOPES
+        # test scope separation
+        with self.feature("auth:enterprise-superuser-read-write"):
+            assert get_superuser_scopes(auth_state, user) == SUPERUSER_READONLY_SCOPES
+            assert get_superuser_scopes(auth_state_with_write, user) == SUPERUSER_SCOPES
 
     def test_superuser_scopes_self_hosted(self):
         # self hosted always has superuser write scopes
@@ -508,16 +504,16 @@ class SuperuserTestCase(TestCase):
             assert get_superuser_scopes(auth_state, user) == SUPERUSER_SCOPES
             assert get_superuser_scopes(auth_state_with_write, user) == SUPERUSER_SCOPES
 
+    @override_settings(SENTRY_SELF_HOSTED=False)
     def test_superuser_has_permission(self):
         request = self.build_request()
 
-        with self.settings(SENTRY_SELF_HOSTED=False):
-            assert not superuser_has_permission(request)
+        assert not superuser_has_permission(request)
 
-            # logging in gives permission
-            request.superuser = Superuser(request)
-            request.superuser._is_active = True
-            assert superuser_has_permission(request)
+        # logging in gives permission
+        request.superuser = Superuser(request)
+        request.superuser._is_active = True
+        assert superuser_has_permission(request)
 
     def test_superuser_has_permission_self_hosted(self):
         request = self.build_request()
@@ -527,6 +523,7 @@ class SuperuserTestCase(TestCase):
 
         assert superuser_has_permission(request)
 
+    @override_settings(SENTRY_SELF_HOSTED=False)
     @with_feature("auth:enterprise-superuser-read-write")
     def test_superuser_has_permission_read_write_get(self):
         request = self.build_request(method="GET")
@@ -534,14 +531,14 @@ class SuperuserTestCase(TestCase):
         request.superuser = Superuser(request)
         request.superuser._is_active = True
 
-        with self.settings(SENTRY_SELF_HOSTED=False):
-            # all superusers have permission to hit GET
-            request.access = self.create_request_access()
-            assert superuser_has_permission(request)
+        # all superusers have permission to hit GET
+        request.access = self.create_request_access()
+        assert superuser_has_permission(request)
 
-            request.access = self.create_request_access(permissions=["superuser.write"])
-            assert superuser_has_permission(request)
+        request.access = self.create_request_access(permissions=["superuser.write"])
+        assert superuser_has_permission(request)
 
+    @override_settings(SENTRY_SELF_HOSTED=False)
     @with_feature("auth:enterprise-superuser-read-write")
     def test_superuser_has_permission_read_write_post(self):
         request = self.build_request(method="POST")
@@ -549,11 +546,38 @@ class SuperuserTestCase(TestCase):
         request.superuser = Superuser(request)
         request.superuser._is_active = True
 
-        with self.settings(SENTRY_SELF_HOSTED=False):
-            # superuser without superuser.write does not have permission
-            request.access = self.create_request_access()
-            assert not superuser_has_permission(request)
+        # superuser without superuser.write does not have permission
+        request.access = self.create_request_access()
+        assert not superuser_has_permission(request)
+
+        # superuser with superuser.write has permission
+        request.access = self.create_request_access(permissions=["superuser.write"])
+        assert superuser_has_permission(request)
+
+    @override_settings(SENTRY_SELF_HOSTED=False)
+    @with_feature("auth:enterprise-superuser-read-write")
+    def test_superuser_has_permission_read_write_no_request_access(self):
+        request = self.build_request(method="GET")
+
+        request.superuser = Superuser(request)
+        request.superuser._is_active = True
+
+        # no request.access and no permissions passed in
+        with pytest.raises(AssertionError):
+            superuser_has_permission(request)
+
+        # no request.access with permissions passed in
+        # all superusers have permission for GET
+        assert superuser_has_permission(request, frozenset())
+        assert superuser_has_permission(request, frozenset(["superuser.write"]))
+
+        request.method = "POST"
+
+        # no request.access and no permissions passed in
+        with pytest.raises(AssertionError):
+            superuser_has_permission(request)
 
-            # superuser with superuser.write has permission
-            request.access = self.create_request_access(permissions=["superuser.write"])
-            assert superuser_has_permission(request)
+        # no request.access with permissions passed in
+        # only superuser write has permissions for POST
+        assert not superuser_has_permission(request, frozenset())
+        assert superuser_has_permission(request, frozenset(["superuser.write"]))