test_reauth.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. # Copyright 2021 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import copy
  15. import mock
  16. import pytest
  17. from google.auth import exceptions
  18. from google.oauth2 import reauth
  19. MOCK_REQUEST = mock.Mock()
  20. CHALLENGES_RESPONSE_TEMPLATE = {
  21. "status": "CHALLENGE_REQUIRED",
  22. "sessionId": "123",
  23. "challenges": [
  24. {
  25. "status": "READY",
  26. "challengeId": 1,
  27. "challengeType": "PASSWORD",
  28. "securityKey": {},
  29. }
  30. ],
  31. }
  32. CHALLENGES_RESPONSE_AUTHENTICATED = {
  33. "status": "AUTHENTICATED",
  34. "sessionId": "123",
  35. "encodedProofOfReauthToken": "new_rapt_token",
  36. }
  37. class MockChallenge(object):
  38. def __init__(self, name, locally_eligible, challenge_input):
  39. self.name = name
  40. self.is_locally_eligible = locally_eligible
  41. self.challenge_input = challenge_input
  42. def obtain_challenge_input(self, metadata):
  43. return self.challenge_input
  44. def _test_is_interactive():
  45. with mock.patch("sys.stdin.isatty", return_value=True):
  46. assert reauth.is_interactive()
  47. def test__get_challenges():
  48. with mock.patch(
  49. "google.oauth2._client._token_endpoint_request"
  50. ) as mock_token_endpoint_request:
  51. reauth._get_challenges(MOCK_REQUEST, ["SAML"], "token")
  52. mock_token_endpoint_request.assert_called_with(
  53. MOCK_REQUEST,
  54. reauth._REAUTH_API + ":start",
  55. {"supportedChallengeTypes": ["SAML"]},
  56. access_token="token",
  57. use_json=True,
  58. )
  59. def test__get_challenges_with_scopes():
  60. with mock.patch(
  61. "google.oauth2._client._token_endpoint_request"
  62. ) as mock_token_endpoint_request:
  63. reauth._get_challenges(
  64. MOCK_REQUEST, ["SAML"], "token", requested_scopes=["scope"]
  65. )
  66. mock_token_endpoint_request.assert_called_with(
  67. MOCK_REQUEST,
  68. reauth._REAUTH_API + ":start",
  69. {
  70. "supportedChallengeTypes": ["SAML"],
  71. "oauthScopesForDomainPolicyLookup": ["scope"],
  72. },
  73. access_token="token",
  74. use_json=True,
  75. )
  76. def test__send_challenge_result():
  77. with mock.patch(
  78. "google.oauth2._client._token_endpoint_request"
  79. ) as mock_token_endpoint_request:
  80. reauth._send_challenge_result(
  81. MOCK_REQUEST, "123", "1", {"credential": "password"}, "token"
  82. )
  83. mock_token_endpoint_request.assert_called_with(
  84. MOCK_REQUEST,
  85. reauth._REAUTH_API + "/123:continue",
  86. {
  87. "sessionId": "123",
  88. "challengeId": "1",
  89. "action": "RESPOND",
  90. "proposalResponse": {"credential": "password"},
  91. },
  92. access_token="token",
  93. use_json=True,
  94. )
  95. def test__run_next_challenge_not_ready():
  96. challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
  97. challenges_response["challenges"][0]["status"] = "STATUS_UNSPECIFIED"
  98. assert (
  99. reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") is None
  100. )
  101. def test__run_next_challenge_not_supported():
  102. challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
  103. challenges_response["challenges"][0]["challengeType"] = "CHALLENGE_TYPE_UNSPECIFIED"
  104. with pytest.raises(exceptions.ReauthFailError) as excinfo:
  105. reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token")
  106. assert excinfo.match(r"Unsupported challenge type CHALLENGE_TYPE_UNSPECIFIED")
  107. def test__run_next_challenge_not_locally_eligible():
  108. mock_challenge = MockChallenge("PASSWORD", False, "challenge_input")
  109. with mock.patch(
  110. "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
  111. ):
  112. with pytest.raises(exceptions.ReauthFailError) as excinfo:
  113. reauth._run_next_challenge(
  114. CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
  115. )
  116. assert excinfo.match(r"Challenge PASSWORD is not locally eligible")
  117. def test__run_next_challenge_no_challenge_input():
  118. mock_challenge = MockChallenge("PASSWORD", True, None)
  119. with mock.patch(
  120. "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
  121. ):
  122. assert (
  123. reauth._run_next_challenge(
  124. CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
  125. )
  126. is None
  127. )
  128. def test__run_next_challenge_success():
  129. mock_challenge = MockChallenge("PASSWORD", True, {"credential": "password"})
  130. with mock.patch(
  131. "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
  132. ):
  133. with mock.patch(
  134. "google.oauth2.reauth._send_challenge_result"
  135. ) as mock_send_challenge_result:
  136. reauth._run_next_challenge(
  137. CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
  138. )
  139. mock_send_challenge_result.assert_called_with(
  140. MOCK_REQUEST, "123", 1, {"credential": "password"}, "token"
  141. )
  142. def test__obtain_rapt_authenticated():
  143. with mock.patch(
  144. "google.oauth2.reauth._get_challenges",
  145. return_value=CHALLENGES_RESPONSE_AUTHENTICATED,
  146. ):
  147. assert reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
  148. def test__obtain_rapt_authenticated_after_run_next_challenge():
  149. with mock.patch(
  150. "google.oauth2.reauth._get_challenges",
  151. return_value=CHALLENGES_RESPONSE_TEMPLATE,
  152. ):
  153. with mock.patch(
  154. "google.oauth2.reauth._run_next_challenge",
  155. side_effect=[
  156. CHALLENGES_RESPONSE_TEMPLATE,
  157. CHALLENGES_RESPONSE_AUTHENTICATED,
  158. ],
  159. ):
  160. with mock.patch("google.oauth2.reauth.is_interactive", return_value=True):
  161. assert (
  162. reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
  163. )
  164. def test__obtain_rapt_unsupported_status():
  165. challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
  166. challenges_response["status"] = "STATUS_UNSPECIFIED"
  167. with mock.patch(
  168. "google.oauth2.reauth._get_challenges", return_value=challenges_response
  169. ):
  170. with pytest.raises(exceptions.ReauthFailError) as excinfo:
  171. reauth._obtain_rapt(MOCK_REQUEST, "token", None)
  172. assert excinfo.match(r"API error: STATUS_UNSPECIFIED")
  173. def test__obtain_rapt_not_interactive():
  174. with mock.patch(
  175. "google.oauth2.reauth._get_challenges",
  176. return_value=CHALLENGES_RESPONSE_TEMPLATE,
  177. ):
  178. with mock.patch("google.oauth2.reauth.is_interactive", return_value=False):
  179. with pytest.raises(exceptions.ReauthFailError) as excinfo:
  180. reauth._obtain_rapt(MOCK_REQUEST, "token", None)
  181. assert excinfo.match(r"not in an interactive session")
  182. def test__obtain_rapt_not_authenticated():
  183. with mock.patch(
  184. "google.oauth2.reauth._get_challenges",
  185. return_value=CHALLENGES_RESPONSE_TEMPLATE,
  186. ):
  187. with mock.patch("google.oauth2.reauth.RUN_CHALLENGE_RETRY_LIMIT", 0):
  188. with pytest.raises(exceptions.ReauthFailError) as excinfo:
  189. reauth._obtain_rapt(MOCK_REQUEST, "token", None)
  190. assert excinfo.match(r"Reauthentication failed")
  191. def test_get_rapt_token():
  192. with mock.patch(
  193. "google.oauth2._client.refresh_grant", return_value=("token", None, None, None)
  194. ) as mock_refresh_grant:
  195. with mock.patch(
  196. "google.oauth2.reauth._obtain_rapt", return_value="new_rapt_token"
  197. ) as mock_obtain_rapt:
  198. assert (
  199. reauth.get_rapt_token(
  200. MOCK_REQUEST,
  201. "client_id",
  202. "client_secret",
  203. "refresh_token",
  204. "token_uri",
  205. )
  206. == "new_rapt_token"
  207. )
  208. mock_refresh_grant.assert_called_with(
  209. request=MOCK_REQUEST,
  210. client_id="client_id",
  211. client_secret="client_secret",
  212. refresh_token="refresh_token",
  213. token_uri="token_uri",
  214. scopes=[reauth._REAUTH_SCOPE],
  215. )
  216. mock_obtain_rapt.assert_called_with(
  217. MOCK_REQUEST, "token", requested_scopes=None
  218. )
  219. def test_refresh_grant_failed():
  220. with mock.patch(
  221. "google.oauth2._client._token_endpoint_request_no_throw"
  222. ) as mock_token_request:
  223. mock_token_request.return_value = (False, {"error": "Bad request"})
  224. with pytest.raises(exceptions.RefreshError) as excinfo:
  225. reauth.refresh_grant(
  226. MOCK_REQUEST,
  227. "token_uri",
  228. "refresh_token",
  229. "client_id",
  230. "client_secret",
  231. scopes=["foo", "bar"],
  232. rapt_token="rapt_token",
  233. )
  234. assert excinfo.match(r"Bad request")
  235. mock_token_request.assert_called_with(
  236. MOCK_REQUEST,
  237. "token_uri",
  238. {
  239. "grant_type": "refresh_token",
  240. "client_id": "client_id",
  241. "client_secret": "client_secret",
  242. "refresh_token": "refresh_token",
  243. "scope": "foo bar",
  244. "rapt": "rapt_token",
  245. },
  246. )
  247. def test_refresh_grant_success():
  248. with mock.patch(
  249. "google.oauth2._client._token_endpoint_request_no_throw"
  250. ) as mock_token_request:
  251. mock_token_request.side_effect = [
  252. (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}),
  253. (True, {"access_token": "access_token"}),
  254. ]
  255. with mock.patch(
  256. "google.oauth2.reauth.get_rapt_token", return_value="new_rapt_token"
  257. ):
  258. assert reauth.refresh_grant(
  259. MOCK_REQUEST, "token_uri", "refresh_token", "client_id", "client_secret"
  260. ) == (
  261. "access_token",
  262. "refresh_token",
  263. None,
  264. {"access_token": "access_token"},
  265. "new_rapt_token",
  266. )