Browse Source

chore(hybrid-cloud): Updates user region endpoint permissions (#61202)

Gabe Villalobos 1 year ago
parent
commit
c876a70a8d

+ 33 - 6
src/sentry/api/endpoints/user_regions.py

@@ -1,15 +1,41 @@
+from __future__ import annotations
+
 from rest_framework.request import Request
 from rest_framework.response import Response
 
 from sentry.api.api_owners import ApiOwner
 from sentry.api.api_publish_status import ApiPublishStatus
 from sentry.api.base import control_silo_endpoint
-from sentry.api.bases.user import UserEndpoint
+from sentry.api.bases.user import UserEndpoint, UserPermission
+from sentry.auth.superuser import is_active_superuser
+from sentry.auth.system import is_system_auth
 from sentry.models.organizationmapping import OrganizationMapping
 from sentry.models.organizationmembermapping import OrganizationMemberMapping
+from sentry.models.user import User
+from sentry.services.hybrid_cloud.user import RpcUser
 from sentry.types.region import get_region_by_name
 
 
+# Grants access to the list of regions where a user has organizations.
+# This should only be accessible for the current user or
+# system/superuser requests.
+#
+# This will also grant access via user auth tokens assuming the
+# user ID matches the user that is being queried.
+class UserRegionEndpointPermissions(UserPermission):
+    scope_map = {"GET": ["org:read"]}
+
+    def has_object_permission(self, request, view, user: User | RpcUser | None = None):
+        if user and user.id == request.user.id and request.user.is_authenticated:
+            return True
+        if is_system_auth(request.auth):
+            return True
+        if is_active_superuser(request):
+            return True
+
+        return False
+
+
 @control_silo_endpoint
 class UserRegionsEndpoint(UserEndpoint):
     owner = ApiOwner.HYBRID_CLOUD
@@ -17,18 +43,19 @@ class UserRegionsEndpoint(UserEndpoint):
         "GET": ApiPublishStatus.PRIVATE,
     }
 
-    def get(self, request: Request, **kwargs) -> Response:
+    permission_classes = (UserRegionEndpointPermissions,)
+
+    def get(self, request: Request, user: RpcUser, **kwargs) -> Response:
         """
         Retrieve the Regions a User has membership in
         `````````````````````````````````````````````
 
         Returns a list of regions that the current user has membership in.
-
         :auth: required
         """
-        organization_ids = OrganizationMemberMapping.objects.filter(
-            user_id=request.user.id
-        ).values_list("organization_id", flat=True)
+        organization_ids = OrganizationMemberMapping.objects.filter(user_id=user.id).values_list(
+            "organization_id", flat=True
+        )
         org_mappings = (
             OrganizationMapping.objects.filter(organization_id__in=organization_ids)
             .distinct("region_name")

+ 70 - 1
tests/sentry/api/endpoints/test_user_regions.py

@@ -16,10 +16,10 @@ class UserUserRolesTest(APITestCase):
     def setUp(self):
         super().setUp()
         self.user = self.create_user()
-        self.login_as(user=self.user)
 
     @override_regions(region_config)
     def test_get(self):
+        self.login_as(user=self.user)
         self.create_organization(region="us", owner=self.user)
         self.create_organization(region="de", owner=self.user)
         self.create_organization(region="acme", owner=self.user)
@@ -35,6 +35,7 @@ class UserUserRolesTest(APITestCase):
 
     @override_regions(region_config)
     def test_get_only_memberships(self):
+        self.login_as(user=self.user)
         other = self.create_user()
         self.create_organization(region="acme", owner=other)
         self.create_organization(region="de", owner=self.user)
@@ -46,8 +47,76 @@ class UserUserRolesTest(APITestCase):
 
     @override_regions(region_config)
     def test_get_other_user_error(self):
+        self.login_as(user=self.user)
         other = self.create_user()
         self.create_organization(region="acme", owner=other)
 
         response = self.get_response(other.id)
         assert response.status_code == 403
+
+    @override_regions(region_config)
+    def test_allow_superuser_to_query_all(self):
+        superuser = self.create_user(is_superuser=True)
+        self.login_as(user=superuser, superuser=True)
+
+        test_user_1 = self.create_user()
+        self.create_organization(region="us", owner=test_user_1)
+        self.create_organization(region="de", owner=test_user_1)
+        self.create_organization(region="acme", owner=test_user_1)
+
+        test_user_2 = self.create_user()
+        response = self.get_response(test_user_1.id)
+        assert response.status_code == 200
+        assert "regions" in response.data
+        assert response.data["regions"] == [
+            st.api_serialize(),
+            de.api_serialize(),
+            us.api_serialize(),
+        ]
+
+        response = self.get_response(test_user_2.id)
+        assert response.status_code == 200
+        assert "regions" in response.data
+        assert response.data["regions"] == []
+
+    @override_regions(region_config)
+    def test_get_for_user_with_auth_token(self):
+        self.create_organization(region="us", owner=self.user)
+        self.create_organization(region="de", owner=self.user)
+        auth_token = self.create_user_auth_token(user=self.user, scope_list=["org:read"])
+        response = self.get_success_response(
+            "me", extra_headers={"HTTP_AUTHORIZATION": f"Bearer {auth_token}"}
+        )
+        assert "regions" in response.data
+        assert response.data["regions"] == [de.api_serialize(), us.api_serialize()]
+
+    @override_regions(region_config)
+    def test_get_other_user_with_auth_token_error(self):
+        other_user = self.create_user()
+        self.create_organization(region="us", owner=other_user)
+        self.create_organization(region="de", owner=other_user)
+
+        auth_token = self.create_user_auth_token(user=self.user, scope_list=["org:read"])
+        self.get_error_response(
+            other_user.id,
+            extra_headers={"HTTP_AUTHORIZATION": f"Bearer {auth_token}"},
+            status_code=403,
+        )
+
+    @override_regions(region_config)
+    def test_get_for_user_with_wrong_scopes_error(self):
+        self.create_organization(region="us", owner=self.user)
+        self.create_organization(region="de", owner=self.user)
+
+        auth_token = self.create_user_auth_token(user=self.user, scope_list=["project:read"])
+        self.get_error_response(
+            "me", extra_headers={"HTTP_AUTHORIZATION": f"Bearer {auth_token}"}, status_code=403
+        )
+
+    @override_regions(region_config)
+    def test_get_for_user_with_no_auth(self):
+        self.create_organization(region="us", owner=self.user)
+        self.create_organization(region="de", owner=self.user)
+
+        self.get_error_response("me", status_code=401)
+        self.get_error_response(self.user.id, status_code=401)