test_api.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. from datetime import datetime, timedelta, timezone
  2. from sentry.models.authidentity import AuthIdentity
  3. from sentry.models.authprovider import AuthProvider
  4. from sentry.silo import SiloMode
  5. from sentry.testutils.cases import AuthProviderTestCase
  6. from sentry.testutils.silo import assume_test_silo_mode, region_silo_test
  7. from sentry.testutils.skips import requires_snuba
  8. from sentry.utils.auth import SSO_EXPIRY_TIME, SsoSession
  9. pytestmark = [requires_snuba]
  10. # TODO: move these into the tests/sentry/auth directory and remove deprecated logic
  11. @region_silo_test
  12. class AuthenticationTest(AuthProviderTestCase):
  13. def setUp(self):
  14. self.organization = self.create_organization(name="foo")
  15. self.user = self.create_user("foobar@example.com", is_superuser=False)
  16. team = self.create_team(name="bar", organization=self.organization)
  17. self.project = self.create_project(name="baz", organization=self.organization, teams=[team])
  18. member = self.create_member(user=self.user, organization=self.organization, teams=[team])
  19. setattr(member.flags, "sso:linked", True)
  20. member.save()
  21. event = self.store_event(data={}, project_id=self.project.id)
  22. group_id = event.group_id
  23. with assume_test_silo_mode(SiloMode.CONTROL):
  24. auth_provider = AuthProvider.objects.create(
  25. organization_id=self.organization.id, provider="dummy", flags=0
  26. )
  27. AuthIdentity.objects.create(auth_provider=auth_provider, user=self.user)
  28. self.login_as(self.user)
  29. self.paths = (
  30. f"/api/0/organizations/{self.organization.slug}/",
  31. f"/api/0/projects/{self.organization.slug}/{self.project.slug}/",
  32. f"/api/0/teams/{self.organization.slug}/{self.team.slug}/",
  33. f"/api/0/issues/{group_id}/",
  34. # this uses the internal API, which once upon a time was broken
  35. f"/api/0/issues/{group_id}/events/latest/",
  36. )
  37. def test_sso_auth_required(self):
  38. # we should be redirecting the user to the authentication form as they
  39. # haven't verified this specific organization
  40. self._test_paths_with_status(401)
  41. def test_sso_superuser_required(self):
  42. with assume_test_silo_mode(SiloMode.CONTROL):
  43. # superuser should still require SSO as they're a member of the org
  44. self.user.update(is_superuser=True)
  45. self._test_paths_with_status(401)
  46. def test_sso_with_expiry_valid(self):
  47. sso_session = SsoSession.create(self.organization.id)
  48. self.session[sso_session.session_key] = sso_session.to_dict()
  49. self.save_session()
  50. self._test_paths_with_status(200)
  51. def test_sso_with_expiry_expired(self):
  52. sso_session_expired = SsoSession(
  53. self.organization.id,
  54. datetime.now(tz=timezone.utc) - SSO_EXPIRY_TIME - timedelta(hours=1),
  55. )
  56. self.session[sso_session_expired.session_key] = sso_session_expired.to_dict()
  57. self.save_session()
  58. self._test_paths_with_status(401)
  59. def test_sso_redirect_url_internal(self):
  60. sso_session_expired = SsoSession(
  61. self.organization.id,
  62. datetime.now(tz=timezone.utc) - SSO_EXPIRY_TIME - timedelta(hours=1),
  63. )
  64. self.session[sso_session_expired.session_key] = sso_session_expired.to_dict()
  65. self.save_session()
  66. resp = self.client.get(
  67. f"/api/0/teams/{self.organization.slug}/{self.team.slug}/",
  68. HTTP_REFERER=f"/organizations/{self.organization.slug}/teams",
  69. )
  70. assert (
  71. resp.json()["detail"]["extra"]["loginUrl"]
  72. == "/auth/login/foo/?next=%2Forganizations%2Ffoo%2Fteams"
  73. )
  74. def test_sso_redirect_url_internal_with_domain(self):
  75. sso_session_expired = SsoSession(
  76. self.organization.id,
  77. datetime.now(tz=timezone.utc) - SSO_EXPIRY_TIME - timedelta(hours=1),
  78. )
  79. self.session[sso_session_expired.session_key] = sso_session_expired.to_dict()
  80. self.save_session()
  81. resp = self.client.get(
  82. f"/api/0/teams/{self.organization.slug}/{self.team.slug}/",
  83. HTTP_REFERER=f"https://testdomain.com/organizations/{self.organization.slug}/teams",
  84. SERVER_NAME="testdomain.com",
  85. )
  86. assert (
  87. resp.json()["detail"]["extra"]["loginUrl"]
  88. == "/auth/login/foo/?next=https%3A%2F%2Ftestdomain.com%2Forganizations%2Ffoo%2Fteams"
  89. )
  90. def test_sso_redirect_url_external_removed(self):
  91. sso_session_expired = SsoSession(
  92. self.organization.id,
  93. datetime.now(tz=timezone.utc) - SSO_EXPIRY_TIME - timedelta(hours=1),
  94. )
  95. self.session[sso_session_expired.session_key] = sso_session_expired.to_dict()
  96. self.save_session()
  97. resp = self.client.get(
  98. f"/api/0/teams/{self.organization.slug}/{self.team.slug}/",
  99. HTTP_REFERER="http://example.com",
  100. )
  101. assert resp.json()["detail"]["extra"]["loginUrl"] == "/auth/login/foo/"
  102. def _test_paths_with_status(self, status):
  103. for path in self.paths:
  104. resp = self.client.get(path)
  105. assert resp.status_code == status, (resp.status_code, resp.content)