test_staff.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. from contextlib import contextmanager
  2. from datetime import UTC, datetime, timedelta
  3. from unittest.mock import Mock, patch
  4. from django.contrib.auth.models import AnonymousUser
  5. from django.core import signing
  6. from django.utils import timezone
  7. from sentry.auth import staff
  8. from sentry.auth.staff import (
  9. COOKIE_DOMAIN,
  10. COOKIE_HTTPONLY,
  11. COOKIE_NAME,
  12. COOKIE_PATH,
  13. COOKIE_SALT,
  14. COOKIE_SECURE,
  15. IDLE_MAX_AGE,
  16. MAX_AGE,
  17. SESSION_KEY,
  18. Staff,
  19. is_active_staff,
  20. )
  21. from sentry.auth.system import SystemToken
  22. from sentry.middleware.placeholder import placeholder_get_response
  23. from sentry.middleware.staff import StaffMiddleware
  24. from sentry.testutils.cases import TestCase
  25. from sentry.testutils.helpers.datetime import freeze_time
  26. from sentry.testutils.silo import control_silo_test
  27. from sentry.utils.auth import mark_sso_complete
  28. UNSET = object()
  29. BASETIME = datetime(2022, 3, 21, 0, 0, tzinfo=UTC)
  30. EXPIRE_TIME = timedelta(hours=4, minutes=1)
  31. INSIDE_PRIVILEGE_ACCESS_EXPIRE_TIME = timedelta(minutes=14)
  32. IDLE_EXPIRE_TIME = OUTSIDE_PRIVILEGE_ACCESS_EXPIRE_TIME = timedelta(hours=2)
  33. @contextmanager
  34. def override_org_id(new_org_id: int):
  35. """
  36. ORG_ID in staff.py is loaded from STAFF_ORG_ID in settings. This happens at
  37. the module level, but we cannot override module level variables using
  38. Django's built-in override_settings, so we need this context manager.
  39. """
  40. old_org_id = staff.STAFF_ORG_ID
  41. staff.STAFF_ORG_ID = new_org_id
  42. try:
  43. yield
  44. finally:
  45. staff.STAFF_ORG_ID = old_org_id
  46. @control_silo_test
  47. @freeze_time(BASETIME)
  48. class StaffTestCase(TestCase):
  49. def setUp(self):
  50. super().setUp()
  51. self.current_datetime = timezone.now()
  52. self.default_token = "abcdefghijklmnog"
  53. self.staff_user = self.create_user(is_staff=True)
  54. def build_request(
  55. self,
  56. cookie_token=UNSET,
  57. session_token=UNSET,
  58. expires=UNSET,
  59. idle_expires=UNSET,
  60. uid=UNSET,
  61. session_data=True,
  62. user=None,
  63. ):
  64. if user is None:
  65. user = self.staff_user
  66. request = self.make_request(user=user)
  67. if cookie_token is not None:
  68. request.COOKIES[COOKIE_NAME] = signing.get_cookie_signer(
  69. salt=COOKIE_NAME + COOKIE_SALT
  70. ).sign(self.default_token if cookie_token is UNSET else cookie_token)
  71. if session_data:
  72. request.session[SESSION_KEY] = {
  73. "exp": (
  74. self.current_datetime + timedelta(hours=4) if expires is UNSET else expires
  75. ).strftime("%s"),
  76. "idl": (
  77. self.current_datetime + timedelta(minutes=15)
  78. if idle_expires is UNSET
  79. else idle_expires
  80. ).strftime("%s"),
  81. "tok": self.default_token if session_token is UNSET else session_token,
  82. "uid": str(user.id) if uid is UNSET else uid,
  83. }
  84. return request
  85. def test_ips(self):
  86. request = self.make_request(user=self.staff_user)
  87. request.META["REMOTE_ADDR"] = "10.0.0.1"
  88. # no ips = any host
  89. staff = Staff(request, allowed_ips=())
  90. staff.set_logged_in(request.user)
  91. assert staff.is_active is True
  92. staff = Staff(request, allowed_ips=("127.0.0.1",))
  93. staff.set_logged_in(request.user)
  94. assert staff.is_active is False
  95. staff = Staff(request, allowed_ips=("10.0.0.1",))
  96. staff.set_logged_in(request.user)
  97. assert staff.is_active is True
  98. def test_sso(self):
  99. request = self.make_request(user=self.staff_user)
  100. # no ips = any host
  101. staff = Staff(request)
  102. staff.set_logged_in(request.user)
  103. assert staff.is_active is True
  104. # Set ORG_ID so we run the SSO check
  105. with override_org_id(new_org_id=self.organization.id):
  106. staff = Staff(request)
  107. staff.set_logged_in(request.user)
  108. assert staff.is_active is False
  109. mark_sso_complete(request, self.organization.id)
  110. staff = Staff(request)
  111. staff.set_logged_in(request.user)
  112. assert staff.is_active is True
  113. def test_valid_data(self):
  114. request = self.build_request()
  115. staff = Staff(request, allowed_ips=())
  116. assert staff.is_active is True
  117. def test_missing_cookie(self):
  118. request = self.build_request(cookie_token=None)
  119. staff = Staff(request, allowed_ips=())
  120. assert staff.is_active is False
  121. def test_invalid_cookie_token(self):
  122. request = self.build_request(cookie_token="foobar")
  123. staff = Staff(request, allowed_ips=())
  124. assert staff.is_active is False
  125. def test_invalid_session_token(self):
  126. request = self.build_request(session_token="foobar")
  127. staff = Staff(request, allowed_ips=())
  128. assert staff.is_active is False
  129. def test_missing_data(self):
  130. request = self.build_request(session_data=False)
  131. staff = Staff(request, allowed_ips=())
  132. assert staff.is_active is False
  133. def test_invalid_uid(self):
  134. request = self.build_request(uid=-1)
  135. staff = Staff(request, allowed_ips=())
  136. assert staff.is_active is False
  137. @freeze_time(BASETIME + EXPIRE_TIME)
  138. def test_expired(self):
  139. # Set idle time to the current time so we fail on checking expire time
  140. # and not idle time.
  141. request = self.build_request(
  142. idle_expires=BASETIME + EXPIRE_TIME, expires=self.current_datetime
  143. )
  144. staff = Staff(request, allowed_ips=())
  145. assert staff.is_active is False
  146. @freeze_time(BASETIME + IDLE_EXPIRE_TIME)
  147. def test_idle_expired(self):
  148. request = self.build_request(idle_expires=self.current_datetime)
  149. staff = Staff(request, allowed_ips=())
  150. assert staff.is_active is False
  151. def test_login_saves_session(self):
  152. request = self.make_request()
  153. staff = Staff(request, allowed_ips=())
  154. staff.set_logged_in(self.staff_user)
  155. # request.user wasn't set
  156. assert not staff.is_active
  157. request.user = self.staff_user
  158. assert staff.is_active
  159. # See mypy issue: https://github.com/python/mypy/issues/9457
  160. data = request.session.get(SESSION_KEY) # type:ignore[unreachable]
  161. assert data
  162. assert data["exp"] == (self.current_datetime + MAX_AGE).strftime("%s")
  163. assert data["idl"] == (self.current_datetime + IDLE_MAX_AGE).strftime("%s")
  164. assert len(data["tok"]) == 12
  165. assert data["uid"] == str(self.staff_user.id)
  166. def test_logout_clears_session(self):
  167. request = self.build_request()
  168. staff = Staff(request, allowed_ips=())
  169. staff.set_logged_out()
  170. assert not staff.is_active
  171. assert not request.session.get(SESSION_KEY)
  172. def test_middleware_as_staff(self):
  173. request = self.build_request()
  174. delattr(request, "staff")
  175. middleware = StaffMiddleware(placeholder_get_response)
  176. middleware.process_request(request)
  177. assert request.staff.is_active
  178. assert is_active_staff(request)
  179. response = Mock()
  180. middleware.process_response(request, response)
  181. response.set_signed_cookie.assert_called_once_with(
  182. COOKIE_NAME,
  183. request.staff.token,
  184. salt=COOKIE_SALT,
  185. max_age=None,
  186. secure=request.is_secure() if COOKIE_SECURE is None else COOKIE_SECURE,
  187. httponly=COOKIE_HTTPONLY,
  188. path=COOKIE_PATH,
  189. domain=COOKIE_DOMAIN,
  190. )
  191. def test_middleware_as_staff_without_session(self):
  192. request = self.build_request(session_data=False)
  193. delattr(request, "staff")
  194. middleware = StaffMiddleware(placeholder_get_response)
  195. middleware.process_request(request)
  196. assert not request.staff.is_active
  197. assert not is_active_staff(request)
  198. response = Mock()
  199. middleware.process_response(request, response)
  200. response.delete_cookie.assert_called_once_with(COOKIE_NAME)
  201. def test_middleware_as_non_staff(self):
  202. user = self.create_user("foo@example.com", is_staff=False)
  203. request = self.build_request(user=user)
  204. delattr(request, "staff")
  205. middleware = StaffMiddleware(placeholder_get_response)
  206. middleware.process_request(request)
  207. assert not request.staff.is_active
  208. assert not is_active_staff(request)
  209. response = Mock()
  210. middleware.process_response(request, response)
  211. assert not response.set_signed_cookie.called
  212. def test_changed_user(self):
  213. request = self.build_request()
  214. staff = Staff(request, allowed_ips=())
  215. assert staff.is_active
  216. # anonymous
  217. request.user = AnonymousUser()
  218. assert not staff.is_active
  219. # a non-staff
  220. # See mypy issue: https://github.com/python/mypy/issues/9457
  221. request.user = self.create_user( # type:ignore[unreachable]
  222. "baz@example.com", is_staff=False
  223. )
  224. assert not staff.is_active
  225. # a staff
  226. request.user.update(is_staff=True)
  227. assert not staff.is_active
  228. def test_is_active_staff_sys_token(self):
  229. request = self.build_request()
  230. request.auth = SystemToken()
  231. assert is_active_staff(request)
  232. def test_is_active_staff(self):
  233. request = self.build_request()
  234. request.staff = Staff(request, allowed_ips=())
  235. request.staff._is_active = True
  236. assert is_active_staff(request)
  237. def test_is_not_active_staff(self):
  238. request = self.build_request()
  239. request.staff = Staff(request, allowed_ips=())
  240. request.staff._is_active = False
  241. assert not is_active_staff(request)
  242. @patch.object(Staff, "is_active", return_value=True)
  243. def test_is_active_staff_from_request(self, mock_is_active):
  244. request = self.build_request()
  245. request.staff = None
  246. assert is_active_staff(request)