Просмотр исходного кода

fix(integrations): check permission of user for installation (#20737)

Stephen Cefali 4 лет назад
Родитель
Сommit
1e7af4a126

+ 18 - 11
src/sentry/web/frontend/integration_extension_configuration.py

@@ -10,7 +10,7 @@ from sentry import integrations, features
 from sentry.features.exceptions import FeatureNotRegistered
 from sentry.integrations.pipeline import IntegrationPipeline
 from sentry.web.frontend.base import BaseView
-from sentry.models import Organization
+from sentry.models import Organization, OrganizationMember
 
 
 class ExternalIntegrationPipeline(IntegrationPipeline):
@@ -49,8 +49,9 @@ class IntegrationExtensionConfigurationView(BaseView):
 
         # check if we have one org
         organization = None
-        if request.user.get_orgs().count() == 1:
-            organization = request.user.get_orgs()[0]
+        organizations = request.user.get_orgs()
+        if organizations.count() == 1:
+            organization = organizations[0]
         # if we have an org slug in the query param, use that org
         elif "orgSlug" in request.GET:
             organization = Organization.objects.get(slug=request.GET["orgSlug"])
@@ -62,14 +63,20 @@ class IntegrationExtensionConfigurationView(BaseView):
 
             # only continue in the pipeline if there is at least one feature we can get
             if self.has_one_required_feature(organization, request.user):
-                # TODO(steve): we probably should check the user has permissions and show an error page if not
-                try:
-                    pipeline = self.init_pipeline(request, organization, request.GET.dict())
-                    return pipeline.current_step()
-                except SignatureExpired:
-                    return self.respond(
-                        "sentry/pipeline-error.html", {"error": "Installation link expired"},
-                    )
+                # check that the user has the org:integrations permission
+                org_member = OrganizationMember.objects.get(
+                    organization=organization, user=request.user
+                )
+                if "org:integrations" in org_member.get_scopes():
+                    try:
+                        pipeline = self.init_pipeline(request, organization, request.GET.dict())
+                        return pipeline.current_step()
+                    except SignatureExpired:
+                        return self.respond(
+                            "sentry/pipeline-error.html", {"error": "Installation link expired"},
+                        )
+
+        # if anything before fails, we give up and send them to the link page where we can display errors
         return self.redirect(
             u"/extensions/{}/link/?{}".format(self.provider, urlencode(request.GET.dict()))
         )

+ 1 - 1
tests/sentry/web/frontend/test_msteams_extension.py

@@ -13,7 +13,7 @@ class MsTeamsExtensionConfigurationTest(TestCase):
     def hit_configure(self, params):
         self.login_as(self.user)
         org = self.create_organization()
-        OrganizationMember.objects.create(user=self.user, organization=org)
+        OrganizationMember.objects.create(user=self.user, organization=org, role="admin")
         path = u"/extensions/msteams/configure/"
         return self.client.get(path, params)
 

+ 12 - 1
tests/sentry/web/frontend/test_vercel_extension_configuration.py

@@ -16,7 +16,7 @@ class VercelExtensionConfigurationTest(TestCase):
         self.user = self.create_user()
         self.org = self.create_organization()
 
-        OrganizationMember.objects.create(user=self.user, organization=self.org)
+        OrganizationMember.objects.create(user=self.user, organization=self.org, role="admin")
 
         responses.reset()
         # need oauth mocks
@@ -66,6 +66,17 @@ class VercelExtensionConfigurationTest(TestCase):
         # Goes straight to Vercel OAuth
         assert resp.status_code == 302
 
+    def test_logged_in_as_member(self):
+        OrganizationMember.objects.filter(user=self.user, organization=self.org).update(
+            role="member"
+        )
+        self.login_as(self.user)
+
+        resp = self.client.get(self.path, self.params)
+
+        assert resp.status_code == 302
+        assert "/extensions/vercel/link/" in resp.url
+
     def test_logged_in_many_orgs(self):
         self.login_as(self.user)
 

+ 1 - 1
tests/sentry/web/frontend/test_vsts_extension_configuration.py

@@ -16,7 +16,7 @@ class VstsExtensionConfigurationTest(TestCase):
         self.user = self.create_user()
         self.org = self.create_organization()
 
-        OrganizationMember.objects.create(user=self.user, organization=self.org)
+        OrganizationMember.objects.create(user=self.user, organization=self.org, role="admin")
 
     def test_logged_in_one_org(self):
         self.login_as(self.user)