test__client.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. # Copyright 2016 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import json
  16. import os
  17. import mock
  18. import pytest
  19. import six
  20. from six.moves import http_client
  21. from six.moves import urllib
  22. from google.auth import _helpers
  23. from google.auth import crypt
  24. from google.auth import exceptions
  25. from google.auth import jwt
  26. from google.auth import transport
  27. from google.oauth2 import _client
  28. import yatest.common
  29. DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
  30. with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
  31. PRIVATE_KEY_BYTES = fh.read()
  32. SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
  33. SCOPES_AS_LIST = [
  34. "https://www.googleapis.com/auth/pubsub",
  35. "https://www.googleapis.com/auth/logging.write",
  36. ]
  37. SCOPES_AS_STRING = (
  38. "https://www.googleapis.com/auth/pubsub"
  39. " https://www.googleapis.com/auth/logging.write"
  40. )
  41. def test__handle_error_response():
  42. response_data = {"error": "help", "error_description": "I'm alive"}
  43. with pytest.raises(exceptions.RefreshError) as excinfo:
  44. _client._handle_error_response(response_data)
  45. assert excinfo.match(r"help: I\'m alive")
  46. def test__handle_error_response_non_json():
  47. response_data = {"foo": "bar"}
  48. with pytest.raises(exceptions.RefreshError) as excinfo:
  49. _client._handle_error_response(response_data)
  50. assert excinfo.match(r"{\"foo\": \"bar\"}")
  51. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  52. def test__parse_expiry(unused_utcnow):
  53. result = _client._parse_expiry({"expires_in": 500})
  54. assert result == datetime.datetime.min + datetime.timedelta(seconds=500)
  55. def test__parse_expiry_none():
  56. assert _client._parse_expiry({}) is None
  57. def make_request(response_data, status=http_client.OK):
  58. response = mock.create_autospec(transport.Response, instance=True)
  59. response.status = status
  60. response.data = json.dumps(response_data).encode("utf-8")
  61. request = mock.create_autospec(transport.Request)
  62. request.return_value = response
  63. return request
  64. def test__token_endpoint_request():
  65. request = make_request({"test": "response"})
  66. result = _client._token_endpoint_request(
  67. request, "http://example.com", {"test": "params"}
  68. )
  69. # Check request call
  70. request.assert_called_with(
  71. method="POST",
  72. url="http://example.com",
  73. headers={"Content-Type": "application/x-www-form-urlencoded"},
  74. body="test=params".encode("utf-8"),
  75. )
  76. # Check result
  77. assert result == {"test": "response"}
  78. def test__token_endpoint_request_use_json():
  79. request = make_request({"test": "response"})
  80. result = _client._token_endpoint_request(
  81. request,
  82. "http://example.com",
  83. {"test": "params"},
  84. access_token="access_token",
  85. use_json=True,
  86. )
  87. # Check request call
  88. request.assert_called_with(
  89. method="POST",
  90. url="http://example.com",
  91. headers={
  92. "Content-Type": "application/json",
  93. "Authorization": "Bearer access_token",
  94. },
  95. body=b'{"test": "params"}',
  96. )
  97. # Check result
  98. assert result == {"test": "response"}
  99. def test__token_endpoint_request_error():
  100. request = make_request({}, status=http_client.BAD_REQUEST)
  101. with pytest.raises(exceptions.RefreshError):
  102. _client._token_endpoint_request(request, "http://example.com", {})
  103. def test__token_endpoint_request_internal_failure_error():
  104. request = make_request(
  105. {"error_description": "internal_failure"}, status=http_client.BAD_REQUEST
  106. )
  107. with pytest.raises(exceptions.RefreshError):
  108. _client._token_endpoint_request(
  109. request, "http://example.com", {"error_description": "internal_failure"}
  110. )
  111. request = make_request(
  112. {"error": "internal_failure"}, status=http_client.BAD_REQUEST
  113. )
  114. with pytest.raises(exceptions.RefreshError):
  115. _client._token_endpoint_request(
  116. request, "http://example.com", {"error": "internal_failure"}
  117. )
  118. def verify_request_params(request, params):
  119. request_body = request.call_args[1]["body"].decode("utf-8")
  120. request_params = urllib.parse.parse_qs(request_body)
  121. for key, value in six.iteritems(params):
  122. assert request_params[key][0] == value
  123. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  124. def test_jwt_grant(utcnow):
  125. request = make_request(
  126. {"access_token": "token", "expires_in": 500, "extra": "data"}
  127. )
  128. token, expiry, extra_data = _client.jwt_grant(
  129. request, "http://example.com", "assertion_value"
  130. )
  131. # Check request call
  132. verify_request_params(
  133. request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
  134. )
  135. # Check result
  136. assert token == "token"
  137. assert expiry == utcnow() + datetime.timedelta(seconds=500)
  138. assert extra_data["extra"] == "data"
  139. def test_jwt_grant_no_access_token():
  140. request = make_request(
  141. {
  142. # No access token.
  143. "expires_in": 500,
  144. "extra": "data",
  145. }
  146. )
  147. with pytest.raises(exceptions.RefreshError):
  148. _client.jwt_grant(request, "http://example.com", "assertion_value")
  149. def test_id_token_jwt_grant():
  150. now = _helpers.utcnow()
  151. id_token_expiry = _helpers.datetime_to_secs(now)
  152. id_token = jwt.encode(SIGNER, {"exp": id_token_expiry}).decode("utf-8")
  153. request = make_request({"id_token": id_token, "extra": "data"})
  154. token, expiry, extra_data = _client.id_token_jwt_grant(
  155. request, "http://example.com", "assertion_value"
  156. )
  157. # Check request call
  158. verify_request_params(
  159. request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
  160. )
  161. # Check result
  162. assert token == id_token
  163. # JWT does not store microseconds
  164. now = now.replace(microsecond=0)
  165. assert expiry == now
  166. assert extra_data["extra"] == "data"
  167. def test_id_token_jwt_grant_no_access_token():
  168. request = make_request(
  169. {
  170. # No access token.
  171. "expires_in": 500,
  172. "extra": "data",
  173. }
  174. )
  175. with pytest.raises(exceptions.RefreshError):
  176. _client.id_token_jwt_grant(request, "http://example.com", "assertion_value")
  177. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  178. def test_refresh_grant(unused_utcnow):
  179. request = make_request(
  180. {
  181. "access_token": "token",
  182. "refresh_token": "new_refresh_token",
  183. "expires_in": 500,
  184. "extra": "data",
  185. }
  186. )
  187. token, refresh_token, expiry, extra_data = _client.refresh_grant(
  188. request,
  189. "http://example.com",
  190. "refresh_token",
  191. "client_id",
  192. "client_secret",
  193. rapt_token="rapt_token",
  194. )
  195. # Check request call
  196. verify_request_params(
  197. request,
  198. {
  199. "grant_type": _client._REFRESH_GRANT_TYPE,
  200. "refresh_token": "refresh_token",
  201. "client_id": "client_id",
  202. "client_secret": "client_secret",
  203. "rapt": "rapt_token",
  204. },
  205. )
  206. # Check result
  207. assert token == "token"
  208. assert refresh_token == "new_refresh_token"
  209. assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
  210. assert extra_data["extra"] == "data"
  211. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  212. def test_refresh_grant_with_scopes(unused_utcnow):
  213. request = make_request(
  214. {
  215. "access_token": "token",
  216. "refresh_token": "new_refresh_token",
  217. "expires_in": 500,
  218. "extra": "data",
  219. "scope": SCOPES_AS_STRING,
  220. }
  221. )
  222. token, refresh_token, expiry, extra_data = _client.refresh_grant(
  223. request,
  224. "http://example.com",
  225. "refresh_token",
  226. "client_id",
  227. "client_secret",
  228. SCOPES_AS_LIST,
  229. )
  230. # Check request call.
  231. verify_request_params(
  232. request,
  233. {
  234. "grant_type": _client._REFRESH_GRANT_TYPE,
  235. "refresh_token": "refresh_token",
  236. "client_id": "client_id",
  237. "client_secret": "client_secret",
  238. "scope": SCOPES_AS_STRING,
  239. },
  240. )
  241. # Check result.
  242. assert token == "token"
  243. assert refresh_token == "new_refresh_token"
  244. assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
  245. assert extra_data["extra"] == "data"
  246. def test_refresh_grant_no_access_token():
  247. request = make_request(
  248. {
  249. # No access token.
  250. "refresh_token": "new_refresh_token",
  251. "expires_in": 500,
  252. "extra": "data",
  253. }
  254. )
  255. with pytest.raises(exceptions.RefreshError):
  256. _client.refresh_grant(
  257. request, "http://example.com", "refresh_token", "client_id", "client_secret"
  258. )