test_sts.py 18 KB


# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import http.client as http_client
import json
import urllib
import urllib.parse

import mock
import pytest  # type: ignore

from google.auth import exceptions
from google.auth import transport
from google.oauth2 import sts
from google.oauth2 import utils
# Client credentials used to build the ClientAuthentication fixtures below.
CLIENT_ID = "username"
CLIENT_SECRET = "password"
# Base64 encoding of "username:password"
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
  27. class TestStsClient(object):
  28. GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
  29. RESOURCE = "https://api.example.com/"
  30. AUDIENCE = "urn:example:cooperation-context"
  31. SCOPES = ["scope1", "scope2"]
  32. REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
  33. SUBJECT_TOKEN = "HEADER.SUBJECT_TOKEN_PAYLOAD.SIGNATURE"
  34. SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  35. ACTOR_TOKEN = "HEADER.ACTOR_TOKEN_PAYLOAD.SIGNATURE"
  36. ACTOR_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  37. TOKEN_EXCHANGE_ENDPOINT = "https://example.com/token.oauth2"
  38. ADDON_HEADERS = {"x-client-version": "0.1.2"}
  39. ADDON_OPTIONS = {"additional": {"non-standard": ["options"], "other": "some-value"}}
  40. SUCCESS_RESPONSE = {
  41. "access_token": "ACCESS_TOKEN",
  42. "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
  43. "token_type": "Bearer",
  44. "expires_in": 3600,
  45. "scope": "scope1 scope2",
  46. }
  47. SUCCESS_RESPONSE_WITH_REFRESH = {
  48. "access_token": "abc",
  49. "refresh_token": "xyz",
  50. "expires_in": 3600,
  51. }
  52. ERROR_RESPONSE = {
  53. "error": "invalid_request",
  54. "error_description": "Invalid subject token",
  55. "error_uri": "https://tools.ietf.org/html/rfc6749",
  56. }
  57. CLIENT_AUTH_BASIC = utils.ClientAuthentication(
  58. utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET
  59. )
  60. CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication(
  61. utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET
  62. )
  63. @classmethod
  64. def make_client(cls, client_auth=None):
  65. return sts.Client(cls.TOKEN_EXCHANGE_ENDPOINT, client_auth)
  66. @classmethod
  67. def make_mock_request(cls, data, status=http_client.OK):
  68. response = mock.create_autospec(transport.Response, instance=True)
  69. response.status = status
  70. response.data = json.dumps(data).encode("utf-8")
  71. request = mock.create_autospec(transport.Request)
  72. request.return_value = response
  73. return request
  74. @classmethod
  75. def assert_request_kwargs(cls, request_kwargs, headers, request_data):
  76. """Asserts the request was called with the expected parameters.
  77. """
  78. assert request_kwargs["url"] == cls.TOKEN_EXCHANGE_ENDPOINT
  79. assert request_kwargs["method"] == "POST"
  80. assert request_kwargs["headers"] == headers
  81. assert request_kwargs["body"] is not None
  82. body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
  83. for (k, v) in body_tuples:
  84. assert v.decode("utf-8") == request_data[k.decode("utf-8")]
  85. assert len(body_tuples) == len(request_data.keys())
  86. def test_exchange_token_full_success_without_auth(self):
  87. """Test token exchange success without client authentication using full
  88. parameters.
  89. """
  90. client = self.make_client()
  91. headers = self.ADDON_HEADERS.copy()
  92. headers["Content-Type"] = "application/x-www-form-urlencoded"
  93. request_data = {
  94. "grant_type": self.GRANT_TYPE,
  95. "resource": self.RESOURCE,
  96. "audience": self.AUDIENCE,
  97. "scope": " ".join(self.SCOPES),
  98. "requested_token_type": self.REQUESTED_TOKEN_TYPE,
  99. "subject_token": self.SUBJECT_TOKEN,
  100. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  101. "actor_token": self.ACTOR_TOKEN,
  102. "actor_token_type": self.ACTOR_TOKEN_TYPE,
  103. "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
  104. }
  105. request = self.make_mock_request(
  106. status=http_client.OK, data=self.SUCCESS_RESPONSE
  107. )
  108. response = client.exchange_token(
  109. request,
  110. self.GRANT_TYPE,
  111. self.SUBJECT_TOKEN,
  112. self.SUBJECT_TOKEN_TYPE,
  113. self.RESOURCE,
  114. self.AUDIENCE,
  115. self.SCOPES,
  116. self.REQUESTED_TOKEN_TYPE,
  117. self.ACTOR_TOKEN,
  118. self.ACTOR_TOKEN_TYPE,
  119. self.ADDON_OPTIONS,
  120. self.ADDON_HEADERS,
  121. )
  122. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  123. assert response == self.SUCCESS_RESPONSE
  124. def test_exchange_token_partial_success_without_auth(self):
  125. """Test token exchange success without client authentication using
  126. partial (required only) parameters.
  127. """
  128. client = self.make_client()
  129. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  130. request_data = {
  131. "grant_type": self.GRANT_TYPE,
  132. "audience": self.AUDIENCE,
  133. "requested_token_type": self.REQUESTED_TOKEN_TYPE,
  134. "subject_token": self.SUBJECT_TOKEN,
  135. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  136. }
  137. request = self.make_mock_request(
  138. status=http_client.OK, data=self.SUCCESS_RESPONSE
  139. )
  140. response = client.exchange_token(
  141. request,
  142. grant_type=self.GRANT_TYPE,
  143. subject_token=self.SUBJECT_TOKEN,
  144. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  145. audience=self.AUDIENCE,
  146. requested_token_type=self.REQUESTED_TOKEN_TYPE,
  147. )
  148. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  149. assert response == self.SUCCESS_RESPONSE
  150. def test_exchange_token_non200_without_auth(self):
  151. """Test token exchange without client auth responding with non-200 status.
  152. """
  153. client = self.make_client()
  154. request = self.make_mock_request(
  155. status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
  156. )
  157. with pytest.raises(exceptions.OAuthError) as excinfo:
  158. client.exchange_token(
  159. request,
  160. self.GRANT_TYPE,
  161. self.SUBJECT_TOKEN,
  162. self.SUBJECT_TOKEN_TYPE,
  163. self.RESOURCE,
  164. self.AUDIENCE,
  165. self.SCOPES,
  166. self.REQUESTED_TOKEN_TYPE,
  167. self.ACTOR_TOKEN,
  168. self.ACTOR_TOKEN_TYPE,
  169. self.ADDON_OPTIONS,
  170. self.ADDON_HEADERS,
  171. )
  172. assert excinfo.match(
  173. r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
  174. )
  175. def test_exchange_token_full_success_with_basic_auth(self):
  176. """Test token exchange success with basic client authentication using full
  177. parameters.
  178. """
  179. client = self.make_client(self.CLIENT_AUTH_BASIC)
  180. headers = self.ADDON_HEADERS.copy()
  181. headers["Content-Type"] = "application/x-www-form-urlencoded"
  182. headers["Authorization"] = "Basic {}".format(BASIC_AUTH_ENCODING)
  183. request_data = {
  184. "grant_type": self.GRANT_TYPE,
  185. "resource": self.RESOURCE,
  186. "audience": self.AUDIENCE,
  187. "scope": " ".join(self.SCOPES),
  188. "requested_token_type": self.REQUESTED_TOKEN_TYPE,
  189. "subject_token": self.SUBJECT_TOKEN,
  190. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  191. "actor_token": self.ACTOR_TOKEN,
  192. "actor_token_type": self.ACTOR_TOKEN_TYPE,
  193. "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
  194. }
  195. request = self.make_mock_request(
  196. status=http_client.OK, data=self.SUCCESS_RESPONSE
  197. )
  198. response = client.exchange_token(
  199. request,
  200. self.GRANT_TYPE,
  201. self.SUBJECT_TOKEN,
  202. self.SUBJECT_TOKEN_TYPE,
  203. self.RESOURCE,
  204. self.AUDIENCE,
  205. self.SCOPES,
  206. self.REQUESTED_TOKEN_TYPE,
  207. self.ACTOR_TOKEN,
  208. self.ACTOR_TOKEN_TYPE,
  209. self.ADDON_OPTIONS,
  210. self.ADDON_HEADERS,
  211. )
  212. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  213. assert response == self.SUCCESS_RESPONSE
  214. def test_exchange_token_partial_success_with_basic_auth(self):
  215. """Test token exchange success with basic client authentication using
  216. partial (required only) parameters.
  217. """
  218. client = self.make_client(self.CLIENT_AUTH_BASIC)
  219. headers = {
  220. "Content-Type": "application/x-www-form-urlencoded",
  221. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  222. }
  223. request_data = {
  224. "grant_type": self.GRANT_TYPE,
  225. "audience": self.AUDIENCE,
  226. "requested_token_type": self.REQUESTED_TOKEN_TYPE,
  227. "subject_token": self.SUBJECT_TOKEN,
  228. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  229. }
  230. request = self.make_mock_request(
  231. status=http_client.OK, data=self.SUCCESS_RESPONSE
  232. )
  233. response = client.exchange_token(
  234. request,
  235. grant_type=self.GRANT_TYPE,
  236. subject_token=self.SUBJECT_TOKEN,
  237. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  238. audience=self.AUDIENCE,
  239. requested_token_type=self.REQUESTED_TOKEN_TYPE,
  240. )
  241. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  242. assert response == self.SUCCESS_RESPONSE
  243. def test_exchange_token_non200_with_basic_auth(self):
  244. """Test token exchange with basic client auth responding with non-200
  245. status.
  246. """
  247. client = self.make_client(self.CLIENT_AUTH_BASIC)
  248. request = self.make_mock_request(
  249. status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
  250. )
  251. with pytest.raises(exceptions.OAuthError) as excinfo:
  252. client.exchange_token(
  253. request,
  254. self.GRANT_TYPE,
  255. self.SUBJECT_TOKEN,
  256. self.SUBJECT_TOKEN_TYPE,
  257. self.RESOURCE,
  258. self.AUDIENCE,
  259. self.SCOPES,
  260. self.REQUESTED_TOKEN_TYPE,
  261. self.ACTOR_TOKEN,
  262. self.ACTOR_TOKEN_TYPE,
  263. self.ADDON_OPTIONS,
  264. self.ADDON_HEADERS,
  265. )
  266. assert excinfo.match(
  267. r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
  268. )
  269. def test_exchange_token_full_success_with_reqbody_auth(self):
  270. """Test token exchange success with request body client authenticaiton
  271. using full parameters.
  272. """
  273. client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
  274. headers = self.ADDON_HEADERS.copy()
  275. headers["Content-Type"] = "application/x-www-form-urlencoded"
  276. request_data = {
  277. "grant_type": self.GRANT_TYPE,
  278. "resource": self.RESOURCE,
  279. "audience": self.AUDIENCE,
  280. "scope": " ".join(self.SCOPES),
  281. "requested_token_type": self.REQUESTED_TOKEN_TYPE,
  282. "subject_token": self.SUBJECT_TOKEN,
  283. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  284. "actor_token": self.ACTOR_TOKEN,
  285. "actor_token_type": self.ACTOR_TOKEN_TYPE,
  286. "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
  287. "client_id": CLIENT_ID,
  288. "client_secret": CLIENT_SECRET,
  289. }
  290. request = self.make_mock_request(
  291. status=http_client.OK, data=self.SUCCESS_RESPONSE
  292. )
  293. response = client.exchange_token(
  294. request,
  295. self.GRANT_TYPE,
  296. self.SUBJECT_TOKEN,
  297. self.SUBJECT_TOKEN_TYPE,
  298. self.RESOURCE,
  299. self.AUDIENCE,
  300. self.SCOPES,
  301. self.REQUESTED_TOKEN_TYPE,
  302. self.ACTOR_TOKEN,
  303. self.ACTOR_TOKEN_TYPE,
  304. self.ADDON_OPTIONS,
  305. self.ADDON_HEADERS,
  306. )
  307. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  308. assert response == self.SUCCESS_RESPONSE
  309. def test_exchange_token_partial_success_with_reqbody_auth(self):
  310. """Test token exchange success with request body client authentication
  311. using partial (required only) parameters.
  312. """
  313. client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
  314. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  315. request_data = {
  316. "grant_type": self.GRANT_TYPE,
  317. "audience": self.AUDIENCE,
  318. "requested_token_type": self.REQUESTED_TOKEN_TYPE,
  319. "subject_token": self.SUBJECT_TOKEN,
  320. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  321. "client_id": CLIENT_ID,
  322. "client_secret": CLIENT_SECRET,
  323. }
  324. request = self.make_mock_request(
  325. status=http_client.OK, data=self.SUCCESS_RESPONSE
  326. )
  327. response = client.exchange_token(
  328. request,
  329. grant_type=self.GRANT_TYPE,
  330. subject_token=self.SUBJECT_TOKEN,
  331. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  332. audience=self.AUDIENCE,
  333. requested_token_type=self.REQUESTED_TOKEN_TYPE,
  334. )
  335. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  336. assert response == self.SUCCESS_RESPONSE
  337. def test_exchange_token_non200_with_reqbody_auth(self):
  338. """Test token exchange with POST request body client auth responding
  339. with non-200 status.
  340. """
  341. client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
  342. request = self.make_mock_request(
  343. status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
  344. )
  345. with pytest.raises(exceptions.OAuthError) as excinfo:
  346. client.exchange_token(
  347. request,
  348. self.GRANT_TYPE,
  349. self.SUBJECT_TOKEN,
  350. self.SUBJECT_TOKEN_TYPE,
  351. self.RESOURCE,
  352. self.AUDIENCE,
  353. self.SCOPES,
  354. self.REQUESTED_TOKEN_TYPE,
  355. self.ACTOR_TOKEN,
  356. self.ACTOR_TOKEN_TYPE,
  357. self.ADDON_OPTIONS,
  358. self.ADDON_HEADERS,
  359. )
  360. assert excinfo.match(
  361. r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
  362. )
  363. def test_refresh_token_success(self):
  364. """Test refresh token with successful response."""
  365. client = self.make_client(self.CLIENT_AUTH_BASIC)
  366. request = self.make_mock_request(
  367. status=http_client.OK, data=self.SUCCESS_RESPONSE
  368. )
  369. response = client.refresh_token(request, "refreshtoken")
  370. headers = {
  371. "Authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
  372. "Content-Type": "application/x-www-form-urlencoded",
  373. }
  374. request_data = {"grant_type": "refresh_token", "refresh_token": "refreshtoken"}
  375. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  376. assert response == self.SUCCESS_RESPONSE
  377. def test_refresh_token_success_with_refresh(self):
  378. """Test refresh token with successful response."""
  379. client = self.make_client(self.CLIENT_AUTH_BASIC)
  380. request = self.make_mock_request(
  381. status=http_client.OK, data=self.SUCCESS_RESPONSE_WITH_REFRESH
  382. )
  383. response = client.refresh_token(request, "refreshtoken")
  384. headers = {
  385. "Authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
  386. "Content-Type": "application/x-www-form-urlencoded",
  387. }
  388. request_data = {"grant_type": "refresh_token", "refresh_token": "refreshtoken"}
  389. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  390. assert response == self.SUCCESS_RESPONSE_WITH_REFRESH
  391. def test_refresh_token_failure(self):
  392. """Test refresh token with failure response."""
  393. client = self.make_client(self.CLIENT_AUTH_BASIC)
  394. request = self.make_mock_request(
  395. status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
  396. )
  397. with pytest.raises(exceptions.OAuthError) as excinfo:
  398. client.refresh_token(request, "refreshtoken")
  399. assert excinfo.match(
  400. r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
  401. )
  402. def test__make_request_success(self):
  403. """Test base method with successful response."""
  404. client = self.make_client(self.CLIENT_AUTH_BASIC)
  405. request = self.make_mock_request(
  406. status=http_client.OK, data=self.SUCCESS_RESPONSE
  407. )
  408. response = client._make_request(request, {"a": "b"}, {"c": "d"})
  409. headers = {
  410. "Authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
  411. "Content-Type": "application/x-www-form-urlencoded",
  412. "a": "b",
  413. }
  414. request_data = {"c": "d"}
  415. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  416. assert response == self.SUCCESS_RESPONSE
  417. def test_make_request_failure(self):
  418. """Test refresh token with failure response."""
  419. client = self.make_client(self.CLIENT_AUTH_BASIC)
  420. request = self.make_mock_request(
  421. status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
  422. )
  423. with pytest.raises(exceptions.OAuthError) as excinfo:
  424. client._make_request(request, {"a": "b"}, {"c": "d"})
  425. assert excinfo.match(
  426. r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
  427. )