Browse Source

feat(api): Block noncomplaint 2FA users from viewing a 2FA enforced organization (#6918)

* Added blocking via api for org details and home.

* got the tests to work. Need to refactor them.

* removed unused import.

* refactored api and django frontend 2fa blocking tests.

* Removed unnecesary test

* swapped range for an if statement
Lauryn Brown 7 years ago
parent
commit
6a37e6c236

+ 27 - 10
src/sentry/api/bases/organization.py

@@ -9,7 +9,7 @@ from sentry.app import raven
 from sentry.auth import access
 from sentry.auth.superuser import is_active_superuser
 from sentry.models import (
-    ApiKey, Organization, OrganizationMemberTeam, Project, ProjectTeam, ReleaseProject, Team
+    ApiKey, Authenticator, Organization, OrganizationMemberTeam, Project, ProjectTeam, ReleaseProject, Team
 )
 from sentry.utils import auth
 
@@ -22,6 +22,9 @@ class OrganizationPermission(ScopedPermission):
         'DELETE': ['org:admin'],
     }
 
+    def is_not_2fa_compliant(self, user, organization):
+        return organization.flags.require_2fa and not Authenticator.objects.user_has_2fa(user)
+
     def needs_sso(self, request, organization):
         # XXX(dcramer): this is very similar to the server-rendered views
         # logic for checking valid SSO
@@ -57,16 +60,30 @@ class OrganizationPermission(ScopedPermission):
                         'user_id': request.user.id,
                     }
                 )
-            elif request.user.is_authenticated() and self.needs_sso(request, organization):
+            elif request.user.is_authenticated():
                 # session auth needs to confirm various permissions
-                logger.info(
-                    'access.must-sso',
-                    extra={
-                        'organization_id': organization.id,
-                        'user_id': request.user.id,
-                    }
-                )
-                raise NotAuthenticated(detail='Must login via SSO')
+                if self.needs_sso(request, organization):
+
+                    logger.info(
+                        'access.must-sso',
+                        extra={
+                            'organization_id': organization.id,
+                            'user_id': request.user.id,
+                        }
+                    )
+                    raise NotAuthenticated(detail='Must login via SSO')
+
+                if self.is_not_2fa_compliant(
+                        request.user, organization):
+                    logger.info(
+                        'access.not-2fa-compliant',
+                        extra={
+                            'organization_id': organization.id,
+                            'user_id': request.user.id,
+                        }
+                    )
+                    raise NotAuthenticated(
+                        detail='Organization requires two-factor authentication to be enabled')
 
         allowed_scopes = set(self.scope_map.get(request.method, []))
         return any(request.access.has_scope(s) for s in allowed_scopes)

+ 1 - 1
src/sentry/api/endpoints/organization_details.py

@@ -15,7 +15,7 @@ from sentry.api.serializers import serialize
 from sentry.api.serializers.models.organization import (DetailedOrganizationSerializer)
 from sentry.api.serializers.rest_framework import ListField
 from sentry.models import (
-    AuditLogEntryEvent, Organization, OrganizationAvatar, OrganizationOption, OrganizationStatus, Authenticator
+    AuditLogEntryEvent, Authenticator, Organization, OrganizationAvatar, OrganizationOption, OrganizationStatus
 )
 from sentry.tasks.deletion import delete_organization
 from sentry.utils.apidocs import scenario, attach_scenarios

+ 2 - 0
src/sentry/api/serializers/models/organization.py

@@ -108,6 +108,8 @@ class DetailedOrganizationSerializer(OrganizationSerializer):
             feature_list.append('open-membership')
         if not getattr(obj.flags, 'disable_shared_issues'):
             feature_list.append('shared-issues')
+        if getattr(obj.flags, 'require_2fa'):
+            feature_list.append('require-2fa')
 
         context = super(DetailedOrganizationSerializer, self).serialize(obj, attrs, user)
         max_rate = quotas.get_maximum_quota(obj)

+ 53 - 2
src/sentry/testutils/cases.py

@@ -9,7 +9,7 @@ sentry.testutils.cases
 from __future__ import absolute_import
 
 __all__ = (
-    'TestCase', 'TransactionTestCase', 'APITestCase', 'AuthProviderTestCase', 'RuleTestCase',
+    'TestCase', 'TransactionTestCase', 'APITestCase', 'TwoFactorAPITestCase', 'AuthProviderTestCase', 'RuleTestCase',
     'PermissionTestCase', 'PluginTestCase', 'CliTestCase', 'AcceptanceTestCase',
     'IntegrationTestCase',
 )
@@ -46,7 +46,7 @@ from sentry.auth.superuser import (
     Superuser, COOKIE_SALT as SU_COOKIE_SALT, COOKIE_NAME as SU_COOKIE_NAME
 )
 from sentry.constants import MODULE_ROOT
-from sentry.models import GroupMeta, ProjectOption, DeletedOrganization
+from sentry.models import GroupMeta, ProjectOption, DeletedOrganization, Organization, TotpInterface
 from sentry.plugins import plugins
 from sentry.rules import EventState
 from sentry.utils import json
@@ -315,6 +315,57 @@ class APITestCase(BaseTestCase, BaseAPITestCase):
     pass
 
 
+class TwoFactorAPITestCase(APITestCase):
+    @fixture
+    def path_2fa(self):
+        return reverse('sentry-account-settings-2fa')
+
+    def enable_org_2fa(self, organization):
+        organization.flags.require_2fa = True
+        organization.save()
+
+    def api_enable_org_2fa(self, organization, user):
+        self.login_as(user)
+        url = reverse('sentry-api-0-organization-details', kwargs={
+            'organization_slug': organization.slug
+        })
+        return self.client.put(url, data={'require2FA': True})
+
+    def api_disable_org_2fa(self, organization, user):
+        url = reverse('sentry-api-0-organization-details', kwargs={
+            'organization_slug': organization.slug,
+        })
+        return self.client.put(url, data={'require2FA': False})
+
+    def assert_can_enable_org_2fa(self, organization, user, status_code=200):
+        self.__helper_enable_organization_2fa(organization, user, status_code)
+
+    def assert_cannot_enable_org_2fa(self, organization, user, status_code):
+        self.__helper_enable_organization_2fa(organization, user, status_code)
+
+    def __helper_enable_organization_2fa(self, organization, user, status_code):
+        response = self.api_enable_org_2fa(organization, user)
+        assert response.status_code == status_code, response.content
+        organization = Organization.objects.get(id=organization.id)
+
+        if status_code >= 200 and status_code < 300:
+            assert organization.flags.require_2fa
+        else:
+            assert not organization.flags.require_2fa
+
+    def add_2fa_users_to_org(self, organization, num_of_users=10, num_with_2fa=5):
+        non_compliant_members = []
+        for num in range(0, num_of_users):
+            user = self.create_user('foo_%s@example.com' % num)
+            self.create_member(organization=organization, user=user)
+            if num_with_2fa:
+                TotpInterface().enroll(user)
+                num_with_2fa -= 1
+            else:
+                non_compliant_members.append(user.email)
+        return non_compliant_members
+
+
 class AuthProviderTestCase(TestCase):
     provider = DummyProvider
     provider_name = 'dummy'

+ 15 - 1
src/sentry/web/frontend/base.py

@@ -15,7 +15,7 @@ from sentry import roles
 from sentry.auth import access
 from sentry.auth.superuser import is_active_superuser
 from sentry.models import (
-    Organization, OrganizationMember, OrganizationStatus, Project, ProjectStatus,
+    Authenticator, Organization, OrganizationMember, OrganizationStatus, Project, ProjectStatus,
     Team, TeamStatus
 )
 from sentry.utils import auth
@@ -106,6 +106,9 @@ class OrganizationMixin(object):
             organization=organization,
         ).exists()
 
+    def is_not_2fa_compliant(self, user, organization):
+        return organization.flags.require_2fa and not Authenticator.objects.user_has_2fa(user)
+
     def get_active_team(self, request, organization, team_slug):
         """
         Returns the currently selected team for the request or None
@@ -208,6 +211,10 @@ class BaseView(View, OrganizationMixin):
         if not self.has_permission(request, *args, **kwargs):
             return self.handle_permission_required(request, *args, **kwargs)
 
+        if 'organization' in kwargs and self.is_not_2fa_compliant(
+                request.user, kwargs['organization']):
+            return self.handle_not_2fa_compliant(request, *args, **kwargs)
+
         self.request = request
         self.default_context = self.get_context_data(request, *args, **kwargs)
 
@@ -252,9 +259,16 @@ class BaseView(View, OrganizationMixin):
         redirect_uri = self.get_no_permission_url(request, *args, **kwargs)
         return self.redirect(redirect_uri)
 
+    def handle_not_2fa_compliant(self, request, *args, **kwargs):
+        redirect_uri = self.get_not_2fa_compliant_url(request, *args, **kwargs)
+        return self.redirect(redirect_uri)
+
     def get_no_permission_url(request, *args, **kwargs):
         return reverse('sentry-login')
 
+    def get_not_2fa_compliant_url(self, request, *args, **kwargs):
+        return reverse('sentry-account-settings-2fa')
+
     def get_context_data(self, request, **kwargs):
         context = csrf(request)
         return context

+ 85 - 105
tests/sentry/api/endpoints/test_organization_details.py

@@ -6,17 +6,18 @@ from base64 import b64encode
 from django.core.urlresolvers import reverse
 from django.core import mail
 from mock import patch
+from exam import fixture
 
 from sentry.models import (
+    Authenticator,
+    DeletedOrganization,
     Organization,
     OrganizationAvatar,
     OrganizationOption,
     OrganizationStatus,
-    DeletedOrganization,
-    Authenticator,
     TotpInterface)
 from sentry.signals import project_created
-from sentry.testutils import APITestCase
+from sentry.testutils import APITestCase, TwoFactorAPITestCase
 
 
 class OrganizationDetailsTest(APITestCase):
@@ -404,124 +405,103 @@ class OrganizationDeleteTest(APITestCase):
         assert response.status_code == 400, response.data
 
 
-class OrganizationSettings2FATest(APITestCase):
-    def enable_user_2fa(self, user):
-        TotpInterface().enroll(user)
-        assert Authenticator.objects.user_has_2fa(user)
-
-    def assert_can_enable_org_2fa(self, organization, user, status_code=200):
-        self.__helper_enable_organization_2fa(organization, user, status_code)
-
-    def assert_cannot_enable_org_2fa(self, organization, user, status_code):
-        self.__helper_enable_organization_2fa(organization, user, status_code)
-
-    def enable_org_2fa(self, organization, user):
-        self.login_as(user)
-        url = reverse(
-            'sentry-api-0-organization-details', kwargs={
-                'organization_slug': organization.slug,
-            }
-        )
-        response = self.client.put(
-            url,
-            data={
-                'require2FA': True,
-            }
-        )
-        return response
-
-    def disable_org_2fa(self, organization, user):
-        url = reverse(
-            'sentry-api-0-organization-details', kwargs={
-                'organization_slug': organization.slug,
-            }
-        )
-        response = self.client.put(
-            url,
-            data={
-                'require2FA': False,
-            }
-        )
-        return response
-
-    def __helper_enable_organization_2fa(self, organization, user, status_code):
-        response = self.enable_org_2fa(organization, user)
-
-        assert response.status_code == status_code, response.content
-        organization = Organization.objects.get(id=organization.id)
-
-        if status_code in range(200, 300):
-            assert organization.flags.require_2fa
-        else:
-            assert not organization.flags.require_2fa
+class OrganizationSettings2FATest(TwoFactorAPITestCase):
+    def setUp(self):
+        self.org_2fa = self.create_organization(owner=self.create_user())
+        self.enable_org_2fa(self.org_2fa)
+        self.no_2fa_user = self.create_user()
+        self.create_member(organization=self.org_2fa, user=self.no_2fa_user, role="member")
+
+        # 2FA not enforced org and members
+        self.owner = self.create_user()
+        self.organization = self.create_organization(owner=self.owner)
+        self.manager = self.create_user()
+        self.create_member(organization=self.organization, user=self.manager, role="manager")
+        self.org_user = self.create_user()
+        self.create_member(organization=self.organization, user=self.org_user, role="member")
+
+    @fixture
+    def path(self):
+        return reverse('sentry-api-0-organization-details', kwargs={
+            'organization_slug': self.org_2fa.slug,
+        })
+
+    def assert_2fa_email_equal(self, outbox, expected):
+        assert len(outbox) == len(expected)
+        assert sorted([email.to[0] for email in outbox]) == sorted(expected)
+
+    def assert_can_access_org_details(self, url):
+        response = self.client.get(url)
+        assert response.status_code == 200
 
-    def add_2fa_users_to_org(self, organization, num_of_users, num_with_2fa):
-        non_compliant_members = []
-        for num in range(0, num_of_users):
-            user = self.create_user('foo_%s@example.com' % num)
-            self.create_member(organization=organization, user=user)
-            if num % num_with_2fa:
-                TotpInterface().enroll(user)
-            else:
-                non_compliant_members.append(user.email)
-        return non_compliant_members
+    def assert_cannot_access_org_details(self, url):
+        response = self.client.get(url)
+        assert response.status_code == 401
 
     def test_cannot_enforce_2fa_without_2fa_enabled(self):
-        owner = self.create_user()
-        organization = self.create_organization(owner=owner)
-
-        assert not Authenticator.objects.user_has_2fa(owner)
-        self.assert_cannot_enable_org_2fa(organization, owner, 400)
-
-    def test_owner_can_set_2fa(self):
-        owner = self.create_user()
-        organization = self.create_organization(owner=owner)
+        assert not Authenticator.objects.user_has_2fa(self.owner)
+        self.assert_cannot_enable_org_2fa(self.organization, self.owner, 400)
 
-        self.enable_user_2fa(owner)
+    def test_owner_can_set_2fa_single_member(self):
+        org = self.create_organization(owner=self.owner)
+        TotpInterface().enroll(self.owner)
         with self.options({'system.url-prefix': 'http://example.com'}), self.tasks():
-            self.assert_can_enable_org_2fa(organization, owner)
+            self.assert_can_enable_org_2fa(org, self.owner)
         assert len(mail.outbox) == 0
 
     def test_manager_can_set_2fa(self):
-        manager = self.create_user()
-        owner = self.create_user()
-        organization = self.create_organization(owner=owner)
-        self.create_member(organization=organization, user=manager, role="manager")
+        org = self.create_organization(owner=self.owner)
+        self.create_member(organization=org, user=self.manager, role="manager")
 
-        self.assert_cannot_enable_org_2fa(organization, manager, 400)
-        self.enable_user_2fa(manager)
+        self.assert_cannot_enable_org_2fa(org, self.manager, 400)
+        TotpInterface().enroll(self.manager)
         with self.options({'system.url-prefix': 'http://example.com'}), self.tasks():
-            self.assert_can_enable_org_2fa(organization, manager)
-
-        assert len(mail.outbox) == 1
-        assert mail.outbox[0].to[0] == owner.email
+            self.assert_can_enable_org_2fa(org, self.manager)
+        self.assert_2fa_email_equal(mail.outbox, [self.owner.email])
 
     def test_members_cannot_set_2fa(self):
-        member = self.create_user()
-        organization = self.create_organization(owner=self.create_user())
-        self.create_member(organization=organization, user=member, role="member")
-
-        self.assert_cannot_enable_org_2fa(organization, member, 403)
-        self.enable_user_2fa(member)
-        self.assert_cannot_enable_org_2fa(organization, member, 403)
-
-    def test_owner_can_disable_org_2fa(self):
-        owner = self.create_user()
-        organization = self.create_organization(owner=owner)
-        user_emails_without_2fa = self.add_2fa_users_to_org(organization, 10, 2)
-        self.enable_user_2fa(owner)
+        self.assert_cannot_enable_org_2fa(self.organization, self.org_user, 403)
+        TotpInterface().enroll(self.org_user)
+        self.assert_cannot_enable_org_2fa(self.organization, self.org_user, 403)
+
+    def test_owner_can_set_org_2fa(self):
+        org = self.create_organization(owner=self.owner)
+        TotpInterface().enroll(self.owner)
+        user_emails_without_2fa = self.add_2fa_users_to_org(org)
+
         with self.options({'system.url-prefix': 'http://example.com'}), self.tasks():
-            self.assert_can_enable_org_2fa(organization, owner)
-        assert len(mail.outbox) == 5
-        assert sorted([email.to[0] for email in mail.outbox]) == sorted(user_emails_without_2fa)
+            self.assert_can_enable_org_2fa(org, self.owner)
+        self.assert_2fa_email_equal(mail.outbox, user_emails_without_2fa)
 
-        # Empty the test outbox
         mail.outbox = []
-
         with self.options({'system.url-prefix': 'http://example.com'}), self.tasks():
-            response = self.disable_org_2fa(organization, owner)
+            response = self.api_disable_org_2fa(org, self.owner)
 
         assert response.status_code == 200
-        org_disabled_2fa = Organization.objects.get(id=organization.id)
-        assert not org_disabled_2fa.flags.require_2fa
+        assert not Organization.objects.get(id=org.id).flags.require_2fa
         assert len(mail.outbox) == 0
+
+    def test_preexisting_members_must_enable_2fa(self):
+        self.login_as(self.no_2fa_user)
+        self.assert_cannot_access_org_details(self.path)
+
+        TotpInterface().enroll(self.no_2fa_user)
+        self.assert_can_access_org_details(self.path)
+
+    def test_new_member_must_enable_2fa(self):
+        new_user = self.create_user()
+        self.create_member(organization=self.org_2fa, user=new_user, role="member")
+        self.login_as(new_user)
+        self.assert_cannot_access_org_details(self.path)
+
+        TotpInterface().enroll(new_user)
+        self.assert_can_access_org_details(self.path)
+
+    def test_member_disable_all_2fa_blocked(self):
+        TotpInterface().enroll(self.no_2fa_user)
+        self.login_as(self.no_2fa_user)
+
+        self.assert_can_access_org_details(self.path)
+
+        Authenticator.objects.get(user=self.no_2fa_user).delete()
+        self.assert_cannot_access_org_details(self.path)

+ 50 - 2
tests/sentry/api/endpoints/test_organization_index.py

@@ -5,8 +5,8 @@ import six
 from django.core.urlresolvers import reverse
 from exam import fixture
 
-from sentry.models import Organization, OrganizationStatus
-from sentry.testutils import APITestCase
+from sentry.models import Authenticator, Organization, OrganizationStatus, TotpInterface
+from sentry.testutils import APITestCase, TwoFactorAPITestCase
 
 
 class OrganizationsListTest(APITestCase):
@@ -80,3 +80,51 @@ class OrganizationsCreateTest(APITestCase):
         assert resp.status_code == 201, resp.content
         org = Organization.objects.get(id=resp.data['id'])
         assert org.slug == 'hello-world'
+
+
+class OrganizationIndex2faTest(TwoFactorAPITestCase):
+    def setUp(self):
+        self.org_2fa = self.create_organization(owner=self.create_user())
+        self.enable_org_2fa(self.org_2fa)
+        self.no_2fa_user = self.create_user()
+        self.create_member(organization=self.org_2fa, user=self.no_2fa_user, role="member")
+
+    @fixture
+    def path(self):
+        return reverse('sentry-organization-home', kwargs={
+            'organization_slug': self.org_2fa.slug,
+        })
+
+    def assert_can_access_org_home(self):
+        response = self.client.get(self.path)
+        assert response.status_code == 200
+
+    def assert_redirected_to_2fa(self):
+        response = self.client.get(self.path)
+        assert response.status_code == 302
+        assert self.path_2fa in response.url
+
+    def test_preexisting_members_must_enable_2fa(self):
+        self.login_as(self.no_2fa_user)
+        self.assert_redirected_to_2fa()
+
+        TotpInterface().enroll(self.no_2fa_user)
+        self.assert_can_access_org_home()
+
+    def test_new_member_must_enable_2fa(self):
+        new_user = self.create_user()
+        self.create_member(organization=self.org_2fa, user=new_user, role="member")
+        self.login_as(new_user)
+
+        self.assert_redirected_to_2fa()
+
+        TotpInterface().enroll(new_user)
+        self.assert_can_access_org_home()
+
+    def test_member_disable_all_2fa_blocked(self):
+        TotpInterface().enroll(self.no_2fa_user)
+        self.login_as(self.no_2fa_user)
+        self.assert_can_access_org_home()
+
+        Authenticator.objects.get(user=self.no_2fa_user).delete()
+        self.assert_redirected_to_2fa()